xds: Instantiated HTTP Filters on Server Side (#4669)

* Instantiated HTTP Filters on Server Side
This commit is contained in:
Zach Reyes 2021-08-31 09:27:06 -04:00 committed by GitHub
parent ef66d13abb
commit 198d951db5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 354 additions and 34 deletions

View File

@ -117,9 +117,12 @@ type ClientInterceptor interface {
NewStream(ctx context.Context, ri RPCInfo, done func(), newStream func(ctx context.Context, done func()) (ClientStream, error)) (ClientStream, error)
}
// ServerInterceptor is unimplementable; do not use.
// ServerInterceptor is an interceptor for incoming RPC's on gRPC server side.
type ServerInterceptor interface {
notDefined()
// AllowRPC checks if an incoming RPC is allowed to proceed based on
// information about connection RPC was received on, and HTTP Headers. This
// information will be piped into context.
AllowRPC(ctx context.Context) error // TODO: Make this a real interceptor for filters such as rate limiting.
}
type csKeyType string

View File

@ -117,8 +117,8 @@ type routeCluster struct {
}
type route struct {
m *compositeMatcher // converted from route matchers
clusters wrr.WRR // holds *routeCluster entries
m *xdsclient.CompositeMatcher // converted from route matchers
clusters wrr.WRR // holds *routeCluster entries
maxStreamDuration time.Duration
// map from filter name to its config
httpFilterConfigOverride map[string]httpfilter.FilterConfig
@ -146,7 +146,7 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
var rt *route
// Loop through routes in order and select first match.
for _, r := range cs.routes {
if r.m.match(rpcInfo) {
if r.m.Match(rpcInfo) {
rt = &r
break
}
@ -350,7 +350,7 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
cs.routes[i].clusters = clusters
var err error
cs.routes[i].m, err = routeToMatcher(rt)
cs.routes[i].m, err = xdsclient.RouteToMatcher(rt)
if err != nil {
return nil, err
}

View File

@ -1391,13 +1391,13 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) {
func replaceRandNumGenerator(start int64) func() {
nextInt := start
grpcrandInt63n = func(int64) (ret int64) {
xdsclient.RandInt63n = func(int64) (ret int64) {
ret = nextInt
nextInt++
return
}
return func() {
grpcrandInt63n = grpcrand.Int63n
xdsclient.RandInt63n = grpcrand.Int63n
}
}

View File

@ -58,6 +58,10 @@ type connWrapper struct {
// completing the HTTP2 handshake.
deadlineMu sync.Mutex
deadline time.Time
// The virtual hosts with matchable routes and instantiated HTTP Filters per
// route.
virtualHosts []xdsclient.VirtualHostWithInterceptors
}
// SetDeadline makes a copy of the passed in deadline and forwards the call to

View File

@ -21,10 +21,13 @@
package server
import (
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"unsafe"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/grpclog"
@ -208,12 +211,13 @@ type listenerWrapper struct {
mode ServingMode
// Filter chains received as part of the last good update.
filterChains *xdsclient.FilterChainManager
// rdsHandler is used for any dynamic RDS resources specified in a LDS
// update.
rdsHandler *rdsHandler
// rdsUpdates are the RDS resources received from the management
// server, keyed on the RouteName of the RDS resource.
rdsUpdates map[string]xdsclient.RouteConfigUpdate // TODO: if this will be read in accept, this will need a read lock as well.
rdsUpdates unsafe.Pointer // map[string]xdsclient.RouteConfigUpdate
// ldsUpdateCh is a channel for XDSClient LDS updates.
ldsUpdateCh chan ldsUpdateWithError
// rdsUpdateCh is a channel for XDSClient RDS updates.
@ -297,11 +301,35 @@ func (l *listenerWrapper) Accept() (net.Conn, error) {
conn.Close()
continue
}
// TODO: once matched an accepted connection to a filter chain,
// instantiate the HTTP filters in the filter chain + the filter
// overrides, pipe filters and route into connection, which will
// eventually be passed to xdsUnary/Stream interceptors.
return &connWrapper{Conn: conn, filterChain: fc, parent: l}, nil
var rc xdsclient.RouteConfigUpdate
if fc.InlineRouteConfig != nil {
rc = *fc.InlineRouteConfig
} else {
rcPtr := atomic.LoadPointer(&l.rdsUpdates)
rcuPtr := (*map[string]xdsclient.RouteConfigUpdate)(rcPtr)
// This shouldn't happen, but this error protects against a panic.
if rcuPtr == nil {
return nil, errors.New("route configuration pointer is nil")
}
rcu := *rcuPtr
rc = rcu[fc.RouteConfigName]
}
// The filter chain will construct a usuable route table on each
// connection accept. This is done because preinstantiating every route
// table before it is needed for a connection would potentially lead to
// a lot of cpu time and memory allocated for route tables that will
// never be used. There was also a thought to cache this configuration,
// and reuse it for the next accepted connection. However, this would
// lead to a lot of code complexity (RDS Updates for a given route name
// can come it at any time), and connections aren't accepted too often,
// so this reinstantation of the Route Configuration is an acceptable
// tradeoff for simplicity.
if err := fc.ConstructUsableRouteConfiguration(rc); err != nil {
l.logger.Warningf("route configuration construction: %v", err)
conn.Close()
continue
}
return &connWrapper{Conn: conn, filterChain: fc, parent: l, virtualHosts: fc.VirtualHosts}, nil
}
}
@ -367,7 +395,7 @@ func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) {
// continue to use the old configuration.
return
}
l.rdsUpdates = update.updates
atomic.StorePointer(&l.rdsUpdates, unsafe.Pointer(&update.updates))
l.switchMode(l.filterChains, ServingModeServing, nil)
l.goodUpdate.Fire()

View File

@ -28,6 +28,8 @@ import (
v3tlspb "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/xds/internal/httpfilter"
"google.golang.org/grpc/xds/internal/version"
)
@ -65,6 +67,81 @@ type FilterChain struct {
//
// Only one of RouteConfigName and InlineRouteConfig is set.
InlineRouteConfig *RouteConfigUpdate
// VirtualHosts are the virtual hosts ready to be used in the xds interceptors.
// It contains a way to match routes using a matcher and also instantiates
// HTTPFilter overrides to simply run incoming RPC's through if they are selected.
VirtualHosts []VirtualHostWithInterceptors
}
// VirtualHostWithInterceptors captures information present in a VirtualHost
// update, and also contains routes with instantiated HTTP Filters.
type VirtualHostWithInterceptors struct {
// Domains are the domain names which map to this Virtual Host. On the
// server side, this will be dictated by the :authority header of the
// incoming RPC.
Domains []string
// Routes are the Routes for this Virtual Host.
Routes []RouteWithInterceptors
}
// RouteWithInterceptors captures information in a Route, and contains
// a usable matcher and also instantiated HTTP Filters.
type RouteWithInterceptors struct {
// M is the matcher used to match to this route.
M *CompositeMatcher
// Interceptors are interceptors instantiated for this route. These will be
// constructed from a combination of the top level configuration and any
// HTTP Filter overrides present in Virtual Host or Route.
Interceptors []resolver.ServerInterceptor
}
// ConstructUsableRouteConfiguration takes Route Configuration and converts it
// into matchable route configuration, with instantiated HTTP Filters per route.
func (f *FilterChain) ConstructUsableRouteConfiguration(config RouteConfigUpdate) error {
vhs := make([]VirtualHostWithInterceptors, len(config.VirtualHosts))
for _, vh := range config.VirtualHosts {
vhwi, err := f.convertVirtualHost(vh)
if err != nil {
return fmt.Errorf("virtual host construction: %v", err)
}
vhs = append(vhs, vhwi)
}
f.VirtualHosts = vhs
return nil
}
func (f *FilterChain) convertVirtualHost(virtualHost *VirtualHost) (VirtualHostWithInterceptors, error) {
rs := make([]RouteWithInterceptors, len(virtualHost.Routes))
for i, r := range virtualHost.Routes {
var err error
rs[i].M, err = RouteToMatcher(r)
if err != nil {
return VirtualHostWithInterceptors{}, fmt.Errorf("matcher construction: %v", err)
}
for _, filter := range f.HTTPFilters {
// Route is highest priority on server side, as there is no concept
// of an upstream cluster on server side.
override := r.HTTPFilterConfigOverride[filter.Name]
if override == nil {
// Virtual Host is second priority.
override = virtualHost.HTTPFilterConfigOverride[filter.Name]
}
sb, ok := filter.Filter.(httpfilter.ServerInterceptorBuilder)
if !ok {
// Should not happen if it passed xdsClient validation.
return VirtualHostWithInterceptors{}, fmt.Errorf("filter does not support use in server")
}
si, err := sb.BuildServerInterceptor(filter.Config, override)
if err != nil {
return VirtualHostWithInterceptors{}, fmt.Errorf("filter construction: %v", err)
}
if si != nil {
rs[i].Interceptors = append(rs[i].Interceptors, si)
}
}
}
return VirtualHostWithInterceptors{Domains: virtualHost.Domains, Routes: rs}, nil
}
// SourceType specifies the connection source IP match type.

View File

@ -19,6 +19,8 @@
package xdsclient
import (
"context"
"errors"
"fmt"
"net"
"strings"
@ -36,6 +38,7 @@ import (
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/wrapperspb"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/xds/internal/httpfilter"
"google.golang.org/grpc/xds/internal/httpfilter/router"
@ -43,6 +46,12 @@ import (
"google.golang.org/grpc/xds/internal/version"
)
const (
topLevel = "top level"
vhLevel = "virtual host level"
rLevel = "route level"
)
var (
routeConfig = &v3routepb.RouteConfiguration{
Name: "routeName",
@ -2442,6 +2451,202 @@ func TestLookup_Successes(t *testing.T) {
}
}
type filterCfg struct {
httpfilter.FilterConfig
// Level is what differentiates top level filters ("top level") vs. second
// level ("virtual host level"), and third level ("route level").
level string
}
type filterBuilder struct {
httpfilter.Filter
}
var _ httpfilter.ServerInterceptorBuilder = &filterBuilder{}
func (fb *filterBuilder) BuildServerInterceptor(config httpfilter.FilterConfig, override httpfilter.FilterConfig) (iresolver.ServerInterceptor, error) {
var level string
level = config.(filterCfg).level
if override != nil {
level = override.(filterCfg).level
}
return &serverInterceptor{level: level}, nil
}
type serverInterceptor struct {
level string
}
func (si *serverInterceptor) AllowRPC(context.Context) error {
return errors.New(si.level)
}
func TestHTTPFilterInstantiation(t *testing.T) {
tests := []struct {
name string
filters []HTTPFilter
routeConfig RouteConfigUpdate
// A list of strings which will be built from iterating through the
// filters ["top level", "vh level", "route level", "route level"...]
// wantErrs is the list of error strings that will be constructed from
// the deterministic iteration through the vh list and route list. The
// error string will be determined by the level of config that the
// filter builder receives (i.e. top level, vs. virtual host level vs.
// route level).
wantErrs []string
}{
{
name: "one http filter no overrides",
filters: []HTTPFilter{
{Name: "server-interceptor", Filter: &filterBuilder{}, Config: filterCfg{level: topLevel}},
},
routeConfig: RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{"target"},
Routes: []*Route{{
Prefix: newStringP("1"),
},
},
},
}},
wantErrs: []string{topLevel},
},
{
name: "one http filter vh override",
filters: []HTTPFilter{
{Name: "server-interceptor", Filter: &filterBuilder{}, Config: filterCfg{level: topLevel}},
},
routeConfig: RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{"target"},
Routes: []*Route{{
Prefix: newStringP("1"),
},
},
HTTPFilterConfigOverride: map[string]httpfilter.FilterConfig{
"server-interceptor": filterCfg{level: vhLevel},
},
},
}},
wantErrs: []string{vhLevel},
},
{
name: "one http filter route override",
filters: []HTTPFilter{
{Name: "server-interceptor", Filter: &filterBuilder{}, Config: filterCfg{level: topLevel}},
},
routeConfig: RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{"target"},
Routes: []*Route{{
Prefix: newStringP("1"),
HTTPFilterConfigOverride: map[string]httpfilter.FilterConfig{
"server-interceptor": filterCfg{level: rLevel},
},
},
},
},
}},
wantErrs: []string{rLevel},
},
// This tests the scenario where there are three http filters, and one
// gets overridden by route and one by virtual host.
{
name: "three http filters vh override route override",
filters: []HTTPFilter{
{Name: "server-interceptor1", Filter: &filterBuilder{}, Config: filterCfg{level: topLevel}},
{Name: "server-interceptor2", Filter: &filterBuilder{}, Config: filterCfg{level: topLevel}},
{Name: "server-interceptor3", Filter: &filterBuilder{}, Config: filterCfg{level: topLevel}},
},
routeConfig: RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{"target"},
Routes: []*Route{{
Prefix: newStringP("1"),
HTTPFilterConfigOverride: map[string]httpfilter.FilterConfig{
"server-interceptor3": filterCfg{level: rLevel},
},
},
},
HTTPFilterConfigOverride: map[string]httpfilter.FilterConfig{
"server-interceptor2": filterCfg{level: vhLevel},
},
},
}},
wantErrs: []string{topLevel, vhLevel, rLevel},
},
// This tests the scenario where there are three http filters, and two
// virtual hosts with different vh + route overrides for each virtual
// host.
{
name: "three http filters two vh",
filters: []HTTPFilter{
{Name: "server-interceptor1", Filter: &filterBuilder{}, Config: filterCfg{level: topLevel}},
{Name: "server-interceptor2", Filter: &filterBuilder{}, Config: filterCfg{level: topLevel}},
{Name: "server-interceptor3", Filter: &filterBuilder{}, Config: filterCfg{level: topLevel}},
},
routeConfig: RouteConfigUpdate{
VirtualHosts: []*VirtualHost{
{
Domains: []string{"target"},
Routes: []*Route{{
Prefix: newStringP("1"),
HTTPFilterConfigOverride: map[string]httpfilter.FilterConfig{
"server-interceptor3": filterCfg{level: rLevel},
},
},
},
HTTPFilterConfigOverride: map[string]httpfilter.FilterConfig{
"server-interceptor2": filterCfg{level: vhLevel},
},
},
{
Domains: []string{"target"},
Routes: []*Route{{
Prefix: newStringP("1"),
HTTPFilterConfigOverride: map[string]httpfilter.FilterConfig{
"server-interceptor1": filterCfg{level: rLevel},
"server-interceptor2": filterCfg{level: rLevel},
},
},
},
HTTPFilterConfigOverride: map[string]httpfilter.FilterConfig{
"server-interceptor2": filterCfg{level: vhLevel},
"server-interceptor3": filterCfg{level: vhLevel},
},
},
}},
wantErrs: []string{topLevel, vhLevel, rLevel, rLevel, rLevel, vhLevel},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fc := FilterChain{
HTTPFilters: test.filters,
}
fc.ConstructUsableRouteConfiguration(test.routeConfig)
// Build out list of errors by iterating through the virtual hosts and routes,
// and running the filters in route configurations.
var errs []string
for _, vh := range fc.VirtualHosts {
for _, r := range vh.Routes {
for _, int := range r.Interceptors {
errs = append(errs, int.AllowRPC(context.Background()).Error())
}
}
}
if !cmp.Equal(errs, test.wantErrs) {
t.Fatalf("List of errors %v, want %v", errs, test.wantErrs)
}
})
}
}
// The Equal() methods defined below help with using cmp.Equal() on these types
// which contain all unexported fields.

