“秋天第一杯奶茶”的爆单限流问题:

针对类似于“秋天的第一杯奶茶”导致的爆单问题场景,设计并实现 高峰期商户动态限流与排队机制:基于 Redis 阈值控制 + RocketMQ 异步队列,超过阈值的订单进入排队并动态释放,前端实时展示等待进度,有效降低店员骑手负载,提升用户满意度,相比单纯 Redis 队列方案可靠性更高。

一、核心目标与整体流程

核心目标:在商户处理能力有限的情况下,确保订单不丢失、按顺序处理,同时让用户感知等待进度。
整体流程

用户下单 → 订单服务(生成订单)→ 限流判断(Redis)→ ① 可处理 → 直接处理  
                                          → ② 已爆单 → 进入RocketMQ队列 → 消费者按序处理

系统流程说明

  1. 下单流程
    • 用户下单请求到达订单服务
    • 订单服务调用限流组件判断是否可以直接处理
    • 未达阈值:直接处理订单并更新状态
    • 已达阈值:将订单状态设为 “排队中”,发送到 RocketMQ 队列
  2. 订单处理流程
    • 消费者从队列中按顺序获取订单
    • 尝试获取处理权限,成功则处理订单
    • 失败则挂起队列 30 秒后重试
    • 超过最大重试次数则进入死信队列
  3. 动态调整流程
    • 管理员通过后台调整商户阈值
    • 阈值变更实时生效,影响后续的订单处理判断
    • 系统根据新阈值自动调整订单处理速度

二、关键模块实现逻辑

1. 限流判断模块(基于 Redis)

核心逻辑:用 Redis 记录每个商户的 “当前活跃订单数”,与预设阈值对比,决定订单是否需要排队。

  • 分布式计数
    使用 Redis 的INCR命令原子性递增 “活跃订单数”(键格式:merchant:active:orders:{merchantId}),确保多实例部署时计数准确。java运行// 尝试获取处理权限:递增后判断是否超过阈值 Long currentActive = redisTemplate.opsForValue().increment(key, 1); if (currentActive <= threshold) { /* 可处理 */ } else { /* 回退计数,进入排队 */ }
  • 动态阈值管理
    阈值存储在 Redis 中(键格式:merchant:threshold:{merchantId}),支持实时调整(如运营手动修改),无需重启系统。
    • 新订单到来时,优先从 Redis 读取商户专属阈值;无配置时使用默认阈值。
    • 提供updateThreshold方法,供管理后台调用更新阈值。
  • 资源释放
    订单处理完成 / 取消时,调用release方法递减活跃订单数,避免计数堆积。

2. 订单排队模块(RocketMQ 生产者)

核心逻辑:超过阈值的订单进入 MQ 队列,通过 “商户 ID 分区” 保证同一商户的订单有序性。

  • 有序性保证
    发送订单时,使用MessageQueueSelector按 “商户 ID % 队列数” 映射到固定队列,确保同一商户的订单始终进入同一个队列(RocketMQ 单队列内消息严格 FIFO)。java运行// 按商户ID分区,同一商户订单进入固定队列 return mqs.get((int) (merchantId % mqs.size()));
  • 消息包装
    订单消息包含order(订单详情)、retryCount(重试次数)、createTime(创建时间),便于后续重试和顺序校验。

3. 顺序处理模块(RocketMQ 消费者)

核心逻辑:从队列中按顺序取单,处理失败时固定间隔重试,确保老订单优先处理。

  • 顺序消费
    使用MessageListenerOrderly(顺序消息监听器),保证同一队列内的订单按发送顺序处理(单线程消费,避免并发打乱顺序)。
  • 有限次重试
    • 处理失败时,通过context.setSuspendCurrentQueueTimeMillis(30000)让队列挂起 30 秒,而非阻塞线程(提升资源利用率)。
    • 重试次数累加(retryCount = 消息自带计数 + 本地计数),超过 60 次则进入死信队列(避免无限循环)。
  • 死信处理
    超过最大重试次数的订单被发送到死信队列(ORDER_DLQ_TOPIC),等待人工干预(如商户长期爆单时手动处理)。

4. 订单服务与业务整合

核心逻辑:串联限流、排队、处理流程,同时向前端提供实时状态。

  • 下单入口
    用户下单时,订单服务先调用限流组件:
    • 未超阈值:直接处理订单(更新状态为 “处理中”)。
    • 已超阈值:将订单状态设为 “排队中”,发送到 MQ 队列,并返回排队位置(通过 RocketMQ 的队列偏移量计算)。
  • 状态同步
    提供getOrderStatus接口,前端可查询订单是否在排队、当前位置、预计等待时间(基于队列长度和商户处理能力估算)。

三、关键技术特性与优势

  1. 绝对顺序性
    同一商户的订单通过 “固定队列 + 顺序消费” 保证老订单永远在新订单前处理,避免插队。
  2. 动态适配
    阈值可实时调整(如高峰期临时提高热门商户阈值),订单处理速度随阈值自动变化。
  3. 高可靠性
    • RocketMQ 消息持久化避免订单丢失;
    • 死信队列机制兜底异常订单;
    • Redis 计数自带过期时间,避免内存泄漏。
  4. 用户体验优化
    前端可实时展示排队位置和预计等待时间,减少用户焦虑。

四、总结

该系统通过 “Redis 限流 + MQ 有序队列” 的组合,既解决了分布式环境下的计数一致性问题,又保证了订单的顺序性和可靠性。核心亮点在于:

  • 分区队列替代全局队列,实现 “单商户顺序 + 多商户隔离”;
  • 队列挂起替代线程阻塞,平衡顺序性和资源利用率;
  • 动态阈值应对流量波动,兼顾商户负载和用户体验。

这种设计特别适合突发性流量场景,既能保护商户系统不被冲垮,又能让用户清晰感知等待状态,是电商领域爆单问题的典型解决方案。

五、具体实现:

1. 限流组件(MerchantRateLimiter)

基于 Redis 实现分布式环境下的商户订单计数和阈值控制,支持动态调整阈值。

@Component
public class MerchantRateLimiter {
    private final StringRedisTemplate redisTemplate;
    // 商户阈值映射(从配置中心或数据库加载)
    private final Map<Long, Integer> merchantThresholds;
    // 默认阈值(未配置的商户使用)
    private final int defaultThreshold;
    // Redis中活跃订单数的key前缀
    private static final String ACTIVE_ORDERS_KEY_PREFIX = "merchant:active:orders:";
    // Redis中商户阈值的key前缀(支持动态更新)
    private static final String THRESHOLD_KEY_PREFIX = "merchant:threshold:";

    // 构造函数:注入Redis和初始阈值配置
    public MerchantRateLimiter(
            StringRedisTemplate redisTemplate,
            @Value("#{${merchant.thresholds:{}}}") Map<Long, Integer> merchantThresholds,
            @Value("${merchant.default-threshold:100}") int defaultThreshold
    ) {
        this.redisTemplate = redisTemplate;
        this.merchantThresholds = merchantThresholds;
        this.defaultThreshold = defaultThreshold;
        
        // 初始化时将阈值加载到Redis,支持后续动态更新
        initThresholdsToRedis();
    }
    
    /**
     * 初始化时将商户阈值加载到Redis
     */
    private void initThresholdsToRedis() {
        for (Map.Entry<Long, Integer> entry : merchantThresholds.entrySet()) {
            redisTemplate.opsForValue().set(
                THRESHOLD_KEY_PREFIX + entry.getKey(), 
                entry.getValue().toString()
            );
        }
    }

    /**
     * 尝试获取商户订单处理权限
     * @param merchantId 商户ID
     * @return true:未达阈值(可处理);false:已爆单(需排队)
     */
    public boolean tryAcquire(Long merchantId) {
        // 1. 获取该商户的当前阈值(优先从Redis获取,支持动态更新)
        int threshold = getCurrentThreshold(merchantId);
        
        // 2. 生成Redis计数键
        String key = ACTIVE_ORDERS_KEY_PREFIX + merchantId;
        
        // 3. 原子递增并获取当前活跃订单数(Redis保证并发安全)
        Long currentActive = redisTemplate.opsForValue().increment(key, 1);
        
        // 4. 首次计数时设置过期时间(24小时,避免无效key堆积)
        if (currentActive != null && currentActive == 1) {
            redisTemplate.expire(key, 24, TimeUnit.HOURS);
        }
        
        // 5. 判断是否超过阈值:未超过则保留计数,超过则回退
        if (currentActive != null && currentActive <= threshold) {
            return true; // 允许处理
        } else {
            redisTemplate.opsForValue().decrement(key, 1); // 回退计数
            return false; // 触发限流
        }
    }

    /**
     * 释放商户订单处理容量(订单完成/取消时调用)
     * @param merchantId 商户ID
     */
    public void release(Long merchantId) {
        String key = ACTIVE_ORDERS_KEY_PREFIX + merchantId;
        Long currentActive = redisTemplate.opsForValue().decrement(key, 1);
        // 若计数归0,删除key节省内存
        if (currentActive != null && currentActive <= 0) {
            redisTemplate.delete(key);
        }
    }

    /**
     * 重置商户活跃订单数(用于异常场景恢复)
     */
    public void reset(Long merchantId) {
        redisTemplate.delete(ACTIVE_ORDERS_KEY_PREFIX + merchantId);
    }
    
    /**
     * 获取商户当前阈值(支持动态更新)
     */
    public int getCurrentThreshold(Long merchantId) {
        String thresholdStr = redisTemplate.opsForValue().get(THRESHOLD_KEY_PREFIX + merchantId);
        if (thresholdStr != null) {
            return Integer.parseInt(thresholdStr);
        }
        // 若Redis中无值,使用默认阈值
        return defaultThreshold;
    }
    
    /**
     * 动态更新商户阈值(供管理后台调用)
     */
    public void updateThreshold(Long merchantId, int newThreshold) {
        redisTemplate.opsForValue().set(
            THRESHOLD_KEY_PREFIX + merchantId, 
            String.valueOf(newThreshold)
        );
    }
    
    /**
     * 获取商户当前活跃订单数(用于前端展示)
     */
    public int getCurrentActiveOrders(Long merchantId) {
        String value = redisTemplate.opsForValue().get(ACTIVE_ORDERS_KEY_PREFIX + merchantId);
        return value != null ? Integer.parseInt(value) : 0;
    }
}

2. 消息生产者(OrderQueueProducer)

将超出阈值的订单发送到 RocketMQ,确保同一商户的订单进入固定队列,保证顺序性。

@Component
public class OrderQueueProducer {
    private final RocketMQTemplate rocketMQTemplate;
    private final String orderTopic;
    
    // 从配置文件读取主题名称
    public OrderQueueProducer(
            RocketMQTemplate rocketMQTemplate, 
            @Value("${rocketmq.order-topic:MERCHANT_ORDER_QUEUE}") String orderTopic
    ) {
        this.rocketMQTemplate = rocketMQTemplate;
        this.orderTopic = orderTopic;
    }

    /**
     * 发送订单到MQ队列
     * @param order 订单对象
     */
    public void sendToQueue(Order order) {
        // 包装成消息对象,包含重试计数
        OrderMessage message = new OrderMessage(order);
        
        // 使用convertAndSend方法,自动序列化
        rocketMQTemplate.convertAndSend(
            orderTopic,
            message,
            order.getMerchantId(),  // 分区键:商户ID
            new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message<?> msg, Object arg) {
                    // 按商户ID哈希映射到固定队列,确保同一商户订单顺序
                    Long merchantId = (Long) arg;
                    return mqs.get((int) (merchantId % mqs.size()));
                }
            }
        );
    }
    
    /**
     * 获取商户当前排队数量(用于前端展示)
     */
    public long getQueueSize(Long merchantId) throws MQClientException {
        // 获取指定主题的所有队列
        Set<MessageQueue> mqs = rocketMQTemplate.getProducer().fetchSubscribeMessageQueues(orderTopic);
        
        long totalSize = 0;
        for (MessageQueue mq : mqs) {
            // 只统计当前商户对应的队列
            if (mq.getQueueId() == (int) (merchantId % mqs.size())) {
                // 获取队列消息数量
                long offset = rocketMQTemplate.getProducer().maxOffset(mq);
                long consumerOffset = rocketMQTemplate.getConsumer().fetchConsumeOffset(mq, true);
                totalSize = offset - consumerOffset;
                break;
            }
        }
        return totalSize;
    }
}

3.消息消费者(OrderQueueConsumer)

从 RocketMQ 消费订单,按顺序处理,支持有限次重试和固定间隔。(此处设计为订单最多排队30min,否则进入死信队列)

@Component
public class OrderQueueConsumer {
    private final DefaultMQPushConsumer consumer;
    private final MerchantRateLimiter rateLimiter;
    private final OrderProcessor orderProcessor;
    private final RocketMQTemplate rocketMQTemplate;
    private static final int MAX_RETRY = 60; // 最大重试次数
    private static final long RETRY_INTERVAL = 30000L; // 30秒重试间隔

    public OrderQueueConsumer(
            @Value("${rocketmq.name-server}") String nameServer,
            @Value("${rocketmq.order-topic}") String orderTopic,
            @Value("${rocketmq.consumer-group}") String consumerGroup,
            MerchantRateLimiter rateLimiter,
            OrderProcessor orderProcessor,
            RocketMQTemplate rocketMQTemplate
    ) throws MQClientException {
        this.rateLimiter = rateLimiter;
        this.orderProcessor = orderProcessor;
        this.rocketMQTemplate = rocketMQTemplate;

        // 初始化消费者
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(nameServer);
        consumer.subscribe(orderTopic, "*");
        
        // 设置消费模式为集群模式
        consumer.setMessageModel(MessageModel.CLUSTERING);

        // 注册顺序消息监听器(保证队列内消息顺序)
        consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
            MessageExt msg = msgs.get(0);
            OrderMessage orderMessage = JSON.parseObject(msg.getBody(), OrderMessage.class);
            Order order = orderMessage.getOrder();
            Long merchantId = order.getMerchantId();
            
            // 重试次数累加(结合消息自带的重试计数)
            int retryCount = orderMessage.getRetryCount() + msg.getReconsumeTimes();
            orderMessage.setRetryCount(retryCount);

            // 超过最大重试次数,进入死信队列
            if (retryCount >= MAX_RETRY) {
                sendToDeadLetterQueue(order);
                return ConsumeOrderlyStatus.SUCCESS; // 提交位点,移除消息
            }

            // 尝试获取商家限流令牌
            if (rateLimiter.tryAcquire(merchantId)) {
                try {
                    // 处理订单
                    orderProcessor.process(order);
                    return ConsumeOrderlyStatus.SUCCESS; // 成功处理,提交位点
                } finally {
                    // 释放令牌
                    rateLimiter.release(merchantId);
                }
            } else {
                // 商家订单处理受限,挂起队列30s后重试
                context.setSuspendCurrentQueueTimeMillis(RETRY_INTERVAL);
                System.out.printf("商家%d的订单%s达到速率限制,挂起队列30s后重试,重试次数:%d%n",
                        merchantId, order.getId(), retryCount);
                
                // 返回挂起状态,保证单商家顺序
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
        });

        // 启动消费者
        consumer.start();
    }

    /**
     * 发送到死信队列
     */
    private void sendToDeadLetterQueue(Order order) {
        System.err.println("订单" + order.getId() + "超过最大重试次数,已进入死信队列");
        // 实际应用中可以发送到专门的死信主题
        rocketMQTemplate.convertAndSend("ORDER_DLQ_TOPIC", order);
    }
    
    @PreDestroy
    public void destroy() {
        if (consumer != null) {
            consumer.shutdown();
        }
    }
}

4.订单处理器(OrderProcessor)

处理订单的实际业务逻辑,如通知商户、更新库存等。

@Service
public class OrderProcessor {
    private final OrderRepository orderRepository;
    private final MerchantService merchantService;
    private final NotificationService notificationService;
    
    // 构造函数注入依赖
    public OrderProcessor(OrderRepository orderRepository, 
                         MerchantService merchantService,
                         NotificationService notificationService) {
        this.orderRepository = orderRepository;
        this.merchantService = merchantService;
        this.notificationService = notificationService;
    }
    
    /**
     * 处理订单的核心业务逻辑
     */
    public void process(Order order) {
        try {
            // 1. 更新订单状态为处理中
            order.setStatus("PROCESSING");
            orderRepository.updateById(order);
            
            // 2. 检查库存
            boolean stockAvailable = merchantService.checkAndDeductStock(
                order.getMerchantId(), 
                order.getProduct()
            );
            
            if (!stockAvailable) {
                // 库存不足,更新订单状态
                order.setStatus("OUT_OF_STOCK");
                orderRepository.updateById(order);
                // 通知用户
                notificationService.notifyUser(order.getUserId(), 
                    "订单" + order.getId() + "因库存不足无法处理");
                return;
            }
            
            // 3. 通知商户有新订单
            merchantService.notifyNewOrder(order);
            
            // 4. 更新订单状态为已接受
            order.setStatus("ACCEPTED");
            orderRepository.updateById(order);
            
            // 5. 通知用户订单已受理
            notificationService.notifyUser(order.getUserId(), 
                "订单" + order.getId() + "已受理,正在制作中");
                
        } catch (Exception e) {
            // 处理异常,更新订单状态
            order.setStatus("PROCESSING_FAILED");
            orderRepository.updateById(order);
            throw new RuntimeException("订单处理失败: " + e.getMessage(), e);
        }
    }
}

5.订单服务(OrderService)

整合限流逻辑和订单处理流程,是系统的入口点。

@Service
public class OrderService {
    private final MerchantRateLimiter rateLimiter;
    private final OrderQueueProducer queueProducer;
    private final OrderRepository orderRepository;
    private final IdGenerator idGenerator;
    
    // 构造函数注入依赖
    public OrderService(MerchantRateLimiter rateLimiter,
                       OrderQueueProducer queueProducer,
                       OrderRepository orderRepository,
                       IdGenerator idGenerator) {
        this.rateLimiter = rateLimiter;
        this.queueProducer = queueProducer;
        this.orderRepository = orderRepository;
        this.idGenerator = idGenerator;
    }
    
    /**
     * 用户下单的核心流程
     */
    @Transactional
    public OrderDTO createOrder(OrderCreateRequest request) {
        // 1. 生成订单
        Order order = buildOrder(request);
        orderRepository.insert(order);
        
        // 2. 调用限流组件判断是否可直接处理
        if (rateLimiter.tryAcquire(order.getMerchantId())) {
            try {
                // 3. 直接处理订单
                order.setStatus("PROCESSING");
                orderRepository.updateById(order);
                
                // 4. 调用订单处理器处理订单
                orderProcessor.process(order);
                
                // 5. 返回订单信息
                return convertToDTO(order);
            } finally {
                // 6. 释放容量
                rateLimiter.release(order.getMerchantId());
            }
        } else {
            // 7. 已爆单,发送到MQ排队
            order.setStatus("PENDING");
            orderRepository.updateById(order);
            
            // 8. 发送到队列
            queueProducer.sendToQueue(order);
            
            // 9. 获取排队位置信息
            long queueSize;
            try {
                queueSize = queueProducer.getQueueSize(order.getMerchantId());
            } catch (MQClientException e) {
                // 获取排队数量失败时的降级处理
                queueSize = -1;
            }
            
            // 10. 返回订单信息和排队位置
            OrderDTO dto = convertToDTO(order);
            dto.setQueuePosition(queueSize);
            dto.setEstimatedWaitingTime(calculateWaitingTime(queueSize, order.getMerchantId()));
            return dto;
        }
    }
    
    /**
     * 构建订单对象
     */
    private Order buildOrder(OrderCreateRequest request) {
        Order order = new Order();
        order.setId(idGenerator.generate());
        order.setMerchantId(request.getMerchantId());
        order.setUserId(request.getUserId());
        order.setProduct(request.getProduct());
        order.setCreateTime(LocalDateTime.now());
        order.setStatus("CREATED");
        // 设置其他订单属性...
        return order;
    }
    
    /**
     * 转换为DTO对象
     */
    private OrderDTO convertToDTO(Order order) {
        OrderDTO dto = new OrderDTO();
        BeanUtils.copyProperties(order, dto);
        // 其他转换逻辑...
        return dto;
    }
    
    /**
     * 估算等待时间
     */
    private long calculateWaitingTime(long queueSize, Long merchantId) {
        if (queueSize <= 0) {
            return 0;
        }
        // 根据商户处理能力估算等待时间(秒)
        int threshold = rateLimiter.getCurrentThreshold(merchantId);
        // 假设每个订单平均处理时间为2分钟
        return (long) (queueSize * 120.0 / threshold);
    }
    
    /**
     * 获取订单状态和等待进度(供前端查询)
     */
    public OrderStatusDTO getOrderStatus(String orderId) {
        Order order = orderRepository.selectById(orderId);
        if (order == null) {
            throw new OrderNotFoundException("订单不存在: " + orderId);
        }
        
        OrderStatusDTO statusDTO = new OrderStatusDTO();
        statusDTO.setOrderId(orderId);
        statusDTO.setStatus(order.getStatus());
        
        // 如果订单仍在排队,查询排队位置
        if ("PENDING".equals(order.getStatus())) {
            try {
                long queueSize = queueProducer.getQueueSize(order.getMerchantId());
                statusDTO.setQueuePosition(queueSize);
                statusDTO.setEstimatedWaitingTime(
                    calculateWaitingTime(queueSize, order.getMerchantId())
                );
            } catch (MQClientException e) {
                // 异常处理
            }
        }
        
        return statusDTO;
    }
}

6.配置文件:

# 商户阈值配置
merchant:
  thresholds:
    1001: 200  # 商户1001的阈值为200
    1002: 150  # 商户1002的阈值为150
  default-threshold: 100  # 未配置商户的默认阈值

# 自定义RocketMQ主题
rocketmq:
  order-topic: MERCHANT_ORDER_QUEUE
  dlq-topic: ORDER_DLQ_TOPIC

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