AJ ONeal e11b228765
fix(pgmigrate): handle 42P01 surfaced lazily at rows.Err()
pgx/v5's Conn.Query is lazy — when the queried table doesn't exist,
the 42P01 error doesn't surface at Query() time, it surfaces at
rows.Err() after the iteration loop. The original code only checked
for 42P01 at the Query() site, so first-run migrations against an
empty database failed with:

    reading rows: ERROR: relation "_migrations" does not exist (SQLSTATE 42P01)

Apply the typed-error check at both sites via a shared helper.
2026-04-10 00:01:55 -06:00

109 lines
3.2 KiB
Go

// Package pgmigrate implements sqlmigrate.Migrator for PostgreSQL via pgx/v5.
//
// # Multi-tenant schemas
//
// For schema-based multi-tenancy, set search_path on the connection
// before creating the migrator:
//
// conn, _ := pgx.Connect(ctx, pgURL)
// _, _ = conn.Exec(ctx, fmt.Sprintf("SET search_path TO %s", pgx.Identifier{schema}.Sanitize()))
// runner := pgmigrate.New(conn)
//
// Each schema gets its own _migrations table, so tenants are migrated
// independently. The sql-migrate CLI supports this via TENANT_SCHEMA;
// see the CLI help for details.
package pgmigrate
import (
"context"
"errors"
"fmt"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/therootcompany/golib/database/sqlmigrate"
)
// Migrator implements sqlmigrate.Migrator using a single pgx.Conn.
type Migrator struct {
Conn *pgx.Conn
}
// New creates a Migrator from the given connection.
func New(conn *pgx.Conn) *Migrator {
return &Migrator{Conn: conn}
}
// verify interface compliance at compile time
var _ sqlmigrate.Migrator = (*Migrator)(nil)
// ExecUp runs the up migration SQL inside a PostgreSQL transaction.
func (r *Migrator) ExecUp(ctx context.Context, m sqlmigrate.Migration, sql string) error {
return r.execInTx(ctx, sql)
}
// ExecDown runs the down migration SQL inside a PostgreSQL transaction.
func (r *Migrator) ExecDown(ctx context.Context, m sqlmigrate.Migration, sql string) error {
return r.execInTx(ctx, sql)
}
func (r *Migrator) execInTx(ctx context.Context, sql string) error {
tx, err := r.Conn.Begin(ctx)
if err != nil {
return fmt.Errorf("%w: begin: %w", sqlmigrate.ErrExecFailed, err)
}
defer func() { _ = tx.Rollback(ctx) }()
if _, err := tx.Exec(ctx, sql); err != nil {
return fmt.Errorf("%w: exec: %w", sqlmigrate.ErrExecFailed, err)
}
if err := tx.Commit(ctx); err != nil {
return fmt.Errorf("%w: commit: %w", sqlmigrate.ErrExecFailed, err)
}
return nil
}
// Applied returns all applied migrations from the _migrations table.
// Returns an empty slice if the table does not exist (PG error 42P01).
//
// Note: pgx.Conn.Query is lazy — when the table is missing, the 42P01
// error may surface at rows.Err() rather than at Query(). Both sites
// must check for it.
func (r *Migrator) Applied(ctx context.Context) ([]sqlmigrate.Migration, error) {
rows, err := r.Conn.Query(ctx, "SELECT id, name FROM _migrations ORDER BY name")
if err != nil {
if isUndefinedTable(err) {
return nil, nil
}
return nil, fmt.Errorf("%w: %w", sqlmigrate.ErrQueryApplied, err)
}
defer rows.Close()
var applied []sqlmigrate.Migration
for rows.Next() {
var a sqlmigrate.Migration
if err := rows.Scan(&a.ID, &a.Name); err != nil {
return nil, fmt.Errorf("%w: scanning row: %w", sqlmigrate.ErrQueryApplied, err)
}
applied = append(applied, a)
}
if err := rows.Err(); err != nil {
if isUndefinedTable(err) {
return nil, nil
}
return nil, fmt.Errorf("%w: reading rows: %w", sqlmigrate.ErrQueryApplied, err)
}
return applied, nil
}
// isUndefinedTable reports whether err is PostgreSQL error 42P01
// (undefined_table), which is what we get when _migrations doesn't exist yet.
func isUndefinedTable(err error) bool {
pgErr, ok := errors.AsType[*pgconn.PgError](err)
return ok && pgErr.Code == "42P01"
}