发布日期:2024年2月15日
一、系统架构设计
本教程将实现一个完整的秒杀系统,主要解决以下核心问题:
- 瞬时高并发:QPS 10万+的流量冲击
- 超卖问题:库存精确扣减
- 防刷机制:黄牛识别与拦截
- 系统保护:熔断降级策略
- 数据一致性:分布式事务处理
技术栈:Spring Boot 3 + Redis (Redisson) + Caffeine + RocketMQ + Zookeeper + Sentinel
二、基础环境搭建
1. 项目初始化
# Generate the project skeleton with the Maven quickstart archetype
# (the Spring Boot dependencies are added to the generated pom.xml afterwards).
# Each line ends with a backslash so the shell treats this as ONE command;
# without the continuations only "mvn archetype:generate" would run.
mvn archetype:generate \
  -DgroupId=com.seckill \
  -DartifactId=seckill-system \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false
// 添加关键依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
2. 分层架构设计
src/
├── main/
│ ├── java/
│ │ ├── com.seckill/
│ │ │ ├── api/ # 接口层
│ │ │ ├── application/# 应用服务
│ │ │ ├── domain/ # 领域模型
│ │ │ ├── infrastructure/
│ │ │ │ ├── cache/ # 缓存实现
│ │ │ │ ├── mq/ # 消息队列
│ │ │ │ └── repo/ # 仓储实现
│ │ │ └── startup/ # 启动配置
│ ├── resources/
│ │ ├── config/ # 多环境配置
│ │ └── scripts/ # 库存预热脚本
├── test/ # 压测代码
三、核心业务实现
1. 分布式锁实现
/**
 * Try-with-resources friendly wrapper around a Redisson {@code RLock}.
 *
 * <p>The Redis key is the caller-supplied name prefixed with {@code "lock:"}.
 * The underlying lock object is obtained lazily on {@link #tryLock(long, long)};
 * {@link #close()} releases it only when the current thread actually holds it,
 * so closing an instance that never acquired the lock is a no-op.
 */
public class RedisDistributedLock implements AutoCloseable {
    private final RedissonClient redisson;
    private final String lockKey;
    private RLock lock;

    /**
     * @param redisson client used to obtain the lock
     * @param lockKey  logical lock name; stored with a {@code "lock:"} prefix
     */
    public RedisDistributedLock(RedissonClient redisson, String lockKey) {
        this.redisson = redisson;
        this.lockKey = "lock:" + lockKey;
    }

    /**
     * Attempts to acquire the lock.
     *
     * @param waitTime  maximum time to wait for the lock, in seconds
     * @param leaseTime time after which the lock is released automatically, in seconds
     * @return {@code true} if acquired; {@code false} on timeout or if the
     *         waiting thread was interrupted (the interrupt flag is restored)
     */
    public boolean tryLock(long waitTime, long leaseTime) {
        lock = redisson.getLock(lockKey);
        boolean acquired;
        try {
            acquired = lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS);
        } catch (InterruptedException interrupted) {
            // Preserve the interruption for callers further up the stack.
            Thread.currentThread().interrupt();
            acquired = false;
        }
        return acquired;
    }

    /** Unlocks only if {@code tryLock} was called and this thread is the holder. */
    @Override
    public void close() {
        if (lock == null || !lock.isHeldByCurrentThread()) {
            return;
        }
        lock.unlock();
    }
}
// Usage example: the lock is released automatically when the try block exits.
try (RedisDistributedLock lock = new RedisDistributedLock(redisson, "stock_" + skuId)) {
    // Wait up to 1 second for the lock; it auto-expires after 30 seconds.
    if (lock.tryLock(1, 30)) {
        // Deduct stock. reduceStock takes (skuId, num); this example buys one unit.
        reduceStock(skuId, 1);
    }
    // If the lock was not acquired, nothing is deducted and close() is a no-op.
}
2. 库存扣减逻辑
/**
 * Deducts {@code num} units of stock for a flash-sale SKU.
 *
 * <p>Steps: load the SKU and check available stock, deduct in the database
 * using the SKU's version (optimistic lock), mirror the deduction to the
 * Redis counter {@code "stock:" + skuId}, then publish a stock-reduced message.
 *
 * @param skuId id of the SKU; must not be null
 * @param num   quantity to deduct; must be positive
 * @return {@code true} if the stock was deducted, {@code false} if the
 *         available stock is insufficient
 * @throws IllegalArgumentException  if {@code skuId} is null or {@code num} is null or not positive
 * @throws BusinessException         if the SKU does not exist
 * @throws ConcurrentUpdateException if the versioned update matched no row
 */
