Published: February 15, 2024
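I. System Architecture Design
This tutorial builds a distributed task scheduling system on top of the Java concurrency package (java.util.concurrent). Its main modules are:
- Scheduling engine: supports CRON expressions and fixed-rate triggers
- Distributed lock: implemented with Redis and ZooKeeper
- Failover: heartbeat detection and task reassignment
- Dynamic sharding: parallel processing of large data-processing tasks
- Monitoring dashboard: visual tracking of task execution
Tech stack: Java 17 + Spring Boot 3 + Redis + ZooKeeper + Prometheus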
II. Core Class Design
1. Task Scheduler Interface
import java.util.Date;
public interface TaskScheduler {
    /**
     * Schedules a recurring task.
     * @param taskId  unique task ID
     * @param task    the task to run
     * @param trigger trigger configuration (computes the next execution time)
     */
    void schedule(String taskId, Runnable task, Trigger trigger);
    /**
     * Cancels a task.
     * @param taskId task ID
     */
    void cancel(String taskId);
    /**
     * Returns execution statistics.
     */
    ExecutionStats getStats();
}
public interface Trigger {
    Date nextExecutionTime(TriggerContext context);
}
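The Trigger interface leaves the firing rule to its implementations. As a sketch, a fixed-rate trigger can compute each run from the previous *scheduled* time, so a slow run does not push later runs back. The tutorial does not define TriggerContext. The single lastScheduledExecutionTime() accessor below is an assumption, and so is the FixedRateTrigger name. The block redeclares Trigger so that it compiles on its own.

```java
import java.util.Date;

// Assumed minimal shape; the tutorial does not show TriggerContext.
interface TriggerContext {
    Date lastScheduledExecutionTime(); // null before the first run
}

// Same shape as the tutorial's Trigger interface.
interface Trigger {
    Date nextExecutionTime(TriggerContext context);
}

// Hypothetical fixed-rate trigger: fires every periodMillis, counted from the
// previous scheduled time rather than from when the previous run finished.
class FixedRateTrigger implements Trigger {
    private final long periodMillis;
    private final long initialDelayMillis;

    FixedRateTrigger(long periodMillis, long initialDelayMillis) {
        if (periodMillis <= 0) {
            throw new IllegalArgumentException("periodMillis must be positive");
        }
        this.periodMillis = periodMillis;
        this.initialDelayMillis = initialDelayMillis;
    }

    @Override
    public Date nextExecutionTime(TriggerContext context) {
        Date last = context.lastScheduledExecutionTime();
        if (last == null) {
            // First run: fire after the initial delay
            return new Date(System.currentTimeMillis() + initialDelayMillis);
        }
        return new Date(last.getTime() + periodMillis);
    }
}
```

A CRON trigger would implement the same method by asking a cron-expression parser for the next matching time after the previous one.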
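2. Distributed Task Executor
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class DistributedTaskExecutor {
    private final ExecutorService workerPool;
    private final DistributedLock lock;
    private final TaskRepository repository;
    public DistributedTaskExecutor(int poolSize,
                                   DistributedLock lock,
                                   TaskRepository repository) {
        // Fixed-size pool with a bounded queue: once 1000 tasks are waiting,
        // further submissions are rejected (default AbortPolicy)
        this.workerPool = new ThreadPoolExecutor(
                poolSize, poolSize,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1000),
                new TaskThreadFactory());
        this.lock = lock;
        this.repository = repository;
    }
    public CompletableFuture<TaskResult> execute(Task task) {
        return CompletableFuture.supplyAsync(() -> {
            // Only the node that acquires the lock runs the task; the others skip it
            if (!lock.tryLock(task.getId())) {
                return TaskResult.skipped();
            }
            try {
                return task.execute();
            } finally {
                // Save the execution record before releasing the lock, so another
                // node cannot start the same task until the record is written
                try {
                    repository.saveExecution(task.getId());
                } finally {
                    lock.unlock(task.getId());
                }
            }
        }, workerPool);
    }
}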
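DistributedTaskExecutor depends only on the DistributedLock interface. That means it can be unit-tested without Redis by plugging in a single-process stand-in. The sketch below assumes DistributedLock has exactly the two methods the executor calls (tryLock and unlock). InMemoryLock is a hypothetical name, and it provides mutual exclusion only within one JVM.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Assumed shape: the two methods DistributedTaskExecutor calls.
interface DistributedLock {
    boolean tryLock(String lockKey);
    void unlock(String lockKey);
}

// Hypothetical single-JVM stand-in for unit tests: mutual exclusion only within
// this process, with no TTL and no owner check.
class InMemoryLock implements DistributedLock {
    private final Set<String> held = ConcurrentHashMap.newKeySet();

    @Override
    public boolean tryLock(String lockKey) {
        return held.add(lockKey); // atomic: exactly one caller gets true
    }

    @Override
    public void unlock(String lockKey) {
        held.remove(lockKey);
    }
}
```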
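III. Distributed Lock Implementation
1. Redis Distributed Lock
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;
public class RedisDistributedLock implements DistributedLock {
    // Atomic compare-and-delete: remove the key only if it still holds our token
    private static final String UNLOCK_SCRIPT =
            "if redis.call('get', KEYS[1]) == ARGV[1] then "
            + "return redis.call('del', KEYS[1]) else return 0 end";
    private final JedisPool jedisPool;
    private final long lockTimeout; // lock TTL in milliseconds
    // lockKey -> random token this instance wrote when it acquired the lock
    private final ConcurrentHashMap<String, String> tokens = new ConcurrentHashMap<>();
    public RedisDistributedLock(JedisPool jedisPool, long lockTimeout) {
        this.jedisPool = jedisPool;
        this.lockTimeout = lockTimeout;
    }
    @Override
    public boolean tryLock(String lockKey) {
        String token = UUID.randomUUID().toString();
        try (Jedis jedis = jedisPool.getResource()) {
            // SET key token NX PX ttl: succeeds only if the key does not exist yet
            String result = jedis.set(lockKey, token,
                    SetParams.setParams().nx().px(lockTimeout));
            if (!"OK".equals(result)) {
                return false;
            }
        }
        tokens.put(lockKey, token);
        return true;
    }
    @Override
    public void unlock(String lockKey) {
        String token = tokens.remove(lockKey);
        if (token == null) {
            return; // not held by this instance
        }
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.eval(UNLOCK_SCRIPT, List.of(lockKey), List.of(token));
        }
    }
}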
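It helps to see why unlocking has to compare an owner value instead of issuing a plain DEL, and that can be walked through without a Redis server. The hypothetical model below mimics `SET key value NX PX ttl` plus a compare-and-delete, with time passed in explicitly. The scenario runs in three steps:
1. Node A's lock expires while its task is still running.
2. Node B acquires the key.
3. A's late unlock must not remove B's lock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory model of Redis "SET key value NX PX ttl" and a
// compare-and-delete unlock. Time is a parameter so expiry is deterministic.
class TtlLockModel {
    private record Entry(String owner, long expiresAt) {}

    private final Map<String, Entry> store = new HashMap<>();

    // Returns a fresh owner token on success, or null while a live lock exists.
    synchronized String tryLock(String key, long ttlMillis, long now) {
        if (isHeld(key, now)) {
            return null; // NX: the key is still present
        }
        String owner = UUID.randomUUID().toString();
        store.put(key, new Entry(owner, now + ttlMillis));
        return owner;
    }

    // Deletes the key only if it is live and still holds the caller's token.
    synchronized boolean unlock(String key, String owner, long now) {
        if (!isHeld(key, now) || !store.get(key).owner().equals(owner)) {
            return false;
        }
        store.remove(key);
        return true;
    }

    synchronized boolean isHeld(String key, long now) {
        Entry e = store.get(key);
        return e != null && e.expiresAt() > now;
    }
}
```

In Redis the compare-and-delete has to run as one atomic step (a Lua script sent with EVAL). A separate GET followed by DEL could interleave with another node's SET.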
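2. Lock Renewal
import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class LockRenewalTask implements Runnable {
    // Extend the TTL only while the key still holds this owner's value
    private static final String RENEW_SCRIPT =
            "if redis.call('get', KEYS[1]) == ARGV[1] then "
            + "return redis.call('pexpire', KEYS[1], ARGV[2]) else return 0 end";
    private final String lockKey;
    private final String lockValue;    // the value tryLock stored under lockKey
    private final JedisPool jedisPool; // Jedis is not thread-safe, so borrow a connection per renewal
    private final long lockTimeout;    // lock TTL in milliseconds
    private volatile boolean running = true;
    public LockRenewalTask(String lockKey, String lockValue, JedisPool jedisPool, long lockTimeout) {
        this.lockKey = lockKey;
        this.lockValue = lockValue;
        this.jedisPool = jedisPool;
        this.lockTimeout = lockTimeout;
    }
    @Override
    public void run() {
        while (running) {
            try {
                // Renewing at a third of the TTL leaves room for one missed renewal
                Thread.sleep(lockTimeout / 3);
                try (Jedis jedis = jedisPool.getResource()) {
                    // PEXPIRE, not EXPIRE: lockTimeout is in milliseconds
                    Object renewed = jedis.eval(RENEW_SCRIPT, List.of(lockKey),
                            List.of(lockValue, String.valueOf(lockTimeout)));
                    if (Long.valueOf(0L).equals(renewed)) {
                        running = false; // lock expired or taken over: stop renewing
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    public void stop() {
        running = false;
    }
}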
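LockRenewalTask dedicates a sleeping thread to each lock. An alternative, sketched here rather than taken from the tutorial, lets one ScheduledExecutorService renew many locks at a fixed rate of a third of the TTL. LockWatchdog and its BooleanSupplier callback are hypothetical. In practice the callback would run the owner-checked PEXPIRE against Redis and return false once the lock is no longer held.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical watchdog: one daemon scheduler thread renews any number of locks.
class LockWatchdog implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "lock-watchdog");
                t.setDaemon(true);
                return t;
            });

    // Calls renew every ttlMillis / 3 until it returns false (lock lost) or the
    // returned future is cancelled (e.g. after unlock).
    ScheduledFuture<?> watch(long ttlMillis, BooleanSupplier renew) {
        long period = Math.max(1, ttlMillis / 3);
        // Lets the periodic task cancel itself; completed right after scheduling.
        CompletableFuture<ScheduledFuture<?>> self = new CompletableFuture<>();
        ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
            if (!renew.getAsBoolean()) {
                self.join().cancel(false);
            }
        }, period, period, TimeUnit.MILLISECONDS);
        self.complete(future);
        return future;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```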
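IV. Task Sharding
1. Sharding Strategy Interface
import java.util.List;
import java.util.Map;
public interface ShardingStrategy {
    /**
     * Returns the shard indices (0..totalShards-1) the current node should process.
     */
    List<Integer> getAssignedShards(String taskId, int totalShards);
    /**
     * Splits the data into shardCount groups, keyed by shard index.
     */
    <T> Map<Integer, List<T>> shard(List<T> data, int shardCount);
}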
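The contract of the shard method is left open. One simple choice cuts the list into contiguous ranges whose sizes differ by at most one. It is shown here as a hypothetical standalone helper rather than a ShardingStrategy implementation. Keeping neighbors together suits batch jobs over ordered data, while a round-robin split (index % shardCount) spreads them out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: contiguous ranges whose sizes differ by at most one,
// e.g. 10 items over 3 shards -> sizes 4, 3, 3.
final class RangeSharding {
    private RangeSharding() {}

    static <T> Map<Integer, List<T>> shard(List<T> data, int shardCount) {
        if (shardCount <= 0) {
            throw new IllegalArgumentException("shardCount must be positive");
        }
        Map<Integer, List<T>> shards = new LinkedHashMap<>();
        int base = data.size() / shardCount;
        int extra = data.size() % shardCount; // the first `extra` shards get one more item
        int from = 0;
        for (int i = 0; i < shardCount; i++) {
            int to = from + base + (i < extra ? 1 : 0);
            shards.put(i, new ArrayList<>(data.subList(from, to)));
            from = to;
        }
        return shards;
    }
}
```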
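2. Consistent-Hash Sharding Implementation
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.*;
public class ConsistentHashSharding implements ShardingStrategy {
    private static final int VIRTUAL_NODES = 100; // ring positions per node, for a more even spread
    private final HashFunction hashFunction = Hashing.murmur3_32();
    private final TreeMap<Integer, String> ring = new TreeMap<>();
    private final String nodeId;
    // liveNodes: IDs of all live nodes (e.g. from the ZooKeeper registry); must be non-empty
    public ConsistentHashSharding(String nodeId, Collection<String> liveNodes) {
        this.nodeId = nodeId;
        for (String node : liveNodes) {
            for (int v = 0; v < VIRTUAL_NODES; v++) {
                ring.put(hash(node + "#" + v), node);
            }
        }
    }
    @Override
    public List<Integer> getAssignedShards(String taskId, int totalShards) {
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < totalShards; i++) {
            if (nodeId.equals(ownerOf(taskId + ":" + i))) {
                assigned.add(i);
            }
        }
        return assigned;
    }
    @Override
    public <T> Map<Integer, List<T>> shard(List<T> data, int shardCount) {
        Map<Integer, List<T>> shards = new HashMap<>();
        for (int i = 0; i < data.size(); i++) {
            shards.computeIfAbsent(i % shardCount, k -> new ArrayList<>()).add(data.get(i));
        }
        return shards;
    }
    // Owner = first virtual node clockwise from the key's hash, wrapping around the ring
    private String ownerOf(String key) {
        Map.Entry<Integer, String> e = ring.ceilingEntry(hash(key));
        return (e != null ? e : ring.firstEntry()).getValue();
    }
    private int hash(String s) {
        return hashFunction.hashString(s, StandardCharsets.UTF_8).asInt();
    }
}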
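A hash ring is worth using because adding a node only moves keys onto that new node; every other key keeps its owner. The JDK-only sketch below lets that property be checked directly. HashRing is a hypothetical name. It uses the first four bytes of MD5 instead of Guava's murmur3 so that it has no dependencies, and it places a configurable number of virtual nodes per physical node.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical consistent-hash ring with virtual nodes (JDK only).
class HashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    HashRing(int virtualNodes) {
        this.virtualNodes = virtualNodes;
    }

    void addNode(String node) {
        for (int v = 0; v < virtualNodes; v++) {
            ring.put(hash(node + "#" + v), node);
        }
    }

    void removeNode(String node) {
        ring.values().removeIf(node::equals);
    }

    // Owner = first virtual node clockwise from the key's hash, wrapping past the end.
    String ownerOf(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("ring has no nodes");
        }
        Map.Entry<Long, String> e = ring.ceilingEntry(hash(key));
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    // First four bytes of MD5, read as an unsigned 32-bit value.
    private static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            return ((d[0] & 0xFFL) << 24) | ((d[1] & 0xFFL) << 16)
                    | ((d[2] & 0xFFL) << 8) | (d[3] & 0xFFL);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // not expected: standard JDKs ship MD5
        }
    }
}
```

With three nodes of 100 virtual nodes each, adding a fourth should move roughly a quarter of the keys, all of them to the new node. A plain `hash(key) % nodeCount` scheme going from 3 to 4 nodes would reassign about three quarters.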
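V. Failover
1. Heartbeat Service
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.I0Itec.zkclient.ZkClient; // com.101tec:zkclient (assumed client library)
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class HeartbeatService {
    private final ZkClient zkClient;
    private final int port;
    private final String nodePath;
    private volatile boolean active = true;
    private Thread heartbeatThread;
    // Constructor injection: a @Value field would still be 0 while the constructor runs
    public HeartbeatService(ZkClient zkClient, @Value("${server.port}") int port) {
        this.zkClient = zkClient;
        this.port = port;
        this.nodePath = "/scheduler/nodes/" + getNodeId();
    }
    @PostConstruct
    public void start() {
        // Register this node; ZooKeeper deletes the ephemeral node when the session ends
        zkClient.createEphemeral(nodePath, getNodeInfo());
        // Heartbeat thread: refresh the node data every 5 seconds
        heartbeatThread = new Thread(() -> {
            while (active) {
                try {
                    zkClient.writeData(nodePath, getNodeInfo());
                } catch (RuntimeException e) {
                    recover();
                }
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // interrupted by stop()
                }
            }
        }, "scheduler-heartbeat");
        heartbeatThread.setDaemon(true);
        heartbeatThread.start();
    }
    @PreDestroy
    public void stop() {
        active = false;
        if (heartbeatThread != null) {
            heartbeatThread.interrupt();
        }
    }
    private void recover() {
        // Reconnect and re-register the ephemeral node (not shown)
    }
    // getNodeId() and getNodeInfo() (e.g. host:port and load data) are not shown in this tutorial
}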
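With ZooKeeper, liveness comes from the ephemeral node, which disappears when the session times out. Some registries have no sessions, for example when heartbeats are stored as plain timestamps. There, a timeout-based detector does the same job. The sketch below is hypothetical. A timeout such as 15_000 ms (about three 5-second heartbeat intervals) is an assumed value, not one from the tutorial.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical timeout-based failure detector. Timestamps are passed in so the
// logic can be exercised without waiting.
class HeartbeatMonitor {
    private final ConcurrentHashMap<String, Long> lastSeen = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    HeartbeatMonitor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void onHeartbeat(String nodeId, long now) {
        lastSeen.merge(nodeId, now, Math::max); // keep the newest timestamp
    }

    // Returns (sorted) and forgets the nodes whose last heartbeat is older than the timeout.
    List<String> removeDeadNodes(long now) {
        List<String> dead = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastSeen.entrySet()) {
            // remove(key, value) fails if a newer heartbeat arrived in the meantime
            if (now - e.getValue() > timeoutMillis && lastSeen.remove(e.getKey(), e.getValue())) {
                dead.add(e.getKey());
            }
        }
        Collections.sort(dead);
        return dead;
    }
}
```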
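2. Task Reassignment
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
// @Scheduled only runs on Spring beans, with @EnableScheduling on a configuration class
@Component
public class TaskReassigner {
    private final TaskRepository repository;
    private final TaskScheduler scheduler; // the tutorial's TaskScheduler, not Spring's
    public TaskReassigner(TaskRepository repository, TaskScheduler scheduler) {
        this.repository = repository;
        this.scheduler = scheduler;
    }
    @Scheduled(fixedDelay = 10000) // 10 s after the previous check finishes
    public void checkAndReassign() {
        // Tasks whose owning node has gone away. Run this under the distributed lock
        // so that only one node reassigns a given task.
        for (String taskId : repository.findOrphanedTasks()) {
            Task task = repository.loadTask(taskId);
            // schedule() expects a Runnable; the TaskResult returned by execute() is discarded
            scheduler.schedule(taskId, task::execute, task.getTrigger());
        }
    }
}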
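If several nodes run checkAndReassign at the same time, each can see the same orphaned task and schedule it twice. One way to prevent that is to claim each task by atomically replacing its old owner with a new one, so only one claimant succeeds. The sketch below is a hypothetical in-memory version. A real deployment would need the same compare-and-set in whatever store backs TaskRepository.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory ownership table (taskId -> nodeId).
class TaskOwnership {
    private final ConcurrentHashMap<String, String> owners = new ConcurrentHashMap<>();

    void assign(String taskId, String nodeId) {
        owners.put(taskId, nodeId);
    }

    String ownerOf(String taskId) {
        return owners.get(taskId);
    }

    // Moves tasks owned by nodes outside liveNodes to live nodes, round-robin.
    // replace(key, expected, newValue) is atomic, so if two reassigners race for
    // the same task, only one of them gets true; the other leaves it alone.
    List<String> claimOrphans(Set<String> liveNodes) {
        List<String> targets = new ArrayList<>(new TreeSet<>(liveNodes));
        List<String> claimed = new ArrayList<>();
        if (targets.isEmpty()) {
            return claimed; // nowhere to move the tasks
        }
        int next = 0;
        for (Map.Entry<String, String> e : owners.entrySet()) {
            String taskId = e.getKey();
            String oldOwner = e.getValue();
            if (liveNodes.contains(oldOwner)) {
                continue;
            }
            String newOwner = targets.get(next++ % targets.size());
            if (owners.replace(taskId, oldOwner, newOwner)) {
                claimed.add(taskId);
            }
        }
        return claimed;
    }
}
```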
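VI. Performance Tuning
1. Thread Pool Configuration
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@Configuration
public class ThreadPoolConfig {
    @Bean
    public ThreadPoolTaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        // 2 x cores: a rough starting point for tasks that mix CPU work and I/O
        scheduler.setPoolSize(Runtime.getRuntime().availableProcessors() * 2);
        scheduler.setThreadNamePrefix("task-scheduler-");
        // On shutdown, wait up to 60 s for running tasks to finish
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        scheduler.setAwaitTerminationSeconds(60);
        // A rejected task runs on the submitting thread instead of being dropped
        scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return scheduler;
    }
    @Bean
    public ExecutorService ioBoundExecutor() {
        // Elastic pool for blocking I/O: 0 core threads, up to 100, idle threads exit
        // after 60 s. SynchronousQueue hands tasks off directly, so a task submitted
        // while all 100 threads are busy is rejected (default AbortPolicy).
        return new ThreadPoolExecutor(
                0, 100,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new NamedThreadFactory("io-worker-")); // custom helper, not a JDK class
    }
}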
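The pool sizes above (2 x cores for the scheduler, up to 100 threads for I/O) are fixed guesses. The sizing heuristic from *Java Concurrency in Practice* makes the reasoning explicit: threads ≈ cores × target CPU utilization × (1 + wait time / compute time). The helper below is a hypothetical sketch. The wait and compute times have to be measured for the actual workload.

```java
// Hypothetical helper for the Java Concurrency in Practice sizing heuristic:
// threads = cores * utilization * (1 + waitTime / computeTime)
final class PoolSizing {
    private PoolSizing() {}

    // utilization: target CPU utilization in (0, 1];
    // waitMillis / computeMillis: measured blocking time vs. CPU time per task.
    static int threads(int cores, double utilization, double waitMillis, double computeMillis) {
        if (cores <= 0 || utilization <= 0 || utilization > 1 || waitMillis < 0 || computeMillis <= 0) {
            throw new IllegalArgumentException("invalid sizing input");
        }
        long n = Math.round(cores * utilization * (1 + waitMillis / computeMillis));
        return (int) Math.max(1, n);
    }
}
```

Take 8 cores at full utilization. A CPU-bound task (no waiting) gives 8 threads. A task that waits 90 ms for every 10 ms of CPU gives 8 × (1 + 9) = 80, and halving the target utilization gives 40.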
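2. Asynchronous Task Pipeline
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TaskPipeline {
    private final ExecutorService[] stages; // one thread pool per stage
    public TaskPipeline(int stageCount, int threadsPerStage) {
        this.stages = new ExecutorService[stageCount];
        for (int i = 0; i < stageCount; i++) {
            stages[i] = Executors.newFixedThreadPool(threadsPerStage,
                    new NamedThreadFactory("stage-" + i + "-"));
        }
    }
    public CompletableFuture<TaskResult> process(Task task) {
        // Stage 0 prepares the context; stages 1..n-1 each run on their own pool
        CompletableFuture<TaskContext> future =
                CompletableFuture.supplyAsync(task::prepare, stages[0]);
        for (int i = 1; i < stages.length; i++) {
            final int stage = i; // a lambda may only capture effectively final locals
            future = future.thenApplyAsync(ctx -> task.executeStage(ctx, stage), stages[stage]);
        }
        return future.thenApply(task::finalize);
    }
    public void shutdown() {
        for (ExecutorService stage : stages) {
            stage.shutdown();
        }
    }
}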
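The staged hand-off in TaskPipeline can be tried out without the Task types. In this generic, self-contained sketch (StagePipeline is a hypothetical name), each stage is a UnaryOperator with its own fixed thread pool, and thenApplyAsync moves the value on to the next stage's pool. Each stage function is passed to thenApplyAsync directly, so no lambda needs to capture the loop variable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.UnaryOperator;

// Hypothetical generic pipeline: stage i runs on pool i.
class StagePipeline<T> implements AutoCloseable {
    private final List<ExecutorService> pools = new ArrayList<>();
    private final List<UnaryOperator<T>> stages;

    StagePipeline(List<UnaryOperator<T>> stages, int threadsPerStage) {
        this.stages = List.copyOf(stages);
        for (int i = 0; i < this.stages.size(); i++) {
            pools.add(Executors.newFixedThreadPool(threadsPerStage));
        }
    }

    CompletableFuture<T> process(T input) {
        CompletableFuture<T> future = CompletableFuture.completedFuture(input);
        for (int i = 0; i < stages.size(); i++) {
            future = future.thenApplyAsync(stages.get(i), pools.get(i));
        }
        return future;
    }

    @Override
    public void close() {
        pools.forEach(ExecutorService::shutdown); // let the non-daemon pool threads exit
    }
}
```

For example, the stages trim, toUpperCase and an appended "!" turn "  hello " into "HELLO!".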
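VII. Monitoring and Alerting
1. Metrics Collection with Micrometer
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Service;
@Service
public class TaskMetrics {
    private final Counter successCounter;
    private final Counter failureCounter;
    private final Timer executionTimer;
    // taskQueue: the scheduler's pending-task queue, assumed to be available as a bean
    public TaskMetrics(MeterRegistry registry, BlockingQueue<Task> taskQueue) {
        this.successCounter = registry.counter("task.execution", "result", "success");
        this.failureCounter = registry.counter("task.execution", "result", "failure");
        this.executionTimer = registry.timer("task.duration");
        // Register the gauge once, against the live queue: Micrometer reads size() on every
        // scrape. Calling registry.gauge() with an int on each execution only registers the
        // first boxed value, so later sizes are ignored (and it may read NaN once collected).
        // Per-priority queue sizes would need one queue, and one tagged gauge, per priority.
        registry.gauge("task.queue.size", taskQueue, BlockingQueue::size);
    }
    public void recordExecution(Task task, long durationMillis, boolean success) {
        executionTimer.record(durationMillis, TimeUnit.MILLISECONDS);
        (success ? successCounter : failureCounter).increment();
    }
}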
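TaskScheduler.getStats() needs in-process counts as well as exported metrics. A hypothetical recorder built on LongAdder might look like this. LongAdder gives higher throughput than AtomicLong when many worker threads update the same counter. The Snapshot fields are assumptions, since the tutorial does not define ExecutionStats.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical in-process execution statistics.
class ExecutionStatsRecorder {
    private final LongAdder successes = new LongAdder();
    private final LongAdder failures = new LongAdder();
    private final LongAdder totalNanos = new LongAdder();

    void record(long duration, TimeUnit unit, boolean success) {
        totalNanos.add(unit.toNanos(duration));
        (success ? successes : failures).increment();
    }

    // The sums are read one after another, so a snapshot taken during concurrent
    // updates is approximate rather than an atomic point-in-time view.
    Snapshot snapshot() {
        long ok = successes.sum();
        long failed = failures.sum();
        long total = ok + failed;
        double averageMillis = total == 0 ? 0.0 : totalNanos.sum() / 1e6 / total;
        return new Snapshot(ok, failed, averageMillis);
    }

    record Snapshot(long successes, long failures, double averageMillis) {}
}
```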
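2. Prometheus Configuration Example
# application.yml; the /actuator/prometheus endpoint also requires the micrometer-registry-prometheus dependency.
# Spring Boot 3 moved management.metrics.export.prometheus.* to management.prometheus.metrics.export.*
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus
  prometheus:
    metrics:
      export:
        enabled: true
  metrics:
    tags:
      application: task-scheduler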
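VIII. Summary and Extensions
In this tutorial we built:
- the core of a distributed task scheduler on a thread-pool executor
- distributed locking and failover
- sharded processing of large data-processing tasks
- metrics collection and Prometheus export as the basis for monitoring and alerting
Possible extensions:
- a Kubernetes Operator
- machine-learning-based scheduling optimization
- serverless task support
- cross-data-center scheduling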