From 39b5064d5797bad4a9b2438953e0f8e33f6df26c Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Mon, 6 Feb 2017 21:41:58 -0800 Subject: [PATCH] analyze: clean up, use database description for plots --- ...ta.go => 01_read_raw_data_to_test_data.go} | 0 ...=> 02_read_all_metrics_to_analyze_data.go} | 0 ...read_benchmark_metrics_to_analyze_data.go} | 0 ...ll.go => 04_aggregate_all_analyze_data.go} | 0 .../{analyze_data_5_plot.go => 05_plot.go} | 0 analyze/command.go | 938 ++++++++++++++++- analyze/finalize_everything.go | 949 ------------------ ...te_timeseries.go => process_timeseries.go} | 0 ...ies_test.go => process_timeseries_test.go} | 0 9 files changed, 937 insertions(+), 950 deletions(-) rename analyze/{analyze_data_1_read_raw_data.go => 01_read_raw_data_to_test_data.go} (100%) rename analyze/{analyze_data_2_analyze_data.go => 02_read_all_metrics_to_analyze_data.go} (100%) rename analyze/{analyze_data_3_benchmark_metrics.go => 03_read_benchmark_metrics_to_analyze_data.go} (100%) rename analyze/{analyze_data_4_aggregate_all.go => 04_aggregate_all_analyze_data.go} (100%) rename analyze/{analyze_data_5_plot.go => 05_plot.go} (100%) delete mode 100644 analyze/finalize_everything.go rename analyze/{analyze_data_4_aggregate_timeseries.go => process_timeseries.go} (100%) rename analyze/{analyze_data_4_aggregate_timeseries_test.go => process_timeseries_test.go} (100%) diff --git a/analyze/analyze_data_1_read_raw_data.go b/analyze/01_read_raw_data_to_test_data.go similarity index 100% rename from analyze/analyze_data_1_read_raw_data.go rename to analyze/01_read_raw_data_to_test_data.go diff --git a/analyze/analyze_data_2_analyze_data.go b/analyze/02_read_all_metrics_to_analyze_data.go similarity index 100% rename from analyze/analyze_data_2_analyze_data.go rename to analyze/02_read_all_metrics_to_analyze_data.go diff --git a/analyze/analyze_data_3_benchmark_metrics.go b/analyze/03_read_benchmark_metrics_to_analyze_data.go similarity index 100% rename from analyze/analyze_data_3_benchmark_metrics.go rename to analyze/03_read_benchmark_metrics_to_analyze_data.go diff --git a/analyze/analyze_data_4_aggregate_all.go b/analyze/04_aggregate_all_analyze_data.go similarity index 100% rename from analyze/analyze_data_4_aggregate_all.go rename to analyze/04_aggregate_all_analyze_data.go diff --git a/analyze/analyze_data_5_plot.go b/analyze/05_plot.go similarity index 100% rename from analyze/analyze_data_5_plot.go rename to analyze/05_plot.go diff --git a/analyze/command.go b/analyze/command.go index 87724f1d..95f08237 100644 --- a/analyze/command.go +++ b/analyze/command.go @@ -14,7 +14,22 @@ package analyze -import "github.com/spf13/cobra" +import ( + "bytes" + "encoding/csv" + "fmt" + "path/filepath" + "reflect" + "sort" + "strconv" + "strings" + + "github.com/coreos/dbtester" + humanize "github.com/dustin/go-humanize" + "github.com/gyuho/dataframe" + "github.com/olekukonko/tablewriter" + "github.com/spf13/cobra" +) // Command implements 'analyze' command. 
var Command = &cobra.Command{ @@ -32,3 +47,924 @@ func init() { func commandFunc(cmd *cobra.Command, args []string) error { return do(configPath) } + +type allAggregatedData struct { + title string + data []*analyzeData + headerToDatabaseID map[string]string + headerToDatabaseDescription map[string]string + allDatabaseIDList []string +} + +func do(configPath string) error { + cfg, err := dbtester.ReadConfig(configPath, true) + if err != nil { + return err + } + + all := &allAggregatedData{ + title: cfg.TestTitle, + data: make([]*analyzeData, 0, len(cfg.DatabaseIDToTestData)), + headerToDatabaseID: make(map[string]string), + headerToDatabaseDescription: make(map[string]string), + allDatabaseIDList: cfg.AllDatabaseIDList, + } + for _, databaseID := range cfg.AllDatabaseIDList { + testgroup := cfg.DatabaseIDToTestGroup[databaseID] + testdata := cfg.DatabaseIDToTestData[databaseID] + + plog.Printf("reading system metrics data for %s", databaseID) + ad, err := readSystemMetricsAll(testdata.ServerSystemMetricsInterpolatedPathList...) + if err != nil { + return err + } + ad.databaseTag = testgroup.DatabaseTag + ad.legend = testgroup.DatabaseDescription + ad.allAggregatedOutputPath = testdata.AllAggregatedOutputPath + + if err = ad.aggSystemMetrics(); err != nil { + return err + } + if err = ad.importBenchMetrics(testdata.ClientLatencyThroughputTimeseriesPath); err != nil { + return err + } + if err = ad.aggregateAll(testdata.ServerMemoryByKeyNumberPath, testgroup.RequestNumber); err != nil { + return err + } + if err = ad.save(); err != nil { + return err + } + + all.data = append(all.data, ad) + for _, hd := range ad.aggregated.Headers() { + all.headerToDatabaseID[makeHeader(hd, testgroup.DatabaseTag)] = databaseID + all.headerToDatabaseDescription[makeHeader(hd, testgroup.DatabaseTag)] = testgroup.DatabaseDescription + } + } + + // aggregated everything + // 1. sum of all network usage per database + // 2. throughput, latency percentiles distribution + // + // FIRST ROW FOR HEADER: etcd, Zookeeper, Consul, ... + // FIRST COLUMN FOR LABELS: READS-COMPLETED-DELTA, ... 
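+ // (one row per summary metric label, e.g. SERVER-MAX-CPU-USAGE, Latency p99)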
+ // SECOND COLUMNS ~ FOR DATA + row00Header := []string{""} // first is empty + for _, ad := range all.data { + // per database + for _, col := range ad.aggregated.Columns() { + databaseID := all.headerToDatabaseID[col.Header()] + row00Header = append(row00Header, cfg.DatabaseIDToTestGroup[databaseID].DatabaseTag) + break + } + } + + row19ServerReceiveBytesSum := []string{"SERVER-TOTAL-NETWORK-RECEIVE-DATA-SUM"} + row20ServerTransmitBytesSum := []string{"SERVER-TOTAL-NETWORK-TRANSMIT-DATA-SUM"} + row23ServerMaxCPUUsage := []string{"SERVER-MAX-CPU-USAGE"} + row24ServerMaxMemoryUsage := []string{"SERVER-MAX-MEMORY-USAGE"} + row28ReadsCompletedDeltaSum := []string{"SERVER-AVG-READS-COMPLETED-DELTA-SUM"} + row29SectorsReadDeltaSum := []string{"SERVER-AVG-SECTORS-READS-DELTA-SUM"} + row30WritesCompletedDeltaSum := []string{"SERVER-AVG-WRITES-COMPLETED-DELTA-SUM"} + row31SectorsWrittenDeltaSum := []string{"SERVER-AVG-SECTORS-WRITTEN-DELTA-SUM"} + + // iterate each database's all data + for _, ad := range all.data { + // per database + var ( + readsCompletedDeltaSum float64 + sectorsReadDeltaSum float64 + writesCompletedDeltaSum float64 + sectorsWrittenDeltaSum float64 + receiveBytesNumDeltaSum float64 + transmitBytesNumDeltaSum float64 + maxAvgCPU float64 + maxAvgVMRSSMBs []float64 + ) + for _, col := range ad.aggregated.Columns() { + hdr := col.Header() + switch { + case strings.HasPrefix(hdr, "RECEIVE-BYTES-NUM-DELTA-"): + cnt := col.Count() + for j := 0; j < cnt; j++ { + vv, err := col.Value(j) + if err != nil { + return err + } + fv, _ := vv.Float64() + receiveBytesNumDeltaSum += fv + } + case strings.HasPrefix(hdr, "TRANSMIT-BYTES-NUM-DELTA-"): + cnt := col.Count() + for j := 0; j < cnt; j++ { + vv, err := col.Value(j) + if err != nil { + return err + } + fv, _ := vv.Float64() + transmitBytesNumDeltaSum += fv + } + case strings.HasPrefix(hdr, "READS-COMPLETED-DELTA-"): + cnt := col.Count() + for j := 0; j < cnt; j++ { + vv, err := col.Value(j) + if err != nil { + return err + } + fv, _ := vv.Float64() + readsCompletedDeltaSum += fv + } + case strings.HasPrefix(hdr, "SECTORS-READS-DELTA-"): + cnt := col.Count() + for j := 0; j < cnt; j++ { + vv, err := col.Value(j) + if err != nil { + return err + } + fv, _ := vv.Float64() + sectorsReadDeltaSum += fv + } + case strings.HasPrefix(hdr, "WRITES-COMPLETED-DELTA-"): + cnt := col.Count() + for j := 0; j < cnt; j++ { + vv, err := col.Value(j) + if err != nil { + return err + } + fv, _ := vv.Float64() + writesCompletedDeltaSum += fv + } + case strings.HasPrefix(hdr, "SECTORS-WRITTEN-DELTA-"): + cnt := col.Count() + for j := 0; j < cnt; j++ { + vv, err := col.Value(j) + if err != nil { + return err + } + fv, _ := vv.Float64() + sectorsWrittenDeltaSum += fv + } + case strings.HasPrefix(hdr, "AVG-CPU-"): + cnt := col.Count() + for j := 0; j < cnt; j++ { + vv, err := col.Value(j) + if err != nil { + return err + } + fv, _ := vv.Float64() + maxAvgCPU = maxFloat64(maxAvgCPU, fv) + } + case strings.HasPrefix(hdr, "AVG-VMRSS-MB-"): + cnt := col.Count() + for j := 0; j < cnt; j++ { + vv, err := col.Value(j) + if err != nil { + return err + } + fv, _ := vv.Float64() + maxAvgVMRSSMBs = append(maxAvgVMRSSMBs, fv) + } + } + } + + row19ServerReceiveBytesSum = append(row19ServerReceiveBytesSum, humanize.Bytes(uint64(receiveBytesNumDeltaSum))) + row20ServerTransmitBytesSum = append(row20ServerTransmitBytesSum, humanize.Bytes(uint64(transmitBytesNumDeltaSum))) + row23ServerMaxCPUUsage = append(row23ServerMaxCPUUsage, fmt.Sprintf("%.2f %%", maxAvgCPU)) + 
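+ // disk I/O deltas below are totals over the whole run; humanize.Comma
+ // renders them with thousands separators for the summary table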
row28ReadsCompletedDeltaSum = append(row28ReadsCompletedDeltaSum, humanize.Comma(int64(readsCompletedDeltaSum)))
+ row29SectorsReadDeltaSum = append(row29SectorsReadDeltaSum, humanize.Comma(int64(sectorsReadDeltaSum)))
+ row30WritesCompletedDeltaSum = append(row30WritesCompletedDeltaSum, humanize.Comma(int64(writesCompletedDeltaSum)))
+ row31SectorsWrittenDeltaSum = append(row31SectorsWrittenDeltaSum, humanize.Comma(int64(sectorsWrittenDeltaSum)))
+
+ // TODO: handle overflowed memory value?
+ // report the largest per-server average VMRSS; humanize.Bytes is SI-based (1 MB = 1e6 B)
+ sort.Float64s(maxAvgVMRSSMBs)
+ mv := maxAvgVMRSSMBs[len(maxAvgVMRSSMBs)-1]
+ mb := uint64(mv * 1000000)
+ row24ServerMaxMemoryUsage = append(row24ServerMaxMemoryUsage, humanize.Bytes(mb))
+ }
+
+ row01TotalSeconds := []string{"TOTAL-SECONDS"} // TOTAL-SECONDS
+ row02TotalRequestNumber := []string{"TOTAL-REQUEST-NUMBER"}
+ row05MaxThroughput := []string{"MAX-THROUGHPUT"} // MAX AVG-THROUGHPUT
+ row06AverageThroughput := []string{"AVG-THROUGHPUT"} // REQUESTS-PER-SECOND
+ row07MinThroughput := []string{"MIN-THROUGHPUT"} // MIN AVG-THROUGHPUT
+ row08FastestLatency := []string{"FASTEST-LATENCY"} // FASTEST-LATENCY-MS
+ row09AverageLatency := []string{"AVG-LATENCY"} // AVERAGE-LATENCY-MS
+ row10SlowestLatency := []string{"SLOWEST-LATENCY"} // SLOWEST-LATENCY-MS
+ row11p10 := []string{"Latency p10"} // p10
+ row12p25 := []string{"Latency p25"} // p25
+ row13p50 := []string{"Latency p50"} // p50
+ row14p75 := []string{"Latency p75"} // p75
+ row15p90 := []string{"Latency p90"} // p90
+ row16p95 := []string{"Latency p95"} // p95
+ row17p99 := []string{"Latency p99"} // p99
+ row18p999 := []string{"Latency p99.9"} // p99.9
+ row21ClientReceiveBytesSum := []string{"CLIENT-TOTAL-NETWORK-RECEIVE-SUM"} // RECEIVE-BYTES-NUM-DELTA
+ row22ClientTransmitBytesSum := []string{"CLIENT-TOTAL-NETWORK-TRANSMIT-SUM"} // TRANSMIT-BYTES-DELTA
+ row25ClientMaxCPU := []string{"CLIENT-MAX-CPU-USAGE"} // CPU-NUM
+ row26ClientMaxMemory := []string{"CLIENT-MAX-MEMORY-USAGE"} // VMRSS-NUM
+ row27ClientErrorCount := []string{"CLIENT-ERROR-COUNT"} // ERROR:
+ row32AverageDatasize := []string{"SERVER-AVG-DATA-SIZE-ON-DISK"} // TOTAL-DATA-SIZE
+
+ databaseIDToErrs := make(map[string][]string)
+ for i, databaseID := range cfg.AllDatabaseIDList {
+ testgroup := cfg.DatabaseIDToTestGroup[databaseID]
+ testdata := cfg.DatabaseIDToTestData[databaseID]
+
+ tag := testdata.DatabaseTag
+ if tag != row00Header[i+1] {
+ return fmt.Errorf("analyze config has different order; expected %q, got %q", row00Header[i+1], tag)
+ }
+ row02TotalRequestNumber = append(row02TotalRequestNumber, humanize.Comma(testgroup.RequestNumber))
+
+ {
+ fr, err := dataframe.NewFromCSV(nil, testdata.ClientSystemMetricsInterpolatedPath)
+ if err != nil {
+ return err
+ }
+
+ var receiveBytesNumDeltaSum uint64
+ col, err := fr.Column("RECEIVE-BYTES-NUM-DELTA")
+ if err != nil {
+ return err
+ }
+ for i := 0; i < col.Count(); i++ {
+ v, err := col.Value(i)
+ if err != nil {
+ return err
+ }
+ iv, _ := v.Uint64()
+ receiveBytesNumDeltaSum += iv
+ }
+
+ var transmitBytesNumDeltaSum uint64
+ col, err = fr.Column("TRANSMIT-BYTES-NUM-DELTA")
+ if err != nil {
+ return err
+ }
+ for i := 0; i < col.Count(); i++ {
+ v, err := col.Value(i)
+ if err != nil {
+ return err
+ }
+ iv, _ := v.Uint64()
+ transmitBytesNumDeltaSum += iv
+ }
+
+ var maxAvgCPU float64
+ col, err = fr.Column("CPU-NUM")
+ if err != nil {
+ return err
+ }
+ for i := 0; i < col.Count(); i++ {
+ v, err := col.Value(i)
+ if err != nil {
+ return err
+ }
+ fv, _ := v.Float64()
+ if maxAvgCPU == 0 || fv > maxAvgCPU {
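+ // treat zero as unset; otherwise keep the running maximum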
+ maxAvgCPU = fv
+ }
+ }
+
+ var maxVMRSSNum uint64
+ col, err = fr.Column("VMRSS-NUM")
+ if err != nil {
+ return err
+ }
+ for i := 0; i < col.Count(); i++ {
+ v, err := col.Value(i)
+ if err != nil {
+ return err
+ }
+ iv, _ := v.Uint64()
+ if maxVMRSSNum == 0 || iv > maxVMRSSNum {
+ maxVMRSSNum = iv
+ }
+ }
+
+ row21ClientReceiveBytesSum = append(row21ClientReceiveBytesSum, humanize.Bytes(receiveBytesNumDeltaSum))
+ row22ClientTransmitBytesSum = append(row22ClientTransmitBytesSum, humanize.Bytes(transmitBytesNumDeltaSum))
+ row25ClientMaxCPU = append(row25ClientMaxCPU, fmt.Sprintf("%.2f %%", maxAvgCPU))
+ row26ClientMaxMemory = append(row26ClientMaxMemory, humanize.Bytes(maxVMRSSNum))
+ }
+ {
+ f, err := openToRead(testdata.ClientLatencyDistributionSummaryPath)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+
+ rd := csv.NewReader(f)
+
+ // FieldsPerRecord is the number of expected fields per record.
+ // If FieldsPerRecord is positive, Read requires each record to
+ // have the given number of fields. If FieldsPerRecord is 0, Read sets it to
+ // the number of fields in the first record, so that future records must
+ // have the same field count. If FieldsPerRecord is negative, no check is
+ // made and records may have a variable number of fields.
+ rd.FieldsPerRecord = -1
+
+ rows, err := rd.ReadAll()
+ if err != nil {
+ return err
+ }
+
+ var totalErrCnt int64
+ for _, row := range rows {
+ switch row[0] {
+ case "TOTAL-SECONDS":
+ row01TotalSeconds = append(row01TotalSeconds, fmt.Sprintf("%s sec", row[1]))
+ case "REQUESTS-PER-SECOND":
+ fv, err := strconv.ParseFloat(row[1], 64)
+ if err != nil {
+ return err
+ }
+ avg := int64(fv)
+ row06AverageThroughput = append(row06AverageThroughput, fmt.Sprintf("%s req/sec", humanize.Comma(avg)))
+ case "SLOWEST-LATENCY-MS":
+ row10SlowestLatency = append(row10SlowestLatency, fmt.Sprintf("%s ms", row[1]))
+ case "FASTEST-LATENCY-MS":
+ row08FastestLatency = append(row08FastestLatency, fmt.Sprintf("%s ms", row[1]))
+ case "AVERAGE-LATENCY-MS":
+ row09AverageLatency = append(row09AverageLatency, fmt.Sprintf("%s ms", row[1]))
+ }
+
+ if strings.HasPrefix(row[0], "ERROR:") {
+ iv, err := strconv.ParseInt(row[1], 10, 64)
+ if err != nil {
+ return err
+ }
+ totalErrCnt += iv
+
+ c1 := strings.TrimSpace(strings.Replace(row[0], "ERROR:", "", -1))
+ c2 := humanize.Comma(iv)
+ es := fmt.Sprintf("%s (count %s)", c1, c2)
+ // append on a missing key allocates a new slice, so the
+ // first error needs no special case
+ databaseIDToErrs[databaseID] = append(databaseIDToErrs[databaseID], es)
+ }
+ }
+ row27ClientErrorCount = append(row27ClientErrorCount, humanize.Comma(totalErrCnt))
+ }
+ {
+ fr, err := dataframe.NewFromCSV(nil, testdata.ClientLatencyThroughputTimeseriesPath)
+ if err != nil {
+ return err
+ }
+ col, err := fr.Column("AVG-THROUGHPUT")
+ if err != nil {
+ return err
+ }
+ var min int64
+ var max int64
+ for i := 0; i < col.Count(); i++ {
+ val, err := col.Value(i)
+ if err != nil {
+ return err
+ }
+ fv, _ := val.Float64()
+
+ if i == 0 {
+ min = int64(fv)
+ }
+ if max < int64(fv) {
+ max = int64(fv)
+ }
+ if min > int64(fv) {
+ min = int64(fv)
+ }
+ }
+ row05MaxThroughput = append(row05MaxThroughput, fmt.Sprintf("%s req/sec", humanize.Comma(max)))
+ row07MinThroughput = append(row07MinThroughput, fmt.Sprintf("%s req/sec", humanize.Comma(min)))
+ }
+ {
+ fr, err := dataframe.NewFromCSV(nil, testdata.ServerDatasizeOnDiskSummaryPath)
+ if err != nil {
+ return err
+ }
+ col, err := fr.Column(dbtester.DatasizeOnDiskSummaryColumns[3]) // datasize in bytes
+ if err != nil {
+ return err
+ }
+ var sum float64
+ for i := 0; i < col.Count(); i++ {
+ val, err := col.Value(i)
+ if err != nil {
+ return err
+ }
+ fv, _ := val.Float64()
+ sum += fv
+ }
+ avg := uint64(sum / float64(col.Count()))
+ row32AverageDatasize = append(row32AverageDatasize, humanize.Bytes(avg))
+ }
+ {
+ f, err := openToRead(testdata.ClientLatencyDistributionPercentilePath)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+
+ rd := csv.NewReader(f)
+
+ // FieldsPerRecord is the number of expected fields per record.
+ // If FieldsPerRecord is positive, Read requires each record to
+ // have the given number of fields. If FieldsPerRecord is 0, Read sets it to
+ // the number of fields in the first record, so that future records must
+ // have the same field count. If FieldsPerRecord is negative, no check is
+ // made and records may have a variable number of fields.
+ rd.FieldsPerRecord = -1
+
+ rows, err := rd.ReadAll()
+ if err != nil {
+ return err
+ }
+
+ for ri, row := range rows {
+ if ri == 0 {
+ continue // skip header
+ }
+ switch row[0] {
+ case "p10":
+ row11p10 = append(row11p10, fmt.Sprintf("%s ms", row[1]))
+ case "p25":
+ row12p25 = append(row12p25, fmt.Sprintf("%s ms", row[1]))
+ case "p50":
+ row13p50 = append(row13p50, fmt.Sprintf("%s ms", row[1]))
+ case "p75":
+ row14p75 = append(row14p75, fmt.Sprintf("%s ms", row[1]))
+ case "p90":
+ row15p90 = append(row15p90, fmt.Sprintf("%s ms", row[1]))
+ case "p95":
+ row16p95 = append(row16p95, fmt.Sprintf("%s ms", row[1]))
+ case "p99":
+ row17p99 = append(row17p99, fmt.Sprintf("%s ms", row[1]))
+ case "p99.9":
+ row18p999 = append(row18p999, fmt.Sprintf("%s ms", row[1]))
+ }
+ }
+ }
+ }
+
+ aggRows := [][]string{
+ row00Header,
+ row01TotalSeconds,
+ row02TotalRequestNumber,
+ row05MaxThroughput,
+ row06AverageThroughput,
+ row07MinThroughput,
+ row08FastestLatency,
+ row09AverageLatency,
+ row10SlowestLatency,
+ row11p10,
+ row12p25,
+ row13p50,
+ row14p75,
+ row15p90,
+ row16p95,
+ row17p99,
+ row18p999,
+
+ row19ServerReceiveBytesSum,
+ row20ServerTransmitBytesSum,
+ row21ClientReceiveBytesSum,
+ row22ClientTransmitBytesSum,
+
+ row23ServerMaxCPUUsage,
+ row24ServerMaxMemoryUsage,
+ row25ClientMaxCPU,
+ row26ClientMaxMemory,
+
+ row27ClientErrorCount,
+
+ row28ReadsCompletedDeltaSum,
+ row29SectorsReadDeltaSum,
+ row30WritesCompletedDeltaSum,
+ row31SectorsWrittenDeltaSum,
+ row32AverageDatasize,
+ }
+
+ plog.Printf("saving summary data to %q", cfg.Analyze.AllAggregatedOutputPathCSV)
+ file, err := openToOverwrite(cfg.Analyze.AllAggregatedOutputPathCSV)
+ if err != nil {
+ return err
+ }
+ defer file.Close()
+ wr := csv.NewWriter(file)
+ if err := wr.WriteAll(aggRows); err != nil {
+ return err
+ }
+ wr.Flush()
+ if err := wr.Error(); err != nil {
+ return err
+ }
+ buf := new(bytes.Buffer)
+ tw := tablewriter.NewWriter(buf)
+ tw.SetHeader(aggRows[0])
+ for _, row := range aggRows[1:] {
+ tw.Append(row)
+ }
+ tw.SetAutoFormatHeaders(false)
+ tw.SetAlignment(tablewriter.ALIGN_RIGHT)
+ tw.Render()
+
+ errs := ""
+ for _, databaseID := range cfg.AllDatabaseIDList {
+ es, ok := databaseIDToErrs[databaseID]
+ if !ok {
+ continue
+ }
+ // accumulate; plain '=' would keep only the last database's errors
+ errs += databaseID + " errors:\n" + strings.Join(es, "\n") + "\n"
+ }
+ plog.Printf("saving summary data to %q", cfg.Analyze.AllAggregatedOutputPathTXT)
+ stxt := buf.String()
+ if errs != "" {
+ stxt += "\n\n" + errs
+ }
+ if err := toFile(stxt, changeExtToTxt(cfg.Analyze.AllAggregatedOutputPathTXT));
err != nil { + return err + } + + // KEYS, MIN-LATENCY-MS, AVG-LATENCY-MS, MAX-LATENCY-MS + plog.Info("combining all latency data by keys") + allLatencyFrame := dataframe.New() + for _, databaseID := range cfg.AllDatabaseIDList { + testdata := cfg.DatabaseIDToTestData[databaseID] + + fr, err := dataframe.NewFromCSV(nil, testdata.ClientLatencyByKeyNumberPath) + if err != nil { + return err + } + colKeys, err := fr.Column("KEYS") + if err != nil { + return err + } + colKeys.UpdateHeader(makeHeader("KEYS", testdata.DatabaseTag)) + if err = allLatencyFrame.AddColumn(colKeys); err != nil { + return err + } + + colMinLatency, err := fr.Column("MIN-LATENCY-MS") + if err != nil { + return err + } + colMinLatency.UpdateHeader(makeHeader("MIN-LATENCY-MS", testdata.DatabaseTag)) + if err = allLatencyFrame.AddColumn(colMinLatency); err != nil { + return err + } + + colAvgLatency, err := fr.Column("AVG-LATENCY-MS") + if err != nil { + return err + } + colAvgLatency.UpdateHeader(makeHeader("AVG-LATENCY-MS", testdata.DatabaseTag)) + if err = allLatencyFrame.AddColumn(colAvgLatency); err != nil { + return err + } + + colMaxLatency, err := fr.Column("MAX-LATENCY-MS") + if err != nil { + return err + } + colMaxLatency.UpdateHeader(makeHeader("MAX-LATENCY-MS", testdata.DatabaseTag)) + if err = allLatencyFrame.AddColumn(colMaxLatency); err != nil { + return err + } + } + // KEYS, MIN-VMRSS-MB, AVG-VMRSS-MB, MAX-VMRSS-MB + plog.Info("combining all server memory usage by keys") + allMemoryFrame := dataframe.New() + for _, databaseID := range cfg.AllDatabaseIDList { + testdata := cfg.DatabaseIDToTestData[databaseID] + + fr, err := dataframe.NewFromCSV(nil, testdata.ServerMemoryByKeyNumberPath) + if err != nil { + return err + } + colKeys, err := fr.Column("KEYS") + if err != nil { + return err + } + colKeys.UpdateHeader(makeHeader("KEYS", testdata.DatabaseTag)) + if err = allMemoryFrame.AddColumn(colKeys); err != nil { + return err + } + + colMemMin, err := fr.Column("MIN-VMRSS-MB") + if err != nil { + return err + } + colMemMin.UpdateHeader(makeHeader("MIN-VMRSS-MB", testdata.DatabaseTag)) + if err = allMemoryFrame.AddColumn(colMemMin); err != nil { + return err + } + + colMem, err := fr.Column("AVG-VMRSS-MB") + if err != nil { + return err + } + colMem.UpdateHeader(makeHeader("AVG-VMRSS-MB", testdata.DatabaseTag)) + if err = allMemoryFrame.AddColumn(colMem); err != nil { + return err + } + + colMemMax, err := fr.Column("MAX-VMRSS-MB") + if err != nil { + return err + } + colMemMax.UpdateHeader(makeHeader("MAX-VMRSS-MB", testdata.DatabaseTag)) + if err = allMemoryFrame.AddColumn(colMemMax); err != nil { + return err + } + } + + { + allLatencyFrameCfg := dbtester.Plot{ + Column: "AVG-LATENCY-MS", + XAxis: "Cumulative Number of Keys", + YAxis: "Latency(millisecond) by Keys", + OutputPathList: make([]string, len(cfg.PlotList[0].OutputPathList)), + } + allLatencyFrameCfg.OutputPathList[0] = filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-LATENCY-MS-BY-KEY.svg") + allLatencyFrameCfg.OutputPathList[1] = filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-LATENCY-MS-BY-KEY.png") + plog.Printf("plotting %v", allLatencyFrameCfg.OutputPathList) + var pairs []pair + allCols := allLatencyFrame.Columns() + for i := 0; i < len(allCols)-3; i += 4 { + pairs = append(pairs, pair{ + x: allCols[i], // x + y: allCols[i+2], // avg + }) + } + if err = all.drawXY(allLatencyFrameCfg, pairs...); err != nil { + return err + } + newCSV := dataframe.New() + for _, p := range pairs { + if err = 
newCSV.AddColumn(p.x); err != nil { + return err + } + if err = newCSV.AddColumn(p.y); err != nil { + return err + } + } + csvPath := filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-LATENCY-MS-BY-KEY.csv") + if err := newCSV.CSV(csvPath); err != nil { + return err + } + } + { + // with error points + allLatencyFrameCfg := dbtester.Plot{ + Column: "AVG-LATENCY-MS", + XAxis: "Cumulative Number of Keys", + YAxis: "Latency(millisecond) by Keys", + OutputPathList: make([]string, len(cfg.PlotList[0].OutputPathList)), + } + allLatencyFrameCfg.OutputPathList[0] = filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-LATENCY-MS-BY-KEY-ERROR-POINTS.svg") + allLatencyFrameCfg.OutputPathList[1] = filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-LATENCY-MS-BY-KEY-ERROR-POINTS.png") + plog.Printf("plotting %v", allLatencyFrameCfg.OutputPathList) + var triplets []triplet + allCols := allLatencyFrame.Columns() + for i := 0; i < len(allCols)-3; i += 4 { + triplets = append(triplets, triplet{ + x: allCols[i], + minCol: allCols[i+1], + avgCol: allCols[i+2], + maxCol: allCols[i+3], + }) + } + if err = all.drawXYWithErrorPoints(allLatencyFrameCfg, triplets...); err != nil { + return err + } + newCSV := dataframe.New() + for _, tri := range triplets { + if err = newCSV.AddColumn(tri.x); err != nil { + return err + } + if err = newCSV.AddColumn(tri.minCol); err != nil { + return err + } + if err = newCSV.AddColumn(tri.avgCol); err != nil { + return err + } + if err = newCSV.AddColumn(tri.maxCol); err != nil { + return err + } + } + csvPath := filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-LATENCY-MS-BY-KEY-ERROR-POINTS.csv") + if err := newCSV.CSV(csvPath); err != nil { + return err + } + } + { + allMemoryFrameCfg := dbtester.Plot{ + Column: "AVG-VMRSS-MB", + XAxis: "Cumulative Number of Keys", + YAxis: "Memory(MB) by Keys", + OutputPathList: make([]string, len(cfg.PlotList[0].OutputPathList)), + } + allMemoryFrameCfg.OutputPathList[0] = filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-VMRSS-MB-BY-KEY.svg") + allMemoryFrameCfg.OutputPathList[1] = filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-VMRSS-MB-BY-KEY.png") + plog.Printf("plotting %v", allMemoryFrameCfg.OutputPathList) + var pairs []pair + allCols := allMemoryFrame.Columns() + for i := 0; i < len(allCols)-3; i += 4 { + pairs = append(pairs, pair{ + x: allCols[i], // x + y: allCols[i+2], // avg + }) + } + if err = all.drawXY(allMemoryFrameCfg, pairs...); err != nil { + return err + } + newCSV := dataframe.New() + for _, p := range pairs { + if err = newCSV.AddColumn(p.x); err != nil { + return err + } + if err = newCSV.AddColumn(p.y); err != nil { + return err + } + } + csvPath := filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-VMRSS-MB-BY-KEY.csv") + if err := newCSV.CSV(csvPath); err != nil { + return err + } + } + { + // with error points + allMemoryFrameCfg := dbtester.Plot{ + Column: "AVG-VMRSS-MB", + XAxis: "Cumulative Number of Keys", + YAxis: "Memory(MB) by Keys", + OutputPathList: make([]string, len(cfg.PlotList[0].OutputPathList)), + } + allMemoryFrameCfg.OutputPathList[0] = filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-VMRSS-MB-BY-KEY-ERROR-POINTS.svg") + allMemoryFrameCfg.OutputPathList[1] = filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-VMRSS-MB-BY-KEY-ERROR-POINTS.png") + plog.Printf("plotting %v", allMemoryFrameCfg.OutputPathList) + var triplets []triplet + allCols := 
allMemoryFrame.Columns() + for i := 0; i < len(allCols)-3; i += 4 { + triplets = append(triplets, triplet{ + x: allCols[i], + minCol: allCols[i+1], + avgCol: allCols[i+2], + maxCol: allCols[i+3], + }) + } + if err = all.drawXYWithErrorPoints(allMemoryFrameCfg, triplets...); err != nil { + return err + } + newCSV := dataframe.New() + for _, tri := range triplets { + if err = newCSV.AddColumn(tri.x); err != nil { + return err + } + if err = newCSV.AddColumn(tri.minCol); err != nil { + return err + } + if err = newCSV.AddColumn(tri.avgCol); err != nil { + return err + } + if err = newCSV.AddColumn(tri.maxCol); err != nil { + return err + } + } + csvPath := filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-VMRSS-MB-BY-KEY-ERROR-POINTS.csv") + if err := newCSV.CSV(csvPath); err != nil { + return err + } + } + + plog.Println("combining data for plotting") + for _, plotConfig := range cfg.PlotList { + plog.Printf("plotting %q", plotConfig.Column) + var clientNumColumns []dataframe.Column + var pairs []pair + var dataColumns []dataframe.Column + for i, ad := range all.data { + databaseID := all.allDatabaseIDList[i] + tag := cfg.DatabaseIDToTestGroup[databaseID].DatabaseTag + + avgCol, err := ad.aggregated.Column("CONTROL-CLIENT-NUM") + if err != nil { + return err + } + avgCol.UpdateHeader(makeHeader("CONTROL-CLIENT-NUM", tag)) + clientNumColumns = append(clientNumColumns, avgCol) + + col, err := ad.aggregated.Column(plotConfig.Column) + if err != nil { + return err + } + col.UpdateHeader(makeHeader(plotConfig.Column, tag)) + pairs = append(pairs, pair{y: col}) + dataColumns = append(dataColumns, col) + } + if err = all.draw(plotConfig, pairs...); err != nil { + return err + } + + plog.Printf("saving data for %q of all database", plotConfig.Column) + nf1, err := dataframe.NewFromColumns(nil, dataColumns...) 
+ if err != nil { + return err + } + if err = nf1.CSV(plotConfig.OutputPathCSV); err != nil { + return err + } + + plog.Printf("saving data for %q of all database (by client number)", plotConfig.Column) + nf2 := dataframe.New() + for i := range clientNumColumns { + if clientNumColumns[i].Count() != dataColumns[i].Count() { + return fmt.Errorf("%q row count %d != %q row count %d", + clientNumColumns[i].Header(), + clientNumColumns[i].Count(), + dataColumns[i].Header(), + dataColumns[i].Count(), + ) + } + if err := nf2.AddColumn(clientNumColumns[i]); err != nil { + return err + } + if err := nf2.AddColumn(dataColumns[i]); err != nil { + return err + } + } + if err = nf2.CSV(filepath.Join(filepath.Dir(plotConfig.OutputPathCSV), plotConfig.Column+"-BY-CLIENT-NUM"+".csv")); err != nil { + return err + } + + if len(cfg.DatabaseIDToTestGroup[cfg.AllDatabaseIDList[0]].BenchmarkOptions.ConnectionClientNumbers) > 0 { + plog.Printf("aggregating data for %q of all database (by client number)", plotConfig.Column) + nf3 := dataframe.New() + var firstKeys []int + for i := range clientNumColumns { + n := clientNumColumns[i].Count() + allData := make(map[int]float64) + for j := 0; j < n; j++ { + v1, err := clientNumColumns[i].Value(j) + if err != nil { + return err + } + num, _ := v1.Int64() + + v2, err := dataColumns[i].Value(j) + if err != nil { + return err + } + data, _ := v2.Float64() + + if v, ok := allData[int(num)]; ok { + allData[int(num)] = (v + data) / 2 + } else { + allData[int(num)] = data + } + } + var allKeys []int + for k := range allData { + allKeys = append(allKeys, k) + } + sort.Ints(allKeys) + + if i == 0 { + firstKeys = allKeys + } + if !reflect.DeepEqual(firstKeys, allKeys) { + return fmt.Errorf("all keys must be %+v, got %+v", firstKeys, allKeys) + } + + if i == 0 { + col1 := dataframe.NewColumn("CONTROL-CLIENT-NUM") + for j := range allKeys { + col1.PushBack(dataframe.NewStringValue(allKeys[j])) + } + if err := nf3.AddColumn(col1); err != nil { + return err + } + } + col2 := dataframe.NewColumn(dataColumns[i].Header()) + for j := range allKeys { + col2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.4f", allData[allKeys[j]]))) + } + if err := nf3.AddColumn(col2); err != nil { + return err + } + } + if err = nf3.CSV(filepath.Join(filepath.Dir(plotConfig.OutputPathCSV), plotConfig.Column+"-BY-CLIENT-NUM-aggregated"+".csv")); err != nil { + return err + } + } + } + + return cfg.WriteREADME(stxt) +} + +func changeExtToTxt(fpath string) string { + ext := filepath.Ext(fpath) + return strings.Replace(fpath, ext, ".txt", -1) +} diff --git a/analyze/finalize_everything.go b/analyze/finalize_everything.go deleted file mode 100644 index b764df07..00000000 --- a/analyze/finalize_everything.go +++ /dev/null @@ -1,949 +0,0 @@ -// Copyright 2017 CoreOS, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package analyze - -import ( - "bytes" - "encoding/csv" - "fmt" - "path/filepath" - "reflect" - "sort" - "strconv" - "strings" - - "github.com/coreos/dbtester" - humanize "github.com/dustin/go-humanize" - "github.com/gyuho/dataframe" - "github.com/olekukonko/tablewriter" -) - -type allAggregatedData struct { - title string - data []*analyzeData - headerToDatabaseID map[string]string - allDatabaseIDList []string -} - -func do(configPath string) error { - cfg, err := dbtester.ReadConfig(configPath, true) - if err != nil { - return err - } - - all := &allAggregatedData{ - title: cfg.TestTitle, - data: make([]*analyzeData, 0, len(cfg.DatabaseIDToTestData)), - headerToDatabaseID: make(map[string]string), - allDatabaseIDList: cfg.AllDatabaseIDList, - } - for _, databaseID := range cfg.AllDatabaseIDList { - testgroup := cfg.DatabaseIDToTestGroup[databaseID] - testdata := cfg.DatabaseIDToTestData[databaseID] - - plog.Printf("reading system metrics data for %s", databaseID) - ad, err := readSystemMetricsAll(testdata.ServerSystemMetricsInterpolatedPathList...) - if err != nil { - return err - } - ad.databaseTag = testgroup.DatabaseTag - ad.legend = testgroup.DatabaseDescription - ad.allAggregatedOutputPath = testdata.AllAggregatedOutputPath - - if err = ad.aggSystemMetrics(); err != nil { - return err - } - if err = ad.importBenchMetrics(testdata.ClientLatencyThroughputTimeseriesPath); err != nil { - return err - } - if err = ad.aggregateAll(testdata.ServerMemoryByKeyNumberPath, testgroup.RequestNumber); err != nil { - return err - } - if err = ad.save(); err != nil { - return err - } - - all.data = append(all.data, ad) - for _, hd := range ad.aggregated.Headers() { - all.headerToDatabaseID[makeHeader(hd, testgroup.DatabaseTag)] = databaseID - } - } - - // aggregated everything - // 1. sum of all network usage per database - // 2. throughput, latency percentiles distribution - // - // FIRST ROW FOR HEADER: etcd, Zookeeper, Consul, ... - // FIRST COLUMN FOR LABELS: READS-COMPLETED-DELTA, ... 
- // SECOND COLUMNS ~ FOR DATA - row00Header := []string{""} // first is empty - for _, ad := range all.data { - // per database - for _, col := range ad.aggregated.Columns() { - databaseID := all.headerToDatabaseID[col.Header()] - row00Header = append(row00Header, cfg.DatabaseIDToTestGroup[databaseID].DatabaseTag) - break - } - } - - row19ServerReceiveBytesSum := []string{"SERVER-TOTAL-NETWORK-RECEIVE-DATA-SUM"} - row20ServerTransmitBytesSum := []string{"SERVER-TOTAL-NETWORK-TRANSMIT-DATA-SUM"} - row23ServerMaxCPUUsage := []string{"SERVER-MAX-CPU-USAGE"} - row24ServerMaxMemoryUsage := []string{"SERVER-MAX-MEMORY-USAGE"} - row28ReadsCompletedDeltaSum := []string{"SERVER-AVG-READS-COMPLETED-DELTA-SUM"} - row29SectorsReadDeltaSum := []string{"SERVER-AVG-SECTORS-READS-DELTA-SUM"} - row30WritesCompletedDeltaSum := []string{"SERVER-AVG-WRITES-COMPLETED-DELTA-SUM"} - row31SectorsWrittenDeltaSum := []string{"SERVER-AVG-SECTORS-WRITTEN-DELTA-SUM"} - - // iterate each database's all data - for _, ad := range all.data { - // per database - var ( - readsCompletedDeltaSum float64 - sectorsReadDeltaSum float64 - writesCompletedDeltaSum float64 - sectorsWrittenDeltaSum float64 - receiveBytesNumDeltaSum float64 - transmitBytesNumDeltaSum float64 - maxAvgCPU float64 - maxAvgVMRSSMBs []float64 - ) - for _, col := range ad.aggregated.Columns() { - hdr := col.Header() - switch { - case strings.HasPrefix(hdr, "RECEIVE-BYTES-NUM-DELTA-"): - cnt := col.Count() - for j := 0; j < cnt; j++ { - vv, err := col.Value(j) - if err != nil { - return err - } - fv, _ := vv.Float64() - receiveBytesNumDeltaSum += fv - } - case strings.HasPrefix(hdr, "TRANSMIT-BYTES-NUM-DELTA-"): - cnt := col.Count() - for j := 0; j < cnt; j++ { - vv, err := col.Value(j) - if err != nil { - return err - } - fv, _ := vv.Float64() - transmitBytesNumDeltaSum += fv - } - case strings.HasPrefix(hdr, "READS-COMPLETED-DELTA-"): - cnt := col.Count() - for j := 0; j < cnt; j++ { - vv, err := col.Value(j) - if err != nil { - return err - } - fv, _ := vv.Float64() - readsCompletedDeltaSum += fv - } - case strings.HasPrefix(hdr, "SECTORS-READS-DELTA-"): - cnt := col.Count() - for j := 0; j < cnt; j++ { - vv, err := col.Value(j) - if err != nil { - return err - } - fv, _ := vv.Float64() - sectorsReadDeltaSum += fv - } - case strings.HasPrefix(hdr, "WRITES-COMPLETED-DELTA-"): - cnt := col.Count() - for j := 0; j < cnt; j++ { - vv, err := col.Value(j) - if err != nil { - return err - } - fv, _ := vv.Float64() - writesCompletedDeltaSum += fv - } - case strings.HasPrefix(hdr, "SECTORS-WRITTEN-DELTA-"): - cnt := col.Count() - for j := 0; j < cnt; j++ { - vv, err := col.Value(j) - if err != nil { - return err - } - fv, _ := vv.Float64() - sectorsWrittenDeltaSum += fv - } - case strings.HasPrefix(hdr, "AVG-CPU-"): - cnt := col.Count() - for j := 0; j < cnt; j++ { - vv, err := col.Value(j) - if err != nil { - return err - } - fv, _ := vv.Float64() - maxAvgCPU = maxFloat64(maxAvgCPU, fv) - } - case strings.HasPrefix(hdr, "AVG-VMRSS-MB-"): - cnt := col.Count() - for j := 0; j < cnt; j++ { - vv, err := col.Value(j) - if err != nil { - return err - } - fv, _ := vv.Float64() - maxAvgVMRSSMBs = append(maxAvgVMRSSMBs, fv) - } - } - } - - row19ServerReceiveBytesSum = append(row19ServerReceiveBytesSum, humanize.Bytes(uint64(receiveBytesNumDeltaSum))) - row20ServerTransmitBytesSum = append(row20ServerTransmitBytesSum, humanize.Bytes(uint64(transmitBytesNumDeltaSum))) - row23ServerMaxCPUUsage = append(row23ServerMaxCPUUsage, fmt.Sprintf("%.2f %%", maxAvgCPU)) - 
row28ReadsCompletedDeltaSum = append(row28ReadsCompletedDeltaSum, humanize.Comma(int64(readsCompletedDeltaSum))) - row29SectorsReadDeltaSum = append(row29SectorsReadDeltaSum, humanize.Comma(int64(sectorsReadDeltaSum))) - row30WritesCompletedDeltaSum = append(row30WritesCompletedDeltaSum, humanize.Comma(int64(writesCompletedDeltaSum))) - row31SectorsWrittenDeltaSum = append(row31SectorsWrittenDeltaSum, humanize.Comma(int64(sectorsWrittenDeltaSum))) - - // TODO: handle overflowed memory value? - sort.Float64s(maxAvgVMRSSMBs) - mv := maxAvgVMRSSMBs[len(maxAvgVMRSSMBs)-1] - mb := uint64(mv * 1000000) - row24ServerMaxMemoryUsage = append(row24ServerMaxMemoryUsage, humanize.Bytes(mb)) - } - - row01TotalSeconds := []string{"TOTAL-SECONDS"} // TOTAL-SECONDS - row02TotalRequestNumber := []string{"TOTAL-REQUEST-NUMBER"} - row05MaxThroughput := []string{"MAX-THROUGHPUT"} // MAX AVG-THROUGHPUT - row06AverageThroughput := []string{"AVG-THROUGHPUT"} // REQUESTS-PER-SECOND - row07MinThroughput := []string{"MIN-THROUGHPUT"} // MIN AVG-THROUGHPUT - row08FastestLatency := []string{"FASTEST-LATENCY"} // FASTEST-LATENCY-MS - row09AverageLatency := []string{"AVG-LATENCY"} // AVERAGE-LATENCY-MS - row10SlowestLatency := []string{"SLOWEST-LATENCY"} // SLOWEST-LATENCY-MS - row11p10 := []string{"Latency p10"} // p10 - row12p25 := []string{"Latency p25"} // p25 - row13p50 := []string{"Latency p50"} // p50 - row14p75 := []string{"Latency p75"} // p75 - row15p90 := []string{"Latency p90"} // p90 - row16p95 := []string{"Latency p95"} // p95 - row17p99 := []string{"Latency p99"} // p99 - row18p999 := []string{"Latency p99.9"} // p99.9 - row21ClientReceiveBytesSum := []string{"CLIENT-TOTAL-NETWORK-RECEIVE-SUM"} // RECEIVE-BYTES-NUM-DELTA - row22lientTransmitBytesSum := []string{"CLIENT-TOTAL-NETWORK-TRANSMIT-SUM"} // TRANSMIT-BYTES-DELTA - row25ClientMaxCPU := []string{"CLIENT-MAX-CPU-USAGE"} // CPU-NUM - row26ClientMaxMemory := []string{"CLIENT-MAX-MEMORY-USAGE"} // VMRSS-NUM - row27ClientErrorCount := []string{"CLIENT-ERROR-COUNT"} // ERROR: - row32AverageDatasize := []string{"SERVER-AVG-DATA-SIZE-ON-DISK"} // TOTAL-DATA-SIZE - - databaseIDToErrs := make(map[string][]string) - for i, databaseID := range cfg.AllDatabaseIDList { - testgroup := cfg.DatabaseIDToTestGroup[databaseID] - testdata := cfg.DatabaseIDToTestData[databaseID] - - tag := testdata.DatabaseTag - if tag != row00Header[i+1] { - return fmt.Errorf("analyze config has different order; expected %q, got %q", row00Header[i+1], tag) - } - row02TotalRequestNumber = append(row02TotalRequestNumber, humanize.Comma(testgroup.RequestNumber)) - - { - fr, err := dataframe.NewFromCSV(nil, testdata.ClientSystemMetricsInterpolatedPath) - if err != nil { - return err - } - - var receiveBytesNumDeltaSum uint64 - col, err := fr.Column("RECEIVE-BYTES-NUM-DELTA") - if err != nil { - return err - } - for i := 0; i < col.Count(); i++ { - v, err := col.Value(i) - if err != nil { - return err - } - iv, _ := v.Uint64() - receiveBytesNumDeltaSum += iv - } - - var transmitBytesNumDeltaSum uint64 - col, err = fr.Column("TRANSMIT-BYTES-NUM-DELTA") - if err != nil { - return err - } - for i := 0; i < col.Count(); i++ { - v, err := col.Value(i) - if err != nil { - return err - } - iv, _ := v.Uint64() - transmitBytesNumDeltaSum += iv - } - - var maxAvgCPU float64 - col, err = fr.Column("CPU-NUM") - if err != nil { - return err - } - for i := 0; i < col.Count(); i++ { - v, err := col.Value(i) - if err != nil { - return err - } - fv, _ := v.Float64() - if maxAvgCPU == 0 || fv > maxAvgCPU { 
- maxAvgCPU = fv - } - } - - var maxVMRSSNum uint64 - col, err = fr.Column("VMRSS-NUM") - if err != nil { - return err - } - for i := 0; i < col.Count(); i++ { - v, err := col.Value(i) - if err != nil { - return err - } - iv, _ := v.Uint64() - if maxVMRSSNum == 0 || iv > maxVMRSSNum { - maxVMRSSNum = iv - } - } - - row21ClientReceiveBytesSum = append(row21ClientReceiveBytesSum, humanize.Bytes(receiveBytesNumDeltaSum)) - row22lientTransmitBytesSum = append(row22lientTransmitBytesSum, humanize.Bytes(transmitBytesNumDeltaSum)) - row25ClientMaxCPU = append(row25ClientMaxCPU, fmt.Sprintf("%.2f %%", maxAvgCPU)) - row26ClientMaxMemory = append(row26ClientMaxMemory, humanize.Bytes(maxVMRSSNum)) - } - { - f, err := openToRead(testdata.ClientLatencyDistributionSummaryPath) - if err != nil { - return err - } - defer f.Close() - - rd := csv.NewReader(f) - - // FieldsPerRecord is the number of expected fields per record. - // If FieldsPerRecord is positive, Read requires each record to - // have the given number of fields. If FieldsPerRecord is 0, Read sets it to - // the number of fields in the first record, so that future records must - // have the same field count. If FieldsPerRecord is negative, no check is - // made and records may have a variable number of fields. - rd.FieldsPerRecord = -1 - - rows, err := rd.ReadAll() - if err != nil { - return err - } - - var totalErrCnt int64 - for _, row := range rows { - switch row[0] { - case "TOTAL-SECONDS": - row01TotalSeconds = append(row01TotalSeconds, fmt.Sprintf("%s sec", row[1])) - case "REQUESTS-PER-SECOND": - fv, err := strconv.ParseFloat(row[1], 64) - if err != nil { - return err - } - avg := int64(fv) - row06AverageThroughput = append(row06AverageThroughput, fmt.Sprintf("%s req/sec", humanize.Comma(avg))) - case "SLOWEST-LATENCY-MS": - row10SlowestLatency = append(row10SlowestLatency, fmt.Sprintf("%s ms", row[1])) - case "FASTEST-LATENCY-MS": - row08FastestLatency = append(row08FastestLatency, fmt.Sprintf("%s ms", row[1])) - case "AVERAGE-LATENCY-MS": - row09AverageLatency = append(row09AverageLatency, fmt.Sprintf("%s ms", row[1])) - } - - if strings.HasPrefix(row[0], "ERROR:") { - iv, err := strconv.ParseInt(row[1], 10, 64) - if err != nil { - return err - } - totalErrCnt += iv - - c1 := strings.TrimSpace(strings.Replace(row[0], "ERROR:", "", -1)) - c2 := humanize.Comma(iv) - es := fmt.Sprintf("%s (count %s)", c1, c2) - if _, ok := databaseIDToErrs[databaseID]; !ok { - databaseIDToErrs[databaseID] = []string{es} - } else { - databaseIDToErrs[databaseID] = append(databaseIDToErrs[databaseID], es) - } - } - } - row27ClientErrorCount = append(row27ClientErrorCount, humanize.Comma(totalErrCnt)) - } - { - fr, err := dataframe.NewFromCSV(nil, testdata.ClientLatencyThroughputTimeseriesPath) - if err != nil { - return err - } - col, err := fr.Column("AVG-THROUGHPUT") - if err != nil { - return err - } - var min int64 - var max int64 - for i := 0; i < col.Count(); i++ { - val, err := col.Value(i) - if err != nil { - return err - } - fv, _ := val.Float64() - - if i == 0 { - min = int64(fv) - } - if max < int64(fv) { - max = int64(fv) - } - if min > int64(fv) { - min = int64(fv) - } - } - row05MaxThroughput = append(row05MaxThroughput, fmt.Sprintf("%s req/sec", humanize.Comma(max))) - row07MinThroughput = append(row07MinThroughput, fmt.Sprintf("%s req/sec", humanize.Comma(min))) - } - { - fr, err := dataframe.NewFromCSV(nil, testdata.ServerDatasizeOnDiskSummaryPath) - if err != nil { - return err - } - col, err := 
fr.Column(dbtester.DatasizeOnDiskSummaryColumns[3]) // datasize in bytes - if err != nil { - return err - } - var sum float64 - for i := 0; i < col.Count(); i++ { - val, err := col.Value(i) - if err != nil { - return err - } - fv, _ := val.Float64() - sum += fv - } - avg := uint64(sum / float64(col.Count())) - row32AverageDatasize = append(row32AverageDatasize, humanize.Bytes(avg)) - } - { - f, err := openToRead(testdata.ClientLatencyDistributionPercentilePath) - if err != nil { - return err - } - defer f.Close() - - rd := csv.NewReader(f) - - // FieldsPerRecord is the number of expected fields per record. - // If FieldsPerRecord is positive, Read requires each record to - // have the given number of fields. If FieldsPerRecord is 0, Read sets it to - // the number of fields in the first record, so that future records must - // have the same field count. If FieldsPerRecord is negative, no check is - // made and records may have a variable number of fields. - rd.FieldsPerRecord = -1 - - rows, err := rd.ReadAll() - if err != nil { - return err - } - - for ri, row := range rows { - if ri == 0 { - continue // skip header - } - switch row[0] { - case "p10": - row11p10 = append(row11p10, fmt.Sprintf("%s ms", row[1])) - case "p25": - row12p25 = append(row12p25, fmt.Sprintf("%s ms", row[1])) - case "p50": - row13p50 = append(row13p50, fmt.Sprintf("%s ms", row[1])) - case "p75": - row14p75 = append(row14p75, fmt.Sprintf("%s ms", row[1])) - case "p90": - row15p90 = append(row15p90, fmt.Sprintf("%s ms", row[1])) - case "p95": - row16p95 = append(row16p95, fmt.Sprintf("%s ms", row[1])) - case "p99": - row17p99 = append(row17p99, fmt.Sprintf("%s ms", row[1])) - case "p99.9": - row18p999 = append(row18p999, fmt.Sprintf("%s ms", row[1])) - } - } - } - } - - aggRows := [][]string{ - row00Header, - row01TotalSeconds, - row02TotalRequestNumber, - row05MaxThroughput, - row06AverageThroughput, - row07MinThroughput, - row08FastestLatency, - row09AverageLatency, - row10SlowestLatency, - row11p10, - row12p25, - row13p50, - row14p75, - row15p90, - row16p95, - row17p99, - row18p999, - - row19ServerReceiveBytesSum, - row20ServerTransmitBytesSum, - row21ClientReceiveBytesSum, - row22lientTransmitBytesSum, - - row23ServerMaxCPUUsage, - row24ServerMaxMemoryUsage, - row25ClientMaxCPU, - row26ClientMaxMemory, - - row27ClientErrorCount, - - row28ReadsCompletedDeltaSum, - row29SectorsReadDeltaSum, - row30WritesCompletedDeltaSum, - row31SectorsWrittenDeltaSum, - row32AverageDatasize, - } - - plog.Printf("saving summary data to %q", cfg.Analyze.AllAggregatedOutputPathCSV) - file, err := openToOverwrite(cfg.Analyze.AllAggregatedOutputPathCSV) - if err != nil { - return err - } - defer file.Close() - wr := csv.NewWriter(file) - if err := wr.WriteAll(aggRows); err != nil { - return err - } - wr.Flush() - if err := wr.Error(); err != nil { - return err - } - buf := new(bytes.Buffer) - tw := tablewriter.NewWriter(buf) - tw.SetHeader(aggRows[0]) - for _, row := range aggRows[1:] { - tw.Append(row) - } - tw.SetAutoFormatHeaders(false) - tw.SetAlignment(tablewriter.ALIGN_RIGHT) - tw.Render() - - errs := "" - for _, databaseID := range cfg.AllDatabaseIDList { - es, ok := databaseIDToErrs[databaseID] - if !ok { - continue - } - errs = databaseID + " " + "errors:\n" + strings.Join(es, "\n") + "\n" - } - plog.Printf("saving summary data to %q", cfg.Analyze.AllAggregatedOutputPathTXT) - stxt := buf.String() - if errs != "" { - stxt += "\n" + "\n" + errs - } - if err := toFile(stxt, changeExtToTxt(cfg.Analyze.AllAggregatedOutputPathTXT)); 
err != nil { - return err - } - - // KEYS, MIN-LATENCY-MS, AVG-LATENCY-MS, MAX-LATENCY-MS - plog.Info("combining all latency data by keys") - allLatencyFrame := dataframe.New() - for _, databaseID := range cfg.AllDatabaseIDList { - testdata := cfg.DatabaseIDToTestData[databaseID] - - fr, err := dataframe.NewFromCSV(nil, testdata.ClientLatencyByKeyNumberPath) - if err != nil { - return err - } - colKeys, err := fr.Column("KEYS") - if err != nil { - return err - } - colKeys.UpdateHeader(makeHeader("KEYS", testdata.DatabaseTag)) - if err = allLatencyFrame.AddColumn(colKeys); err != nil { - return err - } - - colMinLatency, err := fr.Column("MIN-LATENCY-MS") - if err != nil { - return err - } - colMinLatency.UpdateHeader(makeHeader("MIN-LATENCY-MS", testdata.DatabaseTag)) - if err = allLatencyFrame.AddColumn(colMinLatency); err != nil { - return err - } - - colAvgLatency, err := fr.Column("AVG-LATENCY-MS") - if err != nil { - return err - } - colAvgLatency.UpdateHeader(makeHeader("AVG-LATENCY-MS", testdata.DatabaseTag)) - if err = allLatencyFrame.AddColumn(colAvgLatency); err != nil { - return err - } - - colMaxLatency, err := fr.Column("MAX-LATENCY-MS") - if err != nil { - return err - } - colMaxLatency.UpdateHeader(makeHeader("MAX-LATENCY-MS", testdata.DatabaseTag)) - if err = allLatencyFrame.AddColumn(colMaxLatency); err != nil { - return err - } - } - // KEYS, MIN-VMRSS-MB, AVG-VMRSS-MB, MAX-VMRSS-MB - plog.Info("combining all server memory usage by keys") - allMemoryFrame := dataframe.New() - for _, databaseID := range cfg.AllDatabaseIDList { - testdata := cfg.DatabaseIDToTestData[databaseID] - - fr, err := dataframe.NewFromCSV(nil, testdata.ServerMemoryByKeyNumberPath) - if err != nil { - return err - } - colKeys, err := fr.Column("KEYS") - if err != nil { - return err - } - colKeys.UpdateHeader(makeHeader("KEYS", testdata.DatabaseTag)) - if err = allMemoryFrame.AddColumn(colKeys); err != nil { - return err - } - - colMemMin, err := fr.Column("MIN-VMRSS-MB") - if err != nil { - return err - } - colMemMin.UpdateHeader(makeHeader("MIN-VMRSS-MB", testdata.DatabaseTag)) - if err = allMemoryFrame.AddColumn(colMemMin); err != nil { - return err - } - - colMem, err := fr.Column("AVG-VMRSS-MB") - if err != nil { - return err - } - colMem.UpdateHeader(makeHeader("AVG-VMRSS-MB", testdata.DatabaseTag)) - if err = allMemoryFrame.AddColumn(colMem); err != nil { - return err - } - - colMemMax, err := fr.Column("MAX-VMRSS-MB") - if err != nil { - return err - } - colMemMax.UpdateHeader(makeHeader("MAX-VMRSS-MB", testdata.DatabaseTag)) - if err = allMemoryFrame.AddColumn(colMemMax); err != nil { - return err - } - } - - { - allLatencyFrameCfg := dbtester.Plot{ - Column: "AVG-LATENCY-MS", - XAxis: "Cumulative Number of Keys", - YAxis: "Latency(millisecond) by Keys", - OutputPathList: make([]string, len(cfg.PlotList[0].OutputPathList)), - } - allLatencyFrameCfg.OutputPathList[0] = filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-LATENCY-MS-BY-KEY.svg") - allLatencyFrameCfg.OutputPathList[1] = filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-LATENCY-MS-BY-KEY.png") - plog.Printf("plotting %v", allLatencyFrameCfg.OutputPathList) - var pairs []pair - allCols := allLatencyFrame.Columns() - for i := 0; i < len(allCols)-3; i += 4 { - pairs = append(pairs, pair{ - x: allCols[i], // x - y: allCols[i+2], // avg - }) - } - if err = all.drawXY(allLatencyFrameCfg, pairs...); err != nil { - return err - } - newCSV := dataframe.New() - for _, p := range pairs { - if err = 
newCSV.AddColumn(p.x); err != nil { - return err - } - if err = newCSV.AddColumn(p.y); err != nil { - return err - } - } - csvPath := filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-LATENCY-MS-BY-KEY.csv") - if err := newCSV.CSV(csvPath); err != nil { - return err - } - } - { - // with error points - allLatencyFrameCfg := dbtester.Plot{ - Column: "AVG-LATENCY-MS", - XAxis: "Cumulative Number of Keys", - YAxis: "Latency(millisecond) by Keys", - OutputPathList: make([]string, len(cfg.PlotList[0].OutputPathList)), - } - allLatencyFrameCfg.OutputPathList[0] = filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-LATENCY-MS-BY-KEY-ERROR-POINTS.svg") - allLatencyFrameCfg.OutputPathList[1] = filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-LATENCY-MS-BY-KEY-ERROR-POINTS.png") - plog.Printf("plotting %v", allLatencyFrameCfg.OutputPathList) - var triplets []triplet - allCols := allLatencyFrame.Columns() - for i := 0; i < len(allCols)-3; i += 4 { - triplets = append(triplets, triplet{ - x: allCols[i], - minCol: allCols[i+1], - avgCol: allCols[i+2], - maxCol: allCols[i+3], - }) - } - if err = all.drawXYWithErrorPoints(allLatencyFrameCfg, triplets...); err != nil { - return err - } - newCSV := dataframe.New() - for _, tri := range triplets { - if err = newCSV.AddColumn(tri.x); err != nil { - return err - } - if err = newCSV.AddColumn(tri.minCol); err != nil { - return err - } - if err = newCSV.AddColumn(tri.avgCol); err != nil { - return err - } - if err = newCSV.AddColumn(tri.maxCol); err != nil { - return err - } - } - csvPath := filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-LATENCY-MS-BY-KEY-ERROR-POINTS.csv") - if err := newCSV.CSV(csvPath); err != nil { - return err - } - } - { - allMemoryFrameCfg := dbtester.Plot{ - Column: "AVG-VMRSS-MB", - XAxis: "Cumulative Number of Keys", - YAxis: "Memory(MB) by Keys", - OutputPathList: make([]string, len(cfg.PlotList[0].OutputPathList)), - } - allMemoryFrameCfg.OutputPathList[0] = filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-VMRSS-MB-BY-KEY.svg") - allMemoryFrameCfg.OutputPathList[1] = filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-VMRSS-MB-BY-KEY.png") - plog.Printf("plotting %v", allMemoryFrameCfg.OutputPathList) - var pairs []pair - allCols := allMemoryFrame.Columns() - for i := 0; i < len(allCols)-3; i += 4 { - pairs = append(pairs, pair{ - x: allCols[i], // x - y: allCols[i+2], // avg - }) - } - if err = all.drawXY(allMemoryFrameCfg, pairs...); err != nil { - return err - } - newCSV := dataframe.New() - for _, p := range pairs { - if err = newCSV.AddColumn(p.x); err != nil { - return err - } - if err = newCSV.AddColumn(p.y); err != nil { - return err - } - } - csvPath := filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-VMRSS-MB-BY-KEY.csv") - if err := newCSV.CSV(csvPath); err != nil { - return err - } - } - { - // with error points - allMemoryFrameCfg := dbtester.Plot{ - Column: "AVG-VMRSS-MB", - XAxis: "Cumulative Number of Keys", - YAxis: "Memory(MB) by Keys", - OutputPathList: make([]string, len(cfg.PlotList[0].OutputPathList)), - } - allMemoryFrameCfg.OutputPathList[0] = filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-VMRSS-MB-BY-KEY-ERROR-POINTS.svg") - allMemoryFrameCfg.OutputPathList[1] = filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-VMRSS-MB-BY-KEY-ERROR-POINTS.png") - plog.Printf("plotting %v", allMemoryFrameCfg.OutputPathList) - var triplets []triplet - allCols := 
allMemoryFrame.Columns() - for i := 0; i < len(allCols)-3; i += 4 { - triplets = append(triplets, triplet{ - x: allCols[i], - minCol: allCols[i+1], - avgCol: allCols[i+2], - maxCol: allCols[i+3], - }) - } - if err = all.drawXYWithErrorPoints(allMemoryFrameCfg, triplets...); err != nil { - return err - } - newCSV := dataframe.New() - for _, tri := range triplets { - if err = newCSV.AddColumn(tri.x); err != nil { - return err - } - if err = newCSV.AddColumn(tri.minCol); err != nil { - return err - } - if err = newCSV.AddColumn(tri.avgCol); err != nil { - return err - } - if err = newCSV.AddColumn(tri.maxCol); err != nil { - return err - } - } - csvPath := filepath.Join(filepath.Dir(cfg.PlotList[0].OutputPathList[0]), "AVG-VMRSS-MB-BY-KEY-ERROR-POINTS.csv") - if err := newCSV.CSV(csvPath); err != nil { - return err - } - } - - plog.Println("combining data for plotting") - for _, plotConfig := range cfg.PlotList { - plog.Printf("plotting %q", plotConfig.Column) - var clientNumColumns []dataframe.Column - var pairs []pair - var dataColumns []dataframe.Column - for i, ad := range all.data { - databaseID := all.allDatabaseIDList[i] - tag := cfg.DatabaseIDToTestGroup[databaseID].DatabaseTag - - avgCol, err := ad.aggregated.Column("CONTROL-CLIENT-NUM") - if err != nil { - return err - } - avgCol.UpdateHeader(makeHeader("CONTROL-CLIENT-NUM", tag)) - clientNumColumns = append(clientNumColumns, avgCol) - - col, err := ad.aggregated.Column(plotConfig.Column) - if err != nil { - return err - } - col.UpdateHeader(makeHeader(plotConfig.Column, tag)) - pairs = append(pairs, pair{y: col}) - dataColumns = append(dataColumns, col) - } - if err = all.draw(plotConfig, pairs...); err != nil { - return err - } - - plog.Printf("saving data for %q of all database", plotConfig.Column) - nf1, err := dataframe.NewFromColumns(nil, dataColumns...) 
- if err != nil { - return err - } - if err = nf1.CSV(plotConfig.OutputPathCSV); err != nil { - return err - } - - plog.Printf("saving data for %q of all database (by client number)", plotConfig.Column) - nf2 := dataframe.New() - for i := range clientNumColumns { - if clientNumColumns[i].Count() != dataColumns[i].Count() { - return fmt.Errorf("%q row count %d != %q row count %d", - clientNumColumns[i].Header(), - clientNumColumns[i].Count(), - dataColumns[i].Header(), - dataColumns[i].Count(), - ) - } - if err := nf2.AddColumn(clientNumColumns[i]); err != nil { - return err - } - if err := nf2.AddColumn(dataColumns[i]); err != nil { - return err - } - } - if err = nf2.CSV(filepath.Join(filepath.Dir(plotConfig.OutputPathCSV), plotConfig.Column+"-BY-CLIENT-NUM"+".csv")); err != nil { - return err - } - - if len(cfg.DatabaseIDToTestGroup[cfg.AllDatabaseIDList[0]].BenchmarkOptions.ConnectionClientNumbers) > 0 { - plog.Printf("aggregating data for %q of all database (by client number)", plotConfig.Column) - nf3 := dataframe.New() - var firstKeys []int - for i := range clientNumColumns { - n := clientNumColumns[i].Count() - allData := make(map[int]float64) - for j := 0; j < n; j++ { - v1, err := clientNumColumns[i].Value(j) - if err != nil { - return err - } - num, _ := v1.Int64() - - v2, err := dataColumns[i].Value(j) - if err != nil { - return err - } - data, _ := v2.Float64() - - if v, ok := allData[int(num)]; ok { - allData[int(num)] = (v + data) / 2 - } else { - allData[int(num)] = data - } - } - var allKeys []int - for k := range allData { - allKeys = append(allKeys, k) - } - sort.Ints(allKeys) - - if i == 0 { - firstKeys = allKeys - } - if !reflect.DeepEqual(firstKeys, allKeys) { - return fmt.Errorf("all keys must be %+v, got %+v", firstKeys, allKeys) - } - - if i == 0 { - col1 := dataframe.NewColumn("CONTROL-CLIENT-NUM") - for j := range allKeys { - col1.PushBack(dataframe.NewStringValue(allKeys[j])) - } - if err := nf3.AddColumn(col1); err != nil { - return err - } - } - col2 := dataframe.NewColumn(dataColumns[i].Header()) - for j := range allKeys { - col2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.4f", allData[allKeys[j]]))) - } - if err := nf3.AddColumn(col2); err != nil { - return err - } - } - if err = nf3.CSV(filepath.Join(filepath.Dir(plotConfig.OutputPathCSV), plotConfig.Column+"-BY-CLIENT-NUM-aggregated"+".csv")); err != nil { - return err - } - } - } - - return cfg.WriteREADME(stxt) -} - -func changeExtToTxt(fpath string) string { - ext := filepath.Ext(fpath) - return strings.Replace(fpath, ext, ".txt", -1) -} diff --git a/analyze/analyze_data_4_aggregate_timeseries.go b/analyze/process_timeseries.go similarity index 100% rename from analyze/analyze_data_4_aggregate_timeseries.go rename to analyze/process_timeseries.go diff --git a/analyze/analyze_data_4_aggregate_timeseries_test.go b/analyze/process_timeseries_test.go similarity index 100% rename from analyze/analyze_data_4_aggregate_timeseries_test.go rename to analyze/process_timeseries_test.go