1 消费队列ConsumeQueue
从图中可以看出ConsumeQueue主要存放CommitLog的偏移量,首先我们应该知道,消息往 Broker 存储就是在向CommitLog消息文件中写入数据的一个过程。那ConsumeQueue队列是如何创建的呢?这与ReputMessageService服务相关。在 Broker 启动过程中,其会启动一个叫做ReputMessageService的服务,这个服务每隔 1 秒会检查一下这个CommitLog是否有新的数据写入。ReputMessageService 自身维护了一个偏移量reputFromOffset,用以对比和CommitLog文件中的消息总偏移量的差距。当这两个偏移量不同的时候,就代表有新的消息到来了:
class ReputMessageService extends ServiceThread {
/**
* 开始重放消息的CommitLog物理位置
*/
private volatile long reputFromOffset = 0;
private boolean isCommitLogAvailable() {
// 看当前有没有新的消息到来
return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}
@Override
public void run() {
while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
}
}
1.1 创建消息队列
在有新的消息到来之后,先检查是否commitLog生产消息,如果需要,doReput()函数会取出新到来的所有消息,每一条消息都会封装为一个 DispatchRequest请求,进而将这条请求分发给不同的请求消费者:
private void doReput() {
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
// 获取从reputFromOffset开始的commitLog对应的MappedFile对应的MappedByteBuffer
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// 生成重放消息重放调度请求
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
// 消息长度
int size = dispatchRequest.getMsgSize();
// 根据请求的结果处理
if (dispatchRequest.isSuccess()) {
if (size > 0) {
// 执行调度请求 1. 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue 2. 建立 索引信息 到 IndexFile
//将commitLog偏移量写入队列 MappedFile 文件,后续需要持久化到磁盘
DefaultMessageStore.this.doDispatch(dispatchRequest);
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
//当 Broker 是主节点 && Broker 开启的是长轮询,通知消费队列有新的消息。
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(),dispatchRequest.getConsumeQueueOffset() + 1,dispatchRequest.getTagsCode(),dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(),dispatchRequest.getPropertiesMap());
}
this.reputFromOffset += size;
readSize += size;
} else if (size == 0) {
//请求对应的是 Blank,即文件尾,跳转指向下一个 MappedFile
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}
CommitLog文件由多个MappedFile文件组成,其中如果最后一个MappedFile文件存放不下当前消息,则存入空白消息,正常消息和空白消息通过MAGIC CODE来控制,如果为MESSAGE_MAGIC_CODE,说明当前位置存放的是正常消息;否则,为BLANK_MAGIC_CODE,说明存放的是空白消息;
如果返回MESSAGE_MAGIC_CODE,则调用doDispatch执行调度请求;否则,获取下一个MappedFile文件。
doDispatch主要执行请求调度,对创建的dispatchRequest请求做分发,在RocketMq中实现了多种分发器,CommitLogDispatcherBuildConsumeQueue(用来构建ConsumeQueue队列)、CommitLogDispatcherBuildIndex(用来构建查询消息的索引文件)、CommitLogDispatcherCalcBitMap。
对于CommitLogDispatcherBuildConsumeQueue分发器,会对非事务消息或者已经提交事务的消息进行构建,创建ConsumeQueue队列:
/**
* 建立 消息位置信息 到 ConsumeQueue
* @param dispatchRequest
*/
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
//由一个topic和对应的队列id,可以获取一个老的ConsumeQueue队列,如果没有则新创建一个队列
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
cq.putMessagePositionInfoWrapper(dispatchRequest);
}
然后重试30次,将该条消息的信息写入ConsumeQueue队列,此时创建的ConsumeQueue文件并没有持久化到磁盘,后续持久化操作由定时调度执行。
/**
* 添加位置信息,并返回添加是否成功
* @param offset offset commitLog存储位置
* @param size 消息长度
* @param tagsCode 消息tagsCode
* @param cqOffset 队列位置
* @return
*/
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
// 如果已经重放过,直接返回成功
if (offset <= this.maxPhysicOffset) {
return true;
}
// 写入位置信息到byteBuffer
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
//消息在commitLog文件中的偏移量
this.byteBufferIndex.putLong(offset);
//消息的大小
this.byteBufferIndex.putInt(size);
//过滤tag
this.byteBufferIndex.putLong(tagsCode);
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
// 计算consumeQueue存储位置,并获得对应的MappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
if (cqOffset != 0) {
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
}
// 设置commitLog重放消息到ConsumeQueue位置。
this.maxPhysicOffset = offset;
// 插入mappedFile
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
1.2 持久化消息队列
以上阐述了消费队列创建并存储消息的一个过程,但是消费队列文件中的消息是需要持久化到磁盘中去的。持久化的过程是通过后台服务 FlushConsumeQueueService 来定时持久化的,每隔1s执行一次:
private void doFlush(int retryTimes) {
//每次至少刷2页
int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
// retryTimes == RETRY_TIMES_OVER时,进行强制flush。主要用于shutdown时。
if (retryTimes == RETRY_TIMES_OVER) {
flushConsumeQueueLeastPages = 0;
}
long logicsMsgTimestamp = 0;
int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
// 当时间满足flushConsumeQueueThoroughInterval时,即使写入的数量不足flushConsumeQueueLeastPages,也进行flush
this.lastFlushTimestamp = currentTimeMillis;
flushConsumeQueueLeastPages = 0;
logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
}
ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
// flush消费队列
for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
for (ConsumeQueue cq : maps.values()) {
boolean result = false;
for (int i = 0; i < retryTimes && !result; i++) {
result = cq.flush(flushConsumeQueueLeastPages);
}
}
}
}
1.3 ConsumeQueue创建流程
ConsumeQueue创建主要和两个服务相关,ReputMessageService和FlushConsumeQueueService服务,这两个服务每隔1s检查dan当前写入消息的情况,然后执行对应的操作。上述过程体现在磁盘文件的变化如下图所示,commitLog文件夹下面包含多个MappedFile文件,文件中存放的是完整的消息,来一条消息,向MappedFile文件中追加一条消息。同时每隔1s,会检查commitLog文件的偏移量和reputFromOffset的大小,决定是否需要创建ConsumeQueue队列。根据从commitlog读取的消息信息,确定这一条消息属于TopicTest话题下的哪一个队列,又会往相应的consumequeue文件下的相应消费队列文件中追加消息的偏移量、消息大小和标签码:
总流程图如下所示:
2 ConsumeQueue偏移量
上面我们已经知道Broker服务器存储了各个消费队列,客户端就是通过这些消费队列来消费每个消费队列中的消息的,通过topic+queueId就是唯一确定一个消费队列。消费模式的不同,每个客户端所消费的消息队列也不同。那么客户端如何记录自己所消费的队列消费到哪里了呢?答案就是消费队列偏移量。
针对同一话题,在集群模式下,由于每个客户端所消费的消息队列不同,每个消费队列可能被不同的消费者消费,所以每个消息队列已经消费到哪里的消费偏移量是记录在 Broker服务器端的,这样每个消费这个队列的客户端都能获取到已经消费的偏移量。而在广播模式下,由于每个客户端分配消费这个话题的所有消息队列,所以每个消息队列已经消费到哪里的消费偏移量是记录在客户端本地的。 下面分别讲述两种模式下偏移量是如何获取和更新的:
2.1 集群模式
在集群模式下,消费者客户端在内存中维护了一个 offsetTable 表:
public class RemoteBrokerOffsetStore implements OffsetStore {
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>();
}
通用在broker服务器端也维护了一个偏移量表:
public class ConsumerOffsetManager extends ConfigManager {
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
}
由于在集群模式下,队列的消费偏移量是存放在broker,所以RebalanceService服务在启动后,会定时地(默认20秒)从Broker服务器获取当前客户端所需要消费的消息队列,并与当前消费者客户端的消费队列进行对比,看是否有变化。对于每个消费队列,会从Broker服务器查询这个队列当前的消费偏移量。然后根据这几个消费队列,创建对应的拉取请求 PullRequest 准备从 Broker 服务器拉取消息,下图显示客户端和broker之间消息队列偏移量的同步过程:
首先,客户端每隔20s会将队列的偏移量同步到本地,然后通过从broker获取的消费偏移量,去broker拉取消息进行消费,在消息消费成功后,会将消费后的偏移量更新到本地内存,但是此时并没有持久化和同步到broker,这个过程通过定时任务,每隔5s将偏移量同步到Broker 服务器端:
public class MQClientInstance {
private void startScheduledTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
MQClientInstance.this.persistAllConsumerOffset();
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
}
}
而维护在 Broker 服务器端的偏移量表也会每隔 5 秒钟序列化到磁盘中:
public class BrokerController {
public boolean initialize() throws CloneNotSupportedException {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
BrokerController.this.consumerOffsetManager.persist();
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
}
}
保存的格式如下所示:
上述整体流程如下所示,红框框住的是这个话题下面的队列的ID,箭头指向的分别是每个队列的消费偏移量:
2.2 广播模式
对于广播模式而言,每个消费队列的偏移量肯定不能存储在Broker服务器端,因为多个消费者对于同一个队列的消费可能不一致,偏移量会互相覆盖掉。因此,在广播模式下,每个客户端的消费偏移量是存储在本地的,然后每隔 5 秒将内存中的 offsetTable持久化到磁盘中。当首次从服务器获取可消费队列的时候,拉取偏移量不像集群模式下是从Broker服务器读取的,而是直接从本地文件中读取的:
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
if (mq != null) {
switch (type) {
case MEMORY_FIRST_THEN_STORE:
case READ_FROM_MEMORY: {
AtomicLong offset = this.offsetTable.get(mq);
if (offset != null) {
return offset.get();
} else if (ReadOffsetType.READ_FROM_MEMORY == type) {
return -1;
}
}
//从磁盘中获取拉取偏移量
case READ_FROM_STORE: {
OffsetSerializeWrapper offsetSerializeWrapper;
try {
//读取本地文件
offsetSerializeWrapper = this.readLocalOffset();
} catch (MQClientException e) {
return -1;
}
if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
if (offset != null) {
this.updateOffset(mq, offset.get(), false);
return offset.get();
}
}
}
default:
break;
}
}
return -1;
}
当消息消费成功后,偏移量的更新也是持久化到本地,而非更新到 Broker 服务器中。由定时调度每5s执行一次持久化存储,存储到磁盘。这里提一下,在广播模式下,消息队列的偏移量默认放在用户目录下的 .rocketmq_offsets 目录下:
//每隔5S持久化消费进度
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
存储格式如下,由于消息存储在本地,所有消息存放对应的brokerName的信息也需要保存:
简要流程图如下:
3 broker处理拉取请求
当客户端发起消息消费请求,请求码为RequestCode.PULL_MESSAGE,对应的处理类为PullMessageProcessor,服务器在收到客户端的请求之后,会根据topic和queueId定位到对应的消费队列。然后根据这条请求传入的offset消费队列偏移量,定位到对应的消费队列文件。偏移量指定的是消费队列文件的消费下限,由于在拉取是已经设置默认拉取32条消息,所以拉取消息的下限为:
final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
有了上限和下限,客户端便会从指定的位置获取到消费队列ConsumeQueue,然后从消费队列文件中取出每个消息的偏移量和消息大小,然后再根据这两个值去 CommitLog 文件中寻找相应的完整的消息,并执行tag过滤,并添加到最后的消息队列中,精简过的代码如下所示:
/**
* 获取消息结果
* @param group Consumer group that launches this query. 消费分组
* @param topic Topic to query. 主题
* @param queueId Queue ID to query. 队列编号
* @param offset Logical offset to start from. 队列位置
* @param maxMsgNums Maximum count of messages to query. 消息数量
* @param messageFilter Message filter used to screen desired messages. 订阅信息
* @return 消息结果
*/
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums,
final MessageFilter messageFilter) {
GetMessageResult getResult = new GetMessageResult();
// 获取消费队列
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
//maxMsgNums 最大拉取条数,默认为32条
// 最大消息长度
final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
// 取消息
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
//commitlog 的偏移量
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
//消息大小
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
// 根据消息的偏移量和消息的大小从 CommitLog 文件中取出一条消息
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
getResult.addMessage(selectResult);
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
}
// 计算下次拉取消息的消息队列编号
// 增加下次开始的偏移量
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long diff = maxOffsetPy - maxPhyOffsetPulling;
}
if (GetMessageStatus.FOUND == status) {
this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
}
getResult.setStatus(status);
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
return getResult;
}
客户端和 Broker 服务器端完整拉取消息的流程图如下所示: