Merge StartServer and StartByteBufServer into a general StartServer

This commit is contained in:
Menghan Li 2016-04-28 17:34:55 -07:00
parent bb1be7190b
commit 19d3a3572d
3 changed files with 57 additions and 26 deletions

View File

@ -37,6 +37,7 @@ Package benchmark implements the building blocks to setup end-to-end gRPC benchm
package benchmark
import (
"fmt"
"io"
"net"
@ -91,22 +92,6 @@ func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallS
}
}
// StartServer starts a gRPC server serving a benchmark service on the given
// address, which may be something like "localhost:0". It returns its listen
// address and a function to stop the server.
func StartServer(addr string, opts ...grpc.ServerOption) (string, func()) {
lis, err := net.Listen("tcp", addr)
if err != nil {
grpclog.Fatalf("Failed to listen: %v", err)
}
s := grpc.NewServer(opts...)
testpb.RegisterBenchmarkServiceServer(s, &testServer{})
go s.Serve(lis)
return lis.Addr().String(), func() {
s.Stop()
}
}
type byteBufServer struct {
respSize int32
}
@ -132,19 +117,39 @@ func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCa
}
}
// StartbyteBufServer starts a benchmark service server that supports custom codec.
// ServerInfo is used to create server.
// It contains the address and type of the server to be created, and optional metadata.
type ServerInfo struct {
Addr string
Type string
Metadata interface{}
}
// StartServer starts a gRPC server serving a benchmark service on the given ServerInfo.
// Different types of servers are created according to Type.
// It returns its listen address and a function to stop the server.
func StartByteBufServer(addr string, respSize int32, opts ...grpc.ServerOption) (string, func()) {
lis, err := net.Listen("tcp", addr)
func StartServer(info ServerInfo, opts ...grpc.ServerOption) (string, func(), error) {
lis, err := net.Listen("tcp", info.Addr)
if err != nil {
grpclog.Fatalf("Failed to listen: %v", err)
}
s := grpc.NewServer(opts...)
testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize})
switch info.Type {
case "protobuf":
testpb.RegisterBenchmarkServiceServer(s, &testServer{})
case "bytebuf":
respSize, ok := info.Metadata.(int32)
if !ok {
return "", nil, fmt.Errorf("invalid metadata: %v, for Type: %v", info.Metadata, info.Type)
}
testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize})
default:
return "", nil, fmt.Errorf("unknown Type: %v", info.Type)
}
go s.Serve(lis)
return lis.Addr().String(), func() {
s.Stop()
}
}, nil
}
// DoUnaryCall performs an unary RPC with given stub and request and response sizes.

View File

@ -16,7 +16,10 @@ import (
func runUnary(b *testing.B, maxConcurrentCalls int) {
s := stats.AddStats(b, 38)
b.StopTimer()
target, stopper := StartServer("localhost:0")
target, stopper, err := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"})
if err != nil {
grpclog.Fatalf("failed to start server: %v", err)
}
defer stopper()
conn := NewClientConn(target, grpc.WithInsecure())
tc := testpb.NewBenchmarkServiceClient(conn)
@ -59,7 +62,10 @@ func runUnary(b *testing.B, maxConcurrentCalls int) {
func runStream(b *testing.B, maxConcurrentCalls int) {
s := stats.AddStats(b, 38)
b.StopTimer()
target, stopper := StartServer("localhost:0")
target, stopper, err := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"})
if err != nil {
grpclog.Fatalf("failed to start server: %v", err)
}
defer stopper()
conn := NewClientConn(target, grpc.WithInsecure())
tc := testpb.NewBenchmarkServiceClient(conn)

View File

@ -111,14 +111,28 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma
var (
addr string
close func()
err error
)
if config.PayloadConfig != nil {
switch payload := config.PayloadConfig.Payload.(type) {
case *testpb.PayloadConfig_BytebufParams:
opts = append(opts, grpc.CustomCodec(byteBufCodec{}))
addr, close = benchmark.StartByteBufServer(":"+strconv.Itoa(port), payload.BytebufParams.RespSize, opts...)
addr, close, err = benchmark.StartServer(benchmark.ServerInfo{
Addr: ":" + strconv.Itoa(port),
Type: "bytebuf",
Metadata: payload.BytebufParams.RespSize,
}, opts...)
if err != nil {
grpclog.Fatalf("failed to start server: %v", err)
}
case *testpb.PayloadConfig_SimpleParams:
addr, close = benchmark.StartServer(":"+strconv.Itoa(port), opts...)
addr, close, err = benchmark.StartServer(benchmark.ServerInfo{
Addr: ":" + strconv.Itoa(port),
Type: "protobuf",
}, opts...)
if err != nil {
grpclog.Fatalf("failed to start server: %v", err)
}
case *testpb.PayloadConfig_ComplexParams:
return nil, grpc.Errorf(codes.Unimplemented, "unsupported payload config: %v", config.PayloadConfig)
default:
@ -126,7 +140,13 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma
}
} else {
// Start protobuf server is payload config is nil.
addr, close = benchmark.StartServer(":"+strconv.Itoa(port), opts...)
addr, close, err = benchmark.StartServer(benchmark.ServerInfo{
Addr: ":" + strconv.Itoa(port),
Type: "protobuf",
}, opts...)
if err != nil {
grpclog.Fatalf("failed to start server: %v", err)
}
}
grpclog.Printf("benchmark server listening at %v", addr)