1 索引查询方式

当消息存放在CommitLog文件中时,RocketMq支持多种查询方式,主要分为如下几类:

1.1 根据主键keys查询

整个流程主要由QueryMsgByKeySubCommand类开始,一般发送消息时,会指定一个keys,

查询入口为:

QueryResult queryResult = admin.queryMessage(topic, key, 64, 0, Long.MAX_VALUE);

1.2 根据偏移量id查询

整个流程主要由QueryMsgByIdSubCommand类开始,当消息发送成功后,一般会返回相关信息给生产者,SendResultjie结果中包含了可以查询已经发送成功的消息信息。 此处输入图片的描述 用户可以使用 queryMsgById 命令查询这条消息的详细信息:

MQAdminStartup.main(new String[] {
        "queryMsgById",
        "-n",
        "localhost:9876",
        "-i",
        "0A6C73D900002A9F0000000000004010"
    });

1.3 根据消费队列和队列偏移量查询

主要由QueryMsgByOffsetSubCommand类处理,消息发送成功之后的SendResult中还包含了消息队列的其它信息,如消息队列 ID、消息队列偏移量等信息:

SendResult [sendStatus=SEND_OK,
            msgId=0A6C73D93EC518B4AAC20CC4ACD90000,
            offsetMsgId=0A6C73D900002A9F000000000000484E,
            messageQueue=MessageQueue [topic=TopicTest,
                                       brokerName=zk-pc,
                                       queueId=3],
            queueOffset=24]

根据这些信息,使用queryMsgByOffset命令也可以查询到这条消息的详细信息:

MQAdminStartup.main(new String[] {
        "queryMsgByOffset",
        "-n",
        "localhost:9876",
        "-t",
        "TopicTest",
        "-b",
        "zk-pc",
        "-i",
        "3",
        "-o",
        "24"
    });

1.4 根据唯一键查询

主要由QueryMsgByUniqueKeySubCommand类实现,消息在发送成功之后,其返回的SendResult类中包含了这条消息的唯一 ID: 此处输入图片的描述 用户可以使用 queryMsgByUniqueKey 命令查询这条消息的详细信息:

MQAdminStartup.main(new String[] {
        "queryMsgByUniqueKey",
        "-n",
        "localhost:9876",
        "-i",
        "0A6C73D939B318B4AAC20CBA5D920000",
        "-t",
        "TopicTest"
    });

上面不同方式查询消息的过程,需要使用到索引文件,下面介绍一下索引文件是如何创建的。

2 索引文件

在创建ConsumeQueue时,同时也会创建索引文件,下面单独看一下索引文件是如何创建的。

    class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            // 建立 索引信息 到 IndexFile
            if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
                DefaultMessageStore.this.indexService.buildIndex(request);
            }
        }
    }

2.1 索引文件结构

消息的索引信息是存放在磁盘上的,文件以时间戳命名的,默认存放在 $HOME/store/index 目录下。由下图来看,一个索引文件的结构被分成了三部分:

  • 前 40 个字节存放固定的索引头信息,包含了存放在这个索引文件中的消息的最小/大存储时间、最小/大偏移量、哈希槽使用个数和索引槽使用个数等状况
  • 中间一段存储了 500 万个哈希槽位,每个槽内部存储的是索引文件的地址 (索引槽)
  • 最后一段存储了 2000万个索引内容信息,是实际的索引信息存储的地方。每一个槽位存储了这条消息的键哈希值、存储偏移量、存储时间戳与下一个索引槽地址 此处输入图片的描述 上面就是索引文件的结构,RocketMQ使用一个索引文件列表来管理所有创建的索引文件,每次将上一个indexFile的最后更新时间和最后偏移量作为下一个创建的indexFile文件的开始时间和开始偏移量。这样对于每一个索引文件,前一个文件的最大存储时间是下一个文件的最小存储时间,前一个文件的最大偏移量是下一个文件的最大偏移量。每一个索引文件都索引了在某个时间段内、某个偏移量段内的所有消息,当文件满了,就会用前一个文件的最大偏移量和最大存储时间作为起始值,创建下一个索引文件: 此处输入图片的描述 下面看一下索引文件的创建过程。

    2.2 索引文件创建

    索引文件的创建主要由buildIndex完成,下面看一下该流程:

    public void buildIndex(DispatchRequest req) {
         //获取一个索引文件
         IndexFile indexFile = retryGetAndCreateIndexFile();
         if (indexFile != null) {
             long endPhyOffset = indexFile.getEndPhyOffset();
             DispatchRequest msg = req;
             String topic = msg.getTopic();
             String keys = msg.getKeys();
             if (msg.getCommitLogOffset() < endPhyOffset) {
                 //如果已经建立所以的偏移量大于当前请求要创建的偏移量,直接返回,说明当前请求的索引已经创建过
                 return;
             }
    
             final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
             switch (tranType) {
                 case MessageSysFlag.TRANSACTION_NOT_TYPE:
                 case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                 case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                     break;
                 case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                     return;
             }
             //通过唯一键建立索引,用于后续查询
             if (req.getUniqKey() != null) {
                 indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
                 if (indexFile == null) {
                     log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                     return;
                 }
             }
             //通过主键keys创建索引,用于后续查询
             if (keys != null && keys.length() > 0) {
                 String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
                 for (int i = 0; i < keyset.length; i++) {
                     String key = keyset[i];
                     if (key.length() > 0) {
                         indexFile = putKey(indexFile, msg, buildKey(topic, key));
                         if (indexFile == null) {
                             log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                             return;
                         }
                     }
                 }
             }
         } else {
             log.error("build index error, stop building index");
         }
     }
    

    流程比较简单,首先会获取一个indexFile文件,然后判断当前请求是否需要创建索引,如果需要创建索引,则又分为两种形式的所以,一种是通过请求设置的唯一键创建索引;另一种则是通过主键keys创建索引。后续如果使用唯一键或者主键keys来查询对应的消息,则可以通过创建的索引文件查询。 其实,对于使用唯一键或者主键keys来创建索引并没有区别,都是调用putKey来创建索引。 下面先通过一个例子看一下putKey的整体流程,后续再详细分析代码:

