*: Google cloud integration

Fix https://github.com/coreos/dbtester/issues/6.
Gyu-Ho Lee 2016-03-21 14:42:48 -07:00
parent 4d2a16638f
commit d921f18a23
8 changed files with 638 additions and 101 deletions

View File

@@ -17,7 +17,7 @@ package agent
import (
"bytes"
"fmt"
"log"
"io/ioutil"
"net"
"os"
"os/exec"
@@ -28,24 +28,19 @@ import (
"text/template"
"time"
log "github.com/Sirupsen/logrus"
"github.com/gyuho/psn/ps"
"github.com/spf13/cobra"
"golang.org/x/net/context"
"golang.org/x/oauth2/google"
"google.golang.org/cloud"
"google.golang.org/cloud/storage"
"google.golang.org/grpc"
)
type (
Flags struct {
GRPCPort string
WorkingDir string
DatabaseLogPath string
Monitor bool
MonitorInterval time.Duration
MonitorResultPath string
StorageSecretKeyPath string
}
// ZookeeperConfig is zookeeper configuration.
@@ -69,6 +64,8 @@ type (
var (
shell = os.Getenv("SHELL")
agentLogPath = filepath.Join(homeDir(), "agent.log")
etcdBinaryPath = filepath.Join(os.Getenv("GOPATH"), "bin/etcd")
etcdToken = "etcd_token"
@@ -109,48 +106,24 @@ maxClientCnxns={{.MaxClientCnxns}}
)
func init() {
log.SetFormatter(&log.JSONFormatter{})
log.SetLevel(log.InfoLevel)
if len(shell) == 0 {
shell = "sh"
}
Command.PersistentFlags().StringVar(&globalFlags.GRPCPort, "agent-port", ":3500", "Port to serve the agent gRPC server.")
Command.PersistentFlags().StringVar(&globalFlags.WorkingDir, "working-directory", homeDir(), "Working directory to store data.")
Command.PersistentFlags().StringVar(&globalFlags.DatabaseLogPath, "database-log-path", "database.log", "Path to save database log.")
Command.PersistentFlags().BoolVar(&globalFlags.Monitor, "monitor", false, "Periodically records resource usage.")
Command.PersistentFlags().DurationVar(&globalFlags.MonitorInterval, "monitor-interval", time.Second, "Resource monitor interval.")
Command.PersistentFlags().StringVar(&globalFlags.MonitorResultPath, "monitor-result-path", "monitor.csv", "File path to store monitor results.")
Command.PersistentFlags().StringVar(&globalFlags.StorageSecretKeyPath, "storage-secret-key-path", "", "Key to use for uploading logs and data.")
}
func CommandFunc(cmd *cobra.Command, args []string) {
if !filepath.HasPrefix(globalFlags.DatabaseLogPath, globalFlags.WorkingDir) {
globalFlags.DatabaseLogPath = filepath.Join(globalFlags.WorkingDir, globalFlags.DatabaseLogPath)
f, err := openToAppend(agentLogPath)
if err != nil {
log.Println(err)
os.Exit(-1)
}
if !filepath.HasPrefix(etcdDataDir, globalFlags.WorkingDir) {
etcdDataDir = filepath.Join(globalFlags.WorkingDir, etcdDataDir)
}
if !filepath.HasPrefix(zkWorkingDir, globalFlags.WorkingDir) {
zkWorkingDir = filepath.Join(globalFlags.WorkingDir, zkWorkingDir)
}
if !filepath.HasPrefix(zkDataDir, globalFlags.WorkingDir) {
zkDataDir = filepath.Join(globalFlags.WorkingDir, zkDataDir)
}
if !filepath.HasPrefix(globalFlags.MonitorResultPath, globalFlags.WorkingDir) {
globalFlags.MonitorResultPath = filepath.Join(globalFlags.WorkingDir, globalFlags.MonitorResultPath)
}
if !filepath.HasPrefix(globalFlags.StorageSecretKeyPath, globalFlags.WorkingDir) {
globalFlags.StorageSecretKeyPath = filepath.Join(globalFlags.WorkingDir, globalFlags.StorageSecretKeyPath)
}
log.Printf("gRPC has started serving at %s\n", globalFlags.GRPCPort)
log.Printf("Database log path: %s\n", globalFlags.DatabaseLogPath)
log.Printf("etcd data directory: %s\n", etcdDataDir)
log.Printf("Zookeeper working directory: %s\n", zkWorkingDir)
log.Printf("Zookeeper data directory: %s\n", zkDataDir)
log.Printf("Monitor result path: %s\n", globalFlags.MonitorResultPath)
log.Printf("Storage key secret path: %s\n", globalFlags.StorageSecretKeyPath) // TODO: use this to upload to Google cloud storage
defer f.Close()
log.SetOutput(f)
log.Printf("gRPC serving: %s\n", globalFlags.GRPCPort)
var (
grpcServer = grpc.NewServer()
@@ -161,12 +134,9 @@ func CommandFunc(cmd *cobra.Command, args []string) {
log.Println(err)
os.Exit(-1)
}
RegisterTransporterServer(grpcServer, sender)
log.Printf("gRPC is now serving at %s\n", globalFlags.GRPCPort)
if globalFlags.Monitor {
log.Printf("As soon as database started, it will monitor every %v...\n", globalFlags.MonitorInterval)
}
if err := grpcServer.Serve(ln); err != nil {
log.Println(err)
os.Exit(-1)
@@ -183,8 +153,47 @@ type transporterServer struct { // satisfy TransporterServer
var databaseStopped = make(chan struct{})
func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response, error) {
log.Printf("Received message for peer %q", r.PeerIPs)
log.Printf("Message from %q", r.PeerIPs)
peerIPs := strings.Split(r.PeerIPs, "___")
if r.Operation == Request_Start || r.Operation == Request_Restart {
if r.WorkingDirectory == "" {
r.WorkingDirectory = homeDir()
}
if !exist(r.WorkingDirectory) {
return nil, fmt.Errorf("%s does not exist", r.WorkingDirectory)
}
if !filepath.HasPrefix(etcdDataDir, r.WorkingDirectory) {
etcdDataDir = filepath.Join(r.WorkingDirectory, etcdDataDir)
}
if !filepath.HasPrefix(zkWorkingDir, r.WorkingDirectory) {
zkWorkingDir = filepath.Join(r.WorkingDirectory, zkWorkingDir)
}
if !filepath.HasPrefix(zkDataDir, r.WorkingDirectory) {
zkDataDir = filepath.Join(r.WorkingDirectory, zkDataDir)
}
if r.LogPrefix != "" {
if !strings.HasPrefix(filepath.Base(r.DatabaseLogPath), r.LogPrefix) {
r.DatabaseLogPath = filepath.Join(filepath.Dir(r.DatabaseLogPath), r.LogPrefix+"_"+filepath.Base(r.DatabaseLogPath))
}
if !strings.HasPrefix(filepath.Base(r.MonitorResultPath), r.LogPrefix) {
r.MonitorResultPath = filepath.Join(filepath.Dir(r.MonitorResultPath), r.LogPrefix+"_"+filepath.Base(r.MonitorResultPath))
}
} else {
if !filepath.HasPrefix(r.DatabaseLogPath, r.WorkingDirectory) {
r.DatabaseLogPath = filepath.Join(r.WorkingDirectory, r.DatabaseLogPath)
}
if !filepath.HasPrefix(r.MonitorResultPath, r.WorkingDirectory) {
r.MonitorResultPath = filepath.Join(r.WorkingDirectory, r.MonitorResultPath)
}
}
log.Printf("Working directory: %s\n", r.WorkingDirectory)
log.Printf("etcd data directory: %s\n", etcdDataDir)
log.Printf("Zookeeper working directory: %s\n", zkWorkingDir)
log.Printf("Zookeeper data directory: %s\n", zkDataDir)
log.Printf("Database log path: %s\n", r.DatabaseLogPath)
log.Printf("Monitor result path: %s\n", r.MonitorResultPath)
}
t.req = *r
var processPID int
@@ -201,7 +210,7 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
return nil, err
}
f, err := openToAppend(globalFlags.DatabaseLogPath)
f, err := openToAppend(r.DatabaseLogPath)
if err != nil {
return nil, err
}
@@ -242,7 +251,6 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
flagString := strings.Join(flags, " ")
cmd := exec.Command(etcdBinaryPath, flags...)
cmd.Stdin = nil
cmd.Stdout = f
cmd.Stderr = f
log.Printf("Starting: %s %s\n", cmd.Path, flagString)
@@ -306,7 +314,7 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
return nil, err
}
f, err := openToAppend(globalFlags.DatabaseLogPath)
f, err := openToAppend(r.DatabaseLogPath)
if err != nil {
return nil, err
}
@@ -317,7 +325,6 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
args := []string{shell, "-c", "/usr/bin/java " + flagString + " " + configFilePath}
cmd := exec.Command(args[0], args[1:]...)
cmd.Stdin = nil
cmd.Stdout = f
cmd.Stderr = f
log.Printf("Starting: %s %s\n", cmd.Path, strings.Join(args[1:], " "))
@@ -352,14 +359,13 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
}
}
f, err := openToAppend(globalFlags.DatabaseLogPath)
f, err := openToAppend(r.DatabaseLogPath)
if err != nil {
return nil, err
}
t.logfile = f
cmd := exec.Command(t.cmd.Path, t.cmd.Args[1:]...)
cmd.Stdin = nil
cmd.Stdout = f
cmd.Stderr = f
log.Printf("Restarting: %s\n", strings.Join(t.cmd.Args, " "))
@@ -395,7 +401,7 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
return nil, fmt.Errorf("Not implemented %v", r.Operation)
}
if globalFlags.Monitor && r.Operation == Request_Start {
if r.Operation == Request_Start || r.Operation == Request_Restart {
go func(processPID int) {
notifier := make(chan os.Signal, 1)
signal.Notify(notifier, syscall.SIGINT, syscall.SIGTERM)
@@ -406,7 +412,7 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
return err
}
f, err := openToAppend(globalFlags.MonitorResultPath)
f, err := openToAppend(r.MonitorResultPath)
if err != nil {
return err
}
@@ -415,25 +421,126 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
return ps.WriteToCSV(f, pss...)
}
log.Printf("%s monitor saved at %s\n", r.Database, globalFlags.MonitorResultPath)
log.Printf("%s monitor saved at %s\n", r.Database, r.MonitorResultPath)
var err error
if err = rFunc(); err != nil {
log.Println("error:", err)
log.Warningln("error:", err)
return
}
escape:
for {
select {
case <-time.After(globalFlags.MonitorInterval):
case <-time.After(time.Second):
if err = rFunc(); err != nil {
log.Printf("Monitoring error %v\n", err)
log.Warnf("Monitoring error %v\n", err)
break escape
}
case sig := <-notifier:
log.Printf("Received %v\n", sig)
return
case <-databaseStopped:
log.Println("Monitoring stopped")
log.Println("Monitoring stopped. Uploading data to cloud storage...")
// initialize auth
conf, err := google.JWTConfigFromJSON(
[]byte(r.GoogleCloudStorageJSONKey),
storage.ScopeFullControl,
)
if err != nil {
log.Warnf("error (%v) with\n\n%q\n\n", err, r.GoogleCloudStorageJSONKey)
return
}
ctx := context.Background()
aclient, err := storage.NewAdminClient(ctx, r.GoogleCloudProjectName, cloud.WithTokenSource(conf.TokenSource(ctx)))
if err != nil {
log.Warnf("error (%v) with %q\n", err, r.GoogleCloudProjectName)
return
}
defer aclient.Close()
if err := aclient.CreateBucket(context.Background(), r.GoogleCloudStorageBucketName, nil); err != nil {
if !strings.Contains(err.Error(), "You already own this bucket. Please select another name") {
log.Warnf("error (%v) with %q\n", err, r.GoogleCloudStorageBucketName)
}
}
sctx := context.Background()
sclient, err := storage.NewClient(sctx, cloud.WithTokenSource(conf.TokenSource(sctx)))
if err != nil {
log.Warnf("error (%v)\n", err)
return
}
defer sclient.Close()
// set up file names
srcDatabaseLogPath := r.DatabaseLogPath
dstDatabaseLogPath := filepath.Base(r.DatabaseLogPath)
if !strings.HasPrefix(filepath.Base(r.DatabaseLogPath), r.LogPrefix) {
dstDatabaseLogPath = r.LogPrefix + fmt.Sprintf("_%d_", r.EtcdServerIndex) + filepath.Base(r.DatabaseLogPath)
}
srcMonitorResultPath := r.MonitorResultPath
dstMonitorResultPath := filepath.Base(r.MonitorResultPath)
if !strings.HasPrefix(filepath.Base(r.MonitorResultPath), r.LogPrefix) {
dstMonitorResultPath = r.LogPrefix + fmt.Sprintf("_%d_", r.EtcdServerIndex) + filepath.Base(r.MonitorResultPath)
}
srcAgentLogPath := agentLogPath
dstAgentLogPath := filepath.Base(agentLogPath)
if !strings.HasPrefix(filepath.Base(agentLogPath), r.LogPrefix) {
dstAgentLogPath = r.LogPrefix + fmt.Sprintf("_%d_", r.EtcdServerIndex) + filepath.Base(agentLogPath)
}
log.Printf("Uploading %s\n", srcDatabaseLogPath)
wc1 := sclient.Bucket(r.GoogleCloudStorageBucketName).Object(dstDatabaseLogPath).NewWriter(context.Background())
wc1.ContentType = "text/plain"
bts1, err := ioutil.ReadFile(srcDatabaseLogPath)
if err != nil {
log.Warnf("error (%v)\n", err)
return
}
if _, err := wc1.Write(bts1); err != nil {
log.Warnf("error (%v)\n", err)
return
}
if err := wc1.Close(); err != nil {
log.Warnf("error (%v)\n", err)
return
}
log.Printf("Uploading %s\n", srcMonitorResultPath)
wc2 := sclient.Bucket(r.GoogleCloudStorageBucketName).Object(dstMonitorResultPath).NewWriter(context.Background())
wc2.ContentType = "text/plain"
bts2, err := ioutil.ReadFile(srcMonitorResultPath)
if err != nil {
log.Warnf("error (%v)\n", err)
return
}
if _, err := wc2.Write(bts2); err != nil {
log.Warnf("error (%v)\n", err)
return
}
if err := wc2.Close(); err != nil {
log.Warnf("error (%v)\n", err)
return
}
log.Printf("Uploading %s\n", srcAgentLogPath)
wc3 := sclient.Bucket(r.GoogleCloudStorageBucketName).Object(dstAgentLogPath).NewWriter(context.Background())
wc3.ContentType = "text/plain"
bts3, err := ioutil.ReadFile(srcAgentLogPath)
if err != nil {
log.Warnf("error (%v)\n", err)
return
}
if _, err := wc3.Write(bts3); err != nil {
log.Warnf("error (%v)\n", err)
return
}
if err := wc3.Close(); err != nil {
log.Warnf("error (%v)\n", err)
return
}
return
}
}
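
Editor's note: the three upload blocks above repeat the same read / write / close sequence for the database log, the monitor CSV, and the agent log. A helper along the following lines could factor that out. This is a sketch only, not part of the commit: the name uploadFile is made up, the client type is assumed to be the *storage.Client returned by storage.NewClient, and the calls are the same ioutil and google.golang.org/cloud/storage calls already used in the diff.

    package agent

    import (
        "io/ioutil"

        "golang.org/x/net/context"
        "google.golang.org/cloud/storage"
    )

    // uploadFile reads a local file and writes its contents to bucket/dstName,
    // mirroring the three inline upload blocks in the diff above.
    func uploadFile(sclient *storage.Client, bucket, srcPath, dstName string) error {
        bts, err := ioutil.ReadFile(srcPath)
        if err != nil {
            return err
        }
        wc := sclient.Bucket(bucket).Object(dstName).NewWriter(context.Background())
        wc.ContentType = "text/plain"
        if _, err := wc.Write(bts); err != nil {
            return err
        }
        return wc.Close()
    }

Each inline block would then reduce to one uploadFile(sclient, r.GoogleCloudStorageBucketName, src, dst) call plus its error log.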

View File

@@ -94,6 +94,23 @@ type Request struct {
// ZookeeperMaxClientCnxns limits the number of concurrent connections
// (at the socket level) that a single client, identified by IP address, may make.
ZookeeperMaxClientCnxns int64 `protobuf:"varint,7,opt,name=zookeeperMaxClientCnxns,proto3" json:"zookeeperMaxClientCnxns,omitempty"`
// WorkingDirectory is the working directory of the remote machine.
// If empty, it will use its home directory.
WorkingDirectory string `protobuf:"bytes,8,opt,name=workingDirectory,proto3" json:"workingDirectory,omitempty"`
// LogPrefix prefixes all logs to be generated in agent.
LogPrefix string `protobuf:"bytes,9,opt,name=logPrefix,proto3" json:"logPrefix,omitempty"`
// DatabaseLogPath is the file path to store the database logs.
DatabaseLogPath string `protobuf:"bytes,10,opt,name=databaseLogPath,proto3" json:"databaseLogPath,omitempty"`
// MonitorResultPath is the file path to store monitoring results.
MonitorResultPath string `protobuf:"bytes,11,opt,name=monitorResultPath,proto3" json:"monitorResultPath,omitempty"`
// GoogleCloudProjectName is the project name to use
// to upload logs.
GoogleCloudProjectName string `protobuf:"bytes,12,opt,name=googleCloudProjectName,proto3" json:"googleCloudProjectName,omitempty"`
// GoogleCloudStorageJSONKey is the key to be used to upload
// data and logs to Google Cloud Storage.
GoogleCloudStorageJSONKey string `protobuf:"bytes,13,opt,name=googleCloudStorageJSONKey,proto3" json:"googleCloudStorageJSONKey,omitempty"`
// GoogleCloudStorageBucketName is the bucket name to store all data and logs.
GoogleCloudStorageBucketName string `protobuf:"bytes,14,opt,name=googleCloudStorageBucketName,proto3" json:"googleCloudStorageBucketName,omitempty"`
}
func (m *Request) Reset() { *m = Request{} }
@@ -229,6 +246,48 @@ func (m *Request) MarshalTo(data []byte) (int, error) {
i++
i = encodeVarintMessage(data, i, uint64(m.ZookeeperMaxClientCnxns))
}
if len(m.WorkingDirectory) > 0 {
data[i] = 0x42
i++
i = encodeVarintMessage(data, i, uint64(len(m.WorkingDirectory)))
i += copy(data[i:], m.WorkingDirectory)
}
if len(m.LogPrefix) > 0 {
data[i] = 0x4a
i++
i = encodeVarintMessage(data, i, uint64(len(m.LogPrefix)))
i += copy(data[i:], m.LogPrefix)
}
if len(m.DatabaseLogPath) > 0 {
data[i] = 0x52
i++
i = encodeVarintMessage(data, i, uint64(len(m.DatabaseLogPath)))
i += copy(data[i:], m.DatabaseLogPath)
}
if len(m.MonitorResultPath) > 0 {
data[i] = 0x5a
i++
i = encodeVarintMessage(data, i, uint64(len(m.MonitorResultPath)))
i += copy(data[i:], m.MonitorResultPath)
}
if len(m.GoogleCloudProjectName) > 0 {
data[i] = 0x62
i++
i = encodeVarintMessage(data, i, uint64(len(m.GoogleCloudProjectName)))
i += copy(data[i:], m.GoogleCloudProjectName)
}
if len(m.GoogleCloudStorageJSONKey) > 0 {
data[i] = 0x6a
i++
i = encodeVarintMessage(data, i, uint64(len(m.GoogleCloudStorageJSONKey)))
i += copy(data[i:], m.GoogleCloudStorageJSONKey)
}
if len(m.GoogleCloudStorageBucketName) > 0 {
data[i] = 0x72
i++
i = encodeVarintMessage(data, i, uint64(len(m.GoogleCloudStorageBucketName)))
i += copy(data[i:], m.GoogleCloudStorageBucketName)
}
return i, nil
}
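
Editor's note: the tag bytes added above (0x42, 0x4a, 0x52, 0x5a, 0x62, 0x6a, 0x72) follow the standard protobuf key encoding key = (field_number << 3) | wire_type, where wire type 2 marks a length-delimited field such as a string. A self-contained check, for illustration only:

    package main

    import "fmt"

    func main() {
        // wire type 2 = length-delimited; fields 8..14 are the new string fields.
        for field := 8; field <= 14; field++ {
            fmt.Printf("field %2d -> tag byte 0x%02x\n", field, (field<<3)|2)
        }
        // Prints 0x42, 0x4a, 0x52, 0x5a, 0x62, 0x6a, 0x72, matching the
        // WorkingDirectory through GoogleCloudStorageBucketName branches above.
    }

Fields 8 through 14 therefore map to exactly the seven constants emitted in MarshalTo and matched again in Unmarshal below.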
@@ -312,6 +371,34 @@ func (m *Request) Size() (n int) {
if m.ZookeeperMaxClientCnxns != 0 {
n += 1 + sovMessage(uint64(m.ZookeeperMaxClientCnxns))
}
l = len(m.WorkingDirectory)
if l > 0 {
n += 1 + l + sovMessage(uint64(l))
}
l = len(m.LogPrefix)
if l > 0 {
n += 1 + l + sovMessage(uint64(l))
}
l = len(m.DatabaseLogPath)
if l > 0 {
n += 1 + l + sovMessage(uint64(l))
}
l = len(m.MonitorResultPath)
if l > 0 {
n += 1 + l + sovMessage(uint64(l))
}
l = len(m.GoogleCloudProjectName)
if l > 0 {
n += 1 + l + sovMessage(uint64(l))
}
l = len(m.GoogleCloudStorageJSONKey)
if l > 0 {
n += 1 + l + sovMessage(uint64(l))
}
l = len(m.GoogleCloudStorageBucketName)
if l > 0 {
n += 1 + l + sovMessage(uint64(l))
}
return n
}
@@ -509,6 +596,209 @@ func (m *Request) Unmarshal(data []byte) error {
break
}
}
case 8:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field WorkingDirectory", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMessage
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.WorkingDirectory = string(data[iNdEx:postIndex])
iNdEx = postIndex
case 9:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field LogPrefix", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMessage
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.LogPrefix = string(data[iNdEx:postIndex])
iNdEx = postIndex
case 10:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field DatabaseLogPath", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMessage
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.DatabaseLogPath = string(data[iNdEx:postIndex])
iNdEx = postIndex
case 11:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field MonitorResultPath", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMessage
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.MonitorResultPath = string(data[iNdEx:postIndex])
iNdEx = postIndex
case 12:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field GoogleCloudProjectName", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMessage
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.GoogleCloudProjectName = string(data[iNdEx:postIndex])
iNdEx = postIndex
case 13:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field GoogleCloudStorageJSONKey", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMessage
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.GoogleCloudStorageJSONKey = string(data[iNdEx:postIndex])
iNdEx = postIndex
case 14:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field GoogleCloudStorageBucketName", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthMessage
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.GoogleCloudStorageBucketName = string(data[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipMessage(data[iNdEx:])
@@ -706,30 +996,38 @@ var (
)
var fileDescriptorMessage = []byte{
// 392 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x52, 0xcd, 0xae, 0xd2, 0x40,
0x14, 0x6e, 0x2f, 0x97, 0x4b, 0x7b, 0x6e, 0x7a, 0x2f, 0x99, 0x68, 0xee, 0x84, 0x05, 0x31, 0x95,
0x05, 0x1b, 0x4a, 0x02, 0xc6, 0xb8, 0x70, 0xa3, 0xb0, 0x21, 0xc6, 0x48, 0x5a, 0x57, 0xee, 0xa6,
0xe5, 0x50, 0x1b, 0xa1, 0x53, 0x67, 0xa6, 0x06, 0x79, 0x12, 0x1f, 0xc0, 0x87, 0x61, 0xe9, 0x23,
0xf8, 0xf3, 0x22, 0x0e, 0x03, 0x2d, 0x81, 0xe8, 0x62, 0x92, 0xf3, 0xfd, 0x9d, 0xf4, 0x9c, 0x53,
0xf0, 0xd6, 0x28, 0x25, 0x4b, 0x31, 0x28, 0x04, 0x57, 0x9c, 0x34, 0x75, 0x99, 0xab, 0xce, 0x20,
0xcd, 0xd4, 0xc7, 0x32, 0x0e, 0x12, 0xbe, 0x1e, 0xa6, 0x3c, 0xe5, 0x43, 0xa3, 0xc6, 0xe5, 0xd2,
0x20, 0x03, 0x4c, 0x75, 0x48, 0xf9, 0xdf, 0x1b, 0xd0, 0x0a, 0xf1, 0x73, 0x89, 0x52, 0x91, 0xe7,
0xe0, 0xf2, 0x02, 0x05, 0x53, 0x19, 0xcf, 0xa9, 0xfd, 0xc4, 0xee, 0xdf, 0x8d, 0x68, 0x60, 0xba,
0x06, 0x47, 0x4b, 0xf0, 0xae, 0xd2, 0xc3, 0x93, 0x95, 0x8c, 0xc1, 0x59, 0x30, 0xc5, 0x62, 0x26,
0x91, 0x5e, 0x99, 0xd8, 0xc3, 0x45, 0x6c, 0x7a, 0x94, 0xc3, 0xda, 0x48, 0x28, 0xb4, 0x0a, 0x44,
0x31, 0x9b, 0x4b, 0xda, 0xd0, 0x19, 0x37, 0xac, 0x20, 0xe9, 0xc3, 0x3d, 0xaa, 0x64, 0x11, 0xa1,
0xf8, 0xa2, 0x89, 0x7c, 0x81, 0x1b, 0x7a, 0xad, 0x1d, 0x5e, 0x78, 0x49, 0x93, 0x1e, 0x78, 0x5b,
0xce, 0x3f, 0x21, 0xea, 0x4f, 0x79, 0xfb, 0x75, 0x36, 0xa5, 0x4d, 0xe3, 0x3b, 0x27, 0xc9, 0x33,
0x78, 0x5c, 0x13, 0x73, 0x81, 0xaf, 0x56, 0x2b, 0x9e, 0x44, 0xd9, 0x16, 0xe9, 0x8d, 0x76, 0x37,
0xc2, 0x7f, 0x8b, 0xe4, 0x05, 0x3c, 0x9c, 0xda, 0xb0, 0xcd, 0x64, 0x95, 0xe9, 0x81, 0x26, 0xf9,
0x26, 0x97, 0xb4, 0x65, 0x72, 0xff, 0x93, 0xfd, 0x01, 0xb8, 0xf5, 0x9a, 0x88, 0x0b, 0xcd, 0x48,
0x31, 0xa1, 0xda, 0x16, 0x71, 0xe0, 0x3a, 0x52, 0xbc, 0x68, 0xdb, 0xe4, 0x76, 0xbf, 0x73, 0x69,
0xe8, 0x2b, 0xff, 0x29, 0x38, 0xd5, 0x7a, 0xf6, 0x96, 0xfd, 0x8c, 0xda, 0xec, 0x81, 0xfb, 0x81,
0xf3, 0x37, 0xa6, 0x7f, 0xdb, 0xf6, 0x7b, 0xe0, 0xe8, 0x44, 0xc1, 0xf3, 0xc3, 0xe6, 0x64, 0x99,
0x24, 0xfa, 0xf8, 0xe6, 0x48, 0x4e, 0x58, 0xc1, 0xd1, 0x4b, 0xb8, 0x7d, 0x2f, 0x58, 0xae, 0x7d,
0x42, 0xa1, 0x20, 0x03, 0x70, 0x0c, 0x5c, 0xea, 0xfa, 0xee, 0xfc, 0x22, 0x9d, 0xfb, 0x1a, 0x1f,
0xba, 0xfa, 0xd6, 0xeb, 0x47, 0xbb, 0x5f, 0x5d, 0x6b, 0xf7, 0xbb, 0x6b, 0xff, 0xd0, 0xef, 0xa7,
0x7e, 0xdf, 0xfe, 0x74, 0xad, 0xf8, 0xc6, 0xfc, 0x27, 0xe3, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff,
0x62, 0x4a, 0xb3, 0x66, 0x6e, 0x02, 0x00, 0x00,
// 526 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x7c, 0x93, 0xcf, 0x72, 0xd2, 0x50,
0x14, 0xc6, 0xa1, 0x7f, 0x20, 0x39, 0x08, 0xc5, 0x3b, 0x6a, 0xaf, 0x9d, 0x0e, 0xe3, 0x60, 0x17,
0x1d, 0x47, 0xe8, 0x4c, 0xeb, 0x38, 0x2e, 0xba, 0x11, 0xd8, 0xd4, 0x6a, 0xcb, 0x04, 0x57, 0xee,
0x42, 0x38, 0xa4, 0x91, 0x90, 0x8b, 0x37, 0x37, 0x0a, 0x5d, 0xf9, 0x18, 0x3e, 0x52, 0x97, 0x3e,
0x82, 0x7f, 0x5e, 0xc4, 0xc3, 0xa1, 0x01, 0x0b, 0xe2, 0x22, 0x33, 0xf7, 0x7c, 0xdf, 0xef, 0x3b,
0x49, 0xee, 0x3d, 0x17, 0x8a, 0x43, 0x8c, 0x63, 0xd7, 0xc7, 0xfa, 0x48, 0x2b, 0xa3, 0xc4, 0x36,
0x2d, 0x23, 0xb3, 0x57, 0xf3, 0x03, 0x73, 0x95, 0x74, 0xeb, 0x9e, 0x1a, 0x1e, 0xf9, 0xca, 0x57,
0x47, 0xec, 0x76, 0x93, 0x3e, 0x57, 0x5c, 0xf0, 0x6a, 0x96, 0xaa, 0x7e, 0xcd, 0x41, 0xde, 0xc1,
0x4f, 0x09, 0xc6, 0x46, 0xbc, 0x04, 0x5b, 0x8d, 0x50, 0xbb, 0x26, 0x50, 0x91, 0xcc, 0x3e, 0xc9,
0x1e, 0x96, 0x8e, 0x65, 0x9d, 0xbb, 0xd6, 0x6f, 0x91, 0xfa, 0x65, 0xea, 0x3b, 0x0b, 0x54, 0x9c,
0x80, 0xd5, 0x73, 0x8d, 0xdb, 0x75, 0x63, 0x94, 0x1b, 0x1c, 0xdb, 0x5d, 0x8a, 0xb5, 0x6e, 0x6d,
0x67, 0x0e, 0x0a, 0x09, 0xf9, 0x11, 0xa2, 0x3e, 0x6b, 0xc7, 0x72, 0x93, 0x32, 0xb6, 0x93, 0x96,
0xe2, 0x10, 0x76, 0xd0, 0x78, 0xbd, 0x0e, 0xea, 0xcf, 0x24, 0x44, 0x3d, 0x1c, 0xcb, 0x2d, 0x22,
0x8a, 0xce, 0xb2, 0x2c, 0x0e, 0xa0, 0x78, 0xad, 0xd4, 0x00, 0x91, 0x3e, 0xe5, 0xdd, 0xe4, 0xac,
0x25, 0xb7, 0x99, 0xbb, 0x2b, 0x8a, 0x17, 0xf0, 0x70, 0x2e, 0xb4, 0x35, 0xbe, 0x0e, 0x43, 0xe5,
0x75, 0x82, 0x6b, 0x94, 0x39, 0xa2, 0x37, 0x9d, 0x7f, 0x9b, 0xe2, 0x15, 0xec, 0x2e, 0xda, 0xb8,
0xe3, 0x66, 0x18, 0xd0, 0x0f, 0x35, 0xa3, 0x71, 0x14, 0xcb, 0x3c, 0xe7, 0xd6, 0xd9, 0xe2, 0x19,
0x94, 0xbf, 0x28, 0x3d, 0x08, 0x22, 0xbf, 0x15, 0x68, 0xf4, 0x8c, 0xd2, 0x13, 0x69, 0xf1, 0x2f,
0xae, 0xe8, 0x62, 0x1f, 0xec, 0x50, 0xf9, 0xf4, 0xe2, 0x7e, 0x30, 0x96, 0x36, 0x43, 0x0b, 0x61,
0xba, 0x13, 0xe9, 0x7e, 0xbd, 0x25, 0xd1, 0x35, 0x57, 0x12, 0x98, 0x59, 0x96, 0xc5, 0x73, 0xb8,
0x3f, 0x54, 0x51, 0x40, 0x3d, 0x1d, 0x8c, 0x93, 0xd0, 0x30, 0x5b, 0x60, 0x76, 0xd5, 0xa0, 0x83,
0x7e, 0xe4, 0x2b, 0xe5, 0x87, 0xd8, 0x0c, 0x55, 0xd2, 0x6b, 0x6b, 0xf5, 0x91, 0x3e, 0xe7, 0xc2,
0x1d, 0xa2, 0xbc, 0xc7, 0x91, 0x35, 0xae, 0x38, 0x85, 0xc7, 0x7f, 0x39, 0x1d, 0x6a, 0x4a, 0xc7,
0xfc, 0xa6, 0x73, 0x79, 0x71, 0x8e, 0x13, 0x59, 0xe4, 0xe8, 0x7a, 0x40, 0x34, 0x60, 0x7f, 0xd5,
0x6c, 0x24, 0xde, 0x00, 0x67, 0xef, 0x2e, 0x71, 0x83, 0xff, 0x32, 0xd5, 0x1a, 0xd8, 0xf3, 0x11,
0x14, 0x36, 0x6c, 0x77, 0x8c, 0xab, 0x4d, 0x39, 0x23, 0x2c, 0xd8, 0x22, 0x78, 0x54, 0xce, 0x8a,
0xc2, 0x74, 0x9e, 0x63, 0x96, 0x37, 0xaa, 0x4f, 0xc1, 0x4a, 0x47, 0x6f, 0x8a, 0x4c, 0xe7, 0x87,
0xe0, 0x22, 0xd8, 0x1f, 0x94, 0x3a, 0xe7, 0xb3, 0x2b, 0x67, 0xab, 0x07, 0x60, 0x51, 0x62, 0xa4,
0xa2, 0xd9, 0x54, 0xc6, 0x89, 0xe7, 0xd1, 0xc5, 0xe2, 0x0b, 0x60, 0x39, 0x69, 0x79, 0x7c, 0x0a,
0x85, 0xf7, 0xda, 0x8d, 0x88, 0xd3, 0x06, 0xb5, 0xa8, 0x81, 0xc5, 0x65, 0x9f, 0xd6, 0xa5, 0xbb,
0xd3, 0xbe, 0xb7, 0x33, 0xaf, 0x67, 0x5d, 0xab, 0x99, 0xc6, 0x83, 0x9b, 0x9f, 0x95, 0xcc, 0xcd,
0xaf, 0x4a, 0xf6, 0x3b, 0x3d, 0x3f, 0xe8, 0xf9, 0xf6, 0xbb, 0x92, 0xe9, 0xe6, 0xf8, 0x0e, 0x9e,
0xfc, 0x09, 0x00, 0x00, 0xff, 0xff, 0x7d, 0x4c, 0xf6, 0x23, 0xca, 0x03, 0x00, 0x00,
}

View File

@@ -39,6 +39,30 @@ message Request {
// ZookeeperMaxClientCnxns limits the number of concurrent connections
// (at the socket level) that a single client, identified by IP address, may make.
int64 zookeeperMaxClientCnxns = 7;
// WorkingDirectory is the working directory of the remote machine.
// If empty, it will use its home directory.
string workingDirectory = 8;
// LogPrefix prefixes all logs to be generated in agent.
string logPrefix = 9;
// DatabaseLogPath is the file path to store the database logs.
string databaseLogPath = 10;
// MonitorResultPath is the file path to store monitoring results.
string monitorResultPath = 11;
// GoogleCloudProjectName is the project name to use
// to upload logs.
string googleCloudProjectName = 12;
// GoogleCloudStorageJSONKey is the key to be used to upload
// data and logs to Google Cloud Storage.
string googleCloudStorageJSONKey = 13;
// GoogleCloudStorageBucketName is the bucket name to store all data and logs.
string googleCloudStorageBucketName = 14;
}
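
Editor's note: on the agent side, logPrefix is combined with the etcd server index and the base name of each local path to build the uploaded object names (see the Transfer handler earlier in this commit). A sketch of that mapping — the helper name dstName and its parameter types are illustrative, not part of the commit:

    package agent

    import (
        "fmt"
        "path/filepath"
        "strings"
    )

    // dstName mirrors how the agent names uploaded objects: the base file name,
    // prefixed with logPrefix and the server index when not already prefixed.
    func dstName(logPrefix string, serverIndex int64, localPath string) string {
        base := filepath.Base(localPath)
        if strings.HasPrefix(base, logPrefix) {
            return base
        }
        return logPrefix + fmt.Sprintf("_%d_", serverIndex) + base
    }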
message Response {

View File

@@ -19,6 +19,14 @@ import (
"runtime"
)
func openToRead(fpath string) (*os.File, error) {
f, err := os.OpenFile(fpath, os.O_RDONLY, 0444)
if err != nil {
return f, err
}
return f, nil
}
func openToAppend(fpath string) (*os.File, error) {
f, err := os.OpenFile(fpath, os.O_RDWR|os.O_APPEND, 0777)
if err != nil {
@@ -55,3 +63,22 @@ func homeDir() string {
}
return os.Getenv("HOME")
}
// exist returns true if the file or directory exists.
// A non-IsNotExist error (e.g. permission denied) is treated as existing,
// and st is never dereferenced when Stat fails.
func exist(fpath string) bool {
if _, err := os.Stat(fpath); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}

View File

@@ -40,6 +40,9 @@ var (
noHistogram bool
csvResultPath string
googleCloudProjectName string
googleCloudStorageJSONKeyPath string
googleCloudStorageBucketName string
bar *pb.ProgressBar
results chan result
@@ -58,6 +61,9 @@ func init() {
Command.PersistentFlags().BoolVar(&sample, "sample", false, "'true' to sample requests for every second.")
Command.PersistentFlags().BoolVar(&noHistogram, "no-histogram", false, "'true' to not show results in histogram.")
Command.PersistentFlags().StringVar(&csvResultPath, "csv-result-path", "timeseries.csv", "path to store csv results.")
Command.PersistentFlags().StringVar(&googleCloudProjectName, "google-cloud-project-name", "", "Google cloud project name.")
Command.PersistentFlags().StringVar(&googleCloudStorageJSONKeyPath, "google-cloud-storage-json-key-path", "", "Path of JSON key file.")
Command.PersistentFlags().StringVar(&googleCloudStorageBucketName, "google-cloud-storage-bucket-name", "", "Google cloud storage bucket name.")
}
func main() {

View File

@@ -58,18 +58,17 @@ func rangeFunc(cmd *cobra.Command, args []string) {
switch database {
case "etcd":
fmt.Printf("PUT '%s' to etcd\n", k)
var cerr error
var err error
for i := 0; i < 5; i++ {
clients := mustCreateClients(1, 1)
_, cerr = clients[0].Do(context.Background(), v3.OpPut(k, string(v)))
if cerr != nil {
_, err = clients[0].Do(context.Background(), v3.OpPut(k, string(v)))
if err != nil {
continue
}
fmt.Printf("Done with PUT '%s' to etcd\n", k)
break
}
if cerr != nil {
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
@@ -77,18 +76,17 @@ func rangeFunc(cmd *cobra.Command, args []string) {
case "zk":
k = "/" + k
fmt.Printf("PUT '%s' to Zookeeper\n", k)
var cerr error
var err error
for i := 0; i < 5; i++ {
conns := mustCreateConnsZk(totalConns)
_, cerr = conns[0].Create(k, v, zkCreateFlags, zkCreateAcl)
if cerr != nil {
_, err = conns[0].Create(k, v, zkCreateFlags, zkCreateAcl)
if err != nil {
continue
}
fmt.Printf("Done with PUT '%s' to Zookeeper\n", k)
break
}
if cerr != nil {
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}

View File

@@ -18,10 +18,17 @@ import (
"bytes"
"encoding/csv"
"fmt"
"io/ioutil"
"log"
"sort"
"strings"
"sync"
"time"
"golang.org/x/net/context"
"golang.org/x/oauth2/google"
"google.golang.org/cloud"
"google.golang.org/cloud/storage"
)
type timeSeries struct {
@@ -112,7 +119,47 @@ func (ts TimeSeries) String() string {
if err := toFile(txt, csvResultPath); err != nil {
log.Println(err)
} else {
log.Println("time series saved")
log.Println("time series saved... uploading to Google cloud storage...")
kbts, err := ioutil.ReadFile(googleCloudStorageJSONKeyPath)
if err != nil {
log.Fatal(err)
}
conf, err := google.JWTConfigFromJSON(
kbts,
storage.ScopeFullControl,
)
if err != nil {
log.Fatal(err)
}
ctx := context.Background()
aclient, err := storage.NewAdminClient(ctx, googleCloudProjectName, cloud.WithTokenSource(conf.TokenSource(ctx)))
if err != nil {
log.Fatal(err)
}
defer aclient.Close()
if err := aclient.CreateBucket(context.Background(), googleCloudStorageBucketName, nil); err != nil {
if !strings.Contains(err.Error(), "You already own this bucket. Please select another name") {
log.Fatal(err)
}
}
sctx := context.Background()
sclient, err := storage.NewClient(sctx, cloud.WithTokenSource(conf.TokenSource(sctx)))
if err != nil {
log.Fatal(err)
}
defer sclient.Close()
log.Printf("Uploading %s\n", csvResultPath)
wc := sclient.Bucket(googleCloudStorageBucketName).Object(csvResultPath).NewWriter(context.Background())
wc.ContentType = "text/plain"
if _, err := wc.Write([]byte(txt)); err != nil {
log.Fatal(err)
}
if err := wc.Close(); err != nil {
log.Fatal(err)
}
}
return fmt.Sprintf("\nSample in one second (unix latency throughput):\n%s", txt)
}
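
Editor's note: both this block and the agent create the bucket and then discard the "You already own this bucket" error with a string match. If that pattern spreads, it could be centralized. The sketch below is editorial, not part of the commit: the name ensureBucket is made up, the enclosing package name is assumed, and it only reuses the AdminClient calls shown in this diff (cloud.WithTokenSource is assumed to accept the oauth2.TokenSource produced by conf.TokenSource above).

    package bench // package name assumed

    import (
        "strings"

        "golang.org/x/net/context"
        "golang.org/x/oauth2"
        "google.golang.org/cloud"
        "google.golang.org/cloud/storage"
    )

    // ensureBucket creates bucket under project and treats the
    // "already own this bucket" response as success.
    func ensureBucket(ctx context.Context, project, bucket string, ts oauth2.TokenSource) error {
        aclient, err := storage.NewAdminClient(ctx, project, cloud.WithTokenSource(ts))
        if err != nil {
            return err
        }
        defer aclient.Close()
        if err := aclient.CreateBucket(ctx, bucket, nil); err != nil {
            if !strings.Contains(err.Error(), "You already own this bucket. Please select another name") {
                return err
            }
        }
        return nil
    }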

View File

@@ -15,6 +15,7 @@
package control
import (
"io/ioutil"
"log"
"os"
"strings"
@@ -32,6 +33,14 @@ type (
AgentEndpoints []string
ZookeeperPreAllocSize int64
ZookeeperMaxClientCnxns int64
WorkingDirectory string
LogPrefix string
DatabaseLogPath string
MonitorResultPath string
GoogleCloudProjectName string
GoogleCloudStorageJSONKeyPath string
GoogleCloudStorageBucketName string
}
)
@@ -60,6 +69,14 @@ func init() {
StartCommand.PersistentFlags().Int64Var(&globalFlags.ZookeeperPreAllocSize, "zk-pre-alloc-size", 65536*1024, "Disk pre-allocation size in bytes.")
StartCommand.PersistentFlags().Int64Var(&globalFlags.ZookeeperMaxClientCnxns, "zk-max-client-conns", 5000, "Maximum number of concurrent Zookeeper connections.")
StartCommand.PersistentFlags().StringVar(&globalFlags.WorkingDirectory, "working-directory", "", "Working directory of the remote machine. If empty, it will use its home directory.")
StartCommand.PersistentFlags().StringVar(&globalFlags.LogPrefix, "log-prefix", "", "Prefix to all logs to be generated in agents.")
StartCommand.PersistentFlags().StringVar(&globalFlags.DatabaseLogPath, "database-log-path", "database.log", "Path of database log.")
StartCommand.PersistentFlags().StringVar(&globalFlags.MonitorResultPath, "monitor-result-path", "monitor.csv", "CSV file path of monitoring results.")
StartCommand.PersistentFlags().StringVar(&globalFlags.GoogleCloudProjectName, "google-cloud-project-name", "", "Google cloud project name.")
StartCommand.PersistentFlags().StringVar(&globalFlags.GoogleCloudStorageJSONKeyPath, "google-cloud-storage-json-key-path", "", "Path of JSON key file.")
StartCommand.PersistentFlags().StringVar(&globalFlags.GoogleCloudStorageBucketName, "google-cloud-storage-bucket-name", "", "Google cloud storage bucket name.")
StopCommand.PersistentFlags().StringSliceVar(&globalFlags.AgentEndpoints, "agent-endpoints", []string{""}, "Endpoints to send client requests to, then it automatically configures.")
RestartCommand.PersistentFlags().StringSliceVar(&globalFlags.AgentEndpoints, "agent-endpoints", []string{""}, "Endpoints to send client requests to, then it automatically configures.")
@@ -100,6 +117,19 @@ func CommandFunc(cmd *cobra.Command, args []string) {
if cmd.Use == "start" {
req.ZookeeperPreAllocSize = globalFlags.ZookeeperPreAllocSize
req.ZookeeperMaxClientCnxns = globalFlags.ZookeeperMaxClientCnxns
req.WorkingDirectory = globalFlags.WorkingDirectory
req.LogPrefix = globalFlags.LogPrefix
req.DatabaseLogPath = globalFlags.DatabaseLogPath
req.MonitorResultPath = globalFlags.MonitorResultPath
req.GoogleCloudProjectName = globalFlags.GoogleCloudProjectName
bts, err := ioutil.ReadFile(globalFlags.GoogleCloudStorageJSONKeyPath)
if err != nil {
log.Println(err)
os.Exit(-1)
}
req.GoogleCloudStorageJSONKey = string(bts)
req.GoogleCloudStorageBucketName = globalFlags.GoogleCloudStorageBucketName
}
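
Editor's note: the ioutil.ReadFile call above runs for every start, so the command now fails unless --google-cloud-storage-json-key-path points at a readable file. If uploading is meant to stay optional, a small helper such as this sketch (an assumption about the intended behavior, not part of the commit) would let an empty path through:

    package control

    import "io/ioutil"

    // readJSONKey returns the key file contents, or "" when no path is given,
    // so that "start" can run without Google Cloud credentials.
    func readJSONKey(path string) (string, error) {
        if path == "" {
            return "", nil
        }
        bts, err := ioutil.ReadFile(path)
        if err != nil {
            return "", err
        }
        return string(bts), nil
    }

CommandFunc could then set req.GoogleCloudStorageJSONKey from readJSONKey and exit only when a path was given but could not be read.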
for i := range peerIPs {