自学内容网 自学内容网

【源码】Sharding-JDBC源码分析之SQL中分片键路由ShardingSQLRouter的原理

 Sharding-JDBC系列

1、Sharding-JDBC分库分表的基本使用

2、Sharding-JDBC分库分表之SpringBoot分片策略

3、Sharding-JDBC分库分表之SpringBoot主从配置

4、SpringBoot集成Sharding-JDBC-5.3.0分库分表

5、SpringBoot集成Sharding-JDBC-5.3.0实现按月动态建表分表

6、【源码】Sharding-JDBC源码分析之JDBC

7、【源码】Sharding-JDBC源码分析之SPI机制

8、【源码】Sharding-JDBC源码分析之Yaml分片配置文件解析原理

9、【源码】Sharding-JDBC源码分析之Yaml分片配置原理(一)

10、【源码】Sharding-JDBC源码分析之Yaml分片配置原理(二)

11、【源码】Sharding-JDBC源码分析之Yaml分片配置转换原理

12、【源码】Sharding-JDBC源码分析之ShardingSphereDataSource的创建原理

13、【源码】Sharding-JDBC源码分析之ContextManager创建中mode分片配置信息的持久化存储的原理

14、【源码】Sharding-JDBC源码分析之ContextManager创建中ShardingSphereDatabase的创建原理

15、【源码】Sharding-JDBC源码分析之分片规则生成器DatabaseRuleBuilder实现规则配置到规则对象的生成原理

16、【源码】Sharding-JDBC源码分析之配置数据库定义的表的元数据解析原理

17、【源码】Sharding-JDBC源码分析之ShardingSphereConnection的创建原理

18、【源码】Sharding-JDBC源码分析之ShardingSpherePreparedStatement的创建原理

19、【源码】Sharding-JDBC源码分析之Sql解析的原理

20、【源码】Sharding-JDBC源码分析之SQL路由及SingleSQLRouter单表路由

21、【源码】Sharding-JDBC源码分析之SQL中分片键路由ShardingSQLRouter的原理

前言

ShardingSphere透明的为Java应用程序提供了数据库分片功能,只需配置好分片规则,无需关心底层的数据库分片细节。ShardingSphere框架根据配置好的分片规则,自动路由到实际操作的数据库、表中。本文从源码的角度分析 SQL 路由中的分片路由器ShardingSQLRouter的实现原理。

ShardingSpherePreparedStatement回顾

【源码】Sharding-JDBC源码分析之SQL路由及SingleSQLRouter单表路由-CSDN博客中分析在执行SQL语句前,会进行SQL路由,创建RouteContext对象,在RouteContext路由上下文对象中,包含了SQL真正执行的数据源、逻辑表及真实表的映射。如果是DML操作,且配置了分片键,则ShardingSQLRouter分片路由器会第一个执行。

ShardingSQLRouter

如果没有通过提示语指定路由的数据源,则从SPI获取路由器之后,优先执行ShardingSQLRouter分片路由器,即执行路由器的createRouteContext()方法。在该类中,对应的decorateRouteContext()方法为空方法。

ShardingSQLRouter的源码如下:

package org.apache.shardingsphere.sharding.route.engine;

/**
 * 分片SQL路由器
 */
public final class ShardingSQLRouter implements SQLRouter<ShardingRule> {

