自学内容网 自学内容网

Seata分布式事务实践

理论篇
什么是事务
关于事务我们一定会想到下面这四大特性:

原子性:所有操作要么全都完成,要么全都失败。
一致性: 保证数据库中的完整性约束和声明性约束。
隔离性:对统一资源的操作不会同时发生的。
持久性:对事务完成的操作最终会持久化到数据库中。

理解:

一致性:可以理解为数据是满足完整性约束的,也就是不会存在中间状态的数据。比如你账上有400,我账号上有100,你给我打2000,此时你账上的钱,因该是200,我账上的钱应该是300,不会存在我账号上钱加了,你账上钱没扣的中间状态。

隔离性:指的是多个事务并发执行的时候不会互相干扰。既一个事务内部的数据对于其他事务来说是隔离的。

分布式事务简介
而分布式事务,不仅包含上述四个特性,这个事务可能是跨服务也可能是跨数据源的一种事务。

什么是Seata

Seata是一款开源的分布式解决方案。致力于提供高性能和简单易用的分布式事务服务。Seata 为用户提供了 XA,AT、TCC 和 SAGA 事务模式,为用户打造一站式的分布式解决方案。

Seata这个开源工具实现分布式事务则是基于以下三个角色:

Seata的三大角色

  1. TC (Transaction Coordinator)事务协调者 :维护全局事务和分支事务的状态,协调全局事务的提交和回滚。
  2. TM (Transaction Manager) 事务管理器:定义全局事务的范围,开始全局事务、提交或者回滚全局事务。
  3. RM (Resource Manager) 资源管理器:管理分支事务处理的资源,并向TC注册分支事务以及报告分支事务的状态,并驱动分支事务的提交和回滚。

 可以查看相关文档:

Seata 是什么? | Apache Seata

部署安装前言

部署Seata并注册到Nacos上。

首先我们来部署以下Seata,首先我们得去官网下载一下资源,以笔者为例,笔者当前使用的是2.0.0这个版本:

一、下载seata安装包

先在seata官网下载seata安装包

压缩包解压后目录如下:

config目录下,根据application.example.yml文件里面配置,修改配置文件application.yml

script目录:

二、指定nacos为配置中心、注册中心

打开config/application.yml文件,我们主要关注以下几个模块的配置。

1、server端配置中心指定,seata.config.type

2、server端注册中心指定,seata.registry.type

3、server端存储模式指定,seata.store.mode

修改config/application.yml文件,指定nacos为配置中心、注册中心,并将seata.store.mode屏蔽,后续中配置中心中进行配置。

#  Copyright 1999-2019 Seata.io Group.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

server:
  port: 7091

spring:
  application:
    name: seata-server

logging:
  config: classpath:logback-spring.xml
  file:
    path: ${log.home:${user.home}/logs/seata}
  extend:
    logstash-appender:
      destination: 127.0.0.1:4560
    kafka-appender:
      bootstrap-servers: 127.0.0.1:9092
      topic: logback_to_logstash

console:
  user:
    username: seata
    password: seata