@Transactional
public boolean reduceStock(Long skuId, Integer num) {
    // A zero/negative quantity would pass the stock check and *increase* stock.
    if (skuId == null || num == null || num <= 0) {
        throw new IllegalArgumentException(
            "skuId must be set and num must be positive, got skuId=" + skuId + ", num=" + num);
    }
    // 1. Check that enough stock is available
    SeckillSku sku = skuRepository.findById(skuId)
        .orElseThrow(() -> new BusinessException("商品不存在"));
    if (sku.getAvailableStock() < num) {
        return false;
    }
    // 2. Deduct database stock (optimistic lock on the version read above)
    int affected = skuRepository.reduceStock(skuId, num, sku.getVersion());
    if (affected == 0) {
        throw new ConcurrentUpdateException("库存并发修改冲突");
    }
    // 3. Mirror the deduction to Redis
    String stockKey = "stock:" + skuId;
    redisTemplate.opsForValue().decrement(stockKey, num);
    // 4. Publish the deduction message
    try {
        sendStockReduceMessage(skuId, num);
    } catch (RuntimeException e) {
        // The DB update rolls back with the transaction, but Redis does not:
        // undo the counter change so the two stay in step, then rethrow.
        redisTemplate.opsForValue().increment(stockKey, num);
        throw e;
    }
    // NOTE(review): a failure at commit time (after this method returns) is
    // still not compensated in Redis; consider syncing Redis after commit.
    return true;
}
四、高并发优化方案
1. 多级缓存设计
/**
 * Looks up a SKU through the local cache, then Redis, then the database.
 *
 * <p>On a miss in both caches the database load runs under this object's
 * monitor, so only one thread of this instance loads at a time; waiters
 * re-check both caches before querying. A SKU found in Redis or the database
 * is written back to the local cache, and a database hit is also stored in
 * Redis under {@code "sku:" + skuId} for 5 minutes.
 *
 * @param skuId id of the SKU to fetch
 * @return the SKU, or {@code null} if it does not exist in the database
 */
public SeckillSku getSkuWithCache(Long skuId) {
    // 1. Local (in-process) cache
    SeckillSku sku = caffeineCache.getIfPresent(skuId);
    if (sku != null) {
        return sku;
    }
    // 2. Redis cache
    String cacheKey = "sku:" + skuId;
    String json = redisTemplate.opsForValue().get(cacheKey);
    if (json != null) {
        sku = JsonUtils.parse(json, SeckillSku.class);
        caffeineCache.put(skuId, sku);
        return sku;
    }
    // 3. Database query, serialized to avoid a stampede on a cold key
    synchronized (this) {
        // Double check: a thread that held the lock before us may have loaded it.
        sku = caffeineCache.getIfPresent(skuId);
        if (sku != null) {
            return sku;
        }
        json = redisTemplate.opsForValue().get(cacheKey);
        if (json != null) {
            sku = JsonUtils.parse(json, SeckillSku.class);
            // Warm the local cache here too, so later calls skip Redis.
            caffeineCache.put(skuId, sku);
            return sku;
        }
        sku = skuRepository.findById(skuId).orElse(null);
        if (sku != null) {
            redisTemplate.opsForValue().set(
                cacheKey,
                JsonUtils.stringify(sku),
                5, TimeUnit.MINUTES);
            caffeineCache.put(skuId, sku);
        }
        // NOTE(review): a missing SKU is not cached, so repeated lookups of a
        // non-existent id reach the database every time.
        return sku;
    }
}
2. 令牌桶限流实现
/**
 * In-process token-bucket rate limiter.
 *
 * <p>The bucket starts full. Tokens are taken with a lock-free compare-and-set
 * loop and topped up, capped at {@code capacity}, once more than one second
 * has passed since the previous refill. Safe for use from multiple threads.
 */
