From 10c4b6dbc31157ea5790a8c310c21c487164648d Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Mon, 20 Apr 2026 09:23:14 -0600 Subject: [PATCH] =?UTF-8?q?feat:=20add=20net/dataset=20=E2=80=94=20generic?= =?UTF-8?q?=20Syncer=E2=86=92atomic.Pointer=20with=20Dataset=20and=20Group?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Dataset[T]: one Syncer + one Loader + one atomic.Pointer. Init/Sync/Run. Group: one Syncer driving N datasets — single Fetch, all reloads fire together. Add[T](g, loader, path) registers a typed dataset in the group. Discovered organically: the reload+atomic-swap pattern repeated across every cmd is exactly this abstraction. --- net/dataset/dataset.go | 168 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 168 insertions(+) create mode 100644 net/dataset/dataset.go diff --git a/net/dataset/dataset.go b/net/dataset/dataset.go new file mode 100644 index 0000000..eb5fb8d --- /dev/null +++ b/net/dataset/dataset.go @@ -0,0 +1,168 @@ +// Package dataset couples a Syncer (fetch) with a Loader (parse) and an +// atomic.Pointer (hot-swap), providing a generic periodically-updated +// in-memory dataset with lock-free reads. +// +// Single dataset: +// +// ds := dataset.New(cacher, ipcohort.LoadFile, path) +// if err := ds.Init(); err != nil { ... } +// go ds.Run(ctx, 47*time.Minute) +// cohort := ds.Load() +// +// Multiple datasets sharing one syncer (e.g. inbound + outbound from one git repo): +// +// g := dataset.NewGroup(repo) +// inbound := dataset.Add(g, ipcohort.LoadFile, inboundPath) +// outbound := dataset.Add(g, ipcohort.LoadFile, outboundPath) +// if err := g.Init(); err != nil { ... } +// go g.Run(ctx, 47*time.Minute) +// in := inbound.Load() +// out := outbound.Load() +package dataset + +import ( + "context" + "fmt" + "os" + "sync/atomic" + "time" + + "github.com/therootcompany/golib/net/httpcache" +) + +// Loader reads path and returns the parsed value, or an error. +type Loader[T any] func(path string) (*T, error) + +// Dataset couples a Syncer, a Loader, and an atomic.Pointer. +// Load is safe for concurrent use without locks. +type Dataset[T any] struct { + syncer httpcache.Syncer + load Loader[T] + path string + ptr atomic.Pointer[T] +} + +// New creates a Dataset. The syncer fetches updates to path; load parses it. +func New[T any](syncer httpcache.Syncer, load Loader[T], path string) *Dataset[T] { + return &Dataset[T]{syncer: syncer, load: load, path: path} +} + +// Load returns the current value. Returns nil before Init is called. +func (d *Dataset[T]) Load() *T { + return d.ptr.Load() +} + +// Init fetches (if the syncer needs it) then loads, ensuring the dataset is +// populated on startup from an existing local file even if nothing changed. +func (d *Dataset[T]) Init() error { + if _, err := d.syncer.Fetch(); err != nil { + return err + } + return d.reload() +} + +// Sync fetches from the remote and reloads if the content changed. +// Returns whether the value was updated. +func (d *Dataset[T]) Sync() (bool, error) { + updated, err := d.syncer.Fetch() + if err != nil || !updated { + return updated, err + } + return true, d.reload() +} + +// Run calls Sync on every interval until ctx is done. +// Errors are written to stderr and do not stop the loop. +func (d *Dataset[T]) Run(ctx context.Context, interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if _, err := d.Sync(); err != nil { + fmt.Fprintf(os.Stderr, "dataset %s: sync error: %v\n", d.path, err) + } + case <-ctx.Done(): + return + } + } +} + +func (d *Dataset[T]) reload() error { + val, err := d.load(d.path) + if err != nil { + return err + } + d.ptr.Store(val) + return nil +} + +// -- Group: one Syncer driving multiple datasets --------------------------- + +// entry is the type-erased reload handle stored in a Group. +type entry interface { + reload() error +} + +// Group ties one Syncer to multiple datasets so a single Fetch drives all +// reloads — avoiding redundant network calls when datasets share a source +// (e.g. multiple files from the same git repo or HTTP directory). +type Group struct { + syncer httpcache.Syncer + entries []entry +} + +// NewGroup creates a Group backed by syncer. +func NewGroup(syncer httpcache.Syncer) *Group { + return &Group{syncer: syncer} +} + +// Add registers a new dataset in g and returns it. Subsequent Init/Sync/Run +// calls on g will reload this dataset whenever the syncer reports an update. +func Add[T any](g *Group, load Loader[T], path string) *Dataset[T] { + d := &Dataset[T]{load: load, path: path} + g.entries = append(g.entries, d) + return d +} + +// Init fetches once then reloads all registered datasets. +func (g *Group) Init() error { + if _, err := g.syncer.Fetch(); err != nil { + return err + } + return g.reloadAll() +} + +// Sync fetches and reloads all datasets if the syncer reports an update. +func (g *Group) Sync() (bool, error) { + updated, err := g.syncer.Fetch() + if err != nil || !updated { + return updated, err + } + return true, g.reloadAll() +} + +// Run calls Sync on every interval until ctx is done. +func (g *Group) Run(ctx context.Context, interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if _, err := g.Sync(); err != nil { + fmt.Fprintf(os.Stderr, "dataset group: sync error: %v\n", err) + } + case <-ctx.Done(): + return + } + } +} + +func (g *Group) reloadAll() error { + for _, e := range g.entries { + if err := e.reload(); err != nil { + return err + } + } + return nil +}