事务可以看做是一次大的活动,它由不同的小活动组成,这些活动要么全部成功,要么全部失败。
数据库事务在实现时会将一次事务涉及的所有操作全部纳入到一个不可分割的执行单元,该执行单元中的所有操作要么都成功,要么都失败,只要其中任一操作执行失败,都将导致整个事务的回滚。
分布式系统会把一个应用系统拆分为可独立部署的多个服务,因此需要服务与服务之间远程协作才能完成事务操作,这种分布式系统环境下由不同的服务之间通过网络远程协作完成事务称之为分布式事务,例如用户注册送积分事务、创建订单减库存事务,银行转账事务等都是分布式事务。
CAP是 Consistency、Availability、Partition tolerance三个词语的缩写,分别表示一致性、可用性、分区容忍性。
在所有分布式事务场景中不会同时具备CAP三个特性,因为在具备了P的前提下C和A是不能共存的。
CAP是一个已经被证实的理论:一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容忍性(Partition tolerance)这三项中的两项。它可以作为我们进行架构设计、技术选型的考量标准。对于多数大型互联网应用的场景,结点众多、部署分散,而且现在的集群规模越来越大,所以节点故障、网络故障是常态,而且要保证服务可用性达到N个9(99.99..%),并要达到良好的响应性能来提高用户体验,因此一般都会做出如下选择:保证P和A,舍弃C强一致,保证最终一致性。
BASE 是 Basically Available(基本可用)、Soft state(软状态)和 Eventually consistent (最终一致性)三个短语的缩写。BASE理论是对CAP中AP的一个扩展,通过牺牲强一致性来获得可用性,当出现故障允许部分不可用但要保证核心功能可用,允许数据在一段时间内是不一致的,但最终达到一致状态。满足BASE理论的事务,我们称之为“柔性事务”。
2PC即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段(Prepare phase)、提交阶段(commit phase),2是指两个阶段,P是指准备阶段,C是指提交阶段。
成功情况:
失败情况:
2PC的传统方案是在数据库层面实现的,如Oracle、MySQL都支持2PC协议,为了统一标准减少行业内不必要的对接成本,需要制定标准化的处理模型及接口标准,国际开放标准组织Open Group定义了分布式事务处理模型DTP(Distributed Transaction Processing Reference Model)。
整个2PC的事务流程涉及到三个角色AP、RM、TM。AP指的是使用2PC分布式事务的应用程序;RM指的是资源管理器,它控制着分支事务;TM指的是事务管理器,它控制着整个全局事务。
Seata的设计目标其一是对业务无侵入,因此从业务无侵入的2PC方案着手,在传统2PC的基础上演进,并解决2PC方案面临的问题。
Seata把一个分布式事务理解成一个包含了若干分支事务的全局事务。全局事务的职责是协调其下管辖的分支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据库的本地事务,下图是全局事务与分支事务的关系图:
与 传统2PC 的模型类似,Seata定义了3个组件来协议分布式事务的处理过程:
详情见:Spring Cloud Alibaba Seata
TCC是Try、Confirm、Cancel三个词语的缩写,TCC要求每个分支事务实现三个操作:预处理Try、确认Confirm、撤销Cancel。Try操作做业务检查及资源预留,Confirm做业务确认操作,Cancel实现一个与Try相反的操作即回滚操作。TM首先发起所有的分支事务的try操作,任何一个分支事务的try操作执行失败,TM将会发起所有分支事务的Cancel操作,若try操作全部成功,TM将会发起所有分支事务的Confifirm操作,其中Confirm/Cancel操作若执行失败,TM会进行重试。
成功情况:
失败情况:
TCC分为三个阶段:
框架名称 | Github地址 |
tcc-transaction
|
|
Hmily | |
ByteTCC
|
|
EasyTransaction
|
在没有调用 TCC 资源 Try 方法的情况下,调用了二阶段的 Cancel 方法,Cancel 方法需要识别出这是一个空回滚,然后直接返回成功。
出现原因:是当一个分支事务所在服务宕机或网络异常,分支事务调用记录为失败,这个时候其实是没有执行Try阶段,当故障恢复后,分布式事务进行回滚则会调用二阶段的Cancel方法,从而形成空回滚。
解决方法:识别出这个空回滚。需要知道一阶段是否执行,如果执行了,那就是正常回滚;如果没执行,那就是空回滚。前面已经说过TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分布式事务调用链条。再额外增加一张分支事务记录表,其中有全局事务 ID 和分支事务 ID,第一阶段 Try 方法里会插入一条记录,表示一阶段执行了。
//在cancel中cancel空回滚处理,如果try没有执行,cancel不允许执行 if(accountInfoDao.isExistTry(transId)<=0){ log.info("bank1 空回滚处理,try没有执行,不允许cancel执行,xid:{}",transId); return ; }
为了保证TCC二阶段提交重试机制不会引发数据不一致,要求 TCC 的二阶段 Try、Confirm 和 Cancel 接口保证幂等,这样不会重复使用或者释放资源。如果幂等控制没有做好,很有可能导致数据不一致等严重问题。
//当前是在try中进行幂等判断 判断local_try_log表中是否有try日志记录,如果有则不再执行 if(accountInfoDao.isExistTry(transId)>0){ log.info("bank1 try 已经执行,无需重复执行,xid:{}",transId); return ; }
悬挂就是对于一个分布式事务,其二阶段 Cancel 接口比 Try 接口先执行。
出现原因:RPC 调用分支事务try时,先注册分支事务,再执行RPC调用,如果此时 RPC 调用的网络发生拥堵,通常 RPC 调用是有超时时间的,RPC 超时以后,TM就会通知RM回滚该分布式事务,可能回滚完,RPC 请求才到达参与者真正执行,而一个 Try 方法预留的业务资源。
解决思路:如果二阶段执行完成,那一阶段就不能再继续执行。在执行一阶段事务时判断在该全局事务下,“分支事务记录”表中是否已经有二阶段事务记录,如果有则不执行Try。
//try悬挂处理,如果cancel、confirm有一个已经执行了,try不再执行 if(accountInfoDao.isExistConfirm(transId)>0 || accountInfoDao.isExistCancel(transId)>0){ log.info("bank1 try悬挂处理 cancel或confirm已经执行,不允许执行try,xid:{}",transId); return ; }
项目源码:cloud-dtx-tcc
sql文件下载地址为:dtx-tcc-sql
涉及到分布式事务的工程均需要的配置
maven配置
<!-- hmily依赖 --> <dependency> <groupId>org.dromara</groupId> <artifactId>hmily‐springcloud</artifactId> <version>2.0.4‐RELEASE</version> </dependency>
application.yaml中添加hmily
org: dromara: hmily: serializer: kryo recoverDelayTime: 30 retryMax: 30 scheduledDelay: 30 scheduledThreadMax: 10 repositorySupport: db #对于发起方的时候,把此属性设置为true。参与方为false。 started: true hmilyDbConfig: driverClassName: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/hmily?useUnicode=true username: root password: 123456
注入hmily的配置Bean
@Bean public HmilyTransactionBootstrap hmilyTransactionBootstrap(HmilyInitService hmilyInitService){ HmilyTransactionBootstrap hmilyTransactionBootstrap = new HmilyTransactionBootstrap(hmilyInitService); hmilyTransactionBootstrap.setSerializer(env.getProperty("org.dromara.hmily.serializer")); hmilyTransactionBootstrap.setRecoverDelayTime(Integer.parseInt(env.getProperty("org.dromara.hmily.recoverDelayTime"))); hmilyTransactionBootstrap.setRetryMax(Integer.parseInt(env.getProperty("org.dromara.hmily.retryMax"))); hmilyTransactionBootstrap.setScheduledDelay(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledDelay"))); hmilyTransactionBootstrap.setScheduledThreadMax(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledThreadMax"))); hmilyTransactionBootstrap.setRepositorySupport(env.getProperty("org.dromara.hmily.repositorySupport")); hmilyTransactionBootstrap.setStarted(Boolean.parseBoolean(env.getProperty("org.dromara.hmily.started"))); HmilyDbConfig hmilyDbConfig = new HmilyDbConfig(); hmilyDbConfig.setDriverClassName(env.getProperty("org.dromara.hmily.hmilyDbConfig.driverClassName")); hmilyDbConfig.setUrl(env.getProperty("org.dromara.hmily.hmilyDbConfig.url")); hmilyDbConfig.setUsername(env.getProperty("org.dromara.hmily.hmilyDbConfig.username")); hmilyDbConfig.setPassword(env.getProperty("org.dromara.hmily.hmilyDbConfig.password")); hmilyTransactionBootstrap.setHmilyDbConfig(hmilyDbConfig); return hmilyTransactionBootstrap; }
启动类上添加注解
@ComponentScan({"org.dromara.hmily"})
try: try幂等校验 try悬挂处理 检查余额是够扣减金额 扣减金额 confirm: 空 cancel cancel幂等校验 cancel空回滚处理 增加可用余额
注意:远程调用bank2时,在feign调用的接口上加注解@Hmily
try: 空 confirm: confirm幂等校验 正式增加金额 cancel: 空
项目源码:cloud-dtx-txmsg
可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。
可靠消息需要解决的问题:
//先发消息如果数据库操作错误,消息已经发送 begin transaction; //1.发送MQ //2.数据库操作 commit transation; //如果数据库超时,此时数据库回滚,但是消息可能也已经发送 begin transaction; //1.数据库操作 //2.发送MQ commit transation;
事务参与方必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息。
由于网络2的存在,若某一个消费节点超时但是消费成功,此时消息中间件会重复投递此消息,就导致了消息的重 复消费。
要解决消息重复消费的问题就要实现事务参与方的方法幂等性。
bank1
CREATE DATABASE `bank1` CHARACTER SET ‘utf8‘ COLLATE ‘utf8_general_ci‘; DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` ( `id` BIGINT (20) NOT NULL AUTO_INCREMENT, `account_name` VARCHAR (100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘户 主姓名‘, `account_no` VARCHAR (100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘银行 卡号‘, `account_password` VARCHAR (100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘帐户密码‘, `account_balance` DOUBLE NULL DEFAULT NULL COMMENT ‘帐户余额‘, PRIMARY KEY (`id`) USING BTREE ) ENGINE = INNODB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic; INSERT INTO `account_info` VALUES ( 2, ‘张三的账户‘, ‘1‘, ‘‘, 10000 ); DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` ( `tx_no` VARCHAR (64) COLLATE utf8_bin NOT NULL, `create_time` datetime (0) NULL DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE = INNODB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
bank2
CREATE DATABASE `bank2` CHARACTER SET ‘utf8‘ COLLATE ‘utf8_general_ci‘; DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` ( `id` BIGINT (20) NOT NULL AUTO_INCREMENT, `account_name` VARCHAR (100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘户 主姓名‘, `account_no` VARCHAR (100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘银行 卡号‘, `account_password` VARCHAR (100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT ‘帐户密码‘, `account_balance` DOUBLE NULL DEFAULT NULL COMMENT ‘帐户余额‘, PRIMARY KEY (`id`) USING BTREE ) ENGINE = INNODB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic; INSERT INTO `account_info` VALUES ( 3, ‘李四的账户‘, ‘2‘, NULL, 0 ); CREATE TABLE `de_duplication` ( `tx_no` VARCHAR (64) COLLATE utf8_bin NOT NULL, `create_time` datetime (0) NULL DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE = INNODB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
maven
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency>
properties配置
rocketmq.producer.group = producer_bank2 rocketmq.name‐server = 127.0.0.1:9876
Service:AccountInfoServiceImpl
//两个方法 //1,向mq发送转账消息 //2,更新账户,扣减金额 (通过事务id保证幂等性)
Controller:AccountInfoController
//生成事务id,调用service的发消息接口
message:ProducerTxmsgListener
//事务消息发送后的回调方法。此时保证本地事务,调用Service扣减金额同时将消息改为COMMIT(可消费状态) //如果捕获异常,将消息改为ROLLBACK回滚
Service:AccountInfoServiceImpl
//更新账户bank2,增加金额。(通过事务id保证幂等性)
message:TxmsgConsumer
//监听bank1发送的消息topic,调用Service增加金额
可靠消息最终一致性就是保证消息从生产方经过消息中间件传递到消费方的一致性,本案例使用了RocketMQ作为消息中间件,RocketMQ主要解决了两个功能:
具体流程:
与方案1不同的是应用程序向接收通知方发送通知,如下图:
具体流程:
CREATE DATABASE /*!32312 IF NOT EXISTS*/`bank1_pay` /*!40100 DEFAULT CHARACTER SET utf8 */; USE `bank1_pay`; /*Table structure for table `account_pay` */ DROP TABLE IF EXISTS `account_pay`; CREATE TABLE `account_pay` ( `id` varchar(64) COLLATE utf8_bin NOT NULL, `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT ‘账号‘, `pay_amount` double DEFAULT NULL COMMENT ‘充值余额‘, `result` varchar(20) COLLATE utf8_bin DEFAULT NULL COMMENT ‘充值结果:success,fail‘, PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC; /*Data for the table `account_pay` */ insert into `account_pay`(`id`,`account_no`,`pay_amount`,`result`) values (‘5678ef0a-1ff0-4cfd-97ac-640d749d596f‘,‘1‘,2,‘success‘),(‘7d7d469c-f100-4066-b927-014c0c3aa010‘,‘1‘,2,‘success‘),(‘947fafad-c19c-46bc-b0f0-43703a124fd4‘,‘1‘,2,‘success‘);
bank1.sql
CREATE DATABASE /*!32312 IF NOT EXISTS*/`bank1` /*!40100 DEFAULT CHARACTER SET utf8 */; USE `bank1`; /*Table structure for table `account_info` */ DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `account_name` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT ‘户主姓名‘, `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT ‘银行卡号‘, `account_password` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT ‘帐户密码‘, `account_balance` double DEFAULT NULL COMMENT ‘帐户余额‘, PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC; /*Data for the table `account_info` */ insert into `account_info`(`id`,`account_name`,`account_no`,`account_password`,`account_balance`) values (2,‘张三‘,‘1‘,NULL,1000); /*Table structure for table `de_duplication` */ DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` ( `tx_no` varchar(64) COLLATE utf8_bin NOT NULL, `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;
基本配置同可靠消息一致性
Service:AccountPayServiceImpl
//两个方法 //1,插入充值记录。生成事务id,将事务id和充值信息发送给MQ队列 //2,查询充值记录。提供给调用方查询。
Controller:AccountPayController
//直接调用Service中的方法插入充值记录
Service:AccountInfoServiceImpl
//两个方法 //1,更新账户金额。根据事务id保证更新的幂等性。 //2,远程调用pay的查询充值结果。如果发现状态改变同时更新当前账号情况。
message:NotifyMsgListener
//监听消息。调用Service的更新账户金额,幂等更新。
Controller:AccountInfoController
//调用Service的查询充值结果
最大努力通知方案是分布式事务中对一致性要求最低的一种,适用于一些最终一致性时间敏感度低的业务;最大努力通知方案需要实现如下功能:
原文:https://www.cnblogs.com/bbgs-xc/p/14456917.html