control: clean up flags, change stale_read

This commit is contained in:
Gyu-Ho Lee 2016-11-10 10:43:16 -08:00
parent 672995f7dd
commit d6d051c915
5 changed files with 33 additions and 28 deletions

View File

@ -39,12 +39,9 @@ type Config struct {
AgentEndpoints []string
DatabaseEndpoints []string
// snappy
EtcdCompression string `yaml:"etcd_compression"`
// https://zookeeper.apache.org/doc/trunk/zookeeperAdmin.html
Step1 struct {
Skip bool `yaml:"skip"`
Skip bool `yaml:"skip"`
ZookeeperMaxClientCnxns int64 `yaml:"zookeeper_max_client_connections"`
ZookeeperSnapCount int64 `yaml:"zookeeper_snap_count"`
} `yaml:"step1"`
@ -52,7 +49,7 @@ type Config struct {
Step2 struct {
Skip bool `yaml:"skip"`
BenchType string `yaml:"bench_type"`
LocalRead bool `yaml:"local_read"`
StaleRead bool `yaml:"stale_read"`
ResultPath string `yaml:"result_path"`
Connections int `yaml:"connections"`
Clients int `yaml:"clients"`
@ -71,6 +68,12 @@ type Config struct {
}
}
var (
defaultZookeeperMaxClientCnxns int64 = 5000
defaultZookeeperSnapCount int64 = 100000
)
// ReadConfig reads control configuration file.
func ReadConfig(fpath string) (Config, error) {
bts, err := ioutil.ReadFile(fpath)
if err != nil {
@ -80,10 +83,19 @@ func ReadConfig(fpath string) (Config, error) {
if err := yaml.Unmarshal(bts, &rs); err != nil {
return Config{}, err
}
if rs.Step1.ZookeeperMaxClientCnxns == 0 {
rs.Step1.ZookeeperMaxClientCnxns = defaultZookeeperMaxClientCnxns
}
if rs.Step1.ZookeeperSnapCount == 0 {
rs.Step1.ZookeeperSnapCount = defaultZookeeperSnapCount
}
return rs, nil
}
func (cfg *Config) Request() agent.Request {
// ToRequest converts control configuration to agent RPC.
func (cfg *Config) ToRequest() agent.Request {
req := agent.Request{}
req.TestName = cfg.TestName
@ -111,7 +123,6 @@ func (cfg *Config) Request() agent.Request {
req.ZookeeperMaxClientCnxns = cfg.Step1.ZookeeperMaxClientCnxns
req.ZookeeperSnapCount = cfg.Step1.ZookeeperSnapCount
// req.EtcdCompression = cfg.EtcdCompression
return req
}

View File

@ -39,9 +39,6 @@ func TestReadConfig(t *testing.T) {
if c.DatabasePort != 2379 {
t.Fatalf("unexpected %d", c.DatabasePort)
}
if c.EtcdCompression != "snappy" {
t.Fatalf("unexpected %q", c.EtcdCompression)
}
if c.GoogleCloudProjectName != "etcd-development" {
t.Fatalf("unexpected %s", c.GoogleCloudProjectName)
}
@ -84,8 +81,8 @@ func TestReadConfig(t *testing.T) {
if c.Step2.Connections != 100 {
t.Fatalf("unexpected %d", c.Step2.Connections)
}
if !c.Step2.LocalRead {
t.Fatalf("unexpected %v", c.Step2.LocalRead)
if !c.Step2.StaleRead {
t.Fatalf("unexpected %v", c.Step2.StaleRead)
}
if c.Step2.TotalRequests != 3000000 {
t.Fatalf("unexpected %d", c.Step2.TotalRequests)
@ -93,6 +90,9 @@ func TestReadConfig(t *testing.T) {
if c.Step2.RequestIntervalMs != 100 {
t.Fatalf("unexpected %d", c.Step2.RequestIntervalMs)
}
if c.Step2.Etcdv3CompactionCycle != 100 {
t.Fatalf("unexpected %d", c.Step2.Etcdv3CompactionCycle)
}
if c.Step3.Skip {
t.Fatalf("unexpected %v", c.Step3.Skip)
}

View File

@ -237,7 +237,6 @@ func step2(cfg Config) error {
clients := mustCreateClientsEtcdv3(cfg.DatabaseEndpoints, etcdv3ClientCfg{
totalConns: 1,
totalClients: 1,
// compressionTypeTxt: cfg.EtcdCompression,
})
_, err = clients[0].Do(context.Background(), clientv3.OpPut(key, value))
if err != nil {
@ -303,6 +302,7 @@ func step2(cfg Config) error {
case "etcdv2":
clients := mustCreateClientsEtcdv2(cfg.DatabaseEndpoints, 1)
_, err = clients[0].Set(context.Background(), key, value, nil)
case "etcdv3":
clients := mustCreateClientsEtcdv3(cfg.DatabaseEndpoints, etcdv3ClientCfg{
totalConns: 1,
@ -310,10 +310,12 @@ func step2(cfg Config) error {
})
_, err = clients[0].Do(context.Background(), clientv3.OpPut(key, value))
clients[0].Close()
case "zk", "zookeeper":
conns := mustCreateConnsZk(cfg.DatabaseEndpoints, 1)
_, err = conns[0].Create("/"+key, vals.bytes[0], zkCreateFlags, zkCreateAcl)
conns[0].Close()
case "consul":
clients := mustCreateConnsConsul(cfg.DatabaseEndpoints, 1)
_, err = clients[0].Put(&consulapi.KVPair{Key: key, Value: vals.bytes[0]}, nil)
@ -334,7 +336,7 @@ func step2(cfg Config) error {
func step3(cfg Config) error { return bcastReq(cfg, agent.Request_Stop) }
func bcastReq(cfg Config, op agent.Request_Operation) error {
req := cfg.Request()
req := cfg.ToRequest()
req.Operation = op
donec, errc := make(chan struct{}), make(chan error)
@ -403,7 +405,6 @@ func newReadHandlers(cfg Config) (rhs []ReqHandler, done func()) {
clients := mustCreateClientsEtcdv3(cfg.DatabaseEndpoints, etcdv3ClientCfg{
totalConns: cfg.Step2.Connections,
totalClients: cfg.Step2.Clients,
// compressionTypeTxt: cfg.EtcdCompression,
})
for i := range clients {
rhs[i] = newGetEtcd3(clients[i].KV)
@ -444,7 +445,6 @@ func newWriteHandlers(cfg Config) (rhs []ReqHandler, done func()) {
etcdClients := mustCreateClientsEtcdv3(cfg.DatabaseEndpoints, etcdv3ClientCfg{
totalConns: cfg.Step2.Connections,
totalClients: cfg.Step2.Clients,
// compressionTypeTxt: cfg.EtcdCompression,
})
for i := range etcdClients {
rhs[i] = newPutEtcd3(etcdClients[i])
@ -551,21 +551,21 @@ func generateReads(cfg Config, key string, requests chan<- request) {
case "etcdv3":
opts := []clientv3.OpOption{clientv3.WithRange("")}
if cfg.Step2.LocalRead {
if cfg.Step2.StaleRead {
opts = append(opts, clientv3.WithSerializable())
}
requests <- request{etcdv3Op: clientv3.OpGet(key, opts...)}
case "zk", "zookeeper":
op := zkOp{key: key}
if cfg.Step2.LocalRead {
if cfg.Step2.StaleRead {
op.staleRead = true
}
requests <- request{zkOp: op}
case "consul":
op := consulOp{key: key}
if cfg.Step2.LocalRead {
if cfg.Step2.StaleRead {
op.staleRead = true
}
requests <- request{consulOp: op}

View File

@ -14,8 +14,6 @@ peer_ips:
agent_port: 3500
database_port: 2379
etcd_compression: snappy
# start database by sending RPC calls to agents
step1:
skip: false
@ -26,7 +24,7 @@ step1:
step2:
skip: false
bench_type: write
local_read: true
stale_read: true
result_path: timeseries.csv
connections: 100
clients: 100
@ -36,7 +34,7 @@ step2:
value_testdata_path: /home/gyuho/testdata # overwrites value_size
total_requests: 3000000
request_interval_ms: 100
etcdv3_compaction_cycle: 0
etcdv3_compaction_cycle: 100
# after benchmark
step3:

View File

@ -75,13 +75,11 @@ var (
dialTotal int
)
// func mustCreateConnEtcdv3(endpoints []string, compressType compress.Type) *clientv3.Client {
func mustCreateConnEtcdv3(endpoints []string) *clientv3.Client {
endpoint := endpoints[dialTotal%len(endpoints)]
dialTotal++
cfg := clientv3.Config{
Endpoints: []string{endpoint},
// CompressType: compressType,
}
client, err := clientv3.New(cfg)
if err != nil {
@ -94,13 +92,11 @@ func mustCreateConnEtcdv3(endpoints []string) *clientv3.Client {
type etcdv3ClientCfg struct {
totalConns int
totalClients int
// compressionTypeTxt string
}
func mustCreateClientsEtcdv3(endpoints []string, cfg etcdv3ClientCfg) []*clientv3.Client {
conns := make([]*clientv3.Client, cfg.totalConns)
for i := range conns {
// conns[i] = mustCreateConnEtcdv3(endpoints, compress.ParseType(cfg.compressionTypeTxt))
conns[i] = mustCreateConnEtcdv3(endpoints)
}