Java多线程与并发编程实战指南:构建高性能应用 | Java高级教程

2025-09-17 0 188

深入理解Java多线程机制,掌握并发编程核心技术,构建高性能、高并发的Java应用程序

Java多线程编程基础

多线程是Java语言的重要特性,允许程序同时执行多个任务,提高CPU利用率和应用程序响应性。现代多核处理器架构下,合理使用多线程可以显著提升程序性能。

线程创建方式

Java提供了多种创建线程的方式,每种方式都有其适用场景:

1. 继承Thread类

public class SimpleThread extends Thread {
    @Override
    public void run() {
        System.out.println("线程运行: " + Thread.currentThread().getName());
    }
    
    public static void main(String[] args) {
        SimpleThread thread = new SimpleThread();
        thread.start(); // 启动线程
    }
}

2. 实现Runnable接口

public class Task implements Runnable {
    @Override
    public void run() {
        System.out.println("任务执行: " + Thread.currentThread().getName());
    }
    
    public static void main(String[] args) {
        Thread thread = new Thread(new Task());
        thread.start();
    }
}

3. 实现Callable接口(带返回值)

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class ResultTask implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for (int i = 1; i <= 100; i++) {
            sum += i;
        }
        return sum;
    }
    
    public static void main(String[] args) throws Exception {
        FutureTask<Integer> futureTask = new FutureTask(new ResultTask());
        Thread thread = new Thread(futureTask);
        thread.start();
        
        // 获取计算结果(会阻塞直到计算完成)
        Integer result = futureTask.get();
        System.out.println("计算结果: " + result);
    }
}

线程生命周期

Java线程有以下几种状态:

  • NEW:新创建但尚未启动
  • RUNNABLE:正在JVM中执行或等待系统资源
  • BLOCKED:等待监视器锁(synchronized)
  • WAITING:无限期等待其他线程执行特定操作
  • TIMED_WAITING:有限时间等待
  • TERMINATED:已退出

线程管理与同步

多线程环境下,线程间的协调和资源共享是关键挑战。Java提供了多种同步机制来保证线程安全

同步机制

1. synchronized关键字

public class Counter {
    private int count = 0;
    
    // 同步方法
    public synchronized void increment() {
        count++;
    }
    
    // 同步代码块
    public void decrement() {
        synchronized(this) {
            count--;
        }
    }
    
    public synchronized int getCount() {
        return count;
    }
}

2. ReentrantLock可重入锁

import java.util.concurrent.locks.ReentrantLock;

public class SafeCounter {
    private int count = 0;
    private final ReentrantLock lock = new ReentrantLock();
    
    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }
    
    public int getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}

线程间通信

wait()和notify()方法

public class MessageQueue {
    private String message;
    private boolean empty = true;
    
