RocketMq 定时消息和重试消息
标签(空格分隔): 消息中间件
1 定时消息
1.1 概念
定时消息是指消息发到Broker后,不能立刻被Consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。
其实定时消息实现原理比较简单,如果一个topic对应的消息在发送端被设置为定时消息,那么会将该消息先存放在topic为SCHEDULE_TOPIC_XXXX的消息队列中,并将原始消息的信息存放在commitLog文件中,由于topic为SCHEDULE_TOPIC_XXXX,所以该消息不会被立即消息,然后通过定时扫描的方式,将到达延迟时间的消息,转换为正确的消息,发送到相应的队列进行消费。
1.2 延迟级别
尽管RocketMq支持定时消息,但是当前开源版本的RocketMq所支持的定时时间是有限的、不同级别的精度的时间,并不是任意无限制的定时时间。因此在每条消息上设置定时时间的 API 叫做 setDelayTimeLevel,而非 setDelayTime 这样的命名。默认 Broker服务器端有18个定时级别,每一个级别分别对应不同的延迟时间:
延迟级别 | 时间 |
---|---|
1 | 1s |
2 | 5s |
3 | 10s |
4 | 30s |
5 | 1m |
6 | 2m |
7 | 3m |
8 | 4m |
9 | 5m |
10 | 6m |
11 | 7m |
12 | 8m |
13 | 9m |
14 | 10m |
15 | 20m |
16 | 30m |
17 | 1h |
18 | 2h |
2 消息预存储
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
msg.setDelayTimeLevel(i + 1);
从中可以看出,和普通消息不同之处在于该消息设置了延迟级别,客户端在为某条消息设置上定时级别的时候,实际上级别这个字段会被作为附属属性放到消息中:
public class Message implements Serializable {
public void setDelayTimeLevel(int level) {
this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}
}
我们先前的文章提到过,发送到 Broker 服务器的消息会被存储到 CommitLog 消息文件中。那么在此处即使是定时消息也不例外,将定时消息存储下来是为了保证消息最大程度地不丢失。然而毕竟和普通消息不同,在遇到定时消息后,CommitLog 会将这条消息的话题和队列ID替换成专门用于定时的话题和相应的级别对应的队列 ID。真实的话题和队列 ID会作为属性放置到这条消息中。
- 存储消息时,延迟消息进入 Topic 为 SCHEDULE_TOPIC_XXXX。
- 延迟级别 与 消息队列编号 做固定映射:QueueId = DelayLevel - 1。
下面看一下broker对处理定时消息和普通消息的不同之处:
public class CommitLog {
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
//延迟消息专用队列
topic = ScheduleMessageService.SCHEDULE_TOPIC;
//延迟消息存放的消息队列
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
//当前消息真是的topic和消息队列保存在属性中
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 替换 Topic 和 QueueID
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
}
随后,这条消息会被存储在CommitLog消息文件中。而我们知道后台重放消息服务 ReputMessageService会一直定时执行观察CommitLog文件是否添加了新的消息。如果有了新的消息添加到commitLog文件,该消息服务会取出消息并封装为DispatchRequest请求,然后将其分发给不同的三个分发服务,分别执行 1)执行ConsumeQueue队列的创建,2)创建索引文件IndexFile结构。
而此处在创建DispatchRequest请求的时候,当遇到定时消息时,又多做了一些额外的事情。当遇见定时消息时,CommitLog计算tagsCode标签码与普通消息不同。对于定时消息,tagsCode值设置的是这条消息的投递时间(计划消费时间),即建立消费队列文件的时候,文件中的tagsCode存储的是这条消息未来在什么时候被投递:
public class CommitLog {
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer,
final boolean checkCRC,
final boolean readBody) {
// Timing message processing
{
String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
int delayLevel = Integer.parseInt(t);
if (delayLevel > 0) {
tagsCode = this.defaultMessageStore.getScheduleMessageService()
.computeDeliverTimestamp(delayLevel,storeTimestamp);
}
}
}
}
}
如下是,发送了 10 条定时级别分别为1-10的消息以后,$HOME/store/consumequeue 文件下的消费队列文件的分布情况:
不同的定时级别对应于不同的队列 ID,定时级别减 1 得到的就是队列 ID 的值。因此级别 1-10 对应的是 0-9 的队列 ID:
public class ScheduleMessageService extends ConfigManager {
public static int delayLevel2QueueId(final int delayLevel) {
return delayLevel - 1;
}
}
3 定时消息发送
对 SCHEDULE_TOPIC_XXXX的消息队列,Broker启动的时候,会开启一个调度消息服务,此服务会监控所有定时消息队列,每一个消息队列会创建一个专门的延时消息投递任务用以到达规定时间后投递此消息:
public class ScheduleMessageService extends ConfigManager {
public void start() {
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
}
}
每个消息队里的消息投递任务,会检查自己跟踪的消息队列,并从此消息队列所对应的定时级别的偏移量中检查是否有新的定时消息到来。其中定时级别的偏移量是维护在内存中的偏移量表 offsetTable 中。每隔 10 秒钟,这个表会被持久化到磁盘上的 delayOffset.json 文件中一次:
public class ScheduleMessageService extends ConfigManager {
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
new ConcurrentHashMap<Integer, Long>(32);
public void start() {
// 每隔 10 秒钟持久化一次
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
ScheduleMessageService.this.persist();
}
},
10000,
this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
delayOffset.json 文件中存储的示例信息如下所示:
DeliverDelayedMessageTimerTask任务会根据上面的队列级别和队列偏移量从消费任务队列文件中取出偏移量最小的定时消息的 tagsCode ,由于对于topic为SCHEDULE_TOPIC_XXXX的队列,tagscode中存放的是这条消息应该的投递时间,所以只需要判断当前时间和tagscode的差值countdown的大小。如果到了,即countdown<=0,那么便会从CommitLog文件中取出消息,修正消息的话题和队列ID等信息,然后重新存储此条消息。如果还没有到,那么便会重新执行一个定时时间设置为countdown毫秒的定时任务。在完成之后,会更新当前的偏移量表,为下一次做准备:
class DeliverDelayedMessageTimerTask extends TimerTask {
public void executeOnTimeup() {
// ...
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
// 是否到时间
long countdown = deliverTimestamp - now;
if (countdown <= 0) {
// 取出消息
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
// 修正消息,设置上正确的话题和队列 ID
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
// 重新存储消息
PutMessageResult putMessageResult =
ScheduleMessageService.this.defaultMessageStore
.putMessage(msgInner);
} else {
// countdown 后投递此消息
ScheduleMessageService.this
.timer
.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
// 更新偏移量
}
} // end of for
// 更新偏移量
}
}
4 消息重试概述
消息重试分为生产者端重试和消费者端重试,生产者端重试是指消息从 Producer 端发送到 Broker服务器的失败以后的重试情况,消费者端重试是指 Consumer 在消费消息的时候出现异常或者失败的重试情况,消息端重试其实就是通过定时消息来完成,重试次数越多,延迟级别越高。
Producer 端通过配置如下这两个两个API可以分别配置在同步发送和异步发送消息失败的时候的重试次数:
DefaultMQProducer producer =
new DefaultMQProducer("please_rename_unique_group_name");
producer.setRetryTimesWhenSendAsyncFailed(3);
producer.setRetryTimesWhenSendFailed(3);
Consumer 端在消费的时候,如果接收消息的回调函数出现了以下几种情况:
- 抛出异常
- 返回 NULL 状态
- 返回 RECONSUME_LATER 状态
超时 15 分钟没有响应 那么 Consumer 便会将消费失败的消息重新调度直到成功消费:
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // 抛出异常 // 返回 NULL 或者 RECONSUME_LATER 状态 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } });
4.1 Producer 消息发送重试
发送失败的重试方式,主要表现在发送消息的时候,会最多尝试 getRetryTimesWhenSendFailed()次发送,当成功发送以后,会直接返回发送结果给调用者。当发送失败以后,会继续进行下一次发送尝试,核心代码如下所示: ```java public class DefaultMQProducerImpl implements MQProducerInner {
private SendResult sendDefaultImpl(Message msg, / 其他参数 /) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException { int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; for (; times < timesTotal; times++) { // 尝试发送消息,发送成功 return,发送失败 continue }
}
}
### 4.2 Consumer 消息消费重试
#### 4.2.1 订阅重试话题
Consumer 在启动的时候,会执行一个函数 copySubscription() ,当用户注册的消息模型为集群模式的时候,会根据用户指定的组创建重试组话题并放入到注册信息中:
```java
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
// ...
this.copySubscription();
// ...
this.serviceState = ServiceState.RUNNING;
break;
}
}
private void copySubscription() throws MQClientException {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
// 重试话题组
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
}
}
假设用户指定的组为 “ORDER”,那么重试话题则为 “%RETRY%ORDER”,即前面加上了 “%RETRY%” 这个字符串。
Consumer 在一开始启动的时候,就为用户自动注册了订阅组的重试话题。即用户不单单只接受这个组的话题的消息,也接受这个组的重试话题的消息。这样一来,就为下文用户如何重试接受消息奠定了基础。
4.2.2 失败消息发往重试话题
当 Consumer 客户端在消费消息的时候,抛出了异常、返回了非正确消费的状态等错误的时候,这个时候 ConsumeMessageConcurrentlyService 会收集所有失败的消息,然后将每一条消息封装进 CONSUMER_SEND_MSG_BACK 的请求中,并将其发送到 Broker 服务器:
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
public void processConsumeResult(final ConsumeConcurrentlyStatus status,
/** 其他参数 **/) {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// ...
break;
case CLUSTERING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
// 重新将消息发往 Broker 服务器
boolean result = this.sendMessageBack(msg, context);
}
// ...
break;
default:
break;
}
}
}
当消费失败的消息重新发送到服务器后,Broker会为其指定新的话题重试话题,并根据当前这条消息的已有的重试次数来选择定时级别,即将这条消息变成定时消息投放到重试话题消息队列中。可见消息消费失败后并不是立即进行新的投递,而是有一定的延迟时间的。延迟时间随着重试次数的增加而增加,也即投递的时间的间隔也越来越长:
public class SendMessageProcessor
extends AbstractSendMessageProcessor
implements NettyRequestProcessor {
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx,
final RemotingCommand request)
throws RemotingCommandException {
// 指定为重试话题
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
// 指定为延时信息,设定延时级别
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
// 重试次数增加,下次延迟级别增加
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
// 重新存储
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
// ...
}
}
在将重试消息存储到commitLog文件时,由于getDelayTimeLevel不为0而和定时消息一样,会使用“SCHEDULE_TOPIC_XXXX”和对应的延迟级别队列Id分别替换MessageExtBrokerInner对象的Topic和QueueId属性值,并将原来设置的重试队列主题(“%RETRY%+consumerGroup”)的Topic和QueueId属性值做一个备份分别存入扩展属性properties的“REAL_TOPIC”和“REAL_QID”属性中。后面由定时任务DeliverDelayedMessageTimerTask将投递时间已经到了的消息,替换真实的topic和queueId,进行投递。所以,定时延迟队列只是为了用于暂存的,然后延迟一段时间再将消息移入至重试队列中。
当然,消息如果一直消费不成功,那也不会一直无限次的尝试重新投递的。当重试次数大于最大重试次数(默认为16次)的时候,该消息将会被送往死信话题队列,认定这条话题投递无门:
public class SendMessageProcessor
extends AbstractSendMessageProcessor
implements NettyRequestProcessor {
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx,
final RemotingCommand request)
throws RemotingCommandException {
// 重试次数大于最大重试次数
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
// 死信队列话题
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
}
// ...
}
}
上述客户端消费失败信息的流程图如下所示: