xds: add HTTP connection manager max_stream_duration support (#4122)

This commit is contained in:
Doug Fawley 2021-01-08 10:14:53 -08:00 committed by GitHub
parent 0bd76be2bb
commit 6a318bb011
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 166 additions and 47 deletions

View File

@ -147,6 +147,10 @@ type ListenerUpdate struct {
RouteConfigName string
// SecurityCfg contains security configuration sent by the control plane.
SecurityCfg *SecurityConfig
// MaxStreamDuration contains the HTTP connection manager's
// common_http_protocol_options.max_stream_duration field, or zero if
// unset.
MaxStreamDuration time.Duration
}
func (lu *ListenerUpdate) String() string {
@ -181,8 +185,13 @@ type Route struct {
Fraction *uint32
// If the matchers above indicate a match, the below configuration is used.
Action map[string]uint32 // action is weighted clusters.
MaxStreamDuration time.Duration
Action map[string]uint32 // action is weighted clusters.
// If MaxStreamDuration is nil, it indicates neither of the route action's
// max_stream_duration fields (grpc_timeout_header_max nor
// max_stream_duration) were set. In this case, the ListenerUpdate's
// MaxStreamDuration field should be used. If MaxStreamDuration is set to
// an explicit zero duration, the application's deadline should be used.
MaxStreamDuration *time.Duration
}
// HeaderMatcher represents header matchers.

View File

@ -21,6 +21,7 @@ package client
import (
"strings"
"testing"
"time"
v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
@ -37,6 +38,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/xds/internal/version"
"google.golang.org/protobuf/types/known/durationpb"
)
func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
@ -87,6 +89,9 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
RouteConfigName: v3RouteConfigName,
},
},
CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{
MaxStreamDuration: durationpb.New(time.Second),
},
}
mcm, _ := ptypes.MarshalAny(cm)
lis := &v3listenerpb.Listener{
@ -278,7 +283,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
name: "v3 listener resource",
resources: []*anypb.Any{v3Lis},
wantUpdate: map[string]ListenerUpdate{
v3LDSTarget: {RouteConfigName: v3RouteConfigName},
v3LDSTarget: {RouteConfigName: v3RouteConfigName, MaxStreamDuration: time.Second},
},
},
{
@ -286,7 +291,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
resources: []*anypb.Any{v2Lis, v3Lis},
wantUpdate: map[string]ListenerUpdate{
v2LDSTarget: {RouteConfigName: v2RouteConfigName},
v3LDSTarget: {RouteConfigName: v3RouteConfigName},
v3LDSTarget: {RouteConfigName: v3RouteConfigName, MaxStreamDuration: time.Second},
},
},
}

View File

@ -317,7 +317,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: time.Second}},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: newDurationP(time.Second)}},
},
},
},
@ -347,7 +347,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: time.Second}},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: newDurationP(time.Second)}},
},
},
},
@ -377,7 +377,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: 0}},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: newDurationP(0)}},
},
},
},
@ -803,3 +803,7 @@ func newUInt32P(i uint32) *uint32 {
func newBoolP(b bool) *bool {
return &b
}
func newDurationP(d time.Duration) *time.Duration {
return &d
}

View File

@ -80,6 +80,8 @@ func processListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) {
// processClientSideListener checks if the provided Listener proto meets
// the expected criteria. If so, it returns a non-empty routeConfigName.
func processClientSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) {
update := &ListenerUpdate{}
apiLisAny := lis.GetApiListener().GetApiListener()
if !IsHTTPConnManagerResource(apiLisAny.GetTypeUrl()) {
return nil, fmt.Errorf("xds: unexpected resource type: %q in LDS response", apiLisAny.GetTypeUrl())
@ -98,7 +100,7 @@ func processClientSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, err
if name == "" {
return nil, fmt.Errorf("xds: empty route_config_name in LDS response: %+v", lis)
}
return &ListenerUpdate{RouteConfigName: name}, nil
update.RouteConfigName = name
case *v3httppb.HttpConnectionManager_RouteConfig:
// TODO: Add support for specifying the RouteConfiguration inline
// in the LDS response.
@ -108,6 +110,10 @@ func processClientSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, err
default:
return nil, fmt.Errorf("xds: unsupported type %T for RouteSpecifier in received LDS response", apiLis.RouteSpecifier)
}
update.MaxStreamDuration = apiLis.GetCommonHttpProtocolOptions().GetMaxStreamDuration().AsDuration()
return update, nil
}
func processServerSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) {
@ -346,12 +352,16 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger)
}
route.Action = clusters
msd := action.GetMaxStreamDuration()
// Prefer grpc_timeout_header_max, if set.
if dur := msd.GetGrpcTimeoutHeaderMax(); dur != nil {
route.MaxStreamDuration = dur.AsDuration()
} else {
route.MaxStreamDuration = msd.GetMaxStreamDuration().AsDuration()
dur := msd.GetGrpcTimeoutHeaderMax()
if dur == nil {
dur = msd.GetMaxStreamDuration()
}
if dur != nil {
d := dur.AsDuration()
route.MaxStreamDuration = &d
}
routesRet = append(routesRet, &route)
}

