From bcda75d673b87fcf8ca172246e8a7805f705aa13 Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Mon, 20 Apr 2026 19:26:15 -0600 Subject: [PATCH] feat(dataset): close-on-swap, AddInitial, LoadedAt, Loaded MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - reload() Closes replaced value if it implements io.Closer (geoip readers leak mmap/file handles on hot-swap without this) - AddInitial pre-populates a view so Value() is non-nil before first Load — enables async-load startup paths - View.LoadedAt() and Set.Loaded() expose load state for health checks --- sync/dataset/dataset.go | 48 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/sync/dataset/dataset.go b/sync/dataset/dataset.go index 3fb9bdc..2a83e8c 100644 --- a/sync/dataset/dataset.go +++ b/sync/dataset/dataset.go @@ -16,6 +16,7 @@ package dataset import ( "context" + "io" "os" "sync" "sync/atomic" @@ -103,6 +104,11 @@ func NewSet(fetchers ...Fetcher) *Set { return &Set{fetchers: fetchers} } +// Loaded reports whether Load has completed successfully at least once. +func (s *Set) Loaded() bool { + return s.loaded.Load() +} + // Load fetches upstream and, on the first call or whenever any fetcher // reports a change, reloads every view and atomically installs the new values. func (s *Set) Load(ctx context.Context) error { @@ -151,28 +157,60 @@ func (s *Set) Tick(ctx context.Context, interval time.Duration, onError func(err // View is a read-only handle to one dataset inside a Set. type View[T any] struct { - loader func() (*T, error) - ptr atomic.Pointer[T] + loader func() (*T, error) + ptr atomic.Pointer[T] + loadedAt atomic.Pointer[time.Time] // nil until first successful reload } -// Value returns the current snapshot. Nil before the Set is first loaded. +// Value returns the current snapshot. Nil before the Set is first loaded +// unless the view was registered via AddInitial. func (v *View[T]) Value() *T { return v.ptr.Load() } +// LoadedAt returns the time of the most recent successful reload, or the +// zero time if the view has never loaded. +func (v *View[T]) LoadedAt() time.Time { + if t := v.loadedAt.Load(); t != nil { + return *t + } + return time.Time{} +} + func (v *View[T]) reload() error { t, err := v.loader() if err != nil { return err } - v.ptr.Store(t) + prev := v.ptr.Swap(t) + // Close the replaced value if it holds OS resources (open file handles, + // network connections). Geoip readers and similar wrappers implement + // io.Closer; cohort and other pure-in-memory values don't — the type + // assertion filters to only the ones that need it. + if closer, ok := any(prev).(io.Closer); ok && closer != nil { + _ = closer.Close() + } + now := time.Now() + v.loadedAt.Store(&now) return nil } // Add registers a new view in s and returns it. Call after NewSet and before -// the first Load. +// the first Load. View.Value() returns nil until Set.Load succeeds. func Add[T any](s *Set, loader func() (*T, error)) *View[T] { v := &View[T]{loader: loader} s.views = append(s.views, v) return v } + +// AddInitial is like Add but pre-populates the view with initial, so +// View.Value() returns a usable (possibly empty) value before the first +// Load completes. Use when the initial state is benign (e.g. an empty +// cohort matches nothing) and you want to start serving before the +// first load finishes. +func AddInitial[T any](s *Set, initial *T, loader func() (*T, error)) *View[T] { + v := &View[T]{loader: loader} + v.ptr.Store(initial) + s.views = append(s.views, v) + return v +}