Java队列

Java阻塞队列

阻塞队列通过添加两组方法来扩展队列:

o 一组方法无限期地阻塞

o 另一组方法允许您指定要阻止的时间段。

BlockingQueue 接口的实例表示阻塞队列。 BlockingQueue 接口继承自 Queue 接口。

put()和 offer()方法在阻塞队列的尾部添加一个元素。如果阻塞队列已满,则put()方法将无限期阻塞,直到队列中的空间可用。offer()方法允许您指定等待空间可用的时间段。 如果指定的元素添加成功,则返回true; 否则为假。

take()和poll()方法检索和删除阻塞队列的头。如果阻塞队列为空,take()方法将无限期阻塞。poll()方法允许您指定在阻塞队列为空时要等待的时间段; 如果在元素可用之前过去了指定的时间,则返回null。

来自 BlockingQueue 中 Queue 接口的方法就像使用 Queue 。

BlockingQueue 被设计为线程安全的并且可以使用在生产者/消费者的情况下。

阻塞队列不允许空元素和可以是有界的或无界的。

BlockingQueue 中的 remainingCapacity()返回可以添加到阻止队列中而不阻塞的元素数。

BlockingQueue 可以控制多个线程被阻塞时的公平性。 如果阻塞队列是公平的,它可以选择最长的等待线程来执行操作。如果阻塞队列不公平,则不指定选择的顺序。

BlockingQueue 接口及其所有实现类都在 java.util.concurrent 包中。 以下是 BlockingQueue 接口的实现类:

由数组支持的 ArrayBlockingQueue 是 BlockingQueue 的有界实现类。 我们可以在其构造函数中指定阻塞队列的公平性。 默认情况下,它不公平。

LinkedBlockingQueue 可以用作有界或无界阻塞队列。 它不允许为阻塞队列指定公平规则。

PriorityBlockingQueue 是 BlockingQueue 的无界实现类。 它的工作方式与 PriortyQueue 相同,用于排序阻塞队列中的元素,并将阻塞特性添加到 PriorityQueue 中。

SynchronousQueue 实现 BlockingQueue ,没有任何容量。 put操作等待take操作以获取元素。 它可以在两个线程之间进行握手,并在两个线程之间交换对象。 它的isEmpty()方法总是返回true。

DelayQueue是BlockingQueue的无界实现类。它保持一个元素,直到该元素经过指定的延迟。 如果有超过一个元素的延迟已经过去,那么其延迟最早传递的元素将被放置在队列的头部。

例子

以下代码显示了如何在生产者/消费者应用程序中使用阻塞队列。

import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BQProducer extends Thread {
  private final BlockingQueue queue;
  private final String name;
  public BQProducer(BlockingQueue queue, String name) {
    this.queue = queue;
    this.name = name;
  }
  @Override
  public void run() {
    while (true) {
      try {
        this.queue.put(UUID.randomUUID().toString());
        Thread.sleep(4000);
      }
      catch (InterruptedException e) {
        e.printStackTrace();
        break;
      }
    }
  }
}
class BQConsumer extends Thread {
  private final BlockingQueue queue;
  private final String name;
  public BQConsumer(BlockingQueue queue, String name) {
    this.queue = queue;
    this.name = name;
  }

  @Override
  public void run() {
    while (true) {
      try {
        String str = this.queue.take();
        System.out.println(name + "  took: " + str);
        Thread.sleep(3000);
      } catch (InterruptedException e) {
        e.printStackTrace();
        break;
      }
    }
  }
}

public class Main {
  public static void main(String[] args) {
    int capacity = 5;
    boolean fair = true;
    BlockingQueue queue = new ArrayBlockingQueue<>(capacity, fair);

    new BQProducer(queue, "Producer1").start();
    new BQProducer(queue, "Producer2").start();
    new BQProducer(queue, "Producer3").start();
    new BQConsumer(queue, "Consumer1").start();
    new BQConsumer(queue, "Consumer2").start();
  }
}

上面的代码生成以下结果。