xds: handle EDS in xds client (#3181)

Menghan Li 2019-11-18 15:17:21 -08:00 committed by GitHub
parent 967379b15b
commit 71ba135a58
10 changed files with 527 additions and 104 deletions

View File

@@ -38,8 +38,7 @@ import (
)
const (
edsType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
endpointRequired = "endpoints_required"
edsType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
)
// client is responsible for connecting to the specified traffic director, passing the received

View File

@@ -26,6 +26,8 @@ import (
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
typepb "github.com/envoyproxy/go-control-plane/envoy/type"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/xds/internal"
)
@@ -161,3 +163,69 @@ func ParseEDSRespProtoForTesting(m *xdspb.ClusterLoadAssignment) *EDSUpdate {
}
return u
}
// newEDSRequest generates an EDS request proto for the provided clusterName, to
// be sent out on the wire.
func (v2c *v2Client) newEDSRequest(clusterName []string) *xdspb.DiscoveryRequest {
return &xdspb.DiscoveryRequest{
Node: v2c.nodeProto,
TypeUrl: endpointURL,
ResourceNames: clusterName,
}
}
// sendEDS sends an EDS request for provided clusterName on the provided stream.
func (v2c *v2Client) sendEDS(stream adsStream, clusterName []string) bool {
if err := stream.Send(v2c.newEDSRequest(clusterName)); err != nil {
grpclog.Warningf("xds: EDS request for resource %v failed: %v", clusterName, err)
return false
}
return true
}
func (v2c *v2Client) handleEDSResponse(resp *xdspb.DiscoveryResponse) error {
v2c.mu.Lock()
defer v2c.mu.Unlock()
wi := v2c.watchMap[edsResource]
if wi == nil {
return fmt.Errorf("xds: no EDS watcher found when handling EDS response: %+v", resp)
}
var returnUpdate *EDSUpdate
for _, r := range resp.GetResources() {
var resource ptypes.DynamicAny
if err := ptypes.UnmarshalAny(r, &resource); err != nil {
return fmt.Errorf("xds: failed to unmarshal resource in EDS response: %v", err)
}
cla, ok := resource.Message.(*xdspb.ClusterLoadAssignment)
if !ok {
return fmt.Errorf("xds: unexpected resource type: %T in EDS response", resource.Message)
}
if cla.GetClusterName() != wi.target[0] {
// We won't validate the remaining resources. If one of the
// uninteresting ones is invalid, we will still ACK the response.
continue
}
u, err := ParseEDSRespProto(cla)
if err != nil {
return err
}
returnUpdate = u
// Break from the loop because the request resource is found. But
// this also means we won't validate the remaining resources. If one
// of the uninteresting ones is invalid, we will still ACK the
// response.
break
}
if returnUpdate != nil {
wi.expiryTimer.Stop()
wi.callback.(edsCallback)(returnUpdate, nil)
}
return nil
}

View File

@@ -18,22 +18,303 @@
package client
import (
"errors"
"fmt"
"testing"
"time"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/golang/protobuf/ptypes"
anypb "github.com/golang/protobuf/ptypes/any"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/client/fakexds"
)
// Only error cases are tested; normal cases are covered because the EDS
// balancer tests build EDS responses and parse them.
// TODO: add more tests, with error cases and normal cases.
// Test that parsing fails if the EDS response doesn't have all priorities.
// Priorities should range from 0 (highest) to N (lowest) without skipping any.
func TestParseEDSRespProtoPriorityError(t *testing.T) {
clab0 := NewClusterLoadAssignmentBuilder("test", nil)
clab0.AddLocality("locality-1", 1, 0, []string{"addr1:314"}, nil)
clab0.AddLocality("locality-2", 1, 2, []string{"addr2:159"}, nil)
_, err := ParseEDSRespProto(clab0.Build())
if err == nil {
t.Errorf("ParseEDSRespProto() error = %v, wantErr <non-nil>", err)
return
func TestParseEDSRespProto(t *testing.T) {
tests := []struct {
name string
m *xdspb.ClusterLoadAssignment
want *EDSUpdate
wantErr bool
}{
{
name: "missing-priority",
m: func() *xdspb.ClusterLoadAssignment {
clab0 := NewClusterLoadAssignmentBuilder("test", nil)
clab0.AddLocality("locality-1", 1, 0, []string{"addr1:314"}, nil)
clab0.AddLocality("locality-2", 1, 2, []string{"addr2:159"}, nil)
return clab0.Build()
}(),
want: nil,
wantErr: true,
},
{
name: "missing-locality-ID",
m: func() *xdspb.ClusterLoadAssignment {
clab0 := NewClusterLoadAssignmentBuilder("test", nil)
clab0.AddLocality("", 1, 0, []string{"addr1:314"}, nil)
return clab0.Build()
}(),
want: nil,
wantErr: true,
},
{
name: "good",
m: func() *xdspb.ClusterLoadAssignment {
clab0 := NewClusterLoadAssignmentBuilder("test", nil)
clab0.AddLocality("locality-1", 1, 1, []string{"addr1:314"}, &AddLocalityOptions{
Health: []corepb.HealthStatus{corepb.HealthStatus_UNHEALTHY},
Weight: []uint32{271},
})
clab0.AddLocality("locality-2", 1, 0, []string{"addr2:159"}, &AddLocalityOptions{
Health: []corepb.HealthStatus{corepb.HealthStatus_DRAINING},
Weight: []uint32{828},
})
return clab0.Build()
}(),
want: &EDSUpdate{
Drops: nil,
Localities: []Locality{
{
Endpoints: []Endpoint{{
Address: "addr1:314",
HealthStatus: EndpointHealthStatusUnhealthy,
Weight: 271,
}},
ID: internal.Locality{SubZone: "locality-1"},
Priority: 1,
Weight: 1,
},
{
Endpoints: []Endpoint{{
Address: "addr2:159",
HealthStatus: EndpointHealthStatusDraining,
Weight: 828,
}},
ID: internal.Locality{SubZone: "locality-2"},
Priority: 0,
Weight: 1,
},
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ParseEDSRespProto(tt.m)
if (err != nil) != tt.wantErr {
t.Errorf("ParseEDSRespProto() error = %v, wantErr %v", err, tt.wantErr)
return
}
if d := cmp.Diff(got, tt.want); d != "" {
t.Errorf("ParseEDSRespProto() got = %v, want %v, diff: %v", got, tt.want, d)
}
})
}
}
var (
badlyMarshaledEDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: endpointURL,
Value: []byte{1, 2, 3, 4},
},
},
TypeUrl: endpointURL,
}
badResourceTypeInEDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: httpConnManagerURL,
Value: marshaledConnMgr1,
},
},
TypeUrl: endpointURL,
}
goodEDSResponse1 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
func() *anypb.Any {
clab0 := NewClusterLoadAssignmentBuilder(goodEDSName, nil)
clab0.AddLocality("locality-1", 1, 1, []string{"addr1:314"}, nil)
clab0.AddLocality("locality-2", 1, 0, []string{"addr2:159"}, nil)
a, _ := ptypes.MarshalAny(clab0.Build())
return a
}(),
},
TypeUrl: endpointURL,
}
goodEDSResponse2 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
func() *anypb.Any {
clab0 := NewClusterLoadAssignmentBuilder("not-goodEDSName", nil)
clab0.AddLocality("locality-1", 1, 1, []string{"addr1:314"}, nil)
clab0.AddLocality("locality-2", 1, 0, []string{"addr2:159"}, nil)
a, _ := ptypes.MarshalAny(clab0.Build())
return a
}(),
},
TypeUrl: endpointURL,
}
)
func TestHandleEDSResponse(t *testing.T) {
fakeServer, client, cleanup := fakexds.StartClientAndServer(t)
defer cleanup()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
tests := []struct {
name string
edsResponse *xdspb.DiscoveryResponse
wantErr bool
wantUpdate *EDSUpdate
wantUpdateErr bool
}{
// Any in resource is badly marshaled.
{
name: "badly-marshaled_response",
edsResponse: badlyMarshaledEDSResponse,
wantErr: true,
wantUpdate: nil,
wantUpdateErr: false,
},
// Response doesn't contain resource with the right type.
{
name: "no-config-in-response",
edsResponse: badResourceTypeInEDSResponse,
wantErr: true,
wantUpdate: nil,
wantUpdateErr: false,
},
// Response contains one uninteresting ClusterLoadAssignment.
{
name: "one-uninterestring-assignment",
edsResponse: goodEDSResponse2,
wantErr: false,
wantUpdate: nil,
wantUpdateErr: false,
},
// Response contains one good ClusterLoadAssignment.
{
name: "one-good-assignment",
edsResponse: goodEDSResponse1,
wantErr: false,
wantUpdate: &EDSUpdate{
Localities: []Locality{
{
Endpoints: []Endpoint{{Address: "addr1:314"}},
ID: internal.Locality{SubZone: "locality-1"},
Priority: 1,
Weight: 1,
},
{
Endpoints: []Endpoint{{Address: "addr2:159"}},
ID: internal.Locality{SubZone: "locality-2"},
Priority: 0,
Weight: 1,
},
},
},
wantUpdateErr: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
gotUpdateCh := make(chan *EDSUpdate, 1)
gotUpdateErrCh := make(chan error, 1)
// Register a watcher, to trigger the v2Client to send an EDS request.
cancelWatch := v2c.watchEDS(goodEDSName, func(u *EDSUpdate, err error) {
t.Logf("in v2c.watchEDS callback, edsUpdate: %+v, err: %v", u, err)
gotUpdateCh <- u
gotUpdateErrCh <- err
})
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
<-fakeServer.RequestChan
// Directly push the response through a call to handleEDSResponse,
// thereby bypassing the fakeServer.
if err := v2c.handleEDSResponse(test.edsResponse); (err != nil) != test.wantErr {
t.Fatalf("v2c.handleEDSResponse() returned err: %v, wantErr: %v", err, test.wantErr)
}
// If the test needs the callback to be invoked, verify the update and
// error pushed to the callback.
if test.wantUpdate != nil {
timer := time.NewTimer(defaultTestTimeout)
select {
case <-timer.C:
t.Fatal("Timeout when expecting EDS update")
case gotUpdate := <-gotUpdateCh:
timer.Stop()
if d := cmp.Diff(gotUpdate, test.wantUpdate); d != "" {
t.Fatalf("got EDS update : %+v, want %+v, diff: %v", gotUpdate, *test.wantUpdate, d)
}
}
// Since the callback that we registered pushes to both channels at
// the same time, this channel read should return immediately.
gotUpdateErr := <-gotUpdateErrCh
if (gotUpdateErr != nil) != test.wantUpdateErr {
t.Fatalf("got EDS update error {%v}, wantErr: %v", gotUpdateErr, test.wantUpdateErr)
}
}
cancelWatch()
})
}
}
// TestHandleEDSResponseWithoutEDSWatch tests the case where the v2Client
// receives an EDS response without a registered EDS watcher.
func TestHandleEDSResponseWithoutEDSWatch(t *testing.T) {
_, client, cleanup := fakexds.StartClientAndServer(t)
defer cleanup()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
if v2c.handleEDSResponse(goodEDSResponse1) == nil {
t.Fatal("v2c.handleEDSResponse() succeeded, should have failed")
}
}
func TestEDSWatchExpiryTimer(t *testing.T) {
oldWatchExpiryTimeout := defaultWatchExpiryTimeout
defaultWatchExpiryTimeout = 1 * time.Second
defer func() {
defaultWatchExpiryTimeout = oldWatchExpiryTimeout
}()
fakeServer, client, cleanup := fakexds.StartClientAndServer(t)
defer cleanup()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
edsCallbackCh := make(chan error, 1)
v2c.watchEDS(goodEDSName, func(u *EDSUpdate, err error) {
t.Logf("Received callback with edsUpdate {%+v} and error {%v}", u, err)
if u != nil {
edsCallbackCh <- fmt.Errorf("received EDSUpdate %v in edsCallback, wanted nil", u)
}
if err == nil {
edsCallbackCh <- errors.New("received nil error in edsCallback")
}
edsCallbackCh <- nil
})
<-fakeServer.RequestChan
timer := time.NewTimer(2 * time.Second)
select {
case <-timer.C:
t.Fatalf("Timeout expired when expecting EDS update")
case err := <-edsCallbackCh:
timer.Stop()
if err != nil {
t.Fatal(err)
}
}
}

View File

@@ -67,6 +67,7 @@ func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32)
// AddLocalityOptions contains options when adding locality to the builder.
type AddLocalityOptions struct {
Health []corepb.HealthStatus
Weight []uint32
}
// AddLocality adds a locality to the builder.
@@ -93,18 +94,28 @@ func (clab *ClusterLoadAssignmentBuilder) AddLocality(subzone string, weight uin
PortSpecifier: &corepb.SocketAddress_PortValue{
PortValue: uint32(port)}}}}}},
}
if opts != nil && i < len(opts.Health) {
lbe.HealthStatus = opts.Health[i]
if opts != nil {
if i < len(opts.Health) {
lbe.HealthStatus = opts.Health[i]
}
if i < len(opts.Weight) {
lbe.LoadBalancingWeight = &wrapperspb.UInt32Value{Value: opts.Weight[i]}
}
}
lbEndPoints = append(lbEndPoints, lbe)
}
clab.v.Endpoints = append(clab.v.Endpoints, &endpointpb.LocalityLbEndpoints{
Locality: &corepb.Locality{
var localityID *corepb.Locality
if subzone != "" {
localityID = &corepb.Locality{
Region: "",
Zone: "",
SubZone: subzone,
},
}
}
clab.v.Endpoints = append(clab.v.Endpoints, &endpointpb.LocalityLbEndpoints{
Locality: localityID,
LbEndpoints: lbEndPoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: weight},
Priority: priority,
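Read together, the added lines in this hunk amount to roughly the following shape for the updated AddLocality body. This is a sketch only: gofmt layout and the closing braces that the hunk truncates are assumed.

// Per-endpoint options: apply health status and, new in this change, an
// optional per-endpoint load-balancing weight.
if opts != nil {
	if i < len(opts.Health) {
		lbe.HealthStatus = opts.Health[i]
	}
	if i < len(opts.Weight) {
		lbe.LoadBalancingWeight = &wrapperspb.UInt32Value{Value: opts.Weight[i]}
	}
}
lbEndPoints = append(lbEndPoints, lbe)
}

// The locality ID is now omitted entirely when subzone is empty.
var localityID *corepb.Locality
if subzone != "" {
	localityID = &corepb.Locality{
		Region:  "",
		Zone:    "",
		SubZone: subzone,
	}
}
clab.v.Endpoints = append(clab.v.Endpoints, &endpointpb.LocalityLbEndpoints{
	Locality:            localityID,
	LbEndpoints:         lbEndPoints,
	LoadBalancingWeight: &wrapperspb.UInt32Value{Value: weight},
	Priority:            priority,
})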

View File

@@ -75,13 +75,15 @@ func TestGetRouteConfigNameFromListener(t *testing.T) {
}
for _, test := range tests {
gotRoute, err := getRouteConfigNameFromListener(test.lis)
if gotRoute != test.wantRoute {
t.Errorf("%s: getRouteConfigNameFromListener(%+v) = %v, want %v", test.name, test.lis, gotRoute, test.wantRoute)
}
if (err != nil) != test.wantErr {
t.Errorf("%s: getRouteConfigNameFromListener(%+v) = %v, want %v", test.name, test.lis, err, test.wantErr)
}
t.Run(test.name, func(t *testing.T) {
gotRoute, err := getRouteConfigNameFromListener(test.lis)
if gotRoute != test.wantRoute {
t.Errorf("getRouteConfigNameFromListener(%+v) = %v, want %v", test.lis, gotRoute, test.wantRoute)
}
if (err != nil) != test.wantErr {
t.Errorf("getRouteConfigNameFromListener(%+v) = %v, want %v", test.lis, err, test.wantErr)
}
})
}
}
@@ -172,47 +174,49 @@ func TestHandleLDSResponse(t *testing.T) {
}
for _, test := range tests {
gotUpdateCh := make(chan ldsUpdate, 1)
gotUpdateErrCh := make(chan error, 1)
t.Run(test.name, func(t *testing.T) {
gotUpdateCh := make(chan ldsUpdate, 1)
gotUpdateErrCh := make(chan error, 1)
// Register a watcher, to trigger the v2Client to send an LDS request.
cancelWatch := v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
t.Logf("%s: in v2c.watchLDS callback, ldsUpdate: %+v, err: %v", test.name, u, err)
gotUpdateCh <- u
gotUpdateErrCh <- err
})
// Register a watcher, to trigger the v2Client to send an LDS request.
cancelWatch := v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
t.Logf("in v2c.watchLDS callback, ldsUpdate: %+v, err: %v", u, err)
gotUpdateCh <- u
gotUpdateErrCh <- err
})
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
<-fakeServer.RequestChan
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
<-fakeServer.RequestChan
// Directly push the response through a call to handleLDSResponse,
// thereby bypassing the fakeServer.
if err := v2c.handleLDSResponse(test.ldsResponse); (err != nil) != test.wantErr {
t.Fatalf("%s: v2c.handleLDSResponse() returned err: %v, wantErr: %v", test.name, err, test.wantErr)
}
// Directly push the response through a call to handleLDSResponse,
// thereby bypassing the fakeServer.
if err := v2c.handleLDSResponse(test.ldsResponse); (err != nil) != test.wantErr {
t.Fatalf("v2c.handleLDSResponse() returned err: %v, wantErr: %v", err, test.wantErr)
}
// If the test needs the callback to be invoked, verify the update and
// error pushed to the callback.
if test.wantUpdate != nil {
timer := time.NewTimer(defaultTestTimeout)
select {
case <-timer.C:
t.Fatal("Timeout when expecting LDS update")
case gotUpdate := <-gotUpdateCh:
timer.Stop()
if !reflect.DeepEqual(gotUpdate, *test.wantUpdate) {
t.Fatalf("%s: got LDS update : %+v, want %+v", test.name, gotUpdate, *test.wantUpdate)
// If the test needs the callback to be invoked, verify the update and
// error pushed to the callback.
if test.wantUpdate != nil {
timer := time.NewTimer(defaultTestTimeout)
select {
case <-timer.C:
t.Fatal("Timeout when expecting LDS update")
case gotUpdate := <-gotUpdateCh:
timer.Stop()
if !reflect.DeepEqual(gotUpdate, *test.wantUpdate) {
t.Fatalf("got LDS update : %+v, want %+v", gotUpdate, *test.wantUpdate)
}
}
// Since the callback that we registered pushes to both channels at
// the same time, this channel read should return immediately.
gotUpdateErr := <-gotUpdateErrCh
if (gotUpdateErr != nil) != test.wantUpdateErr {
t.Fatalf("got LDS update error {%v}, wantErr: %v", gotUpdateErr, test.wantUpdateErr)
}
}
// Since the callback that we registered pushes to both channels at
// the same time, this channel read should return immediately.
gotUpdateErr := <-gotUpdateErrCh
if (gotUpdateErr != nil) != test.wantUpdateErr {
t.Fatalf("%s: got LDS update error {%v}, wantErr: %v", test.name, gotUpdateErr, test.wantUpdateErr)
}
}
cancelWatch()
cancelWatch()
})
}
}

View File

@@ -41,7 +41,7 @@ func (v2c *v2Client) newRDSRequest(routeName []string) *xdspb.DiscoveryRequest {
// sendRDS sends an RDS request for provided routeName on the provided stream.
func (v2c *v2Client) sendRDS(stream adsStream, routeName []string) bool {
if err := stream.Send(v2c.newRDSRequest(routeName)); err != nil {
grpclog.Infof("xds: RDS request for resource %v failed: %v", routeName, err)
grpclog.Warningf("xds: RDS request for resource %v failed: %v", routeName, err)
return false
}
return true
@@ -82,6 +82,8 @@ func (v2c *v2Client) handleRDSResponse(resp *xdspb.DiscoveryResponse) error {
// If we get here, it means that this resource was a good one.
localCache[rc.GetName()] = cluster
// TODO: remove cache, and only process resources that are interesting.
if rc.GetName() == wi.target[0] {
returnCluster = cluster
}

View File

@@ -132,9 +132,11 @@ func TestGetClusterFromRouteConfiguration(t *testing.T) {
}
for _, test := range tests {
if gotCluster := getClusterFromRouteConfiguration(test.rc, goodLDSTarget1); gotCluster != test.wantCluster {
t.Errorf("%s: getClusterFromRouteConfiguration(%+v, %v) = %v, want %v", test.name, test.rc, goodLDSTarget1, gotCluster, test.wantCluster)
}
t.Run(test.name, func(t *testing.T) {
if gotCluster := getClusterFromRouteConfiguration(test.rc, goodLDSTarget1); gotCluster != test.wantCluster {
t.Errorf("getClusterFromRouteConfiguration(%+v, %v) = %v, want %v", test.rc, goodLDSTarget1, gotCluster, test.wantCluster)
}
})
}
}
@@ -211,47 +213,49 @@ func TestHandleRDSResponse(t *testing.T) {
}
for _, test := range tests {
gotUpdateCh := make(chan rdsUpdate, 1)
gotUpdateErrCh := make(chan error, 1)
t.Run(test.name, func(t *testing.T) {
gotUpdateCh := make(chan rdsUpdate, 1)
gotUpdateErrCh := make(chan error, 1)
// Register a watcher, to trigger the v2Client to send an RDS request.
cancelWatch := v2c.watchRDS(goodRouteName1, func(u rdsUpdate, err error) {
t.Logf("%s: in v2c.watchRDS callback, rdsUpdate: %+v, err: %v", test.name, u, err)
gotUpdateCh <- u
gotUpdateErrCh <- err
})
// Register a watcher, to trigger the v2Client to send an RDS request.
cancelWatch := v2c.watchRDS(goodRouteName1, func(u rdsUpdate, err error) {
t.Logf("in v2c.watchRDS callback, rdsUpdate: %+v, err: %v", u, err)
gotUpdateCh <- u
gotUpdateErrCh <- err
})
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
<-fakeServer.RequestChan
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
<-fakeServer.RequestChan
// Directly push the response through a call to handleRDSResponse,
// thereby bypassing the fakeServer.
if err := v2c.handleRDSResponse(test.rdsResponse); (err != nil) != test.wantErr {
t.Fatalf("%s: v2c.handleRDSResponse() returned err: %v, wantErr: %v", test.name, err, test.wantErr)
}
// Directly push the response through a call to handleRDSResponse,
// thereby bypassing the fakeServer.
if err := v2c.handleRDSResponse(test.rdsResponse); (err != nil) != test.wantErr {
t.Fatalf("v2c.handleRDSResponse() returned err: %v, wantErr: %v", err, test.wantErr)
}
// If the test needs the callback to be invoked, verify the update and
// error pushed to the callback.
if test.wantUpdate != nil {
timer := time.NewTimer(defaultTestTimeout)
select {
case <-timer.C:
t.Fatal("Timeout when expecting RDS update")
case gotUpdate := <-gotUpdateCh:
timer.Stop()
if !reflect.DeepEqual(gotUpdate, *test.wantUpdate) {
t.Fatalf("%s: got RDS update : %+v, want %+v", test.name, gotUpdate, *test.wantUpdate)
// If the test needs the callback to be invoked, verify the update and
// error pushed to the callback.
if test.wantUpdate != nil {
timer := time.NewTimer(defaultTestTimeout)
select {
case <-timer.C:
t.Fatal("Timeout when expecting RDS update")
case gotUpdate := <-gotUpdateCh:
timer.Stop()
if !reflect.DeepEqual(gotUpdate, *test.wantUpdate) {
t.Fatalf("got RDS update : %+v, want %+v", gotUpdate, *test.wantUpdate)
}
}
// Since the callback that we registered pushes to both channels at
// the same time, this channel read should return immediately.
gotUpdateErr := <-gotUpdateErrCh
if (gotUpdateErr != nil) != test.wantUpdateErr {
t.Fatalf("got RDS update error {%v}, wantErr: %v", gotUpdateErr, test.wantUpdateErr)
}
}
// Since the callback that we registered pushes to both channels at
// the same time, this channel read should return immediately.
gotUpdateErr := <-gotUpdateErrCh
if (gotUpdateErr != nil) != test.wantUpdateErr {
t.Fatalf("%s: got RDS update error {%v}, wantErr: %v", test.name, gotUpdateErr, test.wantUpdateErr)
}
}
cancelWatch()
cancelWatch()
})
}
}

View File

@@ -80,3 +80,5 @@ type rdsUpdate struct {
}
type rdsCallback func(rdsUpdate, error)
type edsCallback func(*EDSUpdate, error)

View File

@@ -69,6 +69,10 @@ type v2Client struct {
// when we received them (because we could become interested in them in the
// future and the server won't send us those resources again).
// Protected by the above mutex.
//
// TODO: remove RDS cache. The updated spec says client can ignore
// unrequested resources.
// https://github.com/envoyproxy/envoy/blob/master/api/xds_protocol.rst#resource-hints
rdsCache map[string]string
}
@@ -159,6 +163,10 @@ func (v2c *v2Client) sendExisting(stream adsStream) bool {
if !v2c.sendRDS(stream, wi.target) {
return false
}
case edsResource:
if !v2c.sendEDS(stream, wi.target) {
return false
}
}
}
@@ -198,6 +206,10 @@ func (v2c *v2Client) send(stream adsStream, done chan struct{}) {
if !v2c.sendRDS(stream, target) {
return
}
case edsResource:
if !v2c.sendEDS(stream, target) {
return
}
}
case <-done:
return
@@ -211,6 +223,7 @@ func (v2c *v2Client) recv(stream adsStream) bool {
success := false
for {
resp, err := stream.Recv()
// TODO: call watch callbacks with error when stream is broken.
if err != nil {
grpclog.Warningf("xds: ADS stream recv failed: %v", err)
return success
@@ -226,6 +239,11 @@ func (v2c *v2Client) recv(stream adsStream) bool {
grpclog.Warningf("xds: RDS response handler failed: %v", err)
return success
}
case endpointURL:
if err := v2c.handleEDSResponse(resp); err != nil {
grpclog.Warningf("xds: EDS response handler failed: %v", err)
return success
}
default:
grpclog.Warningf("xds: unknown response URL type: %v", resp.GetTypeUrl())
}
@@ -278,6 +296,30 @@ func (v2c *v2Client) watchRDS(routeName string, rdsCb rdsCallback) (cancel func(
}
}
// watchEDS registers an EDS watcher for the provided clusterName. Updates
// corresponding to received EDS responses will be pushed to the provided
// callback. The caller can cancel the watch by invoking the returned cancel
// function.
// The provided callback should not block or perform any expensive operations
// or call other methods of the v2Client object.
func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel func()) {
wi := &watchInfo{wType: edsResource, target: []string{clusterName}, callback: edsCb}
v2c.watchCh.Put(wi)
return func() {
v2c.mu.Lock()
defer v2c.mu.Unlock()
if wi.state == watchEnqueued {
wi.state = watchCancelled
return
}
v2c.watchMap[edsResource].cancel()
delete(v2c.watchMap, edsResource)
// TODO: Once a registered EDS watch is cancelled, we should send an
// EDS request with no resources. This will let the server know that we
// are no longer interested in this resource.
}
}
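For illustration, a minimal sketch of how a caller holding a v2Client might use this new API; the cluster name and the log statements are assumptions for the example, not part of this change:

// Hypothetical caller code, shown only to illustrate the callback contract.
cancel := v2c.watchEDS("example-cluster", func(u *EDSUpdate, err error) {
	if err != nil {
		grpclog.Warningf("xds: EDS watch failed: %v", err)
		return
	}
	for _, l := range u.Localities {
		grpclog.Infof("xds: locality %+v, priority %d, weight %d, %d endpoints",
			l.ID, l.Priority, l.Weight, len(l.Endpoints))
	}
})
// ...later, when these endpoints are no longer of interest:
cancel()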
// checkCacheAndUpdateWatchMap is called when a new watch call is handled in
// send(). If an existing watcher is found, its expiry timer is stopped. If the
// watchInfo to be added to the watchMap is found in the cache, the watcher
@@ -322,5 +364,14 @@ func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) {
wi.callback.(rdsCallback)(rdsUpdate{clusterName: ""}, fmt.Errorf("xds: RDS target %s not found", wi.target))
v2c.mu.Unlock()
})
case edsResource:
wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
// We need to grab the lock here because we are accessing the
// watchInfo (which is now stored in the watchMap) from this
// method which will be called when the timer fires.
v2c.mu.Lock()
wi.callback.(edsCallback)(nil, fmt.Errorf("xds: EDS target %s not found", wi.target))
v2c.mu.Unlock()
})
}
}

View File

@@ -41,6 +41,7 @@ const (
goodLDSTarget2 = "lds.target.good:2222"
goodRouteName1 = "GoodRouteConfig1"
goodRouteName2 = "GoodRouteConfig2"
goodEDSName = "GoodClusterAssignment1"
uninterestingRouteName = "UninterestingRouteName"
goodMatchingDomain = "lds.target.good"
uninterestingDomain = "uninteresting.domain"
@@ -183,7 +184,7 @@ var (
badResourceTypeInLDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: listenerURL,
TypeUrl: httpConnManagerURL,
Value: marshaledConnMgr1,
},
},
@@ -240,7 +241,7 @@ var (
badResourceTypeInRDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: routeURL,
TypeUrl: httpConnManagerURL,
Value: marshaledConnMgr1,
},
},