diff --git a/internal/api/api.go b/internal/api/api.go index faa26b1..4ed1055 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -43,8 +43,6 @@ func RouteStopped(r chi.Router, runOpts *options.ServerConfig) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // r.Body is always .Close()ed by Go's http server r.Body = http.MaxBytesReader(w, r.Body, options.DefaultMaxBodySize) - // TODO admin auth middleware - log.Printf("TODO: handle authentication") next.ServeHTTP(w, r) }) }) @@ -96,6 +94,9 @@ func RouteStopped(r chi.Router, runOpts *options.ServerConfig) { r.Get("/logs/{oldID}", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") + // TODO admin auth middleware + log.Printf("[TODO] handle AUTH (logs could be sensitive)") + oldID := webhooks.URLSafeGitID(chi.URLParam(r, "oldID")) // TODO add `since` j, err := jobs.LoadLogs(runOpts, oldID) @@ -149,7 +150,7 @@ func RouteStopped(r chi.Router, runOpts *options.ServerConfig) { r.Post("/jobs", func(w http.ResponseWriter, r *http.Request) { decoder := json.NewDecoder(r.Body) - msg := &jobs.KillMsg{} + msg := &KillMsg{} if err := decoder.Decode(msg); nil != err { log.Printf("kill job invalid json:\n%v", err) http.Error(w, "invalid json body", http.StatusBadRequest) @@ -272,3 +273,9 @@ type Repo struct { CloneURL string `json:"clone_url"` Promotions []string `json:"_promotions"` } + +// KillMsg describes which job to kill +type KillMsg struct { + JobID string `json:"job_id"` + Kill bool `json:"kill"` +} diff --git a/internal/api/api_test.go b/internal/api/api_test.go index b8a40d1..766304d 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -1,6 +1,7 @@ package api import ( + "encoding/json" "fmt" "io/ioutil" "net/http" @@ -28,7 +29,7 @@ func init() { runOpts = &options.ServerConfig{ //Addr: "localhost:4483", ScriptsPath: "./testdata", - LogDir: "./test-logs/debounce", + LogDir: "./test-logs/api", TmpDir: tmpDir, DebounceDelay: 25 * time.Millisecond, StaleJobAge: 5 * time.Minute, @@ -69,16 +70,12 @@ func TestCallback(t *testing.T) { } // , _ := json.MarshallIndent(&hook , "", " ") jobs.Debounce(hook) - /* body := bytes.NewReader(hook) r := httptest.NewRequest("POST", "/api/local/webhook", body) - - //dec := json.NewDecoder(r.Body) - //dec.Decode() */ - t.Log("sleep so job can debounce, start, and finish") + // TODO use callback or chan chan to avoid sleeps? time.Sleep(debounceDelay) time.Sleep(jobDelay) @@ -92,16 +89,32 @@ func TestCallback(t *testing.T) { ) resp, err := http.Get(reqURL) if nil != err { - t.Logf("[DEBUG] Response Error: %v", err) + t.Errorf("HTTP response error: %s\n%#v", reqURL, err) return } - t.Logf("[DEBUG] Request URL: %s", reqURL) - t.Logf("[DEBUG] Response Headers: %d %#v", resp.StatusCode, resp.Header) - b, err := ioutil.ReadAll(resp.Body) - if nil != err { - t.Logf("[DEBUG] Response Error: %v", err) + job := &jobs.Job{} + dec := json.NewDecoder(resp.Body) + if err := dec.Decode(job); nil != err { + t.Errorf( + "response decode error: %d %s\n%#v\n%#v", + resp.StatusCode, reqURL, resp.Header, err, + ) + return + } + + if len(job.Logs) < 3 { + t.Errorf("too few logs: %s\n%#v", reqURL, job) + return + } + + if nil == job.Report || len(job.Report.Results) < 1 { + t.Errorf("too few results: %s\n%#v", reqURL, job) + return + } + + if nil == job.ExitCode || 0 != *job.ExitCode { + t.Errorf("non-zero exit code: %s\n%#v", reqURL, job) return } - t.Logf("[DEBUG] Response Body: %v", string(b)) } diff --git a/internal/jobs/backlog.go b/internal/jobs/backlog.go deleted file mode 100644 index d38f7d8..0000000 --- a/internal/jobs/backlog.go +++ /dev/null @@ -1,233 +0,0 @@ -package jobs - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "os" - "os/exec" - "path/filepath" - "strings" - "sync" - "time" - - "git.rootprojects.org/root/gitdeploy/internal/log" - "git.rootprojects.org/root/gitdeploy/internal/options" - "git.rootprojects.org/root/gitdeploy/internal/webhooks" -) - -// Debounce puts a job in the queue, in time -func Debounce(hook webhooks.Ref) { - webhooks.Hooks <- hook -} - -var jobsTimersMux sync.Mutex -var debounceTimers = make(map[webhooks.RefID]*time.Timer) - -func debounce(hook *webhooks.Ref, runOpts *options.ServerConfig) { - jobsTimersMux.Lock() - defer jobsTimersMux.Unlock() - - activeID := hook.GetRefID() - if _, ok := Actives.Load(activeID); ok { - log.Printf("Job in progress, not debouncing %s", hook) - return - } - - refID := hook.GetRefID() - timer, ok := debounceTimers[refID] - if ok { - log.Printf("Replacing previous debounce timer for %s", hook) - timer.Stop() - } - // this will not cause a mutual lock because it is async - debounceTimers[refID] = time.AfterFunc(runOpts.DebounceDelay, func() { - jobsTimersMux.Lock() - delete(debounceTimers, refID) - jobsTimersMux.Unlock() - - debounced <- hook - }) -} - -func getBacklogFilePath(baseDir string, hook *webhooks.Ref) (string, string, error) { - baseDir, _ = filepath.Abs(baseDir) - fileName := hook.RefName + ".json" - fileDir := filepath.Join(baseDir, hook.RepoID) - - err := os.MkdirAll(fileDir, 0755) - - return fileDir, fileName, err -} - -func saveBacklog(hook *webhooks.Ref, runOpts *options.ServerConfig) { - pendingID := hook.GetRefID() - Pending.Store(pendingID, hook) - - repoDir, repoFile, err := getBacklogFilePath(runOpts.TmpDir, hook) - if nil != err { - log.Printf("[WARN] could not create backlog dir %s:\n%v", repoDir, err) - return - } - - f, err := ioutil.TempFile(repoDir, "tmp-*") - if nil != err { - log.Printf("[WARN] could not create backlog file %s:\n%v", f.Name(), err) - return - } - - b, _ := json.MarshalIndent(hook, "", " ") - if _, err := f.Write(b); nil != err { - log.Printf("[WARN] could not write backlog file %s:\n%v", f.Name(), err) - return - } - - replace := false - backlogPath := filepath.Join(repoDir, repoFile) - if _, err := os.Stat(backlogPath); nil == err { - replace = true - _ = os.Remove(backlogPath) - } - if err := os.Rename(f.Name(), backlogPath); nil != err { - log.Printf("[WARN] rename backlog json failed:\n%v", err) - return - } - - if replace { - log.Printf("[backlog] replace backlog for %s", hook.GetRefID()) - } else { - log.Printf("[backlog] create backlog for %s", hook.GetRefID()) - } -} - -func run(curHook *webhooks.Ref, runOpts *options.ServerConfig) { - // because we want to lock the whole transaction all of the state - jobsTimersMux.Lock() - defer jobsTimersMux.Unlock() - - pendingID := curHook.GetRefID() - if _, ok := Actives.Load(pendingID); ok { - log.Printf("Job already in progress: %s", curHook.GetRefID()) - return - } - - var hook *webhooks.Ref - // Legacy, but would be nice to repurpose for resuming on reload - repoDir, repoFile, _ := getBacklogFilePath(runOpts.TmpDir, curHook) - backlogFile := filepath.Join(repoDir, repoFile) - if value, ok := Pending.Load(pendingID); ok { - hook = value.(*webhooks.Ref) - log.Printf("loaded from Pending state: %#v", hook) - } else { - // TODO add mutex (should not affect temp files) - _ = os.Remove(backlogFile + ".cur") - _ = os.Rename(backlogFile, backlogFile+".cur") - b, err := ioutil.ReadFile(backlogFile + ".cur") - if nil != err { - if !os.IsNotExist(err) { - log.Printf("[warn] could not read backlog file %s:\n%v", repoFile, err) - } - // doesn't exist => no backlog - log.Printf("[NO BACKLOG] no backlog for %s", repoFile) - return - } - - hook = &webhooks.Ref{} - if err := json.Unmarshal(b, hook); nil != err { - log.Printf("[warn] could not parse backlog file %s:\n%v", repoFile, err) - return - } - hook = webhooks.New(*hook) - log.Printf("loaded from file: %#v", hook) - } - - Pending.Delete(pendingID) - _ = os.Remove(backlogFile) - _ = os.Remove(backlogFile + ".cur") - - env := os.Environ() - envs := getEnvs(runOpts.Addr, string(pendingID), runOpts.RepoList, hook) - envs = append(envs, "GIT_DEPLOY_JOB_ID="+string(pendingID)) - - scriptPath, _ := filepath.Abs(runOpts.ScriptsPath + "/deploy.sh") - args := []string{ - "-i", - "--", - //strings.Join([]string{ - scriptPath, - string(pendingID), - hook.RefName, - hook.RefType, - hook.Owner, - hook.Repo, - hook.HTTPSURL, - //}, " "), - } - - args2 := append([]string{"[" + string(hook.GetRefID()) + "]", "bash"}, args...) - fmt.Println(strings.Join(args2, " ")) - cmd := exec.Command("bash", args...) - cmd.Env = append(env, envs...) - - now := time.Now() - j := &Job{ - StartedAt: now, - Cmd: cmd, - GitRef: hook, - Logs: []Log{}, - Promote: false, - } - // TODO jobs.New() - // Sets cmd.Stdout and cmd.Stderr - txtFile := setOutput(runOpts.LogDir, j) - - if err := cmd.Start(); nil != err { - log.Printf("gitdeploy exec error: %s\n", err) - return - } - - Actives.Store(pendingID, j) - - go func() { - log.Printf("Started job for %s", hook) - if err := cmd.Wait(); nil != err { - log.Printf("gitdeploy job for %s#%s exited with error: %v", hook.HTTPSURL, hook.RefName, err) - } else { - log.Printf("gitdeploy job for %s#%s finished\n", hook.HTTPSURL, hook.RefName) - } - if nil != txtFile { - _ = txtFile.Close() - } - - // TODO move to deathRow only? - updateExitStatus(j) - - // Switch ID to the more specific RevID - j.ID = string(j.GitRef.GetRevID()) - // replace the text log with a json log - if jsonFile, err := getJobFile(runOpts.LogDir, j.GitRef, ".json"); nil != err { - // jsonFile.Name() should be the full path - log.Printf("[warn] could not create log file '%s': %v", runOpts.LogDir, err) - } else { - enc := json.NewEncoder(jsonFile) - enc.SetIndent("", " ") - if err := enc.Encode(j); nil != err { - log.Printf("[warn] could not encode json log '%s': %v", jsonFile.Name(), err) - } else { - logdir, logname, _ := getJobFilePath(runOpts.LogDir, j.GitRef, ".log") - _ = os.Remove(filepath.Join(logdir, logname)) - } - _ = jsonFile.Close() - } - - // TODO move to deathRow only? - j.Logs = []Log{} - - // this will completely clear the finished job - deathRow <- pendingID - - // debounces without saving in the backlog - // TODO move this into deathRow? - debacklog <- hook - }() -} diff --git a/internal/jobs/job.go b/internal/jobs/job.go index 28266a0..59ff8bf 100644 --- a/internal/jobs/job.go +++ b/internal/jobs/job.go @@ -2,8 +2,10 @@ package jobs import ( "encoding/base64" + "encoding/json" "errors" "io" + "io/ioutil" "os" "os/exec" "path/filepath" @@ -16,12 +18,6 @@ import ( "git.rootprojects.org/root/gitdeploy/internal/webhooks" ) -var initialized = false -var done = make(chan struct{}) - -// Promotions channel -var Promotions = make(chan Promotion) - // Pending is the map of backlog jobs // map[webhooks.RefID]*webhooks.GitRef var Pending sync.Map @@ -34,13 +30,38 @@ var Actives sync.Map // map[webhooks.RevID]*Job var Recents sync.Map -// deathRow is for jobs to be killed +// Job represents a job started by the git webhook +// and also the JSON we send back through the API about jobs +type Job struct { + // normal json + StartedAt *time.Time `json:"started_at,omitempty"` // empty when pending + ID string `json:"id"` // could be URLSafeRefID or URLSafeRevID + GitRef *webhooks.Ref `json:"ref"` // always present + PromoteTo string `json:"promote_to,omitempty"` // empty when deploy and test + Promote bool `json:"promote,omitempty"` // empty when deploy and test + EndedAt *time.Time `json:"ended_at,omitempty"` // empty when running + ExitCode *int `json:"exit_code,omitempty"` // empty when running + // full json + Logs []Log `json:"logs,omitempty"` // exist when requested + Report *Report `json:"report,omitempty"` // empty unless given + // internal only + cmd *exec.Cmd `json:"-"` + mux sync.Mutex `json:"-"` +} + +// Report should have many items +type Report struct { + Name string `json:"name"` + Status string `json:"status,omitempty"` + Message string `json:"message,omitempty"` + Detail string `json:"detail,omitempty"` + Results []Report `json:"results,omitempty"` +} + +var initialized = false +var done = make(chan struct{}) var deathRow = make(chan webhooks.RefID) - -// debounced is for jobs that are ready to run var debounced = make(chan *webhooks.Ref) - -// debacklog is for debouncing without saving in the backlog var debacklog = make(chan *webhooks.Ref) // Start starts the job loop, channels, and cleanup routines @@ -50,7 +71,7 @@ func Start(runOpts *options.ServerConfig) { // Run starts the job loop and waits for it to be stopped func Run(runOpts *options.ServerConfig) { - log.Printf("Starting") + log.Printf("[gitdeploy] Starting") if initialized { panic(errors.New("should not double initialize 'jobs'")) } @@ -73,27 +94,26 @@ func Run(runOpts *options.ServerConfig) { select { case h := <-webhooks.Hooks: hook := webhooks.New(h) - log.Printf("Saving to backlog and debouncing") + //log.Printf("[%s] debouncing...", hook.GetRefID()) saveBacklog(hook, runOpts) debounce(hook, runOpts) case hook := <-debacklog: - log.Printf("Pulling from backlog and debouncing") + //log.Printf("[%s] checking for backlog...", hook.GetRefID()) debounce(hook, runOpts) case hook := <-debounced: - log.Printf("Debounced by timer and running") + //log.Printf("[%s] debounced!", hook.GetRefID()) run(hook, runOpts) case activeID := <-deathRow: - // should !nokill (so... should kill job on the spot?) - log.Printf("Removing after running exited, or being killed") + //log.Printf("[%s] done", activeID) remove(activeID /*, false*/) case promotion := <-Promotions: - log.Printf("Promoting from %s to %s", promotion.GitRef.RefName, promotion.PromoteTo) + log.Printf("[%s] promoting to %s", promotion.GitRef.GetRefID(), promotion.PromoteTo) promote(webhooks.New(*promotion.GitRef), promotion.PromoteTo, runOpts) case <-ticker.C: - log.Printf("Running cleanup for expired, exited jobs") + log.Printf("[gitdeploy] cleaning old jobs") expire(runOpts) case <-done: - log.Printf("Stopping") + log.Printf("[gitdeploy] stopping") // TODO kill jobs ticker.Stop() } @@ -106,44 +126,6 @@ func Stop() { initialized = false } -// Promotion is a channel message -type Promotion struct { - PromoteTo string - GitRef *webhooks.Ref -} - -// KillMsg describes which job to kill -type KillMsg struct { - JobID string `json:"job_id"` - Kill bool `json:"kill"` -} - -// Job represents a job started by the git webhook -// and also the JSON we send back through the API about jobs -type Job struct { - // normal json - StartedAt time.Time `json:"started_at,omitempty"` // empty when pending - ID string `json:"id"` // could be URLSafeRefID or URLSafeRevID - ExitCode *int `json:"exit_code"` // empty when running - GitRef *webhooks.Ref `json:"ref"` // always present - Promote bool `json:"promote,omitempty"` // empty when deploy and test - EndedAt time.Time `json:"ended_at,omitempty"` // empty when running - // extra - Logs []Log `json:"logs"` // exist when requested - Report *Report `json:"report,omitempty"` // empty unless given - Cmd *exec.Cmd `json:"-"` - mux sync.Mutex `json:"-"` -} - -// Report should have many items -type Report struct { - Name string `json:"name"` - Status string `json:"status,omitempty"` - Message string `json:"message,omitempty"` - Detail string `json:"detail,omitempty"` - Results []Report `json:"results,omitempty"` -} - // All returns all jobs, including active, recent, and (TODO) historical func All() []*Job { jobsTimersMux.Lock() @@ -170,8 +152,7 @@ func All() []*Job { StartedAt: job.StartedAt, ID: string(job.GitRef.GetURLSafeRefID()), GitRef: job.GitRef, - Promote: job.Promote, - EndedAt: job.EndedAt, + //Promote: job.Promote, } if nil != job.ExitCode { jobCopy.ExitCode = &(*job.ExitCode) @@ -186,8 +167,8 @@ func All() []*Job { StartedAt: job.StartedAt, ID: string(job.GitRef.GetURLSafeRevID()), GitRef: job.GitRef, - Promote: job.Promote, EndedAt: job.EndedAt, + //Promote: job.Promote, } if nil != job.ExitCode { jobCopy.ExitCode = &(*job.ExitCode) @@ -230,6 +211,277 @@ func Remove(gitID webhooks.URLSafeRefID /*, nokill bool*/) { deathRow <- webhooks.RefID(activeID) } +func getJobFilePath(baseDir string, hook *webhooks.Ref, suffix string) (string, string, error) { + baseDir, _ = filepath.Abs(baseDir) + fileTime := hook.Timestamp.UTC().Format(options.TimeFile) + fileName := fileTime + "." + hook.RefName + "." + hook.Rev[:7] + suffix // ".log" or ".json" + fileDir := filepath.Join(baseDir, hook.RepoID) + + err := os.MkdirAll(fileDir, 0755) + + return fileDir, fileName, err +} + +func getJobFile(baseDir string, hook *webhooks.Ref, suffix string) (*os.File, error) { + repoDir, repoFile, err := getJobFilePath(baseDir, hook, suffix) + if nil != err { + //log.Printf("[warn] could not create log directory '%s': %v", repoDir, err) + return nil, err + } + + path := filepath.Join(repoDir, repoFile) + return os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0644) + //return fmt.Sprintf("%s#%s", strings.ReplaceAll(hook.RepoID, "/", "-"), hook.RefName) +} + +func openJobFile(baseDir string, hook *webhooks.Ref, suffix string) (*os.File, error) { + repoDir, repoFile, _ := getJobFilePath(baseDir, hook, suffix) + return os.Open(filepath.Join(repoDir, repoFile)) +} + +func setOutput(logDir string, job *Job) *os.File { + var f *os.File = nil + + defer func() { + // TODO write to append-only log rather than keep in-memory + // (noting that we want to keep Stdout vs Stderr and timing) + cmd := job.cmd + wout := &outWriter{job: job} + werr := &errWriter{job: job} + if nil != f { + cmd.Stdout = io.MultiWriter(f, wout) + cmd.Stderr = io.MultiWriter(f, werr) + } else { + cmd.Stdout = io.MultiWriter(os.Stdout, wout) + cmd.Stderr = io.MultiWriter(os.Stderr, werr) + } + }() + + if "" == logDir { + return nil + } + + hook := job.GitRef + f, err := getJobFile(logDir, hook, ".log") + if nil != err { + // f.Name() should be the full path + log.Printf("[warn] could not create log file '%s': %v", logDir, err) + return nil + } + + cwd, _ := os.Getwd() + log.Printf("[%s] log to ./%s", hook.GetRefID(), f.Name()[len(cwd)+1:]) + return f +} + +// Debounce puts a job in the queue, in time +func Debounce(hook webhooks.Ref) { + webhooks.Hooks <- hook +} + +var jobsTimersMux sync.Mutex +var debounceTimers = make(map[webhooks.RefID]*time.Timer) + +func debounce(hook *webhooks.Ref, runOpts *options.ServerConfig) { + jobsTimersMux.Lock() + defer jobsTimersMux.Unlock() + + activeID := hook.GetRefID() + if _, ok := Actives.Load(activeID); ok { + //log.Printf("[%s] will run again after current job", hook.GetRefID()) + return + } + + refID := hook.GetRefID() + timer, ok := debounceTimers[refID] + if ok { + //log.Printf("[%s] replaced debounce timer", hook.GetRefID()) + timer.Stop() + } + // this will not cause a mutual lock because it is async + debounceTimers[refID] = time.AfterFunc(runOpts.DebounceDelay, func() { + jobsTimersMux.Lock() + delete(debounceTimers, refID) + jobsTimersMux.Unlock() + + debounced <- hook + }) +} + +func saveBacklog(hook *webhooks.Ref, runOpts *options.ServerConfig) { + pendingID := hook.GetRefID() + Pending.Store(pendingID, hook) + + repoDir, repoFile, err := getBacklogFilePath(runOpts.TmpDir, hook) + if nil != err { + log.Printf("[warn] could not create backlog dir %s:\n%v", repoDir, err) + return + } + f, err := ioutil.TempFile(repoDir, "tmp-*") + if nil != err { + log.Printf("[warn] could not create backlog file %s:\n%v", f.Name(), err) + return + } + + b, _ := json.MarshalIndent(hook, "", " ") + if _, err := f.Write(b); nil != err { + log.Printf("[warn] could not write backlog file %s:\n%v", f.Name(), err) + return + } + + replace := false + backlogPath := filepath.Join(repoDir, repoFile) + if _, err := os.Stat(backlogPath); nil == err { + replace = true + _ = os.Remove(backlogPath) + } + if err := os.Rename(f.Name(), backlogPath); nil != err { + log.Printf("[warn] rename backlog json failed:\n%v", err) + return + } + + if replace { + log.Printf("[%s] updated in queue", hook.GetRefID()) + } else { + log.Printf("[%s] added to queue", hook.GetRefID()) + } +} + +func getBacklogFilePath(baseDir string, hook *webhooks.Ref) (string, string, error) { + baseDir, _ = filepath.Abs(baseDir) + fileName := hook.RefName + ".json" + fileDir := filepath.Join(baseDir, hook.RepoID) + + err := os.MkdirAll(fileDir, 0755) + + return fileDir, fileName, err +} + +func run(curHook *webhooks.Ref, runOpts *options.ServerConfig) { + // because we want to lock the whole transaction all of the state + jobsTimersMux.Lock() + defer jobsTimersMux.Unlock() + + pendingID := curHook.GetRefID() + if _, ok := Actives.Load(pendingID); ok { + log.Printf("[%s] already in progress", pendingID) + return + } + + var hook *webhooks.Ref + // Legacy, but would be nice to repurpose for resuming on reload + repoDir, repoFile, _ := getBacklogFilePath(runOpts.TmpDir, curHook) + backlogFile := filepath.Join(repoDir, repoFile) + if value, ok := Pending.Load(pendingID); ok { + hook = value.(*webhooks.Ref) + } else { + // TODO add mutex (should not affect temp files) + _ = os.Remove(backlogFile + ".cur") + _ = os.Rename(backlogFile, backlogFile+".cur") + b, err := ioutil.ReadFile(backlogFile + ".cur") + if nil != err { + if !os.IsNotExist(err) { + log.Printf("[warn] could not read backlog file %s:\n%v", backlogFile, err) + return + } + // doesn't exist => no backlog + log.Printf("[%s] no backlog", pendingID) + return + } + + hook = &webhooks.Ref{} + if err := json.Unmarshal(b, hook); nil != err { + log.Printf("[warn] could not parse backlog %s:\n%v", backlogFile, err) + return + } + hook = webhooks.New(*hook) + } + + Pending.Delete(pendingID) + _ = os.Remove(backlogFile) + _ = os.Remove(backlogFile + ".cur") + + env := os.Environ() + envs := getEnvs(runOpts.Addr, string(pendingID), runOpts.RepoList, hook) + envs = append(envs, "GIT_DEPLOY_JOB_ID="+string(pendingID)) + + scriptPath, _ := filepath.Abs(runOpts.ScriptsPath + "/deploy.sh") + args := []string{"-i", "--", scriptPath} + + log.Printf("[%s] bash %s %s %s", hook.GetRefID(), args[0], args[1], args[2]) + cmd := exec.Command("bash", append(args, []string{ + string(pendingID), + hook.RefName, + hook.RefType, + hook.Owner, + hook.Repo, + hook.HTTPSURL, + }...)...) + cmd.Env = append(env, envs...) + + now := time.Now() + j := &Job{ + StartedAt: &now, + cmd: cmd, + GitRef: hook, + Logs: []Log{}, + Promote: false, + } + // TODO jobs.New() + // Sets cmd.Stdout and cmd.Stderr + txtFile := setOutput(runOpts.LogDir, j) + + if err := cmd.Start(); nil != err { + log.Printf("[ERROR] failed to exec: %s\n", err) + return + } + + Actives.Store(pendingID, j) + + go func() { + //log.Printf("[%s] job started", pendingID) + if err := cmd.Wait(); nil != err { + log.Printf("[%s] exited with error: %v", pendingID, err) + } else { + log.Printf("[%s] exited successfully", pendingID) + } + if nil != txtFile { + _ = txtFile.Close() + } + + // TODO move to deathRow only? + updateExitStatus(j) + + // Switch ID to the more specific RevID + j.ID = string(j.GitRef.GetRevID()) + // replace the text log with a json log + if jsonFile, err := getJobFile(runOpts.LogDir, j.GitRef, ".json"); nil != err { + // jsonFile.Name() should be the full path + log.Printf("[warn] could not create log file '%s': %v", runOpts.LogDir, err) + } else { + enc := json.NewEncoder(jsonFile) + enc.SetIndent("", " ") + if err := enc.Encode(j); nil != err { + log.Printf("[warn] could not encode json log '%s': %v", jsonFile.Name(), err) + } else { + logdir, logname, _ := getJobFilePath(runOpts.LogDir, j.GitRef, ".log") + _ = os.Remove(filepath.Join(logdir, logname)) + } + _ = jsonFile.Close() + } + + // TODO move to deathRow only? + j.Logs = []Log{} + + // this will completely clear the finished job + deathRow <- pendingID + + // debounces without saving in the backlog + // TODO move this into deathRow? + debacklog <- hook + }() +} + func getEnvs(addr, activeID string, repoList string, hook *webhooks.Ref) []string { port := strings.Split(addr, ":")[1] @@ -274,69 +526,6 @@ func getEnvs(addr, activeID string, repoList string, hook *webhooks.Ref) []strin return envs } -func getJobFilePath(baseDir string, hook *webhooks.Ref, suffix string) (string, string, error) { - baseDir, _ = filepath.Abs(baseDir) - fileTime := hook.Timestamp.UTC().Format(options.TimeFile) - fileName := fileTime + "." + hook.RefName + "." + hook.Rev[:7] + suffix // ".log" or ".json" - fileDir := filepath.Join(baseDir, hook.RepoID) - - err := os.MkdirAll(fileDir, 0755) - - return fileDir, fileName, err -} - -func getJobFile(baseDir string, hook *webhooks.Ref, suffix string) (*os.File, error) { - repoDir, repoFile, err := getJobFilePath(baseDir, hook, suffix) - if nil != err { - //log.Printf("[warn] could not create log directory '%s': %v", repoDir, err) - return nil, err - } - - path := filepath.Join(repoDir, repoFile) - return os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0644) - //return fmt.Sprintf("%s#%s", strings.ReplaceAll(hook.RepoID, "/", "-"), hook.RefName) -} - -func openJobFile(baseDir string, hook *webhooks.Ref, suffix string) (*os.File, error) { - repoDir, repoFile, _ := getJobFilePath(baseDir, hook, suffix) - return os.Open(filepath.Join(repoDir, repoFile)) -} - -func setOutput(logDir string, job *Job) *os.File { - var f *os.File = nil - - defer func() { - // TODO write to append-only log rather than keep in-memory - // (noting that we want to keep Stdout vs Stderr and timing) - cmd := job.Cmd - wout := &outWriter{job: job} - werr := &outWriter{job: job} - if nil != f { - cmd.Stdout = io.MultiWriter(f, wout) - cmd.Stderr = io.MultiWriter(f, werr) - } else { - cmd.Stdout = io.MultiWriter(os.Stdout, wout) - cmd.Stderr = io.MultiWriter(os.Stderr, werr) - } - }() - - if "" == logDir { - return nil - } - - hook := job.GitRef - f, err := getJobFile(logDir, hook, ".log") - if nil != err { - // f.Name() should be the full path - log.Printf("[warn] could not create log file '%s': %v", logDir, err) - return nil - } - - cwd, _ := os.Getwd() - log.Printf("["+hook.RepoID+"#"+hook.RefName+"] logging to '.%s'", f.Name()[len(cwd):]) - return f -} - // Remove kills the job and moves it to recents func remove(activeID webhooks.RefID /*, nokill bool*/) { // Encapsulate the whole transaction @@ -362,21 +551,22 @@ func remove(activeID webhooks.RefID /*, nokill bool*/) { } func updateExitStatus(job *Job) { - if nil == job.Cmd.ProcessState { + if nil == job.cmd.ProcessState { // is not yet finished - if nil != job.Cmd.Process { + if nil != job.cmd.Process { // but definitely was started - if err := job.Cmd.Process.Kill(); nil != err { + if err := job.cmd.Process.Kill(); nil != err { log.Printf("error killing job:\n%v", err) } } } - if nil != job.Cmd.ProcessState { - //*job.ExitCode = job.Cmd.ProcessState.ExitCode() - exitCode := job.Cmd.ProcessState.ExitCode() + if nil != job.cmd.ProcessState { + //*job.ExitCode = job.cmd.ProcessState.ExitCode() + exitCode := job.cmd.ProcessState.ExitCode() job.ExitCode = &exitCode } - job.EndedAt = time.Now() + now := time.Now() + job.EndedAt = &now } func expire(runOpts *options.ServerConfig) { diff --git a/internal/jobs/job_test.go b/internal/jobs/job_test.go index 617d513..5488114 100644 --- a/internal/jobs/job_test.go +++ b/internal/jobs/job_test.go @@ -2,7 +2,6 @@ package jobs import ( "encoding/base64" - "fmt" "io/ioutil" "os" "path/filepath" @@ -90,7 +89,7 @@ func TestDebounce(t *testing.T) { Repo: "repo", }) - t.Log("sleep so job can debounce and start") + //t.Log("sleep so job can debounce and start") time.Sleep(debounceDelay) var jobMatch *Job @@ -98,17 +97,17 @@ func TestDebounce(t *testing.T) { for i := range all { // WARN: lock value copied j := all[i] - fmt.Printf("[TEST] A-Job[%d]: %s\n%#v\n", i, j.GitRef.Timestamp, *j.GitRef) + //t.Logf("[TEST] A-Job[%d]: %s\n%#v\n", i, j.GitRef.Timestamp, *j.GitRef) if t0.Equal(j.GitRef.Timestamp) || (t1.Equal(j.GitRef.Timestamp) && r1 == j.GitRef.Rev) || (t2.Equal(j.GitRef.Timestamp) && r2 == j.GitRef.Rev) { - t.Error(fmt.Errorf("should not find debounced jobs")) + t.Errorf("should not find debounced jobs") t.Fail() return } if t3.Equal(j.GitRef.Timestamp) && r3 == j.GitRef.Rev { if nil != jobMatch { - t.Error(fmt.Errorf("should find only one instance of the 1st long-standing job")) + t.Errorf("should find only one instance of the 1st long-standing job") t.Fail() return } @@ -116,12 +115,12 @@ func TestDebounce(t *testing.T) { } } if nil == jobMatch { - t.Error(fmt.Errorf("should find the 1st long-standing job")) + t.Errorf("should find the 1st long-standing job") t.Fail() return } - t.Log("put another job on the queue while job is running") + //t.Log("put another job on the queue while job is running") // backlog debounce Debounce(webhooks.Ref{ Timestamp: t4, @@ -145,10 +144,10 @@ func TestDebounce(t *testing.T) { Repo: "repo", }) - t.Log("sleep so 1st job can finish") + //t.Log("sleep so 1st job can finish") time.Sleep(jobDelay) time.Sleep(jobDelay) - t.Log("sleep so backlog can debounce") + //t.Log("sleep so backlog can debounce") time.Sleep(debounceDelay) //var j *Job @@ -156,15 +155,15 @@ func TestDebounce(t *testing.T) { all = All() for i := range all { j := all[i] - fmt.Printf("[TEST] B-Job[%d]: %s\n%#v\n", i, j.GitRef.Timestamp, *j.GitRef) + //t.Logf("[TEST] B-Job[%d]: %s\n%#v\n", i, j.GitRef.Timestamp, *j.GitRef) if t4.Equal(j.GitRef.Timestamp) && r4 == j.GitRef.Rev { - t.Error(fmt.Errorf("should not find debounced jobs")) + t.Errorf("should not find debounced jobs") t.Fail() return } if t5.Equal(j.GitRef.Timestamp) && r5 == j.GitRef.Rev { if nil != jobMatch { - t.Error(fmt.Errorf("should find only one instance of the 2nd long-standing job")) + t.Errorf("should find only one instance of the 2nd long-standing job") t.Fail() return } @@ -172,15 +171,15 @@ func TestDebounce(t *testing.T) { } } if nil == jobMatch { - t.Error(fmt.Errorf("should find the 2nd long-standing job: %s %s", t5, r5)) + t.Errorf("should find the 2nd long-standing job: %s %s", t5, r5) t.Fail() return } - t.Log("sleep so 2nd job can finish") + //t.Log("sleep so 2nd job can finish") time.Sleep(jobDelay) - t.Log("sleep to ensure no more backlogs exist") + //t.Log("sleep to ensure no more backlogs exist") time.Sleep(jobDelay) time.Sleep(debounceDelay) time.Sleep(debounceDelay) @@ -226,7 +225,7 @@ func TestRecents(t *testing.T) { } Debounce(hook) - t.Log("sleep so job can debounce and start") + //t.Log("sleep so job can debounce and start") time.Sleep(debounceDelay) time.Sleep(jobDelay) @@ -247,7 +246,7 @@ func TestRecents(t *testing.T) { } if len(j.Logs) < 3 { - t.Errorf("should have logs from test deploy script") + t.Errorf("should have logs from test deploy script: %#v", j.Logs) t.Fail() return } @@ -258,9 +257,9 @@ func TestRecents(t *testing.T) { return } - t.Logf("[DEBUG] Report:\n%#v", j.Report) + //t.Logf("[DEBUG] Report:\n%#v", j.Report) - t.Logf("Logs:\n%v", err) + //t.Logf("Logs:\n%v", err) //Stop() } diff --git a/internal/jobs/logs.go b/internal/jobs/logs.go index 86e4f5e..32c3647 100644 --- a/internal/jobs/logs.go +++ b/internal/jobs/logs.go @@ -62,6 +62,10 @@ func WalkLogs(runOpts *options.ServerConfig) ([]*Job, error) { // don't keep all the logs in memory j.Logs = []Log{} j.ID = string(j.GitRef.GetRevID()) + if nil == j.EndedAt { + now := time.Now() + j.EndedAt = &now + } oldJobs = append(oldJobs, j) } } @@ -75,9 +79,11 @@ func WalkLogs(runOpts *options.ServerConfig) ([]*Job, error) { RefName: rev[1], Rev: rev[2], } + now := time.Now() oldJobs = append(oldJobs, &Job{ - ID: string(hook.GetRevID()), - GitRef: hook, + ID: string(hook.GetRevID()), + GitRef: hook, + EndedAt: &now, }) } } @@ -85,7 +91,7 @@ func WalkLogs(runOpts *options.ServerConfig) ([]*Job, error) { // ExpiredLogAge can be 0 for testing, // even when StaleLogAge is > 0 if age >= runOpts.ExpiredLogAge { - log.Printf("[info] remove log file: %s", logpath) + log.Printf("[gitdeploy] remove %s", logpath) os.Remove(logpath) } @@ -111,11 +117,10 @@ func LoadLogs(runOpts *options.ServerConfig, safeID webhooks.URLSafeGitID) (*Job var f *os.File = nil if value, ok := Actives.Load(refID); ok { j := value.(*Job) - f, err = openJobFile(runOpts.LogDir, j.GitRef, ".json") - if nil != err { - return nil, err - } - } else if value, ok := Recents.Load(revID); ok { + return j, nil + } + + if value, ok := Recents.Load(revID); ok { j := value.(*Job) f, err = openJobFile(runOpts.LogDir, j.GitRef, ".json") if nil != err { diff --git a/internal/jobs/promote.go b/internal/jobs/promote.go index d5df4be..e8bc0c2 100644 --- a/internal/jobs/promote.go +++ b/internal/jobs/promote.go @@ -10,6 +10,15 @@ import ( "git.rootprojects.org/root/gitdeploy/internal/webhooks" ) +// Promotions channel +var Promotions = make(chan Promotion) + +// Promotion is a channel message +type Promotion struct { + PromoteTo string + GitRef *webhooks.Ref +} + // Promote will run the promote script func Promote(msg webhooks.Ref, promoteTo string) { Promotions <- Promotion{ @@ -61,18 +70,32 @@ func promote(hook *webhooks.Ref, promoteTo string, runOpts *options.ServerConfig return } - now := time.Now() + t := time.Now() + now := &t + promoteID := hook.RepoID + "#" + hook.RefName + ".." + promoteTo Actives.Store(jobID1, &Job{ StartedAt: now, - Cmd: cmd, + ID: promoteID, GitRef: hook, - Promote: true, + PromoteTo: promoteTo, + Promote: true, // deprecated + cmd: cmd, }) Actives.Store(jobID2, &Job{ StartedAt: now, - Cmd: cmd, + ID: promoteID, GitRef: hook, - Promote: true, + PromoteTo: promoteTo, + Promote: true, // deprecated + cmd: cmd, + }) + Actives.Store(promoteID, &Job{ + StartedAt: now, + ID: promoteID, + GitRef: hook, + PromoteTo: promoteTo, + Promote: true, // deprecated + cmd: cmd, }) go func() { @@ -80,6 +103,7 @@ func promote(hook *webhooks.Ref, promoteTo string, runOpts *options.ServerConfig _ = cmd.Wait() deathRow <- jobID1 deathRow <- jobID2 + deathRow <- webhooks.RefID(promoteID) log.Printf("gitdeploy promote for %s#%s finished\n", hook.HTTPSURL, hook.RefName) // TODO check for backlog }() diff --git a/main.go b/main.go index 1047511..064449c 100644 --- a/main.go +++ b/main.go @@ -171,10 +171,10 @@ func main() { log.Printf("TEMP_DIR=%s", runOpts.TmpDir) } if 0 == runOpts.DebounceDelay { - runOpts.DebounceDelay = 2 * time.Second + runOpts.DebounceDelay = 5 * time.Second } if 0 == runOpts.StaleJobAge { - runOpts.StaleJobAge = 30 * time.Minute + runOpts.StaleJobAge = 3 * 24 * time.Hour } if 0 == runOpts.StaleLogAge { runOpts.StaleLogAge = 15 * 24 * time.Hour