一、生产者-消费者模式概述
生产者-消费者模式是并发编程中的经典设计模式,用于解决多线程环境下线程间协作和资源共享的问题。
该模式包含三个主要组件:
- 生产者:生成数据并放入共享缓冲区
- 消费者:从共享缓冲区取出并处理数据
- 缓冲区:协调生产者和消费者的工作节奏,解耦两者
二、实现方式比较
实现方式 | 优点 | 缺点 |
---|---|---|
wait()/notify() | 原生支持,无需额外依赖 | 代码复杂,容易出错 |
BlockingQueue | 简单易用,线程安全 | 需要理解队列特性 |
Lock和Condition | 灵活性高,功能强大 | 代码相对复杂 |
三、基于BlockingQueue的实现
Java的java.util.concurrent
包提供了BlockingQueue
接口,它是实现生产者-消费者模式的最佳选择。
1. 生产者线程实现
import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; public class Producer implements Runnable { private final BlockingQueue queue; private final AtomicInteger counter = new AtomicInteger(0); private final int maxItems; private static final int DELAY = 100; public Producer(BlockingQueue queue, int maxItems) { this.queue = queue; this.maxItems = maxItems; } @Override public void run() { try { while (counter.get() < maxItems) { int value = counter.incrementAndGet(); queue.put(value); // 阻塞直到有空间可用 System.out.println(Thread.currentThread().getName() + " 生产: " + value); Thread.sleep(DELAY); // 模拟生产耗时 } System.out.println(Thread.currentThread().getName() + " 完成生产"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.out.println(Thread.currentThread().getName() + " 被中断"); } } }
2. 消费者线程实现
import java.util.concurrent.BlockingQueue; public class Consumer implements Runnable { private final BlockingQueue queue; private static final int DELAY = 150; public Consumer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { while (true) { Integer value = queue.take(); // 阻塞直到有元素可用 System.out.println(Thread.currentThread().getName() + " 消费: " + value); processValue(value); Thread.sleep(DELAY); // 模拟消费耗时 } } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.out.println(Thread.currentThread().getName() + " 被中断"); } } private void processValue(Integer value) { // 模拟数据处理 System.out.println(Thread.currentThread().getName() + " 正在处理: " + value); } }
3. 主程序与测试
import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; public class ProducerConsumerDemo { public static void main(String[] args) throws InterruptedException { // 创建有界阻塞队列,容量为5 BlockingQueue queue = new LinkedBlockingQueue(5); // 创建线程池 ExecutorService executor = Executors.newFixedThreadPool(4); // 创建生产者 Producer producer1 = new Producer(queue, 10); Producer producer2 = new Producer(queue, 10); // 创建消费者 Consumer consumer1 = new Consumer(queue); Consumer consumer2 = new Consumer(queue); // 提交任务到线程池 executor.execute(producer1); executor.execute(producer2); executor.execute(consumer1); executor.execute(consumer2); // 运行30秒后关闭 Thread.sleep(30000); // 优雅关闭线程池 executor.shutdownNow(); if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { System.err.println("线程池未正常关闭"); } System.out.println("程序结束,队列剩余: " + queue.size() + " 个元素"); } }
四、使用wait()和notify()的传统实现
虽然不推荐在现代Java代码中使用,但理解wait()/notify()机制对掌握线程同步原理很有帮助。
1. 传统方式的缓冲区实现
import java.util.LinkedList; import java.util.Queue; public class TraditionalBuffer { private final Queue queue = new LinkedList(); private final int capacity; public TraditionalBuffer(int capacity) { this.capacity = capacity; } public synchronized void put(int value) throws InterruptedException { while (queue.size() == capacity) { wait(); // 缓冲区满,等待 } queue.add(value); System.out.println("生产: " + value + ",缓冲区大小: " + queue.size()); notifyAll(); // 通知消费者可以消费了 } public synchronized int take() throws InterruptedException { while (queue.isEmpty()) { wait(); // 缓冲区空,等待 } int value = queue.poll(); System.out.println("消费: " + value + ",缓冲区大小: " + queue.size()); notifyAll(); // 通知生产者可以生产了 return value; } public synchronized int size() { return queue.size(); } }
五、高级主题:使用Lock和Condition
Java 5引入了Lock和Condition接口,提供了更灵活的线程同步机制。
1. 使用Lock和Condition的缓冲区
import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class AdvancedBuffer { private final Queue queue = new LinkedList(); private final int capacity; private final Lock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); public AdvancedBuffer(int capacity) { this.capacity = capacity; } public void put(int value) throws InterruptedException { lock.lock(); try { while (queue.size() == capacity) { notFull.await(); // 缓冲区满,等待 } queue.add(value); System.out.println("生产: " + value + ",缓冲区大小: " + queue.size()); notEmpty.signal(); // 通知消费者可以消费了 } finally { lock.unlock(); } } public int take() throws InterruptedException { lock.lock(); try { while (queue.isEmpty()) { notEmpty.await(); // 缓冲区空,等待 } int value = queue.poll(); System.out.println("消费: " + value + ",缓冲区大小: " + queue.size()); notFull.signal(); // 通知生产者可以生产了 return value; } finally { lock.unlock(); } } public int size() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } }
六、性能优化和最佳实践
- 选择合适的队列类型:
- ArrayBlockingQueue:固定大小,基于数组
- LinkedBlockingQueue:可选边界,基于链表
- PriorityBlockingQueue:支持优先级排序
- SynchronousQueue:不存储元素,直接传递
- 合理设置线程数量:根据CPU核心数和任务类型确定
- 正确处理中断:及时响应中断请求,确保线程能正常退出
- 使用线程池:避免频繁创建销毁线程的开销
- 监控和调试:使用JMX或自定义监控统计生产消费速度
七、常见问题与解决方案
问题 | 现象 | 解决方案 |
---|---|---|
死锁 | 程序卡住,无响应 | 避免嵌套锁,按固定顺序获取锁 |
活锁 | 线程持续工作但无进展 | 引入随机退避机制 |
资源耗尽 | 内存溢出,系统变慢 | 使用有界队列,限制生产速度 |
生产者过快 | 队列积压,内存增长 | 使用背压机制,限制生产速率 |
消费者过快 | CPU空转,资源浪费 | 合理设置消费者等待策略 |
八、总结
生产者-消费者模式是Java多线程编程中最重要和最常用的模式之一。本文介绍了三种实现方式:
- 基于BlockingQueue的现代实现(推荐)
- 使用wait()/notify()的传统实现
- 使用Lock和Condition的高级实现
在实际开发中,应根据具体需求选择合适的实现方式。对于大多数场景,建议使用BlockingQueue
,因为它提供了线程安全、高效且易于使用的API。
正确实现生产者-消费者模式能够提高程序性能、资源利用率和响应能力,是多线程开发必备的核心技能。