mirror of https://github.com/etcd-io/dbtester.git
Merge pull request #58 from gyuho/new
*: update 'psn' vendor, support quorum/stale reads on Zk, Consul
This commit is contained in:
commit
c5fe7bb466
|
|
@ -324,8 +324,9 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
|
|||
}
|
||||
t.logfile = f
|
||||
|
||||
// this changes for different releases
|
||||
flagString := `-cp zookeeper-3.4.8.jar:lib/slf4j-api-1.6.1.jar:lib/slf4j-log4j12-1.6.1.jar:lib/log4j-1.2.16.jar:conf org.apache.zookeeper.server.quorum.QuorumPeerMain`
|
||||
// TODO: change for different releases
|
||||
// https://zookeeper.apache.org/doc/trunk/zookeeperAdmin.html
|
||||
flagString := `-cp zookeeper-3.4.9.jar:lib/slf4j-api-1.6.1.jar:lib/slf4j-log4j12-1.6.1.jar:lib/log4j-1.2.16.jar:conf org.apache.zookeeper.server.quorum.QuorumPeerMain`
|
||||
args := []string{shell, "-c", javaBinaryPath + " " + flagString + " " + configFilePath}
|
||||
|
||||
cmd := exec.Command(args[0], args[1:]...)
|
||||
|
|
@ -566,7 +567,7 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
|
|||
plog.Infof("uploading agent logs [%q -> %q]", srcAgentLogPath, dstAgentLogPath)
|
||||
for k := 0; k < 30; k++ {
|
||||
if uerr = u.UploadFile(t.req.GoogleCloudStorageBucketName, srcAgentLogPath, dstAgentLogPath); uerr != nil {
|
||||
plog.Error("u.UploadFile error... sleep and retry... (%v)", uerr)
|
||||
plog.Errorf("u.UploadFile error... sleep and retry... (%v)", uerr)
|
||||
time.Sleep(2 * time.Second)
|
||||
continue
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ func CommandFunc(cmd *cobra.Command, args []string) error {
|
|||
return err
|
||||
}
|
||||
nf := dataframe.New()
|
||||
c1, err := fr.GetColumn("unix_ts")
|
||||
c1, err := fr.GetColumn("unix-ts")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -137,13 +137,13 @@ func CommandFunc(cmd *cobra.Command, args []string) error {
|
|||
}
|
||||
}
|
||||
|
||||
// make all columns have equal row number, based on the column unix_ts
|
||||
// make all columns have equal row number, based on the column unix-ts
|
||||
// truncate all rows before maxCommonMinUnixTime and after maxCommonMinUnixTime
|
||||
minTS := fmt.Sprintf("%d", maxCommonMinUnixTime)
|
||||
maxTS := fmt.Sprintf("%d", maxCommonMaxUnixTime)
|
||||
frMonitor := dataframe.New()
|
||||
for i := range frames {
|
||||
uc, err := frames[i].GetColumn("unix_ts")
|
||||
uc, err := frames[i].GetColumn("unix-ts")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -157,7 +157,7 @@ func CommandFunc(cmd *cobra.Command, args []string) error {
|
|||
}
|
||||
|
||||
for _, hd := range frames[i].GetHeader() {
|
||||
if i > 0 && hd == "unix_ts" {
|
||||
if i > 0 && hd == "unix-ts" {
|
||||
continue
|
||||
}
|
||||
var col dataframe.Column
|
||||
|
|
@ -168,7 +168,7 @@ func CommandFunc(cmd *cobra.Command, args []string) error {
|
|||
if err = col.KeepRows(j, k+1); err != nil {
|
||||
return err
|
||||
}
|
||||
if hd != "unix_ts" {
|
||||
if hd != "unix-ts" {
|
||||
switch hd {
|
||||
case "CpuUsageFloat64":
|
||||
hd = "cpu"
|
||||
|
|
@ -199,7 +199,7 @@ func CommandFunc(cmd *cobra.Command, args []string) error {
|
|||
}
|
||||
|
||||
plog.Printf("Step 1-%d-%d: creating dataframe from %s", step1Idx, len(elem.DataPathList), elem.DataBenchmarkPath)
|
||||
colMonitorUnixTs, err := frMonitor.GetColumn("unix_ts")
|
||||
colMonitorUnixTs, err := frMonitor.GetColumn("unix-ts")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -208,7 +208,7 @@ func CommandFunc(cmd *cobra.Command, args []string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
colBenchUnixTs, err := frBench.GetColumn("unix_ts")
|
||||
colBenchUnixTs, err := frBench.GetColumn("unix-ts")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -248,7 +248,7 @@ func CommandFunc(cmd *cobra.Command, args []string) error {
|
|||
}
|
||||
|
||||
for _, hd := range frMonitor.GetHeader() {
|
||||
if hd == "unix_ts" {
|
||||
if hd == "unix-ts" {
|
||||
continue
|
||||
}
|
||||
var col dataframe.Column
|
||||
|
|
@ -267,10 +267,10 @@ func CommandFunc(cmd *cobra.Command, args []string) error {
|
|||
plog.Printf("Step 1-%d-%d: calculating average values", step1Idx, len(elem.DataPathList)+1)
|
||||
var (
|
||||
sampleSize = float64(len(elem.DataPathList))
|
||||
cumulativeThroughputCol = dataframe.NewColumn("cumulative_throughput")
|
||||
cumulativeThroughputCol = dataframe.NewColumn("cumulative-throughput")
|
||||
totalThrougput int
|
||||
avgCpuCol = dataframe.NewColumn("avg_cpu")
|
||||
avgMemCol = dataframe.NewColumn("avg_memory_mb")
|
||||
avgCPUCol = dataframe.NewColumn("avg-cpu")
|
||||
avgMemCol = dataframe.NewColumn("avg-memory-mb")
|
||||
)
|
||||
for i := 0; i < benchLastIdx; i++ {
|
||||
var (
|
||||
|
|
@ -295,12 +295,12 @@ func CommandFunc(cmd *cobra.Command, args []string) error {
|
|||
cumulativeThroughputCol.PushBack(dataframe.NewStringValue(totalThrougput))
|
||||
}
|
||||
}
|
||||
avgCpuCol.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", cpuTotal/sampleSize)))
|
||||
avgCPUCol.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", cpuTotal/sampleSize)))
|
||||
avgMemCol.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", memoryTotal/sampleSize)))
|
||||
}
|
||||
|
||||
plog.Printf("Step 1-%d-%d: combine %s and %q", step1Idx, len(elem.DataPathList)+2, elem.DataBenchmarkPath, elem.DataPathList)
|
||||
unixTsCol, err := frBench.GetColumn("unix_ts")
|
||||
unixTsCol, err := frBench.GetColumn("unix-ts")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -330,7 +330,7 @@ func CommandFunc(cmd *cobra.Command, args []string) error {
|
|||
aggFr.AddColumn(col)
|
||||
}
|
||||
}
|
||||
aggFr.AddColumn(avgCpuCol)
|
||||
aggFr.AddColumn(avgCPUCol)
|
||||
aggFr.AddColumn(avgMemCol)
|
||||
|
||||
plog.Printf("Step 1-%d-%d: saving to %s", step1Idx, len(elem.DataPathList)+3, elem.OutputPath)
|
||||
|
|
@ -355,7 +355,7 @@ func CommandFunc(cmd *cobra.Command, args []string) error {
|
|||
}
|
||||
frames = append(frames, fr)
|
||||
|
||||
col, err := fr.GetColumn("unix_ts")
|
||||
col, err := fr.GetColumn("unix-ts")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -371,7 +371,7 @@ func CommandFunc(cmd *cobra.Command, args []string) error {
|
|||
secondCol.PushBack(dataframe.NewStringValue(i))
|
||||
}
|
||||
nf.AddColumn(secondCol)
|
||||
colsToKeep := []string{"avg_latency_ms", "throughput", "cumulative_throughput", "avg_cpu", "avg_memory_mb"}
|
||||
colsToKeep := []string{"avg_latency_ms", "throughput", "cumulative-throughput", "avg-cpu", "avg-memory-mb"}
|
||||
for i, fr := range frames {
|
||||
dbID := elem.DataList[i].Name
|
||||
plog.Printf("Step 2-%d-%d: cleaning up %s...", step2Idx, i, dbID)
|
||||
|
|
|
|||
|
|
@ -557,12 +557,18 @@ func generateReads(cfg Config, key string, requests chan<- request) {
|
|||
requests <- request{etcdv3Op: clientv3.OpGet(key, opts...)}
|
||||
|
||||
case "zk", "zookeeper":
|
||||
// serializable read by default
|
||||
requests <- request{zkOp: zkOp{key: key}}
|
||||
op := zkOp{key: key}
|
||||
if cfg.Step2.LocalRead {
|
||||
op.staleRead = true
|
||||
}
|
||||
requests <- request{zkOp: op}
|
||||
|
||||
case "consul":
|
||||
// serializable read by default
|
||||
requests <- request{consulOp: consulOp{key: key}}
|
||||
op := consulOp{key: key}
|
||||
if cfg.Step2.LocalRead {
|
||||
op.staleRead = true
|
||||
}
|
||||
requests <- request{consulOp: op}
|
||||
}
|
||||
if cfg.Step2.RequestIntervalMs > 0 {
|
||||
time.Sleep(time.Duration(cfg.Step2.RequestIntervalMs) * time.Millisecond)
|
||||
|
|
|
|||
|
|
@ -15,6 +15,8 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
clientv2 "github.com/coreos/etcd/client"
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
consulapi "github.com/hashicorp/consul/api"
|
||||
|
|
@ -80,14 +82,27 @@ func newGetEtcd3(conn clientv3.KV) ReqHandler {
|
|||
|
||||
func newGetZK(conn *zk.Conn) ReqHandler {
|
||||
return func(ctx context.Context, req *request) error {
|
||||
errt := ""
|
||||
if !req.zkOp.staleRead {
|
||||
_, err := conn.Sync(req.zkOp.key)
|
||||
if err != nil {
|
||||
errt += err.Error()
|
||||
}
|
||||
}
|
||||
_, _, err := conn.Get(req.zkOp.key)
|
||||
return err
|
||||
if err != nil {
|
||||
errt += ";" + err.Error()
|
||||
}
|
||||
if errt != "" {
|
||||
return errors.New(errt)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func newGetConsul(conn *consulapi.KV) ReqHandler {
|
||||
return func(ctx context.Context, req *request) error {
|
||||
_, _, err := conn.Get(req.consulOp.key, &consulapi.QueryOptions{AllowStale: true})
|
||||
_, _, err := conn.Get(req.consulOp.key, &consulapi.QueryOptions{AllowStale: req.consulOp.staleRead})
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -110,7 +110,7 @@ func (sp *secondPoints) getTimeSeries() TimeSeries {
|
|||
func (ts TimeSeries) String() string {
|
||||
buf := new(bytes.Buffer)
|
||||
wr := csv.NewWriter(buf)
|
||||
if err := wr.Write([]string{"unix_ts", "avg_latency_ms", "throughput"}); err != nil {
|
||||
if err := wr.Write([]string{"unix-ts", "avg-latency-ms", "throughput"}); err != nil {
|
||||
plog.Fatal(err)
|
||||
}
|
||||
rows := [][]string{}
|
||||
|
|
|
|||
|
|
@ -58,13 +58,15 @@ type etcdv2Op struct {
|
|||
}
|
||||
|
||||
type zkOp struct {
|
||||
key string
|
||||
value []byte
|
||||
key string
|
||||
value []byte
|
||||
staleRead bool
|
||||
}
|
||||
|
||||
type consulOp struct {
|
||||
key string
|
||||
value []byte
|
||||
key string
|
||||
value []byte
|
||||
staleRead bool
|
||||
}
|
||||
|
||||
var (
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
hash: b4cda15a1fdf34ae8833ffcef8200fab6fd1300930a8f87aca11292bfe869fb6
|
||||
updated: 2016-11-08T19:59:54.043067507-08:00
|
||||
hash: be4fa6455a48e608f8f01354a06963bad34cd33cd7affc4b2c31ad9a63f594ad
|
||||
updated: 2016-11-08T23:36:14.836893881-08:00
|
||||
imports:
|
||||
- name: bitbucket.org/ww/goautoneg
|
||||
version: 75cd24fc2f2c2a2088577d12123ddee5f54e0675
|
||||
|
|
@ -99,7 +99,7 @@ imports:
|
|||
- name: github.com/gyuho/dataframe
|
||||
version: 573cd728a011e5473510a6a1df0f39023c305e04
|
||||
- name: github.com/gyuho/psn
|
||||
version: 415d7423cced5ca6bf6dbf81c5cbe37605f1dfaa
|
||||
version: 29bf72881f998ac840f3cd5d30ed22c9a8fe50a9
|
||||
subpackages:
|
||||
- process
|
||||
- name: github.com/hashicorp/consul
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ import:
|
|||
- package: github.com/gyuho/dataframe
|
||||
version: 573cd728a011e5473510a6a1df0f39023c305e04
|
||||
- package: github.com/gyuho/psn
|
||||
version: 415d7423cced5ca6bf6dbf81c5cbe37605f1dfaa
|
||||
version: 29bf72881f998ac840f3cd5d30ed22c9a8fe50a9
|
||||
subpackages:
|
||||
- process
|
||||
- package: github.com/hashicorp/consul
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ var (
|
|||
)
|
||||
|
||||
func init() {
|
||||
for i, v := range append([]string{"unix_ts"}, ProcessTableColumns...) {
|
||||
for i, v := range append([]string{"unix-ts"}, ProcessTableColumns...) {
|
||||
ColumnsPS[v] = i
|
||||
}
|
||||
}
|
||||
|
|
@ -104,10 +104,10 @@ func ReadCSVs(columns map[string]int, fpaths ...string) (Table, error) {
|
|||
ntb.MinTS = minTS
|
||||
ntb.MaxTS = maxTS
|
||||
ntb.Columns = make(map[string]int)
|
||||
ntb.Columns["unix_tx"] = 0
|
||||
ntb.Columns["unix-ts"] = 0
|
||||
for i := range fpaths {
|
||||
ntb.Columns[fmt.Sprintf("cpu_%d", i+1)] = 2*i + 1
|
||||
ntb.Columns[fmt.Sprintf("memory_mb_%d", i+1)] = 2*i + 2
|
||||
ntb.Columns[fmt.Sprintf("cpu-%d", i+1)] = 2*i + 1
|
||||
ntb.Columns[fmt.Sprintf("memory-mb-%d", i+1)] = 2*i + 2
|
||||
}
|
||||
columnSlice := make([]string, len(ntb.Columns))
|
||||
for k, v := range ntb.Columns {
|
||||
|
|
@ -123,7 +123,7 @@ func ReadCSVs(columns map[string]int, fpaths ...string) (Table, error) {
|
|||
// push-front from minTS to tb.MinTS-1
|
||||
rows := make([][]string, tb.MinTS-minTS)
|
||||
for i := range rows {
|
||||
emptyRow := append([]string{fmt.Sprintf("%d", minTS+int64(i))}, strings.Split(strings.Repeat("0.00_", len(ProcessTableColumns)), "_")...)
|
||||
emptyRow := append([]string{fmt.Sprintf("%d", minTS+int64(i))}, strings.Split(strings.Repeat("0.00-", len(ProcessTableColumns)), "-")...)
|
||||
rows[i] = emptyRow
|
||||
}
|
||||
tb.Rows = append(rows, tb.Rows...)
|
||||
|
|
@ -132,7 +132,7 @@ func ReadCSVs(columns map[string]int, fpaths ...string) (Table, error) {
|
|||
// push-back from tb.MaxTS+1 to maxTS
|
||||
rows := make([][]string, maxTS-tb.MaxTS)
|
||||
for i := range rows {
|
||||
emptyRow := append([]string{fmt.Sprintf("%d", tb.MaxTS+int64(i))}, strings.Split(strings.Repeat("0.00_", len(ProcessTableColumns)), "_")...)
|
||||
emptyRow := append([]string{fmt.Sprintf("%d", tb.MaxTS+int64(i))}, strings.Split(strings.Repeat("0.00-", len(ProcessTableColumns)), "-")...)
|
||||
rows[i] = emptyRow
|
||||
}
|
||||
tb.Rows = append(tb.Rows, rows...)
|
||||
|
|
@ -198,7 +198,7 @@ func ReadCSVFillIn(fpath string) (Table, error) {
|
|||
return Table{}, err
|
||||
}
|
||||
var (
|
||||
uidx = tb.Columns["unix_ts"]
|
||||
uidx = tb.Columns["unix-ts"]
|
||||
tsToRow = make(map[int64][]string)
|
||||
)
|
||||
for _, row := range tb.Rows {
|
||||
|
|
|
|||
|
|
@ -236,7 +236,7 @@ func WriteToCSV(f *os.File, pss ...Process) error {
|
|||
|
||||
var werr error
|
||||
writeCSVHeader := func() {
|
||||
if err := wr.Write(append([]string{"unix_ts"}, ProcessTableColumns...)); err != nil {
|
||||
if err := wr.Write(append([]string{"unix-ts"}, ProcessTableColumns...)); err != nil {
|
||||
werr = err
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue