目录

MySQL-13 Go 开发实战

MySQL Go 开发实战


目录

  1. 驱动与框架选择
  2. database/sql 标准库
  3. sqlx 增强库
  4. GORM ORM 框架
  5. 连接池深度调优
  6. 事务最佳实践
  7. 批量操作优化
  8. 数据库迁移(Migrate)
  9. 测试策略
  10. 常见坑与最佳实践

1. 驱动与框架选择

1.1 生态全景

驱动层(必须):
  github.com/go-sql-driver/mysql    最主流,go-sql-driver
  github.com/pingcap/tidb/...       TiDB 官方驱动(兼容 MySQL)

增强 SQL 库:
  github.com/jmoiron/sqlx           database/sql 的薄封装,保留 SQL 控制力
  github.com/blockloop/scan         轻量扫描库

ORM 框架:
  gorm.io/gorm                      Go 最流行的 ORM(v2)
  github.com/ent/ent                Facebook 开源,代码生成,类型安全
  github.com/uptrace/bun            现代 ORM,SQL-first

查询构建器:
  github.com/Masterminds/squirrel   SQL 构建器,灵活
  github.com/doug-martin/goqu       功能丰富的查询构建器

迁移工具:
  github.com/golang-migrate/migrate  支持多种数据库
  github.com/pressly/goose          轻量,内嵌迁移
  atlas (ariga.io/atlas)            现代 schema 管理

1.2 选型建议

场景 推荐
复杂 SQL、高性能场景 sqlx + squirrel
快速业务开发 GORM v2
类型安全、大型项目 ent
学习/简单脚本 database/sql

2. database/sql 标准库

2.1 完整连接示例

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "time"

    _ "github.com/go-sql-driver/mysql"
)

type Config struct {
    Host     string
    Port     int
    User     string
    Password string
    DBName   string

    MaxOpenConns    int
    MaxIdleConns    int
    ConnMaxLifetime time.Duration
    ConnMaxIdleTime time.Duration
}

func NewMySQL(cfg Config) (*sql.DB, error) {
    dsn := fmt.Sprintf(
        "%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local&timeout=5s&readTimeout=10s&writeTimeout=5s",
        cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.DBName,
    )

    db, err := sql.Open("mysql", dsn)
    if err != nil {
        return nil, fmt.Errorf("open mysql: %w", err)
    }

    // 连接池配置
    db.SetMaxOpenConns(cfg.MaxOpenConns)
    db.SetMaxIdleConns(cfg.MaxIdleConns)
    db.SetConnMaxLifetime(cfg.ConnMaxLifetime)
    db.SetConnMaxIdleTime(cfg.ConnMaxIdleTime)

    // 验证连通性
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err = db.PingContext(ctx); err != nil {
        db.Close()
        return nil, fmt.Errorf("ping mysql: %w", err)
    }

    return db, nil
}

2.2 CRUD 完整示例

type User struct {
    ID        int64
    Name      string
    Email     string
    Age       int
    CreatedAt time.Time
    UpdatedAt time.Time
    DeletedAt sql.NullTime  // 可空时间
}

// INSERT
func CreateUser(ctx context.Context, db *sql.DB, u *User) (int64, error) {
    result, err := db.ExecContext(ctx,
        `INSERT INTO users (name, email, age) VALUES (?, ?, ?)`,
        u.Name, u.Email, u.Age,
    )
    if err != nil {
        return 0, fmt.Errorf("insert user: %w", err)
    }
    return result.LastInsertId()
}

// SELECT 单行
func GetUserByID(ctx context.Context, db *sql.DB, id int64) (*User, error) {
    var u User
    err := db.QueryRowContext(ctx,
        `SELECT id, name, email, age, created_at, updated_at FROM users WHERE id = ? AND deleted_at IS NULL`,
        id,
    ).Scan(&u.ID, &u.Name, &u.Email, &u.Age, &u.CreatedAt, &u.UpdatedAt)
    if err == sql.ErrNoRows {
        return nil, nil  // 未找到,返回 nil 而非错误
    }
    if err != nil {
        return nil, fmt.Errorf("get user %d: %w", id, err)
    }
    return &u, nil
}

// SELECT 多行
func ListUsers(ctx context.Context, db *sql.DB, page, size int) ([]*User, error) {
    rows, err := db.QueryContext(ctx,
        `SELECT id, name, email, age, created_at FROM users
         WHERE deleted_at IS NULL
         ORDER BY id DESC
         LIMIT ? OFFSET ?`,
        size, (page-1)*size,
    )
    if err != nil {
        return nil, fmt.Errorf("list users: %w", err)
    }
    defer rows.Close()

    var users []*User
    for rows.Next() {
        var u User
        if err = rows.Scan(&u.ID, &u.Name, &u.Email, &u.Age, &u.CreatedAt); err != nil {
            return nil, fmt.Errorf("scan user: %w", err)
        }
        users = append(users, &u)
    }
    // 必须检查 rows.Err()(可能在 Next() 期间发生错误)
    if err = rows.Err(); err != nil {
        return nil, fmt.Errorf("rows error: %w", err)
    }
    return users, nil
}

// UPDATE
func UpdateUser(ctx context.Context, db *sql.DB, u *User) error {
    result, err := db.ExecContext(ctx,
        `UPDATE users SET name = ?, email = ?, age = ? WHERE id = ? AND deleted_at IS NULL`,
        u.Name, u.Email, u.Age, u.ID,
    )
    if err != nil {
        return fmt.Errorf("update user %d: %w", u.ID, err)
    }
    n, _ := result.RowsAffected()
    if n == 0 {
        return fmt.Errorf("user %d not found or already deleted", u.ID)
    }
    return nil
}

// 软删除
func DeleteUser(ctx context.Context, db *sql.DB, id int64) error {
    _, err := db.ExecContext(ctx,
        `UPDATE users SET deleted_at = NOW() WHERE id = ? AND deleted_at IS NULL`,
        id,
    )
    return err
}

2.3 NULL 值处理

// 可空类型
import "database/sql"

type UserProfile struct {
    ID    int64
    Name  string
    Phone sql.NullString   // 可空字符串
    Age   sql.NullInt64    // 可空整数
    Score sql.NullFloat64  // 可空浮点
    Bio   sql.NullString
}

// 扫描可空字段
var u UserProfile
rows.Scan(&u.ID, &u.Name, &u.Phone, &u.Age)

// 使用可空字段
if u.Phone.Valid {
    fmt.Println(u.Phone.String)
}

// 推荐:使用指针替代 sql.Null* 类型(更直观)
type UserProfileV2 struct {
    ID    int64
    Name  string
    Phone *string  // nil 表示 NULL
    Age   *int
}

3. sqlx 增强库

sqlx 是对 database/sql 的薄封装,增加了:

  • 结构体自动扫描(StructScan)
  • 命名参数(NamedExec)
  • IN 查询辅助
  • 批量插入
import "github.com/jmoiron/sqlx"

type User struct {
    ID        int64     `db:"id"`
    Name      string    `db:"name"`
    Email     string    `db:"email"`
    Age       int       `db:"age"`
    CreatedAt time.Time `db:"created_at"`
}

func NewSQLX(dsn string) (*sqlx.DB, error) {
    db, err := sqlx.Connect("mysql", dsn)
    if err != nil {
        return nil, err
    }
    db.SetMaxOpenConns(100)
    db.SetMaxIdleConns(10)
    db.SetConnMaxLifetime(30 * time.Minute)
    return db, nil
}

// 自动映射到结构体(不需要手动 Scan)
func GetUser(ctx context.Context, db *sqlx.DB, id int64) (*User, error) {
    var u User
    err := db.GetContext(ctx, &u,
        "SELECT id, name, email, age, created_at FROM users WHERE id = ?", id,
    )
    if err == sql.ErrNoRows {
        return nil, nil
    }
    return &u, err
}

// 查询多行
func ListUsers(ctx context.Context, db *sqlx.DB, status int) ([]*User, error) {
    var users []*User
    err := db.SelectContext(ctx, &users,
        "SELECT id, name, email, age FROM users WHERE status = ? ORDER BY id", status,
    )
    return users, err
}

