refactor: fold dataset into gitshallow, caller owns atomic.Pointer

fs/dataset deleted — generic File[T] wrapper didn't earn its abstraction layer
gitshallow.ShallowRepo → Repo (redundant with package name)
gitshallow.Repo.Register(func() error) — callbacks fire on Init and after each sync that changes HEAD
gitshallow.Repo.Init/Run — full lifecycle in one package
caller (check-ip-blacklist) holds atomic.Pointer[Cohort] directly
This commit is contained in:
AJ ONeal 2026-04-19 22:51:52 -06:00
parent 8731eaf10b
commit d6837d31ed
No known key found for this signature in database
3 changed files with 104 additions and 171 deletions

View File

@ -1,134 +0,0 @@
package dataset
import (
"context"
"fmt"
"os"
"path/filepath"
"sync/atomic"
"time"
"github.com/therootcompany/golib/net/gitshallow"
)
// File holds an atomically-swappable pointer to a value loaded from a file.
// Reads are lock-free. Use NewFile for file-only use, or AddFile to attach
// to a GitRepo so the value refreshes whenever the repo is updated.
//
// Because atomic.Pointer[T] is embedded, Load and Store are available
// directly on a File.
type File[T any] struct {
atomic.Pointer[T] // current value; Reload swaps in a whole new *T
path string // handed to loadFile on every Reload
loadFile func(string) (*T, error) // called with path; its result becomes the new value
}
// NewFile builds a File that reads from path via loadFile and has no git
// dependency.
//
// loadFile is not called here: the returned File starts out holding a
// zero-valued T, so Load returns a non-nil pointer even before the first
// Reload. Call Reload for the initial load and again after any file change.
func NewFile[T any](path string, loadFile func(string) (*T, error)) *File[T] {
	f := new(File[T])
	f.path = path
	f.loadFile = loadFile
	// Seed with an empty value so readers never observe a nil pointer.
	f.Store(new(T))
	return f
}
// Reload calls loadFile with the stored path and, on success, atomically
// publishes the result as the current value. When loadFile fails, the
// previously stored value is left untouched and the error is returned as-is.
func (d *File[T]) Reload() error {
	fresh, err := d.loadFile(d.path)
	if err == nil {
		d.Store(fresh)
	}
	return err
}
// reloadFile is the unexported hook that makes *File[T] satisfy the
// reloader interface; it simply delegates to Reload.
func (d *File[T]) reloadFile() error {
return d.Reload()
}
// reloader is the internal interface GitRepo uses to trigger file reloads.
// It lets a GitRepo keep File values with different type parameters in one
// slice.
type reloader interface {
// reloadFile re-reads the backing file and returns any load error.
reloadFile() error
}
// GitRepo manages a shallow git clone and reloads all registered files
// whenever the repo is updated. Multiple files from the same repo share
// one clone and one pull, avoiding git file-lock conflicts.
type GitRepo struct {
path string // local directory of the clone; AddFile joins relative paths onto it
shallowRepo *gitshallow.ShallowRepo // performs the actual clone / sync
files []reloader // files registered through AddFile, reloaded in registration order
}
// NewRepo returns a GitRepo for gitURL whose clone lives at repoPath.
// The underlying shallow repo is created with depth 1 and no explicit
// branch. Nothing is cloned here; call Init to do that.
func NewRepo(gitURL, repoPath string) *GitRepo {
	repo := new(GitRepo)
	repo.path = repoPath
	repo.shallowRepo = gitshallow.New(gitURL, repoPath, 1, "")
	return repo
}
// AddFile registers a file that lives inside repo and returns its handle.
// relPath is interpreted relative to the repo root. The file is not read
// here; it is (re)loaded by Init and by every later sync that finds new
// commits.
func AddFile[T any](repo *GitRepo, relPath string, loadFile func(string) (*T, error)) *File[T] {
	fullPath := filepath.Join(repo.path, relPath)
	file := NewFile(fullPath, loadFile)
	repo.files = append(repo.files, file)
	return file
}
// Init makes sure a clone exists (cloning when the .git directory under the
// repo path cannot be stat'ed), then runs one forced sync so that every
// registered file is loaded even if the pull brought nothing new.
// The sync is requested with lightGC=false, i.e. the aggressive GC mode —
// acceptable as a one-time startup cost.
func (r *GitRepo) Init() error {
	if _, statErr := os.Stat(filepath.Join(r.path, ".git")); statErr != nil {
		_, cloneErr := r.shallowRepo.Clone()
		if cloneErr != nil {
			return cloneErr
		}
	}

	// force=true: reload files regardless of whether HEAD moved.
	if _, syncErr := r.sync(false, true); syncErr != nil {
		return syncErr
	}
	return nil
}
// Run syncs the repo every 47 minutes and reloads files when a sync reports
// changes. It blocks until ctx is done. Sync errors are written to stderr and
// do not stop the loop.
// lightGC=false (zero value) runs aggressive GC with immediate pruning to keep footprint minimal.
// Pass true to skip both when the periodic GC is too slow for your workload.
func (r *GitRepo) Run(ctx context.Context, lightGC bool) {
	const interval = 47 * time.Minute

	tick := time.NewTicker(interval)
	defer tick.Stop()

	for {
		// Wait for either cancellation or the next tick.
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
		}

		reloaded, err := r.sync(lightGC, false)
		switch {
		case err != nil:
			fmt.Fprintf(os.Stderr, "error: git repo sync: %v\n", err)
		case reloaded:
			fmt.Fprintf(os.Stderr, "git repo: files reloaded\n")
		}
	}
}
// Sync pulls the latest commits and reloads all files if HEAD changed.
// lightGC=false (zero value) runs aggressive GC with immediate pruning to keep footprint minimal.
//
// It is the exported, non-forcing form of sync: when nothing changed it
// returns (false, nil) without touching the registered files.
func (r *GitRepo) Sync(lightGC bool) (bool, error) {
return r.sync(lightGC, false)
}
// sync runs one sync of the underlying shallow repo and then reloads every
// registered file when the repo changed or when force is set.
//
// It returns true when the reload pass ran. A failing git sync is returned
// wrapped; individual file reload failures are only logged to stderr so one
// bad file does not block the others.
func (r *GitRepo) sync(lightGC, force bool) (bool, error) {
	changed, err := r.shallowRepo.Sync(lightGC)
	switch {
	case err != nil:
		return false, fmt.Errorf("git sync: %w", err)
	case !(changed || force):
		return false, nil
	}

	for i := range r.files {
		if reloadErr := r.files[i].reloadFile(); reloadErr != nil {
			fmt.Fprintf(os.Stderr, "error: reload file: %v\n", reloadErr)
		}
	}
	return true, nil
}

