control: test processTimeSeries, fill in empty rows

This commit is contained in:
Gyu-Ho Lee 2017-02-02 09:41:33 -08:00
parent d02a51b327
commit f20c98a8e5
No known key found for this signature in database
GPG Key ID: 1DDD39C7EB70C24C
2 changed files with 49 additions and 37 deletions

View File

@ -328,25 +328,23 @@ func saveDataLatencyThroughputTimeseries(cfg Config, st report.Stats, tsToClient
plog.Fatal(err)
}
{
// aggregate latency by the number of keys
tslice := processTimeSeries(st.TimeSeries, 1000)
c1 := dataframe.NewColumn("KEYS")
c2 := dataframe.NewColumn("AVG-LATENCY-MS")
for i := range tslice {
c1.PushBack(dataframe.NewStringValue(tslice[i].keyNum))
c2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%f", toMillisecond(tslice[i].avgLat))))
}
fr := dataframe.New()
if err := fr.AddColumn(c1); err != nil {
plog.Fatal(err)
}
if err := fr.AddColumn(c2); err != nil {
plog.Fatal(err)
}
if err := fr.CSV(cfg.DataLatencyByKeyNumber); err != nil {
plog.Fatal(err)
}
// aggregate latency by the number of keys
tss := processTimeSeries(st.TimeSeries, 1000, cfg.Step2.TotalRequests)
ctt1 := dataframe.NewColumn("KEYS")
ctt2 := dataframe.NewColumn("AVG-LATENCY-MS")
for i := range tss {
ctt1.PushBack(dataframe.NewStringValue(tss[i].keyNum))
ctt2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%f", toMillisecond(tss[i].avgLat))))
}
frr := dataframe.New()
if err := frr.AddColumn(ctt1); err != nil {
plog.Fatal(err)
}
if err := frr.AddColumn(ctt2); err != nil {
plog.Fatal(err)
}
if err := frr.CSV(cfg.DataLatencyByKeyNumber); err != nil {
plog.Fatal(err)
}
}
@ -386,8 +384,8 @@ func saveAllStats(cfg Config, stats report.Stats, tsToClientN map[int64]int) {
// If unis is 1000 and the average throughput per second is 30,000
// and its average latency is 10ms, it will have 30 data points with
// latency 10ms.
func processTimeSeries(tslice report.TimeSeries, unit int64) KeyNumToAvgLatencys {
sort.Sort(tslice)
func processTimeSeries(tss report.TimeSeries, unit int64, totalRequests int) keyNumToAvgLatencys {
sort.Sort(tss)
cumulKeyN := int64(0)
maxKey := int64(0)
@ -398,7 +396,7 @@ func processTimeSeries(tslice report.TimeSeries, unit int64) KeyNumToAvgLatencys
// and we want to map number of keys to latency
// so the range is the key
// and the value is the cumulative throughput
for _, ts := range tslice {
for _, ts := range tss {
cumulKeyN += ts.ThroughPut
if cumulKeyN < unit {
// not enough data points yet
@ -414,22 +412,32 @@ func processTimeSeries(tslice report.TimeSeries, unit int64) KeyNumToAvgLatencys
}
}
kss := []KeyNumToAvgLatency{}
for k, v := range rm {
kss = append(kss, KeyNumToAvgLatency{keyNum: k, avgLat: v})
// fill-in empty rows
for i := maxKey; i < int64(totalRequests); i += unit {
if _, ok := rm[i]; !ok {
rm[i] = time.Duration(0)
}
}
sort.Sort(KeyNumToAvgLatencys(kss))
if _, ok := rm[int64(totalRequests)]; !ok {
rm[int64(totalRequests)] = time.Duration(0)
}
kss := []keyNumToAvgLatency{}
for k, v := range rm {
kss = append(kss, keyNumToAvgLatency{keyNum: k, avgLat: v})
}
sort.Sort(keyNumToAvgLatencys(kss))
return kss
}
type KeyNumToAvgLatency struct {
type keyNumToAvgLatency struct {
keyNum int64
avgLat time.Duration
}
type KeyNumToAvgLatencys []KeyNumToAvgLatency
type keyNumToAvgLatencys []keyNumToAvgLatency
func (t KeyNumToAvgLatencys) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t KeyNumToAvgLatencys) Len() int { return len(t) }
func (t KeyNumToAvgLatencys) Less(i, j int) bool { return t[i].keyNum < t[j].keyNum }
func (t keyNumToAvgLatencys) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t keyNumToAvgLatencys) Len() int { return len(t) }
func (t keyNumToAvgLatencys) Less(i, j int) bool { return t[i].keyNum < t[j].keyNum }

View File

@ -15,11 +15,9 @@
package control
import (
"testing"
"time"
"reflect"
"testing"
"time"
"github.com/coreos/etcd/pkg/report"
)
@ -35,8 +33,8 @@ func Test_processTimeSeries(t *testing.T) {
tslice = append(tslice, dp)
}
pss := processTimeSeries(tslice, 20)
expexcted := []KeyNumToAvgLatency{
pss := processTimeSeries(tslice, 20, 555)
expexcted := []keyNumToAvgLatency{
{keyNum: 20, avgLat: 1},
{keyNum: 40, avgLat: 1},
{keyNum: 60, avgLat: 1},
@ -62,6 +60,12 @@ func Test_processTimeSeries(t *testing.T) {
{keyNum: 460, avgLat: 9},
{keyNum: 480, avgLat: 10},
{keyNum: 500, avgLat: 10},
{keyNum: 520, avgLat: 0},
{keyNum: 540, avgLat: 0},
{keyNum: 555, avgLat: 0},
}
if len(pss) != len(expexcted) {
t.Fatalf("expected %+v, got %+v", expexcted, pss)
}
for i, elem := range pss {
if !reflect.DeepEqual(elem, expexcted[i]) {