mirror of https://github.com/grpc/grpc-go.git
				
				
				
			
		
			
				
	
	
		
			550 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			550 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Go
		
	
	
	
/*
 | 
						|
 * Copyright 2022 gRPC authors.
 | 
						|
 *
 | 
						|
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
 * you may not use this file except in compliance with the License.
 | 
						|
 * You may obtain a copy of the License at
 | 
						|
 *
 | 
						|
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 *
 | 
						|
 * Unless required by applicable law or agreed to in writing, software
 | 
						|
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
 * See the License for the specific language governing permissions and
 | 
						|
 * limitations under the License.
 | 
						|
 */
 | 
						|
 | 
						|
package orca_test
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"sync"
 | 
						|
	"testing"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/golang/protobuf/proto"
 | 
						|
	"google.golang.org/grpc"
 | 
						|
	"google.golang.org/grpc/balancer"
 | 
						|
	"google.golang.org/grpc/balancer/roundrobin"
 | 
						|
	"google.golang.org/grpc/codes"
 | 
						|
	"google.golang.org/grpc/credentials/insecure"
 | 
						|
	"google.golang.org/grpc/internal/grpctest"
 | 
						|
	"google.golang.org/grpc/internal/testutils"
 | 
						|
	"google.golang.org/grpc/orca"
 | 
						|
	"google.golang.org/grpc/orca/internal"
 | 
						|
	"google.golang.org/grpc/resolver"
 | 
						|
	"google.golang.org/grpc/resolver/manual"
 | 
						|
	"google.golang.org/grpc/status"
 | 
						|
 | 
						|
	v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
 | 
						|
	v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3"
 | 
						|
	v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3"
 | 
						|
)
 | 
						|
 | 
						|
// customLBB wraps a round robin LB policy but provides a ClientConn wrapper to
 | 
						|
// add an ORCA OOB report producer for all created SubConns.
 | 
						|
type customLBB struct{}
 | 
						|
 | 
						|
func (customLBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
 | 
						|
	return balancer.Get(roundrobin.Name).Build(&ccWrapper{ClientConn: cc}, opts)
 | 
						|
}
 | 
						|
 | 
						|
func (customLBB) Name() string { return "customLB" }
 | 
						|
 | 
						|
func init() {
 | 
						|
	balancer.Register(customLBB{})
 | 
						|
}
 | 
						|
 | 
						|
type ccWrapper struct {
 | 
						|
	balancer.ClientConn
 | 
						|
}
 | 
						|
 | 
						|
func (w *ccWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
 | 
						|
	if len(addrs) != 1 {
 | 
						|
		panic(fmt.Sprintf("got addrs=%v; want len(addrs) == 1", addrs))
 | 
						|
	}
 | 
						|
	sc, err := w.ClientConn.NewSubConn(addrs, opts)
 | 
						|
	if err != nil {
 | 
						|
		return sc, err
 | 
						|
	}
 | 
						|
	l := getListenerInfo(addrs[0])
 | 
						|
	l.listener.cleanup = orca.RegisterOOBListener(sc, l.listener, l.opts)
 | 
						|
	l.sc = sc
 | 
						|
	return sc, nil
 | 
						|
}
 | 
						|
 | 
						|
// listenerInfo is stored in an address's attributes to allow ORCA
 | 
						|
// listeners to be registered on subconns created for that address.
 | 
						|
type listenerInfo struct {
 | 
						|
	listener *testOOBListener
 | 
						|
	opts     orca.OOBListenerOptions
 | 
						|
	sc       balancer.SubConn // Set by the LB policy
 | 
						|
}
 | 
						|
 | 
						|
type listenerInfoKey struct{}
 | 
						|
 | 
						|
func setListenerInfo(addr resolver.Address, l *listenerInfo) resolver.Address {
 | 
						|
	addr.Attributes = addr.Attributes.WithValue(listenerInfoKey{}, l)
 | 
						|
	return addr
 | 
						|
}
 | 
						|
 | 
						|
