大可制作:QQ群:31564239(asp|jsp|php|mysql)

Java Gossip: BlockingQueue

伫列(Queue)是个先前先出(First In First Out, FIFO)的数据结构。在JDK 5.0中新增了Blocking Queue,在多线程的情况下,如果Blocking Queue的内容为空,而有个线程试图从Queue中取出元素,则该线程会被Block,直到Queue有元素时才解除Block,反过来说,如果 Blocking Queue满了,而有个线程试图再把数据填入Queue中,则该线程会被Block,直到Queue中有元素被取走后解除Block。

BlockingQueue的几个主要操作为下:

方法 说明
add 加入元素,如果伫列是满的,则丢出IllegalStateException
remove 传回并从伫列移除元素,如果伫列是空的,则丢出NoSuchElementException
element 传回元素,如果伫列是空的,则丢出NoSuchElementException
offer 加入元素并传回true,如果伫列是满的,则传回false
poll 传回并从伫列移除元素,如果伫列是空的,则传回null
peek 传回元素,如果伫列是空的,则传回null
put 加入元素,如果伫列是满,就block
take 传回并移除元素,如果伫列是空的,就block

java.util.concurrent下提供几种不同的Blocking Queue,ArrayBlockingQueue要指定容量大小来建构,LinkedBlockingQueue默认没有容量上限,但也可以指定容量上限,PriorityBlockingQueue严格来说不是Queue,因为它是根据优先权(Priority)来移除元素。

在这边以 wait()、notify() 中的生产者、消费者程序为例,使用BlockQueue来加以改写,好处是我们不用亲自处理wait、notify的细节,首先生产者改写如下:

  • Producer.java
package onlyfun.caterpillar;

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {
private BlockingQueue<Integer> queue;

public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}

public void run() {
for(int product = 1; product <= 10; product++) {
try {
// wait for a random time
Thread.sleep((int) Math.random() * 3000);
queue.put(product);
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
}

消费者类改写如下:
  • Consumer.java
package onlyfun.caterpillar;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
private BlockingQueue<Integer> queue;

public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}

public void run() {
for(int i = 1; i <= 10; i++) {
try {
// wait for a random time
Thread.sleep((int) (Math.random() * 3000));
queue.take();
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
}

可以使用下面这个程序来简单的测试一下:
  • BlockingQueueDemo.java
package onlyfun.caterpillar;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(1);

Thread producerThread = new Thread(
new Producer(queue));
Thread consumerThread = new Thread(
new Consumer(queue));

producerThread.start();
consumerThread.start();
}
}