From 165164ded01de39f731e92014a1dcb73c08877bd Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Sun, 11 May 2014 23:51:44 +0000 Subject: [PATCH] inmem: fix StackSender StackSender is now more solid, with the beginning of a test harness. Signed-off-by: Solomon Hykes --- inmem/stack.go | 78 ++++++++++++++++++++++++++------------------- inmem/stack_test.go | 63 ++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 33 deletions(-) diff --git a/inmem/stack.go b/inmem/stack.go index 87378ca7f0..282b8eaf03 100644 --- a/inmem/stack.go +++ b/inmem/stack.go @@ -3,6 +3,7 @@ package inmem import ( "container/list" "fmt" + "strings" "sync" ) @@ -12,7 +13,6 @@ import ( // are removed from the queue. type StackSender struct { stack *list.List - front *list.Element l sync.RWMutex } @@ -24,15 +24,16 @@ func NewStackSender() *StackSender { } func (s *StackSender) Send(msg *Message, mode int) (r Receiver, w Sender, err error) { - ok := s.walk(func(h Sender) (ok bool) { + completed := s.walk(func(h Sender) (ok bool) { r, w, err = h.Send(msg, mode) - fmt.Printf("sending %v to %#v returned %v\n", msg, h, err) + fmt.Printf("[stacksender] sending %v to %#v returned %v\n", msg, h, err) if err == nil { return true } return false }) - if ok { + // If walk was completed, it means we didn't find a valid handler + if !completed { return r, w, err } // Silently drop messages if no valid backend is available. @@ -43,9 +44,10 @@ func (s *StackSender) Add(dst Sender) *StackSender { s.l.Lock() defer s.l.Unlock() prev := &StackSender{ - stack: s.stack, - front: s.stack.Front(), + stack: list.New(), } + prev.stack.PushFrontList(s.stack) + fmt.Printf("[ADD] prev %#v\n", prev) s.stack.PushFront(dst) return prev } @@ -59,41 +61,51 @@ func (s *StackSender) Close() error { return nil } -func (s *StackSender) walk(f func(Sender) bool) bool { +func (s *StackSender) _walk(f func(*list.Element) bool) bool { var e *list.Element - if s.front == nil { - s.l.RLock() - e = s.stack.Front() - s.l.RUnlock() - } else { - e = s.front - } + s.l.RLock() + e = s.stack.Front() + s.l.RUnlock() for e != nil { fmt.Printf("[StackSender.Walk] %v\n", e.Value.(Sender)) - ok := f(e.Value.(Sender)) - if ok { - fmt.Printf("[StackSender.Walk] %v -> OK\n", e.Value.(Sender)) - return true - } - fmt.Printf("[StackSender.Walk] %v invalid: removing\n", e.Value.(Sender)) - s.l.Lock() + s.l.RLock() next := e.Next() - s.stack.Remove(e) - s.l.Unlock() + s.l.RUnlock() + cont := f(e) + if !cont { + return false + } e = next } - fmt.Printf("[STackSender.Walk] -> no valid handler\n") - return false + return true } + +func (s *StackSender) walk(f func(Sender) bool) bool { + return s._walk(func(e *list.Element) bool { + ok := f(e.Value.(Sender)) + if ok { + // Found a valid handler. Stop walking. + return false + } + // Invalid handler: remove. + s.l.Lock() + s.stack.Remove(e) + s.l.Unlock() + return true + }) +} + func (s *StackSender) Len() int { s.l.RLock() defer s.l.RUnlock() - if s.front == nil { - return s.stack.Len() - } - var len int - for e := s.front; e != nil; e = e.Next() { - len++ - } - return len + return s.stack.Len() +} + +func (s *StackSender) String() string { + var parts []string + s._walk(func(e *list.Element) bool { + parts = append(parts, fmt.Sprintf("%v", e.Value.(Sender))) + return true + }) + return fmt.Sprintf("%d:[%s]", len(parts), strings.Join(parts, "->")) } diff --git a/inmem/stack_test.go b/inmem/stack_test.go index 1a37eeebff..44425ce38a 100644 --- a/inmem/stack_test.go +++ b/inmem/stack_test.go @@ -31,3 +31,66 @@ func TestSendStack(t *testing.T) { } }) } + +func TestStackLen(t *testing.T) { + s := NewStackSender() + if s.Len() != 0 { + t.Fatalf("empty StackSender has length %d", s.Len()) + } +} + +func TestStackAdd(t *testing.T) { + s := NewStackSender() + a := Buffer{} + beforeA := s.Add(&a) + // Add on an empty StackSender should return an empty StackSender + if beforeA.Len() != 0 { + t.Fatalf("%s has %d elements", beforeA, beforeA.Len()) + } + if s.Len() != 1 { + t.Fatalf("%#v", beforeA) + } + // Add a 2nd element + b := Buffer{} + beforeB := s.Add(&b) + if beforeB.Len() != 1 { + t.Fatalf("%#v", beforeA) + } + if s.Len() != 2 { + t.Fatalf("%#v", beforeA) + } + s.Send(&Message{"for b", nil}, 0) + beforeB.Send(&Message{"for a", nil}, 0) + beforeA.Send(&Message{"for nobody", nil}, 0) + if len(a) != 1 { + t.Fatalf("%#v", a) + } + if len(b) != 1 { + t.Fatalf("%#v", b) + } +} + +// Misbehaving backends must be removed +func TestStackAddBad(t *testing.T) { + s := NewStackSender() + buf := Buffer{} + s.Add(&buf) + r, w := Pipe() + s.Add(w) + if s.Len() != 2 { + t.Fatalf("%#v", s) + } + r.Close() + if _, _, err := s.Send(&Message{"for the buffer", nil}, 0); err != nil { + t.Fatal(err) + } + if s.Len() != 1 { + t.Fatalf("%#v") + } + if len(buf) != 1 { + t.Fatalf("%#v", buf) + } + if buf[0].Name != "for the buffer" { + t.Fatalf("%#v", buf) + } +}