Java并发编程实战:多线程与线程池深度解析 | Java高级教程

2025-09-16 0 738

作者:Java高级工程师 | 发布日期:2023年11月20日

为什么并发编程在现代Java开发中至关重要?

随着多核处理器的普及,并发编程已成为提升Java应用程序性能的关键技术。合理使用多线程能够充分利用CPU资源,提高程序吞吐量和响应速度,特别是在高并发、大数据处理的场景下。

然而,并发编程也是一把双刃剑。不正确的多线程实现会导致竞态条件、死锁、内存可见性等问题,使得程序行为不可预测且难以调试。本教程将深入探讨Java并发编程的核心概念、最佳实践和常见陷阱,帮助你编写出高效且线程安全的Java应用程序。

Java线程创建与管理的多种方式

Java提供了多种创建线程的方式,每种方式都有其适用场景和优缺点。

1. 继承Thread类


public class SimpleThread extends Thread {
    private final String message;
    
    public SimpleThread(String message) {
        this.message = message;
    }
    
    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            System.out.println(message + " : " + i);
            try {
                Thread.sleep(500); // 模拟耗时操作
            } catch (InterruptedException e) {
                System.err.println("线程被中断: " + e.getMessage());
                Thread.currentThread().interrupt(); // 恢复中断状态
            }
        }
    }
    
    // 使用示例
    public static void main(String[] args) {
        Thread thread1 = new SimpleThread("线程A");
        Thread thread2 = new SimpleThread("线程B");
        
        thread1.start(); // 启动线程
        thread2.start();
        
        try {
            thread1.join(); // 等待线程结束
            thread2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        System.out.println("所有线程执行完毕");
    }
}
                

2. 实现Runnable接口


public class TaskRunnable implements Runnable {
    private final String taskName;
    
    public TaskRunnable(String taskName) {
        this.taskName = taskName;
    }
    
    @Override
    public void run() {
        System.out.println("开始执行任务: " + taskName + ",线程: " + Thread.currentThread().getName());
        // 模拟任务执行
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("任务完成: " + taskName);
    }
    
    // 使用示例
    public static void main(String[] args) {
        Thread thread1 = new Thread(new TaskRunnable("任务A"));
        Thread thread2 = new Thread(new TaskRunnable("任务B"));
        
        // 设置线程优先级(提示性,不保证)
        thread1.setPriority(Thread.MAX_PRIORITY);
        thread2.setPriority(Thread.MIN_PRIORITY);
        
        // 设置为守护线程(当所有非守护线程结束时,JVM会退出)
        thread1.setDaemon(false);
        thread2.setDaemon(false);
        
        thread1.start();
        thread2.start();
    }
}
                

3. 实现Callable接口和Future


import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CallableExample implements Callable<Integer> {
    private final int number;
    
    public CallableExample(int number) {
        this.number = number;
    }
    
    @Override
    public Integer call() throws Exception {
        // 模拟计算任务
        if (number < 0) {
            throw new IllegalArgumentException("数字不能为负数");
        }
        
        int result = 0;
        for (int i = 1; i <= number; i++) {
            result += i;
            Thread.sleep(100); // 模拟耗时
        }
        return result;
    }
    
    // 使用示例
    public static void main(String[] args) {
        Callable<Integer> task = new CallableExample(10);
        FutureTask<Integer> futureTask = new FutureTask<>(task);
        
        Thread thread = new Thread(futureTask);
        thread.start();
        
        // 主线程可以继续执行其他任务
        
        try {
            // 获取计算结果(会阻塞直到任务完成)
            Integer result = futureTask.get();
            System.out.println("计算结果: " + result);
        } catch (InterruptedException | ExecutionException e) {
            System.err.println("任务执行出错: " + e.getMessage());
        }
    }
}
                

线程同步与锁机制

当多个线程访问共享资源时,必须使用同步机制来确保数据的一致性和线程安全。

1. synchronized关键字


public class SynchronizedCounter {
    private int count = 0;
    
    // 同步方法
    public synchronized void increment() {
        count++;
    }
    
    public synchronized void decrement() {
        count--;
    }
    
    public synchronized int getCount() {
        return count;
    }
}

// 使用同步代码块
public class SafeSharedResource {
    private final Object lock = new Object();
    private int value;
    
    public void updateValue(int newValue) {
        // 非同步操作
        System.out.println("准备更新值...");
        
        // 同步代码块
        synchronized(lock) {
            value = newValue;
            System.out.println("值已更新为: " + value);
        }
        
        // 更多的非同步操作
    }
    
    public int getValue() {
        synchronized(lock) {
            return value;
        }
    }
}
                

2. ReentrantLock可重入锁


import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockExample {
    private final Lock lock = new ReentrantLock();
    private int sharedValue = 0;
    
    public void performTask() {
        lock.lock(); // 获取锁
        try {
            // 临界区代码
            sharedValue++;
            System.out.println("当前值: " + sharedValue + ",线程: " + Thread.currentThread().getName());
            Thread.sleep(100); // 模拟工作
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock(); // 确保在finally块中释放锁
        }
    }
    
    // 尝试获取锁,避免死锁
    public boolean tryPerformTask() {
        if (lock.tryLock()) { // 尝试获取锁,立即返回
            try {
                // 临界区代码
                sharedValue--;
                System.out.println("递减后的值: " + sharedValue);
                return true;
            } finally {
                lock.unlock();
            }
        } else {
            System.out.println("无法获取锁,执行其他操作");
            return false;
        }
    }
}
                

3. 读写锁(ReadWriteLock)