View File

@ -16,7 +16,7 @@
*
*/
package resolver
package xdsclient
import (
"fmt"
@ -27,10 +27,10 @@ import (
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/xds/matcher"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/xds/internal/xdsclient"
)
func routeToMatcher(r *xdsclient.Route) (*compositeMatcher, error) {
// RouteToMatcher converts a route to a Matcher to match incoming RPC's against.
func RouteToMatcher(r *Route) (*CompositeMatcher, error) {
var pm pathMatcher
switch {
case r.Regex != nil:
@ -75,18 +75,20 @@ func routeToMatcher(r *xdsclient.Route) (*compositeMatcher, error) {
return newCompositeMatcher(pm, headerMatchers, fractionMatcher), nil
}
// compositeMatcher.match returns true if all matchers return true.
type compositeMatcher struct {
// CompositeMatcher is a matcher that holds onto many matchers and aggregates
// the matching results.
type CompositeMatcher struct {
pm pathMatcher
hms []matcher.HeaderMatcher
fm *fractionMatcher
}
func newCompositeMatcher(pm pathMatcher, hms []matcher.HeaderMatcher, fm *fractionMatcher) *compositeMatcher {
return &compositeMatcher{pm: pm, hms: hms, fm: fm}
func newCompositeMatcher(pm pathMatcher, hms []matcher.HeaderMatcher, fm *fractionMatcher) *CompositeMatcher {
return &CompositeMatcher{pm: pm, hms: hms, fm: fm}
}
func (a *compositeMatcher) match(info iresolver.RPCInfo) bool {
// Match returns true if all matchers return true.
func (a *CompositeMatcher) Match(info iresolver.RPCInfo) bool {
if a.pm != nil && !a.pm.match(info.Method) {
return false
}
@ -119,7 +121,7 @@ func (a *compositeMatcher) match(info iresolver.RPCInfo) bool {
return true
}
func (a *compositeMatcher) String() string {
func (a *CompositeMatcher) String() string {
var ret string
if a.pm != nil {
ret += a.pm.String()
@ -141,10 +143,11 @@ func newFractionMatcher(fraction uint32) *fractionMatcher {
return &fractionMatcher{fraction: int64(fraction)}
}
var grpcrandInt63n = grpcrand.Int63n
// RandInt63n overwrites grpcrand for control in tests.
var RandInt63n = grpcrand.Int63n
func (fm *fractionMatcher) match() bool {
t := grpcrandInt63n(1000000)
t := RandInt63n(1000000)
return t <= fm.fraction
}

View File

@ -16,7 +16,7 @@
*
*/
package resolver
package xdsclient
import (
"regexp"

View File

@ -16,7 +16,7 @@
*
*/
package resolver
package xdsclient
import (
"regexp"

View File

@ -16,7 +16,7 @@
*
*/
package resolver
package xdsclient
import (
"context"
@ -107,7 +107,7 @@ func TestAndMatcherMatch(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := newCompositeMatcher(tt.pm, []matcher.HeaderMatcher{tt.hm}, nil)
if got := a.match(tt.info); got != tt.want {
if got := a.Match(tt.info); got != tt.want {
t.Errorf("match() = %v, want %v", got, tt.want)
}
})
@ -118,11 +118,11 @@ func TestFractionMatcherMatch(t *testing.T) {
const fraction = 500000
fm := newFractionMatcher(fraction)
defer func() {
grpcrandInt63n = grpcrand.Int63n
RandInt63n = grpcrand.Int63n
}()
// rand > fraction, should return false.
grpcrandInt63n = func(n int64) int64 {
RandInt63n = func(n int64) int64 {
return fraction + 1
}
if matched := fm.match(); matched {
@ -130,7 +130,7 @@ func TestFractionMatcherMatch(t *testing.T) {
}
// rand == fraction, should return true.
grpcrandInt63n = func(n int64) int64 {
RandInt63n = func(n int64) int64 {
return fraction
}
if matched := fm.match(); !matched {
@ -138,7 +138,7 @@ func TestFractionMatcherMatch(t *testing.T) {
}
// rand < fraction, should return true.
grpcrandInt63n = func(n int64) int64 {
RandInt63n = func(n int64) int64 {
return fraction - 1
}
if matched := fm.match(); !matched {