// 命名参数(更安全,参数顺序无关)
func CreateUser(ctx context.Context, db *sqlx.DB, u *User) error {
    _, err := db.NamedExecContext(ctx,
        `INSERT INTO users (name, email, age) VALUES (:name, :email, :age)`,
        u,
    )
    return err
}

// IN 查询
func GetUsersByIDs(ctx context.Context, db *sqlx.DB, ids []int64) ([]*User, error) {
    query, args, err := sqlx.In(
        "SELECT id, name, email FROM users WHERE id IN (?)",
        ids,
    )
    if err != nil {
        return nil, err
    }
    query = db.Rebind(query)  // 将 ? 替换为 MySQL 格式

    var users []*User
    err = db.SelectContext(ctx, &users, query, args...)
    return users, err
}

4. GORM ORM 框架

4.1 初始化

import (
    "gorm.io/driver/mysql"
    "gorm.io/gorm"
    "gorm.io/gorm/logger"
    "log"
    "os"
    "time"
)

func NewGORM(dsn string) (*gorm.DB, error) {
    newLogger := logger.New(
        log.New(os.Stdout, "\r\n", log.LstdFlags),
        logger.Config{
            SlowThreshold:             200 * time.Millisecond,
            LogLevel:                  logger.Warn,
            IgnoreRecordNotFoundError: true,
            Colorful:                  true,
        },
    )

    db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{
        Logger:                                   newLogger,
        NamingStrategy: schema.NamingStrategy{
            TablePrefix:   "t_",   // 表名前缀
            SingularTable: true,   // 单数表名(user 而非 users)
        },
        DisableForeignKeyConstraintWhenMigrating: true,
    })
    if err != nil {
        return nil, err
    }

    sqlDB, err := db.DB()
    if err != nil {
        return nil, err
    }
    sqlDB.SetMaxOpenConns(100)
    sqlDB.SetMaxIdleConns(10)
    sqlDB.SetConnMaxLifetime(30 * time.Minute)

    return db, nil
}

4.2 模型定义

import "gorm.io/gorm"

// 嵌入 gorm.Model 自动包含 ID、CreatedAt、UpdatedAt、DeletedAt
type User struct {
    gorm.Model
    Name   string `gorm:"size:64;not null;comment:用户名"`
    Email  string `gorm:"size:128;uniqueIndex;not null"`
    Phone  string `gorm:"size:11;index"`
    Age    int    `gorm:"default:0"`
    Status int8   `gorm:"default:1;index:idx_status_created,priority:1"`
    // 联合索引
    CreatedAt time.Time `gorm:"index:idx_status_created,priority:2"`
}

// 自定义表名
func (User) TableName() string {
    return "users"
}

// 也可以不用 gorm.Model,完全自定义
type Order struct {
    ID        uint      `gorm:"primarykey"`
    UserID    uint      `gorm:"not null;index:idx_user_status,priority:1"`
    Status    int8      `gorm:"not null;index:idx_user_status,priority:2"`
    Amount    float64   `gorm:"type:decimal(10,2);not null"`
    Remark    string    `gorm:"type:text"`
    CreatedAt time.Time `gorm:"not null;default:CURRENT_TIMESTAMP"`
    UpdatedAt time.Time `gorm:"not null;default:CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"`
    DeletedAt gorm.DeletedAt `gorm:"index"`  // 软删除
}

4.3 CRUD 操作

// Create
user := User{Name: "Alice", Email: "alice@example.com"}
result := db.Create(&user)
// user.ID 被自动填充

// 批量插入
users := []User{{Name: "Bob"}, {Name: "Carol"}}
db.CreateInBatches(&users, 100)  // 每批 100 条

// 查询
// 主键查询
var user User
db.First(&user, 1)  // SELECT * FROM users WHERE id = 1 ORDER BY id LIMIT 1
db.First(&user, "id = ?", 1)

// 条件查询
var users []User
db.Where("status = ? AND age > ?", 1, 18).Order("created_at DESC").Limit(20).Find(&users)