func getListenerInfo(addr resolver.Address) *listenerInfo {
 | 
						|
	return addr.Attributes.Value(listenerInfoKey{}).(*listenerInfo)
 | 
						|
}
 | 
						|
 | 
						|
// testOOBListener is a simple listener that pushes load reports to a channel.
 | 
						|
type testOOBListener struct {
 | 
						|
	cleanup      func()
 | 
						|
	loadReportCh chan *v3orcapb.OrcaLoadReport
 | 
						|
}
 | 
						|
 | 
						|
func newTestOOBListener() *testOOBListener {
 | 
						|
	return &testOOBListener{cleanup: func() {}, loadReportCh: make(chan *v3orcapb.OrcaLoadReport)}
 | 
						|
}
 | 
						|
 | 
						|
func (t *testOOBListener) Stop() { t.cleanup() }
 | 
						|
 | 
						|
func (t *testOOBListener) OnLoadReport(r *v3orcapb.OrcaLoadReport) {
 | 
						|
	t.loadReportCh <- r
 | 
						|
}
 | 
						|
 | 
						|
// TestProducer is a basic, end-to-end style test of an LB policy with an
 | 
						|
// OOBListener communicating with a server with an ORCA service.
 | 
						|
func (s) TestProducer(t *testing.T) {
 | 
						|
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | 
						|
	defer cancel()
 | 
						|
 | 
						|
	// Use a fixed backoff for stream recreation.
 | 
						|
	oldBackoff := internal.DefaultBackoffFunc
 | 
						|
	internal.DefaultBackoffFunc = func(int) time.Duration { return 10 * time.Millisecond }
 | 
						|
	defer func() { internal.DefaultBackoffFunc = oldBackoff }()
 | 
						|
 | 
						|
	// Initialize listener for our ORCA server.
 | 
						|
	lis, err := testutils.LocalTCPListener()
 | 
						|
	if err != nil {
 | 
						|
		t.Fatal(err)
 | 
						|
	}
 | 
						|
 | 
						|
	// Register the OpenRCAService with a very short metrics reporting interval.
 | 
						|
	const shortReportingInterval = 50 * time.Millisecond
 | 
						|
	opts := orca.ServiceOptions{MinReportingInterval: shortReportingInterval}
 | 
						|
	internal.AllowAnyMinReportingInterval.(func(*orca.ServiceOptions))(&opts)
 | 
						|
	s := grpc.NewServer()
 | 
						|
	orcaSrv, err := orca.Register(s, opts)
 | 
						|
	if err != nil {
 | 
						|
		t.Fatalf("orca.Register failed: %v", err)
 | 
						|
	}
 | 
						|
	go s.Serve(lis)
 | 
						|
	defer s.Stop()
 | 
						|
 | 
						|
	// Create our client with an OOB listener in the LB policy it selects.
 | 
						|
	r := manual.NewBuilderWithScheme("whatever")
 | 
						|
	oobLis := newTestOOBListener()
 | 
						|
 | 
						|
	lisOpts := orca.OOBListenerOptions{ReportInterval: 50 * time.Millisecond}
 | 
						|
	li := &listenerInfo{listener: oobLis, opts: lisOpts}
 | 
						|
	addr := setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)
 | 
						|
	r.InitialState(resolver.State{Addresses: []resolver.Address{addr}})
 | 
						|
	cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
 | 
						|
	if err != nil {
 | 
						|
		t.Fatalf("grpc.Dial failed: %v", err)
 | 
						|
	}
 | 
						|
	defer cc.Close()
 | 
						|
 | 
						|
	// Ensure the OOB listener is stopped before the client is closed to avoid
 | 
						|
	// a potential irrelevant error in the logs.
 | 
						|
	defer oobLis.Stop()
 | 
						|
 | 
						|
	// Set a few metrics and wait for them on the client side.
 | 
						|
	orcaSrv.SetCPUUtilization(10)
 | 
						|
	orcaSrv.SetMemoryUtilization(100)
 | 
						|
	orcaSrv.SetUtilization("bob", 555)
 | 
						|
	loadReportWant := &v3orcapb.OrcaLoadReport{
 | 
						|
		CpuUtilization: 10,
 | 
						|
		MemUtilization: 100,
 | 
						|
		Utilization:    map[string]float64{"bob": 555},
 | 
						|
	}
 | 
						|
 | 
						|
