锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

Spring Cloud 入门 第三节(完)

时间:2022-09-29 03:00:00 压力变送器tm21

分布式事务

需要解决的问题
随着分布式服务架构的流行和普及,原本在单个应用中执行的多个逻辑操作现在分为多个服务之间的远程调用。虽然服务给我们的系统带来了水平扩展的能力,但挑战是分布式事务问题。如果A成功执行,多个服务将使用自己的单独维护数据库。B但执行失败,A事务此时已提交,无法回滚,最终导致双方数据不一致;

CAP定理

CAP 定理,又称布鲁尔定理。架构师设计分布式系统(不仅仅是分布式事务),CAP 是你的入门理论。
分布式系统(distributed system)正变得越来越重要,大型网站几乎都是分布式的。
分布式系统最大的困难是如何同步每个节点的状态。CAP 定理是这方面的基本定理,也是理解分布式系统的起点。

C (一致性):指数据在分布式系统中的所有数据备份,以及数据在多个副本之间的一致性(严格一致性)是否相同。(所有节点同时具有相同的数据)
一致性(Consistency)是指多副本(Replications)问题中的数据一致性。可分为强一致性和弱一致性。

  • 强一致性:所有节点中的数据在任何时候都是一样的。
  • 弱一致性:数据更新后,如果能容忍部分或全部访问,则为弱一致性。

A (可用性):该系统提供的服务必须始终处于可用状态,服务器必须每次收到用户的请求。在合理的时间内返回合理的响应(非错误和加班响应)

P (网络分区容错性):当网络节点之间无法通信时,节点被隔离,产生网络分区, 整个系统仍然可以工作 . 大多数分布式系统分布在多个子网络中。每个子网络被称为一个区域(partition)。分区容错意味着区间通信可能失败。例如,一个服务器放在中国,另一个服务器放在美国,这两个区域可能无法通信。

什么是分区?
在分布式系统中,不同的节点分布在不同的子网络中。由于某些特殊原因,这些子节点之间存在网络堵塞,但其内部子网络正常。因此,整个系统的环境被划分为几个孤立的区域。这就是分区。

CAP原则的本质是要么AP,要么CP,要么AC,但是不存在CAP。

取舍策略
如今,对于大多数大型互联网应用场景,主机众多,部署分散,集群规模越来越大,节点只会越来越多。因此,节点故障和网络故障是正常的,因此分区容错已成为分布式系统不可避免的问题。然后只能在C和A之间的选择。

原因是:在分布式系统中,网络不能 100% 分区实际上是一种不可避免的现象,随着网络节点出现问题,产生了分区, 此时,其他节点和错误节点的数据必然不一致,此时将面临选择,

选择停止所有服务,修复网络节点后恢复数据,以确保一致性(PC),
还是选择继续提供服务,放弃强一致性的要求,以此来保证整体的可用性(PA)。

因此,最多满足两个条件:

组合 分析结果
CA 满足原子和可用性,放弃分区容错。说白了就是整体应用。
CP 为了满足原子和分区的容错性,即放弃可用性。当系统被划分时,为了确保原子性,必须放弃可用性,以停止服务。
AP 为了满足可用性和分区容错性,当分区出现时,为了保证可用性,节点必须继续对外服务,这必然导致原子性丧失。

