feat(sync/dataset): minimal group/view/fetcher for hot-swap refresh

Distilled from the previous net/dataset experiment and the inline
closure version in check-ip. Keeps what actually earned its keep:

  - Group ties one Fetcher to N views; a single Load drives all swaps,
    so shared sources (one git pull, one zip download) don't get
    re-fetched per view.
  - View[T].Value() is a lock-free atomic read; the atomic.Pointer is
    hidden so consumers never see in-flight reloads.
  - Tick runs Load on a ticker with stderr error logging.

Dropped from the v1 design: MultiSyncer (callers fan-out inline when
needed), Close (unused outside geoip), Name (callers wrap the logger),
standalone Dataset type (Group with one view covers it), Sync vs Init
asymmetry (Load handles first-call vs update internally).

check-ip rewires to use it — file/git/http modes all build a Group
with two views, uniform shape.
This commit is contained in:
AJ ONeal 2026-04-20 13:33:05 -06:00
parent 01a9185c03
commit 11743c9a10
No known key found for this signature in database
6 changed files with 332 additions and 93 deletions

View File

@ -7,4 +7,5 @@ require (
github.com/therootcompany/golib/net/gitshallow v0.0.0
github.com/therootcompany/golib/net/httpcache v0.0.0
github.com/therootcompany/golib/net/ipcohort v0.0.0
github.com/therootcompany/golib/sync/dataset v0.0.0
)

View File

