RocketMQ 消息索引流程

一、消息查询方式

对于 Producer 发送到 Broker 服务器的消息,RocketMQ 支持多种方式来方便地查询消息:

  • 根据查询消息

如下所示,在构建消息的时候,指定了这条消息的为 “OrderID001”:

Message msg =
    new Message("TopicTest",
                "TagA",
                "OrderID001", // Keys
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

那么,当这条消息发送成功后,我们可以使用 queryMsgByKey 命令查询到这条消息的详细信息:

MQAdminStartup.main(new String[] {
        "queryMsgByKey",
        "-n",
        "localhost:9876",
        "-t",
        "TopicTest",
        "-k",
        "OrderID001"
    });
  • 根据(偏移量) ID查询消息

消息在发送成功之后,其返回的SendResult类中包含了这条消息的唯一偏移量 ID(注意此处指的是 offsetMsgId):

sendresult_offset_msg_id

用户可以使用queryMsgById命令查询这条消息的详细信息:

MQAdminStartup.main(new String[] {
        "queryMsgById",
        "-n",
        "localhost:9876",
        "-i",
        "0A6C73D900002A9F0000000000004010"
    });
  • 根据唯一键查询消息

消息在发送成功之后,其返回的SendResult类中包含了这条消息的唯一 ID:

sendresult_msg_id

用户可以使用queryMsgByUniqueKey命令查询这条消息的详细信息:

MQAdminStartup.main(new String[] {
        "queryMsgByUniqueKey",
        "-n",
        "localhost:9876",
        "-i",
        "0A6C73D939B318B4AAC20CBA5D920000",
        "-t",
        "TopicTest"
    });
  • 根据消息队列偏移量查询消息

消息发送成功之后的 SendResult 中还包含了消息队列的其它信息,如消息队列 ID、消息队列偏移量等信息:

SendResult [sendStatus=SEND_OK,
            msgId=0A6C73D93EC518B4AAC20CC4ACD90000,
            offsetMsgId=0A6C73D900002A9F000000000000484E,
            messageQueue=MessageQueue [topic=TopicTest,
                                       brokerName=zk-pc,
                                       queueId=3],
            queueOffset=24]

根据这些信息,使用 queryMsgByOffset 命令也可以查询到这条消息的详细信息:

MQAdminStartup.main(new String[] {
        "queryMsgByOffset",
        "-n",
        "localhost:9876",
        "-t",
        "TopicTest",
        "-b",
        "zk-pc",
        "-i",
        "3",
        "-o",
        "24"
    });

二、(偏移量) ID 查询

二 (1)、生成 ID

(偏移量) ID 是在消息发送到 Broker 服务器存储的时候生成的,其包含如下几个字段:

  • Broker 服务器 IP 地址
  • Broker 服务器 端口号
  • 消息文件 CommitLog 写偏移量

msgid_generated_in_broker


public class CommitLog {

    class DefaultAppendMessageCallback implements AppendMessageCallback {

        public AppendMessageResult doAppend(final long fileFromOffset, /** 其它参数 **/) {
            String msgId = MessageDecoder
                .createMessageId(this.msgIdMemory,
                                 msgInner.getStoreHostBytes(hostHolder),
                                 wroteOffset);
            // ...
        }

    }

}

二 (2)、使用 ID 查询

Admin 端查询的时候,首先对msgId进行解析,取出 Broker 服务器的 IP 、端口号和消息偏移量:public class MessageDecoder {

public class MessageDecoder {

    public static MessageId decodeMessageId(final String msgId)
        throws UnknownHostException {
        byte[] ip = UtilAll.string2bytes(msgId.substring(0, 8));
        byte[] port = UtilAll.string2bytes(msgId.substring(8, 16));
        // offset
        byte[] data = UtilAll.string2bytes(msgId.substring(16, 32));
        // ...
    }

}

获取到偏移量之后,Admin 会对 Broker 服务器发送一个 VIEW_MESSAGE_BY_ID 的请求命令,Broker 服务器在收到请求后,会依据偏移量定位到 CommitLog 文件中的相应位置,然后取出消息,返回给 Admin 端:

public class DefaultMessageStore implements MessageStore {

    @Override
    public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
        SelectMappedBufferResult sbr = this.commitLog
            .getMessage(commitLogOffset, 4);
        // 1 TOTALSIZE
        int size = sbr.getByteBuffer().getInt();
        return this.commitLog.getMessage(commitLogOffset, size);
    }

}

query_message_by_msg_offset_id

三、消息队列偏移量查询

根据队列偏移量查询是最简单的一种查询方式,Admin 会启动一个PullConsumer,然后利用用户传递给 Admin 的队列 ID、队列偏移量等信息,从服务器拉取一条消息过来:

public class QueryMsgByOffsetSubCommand implements SubCommand {

    @Override
    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
        // 根据参数构建 MessageQueue
        MessageQueue mq = new MessageQueue();
        mq.setTopic(topic);
        mq.setBrokerName(brokerName);
        mq.setQueueId(Integer.parseInt(queueId));

        // 从 Broker 服务器拉取消息
        PullResult pullResult = defaultMQPullConsumer.pull(mq, "*", Long.parseLong(offset), 1);
    }

}

