引言:现代应用中的并发挑战
在当今高并发的互联网应用中,如何有效管理和执行异步任务成为系统性能的关键。本文将带领您从零开始构建一个功能完整、性能优异的异步任务处理框架,涵盖线程池优化、任务调度、异常处理等核心话题。
框架设计目标
- 支持多种任务优先级
- 动态线程池管理
- 任务生命周期监控
- 优雅的关闭机制
- 可扩展的任务类型支持
第一步:定义任务接口和基础类
/**
* 异步任务接口
*/
public interface AsyncTask {
/**
* 执行任务
*/
T execute() throws Exception;
/**
* 获取任务ID
*/
String getTaskId();
/**
* 获取任务优先级
*/
TaskPriority getPriority();
/**
* 超时时间(毫秒)
*/
default long getTimeout() {
return 30000L;
}
}
/**
* 任务优先级枚举
*/
public enum TaskPriority {
HIGH(1), MEDIUM(2), LOW(3);
private final int value;
TaskPriority(int value) {
this.value = value;
}
public int getValue() {
return value;
}
}
/**
* 基础任务抽象类
*/
public abstract class AbstractTask implements AsyncTask {
private final String taskId;
private final TaskPriority priority;
private final long createTime;
protected AbstractTask(String taskId, TaskPriority priority) {
this.taskId = taskId;
this.priority = priority;
this.createTime = System.currentTimeMillis();
}
@Override
public String getTaskId() {
return taskId;
}
@Override
public TaskPriority getPriority() {
return priority;
}
public long getCreateTime() {
return createTime;
}
}
第二步:实现优先级任务队列
/**
* 优先级任务队列
*/
public class PriorityTaskQueue {
private final PriorityBlockingQueue queue;
public PriorityTaskQueue() {
this.queue = new PriorityBlockingQueue(100,
Comparator.comparingInt((QueueTask t) -> t.getPriority().getValue())
.thenComparingLong(QueueTask::getCreateTime)
);
}
/**
* 内部队列任务包装类
*/
private static class QueueTask {
private final AsyncTask task;
private final Future future;
private final long createTime;
public QueueTask(AsyncTask task, Future future) {
this.task = task;
this.future = future;
this.createTime = System.currentTimeMillis();
}
public TaskPriority getPriority() {
return task.getPriority();
}
public long getCreateTime() {
return createTime;
}
public AsyncTask getTask() {
return task;
}
public Future getFuture() {
return future;
}
}
public boolean offer(AsyncTask task, Future future) {
return queue.offer(new QueueTask(task, future));
}
public QueueTask poll() {
return queue.poll();
}
public QueueTask peek() {
return queue.peek();
}
public int size() {
return queue.size();
}
public boolean isEmpty() {
return queue.isEmpty();
}
}
第三步:构建智能线程池管理器
/**
* 智能线程池管理器
*/
public class SmartThreadPool {
private final ThreadPoolExecutor executor;
private final PriorityTaskQueue taskQueue;
private final AtomicInteger activeCount;
private final ScheduledExecutorService monitorExecutor;
public SmartThreadPool(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit) {
this.taskQueue = new PriorityTaskQueue();
this.activeCount = new AtomicInteger(0);
// 创建监控线程池
this.monitorExecutor = Executors.newScheduledThreadPool(1);
// 创建主线程池
this.executor = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, unit,
new LinkedBlockingQueue(),
new SmartThreadFactory(),
new SmartRejectedExecutionHandler()
);
startMonitoring();
}
/**
* 智能线程工厂
*/
private static class SmartThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix = "smart-pool-thread-";
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(false);
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
/**
* 智能拒绝策略
*/
private class SmartRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (r instanceof FutureTask) {
// 这里可以扩展为将任务持久化到数据库或消息队列
System.err.println("任务被拒绝,考虑扩展持久化功能: " + r);
}
throw new RejectedExecutionException("任务队列已满,无法执行新任务");
}
}
/**
* 提交任务执行
*/
public CompletableFuture submit(AsyncTask task) {
activeCount.incrementAndGet();
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
return task.execute();
} catch (Exception e) {
throw new CompletionException(e);
} finally {
activeCount.decrementAndGet();
}
}, executor);
// 设置超时处理
future.orTimeout(task.getTimeout(), TimeUnit.MILLISECONDS)
.exceptionally(throwable -> {
handleTaskException(task, throwable);
return null;
});
return future;
}
private void handleTaskException(AsyncTask task, Throwable throwable) {
if (throwable instanceof TimeoutException) {
System.err.println("任务超时: " + task.getTaskId());
} else if (throwable instanceof CompletionException) {
System.err.println("任务执行异常: " + task.getTaskId() + ", " +
throwable.getCause().getMessage());
}
}
/**
* 启动监控任务
*/
private void startMonitoring() {
monitorExecutor.scheduleAtFixedRate(() -> {
int poolSize = executor.getPoolSize();
int activeCount = executor.getActiveCount();
long completedTaskCount = executor.getCompletedTaskCount();
long taskCount = executor.getTaskCount();
System.out.printf("线程池状态: 池大小=%d, 活动线程=%d, 完成任务=%d, 总任务=%d%n",
poolSize, activeCount, completedTaskCount, taskCount);
// 动态调整线程池大小
adjustPoolSize();
}, 1, 5, TimeUnit.SECONDS);
}
/**
* 动态调整线程池大小
*/
private void adjustPoolSize() {
int activeCount = executor.getActiveCount();
int poolSize = executor.getPoolSize();
int corePoolSize = executor.getCorePoolSize();
// 如果活动线程数超过核心线程数的80%,考虑扩容
if (activeCount > corePoolSize * 0.8) {
int newCoreSize = Math.min(corePoolSize + 2, executor.getMaximumPoolSize());
executor.setCorePoolSize(newCoreSize);
System.out.println("动态扩容核心线程数到: " + newCoreSize);
}
// 如果活动线程数低于核心线程数的50%,考虑缩容
else if (activeCount 1) {
int newCoreSize = Math.max(1, corePoolSize - 1);
executor.setCorePoolSize(newCoreSize);
System.out.println("动态缩容核心线程数到: " + newCoreSize);
}
}
/**
* 优雅关闭
*/
public void gracefulShutdown() {
monitorExecutor.shutdown();
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("线程池未能正常关闭");
}
}
} catch (InterruptedException ie) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
第四步:实现任务执行器和管理器
/**
* 异步任务执行器
*/
public class AsyncTaskExecutor {
private final SmartThreadPool threadPool;
private final Map<String, CompletableFuture> taskFutures;
private final ReentrantReadWriteLock lock;
public AsyncTaskExecutor() {
this.threadPool = new SmartThreadPool(4, 16, 60, TimeUnit.SECONDS);
this.taskFutures = new ConcurrentHashMap();
this.lock = new ReentrantReadWriteLock();
}
/**
* 提交任务执行
*/
public CompletableFuture submit(AsyncTask task) {
CompletableFuture future = threadPool.submit(task);
lock.writeLock().lock();
try {
taskFutures.put(task.getTaskId(), future);
// 任务完成后清理
future.whenComplete((result, throwable) -> {
lock.writeLock().lock();
try {
taskFutures.remove(task.getTaskId());
} finally {
lock.writeLock().unlock();
}
});
} finally {
lock.writeLock().unlock();
}
return future;
}
/**
* 取消任务
*/
public boolean cancelTask(String taskId) {
lock.readLock().lock();
try {
CompletableFuture future = taskFutures.get(taskId);
if (future != null && !future.isDone()) {
return future.cancel(true);
}
return false;
} finally {
lock.readLock().unlock();
}
}
/**
* 获取任务状态
*/
public String getTaskStatus(String taskId) {
lock.readLock().lock();
try {
CompletableFuture future = taskFutures.get(taskId);
if (future == null) {
return "NOT_FOUND";
} else if (future.isDone()) {
return "COMPLETED";
} else if (future.isCancelled()) {
return "CANCELLED";
} else {
return "RUNNING";
}
} finally {
lock.readLock().unlock();
}
}
/**
* 关闭执行器
*/
public void shutdown() {
threadPool.gracefulShutdown();
}
}
/**
* 任务管理器
*/
public class TaskManager {
private final AsyncTaskExecutor executor;
private static volatile TaskManager instance;
private TaskManager() {
this.executor = new AsyncTaskExecutor();
}
public static TaskManager getInstance() {
if (instance == null) {
synchronized (TaskManager.class) {
if (instance == null) {
instance = new TaskManager();
}
}
}
return instance;
}
public CompletableFuture submitTask(AsyncTask task) {
return executor.submit(task);
}
public void shutdown() {
executor.shutdown();
}
}
第五步:实际应用示例
/**
* 邮件发送任务
*/
public class EmailSendTask extends AbstractTask {
private final String to;
private final String subject;
private final String content;
public EmailSendTask(String taskId, String to, String subject, String content) {
super(taskId, TaskPriority.MEDIUM);
this.to = to;
this.subject = subject;
this.content = content;
}
@Override
public Boolean execute() throws Exception {
System.out.println("开始发送邮件到: " + to);
// 模拟邮件发送过程
Thread.sleep(2000);
// 模拟90%的成功率
boolean success = Math.random() > 0.1;
if (success) {
System.out.println("邮件发送成功: " + subject);
} else {
System.out.println("邮件发送失败: " + subject);
}
return success;
}
@Override
public long getTimeout() {
return 10000L; // 邮件发送超时10秒
}
}
/**
* 图片处理任务
*/
public class ImageProcessTask extends AbstractTask {
private final String imagePath;
private final int width;
private final int height;
public ImageProcessTask(String taskId, String imagePath, int width, int height) {
super(taskId, TaskPriority.HIGH);
this.imagePath = imagePath;
this.width = width;
this.height = height;
}
@Override
public String execute() throws Exception {
System.out.println("开始处理图片: " + imagePath);
// 模拟图片处理过程
Thread.sleep(3000);
String resultPath = "/processed/" + System.currentTimeMillis() + ".jpg";
System.out.println("图片处理完成: " + resultPath);
return resultPath;
}
}
/**
* 演示类
*/
public class AsyncTaskDemo {
public static void main(String[] args) throws Exception {
TaskManager taskManager = TaskManager.getInstance();
// 提交多个任务
List<CompletableFuture> futures = new ArrayList();
// 提交邮件发送任务
for (int i = 1; i <= 5; i++) {
EmailSendTask emailTask = new EmailSendTask(
"email-" + i,
"user" + i + "@example.com",
"测试邮件主题 " + i,
"邮件内容 " + i
);
CompletableFuture future = taskManager.submitTask(emailTask);
futures.add(future);
// 添加回调处理
future.thenAccept(success -> {
if (success) {
System.out.println("邮件回调: 发送成功");
} else {
System.out.println("邮件回调: 发送失败");
}
});
}
// 提交图片处理任务
ImageProcessTask imageTask = new ImageProcessTask(
"image-1", "/uploads/photo.jpg", 800, 600
);
CompletableFuture imageFuture = taskManager.submitTask(imageTask);
futures.add(imageFuture);
imageFuture.thenAccept(resultPath -> {
System.out.println("图片处理完成,路径: " + resultPath);
});
// 等待所有任务完成
CompletableFuture allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
allFutures.get(30, TimeUnit.SECONDS);
System.out.println("所有任务执行完成");
// 关闭任务管理器
taskManager.shutdown();
}
}
性能优化和最佳实践
1. 线程池参数调优
// 根据系统资源动态计算线程池参数
public static SmartThreadPool createOptimizedPool() {
int availableProcessors = Runtime.getRuntime().availableProcessors();
int corePoolSize = Math.max(2, availableProcessors);
int maxPoolSize = Math.min(availableProcessors * 4, 64);
return new SmartThreadPool(corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS);
}
2. 内存泄漏防护
// 定期清理已完成的任务引用
public void cleanupCompletedTasks() {
lock.writeLock().lock();
try {
taskFutures.entrySet().removeIf(entry ->
entry.getValue().isDone() || entry.getValue().isCancelled()
);
} finally {
lock.writeLock().unlock();
}
}
3. 异常处理策略
// 全局异常处理器
public class GlobalExceptionHandler {
public static void handleTaskException(String taskId, Throwable throwable) {
// 记录日志
System.err.println("任务异常 - ID: " + taskId + ", 错误: " + throwable.getMessage());
// 发送告警
if (throwable instanceof OutOfMemoryError) {
sendAlert("内存不足告警");
}
// 重试逻辑
if (isRetryable(throwable)) {
scheduleRetry(taskId);
}
}
private static boolean isRetryable(Throwable throwable) {
return throwable instanceof IOException ||
throwable instanceof TimeoutException;
}
}
监控和运维
1. JMX监控集成
@MXBean
public interface ThreadPoolMonitorMBean {
int getPoolSize();
int getActiveCount();
long getCompletedTaskCount();
int getQueueSize();
double getAverageTaskTime();
}
public class ThreadPoolMonitor implements ThreadPoolMonitorMBean {
private final SmartThreadPool threadPool;
// 实现监控方法...
}
2. 指标收集
public class MetricsCollector {
private final Meter taskSubmissionMeter;
private final Timer taskExecutionTimer;
private final Counter failedTaskCounter;
public void recordTaskSubmission() {
taskSubmissionMeter.mark();
}
public void recordTaskExecution(long duration, TimeUnit unit) {
taskExecutionTimer.record(duration, unit);
}
public void recordTaskFailure() {
failedTaskCounter.increment();
}
}
扩展功能
1. 任务依赖管理
public class DependentTask extends AbstractTask {
private final List dependentTaskIds;
public boolean allDependenciesCompleted(TaskManager taskManager) {
return dependentTaskIds.stream()
.allMatch(taskId -> "COMPLETED".equals(taskManager.getTaskStatus(taskId)));
}
}
2. 分布式任务支持
public class DistributedTaskExecutor {
private final List nodes;
private final LoadBalancer loadBalancer;
public CompletableFuture submitDistributed(AsyncTask task) {
TaskManager selectedNode = loadBalancer.selectNode(nodes);
return selectedNode.submitTask(task);
}
}
总结
通过本文的完整实现,我们构建了一个功能丰富、性能优异的Java异步任务处理框架。这个框架不仅解决了基本的异步执行需求,还提供了优先级调度、动态资源管理、监控告警等企业级功能。
关键收获:
- 深入理解了Java并发编程的核心机制
- 掌握了线程池的优化和动态调整策略
- 学会了构建可扩展的异步处理架构
- 了解了企业级系统的监控和运维要求
这个框架可以作为基础,根据具体业务需求进行扩展和定制,为构建高性能的Java应用提供强有力的支持。

