消息发送

1 启动服务

  • 启动 Name 服务器
  • 启动 Broker 服务器

2 创建消息

  • 消息topic
  • 消息体

3 启动Producer生产消息

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();

启动后,服务如下图:

4 消息发送流程

producer.send(msg);

详细流程:

private SendResult sendDefaultImpl(//
        Message msg, //
        final CommunicationMode communicationMode, //
        final SendCallback sendCallback, //
        final long timeout//
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 校验 Producer 处于运行状态
        this.makeSureStateOK();
        // 校验消息格式
        Validators.checkMessage(msg, this.defaultMQProducer);
        // 调用编号;用于下面打印日志,标记为同一次发送消息
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // 获取 Topic路由信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null; // 最后选择消息要发送到的队列
            Exception exception = null;
            SendResult sendResult = null; // 最后一次发送结果
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 失败重试,同步多次调用
            int times = 0; // 第几次发送
            String[] brokersSent = new String[timesTotal]; // 存储每次发送消息选择的broker名
            // 循环调用发送消息,直到成功
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);  // 选择消息要发送到的队列
                if (tmpmq != null) {
                    mq = tmpmq;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        // 调用发送消息核心方法
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                        endTimestamp = System.currentTimeMillis();
                        // 更新Broker可用性信息,更新延迟容错信息
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    // 同步发送成功但存储有问题时 && 配置存储异常时重新发送开关 时,进行重试
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        // 打印异常,更新Broker可用性信息,更新继续循环
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        // 打印异常,更新Broker可用性信息,继续循环
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        // 打印异常,更新Broker可用性信息,部分情况下的异常,直接返回,结束循环
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            // 如下异常continue,进行发送消息重试

                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                                // 如果有发送结果,进行返回,否则,抛出异常;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }
            // 返回发送结果
            if (sendResult != null) {
                return sendResult;
            }
            // 根据不同情况,抛出不同的异常
            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
            // Namesrv找不到异常
            // 消息路由找不到异常
            MQClientException mqClientException = new MQClientException(info, exception);
            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }

            throw mqClientException;
        }

        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException(
                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
        }

        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }

5 寻找发送topic的路由信息

主要调用

// 获取 Topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

当生产者发送消息的时候,其首先会尝试寻找话题路由信息。即这条消息应该被发送到哪个地方去。

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); //优先从缓存topicPublishInfoTable,其次从Namesrv中获得。
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); // 当无可用的 Topic发布信息时,从Namesrv获取一次
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        ...
    }

客户端在内存中维护了一份和话题相关的路由信息表

topicPublishInfoTable

,当发送消息的时候,会首先尝试从此表中获取信息。如果此表不存在这条话题的话,那么便会从 Name 服务器获取路由消息。

get_test_topic_route_info

public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }

    /**
     * 第一次发布topic,查询不到路由信息,抛出异常后会使用默认topic TBW102的配置信息创建该topic的路由信息
     */
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    //如果是默认topic则获取默认topic的信息 否则获取topic的信息
                    if (isDefault && defaultMQProducer != null) {
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());//一个为客户设置,另一个通过读取默认topic的配置值
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        //获取topic的消息
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                    if (topicRouteData != null) {
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        } else {
                            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                        }

                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                            //获取该topic对应的所有broker,缓存到brokerAddrTable
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
                            /**
                             * 观察是否需要更新topic路由信息,如果需要更新,则更新所有Producer的路由信息。然后会执行一个代码块,
                             * 更新消费者的订阅信息(如果这个Client实例上有消费者的话)。
                             */
                            // Update Pub info
                            {
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }

                            // Update sub info
                            {
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQConsumerInner> entry = it.next();
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
                            log.info("topicRouteTable.put TopicRouteData[{}]", cloneTopicRouteData);
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    } else {
                        log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                    }
                } catch (Exception e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) {
                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
            }
        } catch (InterruptedException e) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }

        return false;
    }

5.1 NameServer选择

在获取topic的路由信息时,首先需要选择NameServer进行连接,当调用

mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer

public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
        throws RemotingException, MQClientException, InterruptedException {
        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
        requestHeader.setTopic(topic);

        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);

        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.TOPIC_NOT_EXIST: {
                // TODO LOG
                break;
            }
            case ResponseCode.SUCCESS: {
                byte[] body = response.getBody();
                if (body != null) {
                    return TopicRouteData.decode(body, TopicRouteData.class);
                }
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }

    @Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        //这里获取连接,该方法里面会做连接的检查和恢复
        final Channel channel = this.getAndCreateChannel(addr);
        //最后如果还是不是有效连接,则关闭连接,抛出异常
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
                if (this.rpcHook != null) {
                    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                }
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

    private Channel getAndCreateChannel(final String addr) throws InterruptedException {
        //无论是producer还是consumer,传进来的namesrv addr参数都是null
        if (null == addr)
            return getAndCreateNameserverChannel();
        //因为客户端传入的addr是null,所以客户端不会走到这里来,只有broker才会走到这里来,因为broker传入的addr不为null
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }
        //注意,如果和某个addr的连接不OK了,则再向该nameserver发起重连
        return this.createChannel(addr);
    }

在 RocketMQ 的设计中,客户端需要首先询问 Name 服务器才能确定一个合适的 Broker 以进行消息的发送:

然而这么多 Name 服务器,客户端是如何选择一个合适的 Name 服务器呢?

首先,我们要意识到很重要的一点,Name 服务器全部都是处于相同状态的,保存的都是相同的信息。在 Broker 启动的时候,其会将自己在本地存储的话题配置文件 (默认位于 $HOME/store/config/topics.json 目录) 中的所有话题加载到内存中去,然后会将这些所有的话题全部同步到所有的 Name 服务器中。与此同时,Broker 也会启动一个定时任务,默认每隔 30 秒来执行一次话题全同步:

由于 Name 服务器每台机器存储的数据都是一致的。因此我们客户端任意选择一台服务器进行沟通即可。

其中客户端一开始选择 Name 服务器的源码如下所示:

/**
     * <p/>Producer、Consumer 从 Namesrv列表选择一个可连接的进行通信。
     * 因为客户端都是与某一台nameserver长连接,因此长连接一旦选定,后面不会变化,
     * 除非nameserver挂掉,所以已建立的长连接要保存起来
     * @return
     * @throws InterruptedException
     */
    private Channel getAndCreateNameserverChannel() throws InterruptedException {
        // 返回已选择、可连接Namesrv
        String addr = this.namesrvAddrChoosed.get();
        if (addr != null) {
            ChannelWrapper cw = this.channelTables.get(addr);
            //注意这里,虽然长连接已经建立了,但是每次调用时,仍然要通过“cw != null && cw.isOK()”检查连接是否OK
            if (cw != null && cw.isOK()) {
                return cw.getChannel();
            }
        }
        //如果连接没有建立或连接已经断开,则继续往下,真正创建连接时需要加锁的
        final List<String> addrList = this.namesrvAddrList.get();
        if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                addr = this.namesrvAddrChoosed.get();
                if (addr != null) {
                    ChannelWrapper cw = this.channelTables.get(addr);
                    if (cw != null && cw.isOK()) {
                        return cw.getChannel();
                    }
                }
                //namesrvIndex指示了当前跟哪个nameserver发生连接,初始值是个随机数,跟nameserver数量取模,走到这一步,要么是首次发起调用,之前连接还未创建现在要创建了,
                // 或者是已创建的连接无效了要连接下一个nameserver,就是“cw.isOK()”为false。
                if (addrList != null && !addrList.isEmpty()) {
                    for (int i = 0; i < addrList.size(); i++) {
                        int index = this.namesrvIndex.incrementAndGet();
                        index = Math.abs(index);
                        index = index % addrList.size();
                        String newAddr = addrList.get(index);

                        this.namesrvAddrChoosed.set(newAddr);
                        Channel channelNew = this.createChannel(newAddr);
                        if (channelNew != null)
                            return channelNew;
                    }
                }
            } catch (Exception e) {
                log.error("getAndCreateNameserverChannel: create name server channel exception", e);
            } finally {
                this.lockNamesrvChannel.unlock();
            }
        } else {
            log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
        }

        return null;
    }

以后,如果 namesrvAddrChoosed 选择的服务器如果一直处于连接状态,那么客户端就会一直与这台服务器进行沟通。否则的话,如上源代码所示,就会自动轮寻下一台可用服务器。

5.2 从NameServer获取topic信息

当尝试从NameServer服务器获取路由信息的时候,其可能会返回两种情况:

(1) 这个话题是新创建的,Name 服务器不存在和此话题相关的信息

nameserver_not_exist_topic

如果updateTopicRouteInfoFromNameServer调用返回false,如何执行如下流程:

 public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
        throws RemotingException, MQClientException, InterruptedException {
        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
        requestHeader.setTopic(topic);

        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);

        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.TOPIC_NOT_EXIST: {
                // TODO LOG
                break;
            }
            case ResponseCode.SUCCESS: {
                byte[] body = response.getBody();
                if (body != null) {
                    return TopicRouteData.decode(body, TopicRouteData.class);
                }
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }

返回默认topic TBW102的配置信息创建该topic的路由信息。

(2) Name 服务器存在此话题信息

read_topic_info_from_nameserver

服务器返回的话题路由信息包括以下内容:

topic_route_data

“broker-1”、”broker-2” 分别为两个 Broker 服务器的名称,相同名称下可以有主从 Broker,因此每个 Broker 又都有 brokerId 。默认情况下,BrokerId 如果为

MixAll.MASTER_ID

(值为 0) 的话,那么认为这个 Broker 为 MASTER 主机,其余的位于相同名称下的 Broker 为这台 MASTER 主机的 SLAVE 主机。

6 选择发送的消息队列

private SendResult sendDefaultImpl(//
        Message msg, //
        final CommunicationMode communicationMode, //
        final SendCallback sendCallback, //
        final long timeout//
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 校验 Producer 处于运行状态
        this.makeSureStateOK();
        // 校验消息格式
        Validators.checkMessage(msg, this.defaultMQProducer);
        // 调用编号;用于下面打印日志,标记为同一次发送消息
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // 获取 Topic路由信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null; // 最后选择消息要发送到的队列
            Exception exception = null;
            SendResult sendResult = null; // 最后一次发送结果
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 失败重试,同步多次调用
            int times = 0; // 第几次发送
            String[] brokersSent = new String[timesTotal]; // 存储每次发送消息选择的broker名
            // 循环调用发送消息,直到成功
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);  // 选择消息要发送到的队列
                ...
}

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        //Producer消息发送容错策略。默认情况下容错策略关闭,即sendLatencyFaultEnable=false。
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    //容错策略选择消息队列逻辑。优先获取可用队列,其次选择一个broker获取队列,最差返回任意broker的一个队列。
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }
                // 选择一个相对好的broker,并获得其对应的一个消息队列,不考虑该队列的可用性
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            // 选择一个消息队列,不考虑队列的可用性
            return tpInfo.selectOneMessageQueue();
        }
        // 获得 lastBrokerName 对应的一个消息队列,不考虑该队列的可用性
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

发送线程以线程独立的方式自增,遍历MessageQueue选择一条待发送的MessageQueue。 volatile作用。

  • 若启动容错策略(默认false)
  • 1、通过ThreadLocalIndex遍历选出一条非faultItme的messageQueue。
  • 2、若无非faultItme的broker,将faultItme按照可用 < 不可用,失败时长短 < 失败时长长,恢复时间点早 < 恢复时间点晚来排序,pickOneAtLeast遍历排序链表的前半部分选出一个broker。即至少不是最差策略 —— bad but not worst。
  • 3、依次轮流写入broker中对应的messageQueue中,发端的负载均衡体现在这里。

异常情况:每次Producer发送失败时,维护并更新一个broker的faultItem的Map,则逻辑认为N秒内不可用,用于容错策略下的比较。 非异常情况下:看源码发现也会归为faultItem节点,延时设置为上一次从发出请求到响应的时长。

每个 Broker 上面可以绑定多个可写消息队列和多个可读消息队列,客户端根据返回的所有 Broker 地址列表和每个 Broker 的可写消息队列列表会在内存中构建一份所有的消息队列列表。之后客户端每次发送消息,都会在消息队列列表上轮循选择队列 (我们假设返回了两个 Broker,每个 Broker 均有 4 个可写消息队列):

7 调用核心方法发送

 // 调用发送消息核心方法
 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);

7.1 选择broker地址

public class MQClientInstance {

    public String findBrokerAddressInPublish(final String brokerName) {
        HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            return map.get(MixAll.MASTER_ID);
        }

        return null;
    }

}

7.2 发送消息

在确定了 Master Broker 地址和这个 Broker 的消息队列以后,客户端才开始真正地发送消息给这个 Broker,也是从这里客户端才开始与 Broker 进行交互:

client_send_msg_to_broker

 switch (communicationMode) {
                    case ASYNC:
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
                            brokerAddr, // 1
                            mq.getBrokerName(), // 2
                            msg, // 3
                            requestHeader, // 4
                            timeout, // 5
                            communicationMode, // 6
                            sendCallback, // 7
                            topicPublishInfo, // 8
                            this.mQClientFactory, // 9
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
                            context, //
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }

主要有3种消息发送方式:SYNC、ASYNC、ONEWAY

  • SYNC:同步发送,发送方线程发送后同步堵塞等待SendResult,若failed则重试下一个broker。
  • ASYNC:异步发送,超时可抛出Timeout异常。
  • ONEWAY:单向发送,发送方不等待broker响应也没有回调函数触发,速度快但可靠性弱。

results matching ""

    No results matching ""