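How the Broker Handles Requests

Tags (space-separated): message middleware

1 Receiving the request

Send-message requests are handled by the SendMessageProcessor class.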

public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                          RemotingCommand request) throws RemotingCommandException {
        SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.consumerSendMsgBack(ctx, request);
            default:
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return null;
                }

                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

                RemotingCommand response;
                if (requestHeader.isBatch()) {
                    response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
                }

                this.executeSendMessageHookAfter(response, mqtraceContext);
                return response;
        }
    }

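Two kinds of requests are handled:

  • CONSUMER_SEND_MSG_BACK: a message that a consumer failed to consume is sent back to the broker so it can be retried.
  • default: an ordinary message sent from a producer to the broker.

The default case is covered first.

2 Processing the request

The flow is implemented in sendMessage: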

    private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                         final RemotingCommand request,
                                         final SendMessageContext sendMessageContext,
                                         final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
    
         final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
         final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
         // Echo the opaque back in the response; consumers and producers match each response to its request by opaque
         response.setOpaque(request.getOpaque());
    
         response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
         response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
    
         log.debug("receive SendMessage request command, {}", request);
         // Readiness check: if getMessageStore().now() is earlier than getStartAcceptSendRequestTimeStamp(), the broker cannot serve send requests yet.
         final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
         if (this.brokerController.getMessageStore().now() < startTimstamp) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
             return response;
         }
    
         response.setCode(-1);
         // Validate the topic
         super.msgCheck(ctx, requestHeader, response);
         if (response.getCode() != -1) {
             return response;
         }
    
         final byte[] body = request.getBody();
    
         int queueIdInt = requestHeader.getQueueId();
         TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    
         if (queueIdInt < 0) {
             queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
         }
    
         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
         msgInner.setTopic(requestHeader.getTopic());
         msgInner.setQueueId(queueIdInt);
    
         if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
             return response;
         }
    
         msgInner.setBody(body);
         msgInner.setFlag(requestHeader.getFlag());
         MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
         msgInner.setPropertiesString(requestHeader.getProperties());
         msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
         msgInner.setBornHost(ctx.channel().remoteAddress());
         msgInner.setStoreHost(this.getStoreHost());
         msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
         PutMessageResult putMessageResult = null;
         Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
         String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
         if (traFlag != null && Boolean.parseBoolean(traFlag)) {
             if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                 response.setCode(ResponseCode.NO_PERMISSION);
                 response.setRemark(
                     "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                         + "] sending transaction message is forbidden");
                 return response;
             }
             putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
         } else {
             putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
         }
    
         return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
    
     }
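
The fallback that sendMessage applies when the producer passes a negative queueId can be isolated into a minimal sketch. The class name QueueIdFallback and the method resolve are hypothetical and exist only for illustration; the arithmetic is the one shown in the code above.

```java
import java.util.Random;

// Hypothetical helper that isolates the queue-id fallback from sendMessage.
final class QueueIdFallback {
    private static final Random RANDOM = new Random();

    /**
     * Returns the requested id when it is non-negative; otherwise picks a
     * random id in [0, writeQueueNums), using the same expression as the broker.
     */
    static int resolve(int requestedQueueId, int writeQueueNums) {
        if (requestedQueueId >= 0) {
            return requestedQueueId;
        }
        // Taking the remainder first keeps the value strictly inside
        // (-99999999, 99999999), so Math.abs can never see Integer.MIN_VALUE.
        return Math.abs(RANDOM.nextInt() % 99999999) % writeQueueNums;
    }

    public static void main(String[] args) {
        System.out.println("explicit id kept: " + resolve(2, 4));
        System.out.println("random id chosen: " + resolve(-1, 4));
    }
}
```

Applying `% 99999999` before `Math.abs` matters because `Math.abs(Integer.MIN_VALUE)` is still negative; bounding the magnitude first guarantees a non-negative result before the final modulo.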
    

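2.1 Checking that the topic exists

The first time a producer sends to a topic, that topic may not yet exist on the broker, so the broker validates the topic named in the request.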

    /**
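      * msgCheck validates the incoming message: whether writing is permitted, whether the topic exists, and whether the requested queueId is valid.
      * If the topic does not exist and the broker is configured with autoCreateTopicEnable=true, the topic is created, with 4 queues by default.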
      * @param ctx
      * @param requestHeader
      * @param response
      * @return
      */
     protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
         final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
         if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
             && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
             response.setCode(ResponseCode.NO_PERMISSION);
             response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                 + "] sending message is forbidden");
             return response;
         }
         // Check whether messages may be sent to this topic. Currently only {@link MixAll.DEFAULT_TOPIC} is rejected
         if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
             String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
             log.warn(errorMsg);
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark(errorMsg);
             return response;
         }
    
         TopicConfig topicConfig =
             this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
         if (null == topicConfig) {
             int topicSysFlag = 0;
             if (requestHeader.isUnitMode()) {
                 if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                     topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
                 } else {
                     topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
                 }
             }
    
             log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
             // When topicConfig is null the topic is created; the actual logic lives in TopicConfigManager#createTopicInSendMessageMethod
             // The new topic config is stored locally, and all topic configs are then synced to every Name server again
             topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
                 requestHeader.getTopic(),
                 requestHeader.getDefaultTopic(),
                 RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                 requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
    
             if (null == topicConfig) {
                 // Second attempt: retry topics are created through the send-back path
                 if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                     topicConfig =
                         this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                             requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
                             topicSysFlag);
                 }
             }
    
             if (null == topicConfig) {
                 // Creation failed
                 response.setCode(ResponseCode.TOPIC_NOT_EXIST);
                 response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
                     + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
                 return response;
             }
         }
    
         int queueIdInt = requestHeader.getQueueId();
         int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
         if (queueIdInt >= idValid) {
             String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
                 queueIdInt,
                 topicConfig.toString(),
                 RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
    
             log.warn(errorInfo);
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark(errorInfo);
    
             return response;
         }
         return response;
     }
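
The queue-id bound at the end of msgCheck is easy to misread, so here is a minimal sketch of just that check. QueueIdCheck and isLegal are hypothetical names; the comparison is the one used above.

```java
// Hypothetical helper that mirrors the final queue-id check in msgCheck.
final class QueueIdCheck {
    /**
     * A queue id is rejected only when it is >= max(writeQueueNums, readQueueNums).
     * Negative ids pass this check; sendMessage later replaces them with a
     * randomly chosen write queue.
     */
    static boolean isLegal(int queueId, int writeQueueNums, int readQueueNums) {
        int idValid = Math.max(writeQueueNums, readQueueNums);
        return queueId < idValid;
    }

    public static void main(String[] args) {
        System.out.println(isLegal(3, 4, 4));  // inside the range
        System.out.println(isLegal(4, 4, 4));  // one past the last queue
        System.out.println(isLegal(-1, 4, 4)); // "let the broker choose"
    }
}
```

Note that the bound uses the larger of the read and write queue counts, so an id that is valid for reading but beyond the write queue count is not rejected at this point.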
    

If the topic does not exist, the broker creates its config, stores it locally, and then syncs all topic configs to every Name server once more:

    public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
         ...
                     topicConfig = this.topicConfigTable.get(topic);
                     if (topicConfig != null)
                         return topicConfig;
    
                         ...
    
                             topicConfig.setReadQueueNums(queueNums);
                             topicConfig.setWriteQueueNums(queueNums);
                             int perm = defaultTopicConfig.getPerm();
                             perm &= ~PermName.PERM_INHERIT;
                             topicConfig.setPerm(perm);
                             topicConfig.setTopicSysFlag(topicSysFlag);
                             topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
                         ...
    
                     if (topicConfig != null) {
                         ...
    
                         this.persist();
                     }
         ...
    
         if (createNew) {
             this.brokerController.registerBrokerAll(false, true,true);
         }
    
         return topicConfig;
    }
    

The configs are persisted to the topics.json file on disk:

    public synchronized void persist() {
         String jsonString = this.encode(true);
         if (jsonString != null) {
             String fileName = this.configFilePath();
             try {
                 MixAll.string2File(jsonString, fileName);
             } catch (IOException e) {
                 log.error("persist file " + fileName + " exception", e);
             }
         }
     }
    

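The complete flow is as follows:

  • First check whether the topic already exists by looking up its TopicConfig in TopicConfigManager.topicConfigTable. If one is found it is returned directly; otherwise a new config object is created and put into topicConfigTable, as the following steps describe.
  • If the topic does not exist, look up the TopicConfig for the defaultTopic parameter in TopicConfigManager.topicConfigTable. Producers set this parameter to "TBW102" by default, and the broker creates that topic automatically at startup (when autoCreateTopicEnable is on).
  • Initialise a new TopicConfig with topic, and fill in its remaining fields from the TopicConfig obtained for defaultTopic.
  • Store the new TopicConfig in TopicConfigManager.topicConfigTable with topic as the key, and update dataVersion.
  • Persist topicConfigTable to the topics.json file by calling this.persist().
  • Finally, call brokerController.registerBrokerAll to sync the newly created topic's information to the Namesrv servers.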
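
The steps above can be sketched as a small in-memory model. This is not the real TopicConfigManager: the classes TopicAutoCreateSketch and Config, the counters, and the rule that caps the queue count at the default topic's write queue count are all assumptions made for illustration (the excerpt elides how queueNums is derived). The permission bit values are assumed to match RocketMQ's PermName constants.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of topic auto-creation on the send path.
final class TopicAutoCreateSketch {
    // Assumed to mirror PermName.PERM_INHERIT / PERM_WRITE / PERM_READ.
    static final int PERM_INHERIT = 0x1;
    static final int PERM_WRITE = 0x1 << 1;
    static final int PERM_READ = 0x1 << 2;

    /** Trimmed-down stand-in for TopicConfig. */
    static final class Config {
        final String topic;
        final int readQueueNums;
        final int writeQueueNums;
        final int perm;

        Config(String topic, int readQueueNums, int writeQueueNums, int perm) {
            this.topic = topic;
            this.readQueueNums = readQueueNums;
            this.writeQueueNums = writeQueueNums;
            this.perm = perm;
        }
    }

    private final Map<String, Config> topicConfigTable = new ConcurrentHashMap<>();
    long dataVersion;   // stand-in for the real DataVersion object
    int persistCount;   // counts calls that would write topics.json
    int registerCount;  // counts calls that would notify the Name servers

    TopicAutoCreateSketch() {
        // The default topic exists from broker startup (queue count is illustrative).
        topicConfigTable.put("TBW102",
            new Config("TBW102", 8, 8, PERM_INHERIT | PERM_READ | PERM_WRITE));
    }

    synchronized Config createTopicInSendMessage(String topic, String defaultTopic, int clientQueueNums) {
        Config existing = topicConfigTable.get(topic);
        if (existing != null) {
            return existing;                                  // step 1: already known
        }
        Config defaults = topicConfigTable.get(defaultTopic); // step 2: template lookup
        if (defaults == null) {
            return null;                                      // nothing to inherit from
        }
        // Assumption: never create more queues than the template allows.
        int queueNums = Math.min(clientQueueNums, defaults.writeQueueNums);
        int perm = defaults.perm & ~PERM_INHERIT;             // step 3: copy, drop inherit bit
        Config created = new Config(topic, queueNums, queueNums, perm);
        topicConfigTable.put(topic, created);                 // step 4: store and bump version
        dataVersion++;
        persistCount++;                                       // step 5: would call persist()
        registerCount++;                                      // step 6: would call registerBrokerAll
        return created;
    }
}
```

Clearing PERM_INHERIT on the copy means the new topic is readable and writable but cannot itself serve as a template for further auto-created topics.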

[Figure: flowchart of the topic-creation steps above]

2.2 Storing the message

After the topic and queue id have been validated, the broker assembles a MessageExtBrokerInner object and then stores the message:

putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

Once the Broker has run the necessary checks on some of the message's fields, it stores the message on disk:

public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        ...
        long beginTime = this.getSystemClock().now();
        // Store the message on disk via the commit log
        PutMessageResult result = this.commitLog.putMessage(msg);

        long eclipseTime = this.getSystemClock().now() - beginTime;
        if (eclipseTime > 500) {
            log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }

        return result;
}
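
The bookkeeping around commitLog.putMessage follows a common pattern: time the call, record the elapsed time, and count failures. A minimal sketch of that pattern follows. TimedPutSketch is a hypothetical class, a Boolean stands in for PutMessageResult, and keeping a running maximum is an assumption based on the name setPutMessageEntireTimeMax.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical wrapper showing the timing and failure-counting pattern.
final class TimedPutSketch {
    final AtomicLong failedTimes = new AtomicLong();
    final AtomicLong entireTimeMax = new AtomicLong();

    /**
     * Runs the store call, records the largest elapsed time seen so far (ms),
     * and counts a null or false result as a failure.
     */
    Boolean timedPut(Supplier<Boolean> storeCall) {
        long beginTime = System.currentTimeMillis();
        Boolean ok = storeCall.get();
        long elapsed = System.currentTimeMillis() - beginTime;

        // Keep the maximum rather than the latest value.
        entireTimeMax.accumulateAndGet(elapsed, Math::max);

        if (ok == null || !ok) {
            failedTimes.incrementAndGet();
        }
        return ok;
    }
}
```

Treating a null result the same as a failed one matches the `null == result || !result.isOk()` check in the broker code, so a store call that returns nothing still shows up in the failure counter.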

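Before walking through the storage flow in detail, the data structures involved in message storage need to be introduced; the next chapter analyses them.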
