Small fixes in worker main.go

This commit is contained in:
Menghan Li 2016-04-26 16:25:14 -07:00
parent b54a56774d
commit 61623241f4
1 changed files with 28 additions and 17 deletions

View File

@ -71,8 +71,6 @@ func (byteBufCodec) String() string {
type workerServer struct {
stop chan<- bool
serverPort int
// TODO move bc out of workerServer
bc *benchmarkClient
}
func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) error {
@ -92,6 +90,7 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er
return err
}
var out *testpb.ServerStatus
switch argtype := in.Argtype.(type) {
case *testpb.ServerArgs_Setup:
grpclog.Printf("server setup received:")
@ -104,6 +103,11 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er
bs.close()
}
bs = newbs
out = &testpb.ServerStatus{
Stats: bs.getStats(),
Port: int32(bs.port),
Cores: int32(bs.cores),
}
case *testpb.ServerArgs_Mark:
grpclog.Printf("server mark received:")
@ -111,16 +115,16 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er
if bs == nil {
return grpc.Errorf(codes.InvalidArgument, "server does not exist when mark received")
}
out = &testpb.ServerStatus{
Stats: bs.getStats(),
Port: int32(bs.port),
Cores: int32(bs.cores),
}
if argtype.Mark.Reset_ {
bs.reset()
}
}
out := &testpb.ServerStatus{
Stats: bs.getStats(),
Port: int32(bs.port),
Cores: int32(bs.cores),
}
if err := stream.Send(out); err != nil {
return err
}
@ -130,11 +134,13 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er
}
func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) error {
var bc *benchmarkClient
for {
in, err := stream.Recv()
if err == io.EOF {
s.bc.shutdown()
if bc != nil {
bc.shutdown()
}
return nil
}
if err != nil {
@ -145,28 +151,33 @@ func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) er
switch t := in.Argtype.(type) {
case *testpb.ClientArgs_Setup:
grpclog.Printf("client setup received:")
bc, err := startBenchmarkClientWithSetup(t.Setup)
newbc, err := startBenchmarkClientWithSetup(t.Setup)
if err != nil {
return err
}
s.bc = bc
out = &testpb.ClientStatus{
Stats: s.bc.getStats(),
if bc != nil {
grpclog.Printf("client setup received when client already exists, shuting down the existing client")
bc.shutdown()
}
bc = newbc
out = &testpb.ClientStatus{
Stats: bc.getStats(),
}
case *testpb.ClientArgs_Mark:
grpclog.Printf("client mark received:")
grpclog.Printf(" - %v", t)
if s.bc == nil {
if bc == nil {
return grpc.Errorf(codes.InvalidArgument, "client does not exist when mark received")
}
out = &testpb.ClientStatus{
Stats: s.bc.getStats(),
Stats: bc.getStats(),
}
if t.Mark.Reset_ {
s.bc.reset()
bc.reset()
}
}
if err := stream.Send(out); err != nil {
return err
}