From 59a718ed18ded66494723e6188e762fdb76bfcbb Mon Sep 17 00:00:00 2001
From: Abu Kashem
Date: Tue, 15 Dec 2020 12:19:56 -0500
Subject: [PATCH] fix the deadlock in priority and fairness config controller

Kubernetes-commit: 7114319b3e66e331250c116ac926f1a57352072f
---
 pkg/util/flowcontrol/apf_controller.go  |  17 +++-
 pkg/util/flowcontrol/controller_test.go | 103 ++++++++++++++++++++++++
 2 files changed, 119 insertions(+), 1 deletion(-)

diff --git a/pkg/util/flowcontrol/apf_controller.go b/pkg/util/flowcontrol/apf_controller.go
index 713fb5f7f..1015b33d3 100644
--- a/pkg/util/flowcontrol/apf_controller.go
+++ b/pkg/util/flowcontrol/apf_controller.go
@@ -34,6 +34,7 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	apitypes "k8s.io/apimachinery/pkg/types"
 	apierrors "k8s.io/apimachinery/pkg/util/errors"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
 	"k8s.io/apiserver/pkg/authentication/user"
@@ -232,14 +233,28 @@ func (cfgCtlr *configController) updateObservations() {
 	}
 }
 
+// used from the unit tests only.
+func (cfgCtlr *configController) getPriorityLevelState(plName string) *priorityLevelState {
+	cfgCtlr.lock.Lock()
+	defer cfgCtlr.lock.Unlock()
+	return cfgCtlr.priorityLevelStates[plName]
+}
+
 func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error {
+	defer utilruntime.HandleCrash()
+
+	// Let the config worker stop when we are done
 	defer cfgCtlr.configQueue.ShutDown()
+
 	klog.Info("Starting API Priority and Fairness config controller")
 	if ok := cache.WaitForCacheSync(stopCh, cfgCtlr.plInformerSynced, cfgCtlr.fsInformerSynced); !ok {
 		return fmt.Errorf("Never achieved initial sync")
 	}
+
 	klog.Info("Running API Priority and Fairness config worker")
-	wait.Until(cfgCtlr.runWorker, time.Second, stopCh)
+	go wait.Until(cfgCtlr.runWorker, time.Second, stopCh)
+
+	<-stopCh
 	klog.Info("Shutting down API Priority and Fairness config worker")
 	return nil
 }
diff --git a/pkg/util/flowcontrol/controller_test.go b/pkg/util/flowcontrol/controller_test.go
index a2eacf7d8..3e18e31f2 100644
--- a/pkg/util/flowcontrol/controller_test.go
+++ b/pkg/util/flowcontrol/controller_test.go
@@ -21,12 +21,15 @@ import (
 	"fmt"
 	"math/rand"
 	"os"
+	"reflect"
 	"sync"
 	"testing"
 	"time"
 
 	flowcontrol "k8s.io/api/flowcontrol/v1beta1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
 	fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
 	"k8s.io/apiserver/pkg/util/flowcontrol/debug"
 	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
@@ -307,6 +310,106 @@ func TestConfigConsumer(t *testing.T) {
 	}
 }
 
+func TestAPFControllerWithGracefulShutdown(t *testing.T) {
+	const plName = "test-ps"
+	fs := &flowcontrol.FlowSchema{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "test-fs",
+		},
+		Spec: flowcontrol.FlowSchemaSpec{
+			MatchingPrecedence: 100,
+			PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
+				Name: plName,
+			},
+			DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
+				Type: flowcontrol.FlowDistinguisherMethodByUserType,
+			},
+		},
+	}
+
+	pl := &flowcontrol.PriorityLevelConfiguration{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: plName,
+		},
+		Spec: flowcontrol.PriorityLevelConfigurationSpec{
+			Type: flowcontrol.PriorityLevelEnablementLimited,
+			Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
+				AssuredConcurrencyShares: 10,
+				LimitResponse: flowcontrol.LimitResponse{
+					Type: flowcontrol.LimitResponseTypeReject,
+				},
+			},
+		},
+	}
+
+	clientset := clientsetfake.NewSimpleClientset(fs, pl)
+	informerFactory := informers.NewSharedInformerFactory(clientset, time.Second)
+	flowcontrolClient := clientset.FlowcontrolV1beta1()
+	cts := &ctlrTestState{t: t,
+		fcIfc:           flowcontrolClient,
+		existingFSs:     map[string]*flowcontrol.FlowSchema{},
+		existingPLs:     map[string]*flowcontrol.PriorityLevelConfiguration{},
+		heldRequestsMap: map[string][]heldRequest{},
+		queues:          map[string]*ctlrTestQueueSet{},
+	}
+	controller := newTestableController(TestableConfig{
+		informerFactory,
+		flowcontrolClient,
+		100,
+		time.Minute,
+		metrics.PriorityLevelConcurrencyObserverPairGenerator,
+		cts,
+	})
+
+	stopCh, controllerCompletedCh := make(chan struct{}), make(chan struct{})
+	var controllerErr error
+
+	informerFactory.Start(stopCh)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	status := informerFactory.WaitForCacheSync(ctx.Done())
+	if names := unsynced(status); len(names) > 0 {
+		t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names)
+	}
+
+	go func() {
+		defer close(controllerCompletedCh)
+		controllerErr = controller.Run(stopCh)
+	}()
+
+	// ensure that the controller has run its first loop.
+	err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
+		if controller.getPriorityLevelState(plName) == nil {
+			return false, nil
+		}
+		return true, nil
+	})
+	if err != nil {
+		t.Errorf("expected the controller to reconcile the priority level configuration object: %s, error: %s", plName, err)
+	}
+
+	close(stopCh)
+	t.Log("waiting for the controller Run function to shutdown gracefully")
+	<-controllerCompletedCh
+
+	if controllerErr != nil {
+		t.Errorf("expected nil error from controller Run function, but got: %#v", controllerErr)
+	}
+}
+
+func unsynced(status map[reflect.Type]bool) []string {
+	names := make([]string, 0)
+
+	for objType, synced := range status {
+		if !synced {
+			names = append(names, objType.Name())
+		}
+	}
+
+	return names
+}
+
 func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTestingRecord, catchAlls map[bool]*flowcontrol.FlowSchema) {
 	t := cts.t
 	ctlr := cts.cfgCtlr