Merge pull request #271 from gyuho/config

*: don't use map to filter out the duplicate unix seconds
Gyu-Ho Lee 2017-02-06 13:37:56 -08:00 committed by GitHub
commit 08660f4093
4 changed files with 24 additions and 56 deletions

View File

@@ -143,19 +143,20 @@ func (data *analyzeData) importBenchMetrics(fpath string) (err error) {
return fmt.Errorf("cannot Float64 %v", hv)
}
// handle duplicate timestamps
if v, ok := sec2Data[ts]; !ok {
sec2Data[ts] = rowData{clientN: cn, minLat: minLat, avgLat: avgLat, maxLat: maxLat, throughput: dataThr}
} else {
oldCn := v.clientN
if oldCn != cn {
return fmt.Errorf("different client number with same timestamps! %d != %d", oldCn, cn)
}
// it is possible to see duplicate timestamps with different
// client numbers when the client number bumps up; these requests
// happen within the same unix second, so add up the throughput
// and take the min, max, and average of the latencies
sec2Data[ts] = rowData{
clientN: cn,
minLat: minFloat64(v.minLat, minLat),
avgLat: (v.avgLat + avgLat) / 2.0,
maxLat: maxFloat64(v.maxLat, maxLat),
throughput: (v.throughput + dataThr) / 2.0,
throughput: v.throughput + dataThr,
}
}
}
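The merge rule above relies on minFloat64 and maxFloat64 helpers that are defined elsewhere in the package. A minimal, self-contained sketch of that rule, with the package name and the rowData field types assumed rather than taken from the repo:

package analyze // package name assumed

// rowData mirrors only the fields used above; the real struct and its
// exact field types live elsewhere in this package.
type rowData struct {
	clientN    int64
	minLat     float64
	avgLat     float64
	maxLat     float64
	throughput float64
}

func minFloat64(a, b float64) float64 {
	if a < b {
		return a
	}
	return b
}

func maxFloat64(a, b float64) float64 {
	if a > b {
		return a
	}
	return b
}

// mergeRow combines two rows that fall on the same unix second:
// keep the smaller min and larger max latency, average the averages,
// and sum the throughput observed within that second.
func mergeRow(old, cur rowData) rowData {
	return rowData{
		clientN:    cur.clientN,
		minLat:     minFloat64(old.minLat, cur.minLat),
		avgLat:     (old.avgLat + cur.avgLat) / 2.0,
		maxLat:     maxFloat64(old.maxLat, cur.maxLat),
		throughput: old.throughput + cur.throughput,
	}
}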
@@ -178,8 +179,9 @@ func (data *analyzeData) importBenchMetrics(fpath string) (err error) {
v, ok := sec2Data[second]
if !ok {
prev := findClosest(second, sec2Data)
newControlClientNumCol.PushBack(dataframe.NewStringValue(prev.clientN))
// fill in missing rows with the closest existing row
closest := findClosest(second, sec2Data)
newControlClientNumCol.PushBack(dataframe.NewStringValue(closest.clientN))
newMinLatencyCol.PushBack(dataframe.NewStringValue(0.0))
newAvgLatencyCol.PushBack(dataframe.NewStringValue(0.0))
newMaxLatencyCol.PushBack(dataframe.NewStringValue(0.0))
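findClosest is likewise defined elsewhere; its signature is inferred from the call site. A rough sketch of the behavior the fill-in relies on, written as a second file in the same sketch package (reusing rowData from above) and picking the row whose unix second is nearest to the missing one; the real helper may instead prefer the closest preceding row:

package analyze // continuing the sketch above

import "math"

// findClosestSketch returns the row whose unix second is nearest to
// 'second'; ties and direction (before/after) are not distinguished.
func findClosestSketch(second int64, sec2Data map[int64]rowData) rowData {
	best, bestDist := rowData{}, int64(math.MaxInt64)
	for ts, row := range sec2Data {
		dist := ts - second
		if dist < 0 {
			dist = -dist
		}
		if dist < bestDist {
			best, bestDist = row, dist
		}
	}
	return best
}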

View File

@@ -205,7 +205,7 @@ func (data *analyzeData) aggregateAll(memoryByKeyPath string, totalRequests int6
if v, ok := sec2minVMRSSMB[ts]; !ok {
sec2minVMRSSMB[ts] = vv
} else if v > vv {
} else if v > vv || (v == 0.0 && vv != 0.0) {
sec2minVMRSSMB[ts] = vv
}
if v, ok := sec2maxVMRSSMB[ts]; !ok {
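The extra condition v == 0.0 && vv != 0.0 keeps a 0.0 that was stored before any real sample from being kept as the minimum VMRSS for that second. A small sketch of the guarded update, with the function name invented for illustration:

package analyze // sketch only

// updateMinVMRSS records the per-second minimum VMRSS in MB. A stored
// 0.0 is treated as "no sample yet" rather than a real minimum, so any
// non-zero sample replaces it, matching the condition above.
func updateMinVMRSS(sec2minVMRSSMB map[int64]float64, ts int64, vv float64) {
	v, ok := sec2minVMRSSMB[ts]
	if !ok || v > vv || (v == 0.0 && vv != 0.0) {
		sec2minVMRSSMB[ts] = vv
	}
}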

View File

@@ -210,7 +210,7 @@ func (cfg *Config) saveDataLatencyDistributionAll(st report.Stats) {
}
}
func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg TestGroup, st report.Stats, tsToClientN map[int64]int64) {
func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg TestGroup, st report.Stats, clientNs []int64) {
c1 := dataframe.NewColumn("UNIX-SECOND")
c2 := dataframe.NewColumn("CONTROL-CLIENT-NUM")
c3 := dataframe.NewColumn("MIN-LATENCY-MS")
@@ -220,17 +220,10 @@ func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg TestGroup, st report
for i := range st.TimeSeries {
// this Timestamp is in unix seconds
c1.PushBack(dataframe.NewStringValue(fmt.Sprintf("%d", st.TimeSeries[i].Timestamp)))
if len(tsToClientN) == 0 {
c2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%d", gcfg.ClientNumber)))
} else {
c2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%d", tsToClientN[st.TimeSeries[i].Timestamp])))
}
c2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%d", clientNs[i])))
c3.PushBack(dataframe.NewStringValue(fmt.Sprintf("%f", toMillisecond(st.TimeSeries[i].MinLatency))))
c4.PushBack(dataframe.NewStringValue(fmt.Sprintf("%f", toMillisecond(st.TimeSeries[i].AvgLatency))))
c5.PushBack(dataframe.NewStringValue(fmt.Sprintf("%f", toMillisecond(st.TimeSeries[i].MaxLatency))))
c6.PushBack(dataframe.NewStringValue(fmt.Sprintf("%d", st.TimeSeries[i].ThroughPut)))
}
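The new clientNs parameter is consumed by index, so it must carry exactly one client count per time-series row, in the same order; the caller below builds it that way and length-checks it. A hypothetical guard, not part of this change, that states the same invariant:

package dbtester // package name assumed

import "fmt"

// checkAligned is illustrative only: clientNs[i] must be the client
// count for time-series row i, which holds when both slices were built
// row by row in the same order.
func checkAligned(seriesLen int, clientNs []int64) error {
	if seriesLen != len(clientNs) {
		return fmt.Errorf("len(TimeSeries) %d != len(clientNs) %d", seriesLen, len(clientNs))
	}
	return nil
}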
@@ -290,11 +283,11 @@ func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg TestGroup, st report
}
}
func (cfg *Config) saveAllStats(gcfg TestGroup, stats report.Stats, tsToClientN map[int64]int64) {
func (cfg *Config) saveAllStats(gcfg TestGroup, stats report.Stats, clientNs []int64) {
cfg.saveDataLatencyDistributionSummary(stats)
cfg.saveDataLatencyDistributionPercentile(stats)
cfg.saveDataLatencyDistributionAll(stats)
cfg.saveDataLatencyThroughputTimeseries(gcfg, stats, tsToClientN)
cfg.saveDataLatencyThroughputTimeseries(gcfg, stats, clientNs)
}
// UploadToGoogle uploads target file to Google Cloud Storage.

View File

@@ -92,8 +92,8 @@ func (cfg *Config) Stress(databaseID string) error {
}
plog.Info("combining all reports")
tsToClientN := make(map[int64]int64, gcfg.BenchmarkOptions.RequestNumber)
combined := report.Stats{ErrorDist: make(map[string]int)}
combinedClientNumber := make([]int64, 0, gcfg.BenchmarkOptions.RequestNumber)
for i, st := range stats {
combined.AvgTotal += st.AvgTotal
combined.Total += st.Total
@@ -112,19 +112,15 @@ func (cfg *Config) Stress(databaseID string) error {
// 1486389258, "700", 23188 === ending of previous combined.TimeSeries
// 1486389258, 1000, 5739 === beginning of current st.TimeSeries
//
// And the line below will overwrite the 'client-number' as:
//
// unix-second, client-number, throughput
// 1486389257, 700, 30335 === ending of previous combined.TimeSeries
// 1486389258, "1000", 23188 === ending of previous combined.TimeSeries
// 1486389258, 1000, 5739 === beginning of current st.TimeSeries
//
// So now we have two rows sharing the same unix second.
// These duplicates are handled later, when aggregating by keys.
//
clientsN := gcfg.BenchmarkOptions.ConnectionClientNumbers[i]
for _, v := range st.TimeSeries {
tsToClientN[v.Timestamp] = clientsN
clientN := gcfg.BenchmarkOptions.ConnectionClientNumbers[i]
clientNs := make([]int64, len(st.TimeSeries))
for i := range st.TimeSeries {
clientNs[i] = clientN
}
combinedClientNumber = append(combinedClientNumber, clientNs...)
for k, v := range st.ErrorDist {
if _, ok := combined.ErrorDist[k]; !ok {
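A tiny runnable illustration of the boundary case described in the comment above, using its numbers: unix second 1486389258 ends one group (700 clients) and begins the next (1000 clients). The old timestamp-keyed map can keep only one client number for that second, while the per-row slice keeps both:

package main

import "fmt"

func main() {
	// two adjacent test groups overlap on unix second 1486389258
	timestamps := []int64{1486389257, 1486389258, 1486389258}

	// old approach: keyed by unix second, so the second group's 1000
	// overwrites the first group's 700 for the shared second
	tsToClientN := map[int64]int64{}
	tsToClientN[1486389257] = 700
	tsToClientN[1486389258] = 700  // first group's boundary row
	tsToClientN[1486389258] = 1000 // second group's first row wins

	// new approach: one client number per row, appended in order
	combinedClientNumber := []int64{700, 700, 1000}

	for i, ts := range timestamps {
		fmt.Printf("second %d: map %d clients, slice %d clients\n",
			ts, tsToClientN[ts], combinedClientNumber[i])
	}
}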
@@ -134,32 +130,9 @@ func (cfg *Config) Stress(databaseID string) error {
}
}
}
// handle duplicate unix seconds around boundaries
sec2dp := make(map[int64]report.DataPoint)
for _, tss := range combined.TimeSeries {
v, ok := sec2dp[tss.Timestamp]
if !ok {
sec2dp[tss.Timestamp] = tss
}
// two data points share the same unix second
if v.MinLatency > tss.MinLatency {
v.MinLatency = tss.MinLatency
}
if v.MaxLatency < tss.MaxLatency {
v.MaxLatency = tss.MaxLatency
}
v.AvgLatency = (v.AvgLatency + tss.AvgLatency) / time.Duration(2)
v.ThroughPut += tss.ThroughPut
sec2dp[tss.Timestamp] = v
if len(combined.TimeSeries) != len(combinedClientNumber) {
return fmt.Errorf("len(combined.TimeSeries) %d != len(combinedClientNumber) %d", len(combined.TimeSeries), len(combinedClientNumber))
}
var fts report.TimeSeries
for _, dp := range sec2dp {
fts = append(fts, dp)
}
sort.Sort(report.TimeSeries(fts))
combined.TimeSeries = fts
combined.Average = combined.AvgTotal / float64(len(combined.Lats))
combined.RPS = float64(len(combined.Lats)) / combined.Total.Seconds()
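For the two summary figures computed above, a quick worked example with made-up numbers, assuming AvgTotal accumulates per-request latencies in seconds (which is what the division by len(Lats) suggests):

package main

import (
	"fmt"
	"time"
)

func main() {
	// hypothetical run: three requests taking 4ms, 6ms and 5ms,
	// completed over 1.2s of wall-clock time
	lats := []float64{0.004, 0.006, 0.005} // seconds
	total := 1200 * time.Millisecond

	avgTotal := 0.0
	for _, l := range lats {
		avgTotal += l
	}
	average := avgTotal / float64(len(lats))    // 0.005 s per request
	rps := float64(len(lats)) / total.Seconds() // 2.5 requests/sec
	fmt.Println(average, rps)
}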
@@ -179,7 +152,7 @@ func (cfg *Config) Stress(databaseID string) error {
plog.Info("combined all reports")
printStats(combined)
cfg.saveAllStats(gcfg, combined, tsToClientN)
cfg.saveAllStats(gcfg, combined, combinedClientNumber)
}
plog.Println("write generateReport is finished...")