目录

MySQL-14 分布式事务

MySQL 分布式事务


目录

  1. 为什么需要分布式事务
  2. 理论基础:CAP 与 BASE
  3. 2PC:两阶段提交
  4. 3PC:三阶段提交
  5. MySQL XA 事务
  6. TCC 模式
  7. Saga 模式
  8. 本地消息表(Transactional Outbox)
  9. 方案对比与选型

1. 为什么需要分布式事务

1.1 问题场景

单机 InnoDB 事务能保证同一数据库内的 ACID。但在微服务架构下,一次下单操作可能跨越多个独立服务、独立数据库:

用户点击"立即购买",触发以下操作:

  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
  │   订单服务       │    │   商品服务       │    │   账户服务       │
  │   order_db      │    │   product_db    │    │   account_db    │
  │                 │    │                 │    │                 │
  │  INSERT orders  │    │  UPDATE stock   │    │  UPDATE balance │
  │  (创建订单)      │    │  (扣减库存)      │    │  (扣减余额)      │
  └─────────────────┘    └─────────────────┘    └─────────────────┘

  三个操作分别在三个独立的数据库中,普通事务无法跨库

可能出现的问题

故障点 结果
订单创建成功,库存扣减失败 有订单,无库存扣减 → 超卖
订单创建成功,扣款失败 有订单,钱没扣 → 资损
网络超时,不知道对方是否成功 状态不确定 → 数据不一致

1.2 核心难题

分布式事务的核心难题:
  1. 原子性:多个节点的操作要么全部成功,要么全部回滚
  2. 不确定性:网络超时时,无法判断对方是否已经执行
  3. 故障恢复:任意节点崩溃后,系统能自动恢复到一致状态

2. 理论基础:CAP 与 BASE

2.1 CAP 定理

分布式系统三个特性最多同时满足两个

C(Consistency)         强一致性:所有节点同一时刻看到的数据相同
A(Availability)        可用性:每个请求都能收到响应(非错误)
P(Partition Tolerance) 分区容错性:网络分区时系统仍能运行

网络分区是客观必然(物理网络会发生中断),所以P 必须保证,实际是在 CP 和 AP 中选择:

选择 典型系统 行为
CP ZooKeeper、etcd 网络分区时拒绝服务,保证数据一致
AP Cassandra、DNS 网络分区时继续服务,允许数据短暂不一致

2.2 BASE 理论

BASE 是互联网系统对 CAP 中 AP 方向的实践总结,与 ACID 相对:

BA(Basically Available)   基本可用:核心功能正常,非核心功能可以降级
S(Soft State)             软状态:允许节点间数据存在短暂不一致的中间状态
E(Eventually Consistent)  最终一致性:经过一段时间(毫秒~秒级),所有节点数据达到一致

绝大多数互联网业务选择 BASE:放弃实时强一致性,换取高可用和高性能。


3. 2PC:两阶段提交

3.1 角色与流程

2PC(Two-Phase Commit)包含两个角色:

  • 协调者(Coordinator):驱动整个流程,做最终决策
  • 参与者(Participant):执行实际操作,听从协调者指令
┌────────────────────────────────────────────────────────────┐
│                    阶段一:Prepare                           │
│                                                            │
│  协调者 ──Prepare──▶ 参与者A:执行SQL,写Undo/Redo Log,锁资源,回复 Yes │
│          ──Prepare──▶ 参与者B:执行SQL,写Undo/Redo Log,锁资源,回复 Yes │
│          ──Prepare──▶ 参与者C:执行SQL,写Undo/Redo Log,锁资源,回复 No  │
└────────────────────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────────┐
│                    阶段二:Commit / Rollback                 │
│                                                            │
│  全部 Yes → 协调者 ──Commit──▶ 所有参与者提交事务,释放锁    │
│  任一 No  → 协调者 ──Rollback─▶ 所有参与者回滚事务,释放锁   │
└────────────────────────────────────────────────────────────┘

3.2 2PC 的三个核心问题

问题 1:同步阻塞

Prepare 阶段参与者锁定资源后,必须等协调者的第二阶段指令。期间所有锁一直被持有,其他事务只能等待,性能差

问题 2:协调者单点故障

时序:
  协调者 ──Commit──▶ 参与者A  ✅ A 已提交
  协调者    宕机
  参与者B/C  ❓  没有收到指令,一直阻塞

结果:A 提交了,B/C 没有 → 数据不一致,且无法自动恢复

问题 3:脑裂

协调者发送 Commit,网络分区:
  参与者A 收到 Commit → 提交
  参与者B 没收到 → 一直等待

即使协调者恢复,也无法区分 B 是网络问题还是已宕机

4. 3PC:三阶段提交

3PC 在 2PC 基础上做了两个改进:增加 CanCommit 预检阶段 + 引入超时自动提交机制。

阶段一:CanCommit
  协调者问:你能执行吗?
  参与者:检查资源可用性,回复 Yes/No(不锁资源)

阶段二:PreCommit
  所有 Yes → 协调者发 PreCommit,参与者锁资源、执行SQL、写日志
  参与者等待 DoCommit,若超时则自动中断(解决阻塞)

阶段三:DoCommit
  所有 Ack → 协调者发 DoCommit,参与者提交
  若参与者等待 DoCommit 超时 → 默认提交(非回滚,这是关键区别)

3PC 的局限:DoCommit 阶段网络分区时,收到 Rollback 的参与者回滚,超时默认提交的参与者提交,仍然不一致。实现复杂而收益有限,实际业务中几乎不直接使用。


5. MySQL XA 事务

MySQL InnoDB 实现了标准 XA 协议,是数据库层面的 2PC 实现。

5.1 XA 基本语法

-- 开始 XA 事务,xid 全局唯一
XA START 'order-20240101-001';

INSERT INTO orders (user_id, product_id, quantity, amount, status)
VALUES (42, 101, 2, 199.00, 'pending');

XA END 'order-20240101-001';
XA PREPARE 'order-20240101-001';   -- Phase 1 完成,持久化到磁盘

-- 协调者做决策
XA COMMIT  'order-20240101-001';   -- Phase 2 提交
-- 或
XA ROLLBACK 'order-20240101-001'; -- Phase 2 回滚

5.2 XA 状态机

