AJ ONeal 67ad7a9fa2 fix(litemigrate,mymigrate,msmigrate): take *sql.Conn instead of *sql.DB
Same issue as pgmigrate: *sql.DB is a connection pool, so each call
may land on a different connection. Migrations need a pinned connection
for session state (SET search_path, temp tables, etc.) to persist
across sequential calls. *sql.Conn (from db.Conn(ctx)) pins one
underlying connection for its lifetime.
2026-04-09 10:56:47 -06:00

114 lines
3.7 KiB
Go

// Package mymigrate implements sqlmigrate.Migrator for MySQL and MariaDB
// using database/sql with github.com/go-sql-driver/mysql.
//
// The *sql.Conn must originate from a *sql.DB opened with
// multiStatements=true in the DSN; without it, multi-statement migration
// files will silently execute only the first statement. The
// multiStatements requirement is validated lazily on the first ExecUp or
// ExecDown call:
//
//	db, err := sql.Open("mysql", "user:pass@tcp(host:3306)/dbname?multiStatements=true")
//	conn, err := db.Conn(ctx)
//
// MySQL and MariaDB do not support transactional DDL. Statements like
// CREATE TABLE and ALTER TABLE cause an implicit commit, so if a migration
// fails partway through, earlier DDL statements in that migration will
// already be committed. DML-only migrations are fully transactional.
package mymigrate
import (
"context"
"database/sql"
"errors"
"fmt"
"github.com/go-sql-driver/mysql"
"github.com/therootcompany/golib/database/sqlmigrate"
)
// Migrator implements sqlmigrate.Migrator using a *sql.Conn with MySQL/MariaDB.
type Migrator struct {
	// Conn is the connection on which every query and transaction issued by
	// this Migrator runs.
	Conn *sql.Conn
	// validated records that the multi-statement probe in exec has already
	// succeeded, so the probe is performed at most once per Migrator.
	validated bool
}
// New wraps conn in a Migrator and returns it.
//
// Obtain conn with db.Conn(ctx) on a *sql.DB. No query is issued here: the
// multiStatements=true DSN requirement is only checked when ExecUp or
// ExecDown is first called.
func New(conn *sql.Conn) *Migrator {
	m := new(Migrator)
	m.Conn = conn
	return m
}
// Compile-time assertion that *Migrator satisfies sqlmigrate.Migrator.
var _ sqlmigrate.Migrator = (*Migrator)(nil)
// ExecUp applies the "up" SQL of a migration inside a transaction.
//
// MySQL implicitly commits DDL statements (CREATE, ALTER, DROP), so a
// migration containing DDL is not rolled back as a whole on failure; see the
// package documentation. The mig argument is accepted to satisfy
// sqlmigrate.Migrator and is not consulted here.
func (m *Migrator) ExecUp(ctx context.Context, mig sqlmigrate.Migration, sql string) error {
	err := m.exec(ctx, sql)
	return err
}
// ExecDown applies the "down" SQL of a migration inside a transaction.
//
// MySQL implicitly commits DDL statements (CREATE, ALTER, DROP), so a
// migration containing DDL is not rolled back as a whole on failure; see the
// package documentation. The mig argument is accepted to satisfy
// sqlmigrate.Migrator and is not consulted here.
func (m *Migrator) ExecDown(ctx context.Context, mig sqlmigrate.Migration, sql string) error {
	err := m.exec(ctx, sql)
	return err
}
// exec runs sqlStr on m.Conn inside a transaction and commits it.
//
// On the first call it probes the connection with a two-statement query to
// confirm multi-statement support; a successful probe is remembered in
// m.validated so later calls skip it. A failed probe is reported as
// sqlmigrate.ErrExecFailed carrying the multiStatements hint with the
// driver's error wrapped after it, so that causes unrelated to the DSN (for
// example a canceled context or a lost connection) stay visible and remain
// reachable through errors.Is and errors.As.
//
// Failures while beginning, executing, or committing the transaction are
// likewise wrapped with sqlmigrate.ErrExecFailed and the underlying error.
func (m *Migrator) exec(ctx context.Context, sqlStr string) error {
	if !m.validated {
		// Probe for multi-statement support. Without it, migration files
		// that contain more than one statement silently execute only the first.
		if _, err := m.Conn.ExecContext(ctx, "DO 1; DO 1"); err != nil {
			return fmt.Errorf(
				"%w: mymigrate: migration requires multiStatements=true in the MySQL DSN: %w",
				sqlmigrate.ErrExecFailed,
				err,
			)
		}
		m.validated = true
	}

	tx, err := m.Conn.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("%w: begin: %w", sqlmigrate.ErrExecFailed, err)
	}
	// Roll back on every early return. The result is deliberately ignored:
	// once Commit has succeeded there is nothing left to undo.
	defer func() { _ = tx.Rollback() }()

	if _, err := tx.ExecContext(ctx, sqlStr); err != nil {
		return fmt.Errorf("%w: exec: %w", sqlmigrate.ErrExecFailed, err)
	}
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("%w: commit: %w", sqlmigrate.ErrExecFailed, err)
	}
	return nil
}
// Applied lists the migrations recorded in the _migrations table, ordered
// by name.
//
// A missing table (MySQL error 1146) is not treated as a failure: the
// result is a nil slice and a nil error. Any other query, scan, or row
// iteration failure is wrapped with sqlmigrate.ErrQueryApplied.
func (m *Migrator) Applied(ctx context.Context) ([]sqlmigrate.Migration, error) {
	const query = "SELECT id, name FROM _migrations ORDER BY name"

	result, queryErr := m.Conn.QueryContext(ctx, query)
	if queryErr != nil {
		myErr, isMySQL := errors.AsType[*mysql.MySQLError](queryErr)
		if isMySQL && myErr.Number == 1146 {
			// The table has not been created yet, so nothing has been applied.
			return nil, nil
		}
		return nil, fmt.Errorf("%w: %w", sqlmigrate.ErrQueryApplied, queryErr)
	}
	defer result.Close()

	var migrations []sqlmigrate.Migration
	for result.Next() {
		var row sqlmigrate.Migration
		scanErr := result.Scan(&row.ID, &row.Name)
		if scanErr != nil {
			return nil, fmt.Errorf("%w: scanning row: %w", sqlmigrate.ErrQueryApplied, scanErr)
		}
		migrations = append(migrations, row)
	}

	// Next returning false can mean either end of data or a failure; Err
	// distinguishes the two.
	if iterErr := result.Err(); iterErr != nil {
		return nil, fmt.Errorf("%w: reading rows: %w", sqlmigrate.ErrQueryApplied, iterErr)
	}
	return migrations, nil
}