Merge pull request #234 from gyuho/process-agg

*: improve time series process utils
Gyu-Ho Lee 2017-02-02 09:51:26 -08:00 committed by GitHub
commit 99535d31f4
13 changed files with 253 additions and 118 deletions

View File

@@ -31,6 +31,7 @@ type RawData struct {
DataBenchmarkThroughput string `yaml:"data_benchmark_throughput"`
DataBenchmarkLatencyByKey string `yaml:"data_benchmark_latency_by_key"`
DataBenchmarkMemoryByKey string `yaml:"data_benchmark_memory_by_key"`
TotalRequests int `yaml:"total_requests"`
}
// Config defines analyze configuration.

View File

@@ -3,6 +3,7 @@ work_dir: 2017Q1-01-etcd-zookeeper-consul/01-write-1M-keys
all_aggregated_path: 2017Q1-01-etcd-zookeeper-consul/01-write-1M-keys/aggregated.csv
all_latency_by_key: 2017Q1-01-etcd-zookeeper-consul/01-write-1M-keys/aggregated-data-latency-by-key-number.csv
all_memory_by_key: 2017Q1-01-etcd-zookeeper-consul/01-write-1M-keys/aggregated-data-memory-by-key-number.csv
total_requests: 100000
raw_data:
- legend: etcd v3.1 (Go 1.7.4)

View File

@@ -23,7 +23,7 @@ import (
)
// aggregateAll aggregates all system metrics from 3+ nodes.
func (data *analyzeData) aggregateAll(memoryByKeyPath string) error {
func (data *analyzeData) aggregateAll(memoryByKeyPath string, totalRequests int) error {
// TODO: UNIX-TS from pkg/report data is time.Time.Unix
// UNIX-TS from psn.CSV data is time.Time.UnixNano
// we need some kind of way to combine those with matching timestamps
@@ -411,90 +411,28 @@ func (data *analyzeData) aggregateAll(memoryByKeyPath string) error {
}
sort.Sort(keyNumAndMemorys(tslice))
{
sorted := processTimeSeries(tslice, 1000)
c1 := dataframe.NewColumn("KEYS")
c2 := dataframe.NewColumn("AVG-VMRSS-MB")
for i := range sorted {
c1.PushBack(dataframe.NewStringValue(sorted[i].keyNum))
c2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", sorted[i].memoryMB)))
}
fr := dataframe.New()
if err := fr.AddColumn(c1); err != nil {
plog.Fatal(err)
}
if err := fr.AddColumn(c2); err != nil {
plog.Fatal(err)
}
if err := fr.CSV(memoryByKeyPath); err != nil {
plog.Fatal(err)
}
// aggregate memory by number of keys
knms := processTimeSeries(tslice, 1000, totalRequests)
ckk1 := dataframe.NewColumn("KEYS")
ckk2 := dataframe.NewColumn("AVG-VMRSS-MB")
for i := range knms {
ckk1.PushBack(dataframe.NewStringValue(knms[i].keyNum))
ckk2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].memoryMB)))
}
fr := dataframe.New()
if err := fr.AddColumn(ckk1); err != nil {
plog.Fatal(err)
}
if err := fr.AddColumn(ckk2); err != nil {
plog.Fatal(err)
}
if err := fr.CSV(memoryByKeyPath); err != nil {
plog.Fatal(err)
}
return nil
}
func processTimeSeries(tslice []keyNumAndMemory, unit int64) []keyNumAndMemory {
sort.Sort(keyNumAndMemorys(tslice))
cumulKeyN := int64(0)
maxKey := int64(0)
rm := make(map[int64]float64)
// this data is aggregated by second
// and we want to map number of keys to latency
// so the range is the key
// and the value is the cumulative throughput
for _, ts := range tslice {
cumulKeyN += ts.keyNum
if cumulKeyN < unit {
// not enough data points yet
continue
}
mem := ts.memoryMB
// cumulKeyN >= unit
for cumulKeyN > maxKey {
maxKey += unit
rm[maxKey] = mem
}
}
kss := []keyNumAndMemory{}
for k, v := range rm {
kss = append(kss, keyNumAndMemory{keyNum: k, memoryMB: v})
}
sort.Sort(keyNumAndMemorys(kss))
return kss
}
type keyNumAndMemory struct {
keyNum int64
memoryMB float64
}
type keyNumAndMemorys []keyNumAndMemory
func (t keyNumAndMemorys) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t keyNumAndMemorys) Len() int { return len(t) }
func (t keyNumAndMemorys) Less(i, j int) bool { return t[i].keyNum < t[j].keyNum }
func (data *analyzeData) save() error {
return data.aggregated.CSV(data.csvOutputpath)
}
func makeHeader(column string, tag string) string {
return fmt.Sprintf("%s-%s", column, tag)
}
func makeTag(legend string) string {
legend = strings.ToLower(legend)
legend = strings.Replace(legend, "go ", "go", -1)
legend = strings.Replace(legend, "java ", "java", -1)
legend = strings.Replace(legend, "(", "", -1)
legend = strings.Replace(legend, ")", "", -1)
return strings.Replace(legend, " ", "-", -1)
}

View File

@@ -0,0 +1,75 @@
// Copyright 2017 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package analyze
import "sort"
func processTimeSeries(tslice []keyNumAndMemory, unit int64, totalRequests int) []keyNumAndMemory {
sort.Sort(keyNumAndMemorys(tslice))
cumulKeyN := int64(0)
maxKey := int64(0)
rm := make(map[int64]float64)
// this data is aggregated by second
// and we want to map number of keys to latency
// so the range is the key
// and the value is the cumulative throughput
for _, ts := range tslice {
cumulKeyN += ts.keyNum
if cumulKeyN < unit {
// not enough data points yet
continue
}
mem := ts.memoryMB
// cumulKeyN >= unit
for cumulKeyN > maxKey {
maxKey += unit
rm[maxKey] = mem
}
}
// fill-in empty rows
for i := maxKey; i < int64(totalRequests); i += unit {
if _, ok := rm[i]; !ok {
rm[i] = 0.0
}
}
if _, ok := rm[int64(totalRequests)]; !ok {
rm[int64(totalRequests)] = 0.0
}
kss := []keyNumAndMemory{}
for k, v := range rm {
kss = append(kss, keyNumAndMemory{keyNum: k, memoryMB: v})
}
sort.Sort(keyNumAndMemorys(kss))
return kss
}
type keyNumAndMemory struct {
keyNum int64
memoryMB float64
}
type keyNumAndMemorys []keyNumAndMemory
func (t keyNumAndMemorys) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t keyNumAndMemorys) Len() int { return len(t) }
func (t keyNumAndMemorys) Less(i, j int) bool { return t[i].keyNum < t[j].keyNum }
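The helper above walks the per-second samples, accumulates the number of keys written, and stamps every unit-sized bucket it crosses with the memory reading of the second that crossed it; buckets between the last boundary reached and totalRequests, plus a final row at totalRequests itself, are zero-filled so every run yields the same number of CSV rows. A minimal standalone sketch of that bucketing idea, using hypothetical names and made-up sample values rather than the package's own types:

package main

import (
	"fmt"
	"sort"
)

// sample is a hypothetical stand-in for keyNumAndMemory: keys written in one
// second plus the VmRSS reading (MB) observed during that second.
type sample struct {
	keys int64
	mem  float64
}

// bucketize mirrors the processTimeSeries logic in simplified form: each
// unit-sized key-count boundary gets the memory value of the second whose
// cumulative count first exceeded it, and unreached buckets up to total are
// padded with zero.
func bucketize(samples []sample, unit, total int64) map[int64]float64 {
	out := make(map[int64]float64)
	cumul, maxKey := int64(0), int64(0)
	for _, s := range samples {
		cumul += s.keys
		if cumul < unit {
			continue // not enough keys yet to fill the first bucket
		}
		for cumul > maxKey {
			maxKey += unit
			out[maxKey] = s.mem
		}
	}
	for i := maxKey; i < total; i += unit {
		if _, ok := out[i]; !ok {
			out[i] = 0.0 // buckets the run never reached
		}
	}
	if _, ok := out[total]; !ok {
		out[total] = 0.0 // final row at the configured total_requests
	}
	return out
}

func main() {
	// three seconds at 7 keys/s with memory growing 1..3 MB, unit 5, total 30
	samples := []sample{{7, 1}, {7, 2}, {7, 3}}
	buckets := bucketize(samples, 5, 30)
	keys := make([]int64, 0, len(buckets))
	for k := range buckets {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
	for _, k := range keys {
		fmt.Printf("%d keys -> %.2f MB\n", k, buckets[k])
	}
	// prints 5 -> 1, 10 -> 1, 15 -> 2, 20 -> 3, 25 -> 3, 30 -> 0
}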

View File

@@ -0,0 +1,71 @@
// Copyright 2017 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package analyze
import (
"reflect"
"testing"
)
func Test_processTimeSeries(t *testing.T) {
var tslice []keyNumAndMemory
for i := int64(0); i < 10; i++ {
dp := keyNumAndMemory{
keyNum: 50,
memoryMB: float64(i + 1),
}
tslice = append(tslice, dp)
}
pss := processTimeSeries(tslice, 20, 555)
expexcted := []keyNumAndMemory{
{keyNum: 20, memoryMB: 1},
{keyNum: 40, memoryMB: 1},
{keyNum: 60, memoryMB: 1},
{keyNum: 80, memoryMB: 2},
{keyNum: 100, memoryMB: 2},
{keyNum: 120, memoryMB: 3},
{keyNum: 140, memoryMB: 3},
{keyNum: 160, memoryMB: 3},
{keyNum: 180, memoryMB: 4},
{keyNum: 200, memoryMB: 4},
{keyNum: 220, memoryMB: 5},
{keyNum: 240, memoryMB: 5},
{keyNum: 260, memoryMB: 5},
{keyNum: 280, memoryMB: 6},
{keyNum: 300, memoryMB: 6},
{keyNum: 320, memoryMB: 7},
{keyNum: 340, memoryMB: 7},
{keyNum: 360, memoryMB: 7},
{keyNum: 380, memoryMB: 8},
{keyNum: 400, memoryMB: 8},
{keyNum: 420, memoryMB: 9},
{keyNum: 440, memoryMB: 9},
{keyNum: 460, memoryMB: 9},
{keyNum: 480, memoryMB: 10},
{keyNum: 500, memoryMB: 10},
{keyNum: 520, memoryMB: 0},
{keyNum: 540, memoryMB: 0},
{keyNum: 555, memoryMB: 0},
}
if len(pss) != len(expexcted) {
t.Fatalf("expected %+v, got %+v", expexcted, pss)
}
for i, elem := range pss {
if !reflect.DeepEqual(elem, expexcted[i]) {
t.Fatalf("#%d: processed data point expected %+v, got %+v", i, expexcted[i], elem)
}
}
}
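Reading the expected values: the ten synthetic samples write 50 keys per second, so the cumulative count ends at 500. With unit 20, buckets 20 through 500 are each stamped with the memory value of the second whose cumulative count first passed them (the first second's 50 keys cover 20, 40 and 60 at 1 MB, the second's 100 keys cover 80 and 100 at 2 MB, and so on up to 480 and 500 at 10 MB). Because totalRequests is 555, the new fill-in loop then pads the rows the run never reached: 520 and 540 at zero, plus a final zero row at 555.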

View File

@@ -0,0 +1,33 @@
// Copyright 2017 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package analyze
import (
"fmt"
"strings"
)
func makeHeader(column string, tag string) string {
return fmt.Sprintf("%s-%s", column, tag)
}
func makeTag(legend string) string {
legend = strings.ToLower(legend)
legend = strings.Replace(legend, "go ", "go", -1)
legend = strings.Replace(legend, "java ", "java", -1)
legend = strings.Replace(legend, "(", "", -1)
legend = strings.Replace(legend, ")", "", -1)
return strings.Replace(legend, " ", "-", -1)
}
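These helpers turn the human-readable legends from the YAML configs into CSV column suffixes. A quick hypothetical check (not part of this commit) of what they produce for the legend used throughout the configs above:

package analyze

import "testing"

// Test_makeTag is a hypothetical example, not part of the diff: it shows how a
// config legend becomes a CSV column suffix via makeTag and makeHeader.
func Test_makeTag(t *testing.T) {
	tag := makeTag("etcd v3.1 (Go 1.7.4)")
	if tag != "etcd-v3.1-go1.7.4" {
		t.Fatalf("unexpected tag %q", tag)
	}
	if h := makeHeader("AVG-VMRSS-MB", tag); h != "AVG-VMRSS-MB-etcd-v3.1-go1.7.4" {
		t.Fatalf("unexpected header %q", h)
	}
}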

View File

@@ -63,7 +63,7 @@ func do(configPath string) error {
if err = ad.importBenchMetrics(elem.DataBenchmarkThroughput); err != nil {
return err
}
if err = ad.aggregateAll(elem.DataBenchmarkMemoryByKey); err != nil {
if err = ad.aggregateAll(elem.DataBenchmarkMemoryByKey, elem.TotalRequests); err != nil {
return err
}
if err = ad.save(); err != nil {

View File

@@ -3,6 +3,7 @@ work_dir: 2017Q1-02-etcd-zookeeper-consul/01-write-1M-keys-client-variable
all_aggregated_path: 2017Q1-02-etcd-zookeeper-consul/01-write-1M-keys-client-variable/aggregated.csv
all_latency_by_key: 2017Q1-02-etcd-zookeeper-consul/01-write-1M-keys-client-variable/aggregated-data-latency-by-key-number.csv
all_memory_by_key: 2017Q1-02-etcd-zookeeper-consul/01-write-1M-keys-client-variable/aggregated-data-memory-by-key-number.csv
total_requests: 1000000
raw_data:
- legend: etcd v3.1 (Go 1.7.4)

View File

@@ -3,6 +3,7 @@ work_dir: 2017Q1-02-etcd-zookeeper-consul/02-write-1M-keys-best-throughput
all_aggregated_path: 2017Q1-02-etcd-zookeeper-consul/02-write-1M-keys-best-throughput/aggregated.csv
all_latency_by_key: 2017Q1-02-etcd-zookeeper-consul/02-write-1M-keys-best-throughput/aggregated-data-latency-by-key-number.csv
all_memory_by_key: 2017Q1-02-etcd-zookeeper-consul/02-write-1M-keys-best-throughput/aggregated-data-memory-by-key-number.csv
total_requests: 1000000
raw_data:
- legend: etcd v3.1 (Go 1.7.4)

View File

@@ -3,6 +3,7 @@ work_dir: 2017Q1-02-etcd-zookeeper-consul/03-write-1M-keys-1000-client
all_aggregated_path: 2017Q1-02-etcd-zookeeper-consul/03-write-1M-keys-1000-client/aggregated.csv
all_latency_by_key: 2017Q1-02-etcd-zookeeper-consul/03-write-1M-keys-1000-client/aggregated-data-latency-by-key-number.csv
all_memory_by_key: 2017Q1-02-etcd-zookeeper-consul/03-write-1M-keys-1000-client/aggregated-data-memory-by-key-number.csv
total_requests: 1000000
raw_data:
- legend: etcd v3.1 (Go 1.7.4)

View File

@@ -3,6 +3,7 @@ work_dir: 2017Q1-02-etcd-zookeeper-consul/04-write-too-many-keys
all_aggregated_path: 2017Q1-02-etcd-zookeeper-consul/04-write-too-many-keys/aggregated.csv
all_latency_by_key: 2017Q1-02-etcd-zookeeper-consul/04-write-too-many-keys/aggregated-data-latency-by-key-number.csv
all_memory_by_key: 2017Q1-02-etcd-zookeeper-consul/04-write-too-many-keys/aggregated-data-memory-by-key-number.csv
total_requests: ????
raw_data:
- legend: etcd v3.1 (Go 1.7.4)

View File

@@ -328,25 +328,23 @@ func saveDataLatencyThroughputTimeseries(cfg Config, st report.Stats, tsToClient
plog.Fatal(err)
}
{
// aggregate latency by the number of keys
tslice := processTimeSeries(st.TimeSeries, 1000)
c1 := dataframe.NewColumn("KEYS")
c2 := dataframe.NewColumn("AVG-LATENCY-MS")
for i := range tslice {
c1.PushBack(dataframe.NewStringValue(tslice[i].keyNum))
c2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%f", toMillisecond(tslice[i].avgLat))))
}
fr := dataframe.New()
if err := fr.AddColumn(c1); err != nil {
plog.Fatal(err)
}
if err := fr.AddColumn(c2); err != nil {
plog.Fatal(err)
}
if err := fr.CSV(cfg.DataLatencyByKeyNumber); err != nil {
plog.Fatal(err)
}
// aggregate latency by the number of keys
tss := processTimeSeries(st.TimeSeries, 1000, cfg.Step2.TotalRequests)
ctt1 := dataframe.NewColumn("KEYS")
ctt2 := dataframe.NewColumn("AVG-LATENCY-MS")
for i := range tss {
ctt1.PushBack(dataframe.NewStringValue(tss[i].keyNum))
ctt2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%f", toMillisecond(tss[i].avgLat))))
}
frr := dataframe.New()
if err := frr.AddColumn(ctt1); err != nil {
plog.Fatal(err)
}
if err := frr.AddColumn(ctt2); err != nil {
plog.Fatal(err)
}
if err := frr.CSV(cfg.DataLatencyByKeyNumber); err != nil {
plog.Fatal(err)
}
}
@@ -386,8 +384,8 @@ func saveAllStats(cfg Config, stats report.Stats, tsToClientN map[int64]int) {
// If unit is 1000 and the average throughput per second is 30,000
// and its average latency is 10ms, it will have 30 data points with
// latency 10ms.
func processTimeSeries(tslice report.TimeSeries, unit int64) KeyNumToAvgLatencys {
sort.Sort(tslice)
func processTimeSeries(tss report.TimeSeries, unit int64, totalRequests int) keyNumToAvgLatencys {
sort.Sort(tss)
cumulKeyN := int64(0)
maxKey := int64(0)
@@ -398,7 +396,7 @@ func processTimeSeries(tslice report.TimeSeries, unit int64) KeyNumToAvgLatencys
// and we want to map number of keys to latency
// so the range is the key
// and the value is the cumulative throughput
for _, ts := range tslice {
for _, ts := range tss {
cumulKeyN += ts.ThroughPut
if cumulKeyN < unit {
// not enough data points yet
@@ -414,22 +412,32 @@ func processTimeSeries(tslice report.TimeSeries, unit int64) KeyNumToAvgLatencys
}
}
kss := []KeyNumToAvgLatency{}
for k, v := range rm {
kss = append(kss, KeyNumToAvgLatency{keyNum: k, avgLat: v})
// fill-in empty rows
for i := maxKey; i < int64(totalRequests); i += unit {
if _, ok := rm[i]; !ok {
rm[i] = time.Duration(0)
}
}
sort.Sort(KeyNumToAvgLatencys(kss))
if _, ok := rm[int64(totalRequests)]; !ok {
rm[int64(totalRequests)] = time.Duration(0)
}
kss := []keyNumToAvgLatency{}
for k, v := range rm {
kss = append(kss, keyNumToAvgLatency{keyNum: k, avgLat: v})
}
sort.Sort(keyNumToAvgLatencys(kss))
return kss
}
type KeyNumToAvgLatency struct {
type keyNumToAvgLatency struct {
keyNum int64
avgLat time.Duration
}
type KeyNumToAvgLatencys []KeyNumToAvgLatency
type keyNumToAvgLatencys []keyNumToAvgLatency
func (t KeyNumToAvgLatencys) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t KeyNumToAvgLatencys) Len() int { return len(t) }
func (t KeyNumToAvgLatencys) Less(i, j int) bool { return t[i].keyNum < t[j].keyNum }
func (t keyNumToAvgLatencys) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t keyNumToAvgLatencys) Len() int { return len(t) }
func (t keyNumToAvgLatencys) Less(i, j int) bool { return t[i].keyNum < t[j].keyNum }
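To make the doc comment concrete: with unit 1000, a second in which 30,000 requests complete at an average latency of 10ms pushes the cumulative key count across 30 multiples of 1,000, so 30 rows are written, each carrying that 10ms average. The added fill-in loop then behaves like the memory variant: any 1,000-key buckets between the last observed key and the configured totalRequests, plus the final totalRequests row, are written with zero latency so the CSV always extends to the full request count.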

View File

@@ -15,11 +15,9 @@
package control
import (
"testing"
"time"
"reflect"
"testing"
"time"
"github.com/coreos/etcd/pkg/report"
)
@@ -35,8 +33,8 @@ func Test_processTimeSeries(t *testing.T) {
tslice = append(tslice, dp)
}
pss := processTimeSeries(tslice, 20)
expexcted := []KeyNumToAvgLatency{
pss := processTimeSeries(tslice, 20, 555)
expexcted := []keyNumToAvgLatency{
{keyNum: 20, avgLat: 1},
{keyNum: 40, avgLat: 1},
{keyNum: 60, avgLat: 1},
@@ -62,6 +60,12 @@ func Test_processTimeSeries(t *testing.T) {
{keyNum: 460, avgLat: 9},
{keyNum: 480, avgLat: 10},
{keyNum: 500, avgLat: 10},
{keyNum: 520, avgLat: 0},
{keyNum: 540, avgLat: 0},
{keyNum: 555, avgLat: 0},
}
if len(pss) != len(expexcted) {
t.Fatalf("expected %+v, got %+v", expexcted, pss)
}
for i, elem := range pss {
if !reflect.DeepEqual(elem, expexcted[i]) {