diff --git a/cmd/check-ip/main.go b/cmd/check-ip/main.go index 54bd885..859ab26 100644 --- a/cmd/check-ip/main.go +++ b/cmd/check-ip/main.go @@ -45,51 +45,56 @@ func main() { // -- Blacklist ---------------------------------------------------------- - var inboundDS, outboundDS *dataset.Dataset[ipcohort.Cohort] + var ( + syncer dataset.Syncer + inboundPaths []string + outboundPaths []string + ) switch { case *inbound != "" || *outbound != "": - inboundDS = cohortDataset(dataset.NopSyncer{}, splitPaths(*inbound)...) - outboundDS = cohortDataset(dataset.NopSyncer{}, splitPaths(*outbound)...) + syncer = dataset.NopSyncer{} + inboundPaths = splitPaths(*inbound) + outboundPaths = splitPaths(*outbound) case *gitURL != "": dir := cacheDir(*dataDir, "bitwire-it") gr := gitshallow.New(*gitURL, dir, 1, "") - inSingle := gr.File("tables/inbound/single_ips.txt") - inNetwork := gr.File("tables/inbound/networks.txt") - outSingle := gr.File("tables/outbound/single_ips.txt") - outNetwork:= gr.File("tables/outbound/networks.txt") - // Each File.Fetch deduplicates the git pull via the shared Repo. - inboundDS = cohortDataset(dataset.MultiSyncer{inSingle, inNetwork}, inSingle.Path(), inNetwork.Path()) - outboundDS = cohortDataset(dataset.MultiSyncer{outSingle, outNetwork}, outSingle.Path(), outNetwork.Path()) + syncer = gr + inboundPaths = []string{gr.FilePath("tables/inbound/single_ips.txt"), gr.FilePath("tables/inbound/networks.txt")} + outboundPaths = []string{gr.FilePath("tables/outbound/single_ips.txt"), gr.FilePath("tables/outbound/networks.txt")} default: - dir := cacheDir(*dataDir, "bitwire-it") - inSingle := httpcache.New(inboundSingleURL, filepath.Join(dir, "inbound_single_ips.txt")) - inNetwork := httpcache.New(inboundNetworkURL, filepath.Join(dir, "inbound_networks.txt")) - outSingle := httpcache.New(outboundSingleURL, filepath.Join(dir, "outbound_single_ips.txt")) - outNetwork:= httpcache.New(outboundNetworkURL, filepath.Join(dir, "outbound_networks.txt")) - inboundDS = cohortDataset(dataset.MultiSyncer{inSingle, inNetwork}, inSingle.Path, inNetwork.Path) - outboundDS = cohortDataset(dataset.MultiSyncer{outSingle, outNetwork}, outSingle.Path, outNetwork.Path) + dir := cacheDir(*dataDir, "bitwire-it") + inSingle := httpcache.New(inboundSingleURL, filepath.Join(dir, "inbound_single_ips.txt")) + inNetwork := httpcache.New(inboundNetworkURL, filepath.Join(dir, "inbound_networks.txt")) + outSingle := httpcache.New(outboundSingleURL, filepath.Join(dir, "outbound_single_ips.txt")) + outNetwork := httpcache.New(outboundNetworkURL, filepath.Join(dir, "outbound_networks.txt")) + syncer = dataset.MultiSyncer{inSingle, inNetwork, outSingle, outNetwork} + inboundPaths = []string{inSingle.Path, inNetwork.Path} + outboundPaths = []string{outSingle.Path, outNetwork.Path} } - var whitelistDS *dataset.Dataset[ipcohort.Cohort] - if *whitelist != "" { - whitelistDS = cohortDataset(dataset.NopSyncer{}, splitPaths(*whitelist)...) - } + g := dataset.NewGroup(syncer) + inboundDS := dataset.Add(g, loadCohort(inboundPaths...)) + outboundDS := dataset.Add(g, loadCohort(outboundPaths...)) - for _, ds := range []*dataset.Dataset[ipcohort.Cohort]{whitelistDS, inboundDS, outboundDS} { - if ds == nil { - continue - } - if err := ds.Init(); err != nil { - fmt.Fprintf(os.Stderr, "error: blacklist: %v\n", err) - os.Exit(1) - } + if err := g.Init(); err != nil { + fmt.Fprintf(os.Stderr, "error: blacklist: %v\n", err) + os.Exit(1) } fmt.Fprintf(os.Stderr, "Loaded inbound=%d outbound=%d\n", inboundDS.Load().Size(), outboundDS.Load().Size()) + var whitelistDS *dataset.Dataset[ipcohort.Cohort] + if *whitelist != "" { + whitelistDS = dataset.New(dataset.NopSyncer{}, loadCohort(splitPaths(*whitelist)...)) + if err := whitelistDS.Init(); err != nil { + fmt.Fprintf(os.Stderr, "error: whitelist: %v\n", err) + os.Exit(1) + } + } + // -- GeoIP (optional) -------------------------------------------------- geo, err := setupGeo(*geoipConf, *cityDB, *asnDB) @@ -106,11 +111,7 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - for _, ds := range []*dataset.Dataset[ipcohort.Cohort]{whitelistDS, inboundDS, outboundDS} { - if ds != nil { - go ds.Run(ctx, 47*time.Minute) - } - } + go g.Run(ctx, 47*time.Minute) geo.Run(ctx, 47*time.Minute) // -- Check and report -------------------------------------------------- @@ -135,11 +136,10 @@ func main() { } } -// cohortDataset creates a Dataset that fetches via syncer and loads paths as a Cohort. -func cohortDataset(syncer dataset.Syncer, paths ...string) *dataset.Dataset[ipcohort.Cohort] { - return dataset.New(syncer, func() (*ipcohort.Cohort, error) { +func loadCohort(paths ...string) func() (*ipcohort.Cohort, error) { + return func() (*ipcohort.Cohort, error) { return ipcohort.LoadFiles(paths...) - }) + } } func isBlocked(ip string, whitelist, cohort *dataset.Dataset[ipcohort.Cohort]) bool { diff --git a/net/dataset/dataset.go b/net/dataset/dataset.go index 1dcc87a..298ccfd 100644 --- a/net/dataset/dataset.go +++ b/net/dataset/dataset.go @@ -11,13 +11,14 @@ // go ds.Run(ctx, 47*time.Minute) // val := ds.Load() // *MyType, lock-free // -// Group (one syncer, multiple values): +// Group (one syncer, multiple datasets): // // g := dataset.NewGroup(repo) // inbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... }) // outbound := dataset.Add(g, func() (*ipcohort.Cohort, error) { ... }) // if err := g.Init(); err != nil { ... } // go g.Run(ctx, 47*time.Minute) +// val := inbound.Load() // lock-free; Init/Run belong to the Group package dataset import ( @@ -56,6 +57,9 @@ func (NopSyncer) Fetch() (bool, error) { return false, nil } // Dataset couples a Syncer, a load function, and an atomic.Pointer[T]. // Load is safe for concurrent use without locks. +// +// When a Dataset is added to a Group via Add, Init and Run belong to the +// Group; call Load on the Dataset to read the current value. type Dataset[T any] struct { // Name is used in error messages. Name string @@ -140,6 +144,9 @@ type member interface { // Group ties one Syncer to multiple datasets so a single Fetch drives all // swaps — no redundant network calls when datasets share a source. +// +// Datasets added via Add are owned by the Group; call Init and Run on the +// Group, not on individual datasets. type Group struct { syncer Syncer members []member @@ -197,20 +204,10 @@ func (g *Group) Run(ctx context.Context, interval time.Duration) { } } -// View is the read-only handle returned by Add. Sync is driven by the owning -// Group. -type View[T any] struct { - d *Dataset[T] -} - -// Load returns the current value. Returns nil before the Group is initialised. -func (v *View[T]) Load() *T { return v.d.ptr.Load() } - -func (v *View[T]) swap() error { return v.d.swap() } - -// Add registers a new dataset in g and returns a View for reading. -func Add[T any](g *Group, load func() (*T, error)) *View[T] { - v := &View[T]{d: &Dataset[T]{load: load}} - g.members = append(g.members, v) - return v +// Add registers a new dataset in g and returns it for reading via Load. +// Init and Run are driven by the Group; do not call them on the returned Dataset. +func Add[T any](g *Group, load func() (*T, error)) *Dataset[T] { + d := &Dataset[T]{load: load} + g.members = append(g.members, d) + return d } diff --git a/net/formmailer/formmailer.go b/net/formmailer/formmailer.go index 831463a..9052dd7 100644 --- a/net/formmailer/formmailer.go +++ b/net/formmailer/formmailer.go @@ -102,7 +102,7 @@ type FormMailer struct { ContentType string // inferred from SuccessBody if empty // Blacklist — if set, matching IPs are rejected before any other processing. - Blacklist *dataset.View[ipcohort.Cohort] + Blacklist *dataset.Dataset[ipcohort.Cohort] // AllowedCountries — if non-nil, only requests from listed ISO codes are // accepted. Unknown country ("") is always allowed. diff --git a/net/gitshallow/gitshallow.go b/net/gitshallow/gitshallow.go index cb43597..b53eb1b 100644 --- a/net/gitshallow/gitshallow.go +++ b/net/gitshallow/gitshallow.go @@ -173,6 +173,11 @@ func (r *Repo) Fetch() (bool, error) { return r.syncGit() } +// FilePath returns the absolute path to relPath within this repo. +func (r *Repo) FilePath(rel string) string { + return filepath.Join(r.Path, rel) +} + // File returns a handle to relPath within this repo. // The handle's Path and Open methods give access to the file; its Fetch method // syncs the repo and reports whether this specific file changed (by mtime).