自学内容网 自学内容网

分布式事务seata基于docker安装和项目集成seata

目录

本地事务

根据隔离性的等级会导致不同的问题

有四种隔离等级

分布式事务 

 现在有一个场景:

结果:

CAP定理 

矛盾

 总结:

es集群使用的是cp:

BASE理论 

解决分布式事务的思路 

Seata

Seata的架构

docker安装seata

1.拉取seata镜像

2.可以把seata镜像打包成jar包,方便下次使用

3.运行seata镜像

4.修改seata的配置文件

5.在nacos中添加seataServer.properties配置文件

6.在mysql创建seata数据库

​编辑

7.重启seata

 项目集成seata

1.导入依赖

2.在application.yml配置seata信息

3.直接在订单微服务使用GlobalTransactional注解开启全局事务


本地事务

本地事务,也就是传统的单机事务。在传统数据库事务中,必须要满足四个原则:

  • 原子性:事务中的所有操作必须同时成功或者同时失败
  • 一致性:数据总量前后必须保持一致性
  • 持久性:对数据库的所有修改必须要永久保存
  • 隔离性:对同一资源操作的事务不可以同时发生

根据隔离性的等级会导致不同的问题

  • 1.脏读:一个事务读取到了其他事务还没有提交的数据
  • 2.不可重复读:一个事务多次读取的数据不同,这是因为这个事务还在执行的时候,其他事务已经update数据(已经commit)
  • 3.幻读:一个事务读取到其他事务插入或者删除的数据(别的事务commit后),导致前后读取的记录行数不同

有四种隔离等级

  • 1.读未提交:会有所有问题
  • 2.读未提交:没有脏读的问题(大多数数据库默认)
  • 3.可重复读:只有幻读的问题(mysql默认)
  • 4.串行化(单线程):没有问题,但是效率低 

分布式事务 

分布式事务,就是指不是在单个服务或单个数据库架构下,产生的事务,例如:

  • 跨数据源的分布式事务
  • 跨服务的分布式事务
  • 综合情况

在数据库水平拆分、服务垂直拆分之后,一个业务操作通常要跨多个数据库、服务才能完成。例如电商行业中比较常见的下单付款案例,包括下面几个行为:

  • 创建新订单
  • 扣减商品库存
  • 从用户账户余额扣除金额

完成上面的操作需要访问三个不同的微服务和三个不同的数据库。

订单的创建、库存的扣减、账户扣款在每一个服务和数据库内是一个本地事务,可以保证ACID原则。

但是当我们把三件事情看做一个"业务",要满足保证“业务”的原子性,要么所有操作全部成功,要么全部失败,不允许出现部分成功部分失败的现象,这就是分布式系统下的事务了。

此时ACID难以满足,这是分布式事务要解决的问题

 现在有一个场景:

在订单服务创建订单时,会给账户服务删除账户金额,并给库存服务删除库存

现在库存只有10个,但是用户会买20个,看一下在分布式下ACID是否还能生效

订单创建,使用@Transactional注解添加事务

@Override
    @Transactional
    public Long create(Order order) {
        // 创建订单
        orderMapper.insert(order);
        try {
            // 扣用户余额
            accountClient.deduct(order.getUserId(), order.getMoney());
            // 扣库存
            storageClient.deduct(order.getCommodityCode(), order.getCount());

        } catch (FeignException e) {
            log.error("下单失败,原因:{}", e.contentUTF8(), e);
            throw new RuntimeException(e.contentUTF8(), e);
        }
        return order.getId();
    }

减少金额

    @Override
    @Transactional
    public void deduct(String userId, int money) {
        log.info("开始扣款");
        try {
            accountMapper.deduct(userId, money);
        } catch (Exception e) {
            throw new RuntimeException("扣款失败,可能是余额不足!", e);
        }
        log.info("扣款成功");
    }

减少库存

@Transactional
    @Override
    public void deduct(String commodityCode, int count) {
        log.info("开始扣减库存");
        try {
            storageMapper.deduct(commodityCode, count);
        } catch (Exception e) {
            throw new RuntimeException("扣减库存失败,可能是库存不足!", e);
        }
        log.info("扣减库存成功");
    }

发送请求

http://localhost:8082/order?userId=user202103032042012&commodityCode=100202003032041&count=20&money=200

结果:

库存微服务中存在事务,由于库存不足,数据会自动回滚,库存数量不变

订单微服务中也没有新建订单,因为调用库存微服务的接口保存,事务也会回滚

 但是账户微服务就会出现问题,因为减少金额的过程中没有出现任务问题,就会让金额减少

原本1000元,这就是分布式中事务的问题,导致数据的不一致性 

测试发现,当库存不足时,如果余额已经扣减,并不会回滚,出现了分布式事务问题。 

CAP定理 

分布式中三个指标,只能三选二,并且一定要选分区容错性: 

  • Consistency(一致性)
  • Availability(可用性)
  • Partition tolerance (分区容错性)

Consistency(一致性):用户访问分布式系统(集群)中的任意节点,得到的数据必须一致。

 Availability (可用性):用户访问集群中的任意健康节点,必须能得到响应,而不是超时或拒绝。

