updates to admin stats
This commit is contained in:
parent
025fa5da6b
commit
d5b2837033
|
@ -80,7 +80,8 @@ func InitAdmin(authURL string) {
|
|||
})
|
||||
})
|
||||
|
||||
r.Get("/subscribers", getSubscribers)
|
||||
r.Get("/subscribers", getAllSubscribers)
|
||||
r.Get("/subscribers/{subject}", getSubscribers)
|
||||
r.Delete("/subscribers/{subject}", delSubscribers)
|
||||
r.NotFound(apiNotFoundHandler)
|
||||
})
|
||||
|
@ -115,32 +116,11 @@ type SubscriberStatus struct {
|
|||
// TODO bytes read
|
||||
}
|
||||
|
||||
func getSubscribers(w http.ResponseWriter, r *http.Request) {
|
||||
func getAllSubscribers(w http.ResponseWriter, r *http.Request) {
|
||||
statuses := []*SubscriberStatus{}
|
||||
table.Servers.Range(func(key, value interface{}) bool {
|
||||
status := &SubscriberStatus{
|
||||
Since: nil,
|
||||
Subject: "",
|
||||
Sockets: []string{},
|
||||
Clients: 0,
|
||||
}
|
||||
//subject := key.(string)
|
||||
srvMap := value.(*sync.Map)
|
||||
srvMap.Range(func(k, v interface{}) bool {
|
||||
status.Sockets = append(status.Sockets, k.(string))
|
||||
srv := v.(*table.SubscriberConn)
|
||||
if nil == status.Since || srv.Since.Sub(*status.Since) < 0 {
|
||||
copied := srv.Since.Truncate(time.Second)
|
||||
status.Since = &copied
|
||||
}
|
||||
status.Subject = srv.Grants.Subject
|
||||
srv.Clients.Range(func(k, v interface{}) bool {
|
||||
status.Clients++
|
||||
return true
|
||||
})
|
||||
|
||||
return true
|
||||
})
|
||||
status := getSubscribersHelper(srvMap)
|
||||
statuses = append(statuses, status)
|
||||
return true
|
||||
})
|
||||
|
@ -153,6 +133,53 @@ func getSubscribers(w http.ResponseWriter, r *http.Request) {
|
|||
})
|
||||
}
|
||||
|
||||
func getSubscribers(w http.ResponseWriter, r *http.Request) {
|
||||
subject := chi.URLParam(r, "subject")
|
||||
statuses := &struct {
|
||||
Success bool `json:"success"`
|
||||
Subscribers []*SubscriberStatus `json:"subscribers"`
|
||||
}{
|
||||
Success: true,
|
||||
Subscribers: []*SubscriberStatus{},
|
||||
}
|
||||
|
||||
var srvMap *sync.Map
|
||||
srvMapX, ok := table.Servers.Load(subject)
|
||||
if ok {
|
||||
srvMap = srvMapX.(*sync.Map)
|
||||
statuses.Subscribers = append(statuses.Subscribers, getSubscribersHelper(srvMap))
|
||||
}
|
||||
|
||||
_ = json.NewEncoder(w).Encode(statuses)
|
||||
}
|
||||
|
||||
func getSubscribersHelper(srvMap *sync.Map) *SubscriberStatus {
|
||||
status := &SubscriberStatus{
|
||||
Since: nil,
|
||||
Subject: "",
|
||||
Sockets: []string{},
|
||||
Clients: 0,
|
||||
}
|
||||
|
||||
srvMap.Range(func(k, v interface{}) bool {
|
||||
status.Sockets = append(status.Sockets, k.(string))
|
||||
srv := v.(*table.SubscriberConn)
|
||||
if nil == status.Since || srv.Since.Sub(*status.Since) < 0 {
|
||||
copied := srv.Since.Truncate(time.Second)
|
||||
status.Since = &copied
|
||||
}
|
||||
status.Subject = srv.Grants.Subject
|
||||
srv.Clients.Range(func(k, v interface{}) bool {
|
||||
status.Clients++
|
||||
return true
|
||||
})
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
return status
|
||||
}
|
||||
|
||||
func delSubscribers(w http.ResponseWriter, r *http.Request) {
|
||||
subject := chi.URLParam(r, "subject")
|
||||
|
||||
|
|
Loading…
Reference in New Issue