From 11743c9a10f054ab64e3a67342bf5478645dd8a2 Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Mon, 20 Apr 2026 13:33:05 -0600 Subject: [PATCH] feat(sync/dataset): minimal group/view/fetcher for hot-swap refresh MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Distilled from the previous net/dataset experiment and the inline closure version in check-ip. Keeps what actually earned its keep: - Group ties one Fetcher to N views; a single Load drives all swaps, so shared sources (one git pull, one zip download) don't get re-fetched per view. - View[T].Value() is a lock-free atomic read; the atomic.Pointer is hidden so consumers never see in-flight reloads. - Tick runs Load on a ticker with stderr error logging. Dropped from the v1 design: MultiSyncer (callers fan-out inline when needed), Close (unused outside geoip), Name (callers wrap the logger), standalone Dataset type (Group with one view covers it), Sync vs Init asymmetry (Load handles first-call vs update internally). check-ip rewires to use it — file/git/http modes all build a Group with two views, uniform shape. --- cmd/check-ip/go.mod | 1 + cmd/check-ip/main.go | 149 +++++++++++++---------------------- go.work | 2 + sync/dataset/dataset.go | 127 +++++++++++++++++++++++++++++ sync/dataset/dataset_test.go | 143 +++++++++++++++++++++++++++++++++ sync/dataset/go.mod | 3 + 6 files changed, 332 insertions(+), 93 deletions(-) create mode 100644 sync/dataset/dataset.go create mode 100644 sync/dataset/dataset_test.go create mode 100644 sync/dataset/go.mod diff --git a/cmd/check-ip/go.mod b/cmd/check-ip/go.mod index f7ce278..142840e 100644 --- a/cmd/check-ip/go.mod +++ b/cmd/check-ip/go.mod @@ -7,4 +7,5 @@ require ( github.com/therootcompany/golib/net/gitshallow v0.0.0 github.com/therootcompany/golib/net/httpcache v0.0.0 github.com/therootcompany/golib/net/ipcohort v0.0.0 + github.com/therootcompany/golib/sync/dataset v0.0.0 ) diff --git a/cmd/check-ip/main.go b/cmd/check-ip/main.go index a669731..9c5d436 100644 --- a/cmd/check-ip/main.go +++ b/cmd/check-ip/main.go @@ -7,8 +7,8 @@ // - --git URL shallow-clone a git repo of blocklists // - (default) fetch raw blocklist files over HTTP with caching // -// Cohorts are held in atomic.Pointers and hot-swapped on refresh so callers -// never see a partial view. A single goroutine reloads on a ticker. +// Each mode builds a sync/dataset.Group: one Fetcher shared by the inbound +// and outbound views, so a single git pull (or HTTP-304 cycle) drives both. package main import ( @@ -19,13 +19,13 @@ import ( "os" "path/filepath" "strings" - "sync/atomic" "time" "github.com/therootcompany/golib/net/geoip" "github.com/therootcompany/golib/net/gitshallow" "github.com/therootcompany/golib/net/httpcache" "github.com/therootcompany/golib/net/ipcohort" + "github.com/therootcompany/golib/sync/dataset" ) const ( @@ -102,19 +102,16 @@ func run(cfg Config, ipStr string) (blocked bool, err error) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var inbound, outbound atomic.Pointer[ipcohort.Cohort] - - refresh, err := buildRefresher(cfg, &inbound, &outbound) + group, inbound, outbound, err := newBlocklistGroup(cfg) if err != nil { return false, err } - if err := refresh(); err != nil { + if err := group.Load(ctx); err != nil { return false, fmt.Errorf("blacklist: %w", err) } fmt.Fprintf(os.Stderr, "Loaded inbound=%d outbound=%d\n", - inbound.Load().Size(), outbound.Load().Size()) - - go tick(ctx, refreshInterval, "blacklist", refresh) + inbound.Value().Size(), outbound.Value().Size()) + go group.Tick(ctx, refreshInterval) whitelist, err := loadWhitelist(cfg.Whitelist) if err != nil { @@ -127,8 +124,8 @@ func run(cfg Config, ipStr string) (blocked bool, err error) { } defer func() { _ = geo.Close() }() - blockedIn := isBlocked(ipStr, whitelist, inbound.Load()) - blockedOut := isBlocked(ipStr, whitelist, outbound.Load()) + blockedIn := isBlocked(ipStr, whitelist, inbound.Value()) + blockedOut := isBlocked(ipStr, whitelist, outbound.Value()) switch { case blockedIn && blockedOut: @@ -145,71 +142,51 @@ func run(cfg Config, ipStr string) (blocked bool, err error) { return blockedIn || blockedOut, nil } -// buildRefresher wires the chosen source (files/git/http) to the inbound and -// outbound atomic pointers, and returns a function that performs one refresh -// cycle: fetch upstream, and if anything changed (or on the first call), -// reload both cohorts and atomically swap them in. -func buildRefresher( - cfg Config, - inbound, outbound *atomic.Pointer[ipcohort.Cohort], -) (func() error, error) { - loadAndSwap := func(inPaths, outPaths []string) error { - in, err := ipcohort.LoadFiles(inPaths...) - if err != nil { - return fmt.Errorf("inbound: %w", err) - } - out, err := ipcohort.LoadFiles(outPaths...) - if err != nil { - return fmt.Errorf("outbound: %w", err) - } - inbound.Store(in) - outbound.Store(out) - return nil +// newBlocklistGroup wires a dataset.Group to the configured source (local +// files, git, or HTTP-cached raw files) and registers inbound/outbound views. +func newBlocklistGroup(cfg Config) ( + _ *dataset.Group, + inbound, outbound *dataset.View[ipcohort.Cohort], + err error, +) { + fetcher, inPaths, outPaths, err := newFetcher(cfg) + if err != nil { + return nil, nil, nil, err } + g := dataset.NewGroup(fetcher) + inbound = dataset.Add(g, loadCohort(inPaths)) + outbound = dataset.Add(g, loadCohort(outPaths)) + return g, inbound, outbound, nil +} +// newFetcher picks a Fetcher based on cfg and returns the on-disk file paths +// each view should parse after a sync. +func newFetcher(cfg Config) (fetcher dataset.Fetcher, inPaths, outPaths []string, err error) { switch { case cfg.Inbound != "" || cfg.Outbound != "": - inPaths, outPaths := splitCSV(cfg.Inbound), splitCSV(cfg.Outbound) - loaded := false - return func() error { - if loaded { - return nil - } - loaded = true - return loadAndSwap(inPaths, outPaths) - }, nil + return dataset.NopFetcher{}, splitCSV(cfg.Inbound), splitCSV(cfg.Outbound), nil case cfg.GitURL != "": dir, err := cacheDir(cfg.DataDir) if err != nil { - return nil, err + return nil, nil, nil, err } repo := gitshallow.New(cfg.GitURL, dir, 1, "") - inPaths := []string{ - repo.FilePath("tables/inbound/single_ips.txt"), - repo.FilePath("tables/inbound/networks.txt"), - } - outPaths := []string{ - repo.FilePath("tables/outbound/single_ips.txt"), - repo.FilePath("tables/outbound/networks.txt"), - } - first := true - return func() error { - updated, err := repo.Sync() - if err != nil { - return err - } - if !first && !updated { - return nil - } - first = false - return loadAndSwap(inPaths, outPaths) - }, nil + return repo, + []string{ + repo.FilePath("tables/inbound/single_ips.txt"), + repo.FilePath("tables/inbound/networks.txt"), + }, + []string{ + repo.FilePath("tables/outbound/single_ips.txt"), + repo.FilePath("tables/outbound/networks.txt"), + }, + nil default: dir, err := cacheDir(cfg.DataDir) if err != nil { - return nil, err + return nil, nil, nil, err } cachers := []*httpcache.Cacher{ httpcache.New(bitwireRawBase+"/inbound/single_ips.txt", filepath.Join(dir, "inbound_single_ips.txt")), @@ -217,40 +194,26 @@ func buildRefresher( httpcache.New(bitwireRawBase+"/outbound/single_ips.txt", filepath.Join(dir, "outbound_single_ips.txt")), httpcache.New(bitwireRawBase+"/outbound/networks.txt", filepath.Join(dir, "outbound_networks.txt")), } - inPaths := []string{cachers[0].Path, cachers[1].Path} - outPaths := []string{cachers[2].Path, cachers[3].Path} - first := true - return func() error { - var anyUpdated bool - for _, c := range cachers { - u, err := c.Fetch() - if err != nil { - return err + return dataset.FetcherFunc(func() (bool, error) { + var any bool + for _, c := range cachers { + u, err := c.Fetch() + if err != nil { + return false, err + } + any = any || u } - anyUpdated = anyUpdated || u - } - if !first && !anyUpdated { - return nil - } - first = false - return loadAndSwap(inPaths, outPaths) - }, nil + return any, nil + }), + []string{cachers[0].Path, cachers[1].Path}, + []string{cachers[2].Path, cachers[3].Path}, + nil } } -// tick calls fn every interval until ctx is done. Errors are logged, not fatal. -func tick(ctx context.Context, interval time.Duration, name string, fn func() error) { - t := time.NewTicker(interval) - defer t.Stop() - for { - select { - case <-ctx.Done(): - return - case <-t.C: - if err := fn(); err != nil { - fmt.Fprintf(os.Stderr, "%s: refresh error: %v\n", name, err) - } - } +func loadCohort(paths []string) func() (*ipcohort.Cohort, error) { + return func() (*ipcohort.Cohort, error) { + return ipcohort.LoadFiles(paths...) } } diff --git a/go.work b/go.work index b35b232..29a4b42 100644 --- a/go.work +++ b/go.work @@ -8,6 +8,7 @@ use ( ./net/gitshallow ./net/httpcache ./net/ipcohort + ./sync/dataset ) replace ( @@ -16,4 +17,5 @@ replace ( github.com/therootcompany/golib/net/gitshallow v0.0.0 => ./net/gitshallow github.com/therootcompany/golib/net/httpcache v0.0.0 => ./net/httpcache github.com/therootcompany/golib/net/ipcohort v0.0.0 => ./net/ipcohort + github.com/therootcompany/golib/sync/dataset v0.0.0 => ./sync/dataset ) diff --git a/sync/dataset/dataset.go b/sync/dataset/dataset.go new file mode 100644 index 0000000..6835ba9 --- /dev/null +++ b/sync/dataset/dataset.go @@ -0,0 +1,127 @@ +// Package dataset manages values that are periodically re-fetched from an +// upstream source and hot-swapped behind atomic pointers. Consumers read via +// View.Value (lock-free); a single Load drives any number of views off one +// Fetcher, so shared sources (one git pull, one zip download) don't get +// re-fetched per view. +// +// Typical lifecycle: +// +// g := dataset.NewGroup(repo) // *gitshallow.Repo satisfies Fetcher +// inbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... }) +// outbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... }) +// if err := g.Load(ctx); err != nil { ... } // initial populate +// go g.Tick(ctx, 47*time.Minute) // background refresh +// current := inbound.Value() // lock-free read +package dataset + +import ( + "context" + "fmt" + "os" + "sync/atomic" + "time" +) + +// Fetcher reports whether an upstream source has changed since the last call. +// Implementations should dedup rapid-fire calls internally (e.g. gitshallow +// skips redundant pulls within a short window; httpcache uses ETag). +type Fetcher interface { + Fetch() (updated bool, err error) +} + +// FetcherFunc adapts a plain function to Fetcher. +type FetcherFunc func() (bool, error) + +func (f FetcherFunc) Fetch() (bool, error) { return f() } + +// NopFetcher always reports no update. Use for groups backed by local files +// that don't need a refresh cycle. +type NopFetcher struct{} + +func (NopFetcher) Fetch() (bool, error) { return false, nil } + +// Group ties one Fetcher to one or more views. A Load call fetches once and, +// on the first call or when the source reports a change, reloads every view +// and atomically swaps its current value. +type Group struct { + fetcher Fetcher + views []reloader + loaded atomic.Bool +} + +// reloader is a type-erased handle to a View's reload function. +type reloader interface { + reload() error +} + +// NewGroup creates a Group backed by fetcher. +func NewGroup(fetcher Fetcher) *Group { + return &Group{fetcher: fetcher} +} + +// Load fetches upstream and, on the first call or whenever the fetcher reports +// a change, reloads every view and atomically installs the new values. +func (g *Group) Load(ctx context.Context) error { + updated, err := g.fetcher.Fetch() + if err != nil { + return err + } + if g.loaded.Load() && !updated { + return nil + } + for _, v := range g.views { + if err := ctx.Err(); err != nil { + return err + } + if err := v.reload(); err != nil { + return err + } + } + g.loaded.Store(true) + return nil +} + +// Tick calls Load every interval until ctx is done. Errors are written to +// stderr and do not stop the loop. +func (g *Group) Tick(ctx context.Context, interval time.Duration) { + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + if err := g.Load(ctx); err != nil { + fmt.Fprintf(os.Stderr, "dataset: load error: %v\n", err) + } + } + } +} + +// View is a read-only handle to one dataset inside a Group. +type View[T any] struct { + loader func() (*T, error) + ptr atomic.Pointer[T] +} + +// Value returns the current snapshot. Nil before the Group is first loaded. +func (v *View[T]) Value() *T { + return v.ptr.Load() +} + +func (v *View[T]) reload() error { + t, err := v.loader() + if err != nil { + return err + } + v.ptr.Store(t) + return nil +} + +// Add registers a new view in g and returns it. Call after NewGroup and +// before the first Load. +func Add[T any](g *Group, loader func() (*T, error)) *View[T] { + v := &View[T]{loader: loader} + g.views = append(g.views, v) + return v +} diff --git a/sync/dataset/dataset_test.go b/sync/dataset/dataset_test.go new file mode 100644 index 0000000..f5d12b1 --- /dev/null +++ b/sync/dataset/dataset_test.go @@ -0,0 +1,143 @@ +package dataset_test + +import ( + "errors" + "sync/atomic" + "testing" + + "github.com/therootcompany/golib/sync/dataset" +) + +type countFetcher struct { + calls atomic.Int32 + updated bool + err error +} + +func (f *countFetcher) Fetch() (bool, error) { + f.calls.Add(1) + return f.updated, f.err +} + +func TestGroup_LoadPopulatesAllViews(t *testing.T) { + f := &countFetcher{} + g := dataset.NewGroup(f) + + var aCalls, bCalls int + a := dataset.Add(g, func() (*string, error) { + aCalls++ + v := "a" + return &v, nil + }) + b := dataset.Add(g, func() (*int, error) { + bCalls++ + v := 42 + return &v, nil + }) + + if err := g.Load(t.Context()); err != nil { + t.Fatal(err) + } + if f.calls.Load() != 1 { + t.Errorf("Fetch called %d times, want 1", f.calls.Load()) + } + if aCalls != 1 || bCalls != 1 { + t.Errorf("loaders called (%d,%d), want (1,1)", aCalls, bCalls) + } + if got := a.Value(); got == nil || *got != "a" { + t.Errorf("a.Value() = %v", got) + } + if got := b.Value(); got == nil || *got != 42 { + t.Errorf("b.Value() = %v", got) + } +} + +func TestGroup_SecondLoadSkipsUnchanged(t *testing.T) { + f := &countFetcher{updated: false} + g := dataset.NewGroup(f) + calls := 0 + dataset.Add(g, func() (*string, error) { + calls++ + v := "x" + return &v, nil + }) + if err := g.Load(t.Context()); err != nil { + t.Fatal(err) + } + if calls != 1 { + t.Fatalf("initial load ran loader %d times, want 1", calls) + } + if err := g.Load(t.Context()); err != nil { + t.Fatal(err) + } + if calls != 1 { + t.Errorf("second load ran loader %d times, want 1 (no upstream change)", calls) + } +} + +func TestGroup_LoadOnUpdateSwaps(t *testing.T) { + f := &countFetcher{updated: true} + g := dataset.NewGroup(f) + n := 0 + v := dataset.Add(g, func() (*int, error) { + n++ + return &n, nil + }) + if err := g.Load(t.Context()); err != nil { + t.Fatal(err) + } + if err := g.Load(t.Context()); err != nil { + t.Fatal(err) + } + if got := v.Value(); got == nil || *got != 2 { + t.Errorf("v.Value() = %v, want 2", got) + } +} + +func TestGroup_ValueBeforeLoad(t *testing.T) { + g := dataset.NewGroup(dataset.NopFetcher{}) + v := dataset.Add(g, func() (*string, error) { + s := "x" + return &s, nil + }) + if v.Value() != nil { + t.Error("Value() before Load should be nil") + } +} + +func TestGroup_FetchError(t *testing.T) { + f := &countFetcher{err: errors.New("offline")} + g := dataset.NewGroup(f) + dataset.Add(g, func() (*string, error) { + s := "x" + return &s, nil + }) + if err := g.Load(t.Context()); err == nil { + t.Error("expected fetch error") + } +} + +func TestGroup_LoaderError(t *testing.T) { + g := dataset.NewGroup(dataset.NopFetcher{}) + dataset.Add(g, func() (*string, error) { + return nil, errors.New("parse fail") + }) + if err := g.Load(t.Context()); err == nil { + t.Error("expected loader error") + } +} + +func TestFetcherFunc(t *testing.T) { + var called bool + f := dataset.FetcherFunc(func() (bool, error) { + called = true + return true, nil + }) + updated, err := f.Fetch() + if err != nil { + t.Fatal(err) + } + if !called || !updated { + t.Errorf("FetcherFunc: called=%v updated=%v", called, updated) + } +} diff --git a/sync/dataset/go.mod b/sync/dataset/go.mod new file mode 100644 index 0000000..b22cfc7 --- /dev/null +++ b/sync/dataset/go.mod @@ -0,0 +1,3 @@ +module github.com/therootcompany/golib/sync/dataset + +go 1.26.0