diff --git a/control/control.go b/control/control.go index 57ae3920..fe09ee25 100644 --- a/control/control.go +++ b/control/control.go @@ -62,6 +62,7 @@ func CommandFunc(cmd *cobra.Command, args []string) error { switch cfg.Step2.BenchType { case "write": case "read": + case "read-oneshot": default: return fmt.Errorf("%q is not supported", cfg.Step2.BenchType) } @@ -114,12 +115,6 @@ func CommandFunc(cmd *cobra.Command, args []string) error { func step1(cfg Config) error { return bcastReq(cfg, agent.Request_Start) } -var ( - bar *pb.ProgressBar - results chan result - wg sync.WaitGroup -) - type values struct { bytes [][]byte strings []string @@ -152,6 +147,38 @@ func newValues(cfg Config) (v values, rerr error) { return } +func generateReport(cfg Config, h []ReqHandler, reqGen func(chan<- request)) { + var wg sync.WaitGroup + results := make(chan result) + requests := make(chan request, cfg.Step2.Clients) + bar := pb.New(cfg.Step2.TotalRequests) + pdoneC := printReport(results, cfg) + bar.Format("Bom !") + bar.Start() + for i := range h { + wg.Add(1) + go func(rh ReqHandler) { + defer wg.Done() + for req := range requests { + st := time.Now() + err := rh(context.Background(), &req) + var errStr string + if err != nil { + errStr = err.Error() + } + results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()} + bar.Increment() + } + }(h[i]) + } + go reqGen(requests) + wg.Wait() + bar.Finish() + + close(results) + <-pdoneC +} + func step2(cfg Config) error { vals, err := newValues(cfg) if err != nil { @@ -159,32 +186,12 @@ func step2(cfg Config) error { } switch cfg.Step2.BenchType { case "write": - results = make(chan result) - requests := make(chan request, cfg.Step2.Clients) - bar = pb.New(cfg.Step2.TotalRequests) - h, done := newWriteHandlers(cfg) if done != nil { defer done() } - - pdoneC := printReport(results, cfg) - - bar.Format("Bom !") - bar.Start() - for i := range h { - wg.Add(1) - go func(rh ReqHandler) { - defer wg.Done() - doHandler(context.Background(), rh, requests) - }(h[i]) - } - go generateWrites(cfg, vals, requests) - wg.Wait() - bar.Finish() - - close(results) - <-pdoneC + reqGen := func(reqs chan<- request) { generateWrites(cfg, vals, reqs) } + generateReport(cfg, h, reqGen) var totalKeysFunc func([]string) map[string]int64 switch cfg.Database { @@ -282,34 +289,43 @@ func step2(cfg Config) error { } } - results = make(chan result) - requests := make(chan request, cfg.Step2.Clients) - h, done := newReadHandlers(cfg) if done != nil { defer done() } - - bar = pb.New(cfg.Step2.TotalRequests) - bar.Format("Bom !") - bar.Start() - - for i := range h { - wg.Add(1) - go func(rh ReqHandler) { - defer wg.Done() - doHandler(context.Background(), rh, requests) - }(h[i]) + reqGen := func(reqs chan<- request) { generateReads(cfg, key, reqs) } + generateReport(cfg, h, reqGen) + case "read-oneshot": + key, value := sameKey(cfg.Step2.KeySize), vals.strings[0] + logger.Infof("writing key for read-oneshot [key: %q | database: %q]", key, cfg.Database) + var err error + switch cfg.Database { + case "etcdv2": + clients := mustCreateClientsEtcdv2(cfg.DatabaseEndpoints, 1) + _, err = clients[0].Set(context.Background(), key, value, nil) + case "etcdv3": + clients := mustCreateClientsEtcdv3(cfg.DatabaseEndpoints, etcdv3ClientCfg{ + totalConns: 1, + totalClients: 1, + }) + _, err = clients[0].Do(context.Background(), clientv3.OpPut(key, value)) + clients[0].Close() + case "zk", "zookeeper": + conns := mustCreateConnsZk(cfg.DatabaseEndpoints, 1) + _, err = conns[0].Create("/"+key, vals.bytes[0], zkCreateFlags, zkCreateAcl) + conns[0].Close() + case "consul": + clients := mustCreateConnsConsul(cfg.DatabaseEndpoints, 1) + _, err = clients[0].Put(&consulapi.KVPair{Key: key, Value: vals.bytes[0]}, nil) + } + if err != nil { + logger.Errorf("write error on read-oneshot (%v)", err) + os.Exit(1) } - pdoneC := printReport(results, cfg) - go generateReads(cfg, key, requests) - wg.Wait() - - bar.Finish() - - close(results) - <-pdoneC + h := newReadOneshotHandlers(cfg) + reqGen := func(reqs chan<- request) { generateReads(cfg, key, reqs) } + generateReport(cfg, h, reqGen) } return nil @@ -377,7 +393,6 @@ func sendReq(ep string, req agent.Request, i int) error { func newReadHandlers(cfg Config) (rhs []ReqHandler, done func()) { rhs = make([]ReqHandler, cfg.Step2.Clients) - switch cfg.Database { case "etcdv2": conns := mustCreateClientsEtcdv2(cfg.DatabaseEndpoints, cfg.Step2.Connections) @@ -414,7 +429,7 @@ func newReadHandlers(cfg Config) (rhs []ReqHandler, done func()) { rhs[i] = newGetConsul(conns[i]) } } - return + return rhs, done } func newWriteHandlers(cfg Config) (rhs []ReqHandler, done func()) { @@ -485,7 +500,47 @@ func newWriteHandlers(cfg Config) (rhs []ReqHandler, done func()) { return } -func generateReads(cfg Config, key string, requests chan request) { +func newReadOneshotHandlers(cfg Config) []ReqHandler { + rhs := make([]ReqHandler, cfg.Step2.Clients) + switch cfg.Database { + case "etcdv2": + for i := range rhs { + rhs[i] = func(ctx context.Context, req *request) error { + conns := mustCreateClientsEtcdv2(cfg.DatabaseEndpoints, 1) + return newGetEtcd2(conns[0])(ctx, req) + } + } + case "etcdv3": + for i := range rhs { + rhs[i] = func(ctx context.Context, req *request) error { + conns := mustCreateClientsEtcdv3(cfg.DatabaseEndpoints, etcdv3ClientCfg{ + totalConns: 1, + totalClients: 1, + }) + defer conns[0].Close() + return newGetEtcd3(conns[0])(ctx, req) + } + } + case "zk", "zookeeper": + for i := range rhs { + rhs[i] = func(ctx context.Context, req *request) error { + conns := mustCreateConnsZk(cfg.DatabaseEndpoints, cfg.Step2.Connections) + defer conns[0].Close() + return newGetZK(conns[0])(ctx, req) + } + } + case "consul": + for i := range rhs { + rhs[i] = func(ctx context.Context, req *request) error { + conns := mustCreateConnsConsul(cfg.DatabaseEndpoints, 1) + return newGetConsul(conns[0])(ctx, req) + } + } + } + return rhs +} + +func generateReads(cfg Config, key string, requests chan<- request) { defer close(requests) for i := 0; i < cfg.Step2.TotalRequests; i++ { @@ -515,10 +570,12 @@ func generateReads(cfg Config, key string, requests chan request) { } } - -func generateWrites(cfg Config, vals values, requests chan request) { - defer close(requests) - +func generateWrites(cfg Config, vals values, requests chan<- request) { + var wg sync.WaitGroup + defer func() { + close(requests) + wg.Wait() + }() for i := 0; i < cfg.Step2.TotalRequests; i++ { if cfg.Database == "etcdv3" && cfg.Step2.Etcdv3CompactionCycle > 0 && i%cfg.Step2.Etcdv3CompactionCycle == 0 { logger.Infof("starting compaction [index: %d | database: %q]", i, "etcdv3") diff --git a/control/util.go b/control/util.go index 9b7c5570..28addbd1 100644 --- a/control/util.go +++ b/control/util.go @@ -186,19 +186,6 @@ func mustRandBytes(n int) []byte { return rb } -func doHandler(ctx context.Context, f ReqHandler, requests <-chan request) { - for req := range requests { - st := time.Now() - err := f(ctx, &req) - var errStr string - if err != nil { - errStr = err.Error() - } - results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()} - bar.Increment() - } -} - func getTotalKeysEtcdv2(endpoints []string) map[string]int64 { rs := make(map[string]int64) for _, ep := range endpoints {