limit jobs and logs with '?since=0.0'
This commit is contained in:
parent
fe4b72eeb6
commit
e7b1ceaf14
|
@ -164,7 +164,7 @@ GIT_REPO_TRUSTED=true
|
||||||
## API
|
## API
|
||||||
|
|
||||||
```txt
|
```txt
|
||||||
GET /api/admin/jobs
|
GET /api/admin/jobs?since=1577881845.999
|
||||||
|
|
||||||
{
|
{
|
||||||
"success": true,
|
"success": true,
|
||||||
|
@ -197,7 +197,7 @@ POST /api/admin/jobs
|
||||||
|
|
||||||
{ "success": true }
|
{ "success": true }
|
||||||
|
|
||||||
GET /api/admin/logs/{job_id}
|
GET /api/admin/logs/{job_id}?since=1577881845.999
|
||||||
|
|
||||||
{
|
{
|
||||||
"success": true,
|
"success": true,
|
||||||
|
|
|
@ -3,10 +3,13 @@ package api
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.rootprojects.org/root/gitdeploy/internal/jobs"
|
"git.rootprojects.org/root/gitdeploy/internal/jobs"
|
||||||
"git.rootprojects.org/root/gitdeploy/internal/log"
|
"git.rootprojects.org/root/gitdeploy/internal/log"
|
||||||
|
@ -49,6 +52,8 @@ func RouteStopped(r chi.Router, runOpts *options.ServerConfig) {
|
||||||
|
|
||||||
r.Route("/admin", func(r chi.Router) {
|
r.Route("/admin", func(r chi.Router) {
|
||||||
r.Get("/repos", func(w http.ResponseWriter, r *http.Request) {
|
r.Get("/repos", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
||||||
repos := []Repo{}
|
repos := []Repo{}
|
||||||
|
|
||||||
for _, id := range strings.Fields(runOpts.RepoList) {
|
for _, id := range strings.Fields(runOpts.RepoList) {
|
||||||
|
@ -87,33 +92,48 @@ func RouteStopped(r chi.Router, runOpts *options.ServerConfig) {
|
||||||
Success: true,
|
Success: true,
|
||||||
Repos: repos,
|
Repos: repos,
|
||||||
}, "", " ")
|
}, "", " ")
|
||||||
w.Header().Set("Content-Type", "application/json")
|
|
||||||
w.Write(append(b, '\n'))
|
w.Write(append(b, '\n'))
|
||||||
})
|
})
|
||||||
|
|
||||||
r.Get("/logs/{oldID}", func(w http.ResponseWriter, r *http.Request) {
|
r.Get("/logs/{oldID}", func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
||||||
// TODO admin auth middleware
|
since, httpErr := ParseSince(r.URL.Query().Get("since"))
|
||||||
log.Printf("[TODO] handle AUTH (logs could be sensitive)")
|
if nil != httpErr {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
writeError(w, httpErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
oldID := webhooks.URLSafeGitID(chi.URLParam(r, "oldID"))
|
oldID := webhooks.URLSafeGitID(chi.URLParam(r, "oldID"))
|
||||||
// TODO add `since`
|
// TODO add `since`
|
||||||
j, err := jobs.LoadLogs(runOpts, oldID)
|
j, err := jobs.LoadLogs(runOpts, oldID)
|
||||||
if nil != err {
|
if nil != err {
|
||||||
w.WriteHeader(404)
|
w.WriteHeader(http.StatusNotFound)
|
||||||
w.Write([]byte(
|
w.Write([]byte(
|
||||||
`{ "success": false, "error": "job log does not exist" }` + "\n",
|
`{ "success": false, "error": "job log does not exist" }` + "\n",
|
||||||
))
|
))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
jobCopy := *j
|
||||||
|
logs := []jobs.Log{}
|
||||||
|
for _, log := range j.Logs {
|
||||||
|
if log.Timestamp.Sub(since) > 0 {
|
||||||
|
logs = append(logs, log)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
jobCopy.Logs = logs
|
||||||
|
|
||||||
|
// TODO admin auth middleware
|
||||||
|
log.Printf("[TODO] handle AUTH (logs could be sensitive)")
|
||||||
|
|
||||||
b, _ := json.MarshalIndent(struct {
|
b, _ := json.MarshalIndent(struct {
|
||||||
Success bool `json:"success"`
|
Success bool `json:"success"`
|
||||||
jobs.Job
|
jobs.Job
|
||||||
}{
|
}{
|
||||||
Success: true,
|
Success: true,
|
||||||
Job: *j,
|
Job: jobCopy,
|
||||||
}, "", " ")
|
}, "", " ")
|
||||||
w.Write(append(b, '\n'))
|
w.Write(append(b, '\n'))
|
||||||
})
|
})
|
||||||
|
@ -136,8 +156,16 @@ func RouteStopped(r chi.Router, runOpts *options.ServerConfig) {
|
||||||
*/
|
*/
|
||||||
|
|
||||||
r.Get("/jobs", func(w http.ResponseWriter, r *http.Request) {
|
r.Get("/jobs", func(w http.ResponseWriter, r *http.Request) {
|
||||||
all := jobs.All()
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
since, httpErr := ParseSince(r.URL.Query().Get("since"))
|
||||||
|
if nil != httpErr {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
writeError(w, httpErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
all := jobs.All(since)
|
||||||
b, _ := json.Marshal(struct {
|
b, _ := json.Marshal(struct {
|
||||||
Success bool `json:"success"`
|
Success bool `json:"success"`
|
||||||
Jobs []*jobs.Job `json:"jobs"`
|
Jobs []*jobs.Job `json:"jobs"`
|
||||||
|
@ -149,6 +177,8 @@ func RouteStopped(r chi.Router, runOpts *options.ServerConfig) {
|
||||||
})
|
})
|
||||||
|
|
||||||
r.Post("/jobs", func(w http.ResponseWriter, r *http.Request) {
|
r.Post("/jobs", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
||||||
decoder := json.NewDecoder(r.Body)
|
decoder := json.NewDecoder(r.Body)
|
||||||
msg := &KillMsg{}
|
msg := &KillMsg{}
|
||||||
if err := decoder.Decode(msg); nil != err {
|
if err := decoder.Decode(msg); nil != err {
|
||||||
|
@ -157,7 +187,6 @@ func RouteStopped(r chi.Router, runOpts *options.ServerConfig) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
|
||||||
if _, ok := jobs.Actives.Load(webhooks.URLSafeRefID(msg.JobID)); !ok {
|
if _, ok := jobs.Actives.Load(webhooks.URLSafeRefID(msg.JobID)); !ok {
|
||||||
if _, ok := jobs.Pending.Load(webhooks.URLSafeRefID(msg.JobID)); !ok {
|
if _, ok := jobs.Pending.Load(webhooks.URLSafeRefID(msg.JobID)); !ok {
|
||||||
w.Write([]byte(
|
w.Write([]byte(
|
||||||
|
@ -225,7 +254,7 @@ func RouteStopped(r chi.Router, runOpts *options.ServerConfig) {
|
||||||
report := &Report{}
|
report := &Report{}
|
||||||
if err := decoder.Decode(report); nil != err {
|
if err := decoder.Decode(report); nil != err {
|
||||||
w.WriteHeader(http.StatusBadRequest)
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
writeError(w, HTTPError{
|
writeError(w, &HTTPError{
|
||||||
Code: "E_PARSE",
|
Code: "E_PARSE",
|
||||||
Message: "could not parse request body",
|
Message: "could not parse request body",
|
||||||
Detail: err.Error(),
|
Detail: err.Error(),
|
||||||
|
@ -236,7 +265,7 @@ func RouteStopped(r chi.Router, runOpts *options.ServerConfig) {
|
||||||
jobID := webhooks.URLSafeRefID(chi.URLParam(r, "jobID"))
|
jobID := webhooks.URLSafeRefID(chi.URLParam(r, "jobID"))
|
||||||
if err := jobs.SetReport(jobID, report.Report); nil != err {
|
if err := jobs.SetReport(jobID, report.Report); nil != err {
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
writeError(w, HTTPError{
|
writeError(w, &HTTPError{
|
||||||
Code: "E_SERVER",
|
Code: "E_SERVER",
|
||||||
Message: "could not update report",
|
Message: "could not update report",
|
||||||
Detail: err.Error(),
|
Detail: err.Error(),
|
||||||
|
@ -254,7 +283,51 @@ func RouteStopped(r chi.Router, runOpts *options.ServerConfig) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeError(w http.ResponseWriter, err HTTPError) {
|
func ParseSince(sinceStr string) (time.Time, *HTTPError) {
|
||||||
|
if 0 == len(sinceStr) {
|
||||||
|
return time.Time{}, &HTTPError{
|
||||||
|
Code: "E_QUERY",
|
||||||
|
Message: "missing query parameter '?since=' (Unix epoch seconds as float64)",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t, err := ParseUnixTime(sinceStr)
|
||||||
|
if nil != err {
|
||||||
|
return time.Time{}, &HTTPError{
|
||||||
|
Code: "E_QUERY",
|
||||||
|
Message: "invalid query parameter '?since=' (Unix epoch seconds as float64)",
|
||||||
|
Detail: err.Error(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func ParseUnixTime(seconds string) (time.Time, error) {
|
||||||
|
secs, nano, err := ParseSeconds(seconds)
|
||||||
|
if nil != err {
|
||||||
|
return time.Time{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return time.Unix(secs, nano), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func ParseSeconds(s string) (int64, int64, error) {
|
||||||
|
seconds, err := strconv.ParseFloat(s, 64)
|
||||||
|
if nil != err {
|
||||||
|
return 0.0, 0.0, err
|
||||||
|
}
|
||||||
|
secs, nanos := SecondsToInts(seconds)
|
||||||
|
return secs, nanos, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func SecondsToInts(seconds float64) (int64, int64) {
|
||||||
|
secs := math.Floor(seconds)
|
||||||
|
nanos := math.Round((seconds - secs) * 1000000000)
|
||||||
|
return int64(secs), int64(nanos)
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeError(w http.ResponseWriter, err *HTTPError) {
|
||||||
enc := json.NewEncoder(w)
|
enc := json.NewEncoder(w)
|
||||||
enc.SetIndent("", " ")
|
enc.SetIndent("", " ")
|
||||||
_ = enc.Encode(err)
|
_ = enc.Encode(err)
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"os"
|
"os"
|
||||||
|
@ -52,6 +53,19 @@ func init() {
|
||||||
//server.Close()
|
//server.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ToUnixSeconds(t time.Time) float64 {
|
||||||
|
// 1614236182.651912345
|
||||||
|
secs := float64(t.Unix()) // 1614236182
|
||||||
|
nanos := float64(t.Nanosecond()) / 1_000_000_000.0 // 0.651912345
|
||||||
|
//nanos := (float64(t.UnixNano()) - secs) / 1_000_000_000.0 // 0.651912345
|
||||||
|
|
||||||
|
// in my case I want to truncate the precision to milliseconds
|
||||||
|
nanos = math.Round((10000 * nanos) / 10000) // 0.6519
|
||||||
|
|
||||||
|
s := secs + nanos // 1614236182.651912345
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
func TestCallback(t *testing.T) {
|
func TestCallback(t *testing.T) {
|
||||||
// TODO use full API request with local webhook
|
// TODO use full API request with local webhook
|
||||||
t7 := time.Now().Add(-40 * time.Second)
|
t7 := time.Now().Add(-40 * time.Second)
|
||||||
|
@ -82,10 +96,12 @@ func TestCallback(t *testing.T) {
|
||||||
// TODO test that the API gives this back to us
|
// TODO test that the API gives this back to us
|
||||||
urlRevID := hook.GetURLSafeRevID()
|
urlRevID := hook.GetURLSafeRevID()
|
||||||
|
|
||||||
|
s := ToUnixSeconds(t7.Add(-1 * time.Second))
|
||||||
|
|
||||||
// TODO needs auth
|
// TODO needs auth
|
||||||
reqURL := fmt.Sprintf("http://%s/api/admin/logs/%s",
|
reqURL := fmt.Sprintf(
|
||||||
runOpts.Addr,
|
"http://%s/api/admin/logs/%s?since=%f",
|
||||||
string(urlRevID),
|
runOpts.Addr, string(urlRevID), s,
|
||||||
)
|
)
|
||||||
resp, err := http.Get(reqURL)
|
resp, err := http.Get(reqURL)
|
||||||
if nil != err {
|
if nil != err {
|
||||||
|
|
|
@ -127,7 +127,7 @@ func Stop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// All returns all jobs, including active, recent, and (TODO) historical
|
// All returns all jobs, including active, recent, and (TODO) historical
|
||||||
func All() []*Job {
|
func All(then time.Time) []*Job {
|
||||||
jobsTimersMux.Lock()
|
jobsTimersMux.Lock()
|
||||||
defer jobsTimersMux.Unlock()
|
defer jobsTimersMux.Unlock()
|
||||||
|
|
||||||
|
@ -135,6 +135,10 @@ func All() []*Job {
|
||||||
|
|
||||||
Pending.Range(func(key, value interface{}) bool {
|
Pending.Range(func(key, value interface{}) bool {
|
||||||
hook := value.(*webhooks.Ref)
|
hook := value.(*webhooks.Ref)
|
||||||
|
if hook.Timestamp.Sub(then) <= 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
jobCopy := &Job{
|
jobCopy := &Job{
|
||||||
//StartedAt: job.StartedAt,
|
//StartedAt: job.StartedAt,
|
||||||
ID: string(hook.GetURLSafeRefID()),
|
ID: string(hook.GetURLSafeRefID()),
|
||||||
|
@ -148,6 +152,10 @@ func All() []*Job {
|
||||||
|
|
||||||
Actives.Range(func(key, value interface{}) bool {
|
Actives.Range(func(key, value interface{}) bool {
|
||||||
job := value.(*Job)
|
job := value.(*Job)
|
||||||
|
if job.GitRef.Timestamp.Sub(then) <= 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
jobCopy := &Job{
|
jobCopy := &Job{
|
||||||
StartedAt: job.StartedAt,
|
StartedAt: job.StartedAt,
|
||||||
ID: string(job.GitRef.GetURLSafeRefID()),
|
ID: string(job.GitRef.GetURLSafeRefID()),
|
||||||
|
@ -163,6 +171,10 @@ func All() []*Job {
|
||||||
|
|
||||||
Recents.Range(func(key, value interface{}) bool {
|
Recents.Range(func(key, value interface{}) bool {
|
||||||
job := value.(*Job)
|
job := value.(*Job)
|
||||||
|
if job.GitRef.Timestamp.Sub(then) <= 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
jobCopy := &Job{
|
jobCopy := &Job{
|
||||||
StartedAt: job.StartedAt,
|
StartedAt: job.StartedAt,
|
||||||
ID: string(job.GitRef.GetURLSafeRevID()),
|
ID: string(job.GitRef.GetURLSafeRevID()),
|
||||||
|
|
|
@ -93,7 +93,7 @@ func TestDebounce(t *testing.T) {
|
||||||
time.Sleep(debounceDelay)
|
time.Sleep(debounceDelay)
|
||||||
|
|
||||||
var jobMatch *Job
|
var jobMatch *Job
|
||||||
all := All()
|
all := All(time.Time{})
|
||||||
for i := range all {
|
for i := range all {
|
||||||
// WARN: lock value copied
|
// WARN: lock value copied
|
||||||
j := all[i]
|
j := all[i]
|
||||||
|
@ -152,7 +152,7 @@ func TestDebounce(t *testing.T) {
|
||||||
|
|
||||||
//var j *Job
|
//var j *Job
|
||||||
jobMatch = nil
|
jobMatch = nil
|
||||||
all = All()
|
all = All(time.Time{})
|
||||||
for i := range all {
|
for i := range all {
|
||||||
j := all[i]
|
j := all[i]
|
||||||
//t.Logf("[TEST] B-Job[%d]: %s\n%#v\n", i, j.GitRef.Timestamp, *j.GitRef)
|
//t.Logf("[TEST] B-Job[%d]: %s\n%#v\n", i, j.GitRef.Timestamp, *j.GitRef)
|
||||||
|
|
Loading…
Reference in New Issue