@ -7,8 +7,8 @@
// - --git URL shallow-clone a git repo of blocklists
// - (default) fetch raw blocklist files over HTTP with caching
//
// Cohorts are held in atomic.Pointers and hot-swapped on refresh so callers
// never see a partial view. A single goroutine reloads on a ticker.
// Each mode builds a sync/dataset.Group: one Fetcher shared by the inbound
// and outbound views, so a single git pull (or HTTP-304 cycle) drives both.
package main
import (
@ -19,13 +19,13 @@ import (
"os"
"path/filepath"
"strings"
"sync/atomic"
"time"
"github.com/therootcompany/golib/net/geoip"
"github.com/therootcompany/golib/net/gitshallow"
"github.com/therootcompany/golib/net/httpcache"
"github.com/therootcompany/golib/net/ipcohort"
"github.com/therootcompany/golib/sync/dataset"
)
const (
@ -102,19 +102,16 @@ func run(cfg Config, ipStr string) (blocked bool, err error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var inbound, outbound atomic.Pointer[ipcohort.Cohort]
refresh, err := buildRefresher(cfg, &inbound, &outbound)
group, inbound, outbound, err := newBlocklistGroup(cfg)
if err != nil {
return false, err
}
if err := refresh(); err != nil {
if err := group.Load(ctx); err != nil {
return false, fmt.Errorf("blacklist: %w", err)
}
fmt.Fprintf(os.Stderr, "Loaded inbound=%d outbound=%d\n",
inbound.Load().Size(), outbound.Load().Size())
go tick(ctx, refreshInterval, "blacklist", refresh)
inbound.Value().Size(), outbound.Value().Size())
go group.Tick(ctx, refreshInterval)
whitelist, err := loadWhitelist(cfg.Whitelist)
if err != nil {
@ -127,8 +124,8 @@ func run(cfg Config, ipStr string) (blocked bool, err error) {
}
defer func() { _ = geo.Close() }()
blockedIn := isBlocked(ipStr, whitelist, inbound.Load())
blockedOut := isBlocked(ipStr, whitelist, outbound.Load())
blockedIn := isBlocked(ipStr, whitelist, inbound.Value())
blockedOut := isBlocked(ipStr, whitelist, outbound.Value())
switch {
case blockedIn && blockedOut:
@ -145,71 +142,51 @@ func run(cfg Config, ipStr string) (blocked bool, err error) {
return blockedIn || blockedOut, nil
}
// buildRefresher wires the chosen source (files/git/http) to the inbound and
// outbound atomic pointers, and returns a function that performs one refresh
// cycle: fetch upstream, and if anything changed (or on the first call),
// reload both cohorts and atomically swap them in.
func buildRefresher(
cfg Config,
inbound, outbound *atomic.Pointer[ipcohort.Cohort],
) (func() error, error) {
loadAndSwap := func(inPaths, outPaths []string) error {
in, err := ipcohort.LoadFiles(inPaths...)
if err != nil {
return fmt.Errorf("inbound: %w", err)
}
out, err := ipcohort.LoadFiles(outPaths...)
if err != nil {
return fmt.Errorf("outbound: %w", err)
}
inbound.Store(in)
outbound.Store(out)
return nil
// newBlocklistGroup wires a dataset.Group to the configured source (local
// files, git, or HTTP-cached raw files) and registers inbound/outbound views.
//
// The returned views hold nil until group.Load has run once; the caller is
// expected to Load before reading them (run does exactly that).
func newBlocklistGroup(cfg Config) (
	_ *dataset.Group,
	inbound, outbound *dataset.View[ipcohort.Cohort],
	err error,
) {
	// One fetcher is shared by both views, so a single sync (git pull or
	// HTTP-304 cycle) drives the inbound and outbound reloads together.
	fetcher, inPaths, outPaths, err := newFetcher(cfg)
	if err != nil {
		return nil, nil, nil, err
	}
	g := dataset.NewGroup(fetcher)
	inbound = dataset.Add(g, loadCohort(inPaths))
	outbound = dataset.Add(g, loadCohort(outPaths))
	return g, inbound, outbound, nil
}
// newFetcher picks a Fetcher based on cfg and returns the on-disk file paths
// each view should parse after a sync.
func newFetcher(cfg Config) (fetcher dataset.Fetcher, inPaths, outPaths []string, err error) {
switch {
case cfg.Inbound != "" || cfg.Outbound != "":
inPaths, outPaths := splitCSV(cfg.Inbound), splitCSV(cfg.Outbound)
loaded := false
return func() error {
if loaded {
return nil
}
loaded = true
return loadAndSwap(inPaths, outPaths)
}, nil
return dataset.NopFetcher{}, splitCSV(cfg.Inbound), splitCSV(cfg.Outbound), nil
case cfg.GitURL != "":
dir, err := cacheDir(cfg.DataDir)
if err != nil {
return nil, err
return nil, nil, nil, err
}
repo := gitshallow.New(cfg.GitURL, dir, 1, "")
inPaths := []string{
repo.FilePath("tables/inbound/single_ips.txt"),
repo.FilePath("tables/inbound/networks.txt"),
}
outPaths := []string{
repo.FilePath("tables/outbound/single_ips.txt"),
repo.FilePath("tables/outbound/networks.txt"),
}
first := true
return func() error {
updated, err := repo.Sync()
if err != nil {
return err
}
if !first && !updated {
return nil
}
first = false
return loadAndSwap(inPaths, outPaths)
}, nil
return repo,
[]string{
repo.FilePath("tables/inbound/single_ips.txt"),
repo.FilePath("tables/inbound/networks.txt"),
},
[]string{
repo.FilePath("tables/outbound/single_ips.txt"),
repo.FilePath("tables/outbound/networks.txt"),
},
nil
default:
dir, err := cacheDir(cfg.DataDir)
if err != nil {
return nil, err
return nil, nil, nil, err
}
cachers := []*httpcache.Cacher{
httpcache.New(bitwireRawBase+"/inbound/single_ips.txt", filepath.Join(dir, "inbound_single_ips.txt")),
@ -217,40 +194,26 @@ func buildRefresher(
httpcache.New(bitwireRawBase+"/outbound/single_ips.txt", filepath.Join(dir, "outbound_single_ips.txt")),
httpcache.New(bitwireRawBase+"/outbound/networks.txt", filepath.Join(dir, "outbound_networks.txt")),
}
inPaths := []string{cachers[0].Path, cachers[1].Path}
outPaths := []string{cachers[2].Path, cachers[3].Path}
first := true
return func() error {
var anyUpdated bool
for _, c := range cachers {
u, err := c.Fetch()
if err != nil {
return err
return dataset.FetcherFunc(func() (bool, error) {
var any bool
for _, c := range cachers {
u, err := c.Fetch()
if err != nil {
return false, err
}
any = any || u
}
anyUpdated = anyUpdated || u
}
if !first && !anyUpdated {
return nil
}
first = false
return loadAndSwap(inPaths, outPaths)
}, nil
return any, nil
}),
[]string{cachers[0].Path, cachers[1].Path},
[]string{cachers[2].Path, cachers[3].Path},
nil
}
}
// tick calls fn every interval until ctx is done. Errors are logged, not fatal.
func tick(ctx context.Context, interval time.Duration, name string, fn func() error) {
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
if err := fn(); err != nil {
fmt.Fprintf(os.Stderr, "%s: refresh error: %v\n", name, err)
}
}
// loadCohort returns a dataset view loader that parses the given blocklist
// files into a fresh Cohort on each reload. The paths are captured once;
// the files themselves are re-read every time the loader runs.
func loadCohort(paths []string) func() (*ipcohort.Cohort, error) {
	return func() (*ipcohort.Cohort, error) {
		return ipcohort.LoadFiles(paths...)
	}
}

View File

@ -8,6 +8,7 @@ use (
./net/gitshallow
./net/httpcache
./net/ipcohort
./sync/dataset
)
replace (
@ -16,4 +17,5 @@ replace (
github.com/therootcompany/golib/net/gitshallow v0.0.0 => ./net/gitshallow
github.com/therootcompany/golib/net/httpcache v0.0.0 => ./net/httpcache
github.com/therootcompany/golib/net/ipcohort v0.0.0 => ./net/ipcohort
github.com/therootcompany/golib/sync/dataset v0.0.0 => ./sync/dataset
)

127
sync/dataset/dataset.go Normal file
View File

@ -0,0 +1,127 @@
// Package dataset manages values that are periodically re-fetched from an
// upstream source and hot-swapped behind atomic pointers. Consumers read via
// View.Value (lock-free); a single Load drives any number of views off one
// Fetcher, so shared sources (one git pull, one zip download) don't get
// re-fetched per view.
//
// Typical lifecycle:
//
// g := dataset.NewGroup(repo) // *gitshallow.Repo satisfies Fetcher
// inbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... })
// outbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... })
// if err := g.Load(ctx); err != nil { ... } // initial populate
// go g.Tick(ctx, 47*time.Minute) // background refresh
// current := inbound.Value() // lock-free read
package dataset
import (
"context"
"fmt"
"os"
"sync/atomic"
"time"
)
// Fetcher reports whether an upstream source has changed since the last call.
// Implementations should dedup rapid-fire calls internally (e.g. gitshallow
// skips redundant pulls within a short window; httpcache uses ETag).
//
// A false result lets Group.Load skip the reload pass entirely once the
// views have been populated at least once.
type Fetcher interface {
	Fetch() (updated bool, err error)
}
// FetcherFunc adapts a plain function to Fetcher.
type FetcherFunc func() (bool, error)

// Fetch calls f and returns its result.
func (f FetcherFunc) Fetch() (bool, error) { return f() }
// NopFetcher always reports no update. Use for groups backed by local files
// that don't need a refresh cycle.
type NopFetcher struct{}

// Fetch reports no change and never fails; Group.Load still runs every
// view's loader on its first call.
func (NopFetcher) Fetch() (bool, error) { return false, nil }
// Group ties one Fetcher to one or more views. A Load call fetches once and,
// on the first call or when the source reports a change, reloads every view
// and atomically swaps its current value.
type Group struct {
	fetcher Fetcher    // shared upstream source for all views
	views   []reloader // registered via Add; type-erased so T can vary per view
	loaded  atomic.Bool // set only after every view has reloaded successfully
}
// reloader is a type-erased handle to a View's reload function, letting a
// Group hold views of differing element types in one slice.
type reloader interface {
	reload() error
}
// NewGroup creates a Group backed by fetcher. Register views with Add
// before the first Load.
func NewGroup(fetcher Fetcher) *Group {
	return &Group{fetcher: fetcher}
}
// Load fetches upstream and, on the first call or whenever the fetcher reports
// a change, reloads every view and atomically installs the new values.
func (g *Group) Load(ctx context.Context) error {
	updated, err := g.fetcher.Fetch()
	if err != nil {
		return err
	}
	// Nothing to do when values are already installed and upstream is
	// unchanged.
	if !updated && g.loaded.Load() {
		return nil
	}
	for _, view := range g.views {
		// Honor cancellation between per-view reloads, which may each
		// involve parsing large files.
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := view.reload(); err != nil {
			return err
		}
	}
	// Mark loaded only after every view swapped successfully, so a partial
	// failure causes the next Load to retry the full reload pass.
	g.loaded.Store(true)
	return nil
}
// Tick calls Load every interval until ctx is done. Errors are written to
// stderr and do not stop the loop.
func (g *Group) Tick(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		// A failed refresh is logged and retried next tick; the views keep
		// serving their last good values in the meantime.
		if err := g.Load(ctx); err != nil {
			fmt.Fprintf(os.Stderr, "dataset: load error: %v\n", err)
		}
	}
}
// View is a read-only handle to one dataset inside a Group.
type View[T any] struct {
	loader func() (*T, error) // parses the current on-disk state into a fresh *T
	ptr    atomic.Pointer[T]  // current snapshot; swapped whole on reload
}
// Value returns the current snapshot. Nil before the Group is first loaded.
// The read is a lock-free atomic load; the returned pointer stays valid
// (and unchanging) even if a reload swaps in a newer snapshot afterward.
func (v *View[T]) Value() *T {
	return v.ptr.Load()
}
// reload runs the loader and, on success, atomically installs the new value.
// On error the previous snapshot is left in place untouched.
func (v *View[T]) reload() error {
	t, err := v.loader()
	if err != nil {
		return err
	}
	v.ptr.Store(t)
	return nil
}
// Add registers a new view in g and returns it. Call after NewGroup and
// before the first Load; registration is not synchronized against a
// concurrent Load or Tick.
//
// Add is a free function rather than a method because Go methods cannot
// introduce their own type parameters.
func Add[T any](g *Group, loader func() (*T, error)) *View[T] {
	v := &View[T]{loader: loader}
	g.views = append(g.views, v)
	return v
}

View File

@ -0,0 +1,143 @@
package dataset_test
import (
"errors"
"sync/atomic"
"testing"
"github.com/therootcompany/golib/sync/dataset"
)
// countFetcher is a Fetcher stub that counts Fetch calls and returns a
// canned (updated, err) pair.
type countFetcher struct {
	calls   atomic.Int32 // number of Fetch invocations, asserted by tests
	updated bool         // what Fetch reports for "upstream changed"
	err     error        // error Fetch returns; nil for success
}

// Fetch increments the call counter and returns the configured result.
func (f *countFetcher) Fetch() (bool, error) {
	f.calls.Add(1)
	return f.updated, f.err
}
// TestGroup_LoadPopulatesAllViews verifies a single Load performs exactly one
// Fetch, runs each registered loader once, and installs both values — across
// views of different element types.
func TestGroup_LoadPopulatesAllViews(t *testing.T) {
	f := &countFetcher{}
	g := dataset.NewGroup(f)
	var aCalls, bCalls int
	a := dataset.Add(g, func() (*string, error) {
		aCalls++
		v := "a"
		return &v, nil
	})
	b := dataset.Add(g, func() (*int, error) {
		bCalls++
		v := 42
		return &v, nil
	})
	if err := g.Load(t.Context()); err != nil {
		t.Fatal(err)
	}
	if f.calls.Load() != 1 {
		t.Errorf("Fetch called %d times, want 1", f.calls.Load())
	}
	if aCalls != 1 || bCalls != 1 {
		t.Errorf("loaders called (%d,%d), want (1,1)", aCalls, bCalls)
	}
	if got := a.Value(); got == nil || *got != "a" {
		t.Errorf("a.Value() = %v", got)
	}
	if got := b.Value(); got == nil || *got != 42 {
		t.Errorf("b.Value() = %v", got)
	}
}
// TestGroup_SecondLoadSkipsUnchanged verifies that once populated, a Load
// whose fetcher reports no upstream change skips the reload pass.
func TestGroup_SecondLoadSkipsUnchanged(t *testing.T) {
	f := &countFetcher{updated: false}
	g := dataset.NewGroup(f)
	calls := 0
	dataset.Add(g, func() (*string, error) {
		calls++
		v := "x"
		return &v, nil
	})
	if err := g.Load(t.Context()); err != nil {
		t.Fatal(err)
	}
	if calls != 1 {
		t.Fatalf("initial load ran loader %d times, want 1", calls)
	}
	if err := g.Load(t.Context()); err != nil {
		t.Fatal(err)
	}
	if calls != 1 {
		t.Errorf("second load ran loader %d times, want 1 (no upstream change)", calls)
	}
}
// TestGroup_LoadOnUpdateSwaps verifies that when the fetcher reports a change,
// every Load re-runs the loader and swaps in the fresh value.
func TestGroup_LoadOnUpdateSwaps(t *testing.T) {
	f := &countFetcher{updated: true}
	g := dataset.NewGroup(f)
	n := 0
	v := dataset.Add(g, func() (*int, error) {
		n++
		return &n, nil
	})
	if err := g.Load(t.Context()); err != nil {
		t.Fatal(err)
	}
	if err := g.Load(t.Context()); err != nil {
		t.Fatal(err)
	}
	if got := v.Value(); got == nil || *got != 2 {
		t.Errorf("v.Value() = %v, want 2", got)
	}
}
// TestGroup_ValueBeforeLoad verifies a view reads as nil until the first Load.
func TestGroup_ValueBeforeLoad(t *testing.T) {
	g := dataset.NewGroup(dataset.NopFetcher{})
	v := dataset.Add(g, func() (*string, error) {
		s := "x"
		return &s, nil
	})
	if v.Value() != nil {
		t.Error("Value() before Load should be nil")
	}
}
// TestGroup_FetchError verifies a fetcher error surfaces from Load.
func TestGroup_FetchError(t *testing.T) {
	f := &countFetcher{err: errors.New("offline")}
	g := dataset.NewGroup(f)
	dataset.Add(g, func() (*string, error) {
		s := "x"
		return &s, nil
	})
	if err := g.Load(t.Context()); err == nil {
		t.Error("expected fetch error")
	}
}
// TestGroup_LoaderError verifies a view loader error surfaces from Load.
func TestGroup_LoaderError(t *testing.T) {
	g := dataset.NewGroup(dataset.NopFetcher{})
	dataset.Add(g, func() (*string, error) {
		return nil, errors.New("parse fail")
	})
	if err := g.Load(t.Context()); err == nil {
		t.Error("expected loader error")
	}
}
// TestFetcherFunc verifies the adapter invokes the wrapped function and
// passes its result through unchanged.
func TestFetcherFunc(t *testing.T) {
	var called bool
	f := dataset.FetcherFunc(func() (bool, error) {
		called = true
		return true, nil
	})
	updated, err := f.Fetch()
	if err != nil {
		t.Fatal(err)
	}
	if !called || !updated {
		t.Errorf("FetcherFunc: called=%v updated=%v", called, updated)
	}
}

3
sync/dataset/go.mod Normal file
View File

@ -0,0 +1,3 @@
module github.com/therootcompany/golib/sync/dataset
go 1.26.0