XA START   → ACTIVE(事务进行中)
XA END     → IDLE(SQL 执行完,等待 Prepare)
XA PREPARE → PREPARED(Prepare 成功,可以 Commit 或 Rollback)
XA COMMIT  → 结束
XA ROLLBACK → 结束

PREPARED 是关键状态:MySQL 重启后仍然存在,等待协调者指令

5.3 崩溃恢复

-- 查看所有处于 PREPARED 状态的 XA 事务
XA RECOVER;
-- +----------+--------------+--------------+------------------------+
-- | formatID | gtrid_length | bqual_length | data                   |
-- +----------+--------------+--------------+------------------------+
-- |        1 |           20 |            0 | order-20240101-001     |

-- 根据协调者日志决定提交还是回滚
XA COMMIT  'order-20240101-001';
XA ROLLBACK 'order-20240101-001';

5.4 XA 的限制

1. 性能差:Prepare 阶段行锁持续到 Commit,等待期间阻塞大量请求
2. 不支持与普通 BEGIN 事务嵌套
3. 不支持 SAVEPOINT
4. 协调者本身仍然是单点,宕机后需要人工干预
5. 适用于跨库但同机构的低并发场景,不适合高并发互联网业务

6. TCC 模式

TCC(Try-Confirm-Cancel)是业务层实现的两阶段提交,对数据库无特殊要求。

6.1 三个阶段

Try(预留资源):
  检查业务可行性
  冻结/预占资源(不真正扣减,只是标记"已预留")
  操作必须幂等

Confirm(确认执行):
  使用 Try 阶段预留的资源,执行真正的业务操作
  只要 Try 成功,Confirm 必定能成功
  操作必须幂等

Cancel(释放资源):
  释放 Try 阶段预留的资源,恢复原始状态
  需要处理"空回滚":Try 未执行时 Cancel 被调用
  操作必须幂等

6.2 数据库表设计

订单服务(order_db)

CREATE TABLE orders (
    id           BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    order_no     VARCHAR(64) NOT NULL UNIQUE COMMENT '业务订单号',
    user_id      BIGINT UNSIGNED NOT NULL,
    product_id   BIGINT UNSIGNED NOT NULL,
    quantity     INT UNSIGNED NOT NULL,
    amount       DECIMAL(10, 2) NOT NULL,
    status       ENUM('pending','confirmed','cancelled') NOT NULL DEFAULT 'pending',
    created_at   DATETIME NOT NULL DEFAULT NOW(),
    updated_at   DATETIME NOT NULL DEFAULT NOW() ON UPDATE NOW(),
    INDEX idx_user_id (user_id),
    INDEX idx_product_id (product_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- TCC 事务记录(用于幂等和空回滚判断)
CREATE TABLE tcc_fence (
    xid          VARCHAR(128) NOT NULL COMMENT '全局事务ID',
    branch_id    VARCHAR(128) NOT NULL COMMENT '分支事务ID',
    op           TINYINT NOT NULL COMMENT '1=try 2=confirm 3=cancel',
    created_at   DATETIME NOT NULL DEFAULT NOW(),
    PRIMARY KEY (xid, branch_id, op)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

商品服务(product_db)

CREATE TABLE products (
    id           BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    name         VARCHAR(255) NOT NULL,
    price        DECIMAL(10, 2) NOT NULL,
    stock        INT UNSIGNED NOT NULL DEFAULT 0 COMMENT '实际库存',
    stock_frozen INT UNSIGNED NOT NULL DEFAULT 0 COMMENT '冻结库存(TCC预留)',
    version      INT UNSIGNED NOT NULL DEFAULT 0 COMMENT '乐观锁',
    updated_at   DATETIME NOT NULL DEFAULT NOW() ON UPDATE NOW()
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- TCC 事务记录
CREATE TABLE tcc_fence (
    xid          VARCHAR(128) NOT NULL,
    branch_id    VARCHAR(128) NOT NULL,
    op           TINYINT NOT NULL,
    created_at   DATETIME NOT NULL DEFAULT NOW(),
    PRIMARY KEY (xid, branch_id, op)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

6.3 GORM 模型定义

// ====== 订单服务模型 ======

type OrderStatus string

const (
    OrderStatusPending   OrderStatus = "pending"
    OrderStatusConfirmed OrderStatus = "confirmed"
    OrderStatusCancelled OrderStatus = "cancelled"
)

type Order struct {
    ID        uint64      `gorm:"primaryKey;autoIncrement"`
    OrderNo   string      `gorm:"uniqueIndex;size:64;not null"`
    UserID    uint64      `gorm:"not null;index"`
    ProductID uint64      `gorm:"not null;index"`
    Quantity  uint32      `gorm:"not null"`
    Amount    float64     `gorm:"type:decimal(10,2);not null"`
    Status    OrderStatus `gorm:"type:enum('pending','confirmed','cancelled');default:pending"`
    CreatedAt time.Time
    UpdatedAt time.Time
}

type TCCFence struct {
    XID      string    `gorm:"primaryKey;size:128"`
    BranchID string    `gorm:"primaryKey;size:128"`
    Op       int8      `gorm:"primaryKey"` // 1=try 2=confirm 3=cancel
    CreatedAt time.Time
}

// ====== 商品服务模型 ======

type Product struct {
    ID          uint64    `gorm:"primaryKey;autoIncrement"`
    Name        string    `gorm:"size:255;not null"`
    Price       float64   `gorm:"type:decimal(10,2);not null"`
    Stock       uint32    `gorm:"not null;default:0"`
    StockFrozen uint32    `gorm:"not null;default:0"`
    Version     uint32    `gorm:"not null;default:0"`
    UpdatedAt   time.Time
}

6.4 TCC 完整实现

// ====== 订单服务:TCC 接口 ======

type OrderTCC struct {
    db *gorm.DB
}

// Try:创建"预占"状态的订单
func (s *OrderTCC) Try(ctx context.Context, xid, branchID string, req CreateOrderReq) error {
    return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
        // 1. 幂等检查(防止重复 Try)
        fence := TCCFence{XID: xid, BranchID: branchID, Op: 1}
        result := tx.Create(&fence)
        if result.Error != nil {
            if isDuplicateKeyError(result.Error) {
                return nil // 已经执行过,幂等返回成功
            }
            return result.Error
        }

        // 2. 创建 pending 状态的订单
        order := Order{
            OrderNo:   fmt.Sprintf("ORD-%s", xid),
            UserID:    req.UserID,
            ProductID: req.ProductID,
            Quantity:  req.Quantity,
            Amount:    req.Amount,
            Status:    OrderStatusPending,
        }
        return tx.Create(&order).Error
    })
}

// Confirm:将订单改为 confirmed
func (s *OrderTCC) Confirm(ctx context.Context, xid, branchID string) error {
    return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
        // 1. 幂等检查
        fence := TCCFence{XID: xid, BranchID: branchID, Op: 2}
        result := tx.Create(&fence)
        if result.Error != nil {
            if isDuplicateKeyError(result.Error) {
                return nil
            }
            return result.Error
        }

        // 2. 更新订单状态
        r := tx.Model(&Order{}).
            Where("order_no = ? AND status = ?", fmt.Sprintf("ORD-%s", xid), OrderStatusPending).
            Update("status", OrderStatusConfirmed)
        if r.Error != nil {
            return r.Error
        }
        // RowsAffected=0 说明订单不存在或已被修改,Confirm 阶段按成功处理(幂等)
        return nil
    })
}

// Cancel:将订单改为 cancelled(空回滚安全)
func (s *OrderTCC) Cancel(ctx context.Context, xid, branchID string) error {
    return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
        // 1. 检查 Try 是否执行过(空回滚判断)
        var tryFence TCCFence
        err := tx.Where("xid = ? AND branch_id = ? AND op = 1", xid, branchID).
            First(&tryFence).Error
        if errors.Is(err, gorm.ErrRecordNotFound) {
            // Try 未执行,记录 Cancel fence 后直接返回(空回滚)
            cancelFence := TCCFence{XID: xid, BranchID: branchID, Op: 3}
            tx.Create(&cancelFence) // 忽略重复插入错误
            return nil
        }

        // 2. 幂等检查
        cancelFence := TCCFence{XID: xid, BranchID: branchID, Op: 3}
        result := tx.Create(&cancelFence)
        if result.Error != nil {
            if isDuplicateKeyError(result.Error) {
                return nil
            }
            return result.Error
        }

        // 3. 取消订单
        return tx.Model(&Order{}).
            Where("order_no = ? AND status = ?", fmt.Sprintf("ORD-%s", xid), OrderStatusPending).
            Update("status", OrderStatusCancelled).Error
    })
}

// ====== 商品服务:TCC 接口 ======

type StockTCC struct {
    db *gorm.DB
}

// Try:冻结库存
func (s *StockTCC) Try(ctx context.Context, xid, branchID string, productID uint64, qty uint32) error {
    return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
        // 1. 幂等检查
        fence := TCCFence{XID: xid, BranchID: branchID, Op: 1}
        result := tx.Create(&fence)
        if result.Error != nil {
            if isDuplicateKeyError(result.Error) {
                return nil
            }
            return result.Error
        }

        // 2. 冻结库存:stock 不变,stock_frozen 增加
        //    条件:stock - stock_frozen >= qty(可用库存足够)
        r := tx.Model(&Product{}).
            Where("id = ? AND stock - stock_frozen >= ?", productID, qty).
            Updates(map[string]interface{}{
                "stock_frozen": gorm.Expr("stock_frozen + ?", qty),
                "version":      gorm.Expr("version + 1"),
            })
        if r.Error != nil {
            return r.Error
        }
        if r.RowsAffected == 0 {
            return errors.New("insufficient stock")
        }
        return nil
    })
}

// Confirm:真正扣减库存
func (s *StockTCC) Confirm(ctx context.Context, xid, branchID string, productID uint64, qty uint32) error {
    return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
        // 1. 幂等检查
        fence := TCCFence{XID: xid, BranchID: branchID, Op: 2}
        result := tx.Create(&fence)
        if result.Error != nil {
            if isDuplicateKeyError(result.Error) {
                return nil
            }
            return result.Error
        }

        // 2. 扣减库存:stock 和 stock_frozen 同时减少
        r := tx.Model(&Product{}).
            Where("id = ? AND stock_frozen >= ?", productID, qty).
            Updates(map[string]interface{}{
                "stock":        gorm.Expr("stock - ?", qty),
                "stock_frozen": gorm.Expr("stock_frozen - ?", qty),
                "version":      gorm.Expr("version + 1"),
            })
        if r.Error != nil {
            return r.Error
        }
        // RowsAffected=0:幂等处理,视为成功(Confirm 阶段不应失败)
        return nil
    })
}

// Cancel:解冻库存(空回滚安全)
func (s *StockTCC) Cancel(ctx context.Context, xid, branchID string, productID uint64, qty uint32) error {
    return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
        // 1. 空回滚判断
        var tryFence TCCFence
        err := tx.Where("xid = ? AND branch_id = ? AND op = 1", xid, branchID).
            First(&tryFence).Error
        if errors.Is(err, gorm.ErrRecordNotFound) {
            cancelFence := TCCFence{XID: xid, BranchID: branchID, Op: 3}
            tx.Create(&cancelFence)
            return nil
        }

        // 2. 幂等检查
        cancelFence := TCCFence{XID: xid, BranchID: branchID, Op: 3}
        result := tx.Create(&cancelFence)
        if result.Error != nil {
            if isDuplicateKeyError(result.Error) {
                return nil
            }
            return result.Error
        }

        // 3. 解冻库存
        return tx.Model(&Product{}).
            Where("id = ? AND stock_frozen >= ?", productID, qty).
            Updates(map[string]interface{}{
                "stock_frozen": gorm.Expr("stock_frozen - ?", qty),
                "version":      gorm.Expr("version + 1"),
            }).Error
    })
}

// ====== 协调者:驱动整个 TCC 流程 ======

type TCCCoordinator struct {
    orderTCC *OrderTCC
    stockTCC *StockTCC
}