当有新的消息过来后,构建索引服务会取出这条消息的键,然后对字符串“话题#键” 构建索引,其中的键可以是唯一键或者主键keys。构建索引的步骤如下:

  • 找出哈希槽:生成字符串哈希码,取余落到500W个槽位之一,并取出其中的值,默认为 0
  • 找出索引槽:IndexHeader维护了indexCount,每次让入索引槽都是顺序添加的,indexCount就是下一次需要放入到索引槽中数据的位置,通过indexCount,就可以获取这次应该将数据放在索引槽的位置
  • 存储索引内容:找到索引槽后,放入键哈希值、存储偏移量、存储时间戳与下一个索引槽地址。下一个索引槽地址就是第一步哈希槽中取出的值,0 代表这个槽位是第一次被索引,而不为0代表这个槽位之前的索引槽地址。由此,通过索引槽地址可以将相同哈希槽的消息串联起来,像单链表那样。
  • 更新哈希槽:更新原有哈希槽中存储的值,就是当前索引槽的位置信息
  • 更新IndexHeader头信息:增加哈希槽和索引槽数量,同时更新最后更新时间和偏移量,

我们以实际例子来说明。假设我们需要依次为键的哈希值为 “{16,29,29,8,16,16}” 这几条消息构建索引,我们在这个地方忽略了索引信息中存储的存储时间和偏移量字段,只是存储键哈希和下一索引槽信息,那么:

  • 放入 16:将 “16|0” 存储在第 1 个索引槽中,并更新哈希槽为 16 的值为 1,即哈希槽为 16 的第一个索引块的地址为 1
  • 放入 29:将 “29|0” 存储在第 2 个索引槽中,并更新哈希槽为 29 的值为 2,即哈希槽为 29 的第一个索引块的地址为 2
  • 放入 29:取出哈希槽为 29 中的值 2,然后将 “29|2” 存储在第 3 个索引槽中,并更新哈希槽为 29 的值为 3,即哈希槽为 29 的第一个索引块的地址为 3。而在找到索引块为 3 的索引信息后,又能取出上一个索引块的地址 2,构成链表为: “[29]->3->2”
  • 放入 8:将 “8|0” 存储在第 4 个索引槽中,并更新哈希槽为 8 的值为 4,即哈希槽为 8 的第一个索引块的地址为 4
  • 放入 16:取出哈希槽为 16 中的值 1,然后将 “16|1” 存储在第 5 个索引槽中,并更新哈希槽为 16 的值为 5。构成链表为: “[16]->5->1”
  • 放入 16:取出哈希槽为 16 中的值 5,然后将 “16|5” 存储在第 6 个索引槽中,并更新哈希槽为 16 的值为 6。构成链表为: “[16]->6->5->1” 整个过程如下图所示: 此处输入图片的描述 核心代码流程如下:

    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
         if (this.indexHeader.getIndexCount() < this.indexNum) {
             int keyHash = indexKeyHashMethod(key);
             int slotPos = keyHash % this.hashSlotNum;
             int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    
             FileLock fileLock = null;
    
             try {
    
                 // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                 // false);
                 int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                 if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                     slotValue = invalidIndex;
                 }
    
                 long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
    
                 timeDiff = timeDiff / 1000;
    
                 if (this.indexHeader.getBeginTimestamp() <= 0) {
                     timeDiff = 0;
                 } else if (timeDiff > Integer.MAX_VALUE) {
                     timeDiff = Integer.MAX_VALUE;
                 } else if (timeDiff < 0) {
                     timeDiff = 0;
                 }
    
                 int absIndexPos =
                     IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                         + this.indexHeader.getIndexCount() * indexSize;
                 //存放键哈希值
                 this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                 //存放物理偏移量
                 this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                 //存放时间戳
                 this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                 //存放下一个索引槽地址,同一hash值的key就是通过该索引槽找到的
                 this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
                 //将索引槽的index存放在哈希槽中,通过哈希值对应的哈希槽就可以获取索引槽,从索引槽就可以找到一系列哈希值相等的key
                 this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
    
                 if (this.indexHeader.getIndexCount() <= 1) {
                     this.indexHeader.setBeginPhyOffset(phyOffset);
                     this.indexHeader.setBeginTimestamp(storeTimestamp);
                 }
                 //增加已使用的哈希槽的数量
                 this.indexHeader.incHashSlotCount();
                 //增加已使用的索引槽的数量
                 this.indexHeader.incIndexCount();
                 //设置最后commitlog文件的物理偏移量
                 this.indexHeader.setEndPhyOffset(phyOffset);
                 //设置最后存储时间
                 this.indexHeader.setEndTimestamp(storeTimestamp);
    
                 return true;
             } catch (Exception e) {
                 log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
             } finally {
                 if (fileLock != null) {
                     try {
                         fileLock.release();
                     } catch (IOException e) {
                         log.error("Failed to release the lock", e);
                     }
                 }
             }
         } else {
             log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                 + "; index max num = " + this.indexNum);
         }
    
         return false;
     }
    

    2.3 索引文件查询

    当需要根据键来查询消息的时候,其会按照倒序回溯整个索引文件列表,对于每一个在时间上能够匹配用户传入的begin和end时间戳参数的索引文件,会一一进行消息查询: ```java public class IndexService {

    public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {

     // 倒序
     for (int i = this.indexFileList.size(); i > 0; i--) {
         // 位于时间段内
         if (f.isTimeMatched(begin, end)) {
             // 消息查询
         }
     }
    

    }

}

而具体到每一个索引文件,其查询匹配消息的过程如下所示:

 - 确定哈希槽:根据键生成哈希值,定位到哈希槽
 - 定位索引槽:哈希槽中的值存储的就是链表的第一个索引槽地址
 - 遍历索引槽:沿着索引槽地址,依次取出下一个索引槽地址,即沿着链表遍历,直至遇见下一个索引槽地址为非法地址 0 停止
 - 收集偏移量:在遇到匹配的消息之后,会将相应的物理偏移量放到列表中,最后根据物理偏移量,从 CommitLog 文件中取出消息

```java
    public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
        final long begin, final long end, boolean lock) {
        if (this.mappedFile.hold()) {
            //计算哈希值
            int keyHash = indexKeyHashMethod(key);
            //计算哈希槽
            int slotPos = keyHash % this.hashSlotNum;
            //绝对哈希槽
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;
            try {
                //获取索引槽
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);

                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                    || this.indexHeader.getIndexCount() <= 1) {
                } else {
                    //获取哈希值相同的所有的链表值
                    for (int nextIndexToRead = slotValue; ; ) {
                        if (phyOffsets.size() >= maxNum) {
                            break;
                        }
                        //获取索引槽的绝对偏移量
                        int absIndexPos =
                            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                                + nextIndexToRead * indexSize;

                        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

                        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

                        if (timeDiff < 0) {
                            break;
                        }

                        timeDiff *= 1000L;

                        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);

                        if (keyHash == keyHashRead && timeMatched) {
                            phyOffsets.add(phyOffsetRead);
                        }

                        if (prevIndexRead <= invalidIndex
                            || prevIndexRead > this.indexHeader.getIndexCount()
                            || prevIndexRead == nextIndexToRead || timeRead < begin) {
                            break;
                        }

                        nextIndexToRead = prevIndexRead;
                    }
                }
            }
        }
    }

