mirror of
https://github.com/therootcompany/golib.git
synced 2026-04-24 20:58:00 +00:00
feat(dataset): close-on-swap, AddInitial, LoadedAt, Loaded
- reload() Closes replaced value if it implements io.Closer (geoip readers leak mmap/file handles on hot-swap without this) - AddInitial pre-populates a view so Value() is non-nil before first Load — enables async-load startup paths - View.LoadedAt() and Set.Loaded() expose load state for health checks
This commit is contained in:
parent
46b31b75c2
commit
bcda75d673
@ -16,6 +16,7 @@ package dataset
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
@ -103,6 +104,11 @@ func NewSet(fetchers ...Fetcher) *Set {
|
|||||||
return &Set{fetchers: fetchers}
|
return &Set{fetchers: fetchers}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Loaded reports whether Load has completed successfully at least once.
|
||||||
|
func (s *Set) Loaded() bool {
|
||||||
|
return s.loaded.Load()
|
||||||
|
}
|
||||||
|
|
||||||
// Load fetches upstream and, on the first call or whenever any fetcher
|
// Load fetches upstream and, on the first call or whenever any fetcher
|
||||||
// reports a change, reloads every view and atomically installs the new values.
|
// reports a change, reloads every view and atomically installs the new values.
|
||||||
func (s *Set) Load(ctx context.Context) error {
|
func (s *Set) Load(ctx context.Context) error {
|
||||||
@ -153,26 +159,58 @@ func (s *Set) Tick(ctx context.Context, interval time.Duration, onError func(err
|
|||||||
type View[T any] struct {
|
type View[T any] struct {
|
||||||
loader func() (*T, error)
|
loader func() (*T, error)
|
||||||
ptr atomic.Pointer[T]
|
ptr atomic.Pointer[T]
|
||||||
|
loadedAt atomic.Pointer[time.Time] // nil until first successful reload
|
||||||
}
|
}
|
||||||
|
|
||||||
// Value returns the current snapshot. Nil before the Set is first loaded.
|
// Value returns the current snapshot. Nil before the Set is first loaded
|
||||||
|
// unless the view was registered via AddInitial.
|
||||||
func (v *View[T]) Value() *T {
|
func (v *View[T]) Value() *T {
|
||||||
return v.ptr.Load()
|
return v.ptr.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LoadedAt returns the time of the most recent successful reload, or the
|
||||||
|
// zero time if the view has never loaded.
|
||||||
|
func (v *View[T]) LoadedAt() time.Time {
|
||||||
|
if t := v.loadedAt.Load(); t != nil {
|
||||||
|
return *t
|
||||||
|
}
|
||||||
|
return time.Time{}
|
||||||
|
}
|
||||||
|
|
||||||
func (v *View[T]) reload() error {
|
func (v *View[T]) reload() error {
|
||||||
t, err := v.loader()
|
t, err := v.loader()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
v.ptr.Store(t)
|
prev := v.ptr.Swap(t)
|
||||||
|
// Close the replaced value if it holds OS resources (open file handles,
|
||||||
|
// network connections). Geoip readers and similar wrappers implement
|
||||||
|
// io.Closer; cohort and other pure-in-memory values don't — the type
|
||||||
|
// assertion filters to only the ones that need it.
|
||||||
|
if closer, ok := any(prev).(io.Closer); ok && closer != nil {
|
||||||
|
_ = closer.Close()
|
||||||
|
}
|
||||||
|
now := time.Now()
|
||||||
|
v.loadedAt.Store(&now)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add registers a new view in s and returns it. Call after NewSet and before
|
// Add registers a new view in s and returns it. Call after NewSet and before
|
||||||
// the first Load.
|
// the first Load. View.Value() returns nil until Set.Load succeeds.
|
||||||
func Add[T any](s *Set, loader func() (*T, error)) *View[T] {
|
func Add[T any](s *Set, loader func() (*T, error)) *View[T] {
|
||||||
v := &View[T]{loader: loader}
|
v := &View[T]{loader: loader}
|
||||||
s.views = append(s.views, v)
|
s.views = append(s.views, v)
|
||||||
return v
|
return v
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddInitial is like Add but pre-populates the view with initial, so
|
||||||
|
// View.Value() returns a usable (possibly empty) value before the first
|
||||||
|
// Load completes. Use when the initial state is benign (e.g. an empty
|
||||||
|
// cohort matches nothing) and you want to start serving before the
|
||||||
|
// first load finishes.
|
||||||
|
func AddInitial[T any](s *Set, initial *T, loader func() (*T, error)) *View[T] {
|
||||||
|
v := &View[T]{loader: loader}
|
||||||
|
v.ptr.Store(initial)
|
||||||
|
s.views = append(s.views, v)
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user