❌ 并发消费(ConsumeMode.CONCURRENTLY)
📨 消息队列状态
订单创建
(orderId=1001) TagA
订单创建
(orderId=1002) 无Tag
🔀 并发消费者
• consumeThreadMax = 10:共享线程池
• 10个线程可处理所有队列的消息
• 多队列并发,同队列内也可能并发
• 无法保证顺序
@RocketMQMessageListener(
consumeThreadMax = 10,
consumeMode = ConsumeMode.CONCURRENTLY,
topic = "ORDER_TOPIC", // 与生产者Topic对应
consumerGroup = "ORDER_CONSUMER_GROUP"
)
public class ConcurrentConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
// 获取生产者发送的完整消息信息
String topic = messageExt.getTopic(); // ORDER_TOPIC
String tags = messageExt.getTags(); // TagA, TagB, TagC 或 null
String keys = messageExt.getKeys(); // orderId_1001, orderId_1002
int queueId = messageExt.getQueueId(); // 0, 1, 2
// 获取消息体
String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8);
log.info("并发消费消息 - Topic: {}, Tags: {}, Keys: {}, QueueId: {}",
topic, tags, keys, queueId);
log.info("并发消费: {}, Tag: {}, Key: {}", messageBody, tags, keys);
// 业务处理
processMessage(messageBody);
}
}
T1
线程1处理Queue 0:订单创建(orderId=1001) TagA(同时执行)
T2
线程2处理Queue 1:订单支付(orderId=1001) TagB(同时执行)
T3
线程3处理Queue 2:订单完成(orderId=1001) TagC(同时执行)
⚠️ 可能出现:订单支付 → 订单创建 → 订单完成
orderId=1001状态流转错误!
🔥 问题:多线程并发消费,同一订单的不同状态消息可能被乱序处理!
⚡ 并发模式线程模型
• 10个线程组成共享线程池
• 任意线程可处理任意队列的消息
• 同一队列的消息可能被多个线程同时处理
• 高吞吐量但无法保证顺序
📋 并发消费实际日志输出:
2024-06-12 14:23:01.123 INFO [ConsumeMessageThread_1] - 并发消费消息 - Topic: ORDER_TOPIC, Tags: TagA, Keys: orderId_1001, QueueId: 0
2024-06-12 14:23:01.124 INFO [ConsumeMessageThread_1] - 并发消费: 订单创建, Tag: TagA, Key: orderId_1001
2024-06-12 14:23:01.125 INFO [ConsumeMessageThread_2] - 并发消费消息 - Topic: ORDER_TOPIC, Tags: TagB, Keys: orderId_1001, QueueId: 1
2024-06-12 14:23:01.126 INFO [ConsumeMessageThread_2] - 并发消费: 订单支付, Tag: TagB, Key: orderId_1001
2024-06-12 14:23:01.127 INFO [ConsumeMessageThread_3] - 并发消费消息 - Topic: ORDER_TOPIC, Tags: TagC, Keys: orderId_1001, QueueId: 2
2024-06-12 14:23:01.128 INFO [ConsumeMessageThread_3] - 并发消费: 订单完成, Tag: TagC, Key: orderId_1001
2024-06-12 14:23:01.129 INFO [ConsumeMessageThread_1] - 并发消费消息 - Topic: ORDER_TOPIC, Tags: null, Keys: orderId_1002, QueueId: 0
2024-06-12 14:23:01.130 INFO [ConsumeMessageThread_1] - 并发消费: 订单创建, Tag: null, Key: orderId_1002
⚠️ 注意:orderId_1001的三条消息被3个线程同时处理!可能乱序!
✅ 顺序消费(ConsumeMode.ORDERLY)
📨 消息队列状态(生产者hashKey保证)
订单创建
(orderId=1001) TagA
订单支付
(orderId=1001) TagB
订单完成
(orderId=1001) TagC
📋 顺序消费者
• consumeThreadMax = 10:队列专用线程
• 每个队列独占1个线程,最多10个队列
• 单队列内严格按FIFO顺序消费
• 保证消息顺序性
@Profile({"prod"})
@Slf4j
@Service
@RocketMQMessageListener(
consumeThreadMax = 10,
consumeMode = ConsumeMode.ORDERLY,
topic = "ORDER_TOPIC", // 与生产者Topic对应
consumerGroup = "ORDER_CONSUMER_GROUP"
)
public class OrderlyMQConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
try {
// 获取生产者发送的完整消息信息
String topic = messageExt.getTopic(); // ORDER_TOPIC
String tags = messageExt.getTags(); // TagA/TagB/TagC 或 null
String keys = messageExt.getKeys(); // orderId_1001/orderId_1002
String msgId = messageExt.getMsgId();
int queueId = messageExt.getQueueId();
long queueOffset = messageExt.getQueueOffset();
// 获取消息体:订单创建/订单支付/订单完成
String messageBody = new String(messageExt.getBody(), StandardCharsets.UTF_8);
log.info("顺序消费消息 - Topic: {}, Tags: {}, Keys: {}, QueueId: {}, QueueOffset: {}",
topic, tags, keys, queueId, queueOffset);
// 业务处理逻辑
processOrderlyMessage(messageBody, messageExt);
log.info("顺序消费成功 - MsgId: {}", msgId);
} catch (Exception e) {
log.error("顺序消费失败 - MsgId: {}, 错误信息: {}",
messageExt.getMsgId(), e.getMessage(), e);
// 顺序消费失败时,当前队列会被暂停消费
throw new RuntimeException("顺序消息处理失败", e);
}
}
private void processOrderlyMessage(String messageBody, MessageExt messageExt) {
String orderKey = messageExt.getKeys(); // orderId_1001
String tag = messageExt.getTags(); // TagA/TagB/TagC 或 null
log.info("处理订单消息 - OrderKey: {}, Tag: {}, Content: {}",
orderKey, tag, messageBody);
// 根据消息内容处理不同业务状态
switch (messageBody) {
case "订单创建":
handleOrderCreate(orderKey, tag);
break;
case "订单支付":
handleOrderPay(orderKey, tag);
break;
case "订单完成":
handleOrderComplete(orderKey, tag);
break;
}
}
}
T1
Queue 1专用线程:处理"订单创建(orderId=1001) TagA"
T1
同一线程等待完成后:处理"订单支付(orderId=1001) TagB"
T1
同一线程等待完成后:处理"订单完成(orderId=1001) TagC"
✅ 确保顺序:订单创建 → 订单支付 → 订单完成
orderId=1001状态流转正确!
🎉 关键:ConsumeMode.ORDERLY确保单队列内消息按FIFO严格顺序消费!
🔑 consumeThreadMax = 10 的真实含义对比
并发模式:10个线程共享处理所有队列,可能多线程同时处理同一队列
顺序模式:每个队列独占1个线程,最多支持10个队列并行,但单队列内串行
📋 顺序消费实际日志输出:
2024-06-12 14:23:01.123 INFO [ConsumeMessageOrderlyService_1] - 顺序消费消息 - Topic: ORDER_TOPIC, Tags: TagA, Keys: orderId_1001, QueueId: 1, QueueOffset: 0
2024-06-12 14:23:01.124 INFO [ConsumeMessageOrderlyService_1] - 处理订单消息 - OrderKey: orderId_1001, Tag: TagA, Content: 订单创建
2024-06-12 14:23:01.345 INFO [ConsumeMessageOrderlyService_1] - 顺序消费成功 - MsgId: AC14000A18B418B4AAC208A4DB2F0000
2024-06-12 14:23:01.346 INFO [ConsumeMessageOrderlyService_1] - 顺序消费消息 - Topic: ORDER_TOPIC, Tags: TagB, Keys: orderId_1001, QueueId: 1, QueueOffset: 1
2024-06-12 14:23:01.347 INFO [ConsumeMessageOrderlyService_1] - 处理订单消息 - OrderKey: orderId_1001, Tag: TagB, Content: 订单支付
2024-06-12 14:23:01.567 INFO [ConsumeMessageOrderlyService_1] - 顺序消费成功 - MsgId: AC14000A18B418B4AAC208A4DB2F0001
2024-06-12 14:23:01.568 INFO [ConsumeMessageOrderlyService_1] - 顺序消费消息 - Topic: ORDER_TOPIC, Tags: TagC, Keys: orderId_1001, QueueId: 1, QueueOffset: 2
2024-06-12 14:23:01.569 INFO [ConsumeMessageOrderlyService_1] - 处理订单消息 - OrderKey: orderId_1001, Tag: TagC, Content: 订单完成
2024-06-12 14:23:01.789 INFO [ConsumeMessageOrderlyService_1] - 顺序消费成功 - MsgId: AC14000A18B418B4AAC208A4DB2F0002
2024-06-12 14:23:01.790 INFO [ConsumeMessageOrderlyService_2] - 顺序消费消息 - Topic: ORDER_TOPIC, Tags: null, Keys: orderId_1002, QueueId: 0, QueueOffset: 0
2024-06-12 14:23:01.791 INFO [ConsumeMessageOrderlyService_2] - 处理订单消息 - OrderKey: orderId_1002, Tag: null, Content: 订单创建
2024-06-12 14:23:01.998 INFO [ConsumeMessageOrderlyService_2] - 顺序消费成功 - MsgId: AC14000A18B418B4AAC208A4DB2F0003
✅ 注意:orderId_1001在Queue1由线程1串行处理,orderId_1002在Queue0由线程2处理!