Partition(分区):因为网络故障或其它原因导致分布式系统中的部分节点与其它节点失去连接,形成独立分区。

Tolerance(容错):在集群出现分区时,整个系统也要持续对外提供服务 

网络出现问题,导致集群分成不同的区域,在分布式事务中,我们不会因为网络问题的原因就让集群中只会有一个区域,分区之后就会一致性和可用性必须二选一

矛盾

在分布式系统中,系统间的网络不能100%保证健康,一定会有故障的时候,而服务有必须对外保证服务。因此Partition Tolerance不可避免。

当节点接收到新的数据变更时,就会出现问题了:

如果此时要保证一致性就必须等待网络恢复,完成数据同步后(node3断开连接,无法同步数据),node3服务处于阻塞状态,不可用。

有两种情况

  • 1.不同节点存的数据是不同的(总的数据被分片到不同的节点,如es),node3节点失联时,node3节点的数据会被重新分配到其他的节点中,保证数据的一致性(总的数据一致性)
  • 2.不同节点存的数据是相同的,node3节点失联时,node3服务不可用,保证在node1和node2的数据一致性,因为(node3与其他节点无法同步数据)

如果此时要保证可用性,就不能等待网络恢复,那node01、node02与node03之间就会出现数据不一致。

也就是说,在P一定会出现的情况下,A和C之间只能实现一个。

 总结:

分布式系统节点通过网络连接,一定会出现分区问题(P)

当分区出现时,系统的一致性和可用性就无法同时满足

cp-->不同节点的角色不同

ap-->不同节点的角色相同

es集群使用的是cp:

es集群出现分区时,故障节点会被剔除出集群,数据分片会被重新分配到其他节点中,保证数据的一致性。因此是低可用性,高一致性,属于CP

eureka集群-->ap  zookeeper-->cp nacos集群-->默认ap可改成cp

BASE理论 

BASE理论是对AP的一种解决思路,包含三个思想:

  • Basically Available (基本可用):分布式系统在出现故障时,允许损失部分可用性,即保证核心可用。
  • **Soft State(软状态):**在一定时间内,允许出现中间状态,比如临时的不一致状态。
  • Eventually Consistent(最终一致性):虽然无法保证强一致性,但是在软状态结束后,最终达到数据一致。

解决分布式事务的思路 

分布式事务最大的问题是各个子事务的一致性问题,因此可以借鉴CAP定理和BASE理论,有两种解决思路:

  • AP模式:各子事务分别执行和提交,允许出现结果不一致,然后采用弥补措施恢复数据即可,实现最终一致。

  • CP模式:各个子事务执行后互相等待,同时提交,同时回滚,达成强一致。但事务等待过程中,处于弱可用状态。

但不管是哪一种模式,都需要在子系统事务之间互相通讯,协调事务状态,也就是需要一个事务协调者(TC)

 这里的子系统事务,称为分支事务;有关联的各个分支事务在一起称为全局事务。

全局事务:整个分布式事务

分支事务:分布式事务中包含的每个子系统的事务

最终一致思想:各分支事务先分别执行并提交,如果有不一致的情况,再想办法恢复数据

强一致思想:各分支事务执行完各自的业务先不提交,等待彼此最后的结果。最后决定是统一提交还是回滚 

Seata

Seata是 2019 年 1 月份蚂蚁金服和阿里巴巴共同开源的分布式事务解决方案。致力于提供高性能和简单易用的分布式事务服务,为用户打造一站式的分布式解决方案。

官网地址:​​​​​​http://seata.io/,其中的文档、播客中提供了大量的使用说明、源码分析。

Seata的架构

Seata事务管理中有三个重要的角色:

  • TC (Transaction Coordinator) - **事务协调者:-->需要单独安装部署**维护全局和分支事务的状态,协调全局事务提交或回滚

  • TM (Transaction Manager) - **事务管理器:**定义全局事务的范围、开始全局事务、提交或回滚全局事务。

  • RM (Resource Manager) - **资源管理器:**管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

就比如再订单微服务里使用一个全局事务的注解,开启全局事务,然后在订单方法里的账号微服务,库存微服务,加上订单微服务自己,一共有三个分支事务

 

Seata基于上述架构提供了四种不同的分布式事务解决方案:

  • XA模式:强一致性分阶段事务模式,牺牲了一定的可用性,无业务侵入
  • TCC模式:最终一致的分阶段事务模式,有业务侵入
  • AT模式:最终一致的分阶段事务模式,无业务侵入,也是Seata的默认模式
  • SAGA模式:长事务模式,有业务侵入

无论哪种方案,都离不开TC,也就是事务的协调者。

docker安装seata

1.拉取seata镜像

docker pull seataio/seata-server:1.4.2

 拉取时遇到

Error response from daemon: Get "https://registry-1.docker.io/v2/": dial tcp 74.86.118.24:443: i/o timeout问题

 vim /etc/docker/daemon.json 

添加以下配置