func (c *TCCCoordinator) PlaceOrder(ctx context.Context, req CreateOrderReq) error {
    xid := generateXID() // 全局唯一事务 ID

    // ---- Phase 1: Try ----
    // 调用各参与者的 Try,任意失败则全部 Cancel

    err := c.orderTCC.Try(ctx, xid, "branch-order", req)
    if err != nil {
        // 订单 Try 失败,无需 Cancel(什么都没做)
        return fmt.Errorf("order try failed: %w", err)
    }

    err = c.stockTCC.Try(ctx, xid, "branch-stock", req.ProductID, uint32(req.Quantity))
    if err != nil {
        // 库存 Try 失败,Cancel 已成功的订单 Try
        c.orderTCC.Cancel(ctx, xid, "branch-order")
        return fmt.Errorf("stock try failed: %w", err)
    }

    // ---- Phase 2: Confirm ----
    // 所有 Try 成功,执行 Confirm
    // Confirm 失败需要重试(不能 Cancel,资源已预留)

    if err = c.orderTCC.Confirm(ctx, xid, "branch-order"); err != nil {
        // 实际生产中,Confirm 失败要持久化重试,不能直接报错
        return fmt.Errorf("order confirm failed (need retry): %w", err)
    }

    if err = c.stockTCC.Confirm(ctx, xid, "branch-stock", req.ProductID, uint32(req.Quantity)); err != nil {
        return fmt.Errorf("stock confirm failed (need retry): %w", err)
    }

    return nil
}

// 工具函数
func generateXID() string {
    return fmt.Sprintf("%d-%s", time.Now().UnixNano(), uuid.New().String()[:8])
}

func isDuplicateKeyError(err error) bool {
    var mysqlErr *mysql.MySQLError
    return errors.As(err, &mysqlErr) && mysqlErr.Number == 1062
}

6.5 TCC 的三大陷阱

陷阱 1:悬挂(Suspension)
  Cancel 先于 Try 到达(网络乱序)
  Cancel 执行时找不到 Try 记录,做空回滚
  随后 Try 到达,执行了预留,但再也不会有 Confirm/Cancel
  → 解决:Cancel 记录 fence 后,Try 检查到 cancel fence 则拒绝执行

陷阱 2:空回滚(Empty Cancel)
  Try 超时未执行(或网络丢包),Cancel 被调用
  Cancel 必须能识别"Try 未执行"的情况,直接返回成功
  → 解决:上面代码中的空回滚判断逻辑

陷阱 3:幂等(Idempotency)
  Confirm/Cancel 因重试被调用多次
  必须保证结果相同,不能重复扣减
  → 解决:tcc_fence 表的唯一主键,重复插入失败则返回成功

7. Saga 模式

7.1 基本思想

Saga 将分布式长事务拆分为若干本地事务,每个本地事务有对应的补偿事务。正向流程失败时,逆序执行补偿事务恢复数据。

下单 Saga 流程:

正向(Forward):
  T1: 订单服务  → 创建订单(status=pending)
  T2: 商品服务  → 扣减库存
  T3: 账户服务  → 扣减余额
  T4: 订单服务  → 更新订单状态(status=confirmed)

补偿(Compensate,T3 失败时从后往前):
  C2: 商品服务  → 恢复库存
  C1: 订单服务  → 取消订单(status=cancelled)

7.2 两种协调方式

编排(Choreography):各服务通过事件总线(MQ)驱动,无中央协调者。

订单服务
  → 发布 OrderCreated 事件
    → 商品服务消费,扣减库存
      → 发布 StockDeducted 事件
        → 账户服务消费,扣减余额
          → 发布 PaymentCompleted 事件
            → 订单服务消费,更新状态为 confirmed

任意步骤失败:
  → 发布 XXXFailed 事件
    → 上游服务监听并执行补偿

协同(Orchestration):由 Saga 协调者(状态机)统一驱动各步骤,逻辑集中,易于管理。

// Saga 执行状态记录
type SagaExecution struct {
    ID          uint64    `gorm:"primaryKey;autoIncrement"`
    SagaID      string    `gorm:"uniqueIndex;size:64"`
    BizType     string    `gorm:"size:64;not null"`
    Status      string    `gorm:"type:enum('running','completed','compensating','failed');default:running"`
    CurrentStep int       `gorm:"default:0"`
    CreatedAt   time.Time
    UpdatedAt   time.Time
}

type SagaStepLog struct {
    ID           uint64    `gorm:"primaryKey;autoIncrement"`
    SagaID       string    `gorm:"index;size:64"`
    StepNo       int       `gorm:"not null"`
    StepName     string    `gorm:"size:128;not null"`
    Action       string    `gorm:"type:enum('forward','compensate')"`
    Status       string    `gorm:"type:enum('pending','success','failed')"`
    RequestJSON  string    `gorm:"type:json"`
    ResponseJSON string    `gorm:"type:json"`
    CreatedAt    time.Time
}

8. 本地消息表(Transactional Outbox)

本地消息表是互联网系统中最常用的分布式事务方案,实现简单、可靠性高,是最终一致性的典型落地。

8.1 整体架构

┌─────────────────────────────────────────────────────────────────┐
│                         订单服务                                  │
│                                                                 │
│  HTTP 请求 ──▶  CreateOrder()                                   │
│                    │                                            │
│                    ▼                                            │
│              BEGIN TRANSACTION                                  │
│                INSERT orders (status=created)                   │
│                INSERT outbox_messages (status=pending)  ◀─┐    │
│              COMMIT                          同一事务      │    │
│                                                           │    │
│  ┌─────────────────────────────────────┐                  │    │
│  │         消息中继器(Relay)           │                  │    │
│  │  轮询 outbox_messages(status=pending)│                  │    │
│  │  → 发送到 MQ                         │                  │    │
│  │  → 更新 status=sent                  │──────────────────┘    │
│  └─────────────────────────────────────┘                        │
└─────────────────────────────────────────────────────────────────┘
                              │
                              │ MQ(Kafka/RocketMQ)
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                         商品服务                                  │
│                                                                 │
│  MQ 消费者 ──▶ HandleStockDeduct()                              │
│                    │                                            │
│                    ▼                                            │
│              幂等检查(consumed_messages)                        │
│                    │                                            │
│                    ▼                                            │
│              BEGIN TRANSACTION                                  │
│                UPDATE products SET stock = stock - qty          │
│                INSERT consumed_messages (message_id)            │
│              COMMIT                                             │
│                    │                                            │
│              ACK 消息(通知 MQ 消费成功)                         │
└─────────────────────────────────────────────────────────────────┘

