xds: implement fault injection HTTP filter (A33) (#4236)

This commit is contained in:
Doug Fawley 2021-03-12 08:38:49 -08:00 committed by GitHub
parent f168a3cb3b
commit d7737376c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 947 additions and 2 deletions

View File

@ -4,6 +4,7 @@ github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403 h1:cqQfy1jclcSy/FwLjemeg3SR1yaINm74aQyupQ0Bl8M=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d h1:QyzYnTnPE15SQyUeqU6qLbWxMkwyAyu+vGksa0b7j00=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
@ -29,9 +30,11 @@ github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@ -77,7 +80,9 @@ google.golang.org/protobuf v1.24.0 h1:UhZDfRO8JRQru4/+LlLE0BRKGF8L+PICnvYZmx/fEG
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@ -181,7 +181,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
if err != nil {
return nil, status.Convert(err).Err()
return nil, toRPCErr(err)
}
if rpcConfig != nil {
@ -196,7 +196,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
if err != nil {
return nil, status.Convert(err).Err()
return nil, toRPCErr(err)
}
return cs, nil
}

View File

@ -0,0 +1,295 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package fault implements the Envoy Fault Injection HTTP filter.
package fault
import (
"context"
"errors"
"fmt"
"io"
"strconv"
"sync/atomic"
"time"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/grpcrand"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/env"
"google.golang.org/grpc/xds/internal/httpfilter"
"google.golang.org/protobuf/types/known/anypb"
cpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/common/fault/v3"
fpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/fault/v3"
tpb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
)
const headerAbortHTTPStatus = "x-envoy-fault-abort-request"
const headerAbortGRPCStatus = "x-envoy-fault-abort-grpc-request"
const headerAbortPercentage = "x-envoy-fault-abort-request-percentage"
const headerDelayPercentage = "x-envoy-fault-delay-request-percentage"
const headerDelayDuration = "x-envoy-fault-delay-request"
var statusMap = map[int]codes.Code{
400: codes.Internal,
401: codes.Unauthenticated,
403: codes.PermissionDenied,
404: codes.Unimplemented,
429: codes.Unavailable,
502: codes.Unavailable,
503: codes.Unavailable,
504: codes.Unavailable,
}
func init() {
if env.FaultInjectionSupport {
httpfilter.Register(builder{})
}
}
type builder struct {
}
type config struct {
httpfilter.FilterConfig
config *fpb.HTTPFault
}
func (builder) TypeURLs() []string {
return []string{"type.googleapis.com/envoy.extensions.filters.http.fault.v3.HTTPFault"}
}
// Parsing is the same for the base config and the override config.
func parseConfig(cfg proto.Message) (httpfilter.FilterConfig, error) {
if cfg == nil {
return nil, fmt.Errorf("fault: nil configuration message provided")
}
any, ok := cfg.(*anypb.Any)
if !ok {
return nil, fmt.Errorf("fault: error parsing config %v: unknown type %T", cfg, cfg)
}
msg := new(fpb.HTTPFault)
if err := ptypes.UnmarshalAny(any, msg); err != nil {
return nil, fmt.Errorf("fault: error parsing config %v: %v", cfg, err)
}
return config{config: msg}, nil
}
func (builder) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) {
return parseConfig(cfg)
}
func (builder) ParseFilterConfigOverride(override proto.Message) (httpfilter.FilterConfig, error) {
return parseConfig(override)
}
var _ httpfilter.ClientInterceptorBuilder = builder{}
func (builder) BuildClientInterceptor(cfg, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) {
if cfg == nil {
return nil, fmt.Errorf("fault: nil config provided")
}
c, ok := cfg.(config)
if !ok {
return nil, fmt.Errorf("fault: incorrect config type provided (%T): %v", cfg, cfg)
}
if override != nil {
// override completely replaces the listener configuration; but we
// still validate the listener config type.
c, ok = override.(config)
if !ok {
return nil, fmt.Errorf("fault: incorrect override config type provided (%T): %v", override, override)
}
}
return &interceptor{config: c.config}, nil
}
type interceptor struct {
config *fpb.HTTPFault
}
var activeFaults uint32 // global active faults; accessed atomically
func (i *interceptor) NewStream(ctx context.Context, ri iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) {
if maxAF := i.config.GetMaxActiveFaults(); maxAF != nil {
defer atomic.AddUint32(&activeFaults, ^uint32(0)) // decrement counter
if af := atomic.AddUint32(&activeFaults, 1); af > maxAF.GetValue() {
// Would exceed maximum active fault limit.
return newStream(ctx, done)
}
}
if err := injectDelay(ctx, i.config.GetDelay()); err != nil {
return nil, err
}
if err := injectAbort(ctx, i.config.GetAbort()); err != nil {
if err == errOKStream {
return &okStream{ctx: ctx}, nil
}
return nil, err
}
return newStream(ctx, done)
}
// For overriding in tests
var randIntn = grpcrand.Intn
var newTimer = time.NewTimer
func injectDelay(ctx context.Context, delayCfg *cpb.FaultDelay) error {
numerator, denominator := splitPct(delayCfg.GetPercentage())
var delay time.Duration
switch delayType := delayCfg.GetFaultDelaySecifier().(type) {
case *cpb.FaultDelay_FixedDelay:
delay = delayType.FixedDelay.AsDuration()
case *cpb.FaultDelay_HeaderDelay_:
md, _ := metadata.FromOutgoingContext(ctx)
v := md[headerDelayDuration]
if v == nil {
// No delay configured for this RPC.
return nil
}
ms, ok := parseIntFromMD(v)
if !ok {
// Malformed header; no delay.
return nil
}
delay = time.Duration(ms) * time.Millisecond
if v := md[headerDelayPercentage]; v != nil {
if num, ok := parseIntFromMD(v); ok && num < numerator {
numerator = num
}
}
}
if delay == 0 || randIntn(denominator) >= numerator {
return nil
}
t := newTimer(delay)
select {
case <-t.C:
case <-ctx.Done():
t.Stop()
return ctx.Err()
}
return nil
}
func injectAbort(ctx context.Context, abortCfg *fpb.FaultAbort) error {
numerator, denominator := splitPct(abortCfg.GetPercentage())
code := codes.OK
okCode := false
switch errType := abortCfg.GetErrorType().(type) {
case *fpb.FaultAbort_HttpStatus:
code, okCode = grpcFromHTTP(int(errType.HttpStatus))
case *fpb.FaultAbort_GrpcStatus:
code, okCode = sanitizeGRPCCode(codes.Code(errType.GrpcStatus)), true
case *fpb.FaultAbort_HeaderAbort_:
md, _ := metadata.FromOutgoingContext(ctx)
if v := md[headerAbortHTTPStatus]; v != nil {
// HTTP status has priority over gRPC status.
if httpStatus, ok := parseIntFromMD(v); ok {
code, okCode = grpcFromHTTP(httpStatus)
}
} else if v := md[headerAbortGRPCStatus]; v != nil {
if grpcStatus, ok := parseIntFromMD(v); ok {
code, okCode = sanitizeGRPCCode(codes.Code(grpcStatus)), true
}
}
if v := md[headerAbortPercentage]; v != nil {
if num, ok := parseIntFromMD(v); ok && num < numerator {
numerator = num
}
}
}
if !okCode || randIntn(denominator) >= numerator {
return nil
}
if code == codes.OK {
return errOKStream
}
return status.Errorf(code, "RPC terminated due to fault injection")
}
var errOKStream = errors.New("stream terminated early with OK status")
// parseIntFromMD returns the integer in the last header or nil if parsing
// failed.
func parseIntFromMD(header []string) (int, bool) {
if len(header) == 0 {
return 0, false
}
v, err := strconv.Atoi(header[len(header)-1])
return v, err == nil
}
func splitPct(fp *tpb.FractionalPercent) (num int, den int) {
if fp == nil {
return 0, 100
}
num = int(fp.GetNumerator())
switch fp.GetDenominator() {
case tpb.FractionalPercent_HUNDRED:
return num, 100
case tpb.FractionalPercent_TEN_THOUSAND:
return num, 10 * 1000
case tpb.FractionalPercent_MILLION:
return num, 1000 * 1000
}
return num, 100
}
func grpcFromHTTP(httpStatus int) (codes.Code, bool) {
if httpStatus < 200 || httpStatus >= 600 {
// Malformed; ignore this fault type.
return codes.OK, false
}
if c := statusMap[httpStatus]; c != codes.OK {
// OK = 0/the default for the map.
return c, true
}
// All undefined HTTP status codes convert to Unknown. HTTP status of 200
// is "success", but gRPC converts to Unknown due to missing grpc status.
return codes.Unknown, true
}
func sanitizeGRPCCode(c codes.Code) codes.Code {
if c > codes.Code(16) {
return codes.Unknown
}
return c
}
type okStream struct {
ctx context.Context
}
func (*okStream) Header() (metadata.MD, error) { return nil, nil }
func (*okStream) Trailer() metadata.MD { return nil }
func (*okStream) CloseSend() error { return nil }
func (o *okStream) Context() context.Context { return o.ctx }
func (*okStream) SendMsg(m interface{}) error { return io.EOF }
func (*okStream) RecvMsg(m interface{}) error { return io.EOF }

