From d6837d31ed886cc0db97f2607012e61b0b0314b8 Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Sun, 19 Apr 2026 22:51:52 -0600 Subject: [PATCH] refactor: fold dataset into gitshallow, caller owns atomic.Pointer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fs/dataset deleted — generic File[T] wrapper didn't earn its abstraction layer gitshallow.ShallowRepo → Repo (redundant with package name) gitshallow.Repo.Register(func() error) — callbacks fire after each sync gitshallow.Repo.Init/Run — full lifecycle in one package caller (check-ip-blacklist) holds atomic.Pointer[Cohort] directly --- fs/dataset/dataset.go | 134 -------------------- net/gitshallow/gitshallow.go | 115 +++++++++++++---- net/ipcohort/cmd/check-ip-blacklist/main.go | 26 ++-- 3 files changed, 104 insertions(+), 171 deletions(-) delete mode 100644 fs/dataset/dataset.go diff --git a/fs/dataset/dataset.go b/fs/dataset/dataset.go deleted file mode 100644 index dc94c36..0000000 --- a/fs/dataset/dataset.go +++ /dev/null @@ -1,134 +0,0 @@ -package dataset - -import ( - "context" - "fmt" - "os" - "path/filepath" - "sync/atomic" - "time" - - "github.com/therootcompany/golib/net/gitshallow" -) - -// File holds an atomically-swappable pointer to a value loaded from a file. -// Reads are lock-free. Use NewFile for file-only use, or AddFile to attach -// to a GitRepo so the value refreshes whenever the repo is updated. -type File[T any] struct { - atomic.Pointer[T] - path string - loadFile func(string) (*T, error) -} - -// NewFile creates a file-backed dataset with no git dependency. -// Call Reload to do the initial load and after any file change. -func NewFile[T any](path string, loadFile func(string) (*T, error)) *File[T] { - d := &File[T]{ - path: path, - loadFile: loadFile, - } - d.Store(new(T)) - return d -} - -// Reload reads the file and atomically replaces the stored value. 
-func (d *File[T]) Reload() error { - v, err := d.loadFile(d.path) - if err != nil { - return err - } - d.Store(v) - return nil -} - -func (d *File[T]) reloadFile() error { - return d.Reload() -} - -// reloader is the internal interface GitRepo uses to trigger file reloads. -type reloader interface { - reloadFile() error -} - -// GitRepo manages a shallow git clone and reloads all registered files -// whenever the repo is updated. Multiple files from the same repo share -// one clone and one pull, avoiding git file-lock conflicts. -type GitRepo struct { - path string - shallowRepo *gitshallow.ShallowRepo - files []reloader -} - -// NewRepo creates a GitRepo backed by the given git URL, cloning into repoPath. -func NewRepo(gitURL, repoPath string) *GitRepo { - return &GitRepo{ - path: repoPath, - shallowRepo: gitshallow.New(gitURL, repoPath, 1, ""), - } -} - -// AddFile registers a file inside this repo and returns its handle. -// relPath is relative to the repo root. The file is reloaded automatically -// whenever the repo is synced via Init or Run. -func AddFile[T any](repo *GitRepo, relPath string, loadFile func(string) (*T, error)) *File[T] { - d := NewFile(filepath.Join(repo.path, relPath), loadFile) - repo.files = append(repo.files, d) - return d -} - -// Init clones the repo if missing, syncs once, and loads all registered files. -// Always runs aggressive GC — acceptable as a one-time startup cost. -func (r *GitRepo) Init() error { - gitDir := filepath.Join(r.path, ".git") - if _, err := os.Stat(gitDir); err != nil { - if _, err := r.shallowRepo.Clone(); err != nil { - return err - } - } - _, err := r.sync(false, true) - return err -} - -// Run periodically syncs the repo and reloads files. Blocks until ctx is done. -// lightGC=false (zero value) runs aggressive GC with immediate pruning to keep footprint minimal. -// Pass true to skip both when the periodic GC is too slow for your workload. 
-func (r *GitRepo) Run(ctx context.Context, lightGC bool) { - ticker := time.NewTicker(47 * time.Minute) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if updated, err := r.sync(lightGC, false); err != nil { - fmt.Fprintf(os.Stderr, "error: git repo sync: %v\n", err) - } else if updated { - fmt.Fprintf(os.Stderr, "git repo: files reloaded\n") - } - case <-ctx.Done(): - return - } - } -} - -// Sync pulls the latest commits and reloads all files if HEAD changed. -// lightGC=false (zero value) runs aggressive GC with immediate pruning to keep footprint minimal. -func (r *GitRepo) Sync(lightGC bool) (bool, error) { - return r.sync(lightGC, false) -} - -func (r *GitRepo) sync(lightGC, force bool) (bool, error) { - updated, err := r.shallowRepo.Sync(lightGC) - if err != nil { - return false, fmt.Errorf("git sync: %w", err) - } - if !updated && !force { - return false, nil - } - - for _, f := range r.files { - if err := f.reloadFile(); err != nil { - fmt.Fprintf(os.Stderr, "error: reload file: %v\n", err) - } - } - return true, nil -} diff --git a/net/gitshallow/gitshallow.go b/net/gitshallow/gitshallow.go index c263ae1..1681021 100644 --- a/net/gitshallow/gitshallow.go +++ b/net/gitshallow/gitshallow.go @@ -1,30 +1,33 @@ package gitshallow import ( + "context" "fmt" "os" "os/exec" "path/filepath" "strings" "sync" + "time" ) -// ShallowRepo represents a shallow Git repository manager. -type ShallowRepo struct { +// Repo manages a shallow git clone used as a periodically-updated data source. +type Repo struct { URL string Path string Depth int // 0 defaults to 1, -1 for all Branch string // Optional: specific branch to clone/pull - mu sync.Mutex + mu sync.Mutex + callbacks []func() error } -// New creates a new ShallowRepo instance. -func New(url, path string, depth int, branch string) *ShallowRepo { +// New creates a new Repo instance. 
+func New(url, path string, depth int, branch string) *Repo {
 	if depth == 0 {
 		depth = 1
 	}
-	return &ShallowRepo{
+	return &Repo{
 		URL:    url,
 		Path:   path,
 		Depth:  depth,
@@ -32,15 +35,64 @@ func New(url, path string, depth int, branch string) *ShallowRepo {
 	}
 }
 
+// Register adds a callback that Init always invokes and that Sync (and so Run)
+// invokes after a sync that moved HEAD.
+// Use this to reload files and update atomic pointers when the repo changes.
+// Register does not lock r.mu; add callbacks before calling Init or Run.
+func (r *Repo) Register(fn func() error) {
+	r.callbacks = append(r.callbacks, fn)
+}
+
+// Init clones the repo if missing, syncs once, then invokes all callbacks
+// regardless of whether git had new commits — ensuring files are loaded on startup.
+// lightGC is passed to the sync step; see Run for its meaning.
+// A clone or sync error aborts Init before any callback runs; otherwise the
+// result of invoking the callbacks is returned.
+func (r *Repo) Init(lightGC bool) error {
+	// Clone checks for an existing checkout itself, under r.mu, and is a no-op
+	// when one is present, so a separate unlocked os.Stat here is redundant.
+	if _, err := r.Clone(); err != nil {
+		return err
+	}
+
+	if _, err := r.syncGit(lightGC); err != nil {
+		return err
+	}
+
+	return r.invokeCallbacks()
+}
+
+// Run syncs the repo every 47 minutes and invokes callbacks when HEAD changes.
+// Blocks until ctx is done. Sync errors are written to stderr and do not stop the loop.
+// lightGC=false (zero value) runs aggressive GC with immediate pruning to minimize disk use.
+// Pass true to skip both when the periodic GC is too slow for your workload.
+func (r *Repo) Run(ctx context.Context, lightGC bool) {
+	ticker := time.NewTicker(47 * time.Minute)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			if updated, err := r.Sync(lightGC); err != nil {
+				fmt.Fprintf(os.Stderr, "error: git sync: %v\n", err)
+			} else if updated {
+				fmt.Fprintf(os.Stderr, "git: repo updated\n")
+			}
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
 // Clone performs a shallow clone (--depth N --single-branch --no-tags).
-func (r *ShallowRepo) Clone() (bool, error) { +func (r *Repo) Clone() (bool, error) { r.mu.Lock() defer r.mu.Unlock() return r.clone() } -func (r *ShallowRepo) clone() (bool, error) { +func (r *Repo) clone() (bool, error) { if r.exists() { return false, nil } @@ -70,13 +118,13 @@ func (r *ShallowRepo) clone() (bool, error) { } // exists checks if the directory contains a .git folder. -func (r *ShallowRepo) exists() bool { +func (r *Repo) exists() bool { _, err := os.Stat(filepath.Join(r.Path, ".git")) return err == nil } // runGit executes a git command in the repo directory (or parent for clone). -func (r *ShallowRepo) runGit(args ...string) (string, error) { +func (r *Repo) runGit(args ...string) (string, error) { cmd := exec.Command("git", args...) if _, err := os.Stat(r.Path); err == nil && r.exists() { @@ -94,14 +142,14 @@ func (r *ShallowRepo) runGit(args ...string) (string, error) { } // Pull performs a shallow pull (--ff-only) and reports whether HEAD changed. -func (r *ShallowRepo) Pull() (updated bool, err error) { +func (r *Repo) Pull() (updated bool, err error) { r.mu.Lock() defer r.mu.Unlock() return r.pull() } -func (r *ShallowRepo) pull() (updated bool, err error) { +func (r *Repo) pull() (updated bool, err error) { if !r.exists() { return false, fmt.Errorf("repository does not exist at %s", r.Path) } @@ -134,14 +182,14 @@ func (r *ShallowRepo) pull() (updated bool, err error) { } // GC runs git gc. aggressiveGC adds --aggressive; pruneNow adds --prune=now. 
-func (r *ShallowRepo) GC(aggressiveGC, pruneNow bool) error {
+func (r *Repo) GC(aggressiveGC, pruneNow bool) error {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 
 	return r.gc(aggressiveGC, pruneNow)
 }
 
-func (r *ShallowRepo) gc(aggressiveGC, pruneNow bool) error {
+func (r *Repo) gc(aggressiveGC, pruneNow bool) error {
 	if !r.exists() {
 		return fmt.Errorf("repository does not exist at %s", r.Path)
 	}
@@ -158,10 +206,32 @@ func (r *ShallowRepo) gc(aggressiveGC, pruneNow bool) error {
 	return err
 }
 
-// Sync clones if missing, pulls, and runs GC.
-// lightGC=false (zero value) runs --aggressive GC with --prune=now to minimize disk use.
-// Pass true to skip both when speed matters more than footprint.
-func (r *ShallowRepo) Sync(lightGC bool) (updated bool, err error) {
+// Sync clones if missing, pulls, runs GC, and invokes callbacks if HEAD changed.
+// Returns whether HEAD changed.
+// lightGC=false (zero value) runs aggressive GC with --prune=now to minimize disk use.
+// Callbacks still run when HEAD changed but the follow-up GC failed: the new
+// files are already on disk and a later sync will not report that change again.
+// A GC error and a callback error are both reported in the returned error.
+func (r *Repo) Sync(lightGC bool) (updated bool, err error) {
+	updated, err = r.syncGit(lightGC)
+	if !updated {
+		return false, err
+	}
+
+	cbErr := r.invokeCallbacks()
+	switch {
+	case err == nil:
+		return true, cbErr
+	case cbErr == nil:
+		return true, err
+	default:
+		return true, fmt.Errorf("%v; %w", err, cbErr)
+	}
+}
+
+// syncGit is Sync without the callbacks; it holds r.mu for the whole operation.
+// A GC failure after a successful pull is returned as (true, err).
+func (r *Repo) syncGit(lightGC bool) (updated bool, err error) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 
@@ -172,16 +242,36 @@ func (r *ShallowRepo) Sync(lightGC bool) (updated bool, err error) {
 	}
 
 	updated, err = r.pull()
 	if err != nil {
 		return false, err
 	}
 	if !updated {
 		return false, nil
 	}
 
+	// HEAD already moved, so a gc error is reported together with updated=true;
+	// Sync relies on that to still run the callbacks.
 	if err := r.gc(!lightGC, !lightGC); err != nil {
 		return true, fmt.Errorf("gc failed but pull succeeded: %w", err)
 	}
 
 	return true, nil
+}
+
+// invokeCallbacks runs every registered callback in registration order, even
+// after one fails. It returns the first failure, wrapped; later failures are
+// written to stderr because only one error is returned here.
+func (r *Repo) invokeCallbacks() error {
+	var first error
+	for _, fn := range r.callbacks {
+		err := fn()
+		switch {
+		case err == nil:
+		case first == nil:
+			first = fmt.Errorf("reload callback: %w", err)
+		default:
+			fmt.Fprintf(os.Stderr, "error: reload callback: %v\n", err)
+		}
+	}
+	return first
 }
diff --git a/net/ipcohort/cmd/check-ip-blacklist/main.go b/net/ipcohort/cmd/check-ip-blacklist/main.go
index 68b38c2..6293849 100644
--- a/net/ipcohort/cmd/check-ip-blacklist/main.go
+++ b/net/ipcohort/cmd/check-ip-blacklist/main.go
@@ -4,8 +4,9 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"sync/atomic"
 
-	"github.com/therootcompany/golib/fs/dataset"
+	"github.com/therootcompany/golib/net/gitshallow"
 	"github.com/therootcompany/golib/net/ipcohort"
 )
 
@@ -22,28 +23,40 @@ func main() {
 		gitURL = os.Args[3]
 	}
 
-	var blacklist *dataset.File[ipcohort.Cohort]
+	var cohort atomic.Pointer[ipcohort.Cohort]
+
+	load := func() error {
+		c, err := ipcohort.LoadFile(dataPath)
+		if err != nil {
+			return err
+		}
+		cohort.Store(c)
+		return nil
+	}
 
 	if gitURL != "" {
 		repoDir := filepath.Dir(dataPath)
-		relPath := filepath.Base(dataPath)
-		repo := dataset.NewRepo(gitURL, repoDir)
-		blacklist = dataset.AddFile(repo, relPath, ipcohort.LoadFile)
+		repo := gitshallow.New(gitURL, repoDir, 1, "")
+		repo.Register(load)
 		fmt.Fprintf(os.Stderr, "Syncing %q ...\n", repoDir)
-		if err := repo.Init(); err != nil {
+		if err := repo.Init(false); err != nil {
 			fmt.Fprintf(os.Stderr, "error: git sync: %v\n", err)
 			os.Exit(1)
 		}
 	} else {
-		blacklist = dataset.NewFile(dataPath, ipcohort.LoadFile)
 		fmt.Fprintf(os.Stderr, "Loading %q ...\n", dataPath)
-		if err := blacklist.Reload(); err != nil {
+		if err := load(); err != nil {
 			fmt.Fprintf(os.Stderr, "error: load: %v\n", err)
 			os.Exit(1)
 		}
 	}
 
-	c := blacklist.Load()
+	c := cohort.Load()
+	if c == nil {
+		// cohort starts out nil and is only set by a successful load.
+		fmt.Fprintf(os.Stderr, "error: load: no data loaded from %q\n", dataPath)
+		os.Exit(1)
+	}
 	fmt.Fprintf(os.Stderr, "Loaded %d entries\n", c.Size())
 
 	if c.Contains(ipStr) {