View File

@ -201,11 +201,11 @@ var newWRR = wrr.NewRandom
func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) {
cs := &configSelector{
r: r,
routes: make([]route, len(su.Routes)),
routes: make([]route, len(su.routes)),
clusters: make(map[string]*clusterInfo),
}
for i, rt := range su.Routes {
for i, rt := range su.routes {
clusters := newWRR()
for cluster, weight := range rt.Action {
clusters.Add(cluster, int64(weight))
@ -227,7 +227,11 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
if err != nil {
return nil, err
}
cs.routes[i].maxStreamDuration = rt.MaxStreamDuration
if rt.MaxStreamDuration == nil {
cs.routes[i].maxStreamDuration = su.ldsConfig.maxStreamDuration
} else {
cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration
}
}
return cs, nil

View File

@ -22,17 +22,28 @@ import (
"fmt"
"strings"
"sync"
"time"
"google.golang.org/grpc/internal/grpclog"
xdsclient "google.golang.org/grpc/xds/internal/client"
)
// serviceUpdate contains information received from the RDS responses which is
// of interested to the xds resolver. The RDS request is built by first making a
// LDS to get the RouteConfig name.
// serviceUpdate contains information received from the LDS/RDS responses which
// are of interest to the xds resolver. The RDS request is built by first
// making a LDS to get the RouteConfig name.
type serviceUpdate struct {
// Routes contain matchers+actions to route RPCs.
Routes []*xdsclient.Route
// routes contain matchers+actions to route RPCs.
routes []*xdsclient.Route
// ldsConfig contains configuration that applies to all routes.
ldsConfig ldsConfig
}
// ldsConfig contains information received from the LDS responses which are of
// interest to the xds resolver.
type ldsConfig struct {
// maxStreamDuration is from the HTTP connection manager's
// common_http_protocol_options field.
maxStreamDuration time.Duration
}
// watchService uses LDS and RDS to discover information about the provided
@ -61,6 +72,7 @@ type serviceUpdateWatcher struct {
serviceName string
ldsCancel func()
serviceCb func(serviceUpdate, error)
lastUpdate serviceUpdate
mu sync.Mutex
closed bool
@ -84,6 +96,7 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsclient.ListenerUpdate, er
w.rdsCancel()
w.rdsName = ""
w.rdsCancel = nil
w.lastUpdate = serviceUpdate{}
}
// The other error cases still return early without canceling the
// existing RDS watch.
@ -91,9 +104,18 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsclient.ListenerUpdate, er
return
}
oldLDSConfig := w.lastUpdate.ldsConfig
w.lastUpdate.ldsConfig = ldsConfig{maxStreamDuration: update.MaxStreamDuration}
if w.rdsName == update.RouteConfigName {
// If the new RouteConfigName is same as the previous, don't cancel and
// restart the RDS watch.
if w.lastUpdate.ldsConfig != oldLDSConfig {
// The route name didn't change but the LDS data did; send it now.
// If the route name did change, then we will wait until the first
// RDS update before reporting this LDS config.
w.serviceCb(w.lastUpdate, nil)
}
return
}
w.rdsName = update.RouteConfigName
@ -127,7 +149,8 @@ func (w *serviceUpdateWatcher) handleRDSResp(update xdsclient.RouteConfigUpdate,
return
}
w.serviceCb(serviceUpdate{Routes: matchVh.Routes}, nil)
w.lastUpdate.routes = matchVh.Routes
w.serviceCb(w.lastUpdate, nil)
}
func (w *serviceUpdateWatcher) close() {

View File

@ -22,6 +22,7 @@ import (
"context"
"fmt"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
@ -138,7 +139,7 @@ func verifyServiceUpdate(ctx context.Context, updateCh *testutils.Channel, wantU
return fmt.Errorf("timeout when waiting for service update: %v", err)
}
gotUpdate := u.(serviceUpdateErr)
if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty()) {
if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty(), cmp.AllowUnexported(serviceUpdate{}, ldsConfig{})) {
return fmt.Errorf("unexpected service update: (%v, %v), want: (%v, nil), diff (-want +got):\n%s", gotUpdate.u, gotUpdate.err, wantUpdate, cmp.Diff(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty()))
}
return nil
@ -165,7 +166,7 @@ func (s) TestServiceWatch(t *testing.T) {
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
wantUpdate := serviceUpdate{Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
@ -179,7 +180,7 @@ func (s) TestServiceWatch(t *testing.T) {
}
wantUpdate2 := serviceUpdate{
Routes: []*xdsclient.Route{{
routes: []*xdsclient.Route{{
Path: newStringP(""),
Action: map[string]uint32{cluster: 1},
}},
@ -219,7 +220,7 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
wantUpdate := serviceUpdate{Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
@ -240,7 +241,7 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
waitForWatchRouteConfig(ctx, t, xdsC, routeStr+"2")
// RDS update for the new name.
wantUpdate2 := serviceUpdate{Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}}}
wantUpdate2 := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
@ -254,6 +255,58 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
}
}
// TestServiceWatchLDSUpdate covers the case that after first LDS and first RDS
// response, the second LDS response includes a new MaxStreamDuration. It also
// verifies this is reported in subsequent RDS updates.
func (s) TestServiceWatchLDSUpdateMaxStreamDuration(t *testing.T) {
serviceUpdateCh := testutils.NewChannel()
xdsC := fakeclient.NewClient()
cancelWatch := watchService(xdsC, targetStr, func(update serviceUpdate, err error) {
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
}, nil)
defer cancelWatch()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, MaxStreamDuration: time.Second}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}, ldsConfig: ldsConfig{maxStreamDuration: time.Second}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}},
},
},
}, nil)
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
t.Fatal(err)
}
// Another LDS update with the same RDS_name but different MaxStreamDuration (zero in this case).
wantUpdate2 := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil {
t.Fatal(err)
}
// RDS update.
wantUpdate3 := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}},
},
},
}, nil)
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate3); err != nil {
t.Fatal(err)
}
}
// TestServiceNotCancelRDSOnSameLDSUpdate covers the case that if the second LDS
// update contains the same RDS name as the previous, the RDS watch isn't
// canceled and restarted.
@ -271,7 +324,7 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
wantUpdate := serviceUpdate{Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{

View File

@ -511,7 +511,7 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, MaxStreamDuration: time.Second}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
defer func(oldNewWRR func() wrr.WRR) { newWRR = oldNewWRR }(newWRR)
@ -526,11 +526,11 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) {
Routes: []*client.Route{{
Prefix: newStringP("/foo"),
Action: map[string]uint32{"A": 1},
MaxStreamDuration: 5 * time.Second,
MaxStreamDuration: newDurationP(5 * time.Second),
}, {
Prefix: newStringP("/bar"),
Action: map[string]uint32{"B": 1},
MaxStreamDuration: time.Duration(0),
MaxStreamDuration: newDurationP(0),
}, {
Prefix: newStringP(""),
Action: map[string]uint32{"C": 1},
@ -554,43 +554,50 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) {
}
testCases := []struct {
name string
method string
timeoutSupport bool
want *time.Duration
}{{
name: "RDS setting",
method: "/foo/method",
timeoutSupport: true,
want: func() *time.Duration { x := 5 * time.Second; return &x }(),
want: newDurationP(5 * time.Second),
}, {
name: "timeout support disabled",
method: "/foo/method",
timeoutSupport: false,
want: nil,
}, {
name: "explicit zero in RDS; ignore LDS",
method: "/bar/method",
timeoutSupport: true,
want: nil,
}, {
name: "no config in RDS; fallback to LDS",
method: "/baz/method",
timeoutSupport: true,
want: nil,
want: newDurationP(time.Second),
}}
for _, tc := range testCases {
env.TimeoutSupport = tc.timeoutSupport
req := iresolver.RPCInfo{
Method: tc.method,
Context: context.Background(),
}
res, err := cs.SelectConfig(req)
if err != nil {
t.Errorf("Unexpected error from cs.SelectConfig(%v): %v", req, err)
continue
}
res.OnCommitted()
got := res.MethodConfig.Timeout
if !reflect.DeepEqual(got, tc.want) {
t.Errorf("For method %q: res.MethodConfig.Timeout = %v; want %v", tc.method, got, tc.want)
}
t.Run(tc.name, func(t *testing.T) {
env.TimeoutSupport = tc.timeoutSupport
req := iresolver.RPCInfo{
Method: tc.method,
Context: context.Background(),
}
res, err := cs.SelectConfig(req)
if err != nil {
t.Errorf("Unexpected error from cs.SelectConfig(%v): %v", req, err)
return
}
res.OnCommitted()
got := res.MethodConfig.Timeout
if !reflect.DeepEqual(got, tc.want) {
t.Errorf("For method %q: res.MethodConfig.Timeout = %v; want %v", tc.method, got, tc.want)
}
})
}
}
@ -856,3 +863,7 @@ func replaceRandNumGenerator(start int64) func() {
grpcrandInt63n = grpcrand.Int63n
}
}
func newDurationP(d time.Duration) *time.Duration {
return &d
}