RocketMq主从复制

标签(空格分隔): 消息中间件

1 主从同步方式

我们知道,broker有两种角色master和slave,每种角色有两个关键配置:brokerId和brokerRole,brokerId指定此master或slave在Broker中的序号,0表示Master,1及之后的表示Slave,Broker中Slave可以有多个,当然一般一个就够了,所以brokerId的值一般为1。brokerRole表示其在Broker中的角色定位,有3个值可选:

public enum BrokerRole {
    ASYNC_MASTER,
    SYNC_MASTER,
    SLAVE;
}

ASYNC_MASTER:异步Master,也就是新的消息存储时不需要等Slave同步完成; SYNC_MASTER:同步Master,新消息存现时需要等Slave同步完成,也就是返回的 Ack Offset >= 当前消息的CommitLog Offset; SLAVE:Slave角色其brokerRole;

2 Master和Slave的关系

我们知道Master和Slave主要通过心跳建立联系,如果长时间检测不到心跳,则长连接将被关系,同时,由于对broker的写操作都是发生在Master上,所以Master和Slave之间的数据需要进行同步,数据同步的发生主要通过Slave向Master上报同步进度。 下面,我们来看一下Slave如何获取到Master的地址信息,首先,如果在配置文件中指定了haMasterAddress,那么这个就会作为Master的IP+端口,Slave会向此地址发送请求,建立长连接,看broker初始化过程:

public boolean initialize() throws CloneNotSupportedException {
    //当前角色是Slave时
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                // 如果当前Broker配置中指定了haMasterAddress,则赋值 HAClient 的 masterAddress
                if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                    // 将haMasterAddress的值设置到 HAService 的 HAClient 的masterAddress中
                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                } else {
                    //如果配置中未指定Master的IP,则定期从Namesrv处更新获取
                    this.updateMasterHAServerAddrPeriodically = true;
                }

                //Slave每隔60S从Master处同步TopicConfig,ConsumerOffset,DelayOffset,SubscriptionGroupConfig
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.slaveSynchronize.syncAll();
                        } catch (Throwable e) {
                            log.error("ScheduledTask syncAll slave exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
}

但是,如果在配置文件中没有配置Master的地址信息,则会在broker向Namesrv注册broker(registerBrokerAll)时,由Namesrv将master的地址信息更新到Slave中:

    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        ...
        // 当注册Slave时,返回Master的addr作为高可用的地址
                if (MixAll.MASTER_ID != brokerId) {
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        // 获取master的brokerLiveInfo,将其haServerAddr(addr)赋值给Slave的haServerAddr
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
                ...
    }

因此,Slave能够获取到Master的地址信息,通过以上两种方法指定MasterAdress后,将其值赋给HAService里的HAClient的masterAddress变量:

    public void updateMasterAddress(final String newAddr) {
        if (this.haClient != null) {
            this.haClient.updateMasterAddress(newAddr);
        }
    }

3 主从复制流程

3.1 初始化以及连接创建

主从复制过程主要和HAService类相关,Broker启动时,会相应启动HAService,用于主从复制,同时相应启动其内部的HAClient、AcceptSocketService(用于服务端接收连接线程实现类)、GroupTransferService(用于判断主从同步复制是否完成)。 Slave向Master发送请求,那就需要去Master看看处理这个请求的方式,Broker在启动时会启动HAService内的AcceptSocketService,这个对象就是用来接收Socket请求连接的服务对象,看其对Slave的请求处理形式:

public void run() {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
    //如果有客户端发起的连接请求,则建立连接
    SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

    ···
    //接收到Slave的请求,生成一个HAConnection,用户向Channel写数据以及读取Slave传过来的数据
    HAConnection conn = new HAConnection(HAService.this, sc);
    conn.start();
    ···
}

AcceptSocketService在启动时会监听SelectionKey.OP_ACCEPT事件,当接收到请求时,创建一个SocketChannel长连接对象,然后用HAConnection去封装SocketChannel,之后启动HAConnection,可以先看看HAConnection的start( ) 方法做了什么:

    public void start() {
        //启动{@link #readSocketService}读取Slave传过来的数据
        this.readSocketService.start();
        //启动{@link #writeSocketService}向Channel写入消息,同步给Slave
        this.writeSocketService.start();
    }

HAConnection 在实例化时生成了writeSocketService 和readSocketService 两个对象。

  • writeSocketService:负责同步操作,将Master中CommitLog的消息数据写入到SocketChannel;
  • readSocketService:负责从SocketChannel中读取Slave传递的ACK进度,也就是其CommitLog的maxPhyOffset,Slave向Master同步的进度信息。

下面看HAClient的启动过程:

    public void run() {
            while (!this.isStopped()) {
                try {
                    //当存在masterAddress != null && 连接Master成功
                    if (this.connectMaster()) {
                        // 若距离上次上报时间超过5S,上报到Master进度
                        if (this.isTimeToReportOffset()) {
                            boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                            if (!result) {
                                this.closeMaster();
                            }
                        }
                        //最多阻塞1S,直到Master有数据同步于过来。若1S满了还是没有接受到数据,中断阻塞,
                        // 执行processReadEvent(),但结果读入byteBufferRead的大小为0,然后循环到这步
                        this.selector.select(1000);
                        // 处理读取事件
                        boolean ok = this.processReadEvent();
                        if (!ok) {
                            this.closeMaster();
                        }
                        // 若进度有变化,上报到Master进度
                        if (!reportSlaveMaxOffsetPlus()) {
                            continue;
                        }

                        long interval =
                            HAService.this.getDefaultMessageStore().getSystemClock().now()
                                - this.lastWriteTimestamp;
                        // Master超过20S未返回数据,关闭连接
                        if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                            .getHaHousekeepingInterval()) {
                            log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                                + "] expired, " + interval);
                            this.closeMaster();
                            log.warn("HAClient, master not response some time, so close connection");
                        }
                    } else {
                        this.waitForRunning(1000 * 5);
                    }
                } catch (Exception e) {
                    this.waitForRunning(1000 * 5);
                }
            }
    }

