refactor: dataset.Add returns *Dataset, no View; main uses Group for all cases

Remove View[T] — Add now returns *Dataset[T] directly. Callers use Load()
on the returned Dataset; Init/Run belong to the owning Group.

main.go simplified: declare syncer + file paths per case, then one
g.Init() and one g.Run(). No manual loops over individual datasets.
Add gitshallow.Repo.FilePath helper.
This commit is contained in:
AJ ONeal 2026-04-20 12:48:38 -06:00
parent cc945b0c09
commit 994d91b2bf
No known key found for this signature in database
4 changed files with 58 additions and 56 deletions

View File

@@ -45,51 +45,56 @@ func main() {
// -- Blacklist ---------------------------------------------------------- // -- Blacklist ----------------------------------------------------------
var inboundDS, outboundDS *dataset.Dataset[ipcohort.Cohort] var (
syncer dataset.Syncer
inboundPaths []string
outboundPaths []string
)
switch { switch {
case *inbound != "" || *outbound != "": case *inbound != "" || *outbound != "":
inboundDS = cohortDataset(dataset.NopSyncer{}, splitPaths(*inbound)...) syncer = dataset.NopSyncer{}
outboundDS = cohortDataset(dataset.NopSyncer{}, splitPaths(*outbound)...) inboundPaths = splitPaths(*inbound)
outboundPaths = splitPaths(*outbound)
case *gitURL != "": case *gitURL != "":
dir := cacheDir(*dataDir, "bitwire-it") dir := cacheDir(*dataDir, "bitwire-it")
gr := gitshallow.New(*gitURL, dir, 1, "") gr := gitshallow.New(*gitURL, dir, 1, "")
inSingle := gr.File("tables/inbound/single_ips.txt") syncer = gr
inNetwork := gr.File("tables/inbound/networks.txt") inboundPaths = []string{gr.FilePath("tables/inbound/single_ips.txt"), gr.FilePath("tables/inbound/networks.txt")}
outSingle := gr.File("tables/outbound/single_ips.txt") outboundPaths = []string{gr.FilePath("tables/outbound/single_ips.txt"), gr.FilePath("tables/outbound/networks.txt")}
outNetwork:= gr.File("tables/outbound/networks.txt")
// Each File.Fetch deduplicates the git pull via the shared Repo.
inboundDS = cohortDataset(dataset.MultiSyncer{inSingle, inNetwork}, inSingle.Path(), inNetwork.Path())
outboundDS = cohortDataset(dataset.MultiSyncer{outSingle, outNetwork}, outSingle.Path(), outNetwork.Path())
default: default:
dir := cacheDir(*dataDir, "bitwire-it") dir := cacheDir(*dataDir, "bitwire-it")
inSingle := httpcache.New(inboundSingleURL, filepath.Join(dir, "inbound_single_ips.txt")) inSingle := httpcache.New(inboundSingleURL, filepath.Join(dir, "inbound_single_ips.txt"))
inNetwork := httpcache.New(inboundNetworkURL, filepath.Join(dir, "inbound_networks.txt")) inNetwork := httpcache.New(inboundNetworkURL, filepath.Join(dir, "inbound_networks.txt"))
outSingle := httpcache.New(outboundSingleURL, filepath.Join(dir, "outbound_single_ips.txt")) outSingle := httpcache.New(outboundSingleURL, filepath.Join(dir, "outbound_single_ips.txt"))
outNetwork:= httpcache.New(outboundNetworkURL, filepath.Join(dir, "outbound_networks.txt")) outNetwork := httpcache.New(outboundNetworkURL, filepath.Join(dir, "outbound_networks.txt"))
inboundDS = cohortDataset(dataset.MultiSyncer{inSingle, inNetwork}, inSingle.Path, inNetwork.Path) syncer = dataset.MultiSyncer{inSingle, inNetwork, outSingle, outNetwork}
outboundDS = cohortDataset(dataset.MultiSyncer{outSingle, outNetwork}, outSingle.Path, outNetwork.Path) inboundPaths = []string{inSingle.Path, inNetwork.Path}
outboundPaths = []string{outSingle.Path, outNetwork.Path}
} }
var whitelistDS *dataset.Dataset[ipcohort.Cohort] g := dataset.NewGroup(syncer)
if *whitelist != "" { inboundDS := dataset.Add(g, loadCohort(inboundPaths...))
whitelistDS = cohortDataset(dataset.NopSyncer{}, splitPaths(*whitelist)...) outboundDS := dataset.Add(g, loadCohort(outboundPaths...))
}
for _, ds := range []*dataset.Dataset[ipcohort.Cohort]{whitelistDS, inboundDS, outboundDS} { if err := g.Init(); err != nil {
if ds == nil { fmt.Fprintf(os.Stderr, "error: blacklist: %v\n", err)
continue os.Exit(1)
}
if err := ds.Init(); err != nil {
fmt.Fprintf(os.Stderr, "error: blacklist: %v\n", err)
os.Exit(1)
}
} }
fmt.Fprintf(os.Stderr, "Loaded inbound=%d outbound=%d\n", fmt.Fprintf(os.Stderr, "Loaded inbound=%d outbound=%d\n",
inboundDS.Load().Size(), outboundDS.Load().Size()) inboundDS.Load().Size(), outboundDS.Load().Size())
var whitelistDS *dataset.Dataset[ipcohort.Cohort]
if *whitelist != "" {
whitelistDS = dataset.New(dataset.NopSyncer{}, loadCohort(splitPaths(*whitelist)...))
if err := whitelistDS.Init(); err != nil {
fmt.Fprintf(os.Stderr, "error: whitelist: %v\n", err)
os.Exit(1)
}
}
// -- GeoIP (optional) -------------------------------------------------- // -- GeoIP (optional) --------------------------------------------------
geo, err := setupGeo(*geoipConf, *cityDB, *asnDB) geo, err := setupGeo(*geoipConf, *cityDB, *asnDB)
@@ -106,11 +111,7 @@ func main() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
for _, ds := range []*dataset.Dataset[ipcohort.Cohort]{whitelistDS, inboundDS, outboundDS} { go g.Run(ctx, 47*time.Minute)
if ds != nil {
go ds.Run(ctx, 47*time.Minute)
}
}
geo.Run(ctx, 47*time.Minute) geo.Run(ctx, 47*time.Minute)
// -- Check and report -------------------------------------------------- // -- Check and report --------------------------------------------------
@@ -135,11 +136,10 @@ func main() {
} }
} }
// cohortDataset creates a Dataset that fetches via syncer and loads paths as a Cohort. func loadCohort(paths ...string) func() (*ipcohort.Cohort, error) {
func cohortDataset(syncer dataset.Syncer, paths ...string) *dataset.Dataset[ipcohort.Cohort] { return func() (*ipcohort.Cohort, error) {
return dataset.New(syncer, func() (*ipcohort.Cohort, error) {
return ipcohort.LoadFiles(paths...) return ipcohort.LoadFiles(paths...)
}) }
} }
func isBlocked(ip string, whitelist, cohort *dataset.Dataset[ipcohort.Cohort]) bool { func isBlocked(ip string, whitelist, cohort *dataset.Dataset[ipcohort.Cohort]) bool {

View File

@@ -11,13 +11,14 @@
// go ds.Run(ctx, 47*time.Minute) // go ds.Run(ctx, 47*time.Minute)
// val := ds.Load() // *MyType, lock-free // val := ds.Load() // *MyType, lock-free
// //
// Group (one syncer, multiple values): // Group (one syncer, multiple datasets):
// //
// g := dataset.NewGroup(repo) // g := dataset.NewGroup(repo)
// inbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... }) // inbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... })
// outbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... }) // outbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... })
// if err := g.Init(); err != nil { ... } // if err := g.Init(); err != nil { ... }
// go g.Run(ctx, 47*time.Minute) // go g.Run(ctx, 47*time.Minute)
// val := inbound.Load() // lock-free; Init/Run belong to the Group
package dataset package dataset
import ( import (
@@ -56,6 +57,9 @@ func (NopSyncer) Fetch() (bool, error) { return false, nil }
// Dataset couples a Syncer, a load function, and an atomic.Pointer[T]. // Dataset couples a Syncer, a load function, and an atomic.Pointer[T].
// Load is safe for concurrent use without locks. // Load is safe for concurrent use without locks.
//
// When a Dataset is added to a Group via Add, Init and Run belong to the
// Group; call Load on the Dataset to read the current value.
type Dataset[T any] struct { type Dataset[T any] struct {
// Name is used in error messages. // Name is used in error messages.
Name string Name string
@@ -140,6 +144,9 @@ type member interface {
// Group ties one Syncer to multiple datasets so a single Fetch drives all // Group ties one Syncer to multiple datasets so a single Fetch drives all
// swaps — no redundant network calls when datasets share a source. // swaps — no redundant network calls when datasets share a source.
//
// Datasets added via Add are owned by the Group; call Init and Run on the
// Group, not on individual datasets.
type Group struct { type Group struct {
syncer Syncer syncer Syncer
members []member members []member
@@ -197,20 +204,10 @@ func (g *Group) Run(ctx context.Context, interval time.Duration) {
} }
} }
// View is the read-only handle returned by Add. Sync is driven by the owning // Add registers a new dataset in g and returns it for reading via Load.
// Group. // Init and Run are driven by the Group; do not call them on the returned Dataset.
type View[T any] struct { func Add[T any](g *Group, load func() (*T, error)) *Dataset[T] {
d *Dataset[T] d := &Dataset[T]{load: load}
} g.members = append(g.members, d)
return d
// Load returns the current value. Returns nil before the Group is initialised.
func (v *View[T]) Load() *T { return v.d.ptr.Load() }
func (v *View[T]) swap() error { return v.d.swap() }
// Add registers a new dataset in g and returns a View for reading.
func Add[T any](g *Group, load func() (*T, error)) *View[T] {
v := &View[T]{d: &Dataset[T]{load: load}}
g.members = append(g.members, v)
return v
} }

View File

@@ -102,7 +102,7 @@ type FormMailer struct {
ContentType string // inferred from SuccessBody if empty ContentType string // inferred from SuccessBody if empty
// Blacklist — if set, matching IPs are rejected before any other processing. // Blacklist — if set, matching IPs are rejected before any other processing.
Blacklist *dataset.View[ipcohort.Cohort] Blacklist *dataset.Dataset[ipcohort.Cohort]
// AllowedCountries — if non-nil, only requests from listed ISO codes are // AllowedCountries — if non-nil, only requests from listed ISO codes are
// accepted. Unknown country ("") is always allowed. // accepted. Unknown country ("") is always allowed.

View File

@@ -173,6 +173,11 @@ func (r *Repo) Fetch() (bool, error) {
return r.syncGit() return r.syncGit()
} }
// FilePath returns the on-disk location of rel inside this repo's
// checkout: r.Path joined with rel using OS-correct separators (the
// result is cleaned by filepath.Join, and is absolute whenever r.Path
// is absolute). It performs no check that the file actually exists.
func (r *Repo) FilePath(rel string) string {
	full := filepath.Join(r.Path, rel)
	return full
}
// File returns a handle to relPath within this repo. // File returns a handle to relPath within this repo.
// The handle's Path and Open methods give access to the file; its Fetch method // The handle's Path and Open methods give access to the file; its Fetch method
// syncs the repo and reports whether this specific file changed (by mtime). // syncs the repo and reports whether this specific file changed (by mtime).