Java并发编程实战:构建高性能异步任务调度系统 | Java高级教程

2025-08-15 0 594

发布日期:2024年2月15日

一、系统架构设计

本教程将基于Java并发包实现一个分布式任务调度系统,主要功能模块包括:

  • 任务调度引擎:支持CRON表达式和固定速率触发
  • 分布式锁:基于Redis和Zookeeper实现
  • 故障转移:心跳检测与任务重新分配
  • 动态分片:大数据任务并行处理
  • 监控看板:任务执行可视化追踪

技术栈:Java17 + Spring Boot3 + Redis + Zookeeper + Prometheus

二、核心类设计

1. 任务调度器接口

public interface TaskScheduler {
    /**
     * 添加定时任务
     * @param taskId 任务唯一ID
     * @param task 任务实例
     * @param trigger 触发器配置
     */
    void schedule(String taskId, Runnable task, Trigger trigger);
    
    /**
     * 取消任务
     * @param taskId 任务ID
     */
    void cancel(String taskId);
    
    /**
     * 获取执行统计
     */
    ExecutionStats getStats();
}

public interface Trigger {
    Date nextExecutionTime(TriggerContext context);
}

2. 分布式任务执行器

public class DistributedTaskExecutor {
    private final ExecutorService workerPool;
    private final DistributedLock lock;
    private final TaskRepository repository;
    
    public DistributedTaskExecutor(int poolSize, 
                                 DistributedLock lock,
                                 TaskRepository repository) {
        this.workerPool = new ThreadPoolExecutor(
            poolSize, poolSize,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue(1000),
            new TaskThreadFactory()
        );
        this.lock = lock;
        this.repository = repository;
    }
    
    public CompletableFuture<TaskResult> execute(Task task) {
        return CompletableFuture.supplyAsync(() -> {
            if (lock.tryLock(task.getId())) {
                try {
                    return task.execute();
                } finally {
                    lock.unlock(task.getId());
                    repository.saveExecution(task.getId());
                }
            }
            return TaskResult.skipped();
        }, workerPool);
    }
}

三、分布式锁实现

1. Redis分布式锁

public class RedisDistributedLock implements DistributedLock {
    private final JedisPool jedisPool;
    private final long lockTimeout;
    
    public RedisDistributedLock(JedisPool jedisPool, long lockTimeout) {
        this.jedisPool = jedisPool;
        this.lockTimeout = lockTimeout;
    }
    
    @Override
    public boolean tryLock(String lockKey) {
        try (Jedis jedis = jedisPool.getResource()) {
            String result = jedis.set(lockKey, "locked", 
                SetParams.setParams()
                .nx()
                .px(lockTimeout));
            return "OK".equals(result);
        }
    }
    
    @Override
    public void unlock(String lockKey) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.del(lockKey);
        }
    }
}

2. 锁续约机制

public class LockRenewalTask implements Runnable {
    private final String lockKey;
    private final Jedis jedis;
    private volatile boolean running = true;
    
    public LockRenewalTask(String lockKey, Jedis jedis) {
        this.lockKey = lockKey;
        this.jedis = jedis;
    }
    
    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(lockTimeout / 3);
                jedis.expire(lockKey, lockTimeout);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    
    public void stop() {
        running = false;
    }
}

四、任务分片处理

1. 分片策略接口

public interface ShardingStrategy {
    /**
     * 获取当前节点应处理的分片
     */
    List<Integer> getAssignedShards(String taskId, int totalShards);
    
    /**
     * 数据分片方法
     */
    <T> Map<Integer, List<T>> shard(List<T> data, int shardCount);
}

2. 一致性哈希分片实现

public class ConsistentHashSharding implements ShardingStrategy {
    private final HashFunction hashFunction;
    private final String nodeId;
    
    public ConsistentHashSharding(String nodeId) {
        this.nodeId = nodeId;
        this.hashFunction = Hashing.murmur3_32();
    }
    
    @Override
    public List<Integer> getAssignedShards(String taskId, int totalShards) {
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < totalShards; i++) {
            String key = taskId + ":" + i;
            if (isAssignedToThisNode(key)) {
                assigned.add(i);
            }
        }
        return assigned;
    }
    
    private boolean isAssignedToThisNode(String key) {
        int hash = hashFunction.hashString(key, StandardCharsets.UTF_8).asInt();
        return Math.abs(hash % 100) < (100 / 3); // 假设3个节点
    }
}

