From 70291c3bce8be6014a53a612822fd5d542a79b0e Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Sun, 21 Feb 2021 04:25:42 -0700 Subject: [PATCH] rewrite to debounce jobs with channels --- examples/dotenv | 3 + internal/api/api.go | 383 +++++------------------ internal/jobs/backlog.go | 231 ++++++++++++++ internal/jobs/hook.go | 1 + internal/jobs/job.go | 366 ++++++++++++++++++++++ internal/jobs/job_test.go | 260 +++++++++++++++ internal/jobs/logs.go | 175 +++++++++++ internal/jobs/promote.go | 86 +++++ internal/jobs/testdata/deploy.sh | 13 + internal/options/options.go | 32 +- internal/webhooks/bitbucket/bitbucket.go | 17 +- internal/webhooks/bitbucket/payload.go | 5 + internal/webhooks/gitea/gitea.go | 13 +- internal/webhooks/gitea/payload.go | 4 +- internal/webhooks/github/github.go | 31 +- internal/webhooks/webhooks.go | 101 +++++- main.go | 27 ++ 17 files changed, 1393 insertions(+), 355 deletions(-) create mode 100644 internal/jobs/backlog.go create mode 100644 internal/jobs/hook.go create mode 100644 internal/jobs/job.go create mode 100644 internal/jobs/job_test.go create mode 100644 internal/jobs/logs.go create mode 100644 internal/jobs/promote.go create mode 100644 internal/jobs/testdata/deploy.sh diff --git a/examples/dotenv b/examples/dotenv index e834b5c..c96ca4e 100644 --- a/examples/dotenv +++ b/examples/dotenv @@ -7,6 +7,9 @@ # List promotions in descending order PROMOTIONS="production staging master" +# Log dir +LOG_DIR=./logs + # Whether to trust X-Forward-* headers TRUST_PROXY=false diff --git a/internal/api/api.go b/internal/api/api.go index d6a0d50..111d809 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -1,17 +1,15 @@ package api import ( - "encoding/base64" "encoding/json" "fmt" - "io/ioutil" "net/http" "os" - "os/exec" "path/filepath" "strings" "time" + "git.rootprojects.org/root/gitdeploy/internal/jobs" "git.rootprojects.org/root/gitdeploy/internal/log" "git.rootprojects.org/root/gitdeploy/internal/options" "git.rootprojects.org/root/gitdeploy/internal/webhooks" @@ -19,40 +17,14 @@ import ( "github.com/go-chi/chi" ) -type job struct { - ID string // {HTTPSURL}#{BRANCH} - Cmd *exec.Cmd - GitRef webhooks.Ref - CreatedAt time.Time -} - -var jobs = make(map[string]*job) -var killers = make(chan string) -var tmpDir string - -// Job is the JSON we send back through the API about jobs -type Job struct { - JobID string `json:"job_id"` - CreatedAt time.Time `json:"created_at"` - GitRef webhooks.Ref `json:"ref"` - Promote bool `json:"promote,omitempty"` -} - -// KillMsg describes which job to kill -type KillMsg struct { - JobID string `json:"job_id"` - Kill bool `json:"kill"` -} - -func init() { - var err error - tmpDir, err = ioutil.TempDir("", "gitdeploy-*") - if nil != err { - fmt.Fprintf(os.Stderr, "could not create temporary directory") - os.Exit(1) - return - } - log.Printf("TEMP_DIR=%s", tmpDir) +// HookResponse is a GitRef but with a little extra as HTTP response +type HookResponse struct { + RepoID string `json:"repo_id"` + CreatedAt time.Time `json:"created_at"` + EndedAt time.Time `json:"ended_at"` + ExitCode *int `json:"exit_code,omitempty"` + Log string `json:"log"` + LogURL string `json:"log_url"` } // ReposResponse is the successful response to /api/repos @@ -71,18 +43,7 @@ type Repo struct { // Route will set up the API and such func Route(r chi.Router, runOpts *options.ServerConfig) { - go func() { - // TODO read from backlog - for { - //hook := webhooks.Accept() - select { - case hook := <-webhooks.Hooks: - runHook(hook, runOpts) - case jobID := <-killers: - remove(jobID, false) - } - } - }() + jobs.Start(runOpts) webhooks.RouteHandlers(r) @@ -141,22 +102,56 @@ func Route(r chi.Router, runOpts *options.ServerConfig) { w.Write(append(b, '\n')) }) - r.Get("/jobs", func(w http.ResponseWriter, r *http.Request) { - // again, possible race condition, but not one that much matters - jjobs := []Job{} - for jobID, job := range jobs { - jjobs = append(jjobs, Job{ - JobID: jobID, - GitRef: job.GitRef, - CreatedAt: job.CreatedAt, - }) + r.Get("/logs/{oldID}", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + oldID := webhooks.URLSafeGitID(chi.URLParam(r, "oldID")) + // TODO add `since` + j, err := jobs.LoadLogs(runOpts, oldID) + if nil != err { + w.WriteHeader(404) + w.Write([]byte( + `{ "success": false, "error": "job log does not exist" }` + "\n", + )) + return } + b, _ := json.Marshal(struct { - Success bool `json:"success"` - Jobs []Job `json:"jobs"` + Success bool `json:"success"` + jobs.Job }{ Success: true, - Jobs: jjobs, + Job: *j, + }) + w.Write(append(b, '\n')) + }) + + /* + r.Get("/logs/*", func(w http.ResponseWriter, r *http.Request) { + // TODO add ?since= + // TODO JSON logs + logPath := chi.URLParam(r, "*") + f, err := os.Open(filepath.Join(os.Getenv("LOG_DIR"), logPath)) + if nil != err { + w.WriteHeader(404) + w.Write([]byte( + `{ "success": false, "error": "job log does not exist" }` + "\n", + )) + return + } + io.Copy(w, f) + }) + */ + + r.Get("/jobs", func(w http.ResponseWriter, r *http.Request) { + all := jobs.All() + + b, _ := json.Marshal(struct { + Success bool `json:"success"` + Jobs []*jobs.Job `json:"jobs"` + }{ + Success: true, + Jobs: all, }) w.Write(append(b, '\n')) }) @@ -167,7 +162,7 @@ func Route(r chi.Router, runOpts *options.ServerConfig) { }() decoder := json.NewDecoder(r.Body) - msg := &KillMsg{} + msg := &jobs.KillMsg{} if err := decoder.Decode(msg); nil != err { log.Printf("kill job invalid json:\n%v", err) http.Error(w, "invalid json body", http.StatusBadRequest) @@ -175,16 +170,24 @@ func Route(r chi.Router, runOpts *options.ServerConfig) { } w.Header().Set("Content-Type", "application/json") - // possible race condition, but not the kind that should matter - if _, exists := jobs[msg.JobID]; !exists { - w.Write([]byte( - `{ "success": false, "error": "job does not exist" }` + "\n", - )) - return + if _, ok := jobs.Actives.Load(webhooks.URLSafeRefID(msg.JobID)); !ok { + if _, ok := jobs.Pending.Load(webhooks.URLSafeRefID(msg.JobID)); !ok { + w.Write([]byte( + `{ "success": false, "error": "job does not exist" }` + "\n", + )) + return + } } // killing a job *should* always succeed ...right? - killers <- msg.JobID + jobs.Remove(webhooks.URLSafeRefID(msg.JobID)) + w.Write([]byte( + `{ "success": true }` + "\n", + )) + }) + + r.Post("/jobs/{jobID}", func(w http.ResponseWriter, r *http.Request) { + // Attach additional logs / reports to running job w.Write([]byte( `{ "success": true }` + "\n", )) @@ -192,8 +195,8 @@ func Route(r chi.Router, runOpts *options.ServerConfig) { r.Post("/promote", func(w http.ResponseWriter, r *http.Request) { decoder := json.NewDecoder(r.Body) - msg := &webhooks.Ref{} - if err := decoder.Decode(msg); nil != err { + msg := webhooks.Ref{} + if err := decoder.Decode(&msg); nil != err { log.Printf("promotion job invalid json:\n%v", err) http.Error(w, "invalid json body", http.StatusBadRequest) return @@ -218,7 +221,7 @@ func Route(r chi.Router, runOpts *options.ServerConfig) { } promoteTo := runOpts.Promotions[n] - runPromote(*msg, promoteTo, runOpts) + jobs.Promote(msg, promoteTo) b, _ := json.Marshal(struct { Success bool `json:"success"` @@ -233,239 +236,3 @@ func Route(r chi.Router, runOpts *options.ServerConfig) { }) } - -func runHook(hook webhooks.Ref, runOpts *options.ServerConfig) { - fmt.Printf("%#v\n", hook) - - jobID := base64.RawURLEncoding.EncodeToString([]byte( - fmt.Sprintf("%s#%s", hook.HTTPSURL, hook.RefName), - )) - repoID := getRepoID(hook.HTTPSURL) - jobName := fmt.Sprintf("%s#%s", strings.ReplaceAll(repoID, "/", "-"), hook.RefName) - - env := os.Environ() - envs := getEnvs(jobID, runOpts.RepoList, hook) - envs = append(envs, "GIT_DEPLOY_JOB_ID="+jobID) - - args := []string{ - runOpts.ScriptsPath + "/deploy.sh", - jobID, - hook.RefName, - hook.RefType, - hook.Owner, - hook.Repo, - hook.HTTPSURL, - } - cmd := exec.Command("bash", args...) - cmd.Env = append(env, envs...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - - if _, exists := jobs[jobID]; exists { - saveBacklog(hook, jobName, jobID) - log.Printf("[runHook] gitdeploy job already started for %s#%s\n", hook.HTTPSURL, hook.RefName) - return - } - - if err := cmd.Start(); nil != err { - log.Printf("gitdeploy exec error: %s\n", err) - return - } - - jobs[jobID] = &job{ - ID: jobID, - Cmd: cmd, - GitRef: hook, - CreatedAt: time.Now(), - } - - go func() { - log.Printf("gitdeploy job for %s#%s started\n", hook.HTTPSURL, hook.RefName) - if err := cmd.Wait(); nil != err { - log.Printf("gitdeploy job for %s#%s exited with error: %v", hook.HTTPSURL, hook.RefName, err) - } else { - log.Printf("gitdeploy job for %s#%s finished\n", hook.HTTPSURL, hook.RefName) - } - remove(jobID, true) - restoreBacklog(jobName, jobID) - }() -} - -func remove(jobID string, nokill bool) { - job, exists := jobs[jobID] - if !exists { - return - } - delete(jobs, jobID) - - if nil != job.Cmd.ProcessState { - // is not yet finished - if nil != job.Cmd.Process { - // but definitely was started - err := job.Cmd.Process.Kill() - log.Printf("error killing job:\n%v", err) - } - } -} - -func runPromote(hook webhooks.Ref, promoteTo string, runOpts *options.ServerConfig) { - // TODO create an origin-branch tag with a timestamp? - jobID1 := base64.RawURLEncoding.EncodeToString([]byte( - fmt.Sprintf("%s#%s", hook.HTTPSURL, hook.RefName), - )) - jobID2 := base64.RawURLEncoding.EncodeToString([]byte( - fmt.Sprintf("%s#%s", hook.HTTPSURL, promoteTo), - )) - - args := []string{ - runOpts.ScriptsPath + "/promote.sh", - jobID1, - promoteTo, - hook.RefName, - hook.RefType, - hook.Owner, - hook.Repo, - hook.HTTPSURL, - } - cmd := exec.Command("bash", args...) - - env := os.Environ() - envs := getEnvs(jobID1, runOpts.RepoList, hook) - envs = append(envs, "GIT_DEPLOY_PROMOTE_TO="+promoteTo) - cmd.Env = append(env, envs...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - - if _, exists := jobs[jobID1]; exists { - // TODO put promote in backlog - log.Printf("[promote] gitdeploy job already started for %s#%s\n", hook.HTTPSURL, hook.RefName) - return - } - if _, exists := jobs[jobID2]; exists { - // TODO put promote in backlog - log.Printf("[promote] gitdeploy job already started for %s#%s\n", hook.HTTPSURL, promoteTo) - return - } - - if err := cmd.Start(); nil != err { - log.Printf("gitdeploy exec error: %s\n", err) - return - } - - jobs[jobID1] = &job{ - ID: jobID2, - Cmd: cmd, - GitRef: hook, - CreatedAt: time.Now(), - } - jobs[jobID2] = &job{ - ID: jobID2, - Cmd: cmd, - GitRef: hook, - CreatedAt: time.Now(), - } - - go func() { - log.Printf("gitdeploy promote for %s#%s started\n", hook.HTTPSURL, hook.RefName) - _ = cmd.Wait() - killers <- jobID1 - killers <- jobID2 - log.Printf("gitdeploy promote for %s#%s finished\n", hook.HTTPSURL, hook.RefName) - // TODO check for backlog - }() -} - -func saveBacklog(hook webhooks.Ref, jobName, jobID string) { - b, _ := json.MarshalIndent(hook, "", " ") - f, err := ioutil.TempFile(tmpDir, "tmp-*") - if nil != err { - log.Printf("[warn] could not create backlog file for %s:\n%v", jobID, err) - return - } - if _, err := f.Write(b); nil != err { - log.Printf("[warn] could not write backlog file for %s:\n%v", jobID, err) - return - } - - jobFile := filepath.Join(tmpDir, jobName) - _ = os.Remove(jobFile) - if err := os.Rename(f.Name(), jobFile); nil != err { - log.Printf("[warn] could not rename file %s => %s:\n%v", f.Name(), jobFile, err) - return - } - log.Printf("[BACKLOG] new backlog job for %s", jobName) -} - -func restoreBacklog(jobName, jobID string) { - jobFile := filepath.Join(tmpDir, jobName) - _ = os.Remove(jobFile + ".cur") - _ = os.Rename(jobFile, jobFile+".cur") - - b, err := ioutil.ReadFile(jobFile + ".cur") - if nil != err { - if !os.IsNotExist(err) { - log.Printf("[warn] could not create backlog file for %s:\n%v", jobID, err) - } - // doesn't exist => no backlog - log.Printf("[NO BACKLOG] no backlog items for %s", jobName) - return - } - - ref := webhooks.Ref{} - if err := json.Unmarshal(b, &ref); nil != err { - log.Printf("[warn] could not parse backlog file for %s:\n%v", jobID, err) - return - } - log.Printf("[BACKLOG] pop backlog for %s", jobName) - webhooks.Hook(ref) -} - -// https://git.example.com/example/project.git -// => git.example.com/example/project -func getRepoID(httpsURL string) string { - repoID := strings.TrimPrefix(httpsURL, "https://") - repoID = strings.TrimPrefix(repoID, "https://") - repoID = strings.TrimSuffix(repoID, ".git") - return repoID -} - -func getEnvs(jobID string, repoList string, hook webhooks.Ref) []string { - repoID := getRepoID(hook.HTTPSURL) - - envs := []string{ - "GIT_DEPLOY_JOB_ID=" + jobID, - "GIT_REF_NAME=" + hook.RefName, - "GIT_REF_TYPE=" + hook.RefType, - "GIT_REPO_ID=" + repoID, - "GIT_REPO_OWNER=" + hook.Owner, - "GIT_REPO_NAME=" + hook.Repo, - "GIT_CLONE_URL=" + hook.HTTPSURL, // deprecated - "GIT_HTTPS_URL=" + hook.HTTPSURL, - "GIT_SSH_URL=" + hook.SSHURL, - } - - // GIT_REPO_TRUSTED - // Set GIT_REPO_TRUSTED=TRUE if the repo matches exactly, or by pattern - repoID = strings.ToLower(repoID) - for _, repo := range strings.Fields(repoList) { - last := len(repo) - 1 - if len(repo) < 0 { - continue - } - repo = strings.ToLower(repo) - if '*' == repo[last] { - // Wildcard match a prefix, for example: - // github.com/whatever/* MATCHES github.com/whatever/foo - // github.com/whatever/ProjectX-* MATCHES github.com/whatever/ProjectX-Foo - if strings.HasPrefix(repoID, repo[:last]) { - envs = append(envs, "GIT_REPO_TRUSTED=true") - break - } - } else if repo == repoID { - envs = append(envs, "GIT_REPO_TRUSTED=true") - break - } - } - - return envs -} diff --git a/internal/jobs/backlog.go b/internal/jobs/backlog.go new file mode 100644 index 0000000..946ec05 --- /dev/null +++ b/internal/jobs/backlog.go @@ -0,0 +1,231 @@ +package jobs + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "time" + + "git.rootprojects.org/root/gitdeploy/internal/log" + "git.rootprojects.org/root/gitdeploy/internal/options" + "git.rootprojects.org/root/gitdeploy/internal/webhooks" +) + +// Debounce puts a job in the queue, in time +func Debounce(hook webhooks.Ref) { + webhooks.Hooks <- hook +} + +var jobsTimersMux sync.Mutex +var debounceTimers = make(map[webhooks.RefID]*time.Timer) + +func debounce(hook *webhooks.Ref, runOpts *options.ServerConfig) { + jobsTimersMux.Lock() + defer jobsTimersMux.Unlock() + + activeID := hook.GetRefID() + if _, ok := Actives.Load(activeID); ok { + log.Printf("Job in progress, not debouncing %s", hook) + return + } + + refID := hook.GetRefID() + timer, ok := debounceTimers[refID] + if ok { + log.Printf("Replacing previous debounce timer for %s", hook) + timer.Stop() + } + // this will not cause a mutual lock because it is async + debounceTimers[refID] = time.AfterFunc(runOpts.DebounceDelay, func() { + //fmt.Println("DEBUG [1] wait for jobs and timers") + jobsTimersMux.Lock() + delete(debounceTimers, refID) + jobsTimersMux.Unlock() + + debounced <- hook + //fmt.Println("DEBUG [1] release jobs and timers") + }) +} + +func getBacklogFilePath(baseDir string, hook *webhooks.Ref) (string, string, error) { + baseDir, _ = filepath.Abs(baseDir) + fileName := hook.RefName + ".json" + fileDir := filepath.Join(baseDir, hook.RepoID) + + err := os.MkdirAll(fileDir, 0755) + + return fileDir, fileName, err +} + +func saveBacklog(hook *webhooks.Ref, runOpts *options.ServerConfig) { + pendingID := hook.GetRefID() + Pending.Store(pendingID, hook) + + repoDir, repoFile, err := getBacklogFilePath(runOpts.TmpDir, hook) + if nil != err { + log.Printf("[WARN] could not create backlog dir %s:\n%v", repoDir, err) + return + } + + f, err := ioutil.TempFile(repoDir, "tmp-*") + if nil != err { + log.Printf("[WARN] could not create backlog file %s:\n%v", f.Name(), err) + return + } + + b, _ := json.MarshalIndent(hook, "", " ") + if _, err := f.Write(b); nil != err { + log.Printf("[WARN] could not write backlog file %s:\n%v", f.Name(), err) + return + } + + replace := false + backlogPath := filepath.Join(repoDir, repoFile) + if _, err := os.Stat(backlogPath); nil == err { + replace = true + _ = os.Remove(backlogPath) + } + if err := os.Rename(f.Name(), backlogPath); nil != err { + log.Printf("[WARN] rename backlog json failed:\n%v", err) + return + } + + if replace { + log.Printf("[backlog] replace backlog for %s", hook.GetRefID()) + } else { + log.Printf("[backlog] create backlog for %s", hook.GetRefID()) + } +} + +func run(curHook *webhooks.Ref, runOpts *options.ServerConfig) { + // because we want to lock the whole transaction all of the state + jobsTimersMux.Lock() + defer jobsTimersMux.Unlock() + + pendingID := curHook.GetRefID() + if _, ok := Actives.Load(pendingID); ok { + log.Printf("Job already in progress: %s", curHook.GetRefID()) + return + } + + var hook *webhooks.Ref + // Legacy, but would be nice to repurpose for resuming on reload + repoDir, repoFile, _ := getBacklogFilePath(runOpts.TmpDir, curHook) + backlogFile := filepath.Join(repoDir, repoFile) + if value, ok := Pending.Load(pendingID); ok { + hook = value.(*webhooks.Ref) + log.Printf("loaded from Pending state: %#v", hook) + } else { + // TODO add mutex (should not affect temp files) + _ = os.Remove(backlogFile + ".cur") + _ = os.Rename(backlogFile, backlogFile+".cur") + b, err := ioutil.ReadFile(backlogFile + ".cur") + if nil != err { + if !os.IsNotExist(err) { + log.Printf("[warn] could not read backlog file %s:\n%v", repoFile, err) + } + // doesn't exist => no backlog + log.Printf("[NO BACKLOG] no backlog for %s", repoFile) + return + } + + hook = &webhooks.Ref{} + if err := json.Unmarshal(b, hook); nil != err { + log.Printf("[warn] could not parse backlog file %s:\n%v", repoFile, err) + return + } + hook = webhooks.New(*hook) + log.Printf("loaded from file: %#v", hook) + } + + Pending.Delete(pendingID) + _ = os.Remove(backlogFile) + _ = os.Remove(backlogFile + ".cur") + + env := os.Environ() + envs := getEnvs(runOpts.Addr, string(pendingID), runOpts.RepoList, hook) + envs = append(envs, "GIT_DEPLOY_JOB_ID="+string(pendingID)) + + scriptPath, _ := filepath.Abs(runOpts.ScriptsPath + "/deploy.sh") + args := []string{ + "-i", + "--", + //strings.Join([]string{ + scriptPath, + string(pendingID), + hook.RefName, + hook.RefType, + hook.Owner, + hook.Repo, + hook.HTTPSURL, + //}, " "), + } + + args2 := append([]string{"[" + string(hook.GetRefID()) + "]", "bash"}, args...) + fmt.Println(strings.Join(args2, " ")) + cmd := exec.Command("bash", args...) + cmd.Env = append(env, envs...) + + now := time.Now() + j := &Job{ + StartedAt: now, + Cmd: cmd, + GitRef: hook, + Logs: []Log{}, + Promote: false, + } + // TODO jobs.New() + // Sets cmd.Stdout and cmd.Stderr + f := setOutput(runOpts.LogDir, j) + + if err := cmd.Start(); nil != err { + log.Printf("gitdeploy exec error: %s\n", err) + return + } + + Actives.Store(pendingID, j) + + go func() { + log.Printf("Started job for %s", hook) + if err := cmd.Wait(); nil != err { + log.Printf("gitdeploy job for %s#%s exited with error: %v", hook.HTTPSURL, hook.RefName, err) + } else { + log.Printf("gitdeploy job for %s#%s finished\n", hook.HTTPSURL, hook.RefName) + } + if nil != f { + _ = f.Close() + } + + // Switch ID to the more specific RevID + j.ID = string(j.GitRef.GetRevID()) + // replace the text log with a json log + if f, err := getJobFile(runOpts.LogDir, j.GitRef, ".json"); nil != err { + // f.Name() should be the full path + log.Printf("[warn] could not create log file '%s': %v", runOpts.LogDir, err) + } else { + enc := json.NewEncoder(f) + enc.SetIndent("", " ") + if err := enc.Encode(j); nil != err { + log.Printf("[warn] could not encode json log '%s': %v", f.Name(), err) + } else { + logdir, logname, _ := getJobFilePath(runOpts.LogDir, j.GitRef, ".log") + _ = os.Remove(filepath.Join(logdir, logname)) + } + _ = f.Close() + log.Printf("[DEBUG] wrote log to %s", f.Name()) + } + j.Logs = []Log{} + + // this will completely clear the finished job + deathRow <- pendingID + + // debounces without saving in the backlog + // TODO move this into deathRow? + debacklog <- hook + }() +} diff --git a/internal/jobs/hook.go b/internal/jobs/hook.go new file mode 100644 index 0000000..415ac45 --- /dev/null +++ b/internal/jobs/hook.go @@ -0,0 +1 @@ +package jobs diff --git a/internal/jobs/job.go b/internal/jobs/job.go new file mode 100644 index 0000000..9858537 --- /dev/null +++ b/internal/jobs/job.go @@ -0,0 +1,366 @@ +package jobs + +import ( + "encoding/base64" + "errors" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "time" + + "git.rootprojects.org/root/gitdeploy/internal/log" + "git.rootprojects.org/root/gitdeploy/internal/options" + "git.rootprojects.org/root/gitdeploy/internal/webhooks" +) + +var initialized = false +var done = make(chan struct{}) + +// Start starts the job loop, channels, and cleanup routines +func Start(runOpts *options.ServerConfig) { + go Run(runOpts) +} + +// Run starts the job loop and waits for it to be stopped +func Run(runOpts *options.ServerConfig) { + log.Printf("Starting") + if initialized { + panic(errors.New("should not double initialize 'jobs'")) + } + initialized = true + + // TODO load the backlog from disk too + + oldJobs, err := WalkLogs(runOpts) + if nil != err { + panic(err) + } + for i := range oldJobs { + job := oldJobs[i] + job.ID = string(job.GitRef.GetRevID()) + Recents.Store(job.GitRef.GetRevID(), job) + } + + ticker := time.NewTicker(runOpts.StaleJobAge / 2) + for { + select { + case h := <-webhooks.Hooks: + hook := webhooks.New(h) + log.Printf("Saving to backlog and debouncing") + saveBacklog(hook, runOpts) + debounce(hook, runOpts) + case hook := <-debacklog: + log.Printf("Pulling from backlog and debouncing") + debounce(hook, runOpts) + case hook := <-debounced: + log.Printf("Debounced by timer and running") + run(hook, runOpts) + case activeID := <-deathRow: + // should !nokill (so... should kill job on the spot?) + log.Printf("Removing after running exited, or being killed") + remove(activeID /*, false*/) + case promotion := <-Promotions: + log.Printf("Promoting from %s to %s", promotion.GitRef.RefName, promotion.PromoteTo) + promote(webhooks.New(*promotion.GitRef), promotion.PromoteTo, runOpts) + case <-ticker.C: + log.Printf("Running cleanup for expired, exited jobs") + expire(runOpts) + case <-done: + log.Printf("Stopping") + // TODO kill jobs + ticker.Stop() + } + } +} + +// Stop will cancel the job loop and its timers +func Stop() { + done <- struct{}{} + initialized = false +} + +// Promotions channel +var Promotions = make(chan Promotion) + +// Promotion is a channel message +type Promotion struct { + PromoteTo string + GitRef *webhooks.Ref +} + +// Pending is the map of backlog jobs +// map[webhooks.RefID]*webhooks.GitRef +var Pending sync.Map + +// Actives is the map of jobs +// map[webhooks.RefID]*Job +var Actives sync.Map + +// Recents are jobs that are dead, but recent +// map[webhooks.RevID]*Job +var Recents sync.Map + +// deathRow is for jobs to be killed +var deathRow = make(chan webhooks.RefID) + +// debounced is for jobs that are ready to run +var debounced = make(chan *webhooks.Ref) + +// debacklog is for debouncing without saving in the backlog +var debacklog = make(chan *webhooks.Ref) + +// KillMsg describes which job to kill +type KillMsg struct { + JobID string `json:"job_id"` + Kill bool `json:"kill"` +} + +// Job represents a job started by the git webhook +// and also the JSON we send back through the API about jobs +type Job struct { + // normal json + StartedAt time.Time `json:"started_at,omitempty"` // empty when pending + ID string `json:"id"` // could be URLSafeRefID or URLSafeRevID + ExitCode *int `json:"exit_code"` // empty when running + GitRef *webhooks.Ref `json:"ref"` // always present + Promote bool `json:"promote,omitempty"` // empty when deploy and test + EndedAt time.Time `json:"ended_at,omitempty"` // empty when running + // extra + Logs []Log `json:"logs"` // exist when requested + Report Report `json:"report,omitempty"` // empty unless given + Cmd *exec.Cmd `json:"-"` + mux sync.Mutex `json:"-"` +} + +// Report should have many items +type Report struct { + Results []string `json:"results"` +} + +// All returns all jobs, including active, recent, and (TODO) historical +func All() []*Job { + jobsTimersMux.Lock() + defer jobsTimersMux.Unlock() + + jobsCopy := []*Job{} + + Pending.Range(func(key, value interface{}) bool { + hook := value.(*webhooks.Ref) + jobCopy := &Job{ + //StartedAt: job.StartedAt, + ID: string(hook.GetURLSafeRefID()), + GitRef: hook, + //Promote: job.Promote, + //EndedAt: job.EndedAt, + } + jobsCopy = append(jobsCopy, jobCopy) + return true + }) + + Actives.Range(func(key, value interface{}) bool { + job := value.(*Job) + jobCopy := &Job{ + StartedAt: job.StartedAt, + ID: string(job.GitRef.GetURLSafeRefID()), + GitRef: job.GitRef, + Promote: job.Promote, + EndedAt: job.EndedAt, + } + if nil != job.ExitCode { + jobCopy.ExitCode = &(*job.ExitCode) + } + jobsCopy = append(jobsCopy, jobCopy) + return true + }) + + Recents.Range(func(key, value interface{}) bool { + job := value.(*Job) + jobCopy := &Job{ + StartedAt: job.StartedAt, + ID: string(job.GitRef.GetURLSafeRevID()), + GitRef: job.GitRef, + Promote: job.Promote, + EndedAt: job.EndedAt, + } + if nil != job.ExitCode { + jobCopy.ExitCode = &(*job.ExitCode) + } + jobsCopy = append(jobsCopy, jobCopy) + return true + }) + + return jobsCopy +} + +// Remove will put a job on death row +func Remove(gitID webhooks.URLSafeRefID /*, nokill bool*/) { + activeID, err := + base64.RawURLEncoding.DecodeString(string(gitID)) + if nil != err { + log.Printf("bad id: %s", activeID) + return + } + deathRow <- webhooks.RefID(activeID) +} + +func getEnvs(addr, activeID string, repoList string, hook *webhooks.Ref) []string { + + port := strings.Split(addr, ":")[1] + + envs := []string{ + "GIT_DEPLOY_JOB_ID=" + activeID, + "GIT_DEPLOY_TIMESTAMP=" + hook.Timestamp.Format(time.RFC3339), + "GIT_DEPLOY_CALLBACK_URL=" + "http://localhost:" + port + "/api/jobs/" + activeID, + "GIT_REF_NAME=" + hook.RefName, + "GIT_REF_TYPE=" + hook.RefType, + "GIT_REPO_ID=" + hook.RepoID, + "GIT_REPO_OWNER=" + hook.Owner, + "GIT_REPO_NAME=" + hook.Repo, + "GIT_CLONE_URL=" + hook.HTTPSURL, // deprecated + "GIT_HTTPS_URL=" + hook.HTTPSURL, + "GIT_SSH_URL=" + hook.SSHURL, + } + + // GIT_REPO_TRUSTED + // Set GIT_REPO_TRUSTED=TRUE if the repo matches exactly, or by pattern + repoID := strings.ToLower(hook.RepoID) + for _, repo := range strings.Fields(repoList) { + last := len(repo) - 1 + if len(repo) < 0 { + continue + } + repo = strings.ToLower(repo) + if '*' == repo[last] { + // Wildcard match a prefix, for example: + // github.com/whatever/* MATCHES github.com/whatever/foo + // github.com/whatever/ProjectX-* MATCHES github.com/whatever/ProjectX-Foo + if strings.HasPrefix(repoID, repo[:last]) { + envs = append(envs, "GIT_REPO_TRUSTED=true") + break + } + } else if repo == repoID { + envs = append(envs, "GIT_REPO_TRUSTED=true") + break + } + } + + return envs +} + +func getJobFilePath(baseDir string, hook *webhooks.Ref, suffix string) (string, string, error) { + baseDir, _ = filepath.Abs(baseDir) + fileTime := hook.Timestamp.UTC().Format(options.TimeFile) + fileName := fileTime + "." + hook.RefName + "." + hook.Rev[:7] + suffix // ".log" or ".json" + fileDir := filepath.Join(baseDir, hook.RepoID) + + err := os.MkdirAll(fileDir, 0755) + + return fileDir, fileName, err +} + +func getJobFile(baseDir string, hook *webhooks.Ref, suffix string) (*os.File, error) { + repoDir, repoFile, err := getJobFilePath(baseDir, hook, suffix) + if nil != err { + //log.Printf("[warn] could not create log directory '%s': %v", repoDir, err) + return nil, err + } + + path := filepath.Join(repoDir, repoFile) + return os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0644) + //return fmt.Sprintf("%s#%s", strings.ReplaceAll(hook.RepoID, "/", "-"), hook.RefName) +} + +func openJobFile(baseDir string, hook *webhooks.Ref, suffix string) (*os.File, error) { + repoDir, repoFile, _ := getJobFilePath(baseDir, hook, suffix) + return os.Open(filepath.Join(repoDir, repoFile)) +} + +func setOutput(logDir string, job *Job) *os.File { + var f *os.File = nil + + defer func() { + // TODO write to append-only log rather than keep in-memory + // (noting that we want to keep Stdout vs Stderr and timing) + cmd := job.Cmd + wout := &outWriter{job: job} + werr := &outWriter{job: job} + if nil != f { + cmd.Stdout = io.MultiWriter(f, wout) + cmd.Stderr = io.MultiWriter(f, werr) + } else { + cmd.Stdout = io.MultiWriter(os.Stdout, wout) + cmd.Stderr = io.MultiWriter(os.Stderr, werr) + } + }() + + if "" == logDir { + return nil + } + + hook := job.GitRef + f, err := getJobFile(logDir, hook, ".log") + if nil != err { + // f.Name() should be the full path + log.Printf("[warn] could not create log file '%s': %v", logDir, err) + return nil + } + + cwd, _ := os.Getwd() + log.Printf("["+hook.RepoID+"#"+hook.RefName+"] logging to '.%s'", f.Name()[len(cwd):]) + return f +} + +// Remove kills the job and moves it to recents +func remove(activeID webhooks.RefID /*, nokill bool*/) { + // Encapsulate the whole transaction + jobsTimersMux.Lock() + defer jobsTimersMux.Unlock() + + value, ok := Actives.Load(activeID) + if !ok { + return + } + job := value.(*Job) + Actives.Delete(activeID) + + // JSON should have been written to disk by this point + job.Logs = []Log{} + // transition to RevID for non-active, non-pending jobs + job.ID = string(job.GitRef.GetRevID()) + Recents.Store(job.GitRef.GetRevID(), job) + + if nil == job.Cmd.ProcessState { + // is not yet finished + if nil != job.Cmd.Process { + // but definitely was started + err := job.Cmd.Process.Kill() + log.Printf("error killing job:\n%v", err) + } + } + if nil != job.Cmd.ProcessState { + //*job.ExitCode = job.Cmd.ProcessState.ExitCode() + exitCode := job.Cmd.ProcessState.ExitCode() + job.ExitCode = &exitCode + } + job.EndedAt = time.Now() +} + +func expire(runOpts *options.ServerConfig) { + staleJobIDs := []webhooks.URLSafeRevID{} + + Recents.Range(func(key, value interface{}) bool { + revID := key.(webhooks.URLSafeRevID) + age := time.Now().Sub(value.(*Job).GitRef.Timestamp) + if age > runOpts.StaleJobAge { + staleJobIDs = append(staleJobIDs, revID) + } + return true + }) + + for _, revID := range staleJobIDs { + Recents.Delete(revID) + } +} diff --git a/internal/jobs/job_test.go b/internal/jobs/job_test.go new file mode 100644 index 0000000..6980b01 --- /dev/null +++ b/internal/jobs/job_test.go @@ -0,0 +1,260 @@ +package jobs + +import ( + "encoding/base64" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "git.rootprojects.org/root/gitdeploy/internal/options" + "git.rootprojects.org/root/gitdeploy/internal/webhooks" +) + +var debounceDelay time.Duration +var jobDelay time.Duration +var runOpts *options.ServerConfig +var logDir string + +var t0 = time.Now().UTC() + +func init() { + tmpDir, _ := ioutil.TempDir("", "gitdeploy-*") + runOpts = &options.ServerConfig{ + Addr: "localhost:4483", + ScriptsPath: "./testdata", + LogDir: "./test-logs/debounce", + TmpDir: tmpDir, + DebounceDelay: 25 * time.Millisecond, + StaleJobAge: 5 * time.Minute, + StaleLogAge: 5 * time.Minute, + ExpiredLogAge: 10 * time.Minute, + } + logDir, _ = filepath.Abs(runOpts.LogDir) + + os.Setenv("GIT_DEPLOY_TEST_WAIT", "0.1") + debounceDelay = 50 * time.Millisecond + jobDelay = 250 * time.Millisecond + + Start(runOpts) +} + +func TestDebounce(t *testing.T) { + t.Log("TestDebounce Log Dir: " + logDir) + + t1 := t0.Add(-100 * time.Second) + t2 := t0.Add(-90 * time.Second) + t3 := t0.Add(-80 * time.Second) + t4 := t0.Add(-70 * time.Second) + t5 := t0.Add(-60 * time.Second) + + r1 := "abcdef7890" + r2 := "1abcdef789" + r3 := "12abcdef78" + r4 := "123abcdef7" + r5 := "1234abcdef" + + // skip debounce + Debounce(webhooks.Ref{ + Timestamp: t1, + RepoID: "git.example.com/owner/repo", + HTTPSURL: "https://git.example.com/owner/repo.git", + Rev: r1, + RefName: "master", + RefType: "branch", + Owner: "owner", + Repo: "repo", + }) + // skip debounce + Debounce(webhooks.Ref{ + Timestamp: t2, + RepoID: "git.example.com/owner/repo", + HTTPSURL: "https://git.example.com/owner/repo.git", + Rev: r2, + RefName: "master", + RefType: "branch", + Owner: "owner", + Repo: "repo", + }) + // hit + Debounce(webhooks.Ref{ + Timestamp: t3, + RepoID: "git.example.com/owner/repo", + HTTPSURL: "https://git.example.com/owner/repo.git", + Rev: r3, + RefName: "master", + RefType: "branch", + Owner: "owner", + Repo: "repo", + }) + + // TODO make debounce time configurable + t.Log("sleep so job can debounce and start") + time.Sleep(debounceDelay) + + var jobMatch *Job + all := All() + for i := range all { + // WARN: lock value copied + j := all[i] + fmt.Printf("[TEST] A-Job[%d]: %s\n%#v\n", i, j.GitRef.Timestamp, *j.GitRef) + if t0.Equal(j.GitRef.Timestamp) || + (t1.Equal(j.GitRef.Timestamp) && r1 == j.GitRef.Rev) || + (t2.Equal(j.GitRef.Timestamp) && r2 == j.GitRef.Rev) { + t.Error(fmt.Errorf("should not find debounced jobs")) + t.Fail() + return + } + if t3.Equal(j.GitRef.Timestamp) && r3 == j.GitRef.Rev { + if nil != jobMatch { + t.Error(fmt.Errorf("should find only one instance of the 1st long-standing job")) + t.Fail() + return + } + jobMatch = all[i] + } + } + if nil == jobMatch { + t.Error(fmt.Errorf("should find the 1st long-standing job")) + t.Fail() + return + } + + t.Log("put another job on the queue while job is running") + // backlog debounce + Debounce(webhooks.Ref{ + Timestamp: t4, + RepoID: "git.example.com/owner/repo", + HTTPSURL: "https://git.example.com/owner/repo.git", + Rev: r4, + RefName: "master", + RefType: "branch", + Owner: "owner", + Repo: "repo", + }) + // backlog hit + Debounce(webhooks.Ref{ + Timestamp: t5, + RepoID: "git.example.com/owner/repo", + HTTPSURL: "https://git.example.com/owner/repo.git", + Rev: r5, + RefName: "master", + RefType: "branch", + Owner: "owner", + Repo: "repo", + }) + + t.Log("sleep so 1st job can finish") + time.Sleep(jobDelay) + time.Sleep(jobDelay) + t.Log("sleep so backlog can debounce") + time.Sleep(debounceDelay) + + //var j *Job + jobMatch = nil + all = All() + for i := range all { + j := all[i] + fmt.Printf("[TEST] B-Job[%d]: %s\n%#v\n", i, j.GitRef.Timestamp, *j.GitRef) + if t4.Equal(j.GitRef.Timestamp) && r4 == j.GitRef.Rev { + t.Error(fmt.Errorf("should not find debounced jobs")) + t.Fail() + return + } + if t5.Equal(j.GitRef.Timestamp) && r5 == j.GitRef.Rev { + if nil != jobMatch { + t.Error(fmt.Errorf("should find only one instance of the 2nd long-standing job")) + t.Fail() + return + } + jobMatch = all[i] + } + } + if nil == jobMatch { + t.Error(fmt.Errorf("should find the 2nd long-standing job: %s %s", t5, r5)) + t.Fail() + return + } + + t.Log("sleep so 2nd job can finish") + time.Sleep(jobDelay) + + t.Log("sleep to ensure no more backlogs exist") + time.Sleep(jobDelay) + time.Sleep(debounceDelay) + time.Sleep(debounceDelay) + + //Stop() +} + +func TestRecents(t *testing.T) { + /* + tmpDir, _ := ioutil.TempDir("", "gitdeploy-*") + runOpts = &options.ServerConfig{ + Addr: "localhost:4483", + ScriptsPath: "./testdata", + LogDir: "./test-logs/recents", + TmpDir: tmpDir, + DebounceDelay: 1 * time.Millisecond, + StaleJobAge: 5 * time.Minute, + StaleLogAge: 5 * time.Minute, + ExpiredLogAge: 10 * time.Minute, + } + logDir, _ = filepath.Abs(runOpts.LogDir) + + os.Setenv("GIT_DEPLOY_TEST_WAIT", "0.01") + debounceDelay := 50 * time.Millisecond + jobDelay := 250 * time.Millisecond + */ + + //Start(runOpts) + + t6 := t0.Add(-50 * time.Second) + r6 := "12345abcde" + + // skip debounce + hook := webhooks.Ref{ + Timestamp: t6, + RepoID: "git.example.com/owner/repo", + HTTPSURL: "https://git.example.com/owner/repo.git", + Rev: r6, + RefName: "master", + RefType: "branch", + Owner: "owner", + Repo: "repo", + } + Debounce(hook) + + // TODO make debounce time configurable + t.Log("sleep so job can debounce and start") + time.Sleep(debounceDelay) + time.Sleep(jobDelay) + + urlRefID := webhooks.URLSafeGitID( + base64.RawURLEncoding.EncodeToString([]byte(hook.GetRefID())), + ) + j, err := LoadLogs(runOpts, urlRefID) + if nil != err { + urlRevID := webhooks.URLSafeGitID( + base64.RawURLEncoding.EncodeToString([]byte(hook.GetRevID())), + ) + j, err = LoadLogs(runOpts, urlRevID) + if nil != err { + t.Errorf("error loading logs: %v", err) + return + } + return + } + + if len(j.Logs) < 3 { + t.Errorf("should have logs from test deploy script") + t.Fail() + return + } + + t.Logf("Logs:\n%v", err) + + //Stop() +} diff --git a/internal/jobs/logs.go b/internal/jobs/logs.go new file mode 100644 index 0000000..0d756ef --- /dev/null +++ b/internal/jobs/logs.go @@ -0,0 +1,175 @@ +package jobs + +import ( + "encoding/base64" + "encoding/json" + "errors" + "io/fs" + "os" + "path/filepath" + "strings" + "time" + + "git.rootprojects.org/root/gitdeploy/internal/log" + "git.rootprojects.org/root/gitdeploy/internal/options" + "git.rootprojects.org/root/gitdeploy/internal/webhooks" +) + +// WalkLogs creates partial webhooks.Refs from walking the log dir +func WalkLogs(runOpts *options.ServerConfig) ([]*Job, error) { + oldJobs := []*Job{} + if 0 == len(runOpts.LogDir) { + return oldJobs, nil + } + + now := time.Now() + pathLen := len(runOpts.LogDir + "/") + err := filepath.WalkDir(runOpts.LogDir, func(logpath string, d fs.DirEntry, err error) error { + if nil != err { + log.Printf("failed to walk log dir: %v", err) + return nil + } + if !d.Type().IsRegular() || '.' == logpath[0] || '_' == logpath[0] || '~' == logpath[0] { + return nil + } + + rel := logpath[pathLen:] + paths := strings.Split(rel, "/") + repoID := strings.Join(paths[:len(paths)-1], "/") + repoName := paths[len(paths)-2] + var repoOwner string + //repoHost := paths[0] + if len(paths) >= 4 { + repoOwner = paths[len(paths)-3] + } + logname := paths[len(paths)-1] + + rev := strings.Split(logname, ".") + if 4 != len(rev) { + return nil + } + + ts, _ := time.ParseInLocation(options.TimeFile, rev[0], time.UTC) + age := now.Sub(ts) + if age <= runOpts.StaleLogAge { + if "json" == rev[3] { + if f, err := os.Open(logpath); nil != err { + log.Printf("[warn] failed to read log dir") + } else { + dec := json.NewDecoder(f) + j := &Job{} + if err := dec.Decode(j); nil == err { + // don't keep all the logs in memory + j.Logs = []Log{} + j.ID = string(j.GitRef.GetRevID()) + oldJobs = append(oldJobs, j) + } + } + } else { + hook := &webhooks.Ref{ + HTTPSURL: "//" + repoID + ".git", + RepoID: repoID, + Owner: repoOwner, + Repo: repoName, + Timestamp: ts, + RefName: rev[1], + Rev: rev[2], + } + oldJobs = append(oldJobs, &Job{ + ID: string(hook.GetRevID()), + GitRef: hook, + }) + } + } + + // ExpiredLogAge can be 0 for testing, + // even when StaleLogAge is > 0 + if age >= runOpts.ExpiredLogAge { + log.Printf("[DEBUG] remove log file: %s", logpath) + os.Remove(logpath) + } + + return nil + }) + + return oldJobs, err +} + +// LoadLogs will log logs for a job +func LoadLogs(runOpts *options.ServerConfig, safeID webhooks.URLSafeGitID) (*Job, error) { + b, err := base64.RawURLEncoding.DecodeString(string(safeID)) + if nil != err { + return nil, err + } + + gitID := string(b) + refID := webhooks.RefID(gitID) + revID := webhooks.RevID(gitID) + + var f *os.File = nil + if value, ok := Actives.Load(refID); ok { + j := value.(*Job) + f, err = openJobFile(runOpts.LogDir, j.GitRef, ".json") + if nil != err { + return nil, err + } + } else if value, ok := Recents.Load(revID); ok { + j := value.(*Job) + f, err = openJobFile(runOpts.LogDir, j.GitRef, ".json") + if nil != err { + return nil, err + } + } + + if nil == f { + return nil, errors.New("no job found") + } + dec := json.NewDecoder(f) + j := &Job{} + if err := dec.Decode(j); nil != err { + log.Printf("[DEBUG] decode error: %v", err) + return nil, errors.New("couldn't read log file") + } + j.ID = string(gitID) + + return j, nil +} + +// Log is a log message +type Log struct { + Timestamp time.Time `json:"timestamp"` + Stderr bool `json:"stderr"` + Text string `json:"text"` +} + +type outWriter struct { + //io.Writer + job *Job +} + +func (w outWriter) Write(b []byte) (int, error) { + w.job.mux.Lock() + w.job.Logs = append(w.job.Logs, Log{ + Timestamp: time.Now().UTC(), + Stderr: false, + Text: string(b), + }) + w.job.mux.Unlock() + return len(b), nil +} + +type errWriter struct { + //io.Writer + job *Job +} + +func (w errWriter) Write(b []byte) (int, error) { + w.job.mux.Lock() + w.job.Logs = append(w.job.Logs, Log{ + Timestamp: time.Now().UTC(), + Stderr: true, + Text: string(b), + }) + w.job.mux.Unlock() + return len(b), nil +} diff --git a/internal/jobs/promote.go b/internal/jobs/promote.go new file mode 100644 index 0000000..d5df4be --- /dev/null +++ b/internal/jobs/promote.go @@ -0,0 +1,86 @@ +package jobs + +import ( + "os" + "os/exec" + "time" + + "git.rootprojects.org/root/gitdeploy/internal/log" + "git.rootprojects.org/root/gitdeploy/internal/options" + "git.rootprojects.org/root/gitdeploy/internal/webhooks" +) + +// Promote will run the promote script +func Promote(msg webhooks.Ref, promoteTo string) { + Promotions <- Promotion{ + PromoteTo: promoteTo, + GitRef: &msg, + } +} + +// promote will run the promote script +func promote(hook *webhooks.Ref, promoteTo string, runOpts *options.ServerConfig) { + // TODO create an origin-branch tag with a timestamp? + jobID1 := hook.GetRefID() + hookTo := *hook + hookTo.RefName = promoteTo + jobID2 := hookTo.GetRefID() + + args := []string{ + runOpts.ScriptsPath + "/promote.sh", + string(jobID1), + promoteTo, + hook.RefName, + hook.RefType, + hook.Owner, + hook.Repo, + hook.HTTPSURL, + } + cmd := exec.Command("bash", args...) + + env := os.Environ() + envs := getEnvs(runOpts.Addr, string(jobID1), runOpts.RepoList, hook) + envs = append(envs, "GIT_DEPLOY_PROMOTE_TO="+promoteTo) + cmd.Env = append(env, envs...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + if _, ok := Actives.Load(jobID1); ok { + // TODO put promote in backlog + log.Printf("[promote] gitdeploy job already started for %s#%s\n", hook.HTTPSURL, hook.RefName) + return + } + if _, ok := Actives.Load(jobID2); ok { + // TODO put promote in backlog + log.Printf("[promote] gitdeploy job already started for %s#%s\n", hook.HTTPSURL, promoteTo) + return + } + + if err := cmd.Start(); nil != err { + log.Printf("gitdeploy exec error: %s\n", err) + return + } + + now := time.Now() + Actives.Store(jobID1, &Job{ + StartedAt: now, + Cmd: cmd, + GitRef: hook, + Promote: true, + }) + Actives.Store(jobID2, &Job{ + StartedAt: now, + Cmd: cmd, + GitRef: hook, + Promote: true, + }) + + go func() { + log.Printf("gitdeploy promote for %s#%s started\n", hook.HTTPSURL, hook.RefName) + _ = cmd.Wait() + deathRow <- jobID1 + deathRow <- jobID2 + log.Printf("gitdeploy promote for %s#%s finished\n", hook.HTTPSURL, hook.RefName) + // TODO check for backlog + }() +} diff --git a/internal/jobs/testdata/deploy.sh b/internal/jobs/testdata/deploy.sh new file mode 100644 index 0000000..36cc51c --- /dev/null +++ b/internal/jobs/testdata/deploy.sh @@ -0,0 +1,13 @@ +#!/bin/bash +set -e +set -u +set -x + +echo "[${GIT_REPO_ID:-}#${GIT_REF_NAME:-}] Started at ${GIT_DEPLOY_TIMESTAMP:-}" +sleep ${GIT_DEPLOY_TEST_WAIT:-0.1} +echo "[${GIT_REPO_ID:-}#${GIT_REF_NAME:-}] Finished" + +# TODO start/end? duration? +#curl -X POST "${GIT_DEPLOY_CALLBACK_URL}" -d ' +# { "report": [ { "name":"sleep", "code":"PASS", "message":"", "details":"" } ] } +#' diff --git a/internal/options/options.go b/internal/options/options.go index 1cf5ed7..ee44c48 100644 --- a/internal/options/options.go +++ b/internal/options/options.go @@ -2,24 +2,42 @@ package options import ( "flag" + "time" ) +// Server is an instance of the config var Server *ServerConfig +// ServerConfig is an options struct type ServerConfig struct { - Addr string - TrustProxy bool - RepoList string - Compress bool - ServePath string - ScriptsPath string - Promotions []string + Addr string + TrustProxy bool + RepoList string + Compress bool + ServePath string + ScriptsPath string + Promotions []string + LogDir string // where the job logs should go + TmpDir string // where the backlog files go + DebounceDelay time.Duration + StaleJobAge time.Duration // how old a dead job is before it's stale + StaleLogAge time.Duration + ExpiredLogAge time.Duration + // TODO use BacklogDir instead? } +// ServerFlags are the flags the web server can use var ServerFlags *flag.FlagSet + +// InitFlags are the flags for the main binary itself var InitFlags *flag.FlagSet + +// DefaultMaxBodySize is for the web server input var DefaultMaxBodySize int64 = 1024 * 1024 +// TimeFile is a time format like RFC3339, but filename-friendly +const TimeFile = "2006-01-02_15-04-05" + func init() { Server = &ServerConfig{} ServerFlags = flag.NewFlagSet("run", flag.ExitOnError) diff --git a/internal/webhooks/bitbucket/bitbucket.go b/internal/webhooks/bitbucket/bitbucket.go index a3bdc01..da7ff35 100644 --- a/internal/webhooks/bitbucket/bitbucket.go +++ b/internal/webhooks/bitbucket/bitbucket.go @@ -91,8 +91,8 @@ func InitWebhook(providername string, secretList *string, envname string) func() return } - var branch string - var tag string + //var branch string + //var tag string var ref string n := len(info.Push.Changes) @@ -108,10 +108,10 @@ func InitWebhook(providername string, secretList *string, envname string) func() refType := info.Push.Changes[0].New.Type switch refType { case "tag": - tag = refName + //tag = refName ref = fmt.Sprintf("refs/tags/%s", refName) case "branch": - branch = refName + //branch = refName ref = fmt.Sprintf("refs/heads/%s", refName) default: log.Printf("unexpected bitbucket RefType %s\n", refType) @@ -121,10 +121,10 @@ func InitWebhook(providername string, secretList *string, envname string) func() switch refType { case "tags": refType = "tag" - tag = refName + //tag = refName case "heads": refType = "branch" - branch = refName + //branch = refName } var rev string @@ -135,15 +135,16 @@ func InitWebhook(providername string, secretList *string, envname string) func() } webhooks.Hook(webhooks.Ref{ + // appears to be missing timestamp HTTPSURL: info.Repository.Links.HTML.Href, Rev: rev, Ref: ref, RefType: refType, RefName: refName, - Branch: branch, - Tag: tag, Repo: info.Repository.Name, Owner: info.Repository.Workspace.Slug, + //Branch: branch, + //Tag: tag, }) }) }) diff --git a/internal/webhooks/bitbucket/payload.go b/internal/webhooks/bitbucket/payload.go index 6e601d7..4d90af3 100644 --- a/internal/webhooks/bitbucket/payload.go +++ b/internal/webhooks/bitbucket/payload.go @@ -4,13 +4,16 @@ import "time" // Thank you Matt! // See https://mholt.github.io/json-to-go/ +// See `repo:push payload` on https://support.atlassian.com/bitbucket-cloud/docs/event-payloads/ +// Webhook is a smaller version of type Webhook struct { Push Push `json:"push"` Actor Actor `json:"actor"` Repository Repository `json:"repository"` } +// Push is the bitbucket webhook type Push struct { Changes []struct { Forced bool `json:"forced"` @@ -191,6 +194,7 @@ type Push struct { } `json:"changes"` } +// Actor represents the user / account taking action type Actor struct { DisplayName string `json:"display_name"` UUID string `json:"uuid"` @@ -199,6 +203,7 @@ type Actor struct { AccountID string `json:"account_id"` } +// Repository represents repo info type Repository struct { Name string `json:"name"` Scm string `json:"scm"` diff --git a/internal/webhooks/gitea/gitea.go b/internal/webhooks/gitea/gitea.go index 2d64732..3945c4e 100644 --- a/internal/webhooks/gitea/gitea.go +++ b/internal/webhooks/gitea/gitea.go @@ -72,8 +72,8 @@ func InitWebhook(providername string, secretList *string, envname string) func() return } - var tag string - var branch string + //var tag string + //var branch string ref := info.Ref // refs/heads/master parts := strings.Split(ref, "/") refType := parts[1] // refs/[heads]/master @@ -82,25 +82,26 @@ func InitWebhook(providername string, secretList *string, envname string) func() switch refType { case "tags": refType = "tag" - tag = refName + //tag = refName case "heads": refType = "branch" - branch = refName + //branch = refName default: refType = "unknown" } webhooks.Hook(webhooks.Ref{ + // missing Timestamp HTTPSURL: info.Repository.CloneURL, SSHURL: info.Repository.SSHURL, Rev: info.After, Ref: ref, RefType: refType, RefName: refName, - Branch: branch, - Tag: tag, Repo: info.Repository.Name, Owner: info.Repository.Owner.Login, + //Branch: branch, + //Tag: tag, }) }) }) diff --git a/internal/webhooks/gitea/payload.go b/internal/webhooks/gitea/payload.go index d8c8379..eb9b2cb 100644 --- a/internal/webhooks/gitea/payload.go +++ b/internal/webhooks/gitea/payload.go @@ -6,8 +6,8 @@ package gitea // repository.full_name // repository.clone_url -// See https://docs.gitea.io/en-us/webhooks/ -// and https://mholt.github.io/json-to-go/ +// Webhook mirrors https://docs.gitea.io/en-us/webhooks/. +// Created in part with https://mholt.github.io/json-to-go/. type Webhook struct { Secret string `json:"secret"` Ref string `json:"ref"` diff --git a/internal/webhooks/github/github.go b/internal/webhooks/github/github.go index ee37b62..1f2b6c3 100644 --- a/internal/webhooks/github/github.go +++ b/internal/webhooks/github/github.go @@ -26,6 +26,7 @@ func init() { webhooks.AddProvider("github", InitWebhook("github", &githubSecrets, "GITHUB_SECRET")) } +// InitWebhook initializes the webhook when registered func InitWebhook(providername string, secretList *string, envname string) func() { return func() { secrets := webhooks.ParseSecrets(providername, *secretList, envname) @@ -68,8 +69,8 @@ func InitWebhook(providername string, secretList *string, envname string) func() switch e := event.(type) { case *github.PushEvent: - var branch string - var tag string + //var branch string + //var tag string ref := e.GetRef() // *e.Ref parts := strings.Split(ref, "/") @@ -79,22 +80,24 @@ func InitWebhook(providername string, secretList *string, envname string) func() switch refType { case "tags": refType = "tag" - tag = refName + //tag = refName case "heads": refType = "branch" - branch = refName + //branch = refName } + webhooks.Hook(webhooks.Ref{ - HTTPSURL: e.GetRepo().GetCloneURL(), - SSHURL: e.GetRepo().GetSSHURL(), - Rev: e.GetAfter(), // *e.After - Ref: ref, - RefType: refType, - RefName: refName, - Branch: branch, - Tag: tag, - Repo: e.GetRepo().GetName(), // *e.Repo.Name - Owner: e.GetRepo().GetOwner().GetLogin(), + Timestamp: e.GetRepo().GetPushedAt().Time, + HTTPSURL: e.GetRepo().GetCloneURL(), + SSHURL: e.GetRepo().GetSSHURL(), + Rev: e.GetAfter(), // *e.After + Ref: ref, + RefType: refType, + RefName: refName, + Repo: e.GetRepo().GetName(), // *e.Repo.Name + Owner: e.GetRepo().GetOwner().GetLogin(), + //Branch: branch, + //Tag: tag, }) /* case *github.PullRequestEvent: diff --git a/internal/webhooks/webhooks.go b/internal/webhooks/webhooks.go index 2b009db..415aa5e 100644 --- a/internal/webhooks/webhooks.go +++ b/internal/webhooks/webhooks.go @@ -1,8 +1,10 @@ package webhooks import ( + "encoding/base64" "os" "strings" + "time" "github.com/go-chi/chi" ) @@ -16,16 +18,78 @@ import ( // Repo ex: example // Org ex: example type Ref struct { - HTTPSURL string `json:"https_url"` - SSHURL string `json:"ssh_url"` - Rev string `json:"rev"` - Ref string `json:"ref"` // refs/tags/v0.0.1, refs/heads/master - RefType string `json:"ref_type"` // tag, branch - RefName string `json:"ref_name"` - Branch string `json:"branch"` - Tag string `json:"tag"` - Owner string `json:"repo_owner"` - Repo string `json:"repo_name"` + RepoID string `json:"repo_id"` + Timestamp time.Time `json:"timestamp"` + HTTPSURL string `json:"https_url"` + SSHURL string `json:"ssh_url"` + Rev string `json:"rev"` + Ref string `json:"ref"` // refs/tags/v0.0.1, refs/heads/master + RefType string `json:"ref_type"` // tag, branch + RefName string `json:"ref_name"` + Owner string `json:"repo_owner"` + Repo string `json:"repo_name"` + //Branch string `json:"branch"` // deprecated + //Tag string `json:"tag"` // deprecated +} + +// RefID is a newtype string +type RefID string + +// URLSafeRefID is a newtype string +type URLSafeRefID string + +// RevID is a newtype string +type RevID string + +// URLSafeRevID is a newtype string +type URLSafeRevID string + +// URLSafeGitID is a newtype string +type URLSafeGitID string + +// New returns a normalized Ref (Git reference) +func New(r Ref) *Ref { + if len(r.HTTPSURL) > 0 { + r.RepoID = getRepoID(r.HTTPSURL) + } else /*if len(r.SSHURL) > 0*/ { + r.RepoID = getRepoID(r.SSHURL) + } + r.Timestamp = getTimestamp(r.Timestamp) + + return &r +} + +// String prints object as git.example.com#branch@rev +func (h *Ref) String() string { + return string(h.GetRefID()) + "@" + h.Rev[:7] +} + +// GetRefID returns a unique reference like "github.com/org/project#branch" +func (h *Ref) GetRefID() RefID { + return RefID(h.RepoID + "#" + h.RefName) +} + +// GetURLSafeRefID returns the URL-safe Base64 encoding of the RefID +func (h *Ref) GetURLSafeRefID() URLSafeRefID { + return URLSafeRefID( + base64.RawURLEncoding.EncodeToString( + []byte(h.GetRefID()), + ), + ) +} + +// GetRevID returns a unique reference like "github.com/org/project#abcd7890" +func (h *Ref) GetRevID() RevID { + return RevID(h.RepoID + "#" + h.Rev) +} + +// GetURLSafeRevID returns the URL-safe Base64 encoding of the RevID +func (h *Ref) GetURLSafeRevID() URLSafeRevID { + return URLSafeRevID( + base64.RawURLEncoding.EncodeToString( + []byte(h.GetRevID()), + ), + ) } // Providers is a map of the git webhook providers @@ -93,3 +157,20 @@ func ParseSecrets(providername, secretList, envname string) [][]byte { return secrets } + +// https://git.example.com/example/project.git +// => git.example.com/example/project +func getRepoID(url string) string { + repoID := strings.TrimPrefix(url, "https://") + repoID = strings.TrimPrefix(repoID, "http://") + repoID = strings.TrimPrefix(repoID, "ssh://") + repoID = strings.TrimSuffix(repoID, ".git") + return repoID +} + +func getTimestamp(t time.Time) time.Time { + if t.IsZero() { + t = time.Now().UTC() + } + return t +} diff --git a/main.go b/main.go index b2997af..1047511 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "encoding/json" "flag" "fmt" + "io/ioutil" "net/http" "os" "path/filepath" @@ -156,6 +157,32 @@ func main() { if 0 == len(runOpts.RepoList) { runOpts.RepoList = os.Getenv("TRUST_REPOS") } + if 0 == len(runOpts.LogDir) { + runOpts.LogDir = os.Getenv("LOG_DIR") + } + if 0 == len(runOpts.TmpDir) { + var err error + runOpts.TmpDir, err = ioutil.TempDir("", "gitdeploy-*") + if nil != err { + fmt.Fprintf(os.Stderr, "could not create temporary directory") + os.Exit(1) + return + } + log.Printf("TEMP_DIR=%s", runOpts.TmpDir) + } + if 0 == runOpts.DebounceDelay { + runOpts.DebounceDelay = 2 * time.Second + } + if 0 == runOpts.StaleJobAge { + runOpts.StaleJobAge = 30 * time.Minute + } + if 0 == runOpts.StaleLogAge { + runOpts.StaleLogAge = 15 * 24 * time.Hour + } + if 0 == runOpts.ExpiredLogAge { + runOpts.ExpiredLogAge = 90 * 24 * time.Hour + } + if len(runOpts.RepoList) > 0 { runOpts.RepoList = strings.ReplaceAll(runOpts.RepoList, ",", " ") runOpts.RepoList = strings.ReplaceAll(runOpts.RepoList, " ", " ")