RocketMQ Communication Flow

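1 Related Data Structures

1.1 Message Protocol Design

When a Client and a Server exchange a message, both sides need to agree on a protocol for it, so RocketMQ defines its own message protocol. To transmit messages efficiently over the network and to read them on receipt, messages also have to be encoded and decoded. In RocketMQ, the RemotingCommand class wraps all data carried during transmission: it holds all the data structures and also implements the encode and decode operations. For example, the header of one RemotingCommand request looks like this: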

code=103, // 103 is the request code a broker uses to register itself with the NameServer
language=JAVA,
version=137,
opaque=58, // this is the request ID
flag(B)=0,
remark=null,
extFields={
    brokerId=0,
    clusterName=DefaultCluster,
    brokerAddr=ip1: 10911,
    haServerAddr=ip1: 10912,
    brokerName=LAPTOP-SMF2CKDN
},
serializeTypeCurrentRPC=JSON

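1.1.1 Message Encoding

A message must be encoded before it can travel over the network. In the Netty pipeline, the org.apache.rocketmq.remoting.netty.NettyEncoder class performs the encoding. Because the message body is already a byte array, the encoder only needs to call remotingCommand.encodeHeader() to encode the request header: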

public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            ByteBuffer header = remotingCommand.encodeHeader();
            out.writeBytes(header);
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                out.writeBytes(body);
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }

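The header is encoded first and converted into a ByteBuffer: the total data length is written first, then the header length, then the header data. The body is written after that, so the wire layout is Length | Header length | Header data | Body.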
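To make the layout concrete, here is a minimal sketch that builds such a frame with plain java.nio. It is not RocketMQ's encoder: the class and method names are hypothetical, and it assumes the leading length field counts every byte that follows it. Any extra information the real encoder may pack into the header-length field is ignored.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Simplified sketch of the frame layout, not RocketMQ's own encoder.
class FrameEncodeSketch {

    // Builds: Length | Header length | Header data | Body
    static ByteBuffer encode(byte[] headerData, byte[] body) {
        int bodyLength = (body == null) ? 0 : body.length;
        // Assumption: "Length" counts every byte after the length field itself.
        int totalLength = 4 + headerData.length + bodyLength;

        ByteBuffer frame = ByteBuffer.allocate(4 + totalLength);
        frame.putInt(totalLength);        // 1. total length
        frame.putInt(headerData.length);  // 2. header length
        frame.put(headerData);            // 3. header data
        if (body != null) {
            frame.put(body);              // 4. body (already bytes, copied as-is)
        }
        frame.flip();                     // switch the buffer to read mode
        return frame;
    }

    public static void main(String[] args) {
        byte[] header = "{\"code\":103}".getBytes(StandardCharsets.UTF_8);
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer frame = encode(header, body);
        System.out.println("frame size in bytes: " + frame.remaining());
    }
}
```

A length-prefixed layout like this lets the receiver know exactly how many bytes belong to one frame before it starts parsing the header.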

1.1.2 Message Decoding

Incoming requests are decoded by the org.apache.rocketmq.remoting.netty.NettyDecoder class:

public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = null;
        try {
            frame = (ByteBuf) super.decode(ctx, in);
            if (null == frame) {
                return null;
            }

            ByteBuffer byteBuffer = frame.nioBuffer();

            return RemotingCommand.decode(byteBuffer);
        } catch (Exception e) {
            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            RemotingUtil.closeChannel(ctx.channel());
        } finally {
            if (null != frame) {
                frame.release();
            }
        }

        return null;
    }

Decoding first reads the header length, then reads the header data and decodes it into a RemotingCommand, and finally reads the body.
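The decoding order can be sketched as follows. This is an illustration under stated assumptions, not RocketMQ's RemotingCommand.decode: the Decoded holder and the class name are hypothetical, the header is left as raw bytes instead of being deserialized, and the buffer is assumed to start right after the leading total-length field (the frame decoder having already stripped it).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Simplified sketch of the decode order: header length, header data, then body.
class FrameDecodeSketch {

    // Hypothetical holder for the two decoded parts.
    static final class Decoded {
        final byte[] headerData;
        final byte[] body;

        Decoded(byte[] headerData, byte[] body) {
            this.headerData = headerData;
            this.body = body;
        }
    }

    // Assumption: 'frame' is positioned just after the 4-byte total-length field.
    static Decoded decode(ByteBuffer frame) {
        int length = frame.remaining();
        int headerLength = frame.getInt();          // 1. header length

        byte[] headerData = new byte[headerLength];
        frame.get(headerData);                      // 2. header data

        int bodyLength = length - 4 - headerLength; // whatever is left is the body
        byte[] body = null;
        if (bodyLength > 0) {
            body = new byte[bodyLength];
            frame.get(body);                        // 3. body
        }
        return new Decoded(headerData, body);
    }

    public static void main(String[] args) {
        byte[] header = "{\"code\":103}".getBytes(StandardCharsets.UTF_8);
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer frame = ByteBuffer.allocate(4 + header.length + body.length);
        frame.putInt(header.length).put(header).put(body).flip();

        Decoded decoded = decode(frame);
        System.out.println(new String(decoded.headerData, StandardCharsets.UTF_8));
        System.out.println(new String(decoded.body, StandardCharsets.UTF_8));
    }
}
```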

1.2 ResponseFuture

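Both invokeSyncImpl and invokeAsyncImpl use ResponseFuture. The requester creates a ResponseFuture, sets its opaque value, and caches it in a ConcurrentHashMap. After handling the request, the Broker copies the opaque value unchanged into the response object. When the client receives the response, it uses opaque to look up the matching ResponseFuture in the cache.

Every outgoing message creates a ResponseFuture to receive the reply, so if replies never arrive, ResponseFuture objects pile up in the Map. A dedicated thread therefore cleans up the Map. It runs once per second and checks whether each ResponseFuture is still valid. An expired one is removed, and its callback is invoked to tell the caller that the request has exceeded its validity period and has been removed.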
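The correlation and cleanup logic can be sketched with a plain ConcurrentHashMap. This is a hedged illustration, not the RocketMQ code: PendingRequest, register, onResponse, and scanExpired are hypothetical names, responses are plain strings, and the current time is passed in explicitly so the behaviour is deterministic. In a real client the scan would be driven by a scheduled task roughly once per second.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Sketch of an opaque-keyed table of pending requests with an expiry scan.
class ResponseTableSketch {

    // Hypothetical stand-in for ResponseFuture.
    static final class PendingRequest {
        final long deadlineMillis;
        final Consumer<String> callback; // receives the response, or null on timeout

        PendingRequest(long deadlineMillis, Consumer<String> callback) {
            this.deadlineMillis = deadlineMillis;
            this.callback = callback;
        }
    }

    private final ConcurrentHashMap<Integer, PendingRequest> table = new ConcurrentHashMap<>();

    void register(int opaque, long timeoutMillis, long nowMillis, Consumer<String> callback) {
        table.put(opaque, new PendingRequest(nowMillis + timeoutMillis, callback));
    }

    // Called when a response arrives; the opaque value identifies the request.
    boolean onResponse(int opaque, String response) {
        PendingRequest pending = table.remove(opaque);
        if (pending == null) {
            return false; // no matching request (already answered or expired)
        }
        pending.callback.accept(response);
        return true;
    }

    // Removes expired entries and notifies their callers with null.
    int scanExpired(long nowMillis) {
        List<PendingRequest> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, PendingRequest>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            PendingRequest pending = it.next().getValue();
            if (pending.deadlineMillis <= nowMillis) {
                it.remove();
                expired.add(pending);
            }
        }
        for (PendingRequest pending : expired) {
            pending.callback.accept(null);
        }
        return expired.size();
    }

    int size() {
        return table.size();
    }

    public static void main(String[] args) {
        ResponseTableSketch table = new ResponseTableSketch();
        table.register(58, 3000, 0, r -> System.out.println("opaque 58 -> " + r));
        table.register(59, 1000, 0, r -> System.out.println("opaque 59 -> " + r));
        table.onResponse(58, "OK");
        System.out.println("expired: " + table.scanExpired(2000));
    }
}
```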

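2 Communication Modes

RocketMQ supports three main communication modes: (1) synchronous (sync), (2) asynchronous (async), and (3) one-way (oneway). Synchronous mode is relatively simple: the caller obtains the responseFuture and blocks until the reply arrives (responseFuture.waitResponse(timeoutMillis)), whereas asynchronous mode relies on a callback (InvokeCallback).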

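2.1 Synchronous Invocation

2.1.1 Client Sends the Request

The sequence diagram for sending a request synchronously is as follows:

Sending a message eventually reaches the following method, which sets CommunicationMode to SYNC, the synchronous mode: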

   public SendResult send(Message msg,
        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
    }

The core method for sending a message is sendKernelImpl. The important parts are excerpted below:

private SendResult sendKernelImpl(final Message msg, //
        final MessageQueue mq, //
        final CommunicationMode communicationMode, //
        final SendCallback sendCallback, //
        final TopicPublishInfo topicPublishInfo, //
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        // If the broker address is not cached, refresh the topic route info and look it up again
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            // Whether to use the broker VIP channel. The broker opens two ports for external service.
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody(); // Remember the original body; the logic below may change it, e.g. by compression.
            try {
                // Set the unique message ID
                MessageClientIDSetter.setUniqID(msg);
                // Message compression
                int sysFlag = 0;
                if (this.tryToCompressMessage(msg)) { // messages larger than 4K are compressed
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                }
                // Transaction
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }
                // hook: check whether sending is forbidden
                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }
                // hook: logic executed before sending
                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }
                // Build the send-message request header
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {  // retry topic
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }
                // Send the message
                SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC:
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
                            brokerAddr, // 1
                            mq.getBrokerName(), // 2
                            msg, // 3
                            requestHeader, // 4
                            timeout, // 5
                            communicationMode, // 6
                            sendCallback, // 7
                            topicPublishInfo, // 8
                            this.mQClientFactory, // 9
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
                            context, //
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }
                // hook: logic executed after sending
                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }
                // Return the send result
                return sendResult;
            } catch (RemotingException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally {
                msg.setBody(prevBody);
            }
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

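Next, org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage is called. This method mainly creates the RemotingCommand object. The code also shows that SYNC mode waits for the response produced by handling the request, while ASYNC and ONEWAY return null immediately.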

public SendResult sendMessage(//
        final String addr, // 1
        final String brokerName, // 2
        final Message msg, // 3
        final SendMessageRequestHeader requestHeader, // 4
        final long timeoutMillis, // 5
        final CommunicationMode communicationMode, // 6
        final SendCallback sendCallback, // 7
        final TopicPublishInfo topicPublishInfo, // 8
        final MQClientInstance instance, // 9
        final int retryTimesWhenSendFailed, // 10
        final SendMessageContext context, // 11
        final DefaultMQProducerImpl producer // 12
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = null;
        if (sendSmartMsg) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }

        request.setBody(msg.getBody());

        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
            default:
                assert false;
                break;
        }

        return null;
    }

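NettyRemotingAbstract.invokeSyncImpl contains the logic that actually sends the message, using Netty underneath. All I/O operations in Netty are asynchronous, so you cannot know immediately whether a message was handled correctly. You can either wait for the operation to complete or register a listener. This is implemented with Future and ChannelFuture: a listener registered on them fires automatically when the operation succeeds or fails. In short, every operation returns a ChannelFuture.

Here a ChannelFutureListener is attached to the channelFuture to observe the result of the channel operation. If the request is sent successfully, SendRequestOK in the ResponseFuture is set to true and the listener returns. Otherwise it is set to false, and the responseCommand in the responseFuture is set to null. After the listener is registered, the calling thread blocks and waits: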

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        // Acts as the request ID: RemotingCommand generates one per request, starting at 0 and incrementing by 1
        final int opaque = request.getOpaque();

        try {
            // Build the ResponseFuture
            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
            // Put the ResponseFuture into responseTable
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            // Write the RemotingCommand to the channel and register a callback with Netty
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }

                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    PLOG.warn("send a request command to channel <" + addr + "> failed.");
                }
            });
            // Wait for the server's response
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }

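Client send flow:

  • Build a ResponseFuture, set its opaque value, and store it in the Map keyed by opaque
  • Send the request through Netty
  • On success, mark the ResponseFuture as sent successfully; on failure, mark it as failed and remove it from the Map
  • Call responseFuture.waitResponse(timeoutMillis) to obtain the response (an exception is thrown on timeout)
  • When the server's reply arrives, fetch the responseFuture from the Map by opaque, write the reply into it, and remove it from the Map
  • The responseFuture now holds the reply, which is returned to the caller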
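The blocking step in this flow can be sketched with a CountDownLatch. This is a minimal illustration rather than the RocketMQ class: PendingReply is a hypothetical stand-in for ResponseFuture, the response is a plain string, and the sketch returns null on interruption instead of propagating InterruptedException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch of the synchronous wait: one thread blocks, another delivers the reply.
class SyncWaitSketch {

    // Hypothetical stand-in for ResponseFuture.
    static final class PendingReply {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String response;

        // Blocks until a response is put or the timeout elapses; null means "no response".
        String waitResponse(long timeoutMillis) {
            try {
                latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            }
            return response;
        }

        // Called by the I/O thread when the reply arrives (or with null on send failure).
        void putResponse(String response) {
            this.response = response;
            latch.countDown();
        }
    }

    public static void main(String[] args) {
        PendingReply reply = new PendingReply();
        // Simulates the network thread delivering the server's reply.
        new Thread(() -> reply.putResponse("SEND_OK")).start();

        String response = reply.waitResponse(3000);
        if (response == null) {
            System.out.println("timed out or send failed");
        } else {
            System.out.println("got response: " + response);
        }
    }
}
```

Putting a null response also releases the latch, which is why a failed send wakes the caller immediately instead of making it wait for the full timeout.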

2.1.2 Client Handles the Server Response

In the client's start() operation, NettyClientHandler is bound to the pipeline. When NettyClientHandler receives a message of type RESPONSE_COMMAND, it calls processResponseCommand.

     class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
    
         @Override
         protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
             processMessageReceived(ctx, msg);
         }
     }
    
     public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
         final RemotingCommand cmd = msg;
         if (cmd != null) {
             switch (cmd.getType()) {
                 case REQUEST_COMMAND:
                     processRequestCommand(ctx, cmd);
                     break;
                 case RESPONSE_COMMAND:
                     processResponseCommand(ctx, cmd);
                     break;
                 default:
                     break;
             }
         }
     }
    

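The type flag of the RemotingCommand decides the branch: REQUEST_COMMAND means a client is sending a request to the server, and RESPONSE_COMMAND means the server is returning response data. The type is determined from the flag field: createRequestCommand leaves flag=0, and createResponseCommand sets flag to 1.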
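A small sketch of how such a flag field can encode the command type as a bit. The names and the helper class are illustrative, not taken from the source. Bit 0 as the request/response bit follows the flag values described above, while using bit 1 for one-way requests is an assumption of this sketch.

```java
// Sketch of a bit-flag field: bit 0 = response, bit 1 = one-way (assumed position).
class FlagBitsSketch {

    static final int RPC_TYPE = 0;   // bit 0: 0 = request, 1 = response
    static final int RPC_ONEWAY = 1; // bit 1: assumed position of the one-way marker

    static int markResponse(int flag) {
        return flag | (1 << RPC_TYPE);
    }

    static int markOneway(int flag) {
        return flag | (1 << RPC_ONEWAY);
    }

    static boolean isResponse(int flag) {
        int bits = 1 << RPC_TYPE;
        return (flag & bits) == bits;
    }

    static boolean isOneway(int flag) {
        int bits = 1 << RPC_ONEWAY;
        return (flag & bits) == bits;
    }

    public static void main(String[] args) {
        int requestFlag = 0;                      // a fresh request
        int responseFlag = markResponse(requestFlag);
        int onewayFlag = markOneway(requestFlag); // still a request, but no reply expected

        System.out.println("request is response? " + isResponse(requestFlag));
        System.out.println("response is response? " + isResponse(responseFlag));
        System.out.println("oneway is oneway? " + isOneway(onewayFlag));
    }
}
```

Packing both markers into one integer keeps the header compact, and each marker can be tested independently with a mask.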

     public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
         final int opaque = cmd.getOpaque();
         final ResponseFuture responseFuture = responseTable.get(opaque);
         if (responseFuture != null) {
             responseFuture.setResponseCommand(cmd);
    
             responseTable.remove(opaque);
             // For SYNC requests the callback is null
             if (responseFuture.getInvokeCallback() != null) {
                 executeInvokeCallback(responseFuture);
             } else {
                 // SYNC path: store the response as responseCommand
                 responseFuture.putResponse(cmd);
                 responseFuture.release();
             }
         } else {
             log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
             log.warn(cmd.toString());
         }
     }
    

The thread that was blocked in responseFuture.waitResponse(timeoutMillis) while waiting for the server's response now obtains the result and returns it; if no response arrives in time, an exception is thrown.

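2.1.3 Server Handles the Request

When the server starts via start(), NettyServerHandler is bound to the pipeline. When NettyServerHandler receives a message of type REQUEST_COMMAND, it calls processRequestCommand. The request RemotingCommand was created with RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader), so its code is RequestCode.SEND_MESSAGE, and the corresponding processor is SendMessageProcessor.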

    public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                           RemotingCommand request) throws RemotingCommandException {
         SendMessageContext mqtraceContext;
         switch (request.getCode()) {
             case RequestCode.CONSUMER_SEND_MSG_BACK:
                 return this.consumerSendMsgBack(ctx, request);
             default:
                 SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                 if (requestHeader == null) {
                     return null;
                 }
    
                 mqtraceContext = buildMsgContext(ctx, requestHeader);
                 this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
    
                 RemotingCommand response;
                 if (requestHeader.isBatch()) {
                     response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                 } else {
                     response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
                 }
    
                 this.executeSendMessageHookAfter(response, mqtraceContext);
                 return response;
         }
     }
    

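The code above is the broker's core flow for handling a sent message, which the next chapter covers in detail. Once processing completes, the result is returned to the Netty client: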

    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
     ...
         final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
         ...
    
                         if (!cmd.isOnewayRPC()) {
                             // Processing finished; respond to the client
                             if (response != null) {
                                 response.setOpaque(opaque);
                                 response.markResponseType();
                                 try {
                                     ctx.writeAndFlush(response);
                                 } catch (Throwable e) {
                                     log.error("process request over, but response failed", e);
                                     log.error(cmd.toString());
                                     log.error(response.toString());
                                 }
                             } else {
    
                             }
                         }
         ...
     }
    

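Server flow after receiving a request:

  • Netty receives the incoming message and dispatches it to the server side for handling
  • The processor (Processor) is looked up by the message's code; during initialization the broker registers a processor for each code
  • The processor handles the message on a separate thread and produces a response (Response). If the message is not one-way, the opaque from the request is copied into the response, and the response is sent back to the requester.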
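The dispatch steps above can be sketched as a code-to-processor table plus an executor. Everything here is a simplified assumption rather than the broker's implementation: Command is a hypothetical stand-in for RemotingCommand, processors are plain functions, the code value 10 is only an example, and handle() waits for the worker so the example stays easy to follow.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Sketch: look up a processor by request code, run it on a worker thread,
// and copy the request's opaque into the response unless the request is one-way.
class ProcessorDispatchSketch {

    // Hypothetical stand-in for RemotingCommand.
    static final class Command {
        final int code;
        final int opaque;
        final boolean oneway;
        final String body;

        Command(int code, int opaque, boolean oneway, String body) {
            this.code = code;
            this.opaque = opaque;
            this.oneway = oneway;
            this.body = body;
        }
    }

    private final Map<Integer, Function<Command, String>> processors = new ConcurrentHashMap<>();
    private final ExecutorService executor = Executors.newFixedThreadPool(2);

    // Done once at startup: one processor per request code.
    void register(int code, Function<Command, String> processor) {
        processors.put(code, processor);
    }

    // Returns the response, or null when the request is one-way.
    Command handle(Command request) {
        Function<Command, String> processor = processors.get(request.code);
        if (processor == null) {
            throw new IllegalArgumentException("no processor for code " + request.code);
        }
        Callable<String> task = () -> processor.apply(request);
        try {
            String result = executor.submit(task).get(); // processing runs on a worker thread
            if (request.oneway) {
                return null;                             // one-way: nothing is sent back
            }
            return new Command(request.code, request.opaque, false, result);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        ProcessorDispatchSketch server = new ProcessorDispatchSketch();
        server.register(10, request -> "stored: " + request.body); // 10 is an example code
        Command response = server.handle(new Command(10, 58, false, "hello"));
        System.out.println("opaque=" + response.opaque + ", body=" + response.body);
        server.shutdown();
    }
}
```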

The overall flow of synchronous communication in RocketMQ is shown in the diagram below:

2.2 Asynchronous Invocation

2.2.1 Client Sends an Asynchronous Message

private void sendMessageAsync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final AtomicInteger times,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws InterruptedException, RemotingException {
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                // First take the RemotingCommand that the server's reply stored in the responseFuture
                RemotingCommand response = responseFuture.getResponseCommand();
                if (null == sendCallback && response != null) {

                    try {
                        // The client handles the send response (decoding the response header to obtain
                        // values such as "topic", "BrokerName", and "QueueId"),
                        // then builds the sendResult and sets it on the context
                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                        if (context != null && sendResult != null) {
                            context.setSendResult(sendResult);
                            context.getProducer().executeSendMessageHookAfter(context);
                        }
                    } catch (Throwable e) {
                    }

                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                    return;
                }

                if (response != null) {
                    try {
                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                        assert sendResult != null;
                        if (context != null) {
                            context.setSendResult(sendResult);
                            context.getProducer().executeSendMessageHookAfter(context);
                        }

                        try {
                            sendCallback.onSuccess(sendResult);
                        } catch (Throwable e) {
                        }

                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                    } catch (Exception e) {
                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, e, context, false, producer);
                    }
                } else {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                    if (!responseFuture.isSendRequestOK()) {
                        MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                    } else if (responseFuture.isTimeout()) {
                        MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
                            responseFuture.getCause());
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                    } else {
                        MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                    }
                }
            }
        });
    }

Here, remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback()) sends the message asynchronously, and the server's result is delivered by calling back operationComplete.

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        long beginStartTime = System.currentTimeMillis();
        // Acts as the request ID: RemotingCommand generates one per request, starting at 0 and incrementing by 1
        final int opaque = request.getOpaque();
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl call timeout");
            }
            // Build the ResponseFuture for this request ID
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
            // Put the ResponseFuture into responseTable
            this.responseTable.put(opaque, responseFuture);
            try {
                // Send the request data through the Netty channel
                // Netty's io.netty.util.concurrent.DefaultPromise.notifyListener0 calls back operationComplete
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    // Executed after the message is sent; Netty calls back this method
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            // The message reached the server: set the flag and return
                            responseFuture.setSendRequestOK(true);
                            return;
                        }
                        requestFail(opaque);
                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                responseFuture.release();
                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
            } else {
                String info =
                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                        timeoutMillis,
                        this.semaphoreAsync.getQueueLength(),
                        this.semaphoreAsync.availablePermits()
                    );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }
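
Client asynchronous send flow:

  • Acquire the semaphoreAsync semaphore to check whether the message may be sent (asynchronous calls usually take longer end to end, so a semaphore limits how many Netty requests are held locally; its limit defaults to 2048)
  • Create the callback and build a ResponseFuture, set its opaque value, and store it in the Map keyed by opaque
  • Send the request through Netty
  • On success, mark the ResponseFuture as sent successfully; on failure, mark it as failed and remove it from the Map
  • When the server's reply arrives, invoke the callback to process the response, and remove the ResponseFuture from the Map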
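The semaphore step can be sketched as follows. It is a hedged illustration of the idea behind semaphoreAsync and SemaphoreReleaseOnlyOnce, not a copy of them: the class, the ReleaseOnce handle, and tryBegin are hypothetical names, and the limit is whatever the caller passes in.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: bound the number of in-flight async requests with a semaphore,
// and make sure each permit is released at most once.
class AsyncLimiterSketch {

    // Hypothetical handle that releases its permit exactly once.
    static final class ReleaseOnce {
        private final AtomicBoolean released = new AtomicBoolean(false);
        private final Semaphore semaphore;

        ReleaseOnce(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        void release() {
            // Only the first caller wins; later calls are no-ops.
            if (released.compareAndSet(false, true)) {
                semaphore.release();
            }
        }
    }

    private final Semaphore permits;

    AsyncLimiterSketch(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight, true);
    }

    // Returns a handle when a permit was acquired in time, otherwise null.
    ReleaseOnce tryBegin(long timeoutMillis) {
        try {
            if (permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return new ReleaseOnce(permits);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return null;
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        AsyncLimiterSketch limiter = new AsyncLimiterSketch(1);
        ReleaseOnce first = limiter.tryBegin(10);
        System.out.println("first acquired: " + (first != null));
        System.out.println("second acquired: " + (limiter.tryBegin(10) != null));
        first.release(); // e.g. when the response arrives
        first.release(); // e.g. when the timeout scan also fires; ignored
        System.out.println("available permits: " + limiter.available());
    }
}
```

Releasing only once matters because several code paths (response arrival, send failure, timeout cleanup) may all try to release the same permit.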

2.2.2 Client Handles the Server Response

Handling differs from the synchronous case: responseFuture.getInvokeCallback() is not null, so the result is delivered through the callback.

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);

            responseTable.remove(opaque);
            // For SYNC requests the callback is null
            if (responseFuture.getInvokeCallback() != null) {
                // Asynchronous request: invoke the callback
                executeInvokeCallback(responseFuture);
            } else {
                // SYNC path
                responseFuture.putResponse(cmd);
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }

private void executeInvokeCallback(final ResponseFuture responseFuture) {
        boolean runInThisThread = false;
        ExecutorService executor = this.getCallbackExecutor();
        if (executor != null) {
            try {
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Invoke the callback to hand over the server's response
                            responseFuture.executeInvokeCallback();
                        } catch (Throwable e) {
                            log.warn("execute callback in executor exception, and callback throw", e);
                        } finally {
                            responseFuture.release();
                        }
                    }
                });
            } catch (Exception e) {
                runInThisThread = true;
                log.warn("execute callback in executor exception, maybe executor busy", e);
            }
        } else {
            runInThisThread = true;
        }

        if (runInThisThread) {
            try {
                responseFuture.executeInvokeCallback();
            } catch (Throwable e) {
                log.warn("executeInvokeCallback Exception", e);
            } finally {
                responseFuture.release();
            }
        }
    }

public void executeInvokeCallback() {
        if (invokeCallback != null) {
            if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {
                invokeCallback.operationComplete(this);
            }
        }
    }

Here, invokeCallback.operationComplete(this) invokes the operationComplete callback defined in sendMessageAsync:

public void operationComplete(ResponseFuture responseFuture) {
                // First take the RemotingCommand that the server's reply stored in the responseFuture
                RemotingCommand response = responseFuture.getResponseCommand();
                ...
                if (response != null) {
                    try {
                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                        assert sendResult != null;
                        if (context != null) {
                            context.setSendResult(sendResult);
                            context.getProducer().executeSendMessageHookAfter(context);
                        }

                        try {
                            sendCallback.onSuccess(sendResult);
                        } catch (Throwable e) {
                        }

                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                    } catch (Exception e) {
                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                                retryTimesWhenSendFailed, times, e, context, false, producer);
                    }
                } 
                ...
            }

The result of an asynchronous request is delivered through the SendCallback callback method.
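The callback dispatch described in this section (prefer a callback executor, fall back to the current thread, and never run the callback twice) can be sketched like this. It is a simplified illustration with hypothetical names, using a plain Runnable in place of InvokeCallback.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: run a callback at most once, on an executor when possible,
// otherwise on the calling thread.
class CallbackDispatchSketch {

    private final AtomicBoolean executed = new AtomicBoolean(false);
    private final Runnable callback;

    CallbackDispatchSketch(Runnable callback) {
        this.callback = callback;
    }

    // compareAndSet guarantees that only the first call runs the callback.
    void executeOnce() {
        if (executed.compareAndSet(false, true)) {
            callback.run();
        }
    }

    void dispatch(ExecutorService executor) {
        boolean runInThisThread = (executor == null);
        if (executor != null) {
            try {
                executor.execute(this::executeOnce);
            } catch (RejectedExecutionException e) {
                // Executor is busy or shut down: fall back to the current thread.
                runInThisThread = true;
            }
        }
        if (runInThisThread) {
            executeOnce();
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        CallbackDispatchSketch dispatcher = new CallbackDispatchSketch(() -> calls.incrementAndGet());
        dispatcher.dispatch(null); // no executor configured: runs inline
        dispatcher.dispatch(null); // second attempt is ignored
        System.out.println("callback ran " + calls.get() + " time(s)");
    }
}
```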

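2.2.3 Server Handles the Asynchronous Request

Server-side handling is the same as for synchronous requests. The overall flow of asynchronous communication in RocketMQ is shown in the diagram below:

2.3 One-Way Invocation

2.3.1 Client Sends the Request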

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        request.markOnewayRPC();
        boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
            try {
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        once.release();
                        if (!f.isSuccess()) {
                            log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                        }
                    }
                });
            } catch (Exception e) {
                once.release();
                log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
            } else {
                String info = String.format(
                    "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                    timeoutMillis,
                    this.semaphoreOneway.getQueueLength(),
                    this.semaphoreOneway.availablePermits()
                );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }

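2.3.2 Client Handles the Response

No result needs to be handled.

2.3.3 Server Handles the Request

Processing is the same as in synchronous mode, except that no result is returned to the client. The overall flow of a one-way invocation in RocketMQ is shown in the diagram below: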
