自学内容网 自学内容网

Starocks中的一致性检查ConsistencyChecker

背景

本文基于Starrocks 3.1.7

结论

SR一致性检查主要是涉及两个部分:
 1. tablet元数据的一致性检查
    每间隔两个两个小时进行检查,检查TabletInvertedIndex LocalMetastore的tablet的一致性,如果tablet既不在当前catalog中也不在回收站里,就直接从当前的TabletInvertedIndex 删除掉
 2. tablet的数据的一致性检查
    从 23点到4点 一直循环进行 一致性检查,如果上一批还没完,则等待上一批的全部检查完才进行下一批的一致性检查,
    以db/table/partition/index/tablet这种层次 按照 lastCheckTime 从远到近排序,选择最多100个tablet进行一致性校验, 选择的tablet条件为:
    1. 不是 olap table以及不是物化视图的的 Normal表不校验
    2. 分区副本数据为1的不校验
    3. 已经校验的tablet不校验

其中涉及到的 变量为 consistency_check_end_time consistency_check_start_time 以及 MAX_JOB_NUM

分析

涉及到的数据链路如下:

getConsistencyChecker().start();
  ||
  \/
 Daemon.runOneCycle
  ||
  \/
 ConsistencyChecker.runAfterCatalogReady

主要的逻辑也是在runAfterCatalogReady中:

  if (System.currentTimeMillis() - lastTabletMetaCheckTime > Config.consistency_tablet_meta_check_interval_ms) {
      checkTabletMetaConsistency();
      lastTabletMetaCheckTime = System.currentTimeMillis();
  }

  // for each round. try chose enough new tablets to check
  // only add new job when it's work time
  if (itsTime() && getJobNum() == 0) {
      List<Long> chosenTabletIds = chooseTablets();
      for (Long tabletId : chosenTabletIds) {
          CheckConsistencyJob job = new CheckConsistencyJob(tabletId);
          addJob(job);
      }
  }

  jobsLock.writeLock().lock();
  try {
      // handle all jobs
      Iterator<Map.Entry<Long, CheckConsistencyJob>> iterator = jobs.entrySet().iterator();
      while (iterator.hasNext()) {
          Map.Entry<Long, CheckConsistencyJob> entry = iterator.next();
          CheckConsistencyJob oneJob = entry.getValue();

          JobState state = oneJob.getState();
          switch (state) {
              case PENDING:
                  if (!oneJob.sendTasks()) {
                      clearJob(oneJob);
                      iterator.remove();
                  }
                  break;
              case RUNNING:
                  int res = oneJob.tryFinishJob();
                  if (res == -1 || res == 1) {
                      // cancelled or finished
                      clearJob(oneJob);
                      iterator.remove();
                  }
                  break;
              default:
                  break;
          }
      } // end while
  } finally {
      jobsLock.writeLock().unlock();
  }
    
  • 进行tabllet元数据的检查,间隔为两个小时,主要是在checkTabletMetaConsistency方法中,该方法的主要就是判断如果tablet既不在当前catalog中也不在回收站
    里,就直接从当前的TabletInvertedIndex 删除掉,逻辑比较清晰。
  • tablet数据的一致性检查
    • 首先是判断是否是处于一致性检查的时间断,consistency_check_start_time(默认是23),consistency_check_end_time(默认是4),也就是每天的23点到4点之间进行一致性检查 且等待上一批次的一致性检查任务都结束了才进行下一轮的检查
    • 再次是选择进行一致性检查的tablet,见方法 chooseTablets ,这块逻辑也很清晰,以db/table/partition/index/tablet这种层次 按照 lastCheckTime 从
      远到近排序,选择最多100个tablet进行一致性校验
    • 组装成 CheckConsistencyJob ,并放入到jobs中
    • 调用sendTasks方法,从jobs中 构造AgentBatchTask,并提交給backend,进行一致性检查
        AgentBatchTask batchTask = new AgentBatchTask();
        ...
        for (Replica replica : tablet.getImmutableReplicas()) {
           // 1. if state is CLONE, do not send task at this time
           if (replica.getState() == ReplicaState.CLONE
                   || replica.getState() == ReplicaState.DECOMMISSION) {
               continue;
           }
      
           if (replica.getDataSize() > maxDataSize) {
               maxDataSize = replica.getDataSize();
           }
      
           CheckConsistencyTask task = new CheckConsistencyTask(null, replica.getBackendId(),
                   tabletMeta.getDbId(),
                   tabletMeta.getTableId(),
                   tabletMeta.getPartitionId(),
                   tabletMeta.getIndexId(),
                   tabletId, checkedSchemaHash,
                   checkedVersion);
      
           // add task to send
           batchTask.addTask(task);
      
           // init checksum as '-1'
           checksumMap.put(replica.getBackendId(), -1L);
      
           ++sentTaskReplicaNum;
        }
        ...
         for (AgentTask task : batchTask.getAllTasks()) {
              AgentTaskQueue.addTask(task);
          }
          AgentTaskExecutor.submit(batchTask);
      
      • 构建AgentBatchTask实例,
      • 对于每一个tablet的replica副本,构建CheckConsistencyTask,并加到AgentBatchTask中去,并在最后添加到AgentTaskQueue队列中,便于进行跟踪
      • 调用AgentTaskExecutor.submit(batchTask)把任务提交到后端的backend中去执行,主要代码在AgentBatchTask.run方法中
           public void run() {
             for (Long backendId : this.backendIdToTasks.keySet()) {
                 BackendService.Client client = null;
                 TNetworkAddress address = null;
                 boolean ok = false;
                 try {
                     ComputeNode computeNode = GlobalStateMgr.getCurrentSystemInfo().getBackend(backendId);
                     if (RunMode.getCurrentRunMode() == RunMode.SHARED_DATA && computeNode == null) {
                         computeNode = GlobalStateMgr.getCurrentSystemInfo().getComputeNode(backendId);
                     }
        
                     if (computeNode == null || !computeNode.isAlive()) {
                         continue;
                     }
        
                     String host = computeNode.getHost();
                     int port = computeNode.getBePort();
        
                     List<AgentTask> tasks = this.backendIdToTasks.get(backendId);
                     // create AgentClient
                     address = new TNetworkAddress(host, port);
                     client = ClientPool.backendPool.borrowObject(address);
                     List<TAgentTaskRequest> agentTaskRequests = new LinkedList<TAgentTaskRequest>();
                     for (AgentTask task : tasks) {
                         agentTaskRequests.add(toAgentTaskRequest(task));
                     }
                     client.submit_tasks(agentTaskRequests);
        
        • 根据每个tablet的backend的host和port,构建Thrift client,并调用toAgentTaskRequest把每个AgentTask转换为thridt格式的请求
        • 调用client.submit_tasks提交到backend执行CHECK_CONSISTENCY任务

原文地址:https://blog.csdn.net/monkeyboy_tech/article/details/143580456

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!