control: consolidate code for broadcasting requests to the agent

This commit is contained in:
Anthony Romano 2016-09-22 16:20:05 -07:00
parent f5e4fc7dd0
commit c86e05806e
1 changed files with 41 additions and 85 deletions

View File

@ -112,60 +112,7 @@ func CommandFunc(cmd *cobra.Command, args []string) error {
return nil return nil
} }
func step1(cfg Config) error { func step1(cfg Config) error { return bcastReq(cfg, agent.Request_Start) }
req := cfg.Request()
req.Operation = agent.Request_Start
donec, errc := make(chan struct{}), make(chan error)
for i := range cfg.PeerIPs {
go func(i int) {
nreq := req
nreq.ServerIndex = uint32(i)
nreq.ZookeeperMyID = uint32(i + 1)
ep := cfg.AgentEndpoints[nreq.ServerIndex]
logger.Infof("sending message [index: %d | operation: %q | database: %q | endpoint: %q]", i, req.Operation.String(), req.Database.String(), ep)
conn, err := grpc.Dial(ep, grpc.WithInsecure())
if err != nil {
logger.Errorf("grpc.Dial connecting error (%v) [index: %d | endpoint: %q]", err, i, ep)
errc <- err
return
}
defer conn.Close()
cli := agent.NewTransporterClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) // Consul takes longer
resp, err := cli.Transfer(ctx, &nreq)
cancel()
if err != nil {
logger.Errorf("cli.Transfer error (%v) [index: %d | endpoint: %q]", err, i, ep)
errc <- err
return
}
logger.Infof("got response [index: %d | endpoint: %q | response: %+v]", i, ep, resp)
donec <- struct{}{}
}(i)
time.Sleep(time.Second)
}
cnt := 0
for cnt != len(cfg.PeerIPs) {
select {
case <-donec:
case err := <-errc:
return err
}
cnt++
}
return nil
}
var ( var (
bar *pb.ProgressBar bar *pb.ProgressBar
@ -522,23 +469,49 @@ func step2(cfg Config) error {
return nil return nil
} }
func step3(cfg Config) error { func step3(cfg Config) error { return bcastReq(cfg, agent.Request_Stop) }
func bcastReq(cfg Config, op agent.Request_Operation) error {
req := cfg.Request() req := cfg.Request()
req.Operation = agent.Request_Stop req.Operation = op
donec, errc := make(chan struct{}), make(chan error) donec, errc := make(chan struct{}), make(chan error)
for i := range cfg.PeerIPs { for i := range cfg.PeerIPs {
go func(i int) { go func(i int) {
ep := cfg.AgentEndpoints[i] if err := sendReq(cfg.AgentEndpoints[i], req, i); err != nil {
errc <- err
} else {
donec <- struct{}{}
}
}(i)
time.Sleep(time.Second)
}
var errs []error
for cnt := 0; cnt != len(cfg.PeerIPs); cnt++ {
select {
case <-donec:
case err := <-errc:
errs = append(errs, err)
}
}
if len(errs) > 0 {
return errs[0]
}
return nil
}
func sendReq(ep string, req agent.Request, i int) error {
req.ServerIndex = uint32(i)
req.ZookeeperMyID = uint32(i + 1)
logger.Infof("sending message [index: %d | operation: %q | database: %q | endpoint: %q]", i, req.Operation.String(), req.Database.String(), ep) logger.Infof("sending message [index: %d | operation: %q | database: %q | endpoint: %q]", i, req.Operation.String(), req.Database.String(), ep)
conn, err := grpc.Dial(ep, grpc.WithInsecure()) conn, err := grpc.Dial(ep, grpc.WithInsecure())
if err != nil { if err != nil {
logger.Errorf("grpc.Dial connecting error (%v) [index: %d | endpoint: %q]", err, i, ep) logger.Errorf("grpc.Dial connecting error (%v) [index: %d | endpoint: %q]", err, i, ep)
errc <- err return err
return
} }
defer conn.Close() defer conn.Close()
@ -549,26 +522,9 @@ func step3(cfg Config) error {
cancel() cancel()
if err != nil { if err != nil {
logger.Errorf("cli.Transfer error (%v) [index: %d | endpoint: %q]", err, i, ep) logger.Errorf("cli.Transfer error (%v) [index: %d | endpoint: %q]", err, i, ep)
errc <- err return err
return
} }
logger.Infof("got response [index: %d | endpoint: %q | response: %+v]", i, ep, resp) logger.Infof("got response [index: %d | endpoint: %q | response: %+v]", i, ep, resp)
donec <- struct{}{}
}(i)
time.Sleep(time.Second)
}
cnt := 0
for cnt != len(cfg.PeerIPs) {
select {
case <-donec:
case err := <-errc:
return err
}
cnt++
}
return nil return nil
} }