*: fix memory aggregation

This commit is contained in:
Gyu-Ho Lee 2017-02-07 12:30:15 -08:00
parent ad579290da
commit febce63017
No known key found for this signature in database
GPG Key ID: 1DDD39C7EB70C24C
3 changed files with 89 additions and 76 deletions

View File

@ -18,6 +18,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/coreos/dbtester"
"github.com/gyuho/dataframe" "github.com/gyuho/dataframe"
) )
@ -407,7 +408,7 @@ func (data *analyzeData) aggregateAll(memoryByKeyPath string, totalRequests int6
return fmt.Errorf("SECOND column count %d, AVG-THROUGHPUT column count %d", colUnixSecond.Count(), colAvgThroughput.Count()) return fmt.Errorf("SECOND column count %d, AVG-THROUGHPUT column count %d", colUnixSecond.Count(), colAvgThroughput.Count())
} }
var tslice []keyNumAndMemory var cdata []dbtester.CumulativeKeyNumAndMemory
for i := 0; i < colUnixSecond.Count(); i++ { for i := 0; i < colUnixSecond.Count(); i++ {
vv0, err := colUnixSecond.Value(i) vv0, err := colUnixSecond.Value(i)
if err != nil { if err != nil {
@ -427,26 +428,28 @@ func (data *analyzeData) aggregateAll(memoryByKeyPath string, totalRequests int6
} }
vf2, _ := vv2.Float64() vf2, _ := vv2.Float64()
point := keyNumAndMemory{ point := dbtester.CumulativeKeyNumAndMemory{
keyNum: int64(vf2), UnixSecond: v0,
minMemoryMB: sec2minVMRSSMB[v0], Throughput: int64(vf2),
avgMemoryMB: vf1,
maxMemoryMB: sec2maxVMRSSMB[v0], MinMemoryMB: sec2minVMRSSMB[v0],
AvgMemoryMB: vf1,
MaxMemoryMB: sec2maxVMRSSMB[v0],
} }
tslice = append(tslice, point) cdata = append(cdata, point)
} }
// aggregate memory by number of keys // aggregate memory by number of keys
knms := findRangesMemory(tslice, 1000, totalRequests) knms := dbtester.FindRangesMemory(cdata, 1000, totalRequests)
ckk1 := dataframe.NewColumn("KEYS") ckk1 := dataframe.NewColumn("KEYS")
ckk2 := dataframe.NewColumn("MIN-VMRSS-MB") ckk2 := dataframe.NewColumn("MIN-VMRSS-MB")
ckk3 := dataframe.NewColumn("AVG-VMRSS-MB") ckk3 := dataframe.NewColumn("AVG-VMRSS-MB")
ckk4 := dataframe.NewColumn("MAX-VMRSS-MB") ckk4 := dataframe.NewColumn("MAX-VMRSS-MB")
for i := range knms { for i := range knms {
ckk1.PushBack(dataframe.NewStringValue(knms[i].keyNum)) ckk1.PushBack(dataframe.NewStringValue(knms[i].CumulativeKeyNum))
ckk2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].minMemoryMB))) ckk2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].MinMemoryMB)))
ckk3.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].avgMemoryMB))) ckk3.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].AvgMemoryMB)))
ckk4.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].maxMemoryMB))) ckk4.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].MaxMemoryMB)))
} }
fr := dataframe.New() fr := dataframe.New()
if err := fr.AddColumn(ckk1); err != nil { if err := fr.AddColumn(ckk1); err != nil {

View File

@ -32,7 +32,7 @@ type CumulativeKeyNumToAvgLatency struct {
MaxLatency time.Duration MaxLatency time.Duration
} }
// CumulativeKeyNumToAvgLatencySlice is a slice of CumulativeKeyNumToAvgLatency. // CumulativeKeyNumToAvgLatencySlice is a slice of CumulativeKeyNumToAvgLatency to sort by CumulativeKeyNum.
type CumulativeKeyNumToAvgLatencySlice []CumulativeKeyNumToAvgLatency type CumulativeKeyNumToAvgLatencySlice []CumulativeKeyNumToAvgLatency
func (t CumulativeKeyNumToAvgLatencySlice) Swap(i, j int) { t[i], t[j] = t[j], t[i] } func (t CumulativeKeyNumToAvgLatencySlice) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
@ -100,7 +100,8 @@ func FindRangesLatency(data report.TimeSeries, unit int64, totalRequests int64)
} }
kss := []CumulativeKeyNumToAvgLatency{} kss := []CumulativeKeyNumToAvgLatency{}
delete(rm, 0) delete(rm, 0) // drop data at beginning
for k, v := range rm { for k, v := range rm {
// make sure to use 'k' as CumulativeKeyNum // make sure to use 'k' as CumulativeKeyNum
kss = append(kss, CumulativeKeyNumToAvgLatency{ kss = append(kss, CumulativeKeyNumToAvgLatency{
@ -111,8 +112,7 @@ func FindRangesLatency(data report.TimeSeries, unit int64, totalRequests int64)
}) })
} }
// sort by cumulative throughput (number of keys) // sort by cumulative throughput (number of keys) in ascending order
// in ascending order
sort.Sort(CumulativeKeyNumToAvgLatencySlice(kss)) sort.Sort(CumulativeKeyNumToAvgLatencySlice(kss))
return kss return kss
} }
@ -121,6 +121,9 @@ func FindRangesLatency(data report.TimeSeries, unit int64, totalRequests int64)
// and according memory data. So the higher 'CumulativeKeyNum' is, // and according memory data. So the higher 'CumulativeKeyNum' is,
// the later the data points are in the time series. // the later the data points are in the time series.
type CumulativeKeyNumAndMemory struct { type CumulativeKeyNumAndMemory struct {
UnixSecond int64
Throughput int64
CumulativeKeyNum int64 CumulativeKeyNum int64
MinMemoryMB float64 MinMemoryMB float64
@ -128,7 +131,7 @@ type CumulativeKeyNumAndMemory struct {
MaxMemoryMB float64 MaxMemoryMB float64
} }
// CumulativeKeyNumAndMemorySlice is a slice of CumulativeKeyNumAndMemory. // CumulativeKeyNumAndMemorySlice is a slice of CumulativeKeyNumAndMemory to sort by CumulativeKeyNum.
type CumulativeKeyNumAndMemorySlice []CumulativeKeyNumAndMemory type CumulativeKeyNumAndMemorySlice []CumulativeKeyNumAndMemory
func (t CumulativeKeyNumAndMemorySlice) Swap(i, j int) { t[i], t[j] = t[j], t[i] } func (t CumulativeKeyNumAndMemorySlice) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
@ -137,14 +140,23 @@ func (t CumulativeKeyNumAndMemorySlice) Less(i, j int) bool {
return t[i].CumulativeKeyNum < t[j].CumulativeKeyNum return t[i].CumulativeKeyNum < t[j].CumulativeKeyNum
} }
// CumulativeKeyNumAndMemoryByUnixSecond sorts a slice of
// CumulativeKeyNumAndMemory data points by their UnixSecond
// timestamp, in ascending order.
type CumulativeKeyNumAndMemoryByUnixSecond []CumulativeKeyNumAndMemory

