上周分享了一篇关于分布式事务实现原理的文章。本周作为实战篇的延续,将带大家深入代码层面,动手搭建一个可运行的TCC框架。
本着“理论先行,实践紧随”的理念,建议大家先理解分布式事务的理论基础,再跟随本文进入实战环节。
在撰写本文的同时,我并行完成了一个开源项目的搭建——这是一个基于 Go 语言从零实现的 TCC 分布式事务框架,项目在 GitHub 的开源地址为:https://github.com/xiaoxuxiansheng/gotcc。

本期分享将紧密围绕这个开源项目展开。受限于个人水平,项目实现与行文讲解难免有疏漏之处,权当抛砖引玉,欢迎大家批评指正。
1 架构设计
1.1 整体架构
首先,我们快速回顾一下分布式事务与TCC的核心概念。
事务的语义是“要么全做,要么全不做”,它要求针对多个操作建立一套原子性的、不可分割的执行机制。当这些操作涉及到跨数据库、跨服务等场景时,就构成了分布式事务。
分布式事务的实现颇具挑战,目前业界一种相对成熟且被广泛认可的解决方案就是 TCC(Try-Confirm/Cancel)。
TCC本质上是两阶段提交(2PC)的一种实现:
- 将分布式事务中负责状态数据变更的模块,封装成 TCC组件。
- 将数据变更状态拆分为:对应Try操作的 冻结、对应Confirm操作的 成功 以及对应Cancel操作的 失败回滚。
- 抽出一个统筹全局的 事务协调者角色 TXManager。执行分布式事务时分为两个阶段:
- 阶段 I:对所有组件执行 Try 操作。
- 阶段 II:根据第一阶段Try操作的执行结果,决定批量执行 Confirm 还是 Cancel 操作。

在我们动手实现TCC框架时,首先要明确两点:
- 哪些是TCC架构中通用的流程,可以抽取到SDK中供复用。
- 哪些需要给使用者预留足够的自由度,由他们自行实现并与通用SDK对接。
最终划分如下:
- 在TCC SDK中实现的通用逻辑:
- 事务协调器TXManager开启事务。
- Try-Confirm/Cancel的两阶段流程串联。
- TXManager的异步轮询任务,用于推进未完成事务。
- TCC组件的注册流程。
- 预定义事务日志存储模块 TXStore 的接口规范。
- 预定义TCC组件 TCCComponent 的接口规范。
- 需要由使用方自行实现:
- 具体实现 TCCComponent 类,包括其 Try、Confirm、Cancel 方法的业务逻辑。
- 具体实现 TXStore 日志存储模块,可根据需要选择底层存储组件(如MySQL、Redis等)。

1.2 TCC Component
TCC组件需要由用户实现,并在TXManager启动时注册到注册中心。当开启事务时,TXManager会通过注册中心获取并使用这些组件。
一个TCC组件需要支持以下核心操作:

1.3 TX Manager
事务协调器TXManager是整个TCC架构的核心。
- 作为gotcc的统一入口,供使用方启动事务和注册组件。
- 作为中枢,分别与RegisterCenter、TXStore交互。
- 负责串联整个Try-Confirm/Cancel的两阶段调用流程。
- 运行异步轮询任务,推进未完成的事务走向终态。

1.4 TX Store
TXStore是用于存储和管理事务日志明细的模块。
- 需要支持事务明细数据的CRUD能力。
- 通常需要底层存储组件(如数据库)支持。
- 在gotcc的SDK中体现为一个抽象接口,需要由用户实现具体类并注入到TXManager。

1.5 RegistryCenter
TCC组件注册管理中心RegistryCenter,负责为TXManager提供注册和查询TCC组件的能力。

2 TXManager 核心源码讲解
理清基本概念后,我们进入一线实战环节,剖析核心代码。
2.1 类图
首先,了解gotcc核心SDK中的几个核心类/接口:
- TXManager:事务协调器(class)。
- TXStore:事务日志存储模块(interface)。
- registryCenter:TCC组件注册管理中心(class)。
- TCCComponent:TCC组件(interface)。
它们之间的关系通过下面的UML类图展示:

2.2 核心类定义
2.2.1 TXManager
下面是事务协调器TXManager的核心字段:
txStore:内置的事务日志存储模块,需由使用方实现并注入。
registryCenter:TCC组件的注册管理中心。
opts:TXManager的配置项,可由使用方通过option模式自定义注入。
ctx:反映TXManager生命周期的context,终止时异步轮询任务退出。
stop:停止TXManager的控制器。