可以看出HAClient启动的第一步,便是和Master建立连接:

        /**
         * 连接Master节点
         * @return 是否连接成功
         * @throws ClosedChannelException 当注册读事件失败时
         */
        private boolean connectMaster() throws ClosedChannelException {
            if (null == socketChannel) {
                String addr = this.masterAddress.get();
                if (addr != null) {
                    //当自身角色是Slave时,会将配置中的MessageStoreConfig中的haMasterAddress赋值给masterAddress,或者在registerBroker时的返回值赋值
                    SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
                    if (socketAddress != null) {
                        // 连接Master节点。如果连接失败,直接关闭,不抛出异常。
                        this.socketChannel = RemotingUtil.connect(socketAddress);
                        if (this.socketChannel != null) {
                            //创建连接成功,注册读事件
                            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                        }
                    }
                }
                //更新最近上报Offset,也就是当前CommitLog的maxOffset
                this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
                //更新最近写入时间
                this.lastWriteTimestamp = System.currentTimeMillis();
            }

            return this.socketChannel != null;
        }

Master在接收到Slave的连接请求后,创建SocketChannel,封装成一个HAConnection,同时启动writeSocketService和readSocketService服务。但Master启动时是不会主动传输数据的,因为其不知道Slave的CommitLog的maxPhyOffset,也就是不知道从哪个位置开始同步,需要Slave先上报当前CommitLog的maxPhyOffset。

3.2 Slave上报偏移量

Slave和Master之间的互相传输的第首先是Slave开始的。下面我们看Slave发起的请求步骤,首先,判断距离上次上报Master的时间间隔:

        /**
         * 是否满足上报进度的时间
         * 距离上一次上报进度时间超过5S , 也就是每5S上报一次进度
         * @return
         */
        private boolean isTimeToReportOffset() {
            long interval =
                HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
            boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
                .getHaSendHeartbeatInterval();

            return needHeart;
        }