// 选择特定字段
db.Select("id", "name", "email").Where("status = 1").Find(&users)

// Pluck(只查一列)
var emails []string
db.Model(&User{}).Where("status = 1").Pluck("email", &emails)

// 更新
// 只更新非零值字段
db.Save(&user)

// 更新指定字段
db.Model(&user).Update("status", 0)
db.Model(&user).Updates(map[string]interface{}{"status": 0, "age": 25})
db.Model(&user).Updates(User{Status: 0, Age: 25})  // 只更新非零值

// 强制更新零值
db.Model(&user).Select("*").Updates(User{Status: 0})

// Delete(软删除,如果有 DeletedAt 字段)
db.Delete(&user)  // UPDATE users SET deleted_at = NOW() WHERE id = ?
// 物理删除
db.Unscoped().Delete(&user)

4.4 关联关系

// 一对多
type User struct {
    gorm.Model
    Name   string
    Orders []Order `gorm:"foreignKey:UserID"`
}

type Order struct {
    gorm.Model
    UserID uint
    Amount float64
    User   User  // 反向关联
}

// 预加载(Preload)
var user User
db.Preload("Orders").First(&user, 1)
// SELECT * FROM users WHERE id = 1;
// SELECT * FROM orders WHERE user_id = 1;

// 带条件的预加载
db.Preload("Orders", "status = ?", 1).Find(&users)

// 连接查询(手动 JOIN)
type UserWithOrderCount struct {
    User
    OrderCount int64
}
var results []UserWithOrderCount
db.Table("users u").
    Select("u.*, COUNT(o.id) as order_count").
    Joins("LEFT JOIN orders o ON u.id = o.user_id AND o.deleted_at IS NULL").
    Group("u.id").
    Find(&results)

4.5 原生 SQL

// 执行原生 SQL
db.Raw("SELECT * FROM users WHERE name = ?", "Alice").Scan(&users)

// Exec
db.Exec("UPDATE users SET status = ? WHERE id > ?", 0, 100)

// 在 GORM 链中使用原生条件
db.Where("id IN (?)", db.Raw("SELECT user_id FROM vip_users")).Find(&users)

5. 连接池深度调优

5.1 关键参数详解

db.SetMaxOpenConns(n)
// 最大打开连接数(使用中 + 空闲)
// 超出此数的请求会等待(block),直到有可用连接或超时
// 建议:根据 MySQL max_connections 的 80%

db.SetMaxIdleConns(n)
// 空闲连接池大小
// 空闲连接维持已认证的 TCP 连接,下次使用无需重新建连
// 太大:浪费连接(MySQL 端也消耗资源)
// 太小:频繁创建连接(每次需要 TCP 握手 + 认证)
// 建议:MaxOpenConns 的 10%~20%

db.SetConnMaxLifetime(d)
// 连接最大存活时间
// 超时的连接在空闲时被关闭(不会强制关闭使用中的连接)
// 设置此值可以:
//   1. 应对 MySQL wait_timeout(空闲连接被服务器断开)
//   2. 在主从切换后让旧连接自然过期
// 建议:< MySQL wait_timeout (通常 600s),设 30min~1h

db.SetConnMaxIdleTime(d)
// 空闲连接最大存活时间(Go 1.15+)
// 比 ConnMaxLifetime 更激进,空闲超时立即关闭
// 适合流量不稳定的场景(高峰期扩展,低谷期收缩)

5.2 不同场景配置

// 场景1:高并发 Web 服务
db.SetMaxOpenConns(100)
db.SetMaxIdleConns(20)
db.SetConnMaxLifetime(30 * time.Minute)
db.SetConnMaxIdleTime(10 * time.Minute)

// 场景2:后台任务(并发低,但需要保持连接)
db.SetMaxOpenConns(20)
db.SetMaxIdleConns(5)
db.SetConnMaxLifetime(time.Hour)
db.SetConnMaxIdleTime(30 * time.Minute)

