diff --git a/agent/agent.go b/agent/agent.go index a8184bf2..0964d78a 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -17,7 +17,7 @@ package agent import ( "bytes" "fmt" - "log" + "io/ioutil" "net" "os" "os/exec" @@ -28,24 +28,19 @@ import ( "text/template" "time" + log "github.com/Sirupsen/logrus" "github.com/gyuho/psn/ps" "github.com/spf13/cobra" "golang.org/x/net/context" + "golang.org/x/oauth2/google" + "google.golang.org/cloud" + "google.golang.org/cloud/storage" "google.golang.org/grpc" ) type ( Flags struct { - GRPCPort string - WorkingDir string - - DatabaseLogPath string - - Monitor bool - MonitorInterval time.Duration - MonitorResultPath string - - StorageSecretKeyPath string + GRPCPort string } // ZookeeperConfig is zookeeper configuration. @@ -69,6 +64,8 @@ type ( var ( shell = os.Getenv("SHELL") + agentLogPath = filepath.Join(homeDir(), "agent.log") + etcdBinaryPath = filepath.Join(os.Getenv("GOPATH"), "bin/etcd") etcdToken = "etcd_token" @@ -109,48 +106,24 @@ maxClientCnxns={{.MaxClientCnxns}} ) func init() { + log.SetFormatter(&log.JSONFormatter{}) + log.SetLevel(log.InfoLevel) + if len(shell) == 0 { shell = "sh" } Command.PersistentFlags().StringVar(&globalFlags.GRPCPort, "agent-port", ":3500", "Port to server agent gRPC server.") - Command.PersistentFlags().StringVar(&globalFlags.WorkingDir, "working-directory", homeDir(), "Working directory to store data.") - - Command.PersistentFlags().StringVar(&globalFlags.DatabaseLogPath, "database-log-path", "database.log", "Path to save database log.") - - Command.PersistentFlags().BoolVar(&globalFlags.Monitor, "monitor", false, "Periodically records resource usage.") - Command.PersistentFlags().DurationVar(&globalFlags.MonitorInterval, "monitor-interval", time.Second, "Resource monitor interval.") - Command.PersistentFlags().StringVar(&globalFlags.MonitorResultPath, "monitor-result-path", "monitor.csv", "File path to store monitor results.") - - Command.PersistentFlags().StringVar(&globalFlags.StorageSecretKeyPath, "storage-secret-key-path", "", "Key to use for uploading logs and data.") } func CommandFunc(cmd *cobra.Command, args []string) { - if !filepath.HasPrefix(globalFlags.DatabaseLogPath, globalFlags.WorkingDir) { - globalFlags.DatabaseLogPath = filepath.Join(globalFlags.WorkingDir, globalFlags.DatabaseLogPath) + f, err := openToAppend(agentLogPath) + if err != nil { + log.Println(err) + os.Exit(-1) } - if !filepath.HasPrefix(etcdDataDir, globalFlags.WorkingDir) { - etcdDataDir = filepath.Join(globalFlags.WorkingDir, etcdDataDir) - } - if !filepath.HasPrefix(zkWorkingDir, globalFlags.WorkingDir) { - zkWorkingDir = filepath.Join(globalFlags.WorkingDir, zkWorkingDir) - } - if !filepath.HasPrefix(zkDataDir, globalFlags.WorkingDir) { - zkDataDir = filepath.Join(globalFlags.WorkingDir, zkDataDir) - } - if !filepath.HasPrefix(globalFlags.MonitorResultPath, globalFlags.WorkingDir) { - globalFlags.MonitorResultPath = filepath.Join(globalFlags.WorkingDir, globalFlags.MonitorResultPath) - } - if !filepath.HasPrefix(globalFlags.StorageSecretKeyPath, globalFlags.WorkingDir) { - globalFlags.StorageSecretKeyPath = filepath.Join(globalFlags.WorkingDir, globalFlags.StorageSecretKeyPath) - } - - log.Printf("gRPC has started serving at %s\n", globalFlags.GRPCPort) - log.Printf("Database log path: %s\n", globalFlags.DatabaseLogPath) - log.Printf("etcd data directory: %s\n", etcdDataDir) - log.Printf("Zookeeper working directory: %s\n", zkWorkingDir) - log.Printf("Zookeeper data directory: %s\n", zkDataDir) - log.Printf("Monitor result path: %s\n", globalFlags.MonitorResultPath) - log.Printf("Storage key secret path: %s\n", globalFlags.StorageSecretKeyPath) // TODO: use this to upload to Google cloud storage + defer f.Close() + log.SetOutput(f) + log.Printf("gRPC serving: %s\n", globalFlags.GRPCPort) var ( grpcServer = grpc.NewServer() @@ -161,12 +134,9 @@ func CommandFunc(cmd *cobra.Command, args []string) { log.Println(err) os.Exit(-1) } + RegisterTransporterServer(grpcServer, sender) - log.Printf("gRPC is now serving at %s\n", globalFlags.GRPCPort) - if globalFlags.Monitor { - log.Printf("As soon as database started, it will monitor every %v...\n", globalFlags.MonitorInterval) - } if err := grpcServer.Serve(ln); err != nil { log.Println(err) os.Exit(-1) @@ -183,8 +153,47 @@ type transporterServer struct { // satisfy TransporterServer var databaseStopped = make(chan struct{}) func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response, error) { - log.Printf("Received message for peer %q", r.PeerIPs) + log.Printf("Message from %q", r.PeerIPs) peerIPs := strings.Split(r.PeerIPs, "___") + + if r.Operation == Request_Start || r.Operation == Request_Restart { + if r.WorkingDirectory == "" { + r.WorkingDirectory = homeDir() + } + if !exist(r.WorkingDirectory) { + return nil, fmt.Errorf("%s does not exist", r.WorkingDirectory) + } + if !filepath.HasPrefix(etcdDataDir, r.WorkingDirectory) { + etcdDataDir = filepath.Join(r.WorkingDirectory, etcdDataDir) + } + if !filepath.HasPrefix(zkWorkingDir, r.WorkingDirectory) { + zkWorkingDir = filepath.Join(r.WorkingDirectory, zkWorkingDir) + } + if !filepath.HasPrefix(zkDataDir, r.WorkingDirectory) { + zkDataDir = filepath.Join(r.WorkingDirectory, zkDataDir) + } + if r.LogPrefix != "" { + if !strings.HasPrefix(filepath.Base(r.DatabaseLogPath), r.LogPrefix) { + r.DatabaseLogPath = filepath.Join(filepath.Dir(r.DatabaseLogPath), r.LogPrefix+"_"+filepath.Base(r.DatabaseLogPath)) + } + if !strings.HasPrefix(filepath.Base(r.MonitorResultPath), r.LogPrefix) { + r.MonitorResultPath = filepath.Join(filepath.Dir(r.MonitorResultPath), r.LogPrefix+"_"+filepath.Base(r.MonitorResultPath)) + } + } else { + if !filepath.HasPrefix(r.DatabaseLogPath, r.WorkingDirectory) { + r.DatabaseLogPath = filepath.Join(r.WorkingDirectory, r.DatabaseLogPath) + } + if !filepath.HasPrefix(r.MonitorResultPath, r.WorkingDirectory) { + r.MonitorResultPath = filepath.Join(r.WorkingDirectory, r.MonitorResultPath) + } + } + log.Printf("Working directory: %s\n", r.WorkingDirectory) + log.Printf("etcd data directory: %s\n", etcdDataDir) + log.Printf("Zookeeper working directory: %s\n", zkWorkingDir) + log.Printf("Zookeeper data directory: %s\n", zkDataDir) + log.Printf("Database log path: %s\n", r.DatabaseLogPath) + log.Printf("Monitor result path: %s\n", r.MonitorResultPath) + } t.req = *r var processPID int @@ -201,7 +210,7 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response return nil, err } - f, err := openToAppend(globalFlags.DatabaseLogPath) + f, err := openToAppend(r.DatabaseLogPath) if err != nil { return nil, err } @@ -242,7 +251,6 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response flagString := strings.Join(flags, " ") cmd := exec.Command(etcdBinaryPath, flags...) - cmd.Stdin = nil cmd.Stdout = f cmd.Stderr = f log.Printf("Starting: %s %s\n", cmd.Path, flagString) @@ -306,7 +314,7 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response return nil, err } - f, err := openToAppend(globalFlags.DatabaseLogPath) + f, err := openToAppend(r.DatabaseLogPath) if err != nil { return nil, err } @@ -317,7 +325,6 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response args := []string{shell, "-c", "/usr/bin/java " + flagString + " " + configFilePath} cmd := exec.Command(args[0], args[1:]...) - cmd.Stdin = nil cmd.Stdout = f cmd.Stderr = f log.Printf("Starting: %s %s\n", cmd.Path, strings.Join(args[1:], " ")) @@ -352,14 +359,13 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response } } - f, err := openToAppend(globalFlags.DatabaseLogPath) + f, err := openToAppend(r.DatabaseLogPath) if err != nil { return nil, err } t.logfile = f cmd := exec.Command(t.cmd.Path, t.cmd.Args[1:]...) - cmd.Stdin = nil cmd.Stdout = f cmd.Stderr = f log.Printf("Restarting: %s\n", strings.Join(t.cmd.Args, " ")) @@ -395,7 +401,7 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response return nil, fmt.Errorf("Not implemented %v", r.Operation) } - if globalFlags.Monitor && r.Operation == Request_Start { + if r.Operation == Request_Start || r.Operation == Request_Restart { go func(processPID int) { notifier := make(chan os.Signal, 1) signal.Notify(notifier, syscall.SIGINT, syscall.SIGTERM) @@ -406,7 +412,7 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response return err } - f, err := openToAppend(globalFlags.MonitorResultPath) + f, err := openToAppend(r.MonitorResultPath) if err != nil { return err } @@ -415,25 +421,126 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response return ps.WriteToCSV(f, pss...) } - log.Printf("%s monitor saved at %s\n", r.Database, globalFlags.MonitorResultPath) + log.Printf("%s monitor saved at %s\n", r.Database, r.MonitorResultPath) var err error if err = rFunc(); err != nil { - log.Println("error:", err) + log.Warningln("error:", err) + return } escape: for { select { - case <-time.After(globalFlags.MonitorInterval): + case <-time.After(time.Second): if err = rFunc(); err != nil { - log.Printf("Monitoring error %v\n", err) + log.Warnf("Monitoring error %v\n", err) break escape } case sig := <-notifier: log.Printf("Received %v\n", sig) return case <-databaseStopped: - log.Println("Monitoring stopped") + log.Println("Monitoring stopped. Uploading data to cloud storage...") + + // initialize auth + conf, err := google.JWTConfigFromJSON( + []byte(r.GoogleCloudStorageJSONKey), + storage.ScopeFullControl, + ) + if err != nil { + log.Warnf("error (%v) with\n\n%q\n\n", err, r.GoogleCloudStorageJSONKey) + return + } + ctx := context.Background() + aclient, err := storage.NewAdminClient(ctx, r.GoogleCloudProjectName, cloud.WithTokenSource(conf.TokenSource(ctx))) + if err != nil { + log.Warnf("error (%v) with %q\n", err, r.GoogleCloudProjectName) + } + defer aclient.Close() + + if err := aclient.CreateBucket(context.Background(), r.GoogleCloudStorageBucketName, nil); err != nil { + if !strings.Contains(err.Error(), "You already own this bucket. Please select another name") { + log.Warnf("error (%v) with %q\n", err, r.GoogleCloudStorageBucketName) + } + } + + sctx := context.Background() + sclient, err := storage.NewClient(sctx, cloud.WithTokenSource(conf.TokenSource(sctx))) + if err != nil { + log.Warnf("error (%v)\n", err) + return + } + defer sclient.Close() + + // set up file names + srcDatabaseLogPath := r.DatabaseLogPath + dstDatabaseLogPath := filepath.Base(r.DatabaseLogPath) + if !strings.HasPrefix(filepath.Base(r.DatabaseLogPath), r.LogPrefix) { + dstDatabaseLogPath = r.LogPrefix + fmt.Sprintf("_%d_", r.EtcdServerIndex) + filepath.Base(r.DatabaseLogPath) + } + + srcMonitorResultPath := r.MonitorResultPath + dstMonitorResultPath := filepath.Base(r.MonitorResultPath) + if !strings.HasPrefix(filepath.Base(r.MonitorResultPath), r.LogPrefix) { + dstMonitorResultPath = r.LogPrefix + fmt.Sprintf("_%d_", r.EtcdServerIndex) + filepath.Base(r.MonitorResultPath) + } + + srcAgentLogPath := agentLogPath + dstAgentLogPath := filepath.Base(agentLogPath) + if !strings.HasPrefix(filepath.Base(agentLogPath), r.LogPrefix) { + dstAgentLogPath = r.LogPrefix + fmt.Sprintf("_%d_", r.EtcdServerIndex) + filepath.Base(agentLogPath) + } + + log.Printf("Uploading %s\n", srcDatabaseLogPath) + wc1 := sclient.Bucket(r.GoogleCloudStorageBucketName).Object(dstDatabaseLogPath).NewWriter(context.Background()) + wc1.ContentType = "text/plain" + bts1, err := ioutil.ReadFile(srcDatabaseLogPath) + if err != nil { + log.Warnf("error (%v)\n", err) + return + } + if _, err := wc1.Write(bts1); err != nil { + log.Warnf("error (%v)\n", err) + return + } + if err := wc1.Close(); err != nil { + log.Warnf("error (%v)\n", err) + return + } + + log.Printf("Uploading %s\n", srcMonitorResultPath) + wc2 := sclient.Bucket(r.GoogleCloudStorageBucketName).Object(dstMonitorResultPath).NewWriter(context.Background()) + wc2.ContentType = "text/plain" + bts2, err := ioutil.ReadFile(srcMonitorResultPath) + if err != nil { + log.Warnf("error (%v)\n", err) + return + } + if _, err := wc2.Write(bts2); err != nil { + log.Warnf("error (%v)\n", err) + return + } + if err := wc2.Close(); err != nil { + log.Warnf("error (%v)\n", err) + return + } + + log.Printf("Uploading %s\n", srcAgentLogPath) + wc3 := sclient.Bucket(r.GoogleCloudStorageBucketName).Object(dstAgentLogPath).NewWriter(context.Background()) + wc3.ContentType = "text/plain" + bts3, err := ioutil.ReadFile(srcAgentLogPath) + if err != nil { + log.Warnf("error (%v)\n", err) + return + } + if _, err := wc3.Write(bts3); err != nil { + log.Warnf("error (%v)\n", err) + return + } + if err := wc3.Close(); err != nil { + log.Warnf("error (%v)\n", err) + return + } return } } diff --git a/agent/message.pb.go b/agent/message.pb.go index b5acb931..3b296c79 100644 --- a/agent/message.pb.go +++ b/agent/message.pb.go @@ -94,6 +94,23 @@ type Request struct { // ZookeeperMaxClientCnxns limits the number of concurrent connections // (at the socket level) that a single client, identified by IP address. ZookeeperMaxClientCnxns int64 `protobuf:"varint,7,opt,name=zookeeperMaxClientCnxns,proto3" json:"zookeeperMaxClientCnxns,omitempty"` + // WorkingDirectory is the working directory of the remote machine. + // If empty, it will use its home directory. + WorkingDirectory string `protobuf:"bytes,8,opt,name=workingDirectory,proto3" json:"workingDirectory,omitempty"` + // LogPrefix prefixes all logs to be generated in agent. + LogPrefix string `protobuf:"bytes,9,opt,name=logPrefix,proto3" json:"logPrefix,omitempty"` + // DatabaseLogPath is the file path to store the database logs. + DatabaseLogPath string `protobuf:"bytes,10,opt,name=databaseLogPath,proto3" json:"databaseLogPath,omitempty"` + // MonitorResultPath is the file path to store monitoring results. + MonitorResultPath string `protobuf:"bytes,11,opt,name=monitorResultPath,proto3" json:"monitorResultPath,omitempty"` + // GoogleCloudProjectName is the project name to use + // to upload logs. + GoogleCloudProjectName string `protobuf:"bytes,12,opt,name=googleCloudProjectName,proto3" json:"googleCloudProjectName,omitempty"` + // GoogleCloudStorageJSONKey is the key to be used to upload + // data and logs to Google Cloud Storage. + GoogleCloudStorageJSONKey string `protobuf:"bytes,13,opt,name=googleCloudStorageJSONKey,proto3" json:"googleCloudStorageJSONKey,omitempty"` + // GoogleCloudStorageBucketName is the bucket name to store all data and logs. + GoogleCloudStorageBucketName string `protobuf:"bytes,14,opt,name=googleCloudStorageBucketName,proto3" json:"googleCloudStorageBucketName,omitempty"` } func (m *Request) Reset() { *m = Request{} } @@ -229,6 +246,48 @@ func (m *Request) MarshalTo(data []byte) (int, error) { i++ i = encodeVarintMessage(data, i, uint64(m.ZookeeperMaxClientCnxns)) } + if len(m.WorkingDirectory) > 0 { + data[i] = 0x42 + i++ + i = encodeVarintMessage(data, i, uint64(len(m.WorkingDirectory))) + i += copy(data[i:], m.WorkingDirectory) + } + if len(m.LogPrefix) > 0 { + data[i] = 0x4a + i++ + i = encodeVarintMessage(data, i, uint64(len(m.LogPrefix))) + i += copy(data[i:], m.LogPrefix) + } + if len(m.DatabaseLogPath) > 0 { + data[i] = 0x52 + i++ + i = encodeVarintMessage(data, i, uint64(len(m.DatabaseLogPath))) + i += copy(data[i:], m.DatabaseLogPath) + } + if len(m.MonitorResultPath) > 0 { + data[i] = 0x5a + i++ + i = encodeVarintMessage(data, i, uint64(len(m.MonitorResultPath))) + i += copy(data[i:], m.MonitorResultPath) + } + if len(m.GoogleCloudProjectName) > 0 { + data[i] = 0x62 + i++ + i = encodeVarintMessage(data, i, uint64(len(m.GoogleCloudProjectName))) + i += copy(data[i:], m.GoogleCloudProjectName) + } + if len(m.GoogleCloudStorageJSONKey) > 0 { + data[i] = 0x6a + i++ + i = encodeVarintMessage(data, i, uint64(len(m.GoogleCloudStorageJSONKey))) + i += copy(data[i:], m.GoogleCloudStorageJSONKey) + } + if len(m.GoogleCloudStorageBucketName) > 0 { + data[i] = 0x72 + i++ + i = encodeVarintMessage(data, i, uint64(len(m.GoogleCloudStorageBucketName))) + i += copy(data[i:], m.GoogleCloudStorageBucketName) + } return i, nil } @@ -312,6 +371,34 @@ func (m *Request) Size() (n int) { if m.ZookeeperMaxClientCnxns != 0 { n += 1 + sovMessage(uint64(m.ZookeeperMaxClientCnxns)) } + l = len(m.WorkingDirectory) + if l > 0 { + n += 1 + l + sovMessage(uint64(l)) + } + l = len(m.LogPrefix) + if l > 0 { + n += 1 + l + sovMessage(uint64(l)) + } + l = len(m.DatabaseLogPath) + if l > 0 { + n += 1 + l + sovMessage(uint64(l)) + } + l = len(m.MonitorResultPath) + if l > 0 { + n += 1 + l + sovMessage(uint64(l)) + } + l = len(m.GoogleCloudProjectName) + if l > 0 { + n += 1 + l + sovMessage(uint64(l)) + } + l = len(m.GoogleCloudStorageJSONKey) + if l > 0 { + n += 1 + l + sovMessage(uint64(l)) + } + l = len(m.GoogleCloudStorageBucketName) + if l > 0 { + n += 1 + l + sovMessage(uint64(l)) + } return n } @@ -509,6 +596,209 @@ func (m *Request) Unmarshal(data []byte) error { break } } + case 8: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field WorkingDirectory", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMessage + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.WorkingDirectory = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 9: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field LogPrefix", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMessage + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.LogPrefix = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 10: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DatabaseLogPath", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMessage + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DatabaseLogPath = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 11: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MonitorResultPath", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMessage + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.MonitorResultPath = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 12: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field GoogleCloudProjectName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMessage + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.GoogleCloudProjectName = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 13: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field GoogleCloudStorageJSONKey", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMessage + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.GoogleCloudStorageJSONKey = string(data[iNdEx:postIndex]) + iNdEx = postIndex + case 14: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field GoogleCloudStorageBucketName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMessage + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.GoogleCloudStorageBucketName = string(data[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipMessage(data[iNdEx:]) @@ -706,30 +996,38 @@ var ( ) var fileDescriptorMessage = []byte{ - // 392 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x52, 0xcd, 0xae, 0xd2, 0x40, - 0x14, 0x6e, 0x2f, 0x97, 0x4b, 0x7b, 0x6e, 0x7a, 0x2f, 0x99, 0x68, 0xee, 0x84, 0x05, 0x31, 0x95, - 0x05, 0x1b, 0x4a, 0x02, 0xc6, 0xb8, 0x70, 0xa3, 0xb0, 0x21, 0xc6, 0x48, 0x5a, 0x57, 0xee, 0xa6, - 0xe5, 0x50, 0x1b, 0xa1, 0x53, 0x67, 0xa6, 0x06, 0x79, 0x12, 0x1f, 0xc0, 0x87, 0x61, 0xe9, 0x23, - 0xf8, 0xf3, 0x22, 0x0e, 0x03, 0x2d, 0x81, 0xe8, 0x62, 0x92, 0xf3, 0xfd, 0x9d, 0xf4, 0x9c, 0x53, - 0xf0, 0xd6, 0x28, 0x25, 0x4b, 0x31, 0x28, 0x04, 0x57, 0x9c, 0x34, 0x75, 0x99, 0xab, 0xce, 0x20, - 0xcd, 0xd4, 0xc7, 0x32, 0x0e, 0x12, 0xbe, 0x1e, 0xa6, 0x3c, 0xe5, 0x43, 0xa3, 0xc6, 0xe5, 0xd2, - 0x20, 0x03, 0x4c, 0x75, 0x48, 0xf9, 0xdf, 0x1b, 0xd0, 0x0a, 0xf1, 0x73, 0x89, 0x52, 0x91, 0xe7, - 0xe0, 0xf2, 0x02, 0x05, 0x53, 0x19, 0xcf, 0xa9, 0xfd, 0xc4, 0xee, 0xdf, 0x8d, 0x68, 0x60, 0xba, - 0x06, 0x47, 0x4b, 0xf0, 0xae, 0xd2, 0xc3, 0x93, 0x95, 0x8c, 0xc1, 0x59, 0x30, 0xc5, 0x62, 0x26, - 0x91, 0x5e, 0x99, 0xd8, 0xc3, 0x45, 0x6c, 0x7a, 0x94, 0xc3, 0xda, 0x48, 0x28, 0xb4, 0x0a, 0x44, - 0x31, 0x9b, 0x4b, 0xda, 0xd0, 0x19, 0x37, 0xac, 0x20, 0xe9, 0xc3, 0x3d, 0xaa, 0x64, 0x11, 0xa1, - 0xf8, 0xa2, 0x89, 0x7c, 0x81, 0x1b, 0x7a, 0xad, 0x1d, 0x5e, 0x78, 0x49, 0x93, 0x1e, 0x78, 0x5b, - 0xce, 0x3f, 0x21, 0xea, 0x4f, 0x79, 0xfb, 0x75, 0x36, 0xa5, 0x4d, 0xe3, 0x3b, 0x27, 0xc9, 0x33, - 0x78, 0x5c, 0x13, 0x73, 0x81, 0xaf, 0x56, 0x2b, 0x9e, 0x44, 0xd9, 0x16, 0xe9, 0x8d, 0x76, 0x37, - 0xc2, 0x7f, 0x8b, 0xe4, 0x05, 0x3c, 0x9c, 0xda, 0xb0, 0xcd, 0x64, 0x95, 0xe9, 0x81, 0x26, 0xf9, - 0x26, 0x97, 0xb4, 0x65, 0x72, 0xff, 0x93, 0xfd, 0x01, 0xb8, 0xf5, 0x9a, 0x88, 0x0b, 0xcd, 0x48, - 0x31, 0xa1, 0xda, 0x16, 0x71, 0xe0, 0x3a, 0x52, 0xbc, 0x68, 0xdb, 0xe4, 0x76, 0xbf, 0x73, 0x69, - 0xe8, 0x2b, 0xff, 0x29, 0x38, 0xd5, 0x7a, 0xf6, 0x96, 0xfd, 0x8c, 0xda, 0xec, 0x81, 0xfb, 0x81, - 0xf3, 0x37, 0xa6, 0x7f, 0xdb, 0xf6, 0x7b, 0xe0, 0xe8, 0x44, 0xc1, 0xf3, 0xc3, 0xe6, 0x64, 0x99, - 0x24, 0xfa, 0xf8, 0xe6, 0x48, 0x4e, 0x58, 0xc1, 0xd1, 0x4b, 0xb8, 0x7d, 0x2f, 0x58, 0xae, 0x7d, - 0x42, 0xa1, 0x20, 0x03, 0x70, 0x0c, 0x5c, 0xea, 0xfa, 0xee, 0xfc, 0x22, 0x9d, 0xfb, 0x1a, 0x1f, - 0xba, 0xfa, 0xd6, 0xeb, 0x47, 0xbb, 0x5f, 0x5d, 0x6b, 0xf7, 0xbb, 0x6b, 0xff, 0xd0, 0xef, 0xa7, - 0x7e, 0xdf, 0xfe, 0x74, 0xad, 0xf8, 0xc6, 0xfc, 0x27, 0xe3, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, - 0x62, 0x4a, 0xb3, 0x66, 0x6e, 0x02, 0x00, 0x00, + // 526 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x7c, 0x93, 0xcf, 0x72, 0xd2, 0x50, + 0x14, 0xc6, 0xa1, 0x7f, 0x20, 0x39, 0x08, 0xc5, 0x3b, 0x6a, 0xaf, 0x9d, 0x0e, 0xe3, 0x60, 0x17, + 0x1d, 0x47, 0xe8, 0x4c, 0xeb, 0x38, 0x2e, 0xba, 0x11, 0xd8, 0xd4, 0x6a, 0xcb, 0x04, 0x57, 0xee, + 0x42, 0x38, 0xa4, 0x91, 0x90, 0x8b, 0x37, 0x37, 0x0a, 0x5d, 0xf9, 0x18, 0x3e, 0x52, 0x97, 0x3e, + 0x82, 0x7f, 0x5e, 0xc4, 0xc3, 0xa1, 0x01, 0x0b, 0xe2, 0x22, 0x33, 0xf7, 0x7c, 0xdf, 0xef, 0x3b, + 0x49, 0xee, 0x3d, 0x17, 0x8a, 0x43, 0x8c, 0x63, 0xd7, 0xc7, 0xfa, 0x48, 0x2b, 0xa3, 0xc4, 0x36, + 0x2d, 0x23, 0xb3, 0x57, 0xf3, 0x03, 0x73, 0x95, 0x74, 0xeb, 0x9e, 0x1a, 0x1e, 0xf9, 0xca, 0x57, + 0x47, 0xec, 0x76, 0x93, 0x3e, 0x57, 0x5c, 0xf0, 0x6a, 0x96, 0xaa, 0x7e, 0xcd, 0x41, 0xde, 0xc1, + 0x4f, 0x09, 0xc6, 0x46, 0xbc, 0x04, 0x5b, 0x8d, 0x50, 0xbb, 0x26, 0x50, 0x91, 0xcc, 0x3e, 0xc9, + 0x1e, 0x96, 0x8e, 0x65, 0x9d, 0xbb, 0xd6, 0x6f, 0x91, 0xfa, 0x65, 0xea, 0x3b, 0x0b, 0x54, 0x9c, + 0x80, 0xd5, 0x73, 0x8d, 0xdb, 0x75, 0x63, 0x94, 0x1b, 0x1c, 0xdb, 0x5d, 0x8a, 0xb5, 0x6e, 0x6d, + 0x67, 0x0e, 0x0a, 0x09, 0xf9, 0x11, 0xa2, 0x3e, 0x6b, 0xc7, 0x72, 0x93, 0x32, 0xb6, 0x93, 0x96, + 0xe2, 0x10, 0x76, 0xd0, 0x78, 0xbd, 0x0e, 0xea, 0xcf, 0x24, 0x44, 0x3d, 0x1c, 0xcb, 0x2d, 0x22, + 0x8a, 0xce, 0xb2, 0x2c, 0x0e, 0xa0, 0x78, 0xad, 0xd4, 0x00, 0x91, 0x3e, 0xe5, 0xdd, 0xe4, 0xac, + 0x25, 0xb7, 0x99, 0xbb, 0x2b, 0x8a, 0x17, 0xf0, 0x70, 0x2e, 0xb4, 0x35, 0xbe, 0x0e, 0x43, 0xe5, + 0x75, 0x82, 0x6b, 0x94, 0x39, 0xa2, 0x37, 0x9d, 0x7f, 0x9b, 0xe2, 0x15, 0xec, 0x2e, 0xda, 0xb8, + 0xe3, 0x66, 0x18, 0xd0, 0x0f, 0x35, 0xa3, 0x71, 0x14, 0xcb, 0x3c, 0xe7, 0xd6, 0xd9, 0xe2, 0x19, + 0x94, 0xbf, 0x28, 0x3d, 0x08, 0x22, 0xbf, 0x15, 0x68, 0xf4, 0x8c, 0xd2, 0x13, 0x69, 0xf1, 0x2f, + 0xae, 0xe8, 0x62, 0x1f, 0xec, 0x50, 0xf9, 0xf4, 0xe2, 0x7e, 0x30, 0x96, 0x36, 0x43, 0x0b, 0x61, + 0xba, 0x13, 0xe9, 0x7e, 0xbd, 0x25, 0xd1, 0x35, 0x57, 0x12, 0x98, 0x59, 0x96, 0xc5, 0x73, 0xb8, + 0x3f, 0x54, 0x51, 0x40, 0x3d, 0x1d, 0x8c, 0x93, 0xd0, 0x30, 0x5b, 0x60, 0x76, 0xd5, 0xa0, 0x83, + 0x7e, 0xe4, 0x2b, 0xe5, 0x87, 0xd8, 0x0c, 0x55, 0xd2, 0x6b, 0x6b, 0xf5, 0x91, 0x3e, 0xe7, 0xc2, + 0x1d, 0xa2, 0xbc, 0xc7, 0x91, 0x35, 0xae, 0x38, 0x85, 0xc7, 0x7f, 0x39, 0x1d, 0x6a, 0x4a, 0xc7, + 0xfc, 0xa6, 0x73, 0x79, 0x71, 0x8e, 0x13, 0x59, 0xe4, 0xe8, 0x7a, 0x40, 0x34, 0x60, 0x7f, 0xd5, + 0x6c, 0x24, 0xde, 0x00, 0x67, 0xef, 0x2e, 0x71, 0x83, 0xff, 0x32, 0xd5, 0x1a, 0xd8, 0xf3, 0x11, + 0x14, 0x36, 0x6c, 0x77, 0x8c, 0xab, 0x4d, 0x39, 0x23, 0x2c, 0xd8, 0x22, 0x78, 0x54, 0xce, 0x8a, + 0xc2, 0x74, 0x9e, 0x63, 0x96, 0x37, 0xaa, 0x4f, 0xc1, 0x4a, 0x47, 0x6f, 0x8a, 0x4c, 0xe7, 0x87, + 0xe0, 0x22, 0xd8, 0x1f, 0x94, 0x3a, 0xe7, 0xb3, 0x2b, 0x67, 0xab, 0x07, 0x60, 0x51, 0x62, 0xa4, + 0xa2, 0xd9, 0x54, 0xc6, 0x89, 0xe7, 0xd1, 0xc5, 0xe2, 0x0b, 0x60, 0x39, 0x69, 0x79, 0x7c, 0x0a, + 0x85, 0xf7, 0xda, 0x8d, 0x88, 0xd3, 0x06, 0xb5, 0xa8, 0x81, 0xc5, 0x65, 0x9f, 0xd6, 0xa5, 0xbb, + 0xd3, 0xbe, 0xb7, 0x33, 0xaf, 0x67, 0x5d, 0xab, 0x99, 0xc6, 0x83, 0x9b, 0x9f, 0x95, 0xcc, 0xcd, + 0xaf, 0x4a, 0xf6, 0x3b, 0x3d, 0x3f, 0xe8, 0xf9, 0xf6, 0xbb, 0x92, 0xe9, 0xe6, 0xf8, 0x0e, 0x9e, + 0xfc, 0x09, 0x00, 0x00, 0xff, 0xff, 0x7d, 0x4c, 0xf6, 0x23, 0xca, 0x03, 0x00, 0x00, } diff --git a/agent/message.proto b/agent/message.proto index a16f6a08..b5bcfb87 100644 --- a/agent/message.proto +++ b/agent/message.proto @@ -39,6 +39,30 @@ message Request { // ZookeeperMaxClientCnxns limits the number of concurrent connections // (at the socket level) that a single client, identified by IP address. int64 zookeeperMaxClientCnxns = 7; + + // WorkingDirectory is the working directory of the remote machine. + // If empty, it will use its home directory. + string workingDirectory = 8; + + // LogPrefix prefixes all logs to be generated in agent. + string logPrefix = 9; + + // DatabaseLogPath is the file path to store the database logs. + string databaseLogPath = 10; + + // MonitorResultPath is the file path to store monitoring results. + string monitorResultPath = 11; + + // GoogleCloudProjectName is the project name to use + // to upload logs. + string googleCloudProjectName = 12; + + // GoogleCloudStorageJSONKey is the key to be used to upload + // data and logs to Google Cloud Storage. + string googleCloudStorageJSONKey = 13; + + // GoogleCloudStorageBucketName is the bucket name to store all data and logs. + string googleCloudStorageBucketName = 14; } message Response { diff --git a/agent/util.go b/agent/util.go index ec26c7bc..663cdb3f 100644 --- a/agent/util.go +++ b/agent/util.go @@ -19,6 +19,14 @@ import ( "runtime" ) +func openToRead(fpath string) (*os.File, error) { + f, err := os.OpenFile(fpath, os.O_RDONLY, 0444) + if err != nil { + return f, err + } + return f, nil +} + func openToAppend(fpath string) (*os.File, error) { f, err := os.OpenFile(fpath, os.O_RDWR|os.O_APPEND, 0777) if err != nil { @@ -55,3 +63,22 @@ func homeDir() string { } return os.Getenv("HOME") } + +// exist returns true if the file or directory exists. +func exist(fpath string) bool { + st, err := os.Stat(fpath) + if err != nil { + if os.IsNotExist(err) { + return false + } + } + if st.IsDir() { + return true + } + if _, err := os.Stat(fpath); err != nil { + if os.IsNotExist(err) { + return false + } + } + return true +} diff --git a/bench/main.go b/bench/main.go index e4e3b3ce..9941cd42 100644 --- a/bench/main.go +++ b/bench/main.go @@ -39,7 +39,10 @@ var ( sample bool noHistogram bool - csvResultPath string + csvResultPath string + googleCloudProjectName string + googleCloudStorageJSONKeyPath string + googleCloudStorageBucketName string bar *pb.ProgressBar results chan result @@ -58,6 +61,9 @@ func init() { Command.PersistentFlags().BoolVar(&sample, "sample", false, "'true' to sample requests for every second.") Command.PersistentFlags().BoolVar(&noHistogram, "no-histogram", false, "'true' to not show results in histogram.") Command.PersistentFlags().StringVar(&csvResultPath, "csv-result-path", "timeseries.csv", "path to store csv results.") + Command.PersistentFlags().StringVar(&googleCloudProjectName, "google-cloud-project-name", "", "Google cloud project name.") + Command.PersistentFlags().StringVar(&googleCloudStorageJSONKeyPath, "google-cloud-storage-json-key-path", "", "Path of JSON key file.") + Command.PersistentFlags().StringVar(&googleCloudStorageBucketName, "google-cloud-storage-bucket-name", "", "Google cloud storage bucket name.") } func main() { diff --git a/bench/range.go b/bench/range.go index 5ed7fb33..1d62c49c 100644 --- a/bench/range.go +++ b/bench/range.go @@ -58,18 +58,17 @@ func rangeFunc(cmd *cobra.Command, args []string) { switch database { case "etcd": fmt.Printf("PUT '%s' to etcd\n", k) - - var cerr error + var err error for i := 0; i < 5; i++ { clients := mustCreateClients(1, 1) - _, cerr = clients[0].Do(context.Background(), v3.OpPut(k, string(v))) - if cerr != nil { + _, err = clients[0].Do(context.Background(), v3.OpPut(k, string(v))) + if err != nil { continue } fmt.Printf("Done with PUT '%s' to etcd\n", k) break } - if cerr != nil { + if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } @@ -77,18 +76,17 @@ func rangeFunc(cmd *cobra.Command, args []string) { case "zk": k = "/" + k fmt.Printf("PUT '%s' to Zookeeper\n", k) - - var cerr error + var err error for i := 0; i < 5; i++ { conns := mustCreateConnsZk(totalConns) - _, cerr = conns[0].Create(k, v, zkCreateFlags, zkCreateAcl) - if cerr != nil { + _, err = conns[0].Create(k, v, zkCreateFlags, zkCreateAcl) + if err != nil { continue } fmt.Printf("Done with PUT '%s' to Zookeeper\n", k) break } - if cerr != nil { + if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } diff --git a/bench/timeseries.go b/bench/timeseries.go index 215580d2..ad88f53d 100644 --- a/bench/timeseries.go +++ b/bench/timeseries.go @@ -18,10 +18,17 @@ import ( "bytes" "encoding/csv" "fmt" + "io/ioutil" "log" "sort" + "strings" "sync" "time" + + "golang.org/x/net/context" + "golang.org/x/oauth2/google" + "google.golang.org/cloud" + "google.golang.org/cloud/storage" ) type timeSeries struct { @@ -112,7 +119,47 @@ func (ts TimeSeries) String() string { if err := toFile(txt, csvResultPath); err != nil { log.Println(err) } else { - log.Println("time series saved") + log.Println("time series saved... uploading to Google cloud storage...") + kbts, err := ioutil.ReadFile(googleCloudStorageJSONKeyPath) + if err != nil { + log.Fatal(err) + } + conf, err := google.JWTConfigFromJSON( + kbts, + storage.ScopeFullControl, + ) + if err != nil { + log.Fatal(err) + } + ctx := context.Background() + aclient, err := storage.NewAdminClient(ctx, googleCloudProjectName, cloud.WithTokenSource(conf.TokenSource(ctx))) + if err != nil { + log.Fatal(err) + } + defer aclient.Close() + + if err := aclient.CreateBucket(context.Background(), googleCloudStorageBucketName, nil); err != nil { + if !strings.Contains(err.Error(), "You already own this bucket. Please select another name") { + log.Fatal(err) + } + } + + sctx := context.Background() + sclient, err := storage.NewClient(sctx, cloud.WithTokenSource(conf.TokenSource(sctx))) + if err != nil { + log.Fatal(err) + } + defer sclient.Close() + + log.Printf("Uploading %s\n", csvResultPath) + wc := sclient.Bucket(googleCloudStorageBucketName).Object(csvResultPath).NewWriter(context.Background()) + wc.ContentType = "text/plain" + if _, err := wc.Write([]byte(txt)); err != nil { + log.Fatal(err) + } + if err := wc.Close(); err != nil { + log.Fatal(err) + } } return fmt.Sprintf("\nSample in one second (unix latency throughput):\n%s", txt) } diff --git a/control/control.go b/control/control.go index 5d32478a..6f6a4812 100644 --- a/control/control.go +++ b/control/control.go @@ -15,6 +15,7 @@ package control import ( + "io/ioutil" "log" "os" "strings" @@ -32,6 +33,14 @@ type ( AgentEndpoints []string ZookeeperPreAllocSize int64 ZookeeperMaxClientCnxns int64 + + WorkingDirectory string + LogPrefix string + DatabaseLogPath string + MonitorResultPath string + GoogleCloudProjectName string + GoogleCloudStorageJSONKeyPath string + GoogleCloudStorageBucketName string } ) @@ -60,6 +69,14 @@ func init() { StartCommand.PersistentFlags().Int64Var(&globalFlags.ZookeeperPreAllocSize, "zk-pre-alloc-size", 65536*1024, "Disk pre-allocation size in bytes.") StartCommand.PersistentFlags().Int64Var(&globalFlags.ZookeeperMaxClientCnxns, "zk-max-client-conns", 5000, "Maximum number of concurrent Zookeeper connection.") + StartCommand.PersistentFlags().StringVar(&globalFlags.WorkingDirectory, "working-directory", "", "Working directory of the remote machine. If empty, it will use its home directory.") + StartCommand.PersistentFlags().StringVar(&globalFlags.LogPrefix, "log-prefix", "", "Prefix to all logs to be generated in agents.") + StartCommand.PersistentFlags().StringVar(&globalFlags.DatabaseLogPath, "database-log-path", "database.log", "Path of database log.") + StartCommand.PersistentFlags().StringVar(&globalFlags.MonitorResultPath, "monitor-result-path", "monitor.csv", "CSV file path of monitoring results.") + StartCommand.PersistentFlags().StringVar(&globalFlags.GoogleCloudProjectName, "google-cloud-project-name", "", "Google cloud project name.") + StartCommand.PersistentFlags().StringVar(&globalFlags.GoogleCloudStorageJSONKeyPath, "google-cloud-storage-json-key-path", "", "Path of JSON key file.") + StartCommand.PersistentFlags().StringVar(&globalFlags.GoogleCloudStorageBucketName, "google-cloud-storage-bucket-name", "", "Google cloud storage bucket name.") + StopCommand.PersistentFlags().StringSliceVar(&globalFlags.AgentEndpoints, "agent-endpoints", []string{""}, "Endpoints to send client requests to, then it automatically configures.") RestartCommand.PersistentFlags().StringSliceVar(&globalFlags.AgentEndpoints, "agent-endpoints", []string{""}, "Endpoints to send client requests to, then it automatically configures.") @@ -100,6 +117,19 @@ func CommandFunc(cmd *cobra.Command, args []string) { if cmd.Use == "start" { req.ZookeeperPreAllocSize = globalFlags.ZookeeperPreAllocSize req.ZookeeperMaxClientCnxns = globalFlags.ZookeeperMaxClientCnxns + + req.WorkingDirectory = globalFlags.WorkingDirectory + req.LogPrefix = globalFlags.LogPrefix + req.DatabaseLogPath = globalFlags.DatabaseLogPath + req.MonitorResultPath = globalFlags.MonitorResultPath + req.GoogleCloudProjectName = globalFlags.GoogleCloudProjectName + bts, err := ioutil.ReadFile(globalFlags.GoogleCloudStorageJSONKeyPath) + if err != nil { + log.Println(err) + os.Exit(-1) + } + req.GoogleCloudStorageJSONKey = string(bts) + req.GoogleCloudStorageBucketName = globalFlags.GoogleCloudStorageBucketName } for i := range peerIPs {