agent: updates from proto updates

This commit is contained in:
Gyu-Ho Lee 2017-02-23 08:03:43 -08:00
parent b73c62d7fb
commit 338300df74
No known key found for this signature in database
GPG Key ID: 1DDD39C7EB70C24C
7 changed files with 67 additions and 65 deletions

View File

@ -34,8 +34,8 @@ func startCetcd(fs *flags, t *transporterServer) error {
flags := []string{
// "-consuladdr", "0.0.0.0:8500",
"-consuladdr", fmt.Sprintf("%s:8500", peerIPs[t.req.IpIndex]),
"-etcd", clientURLs[t.req.IpIndex], // etcd endpoint
"-consuladdr", fmt.Sprintf("%s:8500", peerIPs[t.req.IPIndex]),
"-etcd", clientURLs[t.req.IPIndex], // etcd endpoint
}
flagString := strings.Join(flags, " ")

View File

@ -34,14 +34,14 @@ func startConsul(fs *flags, t *transporterServer) error {
peerIPs := strings.Split(t.req.PeerIPsString, "___")
var flags []string
switch t.req.IpIndex {
switch t.req.IPIndex {
case 0: // leader
flags = []string{
"agent",
"-server",
"-data-dir", fs.consulDataDir,
"-bind", peerIPs[t.req.IpIndex],
"-client", peerIPs[t.req.IpIndex],
"-bind", peerIPs[t.req.IPIndex],
"-client", peerIPs[t.req.IPIndex],
"-bootstrap-expect", "3",
}
@ -50,8 +50,8 @@ func startConsul(fs *flags, t *transporterServer) error {
"agent",
"-server",
"-data-dir", fs.consulDataDir,
"-bind", peerIPs[t.req.IpIndex],
"-client", peerIPs[t.req.IpIndex],
"-bind", peerIPs[t.req.IPIndex],
"-client", peerIPs[t.req.IPIndex],
"-join", peerIPs[0],
}
}

View File

@ -44,23 +44,23 @@ func startEtcd(fs *flags, t *transporterServer) error {
members[i] = fmt.Sprintf("%s=%s", names[i], peerURLs[i])
}
qv := t.req.Etcdv3Config.QuotaSizeBytes
qv := t.req.Flag_Etcd_Tip.QuotaSizeBytes
if qv > 8000000000 {
plog.Warningf("maximum etcd quota is 8GB (got %d)... resetting to 8GB...", qv)
qv = 8000000000
}
flags := []string{
"--name", names[t.req.IpIndex],
"--name", names[t.req.IPIndex],
"--data-dir", fs.etcdDataDir,
"--quota-backend-bytes", fmt.Sprintf("%d", qv),
"--snapshot-count", fmt.Sprintf("%d", t.req.Etcdv3Config.SnapCount),
"--snapshot-count", fmt.Sprintf("%d", t.req.Flag_Etcd_Tip.SnapshotCount),
"--listen-client-urls", clientURLs[t.req.IpIndex],
"--advertise-client-urls", clientURLs[t.req.IpIndex],
"--listen-client-urls", clientURLs[t.req.IPIndex],
"--advertise-client-urls", clientURLs[t.req.IPIndex],
"--listen-peer-urls", peerURLs[t.req.IpIndex],
"--initial-advertise-peer-urls", peerURLs[t.req.IpIndex],
"--listen-peer-urls", peerURLs[t.req.IPIndex],
"--initial-advertise-peer-urls", peerURLs[t.req.IPIndex],
"--initial-cluster-token", "dbtester-etcd-token",
"--initial-cluster", strings.Join(members, ","),

View File

@ -34,8 +34,8 @@ func startZetcd(fs *flags, t *transporterServer) error {
flags := []string{
// "-zkaddr", "0.0.0.0:2181",
"-zkaddr", fmt.Sprintf("%s:2181", peerIPs[t.req.IpIndex]),
"-endpoint", clientURLs[t.req.IpIndex],
"-zkaddr", fmt.Sprintf("%s:2181", peerIPs[t.req.IPIndex]),
"-endpoint", clientURLs[t.req.IPIndex],
}
flagString := strings.Join(flags, " ")

View File

@ -83,9 +83,10 @@ func startZookeeper(fs *flags, t *transporterServer) error {
return err
}
// TODO: support r3.5.2
ipath := filepath.Join(fs.zkDataDir, "myid")
plog.Infof("writing Zookeeper myid file %d to %s", t.req.ZookeeperConfig.MyID, ipath)
if err := toFile(fmt.Sprintf("%d", t.req.ZookeeperConfig.MyID), ipath); err != nil {
plog.Infof("writing Zookeeper myid file %d to %s", t.req.Flag_Zookeeper_R3_4_9.MyID, ipath)
if err := toFile(fmt.Sprintf("%d", t.req.Flag_Zookeeper_R3_4_9.MyID), ipath); err != nil {
return err
}
@ -95,14 +96,14 @@ func startZookeeper(fs *flags, t *transporterServer) error {
peers = append(peers, ZookeeperPeer{MyID: i + 1, IP: peerIPs[i]})
}
cfg := ZookeeperConfig{
TickTime: t.req.ZookeeperConfig.TickTime,
TickTime: t.req.Flag_Zookeeper_R3_4_9.TickTime,
DataDir: fs.zkDataDir,
ClientPort: t.req.ZookeeperConfig.ClientPort,
InitLimit: t.req.ZookeeperConfig.InitLimit,
SyncLimit: t.req.ZookeeperConfig.SyncLimit,
MaxClientConnections: t.req.ZookeeperConfig.MaxClientConnections,
ClientPort: t.req.Flag_Zookeeper_R3_4_9.ClientPort,
InitLimit: t.req.Flag_Zookeeper_R3_4_9.InitLimit,
SyncLimit: t.req.Flag_Zookeeper_R3_4_9.SyncLimit,
MaxClientConnections: t.req.Flag_Zookeeper_R3_4_9.MaxClientConnections,
Peers: peers,
SnapCount: t.req.ZookeeperConfig.SnapCount,
SnapCount: t.req.Flag_Zookeeper_R3_4_9.SnapCount,
}
tpl := template.Must(template.New("zkTemplate").Parse(zkTemplate))
buf := new(bytes.Buffer)

View File

@ -24,6 +24,7 @@ import (
"github.com/coreos/dbtester/dbtesterpb"
"github.com/coreos/dbtester/pkg/fileinspect"
"github.com/gyuho/psn"
"golang.org/x/net/context"
)
@ -78,7 +79,7 @@ func (t *transporterServer) Transfer(ctx context.Context, req *dbtesterpb.Reques
plog.Infof("received gRPC request %q with database %q (clients: %d)", req.Operation, req.DatabaseID, req.CurrentClientNumber)
}
if req.Operation == dbtesterpb.Request_Start {
if req.Operation == dbtesterpb.Operation_Start {
f, err := openToAppend(globalFlags.databaseLog)
if err != nil {
return nil, err
@ -87,7 +88,7 @@ func (t *transporterServer) Transfer(ctx context.Context, req *dbtesterpb.Reques
plog.Infof("agent log path: %q", globalFlags.agentLog)
plog.Infof("database log path: %q", globalFlags.databaseLog)
if req.DatabaseID == dbtesterpb.Request_zetcd || req.DatabaseID == dbtesterpb.Request_cetcd {
if req.DatabaseID == dbtesterpb.DatabaseID_zetcd__beta || req.DatabaseID == dbtesterpb.DatabaseID_cetcd__beta {
proxyLog := globalFlags.databaseLog + "-" + t.req.DatabaseID.String()
pf, err := openToAppend(proxyLog)
if err != nil {
@ -99,24 +100,24 @@ func (t *transporterServer) Transfer(ctx context.Context, req *dbtesterpb.Reques
plog.Infof("system metrics CSV path: %q", globalFlags.systemMetricsCSV)
switch req.DatabaseID {
case dbtesterpb.Request_zookeeper:
case dbtesterpb.DatabaseID_zookeeper__r3_4_9:
plog.Infof("Zookeeper working directory: %q", globalFlags.zkWorkDir)
plog.Infof("Zookeeper data directory: %q", globalFlags.zkDataDir)
plog.Infof("Zookeeper configuration path: %q", globalFlags.zkConfig)
case dbtesterpb.Request_etcdv2, dbtesterpb.Request_etcdv3:
case dbtesterpb.DatabaseID_etcd__v2_3, dbtesterpb.DatabaseID_etcd__tip:
plog.Infof("etcd executable binary path: %q", globalFlags.etcdExec)
plog.Infof("etcd data directory: %q", globalFlags.etcdDataDir)
case dbtesterpb.Request_zetcd:
case dbtesterpb.DatabaseID_zetcd__beta:
plog.Infof("zetcd executable binary path: %q", globalFlags.zetcdExec)
plog.Infof("zetcd data directory: %q", globalFlags.etcdDataDir)
case dbtesterpb.Request_cetcd:
case dbtesterpb.DatabaseID_cetcd__beta:
plog.Infof("cetcd executable binary path: %q", globalFlags.cetcdExec)
plog.Infof("cetcd data directory: %q", globalFlags.etcdDataDir)
case dbtesterpb.Request_consul:
case dbtesterpb.DatabaseID_consul__v0_7_5:
plog.Infof("Consul executable binary path: %q", globalFlags.consulExec)
plog.Infof("Consul data directory: %q", globalFlags.consulDataDir)
}
@ -124,21 +125,21 @@ func (t *transporterServer) Transfer(ctx context.Context, req *dbtesterpb.Reques
// re-use configurations for next requests
t.req = *req
}
if req.Operation == dbtesterpb.Request_Heartbeat {
if req.Operation == dbtesterpb.Operation_Heartbeat {
t.req.CurrentClientNumber = req.CurrentClientNumber
}
var diskSpaceUsageBytes int64
switch req.Operation {
case dbtesterpb.Request_Start:
case dbtesterpb.Operation_Start:
switch t.req.DatabaseID {
case dbtesterpb.Request_etcdv2, dbtesterpb.Request_etcdv3, dbtesterpb.Request_zetcd, dbtesterpb.Request_cetcd:
case dbtesterpb.DatabaseID_etcd__v2_3, dbtesterpb.DatabaseID_etcd__tip, dbtesterpb.DatabaseID_zetcd__beta, dbtesterpb.DatabaseID_cetcd__beta:
if err := startEtcd(&globalFlags, t); err != nil {
plog.Errorf("startEtcd error %v", err)
return nil, err
}
switch t.req.DatabaseID {
case dbtesterpb.Request_zetcd:
case dbtesterpb.DatabaseID_zetcd__beta:
if err := startZetcd(&globalFlags, t); err != nil {
plog.Errorf("startZetcd error %v", err)
return nil, err
@ -151,7 +152,7 @@ func (t *transporterServer) Transfer(ctx context.Context, req *dbtesterpb.Reques
}
plog.Infof("exiting %q", t.proxyCmd.Path)
}()
case dbtesterpb.Request_cetcd:
case dbtesterpb.DatabaseID_cetcd__beta:
if err := startCetcd(&globalFlags, t); err != nil {
plog.Errorf("startCetcd error %v", err)
return nil, err
@ -165,12 +166,12 @@ func (t *transporterServer) Transfer(ctx context.Context, req *dbtesterpb.Reques
plog.Infof("exiting %q", t.proxyCmd.Path)
}()
}
case dbtesterpb.Request_zookeeper:
case dbtesterpb.DatabaseID_zookeeper__r3_4_9:
if err := startZookeeper(&globalFlags, t); err != nil {
plog.Errorf("startZookeeper error %v", err)
return nil, err
}
case dbtesterpb.Request_consul:
case dbtesterpb.DatabaseID_consul__v0_7_5:
if err := startConsul(&globalFlags, t); err != nil {
plog.Errorf("startConsul error %v", err)
return nil, err
@ -193,7 +194,7 @@ func (t *transporterServer) Transfer(ctx context.Context, req *dbtesterpb.Reques
return nil, err
}
case dbtesterpb.Request_Stop:
case dbtesterpb.Operation_Stop:
if t.cmd == nil {
return nil, fmt.Errorf("nil command")
}
@ -253,7 +254,7 @@ func (t *transporterServer) Transfer(ctx context.Context, req *dbtesterpb.Reques
}
diskSpaceUsageBytes = dbs
case dbtesterpb.Request_Heartbeat:
case dbtesterpb.Operation_Heartbeat:
plog.Infof("overwriting clients num %d to %q", t.req.CurrentClientNumber, t.clientNumPath)
if err := toFile(fmt.Sprintf("%d", t.req.CurrentClientNumber), t.clientNumPath); err != nil {
return nil, err
@ -267,19 +268,19 @@ func (t *transporterServer) Transfer(ctx context.Context, req *dbtesterpb.Reques
return &dbtesterpb.Response{Success: true, DiskSpaceUsageBytes: diskSpaceUsageBytes}, nil
}
func measureDatabasSize(flg flags, rdb dbtesterpb.Request_Database) (int64, error) {
func measureDatabasSize(flg flags, rdb dbtesterpb.DatabaseID) (int64, error) {
switch rdb {
case dbtesterpb.Request_etcdv2:
case dbtesterpb.DatabaseID_etcd__v2_3:
return fileinspect.Size(flg.etcdDataDir)
case dbtesterpb.Request_etcdv3:
case dbtesterpb.DatabaseID_etcd__tip:
return fileinspect.Size(flg.etcdDataDir)
case dbtesterpb.Request_zookeeper:
case dbtesterpb.DatabaseID_zookeeper__r3_4_9:
return fileinspect.Size(flg.zkDataDir)
case dbtesterpb.Request_consul:
case dbtesterpb.DatabaseID_consul__v0_7_5:
return fileinspect.Size(flg.consulDataDir)
case dbtesterpb.Request_cetcd:
case dbtesterpb.DatabaseID_cetcd__beta:
return fileinspect.Size(flg.etcdDataDir)
case dbtesterpb.Request_zetcd:
case dbtesterpb.DatabaseID_zetcd__beta:
return fileinspect.Size(flg.etcdDataDir)
default:
return 0, fmt.Errorf("uknown %q", rdb)

View File

@ -26,8 +26,8 @@ import (
// uploadLog starts cetcd. This assumes that etcd is already started.
func uploadLog(fs *flags, t *transporterServer) error {
plog.Infof("stopped collecting metrics; uploading logs to storage %q", t.req.Control.GoogleCloudProjectName)
u, err := remotestorage.NewGoogleCloudStorage([]byte(t.req.Control.GoogleCloudStorageKey), t.req.Control.GoogleCloudProjectName)
plog.Infof("stopped collecting metrics; uploading logs to storage %q", t.req.ConfigClientMachineInitial.GoogleCloudProjectName)
u, err := remotestorage.NewGoogleCloudStorage([]byte(t.req.ConfigClientMachineInitial.GoogleCloudStorageKey), t.req.ConfigClientMachineInitial.GoogleCloudProjectName)
if err != nil {
return err
}
@ -38,12 +38,12 @@ func uploadLog(fs *flags, t *transporterServer) error {
srcDatabaseLogPath := fs.databaseLog
dstDatabaseLogPath := filepath.Base(fs.databaseLog)
if !strings.HasPrefix(filepath.Base(fs.databaseLog), t.req.DatabaseTag) {
dstDatabaseLogPath = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IpIndex+1, filepath.Base(fs.databaseLog))
dstDatabaseLogPath = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IPIndex+1, filepath.Base(fs.databaseLog))
}
dstDatabaseLogPath = filepath.Join(t.req.Control.GoogleCloudStorageSubDirectory, dstDatabaseLogPath)
dstDatabaseLogPath = filepath.Join(t.req.ConfigClientMachineInitial.GoogleCloudStorageSubDirectory, dstDatabaseLogPath)
plog.Infof("uploading database log [%q -> %q]", srcDatabaseLogPath, dstDatabaseLogPath)
for k := 0; k < 30; k++ {
if uerr = u.UploadFile(t.req.Control.GoogleCloudStorageBucketName, srcDatabaseLogPath, dstDatabaseLogPath); uerr != nil {
if uerr = u.UploadFile(t.req.ConfigClientMachineInitial.GoogleCloudStorageBucketName, srcDatabaseLogPath, dstDatabaseLogPath); uerr != nil {
plog.Warningf("UploadFile error... sleep and retry... (%v)", uerr)
time.Sleep(2 * time.Second)
continue
@ -57,17 +57,17 @@ func uploadLog(fs *flags, t *transporterServer) error {
}
{
if t.req.DatabaseID == dbtesterpb.Request_zetcd || t.req.DatabaseID == dbtesterpb.Request_cetcd {
if t.req.DatabaseID == dbtesterpb.DatabaseID_zetcd__beta || t.req.DatabaseID == dbtesterpb.DatabaseID_cetcd__beta {
dpath := fs.databaseLog + "-" + t.req.DatabaseID.String()
srcDatabaseLogPath2 := dpath
dstDatabaseLogPath2 := filepath.Base(dpath)
if !strings.HasPrefix(filepath.Base(dpath), t.req.DatabaseTag) {
dstDatabaseLogPath2 = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IpIndex+1, filepath.Base(dpath))
dstDatabaseLogPath2 = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IPIndex+1, filepath.Base(dpath))
}
dstDatabaseLogPath2 = filepath.Join(t.req.Control.GoogleCloudStorageSubDirectory, dstDatabaseLogPath2)
dstDatabaseLogPath2 = filepath.Join(t.req.ConfigClientMachineInitial.GoogleCloudStorageSubDirectory, dstDatabaseLogPath2)
plog.Infof("uploading proxy-database log [%q -> %q]", srcDatabaseLogPath2, dstDatabaseLogPath2)
for k := 0; k < 30; k++ {
if uerr = u.UploadFile(t.req.Control.GoogleCloudStorageBucketName, srcDatabaseLogPath2, dstDatabaseLogPath2); uerr != nil {
if uerr = u.UploadFile(t.req.ConfigClientMachineInitial.GoogleCloudStorageBucketName, srcDatabaseLogPath2, dstDatabaseLogPath2); uerr != nil {
plog.Warningf("UploadFile error... sleep and retry... (%v)", uerr)
time.Sleep(2 * time.Second)
continue
@ -85,12 +85,12 @@ func uploadLog(fs *flags, t *transporterServer) error {
srcSysMetricsDataPath := fs.systemMetricsCSV
dstSysMetricsDataPath := filepath.Base(fs.systemMetricsCSV)
if !strings.HasPrefix(filepath.Base(fs.systemMetricsCSV), t.req.DatabaseTag) {
dstSysMetricsDataPath = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IpIndex+1, filepath.Base(fs.systemMetricsCSV))
dstSysMetricsDataPath = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IPIndex+1, filepath.Base(fs.systemMetricsCSV))
}
dstSysMetricsDataPath = filepath.Join(t.req.Control.GoogleCloudStorageSubDirectory, dstSysMetricsDataPath)
dstSysMetricsDataPath = filepath.Join(t.req.ConfigClientMachineInitial.GoogleCloudStorageSubDirectory, dstSysMetricsDataPath)
plog.Infof("uploading system metrics data [%q -> %q]", srcSysMetricsDataPath, dstSysMetricsDataPath)
for k := 0; k < 30; k++ {
if uerr := u.UploadFile(t.req.Control.GoogleCloudStorageBucketName, srcSysMetricsDataPath, dstSysMetricsDataPath); uerr != nil {
if uerr := u.UploadFile(t.req.ConfigClientMachineInitial.GoogleCloudStorageBucketName, srcSysMetricsDataPath, dstSysMetricsDataPath); uerr != nil {
plog.Warningf("upload error... sleep and retry... (%v)", uerr)
time.Sleep(2 * time.Second)
continue
@ -107,12 +107,12 @@ func uploadLog(fs *flags, t *transporterServer) error {
srcSysMetricsInterpolatedDataPath := fs.systemMetricsCSVInterpolated
dstSysMetricsInterpolatedDataPath := filepath.Base(fs.systemMetricsCSVInterpolated)
if !strings.HasPrefix(filepath.Base(fs.systemMetricsCSVInterpolated), t.req.DatabaseTag) {
dstSysMetricsInterpolatedDataPath = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IpIndex+1, filepath.Base(fs.systemMetricsCSVInterpolated))
dstSysMetricsInterpolatedDataPath = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IPIndex+1, filepath.Base(fs.systemMetricsCSVInterpolated))
}
dstSysMetricsInterpolatedDataPath = filepath.Join(t.req.Control.GoogleCloudStorageSubDirectory, dstSysMetricsInterpolatedDataPath)
dstSysMetricsInterpolatedDataPath = filepath.Join(t.req.ConfigClientMachineInitial.GoogleCloudStorageSubDirectory, dstSysMetricsInterpolatedDataPath)
plog.Infof("uploading system metrics interpolated data [%q -> %q]", srcSysMetricsInterpolatedDataPath, dstSysMetricsInterpolatedDataPath)
for k := 0; k < 30; k++ {
if uerr := u.UploadFile(t.req.Control.GoogleCloudStorageBucketName, srcSysMetricsInterpolatedDataPath, dstSysMetricsInterpolatedDataPath); uerr != nil {
if uerr := u.UploadFile(t.req.ConfigClientMachineInitial.GoogleCloudStorageBucketName, srcSysMetricsInterpolatedDataPath, dstSysMetricsInterpolatedDataPath); uerr != nil {
plog.Warningf("upload error... sleep and retry... (%v)", uerr)
time.Sleep(2 * time.Second)
continue
@ -129,12 +129,12 @@ func uploadLog(fs *flags, t *transporterServer) error {
srcAgentLogPath := fs.agentLog
dstAgentLogPath := filepath.Base(fs.agentLog)
if !strings.HasPrefix(filepath.Base(fs.agentLog), t.req.DatabaseTag) {
dstAgentLogPath = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IpIndex+1, filepath.Base(fs.agentLog))
dstAgentLogPath = fmt.Sprintf("%s-%d-%s", t.req.DatabaseTag, t.req.IPIndex+1, filepath.Base(fs.agentLog))
}
dstAgentLogPath = filepath.Join(t.req.Control.GoogleCloudStorageSubDirectory, dstAgentLogPath)
dstAgentLogPath = filepath.Join(t.req.ConfigClientMachineInitial.GoogleCloudStorageSubDirectory, dstAgentLogPath)
plog.Infof("uploading agent logs [%q -> %q]", srcAgentLogPath, dstAgentLogPath)
for k := 0; k < 30; k++ {
if uerr := u.UploadFile(t.req.Control.GoogleCloudStorageBucketName, srcAgentLogPath, dstAgentLogPath); uerr != nil {
if uerr := u.UploadFile(t.req.ConfigClientMachineInitial.GoogleCloudStorageBucketName, srcAgentLogPath, dstAgentLogPath); uerr != nil {
plog.Warningf("UploadFile error... sleep and retry... (%v)", uerr)
time.Sleep(2 * time.Second)
continue