之前介绍过分布式事务之Seata AT模式,这篇文章接着介绍如何使用Seata TCC模式。
TCC 是分布式事务中的二阶段提交协议,它的全称为 Try-Confirm-Cancel,即资源预留(Try)、确认操作(Confirm)、取消操作(Cancel),他们的具体含义如下:
一个分布式的全局事务,整体是两阶段提交Try-[Comfirm/Cancel] 的模型。在Seata中,AT模式与TCC模式事实上都是两阶段提交的具体实现。
他们的区别在于:
AT 模式基于支持本地 ACID 事务 的 关系型数据库(目前支持Mysql、Oracle与PostgreSQL):
TCC 模式不依赖于底层数据资源的事务支持,且Try-Confirm-Cancel三者完全由开发者自行开发定义:
AT和TCC模式在一阶段都会提交本地事务,二阶段都是异步执行,
相较于传统XA阻塞型事务
- 性能更高
- 无法做到强一致性,而是最终一致性,需要处理并且能够接受中间状态(软状态)
所谓 TCC 模式,是指支持把 自定义的 分支事务纳入到全局事务的管理中。
简单点概括,SEATA的TCC模式就是手工的AT模式,它允许你自定义两阶段的处理逻辑而不依赖AT模式的undo_log。
TCC 是一种侵入式的分布式事务解决方案,Try-Confirm-Cancel三个操作都需要业务系统自行实现,对业务系统有着非常大的入侵性,设计相对复杂,但优点是 TCC 完全不依赖数据库,能够实现跨数据库、跨应用资源管理,通过侵入式的编码方式来完成对不同数据源的访问,并将其纳入到分布式事务管理中,更好地解决了在各种复杂业务场景下的分布式事务问题。
如果服务中仅涉及到DB(Mysql、Oracle、PostgreSQL)持久化,Seata的AT模式基本上就足够了,且AT模式除了使用@GlobalTransactional注解外几乎不侵入代码(非侵入式)。
但是当服务中涉及如下情况,则可考虑使用TCC模式:
注:TCC模式对业务侵入较大,本文的选型建议是在仅考虑AT、TCC模式下给出,
实际使用时可亦可考虑其他更轻量、侵入低的分布式事务实现方式,如可靠消息、SAGA等模式。
本示例基于Seata Server 1.5.2版本,关于Seata Server 1.5.2的升级过程可参见我之前的博客:升级Seata Server 1.5.2
且Seata1.5版本后解决了TCC模式下的幂等、空回滚、悬挂的问题,
若需支持此特性,还需在各自服务的业务数据库中额外导入表tcc-fence-log:
CREATE TABLE IF NOT EXISTS `tcc_fence_log`
(`xid` VARCHAR(128) NOT NULL COMMENT 'global id',`branch_id` BIGINT NOT NULL COMMENT 'branch id',`action_name` VARCHAR(64) NOT NULL COMMENT 'action name',`status` TINYINT NOT NULL COMMENT 'status(tried:1;committed:2;rollbacked:3;suspended:4)',`gmt_create` DATETIME(3) NOT NULL COMMENT 'create time',`gmt_modified` DATETIME(3) NOT NULL COMMENT 'update time',PRIMARY KEY (`xid`, `branch_id`),KEY `idx_gmt_modified` (`gmt_modified`),KEY `idx_status` (`status`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
结合Seata官网下单流程示例,具体讲解Java客户端如何集成Seata TCC模式
示例模拟了一个下单流程,即由业务应用Business依次调用(服务间采用Openfeign Http调用)
Maven依赖、客户端Nacos配置说明可参见我之前的博客:分布式事务 - Seata - AT入门 => 三、AT模式
完整的示例代码可参见:https://gitee.com/luoex/distributed-transaction-demo/tree/master/seata-tcc
相关的Nacos配置、Sql定义可参见:https://gitee.com/luoex/distributed-transaction-demo/tree/master/config/seata
TCC核心就是Try-Confrim-Cancel三段逻辑的实现,在Seata中可通过定义一个接口及对应的方法来标记这三段逻辑,之后再具体实现该接口。同时由于我们使用的是 OpenFeign(基于Http协议),因此需在接口声明处使用@LocalTCC注解。TCC模式相关注解说明如下:
注:以上TCC相关注解仅在接口定义中进行标记即可,具体的接口实现类中无需再重复标记。
以库存服务为例,作为分支事务其TCC接口定义如下:
import com.luo.dt.common.model.result.RespResult;
import com.luo.dt.seata.tcc.model.dto.DeductStorageDto;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;/*** 库存信息 服务类
* 注:在Tcc接口上标注@LocalTcc注解** @author luohq* @since 2021-11-02*/
@LocalTCC
public interface IStorageService {/*** 减库存** @param deductStorageDto 扣库存参数* @return 响应结果*/@TwoPhaseBusinessAction(//该tcc的bean名称,写方法名便可,全局唯一name = "deduct",//二阶段确认方法commitMethod = "commitDeduct",//二阶段取消方法rollbackMethod = "cancelDeduct",//启用tcc防护(避免幂等、空回滚、悬挂)useTCCFence = true)RespResult deduct(@BusinessActionContextParameter("deductStorageDto") DeductStorageDto deductStorageDto);/*** 确认方法,与@TwoPhaseBusinessAction.commitMethod对应* 注:context可以传递try方法的参数** @param context 上下文* @return 是否成功*/Boolean commitDeduct(BusinessActionContext context);/*** 回滚方法,与@TwoPhaseBusinessAction.rollbackMethod对应* 注:context可以传递try方法的参数** @param context 上下文* @return 是否成功*/Boolean cancelDeduct(BusinessActionContext context);}
除了以上单独为TCC定义的接口(仅包含TCC三个方法),也可以在我们普通的服务接口上(包含多个业务方法)通过TCC相关注解仅对相关方法进行标记即可,如我实际示例代码中集成了Mybatis-Plus的IService接口,该接口定义了Mybatis-Plus框架内置的许多方法,我们仅需对TCC相关的业务方法进行标记即可。
interface IStorageService extends IService { ... }
TCC模式下各服务的Try-Confirm-Cancel实现内容如下:
注:
Order服务的Cancel阶段正常应该实现Try创建订单的补偿操作,即删除之前Try方法中保存的DB订单信息,
但由于当前Order服务位于整个服务调用链路的最后一位,执行到Order服务的Try阶段,即也意味着前面的Storage、Account服务的Try阶段都成功了,
不存在Order服务的Try阶段执行成功了还需进行全局回滚的情况,所以也就没必要实现Order服务的Cancel方法(其实就是懒😓)。
若需实现此Cancel方法,后文具体代码示例中有给出相关说明。
首先Business服务通过 @GlobalTransactional 标记的方法开启全局事务,然后依次调用Storage服务、Order服务 --> Account服务。
服务仅通过RPC协议(本例采用HTTP协议)暴露Try方法即可,且服务间的调用采用OpenFeign(Http协议)。
该编程模型需要注意的点如下:
TCC模式中的分支事务的Try方法组成了服务间的调用链,也就是说Try方法是通过RPC同步调用的,同步返回结果,而Confirm、Cancel都是异步调用的,无法同步返回结果。若上游服务需要下游服务的返回结果,如Business服务需要同步获取Order服务的订单创建信息,则订单创建信息需在Order服务的Try方法中返回;
仅当所有分支事务的Try操作均成功,才会提交全局事务,触发所有分支事务的Confirm方法;
若任一分支事务的Try操作出现异常,则回滚全局事务,触发 其他分支(不包括当前Try操作异常的分支) 事务的Cancel方法;
Try、Confirm、Cancel方法需保证自身的本地事务实现,如通过@Transactional注解修饰。
业务逻辑放在Try 或 Confirm?
Business服务开启全局事务代码如下:
/*** 业务服务 - 接口类** @author luohq* @date 2022-12-03*/
public interface BusinessService {/*** 创建订单** @param businessDto 业务参数* @return 响应结果*/RespResult handleBusinessAt(BusinessDto businessDto);}---------------------------------------------------------------------/*** 业务服务 - 实现类** @author luo* @date 2022-12-03*/
@Service
@Slf4j
public class BusinessServiceImpl implements BusinessService {@Resourceprivate StorageFeignClient storageFeignClient;@Resourceprivate OrderFeignClient orderFeignClient;/*** 下单操作 - TCC全局事务通过@GlobalTransctional注解发起** @param businessDto 业务参数* @return 响应结果*/@Override@GlobalTransactional(timeoutMills = 60000 * 2)public RespResult handleBusinessAt(BusinessDto businessDto) {log.info("开始TCC全局事务,XID={}", RootContext.getXID());/** 扣减库存 */DeductStorageDto deductStorageDto = new DeductStorageDto(businessDto.getCommodityCode(), businessDto.getCount());log.info("RPC扣减库存,参数:{}", deductStorageDto);RespResult storageResult = this.storageFeignClient.deduct(deductStorageDto);log.info("RPC扣减库存,结果:{}", storageResult);if (!RespResult.isSuccess(storageResult)) {throw new MsgRuntimeException("RPC扣减库存 - 返回失败结果!");}/** 创建订单 */CreateOrderDto createOrderDto = new CreateOrderDto(businessDto.getUserId(), businessDto.getCommodityCode(), businessDto.getCount());log.info("RPC创建订单,参数:{}", createOrderDto);RespResult orderResult = this.orderFeignClient.createOrder(createOrderDto);log.info("RPC创建订单,结果:{}", orderResult);if (!RespResult.isSuccess(orderResult)) {throw new MsgRuntimeException("RPC创建订单 - 返回失败结果!");}return orderResult;}}
注:
若全局事务发起者除了发起服务RPC调用,也需要实现自身对应的分支事务TCC处理逻辑,
则可单独定义并实现TCC接口,然后在@GlobalTransactional方法中调用该TCC接口即可,
且在Seata中支持TCC模式和AT模式的混合使用。
Storage服务TCC事务核心实现代码如下:
/*** 库存信息 服务类
* 注:在Tcc接口上标注@LocalTcc注解** @author luohq* @since 2022-12-03*/
@LocalTCC
public interface IStorageService extends IService {/*** 减库存** @param deductStorageDto 扣库存参数* @return 响应结果*/@TwoPhaseBusinessAction(//该tcc的bean名称,写方法名便可,全局唯一name = "deduct",//二阶段确认方法commitMethod = "commitDeduct",//二阶段取消方法rollbackMethod = "cancelDeduct",//启用tcc防护(避免幂等、空回滚、悬挂)useTCCFence = true)RespResult deduct(@BusinessActionContextParameter("deductStorageDto") DeductStorageDto deductStorageDto);/*** 确认方法,与@TwoPhaseBusinessAction.commitMethod对应* 注:context可以传递try方法的参数** @param context 上下文* @return 是否成功*/Boolean commitDeduct(BusinessActionContext context);/*** 回滚方法,与@TwoPhaseBusinessAction.rollbackMethod对应* 注:context可以传递try方法的参数** @param context 上下文* @return 是否成功*/Boolean cancelDeduct(BusinessActionContext context);}----------------------------------------------------------------/*** 库存信息 服务实现类** @author luohq* @since 2022-12-03*/
@Service
@Slf4j
public class StorageServiceImpl extends ServiceImpl implements IStorageService {private final String CACHE_STORAGE_KEY_FORMAT = "storage:%s";@Resourceprivate RedisTemplate redisTemplate;/*** 分支事务仅使用普通@Transactional注解即可*/@Overridepublic RespResult deduct(DeductStorageDto deductStorageDto) {log.info("开始TCC分支事务,XID={}", RootContext.getXID());log.info("扣减商品库存,参数: {}", deductStorageDto);/** 模拟回滚异常 */if ("product-2".equals(deductStorageDto.getCommodityCode())) {throw new MsgRuntimeException("异常:模拟业务异常:Storage branch exception");}/** 扣减缓存中的商品库存 */String cacheKey = String.format(CACHE_STORAGE_KEY_FORMAT, deductStorageDto.getCommodityCode());Long cacheStorageCount = this.redisTemplate.opsForValue().decrement(cacheKey, deductStorageDto.getCount());log.info("扣减后的商品库存: {}={}", cacheKey, cacheStorageCount);if (cacheStorageCount < 0) {throw new MsgRuntimeException("扣减商品缓存库存失败!");}return RespResult.success();}@Override@Transactional(rollbackFor = Exception.class)public Boolean commitDeduct(BusinessActionContext context) {//获取事务上下文传递的参数DeductStorageDto deductStorageDto = context.getActionContext("deductStorageDto", DeductStorageDto.class);log.info("TCC提交成功, XID={}, deductStorageDto={}", context.getXid(), deductStorageDto);/** 扣减DB中的商品库存 */Integer retCount = this.baseMapper.deductStorage(deductStorageDto.getCommodityCode(), deductStorageDto.getCount());log.info("修改商品库存,结果: {}", retCount);//修改商品库存失败,则直接回滚if (0 >= retCount) {throw new MsgRuntimeException("修改商品库存失败!");}return true;}@Overridepublic Boolean cancelDeduct(BusinessActionContext context) {//获取事务上下文传递的参数DeductStorageDto deductStorageDto = context.getActionContext("deductStorageDto", DeductStorageDto.class);log.warn("TCC回滚业务, XID={}, deductStorageDto={}", context.getXid(), deductStorageDto);/** 还原缓存中的商品库存 */String cacheKey = String.format(CACHE_STORAGE_KEY_FORMAT, deductStorageDto.getCommodityCode());Long cacheStorageCount = this.redisTemplate.opsForValue().increment(cacheKey, deductStorageDto.getCount());log.info("还原后的商品库存: {}={}", cacheKey, cacheStorageCount);return true;}
}
Order服务TCC事务核心实现代码如下:
/*** 订单信息 服务类
* 注:在Tcc接口上标注@LocalTcc注解** @author luohq* @date 2022-12-12*/
@LocalTCC
public interface IOrderService extends IService {/*** 创建订单(用户扣款、创建订单)** @param createOrderDto 创建订单参数* @return 响应结果*/@TwoPhaseBusinessAction(//该tcc的bean名称,写方法名便可,全局唯一name = "create",//二阶段确认方法commitMethod = "commitCreate",//二阶段取消方法rollbackMethod = "cancelCreate",//启用tcc防护(避免幂等、空回滚、悬挂)useTCCFence = true)RespResult create(@BusinessActionContextParameter("createOrderDto") CreateOrderDto createOrderDto);/*** 确认方法,与@TwoPhaseBusinessAction.commitMethod对应* 注:context可以传递try方法的参数** @param context 上下文* @return 是否成功*/Boolean commitCreate(BusinessActionContext context);/*** 回滚方法,与@TwoPhaseBusinessAction.rollbackMethod对应* 注:context可以传递try方法的参数** @param context 上下文* @return 是否成功*/Boolean cancelCreate(BusinessActionContext context);
}---------------------------------------------------------------/*** 订单信息 服务实现类** @author luohq* @date 2022-12-12*/
@Service
@Slf4j
public class OrderServiceImpl extends ServiceImpl implements IOrderService {@Resourceprivate AccountFeignClient accountFeignClient;/*** 分支事务仅使用普通@Transactional注解即可*/@Override@Transactional(rollbackFor = Exception.class)public RespResult create(CreateOrderDto createOrderDto) {log.info("开始TCC分支事务,XID={}", RootContext.getXID());//计算订单金额(假设商品单价5元)BigDecimal orderMoney = new BigDecimal(createOrderDto.getCount()).multiply(new BigDecimal(5));/** 用户扣款 */RespResult respResult = accountFeignClient.debit(new DebitDto(createOrderDto.getUserId(), orderMoney));log.info("RPC用户扣减余额服务,结果:{}", respResult);if (!RespResult.isSuccess(respResult)) {throw new MsgRuntimeException("RPC用户扣减余额服务失败!");}/** 创建订单 */Order order = new Order();order.setUserId(createOrderDto.getUserId());order.setCommodityCode(createOrderDto.getCommodityCode());order.setCount(createOrderDto.getCount());order.setMoney(orderMoney);log.info("保存订单信息,参数:{}", order);Boolean result = this.save(order);log.info("保存订单信息,结果:{}", result);if (!Boolean.TRUE.equals(result)) {throw new MsgRuntimeException("保存新订单信息失败!");}if ("product-3".equals(createOrderDto.getCommodityCode())) {throw new MsgRuntimeException("异常:模拟业务异常:Order branch exception");}return RespResult.successData(order);}@Overridepublic Boolean commitCreate(BusinessActionContext context) {log.info("TCC提交成功, XID={}, createOrderDto={}", context.getXid(), context.getActionContext("createOrderDto"));return true;}@Overridepublic Boolean cancelCreate(BusinessActionContext context) {log.warn("TCC回滚业务, XID={}, createOrderDto={}", context.getXid(), context.getActionContext("createOrderDto"));//此处正常应该实现create创建订单的补偿方法,即删除之前create方法中保存的DB订单信息,//但由于当前Order服务位于整个服务调用链路的最后一位,执行到Order服务的Try阶段,即也意味着前面的Storage、Account服务的Try阶段都成功了,//不存在Order服务的Try阶段执行成功了还需进行全局回滚的情况,所以也就没必要实现Order服务的Cancel方法(其实就是懒-_-|||)。//若需实现create创建订单的补偿方法,可在DB中新创建个表,如order_tx_relation(order_id, tx_id)//然后在create方法保存order成功后,同时将 (新生成的order_id, 全局事务XID) 的绑定关系保存到order_tx_relation表,//在此Cancel方法中通过XID查询到对应的order_id,然后删除此order_id对应的订单信息。return true;}
}
注1:
Order服务的Cancel阶段正常应该实现Try创建订单的补偿操作,即删除之前Try方法中保存的DB订单信息,
但由于当前Order服务位于整个服务调用链路的最后一位,执行到Order服务的Try阶段,即也意味着前面的Storage、Account服务的Try阶段都成功了,
不存在Order服务的Try阶段执行成功了还需进行全局回滚的情况,所以也就没必要实现Order服务的Cancel方法(其实就是懒😓)。
注2:
若需实现create创建订单的补偿方法,可在DB中新创建个表,如order_tx_relation(order_id, tx_id)
然后在create方法保存order成功后,同时将 (新生成的order_id, 全局事务XID) 的绑定关系保存到order_tx_relation表,
在此Cancel方法中通过XID查询到对应的order_id,然后删除此order_id对应的订单信息。
Account服务TCC事务核心实现代码如下:
*** 用户信息 服务类
* 注:在Tcc接口上标注@LocalTcc注解** @author luohq* @date 2022-12-12*/
@LocalTCC
public interface IAccountService extends IService {/*** 用户扣款** @param debitDto 扣款参数* @return 返回结果*/@TwoPhaseBusinessAction(//该tcc的bean名称,写方法名便可,全局唯一name = "debit",//二阶段确认方法commitMethod = "commitDebit",//二阶段取消方法rollbackMethod = "cancelDebit",//启用tcc防护(避免幂等、空回滚、悬挂)useTCCFence = true)RespResult debit(@BusinessActionContextParameter("debitDto") DebitDto debitDto);/*** 确认方法,与@TwoPhaseBusinessAction.commitMethod对应* 注:context可以传递try方法的参数** @param context 上下文* @return 是否成功*/Boolean commitDebit(BusinessActionContext context);/*** 回滚方法,与@TwoPhaseBusinessAction.rollbackMethod对应* 注:context可以传递try方法的参数** @param context 上下文* @return 是否成功*/Boolean cancelDebit(BusinessActionContext context);
}-----------------------------------------------------------------------------------------/*** 用户信息 服务实现类** @author luohq* @date 2022-12-12*/
@Service
@Slf4j
public class AccountServiceImpl extends ServiceImpl implements IAccountService {/*** 分支事务仅使用普通@Transactional注解即可*/@Override@Transactional(rollbackFor = Exception.class)public RespResult debit(DebitDto debitDto) {log.info("开始TCC分支事务,XID={}", RootContext.getXID());log.info("用户扣款,参数:{}", debitDto);int retCount = this.baseMapper.debit(debitDto.getUserId(), debitDto.getMoney());log.info("用户扣款,结果:{}", retCount);if (0 >= retCount) {throw new MsgRuntimeException("用户扣款失败!");}return RespResult.success();}@Overridepublic Boolean commitDebit(BusinessActionContext context) {log.info("TCC提交成功, XID={}, debitDto={}", context.getXid(), context.getActionContext("debitDto"));return true;}@Override@Transactional(rollbackFor = Exception.class)public Boolean cancelDebit(BusinessActionContext context) {DebitDto debitDto = context.getActionContext("debitDto", DebitDto.class);log.warn("TCC回滚业务, XID={}, debitDto={}", context.getXid(), debitDto);log.info("用户补款,参数:{}", debitDto);int retCount = this.baseMapper.debit(debitDto.getUserId(), debitDto.getMoney().multiply(new BigDecimal(-1)));log.info("用户补款,结果:{}", retCount);return true;}
}
TCC 模式中存在的三大问题:幂等、空回滚、悬挂。
幂等(Confirm/Cancel方法可能被多次调用)
在 Confirm/Cancel 阶段,因为 TC 没有收到分支事务的响应,需要进行重试,这就要分支事务Try/Cancel方法支持幂等。
空回滚(Try方法没被执行,却触发了Cancel方法被执行)
在 Try 阶段,分支事务所在节点发生了故障,Try 阶段在不考虑重试的情况下,全局事务必须要走向结束状态(全局事务回滚),这个时候其实是没有执行 Try方法,当故障节点恢复后,分布式事务进行回滚则会调用二阶段的 Cancel 方法,从而形成空回滚。
悬挂(Cancel方法优先于Try方法执行)
悬挂是指因为网络问题,RM 开始没有收到Try指令,但是执行了Cancel方法后 RM 又收到了 Try 指令并且预留资源成功,这时全局事务已经结束,最终导致预留的资源不能释放。
在 Seata1.5.1 版本中,增加了一张事务控制表tcc_fence_log,该表就是来解决这个问题。而在之前 3.3 TCC核心接口定义一章中 @TwoPhaseBusinessAction 注解中的属性 useTCCFence
就是来指定是否开启这个机制,useTCCFence属性值默认是 false(不开启)。若需开启TCC防护则需指定useTCCFence=true,同时在分支事务所在服务的DB中导入如下tcc_fence_log表
:
CREATE TABLE IF NOT EXISTS `tcc_fence_log`
(`xid` VARCHAR(128) NOT NULL COMMENT 'global id',`branch_id` BIGINT NOT NULL COMMENT 'branch id',`action_name` VARCHAR(64) NOT NULL COMMENT 'action name',`status` TINYINT NOT NULL COMMENT 'status(tried:1;committed:2;rollbacked:3;suspended:4)',`gmt_create` DATETIME(3) NOT NULL COMMENT 'create time',`gmt_modified` DATETIME(3) NOT NULL COMMENT 'update time',PRIMARY KEY (`xid`, `branch_id`),KEY `idx_gmt_modified` (`gmt_modified`),KEY `idx_status` (`status`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
TCC Fence相关的详细代码可参见:
io.seata.rm.tcc.TCCResourceManager
io.seata.rm.tcc.TCCFenceHandler
大体实现思路如下:
注: 分支事务生命周期内xid、branch_id是不变且全局唯一的,其中的action_name即对应@TwoPhaseBusinessAction.name属性。
Try阶段
status_tried:1
)Confirm阶段
status_tried:1
,则修改状态为status_commited:2
并执行Confirm方法status_commited:2
说明已经执行过Confirm方法,直接返回成功,不再重复调用Confirm方法,保证了Confirm阶段的幂等性Cancel阶段
status_suspended:4
) ,同时无需调用Cancel方法直接返回,避免了Cancel阶段空回滚 status_tried:1
,则执行Cancel方法且修改状态为status_rollbacked:3
status_rollbacked:3 或 status_suspended:4
说明已经执行过Cancel方法,直接返回成功,不再重复调用Cancel方法,证了Cancel阶段的幂等性参考:
http://seata.io/zh-cn/blog/integrate-seata-tcc-mode-with-spring-cloud.html
http://seata.io/zh-cn/blog/seata-tcc-fence.html
http://seata.io/zh-cn/blog/seata-tcc.html