MySQL-13 Go 开发实战
目录
MySQL Go 开发实战
目录
1. 驱动与框架选择
1.1 生态全景
驱动层(必须):
github.com/go-sql-driver/mysql 最主流,go-sql-driver
github.com/pingcap/tidb/... TiDB 官方驱动(兼容 MySQL)
增强 SQL 库:
github.com/jmoiron/sqlx database/sql 的薄封装,保留 SQL 控制力
github.com/blockloop/scan 轻量扫描库
ORM 框架:
gorm.io/gorm Go 最流行的 ORM(v2)
github.com/ent/ent Facebook 开源,代码生成,类型安全
github.com/uptrace/bun 现代 ORM,SQL-first
查询构建器:
github.com/Masterminds/squirrel SQL 构建器,灵活
github.com/doug-martin/goqu 功能丰富的查询构建器
迁移工具:
github.com/golang-migrate/migrate 支持多种数据库
github.com/pressly/goose 轻量,内嵌迁移
atlas (ariga.io/atlas) 现代 schema 管理
1.2 选型建议
| 场景 | 推荐 |
|---|---|
| 复杂 SQL、高性能场景 | sqlx + squirrel |
| 快速业务开发 | GORM v2 |
| 类型安全、大型项目 | ent |
| 学习/简单脚本 | database/sql |
2. database/sql 标准库
2.1 完整连接示例
package main
import (
"context"
"database/sql"
"fmt"
"log"
"time"
_ "github.com/go-sql-driver/mysql"
)
type Config struct {
Host string
Port int
User string
Password string
DBName string
MaxOpenConns int
MaxIdleConns int
ConnMaxLifetime time.Duration
ConnMaxIdleTime time.Duration
}
func NewMySQL(cfg Config) (*sql.DB, error) {
dsn := fmt.Sprintf(
"%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local&timeout=5s&readTimeout=10s&writeTimeout=5s",
cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.DBName,
)
db, err := sql.Open("mysql", dsn)
if err != nil {
return nil, fmt.Errorf("open mysql: %w", err)
}
// 连接池配置
db.SetMaxOpenConns(cfg.MaxOpenConns)
db.SetMaxIdleConns(cfg.MaxIdleConns)
db.SetConnMaxLifetime(cfg.ConnMaxLifetime)
db.SetConnMaxIdleTime(cfg.ConnMaxIdleTime)
// 验证连通性
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err = db.PingContext(ctx); err != nil {
db.Close()
return nil, fmt.Errorf("ping mysql: %w", err)
}
return db, nil
}
2.2 CRUD 完整示例
type User struct {
ID int64
Name string
Email string
Age int
CreatedAt time.Time
UpdatedAt time.Time
DeletedAt sql.NullTime // 可空时间
}
// INSERT
func CreateUser(ctx context.Context, db *sql.DB, u *User) (int64, error) {
result, err := db.ExecContext(ctx,
`INSERT INTO users (name, email, age) VALUES (?, ?, ?)`,
u.Name, u.Email, u.Age,
)
if err != nil {
return 0, fmt.Errorf("insert user: %w", err)
}
return result.LastInsertId()
}
// SELECT 单行
func GetUserByID(ctx context.Context, db *sql.DB, id int64) (*User, error) {
var u User
err := db.QueryRowContext(ctx,
`SELECT id, name, email, age, created_at, updated_at FROM users WHERE id = ? AND deleted_at IS NULL`,
id,
).Scan(&u.ID, &u.Name, &u.Email, &u.Age, &u.CreatedAt, &u.UpdatedAt)
if err == sql.ErrNoRows {
return nil, nil // 未找到,返回 nil 而非错误
}
if err != nil {
return nil, fmt.Errorf("get user %d: %w", id, err)
}
return &u, nil
}
// SELECT 多行
func ListUsers(ctx context.Context, db *sql.DB, page, size int) ([]*User, error) {
rows, err := db.QueryContext(ctx,
`SELECT id, name, email, age, created_at FROM users
WHERE deleted_at IS NULL
ORDER BY id DESC
LIMIT ? OFFSET ?`,
size, (page-1)*size,
)
if err != nil {
return nil, fmt.Errorf("list users: %w", err)
}
defer rows.Close()
var users []*User
for rows.Next() {
var u User
if err = rows.Scan(&u.ID, &u.Name, &u.Email, &u.Age, &u.CreatedAt); err != nil {
return nil, fmt.Errorf("scan user: %w", err)
}
users = append(users, &u)
}
// 必须检查 rows.Err()(可能在 Next() 期间发生错误)
if err = rows.Err(); err != nil {
return nil, fmt.Errorf("rows error: %w", err)
}
return users, nil
}
// UPDATE
func UpdateUser(ctx context.Context, db *sql.DB, u *User) error {
result, err := db.ExecContext(ctx,
`UPDATE users SET name = ?, email = ?, age = ? WHERE id = ? AND deleted_at IS NULL`,
u.Name, u.Email, u.Age, u.ID,
)
if err != nil {
return fmt.Errorf("update user %d: %w", u.ID, err)
}
n, _ := result.RowsAffected()
if n == 0 {
return fmt.Errorf("user %d not found or already deleted", u.ID)
}
return nil
}
// 软删除
func DeleteUser(ctx context.Context, db *sql.DB, id int64) error {
_, err := db.ExecContext(ctx,
`UPDATE users SET deleted_at = NOW() WHERE id = ? AND deleted_at IS NULL`,
id,
)
return err
}
2.3 NULL 值处理
// 可空类型
import "database/sql"
type UserProfile struct {
ID int64
Name string
Phone sql.NullString // 可空字符串
Age sql.NullInt64 // 可空整数
Score sql.NullFloat64 // 可空浮点
Bio sql.NullString
}
// 扫描可空字段
var u UserProfile
rows.Scan(&u.ID, &u.Name, &u.Phone, &u.Age)
// 使用可空字段
if u.Phone.Valid {
fmt.Println(u.Phone.String)
}
// 推荐:使用指针替代 sql.Null* 类型(更直观)
type UserProfileV2 struct {
ID int64
Name string
Phone *string // nil 表示 NULL
Age *int
}
3. sqlx 增强库
sqlx 是对 database/sql 的薄封装,增加了:
- 结构体自动扫描(StructScan)
- 命名参数(NamedExec)
- IN 查询辅助
- 批量插入
import "github.com/jmoiron/sqlx"
type User struct {
ID int64 `db:"id"`
Name string `db:"name"`
Email string `db:"email"`
Age int `db:"age"`
CreatedAt time.Time `db:"created_at"`
}
func NewSQLX(dsn string) (*sqlx.DB, error) {
db, err := sqlx.Connect("mysql", dsn)
if err != nil {
return nil, err
}
db.SetMaxOpenConns(100)
db.SetMaxIdleConns(10)
db.SetConnMaxLifetime(30 * time.Minute)
return db, nil
}
// 自动映射到结构体(不需要手动 Scan)
func GetUser(ctx context.Context, db *sqlx.DB, id int64) (*User, error) {
var u User
err := db.GetContext(ctx, &u,
"SELECT id, name, email, age, created_at FROM users WHERE id = ?", id,
)
if err == sql.ErrNoRows {
return nil, nil
}
return &u, err
}
// 查询多行
func ListUsers(ctx context.Context, db *sqlx.DB, status int) ([]*User, error) {
var users []*User
err := db.SelectContext(ctx, &users,
"SELECT id, name, email, age FROM users WHERE status = ? ORDER BY id", status,
)
return users, err
}
// 命名参数(更安全,参数顺序无关)
func CreateUser(ctx context.Context, db *sqlx.DB, u *User) error {
_, err := db.NamedExecContext(ctx,
`INSERT INTO users (name, email, age) VALUES (:name, :email, :age)`,
u,
)
return err
}
// IN 查询
func GetUsersByIDs(ctx context.Context, db *sqlx.DB, ids []int64) ([]*User, error) {
query, args, err := sqlx.In(
"SELECT id, name, email FROM users WHERE id IN (?)",
ids,
)
if err != nil {
return nil, err
}
query = db.Rebind(query) // 将 ? 替换为 MySQL 格式
var users []*User
err = db.SelectContext(ctx, &users, query, args...)
return users, err
}
4. GORM ORM 框架
4.1 初始化
import (
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"log"
"os"
"time"
)
func NewGORM(dsn string) (*gorm.DB, error) {
newLogger := logger.New(
log.New(os.Stdout, "\r\n", log.LstdFlags),
logger.Config{
SlowThreshold: 200 * time.Millisecond,
LogLevel: logger.Warn,
IgnoreRecordNotFoundError: true,
Colorful: true,
},
)
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: newLogger,
NamingStrategy: schema.NamingStrategy{
TablePrefix: "t_", // 表名前缀
SingularTable: true, // 单数表名(user 而非 users)
},
DisableForeignKeyConstraintWhenMigrating: true,
})
if err != nil {
return nil, err
}
sqlDB, err := db.DB()
if err != nil {
return nil, err
}
sqlDB.SetMaxOpenConns(100)
sqlDB.SetMaxIdleConns(10)
sqlDB.SetConnMaxLifetime(30 * time.Minute)
return db, nil
}
4.2 模型定义
import "gorm.io/gorm"
// 嵌入 gorm.Model 自动包含 ID、CreatedAt、UpdatedAt、DeletedAt
type User struct {
gorm.Model
Name string `gorm:"size:64;not null;comment:用户名"`
Email string `gorm:"size:128;uniqueIndex;not null"`
Phone string `gorm:"size:11;index"`
Age int `gorm:"default:0"`
Status int8 `gorm:"default:1;index:idx_status_created,priority:1"`
// 联合索引
CreatedAt time.Time `gorm:"index:idx_status_created,priority:2"`
}
// 自定义表名
func (User) TableName() string {
return "users"
}
// 也可以不用 gorm.Model,完全自定义
type Order struct {
ID uint `gorm:"primarykey"`
UserID uint `gorm:"not null;index:idx_user_status,priority:1"`
Status int8 `gorm:"not null;index:idx_user_status,priority:2"`
Amount float64 `gorm:"type:decimal(10,2);not null"`
Remark string `gorm:"type:text"`
CreatedAt time.Time `gorm:"not null;default:CURRENT_TIMESTAMP"`
UpdatedAt time.Time `gorm:"not null;default:CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"`
DeletedAt gorm.DeletedAt `gorm:"index"` // 软删除
}
4.3 CRUD 操作
// Create
user := User{Name: "Alice", Email: "alice@example.com"}
result := db.Create(&user)
// user.ID 被自动填充
// 批量插入
users := []User{{Name: "Bob"}, {Name: "Carol"}}
db.CreateInBatches(&users, 100) // 每批 100 条
// 查询
// 主键查询
var user User
db.First(&user, 1) // SELECT * FROM users WHERE id = 1 ORDER BY id LIMIT 1
db.First(&user, "id = ?", 1)
// 条件查询
var users []User
db.Where("status = ? AND age > ?", 1, 18).Order("created_at DESC").Limit(20).Find(&users)
// 选择特定字段
db.Select("id", "name", "email").Where("status = 1").Find(&users)
// Pluck(只查一列)
var emails []string
db.Model(&User{}).Where("status = 1").Pluck("email", &emails)
// 更新
// 只更新非零值字段
db.Save(&user)
// 更新指定字段
db.Model(&user).Update("status", 0)
db.Model(&user).Updates(map[string]interface{}{"status": 0, "age": 25})
db.Model(&user).Updates(User{Status: 0, Age: 25}) // 只更新非零值
// 强制更新零值
db.Model(&user).Select("*").Updates(User{Status: 0})
// Delete(软删除,如果有 DeletedAt 字段)
db.Delete(&user) // UPDATE users SET deleted_at = NOW() WHERE id = ?
// 物理删除
db.Unscoped().Delete(&user)
4.4 关联关系
// 一对多
type User struct {
gorm.Model
Name string
Orders []Order `gorm:"foreignKey:UserID"`
}
type Order struct {
gorm.Model
UserID uint
Amount float64
User User // 反向关联
}
// 预加载(Preload)
var user User
db.Preload("Orders").First(&user, 1)
// SELECT * FROM users WHERE id = 1;
// SELECT * FROM orders WHERE user_id = 1;
// 带条件的预加载
db.Preload("Orders", "status = ?", 1).Find(&users)
// 连接查询(手动 JOIN)
type UserWithOrderCount struct {
User
OrderCount int64
}
var results []UserWithOrderCount
db.Table("users u").
Select("u.*, COUNT(o.id) as order_count").
Joins("LEFT JOIN orders o ON u.id = o.user_id AND o.deleted_at IS NULL").
Group("u.id").
Find(&results)
4.5 原生 SQL
// 执行原生 SQL
db.Raw("SELECT * FROM users WHERE name = ?", "Alice").Scan(&users)
// Exec
db.Exec("UPDATE users SET status = ? WHERE id > ?", 0, 100)
// 在 GORM 链中使用原生条件
db.Where("id IN (?)", db.Raw("SELECT user_id FROM vip_users")).Find(&users)
5. 连接池深度调优
5.1 关键参数详解
db.SetMaxOpenConns(n)
// 最大打开连接数(使用中 + 空闲)
// 超出此数的请求会等待(block),直到有可用连接或超时
// 建议:根据 MySQL max_connections 的 80%
db.SetMaxIdleConns(n)
// 空闲连接池大小
// 空闲连接维持已认证的 TCP 连接,下次使用无需重新建连
// 太大:浪费连接(MySQL 端也消耗资源)
// 太小:频繁创建连接(每次需要 TCP 握手 + 认证)
// 建议:MaxOpenConns 的 10%~20%
db.SetConnMaxLifetime(d)
// 连接最大存活时间
// 超时的连接在空闲时被关闭(不会强制关闭使用中的连接)
// 设置此值可以:
// 1. 应对 MySQL wait_timeout(空闲连接被服务器断开)
// 2. 在主从切换后让旧连接自然过期
// 建议:< MySQL wait_timeout (通常 600s),设 30min~1h
db.SetConnMaxIdleTime(d)
// 空闲连接最大存活时间(Go 1.15+)
// 比 ConnMaxLifetime 更激进,空闲超时立即关闭
// 适合流量不稳定的场景(高峰期扩展,低谷期收缩)
5.2 不同场景配置
// 场景1:高并发 Web 服务
db.SetMaxOpenConns(100)
db.SetMaxIdleConns(20)
db.SetConnMaxLifetime(30 * time.Minute)
db.SetConnMaxIdleTime(10 * time.Minute)
// 场景2:后台任务(并发低,但需要保持连接)
db.SetMaxOpenConns(20)
db.SetMaxIdleConns(5)
db.SetConnMaxLifetime(time.Hour)
db.SetConnMaxIdleTime(30 * time.Minute)
// 场景3:Lambda/Serverless(每次请求都可能重新实例化)
db.SetMaxOpenConns(2) // Serverless 并发低
db.SetMaxIdleConns(2)
db.SetConnMaxLifetime(10 * time.Minute)
db.SetConnMaxIdleTime(5 * time.Minute)
5.3 监控连接池
// 定期上报连接池指标
func MonitorDBPool(db *sql.DB, name string) {
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {
stats := db.Stats()
log.Printf("[%s] pool stats: open=%d, inuse=%d, idle=%d, waitcount=%d, waitduration=%s",
name,
stats.OpenConnections,
stats.InUse,
stats.Idle,
stats.WaitCount,
stats.WaitDuration,
)
// 上报到 Prometheus/监控系统
metrics.Gauge("db.open_conns", float64(stats.OpenConnections), name)
metrics.Gauge("db.in_use", float64(stats.InUse), name)
metrics.Counter("db.wait_count", float64(stats.WaitCount), name)
}
}
6. 事务最佳实践
6.1 封装事务助手
// 通用事务助手(推荐)
func WithTx(ctx context.Context, db *sql.DB, fn func(*sql.Tx) error) (err error) {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer func() {
if p := recover(); p != nil {
_ = tx.Rollback()
panic(p)
} else if err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
err = fmt.Errorf("rollback error: %v; original: %w", rbErr, err)
}
} else {
err = tx.Commit()
}
}()
err = fn(tx)
return
}
// 支持隔离级别的版本
func WithTxIsolation(ctx context.Context, db *sql.DB, isolation sql.IsolationLevel, fn func(*sql.Tx) error) error {
tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: isolation})
if err != nil {
return err
}
defer func() {
if p := recover(); p != nil {
tx.Rollback()
panic(p)
}
}()
if err = fn(tx); err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}
6.2 分布式事务(本地消息表)
// 本地消息表模式:保证业务操作和消息发送的原子性
func CreateOrderWithMessage(ctx context.Context, db *sql.DB, order Order) error {
return WithTx(ctx, db, func(tx *sql.Tx) error {
// 1. 创建订单
result, err := tx.ExecContext(ctx,
"INSERT INTO orders (user_id, amount, status) VALUES (?, ?, 'pending')",
order.UserID, order.Amount,
)
if err != nil {
return err
}
orderID, _ := result.LastInsertId()
// 2. 写入本地消息表(与订单在同一事务)
_, err = tx.ExecContext(ctx,
"INSERT INTO outbox_messages (type, payload, status) VALUES ('ORDER_CREATED', ?, 'pending')",
fmt.Sprintf(`{"order_id": %d}`, orderID),
)
return err
})
// 后台进程定时扫描 outbox_messages,发送消息后标记为已发送
}
7. 批量操作优化
7.1 批量插入
// 方案1:手动拼接(最快)
func BulkInsert(ctx context.Context, db *sql.DB, users []User) error {
if len(users) == 0 {
return nil
}
const batchSize = 500
for i := 0; i < len(users); i += batchSize {
end := i + batchSize
if end > len(users) {
end = len(users)
}
batch := users[i:end]
placeholders := make([]string, len(batch))
args := make([]interface{}, 0, len(batch)*3)
for j, u := range batch {
placeholders[j] = "(?, ?, ?)"
args = append(args, u.Name, u.Email, u.Age)
}
query := "INSERT INTO users (name, email, age) VALUES " +
strings.Join(placeholders, ",")
if _, err := db.ExecContext(ctx, query, args...); err != nil {
return fmt.Errorf("batch insert at %d: %w", i, err)
}
}
return nil
}
// 方案2:GORM CreateInBatches
db.CreateInBatches(users, 500)
// 方案3:LOAD DATA INFILE(最快,适合大批量导入)
func LoadDataFromCSV(ctx context.Context, db *sql.DB, csvPath string) error {
_, err := db.ExecContext(ctx, fmt.Sprintf(`
LOAD DATA LOCAL INFILE '%s'
INTO TABLE users
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
(name, email, age)
`, csvPath))
return err
}
7.2 批量更新
// 方案1:CASE WHEN(单次 SQL 更新多行)
func BulkUpdateStatus(ctx context.Context, db *sql.DB, updates map[int64]int8) error {
if len(updates) == 0 {
return nil
}
cases := make([]string, 0, len(updates))
ids := make([]interface{}, 0, len(updates))
for id, status := range updates {
cases = append(cases, fmt.Sprintf("WHEN %d THEN %d", id, status))
ids = append(ids, id)
}
query := fmt.Sprintf(
"UPDATE users SET status = CASE id %s END WHERE id IN (%s)",
strings.Join(cases, " "),
strings.Repeat("?,", len(ids)-1)+"?",
)
_, err := db.ExecContext(ctx, query, ids...)
return err
}
// 方案2:ON DUPLICATE KEY UPDATE(upsert)
func UpsertUsers(ctx context.Context, db *sql.DB, users []User) error {
if len(users) == 0 {
return nil
}
placeholders := make([]string, len(users))
args := make([]interface{}, 0, len(users)*4)
for i, u := range users {
placeholders[i] = "(?, ?, ?, ?)"
args = append(args, u.ID, u.Name, u.Email, u.Age)
}
query := "INSERT INTO users (id, name, email, age) VALUES " +
strings.Join(placeholders, ",") +
" ON DUPLICATE KEY UPDATE name=VALUES(name), email=VALUES(email), age=VALUES(age)"
_, err := db.ExecContext(ctx, query, args...)
return err
}
8. 数据库迁移(Migrate)
8.1 golang-migrate
# 安装 CLI
go install -tags 'mysql' github.com/golang-migrate/migrate/v4/cmd/migrate@latest
# 创建迁移文件
migrate create -ext sql -dir migrations -seq create_users_table
# 生成:
# migrations/000001_create_users_table.up.sql
# migrations/000001_create_users_table.down.sql
-- 000001_create_users_table.up.sql
CREATE TABLE IF NOT EXISTS users (
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(64) NOT NULL,
email VARCHAR(128) NOT NULL,
status TINYINT NOT NULL DEFAULT 1,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uk_email (email),
KEY idx_status (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 000001_create_users_table.down.sql
DROP TABLE IF EXISTS users;
# 执行迁移
migrate -path migrations -database "mysql://user:pass@tcp(localhost:3306)/myapp" up
# 回滚
migrate -path migrations -database "..." down 1
# 查看版本
migrate -path migrations -database "..." version
// 代码中集成
import (
"github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/database/mysql"
_ "github.com/golang-migrate/migrate/v4/source/file"
)
func RunMigrations(dsn string) error {
m, err := migrate.New(
"file://migrations",
"mysql://"+dsn,
)
if err != nil {
return err
}
defer m.Close()
if err = m.Up(); err != nil && err != migrate.ErrNoChange {
return err
}
return nil
}
8.2 goose(嵌入式迁移)
import "github.com/pressly/goose/v3"
//go:embed migrations/*.sql
var embedMigrations embed.FS
func RunMigrationsGoose(db *sql.DB) error {
goose.SetBaseFS(embedMigrations)
if err := goose.SetDialect("mysql"); err != nil {
return err
}
return goose.Up(db, "migrations")
}
9. 测试策略
9.1 集成测试(真实数据库)
// testcontainers-go 启动测试数据库
import (
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/mysql"
)
func TestMain(m *testing.M) {
ctx := context.Background()
container, err := mysqlmodule.RunContainer(ctx,
testcontainers.WithImage("mysql:8.0"),
mysql.WithDatabase("testdb"),
mysql.WithUsername("root"),
mysql.WithPassword("test"),
)
if err != nil {
log.Fatal(err)
}
defer container.Terminate(ctx)
dsn, _ := container.ConnectionString(ctx)
testDB, _ = sql.Open("mysql", dsn+"?parseTime=True")
// 运行迁移
RunMigrations(dsn)
os.Exit(m.Run())
}
var testDB *sql.DB
func TestCreateUser(t *testing.T) {
ctx := context.Background()
id, err := CreateUser(ctx, testDB, &User{Name: "Test", Email: "test@example.com"})
require.NoError(t, err)
assert.Positive(t, id)
}
9.2 单元测试(Mock)
import "github.com/DATA-DOG/go-sqlmock"
func TestGetUser(t *testing.T) {
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
expectedUser := &User{ID: 1, Name: "Alice", Email: "alice@example.com"}
rows := sqlmock.NewRows([]string{"id", "name", "email", "age", "created_at", "updated_at"}).
AddRow(expectedUser.ID, expectedUser.Name, expectedUser.Email, 25, time.Now(), time.Now())
mock.ExpectQuery("SELECT .+ FROM users WHERE id = ?").
WithArgs(1).
WillReturnRows(rows)
user, err := GetUserByID(context.Background(), db, 1)
require.NoError(t, err)
assert.Equal(t, expectedUser.Name, user.Name)
assert.NoError(t, mock.ExpectationsWereMet())
}
9.3 测试辅助函数
// 数据清理(每个测试用例前后清理)
func truncateTables(t *testing.T, db *sql.DB, tables ...string) {
t.Helper()
for _, table := range tables {
_, err := db.Exec("TRUNCATE TABLE " + table)
require.NoError(t, err)
}
}
// 使用 testify/suite 组织测试
type UserRepoSuite struct {
suite.Suite
db *sql.DB
repo *UserRepo
}
func (s *UserRepoSuite) SetupTest() {
truncateTables(s.T(), s.db, "users", "orders")
}
func (s *UserRepoSuite) TestCreateAndGet() {
// ...
}
10. 常见坑与最佳实践
10.1 时区问题
// ❌ DSN 不指定时区,parseTime=True 时使用 UTC
dsn := "user:pass@tcp(host:3306)/db?parseTime=True"
// 从 DATETIME 列读出的 time.Time 是 UTC!
// ✅ 显式指定时区
dsn := "user:pass@tcp(host:3306)/db?parseTime=True&loc=Asia%2FShanghai"
// 或使用 Local
dsn := "...?parseTime=True&loc=Local"
// ✅ 同时设置 MySQL 时区
// my.cnf: default-time-zone = '+8:00'
10.2 DECIMAL 精度
// ❌ float64 有精度问题
var amount float64
rows.Scan(&amount)
// amount = 99.99999999 而不是 100.00
// ✅ 用 string 接收,再用 decimal 库处理
import "github.com/shopspring/decimal"
var amountStr string
rows.Scan(&amountStr)
amount, err := decimal.NewFromString(amountStr)
// GORM 中
type Order struct {
Amount decimal.Decimal `gorm:"type:decimal(10,2)"`
}
10.3 大 offset 分页
// ❌ 深分页(第 1000 页)
db.Offset(10000).Limit(20).Find(&orders)
// SQL: SELECT * FROM orders LIMIT 20 OFFSET 10000
// 需要扫描并丢弃前 10000 行
// ✅ 游标分页
type PageCursor struct {
LastID int64
PageSize int
}
func ListOrdersWithCursor(ctx context.Context, db *gorm.DB, cursor PageCursor) ([]Order, int64, error) {
var orders []Order
query := db.WithContext(ctx).
Where("id > ?", cursor.LastID).
Order("id ASC").
Limit(cursor.PageSize)
err := query.Find(&orders).Error
if err != nil || len(orders) == 0 {
return orders, 0, err
}
nextID := orders[len(orders)-1].ID
return orders, nextID, nil
}
10.4 N+1 查询问题
// ❌ N+1 问题
users, _ := listUsers() // 1次查询
for _, u := range users {
orders, _ := getOrders(u.ID) // N次查询!
}
// ✅ 方案1:JOIN 查询
db.Preload("Orders").Find(&users) // GORM Preload,2次查询
// ✅ 方案2:IN 查询(手动)
userIDs := extractIDs(users)
orders := getOrdersByUserIDs(userIDs) // 1次 IN 查询
orderMap := groupByUserID(orders)
for _, u := range users {
u.Orders = orderMap[u.ID]
}
10.5 Go 错误处理
import "github.com/go-sql-driver/mysql"
// 检查具体的 MySQL 错误码
func handleMySQLError(err error) {
var mysqlErr *mysql.MySQLError
if !errors.As(err, &mysqlErr) {
return
}
switch mysqlErr.Number {
case 1062:
// 唯一键冲突
fmt.Println("duplicate entry:", mysqlErr.Message)
case 1213:
// 死锁
fmt.Println("deadlock detected, retry")
case 1205:
// 锁等待超时
fmt.Println("lock wait timeout")
case 1406:
// 数据太长
fmt.Println("data too long")
}
}
// 常用 MySQL 错误码
const (
ErrDupEntry = 1062
ErrDeadlock = 1213
ErrLockTimeout = 1205
ErrDataTooLong = 1406
ErrAccessDenied = 1045
ErrTooManyConn = 1040
)
小结
Go + MySQL 最佳实践清单:
连接管理:
✅ 全局 *sql.DB 单例,不要每次创建
✅ 设置合理的连接池参数(MaxOpenConns/MaxIdleConns/MaxLifetime)
✅ DSN 指定 parseTime=True&loc=Local&timeout=5s
✅ 监控连接池状态(db.Stats())
SQL 安全:
✅ 永远使用占位符(?),不拼接 SQL
✅ 批量操作用 IN 语句而非循环单条
✅ DECIMAL 列用 string 接收,decimal 库处理
事务:
✅ 使用 defer + Rollback 保底
✅ 事务内不做耗时操作(网络调用等)
✅ 批量操作分批事务,避免超大事务
查询:
✅ 只查需要的列,避免 SELECT *
✅ 深分页用游标,避免大 OFFSET
✅ 预加载关联数据,避免 N+1
测试:
✅ 集成测试用 testcontainers
✅ 单元测试用 sqlmock
✅ 每个测试用例独立清理数据
系列完结
至此,MySQL 系列博客全部完成:
| 序号 | 标题 | 核心内容 |
|---|---|---|
| 01 | 概述与快速上手 | 安装、基本操作、Go 初体验 |
| 02 | 架构深度解析 | Server层、InnoDB、SQL执行链路 |
| 03 | 索引原理与优化 | B+Tree、聚簇索引、覆盖索引、EXPLAIN |
| 04 | SQL 语法大全 | DDL/DML/窗口函数/CTE/JSON |
| 05 | 事务与 MVCC | ACID、隔离级别、Read View |
| 06 | 锁机制详解 | 行锁、间隙锁、Next-Key Lock、死锁 |
| 07 | 日志系统 | Redo Log、Undo Log、Binlog、2PC |
| 08 | 查询优化 | EXPLAIN深度解读、SQL改写 |
| 09 | 高可用架构 | 主从复制、MGR、MHA、ProxySQL |
| 10 | 配置参数 | 内存/连接/InnoDB/安全配置 |
| 11 | 常见问题定位 | 慢查询/死锁/复制异常/磁盘满 |
| 12 | Go 开发实战 | database/sql/GORM/迁移/测试 |
xingliuhua