传输队列扩展阻塞队列。
生产者使用 TransferQueue 的 transfer(E element)方法将元素传递给消费者。
当生产者调用传递(E元素)方法时,它等待直到消费者获取其元素。 tryTransfer()方法提供了该方法的非阻塞和超时版本。
getWaitingConsumerCount()方法返回等待消费者的数量。
如果有一个等待消费者, hasWaitingConsumer()方法返回true; 否则,返回false。 LinkedTransferQueue 是 TransferQueue 接口的实现类。 它提供了一个无界的 TransferQueue 。
以下代码显示如何使用 TransferQueue 。
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;
class TQProducer extends Thread {
private String name;
private TransferQueue<Integer> tQueue;
private AtomicInteger sequence;
public TQProducer(String name, TransferQueue<Integer> tQueue,
AtomicInteger sequence) {
this.name = name;
this.tQueue = tQueue;
this.sequence = sequence;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(4000);
int nextNum = this.sequence.incrementAndGet();
if (nextNum % 2 == 0) {
System.out.format("%s: Enqueuing: %d%n", name, nextNum);
tQueue.put(nextNum); // Enqueue
} else {
System.out.format("%s: Handing off: %d%n", name, nextNum);
System.out.format("%s: has a waiting consumer: %b%n", name,
tQueue.hasWaitingConsumer());
tQueue.transfer(nextNum); // A hand off
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class TQConsumer extends Thread {
private final String name;
private final TransferQueue<Integer> tQueue;
public TQConsumer(String name, TransferQueue<Integer> tQueue) {
this.name = name;
this.tQueue = tQueue;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(3000);
int item = tQueue.take();
System.out.format("%s removed: %d%n", name, item);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Main {
public static void main(String[] args) {
final TransferQueue<Integer> tQueue = new LinkedTransferQueue<>();
final AtomicInteger sequence = new AtomicInteger();
for (int i = 0; i < 5; i++) {
try {
tQueue.put(sequence.incrementAndGet());
System.out.println("Initial queue: " + tQueue);
new TQProducer("Producer-1", tQueue, sequence).start();
new TQConsumer("Consumer-1", tQueue).start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
上面的代码生成以下结果。