mirror of https://github.com/grpc/grpc-go.git
				
				
				
			
		
			
				
	
	
		
			333 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			333 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Go
		
	
	
	
| /*
 | |
|  *
 | |
|  * Copyright 2023 gRPC authors.
 | |
|  *
 | |
|  * Licensed under the Apache License, Version 2.0 (the "License");
 | |
|  * you may not use this file except in compliance with the License.
 | |
|  * You may obtain a copy of the License at
 | |
|  *
 | |
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | |
|  *
 | |
|  * Unless required by applicable law or agreed to in writing, software
 | |
|  * distributed under the License is distributed on an "AS IS" BASIS,
 | |
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|  * See the License for the specific language governing permissions and
 | |
|  * limitations under the License.
 | |
|  *
 | |
|  */
 | |
| 
 | |
| package grpc_test
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"io"
 | |
| 	"runtime"
 | |
| 	"sync"
 | |
| 	"testing"
 | |
| 	"time"
 | |
| 
 | |
| 	"google.golang.org/grpc"
 | |
| 	"google.golang.org/grpc/codes"
 | |
| 	"google.golang.org/grpc/credentials/insecure"
 | |
| 	"google.golang.org/grpc/internal/grpcsync"
 | |
| 	"google.golang.org/grpc/internal/stubserver"
 | |
| 	"google.golang.org/grpc/status"
 | |
| 
 | |
| 	testgrpc "google.golang.org/grpc/interop/grpc_testing"
 | |
| 	testpb "google.golang.org/grpc/interop/grpc_testing"
 | |
| )
 | |
| 
 | |
| // TestServer_MaxHandlers ensures that no more than MaxConcurrentStreams server
 | |
| // handlers are active at one time.
 | |
| func (s) TestServer_MaxHandlers(t *testing.T) {
 | |
| 	started := make(chan struct{})
 | |
| 	blockCalls := grpcsync.NewEvent()
 | |
| 
 | |
| 	// This stub server does not properly respect the stream context, so it will
 | |
| 	// not exit when the context is canceled.
 | |
| 	ss := stubserver.StubServer{
 | |
| 		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
 | |
| 			started <- struct{}{}
 | |
| 			<-blockCalls.Done()
 | |
| 			return nil
 | |
| 		},
 | |
| 	}
 | |
| 	if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)}); err != nil {
 | |
| 		t.Fatal("Error starting server:", err)
 | |
| 	}
 | |
| 	defer ss.Stop()
 | |
| 
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel()
 | |
| 
 | |
| 	// Start one RPC to the server.
 | |
| 	ctx1, cancel1 := context.WithCancel(ctx)
 | |
| 	_, err := ss.Client.FullDuplexCall(ctx1)
 | |
| 	if err != nil {
 | |
| 		t.Fatal("Error staring call:", err)
 | |
| 	}
 | |
| 
 | |
| 	// Wait for the handler to be invoked.
 | |
| 	select {
 | |
| 	case <-started:
 | |
| 	case <-ctx.Done():
 | |
| 		t.Fatalf("Timed out waiting for RPC to start on server.")
 | |
| 	}
 | |
| 
 | |
| 	// Cancel it on the client.  The server handler will still be running.
 | |
| 	cancel1()
 | |
| 
 | |
| 	ctx2, cancel2 := context.WithCancel(ctx)
 | |
| 	defer cancel2()
 | |
| 	s, err := ss.Client.FullDuplexCall(ctx2)
 | |
| 	if err != nil {
 | |
| 		t.Fatal("Error staring call:", err)
 | |
| 	}
 | |
| 
 | |
| 	// After 100ms, allow the first call to unblock.  That should allow the
 | |
| 	// second RPC to run and finish.
 | |
| 	select {
 | |
| 	case <-started:
 | |
| 		blockCalls.Fire()
 | |
| 		t.Fatalf("RPC started unexpectedly.")
 | |
| 	case <-time.After(100 * time.Millisecond):
 | |
| 		blockCalls.Fire()
 | |
| 	}
 | |
| 
 | |
| 	select {
 | |
| 	case <-started:
 | |
| 	case <-ctx.Done():
 | |
| 		t.Fatalf("Timed out waiting for second RPC to start on server.")
 | |
| 	}
 | |
| 	if _, err := s.Recv(); err != io.EOF {
 | |
| 		t.Fatal("Received unexpected RPC error:", err)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Tests the case where the stream worker goroutine option is enabled, and a
 | |
| // number of RPCs are initiated around the same time that Stop() is called. This
 | |
| // used to result in a write to a closed channel. This test verifies that there
 | |
| // is no panic.
 | |
| func (s) TestStreamWorkers_RPCsAndStop(t *testing.T) {
 | |
| 	ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
 | |
| 	// This deferred stop takes care of stopping the server when one of the
 | |
| 	// below grpc.Dials fail, and the test exits early.
 | |
| 	defer ss.Stop()
 | |
| 
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel()
 | |
| 	const numChannels = 20
 | |
| 	const numRPCLoops = 20
 | |
| 
 | |
| 	// Create a bunch of clientconns and ensure that they are READY by making an
 | |
| 	// RPC on them.
 | |
| 	ccs := make([]*grpc.ClientConn, numChannels)
 | |
| 	for i := 0; i < numChannels; i++ {
 | |
| 		var err error
 | |
| 		ccs[i], err = grpc.Dial(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
 | |
| 		if err != nil {
 | |
| 			t.Fatalf("[iteration: %d] grpc.Dial(%s) failed: %v", i, ss.Address, err)
 | |
| 		}
 | |
| 		defer ccs[i].Close()
 | |
| 		client := testgrpc.NewTestServiceClient(ccs[i])
 | |
| 		if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
 | |
| 			t.Fatalf("EmptyCall() failed: %v", err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Make a bunch of concurrent RPCs on the above clientconns. These will
 | |
| 	// eventually race with Stop(), and will start to fail.
 | |
| 	var wg sync.WaitGroup
 | |
| 	for i := 0; i < numChannels; i++ {
 | |
| 		client := testgrpc.NewTestServiceClient(ccs[i])
 | |
| 		for j := 0; j < numRPCLoops; j++ {
 | |
| 			wg.Add(1)
 | |
| 			go func(client testgrpc.TestServiceClient) {
 | |
| 				defer wg.Done()
 | |
| 				for {
 | |
| 					_, err := client.EmptyCall(ctx, &testpb.Empty{})
 | |
| 					if err == nil {
 | |
| 						continue
 | |
| 					}
 | |
| 					if code := status.Code(err); code == codes.Unavailable {
 | |
| 						// Once Stop() has been called on the server, we expect
 | |
| 						// subsequent calls to fail with Unavailable.
 | |
| 						return
 | |
| 					}
 | |
| 					t.Errorf("EmptyCall() failed: %v", err)
 | |
| 					return
 | |
| 				}
 | |
| 			}(client)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Call Stop() concurrently with the above RPC attempts.
 | |
| 	ss.Stop()
 | |
| 	wg.Wait()
 | |
| }
 | |
| 
 | |
| // Tests the case where the stream worker goroutine option is enabled, and both
 | |
| // Stop() and GracefulStop() care called. This used to result in a close of a
 | |
| // closed channel. This test verifies that there is no panic.
 | |
| func (s) TestStreamWorkers_GracefulStopAndStop(t *testing.T) {
 | |
| 	ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
 | |
| 	defer ss.Stop()
 | |
| 
 | |
| 	if err := ss.StartClient(grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil {
 | |
| 		t.Fatalf("Failed to create client to stub server: %v", err)
 | |
| 	}
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel()
 | |
| 	client := testgrpc.NewTestServiceClient(ss.CC)
 | |
| 	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
 | |
| 		t.Fatalf("EmptyCall() failed: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	ss.S.GracefulStop()
 | |
| }
 | |
| 
 | |
| // Tests the WaitForHandlers ServerOption by leaving an RPC running while Stop
 | |
| // is called, and ensures Stop doesn't return until the handler returns.
 | |
| func (s) TestServer_WaitForHandlers(t *testing.T) {
 | |
| 	started := grpcsync.NewEvent()
 | |
| 	blockCalls := grpcsync.NewEvent()
 | |
| 
 | |
| 	// This stub server does not properly respect the stream context, so it will
 | |
| 	// not exit when the context is canceled.
 | |
| 	ss := stubserver.StubServer{
 | |
| 		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
 | |
| 			started.Fire()
 | |
| 			<-blockCalls.Done()
 | |
| 			return nil
 | |
| 		},
 | |
| 	}
 | |
| 	if err := ss.Start([]grpc.ServerOption{grpc.WaitForHandlers(true)}); err != nil {
 | |
| 		t.Fatal("Error starting server:", err)
 | |
| 	}
 | |
| 	defer ss.Stop()
 | |
| 
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel()
 | |
| 
 | |
| 	// Start one RPC to the server.
 | |
| 	ctx1, cancel1 := context.WithCancel(ctx)
 | |
| 	_, err := ss.Client.FullDuplexCall(ctx1)
 | |
| 	if err != nil {
 | |
| 		t.Fatal("Error staring call:", err)
 | |
| 	}
 | |
| 
 | |
| 	// Wait for the handler to be invoked.
 | |
| 	select {
 | |
| 	case <-started.Done():
 | |
| 	case <-ctx.Done():
 | |
| 		t.Fatalf("Timed out waiting for RPC to start on server.")
 | |
| 	}
 | |
| 
 | |
| 	// Cancel it on the client.  The server handler will still be running.
 | |
| 	cancel1()
 | |
| 
 | |
| 	// Close the connection.  This might be sufficient to allow the server to
 | |
| 	// return if it doesn't properly wait for outstanding method handlers to
 | |
| 	// return.
 | |
| 	ss.CC.Close()
 | |
| 
 | |
| 	// Try to Stop() the server, which should block indefinitely (until
 | |
| 	// blockCalls is fired).
 | |
| 	stopped := grpcsync.NewEvent()
 | |
| 	go func() {
 | |
| 		ss.S.Stop()
 | |
| 		stopped.Fire()
 | |
| 	}()
 | |
| 
 | |
| 	// Wait 100ms and ensure stopped does not fire.
 | |
| 	select {
 | |
| 	case <-stopped.Done():
 | |
| 		trace := make([]byte, 4096)
 | |
| 		trace = trace[0:runtime.Stack(trace, true)]
 | |
| 		blockCalls.Fire()
 | |
| 		t.Fatalf("Server returned from Stop() illegally.  Stack trace:\n%v", string(trace))
 | |
| 	case <-time.After(100 * time.Millisecond):
 | |
| 		// Success; unblock the call and wait for stopped.
 | |
| 		blockCalls.Fire()
 | |
| 	}
 | |
| 
 | |
| 	select {
 | |
| 	case <-stopped.Done():
 | |
| 	case <-ctx.Done():
 | |
| 		t.Fatalf("Timed out waiting for second RPC to start on server.")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Tests that GracefulStop will wait for all method handlers to return by
 | |
| // blocking a handler and ensuring GracefulStop doesn't return until after it is
 | |
| // unblocked.
 | |
| func (s) TestServer_GracefulStopWaits(t *testing.T) {
 | |
| 	started := grpcsync.NewEvent()
 | |
| 	blockCalls := grpcsync.NewEvent()
 | |
| 
 | |
| 	// This stub server does not properly respect the stream context, so it will
 | |
| 	// not exit when the context is canceled.
 | |
| 	ss := stubserver.StubServer{
 | |
| 		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
 | |
| 			started.Fire()
 | |
| 			<-blockCalls.Done()
 | |
| 			return nil
 | |
| 		},
 | |
| 	}
 | |
| 	if err := ss.Start(nil); err != nil {
 | |
| 		t.Fatal("Error starting server:", err)
 | |
| 	}
 | |
| 	defer ss.Stop()
 | |
| 
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel()
 | |
| 
 | |
| 	// Start one RPC to the server.
 | |
| 	ctx1, cancel1 := context.WithCancel(ctx)
 | |
| 	_, err := ss.Client.FullDuplexCall(ctx1)
 | |
| 	if err != nil {
 | |
| 		t.Fatal("Error staring call:", err)
 | |
| 	}
 | |
| 
 | |
| 	// Wait for the handler to be invoked.
 | |
| 	select {
 | |
| 	case <-started.Done():
 | |
| 	case <-ctx.Done():
 | |
| 		t.Fatalf("Timed out waiting for RPC to start on server.")
 | |
| 	}
 | |
| 
 | |
| 	// Cancel it on the client.  The server handler will still be running.
 | |
| 	cancel1()
 | |
| 
 | |
| 	// Close the connection.  This might be sufficient to allow the server to
 | |
| 	// return if it doesn't properly wait for outstanding method handlers to
 | |
| 	// return.
 | |
| 	ss.CC.Close()
 | |
| 
 | |
| 	// Try to Stop() the server, which should block indefinitely (until
 | |
| 	// blockCalls is fired).
 | |
| 	stopped := grpcsync.NewEvent()
 | |
| 	go func() {
 | |
| 		ss.S.GracefulStop()
 | |
| 		stopped.Fire()
 | |
| 	}()
 | |
| 
 | |
| 	// Wait 100ms and ensure stopped does not fire.
 | |
| 	select {
 | |
| 	case <-stopped.Done():
 | |
| 		trace := make([]byte, 4096)
 | |
| 		trace = trace[0:runtime.Stack(trace, true)]
 | |
| 		blockCalls.Fire()
 | |
| 		t.Fatalf("Server returned from Stop() illegally.  Stack trace:\n%v", string(trace))
 | |
| 	case <-time.After(100 * time.Millisecond):
 | |
| 		// Success; unblock the call and wait for stopped.
 | |
| 		blockCalls.Fire()
 | |
| 	}
 | |
| 
 | |
| 	select {
 | |
| 	case <-stopped.Done():
 | |
| 	case <-ctx.Done():
 | |
| 		t.Fatalf("Timed out waiting for second RPC to start on server.")
 | |
| 	}
 | |
| }
 |