8.2 数据库表设计

订单服务(order_db)

-- 订单表
CREATE TABLE orders (
    id           BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    order_no     VARCHAR(64) NOT NULL UNIQUE   COMMENT '业务单号,全局唯一',
    user_id      BIGINT UNSIGNED NOT NULL,
    product_id   BIGINT UNSIGNED NOT NULL,
    quantity     INT UNSIGNED NOT NULL,
    amount       DECIMAL(10, 2) NOT NULL,
    status       ENUM('created','confirmed','cancelled') NOT NULL DEFAULT 'created',
    created_at   DATETIME NOT NULL DEFAULT NOW(),
    updated_at   DATETIME NOT NULL DEFAULT NOW() ON UPDATE NOW(),
    INDEX idx_user_id (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 发件箱消息表(Outbox)
CREATE TABLE outbox_messages (
    id           BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    message_id   VARCHAR(64)  NOT NULL UNIQUE  COMMENT '全局唯一消息ID,消费者用于去重',
    topic        VARCHAR(128) NOT NULL          COMMENT 'MQ Topic',
    payload      JSON         NOT NULL          COMMENT '消息内容',
    status       ENUM('pending','sent','failed') NOT NULL DEFAULT 'pending',
    retry_count  TINYINT UNSIGNED NOT NULL DEFAULT 0,
    next_retry   DATETIME                        COMMENT '下次重试时间,NULL 表示立即可重试',
    created_at   DATETIME NOT NULL DEFAULT NOW(),
    sent_at      DATETIME                        COMMENT '成功发送时间',
    INDEX idx_pending (status, next_retry)       COMMENT '中继器轮询索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

商品服务(product_db)

-- 商品表
CREATE TABLE products (
    id           BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    name         VARCHAR(255) NOT NULL,
    price        DECIMAL(10, 2) NOT NULL,
    stock        INT UNSIGNED NOT NULL DEFAULT 0  COMMENT '可用库存',
    version      INT UNSIGNED NOT NULL DEFAULT 0  COMMENT '乐观锁版本号',
    updated_at   DATETIME NOT NULL DEFAULT NOW() ON UPDATE NOW()
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 已消费消息表(去重/幂等)
CREATE TABLE consumed_messages (
    message_id   VARCHAR(64) NOT NULL PRIMARY KEY COMMENT '与 outbox_messages.message_id 对应',
    consumed_at  DATETIME NOT NULL DEFAULT NOW(),
    INDEX idx_consumed_at (consumed_at)  COMMENT '用于清理过期记录'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 库存不足时的补偿发件箱(商品服务自己的 Outbox)
CREATE TABLE outbox_messages (
    id           BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    message_id   VARCHAR(64)  NOT NULL UNIQUE,
    topic        VARCHAR(128) NOT NULL,
    payload      JSON         NOT NULL,
    status       ENUM('pending','sent','failed') NOT NULL DEFAULT 'pending',
    retry_count  TINYINT UNSIGNED NOT NULL DEFAULT 0,
    next_retry   DATETIME,
    created_at   DATETIME NOT NULL DEFAULT NOW(),
    sent_at      DATETIME,
    INDEX idx_pending (status, next_retry)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

8.3 GORM 模型定义

// ====== 订单服务 models(order_db)======

type OrderStatus string

const (
    OrderStatusCreated   OrderStatus = "created"
    OrderStatusConfirmed OrderStatus = "confirmed"
    OrderStatusCancelled OrderStatus = "cancelled"
)

type Order struct {
    ID        uint64      `gorm:"primaryKey;autoIncrement"`
    OrderNo   string      `gorm:"uniqueIndex;size:64;not null"`
    UserID    uint64      `gorm:"not null;index"`
    ProductID uint64      `gorm:"not null"`
    Quantity  uint32      `gorm:"not null"`
    Amount    float64     `gorm:"type:decimal(10,2);not null"`
    Status    OrderStatus `gorm:"type:enum('created','confirmed','cancelled');default:created"`
    CreatedAt time.Time
    UpdatedAt time.Time
}

type MessageStatus string

const (
    MessageStatusPending MessageStatus = "pending"
    MessageStatusSent    MessageStatus = "sent"
    MessageStatusFailed  MessageStatus = "failed"
)

type OutboxMessage struct {
    ID         uint64        `gorm:"primaryKey;autoIncrement"`
    MessageID  string        `gorm:"uniqueIndex;size:64;not null"`
    Topic      string        `gorm:"size:128;not null"`
    Payload    datatypes.JSON `gorm:"type:json;not null"`
    Status     MessageStatus `gorm:"type:enum('pending','sent','failed');default:pending"`
    RetryCount uint8         `gorm:"default:0"`
    NextRetry  *time.Time    `gorm:"index"`
    CreatedAt  time.Time
    SentAt     *time.Time
}

// ====== 商品服务 models(product_db)======

type Product struct {
    ID        uint64    `gorm:"primaryKey;autoIncrement"`
    Name      string    `gorm:"size:255;not null"`
    Price     float64   `gorm:"type:decimal(10,2);not null"`
    Stock     uint32    `gorm:"not null;default:0"`
    Version   uint32    `gorm:"not null;default:0"`
    UpdatedAt time.Time
}

type ConsumedMessage struct {
    MessageID  string    `gorm:"primaryKey;size:64"`
    ConsumedAt time.Time `gorm:"index"`
}

// ====== 消息 Payload 结构体 ======

// 订单服务 → 商品服务:扣减库存
type StockDeductPayload struct {
    OrderNo   string  `json:"order_no"`
    ProductID uint64  `json:"product_id"`
    Quantity  uint32  `json:"quantity"`
    Amount    float64 `json:"amount"`
}

// 商品服务 → 订单服务:库存不足,请求取消订单
type OrderCancelPayload struct {
    OrderNo string `json:"order_no"`
    Reason  string `json:"reason"`
}

8.4 生产者:订单服务

步骤一:创建订单(业务操作 + 写消息表,同一事务)

type OrderService struct {
    db *gorm.DB
}

type CreateOrderReq struct {
    UserID    uint64
    ProductID uint64
    Quantity  uint32
    Amount    float64
}

func (s *OrderService) CreateOrder(ctx context.Context, req CreateOrderReq) (*Order, error) {
    orderNo := generateOrderNo() // 全局唯一单号
    messageID := uuid.New().String()

    var order Order

    // 关键:业务操作和写消息表在同一个本地事务
    // 要么都成功,要么都失败,天然原子性
    err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
        // 1. 创建订单
        order = Order{
            OrderNo:   orderNo,
            UserID:    req.UserID,
            ProductID: req.ProductID,
            Quantity:  req.Quantity,
            Amount:    req.Amount,
            Status:    OrderStatusCreated,
        }
        if err := tx.Create(&order).Error; err != nil {
            return fmt.Errorf("create order: %w", err)
        }

        // 2. 写入发件箱消息表
        payload, _ := json.Marshal(StockDeductPayload{
            OrderNo:   orderNo,
            ProductID: req.ProductID,
            Quantity:  req.Quantity,
            Amount:    req.Amount,
        })
        msg := OutboxMessage{
            MessageID: messageID,
            Topic:     "order.stock.deduct",
            Payload:   datatypes.JSON(payload),
            Status:    MessageStatusPending,
        }
        if err := tx.Create(&msg).Error; err != nil {
            return fmt.Errorf("create outbox message: %w", err)
        }

        return nil
    })

    if err != nil {
        return nil, err
    }
    return &order, nil
}

// generateOrderNo 生成全局唯一订单号
func generateOrderNo() string {
    return fmt.Sprintf("ORD%s%06d",
        time.Now().Format("20060102150405"),
        rand.Intn(999999),
    )
}

这一步的可靠性分析

场景 1:事务提交成功
  → 订单和消息都写入,等待中继器发送 ✅

场景 2:订单写入失败(如唯一键冲突)
  → 事务回滚,消息也不存在,客户端收到错误后重试 ✅

场景 3:消息写入失败
  → 事务回滚,订单也不存在,客户端收到错误后重试 ✅

场景 4:事务提交时 MySQL 宕机
  → 事务未提交,订单和消息都不存在(WAL 保证原子性)✅

结论:这一步不存在"订单有消息没有"或"消息有订单没有"的情况

步骤二:消息中继器(Relay)——将消息从数据库发送到 MQ

type MessageRelay struct {
    db  *gorm.DB
    mq  MQPublisher
    log *zap.Logger
}

// MQPublisher MQ 发布接口(可对接 Kafka、RocketMQ 等)
type MQPublisher interface {
    Publish(ctx context.Context, topic, messageID string, payload []byte) error
}

// Run 启动中继器(作为后台 goroutine 运行)
func (r *MessageRelay) Run(ctx context.Context) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            r.log.Info("message relay stopped")
            return
        case <-ticker.C:
            if err := r.sendBatch(ctx); err != nil {
                r.log.Error("relay send batch failed", zap.Error(err))
            }
        }
    }
}

// sendBatch 批量发送一批待发消息
func (r *MessageRelay) sendBatch(ctx context.Context) error {
    var messages []OutboxMessage

    // 查询待发消息:status=pending 且 next_retry 未到或为空
    err := r.db.WithContext(ctx).
        Where("status = ? AND (next_retry IS NULL OR next_retry <= ?)",
            MessageStatusPending, time.Now()).
        Order("id ASC").
        Limit(100).
        Find(&messages).Error
    if err != nil {
        return fmt.Errorf("query pending messages: %w", err)
    }

    for _, msg := range messages {
        r.sendOne(ctx, msg)
    }
    return nil
}

// sendOne 发送单条消息,包含完整的错误处理
func (r *MessageRelay) sendOne(ctx context.Context, msg OutboxMessage) {
    // 发送到 MQ
    err := r.mq.Publish(ctx, msg.Topic, msg.MessageID, msg.Payload)
    if err != nil {
        r.log.Warn("publish message failed",
            zap.String("message_id", msg.MessageID),
            zap.Uint8("retry_count", msg.RetryCount),
            zap.Error(err),
        )
        r.markFailed(ctx, msg)
        return
    }

    // 发送成功,更新状态
    now := time.Now()
    result := r.db.WithContext(ctx).Model(&msg).Updates(map[string]interface{}{
        "status": MessageStatusSent,
        "sent_at": now,
    })
    if result.Error != nil {
        // 更新失败无影响:下次轮询会重新发送,消费者通过幂等处理重复
        r.log.Warn("mark message sent failed",
            zap.String("message_id", msg.MessageID),
            zap.Error(result.Error),
        )
    }
}

// markFailed 发送失败时,指数退避重试
func (r *MessageRelay) markFailed(ctx context.Context, msg OutboxMessage) {
    newRetryCount := msg.RetryCount + 1

    // 指数退避:1s, 2s, 4s, 8s, 16s, 32s ...
    backoff := time.Duration(1<<newRetryCount) * time.Second
    if backoff > 10*time.Minute {
        backoff = 10 * time.Minute // 最大退避 10 分钟
    }
    nextRetry := time.Now().Add(backoff)

    updates := map[string]interface{}{
        "retry_count": newRetryCount,
        "next_retry":  nextRetry,
    }

    // 超过最大重试次数,标记为 failed,触发告警人工介入
    const maxRetry = 10
    if newRetryCount >= maxRetry {
        updates["status"] = MessageStatusFailed
        r.log.Error("message exceeded max retries, marked as failed",
            zap.String("message_id", msg.MessageID),
            zap.Uint8("retry_count", newRetryCount),
        )
        // 实际生产中这里触发告警(钉钉/PagerDuty)
    }

    r.db.WithContext(ctx).Model(&msg).Updates(updates)
}

中继器各阶段错误分析

场景 1:MQ 短暂不可用
  → Publish 失败,写 next_retry(指数退避)
  → 等 MQ 恢复后自动重试 ✅

场景 2:MQ 发送成功,但更新 status=sent 失败
  → 消息被重新轮询,再次发送(重复投递)
  → 消费者通过 consumed_messages 去重,幂等处理 ✅

场景 3:中继器进程崩溃(重启)
  → pending 消息仍在数据库,重启后继续发送 ✅

场景 4:MQ 持续不可用,重试超限
  → status=failed,触发告警,人工介入(手动重置 status=pending)✅

场景 5:同一消息被两个 Relay 实例同时发送(多实例部署)
  → 两个实例都发送成功,消费者收到两条相同 message_id 的消息
  → consumed_messages 幂等去重 ✅

步骤三:订单确认(消费商品服务的回调)

// 商品服务扣减成功后,会发送 order.confirmed 消息
// 订单服务消费该消息,将订单状态更新为 confirmed

type OrderConsumer struct {
    db  *gorm.DB
    log *zap.Logger
}

func (c *OrderConsumer) HandleOrderConfirm(ctx context.Context, msg MQMessage) error {
    var payload struct {
        OrderNo string `json:"order_no"`
    }
    if err := json.Unmarshal(msg.Payload, &payload); err != nil {
        // 消息格式错误,不重试,直接 ACK(避免毒丸消息阻塞队列)
        c.log.Error("invalid message payload", zap.String("message_id", msg.ID))
        return nil
    }

    return c.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
        // 幂等:UPDATE 本身是幂等的(重复执行结果相同)
        return tx.Model(&Order{}).
            Where("order_no = ? AND status = ?", payload.OrderNo, OrderStatusCreated).
            Update("status", OrderStatusConfirmed).Error
    })
}

