control: separate files, remove TODO

Gyu-Ho Lee 2017-02-02 10:01:16 -08:00
parent cb08d56c03
commit 7adc715ee3
3 changed files with 95 additions and 73 deletions


@@ -17,7 +17,6 @@ package control
import (
"fmt"
"math"
"sort"
"strings"
"sync"
"time"
@@ -292,9 +291,10 @@ func saveDataLatencyDistributionAll(cfg Config, st report.Stats) {
}
func saveDataLatencyThroughputTimeseries(cfg Config, st report.Stats, tsToClientN map[int64]int) {
// TODO: UNIX-TS from pkg/report data is time.Time.Unix
// UNIX-TS from pkg/report data is time.Time.Unix
// UNIX-TS from psn.CSV data is time.Time.UnixNano
// we need some way to combine those with matching timestamps,
// so keep UNIX-TS as time.Time.Unix and convert psn.CSV data to time.Time.Unix
c1 := dataframe.NewColumn("UNIX-TS")
c2 := dataframe.NewColumn("CONTROL-CLIENT-NUM")
c3 := dataframe.NewColumn("AVG-LATENCY-MS")
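The conversion described above is small enough to sketch inline. A minimal, hypothetical example (not code from this commit; nanoTS and secTS are illustrative names) of reducing a psn.CSV nanosecond timestamp to time.Time.Unix resolution so the two sources can be joined on matching timestamps:

package main

import (
	"fmt"
	"time"
)

func main() {
	// a psn.CSV-style timestamp at nanosecond resolution
	nanoTS := time.Now().UnixNano()
	// reduce to second resolution to match the time.Time.Unix
	// values produced by pkg/report
	secTS := time.Unix(0, nanoTS).Unix()
	fmt.Println(nanoTS, secTS)
}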
@@ -370,74 +370,3 @@ func saveAllStats(cfg Config, stats report.Stats, tsToClientN map[int64]int) {
// cfg.DataLatencyThroughputTimeseries
saveDataLatencyThroughputTimeseries(cfg, stats, tsToClientN)
}
// processTimeSeries sorts all data points by their timestamps,
// and then aggregates them by cumulative throughput,
// in order to map the number of keys to the average latency.
//
// type DataPoint struct {
// Timestamp int64
// AvgLatency time.Duration
// ThroughPut int64
// }
//
// If unit is 1000, the average throughput per second is 30,000,
// and the average latency is 10ms, each second will produce
// 30 data points with latency 10ms.
func processTimeSeries(tss report.TimeSeries, unit int64, totalRequests int) keyNumToAvgLatencys {
sort.Sort(tss)
cumulKeyN := int64(0)
maxKey := int64(0)
rm := make(map[int64]time.Duration)
// this data is aggregated by second,
// and we want to map the number of keys to latency;
// the map key is the cumulative number of keys
// and the value is the average latency at that point
for _, ts := range tss {
cumulKeyN += ts.ThroughPut
if cumulKeyN < unit {
// not enough keys accumulated to fill one unit yet
continue
}
lat := ts.AvgLatency
// cumulKeyN >= unit
for cumulKeyN > maxKey {
maxKey += unit
rm[maxKey] = lat
}
}
// fill in missing rows up to totalRequests with zero latency
for i := maxKey; i < int64(totalRequests); i += unit {
if _, ok := rm[i]; !ok {
rm[i] = time.Duration(0)
}
}
if _, ok := rm[int64(totalRequests)]; !ok {
rm[int64(totalRequests)] = time.Duration(0)
}
kss := []keyNumToAvgLatency{}
for k, v := range rm {
kss = append(kss, keyNumToAvgLatency{keyNum: k, avgLat: v})
}
sort.Sort(keyNumToAvgLatencys(kss))
return kss
}
type keyNumToAvgLatency struct {
keyNum int64
avgLat time.Duration
}
type keyNumToAvgLatencys []keyNumToAvgLatency
func (t keyNumToAvgLatencys) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t keyNumToAvgLatencys) Len() int { return len(t) }
func (t keyNumToAvgLatencys) Less(i, j int) bool { return t[i].keyNum < t[j].keyNum }


@@ -0,0 +1,93 @@
// Copyright 2017 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package control
import (
"sort"
"time"
"github.com/coreos/etcd/pkg/report"
)
// processTimeSeries sorts all data points by their timestamps,
// and then aggregates them by cumulative throughput,
// in order to map the number of keys to the average latency.
//
// type DataPoint struct {
// Timestamp int64
// AvgLatency time.Duration
// ThroughPut int64
// }
//
// If unit is 1000, the average throughput per second is 30,000,
// and the average latency is 10ms, each second will produce
// 30 data points with latency 10ms.
func processTimeSeries(tss report.TimeSeries, unit int64, totalRequests int) keyNumToAvgLatencys {
sort.Sort(tss)
cumulKeyN := int64(0)
maxKey := int64(0)
rm := make(map[int64]time.Duration)
// this data is aggregated by second,
// and we want to map the number of keys to latency;
// the map key is the cumulative number of keys
// and the value is the average latency at that point
for _, ts := range tss {
cumulKeyN += ts.ThroughPut
if cumulKeyN < unit {
// not enough keys accumulated to fill one unit yet
continue
}
lat := ts.AvgLatency
// cumulKeyN >= unit
for cumulKeyN > maxKey {
maxKey += unit
rm[maxKey] = lat
}
}
// fill in missing rows up to totalRequests with zero latency
for i := maxKey; i < int64(totalRequests); i += unit {
if _, ok := rm[i]; !ok {
rm[i] = time.Duration(0)
}
}
if _, ok := rm[int64(totalRequests)]; !ok {
rm[int64(totalRequests)] = time.Duration(0)
}
kss := []keyNumToAvgLatency{}
for k, v := range rm {
kss = append(kss, keyNumToAvgLatency{keyNum: k, avgLat: v})
}
sort.Sort(keyNumToAvgLatencys(kss))
return kss
}
type keyNumToAvgLatency struct {
keyNum int64
avgLat time.Duration
}
type keyNumToAvgLatencys []keyNumToAvgLatency
func (t keyNumToAvgLatencys) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t keyNumToAvgLatencys) Len() int { return len(t) }
func (t keyNumToAvgLatencys) Less(i, j int) bool { return t[i].keyNum < t[j].keyNum }
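To see the aggregation end to end, here is a minimal, hypothetical usage sketch. It is not part of this commit: the function name demoProcessTimeSeries is illustrative, and it assumes report.TimeSeries is a sortable slice of the DataPoint struct shown in the comment above.

package control

import (
	"fmt"
	"time"

	"github.com/coreos/etcd/pkg/report"
)

// demoProcessTimeSeries feeds two seconds of data points through
// processTimeSeries; with unit=1000, each 1,000-key boundary is
// mapped to the average latency of the second whose cumulative
// throughput crossed it.
func demoProcessTimeSeries() {
	tss := report.TimeSeries{
		{Timestamp: 1, AvgLatency: 12 * time.Millisecond, ThroughPut: 2000},
		{Timestamp: 0, AvgLatency: 10 * time.Millisecond, ThroughPut: 1500},
	}
	for _, ks := range processTimeSeries(tss, 1000, 3500) {
		fmt.Printf("%d keys -> %v\n", ks.keyNum, ks.avgLat)
	}
}

Because keyNumToAvgLatencys implements sort.Interface, the rows come back already ordered by keyNum.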