query_msg_by_message_queue_offset

四、消息索引服务

在继续讲解剩下两种查询方式之前,我们必须先介绍以下 Broker 端的消息索引服务

在之前提到过,每当一条消息发送过来之后,其会封装为一个DispatchRequest来下发给各个转发服务,而CommitLogDispatcherBuildIndex构建索引服务便是其中之一:

class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
            DefaultMessageStore.this.indexService.buildIndex(request);
        }
    }

}

四 (1)、索引文件结构

消息的索引信息是存放在磁盘上的,文件以时间戳命名的,默认存放在$HOME/store/index目录下。由下图来看,一个索引文件的结构被分成了三部分:

  • 前 40 个字节存放固定的索引头信息,包含了存放在这个索引文件中的消息的最小/大存储时间、最小/大偏移量等状况
  • 中间一段存储了 500 万个哈希槽位,每个槽内部存储的是索引文件的地址 (索引槽)
  • 最后一段存储了 2000 万个索引内容信息,是实际的索引信息存储的地方。每一个槽位存储了这条消息的键哈希值、存储偏移量、存储时间戳与下一个索引槽地址

indexfile_structure

RocketMQ 在内存中还维护了一个索引文件列表,对于每一个索引文件,前一个文件的最大存储时间是下一个文件的最小存储时间,前一个文件的最大偏移量是下一个文件的最大偏移量。每一个索引文件都索引了在某个时间段内、某个偏移量段内的所有消息,当文件满了,就会用前一个文件的最大偏移量和最大存储时间作为起始值,创建下一个索引文件:

indexfile_list

四 (2)、添加消息

当有新的消息过来后,构建索引服务会取出这条消息的,然后对字符串 “话题#键” 构建索引。构建索引的步骤如下:

  • 找出哈希槽:生成字符串哈希码,取余落到 500W 个槽位之一,并取出其中的值,默认为 0
  • 找出索引槽:IndexHeader维护了indexCount,实际存储的索引槽就是直接依次顺延添加的
  • 存储索引内容:找到索引槽后,放入键哈希值、存储偏移量、存储时间戳与下一个索引槽地址。下一个索引槽地址就是第一步哈希槽中取出的值,0 代表这个槽位是第一次被索引,而不为 0 代表这个槽位之前的索引槽地址。由此,通过索引槽地址可以将相同哈希槽的消息串联起来,像单链表那样。
  • 更新哈希槽:更新原有哈希槽中存储的值

我们以实际例子来说明。假设我们需要依次为键的哈希值为 “{16,29,29,8,16,16}” 这几条消息构建索引,我们在这个地方忽略了索引信息中存储的存储时间和偏移量字段,只是存储键哈希和下一索引槽信息,那么:

  • 放入 16:将 “16|0” 存储在第 1 个索引槽中,并更新哈希槽为 16 的值为 1,即哈希槽为 16 的第一个索引块的地址为 1
  • 放入 29:将 “29|0” 存储在第 2 个索引槽中,并更新哈希槽为 29 的值为 2,即哈希槽为 29 的第一个索引块的地址为 2
  • 放入 29:取出哈希槽为 29 中的值 2,然后将 “29|2” 存储在第 3 个索引槽中,并更新哈希槽为 29 的值为 3,即哈希槽为 29 的第一个索引块的地址为 3。而在找到索引块为 3 的索引信息后,又能取出上一个索引块的地址 2,构成链表为: “[29]->3->2”
  • 放入 8:将 “8|0” 存储在第 4 个索引槽中,并更新哈希槽为 8 的值为 4,即哈希槽为 8 的第一个索引块的地址为 4
  • 放入 16:取出哈希槽为 16 中的值 1,然后将 “16|1” 存储在第 5 个索引槽中,并更新哈希槽为 16 的值为 5。构成链表为: “[16]->5->1”
  • 放入 16:取出哈希槽为 16 中的值 5,然后将 “16|5” 存储在第 6 个索引槽中,并更新哈希槽为 16 的值为 6。构成链表为: “[16]->6->5->1”

