RocketMq主从复制
标签(空格分隔): 消息中间件
1 主从同步方式
我们知道,broker有两种角色master和slave,每种角色有两个关键配置:brokerId和brokerRole,brokerId指定此master或slave在Broker中的序号,0表示Master,1及之后的表示Slave,Broker中Slave可以有多个,当然一般一个就够了,所以brokerId的值一般为1。brokerRole表示其在Broker中的角色定位,有3个值可选:
public enum BrokerRole {
ASYNC_MASTER,
SYNC_MASTER,
SLAVE;
}
ASYNC_MASTER:异步Master,也就是新的消息存储时不需要等Slave同步完成; SYNC_MASTER:同步Master,新消息存现时需要等Slave同步完成,也就是返回的 Ack Offset >= 当前消息的CommitLog Offset; SLAVE:Slave角色其brokerRole;
2 Master和Slave的关系
我们知道Master和Slave主要通过心跳建立联系,如果长时间检测不到心跳,则长连接将被关系,同时,由于对broker的写操作都是发生在Master上,所以Master和Slave之间的数据需要进行同步,数据同步的发生主要通过Slave向Master上报同步进度。 下面,我们来看一下Slave如何获取到Master的地址信息,首先,如果在配置文件中指定了haMasterAddress,那么这个就会作为Master的IP+端口,Slave会向此地址发送请求,建立长连接,看broker初始化过程:
public boolean initialize() throws CloneNotSupportedException {
//当前角色是Slave时
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
// 如果当前Broker配置中指定了haMasterAddress,则赋值 HAClient 的 masterAddress
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
// 将haMasterAddress的值设置到 HAService 的 HAClient 的masterAddress中
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
//如果配置中未指定Master的IP,则定期从Namesrv处更新获取
this.updateMasterHAServerAddrPeriodically = true;
}
//Slave每隔60S从Master处同步TopicConfig,ConsumerOffset,DelayOffset,SubscriptionGroupConfig
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
} catch (Throwable e) {
log.error("ScheduledTask syncAll slave exception", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}
但是,如果在配置文件中没有配置Master的地址信息,则会在broker向Namesrv注册broker(registerBrokerAll)时,由Namesrv将master的地址信息更新到Slave中:
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
...
// 当注册Slave时,返回Master的addr作为高可用的地址
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
// 获取master的brokerLiveInfo,将其haServerAddr(addr)赋值给Slave的haServerAddr
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
...
}
因此,Slave能够获取到Master的地址信息,通过以上两种方法指定MasterAdress后,将其值赋给HAService里的HAClient的masterAddress变量:
public void updateMasterAddress(final String newAddr) {
if (this.haClient != null) {
this.haClient.updateMasterAddress(newAddr);
}
}
3 主从复制流程
3.1 初始化以及连接创建
主从复制过程主要和HAService类相关,Broker启动时,会相应启动HAService,用于主从复制,同时相应启动其内部的HAClient、AcceptSocketService(用于服务端接收连接线程实现类)、GroupTransferService(用于判断主从同步复制是否完成)。 Slave向Master发送请求,那就需要去Master看看处理这个请求的方式,Broker在启动时会启动HAService内的AcceptSocketService,这个对象就是用来接收Socket请求连接的服务对象,看其对Slave的请求处理形式:
public void run() {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
//如果有客户端发起的连接请求,则建立连接
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
···
//接收到Slave的请求,生成一个HAConnection,用户向Channel写数据以及读取Slave传过来的数据
HAConnection conn = new HAConnection(HAService.this, sc);
conn.start();
···
}
AcceptSocketService在启动时会监听SelectionKey.OP_ACCEPT事件,当接收到请求时,创建一个SocketChannel长连接对象,然后用HAConnection去封装SocketChannel,之后启动HAConnection,可以先看看HAConnection的start( ) 方法做了什么:
public void start() {
//启动{@link #readSocketService}读取Slave传过来的数据
this.readSocketService.start();
//启动{@link #writeSocketService}向Channel写入消息,同步给Slave
this.writeSocketService.start();
}
HAConnection 在实例化时生成了writeSocketService 和readSocketService 两个对象。
- writeSocketService:负责同步操作,将Master中CommitLog的消息数据写入到SocketChannel;
- readSocketService:负责从SocketChannel中读取Slave传递的ACK进度,也就是其CommitLog的maxPhyOffset,Slave向Master同步的进度信息。
下面看HAClient的启动过程:
public void run() {
while (!this.isStopped()) {
try {
//当存在masterAddress != null && 连接Master成功
if (this.connectMaster()) {
// 若距离上次上报时间超过5S,上报到Master进度
if (this.isTimeToReportOffset()) {
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
this.closeMaster();
}
}
//最多阻塞1S,直到Master有数据同步于过来。若1S满了还是没有接受到数据,中断阻塞,
// 执行processReadEvent(),但结果读入byteBufferRead的大小为0,然后循环到这步
this.selector.select(1000);
// 处理读取事件
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}
// 若进度有变化,上报到Master进度
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
long interval =
HAService.this.getDefaultMessageStore().getSystemClock().now()
- this.lastWriteTimestamp;
// Master超过20S未返回数据,关闭连接
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
.getHaHousekeepingInterval()) {
log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
+ "] expired, " + interval);
this.closeMaster();
log.warn("HAClient, master not response some time, so close connection");
}
} else {
this.waitForRunning(1000 * 5);
}
} catch (Exception e) {
this.waitForRunning(1000 * 5);
}
}
}
可以看出HAClient启动的第一步,便是和Master建立连接:
/**
* 连接Master节点
* @return 是否连接成功
* @throws ClosedChannelException 当注册读事件失败时
*/
private boolean connectMaster() throws ClosedChannelException {
if (null == socketChannel) {
String addr = this.masterAddress.get();
if (addr != null) {
//当自身角色是Slave时,会将配置中的MessageStoreConfig中的haMasterAddress赋值给masterAddress,或者在registerBroker时的返回值赋值
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
// 连接Master节点。如果连接失败,直接关闭,不抛出异常。
this.socketChannel = RemotingUtil.connect(socketAddress);
if (this.socketChannel != null) {
//创建连接成功,注册读事件
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
}
}
}
//更新最近上报Offset,也就是当前CommitLog的maxOffset
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
//更新最近写入时间
this.lastWriteTimestamp = System.currentTimeMillis();
}
return this.socketChannel != null;
}
Master在接收到Slave的连接请求后,创建SocketChannel,封装成一个HAConnection,同时启动writeSocketService和readSocketService服务。但Master启动时是不会主动传输数据的,因为其不知道Slave的CommitLog的maxPhyOffset,也就是不知道从哪个位置开始同步,需要Slave先上报当前CommitLog的maxPhyOffset。
3.2 Slave上报偏移量
Slave和Master之间的互相传输的第首先是Slave开始的。下面我们看Slave发起的请求步骤,首先,判断距离上次上报Master的时间间隔:
/**
* 是否满足上报进度的时间
* 距离上一次上报进度时间超过5S , 也就是每5S上报一次进度
* @return
*/
private boolean isTimeToReportOffset() {
long interval =
HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
.getHaSendHeartbeatInterval();
return needHeart;
}
如果满足条件,则执行reportSlaveMaxOffset上报进度:
/**
* 上报进度,传递进度的时候仅传递一个Long类型的Offset,8个字节,没有其他数据
* @param maxOffset
* @return
*/
private boolean reportSlaveMaxOffset(final long maxOffset) {
this.reportOffset.position(0);
this.reportOffset.limit(8);
this.reportOffset.putLong(maxOffset);
this.reportOffset.position(0);
this.reportOffset.limit(8);
for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
try {
this.socketChannel.write(this.reportOffset);
} catch (IOException e) {
log.error(this.getServiceName()
+ "reportSlaveMaxOffset this.socketChannel.write exception", e);
return false;
}
}
return !this.reportOffset.hasRemaining();
}
Master和Slave在创建连接成功后,生成SocketChannel,都注册SelectionKey.OP_READ事件。
3.3 Master获取Slave同步偏移量
当Slave上报数据时,会触发 SelectionKey.OP_READ事件,然后Master将请求交由ReadSocketService服务处理:
public void run() {
...
while (!this.isStopped()) {
try {
//最多阻塞1S钟,当有一个Channel发送了数据后,立即中断阻塞
//若1S时间内也没有Channel发送了数据,则processReadEvent()会立即返回true,然后再次阻塞,循环如此
this.selector.select(1000);
boolean ok = this.processReadEvent();
if (!ok) {
break;
}
// Slave超过20S没有返回数据,断开连接
long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
//超过20S时间没有收到Slave传递的消息,断开Slave连接
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
break;
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
...
}
当Slave传递了自身的maxPhyOffset时,Master会马上中断 selector.select(1000),执行processReadEvent()方法:
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
// 重置读取缓冲区以及processPosition
if (!this.byteBufferRead.hasRemaining()) {
//超过范围,重置
this.byteBufferRead.flip();
this.processPostion = 0;
}
while (this.byteBufferRead.hasRemaining()) {
try {
//从Channel里读取数据,可能Slave没有发送过来进度
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
// 设置最后读取时间
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
//传递过来的数据大于8字节,有效的同步,8个字节代表着有一个有效的CommitLog Offset
if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
// 读取Slave 请求来的CommitLog的最大位置
// position减去8的余数,这么做也是是为了防止流传输数据的粘包问题
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
// 读取ByteBuffer中position内最大的8字节的倍数,比如 position = 571,则 571-(571%8)= 571-3 = 568
// 也就是读取 560-568位置的字节,因为Slave可能发送了多个进度过来,Master只读最末尾也就是最大的那个
long readOffset = this.byteBufferRead.getLong(pos - 8);
this.processPostion = pos;
// 设置Slave CommitLog的最大位置
HAConnection.this.slaveAckOffset = readOffset;
// 设置Slave 第一次请求的位置,初始化 slaveRequestOffset = Slave的请求位置
if (HAConnection.this.slaveRequestOffset < 0) {
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
}
// 通知目前Slave进度。主要用于Master节点为同步类型的。
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
//连续读取3此没有数据,break,返回true,休眠
break;
}
} else {
log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
return false;
}
} catch (IOException e) {
log.error("processReadEvent exception", e);
return false;
}
}
return true;
}
}
其中,最重要的是修改HAConnection.this.slaveRequestOffset的值,由于其值默认是-1,导致WriteSocketService不知道从哪里开始同步给Slave,从而一直sleep,当获取到Slave第一次同步进度后,更新其值为Slave发送过了需要同步的偏移量后,writeSocketService启动后的定期循环,会发现slaveRequestOffset已经不是-1,然后知道从哪里开始将数据发送给Slave了,所以就可以从这个进度开始向SocketChannel写入数据。
3.4 Master同步数据到Slave
下面看一下WriteSocketService如何将数据同步到Slave:
public void run() {
while (!this.isStopped()) {
try {
//由于没有 SelectionKey.OP_WRITE事件,每次都会阻塞1S
this.selector.select(1000);
//如果slaveRequestOffset没有没重置,因为不知道Slave需要同步的开始位置,会休眠,然后继续,
// 只有Slave把maxPhyOffset传给Master后才能往下
if (-1 == HAConnection.this.slaveRequestOffset) {
Thread.sleep(10);
continue;
}
if (-1 == this.nextTransferFromWhere) {
//计算下一次需要同步Slave的位置
if (0 == HAConnection.this.slaveRequestOffset) {
//如果Slave传的maxPhyOffset为0,那么Master只会同步最后一个MappedFile的数据到Slave
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
masterOffset =
masterOffset
- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMapedFileSizeCommitLog());
if (masterOffset < 0) {
masterOffset = 0;
}
this.nextTransferFromWhere = masterOffset;
} else {
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ "], and slave request " + HAConnection.this.slaveRequestOffset);
}
if (this.lastWriteOver) {
//上次的数据已经传输完成
long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
//如果距离上次同步(写数据给Slave)超过5S,也就是5S内没有新的消息,发送一个空包过去,就是没有实际数据的,刷新Slave的最后写入时间
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getHaSendHeartbeatInterval()) {
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
} else {
// 上次传输未完成,继续传输
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
// 选择新的CommitLog内容进行传输,创建新的SelectMappedBufferResult
SelectMappedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
int size = selectResult.getSize();
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}
long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;
selectResult.getByteBuffer().limit(size);
this.selectMappedBufferResult = selectResult;
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(thisOffset);
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();
} else {
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
} catch (Exception e) {
break;
}
}
...
}
当Slave第一次传给Master,发送的maxPhyOffset等于0,那么Master在同步数据时会仅同步最后一个MappedFile,后续会按照maxPhyOffset的偏移量开始同步给Slave。 从上面可以看出,将Master的CommitLog文件数据传输给Slave的主要流程由transferData完成:
private boolean transferData() throws Exception {
int writeSizeZeroTimes = 0;
// Write Header
while (this.byteBufferHeader.hasRemaining()) {
//如果请求头有数据,先传输请求头,同时更新最后写入时间
int writeSize = this.socketChannel.write(this.byteBufferHeader);
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write header error < 0");
}
}
if (null == this.selectMappedBufferResult) {
return !this.byteBufferHeader.hasRemaining();
}
writeSizeZeroTimes = 0;
// Write Body
if (!this.byteBufferHeader.hasRemaining()) {
//请求头没有数据
while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
//请求体有数据
int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write body error < 0");
}
}
}
boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
this.selectMappedBufferResult.release();
this.selectMappedBufferResult = null;
}
//如果请求头和请求体的数据都传输完了,则返回true,否则,返回false
return result;
}
byteBufferHeader主要存放的是请求头,包含请求头大小、请求的偏移量、请求体的大小等信息; selectMappedBufferResult存放的是请求体,其中的数据主要从CommitLog文件中获取(getCommitLogData);
3.5 Slave处理Master同步的数据
到目前为止,Master已经将数据传输给Slave,当Master有数据传输过来, 触发 SelectionKey.OP_READ,此时Slave就能读取其数据。下面看看Slave如何处理收到的数据:
/**
* Slave处理Master同步的数据
* @return
*/
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
while (this.byteBufferRead.hasRemaining()) {
try {
//将从Master处通过SocketChannel获取到的数据写入 byteBufferRead,返回写入的字节数量
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
readSizeZeroTimes = 0;
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
} catch (IOException e) {
log.info("HAClient, processReadEvent read socket exception", e);
return false;
}
}
return true;
}
其主要处理过程是由dispatchReadRequest完成,下面看看核心流程:
/**
* 读取Master传输的CommitLog数据,并返回是否异常
* 如果读取到数据,写入CommitLog
* 异常原因:
* 1. Master传输来的数据offset 不等于 Slave的CommitLog数据最大offset
* 2. 上报到Master进度失败
*
* @return 是否异常
*/
private boolean dispatchReadRequest() {
final int msgHeaderSize = 8 + 4; // phyoffset + size
int readSocketPos = this.byteBufferRead.position();
while (true) {
int diff = this.byteBufferRead.position() - this.dispatchPostion;
if (diff >= msgHeaderSize) {
//获取Master传过来的同步偏移量。用dispatchPostion的原因是:处理数据“粘包”导致数据读取不完整。
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);
//获取Slave的最大偏移量,Slave的CommitLog数据都是从Master同步过来,而不是客户端写入的
long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
if (slavePhyOffset != 0) {
//比较Master传过来的偏移量和Slave的偏移量是否相同
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ slavePhyOffset + " MASTER: " + masterPhyOffset);
return false;
}
}
// 读取到消息
if (diff >= (msgHeaderSize + bodySize)) {
// 写入CommitLog
byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
//只取出body的数据
this.byteBufferRead.get(bodyData);
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
// 设置处理到的位置
this.byteBufferRead.position(readSocketPos);
this.dispatchPostion += msgHeaderSize + bodySize;
// 上报到Master进度
if (!reportSlaveMaxOffsetPlus()) {
return false;
}
continue;
}
}
// 空间写满,重新分配空间
if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}
break;
}
return true;
}
从dispatchReadRequest方法里可以看到,Slave使用dispatchPostion变量来指定每次处理的位置,其目的是为了应对粘包问题。每次提取数据的body部分,追加到CommitLog,当添加成功一次就马上向Master上报此次的进度。