分布式事务解决方案

  1. XA两段提交(低效率)-分布式事务解决方案
  2. TCC提交三段(2段,高效[不推荐(补偿代码))
  3. 本地消息(MQ Table)
  4. 事务消息(RocketMQ[alibaba])
  5. Seata(alibaba)

基于XA提交协议的两个阶段(2PC)

X/Open 组织(即现在 Open Group )分布式事务处理模型的定义

XA协议:XA是分布式事务协议。XA它大致分为两部分:事务管理器和本地资源管理器。例如,本地资源管理器通常由数据库实现Oracle、DB这些商业数据库都实现了XA接口,作为全球调度员,事务管理器负责提交和回滚各种本地资源。

概念
二阶段提交2PC(Two phase Commit)是指在分布式系统中,为了保证所有节点在提交事务时的一致性。

背景
在分布式系统里,每个节点都能知道自己操作的成败,却不能知道其他节点操作的成败。

当一个事务跨多个节点时,为了保持事务的原子性与一致性,需要引入一个协调者(Coordinator)统一控制一切参与者(Participant)操作结果,并指示是否真正提交操作结果(commit)或者回滚(rollback)。

思路
2PC顾名思义分为两个阶段,实施思路可概括为:

  1. 投票阶段(voting phase):参与者通知协调员操作结果;
  2. 提交阶段(commit phase):协调员收到参与者的通知后,再次向参与者发出通知,根据反馈决定参与者是否提交或回滚;

缺陷
在算法执行过程中,所有节点都被堵塞,所有节点持有的资源(如数据库数据、本地文件等)都被封

典型场景如下:

  1. 在某参与者发出通知之前,所有参与者和协调者都处于阻塞状态;
  2. 所有参与者在协调之前,所有参与者都被阻塞;

此外,如果协调员或参与者崩溃,为了避免整个算法处于完全阻塞状态,使用加时机制继续向前推进算法,因此算法的效率相对较低。

总的来说,2PC保守算法

代码补偿事务(TCC)

TCC解决跨服务调用场景下分布式事务问题的主要作用是

场景案例

  • 第一阶段:美团要求两家航空公司预订机票,两家航空公司告诉美团预订是成功还是失败。航空公司需要保证,如果机票预订成功,以后就可以购买了。

  • 第二阶段:如果两家航空公司预留成功,将确认购买请求分别发送给两家公司。
    如果两家航空公司中的任何一家未能预留,成功预留的航空公司也应取消预留。在这种情况下,取消之前预留成功机票的航班不会扣除用户的钱,因为购买实际上并没有发生,以前只是要求预留机票。

通过这个计划,我们可以确保两家航空公司购买机票的一致性,要么成功,要么失败,即使失败,用户的钱也不会被扣除。如果两个航班都确认购买,然后退款,肯定会扣除。

该方案为我们提供了一种跨服务解决方案,以确保事务的一致性,TCC的雏形。

TCC是Try ( 尝试 ) — Confirm(确认) — Cancel ( 取消 ) 的简称:

操作方法 含义
Try 完成所有业务检查(一致性),预留业务资源(准隔离) 回顾上述航班预订案例的阶段1,机票是业务资源,所有资源提供商(航空公司)预留成功,try阶段是成功的
Confirm 确认执行业务操作,不做任何业务检查, 只使用Try阶段预留的业务资源。回顾上述航班预订案例的阶段2,美团APP确认两家航空公司的机票预留成功,因此向两家航空公司发出确认购买的请求。
Cancel 取消Try阶段预留的业务资源。回顾上述航班预订案例的阶段2,如果某个业务方的业务资源没有预留成功,则取消所有业务资源预留请求。

TCC两阶段提交与XA两阶段提交的区别
XA是资源层面的分布式事务,强一致性,在两阶段提交的整个过程中,一直会持有资源的锁。(基于数据库)
TCC是业务层面的分布式事务,最终一致性,不会一直持有资源的锁。(自己写代码)

在这里插入图片描述

本地消息表(异步确保)- 事务最终一致性

这种实现方式的思路,其实是源于 ebay,后来通过支付宝等公司的布道,在业内广泛使用。其基本的设计思想是将远程分布式事务拆分成一系列的本地事务。如果不考虑性能及设计优雅,借助关系型数据库中的表即可实现。

举个经典的跨行转账的例子来描述。

第一步伪代码如下,扣款 1W,通过本地事务保证了凭证消息插入到消息表中。

第二步,通知对方银行账户上加 1W 了。那问题来了,如何通知到对方呢?

通常采用两种方式:

采用时效性高的 MQ,由对方订阅消息并监听,有消息时自动触发事件
采用定时轮询扫描的方式,去检查消息表的数据
两种方式其实各有利弊,仅仅依靠 MQ,可能会出现通知失败的问题。而过于频繁的定时轮询,效率也不是最佳的(90% 是无用功)。所以,我们一般会把两种方式结合起来使用。

解决了通知的问题,又有新的问题了。万一这消息有重复被消费,往用户帐号上多加了钱,那岂不是后果很严重?

仔细思考,其实我们可以消息消费方,也通过一个“消费状态表”来记录消费状态。在执行“加款”操作之前,检测下该消息(提供标识)是否已经消费过,消费完成后,通过本地事务控制来更新这个“消费状态表”。这样子就避免重复消费的问题。

总结:上述的方式是一种非常经典的实现,基本避免了分布式事务,实现了“最终一致性”。但是,关系型数据库的吞吐量和性能方面存在瓶颈,频繁的读写消息会给数据库造成压力。所以,在真正的高并发场景下,该方案也会有瓶颈和限制的。

在订单系统新增一条消息表,将新增订单和新增消息放到一个事务里完成,然后通过轮询的方式去查询消息表,将消息推送到 MQ,库存系统去消费 MQ。

执行流程:

  • 订单系统,添加一条订单和一条消息,在一个事务里提交。
  • 订单系统,使用定时任务轮询查询状态为未同步的消息表,发送到 MQ,如果发送失败,就重试发送。
  • 库存系统,接收 MQ 消息,修改库存表,需要保证幂等操作。
  • 如果修改成功,调用 RPC 接口修改订单系统消息表的状态为已完成或者直接删除这条消息。
  • 如果修改失败,可以不做处理,等待重试。

订单系统中的消息有可能由于业务问题会一直重复发送,所以为了避免这种情况可以记录一下发送次数,当达到次数限制之后报警,人工接入处理;库存系统需要保证幂等,避免同一条消息被多次消费造成数据一致。

本地消息表这种方案实现了最终一致性,需要在业务系统里增加消息表,业务逻辑中多一次插入的 DB 操作,所以性能会有损耗,而且最终一致性的间隔主要由定时任务的间隔时间决定。

优点: 一种非常经典的实现,避免了分布式事务,实现了最终一致性。在 .NET中 有现成的解决方案。

缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。

MQ 事务消息

有一些第三方的MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如 RabbitMQ 和 Kafka 都不支持。

以阿里的 RocketMQ 中间件为例,其思路大致为:

  1. RocketMQ提供了类似X/Open XA的分布事务功能,通过MQ的事务消息能达到分布式事务的最终一致。
  2. 发送方在业务执行开始会先向消息队列中投递 “ 半消息 ” ,半消息即暂时不会真正投递的消息,当发送方(即生产者)将消息成功发送给了MQ服务端且并未将该消息的二次确认结果返回,此时消息状态是“ 暂时不可投递 ” 状态(可以认为是状态未知)。该状态下的消息即半消息。
  3. 如果出现网络闪断、生产者应用重启等原因导致事务消息二次确认丢失,MQ服务端会通过扫描发现某条消息长期处于 “ 半消息 ” 状态,MQ服务端会主动向生产者查询该消息的最终状态是处于Commit(消息提交)还是Rollback(消息回滚)。这个过程称为消息回查。
    在业务方法内要想消息队列提交两次请求,一次发送消息和一次确认消息。如果确认消息发送失败了RocketMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以生产方需要实现一个check接口,RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

总体而言RocketMQ事务消息分为两条主线

  • 定时任务发送流程:发送half message(半消息),执行本地事务,发送事务执行结果
  • 定时任务回查流程:MQ服务器回查本地事务,发送事务执行结果

具体流程如下

  1. Producer 向 MQ 服务器 发送消息 , MQ Server 将消息状态标记为 Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。
  2. MQ 服务器收到消息并持久化成功之后,会向Producer 确认首次消息发送成功,此时消息处于 half message(半消息) 状态,并未发送给对应的 Consumer 。
  3. Producer 开始执行本地事务逻辑 , 通过本地数据库事务控制。
  4. 根据事务执行结果,Producer 向 MQ 服务器提交二次确认 ( commit 或者 rollback) 。MQ Server 收到 Commit 状态则将半消息标记为可投递,Consumer 最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,Consumer 将不会接受该消息。
  5. 在断网或者应用重启的情况下,二次确认未成功的发给 MQ Server,MQ Server 会主动向 Producer 启动消息回查
  6. Producer 根据事务执行结果,对消息回查返回对应的结果。
  7. Mq Server根据返回结果,决定继续投递消息或者丢弃消息(重复第4步操作)。

注意 1-4 为事务消息的发送过程, 5-6 为事务消息的回查过程。
优点: 实现了最终一致性,不需要依赖本地数据库事务。
缺点: 目前主流MQ中只有RocketMQ支持事务消息。

Seata

理想的方案应该是什么样子

针对上面所说的分布式事务解决方案的痛点,那很显然,我们理想的分布式事务解决方案肯定是性能要好而且要对业务无入侵,业务层上无需关心分布式事务机制的约束。

一个理想的分布式事务解决方案应该:像使用 本地事务 一样简单,业务逻辑只关注业务层面的需求,不需要考虑事务机制上的约束。

原理和设计
我们要设计一个对业务无侵入的方案,所以从业务无侵入的 XA 方案来思考:

是否可以在 XA 的基础上演进,解决掉 XA 方案面临的问题呢?

如何定义一个分布式事务
首先,很自然的,我们可以把一个分布式事务理解成一个包含了若干 分支事务全局事务全局事务 的职责是协调其下管辖的 分支事务 达成一致,要么一起成功提交,要么一起失败回滚。此外,通常 分支事务 本身就是一个满足 ACID 的 本地事务。这是我们对分布式事务结构的基本认识,与 XA 是一致的。

Seata介绍

seata是阿里开源的一个分布式事务框架,能够让大家在操作分布式事务时,像操作本地事务一样简单。一个注解搞定分布式事务。

解决分布式事务问题,有两个设计初衷

对业务无侵入:即减少技术架构上的微服务化所带来的分布式事务问题对业务的侵入
高性能:减少分布式事务解决方案所带来的性能消耗

seata中有两种分布式事务实现方案,AT及TCC

  • AT模式主要关注多 DB 访问的数据一致性,当然也包括多服务下的多 DB 数据访问一致性问题 2PC-改进
  • TCC 模式主要关注业务拆分,在按照业务横向扩展资源时,解决微服务间调用的一致性问题

那 Seata 是怎么做到的呢?下面说说它的各个模块之间的关系。

Seata 的设计思路是将一个分布式事务可以理解成一个全局事务,下面挂了若干个分支事务,而一个分支事务是一个满足 ACID 的本地事务,因此我们可以操作分布式事务像操作本地事务一样。

2019 年 1 月,阿里巴巴中间件团队发起了开源项目 Fescar*(Fast & EaSy Commit And Rollback)*,和社区一起共建开源分布式事务解决方案。Fescar 的愿景是让分布式事务的使用像本地事务的使用一样,简单和高效,并逐步解决开发者们遇到的分布式事务方面的所有难题。

AT模式
它使得应用代码可以像使用本地事务一样使用分布式事务,完全屏蔽了底层细节

AT 模式下,把每个数据库被当做是一个 Resource,Seata 里称为 DataSource Resource。业务通过 JDBC 标准接口访问数据库资源时,Seata 框架会对所有请求进行拦截,做一些操作。每个本地事务提交时,Seata RM(Resource Manager,资源管理器) 都会向 TC(Transaction Coordinator,事务协调器) 注册一个分支事务。当请求链路调用完成后,发起方通知 TC 提交或回滚分布式事务,进入二阶段调用流程。此时,TC 会根据之前注册的分支事务回调到对应参与者去执行对应资源的第二阶段。TC 是怎么找到分支事务与资源的对应关系呢?每个资源都有一个全局唯一的资源 ID,并且在初始化时用该 ID 向 TC 注册资源。在运行时,每个分支事务的注册都会带上其资源 ID。这样 TC 就能在二阶段调用时正确找到对应的资源。

解释:

Transaction Coordinator (TC): 事务协调器,维护全局事务的运行状态,负责协调并决定全局事务的提交或回滚。
Transaction Manager( TM ): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。
Resource Manager (RM): 资源管理器,负责本地事务的注册,本地事务状态的汇报(投票),并且负责本地事务的提交和回滚。

XID:一个全局事务的唯一标识

其中,TM是一个分布式事务的发起者和终结者,TC负责维护分布式事务的运行状态,而RM则负责本地事务的运行。如下图所示:

下面是一个分布式事务在Seata中的执行流程:

  • TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID
  • XID 在微服务调用链路的上下文中传播。
  • RM 向 TC 注册分支事务,接着执行这个分支事务并提交(重点:RM在第一阶段就已经执行了本地事务的提交/回滚),最后将执行结果汇报给TC
  • TM 根据 TC 中所有的分支事务的执行情况,发起全局提交或回滚决议。
  • TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。

Seata 中有三大模块,分别是 TM、RM 和 TC。 其中 TM 和 RM 是作为 Seata 的客户端与业务系统集成在一起,TC 作为 Seata 的服务端独立部署。

MT 模式

Seata还支持MT模式。MT模式本质上是一种TCC方案,业务逻辑需要被拆分为 Prepare/Commit/Rollback 3 部分,形成一个 MT 分支,加入全局事务。如图所示:

MT 模式一方面是 AT 模式的补充。另外,更重要的价值在于,通过 MT 模式可以把众多非事务性资源纳入全局事务的管理中。

Seata使用

seata项目地址
官方文档

引入依赖:


<dependency>
    <groupId>com.alibaba.cloudgroupId>
    <artifactId>spring-cloud-alibaba-seataartifactId>
    <version>2.1.0.RELEASEversion>
dependency>

加入配置文件

fescar-parent\fescar-api\src\main\resources文件夹下加入添加Seata 配置文件registry.conffile.conf
registry.conf

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = "public"
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = "public"
    cluster = "default"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    app.id = "seata-server"
    apollo.meta = "http://192.168.1.204:8801"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

file.conf

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  #thread factory for netty
  thread-factory {
    boss-thread-prefix = "NettyBoss"
    worker-thread-prefix = "NettyServerNIOWorker"
    server-executor-thread-prefix = "NettyServerBizHandler"
    share-boss-worker = false
    client-selector-thread-prefix = "NettyClientSelector"
    client-selector-thread-size = 1
    client-worker-thread-prefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    boss-thread-size = 1
    #auto default pin or 8
    worker-thread-size = 8
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #vgroup->rgroup
  vgroupMapping.my_test_tx_group = "default"
  #only support single node
  default.grouplist = "127.0.0.1:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
  max.commit.retry.timeout = "-1"
  max.rollback.retry.timeout = "-1"
}

client {
  async.commit.buffer.limit = 10000
  lock {
    retry.internal = 10
    retry.times = 30
  }
  report.retry.count = 5
}

## transaction log store
store {
  ## store mode: file、db
  mode = "file"

  ## file store
  file {
    dir = "sessionStore"

    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    max-branch-session-size = 16384
    # globe session size , if exceeded throws exceptions
    max-global-session-size = 512
    # file buffer size , if exceeded allocate new buffer
    file-write-buffer-cache-size = 16384
    # when recover batch read size
    session.reload.read_size = 100
    # async, sync
    flush-disk-mode = async
  }

  ## database store
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "dbcp"
    ## mysql/oracle/h2/oceanbase etc.
    db-type = "mysql"
    url = "jdbc:mysql://127.0.0.1:3306/seata"
    user = "mysql"
    password = "mysql"
    min-conn = 1
    max-conn = 3
    global.table = "global_table"
    branch.table = "branch_table"
    lock-table = "lock_table"
    query-limit = 100
  }
}
lock {
  ## the lock store mode: local、remote
  mode = "remote"

  local {
    ## store locks in user's database
  }

  remote {
    ## store locks in the seata's server
  }
}
recovery {
  committing-retry-delay = 30
  asyn-committing-retry-delay = 30
  rollbacking-retry-delay = 30
  timeout-retry-delay = 30
}

