一、项目概述与需求分析
在高并发电商场景中,库存管理是一个经典的多线程并发问题。本文将带领您使用Java多线程技术构建一个线程安全的库存管理系统,解决超卖、库存不一致等常见问题。
1.1 系统需求
- 支持多用户同时下单
- 防止库存超卖
- 实现库存的原子性操作
- 支持库存查询和统计
- 提供订单处理日志
二、核心类设计
2.1 库存商品类 (InventoryItem)
/**
 * Stock record for a single product whose quantity may be adjusted from several threads.
 *
 * <p>Both mutators are {@code synchronized} on the instance, so each check-then-update
 * sequence runs as one atomic step. The quantity field is {@code volatile}, which lets
 * {@link #getStockQuantity()} observe the most recent write without taking the lock.
 */
public class InventoryItem {
    private final String productId;
    private final String productName;
    private volatile int stockQuantity;
    private final int maxStock;

    /**
     * Creates a stock record.
     *
     * @param productId    identifier of the product
     * @param productName  display name of the product
     * @param initialStock quantity on hand at creation time
     * @param maxStock     upper bound that {@link #increaseStock(int)} will not exceed
     */
    public InventoryItem(String productId, String productName, int initialStock, int maxStock) {
        this.productId = productId;
        this.productName = productName;
        this.stockQuantity = initialStock;
        this.maxStock = maxStock;
    }

    /**
     * Removes {@code quantity} units from stock if enough are available.
     *
     * @param quantity number of units to remove; must be positive
     * @return {@code true} if the stock was reduced, {@code false} if {@code quantity} is not
     *         positive or exceeds the current stock (stock is left unchanged in that case)
     */
    public synchronized boolean decreaseStock(int quantity) {
        if (quantity <= 0 || stockQuantity < quantity) {
            return false;
        }
        stockQuantity -= quantity;
        return true;
    }

    /**
     * Adds {@code quantity} units to stock unless that would exceed the configured maximum.
     *
     * @param quantity number of units to add; must be positive
     * @return {@code true} if the stock was increased, {@code false} if {@code quantity} is not
     *         positive or the result would be above {@code maxStock} (stock is left unchanged)
     */
    public synchronized boolean increaseStock(int quantity) {
        // Sum in long so a very large quantity cannot wrap around and slip past the cap.
        if (quantity <= 0 || (long) stockQuantity + quantity > maxStock) {
            return false;
        }
        stockQuantity += quantity;
        return true;
    }

    /**
     * Returns the current stock; lock-free, relying on the volatile field for visibility.
     *
     * @return units currently in stock
     */
    public int getStockQuantity() {
        return stockQuantity;
    }

    /** @return identifier of the product */
    public String getProductId() {
        return productId;
    }

    /** @return display name of the product */
    public String getProductName() {
        return productName;
    }

    /** @return upper bound enforced by {@link #increaseStock(int)} */
    public int getMaxStock() {
        return maxStock;
    }
}
2.2 库存管理器 (InventoryManager)
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Registry of {@link InventoryItem}s that processes orders and keeps order counters.
 *
 * <p>Products live in a {@link ConcurrentHashMap} and the counters are {@link AtomicLong}s,
 * so the manager itself holds no locks; the per-item stock update is delegated to the item.
 */
public class InventoryManager {
    private final Map<String, InventoryItem> inventoryMap;
    private final AtomicLong totalOrders;
    private final AtomicLong successfulOrders;
    private final AtomicLong failedOrders;

    /** Creates an empty manager with all counters at zero. */
    public InventoryManager() {
        this.inventoryMap = new ConcurrentHashMap<>();
        this.totalOrders = new AtomicLong(0);
        this.successfulOrders = new AtomicLong(0);
        this.failedOrders = new AtomicLong(0);
    }

    /**
     * Registers a product, replacing any existing entry with the same product id.
     *
     * @param item product to register, keyed by {@code item.getProductId()}
     */
    public void addProduct(InventoryItem item) {
        inventoryMap.put(item.getProductId(), item);
    }

    /**
     * Attempts to take {@code quantity} units of a product out of stock for a customer.
     *
     * <p>Every call increments the total-order counter, then exactly one of the
     * successful/failed counters. An unknown product id counts as a failure but is not logged.
     *
     * @param productId  id of the product being ordered
     * @param quantity   number of units requested
     * @param customerId id of the ordering customer (used only for the log line)
     * @return {@code true} if the stock was reduced, {@code false} otherwise
     */
    public boolean processOrder(String productId, int quantity, String customerId) {
        totalOrders.incrementAndGet();
        InventoryItem item = inventoryMap.get(productId);
        if (item == null) {
            failedOrders.incrementAndGet();
            return false;
        }
        if (item.decreaseStock(quantity)) {
            successfulOrders.incrementAndGet();
            logOrder(productId, quantity, customerId, "SUCCESS");
            return true;
        }
        failedOrders.incrementAndGet();
        logOrder(productId, quantity, customerId, "FAILED - Insufficient stock");
        return false;
    }

    /**
     * Adds stock to an existing product.
     *
     * @param productId id of the product to restock
     * @param quantity  number of units to add
     * @return {@code false} if the product is unknown, otherwise the result of
     *         {@code InventoryItem.increaseStock(quantity)}
     */
    public boolean restockProduct(String productId, int quantity) {
        InventoryItem item = inventoryMap.get(productId);
        if (item == null) {
            return false;
        }
        return item.increaseStock(quantity);
    }

    /** Writes one order log line, stamped with the current time, to standard output. */
    private void logOrder(String productId, int quantity, String customerId, String status) {
        System.out.println(String.format("Order Log: Product=%s, Quantity=%d, Customer=%s, Status=%s, Time=%s",
                productId, quantity, customerId, status, new java.util.Date()));
    }

    /** Prints every product's current stock followed by the order counters. */
    public void displayInventoryStatus() {
        // Leading newline separates the report from preceding order output.
        System.out.println("\n=== Current Inventory Status ===");
        inventoryMap.forEach((id, item) ->
                System.out.println(String.format("Product: %s, Stock: %d",
                        item.getProductName(), item.getStockQuantity())));
        System.out.println(String.format("Total Orders: %d, Successful: %d, Failed: %d",
                totalOrders.get(), successfulOrders.get(), failedOrders.get()));
    }
}
2.3 订单处理任务 (OrderProcessingTask)
import java.util.Random;
/**
 * Runnable that simulates one customer placing a randomly generated order.
 *
 * <p>Each run picks a product, a quantity between 1 and 5 and a customer id, submits the
 * order to the shared {@link InventoryManager}, reports the outcome, then sleeps for a
 * random time below 200 ms to imitate processing cost.
 */
public class OrderProcessingTask implements Runnable {
    private final InventoryManager inventoryManager;
    private final String taskName;
    private final Random random = new Random();

    /**
     * @param inventoryManager manager that will receive the simulated order
     * @param taskName         label prefixed to every line this task prints
     */
    public OrderProcessingTask(InventoryManager inventoryManager, String taskName) {
        this.inventoryManager = inventoryManager;
        this.taskName = taskName;
    }

    @Override
    public void run() {
        // Build a random order: product, then quantity (1-5), then customer.
        String[] catalog = {"P001", "P002", "P003"};
        String productId = catalog[random.nextInt(catalog.length)];
        int quantity = 1 + random.nextInt(5);
        String customerId = "Customer_" + (1 + random.nextInt(1000));

        System.out.println(String.format("%s processing order: %s x%d for %s",
                taskName, productId, quantity, customerId));

        String outcomeFormat = inventoryManager.processOrder(productId, quantity, customerId)
                ? "%s: Order successful for %s x%d"
                : "%s: Order failed for %s x%d";
        System.out.println(String.format(outcomeFormat, taskName, productId, quantity));

        simulateProcessingTime();
    }

    /** Sleeps for a random time; on interruption restores the flag and reports it. */
    private void simulateProcessingTime() {
        try {
            Thread.sleep(random.nextInt(200));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println(taskName + " was interrupted");
        }
    }
}
三、线程池配置与性能优化
3.1 自定义线程工厂
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Thread factory that names its threads {@code <poolName>-thread-<n>}, numbered from 1,
 * and attaches a handler that reports uncaught exceptions on standard error.
 */
public class InventoryThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    /**
     * @param poolName pool label used as the first part of every thread name
     */
    public InventoryThreadFactory(String poolName) {
        this.namePrefix = poolName + "-thread-";
    }

    @Override
    public Thread newThread(Runnable r) {
        String threadName = namePrefix + threadNumber.getAndIncrement();
        Thread worker = new Thread(r, threadName);
        worker.setPriority(Thread.NORM_PRIORITY);
        worker.setUncaughtExceptionHandler(InventoryThreadFactory::reportUncaught);
        return worker;
    }

    /** Prints the failing thread's name and the stack trace of the escaped exception. */
    private static void reportUncaught(Thread source, Throwable failure) {
        System.err.println("Uncaught exception in thread: " + source.getName());
        failure.printStackTrace();
    }
}
3.2 拒绝策略处理器
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Rejection policy that reports the rejected task and, unless the executor has been shut
 * down, re-queues it with a blocking {@code put}.
 *
 * <p>The {@code put} runs on the thread that invoked the handler, so that thread waits
 * until the work queue has room. If the wait is interrupted the interrupt flag is restored
 * and the task is not queued.
 */
public class InventoryRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.err.println("Task rejected: " + r.toString());
        // Hook point: extra logging or alerting could be added here.
        if (executor.isShutdown()) {
            return;
        }
        try {
            // Wait for free capacity and hand the task back to the work queue.
            executor.getQueue().put(r);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            System.err.println("Rejected task handling interrupted");
        }
    }
}
四、完整系统实现与测试
4.1 主应用程序 (InventorySystemApplication)
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Demo entry point: seeds three products, runs 50 simulated orders through a bounded
 * thread pool, then prints the final inventory and thread-pool statistics.
 */
public class InventorySystemApplication {
    /**
     * Runs the simulation.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // Set up the inventory manager.
        InventoryManager inventoryManager = new InventoryManager();

        // Seed sample products: id, name, initial stock, maximum stock.
        inventoryManager.addProduct(new InventoryItem("P001", "iPhone 15", 100, 200));
        inventoryManager.addProduct(new InventoryItem("P002", "MacBook Pro", 50, 100));
        inventoryManager.addProduct(new InventoryItem("P003", "AirPods Pro", 200, 500));

        // Build the thread pool.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                10,                                          // core pool size
                20,                                          // maximum pool size
                60L, TimeUnit.SECONDS,                       // idle thread keep-alive
                new ArrayBlockingQueue<>(100),               // bounded work queue
                new InventoryThreadFactory("InventoryPool"), // thread factory
                new InventoryRejectedExecutionHandler()      // rejection policy
        );

        // Submit the order-processing tasks.
        for (int i = 1; i <= 50; i++) {
            OrderProcessingTask task = new OrderProcessingTask(
                    inventoryManager, "OrderProcessor-" + i);
            executor.execute(task);
            // Pause a random time below 100 ms between submissions to mimic real traffic.
            try {
                Thread.sleep((long) (Math.random() * 100));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }

        // Graceful shutdown: wait up to 60 s for queued work, then force-stop.
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }

        // Show the final inventory state.
        inventoryManager.displayInventoryStatus();

        // Thread-pool statistics; the leading newline separates this report from the previous one.
        System.out.println("\n=== Thread Pool Statistics ===");
        System.out.println("Core pool size: " + executor.getCorePoolSize());
        System.out.println("Maximum pool size: " + executor.getMaximumPoolSize());
        System.out.println("Completed task count: " + executor.getCompletedTaskCount());
        System.out.println("Active thread count: " + executor.getActiveCount());
    }
}
4.2 测试用例 (InventoryTest)
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
/**
 * Unit tests for {@link InventoryItem} and {@link InventoryManager}.
 *
 * <p>Each test starts from a fresh manager holding one product with 10 units in stock and a
 * maximum of 20.
 */
public class InventoryTest {
    private InventoryManager inventoryManager;
    private InventoryItem testItem;

    @BeforeEach
    void setUp() {
        inventoryManager = new InventoryManager();
        testItem = new InventoryItem("TEST001", "Test Product", 10, 20);
        inventoryManager.addProduct(testItem);
    }

    /** Removing fewer units than are in stock succeeds and lowers the stock. */
    @Test
    void testDecreaseStockSuccess() {
        assertTrue(testItem.decreaseStock(5));
        assertEquals(5, testItem.getStockQuantity());
    }

    /** Removing more units than are in stock is refused and leaves the stock untouched. */
    @Test
    void testDecreaseStockFailure() {
        assertFalse(testItem.decreaseStock(15));
        assertEquals(10, testItem.getStockQuantity());
    }

    /** Two orders placed from separate threads must both be applied to the stock. */
    @Test
    void testConcurrentOrderProcessing() throws InterruptedException {
        Thread thread1 = new Thread(
                () -> inventoryManager.processOrder("TEST001", 3, "Customer1"));
        Thread thread2 = new Thread(
                () -> inventoryManager.processOrder("TEST001", 4, "Customer2"));
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        // 3 + 4 fits in the initial 10, so both orders succeed in either order and exactly
        // 3 units must remain; a lost update would leave 6 or 7.
        assertEquals(3, testItem.getStockQuantity());
    }

    /** Restocking works up to the maximum and is refused beyond it. */
    @Test
    void testRestockFunctionality() {
        assertTrue(inventoryManager.restockProduct("TEST001", 5));
        assertEquals(15, testItem.getStockQuantity());
        assertFalse(inventoryManager.restockProduct("TEST001", 10)); // would exceed the maximum of 20
        assertEquals(15, testItem.getStockQuantity());
    }
}
五、高级特性与优化建议
5.1 使用ReadWriteLock优化读操作
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * Stock record guarded by a {@link ReadWriteLock}: queries take the read lock and stock
 * changes take the write lock.
 */
public class OptimizedInventoryItem {
    private final String productId;
    private int stockQuantity;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /**
     * Creates a stock record.
     *
     * @param productId    identifier of the product
     * @param initialStock quantity on hand at creation time
     */
    public OptimizedInventoryItem(String productId, int initialStock) {
        this.productId = productId;
        this.stockQuantity = initialStock;
    }

    /** @return identifier of the product */
    public String getProductId() {
        return productId;
    }

    /**
     * Reads the current stock under the read lock.
     *
     * @return units currently in stock
     */
    public int getStockQuantity() {
        lock.readLock().lock();
        try {
            return stockQuantity;
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Removes {@code quantity} units from stock under the write lock.
     *
     * @param quantity number of units to remove; must be positive
     * @return {@code true} if the stock was reduced, {@code false} if {@code quantity} is not
     *         positive or exceeds the current stock (stock is left unchanged in that case)
     */
    public boolean decreaseStock(int quantity) {
        lock.writeLock().lock();
        try {
            if (quantity <= 0 || stockQuantity < quantity) {
                return false;
            }
            stockQuantity -= quantity;
            return true;
        } finally {
            // Always release, including on the early-return paths above.
            lock.writeLock().unlock();
        }
    }
}
5.2 使用CompletableFuture进行异步处理
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Asynchronous front end for {@link InventoryManager}: orders are submitted to a private
 * fixed pool of four threads and the caller receives a {@link CompletableFuture}.
 *
 * <p>Call {@link #shutdown()} when the service is no longer needed so the pool's worker
 * threads can exit.
 */
public class AsyncInventoryService {
    private final InventoryManager inventoryManager;
    private final ExecutorService asyncExecutor;

    /**
     * @param inventoryManager manager that will process the submitted orders
     */
    public AsyncInventoryService(InventoryManager inventoryManager) {
        this.inventoryManager = inventoryManager;
        this.asyncExecutor = Executors.newFixedThreadPool(4);
    }

    /**
     * Submits one order for processing on the service's executor.
     *
     * @param productId  id of the product being ordered
     * @param quantity   number of units requested
     * @param customerId id of the ordering customer
     * @return future completing with the result of {@code InventoryManager.processOrder}
     */
    public CompletableFuture<Boolean> processOrderAsync(String productId, int quantity, String customerId) {
        return CompletableFuture.supplyAsync(
                () -> inventoryManager.processOrder(productId, quantity, customerId),
                asyncExecutor);
    }

    /**
     * Submits every order in the list and returns a future that completes once all of them
     * have finished. The per-order results are not exposed by the returned future.
     *
     * @param orders orders to submit
     *               (NOTE(review): {@code Order} is not defined in this snippet; it is
     *               assumed to expose getProductId(), getQuantity() and getCustomerId())
     * @return future that completes when every submitted order has completed
     */
    public CompletableFuture<Void> batchProcessOrders(List<Order> orders) {
        CompletableFuture<?>[] pending = orders.stream()
                .map(order -> processOrderAsync(
                        order.getProductId(),
                        order.getQuantity(),
                        order.getCustomerId()))
                .toArray(CompletableFuture<?>[]::new);
        return CompletableFuture.allOf(pending);
    }

    /**
     * Stops accepting new orders; orders already submitted still run to completion.
     */
    public void shutdown() {
        asyncExecutor.shutdown();
    }
}
六、总结与最佳实践
通过本实战教程,我们构建了一个完整的Java多线程库存管理系统,涵盖了以下核心内容:
- 线程安全设计:使用synchronized关键字和ConcurrentHashMap保证数据一致性
- 线程池配置:合理配置线程池参数,使用自定义线程工厂和拒绝策略
- 性能优化:使用ReadWriteLock分离读写操作,提高并发性能
- 异步处理:利用CompletableFuture实现非阻塞操作
- 异常处理:完善的异常处理和线程中断机制
最佳实践建议:
- 根据业务场景合理选择同步机制(synchronized vs Lock)
- 使用线程池而不是直接创建线程
- 注意避免死锁和资源竞争
- 实现合适的监控和日志记录
- 进行充分的并发测试
这个库存管理系统可以作为电商平台的基础组件,通过进一步的扩展可以支持分布式环境、数据库持久化、缓存优化等高级特性。