    /**
     * 创建路由上下文
     * @param queryContext 查询上下文
     * @param database 数据库信息
     * @param rule 分片规则
     * @param props 配置的属性
     * @param connectionContext 连接上下文
     * @return
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public RouteContext createRouteContext(final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingRule rule,
                                           final ConfigurationProperties props, final ConnectionContext connectionContext) {
        // 获取对应数据库、对应查询类型的SQL语句对象
        SQLStatement sqlStatement = queryContext.getSqlStatementContext().getSqlStatement();
        // 解析,获取分片条件
        ShardingConditions shardingConditions = createShardingConditions(queryContext, database, rule);
        // 获取分片语句校验器,查询语句返回空
        Optional<ShardingStatementValidator> validator = ShardingStatementValidatorFactory.newInstance(sqlStatement, shardingConditions);
        // 校验
        validator.ifPresent(optional -> optional.preValidate(rule, queryContext.getSqlStatementContext(), queryContext.getParameters(), database, props));
        if (sqlStatement instanceof DMLStatement && shardingConditions.isNeedMerge()) {
            shardingConditions.merge();
        }
        // 创建路由引擎对象,执行路由,获取路由上下文
        RouteContext result = ShardingRouteEngineFactory.newInstance(rule, database, queryContext, shardingConditions, props, connectionContext).route(rule);
        // 校验
        validator.ifPresent(optional -> optional.postValidate(rule, queryContext.getSqlStatementContext(), queryContext.getHintValueContext(), queryContext.getParameters(), database, props, result));
        return result;
    }

    /**
     * 创建分片条件对象
     * @param queryContext 查询上下文
     * @param database 数据库信息
     * @param rule 分片规则
     * @return
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private ShardingConditions createShardingConditions(final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingRule rule) {
        List<ShardingCondition> shardingConditions;
        // 如果SQL操作是DML || 游标
        if (queryContext.getSqlStatementContext().getSqlStatement() instanceof DMLStatement || queryContext.getSqlStatementContext() instanceof CursorAvailable) {
            // 创建分片条件引擎的新实例。通过SPI获取,默认返回DefaultShardingConditionEngine
            ShardingConditionEngine shardingConditionEngine = ShardingConditionEngineFactory.createShardingConditionEngine(database, rule);
            // 通过引擎创建分片条件
            shardingConditions = shardingConditionEngine.createShardingConditions(queryContext.getSqlStatementContext(), queryContext.getParameters());
        } else {
            shardingConditions = Collections.emptyList();
        }
        // 创建分片条件集合对象
        return new ShardingConditions(shardingConditions, queryContext.getSqlStatementContext(), rule);
    }
    
    @Override
    public void decorateRouteContext(final RouteContext routeContext, final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingRule rule,
                                     final ConfigurationProperties props, final ConnectionContext connectionContext) {
        // TODO
    }
    
    @Override
    public int getOrder() {
        return ShardingOrder.ORDER;
    }
    
    @Override
    public Class<ShardingRule> getTypeClass() {
        return ShardingRule.class;
    }
}

3.1 createRouteContext()

在createRouteContext()方法中,主要执行如下:

1)获取SQL语句对象;

2)调用createShardingConditions()方法,解析,获取分片条件集合对象ShardingConditions;

2.1)ShardingConditions包含ShardingCondition集合、sql语句对象及分片规则对象;

2.2)每个分片键对应一个ShardingCondition;

2.3)ShardingCondition中存放分片键的信息,包含分片键的下标、列名、表名、分片的值;

2.4)分片的值主要为两种类型,一种是固定值,用于表示 =、in;另一种为区间值,用于表示 >、< 等区间条件;

3)安全性校验;

4)对于DML语句,如果需要条件合并,则进行条件合并;

5)调用ShardingRouteEngineFactory.newInstance(),创建分片路由引擎对象,执行route()路由方法,获取路由上下文;

5.1)通过ShardingRouteEngineFactory.newInstance(),获取分片路由引擎。对于DML语句的普通分片规则,返回ShardingStandardRoutingEngine对象;

5.2)在ShardingStandardRoutingEngine的route()方法中,根据配置的规则,创建对应的分片策略,执行对应doSharding()方法,传入参数值,获取真正执行的数据库、表信息DataNode;

5.3)根据DataNode,创建路由单元,保存到新创建的RouteContext对象中;

6)安全性校验;

7)返回创建的路由上下文;

3.2 createShardingConditions()

在createShardingConditions()方法中,执行如下:

1)如果SQL操作是DML语句 || 游标类型的语句,则获取分片条件引擎对象,默认为DefaultShardingConditionEngine,执行DefaultShardingConditionEngine的createShardingConditions()方法,创建ShardingCondition集合;

1.1)SQL语句中涉及分片键的有两种类型,一种为插入语句,另一种带where的语句。插入语句的分片键在插入的值中,其他类型的语句分片键在where条件中;

1.2)如果是插入语句,则通过InsertClauseShardingConditionEngine创建ShardingCondition集合;

1.3)其他类型语句,则通过WhereClauseShardingConditionEngine创建ShardingCondition集合;

2)否则不支持分片规则,ShardingCondition集合为空;

3)创建一个ShardingConditions对象,传入上面获取的ShardingCondition集合;

WhereClauseShardingConditionEngine

WhereClauseShardingConditionEngine为从 where 条件中解析出ShardingCondition分片条件的引擎。在该类中,通过解析SQL语句中的where条件部分,结合配置的分片规则,从中找出符合条件的分片键,以及分片键对应的值。

WhereClauseShardingConditionEngine的源码如下:

package org.apache.shardingsphere.sharding.route.engine.condition.engine.impl;

/**
 * where分片条件引擎
 */
@RequiredArgsConstructor
public final class WhereClauseShardingConditionEngine {

    // 分片规则
    private final ShardingRule shardingRule;

    // 数据库信息
    private final ShardingSphereDatabase database;
    
    /**
     * 创建分片条件,每个分片键对应一个ShardingCondition
     * @param sqlStatementContext
     * @param params sql语句中的参数值
     * @return
     */
    public List<ShardingCondition> createShardingConditions(final SQLStatementContext<?> sqlStatementContext, final List<Object> params) {
        if (!(sqlStatementContext instanceof WhereAvailable)) {
            return Collections.emptyList();
        }
        // 获取where中用到的列,包括子查询
        Collection<ColumnSegment> columnSegments = ((WhereAvailable) sqlStatementContext).getColumnSegments();
        // 获取默认的schema的名称。默认为databaseName,即logic_db
        String defaultSchemaName = DatabaseTypeEngine.getDefaultSchemaName(sqlStatementContext.getDatabaseType(), database.getName());
        // 获取schema
        ShardingSphereSchema schema = sqlStatementContext.getTablesContext().getSchemaName()
                .map(database::getSchema).orElseGet(() -> database.getSchema(defaultSchemaName));
        // key:列名;value为表名
        Map<String, String> columnExpressionTableNames = sqlStatementContext.getTablesContext().findTableNamesByColumnSegment(columnSegments, schema);
        List<ShardingCondition> result = new ArrayList<>();
        for (WhereSegment each : ((WhereAvailable) sqlStatementContext).getWhereSegments()) {
            // each.getExpr()返回where条件中的二元运算等表达式,如BinaryOperationExpression
            // createShardingConditions():解析条件表达式中的分片键ShardingCondition对象
            result.addAll(createShardingConditions(each.getExpr(), params, columnExpressionTableNames));
        }
        return result;
    }