// 场景3:Lambda/Serverless(每次请求都可能重新实例化)
db.SetMaxOpenConns(2)   // Serverless 并发低
db.SetMaxIdleConns(2)
db.SetConnMaxLifetime(10 * time.Minute)
db.SetConnMaxIdleTime(5 * time.Minute)

5.3 监控连接池

// 定期上报连接池指标
func MonitorDBPool(db *sql.DB, name string) {
    ticker := time.NewTicker(10 * time.Second)
    for range ticker.C {
        stats := db.Stats()
        log.Printf("[%s] pool stats: open=%d, inuse=%d, idle=%d, waitcount=%d, waitduration=%s",
            name,
            stats.OpenConnections,
            stats.InUse,
            stats.Idle,
            stats.WaitCount,
            stats.WaitDuration,
        )
        // 上报到 Prometheus/监控系统
        metrics.Gauge("db.open_conns", float64(stats.OpenConnections), name)
        metrics.Gauge("db.in_use", float64(stats.InUse), name)
        metrics.Counter("db.wait_count", float64(stats.WaitCount), name)
    }
}

6. 事务最佳实践

6.1 封装事务助手

// 通用事务助手(推荐)
func WithTx(ctx context.Context, db *sql.DB, fn func(*sql.Tx) error) (err error) {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }

    defer func() {
        if p := recover(); p != nil {
            _ = tx.Rollback()
            panic(p)
        } else if err != nil {
            if rbErr := tx.Rollback(); rbErr != nil {
                err = fmt.Errorf("rollback error: %v; original: %w", rbErr, err)
            }
        } else {
            err = tx.Commit()
        }
    }()

    err = fn(tx)
    return
}

// 支持隔离级别的版本
func WithTxIsolation(ctx context.Context, db *sql.DB, isolation sql.IsolationLevel, fn func(*sql.Tx) error) error {
    tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: isolation})
    if err != nil {
        return err
    }
    defer func() {
        if p := recover(); p != nil {
            tx.Rollback()
            panic(p)
        }
    }()

    if err = fn(tx); err != nil {
        tx.Rollback()
        return err
    }
    return tx.Commit()
}

6.2 分布式事务(本地消息表)

// 本地消息表模式:保证业务操作和消息发送的原子性
func CreateOrderWithMessage(ctx context.Context, db *sql.DB, order Order) error {
    return WithTx(ctx, db, func(tx *sql.Tx) error {
        // 1. 创建订单
        result, err := tx.ExecContext(ctx,
            "INSERT INTO orders (user_id, amount, status) VALUES (?, ?, 'pending')",
            order.UserID, order.Amount,
        )
        if err != nil {
            return err
        }
        orderID, _ := result.LastInsertId()

        // 2. 写入本地消息表(与订单在同一事务)
        _, err = tx.ExecContext(ctx,
            "INSERT INTO outbox_messages (type, payload, status) VALUES ('ORDER_CREATED', ?, 'pending')",
            fmt.Sprintf(`{"order_id": %d}`, orderID),
        )
        return err
    })
    // 后台进程定时扫描 outbox_messages,发送消息后标记为已发送
}

7. 批量操作优化

7.1 批量插入

// 方案1:手动拼接(最快)
func BulkInsert(ctx context.Context, db *sql.DB, users []User) error {
    if len(users) == 0 {
        return nil
    }

    const batchSize = 500
    for i := 0; i < len(users); i += batchSize {
        end := i + batchSize
        if end > len(users) {
            end = len(users)
        }
        batch := users[i:end]

        placeholders := make([]string, len(batch))
        args := make([]interface{}, 0, len(batch)*3)
        for j, u := range batch {
            placeholders[j] = "(?, ?, ?)"
            args = append(args, u.Name, u.Email, u.Age)
        }

        query := "INSERT INTO users (name, email, age) VALUES " +
            strings.Join(placeholders, ",")
        if _, err := db.ExecContext(ctx, query, args...); err != nil {
            return fmt.Errorf("batch insert at %d: %w", i, err)
        }
    }
    return nil
}

// 方案2:GORM CreateInBatches
db.CreateInBatches(users, 500)

