xdsClient: change WatchRDS to return all virtual hosts (#3944)

Instead of finding the best matching domain for the service, and return only
that one virtual host's routes.

This removes the lds request name from the xds client, and makes it xds client
handle multiple RDS watches, so one xds client can be shared by multiple
ClientConns.

This also removes some response validation from the client (e.g. if no virtual
host matches what the client is asking for, the response won't be nack'ed).
This commit is contained in:
Menghan Li 2020-10-15 15:21:24 -07:00 committed by GitHub
parent c6cfaba14d
commit ea47aa91b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 283 additions and 278 deletions

View File

@ -149,6 +149,15 @@ type ListenerUpdate struct {
// RouteConfigUpdate contains information received in an RDS response, which is
// of interest to the registered RDS watcher.
type RouteConfigUpdate struct {
VirtualHosts []*VirtualHost
}
// VirtualHost contains the routes for a list of Domains.
//
// Note that the domains in this slice can be a wildcard, not an exact string.
// The consumer of this struct needs to find the best match for its hostname.
type VirtualHost struct {
Domains []string
// Routes contains a list of routes, each containing matchers and
// corresponding action.
Routes []*Route

View File

@ -195,36 +195,6 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
wantUpdate RouteConfigUpdate
wantError bool
}{
{
name: "no-virtual-hosts-in-rc",
rc: &v3routepb.RouteConfiguration{},
wantError: true,
},
{
name: "no-domains-in-rc",
rc: &v3routepb.RouteConfiguration{
VirtualHosts: []*v3routepb.VirtualHost{{}},
},
wantError: true,
},
{
name: "non-matching-domain-in-rc",
rc: &v3routepb.RouteConfiguration{
VirtualHosts: []*v3routepb.VirtualHost{
{Domains: []string{uninterestingDomain}},
},
},
wantError: true,
},
{
name: "no-routes-in-rc",
rc: &v3routepb.RouteConfiguration{
VirtualHosts: []*v3routepb.VirtualHost{
{Domains: []string{ldsTarget}},
},
},
wantError: true,
},
{
name: "default-route-match-field-is-nil",
rc: &v3routepb.RouteConfiguration{
@ -345,7 +315,18 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
},
},
},
wantUpdate: RouteConfigUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{clusterName: 1}}}},
wantUpdate: RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{uninterestingDomain},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{uninterestingClusterName: 1}}},
},
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{clusterName: 1}}},
},
},
},
},
{
// default route's match is not empty string, but "/".
@ -368,7 +349,14 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
},
},
},
wantUpdate: RouteConfigUpdate{Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}}}},
wantUpdate: RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}}},
},
},
},
},
{
// weights not add up to total-weight.
@ -431,13 +419,20 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
},
},
},
wantUpdate: RouteConfigUpdate{Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{"a": 2, "b": 3, "c": 5}}}},
wantUpdate: RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{"a": 2, "b": 3, "c": 5}}},
},
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
gotUpdate, gotError := generateRDSUpdateFromRouteConfiguration(test.rc, ldsTarget, nil)
gotUpdate, gotError := generateRDSUpdateFromRouteConfiguration(test.rc, nil)
if (gotError != nil) != test.wantError || !cmp.Equal(gotUpdate, test.wantUpdate, cmpopts.EquateEmpty()) {
t.Errorf("generateRDSUpdateFromRouteConfiguration(%+v, %v) = %v, want %v", test.rc, ldsTarget, gotUpdate, test.wantUpdate)
}
@ -558,24 +553,6 @@ func (s) TestUnmarshalRouteConfig(t *testing.T) {
},
wantErr: true,
},
{
name: "bad routeConfig resource",
resources: []*anypb.Any{
{
TypeUrl: version.V3RouteConfigURL,
Value: func() []byte {
rc := &v3routepb.RouteConfiguration{
VirtualHosts: []*v3routepb.VirtualHost{
{Domains: []string{uninterestingDomain}},
},
}
m, _ := proto.Marshal(rc)
return m
}(),
},
},
wantErr: true,
},
{
name: "empty resource list",
},
@ -583,28 +560,72 @@ func (s) TestUnmarshalRouteConfig(t *testing.T) {
name: "v2 routeConfig resource",
resources: []*anypb.Any{v2RouteConfig},
wantUpdate: map[string]RouteConfigUpdate{
v2RouteConfigName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{v2ClusterName: 1}}}},
v2RouteConfigName: {
VirtualHosts: []*VirtualHost{
{
Domains: []string{uninterestingDomain},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{uninterestingClusterName: 1}}},
},
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{v2ClusterName: 1}}},
},
},
},
},
},
{
name: "v3 routeConfig resource",
resources: []*anypb.Any{v3RouteConfig},
wantUpdate: map[string]RouteConfigUpdate{
v3RouteConfigName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{v3ClusterName: 1}}}},
v3RouteConfigName: {
VirtualHosts: []*VirtualHost{
{
Domains: []string{uninterestingDomain},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{uninterestingClusterName: 1}}},
},
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{v3ClusterName: 1}}},
},
},
},
},
},
{
name: "multiple routeConfig resources",
resources: []*anypb.Any{v2RouteConfig, v3RouteConfig},
wantUpdate: map[string]RouteConfigUpdate{
v3RouteConfigName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{v3ClusterName: 1}}}},
v2RouteConfigName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{v2ClusterName: 1}}}},
v3RouteConfigName: {
VirtualHosts: []*VirtualHost{
{
Domains: []string{uninterestingDomain},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{uninterestingClusterName: 1}}},
},
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{v3ClusterName: 1}}},
},
},
},
v2RouteConfigName: {
VirtualHosts: []*VirtualHost{
{
Domains: []string{uninterestingDomain},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{uninterestingClusterName: 1}}},
},
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{v2ClusterName: 1}}},
},
},
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
update, err := UnmarshalRouteConfig(test.resources, ldsTarget, nil)
update, err := UnmarshalRouteConfig(test.resources, nil)
if ((err != nil) != test.wantErr) || !cmp.Equal(update, test.wantUpdate, cmpopts.EquateEmpty()) {
t.Errorf("UnmarshalRouteConfig(%v, %v) = (%v, %v) want (%v, %v)", test.resources, ldsTarget, update, err, test.wantUpdate, test.wantErr)
}
@ -660,38 +681,32 @@ func (s) TestMatch(t *testing.T) {
func (s) TestFindBestMatchingVirtualHost(t *testing.T) {
var (
oneExactMatch = &v3routepb.VirtualHost{
Name: "one-exact-match",
oneExactMatch = &VirtualHost{
Domains: []string{"foo.bar.com"},
}
oneSuffixMatch = &v3routepb.VirtualHost{
Name: "one-suffix-match",
oneSuffixMatch = &VirtualHost{
Domains: []string{"*.bar.com"},
}
onePrefixMatch = &v3routepb.VirtualHost{
Name: "one-prefix-match",
onePrefixMatch = &VirtualHost{
Domains: []string{"foo.bar.*"},
}
oneUniversalMatch = &v3routepb.VirtualHost{
Name: "one-universal-match",
oneUniversalMatch = &VirtualHost{
Domains: []string{"*"},
}
longExactMatch = &v3routepb.VirtualHost{
Name: "one-exact-match",
longExactMatch = &VirtualHost{
Domains: []string{"v2.foo.bar.com"},
}
multipleMatch = &v3routepb.VirtualHost{
Name: "multiple-match",
multipleMatch = &VirtualHost{
Domains: []string{"pi.foo.bar.com", "314.*", "*.159"},
}
vhs = []*v3routepb.VirtualHost{oneExactMatch, oneSuffixMatch, onePrefixMatch, oneUniversalMatch, longExactMatch, multipleMatch}
vhs = []*VirtualHost{oneExactMatch, oneSuffixMatch, onePrefixMatch, oneUniversalMatch, longExactMatch, multipleMatch}
)
tests := []struct {
name string
host string
vHosts []*v3routepb.VirtualHost
want *v3routepb.VirtualHost
vHosts []*VirtualHost
want *VirtualHost
}{
{name: "exact-match", host: "foo.bar.com", vHosts: vhs, want: oneExactMatch},
{name: "suffix-match", host: "123.bar.com", vHosts: vhs, want: oneSuffixMatch},

View File

@ -272,7 +272,7 @@ func (c *Client) WatchService(serviceName string, cb func(ServiceUpdate, error))
}
c.mu.Unlock()
w := &serviceUpdateWatcher{c: c, serviceCb: cb}
w := &serviceUpdateWatcher{c: c, serviceName: serviceName, serviceCb: cb}
w.ldsCancel = c.watchLDS(serviceName, w.handleLDSResp)
return w.close
@ -280,10 +280,15 @@ func (c *Client) WatchService(serviceName string, cb func(ServiceUpdate, error))
// serviceUpdateWatcher handles LDS and RDS response, and calls the service
// callback at the right time.
//
// TODO: move serviceUpdateWatcher and all its functions into xds resolver. The
// resolver should be responsible for making WatchListener() and WatchRoute()
// calls, and finding the best matching virtual host.
type serviceUpdateWatcher struct {
c *Client
ldsCancel func()
serviceCb func(ServiceUpdate, error)
c *Client
serviceName string
ldsCancel func()
serviceCb func(ServiceUpdate, error)
mu sync.Mutex
closed bool
@ -342,7 +347,15 @@ func (w *serviceUpdateWatcher) handleRDSResp(update RouteConfigUpdate, err error
w.serviceCb(ServiceUpdate{}, err)
return
}
w.serviceCb(ServiceUpdate(update), nil)
matchVh := findBestMatchingVirtualHost(w.serviceName, update.VirtualHosts)
if matchVh == nil {
// No matching virtual host found.
w.serviceCb(ServiceUpdate{}, fmt.Errorf("no matching virtual host found for %q", w.serviceName))
return
}
w.serviceCb(ServiceUpdate{Routes: matchVh.Routes}, nil)
}
func (w *serviceUpdateWatcher) close() {

View File

@ -62,7 +62,14 @@ func (s) TestRDSWatch(t *testing.T) {
t.Fatalf("want new watch to start, got error %v", err)
}
wantUpdate := RouteConfigUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
wantUpdate := RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{testLDSName},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}},
},
},
}
client.NewRouteConfigs(map[string]RouteConfigUpdate{testRDSName: wantUpdate})
if err := verifyRouteConfigUpdate(ctx, rdsUpdateCh, wantUpdate); err != nil {
t.Fatal(err)
@ -127,7 +134,14 @@ func (s) TestRDSTwoWatchSameResourceName(t *testing.T) {
}
}
wantUpdate := RouteConfigUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
wantUpdate := RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{testLDSName},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}},
},
},
}
client.NewRouteConfigs(map[string]RouteConfigUpdate{testRDSName: wantUpdate})
for i := 0; i < count; i++ {
if err := verifyRouteConfigUpdate(ctx, rdsUpdateChs[i], wantUpdate); err != nil {
@ -199,8 +213,22 @@ func (s) TestRDSThreeWatchDifferentResourceName(t *testing.T) {
t.Fatalf("want new watch to start, got error %v", err)
}
wantUpdate1 := RouteConfigUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "1": 1}}}}
wantUpdate2 := RouteConfigUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}}
wantUpdate1 := RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{testLDSName},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "1": 1}}},
},
},
}
wantUpdate2 := RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{testLDSName},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}},
},
},
}
client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName + "1": wantUpdate1,
testRDSName + "2": wantUpdate2,
@ -244,7 +272,14 @@ func (s) TestRDSWatchAfterCache(t *testing.T) {
t.Fatalf("want new watch to start, got error %v", err)
}
wantUpdate := RouteConfigUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
wantUpdate := RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{testLDSName},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}},
},
},
}
client.NewRouteConfigs(map[string]RouteConfigUpdate{testRDSName: wantUpdate})
if err := verifyRouteConfigUpdate(ctx, rdsUpdateCh, wantUpdate); err != nil {
t.Fatal(err)

View File

@ -67,7 +67,14 @@ func (s) TestServiceWatch(t *testing.T) {
t.Fatalf("want new watch to start, got error %v", err)
}
client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
testRDSName: {
VirtualHosts: []*VirtualHost{
{
Domains: []string{testLDSName},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}},
},
},
},
})
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
t.Fatal(err)
@ -81,10 +88,12 @@ func (s) TestServiceWatch(t *testing.T) {
}
client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {
Routes: []*Route{{
Prefix: newStringP(""),
Action: map[string]uint32{testCDSName: 1},
}},
VirtualHosts: []*VirtualHost{
{
Domains: []string{testLDSName},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}},
},
},
},
})
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil {
@ -127,7 +136,14 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
t.Fatalf("want new watch to start, got error %v", err)
}
client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
testRDSName: {
VirtualHosts: []*VirtualHost{
{
Domains: []string{testLDSName},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}},
},
},
},
})
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
t.Fatal(err)
@ -141,7 +157,14 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
// Another update for the old name.
client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
testRDSName: {
VirtualHosts: []*VirtualHost{
{
Domains: []string{testLDSName},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}},
},
},
},
})
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
@ -152,7 +175,14 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
// RDS update for the new name.
wantUpdate2 := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}}
client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName + "2": {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}},
testRDSName + "2": {
VirtualHosts: []*VirtualHost{
{
Domains: []string{testLDSName},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}},
},
},
},
})
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil {
t.Fatal(err)
@ -194,7 +224,14 @@ func (s) TestServiceWatchSecond(t *testing.T) {
t.Fatalf("want new watch to start, got error %v", err)
}
client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
testRDSName: {
VirtualHosts: []*VirtualHost{
{
Domains: []string{testLDSName},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}},
},
},
},
})
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
t.Fatal(err)
@ -221,7 +258,14 @@ func (s) TestServiceWatchSecond(t *testing.T) {
// timeout.
client.NewListeners(map[string]ListenerUpdate{testLDSName: {RouteConfigName: testRDSName}})
client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
testRDSName: {
VirtualHosts: []*VirtualHost{
{
Domains: []string{testLDSName},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}},
},
},
},
})
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
t.Fatal(err)
@ -396,7 +440,14 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
t.Fatalf("want new watch to start, got error %v", err)
}
client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
testRDSName: {
VirtualHosts: []*VirtualHost{
{
Domains: []string{testLDSName},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}},
},
},
},
})
wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
@ -449,7 +500,14 @@ func (s) TestServiceResourceRemoved(t *testing.T) {
t.Fatalf("want new watch to start, got error %v", err)
}
client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
testRDSName: {
VirtualHosts: []*VirtualHost{
{
Domains: []string{testLDSName},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}},
},
},
},
})
wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
@ -469,7 +527,14 @@ func (s) TestServiceResourceRemoved(t *testing.T) {
// Send RDS update for the removed LDS resource, expect no updates to
// callback, because RDS should be canceled.
client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new": 1}}}},
testRDSName: {
VirtualHosts: []*VirtualHost{
{
Domains: []string{testLDSName},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new": 1}}},
},
},
},
})
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
@ -491,7 +556,14 @@ func (s) TestServiceResourceRemoved(t *testing.T) {
}
client.NewRouteConfigs(map[string]RouteConfigUpdate{
testRDSName: {Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new2": 1}}}},
testRDSName: {
VirtualHosts: []*VirtualHost{
{
Domains: []string{testLDSName},
Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new2": 1}}},
},
},
},
})
wantUpdate = ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new2": 1}}}}
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {

View File

@ -104,7 +104,7 @@ func getRouteConfigNameFromListener(lis *v3listenerpb.Listener, logger *grpclog.
// validates them, and transforms them into a native struct which contains only
// fields we are interested in. The provided hostname determines the route
// configuration resources of interest.
func UnmarshalRouteConfig(resources []*anypb.Any, hostname string, logger *grpclog.PrefixLogger) (map[string]RouteConfigUpdate, error) {
func UnmarshalRouteConfig(resources []*anypb.Any, logger *grpclog.PrefixLogger) (map[string]RouteConfigUpdate, error) {
update := make(map[string]RouteConfigUpdate)
for _, r := range resources {
if !IsRouteConfigResource(r.GetTypeUrl()) {
@ -114,10 +114,10 @@ func UnmarshalRouteConfig(resources []*anypb.Any, hostname string, logger *grpcl
if err := proto.Unmarshal(r.GetValue(), rc); err != nil {
return nil, fmt.Errorf("xds: failed to unmarshal resource in RDS response: %v", err)
}
logger.Infof("Resource with name: %v, type: %T, contains: %v. Picking routes for current watching hostname %v", rc.GetName(), rc, rc, hostname)
logger.Infof("Resource with name: %v, type: %T, contains: %v.", rc.GetName(), rc, rc)
// Use the hostname (resourceName for LDS) to find the routes.
u, err := generateRDSUpdateFromRouteConfiguration(rc, hostname, logger)
u, err := generateRDSUpdateFromRouteConfiguration(rc, logger)
if err != nil {
return nil, fmt.Errorf("xds: received invalid RouteConfiguration in RDS response: %+v with err: %v", rc, err)
}
@ -142,30 +142,19 @@ func UnmarshalRouteConfig(resources []*anypb.Any, hostname string, logger *grpcl
// field must be empty and whose route field must be set. Inside that route
// message, the cluster field will contain the clusterName or weighted clusters
// we are looking for.
func generateRDSUpdateFromRouteConfiguration(rc *v3routepb.RouteConfiguration, host string, logger *grpclog.PrefixLogger) (RouteConfigUpdate, error) {
//
// Currently this returns "" on error, and the caller will return an error.
// But the error doesn't contain details of why the response is invalid
// (mismatch domain or empty route).
//
// For logging purposes, we can log in line. But if we want to populate
// error details for nack, a detailed error needs to be returned.
vh := findBestMatchingVirtualHost(host, rc.GetVirtualHosts())
if vh == nil {
// No matching virtual host found.
return RouteConfigUpdate{}, fmt.Errorf("no matching virtual host found")
func generateRDSUpdateFromRouteConfiguration(rc *v3routepb.RouteConfiguration, logger *grpclog.PrefixLogger) (RouteConfigUpdate, error) {
var vhs []*VirtualHost
for _, vh := range rc.GetVirtualHosts() {
routes, err := routesProtoToSlice(vh.Routes, logger)
if err != nil {
return RouteConfigUpdate{}, fmt.Errorf("received route is invalid: %v", err)
}
vhs = append(vhs, &VirtualHost{
Domains: vh.GetDomains(),
Routes: routes,
})
}
if len(vh.Routes) == 0 {
// The matched virtual host has no routes, this is invalid because there
// should be at least one default route.
return RouteConfigUpdate{}, fmt.Errorf("matched virtual host has no routes")
}
routes, err := routesProtoToSlice(vh.Routes, logger)
if err != nil {
return RouteConfigUpdate{}, fmt.Errorf("received route is invalid: %v", err)
}
return RouteConfigUpdate{Routes: routes}, nil
return RouteConfigUpdate{VirtualHosts: vhs}, nil
}
func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger) ([]*Route, error) {
@ -339,14 +328,14 @@ func match(domain, host string) (domainMatchType, bool) {
// - If two matches are of the same pattern type, the longer match is better
// - This is to compare the length of the matching pattern, e.g. “*ABCDE” >
// “*ABC”
func findBestMatchingVirtualHost(host string, vHosts []*v3routepb.VirtualHost) *v3routepb.VirtualHost {
func findBestMatchingVirtualHost(host string, vHosts []*VirtualHost) *VirtualHost {
var (
matchVh *v3routepb.VirtualHost
matchVh *VirtualHost
matchType = domainMatchTypeInvalid
matchLen int
)
for _, vh := range vHosts {
for _, domain := range vh.GetDomains() {
for _, domain := range vh.Domains {
typ, matched := match(domain, host)
if typ == domainMatchTypeInvalid {
// The rds response is invalid.

View File

@ -22,7 +22,6 @@ package v2
import (
"context"
"fmt"
"sync"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
@ -93,54 +92,6 @@ type client struct {
// ClientConn to the xDS gRPC server. Owned by the parent xdsClient.
cc *grpc.ClientConn
nodeProto *v2corepb.Node
mu sync.Mutex
// ldsResourceName is the LDS resource_name to watch. It is set to the first
// LDS resource_name to watch, and removed when the LDS watch is canceled.
//
// It's from the dial target of the parent ClientConn. RDS resource
// processing needs this to do the host matching.
ldsResourceName string
ldsWatchCount int
}
// AddWatch overrides the transport helper's AddWatch to save the LDS
// resource_name. This is required when handling an RDS response to perform host
// matching.
func (v2c *client) AddWatch(rType xdsclient.ResourceType, rName string) {
v2c.mu.Lock()
// Special handling for LDS, because RDS needs the LDS resource_name for
// response host matching.
if rType == xdsclient.ListenerResource {
// Set hostname to the first LDS resource_name, and reset it when the
// last LDS watch is removed. The upper level Client isn't expected to
// watchLDS more than once.
v2c.ldsWatchCount++
if v2c.ldsWatchCount == 1 {
v2c.ldsResourceName = rName
}
}
v2c.mu.Unlock()
v2c.TransportHelper.AddWatch(rType, rName)
}
// RemoveWatch overrides the transport helper's RemoveWatch to clear the LDS
// resource_name when the last watch is removed.
func (v2c *client) RemoveWatch(rType xdsclient.ResourceType, rName string) {
v2c.mu.Lock()
// Special handling for LDS, because RDS needs the LDS resource_name for
// response host matching.
if rType == xdsclient.ListenerResource {
// Set hostname to the first LDS resource_name, and reset it when the
// last LDS watch is removed. The upper level Client isn't expected to
// watchLDS more than once.
v2c.ldsWatchCount--
if v2c.ldsWatchCount == 0 {
v2c.ldsResourceName = ""
}
}
v2c.mu.Unlock()
v2c.TransportHelper.RemoveWatch(rType, rName)
}
func (v2c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) {
@ -242,11 +193,7 @@ func (v2c *client) handleLDSResponse(resp *v2xdspb.DiscoveryResponse) error {
// receipt of a good response, it caches validated resources and also invokes
// the registered watcher callback.
func (v2c *client) handleRDSResponse(resp *v2xdspb.DiscoveryResponse) error {
v2c.mu.Lock()
hostname := v2c.ldsResourceName
v2c.mu.Unlock()
update, err := xdsclient.UnmarshalRouteConfig(resp.GetResources(), hostname, v2c.logger)
update, err := xdsclient.UnmarshalRouteConfig(resp.GetResources(), v2c.logger)
if err != nil {
return err
}

View File

@ -74,10 +74,12 @@ func (s) TestRDSHandleResponseWithRouting(t *testing.T) {
// RouteConfiguration, since the others are covered in
// TestGetClusterFromRouteConfiguration.
{
name: "no-virtual-hosts-in-response",
rdsResponse: noVirtualHostsInRDSResponse,
wantErr: true,
wantUpdate: nil,
name: "no-virtual-hosts-in-response",
rdsResponse: noVirtualHostsInRDSResponse,
wantErr: false,
wantUpdate: &xdsclient.RouteConfigUpdate{
VirtualHosts: nil,
},
wantUpdateErr: false,
},
// Response contains one good RouteConfiguration, uninteresting though.
@ -90,24 +92,20 @@ func (s) TestRDSHandleResponseWithRouting(t *testing.T) {
},
// Response contains one good interesting RouteConfiguration.
{
name: "one-good-route-config",
rdsResponse: goodRDSResponse1,
wantErr: false,
wantUpdate: &xdsclient.RouteConfigUpdate{Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{goodClusterName1: 1}}}},
wantUpdateErr: false,
},
{
name: "one-good-route-config with routes",
name: "one-good-route-config",
rdsResponse: goodRDSResponse1,
wantErr: false,
wantUpdate: &xdsclient.RouteConfigUpdate{
// Instead of just weighted targets when routing is disabled,
// this result contains a route with perfix "", and action as
// weighted targets.
Routes: []*xdsclient.Route{{
Prefix: newStringP(""),
Action: map[string]uint32{goodClusterName1: 1},
}},
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{uninterestingDomain},
Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{uninterestingClusterName: 1}}},
},
{
Domains: []string{goodLDSTarget1},
Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{goodClusterName1: 1}}},
},
},
},
wantUpdateErr: false,
},
@ -126,25 +124,6 @@ func (s) TestRDSHandleResponseWithRouting(t *testing.T) {
}
}
// TestRDSHandleResponseWithoutLDSWatch tests the case where the v2Client
// receives an RDS response without a registered LDS watcher.
func (s) TestRDSHandleResponseWithoutLDSWatch(t *testing.T) {
_, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c, err := newV2Client(&testUpdateReceiver{
f: func(xdsclient.ResourceType, map[string]interface{}) {},
}, cc, goodNodeProto, func(int) time.Duration { return 0 }, nil)
if err != nil {
t.Fatal(err)
}
defer v2c.Close()
if v2c.handleRDSResponse(goodRDSResponse1) == nil {
t.Fatal("v2c.handleRDSResponse() succeeded, should have failed")
}
}
// TestRDSHandleResponseWithoutRDSWatch tests the case where the v2Client
// receives an RDS response without a registered RDS watcher.
func (s) TestRDSHandleResponseWithoutRDSWatch(t *testing.T) {

View File

@ -236,13 +236,15 @@ var (
},
TypeUrl: version.V2RouteConfigURL,
}
emptyRouteConfig = &xdspb.RouteConfiguration{}
marshaledEmptyRouteConfig, _ = proto.Marshal(emptyRouteConfig)
noVirtualHostsInRDSResponse = &xdspb.DiscoveryResponse{
noVirtualHostsRouteConfig = &xdspb.RouteConfiguration{
Name: goodRouteName1,
}
marshaledNoVirtualHostsRouteConfig, _ = proto.Marshal(noVirtualHostsRouteConfig)
noVirtualHostsInRDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: version.V2RouteConfigURL,
Value: marshaledEmptyRouteConfig,
Value: marshaledNoVirtualHostsRouteConfig,
},
},
TypeUrl: version.V2RouteConfigURL,
@ -411,11 +413,6 @@ func testWatchHandle(t *testing.T, test *watchHandleTestcase) {
}
defer v2c.Close()
// RDS needs an existing LDS watch for the hostname.
if test.rType == xdsclient.RouteConfigResource {
doLDS(t, v2c, fakeServer)
}
// Register the watcher, this will also trigger the v2Client to send the xDS
// request.
v2c.AddWatch(test.rType, test.resourceName)

View File

@ -22,7 +22,6 @@ package v3
import (
"context"
"fmt"
"sync"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
@ -93,52 +92,6 @@ type client struct {
// ClientConn to the xDS gRPC server. Owned by the parent xdsClient.
cc *grpc.ClientConn
nodeProto *v3corepb.Node
mu sync.Mutex
// ldsResourceName is the LDS resource_name to watch. It is set to the first
// LDS resource_name to watch, and removed when the LDS watch is canceled.
//
// It's from the dial target of the parent ClientConn. RDS resource
// processing needs this to do the host matching.
ldsResourceName string
ldsWatchCount int
}
// AddWatch overrides the transport helper's AddWatch to save the LDS
// resource_name. This is required when handling an RDS response to perform host
// matching.
func (v3c *client) AddWatch(rType xdsclient.ResourceType, rName string) {
v3c.mu.Lock()
// Special handling for LDS, because RDS needs the LDS resource_name for
// response host matching.
if rType == xdsclient.ListenerResource {
// Set hostname to the first LDS resource_name, and reset it when the
// last LDS watch is removed. The upper level Client isn't expected to
// watchLDS more than once.
v3c.ldsWatchCount++
if v3c.ldsWatchCount == 1 {
v3c.ldsResourceName = rName
}
}
v3c.mu.Unlock()
v3c.TransportHelper.AddWatch(rType, rName)
}
func (v3c *client) RemoveWatch(rType xdsclient.ResourceType, rName string) {
v3c.mu.Lock()
// Special handling for LDS, because RDS needs the LDS resource_name for
// response host matching.
if rType == xdsclient.ListenerResource {
// Set hostname to the first LDS resource_name, and reset it when the
// last LDS watch is removed. The upper level Client isn't expected to
// watchLDS more than once.
v3c.ldsWatchCount--
if v3c.ldsWatchCount == 0 {
v3c.ldsResourceName = ""
}
}
v3c.mu.Unlock()
v3c.TransportHelper.RemoveWatch(rType, rName)
}
func (v3c *client) NewStream(ctx context.Context) (grpc.ClientStream, error) {
@ -240,11 +193,7 @@ func (v3c *client) handleLDSResponse(resp *v3discoverypb.DiscoveryResponse) erro
// receipt of a good response, it caches validated resources and also invokes
// the registered watcher callback.
func (v3c *client) handleRDSResponse(resp *v3discoverypb.DiscoveryResponse) error {
v3c.mu.Lock()
hostname := v3c.ldsResourceName
v3c.mu.Unlock()
update, err := xdsclient.UnmarshalRouteConfig(resp.GetResources(), hostname, v3c.logger)
update, err := xdsclient.UnmarshalRouteConfig(resp.GetResources(), v3c.logger)
if err != nil {
return err
}