func (c *OrderConsumer) HandleOrderCancel(ctx context.Context, msg MQMessage) error {
    var payload OrderCancelPayload
    if err := json.Unmarshal(msg.Payload, &payload); err != nil {
        c.log.Error("invalid message payload", zap.String("message_id", msg.ID))
        return nil
    }

    return c.db.WithContext(ctx).
        Model(&Order{}).
        Where("order_no = ? AND status = ?", payload.OrderNo, OrderStatusCreated).
        Update("status", OrderStatusCancelled).Error
}

8.5 消费者:商品服务

完整消费逻辑

type StockConsumer struct {
    db  *gorm.DB
    log *zap.Logger
}

// HandleStockDeduct 消费"扣减库存"消息
// MQ 框架会在此函数返回 nil 时 ACK,返回 error 时 NACK(重新投递)
func (c *StockConsumer) HandleStockDeduct(ctx context.Context, msg MQMessage) error {
    // ---- 解析消息 ----
    var payload StockDeductPayload
    if err := json.Unmarshal(msg.Payload, &payload); err != nil {
        // 消息格式错误:不重试,记录日志后 ACK
        // 这类消息不应该阻塞后续正常消息的消费
        c.log.Error("invalid stock deduct payload",
            zap.String("message_id", msg.ID),
            zap.ByteString("payload", msg.Payload),
        )
        return nil // 返回 nil = ACK,不重试
    }

    // ---- 幂等检查 ----
    // 先查再插,避免每次都触发唯一键冲突(高并发下减少异常)
    var count int64
    c.db.WithContext(ctx).Model(&ConsumedMessage{}).
        Where("message_id = ?", msg.ID).Count(&count)
    if count > 0 {
        c.log.Info("message already consumed, skip",
            zap.String("message_id", msg.ID),
        )
        return nil // 已消费,直接 ACK
    }

    // ---- 核心业务:扣减库存 ----
    err := c.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
        // 1. 插入消费记录(幂等锁)
        //    若并发重复消费,只有一个能插入成功
        consumed := ConsumedMessage{
            MessageID:  msg.ID,
            ConsumedAt: time.Now(),
        }
        result := tx.Create(&consumed)
        if result.Error != nil {
            if isDuplicateKeyError(result.Error) {
                // 并发情况下的重复消费,直接返回 nil(幂等)
                return nil
            }
            return fmt.Errorf("insert consumed_message: %w", result.Error)
        }

        // 2. 扣减库存(带乐观锁)
        r := tx.Model(&Product{}).
            Where("id = ? AND stock >= ?", payload.ProductID, payload.Quantity).
            Updates(map[string]interface{}{
                "stock":   gorm.Expr("stock - ?", payload.Quantity),
                "version": gorm.Expr("version + 1"),
            })
        if r.Error != nil {
            return fmt.Errorf("deduct stock: %w", r.Error)
        }

        if r.RowsAffected == 0 {
            // 库存不足!需要发补偿消息,通知订单服务取消订单
            // 补偿消息也写入本服务的 outbox,保证原子性
            return c.writeCompensationMessage(tx, payload)
        }

        // 3. 库存扣减成功,写确认消息(通知订单服务确认订单)
        return c.writeConfirmMessage(tx, payload)
    })

    if err != nil {
        // 事务失败(DB 故障等),返回 error 让 MQ 重新投递
        c.log.Error("handle stock deduct failed",
            zap.String("message_id", msg.ID),
            zap.Error(err),
        )
        return err // 返回 error = NACK,MQ 重新投递
    }

    return nil // 返回 nil = ACK
}

