diff --git a/pkg/util/flowcontrol/apf_controller.go b/pkg/util/flowcontrol/apf_controller.go
index 8c34b2b06..3ee386745 100644
--- a/pkg/util/flowcontrol/apf_controller.go
+++ b/pkg/util/flowcontrol/apf_controller.go
@@ -23,6 +23,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"math"
+	"math/rand"
 	"sort"
 	"sync"
 	"time"
@@ -34,8 +35,10 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	apitypes "k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/clock"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
 	"k8s.io/apiserver/pkg/authentication/user"
@@ -53,6 +56,8 @@ import (
 	flowcontrollister "k8s.io/client-go/listers/flowcontrol/v1beta1"
 )
 
+const timeFmt = "2006-01-02T15:04:05.999"
+
 // This file contains a simple local (to the apiserver) controller
 // that digests API Priority and Fairness config objects (FlowSchema
 // and PriorityLevelConfiguration) into the data structure that the
@@ -85,9 +90,19 @@ type RequestDigest struct {
 // this type and cfgMeal follow the convention that the suffix
 // "Locked" means that the caller must hold the configController lock.
 type configController struct {
+	name             string // varies in tests of fighting controllers
+	clock            clock.PassiveClock
 	queueSetFactory  fq.QueueSetFactory
 	obsPairGenerator metrics.TimedObserverPairGenerator
+
+	// How this controller appears in an ObjectMeta ManagedFieldsEntry.Manager
+	asFieldManager string
+
+	// Given a boolean indicating whether a FlowSchema's referenced
+	// PriorityLevelConfig exists, return a boolean indicating whether
+	// the reference is dangling
+	foundToDangling func(bool) bool
 
 	// configQueue holds `(interface{})(0)` when the configuration
 	// objects need to be reprocessed.
 	configQueue workqueue.RateLimitingInterface
@@ -122,6 +137,18 @@ type configController struct {
 	// name to the state for that level. Every name referenced from a
 	// member of `flowSchemas` has an entry here.
 	priorityLevelStates map[string]*priorityLevelState
+
+	// the most recent update attempts, ordered by increasing age.
+	// Consumer trims to keep only the last minute's worth of entries.
+	// The controller uses this to limit itself to at most six updates
+	// to a given FlowSchema in any minute.
+	// This may only be accessed from the one and only worker goroutine.
+	mostRecentUpdates []updateAttempt
+}
+
+type updateAttempt struct {
+	timeUpdated  time.Time
+	updatedItems sets.String // FlowSchema names
 }
 
 // priorityLevelState holds the state specific to a priority level.
@@ -154,14 +181,18 @@ type priorityLevelState struct {
 // NewTestableController is extra flexible to facilitate testing
 func newTestableController(config TestableConfig) *configController {
 	cfgCtlr := &configController{
+		name:                   config.Name,
+		clock:                  config.Clock,
 		queueSetFactory:        config.QueueSetFactory,
 		obsPairGenerator:       config.ObsPairGenerator,
+		asFieldManager:         config.AsFieldManager,
+		foundToDangling:        config.FoundToDangling,
 		serverConcurrencyLimit: config.ServerConcurrencyLimit,
 		requestWaitLimit:       config.RequestWaitLimit,
 		flowcontrolClient:      config.FlowcontrolClient,
 		priorityLevelStates:    make(map[string]*priorityLevelState),
 	}
-	klog.V(2).Infof("NewTestableController with serverConcurrencyLimit=%d, requestWaitLimit=%s", cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit)
+	klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, requestWaitLimit=%s, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit, cfgCtlr.name, cfgCtlr.asFieldManager)
 	// Start with longish delay because conflicts will be between
 	// different processes, so take some time to go away.
 	cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue")
@@ -177,40 +208,60 @@ func newTestableController(config TestableConfig) *configController {
 	pli.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			pl := obj.(*flowcontrol.PriorityLevelConfiguration)
-			klog.V(7).Infof("Triggered API priority and fairness config reloading due to creation of PLC %s", pl.Name)
+			klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to creation of PLC %s", cfgCtlr.name, pl.Name)
 			cfgCtlr.configQueue.Add(0)
 		},
 		UpdateFunc: func(oldObj, newObj interface{}) {
 			newPL := newObj.(*flowcontrol.PriorityLevelConfiguration)
 			oldPL := oldObj.(*flowcontrol.PriorityLevelConfiguration)
 			if !apiequality.Semantic.DeepEqual(oldPL.Spec, newPL.Spec) {
-				klog.V(7).Infof("Triggered API priority and fairness config reloading due to spec update of PLC %s", newPL.Name)
+				klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to spec update of PLC %s", cfgCtlr.name, newPL.Name)
 				cfgCtlr.configQueue.Add(0)
+			} else {
+				klog.V(7).Infof("No trigger API priority and fairness config reloading in %s due to spec non-change of PLC %s", cfgCtlr.name, newPL.Name)
 			}
 		},
 		DeleteFunc: func(obj interface{}) {
 			name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
-			klog.V(7).Infof("Triggered API priority and fairness config reloading due to deletion of PLC %s", name)
+			klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to deletion of PLC %s", cfgCtlr.name, name)
 			cfgCtlr.configQueue.Add(0)
 		}})
 	fsi.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			fs := obj.(*flowcontrol.FlowSchema)
-			klog.V(7).Infof("Triggered API priority and fairness config reloading due to creation of FS %s", fs.Name)
+			klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to creation of FS %s", cfgCtlr.name, fs.Name)
 			cfgCtlr.configQueue.Add(0)
 		},
 		UpdateFunc: func(oldObj, newObj interface{}) {
 			newFS := newObj.(*flowcontrol.FlowSchema)
 			oldFS := oldObj.(*flowcontrol.FlowSchema)
-			if !apiequality.Semantic.DeepEqual(oldFS.Spec, newFS.Spec) {
-				klog.V(7).Infof("Triggered API priority and fairness config reloading due to spec update of FS %s", newFS.Name)
+			// Changes to either Spec or Status are relevant. The
+			// concern is that we might, in some future release, want
+			// different behavior than is implemented now. One of the
+			// hardest questions is how does an operator roll out the
+			// new release in a cluster with multiple kube-apiservers
+			// --- in a way that works no matter what servers crash
+			// and restart when. If this handler reacts only to
+			// changes in Spec then we have a scenario in which the
+			// rollout leaves the old Status in place. The scenario
+			// ends with this subsequence: deploy the last new server
+			// before deleting the last old server, and in between
+			// those two operations the last old server crashes and
+			// recovers. The chosen solution is making this controller
+			// insist on maintaining the particular state that it
+			// establishes.
+			if !(apiequality.Semantic.DeepEqual(oldFS.Spec, newFS.Spec) &&
+				apiequality.Semantic.DeepEqual(oldFS.Status, newFS.Status)) {
+				klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to spec and/or status update of FS %s", cfgCtlr.name, newFS.Name)
 				cfgCtlr.configQueue.Add(0)
+			} else {
+				klog.V(7).Infof("No trigger of API priority and fairness config reloading in %s due to spec and status non-change of FS %s", cfgCtlr.name, newFS.Name)
 			}
 		},
 		DeleteFunc: func(obj interface{}) {
 			name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
-			klog.V(7).Infof("Triggered API priority and fairness config reloading due to deletion of FS %s", name)
+			klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to deletion of FS %s", cfgCtlr.name, name)
 			cfgCtlr.configQueue.Add(0)
 		}})
@@ -253,11 +304,16 @@ func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error {
 	return nil
 }
 
+// runWorker is the logic of the one and only worker goroutine. We
+// limit the number to one in order to obviate explicit
+// synchronization around access to `cfgCtlr.mostRecentUpdates`.
 func (cfgCtlr *configController) runWorker() {
 	for cfgCtlr.processNextWorkItem() {
 	}
 }
 
+// processNextWorkItem works on one entry from the work queue.
+// Only invoke this in the one and only worker goroutine.
 func (cfgCtlr *configController) processNextWorkItem() bool {
 	obj, shutdown := cfgCtlr.configQueue.Get()
 	if shutdown {
@@ -266,9 +322,14 @@ func (cfgCtlr *configController) processNextWorkItem() bool {
 
 	func(obj interface{}) {
 		defer cfgCtlr.configQueue.Done(obj)
-		if !cfgCtlr.syncOne() {
+		specificDelay, err := cfgCtlr.syncOne(map[string]string{})
+		switch {
+		case err != nil:
+			klog.Error(err)
 			cfgCtlr.configQueue.AddRateLimited(obj)
-		} else {
+		case specificDelay > 0:
+			cfgCtlr.configQueue.AddAfter(obj, specificDelay)
+		default:
 			cfgCtlr.configQueue.Forget(obj)
 		}
 	}(obj)
@@ -276,27 +337,22 @@ func (cfgCtlr *configController) processNextWorkItem() bool {
 	return true
 }
 
-// syncOne attempts to sync all the API Priority and Fairness config
-// objects. It either succeeds and returns `true` or logs an error
-// and returns `false`.
-func (cfgCtlr *configController) syncOne() bool {
+// syncOne does one full synchronization. It reads all the API
+// objects that configure API Priority and Fairness and updates the
+// local configController accordingly.
+// Only invoke this in the one and only worker goroutine
+func (cfgCtlr *configController) syncOne(flowSchemaRVs map[string]string) (specificDelay time.Duration, err error) {
+	klog.V(5).Infof("%s syncOne at %s", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt))
 	all := labels.Everything()
 	newPLs, err := cfgCtlr.plLister.List(all)
 	if err != nil {
-		klog.Errorf("Unable to list PriorityLevelConfiguration objects: %s", err.Error())
-		return false
+		return 0, fmt.Errorf("unable to list PriorityLevelConfiguration objects: %w", err)
 	}
 	newFSs, err := cfgCtlr.fsLister.List(all)
 	if err != nil {
-		klog.Errorf("Unable to list FlowSchema objects: %s", err.Error())
-		return false
+		return 0, fmt.Errorf("unable to list FlowSchema objects: %w", err)
 	}
-	err = cfgCtlr.digestConfigObjects(newPLs, newFSs)
-	if err == nil {
-		return true
-	}
-	klog.Error(err)
-	return false
+	return cfgCtlr.digestConfigObjects(newPLs, newFSs, flowSchemaRVs)
 }
 
 // cfgMeal is the data involved in the process of digesting the API
@@ -336,35 +392,79 @@ type fsStatusUpdate struct {
 
 // digestConfigObjects is given all the API objects that configure
 // cfgCtlr and writes its consequent new configState.
-func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema) error {
+// Only invoke this in the one and only worker goroutine
+func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema, flowSchemaRVs map[string]string) (time.Duration, error) {
 	fsStatusUpdates := cfgCtlr.lockAndDigestConfigObjects(newPLs, newFSs)
 	var errs []error
+	currResult := updateAttempt{
+		timeUpdated:  cfgCtlr.clock.Now(),
+		updatedItems: sets.String{},
+	}
+	var suggestedDelay time.Duration
 	for _, fsu := range fsStatusUpdates {
+		// if we should skip this name, indicate we will need a delay, but continue with other entries
+		if cfgCtlr.shouldDelayUpdate(fsu.flowSchema.Name) {
+			if suggestedDelay == 0 {
+				suggestedDelay = time.Duration(30+rand.Intn(45)) * time.Second
+			}
+			continue
+		}
+
+		// if we are going to issue an update, be sure we track every name we update so we know if we update it too often.
+		currResult.updatedItems.Insert(fsu.flowSchema.Name)
+
 		enc, err := json.Marshal(fsu.condition)
 		if err != nil {
 			// should never happen because these conditions are created here and well formed
 			panic(fmt.Sprintf("Failed to json.Marshall(%#+v): %s", fsu.condition, err.Error()))
 		}
-		klog.V(4).Infof("Writing Condition %s to FlowSchema %s because its previous value was %s", string(enc), fsu.flowSchema.Name, fcfmt.Fmt(fsu.oldValue))
+		klog.V(4).Infof("%s writing Condition %s to FlowSchema %s, which had ResourceVersion=%s, because its previous value was %s", cfgCtlr.name, string(enc), fsu.flowSchema.Name, fsu.flowSchema.ResourceVersion, fcfmt.Fmt(fsu.oldValue))
 		fsIfc := cfgCtlr.flowcontrolClient.FlowSchemas()
 		patchBytes := []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc)))
-		patchOptions := metav1.PatchOptions{FieldManager: ConfigConsumerAsFieldManager}
-		_, err = fsIfc.Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, patchBytes, patchOptions, "status")
+		patchOptions := metav1.PatchOptions{FieldManager: cfgCtlr.asFieldManager}
+		patchedFlowSchema, err := fsIfc.Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, patchBytes, patchOptions, "status")
 		if err == nil {
-			continue
-		}
-		if apierrors.IsNotFound(err) {
+			key, _ := cache.MetaNamespaceKeyFunc(patchedFlowSchema)
+			flowSchemaRVs[key] = patchedFlowSchema.ResourceVersion
+		} else if apierrors.IsNotFound(err) {
 			// This object has been deleted. A notification is coming
 			// and nothing more needs to be done here.
-			klog.V(5).Infof("Attempted update of concurrently deleted FlowSchema %s; nothing more needs to be done", fsu.flowSchema.Name)
+			klog.V(5).Infof("%s at %s: attempted update of concurrently deleted FlowSchema %s; nothing more needs to be done", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt), fsu.flowSchema.Name)
 		} else {
 			errs = append(errs, errors.Wrap(err, fmt.Sprintf("failed to set a status.condition for FlowSchema %s", fsu.flowSchema.Name)))
 		}
 	}
-	if len(errs) == 0 {
-		return nil
+	cfgCtlr.addUpdateResult(currResult)
+
+	return suggestedDelay, utilerrors.NewAggregate(errs)
+}
+
+// shouldDelayUpdate checks to see if a flowschema has been updated too often and returns true if a delay is needed.
+// Only invoke this in the one and only worker goroutine
+func (cfgCtlr *configController) shouldDelayUpdate(flowSchemaName string) bool {
+	numUpdatesInPastMinute := 0
+	oneMinuteAgo := cfgCtlr.clock.Now().Add(-1 * time.Minute)
+	for idx, update := range cfgCtlr.mostRecentUpdates {
+		if oneMinuteAgo.After(update.timeUpdated) {
+			// this and the remaining items are no longer relevant
+			cfgCtlr.mostRecentUpdates = cfgCtlr.mostRecentUpdates[:idx]
+			return false
+		}
+		if update.updatedItems.Has(flowSchemaName) {
+			numUpdatesInPastMinute++
+			if numUpdatesInPastMinute > 5 {
+				return true
+			}
+		}
 	}
-	return utilerrors.NewAggregate(errs)
+	return false
+}
+
+// addUpdateResult adds the result. It isn't a ring buffer because
+// this is small and rate limited.
+// Only invoke this in the one and only worker goroutine
+func (cfgCtlr *configController) addUpdateResult(result updateAttempt) {
+	cfgCtlr.mostRecentUpdates = append([]updateAttempt{result}, cfgCtlr.mostRecentUpdates...)
 }
 
 func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema) []fsStatusUpdate {
@@ -448,7 +548,7 @@ func (meal *cfgMeal) digestFlowSchemasLocked(newFSs []*flowcontrol.FlowSchema) {
 		//
 		// TODO: consider not even trying if server is not handling
 		// requests yet.
-		meal.presyncFlowSchemaStatus(fs, !goodPriorityRef, fs.Spec.PriorityLevelConfiguration.Name)
+		meal.presyncFlowSchemaStatus(fs, meal.cfgCtlr.foundToDangling(goodPriorityRef), fs.Spec.PriorityLevelConfiguration.Name)
 
 		if !goodPriorityRef {
 			klog.V(6).Infof("Ignoring FlowSchema %s because of bad priority level reference %q", fs.Name, fs.Spec.PriorityLevelConfiguration.Name)
@@ -612,12 +712,13 @@ func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangl
 	if danglingCondition.Status == desiredStatus && danglingCondition.Reason == desiredReason && danglingCondition.Message == desiredMessage {
 		return
 	}
+	now := meal.cfgCtlr.clock.Now()
 	meal.fsStatusUpdates = append(meal.fsStatusUpdates, fsStatusUpdate{
 		flowSchema: fs,
 		condition: flowcontrol.FlowSchemaCondition{
 			Type:               flowcontrol.FlowSchemaConditionDangling,
 			Status:             desiredStatus,
-			LastTransitionTime: metav1.Now(),
+			LastTransitionTime: metav1.NewTime(now),
 			Reason:             desiredReason,
 			Message:            desiredMessage,
 		},
@@ -710,7 +811,9 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
 	return selectedFlowSchema, plState.pl, false, req, startWaitingTime
 }
 
-// Call this after getting a clue that the given priority level is undesired and idle
+// maybeReap will remove the last internal traces of the named
+// priority level if it has no more use. Call this after getting a
+// clue that the given priority level is undesired and idle.
 func (cfgCtlr *configController) maybeReap(plName string) {
 	cfgCtlr.lock.Lock()
 	defer cfgCtlr.lock.Unlock()
@@ -730,8 +833,11 @@ func (cfgCtlr *configController) maybeReap(plName string) {
 	cfgCtlr.configQueue.Add(0)
 }
 
-// Call this if both (1) plState.queues is non-nil and reported being
-// idle, and (2) cfgCtlr's lock has not been released since then.
+// maybeReapLocked requires the cfgCtlr's lock to already be held and
+// will remove the last internal traces of the named priority level if
+// it has no more use. Call this if both (1) plState.queues is
+// non-nil and reported being idle, and (2) cfgCtlr's lock has not
+// been released since then.
 func (cfgCtlr *configController) maybeReapLocked(plName string, plState *priorityLevelState) {
 	if !(plState.quiescing && plState.numPending == 0) {
 		return
diff --git a/pkg/util/flowcontrol/apf_filter.go b/pkg/util/flowcontrol/apf_filter.go
index e9564d4d5..825ae09ce 100644
--- a/pkg/util/flowcontrol/apf_filter.go
+++ b/pkg/util/flowcontrol/apf_filter.go
@@ -78,18 +78,43 @@ func New(
 	requestWaitLimit time.Duration,
 ) Interface {
 	grc := counter.NoOp{}
+	clk := clock.RealClock{}
 	return NewTestable(TestableConfig{
+		Name:                   "Controller",
+		Clock:                  clk,
+		AsFieldManager:         ConfigConsumerAsFieldManager,
+		FoundToDangling:        func(found bool) bool { return !found },
 		InformerFactory:        informerFactory,
 		FlowcontrolClient:      flowcontrolClient,
 		ServerConcurrencyLimit: serverConcurrencyLimit,
 		RequestWaitLimit:       requestWaitLimit,
 		ObsPairGenerator:       metrics.PriorityLevelConcurrencyObserverPairGenerator,
-		QueueSetFactory:        fqs.NewQueueSetFactory(&clock.RealClock{}, grc),
+		QueueSetFactory:        fqs.NewQueueSetFactory(clk, grc),
 	})
 }
 
 // TestableConfig carries the parameters to an implementation that is testable
 type TestableConfig struct {
+	// Name of the controller
+	Name string
+
+	// Clock to use in timing deliberate delays
+	Clock clock.PassiveClock
+
+	// AsFieldManager is the string to use in the metadata for
+	// server-side apply. Normally this is
+	// `ConfigConsumerAsFieldManager`. This is exposed as a parameter
+	// so that a test of competing controllers can supply different
+	// values.
+	AsFieldManager string
+
+	// FoundToDangling maps the boolean indicating whether a
+	// FlowSchema's referenced PLC exists to the boolean indicating
+	// that FlowSchema's status should indicate a dangling reference.
+	// This is a parameter so that we can write tests of what happens
+	// when servers disagree on that bit of Status.
+	FoundToDangling func(bool) bool
+
 	// InformerFactory to use in building the controller
 	InformerFactory kubeinformers.SharedInformerFactory
 
diff --git a/pkg/util/flowcontrol/controller_test.go b/pkg/util/flowcontrol/controller_test.go
index 985728ae7..92f820916 100644
--- a/pkg/util/flowcontrol/controller_test.go
+++ b/pkg/util/flowcontrol/controller_test.go
@@ -28,6 +28,7 @@ import (
 
 	flowcontrol "k8s.io/api/flowcontrol/v1beta1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/clock"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
@@ -251,10 +252,14 @@ func TestConfigConsumer(t *testing.T) {
 				queues:          map[string]*ctlrTestQueueSet{},
 			}
 			ctlr := newTestableController(TestableConfig{
+				Name:                   "Controller",
+				Clock:                  clock.RealClock{},
+				AsFieldManager:         ConfigConsumerAsFieldManager,
+				FoundToDangling:        func(found bool) bool { return !found },
 				InformerFactory:        informerFactory,
 				FlowcontrolClient:      flowcontrolClient,
 				ServerConcurrencyLimit: 100,
 				RequestWaitLimit:       time.Minute,
 				ObsPairGenerator:       metrics.PriorityLevelConcurrencyObserverPairGenerator,
 				QueueSetFactory:        cts,
 			})
@@ -378,12 +383,16 @@ func TestAPFControllerWithGracefulShutdown(t *testing.T) {
 		queues: map[string]*ctlrTestQueueSet{},
 	}
 	controller := newTestableController(TestableConfig{
-		informerFactory,
-		flowcontrolClient,
-		100,
-		time.Minute,
-		metrics.PriorityLevelConcurrencyObserverPairGenerator,
-		cts,
+		Name:                   "Controller",
+		Clock:                  clock.RealClock{},
+		AsFieldManager:         ConfigConsumerAsFieldManager,
+		FoundToDangling:        func(found bool) bool { return !found },
+		InformerFactory:        informerFactory,
+		FlowcontrolClient:      flowcontrolClient,
+		ServerConcurrencyLimit: 100,
+		RequestWaitLimit:       time.Minute,
+		ObsPairGenerator:       metrics.PriorityLevelConcurrencyObserverPairGenerator,
+		QueueSetFactory:        cts,
 	})
 
 	stopCh, controllerCompletedCh := make(chan struct{}), make(chan struct{})