// 方案3:LOAD DATA INFILE(最快,适合大批量导入)
func LoadDataFromCSV(ctx context.Context, db *sql.DB, csvPath string) error {
    _, err := db.ExecContext(ctx, fmt.Sprintf(`
        LOAD DATA LOCAL INFILE '%s'
        INTO TABLE users
        FIELDS TERMINATED BY ','
        LINES TERMINATED BY '\n'
        (name, email, age)
    `, csvPath))
    return err
}

7.2 批量更新

// 方案1:CASE WHEN(单次 SQL 更新多行)
func BulkUpdateStatus(ctx context.Context, db *sql.DB, updates map[int64]int8) error {
    if len(updates) == 0 {
        return nil
    }

    cases := make([]string, 0, len(updates))
    ids := make([]interface{}, 0, len(updates))
    for id, status := range updates {
        cases = append(cases, fmt.Sprintf("WHEN %d THEN %d", id, status))
        ids = append(ids, id)
    }

    query := fmt.Sprintf(
        "UPDATE users SET status = CASE id %s END WHERE id IN (%s)",
        strings.Join(cases, " "),
        strings.Repeat("?,", len(ids)-1)+"?",
    )
    _, err := db.ExecContext(ctx, query, ids...)
    return err
}

// 方案2:ON DUPLICATE KEY UPDATE(upsert)
func UpsertUsers(ctx context.Context, db *sql.DB, users []User) error {
    if len(users) == 0 {
        return nil
    }

    placeholders := make([]string, len(users))
    args := make([]interface{}, 0, len(users)*4)
    for i, u := range users {
        placeholders[i] = "(?, ?, ?, ?)"
        args = append(args, u.ID, u.Name, u.Email, u.Age)
    }

    query := "INSERT INTO users (id, name, email, age) VALUES " +
        strings.Join(placeholders, ",") +
        " ON DUPLICATE KEY UPDATE name=VALUES(name), email=VALUES(email), age=VALUES(age)"

    _, err := db.ExecContext(ctx, query, args...)
    return err
}

8. 数据库迁移(Migrate)

8.1 golang-migrate

# 安装 CLI
go install -tags 'mysql' github.com/golang-migrate/migrate/v4/cmd/migrate@latest

