From ca72ad6d8b4dda3eab45c3dfccc9623ead595139 Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Mon, 1 Jun 2020 02:39:35 -0600 Subject: [PATCH] add client pinging, and server recent pings --- mplexer/cmd/telebit/telebit.go | 22 ++++++++++++++++++++-- mplexer/mgmt-active.sh | 4 ++++ mplexer/mgmt/auth.go | 23 +++++++++++++++++++++++ mplexer/mgmt/authstore/authstore.go | 1 + mplexer/mgmt/authstore/postgresql.go | 18 ++++++++++++++++++ mplexer/mgmt/cmd/mgmt/devices.go | 27 +++++++++++++++++++++++++++ 6 files changed, 93 insertions(+), 2 deletions(-) create mode 100644 mplexer/mgmt-active.sh diff --git a/mplexer/cmd/telebit/telebit.go b/mplexer/cmd/telebit/telebit.go index 1e0c885..40ef4ca 100644 --- a/mplexer/cmd/telebit/telebit.go +++ b/mplexer/cmd/telebit/telebit.go @@ -199,7 +199,8 @@ func main() { connected := make(chan net.Conn) go func() { - timeoutCtx, cancelTimeout := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second)) + timeoutCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second)) + defer cancel() tun, err := telebit.DialWebsocketTunnel(timeoutCtx, *relay, *token) if nil != err { msg := "" @@ -210,10 +211,27 @@ func main() { os.Exit(1) return } - cancelTimeout() + + err = mgmt.Ping(*authURL, *token) + if nil != err { + fmt.Fprintf(os.Stderr, "failed to ping mgmt server: %s", err) + //os.Exit(1) + } + connected <- tun }() + go func() { + for { + time.Sleep(10 * time.Minute) + err = mgmt.Ping(*authURL, *token) + if nil != err { + fmt.Fprintf(os.Stderr, "failed to ping mgmt server: %s", err) + //os.Exit(1) + } + } + }() + tun := <-connected fmt.Printf("Listening at %s\n", *relay) log.Fatal("Closed server: ", telebit.ListenAndServe(tun, mux)) diff --git a/mplexer/mgmt-active.sh b/mplexer/mgmt-active.sh new file mode 100644 index 0000000..f940bf5 --- /dev/null +++ b/mplexer/mgmt-active.sh @@ -0,0 +1,4 @@ +TOKEN=$(go run cmd/signjwt/*.go) +echo "TOKEN: $TOKEN" + +curl -L http://localhost:3000/api/devices -H "Authorization: Bearer ${TOKEN}" diff --git a/mplexer/mgmt/auth.go b/mplexer/mgmt/auth.go index 2791cfc..13e7144 100644 --- a/mplexer/mgmt/auth.go +++ b/mplexer/mgmt/auth.go @@ -14,6 +14,29 @@ type Grants struct { Domains []string `json:"domains"` } +type SuccessResponse struct { + Success bool `json:"success"` +} + +func Ping(authURL, token string) error { + msg, err := telebit.Request("POST", authURL+"/ping", token, nil) + if nil != err { + return err + } + if nil == msg { + return fmt.Errorf("invalid response") + } + resp := SuccessResponse{} + err = json.NewDecoder(msg).Decode(&resp) + if err != nil { + return err + } + if true != resp.Success { + return fmt.Errorf("expected successful response") + } + return nil +} + func Inspect(authURL, token string) (*Grants, error) { msg, err := telebit.Request("GET", authURL+"/inspect", token, nil) if nil != err { diff --git a/mplexer/mgmt/authstore/authstore.go b/mplexer/mgmt/authstore/authstore.go index ba26874..a131e77 100644 --- a/mplexer/mgmt/authstore/authstore.go +++ b/mplexer/mgmt/authstore/authstore.go @@ -29,6 +29,7 @@ type Store interface { SetMaster(secret string) error Add(auth *Authorization) error Set(auth *Authorization) error + Active() ([]Authorization, error) Touch(id string) error Get(id string) (*Authorization, error) GetBySlug(id string) (*Authorization, error) diff --git a/mplexer/mgmt/authstore/postgresql.go b/mplexer/mgmt/authstore/postgresql.go index d9f2616..ed759f3 100644 --- a/mplexer/mgmt/authstore/postgresql.go +++ b/mplexer/mgmt/authstore/postgresql.go @@ -156,6 +156,24 @@ func (s *PGStore) Touch(pub string) error { return nil } +func (s *PGStore) Active() ([]Authorization, error) { + ctx, done := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second)) + defer done() + + auths := []Authorization{} + query := ` + SELECT * FROM authorizations + WHERE deleted_at = '1970-01-01 00:00:00' + AND updated_at > $1 + ` + ago15Min := time.Now().Add(-15 * time.Minute) + err := s.dbx.SelectContext(ctx, &auths, query, ago15Min) + if nil != err { + return nil, err + } + return auths, nil +} + func (s *PGStore) Get(id string) (*Authorization, error) { ctx, done := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second)) defer done() diff --git a/mplexer/mgmt/cmd/mgmt/devices.go b/mplexer/mgmt/cmd/mgmt/devices.go index b880bc5..5005bfb 100644 --- a/mplexer/mgmt/cmd/mgmt/devices.go +++ b/mplexer/mgmt/cmd/mgmt/devices.go @@ -91,6 +91,33 @@ func handleDeviceRoutes(r chi.Router) { w.Write([]byte(string(result) + "\n")) }) + r.Get("/", func(w http.ResponseWriter, r *http.Request) { + things, err := store.Active() + if nil != err { + msg := `{"error":"not really sure what happened, but it didn't go well (check the logs)"}` + log.Printf("/api/devices/\n") + log.Println(err) + http.Error(w, msg, http.StatusInternalServerError) + return + } + + for i, _ := range things { + auth := things[i] + // Redact private data + if "" != auth.MachinePPID { + auth.MachinePPID = "[redacted]" + } + if "" != auth.SharedKey { + auth.SharedKey = "[redacted]" + } + things[i] = auth + } + + encoder := json.NewEncoder(w) + encoder.SetEscapeHTML(true) + _ = encoder.Encode(things) + }) + r.Get("/{slug}", func(w http.ResponseWriter, r *http.Request) { slug := chi.URLParam(r, "slug") // TODO store should be concurrency-safe