Java微服务架构下分布式事务解决方案实战 | Saga模式详解

2025-10-04 0 153

分布式事务的挑战与Saga模式概述

微服务架构日益普及的今天,分布式事务管理成为系统设计的核心挑战。传统的两阶段提交(2PC)在微服务场景下存在性能瓶颈和可用性问题。本文将深入探讨基于Saga模式的分布式事务解决方案,通过完整的电商案例展示如何在Java微服务体系中实现最终一致性。

Saga模式的核心思想

  • 长事务拆分:将大事务拆分为一系列可补偿的本地事务
  • 事件驱动:通过事件消息协调各个服务的执行顺序
  • 补偿机制:为每个正向操作定义对应的补偿操作
  • 最终一致性:通过重试和补偿保证数据的最终一致

电商订单场景的业务分析

以电商平台的创建订单流程为例,涉及多个微服务的协同操作:

创建订单Saga流程:
1. 订单服务 - 创建订单(待确认状态)
2. 库存服务 - 扣减商品库存
3. 积分服务 - 扣减用户积分
4. 支付服务 - 执行支付操作
5. 订单服务 - 更新订单为成功状态

补偿流程:
任一服务失败时,需要执行已成功服务的补偿操作

技术架构设计

系统组件

分布式事务Saga架构:
├── Saga协调器 (Saga Orchestrator)
│   ├── 流程定义管理
│   ├── 事务状态跟踪
│   └── 补偿流程执行
├── 参与服务 (Participant Services)
│   ├── 订单服务 (Order Service)
│   ├── 库存服务 (Inventory Service)
│   ├── 积分服务 (Points Service)
│   └── 支付服务 (Payment Service)
├── 事件总线 (Event Bus)
│   ├── 命令事件分发
│   └── 回复事件收集
└── 持久化存储
    ├── Saga日志存储
    └── 事务状态存储

依赖配置

<dependency>
    <groupId>org.springframework.boot</groupId>
    <groupId>spring-boot-starter-web</groupId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

核心代码实现

1. Saga协调器核心实现

// Saga定义实体
@Entity
@Table(name = "saga_definition")
public class SagaDefinition {
    @Id
    private String sagaType;
    
    @OneToMany(cascade = CascadeType.ALL, fetch = FetchType.EAGER)
    @JoinColumn(name = "saga_type")
    private List<SagaStep> steps;
    
    // getters and setters
}

@Entity
@Table(name = "saga_step")
public class SagaStep {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    private String serviceName;
    private String commandType;
    private String compensationType;
    private Integer stepOrder;
    
    // getters and setters
}

// Saga实例管理
@Entity
@Table(name = "saga_instance")
public class SagaInstance {
    @Id
    private String sagaId;
    
    private String sagaType;
    private SagaStatus status;
    
    @ElementCollection
    @CollectionTable(name = "saga_step_status", joinColumns = @JoinColumn(name = "saga_id"))
    private List<SagaStepStatus> stepStatuses;
    
    private LocalDateTime createdTime;
    private LocalDateTime updatedTime;
    
    // getters and setters
}

public enum SagaStatus {
    STARTED, COMPENSATING, COMPENSATED, COMPLETED, FAILED
}

2. Saga协调器服务

@Service
@Slf4j
public class SagaOrchestrator {
    
    @Autowired
    private SagaDefinitionRepository definitionRepository;
    
    @Autowired
    private SagaInstanceRepository instanceRepository;
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    /**
     * 开始执行Saga事务
     */
    public String startSaga(String sagaType, Object data) {
        String sagaId = UUID.randomUUID().toString();
        
        SagaInstance instance = new SagaInstance();
        instance.setSagaId(sagaId);
        instance.setSagaType(sagaType);
        instance.setStatus(SagaStatus.STARTED);
        instance.setCreatedTime(LocalDateTime.now());
        
        // 初始化步骤状态
        List<SagaStepStatus> stepStatuses = initializeStepStatuses(sagaType);
        instance.setStepStatuses(stepStatuses);
        
        instanceRepository.save(instance);
        
        // 开始执行第一个步骤
        executeNextStep(sagaId);
        
        return sagaId;
    }
    
    /**
     * 处理步骤执行结果
     */
    @KafkaListener(topics = "saga-reply")
    public void handleStepReply(SagaReplyMessage reply) {
        SagaInstance instance = instanceRepository.findById(reply.getSagaId())
                .orElseThrow(() -> new RuntimeException("Saga实例不存在"));
        
        if (reply.isSuccess()) {
            handleStepSuccess(instance, reply);
        } else {
            handleStepFailure(instance, reply);
        }
    }
    
    private void handleStepSuccess(SagaInstance instance, SagaReplyMessage reply) {
        // 更新步骤状态为成功
        updateStepStatus(instance, reply.getStepOrder(), StepStatus.SUCCEEDED);
        
        // 检查是否所有步骤都完成
        if (isAllStepsCompleted(instance)) {
            instance.setStatus(SagaStatus.COMPLETED);
            instanceRepository.save(instance);
            log.info("Saga事务完成: {}", instance.getSagaId());
        } else {
            // 执行下一个步骤
            executeNextStep(instance.getSagaId());
        }
    }
    
    private void handleStepFailure(SagaInstance instance, SagaReplyMessage reply) {
        log.error("Saga步骤执行失败,开始补偿流程: {}", reply.getSagaId());
        
        instance.setStatus(SagaStatus.COMPENSATING);
        instanceRepository.save(instance);
        
        // 执行补偿流程
        executeCompensation(instance.getSagaId(), reply.getStepOrder());
    }
    
    /**
     * 执行补偿流程
     */
    private void executeCompensation(String sagaId, Integer failedStepOrder) {
        SagaInstance instance = instanceRepository.findById(sagaId)
                .orElseThrow(() -> new RuntimeException("Saga实例不存在"));
        
        // 从失败步骤的前一步开始反向补偿
        for (int i = failedStepOrder - 1; i >= 0; i--) {
            SagaStepStatus stepStatus = instance.getStepStatuses().get(i);
            if (stepStatus.getStatus() == StepStatus.SUCCEEDED) {
                sendCompensationCommand(sagaId, i);
            }
        }
        
        instance.setStatus(SagaStatus.COMPENSATED);
        instanceRepository.save(instance);
    }
    
    private void sendCompensationCommand(String sagaId, Integer stepOrder) {
        SagaCompensationCommand command = new SagaCompensationCommand();
        command.setSagaId(sagaId);
        command.setStepOrder(stepOrder);
        command.setTimestamp(LocalDateTime.now());
        
        kafkaTemplate.send("saga-compensation", sagaId, command);
    }
}

3. 订单服务实现

@Service
@Slf4j
public class OrderService {
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    /**
     * 创建订单 - Saga正向操作
     */
    @KafkaListener(topics = "order-create-command")
    public void handleCreateOrder(CreateOrderCommand command) {
        try {
            log.info("开始创建订单: {}", command.getOrderId());
            
            Order order = new Order();
            order.setOrderId(command.getOrderId());
            order.setUserId(command.getUserId());
            order.setTotalAmount(command.getTotalAmount());
            order.setStatus(OrderStatus.PENDING);
            order.setCreatedTime(LocalDateTime.now());
            
            orderRepository.save(order);
            
            // 发送成功回复
            SagaReplyMessage reply = new SagaReplyMessage();
            reply.setSagaId(command.getSagaId());
            reply.setStepOrder(command.getStepOrder());
            reply.setSuccess(true);
            
            kafkaTemplate.send("saga-reply", command.getSagaId(), reply);
            
        } catch (Exception e) {
            log.error("创建订单失败: {}", command.getOrderId(), e);
            
            SagaReplyMessage reply = new SagaReplyMessage();
            reply.setSagaId(command.getSagaId());
            reply.setStepOrder(command.getStepOrder());
            reply.setSuccess(false);
            reply.setErrorMessage(e.getMessage());
            
            kafkaTemplate.send("saga-reply", command.getSagaId(), reply);
        }
    }
    
    /**
     * 取消订单 - Saga补偿操作
     */
    @KafkaListener(topics = "order-compensation")
    public void handleCompensateOrder(OrderCompensationCommand command) {
        try {
            log.info("开始补偿订单: {}", command.getOrderId());
            
            Order order = orderRepository.findById(command.getOrderId())
                    .orElseThrow(() -> new RuntimeException("订单不存在"));
            
            order.setStatus(OrderStatus.CANCELLED);
            order.setUpdatedTime(LocalDateTime.now());
            
            orderRepository.save(order);
            
            log.info("订单补偿完成: {}", command.getOrderId());
            
        } catch (Exception e) {
            log.error("订单补偿失败: {}", command.getOrderId(), e);
            // 记录补偿失败,需要人工干预
        }
    }
}

@Entity
@Table(name = "orders")
public class Order {
    @Id
    private String orderId;
    
    private String userId;
    private BigDecimal totalAmount;
    private OrderStatus status;
    private LocalDateTime createdTime;
    private LocalDateTime updatedTime;
    
    // getters and setters
}

public enum OrderStatus {
    PENDING, CONFIRMED, CANCELLED, COMPLETED
}

4. 库存服务实现

@Service
@Slf4j
public class InventoryService {
    
    @Autowired
    private InventoryRepository inventoryRepository;
    
    @Transactional
    @KafkaListener(topics = "inventory-deduct-command")
    public void handleDeductInventory(DeductInventoryCommand command) {
        try {
            log.info("开始扣减库存: {}", command.getProductId());
            
            Inventory inventory = inventoryRepository.findByProductId(command.getProductId())
                    .orElseThrow(() -> new RuntimeException("商品库存不存在"));
            
            if (inventory.getQuantity()  new RuntimeException("商品库存不存在"));
            
            inventory.setQuantity(inventory.getQuantity() + command.getQuantity());
            inventory.setUpdatedTime(LocalDateTime.now());
            
            inventoryRepository.save(inventory);
            
            log.info("库存恢复完成: {}", command.getProductId());
            
        } catch (Exception e) {
            log.error("库存恢复失败: {}", command.getProductId(), e);
            // 记录补偿失败
        }
    }
}

Saga配置与流程定义

Saga流程配置

@Configuration
public class OrderSagaConfiguration {
    
    @Bean
    public SagaDefinition createOrderSagaDefinition() {
        List<SagaStep> steps = new ArrayList<>();
        
        // 步骤1: 创建订单
        steps.add(createStep(1, "order-service", "CREATE_ORDER", "CANCEL_ORDER"));
        
        // 步骤2: 扣减库存
        steps.add(createStep(2, "inventory-service", "DEDUCT_INVENTORY", "RESTORE_INVENTORY"));
        
        // 步骤3: 扣减积分
        steps.add(createStep(3, "points-service", "DEDUCT_POINTS", "RESTORE_POINTS"));
        
        // 步骤4: 执行支付
        steps.add(createStep(4, "payment-service", "EXECUTE_PAYMENT", "REFUND_PAYMENT"));
        
        // 步骤5: 确认订单
        steps.add(createStep(5, "order-service", "CONFIRM_ORDER", null));
        
        SagaDefinition definition = new SagaDefinition();
        definition.setSagaType("CREATE_ORDER");
        definition.setSteps(steps);
        
        return definition;
    }
    
    private SagaStep createStep(int order, String service, String command, String compensation) {
        SagaStep step = new SagaStep();
        step.setStepOrder(order);
        step.setServiceName(service);
        step.setCommandType(command);
        step.setCompensationType(compensation);
        return step;
    }
}

监控与运维

Saga事务监控

@RestController
@RequestMapping("/api/saga")
public class SagaMonitorController {
    
    @Autowired
    private SagaInstanceRepository instanceRepository;
    
    @GetMapping("/instances")
    public ResponseEntity<List<SagaInstance>> getSagaInstances(
            @RequestParam(required = false) SagaStatus status,
            @RequestParam(required = false) String sagaType) {
        
        Specification<SagaInstance> spec = Specification.where(null);
        
        if (status != null) {
            spec = spec.and((root, query, cb) -> cb.equal(root.get("status"), status));
        }
        
        if (sagaType != null) {
            spec = spec.and((root, query, cb) -> cb.equal(root.get("sagaType"), sagaType));
        }
        
        List<SagaInstance> instances = instanceRepository.findAll(spec);
        return ResponseEntity.ok(instances);
    }
    
    @GetMapping("/instances/{sagaId}")
    public ResponseEntity<SagaInstance> getSagaInstance(@PathVariable String sagaId) {
        return instanceRepository.findById(sagaId)
                .map(ResponseEntity::ok)
                .orElse(ResponseEntity.notFound().build());
    }
    
    @PostMapping("/instances/{sagaId}/retry")
    public ResponseEntity<String> retrySaga(@PathVariable String sagaId) {
        // 重试失败的Saga事务
        // 实现重试逻辑
        return ResponseEntity.ok("重试请求已接受");
    }
}

测试策略

集成测试示例

@SpringBootTest
@TestPropertySource(locations = "classpath:application-test.properties")
class OrderSagaIntegrationTest {
    
    @Autowired
    private SagaOrchestrator sagaOrchestrator;
    
    @Autowired
    private TestKafkaTemplate kafkaTemplate;
    
    @Test
    void testCreateOrderSaga_Success() {
        // 准备测试数据
        CreateOrderRequest request = new CreateOrderRequest();
        request.setUserId("user123");
        request.setProductId("product456");
        request.setQuantity(2);
        request.setTotalAmount(new BigDecimal("199.99"));
        
        // 执行Saga事务
        String sagaId = sagaOrchestrator.startSaga("CREATE_ORDER", request);
        
        // 验证最终状态
        await().atMost(30, TimeUnit.SECONDS)
               .until(() -> {
                   SagaInstance instance = sagaOrchestrator.getSagaInstance(sagaId);
                   return instance.getStatus() == SagaStatus.COMPLETED;
               });
        
        // 验证业务数据
        // 验证订单状态、库存扣减、积分扣减等
    }
    
    @Test
    void testCreateOrderSaga_Compensation() {
        // 模拟支付服务失败场景
        // 验证补偿流程正确执行
    }
}

性能优化与最佳实践

优化策略

  • 异步处理:Saga步骤采用异步消息驱动,避免阻塞
  • 幂等性设计:所有操作和补偿操作都要保证幂等性

总结

本文详细介绍了在Java微服务架构下使用Saga模式解决分布式事务问题的完整方案。通过电商订单案例,展示了从架构设计、代码实现到测试运维的全流程实践。

核心价值:

  • 提供了微服务架构下分布式事务的可行解决方案
  • 通过补偿机制保证最终一致性
  • 具备良好的可扩展性和容错能力
  • 完整的监控和运维支持

Saga模式虽然增加了系统的复杂性,但在需要高可用性和性能的微服务场景下,它提供了比传统2PC更优的解决方案。在实际应用中,还需要结合具体业务场景进行适当的调整和优化。

Java微服务架构下分布式事务解决方案实战 | Saga模式详解
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 java Java微服务架构下分布式事务解决方案实战 | Saga模式详解 https://www.taomawang.com/server/java/1164.html

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务