// writeCompensationMessage 库存不足时,写补偿消息(取消订单)
func (c *StockConsumer) writeCompensationMessage(tx *gorm.DB, payload StockDeductPayload) error {
    c.log.Warn("insufficient stock, writing cancel order message",
        zap.String("order_no", payload.OrderNo),
        zap.Uint32("required", payload.Quantity),
    )

    cancelPayload, _ := json.Marshal(OrderCancelPayload{
        OrderNo: payload.OrderNo,
        Reason:  fmt.Sprintf("insufficient stock for product %d", payload.ProductID),
    })
    msg := OutboxMessage{
        MessageID: uuid.New().String(),
        Topic:     "stock.order.cancel", // 订单服务监听此 Topic
        Payload:   datatypes.JSON(cancelPayload),
        Status:    MessageStatusPending,
    }
    return tx.Create(&msg).Error
}

// writeConfirmMessage 扣减成功时,写确认消息(确认订单)
func (c *StockConsumer) writeConfirmMessage(tx *gorm.DB, payload StockDeductPayload) error {
    confirmPayload, _ := json.Marshal(map[string]string{
        "order_no": payload.OrderNo,
    })
    msg := OutboxMessage{
        MessageID: uuid.New().String(),
        Topic:     "stock.order.confirm", // 订单服务监听此 Topic
        Payload:   datatypes.JSON(confirmPayload),
        Status:    MessageStatusPending,
    }
    return tx.Create(&msg).Error
}

消费者各阶段错误分析

场景 1:消息格式错误(Bad Payload)
  → 记录日志,直接 ACK
  → 不重试,避免毒丸消息无限阻塞队列 ✅

