生产消费者模型
多线程并发应用程序有一个经典的模型,即生产者/消费者模型。系统中,产生消息的是生产者,处理消息的是消费者,消费者和生产者通过一个缓冲区进行消息传递。生产者产生消息后提交到缓冲区,然后通知消费者可以从中取出消息进行处理。消费者处理完信息后,通知生产者可以继续提供消息。
要实现这个模型,关键在于消费者和生产者这两个线程进行同步。也就是说:只有缓冲区中有消息时,消费者才能够提取消息;只有消息已被处理,生产者才能产生消息提交到缓冲区。
生产消费者模式如下图。
缓冲区实际上是一个先进先出的队列,锁(信号量)的条件notEmpty和notFull。
Java实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
import java.util.concurrent.*; import java.util.concurrent.locks.*; public class ConsumerProducer { private static Buffer buffer = new Buffer(); public static void main(String[] args) { // Create a thread pool with two threads ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(new ProducerTask()); executor.execute(new ConsumerTask()); executor.shutdown(); } // A task for adding an int to the buffer private static class ProducerTask implements Runnable { public void run() { try { int i = 1; while (true) { System.out.println("生产者生产 " + i); buffer.write(i++); // Add a value to the buffer // Put the thread into sleep Thread.sleep((int)(Math.random() * 1000)); } } catch (InterruptedException ex) { ex.printStackTrace(); } } } // A task for reading and deleting an int from the buffer private static class ConsumerTask implements Runnable { public void run() { try { while (true) { System.out.println("\t\t\t消费者消费 " + buffer.read()); // Put the thread into sleep Thread.sleep((int)(Math.random() * 1000)); } } catch (InterruptedException ex) { ex.printStackTrace(); } } } // An inner class for buffer private static class Buffer { private static final int CAPACITY = 1; // buffer size private java.util.LinkedList<Integer> queue = new java.util.LinkedList<>(); // Create a new lock private static Lock lock = new ReentrantLock(); // Create two conditions private static Condition notEmpty = lock.newCondition(); private static Condition notFull = lock.newCondition(); public void write(int value) { lock.lock(); // Acquire the lock try { while (queue.size() == CAPACITY) { System.out.println("缓冲区已满!"); notFull.await(); } queue.offer(value); notEmpty.signal(); // Signal notEmpty condition } catch (InterruptedException ex) { ex.printStackTrace(); } finally { lock.unlock(); // Release the lock } } public int read() { int value = 0; lock.lock(); // Acquire the lock try { while (queue.isEmpty()) { System.out.println("\t\t\t缓冲区为空"); notEmpty.await(); } value = queue.remove(); notFull.signal(); // Signal notFull condition } catch (InterruptedException ex) { ex.printStackTrace(); } finally { lock.unlock(); // Release the lock return value; } } } } |
效果: