🚀 RocketMQ 消息队列分配规则详解

深度解析消息如何分配到队列 & Tag的作用机制

❌ 普通发送(轮询/随机分配)
📤 生产者发送多条消息
订单创建 orderId=1001
TagA
订单支付 orderId=1001
TagB
订单完成 orderId=1001
TagC
订单创建 orderId=1002
无Tag
// 普通发送 - 默认轮询分配 producer.send(msg1); // TagA producer.send(msg2); // TagB producer.send(msg3); // TagC producer.send(msg4); // 无Tag
🎲 默认队列选择器
轮询算法:messageIndex++ % queueSize
Tag不影响队列选择!
1
msg1 → Queue 0 (index=0 % 3=0)
2
msg2 → Queue 1 (index=1 % 3=1)
3
msg3 → Queue 2 (index=2 % 3=2)
4
msg4 → Queue 0 (index=3 % 3=0)
Queue 0
📦 创建(1001)
📦 创建(1002)
Queue 1
💰 支付(1001)
Queue 2
✅ 完成(1001)
🔀 并发消费者 (ConsumeMode.CONCURRENTLY)
Tag用于消费端过滤,不影响分配
⚠️ orderId=1001的消息被分散到3个队列
消费顺序无法保证!
🔥 问题:消息按发送顺序轮询分配,相同业务的消息被分散,导致乱序!
VS
✅ 顺序发送(ShardingKey分配)
📤 生产者发送多条消息
订单创建 orderId=1001
TagA
hashKey:orderId_1001
订单支付 orderId=1001
TagB
hashKey:orderId_1001
订单完成 orderId=1001
TagC
hashKey:orderId_1001
订单创建 orderId=1002
无Tag
hashKey:orderId_1002
// 顺序发送 - 使用封装好的syncSendOrderly方法 @Resource private IMQProducer<String> mqProducer; // 发送有序消息,使用hashKey确保同订单消息进入同一队列 SendResult result = mqProducer.syncSendOrderly( "ORDER_TOPIC", "订单创建消息", "orderId_1001" // hashKey ); // 底层实现:RocketMQTemplate.syncSendOrderly Message<T> message = MessageBuilder.withPayload(msg).build(); SendResult sendResult = rocketMQTemplate.syncSendOrderly( topic, message, hashKey );
🎯 RocketMQTemplate内置队列选择
hashKey算法:hashKey.hashCode() % queueSize
相同hashKey → 相同队列
1
msg1(hashKey="orderId_1001") → Queue 1
2
msg2(hashKey="orderId_1001") → Queue 1
3
msg3(hashKey="orderId_1001") → Queue 1
4
msg4(hashKey="orderId_1002") → Queue 2
计算示例:
"orderId_1001".hashCode() % 3 = 1 → Queue 1
"orderId_1002".hashCode() % 3 = 2 → Queue 2
"orderId_1003".hashCode() % 3 = 0 → Queue 0
Queue 0
Queue 1
📦 创建(1001)
💰 支付(1001)
✅ 完成(1001)
Queue 2
📦 创建(1002)
📋 顺序消费者 (ConsumeMode.ORDERLY)
单队列内部严格按序消费
✅ orderId=1001的消息全部在Queue 1
严格按序:创建 → 支付 → 完成
🎉 关键:使用syncSendOrderly()方法,传入hashKey确保相同业务的消息路由到同一队列!

🔑 核心分配规则总结

🎲 普通发送

分配规则:轮询/随机

算法:messageIndex % queueSize

Tag作用:仅用于消费端过滤

结果:消息分散,无法保序

🎯 顺序发送

分配规则:hashKey哈希

算法:hashKey.hashCode() % queueSize

方法:syncSendOrderly(topic, msg, hashKey)

结果:相同hashKey聚合,保证顺序

🏷️ Tag的真实作用

发送端:不影响队列选择

存储端:随消息一起存储

消费端:用于消息过滤

示例:selectorExpression="TagA||TagB"

💡 完整代码示例

// 1. 接口定义 public interface IMQProducer<T> { /** * 有序发送消息 * @param topic 消息主题 * @param msg 消息 * @param hashKey 分区键(如orderId、userId等) * @return 发送结果 */ SendResult syncSendOrderly(String topic, T msg, String hashKey); } // 2. 实现类 @Service public class RocketMQProducer<T> implements IMQProducer<T> { @Resource private RocketMQTemplate rocketMQTemplate; @Override public SendResult syncSendOrderly(String topic, T msg, String hashKey) { try { Message<T> message = MessageBuilder.withPayload(msg).build(); SendResult sendResult = rocketMQTemplate.syncSendOrderly( topic, message, hashKey ); log.info("有序消息发送成功 - MsgId: {}", sendResult.getMsgId()); return sendResult; } catch (Exception e) { log.error("有序消息发送失败", e); return null; } } } // 3. 使用示例 @Autowired private IMQProducer<String> mqProducer; // 发送同一订单的消息,使用相同的hashKey String orderId = "ORDER_1001"; mqProducer.syncSendOrderly("ORDER_TOPIC", "订单创建", orderId); mqProducer.syncSendOrderly("ORDER_TOPIC", "订单支付", orderId); mqProducer.syncSendOrderly("ORDER_TOPIC", "订单完成", orderId);

• 使用订单ID作为hashKey,确保同订单消息进入同一队列

• RocketMQTemplate内部处理队列选择逻辑

• 消费端配置ORDERLY模式保证按序消费

• 简化了原生RocketMQ的MessageQueueSelector编写