场景 2:幂等检查通过,但事务中途消费者进程崩溃
  → consumed_messages 未插入(事务回滚)
  → MQ 超时后重新投递
  → 消费者重新消费,幂等检查未命中,正常处理 ✅

场景 3:库存扣减成功,但写 consumed_messages 失败
  → 事务回滚(stock 恢复)
  → MQ 重新投递,消费者重新执行 ✅

场景 4:整个事务成功,但 ACK 前消费者崩溃
  → MQ 重新投递
  → consumed_messages 已存在,幂等检查命中,直接 ACK ✅

场景 5:库存不足,写补偿消息失败
  → 事务回滚
  → MQ 重新投递,重新处理
  → 库存仍然不足,再次写补偿消息,最终写入成功 ✅

场景 6:补偿消息发送失败(商品服务 Relay 发送取消消息失败)
  → 同样走 Outbox 模式,指数退避重试
  → 最终订单服务收到取消消息 ✅

8.6 完整数据流时序图

正常下单成功流程:

客户端          订单服务           order_db         MQ             商品服务         product_db
  │               │                  │              │                │                 │
  │──CreateOrder─▶│                  │              │                │                 │
  │               │──BEGIN──────────▶│              │                │                 │
  │               │──INSERT orders──▶│              │                │                 │
  │               │──INSERT outbox──▶│              │                │                 │
  │               │──COMMIT─────────▶│              │                │                 │
  │◀──200 OK──────│                  │              │                │                 │
  │               │                  │              │                │                 │
  │          [Relay轮询]              │              │                │                 │
  │               │──SELECT outbox──▶│              │                │                 │
  │               │◀─pending msg─────│              │                │                 │
  │               │──────────Publish(stock.deduct)─▶│                │                 │
  │               │──UPDATE sent────▶│              │                │                 │
  │               │                  │              │                │                 │
  │               │                  │              │──deliver──────▶│                 │
  │               │                  │              │                │──BEGIN─────────▶│
  │               │                  │              │                │──UPDATE stock──▶│
  │               │                  │              │                │──INSERT consumed▶│
  │               │                  │              │                │──INSERT outbox──▶│
  │               │                  │              │                │──COMMIT────────▶│
  │               │                  │              │◀──ACK──────────│                 │
  │               │                  │              │                │                 │
  │               │                  │     [商品服务Relay轮询]        │                 │
  │               │                  │              │◀─Publish(stock.order.confirm)────│
  │               │                  │              │                │                 │
  │               │◀──deliver────────────────────── │                │                 │
  │               │──UPDATE confirmed▶│             │                │                 │


库存不足流程(补偿):

  ...(同上,直到商品服务消费消息)
                                                    │──BEGIN─────────▶│
                                                    │──stock不足,rows=0▶│
                                                    │──INSERT outbox(cancel)▶│
                                                    │──INSERT consumed▶│
                                                    │──COMMIT────────▶│
                                                    │◀──ACK───────────│
  [商品服务Relay发送取消消息]
              │◀──deliver(stock.order.cancel)──────│
              │──UPDATE cancelled─▶│

8.7 消息清理

// CleanConsumedMessages 定期清理过期的消费记录
// consumed_messages 表会持续增长,需要定期清理
func CleanConsumedMessages(ctx context.Context, db *gorm.DB) error {
    // 保留最近 7 天的消费记录(覆盖 MQ 最大重试窗口即可)
    cutoff := time.Now().AddDate(0, 0, -7)
    return db.WithContext(ctx).
        Where("consumed_at < ?", cutoff).
        Delete(&ConsumedMessage{}).Error
}

// CleanSentMessages 定期清理已发送的消息
func CleanSentMessages(ctx context.Context, db *gorm.DB) error {
    cutoff := time.Now().AddDate(0, 0, -3)
    return db.WithContext(ctx).
        Where("status = ? AND sent_at < ?", MessageStatusSent, cutoff).
        Delete(&OutboxMessage{}).Error
}

8.8 各环节故障总结

故障位置 现象 自动恢复方式
订单服务写 DB 失败 事务回滚,订单和消息都不存在 客户端重试
中继器发送 MQ 失败 消息留在 outbox,status=pending 指数退避自动重试
中继器更新 sent 失败 消息被重复发送 消费者幂等处理
MQ 宕机 消息积压在 outbox MQ 恢复后自动发送
消费者处理中崩溃 consumed 未写入,事务回滚 MQ 重新投递
消费者 DB 失败 事务回滚,返回 NACK MQ 重新投递
库存不足 写补偿消息(取消订单) 自动补偿流程
补偿消息发送失败 消息在商品服务 outbox 重试 自动重试到成功

9. 方案对比与选型

9.1 综合对比

方案 一致性 可用性 吞吐量 业务改造 实现难度 适用场景
MySQL XA 强一致 跨库、低并发、内部系统
TCC 较强(Try预占) 大(3接口) 金融支付、强一致要求
Saga 最终一致 中(2接口) 长流程、多步骤编排
本地消息表 最终一致 通用异步解耦,推荐首选

9.2 选型决策树

Q1: 业务能否改造为单库事务?
  → 能:直接用 InnoDB 本地事务,最简单可靠
  → 不能:继续 Q2

Q2: 业务能否接受最终一致性(秒级延迟)?
  → 不能(如实时转账、强监管场景):选 TCC
  → 能:继续 Q3

Q3: 业务是否是长流程(步骤 >= 3,耗时 >= 秒级)?
  → 是:选 Saga(Orchestration 模式)
  → 否:选 本地消息表(最简单)

实践建议:
  互联网电商、内容、社交 → 本地消息表(80% 的场景)
  金融支付、账务系统     → TCC(DTM/Seata 框架)
  订单履约长流程         → Saga
  内部数据同步、批处理   → MySQL XA

9.3 常用开源框架

框架 语言 支持模式 说明
DTM Go TCC/Saga/XA/消息 轻量,Go 生态首选
Seata Java AT/TCC/Saga/XA 阿里开源,Java 生态主流
RocketMQ 事务消息 Java/Go 消息最终一致 本地消息表的托管版本

RocketMQ 事务消息 本质上是云托管的本地消息表,省去了自己实现中继器的工作,有条件的可以直接使用。