整个过程如下图所示:

put_key_example

四 (3)、查询消息

当需要根据来查询消息的时候,其会按照倒序回溯整个索引文件列表,对于每一个在时间上能够匹配用户传入的beginend时间戳参数的索引文件,会一一进行消息查询:

public class IndexService {

    public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
        // 倒序
        for (int i = this.indexFileList.size(); i > 0; i--) {
            // 位于时间段内
            if (f.isTimeMatched(begin, end)) {
                // 消息查询
            }
        }
    }

}

而具体到每一个索引文件,其查询匹配消息的过程如下所示:

  • 确定哈希槽:根据键生成哈希值,定位到哈希槽
  • 定位索引槽:哈希槽中的值存储的就是链表的第一个索引槽地址
  • 遍历索引槽:沿着索引槽地址,依次取出下一个索引槽地址,即沿着链表遍历,直至遇见下一个索引槽地址为非法地址 0 停止
  • 收集偏移量:在遇到匹配的消息之后,会将相应的物理偏移量放到列表中,最后根据物理偏移量,从 CommitLog 文件中取出消息
public class DefaultMessageStore implements MessageStore {

    @Override
    public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {

        for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
            long offset = queryOffsetResult.getPhyOffsets().get(m);
            // 根据偏移量从 CommitLog 文件中取出消息
        }

    }

}

以查询哈希值 16 的消息为例,图示如下:

query_message_hash_16_example

五、唯一键查询消息

五 (1)、构建键

消息的唯一键是在客户端发送消息前构建的:

public class DefaultMQProducerImpl implements MQProducerInner {
    private SendResult sendKernelImpl(final Message msg, /** 其它参数 **/) throws XXXException {
        // ...
        if (!(msg instanceof MessageBatch)) {
            MessageClientIDSetter.setUniqID(msg);
        }
    }
}

创建唯一 ID 的算法:

public class MessageClientIDSetter {

    public static String createUniqID() {
        StringBuilder sb = new StringBuilder(LEN * 2);
        sb.append(FIX_STRING);
        sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
        return sb.toString();
    }

}

唯一键是根据客户端的进程 ID、IP 地址、ClassLoader 哈希码、时间戳、计数器这几个值来生成的一个唯一的键,然后作为这条消息的附属属性发送到 Broker 服务器的:

public class MessageClientIDSetter {

    public static void setUniqID(final Message msg) {
        if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
            msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
        }
    }

}

五 (2)、索引键

当服务器收到客户端发送过来的消息之后,索引服务便会取出客户端生成的uniqKey并为之建立索引,放入到索引文件中:

public class IndexService {

    public void buildIndex(DispatchRequest req) {
        // ...
        if (req.getUniqKey() != null) {
            indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
        }
        // ...
    }

}

五 (3)、使用键查询

客户端在生成消息唯一键的时候,在ByteBuffer的第 11 位到第 14 位放置的是当前的时间当月第一天的时间的毫秒差:

public class MessageClientIDSetter {

    private static byte[] createUniqIDBuffer() {
        long current = System.currentTimeMillis();
        if (current >= nextStartTime) {
            setStartTime(current);
        }

        // 时间差 [当前时间 - 这个月 1 号的时间]
        // putInt 占据的是第 11 位到第 14 位
        buffer.putInt((int) (System.currentTimeMillis() - startTime));
    }

    private synchronized static void setStartTime(long millis) {
        Calendar cal = Calendar.getInstance();
        cal.setTimeInMillis(millis);
        cal.set(Calendar.DAY_OF_MONTH, 1);
        cal.set(Calendar.HOUR_OF_DAY, 0);
        cal.set(Calendar.MINUTE, 0);
        cal.set(Calendar.SECOND, 0);
        cal.set(Calendar.MILLISECOND, 0);
        // 开始时间设置为这个月的 1 号
        startTime = cal.getTimeInMillis();
        // ...
    }

}

我们知道消息索引服务的查询需要用户传入 begin 和 end 这连个时间值,以进行这段时间内的匹配。所以 RocketMQ 为了加速消息的查询,于是在 Admin 端对特定 ID 进行查询的时候,首先取出了这段时间差值,然后与当月时间进行相加得到 begin 时间值:

public class MessageClientIDSetter {

    public static Date getNearlyTimeFromID(String msgID) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        byte[] bytes = UtilAll.string2bytes(msgID);
        buf.put((byte) 0);
        buf.put((byte) 0);
        buf.put((byte) 0);
        buf.put((byte) 0);
        // 取出第 11 位到 14 位
        buf.put(bytes, 10, 4);

        buf.position(0);
        // 得到时间差值
        long spanMS = buf.getLong();

        Calendar cal = Calendar.getInstance();
        long now = cal.getTimeInMillis();
        cal.set(Calendar.DAY_OF_MONTH, 1);
        cal.set(Calendar.HOUR_OF_DAY, 0);
        cal.set(Calendar.MINUTE, 0);
        cal.set(Calendar.SECOND, 0);
        cal.set(Calendar.MILLISECOND, 0);
        long monStartTime = cal.getTimeInMillis();
        if (monStartTime + spanMS >= now) {
            cal.add(Calendar.MONTH, -1);
            monStartTime = cal.getTimeInMillis();
        }
        // 设置为这个月(或者上个月) + 时间差值
        cal.setTimeInMillis(monStartTime + spanMS);
        return cal.getTime();
    }

}

由于发送消息的客户端和查询消息的 Admin 端可能不在一台服务器上,而且从函数的命名getNearlyTimeFromID与上述实现来看,Admin 端的时间戳得到的是一个近似起始值,它尽可能地加速用户的查询。而且太旧的消息(超过一个月的消息)是查询不到的。

begin时间戳确定以后,Admin 便会将其它必要的信息如话题Key等信息封装到QUERY_MESSAGE的包中,然后向 Broker 服务器传递这个请求,来进行消息的查询。Broker 服务器在获取到这个查询消息的请求后,便会根据Key从索引文件中查询符合的消息,最终返回到 Admin 端。

六、键查询消息

六 (1)、构建键

我们提到过,在发送消息的时候,可以填充一个keys的值,这个值将会作为消息的一个属性被发送到 Broker 服务器上:

public class Message implements Serializable {

    public void setKeys(String keys) {
        this.putProperty(MessageConst.PROPERTY_KEYS, keys);
    }

}

六 (2)、索引键

当服务器收到客户端发送过来的消息之后,索引服务便会取出这条消息的keys并将其用空格进行分割,分割后的每一个字符串都会作为一个单独的,创建索引,放入到索引文件中:

public class IndexService {

    public void buildIndex(DispatchRequest req) {
        // ...
        if (keys != null && keys.length() > 0) {
            // 使用空格进行分割
            String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
            for (int i = 0; i < keyset.length; i++) {
                String key = keyset[i];
                if (key.length() > 0) {
                    indexFile = putKey(indexFile, msg, buildKey(topic, key));
                }
            }
        }
    }

}

由此我们也可以得知,keys键的设置通过使用空格分割字符串,一条消息可以指定多个键。

六 (3)、使用键查询

keys键查询的方式也是通过将参数封装为QUERY_MESSAGE请求包中去请求服务器返回相应的信息。由于键本身不能和时间戳相关联,因此begin值设置的是 0,这是和第五节的不同之处:

public class QueryMsgByKeySubCommand implements SubCommand {

    private void queryByKey(final DefaultMQAdminExt admin, final String topic, final String key)
        throws MQClientException, InterruptedException {
        // begin: 0
        // end: Long.MAX_VALUE
        QueryResult queryResult = admin.queryMessage(topic, key, 64, 0, Long.MAX_VALUE);
    }

}

results matching ""

    No results matching ""