diff --git a/analyze/04_aggregate_all_analyze_data.go b/analyze/04_aggregate_all_analyze_data.go index 3cb72386..c3b00188 100644 --- a/analyze/04_aggregate_all_analyze_data.go +++ b/analyze/04_aggregate_all_analyze_data.go @@ -19,11 +19,12 @@ import ( "strings" "github.com/coreos/dbtester" + humanize "github.com/dustin/go-humanize" "github.com/gyuho/dataframe" ) // aggregateAll aggregates all system metrics from 3+ nodes. -func (data *analyzeData) aggregateAll(memoryByKeyPath string, totalRequests int64) error { +func (data *analyzeData) aggregateAll(memoryByKeyPath string, readBytesDeltaByKeyPath string, writeBytesDeltaByKeyPath string, totalRequests int64) error { colSys, err := data.sysAgg.Column("UNIX-SECOND") if err != nil { return err @@ -416,6 +417,14 @@ func (data *analyzeData) aggregateAll(memoryByKeyPath string, totalRequests int6 if err != nil { return err } + colReadBytesNumDelta, err := data.aggregated.Column("AVG-READ-BYTES-NUM-DELTA") + if err != nil { + return err + } + colWriteBytesNumDelta, err := data.aggregated.Column("AVG-WRITE-BYTES-NUM-DELTA") + if err != nil { + return err + } if colUnixSecond.Count() != colMemoryMB.Count() { return fmt.Errorf("SECOND column count %d, AVG-VMRSS-MB column count %d", colUnixSecond.Count(), colMemoryMB.Count()) } @@ -425,8 +434,14 @@ func (data *analyzeData) aggregateAll(memoryByKeyPath string, totalRequests int6 if colUnixSecond.Count() != colAvgThroughput.Count() { return fmt.Errorf("SECOND column count %d, AVG-THROUGHPUT column count %d", colUnixSecond.Count(), colAvgThroughput.Count()) } + if colReadBytesNumDelta.Count() != colAvgThroughput.Count() { + return fmt.Errorf("AVG-READ-BYTES-NUM-DELTA column count %d, AVG-THROUGHPUT column count %d", colReadBytesNumDelta.Count(), colAvgThroughput.Count()) + } + if colWriteBytesNumDelta.Count() != colAvgThroughput.Count() { + return fmt.Errorf("AVG-WRITE-BYTES-NUM-DELTA column count %d, AVG-THROUGHPUT column count %d", colWriteBytesNumDelta.Count(), colAvgThroughput.Count()) + } - var cdata []dbtester.CumulativeKeyNumAndMemory + var cdata []dbtester.CumulativeKeyNumAndOtherData for i := 0; i < colUnixSecond.Count(); i++ { vv0, err := colUnixSecond.Value(i) if err != nil { @@ -446,43 +461,98 @@ func (data *analyzeData) aggregateAll(memoryByKeyPath string, totalRequests int6 } vf2, _ := vv2.Float64() - point := dbtester.CumulativeKeyNumAndMemory{ + vv3, err := colReadBytesNumDelta.Value(i) + if err != nil { + return err + } + vf3, _ := vv3.Float64() + + vv4, err := colWriteBytesNumDelta.Value(i) + if err != nil { + return err + } + vf4, _ := vv4.Float64() + + point := dbtester.CumulativeKeyNumAndOtherData{ UnixSecond: v0, Throughput: int64(vf2), MinMemoryMB: sec2minVMRSSMB[v0], AvgMemoryMB: vf1, MaxMemoryMB: sec2maxVMRSSMB[v0], + + AvgReadBytesDelta: vf3, + AvgWriteBytesDelta: vf4, } cdata = append(cdata, point) } - // aggregate memory by number of keys - knms := dbtester.FindRangesMemory(cdata, 1000, totalRequests) + // aggregate memory,write/read bytes by number of keys + knms := dbtester.FindRangesData(cdata, 1000, totalRequests) + ckk1 := dataframe.NewColumn("KEYS") ckk2 := dataframe.NewColumn("MIN-VMRSS-MB") ckk3 := dataframe.NewColumn("AVG-VMRSS-MB") ckk4 := dataframe.NewColumn("MAX-VMRSS-MB") + ckk5 := dataframe.NewColumn("AVG-READ-BYTES-NUM-DELTA") + ckk6 := dataframe.NewColumn("AVG-READ-BYTES") + ckk7 := dataframe.NewColumn("AVG-WRITE-BYTES-NUM-DELTA") + ckk8 := dataframe.NewColumn("AVG-WRITE-BYTES") for i := range knms { ckk1.PushBack(dataframe.NewStringValue(knms[i].CumulativeKeyNum)) ckk2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].MinMemoryMB))) ckk3.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].AvgMemoryMB))) ckk4.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].MaxMemoryMB))) + ckk5.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].AvgReadBytesDelta))) + ckk6.PushBack(dataframe.NewStringValue(humanize.Bytes(uint64(knms[i].AvgReadBytesDelta)))) + ckk7.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].AvgWriteBytesDelta))) + ckk8.PushBack(dataframe.NewStringValue(humanize.Bytes(uint64(knms[i].AvgWriteBytesDelta)))) } - fr := dataframe.New() - if err := fr.AddColumn(ckk1); err != nil { + + fr1 := dataframe.New() + if err := fr1.AddColumn(ckk1); err != nil { plog.Fatal(err) } - if err := fr.AddColumn(ckk2); err != nil { + if err := fr1.AddColumn(ckk2); err != nil { plog.Fatal(err) } - if err := fr.AddColumn(ckk3); err != nil { + if err := fr1.AddColumn(ckk3); err != nil { plog.Fatal(err) } - if err := fr.AddColumn(ckk4); err != nil { + if err := fr1.AddColumn(ckk4); err != nil { plog.Fatal(err) } - if err := fr.CSV(memoryByKeyPath); err != nil { + if err := fr1.CSV(memoryByKeyPath); err != nil { + plog.Fatal(err) + } + + // aggregate read bytes by number of keys + fr2 := dataframe.New() + if err := fr2.AddColumn(ckk1); err != nil { + plog.Fatal(err) + } + if err := fr2.AddColumn(ckk5); err != nil { + plog.Fatal(err) + } + if err := fr2.AddColumn(ckk6); err != nil { + plog.Fatal(err) + } + if err := fr2.CSV(readBytesDeltaByKeyPath); err != nil { + plog.Fatal(err) + } + + // aggregate write bytes by number of keys + fr3 := dataframe.New() + if err := fr3.AddColumn(ckk1); err != nil { + plog.Fatal(err) + } + if err := fr3.AddColumn(ckk7); err != nil { + plog.Fatal(err) + } + if err := fr3.AddColumn(ckk8); err != nil { + plog.Fatal(err) + } + if err := fr3.CSV(writeBytesDeltaByKeyPath); err != nil { plog.Fatal(err) } diff --git a/analyze/command.go b/analyze/command.go index 5fc9e8ad..f9276f9c 100644 --- a/analyze/command.go +++ b/analyze/command.go @@ -88,7 +88,7 @@ func do(configPath string) error { if err = ad.importBenchMetrics(testdata.ClientLatencyThroughputTimeseriesPath); err != nil { return err } - if err = ad.aggregateAll(testdata.ServerMemoryByKeyNumberPath, testgroup.RequestNumber); err != nil { + if err = ad.aggregateAll(testdata.ServerMemoryByKeyNumberPath, testdata.ServerReadBytesDeltaByKeyNumberPath, testdata.ServerWriteBytesDeltaByKeyNumberPath, testgroup.RequestNumber); err != nil { return err } if err = ad.save(); err != nil { @@ -239,7 +239,6 @@ func do(configPath string) error { row28WritesCompletedDeltaSum = append(row28WritesCompletedDeltaSum, humanize.Comma(int64(writesCompletedDeltaSum))) row29SectorsWrittenDeltaSum = append(row29SectorsWrittenDeltaSum, humanize.Comma(int64(sectorsWrittenDeltaSum))) - // TODO: handle overflowed memory value? sort.Float64s(maxAvgVMRSSMBs) mv := maxAvgVMRSSMBs[len(maxAvgVMRSSMBs)-1] mb := uint64(mv * 1000000)