From 07f33239d27b89721ad6027e7bfb168998859768 Mon Sep 17 00:00:00 2001 From: David Symonds Date: Sat, 6 Jun 2015 10:08:59 -0700 Subject: [PATCH] Fix data race in benchmark_test.go. Streams are not safe for concurrent use, so start a new stream for each goroutine. Fixes #213. --- benchmark/benchmark.go | 6 +++--- benchmark/benchmark_test.go | 7 ++++++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go index 0e46b56da..542b33c24 100644 --- a/benchmark/benchmark.go +++ b/benchmark/benchmark.go @@ -120,7 +120,7 @@ func DoUnaryCall(tc testpb.TestServiceClient, reqSize, respSize int) { } } -// DoStreamingRoundTrip performs a round trip for a single streaming rpc. +// DoStreamingRoundTrip performs a round trip for a single streaming rpc. func DoStreamingRoundTrip(tc testpb.TestServiceClient, stream testpb.TestService_StreamingCallClient, reqSize, respSize int) { pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize) req := &testpb.SimpleRequest{ @@ -129,10 +129,10 @@ func DoStreamingRoundTrip(tc testpb.TestServiceClient, stream testpb.TestService Payload: pl, } if err := stream.Send(req); err != nil { - grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) + grpclog.Fatalf("StreamingCall(_).Send: %v", err) } if _, err := stream.Recv(); err != nil { - grpclog.Fatal("%v.StreamingCall(_) = _, %v", tc, err) + grpclog.Fatalf("StreamingCall(_).Recv: %v", err) } } diff --git a/benchmark/benchmark_test.go b/benchmark/benchmark_test.go index f6b0c200a..49d1cfd6e 100644 --- a/benchmark/benchmark_test.go +++ b/benchmark/benchmark_test.go @@ -62,11 +62,12 @@ func runStream(b *testing.B, maxConcurrentCalls int) { defer stopper() conn := NewClientConn(target) tc := testpb.NewTestServiceClient(conn) + + // Warm up connection. stream, err := tc.StreamingCall(context.Background()) if err != nil { grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) } - // Warm up connection. for i := 0; i < 10; i++ { streamCaller(tc, stream) } @@ -81,6 +82,10 @@ func runStream(b *testing.B, maxConcurrentCalls int) { // Distribute the b.N calls over maxConcurrentCalls workers. for i := 0; i < maxConcurrentCalls; i++ { go func() { + stream, err := tc.StreamingCall(context.Background()) + if err != nil { + grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) + } for _ = range ch { start := time.Now() streamCaller(tc, stream)