diff --git a/broadcast_request.go b/broadcast_request.go index 42e225c6..d5cf5325 100644 --- a/broadcast_request.go +++ b/broadcast_request.go @@ -24,8 +24,8 @@ import ( ) // BroadcaseRequest sends request to all endpoints. -func (cfg *Config) BroadcaseRequest(databaseID string, op dbtesterpb.Request_Operation) (map[int]dbtesterpb.Response, error) { - gcfg, ok := cfg.DatabaseIDToTestGroup[databaseID] +func (cfg *Config) BroadcaseRequest(databaseID string, op dbtesterpb.Operation) (map[int]dbtesterpb.Response, error) { + gcfg, ok := cfg.DatabaseIDToConfigClientMachineAgentControl[databaseID] if !ok { return nil, fmt.Errorf("database id %q does not exist", databaseID) } diff --git a/readme.go b/readme.go index 811c1e48..1168d069 100644 --- a/readme.go +++ b/readme.go @@ -1,3 +1,17 @@ +// Copyright 2017 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package dbtester import ( @@ -8,7 +22,7 @@ import ( // WriteREADME writes README. func (cfg *Config) WriteREADME(summary string) error { - plog.Printf("writing README at %q", cfg.README.OutputPath) + plog.Printf("writing README at %q", cfg.ConfigAnalyzeMachineREADME.OutputPath) buf := new(bytes.Buffer) buf.WriteString("\n\n") @@ -33,5 +47,5 @@ func (cfg *Config) WriteREADME(summary string) error { buf.WriteString("\n\n") } - return toFile(buf.String(), cfg.README.OutputPath) + return toFile(buf.String(), cfg.ConfigAnalyzeMachineREADME.OutputPath) } diff --git a/report.go b/report.go index 83b2773f..0a656161 100644 --- a/report.go +++ b/report.go @@ -20,6 +20,7 @@ import ( "time" "github.com/cheggaaa/pb" + "github.com/coreos/dbtester/dbtesterpb" "github.com/coreos/etcd/pkg/report" "golang.org/x/net/context" ) @@ -135,8 +136,8 @@ func printStats(st report.Stats) { } } -func (cfg *Config) generateReport(gcfg TestGroup, h []ReqHandler, reqDone func(), reqGen func(chan<- request)) { - b := newBenchmark(gcfg.RequestNumber, gcfg.ClientNumber, h, reqDone, reqGen) +func (cfg *Config) generateReport(gcfg dbtesterpb.ConfigClientMachineAgentControl, h []ReqHandler, reqDone func(), reqGen func(chan<- request)) { + b := newBenchmark(gcfg.ConfigClientMachineBenchmarkOptions.RequestNumber, gcfg.ConfigClientMachineBenchmarkOptions.ClientNumber, h, reqDone, reqGen) b.startRequests() b.waitAll() diff --git a/report_save_upload.go b/report_save_upload.go index 81fdd4f7..18165144 100644 --- a/report_save_upload.go +++ b/report_save_upload.go @@ -38,7 +38,7 @@ var DiskSpaceUsageSummaryColumns = []string{ // SaveDiskSpaceUsageSummary saves data size summary. func (cfg *Config) SaveDiskSpaceUsageSummary(databaseID string, idxToResponse map[int]dbtesterpb.Response) error { - gcfg, ok := cfg.DatabaseIDToTestGroup[databaseID] + gcfg, ok := cfg.DatabaseIDToConfigClientMachineAgentControl[databaseID] if !ok { return fmt.Errorf("%q does not exist", databaseID) } @@ -68,7 +68,7 @@ func (cfg *Config) SaveDiskSpaceUsageSummary(databaseID string, idxToResponse ma return err } - return fr.CSV(cfg.Control.ServerDiskSpaceUsageSummaryPath) + return fr.CSV(cfg.ConfigClientMachineInitial.ServerDiskSpaceUsageSummaryPath) } func (cfg *Config) saveDataLatencyDistributionSummary(st report.Stats) { @@ -126,7 +126,7 @@ func (cfg *Config) saveDataLatencyDistributionSummary(st report.Stats) { } } - if err := fr.CSVHorizontal(cfg.Control.ClientLatencyDistributionSummaryPath); err != nil { + if err := fr.CSVHorizontal(cfg.ConfigClientMachineInitial.ClientLatencyDistributionSummaryPath); err != nil { plog.Fatal(err) } } @@ -152,7 +152,7 @@ func (cfg *Config) saveDataLatencyDistributionPercentile(st report.Stats) { if err := fr.AddColumn(c2); err != nil { plog.Fatal(err) } - if err := fr.CSV(cfg.Control.ClientLatencyDistributionPercentilePath); err != nil { + if err := fr.CSV(cfg.ConfigClientMachineInitial.ClientLatencyDistributionPercentilePath); err != nil { plog.Fatal(err) } } @@ -205,16 +205,16 @@ func (cfg *Config) saveDataLatencyDistributionAll(st report.Stats) { if err := fr.AddColumn(c2); err != nil { plog.Fatal(err) } - if err := fr.CSV(cfg.Control.ClientLatencyDistributionAllPath); err != nil { + if err := fr.CSV(cfg.ConfigClientMachineInitial.ClientLatencyDistributionAllPath); err != nil { plog.Fatal(err) } } -func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg TestGroup, st report.Stats, clientNs []int64) { - if len(clientNs) == 0 && len(gcfg.ConnectionClientNumbers) == 0 { +func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg dbtesterpb.ConfigClientMachineAgentControl, st report.Stats, clientNs []int64) { + if len(clientNs) == 0 && len(gcfg.ConfigClientMachineBenchmarkOptions.ConnectionClientNumbers) == 0 { clientNs = make([]int64, len(st.TimeSeries)) for i := range clientNs { - clientNs[i] = gcfg.BenchmarkOptions.ClientNumber + clientNs[i] = gcfg.ConfigClientMachineBenchmarkOptions.ClientNumber } } c1 := dataframe.NewColumn("UNIX-SECOND") @@ -253,12 +253,12 @@ func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg TestGroup, st report plog.Fatal(err) } - if err := fr.CSV(cfg.Control.ClientLatencyThroughputTimeseriesPath); err != nil { + if err := fr.CSV(cfg.ConfigClientMachineInitial.ClientLatencyThroughputTimeseriesPath); err != nil { plog.Fatal(err) } // aggregate latency by the number of keys - tss := FindRangesLatency(st.TimeSeries, 1000, gcfg.RequestNumber) + tss := FindRangesLatency(st.TimeSeries, 1000, gcfg.ConfigClientMachineBenchmarkOptions.RequestNumber) ctt1 := dataframe.NewColumn("KEYS") ctt2 := dataframe.NewColumn("MIN-LATENCY-MS") ctt3 := dataframe.NewColumn("AVG-LATENCY-MS") @@ -284,12 +284,12 @@ func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg TestGroup, st report plog.Fatal(err) } - if err := frr.CSV(cfg.Control.ClientLatencyByKeyNumberPath); err != nil { + if err := frr.CSV(cfg.ConfigClientMachineInitial.ClientLatencyByKeyNumberPath); err != nil { plog.Fatal(err) } } -func (cfg *Config) saveAllStats(gcfg TestGroup, stats report.Stats, clientNs []int64) { +func (cfg *Config) saveAllStats(gcfg dbtesterpb.ConfigClientMachineAgentControl, stats report.Stats, clientNs []int64) { cfg.saveDataLatencyDistributionSummary(stats) cfg.saveDataLatencyDistributionPercentile(stats) cfg.saveDataLatencyDistributionAll(stats) @@ -298,14 +298,14 @@ func (cfg *Config) saveAllStats(gcfg TestGroup, stats report.Stats, clientNs []i // UploadToGoogle uploads target file to Google Cloud Storage. func (cfg *Config) UploadToGoogle(databaseID string, targetPath string) error { - gcfg, ok := cfg.DatabaseIDToTestGroup[databaseID] + gcfg, ok := cfg.DatabaseIDToConfigClientMachineAgentControl[databaseID] if !ok { return fmt.Errorf("%q does not exist", databaseID) } if !exist(targetPath) { return fmt.Errorf("%q does not exist", targetPath) } - u, err := remotestorage.NewGoogleCloudStorage([]byte(cfg.Control.GoogleCloudStorageKey), cfg.Control.GoogleCloudProjectName) + u, err := remotestorage.NewGoogleCloudStorage([]byte(cfg.ConfigClientMachineInitial.GoogleCloudStorageKey), cfg.ConfigClientMachineInitial.GoogleCloudProjectName) if err != nil { return err } @@ -315,11 +315,11 @@ func (cfg *Config) UploadToGoogle(databaseID string, targetPath string) error { if !strings.HasPrefix(dstPath, gcfg.DatabaseTag) { dstPath = fmt.Sprintf("%s-%s", gcfg.DatabaseTag, dstPath) } - dstPath = filepath.Join(cfg.Control.GoogleCloudStorageSubDirectory, dstPath) + dstPath = filepath.Join(cfg.ConfigClientMachineInitial.GoogleCloudStorageSubDirectory, dstPath) var uerr error for k := 0; k < 30; k++ { - if uerr = u.UploadFile(cfg.Control.GoogleCloudStorageBucketName, srcPath, dstPath); uerr != nil { + if uerr = u.UploadFile(cfg.ConfigClientMachineInitial.GoogleCloudStorageBucketName, srcPath, dstPath); uerr != nil { plog.Printf("#%d: error %v while uploading %q", k, uerr, targetPath) time.Sleep(2 * time.Second) continue diff --git a/stress.go b/stress.go index 0f248fdc..0405b7f9 100644 --- a/stress.go +++ b/stress.go @@ -36,8 +36,8 @@ type values struct { sampleSize int } -func newValues(gcfg TestGroup) (v values, rerr error) { - v.bytes = [][]byte{randBytes(gcfg.BenchmarkOptions.ValueSizeBytes)} +func newValues(gcfg dbtesterpb.ConfigClientMachineAgentControl) (v values, rerr error) { + v.bytes = [][]byte{randBytes(gcfg.ConfigClientMachineBenchmarkOptions.ValueSizeBytes)} v.strings = []string{string(v.bytes[0])} v.sampleSize = 1 return @@ -45,7 +45,7 @@ func newValues(gcfg TestGroup) (v values, rerr error) { // Stress stresses the database. func (cfg *Config) Stress(databaseID string) error { - gcfg, ok := cfg.DatabaseIDToTestGroup[databaseID] + gcfg, ok := cfg.DatabaseIDToConfigClientMachineAgentControl[databaseID] if !ok { return fmt.Errorf("%q does not exist", databaseID) } @@ -55,40 +55,40 @@ func (cfg *Config) Stress(databaseID string) error { return err } - switch gcfg.BenchmarkOptions.Type { + switch gcfg.ConfigClientMachineBenchmarkOptions.Type { case "write": plog.Println("write generateReport is started...") // fixed number of client numbers - if len(gcfg.BenchmarkOptions.ConnectionClientNumbers) == 0 { + if len(gcfg.ConfigClientMachineBenchmarkOptions.ConnectionClientNumbers) == 0 { h, done := newWriteHandlers(gcfg) reqGen := func(inflightReqs chan<- request) { generateWrites(gcfg, 0, vals, inflightReqs) } cfg.generateReport(gcfg, h, done, reqGen) } else { // variable client numbers - rs := assignRequest(gcfg.BenchmarkOptions.ConnectionClientNumbers, gcfg.BenchmarkOptions.RequestNumber) + rs := assignRequest(gcfg.ConfigClientMachineBenchmarkOptions.ConnectionClientNumbers, gcfg.ConfigClientMachineBenchmarkOptions.RequestNumber) var stats []report.Stats reqCompleted := int64(0) for i := 0; i < len(rs); i++ { copied := gcfg - copied.BenchmarkOptions.ConnectionNumber = gcfg.BenchmarkOptions.ConnectionClientNumbers[i] - copied.BenchmarkOptions.ClientNumber = gcfg.BenchmarkOptions.ConnectionClientNumbers[i] - copied.BenchmarkOptions.RequestNumber = rs[i] + copied.ConfigClientMachineBenchmarkOptions.ConnectionNumber = gcfg.ConfigClientMachineBenchmarkOptions.ConnectionClientNumbers[i] + copied.ConfigClientMachineBenchmarkOptions.ClientNumber = gcfg.ConfigClientMachineBenchmarkOptions.ConnectionClientNumbers[i] + copied.ConfigClientMachineBenchmarkOptions.RequestNumber = rs[i] ncfg := *cfg - ncfg.DatabaseIDToTestGroup[databaseID] = copied + ncfg.DatabaseIDToConfigClientMachineAgentControl[databaseID] = copied go func() { - plog.Infof("signaling agent with client number %d", copied.BenchmarkOptions.ClientNumber) - if _, err := (&ncfg).BroadcaseRequest(databaseID, dbtesterpb.Request_Heartbeat); err != nil { + plog.Infof("signaling agent with client number %d", copied.ConfigClientMachineBenchmarkOptions.ClientNumber) + if _, err := (&ncfg).BroadcaseRequest(databaseID, dbtesterpb.Operation_Heartbeat); err != nil { plog.Panic(err) } }() h, done := newWriteHandlers(copied) reqGen := func(inflightReqs chan<- request) { generateWrites(copied, reqCompleted, vals, inflightReqs) } - b := newBenchmark(copied.BenchmarkOptions.RequestNumber, copied.BenchmarkOptions.ClientNumber, h, done, reqGen) + b := newBenchmark(copied.ConfigClientMachineBenchmarkOptions.RequestNumber, copied.ConfigClientMachineBenchmarkOptions.ClientNumber, h, done, reqGen) // wait until rs[i] requests are finished // do not end reports yet @@ -106,7 +106,7 @@ func (cfg *Config) Stress(databaseID string) error { plog.Info("combining all reports") combined := report.Stats{ErrorDist: make(map[string]int)} - combinedClientNumber := make([]int64, 0, gcfg.BenchmarkOptions.RequestNumber) + combinedClientNumber := make([]int64, 0, gcfg.ConfigClientMachineBenchmarkOptions.RequestNumber) for i, st := range stats { combined.AvgTotal += st.AvgTotal combined.Total += st.Total @@ -128,7 +128,7 @@ func (cfg *Config) Stress(databaseID string) error { // So now we have two duplicate unix time seconds. // This will be handled in aggregating by keys. // - clientN := gcfg.BenchmarkOptions.ConnectionClientNumbers[i] + clientN := gcfg.ConfigClientMachineBenchmarkOptions.ConnectionClientNumbers[i] clientNs := make([]int64, len(st.TimeSeries)) for i := range st.TimeSeries { clientNs[i] = clientN @@ -184,18 +184,18 @@ func (cfg *Config) Stress(databaseID string) error { } for k, v := range totalKeysFunc(gcfg.DatabaseEndpoints) { plog.Infof("expected write total results [expected_total: %d | database: %q | endpoint: %q | number_of_keys: %d]", - gcfg.BenchmarkOptions.RequestNumber, gcfg.DatabaseID, k, v) + gcfg.ConfigClientMachineBenchmarkOptions.RequestNumber, gcfg.DatabaseID, k, v) } case "read": - key, value := sameKey(gcfg.BenchmarkOptions.KeySizeBytes), vals.strings[0] + key, value := sameKey(gcfg.ConfigClientMachineBenchmarkOptions.KeySizeBytes), vals.strings[0] switch gcfg.DatabaseID { case "etcdv2": plog.Infof("write started [request: PUT | key: %q | database: %q]", key, gcfg.DatabaseID) var err error for i := 0; i < 7; i++ { - clients := mustCreateClientsEtcdv2(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) + clients := mustCreateClientsEtcdv2(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber) _, err = clients[0].Set(context.Background(), key, value, nil) if err != nil { continue @@ -232,7 +232,7 @@ func (cfg *Config) Stress(databaseID string) error { plog.Infof("write started [request: PUT | key: %q | database: %q]", key, gcfg.DatabaseID) var err error for i := 0; i < 7; i++ { - conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) + conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber) _, err = conns[0].Create("/"+key, vals.bytes[0], zkCreateFlags, zkCreateACL) if err != nil { continue @@ -252,7 +252,7 @@ func (cfg *Config) Stress(databaseID string) error { plog.Infof("write started [request: PUT | key: %q | database: %q]", key, gcfg.DatabaseID) var err error for i := 0; i < 7; i++ { - clients := mustCreateConnsConsul(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) + clients := mustCreateConnsConsul(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber) _, err = clients[0].Put(&consulapi.KVPair{Key: key, Value: vals.bytes[0]}, nil) if err != nil { continue @@ -272,7 +272,7 @@ func (cfg *Config) Stress(databaseID string) error { plog.Println("read generateReport is finished...") case "read-oneshot": - key, value := sameKey(gcfg.BenchmarkOptions.KeySizeBytes), vals.strings[0] + key, value := sameKey(gcfg.ConfigClientMachineBenchmarkOptions.KeySizeBytes), vals.strings[0] plog.Infof("writing key for read-oneshot [key: %q | database: %q]", key, gcfg.DatabaseID) var err error switch gcfg.DatabaseID { @@ -311,18 +311,18 @@ func (cfg *Config) Stress(databaseID string) error { return nil } -func newReadHandlers(gcfg TestGroup) (rhs []ReqHandler, done func()) { - rhs = make([]ReqHandler, gcfg.BenchmarkOptions.ClientNumber) +func newReadHandlers(gcfg dbtesterpb.ConfigClientMachineAgentControl) (rhs []ReqHandler, done func()) { + rhs = make([]ReqHandler, gcfg.ConfigClientMachineBenchmarkOptions.ClientNumber) switch gcfg.DatabaseID { case "etcdv2": - conns := mustCreateClientsEtcdv2(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) + conns := mustCreateClientsEtcdv2(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber) for i := range conns { rhs[i] = newGetEtcd2(conns[i]) } case "etcdv3", "etcdtip": clients := mustCreateClientsEtcdv3(gcfg.DatabaseEndpoints, etcdv3ClientCfg{ - totalConns: gcfg.BenchmarkOptions.ConnectionNumber, - totalClients: gcfg.BenchmarkOptions.ClientNumber, + totalConns: gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber, + totalClients: gcfg.ConfigClientMachineBenchmarkOptions.ClientNumber, }) for i := range clients { rhs[i] = newGetEtcd3(clients[i].KV) @@ -333,7 +333,7 @@ func newReadHandlers(gcfg TestGroup) (rhs []ReqHandler, done func()) { } } case "zookeeper", "zetcd": - conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) + conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber) for i := range conns { rhs[i] = newGetZK(conns[i]) } @@ -343,7 +343,7 @@ func newReadHandlers(gcfg TestGroup) (rhs []ReqHandler, done func()) { } } case "consul", "cetcd": - conns := mustCreateConnsConsul(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) + conns := mustCreateConnsConsul(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber) for i := range conns { rhs[i] = newGetConsul(conns[i]) } @@ -351,18 +351,18 @@ func newReadHandlers(gcfg TestGroup) (rhs []ReqHandler, done func()) { return rhs, done } -func newWriteHandlers(gcfg TestGroup) (rhs []ReqHandler, done func()) { - rhs = make([]ReqHandler, gcfg.BenchmarkOptions.ClientNumber) +func newWriteHandlers(gcfg dbtesterpb.ConfigClientMachineAgentControl) (rhs []ReqHandler, done func()) { + rhs = make([]ReqHandler, gcfg.ConfigClientMachineBenchmarkOptions.ClientNumber) switch gcfg.DatabaseID { case "etcdv2": - conns := mustCreateClientsEtcdv2(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) + conns := mustCreateClientsEtcdv2(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber) for i := range conns { rhs[i] = newPutEtcd2(conns[i]) } case "etcdv3", "etcdtip": etcdClients := mustCreateClientsEtcdv3(gcfg.DatabaseEndpoints, etcdv3ClientCfg{ - totalConns: gcfg.BenchmarkOptions.ConnectionNumber, - totalClients: gcfg.BenchmarkOptions.ClientNumber, + totalConns: gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber, + totalClients: gcfg.ConfigClientMachineBenchmarkOptions.ClientNumber, }) for i := range etcdClients { rhs[i] = newPutEtcd3(etcdClients[i]) @@ -373,13 +373,13 @@ func newWriteHandlers(gcfg TestGroup) (rhs []ReqHandler, done func()) { } } case "zookeeper", "zetcd": - if gcfg.BenchmarkOptions.SameKey { - key := sameKey(gcfg.BenchmarkOptions.KeySizeBytes) - valueBts := randBytes(gcfg.BenchmarkOptions.ValueSizeBytes) + if gcfg.ConfigClientMachineBenchmarkOptions.SameKey { + key := sameKey(gcfg.ConfigClientMachineBenchmarkOptions.KeySizeBytes) + valueBts := randBytes(gcfg.ConfigClientMachineBenchmarkOptions.ValueSizeBytes) plog.Infof("write started [request: PUT | key: %q | database: %q]", key, gcfg.DatabaseID) var err error for i := 0; i < 7; i++ { - conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) + conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber) _, err = conns[0].Create("/"+key, valueBts, zkCreateFlags, zkCreateACL) if err != nil { continue @@ -396,9 +396,9 @@ func newWriteHandlers(gcfg TestGroup) (rhs []ReqHandler, done func()) { } } - conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) + conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber) for i := range conns { - if gcfg.BenchmarkOptions.SameKey { + if gcfg.ConfigClientMachineBenchmarkOptions.SameKey { rhs[i] = newPutOverwriteZK(conns[i]) } else { rhs[i] = newPutCreateZK(conns[i]) @@ -410,7 +410,7 @@ func newWriteHandlers(gcfg TestGroup) (rhs []ReqHandler, done func()) { } } case "consul", "cetcd": - conns := mustCreateConnsConsul(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) + conns := mustCreateConnsConsul(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber) for i := range conns { rhs[i] = newPutConsul(conns[i]) } @@ -424,8 +424,8 @@ func newWriteHandlers(gcfg TestGroup) (rhs []ReqHandler, done func()) { return } -func newReadOneshotHandlers(gcfg TestGroup) []ReqHandler { - rhs := make([]ReqHandler, gcfg.BenchmarkOptions.ClientNumber) +func newReadOneshotHandlers(gcfg dbtesterpb.ConfigClientMachineAgentControl) []ReqHandler { + rhs := make([]ReqHandler, gcfg.ConfigClientMachineBenchmarkOptions.ClientNumber) switch gcfg.DatabaseID { case "etcdv2": for i := range rhs { @@ -448,7 +448,7 @@ func newReadOneshotHandlers(gcfg TestGroup) []ReqHandler { case "zookeeper", "zetcd": for i := range rhs { rhs[i] = func(ctx context.Context, req *request) error { - conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) + conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber) defer conns[0].Close() return newGetZK(conns[0])(ctx, req) } @@ -464,18 +464,18 @@ func newReadOneshotHandlers(gcfg TestGroup) []ReqHandler { return rhs } -func generateReads(gcfg TestGroup, key string, inflightReqs chan<- request) { +func generateReads(gcfg dbtesterpb.ConfigClientMachineAgentControl, key string, inflightReqs chan<- request) { defer close(inflightReqs) var rateLimiter *rate.Limiter - if gcfg.BenchmarkOptions.RateLimitRequestsPerSecond > 0 { + if gcfg.ConfigClientMachineBenchmarkOptions.RateLimitRequestsPerSecond > 0 { rateLimiter = rate.NewLimiter( - rate.Limit(gcfg.BenchmarkOptions.RateLimitRequestsPerSecond), - int(gcfg.BenchmarkOptions.RateLimitRequestsPerSecond), + rate.Limit(gcfg.ConfigClientMachineBenchmarkOptions.RateLimitRequestsPerSecond), + int(gcfg.ConfigClientMachineBenchmarkOptions.RateLimitRequestsPerSecond), ) } - for i := int64(0); i < gcfg.BenchmarkOptions.RequestNumber; i++ { + for i := int64(0); i < gcfg.ConfigClientMachineBenchmarkOptions.RequestNumber; i++ { if rateLimiter != nil { rateLimiter.Wait(context.TODO()) } @@ -487,21 +487,21 @@ func generateReads(gcfg TestGroup, key string, inflightReqs chan<- request) { case "etcdv3", "etcdtip": opts := []clientv3.OpOption{clientv3.WithRange("")} - if gcfg.BenchmarkOptions.StaleRead { + if gcfg.ConfigClientMachineBenchmarkOptions.StaleRead { opts = append(opts, clientv3.WithSerializable()) } inflightReqs <- request{etcdv3Op: clientv3.OpGet(key, opts...)} case "zookeeper", "zetcd": op := zkOp{key: key} - if gcfg.BenchmarkOptions.StaleRead { + if gcfg.ConfigClientMachineBenchmarkOptions.StaleRead { op.staleRead = true } inflightReqs <- request{zkOp: op} case "consul", "cetcd": op := consulOp{key: key} - if gcfg.BenchmarkOptions.StaleRead { + if gcfg.ConfigClientMachineBenchmarkOptions.StaleRead { op.staleRead = true } inflightReqs <- request{consulOp: op} @@ -509,12 +509,12 @@ func generateReads(gcfg TestGroup, key string, inflightReqs chan<- request) { } } -func generateWrites(gcfg TestGroup, startIdx int64, vals values, inflightReqs chan<- request) { +func generateWrites(gcfg dbtesterpb.ConfigClientMachineAgentControl, startIdx int64, vals values, inflightReqs chan<- request) { var rateLimiter *rate.Limiter - if gcfg.BenchmarkOptions.RateLimitRequestsPerSecond > 0 { + if gcfg.ConfigClientMachineBenchmarkOptions.RateLimitRequestsPerSecond > 0 { rateLimiter = rate.NewLimiter( - rate.Limit(gcfg.BenchmarkOptions.RateLimitRequestsPerSecond), - int(gcfg.BenchmarkOptions.RateLimitRequestsPerSecond), + rate.Limit(gcfg.ConfigClientMachineBenchmarkOptions.RateLimitRequestsPerSecond), + int(gcfg.ConfigClientMachineBenchmarkOptions.RateLimitRequestsPerSecond), ) } @@ -524,10 +524,10 @@ func generateWrites(gcfg TestGroup, startIdx int64, vals values, inflightReqs ch wg.Wait() }() - for i := int64(0); i < gcfg.BenchmarkOptions.RequestNumber; i++ { - k := sequentialKey(gcfg.BenchmarkOptions.KeySizeBytes, i+startIdx) - if gcfg.BenchmarkOptions.SameKey { - k = sameKey(gcfg.BenchmarkOptions.KeySizeBytes) + for i := int64(0); i < gcfg.ConfigClientMachineBenchmarkOptions.RequestNumber; i++ { + k := sequentialKey(gcfg.ConfigClientMachineBenchmarkOptions.KeySizeBytes, i+startIdx) + if gcfg.ConfigClientMachineBenchmarkOptions.SameKey { + k = sameKey(gcfg.ConfigClientMachineBenchmarkOptions.KeySizeBytes) } v := vals.bytes[i%int64(vals.sampleSize)]