diff --git a/inmem/stack.go b/inmem/stack.go index 4509962830..90b66e87ed 100644 --- a/inmem/stack.go +++ b/inmem/stack.go @@ -2,6 +2,7 @@ package inmem import ( "container/list" + "fmt" "sync" ) @@ -11,6 +12,7 @@ import ( // are removed from the queue. type StackSender struct { stack *list.List + front *list.Element l sync.RWMutex } @@ -21,28 +23,77 @@ func NewStackSender() *StackSender { } } -func (s *StackSender) Send(msg *Message, mode int) (Receiver, Sender, error) { - s.l.RLock() - e := s.stack.Front() - s.l.RUnlock() - for e != nil { - h := e.Value.(Sender) - r, w, err := h.Send(msg, mode) +func (s *StackSender) Send(msg *Message, mode int) (r Receiver, w Sender, err error) { + ok := s.walk(func(h Sender) (ok bool) { + r, w, err = h.Send(msg, mode) + fmt.Printf("sending %v to %#v returned %v\n", msg, h, err) if err == nil { - return r, w, nil + return true } + return false + }) + if ok { + return r, w, err + } + // Silently drop messages if no valid backend is available. + return nil, nil, nil +} + +func (s *StackSender) Add(dst Sender) *StackSender { + s.l.Lock() + defer s.l.Unlock() + prev := &StackSender{ + stack: s.stack, + front: s.stack.Front(), + } + s.stack.PushFront(dst) + return prev +} + +func (s *StackSender) Close() error { + s.walk(func(h Sender) bool { + h.Close() + // remove all handlers + return false + }) + return nil +} + +func (s *StackSender) walk(f func(Sender) bool) bool { + var e *list.Element + if s.front == nil { + s.l.RLock() + e = s.stack.Front() + s.l.RUnlock() + } else { + e = s.front + } + for e != nil { + fmt.Printf("[StackSender.Walk] %v\n", e.Value.(Sender)) + ok := f(e.Value.(Sender)) + if ok { + fmt.Printf("[StackSender.Walk] %v -> OK\n", e.Value.(Sender)) + return true + } + fmt.Printf("[StackSender.Walk] %v invalid: removing\n", e.Value.(Sender)) s.l.Lock() next := e.Next() s.stack.Remove(e) s.l.Unlock() e = next } - // Silently drop messages if no valid backend is available. - return nil, nil, nil + fmt.Printf("[STackSender.Walk] -> no valid handler\n") + return false } - -func (s *StackSender) Add(dst Sender) { - s.l.Lock() - defer s.l.Unlock() - s.stack.PushFront(dst) +func (s *StackSender) Len() int { + s.l.RLock() + defer s.l.RUnlock() + if s.front == nil { + return s.stack.Len() + } + var len int + for e := s.front; e != nil ; e = e.Next() { + len++ + } + return len }