control: use connections, clients range

This commit is contained in:
Gyu-Ho Lee 2017-01-18 11:23:31 -08:00
parent cbfdb970e0
commit d4eae5cb74
No known key found for this signature in database
GPG Key ID: 1DDD39C7EB70C24C
6 changed files with 120 additions and 86 deletions

View File

@ -56,18 +56,17 @@ type Config struct {
} `yaml:"step1"`
Step2 struct {
SkipStressDatabase bool `yaml:"skip_stress_database"`
BenchType string `yaml:"bench_type"`
StaleRead bool `yaml:"stale_read"`
Connections int `yaml:"connections"`
Clients int `yaml:"clients"`
ConnectionsClientsDelta int `yaml:"connections_clients_delta"`
ConnectionsClientsMax int `yaml:"connections_clients_max"`
KeySize int `yaml:"key_size"`
SameKey bool `yaml:"same_key"`
ValueSize int `yaml:"value_size"`
TotalRequests int `yaml:"total_requests"`
RequestsPerSecond int `yaml:"requests_per_second"`
SkipStressDatabase bool `yaml:"skip_stress_database"`
BenchType string `yaml:"bench_type"`
StaleRead bool `yaml:"stale_read"`
Connections int `yaml:"connections"`
Clients int `yaml:"clients"`
ConnectionsClients []int `yaml:"connections_clients"`
KeySize int `yaml:"key_size"`
SameKey bool `yaml:"same_key"`
ValueSize int `yaml:"value_size"`
TotalRequests int `yaml:"total_requests"`
RequestsPerSecond int `yaml:"requests_per_second"`
} `yaml:"step2"`
Step3 struct {

View File

@ -87,14 +87,14 @@ func TestReadConfig(t *testing.T) {
if c.Step2.BenchType != "write" {
t.Fatalf("unexpected %s", c.Step2.BenchType)
}
if c.Step2.Clients != 100 {
if c.Step2.Clients != 1 {
t.Fatalf("unexpected %d", c.Step2.Clients)
}
if c.Step2.ConnectionsClientsDelta != 100 {
t.Fatalf("unexpected %d", c.Step2.ConnectionsClientsDelta)
if c.Step2.Connections != 1 {
t.Fatalf("unexpected %d", c.Step2.Connections)
}
if c.Step2.ConnectionsClientsMax != 2000 {
t.Fatalf("unexpected %d", c.Step2.ConnectionsClientsMax)
if !reflect.DeepEqual(c.Step2.ConnectionsClients, []int{1, 10, 50, 100, 300, 500, 700, 1000, 1500, 2000, 2500}) {
t.Fatalf("unexpected %d", c.Step2.ConnectionsClients)
}
if c.Step2.KeySize != 8 {
t.Fatalf("unexpected %d", c.Step2.KeySize)
@ -102,13 +102,10 @@ func TestReadConfig(t *testing.T) {
if !c.Step2.SameKey {
t.Fatalf("unexpected %v", c.Step2.SameKey)
}
if c.Step2.Connections != 100 {
t.Fatalf("unexpected %d", c.Step2.Connections)
}
if !c.Step2.StaleRead {
t.Fatalf("unexpected %v", c.Step2.StaleRead)
}
if c.Step2.TotalRequests != 2000000 {
if c.Step2.TotalRequests != 3000000 {
t.Fatalf("unexpected %d", c.Step2.TotalRequests)
}
if c.Step2.RequestsPerSecond != 100 {

View File

@ -32,14 +32,24 @@ step2:
skip_stress_database: false
bench_type: write
stale_read: true
connections: 100
clients: 100
connections_clients_delta: 100
connections_clients_max: 2000 # if not 0, increase clients per 100K requests (only for writes)
connections: 1
clients: 1
connections_clients: # if specified, connections, clients are overwritten
- 1
- 10
- 50
- 100
- 300
- 500
- 700
- 1000
- 1500
- 2000
- 2500
key_size: 8
same_key: true
value_size: 256
total_requests: 2000000
total_requests: 3000000
requests_per_second: 100
# after benchmark

View File

@ -46,8 +46,6 @@ func newValues(cfg Config) (v values, rerr error) {
}
type benchmark struct {
cfg Config
bar *pb.ProgressBar
report report.Report
reportDone <-chan report.Stats
@ -63,16 +61,15 @@ type benchmark struct {
}
// pass totalN in case that 'cfg' is manipulated
func newBenchmark(totalN int, cfg Config, reqHandlers []ReqHandler, reqDone func(), reqGen func(chan<- request)) (b *benchmark) {
func newBenchmark(totalN int, clientsN int, reqHandlers []ReqHandler, reqDone func(), reqGen func(chan<- request)) (b *benchmark) {
b = &benchmark{
cfg: cfg,
bar: pb.New(totalN),
reqHandlers: reqHandlers,
reqGen: reqGen,
reqDone: reqDone,
wg: sync.WaitGroup{},
}
b.inflightReqs = make(chan request, b.cfg.Step2.Clients)
b.inflightReqs = make(chan request, clientsN)
b.bar.Format("Bom !")
b.bar.Start()
@ -159,21 +156,6 @@ func printStats(st report.Stats) {
}
}
// func saveDataLatencyAll(cfg Config, st report.Stats) {
// fr := dataframe.New()
// c1 := dataframe.NewColumn("LATENCY-MS")
// // latencies are sorted in ascending order in seconds (from etcd)
// for _, lat := range st.Lats {
// c1.PushBack(dataframe.NewStringValue(fmt.Sprintf("%4.4f", 1000*lat)))
// }
// if err := fr.AddColumn(c1); err != nil {
// plog.Fatal(err)
// }
// if err := fr.CSV(cfg.DataLatencyAll); err != nil {
// plog.Fatal(err)
// }
// }
func saveDataLatencyDistributionSummary(cfg Config, st report.Stats) {
fr := dataframe.New()
@ -339,7 +321,7 @@ func saveDataLatencyThroughputTimeseries(cfg Config, st report.Stats) {
}
func generateReport(cfg Config, h []ReqHandler, reqDone func(), reqGen func(chan<- request)) {
b := newBenchmark(cfg.Step2.TotalRequests, cfg, h, reqDone, reqGen)
b := newBenchmark(cfg.Step2.TotalRequests, cfg.Step2.Clients, h, reqDone, reqGen)
b.startRequests()
b.waitAll()
@ -348,9 +330,6 @@ func generateReport(cfg Config, h []ReqHandler, reqDone func(), reqGen func(chan
}
func saveAllStats(cfg Config, stats report.Stats) {
// cfg.DataLatencyAll
// saveDataLatencyAll(cfg, stats)
// cfg.DataLatencyDistributionSummary
saveDataLatencyDistributionSummary(cfg, stats)
@ -373,53 +352,59 @@ func step2StressDatabase(cfg Config) error {
switch cfg.Step2.BenchType {
case "write":
plog.Println("write generateReport is started...")
if cfg.Step2.ConnectionsClientsMax == 0 {
// fixed number of clients,connections
if len(cfg.Step2.ConnectionsClients) == 0 {
h, done := newWriteHandlers(cfg)
reqGen := func(inflightReqs chan<- request) { generateWrites(cfg, 0, vals, inflightReqs) }
generateReport(cfg, h, done, reqGen)
} else {
// need client number increase
// TODO: currently, request range is 100000 (fixed)
// e.g. 2M requests, starts with clients 100, range 100K
// at 2M requests point, there will be 2K clients (20 * 100)
if cfg.Step2.Connections != cfg.Step2.Clients {
plog.Panicf("expected same connections %d != clients %d", cfg.Step2.Connections, cfg.Step2.Clients)
}
// out of clients,connections ranges,
// we expect the last number to be the peak
// therefore, we give more requests to the last config
rs := assignRequest(cfg.Step2.ConnectionsClients, cfg.Step2.TotalRequests)
// 1st request
copied := cfg
copied.Step2.TotalRequests = 100000
copied.Step2.Connections = cfg.Step2.ConnectionsClients[0]
copied.Step2.Clients = cfg.Step2.ConnectionsClients[0]
copied.Step2.TotalRequests = rs[0]
plog.Infof("signaling agent with client number %d", copied.Step2.Clients)
if err := bcastReq(copied, agentpb.Request_Heartbeat); err != nil {
return err
}
h, done := newWriteHandlers(copied)
reqGen := func(inflightReqs chan<- request) { generateWrites(copied, 0, vals, inflightReqs) }
b := newBenchmark(cfg.Step2.TotalRequests, copied, h, done, reqGen)
b := newBenchmark(cfg.Step2.TotalRequests, cfg.Step2.ConnectionsClients[0], h, done, reqGen)
reqCompleted := 0
for reqCompleted < cfg.Step2.TotalRequests {
plog.Infof("signaling agent on client number %d", copied.Step2.Clients)
// signal agent on the client number
if err := bcastReq(copied, agentpb.Request_Heartbeat); err != nil {
// wait until 1st requests are finished
// do not end reports yet
b.startRequests()
b.waitRequestsEnd()
// from 2nd request
reqCompleted := copied.Step2.TotalRequests
for i := 1; i < len(rs); i++ {
copied2 := cfg
copied2.Step2.Connections = cfg.Step2.ConnectionsClients[i]
copied2.Step2.Clients = cfg.Step2.ConnectionsClients[i]
copied2.Step2.TotalRequests = rs[i]
plog.Infof("signaling agent with client number %d", copied2.Step2.Clients)
if err := bcastReq(copied2, agentpb.Request_Heartbeat); err != nil {
return err
}
h, done = newWriteHandlers(copied2)
reqGen = func(inflightReqs chan<- request) { generateWrites(copied2, reqCompleted, vals, inflightReqs) }
b.reset(copied2.Step2.Clients, h, done, reqGen)
plog.Infof("updated client number %d", copied2.Step2.Clients)
// generate request
// wait until rs[i] requests are finished
// do not end reports yet
b.startRequests()
// wait until 100000 requests are finished
// do not finish reports yet
b.waitRequestsEnd()
// update request handlers, generator
copied.Step2.Connections += copied.Step2.ConnectionsClientsDelta
copied.Step2.Clients += copied.Step2.ConnectionsClientsDelta
if copied.Step2.Clients > copied.Step2.ConnectionsClientsMax {
copied.Step2.Connections = copied.Step2.ConnectionsClientsMax
copied.Step2.Clients = copied.Step2.ConnectionsClientsMax
}
h, done = newWriteHandlers(copied)
reqCompleted += 100000
reqGen = func(inflightReqs chan<- request) { generateWrites(copied, reqCompleted, vals, inflightReqs) }
b.reset(copied.Step2.Clients, h, done, reqGen)
plog.Infof("updated client number %d", copied.Step2.Clients)
// after one range of requests are finished
reqCompleted += rs[i]
}
// finish reports
@ -428,6 +413,7 @@ func step2StressDatabase(cfg Config) error {
printStats(b.stats)
saveAllStats(cfg, b.stats)
}
plog.Println("write generateReport is finished...")
plog.Println("checking total keys on", cfg.DatabaseEndpoints)
@ -803,3 +789,18 @@ func generateWrites(cfg Config, startIdx int, vals values, inflightReqs chan<- r
}
}
}
func assignRequest(ranges []int, total int) (rs []int) {
reqEach := total / (len(ranges) + 1)
curSum := 0
rs = make([]int, len(ranges))
for i := range ranges {
if i < len(ranges)-1 {
rs[i] = reqEach
curSum += reqEach
} else {
rs[i] = total - curSum
}
}
return
}

View File

@ -0,0 +1,30 @@
// Copyright 2017 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package control
import (
"reflect"
"testing"
)
func Test_assignRequest(t *testing.T) {
ranges := []int{1, 10, 50, 100, 300, 500, 700, 1000, 1500, 2000, 2500}
total := 3000000
rs := assignRequest(ranges, total)
expected := []int{250000, 250000, 250000, 250000, 250000, 250000, 250000, 250000, 250000, 250000, 500000}
if !reflect.DeepEqual(rs, expected) {
t.Fatalf("expected %+v, got %+v", expected, rs)
}
}

View File

@ -27,9 +27,6 @@ func step4UploadLogs(cfg Config) error {
if err := uploadToGoogle(cfg.Log, cfg); err != nil {
return err
}
// if err := uploadToGoogle(cfg.DataLatencyAll, cfg); err != nil {
// return err
// }
if err := uploadToGoogle(cfg.DataLatencyDistributionSummary, cfg); err != nil {
return err
}