public class TokenBucketLimiter {
    /** Minimum time between two refills, in milliseconds. */
    private static final long REFILL_INTERVAL_MILLIS = 1000L;

    private final AtomicLong tokens;
    private final long capacity;
    private final long refillRate; // tokens/second
    private volatile long lastRefillTime;

    /**
     * @param capacity   maximum number of tokens the bucket can hold
     * @param refillRate tokens added per second
     */
    public TokenBucketLimiter(long capacity, long refillRate) {
        this.capacity = capacity;
        this.refillRate = refillRate;
        this.tokens = new AtomicLong(capacity);
        this.lastRefillTime = System.currentTimeMillis();
    }

    /**
     * Tries to take {@code permits} tokens without blocking.
     *
     * @param permits number of tokens requested; must not be negative
     * @return {@code true} if the tokens were taken, {@code false} if the
     *         bucket currently holds fewer than {@code permits}
     * @throws IllegalArgumentException if {@code permits} is negative
     */
    public boolean tryAcquire(int permits) {
        if (permits < 0) {
            // A negative request would add tokens to the bucket.
            throw new IllegalArgumentException("permits must not be negative: " + permits);
        }
        refillTokens();
        while (true) {
            long current = tokens.get();
            if (current < permits) {
                return false;
            }
            // Retry if another thread changed the count in between.
            if (tokens.compareAndSet(current, current - permits)) {
                return true;
            }
        }
    }

    /** Adds the tokens earned since the last refill, at most once per interval. */
    private void refillTokens() {
        long now = System.currentTimeMillis();
        if (now - lastRefillTime <= REFILL_INTERVAL_MILLIS) {
            return; // cheap unlocked check for the common case
        }
        synchronized (this) {
            // Re-read under the lock: another thread may have refilled already.
            long elapsedTime = now - lastRefillTime;
            if (elapsedTime > REFILL_INTERVAL_MILLIS) {
                // Computed from the value read under the lock, not a stale one.
                long newTokens = elapsedTime * refillRate / 1000;
                // Atomic add-and-cap so a concurrent tryAcquire CAS is never overwritten.
                tokens.updateAndGet(current -> Math.min(current + newTokens, capacity));
                lastRefillTime = now;
            }
        }
    }
}
五、分布式事务方案
1. 可靠消息最终一致性
/**
 * Local-transaction callbacks for the order half message.
 *
 * <p>{@link #executeLocalTransaction} creates the order and decides whether
 * the half message is committed or rolled back; {@link #checkLocalTransaction}
 * answers the broker's check-back by looking the order up by its
 * {@code orderNo} header.
 */
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
    /**
     * Runs the local transaction for a half message.
     *
     * @param msg the half message
     * @param arg the {@code OrderCreateDTO} passed to {@code sendMessageInTransaction}
     * @return COMMIT when the order was created, ROLLBACK when creation threw
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(
            Message msg, Object arg) {
        try {
            OrderCreateDTO dto = (OrderCreateDTO) arg;
            // 1. Create the order (database transaction)
            orderService.createOrder(dto);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            // NOTE(review): the failure cause is dropped here; log it once a
            // logger is available in this class.
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * Answers the broker's check-back for a half message whose outcome it
     * has not received.
     *
     * @param msg the half message being checked
     * @return COMMIT if the order exists; ROLLBACK if the message carries no
     *         order number; UNKNOWN if the order is not (yet) visible, so
     *         the broker asks again later
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // The listener receives a Spring Message: custom properties set via
        // setHeader(...) on the producer side are read back from the headers.
        Object header = msg.getHeaders().get("orderNo");
        if (header == null) {
            // Without an order number the outcome can never be confirmed.
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        Order order = orderService.getByOrderNo(header.toString());
        if (order != null) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        // The local transaction may still be running; rolling back now would
        // drop the message for an order that commits a moment later.
        return RocketMQLocalTransactionState.UNKNOWN;
    }
}
/**
 * Sends the transactional (half) message that drives order creation.
 *
 * <p>The DTO is both the message payload and the argument handed to the
 * local-transaction listener; its order number travels as the
 * {@code orderNo} header so the listener's check-back can find the order.
 *
 * @param dto order creation request
 */
public void createOrderAsync(OrderCreateDTO dto) {
    // rocketmq-spring 2.1.0+ (this project declares 2.2.3) removed the
    // txProducerGroup argument: the call is (destination, message, arg) and the
    // producer group comes from the rocketmq.producer.group configuration.
    rocketMQTemplate.sendMessageInTransaction(
        "order-create-topic",
        MessageBuilder.withPayload(dto)
            .setHeader("orderNo", dto.getOrderNo())
            .build(),
        dto
    );
}
2. TCC补偿事务
/**
 * Try / Confirm / Cancel operations for stock deduction.
 *
 * <p>Each phase delegates to one repository update and reports success when
 * that update changed at least one row.
 */
public class StockTccService {
    /**
     * Try phase: reserves stock (freeze field).
     *
     * @return {@code true} if the freeze update affected a row
     */
    @Transactional
    public boolean prepareReduceStock(Long skuId, Integer num) {
        return skuRepository.freezeStock(skuId, num) > 0;
    }

    /**
     * Confirm phase: turns the reservation into a real deduction.
     *
     * @return {@code true} if the confirm update affected a row
     */
    @Transactional
    public boolean commitReduceStock(Long skuId, Integer num) {
        int confirmedRows = skuRepository.confirmStock(skuId, num);
        return confirmedRows > 0;
    }

    /**
     * Cancel phase: releases the reservation.
     *
     * @return {@code true} if the cancel update affected a row
     */
    @Transactional
    public boolean cancelReduceStock(Long skuId, Integer num) {
        int releasedRows = skuRepository.cancelFreeze(skuId, num);
        return releasedRows > 0;
    }
}
六、系统保护机制
1. 熔断降级配置
/**
 * Creates an order under the Sentinel resource {@code "createOrder"}.
 *
 * <p>Blocked calls are routed to {@code createOrderBlockHandler}; other
 * failures are routed to {@code createOrderFallback}.
 *
 * @param dto order creation request
 * @return the result produced by the order service
 */
@SentinelResource(
    value = "createOrder",
    fallback = "createOrderFallback",
    blockHandler = "createOrderBlockHandler"
)
public OrderResult createOrder(OrderCreateDTO dto) {
    // Normal business path
    OrderResult created = orderService.create(dto);
    return created;
}
/**
 * Block handler for {@code createOrder}: logs the block reason and returns a
 * "busy, retry later" failure result.
 *
 * @param dto the rejected request
 * @param ex  the block exception raised by Sentinel
 */
public OrderResult createOrderBlockHandler(
        OrderCreateDTO dto, BlockException ex) {
    String reason = ex.getMessage();
    log.warn("触发流控:{}", reason);
    OrderResult busy = OrderResult.fail("系统繁忙,请稍后再试");
    return busy;
}
/**
 * Fallback for {@code createOrder}: logs the failure with its stack trace
 * and returns a "service unavailable" failure result.
 *
 * @param dto the request that failed
 * @param ex  the error thrown while creating the order
 */
public OrderResult createOrderFallback(
        OrderCreateDTO dto, Throwable ex) {
    log.error("订单创建失败:", ex);
    OrderResult unavailable = OrderResult.fail("服务暂时不可用");
    return unavailable;
}
2. 系统自适应保护
/**
 * Registers the Sentinel flow rules once the bean has been constructed.
 *
 * <p>Two rules are loaded: a QPS rule with warm-up for {@code "createOrder"}
 * and a concurrent-thread rule for {@code "queryStock"}.
 */
@PostConstruct
public void initFlowRules() {
    // Typed list instead of the raw List/ArrayList, so only FlowRule can be added.
    List<FlowRule> rules = new ArrayList<>();

    // Order creation: QPS limit
    FlowRule orderRule = new FlowRule();
    orderRule.setResource("createOrder");
    orderRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
    orderRule.setCount(5000); // threshold
    orderRule.setControlBehavior(
        RuleConstant.CONTROL_BEHAVIOR_WARM_UP); // warm-up
    orderRule.setWarmUpPeriodSec(10);
    rules.add(orderRule);

    // Stock query: concurrent-thread limit
    FlowRule stockRule = new FlowRule();
    stockRule.setResource("queryStock");
    stockRule.setGrade(RuleConstant.FLOW_GRADE_THREAD);
    stockRule.setCount(100);
    rules.add(stockRule);

    FlowRuleManager.loadRules(rules);
}
七、压测与优化
1. JMeter压测配置
<?xml version="1.0" encoding="UTF-8"?>
<!-- Simplified JMeter plan for load-testing the flash-sale order endpoint. -->
<jmeterTestPlan version="1.2">
<hashTree>
<TestPlan>
<stringProp name="TestPlan.comments">秒杀压测</stringProp>
</TestPlan>
<!-- Load profile: 5000 threads with a ramp_time of 60 (presumably seconds). -->
<ThreadGroup>
<intProp name="ThreadGroup.num_threads">5000</intProp>
<intProp name="ThreadGroup.ramp_time">60</intProp>
<LoopController>
<!-- loops is -1: presumably "repeat until stopped"; confirm against JMeter docs. -->
<intProp name="LoopController.loops">-1</intProp>
</LoopController>
</ThreadGroup>
<!-- Request under test: api.seckill.com/order/create with skuId=1001.
     NOTE(review): no HTTP method, protocol or port is set here. -->
<HTTPSampler>
<stringProp name="HTTPSampler.domain">api.seckill.com</stringProp>
<stringProp name="HTTPSampler.path">/order/create</stringProp>
<elementProp name="Arguments">
<collectionProp name="Arguments.arguments">
<elementProp>
<stringProp name="Argument.name">skuId</stringProp>
<stringProp name="Argument.value">1001</stringProp>
</elementProp>
</collectionProp>
</elementProp>
</HTTPSampler>
</hashTree>
</jmeterTestPlan>
2. Arthas性能诊断
# Start Arthas and attach to the target JVM
java -jar arthas-boot.jar
# Watch parameters, return value and cost of each createOrder call.
# No -b flag: -b observes BEFORE the invocation, where returnObj is always
# null and no cost is available; the default observes after the call finishes.
watch com.seckill.service.OrderService createOrder '{params,returnObj}' -x 3
# Sample CPU hot paths and write an HTML report
profiler start --event cpu
profiler stop --format html
# Trace JdbcTemplate query calls to find slow SQL
trace *.JdbcTemplate query
# Generate a flame graph
profiler start --format flamegraph
profiler stop
八、总结与扩展
本系统实现了:
- 分布式环境下的库存精确扣减
- 多级缓存架构支撑高并发查询
- 柔性事务保证最终一致性
- 全链路系统保护机制
扩展方向:
- 全链路灰度发布
- 基于机器学习的动态限流
- 异构数据源实时同步
- Service Mesh架构升级