View File

@ -1,30 +1,33 @@
package gitshallow
import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
)
// ShallowRepo represents a shallow Git repository manager.
type ShallowRepo struct {
// Repo manages a shallow git clone used as a periodically-updated data source.
//
// A Repo contains a mutex and must not be copied; use the pointer returned
// by New.
type Repo struct {
URL string // remote the clone is created from
Path string // local directory of the clone; Path/.git marks an existing clone
Depth int // 0 defaults to 1, -1 for all
Branch string // Optional: specific branch to clone/pull
mu sync.Mutex // held by Clone, Pull, GC and syncGit for the duration of their work
callbacks []func() error // added by Register; run in order by invokeCallbacks
}
// New creates a new ShallowRepo instance.
func New(url, path string, depth int, branch string) *ShallowRepo {
// New creates a new Repo instance.
func New(url, path string, depth int, branch string) *Repo {
if depth == 0 {
depth = 1
}
return &ShallowRepo{
return &Repo{
URL: url,
Path: path,
Depth: depth,
@ -32,15 +35,60 @@ func New(url, path string, depth int, branch string) *ShallowRepo {
}
}
// Register adds fn to the callbacks that Init runs unconditionally and that
// Sync (and therefore Run) runs whenever HEAD changed.
// Use this to reload files and update atomic pointers when the repo changes.
//
// Callbacks run in registration order. A nil fn is ignored so it cannot
// panic later when the callbacks are invoked. Register does not take the
// repo mutex, so register every callback before Init, Sync or Run may be
// running on another goroutine.
func (r *Repo) Register(fn func() error) {
	if fn == nil {
		return
	}
	r.callbacks = append(r.callbacks, fn)
}
// Init prepares the repo for use at startup: it clones when the .git
// directory under Path cannot be stat'ed, performs one git sync, and then
// runs every registered callback whether or not the sync found new commits,
// so that files are loaded on startup.
//
// A clone or sync error aborts Init before any callback runs.
func (r *Repo) Init(lightGC bool) error {
	_, statErr := os.Stat(filepath.Join(r.Path, ".git"))
	if statErr != nil {
		_, cloneErr := r.Clone()
		if cloneErr != nil {
			return cloneErr
		}
	}

	_, syncErr := r.syncGit(lightGC)
	if syncErr != nil {
		return syncErr
	}

	return r.invokeCallbacks()
}
// Run calls Sync every 47 minutes, which in turn invokes the registered
// callbacks when HEAD changes. It blocks until ctx is done; sync errors are
// written to stderr and do not end the loop.
// lightGC=false (zero value) runs aggressive GC with immediate pruning to minimize disk use.
// Pass true to skip both when the periodic GC is too slow for your workload.
func (r *Repo) Run(ctx context.Context, lightGC bool) {
	const syncInterval = 47 * time.Minute

	tick := time.NewTicker(syncInterval)
	defer tick.Stop()

	for {
		// Block until cancellation or the next tick, whichever comes first.
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
		}

		changed, err := r.Sync(lightGC)
		switch {
		case err != nil:
			fmt.Fprintf(os.Stderr, "error: git sync: %v\n", err)
		case changed:
			fmt.Fprintf(os.Stderr, "git: repo updated\n")
		}
	}
}
// Clone performs a shallow clone (--depth N --single-branch --no-tags).
func (r *ShallowRepo) Clone() (bool, error) {
func (r *Repo) Clone() (bool, error) {
// Hold the repo mutex for the whole operation; Pull, GC and syncGit take
// the same mutex, so they cannot run at the same time as a clone.
r.mu.Lock()
defer r.mu.Unlock()
return r.clone()
}
func (r *ShallowRepo) clone() (bool, error) {
func (r *Repo) clone() (bool, error) {
if r.exists() {
return false, nil
}
@ -70,13 +118,13 @@ func (r *ShallowRepo) clone() (bool, error) {
}
// exists checks if the directory contains a .git folder.
func (r *ShallowRepo) exists() bool {
func (r *Repo) exists() bool {
// Any Stat failure (not only "not found") is reported as "no clone here".
_, err := os.Stat(filepath.Join(r.Path, ".git"))
return err == nil
}
// runGit executes a git command in the repo directory (or parent for clone).
func (r *ShallowRepo) runGit(args ...string) (string, error) {
func (r *Repo) runGit(args ...string) (string, error) {
cmd := exec.Command("git", args...)
if _, err := os.Stat(r.Path); err == nil && r.exists() {
@ -94,14 +142,14 @@ func (r *ShallowRepo) runGit(args ...string) (string, error) {
}
// Pull performs a shallow pull (--ff-only) and reports whether HEAD changed.
func (r *ShallowRepo) Pull() (updated bool, err error) {
func (r *Repo) Pull() (updated bool, err error) {
// Locked wrapper around pull; Clone, GC and syncGit take the same mutex.
r.mu.Lock()
defer r.mu.Unlock()
return r.pull()
}
func (r *ShallowRepo) pull() (updated bool, err error) {
func (r *Repo) pull() (updated bool, err error) {
if !r.exists() {
return false, fmt.Errorf("repository does not exist at %s", r.Path)
}
@ -134,14 +182,14 @@ func (r *ShallowRepo) pull() (updated bool, err error) {
}
// GC runs git gc. aggressiveGC adds --aggressive; pruneNow adds --prune=now.
func (r *ShallowRepo) GC(aggressiveGC, pruneNow bool) error {
func (r *Repo) GC(aggressiveGC, pruneNow bool) error {
// Locked wrapper around gc; Clone, Pull and syncGit take the same mutex.
r.mu.Lock()
defer r.mu.Unlock()
return r.gc(aggressiveGC, pruneNow)
}
func (r *ShallowRepo) gc(aggressiveGC, pruneNow bool) error {
func (r *Repo) gc(aggressiveGC, pruneNow bool) error {
if !r.exists() {
return fmt.Errorf("repository does not exist at %s", r.Path)
}
@ -158,10 +206,19 @@ func (r *ShallowRepo) gc(aggressiveGC, pruneNow bool) error {
return err
}
// Sync clones if missing, pulls, and runs GC.
// lightGC=false (zero value) runs --aggressive GC with --prune=now to minimize disk use.
// Pass true to skip both when speed matters more than footprint.
func (r *ShallowRepo) Sync(lightGC bool) (updated bool, err error) {
// Sync clones if missing, pulls, runs GC, and invokes callbacks if HEAD changed.
// Returns whether HEAD changed.
// lightGC=false (zero value) runs aggressive GC with --prune=now to minimize disk use.
//
// Callbacks run whenever HEAD changed, even when the git step also reported
// an error (for example GC failing after a successful pull). Skipping them
// would leave callers on stale data, because the next pull would see no new
// commits and never trigger a reload. When both the git step and a callback
// fail, the git error is returned.
func (r *Repo) Sync(lightGC bool) (updated bool, err error) {
	updated, err = r.syncGit(lightGC)
	if !updated {
		return false, err
	}

	// HEAD moved, so the working tree already holds new content: reload
	// regardless of err.
	cbErr := r.invokeCallbacks()
	if err != nil {
		return true, err
	}
	return true, cbErr
}
func (r *Repo) syncGit(lightGC bool) (updated bool, err error) {
r.mu.Lock()
defer r.mu.Unlock()
@ -172,16 +229,18 @@ func (r *ShallowRepo) Sync(lightGC bool) (updated bool, err error) {
}
updated, err = r.pull()
if err != nil {
return false, err
}
if !updated {
return false, nil
if err != nil || !updated {
return updated, err
}
if err := r.gc(!lightGC, !lightGC); err != nil {
return true, fmt.Errorf("gc failed but pull succeeded: %w", err)
}
return true, nil
return true, r.gc(!lightGC, !lightGC)
}
// invokeCallbacks runs every registered callback in registration order.
//
// A failing callback does not stop the remaining ones: each failure is
// logged to stderr and the loop continues. The first failure is also
// returned (wrapped), so callers such as Init and Sync can tell that a
// reload did not succeed instead of reporting success with stale or missing
// data. It returns nil when every callback succeeded or none is registered.
func (r *Repo) invokeCallbacks() error {
	var firstErr error
	for _, fn := range r.callbacks {
		err := fn()
		if err == nil {
			continue
		}
		fmt.Fprintf(os.Stderr, "error: reload callback: %v\n", err)
		if firstErr == nil {
			firstErr = fmt.Errorf("reload callback: %w", err)
		}
	}
	return firstErr
}

View File

@ -4,8 +4,9 @@ import (
"fmt"
"os"
"path/filepath"
"sync/atomic"
"github.com/therootcompany/golib/fs/dataset"
"github.com/therootcompany/golib/net/gitshallow"
"github.com/therootcompany/golib/net/ipcohort"
)
@ -22,28 +23,35 @@ func main() {
gitURL = os.Args[3]
}
var blacklist *dataset.File[ipcohort.Cohort]
var cohort atomic.Pointer[ipcohort.Cohort]
load := func() error {
c, err := ipcohort.LoadFile(dataPath)
if err != nil {
return err
}
cohort.Store(c)
return nil
}
if gitURL != "" {
repoDir := filepath.Dir(dataPath)
relPath := filepath.Base(dataPath)
repo := dataset.NewRepo(gitURL, repoDir)
blacklist = dataset.AddFile(repo, relPath, ipcohort.LoadFile)
repo := gitshallow.New(gitURL, repoDir, 1, "")
repo.Register(load)
fmt.Fprintf(os.Stderr, "Syncing %q ...\n", repoDir)
if err := repo.Init(); err != nil {
if err := repo.Init(false); err != nil {
fmt.Fprintf(os.Stderr, "error: git sync: %v\n", err)
os.Exit(1)
}
} else {
blacklist = dataset.NewFile(dataPath, ipcohort.LoadFile)
fmt.Fprintf(os.Stderr, "Loading %q ...\n", dataPath)
if err := blacklist.Reload(); err != nil {
if err := load(); err != nil {
fmt.Fprintf(os.Stderr, "error: load: %v\n", err)
os.Exit(1)
}
}
c := blacklist.Load()
c := cohort.Load()
fmt.Fprintf(os.Stderr, "Loaded %d entries\n", c.Size())
if c.Contains(ipStr) {