mirror of
https://github.com/therootcompany/golib.git
synced 2026-04-24 20:58:00 +00:00
Remove View[T] — Add now returns *Dataset[T] directly. Callers use Load() on the returned Dataset; Init/Run belong to the owning Group. main.go simplified: declare syncer + file paths per case, then one g.Init() and one g.Run(). No manual loops over individual datasets. Add gitshallow.Repo.FilePath helper.
214 lines
5.4 KiB
Go
214 lines
5.4 KiB
Go
// Package dataset couples a Syncer (fetch) with a Loader (parse) and an
|
|
// atomic.Pointer (hot-swap), providing a generic periodically-updated
|
|
// in-memory dataset with lock-free reads.
|
|
//
|
|
// Standalone dataset:
|
|
//
|
|
// ds := dataset.New(cacher, func() (*MyType, error) {
|
|
// return mytype.LoadFile(path)
|
|
// })
|
|
// if err := ds.Init(); err != nil { ... }
|
|
// go ds.Run(ctx, 47*time.Minute)
|
|
// val := ds.Load() // *MyType, lock-free
|
|
//
|
|
// Group (one syncer, multiple datasets):
|
|
//
|
|
// g := dataset.NewGroup(repo)
|
|
// inbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... })
|
|
// outbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... })
|
|
// if err := g.Init(); err != nil { ... }
|
|
// go g.Run(ctx, 47*time.Minute)
|
|
// val := inbound.Load() // lock-free; Init/Run belong to the Group
|
|
package dataset
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// Syncer reports whether a remote resource has changed.
|
|
type Syncer interface {
|
|
Fetch() (updated bool, err error)
|
|
}
|
|
|
|
// MultiSyncer fans out Fetch to multiple Syncers, returning updated=true if
|
|
// any reports a change. Stops and returns the first error.
|
|
type MultiSyncer []Syncer
|
|
|
|
func (ms MultiSyncer) Fetch() (bool, error) {
|
|
var anyUpdated bool
|
|
for _, s := range ms {
|
|
updated, err := s.Fetch()
|
|
if err != nil {
|
|
return anyUpdated, err
|
|
}
|
|
anyUpdated = anyUpdated || updated
|
|
}
|
|
return anyUpdated, nil
|
|
}
|
|
|
|
// NopSyncer always reports no update. Use for local-file datasets.
|
|
type NopSyncer struct{}
|
|
|
|
func (NopSyncer) Fetch() (bool, error) { return false, nil }
|
|
|
|
// Dataset couples a Syncer, a load function, and an atomic.Pointer[T].
|
|
// Load is safe for concurrent use without locks.
|
|
//
|
|
// When a Dataset is added to a Group via Add, Init and Run belong to the
|
|
// Group; call Load on the Dataset to read the current value.
|
|
type Dataset[T any] struct {
|
|
// Name is used in error messages.
|
|
Name string
|
|
// Close is called with the old value after each successful swap.
|
|
Close func(*T)
|
|
|
|
syncer Syncer
|
|
load func() (*T, error)
|
|
ptr atomic.Pointer[T]
|
|
}
|
|
|
|
// New creates a Dataset. The syncer reports changes; load produces the value.
|
|
func New[T any](syncer Syncer, load func() (*T, error)) *Dataset[T] {
|
|
return &Dataset[T]{syncer: syncer, load: load}
|
|
}
|
|
|
|
// Load returns the current value. Returns nil before Init is called.
|
|
func (d *Dataset[T]) Load() *T {
|
|
return d.ptr.Load()
|
|
}
|
|
|
|
func (d *Dataset[T]) swap() error {
|
|
val, err := d.load()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if old := d.ptr.Swap(val); old != nil && d.Close != nil {
|
|
d.Close(old)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Sync calls the syncer and, if updated, reloads and atomically installs the
|
|
// new value. Returns whether the source changed.
|
|
func (d *Dataset[T]) Sync() (bool, error) {
|
|
updated, err := d.syncer.Fetch()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if !updated {
|
|
return false, nil
|
|
}
|
|
return true, d.swap()
|
|
}
|
|
|
|
// Init syncs and always loads, ensuring the dataset is populated from an
|
|
// existing local file even if nothing changed upstream.
|
|
func (d *Dataset[T]) Init() error {
|
|
if _, err := d.syncer.Fetch(); err != nil {
|
|
return err
|
|
}
|
|
return d.swap()
|
|
}
|
|
|
|
// Run calls Sync on every interval. Errors are written to stderr and do not
|
|
// stop the loop.
|
|
func (d *Dataset[T]) Run(ctx context.Context, interval time.Duration) {
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
if _, err := d.Sync(); err != nil {
|
|
name := d.Name
|
|
if name == "" {
|
|
name = "dataset"
|
|
}
|
|
fmt.Fprintf(os.Stderr, "%s: sync error: %v\n", name, err)
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// -- Group: one Syncer driving multiple datasets ---------------------------
|
|
|
|
// member is the type-erased swap handle stored in a Group.
|
|
type member interface {
|
|
swap() error
|
|
}
|
|
|
|
// Group ties one Syncer to multiple datasets so a single Fetch drives all
|
|
// swaps — no redundant network calls when datasets share a source.
|
|
//
|
|
// Datasets added via Add are owned by the Group; call Init and Run on the
|
|
// Group, not on individual datasets.
|
|
type Group struct {
|
|
syncer Syncer
|
|
members []member
|
|
}
|
|
|
|
// NewGroup creates a Group backed by syncer.
|
|
func NewGroup(syncer Syncer) *Group {
|
|
return &Group{syncer: syncer}
|
|
}
|
|
|
|
func (g *Group) swapAll() error {
|
|
for _, m := range g.members {
|
|
if err := m.swap(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Sync calls the syncer and, if updated, reloads all member datasets.
|
|
// Returns whether the source changed.
|
|
func (g *Group) Sync() (bool, error) {
|
|
updated, err := g.syncer.Fetch()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if !updated {
|
|
return false, nil
|
|
}
|
|
return true, g.swapAll()
|
|
}
|
|
|
|
// Init syncs and always loads all datasets.
|
|
func (g *Group) Init() error {
|
|
if _, err := g.syncer.Fetch(); err != nil {
|
|
return err
|
|
}
|
|
return g.swapAll()
|
|
}
|
|
|
|
// Run calls Sync on every interval; reloads all datasets only when the source
|
|
// reports a change.
|
|
func (g *Group) Run(ctx context.Context, interval time.Duration) {
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
if _, err := g.Sync(); err != nil {
|
|
fmt.Fprintf(os.Stderr, "dataset group: sync error: %v\n", err)
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Add registers a new dataset in g and returns it for reading via Load.
|
|
// Init and Run are driven by the Group; do not call them on the returned Dataset.
|
|
func Add[T any](g *Group, load func() (*T, error)) *Dataset[T] {
|
|
d := &Dataset[T]{load: load}
|
|
g.members = append(g.members, d)
|
|
return d
|
|
}
|