mirror of
https://github.com/therootcompany/golib.git
synced 2026-04-24 12:48:00 +00:00
Libraries shouldn't decide where errors go. Tick now passes Load errors to onError (nil to ignore); callers pick log/count/page. check-ip supplies its own stderr writer.
127 lines
3.6 KiB
Go
127 lines
3.6 KiB
Go
// Package dataset manages values that are periodically re-fetched from an
|
|
// upstream source and hot-swapped behind atomic pointers. Consumers read via
|
|
// View.Value (lock-free); a single Load drives any number of views off one
|
|
// Fetcher, so shared sources (one git pull, one zip download) don't get
|
|
// re-fetched per view.
|
|
//
|
|
// Typical lifecycle:
|
|
//
|
|
// g := dataset.NewGroup(repo) // *gitshallow.Repo satisfies Fetcher
|
|
// inbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... })
|
|
// outbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... })
|
|
// if err := g.Load(ctx); err != nil { ... } // initial populate
|
|
// go g.Tick(ctx, 47*time.Minute) // background refresh
|
|
// current := inbound.Value() // lock-free read
|
|
package dataset
|
|
|
|
import (
|
|
"context"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// Fetcher reports whether an upstream source has changed since the last call.
|
|
// Implementations should dedup rapid-fire calls internally (e.g. gitshallow
|
|
// skips redundant pulls within a short window; httpcache uses ETag).
|
|
type Fetcher interface {
|
|
Fetch() (updated bool, err error)
|
|
}
|
|
|
|
// FetcherFunc adapts a plain function to Fetcher.
|
|
type FetcherFunc func() (bool, error)
|
|
|
|
func (f FetcherFunc) Fetch() (bool, error) { return f() }
|
|
|
|
// NopFetcher always reports no update. Use for groups backed by local files
|
|
// that don't need a refresh cycle.
|
|
type NopFetcher struct{}
|
|
|
|
func (NopFetcher) Fetch() (bool, error) { return false, nil }
|
|
|
|
// Group ties one Fetcher to one or more views. A Load call fetches once and,
|
|
// on the first call or when the source reports a change, reloads every view
|
|
// and atomically swaps its current value.
|
|
type Group struct {
|
|
fetcher Fetcher
|
|
views []reloader
|
|
loaded atomic.Bool
|
|
}
|
|
|
|
// reloader is a type-erased handle to a View's reload function.
|
|
type reloader interface {
|
|
reload() error
|
|
}
|
|
|
|
// NewGroup creates a Group backed by fetcher.
|
|
func NewGroup(fetcher Fetcher) *Group {
|
|
return &Group{fetcher: fetcher}
|
|
}
|
|
|
|
// Load fetches upstream and, on the first call or whenever the fetcher reports
|
|
// a change, reloads every view and atomically installs the new values.
|
|
func (g *Group) Load(ctx context.Context) error {
|
|
updated, err := g.fetcher.Fetch()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if g.loaded.Load() && !updated {
|
|
return nil
|
|
}
|
|
for _, v := range g.views {
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
if err := v.reload(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
g.loaded.Store(true)
|
|
return nil
|
|
}
|
|
|
|
// Tick calls Load every interval until ctx is done. Load errors are passed to
|
|
// onError (if non-nil) and do not stop the loop; callers choose whether to log,
|
|
// count, page, or ignore. Run in a goroutine: `go g.Tick(ctx, d, onError)`.
|
|
func (g *Group) Tick(ctx context.Context, interval time.Duration, onError func(error)) {
|
|
t := time.NewTicker(interval)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-t.C:
|
|
if err := g.Load(ctx); err != nil && onError != nil {
|
|
onError(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// View is a read-only handle to one dataset inside a Group.
|
|
type View[T any] struct {
|
|
loader func() (*T, error)
|
|
ptr atomic.Pointer[T]
|
|
}
|
|
|
|
// Value returns the current snapshot. Nil before the Group is first loaded.
|
|
func (v *View[T]) Value() *T {
|
|
return v.ptr.Load()
|
|
}
|
|
|
|
func (v *View[T]) reload() error {
|
|
t, err := v.loader()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
v.ptr.Store(t)
|
|
return nil
|
|
}
|
|
|
|
// Add registers a new view in g and returns it. Call after NewGroup and
|
|
// before the first Load.
|
|
func Add[T any](g *Group, loader func() (*T, error)) *View[T] {
|
|
v := &View[T]{loader: loader}
|
|
g.views = append(g.views, v)
|
|
return v
|
|
}
|