grpc-go/internal/grpcsync/pubsub_test.go

197 lines
4.6 KiB
Go

/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpcsync
import (
"context"
"sync"
"testing"
"time"
)
type testSubscriber struct {
onMsgCh chan int
}
func newTestSubscriber(chSize int) *testSubscriber {
return &testSubscriber{onMsgCh: make(chan int, chSize)}
}
func (ts *testSubscriber) OnMessage(msg any) {
select {
case ts.onMsgCh <- msg.(int):
default:
}
}
func (s) TestPubSub_PublishNoMsg(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
pubsub := NewPubSub(ctx)
ts := newTestSubscriber(1)
pubsub.Subscribe(ts)
select {
case <-ts.onMsgCh:
t.Fatal("Subscriber callback invoked when no message was published")
case <-time.After(defaultTestShortTimeout):
}
}
func (s) TestPubSub_PublishMsgs_RegisterSubs_And_Stop(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
pubsub := NewPubSub(ctx)
const numPublished = 10
ts1 := newTestSubscriber(numPublished)
pubsub.Subscribe(ts1)
var wg sync.WaitGroup
wg.Add(2)
// Publish ten messages on the pubsub and ensure that they are received in order by the subscriber.
go func() {
for i := 0; i < numPublished; i++ {
pubsub.Publish(i)
}
wg.Done()
}()
go func() {
defer wg.Done()
for i := 0; i < numPublished; i++ {
select {
case m := <-ts1.onMsgCh:
if m != i {
t.Errorf("Received unexpected message: %q; want: %q", m, i)
return
}
case <-time.After(defaultTestTimeout):
t.Error("Timeout when expecting the onMessage() callback to be invoked")
return
}
}
}()
wg.Wait()
if t.Failed() {
t.FailNow()
}
// Register another subscriber and ensure that it receives the last published message.
ts2 := newTestSubscriber(numPublished)
pubsub.Subscribe(ts2)
select {
case m := <-ts2.onMsgCh:
if m != numPublished-1 {
t.Fatalf("Received unexpected message: %q; want: %q", m, numPublished-1)
}
case <-time.After(defaultTestShortTimeout):
t.Fatal("Timeout when expecting the onMessage() callback to be invoked")
}
wg.Add(3)
// Publish ten messages on the pubsub and ensure that they are received in order by the subscribers.
go func() {
for i := 0; i < numPublished; i++ {
pubsub.Publish(i)
}
wg.Done()
}()
go func() {
defer wg.Done()
for i := 0; i < numPublished; i++ {
select {
case m := <-ts1.onMsgCh:
if m != i {
t.Errorf("Received unexpected message: %q; want: %q", m, i)
return
}
case <-time.After(defaultTestTimeout):
t.Error("Timeout when expecting the onMessage() callback to be invoked")
return
}
}
}()
go func() {
defer wg.Done()
for i := 0; i < numPublished; i++ {
select {
case m := <-ts2.onMsgCh:
if m != i {
t.Errorf("Received unexpected message: %q; want: %q", m, i)
return
}
case <-time.After(defaultTestTimeout):
t.Error("Timeout when expecting the onMessage() callback to be invoked")
return
}
}
}()
wg.Wait()
if t.Failed() {
t.FailNow()
}
cancel()
<-pubsub.Done()
go func() {
pubsub.Publish(99)
}()
// Ensure that the subscriber callback is not invoked as instantiated
// pubsub has already closed.
select {
case <-ts1.onMsgCh:
t.Fatal("The callback was invoked after pubsub being stopped")
case <-ts2.onMsgCh:
t.Fatal("The callback was invoked after pubsub being stopped")
case <-time.After(defaultTestShortTimeout):
}
}
func (s) TestPubSub_PublishMsgs_BeforeRegisterSub(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
pubsub := NewPubSub(ctx)
const numPublished = 3
for i := 0; i < numPublished; i++ {
pubsub.Publish(i)
}
ts := newTestSubscriber(numPublished)
pubsub.Subscribe(ts)
// Ensure that the subscriber callback is invoked with a previously
// published message.
select {
case d := <-ts.onMsgCh:
if d != numPublished-1 {
t.Fatalf("Unexpected message received: %q; %q", d, numPublished-1)
}
case <-time.After(defaultTestShortTimeout):
t.Fatal("Timeout when expecting the onMessage() callback to be invoked")
}
}