MySQL-14 分布式事务
MySQL 分布式事务
目录
- 为什么需要分布式事务
- 理论基础:CAP 与 BASE
- 2PC:两阶段提交
- 3PC:三阶段提交
- MySQL XA 事务
- TCC 模式
- Saga 模式
- 本地消息表(Transactional Outbox)
- 方案对比与选型
1. 为什么需要分布式事务
1.1 问题场景
单机 InnoDB 事务能保证同一数据库内的 ACID。但在微服务架构下,一次下单操作可能跨越多个独立服务、独立数据库:
用户点击"立即购买",触发以下操作:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 订单服务 │ │ 商品服务 │ │ 账户服务 │
│ order_db │ │ product_db │ │ account_db │
│ │ │ │ │ │
│ INSERT orders │ │ UPDATE stock │ │ UPDATE balance │
│ (创建订单) │ │ (扣减库存) │ │ (扣减余额) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
三个操作分别在三个独立的数据库中,普通事务无法跨库
可能出现的问题:
| 故障点 | 结果 |
|---|---|
| 订单创建成功,库存扣减失败 | 有订单,无库存扣减 → 超卖 |
| 订单创建成功,扣款失败 | 有订单,钱没扣 → 资损 |
| 网络超时,不知道对方是否成功 | 状态不确定 → 数据不一致 |
1.2 核心难题
分布式事务的核心难题:
1. 原子性:多个节点的操作要么全部成功,要么全部回滚
2. 不确定性:网络超时时,无法判断对方是否已经执行
3. 故障恢复:任意节点崩溃后,系统能自动恢复到一致状态
2. 理论基础:CAP 与 BASE
2.1 CAP 定理
分布式系统三个特性最多同时满足两个:
C(Consistency) 强一致性:所有节点同一时刻看到的数据相同
A(Availability) 可用性:每个请求都能收到响应(非错误)
P(Partition Tolerance) 分区容错性:网络分区时系统仍能运行
网络分区是客观必然(物理网络会发生中断),所以P 必须保证,实际是在 CP 和 AP 中选择:
| 选择 | 典型系统 | 行为 |
|---|---|---|
| CP | ZooKeeper、etcd | 网络分区时拒绝服务,保证数据一致 |
| AP | Cassandra、DNS | 网络分区时继续服务,允许数据短暂不一致 |
2.2 BASE 理论
BASE 是互联网系统对 CAP 中 AP 方向的实践总结,与 ACID 相对:
BA(Basically Available) 基本可用:核心功能正常,非核心功能可以降级
S(Soft State) 软状态:允许节点间数据存在短暂不一致的中间状态
E(Eventually Consistent) 最终一致性:经过一段时间(毫秒~秒级),所有节点数据达到一致
绝大多数互联网业务选择 BASE:放弃实时强一致性,换取高可用和高性能。
3. 2PC:两阶段提交
3.1 角色与流程
2PC(Two-Phase Commit)包含两个角色:
- 协调者(Coordinator):驱动整个流程,做最终决策
- 参与者(Participant):执行实际操作,听从协调者指令
┌────────────────────────────────────────────────────────────┐
│ 阶段一:Prepare │
│ │
│ 协调者 ──Prepare──▶ 参与者A:执行SQL,写Undo/Redo Log,锁资源,回复 Yes │
│ ──Prepare──▶ 参与者B:执行SQL,写Undo/Redo Log,锁资源,回复 Yes │
│ ──Prepare──▶ 参与者C:执行SQL,写Undo/Redo Log,锁资源,回复 No │
└────────────────────────────────────────────────────────────┘
┌────────────────────────────────────────────────────────────┐
│ 阶段二:Commit / Rollback │
│ │
│ 全部 Yes → 协调者 ──Commit──▶ 所有参与者提交事务,释放锁 │
│ 任一 No → 协调者 ──Rollback─▶ 所有参与者回滚事务,释放锁 │
└────────────────────────────────────────────────────────────┘
3.2 2PC 的三个核心问题
问题 1:同步阻塞
Prepare 阶段参与者锁定资源后,必须等协调者的第二阶段指令。期间所有锁一直被持有,其他事务只能等待,性能差。
问题 2:协调者单点故障
时序:
协调者 ──Commit──▶ 参与者A ✅ A 已提交
协调者 宕机
参与者B/C ❓ 没有收到指令,一直阻塞
结果:A 提交了,B/C 没有 → 数据不一致,且无法自动恢复
问题 3:脑裂
协调者发送 Commit,网络分区:
参与者A 收到 Commit → 提交
参与者B 没收到 → 一直等待
即使协调者恢复,也无法区分 B 是网络问题还是已宕机
4. 3PC:三阶段提交
3PC 在 2PC 基础上做了两个改进:增加 CanCommit 预检阶段 + 引入超时自动提交机制。
阶段一:CanCommit
协调者问:你能执行吗?
参与者:检查资源可用性,回复 Yes/No(不锁资源)
阶段二:PreCommit
所有 Yes → 协调者发 PreCommit,参与者锁资源、执行SQL、写日志
参与者等待 DoCommit,若超时则自动中断(解决阻塞)
阶段三:DoCommit
所有 Ack → 协调者发 DoCommit,参与者提交
若参与者等待 DoCommit 超时 → 默认提交(非回滚,这是关键区别)
3PC 的局限:DoCommit 阶段网络分区时,收到 Rollback 的参与者回滚,超时默认提交的参与者提交,仍然不一致。实现复杂而收益有限,实际业务中几乎不直接使用。
5. MySQL XA 事务
MySQL InnoDB 实现了标准 XA 协议,是数据库层面的 2PC 实现。
5.1 XA 基本语法
-- 开始 XA 事务,xid 全局唯一
XA START 'order-20240101-001';
INSERT INTO orders (user_id, product_id, quantity, amount, status)
VALUES (42, 101, 2, 199.00, 'pending');
XA END 'order-20240101-001';
XA PREPARE 'order-20240101-001'; -- Phase 1 完成,持久化到磁盘
-- 协调者做决策
XA COMMIT 'order-20240101-001'; -- Phase 2 提交
-- 或
XA ROLLBACK 'order-20240101-001'; -- Phase 2 回滚
5.2 XA 状态机
XA START → ACTIVE(事务进行中)
XA END → IDLE(SQL 执行完,等待 Prepare)
XA PREPARE → PREPARED(Prepare 成功,可以 Commit 或 Rollback)
XA COMMIT → 结束
XA ROLLBACK → 结束
PREPARED 是关键状态:MySQL 重启后仍然存在,等待协调者指令
5.3 崩溃恢复
-- 查看所有处于 PREPARED 状态的 XA 事务
XA RECOVER;
-- +----------+--------------+--------------+------------------------+
-- | formatID | gtrid_length | bqual_length | data |
-- +----------+--------------+--------------+------------------------+
-- | 1 | 20 | 0 | order-20240101-001 |
-- 根据协调者日志决定提交还是回滚
XA COMMIT 'order-20240101-001';
XA ROLLBACK 'order-20240101-001';
5.4 XA 的限制
1. 性能差:Prepare 阶段行锁持续到 Commit,等待期间阻塞大量请求
2. 不支持与普通 BEGIN 事务嵌套
3. 不支持 SAVEPOINT
4. 协调者本身仍然是单点,宕机后需要人工干预
5. 适用于跨库但同机构的低并发场景,不适合高并发互联网业务
6. TCC 模式
TCC(Try-Confirm-Cancel)是业务层实现的两阶段提交,对数据库无特殊要求。
6.1 三个阶段
Try(预留资源):
检查业务可行性
冻结/预占资源(不真正扣减,只是标记"已预留")
操作必须幂等
Confirm(确认执行):
使用 Try 阶段预留的资源,执行真正的业务操作
只要 Try 成功,Confirm 必定能成功
操作必须幂等
Cancel(释放资源):
释放 Try 阶段预留的资源,恢复原始状态
需要处理"空回滚":Try 未执行时 Cancel 被调用
操作必须幂等
6.2 数据库表设计
订单服务(order_db):
CREATE TABLE orders (
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
order_no VARCHAR(64) NOT NULL UNIQUE COMMENT '业务订单号',
user_id BIGINT UNSIGNED NOT NULL,
product_id BIGINT UNSIGNED NOT NULL,
quantity INT UNSIGNED NOT NULL,
amount DECIMAL(10, 2) NOT NULL,
status ENUM('pending','confirmed','cancelled') NOT NULL DEFAULT 'pending',
created_at DATETIME NOT NULL DEFAULT NOW(),
updated_at DATETIME NOT NULL DEFAULT NOW() ON UPDATE NOW(),
INDEX idx_user_id (user_id),
INDEX idx_product_id (product_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- TCC 事务记录(用于幂等和空回滚判断)
CREATE TABLE tcc_fence (
xid VARCHAR(128) NOT NULL COMMENT '全局事务ID',
branch_id VARCHAR(128) NOT NULL COMMENT '分支事务ID',
op TINYINT NOT NULL COMMENT '1=try 2=confirm 3=cancel',
created_at DATETIME NOT NULL DEFAULT NOW(),
PRIMARY KEY (xid, branch_id, op)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
商品服务(product_db):
CREATE TABLE products (
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
price DECIMAL(10, 2) NOT NULL,
stock INT UNSIGNED NOT NULL DEFAULT 0 COMMENT '实际库存',
stock_frozen INT UNSIGNED NOT NULL DEFAULT 0 COMMENT '冻结库存(TCC预留)',
version INT UNSIGNED NOT NULL DEFAULT 0 COMMENT '乐观锁',
updated_at DATETIME NOT NULL DEFAULT NOW() ON UPDATE NOW()
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- TCC 事务记录
CREATE TABLE tcc_fence (
xid VARCHAR(128) NOT NULL,
branch_id VARCHAR(128) NOT NULL,
op TINYINT NOT NULL,
created_at DATETIME NOT NULL DEFAULT NOW(),
PRIMARY KEY (xid, branch_id, op)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
6.3 GORM 模型定义
// ====== 订单服务模型 ======
type OrderStatus string
const (
OrderStatusPending OrderStatus = "pending"
OrderStatusConfirmed OrderStatus = "confirmed"
OrderStatusCancelled OrderStatus = "cancelled"
)
type Order struct {
ID uint64 `gorm:"primaryKey;autoIncrement"`
OrderNo string `gorm:"uniqueIndex;size:64;not null"`
UserID uint64 `gorm:"not null;index"`
ProductID uint64 `gorm:"not null;index"`
Quantity uint32 `gorm:"not null"`
Amount float64 `gorm:"type:decimal(10,2);not null"`
Status OrderStatus `gorm:"type:enum('pending','confirmed','cancelled');default:pending"`
CreatedAt time.Time
UpdatedAt time.Time
}
type TCCFence struct {
XID string `gorm:"primaryKey;size:128"`
BranchID string `gorm:"primaryKey;size:128"`
Op int8 `gorm:"primaryKey"` // 1=try 2=confirm 3=cancel
CreatedAt time.Time
}
// ====== 商品服务模型 ======
type Product struct {
ID uint64 `gorm:"primaryKey;autoIncrement"`
Name string `gorm:"size:255;not null"`
Price float64 `gorm:"type:decimal(10,2);not null"`
Stock uint32 `gorm:"not null;default:0"`
StockFrozen uint32 `gorm:"not null;default:0"`
Version uint32 `gorm:"not null;default:0"`
UpdatedAt time.Time
}
6.4 TCC 完整实现
// ====== 订单服务:TCC 接口 ======
type OrderTCC struct {
db *gorm.DB
}
// Try:创建"预占"状态的订单
func (s *OrderTCC) Try(ctx context.Context, xid, branchID string, req CreateOrderReq) error {
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// 1. 幂等检查(防止重复 Try)
fence := TCCFence{XID: xid, BranchID: branchID, Op: 1}
result := tx.Create(&fence)
if result.Error != nil {
if isDuplicateKeyError(result.Error) {
return nil // 已经执行过,幂等返回成功
}
return result.Error
}
// 2. 创建 pending 状态的订单
order := Order{
OrderNo: fmt.Sprintf("ORD-%s", xid),
UserID: req.UserID,
ProductID: req.ProductID,
Quantity: req.Quantity,
Amount: req.Amount,
Status: OrderStatusPending,
}
return tx.Create(&order).Error
})
}
// Confirm:将订单改为 confirmed
func (s *OrderTCC) Confirm(ctx context.Context, xid, branchID string) error {
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// 1. 幂等检查
fence := TCCFence{XID: xid, BranchID: branchID, Op: 2}
result := tx.Create(&fence)
if result.Error != nil {
if isDuplicateKeyError(result.Error) {
return nil
}
return result.Error
}
// 2. 更新订单状态
r := tx.Model(&Order{}).
Where("order_no = ? AND status = ?", fmt.Sprintf("ORD-%s", xid), OrderStatusPending).
Update("status", OrderStatusConfirmed)
if r.Error != nil {
return r.Error
}
// RowsAffected=0 说明订单不存在或已被修改,Confirm 阶段按成功处理(幂等)
return nil
})
}
// Cancel:将订单改为 cancelled(空回滚安全)
func (s *OrderTCC) Cancel(ctx context.Context, xid, branchID string) error {
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// 1. 检查 Try 是否执行过(空回滚判断)
var tryFence TCCFence
err := tx.Where("xid = ? AND branch_id = ? AND op = 1", xid, branchID).
First(&tryFence).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
// Try 未执行,记录 Cancel fence 后直接返回(空回滚)
cancelFence := TCCFence{XID: xid, BranchID: branchID, Op: 3}
tx.Create(&cancelFence) // 忽略重复插入错误
return nil
}
// 2. 幂等检查
cancelFence := TCCFence{XID: xid, BranchID: branchID, Op: 3}
result := tx.Create(&cancelFence)
if result.Error != nil {
if isDuplicateKeyError(result.Error) {
return nil
}
return result.Error
}
// 3. 取消订单
return tx.Model(&Order{}).
Where("order_no = ? AND status = ?", fmt.Sprintf("ORD-%s", xid), OrderStatusPending).
Update("status", OrderStatusCancelled).Error
})
}
// ====== 商品服务:TCC 接口 ======
type StockTCC struct {
db *gorm.DB
}
// Try:冻结库存
func (s *StockTCC) Try(ctx context.Context, xid, branchID string, productID uint64, qty uint32) error {
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// 1. 幂等检查
fence := TCCFence{XID: xid, BranchID: branchID, Op: 1}
result := tx.Create(&fence)
if result.Error != nil {
if isDuplicateKeyError(result.Error) {
return nil
}
return result.Error
}
// 2. 冻结库存:stock 不变,stock_frozen 增加
// 条件:stock - stock_frozen >= qty(可用库存足够)
r := tx.Model(&Product{}).
Where("id = ? AND stock - stock_frozen >= ?", productID, qty).
Updates(map[string]interface{}{
"stock_frozen": gorm.Expr("stock_frozen + ?", qty),
"version": gorm.Expr("version + 1"),
})
if r.Error != nil {
return r.Error
}
if r.RowsAffected == 0 {
return errors.New("insufficient stock")
}
return nil
})
}
// Confirm:真正扣减库存
func (s *StockTCC) Confirm(ctx context.Context, xid, branchID string, productID uint64, qty uint32) error {
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// 1. 幂等检查
fence := TCCFence{XID: xid, BranchID: branchID, Op: 2}
result := tx.Create(&fence)
if result.Error != nil {
if isDuplicateKeyError(result.Error) {
return nil
}
return result.Error
}
// 2. 扣减库存:stock 和 stock_frozen 同时减少
r := tx.Model(&Product{}).
Where("id = ? AND stock_frozen >= ?", productID, qty).
Updates(map[string]interface{}{
"stock": gorm.Expr("stock - ?", qty),
"stock_frozen": gorm.Expr("stock_frozen - ?", qty),
"version": gorm.Expr("version + 1"),
})
if r.Error != nil {
return r.Error
}
// RowsAffected=0:幂等处理,视为成功(Confirm 阶段不应失败)
return nil
})
}
// Cancel:解冻库存(空回滚安全)
func (s *StockTCC) Cancel(ctx context.Context, xid, branchID string, productID uint64, qty uint32) error {
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// 1. 空回滚判断
var tryFence TCCFence
err := tx.Where("xid = ? AND branch_id = ? AND op = 1", xid, branchID).
First(&tryFence).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
cancelFence := TCCFence{XID: xid, BranchID: branchID, Op: 3}
tx.Create(&cancelFence)
return nil
}
// 2. 幂等检查
cancelFence := TCCFence{XID: xid, BranchID: branchID, Op: 3}
result := tx.Create(&cancelFence)
if result.Error != nil {
if isDuplicateKeyError(result.Error) {
return nil
}
return result.Error
}
// 3. 解冻库存
return tx.Model(&Product{}).
Where("id = ? AND stock_frozen >= ?", productID, qty).
Updates(map[string]interface{}{
"stock_frozen": gorm.Expr("stock_frozen - ?", qty),
"version": gorm.Expr("version + 1"),
}).Error
})
}
// ====== 协调者:驱动整个 TCC 流程 ======
type TCCCoordinator struct {
orderTCC *OrderTCC
stockTCC *StockTCC
}
func (c *TCCCoordinator) PlaceOrder(ctx context.Context, req CreateOrderReq) error {
xid := generateXID() // 全局唯一事务 ID
// ---- Phase 1: Try ----
// 调用各参与者的 Try,任意失败则全部 Cancel
err := c.orderTCC.Try(ctx, xid, "branch-order", req)
if err != nil {
// 订单 Try 失败,无需 Cancel(什么都没做)
return fmt.Errorf("order try failed: %w", err)
}
err = c.stockTCC.Try(ctx, xid, "branch-stock", req.ProductID, uint32(req.Quantity))
if err != nil {
// 库存 Try 失败,Cancel 已成功的订单 Try
c.orderTCC.Cancel(ctx, xid, "branch-order")
return fmt.Errorf("stock try failed: %w", err)
}
// ---- Phase 2: Confirm ----
// 所有 Try 成功,执行 Confirm
// Confirm 失败需要重试(不能 Cancel,资源已预留)
if err = c.orderTCC.Confirm(ctx, xid, "branch-order"); err != nil {
// 实际生产中,Confirm 失败要持久化重试,不能直接报错
return fmt.Errorf("order confirm failed (need retry): %w", err)
}
if err = c.stockTCC.Confirm(ctx, xid, "branch-stock", req.ProductID, uint32(req.Quantity)); err != nil {
return fmt.Errorf("stock confirm failed (need retry): %w", err)
}
return nil
}
// 工具函数
func generateXID() string {
return fmt.Sprintf("%d-%s", time.Now().UnixNano(), uuid.New().String()[:8])
}
func isDuplicateKeyError(err error) bool {
var mysqlErr *mysql.MySQLError
return errors.As(err, &mysqlErr) && mysqlErr.Number == 1062
}
6.5 TCC 的三大陷阱
陷阱 1:悬挂(Suspension)
Cancel 先于 Try 到达(网络乱序)
Cancel 执行时找不到 Try 记录,做空回滚
随后 Try 到达,执行了预留,但再也不会有 Confirm/Cancel
→ 解决:Cancel 记录 fence 后,Try 检查到 cancel fence 则拒绝执行
陷阱 2:空回滚(Empty Cancel)
Try 超时未执行(或网络丢包),Cancel 被调用
Cancel 必须能识别"Try 未执行"的情况,直接返回成功
→ 解决:上面代码中的空回滚判断逻辑
陷阱 3:幂等(Idempotency)
Confirm/Cancel 因重试被调用多次
必须保证结果相同,不能重复扣减
→ 解决:tcc_fence 表的唯一主键,重复插入失败则返回成功
7. Saga 模式
7.1 基本思想
Saga 将分布式长事务拆分为若干本地事务,每个本地事务有对应的补偿事务。正向流程失败时,逆序执行补偿事务恢复数据。
下单 Saga 流程:
正向(Forward):
T1: 订单服务 → 创建订单(status=pending)
T2: 商品服务 → 扣减库存
T3: 账户服务 → 扣减余额
T4: 订单服务 → 更新订单状态(status=confirmed)
补偿(Compensate,T3 失败时从后往前):
C2: 商品服务 → 恢复库存
C1: 订单服务 → 取消订单(status=cancelled)
7.2 两种协调方式
编排(Choreography):各服务通过事件总线(MQ)驱动,无中央协调者。
订单服务
→ 发布 OrderCreated 事件
→ 商品服务消费,扣减库存
→ 发布 StockDeducted 事件
→ 账户服务消费,扣减余额
→ 发布 PaymentCompleted 事件
→ 订单服务消费,更新状态为 confirmed
任意步骤失败:
→ 发布 XXXFailed 事件
→ 上游服务监听并执行补偿
协同(Orchestration):由 Saga 协调者(状态机)统一驱动各步骤,逻辑集中,易于管理。
// Saga 执行状态记录
type SagaExecution struct {
ID uint64 `gorm:"primaryKey;autoIncrement"`
SagaID string `gorm:"uniqueIndex;size:64"`
BizType string `gorm:"size:64;not null"`
Status string `gorm:"type:enum('running','completed','compensating','failed');default:running"`
CurrentStep int `gorm:"default:0"`
CreatedAt time.Time
UpdatedAt time.Time
}
type SagaStepLog struct {
ID uint64 `gorm:"primaryKey;autoIncrement"`
SagaID string `gorm:"index;size:64"`
StepNo int `gorm:"not null"`
StepName string `gorm:"size:128;not null"`
Action string `gorm:"type:enum('forward','compensate')"`
Status string `gorm:"type:enum('pending','success','failed')"`
RequestJSON string `gorm:"type:json"`
ResponseJSON string `gorm:"type:json"`
CreatedAt time.Time
}
8. 本地消息表(Transactional Outbox)
本地消息表是互联网系统中最常用的分布式事务方案,实现简单、可靠性高,是最终一致性的典型落地。
8.1 整体架构
┌─────────────────────────────────────────────────────────────────┐
│ 订单服务 │
│ │
│ HTTP 请求 ──▶ CreateOrder() │
│ │ │
│ ▼ │
│ BEGIN TRANSACTION │
│ INSERT orders (status=created) │
│ INSERT outbox_messages (status=pending) ◀─┐ │
│ COMMIT 同一事务 │ │
│ │ │
│ ┌─────────────────────────────────────┐ │ │
│ │ 消息中继器(Relay) │ │ │
│ │ 轮询 outbox_messages(status=pending)│ │ │
│ │ → 发送到 MQ │ │ │
│ │ → 更新 status=sent │──────────────────┘ │
│ └─────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
│ MQ(Kafka/RocketMQ)
▼
┌─────────────────────────────────────────────────────────────────┐
│ 商品服务 │
│ │
│ MQ 消费者 ──▶ HandleStockDeduct() │
│ │ │
│ ▼ │
│ 幂等检查(consumed_messages) │
│ │ │
│ ▼ │
│ BEGIN TRANSACTION │
│ UPDATE products SET stock = stock - qty │
│ INSERT consumed_messages (message_id) │
│ COMMIT │
│ │ │
│ ACK 消息(通知 MQ 消费成功) │
└─────────────────────────────────────────────────────────────────┘
8.2 数据库表设计
订单服务(order_db):
-- 订单表
CREATE TABLE orders (
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
order_no VARCHAR(64) NOT NULL UNIQUE COMMENT '业务单号,全局唯一',
user_id BIGINT UNSIGNED NOT NULL,
product_id BIGINT UNSIGNED NOT NULL,
quantity INT UNSIGNED NOT NULL,
amount DECIMAL(10, 2) NOT NULL,
status ENUM('created','confirmed','cancelled') NOT NULL DEFAULT 'created',
created_at DATETIME NOT NULL DEFAULT NOW(),
updated_at DATETIME NOT NULL DEFAULT NOW() ON UPDATE NOW(),
INDEX idx_user_id (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 发件箱消息表(Outbox)
CREATE TABLE outbox_messages (
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
message_id VARCHAR(64) NOT NULL UNIQUE COMMENT '全局唯一消息ID,消费者用于去重',
topic VARCHAR(128) NOT NULL COMMENT 'MQ Topic',
payload JSON NOT NULL COMMENT '消息内容',
status ENUM('pending','sent','failed') NOT NULL DEFAULT 'pending',
retry_count TINYINT UNSIGNED NOT NULL DEFAULT 0,
next_retry DATETIME COMMENT '下次重试时间,NULL 表示立即可重试',
created_at DATETIME NOT NULL DEFAULT NOW(),
sent_at DATETIME COMMENT '成功发送时间',
INDEX idx_pending (status, next_retry) COMMENT '中继器轮询索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
商品服务(product_db):
-- 商品表
CREATE TABLE products (
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
price DECIMAL(10, 2) NOT NULL,
stock INT UNSIGNED NOT NULL DEFAULT 0 COMMENT '可用库存',
version INT UNSIGNED NOT NULL DEFAULT 0 COMMENT '乐观锁版本号',
updated_at DATETIME NOT NULL DEFAULT NOW() ON UPDATE NOW()
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 已消费消息表(去重/幂等)
CREATE TABLE consumed_messages (
message_id VARCHAR(64) NOT NULL PRIMARY KEY COMMENT '与 outbox_messages.message_id 对应',
consumed_at DATETIME NOT NULL DEFAULT NOW(),
INDEX idx_consumed_at (consumed_at) COMMENT '用于清理过期记录'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 库存不足时的补偿发件箱(商品服务自己的 Outbox)
CREATE TABLE outbox_messages (
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
message_id VARCHAR(64) NOT NULL UNIQUE,
topic VARCHAR(128) NOT NULL,
payload JSON NOT NULL,
status ENUM('pending','sent','failed') NOT NULL DEFAULT 'pending',
retry_count TINYINT UNSIGNED NOT NULL DEFAULT 0,
next_retry DATETIME,
created_at DATETIME NOT NULL DEFAULT NOW(),
sent_at DATETIME,
INDEX idx_pending (status, next_retry)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
8.3 GORM 模型定义
// ====== 订单服务 models(order_db)======
type OrderStatus string
const (
OrderStatusCreated OrderStatus = "created"
OrderStatusConfirmed OrderStatus = "confirmed"
OrderStatusCancelled OrderStatus = "cancelled"
)
type Order struct {
ID uint64 `gorm:"primaryKey;autoIncrement"`
OrderNo string `gorm:"uniqueIndex;size:64;not null"`
UserID uint64 `gorm:"not null;index"`
ProductID uint64 `gorm:"not null"`
Quantity uint32 `gorm:"not null"`
Amount float64 `gorm:"type:decimal(10,2);not null"`
Status OrderStatus `gorm:"type:enum('created','confirmed','cancelled');default:created"`
CreatedAt time.Time
UpdatedAt time.Time
}
type MessageStatus string
const (
MessageStatusPending MessageStatus = "pending"
MessageStatusSent MessageStatus = "sent"
MessageStatusFailed MessageStatus = "failed"
)
type OutboxMessage struct {
ID uint64 `gorm:"primaryKey;autoIncrement"`
MessageID string `gorm:"uniqueIndex;size:64;not null"`
Topic string `gorm:"size:128;not null"`
Payload datatypes.JSON `gorm:"type:json;not null"`
Status MessageStatus `gorm:"type:enum('pending','sent','failed');default:pending"`
RetryCount uint8 `gorm:"default:0"`
NextRetry *time.Time `gorm:"index"`
CreatedAt time.Time
SentAt *time.Time
}
// ====== 商品服务 models(product_db)======
type Product struct {
ID uint64 `gorm:"primaryKey;autoIncrement"`
Name string `gorm:"size:255;not null"`
Price float64 `gorm:"type:decimal(10,2);not null"`
Stock uint32 `gorm:"not null;default:0"`
Version uint32 `gorm:"not null;default:0"`
UpdatedAt time.Time
}
type ConsumedMessage struct {
MessageID string `gorm:"primaryKey;size:64"`
ConsumedAt time.Time `gorm:"index"`
}
// ====== 消息 Payload 结构体 ======
// 订单服务 → 商品服务:扣减库存
type StockDeductPayload struct {
OrderNo string `json:"order_no"`
ProductID uint64 `json:"product_id"`
Quantity uint32 `json:"quantity"`
Amount float64 `json:"amount"`
}
// 商品服务 → 订单服务:库存不足,请求取消订单
type OrderCancelPayload struct {
OrderNo string `json:"order_no"`
Reason string `json:"reason"`
}
8.4 生产者:订单服务
步骤一:创建订单(业务操作 + 写消息表,同一事务)
type OrderService struct {
db *gorm.DB
}
type CreateOrderReq struct {
UserID uint64
ProductID uint64
Quantity uint32
Amount float64
}
func (s *OrderService) CreateOrder(ctx context.Context, req CreateOrderReq) (*Order, error) {
orderNo := generateOrderNo() // 全局唯一单号
messageID := uuid.New().String()
var order Order
// 关键:业务操作和写消息表在同一个本地事务
// 要么都成功,要么都失败,天然原子性
err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// 1. 创建订单
order = Order{
OrderNo: orderNo,
UserID: req.UserID,
ProductID: req.ProductID,
Quantity: req.Quantity,
Amount: req.Amount,
Status: OrderStatusCreated,
}
if err := tx.Create(&order).Error; err != nil {
return fmt.Errorf("create order: %w", err)
}
// 2. 写入发件箱消息表
payload, _ := json.Marshal(StockDeductPayload{
OrderNo: orderNo,
ProductID: req.ProductID,
Quantity: req.Quantity,
Amount: req.Amount,
})
msg := OutboxMessage{
MessageID: messageID,
Topic: "order.stock.deduct",
Payload: datatypes.JSON(payload),
Status: MessageStatusPending,
}
if err := tx.Create(&msg).Error; err != nil {
return fmt.Errorf("create outbox message: %w", err)
}
return nil
})
if err != nil {
return nil, err
}
return &order, nil
}
// generateOrderNo 生成全局唯一订单号
func generateOrderNo() string {
return fmt.Sprintf("ORD%s%06d",
time.Now().Format("20060102150405"),
rand.Intn(999999),
)
}
这一步的可靠性分析:
场景 1:事务提交成功
→ 订单和消息都写入,等待中继器发送 ✅
场景 2:订单写入失败(如唯一键冲突)
→ 事务回滚,消息也不存在,客户端收到错误后重试 ✅
场景 3:消息写入失败
→ 事务回滚,订单也不存在,客户端收到错误后重试 ✅
场景 4:事务提交时 MySQL 宕机
→ 事务未提交,订单和消息都不存在(WAL 保证原子性)✅
结论:这一步不存在"订单有消息没有"或"消息有订单没有"的情况
步骤二:消息中继器(Relay)——将消息从数据库发送到 MQ
type MessageRelay struct {
db *gorm.DB
mq MQPublisher
log *zap.Logger
}
// MQPublisher MQ 发布接口(可对接 Kafka、RocketMQ 等)
type MQPublisher interface {
Publish(ctx context.Context, topic, messageID string, payload []byte) error
}
// Run 启动中继器(作为后台 goroutine 运行)
func (r *MessageRelay) Run(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
r.log.Info("message relay stopped")
return
case <-ticker.C:
if err := r.sendBatch(ctx); err != nil {
r.log.Error("relay send batch failed", zap.Error(err))
}
}
}
}
// sendBatch 批量发送一批待发消息
func (r *MessageRelay) sendBatch(ctx context.Context) error {
var messages []OutboxMessage
// 查询待发消息:status=pending 且 next_retry 未到或为空
err := r.db.WithContext(ctx).
Where("status = ? AND (next_retry IS NULL OR next_retry <= ?)",
MessageStatusPending, time.Now()).
Order("id ASC").
Limit(100).
Find(&messages).Error
if err != nil {
return fmt.Errorf("query pending messages: %w", err)
}
for _, msg := range messages {
r.sendOne(ctx, msg)
}
return nil
}
// sendOne 发送单条消息,包含完整的错误处理
func (r *MessageRelay) sendOne(ctx context.Context, msg OutboxMessage) {
// 发送到 MQ
err := r.mq.Publish(ctx, msg.Topic, msg.MessageID, msg.Payload)
if err != nil {
r.log.Warn("publish message failed",
zap.String("message_id", msg.MessageID),
zap.Uint8("retry_count", msg.RetryCount),
zap.Error(err),
)
r.markFailed(ctx, msg)
return
}
// 发送成功,更新状态
now := time.Now()
result := r.db.WithContext(ctx).Model(&msg).Updates(map[string]interface{}{
"status": MessageStatusSent,
"sent_at": now,
})
if result.Error != nil {
// 更新失败无影响:下次轮询会重新发送,消费者通过幂等处理重复
r.log.Warn("mark message sent failed",
zap.String("message_id", msg.MessageID),
zap.Error(result.Error),
)
}
}
// markFailed 发送失败时,指数退避重试
func (r *MessageRelay) markFailed(ctx context.Context, msg OutboxMessage) {
newRetryCount := msg.RetryCount + 1
// 指数退避:1s, 2s, 4s, 8s, 16s, 32s ...
backoff := time.Duration(1<<newRetryCount) * time.Second
if backoff > 10*time.Minute {
backoff = 10 * time.Minute // 最大退避 10 分钟
}
nextRetry := time.Now().Add(backoff)
updates := map[string]interface{}{
"retry_count": newRetryCount,
"next_retry": nextRetry,
}
// 超过最大重试次数,标记为 failed,触发告警人工介入
const maxRetry = 10
if newRetryCount >= maxRetry {
updates["status"] = MessageStatusFailed
r.log.Error("message exceeded max retries, marked as failed",
zap.String("message_id", msg.MessageID),
zap.Uint8("retry_count", newRetryCount),
)
// 实际生产中这里触发告警(钉钉/PagerDuty)
}
r.db.WithContext(ctx).Model(&msg).Updates(updates)
}
中继器各阶段错误分析:
场景 1:MQ 短暂不可用
→ Publish 失败,写 next_retry(指数退避)
→ 等 MQ 恢复后自动重试 ✅
场景 2:MQ 发送成功,但更新 status=sent 失败
→ 消息被重新轮询,再次发送(重复投递)
→ 消费者通过 consumed_messages 去重,幂等处理 ✅
场景 3:中继器进程崩溃(重启)
→ pending 消息仍在数据库,重启后继续发送 ✅
场景 4:MQ 持续不可用,重试超限
→ status=failed,触发告警,人工介入(手动重置 status=pending)✅
场景 5:同一消息被两个 Relay 实例同时发送(多实例部署)
→ 两个实例都发送成功,消费者收到两条相同 message_id 的消息
→ consumed_messages 幂等去重 ✅
步骤三:订单确认(消费商品服务的回调)
// 商品服务扣减成功后,会发送 order.confirmed 消息
// 订单服务消费该消息,将订单状态更新为 confirmed
type OrderConsumer struct {
db *gorm.DB
log *zap.Logger
}
func (c *OrderConsumer) HandleOrderConfirm(ctx context.Context, msg MQMessage) error {
var payload struct {
OrderNo string `json:"order_no"`
}
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
// 消息格式错误,不重试,直接 ACK(避免毒丸消息阻塞队列)
c.log.Error("invalid message payload", zap.String("message_id", msg.ID))
return nil
}
return c.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// 幂等:UPDATE 本身是幂等的(重复执行结果相同)
return tx.Model(&Order{}).
Where("order_no = ? AND status = ?", payload.OrderNo, OrderStatusCreated).
Update("status", OrderStatusConfirmed).Error
})
}
func (c *OrderConsumer) HandleOrderCancel(ctx context.Context, msg MQMessage) error {
var payload OrderCancelPayload
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
c.log.Error("invalid message payload", zap.String("message_id", msg.ID))
return nil
}
return c.db.WithContext(ctx).
Model(&Order{}).
Where("order_no = ? AND status = ?", payload.OrderNo, OrderStatusCreated).
Update("status", OrderStatusCancelled).Error
}
8.5 消费者:商品服务
完整消费逻辑
type StockConsumer struct {
db *gorm.DB
log *zap.Logger
}
// HandleStockDeduct 消费"扣减库存"消息
// MQ 框架会在此函数返回 nil 时 ACK,返回 error 时 NACK(重新投递)
func (c *StockConsumer) HandleStockDeduct(ctx context.Context, msg MQMessage) error {
// ---- 解析消息 ----
var payload StockDeductPayload
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
// 消息格式错误:不重试,记录日志后 ACK
// 这类消息不应该阻塞后续正常消息的消费
c.log.Error("invalid stock deduct payload",
zap.String("message_id", msg.ID),
zap.ByteString("payload", msg.Payload),
)
return nil // 返回 nil = ACK,不重试
}
// ---- 幂等检查 ----
// 先查再插,避免每次都触发唯一键冲突(高并发下减少异常)
var count int64
c.db.WithContext(ctx).Model(&ConsumedMessage{}).
Where("message_id = ?", msg.ID).Count(&count)
if count > 0 {
c.log.Info("message already consumed, skip",
zap.String("message_id", msg.ID),
)
return nil // 已消费,直接 ACK
}
// ---- 核心业务:扣减库存 ----
err := c.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// 1. 插入消费记录(幂等锁)
// 若并发重复消费,只有一个能插入成功
consumed := ConsumedMessage{
MessageID: msg.ID,
ConsumedAt: time.Now(),
}
result := tx.Create(&consumed)
if result.Error != nil {
if isDuplicateKeyError(result.Error) {
// 并发情况下的重复消费,直接返回 nil(幂等)
return nil
}
return fmt.Errorf("insert consumed_message: %w", result.Error)
}
// 2. 扣减库存(带乐观锁)
r := tx.Model(&Product{}).
Where("id = ? AND stock >= ?", payload.ProductID, payload.Quantity).
Updates(map[string]interface{}{
"stock": gorm.Expr("stock - ?", payload.Quantity),
"version": gorm.Expr("version + 1"),
})
if r.Error != nil {
return fmt.Errorf("deduct stock: %w", r.Error)
}
if r.RowsAffected == 0 {
// 库存不足!需要发补偿消息,通知订单服务取消订单
// 补偿消息也写入本服务的 outbox,保证原子性
return c.writeCompensationMessage(tx, payload)
}
// 3. 库存扣减成功,写确认消息(通知订单服务确认订单)
return c.writeConfirmMessage(tx, payload)
})
if err != nil {
// 事务失败(DB 故障等),返回 error 让 MQ 重新投递
c.log.Error("handle stock deduct failed",
zap.String("message_id", msg.ID),
zap.Error(err),
)
return err // 返回 error = NACK,MQ 重新投递
}
return nil // 返回 nil = ACK
}
// writeCompensationMessage 库存不足时,写补偿消息(取消订单)
func (c *StockConsumer) writeCompensationMessage(tx *gorm.DB, payload StockDeductPayload) error {
c.log.Warn("insufficient stock, writing cancel order message",
zap.String("order_no", payload.OrderNo),
zap.Uint32("required", payload.Quantity),
)
cancelPayload, _ := json.Marshal(OrderCancelPayload{
OrderNo: payload.OrderNo,
Reason: fmt.Sprintf("insufficient stock for product %d", payload.ProductID),
})
msg := OutboxMessage{
MessageID: uuid.New().String(),
Topic: "stock.order.cancel", // 订单服务监听此 Topic
Payload: datatypes.JSON(cancelPayload),
Status: MessageStatusPending,
}
return tx.Create(&msg).Error
}
// writeConfirmMessage 扣减成功时,写确认消息(确认订单)
func (c *StockConsumer) writeConfirmMessage(tx *gorm.DB, payload StockDeductPayload) error {
confirmPayload, _ := json.Marshal(map[string]string{
"order_no": payload.OrderNo,
})
msg := OutboxMessage{
MessageID: uuid.New().String(),
Topic: "stock.order.confirm", // 订单服务监听此 Topic
Payload: datatypes.JSON(confirmPayload),
Status: MessageStatusPending,
}
return tx.Create(&msg).Error
}
消费者各阶段错误分析:
场景 1:消息格式错误(Bad Payload)
→ 记录日志,直接 ACK
→ 不重试,避免毒丸消息无限阻塞队列 ✅
场景 2:幂等检查通过,但事务中途消费者进程崩溃
→ consumed_messages 未插入(事务回滚)
→ MQ 超时后重新投递
→ 消费者重新消费,幂等检查未命中,正常处理 ✅
场景 3:库存扣减成功,但写 consumed_messages 失败
→ 事务回滚(stock 恢复)
→ MQ 重新投递,消费者重新执行 ✅
场景 4:整个事务成功,但 ACK 前消费者崩溃
→ MQ 重新投递
→ consumed_messages 已存在,幂等检查命中,直接 ACK ✅
场景 5:库存不足,写补偿消息失败
→ 事务回滚
→ MQ 重新投递,重新处理
→ 库存仍然不足,再次写补偿消息,最终写入成功 ✅
场景 6:补偿消息发送失败(商品服务 Relay 发送取消消息失败)
→ 同样走 Outbox 模式,指数退避重试
→ 最终订单服务收到取消消息 ✅
8.6 完整数据流时序图
正常下单成功流程:
客户端 订单服务 order_db MQ 商品服务 product_db
│ │ │ │ │ │
│──CreateOrder─▶│ │ │ │ │
│ │──BEGIN──────────▶│ │ │ │
│ │──INSERT orders──▶│ │ │ │
│ │──INSERT outbox──▶│ │ │ │
│ │──COMMIT─────────▶│ │ │ │
│◀──200 OK──────│ │ │ │ │
│ │ │ │ │ │
│ [Relay轮询] │ │ │ │
│ │──SELECT outbox──▶│ │ │ │
│ │◀─pending msg─────│ │ │ │
│ │──────────Publish(stock.deduct)─▶│ │ │
│ │──UPDATE sent────▶│ │ │ │
│ │ │ │ │ │
│ │ │ │──deliver──────▶│ │
│ │ │ │ │──BEGIN─────────▶│
│ │ │ │ │──UPDATE stock──▶│
│ │ │ │ │──INSERT consumed▶│
│ │ │ │ │──INSERT outbox──▶│
│ │ │ │ │──COMMIT────────▶│
│ │ │ │◀──ACK──────────│ │
│ │ │ │ │ │
│ │ │ [商品服务Relay轮询] │ │
│ │ │ │◀─Publish(stock.order.confirm)────│
│ │ │ │ │ │
│ │◀──deliver────────────────────── │ │ │
│ │──UPDATE confirmed▶│ │ │ │
库存不足流程(补偿):
...(同上,直到商品服务消费消息)
│──BEGIN─────────▶│
│──stock不足,rows=0▶│
│──INSERT outbox(cancel)▶│
│──INSERT consumed▶│
│──COMMIT────────▶│
│◀──ACK───────────│
[商品服务Relay发送取消消息]
│◀──deliver(stock.order.cancel)──────│
│──UPDATE cancelled─▶│
8.7 消息清理
// CleanConsumedMessages 定期清理过期的消费记录
// consumed_messages 表会持续增长,需要定期清理
func CleanConsumedMessages(ctx context.Context, db *gorm.DB) error {
// 保留最近 7 天的消费记录(覆盖 MQ 最大重试窗口即可)
cutoff := time.Now().AddDate(0, 0, -7)
return db.WithContext(ctx).
Where("consumed_at < ?", cutoff).
Delete(&ConsumedMessage{}).Error
}
// CleanSentMessages 定期清理已发送的消息
func CleanSentMessages(ctx context.Context, db *gorm.DB) error {
cutoff := time.Now().AddDate(0, 0, -3)
return db.WithContext(ctx).
Where("status = ? AND sent_at < ?", MessageStatusSent, cutoff).
Delete(&OutboxMessage{}).Error
}
8.8 各环节故障总结
| 故障位置 | 现象 | 自动恢复方式 |
|---|---|---|
| 订单服务写 DB 失败 | 事务回滚,订单和消息都不存在 | 客户端重试 |
| 中继器发送 MQ 失败 | 消息留在 outbox,status=pending | 指数退避自动重试 |
| 中继器更新 sent 失败 | 消息被重复发送 | 消费者幂等处理 |
| MQ 宕机 | 消息积压在 outbox | MQ 恢复后自动发送 |
| 消费者处理中崩溃 | consumed 未写入,事务回滚 | MQ 重新投递 |
| 消费者 DB 失败 | 事务回滚,返回 NACK | MQ 重新投递 |
| 库存不足 | 写补偿消息(取消订单) | 自动补偿流程 |
| 补偿消息发送失败 | 消息在商品服务 outbox 重试 | 自动重试到成功 |
9. 方案对比与选型
9.1 综合对比
| 方案 | 一致性 | 可用性 | 吞吐量 | 业务改造 | 实现难度 | 适用场景 |
|---|---|---|---|---|---|---|
| MySQL XA | 强一致 | 低 | 差 | 无 | 低 | 跨库、低并发、内部系统 |
| TCC | 较强(Try预占) | 高 | 中 | 大(3接口) | 高 | 金融支付、强一致要求 |
| Saga | 最终一致 | 高 | 好 | 中(2接口) | 中 | 长流程、多步骤编排 |
| 本地消息表 | 最终一致 | 高 | 好 | 小 | 中 | 通用异步解耦,推荐首选 |
9.2 选型决策树
Q1: 业务能否改造为单库事务?
→ 能:直接用 InnoDB 本地事务,最简单可靠
→ 不能:继续 Q2
Q2: 业务能否接受最终一致性(秒级延迟)?
→ 不能(如实时转账、强监管场景):选 TCC
→ 能:继续 Q3
Q3: 业务是否是长流程(步骤 >= 3,耗时 >= 秒级)?
→ 是:选 Saga(Orchestration 模式)
→ 否:选 本地消息表(最简单)
实践建议:
互联网电商、内容、社交 → 本地消息表(80% 的场景)
金融支付、账务系统 → TCC(DTM/Seata 框架)
订单履约长流程 → Saga
内部数据同步、批处理 → MySQL XA
9.3 常用开源框架
| 框架 | 语言 | 支持模式 | 说明 |
|---|---|---|---|
| DTM | Go | TCC/Saga/XA/消息 | 轻量,Go 生态首选 |
| Seata | Java | AT/TCC/Saga/XA | 阿里开源,Java 生态主流 |
| RocketMQ 事务消息 | Java/Go | 消息最终一致 | 本地消息表的托管版本 |
RocketMQ 事务消息 本质上是云托管的本地消息表,省去了自己实现中继器的工作,有条件的可以直接使用。
xingliuhua