RocketMQ 消息索引流程
一、消息查询方式
对于 Producer 发送到 Broker 服务器的消息,RocketMQ 支持多种方式来方便地查询消息:
- 根据键查询消息
如下所示,在构建消息的时候,指定了这条消息的键为 “OrderID001”:
Message msg =
new Message("TopicTest",
"TagA",
"OrderID001", // Keys
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
那么,当这条消息发送成功后,我们可以使用 queryMsgByKey 命令查询到这条消息的详细信息:
MQAdminStartup.main(new String[] {
"queryMsgByKey",
"-n",
"localhost:9876",
"-t",
"TopicTest",
"-k",
"OrderID001"
});
- 根据(偏移量) ID查询消息
消息在发送成功之后,其返回的SendResult
类中包含了这条消息的唯一偏移量 ID(注意此处指的是 offsetMsgId):
用户可以使用queryMsgById
命令查询这条消息的详细信息:
MQAdminStartup.main(new String[] {
"queryMsgById",
"-n",
"localhost:9876",
"-i",
"0A6C73D900002A9F0000000000004010"
});
- 根据唯一键查询消息
消息在发送成功之后,其返回的SendResult
类中包含了这条消息的唯一 ID:
用户可以使用queryMsgByUniqueKey
命令查询这条消息的详细信息:
MQAdminStartup.main(new String[] {
"queryMsgByUniqueKey",
"-n",
"localhost:9876",
"-i",
"0A6C73D939B318B4AAC20CBA5D920000",
"-t",
"TopicTest"
});
- 根据消息队列偏移量查询消息
消息发送成功之后的 SendResult 中还包含了消息队列的其它信息,如消息队列 ID、消息队列偏移量等信息:
SendResult [sendStatus=SEND_OK,
msgId=0A6C73D93EC518B4AAC20CC4ACD90000,
offsetMsgId=0A6C73D900002A9F000000000000484E,
messageQueue=MessageQueue [topic=TopicTest,
brokerName=zk-pc,
queueId=3],
queueOffset=24]
根据这些信息,使用 queryMsgByOffset 命令也可以查询到这条消息的详细信息:
MQAdminStartup.main(new String[] {
"queryMsgByOffset",
"-n",
"localhost:9876",
"-t",
"TopicTest",
"-b",
"zk-pc",
"-i",
"3",
"-o",
"24"
});
二、(偏移量) ID 查询
二 (1)、生成 ID
(偏移量) ID 是在消息发送到 Broker 服务器存储的时候生成的,其包含如下几个字段:
- Broker 服务器 IP 地址
- Broker 服务器 端口号
- 消息文件 CommitLog 写偏移量
public class CommitLog {
class DefaultAppendMessageCallback implements AppendMessageCallback {
public AppendMessageResult doAppend(final long fileFromOffset, /** 其它参数 **/) {
String msgId = MessageDecoder
.createMessageId(this.msgIdMemory,
msgInner.getStoreHostBytes(hostHolder),
wroteOffset);
// ...
}
}
}
二 (2)、使用 ID 查询
Admin 端查询的时候,首先对msgId
进行解析,取出 Broker 服务器的 IP 、端口号和消息偏移量:public class MessageDecoder {
public class MessageDecoder {
public static MessageId decodeMessageId(final String msgId)
throws UnknownHostException {
byte[] ip = UtilAll.string2bytes(msgId.substring(0, 8));
byte[] port = UtilAll.string2bytes(msgId.substring(8, 16));
// offset
byte[] data = UtilAll.string2bytes(msgId.substring(16, 32));
// ...
}
}
获取到偏移量之后,Admin 会对 Broker 服务器发送一个 VIEW_MESSAGE_BY_ID 的请求命令,Broker 服务器在收到请求后,会依据偏移量定位到 CommitLog 文件中的相应位置,然后取出消息,返回给 Admin 端:
public class DefaultMessageStore implements MessageStore {
@Override
public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
SelectMappedBufferResult sbr = this.commitLog
.getMessage(commitLogOffset, 4);
// 1 TOTALSIZE
int size = sbr.getByteBuffer().getInt();
return this.commitLog.getMessage(commitLogOffset, size);
}
}
query_message_by_msg_offset_id
三、消息队列偏移量查询
根据队列偏移量查询是最简单的一种查询方式,Admin 会启动一个PullConsumer
,然后利用用户传递给 Admin 的队列 ID、队列偏移量等信息,从服务器拉取一条消息过来:
public class QueryMsgByOffsetSubCommand implements SubCommand {
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
// 根据参数构建 MessageQueue
MessageQueue mq = new MessageQueue();
mq.setTopic(topic);
mq.setBrokerName(brokerName);
mq.setQueueId(Integer.parseInt(queueId));
// 从 Broker 服务器拉取消息
PullResult pullResult = defaultMQPullConsumer.pull(mq, "*", Long.parseLong(offset), 1);
}
}
query_msg_by_message_queue_offset
四、消息索引服务
在继续讲解剩下两种查询方式之前,我们必须先介绍以下 Broker 端的消息索引服务。
在之前提到过,每当一条消息发送过来之后,其会封装为一个DispatchRequest
来下发给各个转发服务,而CommitLogDispatcherBuildIndex
构建索引服务便是其中之一:
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
DefaultMessageStore.this.indexService.buildIndex(request);
}
}
}
四 (1)、索引文件结构
消息的索引信息是存放在磁盘上的,文件以时间戳命名的,默认存放在$HOME/store/index
目录下。由下图来看,一个索引文件的结构被分成了三部分:
- 前 40 个字节存放固定的索引头信息,包含了存放在这个索引文件中的消息的最小/大存储时间、最小/大偏移量等状况
- 中间一段存储了 500 万个哈希槽位,每个槽内部存储的是索引文件的地址 (索引槽)
- 最后一段存储了 2000 万个索引内容信息,是实际的索引信息存储的地方。每一个槽位存储了这条消息的键哈希值、存储偏移量、存储时间戳与下一个索引槽地址
RocketMQ 在内存中还维护了一个索引文件列表,对于每一个索引文件,前一个文件的最大存储时间是下一个文件的最小存储时间,前一个文件的最大偏移量是下一个文件的最大偏移量。每一个索引文件都索引了在某个时间段内、某个偏移量段内的所有消息,当文件满了,就会用前一个文件的最大偏移量和最大存储时间作为起始值,创建下一个索引文件:
四 (2)、添加消息
当有新的消息过来后,构建索引服务会取出这条消息的键,然后对字符串 “话题#键” 构建索引。构建索引的步骤如下:
- 找出哈希槽:生成字符串哈希码,取余落到 500W 个槽位之一,并取出其中的值,默认为 0
- 找出索引槽:
IndexHeader
维护了indexCount
,实际存储的索引槽就是直接依次顺延添加的 - 存储索引内容:找到索引槽后,放入键哈希值、存储偏移量、存储时间戳与下一个索引槽地址。下一个索引槽地址就是第一步哈希槽中取出的值,0 代表这个槽位是第一次被索引,而不为 0 代表这个槽位之前的索引槽地址。由此,通过索引槽地址可以将相同哈希槽的消息串联起来,像单链表那样。
- 更新哈希槽:更新原有哈希槽中存储的值
我们以实际例子来说明。假设我们需要依次为键的哈希值为 “{16,29,29,8,16,16}” 这几条消息构建索引,我们在这个地方忽略了索引信息中存储的存储时间和偏移量字段,只是存储键哈希和下一索引槽信息,那么:
- 放入 16:将 “16|0” 存储在第 1 个索引槽中,并更新哈希槽为 16 的值为 1,即哈希槽为 16 的第一个索引块的地址为 1
- 放入 29:将 “29|0” 存储在第 2 个索引槽中,并更新哈希槽为 29 的值为 2,即哈希槽为 29 的第一个索引块的地址为 2
- 放入 29:取出哈希槽为 29 中的值 2,然后将 “29|2” 存储在第 3 个索引槽中,并更新哈希槽为 29 的值为 3,即哈希槽为 29 的第一个索引块的地址为 3。而在找到索引块为 3 的索引信息后,又能取出上一个索引块的地址 2,构成链表为: “[29]->3->2”
- 放入 8:将 “8|0” 存储在第 4 个索引槽中,并更新哈希槽为 8 的值为 4,即哈希槽为 8 的第一个索引块的地址为 4
- 放入 16:取出哈希槽为 16 中的值 1,然后将 “16|1” 存储在第 5 个索引槽中,并更新哈希槽为 16 的值为 5。构成链表为: “[16]->5->1”
- 放入 16:取出哈希槽为 16 中的值 5,然后将 “16|5” 存储在第 6 个索引槽中,并更新哈希槽为 16 的值为 6。构成链表为: “[16]->6->5->1”
整个过程如下图所示:
四 (3)、查询消息
当需要根据键来查询消息的时候,其会按照倒序回溯整个索引文件列表,对于每一个在时间上能够匹配用户传入的begin
和end
时间戳参数的索引文件,会一一进行消息查询:
public class IndexService {
public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
// 倒序
for (int i = this.indexFileList.size(); i > 0; i--) {
// 位于时间段内
if (f.isTimeMatched(begin, end)) {
// 消息查询
}
}
}
}
而具体到每一个索引文件,其查询匹配消息的过程如下所示:
- 确定哈希槽:根据键生成哈希值,定位到哈希槽
- 定位索引槽:哈希槽中的值存储的就是链表的第一个索引槽地址
- 遍历索引槽:沿着索引槽地址,依次取出下一个索引槽地址,即沿着链表遍历,直至遇见下一个索引槽地址为非法地址 0 停止
- 收集偏移量:在遇到匹配的消息之后,会将相应的物理偏移量放到列表中,最后根据物理偏移量,从 CommitLog 文件中取出消息
public class DefaultMessageStore implements MessageStore {
@Override
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
long offset = queryOffsetResult.getPhyOffsets().get(m);
// 根据偏移量从 CommitLog 文件中取出消息
}
}
}
以查询哈希值 16 的消息为例,图示如下:
五、唯一键查询消息
五 (1)、构建键
消息的唯一键是在客户端发送消息前构建的:
public class DefaultMQProducerImpl implements MQProducerInner {
private SendResult sendKernelImpl(final Message msg, /** 其它参数 **/) throws XXXException {
// ...
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
}
}
创建唯一 ID 的算法:
public class MessageClientIDSetter {
public static String createUniqID() {
StringBuilder sb = new StringBuilder(LEN * 2);
sb.append(FIX_STRING);
sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
return sb.toString();
}
}
唯一键是根据客户端的进程 ID、IP 地址、ClassLoader 哈希码、时间戳、计数器这几个值来生成的一个唯一的键,然后作为这条消息的附属属性发送到 Broker 服务器的:
public class MessageClientIDSetter {
public static void setUniqID(final Message msg) {
if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
}
}
}
五 (2)、索引键
当服务器收到客户端发送过来的消息之后,索引服务便会取出客户端生成的uniqKey
并为之建立索引,放入到索引文件中:
public class IndexService {
public void buildIndex(DispatchRequest req) {
// ...
if (req.getUniqKey() != null) {
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
}
// ...
}
}
五 (3)、使用键查询
客户端在生成消息唯一键的时候,在ByteBuffer
的第 11 位到第 14 位放置的是当前的时间与当月第一天的时间的毫秒差:
public class MessageClientIDSetter {
private static byte[] createUniqIDBuffer() {
long current = System.currentTimeMillis();
if (current >= nextStartTime) {
setStartTime(current);
}
// 时间差 [当前时间 - 这个月 1 号的时间]
// putInt 占据的是第 11 位到第 14 位
buffer.putInt((int) (System.currentTimeMillis() - startTime));
}
private synchronized static void setStartTime(long millis) {
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(millis);
cal.set(Calendar.DAY_OF_MONTH, 1);
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
// 开始时间设置为这个月的 1 号
startTime = cal.getTimeInMillis();
// ...
}
}
我们知道消息索引服务的查询需要用户传入 begin 和 end 这连个时间值,以进行这段时间内的匹配。所以 RocketMQ 为了加速消息的查询,于是在 Admin 端对特定 ID 进行查询的时候,首先取出了这段时间差值,然后与当月时间进行相加得到 begin 时间值:
public class MessageClientIDSetter {
public static Date getNearlyTimeFromID(String msgID) {
ByteBuffer buf = ByteBuffer.allocate(8);
byte[] bytes = UtilAll.string2bytes(msgID);
buf.put((byte) 0);
buf.put((byte) 0);
buf.put((byte) 0);
buf.put((byte) 0);
// 取出第 11 位到 14 位
buf.put(bytes, 10, 4);
buf.position(0);
// 得到时间差值
long spanMS = buf.getLong();
Calendar cal = Calendar.getInstance();
long now = cal.getTimeInMillis();
cal.set(Calendar.DAY_OF_MONTH, 1);
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
long monStartTime = cal.getTimeInMillis();
if (monStartTime + spanMS >= now) {
cal.add(Calendar.MONTH, -1);
monStartTime = cal.getTimeInMillis();
}
// 设置为这个月(或者上个月) + 时间差值
cal.setTimeInMillis(monStartTime + spanMS);
return cal.getTime();
}
}
由于发送消息的客户端和查询消息的 Admin 端可能不在一台服务器上,而且从函数的命名getNearlyTimeFromID
与上述实现来看,Admin 端的时间戳得到的是一个近似起始值,它尽可能地加速用户的查询。而且太旧的消息(超过一个月的消息)是查询不到的。
当begin
时间戳确定以后,Admin 便会将其它必要的信息如话题、Key等信息封装到QUERY_MESSAGE
的包中,然后向 Broker 服务器传递这个请求,来进行消息的查询。Broker 服务器在获取到这个查询消息的请求后,便会根据Key
从索引文件中查询符合的消息,最终返回到 Admin 端。
六、键查询消息
六 (1)、构建键
我们提到过,在发送消息的时候,可以填充一个keys
的值,这个值将会作为消息的一个属性被发送到 Broker 服务器上:
public class Message implements Serializable {
public void setKeys(String keys) {
this.putProperty(MessageConst.PROPERTY_KEYS, keys);
}
}
六 (2)、索引键
当服务器收到客户端发送过来的消息之后,索引服务便会取出这条消息的keys
并将其用空格进行分割,分割后的每一个字符串都会作为一个单独的键,创建索引,放入到索引文件中:
public class IndexService {
public void buildIndex(DispatchRequest req) {
// ...
if (keys != null && keys.length() > 0) {
// 使用空格进行分割
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
for (int i = 0; i < keyset.length; i++) {
String key = keyset[i];
if (key.length() > 0) {
indexFile = putKey(indexFile, msg, buildKey(topic, key));
}
}
}
}
}
由此我们也可以得知,keys
键的设置通过使用空格分割字符串,一条消息可以指定多个键。
六 (3)、使用键查询
keys
键查询的方式也是通过将参数封装为QUERY_MESSAGE
请求包中去请求服务器返回相应的信息。由于键本身不能和时间戳相关联,因此begin
值设置的是 0,这是和第五节的不同之处:
public class QueryMsgByKeySubCommand implements SubCommand {
private void queryByKey(final DefaultMQAdminExt admin, final String topic, final String key)
throws MQClientException, InterruptedException {
// begin: 0
// end: Long.MAX_VALUE
QueryResult queryResult = admin.queryMessage(topic, key, 64, 0, Long.MAX_VALUE);
}
}