Make object transformation concurrent to remove watch cache scalability issue for conversion webhook
Test by enabling consistent list from cache in storage version migrator stress test that uses conversion webhook that bottlenects events comming to watch cache. Set concurrency to 10, based on maximum/average transform latency when running stress test. In my testing max was about 60-100ms, while average was 6-10ms. Kubernetes-commit: bb686f203308481bcd7808f767171cdef27e12a0
This commit is contained in:
parent
9aa7a6ac61
commit
3adae5fd46
|
@ -46,8 +46,9 @@ import (
|
|||
|
||||
const (
|
||||
// We have set a buffer in order to reduce times of context switches.
|
||||
incomingBufSize = 100
|
||||
outgoingBufSize = 100
|
||||
incomingBufSize = 100
|
||||
outgoingBufSize = 100
|
||||
processEventConcurrency = 10
|
||||
)
|
||||
|
||||
// defaultWatcherMaxLimit is used to facilitate construction tests
|
||||
|
@ -230,8 +231,7 @@ func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bo
|
|||
go wc.startWatching(watchClosedCh, initialEventsEndBookmarkRequired, forceInitialEvents)
|
||||
|
||||
var resultChanWG sync.WaitGroup
|
||||
resultChanWG.Add(1)
|
||||
go wc.processEvent(&resultChanWG)
|
||||
wc.processEvents(&resultChanWG)
|
||||
|
||||
select {
|
||||
case err := <-wc.errChan:
|
||||
|
@ -424,10 +424,17 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEnd
|
|||
close(watchClosedCh)
|
||||
}
|
||||
|
||||
// processEvent processes events from etcd watcher and sends results to resultChan.
|
||||
func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
|
||||
// processEvents processes events from etcd watcher and sends results to resultChan.
|
||||
func (wc *watchChan) processEvents(wg *sync.WaitGroup) {
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ConcurrentWatchObjectDecode) {
|
||||
wc.concurrentProcessEvents(wg)
|
||||
} else {
|
||||
wg.Add(1)
|
||||
go wc.serialProcessEvents(wg)
|
||||
}
|
||||
}
|
||||
func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case e := <-wc.incomingEventChan:
|
||||
|
@ -435,7 +442,7 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
|
|||
if res == nil {
|
||||
continue
|
||||
}
|
||||
if len(wc.resultChan) == outgoingBufSize {
|
||||
if len(wc.resultChan) == cap(wc.resultChan) {
|
||||
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
|
||||
}
|
||||
// If user couldn't receive results fast enough, we also block incoming events from watcher.
|
||||
|
@ -452,6 +459,95 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
|
|||
}
|
||||
}
|
||||
|
||||
func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) {
|
||||
p := concurrentOrderedEventProcessing{
|
||||
input: wc.incomingEventChan,
|
||||
processFunc: wc.transform,
|
||||
output: wc.resultChan,
|
||||
processingQueue: make(chan chan *watch.Event, processEventConcurrency-1),
|
||||
|
||||
objectType: wc.watcher.objectType,
|
||||
groupResource: wc.watcher.groupResource,
|
||||
}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
p.scheduleEventProcessing(wc.ctx, wg)
|
||||
}()
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
p.collectEventProcessing(wc.ctx)
|
||||
}()
|
||||
}
|
||||
|
||||
type concurrentOrderedEventProcessing struct {
|
||||
input chan *event
|
||||
processFunc func(*event) *watch.Event
|
||||
output chan watch.Event
|
||||
|
||||
processingQueue chan chan *watch.Event
|
||||
// Metadata for logging
|
||||
objectType string
|
||||
groupResource schema.GroupResource
|
||||
}
|
||||
|
||||
func (p *concurrentOrderedEventProcessing) scheduleEventProcessing(ctx context.Context, wg *sync.WaitGroup) {
|
||||
var e *event
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case e = <-p.input:
|
||||
}
|
||||
processingResponse := make(chan *watch.Event, 1)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case p.processingQueue <- processingResponse:
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(e *event, response chan<- *watch.Event) {
|
||||
defer wg.Done()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case response <- p.processFunc(e):
|
||||
}
|
||||
}(e, processingResponse)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) {
|
||||
var processingResponse chan *watch.Event
|
||||
var e *watch.Event
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case processingResponse = <-p.processingQueue:
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case e = <-processingResponse:
|
||||
}
|
||||
if e == nil {
|
||||
continue
|
||||
}
|
||||
if len(p.output) == cap(p.output) {
|
||||
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.objectType, "groupResource", p.groupResource)
|
||||
}
|
||||
// If user couldn't receive results fast enough, we also block incoming events from watcher.
|
||||
// Because storing events in local will cause more memory usage.
|
||||
// The worst case would be closing the fast watcher.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case p.output <- *e:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (wc *watchChan) filter(obj runtime.Object) bool {
|
||||
if wc.internalPred.Empty() {
|
||||
return true
|
||||
|
|
|
@ -133,6 +133,12 @@ func TestEtcdWatchSemantics(t *testing.T) {
|
|||
storagetesting.RunWatchSemantics(ctx, t, store)
|
||||
}
|
||||
|
||||
func TestEtcdWatchSemanticsWithConcurrentDecode(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConcurrentWatchObjectDecode, true)
|
||||
ctx, store, _ := testSetup(t)
|
||||
storagetesting.RunWatchSemantics(ctx, t, store)
|
||||
}
|
||||
|
||||
func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) {
|
||||
ctx, store, _ := testSetup(t)
|
||||
storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store)
|
||||
|
|
Loading…
Reference in New Issue