// Copyright 2015 CoreOS, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package control import ( "bufio" "crypto/rand" "fmt" "io" "io/ioutil" "net" "net/http" "os" "path/filepath" "sort" "strconv" "strings" "time" mrand "math/rand" clientv2 "github.com/coreos/etcd/client" "github.com/coreos/etcd/clientv3" "github.com/dustin/go-humanize" consulapi "github.com/hashicorp/consul/api" "github.com/samuel/go-zookeeper/zk" ) var ( zkCreateFlags = int32(0) zkCreateAcl = zk.WorldACL(zk.PermAll) ) type request struct { etcdv2Op etcdv2Op etcdv3Op clientv3.Op zkOp zkOp consulOp consulOp } type etcdv2Op struct { key string value string } type zkOp struct { key string value []byte staleRead bool } type consulOp struct { key string value []byte staleRead bool } var ( // dialTotal counts the number of mustCreateConn calls so that endpoint // connections can be handed out in round-robin order dialTotal int ) func mustCreateConnEtcdv3(endpoints []string) *clientv3.Client { endpoint := endpoints[dialTotal%len(endpoints)] dialTotal++ cfg := clientv3.Config{ Endpoints: []string{endpoint}, } client, err := clientv3.New(cfg) if err != nil { fmt.Fprintf(os.Stderr, "dial error: %v\n", err) os.Exit(1) } return client } type etcdv3ClientCfg struct { totalConns int totalClients int } func mustCreateClientsEtcdv3(endpoints []string, cfg etcdv3ClientCfg) []*clientv3.Client { conns := make([]*clientv3.Client, cfg.totalConns) for i := range conns { conns[i] = mustCreateConnEtcdv3(endpoints) } clients := make([]*clientv3.Client, cfg.totalClients) for i := range clients { clients[i] = conns[i%int(cfg.totalConns)] } return clients } func mustCreateClientsEtcdv2(endpoints []string, total int) []clientv2.KeysAPI { cks := make([]clientv2.KeysAPI, total) for i := range cks { endpoint := endpoints[dialTotal%len(endpoints)] dialTotal++ if !strings.HasPrefix(endpoint, "http://") { endpoint = "http://" + endpoint } tr := &http.Transport{ Proxy: http.ProxyFromEnvironment, Dial: (&net.Dialer{ Timeout: 30 * time.Second, KeepAlive: 30 * time.Second, }).Dial, TLSHandshakeTimeout: 10 * time.Second, } cfg := clientv2.Config{ Endpoints: []string{endpoint}, Transport: tr, HeaderTimeoutPerRequest: time.Second, } c, err := clientv2.New(cfg) if err != nil { plog.Fatal(err) } kapi := clientv2.NewKeysAPI(c) cks[i] = kapi } return cks } func mustCreateConnsZk(endpoints []string, total int) []*zk.Conn { zks := make([]*zk.Conn, total) for i := range zks { endpoint := endpoints[dialTotal%len(endpoints)] dialTotal++ conn, _, err := zk.Connect([]string{endpoint}, time.Second) if err != nil { plog.Fatal(err) } zks[i] = conn } return zks } func mustCreateConnsConsul(endpoints []string, total int) []*consulapi.KV { css := make([]*consulapi.KV, total) for i := range css { endpoint := endpoints[dialTotal%len(endpoints)] dialTotal++ dcfg := consulapi.DefaultConfig() dcfg.Address = endpoint // x.x.x.x:8500 cli, err := consulapi.NewClient(dcfg) if err != nil { plog.Fatal(err) } css[i] = cli.KV() } return css } func mustRandBytes(n int) []byte { rb := make([]byte, n) _, err := rand.Read(rb) if err != nil { fmt.Fprintf(os.Stderr, "failed to generate value: %v\n", err) os.Exit(1) } return rb } func getTotalKeysEtcdv2(endpoints []string) map[string]int64 { rs := make(map[string]int64) for _, ep := range endpoints { rs[ep] = 0 // not supported in metrics } return rs } func getTotalKeysEtcdv3(endpoints []string) map[string]int64 { rs := make(map[string]int64) for _, ep := range endpoints { if !strings.HasPrefix(ep, "http://") { ep = "http://" + ep } plog.Println("GET", ep+"/metrics") resp, err := http.Get(ep + "/metrics") if err != nil { plog.Println(err) rs[ep] = 0 } scanner := bufio.NewScanner(resp.Body) for scanner.Scan() { txt := scanner.Text() if strings.HasPrefix(txt, "#") { continue } ts := strings.SplitN(txt, " ", 2) fv := 0.0 if len(ts) == 2 { v, err := strconv.ParseFloat(ts[1], 64) if err == nil { fv = v } } if ts[0] == "etcd_debugging_mvcc_keys_total" { rs[ep] = int64(fv) break } } gracefulClose(resp) } plog.Println("getTotalKeysEtcdv3", rs) return rs } func getTotalKeysZk(endpoints []string) map[string]int64 { rs := make(map[string]int64) stats, ok := zk.FLWSrvr(endpoints, 5*time.Second) if !ok { plog.Printf("getTotalKeysZk failed with %+v", stats) for _, ep := range endpoints { rs[ep] = 0 } return rs } for i, s := range stats { rs[endpoints[i]] = s.NodeCount } return rs } func getTotalKeysConsul(endpoints []string) map[string]int64 { rs := make(map[string]int64) for _, ep := range endpoints { rs[ep] = 0 // not supported in consul } return rs } func max(n1, n2 int64) int64 { if n1 > n2 { return n1 } return n2 } func randBytes(bytesN int) []byte { const ( letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" letterIdxBits = 6 // 6 bits to represent a letter index letterIdxMask = 1<= 0; { if remain == 0 { cache, remain = src.Int63(), letterIdxMax } if idx := int(cache & letterIdxMask); idx < len(letterBytes) { b[i] = letterBytes[idx] i-- } cache >>= letterIdxBits remain-- } return b } func multiRandStrings(keyN, sliceN int) []string { m := make(map[string]struct{}) for len(m) != sliceN { m[string(randBytes(keyN))] = struct{}{} } rs := make([]string, sliceN) idx := 0 for k := range m { rs[idx] = k idx++ } return rs } func toFile(txt, fpath string) error { f, err := os.OpenFile(fpath, os.O_RDWR|os.O_TRUNC, 0777) if err != nil { f, err = os.Create(fpath) if err != nil { return err } } defer f.Close() _, err = f.WriteString(txt) return err } func toMillisecond(d time.Duration) float64 { return d.Seconds() * 1000 } // gracefulClose drains http.Response.Body until it hits EOF // and closes it. This prevents TCP/TLS connections from closing, // therefore available for reuse. func gracefulClose(resp *http.Response) { io.Copy(ioutil.Discard, resp.Body) resp.Body.Close() } // sequentialKey returns '00012' when size is 5 and num is 12. func sequentialKey(size, num int) string { txt := fmt.Sprintf("%d", num) if len(txt) > size { return txt } delta := size - len(txt) return strings.Repeat("0", delta) + txt } func sameKey(size int) string { return strings.Repeat("a", size) } func walk(targetDir string) (map[string]os.FileInfo, error) { rm := make(map[string]os.FileInfo) visit := func(path string, f os.FileInfo, err error) error { if f != nil { if !f.IsDir() { if !filepath.HasPrefix(path, ".") && !strings.Contains(path, "/.") { wd, err := os.Getwd() if err != nil { return err } rm[filepath.Join(wd, strings.Replace(path, wd, "", -1))] = f } } } return nil } err := filepath.Walk(targetDir, visit) if err != nil { return nil, err } return rm, nil } type filepathSize struct { path string size uint64 sizeTxt string } func filterByKbs(fs []filepathSize, kbLimit int) []filepathSize { var ns []filepathSize for _, v := range fs { if v.size > uint64(kbLimit*1024) { continue } ns = append(ns, v) } return ns } type filepathSizeSlice []filepathSize func (f filepathSizeSlice) Len() int { return len(f) } func (f filepathSizeSlice) Swap(i, j int) { f[i], f[j] = f[j], f[i] } func (f filepathSizeSlice) Less(i, j int) bool { return f[i].size < f[j].size } func walkDir(targetDir string) ([]filepathSize, error) { rm, err := walk(targetDir) if err != nil { return nil, err } var fs []filepathSize for k, v := range rm { fv := filepathSize{ path: k, size: uint64(v.Size()), sizeTxt: humanize.Bytes(uint64(v.Size())), } fs = append(fs, fv) } sort.Sort(filepathSizeSlice(fs)) return fs, nil }