自学内容网 自学内容网

RocketMQ源码之消息刷盘分析

前言

刷盘是将内存中的消息写入磁盘,分为同步刷盘和异步刷盘。同步刷盘指一条消息写入磁盘才返回成功,异步刷盘指写入内存就返回成功,稍后异步线程刷盘。
在创建CommitLog对象的时候,会初始化刷盘服务:

//代码位置:org.apache.rocketmq.store.CommitLog
public CommitLog(final DefaultMessageStore defaultMessageStore) {
   
    //省略代码
    //同步刷盘
    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
   
        this.flushCommitLogService = new GroupCommitService();
    } else {
   
        //异步刷盘
        this.flushCommitLogService = new FlushRealTimeService();
    }
    //实时提交服务
    this.commitLogService = new CommitRealTimeService();

    //省略代码
}

RocketMQ默认的是异步刷盘。刷盘服务的继承关系如下:
在这里插入图片描述
从上面类图可以看出,不管是同步刷盘还是异步刷盘都是继承了FlushCommitLogService类,GroupCommitService类用于同步刷盘,FlushRealTimeService用于异步刷盘。如果transientStorePoolEnable=true,并且是异步刷盘,则消息append时会放入writeBuffer(堆外内存),CommitRealTimeService的作用是将堆外内存中的消息异步写入到fileChannel中

在CommitLog启动的时候,会启动刷盘服务,代码如下:

//org.apache.rocketmq.store.CommitLog#start
   public void start() {
   
        this.flushCommitLogService.start();

        if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
   
            this.commitLogService.start();
        }
    }

isTransientStorePoolEnable的判断为transientStorePoolEnable为true,并且是异步刷盘。由于transientStorePoolEnable默认为false,所以默认不会进行commit操作

  public boolean isTransientStorePoolEnable() {
   
        return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
            && BrokerRole.SLAVE != getBrokerRole();
    }

在putMessage方法中,当消息写到缓存以后,就会调用handleDiskFlush方法进行刷盘,handleDiskFlush方法如下:

//代码位置:org.apache.rocketmq.store.CommitLog#handleDiskFlush
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
   
        // Synchronization flush
        //同步刷盘
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
   
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            //等待消息存储成功
            if (messageExt.isWaitStoreMsgOK()) {
   
                //创建一个同步刷盘请求
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                //添加同步刷盘请求
                service.putRequest(request);
                CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
                PutMessageStatus flushStatus = null;
                try {
   
                    //阻塞获取刷盘状态
                    flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                            TimeUnit.MILLISECONDS);
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
   
                    //flushOK=false;
                }
                //刷盘的状态不成功
                if (flushStatus != PutMessageStatus.PUT_OK) {
   
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
   
                //唤醒刷盘服务
                service.wakeup();
            }
        }
        // Asynchronous flush
        else {
   
            //TransientStorePool开关不可用
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable())

原文地址:https://blog.csdn.net/Shrimp_millet/article/details/144713245

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!