From acbfcbb8e8f555225c86b05cb1c0f5b0674b12af Mon Sep 17 00:00:00 2001 From: Arvind Bright Date: Fri, 30 Jun 2023 16:31:29 -0700 Subject: [PATCH] internal/grpcsync: refactor test (#6427) --- internal/grpcsync/pubsub_test.go | 125 +++++++++++++------------------ 1 file changed, 52 insertions(+), 73 deletions(-) diff --git a/internal/grpcsync/pubsub_test.go b/internal/grpcsync/pubsub_test.go index 9aebf3593..c610f99b2 100644 --- a/internal/grpcsync/pubsub_test.go +++ b/internal/grpcsync/pubsub_test.go @@ -19,44 +19,26 @@ package grpcsync import ( - "fmt" "sync" "testing" "time" - - "github.com/google/go-cmp/cmp" ) type testSubscriber struct { - mu sync.Mutex - msgs []int - onMsgCh chan struct{} + onMsgCh chan int } func newTestSubscriber(chSize int) *testSubscriber { - return &testSubscriber{onMsgCh: make(chan struct{}, chSize)} + return &testSubscriber{onMsgCh: make(chan int, chSize)} } func (ts *testSubscriber) OnMessage(msg interface{}) { - ts.mu.Lock() - defer ts.mu.Unlock() - ts.msgs = append(ts.msgs, msg.(int)) select { - case ts.onMsgCh <- struct{}{}: + case ts.onMsgCh <- msg.(int): default: } } -func (ts *testSubscriber) receivedMsgs() []int { - ts.mu.Lock() - defer ts.mu.Unlock() - - msgs := make([]int, len(ts.msgs)) - copy(msgs, ts.msgs) - - return msgs -} - func (s) TestPubSub_PublishNoMsg(t *testing.T) { pubsub := NewPubSub() defer pubsub.Stop() @@ -66,7 +48,7 @@ func (s) TestPubSub_PublishNoMsg(t *testing.T) { select { case <-ts.onMsgCh: - t.Fatalf("Subscriber callback invoked when no message was published") + t.Fatal("Subscriber callback invoked when no message was published") case <-time.After(defaultTestShortTimeout): } } @@ -78,7 +60,6 @@ func (s) TestPubSub_PublishMsgs_RegisterSubs_And_Stop(t *testing.T) { ts1 := newTestSubscriber(numPublished) pubsub.Subscribe(ts1) - wantMsgs1 := []int{} var wg sync.WaitGroup wg.Add(2) @@ -86,43 +67,41 @@ func (s) TestPubSub_PublishMsgs_RegisterSubs_And_Stop(t *testing.T) { go func() { for i := 0; i < numPublished; i++ { pubsub.Publish(i) - wantMsgs1 = append(wantMsgs1, i) } wg.Done() }() - isTimeout := false go func() { + defer wg.Done() for i := 0; i < numPublished; i++ { select { - case <-ts1.onMsgCh: + case m := <-ts1.onMsgCh: + if m != i { + t.Errorf("Received unexpected message: %q; want: %q", m, i) + return + } case <-time.After(defaultTestTimeout): - isTimeout = true + t.Error("Timeout when expecting the onMessage() callback to be invoked") + return } } - wg.Done() }() - wg.Wait() - if isTimeout { - t.Fatalf("Timeout when expecting the onMessage() callback to be invoked") - } - if gotMsgs1 := ts1.receivedMsgs(); !cmp.Equal(gotMsgs1, wantMsgs1) { - t.Fatalf("Received messages is %v, want %v", gotMsgs1, wantMsgs1) + if t.Failed() { + t.FailNow() } // Register another subscriber and ensure that it receives the last published message. ts2 := newTestSubscriber(numPublished) pubsub.Subscribe(ts2) - wantMsgs2 := wantMsgs1[len(wantMsgs1)-1:] select { - case <-ts2.onMsgCh: + case m := <-ts2.onMsgCh: + if m != numPublished-1 { + t.Fatalf("Received unexpected message: %q; want: %q", m, numPublished-1) + } case <-time.After(defaultTestShortTimeout): - t.Fatalf("Timeout when expecting the onMessage() callback to be invoked") - } - if gotMsgs2 := ts2.receivedMsgs(); !cmp.Equal(gotMsgs2, wantMsgs2) { - t.Fatalf("Received messages is %v, want %v", gotMsgs2, wantMsgs2) + t.Fatal("Timeout when expecting the onMessage() callback to be invoked") } wg.Add(3) @@ -130,43 +109,43 @@ func (s) TestPubSub_PublishMsgs_RegisterSubs_And_Stop(t *testing.T) { go func() { for i := 0; i < numPublished; i++ { pubsub.Publish(i) - wantMsgs1 = append(wantMsgs1, i) - wantMsgs2 = append(wantMsgs2, i) - } - wg.Done() - }() - errCh := make(chan error, 1) - go func() { - for i := 0; i < numPublished; i++ { - select { - case <-ts1.onMsgCh: - case <-time.After(defaultTestTimeout): - errCh <- fmt.Errorf("") - } } wg.Done() }() go func() { + defer wg.Done() for i := 0; i < numPublished; i++ { select { - case <-ts2.onMsgCh: + case m := <-ts1.onMsgCh: + if m != i { + t.Errorf("Received unexpected message: %q; want: %q", m, i) + return + } case <-time.After(defaultTestTimeout): - errCh <- fmt.Errorf("") + t.Error("Timeout when expecting the onMessage() callback to be invoked") + return + } + } + + }() + go func() { + defer wg.Done() + for i := 0; i < numPublished; i++ { + select { + case m := <-ts2.onMsgCh: + if m != i { + t.Errorf("Received unexpected message: %q; want: %q", m, i) + return + } + case <-time.After(defaultTestTimeout): + t.Error("Timeout when expecting the onMessage() callback to be invoked") + return } } - wg.Done() }() wg.Wait() - select { - case <-errCh: - t.Fatalf("Timeout when expecting the onMessage() callback to be invoked") - default: - } - if gotMsgs1 := ts1.receivedMsgs(); !cmp.Equal(gotMsgs1, wantMsgs1) { - t.Fatalf("Received messages is %v, want %v", gotMsgs1, wantMsgs1) - } - if gotMsgs2 := ts2.receivedMsgs(); !cmp.Equal(gotMsgs2, wantMsgs2) { - t.Fatalf("Received messages is %v, want %v", gotMsgs2, wantMsgs2) + if t.Failed() { + t.FailNow() } pubsub.Stop() @@ -178,9 +157,9 @@ func (s) TestPubSub_PublishMsgs_RegisterSubs_And_Stop(t *testing.T) { // pubsub has already closed. select { case <-ts1.onMsgCh: - t.Fatalf("The callback was invoked after pubsub being stopped") + t.Fatal("The callback was invoked after pubsub being stopped") case <-ts2.onMsgCh: - t.Fatalf("The callback was invoked after pubsub being stopped") + t.Fatal("The callback was invoked after pubsub being stopped") case <-time.After(defaultTestShortTimeout): } } @@ -197,15 +176,15 @@ func (s) TestPubSub_PublishMsgs_BeforeRegisterSub(t *testing.T) { ts := newTestSubscriber(numPublished) pubsub.Subscribe(ts) - wantMsgs := []int{numPublished - 1} // Ensure that the subscriber callback is invoked with a previously // published message. select { - case <-ts.onMsgCh: - if gotMsgs := ts.receivedMsgs(); !cmp.Equal(gotMsgs, wantMsgs) { - t.Fatalf("Received messages is %v, want %v", gotMsgs, wantMsgs) + case d := <-ts.onMsgCh: + if d != numPublished-1 { + t.Fatalf("Unexpected message received: %q; %q", d, numPublished-1) } + case <-time.After(defaultTestShortTimeout): - t.Fatalf("Timeout when expecting the onMessage() callback to be invoked") + t.Fatal("Timeout when expecting the onMessage() callback to be invoked") } }