diff --git a/benchmark/worker/benchmark_server.go b/benchmark/worker/benchmark_server.go index 41c430a03..4a7072fe2 100644 --- a/benchmark/worker/benchmark_server.go +++ b/benchmark/worker/benchmark_server.go @@ -27,7 +27,7 @@ type benchmarkServer struct { lastResetTime time.Time } -func startBenchmarkServerWithSetup(setup *testpb.ServerConfig) (*benchmarkServer, error) { +func startBenchmarkServerWithSetup(setup *testpb.ServerConfig, serverPort int) (*benchmarkServer, error) { var opts []grpc.ServerOption grpclog.Printf(" - server type: %v", setup.ServerType) @@ -55,49 +55,43 @@ func startBenchmarkServerWithSetup(setup *testpb.ServerConfig) (*benchmarkServer if setup.CoreLimit > 0 { runtime.GOMAXPROCS(int(setup.CoreLimit)) } else { - // runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(1) } grpclog.Printf(" - core list: %v", setup.CoreList) if len(setup.CoreList) > 0 { - // TODO core list - grpclog.Printf("specifying cores to run server on: %v", setup.CoreList) + return nil, grpc.Errorf(codes.InvalidArgument, "specifying core list is not supported") } grpclog.Printf(" - port: %v", setup.Port) + var port int + if setup.Port != 0 { + port = int(setup.Port) + } else if serverPort != 0 { + port = serverPort + } grpclog.Printf(" - payload config: %v", setup.PayloadConfig) var p int var close func() if setup.PayloadConfig != nil { - // TODO payload config - grpclog.Printf("payload config: %v", setup.PayloadConfig) switch payload := setup.PayloadConfig.Payload.(type) { case *testpb.PayloadConfig_BytebufParams: opts = append(opts, grpc.CustomCodec(byteBufCodec{})) - p, close = benchmark.StartGenericServer(":"+strconv.Itoa(int(setup.Port)), payload.BytebufParams.ReqSize, payload.BytebufParams.RespSize, opts...) + p, close = benchmark.StartGenericServer(":"+strconv.Itoa(port), payload.BytebufParams.ReqSize, payload.BytebufParams.RespSize, opts...) case *testpb.PayloadConfig_SimpleParams: - p, close = benchmark.StartServer(":"+strconv.Itoa(int(setup.Port)), opts...) + p, close = benchmark.StartServer(":"+strconv.Itoa(port), opts...) case *testpb.PayloadConfig_ComplexParams: + return nil, grpc.Errorf(codes.InvalidArgument, "unsupported payload config: %v", setup.PayloadConfig) default: return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", setup.PayloadConfig) } } else { // Start protobuf server is payload config is nil - p, close = benchmark.StartServer(":"+strconv.Itoa(int(setup.Port)), opts...) + p, close = benchmark.StartServer(":"+strconv.Itoa(port), opts...) } grpclog.Printf("benchmark server listening at port %v", p) - // temp := strings.Split(addr, ":") - // if len(temp) <= 0 { - // return nil, grpc.Errorf(codes.Internal, "benchmark test address not valid: %v", addr) - // } - // p, err := strconv.Atoi(temp[len(temp)-1]) - // if err != nil { - // return nil, grpc.Errorf(codes.Internal, "%v", err) - // } - bs := &benchmarkServer{port: p, close: close, lastResetTime: time.Now()} return bs, nil } @@ -109,6 +103,7 @@ func (bs *benchmarkServer) getStats() *testpb.ServerStats { } func (bs *benchmarkServer) reset() { + // TODO wall time, sys time, user time bs.mu.Lock() defer bs.mu.Unlock() bs.lastResetTime = time.Now() diff --git a/benchmark/worker/main.go b/benchmark/worker/main.go index dc4782dc3..006b46d84 100644 --- a/benchmark/worker/main.go +++ b/benchmark/worker/main.go @@ -1,10 +1,11 @@ package main import ( + "flag" "io" "net" "runtime" - "sync" + "strconv" "golang.org/x/net/context" "google.golang.org/grpc" @@ -14,8 +15,8 @@ import ( ) var ( - ports = []string{":10000"} - // ports = []string{":10010"} + driverPort = flag.Int("driver_port", 10000, "port for communication with driver") + serverPort = flag.Int("server_port", 0, "port for operation as a server") ) type byteBufCodec struct { @@ -35,7 +36,9 @@ func (byteBufCodec) String() string { } type workerServer struct { - bs *benchmarkServer + bs *benchmarkServer + stop chan<- bool + serverPort int } func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) error { @@ -57,7 +60,7 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er case *testpb.ServerArgs_Setup: grpclog.Printf("server setup received:") - bs, err := startBenchmarkServerWithSetup(t.Setup) + bs, err := startBenchmarkServerWithSetup(t.Setup, s.serverPort) if err != nil { return err } @@ -100,25 +103,25 @@ func (s *workerServer) QuitWorker(ctx context.Context, in *testpb.Void) (*testpb if s.bs != nil { s.bs.close() } + s.stop <- true return &testpb.Void{}, nil } func main() { - var wg sync.WaitGroup - wg.Add(len(ports)) - for i := 0; i < len(ports); i++ { - lis, err := net.Listen("tcp", ports[i]) - if err != nil { - grpclog.Fatalf("failed to listen: %v", err) - } - grpclog.Printf("worker %d listening at port %v", i, ports[i]) - - s := grpc.NewServer() - testpb.RegisterWorkerServiceServer(s, &workerServer{}) - go func() { - defer wg.Done() - s.Serve(lis) - }() + flag.Parse() + lis, err := net.Listen("tcp", ":"+strconv.Itoa(*driverPort)) + if err != nil { + grpclog.Fatalf("failed to listen: %v", err) } - wg.Wait() + grpclog.Printf("worker listening at port %v", *driverPort) + + s := grpc.NewServer() + stop := make(chan bool) + testpb.RegisterWorkerServiceServer(s, &workerServer{ + stop: stop, + serverPort: *serverPort, + }) + go s.Serve(lis) + <-stop + s.Stop() }