cleanup and more testing

This commit is contained in:
AJ ONeal 2021-02-24 01:13:36 -07:00
parent 2873457cf1
commit 4a67172f81
8 changed files with 419 additions and 414 deletions

View File

@ -43,8 +43,6 @@ func RouteStopped(r chi.Router, runOpts *options.ServerConfig) {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// r.Body is always .Close()ed by Go's http server // r.Body is always .Close()ed by Go's http server
r.Body = http.MaxBytesReader(w, r.Body, options.DefaultMaxBodySize) r.Body = http.MaxBytesReader(w, r.Body, options.DefaultMaxBodySize)
// TODO admin auth middleware
log.Printf("TODO: handle authentication")
next.ServeHTTP(w, r) next.ServeHTTP(w, r)
}) })
}) })
@ -96,6 +94,9 @@ func RouteStopped(r chi.Router, runOpts *options.ServerConfig) {
r.Get("/logs/{oldID}", func(w http.ResponseWriter, r *http.Request) { r.Get("/logs/{oldID}", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
// TODO admin auth middleware
log.Printf("[TODO] handle AUTH (logs could be sensitive)")
oldID := webhooks.URLSafeGitID(chi.URLParam(r, "oldID")) oldID := webhooks.URLSafeGitID(chi.URLParam(r, "oldID"))
// TODO add `since` // TODO add `since`
j, err := jobs.LoadLogs(runOpts, oldID) j, err := jobs.LoadLogs(runOpts, oldID)
@ -149,7 +150,7 @@ func RouteStopped(r chi.Router, runOpts *options.ServerConfig) {
r.Post("/jobs", func(w http.ResponseWriter, r *http.Request) { r.Post("/jobs", func(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body) decoder := json.NewDecoder(r.Body)
msg := &jobs.KillMsg{} msg := &KillMsg{}
if err := decoder.Decode(msg); nil != err { if err := decoder.Decode(msg); nil != err {
log.Printf("kill job invalid json:\n%v", err) log.Printf("kill job invalid json:\n%v", err)
http.Error(w, "invalid json body", http.StatusBadRequest) http.Error(w, "invalid json body", http.StatusBadRequest)
@ -272,3 +273,9 @@ type Repo struct {
CloneURL string `json:"clone_url"` CloneURL string `json:"clone_url"`
Promotions []string `json:"_promotions"` Promotions []string `json:"_promotions"`
} }
// KillMsg describes which job to kill
type KillMsg struct {
JobID string `json:"job_id"`
Kill bool `json:"kill"`
}

View File

@ -1,6 +1,7 @@
package api package api
import ( import (
"encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
@ -28,7 +29,7 @@ func init() {
runOpts = &options.ServerConfig{ runOpts = &options.ServerConfig{
//Addr: "localhost:4483", //Addr: "localhost:4483",
ScriptsPath: "./testdata", ScriptsPath: "./testdata",
LogDir: "./test-logs/debounce", LogDir: "./test-logs/api",
TmpDir: tmpDir, TmpDir: tmpDir,
DebounceDelay: 25 * time.Millisecond, DebounceDelay: 25 * time.Millisecond,
StaleJobAge: 5 * time.Minute, StaleJobAge: 5 * time.Minute,
@ -69,16 +70,12 @@ func TestCallback(t *testing.T) {
} }
// , _ := json.MarshallIndent(&hook , "", " ") // , _ := json.MarshallIndent(&hook , "", " ")
jobs.Debounce(hook) jobs.Debounce(hook)
/* /*
body := bytes.NewReader(hook) body := bytes.NewReader(hook)
r := httptest.NewRequest("POST", "/api/local/webhook", body) r := httptest.NewRequest("POST", "/api/local/webhook", body)
//dec := json.NewDecoder(r.Body)
//dec.Decode()
*/ */
t.Log("sleep so job can debounce, start, and finish") // TODO use callback or chan chan to avoid sleeps?
time.Sleep(debounceDelay) time.Sleep(debounceDelay)
time.Sleep(jobDelay) time.Sleep(jobDelay)
@ -92,16 +89,32 @@ func TestCallback(t *testing.T) {
) )
resp, err := http.Get(reqURL) resp, err := http.Get(reqURL)
if nil != err { if nil != err {
t.Logf("[DEBUG] Response Error: %v", err) t.Errorf("HTTP response error: %s\n%#v", reqURL, err)
return return
} }
t.Logf("[DEBUG] Request URL: %s", reqURL) job := &jobs.Job{}
t.Logf("[DEBUG] Response Headers: %d %#v", resp.StatusCode, resp.Header) dec := json.NewDecoder(resp.Body)
b, err := ioutil.ReadAll(resp.Body) if err := dec.Decode(job); nil != err {
if nil != err { t.Errorf(
t.Logf("[DEBUG] Response Error: %v", err) "response decode error: %d %s\n%#v\n%#v",
resp.StatusCode, reqURL, resp.Header, err,
)
return
}
if len(job.Logs) < 3 {
t.Errorf("too few logs: %s\n%#v", reqURL, job)
return
}
if nil == job.Report || len(job.Report.Results) < 1 {
t.Errorf("too few results: %s\n%#v", reqURL, job)
return
}
if nil == job.ExitCode || 0 != *job.ExitCode {
t.Errorf("non-zero exit code: %s\n%#v", reqURL, job)
return return
} }
t.Logf("[DEBUG] Response Body: %v", string(b))
} }

View File

@ -1,233 +0,0 @@
package jobs
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
"git.rootprojects.org/root/gitdeploy/internal/log"
"git.rootprojects.org/root/gitdeploy/internal/options"
"git.rootprojects.org/root/gitdeploy/internal/webhooks"
)
// Debounce puts a job in the queue, in time
func Debounce(hook webhooks.Ref) {
webhooks.Hooks <- hook
}
var jobsTimersMux sync.Mutex
var debounceTimers = make(map[webhooks.RefID]*time.Timer)
func debounce(hook *webhooks.Ref, runOpts *options.ServerConfig) {
jobsTimersMux.Lock()
defer jobsTimersMux.Unlock()
activeID := hook.GetRefID()
if _, ok := Actives.Load(activeID); ok {
log.Printf("Job in progress, not debouncing %s", hook)
return
}
refID := hook.GetRefID()
timer, ok := debounceTimers[refID]
if ok {
log.Printf("Replacing previous debounce timer for %s", hook)
timer.Stop()
}
// this will not cause a mutual lock because it is async
debounceTimers[refID] = time.AfterFunc(runOpts.DebounceDelay, func() {
jobsTimersMux.Lock()
delete(debounceTimers, refID)
jobsTimersMux.Unlock()
debounced <- hook
})
}
func getBacklogFilePath(baseDir string, hook *webhooks.Ref) (string, string, error) {
baseDir, _ = filepath.Abs(baseDir)
fileName := hook.RefName + ".json"
fileDir := filepath.Join(baseDir, hook.RepoID)
err := os.MkdirAll(fileDir, 0755)
return fileDir, fileName, err
}
func saveBacklog(hook *webhooks.Ref, runOpts *options.ServerConfig) {
pendingID := hook.GetRefID()
Pending.Store(pendingID, hook)
repoDir, repoFile, err := getBacklogFilePath(runOpts.TmpDir, hook)
if nil != err {
log.Printf("[WARN] could not create backlog dir %s:\n%v", repoDir, err)
return
}
f, err := ioutil.TempFile(repoDir, "tmp-*")
if nil != err {
log.Printf("[WARN] could not create backlog file %s:\n%v", f.Name(), err)
return
}
b, _ := json.MarshalIndent(hook, "", " ")
if _, err := f.Write(b); nil != err {
log.Printf("[WARN] could not write backlog file %s:\n%v", f.Name(), err)
return
}
replace := false
backlogPath := filepath.Join(repoDir, repoFile)
if _, err := os.Stat(backlogPath); nil == err {
replace = true
_ = os.Remove(backlogPath)
}
if err := os.Rename(f.Name(), backlogPath); nil != err {
log.Printf("[WARN] rename backlog json failed:\n%v", err)
return
}
if replace {
log.Printf("[backlog] replace backlog for %s", hook.GetRefID())
} else {
log.Printf("[backlog] create backlog for %s", hook.GetRefID())
}
}
func run(curHook *webhooks.Ref, runOpts *options.ServerConfig) {
// because we want to lock the whole transaction all of the state
jobsTimersMux.Lock()
defer jobsTimersMux.Unlock()
pendingID := curHook.GetRefID()
if _, ok := Actives.Load(pendingID); ok {
log.Printf("Job already in progress: %s", curHook.GetRefID())
return
}
var hook *webhooks.Ref
// Legacy, but would be nice to repurpose for resuming on reload
repoDir, repoFile, _ := getBacklogFilePath(runOpts.TmpDir, curHook)
backlogFile := filepath.Join(repoDir, repoFile)
if value, ok := Pending.Load(pendingID); ok {
hook = value.(*webhooks.Ref)
log.Printf("loaded from Pending state: %#v", hook)
} else {
// TODO add mutex (should not affect temp files)
_ = os.Remove(backlogFile + ".cur")
_ = os.Rename(backlogFile, backlogFile+".cur")
b, err := ioutil.ReadFile(backlogFile + ".cur")
if nil != err {
if !os.IsNotExist(err) {
log.Printf("[warn] could not read backlog file %s:\n%v", repoFile, err)
}
// doesn't exist => no backlog
log.Printf("[NO BACKLOG] no backlog for %s", repoFile)
return
}
hook = &webhooks.Ref{}
if err := json.Unmarshal(b, hook); nil != err {
log.Printf("[warn] could not parse backlog file %s:\n%v", repoFile, err)
return
}
hook = webhooks.New(*hook)
log.Printf("loaded from file: %#v", hook)
}
Pending.Delete(pendingID)
_ = os.Remove(backlogFile)
_ = os.Remove(backlogFile + ".cur")
env := os.Environ()
envs := getEnvs(runOpts.Addr, string(pendingID), runOpts.RepoList, hook)
envs = append(envs, "GIT_DEPLOY_JOB_ID="+string(pendingID))
scriptPath, _ := filepath.Abs(runOpts.ScriptsPath + "/deploy.sh")
args := []string{
"-i",
"--",
//strings.Join([]string{
scriptPath,
string(pendingID),
hook.RefName,
hook.RefType,
hook.Owner,
hook.Repo,
hook.HTTPSURL,
//}, " "),
}
args2 := append([]string{"[" + string(hook.GetRefID()) + "]", "bash"}, args...)
fmt.Println(strings.Join(args2, " "))
cmd := exec.Command("bash", args...)
cmd.Env = append(env, envs...)
now := time.Now()
j := &Job{
StartedAt: now,
Cmd: cmd,
GitRef: hook,
Logs: []Log{},
Promote: false,
}
// TODO jobs.New()
// Sets cmd.Stdout and cmd.Stderr
txtFile := setOutput(runOpts.LogDir, j)
if err := cmd.Start(); nil != err {
log.Printf("gitdeploy exec error: %s\n", err)
return
}
Actives.Store(pendingID, j)
go func() {
log.Printf("Started job for %s", hook)
if err := cmd.Wait(); nil != err {
log.Printf("gitdeploy job for %s#%s exited with error: %v", hook.HTTPSURL, hook.RefName, err)
} else {
log.Printf("gitdeploy job for %s#%s finished\n", hook.HTTPSURL, hook.RefName)
}
if nil != txtFile {
_ = txtFile.Close()
}
// TODO move to deathRow only?
updateExitStatus(j)
// Switch ID to the more specific RevID
j.ID = string(j.GitRef.GetRevID())
// replace the text log with a json log
if jsonFile, err := getJobFile(runOpts.LogDir, j.GitRef, ".json"); nil != err {
// jsonFile.Name() should be the full path
log.Printf("[warn] could not create log file '%s': %v", runOpts.LogDir, err)
} else {
enc := json.NewEncoder(jsonFile)
enc.SetIndent("", " ")
if err := enc.Encode(j); nil != err {
log.Printf("[warn] could not encode json log '%s': %v", jsonFile.Name(), err)
} else {
logdir, logname, _ := getJobFilePath(runOpts.LogDir, j.GitRef, ".log")
_ = os.Remove(filepath.Join(logdir, logname))
}
_ = jsonFile.Close()
}
// TODO move to deathRow only?
j.Logs = []Log{}
// this will completely clear the finished job
deathRow <- pendingID
// debounces without saving in the backlog
// TODO move this into deathRow?
debacklog <- hook
}()
}

View File

@ -2,8 +2,10 @@ package jobs
import ( import (
"encoding/base64" "encoding/base64"
"encoding/json"
"errors" "errors"
"io" "io"
"io/ioutil"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
@ -16,12 +18,6 @@ import (
"git.rootprojects.org/root/gitdeploy/internal/webhooks" "git.rootprojects.org/root/gitdeploy/internal/webhooks"
) )
var initialized = false
var done = make(chan struct{})
// Promotions channel
var Promotions = make(chan Promotion)
// Pending is the map of backlog jobs // Pending is the map of backlog jobs
// map[webhooks.RefID]*webhooks.GitRef // map[webhooks.RefID]*webhooks.GitRef
var Pending sync.Map var Pending sync.Map
@ -34,13 +30,38 @@ var Actives sync.Map
// map[webhooks.RevID]*Job // map[webhooks.RevID]*Job
var Recents sync.Map var Recents sync.Map
// deathRow is for jobs to be killed // Job represents a job started by the git webhook
// and also the JSON we send back through the API about jobs
type Job struct {
// normal json
StartedAt *time.Time `json:"started_at,omitempty"` // empty when pending
ID string `json:"id"` // could be URLSafeRefID or URLSafeRevID
GitRef *webhooks.Ref `json:"ref"` // always present
PromoteTo string `json:"promote_to,omitempty"` // empty when deploy and test
Promote bool `json:"promote,omitempty"` // empty when deploy and test
EndedAt *time.Time `json:"ended_at,omitempty"` // empty when running
ExitCode *int `json:"exit_code,omitempty"` // empty when running
// full json
Logs []Log `json:"logs,omitempty"` // exist when requested
Report *Report `json:"report,omitempty"` // empty unless given
// internal only
cmd *exec.Cmd `json:"-"`
mux sync.Mutex `json:"-"`
}
// Report should have many items
type Report struct {
Name string `json:"name"`
Status string `json:"status,omitempty"`
Message string `json:"message,omitempty"`
Detail string `json:"detail,omitempty"`
Results []Report `json:"results,omitempty"`
}
var initialized = false
var done = make(chan struct{})
var deathRow = make(chan webhooks.RefID) var deathRow = make(chan webhooks.RefID)
// debounced is for jobs that are ready to run
var debounced = make(chan *webhooks.Ref) var debounced = make(chan *webhooks.Ref)
// debacklog is for debouncing without saving in the backlog
var debacklog = make(chan *webhooks.Ref) var debacklog = make(chan *webhooks.Ref)
// Start starts the job loop, channels, and cleanup routines // Start starts the job loop, channels, and cleanup routines
@ -50,7 +71,7 @@ func Start(runOpts *options.ServerConfig) {
// Run starts the job loop and waits for it to be stopped // Run starts the job loop and waits for it to be stopped
func Run(runOpts *options.ServerConfig) { func Run(runOpts *options.ServerConfig) {
log.Printf("Starting") log.Printf("[gitdeploy] Starting")
if initialized { if initialized {
panic(errors.New("should not double initialize 'jobs'")) panic(errors.New("should not double initialize 'jobs'"))
} }
@ -73,27 +94,26 @@ func Run(runOpts *options.ServerConfig) {
select { select {
case h := <-webhooks.Hooks: case h := <-webhooks.Hooks:
hook := webhooks.New(h) hook := webhooks.New(h)
log.Printf("Saving to backlog and debouncing") //log.Printf("[%s] debouncing...", hook.GetRefID())
saveBacklog(hook, runOpts) saveBacklog(hook, runOpts)
debounce(hook, runOpts) debounce(hook, runOpts)
case hook := <-debacklog: case hook := <-debacklog:
log.Printf("Pulling from backlog and debouncing") //log.Printf("[%s] checking for backlog...", hook.GetRefID())
debounce(hook, runOpts) debounce(hook, runOpts)
case hook := <-debounced: case hook := <-debounced:
log.Printf("Debounced by timer and running") //log.Printf("[%s] debounced!", hook.GetRefID())
run(hook, runOpts) run(hook, runOpts)
case activeID := <-deathRow: case activeID := <-deathRow:
// should !nokill (so... should kill job on the spot?) //log.Printf("[%s] done", activeID)
log.Printf("Removing after running exited, or being killed")
remove(activeID /*, false*/) remove(activeID /*, false*/)
case promotion := <-Promotions: case promotion := <-Promotions:
log.Printf("Promoting from %s to %s", promotion.GitRef.RefName, promotion.PromoteTo) log.Printf("[%s] promoting to %s", promotion.GitRef.GetRefID(), promotion.PromoteTo)
promote(webhooks.New(*promotion.GitRef), promotion.PromoteTo, runOpts) promote(webhooks.New(*promotion.GitRef), promotion.PromoteTo, runOpts)
case <-ticker.C: case <-ticker.C:
log.Printf("Running cleanup for expired, exited jobs") log.Printf("[gitdeploy] cleaning old jobs")
expire(runOpts) expire(runOpts)
case <-done: case <-done:
log.Printf("Stopping") log.Printf("[gitdeploy] stopping")
// TODO kill jobs // TODO kill jobs
ticker.Stop() ticker.Stop()
} }
@ -106,44 +126,6 @@ func Stop() {
initialized = false initialized = false
} }
// Promotion is a channel message
type Promotion struct {
PromoteTo string
GitRef *webhooks.Ref
}
// KillMsg describes which job to kill
type KillMsg struct {
JobID string `json:"job_id"`
Kill bool `json:"kill"`
}
// Job represents a job started by the git webhook
// and also the JSON we send back through the API about jobs
type Job struct {
// normal json
StartedAt time.Time `json:"started_at,omitempty"` // empty when pending
ID string `json:"id"` // could be URLSafeRefID or URLSafeRevID
ExitCode *int `json:"exit_code"` // empty when running
GitRef *webhooks.Ref `json:"ref"` // always present
Promote bool `json:"promote,omitempty"` // empty when deploy and test
EndedAt time.Time `json:"ended_at,omitempty"` // empty when running
// extra
Logs []Log `json:"logs"` // exist when requested
Report *Report `json:"report,omitempty"` // empty unless given
Cmd *exec.Cmd `json:"-"`
mux sync.Mutex `json:"-"`
}
// Report should have many items
type Report struct {
Name string `json:"name"`
Status string `json:"status,omitempty"`
Message string `json:"message,omitempty"`
Detail string `json:"detail,omitempty"`
Results []Report `json:"results,omitempty"`
}
// All returns all jobs, including active, recent, and (TODO) historical // All returns all jobs, including active, recent, and (TODO) historical
func All() []*Job { func All() []*Job {
jobsTimersMux.Lock() jobsTimersMux.Lock()
@ -170,8 +152,7 @@ func All() []*Job {
StartedAt: job.StartedAt, StartedAt: job.StartedAt,
ID: string(job.GitRef.GetURLSafeRefID()), ID: string(job.GitRef.GetURLSafeRefID()),
GitRef: job.GitRef, GitRef: job.GitRef,
Promote: job.Promote, //Promote: job.Promote,
EndedAt: job.EndedAt,
} }
if nil != job.ExitCode { if nil != job.ExitCode {
jobCopy.ExitCode = &(*job.ExitCode) jobCopy.ExitCode = &(*job.ExitCode)
@ -186,8 +167,8 @@ func All() []*Job {
StartedAt: job.StartedAt, StartedAt: job.StartedAt,
ID: string(job.GitRef.GetURLSafeRevID()), ID: string(job.GitRef.GetURLSafeRevID()),
GitRef: job.GitRef, GitRef: job.GitRef,
Promote: job.Promote,
EndedAt: job.EndedAt, EndedAt: job.EndedAt,
//Promote: job.Promote,
} }
if nil != job.ExitCode { if nil != job.ExitCode {
jobCopy.ExitCode = &(*job.ExitCode) jobCopy.ExitCode = &(*job.ExitCode)
@ -230,6 +211,277 @@ func Remove(gitID webhooks.URLSafeRefID /*, nokill bool*/) {
deathRow <- webhooks.RefID(activeID) deathRow <- webhooks.RefID(activeID)
} }
func getJobFilePath(baseDir string, hook *webhooks.Ref, suffix string) (string, string, error) {
baseDir, _ = filepath.Abs(baseDir)
fileTime := hook.Timestamp.UTC().Format(options.TimeFile)
fileName := fileTime + "." + hook.RefName + "." + hook.Rev[:7] + suffix // ".log" or ".json"
fileDir := filepath.Join(baseDir, hook.RepoID)
err := os.MkdirAll(fileDir, 0755)
return fileDir, fileName, err
}
func getJobFile(baseDir string, hook *webhooks.Ref, suffix string) (*os.File, error) {
repoDir, repoFile, err := getJobFilePath(baseDir, hook, suffix)
if nil != err {
//log.Printf("[warn] could not create log directory '%s': %v", repoDir, err)
return nil, err
}
path := filepath.Join(repoDir, repoFile)
return os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0644)
//return fmt.Sprintf("%s#%s", strings.ReplaceAll(hook.RepoID, "/", "-"), hook.RefName)
}
func openJobFile(baseDir string, hook *webhooks.Ref, suffix string) (*os.File, error) {
repoDir, repoFile, _ := getJobFilePath(baseDir, hook, suffix)
return os.Open(filepath.Join(repoDir, repoFile))
}
func setOutput(logDir string, job *Job) *os.File {
var f *os.File = nil
defer func() {
// TODO write to append-only log rather than keep in-memory
// (noting that we want to keep Stdout vs Stderr and timing)
cmd := job.cmd
wout := &outWriter{job: job}
werr := &errWriter{job: job}
if nil != f {
cmd.Stdout = io.MultiWriter(f, wout)
cmd.Stderr = io.MultiWriter(f, werr)
} else {
cmd.Stdout = io.MultiWriter(os.Stdout, wout)
cmd.Stderr = io.MultiWriter(os.Stderr, werr)
}
}()
if "" == logDir {
return nil
}
hook := job.GitRef
f, err := getJobFile(logDir, hook, ".log")
if nil != err {
// f.Name() should be the full path
log.Printf("[warn] could not create log file '%s': %v", logDir, err)
return nil
}
cwd, _ := os.Getwd()
log.Printf("[%s] log to ./%s", hook.GetRefID(), f.Name()[len(cwd)+1:])
return f
}
// Debounce puts a job in the queue, in time
func Debounce(hook webhooks.Ref) {
webhooks.Hooks <- hook
}
var jobsTimersMux sync.Mutex
var debounceTimers = make(map[webhooks.RefID]*time.Timer)
func debounce(hook *webhooks.Ref, runOpts *options.ServerConfig) {
jobsTimersMux.Lock()
defer jobsTimersMux.Unlock()
activeID := hook.GetRefID()
if _, ok := Actives.Load(activeID); ok {
//log.Printf("[%s] will run again after current job", hook.GetRefID())
return
}
refID := hook.GetRefID()
timer, ok := debounceTimers[refID]
if ok {
//log.Printf("[%s] replaced debounce timer", hook.GetRefID())
timer.Stop()
}
// this will not cause a mutual lock because it is async
debounceTimers[refID] = time.AfterFunc(runOpts.DebounceDelay, func() {
jobsTimersMux.Lock()
delete(debounceTimers, refID)
jobsTimersMux.Unlock()
debounced <- hook
})
}
func saveBacklog(hook *webhooks.Ref, runOpts *options.ServerConfig) {
pendingID := hook.GetRefID()
Pending.Store(pendingID, hook)
repoDir, repoFile, err := getBacklogFilePath(runOpts.TmpDir, hook)
if nil != err {
log.Printf("[warn] could not create backlog dir %s:\n%v", repoDir, err)
return
}
f, err := ioutil.TempFile(repoDir, "tmp-*")
if nil != err {
log.Printf("[warn] could not create backlog file %s:\n%v", f.Name(), err)
return
}
b, _ := json.MarshalIndent(hook, "", " ")
if _, err := f.Write(b); nil != err {
log.Printf("[warn] could not write backlog file %s:\n%v", f.Name(), err)
return
}
replace := false
backlogPath := filepath.Join(repoDir, repoFile)
if _, err := os.Stat(backlogPath); nil == err {
replace = true
_ = os.Remove(backlogPath)
}
if err := os.Rename(f.Name(), backlogPath); nil != err {
log.Printf("[warn] rename backlog json failed:\n%v", err)
return
}
if replace {
log.Printf("[%s] updated in queue", hook.GetRefID())
} else {
log.Printf("[%s] added to queue", hook.GetRefID())
}
}
func getBacklogFilePath(baseDir string, hook *webhooks.Ref) (string, string, error) {
baseDir, _ = filepath.Abs(baseDir)
fileName := hook.RefName + ".json"
fileDir := filepath.Join(baseDir, hook.RepoID)
err := os.MkdirAll(fileDir, 0755)
return fileDir, fileName, err
}
func run(curHook *webhooks.Ref, runOpts *options.ServerConfig) {
// because we want to lock the whole transaction all of the state
jobsTimersMux.Lock()
defer jobsTimersMux.Unlock()
pendingID := curHook.GetRefID()
if _, ok := Actives.Load(pendingID); ok {
log.Printf("[%s] already in progress", pendingID)
return
}
var hook *webhooks.Ref
// Legacy, but would be nice to repurpose for resuming on reload
repoDir, repoFile, _ := getBacklogFilePath(runOpts.TmpDir, curHook)
backlogFile := filepath.Join(repoDir, repoFile)
if value, ok := Pending.Load(pendingID); ok {
hook = value.(*webhooks.Ref)
} else {
// TODO add mutex (should not affect temp files)
_ = os.Remove(backlogFile + ".cur")
_ = os.Rename(backlogFile, backlogFile+".cur")
b, err := ioutil.ReadFile(backlogFile + ".cur")
if nil != err {
if !os.IsNotExist(err) {
log.Printf("[warn] could not read backlog file %s:\n%v", backlogFile, err)
return
}
// doesn't exist => no backlog
log.Printf("[%s] no backlog", pendingID)
return
}
hook = &webhooks.Ref{}
if err := json.Unmarshal(b, hook); nil != err {
log.Printf("[warn] could not parse backlog %s:\n%v", backlogFile, err)
return
}
hook = webhooks.New(*hook)
}
Pending.Delete(pendingID)
_ = os.Remove(backlogFile)
_ = os.Remove(backlogFile + ".cur")
env := os.Environ()
envs := getEnvs(runOpts.Addr, string(pendingID), runOpts.RepoList, hook)
envs = append(envs, "GIT_DEPLOY_JOB_ID="+string(pendingID))
scriptPath, _ := filepath.Abs(runOpts.ScriptsPath + "/deploy.sh")
args := []string{"-i", "--", scriptPath}
log.Printf("[%s] bash %s %s %s", hook.GetRefID(), args[0], args[1], args[2])
cmd := exec.Command("bash", append(args, []string{
string(pendingID),
hook.RefName,
hook.RefType,
hook.Owner,
hook.Repo,
hook.HTTPSURL,
}...)...)
cmd.Env = append(env, envs...)
now := time.Now()
j := &Job{
StartedAt: &now,
cmd: cmd,
GitRef: hook,
Logs: []Log{},
Promote: false,
}
// TODO jobs.New()
// Sets cmd.Stdout and cmd.Stderr
txtFile := setOutput(runOpts.LogDir, j)
if err := cmd.Start(); nil != err {
log.Printf("[ERROR] failed to exec: %s\n", err)
return
}
Actives.Store(pendingID, j)
go func() {
//log.Printf("[%s] job started", pendingID)
if err := cmd.Wait(); nil != err {
log.Printf("[%s] exited with error: %v", pendingID, err)
} else {
log.Printf("[%s] exited successfully", pendingID)
}
if nil != txtFile {
_ = txtFile.Close()
}
// TODO move to deathRow only?
updateExitStatus(j)
// Switch ID to the more specific RevID
j.ID = string(j.GitRef.GetRevID())
// replace the text log with a json log
if jsonFile, err := getJobFile(runOpts.LogDir, j.GitRef, ".json"); nil != err {
// jsonFile.Name() should be the full path
log.Printf("[warn] could not create log file '%s': %v", runOpts.LogDir, err)
} else {
enc := json.NewEncoder(jsonFile)
enc.SetIndent("", " ")
if err := enc.Encode(j); nil != err {
log.Printf("[warn] could not encode json log '%s': %v", jsonFile.Name(), err)
} else {
logdir, logname, _ := getJobFilePath(runOpts.LogDir, j.GitRef, ".log")
_ = os.Remove(filepath.Join(logdir, logname))
}
_ = jsonFile.Close()
}
// TODO move to deathRow only?
j.Logs = []Log{}
// this will completely clear the finished job
deathRow <- pendingID
// debounces without saving in the backlog
// TODO move this into deathRow?
debacklog <- hook
}()
}
func getEnvs(addr, activeID string, repoList string, hook *webhooks.Ref) []string { func getEnvs(addr, activeID string, repoList string, hook *webhooks.Ref) []string {
port := strings.Split(addr, ":")[1] port := strings.Split(addr, ":")[1]
@ -274,69 +526,6 @@ func getEnvs(addr, activeID string, repoList string, hook *webhooks.Ref) []strin
return envs return envs
} }
func getJobFilePath(baseDir string, hook *webhooks.Ref, suffix string) (string, string, error) {
baseDir, _ = filepath.Abs(baseDir)
fileTime := hook.Timestamp.UTC().Format(options.TimeFile)
fileName := fileTime + "." + hook.RefName + "." + hook.Rev[:7] + suffix // ".log" or ".json"
fileDir := filepath.Join(baseDir, hook.RepoID)
err := os.MkdirAll(fileDir, 0755)
return fileDir, fileName, err
}
func getJobFile(baseDir string, hook *webhooks.Ref, suffix string) (*os.File, error) {
repoDir, repoFile, err := getJobFilePath(baseDir, hook, suffix)
if nil != err {
//log.Printf("[warn] could not create log directory '%s': %v", repoDir, err)
return nil, err
}
path := filepath.Join(repoDir, repoFile)
return os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0644)
//return fmt.Sprintf("%s#%s", strings.ReplaceAll(hook.RepoID, "/", "-"), hook.RefName)
}
func openJobFile(baseDir string, hook *webhooks.Ref, suffix string) (*os.File, error) {
repoDir, repoFile, _ := getJobFilePath(baseDir, hook, suffix)
return os.Open(filepath.Join(repoDir, repoFile))
}
func setOutput(logDir string, job *Job) *os.File {
var f *os.File = nil
defer func() {
// TODO write to append-only log rather than keep in-memory
// (noting that we want to keep Stdout vs Stderr and timing)
cmd := job.Cmd
wout := &outWriter{job: job}
werr := &outWriter{job: job}
if nil != f {
cmd.Stdout = io.MultiWriter(f, wout)
cmd.Stderr = io.MultiWriter(f, werr)
} else {
cmd.Stdout = io.MultiWriter(os.Stdout, wout)
cmd.Stderr = io.MultiWriter(os.Stderr, werr)
}
}()
if "" == logDir {
return nil
}
hook := job.GitRef
f, err := getJobFile(logDir, hook, ".log")
if nil != err {
// f.Name() should be the full path
log.Printf("[warn] could not create log file '%s': %v", logDir, err)
return nil
}
cwd, _ := os.Getwd()
log.Printf("["+hook.RepoID+"#"+hook.RefName+"] logging to '.%s'", f.Name()[len(cwd):])
return f
}
// Remove kills the job and moves it to recents // Remove kills the job and moves it to recents
func remove(activeID webhooks.RefID /*, nokill bool*/) { func remove(activeID webhooks.RefID /*, nokill bool*/) {
// Encapsulate the whole transaction // Encapsulate the whole transaction
@ -362,21 +551,22 @@ func remove(activeID webhooks.RefID /*, nokill bool*/) {
} }
func updateExitStatus(job *Job) { func updateExitStatus(job *Job) {
if nil == job.Cmd.ProcessState { if nil == job.cmd.ProcessState {
// is not yet finished // is not yet finished
if nil != job.Cmd.Process { if nil != job.cmd.Process {
// but definitely was started // but definitely was started
if err := job.Cmd.Process.Kill(); nil != err { if err := job.cmd.Process.Kill(); nil != err {
log.Printf("error killing job:\n%v", err) log.Printf("error killing job:\n%v", err)
} }
} }
} }
if nil != job.Cmd.ProcessState { if nil != job.cmd.ProcessState {
//*job.ExitCode = job.Cmd.ProcessState.ExitCode() //*job.ExitCode = job.cmd.ProcessState.ExitCode()
exitCode := job.Cmd.ProcessState.ExitCode() exitCode := job.cmd.ProcessState.ExitCode()
job.ExitCode = &exitCode job.ExitCode = &exitCode
} }
job.EndedAt = time.Now() now := time.Now()
job.EndedAt = &now
} }
func expire(runOpts *options.ServerConfig) { func expire(runOpts *options.ServerConfig) {

View File

@ -2,7 +2,6 @@ package jobs
import ( import (
"encoding/base64" "encoding/base64"
"fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
@ -90,7 +89,7 @@ func TestDebounce(t *testing.T) {
Repo: "repo", Repo: "repo",
}) })
t.Log("sleep so job can debounce and start") //t.Log("sleep so job can debounce and start")
time.Sleep(debounceDelay) time.Sleep(debounceDelay)
var jobMatch *Job var jobMatch *Job
@ -98,17 +97,17 @@ func TestDebounce(t *testing.T) {
for i := range all { for i := range all {
// WARN: lock value copied // WARN: lock value copied
j := all[i] j := all[i]
fmt.Printf("[TEST] A-Job[%d]: %s\n%#v\n", i, j.GitRef.Timestamp, *j.GitRef) //t.Logf("[TEST] A-Job[%d]: %s\n%#v\n", i, j.GitRef.Timestamp, *j.GitRef)
if t0.Equal(j.GitRef.Timestamp) || if t0.Equal(j.GitRef.Timestamp) ||
(t1.Equal(j.GitRef.Timestamp) && r1 == j.GitRef.Rev) || (t1.Equal(j.GitRef.Timestamp) && r1 == j.GitRef.Rev) ||
(t2.Equal(j.GitRef.Timestamp) && r2 == j.GitRef.Rev) { (t2.Equal(j.GitRef.Timestamp) && r2 == j.GitRef.Rev) {
t.Error(fmt.Errorf("should not find debounced jobs")) t.Errorf("should not find debounced jobs")
t.Fail() t.Fail()
return return
} }
if t3.Equal(j.GitRef.Timestamp) && r3 == j.GitRef.Rev { if t3.Equal(j.GitRef.Timestamp) && r3 == j.GitRef.Rev {
if nil != jobMatch { if nil != jobMatch {
t.Error(fmt.Errorf("should find only one instance of the 1st long-standing job")) t.Errorf("should find only one instance of the 1st long-standing job")
t.Fail() t.Fail()
return return
} }
@ -116,12 +115,12 @@ func TestDebounce(t *testing.T) {
} }
} }
if nil == jobMatch { if nil == jobMatch {
t.Error(fmt.Errorf("should find the 1st long-standing job")) t.Errorf("should find the 1st long-standing job")
t.Fail() t.Fail()
return return
} }
t.Log("put another job on the queue while job is running") //t.Log("put another job on the queue while job is running")
// backlog debounce // backlog debounce
Debounce(webhooks.Ref{ Debounce(webhooks.Ref{
Timestamp: t4, Timestamp: t4,
@ -145,10 +144,10 @@ func TestDebounce(t *testing.T) {
Repo: "repo", Repo: "repo",
}) })
t.Log("sleep so 1st job can finish") //t.Log("sleep so 1st job can finish")
time.Sleep(jobDelay) time.Sleep(jobDelay)
time.Sleep(jobDelay) time.Sleep(jobDelay)
t.Log("sleep so backlog can debounce") //t.Log("sleep so backlog can debounce")
time.Sleep(debounceDelay) time.Sleep(debounceDelay)
//var j *Job //var j *Job
@ -156,15 +155,15 @@ func TestDebounce(t *testing.T) {
all = All() all = All()
for i := range all { for i := range all {
j := all[i] j := all[i]
fmt.Printf("[TEST] B-Job[%d]: %s\n%#v\n", i, j.GitRef.Timestamp, *j.GitRef) //t.Logf("[TEST] B-Job[%d]: %s\n%#v\n", i, j.GitRef.Timestamp, *j.GitRef)
if t4.Equal(j.GitRef.Timestamp) && r4 == j.GitRef.Rev { if t4.Equal(j.GitRef.Timestamp) && r4 == j.GitRef.Rev {
t.Error(fmt.Errorf("should not find debounced jobs")) t.Errorf("should not find debounced jobs")
t.Fail() t.Fail()
return return
} }
if t5.Equal(j.GitRef.Timestamp) && r5 == j.GitRef.Rev { if t5.Equal(j.GitRef.Timestamp) && r5 == j.GitRef.Rev {
if nil != jobMatch { if nil != jobMatch {
t.Error(fmt.Errorf("should find only one instance of the 2nd long-standing job")) t.Errorf("should find only one instance of the 2nd long-standing job")
t.Fail() t.Fail()
return return
} }
@ -172,15 +171,15 @@ func TestDebounce(t *testing.T) {
} }
} }
if nil == jobMatch { if nil == jobMatch {
t.Error(fmt.Errorf("should find the 2nd long-standing job: %s %s", t5, r5)) t.Errorf("should find the 2nd long-standing job: %s %s", t5, r5)
t.Fail() t.Fail()
return return
} }
t.Log("sleep so 2nd job can finish") //t.Log("sleep so 2nd job can finish")
time.Sleep(jobDelay) time.Sleep(jobDelay)
t.Log("sleep to ensure no more backlogs exist") //t.Log("sleep to ensure no more backlogs exist")
time.Sleep(jobDelay) time.Sleep(jobDelay)
time.Sleep(debounceDelay) time.Sleep(debounceDelay)
time.Sleep(debounceDelay) time.Sleep(debounceDelay)
@ -226,7 +225,7 @@ func TestRecents(t *testing.T) {
} }
Debounce(hook) Debounce(hook)
t.Log("sleep so job can debounce and start") //t.Log("sleep so job can debounce and start")
time.Sleep(debounceDelay) time.Sleep(debounceDelay)
time.Sleep(jobDelay) time.Sleep(jobDelay)
@ -247,7 +246,7 @@ func TestRecents(t *testing.T) {
} }
if len(j.Logs) < 3 { if len(j.Logs) < 3 {
t.Errorf("should have logs from test deploy script") t.Errorf("should have logs from test deploy script: %#v", j.Logs)
t.Fail() t.Fail()
return return
} }
@ -258,9 +257,9 @@ func TestRecents(t *testing.T) {
return return
} }
t.Logf("[DEBUG] Report:\n%#v", j.Report) //t.Logf("[DEBUG] Report:\n%#v", j.Report)
t.Logf("Logs:\n%v", err) //t.Logf("Logs:\n%v", err)
//Stop() //Stop()
} }

View File

@ -62,6 +62,10 @@ func WalkLogs(runOpts *options.ServerConfig) ([]*Job, error) {
// don't keep all the logs in memory // don't keep all the logs in memory
j.Logs = []Log{} j.Logs = []Log{}
j.ID = string(j.GitRef.GetRevID()) j.ID = string(j.GitRef.GetRevID())
if nil == j.EndedAt {
now := time.Now()
j.EndedAt = &now
}
oldJobs = append(oldJobs, j) oldJobs = append(oldJobs, j)
} }
} }
@ -75,9 +79,11 @@ func WalkLogs(runOpts *options.ServerConfig) ([]*Job, error) {
RefName: rev[1], RefName: rev[1],
Rev: rev[2], Rev: rev[2],
} }
now := time.Now()
oldJobs = append(oldJobs, &Job{ oldJobs = append(oldJobs, &Job{
ID: string(hook.GetRevID()), ID: string(hook.GetRevID()),
GitRef: hook, GitRef: hook,
EndedAt: &now,
}) })
} }
} }
@ -85,7 +91,7 @@ func WalkLogs(runOpts *options.ServerConfig) ([]*Job, error) {
// ExpiredLogAge can be 0 for testing, // ExpiredLogAge can be 0 for testing,
// even when StaleLogAge is > 0 // even when StaleLogAge is > 0
if age >= runOpts.ExpiredLogAge { if age >= runOpts.ExpiredLogAge {
log.Printf("[info] remove log file: %s", logpath) log.Printf("[gitdeploy] remove %s", logpath)
os.Remove(logpath) os.Remove(logpath)
} }
@ -111,11 +117,10 @@ func LoadLogs(runOpts *options.ServerConfig, safeID webhooks.URLSafeGitID) (*Job
var f *os.File = nil var f *os.File = nil
if value, ok := Actives.Load(refID); ok { if value, ok := Actives.Load(refID); ok {
j := value.(*Job) j := value.(*Job)
f, err = openJobFile(runOpts.LogDir, j.GitRef, ".json") return j, nil
if nil != err { }
return nil, err
} if value, ok := Recents.Load(revID); ok {
} else if value, ok := Recents.Load(revID); ok {
j := value.(*Job) j := value.(*Job)
f, err = openJobFile(runOpts.LogDir, j.GitRef, ".json") f, err = openJobFile(runOpts.LogDir, j.GitRef, ".json")
if nil != err { if nil != err {

View File

@ -10,6 +10,15 @@ import (
"git.rootprojects.org/root/gitdeploy/internal/webhooks" "git.rootprojects.org/root/gitdeploy/internal/webhooks"
) )
// Promotions channel
var Promotions = make(chan Promotion)
// Promotion is a channel message
type Promotion struct {
PromoteTo string
GitRef *webhooks.Ref
}
// Promote will run the promote script // Promote will run the promote script
func Promote(msg webhooks.Ref, promoteTo string) { func Promote(msg webhooks.Ref, promoteTo string) {
Promotions <- Promotion{ Promotions <- Promotion{
@ -61,18 +70,32 @@ func promote(hook *webhooks.Ref, promoteTo string, runOpts *options.ServerConfig
return return
} }
now := time.Now() t := time.Now()
now := &t
promoteID := hook.RepoID + "#" + hook.RefName + ".." + promoteTo
Actives.Store(jobID1, &Job{ Actives.Store(jobID1, &Job{
StartedAt: now, StartedAt: now,
Cmd: cmd, ID: promoteID,
GitRef: hook, GitRef: hook,
Promote: true, PromoteTo: promoteTo,
Promote: true, // deprecated
cmd: cmd,
}) })
Actives.Store(jobID2, &Job{ Actives.Store(jobID2, &Job{
StartedAt: now, StartedAt: now,
Cmd: cmd, ID: promoteID,
GitRef: hook, GitRef: hook,
Promote: true, PromoteTo: promoteTo,
Promote: true, // deprecated
cmd: cmd,
})
Actives.Store(promoteID, &Job{
StartedAt: now,
ID: promoteID,
GitRef: hook,
PromoteTo: promoteTo,
Promote: true, // deprecated
cmd: cmd,
}) })
go func() { go func() {
@ -80,6 +103,7 @@ func promote(hook *webhooks.Ref, promoteTo string, runOpts *options.ServerConfig
_ = cmd.Wait() _ = cmd.Wait()
deathRow <- jobID1 deathRow <- jobID1
deathRow <- jobID2 deathRow <- jobID2
deathRow <- webhooks.RefID(promoteID)
log.Printf("gitdeploy promote for %s#%s finished\n", hook.HTTPSURL, hook.RefName) log.Printf("gitdeploy promote for %s#%s finished\n", hook.HTTPSURL, hook.RefName)
// TODO check for backlog // TODO check for backlog
}() }()

View File

@ -171,10 +171,10 @@ func main() {
log.Printf("TEMP_DIR=%s", runOpts.TmpDir) log.Printf("TEMP_DIR=%s", runOpts.TmpDir)
} }
if 0 == runOpts.DebounceDelay { if 0 == runOpts.DebounceDelay {
runOpts.DebounceDelay = 2 * time.Second runOpts.DebounceDelay = 5 * time.Second
} }
if 0 == runOpts.StaleJobAge { if 0 == runOpts.StaleJobAge {
runOpts.StaleJobAge = 30 * time.Minute runOpts.StaleJobAge = 3 * 24 * time.Hour
} }
if 0 == runOpts.StaleLogAge { if 0 == runOpts.StaleLogAge {
runOpts.StaleLogAge = 15 * 24 * time.Hour runOpts.StaleLogAge = 15 * 24 * time.Hour