Broker处理请求
标签(空格分隔): 消息中间件
1 接收请求
对于发送消息,处理类为SendMessageProcessor。
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request);
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return null;
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response;
if (requestHeader.isBatch()) {
response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
}
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}
}
主要处理两种消息:
- CONSUMER_SEND_MSG_BACK:consumer消费失败的消息发回broker,重试消息。
default:producer发消息到broker消息。 下面先讲解default消息。
2 处理请求
处理流程:
private RemotingCommand sendMessage(final ChannelHandlerContext ctx, final RemotingCommand request, final SendMessageContext sendMessageContext, final SendMessageRequestHeader requestHeader) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); //透传回response,consumer和prodcuer实际是根据opaque进行请求的应答匹配 response.setOpaque(request.getOpaque()); response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); log.debug("receive SendMessage request command, {}", request); //进行了broker存活检查,即this.brokerController.getMessageStore().now()小于getStartAcceptSendRequestTimeStamp() ,表明broker异常。 final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); if (this.brokerController.getMessageStore().now() < startTimstamp) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); return response; } response.setCode(-1); //校验topic super.msgCheck(ctx, requestHeader, response); if (response.getCode() != -1) { return response; } final byte[] body = request.getBody(); int queueIdInt = requestHeader.getQueueId(); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (queueIdInt < 0) { queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); } MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(requestHeader.getTopic()); msgInner.setQueueId(queueIdInt); if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) { return response; } msgInner.setBody(body); msgInner.setFlag(requestHeader.getFlag()); MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); msgInner.setPropertiesString(requestHeader.getProperties()); msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); msgInner.setBornHost(ctx.channel().remoteAddress()); msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); PutMessageResult putMessageResult = null; Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (traFlag != null && Boolean.parseBoolean(traFlag)) { if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return response; } putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); } else { putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); } return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt); }
2.1 校验topic存在性
第一次发送请求时,消息生产者设置的topic在broker中不一定存在,所以对请求的topic的合理性进行校验。
/** * msgCheck方法对发送的消息进行了相关检查:topic是否有写权限,topic是否存在,请求的queueId是否存在。 * 如果topic不存在的话,broker端配置autoCreateTopicEnable=true的话,将创建队列数为4的topic。 * @param ctx * @param requestHeader * @param response * @return */ protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, final SendMessageRequestHeader requestHeader, final RemotingCommand response) { if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission()) && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden"); return response; } // 检查topic是否可以被发送。目前是{@link MixAll.DEFAULT_TOPIC}不被允许发送 if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) { String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; log.warn(errorMsg); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(errorMsg); return response; } TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (null == topicConfig) { int topicSysFlag = 0; if (requestHeader.isUnitMode()) { if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { topicSysFlag = TopicSysFlag.buildSysFlag(false, true); } else { topicSysFlag = TopicSysFlag.buildSysFlag(true, false); } } log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress()); //如果if (null == topicConfig) ,就会进行topic的创建,真正的创建逻辑在TopicConfigManager#createTopicInSendMessageMethod //如果话题不存在的话,那么便会创建一个话题信息存储到本地,并将所有话题再进行一次同步给所有的 Name 服务器 topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod( requestHeader.getTopic(), requestHeader.getDefaultTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getDefaultTopicQueueNums(), topicSysFlag); if (null == topicConfig) { // 再次创建 if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); } } if (null == topicConfig) { //创建失败 response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; } } int queueIdInt = requestHeader.getQueueId(); int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums()); if (queueIdInt >= idValid) { String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s", queueIdInt, topicConfig.toString(), RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(errorInfo); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(errorInfo); return response; } return response; }
如果话题不存在的话,那么便会创建一个话题信息存储到本地,并将所有话题再进行一次同步给所有的 Name 服务器:
public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic, ... topicConfig = this.topicConfigTable.get(topic); if (topicConfig != null) return topicConfig; ... topicConfig.setReadQueueNums(queueNums); topicConfig.setWriteQueueNums(queueNums); int perm = defaultTopicConfig.getPerm(); perm &= ~PermName.PERM_INHERIT; topicConfig.setPerm(perm); topicConfig.setTopicSysFlag(topicSysFlag); topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType()); ... if (topicConfig != null) { ... this.persist(); } ... if (createNew) { this.brokerController.registerBrokerAll(false, true,true); } return topicConfig; }
持久化到磁盘的topics.json文件中
public synchronized void persist() { String jsonString = this.encode(true); if (jsonString != null) { String fileName = this.configFilePath(); try { MixAll.string2File(jsonString, fileName); } catch (IOException e) { log.error("persist file " + fileName + " exception", e); } } }
完成的流程如下:
- 首先判断当前topic是否已经存在,topic从TopicConfigManager.topicConfigTable中获取该topic的配置信息对象TopicConfig;若获取到了则直接返回该对象;否则创建一个新的配置对象并存入topicConfigTable变量中;
- 如果topic不存在,则以参数defaultTopic从TopicConfigManager.topicConfigTable中获取该topic的配置信息对象TopicConfig,该参数在Producer发送信息时默认为"TBW102",而该topic在Broker启动时会自动创建;
- 用topic来初始化新的TopicConfig对象,并用以defaultTopic获取到的TopicConfig对象的其他参数来初始化新创建的TopicConfig对象;
- 将新创建的TopicConfig对象以topic为key值存入TopicConfigManager.topicConfigTable变量中,并且更新dataVersion值;
- 然后将topicConfigTable变量值持久化到topics.json物理文件中,this.persist();。
- 同时,调用brokerController.registerBrokerAll,将创建的对应的topic的信息同步到Namesrv服务器。
流程图如下:
2.2 消息存储
在校验to pic、队列长度进行校验后,组装MessageExtBrokerInner对象,然后执行消息存储:
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
当 Broker 对消息的一些字段做过一番必要的检查之后,便会存储到磁盘中去:
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
...
long beginTime = this.getSystemClock().now();
//将消息存入磁盘
PutMessageResult result = this.commitLog.putMessage(msg);
long eclipseTime = this.getSystemClock().now() - beginTime;
if (eclipseTime > 500) {
log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
return result;
}
在具体介绍消息的存储流程前,先介绍与消息存储相关的数据结构,下一章具体分析。