internal/grpcsync: refactor test (#6427)

This commit is contained in:
Arvind Bright 2023-06-30 16:31:29 -07:00 committed by GitHub
parent 51042db745
commit acbfcbb8e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 52 additions and 73 deletions

View File

@ -19,44 +19,26 @@
package grpcsync
import (
"fmt"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
)
type testSubscriber struct {
mu sync.Mutex
msgs []int
onMsgCh chan struct{}
onMsgCh chan int
}
func newTestSubscriber(chSize int) *testSubscriber {
return &testSubscriber{onMsgCh: make(chan struct{}, chSize)}
return &testSubscriber{onMsgCh: make(chan int, chSize)}
}
func (ts *testSubscriber) OnMessage(msg interface{}) {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.msgs = append(ts.msgs, msg.(int))
select {
case ts.onMsgCh <- struct{}{}:
case ts.onMsgCh <- msg.(int):
default:
}
}
func (ts *testSubscriber) receivedMsgs() []int {
ts.mu.Lock()
defer ts.mu.Unlock()
msgs := make([]int, len(ts.msgs))
copy(msgs, ts.msgs)
return msgs
}
func (s) TestPubSub_PublishNoMsg(t *testing.T) {
pubsub := NewPubSub()
defer pubsub.Stop()
@ -66,7 +48,7 @@ func (s) TestPubSub_PublishNoMsg(t *testing.T) {
select {
case <-ts.onMsgCh:
t.Fatalf("Subscriber callback invoked when no message was published")
t.Fatal("Subscriber callback invoked when no message was published")
case <-time.After(defaultTestShortTimeout):
}
}
@ -78,7 +60,6 @@ func (s) TestPubSub_PublishMsgs_RegisterSubs_And_Stop(t *testing.T) {
ts1 := newTestSubscriber(numPublished)
pubsub.Subscribe(ts1)
wantMsgs1 := []int{}
var wg sync.WaitGroup
wg.Add(2)
@ -86,43 +67,41 @@ func (s) TestPubSub_PublishMsgs_RegisterSubs_And_Stop(t *testing.T) {
go func() {
for i := 0; i < numPublished; i++ {
pubsub.Publish(i)
wantMsgs1 = append(wantMsgs1, i)
}
wg.Done()
}()
isTimeout := false
go func() {
defer wg.Done()
for i := 0; i < numPublished; i++ {
select {
case <-ts1.onMsgCh:
case m := <-ts1.onMsgCh:
if m != i {
t.Errorf("Received unexpected message: %q; want: %q", m, i)
return
}
case <-time.After(defaultTestTimeout):
isTimeout = true
t.Error("Timeout when expecting the onMessage() callback to be invoked")
return
}
}
wg.Done()
}()
wg.Wait()
if isTimeout {
t.Fatalf("Timeout when expecting the onMessage() callback to be invoked")
}
if gotMsgs1 := ts1.receivedMsgs(); !cmp.Equal(gotMsgs1, wantMsgs1) {
t.Fatalf("Received messages is %v, want %v", gotMsgs1, wantMsgs1)
if t.Failed() {
t.FailNow()
}
// Register another subscriber and ensure that it receives the last published message.
ts2 := newTestSubscriber(numPublished)
pubsub.Subscribe(ts2)
wantMsgs2 := wantMsgs1[len(wantMsgs1)-1:]
select {
case <-ts2.onMsgCh:
case m := <-ts2.onMsgCh:
if m != numPublished-1 {
t.Fatalf("Received unexpected message: %q; want: %q", m, numPublished-1)
}
case <-time.After(defaultTestShortTimeout):
t.Fatalf("Timeout when expecting the onMessage() callback to be invoked")
}
if gotMsgs2 := ts2.receivedMsgs(); !cmp.Equal(gotMsgs2, wantMsgs2) {
t.Fatalf("Received messages is %v, want %v", gotMsgs2, wantMsgs2)
t.Fatal("Timeout when expecting the onMessage() callback to be invoked")
}
wg.Add(3)
@ -130,43 +109,43 @@ func (s) TestPubSub_PublishMsgs_RegisterSubs_And_Stop(t *testing.T) {
go func() {
for i := 0; i < numPublished; i++ {
pubsub.Publish(i)
wantMsgs1 = append(wantMsgs1, i)
wantMsgs2 = append(wantMsgs2, i)
}
wg.Done()
}()
errCh := make(chan error, 1)
go func() {
for i := 0; i < numPublished; i++ {
select {
case <-ts1.onMsgCh:
case <-time.After(defaultTestTimeout):
errCh <- fmt.Errorf("")
}
}
wg.Done()
}()
go func() {
defer wg.Done()
for i := 0; i < numPublished; i++ {
select {
case <-ts2.onMsgCh:
case m := <-ts1.onMsgCh:
if m != i {
t.Errorf("Received unexpected message: %q; want: %q", m, i)
return
}
case <-time.After(defaultTestTimeout):
errCh <- fmt.Errorf("")
t.Error("Timeout when expecting the onMessage() callback to be invoked")
return
}
}
}()
go func() {
defer wg.Done()
for i := 0; i < numPublished; i++ {
select {
case m := <-ts2.onMsgCh:
if m != i {
t.Errorf("Received unexpected message: %q; want: %q", m, i)
return
}
case <-time.After(defaultTestTimeout):
t.Error("Timeout when expecting the onMessage() callback to be invoked")
return
}
}
wg.Done()
}()
wg.Wait()
select {
case <-errCh:
t.Fatalf("Timeout when expecting the onMessage() callback to be invoked")
default:
}
if gotMsgs1 := ts1.receivedMsgs(); !cmp.Equal(gotMsgs1, wantMsgs1) {
t.Fatalf("Received messages is %v, want %v", gotMsgs1, wantMsgs1)
}
if gotMsgs2 := ts2.receivedMsgs(); !cmp.Equal(gotMsgs2, wantMsgs2) {
t.Fatalf("Received messages is %v, want %v", gotMsgs2, wantMsgs2)
if t.Failed() {
t.FailNow()
}
pubsub.Stop()
@ -178,9 +157,9 @@ func (s) TestPubSub_PublishMsgs_RegisterSubs_And_Stop(t *testing.T) {
// pubsub has already closed.
select {
case <-ts1.onMsgCh:
t.Fatalf("The callback was invoked after pubsub being stopped")
t.Fatal("The callback was invoked after pubsub being stopped")
case <-ts2.onMsgCh:
t.Fatalf("The callback was invoked after pubsub being stopped")
t.Fatal("The callback was invoked after pubsub being stopped")
case <-time.After(defaultTestShortTimeout):
}
}
@ -197,15 +176,15 @@ func (s) TestPubSub_PublishMsgs_BeforeRegisterSub(t *testing.T) {
ts := newTestSubscriber(numPublished)
pubsub.Subscribe(ts)
wantMsgs := []int{numPublished - 1}
// Ensure that the subscriber callback is invoked with a previously
// published message.
select {
case <-ts.onMsgCh:
if gotMsgs := ts.receivedMsgs(); !cmp.Equal(gotMsgs, wantMsgs) {
t.Fatalf("Received messages is %v, want %v", gotMsgs, wantMsgs)
case d := <-ts.onMsgCh:
if d != numPublished-1 {
t.Fatalf("Unexpected message received: %q; %q", d, numPublished-1)
}
case <-time.After(defaultTestShortTimeout):
t.Fatalf("Timeout when expecting the onMessage() callback to be invoked")
t.Fatal("Timeout when expecting the onMessage() callback to be invoked")
}
}