func (s CumulativeKeyNumAndMemoryByUnixSecond) Len() int { return len(s) }

func (s CumulativeKeyNumAndMemoryByUnixSecond) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

func (s CumulativeKeyNumAndMemoryByUnixSecond) Less(i, j int) bool {
	return s[i].UnixSecond < s[j].UnixSecond
}
// FindRangesMemory sorts all data points by its timestamp. // FindRangesMemory sorts all data points by its timestamp.
// And then aggregate by the cumulative throughput, // And then aggregate by the cumulative throughput,
// in order to map the number of keys to the average memory usage. // in order to map the number of keys to the average memory usage.
func FindRangesMemory(data []CumulativeKeyNumAndMemory, unit int64, totalRequests int64) CumulativeKeyNumAndMemorySlice { func FindRangesMemory(data []CumulativeKeyNumAndMemory, unit int64, totalRequests int64) CumulativeKeyNumAndMemorySlice {
// TODO: need to sort by timestamps because we want the 'cumulative' // need to sort by timestamps because we want the 'cumulative'
// trends as we write more keys, 'report.TimeSeries' already implements // trends as we write more keys, 'report.TimeSeries' already implements
// sort interface, so just sort.Sort(data) // sort interface, so just sort.Sort(data)
// sort.Sort(CumulativeKeyNumAndMemoryByUnixSecond(data))
cumulKeyN := int64(0) cumulKeyN := int64(0)
maxKey := int64(0) maxKey := int64(0)
@ -156,18 +168,16 @@ func FindRangesMemory(data []CumulativeKeyNumAndMemory, unit int64, totalRequest
// so the range is the key // so the range is the key
// and the value is the cumulative throughput // and the value is the cumulative throughput
for _, ts := range data { for _, ts := range data {
cumulKeyN += ts.CumulativeKeyNum cumulKeyN += ts.Throughput
if cumulKeyN < unit { if cumulKeyN < unit {
// not enough data points yet // not enough data points yet
continue continue
} }
mem := ts
// cumulKeyN >= unit // cumulKeyN >= unit
for cumulKeyN > maxKey { for cumulKeyN > maxKey {
maxKey += unit maxKey += unit
rm[maxKey] = mem rm[maxKey] = ts
} }
} }
@ -194,8 +204,7 @@ func FindRangesMemory(data []CumulativeKeyNumAndMemory, unit int64, totalRequest
}) })
} }
// sort by cumulative throughput (number of keys) // sort by cumulative throughput (number of keys) in ascending order
// in ascending order
sort.Sort(CumulativeKeyNumAndMemorySlice(kss)) sort.Sort(CumulativeKeyNumAndMemorySlice(kss))
return kss return kss
} }

View File

@ -22,57 +22,6 @@ import (
"github.com/coreos/dbtester/pkg/report" "github.com/coreos/dbtester/pkg/report"
) )
// TestFindRangesMemory verifies that FindRangesMemory buckets memory
// data points by cumulative key count in steps of 'unit' (20 here),
// carrying each point's AvgMemoryMB into every bucket it covers and
// leaving zero values past the last data point up to totalRequests.
func TestFindRangesMemory(t *testing.T) {
	// Ten points, each contributing 50 keys, with AvgMemoryMB
	// increasing 1..10.
	data := make([]CumulativeKeyNumAndMemory, 0, 10)
	for i := int64(1); i <= 10; i++ {
		data = append(data, CumulativeKeyNumAndMemory{
			CumulativeKeyNum: 50,
			AvgMemoryMB:      float64(i),
		})
	}

	pss := FindRangesMemory(data, 20, 555)

	expected := []CumulativeKeyNumAndMemory{
		{CumulativeKeyNum: 20, AvgMemoryMB: 1},
		{CumulativeKeyNum: 40, AvgMemoryMB: 1},
		{CumulativeKeyNum: 60, AvgMemoryMB: 1},
		{CumulativeKeyNum: 80, AvgMemoryMB: 2},
		{CumulativeKeyNum: 100, AvgMemoryMB: 2},
		{CumulativeKeyNum: 120, AvgMemoryMB: 3},
		{CumulativeKeyNum: 140, AvgMemoryMB: 3},
		{CumulativeKeyNum: 160, AvgMemoryMB: 3},
		{CumulativeKeyNum: 180, AvgMemoryMB: 4},
		{CumulativeKeyNum: 200, AvgMemoryMB: 4},
		{CumulativeKeyNum: 220, AvgMemoryMB: 5},
		{CumulativeKeyNum: 240, AvgMemoryMB: 5},
		{CumulativeKeyNum: 260, AvgMemoryMB: 5},
		{CumulativeKeyNum: 280, AvgMemoryMB: 6},
		{CumulativeKeyNum: 300, AvgMemoryMB: 6},
		{CumulativeKeyNum: 320, AvgMemoryMB: 7},
		{CumulativeKeyNum: 340, AvgMemoryMB: 7},
		{CumulativeKeyNum: 360, AvgMemoryMB: 7},
		{CumulativeKeyNum: 380, AvgMemoryMB: 8},
		{CumulativeKeyNum: 400, AvgMemoryMB: 8},
		{CumulativeKeyNum: 420, AvgMemoryMB: 9},
		{CumulativeKeyNum: 440, AvgMemoryMB: 9},
		{CumulativeKeyNum: 460, AvgMemoryMB: 9},
		{CumulativeKeyNum: 480, AvgMemoryMB: 10},
		{CumulativeKeyNum: 500, AvgMemoryMB: 10},
		{CumulativeKeyNum: 520, AvgMemoryMB: 0},
		{CumulativeKeyNum: 540, AvgMemoryMB: 0},
		{CumulativeKeyNum: 555, AvgMemoryMB: 0},
	}

	if len(pss) != len(expected) {
		t.Fatalf("expected %+v, got %+v", expected, pss)
	}
	for i := range pss {
		if !reflect.DeepEqual(pss[i], expected[i]) {
			t.Fatalf("#%d: processed data point expected %+v, got %+v", i, expected[i], pss[i])
		}
	}
}
func TestFindRangesLatency(t *testing.T) { func TestFindRangesLatency(t *testing.T) {
var data report.TimeSeries var data report.TimeSeries
for i := int64(0); i < 10; i++ { for i := int64(0); i < 10; i++ {
@ -124,3 +73,55 @@ func TestFindRangesLatency(t *testing.T) {
} }
} }
} }
// TestFindRangesMemory verifies that FindRangesMemory buckets memory
// data points by cumulative throughput in steps of 'unit' (20 here),
// carrying each point's AvgMemoryMB into every bucket it covers and
// leaving zero values past the last data point up to totalRequests.
func TestFindRangesMemory(t *testing.T) {
	// Ten points at one-second intervals, each contributing 50 keys
	// of throughput, with AvgMemoryMB increasing 1..10.
	data := make([]CumulativeKeyNumAndMemory, 0, 10)
	for i := int64(1); i <= 10; i++ {
		data = append(data, CumulativeKeyNumAndMemory{
			UnixSecond:  i,
			Throughput:  50,
			AvgMemoryMB: float64(i),
		})
	}

	pss := FindRangesMemory(data, 20, 555)

	expected := []CumulativeKeyNumAndMemory{
		{CumulativeKeyNum: 20, AvgMemoryMB: 1},
		{CumulativeKeyNum: 40, AvgMemoryMB: 1},
		{CumulativeKeyNum: 60, AvgMemoryMB: 1},
		{CumulativeKeyNum: 80, AvgMemoryMB: 2},
		{CumulativeKeyNum: 100, AvgMemoryMB: 2},
		{CumulativeKeyNum: 120, AvgMemoryMB: 3},
		{CumulativeKeyNum: 140, AvgMemoryMB: 3},
		{CumulativeKeyNum: 160, AvgMemoryMB: 3},
		{CumulativeKeyNum: 180, AvgMemoryMB: 4},
		{CumulativeKeyNum: 200, AvgMemoryMB: 4},
		{CumulativeKeyNum: 220, AvgMemoryMB: 5},
		{CumulativeKeyNum: 240, AvgMemoryMB: 5},
		{CumulativeKeyNum: 260, AvgMemoryMB: 5},
		{CumulativeKeyNum: 280, AvgMemoryMB: 6},
		{CumulativeKeyNum: 300, AvgMemoryMB: 6},
		{CumulativeKeyNum: 320, AvgMemoryMB: 7},
		{CumulativeKeyNum: 340, AvgMemoryMB: 7},
		{CumulativeKeyNum: 360, AvgMemoryMB: 7},
		{CumulativeKeyNum: 380, AvgMemoryMB: 8},
		{CumulativeKeyNum: 400, AvgMemoryMB: 8},
		{CumulativeKeyNum: 420, AvgMemoryMB: 9},
		{CumulativeKeyNum: 440, AvgMemoryMB: 9},
		{CumulativeKeyNum: 460, AvgMemoryMB: 9},
		{CumulativeKeyNum: 480, AvgMemoryMB: 10},
		{CumulativeKeyNum: 500, AvgMemoryMB: 10},
		{CumulativeKeyNum: 520, AvgMemoryMB: 0},
		{CumulativeKeyNum: 540, AvgMemoryMB: 0},
		{CumulativeKeyNum: 555, AvgMemoryMB: 0},
	}

	if len(pss) != len(expected) {
		t.Fatalf("expected %+v, got %+v", expected, pss)
	}
	for i := range pss {
		if !reflect.DeepEqual(pss[i], expected[i]) {
			t.Fatalf("#%d: processed data point expected %+v, got %+v", i, expected[i], pss[i])
		}
	}
}