深入理解Java多线程机制,掌握高并发系统开发核心技术
Java多线程编程基础
多线程是Java语言的重要特性,允许程序同时执行多个任务,提高CPU利用率和程序响应性。
线程的创建方式
Java提供了多种创建线程的方式:
1. 继承Thread类
public class MyThread extends Thread {
@Override
public void run() {
System.out.println("线程运行中: " + Thread.currentThread().getName());
}
public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start(); // 启动线程
}
}
2. 实现Runnable接口
public class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("线程运行中: " + Thread.currentThread().getName());
}
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.start();
}
}
3. 实现Callable接口(带返回值)
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class MyCallable implements Callable {
@Override
public String call() throws Exception {
return "线程执行结果: " + Thread.currentThread().getName();
}
public static void main(String[] args) throws Exception {
FutureTask futureTask = new FutureTask(new MyCallable());
Thread thread = new Thread(futureTask);
thread.start();
// 获取线程执行结果
String result = futureTask.get();
System.out.println(result);
}
}
4. 使用Lambda表达式(Java 8+)
public class LambdaThread {
public static void main(String[] args) {
// 使用Lambda创建Runnable
Thread thread = new Thread(() -> {
System.out.println("Lambda线程: " + Thread.currentThread().getName());
});
thread.start();
}
}
线程生命周期与状态管理
Java线程有6种状态:NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED。
线程状态转换示例
public class ThreadStateDemo {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
Thread thread = new Thread(() -> {
synchronized (lock) {
try {
System.out.println("线程进入WAITING状态");
lock.wait(); // 进入WAITING状态
System.out.println("线程被唤醒");
Thread.sleep(1000); // 进入TIMED_WAITING状态
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
System.out.println("线程创建后状态: " + thread.getState()); // NEW
thread.start();
Thread.sleep(100); // 确保线程启动
System.out.println("线程启动后状态: " + thread.getState()); // RUNNABLE或BLOCKED
Thread.sleep(100);
System.out.println("线程调用wait后状态: " + thread.getState()); // WAITING
synchronized (lock) {
lock.notify(); // 唤醒线程
}
Thread.sleep(100);
System.out.println("线程被唤醒后状态: " + thread.getState()); // TIMED_WAITING或RUNNABLE
thread.join(); // 等待线程结束
System.out.println("线程结束后状态: " + thread.getState()); // TERMINATED
}
}
线程同步与锁机制
多线程环境下,共享资源的访问需要同步控制,避免数据不一致问题。
synchronized关键字
public class SynchronizedDemo {
private int count = 0;
private final Object lock = new Object();
// 同步方法
public synchronized void increment() {
count++;
}
// 同步代码块
public void decrement() {
synchronized (lock) {
count--;
}
}
public static void main(String[] args) throws InterruptedException {
SynchronizedDemo demo = new SynchronizedDemo();
Thread t1 = new Thread(() -> {
for (int i = 0; i {
for (int i = 0; i < 1000; i++) {
demo.decrement();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("最终结果: " + demo.count); // 应该是0
}
}
ReentrantLock可重入锁
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockDemo {
private int count = 0;
private final ReentrantLock lock = new ReentrantLock();
public void increment() {
lock.lock(); // 获取锁
try {
count++;
} finally {
lock.unlock(); // 释放锁
}
}
public void tryIncrement() {
if (lock.tryLock()) { // 尝试获取锁
try {
count++;
} finally {
lock.unlock();
}
} else {
System.out.println("获取锁失败,执行其他操作");
}
}
}
ReadWriteLock读写锁
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockDemo {
private int data = 0;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
public int read() {
lock.readLock().lock(); // 获取读锁
try {
return data;
} finally {
lock.readLock().unlock(); // 释放读锁
}
}
public void write(int value) {
lock.writeLock().lock(); // 获取写锁
try {
data = value;
} finally {
lock.writeLock().unlock(); // 释放写锁
}
}
}
线程池与Executor框架
Java提供了强大的线程池框架,可以有效管理线程资源,提高系统性能。
线程池的创建与使用
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo {
public static void main(String[] args) {
// 创建固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交任务
for (int i = 0; i {
System.out.println("执行任务 " + taskId + ",线程: " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟任务执行
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("完成任务 " + taskId);
});
}
// 关闭线程池
executor.shutdown();
try {
// 等待所有任务完成
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 强制关闭
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
}
}
自定义线程池
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CustomThreadPool {
public static void main(String[] args) {
// 创建自定义线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
5, // 最大线程数
60, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new ArrayBlockingQueue(10), // 工作队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 提交任务
for (int i = 0; i {
System.out.println("执行任务 " + taskId + ",线程: " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 监控线程池状态
monitorThreadPool(executor);
executor.shutdown();
}
private static void monitorThreadPool(ThreadPoolExecutor executor) {
new Thread(() -> {
while (!executor.isTerminated()) {
System.out.println("线程池状态: " +
"核心线程数: " + executor.getCorePoolSize() +
", 活动线程数: " + executor.getActiveCount() +
", 任务总数: " + executor.getTaskCount() +
", 已完成任务: " + executor.getCompletedTaskCount() +
", 队列大小: " + executor.getQueue().size());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
并发集合类
Java提供了线程安全的并发集合类,可以在多线程环境下安全使用。
常用并发集合示例
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
public class ConcurrentCollectionsDemo {
public static void main(String[] args) throws InterruptedException {
// ConcurrentHashMap示例
ConcurrentHashMap map = new ConcurrentHashMap();
map.put("key1", 1);
map.putIfAbsent("key1", 2); // 不会覆盖已有值
System.out.println("ConcurrentHashMap: " + map.get("key1")); // 1
// CopyOnWriteArrayList示例
CopyOnWriteArrayList list = new CopyOnWriteArrayList();
list.add("item1");
list.add("item2");
// 遍历时修改不会抛出ConcurrentModificationException
for (String item : list) {
if (item.equals("item1")) {
list.add("item3");
}
}
System.out.println("CopyOnWriteArrayList: " + list);
// BlockingQueue示例
BlockingQueue queue = new ArrayBlockingQueue(10);
// 生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 0; i {
try {
for (int i = 0; i < 5; i++) {
String item = queue.take();
System.out.println("消费: " + item);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
producer.join();
consumer.join();
}
}
原子操作与CAS
Java提供了原子变量类,基于CAS(Compare-And-Swap)实现无锁线程安全操作。
原子类使用示例
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public class AtomicDemo {
public static void main(String[] args) throws InterruptedException {
// AtomicInteger示例
AtomicInteger atomicInt = new AtomicInteger(0);
Thread t1 = new Thread(() -> {
for (int i = 0; i {
for (int i = 0; i < 1000; i++) {
atomicInt.decrementAndGet(); // 原子递减
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("AtomicInteger最终值: " + atomicInt.get()); // 0
// AtomicLong示例
AtomicLong atomicLong = new AtomicLong(0);
atomicLong.addAndGet(100);
System.out.println("AtomicLong值: " + atomicLong.get());
// AtomicReference示例
AtomicReference atomicRef = new AtomicReference("初始值");
// CAS操作
boolean success = atomicRef.compareAndSet("初始值", "新值");
System.out.println("CAS操作结果: " + success + ", 当前值: " + atomicRef.get());
// 模拟ABA问题
AtomicReference abaRef = new AtomicReference(100);
Thread t3 = new Thread(() -> {
abaRef.compareAndSet(100, 101);
abaRef.compareAndSet(101, 100); // 改回原值
});
Thread t4 = new Thread(() -> {
try {
Thread.sleep(100); // 确保t3先执行
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean result = abaRef.compareAndSet(100, 102);
System.out.println("ABA场景CAS结果: " + result); // true,但可能有问题
});
t3.start();
t4.start();
t3.join();
t4.join();
}
}
实战案例:多线程下载管理器
下面我们实现一个多线程下载管理器,展示Java多线程的实际应用。
多线程下载器实现
import java.io.BufferedInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class MultiThreadDownloader {
private final String fileUrl;
private final String savePath;
private final int threadCount;
private final AtomicLong downloadedBytes = new AtomicLong(0);
private long fileSize = 0;
public MultiThreadDownloader(String fileUrl, String savePath, int threadCount) {
this.fileUrl = fileUrl;
this.savePath = savePath;
this.threadCount = threadCount;
}
public void download() throws IOException, InterruptedException {
// 获取文件大小
HttpURLConnection connection = (HttpURLConnection) new URL(fileUrl).openConnection();
connection.setRequestMethod("HEAD");
fileSize = connection.getContentLengthLong();
connection.disconnect();
System.out.println("文件大小: " + fileSize + " bytes");
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
// 计算每个线程下载的字节范围
long chunkSize = fileSize / threadCount;
for (int i = 0; i < threadCount; i++) {
long start = i * chunkSize;
long end = (i == threadCount - 1) ? fileSize - 1 : start + chunkSize - 1;
executor.execute(new DownloadTask(start, end, i));
}
executor.shutdown();
// 显示下载进度
showProgress();
// 等待下载完成
if (!executor.awaitTermination(1, TimeUnit.HOURS)) {
executor.shutdownNow();
throw new RuntimeException("下载超时");
}
System.out.println("n下载完成: " + savePath);
}
private class DownloadTask implements Runnable {
private final long start;
private final long end;
private final int threadId;
public DownloadTask(long start, long end, int threadId) {
this.start = start;
this.end = end;
this.threadId = threadId;
}
@Override
public void run() {
try {
HttpURLConnection connection = (HttpURLConnection) new URL(fileUrl).openConnection();
connection.setRequestProperty("Range", "bytes=" + start + "-" + end);
try (BufferedInputStream in = new BufferedInputStream(connection.getInputStream());
FileOutputStream out = new FileOutputStream(savePath + ".part" + threadId, true)) {
byte[] buffer = new byte[8192];
int bytesRead;
long totalRead = 0;
while ((bytesRead = in.read(buffer)) != -1 && totalRead {
while (downloadedBytes.get() < fileSize) {
try {
Thread.sleep(1000);
long downloaded = downloadedBytes.get();
double progress = (double) downloaded / fileSize * 100;
System.out.printf("下载进度: %.2f%% (%d/%d bytes)n",
progress, downloaded, fileSize);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}).start();
}
public static void main(String[] args) {
try {
MultiThreadDownloader downloader = new MultiThreadDownloader(
"https://example.com/largefile.zip",
"downloaded_file.zip",
4
);
downloader.download();
} catch (Exception e) {
System.err.println("下载失败: " + e.getMessage());
e.printStackTrace();
}
}
}
Java并发编程最佳实践
编写高质量的多线程代码需要遵循一些最佳实践原则。
1. 避免死锁
public class DeadlockPrevention {
private final Object lock1 = new Object();
private final Object lock2 = new Object();
public void method1() {
synchronized (lock1) {
System.out.println("获取lock1");
// 一些操作...
synchronized (lock2) {
System.out.println("获取lock2");
// 一些操作...
}
}
}
public void method2() {
// 按照相同顺序获取锁,避免死锁
synchronized (lock1) {
System.out.println("获取lock1");
// 一些操作...
synchronized (lock2) {
System.out.println("获取lock2");
// 一些操作...
}
}
}
// 使用tryLock避免死锁
public void tryLockMethod() {
boolean gotLock1 = false;
boolean gotLock2 = false;
try {
// 尝试获取锁,设置超时时间
gotLock1 = lock1.tryLock(100, TimeUnit.MILLISECONDS);
if (gotLock1) {
// 一些操作...
gotLock2 = lock2.tryLock(100, TimeUnit.MILLISECONDS);
if (gotLock2) {
// 一些操作...
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (gotLock2) {
lock2.unlock();
}
if (gotLock1) {
lock1.unlock();
}
}
}
}
2. 性能优化建议
- 尽量减少同步代码块的范围
- 使用读写锁替代独占锁,提高读性能
- 合理设置线程池参数,避免资源浪费
- 使用并发集合替代同步包装类
- 考虑使用无锁数据结构
3. 调试与监控
- 使用Thread Dump分析死锁和性能问题
- 使用JMX监控线程池状态
- 使用VisualVM或JProfiler进行性能分析
- 添加适当的日志记录,便于问题排查
总结
Java多线程与并发编程是Java开发中的高级主题,掌握这些技术对于构建高性能、高并发的应用程序至关重要。
通过本文的学习,你应该已经掌握了:
- Java多线程的基本概念和创建方式
- 线程同步和锁机制的原理与应用
- 线程池的管理和优化策略
- 并发集合类和原子操作的使用
- 实际项目中的多线程编程最佳实践
多线程编程虽然复杂,但通过不断实践和深入学习,你可以编写出高效、稳定的并发应用程序。记住始终优先考虑线程安全,避免常见的并发问题如竞态条件、死锁等。