Java高并发编程实战:构建亿级流量秒杀系统 | 分布式架构设计

2025-08-15 0 306

发布日期:2024年2月15日

一、系统架构设计

本教程将实现一个完整的秒杀系统,主要解决以下核心问题:

  • 瞬时高并发:QPS 10万+的流量冲击
  • 超卖问题:库存精确扣减
  • 防刷机制:黄牛识别与拦截
  • 系统保护:熔断降级策略
  • 数据一致性分布式事务处理

技术栈:Spring Boot 3 + Redis + RocketMQ + Zookeeper + Sentinel

二、基础环境搭建

1. 项目初始化

// 使用Spring Initializr创建项目
mvn archetype:generate 
  -DgroupId=com.seckill 
  -DartifactId=seckill-system 
  -DarchetypeArtifactId=maven-archetype-quickstart 
  -DinteractiveMode=false

// 添加关键依赖
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.3</version>
</dependency>

2. 分层架构设计

src/
├── main/
│   ├── java/
│   │   ├── com.seckill/
│   │   │   ├── api/        # 接口层
│   │   │   ├── application/# 应用服务
│   │   │   ├── domain/     # 领域模型
│   │   │   ├── infrastructure/
│   │   │   │   ├── cache/  # 缓存实现
│   │   │   │   ├── mq/     # 消息队列
│   │   │   │   └── repo/   # 仓储实现
│   │   │   └── startup/    # 启动配置
│   ├── resources/
│   │   ├── config/         # 多环境配置
│   │   └── scripts/        # 库存预热脚本
├── test/                   # 压测代码

三、核心业务实现

1. 分布式锁实现

public class RedisDistributedLock implements AutoCloseable {
    private final RedissonClient redisson;
    private final String lockKey;
    private RLock lock;
    
    public RedisDistributedLock(RedissonClient redisson, String lockKey) {
        this.redisson = redisson;
        this.lockKey = "lock:" + lockKey;
    }
    
    public boolean tryLock(long waitTime, long leaseTime) {
        try {
            lock = redisson.getLock(lockKey);
            return lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
    
    @Override
    public void close() {
        if (lock != null && lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }
}

// 使用示例
try (RedisDistributedLock lock = new RedisDistributedLock(redisson, "stock_"+skuId)) {
    if (lock.tryLock(1, 30)) {
        // 扣减库存操作
        reduceStock(skuId);
    }
}

2. 库存扣减逻辑

@Transactional
public boolean reduceStock(Long skuId, Integer num) {
    // 1. 校验库存是否充足
    SeckillSku sku = skuRepository.findById(skuId)
        .orElseThrow(() -> new BusinessException("商品不存在"));
    
    if (sku.getAvailableStock() < num) {
        return false;
    }
    
    // 2. 扣减数据库库存(乐观锁)
    int affected = skuRepository.reduceStock(skuId, num, sku.getVersion());
    if (affected == 0) {
        throw new ConcurrentUpdateException("库存并发修改冲突");
    }
    
    // 3. 同步Redis库存
    redisTemplate.opsForValue().decrement(
        "stock:" + skuId, num);
    
    // 4. 发送扣减消息
    sendStockReduceMessage(skuId, num);
    return true;
}

四、高并发优化方案

1. 多级缓存设计

public SeckillSku getSkuWithCache(Long skuId) {
    // 1. 本地缓存
    SeckillSku sku = caffeineCache.getIfPresent(skuId);
    if (sku != null) {
        return sku;
    }
    
    // 2. Redis缓存
    String cacheKey = "sku:" + skuId;
    String json = redisTemplate.opsForValue().get(cacheKey);
    if (json != null) {
        sku = JsonUtils.parse(json, SeckillSku.class);
        caffeineCache.put(skuId, sku);
        return sku;
    }
    
    // 3. 数据库查询(防击穿)
    synchronized (this) {
        // 双重检查
        json = redisTemplate.opsForValue().get(cacheKey);
        if (json != null) {
            return JsonUtils.parse(json, SeckillSku.class);
        }
        
        sku = skuRepository.findById(skuId).orElse(null);
        if (sku != null) {
            redisTemplate.opsForValue().set(
                cacheKey, 
                JsonUtils.stringify(sku),
                5, TimeUnit.MINUTES);
        }
        return sku;
    }
}

2. 令牌桶限流实现

public class TokenBucketLimiter {
    private final AtomicLong tokens;
    private final long capacity;
    private final long refillRate; // tokens/second
    private volatile long lastRefillTime;
    
    public TokenBucketLimiter(long capacity, long refillRate) {
        this.capacity = capacity;
        this.refillRate = refillRate;
        this.tokens = new AtomicLong(capacity);
        this.lastRefillTime = System.currentTimeMillis();
    }
    
    public boolean tryAcquire(int permits) {
        refillTokens();
        while (true) {
            long current = tokens.get();
            if (current  1000) {
            long newTokens = elapsedTime * refillRate / 1000;
            synchronized (this) {
                elapsedTime = now - lastRefillTime;
                if (elapsedTime > 1000) {
                    long current = tokens.get();
                    long adjustedTokens = Math.min(
                        current + newTokens, capacity);
                    tokens.set(adjustedTokens);
                    lastRefillTime = now;
                }
            }
        }
    }
}

五、分布式事务方案

1. 可靠消息最终一致性

@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(
        Message msg, Object arg) {
        try {
            OrderCreateDTO dto = (OrderCreateDTO) arg;
            // 1. 创建订单(数据库事务)
            orderService.createOrder(dto);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String orderNo = msg.getUserProperty("orderNo");
        Order order = orderService.getByOrderNo(orderNo);
        return order != null ? 
            RocketMQLocalTransactionState.COMMIT :
            RocketMQLocalTransactionState.ROLLBACK;
    }
}

// 发送半消息
public void createOrderAsync(OrderCreateDTO dto) {
    rocketMQTemplate.sendMessageInTransaction(
        "order-tx-group",
        "order-create-topic",
        MessageBuilder.withPayload(dto)
            .setHeader("orderNo", dto.getOrderNo())
            .build(),
        dto
    );
}

2. TCC补偿事务

public class StockTccService {
    @Transactional
    public boolean prepareReduceStock(Long skuId, Integer num) {
        // 预占库存(冻结字段)
        int affected = skuRepository.freezeStock(skuId, num);
        return affected > 0;
    }
    
    @Transactional
    public boolean commitReduceStock(Long skuId, Integer num) {
        // 确认扣减(真实减少)
        return skuRepository.confirmStock(skuId, num) > 0;
    }
    
    @Transactional
    public boolean cancelReduceStock(Long skuId, Integer num) {
        // 取消预占(释放冻结)
        return skuRepository.cancelFreeze(skuId, num) > 0;
    }
}

六、系统保护机制

1. 熔断降级配置

@SentinelResource(
    value = "createOrder",
    blockHandler = "createOrderBlockHandler",
    fallback = "createOrderFallback"
)
public OrderResult createOrder(OrderCreateDTO dto) {
    // 正常业务逻辑
    return orderService.create(dto);
}

// 流控处理
public OrderResult createOrderBlockHandler(
    OrderCreateDTO dto, BlockException ex) {
    log.warn("触发流控:{}", ex.getMessage());
    return OrderResult.fail("系统繁忙,请稍后再试");
}

// 降级处理
public OrderResult createOrderFallback(
    OrderCreateDTO dto, Throwable ex) {
    log.error("订单创建失败:", ex);
    return OrderResult.fail("服务暂时不可用");
}

2. 系统自适应保护

@PostConstruct
public void initFlowRules() {
    List rules = new ArrayList();
    
    // 订单创建接口QPS限流
    FlowRule orderRule = new FlowRule();
    orderRule.setResource("createOrder");
    orderRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
    orderRule.setCount(5000); // 阈值
    orderRule.setControlBehavior(
        RuleConstant.CONTROL_BEHAVIOR_WARM_UP); // 预热
    orderRule.setWarmUpPeriodSec(10);
    rules.add(orderRule);
    
    // 库存查询接口线程数限流
    FlowRule stockRule = new FlowRule();
    stockRule.setResource("queryStock");
    stockRule.setGrade(RuleConstant.FLOW_GRADE_THREAD);
    stockRule.setCount(100); 
    rules.add(stockRule);
    
    FlowRuleManager.loadRules(rules);
}

七、压测与优化

1. JMeter压测配置

<?xml version="1.0" encoding="UTF-8"?>
<jmeterTestPlan version="1.2">
  <hashTree>
    <TestPlan>
      <stringProp name="TestPlan.comments">秒杀压测</stringProp>
    </TestPlan>
    <ThreadGroup>
      <intProp name="ThreadGroup.num_threads">5000</intProp>
      <intProp name="ThreadGroup.ramp_time">60</intProp>
      <LoopController>
        <intProp name="LoopController.loops">-1</intProp>
      </LoopController>
    </ThreadGroup>
    <HTTPSampler>
      <stringProp name="HTTPSampler.domain">api.seckill.com</stringProp>
      <stringProp name="HTTPSampler.path">/order/create</stringProp>
      <elementProp name="Arguments">
        <collectionProp name="Arguments.arguments">
          <elementProp>
            <stringProp name="Argument.name">skuId</stringProp>
            <stringProp name="Argument.value">1001</stringProp>
          </elementProp>
        </collectionProp>
      </elementProp>
    </HTTPSampler>
  </hashTree>
</jmeterTestPlan>

2. Arthas性能诊断

# 启动Arthas
java -jar arthas-boot.jar

# 监控方法调用耗时
watch com.seckill.service.OrderService createOrder '{params,returnObj}' -x 3 -b

# 统计方法调用热路径
profiler start --event cpu
profiler stop --format html

# 追踪慢SQL
trace *.JdbcTemplate query

# 生成火焰图
profiler start --format flamegraph
profiler stop

八、总结与扩展

本系统实现了:

  1. 分布式环境下的库存精确扣减
  2. 多级缓存架构支撑高并发查询
  3. 柔性事务保证最终一致性
  4. 全链路系统保护机制

扩展方向:

  • 全链路灰度发布
  • 基于机器学习的动态限流
  • 异构数据源实时同步
  • Service Mesh架构升级
Java高并发编程实战:构建亿级流量秒杀系统 | 分布式架构设计
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 css Java高并发编程实战:构建亿级流量秒杀系统 | 分布式架构设计 https://www.taomawang.com/web/css/845.html

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务