cli/monitor/monitor.go

157 lines
3.1 KiB
Go

package monitor
import (
"encoding/json"
"fmt"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/patrickmn/go-cache"
"github.com/rancher/cli/cliclient"
"github.com/sirupsen/logrus"
)
type Event struct {
Name string `json:"name"`
ResourceType string `json:"resourceType"`
ResourceID string `json:"resourceId"`
Data map[string]interface{} `json:"data"`
}
type Monitor struct {
sync.Mutex
c *cliclient.MasterClient
cache *cache.Cache
subCounter int
subscriptions map[int]*Subscription
}
func (m *Monitor) Subscribe() *Subscription {
m.Lock()
defer m.Unlock()
m.subCounter++
sub := &Subscription{
id: m.subCounter,
C: make(chan *Event, 1024),
}
m.subscriptions[sub.id] = sub
return sub
}
func (m *Monitor) Unsubscribe(sub *Subscription) {
m.Lock()
defer m.Unlock()
close(sub.C)
delete(m.subscriptions, sub.id)
}
type Subscription struct {
id int
C chan *Event
}
func New(c *cliclient.MasterClient) *Monitor {
return &Monitor{
c: c,
cache: cache.New(5*time.Minute, 30*time.Second),
subscriptions: map[int]*Subscription{},
}
}
//func (m *Monitor) Start() error {
// schema, err := m.c.ManagementClient.DynamicSchema.ByID("subscribe")
// if nil != err {
// return fmt.Errorf("not authorized to subscribe")
// }
//
// urlString := schema.Links["collection"]
// u, err := url.Parse(urlString)
// if err != nil {
// return err
// }
//
// switch u.Scheme {
// case "http":
// u.Scheme = "ws"
// case "https":
// u.Scheme = "wss"
// }
//
// q := u.Query()
// q.Add("eventNames", "resource.change")
// q.Add("eventNames", "service.kubernetes.change")
//
// u.RawQuery = q.Encode()
//
// conn, resp, err := m.c.Websocket(u.String(), nil)
// if err != nil {
// return err
// }
//
// if resp.StatusCode != 101 {
// return fmt.Errorf("Bad status code: %d %s", resp.StatusCode, resp.Status)
// }
//
// logrus.Debugf("Connected to: %s", u.String())
//
// return m.watch(conn)
//}
func (m *Monitor) Get(resourceType, resourceID string, obj interface{}) (bool, error) {
val, ok := m.cache.Get(key(resourceType, resourceID))
if !ok {
return ok, nil
}
if val == nil {
return true, nil
}
content, err := json.Marshal(val)
if err != nil {
return ok, err
}
return true, json.Unmarshal(content, obj)
}
func key(a, b string) string {
return fmt.Sprintf("%s:%s", a, b)
}
func (m *Monitor) put(resourceType, resourceID string, event *Event) {
if resourceType == "" && resourceID == "" {
return
}
m.cache.Replace(key(resourceType, resourceID), event.Data["resource"], cache.DefaultExpiration)
m.Lock()
defer m.Unlock()
for _, sub := range m.subscriptions {
sub.C <- event
}
}
func (m *Monitor) watch(conn *websocket.Conn) error {
for {
v := Event{}
_, r, err := conn.NextReader()
if err != nil {
return err
}
if err := json.NewDecoder(r).Decode(&v); err != nil {
logrus.Errorf("Failed to parse json in message")
continue
}
logrus.Debugf("Event: %s %s %s %v", v.Name, v.ResourceType, v.ResourceID, v.Data)
m.put(v.ResourceType, v.ResourceID, &v)
}
}