agent: reuse previous start request

This commit is contained in:
Gyu-Ho Lee 2016-03-22 00:41:32 -07:00
parent 4b2d090ad1
commit b345242fcb
1 changed files with 41 additions and 46 deletions

View File

@ -170,14 +170,6 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
if !filepath.HasPrefix(zkDataDir, globalFlags.WorkingDirectory) {
zkDataDir = filepath.Join(globalFlags.WorkingDirectory, zkDataDir)
}
if r.LogPrefix != "" {
if !strings.HasPrefix(filepath.Base(r.DatabaseLogPath), r.LogPrefix) {
r.DatabaseLogPath = filepath.Join(filepath.Dir(r.DatabaseLogPath), r.LogPrefix+"_"+filepath.Base(r.DatabaseLogPath))
}
if !strings.HasPrefix(filepath.Base(r.MonitorResultPath), r.LogPrefix) {
r.MonitorResultPath = filepath.Join(filepath.Dir(r.MonitorResultPath), r.LogPrefix+"_"+filepath.Base(r.MonitorResultPath))
}
}
if !filepath.HasPrefix(r.DatabaseLogPath, globalFlags.WorkingDirectory) {
r.DatabaseLogPath = filepath.Join(globalFlags.WorkingDirectory, r.DatabaseLogPath)
}
@ -192,12 +184,20 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
log.Printf("database log path: %s", r.DatabaseLogPath)
log.Printf("monitor result path: %s", r.MonitorResultPath)
}
t.req = *r
if r.Operation == Request_Start {
t.req = *r
}
if t.req.GoogleCloudStorageJSONKey != "" {
if err := toFile(t.req.GoogleCloudStorageJSONKey, filepath.Join(globalFlags.WorkingDirectory, "key.json")); err != nil {
return nil, err
}
}
var processPID int
switch r.Operation {
case Request_Start:
switch r.Database {
switch t.req.Database {
case Request_etcd:
_, err := os.Stat(etcdBinaryPath)
if err != nil {
@ -208,7 +208,7 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
return nil, err
}
f, err := openToAppend(r.DatabaseLogPath)
f, err := openToAppend(t.req.DatabaseLogPath)
if err != nil {
return nil, err
}
@ -228,14 +228,14 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
}
clusterStr := strings.Join(members, ",")
flags := []string{
"--name", fmt.Sprintf("etcd-%d", r.EtcdServerIndex),
"--name", fmt.Sprintf("etcd-%d", t.req.EtcdServerIndex),
"--data-dir", etcdDataDir,
"--listen-client-urls", clientURLs[r.EtcdServerIndex],
"--advertise-client-urls", clientURLs[r.EtcdServerIndex],
"--listen-client-urls", clientURLs[t.req.EtcdServerIndex],
"--advertise-client-urls", clientURLs[t.req.EtcdServerIndex],
"--listen-peer-urls", peerURLs[r.EtcdServerIndex],
"--initial-advertise-peer-urls", peerURLs[r.EtcdServerIndex],
"--listen-peer-urls", peerURLs[t.req.EtcdServerIndex],
"--initial-advertise-peer-urls", peerURLs[t.req.EtcdServerIndex],
"--initial-cluster-token", etcdToken,
"--initial-cluster", clusterStr,
@ -281,8 +281,8 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
}
idFilePath := filepath.Join(zkDataDir, "myid")
log.Printf("Writing %d to %s", r.ZookeeperMyID, idFilePath)
if err := toFile(fmt.Sprintf("%d", r.ZookeeperMyID), idFilePath); err != nil {
log.Printf("Writing %d to %s", t.req.ZookeeperMyID, idFilePath)
if err := toFile(fmt.Sprintf("%d", t.req.ZookeeperMyID), idFilePath); err != nil {
return nil, err
}
@ -294,8 +294,8 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
peers = append(peers, ZookeeperPeer{MyID: i + 1, IP: peerIPs[i]})
}
zkCfg.Peers = peers
zkCfg.PreAllocSize = r.ZookeeperPreAllocSize
zkCfg.MaxClientCnxns = r.ZookeeperMaxClientCnxns
zkCfg.PreAllocSize = t.req.ZookeeperPreAllocSize
zkCfg.MaxClientCnxns = t.req.ZookeeperMaxClientCnxns
tpl := template.Must(template.New("zkTemplate").Parse(zkTemplate))
buf := new(bytes.Buffer)
if err := tpl.Execute(buf, zkCfg); err != nil {
@ -309,7 +309,7 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
return nil, err
}
f, err := openToAppend(r.DatabaseLogPath)
f, err := openToAppend(t.req.DatabaseLogPath)
if err != nil {
return nil, err
}
@ -354,7 +354,7 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
}
}
f, err := openToAppend(r.DatabaseLogPath)
f, err := openToAppend(t.req.DatabaseLogPath)
if err != nil {
return nil, err
}
@ -409,7 +409,7 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
return err
}
f, err := openToAppend(r.MonitorResultPath)
f, err := openToAppend(t.req.MonitorResultPath)
if err != nil {
return err
}
@ -418,7 +418,7 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
return ps.WriteToCSV(f, pss...)
}
log.Printf("%s monitor saved at %s", r.Database, r.MonitorResultPath)
log.Printf("%s monitor saved at %s", t.req.Database, t.req.MonitorResultPath)
var err error
if err = rFunc(); err != nil {
log.Warningln("error:", err)
@ -438,48 +438,43 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
return
case <-databaseStopped:
log.Println("Monitoring stopped. Uploading data to cloud storage...")
if err := toFile(r.GoogleCloudStorageJSONKey, filepath.Join(globalFlags.WorkingDirectory, "key.json")); err != nil {
log.Warnf("error (%v)", err)
return
}
u, err := remotestorage.NewGoogleCloudStorage([]byte(r.GoogleCloudStorageJSONKey), r.GoogleCloudProjectName)
u, err := remotestorage.NewGoogleCloudStorage([]byte(t.req.GoogleCloudStorageJSONKey), t.req.GoogleCloudProjectName)
if err != nil {
log.Warnf("error (%v)", err)
return
}
// set up file names
srcDatabaseLogPath := r.DatabaseLogPath
dstDatabaseLogPath := filepath.Base(r.DatabaseLogPath)
if !strings.HasPrefix(filepath.Base(r.DatabaseLogPath), r.LogPrefix) {
dstDatabaseLogPath = r.LogPrefix + fmt.Sprintf("_%d_", r.EtcdServerIndex) + filepath.Base(r.DatabaseLogPath)
srcDatabaseLogPath := t.req.DatabaseLogPath
dstDatabaseLogPath := filepath.Base(t.req.DatabaseLogPath)
if !strings.HasPrefix(filepath.Base(t.req.DatabaseLogPath), t.req.LogPrefix) {
dstDatabaseLogPath = fmt.Sprintf("%s-%d-%s", t.req.LogPrefix, t.req.EtcdServerIndex+1, filepath.Base(t.req.DatabaseLogPath))
}
srcMonitorResultPath := r.MonitorResultPath
dstMonitorResultPath := filepath.Base(r.MonitorResultPath)
if !strings.HasPrefix(filepath.Base(r.MonitorResultPath), r.LogPrefix) {
dstMonitorResultPath = r.LogPrefix + fmt.Sprintf("_%d_", r.EtcdServerIndex) + filepath.Base(r.MonitorResultPath)
srcMonitorResultPath := t.req.MonitorResultPath
dstMonitorResultPath := filepath.Base(t.req.MonitorResultPath)
if !strings.HasPrefix(filepath.Base(t.req.MonitorResultPath), t.req.LogPrefix) {
dstMonitorResultPath = fmt.Sprintf("%s-%d-%s", t.req.LogPrefix, t.req.EtcdServerIndex+1, filepath.Base(t.req.MonitorResultPath))
}
srcAgentLogPath := agentLogPath
dstAgentLogPath := filepath.Base(agentLogPath)
if !strings.HasPrefix(filepath.Base(agentLogPath), r.LogPrefix) {
dstAgentLogPath = r.LogPrefix + fmt.Sprintf("_%d_", r.EtcdServerIndex) + filepath.Base(agentLogPath)
if !strings.HasPrefix(filepath.Base(agentLogPath), t.req.LogPrefix) {
dstAgentLogPath = fmt.Sprintf("%s-%d-%s", t.req.LogPrefix, t.req.EtcdServerIndex+1, filepath.Base(agentLogPath))
}
log.Printf("Uploading %s", srcDatabaseLogPath)
if err := u.UploadFile(r.GoogleCloudStorageBucketName, srcDatabaseLogPath, dstDatabaseLogPath); err != nil {
log.Printf("Uploading %s to %s", srcDatabaseLogPath, dstDatabaseLogPath)
if err := u.UploadFile(t.req.GoogleCloudStorageBucketName, srcDatabaseLogPath, dstDatabaseLogPath); err != nil {
log.Fatal(err)
}
log.Printf("Uploading %s", srcMonitorResultPath)
if err := u.UploadFile(r.GoogleCloudStorageBucketName, srcMonitorResultPath, dstMonitorResultPath); err != nil {
log.Printf("Uploading %s to %s", srcMonitorResultPath, dstMonitorResultPath)
if err := u.UploadFile(t.req.GoogleCloudStorageBucketName, srcMonitorResultPath, dstMonitorResultPath); err != nil {
log.Fatal(err)
}
log.Printf("Uploading %s", srcAgentLogPath)
if err := u.UploadFile(r.GoogleCloudStorageBucketName, srcAgentLogPath, dstAgentLogPath); err != nil {
log.Printf("Uploading %s", srcAgentLogPath, dstAgentLogPath)
if err := u.UploadFile(t.req.GoogleCloudStorageBucketName, srcAgentLogPath, dstAgentLogPath); err != nil {
log.Fatal(err)
}