Java多线程并发编程实战:构建高性能电商库存管理系统 | Java开发指南

2025-08-22 0 752

一、项目概述与需求分析

在高并发电商场景中,库存管理是一个经典的多线程并发问题。本文将带领您使用Java多线程技术构建一个线程安全的库存管理系统,解决超卖、库存不一致等常见问题。

1.1 系统需求

  • 支持多用户同时下单
  • 防止库存超卖
  • 实现库存的原子性操作
  • 支持库存查询和统计
  • 提供订单处理日志

二、核心类设计

2.1 库存商品类 (InventoryItem)

public class InventoryItem {
    private final String productId;
    private final String productName;
    private volatile int stockQuantity;
    private final int maxStock;
    
    public InventoryItem(String productId, String productName, int initialStock, int maxStock) {
        this.productId = productId;
        this.productName = productName;
        this.stockQuantity = initialStock;
        this.maxStock = maxStock;
    }
    
    // 线程安全的库存减少方法
    public synchronized boolean decreaseStock(int quantity) {
        if (quantity <= 0 || stockQuantity < quantity) {
            return false;
        }
        stockQuantity -= quantity;
        return true;
    }
    
    // 线程安全的库存增加方法
    public synchronized boolean increaseStock(int quantity) {
        if (quantity  maxStock) {
            return false;
        }
        stockQuantity += quantity;
        return true;
    }
    
    // 获取当前库存(使用volatile保证可见性)
    public int getStockQuantity() {
        return stockQuantity;
    }
    
    // 省略getter方法
}

2.2 库存管理器 (InventoryManager)

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class InventoryManager {
    private final Map<String, InventoryItem> inventoryMap;
    private final AtomicLong totalOrders;
    private final AtomicLong successfulOrders;
    private final AtomicLong failedOrders;
    
    public InventoryManager() {
        this.inventoryMap = new ConcurrentHashMap();
        this.totalOrders = new AtomicLong(0);
        this.successfulOrders = new AtomicLong(0);
        this.failedOrders = new AtomicLong(0);
    }
    
    // 添加商品到库存
    public void addProduct(InventoryItem item) {
        inventoryMap.put(item.getProductId(), item);
    }
    
    // 处理订单(线程安全)
    public boolean processOrder(String productId, int quantity, String customerId) {
        totalOrders.incrementAndGet();
        
        InventoryItem item = inventoryMap.get(productId);
        if (item == null) {
            failedOrders.incrementAndGet();
            return false;
        }
        
        boolean success = item.decreaseStock(quantity);
        if (success) {
            successfulOrders.incrementAndGet();
            logOrder(productId, quantity, customerId, "SUCCESS");
            return true;
        } else {
            failedOrders.incrementAndGet();
            logOrder(productId, quantity, customerId, "FAILED - Insufficient stock");
            return false;
        }
    }
    
    // 库存补充
    public boolean restockProduct(String productId, int quantity) {
        InventoryItem item = inventoryMap.get(productId);
        if (item == null) {
            return false;
        }
        return item.increaseStock(quantity);
    }
    
    // 订单日志
    private void logOrder(String productId, int quantity, String customerId, String status) {
        System.out.println(String.format("Order Log: Product=%s, Quantity=%d, Customer=%s, Status=%s, Time=%s",
                productId, quantity, customerId, status, new java.util.Date()));
    }
    
    // 获取库存统计
    public void displayInventoryStatus() {
        System.out.println("n=== Current Inventory Status ===");
        inventoryMap.forEach((id, item) -> {
            System.out.println(String.format("Product: %s, Stock: %d", 
                    item.getProductName(), item.getStockQuantity()));
        });
        System.out.println(String.format("Total Orders: %d, Successful: %d, Failed: %d",
                totalOrders.get(), successfulOrders.get(), failedOrders.get()));
    }
}

2.3 订单处理任务 (OrderProcessingTask)

import java.util.Random;

public class OrderProcessingTask implements Runnable {
    private final InventoryManager inventoryManager;
    private final String taskName;
    private final Random random = new Random();
    
    public OrderProcessingTask(InventoryManager inventoryManager, String taskName) {
        this.inventoryManager = inventoryManager;
        this.taskName = taskName;
    }
    
    @Override
    public void run() {
        try {
            // 模拟随机订单处理
            String[] productIds = {"P001", "P002", "P003"};
            String productId = productIds[random.nextInt(productIds.length)];
            int quantity = random.nextInt(5) + 1; // 1-5件商品
            String customerId = "Customer_" + (random.nextInt(1000) + 1);
            
            System.out.println(String.format("%s processing order: %s x%d for %s",
                    taskName, productId, quantity, customerId));
            
            boolean success = inventoryManager.processOrder(productId, quantity, customerId);
            
            if (success) {
                System.out.println(String.format("%s: Order successful for %s x%d",
                        taskName, productId, quantity));
            } else {
                System.out.println(String.format("%s: Order failed for %s x%d",
                        taskName, productId, quantity));
            }
            
            // 模拟处理时间
            Thread.sleep(random.nextInt(200));
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println(taskName + " was interrupted");
        }
    }
}

三、线程池配置与性能优化

3.1 自定义线程工厂

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class InventoryThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    
    public InventoryThreadFactory(String poolName) {
        namePrefix = poolName + "-thread-";
    }
    
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement());
        thread.setPriority(Thread.NORM_PRIORITY);
        thread.setUncaughtExceptionHandler((t, e) -> {
            System.err.println("Uncaught exception in thread: " + t.getName());
            e.printStackTrace();
        });
        return thread;
    }
}

3.2 拒绝策略处理器

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class InventoryRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.err.println("Task rejected: " + r.toString());
        // 可以在这里添加日志记录、报警等逻辑
        if (!executor.isShutdown()) {
            try {
                // 尝试重新放入队列
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("Rejected task handling interrupted");
            }
        }
    }
}

四、完整系统实现与测试

4.1 主应用程序 (InventorySystemApplication)

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class InventorySystemApplication {
    public static void main(String[] args) {
        // 初始化库存管理器
        InventoryManager inventoryManager = new InventoryManager();
        
        // 添加示例商品
        inventoryManager.addProduct(new InventoryItem("P001", "iPhone 15", 100, 200));
        inventoryManager.addProduct(new InventoryItem("P002", "MacBook Pro", 50, 100));
        inventoryManager.addProduct(new InventoryItem("P003", "AirPods Pro", 200, 500));
        
        // 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            10, // 核心线程数
            20, // 最大线程数
            60L, TimeUnit.SECONDS, // 空闲线程存活时间
            new ArrayBlockingQueue(100), // 工作队列
            new InventoryThreadFactory("InventoryPool"), // 线程工厂
            new InventoryRejectedExecutionHandler() // 拒绝策略
        );
        
        // 提交订单处理任务
        for (int i = 1; i <= 50; i++) {
            OrderProcessingTask task = new OrderProcessingTask(
                inventoryManager, "OrderProcessor-" + i);
            executor.execute(task);
            
            // 随机间隔提交任务,模拟真实场景
            try {
                Thread.sleep((long) (Math.random() * 100));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        
        // 优雅关闭线程池
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        
        // 显示最终库存状态
        inventoryManager.displayInventoryStatus();
        
        // 性能统计
        System.out.println("n=== Thread Pool Statistics ===");
        System.out.println("Core pool size: " + executor.getCorePoolSize());
        System.out.println("Maximum pool size: " + executor.getMaximumPoolSize());
        System.out.println("Completed task count: " + executor.getCompletedTaskCount());
        System.out.println("Active thread count: " + executor.getActiveCount());
    }
}

4.2 测试用例 (InventoryTest)

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;

public class InventoryTest {
    private InventoryManager inventoryManager;
    private InventoryItem testItem;
    
    @BeforeEach
    void setUp() {
        inventoryManager = new InventoryManager();
        testItem = new InventoryItem("TEST001", "Test Product", 10, 20);
        inventoryManager.addProduct(testItem);
    }
    
    @Test
    void testDecreaseStockSuccess() {
        assertTrue(testItem.decreaseStock(5));
        assertEquals(5, testItem.getStockQuantity());
    }
    
    @Test
    void testDecreaseStockFailure() {
        assertFalse(testItem.decreaseStock(15));
        assertEquals(10, testItem.getStockQuantity());
    }
    
    @Test
    void testConcurrentOrderProcessing() throws InterruptedException {
        Thread thread1 = new Thread(() -> {
            inventoryManager.processOrder("TEST001", 3, "Customer1");
        });
        
        Thread thread2 = new Thread(() -> {
            inventoryManager.processOrder("TEST001", 4, "Customer2");
        });
        
        thread1.start();
        thread2.start();
        
        thread1.join();
        thread2.join();
        
        // 验证库存正确性
        assertTrue(testItem.getStockQuantity() >= 3);
    }
    
    @Test
    void testRestockFunctionality() {
        assertTrue(inventoryManager.restockProduct("TEST001", 5));
        assertEquals(15, testItem.getStockQuantity());
        
        assertFalse(inventoryManager.restockProduct("TEST001", 10)); // 超过最大库存
        assertEquals(15, testItem.getStockQuantity());
    }
}

五、高级特性与优化建议

5.1 使用ReadWriteLock优化读操作

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class OptimizedInventoryItem {
    private final String productId;
    private int stockQuantity;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    
    // 使用读锁优化库存查询
    public int getStockQuantity() {
        lock.readLock().lock();
        try {
            return stockQuantity;
        } finally {
            lock.readLock().unlock();
        }
    }
    
    // 使用写锁保证库存修改的线程安全
    public boolean decreaseStock(int quantity) {
        lock.writeLock().lock();
        try {
            if (quantity <= 0 || stockQuantity < quantity) {
                return false;
            }
            stockQuantity -= quantity;
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }
}

5.2 使用CompletableFuture进行异步处理

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncInventoryService {
    private final InventoryManager inventoryManager;
    private final ExecutorService asyncExecutor;
    
    public AsyncInventoryService(InventoryManager inventoryManager) {
        this.inventoryManager = inventoryManager;
        this.asyncExecutor = Executors.newFixedThreadPool(4);
    }
    
    public CompletableFuture<Boolean> processOrderAsync(String productId, int quantity, String customerId) {
        return CompletableFuture.supplyAsync(() -> 
            inventoryManager.processOrder(productId, quantity, customerId), 
            asyncExecutor
        );
    }
    
    public CompletableFuture<Void> batchProcessOrders(List<Order> orders) {
        List<CompletableFuture<Boolean>> futures = orders.stream()
            .map(order -> processOrderAsync(
                order.getProductId(), 
                order.getQuantity(), 
                order.getCustomerId()))
            .collect(Collectors.toList());
        
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }
}

六、总结与最佳实践

通过本实战教程,我们构建了一个完整的Java多线程库存管理系统,涵盖了以下核心内容:

  • 线程安全设计:使用synchronized关键字和ConcurrentHashMap保证数据一致性
  • 线程池配置:合理配置线程池参数,使用自定义线程工厂和拒绝策略
  • 性能优化:使用ReadWriteLock分离读写操作,提高并发性能
  • 异步处理:利用CompletableFuture实现非阻塞操作
  • 异常处理:完善的异常处理和线程中断机制

最佳实践建议:

  1. 根据业务场景合理选择同步机制(synchronized vs Lock)
  2. 使用线程池而不是直接创建线程
  3. 注意避免死锁和资源竞争
  4. 实现合适的监控和日志记录
  5. 进行充分的并发测试

这个库存管理系统可以作为电商平台的基础组件,通过进一步的扩展可以支持分布式环境、数据库持久化、缓存优化等高级特性。

Java多线程并发编程实战:构建高性能电商库存管理系统 | Java开发指南
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 java Java多线程并发编程实战:构建高性能电商库存管理系统 | Java开发指南 https://www.taomawang.com/server/java/943.html

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务