Add driver command line options

This commit is contained in:
Menghan Li 2016-04-21 15:59:24 -07:00
parent 643486f084
commit 26b336d491
2 changed files with 37 additions and 39 deletions

View File

@ -27,7 +27,7 @@ type benchmarkServer struct {
lastResetTime time.Time
}
func startBenchmarkServerWithSetup(setup *testpb.ServerConfig) (*benchmarkServer, error) {
func startBenchmarkServerWithSetup(setup *testpb.ServerConfig, serverPort int) (*benchmarkServer, error) {
var opts []grpc.ServerOption
grpclog.Printf(" - server type: %v", setup.ServerType)
@ -55,49 +55,43 @@ func startBenchmarkServerWithSetup(setup *testpb.ServerConfig) (*benchmarkServer
if setup.CoreLimit > 0 {
runtime.GOMAXPROCS(int(setup.CoreLimit))
} else {
// runtime.GOMAXPROCS(runtime.NumCPU())
runtime.GOMAXPROCS(1)
}
grpclog.Printf(" - core list: %v", setup.CoreList)
if len(setup.CoreList) > 0 {
// TODO core list
grpclog.Printf("specifying cores to run server on: %v", setup.CoreList)
return nil, grpc.Errorf(codes.InvalidArgument, "specifying core list is not supported")
}
grpclog.Printf(" - port: %v", setup.Port)
var port int
if setup.Port != 0 {
port = int(setup.Port)
} else if serverPort != 0 {
port = serverPort
}
grpclog.Printf(" - payload config: %v", setup.PayloadConfig)
var p int
var close func()
if setup.PayloadConfig != nil {
// TODO payload config
grpclog.Printf("payload config: %v", setup.PayloadConfig)
switch payload := setup.PayloadConfig.Payload.(type) {
case *testpb.PayloadConfig_BytebufParams:
opts = append(opts, grpc.CustomCodec(byteBufCodec{}))
p, close = benchmark.StartGenericServer(":"+strconv.Itoa(int(setup.Port)), payload.BytebufParams.ReqSize, payload.BytebufParams.RespSize, opts...)
p, close = benchmark.StartGenericServer(":"+strconv.Itoa(port), payload.BytebufParams.ReqSize, payload.BytebufParams.RespSize, opts...)
case *testpb.PayloadConfig_SimpleParams:
p, close = benchmark.StartServer(":"+strconv.Itoa(int(setup.Port)), opts...)
p, close = benchmark.StartServer(":"+strconv.Itoa(port), opts...)
case *testpb.PayloadConfig_ComplexParams:
return nil, grpc.Errorf(codes.InvalidArgument, "unsupported payload config: %v", setup.PayloadConfig)
default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", setup.PayloadConfig)
}
} else {
// Start protobuf server is payload config is nil
p, close = benchmark.StartServer(":"+strconv.Itoa(int(setup.Port)), opts...)
p, close = benchmark.StartServer(":"+strconv.Itoa(port), opts...)
}
grpclog.Printf("benchmark server listening at port %v", p)
// temp := strings.Split(addr, ":")
// if len(temp) <= 0 {
// return nil, grpc.Errorf(codes.Internal, "benchmark test address not valid: %v", addr)
// }
// p, err := strconv.Atoi(temp[len(temp)-1])
// if err != nil {
// return nil, grpc.Errorf(codes.Internal, "%v", err)
// }
bs := &benchmarkServer{port: p, close: close, lastResetTime: time.Now()}
return bs, nil
}
@ -109,6 +103,7 @@ func (bs *benchmarkServer) getStats() *testpb.ServerStats {
}
func (bs *benchmarkServer) reset() {
// TODO wall time, sys time, user time
bs.mu.Lock()
defer bs.mu.Unlock()
bs.lastResetTime = time.Now()

View File

@ -1,10 +1,11 @@
package main
import (
"flag"
"io"
"net"
"runtime"
"sync"
"strconv"
"golang.org/x/net/context"
"google.golang.org/grpc"
@ -14,8 +15,8 @@ import (
)
var (
ports = []string{":10000"}
// ports = []string{":10010"}
driverPort = flag.Int("driver_port", 10000, "port for communication with driver")
serverPort = flag.Int("server_port", 0, "port for operation as a server")
)
type byteBufCodec struct {
@ -35,7 +36,9 @@ func (byteBufCodec) String() string {
}
type workerServer struct {
bs *benchmarkServer
bs *benchmarkServer
stop chan<- bool
serverPort int
}
func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) error {
@ -57,7 +60,7 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er
case *testpb.ServerArgs_Setup:
grpclog.Printf("server setup received:")
bs, err := startBenchmarkServerWithSetup(t.Setup)
bs, err := startBenchmarkServerWithSetup(t.Setup, s.serverPort)
if err != nil {
return err
}
@ -100,25 +103,25 @@ func (s *workerServer) QuitWorker(ctx context.Context, in *testpb.Void) (*testpb
if s.bs != nil {
s.bs.close()
}
s.stop <- true
return &testpb.Void{}, nil
}
func main() {
var wg sync.WaitGroup
wg.Add(len(ports))
for i := 0; i < len(ports); i++ {
lis, err := net.Listen("tcp", ports[i])
if err != nil {
grpclog.Fatalf("failed to listen: %v", err)
}
grpclog.Printf("worker %d listening at port %v", i, ports[i])
s := grpc.NewServer()
testpb.RegisterWorkerServiceServer(s, &workerServer{})
go func() {
defer wg.Done()
s.Serve(lis)
}()
flag.Parse()
lis, err := net.Listen("tcp", ":"+strconv.Itoa(*driverPort))
if err != nil {
grpclog.Fatalf("failed to listen: %v", err)
}
wg.Wait()
grpclog.Printf("worker listening at port %v", *driverPort)
s := grpc.NewServer()
stop := make(chan bool)
testpb.RegisterWorkerServiceServer(s, &workerServer{
stop: stop,
serverPort: *serverPort,
})
go s.Serve(lis)
<-stop
s.Stop()
}