diff --git a/clientconn.go b/clientconn.go index 1e4e74d6c..c461bc9fd 100644 --- a/clientconn.go +++ b/clientconn.go @@ -255,22 +255,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * cc.dopts.bs = backoff.DefaultExponential } - if cc.dopts.scChan != nil { - // Blocking wait for the initial service config. - select { - case sc, ok := <-cc.dopts.scChan: - if ok { - cc.sc = &sc - cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc}) - } - case <-ctx.Done(): - return nil, ctx.Err() - } - } - if cc.dopts.scChan != nil { - go cc.scWatcher() - } - // This creates the name resolver, load balancer, blocking picker etc. if err := cc.exitIdleMode(); err != nil { return nil, err @@ -746,25 +730,6 @@ func (cc *ClientConn) Connect() { cc.balancerWrapper.exitIdleMode() } -func (cc *ClientConn) scWatcher() { - for { - select { - case sc, ok := <-cc.dopts.scChan: - if !ok { - return - } - cc.mu.Lock() - // TODO: load balance policy runtime change is ignored. - // We may revisit this decision in the future. - cc.sc = &sc - cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc}) - cc.mu.Unlock() - case <-cc.ctx.Done(): - return - } - } -} - // waitForResolvedAddrs blocks until the resolver has provided addresses or the // context expires. Returns nil unless the context expires first; otherwise // returns a status error based on the context. diff --git a/dialoptions.go b/dialoptions.go index 4d47e2cb8..79cd569a2 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -64,7 +64,6 @@ type dialOptions struct { block bool returnLastError bool timeout time.Duration - scChan <-chan ServiceConfig authority string binaryLogger binarylog.Logger copts transport.ConnectOptions @@ -251,19 +250,6 @@ func WithDecompressor(dc Decompressor) DialOption { }) } -// WithServiceConfig returns a DialOption which has a channel to read the -// service configuration. -// -// Deprecated: service config should be received through name resolver or via -// WithDefaultServiceConfig, as specified at -// https://github.com/grpc/grpc/blob/master/doc/service_config.md. Will be -// removed in a future 1.x release. -func WithServiceConfig(c <-chan ServiceConfig) DialOption { - return newFuncDialOption(func(o *dialOptions) { - o.scChan = c - }) -} - // WithConnectParams configures the ClientConn to use the provided ConnectParams // for creating and maintaining connections to servers. // diff --git a/test/end2end_test.go b/test/end2end_test.go index 184940ec2..97a7f1812 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -505,7 +505,6 @@ type test struct { clientNopCompression bool unaryClientInt grpc.UnaryClientInterceptor streamClientInt grpc.StreamClientInterceptor - sc <-chan grpc.ServiceConfig clientInitialWindowSize int32 clientInitialConnWindowSize int32 perRPCCreds credentials.PerRPCCredentials @@ -760,10 +759,6 @@ func (d *nopDecompressor) Type() string { func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string) { opts = append(opts, grpc.WithDialer(te.e.dialer), grpc.WithUserAgent(te.userAgent)) - if te.sc != nil { - opts = append(opts, grpc.WithServiceConfig(te.sc)) - } - if te.clientCompression { opts = append(opts, grpc.WithCompressor(grpc.NewGZIPCompressor()), @@ -1103,20 +1098,10 @@ func testServiceConfigSetup(t *testing.T, e env) *test { return te } -func newBool(b bool) (a *bool) { - return &b -} - func newInt(b int) (a *int) { return &b } -func newDuration(b time.Duration) (a *time.Duration) { - a = new(time.Duration) - *a = b - return -} - func (s) TestGetMethodConfig(t *testing.T) { te := testServiceConfigSetup(t, tcpClearRREnv) defer te.tearDown() diff --git a/test/service_config_deprecated_test.go b/test/service_config_deprecated_test.go deleted file mode 100644 index a1fd44d85..000000000 --- a/test/service_config_deprecated_test.go +++ /dev/null @@ -1,463 +0,0 @@ -/* - * - * Copyright 2023 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package test - -import ( - "context" - "testing" - "time" - - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - - testgrpc "google.golang.org/grpc/interop/grpc_testing" - testpb "google.golang.org/grpc/interop/grpc_testing" -) - -// The following functions with function name ending with TD indicates that they -// should be deleted after old service config API is deprecated and deleted. -func testServiceConfigSetupTD(t *testing.T, e env) (*test, chan grpc.ServiceConfig) { - te := newTest(t, e) - // We write before read. - ch := make(chan grpc.ServiceConfig, 1) - te.sc = ch - te.userAgent = testAppUA - te.declareLogNoise( - "transport: http2Client.notifyError got notified that the client transport was broken EOF", - "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", - "grpc: addrConn.resetTransport failed to create client transport: connection error", - "Failed to dial : context canceled; please retry.", - ) - return te, ch -} - -func (s) TestServiceConfigGetMethodConfigTD(t *testing.T) { - for _, e := range listTestEnv() { - testGetMethodConfigTD(t, e) - } -} - -func testGetMethodConfigTD(t *testing.T, e env) { - te, ch := testServiceConfigSetupTD(t, e) - defer te.tearDown() - - mc1 := grpc.MethodConfig{ - WaitForReady: newBool(true), - Timeout: newDuration(time.Millisecond), - } - mc2 := grpc.MethodConfig{WaitForReady: newBool(false)} - m := make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/EmptyCall"] = mc1 - m["/grpc.testing.TestService/"] = mc2 - sc := grpc.ServiceConfig{ - Methods: m, - } - ch <- sc - - cc := te.clientConn() - tc := testgrpc.NewTestServiceClient(cc) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) - } - - m = make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/UnaryCall"] = mc1 - m["/grpc.testing.TestService/"] = mc2 - sc = grpc.ServiceConfig{ - Methods: m, - } - ch <- sc - // Wait for the new service config to propagate. - for { - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { - break - } - } - // The following RPCs are expected to become fail-fast. - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable { - t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) - } -} - -func (s) TestServiceConfigWaitForReadyTD(t *testing.T) { - for _, e := range listTestEnv() { - testServiceConfigWaitForReadyTD(t, e) - } -} - -func testServiceConfigWaitForReadyTD(t *testing.T, e env) { - te, ch := testServiceConfigSetupTD(t, e) - defer te.tearDown() - - // Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds. - mc := grpc.MethodConfig{ - WaitForReady: newBool(false), - Timeout: newDuration(time.Millisecond), - } - m := make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/EmptyCall"] = mc - m["/grpc.testing.TestService/FullDuplexCall"] = mc - sc := grpc.ServiceConfig{ - Methods: m, - } - ch <- sc - - cc := te.clientConn() - tc := testgrpc.NewTestServiceClient(cc) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) - } - if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) - } - - // Generate a service config update. - // Case2: Client API does not set failfast, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds. - mc.WaitForReady = newBool(true) - m = make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/EmptyCall"] = mc - m["/grpc.testing.TestService/FullDuplexCall"] = mc - sc = grpc.ServiceConfig{ - Methods: m, - } - ch <- sc - - // Wait for the new service config to take effect. - ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { - mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall") - if *mc.WaitForReady { - break - } - } - if ctx.Err() != nil { - t.Fatalf("Timeout when waiting for service config to take effect") - } - - // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) - } - if _, err := tc.FullDuplexCall(ctx); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) - } -} - -func (s) TestServiceConfigTimeoutTD(t *testing.T) { - for _, e := range listTestEnv() { - testServiceConfigTimeoutTD(t, e) - } -} - -func testServiceConfigTimeoutTD(t *testing.T, e env) { - te, ch := testServiceConfigSetupTD(t, e) - defer te.tearDown() - - // Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout - // to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will - // wait until deadline exceeds. - mc := grpc.MethodConfig{ - Timeout: newDuration(time.Hour), - } - m := make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/EmptyCall"] = mc - m["/grpc.testing.TestService/FullDuplexCall"] = mc - sc := grpc.ServiceConfig{ - Methods: m, - } - ch <- sc - - cc := te.clientConn() - tc := testgrpc.NewTestServiceClient(cc) - // The following RPCs are expected to become non-fail-fast ones with 1ns deadline. - ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) - } - cancel() - ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond) - if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) - } - cancel() - - // Generate a service config update. - // Case2: Client API sets timeout to be the default and ServiceConfig sets - // timeout to be 1ns. Timeout should be 1ns (min of 1ns and the default) - // and the rpc will wait until deadline exceeds. - mc.Timeout = newDuration(time.Nanosecond) - m = make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/EmptyCall"] = mc - m["/grpc.testing.TestService/FullDuplexCall"] = mc - sc = grpc.ServiceConfig{ - Methods: m, - } - ch <- sc - - // Wait for the new service config to take effect. - ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - for ; ctx.Err() == nil; <-time.After(time.Millisecond) { - mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall") - if *mc.Timeout == time.Nanosecond { - break - } - } - if ctx.Err() != nil { - t.Fatalf("Timeout when waiting for service config to take effect") - } - - ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded || ctx.Err() != nil { - t.Fatalf("TestService/EmptyCall(_, _) = _, %v and ctx.Err() = %v; want _, %s and ctx.Err() = nil", err, ctx.Err(), codes.DeadlineExceeded) - } - - if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded || ctx.Err() != nil { - t.Fatalf("TestService/FullDuplexCall(_) = _, %v and ctx.Err() = %v; want _, %s and ctx.Err() = nil", err, ctx.Err(), codes.DeadlineExceeded) - } -} - -func (s) TestServiceConfigMaxMsgSizeTD(t *testing.T) { - for _, e := range listTestEnv() { - testServiceConfigMaxMsgSizeTD(t, e) - } -} - -func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { - // Setting up values and objects shared across all test cases. - const smallSize = 1 - const largeSize = 1024 - const extraLargeSize = 2048 - - smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) - if err != nil { - t.Fatal(err) - } - largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) - if err != nil { - t.Fatal(err) - } - extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize) - if err != nil { - t.Fatal(err) - } - - mc := grpc.MethodConfig{ - MaxReqSize: newInt(extraLargeSize), - MaxRespSize: newInt(extraLargeSize), - } - - m := make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/UnaryCall"] = mc - m["/grpc.testing.TestService/FullDuplexCall"] = mc - sc := grpc.ServiceConfig{ - Methods: m, - } - // Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv). - te1, ch1 := testServiceConfigSetupTD(t, e) - te1.startServer(&testServer{security: e.security}) - defer te1.tearDown() - - ch1 <- sc - tc := testgrpc.NewTestServiceClient(te1.clientConn()) - - req := &testpb.SimpleRequest{ - ResponseType: testpb.PayloadType_COMPRESSABLE, - ResponseSize: int32(extraLargeSize), - Payload: smallPayload, - } - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - // Test for unary RPC recv. - if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) - } - - // Test for unary RPC send. - req.Payload = extraLargePayload - req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) - } - - // Test for streaming RPC recv. - respParam := []*testpb.ResponseParameters{ - { - Size: int32(extraLargeSize), - }, - } - sreq := &testpb.StreamingOutputCallRequest{ - ResponseType: testpb.PayloadType_COMPRESSABLE, - ResponseParameters: respParam, - Payload: smallPayload, - } - stream, err := tc.FullDuplexCall(te1.ctx) - if err != nil { - t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) - } - if err := stream.Send(sreq); err != nil { - t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) - } - if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) - } - - // Test for streaming RPC send. - respParam[0].Size = int32(smallSize) - sreq.Payload = extraLargePayload - stream, err = tc.FullDuplexCall(te1.ctx) - if err != nil { - t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) - } - if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) - } - - // Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv). - te2, ch2 := testServiceConfigSetupTD(t, e) - te2.maxClientReceiveMsgSize = newInt(1024) - te2.maxClientSendMsgSize = newInt(1024) - te2.startServer(&testServer{security: e.security}) - defer te2.tearDown() - ch2 <- sc - tc = testgrpc.NewTestServiceClient(te2.clientConn()) - - // Test for unary RPC recv. - req.Payload = smallPayload - req.ResponseSize = int32(largeSize) - - if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) - } - - // Test for unary RPC send. - req.Payload = largePayload - req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) - } - - // Test for streaming RPC recv. - stream, err = tc.FullDuplexCall(te2.ctx) - respParam[0].Size = int32(largeSize) - sreq.Payload = smallPayload - if err != nil { - t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) - } - if err := stream.Send(sreq); err != nil { - t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) - } - if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) - } - - // Test for streaming RPC send. - respParam[0].Size = int32(smallSize) - sreq.Payload = largePayload - stream, err = tc.FullDuplexCall(te2.ctx) - if err != nil { - t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) - } - if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) - } - - // Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv). - te3, ch3 := testServiceConfigSetupTD(t, e) - te3.maxClientReceiveMsgSize = newInt(4096) - te3.maxClientSendMsgSize = newInt(4096) - te3.startServer(&testServer{security: e.security}) - defer te3.tearDown() - ch3 <- sc - tc = testgrpc.NewTestServiceClient(te3.clientConn()) - - // Test for unary RPC recv. - req.Payload = smallPayload - req.ResponseSize = int32(largeSize) - - if _, err := tc.UnaryCall(ctx, req); err != nil { - t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want ", err) - } - - req.ResponseSize = int32(extraLargeSize) - if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) - } - - // Test for unary RPC send. - req.Payload = largePayload - req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(ctx, req); err != nil { - t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want ", err) - } - - req.Payload = extraLargePayload - if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) - } - - // Test for streaming RPC recv. - stream, err = tc.FullDuplexCall(te3.ctx) - if err != nil { - t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) - } - respParam[0].Size = int32(largeSize) - sreq.Payload = smallPayload - - if err := stream.Send(sreq); err != nil { - t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) - } - if _, err := stream.Recv(); err != nil { - t.Fatalf("%v.Recv() = _, %v, want ", stream, err) - } - - respParam[0].Size = int32(extraLargeSize) - - if err := stream.Send(sreq); err != nil { - t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) - } - if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) - } - - // Test for streaming RPC send. - respParam[0].Size = int32(smallSize) - sreq.Payload = largePayload - stream, err = tc.FullDuplexCall(te3.ctx) - if err != nil { - t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) - } - if err := stream.Send(sreq); err != nil { - t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) - } - sreq.Payload = extraLargePayload - if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) - } -}