From 338300df746fe7fec53e8b49b1390ce7dbd9ce6a Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Thu, 23 Feb 2017 08:03:43 -0800 Subject: [PATCH] agent: updates from proto updates --- agent/agent_cetcd.go | 4 ++-- agent/agent_consul.go | 10 ++++----- agent/agent_etcd.go | 14 ++++++------ agent/agent_zetcd.go | 4 ++-- agent/agent_zookeeper.go | 17 ++++++++------- agent/server.go | 47 ++++++++++++++++++++-------------------- agent/upload_log.go | 36 +++++++++++++++--------------- 7 files changed, 67 insertions(+), 65 deletions(-) diff --git a/agent/agent_cetcd.go b/agent/agent_cetcd.go index ecd4a900..025ada8c 100644 --- a/agent/agent_cetcd.go +++ b/agent/agent_cetcd.go @@ -34,8 +34,8 @@ func startCetcd(fs *flags, t *transporterServer) error { flags := []string{ // "-consuladdr", "0.0.0.0:8500", - "-consuladdr", fmt.Sprintf("%s:8500", peerIPs[t.req.IpIndex]), - "-etcd", clientURLs[t.req.IpIndex], // etcd endpoint + "-consuladdr", fmt.Sprintf("%s:8500", peerIPs[t.req.IPIndex]), + "-etcd", clientURLs[t.req.IPIndex], // etcd endpoint } flagString := strings.Join(flags, " ") diff --git a/agent/agent_consul.go b/agent/agent_consul.go index dce29da8..9e9127cc 100644 --- a/agent/agent_consul.go +++ b/agent/agent_consul.go @@ -34,14 +34,14 @@ func startConsul(fs *flags, t *transporterServer) error { peerIPs := strings.Split(t.req.PeerIPsString, "___") var flags []string - switch t.req.IpIndex { + switch t.req.IPIndex { case 0: // leader flags = []string{ "agent", "-server", "-data-dir", fs.consulDataDir, - "-bind", peerIPs[t.req.IpIndex], - "-client", peerIPs[t.req.IpIndex], + "-bind", peerIPs[t.req.IPIndex], + "-client", peerIPs[t.req.IPIndex], "-bootstrap-expect", "3", } @@ -50,8 +50,8 @@ func startConsul(fs *flags, t *transporterServer) error { "agent", "-server", "-data-dir", fs.consulDataDir, - "-bind", peerIPs[t.req.IpIndex], - "-client", peerIPs[t.req.IpIndex], + "-bind", peerIPs[t.req.IPIndex], + "-client", peerIPs[t.req.IPIndex], "-join", peerIPs[0], } } diff --git a/agent/agent_etcd.go b/agent/agent_etcd.go index 4bf710b1..36e90bf4 100644 --- a/agent/agent_etcd.go +++ b/agent/agent_etcd.go @@ -44,23 +44,23 @@ func startEtcd(fs *flags, t *transporterServer) error { members[i] = fmt.Sprintf("%s=%s", names[i], peerURLs[i]) } - qv := t.req.Etcdv3Config.QuotaSizeBytes + qv := t.req.Flag_Etcd_Tip.QuotaSizeBytes if qv > 8000000000 { plog.Warningf("maximum etcd quota is 8GB (got %d)... resetting to 8GB...", qv) qv = 8000000000 } flags := []string{ - "--name", names[t.req.IpIndex], + "--name", names[t.req.IPIndex], "--data-dir", fs.etcdDataDir, "--quota-backend-bytes", fmt.Sprintf("%d", qv), - "--snapshot-count", fmt.Sprintf("%d", t.req.Etcdv3Config.SnapCount), + "--snapshot-count", fmt.Sprintf("%d", t.req.Flag_Etcd_Tip.SnapshotCount), - "--listen-client-urls", clientURLs[t.req.IpIndex], - "--advertise-client-urls", clientURLs[t.req.IpIndex], + "--listen-client-urls", clientURLs[t.req.IPIndex], + "--advertise-client-urls", clientURLs[t.req.IPIndex], - "--listen-peer-urls", peerURLs[t.req.IpIndex], - "--initial-advertise-peer-urls", peerURLs[t.req.IpIndex], + "--listen-peer-urls", peerURLs[t.req.IPIndex], + "--initial-advertise-peer-urls", peerURLs[t.req.IPIndex], "--initial-cluster-token", "dbtester-etcd-token", "--initial-cluster", strings.Join(members, ","), diff --git a/agent/agent_zetcd.go b/agent/agent_zetcd.go index df5e3bf5..fd39d51e 100644 --- a/agent/agent_zetcd.go +++ b/agent/agent_zetcd.go @@ -34,8 +34,8 @@ func startZetcd(fs *flags, t *transporterServer) error { flags := []string{ // "-zkaddr", "0.0.0.0:2181", - "-zkaddr", fmt.Sprintf("%s:2181", peerIPs[t.req.IpIndex]), - "-endpoint", clientURLs[t.req.IpIndex], + "-zkaddr", fmt.Sprintf("%s:2181", peerIPs[t.req.IPIndex]), + "-endpoint", clientURLs[t.req.IPIndex], } flagString := strings.Join(flags, " ") diff --git a/agent/agent_zookeeper.go b/agent/agent_zookeeper.go index 438ee3c7..02344c4a 100644 --- a/agent/agent_zookeeper.go +++ b/agent/agent_zookeeper.go @@ -83,9 +83,10 @@ func startZookeeper(fs *flags, t *transporterServer) error { return err } + // TODO: support r3.5.2 ipath := filepath.Join(fs.zkDataDir, "myid") - plog.Infof("writing Zookeeper myid file %d to %s", t.req.ZookeeperConfig.MyID, ipath) - if err := toFile(fmt.Sprintf("%d", t.req.ZookeeperConfig.MyID), ipath); err != nil { + plog.Infof("writing Zookeeper myid file %d to %s", t.req.Flag_Zookeeper_R3_4_9.MyID, ipath) + if err := toFile(fmt.Sprintf("%d", t.req.Flag_Zookeeper_R3_4_9.MyID), ipath); err != nil { return err } @@ -95,14 +96,14 @@ func startZookeeper(fs *flags, t *transporterServer) error { peers = append(peers, ZookeeperPeer{MyID: i + 1, IP: peerIPs[i]}) } cfg := ZookeeperConfig{ - TickTime: t.req.ZookeeperConfig.TickTime, + TickTime: t.req.Flag_Zookeeper_R3_4_9.TickTime, DataDir: fs.zkDataDir, - ClientPort: t.req.ZookeeperConfig.ClientPort, - InitLimit: t.req.ZookeeperConfig.InitLimit, - SyncLimit: t.req.ZookeeperConfig.SyncLimit, - MaxClientConnections: t.req.ZookeeperConfig.MaxClientConnections, + ClientPort: t.req.Flag_Zookeeper_R3_4_9.ClientPort, + InitLimit: t.req.Flag_Zookeeper_R3_4_9.InitLimit, + SyncLimit: t.req.Flag_Zookeeper_R3_4_9.SyncLimit, + MaxClientConnections: t.req.Flag_Zookeeper_R3_4_9.MaxClientConnections, Peers: peers, - SnapCount: t.req.ZookeeperConfig.SnapCount, + SnapCount: t.req.Flag_Zookeeper_R3_4_9.SnapCount, } tpl := template.Must(template.New("zkTemplate").Parse(zkTemplate)) buf := new(bytes.Buffer) diff --git a/agent/server.go b/agent/server.go index 32c7c55f..3d9b9b10 100644 --- a/agent/server.go +++ b/agent/server.go @@ -24,6 +24,7 @@ import ( "github.com/coreos/dbtester/dbtesterpb" "github.com/coreos/dbtester/pkg/fileinspect" + "github.com/gyuho/psn" "golang.org/x/net/context" ) @@ -78,7 +79,7 @@ func (t *transporterServer) Transfer(ctx context.Context, req *dbtesterpb.Reques plog.Infof("received gRPC request %q with database %q (clients: %d)", req.Operation, req.DatabaseID, req.CurrentClientNumber) } - if req.Operation == dbtesterpb.Request_Start { + if req.Operation == dbtesterpb.Operation_Start { f, err := openToAppend(globalFlags.databaseLog) if err != nil { return nil, err @@ -87,7 +88,7 @@ func (t *transporterServer) Transfer(ctx context.Context, req *dbtesterpb.Reques plog.Infof("agent log path: %q", globalFlags.agentLog) plog.Infof("database log path: %q", globalFlags.databaseLog) - if req.DatabaseID == dbtesterpb.Request_zetcd || req.DatabaseID == dbtesterpb.Request_cetcd { + if req.DatabaseID == dbtesterpb.DatabaseID_zetcd__beta || req.DatabaseID == dbtesterpb.DatabaseID_cetcd__beta { proxyLog := globalFlags.databaseLog + "-" + t.req.DatabaseID.String() pf, err := openToAppend(proxyLog) if err != nil { @@ -99,24 +100,24 @@ func (t *transporterServer) Transfer(ctx context.Context, req *dbtesterpb.Reques plog.Infof("system metrics CSV path: %q", globalFlags.systemMetricsCSV) switch req.DatabaseID { - case dbtesterpb.Request_zookeeper: + case dbtesterpb.DatabaseID_zookeeper__r3_4_9: plog.Infof("Zookeeper working directory: %q", globalFlags.zkWorkDir) plog.Infof("Zookeeper data directory: %q", globalFlags.zkDataDir) plog.Infof("Zookeeper configuration path: %q", globalFlags.zkConfig) - case dbtesterpb.Request_etcdv2, dbtesterpb.Request_etcdv3: + case dbtesterpb.DatabaseID_etcd__v2_3, dbtesterpb.DatabaseID_etcd__tip: plog.Infof("etcd executable binary path: %q", globalFlags.etcdExec) plog.Infof("etcd data directory: %q", globalFlags.etcdDataDir) - case dbtesterpb.Request_zetcd: + case dbtesterpb.DatabaseID_zetcd__beta: plog.Infof("zetcd executable binary path: %q", globalFlags.zetcdExec) plog.Infof("zetcd data directory: %q", globalFlags.etcdDataDir) - case dbtesterpb.Request_cetcd: + case dbtesterpb.DatabaseID_cetcd__beta: plog.Infof("cetcd executable binary path: %q", globalFlags.cetcdExec) plog.Infof("cetcd data directory: %q", globalFlags.etcdDataDir) - case dbtesterpb.Request_consul: + case dbtesterpb.DatabaseID_consul__v0_7_5: plog.Infof("Consul executable binary path: %q", globalFlags.consulExec) plog.Infof("Consul data directory: %q", globalFlags.consulDataDir) } @@ -124,21 +125,21 @@ func (t *transporterServer) Transfer(ctx context.Context, req *dbtesterpb.Reques // re-use configurations for next requests t.req = *req } - if req.Operation == dbtesterpb.Request_Heartbeat { + if req.Operation == dbtesterpb.Operation_Heartbeat { t.req.CurrentClientNumber = req.CurrentClientNumber } var diskSpaceUsageBytes int64 switch req.Operation { - case dbtesterpb.Request_Start: + case dbtesterpb.Operation_Start: switch t.req.DatabaseID { - case dbtesterpb.Request_etcdv2, dbtesterpb.Request_etcdv3, dbtesterpb.Request_zetcd, dbtesterpb.Request_cetcd: + case dbtesterpb.DatabaseID_etcd__v2_3, dbtesterpb.DatabaseID_etcd__tip, dbtesterpb.DatabaseID_zetcd__beta, dbtesterpb.DatabaseID_cetcd__beta: if err := startEtcd(&globalFlags, t); err != nil { plog.Errorf("startEtcd error %v", err) return nil, err } switch t.req.DatabaseID { - case dbtesterpb.Request_zetcd: + case dbtesterpb.DatabaseID_zetcd__beta: if err := startZetcd(&globalFlags, t); err != nil { plog.Errorf("startZetcd error %v", err) return nil, err @@ -151,7 +152,7 @@ func (t *transporterServer) Transfer(ctx context.Context, req *dbtesterpb.Reques } plog.Infof("exiting %q", t.proxyCmd.Path) }() - case dbtesterpb.Request_cetcd: + case dbtesterpb.DatabaseID_cetcd__beta: if err := startCetcd(&globalFlags, t); err != nil { plog.Errorf("startCetcd error %v", err) return nil, err @@ -165,12 +166,12 @@ func (t *transporterServer) Transfer(ctx context.Context, req *dbtesterpb.Reques plog.Infof("exiting %q", t.proxyCmd.Path) }() } - case dbtesterpb.Request_zookeeper: + case dbtesterpb.DatabaseID_zookeeper__r3_4_9: if err := startZookeeper(&globalFlags, t); err != nil { plog.Errorf("startZookeeper error %v", err) return nil, err } - case dbtesterpb.Request_consul: + case dbtesterpb.DatabaseID_consul__v0_7_5: if err := startConsul(&globalFlags, t); err != nil { plog.Errorf("startConsul error %v", err) return nil, err @@ -193,7 +194,7 @@ func (t *transporterServer) Transfer(ctx context.Context, req *dbtesterpb.Reques return nil, err } - case dbtesterpb.Request_Stop: + case dbtesterpb.Operation_Stop: if t.cmd == nil { return nil, fmt.Errorf("nil command") } @@ -253,7 +254,7 @@ func (t *transporterServer) Transfer(ctx context.Context, req *dbtesterpb.Reques } diskSpaceUsageBytes = dbs - case dbtesterpb.Request_Heartbeat: + case dbtesterpb.Operation_Heartbeat: plog.Infof("overwriting clients num %d to %q", t.req.CurrentClientNumber, t.clientNumPath) if err := toFile(fmt.Sprintf("%d", t.req.CurrentClientNumber), t.clientNumPath); err != nil { return nil, err @@ -267,19 +268,19 @@ func (t *transporterServer) Transfer(ctx context.Context, req *dbtesterpb.Reques return &dbtesterpb.Response{Success: true, DiskSpaceUsageBytes: diskSpaceUsageBytes}, nil } -func measureDatabasSize(flg flags, rdb dbtesterpb.Request_Database) (int64, error) { +func measureDatabasSize(flg flags, rdb dbtesterpb.DatabaseID) (int64, error) { switch rdb { - case dbtesterpb.Request_etcdv2: + case dbtesterpb.DatabaseID_etcd__v2_3: return fileinspect.Size(flg.etcdDataDir) - case dbtesterpb.Request_etcdv3: + case dbtesterpb.DatabaseID_etcd__tip: return fileinspect.Size(flg.etcdDataDir) - case dbtesterpb.Request_zookeeper: + case dbtesterpb.DatabaseID_zookeeper__r3_4_9: return fileinspect.Size(flg.zkDataDir) - case dbtesterpb.Request_consul: + case dbtesterpb.DatabaseID_consul__v0_7_5: return fileinspect.Size(flg.consulDataDir) - case dbtesterpb.Request_cetcd: + case dbtesterpb.DatabaseID_cetcd__beta: return fileinspect.Size(flg.etcdDataDir) - case dbtesterpb.Request_zetcd: + case dbtesterpb.DatabaseID_zetcd__beta: return fileinspect.Size(flg.etcdDataDir) default: return 0, fmt.Errorf("uknown %q", rdb) diff --git a/agent/upload_log.go b/agent/upload_log.go index 80cdc63d..00c7da3a 100644 --- a/agent/upload_log.go +++ b/agent/upload_log.go @@ -26,8 +26,8 @@ import ( // uploadLog starts cetcd. This assumes that etcd is already started. func uploadLog(fs *flags, t *transporterServer) error { - plog.Infof("stopped collecting metrics; uploading logs to storage %q", t.req.Control.GoogleCloudProjectName) - u, err := remotestorage.NewGoogleCloudStorage([]byte(t.req.Control.GoogleCloudStorageKey), t.req.Control.GoogleCloudProjectName) + plog.Infof("stopped collecting metrics; uploading logs to storage %q", t.req.ConfigClientMachineInitial.GoogleCloudProjectName) + u, err := remotestorage.NewGoogleCloudStorage([]byte(t.req.ConfigClientMachineInitial.GoogleCloudStorageKey), t.req.ConfigClientMachineInitial.GoogleCloudProjectName) if err != nil { return err } @@ -38,12 +38,12 @@ func uploadLog(fs *flags, t *transporterServer) error { srcDatabaseLogPath := fs.databaseLog dstDatabaseLogPath := filepath.Base(fs.databaseLog) if !strings.HasPrefix(filepath.Base(fs.databaseLog), t.req.DatabaseTag) { - dstDatabaseLogPath = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IpIndex+1, filepath.Base(fs.databaseLog)) + dstDatabaseLogPath = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IPIndex+1, filepath.Base(fs.databaseLog)) } - dstDatabaseLogPath = filepath.Join(t.req.Control.GoogleCloudStorageSubDirectory, dstDatabaseLogPath) + dstDatabaseLogPath = filepath.Join(t.req.ConfigClientMachineInitial.GoogleCloudStorageSubDirectory, dstDatabaseLogPath) plog.Infof("uploading database log [%q -> %q]", srcDatabaseLogPath, dstDatabaseLogPath) for k := 0; k < 30; k++ { - if uerr = u.UploadFile(t.req.Control.GoogleCloudStorageBucketName, srcDatabaseLogPath, dstDatabaseLogPath); uerr != nil { + if uerr = u.UploadFile(t.req.ConfigClientMachineInitial.GoogleCloudStorageBucketName, srcDatabaseLogPath, dstDatabaseLogPath); uerr != nil { plog.Warningf("UploadFile error... sleep and retry... (%v)", uerr) time.Sleep(2 * time.Second) continue @@ -57,17 +57,17 @@ func uploadLog(fs *flags, t *transporterServer) error { } { - if t.req.DatabaseID == dbtesterpb.Request_zetcd || t.req.DatabaseID == dbtesterpb.Request_cetcd { + if t.req.DatabaseID == dbtesterpb.DatabaseID_zetcd__beta || t.req.DatabaseID == dbtesterpb.DatabaseID_cetcd__beta { dpath := fs.databaseLog + "-" + t.req.DatabaseID.String() srcDatabaseLogPath2 := dpath dstDatabaseLogPath2 := filepath.Base(dpath) if !strings.HasPrefix(filepath.Base(dpath), t.req.DatabaseTag) { - dstDatabaseLogPath2 = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IpIndex+1, filepath.Base(dpath)) + dstDatabaseLogPath2 = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IPIndex+1, filepath.Base(dpath)) } - dstDatabaseLogPath2 = filepath.Join(t.req.Control.GoogleCloudStorageSubDirectory, dstDatabaseLogPath2) + dstDatabaseLogPath2 = filepath.Join(t.req.ConfigClientMachineInitial.GoogleCloudStorageSubDirectory, dstDatabaseLogPath2) plog.Infof("uploading proxy-database log [%q -> %q]", srcDatabaseLogPath2, dstDatabaseLogPath2) for k := 0; k < 30; k++ { - if uerr = u.UploadFile(t.req.Control.GoogleCloudStorageBucketName, srcDatabaseLogPath2, dstDatabaseLogPath2); uerr != nil { + if uerr = u.UploadFile(t.req.ConfigClientMachineInitial.GoogleCloudStorageBucketName, srcDatabaseLogPath2, dstDatabaseLogPath2); uerr != nil { plog.Warningf("UploadFile error... sleep and retry... (%v)", uerr) time.Sleep(2 * time.Second) continue @@ -85,12 +85,12 @@ func uploadLog(fs *flags, t *transporterServer) error { srcSysMetricsDataPath := fs.systemMetricsCSV dstSysMetricsDataPath := filepath.Base(fs.systemMetricsCSV) if !strings.HasPrefix(filepath.Base(fs.systemMetricsCSV), t.req.DatabaseTag) { - dstSysMetricsDataPath = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IpIndex+1, filepath.Base(fs.systemMetricsCSV)) + dstSysMetricsDataPath = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IPIndex+1, filepath.Base(fs.systemMetricsCSV)) } - dstSysMetricsDataPath = filepath.Join(t.req.Control.GoogleCloudStorageSubDirectory, dstSysMetricsDataPath) + dstSysMetricsDataPath = filepath.Join(t.req.ConfigClientMachineInitial.GoogleCloudStorageSubDirectory, dstSysMetricsDataPath) plog.Infof("uploading system metrics data [%q -> %q]", srcSysMetricsDataPath, dstSysMetricsDataPath) for k := 0; k < 30; k++ { - if uerr := u.UploadFile(t.req.Control.GoogleCloudStorageBucketName, srcSysMetricsDataPath, dstSysMetricsDataPath); uerr != nil { + if uerr := u.UploadFile(t.req.ConfigClientMachineInitial.GoogleCloudStorageBucketName, srcSysMetricsDataPath, dstSysMetricsDataPath); uerr != nil { plog.Warningf("upload error... sleep and retry... (%v)", uerr) time.Sleep(2 * time.Second) continue @@ -107,12 +107,12 @@ func uploadLog(fs *flags, t *transporterServer) error { srcSysMetricsInterpolatedDataPath := fs.systemMetricsCSVInterpolated dstSysMetricsInterpolatedDataPath := filepath.Base(fs.systemMetricsCSVInterpolated) if !strings.HasPrefix(filepath.Base(fs.systemMetricsCSVInterpolated), t.req.DatabaseTag) { - dstSysMetricsInterpolatedDataPath = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IpIndex+1, filepath.Base(fs.systemMetricsCSVInterpolated)) + dstSysMetricsInterpolatedDataPath = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IPIndex+1, filepath.Base(fs.systemMetricsCSVInterpolated)) } - dstSysMetricsInterpolatedDataPath = filepath.Join(t.req.Control.GoogleCloudStorageSubDirectory, dstSysMetricsInterpolatedDataPath) + dstSysMetricsInterpolatedDataPath = filepath.Join(t.req.ConfigClientMachineInitial.GoogleCloudStorageSubDirectory, dstSysMetricsInterpolatedDataPath) plog.Infof("uploading system metrics interpolated data [%q -> %q]", srcSysMetricsInterpolatedDataPath, dstSysMetricsInterpolatedDataPath) for k := 0; k < 30; k++ { - if uerr := u.UploadFile(t.req.Control.GoogleCloudStorageBucketName, srcSysMetricsInterpolatedDataPath, dstSysMetricsInterpolatedDataPath); uerr != nil { + if uerr := u.UploadFile(t.req.ConfigClientMachineInitial.GoogleCloudStorageBucketName, srcSysMetricsInterpolatedDataPath, dstSysMetricsInterpolatedDataPath); uerr != nil { plog.Warningf("upload error... sleep and retry... (%v)", uerr) time.Sleep(2 * time.Second) continue @@ -129,12 +129,12 @@ func uploadLog(fs *flags, t *transporterServer) error { srcAgentLogPath := fs.agentLog dstAgentLogPath := filepath.Base(fs.agentLog) if !strings.HasPrefix(filepath.Base(fs.agentLog), t.req.DatabaseTag) { - dstAgentLogPath = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IpIndex+1, filepath.Base(fs.agentLog)) + dstAgentLogPath = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IPIndex+1, filepath.Base(fs.agentLog)) } - dstAgentLogPath = filepath.Join(t.req.Control.GoogleCloudStorageSubDirectory, dstAgentLogPath) + dstAgentLogPath = filepath.Join(t.req.ConfigClientMachineInitial.GoogleCloudStorageSubDirectory, dstAgentLogPath) plog.Infof("uploading agent logs [%q -> %q]", srcAgentLogPath, dstAgentLogPath) for k := 0; k < 30; k++ { - if uerr := u.UploadFile(t.req.Control.GoogleCloudStorageBucketName, srcAgentLogPath, dstAgentLogPath); uerr != nil { + if uerr := u.UploadFile(t.req.ConfigClientMachineInitial.GoogleCloudStorageBucketName, srcAgentLogPath, dstAgentLogPath); uerr != nil { plog.Warningf("UploadFile error... sleep and retry... (%v)", uerr) time.Sleep(2 * time.Second) continue