Monitor all resource changes. (#176)

Signed-off-by: RainbowMango <renhongcai@huawei.com>
This commit is contained in:
Hongcai Ren 2021-02-27 17:38:49 +08:00 committed by GitHub
parent 14154f7c77
commit 8efa91208b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 268 additions and 1 deletions

View File

@ -4,6 +4,7 @@ import (
"flag"
"fmt"
"os"
"time"
"github.com/spf13/cobra"
"k8s.io/client-go/dynamic"
@ -21,6 +22,8 @@ import (
"github.com/karmada-io/karmada/pkg/controllers/namespace"
"github.com/karmada-io/karmada/pkg/controllers/propagationpolicy"
"github.com/karmada-io/karmada/pkg/controllers/status"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/detector"
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
@ -94,6 +97,16 @@ func setupControllers(mgr controllerruntime.Manager, stopChan <-chan struct{}) {
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), kubeClientSet, mgr.GetRESTMapper())
overridemanager := overridemanager.New(mgr.GetClient())
resourceDetector := &detector.ResourceDetector{
ClientSet: kubeClientSet,
InformerManager: informermanager.NewSingleClusterInformerManager(dynamicClientSet, 0),
}
resourceDetector.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(resourceDetector.EventFilter, resourceDetector.OnAdd, resourceDetector.OnUpdate, resourceDetector.OnDelete)
resourceDetector.Processor = util.NewAsyncWorker("resource detector", time.Second, detector.ClusterWideKeyFunc, resourceDetector.Reconcile)
if err := mgr.Add(resourceDetector); err != nil {
klog.Fatalf("Failed to setup resource detector: %v", err)
}
ClusterController := &cluster.Controller{
Client: mgr.GetClient(),
KubeClientSet: kubeClientSet,

View File

@ -0,0 +1,130 @@
package detector
import (
"strings"
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/manager"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/names"
)
// ResourceDetector is a resource watcher which watches all resources and reconcile the events.
type ResourceDetector struct {
ClientSet kubernetes.Interface
InformerManager informermanager.SingleClusterInformerManager
EventHandler cache.ResourceEventHandler
Processor util.AsyncWorker
stopCh <-chan struct{}
}
// Start runs the detector, never stop until stopCh closed.
func (d *ResourceDetector) Start(stopCh <-chan struct{}) error {
klog.Infof("Starting resource detector.")
d.stopCh = stopCh
d.Processor.Run(1, stopCh)
go d.discoverResources(30 * time.Second)
<-stopCh
klog.Infof("Stopped as stopCh closed.")
return nil
}
// Check if our ResourceDetector implements necessary interfaces
var _ manager.Runnable = &ResourceDetector{}
var _ manager.LeaderElectionRunnable = &ResourceDetector{}
func (d *ResourceDetector) discoverResources(period time.Duration) {
wait.Until(func() {
newResources := GetDeletableResources(d.ClientSet.Discovery())
for r := range newResources {
if d.InformerManager.IsHandlerExist(r, d.EventHandler) {
continue
}
klog.Infof("Setup informer for %s", r.String())
d.InformerManager.ForResource(r, d.EventHandler)
}
d.InformerManager.Start(d.stopCh)
}, period, d.stopCh)
}
// NeedLeaderElection implements LeaderElectionRunnable interface.
// So that the detector could run in the leader election mode.
func (d *ResourceDetector) NeedLeaderElection() bool {
return true
}
// Reconcile performs a full reconciliation for the object referred to by the key.
// The key will be re-queued if an error is non-nil.
func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
klog.Infof("Syncing %s", key)
// TODO(RainbowMango): implement later.
return nil
}
// EventFilter tells if an object should be take care of.
//
// All objects under Kubernetes reserved namespace should be ignored:
// - kube-system
// - kube-public
// - kube-node-lease
// All objects under Karmada reserved namespace should be ignored:
// - karmada-system
// - karmada-cluster
// - karmada-es-*
// All objects which API group defined by Karmada should be ignored:
// - cluster.karmada.io
// - policy.karmada.io
func (d *ResourceDetector) EventFilter(obj interface{}) bool {
key, err := ClusterWideKeyFunc(obj)
if err != nil {
return false
}
clusterWideKey, ok := key.(ClusterWideKey)
if !ok {
klog.Errorf("Invalid key")
return false
}
if strings.HasPrefix(clusterWideKey.Namespace, names.KubernetesReservedNSPrefix) ||
strings.HasPrefix(clusterWideKey.Namespace, names.KarmadaReservedNSPrefix) {
return false
}
if clusterWideKey.GVK.Group == clusterv1alpha1.GroupName ||
clusterWideKey.GVK.Group == policyv1alpha1.GroupName {
return false
}
return true
}
// OnAdd handles object add event and push the object to queue.
func (d *ResourceDetector) OnAdd(obj interface{}) {
runtimeObj, ok := obj.(runtime.Object)
if !ok {
return
}
d.Processor.EnqueueRateLimited(runtimeObj)
}
// OnUpdate handles object update event and push the object to queue.
func (d *ResourceDetector) OnUpdate(oldObj, newObj interface{}) {
d.OnAdd(newObj)
}
// OnDelete handles object delete event and push the object to queue.
func (d *ResourceDetector) OnDelete(obj interface{}) {
d.OnAdd(obj)
}

View File

@ -0,0 +1,47 @@
package detector
import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/klog/v2"
)
// GetDeletableResources returns all resources from discoveryClient.
// More specifically, all preferred resources which support the 'delete', 'list', and 'watch' verbs.
//
// All discovery errors are considered temporary. Upon encountering any error,
// GetDeletableResources will log and return any discovered resources it was
// able to process (which may be none).
//
// This code is directly lifted from the Kubernetes codebase.
// For reference: https://github.com/kubernetes/kubernetes/blob/1e11e4a2108024935ecfcb2912226cedeafd99df/pkg/controller/garbagecollector/garbagecollector.go#L638-L667
func GetDeletableResources(discoveryClient discovery.ServerResourcesInterface) map[schema.GroupVersionResource]struct{} {
preferredResources, err := discoveryClient.ServerPreferredResources()
if err != nil {
if discovery.IsGroupDiscoveryFailedError(err) {
klog.Warningf("failed to discover some groups: %v", err.(*discovery.ErrGroupDiscoveryFailed).Groups)
} else {
klog.Warningf("failed to discover preferred resources: %v", err)
}
}
if preferredResources == nil {
return map[schema.GroupVersionResource]struct{}{}
}
// This is extracted from discovery.GroupVersionResources to allow tolerating
// failures on a per-resource basis.
deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete", "list", "watch"}}, preferredResources)
deletableGroupVersionResources := map[schema.GroupVersionResource]struct{}{}
for _, rl := range deletableResources {
gv, err := schema.ParseGroupVersion(rl.GroupVersion)
if err != nil {
klog.Warningf("ignoring invalid discovered resource %q: %v", rl.GroupVersion, err)
continue
}
for i := range rl.APIResources {
deletableGroupVersionResources[schema.GroupVersionResource{Group: gv.Group, Version: gv.Version, Resource: rl.APIResources[i].Name}] = struct{}{}
}
}
return deletableGroupVersionResources
}

View File

@ -0,0 +1,45 @@
package detector
import (
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/pkg/util"
)
// ClusterWideKey is the object key which is a unique identifier under a cluster, across all resources.
type ClusterWideKey struct {
GVK schema.GroupVersionKind
Namespace string
Name string
}
func (k *ClusterWideKey) String() string {
return k.GVK.String() + "/" + k.Namespace + "/" + k.Name
}
// ClusterWideKeyFunc generates a ClusterWideKey for object.
func ClusterWideKeyFunc(obj interface{}) (util.QueueKey, error) {
runtimeObject, ok := obj.(runtime.Object)
if !ok {
klog.Errorf("Invalid object")
return nil, fmt.Errorf("not runtime object")
}
metaInfo, err := meta.Accessor(obj)
if err != nil { // should not happen
return nil, fmt.Errorf("object has no meta: %v", err)
}
key := ClusterWideKey{
GVK: runtimeObject.GetObjectKind().GroupVersionKind(),
Namespace: metaInfo.GetNamespace(),
Name: metaInfo.GetName(),
}
return key, nil
}

View File

@ -37,3 +37,21 @@ func NewHandlerOnAllEvents(fn func(runtime.Object)) cache.ResourceEventHandler {
},
}
}
// NewFilteringHandlerOnAllEvents builds a FilteringResourceEventHandler applies the provided filter to all events
// coming in, ensuring the appropriate nested handler method is invoked.
//
// Note: An object that starts passing the filter after an update is considered an add, and
// an object that stops passing the filter after an update is considered a delete.
// Like the handlers, the filter MUST NOT modify the objects it is given.
func NewFilteringHandlerOnAllEvents(filterFunc func(obj interface{}) bool, addFunc func(obj interface{}),
updateFunc func(oldObj, newObj interface{}), deleteFunc func(obj interface{})) cache.ResourceEventHandler {
return &cache.FilteringResourceEventHandler{
FilterFunc: filterFunc,
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: addFunc,
UpdateFunc: updateFunc,
DeleteFunc: deleteFunc,
},
}
}

View File

@ -5,6 +5,20 @@ import (
"strings"
)
const (
// KubernetesReservedNSPrefix is the prefix of namespace which reserved by Kubernetes system, such as:
// - kube-system
// - kube-public
// - kube-node-lease
KubernetesReservedNSPrefix = "kube-"
// KarmadaReservedNSPrefix is the prefix of namespace which reserved by Karmada system, such as:
// - karmada-system
// - karmada-cluster
// - karmada-es-*
KarmadaReservedNSPrefix = "karmada-"
)
// executionSpacePrefix is the prefix of execution space
const executionSpacePrefix = "karmada-es-"

View File

@ -188,7 +188,7 @@ func (w *asyncWorker) worker() {
}
defer w.queue.Done(key)
err := w.reconcileFunc(key.(string))
err := w.reconcileFunc(key)
w.handleError(err, key)
}