mirror of
https://github.com/therootcompany/golib.git
synced 2026-04-24 20:58:00 +00:00
refactor: delete net/dataset package
check-ip and geoip no longer use it; formmailer now takes *atomic.Pointer[ipcohort.Cohort] for Blacklist so callers own the refresh + swap lifecycle directly. gitshallow doc comments that referenced dataset.Syncer are trimmed. The concepts the package tried to share (atomic-swap, group sync, ticker-driven refresh) may come back under sync/dataset once we have more than one in-tree caller that wants them.
This commit is contained in:
parent
5985ea5e2d
commit
01a9185c03
2
go.work
2
go.work
@ -3,7 +3,6 @@ go 1.26.1
|
||||
use (
|
||||
.
|
||||
./cmd/check-ip
|
||||
./net/dataset
|
||||
./net/formmailer
|
||||
./net/geoip
|
||||
./net/gitshallow
|
||||
@ -12,7 +11,6 @@ use (
|
||||
)
|
||||
|
||||
replace (
|
||||
github.com/therootcompany/golib/net/dataset v0.0.0 => ./net/dataset
|
||||
github.com/therootcompany/golib/net/formmailer v0.0.0 => ./net/formmailer
|
||||
github.com/therootcompany/golib/net/geoip v0.0.0 => ./net/geoip
|
||||
github.com/therootcompany/golib/net/gitshallow v0.0.0 => ./net/gitshallow
|
||||
|
||||
@ -1,213 +0,0 @@
|
||||
// Package dataset couples a Syncer (fetch) with a Loader (parse) and an
|
||||
// atomic.Pointer (hot-swap), providing a generic periodically-updated
|
||||
// in-memory dataset with lock-free reads.
|
||||
//
|
||||
// Standalone dataset:
|
||||
//
|
||||
// ds := dataset.New(cacher, func() (*MyType, error) {
|
||||
// return mytype.LoadFile(path)
|
||||
// })
|
||||
// if err := ds.Init(); err != nil { ... }
|
||||
// go ds.Run(ctx, 47*time.Minute)
|
||||
// val := ds.Load() // *MyType, lock-free
|
||||
//
|
||||
// Group (one syncer, multiple datasets):
|
||||
//
|
||||
// g := dataset.NewGroup(repo)
|
||||
// inbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... })
|
||||
// outbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... })
|
||||
// if err := g.Init(); err != nil { ... }
|
||||
// go g.Run(ctx, 47*time.Minute)
|
||||
// val := inbound.Load() // lock-free; Init/Run belong to the Group
|
||||
package dataset
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Syncer reports whether a remote resource has changed.
|
||||
type Syncer interface {
|
||||
Fetch() (updated bool, err error)
|
||||
}
|
||||
|
||||
// MultiSyncer fans out Fetch to multiple Syncers, returning updated=true if
|
||||
// any reports a change. Stops and returns the first error.
|
||||
type MultiSyncer []Syncer
|
||||
|
||||
func (ms MultiSyncer) Fetch() (bool, error) {
|
||||
var anyUpdated bool
|
||||
for _, s := range ms {
|
||||
updated, err := s.Fetch()
|
||||
if err != nil {
|
||||
return anyUpdated, err
|
||||
}
|
||||
anyUpdated = anyUpdated || updated
|
||||
}
|
||||
return anyUpdated, nil
|
||||
}
|
||||
|
||||
// NopSyncer always reports no update. Use for local-file datasets.
|
||||
type NopSyncer struct{}
|
||||
|
||||
func (NopSyncer) Fetch() (bool, error) { return false, nil }
|
||||
|
||||
// Dataset couples a Syncer, a load function, and an atomic.Pointer[T].
|
||||
// Load is safe for concurrent use without locks.
|
||||
//
|
||||
// When a Dataset is added to a Group via Add, Init and Run belong to the
|
||||
// Group; call Load on the Dataset to read the current value.
|
||||
type Dataset[T any] struct {
|
||||
// Name is used in error messages.
|
||||
Name string
|
||||
// Close is called with the old value after each successful swap.
|
||||
Close func(*T)
|
||||
|
||||
syncer Syncer
|
||||
load func() (*T, error)
|
||||
ptr atomic.Pointer[T]
|
||||
}
|
||||
|
||||
// New creates a Dataset. The syncer reports changes; load produces the value.
|
||||
func New[T any](syncer Syncer, load func() (*T, error)) *Dataset[T] {
|
||||
return &Dataset[T]{syncer: syncer, load: load}
|
||||
}
|
||||
|
||||
// Load returns the current value. Returns nil before Init is called.
|
||||
func (d *Dataset[T]) Load() *T {
|
||||
return d.ptr.Load()
|
||||
}
|
||||
|
||||
func (d *Dataset[T]) swap() error {
|
||||
val, err := d.load()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if old := d.ptr.Swap(val); old != nil && d.Close != nil {
|
||||
d.Close(old)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Sync calls the syncer and, if updated, reloads and atomically installs the
|
||||
// new value. Returns whether the source changed.
|
||||
func (d *Dataset[T]) Sync() (bool, error) {
|
||||
updated, err := d.syncer.Fetch()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !updated {
|
||||
return false, nil
|
||||
}
|
||||
return true, d.swap()
|
||||
}
|
||||
|
||||
// Init syncs and always loads, ensuring the dataset is populated from an
|
||||
// existing local file even if nothing changed upstream.
|
||||
func (d *Dataset[T]) Init() error {
|
||||
if _, err := d.syncer.Fetch(); err != nil {
|
||||
return err
|
||||
}
|
||||
return d.swap()
|
||||
}
|
||||
|
||||
// Run calls Sync on every interval. Errors are written to stderr and do not
|
||||
// stop the loop.
|
||||
func (d *Dataset[T]) Run(ctx context.Context, interval time.Duration) {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if _, err := d.Sync(); err != nil {
|
||||
name := d.Name
|
||||
if name == "" {
|
||||
name = "dataset"
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, "%s: sync error: %v\n", name, err)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// -- Group: one Syncer driving multiple datasets ---------------------------
|
||||
|
||||
// member is the type-erased swap handle stored in a Group.
|
||||
type member interface {
|
||||
swap() error
|
||||
}
|
||||
|
||||
// Group ties one Syncer to multiple datasets so a single Fetch drives all
|
||||
// swaps — no redundant network calls when datasets share a source.
|
||||
//
|
||||
// Datasets added via Add are owned by the Group; call Init and Run on the
|
||||
// Group, not on individual datasets.
|
||||
type Group struct {
|
||||
syncer Syncer
|
||||
members []member
|
||||
}
|
||||
|
||||
// NewGroup creates a Group backed by syncer.
|
||||
func NewGroup(syncer Syncer) *Group {
|
||||
return &Group{syncer: syncer}
|
||||
}
|
||||
|
||||
func (g *Group) swapAll() error {
|
||||
for _, m := range g.members {
|
||||
if err := m.swap(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Sync calls the syncer and, if updated, reloads all member datasets.
|
||||
// Returns whether the source changed.
|
||||
func (g *Group) Sync() (bool, error) {
|
||||
updated, err := g.syncer.Fetch()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !updated {
|
||||
return false, nil
|
||||
}
|
||||
return true, g.swapAll()
|
||||
}
|
||||
|
||||
// Init syncs and always loads all datasets.
|
||||
func (g *Group) Init() error {
|
||||
if _, err := g.syncer.Fetch(); err != nil {
|
||||
return err
|
||||
}
|
||||
return g.swapAll()
|
||||
}
|
||||
|
||||
// Run calls Sync on every interval; reloads all datasets only when the source
|
||||
// reports a change.
|
||||
func (g *Group) Run(ctx context.Context, interval time.Duration) {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if _, err := g.Sync(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "dataset group: sync error: %v\n", err)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add registers a new dataset in g and returns it for reading via Load.
|
||||
// Init and Run are driven by the Group; do not call them on the returned Dataset.
|
||||
func Add[T any](g *Group, load func() (*T, error)) *Dataset[T] {
|
||||
d := &Dataset[T]{load: load}
|
||||
g.members = append(g.members, d)
|
||||
return d
|
||||
}
|
||||
@ -1,283 +0,0 @@
|
||||
package dataset_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/therootcompany/golib/net/dataset"
|
||||
)
|
||||
|
||||
// countSyncer counts Fetch calls and optionally reports updated.
|
||||
type countSyncer struct {
|
||||
calls atomic.Int32
|
||||
updated bool
|
||||
err error
|
||||
}
|
||||
|
||||
func (s *countSyncer) Fetch() (bool, error) {
|
||||
s.calls.Add(1)
|
||||
return s.updated, s.err
|
||||
}
|
||||
|
||||
func TestDataset_Init(t *testing.T) {
|
||||
syn := &countSyncer{updated: false}
|
||||
calls := 0
|
||||
ds := dataset.New(syn, func() (*string, error) {
|
||||
calls++
|
||||
v := "hello"
|
||||
return &v, nil
|
||||
})
|
||||
|
||||
if err := ds.Init(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if got := ds.Load(); got == nil || *got != "hello" {
|
||||
t.Fatalf("Load() = %v, want \"hello\"", got)
|
||||
}
|
||||
if calls != 1 {
|
||||
t.Errorf("loader called %d times, want 1", calls)
|
||||
}
|
||||
if syn.calls.Load() != 1 {
|
||||
t.Errorf("Fetch called %d times, want 1", syn.calls.Load())
|
||||
}
|
||||
}
|
||||
|
||||
func TestDataset_LoadBeforeInit(t *testing.T) {
|
||||
syn := dataset.NopSyncer{}
|
||||
ds := dataset.New(syn, func() (*string, error) {
|
||||
v := "x"
|
||||
return &v, nil
|
||||
})
|
||||
if ds.Load() != nil {
|
||||
t.Error("Load() before Init should return nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDataset_SyncNoUpdate(t *testing.T) {
|
||||
syn := &countSyncer{updated: false}
|
||||
calls := 0
|
||||
ds := dataset.New(syn, func() (*string, error) {
|
||||
calls++
|
||||
v := "hello"
|
||||
return &v, nil
|
||||
})
|
||||
if err := ds.Init(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
calls = 0
|
||||
|
||||
updated, err := ds.Sync()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if updated {
|
||||
t.Error("Sync() reported updated=true but syncer returned false")
|
||||
}
|
||||
if calls != 0 {
|
||||
t.Errorf("loader called %d times on no-update Sync, want 0", calls)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDataset_SyncWithUpdate(t *testing.T) {
|
||||
syn := &countSyncer{updated: true}
|
||||
n := 0
|
||||
ds := dataset.New(syn, func() (*string, error) {
|
||||
n++
|
||||
v := "v" + string(rune('0'+n))
|
||||
return &v, nil
|
||||
})
|
||||
if err := ds.Init(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
updated, err := ds.Sync()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !updated {
|
||||
t.Error("Sync() reported updated=false but syncer returned true")
|
||||
}
|
||||
if got := ds.Load(); got == nil || *got != "v2" {
|
||||
t.Errorf("Load() after Sync = %v, want \"v2\"", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDataset_InitError(t *testing.T) {
|
||||
syn := &countSyncer{err: errors.New("fetch failed")}
|
||||
ds := dataset.New(syn, func() (*string, error) {
|
||||
v := "x"
|
||||
return &v, nil
|
||||
})
|
||||
if err := ds.Init(); err == nil {
|
||||
t.Error("expected error from Init when syncer fails")
|
||||
}
|
||||
if ds.Load() != nil {
|
||||
t.Error("Load() should be nil after failed Init")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDataset_LoaderError(t *testing.T) {
|
||||
syn := dataset.NopSyncer{}
|
||||
ds := dataset.New(syn, func() (*string, error) {
|
||||
return nil, errors.New("load failed")
|
||||
})
|
||||
if err := ds.Init(); err == nil {
|
||||
t.Error("expected error from Init when loader fails")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDataset_Close(t *testing.T) {
|
||||
syn := &countSyncer{updated: true}
|
||||
var closed []string
|
||||
n := 0
|
||||
ds := dataset.New(syn, func() (*string, error) {
|
||||
n++
|
||||
v := "v" + string(rune('0'+n))
|
||||
return &v, nil
|
||||
})
|
||||
ds.Close = func(s *string) { closed = append(closed, *s) }
|
||||
|
||||
if err := ds.Init(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// First swap: old is nil, Close should not be called.
|
||||
if len(closed) != 0 {
|
||||
t.Errorf("Close called %d times on Init, want 0", len(closed))
|
||||
}
|
||||
|
||||
if _, err := ds.Sync(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(closed) != 1 || closed[0] != "v1" {
|
||||
t.Errorf("Close got %v, want [\"v1\"]", closed)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDataset_Run(t *testing.T) {
|
||||
syn := &countSyncer{updated: true}
|
||||
n := atomic.Int32{}
|
||||
ds := dataset.New(syn, func() (*int32, error) {
|
||||
v := n.Add(1)
|
||||
return &v, nil
|
||||
})
|
||||
if err := ds.Init(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
ds.Run(ctx, 10*time.Millisecond)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
time.Sleep(60 * time.Millisecond)
|
||||
cancel()
|
||||
<-done
|
||||
|
||||
if n.Load() < 2 {
|
||||
t.Errorf("Run did not tick: loader called %d times", n.Load())
|
||||
}
|
||||
}
|
||||
|
||||
// --- Group tests ---
|
||||
|
||||
func TestGroup_Init(t *testing.T) {
|
||||
syn := &countSyncer{}
|
||||
g := dataset.NewGroup(syn)
|
||||
|
||||
callsA, callsB := 0, 0
|
||||
dsA := dataset.Add(g, func() (*string, error) {
|
||||
callsA++
|
||||
v := "a"
|
||||
return &v, nil
|
||||
})
|
||||
dsB := dataset.Add(g, func() (*int, error) {
|
||||
callsB++
|
||||
v := 42
|
||||
return &v, nil
|
||||
})
|
||||
|
||||
if err := g.Init(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if syn.calls.Load() != 1 {
|
||||
t.Errorf("Fetch called %d times, want 1", syn.calls.Load())
|
||||
}
|
||||
if callsA != 1 || callsB != 1 {
|
||||
t.Errorf("loaders called (%d,%d), want (1,1)", callsA, callsB)
|
||||
}
|
||||
if got := dsA.Load(); got == nil || *got != "a" {
|
||||
t.Errorf("dsA.Load() = %v", got)
|
||||
}
|
||||
if got := dsB.Load(); got == nil || *got != 42 {
|
||||
t.Errorf("dsB.Load() = %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGroup_SyncNoUpdate(t *testing.T) {
|
||||
syn := &countSyncer{updated: false}
|
||||
g := dataset.NewGroup(syn)
|
||||
calls := 0
|
||||
dataset.Add(g, func() (*string, error) {
|
||||
calls++
|
||||
v := "x"
|
||||
return &v, nil
|
||||
})
|
||||
if err := g.Init(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
calls = 0
|
||||
|
||||
updated, err := g.Sync()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if updated || calls != 0 {
|
||||
t.Errorf("Sync() updated=%v calls=%d, want false/0", updated, calls)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGroup_SyncWithUpdate(t *testing.T) {
|
||||
syn := &countSyncer{updated: true}
|
||||
g := dataset.NewGroup(syn)
|
||||
n := 0
|
||||
ds := dataset.Add(g, func() (*int, error) {
|
||||
n++
|
||||
return &n, nil
|
||||
})
|
||||
if err := g.Init(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := g.Sync(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if got := ds.Load(); got == nil || *got != 2 {
|
||||
t.Errorf("ds.Load() = %v, want 2", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGroup_FetchError(t *testing.T) {
|
||||
syn := &countSyncer{err: errors.New("network down")}
|
||||
g := dataset.NewGroup(syn)
|
||||
dataset.Add(g, func() (*string, error) {
|
||||
v := "x"
|
||||
return &v, nil
|
||||
})
|
||||
if err := g.Init(); err == nil {
|
||||
t.Error("expected error from Group.Init when syncer fails")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGroup_LoaderError(t *testing.T) {
|
||||
syn := dataset.NopSyncer{}
|
||||
g := dataset.NewGroup(syn)
|
||||
dataset.Add(g, func() (*string, error) {
|
||||
return nil, errors.New("parse error")
|
||||
})
|
||||
if err := g.Init(); err == nil {
|
||||
t.Error("expected error from Group.Init when loader fails")
|
||||
}
|
||||
}
|
||||
@ -1,3 +0,0 @@
|
||||
module github.com/therootcompany/golib/net/dataset
|
||||
|
||||
go 1.26.0
|
||||
@ -3,9 +3,8 @@
|
||||
//
|
||||
// Typical setup:
|
||||
//
|
||||
// blGroup, _, inboundDS, _ := src.Datasets()
|
||||
// blGroup.Init()
|
||||
// go blGroup.Run(ctx, 47*time.Minute)
|
||||
// var blacklist atomic.Pointer[ipcohort.Cohort]
|
||||
// // ... caller loads blacklist and hot-swaps on a timer ...
|
||||
//
|
||||
// fm := &formmailer.FormMailer{
|
||||
// SMTPHost: "smtp.example.com:587",
|
||||
@ -16,7 +15,7 @@
|
||||
// Subject: "Contact from {.Email}",
|
||||
// SuccessBody: successHTML,
|
||||
// ErrorBody: errorHTML,
|
||||
// Blacklist: inboundDS,
|
||||
// Blacklist: &blacklist,
|
||||
// AllowedCountries: []string{"US", "CA", "MX"},
|
||||
// }
|
||||
// http.Handle("POST /contact", fm)
|
||||
@ -35,12 +34,12 @@ import (
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/phuslu/iploc"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/therootcompany/golib/net/dataset"
|
||||
"github.com/therootcompany/golib/net/ipcohort"
|
||||
)
|
||||
|
||||
@ -102,7 +101,7 @@ type FormMailer struct {
|
||||
ContentType string // inferred from SuccessBody if empty
|
||||
|
||||
// Blacklist — if set, matching IPs are rejected before any other processing.
|
||||
Blacklist *dataset.Dataset[ipcohort.Cohort]
|
||||
Blacklist *atomic.Pointer[ipcohort.Cohort]
|
||||
|
||||
// AllowedCountries — if non-nil, only requests from listed ISO codes are
|
||||
// accepted. Unknown country ("") is always allowed.
|
||||
|
||||
@ -4,7 +4,6 @@ go 1.26.0
|
||||
|
||||
require (
|
||||
github.com/phuslu/iploc v1.0.20260415
|
||||
github.com/therootcompany/golib/net/dataset v0.0.0
|
||||
github.com/therootcompany/golib/net/ipcohort v0.0.0
|
||||
golang.org/x/time v0.15.0
|
||||
)
|
||||
|
||||
@ -168,7 +168,7 @@ func (r *Repo) Sync() (bool, error) {
|
||||
return r.syncGit()
|
||||
}
|
||||
|
||||
// Fetch satisfies dataset.Syncer.
|
||||
// Fetch syncs the repo and reports whether HEAD changed.
|
||||
func (r *Repo) Fetch() (bool, error) {
|
||||
return r.syncGit()
|
||||
}
|
||||
@ -186,8 +186,8 @@ func (r *Repo) File(relPath string) *File {
|
||||
}
|
||||
|
||||
// File is a handle to a single file inside a Repo.
|
||||
// It implements dataset.Syncer: Fetch syncs the repo (deduped across all File
|
||||
// handles sharing the same Repo) then reports whether this file changed.
|
||||
// Fetch syncs the repo (deduped across all File handles sharing the same
|
||||
// Repo) and reports whether this file changed.
|
||||
type File struct {
|
||||
repo *Repo
|
||||
rel string
|
||||
@ -206,7 +206,7 @@ func (f *File) Open() (*os.File, error) {
|
||||
}
|
||||
|
||||
// Fetch syncs the repo and reports whether this file changed since last call.
|
||||
// Implements dataset.Syncer; safe to call concurrently.
|
||||
// Safe to call concurrently.
|
||||
func (f *File) Fetch() (bool, error) {
|
||||
if _, err := f.repo.syncGit(); err != nil {
|
||||
return false, err
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user