golib/sync/dataset/dataset.go
AJ ONeal 11743c9a10
feat(sync/dataset): minimal group/view/fetcher for hot-swap refresh
Distilled from the previous net/dataset experiment and the inline
closure version in check-ip. Keeps what actually earned its keep:

  - Group ties one Fetcher to N views; a single Load drives all swaps,
    so shared sources (one git pull, one zip download) don't get
    re-fetched per view.
  - View[T].Value() is a lock-free atomic read; the atomic.Pointer is
    hidden so consumers never see in-flight reloads.
  - Tick runs Load on a ticker with stderr error logging.

Dropped from the v1 design: MultiSyncer (callers fan-out inline when
needed), Close (unused outside geoip), Name (callers wrap the logger),
standalone Dataset type (Group with one view covers it), Sync vs Init
asymmetry (Load handles first-call vs update internally).

check-ip rewires to use it — file/git/http modes all build a Group
with two views, uniform shape.
2026-04-20 13:33:05 -06:00

128 lines
3.5 KiB
Go

// Package dataset manages values that are periodically re-fetched from an
// upstream source and hot-swapped behind atomic pointers. Consumers read via
// View.Value (lock-free); a single Load drives any number of views off one
// Fetcher, so shared sources (one git pull, one zip download) don't get
// re-fetched per view.
//
// Typical lifecycle:
//
//	g := dataset.NewGroup(repo) // *gitshallow.Repo satisfies Fetcher
//	inbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... })
//	outbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... })
//	if err := g.Load(ctx); err != nil { ... } // initial populate
//	go g.Tick(ctx, 47*time.Minute) // background refresh
//	current := inbound.Value() // lock-free read
package dataset
import (
"context"
"fmt"
"os"
"sync/atomic"
"time"
)
// Fetcher reports whether an upstream source has changed since the last call.
// Implementations should dedup rapid-fire calls internally (e.g. gitshallow
// skips redundant pulls within a short window; httpcache uses ETag).
//
// Group.Load calls Fetch exactly once per invocation, before it decides
// whether any view needs to be reloaded.
type Fetcher interface {
// Fetch returns updated=true when the source changed since the previous
// call. A non-nil err makes Group.Load return that error without reloading
// any view.
Fetch() (updated bool, err error)
}
// FetcherFunc lets an ordinary function with the right signature be used
// wherever a Fetcher is expected.
type FetcherFunc func() (bool, error)

// Fetch invokes the wrapped function and hands its results back unchanged.
func (fn FetcherFunc) Fetch() (bool, error) {
	updated, err := fn()
	return updated, err
}
// NopFetcher is a Fetcher whose source never changes. It suits groups backed
// by local files that have no refresh cycle.
type NopFetcher struct{}

// Fetch always reports "no update" and never fails.
func (NopFetcher) Fetch() (updated bool, err error) {
	return false, nil
}
// Group ties one Fetcher to one or more views. A Load call fetches once and,
// on the first call or when the source reports a change, reloads every view
// and atomically swaps its current value.
type Group struct {
// fetcher is consulted once at the start of every Load.
fetcher Fetcher
// views holds the type-erased views in the order Add registered them; Load
// reloads them in that same order.
views []reloader
// loaded becomes true once a Load has reloaded every view without error.
// While it is false, Load reloads even if the fetcher reports no change.
loaded atomic.Bool
}
// reloader is a type-erased handle to a View's reload function. It lets a
// Group keep views with different type parameters in a single slice.
type reloader interface {
// reload rebuilds the view's value and reports any error from doing so.
reload() error
}
// NewGroup returns an empty Group that will consult fetcher on every Load.
// Views are attached afterwards with Add.
func NewGroup(fetcher Fetcher) *Group {
	g := new(Group)
	g.fetcher = fetcher
	return g
}
// Load asks the fetcher whether upstream changed and, when a reload is due,
// rebuilds every view in registration order and atomically installs each new
// value.
//
// A reload is due on the first call, whenever the fetcher reports a change,
// and whenever the previous reload did not run to completion (a view loader
// failed or ctx was cancelled part-way through).
//
// Load returns the fetcher's error, ctx.Err() if ctx is done before a view is
// reloaded, or the first view loader error. On a mid-way failure the views
// already reloaded keep their new values and the remaining views keep their
// old ones; the next Load reloads all of them again.
func (g *Group) Load(ctx context.Context) error {
	updated, err := g.fetcher.Fetch()
	if err != nil {
		return err
	}
	if g.loaded.Load() && !updated {
		return nil
	}

	// Clear the flag until every view has been swapped. The fetcher only
	// reports changes "since the last call", so it will answer updated=false
	// next time; without this a failed reload would be silently skipped and
	// the views would stay stale until the next upstream change.
	g.loaded.Store(false)
	for _, v := range g.views {
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := v.reload(); err != nil {
			return err
		}
	}
	g.loaded.Store(true)
	return nil
}
// Tick blocks, running Load each time interval elapses, and returns once ctx
// is done. The first Load happens after one full interval. A failed Load is
// reported on stderr and the loop carries on. interval must be positive,
// because time.NewTicker panics otherwise.
func (g *Group) Tick(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	done := ctx.Done()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
		}

		err := g.Load(ctx)
		if err == nil {
			continue
		}
		fmt.Fprintf(os.Stderr, "dataset: load error: %v\n", err)
	}
}
// View is a read-only handle to one dataset inside a Group. Consumers call
// Value; reloads are driven through the unexported reload method.
type View[T any] struct {
// loader builds a fresh *T; reload calls it and stores the result on
// success.
loader func() (*T, error)
// ptr holds the most recently stored value and is nil until the first
// successful reload.
ptr atomic.Pointer[T]
}
// Value returns the snapshot most recently installed by a reload, via a
// single atomic load. It is nil until the Group has been loaded once.
func (v *View[T]) Value() *T {
	current := v.ptr.Load()
	return current
}
// reload runs the view's loader and, only when it succeeds, atomically
// installs the result as the current value. On failure the previously
// installed value is left in place and the loader's error is returned.
func (v *View[T]) reload() error {
	next, err := v.loader()
	if err == nil {
		v.ptr.Store(next)
	}
	return err
}
// Add creates a view whose values are produced by loader, appends it to g's
// list of views so that Load will reload it, and returns it. Call after
// NewGroup and before the first Load.
func Add[T any](g *Group, loader func() (*T, error)) *View[T] {
	view := new(View[T])
	view.loader = loader
	g.views = append(g.views, view)
	return view
}