Merge pull request #96 from gyuho/proxy

agent: handle 'zetcd', 'cetcd' database request
Gyu-Ho Lee 2016-12-13 14:52:55 -08:00 committed by GitHub
commit 4b3d69835f
6 changed files with 196 additions and 41 deletions
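
For context: zetcd is a proxy that serves ZooKeeper clients on top of etcd, and cetcd does the same for the Consul API. The agent therefore starts an etcd server first and then the proxy in front of it, giving the proxy its own log file and PID so the Stop path can tear both down; the control side reuses the existing ZooKeeper and Consul client code paths against the proxies.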

View File

@@ -68,6 +68,9 @@ var (
etcdBinaryPath = filepath.Join(os.Getenv("GOPATH"), "bin/etcd")
consulBinaryPath = filepath.Join(os.Getenv("GOPATH"), "bin/consul")
zetcdBinaryPath = filepath.Join(os.Getenv("GOPATH"), "bin/zetcd")
cetcdBinaryPath = filepath.Join(os.Getenv("GOPATH"), "bin/cetcd")
javaBinaryPath = "/usr/bin/java"
etcdToken = "etcd_token"
@@ -153,9 +156,14 @@ func CommandFunc(cmd *cobra.Command, args []string) error {
type transporterServer struct { // satisfy TransporterServer
req Request
cmd *exec.Cmd
logfile *os.File
pid int
proxyCmd *exec.Cmd
proxyLogfile *os.File
proxyPid int
}
var uploadSig = make(chan Request_Operation, 1)
@@ -211,9 +219,15 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
switch r.Operation {
case Request_Start:
switch t.req.Database {
case Request_etcdv2, Request_etcdv3:
case Request_etcdv2, Request_etcdv3, Request_zetcd, Request_cetcd:
if !exist(etcdBinaryPath) {
return nil, fmt.Errorf("%q does not exist", etcdBinaryPath)
return nil, fmt.Errorf("etcd binary %q does not exist", etcdBinaryPath)
}
if t.req.Database == Request_zetcd && !exist(zetcdBinaryPath) {
return nil, fmt.Errorf("zetcd binary %q does not exist", zetcdBinaryPath)
}
if t.req.Database == Request_cetcd && !exist(cetcdBinaryPath) {
return nil, fmt.Errorf("cetcd binary %q does not exist", cetcdBinaryPath)
}
if err := os.RemoveAll(etcdDataDir); err != nil {
return nil, err
@@ -273,6 +287,47 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
plog.Infof("exiting %q", cmdString)
}()
if t.req.Database == Request_zetcd || t.req.Database == Request_cetcd {
f2, err := openToAppend(databaseLogPath + t.req.Database.String())
if err != nil {
return nil, err
}
t.proxyLogfile = f2
flags2 := []string{}
if t.req.Database == Request_zetcd {
flags2 = []string{
"-zkaddr", "0.0.0.0:2181",
"-endpoint", clientURLs[t.req.ServerIndex], // etcd endpoint
}
} else {
flags2 = []string{
"-consuladdr", "0.0.0.0:8500",
"-endpoint", clientURLs[t.req.ServerIndex], // etcd endpoint
}
}
flagString2 := strings.Join(flags2, " ")
// launch the matching proxy binary for the requested database
proxyBinaryPath := zetcdBinaryPath
if t.req.Database == Request_cetcd {
proxyBinaryPath = cetcdBinaryPath
}
cmd2 := exec.Command(proxyBinaryPath, flags2...)
cmd2.Stdout = f2
cmd2.Stderr = f2
cmdString2 := fmt.Sprintf("%s %s", cmd2.Path, flagString2)
plog.Infof("starting binary %q", cmdString2)
if err := cmd2.Start(); err != nil {
return nil, err
}
t.proxyCmd = cmd2
t.proxyPid = cmd2.Process.Pid
plog.Infof("started binary %q [PID: %d]", cmdString2, t.proxyPid)
go func() {
if err := cmd2.Wait(); err != nil {
plog.Errorf("cmd.Wait %q returned error %v", cmdString2, err)
return
}
plog.Infof("exiting %q", cmdString2)
}()
}
case Request_ZooKeeper:
if !exist(javaBinaryPath) {
return nil, fmt.Errorf("%q does not exist", javaBinaryPath)
@@ -404,14 +459,14 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
plog.Error("cmd.Wait returned error", cmdString, err)
return
}
plog.Infof("exiting", cmdString, err)
plog.Infof("exiting %q (%v)", cmdString, err)
}()
default:
return nil, fmt.Errorf("unknown database %q", r.Database)
}
case Request_Restart:
case Request_Restart: // TODO: proxy is not supported!
if t.cmd == nil {
return nil, fmt.Errorf("nil command")
}
@@ -462,6 +517,15 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
if t.logfile != nil {
t.logfile.Close()
}
if t.proxyCmd != nil {
plog.Infof("stopping proxy binary %q [PID: %d]", t.req.Database.String(), t.proxyPid)
if err := syscall.Kill(t.proxyPid, syscall.SIGTERM); err != nil {
return nil, err
}
}
if t.proxyLogfile != nil {
t.proxyLogfile.Close()
}
plog.Infof("stopped binary %q [PID: %d]", t.req.Database.String(), t.pid)
processPID = t.pid
uploadSig <- Request_Stop
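
Condensed, the proxy lifecycle added above amounts to the sketch below. startProxy and stopProxy are hypothetical helpers, not names from this PR; the binary paths and flag sets mirror the diff, with zetcd serving the ZooKeeper wire protocol on :2181 and cetcd serving the Consul HTTP API on :8500, both forwarding to the local etcd client URL.

```go
package agent

import (
	"fmt"
	"os"
	"os/exec"
	"path/filepath"
	"syscall"
)

var (
	zetcdBinaryPath = filepath.Join(os.Getenv("GOPATH"), "bin/zetcd")
	cetcdBinaryPath = filepath.Join(os.Getenv("GOPATH"), "bin/cetcd")
)

// startProxy condenses the Transfer logic above: pick the proxy binary
// and flags for the requested database, point stdout/stderr at a
// dedicated log file, and return the running command so the Stop path
// can signal it before stopping etcd itself.
func startProxy(database, etcdEndpoint string, logfile *os.File) (*exec.Cmd, error) {
	var path string
	var flags []string
	switch database {
	case "zetcd":
		// zetcd serves the ZooKeeper wire protocol, backed by etcd.
		path = zetcdBinaryPath
		flags = []string{"-zkaddr", "0.0.0.0:2181", "-endpoint", etcdEndpoint}
	case "cetcd":
		// cetcd serves the Consul HTTP API, backed by etcd.
		path = cetcdBinaryPath
		flags = []string{"-consuladdr", "0.0.0.0:8500", "-endpoint", etcdEndpoint}
	default:
		return nil, fmt.Errorf("no proxy for database %q", database)
	}
	cmd := exec.Command(path, flags...)
	cmd.Stdout, cmd.Stderr = logfile, logfile
	if err := cmd.Start(); err != nil {
		return nil, err
	}
	return cmd, nil
}

// stopProxy mirrors the Stop branch: SIGTERM the proxy before etcd itself.
func stopProxy(cmd *exec.Cmd) error {
	if cmd == nil || cmd.Process == nil {
		return nil
	}
	return syscall.Kill(cmd.Process.Pid, syscall.SIGTERM)
}
```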

View File

@@ -0,0 +1,41 @@
database: zetcd
test_name: zetcd-v3.1-go1.7.4
google_cloud_project_name: etcd-development
google_cloud_storage_key_path: /home/gyuho/gcloud-key.json
google_cloud_storage_bucket_name: dbtester-results
google_cloud_storage_sub_directory: 2016Q403-etcd-zk-consul/01-write-2M-keys
peer_ips:
- 10.240.0.37
- 10.240.0.38
- 10.240.0.39
agent_port: 3500
database_port: 2379
result_path_time_series: timeseries.csv
result_path_log: result.log
# start database by sending RPC calls to agents
step1:
skip_start_database: false
zookeeper_max_client_connections: 5000
zookeeper_snap_count: 100000
# start benchmark
step2:
skip_stress_database: false
bench_type: write
stale_read: true
connections: 1000
clients: 1000
key_size: 8
value_size: 256
total_requests: 2000000
request_interval_ms: 0
etcdv3_compaction_cycle: 0
# after benchmark
step3:
action: stop
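
Apart from database, test_name, and the peer IPs, this mirrors the existing ZooKeeper test configs; per the agent flags above, zetcd listens on port 2181 of each peer while etcd itself stays on 2379. The zookeeper_* knobs under step1 belong to the shared config schema and are presumably ignored when the database is zetcd.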

View File

@@ -0,0 +1,41 @@
database: cetcd
test_name: cetcd-v3.1-go1.7.4
google_cloud_project_name: etcd-development
google_cloud_storage_key_path: /home/gyuho/gcloud-key.json
google_cloud_storage_bucket_name: dbtester-results
google_cloud_storage_sub_directory: 2016Q403-etcd-zk-consul/01-write-2M-keys
peer_ips:
- 10.240.0.41
- 10.240.0.42
- 10.240.0.43
agent_port: 3500
database_port: 2379
result_path_time_series: timeseries.csv
result_path_log: result.log
# start database by sending RPC calls to agents
step1:
skip_start_database: false
zookeeper_max_client_connections: 5000
zookeeper_snap_count: 100000
# start benchmark
step2:
skip_stress_database: false
bench_type: write
stale_read: true
connections: 1000
clients: 1000
key_size: 8
value_size: 256
total_requests: 2000000
request_interval_ms: 0
etcdv3_compaction_cycle: 0
# after benchmark
step3:
action: stop
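
Because zetcd speaks the ZooKeeper wire protocol and cetcd the Consul HTTP API, off-the-shelf clients should work against the proxies. A minimal smoke test sketch, assuming the samuel/go-zookeeper and hashicorp/consul/api clients (the libraries dbtester's helpers appear to use); the endpoints come from the two configs above and the key names are purely illustrative:

```go
package main

import (
	"fmt"
	"time"

	consulapi "github.com/hashicorp/consul/api"
	"github.com/samuel/go-zookeeper/zk"
)

func main() {
	// zetcd: a stock ZooKeeper client against port 2181 on a peer.
	conn, _, err := zk.Connect([]string{"10.240.0.37:2181"}, 5*time.Second)
	if err != nil {
		fmt.Println("zk connect:", err)
	} else {
		if _, err := conn.Create("/smoke", []byte("ok"), 0, zk.WorldACL(zk.PermAll)); err != nil {
			fmt.Println("zetcd create:", err)
		}
		conn.Close()
	}

	// cetcd: the stock Consul KV client against port 8500 on a peer.
	cli, err := consulapi.NewClient(&consulapi.Config{Address: "10.240.0.41:8500"})
	if err != nil {
		fmt.Println("consul client:", err)
		return
	}
	if _, err := cli.KV().Put(&consulapi.KVPair{Key: "smoke", Value: []byte("ok")}, nil); err != nil {
		fmt.Println("cetcd put:", err)
	}
}
```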

View File

@@ -22,6 +22,7 @@ import (
"gopkg.in/yaml.v2"
)
// Config configures dbtester control clients.
type Config struct {
Database string `yaml:"database"`
TestName string `yaml:"test_name"`
@@ -114,8 +115,14 @@ func (cfg *Config) ToRequest() agent.Request {
cfg.Database = "zookeeper"
req.Database = agent.Request_ZooKeeper
case "zetcd":
req.Database = agent.Request_zetcd
case "consul":
req.Database = agent.Request_Consul
case "cetcd":
req.Database = agent.Request_cetcd
}
req.PeerIPString = cfg.PeerIPString
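
The database string is now matched in two places: the validation switch in control.go (below) and this ToRequest mapping. A single table would keep the supported set in one spot. A sketch only; the package name, import path, and the agent.Request_Database type name are assumed from the generated protobuf rather than taken from the PR:

```go
package control

import "github.com/coreos/dbtester/agent"

// Hypothetical refactor, not in the PR: one map drives both validation
// and request construction; lookup failure means "not supported".
var databases = map[string]agent.Request_Database{
	"etcdv2":    agent.Request_etcdv2,
	"etcdv3":    agent.Request_etcdv3,
	"zk":        agent.Request_ZooKeeper,
	"zookeeper": agent.Request_ZooKeeper,
	"zetcd":     agent.Request_zetcd,
	"consul":    agent.Request_Consul,
	"cetcd":     agent.Request_cetcd,
}
```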

View File

@@ -35,10 +35,11 @@ import (
)
var (
// Command implements 'control' command.
Command = &cobra.Command{
Use: "control",
Short: "Controls tests.",
RunE: CommandFunc,
RunE: commandFunc,
}
configPath string
)
@@ -47,7 +48,7 @@ func init() {
Command.PersistentFlags().StringVarP(&configPath, "config", "c", "", "YAML configuration file path.")
}
func CommandFunc(cmd *cobra.Command, args []string) error {
func commandFunc(cmd *cobra.Command, args []string) error {
cfg, err := ReadConfig(configPath)
if err != nil {
return err
@@ -56,7 +57,9 @@ func CommandFunc(cmd *cobra.Command, args []string) error {
case "etcdv2":
case "etcdv3":
case "zk", "zookeeper":
case "zetcd":
case "consul":
case "cetcd":
default:
return fmt.Errorf("%q is not supported", cfg.Database)
}
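
The empty cases act as an allow-list: Go switch cases do not fall through, so a recognized database name simply passes validation while anything else reaches default and returns the error.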
@@ -227,9 +230,9 @@ func step2(cfg Config) error {
totalKeysFunc = getTotalKeysEtcdv2
case "etcdv3":
totalKeysFunc = getTotalKeysEtcdv3
case "zk", "zookeeper":
case "zk", "zookeeper", "zetcd":
totalKeysFunc = getTotalKeysZk
case "consul":
case "consul", "cetcd":
totalKeysFunc = getTotalKeysConsul
}
for k, v := range totalKeysFunc(cfg.DatabaseEndpoints) {
@@ -242,7 +245,7 @@ func step2(cfg Config) error {
switch cfg.Database {
case "etcdv2":
plog.Infof("write started [request: PUT | key: %q | database: %q]", key, "etcdv2")
plog.Infof("write started [request: PUT | key: %q | database: %q]", key, cfg.Database)
var err error
for i := 0; i < 7; i++ {
clients := mustCreateClientsEtcdv2(cfg.DatabaseEndpoints, cfg.Step2.Connections)
@@ -250,16 +253,16 @@ func step2(cfg Config) error {
if err != nil {
continue
}
plog.Infof("write done [request: PUT | key: %q | database: %q]", key, "etcdv2")
plog.Infof("write done [request: PUT | key: %q | database: %q]", key, cfg.Database)
break
}
if err != nil {
plog.Errorf("write error [request: PUT | key: %q | database: %q]", key, "etcdv2")
plog.Errorf("write error [request: PUT | key: %q | database: %q]", key, cfg.Database)
os.Exit(1)
}
case "etcdv3":
plog.Infof("write started [request: PUT | key: %q | database: %q]", key, "etcdv3")
plog.Infof("write started [request: PUT | key: %q | database: %q]", key, cfg.Database)
var err error
for i := 0; i < 7; i++ {
clients := mustCreateClientsEtcdv3(cfg.DatabaseEndpoints, etcdv3ClientCfg{
@@ -270,16 +273,16 @@ func step2(cfg Config) error {
if err != nil {
continue
}
plog.Infof("write done [request: PUT | key: %q | database: %q]", key, "etcdv3")
plog.Infof("write done [request: PUT | key: %q | database: %q]", key, cfg.Database)
break
}
if err != nil {
plog.Errorf("write error [request: PUT | key: %q | database: %q]", key, "etcdv3")
plog.Errorf("write error [request: PUT | key: %q | database: %q]", key, cfg.Database)
os.Exit(1)
}
case "zk", "zookeeper":
plog.Infof("write started [request: PUT | key: %q | database: %q]", key, "zookeeper")
case "zk", "zookeeper", "zetcd":
plog.Infof("write started [request: PUT | key: %q | database: %q]", key, cfg.Database)
var err error
for i := 0; i < 7; i++ {
conns := mustCreateConnsZk(cfg.DatabaseEndpoints, cfg.Step2.Connections)
@@ -290,16 +293,16 @@ func step2(cfg Config) error {
for j := range conns {
conns[j].Close()
}
plog.Infof("write done [request: PUT | key: %q | database: %q]", key, "zookeeper")
plog.Infof("write done [request: PUT | key: %q | database: %q]", key, cfg.Database)
break
}
if err != nil {
plog.Errorf("write error [request: PUT | key: %q | database: %q]", key, "zookeeper")
plog.Errorf("write error [request: PUT | key: %q | database: %q]", key, cfg.Database)
os.Exit(1)
}
case "consul":
plog.Infof("write started [request: PUT | key: %q | database: %q]", key, "consul")
case "consul", "cetcd":
plog.Infof("write started [request: PUT | key: %q | database: %q]", key, cfg.Database)
var err error
for i := 0; i < 7; i++ {
clients := mustCreateConnsConsul(cfg.DatabaseEndpoints, cfg.Step2.Connections)
@@ -307,11 +310,11 @@ func step2(cfg Config) error {
if err != nil {
continue
}
plog.Infof("write done [request: PUT | key: %q | database: %q]", key, "consul")
plog.Infof("write done [request: PUT | key: %q | database: %q]", key, cfg.Database)
break
}
if err != nil {
plog.Errorf("write done [request: PUT | key: %q | database: %q]", key, "consul")
plog.Errorf("write done [request: PUT | key: %q | database: %q]", key, cfg.Database)
os.Exit(1)
}
}
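
The identical seven-attempt retry loop is now duplicated across the etcdv2, etcdv3, zookeeper/zetcd, and consul/cetcd cases. A small helper could take the per-database dial-and-put as a closure. A sketch of the pattern, not what the PR does (and unlike the originals it pauses between attempts):

```go
package control

import (
	"fmt"
	"time"
)

// retry runs op up to attempts times and reports the last error,
// isolating the pattern behind the `for i := 0; i < 7; i++` loops above.
func retry(attempts int, op func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		time.Sleep(time.Second)
	}
	return fmt.Errorf("all %d attempts failed, last error: %v", attempts, err)
}
```

Each case would then reduce to retry(7, func() error { /* dial, put, close */ return nil }).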
@@ -339,12 +342,12 @@ func step2(cfg Config) error {
_, err = clients[0].Do(context.Background(), clientv3.OpPut(key, value))
clients[0].Close()
case "zk", "zookeeper":
case "zk", "zookeeper", "zetcd":
conns := mustCreateConnsZk(cfg.DatabaseEndpoints, 1)
_, err = conns[0].Create("/"+key, vals.bytes[0], zkCreateFlags, zkCreateAcl)
conns[0].Close()
case "consul":
case "consul", "cetcd":
clients := mustCreateConnsConsul(cfg.DatabaseEndpoints, 1)
_, err = clients[0].Put(&consulapi.KVPair{Key: key, Value: vals.bytes[0]}, nil)
}
@@ -410,7 +413,7 @@ func sendReq(ep string, req agent.Request, i int) error {
req.ServerIndex = uint32(i)
req.ZookeeperMyID = uint32(i + 1)
plog.Infof("sending message [index: %d | operation: %q | database: %q | endpoint: %q]", i, req.Operation.String(), req.Database.String(), ep)
plog.Infof("sending message [index: %d | operation: %q | database: %q | endpoint: %q]", i, req.Operation, req.Database, ep)
conn, err := grpc.Dial(ep, grpc.WithInsecure())
if err != nil {
@@ -454,7 +457,7 @@ func newReadHandlers(cfg Config) (rhs []ReqHandler, done func()) {
clients[i].Close()
}
}
case "zk", "zookeeper":
case "zk", "zookeeper", "zetcd":
conns := mustCreateConnsZk(cfg.DatabaseEndpoints, cfg.Step2.Connections)
for i := range conns {
rhs[i] = newGetZK(conns[i])
@@ -464,7 +467,7 @@ func newReadHandlers(cfg Config) (rhs []ReqHandler, done func()) {
conns[i].Close()
}
}
case "consul":
case "consul", "cetcd":
conns := mustCreateConnsConsul(cfg.DatabaseEndpoints, cfg.Step2.Connections)
for i := range conns {
rhs[i] = newGetConsul(conns[i])
@@ -494,11 +497,11 @@ func newWriteHandlers(cfg Config) (rhs []ReqHandler, done func()) {
etcdClients[i].Close()
}
}
case "zk", "zookeeper":
case "zk", "zookeeper", "zetcd":
if cfg.Step2.SameKey {
key := sameKey(cfg.Step2.KeySize)
valueBts := randBytes(cfg.Step2.ValueSize)
plog.Infof("write started [request: PUT | key: %q | database: %q]", key, "zookeeper")
plog.Infof("write started [request: PUT | key: %q | database: %q]", key, cfg.Database)
var err error
for i := 0; i < 7; i++ {
conns := mustCreateConnsZk(cfg.DatabaseEndpoints, cfg.Step2.Connections)
@@ -509,11 +512,11 @@ func newWriteHandlers(cfg Config) (rhs []ReqHandler, done func()) {
for j := range conns {
conns[j].Close()
}
plog.Infof("write done [request: PUT | key: %q | database: %q]", key, "zookeeper")
plog.Infof("write done [request: PUT | key: %q | database: %q]", key, cfg.Database)
break
}
if err != nil {
plog.Errorf("write error [request: PUT | key: %q | database: %q]", key, "zookeeper")
plog.Errorf("write error [request: PUT | key: %q | database: %q]", key, cfg.Database)
os.Exit(1)
}
}
@@ -531,7 +534,7 @@ func newWriteHandlers(cfg Config) (rhs []ReqHandler, done func()) {
conns[i].Close()
}
}
case "consul":
case "consul", "cetcd":
conns := mustCreateConnsConsul(cfg.DatabaseEndpoints, cfg.Step2.Connections)
for i := range conns {
rhs[i] = newPutConsul(conns[i])
@@ -561,7 +564,7 @@ func newReadOneshotHandlers(cfg Config) []ReqHandler {
return newGetEtcd3(conns[0])(ctx, req)
}
}
case "zk", "zookeeper":
case "zk", "zookeeper", "zetcd":
for i := range rhs {
rhs[i] = func(ctx context.Context, req *request) error {
conns := mustCreateConnsZk(cfg.DatabaseEndpoints, cfg.Step2.Connections)
@@ -569,7 +572,7 @@ func newReadOneshotHandlers(cfg Config) []ReqHandler {
return newGetZK(conns[0])(ctx, req)
}
}
case "consul":
case "consul", "cetcd":
for i := range rhs {
rhs[i] = func(ctx context.Context, req *request) error {
conns := mustCreateConnsConsul(cfg.DatabaseEndpoints, 1)
@@ -596,14 +599,14 @@ func generateReads(cfg Config, key string, requests chan<- request) {
}
requests <- request{etcdv3Op: clientv3.OpGet(key, opts...)}
case "zk", "zookeeper":
case "zk", "zookeeper", "zetcd":
op := zkOp{key: key}
if cfg.Step2.StaleRead {
op.staleRead = true
}
requests <- request{zkOp: op}
case "consul":
case "consul", "cetcd":
op := consulOp{key: key}
if cfg.Step2.StaleRead {
op.staleRead = true
@@ -636,9 +639,9 @@ func generateWrites(cfg Config, vals values, requests chan<- request) {
requests <- request{etcdv2Op: etcdv2Op{key: k, value: vs}}
case "etcdv3":
requests <- request{etcdv3Op: clientv3.OpPut(k, vs)}
case "zk", "zookeeper":
case "zk", "zookeeper", "zetcd":
requests <- request{zkOp: zkOp{key: "/" + k, value: v}}
case "consul":
case "consul", "cetcd":
requests <- request{consulOp: consulOp{key: k, value: v}}
}
if cfg.Step2.RequestIntervalMs > 0 {
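
stale_read: true in the two new configs flows through generateReads into the staleRead flag on zkOp and consulOp above. For the Consul client, that plausibly maps to AllowStale on the query options; a minimal sketch, with the endpoint and key purely illustrative:

```go
package main

import (
	"fmt"

	consulapi "github.com/hashicorp/consul/api"
)

func main() {
	cli, err := consulapi.NewClient(&consulapi.Config{Address: "10.240.0.41:8500"})
	if err != nil {
		panic(err)
	}
	// AllowStale lets any server answer from its local state rather than
	// forwarding the read to the leader: higher throughput, weaker reads.
	pair, _, err := cli.KV().Get("smoke", &consulapi.QueryOptions{AllowStale: true})
	fmt.Println(pair, err)
}
```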

View File

@@ -39,4 +39,3 @@ step2:
# after benchmark
step3:
action: stop # OR 'only-upload-log'