From 01a9185c0361d79b1f64678e282906c97783bf5a Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Mon, 20 Apr 2026 13:22:08 -0600 Subject: [PATCH] refactor: delete net/dataset package check-ip and geoip no longer use it; formmailer now takes *atomic.Pointer[ipcohort.Cohort] for Blacklist so callers own the refresh + swap lifecycle directly. gitshallow doc comments that referenced dataset.Syncer are trimmed. The concepts the package tried to share (atomic-swap, group sync, ticker-driven refresh) may come back under sync/dataset once we have more than one in-tree caller that wants them. --- go.work | 2 - net/dataset/dataset.go | 213 -------------------------- net/dataset/dataset_test.go | 283 ----------------------------------- net/dataset/go.mod | 3 - net/formmailer/formmailer.go | 11 +- net/formmailer/go.mod | 1 - net/gitshallow/gitshallow.go | 8 +- 7 files changed, 9 insertions(+), 512 deletions(-) delete mode 100644 net/dataset/dataset.go delete mode 100644 net/dataset/dataset_test.go delete mode 100644 net/dataset/go.mod diff --git a/go.work b/go.work index 64fbb52..b35b232 100644 --- a/go.work +++ b/go.work @@ -3,7 +3,6 @@ go 1.26.1 use ( . ./cmd/check-ip - ./net/dataset ./net/formmailer ./net/geoip ./net/gitshallow @@ -12,7 +11,6 @@ use ( ) replace ( - github.com/therootcompany/golib/net/dataset v0.0.0 => ./net/dataset github.com/therootcompany/golib/net/formmailer v0.0.0 => ./net/formmailer github.com/therootcompany/golib/net/geoip v0.0.0 => ./net/geoip github.com/therootcompany/golib/net/gitshallow v0.0.0 => ./net/gitshallow diff --git a/net/dataset/dataset.go b/net/dataset/dataset.go deleted file mode 100644 index 298ccfd..0000000 --- a/net/dataset/dataset.go +++ /dev/null @@ -1,213 +0,0 @@ -// Package dataset couples a Syncer (fetch) with a Loader (parse) and an -// atomic.Pointer (hot-swap), providing a generic periodically-updated -// in-memory dataset with lock-free reads. -// -// Standalone dataset: -// -// ds := dataset.New(cacher, func() (*MyType, error) { -// return mytype.LoadFile(path) -// }) -// if err := ds.Init(); err != nil { ... } -// go ds.Run(ctx, 47*time.Minute) -// val := ds.Load() // *MyType, lock-free -// -// Group (one syncer, multiple datasets): -// -// g := dataset.NewGroup(repo) -// inbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... }) -// outbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... }) -// if err := g.Init(); err != nil { ... } -// go g.Run(ctx, 47*time.Minute) -// val := inbound.Load() // lock-free; Init/Run belong to the Group -package dataset - -import ( - "context" - "fmt" - "os" - "sync/atomic" - "time" -) - -// Syncer reports whether a remote resource has changed. -type Syncer interface { - Fetch() (updated bool, err error) -} - -// MultiSyncer fans out Fetch to multiple Syncers, returning updated=true if -// any reports a change. Stops and returns the first error. -type MultiSyncer []Syncer - -func (ms MultiSyncer) Fetch() (bool, error) { - var anyUpdated bool - for _, s := range ms { - updated, err := s.Fetch() - if err != nil { - return anyUpdated, err - } - anyUpdated = anyUpdated || updated - } - return anyUpdated, nil -} - -// NopSyncer always reports no update. Use for local-file datasets. -type NopSyncer struct{} - -func (NopSyncer) Fetch() (bool, error) { return false, nil } - -// Dataset couples a Syncer, a load function, and an atomic.Pointer[T]. -// Load is safe for concurrent use without locks. -// -// When a Dataset is added to a Group via Add, Init and Run belong to the -// Group; call Load on the Dataset to read the current value. -type Dataset[T any] struct { - // Name is used in error messages. - Name string - // Close is called with the old value after each successful swap. - Close func(*T) - - syncer Syncer - load func() (*T, error) - ptr atomic.Pointer[T] -} - -// New creates a Dataset. The syncer reports changes; load produces the value. -func New[T any](syncer Syncer, load func() (*T, error)) *Dataset[T] { - return &Dataset[T]{syncer: syncer, load: load} -} - -// Load returns the current value. Returns nil before Init is called. -func (d *Dataset[T]) Load() *T { - return d.ptr.Load() -} - -func (d *Dataset[T]) swap() error { - val, err := d.load() - if err != nil { - return err - } - if old := d.ptr.Swap(val); old != nil && d.Close != nil { - d.Close(old) - } - return nil -} - -// Sync calls the syncer and, if updated, reloads and atomically installs the -// new value. Returns whether the source changed. -func (d *Dataset[T]) Sync() (bool, error) { - updated, err := d.syncer.Fetch() - if err != nil { - return false, err - } - if !updated { - return false, nil - } - return true, d.swap() -} - -// Init syncs and always loads, ensuring the dataset is populated from an -// existing local file even if nothing changed upstream. -func (d *Dataset[T]) Init() error { - if _, err := d.syncer.Fetch(); err != nil { - return err - } - return d.swap() -} - -// Run calls Sync on every interval. Errors are written to stderr and do not -// stop the loop. -func (d *Dataset[T]) Run(ctx context.Context, interval time.Duration) { - ticker := time.NewTicker(interval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - if _, err := d.Sync(); err != nil { - name := d.Name - if name == "" { - name = "dataset" - } - fmt.Fprintf(os.Stderr, "%s: sync error: %v\n", name, err) - } - case <-ctx.Done(): - return - } - } -} - -// -- Group: one Syncer driving multiple datasets --------------------------- - -// member is the type-erased swap handle stored in a Group. -type member interface { - swap() error -} - -// Group ties one Syncer to multiple datasets so a single Fetch drives all -// swaps — no redundant network calls when datasets share a source. -// -// Datasets added via Add are owned by the Group; call Init and Run on the -// Group, not on individual datasets. -type Group struct { - syncer Syncer - members []member -} - -// NewGroup creates a Group backed by syncer. -func NewGroup(syncer Syncer) *Group { - return &Group{syncer: syncer} -} - -func (g *Group) swapAll() error { - for _, m := range g.members { - if err := m.swap(); err != nil { - return err - } - } - return nil -} - -// Sync calls the syncer and, if updated, reloads all member datasets. -// Returns whether the source changed. -func (g *Group) Sync() (bool, error) { - updated, err := g.syncer.Fetch() - if err != nil { - return false, err - } - if !updated { - return false, nil - } - return true, g.swapAll() -} - -// Init syncs and always loads all datasets. -func (g *Group) Init() error { - if _, err := g.syncer.Fetch(); err != nil { - return err - } - return g.swapAll() -} - -// Run calls Sync on every interval; reloads all datasets only when the source -// reports a change. -func (g *Group) Run(ctx context.Context, interval time.Duration) { - ticker := time.NewTicker(interval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - if _, err := g.Sync(); err != nil { - fmt.Fprintf(os.Stderr, "dataset group: sync error: %v\n", err) - } - case <-ctx.Done(): - return - } - } -} - -// Add registers a new dataset in g and returns it for reading via Load. -// Init and Run are driven by the Group; do not call them on the returned Dataset. -func Add[T any](g *Group, load func() (*T, error)) *Dataset[T] { - d := &Dataset[T]{load: load} - g.members = append(g.members, d) - return d -} diff --git a/net/dataset/dataset_test.go b/net/dataset/dataset_test.go deleted file mode 100644 index ab5926b..0000000 --- a/net/dataset/dataset_test.go +++ /dev/null @@ -1,283 +0,0 @@ -package dataset_test - -import ( - "context" - "errors" - "sync/atomic" - "testing" - "time" - - "github.com/therootcompany/golib/net/dataset" -) - -// countSyncer counts Fetch calls and optionally reports updated. -type countSyncer struct { - calls atomic.Int32 - updated bool - err error -} - -func (s *countSyncer) Fetch() (bool, error) { - s.calls.Add(1) - return s.updated, s.err -} - -func TestDataset_Init(t *testing.T) { - syn := &countSyncer{updated: false} - calls := 0 - ds := dataset.New(syn, func() (*string, error) { - calls++ - v := "hello" - return &v, nil - }) - - if err := ds.Init(); err != nil { - t.Fatal(err) - } - if got := ds.Load(); got == nil || *got != "hello" { - t.Fatalf("Load() = %v, want \"hello\"", got) - } - if calls != 1 { - t.Errorf("loader called %d times, want 1", calls) - } - if syn.calls.Load() != 1 { - t.Errorf("Fetch called %d times, want 1", syn.calls.Load()) - } -} - -func TestDataset_LoadBeforeInit(t *testing.T) { - syn := dataset.NopSyncer{} - ds := dataset.New(syn, func() (*string, error) { - v := "x" - return &v, nil - }) - if ds.Load() != nil { - t.Error("Load() before Init should return nil") - } -} - -func TestDataset_SyncNoUpdate(t *testing.T) { - syn := &countSyncer{updated: false} - calls := 0 - ds := dataset.New(syn, func() (*string, error) { - calls++ - v := "hello" - return &v, nil - }) - if err := ds.Init(); err != nil { - t.Fatal(err) - } - calls = 0 - - updated, err := ds.Sync() - if err != nil { - t.Fatal(err) - } - if updated { - t.Error("Sync() reported updated=true but syncer returned false") - } - if calls != 0 { - t.Errorf("loader called %d times on no-update Sync, want 0", calls) - } -} - -func TestDataset_SyncWithUpdate(t *testing.T) { - syn := &countSyncer{updated: true} - n := 0 - ds := dataset.New(syn, func() (*string, error) { - n++ - v := "v" + string(rune('0'+n)) - return &v, nil - }) - if err := ds.Init(); err != nil { - t.Fatal(err) - } - updated, err := ds.Sync() - if err != nil { - t.Fatal(err) - } - if !updated { - t.Error("Sync() reported updated=false but syncer returned true") - } - if got := ds.Load(); got == nil || *got != "v2" { - t.Errorf("Load() after Sync = %v, want \"v2\"", got) - } -} - -func TestDataset_InitError(t *testing.T) { - syn := &countSyncer{err: errors.New("fetch failed")} - ds := dataset.New(syn, func() (*string, error) { - v := "x" - return &v, nil - }) - if err := ds.Init(); err == nil { - t.Error("expected error from Init when syncer fails") - } - if ds.Load() != nil { - t.Error("Load() should be nil after failed Init") - } -} - -func TestDataset_LoaderError(t *testing.T) { - syn := dataset.NopSyncer{} - ds := dataset.New(syn, func() (*string, error) { - return nil, errors.New("load failed") - }) - if err := ds.Init(); err == nil { - t.Error("expected error from Init when loader fails") - } -} - -func TestDataset_Close(t *testing.T) { - syn := &countSyncer{updated: true} - var closed []string - n := 0 - ds := dataset.New(syn, func() (*string, error) { - n++ - v := "v" + string(rune('0'+n)) - return &v, nil - }) - ds.Close = func(s *string) { closed = append(closed, *s) } - - if err := ds.Init(); err != nil { - t.Fatal(err) - } - // First swap: old is nil, Close should not be called. - if len(closed) != 0 { - t.Errorf("Close called %d times on Init, want 0", len(closed)) - } - - if _, err := ds.Sync(); err != nil { - t.Fatal(err) - } - if len(closed) != 1 || closed[0] != "v1" { - t.Errorf("Close got %v, want [\"v1\"]", closed) - } -} - -func TestDataset_Run(t *testing.T) { - syn := &countSyncer{updated: true} - n := atomic.Int32{} - ds := dataset.New(syn, func() (*int32, error) { - v := n.Add(1) - return &v, nil - }) - if err := ds.Init(); err != nil { - t.Fatal(err) - } - - ctx, cancel := context.WithCancel(context.Background()) - done := make(chan struct{}) - go func() { - ds.Run(ctx, 10*time.Millisecond) - close(done) - }() - - time.Sleep(60 * time.Millisecond) - cancel() - <-done - - if n.Load() < 2 { - t.Errorf("Run did not tick: loader called %d times", n.Load()) - } -} - -// --- Group tests --- - -func TestGroup_Init(t *testing.T) { - syn := &countSyncer{} - g := dataset.NewGroup(syn) - - callsA, callsB := 0, 0 - dsA := dataset.Add(g, func() (*string, error) { - callsA++ - v := "a" - return &v, nil - }) - dsB := dataset.Add(g, func() (*int, error) { - callsB++ - v := 42 - return &v, nil - }) - - if err := g.Init(); err != nil { - t.Fatal(err) - } - if syn.calls.Load() != 1 { - t.Errorf("Fetch called %d times, want 1", syn.calls.Load()) - } - if callsA != 1 || callsB != 1 { - t.Errorf("loaders called (%d,%d), want (1,1)", callsA, callsB) - } - if got := dsA.Load(); got == nil || *got != "a" { - t.Errorf("dsA.Load() = %v", got) - } - if got := dsB.Load(); got == nil || *got != 42 { - t.Errorf("dsB.Load() = %v", got) - } -} - -func TestGroup_SyncNoUpdate(t *testing.T) { - syn := &countSyncer{updated: false} - g := dataset.NewGroup(syn) - calls := 0 - dataset.Add(g, func() (*string, error) { - calls++ - v := "x" - return &v, nil - }) - if err := g.Init(); err != nil { - t.Fatal(err) - } - calls = 0 - - updated, err := g.Sync() - if err != nil { - t.Fatal(err) - } - if updated || calls != 0 { - t.Errorf("Sync() updated=%v calls=%d, want false/0", updated, calls) - } -} - -func TestGroup_SyncWithUpdate(t *testing.T) { - syn := &countSyncer{updated: true} - g := dataset.NewGroup(syn) - n := 0 - ds := dataset.Add(g, func() (*int, error) { - n++ - return &n, nil - }) - if err := g.Init(); err != nil { - t.Fatal(err) - } - if _, err := g.Sync(); err != nil { - t.Fatal(err) - } - if got := ds.Load(); got == nil || *got != 2 { - t.Errorf("ds.Load() = %v, want 2", got) - } -} - -func TestGroup_FetchError(t *testing.T) { - syn := &countSyncer{err: errors.New("network down")} - g := dataset.NewGroup(syn) - dataset.Add(g, func() (*string, error) { - v := "x" - return &v, nil - }) - if err := g.Init(); err == nil { - t.Error("expected error from Group.Init when syncer fails") - } -} - -func TestGroup_LoaderError(t *testing.T) { - syn := dataset.NopSyncer{} - g := dataset.NewGroup(syn) - dataset.Add(g, func() (*string, error) { - return nil, errors.New("parse error") - }) - if err := g.Init(); err == nil { - t.Error("expected error from Group.Init when loader fails") - } -} diff --git a/net/dataset/go.mod b/net/dataset/go.mod deleted file mode 100644 index 38698eb..0000000 --- a/net/dataset/go.mod +++ /dev/null @@ -1,3 +0,0 @@ -module github.com/therootcompany/golib/net/dataset - -go 1.26.0 diff --git a/net/formmailer/formmailer.go b/net/formmailer/formmailer.go index 9052dd7..081dc6c 100644 --- a/net/formmailer/formmailer.go +++ b/net/formmailer/formmailer.go @@ -3,9 +3,8 @@ // // Typical setup: // -// blGroup, _, inboundDS, _ := src.Datasets() -// blGroup.Init() -// go blGroup.Run(ctx, 47*time.Minute) +// var blacklist atomic.Pointer[ipcohort.Cohort] +// // ... caller loads blacklist and hot-swaps on a timer ... // // fm := &formmailer.FormMailer{ // SMTPHost: "smtp.example.com:587", @@ -16,7 +15,7 @@ // Subject: "Contact from {.Email}", // SuccessBody: successHTML, // ErrorBody: errorHTML, -// Blacklist: inboundDS, +// Blacklist: &blacklist, // AllowedCountries: []string{"US", "CA", "MX"}, // } // http.Handle("POST /contact", fm) @@ -35,12 +34,12 @@ import ( "slices" "strings" "sync" + "sync/atomic" "time" "github.com/phuslu/iploc" "golang.org/x/time/rate" - "github.com/therootcompany/golib/net/dataset" "github.com/therootcompany/golib/net/ipcohort" ) @@ -102,7 +101,7 @@ type FormMailer struct { ContentType string // inferred from SuccessBody if empty // Blacklist — if set, matching IPs are rejected before any other processing. - Blacklist *dataset.Dataset[ipcohort.Cohort] + Blacklist *atomic.Pointer[ipcohort.Cohort] // AllowedCountries — if non-nil, only requests from listed ISO codes are // accepted. Unknown country ("") is always allowed. diff --git a/net/formmailer/go.mod b/net/formmailer/go.mod index 99330aa..e088eac 100644 --- a/net/formmailer/go.mod +++ b/net/formmailer/go.mod @@ -4,7 +4,6 @@ go 1.26.0 require ( github.com/phuslu/iploc v1.0.20260415 - github.com/therootcompany/golib/net/dataset v0.0.0 github.com/therootcompany/golib/net/ipcohort v0.0.0 golang.org/x/time v0.15.0 ) diff --git a/net/gitshallow/gitshallow.go b/net/gitshallow/gitshallow.go index b53eb1b..e5867cf 100644 --- a/net/gitshallow/gitshallow.go +++ b/net/gitshallow/gitshallow.go @@ -168,7 +168,7 @@ func (r *Repo) Sync() (bool, error) { return r.syncGit() } -// Fetch satisfies dataset.Syncer. +// Fetch syncs the repo and reports whether HEAD changed. func (r *Repo) Fetch() (bool, error) { return r.syncGit() } @@ -186,8 +186,8 @@ func (r *Repo) File(relPath string) *File { } // File is a handle to a single file inside a Repo. -// It implements dataset.Syncer: Fetch syncs the repo (deduped across all File -// handles sharing the same Repo) then reports whether this file changed. +// Fetch syncs the repo (deduped across all File handles sharing the same +// Repo) and reports whether this file changed. type File struct { repo *Repo rel string @@ -206,7 +206,7 @@ func (f *File) Open() (*os.File, error) { } // Fetch syncs the repo and reports whether this file changed since last call. -// Implements dataset.Syncer; safe to call concurrently. +// Safe to call concurrently. func (f *File) Fetch() (bool, error) { if _, err := f.repo.syncGit(); err != nil { return false, err