RocketMq消息消费
标签(空格分隔): 消息中间件
1 消费方式
RocketMQ 提供了两类消费方式:PUSH 和 PULL。 PushConsumer:在大多数场景下使用,实时性更好。名字虽然是 Push 开头,实际在实现时,使用 Pull 方式实现。通过 Pull 不断不断不断轮询 Broker 获取消息。当不存在新消息时,Broker会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和 Broker 主动 Push 做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling )。本文主要讲解PushConsumer。 PullConsumer:后面再说明。
2 消费者启动
先看一下 PushConsumer 包含的服务:
- RebalanceService:均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列( MessageQueue )。当有新的 Consumer 的加入或移除,都会重新分配消息队列。
- PullMessageService:拉取消息服务,不断不断不断从 Broker 拉取消息,并提交消费任务到 ConsumeMessageService。 ConsumeMessageService:消费消息服务,不断不断不断消费消息,并处理消费结果。
- RemoteBrokerOffsetStore:Consumer 消费进度管理,负责从 Broker 获取消费进度,同步消费进度到 Broker。
- ProcessQueue :消息处理队列。
- MQClientInstance :封装对 Namesrv,Broker 的 API调用,提供给 Producer、Consumer 使用。
2.1 消费者订阅消息
生产者负责往服务器 Broker 发送消息,消费者则从Broker 获取消息。消费者获取消息采用的是订阅者模式,即消费者客户端可以任意订阅一个或者多个话题来消费消息:
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("Jodie_topic_1023", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
先看subscribe主要完成的流程:
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
//创建订阅数据
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
// 通过心跳同步Consumer信息到Broker
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
首先,创建订阅数据,包括消费者设置的过滤表达式,然后通过心跳将消费者的信息同步到broker,并且过滤信息同步到过滤服务器。 那么,Consumer是如何获取broker的信息,然后将数据同步到broker呢?下面看一下消费者的启动流程。
2.2 消费者注册监听器
public void registerMessageListener(MessageListenerConcurrently messageListener) {
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
2.3 消费者启动
启动主要流程在DefaultMQPushConsumerImpl#start中完成,
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
...
this.consumeMessageService.start();
...
mQClientFactory.start();
...
}
}
先看一下mQClientFactory.start()做了什么,
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
...
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
...
}
}
}
在消费者启动时,将相关的服务都启动。前面提到,Consumer需要将订阅信息同步到broker,那broker的获取正是通过this.startScheduledTask()定时认为来获得的。 当消费者客户端启动以后,其会每隔30秒从命名服务器查询一次用户订阅的所有话题路由信息:
private void startScheduledTask() {
...
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
...
}
主要通过 MQClientInstance.this.updateTopicRouteInfoFromNameServer();方法从Namesrv来获取每个topic以及对应的路由信息。因为在消息发送时,我们就知道每条消息会以轮循的方式均衡地分发的不同Broker的不同队列中去。由此,消费者客户端从服务器命名服务器获取下来的便是话题的所有消息队列:
在获取话题路由信息的时候,客户端还会将话题路由信息中的所有 Broker 地址保存到本地:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
...
if (changed) {
// 克隆对象的原因:topicRouteData会被设置到下面的publishInfo/subscribeInfo
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
// 更新 Broker 地址相关信息,当某个Broker心跳超时后,会被从BrokerData的brokerAddrs中移除(由Namesrv定时操作)
// Namesrv存在Slave的BrokerData,所以brokerAddrTable含有Slave的brokerAddr
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update Pub info
{
// 更新生产者里的TopicPublishInfo,Slave在注册Broker时不会生成QueueData,但会生成BrokerData
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
// Update sub info
{
// 更新订阅者(消费者)里的队列信息,Slave在注册Broker时不会生成QueueData,但会生成BrokerData
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
...
}
当消费者客户端获取到了Broker地址列表之后,其变回每隔30秒给服务器发送一条心跳数据包,告知所有Broker服务器这台消费者客户端的存在。在每次发送心跳包的同时,其数据包内还会捎带这个客户端消息订阅的一些组信息,比如用户订阅了哪几个话题等,与此相对应,每台Broker服务器会在内存中维护一份当前所有的消费者客户端列表信息:
private void startScheduledTask() {
...
// 每隔30S清空下线的Broker(Master或Slave),向Broker发送心跳,传递生产者或订阅信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
...
}
从内存brokerAddrTable中获取broker的地址信息,然后将这台消费者客户端的信息,通过心跳的方式同步给broker:
/**
* 向所有Broker发送心跳,被Namesrv关闭连接的不在其中
* 生产者只向Master发送心跳,因为只有Master才能写入数据
* 消费者向Master和Slave都发送心跳
*/
private void sendHeartbeatToAllBroker() {
...
//从内存中维护的broker地址信息,获取broker地址,发送心跳信息
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
String brokerName = entry.getKey();
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) {
if (consumerEmpty) {
//没有消费者,当前为纯生产者客户端
if (id != MixAll.MASTER_ID)
//生产者只向Master发送心跳
continue;
}
}
...
// 发送心跳
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
...
}
}
}
...
}
public void sendHearbeat(//
final String addr, //
final HeartbeatData heartbeatData, //
final long timeoutMillis//
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
request.setBody(heartbeatData.encode());
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
消费者客户端与 Broker 服务器进行沟通的整体流程如下图所示:
3 负载均衡
在消费者启动时,会启动负载均衡线程rebalanceService.start(),然后在一切服务启动就绪后,唤醒线程,执行负载均衡策略org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start调用this.mQClientFactory.rebalanceImmediately()立即唤醒RebalanceService服务,消息队列的负载均衡是由一个不停运行的均衡服务来定时执行的:
public void doRebalance() {
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
接着往下看,我们会知道,消息的消费存在两种模式,广播模式和集群模式。在广播模式下,当前这台消费者消费和话题相关的所有消息队列,而集群模式会先按照某种分配策略来进行消息队列的分配,得到的结果就是当前这台消费者需要消费的消息队列:
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
//广播模式
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
}
} else {
}
break;
}
case CLUSTERING: {
//集群模式
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
...
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
...
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
我们知道无论是消息生产还是消息消费,都会指定消息的topic,并且对每一个topic,消息生产者和消费者都会做负载均衡,在消息生产时,会将消息使用不同的负载均衡策略,存放到不同broker的不同队列。尽管不论是消息生产还是消息消费都需要指定消息的topic,然而实际上消息在 Broker 服务器上并不是以topic为单位进行存储的,而是采用了比topic更细粒度的消息队列来进行存储的。当消息生产者发送8条相同topic的消息,这8条topic可能存储在了不同 Broker 服务器的不同队列中。 对于消息的消息,有两种不同的消费模式,因此,当我们讨论消息队列负载均衡的时候,需要根据不同的消费模式来分别进行处理。在RocketMQ中,客户端有两种消费模式,一种是广播模式,另外一种是集群模式。如果没有指定消费模式,默认是集群模式。
我们现在假设总共有两台 Broker 服务器,假设用户使用 Producer 已经发送了 8 条消息,这 8 条消息现在均衡的分布在两台 Broker 服务器的 8 个队列中,每个队列中有一个消息。现在有 3 台都订阅了 Test 话题的消费者实例,我们来看在不同消费模式下,不同的消费者会收到哪几条消息。
3.1 广播模式
广播模式是指所有消息队列中的消息都会广播给所有的消费者客户端,对于集群中的每一个消费者,都会收到同一条消息,如下图所示,每一个消费者都能收到这 8 条消息:
下面来看一下广播模式的负载均衡策略,可以看到,每一个消费者消费所有队列中的消息:
case BROADCASTING: {
//广播模式( BROADCASTING ) 下,分配 Topic 对应的所有消息队列。
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
...
break;
}
3.2 集群模式
集群模式是指所有的消息队列会按照某种分配策略来分给不同的消费者客户端,比如消费者 1 消费前 3 个队列中的消息,消费者 2 消费中间 3 个队列中的消息,消费者 3 消费最后 3 个队列中的消息。我们现在着重看 RocketMQ 为我们提供的三个比较重要的消息队列分配策略:
(1) 平均分配策略
平均分配策略下,三个消费者的消费情况如下所示:
Consumer-1 消费前 3 个消息队列中的消息
Consumer-2 消费中间 3 个消息队列中的消息
Consumer-3 消费最后 2 个消息队列中的消息
(2) 平均分配轮循策略
平均分配轮循策略下,三个消费者的消费情况如下所示:
Consumer-1 消费 1、4、7消息队列中的消息
Consumer-2 消费 2、5、8消息队列中的消息
Consumer-3 消费 3、6消息队列中的消息
(3) 一致性哈希策略
一致性哈希算法是根据这三台消费者各自的某个有代表性的属性(我们假设就是客户端ID)来计算出三个 Hash 值,此处为了减少由于 Hash 函数选取的不理想的情况, RocketMQ 算法对于每个消费者通过在客户端ID后面添加 1、2、3 索引来使每一个消费者多生成几个哈希值。那么现在我们需要哈希的就是九个字符串:
Consumer-1-1 Consumer-1-2 Consumer-1-3 Consumer-2-1 Consumer-2-2 Consumer-2-3 Consumer-3-1 Consumer-3-2 Consumer-3-3 计算完这 9 个哈希值以后,我们按照从小到大的顺序来排列成一个环 (如图所示)。这个时候我们需要一一对这 8 个消息队列也要计算一下 Hash 值,当 Hash 值落在两个圈之间的时候,我们就选取沿着环的方向的那个节点作为这个消息队列的消费者。如下图所示 (注意: 图只是示例,并非真正的消费情况):
在一致性哈希策略下,三个消费者的消费情况如下所示:
Consumer-1 消费 1、2、3、4消息队列中的消息
Consumer-2 消费 5、8消息队列中的消息
Consumer-3 消费 6、7消息队列中的消息
集群模式下,有多种负载均衡策略,AllocateMachineRoomNearby、AllocateMessageQueueAveragely、AllocateMessageQueueAveragelyByCircle、AllocateMessageQueueByConfig、AllocateMessageQueueByMachineRoom、AllocateMessageQueueConsistentHash,其中,在消费者初始化时,默认使用的策略:
public DefaultMQPushConsumer(final String consumerGroup) {
this(consumerGroup, null, new AllocateMessageQueueAveragely());
}
初始化集群方式的负载均衡策略后,我们看一下集群模式主要完成哪些功能:
case CLUSTERING: {
//集群模式
// 获取 topic 对应的 队列 和 consumer信息
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//获取该topic和group下的所有消费者
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
...
if (mqSet != null && cidAll != null) {
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
// 根据 队列分配策略 分配消息队列
List<MessageQueue> allocateResult = null;
try {
//返回当前这台消费者被分配到的消息队列
allocateResult = strategy.allocate(this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
return;
}
...
}
4 消息拉取
4.1 创建拉取请求
负载均衡处理后得到分配给当前消费者的消息队列,然后将这些队列进行updateProcessQueueTableInRebalance处理。
/**
* 当负载均衡时,更新 消息处理队列
* - 移除 在processQueueTable && 不存在于 mqSet 里的消息队列
* - 增加 不在processQueueTable && 存在于mqSet 里的消息队列
*
* @param topic Topic
* @param mqSet 负载均衡结果后的消息队列数组
* @param isOrder 是否顺序
* @return 是否变更
*/
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,final boolean isOrder) {
boolean changed = false;
// 移除 在processQueueTable && 不存在于 mqSet 里的消息队列
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
// 不包含的队列
pq.setDropped(true);
//移除不需要的消息队列
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
}
} else if (pq.isPullExpired()) {
// 队列拉取超时,进行清理
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
}
break;
default:
break;
}
}
}
}
// 增加 不在processQueueTable && 存在于mqSet 里的消息队列。
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
continue;
}
this.removeDirtyOffset(mq);
//创建ProcessQueue结构,加入到processQueueTable中
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
} else {
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
}
}
}
// 发起消息拉取请求
this.dispatchPullRequest(pullRequestList);
return changed;
}
流程图如下:
I. 首先检查当前RebalancePushImpl实例processQueueTable中与mqSet的包含关系
(1)如图中processQueueTable的灰色部分,表示与mqSet集合不互不包含的队列,对这些队列首先设置Dropped为true,然后看这些队列是否可以移除出processQueueTable--removeUnnecessaryMessageQueue,即每隔1s 看是否可以拿到当前队列的消费锁(tryLock()),拿到后返回true, 如果等待1s后仍然拿不到当前队列的消费锁则返回false,如果返回true则从processQueueTable移除对应的Entry
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
// 同步队列的消费进度,并移除之。
this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
// 顺序消息
...
return false;
}
return true;
}
对于广播模式和集群模式,上述移除操作是不同的,对于广播模式,使用本地文件的消费进度,对应的persist&removeOffset不需要做任何操作,直接返回。对于集群模式,需要将本地当前队列的消费偏移量同步到broker,并且删除本地当前队列的偏移量信息。
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
II. 经过 I 处理,processQueueTable更新之后,将processQueueTable集合与mqSet的的相对补集: processQueueTable(mq) - mqSet 里的消息队列依次封装成pullRequest,然后dispatchPullRequest到pullRequestQueue中。
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
其中创建PullRequest请求比较重要的是需要计算该请求从哪里开始从broker拉取消息,计算过程由computePullFromWhere完成,从磁盘中获取拉取的偏移量:
public long computePullFromWhere(MessageQueue mq) {
long result = -1;
//默认方式为CONSUME_FROM_LAST_OFFSET
final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
switch (consumeFromWhere) {
case CONSUME_FROM_LAST_OFFSET: {
//从磁盘获取当前请求队列消费的偏移量
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
}
// First start,no offset
else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
result = 0L;
} else {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
break;
}
case CONSUME_FROM_FIRST_OFFSET: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
} else if (-1 == lastOffset) {
result = 0L;
} else {
result = -1;
}
break;
}
case CONSUME_FROM_TIMESTAMP: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
} else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
} else {
try {
long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
UtilAll.YYYYMMDDHHMMSS).getTime();
result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
break;
}
default:
break;
}
return result;
}
PushConsumer 读取消费进度有三种选项:
- CONSUME_FROM_LAST_OFFSET:一个新的消费集群第一次启动从队列的最后位置开始消费。后续再启动接着上次消费的进度开始消费。
- CONSUME_FROM_FIRST_OFFSET一个新的消费集群第一次启动从队列的最前位置开始消费。后续再启动接着上次消费的进度开始消费。
- CONSUME_FROM_TIMESTAMP一个新的消费集群第一次启动从指定时间点开始消费。后续再启动接着上次消费的进度开始消费。
经过上述处理后,待拉起消息的请求放在了pullRequestList中,之后遍历pullRequestList,对遍历的每个队列进行拉取消息,代码如下:
public void dispatchPullRequest(List<PullRequest> pullRequestList) { for (PullRequest pullRequest : pullRequestList) { this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); } }
4.2 执行拉取动作
每一个拉取请求pullRequest都关联这一个MessageQueue和一个ProcessQueue,在ProcessQueue的内部还维护了一个用来等待用户消费的消息树,在遍历每一个拉取请求时,将拉取请求放入到拉取请求队列pullRequestQueue中,会导致唤醒一个专门用来拉取消息的后台服务PullMessageService,其接收每个对队列创建PullRequest拉取消息请求,然后拉取消息: ```java public void dispatchPullRequest(ListpullRequestList) {
}for (PullRequest pullRequest : pullRequestList) { this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); }
public void executePullRequestImmediately(final PullRequest pullRequest) { try { this.pullRequestQueue.put(pullRequest); } catch (InterruptedException e) { log.error("executePullRequestImmediately pullRequestQueue.put", e); } }
唤醒拉取服务PullMessageService,不断从Broker拉取消息,并提交消费任务到 ConsumeMessageService。PullMessageService提交拉取请求,以异步非阻塞的形式执行。
```java
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
真正执行拉取动作是由org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage完成,具体的执行流程如下:
当真正尝试拉取消息之前,其会检查当前请求的内部缓存的消息数量、消息大小、消息阈值跨度是否超过了某个阈值,如果超过某个阈值,则推迟 50 毫秒重新执行这个请求:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void pullMessage(final PullRequest pullRequest) {
// ...
final ProcessQueue processQueue = pullRequest.getProcessQueue();
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// 缓存消息数量阈值,默认为 1000
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
return;
}
// 缓存消息大小阈值,默认为 100 MB
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
return;
}
if (!this.consumeOrderly) {
// 最小偏移量和最大偏移量的阈值跨度,默认为 2000 偏移量,消费速度不能太慢
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
return;
}
}
// ...
}
}
当执行完一些必要的检查之后,客户端会将用户指定的过滤信息以及一些其它必要消费字段封装到请求信息体中,然后才开始从Broker服务器拉取这个请求从当前偏移量开始的消息,默认一次性最多拉取32条,服务器返回的响应会告诉客户端这个队列下次开始拉取时的偏移量。客户端每次都会注册一个 PullCallback 回调,用以接受服务器返回的响应信息,根据响应信息的不同状态信息,然后修正这个请求的偏移量,并进行下次请求:
public void pullMessage(final PullRequest pullRequest) {
...
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
// 设置下次拉取消息队列位置
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
// 提交拉取到的消息到消息处理队列
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// 提交消费请求
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// 提交下次拉取消息请求
}
break;
case NO_NEW_MSG:
// 立即提交拉取消息请求
break;
case NO_MATCHED_MSG:
//提交立即拉取消息请求
break;
case OFFSET_ILLEGAL:
//延迟执行
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
//延迟执行
}
};
...
try {
//执行拉取。如果拉取请求发生异常时,提交延迟拉取消息请求
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
}
拉取的过程比较简单,主要包括两步:1)获取到broker的信息;2)以异步的方式通过netty通信,创建RequestCode.PULL_MESSAGE请求,从broker获取消息。
private void pullMessageAsync(
final String addr,
final RemotingCommand request,
final long timeoutMillis,
final PullCallback pullCallback
) throws RemotingException, InterruptedException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
assert pullResult != null;
pullCallback.onSuccess(pullResult);
} catch (Exception e) {
pullCallback.onException(e);
}
} else {
if (!responseFuture.isSendRequestOK()) {
pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
} else if (responseFuture.isTimeout()) {
pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
responseFuture.getCause()));
} else {
pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
}
}
}
});
}
4.3 broker处理拉取请求
上面说完了消费者拉取消息时的一些机制,下章再详细说一下Broker服务器端收到拉取请求后的操作流程。 服务器在收到客户端的请求之后,会根据话题和队列ID定位到对应的消费队列。然后根据这条请求传入的offset消费队列偏移量,定位到对应的消费队列文件。偏移量指定的是消费队列文件的消费下限,而最大上限是由如下算法来进行约束的:
final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
客户端和 Broker 服务器端完整拉取消息的流程图如下所示:
4.4 客户端获取拉取结果
在broker将拉取消息结果放入Respons中后,客户端通过回调的方式获取拉取结果,如果拉取消息成功,则调用onSuccess,否则调用onException方法。同时,处理拉取结果,主要由PullAPIWrapper#processPullResult完成:
/**
* 处理拉取结果
* 1. 更新消息队列拉取消息Broker编号的映射
* 2. 解析消息,并根据订阅信息消息tagCode匹配合适消息
* @param mq 消息队列
* @param pullResult 拉取结果
* @param subscriptionData 订阅消息
* @return 拉取结果
*/
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) {
PullResultExt pullResultExt = (PullResultExt) pullResult;
// 更新消息队列拉取消息Broker编号的映射
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
// 解析消息,并根据订阅信息消息tagCode匹配合适消息
if (PullStatus.FOUND == pullResult.getPullStatus()) {
// 解析消息
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
// 根据订阅信息消息tagCode匹配合适消息
List<MessageExt> msgListFilterAgain = msgList;
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
for (MessageExt msg : msgList) {
if (msg.getTags() != null) {
if (subscriptionData.getTagsSet().contains(msg.getTags())) {
msgListFilterAgain.add(msg);
}
}
}
}
// 设置消息队列当前最小/最大位置到消息拓展字段
for (MessageExt msg : msgListFilterAgain) {
String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
}
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
Long.toString(pullResult.getMinOffset()));
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
Long.toString(pullResult.getMaxOffset()));
}
// 设置消息列表
pullResultExt.setMsgFoundList(msgListFilterAgain);
}
// 清空消息二进制数组
pullResultExt.setMessageBinary(null);
return pullResult;
}
处理拉取结果,主要包含三个步骤,首先,更新消息队列拉取消息的broker编号的映射,用于下次还从该broker拉取;其次,更加消费者订阅的tag,过滤匹配的消息发送给消费者;最后,更新下次拉取的偏移量,并将过滤后的消息放到消息队列的消息树中,然后发送给消费者。
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
// 提交消息小于批量消息数,直接提交消费请求
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
// 提交消息大于批量消息数,进行分拆成多个消费请求
for (int total = 0; total < msgs.size(); ) {
// 计算当前拆分请求包含的消息
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
// 提交拆分消费请求
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
// 如果被拒绝,则将当前拆分消息+剩余消息提交延迟消费请求。
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
5 消息消费
依赖于用户指定的消息监听器回调函数的不同,消息的消费分为两种:并发消费和有序消费。 并发消费没有考虑消息发送的顺序,客户端从服务器获取到消息就会直接回调给用户。而有序消费会考虑每个队列消息发送的顺序,对于某个话题的某个队列,发往这个队列的消息,客户端接受消息的顺序与发送的顺序完全一致。
下面我们分别看这两种消费模式是如何实现的。
5.1 并发消费
当用户注册消息回调类的时候,如果注册的是 MessageListenerConcurrently 回调类,那么就认为用户不关心消息的顺序问题。我们在上文提到过每个 PullRequest 都关联了一个处理队列ProcessQueue,而每个处理队列又都关联了一颗消息树 msgTreeMap。当客户端拉取到新的消息以后,其先将消息放入到这个请求所关联的处理队列的消息树中:
// 提交拉取到的消息到消息处理队列,消息放入处理队列的消息树中
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
然后提交一个消息消费请求,用以回调用户端的代码消费消息。并行消费将消息提交给ConsumeMessageConcurrentlyService服务处理,对于PUSH模式,需要消费者实现监听器,用来对PullMessageService服务定时从broker拉取的消息进行回调消费:
public void run() {
// 监听器
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
// 消费Context
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
// 消费结果状态
ConsumeConcurrentlyStatus status = null;
try {
if (msgs != null && !msgs.isEmpty()) {
// 进行消费
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
}
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
}
}
我们可以看到 msgs 是直接从服务器端拿到的最新消息,直接喂给了客户端进行消费,并未做任何有序处理。当消费成功后,会从消息树中将这些消息再给删除掉:
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
switch (status) {
case CONSUME_SUCCESS:
// 计算从consumeRequest.msgs[0]到consumeRequest.msgs[ackIndex]的消息消费成功
// 统计成功/失败数量
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
break;
case RECONSUME_LATER:
ackIndex = -1;
break;
default:
break;
}
// 处理消费失败的消息
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
// 广播模式,无论是否消费失败,不发回消息到Broker,只打印Log
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
// 发回消息失败到Broker
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
//发回broker,延迟重试,后续单独讲解
boolean result = this.sendMessageBack(msg, context);
}
// 移除消费成功消息,并更新最新消费进度
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
//更新当前队列的消费进度,只更新本地的消费进度,后续通过定时同步消费进度到broker
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
5.2 顺序消费
对于有序消息,由ConsumeMessageOrderlyService类处理消息的消费。消费者客户端每一次拉取消息请求,如果有发现新的消息,那么都会将这些消息封装为 ConsumeRequest 来喂给消费线程池,以待消费。如果消息特别多,这样一个队列可能有多个消费请求正在等待客户端消费,用户可能会先消费偏移量大的消息,后消费偏移量小的消息。所以消费同一队列的时候,需要一把锁以消费请求顺序化:
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
class ConsumeRequest implements Runnable {
@Override
public void run() {
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
// ...
}
}
}
}
顺序消息的生产和消费后续详细说明。