Starocks中的一致性检查ConsistencyChecker
背景
本文基于Starrocks 3.1.7
结论
SR一致性检查主要是涉及两个部分:
1. tablet元数据的一致性检查
每间隔两个两个小时进行检查,检查TabletInvertedIndex LocalMetastore的tablet的一致性,如果tablet既不在当前catalog中也不在回收站里,就直接从当前的TabletInvertedIndex 删除掉
2. tablet的数据的一致性检查
从 23点到4点 一直循环进行 一致性检查,如果上一批还没完,则等待上一批的全部检查完才进行下一批的一致性检查,
以db/table/partition/index/tablet这种层次 按照 lastCheckTime 从远到近排序,选择最多100个tablet进行一致性校验, 选择的tablet条件为:
1. 不是 olap table以及不是物化视图的的 Normal表不校验
2. 分区副本数据为1的不校验
3. 已经校验的tablet不校验
其中涉及到的 变量为 consistency_check_end_time consistency_check_start_time 以及 MAX_JOB_NUM
分析
涉及到的数据链路如下:
getConsistencyChecker().start();
||
\/
Daemon.runOneCycle
||
\/
ConsistencyChecker.runAfterCatalogReady
主要的逻辑也是在runAfterCatalogReady中:
if (System.currentTimeMillis() - lastTabletMetaCheckTime > Config.consistency_tablet_meta_check_interval_ms) {
checkTabletMetaConsistency();
lastTabletMetaCheckTime = System.currentTimeMillis();
}
// for each round. try chose enough new tablets to check
// only add new job when it's work time
if (itsTime() && getJobNum() == 0) {
List<Long> chosenTabletIds = chooseTablets();
for (Long tabletId : chosenTabletIds) {
CheckConsistencyJob job = new CheckConsistencyJob(tabletId);
addJob(job);
}
}
jobsLock.writeLock().lock();
try {
// handle all jobs
Iterator<Map.Entry<Long, CheckConsistencyJob>> iterator = jobs.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, CheckConsistencyJob> entry = iterator.next();
CheckConsistencyJob oneJob = entry.getValue();
JobState state = oneJob.getState();
switch (state) {
case PENDING:
if (!oneJob.sendTasks()) {
clearJob(oneJob);
iterator.remove();
}
break;
case RUNNING:
int res = oneJob.tryFinishJob();
if (res == -1 || res == 1) {
// cancelled or finished
clearJob(oneJob);
iterator.remove();
}
break;
default:
break;
}
} // end while
} finally {
jobsLock.writeLock().unlock();
}
- 进行tabllet元数据的检查,间隔为两个小时,主要是在checkTabletMetaConsistency方法中,该方法的主要就是判断如果tablet既不在当前catalog中也不在回收站
里,就直接从当前的TabletInvertedIndex 删除掉,逻辑比较清晰。 - tablet数据的一致性检查
- 首先是判断是否是处于一致性检查的时间断,consistency_check_start_time(默认是23),consistency_check_end_time(默认是4),也就是每天的23点到4点之间进行一致性检查 且等待上一批次的一致性检查任务都结束了才进行下一轮的检查
- 再次是选择进行一致性检查的tablet,见方法 chooseTablets ,这块逻辑也很清晰,以db/table/partition/index/tablet这种层次 按照 lastCheckTime 从
远到近排序,选择最多100个tablet进行一致性校验 - 组装成 CheckConsistencyJob ,并放入到jobs中
- 调用sendTasks方法,从jobs中 构造AgentBatchTask,并提交給backend,进行一致性检查
AgentBatchTask batchTask = new AgentBatchTask(); ... for (Replica replica : tablet.getImmutableReplicas()) { // 1. if state is CLONE, do not send task at this time if (replica.getState() == ReplicaState.CLONE || replica.getState() == ReplicaState.DECOMMISSION) { continue; } if (replica.getDataSize() > maxDataSize) { maxDataSize = replica.getDataSize(); } CheckConsistencyTask task = new CheckConsistencyTask(null, replica.getBackendId(), tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(), tabletMeta.getIndexId(), tabletId, checkedSchemaHash, checkedVersion); // add task to send batchTask.addTask(task); // init checksum as '-1' checksumMap.put(replica.getBackendId(), -1L); ++sentTaskReplicaNum; } ... for (AgentTask task : batchTask.getAllTasks()) { AgentTaskQueue.addTask(task); } AgentTaskExecutor.submit(batchTask);
- 构建AgentBatchTask实例,
- 对于每一个tablet的replica副本,构建CheckConsistencyTask,并加到AgentBatchTask中去,并在最后添加到AgentTaskQueue队列中,便于进行跟踪
- 调用AgentTaskExecutor.submit(batchTask)把任务提交到后端的backend中去执行,主要代码在AgentBatchTask.run方法中
public void run() { for (Long backendId : this.backendIdToTasks.keySet()) { BackendService.Client client = null; TNetworkAddress address = null; boolean ok = false; try { ComputeNode computeNode = GlobalStateMgr.getCurrentSystemInfo().getBackend(backendId); if (RunMode.getCurrentRunMode() == RunMode.SHARED_DATA && computeNode == null) { computeNode = GlobalStateMgr.getCurrentSystemInfo().getComputeNode(backendId); } if (computeNode == null || !computeNode.isAlive()) { continue; } String host = computeNode.getHost(); int port = computeNode.getBePort(); List<AgentTask> tasks = this.backendIdToTasks.get(backendId); // create AgentClient address = new TNetworkAddress(host, port); client = ClientPool.backendPool.borrowObject(address); List<TAgentTaskRequest> agentTaskRequests = new LinkedList<TAgentTaskRequest>(); for (AgentTask task : tasks) { agentTaskRequests.add(toAgentTaskRequest(task)); } client.submit_tasks(agentTaskRequests);
- 根据每个tablet的backend的host和port,构建Thrift client,并调用toAgentTaskRequest把每个AgentTask转换为thridt格式的请求
- 调用client.submit_tasks提交到backend执行CHECK_CONSISTENCY任务
原文地址:https://blog.csdn.net/monkeyboy_tech/article/details/143580456
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!