/* Copyright 2024 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package generic import ( "context" goerrors "errors" "fmt" "sync" "sync/atomic" "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/admission/plugin/policy/internal/generic" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" ) type policySource[P runtime.Object, B runtime.Object, E Evaluator] struct { ctx context.Context policyInformer generic.Informer[P] bindingInformer generic.Informer[B] restMapper meta.RESTMapper newPolicyAccessor func(P) PolicyAccessor newBindingAccessor func(B) BindingAccessor informerFactory informers.SharedInformerFactory dynamicClient dynamic.Interface compiler func(P) E // Currently compiled list of valid/active policy-binding pairs policies atomic.Pointer[[]PolicyHook[P, B, E]] // Whether the cache of policies is dirty and needs to be recompiled policiesDirty atomic.Bool lock sync.Mutex compiledPolicies map[types.NamespacedName]compiledPolicyEntry[E] // Temporary until we use the dynamic informer factory paramsCRDControllers map[schema.GroupVersionKind]*paramInfo } type paramInfo struct { mapping meta.RESTMapping // When the param is changed, or the informer is done being used, the cancel // func should be called to stop/cleanup the original informer cancelFunc func() // The lister for this param informer informers.GenericInformer } type compiledPolicyEntry[E Evaluator] struct { policyVersion string evaluator E } type PolicyHook[P runtime.Object, B runtime.Object, E Evaluator] struct { Policy P Bindings []B // ParamInformer is the informer for the param CRD for this policy, or nil if // there is no param or if there was a configuration error ParamInformer informers.GenericInformer ParamScope meta.RESTScope Evaluator E ConfigurationError error } var _ Source[PolicyHook[runtime.Object, runtime.Object, Evaluator]] = &policySource[runtime.Object, runtime.Object, Evaluator]{} func NewPolicySource[P runtime.Object, B runtime.Object, E Evaluator]( policyInformer cache.SharedIndexInformer, bindingInformer cache.SharedIndexInformer, newPolicyAccessor func(P) PolicyAccessor, newBindingAccessor func(B) BindingAccessor, compiler func(P) E, paramInformerFactory informers.SharedInformerFactory, dynamicClient dynamic.Interface, restMapper meta.RESTMapper, ) Source[PolicyHook[P, B, E]] { res := &policySource[P, B, E]{ compiler: compiler, policyInformer: generic.NewInformer[P](policyInformer), bindingInformer: generic.NewInformer[B](bindingInformer), compiledPolicies: map[types.NamespacedName]compiledPolicyEntry[E]{}, newPolicyAccessor: newPolicyAccessor, newBindingAccessor: newBindingAccessor, paramsCRDControllers: map[schema.GroupVersionKind]*paramInfo{}, informerFactory: paramInformerFactory, dynamicClient: dynamicClient, restMapper: restMapper, } return res } func (s *policySource[P, B, E]) Run(ctx context.Context) error { if s.ctx != nil { return fmt.Errorf("policy source already running") } // Wait for initial cache sync of policies and informers before reconciling // any if !cache.WaitForNamedCacheSync(fmt.Sprintf("%T", s), ctx.Done(), s.UpstreamHasSynced) { err := ctx.Err() if err == nil { err = fmt.Errorf("initial cache sync for %T failed", s) } return err } s.ctx = ctx // Perform initial policy compilation after initial list has finished s.notify() s.refreshPolicies() notifyFuncs := cache.ResourceEventHandlerFuncs{ AddFunc: func(_ interface{}) { s.notify() }, UpdateFunc: func(_, _ interface{}) { s.notify() }, DeleteFunc: func(_ interface{}) { s.notify() }, } handle, err := s.policyInformer.AddEventHandler(notifyFuncs) if err != nil { return err } defer func() { if err := s.policyInformer.RemoveEventHandler(handle); err != nil { utilruntime.HandleError(fmt.Errorf("failed to remove policy event handler: %w", err)) } }() bindingHandle, err := s.bindingInformer.AddEventHandler(notifyFuncs) if err != nil { return err } defer func() { if err := s.bindingInformer.RemoveEventHandler(bindingHandle); err != nil { utilruntime.HandleError(fmt.Errorf("failed to remove binding event handler: %w", err)) } }() // Start a worker that checks every second to see if policy data is dirty // and needs to be recompiled go func() { // Loop every 1 second until context is cancelled, refreshing policies wait.Until(s.refreshPolicies, 1*time.Second, ctx.Done()) }() <-ctx.Done() return nil } func (s *policySource[P, B, E]) UpstreamHasSynced() bool { return s.policyInformer.HasSynced() && s.bindingInformer.HasSynced() } // HasSynced implements Source. func (s *policySource[P, B, E]) HasSynced() bool { // As an invariant we never store `nil` into the atomic list of processed // policy hooks. If it is nil, then we haven't compiled all the policies // and stored them yet. return s.Hooks() != nil } // Hooks implements Source. func (s *policySource[P, B, E]) Hooks() []PolicyHook[P, B, E] { res := s.policies.Load() // Error case should not happen since evaluation function never // returns error if res == nil { // Not yet synced return nil } return *res } func (s *policySource[P, B, E]) refreshPolicies() { if !s.UpstreamHasSynced() { return } else if !s.policiesDirty.Swap(false) { return } // It is ok the cache gets marked dirty again between us clearing the // flag and us calculating the policies. The dirty flag would be marked again, // and we'd have a no-op after comparing resource versions on the next sync. klog.Infof("refreshing policies") policies, err := s.calculatePolicyData() // Intentionally store policy list regardless of error. There may be // an error returned if there was a configuration error in one of the policies, // but we would still want those policies evaluated // (for instance to return error on failaction). Or if there was an error // listing all policies at all, we would want to wipe the list. s.policies.Store(&policies) if err != nil { // An error was generated while syncing policies. Mark it as dirty again // so we can retry later utilruntime.HandleError(fmt.Errorf("encountered error syncing policies: %w. Rescheduling policy sync", err)) s.notify() } } func (s *policySource[P, B, E]) notify() { s.policiesDirty.Store(true) } // calculatePolicyData calculates the list of policies and bindings for each // policy. If there is an error in generation, it will return the error and // the partial list of policies that were able to be generated. Policies that // have an error will have a non-nil ConfigurationError field, but still be // included in the result. // // This function caches the result of the intermediate compilations func (s *policySource[P, B, E]) calculatePolicyData() ([]PolicyHook[P, B, E], error) { if !s.UpstreamHasSynced() { return nil, fmt.Errorf("cannot calculate policy data until upstream has synced") } // Fat-fingered lock that can be made more fine-tuned if required s.lock.Lock() defer s.lock.Unlock() // Create a local copy of all policies and bindings policiesToBindings := map[types.NamespacedName][]B{} bindingList, err := s.bindingInformer.List(labels.Everything()) if err != nil { // This should never happen unless types are misconfigured // (can't use meta.accessor on them) return nil, err } // Gather a list of all active policy bindings for _, bindingSpec := range bindingList { bindingAccessor := s.newBindingAccessor(bindingSpec) policyKey := bindingAccessor.GetPolicyName() // Add this binding to the list of bindings for this policy policiesToBindings[policyKey] = append(policiesToBindings[policyKey], bindingSpec) } result := make([]PolicyHook[P, B, E], 0, len(bindingList)) usedParams := map[schema.GroupVersionKind]struct{}{} var errs []error for policyKey, bindingSpecs := range policiesToBindings { var inf generic.NamespacedLister[P] = s.policyInformer if len(policyKey.Namespace) > 0 { inf = s.policyInformer.Namespaced(policyKey.Namespace) } policySpec, err := inf.Get(policyKey.Name) if errors.IsNotFound(err) { // Policy for bindings doesn't exist. This can happen if the policy // was deleted before the binding, or the binding was created first. // // Just skip bindings that refer to non-existent policies // If the policy is recreated, the cache will be marked dirty and // this function will run again. continue } else if err != nil { // This should never happen since fetching from a cache should never // fail and this function checks that the cache was synced before // even getting to this point. errs = append(errs, err) continue } var parsedParamKind *schema.GroupVersionKind policyAccessor := s.newPolicyAccessor(policySpec) if paramKind := policyAccessor.GetParamKind(); paramKind != nil { groupVersion, err := schema.ParseGroupVersion(paramKind.APIVersion) if err != nil { errs = append(errs, fmt.Errorf("failed to parse paramKind APIVersion: %w", err)) continue } parsedParamKind = &schema.GroupVersionKind{ Group: groupVersion.Group, Version: groupVersion.Version, Kind: paramKind.Kind, } // TEMPORARY UNTIL WE HAVE SHARED PARAM INFORMERS usedParams[*parsedParamKind] = struct{}{} } paramInformer, paramScope, configurationError := s.ensureParamsForPolicyLocked(parsedParamKind) result = append(result, PolicyHook[P, B, E]{ Policy: policySpec, Bindings: bindingSpecs, Evaluator: s.compilePolicyLocked(policySpec), ParamInformer: paramInformer, ParamScope: paramScope, ConfigurationError: configurationError, }) // Should queue a re-sync for policy sync error. If our shared param // informer can notify us when CRD discovery changes we can remove this // and just rely on the informer to notify us when the CRDs change if configurationError != nil { errs = append(errs, configurationError) } } // Clean up orphaned policies by replacing the old cache of compiled policies // (the map of used policies is updated by `compilePolicy`) for policyKey := range s.compiledPolicies { if _, wasSeen := policiesToBindings[policyKey]; !wasSeen { delete(s.compiledPolicies, policyKey) } } // Clean up orphaned param informers for paramKind, info := range s.paramsCRDControllers { if _, wasSeen := usedParams[paramKind]; !wasSeen { info.cancelFunc() delete(s.paramsCRDControllers, paramKind) } } err = nil if len(errs) > 0 { err = goerrors.Join(errs...) } return result, err } // ensureParamsForPolicyLocked ensures that the informer for the paramKind is // started and returns the informer and the scope of the paramKind. // // Must be called under write lock func (s *policySource[P, B, E]) ensureParamsForPolicyLocked(paramSource *schema.GroupVersionKind) (informers.GenericInformer, meta.RESTScope, error) { if paramSource == nil { return nil, nil, nil } else if info, ok := s.paramsCRDControllers[*paramSource]; ok { return info.informer, info.mapping.Scope, nil } mapping, err := s.restMapper.RESTMapping(schema.GroupKind{ Group: paramSource.Group, Kind: paramSource.Kind, }, paramSource.Version) if err != nil { // Failed to resolve. Return error so we retry again (rate limited) // Save a record of this definition with an evaluator that unconditionally return nil, nil, fmt.Errorf("failed to find resource referenced by paramKind: '%v'", *paramSource) } // We are not watching this param. Start an informer for it. instanceContext, instanceCancel := context.WithCancel(s.ctx) var informer informers.GenericInformer // Try to see if our provided informer factory has an informer for this type. // We assume the informer is already started, and starts all types associated // with it. if genericInformer, err := s.informerFactory.ForResource(mapping.Resource); err == nil { informer = genericInformer // Start the informer s.informerFactory.Start(instanceContext.Done()) } else { // Dynamic JSON informer fallback. // Cannot use shared dynamic informer since it would be impossible // to clean CRD informers properly with multiple dependents // (cannot start ahead of time, and cannot track dependencies via stopCh) informer = dynamicinformer.NewFilteredDynamicInformer( s.dynamicClient, mapping.Resource, corev1.NamespaceAll, // Use same interval as is used for k8s typed sharedInformerFactory // https://github.com/kubernetes/kubernetes/blob/7e0923899fed622efbc8679cca6b000d43633e38/cmd/kube-apiserver/app/server.go#L430 10*time.Minute, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, nil, ) go informer.Informer().Run(instanceContext.Done()) } klog.Infof("informer started for %v", *paramSource) ret := ¶mInfo{ mapping: *mapping, cancelFunc: instanceCancel, informer: informer, } s.paramsCRDControllers[*paramSource] = ret return ret.informer, mapping.Scope, nil } // For testing func (s *policySource[P, B, E]) getParamInformer(param schema.GroupVersionKind) (informers.GenericInformer, meta.RESTScope) { s.lock.Lock() defer s.lock.Unlock() if info, ok := s.paramsCRDControllers[param]; ok { return info.informer, info.mapping.Scope } return nil, nil } // compilePolicyLocked compiles the policy and returns the evaluator for it. // If the policy has not changed since the last compilation, it will return // the cached evaluator. // // Must be called under write lock func (s *policySource[P, B, E]) compilePolicyLocked(policySpec P) E { policyMeta, err := meta.Accessor(policySpec) if err != nil { // This should not happen if P, and B have ObjectMeta, but // unfortunately there is no way to express "able to call // meta.Accessor" as a type constraint utilruntime.HandleError(err) var emptyEvaluator E return emptyEvaluator } key := types.NamespacedName{ Namespace: policyMeta.GetNamespace(), Name: policyMeta.GetName(), } compiledPolicy, wasCompiled := s.compiledPolicies[key] // If the policy or binding has changed since it was last compiled, // and if there is no configuration error (like a missing param CRD) // then we recompile if !wasCompiled || compiledPolicy.policyVersion != policyMeta.GetResourceVersion() { compiledPolicy = compiledPolicyEntry[E]{ policyVersion: policyMeta.GetResourceVersion(), evaluator: s.compiler(policySpec), } s.compiledPolicies[key] = compiledPolicy } return compiledPolicy.evaluator }