Add throttling to the batching audit webhook

Signed-off-by: Mik Vyatskov <vmik@google.com>

Kubernetes-commit: 6bce120a11782caad7ea477aaaafe3ba31f797d1
This commit is contained in:
Mik Vyatskov 2017-10-05 23:19:45 +02:00 committed by Kubernetes Publisher
parent bddf432ba6
commit 29522c33dc
3 changed files with 283 additions and 238 deletions

508
Godeps/Godeps.json generated

File diff suppressed because it is too large Load Diff

View File

@ -42,6 +42,7 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/audit:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)

View File

@ -35,6 +35,7 @@ import (
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/util/webhook"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
)
const (
@ -63,6 +64,9 @@ const (
defaultBatchMaxSize = 400 // Only send up to 400 events at a time.
defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute.
defaultInitialBackoff = 10 * time.Second // Wait at least 10 seconds before retrying.
defaultBatchThrottleQPS = 10 // Limit the send rate by 10 QPS.
defaultBatchThrottleBurst = 15 // Allow up to 15 QPS burst.
)
// The plugin name reported in error metrics.
@ -154,6 +158,7 @@ func newBatchWebhook(configFile string, groupVersion schema.GroupVersion) (*batc
maxBatchSize: defaultBatchMaxSize,
maxBatchWait: defaultBatchMaxWait,
shutdownCh: make(chan struct{}),
throttle: flowcontrol.NewTokenBucketRateLimiter(defaultBatchThrottleQPS, defaultBatchThrottleBurst),
}, nil
}
@ -181,6 +186,9 @@ type batchBackend struct {
// all requests have been completed and no new will be spawned, since the
// sending routine is not running anymore.
reqMutex sync.RWMutex
// Limits the number of requests sent to the backend per second.
throttle flowcontrol.RateLimiter
}
func (b *batchBackend) Run(stopCh <-chan struct{}) error {
@ -306,6 +314,10 @@ func (b *batchBackend) sendBatchEvents(events []auditinternal.Event) {
list := auditinternal.EventList{Items: events}
if b.throttle != nil {
b.throttle.Accept()
}
// Locking reqMutex for read will guarantee that the shutdown process will
// block until the goroutine started below is finished. At the same time, it
// will not prevent other batches from being proceed further this point.