type TXManager struct {
ctx context.Context
stop context.CancelFunc
opts *Options
txStore TXStore
registryCenter *registryCenter
}
func NewTXManager(txStore TXStore, opts ...Option) *TXManager {
ctx, cancel := context.WithCancel(context.Background())
txManager := TXManager{
opts: &Options{},
txStore: txStore,
registryCenter: newRegistryCenter(),
ctx: ctx,
stop: cancel,
}
for _, opt := range opts {
opt(txManager.opts)
}
repair(txManager.opts)
go txManager.run()
return &txManager
}
2.2.2 RegistryCenter
注册中心registryCenter结构简单,通过一个map存储所有注册的TCC组件(要求组件ID唯一),并用读写锁保护map的并发安全。
type registryCenter struct {
mux sync.RWMutex
components map[string]component.TCCComponent
}
func newRegistryCenter() *registryCenter {
return ®istryCenter{
components: make(map[string]component.TCCComponent),
}
}
2.2.3 TXStore
以下是SDK中对事务日志存储模块TXStore接口的定义。这一点至关重要:使用方在实现具体TXStore时,必须实现以下所有方法,并确保功能符合预期。
CreateTX:创建事务记录,传入涉及的TCC组件列表,返回全局唯一的事务ID。
TXUpdate:更新事务记录,特指更新某个TCC组件的Try响应状态。
TXSubmit:提交事务的最终结果(成功或失败)。
GetHangingTXs:获取所有未完成(hanging状态)的事务记录。
GetTX:根据事务ID获取指定事务记录。
Lock:锁住整个事务日志存储模块(需为分布式锁)。
Unlock:解锁整个事务日志存储模块。
type TXStore interface {
// 创建一条事务明细记录
CreateTX(ctx context.Context, components ...component.TCCComponent) (txID string, err error)
// 更新事务进度:
// 规则为:倘若有一个 component try 操作执行失败,则整个事务失败;倘若所有 component try 操作执行成功,则事务成功
TXUpdate(ctx context.Context, txID string, componentID string, accept bool) error
// 提交事务的最终状态
TXSubmit(ctx context.Context, txID string, success bool) error
// 获取到所有处于中间态的事务
GetHangingTXs(ctx context.Context) ([]*Transaction, error)
// 获取指定的一笔事务
GetTX(ctx context.Context, txID string) (*Transaction, error)
// 锁住事务日志表
Lock(ctx context.Context, expireDuration time.Duration) error
// 解锁事务日志表
Unlock(ctx context.Context) error
}
2.3 注册组件
注册TCC组件的流程如下:
使用方通过TXManager的公开方法Register传入TCCComponent。
func (t *TXManager) Register(component component.TCCComponent) error {
return t.registryCenter.register(component)
}
- TXManager调用注册中心的
register方法,将组件注入到map中。
Register方法可并发调用,内部通过读写锁保证安全。
- TCC组件ID不能重复注册。
func (r *registryCenter) register(component component.TCCComponent) error {
r.mux.Lock()
defer r.mux.Unlock()
if _, ok := r.components[component.ID()]; ok {
return errors.New("repeat component id")
}
r.components[component.ID()] = component
return nil
}
TXManager可以通过组件ID查询TCC组件。若组件ID不存在,则返回错误。
func (r *registryCenter) getComponents(componentIDs ...string) ([]component.TCCComponent, error) {
components := make([]component.TCCComponent, 0, len(componentIDs))
r.mux.RLock()
defer r.mux.RUnlock()
for _, componentID := range componentIDs {
component, ok := r.components[componentID]
if !ok {
return nil, fmt.Errorf("component id: %s not existed", componentID)
}
components = append(components, component)
}
return components, nil
}
2.4 事务主流程
接下来是分布式事务最核心的运行流程。
2.4.1 主流程
用户可以通过txManager.Transaction方法一键启动分布式事务,其核心步骤如下图所示:

txManager.Transaction是用户启动分布式事务的入口,需在入参中声明本次事务涉及的组件及Try阶段要传递的请求参数。
type RequestEntity struct {
// 组件名称
ComponentID string `json:"componentName"`
// Try 请求时传递的参数
Request map[string]interface{} `json:"request"`
}
txManager.Transaction源码如下,核心步骤已添加注释。核心的try-confirm/cancel流程在后续的twoPhaseCommit方法中展开。
// 启动事务
func (t *TXManager) Transaction(ctx context.Context, reqs ...*RequestEntity) (bool, error) {
// 1 限制分布式事务执行时长
tctx, cancel := context.WithTimeout(ctx, t.opts.Timeout)
defer cancel()
// 2 获得所有的涉及使用的 tcc 组件
componentEntities, err := t.getComponents(tctx, reqs...)
if err != nil {
return false, err
}
// 3 调用 txStore 模块,创建新的事务明细记录,并取得全局唯一的事务 id
txID, err := t.txStore.CreateTX(tctx, componentEntities.ToComponents()...)
if err != nil {
return false, err
}
// 4. 开启两阶段提交流程:try-confirm/cancel
return t.twoPhaseCommit(ctx, txID, componentEntities)
}
2.4.2 2PC 串联
twoPhaseCommit方法串联了整个try-confirm/cancel流程,可以说是gotcc框架的精髓。对应流程图与源码如下:

func (t *TXManager) twoPhaseCommit(ctx context.Context, txID string, componentEntities ComponentEntities) (bool, error) {
// 1 创建子 context 用于管理子 goroutine 生命周期
// 手握 cancel 终止器,能保证在需要的时候终止所有子 goroutine 生命周期
cctx, cancel := context.WithCancel(ctx)
defer cancel()
// 2 创建一个 chan,用于接收子 goroutine 传递的错误
errCh := make(chan error)
// 3 并发启动,批量执行各 tcc 组件的 try 流程
go func() {
// 通过 waitGroup 进行多个子 goroutine 的汇总
var wg sync.WaitGroup
for _, componentEntity := range componentEntities {
// shadow
componentEntity := componentEntity
wg.Add(1)
// 并发执行各组件的 try 流程
go func() {
defer wg.Done()
resp, err := componentEntity.Component.Try(cctx, &component.TCCReq{
ComponentID: componentEntity.Component.ID(),
TXID: txID,
Data: componentEntity.Request,
})
// 出现 tcc 组件执行 try 操作失败,则需要对事务明细记录进行更新,同时把错误通过 chan 抛给父 goroutine
if err != nil || !resp.ACK {
// 对对应的事务进行更新
_ = t.txStore.TXUpdate(cctx, txID, componentEntity.Component.ID(), false)
errCh <- fmt.Errorf("component: %s try failed", componentEntity.Component.ID())
return
}
// try 请求成功,则对事务明细记录进行更新. 倘若更新失败,也要视为错误,抛给父 goroutine
if err = t.txStore.TXUpdate(cctx, txID, componentEntity.Component.ID(), true); err != nil {
errCh <- err
}
}()
}
// 等待所有子 goroutine 运行完成
wg.Wait()
// 关闭 errCh,告知父 goroutine 所有任务已运行完成的信息
close(errCh)
}()
successful := true
// 4 通过 chan,阻塞子 goroutine 执行完成
// 4.1 但凡出现一个子 goroutine 遇到了错误,则会提前接收到错误,并调用 cancel 方法熔断其他所有子 goroutine 流程
// 4.2 倘若所有子 goroutine 都执行成功,则会通过 chan 的关闭事件推进流程,对应 err 为 nil
if err := <-errCh; err != nil {
// 只要有一笔 try 请求出现问题,其他的都进行终止
cancel()
successful = false
}
// 5 异步执行第二阶段的 confirm/cancel 流程
// 之所以是异步,是因为实际上在第一阶段 try 的响应结果尘埃落定时,对应事务的成败已经有了定论
// 第二阶段能够容忍异步执行的原因在于,执行失败时,还有轮询任务进行兜底
go t.advanceProgressByTXID(txID)
// 6 响应结果
// 6.1 倘若所有 try 请求都成功,则 successful 为 try,事务成功
// 6.2 但凡有一个 try 请求处理出现问题,successful 为 false,事务失败
return successful, nil
}
2.4.3 事务进度推进
当一笔事务的第一阶段所有Try请求都有响应后,需要根据结果执行第二阶段的Confirm或Cancel操作,并将事务状态推进为终态。
- 若所有组件Try成功,则批量调用Confirm,并更新事务状态为成功。
- 若任一组件Try失败或事务超时,则批量调用Cancel,并更新事务状态为失败。

// 传入一个事务 id 推进其进度
func (t *TXManager) advanceProgressByTXID(txID string) error {
// 获取事务日志明细
tx, err := t.txStore.GetTX(t.ctx, txID)
if err != nil {
return err
}
// 推进进度
return t.advanceProgress(tx)
}
// 传入一个事务 id 推进其进度
func (t *TXManager) advanceProgress(tx *Transaction) error {
// 1 推断出事务当前的状态
// 1.1 倘若所有组件 try 都成功,则为 successful
// 1.2 倘若存在组件 try 失败,则为 failure
// 1.3 倘若事务超时了,则为 failure
// 1.4 否则事务状态为 hanging
txStatus := tx.getStatus(time.Now().Add(-t.opts.Timeout))
// hanging 状态的事务暂时不处理
if txStatus == TXHanging {
return nil
}
// 2 根据事务是否成功,定制不同的处理函数
success := txStatus == TXSuccessful
var confirmOrCancel func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error)
var txAdvanceProgress func(ctx context.Context) error
if success {
// 如果事务成功,则需要对组件进行 confirm
confirmOrCancel = func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error) {
return component.Confirm(ctx, tx.TXID)
}
// 如果事务成功,则需要在最后更新事务日志记录的状态为成功
txAdvanceProgress = func(ctx context.Context) error {
return t.txStore.TXSubmit(ctx, tx.TXID, true)
}
} else {
// 如果事务失败,则需要对组件进行 cancel
confirmOrCancel = func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error) {
return component.Cancel(ctx, tx.TXID)
}
// 如果事务失败,则需要在最后更新事务日志记录的状态为失败
txAdvanceProgress = func(ctx context.Context) error {
return t.txStore.TXSubmit(ctx, tx.TXID, false)
}
}
// 3 批量调用组件,执行第二阶段的 confirm/cancel 操作
for _, component := range tx.Components {
// 获取对应的 tcc component
components, err := t.registryCenter.getComponents(component.ComponentID)
if err != nil || len(components) == 0 {
return errors.New("get tcc component failed")
}
resp, err := confirmOrCancel(t.ctx, components[0])
if err != nil {
return err
}
if !resp.ACK {
return fmt.Errorf("component: %s ack failed", component.ComponentID)
}
}
// 4 二阶段 confirm/cancel 操作都执行完成后,对事务状态进行提交
return txAdvanceProgress(t.ctx)
}
2.5 异步轮询流程
接下来是TXManager的异步轮询流程,这是保证框架鲁棒性的重要兜底机制。如果事务完成了第一阶段Try,但第二阶段执行失败或未执行,将由异步轮询任务补全操作并更新状态。
2.5.1 启动时机
异步轮询任务在TXManager初始化时启动,通过一个独立的goroutine持久运行。
func NewTXManager(txStore TXStore, opts ...Option) *TXManager {
ctx, cancel := context.WithCancel(context.Background())
txManager := TXManager{
opts: &Options{},
txStore: txStore,
registryCenter: NewRegistryCenter(),
ctx: ctx,
stop: cancel,
}
for _, opt := range opts {
opt(txManager.opts)
}
repair(txManager.opts)
go txManager.run()
return &txManager
}
2.5.2 轮询流程
异步轮询任务基于 for 循环 + select 多路复用实现定时执行。轮询间隔会根据处理过程是否出错动态调整:若出错则增大间隔,以实现节点间的退避谦让。

func (t *TXManager) run() {
var tick time.Duration
var err error
// 1 for 循环自旋式运行任务
for {
// 如果处理过程中出现了错误,需要增长轮询时间间隔
if err == nil {
tick = t.opts.MonitorTick
} else {
tick = t.backOffTick(tick)
}
// select 多路复用
select {
// 倘若 txManager.ctx 被终止,则异步轮询任务退出
case <-t.ctx.Done():
return
// 2 等待 tick 对应时长后,开始执行任务
case <-time.After(tick):
// 对 txStore 加分布式锁,避免分布式服务下多个服务节点的轮询任务重复执行
if err = t.txStore.Lock(t.ctx, t.opts.MonitorTick); err != nil {
// 取锁失败时(大概率被其他节点占有),不需要增加 tick 时长
err = nil
continue
}
// 3 获取处于 hanging 状态的事务
var txs []*Transaction
if txs, err = t.txStore.GetHangingTXs(t.ctx); err != nil {
_ = t.txStore.Unlock(t.ctx)
continue
}
// 4 批量推进事务进度
err = t.batchAdvanceProgress(txs)
_ = t.txStore.Unlock(t.ctx)
}
}
}
轮询间隔的退避策略是每次翻倍,上限为初始时长的8倍。
func (t *TXManager) backOffTick(tick time.Duration) time.Duration {
tick <<= 1
if threshold := t.opts.MonitorTick << 3; tick > threshold {
return threshold
}
return tick
}
2.5.3 批量推进事务进度
这是异步轮询任务中批量推进事务第二阶段执行的流程,核心是并发处理多项事务。
func (t *TXManager) batchAdvanceProgress(txs []*Transaction) error {
// 1 创建一个 chan,用于接收子 goroutine 传输的 err
errCh := make(chan error)
go func() {
// 2 通过 waitGroup 聚合多个子 groutine
var wg sync.WaitGroup
for _, tx := range txs {
// shadow
tx := tx
wg.Add(1)
go func() {
defer wg.Done()
// 3 推进每笔事务的进度
if err := t.advanceProgress(tx); err != nil {
// 遇到错误则投递到 errCh
errCh <- err
}
}()
}
// 4 收口等待所有子 goroutine 执行完成
wg.Wait()
// 5 所有子 goroutine 执行完成后关闭 chan,唤醒阻塞等待的父 goroutine
close(errCh)
}()
// 记录遇到的第一个错误
var firstErr error
// 6 父 goroutine 通过 chan 阻塞在这里,直到所有 goroutine 执行完成,chan 被 close 才能往下
for err := range errCh {
// 记录遇到的第一个错误
if firstErr != nil {
continue
}
firstErr = err
}
// 7 返回错误,核心是标识执行过程中,是否发生过错误
return firstErr
}
3 GOTCC 使用案例讲解
从本章开始,我们将从使用方视角出发,定义需要实现的模块,并给出应用gotcc框架的完整代码示例。
3.1 TCC 组件实现
首先,我们定义一个具体的TCC组件实现类。
3.1.1 类定义
定义MockComponent类,内置Redis客户端,用于状态数据的存取。