以查询哈希值 16 的消息为例,图示如下: 此处输入图片的描述 先通过哈希值计算出对应的slot,由于key的hash值不同但模数相同,所以在查询时会比较一次key的hash值,然后加入返回列表,每次最多返回32条索引信息。这里需要注意,由于hash值相同但key不等下产生的相同slot,也会被返回给客户端,所以在客户端又进行了一次处理。

3 消息查询

对于消息查询,有4种方式,其中,用偏移量id和消费队列及偏移量查询,并不需要使用到索引文件,下面分别介绍这四种方式的消息查询流程。

3.1 通过偏移量id查询

偏移量id其实就是这消息发送成功后,broker返回给消息发送者的。ID 是在消息发送到 Broker 服务器存储的时候生成的,其包含如下几个字段:

  • Broker 服务器 IP 地址
  • Broker 服务器端口号
  • 消息文件 CommitLog 写偏移量 此处输入图片的描述 下面看一下broker是如何创建,然后返回给消息生产者的:

    public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    
     //消息id的产生
     String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
    }
    

    其实msgId就是由broker的地址和端口以及写入commitLog文件的偏移量组成。然后,通过response返回给客户端,在客户端进行如下处理,然后返回给消息生产者:

    private SendResult processSendResponse(
         final String brokerName,
         final Message msg,
         final RemotingCommand response
     ) throws MQBrokerException, RemotingCommandException {
         //客户端自己产生,唯一键id
         String uniqMsgId = MessageClientIDSetter.getUniqID(msg);
         //组装返回结果
         SendResult sendResult = new SendResult(sendStatus,
                     uniqMsgId,
                     responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
     }
    

    其中,uniqMsgId赋值给msgId,而responseHeader.getMsgId()则赋值给offsetMsgId。 所以如果我们可以通过offsetMsgId来直接查询存储在commitLog文件中的消息。 下面看一下通过查询流程:

     public MessageExt viewMessage(
         String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    
         MessageId messageId = null;
         try {
             //通过offsetMsgId解析出broker的信息和对应的commitLog文件中消息的偏移量
             messageId = MessageDecoder.decodeMessageId(msgId);
         } catch (Exception e) {
             throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message.");
         }
         //发送请求查询消息
         return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
             messageId.getOffset(), timeoutMillis);
     }
    

    获取到偏移量之后,Admin 会对 Broker 服务器发送一个VIEW_MESSAGE_BY_ID 的请求命令,Broker 服务器在收到请求后,会依据偏移量定位到 CommitLog 文件中的相应位置,然后取出消息,返回给 Admin 端:

    public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
         throws RemotingCommandException {
         ...
         //通过磁盘觉得偏移量,查询磁盘中消息
         final SelectMappedBufferResult selectMappedBufferResult =
             this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
         ...
         return response;
     }
    

    此处输入图片的描述

    3.2 通过消息队列和消息队列偏移量查询

    根据队列偏移量查询是最简单的一种查询方式,Admin会启动一个PullConsumer ,然后利用用户传递给Admin的队列ID、队列偏移量等信息,从服务器拉取一条消息过来:

    @Override
    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
     MessageQueue mq = new MessageQueue();
     mq.setTopic(topic);
     mq.setBrokerName(brokerName);
     mq.setQueueId(Integer.parseInt(queueId));
    
     defaultMQPullConsumer.start();
     defaultMQAdminExt.start();
    
     PullResult pullResult = defaultMQPullConsumer.pull(mq, "*", Long.parseLong(offset), 1);
    }
    

    此处输入图片的描述 和消费者拉取消息消费一样,通过给定参数找到对应的ConsumeQueue队列,然后通过偏移量,获取到队列的一条记录,从而获取到commitLog的偏移量,拉取消息。 上面介绍的两种方式都不需要使用到indexFile文件,下面说明的查询方式则需要借助前面创建的indexFile文件进行查询。

    3.3 通过唯一键查询

    唯一键是根据客户端的进程ID、IP地址、ClassLoader哈希码、时间戳、计数器这几个值来生成的一个唯一的键,然后作为这条消息的附属属性发送到 Broker 服务器的:

     public static String createUniqID() {
         StringBuilder sb = new StringBuilder(LEN * 2);
         sb.append(FIX_STRING);
         sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
         return sb.toString();
     }
    

    下面看一下通过唯一键查询的流程,首先会将msgId作为偏移量id来查询,如果查询没有结果,则再调用queryMessageByUniqKey,使用唯一键id查询:

     @Override
     public MessageExt viewMessage(String topic,
         String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         try {
             MessageDecoder.decodeMessageId(msgId);
             return this.viewMessage(msgId);
         } catch (Exception e) {
             log.warn("the msgId maybe created by new client. msgId={}", msgId, e);
         }
         //通过唯一键查询
         return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, msgId);
     }
    

    客户端在生成消息唯一键的时候,在 ByteBuffer 的第 11 位到第 14 位放置的是当前的时间与当月第一天的时间的毫秒差:

    public class MessageClientIDSetter {
    
     private static byte[] createUniqIDBuffer() {
         long current = System.currentTimeMillis();
         if (current >= nextStartTime) {
             setStartTime(current);
         }
    
         // 时间差 [当前时间 - 这个月 1 号的时间]
         // putInt 占据的是第 11 位到第 14 位
         buffer.putInt((int) (System.currentTimeMillis() - startTime));
     }
    
     private synchronized static void setStartTime(long millis) {
         Calendar cal = Calendar.getInstance();
         cal.setTimeInMillis(millis);
         cal.set(Calendar.DAY_OF_MONTH, 1);
         cal.set(Calendar.HOUR_OF_DAY, 0);
         cal.set(Calendar.MINUTE, 0);
         cal.set(Calendar.SECOND, 0);
         cal.set(Calendar.MILLISECOND, 0);
         // 开始时间设置为这个月的 1 号
         startTime = cal.getTimeInMillis();
         // ...
     }
    }
    

    我们知道消息索引服务的查询需要用户传入begin和end这两个时间值,以进行这段时间内的匹配。所以 RocketMQ 为了加速消息的查询,于是在 Admin 端对特定 ID 进行查询的时候,首先取出了这段时间差值,然后与当月时间进行相加得到 begin 时间值。当begin时间戳确定以后,客户端便会将其它必要的信息如话题、Key等信息封装到 QUERY_MESSAGE 的包中,然后向 Broker 服务器传递这个请求,来进行消息的查询。Broker服务器在获取到这个查询消息的请求后,便会根据Key从索引文件中查询符合的消息,最终返回到客户端。