    public synchronized String take() {
        while (empty) {
            try {
                wait(); // 等待消息
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        empty = true;
        notifyAll(); // 通知生产者
        return message;
    }
    
    public synchronized void put(String message) {
        while (!empty) {
            try {
                wait(); // 等待消费
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        empty = false;
        this.message = message;
        notifyAll(); // 通知消费者
    }
}

线程池管理

使用线程池可以避免频繁创建和销毁线程的开销:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {
    public static void main(String[] args) {
        // 创建固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(5);
        
        // 提交任务
        for (int i = 0; i  {
                System.out.println("执行任务 " + taskId + " 线程: " + 
                                  Thread.currentThread().getName());
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        // 优雅关闭线程池
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

Java并发工具类

Java并发包(java.util.concurrent)提供了丰富的工具类,简化并发编程。

1. CountDownLatch

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        CountDownLatch startLatch = new CountDownLatch(1);
        CountDownLatch doneLatch = new CountDownLatch(threadCount);
        
        for (int i = 0; i  {
                try {
                    startLatch.await(); // 等待开始信号
                    System.out.println(Thread.currentThread().getName() + " 开始工作");
                    Thread.sleep(1000); // 模拟工作
                    doneLatch.countDown(); // 完成计数减一
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
        
        System.out.println("主线程准备就绪");
        Thread.sleep(1000);
        startLatch.countDown(); // 发出开始信号
        doneLatch.await(); // 等待所有线程完成
        System.out.println("所有线程已完成工作");
    }
}

2. CyclicBarrier

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        int threadCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
            System.out.println("所有线程已到达屏障,继续执行");
        });
        
        for (int i = 0; i  {
                try {
                    System.out.println("线程 " + threadId + " 开始第一阶段");
                    Thread.sleep(1000 + threadId * 200);
                    System.out.println("线程 " + threadId + " 到达屏障");
                    barrier.await();
                    
                    System.out.println("线程 " + threadId + " 开始第二阶段");
                    Thread.sleep(500 + threadId * 100);
                    System.out.println("线程 " + threadId + " 再次到达屏障");
                    barrier.await();
                    
                    System.out.println("线程 " + threadId + " 完成");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

3. Semaphore

import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    public static void main(String[] args) {
        // 限制同时只有3个线程可以访问资源
        Semaphore semaphore = new Semaphore(3);
        
        for (int i = 0; i  {
                try {
                    System.out.println("线程 " + threadId + " 等待许可");
                    semaphore.acquire();
                    System.out.println("线程 " + threadId + " 获得许可,开始工作");
                    Thread.sleep(2000); // 模拟资源使用
                    System.out.println("线程 " + threadId + " 释放许可");
                    semaphore.release();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}

4. ConcurrentHashMap

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConcurrentMapExample {
    public static void main(String[] args) {
        ConcurrentMap<String, Integer> map = new ConcurrentHashMap();
        
        // 多个线程同时操作
        Thread writer1 = new Thread(() -> {
            for (int i = 0; i  {
            for (int i = 100; i  {
            while (map.size() < 200) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            System.out.println("Map大小: " + map.size());
        });
        
        writer1.start();
        writer2.start();
        reader.start();
        
        try {
            writer1.join();
            writer2.join();
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

实战项目:多线程文件处理器

下面我们将实现一个多线程文件处理系统,展示如何在实际项目中应用Java并发编程技术。

项目需求

  • 监控指定目录下的新文件
  • 使用多线程并行处理文件
  • 支持不同类型的文件处理器
  • 限制同时处理的文件数量
  • 提供处理进度监控

核心实现

1. 文件处理器接口

import java.io.File;

public interface FileProcessor {
    boolean supports(File file);
    void process(File file) throws ProcessingException;
}

public class ProcessingException extends Exception {
    public ProcessingException(String message, Throwable cause) {
        super(message, cause);
    }
}

2. 文件处理任务

import java.io.File;
import java.util.List;

public class FileProcessingTask implements Runnable {
    private final File file;
    private final List<FileProcessor> processors;
    
    public FileProcessingTask(File file, List<FileProcessor> processors) {
        this.file = file;
        this.processors = processors;
    }
    
    @Override
    public void run() {
        System.out.println("开始处理文件: " + file.getName());
        
        for (FileProcessor processor : processors) {
            if (processor.supports(file)) {
                try {
                    processor.process(file);
                    System.out.println("文件 " + file.getName() + " 处理成功");
                    return;
                } catch (ProcessingException e) {
                    System.err.println("处理文件 " + file.getName() + " 失败: " + e.getMessage());
                }
            }
        }
        
        System.out.println("没有找到适合的处理器处理文件: " + file.getName());
    }
}

3. 文件监控器

import java.io.File;
import java.nio.file.*;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

import static java.nio.file.StandardWatchEventKinds.*;

public class FileMonitor {
    private final Path directory;
    private final ExecutorService executor;
    private final List<FileProcessor> processors;
    private final Semaphore semaphore;
    private volatile boolean running = false;
    
    public FileMonitor(String directoryPath, ExecutorService executor, 
                      List<FileProcessor> processors, int maxConcurrent) {
        this.directory = Paths.get(directoryPath);
        this.executor = executor;
        this.processors = processors;
        this.semaphore = new Semaphore(maxConcurrent);
    }
    
    public void start() throws Exception {
        if (!Files.exists(directory) || !Files.isDirectory(directory)) {
            throw new IllegalArgumentException("目录不存在或不是有效目录: " + directory);
        }
        
        running = true;
        WatchService watchService = FileSystems.getDefault().newWatchService();
        directory.register(watchService, ENTRY_CREATE);
        
        System.out.println("开始监控目录: " + directory);
        
        // 处理现有文件
        processExistingFiles();
        
        // 监控新文件
        while (running) {
            WatchKey key = watchService.take();
            for (WatchEvent<?> event : key.pollEvents()) {
                if (event.kind() == ENTRY_CREATE) {
                    Path filePath = directory.resolve((Path) event.context());
                    submitFileForProcessing(filePath.toFile());
                }
            }
            key.reset();
        }
    }
    
    private void processExistingFiles() {
        File[] existingFiles = directory.toFile().listFiles();
        if (existingFiles != null) {
            for (File file : existingFiles) {
                if (file.isFile()) {
                    submitFileForProcessing(file);
                }
            }
        }
    }
    
    private void submitFileForProcessing(File file) {
        try {
            semaphore.acquire(); // 获取许可
            executor.submit(() -> {
                try {
                    new FileProcessingTask(file, processors).run();
                } finally {
                    semaphore.release(); // 释放许可
                }
            });
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("提交文件处理任务被中断: " + file.getName());
        }
    }
    
    public void stop() {
        running = false;
    }
}

4. 示例文件处理器

import java.io.File;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

public class TextFileProcessor implements FileProcessor {
    @Override
    public boolean supports(File file) {
        return file.getName().toLowerCase().endsWith(".txt");
    }
    
    @Override
    public void process(File file) throws ProcessingException {
        try {
            // 模拟文本文件处理:转换为大写并保存
            String content = new String(Files.readAllBytes(file.toPath()));
            String upperCaseContent = content.toUpperCase();
            
            Path outputPath = file.toPath().resolveSibling(
                file.getName().replace(".txt", "_processed.txt"));
            
            Files.write(outputPath, upperCaseContent.getBytes());
            System.out.println("文本文件处理完成: " + file.getName());
        } catch (Exception e) {
            throw new ProcessingException("处理文本文件失败", e);
        }
    }
}

public class ImageFileProcessor implements FileProcessor {
    @Override
    public boolean supports(File file) {
        String name = file.getName().toLowerCase();
        return name.endsWith(".jpg") || name.endsWith(".png") || name.endsWith(".gif");
    }
    
    @Override
    public void process(File file) throws ProcessingException {
        try {
            // 模拟图像处理:创建缩略图
            Path thumbnailPath = file.toPath().resolveSibling(
                "thumbnail_" + file.getName());
            
            // 这里只是模拟,实际应用中会使用图像处理库
            Files.copy(file.toPath(), thumbnailPath, StandardCopyOption.REPLACE_EXISTING);
            System.out.println("图像文件处理完成: " + file.getName());
        } catch (Exception e) {
            throw new ProcessingException("处理图像文件失败", e);
        }
    }
}

5. 主应用程序

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MultiThreadFileProcessorApp {
    public static void main(String[] args) {
        if (args.length < 1) {
            System.out.println("用法: java MultiThreadFileProcessorApp ");
            return;
        }
        
        String directoryPath = args[0];
        int maxThreads = Runtime.getRuntime().availableProcessors() * 2;
        int maxConcurrentFiles = 5;
        
        ExecutorService executor = Executors.newFixedThreadPool(maxThreads);
        List<FileProcessor> processors = Arrays.asList(
            new TextFileProcessor(),
            new ImageFileProcessor()
        );
        
        FileMonitor fileMonitor = new FileMonitor(directoryPath, executor, 
                                                processors, maxConcurrentFiles);
        
        // 添加关闭钩子
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("正在关闭应用程序...");
            fileMonitor.stop();
            executor.shutdown();
            try {
                if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }));
        
        try {
            fileMonitor.start();
        } catch (Exception e) {
            System.err.println("启动文件监控器失败: " + e.getMessage());
            e.printStackTrace();
            executor.shutdown();
        }
    }
}

项目总结与最佳实践

  • 合理使用线程池管理线程资源
  • 使用Semaphore控制并发访问数量
  • 实现优雅的应用程序关闭机制
  • 为不同类型的任务创建专用处理器
  • 使用WatchService监控文件系统变化

性能优化建议

  1. 根据CPU核心数合理设置线程池大小
  2. 使用合适的并发控制机制避免资源竞争
  3. 实现批处理机制减少IO操作次数
  4. 使用缓冲区提高文件处理效率
  5. 监控线程状态,及时发现和解决死锁问题

// 为页面添加简单的交互功能
document.addEventListener(‘DOMContentLoaded’, function() {
// 平滑滚动导航
const navLinks = document.querySelectorAll(‘nav a’);
navLinks.forEach(link => {
link.addEventListener(‘click’, function(e) {
e.preventDefault();
const targetId = this.getAttribute(‘href’);
const targetElement = document.querySelector(targetId);

if (targetElement) {
targetElement.scrollIntoView({
behavior: ‘smooth’,
block: ‘start’
});
}
});
});

// 代码块复制功能
const codeBlocks = document.querySelectorAll(‘pre’);
codeBlocks.forEach(block => {
block.addEventListener(‘click’, function() {
const textToCopy = this.textContent;
navigator.clipboard.writeText(textToCopy).then(() => {
const originalText = this.textContent;
this.textContent = ‘代码已复制到剪贴板!’;

setTimeout(() => {
this.textContent = originalText;
}, 1500);
}).catch(err => {
console.error(‘无法复制文本: ‘, err);
});
});
});

// 添加代码语法高亮提示
const codeElements = document.querySelectorAll(‘code’);
codeElements.forEach(code => {
code.title = ‘点击复制代码’;
});
});

Java多线程与并发编程实战指南:构建高性能应用 | Java高级教程
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 java Java多线程与并发编程实战指南:构建高性能应用 | Java高级教程 https://www.taomawang.com/server/java/1074.html

下一篇:

已经没有下一篇了!

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务