grpc: factor out setup func (#7909)

This uses a pattern that is new to our tests. setup accepts a variadic
list of options, and uses a type switch to make use of those options
during setup. This allows us to pass setup only the options that are
relevant to any given test case, leaving the rest to sensible defaults.
This commit is contained in:
Jacob Hoffman-Andrews 2025-01-20 09:31:57 -08:00 committed by GitHub
parent 6b1e7f04e8
commit 600010305a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 53 additions and 153 deletions

View File

@ -103,7 +103,7 @@ func TestWaitForReadyTrue(t *testing.T) {
clk: clock.NewFake(),
waitForReady: true,
}
conn, err := grpc.Dial("localhost:19876", // random, probably unused port
conn, err := grpc.NewClient("localhost:19876", // random, probably unused port
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, roundrobin.Name)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(ci.Unary))
@ -135,7 +135,7 @@ func TestWaitForReadyFalse(t *testing.T) {
clk: clock.NewFake(),
waitForReady: false,
}
conn, err := grpc.Dial("localhost:19876", // random, probably unused port
conn, err := grpc.NewClient("localhost:19876", // random, probably unused port
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, roundrobin.Name)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(ci.Unary))
@ -155,14 +155,14 @@ func TestWaitForReadyFalse(t *testing.T) {
}
}
// testServer is used to implement TestTimeouts, and will attempt to sleep for
// testTimeoutServer is used to implement TestTimeouts, and will attempt to sleep for
// the given amount of time (unless it hits a timeout or cancel).
type testServer struct {
type testTimeoutServer struct {
test_proto.UnimplementedChillerServer
}
// Chill implements ChillerServer.Chill
func (s *testServer) Chill(ctx context.Context, in *test_proto.Time) (*test_proto.Time, error) {
func (s *testTimeoutServer) Chill(ctx context.Context, in *test_proto.Time) (*test_proto.Time, error) {
start := time.Now()
// Sleep for either the requested amount of time, or the context times out or
// is canceled.
@ -176,42 +176,9 @@ func (s *testServer) Chill(ctx context.Context, in *test_proto.Time) (*test_prot
}
func TestTimeouts(t *testing.T) {
// start server
lis, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
port := lis.Addr().(*net.TCPAddr).Port
serverMetrics, err := newServerMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating server metrics")
si := newServerMetadataInterceptor(serverMetrics, clock.NewFake())
s := grpc.NewServer(grpc.UnaryInterceptor(si.Unary))
test_proto.RegisterChillerServer(s, &testServer{})
go func() {
start := time.Now()
err := s.Serve(lis)
if err != nil && !strings.HasSuffix(err.Error(), "use of closed network connection") {
t.Logf("s.Serve: %v after %s", err, time.Since(start))
}
}()
defer s.Stop()
// make client
clientMetrics, err := newClientMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating client metrics")
ci := &clientMetadataInterceptor{
timeout: 30 * time.Second,
metrics: clientMetrics,
clk: clock.NewFake(),
}
conn, err := grpc.Dial(net.JoinHostPort("localhost", strconv.Itoa(port)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(ci.Unary))
if err != nil {
t.Fatalf("did not connect: %v", err)
}
c := test_proto.NewChillerClient(conn)
server := new(testTimeoutServer)
client, _, stop := setup(t, server, clock.NewFake())
defer stop()
testCases := []struct {
timeout time.Duration
@ -225,7 +192,7 @@ func TestTimeouts(t *testing.T) {
t.Run(tc.timeout.String(), func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), tc.timeout)
defer cancel()
_, err := c.Chill(ctx, &test_proto.Time{Duration: durationpb.New(time.Second)})
_, err := client.Chill(ctx, &test_proto.Time{Duration: durationpb.New(time.Second)})
if err == nil {
t.Fatal("Got no error, expected a timeout")
}
@ -237,58 +204,22 @@ func TestTimeouts(t *testing.T) {
}
func TestRequestTimeTagging(t *testing.T) {
clk := clock.NewFake()
// Listen for TCP requests on a random system assigned port number
lis, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// Retrieve the concrete port numberthe system assigned our listener
port := lis.Addr().(*net.TCPAddr).Port
// Create a new ChillerServer
server := new(testTimeoutServer)
serverMetrics, err := newServerMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating server metrics")
si := newServerMetadataInterceptor(serverMetrics, clk)
s := grpc.NewServer(grpc.UnaryInterceptor(si.Unary))
test_proto.RegisterChillerServer(s, &testServer{})
// Chill until ill
go func() {
start := time.Now()
err := s.Serve(lis)
if err != nil && !strings.HasSuffix(err.Error(), "use of closed network connection") {
t.Logf("s.Serve: %v after %s", err, time.Since(start))
}
}()
defer s.Stop()
// Dial the ChillerServer
clientMetrics, err := newClientMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating client metrics")
ci := &clientMetadataInterceptor{
timeout: 30 * time.Second,
metrics: clientMetrics,
clk: clk,
}
conn, err := grpc.Dial(net.JoinHostPort("localhost", strconv.Itoa(port)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(ci.Unary))
if err != nil {
t.Fatalf("did not connect: %v", err)
}
// Create a ChillerClient with the connection to the ChillerServer
c := test_proto.NewChillerClient(conn)
client, _, stop := setup(t, server, serverMetrics)
defer stop()
// Make an RPC request with the ChillerClient with a timeout higher than the
// requested ChillerServer delay so that the RPC completes normally
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if _, err := c.Chill(ctx, &test_proto.Time{Duration: durationpb.New(time.Second * 5)}); err != nil {
if _, err := client.Chill(ctx, &test_proto.Time{Duration: durationpb.New(time.Second * 5)}); err != nil {
t.Fatalf("Unexpected error calling Chill RPC: %s", err)
}
// There should be one histogram sample in the serverInterceptor rpcLag stat
test.AssertMetricWithLabelsEquals(t, si.metrics.rpcLag, prometheus.Labels{}, 1)
test.AssertMetricWithLabelsEquals(t, serverMetrics.rpcLag, prometheus.Labels{}, 1)
}
func TestClockSkew(t *testing.T) {
@ -298,32 +229,23 @@ func TestClockSkew(t *testing.T) {
clientClk := clock.NewFake()
clientClk.Set(time.Now())
// Listen for TCP requests on a random system assigned port number
lis, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
port := lis.Addr().(*net.TCPAddr).Port
_, serverPort, stop := setup(t, &testTimeoutServer{}, serverClk)
defer stop()
// Start a gRPC server listening on that port
serverMetrics, err := newServerMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating server metrics")
si := newServerMetadataInterceptor(serverMetrics, serverClk)
s := grpc.NewServer(grpc.UnaryInterceptor(si.Unary))
test_proto.RegisterChillerServer(s, &testServer{})
go func() { _ = s.Serve(lis) }()
defer s.Stop()
// Start a gRPC client talking to the server
clientMetrics, err := newClientMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating client metrics")
ci := &clientMetadataInterceptor{metrics: clientMetrics, clk: clientClk, timeout: time.Second}
conn, err := grpc.NewClient(
net.JoinHostPort("localhost", strconv.Itoa(port)),
ci := &clientMetadataInterceptor{
timeout: 30 * time.Second,
metrics: clientMetrics,
clk: clientClk,
}
conn, err := grpc.NewClient(net.JoinHostPort("localhost", strconv.Itoa(serverPort)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(ci.Unary),
)
test.AssertNotError(t, err, "creating test client")
grpc.WithUnaryInterceptor(ci.Unary))
if err != nil {
t.Fatalf("did not connect: %v", err)
}
client := test_proto.NewChillerClient(conn)
// Create a context with plenty of timeout
@ -369,18 +291,15 @@ func (s *blockedServer) Chill(_ context.Context, _ *test_proto.Time) (*test_prot
}
func TestInFlightRPCStat(t *testing.T) {
clk := clock.NewFake()
// Listen for TCP requests on a random system assigned port number
lis, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// Retrieve the concrete port numberthe system assigned our listener
port := lis.Addr().(*net.TCPAddr).Port
// Create a new blockedServer to act as a ChillerServer
server := &blockedServer{}
metrics, err := newClientMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating client metrics")
client, _, stop := setup(t, server, metrics)
defer stop()
// Increment the roadblock waitgroup - this will cause all chill RPCs to
// the server to block until we call Done()!
server.roadblock.Add(1)
@ -391,43 +310,11 @@ func TestInFlightRPCStat(t *testing.T) {
numRPCs := 5
server.received.Add(numRPCs)
serverMetrics, err := newServerMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating server metrics")
si := newServerMetadataInterceptor(serverMetrics, clk)
s := grpc.NewServer(grpc.UnaryInterceptor(si.Unary))
test_proto.RegisterChillerServer(s, server)
// Chill until ill
go func() {
start := time.Now()
err := s.Serve(lis)
if err != nil && !strings.HasSuffix(err.Error(), "use of closed network connection") {
t.Logf("s.Serve: %v after %s", err, time.Since(start))
}
}()
defer s.Stop()
// Dial the ChillerServer
clientMetrics, err := newClientMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating client metrics")
ci := &clientMetadataInterceptor{
timeout: 30 * time.Second,
metrics: clientMetrics,
clk: clk,
}
conn, err := grpc.Dial(net.JoinHostPort("localhost", strconv.Itoa(port)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(ci.Unary))
if err != nil {
t.Fatalf("did not connect: %v", err)
}
// Create a ChillerClient with the connection to the ChillerServer
c := test_proto.NewChillerClient(conn)
// Fire off a few RPCs. They will block on the blockedServer's roadblock wg
for range numRPCs {
go func() {
// Ignore errors, just chilllll.
_, _ = c.Chill(context.Background(), &test_proto.Time{})
_, _ = client.Chill(context.Background(), &test_proto.Time{})
}()
}
@ -442,7 +329,7 @@ func TestInFlightRPCStat(t *testing.T) {
}
// We expect the inFlightRPCs gauge for the Chiller.Chill RPCs to be equal to numRPCs.
test.AssertMetricWithLabelsEquals(t, ci.metrics.inFlightRPCs, labels, float64(numRPCs))
test.AssertMetricWithLabelsEquals(t, metrics.inFlightRPCs, labels, float64(numRPCs))
// Unblock the blockedServer to let all of the Chiller.Chill RPCs complete
server.roadblock.Done()
@ -450,7 +337,7 @@ func TestInFlightRPCStat(t *testing.T) {
time.Sleep(1 * time.Second)
// Check the gauge value again
test.AssertMetricWithLabelsEquals(t, ci.metrics.inFlightRPCs, labels, 0)
test.AssertMetricWithLabelsEquals(t, metrics.inFlightRPCs, labels, 0)
}
func TestServiceAuthChecker(t *testing.T) {
@ -541,7 +428,7 @@ func (s *testUserAgentServer) Chill(ctx context.Context, in *test_proto.Time) (*
func TestUserAgentMetadata(t *testing.T) {
server := new(testUserAgentServer)
client, stop := setup(t, server)
client, _, stop := setup(t, server)
defer stop()
testUA := "test UA"
@ -557,13 +444,26 @@ func TestUserAgentMetadata(t *testing.T) {
}
}
func setup(t *testing.T, server test_proto.ChillerServer) (test_proto.ChillerClient, func()) {
// setup creates a server and client, returning the created client, the running server's port, and a stop function.
func setup(t *testing.T, server test_proto.ChillerServer, opts ...any) (test_proto.ChillerClient, int, func()) {
clk := clock.NewFake()
serverMetricsVal, err := newServerMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating server metrics")
clientMetricsVal, err := newClientMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating client metrics")
for _, opt := range opts {
switch optTyped := opt.(type) {
case clock.FakeClock:
clk = optTyped
case clientMetrics:
clientMetricsVal = optTyped
case serverMetrics:
serverMetricsVal = optTyped
default:
t.Fatalf("setup called with unrecognize option %#v", t)
}
}
lis, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatalf("failed to listen: %v", err)
@ -587,11 +487,11 @@ func setup(t *testing.T, server test_proto.ChillerServer) (test_proto.ChillerCli
metrics: clientMetricsVal,
clk: clock.NewFake(),
}
conn, err := grpc.Dial(net.JoinHostPort("localhost", strconv.Itoa(port)),
conn, err := grpc.NewClient(net.JoinHostPort("localhost", strconv.Itoa(port)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(ci.Unary))
if err != nil {
t.Fatalf("did not connect: %v", err)
}
return test_proto.NewChillerClient(conn), s.Stop
return test_proto.NewChillerClient(conn), port, s.Stop
}