From 19d3a3572d2fa06db2ce192ce41e6f515a927635 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 28 Apr 2016 17:34:55 -0700 Subject: [PATCH] Merge StartServer and StartByteBufServer into a general StartServer --- benchmark/benchmark.go | 47 +++++++++++++++------------- benchmark/benchmark_test.go | 10 ++++-- benchmark/worker/benchmark_server.go | 26 +++++++++++++-- 3 files changed, 57 insertions(+), 26 deletions(-) diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go index 208a54315..63bd41d78 100644 --- a/benchmark/benchmark.go +++ b/benchmark/benchmark.go @@ -37,6 +37,7 @@ Package benchmark implements the building blocks to setup end-to-end gRPC benchm package benchmark import ( + "fmt" "io" "net" @@ -91,22 +92,6 @@ func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallS } } -// StartServer starts a gRPC server serving a benchmark service on the given -// address, which may be something like "localhost:0". It returns its listen -// address and a function to stop the server. -func StartServer(addr string, opts ...grpc.ServerOption) (string, func()) { - lis, err := net.Listen("tcp", addr) - if err != nil { - grpclog.Fatalf("Failed to listen: %v", err) - } - s := grpc.NewServer(opts...) - testpb.RegisterBenchmarkServiceServer(s, &testServer{}) - go s.Serve(lis) - return lis.Addr().String(), func() { - s.Stop() - } -} - type byteBufServer struct { respSize int32 } @@ -132,19 +117,39 @@ func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCa } } -// StartbyteBufServer starts a benchmark service server that supports custom codec. +// ServerInfo is used to create server. +// It contains the address and type of the server to be created, and optional metadata. +type ServerInfo struct { + Addr string + Type string + Metadata interface{} +} + +// StartServer starts a gRPC server serving a benchmark service on the given ServerInfo. +// Different types of servers are created according to Type. // It returns its listen address and a function to stop the server. -func StartByteBufServer(addr string, respSize int32, opts ...grpc.ServerOption) (string, func()) { - lis, err := net.Listen("tcp", addr) +func StartServer(info ServerInfo, opts ...grpc.ServerOption) (string, func(), error) { + lis, err := net.Listen("tcp", info.Addr) if err != nil { grpclog.Fatalf("Failed to listen: %v", err) } s := grpc.NewServer(opts...) - testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize}) + switch info.Type { + case "protobuf": + testpb.RegisterBenchmarkServiceServer(s, &testServer{}) + case "bytebuf": + respSize, ok := info.Metadata.(int32) + if !ok { + return "", nil, fmt.Errorf("invalid metadata: %v, for Type: %v", info.Metadata, info.Type) + } + testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize}) + default: + return "", nil, fmt.Errorf("unknown Type: %v", info.Type) + } go s.Serve(lis) return lis.Addr().String(), func() { s.Stop() - } + }, nil } // DoUnaryCall performs an unary RPC with given stub and request and response sizes. diff --git a/benchmark/benchmark_test.go b/benchmark/benchmark_test.go index ccf0f457e..4e47631a6 100644 --- a/benchmark/benchmark_test.go +++ b/benchmark/benchmark_test.go @@ -16,7 +16,10 @@ import ( func runUnary(b *testing.B, maxConcurrentCalls int) { s := stats.AddStats(b, 38) b.StopTimer() - target, stopper := StartServer("localhost:0") + target, stopper, err := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"}) + if err != nil { + grpclog.Fatalf("failed to start server: %v", err) + } defer stopper() conn := NewClientConn(target, grpc.WithInsecure()) tc := testpb.NewBenchmarkServiceClient(conn) @@ -59,7 +62,10 @@ func runUnary(b *testing.B, maxConcurrentCalls int) { func runStream(b *testing.B, maxConcurrentCalls int) { s := stats.AddStats(b, 38) b.StopTimer() - target, stopper := StartServer("localhost:0") + target, stopper, err := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"}) + if err != nil { + grpclog.Fatalf("failed to start server: %v", err) + } defer stopper() conn := NewClientConn(target, grpc.WithInsecure()) tc := testpb.NewBenchmarkServiceClient(conn) diff --git a/benchmark/worker/benchmark_server.go b/benchmark/worker/benchmark_server.go index de104f1fd..7638f75f9 100644 --- a/benchmark/worker/benchmark_server.go +++ b/benchmark/worker/benchmark_server.go @@ -111,14 +111,28 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma var ( addr string close func() + err error ) if config.PayloadConfig != nil { switch payload := config.PayloadConfig.Payload.(type) { case *testpb.PayloadConfig_BytebufParams: opts = append(opts, grpc.CustomCodec(byteBufCodec{})) - addr, close = benchmark.StartByteBufServer(":"+strconv.Itoa(port), payload.BytebufParams.RespSize, opts...) + addr, close, err = benchmark.StartServer(benchmark.ServerInfo{ + Addr: ":" + strconv.Itoa(port), + Type: "bytebuf", + Metadata: payload.BytebufParams.RespSize, + }, opts...) + if err != nil { + grpclog.Fatalf("failed to start server: %v", err) + } case *testpb.PayloadConfig_SimpleParams: - addr, close = benchmark.StartServer(":"+strconv.Itoa(port), opts...) + addr, close, err = benchmark.StartServer(benchmark.ServerInfo{ + Addr: ":" + strconv.Itoa(port), + Type: "protobuf", + }, opts...) + if err != nil { + grpclog.Fatalf("failed to start server: %v", err) + } case *testpb.PayloadConfig_ComplexParams: return nil, grpc.Errorf(codes.Unimplemented, "unsupported payload config: %v", config.PayloadConfig) default: @@ -126,7 +140,13 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma } } else { // Start protobuf server is payload config is nil. - addr, close = benchmark.StartServer(":"+strconv.Itoa(port), opts...) + addr, close, err = benchmark.StartServer(benchmark.ServerInfo{ + Addr: ":" + strconv.Itoa(port), + Type: "protobuf", + }, opts...) + if err != nil { + grpclog.Fatalf("failed to start server: %v", err) + } } grpclog.Printf("benchmark server listening at %v", addr)