fix the deadlock in priority and fairness config controller

Kubernetes-commit: 7114319b3e66e331250c116ac926f1a57352072f
This commit is contained in:
Abu Kashem 2020-12-15 12:19:56 -05:00 committed by Kubernetes Publisher
parent 9f65780e02
commit 59a718ed18
2 changed files with 119 additions and 1 deletions

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
apitypes "k8s.io/apimachinery/pkg/types"
apierrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/apiserver/pkg/authentication/user"
@ -232,14 +233,28 @@ func (cfgCtlr *configController) updateObservations() {
}
}
// used from the unit tests only.
func (cfgCtlr *configController) getPriorityLevelState(plName string) *priorityLevelState {
cfgCtlr.lock.Lock()
defer cfgCtlr.lock.Unlock()
return cfgCtlr.priorityLevelStates[plName]
}
func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
// Let the config worker stop when we are done
defer cfgCtlr.configQueue.ShutDown()
klog.Info("Starting API Priority and Fairness config controller")
if ok := cache.WaitForCacheSync(stopCh, cfgCtlr.plInformerSynced, cfgCtlr.fsInformerSynced); !ok {
return fmt.Errorf("Never achieved initial sync")
}
klog.Info("Running API Priority and Fairness config worker")
wait.Until(cfgCtlr.runWorker, time.Second, stopCh)
go wait.Until(cfgCtlr.runWorker, time.Second, stopCh)
<-stopCh
klog.Info("Shutting down API Priority and Fairness config worker")
return nil
}

View File

@ -21,12 +21,15 @@ import (
"fmt"
"math/rand"
"os"
"reflect"
"sync"
"testing"
"time"
flowcontrol "k8s.io/api/flowcontrol/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
@ -307,6 +310,106 @@ func TestConfigConsumer(t *testing.T) {
}
}
func TestAPFControllerWithGracefulShutdown(t *testing.T) {
const plName = "test-ps"
fs := &flowcontrol.FlowSchema{
ObjectMeta: metav1.ObjectMeta{
Name: "test-fs",
},
Spec: flowcontrol.FlowSchemaSpec{
MatchingPrecedence: 100,
PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
Name: plName,
},
DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
Type: flowcontrol.FlowDistinguisherMethodByUserType,
},
},
}
pl := &flowcontrol.PriorityLevelConfiguration{
ObjectMeta: metav1.ObjectMeta{
Name: plName,
},
Spec: flowcontrol.PriorityLevelConfigurationSpec{
Type: flowcontrol.PriorityLevelEnablementLimited,
Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
AssuredConcurrencyShares: 10,
LimitResponse: flowcontrol.LimitResponse{
Type: flowcontrol.LimitResponseTypeReject,
},
},
},
}
clientset := clientsetfake.NewSimpleClientset(fs, pl)
informerFactory := informers.NewSharedInformerFactory(clientset, time.Second)
flowcontrolClient := clientset.FlowcontrolV1beta1()
cts := &ctlrTestState{t: t,
fcIfc: flowcontrolClient,
existingFSs: map[string]*flowcontrol.FlowSchema{},
existingPLs: map[string]*flowcontrol.PriorityLevelConfiguration{},
heldRequestsMap: map[string][]heldRequest{},
queues: map[string]*ctlrTestQueueSet{},
}
controller := newTestableController(TestableConfig{
informerFactory,
flowcontrolClient,
100,
time.Minute,
metrics.PriorityLevelConcurrencyObserverPairGenerator,
cts,
})
stopCh, controllerCompletedCh := make(chan struct{}), make(chan struct{})
var controllerErr error
informerFactory.Start(stopCh)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
status := informerFactory.WaitForCacheSync(ctx.Done())
if names := unsynced(status); len(names) > 0 {
t.Fatalf("WaitForCacheSync did not successfully complete, resources=%#v", names)
}
go func() {
defer close(controllerCompletedCh)
controllerErr = controller.Run(stopCh)
}()
// ensure that the controller has run its first loop.
err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
if controller.getPriorityLevelState(plName) == nil {
return false, nil
}
return true, nil
})
if err != nil {
t.Errorf("expected the controller to reconcile the priority level configuration object: %s, error: %s", plName, err)
}
close(stopCh)
t.Log("waiting for the controller Run function to shutdown gracefully")
<-controllerCompletedCh
if controllerErr != nil {
t.Errorf("expected nil error from controller Run function, but got: %#v", controllerErr)
}
}
func unsynced(status map[reflect.Type]bool) []string {
names := make([]string, 0)
for objType, synced := range status {
if !synced {
names = append(names, objType.Name())
}
}
return names
}
func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTestingRecord, catchAlls map[bool]*flowcontrol.FlowSchema) {
t := cts.t
ctlr := cts.cfgCtlr