dbtester: update others with config proto updates

This commit is contained in:
Gyu-Ho Lee 2017-02-23 08:03:24 -08:00
parent dc4f09b0b1
commit b73c62d7fb
No known key found for this signature in database
GPG Key ID: 1DDD39C7EB70C24C
5 changed files with 96 additions and 81 deletions

View File

@ -24,8 +24,8 @@ import (
) )
// BroadcaseRequest sends request to all endpoints. // BroadcaseRequest sends request to all endpoints.
func (cfg *Config) BroadcaseRequest(databaseID string, op dbtesterpb.Request_Operation) (map[int]dbtesterpb.Response, error) { func (cfg *Config) BroadcaseRequest(databaseID string, op dbtesterpb.Operation) (map[int]dbtesterpb.Response, error) {
gcfg, ok := cfg.DatabaseIDToTestGroup[databaseID] gcfg, ok := cfg.DatabaseIDToConfigClientMachineAgentControl[databaseID]
if !ok { if !ok {
return nil, fmt.Errorf("database id %q does not exist", databaseID) return nil, fmt.Errorf("database id %q does not exist", databaseID)
} }

View File

@ -1,3 +1,17 @@
// Copyright 2017 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dbtester package dbtester
import ( import (
@ -8,7 +22,7 @@ import (
// WriteREADME writes README. // WriteREADME writes README.
func (cfg *Config) WriteREADME(summary string) error { func (cfg *Config) WriteREADME(summary string) error {
plog.Printf("writing README at %q", cfg.README.OutputPath) plog.Printf("writing README at %q", cfg.ConfigAnalyzeMachineREADME.OutputPath)
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
buf.WriteString("\n\n") buf.WriteString("\n\n")
@ -33,5 +47,5 @@ func (cfg *Config) WriteREADME(summary string) error {
buf.WriteString("\n\n") buf.WriteString("\n\n")
} }
return toFile(buf.String(), cfg.README.OutputPath) return toFile(buf.String(), cfg.ConfigAnalyzeMachineREADME.OutputPath)
} }

View File

@ -20,6 +20,7 @@ import (
"time" "time"
"github.com/cheggaaa/pb" "github.com/cheggaaa/pb"
"github.com/coreos/dbtester/dbtesterpb"
"github.com/coreos/etcd/pkg/report" "github.com/coreos/etcd/pkg/report"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
@ -135,8 +136,8 @@ func printStats(st report.Stats) {
} }
} }
func (cfg *Config) generateReport(gcfg TestGroup, h []ReqHandler, reqDone func(), reqGen func(chan<- request)) { func (cfg *Config) generateReport(gcfg dbtesterpb.ConfigClientMachineAgentControl, h []ReqHandler, reqDone func(), reqGen func(chan<- request)) {
b := newBenchmark(gcfg.RequestNumber, gcfg.ClientNumber, h, reqDone, reqGen) b := newBenchmark(gcfg.ConfigClientMachineBenchmarkOptions.RequestNumber, gcfg.ConfigClientMachineBenchmarkOptions.ClientNumber, h, reqDone, reqGen)
b.startRequests() b.startRequests()
b.waitAll() b.waitAll()

View File

@ -38,7 +38,7 @@ var DiskSpaceUsageSummaryColumns = []string{
// SaveDiskSpaceUsageSummary saves data size summary. // SaveDiskSpaceUsageSummary saves data size summary.
func (cfg *Config) SaveDiskSpaceUsageSummary(databaseID string, idxToResponse map[int]dbtesterpb.Response) error { func (cfg *Config) SaveDiskSpaceUsageSummary(databaseID string, idxToResponse map[int]dbtesterpb.Response) error {
gcfg, ok := cfg.DatabaseIDToTestGroup[databaseID] gcfg, ok := cfg.DatabaseIDToConfigClientMachineAgentControl[databaseID]
if !ok { if !ok {
return fmt.Errorf("%q does not exist", databaseID) return fmt.Errorf("%q does not exist", databaseID)
} }
@ -68,7 +68,7 @@ func (cfg *Config) SaveDiskSpaceUsageSummary(databaseID string, idxToResponse ma
return err return err
} }
return fr.CSV(cfg.Control.ServerDiskSpaceUsageSummaryPath) return fr.CSV(cfg.ConfigClientMachineInitial.ServerDiskSpaceUsageSummaryPath)
} }
func (cfg *Config) saveDataLatencyDistributionSummary(st report.Stats) { func (cfg *Config) saveDataLatencyDistributionSummary(st report.Stats) {
@ -126,7 +126,7 @@ func (cfg *Config) saveDataLatencyDistributionSummary(st report.Stats) {
} }
} }
if err := fr.CSVHorizontal(cfg.Control.ClientLatencyDistributionSummaryPath); err != nil { if err := fr.CSVHorizontal(cfg.ConfigClientMachineInitial.ClientLatencyDistributionSummaryPath); err != nil {
plog.Fatal(err) plog.Fatal(err)
} }
} }
@ -152,7 +152,7 @@ func (cfg *Config) saveDataLatencyDistributionPercentile(st report.Stats) {
if err := fr.AddColumn(c2); err != nil { if err := fr.AddColumn(c2); err != nil {
plog.Fatal(err) plog.Fatal(err)
} }
if err := fr.CSV(cfg.Control.ClientLatencyDistributionPercentilePath); err != nil { if err := fr.CSV(cfg.ConfigClientMachineInitial.ClientLatencyDistributionPercentilePath); err != nil {
plog.Fatal(err) plog.Fatal(err)
} }
} }
@ -205,16 +205,16 @@ func (cfg *Config) saveDataLatencyDistributionAll(st report.Stats) {
if err := fr.AddColumn(c2); err != nil { if err := fr.AddColumn(c2); err != nil {
plog.Fatal(err) plog.Fatal(err)
} }
if err := fr.CSV(cfg.Control.ClientLatencyDistributionAllPath); err != nil { if err := fr.CSV(cfg.ConfigClientMachineInitial.ClientLatencyDistributionAllPath); err != nil {
plog.Fatal(err) plog.Fatal(err)
} }
} }
func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg TestGroup, st report.Stats, clientNs []int64) { func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg dbtesterpb.ConfigClientMachineAgentControl, st report.Stats, clientNs []int64) {
if len(clientNs) == 0 && len(gcfg.ConnectionClientNumbers) == 0 { if len(clientNs) == 0 && len(gcfg.ConfigClientMachineBenchmarkOptions.ConnectionClientNumbers) == 0 {
clientNs = make([]int64, len(st.TimeSeries)) clientNs = make([]int64, len(st.TimeSeries))
for i := range clientNs { for i := range clientNs {
clientNs[i] = gcfg.BenchmarkOptions.ClientNumber clientNs[i] = gcfg.ConfigClientMachineBenchmarkOptions.ClientNumber
} }
} }
c1 := dataframe.NewColumn("UNIX-SECOND") c1 := dataframe.NewColumn("UNIX-SECOND")
@ -253,12 +253,12 @@ func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg TestGroup, st report
plog.Fatal(err) plog.Fatal(err)
} }
if err := fr.CSV(cfg.Control.ClientLatencyThroughputTimeseriesPath); err != nil { if err := fr.CSV(cfg.ConfigClientMachineInitial.ClientLatencyThroughputTimeseriesPath); err != nil {
plog.Fatal(err) plog.Fatal(err)
} }
// aggregate latency by the number of keys // aggregate latency by the number of keys
tss := FindRangesLatency(st.TimeSeries, 1000, gcfg.RequestNumber) tss := FindRangesLatency(st.TimeSeries, 1000, gcfg.ConfigClientMachineBenchmarkOptions.RequestNumber)
ctt1 := dataframe.NewColumn("KEYS") ctt1 := dataframe.NewColumn("KEYS")
ctt2 := dataframe.NewColumn("MIN-LATENCY-MS") ctt2 := dataframe.NewColumn("MIN-LATENCY-MS")
ctt3 := dataframe.NewColumn("AVG-LATENCY-MS") ctt3 := dataframe.NewColumn("AVG-LATENCY-MS")
@ -284,12 +284,12 @@ func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg TestGroup, st report
plog.Fatal(err) plog.Fatal(err)
} }
if err := frr.CSV(cfg.Control.ClientLatencyByKeyNumberPath); err != nil { if err := frr.CSV(cfg.ConfigClientMachineInitial.ClientLatencyByKeyNumberPath); err != nil {
plog.Fatal(err) plog.Fatal(err)
} }
} }
func (cfg *Config) saveAllStats(gcfg TestGroup, stats report.Stats, clientNs []int64) { func (cfg *Config) saveAllStats(gcfg dbtesterpb.ConfigClientMachineAgentControl, stats report.Stats, clientNs []int64) {
cfg.saveDataLatencyDistributionSummary(stats) cfg.saveDataLatencyDistributionSummary(stats)
cfg.saveDataLatencyDistributionPercentile(stats) cfg.saveDataLatencyDistributionPercentile(stats)
cfg.saveDataLatencyDistributionAll(stats) cfg.saveDataLatencyDistributionAll(stats)
@ -298,14 +298,14 @@ func (cfg *Config) saveAllStats(gcfg TestGroup, stats report.Stats, clientNs []i
// UploadToGoogle uploads target file to Google Cloud Storage. // UploadToGoogle uploads target file to Google Cloud Storage.
func (cfg *Config) UploadToGoogle(databaseID string, targetPath string) error { func (cfg *Config) UploadToGoogle(databaseID string, targetPath string) error {
gcfg, ok := cfg.DatabaseIDToTestGroup[databaseID] gcfg, ok := cfg.DatabaseIDToConfigClientMachineAgentControl[databaseID]
if !ok { if !ok {
return fmt.Errorf("%q does not exist", databaseID) return fmt.Errorf("%q does not exist", databaseID)
} }
if !exist(targetPath) { if !exist(targetPath) {
return fmt.Errorf("%q does not exist", targetPath) return fmt.Errorf("%q does not exist", targetPath)
} }
u, err := remotestorage.NewGoogleCloudStorage([]byte(cfg.Control.GoogleCloudStorageKey), cfg.Control.GoogleCloudProjectName) u, err := remotestorage.NewGoogleCloudStorage([]byte(cfg.ConfigClientMachineInitial.GoogleCloudStorageKey), cfg.ConfigClientMachineInitial.GoogleCloudProjectName)
if err != nil { if err != nil {
return err return err
} }
@ -315,11 +315,11 @@ func (cfg *Config) UploadToGoogle(databaseID string, targetPath string) error {
if !strings.HasPrefix(dstPath, gcfg.DatabaseTag) { if !strings.HasPrefix(dstPath, gcfg.DatabaseTag) {
dstPath = fmt.Sprintf("%s-%s", gcfg.DatabaseTag, dstPath) dstPath = fmt.Sprintf("%s-%s", gcfg.DatabaseTag, dstPath)
} }
dstPath = filepath.Join(cfg.Control.GoogleCloudStorageSubDirectory, dstPath) dstPath = filepath.Join(cfg.ConfigClientMachineInitial.GoogleCloudStorageSubDirectory, dstPath)
var uerr error var uerr error
for k := 0; k < 30; k++ { for k := 0; k < 30; k++ {
if uerr = u.UploadFile(cfg.Control.GoogleCloudStorageBucketName, srcPath, dstPath); uerr != nil { if uerr = u.UploadFile(cfg.ConfigClientMachineInitial.GoogleCloudStorageBucketName, srcPath, dstPath); uerr != nil {
plog.Printf("#%d: error %v while uploading %q", k, uerr, targetPath) plog.Printf("#%d: error %v while uploading %q", k, uerr, targetPath)
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
continue continue

118
stress.go
View File

@ -36,8 +36,8 @@ type values struct {
sampleSize int sampleSize int
} }
func newValues(gcfg TestGroup) (v values, rerr error) { func newValues(gcfg dbtesterpb.ConfigClientMachineAgentControl) (v values, rerr error) {
v.bytes = [][]byte{randBytes(gcfg.BenchmarkOptions.ValueSizeBytes)} v.bytes = [][]byte{randBytes(gcfg.ConfigClientMachineBenchmarkOptions.ValueSizeBytes)}
v.strings = []string{string(v.bytes[0])} v.strings = []string{string(v.bytes[0])}
v.sampleSize = 1 v.sampleSize = 1
return return
@ -45,7 +45,7 @@ func newValues(gcfg TestGroup) (v values, rerr error) {
// Stress stresses the database. // Stress stresses the database.
func (cfg *Config) Stress(databaseID string) error { func (cfg *Config) Stress(databaseID string) error {
gcfg, ok := cfg.DatabaseIDToTestGroup[databaseID] gcfg, ok := cfg.DatabaseIDToConfigClientMachineAgentControl[databaseID]
if !ok { if !ok {
return fmt.Errorf("%q does not exist", databaseID) return fmt.Errorf("%q does not exist", databaseID)
} }
@ -55,40 +55,40 @@ func (cfg *Config) Stress(databaseID string) error {
return err return err
} }
switch gcfg.BenchmarkOptions.Type { switch gcfg.ConfigClientMachineBenchmarkOptions.Type {
case "write": case "write":
plog.Println("write generateReport is started...") plog.Println("write generateReport is started...")
// fixed number of client numbers // fixed number of client numbers
if len(gcfg.BenchmarkOptions.ConnectionClientNumbers) == 0 { if len(gcfg.ConfigClientMachineBenchmarkOptions.ConnectionClientNumbers) == 0 {
h, done := newWriteHandlers(gcfg) h, done := newWriteHandlers(gcfg)
reqGen := func(inflightReqs chan<- request) { generateWrites(gcfg, 0, vals, inflightReqs) } reqGen := func(inflightReqs chan<- request) { generateWrites(gcfg, 0, vals, inflightReqs) }
cfg.generateReport(gcfg, h, done, reqGen) cfg.generateReport(gcfg, h, done, reqGen)
} else { } else {
// variable client numbers // variable client numbers
rs := assignRequest(gcfg.BenchmarkOptions.ConnectionClientNumbers, gcfg.BenchmarkOptions.RequestNumber) rs := assignRequest(gcfg.ConfigClientMachineBenchmarkOptions.ConnectionClientNumbers, gcfg.ConfigClientMachineBenchmarkOptions.RequestNumber)
var stats []report.Stats var stats []report.Stats
reqCompleted := int64(0) reqCompleted := int64(0)
for i := 0; i < len(rs); i++ { for i := 0; i < len(rs); i++ {
copied := gcfg copied := gcfg
copied.BenchmarkOptions.ConnectionNumber = gcfg.BenchmarkOptions.ConnectionClientNumbers[i] copied.ConfigClientMachineBenchmarkOptions.ConnectionNumber = gcfg.ConfigClientMachineBenchmarkOptions.ConnectionClientNumbers[i]
copied.BenchmarkOptions.ClientNumber = gcfg.BenchmarkOptions.ConnectionClientNumbers[i] copied.ConfigClientMachineBenchmarkOptions.ClientNumber = gcfg.ConfigClientMachineBenchmarkOptions.ConnectionClientNumbers[i]
copied.BenchmarkOptions.RequestNumber = rs[i] copied.ConfigClientMachineBenchmarkOptions.RequestNumber = rs[i]
ncfg := *cfg ncfg := *cfg
ncfg.DatabaseIDToTestGroup[databaseID] = copied ncfg.DatabaseIDToConfigClientMachineAgentControl[databaseID] = copied
go func() { go func() {
plog.Infof("signaling agent with client number %d", copied.BenchmarkOptions.ClientNumber) plog.Infof("signaling agent with client number %d", copied.ConfigClientMachineBenchmarkOptions.ClientNumber)
if _, err := (&ncfg).BroadcaseRequest(databaseID, dbtesterpb.Request_Heartbeat); err != nil { if _, err := (&ncfg).BroadcaseRequest(databaseID, dbtesterpb.Operation_Heartbeat); err != nil {
plog.Panic(err) plog.Panic(err)
} }
}() }()
h, done := newWriteHandlers(copied) h, done := newWriteHandlers(copied)
reqGen := func(inflightReqs chan<- request) { generateWrites(copied, reqCompleted, vals, inflightReqs) } reqGen := func(inflightReqs chan<- request) { generateWrites(copied, reqCompleted, vals, inflightReqs) }
b := newBenchmark(copied.BenchmarkOptions.RequestNumber, copied.BenchmarkOptions.ClientNumber, h, done, reqGen) b := newBenchmark(copied.ConfigClientMachineBenchmarkOptions.RequestNumber, copied.ConfigClientMachineBenchmarkOptions.ClientNumber, h, done, reqGen)
// wait until rs[i] requests are finished // wait until rs[i] requests are finished
// do not end reports yet // do not end reports yet
@ -106,7 +106,7 @@ func (cfg *Config) Stress(databaseID string) error {
plog.Info("combining all reports") plog.Info("combining all reports")
combined := report.Stats{ErrorDist: make(map[string]int)} combined := report.Stats{ErrorDist: make(map[string]int)}
combinedClientNumber := make([]int64, 0, gcfg.BenchmarkOptions.RequestNumber) combinedClientNumber := make([]int64, 0, gcfg.ConfigClientMachineBenchmarkOptions.RequestNumber)
for i, st := range stats { for i, st := range stats {
combined.AvgTotal += st.AvgTotal combined.AvgTotal += st.AvgTotal
combined.Total += st.Total combined.Total += st.Total
@ -128,7 +128,7 @@ func (cfg *Config) Stress(databaseID string) error {
// So now we have two duplicate unix time seconds. // So now we have two duplicate unix time seconds.
// This will be handled in aggregating by keys. // This will be handled in aggregating by keys.
// //
clientN := gcfg.BenchmarkOptions.ConnectionClientNumbers[i] clientN := gcfg.ConfigClientMachineBenchmarkOptions.ConnectionClientNumbers[i]
clientNs := make([]int64, len(st.TimeSeries)) clientNs := make([]int64, len(st.TimeSeries))
for i := range st.TimeSeries { for i := range st.TimeSeries {
clientNs[i] = clientN clientNs[i] = clientN
@ -184,18 +184,18 @@ func (cfg *Config) Stress(databaseID string) error {
} }
for k, v := range totalKeysFunc(gcfg.DatabaseEndpoints) { for k, v := range totalKeysFunc(gcfg.DatabaseEndpoints) {
plog.Infof("expected write total results [expected_total: %d | database: %q | endpoint: %q | number_of_keys: %d]", plog.Infof("expected write total results [expected_total: %d | database: %q | endpoint: %q | number_of_keys: %d]",
gcfg.BenchmarkOptions.RequestNumber, gcfg.DatabaseID, k, v) gcfg.ConfigClientMachineBenchmarkOptions.RequestNumber, gcfg.DatabaseID, k, v)
} }
case "read": case "read":
key, value := sameKey(gcfg.BenchmarkOptions.KeySizeBytes), vals.strings[0] key, value := sameKey(gcfg.ConfigClientMachineBenchmarkOptions.KeySizeBytes), vals.strings[0]
switch gcfg.DatabaseID { switch gcfg.DatabaseID {
case "etcdv2": case "etcdv2":
plog.Infof("write started [request: PUT | key: %q | database: %q]", key, gcfg.DatabaseID) plog.Infof("write started [request: PUT | key: %q | database: %q]", key, gcfg.DatabaseID)
var err error var err error
for i := 0; i < 7; i++ { for i := 0; i < 7; i++ {
clients := mustCreateClientsEtcdv2(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) clients := mustCreateClientsEtcdv2(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber)
_, err = clients[0].Set(context.Background(), key, value, nil) _, err = clients[0].Set(context.Background(), key, value, nil)
if err != nil { if err != nil {
continue continue
@ -232,7 +232,7 @@ func (cfg *Config) Stress(databaseID string) error {
plog.Infof("write started [request: PUT | key: %q | database: %q]", key, gcfg.DatabaseID) plog.Infof("write started [request: PUT | key: %q | database: %q]", key, gcfg.DatabaseID)
var err error var err error
for i := 0; i < 7; i++ { for i := 0; i < 7; i++ {
conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber)
_, err = conns[0].Create("/"+key, vals.bytes[0], zkCreateFlags, zkCreateACL) _, err = conns[0].Create("/"+key, vals.bytes[0], zkCreateFlags, zkCreateACL)
if err != nil { if err != nil {
continue continue
@ -252,7 +252,7 @@ func (cfg *Config) Stress(databaseID string) error {
plog.Infof("write started [request: PUT | key: %q | database: %q]", key, gcfg.DatabaseID) plog.Infof("write started [request: PUT | key: %q | database: %q]", key, gcfg.DatabaseID)
var err error var err error
for i := 0; i < 7; i++ { for i := 0; i < 7; i++ {
clients := mustCreateConnsConsul(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) clients := mustCreateConnsConsul(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber)
_, err = clients[0].Put(&consulapi.KVPair{Key: key, Value: vals.bytes[0]}, nil) _, err = clients[0].Put(&consulapi.KVPair{Key: key, Value: vals.bytes[0]}, nil)
if err != nil { if err != nil {
continue continue
@ -272,7 +272,7 @@ func (cfg *Config) Stress(databaseID string) error {
plog.Println("read generateReport is finished...") plog.Println("read generateReport is finished...")
case "read-oneshot": case "read-oneshot":
key, value := sameKey(gcfg.BenchmarkOptions.KeySizeBytes), vals.strings[0] key, value := sameKey(gcfg.ConfigClientMachineBenchmarkOptions.KeySizeBytes), vals.strings[0]
plog.Infof("writing key for read-oneshot [key: %q | database: %q]", key, gcfg.DatabaseID) plog.Infof("writing key for read-oneshot [key: %q | database: %q]", key, gcfg.DatabaseID)
var err error var err error
switch gcfg.DatabaseID { switch gcfg.DatabaseID {
@ -311,18 +311,18 @@ func (cfg *Config) Stress(databaseID string) error {
return nil return nil
} }
func newReadHandlers(gcfg TestGroup) (rhs []ReqHandler, done func()) { func newReadHandlers(gcfg dbtesterpb.ConfigClientMachineAgentControl) (rhs []ReqHandler, done func()) {
rhs = make([]ReqHandler, gcfg.BenchmarkOptions.ClientNumber) rhs = make([]ReqHandler, gcfg.ConfigClientMachineBenchmarkOptions.ClientNumber)
switch gcfg.DatabaseID { switch gcfg.DatabaseID {
case "etcdv2": case "etcdv2":
conns := mustCreateClientsEtcdv2(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) conns := mustCreateClientsEtcdv2(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber)
for i := range conns { for i := range conns {
rhs[i] = newGetEtcd2(conns[i]) rhs[i] = newGetEtcd2(conns[i])
} }
case "etcdv3", "etcdtip": case "etcdv3", "etcdtip":
clients := mustCreateClientsEtcdv3(gcfg.DatabaseEndpoints, etcdv3ClientCfg{ clients := mustCreateClientsEtcdv3(gcfg.DatabaseEndpoints, etcdv3ClientCfg{
totalConns: gcfg.BenchmarkOptions.ConnectionNumber, totalConns: gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber,
totalClients: gcfg.BenchmarkOptions.ClientNumber, totalClients: gcfg.ConfigClientMachineBenchmarkOptions.ClientNumber,
}) })
for i := range clients { for i := range clients {
rhs[i] = newGetEtcd3(clients[i].KV) rhs[i] = newGetEtcd3(clients[i].KV)
@ -333,7 +333,7 @@ func newReadHandlers(gcfg TestGroup) (rhs []ReqHandler, done func()) {
} }
} }
case "zookeeper", "zetcd": case "zookeeper", "zetcd":
conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber)
for i := range conns { for i := range conns {
rhs[i] = newGetZK(conns[i]) rhs[i] = newGetZK(conns[i])
} }
@ -343,7 +343,7 @@ func newReadHandlers(gcfg TestGroup) (rhs []ReqHandler, done func()) {
} }
} }
case "consul", "cetcd": case "consul", "cetcd":
conns := mustCreateConnsConsul(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) conns := mustCreateConnsConsul(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber)
for i := range conns { for i := range conns {
rhs[i] = newGetConsul(conns[i]) rhs[i] = newGetConsul(conns[i])
} }
@ -351,18 +351,18 @@ func newReadHandlers(gcfg TestGroup) (rhs []ReqHandler, done func()) {
return rhs, done return rhs, done
} }
func newWriteHandlers(gcfg TestGroup) (rhs []ReqHandler, done func()) { func newWriteHandlers(gcfg dbtesterpb.ConfigClientMachineAgentControl) (rhs []ReqHandler, done func()) {
rhs = make([]ReqHandler, gcfg.BenchmarkOptions.ClientNumber) rhs = make([]ReqHandler, gcfg.ConfigClientMachineBenchmarkOptions.ClientNumber)
switch gcfg.DatabaseID { switch gcfg.DatabaseID {
case "etcdv2": case "etcdv2":
conns := mustCreateClientsEtcdv2(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) conns := mustCreateClientsEtcdv2(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber)
for i := range conns { for i := range conns {
rhs[i] = newPutEtcd2(conns[i]) rhs[i] = newPutEtcd2(conns[i])
} }
case "etcdv3", "etcdtip": case "etcdv3", "etcdtip":
etcdClients := mustCreateClientsEtcdv3(gcfg.DatabaseEndpoints, etcdv3ClientCfg{ etcdClients := mustCreateClientsEtcdv3(gcfg.DatabaseEndpoints, etcdv3ClientCfg{
totalConns: gcfg.BenchmarkOptions.ConnectionNumber, totalConns: gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber,
totalClients: gcfg.BenchmarkOptions.ClientNumber, totalClients: gcfg.ConfigClientMachineBenchmarkOptions.ClientNumber,
}) })
for i := range etcdClients { for i := range etcdClients {
rhs[i] = newPutEtcd3(etcdClients[i]) rhs[i] = newPutEtcd3(etcdClients[i])
@ -373,13 +373,13 @@ func newWriteHandlers(gcfg TestGroup) (rhs []ReqHandler, done func()) {
} }
} }
case "zookeeper", "zetcd": case "zookeeper", "zetcd":
if gcfg.BenchmarkOptions.SameKey { if gcfg.ConfigClientMachineBenchmarkOptions.SameKey {
key := sameKey(gcfg.BenchmarkOptions.KeySizeBytes) key := sameKey(gcfg.ConfigClientMachineBenchmarkOptions.KeySizeBytes)
valueBts := randBytes(gcfg.BenchmarkOptions.ValueSizeBytes) valueBts := randBytes(gcfg.ConfigClientMachineBenchmarkOptions.ValueSizeBytes)
plog.Infof("write started [request: PUT | key: %q | database: %q]", key, gcfg.DatabaseID) plog.Infof("write started [request: PUT | key: %q | database: %q]", key, gcfg.DatabaseID)
var err error var err error
for i := 0; i < 7; i++ { for i := 0; i < 7; i++ {
conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber)
_, err = conns[0].Create("/"+key, valueBts, zkCreateFlags, zkCreateACL) _, err = conns[0].Create("/"+key, valueBts, zkCreateFlags, zkCreateACL)
if err != nil { if err != nil {
continue continue
@ -396,9 +396,9 @@ func newWriteHandlers(gcfg TestGroup) (rhs []ReqHandler, done func()) {
} }
} }
conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber)
for i := range conns { for i := range conns {
if gcfg.BenchmarkOptions.SameKey { if gcfg.ConfigClientMachineBenchmarkOptions.SameKey {
rhs[i] = newPutOverwriteZK(conns[i]) rhs[i] = newPutOverwriteZK(conns[i])
} else { } else {
rhs[i] = newPutCreateZK(conns[i]) rhs[i] = newPutCreateZK(conns[i])
@ -410,7 +410,7 @@ func newWriteHandlers(gcfg TestGroup) (rhs []ReqHandler, done func()) {
} }
} }
case "consul", "cetcd": case "consul", "cetcd":
conns := mustCreateConnsConsul(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) conns := mustCreateConnsConsul(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber)
for i := range conns { for i := range conns {
rhs[i] = newPutConsul(conns[i]) rhs[i] = newPutConsul(conns[i])
} }
@ -424,8 +424,8 @@ func newWriteHandlers(gcfg TestGroup) (rhs []ReqHandler, done func()) {
return return
} }
func newReadOneshotHandlers(gcfg TestGroup) []ReqHandler { func newReadOneshotHandlers(gcfg dbtesterpb.ConfigClientMachineAgentControl) []ReqHandler {
rhs := make([]ReqHandler, gcfg.BenchmarkOptions.ClientNumber) rhs := make([]ReqHandler, gcfg.ConfigClientMachineBenchmarkOptions.ClientNumber)
switch gcfg.DatabaseID { switch gcfg.DatabaseID {
case "etcdv2": case "etcdv2":
for i := range rhs { for i := range rhs {
@ -448,7 +448,7 @@ func newReadOneshotHandlers(gcfg TestGroup) []ReqHandler {
case "zookeeper", "zetcd": case "zookeeper", "zetcd":
for i := range rhs { for i := range rhs {
rhs[i] = func(ctx context.Context, req *request) error { rhs[i] = func(ctx context.Context, req *request) error {
conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.BenchmarkOptions.ConnectionNumber) conns := mustCreateConnsZk(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber)
defer conns[0].Close() defer conns[0].Close()
return newGetZK(conns[0])(ctx, req) return newGetZK(conns[0])(ctx, req)
} }
@ -464,18 +464,18 @@ func newReadOneshotHandlers(gcfg TestGroup) []ReqHandler {
return rhs return rhs
} }
func generateReads(gcfg TestGroup, key string, inflightReqs chan<- request) { func generateReads(gcfg dbtesterpb.ConfigClientMachineAgentControl, key string, inflightReqs chan<- request) {
defer close(inflightReqs) defer close(inflightReqs)
var rateLimiter *rate.Limiter var rateLimiter *rate.Limiter
if gcfg.BenchmarkOptions.RateLimitRequestsPerSecond > 0 { if gcfg.ConfigClientMachineBenchmarkOptions.RateLimitRequestsPerSecond > 0 {
rateLimiter = rate.NewLimiter( rateLimiter = rate.NewLimiter(
rate.Limit(gcfg.BenchmarkOptions.RateLimitRequestsPerSecond), rate.Limit(gcfg.ConfigClientMachineBenchmarkOptions.RateLimitRequestsPerSecond),
int(gcfg.BenchmarkOptions.RateLimitRequestsPerSecond), int(gcfg.ConfigClientMachineBenchmarkOptions.RateLimitRequestsPerSecond),
) )
} }
for i := int64(0); i < gcfg.BenchmarkOptions.RequestNumber; i++ { for i := int64(0); i < gcfg.ConfigClientMachineBenchmarkOptions.RequestNumber; i++ {
if rateLimiter != nil { if rateLimiter != nil {
rateLimiter.Wait(context.TODO()) rateLimiter.Wait(context.TODO())
} }
@ -487,21 +487,21 @@ func generateReads(gcfg TestGroup, key string, inflightReqs chan<- request) {
case "etcdv3", "etcdtip": case "etcdv3", "etcdtip":
opts := []clientv3.OpOption{clientv3.WithRange("")} opts := []clientv3.OpOption{clientv3.WithRange("")}
if gcfg.BenchmarkOptions.StaleRead { if gcfg.ConfigClientMachineBenchmarkOptions.StaleRead {
opts = append(opts, clientv3.WithSerializable()) opts = append(opts, clientv3.WithSerializable())
} }
inflightReqs <- request{etcdv3Op: clientv3.OpGet(key, opts...)} inflightReqs <- request{etcdv3Op: clientv3.OpGet(key, opts...)}
case "zookeeper", "zetcd": case "zookeeper", "zetcd":
op := zkOp{key: key} op := zkOp{key: key}
if gcfg.BenchmarkOptions.StaleRead { if gcfg.ConfigClientMachineBenchmarkOptions.StaleRead {
op.staleRead = true op.staleRead = true
} }
inflightReqs <- request{zkOp: op} inflightReqs <- request{zkOp: op}
case "consul", "cetcd": case "consul", "cetcd":
op := consulOp{key: key} op := consulOp{key: key}
if gcfg.BenchmarkOptions.StaleRead { if gcfg.ConfigClientMachineBenchmarkOptions.StaleRead {
op.staleRead = true op.staleRead = true
} }
inflightReqs <- request{consulOp: op} inflightReqs <- request{consulOp: op}
@ -509,12 +509,12 @@ func generateReads(gcfg TestGroup, key string, inflightReqs chan<- request) {
} }
} }
func generateWrites(gcfg TestGroup, startIdx int64, vals values, inflightReqs chan<- request) { func generateWrites(gcfg dbtesterpb.ConfigClientMachineAgentControl, startIdx int64, vals values, inflightReqs chan<- request) {
var rateLimiter *rate.Limiter var rateLimiter *rate.Limiter
if gcfg.BenchmarkOptions.RateLimitRequestsPerSecond > 0 { if gcfg.ConfigClientMachineBenchmarkOptions.RateLimitRequestsPerSecond > 0 {
rateLimiter = rate.NewLimiter( rateLimiter = rate.NewLimiter(
rate.Limit(gcfg.BenchmarkOptions.RateLimitRequestsPerSecond), rate.Limit(gcfg.ConfigClientMachineBenchmarkOptions.RateLimitRequestsPerSecond),
int(gcfg.BenchmarkOptions.RateLimitRequestsPerSecond), int(gcfg.ConfigClientMachineBenchmarkOptions.RateLimitRequestsPerSecond),
) )
} }
@ -524,10 +524,10 @@ func generateWrites(gcfg TestGroup, startIdx int64, vals values, inflightReqs ch
wg.Wait() wg.Wait()
}() }()
for i := int64(0); i < gcfg.BenchmarkOptions.RequestNumber; i++ { for i := int64(0); i < gcfg.ConfigClientMachineBenchmarkOptions.RequestNumber; i++ {
k := sequentialKey(gcfg.BenchmarkOptions.KeySizeBytes, i+startIdx) k := sequentialKey(gcfg.ConfigClientMachineBenchmarkOptions.KeySizeBytes, i+startIdx)
if gcfg.BenchmarkOptions.SameKey { if gcfg.ConfigClientMachineBenchmarkOptions.SameKey {
k = sameKey(gcfg.BenchmarkOptions.KeySizeBytes) k = sameKey(gcfg.ConfigClientMachineBenchmarkOptions.KeySizeBytes)
} }
v := vals.bytes[i%int64(vals.sampleSize)] v := vals.bytes[i%int64(vals.sampleSize)]