Java并发编程核心概念
🚀 线程生命周期
深入理解线程的创建、运行、阻塞、等待和终止状态,掌握线程状态转换的关键机制。
🛡️ 线程安全
学习同步机制、锁优化、原子操作等关键技术,确保多线程环境下的数据一致性。
⚡ 性能优化
掌握线程池配置、并发集合使用、避免死锁等性能优化技巧。
线程创建与同步机制
1. 线程创建的三种方式
方式一:继承Thread类
public class MyThread extends Thread {
private String taskName;
public MyThread(String taskName) {
this.taskName = taskName;
}
@Override
public void run() {
System.out.println(taskName + " 开始执行,线程ID: " + Thread.currentThread().getId());
try {
// 模拟任务执行
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(taskName + " 执行完成");
}
// 使用示例
public static void main(String[] args) {
MyThread thread1 = new MyThread("任务A");
MyThread thread2 = new MyThread("任务B");
thread1.start();
thread2.start();
}
}
方式二:实现Runnable接口
public class TaskRunner implements Runnable {
private final String taskName;
private final int executionTime;
public TaskRunner(String taskName, int executionTime) {
this.taskName = taskName;
this.executionTime = executionTime;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 开始执行: " + taskName);
try {
Thread.sleep(executionTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println(Thread.currentThread().getName() + " 完成: " + taskName);
}
// 使用示例
public static void main(String[] args) {
Thread thread1 = new Thread(new TaskRunner("数据处理", 2000));
Thread thread2 = new Thread(new TaskRunner("文件下载", 3000));
thread1.start();
thread2.start();
}
}
方式三:实现Callable接口(带返回值)
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class ComputeTask implements Callable {
private final int number;
public ComputeTask(int number) {
this.number = number;
}
@Override
public Integer call() throws Exception {
System.out.println("计算任务开始,数字: " + number);
// 模拟复杂计算
Thread.sleep(1000);
int result = number * number;
System.out.println("计算完成,结果: " + result);
return result;
}
// 使用示例
public static void main(String[] args) throws Exception {
ComputeTask task = new ComputeTask(5);
FutureTask futureTask = new FutureTask(task);
Thread thread = new Thread(futureTask);
thread.start();
// 获取计算结果(会阻塞直到计算完成)
Integer result = futureTask.get();
System.out.println("获取到结果: " + result);
}
}
线程池深度解析与实战
自定义线程池配置
import java.util.concurrent.*;
public class CustomThreadPool {
private final ExecutorService executor;
private final int corePoolSize;
private final int maxPoolSize;
private final long keepAliveTime;
public CustomThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime) {
this.corePoolSize = corePoolSize;
this.maxPoolSize = maxPoolSize;
this.keepAliveTime = keepAliveTime;
// 创建线程池
this.executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue(100), // 任务队列
new CustomThreadFactory(), // 线程工厂
new CustomRejectionPolicy() // 拒绝策略
);
}
// 自定义线程工厂
static class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "custom-pool-" + threadNumber.getAndIncrement());
thread.setDaemon(false);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
}
// 自定义拒绝策略
static class CustomRejectionPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.err.println("任务被拒绝: " + r.toString());
// 可以在这里实现自定义的拒绝处理逻辑
throw new RejectedExecutionException("线程池已满,任务被拒绝");
}
}
public Future submit(Runnable task) {
return executor.submit(task);
}
public Future submit(Callable task) {
return executor.submit(task);
}
public void shutdown() {
executor.shutdown();
}
}
线程池监控与管理
import java.util.concurrent.ThreadPoolExecutor;
public class ThreadPoolMonitor {
private final ThreadPoolExecutor executor;
public ThreadPoolMonitor(ThreadPoolExecutor executor) {
this.executor = executor;
}
public void printPoolStatus() {
System.out.println("=== 线程池状态监控 ===");
System.out.println("核心线程数: " + executor.getCorePoolSize());
System.out.println("当前线程数: " + executor.getPoolSize());
System.out.println("最大线程数: " + executor.getMaximumPoolSize());
System.out.println("活跃线程数: " + executor.getActiveCount());
System.out.println("任务队列大小: " + executor.getQueue().size());
System.out.println("已完成任务数: " + executor.getCompletedTaskCount());
System.out.println("总任务数: " + executor.getTaskCount());
System.out.println("=========================");
}
public boolean isOverloaded() {
// 判断线程池是否过载
double loadFactor = (double) executor.getActiveCount() / executor.getMaximumPoolSize();
return loadFactor > 0.8;
}
public void adaptiveAdjustment() {
// 自适应调整线程池参数
int currentLoad = executor.getActiveCount();
int corePoolSize = executor.getCorePoolSize();
if (currentLoad > corePoolSize * 0.8) {
// 增加核心线程数
executor.setCorePoolSize(Math.min(corePoolSize + 2, executor.getMaximumPoolSize()));
System.out.println("增加核心线程数至: " + executor.getCorePoolSize());
} else if (currentLoad < corePoolSize * 0.3) {
// 减少核心线程数
executor.setCorePoolSize(Math.max(corePoolSize - 1, 1));
System.out.println("减少核心线程数至: " + executor.getCorePoolSize());
}
}
}
并发集合类实战应用
ConcurrentHashMap 高级用法
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
public class ConcurrentMapExample {
private final ConcurrentHashMap wordCount = new ConcurrentHashMap();
// 线程安全的计数方法
public void addWord(String word) {
wordCount.computeIfAbsent(word, k -> new LongAdder()).increment();
}
// 批量操作
public void mergeWordCounts(ConcurrentHashMap other) {
other.forEach((word, count) -> {
wordCount.merge(word, count, (oldCount, newCount) -> {
oldCount.add(newCount.longValue());
return oldCount;
});
});
}
// 搜索操作
public String findMostFrequentWord() {
return wordCount.reduceEntries(1,
(entry1, entry2) -> entry1.getValue().longValue() > entry2.getValue().longValue() ? entry1 : entry2
).getKey();
}
}
BlockingQueue 生产者消费者模式
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ProducerConsumerExample {
private final BlockingQueue queue = new LinkedBlockingQueue(100);
// 生产者
class Producer implements Runnable {
private final String name;
public Producer(String name) {
this.name = name;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
String item = name + "-item-" + i;
queue.put(item);
System.out.println("生产者 " + name + " 生产: " + item);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 消费者
class Consumer implements Runnable {
private final String name;
public Consumer(String name) {
this.name = name;
}
@Override
public void run() {
try {
while (true) {
String item = queue.take();
System.out.println("消费者 " + name + " 消费: " + item);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
综合实战:高性能Web服务器
基于NIO的异步任务处理器
import java.util.concurrent.*;
public class AsyncTaskProcessor {
private final ExecutorService ioExecutor;
private final ExecutorService computeExecutor;
private final ScheduledExecutorService scheduler;
public AsyncTaskProcessor() {
// IO密集型任务线程池
this.ioExecutor = new ThreadPoolExecutor(
10, 50, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue(1000),
new ThreadFactory() {
private final AtomicInteger count = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "io-worker-" + count.getAndIncrement());
}
}
);
// 计算密集型任务线程池
int processors = Runtime.getRuntime().availableProcessors();
this.computeExecutor = new ThreadPoolExecutor(
processors, processors * 2, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue(1000),
new ThreadFactory() {
private final AtomicInteger count = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "compute-worker-" + count.getAndIncrement());
}
}
);
// 调度线程池
this.scheduler = Executors.newScheduledThreadPool(2);
}
// 提交IO任务
public CompletableFuture submitIoTask(Callable task) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
} catch (Exception e) {
throw new CompletionException(e);
}
}, ioExecutor);
}
// 提交计算任务
public CompletableFuture submitComputeTask(Callable task) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
} catch (Exception e) {
throw new CompletionException(e);
}
}, computeExecutor);
}
// 定时任务
public ScheduledFuture scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
return scheduler.scheduleAtFixedRate(task, initialDelay, period, unit);
}
// 组合任务
public CompletableFuture processPipeline(T input) {
return submitIoTask(() -> {
// 第一阶段:IO操作
System.out.println("第一阶段IO处理: " + input);
return input.toString().toUpperCase();
}).thenComposeAsync(ioResult ->
submitComputeTask(() -> {
// 第二阶段:计算操作
System.out.println("第二阶段计算处理: " + ioResult);
return (U) ("处理结果: " + ioResult + "-" + System.currentTimeMillis());
}), computeExecutor
);
}
public void shutdown() {
ioExecutor.shutdown();
computeExecutor.shutdown();
scheduler.shutdown();
}
}
Java并发编程最佳实践总结
✅ 线程池配置
- IO密集型:2N个线程
- 计算密集型:N+1个线程
- 合理设置队列容量
✅ 锁优化策略
- 减少锁的持有时间
- 使用读写锁分离
- 避免嵌套锁
✅ 内存可见性
- 正确使用volatile
- 理解happens-before原则
- 避免指令重排序
通过合理的线程池配置、正确的同步机制和优化的数据结构选择,可以构建出高性能、高可用的Java并发应用。

