mirror of https://github.com/etcd-io/dbtester.git
stress: drop old databases
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
parent
93d76f6a66
commit
b5eddb098d
62
stress.go
62
stress.go
|
|
@ -174,13 +174,11 @@ func (cfg *Config) Stress(databaseID string) error {
|
|||
plog.Println("checking total keys on", gcfg.DatabaseEndpoints)
|
||||
var totalKeysFunc func([]string) map[string]int64
|
||||
switch gcfg.DatabaseID {
|
||||
case "etcd__v2_3":
|
||||
totalKeysFunc = getTotalKeysEtcdv2
|
||||
case "etcd__v3_1", "etcd__v3_2", "etcd__tip":
|
||||
totalKeysFunc = getTotalKeysEtcdv3
|
||||
case "zookeeper__r3_4_9", "zookeeper__r3_5_2_alpha", "zookeeper__r3_5_3_beta", "zetcd__beta":
|
||||
totalKeysFunc = getTotalKeysZk
|
||||
case "consul__v0_7_5", "consul__v0_8_0", "consul__v0_8_4", "cetcd__beta":
|
||||
case "consul__v0_8_4", "cetcd__beta":
|
||||
totalKeysFunc = getTotalKeysConsul
|
||||
default:
|
||||
plog.Panicf("%q is unknown database ID", gcfg.DatabaseID)
|
||||
|
|
@ -194,23 +192,6 @@ func (cfg *Config) Stress(databaseID string) error {
|
|||
key, value := sameKey(gcfg.ConfigClientMachineBenchmarkOptions.KeySizeBytes), vals.strings[0]
|
||||
|
||||
switch gcfg.DatabaseID {
|
||||
case "etcd__v2_3":
|
||||
plog.Infof("write started [request: PUT | key: %q | database: %q]", key, gcfg.DatabaseID)
|
||||
var err error
|
||||
for i := 0; i < 7; i++ {
|
||||
clients := mustCreateClientsEtcdv2(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber)
|
||||
_, err = clients[0].Set(context.Background(), key, value, nil)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
plog.Infof("write done [request: PUT | key: %q | database: %q]", key, gcfg.DatabaseID)
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
plog.Errorf("write error [request: PUT | key: %q | database: %q]", key, gcfg.DatabaseID)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
case "etcd__v3_1", "etcd__v3_2", "etcd__tip":
|
||||
plog.Infof("write started [request: PUT | key: %q | database: %q]", key, gcfg.DatabaseID)
|
||||
var err error
|
||||
|
|
@ -251,7 +232,7 @@ func (cfg *Config) Stress(databaseID string) error {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
case "consul__v0_7_5", "consul__v0_8_0", "consul__v0_8_4", "cetcd__beta":
|
||||
case "consul__v0_8_4", "cetcd__beta":
|
||||
plog.Infof("write started [request: PUT | key: %q | database: %q]", key, gcfg.DatabaseID)
|
||||
var err error
|
||||
for i := 0; i < 7; i++ {
|
||||
|
|
@ -282,10 +263,6 @@ func (cfg *Config) Stress(databaseID string) error {
|
|||
plog.Infof("writing key for read-oneshot [key: %q | database: %q]", key, gcfg.DatabaseID)
|
||||
var err error
|
||||
switch gcfg.DatabaseID {
|
||||
case "etcd__v2_3":
|
||||
clients := mustCreateClientsEtcdv2(gcfg.DatabaseEndpoints, 1)
|
||||
_, err = clients[0].Set(context.Background(), key, value, nil)
|
||||
|
||||
case "etcd__v3_1", "etcd__v3_2", "etcd__tip":
|
||||
clients := mustCreateClientsEtcdv3(gcfg.DatabaseEndpoints, etcdv3ClientCfg{
|
||||
totalConns: 1,
|
||||
|
|
@ -299,7 +276,7 @@ func (cfg *Config) Stress(databaseID string) error {
|
|||
_, err = conns[0].Create("/"+key, vals.bytes[0], zkCreateFlags, zkCreateACL)
|
||||
conns[0].Close()
|
||||
|
||||
case "consul__v0_7_5", "consul__v0_8_0", "consul__v0_8_4", "cetcd__beta":
|
||||
case "consul__v0_8_4", "cetcd__beta":
|
||||
clients := mustCreateConnsConsul(gcfg.DatabaseEndpoints, 1)
|
||||
_, err = clients[0].Put(&consulapi.KVPair{Key: key, Value: vals.bytes[0]}, nil)
|
||||
|
||||
|
|
@ -323,11 +300,6 @@ func (cfg *Config) Stress(databaseID string) error {
|
|||
func newReadHandlers(gcfg dbtesterpb.ConfigClientMachineAgentControl) (rhs []ReqHandler, done func()) {
|
||||
rhs = make([]ReqHandler, gcfg.ConfigClientMachineBenchmarkOptions.ClientNumber)
|
||||
switch gcfg.DatabaseID {
|
||||
case "etcd__v2_3":
|
||||
conns := mustCreateClientsEtcdv2(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber)
|
||||
for i := range conns {
|
||||
rhs[i] = newGetEtcd2(conns[i])
|
||||
}
|
||||
case "etcd__v3_1", "etcd__v3_2", "etcd__tip":
|
||||
clients := mustCreateClientsEtcdv3(gcfg.DatabaseEndpoints, etcdv3ClientCfg{
|
||||
totalConns: gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber,
|
||||
|
|
@ -351,7 +323,7 @@ func newReadHandlers(gcfg dbtesterpb.ConfigClientMachineAgentControl) (rhs []Req
|
|||
conns[i].Close()
|
||||
}
|
||||
}
|
||||
case "consul__v0_7_5", "consul__v0_8_0", "consul__v0_8_4", "cetcd__beta":
|
||||
case "consul__v0_8_4", "cetcd__beta":
|
||||
conns := mustCreateConnsConsul(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber)
|
||||
for i := range conns {
|
||||
rhs[i] = newGetConsul(conns[i])
|
||||
|
|
@ -365,11 +337,6 @@ func newReadHandlers(gcfg dbtesterpb.ConfigClientMachineAgentControl) (rhs []Req
|
|||
func newWriteHandlers(gcfg dbtesterpb.ConfigClientMachineAgentControl) (rhs []ReqHandler, done func()) {
|
||||
rhs = make([]ReqHandler, gcfg.ConfigClientMachineBenchmarkOptions.ClientNumber)
|
||||
switch gcfg.DatabaseID {
|
||||
case "etcd__v2_3":
|
||||
conns := mustCreateClientsEtcdv2(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber)
|
||||
for i := range conns {
|
||||
rhs[i] = newPutEtcd2(conns[i])
|
||||
}
|
||||
case "etcd__v3_1", "etcd__v3_2", "etcd__tip":
|
||||
etcdClients := mustCreateClientsEtcdv3(gcfg.DatabaseEndpoints, etcdv3ClientCfg{
|
||||
totalConns: gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber,
|
||||
|
|
@ -420,7 +387,7 @@ func newWriteHandlers(gcfg dbtesterpb.ConfigClientMachineAgentControl) (rhs []Re
|
|||
conns[i].Close()
|
||||
}
|
||||
}
|
||||
case "consul__v0_7_5", "consul__v0_8_0", "consul__v0_8_4", "cetcd__beta":
|
||||
case "consul__v0_8_4", "cetcd__beta":
|
||||
conns := mustCreateConnsConsul(gcfg.DatabaseEndpoints, gcfg.ConfigClientMachineBenchmarkOptions.ConnectionNumber)
|
||||
for i := range conns {
|
||||
rhs[i] = newPutConsul(conns[i])
|
||||
|
|
@ -440,13 +407,6 @@ func newWriteHandlers(gcfg dbtesterpb.ConfigClientMachineAgentControl) (rhs []Re
|
|||
func newReadOneshotHandlers(gcfg dbtesterpb.ConfigClientMachineAgentControl) []ReqHandler {
|
||||
rhs := make([]ReqHandler, gcfg.ConfigClientMachineBenchmarkOptions.ClientNumber)
|
||||
switch gcfg.DatabaseID {
|
||||
case "etcd__v2_3":
|
||||
for i := range rhs {
|
||||
rhs[i] = func(ctx context.Context, req *request) error {
|
||||
conns := mustCreateClientsEtcdv2(gcfg.DatabaseEndpoints, 1)
|
||||
return newGetEtcd2(conns[0])(ctx, req)
|
||||
}
|
||||
}
|
||||
case "etcd__v3_1", "etcd__v3_2", "etcd__tip":
|
||||
for i := range rhs {
|
||||
rhs[i] = func(ctx context.Context, req *request) error {
|
||||
|
|
@ -466,7 +426,7 @@ func newReadOneshotHandlers(gcfg dbtesterpb.ConfigClientMachineAgentControl) []R
|
|||
return newGetZK(conns[0])(ctx, req)
|
||||
}
|
||||
}
|
||||
case "consul__v0_7_5", "consul__v0_8_0", "consul__v0_8_4", "cetcd__beta":
|
||||
case "consul__v0_8_4", "cetcd__beta":
|
||||
for i := range rhs {
|
||||
rhs[i] = func(ctx context.Context, req *request) error {
|
||||
conns := mustCreateConnsConsul(gcfg.DatabaseEndpoints, 1)
|
||||
|
|
@ -496,10 +456,6 @@ func generateReads(gcfg dbtesterpb.ConfigClientMachineAgentControl, key string,
|
|||
}
|
||||
|
||||
switch gcfg.DatabaseID {
|
||||
case "etcd__v2_3":
|
||||
// serializable read by default
|
||||
inflightReqs <- request{etcdv2Op: etcdv2Op{key: key}}
|
||||
|
||||
case "etcd__v3_1", "etcd__v3_2", "etcd__tip":
|
||||
opts := []clientv3.OpOption{clientv3.WithRange("")}
|
||||
if gcfg.ConfigClientMachineBenchmarkOptions.StaleRead {
|
||||
|
|
@ -514,7 +470,7 @@ func generateReads(gcfg dbtesterpb.ConfigClientMachineAgentControl, key string,
|
|||
}
|
||||
inflightReqs <- request{zkOp: op}
|
||||
|
||||
case "consul__v0_7_5", "consul__v0_8_0", "consul__v0_8_4", "cetcd__beta":
|
||||
case "consul__v0_8_4", "cetcd__beta":
|
||||
op := consulOp{key: key}
|
||||
if gcfg.ConfigClientMachineBenchmarkOptions.StaleRead {
|
||||
op.staleRead = true
|
||||
|
|
@ -555,13 +511,11 @@ func generateWrites(gcfg dbtesterpb.ConfigClientMachineAgentControl, startIdx in
|
|||
}
|
||||
|
||||
switch gcfg.DatabaseID {
|
||||
case "etcd__v2_3":
|
||||
inflightReqs <- request{etcdv2Op: etcdv2Op{key: k, value: vs}}
|
||||
case "etcd__v3_1", "etcd__v3_2", "etcd__tip":
|
||||
inflightReqs <- request{etcdv3Op: clientv3.OpPut(k, vs)}
|
||||
case "zookeeper__r3_4_9", "zookeeper__r3_5_2_alpha", "zookeeper__r3_5_3_beta", "zetcd__beta":
|
||||
inflightReqs <- request{zkOp: zkOp{key: "/" + k, value: v}}
|
||||
case "consul__v0_7_5", "consul__v0_8_0", "consul__v0_8_4", "cetcd__beta":
|
||||
case "consul__v0_8_4", "cetcd__beta":
|
||||
inflightReqs <- request{consulOp: consulOp{key: k, value: v}}
|
||||
default:
|
||||
plog.Panicf("%q is unknown database ID", gcfg.DatabaseID)
|
||||
|
|
|
|||
|
|
@ -1,87 +0,0 @@
|
|||
// Copyright 2017 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package dbtester
|
||||
|
||||
import (
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
clientv2 "github.com/coreos/etcd/client"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func mustCreateClientsEtcdv2(endpoints []string, total int64) []clientv2.KeysAPI {
|
||||
cks := make([]clientv2.KeysAPI, total)
|
||||
for i := range cks {
|
||||
endpoint := endpoints[dialTotal%len(endpoints)]
|
||||
dialTotal++
|
||||
|
||||
if !strings.HasPrefix(endpoint, "http://") {
|
||||
endpoint = "http://" + endpoint
|
||||
}
|
||||
|
||||
tr := &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
Dial: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).Dial,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
}
|
||||
cfg := clientv2.Config{
|
||||
Endpoints: []string{endpoint},
|
||||
Transport: tr,
|
||||
HeaderTimeoutPerRequest: time.Second,
|
||||
}
|
||||
c, err := clientv2.New(cfg)
|
||||
if err != nil {
|
||||
plog.Fatal(err)
|
||||
}
|
||||
kapi := clientv2.NewKeysAPI(c)
|
||||
|
||||
cks[i] = kapi
|
||||
}
|
||||
return cks
|
||||
}
|
||||
|
||||
type etcdv2Op struct {
|
||||
key string
|
||||
value string
|
||||
}
|
||||
|
||||
func newPutEtcd2(conn clientv2.KeysAPI) ReqHandler {
|
||||
return func(ctx context.Context, req *request) error {
|
||||
op := req.etcdv2Op
|
||||
_, err := conn.Set(context.Background(), op.key, op.value, nil)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func newGetEtcd2(conn clientv2.KeysAPI) ReqHandler {
|
||||
return func(ctx context.Context, req *request) error {
|
||||
_, err := conn.Get(ctx, req.etcdv2Op.key, nil)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func getTotalKeysEtcdv2(endpoints []string) map[string]int64 {
|
||||
rs := make(map[string]int64)
|
||||
for _, ep := range endpoints {
|
||||
rs[ep] = 0 // not supported in metrics
|
||||
}
|
||||
return rs
|
||||
}
|
||||
Loading…
Reference in New Issue