Address review comments

Change Abs to abs
Remove unimplemented distribution
Name change
Get server port from config or cmd line option
This commit is contained in:
Menghan Li 2016-04-28 17:48:05 -07:00
parent 19d3a3572d
commit 0ca699c979
4 changed files with 33 additions and 37 deletions

View File

@ -80,7 +80,7 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error)
grpclog.Printf(" - security params: %v", config.SecurityParams) grpclog.Printf(" - security params: %v", config.SecurityParams)
if config.SecurityParams != nil { if config.SecurityParams != nil {
creds, err := credentials.NewClientTLSFromFile(Abs(caFile), config.SecurityParams.ServerHostOverride) creds, err := credentials.NewClientTLSFromFile(abs(caFile), config.SecurityParams.ServerHostOverride)
if err != nil { if err != nil {
return nil, grpc.Errorf(codes.InvalidArgument, "failed to create TLS credentials %v", err) return nil, grpc.Errorf(codes.InvalidArgument, "failed to create TLS credentials %v", err)
} }
@ -90,7 +90,7 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error)
} }
grpclog.Printf(" - core limit: %v", config.CoreLimit) grpclog.Printf(" - core limit: %v", config.CoreLimit)
// Use one cpu core by default // Use one cpu core by default.
// TODO: change default number of cores used if 1 is not fastest. // TODO: change default number of cores used if 1 is not fastest.
if config.CoreLimit > 1 { if config.CoreLimit > 1 {
runtime.GOMAXPROCS(int(config.CoreLimit)) runtime.GOMAXPROCS(int(config.CoreLimit))
@ -125,13 +125,11 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error)
rpcCountPerConn, connCount := int(config.OutstandingRpcsPerChannel), int(config.ClientChannels) rpcCountPerConn, connCount := int(config.OutstandingRpcsPerChannel), int(config.ClientChannels)
grpclog.Printf(" - load params: %v", config.LoadParams) grpclog.Printf(" - load params: %v", config.LoadParams)
var dist *int // TODO add open loop distribution.
switch lp := config.LoadParams.Load.(type) { switch config.LoadParams.Load.(type) {
case *testpb.LoadParams_ClosedLoop: case *testpb.LoadParams_ClosedLoop:
case *testpb.LoadParams_Poisson: case *testpb.LoadParams_Poisson:
grpclog.Printf(" - %v", lp.Poisson)
return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams) return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams)
// TODO poisson
case *testpb.LoadParams_Uniform: case *testpb.LoadParams_Uniform:
return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams) return nil, grpc.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams)
case *testpb.LoadParams_Determ: case *testpb.LoadParams_Determ:
@ -175,21 +173,17 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error)
switch rpcType { switch rpcType {
case "unary": case "unary":
if dist == nil { bc.doCloseLoopUnary(conns, rpcCountPerConn, payloadReqSize, payloadRespSize)
bc.doCloseLoopUnaryBenchmark(conns, rpcCountPerConn, payloadReqSize, payloadRespSize) // TODO open loop.
}
// TODO else do open loop
case "streaming": case "streaming":
if dist == nil { bc.doCloseLoopStreaming(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType)
bc.doCloseLoopStreamingBenchmark(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType) // TODO open loop.
}
// TODO else do open loop
} }
return &bc, nil return &bc, nil
} }
func (bc *benchmarkClient) doCloseLoopUnaryBenchmark(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int) { func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int) {
clients := make([]testpb.BenchmarkServiceClient, len(conns)) clients := make([]testpb.BenchmarkServiceClient, len(conns))
for ic, conn := range conns { for ic, conn := range conns {
clients[ic] = testpb.NewBenchmarkServiceClient(conn) clients[ic] = testpb.NewBenchmarkServiceClient(conn)
@ -235,7 +229,7 @@ func (bc *benchmarkClient) doCloseLoopUnaryBenchmark(conns []*grpc.ClientConn, r
} }
} }
func (bc *benchmarkClient) doCloseLoopStreamingBenchmark(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string) { func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string) {
var doRPC func(testpb.BenchmarkService_StreamingCallClient, int, int) error var doRPC func(testpb.BenchmarkService_StreamingCallClient, int, int) error
if payloadType == "bytebuf" { if payloadType == "bytebuf" {
doRPC = benchmark.DoByteBufStreamingRoundTrip doRPC = benchmark.DoByteBufStreamingRoundTrip
@ -319,6 +313,8 @@ func (bc *benchmarkClient) getStats() *testpb.ClientStats {
} }
} }
// reset clears the contents for histogram and set lastResetTime to Now().
// It is often called to get ready for benchmark runs.
func (bc *benchmarkClient) reset() { func (bc *benchmarkClient) reset() {
bc.mu.Lock() bc.mu.Lock()
defer bc.mu.Unlock() defer bc.mu.Unlock()

View File

@ -57,7 +57,7 @@ var (
type benchmarkServer struct { type benchmarkServer struct {
port int port int
cores int cores int
close func() closeFunc func()
mu sync.RWMutex mu sync.RWMutex
lastResetTime time.Time lastResetTime time.Time
} }
@ -83,7 +83,7 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma
grpclog.Printf(" - security params: %v", config.SecurityParams) grpclog.Printf(" - security params: %v", config.SecurityParams)
if config.SecurityParams != nil { if config.SecurityParams != nil {
creds, err := credentials.NewServerTLSFromFile(Abs(certFile), Abs(keyFile)) creds, err := credentials.NewServerTLSFromFile(abs(certFile), abs(keyFile))
if err != nil { if err != nil {
grpclog.Fatalf("failed to generate credentials %v", err) grpclog.Fatalf("failed to generate credentials %v", err)
} }
@ -95,29 +95,27 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma
numOfCores := 1 numOfCores := 1
if config.CoreLimit > 1 { if config.CoreLimit > 1 {
numOfCores = int(config.CoreLimit) numOfCores = int(config.CoreLimit)
runtime.GOMAXPROCS(numOfCores)
} }
runtime.GOMAXPROCS(numOfCores)
grpclog.Printf(" - port: %v", config.Port) grpclog.Printf(" - port: %v", config.Port)
var port int
// Priority: config.Port > serverPort > default (0). // Priority: config.Port > serverPort > default (0).
if config.Port != 0 { port := int(config.Port)
port = int(config.Port) if port == 0 {
} else if serverPort != 0 {
port = serverPort port = serverPort
} }
grpclog.Printf(" - payload config: %v", config.PayloadConfig) grpclog.Printf(" - payload config: %v", config.PayloadConfig)
var ( var (
addr string addr string
close func() closeFunc func()
err error err error
) )
if config.PayloadConfig != nil { if config.PayloadConfig != nil {
switch payload := config.PayloadConfig.Payload.(type) { switch payload := config.PayloadConfig.Payload.(type) {
case *testpb.PayloadConfig_BytebufParams: case *testpb.PayloadConfig_BytebufParams:
opts = append(opts, grpc.CustomCodec(byteBufCodec{})) opts = append(opts, grpc.CustomCodec(byteBufCodec{}))
addr, close, err = benchmark.StartServer(benchmark.ServerInfo{ addr, closeFunc, err = benchmark.StartServer(benchmark.ServerInfo{
Addr: ":" + strconv.Itoa(port), Addr: ":" + strconv.Itoa(port),
Type: "bytebuf", Type: "bytebuf",
Metadata: payload.BytebufParams.RespSize, Metadata: payload.BytebufParams.RespSize,
@ -126,7 +124,7 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma
grpclog.Fatalf("failed to start server: %v", err) grpclog.Fatalf("failed to start server: %v", err)
} }
case *testpb.PayloadConfig_SimpleParams: case *testpb.PayloadConfig_SimpleParams:
addr, close, err = benchmark.StartServer(benchmark.ServerInfo{ addr, closeFunc, err = benchmark.StartServer(benchmark.ServerInfo{
Addr: ":" + strconv.Itoa(port), Addr: ":" + strconv.Itoa(port),
Type: "protobuf", Type: "protobuf",
}, opts...) }, opts...)
@ -140,7 +138,7 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma
} }
} else { } else {
// Start protobuf server is payload config is nil. // Start protobuf server is payload config is nil.
addr, close, err = benchmark.StartServer(benchmark.ServerInfo{ addr, closeFunc, err = benchmark.StartServer(benchmark.ServerInfo{
Addr: ":" + strconv.Itoa(port), Addr: ":" + strconv.Itoa(port),
Type: "protobuf", Type: "protobuf",
}, opts...) }, opts...)
@ -156,7 +154,7 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma
grpclog.Fatalf("failed to get port number from server address: %v", err) grpclog.Fatalf("failed to get port number from server address: %v", err)
} }
return &benchmarkServer{port: p, cores: numOfCores, close: close, lastResetTime: time.Now()}, nil return &benchmarkServer{port: p, cores: numOfCores, closeFunc: closeFunc, lastResetTime: time.Now()}, nil
} }
func (bs *benchmarkServer) getStats() *testpb.ServerStats { func (bs *benchmarkServer) getStats() *testpb.ServerStats {

View File

@ -59,7 +59,7 @@ type byteBufCodec struct {
func (byteBufCodec) Marshal(v interface{}) ([]byte, error) { func (byteBufCodec) Marshal(v interface{}) ([]byte, error) {
b, ok := v.(*[]byte) b, ok := v.(*[]byte)
if !ok { if !ok {
return nil, fmt.Errorf("Failed to marshal: %v is not type of *[]byte") return nil, fmt.Errorf("failed to marshal: %v is not type of *[]byte")
} }
return *b, nil return *b, nil
} }
@ -67,16 +67,18 @@ func (byteBufCodec) Marshal(v interface{}) ([]byte, error) {
func (byteBufCodec) Unmarshal(data []byte, v interface{}) error { func (byteBufCodec) Unmarshal(data []byte, v interface{}) error {
b, ok := v.(*[]byte) b, ok := v.(*[]byte)
if !ok { if !ok {
return fmt.Errorf("Failed to marshal: %v is not type of *[]byte") return fmt.Errorf("failed to marshal: %v is not type of *[]byte")
} }
*b = data *b = data
return nil return nil
} }
func (byteBufCodec) String() string { func (byteBufCodec) String() string {
return "byteBufCodec" return "bytebuffer"
} }
// workerServer implements WorkerService rpc handlers.
// It can create benchmarkServer or benchmarkClient on demand.
type workerServer struct { type workerServer struct {
stop chan<- bool stop chan<- bool
serverPort int serverPort int
@ -88,7 +90,7 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er
// Close benchmark server when stream ends. // Close benchmark server when stream ends.
grpclog.Printf("closing benchmark server") grpclog.Printf("closing benchmark server")
if bs != nil { if bs != nil {
bs.close() bs.closeFunc()
} }
}() }()
for { for {
@ -110,7 +112,7 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er
} }
if bs != nil { if bs != nil {
grpclog.Printf("server setup received when server already exists, closing the existing server") grpclog.Printf("server setup received when server already exists, closing the existing server")
bs.close() bs.closeFunc()
} }
bs = newbs bs = newbs
out = &testpb.ServerStatus{ out = &testpb.ServerStatus{

View File

@ -38,10 +38,10 @@ import (
"path/filepath" "path/filepath"
) )
// Abs returns the absolute path the given relative file or directory path, // abs returns the absolute path the given relative file or directory path,
// relative to the google.golang.org/grpc directory in the user's GOPATH. // relative to the google.golang.org/grpc directory in the user's GOPATH.
// If rel is already absolute, it is returned unmodified. // If rel is already absolute, it is returned unmodified.
func Abs(rel string) string { func abs(rel string) string {
if filepath.IsAbs(rel) { if filepath.IsAbs(rel) {
return rel return rel
} }