testReport:
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case r := <-oobLis.loadReportCh:
 | 
						|
			t.Log("Load report received: ", r)
 | 
						|
			if proto.Equal(r, loadReportWant) {
 | 
						|
				// Success!
 | 
						|
				break testReport
 | 
						|
			}
 | 
						|
		case <-ctx.Done():
 | 
						|
			t.Fatalf("timed out waiting for load report: %v", loadReportWant)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Change and add metrics and wait for them on the client side.
 | 
						|
	orcaSrv.SetCPUUtilization(50)
 | 
						|
	orcaSrv.SetMemoryUtilization(200)
 | 
						|
	orcaSrv.SetUtilization("mary", 321)
 | 
						|
	loadReportWant = &v3orcapb.OrcaLoadReport{
 | 
						|
		CpuUtilization: 50,
 | 
						|
		MemUtilization: 200,
 | 
						|
		Utilization:    map[string]float64{"bob": 555, "mary": 321},
 | 
						|
	}
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case r := <-oobLis.loadReportCh:
 | 
						|
			t.Log("Load report received: ", r)
 | 
						|
			if proto.Equal(r, loadReportWant) {
 | 
						|
				// Success!
 | 
						|
				return
 | 
						|
			}
 | 
						|
		case <-ctx.Done():
 | 
						|
			t.Fatalf("timed out waiting for load report: %v", loadReportWant)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// fakeORCAService is a simple implementation of an ORCA service that pushes
 | 
						|
// requests it receives from clients to a channel and sends responses from a
 | 
						|
// channel back.  This allows tests to verify the client is sending requests
 | 
						|
// and processing responses properly.
 | 
						|
type fakeORCAService struct {
 | 
						|
	v3orcaservicegrpc.UnimplementedOpenRcaServiceServer
 | 
						|
 | 
						|
	reqCh  chan *v3orcaservicepb.OrcaLoadReportRequest
 | 
						|
	respCh chan interface{} // either *v3orcapb.OrcaLoadReport or error
 | 
						|
}
 | 
						|
 | 
						|
