dbtester/analyze/02_read_all_metrics_to_anal...

149 lines
3.9 KiB
Go

// Copyright 2017 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package analyze
import (
"fmt"
"github.com/gyuho/dataframe"
)
// analyzeData bundles the system-metrics frames read for one run together
// with the frames derived from them.
type analyzeData struct {
	// NOTE(review): presumably labels identifying the database in the
	// output; neither field is used in the code visible here — confirm.
	databaseTag, legend string

	// Unix-second range common to all entries in sys: the latest first
	// second and the earliest last second, as set by readSystemMetricsAll.
	minUnixSecond, maxUnixSecond int64

	// sys holds one entry per system-metrics file, in the order read.
	sys []testData

	// sysAgg is the frame aggregated from sys, limited to rows within
	// [minUnixSecond,maxUnixSecond].
	sysAgg dataframe.Frame

	benchMetricsFilePath string
	benchMetrics         testData

	// aggregated is built from sysAgg and benchMetrics.
	aggregated dataframe.Frame

	allAggregatedOutputPath string
}
// readSystemMetricsAll loads every system-metrics file in fpaths
// (e.g. all 3 files when the cluster has 3 nodes) into a new analyzeData.
//
// On success, data.sys holds the loaded metrics in argument order, and
// data.minUnixSecond/data.maxUnixSecond hold the latest frontUnixSecond and
// the earliest lastUnixSecond across all files, i.e. the unix-second range
// the files have in common. The range is not checked for being non-empty.
// With no paths given, the returned data is left at its zero value.
//
// The first file that fails to load aborts the call; its error is returned
// together with nil data.
func readSystemMetricsAll(fpaths ...string) (data *analyzeData, err error) {
	data = &analyzeData{}
	for idx, path := range fpaths {
		metrics, readErr := readSystemMetrics(path)
		if readErr != nil {
			return nil, readErr
		}

		first, last := metrics.frontUnixSecond, metrics.lastUnixSecond
		// the first file seeds the range; every later file can only narrow it
		if idx == 0 || first > data.minUnixSecond {
			data.minUnixSecond = first
		}
		if idx == 0 || last < data.maxUnixSecond {
			data.maxUnixSecond = last
		}

		data.sys = append(data.sys, metrics)
	}
	return data, nil
}
// aggSystemMetrics merges the system metrics of every entry in data.sys
// into a single frame stored in data.sysAgg.
//
// For each entry, every column is cut down to the rows between
// data.minUnixSecond and data.maxUnixSecond (both included), located via the
// entry's "UNIX-SECOND" column. Only the first entry contributes its
// "UNIX-SECOND" column; all other columns are renamed to
// "<header>-<1-based entry index>" before being added, with "CPU-NUM",
// "VMRSS-NUM" and "EXTRA" mapped to "CPU", "VMRSS-MB" and "CLIENT-NUM".
//
// It returns an error if a column lookup or column operation fails, or if
// either boundary second is missing from an entry's "UNIX-SECOND" column.
//
// NOTE(review): the columns are trimmed, rewritten and renamed through the
// values returned by frame.Column, so the source frames are presumably
// modified in place — confirm before calling this more than once.
func (data *analyzeData) aggSystemMetrics() error {
	// the monitor CSVs come from multiple servers and should end up with an
	// equal number of rows, so rows outside [min,max] are dropped from each
	minTS := fmt.Sprintf("%d", data.minUnixSecond)
	maxTS := fmt.Sprintf("%d", data.maxUnixSecond)

	data.sysAgg = dataframe.New()
	for i := range data.sys {
		node := &data.sys[i]

		tsCol, err := node.frame.Column("UNIX-SECOND")
		if err != nil {
			return err
		}
		from, found := tsCol.FindFirst(dataframe.NewStringValue(minTS))
		if !found {
			return fmt.Errorf("%v does not exist in %s", minTS, node.filePath)
		}
		to, found := tsCol.FindFirst(dataframe.NewStringValue(maxTS))
		if !found {
			return fmt.Errorf("%v does not exist in %s", maxTS, node.filePath)
		}

		for _, header := range node.frame.Headers() {
			isTimestamp := header == "UNIX-SECOND"
			if isTimestamp && i > 0 {
				// the first entry already supplied the single shared
				// UNIX-SECOND column
				continue
			}

			col, colErr := node.frame.Column(header)
			if colErr != nil {
				return colErr
			}
			// keep only the rows inside [min,maxUnixSecond]
			if err = col.Keep(from, to+1); err != nil {
				return err
			}

			if !isTimestamp {
				name := header
				switch header {
				case "CPU-NUM":
					name = "CPU"

				case "VMRSS-NUM":
					name = "VMRSS-MB"
					// rewrite each cell from bytes to MB, two decimals
					for row, total := 0, col.Count(); row < total; row++ {
						cell, cellErr := col.Value(row)
						if cellErr != nil {
							return cellErr
						}
						// NOTE(review): the second result of Float64 is
						// dropped, so an unparsable cell is presumably
						// written out as "0.00" — confirm this is intended.
						fv, _ := cell.Float64()
						mb := float64(fv) * 0.000001
						if err = col.Set(row, dataframe.NewStringValue(fmt.Sprintf("%.2f", mb))); err != nil {
							return err
						}
					}

				case "EXTRA":
					// dbtester uses linux-inspect 'EXTRA' column as 'CLIENT-NUM'
					name = "CLIENT-NUM"
				}

				// several system-metrics CSV files share one frame, so the
				// header is suffixed with the 1-based index of its source
				col.UpdateHeader(fmt.Sprintf("%s-%d", name, i+1))
			}

			if err = data.sysAgg.AddColumn(col); err != nil {
				return err
			}
		}
	}
	return nil
}