RocketMq存储
标签(空格分隔): 消息中间件
1 数据结构介绍
1.1 CommitLog
每一个broker对应一个CommitLog结构,在broker初始化中会调用DefaultMessageStore初始化CommitLog结构,该结构对应一个MappedFileQueue结构,而MappedFileQueue结构由多个MappedFile文件组成,每个文件的大小为1G(private int mapedFileSizeCommitLog = 1024 1024 1024)。
CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N。
在目录中/Users/xxx/store/commitlog,可以看到:
total 10485760
-rw-r--r-- 1 xxx staff 1073741824 4 21 16:27 00000000000000000000
-rw-r--r-- 1 xxx staff 1073741824 4 21 16:29 00000000001073741824
-rw-r--r-- 1 xxx staff 1073741824 4 21 16:32 00000000002147483648
-rw-r--r-- 1 xxx staff 1073741824 4 21 16:33 00000000003221225472
-rw-r--r-- 1 xxx staff 1073741824 4 21 16:32 00000000004294967296
在CommitLog目录中包含多个MappedFile文件,每一个文件的大小为1GB大小,即1024 1024 1024 = 1073741824 字节,这每个文件的命名是按照总的字节偏移量来命名的。
CommitLog、MappedFileQueue、MappedFile 的定义如下:
MappedFile :00000000000000000000、00000000001073741824、00000000002147483648等文件。 MappedFileQueue :MappedFile 所在的文件夹,对 MappedFile 进行封装成文件队列,对上层提供可无限使用的文件容量。 每个 MappedFile 统一文件大小。 文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在 CommitLog 里默认为 1GB。 CommitLog :针对 MappedFileQueue 的封装使用。
CommitLog 目前存储的MappedFile文件有两种内容类型:
MESSAGE :消息信息。 BLANK :文件不足以存储消息时的空白占位。
每一个MappedFile文件的结构为: | MESSAGE(1) | MESSAGE(2) | ... |MESSAGE(n - 1) |MESSAGE(n)|BLANK| | :----: | :----: | :----: |:----: |:----: |:----: |
如果当前MappedFile文件没有空间存储接收的消息,此时将插入一条BLANK的消息,然后创建一个新的MappedFile文件来存放接收的消息。
MESSAGE 在 CommitLog 存储结构:
第几位 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | MsgLen | 消息总长度 | Int | 4 |
2 | MagicCode | MESSAGE_MAGIC_CODE | Int | 4 |
3 | BodyCRC | 消息内容CRC | Int | 4 |
4 | QueueId | 消息队列编号 | Int | 4 |
5 | Flag | flag | Int | 4 |
6 | QueueOffset | 消息队列位置 | Long | 8 |
7 | PhysicalOffset | 物理位置。在 CommitLog 的顺序存储位置。 | Long | 8 |
8 | SysFlag | MessageSysFlag | Int | 4 |
9 | BornTimestamp | 生成消息时间戳 | Long | 8 |
10 | BornHost | 生效消息的地址+端口 | Long | 8 |
11 | StoreTimestamp | 存储消息时间戳 | Long | 8 |
12 | StoreHost | 存储消息的地址+端口 | Long | 8 |
13 | ReconsumeTimes | 重新消费消息次数 | Int | 4 |
14 | PreparedTransationOffset | Long | 8 | |
15 | BodyLength + Body | 内容长度 + 内容 | Int + Bytes | 4 + bodyLength |
16 | TopicLength + Topic | Topic长度 + Topic | Byte + Bytes | 1 + topicLength |
17 | PropertiesLength + Properties | 拓展字段长度 + 拓展字段 | Short + Bytes | 2 + PropertiesLength |
BLANK 在 CommitLog 存储结构:
第几位 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | maxBlank | 空白长度 | Int | 4 |
2 | MagicCode | BLANK_MAGIC_CODE |
1.2 ConsumeQueue
consume queue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件commit log上的位置。 ConsumeQueue对应一个topic的一个队列结构,由topic和queueId可以唯一创建一个ConsumeQueue结构,同样ConsumeQueue包含一个MappedFileQueue结构,而MappedFileQueue结构由多个MappedFile文件组成,每个文件的大小为30000020(300000 ConsumeQueue.CQ_STORE_UNIT_SIZE),其中20是queue中每个存储单元的大小。 ConsumeQueue : MappedFileQueue : MappedFile = 1 : 1 : N。 在目录中/Users/xxx/store/consumequeue/TopicA/0,即在topic=TopicA,queueId=0的目录中,可以看到:
-rw-r--r-- 1 xxx staff 6000000 4 21 16:27 00000000000000000000 -rw-r--r-- 1 xxx staff 6000000 4 21 16:29 00000000000006000000 -rw-r--r-- 1 xxx staff 6000000 4 21 16:32 00000000000012000000 -rw-r--r-- 1 xxx staff 6000000 4 21 16:33 00000000000018000000 -rw-r--r-- 1 xxx staff 6000000 4 21 16:32 00000000000024000000
每一个MappedFile文件大小都是6000000,文件名用字节偏移量来命名的。 每一个MappedFile文件的结构为,总共包含300000个20字节: | 20字节 | 20字节 | ... |20字节 |20字节| | :----: | :----: | :----: |:----: |:----: |
其中20字节的格式如下:
consumequeue文件存储单元格式
- CommitLog Offset是指这条消息在Commit Log文件中的实际偏移量
- Size存储中消息的大小
- Message Tag HashCode存储消息的Tag的哈希值:主要用于订阅时消息过滤(订阅时如果指定了Tag,会根据HashCode来快速查找到订阅的消息)
1.3 RocketMq数据存储图
在文件系统中的分布图:
RocketMQ消息存储流程图及数据结构图:
这张图片比较清晰的表明了CommitLog、ConsumeQueue和IndexFile之间的关系,IndexFile文件后续在讲解。
3 消息存储
接上一章的broker处理消息,来先消息如何存储到磁盘。主要从commitlog.putMessage开始执行,下面先来看一下该方法。
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
...
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
...
if (null == mappedFile || mappedFile.isFull()) {
//新建一个mappedFile文件
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
//将消息写入到创建的mappedFile中
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
//消息最大为4M,所以不可能一个mappedFile容不下
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
...
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
...
//判断是否需要将内存中消息刷到磁盘
handleDiskFlush(result, putMessageResult, msg);
handleHA(result, putMessageResult, msg);
return putMessageResult;
}
commitlog所有消息文件存储在一个列表中:
当有新的消息到来的时候,其会默认选择列表中的最后一个文件来进行消息的保存:
3.1 获取MappedFile文件
首先,从mappedFiles中获取最后一个文件:
public MappedFile getLastMappedFile() {
MappedFile mappedFileLast = null;
while (!this.mappedFiles.isEmpty()) {
try {
//获取最后一个MappedFile文件
mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
break;
} catch (IndexOutOfBoundsException e) {
//continue;
} catch (Exception e) {
log.error("getLastMappedFile has exception.", e);
break;
}
}
return mappedFileLast;
}
当然如果当前Broker之前从未接受过消息的话,那么mappedFiles肯定是空的或者从mappedFiles中获取到的最后一个文件已经满了。这样一旦有新的消息需要存储的时候,其就得需要立即创建一个 MappedFile 文件来存储消息,并将创建的文件加入mappedFiles列表中。
/**
* 创建一个新的MappedFile文件
* @param startOffset
* @param needCreate
* @return
*/
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
long createOffset = -1;
MappedFile mappedFileLast = getLastMappedFile();
if (mappedFileLast == null) {
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
if (mappedFileLast != null && mappedFileLast.isFull()) {
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
if (createOffset != -1 && needCreate) {
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
// 添加至列表中
this.mappedFiles.add(mappedFile);
}
return mappedFile;
}
return mappedFileLast;
}
RocketMQ 提供了一个专门用来实例化 MappedFile 文件的服务类 AllocateMappedFileService。 整个创建MappedFile的过程,会使用预分配原则,创建2个MappedFile文件。首先,会创建一个AllocateRequest请求,将它放入一个优先请求队列requestQueue,用需要创建的文件大小排序。同时,还使用一张请求表 requestTable,以文件路径作为key,存放该请求。先将AllocateRequest请求放入请求表中,如果成功,再将其放入优先请求队列requestQueue。
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
//预分配MappedFile文件,提取创建好,每次预分配一个
int canSubmitRequests = 2;
···
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
if (nextPutOK) {
...
boolean offerOK = this.requestQueue.offer(nextReq);
...
canSubmitRequests--;
}
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
if (nextNextPutOK) {
...
boolean offerOK = this.requestQueue.offer(nextNextReq);
...
}
...
AllocateRequest result = this.requestTable.get(nextFilePath);
try {
if (result != null) {
//等待创建MappedFile文件完成
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
if (!waitOK) {
log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
return null;
} else {
//等待 MappedFile 创建完毕之后,其便会从请求表 requestTable 中取出并删除表中记录
this.requestTable.remove(nextFilePath);
return result.getMappedFile();
}
} else {
log.error("find preallocate mmap failed, this never happen");
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
return null;
}
流程如下:
在broker初始化过程中,会初始化DefaultMessageStore结构,同时初始化并启动AllocateMappedFileService服务,服务类会一直等待优先级队列是否有新的请求到来,如果有,便会从队列中取出请求,然后创建对应的 MappedFile,并将请求表 requestTable 中 AllocateRequest 对象的字段 mappedFile 设置上值。最后将 AllocateRequest 对象上的 CountDownLatch 的计数器减 1 ,以标明此分配申请的 MappedFile 已经创建完毕了,然后在putRequestAndReturnMappedFile就可以获取到创建好的MappedFile文件:
public void run() {
// 一直运行
while (!this.isStopped() && this.mmapOperation()) {
}
}
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
//返回并删除队列中的第一个元素。如果队列是空的,这个方法将阻塞线程直到队列有元素。
req = this.requestQueue.take();
...
if (req.getMappedFile() == null) {
long beginTime = System.currentTimeMillis();
...
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
...
}
...
req.setMappedFile(mappedFile);
this.hasException = false;
isSuccess = true;
}
} catch (InterruptedException e) {
} catch (IOException e) {
} finally {
if (req != null && isSuccess)
req.getCountDownLatch().countDown();
}
return true;
}
创建好MappedFile文件后,然后将文件加入到mappedFiles列表中。
3.2 初始化MappedFile文件
在 MappedFile 的构造函数中,使用了FileChannel类提供了一个名为map( )的方法,该方法可以在一个打开的文件和一个特殊类型的ByteBuffer之间建立一个虚拟内存映射。在FileChannel上调用map( )方法会创建一个由磁盘文件支持的虚拟内存映射(virtual memory mapping)并在那块虚拟内存空间外部封装一个MappedByteBuffer对象。由map( )方法返回的MappedByteBuffer对象的行为在多数方面类似一个基于内存的缓冲区,只不过该对象的数据元素存储在磁盘上的一个文件中。调用get( )方法会从磁盘文件中获取数据,此数据反映该文件的当前内容,即使在映射建立之后文件已经被一个外部进程做了修改。通过文件映射看到的数据同您用常规方法读取文件看到的内容是完全一样的。相似地,对映射的缓冲区实现一个put( )会更新磁盘上的那个文件(假设对该文件您有写的权限),并且您做的修改对于该文件的其他阅读者也是可见的。采用这种方式,通常比传统的基于文件 IO 流的方式读取效率高。
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
//文件名即全局偏移量,而不是文件内的偏移量
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
ensureDirOK(this.file.getParent());
try {
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
//将进程空间映射到磁盘文件
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("create file channel " + this.fileName + " Failed. ", e);
throw e;
} catch (IOException e) {
log.error("map file " + this.fileName + " Failed. ", e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
3.3 写入消息
在获取到 MappedFile文件之后,我们便可以往这个文件里面写入消息了(mappedFile.appendMessage(msg,this.appendMessageCallback))。
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
...
//获取文件内的写入偏移量
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result = null;
if (messageExt instanceof MessageExtBrokerInner) {
//单条消息
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
//批量消息
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
//更新写入位置
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
...
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
先看单条消息的写入:
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// PHY OFFSET
//文件起始位置+相对偏移量
long wroteOffset = fileFromOffset + byteBuffer.position();
this.resetByteBuffer(hostHolder, 8);
//消息id的产生
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
// Record ConsumeQueue information
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
//用topic+queueId作为key,在broker唯一确定一个consumequeue,value记录对应存放的消息条数
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
//还没记录这个consumequeue记录的消息数,初始化
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
...
// Determines whether there is sufficient free space
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
//剩余空间不能完全容纳整条消息
....
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
//剩余空间能容纳消息
...
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
//消息条数增加
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
break;
default:
break;
}
return result;
}
写入消息可能会遇见如下两种情况,一种是这条消息可以完全追加到这个文件中,另外一种是这条消息完全不能或者只有一小部分只能存放到这个文件中,其余的需要放到新的文件中。我们对于这两种情况分别讨论:
3.3.1 可以完全写入
通过判断当前文件的剩余空间是否能容纳需要写入消息的长度,来决定当前文件是否能容纳写入的文件,如果能容纳,比较比较简单,组装消息然后写入缓存ByteBuffer中,然后更新写入的位置:
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
//this.msgBatchMemory.put(msgInner.getStoreHostBytes());
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
每条消息写入前都包含4字节(总长度)和4字节(MAGICCODE)的空间,当读取到总长度后,可以确定下一条消息的位置,
3.3.2 不可以完全写入
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
//剩余空间不能完全容纳整条消息
this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
当空间不足以容纳当前消息时,只写入总长度+MAGICCODE,下面是一个MappedFile文件格式:
如果返回值为END_OF_FILE,则会再次执行如下代码,重新获取一个MappedFile文件,将消息写入:
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
···
//消息最大为4M,所以不可能一个mappedFile容不下
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
3.4 消息刷盘
当消息体追加到 MappedFile以后,这条消息实际上还只是存储在内存中,因此还需要将内存中的内容刷到磁盘上才算真正的存储下来,才能确保消息不丢失。一般而言,刷盘有两种策略: 异步刷盘和同步刷盘。
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
//唤醒服务,执行立即刷盘动作
service.wakeup();
}
}
// Asynchronous flush
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
}
}
3.4.1 同步刷盘
同步刷盘主要由GroupCommitService服务完成,首先,用下一次写入的位置初始化请求GroupCommitRequest请求,然后将请求放入到写请求链表,同时同步阻塞5s等待刷盘完成。 GroupCommitService线程启动后一致死循环执行刷盘动作:
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.waitForRunning(10);
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
...
}
protected void waitForRunning(long interval) {
if (hasNotified.compareAndSet(true, false)) {
//用来交换读请求链表和写请求链表
this.onWaitEnd();
return;
}
//entry to wait
waitPoint.reset();
try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false);
//用来交换读请求链表和写请求链表
this.onWaitEnd();
}
}
在这个服务内部,每隔 10ms就会检查读请求队列是否不为空,如果不为空, 则会将读队列中的所有请求执行刷盘,并清空读链表。
class GroupCommitService extends FlushCommitLogService {
private void doCommit() {
// 检查所有读队列中的请求
for (GroupCommitRequest req : this.requestsRead) {
// 每个请求执行刷盘
CommitLog.this.mappedFileQueue.flush(0);
//唤醒在等待刷盘完成的broker线程
req.wakeupCustomer(flushOK);
}
this.requestsRead.clear();
}
}
GroupCommitService服务主要使用两个链表来存放请求,一个写请求链表和一个读请求链表,一般这两个队列每隔10ms就交换一下“身份”,这么做的目的其实也是为了读写分离。 在同步刷盘中,首先,通过创建一个请求刷盘的对象,然后通过 putRequest() 方法放入写请求链表中,这个时候会立即唤醒这个服务,写请求链表和读请求链表的角色会进行交换,交换角色之后,读请求链表就不为空,继而可以调用doCommit()执行所有刷盘请求了。而在这期间,Broker 会一直阻塞等待最多5秒钟,在这期间如果完不成刷盘请求的话,那么视作刷盘超时:
service.putRequest(request);
// 等待刷盘完成,完成后会立即被唤醒
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
// 刷盘超时
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
执行putRequest流程:
public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
if (hasNotified.compareAndSet(false, true)) {
//通知线程,有请求,可以立即执行
waitPoint.countDown(); // notify
}
}
读请求链表和写请求链表的变化过程如下: