diff --git a/analyze/analyze_data_3_benchmark_metrics.go b/analyze/analyze_data_3_benchmark_metrics.go
index 13957451..33c97c1a 100644
--- a/analyze/analyze_data_3_benchmark_metrics.go
+++ b/analyze/analyze_data_3_benchmark_metrics.go
@@ -143,19 +143,20 @@ func (data *analyzeData) importBenchMetrics(fpath string) (err error) {
 			return fmt.Errorf("cannot Float64 %v", hv)
 		}
 
+		// handle duplicate timestamps
 		if v, ok := sec2Data[ts]; !ok {
 			sec2Data[ts] = rowData{clientN: cn, minLat: minLat, avgLat: avgLat, maxLat: maxLat, throughput: dataThr}
 		} else {
-			oldCn := v.clientN
-			if oldCn != cn {
-				return fmt.Errorf("different client number with same timestamps! %d != %d", oldCn, cn)
-			}
+			// duplicate timestamps with different client numbers are
+			// possible when the client number bumps up within the same
+			// unix second; in that case, add up the throughputs and
+			// take the min, max, and average of the latencies
 			sec2Data[ts] = rowData{
 				clientN:    cn,
 				minLat:     minFloat64(v.minLat, minLat),
 				avgLat:     (v.avgLat + avgLat) / 2.0,
 				maxLat:     maxFloat64(v.maxLat, maxLat),
-				throughput: (v.throughput + dataThr) / 2.0,
+				throughput: v.throughput + dataThr,
 			}
 		}
 	}
@@ -178,8 +179,9 @@ func (data *analyzeData) importBenchMetrics(fpath string) (err error) {
 
 		v, ok := sec2Data[second]
 		if !ok {
-			prev := findClosest(second, sec2Data)
-			newControlClientNumCol.PushBack(dataframe.NewStringValue(prev.clientN))
+			// fill in missing rows with the closest row
+			closest := findClosest(second, sec2Data)
+			newControlClientNumCol.PushBack(dataframe.NewStringValue(closest.clientN))
 			newMinLatencyCol.PushBack(dataframe.NewStringValue(0.0))
 			newAvgLatencyCol.PushBack(dataframe.NewStringValue(0.0))
 			newMaxLatencyCol.PushBack(dataframe.NewStringValue(0.0))
diff --git a/analyze/analyze_data_4_aggregate_all.go b/analyze/analyze_data_4_aggregate_all.go
index f3161f7c..b1cc466a 100644
--- a/analyze/analyze_data_4_aggregate_all.go
+++ b/analyze/analyze_data_4_aggregate_all.go
@@ -205,7 +205,7 @@ func (data *analyzeData) aggregateAll(memoryByKeyPath string, totalRequests int6
 
 		if v, ok := sec2minVMRSSMB[ts]; !ok {
 			sec2minVMRSSMB[ts] = vv
-		} else if v > vv {
+		} else if v > vv || (v == 0.0 && vv != 0.0) {
 			sec2minVMRSSMB[ts] = vv
 		}
 		if v, ok := sec2maxVMRSSMB[ts]; !ok {
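Reviewer note: the merge rule above replaces the old hard failure on duplicate timestamps. Below is a minimal standalone sketch of that rule — the `row` struct and `merge` function are illustrative stand-ins for the package's `rowData` and its `minFloat64`/`maxFloat64` helpers, not the actual code:

package main

import "fmt"

// row is an illustrative stand-in for the package's rowData.
type row struct {
	clientN    int64
	minLat     float64
	avgLat     float64
	maxLat     float64
	throughput float64
}

// merge combines two rows that landed on the same unix second:
// keep the lower min and the higher max latency, average the two
// averages, and sum the throughputs observed in that second.
func merge(a, b row) row {
	minLat := a.minLat
	if b.minLat < minLat {
		minLat = b.minLat
	}
	maxLat := a.maxLat
	if b.maxLat > maxLat {
		maxLat = b.maxLat
	}
	return row{
		clientN:    b.clientN, // the later (bumped-up) client number wins
		minLat:     minLat,
		avgLat:     (a.avgLat + b.avgLat) / 2.0,
		maxLat:     maxLat,
		throughput: a.throughput + b.throughput,
	}
}

func main() {
	// two rows for the same second, recorded around a client-number bump
	a := row{clientN: 700, minLat: 1.2, avgLat: 3.4, maxLat: 9.9, throughput: 23188}
	b := row{clientN: 1000, minLat: 0.8, avgLat: 2.9, maxLat: 7.5, throughput: 5739}
	fmt.Printf("%+v\n", merge(a, b))
	// {clientN:1000 minLat:0.8 avgLat:3.15 maxLat:9.9 throughput:28927}
}

Summing the throughputs (rather than averaging them, as the old code did) is what makes the merged second report the true number of requests observed in that second.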
diff --git a/save_upload.go b/save_upload.go
index 464e889a..70ed27eb 100644
--- a/save_upload.go
+++ b/save_upload.go
@@ -210,7 +210,7 @@ func (cfg *Config) saveDataLatencyDistributionAll(st report.Stats) {
 	}
 }
 
-func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg TestGroup, st report.Stats, tsToClientN map[int64]int64) {
+func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg TestGroup, st report.Stats, clientNs []int64) {
 	c1 := dataframe.NewColumn("UNIX-SECOND")
 	c2 := dataframe.NewColumn("CONTROL-CLIENT-NUM")
 	c3 := dataframe.NewColumn("MIN-LATENCY-MS")
@@ -220,17 +220,10 @@ func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg TestGroup, st report
 	for i := range st.TimeSeries {
 		// this Timestamp is unix seconds
 		c1.PushBack(dataframe.NewStringValue(fmt.Sprintf("%d", st.TimeSeries[i].Timestamp)))
-
-		if len(tsToClientN) == 0 {
-			c2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%d", gcfg.ClientNumber)))
-		} else {
-			c2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%d", tsToClientN[st.TimeSeries[i].Timestamp])))
-		}
-
+		c2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%d", clientNs[i])))
 		c3.PushBack(dataframe.NewStringValue(fmt.Sprintf("%f", toMillisecond(st.TimeSeries[i].MinLatency))))
 		c4.PushBack(dataframe.NewStringValue(fmt.Sprintf("%f", toMillisecond(st.TimeSeries[i].AvgLatency))))
 		c5.PushBack(dataframe.NewStringValue(fmt.Sprintf("%f", toMillisecond(st.TimeSeries[i].MaxLatency))))
-
 		c6.PushBack(dataframe.NewStringValue(fmt.Sprintf("%d", st.TimeSeries[i].ThroughPut)))
 	}
 
@@ -290,11 +283,11 @@ func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg TestGroup, st report
 	}
 }
 
-func (cfg *Config) saveAllStats(gcfg TestGroup, stats report.Stats, tsToClientN map[int64]int64) {
+func (cfg *Config) saveAllStats(gcfg TestGroup, stats report.Stats, clientNs []int64) {
 	cfg.saveDataLatencyDistributionSummary(stats)
 	cfg.saveDataLatencyDistributionPercentile(stats)
 	cfg.saveDataLatencyDistributionAll(stats)
-	cfg.saveDataLatencyThroughputTimeseries(gcfg, stats, tsToClientN)
+	cfg.saveDataLatencyThroughputTimeseries(gcfg, stats, clientNs)
 }
 
 // UploadToGoogle uploads target file to Google Cloud Storage.
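Reviewer note: the signature change from `tsToClientN map[int64]int64` to `clientNs []int64` matters because a map keyed by unix second can hold only one client number per timestamp, so the duplicate seconds at stage boundaries silently lose a value; an index-aligned slice carries exactly one entry per data point. A small illustrative sketch (the timestamps are the ones quoted in the stress.go comment below):

package main

import "fmt"

func main() {
	// unix seconds around a client-number bump; 1486389258 appears twice
	timestamps := []int64{1486389257, 1486389258, 1486389258}
	clientNumbers := []int64{700, 700, 1000}

	// a map keyed by timestamp collapses the duplicate second: the
	// 1000-client entry overwrites the 700-client one
	tsToClientN := make(map[int64]int64)
	for i, ts := range timestamps {
		tsToClientN[ts] = clientNumbers[i]
	}
	fmt.Println(len(tsToClientN), "map entries for", len(timestamps), "points") // 2 for 3

	// an index-aligned slice keeps one client number per data point,
	// which is what saveDataLatencyThroughputTimeseries now consumes
	for i, ts := range timestamps {
		fmt.Println(ts, clientNumbers[i])
	}
}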
diff --git a/stress.go b/stress.go
index 60e44d01..5bfd76ad 100644
--- a/stress.go
+++ b/stress.go
@@ -92,8 +92,8 @@ func (cfg *Config) Stress(databaseID string) error {
 	}
 
 	plog.Info("combining all reports")
-	tsToClientN := make(map[int64]int64, gcfg.BenchmarkOptions.RequestNumber)
 	combined := report.Stats{ErrorDist: make(map[string]int)}
+	combinedClientNumber := make([]int64, 0, gcfg.BenchmarkOptions.RequestNumber)
 	for i, st := range stats {
 		combined.AvgTotal += st.AvgTotal
 		combined.Total += st.Total
@@ -112,19 +112,15 @@ func (cfg *Config) Stress(databaseID string) error {
 		// 1486389258, "700", 23188 === ending of previous combined.TimeSeries
 		// 1486389258, 1000, 5739 === beginning of current st.TimeSeries
 		//
-		// And the line below will overwrite the 'client-number' as:
-		//
-		// unix-second, client-number, throughput
-		// 1486389257, 700, 30335 === ending of previous combined.TimeSeries
-		// 1486389258, "1000", 23188 === ending of previous combined.TimeSeries
-		// 1486389258, 1000, 5739 === beginning of current st.TimeSeries
-		//
 		// So now we have two duplicate unix time seconds.
+		// This is handled later when aggregating by keys.
 		//
-		clientsN := gcfg.BenchmarkOptions.ConnectionClientNumbers[i]
-		for _, v := range st.TimeSeries {
-			tsToClientN[v.Timestamp] = clientsN
+		clientN := gcfg.BenchmarkOptions.ConnectionClientNumbers[i]
+		clientNs := make([]int64, len(st.TimeSeries))
+		for i := range st.TimeSeries {
+			clientNs[i] = clientN
 		}
+		combinedClientNumber = append(combinedClientNumber, clientNs...)
 
 		for k, v := range st.ErrorDist {
 			if _, ok := combined.ErrorDist[k]; !ok {
@@ -134,32 +130,9 @@ func (cfg *Config) Stress(databaseID string) error {
 			}
 		}
 	}
-
-	// handle duplicate unix seconds around boundaries
-	sec2dp := make(map[int64]report.DataPoint)
-	for _, tss := range combined.TimeSeries {
-		v, ok := sec2dp[tss.Timestamp]
-		if !ok {
-			sec2dp[tss.Timestamp] = tss
-		}
-
-		// two datapoints share the time unix second
-		if v.MinLatency > tss.MinLatency {
-			v.MinLatency = tss.MinLatency
-		}
-		if v.MaxLatency < tss.MaxLatency {
-			v.MaxLatency = tss.MaxLatency
-		}
-		v.AvgLatency = (v.AvgLatency + tss.AvgLatency) / time.Duration(2)
-		v.ThroughPut += tss.ThroughPut
-		sec2dp[tss.Timestamp] = v
+	if len(combined.TimeSeries) != len(combinedClientNumber) {
+		return fmt.Errorf("len(combined.TimeSeries) %d != len(combinedClientNumber) %d", len(combined.TimeSeries), len(combinedClientNumber))
 	}
-	var fts report.TimeSeries
-	for _, dp := range sec2dp {
-		fts = append(fts, dp)
-	}
-	sort.Sort(report.TimeSeries(fts))
-	combined.TimeSeries = fts
 
 	combined.Average = combined.AvgTotal / float64(len(combined.Lats))
 	combined.RPS = float64(len(combined.Lats)) / combined.Total.Seconds()
@@ -179,7 +152,7 @@ func (cfg *Config) Stress(databaseID string) error {
 	}
 	plog.Info("combined all reports")
 	printStats(combined)
-	cfg.saveAllStats(gcfg, combined, tsToClientN)
+	cfg.saveAllStats(gcfg, combined, combinedClientNumber)
 }
 
 plog.Println("write generateReport is finished...")
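Reviewer note: putting the stress.go side together, each stage's configured client number is repeated once per time-series data point, and the combined slice must stay one-to-one with combined.TimeSeries — that is the invariant the new fmt.Errorf check enforces. A compact sketch of the expansion, with an illustrative `stage` type standing in for the benchmark configuration:

package main

import "fmt"

// stage is an illustrative stand-in for one benchmark stage: its
// configured client number and how many timeseries points it produced.
type stage struct {
	clientN int64
	points  int
}

func main() {
	stages := []stage{{700, 3}, {1000, 2}}

	var combinedClientNumber []int64
	totalPoints := 0
	for _, s := range stages {
		totalPoints += s.points
		// repeat the stage's client number once per data point
		for i := 0; i < s.points; i++ {
			combinedClientNumber = append(combinedClientNumber, s.clientN)
		}
	}

	// the invariant enforced in Stress: one client number per point
	if totalPoints != len(combinedClientNumber) {
		panic(fmt.Sprintf("len mismatch: %d != %d", totalPoints, len(combinedClientNumber)))
	}
	fmt.Println(combinedClientNumber) // [700 700 700 1000 1000]
}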