View File

@ -0,0 +1,644 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package xds_test contains e2e tests for xDS use.
package fault
import (
"context"
"fmt"
"io"
"net"
"reflect"
"testing"
"time"
"github.com/golang/protobuf/ptypes"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/env"
"google.golang.org/grpc/xds/internal/httpfilter"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/e2e"
"google.golang.org/protobuf/types/known/wrapperspb"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
cpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/common/fault/v3"
fpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/fault/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
tpb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
testpb "google.golang.org/grpc/test/grpc_testing"
_ "google.golang.org/grpc/xds/internal/balancer" // Register the balancers.
_ "google.golang.org/grpc/xds/internal/client/v3" // Register the v3 xDS API client.
_ "google.golang.org/grpc/xds/internal/resolver" // Register the xds_resolver.
)
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
type testService struct {
testpb.TestServiceServer
}
func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
}
func (*testService) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
// End RPC after client does a CloseSend.
for {
if _, err := stream.Recv(); err == io.EOF {
return nil
} else if err != nil {
return err
}
}
}
// clientSetup performs a bunch of steps common to all xDS server tests here:
// - spin up an xDS management server on a local port
// - spin up a gRPC server and register the test service on it
// - create a local TCP listener and start serving on it
//
// Returns the following:
// - the management server: tests use this to configure resources
// - nodeID expected by the management server: this is set in the Node proto
// sent by the xdsClient for queries.
// - the port the server is listening on
// - cleanup function to be invoked by the tests when done
func clientSetup(t *testing.T) (*e2e.ManagementServer, string, uint32, func()) {
// Spin up a xDS management server on a local port.
nodeID := uuid.New().String()
fs, err := e2e.StartManagementServer()
if err != nil {
t.Fatal(err)
}
// Create a bootstrap file in a temporary directory.
bootstrapCleanup, err := e2e.SetupBootstrapFile(e2e.BootstrapOptions{
Version: e2e.TransportV3,
NodeID: nodeID,
ServerURI: fs.Address,
ServerListenerResourceNameTemplate: "grpc/server",
})
if err != nil {
t.Fatal(err)
}
// Initialize a gRPC server and register the stubServer on it.
server := grpc.NewServer()
testpb.RegisterTestServiceServer(server, &testService{})
// Create a local listener and pass it to Serve().
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
return fs, nodeID, uint32(lis.Addr().(*net.TCPAddr).Port), func() {
fs.Stop()
bootstrapCleanup()
server.Stop()
}
}
func init() {
env.FaultInjectionSupport = true
// Manually register to avoid a race between this init and the one that
// check the env var to register the fault injection filter.
httpfilter.Register(builder{})
}
func (s) TestFaultInjection_Unary(t *testing.T) {
type subcase struct {
name string
code codes.Code
repeat int
randIn []int // Intn calls per-repeat (not per-subcase)
delays []time.Duration // NewTimer calls per-repeat (not per-subcase)
md metadata.MD
}
testCases := []struct {
name string
cfgs []*fpb.HTTPFault
randOutInc int
want []subcase
}{{
name: "abort always",
cfgs: []*fpb.HTTPFault{{
Abort: &fpb.FaultAbort{
Percentage: &tpb.FractionalPercent{Numerator: 100, Denominator: tpb.FractionalPercent_HUNDRED},
ErrorType: &fpb.FaultAbort_GrpcStatus{GrpcStatus: uint32(codes.Aborted)},
},
}},
randOutInc: 5,
want: []subcase{{
code: codes.Aborted,
randIn: []int{100},
repeat: 25,
}},
}, {
name: "abort 10%",
cfgs: []*fpb.HTTPFault{{
Abort: &fpb.FaultAbort{
Percentage: &tpb.FractionalPercent{Numerator: 100000, Denominator: tpb.FractionalPercent_MILLION},
ErrorType: &fpb.FaultAbort_GrpcStatus{GrpcStatus: uint32(codes.Aborted)},
},
}},
randOutInc: 50000,
want: []subcase{{
name: "[0,10]%",
code: codes.Aborted,
randIn: []int{1000000},
repeat: 2,
}, {
name: "(10,100]%",
code: codes.OK,
randIn: []int{1000000},
repeat: 18,
}, {
name: "[0,10]% again",
code: codes.Aborted,
randIn: []int{1000000},
repeat: 2,
}},
}, {
name: "delay always",
cfgs: []*fpb.HTTPFault{{
Delay: &cpb.FaultDelay{
Percentage: &tpb.FractionalPercent{Numerator: 100, Denominator: tpb.FractionalPercent_HUNDRED},
FaultDelaySecifier: &cpb.FaultDelay_FixedDelay{FixedDelay: ptypes.DurationProto(time.Second)},
},
}},
randOutInc: 5,
want: []subcase{{
randIn: []int{100},
repeat: 25,
delays: []time.Duration{time.Second},
}},
}, {
name: "delay 10%",
cfgs: []*fpb.HTTPFault{{
Delay: &cpb.FaultDelay{
Percentage: &tpb.FractionalPercent{Numerator: 1000, Denominator: tpb.FractionalPercent_TEN_THOUSAND},
FaultDelaySecifier: &cpb.FaultDelay_FixedDelay{FixedDelay: ptypes.DurationProto(time.Second)},
},
}},
randOutInc: 500,
want: []subcase{{
name: "[0,10]%",
randIn: []int{10000},
repeat: 2,
delays: []time.Duration{time.Second},
}, {
name: "(10,100]%",
randIn: []int{10000},
repeat: 18,
}, {
name: "[0,10]% again",
randIn: []int{10000},
repeat: 2,
delays: []time.Duration{time.Second},
}},
}, {
name: "delay 80%, abort 50%",
cfgs: []*fpb.HTTPFault{{
Delay: &cpb.FaultDelay{
Percentage: &tpb.FractionalPercent{Numerator: 80, Denominator: tpb.FractionalPercent_HUNDRED},
FaultDelaySecifier: &cpb.FaultDelay_FixedDelay{FixedDelay: ptypes.DurationProto(3 * time.Second)},
},
Abort: &fpb.FaultAbort{
Percentage: &tpb.FractionalPercent{Numerator: 50, Denominator: tpb.FractionalPercent_HUNDRED},
ErrorType: &fpb.FaultAbort_GrpcStatus{GrpcStatus: uint32(codes.Unimplemented)},
},
}},
randOutInc: 5,
want: []subcase{{
name: "50% delay and abort",
code: codes.Unimplemented,
randIn: []int{100, 100},
repeat: 10,
delays: []time.Duration{3 * time.Second},
}, {
name: "30% delay, no abort",
randIn: []int{100, 100},
repeat: 6,
delays: []time.Duration{3 * time.Second},
}, {
name: "20% success",
randIn: []int{100, 100},
repeat: 4,
}, {
name: "50% delay and abort again",
code: codes.Unimplemented,
randIn: []int{100, 100},
repeat: 10,
delays: []time.Duration{3 * time.Second},
}},
}, {
name: "header abort",
cfgs: []*fpb.HTTPFault{{
Abort: &fpb.FaultAbort{
Percentage: &tpb.FractionalPercent{Numerator: 80, Denominator: tpb.FractionalPercent_HUNDRED},
ErrorType: &fpb.FaultAbort_HeaderAbort_{},
},
}},
randOutInc: 10,
want: []subcase{{
name: "30% abort; [0,30]%",
md: metadata.MD{
headerAbortGRPCStatus: []string{fmt.Sprintf("%d", codes.DataLoss)},
headerAbortPercentage: []string{"30"},
},
code: codes.DataLoss,
randIn: []int{100},
repeat: 3,
}, {
name: "30% abort; (30,60]%",
md: metadata.MD{
headerAbortGRPCStatus: []string{fmt.Sprintf("%d", codes.DataLoss)},
headerAbortPercentage: []string{"30"},
},
randIn: []int{100},
repeat: 3,
}, {
name: "80% abort; (60,80]%",
md: metadata.MD{
headerAbortGRPCStatus: []string{fmt.Sprintf("%d", codes.DataLoss)},
headerAbortPercentage: []string{"80"},
},
code: codes.DataLoss,
randIn: []int{100},
repeat: 2,
}, {
name: "cannot exceed percentage in filter",
md: metadata.MD{
headerAbortGRPCStatus: []string{fmt.Sprintf("%d", codes.DataLoss)},
headerAbortPercentage: []string{"100"},
},
randIn: []int{100},
repeat: 2,
}, {
name: "HTTP Status 404",
md: metadata.MD{
headerAbortHTTPStatus: []string{"404"},
headerAbortPercentage: []string{"100"},
},
code: codes.Unimplemented,
randIn: []int{100},
repeat: 1,
}, {
name: "HTTP Status 429",
md: metadata.MD{
headerAbortHTTPStatus: []string{"429"},
headerAbortPercentage: []string{"100"},
},
code: codes.Unavailable,
randIn: []int{100},
repeat: 1,
}, {
name: "HTTP Status 200",
md: metadata.MD{
headerAbortHTTPStatus: []string{"200"},
headerAbortPercentage: []string{"100"},
},
// No GRPC status, but HTTP Status of 200 translates to Unknown,
// per spec in statuscodes.md.
code: codes.Unknown,
randIn: []int{100},
repeat: 1,
}, {
name: "gRPC Status OK",
md: metadata.MD{
headerAbortGRPCStatus: []string{fmt.Sprintf("%d", codes.OK)},
headerAbortPercentage: []string{"100"},
},
// This should be Unimplemented (mismatched request/response
// count), per spec in statuscodes.md, but grpc-go currently
// returns io.EOF which status.Code() converts to Unknown
code: codes.Unknown,
randIn: []int{100},
repeat: 1,
}, {
name: "invalid header results in no abort",
md: metadata.MD{
headerAbortGRPCStatus: []string{"error"},
headerAbortPercentage: []string{"100"},
},
repeat: 1,
}, {
name: "invalid header results in default percentage",
md: metadata.MD{
headerAbortGRPCStatus: []string{fmt.Sprintf("%d", codes.DataLoss)},
headerAbortPercentage: []string{"error"},
},
code: codes.DataLoss,
randIn: []int{100},
repeat: 1,
}},
}, {
name: "header delay",
cfgs: []*fpb.HTTPFault{{
Delay: &cpb.FaultDelay{
Percentage: &tpb.FractionalPercent{Numerator: 80, Denominator: tpb.FractionalPercent_HUNDRED},
FaultDelaySecifier: &cpb.FaultDelay_HeaderDelay_{},
},
}},
randOutInc: 10,
want: []subcase{{
name: "30% delay; [0,30]%",
md: metadata.MD{
headerDelayDuration: []string{"2"},
headerDelayPercentage: []string{"30"},
},
randIn: []int{100},
delays: []time.Duration{2 * time.Millisecond},
repeat: 3,
}, {
name: "30% delay; (30, 60]%",
md: metadata.MD{
headerDelayDuration: []string{"2"},
headerDelayPercentage: []string{"30"},
},
randIn: []int{100},
repeat: 3,
}, {
name: "invalid header results in no delay",
md: metadata.MD{
headerDelayDuration: []string{"error"},
headerDelayPercentage: []string{"80"},
},
repeat: 1,
}, {
name: "invalid header results in default percentage",
md: metadata.MD{
headerDelayDuration: []string{"2"},
headerDelayPercentage: []string{"error"},
},
randIn: []int{100},
delays: []time.Duration{2 * time.Millisecond},
repeat: 1,
}, {
name: "invalid header results in default percentage",
md: metadata.MD{
headerDelayDuration: []string{"2"},
headerDelayPercentage: []string{"error"},
},
randIn: []int{100},
repeat: 1,
}, {
name: "cannot exceed percentage in filter",
md: metadata.MD{
headerDelayDuration: []string{"2"},
headerDelayPercentage: []string{"100"},
},
randIn: []int{100},
repeat: 1,
}},
}, {
name: "abort then delay filters",
cfgs: []*fpb.HTTPFault{{
Abort: &fpb.FaultAbort{
Percentage: &tpb.FractionalPercent{Numerator: 50, Denominator: tpb.FractionalPercent_HUNDRED},
ErrorType: &fpb.FaultAbort_GrpcStatus{GrpcStatus: uint32(codes.Unimplemented)},
},
}, {
Delay: &cpb.FaultDelay{
Percentage: &tpb.FractionalPercent{Numerator: 80, Denominator: tpb.FractionalPercent_HUNDRED},
FaultDelaySecifier: &cpb.FaultDelay_FixedDelay{FixedDelay: ptypes.DurationProto(time.Second)},
},
}},
randOutInc: 10,
want: []subcase{{
name: "50% delay and abort (abort skips delay)",
code: codes.Unimplemented,
randIn: []int{100},
repeat: 5,
}, {
name: "30% delay, no abort",
randIn: []int{100, 100},
repeat: 3,
delays: []time.Duration{time.Second},
}, {
name: "20% success",
randIn: []int{100, 100},
repeat: 2,
}},
}}
fs, nodeID, port, cleanup := clientSetup(t)
defer cleanup()
resources := e2e.DefaultClientResources("myservice", nodeID, "localhost", port)
hcm := new(v3httppb.HttpConnectionManager)
err := ptypes.UnmarshalAny(resources.Listeners[0].GetApiListener().GetApiListener(), hcm)
if err != nil {
t.Fatal(err)
}
routerFilter := hcm.HttpFilters[len(hcm.HttpFilters)-1]
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer func() { randIntn = grpcrand.Intn; newTimer = time.NewTimer }()
var intnCalls []int
var newTimerCalls []time.Duration
randOut := 0
randIntn = func(n int) int {
intnCalls = append(intnCalls, n)
return randOut % n
}
newTimer = func(d time.Duration) *time.Timer {
newTimerCalls = append(newTimerCalls, d)
return time.NewTimer(0)
}
hcm.HttpFilters = nil
for i, cfg := range tc.cfgs {
hcm.HttpFilters = append(hcm.HttpFilters, e2e.HTTPFilter(fmt.Sprintf("fault%d", i), cfg))
}
hcm.HttpFilters = append(hcm.HttpFilters, routerFilter)
hcmAny, err := ptypes.MarshalAny(hcm)
if err != nil {
t.Fatal(err)
}
resources.Listeners[0].ApiListener.ApiListener = hcmAny
resources.Listeners[0].FilterChains[0].Filters[0].ConfigType = &v3listenerpb.Filter_TypedConfig{TypedConfig: hcmAny}
if err := fs.Update(resources); err != nil {
t.Fatal(err)
}
// Create a ClientConn and run the test case.
cc, err := grpc.Dial("xds:///myservice", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()
client := testpb.NewTestServiceClient(cc)
count := 0
for _, want := range tc.want {
t.Run(want.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if want.repeat == 0 {
t.Fatalf("invalid repeat count")
}
for n := 0; n < want.repeat; n++ {
intnCalls = nil
newTimerCalls = nil
ctx = metadata.NewOutgoingContext(ctx, want.md)
_, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
t.Logf("RPC %d: err: %v, intnCalls: %v, newTimerCalls: %v", count, err, intnCalls, newTimerCalls)
if status.Code(err) != want.code || !reflect.DeepEqual(intnCalls, want.randIn) || !reflect.DeepEqual(newTimerCalls, want.delays) {
t.Errorf("WANTED code: %v, intnCalls: %v, newTimerCalls: %v", want.code, want.randIn, want.delays)
}
randOut += tc.randOutInc
count++
}
})
}
})
}
}
func (s) TestFaultInjection_MaxActiveFaults(t *testing.T) {
fs, nodeID, port, cleanup := clientSetup(t)
defer cleanup()
resources := e2e.DefaultClientResources("myservice", nodeID, "localhost", port)
hcm := new(v3httppb.HttpConnectionManager)
err := ptypes.UnmarshalAny(resources.Listeners[0].GetApiListener().GetApiListener(), hcm)
if err != nil {
t.Fatal(err)
}
defer func() { newTimer = time.NewTimer }()
timers := make(chan *time.Timer, 2)
newTimer = func(d time.Duration) *time.Timer {
t := time.NewTimer(24 * time.Hour) // Will reset to fire.
timers <- t
return t
}
hcm.HttpFilters = append([]*v3httppb.HttpFilter{
e2e.HTTPFilter("fault", &fpb.HTTPFault{
MaxActiveFaults: wrapperspb.UInt32(2),
Delay: &cpb.FaultDelay{
Percentage: &tpb.FractionalPercent{Numerator: 100, Denominator: tpb.FractionalPercent_HUNDRED},
FaultDelaySecifier: &cpb.FaultDelay_FixedDelay{FixedDelay: ptypes.DurationProto(time.Second)},
},
})},
hcm.HttpFilters...)
hcmAny, err := ptypes.MarshalAny(hcm)
if err != nil {
t.Fatal(err)
}
resources.Listeners[0].ApiListener.ApiListener = hcmAny
resources.Listeners[0].FilterChains[0].Filters[0].ConfigType = &v3listenerpb.Filter_TypedConfig{TypedConfig: hcmAny}
if err := fs.Update(resources); err != nil {
t.Fatal(err)
}
// Create a ClientConn
cc, err := grpc.Dial("xds:///myservice", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()
client := testpb.NewTestServiceClient(cc)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
streams := make(chan testpb.TestService_FullDuplexCallClient)
startStream := func() {
str, err := client.FullDuplexCall(ctx)
if err != nil {
t.Error("RPC error:", err)
}
streams <- str
}
endStream := func() {
str := <-streams
str.CloseSend()
if _, err := str.Recv(); err != io.EOF {
t.Fatal("stream error:", err)
}
}
releaseStream := func() {
timer := <-timers
timer.Reset(0)
}
// Start three streams; two should delay.
go startStream()
go startStream()
go startStream()
// End one of the streams. Ensure the others are blocked on creation.
endStream()
select {
case <-streams:
t.Errorf("unexpected second stream created before delay expires")
case <-time.After(50 * time.Millisecond):
// Wait a short time to ensure no other streams were started yet.
}
// Start one more; it should not be blocked.
go startStream()
endStream()
// Expire one stream's delay; it should be created.
releaseStream()
endStream()
// Another new stream should delay.
go startStream()
select {
case <-streams:
t.Errorf("unexpected second stream created before delay expires")
case <-time.After(50 * time.Millisecond):
// Wait a short time to ensure no other streams were started yet.
}
// Expire both pending timers and end the two streams.
releaseStream()
releaseStream()
endStream()
endStream()
}

View File

@ -32,5 +32,6 @@ import (
_ "google.golang.org/grpc/xds/internal/balancer" // Register the balancers.
_ "google.golang.org/grpc/xds/internal/client/v2" // Register the v2 xDS API client.
_ "google.golang.org/grpc/xds/internal/client/v3" // Register the v3 xDS API client.
_ "google.golang.org/grpc/xds/internal/httpfilter/fault" // Register the fault injection filter.
_ "google.golang.org/grpc/xds/internal/resolver" // Register the xds_resolver.
)