Java多线程生产者-消费者模式实战教程 – 并发编程核心案例解析

2025-09-03 0 609

一、生产者-消费者模式概述

生产者-消费者模式是并发编程中的经典设计模式,用于解决多线程环境下线程间协作和资源共享的问题。

该模式包含三个主要组件:

  • 生产者:生成数据并放入共享缓冲区
  • 消费者:从共享缓冲区取出并处理数据
  • 缓冲区:协调生产者和消费者的工作节奏,解耦两者

二、实现方式比较

实现方式 优点 缺点
wait()/notify() 原生支持,无需额外依赖 代码复杂,容易出错
BlockingQueue 简单易用,线程安全 需要理解队列特性
Lock和Condition 灵活性高,功能强大 代码相对复杂

三、基于BlockingQueue的实现

Java的java.util.concurrent包提供了BlockingQueue接口,它是实现生产者-消费者模式的最佳选择。

1. 生产者线程实现

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable {
    private final BlockingQueue queue;
    private final AtomicInteger counter = new AtomicInteger(0);
    private final int maxItems;
    private static final int DELAY = 100;

    public Producer(BlockingQueue queue, int maxItems) {
        this.queue = queue;
        this.maxItems = maxItems;
    }

    @Override
    public void run() {
        try {
            while (counter.get() < maxItems) {
                int value = counter.incrementAndGet();
                queue.put(value); // 阻塞直到有空间可用
                System.out.println(Thread.currentThread().getName() + " 生产: " + value);
                Thread.sleep(DELAY); // 模拟生产耗时
            }
            System.out.println(Thread.currentThread().getName() + " 完成生产");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println(Thread.currentThread().getName() + " 被中断");
        }
    }
}
    

2. 消费者线程实现

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    private final BlockingQueue queue;
    private static final int DELAY = 150;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Integer value = queue.take(); // 阻塞直到有元素可用
                System.out.println(Thread.currentThread().getName() + " 消费: " + value);
                processValue(value);
                Thread.sleep(DELAY); // 模拟消费耗时
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println(Thread.currentThread().getName() + " 被中断");
        }
    }

    private void processValue(Integer value) {
        // 模拟数据处理
        System.out.println(Thread.currentThread().getName() + " 正在处理: " + value);
    }
}
    

3. 主程序与测试

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ProducerConsumerDemo {
    public static void main(String[] args) throws InterruptedException {
        // 创建有界阻塞队列,容量为5
        BlockingQueue queue = new LinkedBlockingQueue(5);
        
        // 创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(4);
        
        // 创建生产者
        Producer producer1 = new Producer(queue, 10);
        Producer producer2 = new Producer(queue, 10);
        
        // 创建消费者
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);
        
        // 提交任务到线程池
        executor.execute(producer1);
        executor.execute(producer2);
        executor.execute(consumer1);
        executor.execute(consumer2);
        
        // 运行30秒后关闭
        Thread.sleep(30000);
        
        // 优雅关闭线程池
        executor.shutdownNow();
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            System.err.println("线程池未正常关闭");
        }
        
        System.out.println("程序结束,队列剩余: " + queue.size() + " 个元素");
    }
}
    

四、使用wait()和notify()的传统实现

虽然不推荐在现代Java代码中使用,但理解wait()/notify()机制对掌握线程同步原理很有帮助。

1. 传统方式的缓冲区实现

import java.util.LinkedList;
import java.util.Queue;

public class TraditionalBuffer {
    private final Queue queue = new LinkedList();
    private final int capacity;
    
    public TraditionalBuffer(int capacity) {
        this.capacity = capacity;
    }
    
    public synchronized void put(int value) throws InterruptedException {
        while (queue.size() == capacity) {
            wait(); // 缓冲区满,等待
        }
        queue.add(value);
        System.out.println("生产: " + value + ",缓冲区大小: " + queue.size());
        notifyAll(); // 通知消费者可以消费了
    }
    
    public synchronized int take() throws InterruptedException {
        while (queue.isEmpty()) {
            wait(); // 缓冲区空,等待
        }
        int value = queue.poll();
        System.out.println("消费: " + value + ",缓冲区大小: " + queue.size());
        notifyAll(); // 通知生产者可以生产了
        return value;
    }
    
    public synchronized int size() {
        return queue.size();
    }
}
    

五、高级主题:使用Lock和Condition

Java 5引入了Lock和Condition接口,提供了更灵活的线程同步机制。

1. 使用Lock和Condition的缓冲区

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AdvancedBuffer {
    private final Queue queue = new LinkedList();
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    
    public AdvancedBuffer(int capacity) {
        this.capacity = capacity;
    }
    
    public void put(int value) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                notFull.await(); // 缓冲区满,等待
            }
            queue.add(value);
            System.out.println("生产: " + value + ",缓冲区大小: " + queue.size());
            notEmpty.signal(); // 通知消费者可以消费了
        } finally {
            lock.unlock();
        }
    }
    
    public int take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await(); // 缓冲区空,等待
            }
            int value = queue.poll();
            System.out.println("消费: " + value + ",缓冲区大小: " + queue.size());
            notFull.signal(); // 通知生产者可以生产了
            return value;
        } finally {
            lock.unlock();
        }
    }
    
    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }
}
    

六、性能优化和最佳实践

  1. 选择合适的队列类型
    • ArrayBlockingQueue:固定大小,基于数组
    • LinkedBlockingQueue:可选边界,基于链表
    • PriorityBlockingQueue:支持优先级排序
    • SynchronousQueue:不存储元素,直接传递
  2. 合理设置线程数量:根据CPU核心数和任务类型确定
  3. 正确处理中断:及时响应中断请求,确保线程能正常退出
  4. 使用线程池:避免频繁创建销毁线程的开销
  5. 监控和调试:使用JMX或自定义监控统计生产消费速度

七、常见问题与解决方案

问题 现象 解决方案
死锁 程序卡住,无响应 避免嵌套锁,按固定顺序获取锁
活锁 线程持续工作但无进展 引入随机退避机制
资源耗尽 内存溢出,系统变慢 使用有界队列,限制生产速度
生产者过快 队列积压,内存增长 使用背压机制,限制生产速率
消费者过快 CPU空转,资源浪费 合理设置消费者等待策略

八、总结

生产者-消费者模式是Java多线程编程中最重要和最常用的模式之一。本文介绍了三种实现方式:

  1. 基于BlockingQueue的现代实现(推荐)
  2. 使用wait()/notify()的传统实现
  3. 使用Lock和Condition的高级实现

在实际开发中,应根据具体需求选择合适的实现方式。对于大多数场景,建议使用BlockingQueue,因为它提供了线程安全、高效且易于使用的API。

正确实现生产者-消费者模式能够提高程序性能、资源利用率和响应能力,是多线程开发必备的核心技能。

Java多线程生产者-消费者模式实战教程 - 并发编程核心案例解析
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 java Java多线程生产者-消费者模式实战教程 – 并发编程核心案例解析 https://www.taomawang.com/server/java/1020.html

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务