// 实现的 tcc 组件
type MockComponent struct {
// tcc 组件唯一标识 id,构造时由使用方传入
id string
// redis 客户端
client *redis_lock.Client
}
func NewMockComponent(id string, client *redis_lock.Client) *MockComponent {
return &MockComponent{
id: id,
client: client,
}
}
// 返回 tcc 组件的唯一标识 id
func (m *MockComponent) ID() string {
return m.id
}
3.1.2 Try 流程
实现TCC组件的Try方法,关键要点已在代码注释中说明。

func (m *MockComponent) Try(ctx context.Context, req *component.TCCReq) (*component.TCCResp, error) {
// 1 基于 txID 维度加 redis 分布式锁
lock := redis_lock.NewRedisLock(pkg.BuildTXLockKey(m.id, req.TXID), m.client)
if err := lock.Lock(ctx); err != nil {
return nil, err
}
defer func() {
_ = lock.Unlock(ctx)
}()
// 2 基于 txID 幂等性去, 需要对事务的状态进行检查
txStatus, err := m.client.Get(ctx, pkg.BuildTXKey(m.id, req.TXID))
if err != nil && !errors.Is(err, redis_lock.ErrNil) {
return nil, err
}
res := component.TCCResp{
ComponentID: m.id,
TXID: req.TXID,
}
switch txStatus {
case TXTried.String(), TXConfirmed.String(): // 重复的 try 请求,给予成功的响应
res.ACK = true
return &res, nil
case TXCanceled.String(): // 此前该事务已 cancel,则拒绝本次 try 请求
return &res, nil
default:
}
// 3 建立 txID 与 bizID 的关联
bizID := gocast.ToString(req.Data["biz_id"])
if _, err = m.client.Set(ctx, pkg.BuildTXDetailKey(m.id, req.TXID), bizID); err != nil {
return nil, err
}
// 4 把 bizID 对应的业务数据置为冻结态
reply, err := m.client.SetNX(ctx, pkg.BuildDataKey(m.id, req.TXID, bizID), DataFrozen.String())
if err != nil {
return nil, err
}
// 倘若数据此前已冻结或已使用,则拒绝本次 try 请求
if reply != 1 {
return &res, nil
}
// 5 更新当前组件下的事务状态为 tried
_, err = m.client.Set(ctx, pkg.BuildTXKey(m.id, req.TXID), TXTried.String())
if err != nil {
return nil, err
}
// 6 给予接收 try 请求的响应
res.ACK = true
return &res, nil
}
3.1.3 Confirm 流程
实现TCC组件的Confirm方法。

func (m *MockComponent) Confirm(ctx context.Context, txID string) (*component.TCCResp, error) {
// 1 基于 txID 维度加锁
lock := redis_lock.NewRedisLock(pkg.BuildTXLockKey(m.id, txID), m.client)
if err := lock.Lock(ctx); err != nil {
return nil, err
}
defer func() {
_ = lock.Unlock(ctx)
}()
// 2. 校验事务状态,要求对应组件下,事务此前的状态为 tried
txStatus, err := m.client.Get(ctx, pkg.BuildTXKey(m.id, txID))
if err != nil {
return nil, err
}
res := component.TCCResp{
ComponentID: m.id,
TXID: txID,
}
switch txStatus {
case TXConfirmed.String(): // 事务状态已 confirm,直接幂等响应为成功
res.ACK = true
return &res, nil
case TXTried.String(): // 只有事务状态为 try 才是合法的,会对程序放行
default: // 其他情况直接拒绝,ack 为 false
return &res, nil
}
// 3 获取事务对应的 bizID
bizID, err := m.client.Get(ctx, pkg.BuildTXDetailKey(m.id, txID))
if err != nil {
return nil, err
}
// 4. 校验业务数据此前状态是否为冻结
dataStatus, err := m.client.Get(ctx, pkg.BuildDataKey(m.id, txID, bizID))
if err != nil {
return nil, err
}
// 如果此前非冻结态,则拒绝本次请求
if dataStatus != DataFrozen.String() {
return &res, nil
}
// 5 把业务数据的更新操作置为 successful
if _, err = m.client.Set(ctx, pkg.BuildDataKey(m.id, txID, bizID), DataSuccessful.String()); err != nil {
return nil, err
}
// 6 把对应组件下的事务状态更新为成功,这一步哪怕失败了也不阻塞主流程
_, _ = m.client.Set(ctx, pkg.BuildTXKey(m.id, txID), TXConfirmed.String())
// 7 处理成功,给予成功的响应
res.ACK = true
return &res, nil
}
3.1.4 Cancel 流程
实现TCC组件的Cancel方法。

func (m *MockComponent) Cancel(ctx context.Context, txID string) (*component.TCCResp, error) {
// 1 基于 txID 维度加锁
lock := redis_lock.NewRedisLock(pkg.BuildTXLockKey(m.id, txID), m.client)
if err := lock.Lock(ctx); err != nil {
return nil, err
}
defer func() {
_ = lock.Unlock(ctx)
}()
// 2 校验事务状态,只要不是 confirmed,都允许被置为 canceled
txStatus, err := m.client.Get(ctx, pkg.BuildTXKey(m.id, txID))
if err != nil && !errors.Is(err, redis_lock.ErrNil) {
return nil, err
}
// 倘若组件内事务此前的状态为 confirmed,则说明流程有异常.
if txStatus == TXConfirmed.String() {
return nil, fmt.Errorf("invalid tx status: %s, txid: %s", txStatus, txID)
}
// 3 根据事务获取对应的 bizID
bizID, err := m.client.Get(ctx, pkg.BuildTXDetailKey(m.id, txID))
if err != nil {
return nil, err
}
// 4 删除对应的 frozen 冻结记录,代表对数据执行了回滚操作
if err = m.client.Del(ctx, pkg.BuildDataKey(m.id, txID, bizID)); err != nil {
return nil, err
}
// 5 把事务状态更新为 canceled
_, err = m.client.Set(ctx, pkg.BuildTXKey(m.id, txID), TXCanceled.String())
if err != nil {
return nil, err
}
// 6 给予处理成功的 ack
return &component.TCCResp{
ACK: true,
ComponentID: m.id,
TXID: txID,
}, nil
}
3.2 TX Store 实现
接下来是实现事务日志存储模块TXStore。
3.2.1 类定义
声明MockTXStore类,使用MySQL存储事务日志明细,使用Redis实现模块级的分布式锁。与数据库交互的操作封装在TXRecordDAO中。

// TXStore 模块具体实现
type MockTXStore struct {
// redis 客户端,用于实现分布式锁
client *redis_lock.Client
// 事务日志存储 DAO 层
dao *expdao.TXRecordDAO
}
func NewMockTXStore(dao *expdao.TXRecordDAO, client *redis_lock.Client) *MockTXStore {
return &MockTXStore{
dao: dao,
client: client,
}
}
事务日志存储DAO层:
type TXRecordDAO struct {
db *gorm.DB
}
func NewTXRecordDAO(db *gorm.DB) *TXRecordDAO {
return &TXRecordDAO{
db: db,
}
}
事务日志明细的持久化对象(PO)模型定义:
- 内嵌
gorm.Model(包含ID、CreatedAt等字段)。
Status:事务状态(hanging/successful/failure)。
ComponentTryStatuses:以JSON字符串格式存储各组件Try响应结果,实际类型为map[string]*ComponentTryStatus。

type TXRecordPO struct {
gorm.Model
Status string `gorm:"status"`
ComponentTryStatuses string `gorm:"component_try_statuses"`
}
func (t TXRecordPO) TableName() string {
return "tx_record"
}
type ComponentTryStatus struct {
ComponentID string `json:"componentID"`
TryStatus string `json:"tryStatus"`
}
事务日志明细表建表语句:
CREATE TABLE IF NOT EXISTS `tx_record`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`status` varchar(16) NOT NULL COMMENT '事务状态 hanging/successful/failure',
`component_try_statuses` json DEFAULT NULL COMMENT '各组件 try 接口请求状态 hanging/successful/failure',
`deleted_at` datetime DEFAULT NULL COMMENT '删除时间',
`created_at` datetime NOT NULL COMMENT '创建时间',
`updated_at` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE COMMENT '主键索引',
KEY `idx_status` (`status`) COMMENT '事务状态索引'
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='事务日志记录';
3.2.2 创建事务记录
通过TXStore模块创建事务明细记录的实现。
func (m *MockTXStore) CreateTX(ctx context.Context, components ...component.TCCComponent) (string, error) {
// 创建一个记录组件 try 响应结果的 map,其中以组件 id 为 key
componentTryStatuses := make(map[string]*expdao.ComponentTryStatus, len(components))
for _, component := range components {
componentTryStatuses[component.ID()] = &expdao.ComponentTryStatus{
ComponentID: component.ID(),
TryStatus: txmanager.TryHanging.String(),
}
}
statusesBody, _ := json.Marshal(componentTryStatuses)
// 创建事务明细记录 po 示例,调用 dao 模块将记录落库
txID, err := m.dao.CreateTXRecord(ctx, &expdao.TXRecordPO{
Status: txmanager.TXHanging.String(),
ComponentTryStatuses: string(statusesBody),
})
if err != nil {
return "", err
}
return gocast.ToString(txID), nil
}
DAO层创建事务明细记录:
func (t *TXRecordDAO) CreateTXRecord(ctx context.Context, record *TXRecordPO) (uint, error) {
return record.ID, t.db.WithContext(ctx).Model(&TXRecordPO{}).Create(record).Error
}
3.2.3 事务明细更新
更新事务明细的方法。流程是:对记录加写锁 -> 根据组件Try结果更新JSON字符串 -> 写回表。
func (m *MockTXStore) TXUpdate(ctx context.Context, txID string, componentID string, accept bool) error {
// 后续需要闭包传入执行函数
do := func(ctx context.Context, dao *expdao.TXRecordDAO, record *expdao.TXRecordPO) error {
componentTryStatuses := make(map[string]*expdao.ComponentTryStatus)
_ = json.Unmarshal([]byte(record.ComponentTryStatuses), &componentTryStatuses)
if accept {
componentTryStatuses[componentID].TryStatus = txmanager.TrySucceesful.String()
} else {
componentTryStatuses[componentID].TryStatus = txmanager.TryFailure.String()
}
newBody, _ := json.Marshal(componentTryStatuses)
record.ComponentTryStatuses = string(newBody)
return dao.UpdateTXRecord(ctx, record)
}
_txID := gocast.ToUint(txID)
return m.dao.LockAndDo(ctx, _txID, do)
}
// 通过 gorm 实现数据记录加写锁,并执行闭包函数的操作:
func (t *TXRecordDAO) LockAndDo(ctx context.Context, id uint, do func(ctx context.Context, dao *TXRecordDAO, record *TXRecordPO) error) error {
// 开启事务
return t.db.Transaction(func(tx *gorm.DB) error {
defer func() {
if err := recover(); err != nil {
tx.Rollback()
}
}()
// 加写锁
var record TXRecordPO
if err := tx.Set("gorm:query_option", "FOR UPDATE").WithContext(ctx).First(&record, id).Error; err != nil {
return err
}
txDAO := NewTXRecordDAO(tx)
// 执行闭包函数
return do(ctx, txDAO, &record)
})
}
// 更新一条事务日志数据记录
func (t *TXRecordDAO) UpdateTXRecord(ctx context.Context, record *TXRecordPO) error {
return t.db.WithContext(ctx).Updates(record).Error
}
3.2.4 查询事务
查询事务的两个方法实现。
// 根据事务 id 查询指定的一笔事务明细记录:
func (m *MockTXStore) GetTX(ctx context.Context, txID string) (*txmanager.Transaction, error) {
// 通过 option 在查询条件中注入事务 id
records, err := m.dao.GetTXRecords(ctx, expdao.WithID(gocast.ToUint(txID)))
if err != nil {
return nil, err
}
if len(records) != 1 {
return nil, errors.New("get tx failed")
}
// 对各组件 try 明细内容进行反序列化
componentTryStatuses := make(map[string]*expdao.ComponentTryStatus)
_ = json.Unmarshal([]byte(records[0].ComponentTryStatuses), &componentTryStatuses)
components := make([]*txmanager.ComponentTryEntity, 0, len(componentTryStatuses))
for _, tryItem := range componentTryStatuses {
components = append(components, &txmanager.ComponentTryEntity{
ComponentID: tryItem.ComponentID,
TryStatus: txmanager.ComponentTryStatus(tryItem.TryStatus),
})
}
return &txmanager.Transaction{
TXID: txID,
Status: txmanager.TXStatus(records[0].Status),
Components: components,
CreatedAt: records[0].CreatedAt,
}, nil
}
// 获取全量处于中间态的事务明细记录
func (m *MockTXStore) GetHangingTXs(ctx context.Context) ([]*txmanager.Transaction, error) {
// 通过 option 在查询条件中指定事务状态为 hanging
records, err := m.dao.GetTXRecords(ctx, expdao.WithStatus(txmanager.TryHanging))
if err != nil {
return nil, err
}
txs := make([]*txmanager.Transaction, 0, len(records))
for _, record := range records {
// 对各组件 try 响应结果进行反序列化
componentTryStatuses := make(map[string]*expdao.ComponentTryStatus)
_ = json.Unmarshal([]byte(record.ComponentTryStatuses), &componentTryStatuses)
components := make([]*txmanager.ComponentTryEntity, 0, len(componentTryStatuses))
for _, component := range componentTryStatuses {
components = append(components, &txmanager.ComponentTryEntity{
ComponentID: component.ComponentID,
TryStatus: txmanager.ComponentTryStatus(component.TryStatus),
})
}
txs = append(txs, &txmanager.Transaction{
TXID: gocast.ToString(record.ID),
Status: txmanager.TXHanging,
CreatedAt: record.CreatedAt,
Components: components,
})
}
return txs, nil
}
DAO层通用的查询方法,通过option模式灵活组装查询条件。
func (t *TXRecordDAO) GetTXRecords(ctx context.Context, opts ...QueryOption) ([]*TXRecordPO, error) {
db := t.db.WithContext(ctx).Model(&TXRecordPO{})
for _, opt := range opts {
db = opt(db)
}
var records []*TXRecordPO
return records, db.Scan(&records).Error
}
Option定义示例:
type QueryOption func(db *gorm.DB) *gorm.DB
// 通过事务主键 id 进行查询
func WithID(id uint) QueryOption {
return func(db *gorm.DB) *gorm.DB {
return db.Where("id = ?", id)
}
}
// 通过事务状态进行查询
func WithStatus(status txmanager.ComponentTryStatus) QueryOption {
return func(db *gorm.DB) *gorm.DB {
return db.Where("status = ?", status.String())
}
}
3.2.5 提交事务结果
在事务执行完成后,将最终结果更新到事务记录中。
// 提交事务的最终状态
func (m *MockTXStore) TXSubmit(ctx context.Context, txID string, success bool) error {
do := func(ctx context.Context, dao *expdao.TXRecordDAO, record *expdao.TXRecordPO) error {
if success {
record.Status = txmanager.TXSuccessful.String()
} else {
record.Status = txmanager.TXFailure.String()
}
return dao.UpdateTXRecord(ctx, record)
}
return m.dao.LockAndDo(ctx, gocast.ToUint(txID), do)
}
3.2.6 加/解全局锁
实现整个TXStore模块的加解锁,内部基于Redis分布式锁。
func (m *MockTXStore) Lock(ctx context.Context, expireDuration time.Duration) error {
lock := redis_lock.NewRedisLock(pkg.BuildTXRecordLockKey(), m.client, redis_lock.WithExpireSeconds(int64(expireDuration.Seconds())))
return lock.Lock(ctx)
}
func (m *MockTXStore) Unlock(ctx context.Context) error {
lock := redis_lock.NewRedisLock(pkg.BuildTXRecordLockKey(), m.client)
return lock.Unlock(ctx)
}
至此,所有前置模块已实现完毕,下面展示应用gotcc框架的完整示例。
3.3 使用代码示例
由于实现的MockTXStore和MockComponent依赖MySQL和Redis,需要配置连接信息。以下是一个单元测试示例,关键点已在注释中说明。
const (
dsn = "请输入你的 mysql dsn"
network = "tcp"
address = "请输入你的 redis ip"
password = "请输入你的 redis 密码"
)
// 使用 tcc 单测代码
func Test_TCC(t *testing.T) {
// 创建 redis 客户端
redisClient := pkg.NewRedisClient(network, address, password)
// 创建 mysql 客户端
mysqlDB, err := pkg.NewDB(dsn)
if err != nil {
t.Error(err)
return
}
// 构造三个 tcc 组件
componentAID := "componentA"
componentBID := "componentB"
componentCID := "componentC"
componentA := NewMockComponent(componentAID, redisClient)
componentB := NewMockComponent(componentBID, redisClient)
componentC := NewMockComponent(componentCID, redisClient)
// 构造出事务日志存储模块
txRecordDAO := dao.NewTXRecordDAO(mysqlDB)
txStore := NewMockTXStore(txRecordDAO, redisClient)
// 构造出 txManager 模块
txManager := txmanager.NewTXManager(txStore, txmanager.WithMonitorTick(time.Second))
defer txManager.Stop()
// 完成三个组件的注册
if err := txManager.Register(componentA); err != nil {
t.Error(err)
return
}
if err := txManager.Register(componentB); err != nil {
t.Error(err)
return
}
if err := txManager.Register(componentC); err != nil {
t.Error(err)
return
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
// 启动分布式事务
success, err := txManager.Transaction(ctx, []*txmanager.RequestEntity{
{ComponentID: componentAID,
Request: map[string]interface{}{
"biz_id": componentAID + "_biz",
},
},
{ComponentID: componentBID,
Request: map[string]interface{}{
"biz_id": componentBID + "_biz",
},
},
{ComponentID: componentCID,
Request: map[string]interface{}{
"biz_id": componentCID + "_biz",
},
},
}...)
if err != nil {
t.Errorf("tx failed, err: %v", err)
return
}
if !success {
t.Error("tx failed")
return
}
// 分布式事务处理成功
t.Log("success")
}
4 总结
本文围绕基于Go语言实现的TCC分布式事务框架gotcc,进行了从架构设计到代码实战的详细讲解。核心内容总结如下:
- 本期从零搭建了TCC框架开源项目gotcc,并进行了源码级解读。
- 在gotcc中,将事务协调器TXManager的核心逻辑(如两阶段流程串联、组件注册、异步轮询)实现在SDK中,供应用方复用。
- 同时定义了TCC组件和事务日志存储模块的抽象接口,这部分需由应用方根据业务实现并注入。
gotcc的开源地址为 https://github.com/xiaoxuxiansheng/gotcc 。项目还有许多可优化空间,欢迎大家在云栈社区交流探讨。本文旨在提供一种清晰的TCC实现思路,个人水平有限,不足之处恳请指正。