analyze: save read/write bytes

This commit is contained in:
Gyu-Ho Lee 2017-02-13 15:52:06 -08:00
parent 1929baaf0b
commit f89084ef8b
No known key found for this signature in database
GPG Key ID: 1DDD39C7EB70C24C
2 changed files with 82 additions and 13 deletions

View File

@ -19,11 +19,12 @@ import (
"strings"
"github.com/coreos/dbtester"
humanize "github.com/dustin/go-humanize"
"github.com/gyuho/dataframe"
)
// aggregateAll aggregates all system metrics from 3+ nodes.
func (data *analyzeData) aggregateAll(memoryByKeyPath string, totalRequests int64) error {
func (data *analyzeData) aggregateAll(memoryByKeyPath string, readBytesDeltaByKeyPath string, writeBytesDeltaByKeyPath string, totalRequests int64) error {
colSys, err := data.sysAgg.Column("UNIX-SECOND")
if err != nil {
return err
@ -416,6 +417,14 @@ func (data *analyzeData) aggregateAll(memoryByKeyPath string, totalRequests int6
if err != nil {
return err
}
colReadBytesNumDelta, err := data.aggregated.Column("AVG-READ-BYTES-NUM-DELTA")
if err != nil {
return err
}
colWriteBytesNumDelta, err := data.aggregated.Column("AVG-WRITE-BYTES-NUM-DELTA")
if err != nil {
return err
}
if colUnixSecond.Count() != colMemoryMB.Count() {
return fmt.Errorf("SECOND column count %d, AVG-VMRSS-MB column count %d", colUnixSecond.Count(), colMemoryMB.Count())
}
@ -425,8 +434,14 @@ func (data *analyzeData) aggregateAll(memoryByKeyPath string, totalRequests int6
if colUnixSecond.Count() != colAvgThroughput.Count() {
return fmt.Errorf("SECOND column count %d, AVG-THROUGHPUT column count %d", colUnixSecond.Count(), colAvgThroughput.Count())
}
if colReadBytesNumDelta.Count() != colAvgThroughput.Count() {
return fmt.Errorf("AVG-READ-BYTES-NUM-DELTA column count %d, AVG-THROUGHPUT column count %d", colReadBytesNumDelta.Count(), colAvgThroughput.Count())
}
if colWriteBytesNumDelta.Count() != colAvgThroughput.Count() {
return fmt.Errorf("AVG-WRITE-BYTES-NUM-DELTA column count %d, AVG-THROUGHPUT column count %d", colWriteBytesNumDelta.Count(), colAvgThroughput.Count())
}
var cdata []dbtester.CumulativeKeyNumAndMemory
var cdata []dbtester.CumulativeKeyNumAndOtherData
for i := 0; i < colUnixSecond.Count(); i++ {
vv0, err := colUnixSecond.Value(i)
if err != nil {
@ -446,43 +461,98 @@ func (data *analyzeData) aggregateAll(memoryByKeyPath string, totalRequests int6
}
vf2, _ := vv2.Float64()
point := dbtester.CumulativeKeyNumAndMemory{
vv3, err := colReadBytesNumDelta.Value(i)
if err != nil {
return err
}
vf3, _ := vv3.Float64()
vv4, err := colWriteBytesNumDelta.Value(i)
if err != nil {
return err
}
vf4, _ := vv4.Float64()
point := dbtester.CumulativeKeyNumAndOtherData{
UnixSecond: v0,
Throughput: int64(vf2),
MinMemoryMB: sec2minVMRSSMB[v0],
AvgMemoryMB: vf1,
MaxMemoryMB: sec2maxVMRSSMB[v0],
AvgReadBytesDelta: vf3,
AvgWriteBytesDelta: vf4,
}
cdata = append(cdata, point)
}
// aggregate memory by number of keys
knms := dbtester.FindRangesMemory(cdata, 1000, totalRequests)
// aggregate memory,write/read bytes by number of keys
knms := dbtester.FindRangesData(cdata, 1000, totalRequests)
ckk1 := dataframe.NewColumn("KEYS")
ckk2 := dataframe.NewColumn("MIN-VMRSS-MB")
ckk3 := dataframe.NewColumn("AVG-VMRSS-MB")
ckk4 := dataframe.NewColumn("MAX-VMRSS-MB")
ckk5 := dataframe.NewColumn("AVG-READ-BYTES-NUM-DELTA")
ckk6 := dataframe.NewColumn("AVG-READ-BYTES")
ckk7 := dataframe.NewColumn("AVG-WRITE-BYTES-NUM-DELTA")
ckk8 := dataframe.NewColumn("AVG-WRITE-BYTES")
for i := range knms {
ckk1.PushBack(dataframe.NewStringValue(knms[i].CumulativeKeyNum))
ckk2.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].MinMemoryMB)))
ckk3.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].AvgMemoryMB)))
ckk4.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].MaxMemoryMB)))
ckk5.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].AvgReadBytesDelta)))
ckk6.PushBack(dataframe.NewStringValue(humanize.Bytes(uint64(knms[i].AvgReadBytesDelta))))
ckk7.PushBack(dataframe.NewStringValue(fmt.Sprintf("%.2f", knms[i].AvgWriteBytesDelta)))
ckk8.PushBack(dataframe.NewStringValue(humanize.Bytes(uint64(knms[i].AvgWriteBytesDelta))))
}
fr := dataframe.New()
if err := fr.AddColumn(ckk1); err != nil {
fr1 := dataframe.New()
if err := fr1.AddColumn(ckk1); err != nil {
plog.Fatal(err)
}
if err := fr.AddColumn(ckk2); err != nil {
if err := fr1.AddColumn(ckk2); err != nil {
plog.Fatal(err)
}
if err := fr.AddColumn(ckk3); err != nil {
if err := fr1.AddColumn(ckk3); err != nil {
plog.Fatal(err)
}
if err := fr.AddColumn(ckk4); err != nil {
if err := fr1.AddColumn(ckk4); err != nil {
plog.Fatal(err)
}
if err := fr.CSV(memoryByKeyPath); err != nil {
if err := fr1.CSV(memoryByKeyPath); err != nil {
plog.Fatal(err)
}
// aggregate read bytes by number of keys
fr2 := dataframe.New()
if err := fr2.AddColumn(ckk1); err != nil {
plog.Fatal(err)
}
if err := fr2.AddColumn(ckk5); err != nil {
plog.Fatal(err)
}
if err := fr2.AddColumn(ckk6); err != nil {
plog.Fatal(err)
}
if err := fr2.CSV(readBytesDeltaByKeyPath); err != nil {
plog.Fatal(err)
}
// aggregate write bytes by number of keys
fr3 := dataframe.New()
if err := fr3.AddColumn(ckk1); err != nil {
plog.Fatal(err)
}
if err := fr3.AddColumn(ckk7); err != nil {
plog.Fatal(err)
}
if err := fr3.AddColumn(ckk8); err != nil {
plog.Fatal(err)
}
if err := fr3.CSV(writeBytesDeltaByKeyPath); err != nil {
plog.Fatal(err)
}

View File

@ -88,7 +88,7 @@ func do(configPath string) error {
if err = ad.importBenchMetrics(testdata.ClientLatencyThroughputTimeseriesPath); err != nil {
return err
}
if err = ad.aggregateAll(testdata.ServerMemoryByKeyNumberPath, testgroup.RequestNumber); err != nil {
if err = ad.aggregateAll(testdata.ServerMemoryByKeyNumberPath, testdata.ServerReadBytesDeltaByKeyNumberPath, testdata.ServerWriteBytesDeltaByKeyNumberPath, testgroup.RequestNumber); err != nil {
return err
}
if err = ad.save(); err != nil {
@ -239,7 +239,6 @@ func do(configPath string) error {
row28WritesCompletedDeltaSum = append(row28WritesCompletedDeltaSum, humanize.Comma(int64(writesCompletedDeltaSum)))
row29SectorsWrittenDeltaSum = append(row29SectorsWrittenDeltaSum, humanize.Comma(int64(sectorsWrittenDeltaSum)))
// TODO: handle overflowed memory value?
sort.Float64s(maxAvgVMRSSMBs)
mv := maxAvgVMRSSMBs[len(maxAvgVMRSSMBs)-1]
mb := uint64(mv * 1000000)