    /**
     * 从where表达式中创建分片条件对象。
     * 解析条件表达式中的分片键,
     * 1)如果是一个or条件,前后都有分片键,则创建两个ShardingCondition;
     * 2)如果是 and 条件,最多创建一个 ShardingCondition;
     * @param expression where条件中的某个表达式
     * @param params 参数值
     * @param columnExpressionTableNames
     * @return
     */
    private Collection<ShardingCondition> createShardingConditions(final ExpressionSegment expression, final List<Object> params, final Map<String, String> columnExpressionTableNames) {
        // 获取And的谓词。
        // 小于等于1个and,创建一个AndPredicate。一个or创建两个AndPredicate。多个and创建一个AndPredicate,其中的predicates包含多个
        Collection<AndPredicate> andPredicates = ExpressionExtractUtil.getAndPredicates(expression);
        Collection<ShardingCondition> result = new LinkedList<>();
        for (AndPredicate each : andPredicates) {
            // 从谓词中创建分片条件值Map。key为对应的列,value为对应列的分片值对象ShardingConditionValue
            Map<Column, Collection<ShardingConditionValue>> shardingConditionValues = createShardingConditionValueMap(each.getPredicates(), params, columnExpressionTableNames);
            // 如果没有分片键,则返回空
            if (shardingConditionValues.isEmpty()) {
                return Collections.emptyList();
            }
            // 创建分片条件。一个And谓词一个ShardingCondition对象
            ShardingCondition shardingCondition = createShardingCondition(shardingConditionValues);
            // TODO remove startIndex when federation has perfect support for subquery
            shardingCondition.setStartIndex(expression.getStartIndex());
            result.add(shardingCondition);
        }
        return result;
    }

    /**
     * 从谓词中创建分片条件值Map。key为对应的列,value为对应列的分片值对象ShardingConditionValue
     * @param predicates 条件中的谓词,如BinaryOperationExpression等
     * @param params 传入的参数
     * @param columnTableNames 列及对应表的Map
     * @return
     */
    private Map<Column, Collection<ShardingConditionValue>> createShardingConditionValueMap(final Collection<ExpressionSegment> predicates,
                                                                                            final List<Object> params, final Map<String, String> columnTableNames) {
        Map<Column, Collection<ShardingConditionValue>> result = new HashMap<>(predicates.size(), 1);
        // 遍历谓词的表达式
        for (ExpressionSegment each : predicates) {
            // 从谓词表达式部分中提取column部分
            for (ColumnSegment columnSegment : ColumnExtractor.extract(each)) {
                // 获取列对应的表
                Optional<String> tableName = Optional.ofNullable(columnTableNames.get(columnSegment.getExpression()));
                // 从列对应的表中查找配置的分片策略,如果配置的分片策略使用了对应columnName,则返回columnName;否则返回空
                Optional<String> shardingColumn = tableName.flatMap(optional -> shardingRule.findShardingColumn(columnSegment.getIdentifier().getValue(), optional));
                // 如果对应列没有配置分片策略,则跳过
                if (!tableName.isPresent() || !shardingColumn.isPresent()) {
                    continue;
                }
                Column column = new Column(shardingColumn.get(), tableName.get());
                // 创建分片条件值,不同的表达式,场景不同的值。如 =、in,创建ListShardingConditionValue;>、< 等表达式,创建RangeShardingConditionValue
                Optional<ShardingConditionValue> shardingConditionValue = ConditionValueGeneratorFactory.generate(each, column, params);
                if (!shardingConditionValue.isPresent()) {
                    continue;
                }
                result.computeIfAbsent(column, unused -> new LinkedList<>()).add(shardingConditionValue.get());
            }
        }
        return result;
    }

    /**
     * 创建分片条件对象
     * @param shardingConditionValues
     * @return
     */
    private ShardingCondition createShardingCondition(final Map<Column, Collection<ShardingConditionValue>> shardingConditionValues) {
        ShardingCondition result = new ShardingCondition();
        for (Entry<Column, Collection<ShardingConditionValue>> entry : shardingConditionValues.entrySet()) {
            try {
                // 同一个列的多个分片条件值合并为一个
                ShardingConditionValue shardingConditionValue = mergeShardingConditionValues(entry.getKey(), entry.getValue());
                if (shardingConditionValue instanceof AlwaysFalseShardingConditionValue) {
                    return new AlwaysFalseShardingCondition();
                }
                result.getValues().add(shardingConditionValue);
            } catch (final ClassCastException ex) {
                throw new ShardingValueDataTypeException(entry.getKey());
            }
        }
        return result;
    }

