针对类似于“秋天的第一杯奶茶”导致的爆单问题场景,设计并实现 高峰期商户动态限流与排队机制:基于 Redis 阈值控制 + RocketMQ 异步队列,超过阈值的订单进入排队并动态释放,前端实时展示等待进度,有效降低店员骑手负载,提升用户满意度,相比单纯 Redis 队列方案可靠性更高。
一、核心目标与整体流程
核心目标:在商户处理能力有限的情况下,确保订单不丢失、按顺序处理,同时让用户感知等待进度。
整体流程:
用户下单 → 订单服务(生成订单)→ 限流判断(Redis)→ ① 可处理 → 直接处理
→ ② 已爆单 → 进入RocketMQ队列 → 消费者按序处理
系统流程说明
- 下单流程:
- 用户下单请求到达订单服务
- 订单服务调用限流组件判断是否可以直接处理
- 未达阈值:直接处理订单并更新状态
- 已达阈值:将订单状态设为 “排队中”,发送到 RocketMQ 队列
- 订单处理流程:
- 消费者从队列中按顺序获取订单
- 尝试获取处理权限,成功则处理订单
- 失败则挂起队列 30 秒后重试
- 超过最大重试次数则进入死信队列
- 动态调整流程:
- 管理员通过后台调整商户阈值
- 阈值变更实时生效,影响后续的订单处理判断
- 系统根据新阈值自动调整订单处理速度
二、关键模块实现逻辑
1. 限流判断模块(基于 Redis)
核心逻辑:用 Redis 记录每个商户的 “当前活跃订单数”,与预设阈值对比,决定订单是否需要排队。
- 分布式计数:
使用 Redis 的INCR命令原子性递增 “活跃订单数”(键格式:merchant:active:orders:{merchantId}),确保多实例部署时计数准确。java运行// 尝试获取处理权限:递增后判断是否超过阈值 Long currentActive = redisTemplate.opsForValue().increment(key, 1); if (currentActive <= threshold) { /* 可处理 */ } else { /* 回退计数,进入排队 */ } - 动态阈值管理:
阈值存储在 Redis 中(键格式:merchant:threshold:{merchantId}),支持实时调整(如运营手动修改),无需重启系统。- 新订单到来时,优先从 Redis 读取商户专属阈值;无配置时使用默认阈值。
- 提供
updateThreshold方法,供管理后台调用更新阈值。
- 资源释放:
订单处理完成 / 取消时,调用release方法递减活跃订单数,避免计数堆积。
2. 订单排队模块(RocketMQ 生产者)
核心逻辑:超过阈值的订单进入 MQ 队列,通过 “商户 ID 分区” 保证同一商户的订单有序性。
- 有序性保证:
发送订单时,使用MessageQueueSelector按 “商户 ID % 队列数” 映射到固定队列,确保同一商户的订单始终进入同一个队列(RocketMQ 单队列内消息严格 FIFO)。java运行// 按商户ID分区,同一商户订单进入固定队列 return mqs.get((int) (merchantId % mqs.size())); - 消息包装:
订单消息包含order(订单详情)、retryCount(重试次数)、createTime(创建时间),便于后续重试和顺序校验。
3. 顺序处理模块(RocketMQ 消费者)
核心逻辑:从队列中按顺序取单,处理失败时固定间隔重试,确保老订单优先处理。
- 顺序消费:
使用MessageListenerOrderly(顺序消息监听器),保证同一队列内的订单按发送顺序处理(单线程消费,避免并发打乱顺序)。 - 有限次重试:
- 处理失败时,通过
context.setSuspendCurrentQueueTimeMillis(30000)让队列挂起 30 秒,而非阻塞线程(提升资源利用率)。 - 重试次数累加(
retryCount = 消息自带计数 + 本地计数),超过 60 次则进入死信队列(避免无限循环)。
- 处理失败时,通过
- 死信处理:
超过最大重试次数的订单被发送到死信队列(ORDER_DLQ_TOPIC),等待人工干预(如商户长期爆单时手动处理)。
4. 订单服务与业务整合
核心逻辑:串联限流、排队、处理流程,同时向前端提供实时状态。
- 下单入口:
用户下单时,订单服务先调用限流组件:- 未超阈值:直接处理订单(更新状态为 “处理中”)。
- 已超阈值:将订单状态设为 “排队中”,发送到 MQ 队列,并返回排队位置(通过 RocketMQ 的队列偏移量计算)。
- 状态同步:
提供getOrderStatus接口,前端可查询订单是否在排队、当前位置、预计等待时间(基于队列长度和商户处理能力估算)。
三、关键技术特性与优势
- 绝对顺序性:
同一商户的订单通过 “固定队列 + 顺序消费” 保证老订单永远在新订单前处理,避免插队。 - 动态适配:
阈值可实时调整(如高峰期临时提高热门商户阈值),订单处理速度随阈值自动变化。 - 高可靠性:
- RocketMQ 消息持久化避免订单丢失;
- 死信队列机制兜底异常订单;
- Redis 计数自带过期时间,避免内存泄漏。
- 用户体验优化:
前端可实时展示排队位置和预计等待时间,减少用户焦虑。
四、总结
该系统通过 “Redis 限流 + MQ 有序队列” 的组合,既解决了分布式环境下的计数一致性问题,又保证了订单的顺序性和可靠性。核心亮点在于:
- 用分区队列替代全局队列,实现 “单商户顺序 + 多商户隔离”;
- 用队列挂起替代线程阻塞,平衡顺序性和资源利用率;
- 用动态阈值应对流量波动,兼顾商户负载和用户体验。
这种设计特别适合突发性流量场景,既能保护商户系统不被冲垮,又能让用户清晰感知等待状态,是电商领域爆单问题的典型解决方案。
五、具体实现:
1. 限流组件(MerchantRateLimiter)
基于 Redis 实现分布式环境下的商户订单计数和阈值控制,支持动态调整阈值。
@Component
public class MerchantRateLimiter {
private final StringRedisTemplate redisTemplate;
// 商户阈值映射(从配置中心或数据库加载)
private final Map<Long, Integer> merchantThresholds;
// 默认阈值(未配置的商户使用)
private final int defaultThreshold;
// Redis中活跃订单数的key前缀
private static final String ACTIVE_ORDERS_KEY_PREFIX = "merchant:active:orders:";
// Redis中商户阈值的key前缀(支持动态更新)
private static final String THRESHOLD_KEY_PREFIX = "merchant:threshold:";
// 构造函数:注入Redis和初始阈值配置
public MerchantRateLimiter(
StringRedisTemplate redisTemplate,
@Value("#{${merchant.thresholds:{}}}") Map<Long, Integer> merchantThresholds,
@Value("${merchant.default-threshold:100}") int defaultThreshold
) {
this.redisTemplate = redisTemplate;
this.merchantThresholds = merchantThresholds;
this.defaultThreshold = defaultThreshold;
// 初始化时将阈值加载到Redis,支持后续动态更新
initThresholdsToRedis();
}
/**
* 初始化时将商户阈值加载到Redis
*/
private void initThresholdsToRedis() {
for (Map.Entry<Long, Integer> entry : merchantThresholds.entrySet()) {
redisTemplate.opsForValue().set(
THRESHOLD_KEY_PREFIX + entry.getKey(),
entry.getValue().toString()
);
}
}
/**
* 尝试获取商户订单处理权限
* @param merchantId 商户ID
* @return true:未达阈值(可处理);false:已爆单(需排队)
*/
public boolean tryAcquire(Long merchantId) {
// 1. 获取该商户的当前阈值(优先从Redis获取,支持动态更新)
int threshold = getCurrentThreshold(merchantId);
// 2. 生成Redis计数键
String key = ACTIVE_ORDERS_KEY_PREFIX + merchantId;
// 3. 原子递增并获取当前活跃订单数(Redis保证并发安全)
Long currentActive = redisTemplate.opsForValue().increment(key, 1);
// 4. 首次计数时设置过期时间(24小时,避免无效key堆积)
if (currentActive != null && currentActive == 1) {
redisTemplate.expire(key, 24, TimeUnit.HOURS);
}
// 5. 判断是否超过阈值:未超过则保留计数,超过则回退
if (currentActive != null && currentActive <= threshold) {
return true; // 允许处理
} else {
redisTemplate.opsForValue().decrement(key, 1); // 回退计数
return false; // 触发限流
}
}
/**
* 释放商户订单处理容量(订单完成/取消时调用)
* @param merchantId 商户ID
*/
public void release(Long merchantId) {
String key = ACTIVE_ORDERS_KEY_PREFIX + merchantId;
Long currentActive = redisTemplate.opsForValue().decrement(key, 1);
// 若计数归0,删除key节省内存
if (currentActive != null && currentActive <= 0) {
redisTemplate.delete(key);
}
}
/**
* 重置商户活跃订单数(用于异常场景恢复)
*/
public void reset(Long merchantId) {
redisTemplate.delete(ACTIVE_ORDERS_KEY_PREFIX + merchantId);
}
/**
* 获取商户当前阈值(支持动态更新)
*/
public int getCurrentThreshold(Long merchantId) {
String thresholdStr = redisTemplate.opsForValue().get(THRESHOLD_KEY_PREFIX + merchantId);
if (thresholdStr != null) {
return Integer.parseInt(thresholdStr);
}
// 若Redis中无值,使用默认阈值
return defaultThreshold;
}
/**
* 动态更新商户阈值(供管理后台调用)
*/
public void updateThreshold(Long merchantId, int newThreshold) {
redisTemplate.opsForValue().set(
THRESHOLD_KEY_PREFIX + merchantId,
String.valueOf(newThreshold)
);
}
/**
* 获取商户当前活跃订单数(用于前端展示)
*/
public int getCurrentActiveOrders(Long merchantId) {
String value = redisTemplate.opsForValue().get(ACTIVE_ORDERS_KEY_PREFIX + merchantId);
return value != null ? Integer.parseInt(value) : 0;
}
}
2. 消息生产者(OrderQueueProducer)
将超出阈值的订单发送到 RocketMQ,确保同一商户的订单进入固定队列,保证顺序性。
@Component
public class OrderQueueProducer {
private final RocketMQTemplate rocketMQTemplate;
private final String orderTopic;
// 从配置文件读取主题名称
public OrderQueueProducer(
RocketMQTemplate rocketMQTemplate,
@Value("${rocketmq.order-topic:MERCHANT_ORDER_QUEUE}") String orderTopic
) {
this.rocketMQTemplate = rocketMQTemplate;
this.orderTopic = orderTopic;
}
/**
* 发送订单到MQ队列
* @param order 订单对象
*/
public void sendToQueue(Order order) {
// 包装成消息对象,包含重试计数
OrderMessage message = new OrderMessage(order);
// 使用convertAndSend方法,自动序列化
rocketMQTemplate.convertAndSend(
orderTopic,
message,
order.getMerchantId(), // 分区键:商户ID
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message<?> msg, Object arg) {
// 按商户ID哈希映射到固定队列,确保同一商户订单顺序
Long merchantId = (Long) arg;
return mqs.get((int) (merchantId % mqs.size()));
}
}
);
}
/**
* 获取商户当前排队数量(用于前端展示)
*/
public long getQueueSize(Long merchantId) throws MQClientException {
// 获取指定主题的所有队列
Set<MessageQueue> mqs = rocketMQTemplate.getProducer().fetchSubscribeMessageQueues(orderTopic);
long totalSize = 0;
for (MessageQueue mq : mqs) {
// 只统计当前商户对应的队列
if (mq.getQueueId() == (int) (merchantId % mqs.size())) {
// 获取队列消息数量
long offset = rocketMQTemplate.getProducer().maxOffset(mq);
long consumerOffset = rocketMQTemplate.getConsumer().fetchConsumeOffset(mq, true);
totalSize = offset - consumerOffset;
break;
}
}
return totalSize;
}
}
3.消息消费者(OrderQueueConsumer)
从 RocketMQ 消费订单,按顺序处理,支持有限次重试和固定间隔。(此处设计为订单最多排队30min,否则进入死信队列)
@Component
public class OrderQueueConsumer {
private final DefaultMQPushConsumer consumer;
private final MerchantRateLimiter rateLimiter;
private final OrderProcessor orderProcessor;
private final RocketMQTemplate rocketMQTemplate;
private static final int MAX_RETRY = 60; // 最大重试次数
private static final long RETRY_INTERVAL = 30000L; // 30秒重试间隔
public OrderQueueConsumer(
@Value("${rocketmq.name-server}") String nameServer,
@Value("${rocketmq.order-topic}") String orderTopic,
@Value("${rocketmq.consumer-group}") String consumerGroup,
MerchantRateLimiter rateLimiter,
OrderProcessor orderProcessor,
RocketMQTemplate rocketMQTemplate
) throws MQClientException {
this.rateLimiter = rateLimiter;
this.orderProcessor = orderProcessor;
this.rocketMQTemplate = rocketMQTemplate;
// 初始化消费者
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(nameServer);
consumer.subscribe(orderTopic, "*");
// 设置消费模式为集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册顺序消息监听器(保证队列内消息顺序)
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
MessageExt msg = msgs.get(0);
OrderMessage orderMessage = JSON.parseObject(msg.getBody(), OrderMessage.class);
Order order = orderMessage.getOrder();
Long merchantId = order.getMerchantId();
// 重试次数累加(结合消息自带的重试计数)
int retryCount = orderMessage.getRetryCount() + msg.getReconsumeTimes();
orderMessage.setRetryCount(retryCount);
// 超过最大重试次数,进入死信队列
if (retryCount >= MAX_RETRY) {
sendToDeadLetterQueue(order);
return ConsumeOrderlyStatus.SUCCESS; // 提交位点,移除消息
}
// 尝试获取商家限流令牌
if (rateLimiter.tryAcquire(merchantId)) {
try {
// 处理订单
orderProcessor.process(order);
return ConsumeOrderlyStatus.SUCCESS; // 成功处理,提交位点
} finally {
// 释放令牌
rateLimiter.release(merchantId);
}
} else {
// 商家订单处理受限,挂起队列30s后重试
context.setSuspendCurrentQueueTimeMillis(RETRY_INTERVAL);
System.out.printf("商家%d的订单%s达到速率限制,挂起队列30s后重试,重试次数:%d%n",
merchantId, order.getId(), retryCount);
// 返回挂起状态,保证单商家顺序
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
});
// 启动消费者
consumer.start();
}
/**
* 发送到死信队列
*/
private void sendToDeadLetterQueue(Order order) {
System.err.println("订单" + order.getId() + "超过最大重试次数,已进入死信队列");
// 实际应用中可以发送到专门的死信主题
rocketMQTemplate.convertAndSend("ORDER_DLQ_TOPIC", order);
}
@PreDestroy
public void destroy() {
if (consumer != null) {
consumer.shutdown();
}
}
}
4.订单处理器(OrderProcessor)
处理订单的实际业务逻辑,如通知商户、更新库存等。
@Service
public class OrderProcessor {
private final OrderRepository orderRepository;
private final MerchantService merchantService;
private final NotificationService notificationService;
// 构造函数注入依赖
public OrderProcessor(OrderRepository orderRepository,
MerchantService merchantService,
NotificationService notificationService) {
this.orderRepository = orderRepository;
this.merchantService = merchantService;
this.notificationService = notificationService;
}
/**
* 处理订单的核心业务逻辑
*/
public void process(Order order) {
try {
// 1. 更新订单状态为处理中
order.setStatus("PROCESSING");
orderRepository.updateById(order);
// 2. 检查库存
boolean stockAvailable = merchantService.checkAndDeductStock(
order.getMerchantId(),
order.getProduct()
);
if (!stockAvailable) {
// 库存不足,更新订单状态
order.setStatus("OUT_OF_STOCK");
orderRepository.updateById(order);
// 通知用户
notificationService.notifyUser(order.getUserId(),
"订单" + order.getId() + "因库存不足无法处理");
return;
}
// 3. 通知商户有新订单
merchantService.notifyNewOrder(order);
// 4. 更新订单状态为已接受
order.setStatus("ACCEPTED");
orderRepository.updateById(order);
// 5. 通知用户订单已受理
notificationService.notifyUser(order.getUserId(),
"订单" + order.getId() + "已受理,正在制作中");
} catch (Exception e) {
// 处理异常,更新订单状态
order.setStatus("PROCESSING_FAILED");
orderRepository.updateById(order);
throw new RuntimeException("订单处理失败: " + e.getMessage(), e);
}
}
}
5.订单服务(OrderService)
整合限流逻辑和订单处理流程,是系统的入口点。
@Service
public class OrderService {
private final MerchantRateLimiter rateLimiter;
private final OrderQueueProducer queueProducer;
private final OrderRepository orderRepository;
private final IdGenerator idGenerator;
// 构造函数注入依赖
public OrderService(MerchantRateLimiter rateLimiter,
OrderQueueProducer queueProducer,
OrderRepository orderRepository,
IdGenerator idGenerator) {
this.rateLimiter = rateLimiter;
this.queueProducer = queueProducer;
this.orderRepository = orderRepository;
this.idGenerator = idGenerator;
}
/**
* 用户下单的核心流程
*/
@Transactional
public OrderDTO createOrder(OrderCreateRequest request) {
// 1. 生成订单
Order order = buildOrder(request);
orderRepository.insert(order);
// 2. 调用限流组件判断是否可直接处理
if (rateLimiter.tryAcquire(order.getMerchantId())) {
try {
// 3. 直接处理订单
order.setStatus("PROCESSING");
orderRepository.updateById(order);
// 4. 调用订单处理器处理订单
orderProcessor.process(order);
// 5. 返回订单信息
return convertToDTO(order);
} finally {
// 6. 释放容量
rateLimiter.release(order.getMerchantId());
}
} else {
// 7. 已爆单,发送到MQ排队
order.setStatus("PENDING");
orderRepository.updateById(order);
// 8. 发送到队列
queueProducer.sendToQueue(order);
// 9. 获取排队位置信息
long queueSize;
try {
queueSize = queueProducer.getQueueSize(order.getMerchantId());
} catch (MQClientException e) {
// 获取排队数量失败时的降级处理
queueSize = -1;
}
// 10. 返回订单信息和排队位置
OrderDTO dto = convertToDTO(order);
dto.setQueuePosition(queueSize);
dto.setEstimatedWaitingTime(calculateWaitingTime(queueSize, order.getMerchantId()));
return dto;
}
}
/**
* 构建订单对象
*/
private Order buildOrder(OrderCreateRequest request) {
Order order = new Order();
order.setId(idGenerator.generate());
order.setMerchantId(request.getMerchantId());
order.setUserId(request.getUserId());
order.setProduct(request.getProduct());
order.setCreateTime(LocalDateTime.now());
order.setStatus("CREATED");
// 设置其他订单属性...
return order;
}
/**
* 转换为DTO对象
*/
private OrderDTO convertToDTO(Order order) {
OrderDTO dto = new OrderDTO();
BeanUtils.copyProperties(order, dto);
// 其他转换逻辑...
return dto;
}
/**
* 估算等待时间
*/
private long calculateWaitingTime(long queueSize, Long merchantId) {
if (queueSize <= 0) {
return 0;
}
// 根据商户处理能力估算等待时间(秒)
int threshold = rateLimiter.getCurrentThreshold(merchantId);
// 假设每个订单平均处理时间为2分钟
return (long) (queueSize * 120.0 / threshold);
}
/**
* 获取订单状态和等待进度(供前端查询)
*/
public OrderStatusDTO getOrderStatus(String orderId) {
Order order = orderRepository.selectById(orderId);
if (order == null) {
throw new OrderNotFoundException("订单不存在: " + orderId);
}
OrderStatusDTO statusDTO = new OrderStatusDTO();
statusDTO.setOrderId(orderId);
statusDTO.setStatus(order.getStatus());
// 如果订单仍在排队,查询排队位置
if ("PENDING".equals(order.getStatus())) {
try {
long queueSize = queueProducer.getQueueSize(order.getMerchantId());
statusDTO.setQueuePosition(queueSize);
statusDTO.setEstimatedWaitingTime(
calculateWaitingTime(queueSize, order.getMerchantId())
);
} catch (MQClientException e) {
// 异常处理
}
}
return statusDTO;
}
}
6.配置文件:
# 商户阈值配置
merchant:
thresholds:
1001: 200 # 商户1001的阈值为200
1002: 150 # 商户1002的阈值为150
default-threshold: 100 # 未配置商户的默认阈值
# 自定义RocketMQ主题
rocketmq:
order-topic: MERCHANT_ORDER_QUEUE
dlq-topic: ORDER_DLQ_TOPIC