transaction {
  undo.data.validation = true
  undo.log.serialization = "jackson"
}

## metrics settings
metrics {
  enabled = false
  registry-type = "compact"
  # multi exporters use comma divided
  exporter-list = "prometheus"
  exporter-prometheus-port = 9898
}

注入数据源

@Configuration
public class DataSourceProxyConfig { 
        

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource() { 
        
        return new DruidDataSource();
    }

    @Bean
    public DataSourceProxy dataSourceProxy(DataSource dataSource) { 
        
        return new DataSourceProxy(dataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception { 
        
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSourceProxy);
        return sqlSessionFactoryBean.getObject();
    }
}

添加 undo_log 表

在业务相关的数据库中添加 undo_log 表,用于保存需要回滚的数据
注意:是每一个微服务数据库都要建这个表

CREATE TABLE `undo_log`
(
    `id`            BIGINT(20)   NOT NULL AUTO_INCREMENT,
    `branch_id`     BIGINT(20)   NOT NULL,
    `xid`           VARCHAR(100) NOT NULL,
    `context`       VARCHAR(128) NOT NULL,
    `rollback_info` LONGBLOB     NOT NULL,
    `log_status`    INT(11)      NOT NULL,
    `log_created`   DATETIME     NOT NULL,
    `log_modified`  DATETIME     NOT NULL,
    `ext`           VARCHAR(100) DEFAULT NULL,
    PRIMARY KEY (`id`),
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8

启动 Seata-Server

在 https://github.com/seata/seata/releases 下载相应版本的 Seata-Server,修改 registry.conf为相应的配置(如果使用 file 则不需要修改),解压并通过以下命令启动:

sh ./bin/seata-server.sh

使用@GlobalTransactional开启事务

在业务的发起方的方法上使用@GlobalTransactional开启全局事务,Seata 会将事务的 xid 通过拦截器添加到调用其他服务的请求中,实现分布式事务

这里是fescar-business项目是业务的发起方

package com.atguigu.service.impl;

import com.atguigu.dao.LogInfoMapper;
import com.atguigu.feign.OrderInfoFeign;
import com.atguigu.feign.UserInfoFeign;
import com.atguigu.pojo.LogInfo;
import com.atguigu.service.BusinessService;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

/***** * @Author: 马伟奇 * @Description: com.atguigu.service.impl ****/
@Service
public class BusinessServiceImpl implements BusinessService { 
        

    @Autowired
    private OrderInfoFeign orderInfoFeign;

    @Autowired
    private UserInfoFeign userInfoFeign;

    @Autowired
    private LogInfoMapper logInfoMapper;

    /*** * ① * 下单 * @GlobalTransactional:全局事务入口 * @param username * @param id * @param count */

    @Override
    @GlobalTransactional
    public void add(String username, int id, int count) { 
        
        //添加订单日志
        LogInfo logInfo = new LogInfo();
        logInfo.setContent("添加订单数据---"+new Date());
        logInfo.setCreatetime(new Date());
        int logcount = logInfoMapper.insertSelective(logInfo);
        System.out.println("添加日志受影响行数:"+logcount);

        //添加订单
        orderInfoFeign.add(username,id,count);

        //用户账户余额递减
        userInfoFeign.decrMoney(username,10);
    }
}

在每个相关业务项目中加上分组

修改 fescar-business , fescar-itemfescar-orderfescar-user各大微服务的 application.yml 配置,配置通信指定的组名( my_test_tx_group 在 file.conf 中有):

  cloud:  # 分组
    alibaba:
      seata:
        tx-service-group: my_test_tx_group

如在 fescar-business

server:
  port: 18081
spring:
  application:
    name: business
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/fescar-business?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    username: root
    password: root
  main:
    allow-bean-definition-overriding: true
  cloud:  # 分组
    alibaba:
      seata:
        tx-service-group: my_test_tx_group
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:7001/eureka
  instance:
    prefer-ip-address: true
feign:
  hystrix:
    enabled: true

#读取超时设置
ribbon:
  ReadTimeout: 30000
#hystrix 配置
hystrix:
  command:
    default:
      execution:
        isolation:
          thread:
            timeoutInMilliseconds: 10000
          strategy: SEMAPHORE
锐单商城拥有海量元器件数据手册IC替代型号,打造电子元器件IC百科大全!

相关文章