xds: Improve xds_client_test.go (#3270)

Simplified the tests by only testing what is required and faking out
whatever can be faked out.

Also added a fakexds.Server implementation. Will switch other users of
the existing fakeserver implementation after this PR is merged.
This commit is contained in:
Easwar Swaminathan 2019-12-18 14:17:35 -08:00 committed by GitHub
parent 04c179315b
commit 660df6a06b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 318 additions and 322 deletions

View File

@ -19,28 +19,20 @@
package balancer
import (
"fmt"
"errors"
"testing"
"time"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
"github.com/golang/protobuf/proto"
anypb "github.com/golang/protobuf/ptypes/any"
structpb "github.com/golang/protobuf/ptypes/struct"
wrpb "github.com/golang/protobuf/ptypes/wrappers"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/lrs"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/grpc/xds/internal/client/fakexds"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakexds"
)
const (
@ -50,357 +42,162 @@ const (
var (
testServiceName = "test/foo"
testEDSClusterName = "test/service/eds"
testClusterLoadAssignment = &xdspb.ClusterLoadAssignment{
ClusterName: testEDSClusterName,
Endpoints: []*endpointpb.LocalityLbEndpoints{{
Locality: &corepb.Locality{
Region: "asia-east1",
Zone: "1",
SubZone: "sa",
},
LbEndpoints: []*endpointpb.LbEndpoint{{
HostIdentifier: &endpointpb.LbEndpoint_Endpoint{
Endpoint: &endpointpb.Endpoint{
Address: &corepb.Address{
Address: &corepb.Address_SocketAddress{
SocketAddress: &corepb.SocketAddress{
Address: "1.1.1.1",
PortSpecifier: &corepb.SocketAddress_PortValue{
PortValue: 10001,
},
ResolverName: "dns",
},
},
},
HealthCheckConfig: nil,
},
},
Metadata: &corepb.Metadata{
FilterMetadata: map[string]*structpb.Struct{
"xx.lb": {
Fields: map[string]*structpb.Value{
"endpoint_name": {
Kind: &structpb.Value_StringValue{
StringValue: "some.endpoint.name",
},
},
},
},
},
},
}},
LoadBalancingWeight: &wrpb.UInt32Value{
Value: 1,
},
Priority: 0,
}},
}
marshaledClusterLoadAssignment, _ = proto.Marshal(testClusterLoadAssignment)
testEDSResp = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: edsType,
Value: marshaledClusterLoadAssignment,
},
},
TypeUrl: edsType,
}
)
func (s) TestEDSClientResponseHandling(t *testing.T) {
td, cleanup := fakexds.StartServer(t)
defer cleanup()
edsRespChan := make(chan *xdsclient.EDSUpdate, 10)
newEDS := func(i *xdsclient.EDSUpdate) error {
edsRespChan <- i
return nil
// TestClientWrapperWatchEDS verifies that the clientWrapper registers an
// EDS watch for expected resource upon receiving an update from the top-level
// edsBalancer.
//
// The test does the following:
// * Starts a fake xDS server.
// * Creates a clientWrapper.
// * Sends updates with different edsServiceNames and expects new watches to be
// registered.
func (s) TestClientWrapperWatchEDS(t *testing.T) {
fakeServer, cleanup, err := fakexds.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
client := newXDSClientWrapper(newEDS, func() {}, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil)
defer client.close()
defer cleanup()
cw := newXDSClientWrapper(nil, nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil)
defer cw.close()
// Test that EDS requests sent match XDSConfig.
for _, test := range []struct {
name string
edsServiceName string
wantResourceName string
}{
{
// Update with an empty edsServiceName should trigger an EDS watch
// for the user's dial target.
name: "empty-edsServiceName",
edsServiceName: "",
// EDSServiceName is an empty string, user's dialing target will be
// set in resource names.
wantResourceName: testServiceName,
},
{
name: "non-empty-edsServiceName",
edsServiceName: testEDSClusterName,
wantResourceName: testEDSClusterName,
// Update with an non-empty edsServiceName should trigger an EDS
// watch for the same.
name: "first-non-empty-edsServiceName",
edsServiceName: "foobar-1",
wantResourceName: "foobar-1",
},
{
// Also test the case where the edsServerName changes from one
// non-empty name to another, and make sure a new watch is
// registered.
name: "second-non-empty-edsServiceName",
edsServiceName: "foobar-2",
wantResourceName: "foobar-2",
},
} {
client.handleUpdate(&XDSConfig{
BalancerName: td.Address,
t.Run(test.name, func(t *testing.T) {
cw.handleUpdate(&XDSConfig{
BalancerName: fakeServer.Address,
EDSServiceName: test.edsServiceName,
LrsLoadReportingServerName: nil,
}, nil)
req := <-td.RequestChan
if req.Err != nil {
t.Fatalf("EDS RPC failed with err: %v", req.Err)
req, err := fakeServer.XDSRequestChan.Receive()
if err != nil {
t.Fatalf("EDS RPC failed with err: %v", err)
}
wantReq1 := &xdspb.DiscoveryRequest{
edsReq := req.(*fakexds.Request)
if edsReq.Err != nil {
t.Fatalf("EDS RPC failed with err: %v", edsReq.Err)
}
wantReq := &xdspb.DiscoveryRequest{
TypeUrl: edsType,
ResourceNames: []string{test.wantResourceName},
Node: &corepb.Node{},
}
if !proto.Equal(req.Req, wantReq1) {
t.Fatalf("%v: got EDS request %v, expected: %v, diff: %s", test.name, req.Req, wantReq1, cmp.Diff(req.Req, wantReq1, cmp.Comparer(proto.Equal)))
if !proto.Equal(edsReq.Req, wantReq) {
t.Fatalf("got EDS request %v, expected: %v, diff: %s", edsReq.Req, wantReq, cmp.Diff(edsReq.Req, wantReq, cmp.Comparer(proto.Equal)))
}
})
}
}
// Make sure that the responses from the stream are also handled.
td.ResponseChan <- &fakexds.Response{Resp: testEDSResp}
gotResp := <-edsRespChan
want, err := xdsclient.ParseEDSRespProto(testClusterLoadAssignment)
if err != nil {
t.Fatalf("parsing wanted EDS response failed: %v", err)
}
if !cmp.Equal(gotResp, want) {
t.Fatalf("received unexpected EDS response, got %v, want %v", gotResp, want)
}
}
type testXDSClient struct {
clusterNameChan *testutils.Channel
edsCb func(*xdsclient.EDSUpdate, error)
}
func newTestXDSClient() *testXDSClient {
return &testXDSClient{
clusterNameChan: testutils.NewChannel(),
}
}
func (c *testXDSClient) WatchEDS(clusterName string, edsCb func(*xdsclient.EDSUpdate, error)) (cancel func()) {
c.clusterNameChan.Send(clusterName)
c.edsCb = edsCb
return func() {}
}
func (c *testXDSClient) ReportLoad(server string, clusterName string, loadStore lrs.Store) (cancel func()) {
panic("implement me 2")
}
func (c *testXDSClient) Close() {
panic("implement me 3")
}
// Test that error from the xds client is handled correctly.
func (s) TestEDSClientResponseErrorHandling(t *testing.T) {
td, cleanup := fakexds.StartServer(t)
defer cleanup()
edsRespChan := make(chan *xdsclient.EDSUpdate, 10)
newEDS := func(i *xdsclient.EDSUpdate) error {
edsRespChan <- i
// TestClientWrapperHandleUpdateError verifies that the clientWrapper handles
// errors from the edsWatch callback appropriately.
//
// The test does the following:
// * Creates a clientWrapper.
// * Creates a fakexds.Client and passes it to the clientWrapper in attributes.
// * Verifies the clientWrapper registers an EDS watch.
// * Forces the fakexds.Client to invoke the registered EDS watch callback with
// an error. Verifies that the wrapper does not invoke the top-level
// edsBalancer with the received error.
func (s) TestClientWrapperHandleUpdateError(t *testing.T) {
edsRespChan := testutils.NewChannel()
newEDS := func(update *xdsclient.EDSUpdate) error {
edsRespChan.Send(update)
return nil
}
client := newXDSClientWrapper(newEDS, func() {}, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil)
defer client.close()
cw := newXDSClientWrapper(newEDS, nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil)
defer cw.close()
// Create a client to be passed in attributes.
c := newTestXDSClient()
client.handleUpdate(&XDSConfig{
BalancerName: td.Address,
EDSServiceName: testEDSClusterName,
LrsLoadReportingServerName: nil,
}, attributes.New(xdsinternal.XDSClientID, c),
)
if gotClusterName, err := c.clusterNameChan.Receive(); err != nil || gotClusterName != testEDSClusterName {
t.Fatalf("got EDS watch clusterName %v, expected: %v", gotClusterName, testEDSClusterName)
xdsC := fakexds.NewClient()
cw.handleUpdate(&XDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC))
gotCluster, err := xdsC.WaitForWatchEDS()
if err != nil {
t.Fatalf("xdsClient.WatchEDS failed with error: %v", err)
}
c.edsCb(nil, fmt.Errorf("testing err"))
if gotCluster != testEDSClusterName {
t.Fatalf("xdsClient.WatchEDS() called with cluster: %v, want %v", gotCluster, testEDSClusterName)
}
xdsC.InvokeWatchEDSCallback(nil, errors.New("EDS watch callback error"))
// The ballback is called with an error, expect no update from edsRespChan.
// The callback is called with an error, expect no update from edsRespChan.
//
// TODO: check for loseContact() when errors indicating "lose contact" are
// handled correctly.
timer := time.NewTimer(time.Second)
defer timer.Stop()
select {
case <-timer.C:
case resp := <-edsRespChan:
t.Fatalf("unexpected resp: %v", resp)
if gotUpdate, gotErr := edsRespChan.Receive(); gotErr != testutils.ErrRecvTimeout {
t.Fatalf("edsBalancer got edsUpdate {%+v, %v}, when none was expected", gotUpdate, gotErr)
}
}
// Test that if xds_client is in attributes, the xdsclientnew function will not
// be called, and the xds_client from attributes will be used.
//
// And also that when xds_client in attributes is updated, the new one will be
// used, and watch will be restarted.
func (s) TestEDSClientInAttributes(t *testing.T) {
edsRespChan := make(chan *xdsclient.EDSUpdate, 10)
newEDS := func(i *xdsclient.EDSUpdate) error {
edsRespChan <- i
return nil
}
// TestClientWrapperGetsXDSClientInAttributes verfies the case where the
// clientWrapper receives the xdsClient to use in the attributes section of the
// update.
func (s) TestClientWrapperGetsXDSClientInAttributes(t *testing.T) {
oldxdsclientNew := xdsclientNew
xdsclientNew = func(opts xdsclient.Options) (clientInterface xdsClientInterface, e error) {
xdsclientNew = func(_ xdsclient.Options) (xdsClientInterface, error) {
t.Fatalf("unexpected call to xdsclientNew when xds_client is set in attributes")
return nil, nil
}
defer func() { xdsclientNew = oldxdsclientNew }()
td, cleanup := fakexds.StartServer(t)
defer cleanup()
// Create a client to be passed in attributes.
c, _ := oldxdsclientNew(xdsclient.Options{
Config: bootstrap.Config{
BalancerName: td.Address,
Creds: grpc.WithInsecure(),
NodeProto: &corepb.Node{},
},
})
// Need to manually close c because xdsclientWrapper won't close it (it's
// from attributes).
defer c.Close()
cw := newXDSClientWrapper(nil, nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil)
defer cw.close()
client := newXDSClientWrapper(newEDS, func() {}, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil)
defer client.close()
client.handleUpdate(
&XDSConfig{EDSServiceName: testEDSClusterName, LrsLoadReportingServerName: nil},
attributes.New(xdsinternal.XDSClientID, c),
)
expectedReq := &xdspb.DiscoveryRequest{
TypeUrl: edsType,
ResourceNames: []string{testEDSClusterName},
Node: &corepb.Node{},
// Verify that the eds watch is registered for the expected resource name.
xdsC1 := fakexds.NewClient()
cw.handleUpdate(&XDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC1))
gotCluster, err := xdsC1.WaitForWatchEDS()
if err != nil {
t.Fatalf("xdsClient.WatchEDS failed with error: %v", err)
}
if gotCluster != testEDSClusterName {
t.Fatalf("xdsClient.WatchEDS() called with cluster: %v, want %v", gotCluster, testEDSClusterName)
}
// Make sure the requests are sent to the correct td.
req := <-td.RequestChan
if req.Err != nil {
t.Fatalf("EDS RPC failed with err: %v", req.Err)
// Pass a new client in the attributes. Verify that the watch is
// re-registered on the new client, and that the old client is not closed
// (because clientWrapper only closes clients that it creates, it does not
// close client that are passed through attributes).
xdsC2 := fakexds.NewClient()
cw.handleUpdate(&XDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC2))
gotCluster, err = xdsC2.WaitForWatchEDS()
if err != nil {
t.Fatalf("xdsClient.WatchEDS failed with error: %v", err)
}
if !proto.Equal(req.Req, expectedReq) {
t.Fatalf("got EDS request %T, expected: %T, diff: %s", req.Req, expectedReq, cmp.Diff(req.Req, expectedReq, cmp.Comparer(proto.Equal)))
if gotCluster != testEDSClusterName {
t.Fatalf("xdsClient.WatchEDS() called with cluster: %v, want %v", gotCluster, testEDSClusterName)
}
td2, cleanup2 := fakexds.StartServer(t)
defer cleanup2()
// Create a client to be passed in attributes.
c2, _ := oldxdsclientNew(xdsclient.Options{
Config: bootstrap.Config{
BalancerName: td2.Address,
Creds: grpc.WithInsecure(),
NodeProto: &corepb.Node{},
},
})
// Need to manually close c because xdsclientWrapper won't close it (it's
// from attributes).
defer c2.Close()
// Update with a new xds_client in attributes.
client.handleUpdate(
&XDSConfig{EDSServiceName: "", LrsLoadReportingServerName: nil},
attributes.New(xdsinternal.XDSClientID, c2),
)
expectedReq2 := &xdspb.DiscoveryRequest{
TypeUrl: edsType,
// The edsServiceName in new update is an empty string, user's dial
// target should be used as eds name to watch.
ResourceNames: []string{testServiceName},
Node: &corepb.Node{},
}
// Make sure the requests are sent to the correct td.
req2 := <-td2.RequestChan
if req.Err != nil {
t.Fatalf("EDS RPC failed with err: %v", req.Err)
}
if !proto.Equal(req2.Req, expectedReq2) {
t.Fatalf("got EDS request %T, expected: %T, diff: %s", req2.Req, expectedReq, cmp.Diff(req2.Req, expectedReq2, cmp.Comparer(proto.Equal)))
}
}
// Test that when edsServiceName from service config is updated, the new one
// will be watched.
func (s) TestEDSServiceNameUpdate(t *testing.T) {
edsRespChan := make(chan *xdsclient.EDSUpdate, 10)
newEDS := func(i *xdsclient.EDSUpdate) error {
edsRespChan <- i
return nil
}
oldxdsclientNew := xdsclientNew
xdsclientNew = func(opts xdsclient.Options) (clientInterface xdsClientInterface, e error) {
t.Fatalf("unexpected call to xdsclientNew when xds_client is set in attributes")
return nil, nil
}
defer func() { xdsclientNew = oldxdsclientNew }()
td, cleanup := fakexds.StartServer(t)
defer cleanup()
// Create a client to be passed in attributes.
c, _ := oldxdsclientNew(xdsclient.Options{
Config: bootstrap.Config{
BalancerName: td.Address,
Creds: grpc.WithInsecure(),
NodeProto: &corepb.Node{},
},
})
// Need to manually close c because xdsclientWrapper won't close it (it's
// from attributes).
defer c.Close()
client := newXDSClientWrapper(newEDS, func() {}, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil)
defer client.close()
client.handleUpdate(
&XDSConfig{EDSServiceName: testEDSClusterName, LrsLoadReportingServerName: nil},
attributes.New(xdsinternal.XDSClientID, c),
)
expectedReq := &xdspb.DiscoveryRequest{
TypeUrl: edsType,
ResourceNames: []string{testEDSClusterName},
Node: &corepb.Node{},
}
// Make sure the requests are sent to the correct td.
req := <-td.RequestChan
if req.Err != nil {
t.Fatalf("EDS RPC failed with err: %v", req.Err)
}
if !proto.Equal(req.Req, expectedReq) {
t.Fatalf("got EDS request %T, expected: %T, diff: %s", req.Req, expectedReq, cmp.Diff(req.Req, expectedReq, cmp.Comparer(proto.Equal)))
}
// Update with a new edsServiceName.
client.handleUpdate(
&XDSConfig{EDSServiceName: "", LrsLoadReportingServerName: nil},
attributes.New(xdsinternal.XDSClientID, c),
)
expectedReq2 := &xdspb.DiscoveryRequest{
TypeUrl: edsType,
// The edsServiceName in new update is an empty string, user's dial
// target should be used as eds name to watch.
ResourceNames: []string{testServiceName},
Node: &corepb.Node{},
}
// Make sure the requests are sent to the correct td.
req2 := <-td.RequestChan
if req.Err != nil {
t.Fatalf("EDS RPC failed with err: %v", req.Err)
}
if !proto.Equal(req2.Req, expectedReq2) {
t.Fatalf("got EDS request %T, expected: %T, diff: %s", req2.Req, expectedReq, cmp.Diff(req2.Req, expectedReq2, cmp.Comparer(proto.Equal)))
if err := xdsC1.WaitForClose(); err != testutils.ErrRecvTimeout {
t.Fatalf("clientWrapper closed xdsClient received in attributes")
}
}

View File

@ -652,7 +652,7 @@ func (s) TestXDSSubConnStateChange(t *testing.T) {
fblb.waitForSubConnStateChange(&scStateChange{sc: fsc, state: state})
}
func TestXdsBalancerConfigParsing(t *testing.T) {
func TestXDSBalancerConfigParsing(t *testing.T) {
const testEDSName = "eds.service"
var testLRSName = "lrs.server"
b := bytes.NewBuffer(nil)

View File

@ -124,6 +124,13 @@ func (xdsC *Client) Close() {
xdsC.closeCh.Send(nil)
}
// WaitForClose waits for Close to be invoked on this client within a
// reasonable timeout, and returns testutils.ErrRecvTimeout otherwise.
func (xdsC *Client) WaitForClose() error {
_, err := xdsC.closeCh.Receive()
return err
}
// Name returns the name of the xds client.
func (xdsC *Client) Name() string {
return xdsC.name

View File

@ -0,0 +1,192 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package fakexds
import (
"fmt"
"io"
"net"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/testutils"
discoverypb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
)
// TODO: Make this a var or a field in the server if there is a need to use a
// value other than this default.
const defaultChannelBufferSize = 50
// Request wraps the request protobuf (xds/LRS) and error received by the
// Server in a call to stream.Recv().
type Request struct {
Req proto.Message
Err error
}
// Response wraps the response protobuf (xds/LRS) and error that the Server
// should send out to the client through a call to stream.Send()
type Response struct {
Resp proto.Message
Err error
}
// Server is a fake implementation of xDS and LRS protocols. It listens on the
// same port for both services and exposes a bunch of channels to send/receive
// messages.
type Server struct {
// XDSRequestChan is a channel on which received xDS requests are made
// available to the users of this Server.
XDSRequestChan *testutils.Channel
// XDSResponseChan is a channel on which the Server accepts xDS responses
// to be sent to the client.
XDSResponseChan chan *Response
// LRSRequestChan is a channel on which received LRS requests are made
// available to the users of this Server.
LRSRequestChan *testutils.Channel
// LRSResponseChan is a channel on which the Server accepts the LRS
// response to be sent to the client.
LRSResponseChan chan *Response
// Address is the host:port on which the Server is listening for requests.
Address string
// The underlying fake implementation of xDS and LRS.
xdsS *xdsServer
lrsS *lrsServer
}
// StartServer makes a new Server and gets it to start listening on a local
// port for gRPC requests. The returned cancel function should be invoked by
// the caller upon completion of the test.
func StartServer() (*Server, func(), error) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, func() {}, fmt.Errorf("net.Listen() failed: %v", err)
}
s := &Server{
XDSRequestChan: testutils.NewChannelWithSize(defaultChannelBufferSize),
LRSRequestChan: testutils.NewChannelWithSize(defaultChannelBufferSize),
XDSResponseChan: make(chan *Response, defaultChannelBufferSize),
LRSResponseChan: make(chan *Response, 1), // The server only ever sends one response.
Address: lis.Addr().String(),
}
s.xdsS = &xdsServer{reqChan: s.XDSRequestChan, respChan: s.XDSResponseChan}
s.lrsS = &lrsServer{reqChan: s.LRSRequestChan, respChan: s.LRSResponseChan}
server := grpc.NewServer()
lrsgrpc.RegisterLoadReportingServiceServer(server, s.lrsS)
adsgrpc.RegisterAggregatedDiscoveryServiceServer(server, s.xdsS)
go server.Serve(lis)
return s, func() { server.Stop() }, nil
}
type xdsServer struct {
reqChan *testutils.Channel
respChan chan *Response
}
func (xdsS *xdsServer) StreamAggregatedResources(s adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
errCh := make(chan error, 2)
go func() {
for {
req, err := s.Recv()
if err != nil {
errCh <- err
return
}
xdsS.reqChan.Send(&Request{req, err})
}
}()
go func() {
var retErr error
defer func() {
errCh <- retErr
}()
for {
select {
case r := <-xdsS.respChan:
if r.Err != nil {
retErr = r.Err
return
}
if err := s.Send(r.Resp.(*discoverypb.DiscoveryResponse)); err != nil {
retErr = err
return
}
case <-s.Context().Done():
retErr = s.Context().Err()
return
}
}
}()
if err := <-errCh; err != nil {
return err
}
return nil
}
func (xdsS *xdsServer) DeltaAggregatedResources(adsgrpc.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
return status.Error(codes.Unimplemented, "")
}
type lrsServer struct {
reqChan *testutils.Channel
respChan chan *Response
}
func (lrsS *lrsServer) StreamLoadStats(s lrsgrpc.LoadReportingService_StreamLoadStatsServer) error {
req, err := s.Recv()
if err != nil {
return err
}
lrsS.reqChan.Send(&Request{req, err})
select {
case r := <-lrsS.respChan:
if r.Err != nil {
return r.Err
}
if err := s.Send(r.Resp.(*lrspb.LoadStatsResponse)); err != nil {
return err
}
case <-s.Context().Done():
return s.Context().Err()
}
for {
req, err := s.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
lrsS.reqChan.Send(&Request{req, err})
}
}