    /**
     * 同一个谓词中的同一个列的多个分片条件值合并为一个
     * @param column
     * @param shardingConditionValues
     * @return
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private ShardingConditionValue mergeShardingConditionValues(final Column column, final Collection<ShardingConditionValue> shardingConditionValues) {
        Collection<Comparable<?>> listValue = null;
        Range<Comparable<?>> rangeValue = null;
        Set<Integer> parameterMarkerIndexes = new HashSet<>();
        for (ShardingConditionValue each : shardingConditionValues) {
            // 添加下标
            parameterMarkerIndexes.addAll(each.getParameterMarkerIndexes());
            // =、in 的处理
            if (each instanceof ListShardingConditionValue) {
                // 获取两个Collection集合的交集
                // 对于确定的值,如age = 15 and age in (15, 20, 21),则最终只能是查询出age = 15的记录,
                // 所以此处要进行集合的交集运算[15]和[15, 20, 21],结果为15
                listValue = mergeListShardingValues(((ListShardingConditionValue) each).getValues(), listValue);
                // 如果没有交集,则返回false
                if (listValue.isEmpty()) {
                    return new AlwaysFalseShardingConditionValue();
                }
            } else if (each instanceof RangeShardingConditionValue) { // 区间值的处理
                try {
                    // 值合并
                    rangeValue = mergeRangeShardingValues(((RangeShardingConditionValue) each).getValueRange(), rangeValue);
                } catch (final IllegalArgumentException ex) {
                    return new AlwaysFalseShardingConditionValue();
                }
            }
        }
        if (null == listValue) {
            return new RangeShardingConditionValue<>(column.getName(), column.getTableName(), rangeValue, new ArrayList<>(parameterMarkerIndexes));
        }
        if (null == rangeValue) {
            return new ListShardingConditionValue<>(column.getName(), column.getTableName(), listValue, new ArrayList<>(parameterMarkerIndexes));
        }
        listValue = mergeListAndRangeShardingValues(listValue, rangeValue);
        return listValue.isEmpty() ? new AlwaysFalseShardingConditionValue()
                : new ListShardingConditionValue<>(column.getName(), column.getTableName(), listValue, new ArrayList<>(parameterMarkerIndexes));
    }

    /**
     * 保留value1和value2交集的值
     * @param value1
     * @param value2
     * @return
     */
    private Collection<Comparable<?>> mergeListShardingValues(final Collection<Comparable<?>> value1, final Collection<Comparable<?>> value2) {
        if (null == value2) {
            return value1;
        }
        // 保留value1和value2交集的值
        value1.retainAll(value2);
        return value1;
    }

    /**
     * 整合区间值
     * @param value1
     * @param value2
     * @return
     */
    private Range<Comparable<?>> mergeRangeShardingValues(final Range<Comparable<?>> value1, final Range<Comparable<?>> value2) {
        return null == value2 ? value1 : SafeNumberOperationUtil.safeIntersection(value1, value2);
    }
    
    private Collection<Comparable<?>> mergeListAndRangeShardingValues(final Collection<Comparable<?>> listValue, final Range<Comparable<?>> rangeValue) {
        Collection<Comparable<?>> result = new LinkedList<>();
        for (Comparable<?> each : listValue) {
            if (SafeNumberOperationUtil.safeContains(rangeValue, each)) {
                result.add(each);
            }
        }
        return result;
    }
}

在createShardingConditions()方法中,主要执行如下:

1)获取where中用到的列,包括子查询部分;

2)获取默认的schema的名称。默认为databaseName,即logic_db;

3)获取列信息,为Map对象,key:列名;value为表名;

4)遍历查询语句的where部分,查询where部分中的分片键,创建ShardingCondition对象;

4.1)从where部分的表达式中获取AndPredicate谓词;

4.2)遍历AndPredicate谓词,每个谓词根据其中是否包含配置的分片键,以及值类型,创建ShardingCondition对象;

4.3)ShardingCondition对象中保存ShardingConditionValue集合。ShardingConditionValue包含了分片键、所属的表、值;

4.4)ShardingConditionValue主要包含两种类型:ListShardingConditionValue(确定的值,如 =、in 的条件)、RangeShardingConditionValue(区间值,如 > 、>= 等);

where部分解析的大体流程如下:

ShardingStandardRoutingEngine

在ShardingSQLRouter的createRouteContext()方法中,通过ShardingRouteEngineFactory.newInstance(),获取分片路由引擎。对于DML语句的普通分片规则,返回ShardingStandardRoutingEngine对象。

package org.apache.shardingsphere.sharding.route.engine.type.standard;

/**
 * 标准分片路由引擎
 */
public final class ShardingStandardRoutingEngine implements ShardingRouteEngine {

    // 逻辑表。有分片键的表或语句中的第一个表
    private final String logicTableName;

    // 分片键的分片条件信息对象
    private final ShardingConditions shardingConditions;

    // sql语句
    private final SQLStatementContext<?> sqlStatementContext;

    // 配置的props
    private final ConfigurationProperties props;

    // SQl 操作的涉及的分片数据节点集合
    private final Collection<Collection<DataNode>> originalDataNodes = new LinkedList<>();

    // SQL 语句中的提示提取器
    private final SQLHintExtractor sqlHintExtractor;
    
    public ShardingStandardRoutingEngine(final String logicTableName, final ShardingConditions shardingConditions, final SQLStatementContext<?> sqlStatementContext,
                                         final HintValueContext hintValueContext, final ConfigurationProperties props) {
        this.logicTableName = logicTableName;
        this.shardingConditions = shardingConditions;
        this.sqlStatementContext = sqlStatementContext;
        this.props = props;
        this.sqlHintExtractor = new SQLHintExtractor(sqlStatementContext.getSqlStatement(), hintValueContext);
    }

    /**
     * 创建路由上下文
     * @param shardingRule sharding rule
     * @return
     */
    @Override
    public RouteContext route(final ShardingRule shardingRule) {
        RouteContext result = new RouteContext();
        // 获取当前sql分片条件对应的操作的数据节点。(操作的数据库及表)
        Collection<DataNode> dataNodes = getDataNodes(shardingRule, shardingRule.getTableRule(logicTableName));
        result.getOriginalDataNodes().addAll(originalDataNodes);
        for (DataNode each : dataNodes) {
            result.getRouteUnits().add(
                    new RouteUnit(new RouteMapper(each.getDataSourceName(), each.getDataSourceName()), Collections.singleton(new RouteMapper(logicTableName, each.getTableName()))));
        }
        return result;
    }

    /**
     * 获取数据节点
     * @param shardingRule
     * @param tableRule
     * @return
     */
    private Collection<DataNode> getDataNodes(final ShardingRule shardingRule, final TableRule tableRule) {
        // 创建数据源分片算法对象
        ShardingStrategy databaseShardingStrategy = createShardingStrategy(shardingRule.getDatabaseShardingStrategyConfiguration(tableRule),
                shardingRule.getShardingAlgorithms(), shardingRule.getDefaultShardingColumn());
        // 创建表分片算法对象
        ShardingStrategy tableShardingStrategy = createShardingStrategy(shardingRule.getTableShardingStrategyConfiguration(tableRule),
                shardingRule.getShardingAlgorithms(), shardingRule.getDefaultShardingColumn());
        // 是否数据源和表规则都是Hint策略
        if (isRoutingByHint(shardingRule, tableRule)) {
            return routeByHint(tableRule, databaseShardingStrategy, tableShardingStrategy);
        }
        // 是否数据源和表规则都不是Hint策略
        if (isRoutingByShardingConditions(shardingRule, tableRule)) {
            return routeByShardingConditions(shardingRule, tableRule, databaseShardingStrategy, tableShardingStrategy);
        }
        // 混合策略
        return routeByMixedConditions(shardingRule, tableRule, databaseShardingStrategy, tableShardingStrategy);
    }

    /**
     * 是否为数据源和表规则都是Hint策略
     * @param shardingRule
     * @param tableRule
     * @return
     */
    private boolean isRoutingByHint(final ShardingRule shardingRule, final TableRule tableRule) {
        return shardingRule.getDatabaseShardingStrategyConfiguration(tableRule) instanceof HintShardingStrategyConfiguration
                && shardingRule.getTableShardingStrategyConfiguration(tableRule) instanceof HintShardingStrategyConfiguration;
    }

    /**
     * 判断sql的hint中是否包含了sql语句中的表
     * @return
     */
    private boolean isRoutingBySQLHint() {
        Collection<String> tableNames = sqlStatementContext.getTablesContext().getTableNames();
        for (String each : tableNames) {
            if (sqlHintExtractor.containsHintShardingValue(each)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Hint的路由
     * @param tableRule
     * @param databaseShardingStrategy
     * @param tableShardingStrategy
     * @return
     */
    private Collection<DataNode> routeByHint(final TableRule tableRule, final ShardingStrategy databaseShardingStrategy, final ShardingStrategy tableShardingStrategy) {
        return route0(tableRule, databaseShardingStrategy, getDatabaseShardingValuesFromHint(), tableShardingStrategy, getTableShardingValuesFromHint());
    }

    /**
     * 是否数据源和表规则都不是Hint策略
     * @param shardingRule
     * @param tableRule
     * @return
     */
    private boolean isRoutingByShardingConditions(final ShardingRule shardingRule, final TableRule tableRule) {
        return !(shardingRule.getDatabaseShardingStrategyConfiguration(tableRule) instanceof HintShardingStrategyConfiguration
                || shardingRule.getTableShardingStrategyConfiguration(tableRule) instanceof HintShardingStrategyConfiguration);
    }

    /**
     * 通过分片条件路由
     * @param shardingRule
     * @param tableRule
     * @param databaseShardingStrategy
     * @param tableShardingStrategy
     * @return
     */
    private Collection<DataNode> routeByShardingConditions(final ShardingRule shardingRule, final TableRule tableRule,
                                                           final ShardingStrategy databaseShardingStrategy, final ShardingStrategy tableShardingStrategy) {
        return shardingConditions.getConditions().isEmpty()
                ? route0(tableRule, databaseShardingStrategy, Collections.emptyList(), tableShardingStrategy, Collections.emptyList())
                : routeByShardingConditionsWithCondition(shardingRule, tableRule, databaseShardingStrategy, tableShardingStrategy);
    }

    /**
     * 通过分片条件作为条件路由
     * @param shardingRule
     * @param tableRule
     * @param databaseShardingStrategy
     * @param tableShardingStrategy
     * @return
     */
    private Collection<DataNode> routeByShardingConditionsWithCondition(final ShardingRule shardingRule, final TableRule tableRule,
                                                                        final ShardingStrategy databaseShardingStrategy, final ShardingStrategy tableShardingStrategy) {
        Collection<DataNode> result = new LinkedList<>();
        for (ShardingCondition each : shardingConditions.getConditions()) {
            Collection<DataNode> dataNodes = route0(tableRule,
                    databaseShardingStrategy, getShardingValuesFromShardingConditions(shardingRule, databaseShardingStrategy.getShardingColumns(), each),
                    tableShardingStrategy, getShardingValuesFromShardingConditions(shardingRule, tableShardingStrategy.getShardingColumns(), each));
            result.addAll(dataNodes);
            originalDataNodes.add(dataNodes);
        }
        return result;
    }

    /**
     * 混合条件的路由,即包含Hint、又包含其他的规则
     * @param shardingRule
     * @param tableRule
     * @param databaseShardingStrategy
     * @param tableShardingStrategy
     * @return
     */
    private Collection<DataNode> routeByMixedConditions(final ShardingRule shardingRule, final TableRule tableRule,
                                                        final ShardingStrategy databaseShardingStrategy, final ShardingStrategy tableShardingStrategy) {
        return shardingConditions.getConditions().isEmpty()
                ? routeByMixedConditionsWithHint(shardingRule, tableRule, databaseShardingStrategy, tableShardingStrategy)
                : routeByMixedConditionsWithCondition(shardingRule, tableRule, databaseShardingStrategy, tableShardingStrategy);
    }

    /**
     * 通过解析的分片条件进行混合条件的路由
     * @param shardingRule
     * @param tableRule
     * @param databaseShardingStrategy
     * @param tableShardingStrategy
     * @return
     */
    private Collection<DataNode> routeByMixedConditionsWithCondition(final ShardingRule shardingRule, final TableRule tableRule,
                                                                     final ShardingStrategy databaseShardingStrategy, final ShardingStrategy tableShardingStrategy) {
        Collection<DataNode> result = new LinkedList<>();
        for (ShardingCondition each : shardingConditions.getConditions()) {
            Collection<DataNode> dataNodes = route0(tableRule, databaseShardingStrategy,
                    getDatabaseShardingValues(shardingRule, databaseShardingStrategy, each), tableShardingStrategy, getTableShardingValues(shardingRule, tableShardingStrategy, each));
            result.addAll(dataNodes);
            originalDataNodes.add(dataNodes);
        }
        return result;
    }

    /**
     * 有包含hint的复合条件
     * @param shardingRule
     * @param tableRule
     * @param databaseShardingStrategy
     * @param tableShardingStrategy
     * @return
     */
    private Collection<DataNode> routeByMixedConditionsWithHint(final ShardingRule shardingRule, final TableRule tableRule,
                                                                final ShardingStrategy databaseShardingStrategy, final ShardingStrategy tableShardingStrategy) {
        // 其中table规则是Hint
        if (shardingRule.getDatabaseShardingStrategyConfiguration(tableRule) instanceof HintShardingStrategyConfiguration) {
            return route0(tableRule, databaseShardingStrategy, getDatabaseShardingValuesFromHint(), tableShardingStrategy, Collections.emptyList());
        }
        // database规则为Hint
        return route0(tableRule, databaseShardingStrategy, Collections.emptyList(), tableShardingStrategy, getTableShardingValuesFromHint());
    }

    /**
     * 获取数据库的分片条件值
     * @param shardingRule
     * @param databaseShardingStrategy
     * @param shardingCondition
     * @return
     */
    private List<ShardingConditionValue> getDatabaseShardingValues(final ShardingRule shardingRule, final ShardingStrategy databaseShardingStrategy, final ShardingCondition shardingCondition) {
        return isGettingShardingValuesFromHint(databaseShardingStrategy)
                // 如果是Hint类型的策略,从Hint中获取
                ? getDatabaseShardingValuesFromHint()
                // 如果不是Hint类型的策略,从分片条件对象中获取分片值
                : getShardingValuesFromShardingConditions(shardingRule, databaseShardingStrategy.getShardingColumns(), shardingCondition);
    }

    /**
     * 获取分片表的值
     * @param shardingRule
     * @param tableShardingStrategy
     * @param shardingCondition
     * @return
     */
    private List<ShardingConditionValue> getTableShardingValues(final ShardingRule shardingRule, final ShardingStrategy tableShardingStrategy, final ShardingCondition shardingCondition) {
        return isGettingShardingValuesFromHint(tableShardingStrategy)
                ? getTableShardingValuesFromHint()
                : getShardingValuesFromShardingConditions(shardingRule, tableShardingStrategy.getShardingColumns(), shardingCondition);
    }

    /**
     * 判断策略是否为Hint类型的策略
     * @param shardingStrategy
     * @return
     */
    private boolean isGettingShardingValuesFromHint(final ShardingStrategy shardingStrategy) {
        return shardingStrategy instanceof HintShardingStrategy;
    }

    /**
     * 从Hint中获取数据库的分片条件值
     * @return
     */
    private List<ShardingConditionValue> getDatabaseShardingValuesFromHint() {
        if (isRoutingBySQLHint()) {
            return getDatabaseShardingValuesFromSQLHint();
        }
        return getShardingConditions(HintManager.isDatabaseShardingOnly() ? HintManager.getDatabaseShardingValues() : HintManager.getDatabaseShardingValues(logicTableName));
    }

    /**
     * 从SQL语句中的Hint信息中获取数据库的分片条件值
     * @return
     */
    private List<ShardingConditionValue> getDatabaseShardingValuesFromSQLHint() {
        Collection<Comparable<?>> shardingValues = new LinkedList<>();
        Collection<String> tableNames = sqlStatementContext.getTablesContext().getTableNames();
        for (String each : tableNames) {
            // 如果是sql语句中的第一个表
            if (each.equals(logicTableName) && sqlHintExtractor.containsHintShardingDatabaseValue(each)) {
                // 创建分片条件值
                shardingValues.addAll(sqlHintExtractor.getHintShardingDatabaseValue(each));
            }
        }
        return getShardingConditions(shardingValues);
    }

    /**
     * 从Hint中获取分片表的条件值。
     */
    private List<ShardingConditionValue> getTableShardingValuesFromHint() {
        if (isRoutingBySQLHint()) {
            return getTableShardingValuesFromSQLHint();
        }
        return getShardingConditions(HintManager.getTableShardingValues(logicTableName));
    }

    /**
     * 从SQL的hint中获取分片表的值
     * @return
     */
    private List<ShardingConditionValue> getTableShardingValuesFromSQLHint() {
        Collection<Comparable<?>> shardingValues = new LinkedList<>();
        Collection<String> tableNames = sqlStatementContext.getTablesContext().getTableNames();
        for (String each : tableNames) {
            if (each.equals(logicTableName) && sqlHintExtractor.containsHintShardingTableValue(each)) {
                shardingValues.addAll(sqlHintExtractor.getHintShardingTableValue(each));
            }
        }
        return getShardingConditions(shardingValues);
    }

    /**
     * 创建分片条件值集合。如果传入的集合不为空,则创建ListShardingConditionValue对象,否则为空集合
     * @param shardingValue
     * @return
     */
    private List<ShardingConditionValue> getShardingConditions(final Collection<Comparable<?>> shardingValue) {
        return shardingValue.isEmpty() ? Collections.emptyList() : Collections.singletonList(new ListShardingConditionValue<>("", logicTableName, shardingValue));
    }

    /**
     * 从分片条件对象中获取分片值
     * @param shardingRule
     * @param shardingColumns
     * @param shardingCondition
     * @return
     */
    private List<ShardingConditionValue> getShardingValuesFromShardingConditions(final ShardingRule shardingRule, final Collection<String> shardingColumns, final ShardingCondition shardingCondition) {
        List<ShardingConditionValue> result = new ArrayList<>(shardingColumns.size());
        // 遍历分片条件
        for (ShardingConditionValue each : shardingCondition.getValues()) {
            // 查找绑定关联表
            Optional<BindingTableRule> bindingTableRule = shardingRule.findBindingTableRule(each.getTableName());
            // (等于逻辑表 || 有关联表 && 关联表是逻辑表) && 属于分片键,则将该分片条件值加入到result中
            if ((logicTableName.equals(each.getTableName()) || bindingTableRule.isPresent() && bindingTableRule.get().hasLogicTable(logicTableName))
                    && shardingColumns.contains(each.getColumnName())) {
                result.add(each);
            }
        }
        return result;
    }

    /**
     * 路由
     * @param tableRule 表规则
     * @param databaseShardingStrategy 配置的数据源分片策略
     * @param databaseShardingValues 数据源分片值
     * @param tableShardingStrategy 配置的表分片策略
     * @param tableShardingValues 表的分片值
     * @return
     */
    private Collection<DataNode> route0(final TableRule tableRule,
                                        final ShardingStrategy databaseShardingStrategy, final List<ShardingConditionValue> databaseShardingValues,
                                        final ShardingStrategy tableShardingStrategy, final List<ShardingConditionValue> tableShardingValues) {
        // 数据源路由
        Collection<String> routedDataSources = routeDataSources(tableRule, databaseShardingStrategy, databaseShardingValues);
        Collection<DataNode> result = new LinkedList<>();
        // 遍历数据源
        for (String each : routedDataSources) {
            // 表路由
            result.addAll(routeTables(tableRule, each, tableShardingStrategy, tableShardingValues));
        }
        return result;
    }

    /**
     * 数据源路由
     * @param tableRule
     * @param databaseShardingStrategy
     * @param databaseShardingValues
     * @return
     */
    private Collection<String> routeDataSources(final TableRule tableRule, final ShardingStrategy databaseShardingStrategy, final List<ShardingConditionValue> databaseShardingValues) {
        // 如果数据没有对应的分片值,即查询的列没有设置数据源分片,则返回实际数据源名称
        if (databaseShardingValues.isEmpty()) {
            return tableRule.getActualDataSourceNames();
        }
        // 执行分片算法的doSharding(),获取分片的数据源名称集合
        Collection<String> result = databaseShardingStrategy.doSharding(tableRule.getActualDataSourceNames(), databaseShardingValues, tableRule.getDataSourceDataNode(), props);
        Preconditions.checkState(!result.isEmpty(), "No database route info");
        Preconditions.checkState(tableRule.getActualDataSourceNames().containsAll(result),
                "Some routed data sources do not belong to configured data sources. routed data sources: `%s`, configured data sources: `%s`", result, tableRule.getActualDataSourceNames());
        return result;
    }

    /**
     * 表路由
     * @param tableRule 表规则
     * @param routedDataSource 路由的数据源
     * @param tableShardingStrategy 表分片策略
     * @param tableShardingValues 表分片值
     * @return
     */
    private Collection<DataNode> routeTables(final TableRule tableRule, final String routedDataSource,
                                             final ShardingStrategy tableShardingStrategy, final List<ShardingConditionValue> tableShardingValues) {
        // 获取目标数据源的实际表的集合
        Collection<String> availableTargetTables = tableRule.getActualTableNames(routedDataSource);
        Collection<String> routedTables = tableShardingValues.isEmpty()
                ? availableTargetTables
                // 执行表分片算法的doSharding()方法,获取分片表名
                : tableShardingStrategy.doSharding(availableTargetTables, tableShardingValues, tableRule.getTableDataNode(), props);
        Collection<DataNode> result = new LinkedList<>();
        for (String each : routedTables) {
            // 创建分片的DataNode对象
            result.add(new DataNode(routedDataSource, each));
        }
        return result;
    }

    /**
     * 根据配置的信息,创建分片策略对象
     * @param shardingStrategyConfig 分片策略配置,如算法名称、分片键等
     * @param shardingAlgorithms 分片算法。key为算法的名称,value为对应算法类型的类(如:ClassBasedShardingAlgorithm等)
     * @param defaultShardingColumn 默认的分片键
     * @return
     */
    private ShardingStrategy createShardingStrategy(final ShardingStrategyConfiguration shardingStrategyConfig, final Map<String, ShardingAlgorithm> shardingAlgorithms,
                                                    final String defaultShardingColumn) {
        return null == shardingStrategyConfig ? new NoneShardingStrategy()
                : ShardingStrategyFactory.newInstance(shardingStrategyConfig, shardingAlgorithms.get(shardingStrategyConfig.getShardingAlgorithmName()), defaultShardingColumn);
    }
}

在route()方法中,执行如下:

1)执行getDataNodes(),获取当前SQL的分片数据节点,即真正要执行的数据库及表的映射对象;

1.1)根据配置的分片键的分片策略,创建数据源、表的分片策略对象。如对应分片键配置的策略为standard,则创建StandardShardingStrategy对象。其中的StandardShardingAlgorithm算法对象为算法配置时指定的type所对应的对象。如type配置为CLASS_BASED,则对应的StandardShardingAlgorithm算法对象为ClassBasedShardingAlgorithm。而在ClassBasedShardingAlgorithm中的StandardShardingAlgorithm算法对象才是真正自定义的算法对象;

1.2)根据配置的分片键的分片策略,判断数据源、表的策略条件,分为两种:Hint、非Hint。结合数据源、表,总共有三种场景:数据源、表都为Hint;都不为Hint;其中一种为Hint。针对上面的三种情况,分别判断处理,最后针对Hint和非Hint,获取分片的数据源和表;

1.2.1)如果是Hint的,则从提示中获取指定的数据源或表,创建新的ShardingCondition分片条件集合。逻辑为:

a)先从SQL语句中的注释信息中获取指定的数据源或表;

b)如果没有找到,则从HintManager中获取动态指定的数据源或表;

1.2.2)如果不是Hint,则使用WhereClauseShardingConditionEngine中解析where条件获取的ShardingCondition分片条件集合;

1.3)执行route0()方法,创建分片的数据节点;

1.3.1)执行routeDataSources(),执行数据源的分片算法(如ClassBasedShardingAlgorithm的doSharding()方法,在该方法中,执行自定义的分片算法的doSharding()方法),获取数据源字符串集合;

详见:【源码】Sharding-JDBC源码分析之分片规则生成器DatabaseRuleBuilder实现规则配置到规则对象的生成原理-中的ClassBasedShardingAlgorithm部分

1.3.2)遍历数据源字符串集合,执行routeTables(),执行表的分片算法,获取表字符串集合。同数据源字符串映射,生成DataNode对象。该对象记录分片的真实数据源、表的映射;

2)遍历DataNode集合对象,创建路由映射RouteMapper对象(逻辑表和真实表映射、数据源映射),创建路由单元RouteUnit对象。添加到RouteContext对象中;

3)返回RouteContext对象;

小结

限于篇幅,本篇先分享到这里。以下做一个小结:

1)在ShardingSpherePreparedStatement中真正执行SQL之前,要先创建路由上下文RouteContext对象;

该对象保存了SQL真正执行的数据库、逻辑表及真实表的映射信息;

2)对于非全路由的SQL操作,且没有在SQL的注释中指定数据源,则执行配置的路由器;

路由器包括ShardingSQLRouter(分片路由器)、SingleSQLRouter(单表路由器)、ReadwriteSplittingSQLRouter(读写分离路由器)、DatabaseDiscoverySQLRouter(数据库发现路由器)、ShadowSQLRouter(影子库路由器)。只需进行对应的配置,将自动创建对应的路由器。所有路由器以上面的顺序执行,第一个执行的会创建路由上下文对象,后执行的则合并路由上下文;

3)分片路由器ShardingSQLRouter如果有配置,会是第一个执行的路由器。在该路由器中,主要针对数据库的DML操作进行分片路由,创建路由上下文RouteContext对象;

3.1)DML操作主要分为两类,一种是Insert,另一种是带Where的。对于Insert语句,分片键的值在values部分,处理在InsertClauseShardingConditionEngine;后一种分片键的值在where部分,处理在WhereClauseShardingConditionEngine。通过分片条件引擎,解析分片键及值,创建ShardingCondition集合;

3.2)通过ShardingRouteEngineFactory.newInstance(),获取分片路由引擎。对于DML语句的普通分片规则,返回ShardingStandardRoutingEngine对象;

3.2.1)根据配置的分片键的分片策略,创建数据源、表的分片策略对象。如对应分片键配置的策略为standard,则创建StandardShardingStrategy对象。其中的StandardShardingAlgorithm算法对象为算法配置时指定的type所对应的对象。如type配置为CLASS_BASED,则对应的StandardShardingAlgorithm算法对象为ClassBasedShardingAlgorithm。而在ClassBasedShardingAlgorithm中的StandardShardingAlgorithm算法对象才是真正自定义的算法对象;

3.2.2)如果配置的是Hint策略,则从提示中获取指定的数据源或表(先从SQL的注释中获取,没有从HintManager中获取),创建新的ShardingCondition分片条件集合;

3.2.3)传入ShardingCondition分片条件,执行分片算法,获取实际执行的数据源及表,生成DataNode集合。先执行type对应的分片算法,在算法中执行自定义的分片算法(如果有配置);

3.2.4)遍历DataNode集合,创建路由映射(逻辑表和真实表映射、数据源映射)及路由单元,添加到新创建的路由上下文RouteContext中;

关于本篇内容你有什么自己的想法或独到见解,欢迎在评论区一起交流探讨下吧。


原文地址:https://blog.csdn.net/JingAi_jia917/article/details/143803962

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!