mirror of
https://github.com/therootcompany/golib.git
synced 2026-04-24 20:58:00 +00:00
refactor: Syncer interface, zero-length guard, Sources uses []Syncer
httpcache.Syncer interface: Fetch() (bool, error) — satisfied by both *httpcache.Cacher and *gitshallow.Repo (new Fetch method + LightGC field). httpcache.Cacher.Fetch now errors on zero-length 200 response instead of clobbering the existing file with empty content. Sources.Fetch/Init drop the lightGC param (baked into Repo.LightGC). Sources.syncs []httpcache.Syncer replaces the separate git/httpInbound/ httpOutbound fields — Fetch iterates syncs uniformly, no more switch. Sources itself satisfies httpcache.Syncer.
This commit is contained in:
parent
2abdc1c229
commit
105e99532d
@ -11,10 +11,11 @@ import (
|
|||||||
|
|
||||||
// Repo manages a shallow git clone used as a periodically-updated data source.
|
// Repo manages a shallow git clone used as a periodically-updated data source.
|
||||||
type Repo struct {
|
type Repo struct {
|
||||||
URL string
|
URL string
|
||||||
Path string
|
Path string
|
||||||
Depth int // 0 defaults to 1, -1 for all
|
Depth int // 0 defaults to 1, -1 for all
|
||||||
Branch string // Optional: specific branch to clone/pull
|
Branch string // Optional: specific branch to clone/pull
|
||||||
|
LightGC bool // true = skip aggressive GC; false (default) = aggressive+prune
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
@ -178,6 +179,11 @@ func (r *Repo) Sync(lightGC bool) (bool, error) {
|
|||||||
return r.syncGit(lightGC)
|
return r.syncGit(lightGC)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Fetch satisfies httpcache.Syncer using the Repo's LightGC setting.
|
||||||
|
func (r *Repo) Fetch() (bool, error) {
|
||||||
|
return r.syncGit(r.LightGC)
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Repo) syncGit(lightGC bool) (updated bool, err error) {
|
func (r *Repo) syncGit(lightGC bool) (updated bool, err error) {
|
||||||
r.mu.Lock()
|
r.mu.Lock()
|
||||||
defer r.mu.Unlock()
|
defer r.mu.Unlock()
|
||||||
|
|||||||
@ -11,6 +11,12 @@ import (
|
|||||||
|
|
||||||
const defaultTimeout = 30 * time.Second
|
const defaultTimeout = 30 * time.Second
|
||||||
|
|
||||||
|
// Syncer is implemented by any value that can fetch a remote resource and
|
||||||
|
// report whether it changed. Both *Cacher and *gitshallow.Repo satisfy this.
|
||||||
|
type Syncer interface {
|
||||||
|
Fetch() (updated bool, err error)
|
||||||
|
}
|
||||||
|
|
||||||
// Cacher fetches a URL to a local file, using ETag/Last-Modified to skip
|
// Cacher fetches a URL to a local file, using ETag/Last-Modified to skip
|
||||||
// unchanged responses.
|
// unchanged responses.
|
||||||
//
|
//
|
||||||
@ -129,12 +135,16 @@ func (c *Cacher) Fetch() (updated bool, err error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
if _, err := io.Copy(f, resp.Body); err != nil {
|
n, err := io.Copy(f, resp.Body)
|
||||||
f.Close()
|
f.Close()
|
||||||
|
if err != nil {
|
||||||
os.Remove(tmp)
|
os.Remove(tmp)
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
f.Close()
|
if n == 0 {
|
||||||
|
os.Remove(tmp)
|
||||||
|
return false, fmt.Errorf("empty response from %s", c.URL)
|
||||||
|
}
|
||||||
if err := os.Rename(tmp, c.Path); err != nil {
|
if err := os.Rename(tmp, c.Path); err != nil {
|
||||||
os.Remove(tmp)
|
os.Remove(tmp)
|
||||||
return false, err
|
return false, err
|
||||||
|
|||||||
@ -21,9 +21,8 @@ type Sources struct {
|
|||||||
inboundPaths []string
|
inboundPaths []string
|
||||||
outboundPaths []string
|
outboundPaths []string
|
||||||
|
|
||||||
git *gitshallow.Repo
|
gitRepo *gitshallow.Repo // non-nil for git source; used by Init for clone-if-missing
|
||||||
httpInbound []*httpcache.Cacher
|
syncs []httpcache.Syncer // all syncable sources (git repo or HTTP cachers)
|
||||||
httpOutbound []*httpcache.Cacher
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newFileSources(whitelist, inbound, outbound []string) *Sources {
|
func newFileSources(whitelist, inbound, outbound []string) *Sources {
|
||||||
@ -42,11 +41,13 @@ func newGitSources(gitURL, repoDir string, whitelist, inboundRel, outboundRel []
|
|||||||
}
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
repo := gitshallow.New(gitURL, repoDir, 1, "")
|
||||||
return &Sources{
|
return &Sources{
|
||||||
whitelistPaths: whitelist,
|
whitelistPaths: whitelist,
|
||||||
inboundPaths: abs(inboundRel),
|
inboundPaths: abs(inboundRel),
|
||||||
outboundPaths: abs(outboundRel),
|
outboundPaths: abs(outboundRel),
|
||||||
git: gitshallow.New(gitURL, repoDir, 1, ""),
|
gitRepo: repo,
|
||||||
|
syncs: []httpcache.Syncer{repo},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -54,60 +55,39 @@ func newHTTPSources(whitelist []string, inbound, outbound []HTTPSource) *Sources
|
|||||||
s := &Sources{whitelistPaths: whitelist}
|
s := &Sources{whitelistPaths: whitelist}
|
||||||
for _, src := range inbound {
|
for _, src := range inbound {
|
||||||
s.inboundPaths = append(s.inboundPaths, src.Path)
|
s.inboundPaths = append(s.inboundPaths, src.Path)
|
||||||
s.httpInbound = append(s.httpInbound, httpcache.New(src.URL, src.Path))
|
s.syncs = append(s.syncs, httpcache.New(src.URL, src.Path))
|
||||||
}
|
}
|
||||||
for _, src := range outbound {
|
for _, src := range outbound {
|
||||||
s.outboundPaths = append(s.outboundPaths, src.Path)
|
s.outboundPaths = append(s.outboundPaths, src.Path)
|
||||||
s.httpOutbound = append(s.httpOutbound, httpcache.New(src.URL, src.Path))
|
s.syncs = append(s.syncs, httpcache.New(src.URL, src.Path))
|
||||||
}
|
}
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch pulls updates from the remote (git or HTTP).
|
// Fetch pulls updates from all sources. Returns whether any new data arrived.
|
||||||
// Returns whether any new data was received.
|
// Satisfies httpcache.Syncer.
|
||||||
func (s *Sources) Fetch(lightGC bool) (bool, error) {
|
func (s *Sources) Fetch() (bool, error) {
|
||||||
switch {
|
var anyUpdated bool
|
||||||
case s.git != nil:
|
for _, syn := range s.syncs {
|
||||||
return s.git.Sync(lightGC)
|
updated, err := syn.Fetch()
|
||||||
case len(s.httpInbound) > 0 || len(s.httpOutbound) > 0:
|
if err != nil {
|
||||||
var anyUpdated bool
|
return anyUpdated, err
|
||||||
for _, c := range s.httpInbound {
|
|
||||||
updated, err := c.Fetch()
|
|
||||||
if err != nil {
|
|
||||||
return anyUpdated, err
|
|
||||||
}
|
|
||||||
anyUpdated = anyUpdated || updated
|
|
||||||
}
|
}
|
||||||
for _, c := range s.httpOutbound {
|
anyUpdated = anyUpdated || updated
|
||||||
updated, err := c.Fetch()
|
|
||||||
if err != nil {
|
|
||||||
return anyUpdated, err
|
|
||||||
}
|
|
||||||
anyUpdated = anyUpdated || updated
|
|
||||||
}
|
|
||||||
return anyUpdated, nil
|
|
||||||
default:
|
|
||||||
return false, nil
|
|
||||||
}
|
}
|
||||||
|
return anyUpdated, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init ensures the remote is ready (clones if needed, fetches HTTP files).
|
// Init ensures remotes are ready. For git: clones if missing then syncs.
|
||||||
// Always returns true so the caller knows to load data on startup.
|
// For HTTP: fetches each cacher unconditionally on first run.
|
||||||
func (s *Sources) Init(lightGC bool) error {
|
func (s *Sources) Init() error {
|
||||||
switch {
|
if s.gitRepo != nil {
|
||||||
case s.git != nil:
|
_, err := s.gitRepo.Init(s.gitRepo.LightGC)
|
||||||
_, err := s.git.Init(lightGC)
|
|
||||||
return err
|
return err
|
||||||
case len(s.httpInbound) > 0 || len(s.httpOutbound) > 0:
|
}
|
||||||
for _, c := range s.httpInbound {
|
for _, syn := range s.syncs {
|
||||||
if _, err := c.Fetch(); err != nil {
|
if _, err := syn.Fetch(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, c := range s.httpOutbound {
|
|
||||||
if _, err := c.Fetch(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@ -77,7 +77,7 @@ func main() {
|
|||||||
|
|
||||||
var whitelist, inbound, outbound atomic.Pointer[ipcohort.Cohort]
|
var whitelist, inbound, outbound atomic.Pointer[ipcohort.Cohort]
|
||||||
|
|
||||||
if err := src.Init(false); err != nil {
|
if err := src.Init(); err != nil {
|
||||||
fmt.Fprintf(os.Stderr, "error: %v\n", err)
|
fmt.Fprintf(os.Stderr, "error: %v\n", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
@ -186,7 +186,7 @@ func runLoop(ctx context.Context, src *Sources,
|
|||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
// Blocklists.
|
// Blocklists.
|
||||||
if updated, err := src.Fetch(false); err != nil {
|
if updated, err := src.Fetch(); err != nil {
|
||||||
fmt.Fprintf(os.Stderr, "error: blocklist sync: %v\n", err)
|
fmt.Fprintf(os.Stderr, "error: blocklist sync: %v\n", err)
|
||||||
} else if updated {
|
} else if updated {
|
||||||
if err := reloadBlocklists(src, whitelist, inbound, outbound); err != nil {
|
if err := reloadBlocklists(src, whitelist, inbound, outbound); err != nil {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user