func newFakeORCAService() *fakeORCAService {
 | 
						|
	return &fakeORCAService{
 | 
						|
		reqCh:  make(chan *v3orcaservicepb.OrcaLoadReportRequest),
 | 
						|
		respCh: make(chan interface{}),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (f *fakeORCAService) close() {
 | 
						|
	close(f.respCh)
 | 
						|
}
 | 
						|
 | 
						|
func (f *fakeORCAService) StreamCoreMetrics(req *v3orcaservicepb.OrcaLoadReportRequest, stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error {
 | 
						|
	f.reqCh <- req
 | 
						|
	for resp := range f.respCh {
 | 
						|
		if err, ok := resp.(error); ok {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		if err := stream.Send(resp.(*v3orcapb.OrcaLoadReport)); err != nil {
 | 
						|
			// In the event that a stream error occurs, a new stream will have
 | 
						|
			// been created that was waiting for this response message.  Push
 | 
						|
			// it back onto the channel and return.
 | 
						|
			//
 | 
						|
			// This happens because we range over respCh.  If we changed to
 | 
						|
			// instead select on respCh + stream.Context(), the same situation
 | 
						|
			// could still occur due to a race between noticing the two events,
 | 
						|
			// so such a workaround would still be needed to prevent flakiness.
 | 
						|
			f.respCh <- resp
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// TestProducerBackoff verifies that the ORCA producer applies the proper
 | 
						|
// backoff after stream failures.
 | 
						|
func (s) TestProducerBackoff(t *testing.T) {
 | 
						|
	grpctest.TLogger.ExpectErrorN("injected error", 4)
 | 
						|
 | 
						|
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | 
						|
	defer cancel()
 | 
						|
 | 
						|
	// Provide a convenient way to expect backoff calls and return a minimal
 | 
						|
	// value.
 | 
						|
	const backoffShouldNotBeCalled = 9999 // Use to assert backoff function is not called.
 | 
						|
	const backoffAllowAny = -1            // Use to ignore any backoff calls.
 | 
						|
	expectedBackoff := backoffAllowAny
 | 
						|
	oldBackoff := internal.DefaultBackoffFunc
 | 
						|
	internal.DefaultBackoffFunc = func(got int) time.Duration {
 | 
						|
		if expectedBackoff == backoffShouldNotBeCalled {
 | 
						|
			t.Errorf("Unexpected backoff call; parameter = %v", got)
 | 
						|
		} else if expectedBackoff != backoffAllowAny {
 | 
						|
			if got != expectedBackoff {
 | 
						|
				t.Errorf("Unexpected backoff received; got %v want %v", got, expectedBackoff)
 | 
						|
			}
 | 
						|
		}
 | 
						|
		return time.Millisecond
 | 
						|
	}
 | 
						|
	defer func() { internal.DefaultBackoffFunc = oldBackoff }()
 | 
						|
 | 
						|
	// Initialize listener for our ORCA server.
 | 
						|
	lis, err := testutils.LocalTCPListener()
 | 
						|
	if err != nil {
 | 
						|
		t.Fatal(err)
 | 
						|
	}
 | 
						|
 | 
						|
	// Register our fake ORCA service.
 | 
						|
	s := grpc.NewServer()
 | 
						|
	fake := newFakeORCAService()
 | 
						|
	defer fake.close()
 | 
						|
	v3orcaservicegrpc.RegisterOpenRcaServiceServer(s, fake)
 | 
						|
	go s.Serve(lis)
 | 
						|
	defer s.Stop()
 | 
						|
 | 
						|
	// Define the report interval and a function to wait for it to be sent to
 | 
						|
	// the server.
 | 
						|
	const reportInterval = 123 * time.Second
 | 
						|
	awaitRequest := func(interval time.Duration) {
 | 
						|
		select {
 | 
						|
		case req := <-fake.reqCh:
 | 
						|
			if got := req.GetReportInterval().AsDuration(); got != interval {
 | 
						|
				t.Errorf("Unexpected report interval; got %v want %v", got, interval)
 | 
						|
			}
 | 
						|
		case <-ctx.Done():
 | 
						|
			t.Fatalf("Did not receive client request")
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Create our client with an OOB listener in the LB policy it selects.
 | 
						|
	r := manual.NewBuilderWithScheme("whatever")
 | 
						|
	oobLis := newTestOOBListener()
 | 
						|
 | 
						|
	lisOpts := orca.OOBListenerOptions{ReportInterval: reportInterval}
 | 
						|
	li := &listenerInfo{listener: oobLis, opts: lisOpts}
 | 
						|
	r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)}})
 | 
						|
	cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
 | 
						|
	if err != nil {
 | 
						|
		t.Fatalf("grpc.Dial failed: %v", err)
 | 
						|
	}
 | 
						|
	defer cc.Close()
 | 
						|
 | 
						|
	// Ensure the OOB listener is stopped before the client is closed to avoid
 | 
						|
	// a potential irrelevant error in the logs.
 | 
						|
	defer oobLis.Stop()
 | 
						|
 | 
						|
	// Define a load report to send and expect the client to see.
 | 
						|
	loadReportWant := &v3orcapb.OrcaLoadReport{
 | 
						|
		CpuUtilization: 10,
 | 
						|
		MemUtilization: 100,
 | 
						|
		Utilization:    map[string]float64{"bob": 555},
 | 
						|
	}
 | 
						|
 | 
						|
	// Unblock the fake.
 | 
						|
	awaitRequest(reportInterval)
 | 
						|
	fake.respCh <- loadReportWant
 | 
						|
	select {
 | 
						|
	case r := <-oobLis.loadReportCh:
 | 
						|
		t.Log("Load report received: ", r)
 | 
						|
		if proto.Equal(r, loadReportWant) {
 | 
						|
			// Success!
 | 
						|
			break
 | 
						|
		}
 | 
						|
	case <-ctx.Done():
 | 
						|
		t.Fatalf("timed out waiting for load report: %v", loadReportWant)
 | 
						|
	}
 | 
						|
 | 
						|
	// The next request should be immediate, since there was a message
 | 
						|
	// received.
 | 
						|
	expectedBackoff = backoffShouldNotBeCalled
 | 
						|
	fake.respCh <- status.Errorf(codes.Internal, "injected error")
 | 
						|
	awaitRequest(reportInterval)
 | 
						|
 | 
						|
	// The next requests will need to backoff.
 | 
						|
	expectedBackoff = 0
 | 
						|
	fake.respCh <- status.Errorf(codes.Internal, "injected error")
 | 
						|
	awaitRequest(reportInterval)
 | 
						|
	expectedBackoff = 1
 | 
						|
	fake.respCh <- status.Errorf(codes.Internal, "injected error")
 | 
						|
	awaitRequest(reportInterval)
 | 
						|
	expectedBackoff = 2
 | 
						|
	fake.respCh <- status.Errorf(codes.Internal, "injected error")
 | 
						|
	awaitRequest(reportInterval)
 | 
						|
	// The next request should be immediate, since there was a message
 | 
						|
	// received.
 | 
						|
	expectedBackoff = backoffShouldNotBeCalled
 | 
						|
 | 
						|
	// Send another valid response and wait for it on the client.
 | 
						|
	fake.respCh <- loadReportWant
 | 
						|
	select {
 | 
						|
	case r := <-oobLis.loadReportCh:
 | 
						|
		t.Log("Load report received: ", r)
 | 
						|
		if proto.Equal(r, loadReportWant) {
 | 
						|
			// Success!
 | 
						|
			break
 | 
						|
		}
 | 
						|
	case <-ctx.Done():
 | 
						|
		t.Fatalf("timed out waiting for load report: %v", loadReportWant)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// TestProducerMultipleListeners tests that multiple listeners works as
 | 
						|
// expected in a producer: requesting the proper interval and delivering the
 | 
						|
// update to all listeners.
 | 
						|
func (s) TestProducerMultipleListeners(t *testing.T) {
 | 
						|
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | 
						|
	defer cancel()
 | 
						|
 | 
						|
	// Provide a convenient way to expect backoff calls and return a minimal
 | 
						|
	// value.
 | 
						|
	oldBackoff := internal.DefaultBackoffFunc
 | 
						|
	internal.DefaultBackoffFunc = func(got int) time.Duration {
 | 
						|
		return time.Millisecond
 | 
						|
	}
 | 
						|
	defer func() { internal.DefaultBackoffFunc = oldBackoff }()
 | 
						|
 | 
						|
	// Initialize listener for our ORCA server.
 | 
						|
	lis, err := testutils.LocalTCPListener()
 | 
						|
	if err != nil {
 | 
						|
		t.Fatal(err)
 | 
						|
	}
 | 
						|
 | 
						|
	// Register our fake ORCA service.
 | 
						|
	s := grpc.NewServer()
 | 
						|
	fake := newFakeORCAService()
 | 
						|
	defer fake.close()
 | 
						|
	v3orcaservicegrpc.RegisterOpenRcaServiceServer(s, fake)
 | 
						|
	go s.Serve(lis)
 | 
						|
	defer s.Stop()
 | 
						|
 | 
						|
	// Define the report interval and a function to wait for it to be sent to
 | 
						|
	// the server.
 | 
						|
	const reportInterval1 = 123 * time.Second
 | 
						|
	const reportInterval2 = 234 * time.Second
 | 
						|
	const reportInterval3 = 56 * time.Second
 | 
						|
	awaitRequest := func(interval time.Duration) {
 | 
						|
		select {
 | 
						|
		case req := <-fake.reqCh:
 | 
						|
			if got := req.GetReportInterval().AsDuration(); got != interval {
 | 
						|
				t.Errorf("Unexpected report interval; got %v want %v", got, interval)
 | 
						|
			}
 | 
						|
		case <-ctx.Done():
 | 
						|
			t.Fatalf("Did not receive client request")
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Create our client with an OOB listener in the LB policy it selects.
 | 
						|
	r := manual.NewBuilderWithScheme("whatever")
 | 
						|
	oobLis1 := newTestOOBListener()
 | 
						|
	lisOpts1 := orca.OOBListenerOptions{ReportInterval: reportInterval1}
 | 
						|
	li := &listenerInfo{listener: oobLis1, opts: lisOpts1}
 | 
						|
	r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)}})
 | 
						|
	cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
 | 
						|
	if err != nil {
 | 
						|
		t.Fatalf("grpc.Dial failed: %v", err)
 | 
						|
	}
 | 
						|
	defer cc.Close()
 | 
						|
 | 
						|
	// Ensure the OOB listener is stopped before the client is closed to avoid
 | 
						|
	// a potential irrelevant error in the logs.
 | 
						|
	defer oobLis1.Stop()
 | 
						|
 | 
						|
	oobLis2 := newTestOOBListener()
 | 
						|
	lisOpts2 := orca.OOBListenerOptions{ReportInterval: reportInterval2}
 | 
						|
 | 
						|
	oobLis3 := newTestOOBListener()
 | 
						|
	lisOpts3 := orca.OOBListenerOptions{ReportInterval: reportInterval3}
 | 
						|
 | 
						|
	// Define a load report to send and expect the client to see.
 | 
						|
	loadReportWant := &v3orcapb.OrcaLoadReport{
 | 
						|
		CpuUtilization: 10,
 | 
						|
		MemUtilization: 100,
 | 
						|
		Utilization:    map[string]float64{"bob": 555},
 | 
						|
	}
 | 
						|
 | 
						|
	// Receive reports and update counts for the three listeners.
 | 
						|
	var reportsMu sync.Mutex
 | 
						|
	var reportsReceived1, reportsReceived2, reportsReceived3 int
 | 
						|
	go func() {
 | 
						|
		for {
 | 
						|
			select {
 | 
						|
			case r := <-oobLis1.loadReportCh:
 | 
						|
				t.Log("Load report 1 received: ", r)
 | 
						|
				if !proto.Equal(r, loadReportWant) {
 | 
						|
					t.Errorf("Unexpected report received: %+v", r)
 | 
						|
				}
 | 
						|
				reportsMu.Lock()
 | 
						|
				reportsReceived1++
 | 
						|
				reportsMu.Unlock()
 | 
						|
			case r := <-oobLis2.loadReportCh:
 | 
						|
				t.Log("Load report 2 received: ", r)
 | 
						|
				if !proto.Equal(r, loadReportWant) {
 | 
						|
					t.Errorf("Unexpected report received: %+v", r)
 | 
						|
				}
 | 
						|
				reportsMu.Lock()
 | 
						|
				reportsReceived2++
 | 
						|
				reportsMu.Unlock()
 | 
						|
			case r := <-oobLis3.loadReportCh:
 | 
						|
				t.Log("Load report 3 received: ", r)
 | 
						|
				if !proto.Equal(r, loadReportWant) {
 | 
						|
					t.Errorf("Unexpected report received: %+v", r)
 | 
						|
				}
 | 
						|
				reportsMu.Lock()
 | 
						|
				reportsReceived3++
 | 
						|
				reportsMu.Unlock()
 | 
						|
			case <-ctx.Done():
 | 
						|
				// Test has ended; exit
 | 
						|
				return
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	// checkReports is a helper function to check the report counts for the three listeners.
 | 
						|
	checkReports := func(r1, r2, r3 int) {
 | 
						|
		t.Helper()
 | 
						|
		for ctx.Err() == nil {
 | 
						|
			reportsMu.Lock()
 | 
						|
			if r1 == reportsReceived1 && r2 == reportsReceived2 && r3 == reportsReceived3 {
 | 
						|
				// Success!
 | 
						|
				reportsMu.Unlock()
 | 
						|
				return
 | 
						|
			}
 | 
						|
			if reportsReceived1 > r1 || reportsReceived2 > r2 || reportsReceived3 > r3 {
 | 
						|
				reportsMu.Unlock()
 | 
						|
				t.Fatalf("received excess reports. got %v %v %v; want %v %v %v", reportsReceived1, reportsReceived2, reportsReceived3, r1, r2, r3)
 | 
						|
				return
 | 
						|
			}
 | 
						|
			reportsMu.Unlock()
 | 
						|
			time.Sleep(10 * time.Millisecond)
 | 
						|
		}
 | 
						|
		t.Fatalf("timed out waiting for reports received. got %v %v %v; want %v %v %v", reportsReceived1, reportsReceived2, reportsReceived3, r1, r2, r3)
 | 
						|
	}
 | 
						|
 | 
						|
	// Only 1 listener; expect reportInterval1 to be used and expect the report
 | 
						|
	// to be sent to the listener.
 | 
						|
	awaitRequest(reportInterval1)
 | 
						|
	fake.respCh <- loadReportWant
 | 
						|
	checkReports(1, 0, 0)
 | 
						|
 | 
						|
	// Register listener 2 with a less frequent interval; no need to recreate
 | 
						|
	// stream.  Report should go to both listeners.
 | 
						|
	oobLis2.cleanup = orca.RegisterOOBListener(li.sc, oobLis2, lisOpts2)
 | 
						|
	fake.respCh <- loadReportWant
 | 
						|
	checkReports(2, 1, 0)
 | 
						|
 | 
						|
	// Register listener 3 with a more frequent interval; stream is recreated
 | 
						|
	// with this interval after the next report is received.  The first report
 | 
						|
	// will go to all three listeners.
 | 
						|
	oobLis3.cleanup = orca.RegisterOOBListener(li.sc, oobLis3, lisOpts3)
 | 
						|
	fake.respCh <- loadReportWant
 | 
						|
	checkReports(3, 2, 1)
 | 
						|
	awaitRequest(reportInterval3)
 | 
						|
 | 
						|
	// Another report without a change in listeners should go to all three listeners.
 | 
						|
	fake.respCh <- loadReportWant
 | 
						|
	checkReports(4, 3, 2)
 | 
						|
 | 
						|
	// Stop listener 2.  This does not affect the interval as listener 3 is
 | 
						|
	// still the shortest.  The next update goes to listeners 1 and 3.
 | 
						|
	oobLis2.Stop()
 | 
						|
	fake.respCh <- loadReportWant
 | 
						|
	checkReports(5, 3, 3)
 | 
						|
 | 
						|
	// Stop listener 3.  This makes the interval longer, with stream recreation
 | 
						|
	// delayed until the next report is received.  Reports should only go to
 | 
						|
	// listener 1 now.
 | 
						|
	oobLis3.Stop()
 | 
						|
	fake.respCh <- loadReportWant
 | 
						|
	checkReports(6, 3, 3)
 | 
						|
	awaitRequest(reportInterval1)
 | 
						|
	// Another report without a change in listeners should go to the first listener.
 | 
						|
	fake.respCh <- loadReportWant
 | 
						|
	checkReports(7, 3, 3)
 | 
						|
}
 |