一、顺序消息
顺序消息是Apache RocketMQ提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。相比其他类型消息,顺序消息在发送、存储和投递的处理过程中,更多强调多条消息间的先后顺序关系。
Apache RocketMQ顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息
设置
归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。基于
消息组的顺序判定逻辑,支持按照业务逻辑做细粒度拆分,可以在满足业务局部顺序的前提下提高系统的并行度和吞吐能力。
Apache RocketMQ的消息的顺序性分为两部分,生产顺序性和消费顺序性
二、生产顺序性
Apache RocketMQ通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化
如需保证消息生产的顺序性,则必须满足以下条件:
条件 | 描述 |
---|---|
单一生产者 | 消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序 |
串行发送 | Apache RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序 |
满足以上条件的生产者,将顺序消息发送至Apache RocketMQ后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:
(1) 相同消息组的消息按照先后顺序被存储在同一个队列
(2) 不同消息组的消息可以混合在同一个队列中,且不保证连续
三、消费顺序性
Apache RocketMQ 通过消费者和服务端的协议保障消息消费严格按照存储的先后顺序来处理
如需保证消息消费的顺序性,则必须满足以下条件:
条件 | 描述 |
---|---|
投递顺序 | Apache RocketMQ通过客户端SDK和服务端通信协议保障消息按照服务端存储顺序投递,但业务方消费消息时需要严格按照接收—处理—应答的语义处理消息,避免因异步处理导致消息乱序。消费者类型为PushConsumer时,Apache RocketMQ保证消息按照存储顺序一条一条投递给消费者,若消费者类型为SimpleConsumer,则消费者有可能一次拉取多条消息。此时,消息消费的顺序性需要由业务方自行保证 |
有限重试 | Apache RocketMQ顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理 |
四、顺序消息生命周期
(1) 初始化(initialized)
消息被生产者构建并完成初始化,待发送到服务端的状态
(2) 待消费(ready)
消息被发送到服务端,对消费者可见,等待消费者消费的状态
(3) 消费中(inflight)
消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理
(4) 消费提交(acked)
消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)
Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费
(5)消息删除(deleted)
Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除
五、RocketMQ 5.0使用顺序消息示例
(1)创建顺序主题
Apache RocketMQ 5.0版本下创建主题操作,需使用mqadmin工具
./bin/mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -n 127.0.0.1:9876 -a +message.type=FIFO
参数 | 描述 |
---|---|
-c | 集群名称 |
-t | Topic名称 |
-n | Nameserver地址 |
-o | 创建顺序消息主题 |
(2)创建顺序订阅消费组
./bin/mqadmin updateSubGroup -c DefaultCluster -g FIFOGroup -n 127.0.0.1:9876 -o true
参数 | 描述 |
---|---|
-c | 集群名称 |
-g | ConsumerGroup名称 |
-n | Nameserver地址 |
-o | 创建顺序订阅消费组 |
(3)发送顺序消息代码示例(Java)
ClientServiceProvider provider = ClientServiceProvider.loadService();
StaticSessionCredentialsProvider sscProvider = new StaticSessionCredentialsProvider("accessKey","secretKey");
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints(this.endPoint)
.setCredentialProvider(sscProvider)
.build();
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(config)
.build();
Message message = provider.newMessageBuilder()
// 设置消息所在主题
.setTopic("topic")
//设置顺序消息的排序分组
.setMessageGroup("messageGroup")
// 设置消息体
.setBody("messageBody".getBytes())
.build();
try {
// 发送消息,需要关注发送结果,并捕获失败等异常
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}