From febce6301770ae7d9d07290aaa43496d4e0f1fe6 Mon Sep 17 00:00:00 2001
From: Gyu-Ho Lee
Date: Tue, 7 Feb 2017 12:30:15 -0800
Subject: [PATCH] *: fix memory aggregation

---
 analyze/04_aggregate_all_analyze_data.go |  27 +++---
 find_ranges.go                           |  35 +++++---
 find_ranges_test.go                      | 103 ++++++++++++-----------
 3 files changed, 89 insertions(+), 76 deletions(-)

diff --git a/analyze/04_aggregate_all_analyze_data.go b/analyze/04_aggregate_all_analyze_data.go
index 4c826f6e..48326da0 100644
--- a/analyze/04_aggregate_all_analyze_data.go
+++ b/analyze/04_aggregate_all_analyze_data.go
@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"strings"
 
+	"github.com/coreos/dbtester"
 	"github.com/gyuho/dataframe"
 )
 
@@ -407,7 +408,7 @@ func (data *analyzeData) aggregateAll(memoryByKeyPath string, totalRequests int6
 		return fmt.Errorf("SECOND column count %d, AVG-THROUGHPUT column count %d", colUnixSecond.Count(), colAvgThroughput.Count())
 	}
 
-	var tslice []keyNumAndMemory
+	var cdata []dbtester.CumulativeKeyNumAndMemory
 	for i := 0; i < colUnixSecond.Count(); i++ {
 		vv0, err := colUnixSecond.Value(i)
 		if err != nil {
@@ -427,26 +428,28 @@ func (data *analyzeData) aggregateAll(memoryByKeyPath string, totalRequests int6
 		}
 		vf2, _ := vv2.Float64()
 
-		point := keyNumAndMemory{
-			keyNum:      int64(vf2),
-			minMemoryMB: sec2minVMRSSMB[v0],
-			avgMemoryMB: vf1,
-			maxMemoryMB: sec2maxVMRSSMB[v0],
+		point := dbtester.CumulativeKeyNumAndMemory{
+			UnixSecond: v0,
+			Throughput: int64(vf2),
+
+			MinMemoryMB: sec2minVMRSSMB[v0],
+			AvgMemoryMB: vf1,
+			MaxMemoryMB: sec2maxVMRSSMB[v0],
 		}
-		tslice = append(tslice, point)
+		cdata = append(cdata, point)
 	}
 
 	// aggregate memory by number of keys
-	knms := findRangesMemory(tslice, 1000, totalRequests)
+	knms := dbtester.FindRangesMemory(cdata, 1000, totalRequests)
 	ckk1 := dataframe.NewColumn("KEYS")
 	ckk2 := dataframe.NewColumn("MIN-VMRSS-MB")
 	ckk3 := dataframe.NewColumn("AVG-VMRSS-MB")
 	ckk4 := dataframe.NewColumn("MAX-VMRSS-MB")
 	for i := range knms {
-		ckk1.PushBack(dataframe.NewStringValue(knms[i].keyNum))
-		ckk2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].minMemoryMB)))
-		ckk3.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].avgMemoryMB)))
-		ckk4.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].maxMemoryMB)))
+		ckk1.PushBack(dataframe.NewStringValue(knms[i].CumulativeKeyNum))
+		ckk2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].MinMemoryMB)))
+		ckk3.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].AvgMemoryMB)))
+		ckk4.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].MaxMemoryMB)))
 	}
 	fr := dataframe.New()
 	if err := fr.AddColumn(ckk1); err != nil {
diff --git a/find_ranges.go b/find_ranges.go
index d5dba7d5..9e1a4860 100644
--- a/find_ranges.go
+++ b/find_ranges.go
@@ -32,7 +32,7 @@ type CumulativeKeyNumToAvgLatency struct {
 	MaxLatency time.Duration
 }
 
-// CumulativeKeyNumToAvgLatencySlice is a slice of CumulativeKeyNumToAvgLatency.
+// CumulativeKeyNumToAvgLatencySlice is a slice of CumulativeKeyNumToAvgLatency to sort by CumulativeKeyNum.
 type CumulativeKeyNumToAvgLatencySlice []CumulativeKeyNumToAvgLatency
 
 func (t CumulativeKeyNumToAvgLatencySlice) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }
@@ -100,7 +100,8 @@ func FindRangesLatency(data report.TimeSeries, unit int64, totalRequests int64)
 	}
 
 	kss := []CumulativeKeyNumToAvgLatency{}
-	delete(rm, 0)
+	delete(rm, 0) // drop data at beginning
+
 	for k, v := range rm {
 		// make sure to use 'k' as CumulativeKeyNum
 		kss = append(kss, CumulativeKeyNumToAvgLatency{
@@ -111,8 +112,7 @@ func FindRangesLatency(data report.TimeSeries, unit int64, totalRequests int64)
 		})
 	}
 
-	// sort by cumulative throughput (number of keys)
-	// in ascending order
+	// sort by cumulative throughput (number of keys) in ascending order
 	sort.Sort(CumulativeKeyNumToAvgLatencySlice(kss))
 	return kss
 }
@@ -121,6 +121,9 @@ func FindRangesLatency(data report.TimeSeries, unit int64, totalRequests int64)
 // and according memory data. So the higher 'CumulativeKeyNum' is,
 // the later the data points are in the time series.
 type CumulativeKeyNumAndMemory struct {
+	UnixSecond int64
+	Throughput int64
+
 	CumulativeKeyNum int64
 
 	MinMemoryMB float64
@@ -128,7 +131,7 @@ type CumulativeKeyNumAndMemory struct {
 	MaxMemoryMB float64
 }
 
-// CumulativeKeyNumAndMemorySlice is a slice of CumulativeKeyNumAndMemory.
+// CumulativeKeyNumAndMemorySlice is a slice of CumulativeKeyNumAndMemory to sort by CumulativeKeyNum.
 type CumulativeKeyNumAndMemorySlice []CumulativeKeyNumAndMemory
 
 func (t CumulativeKeyNumAndMemorySlice) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }
@@ -137,14 +140,23 @@ func (t CumulativeKeyNumAndMemorySlice) Less(i, j int) bool {
 	return t[i].CumulativeKeyNum < t[j].CumulativeKeyNum
 }
 
+// CumulativeKeyNumAndMemoryByUnixSecond is a slice of CumulativeKeyNumAndMemory to sort by UnixSecond.
+type CumulativeKeyNumAndMemoryByUnixSecond []CumulativeKeyNumAndMemory
+
+func (t CumulativeKeyNumAndMemoryByUnixSecond) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
+func (t CumulativeKeyNumAndMemoryByUnixSecond) Len() int      { return len(t) }
+func (t CumulativeKeyNumAndMemoryByUnixSecond) Less(i, j int) bool {
+	return t[i].UnixSecond < t[j].UnixSecond
+}
+
 // FindRangesMemory sorts all data points by its timestamp.
 // And then aggregate by the cumulative throughput,
 // in order to map the number of keys to the average memory usage.
 func FindRangesMemory(data []CumulativeKeyNumAndMemory, unit int64, totalRequests int64) CumulativeKeyNumAndMemorySlice {
-	// TODO: need to sort by timestamps because we want the 'cumulative'
+	// need to sort by timestamps because we want the 'cumulative'
 	// trends as we write more keys, 'report.TimeSeries' already implements
 	// sort interface, so just sort.Sort(data)
-	//
+	sort.Sort(CumulativeKeyNumAndMemoryByUnixSecond(data))
 
 	cumulKeyN := int64(0)
 	maxKey := int64(0)
@@ -156,18 +168,16 @@ func FindRangesMemory(data []CumulativeKeyNumAndMemory, unit int64, totalRequest
 	// so the range is the key
 	// and the value is the cumulative throughput
 	for _, ts := range data {
-		cumulKeyN += ts.CumulativeKeyNum
+		cumulKeyN += ts.Throughput
 		if cumulKeyN < unit {
 			// not enough data points yet
 			continue
 		}
 
-		mem := ts
-
 		// cumulKeyN >= unit
 		for cumulKeyN > maxKey {
 			maxKey += unit
-			rm[maxKey] = mem
+			rm[maxKey] = ts
 		}
 	}
 
@@ -194,8 +204,7 @@ func FindRangesMemory(data []CumulativeKeyNumAndMemory, unit int64, totalRequest
 		})
 	}
 
-	// sort by cumulative throughput (number of keys)
-	// in ascending order
+	// sort by cumulative throughput (number of keys) in ascending order
 	sort.Sort(CumulativeKeyNumAndMemorySlice(kss))
 	return kss
 }
diff --git a/find_ranges_test.go b/find_ranges_test.go
index 7a89bc72..06afa1d9 100644
--- a/find_ranges_test.go
+++ b/find_ranges_test.go
@@ -22,57 +22,6 @@ import (
 	"github.com/coreos/dbtester/pkg/report"
 )
 
-func TestFindRangesMemory(t *testing.T) {
-	var data []CumulativeKeyNumAndMemory
-	for i := int64(0); i < 10; i++ {
-		dp := CumulativeKeyNumAndMemory{
-			CumulativeKeyNum: 50,
-			AvgMemoryMB:      float64(i + 1),
-		}
-		data = append(data, dp)
-	}
-
-	pss := FindRangesMemory(data, 20, 555)
-	expexcted := []CumulativeKeyNumAndMemory{
-		{CumulativeKeyNum: 20, AvgMemoryMB: 1},
-		{CumulativeKeyNum: 40, AvgMemoryMB: 1},
-		{CumulativeKeyNum: 60, AvgMemoryMB: 1},
-		{CumulativeKeyNum: 80, AvgMemoryMB: 2},
-		{CumulativeKeyNum: 100, AvgMemoryMB: 2},
-		{CumulativeKeyNum: 120, AvgMemoryMB: 3},
-		{CumulativeKeyNum: 140, AvgMemoryMB: 3},
-		{CumulativeKeyNum: 160, AvgMemoryMB: 3},
-		{CumulativeKeyNum: 180, AvgMemoryMB: 4},
-		{CumulativeKeyNum: 200, AvgMemoryMB: 4},
-		{CumulativeKeyNum: 220, AvgMemoryMB: 5},
-		{CumulativeKeyNum: 240, AvgMemoryMB: 5},
-		{CumulativeKeyNum: 260, AvgMemoryMB: 5},
-		{CumulativeKeyNum: 280, AvgMemoryMB: 6},
-		{CumulativeKeyNum: 300, AvgMemoryMB: 6},
-		{CumulativeKeyNum: 320, AvgMemoryMB: 7},
-		{CumulativeKeyNum: 340, AvgMemoryMB: 7},
-		{CumulativeKeyNum: 360, AvgMemoryMB: 7},
-		{CumulativeKeyNum: 380, AvgMemoryMB: 8},
-		{CumulativeKeyNum: 400, AvgMemoryMB: 8},
-		{CumulativeKeyNum: 420, AvgMemoryMB: 9},
-		{CumulativeKeyNum: 440, AvgMemoryMB: 9},
-		{CumulativeKeyNum: 460, AvgMemoryMB: 9},
-		{CumulativeKeyNum: 480, AvgMemoryMB: 10},
-		{CumulativeKeyNum: 500, AvgMemoryMB: 10},
-		{CumulativeKeyNum: 520, AvgMemoryMB: 0},
-		{CumulativeKeyNum: 540, AvgMemoryMB: 0},
-		{CumulativeKeyNum: 555, AvgMemoryMB: 0},
-	}
-	if len(pss) != len(expexcted) {
-		t.Fatalf("expected %+v, got %+v", expexcted, pss)
-	}
-	for i, elem := range pss {
-		if !reflect.DeepEqual(elem, expexcted[i]) {
-			t.Fatalf("#%d: processed data point expected %+v, got %+v", i, expexcted[i], elem)
-		}
-	}
-}
-
 func TestFindRangesLatency(t *testing.T) {
 	var data report.TimeSeries
 	for i := int64(0); i < 10; i++ {
@@ -124,3 +73,55 @@ func TestFindRangesLatency(t *testing.T) {
 		}
 	}
 }
+
+func TestFindRangesMemory(t *testing.T) {
+	var data []CumulativeKeyNumAndMemory
+	for i := int64(0); i < 10; i++ {
+		dp := CumulativeKeyNumAndMemory{
+			UnixSecond:  i + 1,
+			Throughput:  50,
+			AvgMemoryMB: float64(i + 1),
+		}
+		data = append(data, dp)
+	}
+
+	pss := FindRangesMemory(data, 20, 555)
+	expexcted := []CumulativeKeyNumAndMemory{
+		{CumulativeKeyNum: 20, AvgMemoryMB: 1},
+		{CumulativeKeyNum: 40, AvgMemoryMB: 1},
+		{CumulativeKeyNum: 60, AvgMemoryMB: 1},
+		{CumulativeKeyNum: 80, AvgMemoryMB: 2},
+		{CumulativeKeyNum: 100, AvgMemoryMB: 2},
+		{CumulativeKeyNum: 120, AvgMemoryMB: 3},
+		{CumulativeKeyNum: 140, AvgMemoryMB: 3},
+		{CumulativeKeyNum: 160, AvgMemoryMB: 3},
+		{CumulativeKeyNum: 180, AvgMemoryMB: 4},
+		{CumulativeKeyNum: 200, AvgMemoryMB: 4},
+		{CumulativeKeyNum: 220, AvgMemoryMB: 5},
+		{CumulativeKeyNum: 240, AvgMemoryMB: 5},
+		{CumulativeKeyNum: 260, AvgMemoryMB: 5},
+		{CumulativeKeyNum: 280, AvgMemoryMB: 6},
+		{CumulativeKeyNum: 300, AvgMemoryMB: 6},
+		{CumulativeKeyNum: 320, AvgMemoryMB: 7},
+		{CumulativeKeyNum: 340, AvgMemoryMB: 7},
+		{CumulativeKeyNum: 360, AvgMemoryMB: 7},
+		{CumulativeKeyNum: 380, AvgMemoryMB: 8},
+		{CumulativeKeyNum: 400, AvgMemoryMB: 8},
+		{CumulativeKeyNum: 420, AvgMemoryMB: 9},
+		{CumulativeKeyNum: 440, AvgMemoryMB: 9},
+		{CumulativeKeyNum: 460, AvgMemoryMB: 9},
+		{CumulativeKeyNum: 480, AvgMemoryMB: 10},
+		{CumulativeKeyNum: 500, AvgMemoryMB: 10},
+		{CumulativeKeyNum: 520, AvgMemoryMB: 0},
+		{CumulativeKeyNum: 540, AvgMemoryMB: 0},
+		{CumulativeKeyNum: 555, AvgMemoryMB: 0},
+	}
+	if len(pss) != len(expexcted) {
+		t.Fatalf("expected %+v, got %+v", expexcted, pss)
+	}
+	for i, elem := range pss {
+		if !reflect.DeepEqual(elem, expexcted[i]) {
+			t.Fatalf("#%d: processed data point expected %+v, got %+v", i, expexcted[i], elem)
+		}
+	}
+}
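
Note for reviewers (illustration only, not part of the commit): a minimal sketch of how the new aggregation path is meant to be driven, using only the exported dbtester.CumulativeKeyNumAndMemory and dbtester.FindRangesMemory shown above. The fix is that the cumulative key count now comes from Throughput accumulated in UnixSecond order, rather than from CumulativeKeyNum itself, so the buckets follow the actual write timeline.

package main

import (
	"fmt"

	"github.com/coreos/dbtester"
)

func main() {
	// Ten one-second samples, each writing 50 keys (same shape as the test above).
	var data []dbtester.CumulativeKeyNumAndMemory
	for i := int64(0); i < 10; i++ {
		data = append(data, dbtester.CumulativeKeyNumAndMemory{
			UnixSecond:  i + 1,
			Throughput:  50,
			AvgMemoryMB: float64(i + 1),
		})
	}

	// Aggregate into 20-key buckets for 555 total requests; each bucket reports
	// the memory reading observed once that many keys had been written.
	for _, p := range dbtester.FindRangesMemory(data, 20, 555) {
		fmt.Printf("keys=%d avg-vmrss=%.2f MB\n", p.CumulativeKeyNum, p.AvgMemoryMB)
	}
}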