如果满足条件,则执行reportSlaveMaxOffset上报进度:

        /**
         * 上报进度,传递进度的时候仅传递一个Long类型的Offset,8个字节,没有其他数据
         * @param maxOffset
         * @return
         */
        private boolean reportSlaveMaxOffset(final long maxOffset) {
            this.reportOffset.position(0);
            this.reportOffset.limit(8);
            this.reportOffset.putLong(maxOffset);
            this.reportOffset.position(0);
            this.reportOffset.limit(8);

            for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
                try {
                    this.socketChannel.write(this.reportOffset);
                } catch (IOException e) {
                    log.error(this.getServiceName()
                        + "reportSlaveMaxOffset this.socketChannel.write exception", e);
                    return false;
                }
            }

            return !this.reportOffset.hasRemaining();
        }

Master和Slave在创建连接成功后,生成SocketChannel,都注册SelectionKey.OP_READ事件。

3.3 Master获取Slave同步偏移量

当Slave上报数据时,会触发 SelectionKey.OP_READ事件,然后Master将请求交由ReadSocketService服务处理:

public void run() {
            ...
            while (!this.isStopped()) {
                try {
                    //最多阻塞1S钟,当有一个Channel发送了数据后,立即中断阻塞
                    //若1S时间内也没有Channel发送了数据,则processReadEvent()会立即返回true,然后再次阻塞,循环如此
                    this.selector.select(1000);
                    boolean ok = this.processReadEvent();
                    if (!ok) {
                        break;
                    }
                    // Slave超过20S没有返回数据,断开连接
                    long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
                    //超过20S时间没有收到Slave传递的消息,断开Slave连接
                    if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                        log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                        break;
                    }
                } catch (Exception e) {
                    HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                    break;
                }
            }
        ...
    }

当Slave传递了自身的maxPhyOffset时,Master会马上中断 selector.select(1000),执行processReadEvent()方法:

private boolean processReadEvent() {
            int readSizeZeroTimes = 0;

            // 重置读取缓冲区以及processPosition
            if (!this.byteBufferRead.hasRemaining()) {
                //超过范围,重置
                this.byteBufferRead.flip();
                this.processPostion = 0;
            }

            while (this.byteBufferRead.hasRemaining()) {
                try {
                    //从Channel里读取数据,可能Slave没有发送过来进度
                    int readSize = this.socketChannel.read(this.byteBufferRead);
                    if (readSize > 0) {
                        readSizeZeroTimes = 0;
                        // 设置最后读取时间
                        this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                        //传递过来的数据大于8字节,有效的同步,8个字节代表着有一个有效的CommitLog Offset
                        if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
                            // 读取Slave 请求来的CommitLog的最大位置
                            // position减去8的余数,这么做也是是为了防止流传输数据的粘包问题
                            int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                            // 读取ByteBuffer中position内最大的8字节的倍数,比如 position = 571,则 571-(571%8)= 571-3 = 568
                            // 也就是读取 560-568位置的字节,因为Slave可能发送了多个进度过来,Master只读最末尾也就是最大的那个
                            long readOffset = this.byteBufferRead.getLong(pos - 8);
                            this.processPostion = pos;

                            // 设置Slave CommitLog的最大位置
                            HAConnection.this.slaveAckOffset = readOffset;

                            // 设置Slave 第一次请求的位置,初始化 slaveRequestOffset = Slave的请求位置
                            if (HAConnection.this.slaveRequestOffset < 0) {
                                HAConnection.this.slaveRequestOffset = readOffset;
                                log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                            }
                            // 通知目前Slave进度。主要用于Master节点为同步类型的。
                            HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                        }
                    } else if (readSize == 0) {
                        if (++readSizeZeroTimes >= 3) {
                            //连续读取3此没有数据,break,返回true,休眠
                            break;
                        }
                    } else {
                        log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                        return false;
                    }
                } catch (IOException e) {
                    log.error("processReadEvent exception", e);
                    return false;
                }
            }

            return true;
        }
    }

其中,最重要的是修改HAConnection.this.slaveRequestOffset的值,由于其值默认是-1,导致WriteSocketService不知道从哪里开始同步给Slave,从而一直sleep,当获取到Slave第一次同步进度后,更新其值为Slave发送过了需要同步的偏移量后,writeSocketService启动后的定期循环,会发现slaveRequestOffset已经不是-1,然后知道从哪里开始将数据发送给Slave了,所以就可以从这个进度开始向SocketChannel写入数据。

3.4 Master同步数据到Slave

下面看一下WriteSocketService如何将数据同步到Slave:

public void run() {

            while (!this.isStopped()) {
                try {
                    //由于没有 SelectionKey.OP_WRITE事件,每次都会阻塞1S
                    this.selector.select(1000);
                    //如果slaveRequestOffset没有没重置,因为不知道Slave需要同步的开始位置,会休眠,然后继续,
                    // 只有Slave把maxPhyOffset传给Master后才能往下
                    if (-1 == HAConnection.this.slaveRequestOffset) {
                        Thread.sleep(10);
                        continue;
                    }

                    if (-1 == this.nextTransferFromWhere) {
                        //计算下一次需要同步Slave的位置
                        if (0 == HAConnection.this.slaveRequestOffset) {
                            //如果Slave传的maxPhyOffset为0,那么Master只会同步最后一个MappedFile的数据到Slave
                            long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                            masterOffset =
                                masterOffset
                                    - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                                    .getMapedFileSizeCommitLog());

                            if (masterOffset < 0) {
                                masterOffset = 0;
                            }

                            this.nextTransferFromWhere = masterOffset;
                        } else {
                            this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                        }

                        log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                            + "], and slave request " + HAConnection.this.slaveRequestOffset);
                    }

                    if (this.lastWriteOver) {
                        //上次的数据已经传输完成
                        long interval =
                            HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                        //如果距离上次同步(写数据给Slave)超过5S,也就是5S内没有新的消息,发送一个空包过去,就是没有实际数据的,刷新Slave的最后写入时间
                        if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                            .getHaSendHeartbeatInterval()) {

                            // Build Header
                            this.byteBufferHeader.position(0);
                            this.byteBufferHeader.limit(headerSize);
                            this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                            this.byteBufferHeader.putInt(0);
                            this.byteBufferHeader.flip();

                            this.lastWriteOver = this.transferData();
                            if (!this.lastWriteOver)
                                continue;
                        }
                    } else {
                        // 上次传输未完成,继续传输
                        this.lastWriteOver = this.transferData();
                        if (!this.lastWriteOver)
                            continue;
                    }

                    // 选择新的CommitLog内容进行传输,创建新的SelectMappedBufferResult
                    SelectMappedBufferResult selectResult =
                        HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
                    if (selectResult != null) {
                        int size = selectResult.getSize();
                        if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                            size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                        }

                        long thisOffset = this.nextTransferFromWhere;
                        this.nextTransferFromWhere += size;

                        selectResult.getByteBuffer().limit(size);
                        this.selectMappedBufferResult = selectResult;

                        // Build Header
                        this.byteBufferHeader.position(0);
                        this.byteBufferHeader.limit(headerSize);
                        this.byteBufferHeader.putLong(thisOffset);
                        this.byteBufferHeader.putInt(size);
                        this.byteBufferHeader.flip();

                        this.lastWriteOver = this.transferData();
                    } else {

                        HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
                    }
                } catch (Exception e) {
                    break;
                }
            }
            ...
    }

当Slave第一次传给Master,发送的maxPhyOffset等于0,那么Master在同步数据时会仅同步最后一个MappedFile,后续会按照maxPhyOffset的偏移量开始同步给Slave。 从上面可以看出,将Master的CommitLog文件数据传输给Slave的主要流程由transferData完成:

        private boolean transferData() throws Exception {
            int writeSizeZeroTimes = 0;
            // Write Header
            while (this.byteBufferHeader.hasRemaining()) {
                //如果请求头有数据,先传输请求头,同时更新最后写入时间
                int writeSize = this.socketChannel.write(this.byteBufferHeader);
                if (writeSize > 0) {
                    writeSizeZeroTimes = 0;
                    this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                } else if (writeSize == 0) {
                    if (++writeSizeZeroTimes >= 3) {
                        break;
                    }
                } else {
                    throw new Exception("ha master write header error < 0");
                }
            }

            if (null == this.selectMappedBufferResult) {
                return !this.byteBufferHeader.hasRemaining();
            }

            writeSizeZeroTimes = 0;

            // Write Body
            if (!this.byteBufferHeader.hasRemaining()) {
                //请求头没有数据
                while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                    //请求体有数据
                    int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
                    if (writeSize > 0) {
                        writeSizeZeroTimes = 0;
                        this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                    } else if (writeSize == 0) {
                        if (++writeSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        throw new Exception("ha master write body error < 0");
                    }
                }
            }

            boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();

            if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                this.selectMappedBufferResult.release();
                this.selectMappedBufferResult = null;
            }
            //如果请求头和请求体的数据都传输完了,则返回true,否则,返回false
            return result;
        }

byteBufferHeader主要存放的是请求头,包含请求头大小、请求的偏移量、请求体的大小等信息; selectMappedBufferResult存放的是请求体,其中的数据主要从CommitLog文件中获取(getCommitLogData);

3.5 Slave处理Master同步的数据

到目前为止,Master已经将数据传输给Slave,当Master有数据传输过来, 触发 SelectionKey.OP_READ,此时Slave就能读取其数据。下面看看Slave如何处理收到的数据:

        /**
         * Slave处理Master同步的数据
         * @return
         */
        private boolean processReadEvent() {
            int readSizeZeroTimes = 0;
            while (this.byteBufferRead.hasRemaining()) {
                try {
                //将从Master处通过SocketChannel获取到的数据写入 byteBufferRead,返回写入的字节数量
                    int readSize = this.socketChannel.read(this.byteBufferRead);
                    if (readSize > 0) {
                        lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
                        readSizeZeroTimes = 0;
                        boolean result = this.dispatchReadRequest();
                        if (!result) {
                            log.error("HAClient, dispatchReadRequest error");
                            return false;
                        }
                    } else if (readSize == 0) {
                        if (++readSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        log.info("HAClient, processReadEvent read socket < 0");
                        return false;
                    }
                } catch (IOException e) {
                    log.info("HAClient, processReadEvent read socket exception", e);
                    return false;
                }
            }

            return true;
        }

其主要处理过程是由dispatchReadRequest完成,下面看看核心流程:

        /**
         * 读取Master传输的CommitLog数据,并返回是否异常
         * 如果读取到数据,写入CommitLog
         * 异常原因:
         * 1. Master传输来的数据offset 不等于 Slave的CommitLog数据最大offset
         * 2. 上报到Master进度失败
         *
         * @return 是否异常
         */
        private boolean dispatchReadRequest() {
            final int msgHeaderSize = 8 + 4; // phyoffset + size
            int readSocketPos = this.byteBufferRead.position();

            while (true) {
                int diff = this.byteBufferRead.position() - this.dispatchPostion;
                if (diff >= msgHeaderSize) {
                    //获取Master传过来的同步偏移量。用dispatchPostion的原因是:处理数据“粘包”导致数据读取不完整。
                    long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
                    int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);
                    //获取Slave的最大偏移量,Slave的CommitLog数据都是从Master同步过来,而不是客户端写入的
                    long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

                    if (slavePhyOffset != 0) {
                        //比较Master传过来的偏移量和Slave的偏移量是否相同
                        if (slavePhyOffset != masterPhyOffset) {
                            log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                                + slavePhyOffset + " MASTER: " + masterPhyOffset);
                            return false;
                        }
                    }
                    // 读取到消息
                    if (diff >= (msgHeaderSize + bodySize)) {
                        // 写入CommitLog
                        byte[] bodyData = new byte[bodySize];
                        this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
                        //只取出body的数据
                        this.byteBufferRead.get(bodyData);

                        HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
                        // 设置处理到的位置
                        this.byteBufferRead.position(readSocketPos);
                        this.dispatchPostion += msgHeaderSize + bodySize;
                        // 上报到Master进度
                        if (!reportSlaveMaxOffsetPlus()) {
                            return false;
                        }

                        continue;
                    }
                }
                // 空间写满,重新分配空间
                if (!this.byteBufferRead.hasRemaining()) {
                    this.reallocateByteBuffer();
                }

                break;
            }

            return true;
}

从dispatchReadRequest方法里可以看到,Slave使用dispatchPostion变量来指定每次处理的位置,其目的是为了应对粘包问题。每次提取数据的body部分,追加到CommitLog,当添加成功一次就马上向Master上报此次的进度。

results matching ""

    No results matching ""