seata:
  config:
    # support: nacos 、 consul 、 apollo 、 zk  、 etcd3
    type: nacos #使用nacos作为配置中心
    nacos:
      server-addr: 127.0.0.1:8088
      namespace: 01025263-d24e-4558-a2a1-2448142e9d57 #nacos命名空间id,""为nacos保留public空间空间
      group: DEFAULT_GROUP #指定配置至nacos注册中心的分组名
      #username:
      #password:
      #context-path:
      ##if use MSE Nacos with auth, mutex with username/password attribute
      #access-key:
      #secret-key:
      data-id: seataServer.properties
  registry:
    # support: nacos 、 eureka 、 redis 、 zk  、 consul 、 etcd3 、 sofa
    type: nacos #使用nacos作为注册中心
    nacos:
      application: seata-server #指定注册至nacos注册中心的服务名
      server-addr: 127.0.0.1:8088
      group: DEFAULT_GROUP
      namespace: 01025263-d24e-4558-a2a1-2448142e9d57
      cluster: default #指定注册至nacos注册中心的集群名,与配置中心文件中的service.vgroupMapping.[事务分组配置项]=TC集群的名称保持一致
      #username:
      #password:
      #context-path:
      ##if use MSE Nacos with auth, mutex with username/password attribute
      #access-key:
      #secret-key:
  #store:
    # support: file 、 db 、 redis 、 raft
    #mode: db
  #  server:
  #    service-port: 8091 #If not configured, the default is '${server.port} + 1000'
  security:
    secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
    tokenValidityInMilliseconds: 1800000
    ignore:
      urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/metadata/v1/**

其余配置可参考config/application.example.yml文件。

三、上传配置到nacos配置中心

请确保后台已经启动 Nacos 服务。

首先你需要在nacos新建seata配置,此处dataId为seataServer.propertie

​​

配置内容参考:

# socket通信方式, 公共部分
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false

transport.rpcRmRequestTimeout=30000
transport.rpcTmRequestTimeout=30000
transport.rpcTcRequestTimeout=30000
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
transport.serialization=seata
transport.compressor=none
 
# 首先应用程序(客户端)中配置了事务分组,若应用程序是SpringBoot则通过配置seata.tx-service-group=[事务分组配置项]
# 事务群组,service.vgroupMapping.[事务分组配置项]=TC集群的名称
#事务分组需要和服务端配置文件中一致
#service.vgroupMapping.default_tx_group=default
service.vgroupMapping.vmi-service-group=default

#If you use a registry, you can ignore it
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false

# undo配置
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k

#For TCC transaction mode
tcc.fence.logTableName=tcc_fence_log
tcc.fence.cleanPeriod=1h
# You can choose from the following options: fastjson, jackson, gson
tcc.contextJsonParserType=fastjson
#Log rule configuration, for client and server
log.exceptionRate=100

#事务会话信息存储方式
store.mode=db
#事务锁信息存储方式
store.lock.mode=db
#事务回话信息存储方式
store.session.mode=db
#db或redis存储密码解密公钥
#store.publicKey=

#存储方式为db
store.db.datasource=druid
store.db.dbType=mysql
#store.db.driverClassName=com.mysql.jdbc.Driver
#store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.driverClassName=com.mysql.cj.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true&serverTimezone=GMT%2B8
store.db.user=root
store.db.password=root12345
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000

# 事务规则配置
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
server.distributedLockExpireTime=10000
server.session.branchAsyncQueueSize=5000
server.session.enableBranchAsyncRemove=false
server.enableParallelRequestHandle=true
server.enableParallelHandleBranch=false

server.raft.cluster=127.0.0.1:7091,127.0.0.1:7092,127.0.0.1:7093
server.raft.snapshotInterval=600
server.raft.applyBatch=32
server.raft.maxAppendBufferSize=262144
server.raft.maxReplicatorInflightMsgs=256
server.raft.disruptorBufferSize=16384
server.raft.electionTimeoutMs=2000
server.raft.reporterEnabled=false
server.raft.reporterInitialDelay=60
server.raft.serialization=jackson
server.raft.compressor=none
server.raft.sync=true

#Metrics配置
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

以上主要注意以下几个配置:

1.配置事务分组,注意,这个在springboot应用引入seata时配置需要。

#service.vgroupMapping.default_tx_group=default

service.vgroupMapping.vmi-service-group=default

2.存储方式为db时的配置

注意我的mysql为8.0版本,所以使用com.mysql.cj.jdbc.Driver

#存储方式为db
store.db.datasource=druid
store.db.dbType=mysql
#store.db.driverClassName=com.mysql.jdbc.Driver
#store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
#我的mysql版本为8.0
store.db.driverClassName=com.mysql.cj.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true&serverTimezone=GMT%2B8
store.db.user=root
store.db.password=root12345

四、创建表

在上面的nacos配置文件中,已经配置了存储模型为db以及数据库连接配置,还需建表。

​建表语句在文件script/server/db/mysql.sql

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
create database if not exists seata DEFAULT CHARACTER SET utf8mb4;
use seata;
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(128),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `status`         TINYINT      NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_status` (`status`),
    KEY `idx_branch_id` (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS `distributed_lock`
(
    `lock_key`       CHAR(20) NOT NULL,
    `lock_value`     VARCHAR(20) NOT NULL,
    `expire`         BIGINT,
    primary key (`lock_key`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);

五、启动seata server

支持的启动命令参数:

命令启动
liunx:

cd 到bin目录下

./seata-server.sh -h 127.0.0.1 -p 8091

windows:

打开cmd,cd到bin目录下运行seata-server.bat

或者到bin目录下直接双击seata-server.bat

可以看到启动成功

也可以输入http://localhost:7091,输入用户名seata,密码seata

另外在nacos也可以看到seata-server注册成功 

如果seata-server需要集群部署,多启几个实例,就行。

六、在服务中集成Seata

<spring-cloud-alibaba.version>2.1.0.RELEASE</spring-cloud-alibaba.version>
<io.seata.version>2.0.0</io.seata.version>
 <!--引入seata-->
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <version>${io.seata.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>netty-all</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.73.Final</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <version>${spring-cloud-alibaba.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-spring-boot-starter</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

然后在application.yml中添加下面这样一段配置,具体含义参加注释

server:
  port: 9990
spring:
  application:
    name: dc-user
  profiles:
    active: local
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.31.186:8088
        namespace: 01025263-d24e-4558-a2a1-2448142e9d57
      config:
        server-addr: 192.168.31.186:8088
        namespace: 01025263-d24e-4558-a2a1-2448142e9d57
        group: DEFAULT_GROUP
        prefix: ${spring.application.name}
        file-extension: yaml
#seata配置
    alibaba:
      seata:
        tx-service-group: vmi-service-group #事务分组需要和服务端配置文件中一致
seata:
  enabled: true  # 是否开启seata,默认true
  #enable-auto-data-source-proxy: true #数据源自动代理
  #tx-service-group: vmi-service-group #事务分组需要和服务端配置文件中一致,这个配置无效可删除
  data-source-proxy-mode: AT
  service:
    #开启全局事务,默认为false开启
    #disable-global-transaction: false
    #可以不需要
    vgroup-mapping:
      vmi-service-group: default #事务分组需要和服务端配置文件中一致
  registry:
    type: nacos
    nacos:
      application : seata-server
      namespace: 01025263-d24e-4558-a2a1-2448142e9d57
      serverAddr : 127.0.0.1:8088
      group : DEFAULT_GROUP
      cluster : default
      #username : nacos
      #password : nacos
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8088
      namespace: 01025263-d24e-4558-a2a1-2448142e9d57
      group: DEFAULT_GROUP
      #username: nacos
      #password: nacos
      data-id: seataServer.properties
特别注意,事务分组需要和服务端配置文件中一致,与seata服务seataServer.propertie配置文件中保持一致

注意以上文件这两点配置

spring:
  cloud:
    alibaba:
      seata:
        tx-service-group: vmi_tx_group #事务分组需要和服务端配置文件中一致
 
seata:
  service:
    vgroup-mapping:
      vmi_tx_group: default #事务分组需要和服务端配置文件中一致

七,基于Seata实现分布式事务的四种方式

(1)XA模式
了解XA模式的工作原理和优缺点

如下图,XA模式的分布式事务执行流程大抵如下,简而言之,我们可以将其分为两个阶段,首先是:

RM第一阶段:所有的分支事务向TC注册自己的状态,完成后各自执行SQL但是不提交,并将状态报告给TC。

TC 第二阶段:

1.TC查看当前事务的所有分支事务是否都成功了,如果都成功则协通知所有RM提交事务,反之回滚事务。
2.收到TC全局事务提交后,RM将自己管理的分支事务也提交了。

XA模式的分布式事务优缺点:

  1. 优点: 强一致性,符合ACID原则。 且实现简单,没有代码侵入。
  2. 缺点:为了保证强一致性,所以必须保证所有SQL执行没有问题才能提交,所以一阶段这些数据会被锁住,导致其他需要执行这些SQL的事务被阻塞,性能较差。
实践:

首先修改每个需要使用XA模式的服务,在yml使用下面这样一段配置

seata:
  data-source-proxy-mode: XA

XA模式代码如下所示,可以看到笔者仅仅是在方法上加一个@GlobalTransactional注解就能保证服务1和服务2之间分布式事务的ACID。感兴趣的读者可以自行编写一个demo,可以看到一旦任意服务报错,控制台就会输出RollBack将事务操作回滚。

 @GlobalTransactional
    public BaseResponse hello(VerifyRegisterDto dto) {
        IdentityDocument identityDocument=new IdentityDocument();
        identityDocument.setIdentityName(dto.getUserName()); //证件类型名
        identityDocument.setIdPictureDescOne("证件图片描述1"); // 证件图片描述1
        identityDocument.setIdPictureDescTwo(dto.getUserName()); //证件图片描述2
        // 证件信息保存
        identityDocumentMapper.insert(identityDocument);

        PSysUser sysUser = new PSysUser();
        sysUser.setUserName(dto.getUserName());
        sysUser.setPhone(dto.getPhone());
        sysUser.setRoleId(2);
        sysUser.setUid(UIDUtil.getUUID());
        sysUser.setUserId(UIDUtil.nextId());
        String salt = UIDUtil.getUUID();
        sysUser.setSalt(salt);
        String md5Password = DigestUtils.md5Hex(salt + dto.getPassword());
        sysUser.setPassword(md5Password);
        // 保存用户信息
        sysUserMapper.insertSelective(sysUser);
        return ResultUtil.success();
    }

这段自行编写

(2)AT模式

AT模式的工作流程图如下所示,总结以下它的工作流程:

RM第一阶段工作

  1. 注册分支事务。
  2. 解析SQL记录修改前后的SQL镜像并存储到undo-log
  3. 执行业务SQL并直接提交事务。
  4. 通知TC当前事务的状态。

RM第二阶段工作

  1. 如果TC确定所有事务都成功且发起通知告知当前RM,则RM会将undo-log删除。
  2. 如果TC通知失败则RM会根据undo-log将数据还原。

对着我们服务需要用到的所有数据库,刷入undo-log表。(应用所用到的所有数据库刷入此表)

-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log`  (
  `branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',
  `xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'global transaction id',
  `context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'undo_log context,such as serialization',
  `rollback_info` longblob NOT NULL COMMENT 'rollback info',
  `log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',
  `log_created` datetime(6) NOT NULL COMMENT 'create datetime',
  `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
  UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Compact;

完成后对着yml文件修改或添加下面这样一段配置

seata:
  data-source-proxy-mode: AT # 默认就是AT

笔者代码如下所示,可以看到进行数据库的证件信息保存操作,和其他数据库保存用户信息的操作,但是再保存篇用户信息时故意添加了一个报错。

 @GlobalTransactional
    public BaseResponse hello(VerifyRegisterDto dto) {
        IdentityDocument identityDocument=new IdentityDocument();
        identityDocument.setIdentityName(dto.getUserName()); //证件类型名
        identityDocument.setIdPictureDescOne("证件图片描述1"); // 证件图片描述1
        identityDocument.setIdPictureDescTwo(dto.getUserName()); //证件图片描述2
        // 证件信息保存
        identityDocumentMapper.insert(identityDocument);

        PSysUser sysUser = new PSysUser();
        sysUser.setUserName(dto.getUserName());
        sysUser.setPhone(dto.getPhone());
        sysUser.setRoleId(2);
        sysUser.setUid(UIDUtil.getUUID());
        sysUser.setUserId(UIDUtil.nextId());
        String salt = UIDUtil.getUUID();
        sysUser.setSalt(salt);
        String md5Password = DigestUtils.md5Hex(salt + dto.getPassword());
        sysUser.setPassword(md5Password);
        // 保存用户信息
        sysUserMapper.insertSelective(sysUser);
        return ResultUtil.success();
    }

经过debug我们可以发现,在证件信息完成数据提交后

查询数据库,可以看到证件信息数据已保存在表中

其中

global_table记录了一个全局事务信息。

branch_table记录了一条分支事务信息。

lock_table记录了一条锁的信息。

应用所用到数据库中undo-log表也会记录一条信息,我们不妨点入查看详细内容

查看详细内容可以看到,这里面记录的就是插入前后的数据库的镜像

{
"@class": "io.seata.rm.datasource.undo.BranchUndoLog",
"xid": "192.168.31.186:8091:3558411956758468259",
"branchId": 3558411956758468261,
"sqlUndoLogs": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.undo.SQLUndoLog",
"sqlType": "INSERT",
"tableName": "userbusiness.identity_document",
"beforeImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords",
"tableName": "identity_document",
"rows": ["java.util.ArrayList", []]
},
"afterImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "identity_document",
"rows": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "id",
"keyType": "PRIMARY_KEY",
"type": 4,
"value": 15
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "identity_name",
"keyType": "NULL",
"type": 12,
"value": "aaa333"
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "Id_picture_desc_one",
"keyType": "NULL",
"type": 12,
"value": "证件图片描述1"
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "Id_picture_desc_two",
"keyType": "NULL",
"type": 12,
"value": "aaa333"
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "create_time",
"keyType": "NULL",
"type": 93,
"value": null
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "update_time",
"keyType": "NULL",
"type": 93,
"value": null
}]]
}]]
}
}]]
}

一旦报错控制台就会输出一段Rollback的内容,并且事务也会根据undo-log回滚

一旦报错,查询数据库,可以看到证件信息数据会回滚。

AT模式优缺点以及和XA模式的区别

XA一阶段会锁定资源,AT模式则是直接提交事务不锁定资源。
XA模式回滚依赖数据库事务,AT模式则是根据我们上面自己创建的undo-log的内容进行还原。
XA模式是强一致性,AT模式是最终一致性。
AT模式优缺点:

一阶段提交不会锁定资源,性能较好。
利用全局锁实现读写隔离。
没有代码侵入,便于使用。
缺点:

框架记录undo-log会有一定开销,但是性能相比XA会好很多。
两阶段属于软阶段,属于最终一致性,中间可能会有数据不一致问题。

(3)TCC模式
TCC模式依旧延续之前的架构,只不过TCC各个阶段都需要人工实现,TCC实现分布式事务我们必须实现以下三个方法:

try:进行资源检测和预留,例如我们要将user,id=1的name从zhangsan改为lisi,那么我们就得将zhangsan这个值预留下来(存到一张资源预留表中),并提交zhangsan改为lisi这个操作,并提交事务。
confirm:完成资源的业务操作(将预留表中的内容删除),try成功就要求confirm方法逻辑一定要成功。
cancel:事务提失败,将预留资源释放(将预留表中的数据状态设置为取消,并拿着这个旧值去还原数据,即手动回滚补偿)。

TCC模式优缺点:

优点

  1. 一阶段即可完成数据提交,释放数据库资源,性能好。
  2. 无需像AT模式那样需要生成快照,使用全局锁,性能最强。
  3. 不依赖数据库事务,而是手动补偿依赖操作,可以用非事务型数据库。

缺点:

  1. 有代码侵入,需要手写资源消耗、补偿等逻辑。
  2. 软状态,属于最终一致性。
  3. 需要考虑confirmcancel的情况,做好幂等相关处理。

为了演示TCC模式,笔者就基于自己的服务演示一下TCC的使用,举个例子,笔者现在有个为system的服务,system服务会将id为1的user名字进行修改,完成后同步修改file服务的相关数据库。

​ 

 所以笔者就以system为例实现一下TCC相关的服务,我们首先梳理一下思路,首先我们要做的就是将id为1的user的name由1改为2。那么我的try方法就需要进行以下操作:

1. 将user表id为1的用户的旧值冻结起来(这条数据状态设置为0,代表处于try状态),便于后续回滚补偿。
2. 将user表id为1的用户name值更新。

完成后编写confirm方法,因为只有try成功了才会走到confirm方法,它的逻辑很简单,将记录user表冻结的值的数据删除即可。

而cancel方法就是对资源的补偿处理,框架走到cancel则说明事务执行失败了,我们需要将修改的数据进行补偿,例如我们的user表id为1的name由1改为2,那么我们就需要到资源冻结表找到这个条数据的冻结记录,拿着旧值还原user数据,完成后再将这张资源冻结表数据状态设置为3(已取消)

完成上述逻辑后,我们还需要考虑两个问题,第一个是空悬挂问题,如下图,我们的try方法执行太久导致超时,框架自动执行cancel回滚补偿事务,结果try方法再次执行,因为此时资源已经回滚所以也没有try的必要了。
对此我们的要在try方法加上这么一个逻辑,如果资源冻结表关于本次操作的数据状态为3,则直接返回失败。

还有一个就是空回滚问题,即try方法执行过程中直接报错或者长时间阻塞了还没执行到sql逻辑,cancel就已经完成回滚了,结果还是走到了cancel方法,cancel并没有需要回滚的数据。
对此,我们只需要将修改前的值手动存到资源冻结表中并设置状态为已取消,制造一条资源冻结数据直接返回即可。

成上述分析后,我们就可以进行编码操作了,我们首先需要编写一个@LocalTCC接口,定义try、confirm、cancel方法,可以看到我们用TwoPhaseBusinessAction注解告知框架三大行为用哪个方法,并用BusinessActionContextParameter设置全局参数,confirm和cancel都可以通过BusinessActionContext 获取到这些参数

@LocalTCC
public interface SystemTCCService {


    @TwoPhaseBusinessAction(name = "doTry", commitMethod = "confirm", rollbackMethod = "cancel")
    void doTry(@BusinessActionContextParameter(paramName = "id") String id,
               @BusinessActionContextParameter(paramName = "oldVal") String oldVal,
                @BusinessActionContextParameter(paramName = "val") String val);

    boolean confirm(BusinessActionContext ctx);

    boolean cancel(BusinessActionContext ctx);


}

完成后,我们继承这个类实现一个service,代码如下可以看到笔者doTry做的就是拿着上下文的oldVal(即user表的旧值)冻结起来存到资源冻结表,并将状态设置为0(代表这条数据处于try状态)。通过xid查询资源冻结表看看是否有数据,若有则说明这是一个空悬挂操作,直接返回。

confirmcancel逻辑就比较简单了,读者可以自行阅读代码和注释,无非是删除补偿数据或回滚数据并作废资源冻结表数据而已。

@Service
public class SystemTCCServiceImpl implements SystemTCCService {
    @Autowired
    private UserMapper userMapper;
    @Autowired
    private SystemFreezeTblMapper systemFreezeTblMapper;

    @Override
    public void doTry(String id, String oldVal,String val) {

        // 0.获取事务id
        String xid = RootContext.getXID();
        // 业务悬挂判断: 判断freeze中是否有冻结记录,如果有,一定是cancel执行过,拒绝业务操作
        if (systemFreezeTblMapper.selectByPrimaryKey(xid) != null) {
            // cancel执行过,我要拒绝业务
            return;
        }


        User user = new User();
        user.setId(id);
        user.setLoginName(val);
        user.setName(val);
        user.setPassword(val);
        userMapper.updateByPrimaryKeySelective(user);

        //冻结住原有的值
        SystemFreezeTbl systemFreezeTbl = new SystemFreezeTbl();
        systemFreezeTbl.setXid(xid);
        systemFreezeTbl.setUserId(user.getId());
        systemFreezeTbl.setFreezeVal(user.getName());
        systemFreezeTbl.setFreezeOldVal(oldVal);
        //0 try 1 confim 2 cancel
        systemFreezeTbl.setState(0);
        systemFreezeTblMapper.insert(systemFreezeTbl);

    }

    @Override
    public boolean confirm(BusinessActionContext ctx) {
        //成功则删除冻结的表
        String xid = ctx.getXid();
        int count = systemFreezeTblMapper.deleteByPrimaryKey(xid);
        return count > 0;
    }

    @Override
    public boolean cancel(BusinessActionContext ctx) {
        // 0.查询冻结记录
        String xid = ctx.getXid();
        // 根据id查询冻结表的记录
        SystemFreezeTbl freeze = systemFreezeTblMapper.selectByPrimaryKey(xid);
        // 处理空回滚
        if (freeze == null) {
            //空回滚
            freeze = new SystemFreezeTbl();
            String userId = ctx.getActionContext("id").toString();
            freeze.setUserId(userId);
            freeze.setFreezeVal(ctx.getActionContext("val").toString());
            freeze.setFreezeOldVal(ctx.getActionContext("oldVal").toString());
            freeze.setState(2);
            freeze.setXid(xid);
            systemFreezeTblMapper.insert(freeze);
            return true;
        }
        // 幂等判断
        if (freeze.getState() == 2) {
            // 已经处理过了cancel,无需重复
            return true;
        }


        //手动补偿user表,进行数据还原
        User user = new User();
        user.setId(ctx.getActionContext("id").toString());
        user.setName(freeze.getFreezeOldVal());
        userMapper.updateByPrimaryKeySelective(user);

//将资源冻结表状态设置为2 代表已取消
        freeze.setFreezeVal("");
        freeze.setState(2);
        int count = systemFreezeTblMapper.updateByPrimaryKey(freeze);

        return count > 0;


    }
}

完成system核心服务层代码编写之后,我们也可以照猫画虎的完成file服务的编写,笔者这里就不多做演示了,直接贴出controller的代码。我们可以开着debug模式试着将让fileService报错,调试时就会发现systemTCCService最终会执行cancel完成数据补偿。

@GetMapping("/test")
    @GlobalTransactional
    public String hello() {

        systemTCCService.doTry("1", "1", "2");
        fileService.hello();
        return "success";
    }
(4)SAGA模式

Saga 模式是 Seata 即将开源的长事务解决方案,将由蚂蚁金服主要贡献。

总的来说它和TCC差不多,它也是分为两个阶段:

  1. 一阶段将修改提交。
  2. 二阶段则是根据一阶段进行反馈,若是成功则什么都不做,反之进行回滚补偿。

优缺点

优点:

1. 基于事件驱动实现异步调用,性能较好,吞吐高。
2. 一阶段直接提交事务,无锁,性能较好。
3. 无需像TCC一样手动编写不同阶段的方法。

缺点:

 1. 软状态时间不确定,时效性较差。
 2. 无事务隔离,可能出现脏写的情况。

 总结优缺点

分布式事务使用seata处理呢?为什么不用mq解决分布式事务的问题呢
主要还是考虑兼容性问题,目前主流的消息中间件中rocketMQ支持事务。而且考虑将来可扩展,可能我们还会更换中间件以及兼容多数据库,所以使用seata实现分布式事务是最合适的。
而且消息队列主要作用也并不是用于分布式事务问题,它的主要作用是解耦、异步、削峰。而seata是目前比较主流的分布式事务解决方案。


原文地址:https://blog.csdn.net/xiegongmiao/article/details/142496281

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!