自学内容网 自学内容网

xxl-job--03--分片广播 & 动态分片

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


xxl-job通过分片广播模式

前言

xxl-job 是一个分布式任务调度平台,支持定时任务和分片任务。其中,分片任务可以将一个大任务拆分成多个小任务,分布式地执行,提高任务的执行效率和可靠性。分片任务中,有一种特殊的任务类型叫做分片广播任务,可以将一个任务广播到所有的执行器节点上执行,本质上是一种并行执行的方式。

1.定义

什么是分片广播:即xxl-job调度中心发出一次调度,所有相关节点全部执行一次
  • 采用传统轮询调度缺点:轮询调度只会调度某一台节点,也就是这100W数据都会冲击到1台节点执行,显然效率不合格。

采用分片广播调度优点

  • 优点1:发动所有节点同时执行,即100W的用户数据,均匀分配到所有节点,每个节点只需要执行部分数据即可,当然具体分片策略需要在代码逻辑中进行编写,调度中心做的只是让所有节点全部执行

  • 优点2:xxl-job会为每个注册节点分配一个index,这个index是xxl-job自行分配的,可以获取到,并且xxl-job也可以记录注册节点的总个数,总个数也可以获取到

2.API介绍

  • 在调度中心创建任务的时候,选择路由策略为:分片路由

在这里插入图片描述

如何获取分片信息

ShardingUtil.ShardingVO shardingVo = ShardingUtil.getShardingVo();
// 获取当前分片
int index = shardingVo.getIndex();
// 获取总分数量
int total = shardingVo.getTotal();

分片参数属性说明:

  • index:当前分片序号(从0开始),执行器集群列表中当前执行器的序号;
  • total:总分片数,执行器集群的总机器数量;

数据分片方案

  • 通过获取注册节点总个数totalCount 与 每个节点对应的index。 我们采用【用户id % totalCount = index】的方式,
  • 即余数=1,该条数据的重置就在对应index=1的节点上执行
用户id % totalCount = index

案例

示例1

xxl-job 分片广播任务的代码示例:

@XxlJob("broadcastJob")
public void broadcastJob() {
    int shardCount = 10; // 分片总数
    int shardIndex = XxlJobHelper.getShardIndex(); // 当前分片项

    // 执行任务逻辑
    for (int i = 0; i < 100; i++) {
        if (i % shardCount == shardIndex) {
            // 当前分片项需要执行的任务逻辑
            System.out.println("Shard " + shardIndex + " is running: " + i);
        }
    }
}

上述示例中,使用了 xxl-job 的注解 @XxlJob 标记了一个分片广播任务。任务的名称是broadcastJob,任务的执行逻辑在 broadcastJob 方法中实现。

  1. 首先获取了分片总数和当前分片项,然后根据分片参数执行具体的任务逻辑。任务逻辑是循环输出数字,并根据分片参数判断是否需要执行。
  2. 这里使用了 xxl-job 的工具类 XxlJobHelper 来获取分片参数。getShardIndex方法用于获取当前分片项,getShardTotal 方法用于获取分片总数。在任务执行时,xxl-job会自动传入分片参数,无需手动设置。

示例2

广播分片处理16个数据库,每个库有32 张表

@XxlJob("broadcastJob")
public void broadcastJob() {
    int shardCount = 24; // 分片总数
    int shardIndex = XxlJobHelper.getShardIndex(); // 当前分片项

    // 数据库列表
    String[] databases = {"db1", "db2", "db3", "db4", "db5", "db6", "db7", "db8", "db9", "db10", "db11", "db12", "db13", "db14", "db15", "db16"};

    // 处理每个数据库
    for (String database : databases) {
        // 表列表
        String[] tables = {"table1", "table2", "table3", "table4", "table5", "table6", "table7", "table8", "table9", "table10", "table11", "table12", "table13", "table14", "table15", "table16", "table17", "table18", "table19", "table20", "table21", "table22", "table23", "table24", "table25", "table26", "table27", "table28", "table29", "table30", "table31", "table32"};

        // 处理每张表
        for (String table : tables) {
            if ((shardIndex + table.hashCode()) % shardCount == shardIndex) {
                // 当前分片项需要处理的表
                System.out.println("Shard " + shardIndex + " is processing database " + database + ", table " + table);
                
                // 执行具体的任务逻辑,例如从数据库中读取数据并进行处理
                // ...
            }
        }
    }
}

示例中,使用了 xxl-job 的注解 @XxlJob 标记了一个分片广播任务。任务的名称是 broadcastJob,任务的执行逻辑在 broadcastJob 方法中实现。首先获取了分片总数和当前分片项,然后根据分片参数处理每个数据库中的每张表。

  • 在本例中,任务逻辑是输出需要处理的表的信息,并执行具体的任务逻辑,例如从数据库中读取数据并进行处理。这里使用了 hashCode方法将表名转换为整数,然后根据分片参数判断是否需要处理。这种方式可以保证每张表的处理任务分布均匀,不会因为表名的特殊性导致某些分片项的负载过大

3.总结

分片广播是 xxl-job 的一种任务类型,适用于一些需要并行执行的任务场景。在生产环境中,分片广播通常用于以下场景:

  • 数据处理任务:例如对大量数据进行清洗、分析、转换等操作,可以将任务拆分成多个小任务,分布式地执行,提高任务的执行效率和可靠性。
  • 分布式计算任务:例如对大规模数据进行机器学习、深度学习等计算,可以将计算任务拆分成多个小任务,分布式地执行,加速计算过程。
  • 并发请求任务:例如对多个服务进行并发请求,可以将请求拆分成多个小请求,分布式地执行,提高请求的并发处理能力。分片广播适用于需要将一个任务拆分成多个小任务,分布式地执行的场景,可以提高任务的执行效率和可靠性,同时降低单个节点的负载压力。

原文地址:https://blog.csdn.net/weixin_48052161/article/details/142634045

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!