🎲 普通发送
分配规则:轮询/随机
算法:messageIndex % queueSize
Tag作用:仅用于消费端过滤
结果:消息分散,无法保序
🎯 顺序发送
分配规则:hashKey哈希
算法:hashKey.hashCode() % queueSize
方法:syncSendOrderly(topic, msg, hashKey)
结果:相同hashKey聚合,保证顺序
🏷️ Tag的真实作用
发送端:不影响队列选择
存储端:随消息一起存储
消费端:用于消息过滤
示例:selectorExpression="TagA||TagB"
💡 完整代码示例
// 1. 接口定义
public interface IMQProducer<T> {
/**
* 有序发送消息
* @param topic 消息主题
* @param msg 消息
* @param hashKey 分区键(如orderId、userId等)
* @return 发送结果
*/
SendResult syncSendOrderly(String topic, T msg, String hashKey);
}
// 2. 实现类
@Service
public class RocketMQProducer<T> implements IMQProducer<T> {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Override
public SendResult syncSendOrderly(String topic, T msg, String hashKey) {
try {
Message<T> message = MessageBuilder.withPayload(msg).build();
SendResult sendResult = rocketMQTemplate.syncSendOrderly(
topic, message, hashKey
);
log.info("有序消息发送成功 - MsgId: {}", sendResult.getMsgId());
return sendResult;
} catch (Exception e) {
log.error("有序消息发送失败", e);
return null;
}
}
}
// 3. 使用示例
@Autowired
private IMQProducer<String> mqProducer;
// 发送同一订单的消息,使用相同的hashKey
String orderId = "ORDER_1001";
mqProducer.syncSendOrderly("ORDER_TOPIC", "订单创建", orderId);
mqProducer.syncSendOrderly("ORDER_TOPIC", "订单支付", orderId);
mqProducer.syncSendOrderly("ORDER_TOPIC", "订单完成", orderId);
• 使用订单ID作为hashKey,确保同订单消息进入同一队列
• RocketMQTemplate内部处理队列选择逻辑
• 消费端配置ORDERLY模式保证按序消费
• 简化了原生RocketMQ的MessageQueueSelector编写