*: don't use map to filter out the duplicate unix seconds

Gyu-Ho Lee 2017-02-06 13:28:19 -08:00
parent 4e4ad7436c
commit 1bb1099a64
GPG Key ID: 1DDD39C7EB70C24C
3 changed files with 15 additions and 49 deletions

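Background for the change: the report combiner keyed per-datapoint client counts by unix second (tsToClientN), so when two benchmark runs' time series met at a boundary second, the later write silently replaced the earlier one. A minimal standalone sketch (not code from this repository) of that failure mode, using the example values from the diff comments below:

package main

import "fmt"

func main() {
	// Two runs meet at unix second 1486389258: the first run's last data
	// point and the second run's first data point share the same second.
	timestamps := []int64{1486389257, 1486389258, 1486389258}
	perPointClients := []int64{700, 700, 1000}

	// Old approach: a map keyed by unix second. The second write to
	// 1486389258 overwrites the first, losing one data point's client count.
	tsToClientN := make(map[int64]int64)
	for i, ts := range timestamps {
		tsToClientN[ts] = perPointClients[i]
	}
	fmt.Println(len(tsToClientN)) // 2 entries for 3 data points

	// New approach: a slice aligned index-for-index with the time series
	// keeps every data point, duplicate seconds included.
	clientNs := make([]int64, len(timestamps))
	copy(clientNs, perPointClients)
	fmt.Println(len(clientNs)) // 3 entries
}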
View File

@@ -205,7 +205,7 @@ func (data *analyzeData) aggregateAll(memoryByKeyPath string, totalRequests int6
 
 	if v, ok := sec2minVMRSSMB[ts]; !ok {
 		sec2minVMRSSMB[ts] = vv
-	} else if v > vv {
+	} else if v > vv || (v == 0.0 && vv != 0.0) {
 		sec2minVMRSSMB[ts] = vv
 	}
 	if v, ok := sec2maxVMRSSMB[ts]; !ok {

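The widened condition keeps a per-second minimum while treating a stored 0.0 as "no sample yet", so a real nonzero reading can still replace it. A minimal sketch of that update rule (the values are hypothetical; only sec2minVMRSSMB and the condition come from the hunk above):

package main

import "fmt"

func main() {
	// A placeholder 0.0 is already stored for this second.
	sec2minVMRSSMB := map[int64]float64{1486389257: 0.0}

	ts, vv := int64(1486389257), 123.4 // a real VmRSS sample in MB
	if v, ok := sec2minVMRSSMB[ts]; !ok {
		sec2minVMRSSMB[ts] = vv
	} else if v > vv || (v == 0.0 && vv != 0.0) {
		// Without the "v == 0.0" clause, the placeholder 0.0 would win
		// every comparison and the tracked minimum would stay at zero.
		sec2minVMRSSMB[ts] = vv
	}
	fmt.Println(sec2minVMRSSMB[ts]) // 123.4
}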
View File

@@ -210,7 +210,7 @@ func (cfg *Config) saveDataLatencyDistributionAll(st report.Stats) {
 	}
 }
 
-func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg TestGroup, st report.Stats, tsToClientN map[int64]int64) {
+func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg TestGroup, st report.Stats, clientNs []int64) {
 	c1 := dataframe.NewColumn("UNIX-SECOND")
 	c2 := dataframe.NewColumn("CONTROL-CLIENT-NUM")
 	c3 := dataframe.NewColumn("MIN-LATENCY-MS")
@@ -220,17 +220,10 @@ func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg TestGroup, st report
 	for i := range st.TimeSeries {
 		// this Timestamp is unix seconds
 		c1.PushBack(dataframe.NewStringValue(fmt.Sprintf("%d", st.TimeSeries[i].Timestamp)))
-		if len(tsToClientN) == 0 {
-			c2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%d", gcfg.ClientNumber)))
-		} else {
-			c2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%d", tsToClientN[st.TimeSeries[i].Timestamp])))
-		}
+		c2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%d", clientNs[i])))
 		c3.PushBack(dataframe.NewStringValue(fmt.Sprintf("%f", toMillisecond(st.TimeSeries[i].MinLatency))))
 		c4.PushBack(dataframe.NewStringValue(fmt.Sprintf("%f", toMillisecond(st.TimeSeries[i].AvgLatency))))
 		c5.PushBack(dataframe.NewStringValue(fmt.Sprintf("%f", toMillisecond(st.TimeSeries[i].MaxLatency))))
 		c6.PushBack(dataframe.NewStringValue(fmt.Sprintf("%d", st.TimeSeries[i].ThroughPut)))
 	}
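With the map gone, each CSV row takes its client number by row index rather than by timestamp, so two rows sharing a unix second keep distinct values. A minimal sketch with simplified types (dataPoint stands in for the repo's report.DataPoint):

package main

import "fmt"

type dataPoint struct {
	Timestamp  int64
	ThroughPut int64
}

func main() {
	timeSeries := []dataPoint{
		{1486389257, 30335},
		{1486389258, 23188}, // last point of the 700-client run
		{1486389258, 5739},  // first point of the 1000-client run
	}
	clientNs := []int64{700, 700, 1000} // index-aligned with timeSeries

	// unix-second, client-number, throughput
	for i := range timeSeries {
		fmt.Printf("%d, %d, %d\n", timeSeries[i].Timestamp, clientNs[i], timeSeries[i].ThroughPut)
	}
}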
@@ -290,11 +283,11 @@ func (cfg *Config) saveDataLatencyThroughputTimeseries(gcfg TestGroup, st report
 	}
 }
 
-func (cfg *Config) saveAllStats(gcfg TestGroup, stats report.Stats, tsToClientN map[int64]int64) {
+func (cfg *Config) saveAllStats(gcfg TestGroup, stats report.Stats, clientNs []int64) {
 	cfg.saveDataLatencyDistributionSummary(stats)
 	cfg.saveDataLatencyDistributionPercentile(stats)
 	cfg.saveDataLatencyDistributionAll(stats)
-	cfg.saveDataLatencyThroughputTimeseries(gcfg, stats, tsToClientN)
+	cfg.saveDataLatencyThroughputTimeseries(gcfg, stats, clientNs)
 }
 
 // UploadToGoogle uploads target file to Google Cloud Storage.

View File

@@ -92,8 +92,8 @@ func (cfg *Config) Stress(databaseID string) error {
 	}
 	plog.Info("combining all reports")
 
-	tsToClientN := make(map[int64]int64, gcfg.BenchmarkOptions.RequestNumber)
 	combined := report.Stats{ErrorDist: make(map[string]int)}
+	combinedClientNumber := make([]int64, 0, gcfg.BenchmarkOptions.RequestNumber)
 	for i, st := range stats {
 		combined.AvgTotal += st.AvgTotal
 		combined.Total += st.Total
@@ -112,19 +112,15 @@ func (cfg *Config) Stress(databaseID string) error {
 		// 1486389258, "700", 23188 === ending of previous combined.TimeSeries
 		// 1486389258, 1000, 5739 === beginning of current st.TimeSeries
 		//
-		// And the line below will overwrite the 'client-number' as:
-		//
-		// unix-second, client-number, throughput
-		// 1486389257, 700, 30335 === ending of previous combined.TimeSeries
-		// 1486389258, "1000", 23188 === ending of previous combined.TimeSeries
-		// 1486389258, 1000, 5739 === beginning of current st.TimeSeries
-		//
 		// So now we have two duplicate unix time seconds.
+		// This will be handled in aggregating by keys.
 		//
-		clientsN := gcfg.BenchmarkOptions.ConnectionClientNumbers[i]
-		for _, v := range st.TimeSeries {
-			tsToClientN[v.Timestamp] = clientsN
+		clientN := gcfg.BenchmarkOptions.ConnectionClientNumbers[i]
+		clientNs := make([]int64, len(st.TimeSeries))
+		for i := range st.TimeSeries {
+			clientNs[i] = clientN
 		}
+		combinedClientNumber = append(combinedClientNumber, clientNs...)
 
 		for k, v := range st.ErrorDist {
 			if _, ok := combined.ErrorDist[k]; !ok {
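Each run now contributes len(st.TimeSeries) copies of its own client count, keeping combinedClientNumber parallel to the concatenated time series. A minimal sketch with hypothetical run sizes:

package main

import "fmt"

func main() {
	runSizes := []int{3, 2}                       // hypothetical len(st.TimeSeries) per run
	connectionClientNumbers := []int64{700, 1000} // one client count per run

	var combinedClientNumber []int64
	for i, size := range runSizes {
		clientN := connectionClientNumbers[i]
		clientNs := make([]int64, size)
		for j := range clientNs {
			clientNs[j] = clientN
		}
		combinedClientNumber = append(combinedClientNumber, clientNs...)
	}
	// Boundary duplicates keep the client count of the run they came from.
	fmt.Println(combinedClientNumber) // [700 700 700 1000 1000]
}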
@@ -134,32 +130,9 @@ func (cfg *Config) Stress(databaseID string) error {
 			}
 		}
 	}
-
-	// handle duplicate unix seconds around boundaries
-	sec2dp := make(map[int64]report.DataPoint)
-	for _, tss := range combined.TimeSeries {
-		v, ok := sec2dp[tss.Timestamp]
-		if !ok {
-			sec2dp[tss.Timestamp] = tss
-		}
-		// two datapoints share the time unix second
-		if v.MinLatency > tss.MinLatency {
-			v.MinLatency = tss.MinLatency
-		}
-		if v.MaxLatency < tss.MaxLatency {
-			v.MaxLatency = tss.MaxLatency
-		}
-		v.AvgLatency = (v.AvgLatency + tss.AvgLatency) / time.Duration(2)
-		v.ThroughPut += tss.ThroughPut
-		sec2dp[tss.Timestamp] = v
-	}
+	if len(combined.TimeSeries) != len(combinedClientNumber) {
+		return fmt.Errorf("len(combined.TimeSeries) %d != len(combinedClientNumber) %d", len(combined.TimeSeries), len(combinedClientNumber))
+	}
 
-	var fts report.TimeSeries
-	for _, dp := range sec2dp {
-		fts = append(fts, dp)
-	}
-	sort.Sort(report.TimeSeries(fts))
-	combined.TimeSeries = fts
-
 	combined.Average = combined.AvgTotal / float64(len(combined.Lats))
 	combined.RPS = float64(len(combined.Lats)) / combined.Total.Seconds()
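One weakness of the removed merge, which may be part of why raw points are now kept: folding AvgLatency pairwise as (v + tss) / 2 weights each later sample as heavily as all earlier ones combined. A minimal sketch of that drift (plain float64 in place of time.Duration):

package main

import "fmt"

func main() {
	samples := []float64{10, 10, 10, 100} // latencies sharing one unix second

	// Pairwise fold, as in the removed code.
	pairwise := samples[0]
	for _, s := range samples[1:] {
		pairwise = (pairwise + s) / 2
	}

	// True mean of the same samples.
	var sum float64
	for _, s := range samples {
		sum += s
	}

	fmt.Println(pairwise)                    // 55, dominated by the last sample
	fmt.Println(sum / float64(len(samples))) // 32.5
}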
@@ -179,7 +152,7 @@ func (cfg *Config) Stress(databaseID string) error {
 
 		plog.Info("combined all reports")
 		printStats(combined)
-		cfg.saveAllStats(gcfg, combined, tsToClientN)
+		cfg.saveAllStats(gcfg, combined, combinedClientNumber)
 	}
 
 	plog.Println("write generateReport is finished...")