🔄 RocketMQ 顺序消费者详解

深入理解消费端如何保证消息顺序性

❌ 并发消费(ConsumeMode.CONCURRENTLY)
📨 消息队列状态
Queue 0
订单创建
(orderId=1001) TagA
订单创建
(orderId=1002) 无Tag
Queue 1
订单支付
(orderId=1001) TagB
Queue 2
订单完成
(orderId=1001) TagC
🔀 并发消费者
consumeThreadMax = 10:共享线程池
• 10个线程可处理所有队列的消息
• 多队列并发,同队列内也可能并发
• 无法保证顺序
@RocketMQMessageListener( consumeThreadMax = 10, consumeMode = ConsumeMode.CONCURRENTLY, topic = "ORDER_TOPIC", // 与生产者Topic对应 consumerGroup = "ORDER_CONSUMER_GROUP" ) public class ConcurrentConsumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { // 获取生产者发送的完整消息信息 String topic = messageExt.getTopic(); // ORDER_TOPIC String tags = messageExt.getTags(); // TagA, TagB, TagC 或 null String keys = messageExt.getKeys(); // orderId_1001, orderId_1002 int queueId = messageExt.getQueueId(); // 0, 1, 2 // 获取消息体 String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8); log.info("并发消费消息 - Topic: {}, Tags: {}, Keys: {}, QueueId: {}", topic, tags, keys, queueId); log.info("并发消费: {}, Tag: {}, Key: {}", messageBody, tags, keys); // 业务处理 processMessage(messageBody); } }
T1
线程1处理Queue 0:订单创建(orderId=1001) TagA(同时执行)
T2
线程2处理Queue 1:订单支付(orderId=1001) TagB(同时执行)
T3
线程3处理Queue 2:订单完成(orderId=1001) TagC(同时执行)
并发执行!可能支付比创建先完成
⚠️ 可能出现:订单支付 → 订单创建 → 订单完成
orderId=1001状态流转错误!
🔥 问题:多线程并发消费,同一订单的不同状态消息可能被乱序处理!
⚡ 并发模式线程模型
• 10个线程组成共享线程池
• 任意线程可处理任意队列的消息
• 同一队列的消息可能被多个线程同时处理
• 高吞吐量但无法保证顺序
📋 并发消费实际日志输出:
2024-06-12 14:23:01.123 INFO [ConsumeMessageThread_1] - 并发消费消息 - Topic: ORDER_TOPIC, Tags: TagA, Keys: orderId_1001, QueueId: 0
2024-06-12 14:23:01.124 INFO [ConsumeMessageThread_1] - 并发消费: 订单创建, Tag: TagA, Key: orderId_1001

2024-06-12 14:23:01.125 INFO [ConsumeMessageThread_2] - 并发消费消息 - Topic: ORDER_TOPIC, Tags: TagB, Keys: orderId_1001, QueueId: 1
2024-06-12 14:23:01.126 INFO [ConsumeMessageThread_2] - 并发消费: 订单支付, Tag: TagB, Key: orderId_1001

2024-06-12 14:23:01.127 INFO [ConsumeMessageThread_3] - 并发消费消息 - Topic: ORDER_TOPIC, Tags: TagC, Keys: orderId_1001, QueueId: 2
2024-06-12 14:23:01.128 INFO [ConsumeMessageThread_3] - 并发消费: 订单完成, Tag: TagC, Key: orderId_1001

2024-06-12 14:23:01.129 INFO [ConsumeMessageThread_1] - 并发消费消息 - Topic: ORDER_TOPIC, Tags: null, Keys: orderId_1002, QueueId: 0
2024-06-12 14:23:01.130 INFO [ConsumeMessageThread_1] - 并发消费: 订单创建, Tag: null, Key: orderId_1002
⚠️ 注意:orderId_1001的三条消息被3个线程同时处理!可能乱序!
VS
✅ 顺序消费(ConsumeMode.ORDERLY)
📨 消息队列状态(生产者hashKey保证)
Queue 0
订单创建
(orderId=1002) 无Tag
Queue 1
订单创建
(orderId=1001) TagA
订单支付
(orderId=1001) TagB
订单完成
(orderId=1001) TagC
Queue 2
📋 顺序消费者
consumeThreadMax = 10:队列专用线程
• 每个队列独占1个线程,最多10个队列
• 单队列内严格按FIFO顺序消费
• 保证消息顺序性
@Profile({"prod"}) @Slf4j @Service @RocketMQMessageListener( consumeThreadMax = 10, consumeMode = ConsumeMode.ORDERLY, topic = "ORDER_TOPIC", // 与生产者Topic对应 consumerGroup = "ORDER_CONSUMER_GROUP" ) public class OrderlyMQConsumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { try { // 获取生产者发送的完整消息信息 String topic = messageExt.getTopic(); // ORDER_TOPIC String tags = messageExt.getTags(); // TagA/TagB/TagC 或 null String keys = messageExt.getKeys(); // orderId_1001/orderId_1002 String msgId = messageExt.getMsgId(); int queueId = messageExt.getQueueId(); long queueOffset = messageExt.getQueueOffset(); // 获取消息体:订单创建/订单支付/订单完成 String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8); log.info("顺序消费消息 - Topic: {}, Tags: {}, Keys: {}, QueueId: {}, QueueOffset: {}", topic, tags, keys, queueId, queueOffset); // 业务处理逻辑 processOrderlyMessage(messageBody, messageExt); log.info("顺序消费成功 - MsgId: {}", msgId); } catch (Exception e) { log.error("顺序消费失败 - MsgId: {}, 错误信息: {}", messageExt.getMsgId(), e.getMessage(), e); // 顺序消费失败时,当前队列会被暂停消费 throw new RuntimeException("顺序消息处理失败", e); } } private void processOrderlyMessage(String messageBody, MessageExt messageExt) { String orderKey = messageExt.getKeys(); // orderId_1001 String tag = messageExt.getTags(); // TagA/TagB/TagC 或 null log.info("处理订单消息 - OrderKey: {}, Tag: {}, Content: {}", orderKey, tag, messageBody); // 根据消息内容处理不同业务状态 switch (messageBody) { case "订单创建": handleOrderCreate(orderKey, tag); break; case "订单支付": handleOrderPay(orderKey, tag); break; case "订单完成": handleOrderComplete(orderKey, tag); break; } } }
T1
Queue 1专用线程:处理"订单创建(orderId=1001) TagA"
T1
同一线程等待完成后:处理"订单支付(orderId=1001) TagB"
T1
同一线程等待完成后:处理"订单完成(orderId=1001) TagC"
单线程串行执行,严格FIFO,绝不乱序
✅ 确保顺序:订单创建 → 订单支付 → 订单完成
orderId=1001状态流转正确!
🎉 关键:ConsumeMode.ORDERLY确保单队列内消息按FIFO严格顺序消费!
🔑 consumeThreadMax = 10 的真实含义对比
并发模式:10个线程共享处理所有队列,可能多线程同时处理同一队列
顺序模式:每个队列独占1个线程,最多支持10个队列并行,但单队列内串行
📋 顺序消费实际日志输出:
2024-06-12 14:23:01.123 INFO [ConsumeMessageOrderlyService_1] - 顺序消费消息 - Topic: ORDER_TOPIC, Tags: TagA, Keys: orderId_1001, QueueId: 1, QueueOffset: 0
2024-06-12 14:23:01.124 INFO [ConsumeMessageOrderlyService_1] - 处理订单消息 - OrderKey: orderId_1001, Tag: TagA, Content: 订单创建
2024-06-12 14:23:01.345 INFO [ConsumeMessageOrderlyService_1] - 顺序消费成功 - MsgId: AC14000A18B418B4AAC208A4DB2F0000

2024-06-12 14:23:01.346 INFO [ConsumeMessageOrderlyService_1] - 顺序消费消息 - Topic: ORDER_TOPIC, Tags: TagB, Keys: orderId_1001, QueueId: 1, QueueOffset: 1
2024-06-12 14:23:01.347 INFO [ConsumeMessageOrderlyService_1] - 处理订单消息 - OrderKey: orderId_1001, Tag: TagB, Content: 订单支付
2024-06-12 14:23:01.567 INFO [ConsumeMessageOrderlyService_1] - 顺序消费成功 - MsgId: AC14000A18B418B4AAC208A4DB2F0001

2024-06-12 14:23:01.568 INFO [ConsumeMessageOrderlyService_1] - 顺序消费消息 - Topic: ORDER_TOPIC, Tags: TagC, Keys: orderId_1001, QueueId: 1, QueueOffset: 2
2024-06-12 14:23:01.569 INFO [ConsumeMessageOrderlyService_1] - 处理订单消息 - OrderKey: orderId_1001, Tag: TagC, Content: 订单完成
2024-06-12 14:23:01.789 INFO [ConsumeMessageOrderlyService_1] - 顺序消费成功 - MsgId: AC14000A18B418B4AAC208A4DB2F0002

2024-06-12 14:23:01.790 INFO [ConsumeMessageOrderlyService_2] - 顺序消费消息 - Topic: ORDER_TOPIC, Tags: null, Keys: orderId_1002, QueueId: 0, QueueOffset: 0
2024-06-12 14:23:01.791 INFO [ConsumeMessageOrderlyService_2] - 处理订单消息 - OrderKey: orderId_1002, Tag: null, Content: 订单创建
2024-06-12 14:23:01.998 INFO [ConsumeMessageOrderlyService_2] - 顺序消费成功 - MsgId: AC14000A18B418B4AAC208A4DB2F0003
✅ 注意:orderId_1001在Queue1由线程1串行处理,orderId_1002在Queue0由线程2处理!

🔑 @RocketMQMessageListener核心参数详解

📋 注解参数说明
topic: "ORDER_TOPIC" - 必须与生产者发送的Topic完全一致
consumerGroup: 消费者组名,同组消费者共同消费Topic
consumeMode: 消费模式 - ORDERLY(顺序) vs CONCURRENTLY(并发)
consumeThreadMax: 最大消费线程数,顺序消费时每个队列独占一个线程
selectorExpression: Tag过滤表达式,如"TagA||TagB||TagC"
maxReconsumeTimes: 最大重试次数,顺序消费失败会阻塞队列

🏷️ Tag处理说明:
• 生产者: TagA(订单创建), TagB(订单支付), TagC(订单完成)
• 消费者: 通过messageExt.getTags()获取,可能为null
• 过滤: 设置selectorExpression进行Tag过滤消费

🔀 并发消费特点

线程模型: 共享线程池模式
consumeThreadMax=10: 10个线程共享处理
性能: 高吞吐量
顺序性: 无法保证
适用场景: 日志、通知、统计等
// 并发消费配置 consumeMode = ConsumeMode.CONCURRENTLY consumeThreadMax = 10 // 线程分配模式 共享线程池:10个线程可处理任意队列 - Thread1: Queue0, Queue1, Queue2 (随机分配) - Thread2: Queue0, Queue1, Queue2 (随机分配) - ... - 同队列消息可能被多线程同时处理

📋 顺序消费特点

线程模型: 队列专用线程模式
consumeThreadMax=10: 最多10个队列各占1线程
性能: 相对较低但可控
顺序性: 严格保证FIFO
适用场景: 订单、账户、状态机
// 顺序消费配置 consumeMode = ConsumeMode.ORDERLY consumeThreadMax = 10 // 线程分配模式 队列专用线程:每个队列独占一个线程 - Thread1: 专门处理Queue0 (FIFO) - Thread2: 专门处理Queue1 (FIFO) - Thread3: 专门处理Queue2 (FIFO) - 单队列内消息严格按顺序处理

🔗 与生产者完全对应的消费场景

生产者发送的消息:
Topic: "ORDER_TOPIC"
消息1: "订单创建" + TagA + hashKey="orderId_1001"
消息2: "订单支付" + TagB + hashKey="orderId_1001"
消息3: "订单完成" + TagC + hashKey="orderId_1001"
消息4: "订单创建" + 无Tag + hashKey="orderId_1002"
消费者配置要求:
Topic: "ORDER_TOPIC" (必须与生产者一致)
ConsumerGroup: "ORDER_CONSUMER_GROUP" (消费者组)
ConsumeMode: ORDERLY (保证orderId_1001的消息顺序)
Tag过滤: 可选择消费特定Tag的消息

关键: 生产者使用syncSendOrderly()确保同一orderId进入同一队列,消费者使用ORDERLY模式确保该队列内消息按序处理