diff --git a/net/dataset/dataset.go b/net/dataset/dataset.go index f9a88e7..1dcc87a 100644 --- a/net/dataset/dataset.go +++ b/net/dataset/dataset.go @@ -2,7 +2,7 @@ // atomic.Pointer (hot-swap), providing a generic periodically-updated // in-memory dataset with lock-free reads. // -// Standalone dataset (one syncer, one value): +// Standalone dataset: // // ds := dataset.New(cacher, func() (*MyType, error) { // return mytype.LoadFile(path) @@ -11,11 +11,11 @@ // go ds.Run(ctx, 47*time.Minute) // val := ds.Load() // *MyType, lock-free // -// Group (one syncer, multiple values — e.g. inbound+outbound from one git repo): +// Group (one syncer, multiple values): // // g := dataset.NewGroup(repo) -// inbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { return ipcohort.LoadFiles(inboundPaths...) }) -// outbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { return ipcohort.LoadFiles(outboundPaths...) }) +// inbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... }) +// outbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... }) // if err := g.Init(); err != nil { ... } // go g.Run(ctx, 47*time.Minute) package dataset @@ -28,8 +28,7 @@ import ( "time" ) -// Syncer is implemented by any value that can fetch a remote resource and -// report whether it changed. +// Syncer reports whether a remote resource has changed. type Syncer interface { Fetch() (updated bool, err error) } @@ -50,8 +49,7 @@ func (ms MultiSyncer) Fetch() (bool, error) { return anyUpdated, nil } -// NopSyncer is a Syncer that always reports no update and no error. -// Use for datasets backed by local files with no remote source. +// NopSyncer always reports no update. Use for local-file datasets. type NopSyncer struct{} func (NopSyncer) Fetch() (bool, error) { return false, nil } @@ -59,10 +57,9 @@ func (NopSyncer) Fetch() (bool, error) { return false, nil } // Dataset couples a Syncer, a load function, and an atomic.Pointer[T]. // Load is safe for concurrent use without locks. type Dataset[T any] struct { - // Name is used in error messages. Optional. + // Name is used in error messages. Name string - // Close is called with the previous value after each successful swap. - // Use this for values that hold resources, e.g. func(r *geoip2.Reader) { r.Close() }. + // Close is called with the old value after each successful swap. Close func(*T) syncer Syncer @@ -70,8 +67,7 @@ type Dataset[T any] struct { ptr atomic.Pointer[T] } -// New creates a Dataset. The syncer fetches updates; load produces the value. -// load is a closure — it captures whatever paths or config it needs. +// New creates a Dataset. The syncer reports changes; load produces the value. func New[T any](syncer Syncer, load func() (*T, error)) *Dataset[T] { return &Dataset[T]{syncer: syncer, load: load} } @@ -81,26 +77,41 @@ func (d *Dataset[T]) Load() *T { return d.ptr.Load() } -// Init fetches (if needed) then always loads, ensuring the dataset is -// populated on startup from an existing local file even if nothing changed. +func (d *Dataset[T]) swap() error { + val, err := d.load() + if err != nil { + return err + } + if old := d.ptr.Swap(val); old != nil && d.Close != nil { + d.Close(old) + } + return nil +} + +// Sync calls the syncer and, if updated, reloads and atomically installs the +// new value. Returns whether the source changed. +func (d *Dataset[T]) Sync() (bool, error) { + updated, err := d.syncer.Fetch() + if err != nil { + return false, err + } + if !updated { + return false, nil + } + return true, d.swap() +} + +// Init syncs and always loads, ensuring the dataset is populated from an +// existing local file even if nothing changed upstream. func (d *Dataset[T]) Init() error { if _, err := d.syncer.Fetch(); err != nil { return err } - return d.reload() + return d.swap() } -// Sync fetches and reloads if the content changed. Returns whether updated. -func (d *Dataset[T]) Sync() (bool, error) { - updated, err := d.syncer.Fetch() - if err != nil || !updated { - return updated, err - } - return true, d.reload() -} - -// Run calls Sync on every interval until ctx is done. -// Errors are written to stderr and do not stop the loop. +// Run calls Sync on every interval. Errors are written to stderr and do not +// stop the loop. func (d *Dataset[T]) Run(ctx context.Context, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() @@ -120,26 +131,15 @@ func (d *Dataset[T]) Run(ctx context.Context, interval time.Duration) { } } -func (d *Dataset[T]) reload() error { - val, err := d.load() - if err != nil { - return err - } - if old := d.ptr.Swap(val); old != nil && d.Close != nil { - d.Close(old) - } - return nil -} - // -- Group: one Syncer driving multiple datasets --------------------------- -// member is the type-erased reload handle stored in a Group. +// member is the type-erased swap handle stored in a Group. type member interface { - reload() error + swap() error } // Group ties one Syncer to multiple datasets so a single Fetch drives all -// reloads — no redundant network calls when datasets share a source. +// swaps — no redundant network calls when datasets share a source. type Group struct { syncer Syncer members []member @@ -150,44 +150,38 @@ func NewGroup(syncer Syncer) *Group { return &Group{syncer: syncer} } -// View is the read-only handle returned by Add. It exposes only Load — -// fetch and reload are driven by the owning Group. -type View[T any] struct { - d *Dataset[T] +func (g *Group) swapAll() error { + for _, m := range g.members { + if err := m.swap(); err != nil { + return err + } + } + return nil } -// Load returns the current value. Returns nil before the Group is initialised. -func (v *View[T]) Load() *T { return v.d.ptr.Load() } - -func (v *View[T]) reload() error { return v.d.reload() } - -// Add registers a new dataset in g and returns a View. Call Load to read the -// current value. Drive updates by calling Init/Sync/Run on the Group. -// load is a closure capturing whatever paths or config it needs. -func Add[T any](g *Group, load func() (*T, error)) *View[T] { - v := &View[T]{d: &Dataset[T]{load: load}} - g.members = append(g.members, v) - return v +// Sync calls the syncer and, if updated, reloads all member datasets. +// Returns whether the source changed. +func (g *Group) Sync() (bool, error) { + updated, err := g.syncer.Fetch() + if err != nil { + return false, err + } + if !updated { + return false, nil + } + return true, g.swapAll() } -// Init fetches once then reloads all registered datasets. +// Init syncs and always loads all datasets. func (g *Group) Init() error { if _, err := g.syncer.Fetch(); err != nil { return err } - return g.reloadAll() + return g.swapAll() } -// Sync fetches and reloads all datasets if the syncer reports an update. -func (g *Group) Sync() (bool, error) { - updated, err := g.syncer.Fetch() - if err != nil || !updated { - return updated, err - } - return true, g.reloadAll() -} - -// Run calls Sync on every interval until ctx is done. +// Run calls Sync on every interval; reloads all datasets only when the source +// reports a change. func (g *Group) Run(ctx context.Context, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() @@ -203,11 +197,20 @@ func (g *Group) Run(ctx context.Context, interval time.Duration) { } } -func (g *Group) reloadAll() error { - for _, m := range g.members { - if err := m.reload(); err != nil { - return err - } - } - return nil +// View is the read-only handle returned by Add. Sync is driven by the owning +// Group. +type View[T any] struct { + d *Dataset[T] +} + +// Load returns the current value. Returns nil before the Group is initialised. +func (v *View[T]) Load() *T { return v.d.ptr.Load() } + +func (v *View[T]) swap() error { return v.d.swap() } + +// Add registers a new dataset in g and returns a View for reading. +func Add[T any](g *Group, load func() (*T, error)) *View[T] { + v := &View[T]{d: &Dataset[T]{load: load}} + g.members = append(g.members, v) + return v }