{
  "registry-mirrors": ["https://docker.registry.cyou",
"https://docker-cf.registry.cyou",
"https://dockercf.jsdelivr.fyi",
"https://docker.jsdelivr.fyi",
"https://dockertest.jsdelivr.fyi",
"https://mirror.aliyuncs.com",
"https://dockerproxy.com",
"https://mirror.baidubce.com",
"https://docker.m.daocloud.io",
"https://docker.nju.edu.cn",
"https://docker.mirrors.sjtug.sjtu.edu.cn",
"https://docker.mirrors.ustc.edu.cn",
"https://mirror.iscas.ac.cn",
"https://docker.rainbond.cc"]
}

 [root@bogon soft]# systemctl daemon-reload
[root@bogon soft]# systemctl restart docker

 

2.可以把seata镜像打包成jar包,方便下次使用

docker save -o seata.tar seataio/seata-server

3.运行seata镜像

docker volume creata seata-config #创建数据卷
docker run --name seata -p 8091:8091 -e SEATA_IP=192.168.230.100 -e SEATA_PORT=8091 -v seata-config:/seata-server/resources -id seataio/seata-server:1.4.2

4.修改seata的配置文件

[root@bogon soft]# docker volume inspect seata-config 
[
    {
        "CreatedAt": "2024-11-16T20:59:21+08:00",
        "Driver": "local",
        "Labels": null,
        "Mountpoint": "/var/lib/docker/volumes/seata-config/_data",
        "Name": "seata-config",
        "Options": null,
        "Scope": "local"
    }
]
[root@bogon soft]# cd /var/lib/docker/volumes/seata-config/_data
[root@bogon _data]# vi registry.conf 

内容修改为: 

命令行:%d  删除所有

registry {
  # tc服务的注册中心类,这里选择nacos,也可以是eureka、zookeeper等
  type = "nacos"

  nacos {
    # seata tc 服务注册到 nacos的服务名称,可以自定义
    application = "seata-tc-server"
    serverAddr = "192.168.230.100:8848"
    group = "DEFAULT_GROUP"
    namespace = ""
    cluster = "SH"
    username = "nacos"
    password = "nacos"
  }
}

config {
  # 读取tc服务端的配置文件的方式,这里是从nacos配置中心读取,这样如果tc是集群,可以共享配置
  type = "nacos"
  # 配置nacos地址等信息
  nacos {
    serverAddr = "192.168.230.100:8848"
    namespace = ""
    group = "DEFAULT_GROUP"
    username = "nacos"
    password = "nacos"
    dataId = "seataServer.properties"
  }
}

5.在nacos中添加seataServer.properties配置文件

 内容为

# 数据存储方式,db代表数据库
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://192.168.230.100:3306/seata?useUnicode=true&rewriteBatchedStatements=true&useSSL=false
store.db.user=root
store.db.password=1234
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
# 事务、日志等配置
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000

# 客户端与服务端传输方式
transport.serialization=seata
transport.compressor=none
# 关闭metrics功能,提高性能
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

6.在mysql创建seata数据库

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(128),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `status`         TINYINT      NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_status` (`status`),
    KEY `idx_branch_id` (`branch_id`),
    KEY `idx_xid_and_branch_id` (`xid` , `branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS `distributed_lock`
(
    `lock_key`       CHAR(20) NOT NULL,
    `lock_value`     VARCHAR(20) NOT NULL,
    `expire`         BIGINT,
    primary key (`lock_key`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);

7.重启seata

docker restart seata

出现{dataSource}init表示连接seata数据库成功,就表示读取nacos的配置文件成功 

 

 项目集成seata

1.导入依赖

父工程导入依赖版本管理

<dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${alibaba.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!-- springCloud -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

导入seata依赖

<dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <exclusions>
                <!--版本较低,1.3.0,因此排除-->
                <exclusion>
                    <artifactId>seata-spring-boot-starter</artifactId>
                    <groupId>io.seata</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--seata starter 采用1.4.2版本-->
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <version>${seata.version}</version>
        </dependency>

2.在application.yml配置seata信息

seata:
  registry: # TC服务注册中心的配置,微服务根据这些信息去注册中心获取tc服务地址
    # 参考tc服务自己的registry.conf中的配置
    type: nacos
    nacos: # tc
      server-addr: 192.168.230.100:8848
      namespace: ""
      group: DEFAULT_GROUP
      application: seata-tc-server # tc服务在nacos中的服务名称
      cluster: SH
  tx-service-group: seata-demo # 事务组,根据这个获取tc服务的cluster名称
  service:
    vgroup-mapping: # 事务组与TC服务cluster的映射关系
      seata-demo: SH

3.直接在订单微服务使用GlobalTransactional注解开启全局事务

@Override
    //@Transactional
    @GlobalTransactional
    public Long create(Order order) {
        // 创建订单
        orderMapper.insert(order);
        try {
            // 扣用户余额
            accountClient.deduct(order.getUserId(), order.getMoney());
            // 扣库存
            storageClient.deduct(order.getCommodityCode(), order.getCount());

        } catch (FeignException e) {
            log.error("下单失败,原因:{}", e.contentUTF8(), e);
            throw new RuntimeException(e.contentUTF8(), e);
        }
        return order.getId();
    }


原文地址:https://blog.csdn.net/luosuss/article/details/143819201

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!