【源码】Sharding-JDBC源码分析之SQL中分片键路由ShardingSQLRouter的原理
Sharding-JDBC系列
2、Sharding-JDBC分库分表之SpringBoot分片策略
3、Sharding-JDBC分库分表之SpringBoot主从配置
4、SpringBoot集成Sharding-JDBC-5.3.0分库分表
5、SpringBoot集成Sharding-JDBC-5.3.0实现按月动态建表分表
8、【源码】Sharding-JDBC源码分析之Yaml分片配置文件解析原理
9、【源码】Sharding-JDBC源码分析之Yaml分片配置原理(一)
10、【源码】Sharding-JDBC源码分析之Yaml分片配置原理(二)
11、【源码】Sharding-JDBC源码分析之Yaml分片配置转换原理
12、【源码】Sharding-JDBC源码分析之ShardingSphereDataSource的创建原理
13、【源码】Sharding-JDBC源码分析之ContextManager创建中mode分片配置信息的持久化存储的原理
14、【源码】Sharding-JDBC源码分析之ContextManager创建中ShardingSphereDatabase的创建原理
15、【源码】Sharding-JDBC源码分析之分片规则生成器DatabaseRuleBuilder实现规则配置到规则对象的生成原理
16、【源码】Sharding-JDBC源码分析之配置数据库定义的表的元数据解析原理
17、【源码】Sharding-JDBC源码分析之ShardingSphereConnection的创建原理
18、【源码】Sharding-JDBC源码分析之ShardingSpherePreparedStatement的创建原理
19、【源码】Sharding-JDBC源码分析之Sql解析的原理
20、【源码】Sharding-JDBC源码分析之SQL路由及SingleSQLRouter单表路由
21、【源码】Sharding-JDBC源码分析之SQL中分片键路由ShardingSQLRouter的原理
前言
ShardingSphere透明的为Java应用程序提供了数据库分片功能,只需配置好分片规则,无需关心底层的数据库分片细节。ShardingSphere框架根据配置好的分片规则,自动路由到实际操作的数据库、表中。本文从源码的角度分析 SQL 路由中的分片路由器ShardingSQLRouter的实现原理。
ShardingSpherePreparedStatement回顾
在【源码】Sharding-JDBC源码分析之SQL路由及SingleSQLRouter单表路由-CSDN博客中分析在执行SQL语句前,会进行SQL路由,创建RouteContext对象,在RouteContext路由上下文对象中,包含了SQL真正执行的数据源、逻辑表及真实表的映射。如果是DML操作,且配置了分片键,则ShardingSQLRouter分片路由器会第一个执行。
ShardingSQLRouter
如果没有通过提示语指定路由的数据源,则从SPI获取路由器之后,优先执行ShardingSQLRouter分片路由器,即执行路由器的createRouteContext()方法。在该类中,对应的decorateRouteContext()方法为空方法。
ShardingSQLRouter的源码如下:
package org.apache.shardingsphere.sharding.route.engine;
/**
* 分片SQL路由器
*/
public final class ShardingSQLRouter implements SQLRouter<ShardingRule> {
/**
* 创建路由上下文
* @param queryContext 查询上下文
* @param database 数据库信息
* @param rule 分片规则
* @param props 配置的属性
* @param connectionContext 连接上下文
* @return
*/
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public RouteContext createRouteContext(final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingRule rule,
final ConfigurationProperties props, final ConnectionContext connectionContext) {
// 获取对应数据库、对应查询类型的SQL语句对象
SQLStatement sqlStatement = queryContext.getSqlStatementContext().getSqlStatement();
// 解析,获取分片条件
ShardingConditions shardingConditions = createShardingConditions(queryContext, database, rule);
// 获取分片语句校验器,查询语句返回空
Optional<ShardingStatementValidator> validator = ShardingStatementValidatorFactory.newInstance(sqlStatement, shardingConditions);
// 校验
validator.ifPresent(optional -> optional.preValidate(rule, queryContext.getSqlStatementContext(), queryContext.getParameters(), database, props));
if (sqlStatement instanceof DMLStatement && shardingConditions.isNeedMerge()) {
shardingConditions.merge();
}
// 创建路由引擎对象,执行路由,获取路由上下文
RouteContext result = ShardingRouteEngineFactory.newInstance(rule, database, queryContext, shardingConditions, props, connectionContext).route(rule);
// 校验
validator.ifPresent(optional -> optional.postValidate(rule, queryContext.getSqlStatementContext(), queryContext.getHintValueContext(), queryContext.getParameters(), database, props, result));
return result;
}
/**
* 创建分片条件对象
* @param queryContext 查询上下文
* @param database 数据库信息
* @param rule 分片规则
* @return
*/
@SuppressWarnings({"rawtypes", "unchecked"})
private ShardingConditions createShardingConditions(final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingRule rule) {
List<ShardingCondition> shardingConditions;
// 如果SQL操作是DML || 游标
if (queryContext.getSqlStatementContext().getSqlStatement() instanceof DMLStatement || queryContext.getSqlStatementContext() instanceof CursorAvailable) {
// 创建分片条件引擎的新实例。通过SPI获取,默认返回DefaultShardingConditionEngine
ShardingConditionEngine shardingConditionEngine = ShardingConditionEngineFactory.createShardingConditionEngine(database, rule);
// 通过引擎创建分片条件
shardingConditions = shardingConditionEngine.createShardingConditions(queryContext.getSqlStatementContext(), queryContext.getParameters());
} else {
shardingConditions = Collections.emptyList();
}
// 创建分片条件集合对象
return new ShardingConditions(shardingConditions, queryContext.getSqlStatementContext(), rule);
}
@Override
public void decorateRouteContext(final RouteContext routeContext, final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingRule rule,
final ConfigurationProperties props, final ConnectionContext connectionContext) {
// TODO
}
@Override
public int getOrder() {
return ShardingOrder.ORDER;
}
@Override
public Class<ShardingRule> getTypeClass() {
return ShardingRule.class;
}
}
3.1 createRouteContext()
在createRouteContext()方法中,主要执行如下:
1)获取SQL语句对象;
2)调用createShardingConditions()方法,解析,获取分片条件集合对象ShardingConditions;
2.1)ShardingConditions包含ShardingCondition集合、sql语句对象及分片规则对象;
2.2)每个分片键对应一个ShardingCondition;
2.3)ShardingCondition中存放分片键的信息,包含分片键的下标、列名、表名、分片的值;
2.4)分片的值主要为两种类型,一种是固定值,用于表示 =、in;另一种为区间值,用于表示 >、< 等区间条件;
3)安全性校验;
4)对于DML语句,如果需要条件合并,则进行条件合并;
5)调用ShardingRouteEngineFactory.newInstance(),创建分片路由引擎对象,执行route()路由方法,获取路由上下文;
5.1)通过ShardingRouteEngineFactory.newInstance(),获取分片路由引擎。对于DML语句的普通分片规则,返回ShardingStandardRoutingEngine对象;
5.2)在ShardingStandardRoutingEngine的route()方法中,根据配置的规则,创建对应的分片策略,执行对应doSharding()方法,传入参数值,获取真正执行的数据库、表信息DataNode;
5.3)根据DataNode,创建路由单元,保存到新创建的RouteContext对象中;
6)安全性校验;
7)返回创建的路由上下文;
3.2 createShardingConditions()
在createShardingConditions()方法中,执行如下:
1)如果SQL操作是DML语句 || 游标类型的语句,则获取分片条件引擎对象,默认为DefaultShardingConditionEngine,执行DefaultShardingConditionEngine的createShardingConditions()方法,创建ShardingCondition集合;
1.1)SQL语句中涉及分片键的有两种类型,一种为插入语句,另一种带where的语句。插入语句的分片键在插入的值中,其他类型的语句分片键在where条件中;
1.2)如果是插入语句,则通过InsertClauseShardingConditionEngine创建ShardingCondition集合;
1.3)其他类型语句,则通过WhereClauseShardingConditionEngine创建ShardingCondition集合;
2)否则不支持分片规则,ShardingCondition集合为空;
3)创建一个ShardingConditions对象,传入上面获取的ShardingCondition集合;
WhereClauseShardingConditionEngine
WhereClauseShardingConditionEngine为从 where 条件中解析出ShardingCondition分片条件的引擎。在该类中,通过解析SQL语句中的where条件部分,结合配置的分片规则,从中找出符合条件的分片键,以及分片键对应的值。
WhereClauseShardingConditionEngine的源码如下:
package org.apache.shardingsphere.sharding.route.engine.condition.engine.impl;
/**
* where分片条件引擎
*/
@RequiredArgsConstructor
public final class WhereClauseShardingConditionEngine {
// 分片规则
private final ShardingRule shardingRule;
// 数据库信息
private final ShardingSphereDatabase database;
/**
* 创建分片条件,每个分片键对应一个ShardingCondition
* @param sqlStatementContext
* @param params sql语句中的参数值
* @return
*/
public List<ShardingCondition> createShardingConditions(final SQLStatementContext<?> sqlStatementContext, final List<Object> params) {
if (!(sqlStatementContext instanceof WhereAvailable)) {
return Collections.emptyList();
}
// 获取where中用到的列,包括子查询
Collection<ColumnSegment> columnSegments = ((WhereAvailable) sqlStatementContext).getColumnSegments();
// 获取默认的schema的名称。默认为databaseName,即logic_db
String defaultSchemaName = DatabaseTypeEngine.getDefaultSchemaName(sqlStatementContext.getDatabaseType(), database.getName());
// 获取schema
ShardingSphereSchema schema = sqlStatementContext.getTablesContext().getSchemaName()
.map(database::getSchema).orElseGet(() -> database.getSchema(defaultSchemaName));
// key:列名;value为表名
Map<String, String> columnExpressionTableNames = sqlStatementContext.getTablesContext().findTableNamesByColumnSegment(columnSegments, schema);
List<ShardingCondition> result = new ArrayList<>();
for (WhereSegment each : ((WhereAvailable) sqlStatementContext).getWhereSegments()) {
// each.getExpr()返回where条件中的二元运算等表达式,如BinaryOperationExpression
// createShardingConditions():解析条件表达式中的分片键ShardingCondition对象
result.addAll(createShardingConditions(each.getExpr(), params, columnExpressionTableNames));
}
return result;
}
/**
* 从where表达式中创建分片条件对象。
* 解析条件表达式中的分片键,
* 1)如果是一个or条件,前后都有分片键,则创建两个ShardingCondition;
* 2)如果是 and 条件,最多创建一个 ShardingCondition;
* @param expression where条件中的某个表达式
* @param params 参数值
* @param columnExpressionTableNames
* @return
*/
private Collection<ShardingCondition> createShardingConditions(final ExpressionSegment expression, final List<Object> params, final Map<String, String> columnExpressionTableNames) {
// 获取And的谓词。
// 小于等于1个and,创建一个AndPredicate。一个or创建两个AndPredicate。多个and创建一个AndPredicate,其中的predicates包含多个
Collection<AndPredicate> andPredicates = ExpressionExtractUtil.getAndPredicates(expression);
Collection<ShardingCondition> result = new LinkedList<>();
for (AndPredicate each : andPredicates) {
// 从谓词中创建分片条件值Map。key为对应的列,value为对应列的分片值对象ShardingConditionValue
Map<Column, Collection<ShardingConditionValue>> shardingConditionValues = createShardingConditionValueMap(each.getPredicates(), params, columnExpressionTableNames);
// 如果没有分片键,则返回空
if (shardingConditionValues.isEmpty()) {
return Collections.emptyList();
}
// 创建分片条件。一个And谓词一个ShardingCondition对象
ShardingCondition shardingCondition = createShardingCondition(shardingConditionValues);
// TODO remove startIndex when federation has perfect support for subquery
shardingCondition.setStartIndex(expression.getStartIndex());
result.add(shardingCondition);
}
return result;
}
/**
* 从谓词中创建分片条件值Map。key为对应的列,value为对应列的分片值对象ShardingConditionValue
* @param predicates 条件中的谓词,如BinaryOperationExpression等
* @param params 传入的参数
* @param columnTableNames 列及对应表的Map
* @return
*/
private Map<Column, Collection<ShardingConditionValue>> createShardingConditionValueMap(final Collection<ExpressionSegment> predicates,
final List<Object> params, final Map<String, String> columnTableNames) {
Map<Column, Collection<ShardingConditionValue>> result = new HashMap<>(predicates.size(), 1);
// 遍历谓词的表达式
for (ExpressionSegment each : predicates) {
// 从谓词表达式部分中提取column部分
for (ColumnSegment columnSegment : ColumnExtractor.extract(each)) {
// 获取列对应的表
Optional<String> tableName = Optional.ofNullable(columnTableNames.get(columnSegment.getExpression()));
// 从列对应的表中查找配置的分片策略,如果配置的分片策略使用了对应columnName,则返回columnName;否则返回空
Optional<String> shardingColumn = tableName.flatMap(optional -> shardingRule.findShardingColumn(columnSegment.getIdentifier().getValue(), optional));
// 如果对应列没有配置分片策略,则跳过
if (!tableName.isPresent() || !shardingColumn.isPresent()) {
continue;
}
Column column = new Column(shardingColumn.get(), tableName.get());
// 创建分片条件值,不同的表达式,场景不同的值。如 =、in,创建ListShardingConditionValue;>、< 等表达式,创建RangeShardingConditionValue
Optional<ShardingConditionValue> shardingConditionValue = ConditionValueGeneratorFactory.generate(each, column, params);
if (!shardingConditionValue.isPresent()) {
continue;
}
result.computeIfAbsent(column, unused -> new LinkedList<>()).add(shardingConditionValue.get());
}
}
return result;
}
/**
* 创建分片条件对象
* @param shardingConditionValues
* @return
*/
private ShardingCondition createShardingCondition(final Map<Column, Collection<ShardingConditionValue>> shardingConditionValues) {
ShardingCondition result = new ShardingCondition();
for (Entry<Column, Collection<ShardingConditionValue>> entry : shardingConditionValues.entrySet()) {
try {
// 同一个列的多个分片条件值合并为一个
ShardingConditionValue shardingConditionValue = mergeShardingConditionValues(entry.getKey(), entry.getValue());
if (shardingConditionValue instanceof AlwaysFalseShardingConditionValue) {
return new AlwaysFalseShardingCondition();
}
result.getValues().add(shardingConditionValue);
} catch (final ClassCastException ex) {
throw new ShardingValueDataTypeException(entry.getKey());
}
}
return result;
}
/**
* 同一个谓词中的同一个列的多个分片条件值合并为一个
* @param column
* @param shardingConditionValues
* @return
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private ShardingConditionValue mergeShardingConditionValues(final Column column, final Collection<ShardingConditionValue> shardingConditionValues) {
Collection<Comparable<?>> listValue = null;
Range<Comparable<?>> rangeValue = null;
Set<Integer> parameterMarkerIndexes = new HashSet<>();
for (ShardingConditionValue each : shardingConditionValues) {
// 添加下标
parameterMarkerIndexes.addAll(each.getParameterMarkerIndexes());
// =、in 的处理
if (each instanceof ListShardingConditionValue) {
// 获取两个Collection集合的交集
// 对于确定的值,如age = 15 and age in (15, 20, 21),则最终只能是查询出age = 15的记录,
// 所以此处要进行集合的交集运算[15]和[15, 20, 21],结果为15
listValue = mergeListShardingValues(((ListShardingConditionValue) each).getValues(), listValue);
// 如果没有交集,则返回false
if (listValue.isEmpty()) {
return new AlwaysFalseShardingConditionValue();
}
} else if (each instanceof RangeShardingConditionValue) { // 区间值的处理
try {
// 值合并
rangeValue = mergeRangeShardingValues(((RangeShardingConditionValue) each).getValueRange(), rangeValue);
} catch (final IllegalArgumentException ex) {
return new AlwaysFalseShardingConditionValue();
}
}
}
if (null == listValue) {
return new RangeShardingConditionValue<>(column.getName(), column.getTableName(), rangeValue, new ArrayList<>(parameterMarkerIndexes));
}
if (null == rangeValue) {
return new ListShardingConditionValue<>(column.getName(), column.getTableName(), listValue, new ArrayList<>(parameterMarkerIndexes));
}
listValue = mergeListAndRangeShardingValues(listValue, rangeValue);
return listValue.isEmpty() ? new AlwaysFalseShardingConditionValue()
: new ListShardingConditionValue<>(column.getName(), column.getTableName(), listValue, new ArrayList<>(parameterMarkerIndexes));
}
/**
* 保留value1和value2交集的值
* @param value1
* @param value2
* @return
*/
private Collection<Comparable<?>> mergeListShardingValues(final Collection<Comparable<?>> value1, final Collection<Comparable<?>> value2) {
if (null == value2) {
return value1;
}
// 保留value1和value2交集的值
value1.retainAll(value2);
return value1;
}
/**
* 整合区间值
* @param value1
* @param value2
* @return
*/
private Range<Comparable<?>> mergeRangeShardingValues(final Range<Comparable<?>> value1, final Range<Comparable<?>> value2) {
return null == value2 ? value1 : SafeNumberOperationUtil.safeIntersection(value1, value2);
}
private Collection<Comparable<?>> mergeListAndRangeShardingValues(final Collection<Comparable<?>> listValue, final Range<Comparable<?>> rangeValue) {
Collection<Comparable<?>> result = new LinkedList<>();
for (Comparable<?> each : listValue) {
if (SafeNumberOperationUtil.safeContains(rangeValue, each)) {
result.add(each);
}
}
return result;
}
}
在createShardingConditions()方法中,主要执行如下:
1)获取where中用到的列,包括子查询部分;
2)获取默认的schema的名称。默认为databaseName,即logic_db;
3)获取列信息,为Map对象,key:列名;value为表名;
4)遍历查询语句的where部分,查询where部分中的分片键,创建ShardingCondition对象;
4.1)从where部分的表达式中获取AndPredicate谓词;
4.2)遍历AndPredicate谓词,每个谓词根据其中是否包含配置的分片键,以及值类型,创建ShardingCondition对象;
4.3)ShardingCondition对象中保存ShardingConditionValue集合。ShardingConditionValue包含了分片键、所属的表、值;
4.4)ShardingConditionValue主要包含两种类型:ListShardingConditionValue(确定的值,如 =、in 的条件)、RangeShardingConditionValue(区间值,如 > 、>= 等);
where部分解析的大体流程如下:
ShardingStandardRoutingEngine
在ShardingSQLRouter的createRouteContext()方法中,通过ShardingRouteEngineFactory.newInstance(),获取分片路由引擎。对于DML语句的普通分片规则,返回ShardingStandardRoutingEngine对象。
package org.apache.shardingsphere.sharding.route.engine.type.standard;
/**
* 标准分片路由引擎
*/
public final class ShardingStandardRoutingEngine implements ShardingRouteEngine {
// 逻辑表。有分片键的表或语句中的第一个表
private final String logicTableName;
// 分片键的分片条件信息对象
private final ShardingConditions shardingConditions;
// sql语句
private final SQLStatementContext<?> sqlStatementContext;
// 配置的props
private final ConfigurationProperties props;
// SQl 操作的涉及的分片数据节点集合
private final Collection<Collection<DataNode>> originalDataNodes = new LinkedList<>();
// SQL 语句中的提示提取器
private final SQLHintExtractor sqlHintExtractor;
public ShardingStandardRoutingEngine(final String logicTableName, final ShardingConditions shardingConditions, final SQLStatementContext<?> sqlStatementContext,
final HintValueContext hintValueContext, final ConfigurationProperties props) {
this.logicTableName = logicTableName;
this.shardingConditions = shardingConditions;
this.sqlStatementContext = sqlStatementContext;
this.props = props;
this.sqlHintExtractor = new SQLHintExtractor(sqlStatementContext.getSqlStatement(), hintValueContext);
}
/**
* 创建路由上下文
* @param shardingRule sharding rule
* @return
*/
@Override
public RouteContext route(final ShardingRule shardingRule) {
RouteContext result = new RouteContext();
// 获取当前sql分片条件对应的操作的数据节点。(操作的数据库及表)
Collection<DataNode> dataNodes = getDataNodes(shardingRule, shardingRule.getTableRule(logicTableName));
result.getOriginalDataNodes().addAll(originalDataNodes);
for (DataNode each : dataNodes) {
result.getRouteUnits().add(
new RouteUnit(new RouteMapper(each.getDataSourceName(), each.getDataSourceName()), Collections.singleton(new RouteMapper(logicTableName, each.getTableName()))));
}
return result;
}
/**
* 获取数据节点
* @param shardingRule
* @param tableRule
* @return
*/
private Collection<DataNode> getDataNodes(final ShardingRule shardingRule, final TableRule tableRule) {
// 创建数据源分片算法对象
ShardingStrategy databaseShardingStrategy = createShardingStrategy(shardingRule.getDatabaseShardingStrategyConfiguration(tableRule),
shardingRule.getShardingAlgorithms(), shardingRule.getDefaultShardingColumn());
// 创建表分片算法对象
ShardingStrategy tableShardingStrategy = createShardingStrategy(shardingRule.getTableShardingStrategyConfiguration(tableRule),
shardingRule.getShardingAlgorithms(), shardingRule.getDefaultShardingColumn());
// 是否数据源和表规则都是Hint策略
if (isRoutingByHint(shardingRule, tableRule)) {
return routeByHint(tableRule, databaseShardingStrategy, tableShardingStrategy);
}
// 是否数据源和表规则都不是Hint策略
if (isRoutingByShardingConditions(shardingRule, tableRule)) {
return routeByShardingConditions(shardingRule, tableRule, databaseShardingStrategy, tableShardingStrategy);
}
// 混合策略
return routeByMixedConditions(shardingRule, tableRule, databaseShardingStrategy, tableShardingStrategy);
}
/**
* 是否为数据源和表规则都是Hint策略
* @param shardingRule
* @param tableRule
* @return
*/
private boolean isRoutingByHint(final ShardingRule shardingRule, final TableRule tableRule) {
return shardingRule.getDatabaseShardingStrategyConfiguration(tableRule) instanceof HintShardingStrategyConfiguration
&& shardingRule.getTableShardingStrategyConfiguration(tableRule) instanceof HintShardingStrategyConfiguration;
}
/**
* 判断sql的hint中是否包含了sql语句中的表
* @return
*/
private boolean isRoutingBySQLHint() {
Collection<String> tableNames = sqlStatementContext.getTablesContext().getTableNames();
for (String each : tableNames) {
if (sqlHintExtractor.containsHintShardingValue(each)) {
return true;
}
}
return false;
}
/**
* Hint的路由
* @param tableRule
* @param databaseShardingStrategy
* @param tableShardingStrategy
* @return
*/
private Collection<DataNode> routeByHint(final TableRule tableRule, final ShardingStrategy databaseShardingStrategy, final ShardingStrategy tableShardingStrategy) {
return route0(tableRule, databaseShardingStrategy, getDatabaseShardingValuesFromHint(), tableShardingStrategy, getTableShardingValuesFromHint());
}
/**
* 是否数据源和表规则都不是Hint策略
* @param shardingRule
* @param tableRule
* @return
*/
private boolean isRoutingByShardingConditions(final ShardingRule shardingRule, final TableRule tableRule) {
return !(shardingRule.getDatabaseShardingStrategyConfiguration(tableRule) instanceof HintShardingStrategyConfiguration
|| shardingRule.getTableShardingStrategyConfiguration(tableRule) instanceof HintShardingStrategyConfiguration);
}
/**
* 通过分片条件路由
* @param shardingRule
* @param tableRule
* @param databaseShardingStrategy
* @param tableShardingStrategy
* @return
*/
private Collection<DataNode> routeByShardingConditions(final ShardingRule shardingRule, final TableRule tableRule,
final ShardingStrategy databaseShardingStrategy, final ShardingStrategy tableShardingStrategy) {
return shardingConditions.getConditions().isEmpty()
? route0(tableRule, databaseShardingStrategy, Collections.emptyList(), tableShardingStrategy, Collections.emptyList())
: routeByShardingConditionsWithCondition(shardingRule, tableRule, databaseShardingStrategy, tableShardingStrategy);
}
/**
* 通过分片条件作为条件路由
* @param shardingRule
* @param tableRule
* @param databaseShardingStrategy
* @param tableShardingStrategy
* @return
*/
private Collection<DataNode> routeByShardingConditionsWithCondition(final ShardingRule shardingRule, final TableRule tableRule,
final ShardingStrategy databaseShardingStrategy, final ShardingStrategy tableShardingStrategy) {
Collection<DataNode> result = new LinkedList<>();
for (ShardingCondition each : shardingConditions.getConditions()) {
Collection<DataNode> dataNodes = route0(tableRule,
databaseShardingStrategy, getShardingValuesFromShardingConditions(shardingRule, databaseShardingStrategy.getShardingColumns(), each),
tableShardingStrategy, getShardingValuesFromShardingConditions(shardingRule, tableShardingStrategy.getShardingColumns(), each));
result.addAll(dataNodes);
originalDataNodes.add(dataNodes);
}
return result;
}
/**
* 混合条件的路由,即包含Hint、又包含其他的规则
* @param shardingRule
* @param tableRule
* @param databaseShardingStrategy
* @param tableShardingStrategy
* @return
*/
private Collection<DataNode> routeByMixedConditions(final ShardingRule shardingRule, final TableRule tableRule,
final ShardingStrategy databaseShardingStrategy, final ShardingStrategy tableShardingStrategy) {
return shardingConditions.getConditions().isEmpty()
? routeByMixedConditionsWithHint(shardingRule, tableRule, databaseShardingStrategy, tableShardingStrategy)
: routeByMixedConditionsWithCondition(shardingRule, tableRule, databaseShardingStrategy, tableShardingStrategy);
}
/**
* 通过解析的分片条件进行混合条件的路由
* @param shardingRule
* @param tableRule
* @param databaseShardingStrategy
* @param tableShardingStrategy
* @return
*/
private Collection<DataNode> routeByMixedConditionsWithCondition(final ShardingRule shardingRule, final TableRule tableRule,
final ShardingStrategy databaseShardingStrategy, final ShardingStrategy tableShardingStrategy) {
Collection<DataNode> result = new LinkedList<>();
for (ShardingCondition each : shardingConditions.getConditions()) {
Collection<DataNode> dataNodes = route0(tableRule, databaseShardingStrategy,
getDatabaseShardingValues(shardingRule, databaseShardingStrategy, each), tableShardingStrategy, getTableShardingValues(shardingRule, tableShardingStrategy, each));
result.addAll(dataNodes);
originalDataNodes.add(dataNodes);
}
return result;
}
/**
* 有包含hint的复合条件
* @param shardingRule
* @param tableRule
* @param databaseShardingStrategy
* @param tableShardingStrategy
* @return
*/
private Collection<DataNode> routeByMixedConditionsWithHint(final ShardingRule shardingRule, final TableRule tableRule,
final ShardingStrategy databaseShardingStrategy, final ShardingStrategy tableShardingStrategy) {
// 其中table规则是Hint
if (shardingRule.getDatabaseShardingStrategyConfiguration(tableRule) instanceof HintShardingStrategyConfiguration) {
return route0(tableRule, databaseShardingStrategy, getDatabaseShardingValuesFromHint(), tableShardingStrategy, Collections.emptyList());
}
// database规则为Hint
return route0(tableRule, databaseShardingStrategy, Collections.emptyList(), tableShardingStrategy, getTableShardingValuesFromHint());
}
/**
* 获取数据库的分片条件值
* @param shardingRule
* @param databaseShardingStrategy
* @param shardingCondition
* @return
*/
private List<ShardingConditionValue> getDatabaseShardingValues(final ShardingRule shardingRule, final ShardingStrategy databaseShardingStrategy, final ShardingCondition shardingCondition) {
return isGettingShardingValuesFromHint(databaseShardingStrategy)
// 如果是Hint类型的策略,从Hint中获取
? getDatabaseShardingValuesFromHint()
// 如果不是Hint类型的策略,从分片条件对象中获取分片值
: getShardingValuesFromShardingConditions(shardingRule, databaseShardingStrategy.getShardingColumns(), shardingCondition);
}
/**
* 获取分片表的值
* @param shardingRule
* @param tableShardingStrategy
* @param shardingCondition
* @return
*/
private List<ShardingConditionValue> getTableShardingValues(final ShardingRule shardingRule, final ShardingStrategy tableShardingStrategy, final ShardingCondition shardingCondition) {
return isGettingShardingValuesFromHint(tableShardingStrategy)
? getTableShardingValuesFromHint()
: getShardingValuesFromShardingConditions(shardingRule, tableShardingStrategy.getShardingColumns(), shardingCondition);
}
/**
* 判断策略是否为Hint类型的策略
* @param shardingStrategy
* @return
*/
private boolean isGettingShardingValuesFromHint(final ShardingStrategy shardingStrategy) {
return shardingStrategy instanceof HintShardingStrategy;
}
/**
* 从Hint中获取数据库的分片条件值
* @return
*/
private List<ShardingConditionValue> getDatabaseShardingValuesFromHint() {
if (isRoutingBySQLHint()) {
return getDatabaseShardingValuesFromSQLHint();
}
return getShardingConditions(HintManager.isDatabaseShardingOnly() ? HintManager.getDatabaseShardingValues() : HintManager.getDatabaseShardingValues(logicTableName));
}
/**
* 从SQL语句中的Hint信息中获取数据库的分片条件值
* @return
*/
private List<ShardingConditionValue> getDatabaseShardingValuesFromSQLHint() {
Collection<Comparable<?>> shardingValues = new LinkedList<>();
Collection<String> tableNames = sqlStatementContext.getTablesContext().getTableNames();
for (String each : tableNames) {
// 如果是sql语句中的第一个表
if (each.equals(logicTableName) && sqlHintExtractor.containsHintShardingDatabaseValue(each)) {
// 创建分片条件值
shardingValues.addAll(sqlHintExtractor.getHintShardingDatabaseValue(each));
}
}
return getShardingConditions(shardingValues);
}
/**
* 从Hint中获取分片表的条件值。
*/
private List<ShardingConditionValue> getTableShardingValuesFromHint() {
if (isRoutingBySQLHint()) {
return getTableShardingValuesFromSQLHint();
}
return getShardingConditions(HintManager.getTableShardingValues(logicTableName));
}
/**
* 从SQL的hint中获取分片表的值
* @return
*/
private List<ShardingConditionValue> getTableShardingValuesFromSQLHint() {
Collection<Comparable<?>> shardingValues = new LinkedList<>();
Collection<String> tableNames = sqlStatementContext.getTablesContext().getTableNames();
for (String each : tableNames) {
if (each.equals(logicTableName) && sqlHintExtractor.containsHintShardingTableValue(each)) {
shardingValues.addAll(sqlHintExtractor.getHintShardingTableValue(each));
}
}
return getShardingConditions(shardingValues);
}
/**
* 创建分片条件值集合。如果传入的集合不为空,则创建ListShardingConditionValue对象,否则为空集合
* @param shardingValue
* @return
*/
private List<ShardingConditionValue> getShardingConditions(final Collection<Comparable<?>> shardingValue) {
return shardingValue.isEmpty() ? Collections.emptyList() : Collections.singletonList(new ListShardingConditionValue<>("", logicTableName, shardingValue));
}
/**
* 从分片条件对象中获取分片值
* @param shardingRule
* @param shardingColumns
* @param shardingCondition
* @return
*/
private List<ShardingConditionValue> getShardingValuesFromShardingConditions(final ShardingRule shardingRule, final Collection<String> shardingColumns, final ShardingCondition shardingCondition) {
List<ShardingConditionValue> result = new ArrayList<>(shardingColumns.size());
// 遍历分片条件
for (ShardingConditionValue each : shardingCondition.getValues()) {
// 查找绑定关联表
Optional<BindingTableRule> bindingTableRule = shardingRule.findBindingTableRule(each.getTableName());
// (等于逻辑表 || 有关联表 && 关联表是逻辑表) && 属于分片键,则将该分片条件值加入到result中
if ((logicTableName.equals(each.getTableName()) || bindingTableRule.isPresent() && bindingTableRule.get().hasLogicTable(logicTableName))
&& shardingColumns.contains(each.getColumnName())) {
result.add(each);
}
}
return result;
}
/**
* 路由
* @param tableRule 表规则
* @param databaseShardingStrategy 配置的数据源分片策略
* @param databaseShardingValues 数据源分片值
* @param tableShardingStrategy 配置的表分片策略
* @param tableShardingValues 表的分片值
* @return
*/
private Collection<DataNode> route0(final TableRule tableRule,
final ShardingStrategy databaseShardingStrategy, final List<ShardingConditionValue> databaseShardingValues,
final ShardingStrategy tableShardingStrategy, final List<ShardingConditionValue> tableShardingValues) {
// 数据源路由
Collection<String> routedDataSources = routeDataSources(tableRule, databaseShardingStrategy, databaseShardingValues);
Collection<DataNode> result = new LinkedList<>();
// 遍历数据源
for (String each : routedDataSources) {
// 表路由
result.addAll(routeTables(tableRule, each, tableShardingStrategy, tableShardingValues));
}
return result;
}
/**
* 数据源路由
* @param tableRule
* @param databaseShardingStrategy
* @param databaseShardingValues
* @return
*/
private Collection<String> routeDataSources(final TableRule tableRule, final ShardingStrategy databaseShardingStrategy, final List<ShardingConditionValue> databaseShardingValues) {
// 如果数据没有对应的分片值,即查询的列没有设置数据源分片,则返回实际数据源名称
if (databaseShardingValues.isEmpty()) {
return tableRule.getActualDataSourceNames();
}
// 执行分片算法的doSharding(),获取分片的数据源名称集合
Collection<String> result = databaseShardingStrategy.doSharding(tableRule.getActualDataSourceNames(), databaseShardingValues, tableRule.getDataSourceDataNode(), props);
Preconditions.checkState(!result.isEmpty(), "No database route info");
Preconditions.checkState(tableRule.getActualDataSourceNames().containsAll(result),
"Some routed data sources do not belong to configured data sources. routed data sources: `%s`, configured data sources: `%s`", result, tableRule.getActualDataSourceNames());
return result;
}
/**
* 表路由
* @param tableRule 表规则
* @param routedDataSource 路由的数据源
* @param tableShardingStrategy 表分片策略
* @param tableShardingValues 表分片值
* @return
*/
private Collection<DataNode> routeTables(final TableRule tableRule, final String routedDataSource,
final ShardingStrategy tableShardingStrategy, final List<ShardingConditionValue> tableShardingValues) {
// 获取目标数据源的实际表的集合
Collection<String> availableTargetTables = tableRule.getActualTableNames(routedDataSource);
Collection<String> routedTables = tableShardingValues.isEmpty()
? availableTargetTables
// 执行表分片算法的doSharding()方法,获取分片表名
: tableShardingStrategy.doSharding(availableTargetTables, tableShardingValues, tableRule.getTableDataNode(), props);
Collection<DataNode> result = new LinkedList<>();
for (String each : routedTables) {
// 创建分片的DataNode对象
result.add(new DataNode(routedDataSource, each));
}
return result;
}
/**
* 根据配置的信息,创建分片策略对象
* @param shardingStrategyConfig 分片策略配置,如算法名称、分片键等
* @param shardingAlgorithms 分片算法。key为算法的名称,value为对应算法类型的类(如:ClassBasedShardingAlgorithm等)
* @param defaultShardingColumn 默认的分片键
* @return
*/
private ShardingStrategy createShardingStrategy(final ShardingStrategyConfiguration shardingStrategyConfig, final Map<String, ShardingAlgorithm> shardingAlgorithms,
final String defaultShardingColumn) {
return null == shardingStrategyConfig ? new NoneShardingStrategy()
: ShardingStrategyFactory.newInstance(shardingStrategyConfig, shardingAlgorithms.get(shardingStrategyConfig.getShardingAlgorithmName()), defaultShardingColumn);
}
}
在route()方法中,执行如下:
1)执行getDataNodes(),获取当前SQL的分片数据节点,即真正要执行的数据库及表的映射对象;
1.1)根据配置的分片键的分片策略,创建数据源、表的分片策略对象。如对应分片键配置的策略为standard,则创建StandardShardingStrategy对象。其中的StandardShardingAlgorithm算法对象为算法配置时指定的type所对应的对象。如type配置为CLASS_BASED,则对应的StandardShardingAlgorithm算法对象为ClassBasedShardingAlgorithm。而在ClassBasedShardingAlgorithm中的StandardShardingAlgorithm算法对象才是真正自定义的算法对象;
1.2)根据配置的分片键的分片策略,判断数据源、表的策略条件,分为两种:Hint、非Hint。结合数据源、表,总共有三种场景:数据源、表都为Hint;都不为Hint;其中一种为Hint。针对上面的三种情况,分别判断处理,最后针对Hint和非Hint,获取分片的数据源和表;
1.2.1)如果是Hint的,则从提示中获取指定的数据源或表,创建新的ShardingCondition分片条件集合。逻辑为:
a)先从SQL语句中的注释信息中获取指定的数据源或表;
b)如果没有找到,则从HintManager中获取动态指定的数据源或表;
1.2.2)如果不是Hint,则使用WhereClauseShardingConditionEngine中解析where条件获取的ShardingCondition分片条件集合;
1.3)执行route0()方法,创建分片的数据节点;
1.3.1)执行routeDataSources(),执行数据源的分片算法(如ClassBasedShardingAlgorithm的doSharding()方法,在该方法中,执行自定义的分片算法的doSharding()方法),获取数据源字符串集合;
详见:【源码】Sharding-JDBC源码分析之分片规则生成器DatabaseRuleBuilder实现规则配置到规则对象的生成原理-中的ClassBasedShardingAlgorithm部分
1.3.2)遍历数据源字符串集合,执行routeTables(),执行表的分片算法,获取表字符串集合。同数据源字符串映射,生成DataNode对象。该对象记录分片的真实数据源、表的映射;
2)遍历DataNode集合对象,创建路由映射RouteMapper对象(逻辑表和真实表映射、数据源映射),创建路由单元RouteUnit对象。添加到RouteContext对象中;
3)返回RouteContext对象;
小结
限于篇幅,本篇先分享到这里。以下做一个小结:
1)在ShardingSpherePreparedStatement中真正执行SQL之前,要先创建路由上下文RouteContext对象;
该对象保存了SQL真正执行的数据库、逻辑表及真实表的映射信息;
2)对于非全路由的SQL操作,且没有在SQL的注释中指定数据源,则执行配置的路由器;
路由器包括ShardingSQLRouter(分片路由器)、SingleSQLRouter(单表路由器)、ReadwriteSplittingSQLRouter(读写分离路由器)、DatabaseDiscoverySQLRouter(数据库发现路由器)、ShadowSQLRouter(影子库路由器)。只需进行对应的配置,将自动创建对应的路由器。所有路由器以上面的顺序执行,第一个执行的会创建路由上下文对象,后执行的则合并路由上下文;
3)分片路由器ShardingSQLRouter如果有配置,会是第一个执行的路由器。在该路由器中,主要针对数据库的DML操作进行分片路由,创建路由上下文RouteContext对象;
3.1)DML操作主要分为两类,一种是Insert,另一种是带Where的。对于Insert语句,分片键的值在values部分,处理在InsertClauseShardingConditionEngine;后一种分片键的值在where部分,处理在WhereClauseShardingConditionEngine。通过分片条件引擎,解析分片键及值,创建ShardingCondition集合;
3.2)通过ShardingRouteEngineFactory.newInstance(),获取分片路由引擎。对于DML语句的普通分片规则,返回ShardingStandardRoutingEngine对象;
3.2.1)根据配置的分片键的分片策略,创建数据源、表的分片策略对象。如对应分片键配置的策略为standard,则创建StandardShardingStrategy对象。其中的StandardShardingAlgorithm算法对象为算法配置时指定的type所对应的对象。如type配置为CLASS_BASED,则对应的StandardShardingAlgorithm算法对象为ClassBasedShardingAlgorithm。而在ClassBasedShardingAlgorithm中的StandardShardingAlgorithm算法对象才是真正自定义的算法对象;
3.2.2)如果配置的是Hint策略,则从提示中获取指定的数据源或表(先从SQL的注释中获取,没有从HintManager中获取),创建新的ShardingCondition分片条件集合;
3.2.3)传入ShardingCondition分片条件,执行分片算法,获取实际执行的数据源及表,生成DataNode集合。先执行type对应的分片算法,在算法中执行自定义的分片算法(如果有配置);
3.2.4)遍历DataNode集合,创建路由映射(逻辑表和真实表映射、数据源映射)及路由单元,添加到新创建的路由上下文RouteContext中;
关于本篇内容你有什么自己的想法或独到见解,欢迎在评论区一起交流探讨下吧。
原文地址:https://blog.csdn.net/JingAi_jia917/article/details/143803962
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!