QueryMessageRequestHeader requestHeader = new QueryMessageRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setKey(key);
requestHeader.setMaxNum(maxNum);
requestHeader.setBeginTimestamp(begin);
requestHeader.setEndTimestamp(end);

this.mQClientFactory.getMQClientAPIImpl().queryMessage(addr, requestHeader, timeoutMillis * 3,new InvokeCallback() {
    @Override
    public void operationComplete(ResponseFuture responseFuture) {
        try {
            RemotingCommand response = responseFuture.getResponseCommand();
            if (response != null) {
                switch (response.getCode()) {
                    case ResponseCode.SUCCESS: {
                        QueryMessageResponseHeader responseHeader = null;
                        try {
                            responseHeader =(QueryMessageResponseHeader) response.decodeCommandCustomHeader(QueryMessageResponseHeader.class);
                        } catch (RemotingCommandException e) {
                            log.error("decodeCommandCustomHeader exception", e);
                            return;
                        }


                    }

                }
            }
        }
    }
}

3.4 通过关键字key查询

我们提到过,在发送消息的时候,可以填充一个keys的值,这个值将会作为消息的一个属性被发送到 Broker 服务器上:

public class Message implements Serializable {

    public void setKeys(String keys) {
        this.putProperty(MessageConst.PROPERTY_KEYS, keys);
    }

}

在定时构建索引文件时,会将对应的keys取出来,分隔为一个个key,分别创建索引文件中的记录。

public class IndexService {

    public void buildIndex(DispatchRequest req) {
        // ...
        if (keys != null && keys.length() > 0) {
            // 使用空格进行分割
            String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
            for (int i = 0; i < keyset.length; i++) {
                String key = keyset[i];
                if (key.length() > 0) {
                    indexFile = putKey(indexFile, msg, buildKey(topic, key));
                }
            }
        }
    }

}

由此我们也可以得知,keys键的设置通过使用空格分割字符串,一条消息可以指定多个键。 keys 键查询的方式也是通过将参数封装为 QUERY_MESSAGE 请求包中去请求服务器返回相应的信息。由于键本身不能和时间戳相关联,因此 begin 值设置的是 0,这是和使用唯一键查询不同之处:

public class QueryMsgByKeySubCommand implements SubCommand {

    private void queryByKey(final DefaultMQAdminExt admin, final String topic, final String key)
        throws MQClientException, InterruptedException {
        // begin: 0
        // end: Long.MAX_VALUE
        QueryResult queryResult = admin.queryMessage(topic, key, 64, 0, Long.MAX_VALUE);
    }

}

results matching ""

    No results matching ""