mirror of https://github.com/etcd-io/dbtester.git
control: implement rate-limiting
This commit is contained in:
parent
1cd6cf534a
commit
3ccd4b7a98
|
|
@ -27,6 +27,7 @@ import (
|
|||
"github.com/coreos/dbtester/remotestorage"
|
||||
"github.com/spf13/cobra"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/cheggaaa/pb"
|
||||
|
|
@ -586,7 +587,16 @@ func newReadOneshotHandlers(cfg Config) []ReqHandler {
|
|||
func generateReads(cfg Config, key string, requests chan<- request) {
|
||||
defer close(requests)
|
||||
|
||||
var rateLimiter *rate.Limiter
|
||||
if cfg.Step2.RequestsPerSecond > 0 {
|
||||
rateLimiter = rate.NewLimiter(rate.Limit(cfg.Step2.RequestsPerSecond), cfg.Step2.RequestsPerSecond)
|
||||
}
|
||||
|
||||
for i := 0; i < cfg.Step2.TotalRequests; i++ {
|
||||
if rateLimiter != nil {
|
||||
rateLimiter.Wait(context.TODO())
|
||||
}
|
||||
|
||||
switch cfg.Database {
|
||||
case "etcdv2":
|
||||
// serializable read by default
|
||||
|
|
@ -613,18 +623,21 @@ func generateReads(cfg Config, key string, requests chan<- request) {
|
|||
}
|
||||
requests <- request{consulOp: op}
|
||||
}
|
||||
if cfg.Step2.RequestIntervalMs > 0 {
|
||||
time.Sleep(time.Duration(cfg.Step2.RequestIntervalMs) * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func generateWrites(cfg Config, vals values, requests chan<- request) {
|
||||
var rateLimiter *rate.Limiter
|
||||
if cfg.Step2.RequestsPerSecond > 0 {
|
||||
rateLimiter = rate.NewLimiter(rate.Limit(cfg.Step2.RequestsPerSecond), cfg.Step2.RequestsPerSecond)
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
defer func() {
|
||||
close(requests)
|
||||
wg.Wait()
|
||||
}()
|
||||
|
||||
for i := 0; i < cfg.Step2.TotalRequests; i++ {
|
||||
k := sequentialKey(cfg.Step2.KeySize, i)
|
||||
if cfg.Step2.SameKey {
|
||||
|
|
@ -634,6 +647,10 @@ func generateWrites(cfg Config, vals values, requests chan<- request) {
|
|||
v := vals.bytes[i%vals.sampleSize]
|
||||
vs := vals.strings[i%vals.sampleSize]
|
||||
|
||||
if rateLimiter != nil {
|
||||
rateLimiter.Wait(context.TODO())
|
||||
}
|
||||
|
||||
switch cfg.Database {
|
||||
case "etcdv2":
|
||||
requests <- request{etcdv2Op: etcdv2Op{key: k, value: vs}}
|
||||
|
|
@ -644,8 +661,5 @@ func generateWrites(cfg Config, vals values, requests chan<- request) {
|
|||
case "consul", "cetcd":
|
||||
requests <- request{consulOp: consulOp{key: k, value: v}}
|
||||
}
|
||||
if cfg.Step2.RequestIntervalMs > 0 {
|
||||
time.Sleep(time.Duration(cfg.Step2.RequestIntervalMs) * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue