第十二章-Broker-同步刷盘(一)
12.1 刷盘
CommitLog.handleDiskFlush
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// 同步刷盘
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// 同步刷盘用 GroupCommitService 服务
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) { // 判断同步刷盘情况下,要不要等待刷盘结束,默认是要的
// 创建刷盘提交请求
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
// 将请求放到缓存List中,供 GroupCommitService 服务线程执行刷盘
service.putRequest(request);
// 线程等待刷盘结果
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
// 等待超时,返回结果为刷盘超时,实际这个时候可能已经刷成功了,也可能没成功
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
// 如不等待,则直接唤醒刷盘服务线程,当前方法的执行就结束了
service.wakeup();
}
}
// 异步刷盘
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
// 异步刷盘用 CommitRealTimeService 服务,看`章节12.1.2`
commitLogService.wakeup();
}
}
}
putRequest 请求放到缓存List中
public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) {
// 将请求添加到List
this.requestsWrite.add(request);
}
// CAS hasNotified,默认是false,可以通过CAS处理
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // 通知等待在 waitPoint 的线程执行
}
}
12.1.1 同步刷盘
同步刷盘用的是类 GroupCommitService 来处理,这个类也是一个线程任务类,实现了Runnable,那么就需要启动线程,通过以下调用链启动该任务:
BrokerController.start()
-> DefaultMessageStore.start()
->CommitLog.start()
->GroupCommitService.start()
直接进入 GroupCommitService.run()方法
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {// 状态判断
try {
// 等待请求任务过来后再执行
this.waitForRunning(10);
// 执行提交的请求任务
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
// Under normal circumstances shutdown, wait for the arrival of the
// request, and then flush
try {
Thread.sleep(10);
} catch (InterruptedException e) {
CommitLog.log.warn("GroupCommitService Exception, ", e);
}
synchronized (this) {
this.swapRequests();
}
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end");
}
protected void waitForRunning(long interval) {
// 前面提交请求时,已经将 hasNotified CAS 成 true,所以这一步能成功CAS为false
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd(); // 表示收到请求通知,这里等待结束
return;
}
// 没有收到请求任务通知,重置 waitPoint 的 countDown值
waitPoint.reset();
try {
// 继续等待 interval 时间
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
// 等待完成,一定要设置为false,否则下一次请求进来无法通知线程执行,看 putRequest 方法
hasNotified.set(false);
this.onWaitEnd();
}
}
protected void onWaitEnd() {
// 交换请求,将提交请求和任务执行请求分开来处理,不使用同一个List,保证线程安全和数据一致性
this.swapRequests();
}
private void doCommit() {
synchronized (this.requestsRead) {
// 判断任务要提取的请求不为空
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) { // 遍历List
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = false;
// 这里为啥要2次执行,也能理解,requestsRead 支持多消息提交刷盘,多消息就可能涉及2个文件,前一个文件尾部,后一个文件头,但是4.5.1版本的RocketMQ,默认是不存在这种情况的,不管是单条消息还是批量消息,都发生在一个文件中,并作为一个 requestsRead 请求提交,所以实现上,该版本不会存在2个文件的情况。这里可能只是一个扩展功能
for (int i = 0; i < 2 && !flushOK; i++) {
// 判断是不是已经刷过盘(注:每次刷盘都会记录偏移位置)
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
if (!flushOK) {
// 确认没有刷盘,那就直接去刷
CommitLog.this.mappedFileQueue.flush(0);
}
}
// 通知刷盘成功,通知谁呢,就是通知提交线程,因为入口方法 handleDiskFlush 的线程还在等着结果呢
req.wakeupCustomer(flushOK);
}
// 记录消息存储的时间
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
// 清空请求列表
this.requestsRead.clear();
} else {
// 这种情况就是不等待刷盘结果,直接刷盘
CommitLog.this.mappedFileQueue.flush(0);
}
}
}
MappedFileQueue.flush
public boolean flush(final int flushLeastPages) {
boolean result = true;
// 根据刷新的位置,确定要刷新的是哪个文件的映射
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp();
// 刷盘操作,重点看这一句,继续往里跟
int offset = mappedFile.flush(flushLeastPages);
// 刷盘成功后要增加已被刷的位置,用于下次刷盘的判断
long where = mappedFile.getFileFromOffset() + offset;
result = where == this.flushedWhere;
this.flushedWhere = where;
if (0 == flushLeastPages) {
this.storeTimestamp = tmpTimeStamp;
}
}
return result;
}
MappedFile.flush
public int flush(final int flushLeastPages) {
if (this.isAbleToFlush(flushLeastPages)) {
if (this.hold()) {
int value = getReadPosition();
// 刷盘这一块的操作通过 try...catch... 包裹出来,也就是说真实刷盘 force() 有可能产生异常
try {
//We only append data to fileChannel or mappedByteBuffer, never both.
if (writeBuffer != null || this.fileChannel.position() != 0) {
this.fileChannel.force(false);
} else {
// 这里底层调用的是 msync,可以看jvm源码,下面有贴出来
this.mappedByteBuffer.force();
}
} catch (Throwable e) {
log.error("Error occurred when force data to disk.", e);
}
// 记录已刷盘的最新位置。但是这段代码在try...catch...范围外,也就是说,即使force()产生异常,这一段还是会执行,那么就存在刷盘失败时,依然记录新的刷盘位置,这种极端情况就会导致消息数据丢失,而用户收到的却是成功,所以说,RocketMQ 也是存在数据丢失的。
this.flushedPosition.set(value);
this.release();
} else {
log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
this.flushedPosition.set(getReadPosition());
}
}
return this.getFlushedPosition();
}
force()的JVM底层实现
这一段源码,在MappedByteBuffer.c 文件中,有兴趣的读者可以去看看。
msync系统调用失败的原因有很多,包含但不限于:
1、文件描述符无效或未正确打开
2、内存映射区域对应的文件被破坏
3、权限被篡改
4、系统资源不足
5、内核错误
6、msync同步问题:多个进程共同操作 msync
JNIEXPORT void JNICALL
Java_java_nio_MappedByteBuffer_force0(JNIEnv *env, jobject obj, jobject fdo,
jlong address, jlong len)
{
void* a = (void *)jlong_to_ptr(address);
// 看这一行,这其实就是一个系统调用API,作用就是将mmap中映射的内存内容同步到相应的文件
int result = msync(a, (size_t)len, MS_SYNC);
if (result == -1) {
JNU_ThrowIOExceptionWithLastError(env, "msync failed");
}
}
原文地址:https://blog.csdn.net/zhang527294844/article/details/137685489
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!