Java多线程与并发编程实战指南 | Java高级编程教程

2025-09-17 0 868

深入理解Java多线程机制,掌握高并发场景下的编程技巧与最佳实践

Java多线程编程基础

Java从语言级别支持多线程编程,提供了丰富的API和内置机制来处理并发任务。理解多线程基础是构建高性能Java应用的关键。

线程的创建方式

Java中创建线程主要有三种方式:

1. 继承Thread类

public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("线程运行中: " + Thread.currentThread().getName());
    }
    
    public static void main(String[] args) {
        MyThread thread = new MyThread();
        thread.start(); // 启动线程
    }
}

2. 实现Runnable接口

public class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("Runnable线程运行: " + Thread.currentThread().getName());
    }
    
    public static void main(String[] args) {
        Thread thread = new Thread(new MyRunnable());
        thread.start();
    }
}

3. 实现Callable接口(带返回值)

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        return "Callable执行结果: " + Thread.currentThread().getName();
    }
    
    public static void main(String[] args) throws Exception {
        FutureTask<String> futureTask = new FutureTask<>(new MyCallable());
        Thread thread = new Thread(futureTask);
        thread.start();
        
        // 获取执行结果(会阻塞直到任务完成)
        String result = futureTask.get();
        System.out.println(result);
    }
}

线程生命周期

Java线程有以下几种状态:

  • NEW:新创建但尚未启动
  • RUNNABLE:正在运行或准备运行
  • BLOCKED:等待监视器锁(阻塞状态)
  • WAITING:无限期等待其他线程执行特定操作
  • TIMED_WAITING:有限期等待
  • TERMINATED:已终止

线程管理与同步

在多线程环境中,正确处理线程同步和资源共享是避免竞态条件和数据不一致的关键。

同步机制

1. synchronized关键字

public class SynchronizedCounter {
    private int count = 0;
    
    // 同步方法
    public synchronized void increment() {
        count++;
    }
    
    // 同步代码块
    public void decrement() {
        synchronized(this) {
            count--;
        }
    }
    
    public synchronized int getCount() {
        return count;
    }
}

2. ReentrantLock可重入锁

import java.util.concurrent.locks.ReentrantLock;

public class LockCounter {
    private int count = 0;
    private final ReentrantLock lock = new ReentrantLock();
    
    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock(); // 确保锁被释放
        }
    }
    
    public int getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}

线程间通信

public class WaitNotifyExample {
    private boolean flag = false;
    
    public synchronized void waitForFlag() throws InterruptedException {
        while (!flag) {
            wait(); // 释放锁并等待
        }
        System.out.println("标志已设置,继续执行");
    }
    
    public synchronized void setFlag() {
        this.flag = true;
        notifyAll(); // 通知所有等待的线程
    }
}

线程池管理

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {
    public static void main(String[] args) {
        // 创建固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(5);
        
        // 提交任务
        for (int i = 0; i  {
                System.out.println("执行任务: " + taskId + 
                                 ", 线程: " + Thread.currentThread().getName());
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        // 优雅关闭线程池
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

Java并发工具类

Java并发包(java.util.concurrent)提供了强大的工具类来处理复杂的并发场景。

1. CountDownLatch

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        CountDownLatch startLatch = new CountDownLatch(1);
        CountDownLatch doneLatch = new CountDownLatch(threadCount);
        
        for (int i = 0; i  {
                try {
                    startLatch.await(); // 等待开始信号
                    System.out.println(Thread.currentThread().getName() + " 开始工作");
                    Thread.sleep(1000);
                    doneLatch.countDown(); // 完成工作
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
        
        System.out.println("准备开始所有线程...");
        Thread.sleep(1000);
        startLatch.countDown(); // 发出开始信号
        
        doneLatch.await(); // 等待所有线程完成
        System.out.println("所有线程已完成工作");
    }
}

2. CyclicBarrier

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        int threadCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
            System.out.println("所有线程已到达屏障点,继续执行");
        });
        
        for (int i = 0; i  {
                try {
                    System.out.println("线程 " + threadId + " 开始第一阶段");
                    Thread.sleep(1000 + threadId * 200);
                    System.out.println("线程 " + threadId + " 到达屏障点");
                    barrier.await();
                    
                    System.out.println("线程 " + threadId + " 开始第二阶段");
                    Thread.sleep(500);
                    System.out.println("线程 " + threadId + " 完成工作");
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}

3. Semaphore

import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    public static void main(String[] args) {
        // 限制同时访问的线程数量为3
        Semaphore semaphore = new Semaphore(3);
        
        for (int i = 0; i  {
                try {
                    semaphore.acquire(); // 获取许可
                    System.out.println("线程 " + threadId + " 获得许可,开始工作");
                    Thread.sleep(2000);
                    System.out.println("线程 " + threadId + " 完成工作,释放许可");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    semaphore.release(); // 释放许可
                }
            }).start();
        }
    }
}

4. Concurrent Collections

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class ConcurrentCollectionExample {
    public static void main(String[] args) {
        // 线程安全的HashMap
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        
        // 线程安全的List
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        
        // 多个线程可以安全地并发访问这些集合
        Thread writer = new Thread(() -> {
            for (int i = 0; i  {
            for (int i = 0; i < 100; i++) {
                Integer value = map.get("key" + i);
                if (value != null) {
                    System.out.println("读取: key" + i + " = " + value);
                }
            }
        });
        
        writer.start();
        reader.start();
    }
}

实战项目:多线程文件处理器

下面我们实现一个多线程文件处理系统,演示如何在实际项目中应用Java并发编程技术。

项目需求

  • 监控指定目录下的新文件
  • 使用多线程并行处理文件
  • 限制同时处理的文件数量
  • 支持处理结果统计和错误处理

核心实现

import java.io.IOException;
import java.nio.file.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiThreadFileProcessor {
    private final Path directory;
    private final ExecutorService processorPool;
    private final Semaphore processingSemaphore;
    private final AtomicInteger processedCount = new AtomicInteger(0);
    private final AtomicInteger failedCount = new AtomicInteger(0);
    private volatile boolean running = true;
    
    public MultiThreadFileProcessor(String dirPath, int maxConcurrentFiles) {
        this.directory = Paths.get(dirPath);
        this.processorPool = Executors.newFixedThreadPool(maxConcurrentFiles);
        this.processingSemaphore = new Semaphore(maxConcurrentFiles);
    }
    
    public void start() throws IOException, InterruptedException {
        System.out.println("启动文件处理器,监控目录: " + directory);
        
        // 创建监控服务
        WatchService watchService = FileSystems.getDefault().newWatchService();
        directory.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
        
        // 处理现有文件
        processExistingFiles();
        
        // 监控新文件
        while (running) {
            WatchKey key = watchService.take();
            
            for (WatchEvent<?> event : key.pollEvents()) {
                if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
                    Path filePath = directory.resolve((Path) event.context());
                    processFile(filePath);
                }
            }
            
            key.reset();
        }
        
        shutdown();
    }
    
    private void processExistingFiles() throws IOException {
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory)) {
            for (Path file : stream) {
                if (Files.isRegularFile(file)) {
                    processFile(file);
                }
            }
        }
    }
    
    private void processFile(Path filePath) {
        if (!processingSemaphore.tryAcquire()) {
            System.out.println("达到处理上限,跳过文件: " + filePath.getFileName());
            return;
        }
        
        processorPool.submit(() -> {
            try {
                System.out.println("开始处理文件: " + filePath.getFileName() + 
                                 " (线程: " + Thread.currentThread().getName() + ")");
                
                // 模拟文件处理
                processFileContent(filePath);
                
                processedCount.incrementAndGet();
                System.out.println("成功处理文件: " + filePath.getFileName());
            } catch (Exception e) {
                failedCount.incrementAndGet();
                System.err.println("处理文件失败: " + filePath.getFileName() + 
                                  ", 错误: " + e.getMessage());
            } finally {
                processingSemaphore.release();
            }
        });
    }
    
    private void processFileContent(Path filePath) throws IOException, InterruptedException {
        // 模拟处理时间
        long fileSize = Files.size(filePath);
        long processingTime = Math.min(5000, fileSize / 1000);
        
        Thread.sleep(processingTime);
        
        // 这里可以添加实际的文件处理逻辑
        // 例如: 文件解析、数据提取、格式转换等
        
        // 模拟随机失败(10%概率)
        if (Math.random() < 0.1) {
            throw new IOException("处理过程中发生随机错误");
        }
    }
    
    public void stop() {
        running = false;
    }
    
    public void shutdown() {
        stop();
        processorPool.shutdown();
        try {
            if (!processorPool.awaitTermination(60, TimeUnit.SECONDS)) {
                processorPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            processorPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        
        System.out.println("处理完成统计 - 成功: " + processedCount.get() + 
                          ", 失败: " + failedCount.get());
    }
    
    public static void main(String[] args) {
        if (args.length < 1) {
            System.out.println("用法: java MultiThreadFileProcessor  [并发数]");
            return;
        }
        
        String directoryPath = args[0];
        int maxConcurrent = args.length > 1 ? Integer.parseInt(args[1]) : 5;
        
        MultiThreadFileProcessor processor = new MultiThreadFileProcessor(directoryPath, maxConcurrent);
        
        // 添加关闭钩子
        Runtime.getRuntime().addShutdownHook(new Thread(processor::shutdown));
        
        try {
            processor.start();
        } catch (Exception e) {
            System.err.println("处理器启动失败: " + e.getMessage());
            processor.shutdown();
        }
    }
}

高级特性扩展

// 1. 使用CompletableFuture进行异步处理链
public CompletableFuture<ProcessingResult> processFileAsync(Path filePath) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            return processFileContentAdvanced(filePath);
        } catch (Exception e) {
            throw new CompletionException(e);
        }
    }, processorPool).handle((result, exception) -> {
        if (exception != null) {
            return new ProcessingResult(filePath, false, exception.getMessage());
        }
        return new ProcessingResult(filePath, true, "成功");
    });
}

// 2. 使用ThreadLocal存储线程特定数据
private static final ThreadLocal<SimpleDateFormat> DATE_FORMATTER = 
    ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

// 3. 使用Phaser进行多阶段处理
public void processWithPhases(Path filePath) {
    Phaser phaser = new Phaser(1); // 注册主线程
    
    for (int phase = 0; phase  {
            phaser.register();
            try {
                processPhase(filePath, currentPhase);
            } finally {
                phaser.arriveAndDeregister();
            }
        });
    }
    
    // 等待所有阶段完成
    phaser.arriveAndAwaitAdvance();
}

最佳实践总结

  1. 合理配置线程池大小(CPU密集型 vs IO密集型任务)
  2. 始终使用try-finally块确保资源释放
  3. 优先使用java.util.concurrent包中的高级工具类
  4. 避免使用stop()、suspend()等已废弃的线程方法
  5. 使用ThreadLocal时注意内存泄漏问题
  6. 正确处理InterruptedException
  7. 使用原子类和volatile关键字处理简单的原子操作

// 为页面添加简单的交互功能
document.addEventListener(‘DOMContentLoaded’, function() {
// 平滑滚动导航
const navLinks = document.querySelectorAll(‘nav a’);
navLinks.forEach(link => {
link.addEventListener(‘click’, function(e) {
e.preventDefault();
const targetId = this.getAttribute(‘href’);
const targetElement = document.querySelector(targetId);

if (targetElement) {
targetElement.scrollIntoView({
behavior: ‘smooth’,
block: ‘start’
});
}
});
});

// 代码块复制功能
const codeBlocks = document.querySelectorAll(‘pre’);
codeBlocks.forEach(block => {
block.addEventListener(‘click’, function() {
const textToCopy = this.textContent;
navigator.clipboard.writeText(textToCopy).then(() => {
const originalText = this.textContent;
this.textContent = ‘代码已复制到剪贴板!’;

setTimeout(() => {
this.textContent = originalText;
}, 1500);
}).catch(err => {
console.error(‘无法复制文本: ‘, err);
});
});
});

// 添加简单的语法高亮提示
const keywords = [‘class’, ‘public’, ‘private’, ‘static’, ‘void’, ‘new’, ‘import’,
‘extends’, ‘implements’, ‘interface’, ‘synchronized’, ‘volatile’,
‘try’, ‘catch’, ‘finally’, ‘throw’, ‘throws’, ‘return’];

codeBlocks.forEach(block => {
let code = block.textContent;
keywords.forEach(keyword => {
const regex = new RegExp(`\b${keyword}\b`, ‘g’);
code = code.replace(regex, `${keyword}`);
});
block.innerHTML = code;
});
});

Java多线程与并发编程实战指南 | Java高级编程教程
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 python Java多线程与并发编程实战指南 | Java高级编程教程 https://www.taomawang.com/server/python/1075.html

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务