# 创建迁移文件
migrate create -ext sql -dir migrations -seq create_users_table
# 生成:
# migrations/000001_create_users_table.up.sql
# migrations/000001_create_users_table.down.sql
-- 000001_create_users_table.up.sql
CREATE TABLE IF NOT EXISTS users (
  id         BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
  name       VARCHAR(64)  NOT NULL,
  email      VARCHAR(128) NOT NULL,
  status     TINYINT      NOT NULL DEFAULT 1,
  created_at DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP,
  updated_at DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  UNIQUE KEY uk_email (email),
  KEY idx_status (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 000001_create_users_table.down.sql
DROP TABLE IF EXISTS users;
# 执行迁移
migrate -path migrations -database "mysql://user:pass@tcp(localhost:3306)/myapp" up

# 回滚
migrate -path migrations -database "..." down 1

# 查看版本
migrate -path migrations -database "..." version
// 代码中集成
import (
    "github.com/golang-migrate/migrate/v4"
    _ "github.com/golang-migrate/migrate/v4/database/mysql"
    _ "github.com/golang-migrate/migrate/v4/source/file"
)

func RunMigrations(dsn string) error {
    m, err := migrate.New(
        "file://migrations",
        "mysql://"+dsn,
    )
    if err != nil {
        return err
    }
    defer m.Close()

    if err = m.Up(); err != nil && err != migrate.ErrNoChange {
        return err
    }
    return nil
}

8.2 goose(嵌入式迁移)

import "github.com/pressly/goose/v3"

//go:embed migrations/*.sql
var embedMigrations embed.FS

func RunMigrationsGoose(db *sql.DB) error {
    goose.SetBaseFS(embedMigrations)

    if err := goose.SetDialect("mysql"); err != nil {
        return err
    }

    return goose.Up(db, "migrations")
}

9. 测试策略

9.1 集成测试(真实数据库)

// testcontainers-go 启动测试数据库
import (
    "github.com/testcontainers/testcontainers-go"
    "github.com/testcontainers/testcontainers-go/modules/mysql"
)

func TestMain(m *testing.M) {
    ctx := context.Background()

    container, err := mysqlmodule.RunContainer(ctx,
        testcontainers.WithImage("mysql:8.0"),
        mysql.WithDatabase("testdb"),
        mysql.WithUsername("root"),
        mysql.WithPassword("test"),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer container.Terminate(ctx)

    dsn, _ := container.ConnectionString(ctx)
    testDB, _ = sql.Open("mysql", dsn+"?parseTime=True")

    // 运行迁移
    RunMigrations(dsn)

    os.Exit(m.Run())
}

var testDB *sql.DB

func TestCreateUser(t *testing.T) {
    ctx := context.Background()
    id, err := CreateUser(ctx, testDB, &User{Name: "Test", Email: "test@example.com"})
    require.NoError(t, err)
    assert.Positive(t, id)
}

9.2 单元测试(Mock)

import "github.com/DATA-DOG/go-sqlmock"

func TestGetUser(t *testing.T) {
    db, mock, err := sqlmock.New()
    require.NoError(t, err)
    defer db.Close()

    expectedUser := &User{ID: 1, Name: "Alice", Email: "alice@example.com"}
    rows := sqlmock.NewRows([]string{"id", "name", "email", "age", "created_at", "updated_at"}).
        AddRow(expectedUser.ID, expectedUser.Name, expectedUser.Email, 25, time.Now(), time.Now())

    mock.ExpectQuery("SELECT .+ FROM users WHERE id = ?").
        WithArgs(1).
        WillReturnRows(rows)

    user, err := GetUserByID(context.Background(), db, 1)
    require.NoError(t, err)
    assert.Equal(t, expectedUser.Name, user.Name)

    assert.NoError(t, mock.ExpectationsWereMet())
}

9.3 测试辅助函数

// 数据清理(每个测试用例前后清理)
func truncateTables(t *testing.T, db *sql.DB, tables ...string) {
    t.Helper()
    for _, table := range tables {
        _, err := db.Exec("TRUNCATE TABLE " + table)
        require.NoError(t, err)
    }
}

// 使用 testify/suite 组织测试
type UserRepoSuite struct {
    suite.Suite
    db   *sql.DB
    repo *UserRepo
}

func (s *UserRepoSuite) SetupTest() {
    truncateTables(s.T(), s.db, "users", "orders")
}

func (s *UserRepoSuite) TestCreateAndGet() {
    // ...
}

10. 常见坑与最佳实践

10.1 时区问题

// ❌ DSN 不指定时区,parseTime=True 时使用 UTC
dsn := "user:pass@tcp(host:3306)/db?parseTime=True"
// 从 DATETIME 列读出的 time.Time 是 UTC!

// ✅ 显式指定时区
dsn := "user:pass@tcp(host:3306)/db?parseTime=True&loc=Asia%2FShanghai"
// 或使用 Local
dsn := "...?parseTime=True&loc=Local"

// ✅ 同时设置 MySQL 时区
// my.cnf: default-time-zone = '+8:00'

10.2 DECIMAL 精度

// ❌ float64 有精度问题
var amount float64
rows.Scan(&amount)
// amount = 99.99999999 而不是 100.00

// ✅ 用 string 接收,再用 decimal 库处理
import "github.com/shopspring/decimal"

var amountStr string
rows.Scan(&amountStr)
amount, err := decimal.NewFromString(amountStr)

// GORM 中
type Order struct {
    Amount decimal.Decimal `gorm:"type:decimal(10,2)"`
}

10.3 大 offset 分页

// ❌ 深分页(第 1000 页)
db.Offset(10000).Limit(20).Find(&orders)
// SQL: SELECT * FROM orders LIMIT 20 OFFSET 10000
// 需要扫描并丢弃前 10000 行

// ✅ 游标分页
type PageCursor struct {
    LastID    int64
    PageSize  int
}

func ListOrdersWithCursor(ctx context.Context, db *gorm.DB, cursor PageCursor) ([]Order, int64, error) {
    var orders []Order
    query := db.WithContext(ctx).
        Where("id > ?", cursor.LastID).
        Order("id ASC").
        Limit(cursor.PageSize)

    err := query.Find(&orders).Error
    if err != nil || len(orders) == 0 {
        return orders, 0, err
    }
    nextID := orders[len(orders)-1].ID
    return orders, nextID, nil
}

10.4 N+1 查询问题

// ❌ N+1 问题
users, _ := listUsers()        // 1次查询
for _, u := range users {
    orders, _ := getOrders(u.ID) // N次查询!
}

// ✅ 方案1:JOIN 查询
db.Preload("Orders").Find(&users)  // GORM Preload,2次查询

// ✅ 方案2:IN 查询(手动)
userIDs := extractIDs(users)
orders := getOrdersByUserIDs(userIDs)  // 1次 IN 查询
orderMap := groupByUserID(orders)
for _, u := range users {
    u.Orders = orderMap[u.ID]
}

10.5 Go 错误处理

import "github.com/go-sql-driver/mysql"

// 检查具体的 MySQL 错误码
func handleMySQLError(err error) {
    var mysqlErr *mysql.MySQLError
    if !errors.As(err, &mysqlErr) {
        return
    }

    switch mysqlErr.Number {
    case 1062:
        // 唯一键冲突
        fmt.Println("duplicate entry:", mysqlErr.Message)
    case 1213:
        // 死锁
        fmt.Println("deadlock detected, retry")
    case 1205:
        // 锁等待超时
        fmt.Println("lock wait timeout")
    case 1406:
        // 数据太长
        fmt.Println("data too long")
    }
}

// 常用 MySQL 错误码
const (
    ErrDupEntry       = 1062
    ErrDeadlock       = 1213
    ErrLockTimeout    = 1205
    ErrDataTooLong    = 1406
    ErrAccessDenied   = 1045
    ErrTooManyConn    = 1040
)

小结

Go + MySQL 最佳实践清单:

连接管理:
  ✅ 全局 *sql.DB 单例,不要每次创建
  ✅ 设置合理的连接池参数(MaxOpenConns/MaxIdleConns/MaxLifetime)
  ✅ DSN 指定 parseTime=True&loc=Local&timeout=5s
  ✅ 监控连接池状态(db.Stats())

SQL 安全:
  ✅ 永远使用占位符(?),不拼接 SQL
  ✅ 批量操作用 IN 语句而非循环单条
  ✅ DECIMAL 列用 string 接收,decimal 库处理

事务:
  ✅ 使用 defer + Rollback 保底
  ✅ 事务内不做耗时操作(网络调用等)
  ✅ 批量操作分批事务,避免超大事务

查询:
  ✅ 只查需要的列,避免 SELECT *
  ✅ 深分页用游标,避免大 OFFSET
  ✅ 预加载关联数据,避免 N+1

测试:
  ✅ 集成测试用 testcontainers
  ✅ 单元测试用 sqlmock
  ✅ 每个测试用例独立清理数据

系列完结

至此,MySQL 系列博客全部完成:

序号 标题 核心内容
01 概述与快速上手 安装、基本操作、Go 初体验
02 架构深度解析 Server层、InnoDB、SQL执行链路
03 索引原理与优化 B+Tree、聚簇索引、覆盖索引、EXPLAIN
04 SQL 语法大全 DDL/DML/窗口函数/CTE/JSON
05 事务与 MVCC ACID、隔离级别、Read View
06 锁机制详解 行锁、间隙锁、Next-Key Lock、死锁
07 日志系统 Redo Log、Undo Log、Binlog、2PC
08 查询优化 EXPLAIN深度解读、SQL改写
09 高可用架构 主从复制、MGR、MHA、ProxySQL
10 配置参数 内存/连接/InnoDB/安全配置
11 常见问题定位 慢查询/死锁/复制异常/磁盘满
12 Go 开发实战 database/sql/GORM/迁移/测试