五、故障转移机制

1. 心跳检测服务

@Service
public class HeartbeatService {
    private final ZkClient zkClient;
    private final String nodePath;
    private volatile boolean active = true;
    
    @Value("${server.port}")
    private int port;
    
    public HeartbeatService(ZkClient zkClient) {
        this.zkClient = zkClient;
        this.nodePath = "/scheduler/nodes/" + getNodeId();
    }
    
    @PostConstruct
    public void start() {
        // 注册节点
        zkClient.createEphemeral(nodePath, getNodeInfo());
        
        // 启动心跳线程
        new Thread(() -> {
            while (active) {
                try {
                    zkClient.writeData(nodePath, getNodeInfo());
                    Thread.sleep(5000);
                } catch (Exception e) {
                    recover();
                }
            }
        }).start();
    }
    
    private void recover() {
        // 重新连接和注册逻辑
    }
}

2. 任务重新分配

public class TaskReassigner {
    private final TaskRepository repository;
    private final TaskScheduler scheduler;
    
    @Scheduled(fixedDelay = 10000)
    public void checkAndReassign() {
        List<String> orphanedTasks = repository.findOrphanedTasks();
        for (String taskId : orphanedTasks) {
            Task task = repository.loadTask(taskId);
            scheduler.schedule(taskId, task, task.getTrigger());
        }
    }
}

六、性能优化技巧

1. 线程池优化配置

@Configuration
public class ThreadPoolConfig {
    
    @Bean
    public ThreadPoolTaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(Runtime.getRuntime().availableProcessors() * 2);
        scheduler.setThreadNamePrefix("task-scheduler-");
        scheduler.setAwaitTerminationSeconds(60);
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        scheduler.setRejectedExecutionHandler(
            new ThreadPoolExecutor.CallerRunsPolicy());
        return scheduler;
    }
    
    @Bean
    public ExecutorService ioBoundExecutor() {
        return new ThreadPoolExecutor(
            0, 100,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            new NamedThreadFactory("io-worker-")
        );
    }
}

2. 异步任务流水线

public class TaskPipeline {
    private final Executor[] stages;
    
    public TaskPipeline(int stageCount, int threadsPerStage) {
        this.stages = new Executor[stageCount];
        for (int i = 0; i < stageCount; i++) {
            stages[i] = Executors.newFixedThreadPool(threadsPerStage,
                new NamedThreadFactory("stage-" + i + "-"));
        }
    }
    
    public CompletableFuture<TaskResult> process(Task task) {
        CompletableFuture<TaskContext> future = 
            CompletableFuture.supplyAsync(task::prepare, stages[0]);
        
        for (int i = 1; i < stages.length; i++) {
            future = future.thenApplyAsync(
                ctx -> task.executeStage(ctx, i), stages[i]);
        }
        
        return future.thenApply(task::finalize);
    }
}

七、监控与报警

1. Micrometer指标收集

@Service
public class TaskMetrics {
    private final MeterRegistry registry;
    private final Counter successCounter;
    private final Timer executionTimer;
    
    public TaskMetrics(MeterRegistry registry) {
        this.registry = registry;
        this.successCounter = registry.counter("task.execution", "result", "success");
        this.executionTimer = registry.timer("task.duration");
    }
    
    public void recordExecution(Task task, long duration, boolean success) {
        executionTimer.record(duration, TimeUnit.MILLISECONDS);
        if (success) {
            successCounter.increment();
        }
        
        registry.gauge("task.queue.size", 
            taskQueue.size(), 
            Tags.of("priority", task.getPriority()));
    }
}

2. Prometheus配置示例

management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus
  metrics:
    export:
      prometheus:
        enabled: true
    tags:
      application: task-scheduler

八、总结与扩展

通过本教程,我们实现了:

  1. 高性能分布式任务调度核心
  2. 可靠的分布式锁与故障转移
  3. 大数据任务分片处理
  4. 完善的监控报警体系

扩展方向:

  • Kubernetes Operator扩展
  • 机器学习任务调度优化
  • Serverless任务支持
  • 跨数据中心调度
Java并发编程实战:构建高性能异步任务调度系统 | Java高级教程
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 java Java并发编程实战:构建高性能异步任务调度系统 | Java高级教程 https://www.taomawang.com/server/java/844.html

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务