import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteMap<K, V> {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final java.util.Map<K, V> map = new java.util.HashMap<>();
    
    public V put(K key, V value) {
        lock.writeLock().lock(); // 获取写锁
        try {
            return map.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }
    
    public V get(K key) {
        lock.readLock().lock(); // 获取读锁
        try {
            return map.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }
    
    public int size() {
        lock.readLock().lock();
        try {
            return map.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}
                

线程池技术与Executor框架

Java的Executor框架提供了强大的线程池实现,可以有效地管理线程生命周期,减少线程创建和销毁的开销。

1. 创建不同类型的线程池


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExamples {
    
    public static void fixedThreadPoolExample() {
        // 创建固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(5);
        
        for (int i = 0; i  {
                System.out.println("执行任务 " + taskId + ",线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // 模拟任务执行
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        // 优雅关闭线程池
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // 强制关闭
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
    
    public static void cachedThreadPoolExample() {
        // 创建可缓存的线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        
        for (int i = 0; i  {
                System.out.println("执行短期任务 " + taskId + ",线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        executor.shutdown();
    }
    
    public static void scheduledThreadPoolExample() {
        // 创建定时任务线程池
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
        
        // 延迟执行
        scheduler.schedule(() -> {
            System.out.println("延迟5秒执行的任务");
        }, 5, TimeUnit.SECONDS);
        
        // 固定频率执行
        scheduler.scheduleAtFixedRate(() -> {
            System.out.println("固定频率任务,时间: " + new java.util.Date());
        }, 1, 2, TimeUnit.SECONDS); // 初始延迟1秒,之后每2秒执行一次
        
        // 固定延迟执行
        scheduler.scheduleWithFixedDelay(() -> {
            System.out.println("固定延迟任务开始");
            try {
                Thread.sleep(1000); // 模拟任务执行时间
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("固定延迟任务结束");
        }, 1, 3, TimeUnit.SECONDS); // 每次任务结束后延迟3秒再执行下一次
        
        // 运行一段时间后关闭
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdown();
    }
    
    public static void main(String[] args) {
        System.out.println("=== 固定大小线程池示例 ===");
        fixedThreadPoolExample();
        
        System.out.println("=== 缓存线程池示例 ===");
        cachedThreadPoolExample();
        
        System.out.println("=== 定时任务线程池示例 ===");
        scheduledThreadPoolExample();
    }
}
                

2. 自定义线程池


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadPool {
    
    public static void main(String[] args) {
        // 创建有界队列
        BlockingQueue workQueue = new LinkedBlockingQueue(100);
        
        // 自定义线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5, // 核心线程数
            10, // 最大线程数
            60L, TimeUnit.SECONDS, // 空闲线程存活时间
            workQueue, // 工作队列
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
        );
        
        // 允许核心线程超时
        executor.allowCoreThreadTimeOut(true);
        
        // 监控线程池状态
        monitorThreadPool(executor);
        
        // 提交任务
        for (int i = 0; i  {
                System.out.println("执行任务 " + taskId + 
                                 ", 活跃线程: " + executor.getActiveCount() +
                                 ", 队列大小: " + executor.getQueue().size());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        // 优雅关闭
        executor.shutdown();
    }
    
    private static void monitorThreadPool(ThreadPoolExecutor executor) {
        Thread monitorThread = new Thread(() -> {
            while (!executor.isTerminated()) {
                System.out.println("监控 - 线程池状态: " +
                                 "核心大小: " + executor.getCorePoolSize() +
                                 ", 活跃线程: " + executor.getActiveCount() +
                                 ", 最大大小: " + executor.getMaximumPoolSize() +
                                 ", 队列大小: " + executor.getQueue().size());
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
        monitorThread.setDaemon(true);
        monitorThread.start();
    }
}
                

并发集合类的使用

Java并发包提供了多种线程安全的集合类,比使用同步包装器更高效。

1. ConcurrentHashMap


import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConcurrentHashMapExample {
    
    public static void main(String[] args) {
        ConcurrentMap map = new ConcurrentHashMap();
        
        // 多个线程同时操作
        Thread writer1 = new Thread(() -> {
            for (int i = 0; i  {
            for (int i = 0; i  {
            while (!Thread.currentThread().isInterrupted()) {
                map.forEach((key, value) -> {
                    System.out.println(key + " = " + value);
                });
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        
        writer1.start();
        writer2.start();
        reader.start();
        
        try {
            writer1.join();
            writer2.join();
            Thread.sleep(2000);
            reader.interrupt();
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        System.out.println("最终映射大小: " + map.size());
    }
}
                

2. BlockingQueue实现生产者-消费者模式


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerExample {
    
    public static void main(String[] args) {
        BlockingQueue queue = new LinkedBlockingQueue(10);
        
        // 生产者
        Runnable producer = () -> {
            try {
                for (int i = 0; i  {
            try {
                while (true) {
                    String item = queue.take(); // 如果队列空则阻塞
                    System.out.println("消费: " + item + ",队列大小: " + queue.size());
                    Thread.sleep(200);
                    
                    if (queue.isEmpty() && Thread.currentThread().isInterrupted()) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        
        Thread producerThread = new Thread(producer, "Producer");
        Thread consumerThread = new Thread(consumer, "Consumer");
        
        producerThread.start();
        consumerThread.start();
        
        try {
            producerThread.join();
            Thread.sleep(1000); // 给消费者时间处理剩余项目
            consumerThread.interrupt();
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        System.out.println("生产消费模式完成");
    }
}
                

实战案例:多线程文件处理系统

下面是一个完整的实战案例,演示如何使用线程池和并发集合处理大量文件。


import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class ConcurrentFileProcessor {
    private final ExecutorService executor;
    private final ConcurrentMap wordCounts = new ConcurrentHashMap();
    
    public ConcurrentFileProcessor(int threadPoolSize) {
        this.executor = Executors.newFixedThreadPool(threadPoolSize);
    }
    
    public void processDirectory(String directoryPath) throws IOException, InterruptedException {
        // 获取目录下所有文本文件
        List files = Files.walk(Paths.get(directoryPath))
                                .filter(Files::isRegularFile)
                                .filter(p -> p.toString().endsWith(".txt"))
                                .collect(Collectors.toList());
        
        // 创建CompletionService用于获取完成的任务
        CompletionService<ConcurrentMap> completionService = 
            new ExecutorCompletionService(executor);
        
        // 提交所有文件处理任务
        for (Path file : files) {
            completionService.submit(() -> processFile(file));
        }
        
        // 等待所有任务完成并合并结果
        for (int i = 0; i < files.size(); i++) {
            try {
                Future<ConcurrentMap> future = completionService.take();
                ConcurrentMap fileCounts = future.get();
                
                // 合并词频统计
                fileCounts.forEach((word, count) -> 
                    wordCounts.merge(word, count, Long::sum)
                );
            } catch (ExecutionException e) {
                System.err.println("文件处理失败: " + e.getCause().getMessage());
            }
        }
        
        // 输出最常见的10个词
        wordCounts.entrySet().stream()
                  .sorted((e1, e2) -> Long.compare(e2.getValue(), e1.getValue()))
                  .limit(10)
                  .forEach(entry -> 
                      System.out.println(entry.getKey() + ": " + entry.getValue())
                  );
    }
    
    private ConcurrentMap processFile(Path file) throws IOException {
        System.out.println("处理文件: " + file + ",线程: " + Thread.currentThread().getName());
        
        ConcurrentMap localCounts = new ConcurrentHashMap();
        
        // 读取文件并统计词频
        Files.lines(file)
             .flatMap(line -> List.of(line.split("\W+")).stream())
             .filter(word -> !word.isEmpty())
             .map(String::toLowerCase)
             .forEach(word -> 
                 localCounts.merge(word, 1L, Long::sum)
             );
        
        return localCounts;
    }
    
    public void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
    
    public static void main(String[] args) {
        ConcurrentFileProcessor processor = new ConcurrentFileProcessor(8);
        
        try {
            long startTime = System.currentTimeMillis();
            processor.processDirectory("./documents"); // 替换为实际目录
            long endTime = System.currentTimeMillis();
            
            System.out.println("处理完成,耗时: " + (endTime - startTime) + "ms");
            System.out.println("总共统计单词: " + processor.wordCounts.size());
            
        } catch (IOException | InterruptedException e) {
            System.err.println("处理失败: " + e.getMessage());
        } finally {
            processor.shutdown();
        }
    }
}
                

总结与最佳实践

通过本教程,我们深入探讨了Java并发编程的核心概念和技术。以下是关键要点和最佳实践:

并发编程最佳实践

  • 优先使用高级并发工具:如Executor框架、并发集合类,而不是手动管理线程
  • 明确线程边界:清晰定义哪些数据由哪些线程访问,减少共享状态
  • 使用不可变对象:不可变对象天生线程安全,可以自由共享
  • 谨慎使用同步:同步范围应尽可能小,避免在同步块中执行耗时操作
  • 处理中断 properly:正确响应中断请求,确保线程能够优雅终止

性能考虑

  • 根据任务类型选择合适的线程池(CPU密集型 vs I/O密集型)
  • 避免过度创建线程,线程创建和上下文切换都有开销
  • 使用线程池监控工具(如JMX)来调优线程池参数
  • 考虑使用Fork/Join框架处理可分治的递归任务

调试与测试

  • 使用线程转储(thread dump)分析死锁和性能问题
  • 编写可测试的并发代码,尽量减少同步复杂性
  • 使用专门的并发测试工具(如JCStress)测试线程安全性

并发编程是Java开发中的高级主题,需要深入理解和大量实践。通过掌握这些技术,你将能够构建出高性能、高并发的Java应用程序,充分利用现代多核处理器的计算能力。

Java并发编程实战:多线程与线程池深度解析 | Java高级教程
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 java Java并发编程实战:多线程与线程池深度解析 | Java高级教程 https://www.taomawang.com/server/java/1067.html

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务