diff --git a/agent/agent.go b/agent/agent.go index 106dc515..0f836382 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -68,7 +68,10 @@ var ( etcdBinaryPath = filepath.Join(os.Getenv("GOPATH"), "bin/etcd") consulBinaryPath = filepath.Join(os.Getenv("GOPATH"), "bin/consul") - javaBinaryPath = "/usr/bin/java" + zetcdBinaryPath = filepath.Join(os.Getenv("GOPATH"), "bin/zetcd") + cetcdBinaryPath = filepath.Join(os.Getenv("GOPATH"), "bin/cetcd") + + javaBinaryPath = "/usr/bin/java" etcdToken = "etcd_token" etcdDataDir = "data.etcd" @@ -152,10 +155,15 @@ func CommandFunc(cmd *cobra.Command, args []string) error { } type transporterServer struct { // satisfy TransporterServer - req Request + req Request + cmd *exec.Cmd logfile *os.File pid int + + proxyCmd *exec.Cmd + proxyLogfile *os.File + proxyPid int } var uploadSig = make(chan Request_Operation, 1) @@ -211,9 +219,15 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response switch r.Operation { case Request_Start: switch t.req.Database { - case Request_etcdv2, Request_etcdv3: + case Request_etcdv2, Request_etcdv3, Request_zetcd, Request_cetcd: if !exist(etcdBinaryPath) { - return nil, fmt.Errorf("%q does not exist", etcdBinaryPath) + return nil, fmt.Errorf("etcd binary %q does not exist", etcdBinaryPath) + } + if t.req.Database == Request_zetcd && !exist(zetcdBinaryPath) { + return nil, fmt.Errorf("zetcd binary %q does not exist", zetcdBinaryPath) + } + if t.req.Database == Request_cetcd && !exist(cetcdBinaryPath) { + return nil, fmt.Errorf("cetcd binary %q does not exist", cetcdBinaryPath) } if err := os.RemoveAll(etcdDataDir); err != nil { return nil, err @@ -273,6 +287,47 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response plog.Infof("exiting %q", cmdString) }() + if t.req.Database == Request_zetcd || t.req.Database == Request_cetcd { + f2, err := openToAppend(databaseLogPath + t.req.Database.String()) + if err != nil { + return nil, err + } + t.proxyLogfile = f2 + flags2 := []string{} + if t.req.Database == Request_zetcd { + flags2 = []string{ + "-zkaddr", "0.0.0.0:2181", + "-endpoint", clientURLs[t.req.ServerIndex], // etcd endpoint + } + } else { + flags2 = []string{ + "-consuladdr", "0.0.0.0:8500", + "-endpoint", clientURLs[t.req.ServerIndex], // etcd endpoint + } + } + flagString2 := strings.Join(flags2, " ") + + cmd2 := exec.Command(zetcdBinaryPath, flags2...) + cmd2.Stdout = f2 + cmd2.Stderr = f2 + + cmdString2 := fmt.Sprintf("%s %s", cmd2.Path, flagString2) + plog.Infof("starting binary %q", cmdString2) + if err := cmd2.Start(); err != nil { + return nil, err + } + t.proxyCmd = cmd2 + t.proxyPid = cmd2.Process.Pid + plog.Infof("started binary %q [PID: %d]", cmdString2, t.proxyPid) + go func() { + if err := cmd2.Wait(); err != nil { + plog.Errorf("cmd.Wait %q returned error %v", cmdString2, err) + return + } + plog.Infof("exiting %q", cmdString2) + }() + } + case Request_ZooKeeper: if !exist(javaBinaryPath) { return nil, fmt.Errorf("%q does not exist", javaBinaryPath) @@ -404,14 +459,14 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response plog.Error("cmd.Wait returned error", cmdString, err) return } - plog.Infof("exiting", cmdString, err) + plog.Infof("exiting %q (%v)", cmdString, err) }() default: return nil, fmt.Errorf("unknown database %q", r.Database) } - case Request_Restart: + case Request_Restart: // TODO: proxy is not supported! if t.cmd == nil { return nil, fmt.Errorf("nil command") } @@ -462,6 +517,15 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response if t.logfile != nil { t.logfile.Close() } + if t.proxyCmd != nil { + plog.Infof("stopping proxy binary %q [PID: %d]", t.req.Database.String(), t.proxyPid) + if err := syscall.Kill(t.proxyPid, syscall.SIGTERM); err != nil { + return nil, err + } + } + if t.proxyLogfile != nil { + t.proxyLogfile.Close() + } plog.Infof("stopped binary %q [PID: %d]", t.req.Database.String(), t.pid) processPID = t.pid uploadSig <- Request_Stop diff --git a/bench-configuration/01-write-2M-keys/04-zetcd-etcd-v3.1.yaml b/bench-configuration/01-write-2M-keys/04-zetcd-etcd-v3.1.yaml new file mode 100644 index 00000000..1eec1f10 --- /dev/null +++ b/bench-configuration/01-write-2M-keys/04-zetcd-etcd-v3.1.yaml @@ -0,0 +1,41 @@ +database: zetcd +test_name: zetcd-v3.1-go1.7.4 + +google_cloud_project_name: etcd-development +google_cloud_storage_key_path: /home/gyuho/gcloud-key.json +google_cloud_storage_bucket_name: dbtester-results +google_cloud_storage_sub_directory: 2016Q403-etcd-zk-consul/01-write-2M-keys + +peer_ips: + - 10.240.0.37 + - 10.240.0.38 + - 10.240.0.39 + +agent_port: 3500 +database_port: 2379 + +result_path_time_series: timeseries.csv +result_path_log: result.log + +# start database by sending RPC calls to agents +step1: + skip_start_database: false + zookeeper_max_client_connections: 5000 + zookeeper_snap_count: 100000 + +# start benchmark +step2: + skip_stress_database: false + bench_type: write + stale_read: true + connections: 1000 + clients: 1000 + key_size: 8 + value_size: 256 + total_requests: 2000000 + request_interval_ms: 0 + etcdv3_compaction_cycle: 0 + +# after benchmark +step3: + action: stop diff --git a/bench-configuration/01-write-2M-keys/05-cetcd-etcd-v3.1.yaml b/bench-configuration/01-write-2M-keys/05-cetcd-etcd-v3.1.yaml new file mode 100644 index 00000000..41231708 --- /dev/null +++ b/bench-configuration/01-write-2M-keys/05-cetcd-etcd-v3.1.yaml @@ -0,0 +1,41 @@ +database: cetcd +test_name: cetcd-v3.1-go1.7.4 + +google_cloud_project_name: etcd-development +google_cloud_storage_key_path: /home/gyuho/gcloud-key.json +google_cloud_storage_bucket_name: dbtester-results +google_cloud_storage_sub_directory: 2016Q403-etcd-zk-consul/01-write-2M-keys + +peer_ips: + - 10.240.0.41 + - 10.240.0.42 + - 10.240.0.43 + +agent_port: 3500 +database_port: 2379 + +result_path_time_series: timeseries.csv +result_path_log: result.log + +# start database by sending RPC calls to agents +step1: + skip_start_database: false + zookeeper_max_client_connections: 5000 + zookeeper_snap_count: 100000 + +# start benchmark +step2: + skip_stress_database: false + bench_type: write + stale_read: true + connections: 1000 + clients: 1000 + key_size: 8 + value_size: 256 + total_requests: 2000000 + request_interval_ms: 0 + etcdv3_compaction_cycle: 0 + +# after benchmark +step3: + action: stop diff --git a/control/config.go b/control/config.go index bc96481d..97b9f975 100644 --- a/control/config.go +++ b/control/config.go @@ -22,6 +22,7 @@ import ( "gopkg.in/yaml.v2" ) +// Config configures dbtester control clients. type Config struct { Database string `yaml:"database"` TestName string `yaml:"test_name"` @@ -114,8 +115,14 @@ func (cfg *Config) ToRequest() agent.Request { cfg.Database = "zookeeper" req.Database = agent.Request_ZooKeeper + case "zetcd": + req.Database = agent.Request_zetcd + case "consul": req.Database = agent.Request_Consul + + case "cetcd": + req.Database = agent.Request_cetcd } req.PeerIPString = cfg.PeerIPString diff --git a/control/control.go b/control/control.go index 6057aab7..fe930f26 100644 --- a/control/control.go +++ b/control/control.go @@ -35,10 +35,11 @@ import ( ) var ( + // Command implements 'control' command. Command = &cobra.Command{ Use: "control", Short: "Controls tests.", - RunE: CommandFunc, + RunE: commandFunc, } configPath string ) @@ -47,7 +48,7 @@ func init() { Command.PersistentFlags().StringVarP(&configPath, "config", "c", "", "YAML configuration file path.") } -func CommandFunc(cmd *cobra.Command, args []string) error { +func commandFunc(cmd *cobra.Command, args []string) error { cfg, err := ReadConfig(configPath) if err != nil { return err @@ -56,7 +57,9 @@ func CommandFunc(cmd *cobra.Command, args []string) error { case "etcdv2": case "etcdv3": case "zk", "zookeeper": + case "zetcd": case "consul": + case "cetcd": default: return fmt.Errorf("%q is not supported", cfg.Database) } @@ -227,9 +230,9 @@ func step2(cfg Config) error { totalKeysFunc = getTotalKeysEtcdv2 case "etcdv3": totalKeysFunc = getTotalKeysEtcdv3 - case "zk", "zookeeper": + case "zk", "zookeeper", "zetcd": totalKeysFunc = getTotalKeysZk - case "consul": + case "consul", "cetcd": totalKeysFunc = getTotalKeysConsul } for k, v := range totalKeysFunc(cfg.DatabaseEndpoints) { @@ -242,7 +245,7 @@ func step2(cfg Config) error { switch cfg.Database { case "etcdv2": - plog.Infof("write started [request: PUT | key: %q | database: %q]", key, "etcdv2") + plog.Infof("write started [request: PUT | key: %q | database: %q]", key, cfg.Database) var err error for i := 0; i < 7; i++ { clients := mustCreateClientsEtcdv2(cfg.DatabaseEndpoints, cfg.Step2.Connections) @@ -250,16 +253,16 @@ func step2(cfg Config) error { if err != nil { continue } - plog.Infof("write done [request: PUT | key: %q | database: %q]", key, "etcdv2") + plog.Infof("write done [request: PUT | key: %q | database: %q]", key, cfg.Database) break } if err != nil { - plog.Errorf("write error [request: PUT | key: %q | database: %q]", key, "etcdv2") + plog.Errorf("write error [request: PUT | key: %q | database: %q]", key, cfg.Database) os.Exit(1) } case "etcdv3": - plog.Infof("write started [request: PUT | key: %q | database: %q]", key, "etcdv3") + plog.Infof("write started [request: PUT | key: %q | database: %q]", key, cfg.Database) var err error for i := 0; i < 7; i++ { clients := mustCreateClientsEtcdv3(cfg.DatabaseEndpoints, etcdv3ClientCfg{ @@ -270,16 +273,16 @@ func step2(cfg Config) error { if err != nil { continue } - plog.Infof("write done [request: PUT | key: %q | database: %q]", key, "etcdv3") + plog.Infof("write done [request: PUT | key: %q | database: %q]", key, cfg.Database) break } if err != nil { - plog.Errorf("write error [request: PUT | key: %q | database: %q]", key, "etcdv3") + plog.Errorf("write error [request: PUT | key: %q | database: %q]", key, cfg.Database) os.Exit(1) } - case "zk", "zookeeper": - plog.Infof("write started [request: PUT | key: %q | database: %q]", key, "zookeeper") + case "zk", "zookeeper", "zetcd": + plog.Infof("write started [request: PUT | key: %q | database: %q]", key, cfg.Database) var err error for i := 0; i < 7; i++ { conns := mustCreateConnsZk(cfg.DatabaseEndpoints, cfg.Step2.Connections) @@ -290,16 +293,16 @@ func step2(cfg Config) error { for j := range conns { conns[j].Close() } - plog.Infof("write done [request: PUT | key: %q | database: %q]", key, "zookeeper") + plog.Infof("write done [request: PUT | key: %q | database: %q]", key, cfg.Database) break } if err != nil { - plog.Errorf("write error [request: PUT | key: %q | database: %q]", key, "zookeeper") + plog.Errorf("write error [request: PUT | key: %q | database: %q]", key, cfg.Database) os.Exit(1) } - case "consul": - plog.Infof("write started [request: PUT | key: %q | database: %q]", key, "consul") + case "consul", "cetcd": + plog.Infof("write started [request: PUT | key: %q | database: %q]", key, cfg.Database) var err error for i := 0; i < 7; i++ { clients := mustCreateConnsConsul(cfg.DatabaseEndpoints, cfg.Step2.Connections) @@ -307,11 +310,11 @@ func step2(cfg Config) error { if err != nil { continue } - plog.Infof("write done [request: PUT | key: %q | database: %q]", key, "consul") + plog.Infof("write done [request: PUT | key: %q | database: %q]", key, cfg.Database) break } if err != nil { - plog.Errorf("write done [request: PUT | key: %q | database: %q]", key, "consul") + plog.Errorf("write done [request: PUT | key: %q | database: %q]", key, cfg.Database) os.Exit(1) } } @@ -339,12 +342,12 @@ func step2(cfg Config) error { _, err = clients[0].Do(context.Background(), clientv3.OpPut(key, value)) clients[0].Close() - case "zk", "zookeeper": + case "zk", "zookeeper", "zetcd": conns := mustCreateConnsZk(cfg.DatabaseEndpoints, 1) _, err = conns[0].Create("/"+key, vals.bytes[0], zkCreateFlags, zkCreateAcl) conns[0].Close() - case "consul": + case "consul", "cetcd": clients := mustCreateConnsConsul(cfg.DatabaseEndpoints, 1) _, err = clients[0].Put(&consulapi.KVPair{Key: key, Value: vals.bytes[0]}, nil) } @@ -410,7 +413,7 @@ func sendReq(ep string, req agent.Request, i int) error { req.ServerIndex = uint32(i) req.ZookeeperMyID = uint32(i + 1) - plog.Infof("sending message [index: %d | operation: %q | database: %q | endpoint: %q]", i, req.Operation.String(), req.Database.String(), ep) + plog.Infof("sending message [index: %d | operation: %q | database: %q | endpoint: %q]", i, req.Operation, req.Database, ep) conn, err := grpc.Dial(ep, grpc.WithInsecure()) if err != nil { @@ -454,7 +457,7 @@ func newReadHandlers(cfg Config) (rhs []ReqHandler, done func()) { clients[i].Close() } } - case "zk", "zookeeper": + case "zk", "zookeeper", "zetcd": conns := mustCreateConnsZk(cfg.DatabaseEndpoints, cfg.Step2.Connections) for i := range conns { rhs[i] = newGetZK(conns[i]) @@ -464,7 +467,7 @@ func newReadHandlers(cfg Config) (rhs []ReqHandler, done func()) { conns[i].Close() } } - case "consul": + case "consul", "cetcd": conns := mustCreateConnsConsul(cfg.DatabaseEndpoints, cfg.Step2.Connections) for i := range conns { rhs[i] = newGetConsul(conns[i]) @@ -494,11 +497,11 @@ func newWriteHandlers(cfg Config) (rhs []ReqHandler, done func()) { etcdClients[i].Close() } } - case "zk", "zookeeper": + case "zk", "zookeeper", "zetcd": if cfg.Step2.SameKey { key := sameKey(cfg.Step2.KeySize) valueBts := randBytes(cfg.Step2.ValueSize) - plog.Infof("write started [request: PUT | key: %q | database: %q]", key, "zookeeper") + plog.Infof("write started [request: PUT | key: %q | database: %q]", key, cfg.Database) var err error for i := 0; i < 7; i++ { conns := mustCreateConnsZk(cfg.DatabaseEndpoints, cfg.Step2.Connections) @@ -509,11 +512,11 @@ func newWriteHandlers(cfg Config) (rhs []ReqHandler, done func()) { for j := range conns { conns[j].Close() } - plog.Infof("write done [request: PUT | key: %q | database: %q]", key, "zookeeper") + plog.Infof("write done [request: PUT | key: %q | database: %q]", key, cfg.Database) break } if err != nil { - plog.Errorf("write error [request: PUT | key: %q | database: %q]", key, "zookeeper") + plog.Errorf("write error [request: PUT | key: %q | database: %q]", key, cfg.Database) os.Exit(1) } } @@ -531,7 +534,7 @@ func newWriteHandlers(cfg Config) (rhs []ReqHandler, done func()) { conns[i].Close() } } - case "consul": + case "consul", "cetcd": conns := mustCreateConnsConsul(cfg.DatabaseEndpoints, cfg.Step2.Connections) for i := range conns { rhs[i] = newPutConsul(conns[i]) @@ -561,7 +564,7 @@ func newReadOneshotHandlers(cfg Config) []ReqHandler { return newGetEtcd3(conns[0])(ctx, req) } } - case "zk", "zookeeper": + case "zk", "zookeeper", "zetcd": for i := range rhs { rhs[i] = func(ctx context.Context, req *request) error { conns := mustCreateConnsZk(cfg.DatabaseEndpoints, cfg.Step2.Connections) @@ -569,7 +572,7 @@ func newReadOneshotHandlers(cfg Config) []ReqHandler { return newGetZK(conns[0])(ctx, req) } } - case "consul": + case "consul", "cetcd": for i := range rhs { rhs[i] = func(ctx context.Context, req *request) error { conns := mustCreateConnsConsul(cfg.DatabaseEndpoints, 1) @@ -596,14 +599,14 @@ func generateReads(cfg Config, key string, requests chan<- request) { } requests <- request{etcdv3Op: clientv3.OpGet(key, opts...)} - case "zk", "zookeeper": + case "zk", "zookeeper", "zetcd": op := zkOp{key: key} if cfg.Step2.StaleRead { op.staleRead = true } requests <- request{zkOp: op} - case "consul": + case "consul", "cetcd": op := consulOp{key: key} if cfg.Step2.StaleRead { op.staleRead = true @@ -636,9 +639,9 @@ func generateWrites(cfg Config, vals values, requests chan<- request) { requests <- request{etcdv2Op: etcdv2Op{key: k, value: vs}} case "etcdv3": requests <- request{etcdv3Op: clientv3.OpPut(k, vs)} - case "zk", "zookeeper": + case "zk", "zookeeper", "zetcd": requests <- request{zkOp: zkOp{key: "/" + k, value: v}} - case "consul": + case "consul", "cetcd": requests <- request{consulOp: consulOp{key: k, value: v}} } if cfg.Step2.RequestIntervalMs > 0 { diff --git a/control/test.yaml b/control/test.yaml index dfd980f1..72a1603b 100644 --- a/control/test.yaml +++ b/control/test.yaml @@ -39,4 +39,3 @@ step2: # after benchmark step3: action: stop # OR 'only-upload-log' -