diff --git a/pkg/estimator/server/eventhandlers.go b/pkg/estimator/server/eventhandlers.go new file mode 100644 index 000000000..73b45a661 --- /dev/null +++ b/pkg/estimator/server/eventhandlers.go @@ -0,0 +1,157 @@ +// This code is mostly lifted from the Kubernetes codebase to establish the internal cache. +// https://github.com/kubernetes/kubernetes/blob/release-1.25/pkg/scheduler/eventhandlers.go + +package server + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +func addAllEventHandlers(es *AccurateSchedulerEstimatorServer, informerFactory informers.SharedInformerFactory) { + // scheduled pod cache + informerFactory.Core().V1().Pods().Informer().AddEventHandler( + cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch t := obj.(type) { + case *corev1.Pod: + return assignedPod(t) + case cache.DeletedFinalStateUnknown: + if _, ok := t.Obj.(*corev1.Pod); ok { + // The carried object may be stale, so we don't use it to check if + // it's assigned or not. Attempting to cleanup anyways. + return true + } + utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod", obj)) + return false + default: + utilruntime.HandleError(fmt.Errorf("unable to handle object: %T", obj)) + return false + } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: es.addPodToCache, + UpdateFunc: es.updatePodInCache, + DeleteFunc: es.deletePodFromCache, + }, + }, + ) + informerFactory.Core().V1().Nodes().Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: es.addNodeToCache, + UpdateFunc: es.updateNodeInCache, + DeleteFunc: es.deleteNodeFromCache, + }, + ) +} + +func (es *AccurateSchedulerEstimatorServer) addPodToCache(obj interface{}) { + pod, ok := obj.(*corev1.Pod) + if !ok { + klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", obj) + return + } + klog.V(3).InfoS("Add event for scheduled pod", "pod", klog.KObj(pod)) + + if err := es.Cache.AddPod(pod); err != nil { + klog.ErrorS(err, "Estimator cache AddPod failed", "pod", klog.KObj(pod)) + } +} + +func (es *AccurateSchedulerEstimatorServer) updatePodInCache(oldObj, newObj interface{}) { + oldPod, ok := oldObj.(*corev1.Pod) + if !ok { + klog.ErrorS(nil, "Cannot convert oldObj to *v1.Pod", "oldObj", oldObj) + return + } + newPod, ok := newObj.(*corev1.Pod) + if !ok { + klog.ErrorS(nil, "Cannot convert newObj to *v1.Pod", "newObj", newObj) + return + } + klog.V(4).InfoS("Update event for scheduled pod", "pod", klog.KObj(oldPod)) + + if err := es.Cache.UpdatePod(oldPod, newPod); err != nil { + klog.ErrorS(err, "Estimator cache UpdatePod failed", "pod", klog.KObj(oldPod)) + } +} + +func (es *AccurateSchedulerEstimatorServer) deletePodFromCache(obj interface{}) { + var pod *corev1.Pod + switch t := obj.(type) { + case *corev1.Pod: + pod = t + case cache.DeletedFinalStateUnknown: + var ok bool + pod, ok = t.Obj.(*corev1.Pod) + if !ok { + klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", t.Obj) + return + } + default: + klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", t) + return + } + klog.V(3).InfoS("Delete event for scheduled pod", "pod", klog.KObj(pod)) + if err := es.Cache.RemovePod(pod); err != nil { + klog.ErrorS(err, "Estimator cache RemovePod failed", "pod", klog.KObj(pod)) + } +} + +func (es *AccurateSchedulerEstimatorServer) addNodeToCache(obj interface{}) { + node, ok := obj.(*corev1.Node) + if !ok { + klog.ErrorS(nil, "Cannot 
convert to *v1.Node", "obj", obj) + return + } + + es.Cache.AddNode(node) + klog.V(3).InfoS("Add event for node", "node", klog.KObj(node)) +} + +func (es *AccurateSchedulerEstimatorServer) updateNodeInCache(oldObj, newObj interface{}) { + oldNode, ok := oldObj.(*corev1.Node) + if !ok { + klog.ErrorS(nil, "Cannot convert oldObj to *v1.Node", "oldObj", oldObj) + return + } + newNode, ok := newObj.(*corev1.Node) + if !ok { + klog.ErrorS(nil, "Cannot convert newObj to *v1.Node", "newObj", newObj) + return + } + + es.Cache.UpdateNode(oldNode, newNode) +} + +func (es *AccurateSchedulerEstimatorServer) deleteNodeFromCache(obj interface{}) { + var node *corev1.Node + switch t := obj.(type) { + case *corev1.Node: + node = t + case cache.DeletedFinalStateUnknown: + var ok bool + node, ok = t.Obj.(*corev1.Node) + if !ok { + klog.ErrorS(nil, "Cannot convert to *v1.Node", "obj", t.Obj) + return + } + default: + klog.ErrorS(nil, "Cannot convert to *v1.Node", "obj", t) + return + } + klog.V(3).InfoS("Delete event for node", "node", klog.KObj(node)) + if err := es.Cache.RemoveNode(node); err != nil { + klog.ErrorS(err, "Scheduler cache RemoveNode failed") + } +} + +// assignedPod selects pods that are assigned (scheduled and running). +func assignedPod(pod *corev1.Pod) bool { + return len(pod.Spec.NodeName) != 0 +} diff --git a/pkg/util/lifted/doc.go b/pkg/util/lifted/doc.go index 9096e0f12..ff6d1f2bc 100644 --- a/pkg/util/lifted/doc.go +++ b/pkg/util/lifted/doc.go @@ -44,7 +44,6 @@ package lifted | objectwatcher.go | https://github.com/kubernetes-sigs/kubefed/blob/master/pkg/controller/util/propagatedversion.go#L35-L43 | func ObjectVersion | N | | objectwatcher.go | https://github.com/kubernetes-sigs/kubefed/blob/master/pkg/controller/util/propagatedversion.go#L45-L59 | func ObjectNeedsUpdate | N | | objectwatcher.go | https://github.com/kubernetes-sigs/kubefed/blob/master/pkg/controller/util/meta.go#L63-L80 | func objectMetaObjEquivalent | Y | -| parallelism_test.go | https://github.com/kubernetes/kubernetes/blob/release-1.23/pkg/scheduler/framework/parallelize/parallelism_test.go | func TestChunkSize | N | | podtemplate.go | https://github.com/kubernetes/kubernetes/blob/release-1.23/pkg/controller/controller_utils.go#L466-L472 | func getPodsLabelSet | N | | podtemplate.go | https://github.com/kubernetes/kubernetes/blob/release-1.23/pkg/controller/controller_utils.go#L474-L478 | func getPodsFinalizers | N | | podtemplate.go | https://github.com/kubernetes/kubernetes/blob/release-1.23/pkg/controller/controller_utils.go#L480-L486 | func getPodsAnnotationSet | N | diff --git a/pkg/util/lifted/scheduler/cache/cache.go b/pkg/util/lifted/scheduler/cache/cache.go new file mode 100644 index 000000000..ab7be0790 --- /dev/null +++ b/pkg/util/lifted/scheduler/cache/cache.go @@ -0,0 +1,767 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This code is directly lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package. 
+// For reference: +// https://github.com/kubernetes/kubernetes/blob/release-1.25/pkg/scheduler/internal/cache/cache.go + +package cache + +import ( + "fmt" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" + + "github.com/karmada-io/karmada/pkg/util/lifted/scheduler/framework" +) + +var ( + cleanAssumedPeriod = 1 * time.Second +) + +// New returns a Cache implementation. +// It automatically starts a go routine that manages expiration of assumed pods. +// "ttl" is how long the assumed pod will get expired, "0" means pod will never expire. +// "stop" is the channel that would close the background goroutine. +func New(ttl time.Duration, stop <-chan struct{}) Cache { + cache := newCache(ttl, cleanAssumedPeriod, stop) + cache.run() + return cache +} + +// nodeInfoListItem holds a NodeInfo pointer and acts as an item in a doubly +// linked list. When a NodeInfo is updated, it goes to the head of the list. +// The items closer to the head are the most recently updated items. +type nodeInfoListItem struct { + info *framework.NodeInfo + next *nodeInfoListItem + prev *nodeInfoListItem +} + +type cacheImpl struct { + stop <-chan struct{} + ttl time.Duration + period time.Duration + + // This mutex guards all fields within this cache struct. + mu sync.RWMutex + // a set of assumed pod keys. + // The key could further be used to get an entry in podStates. + assumedPods sets.String + // a map from pod key to podState. + podStates map[string]*podState + nodes map[string]*nodeInfoListItem + // headNode points to the most recently updated NodeInfo in "nodes". It is the + // head of the linked list. + headNode *nodeInfoListItem + nodeTree *nodeTree + // A map from image name to its imageState. + imageStates map[string]*imageState +} + +type podState struct { + pod *corev1.Pod + // Used by assumedPod to determinate expiration. + // If deadline is nil, assumedPod will never expire. + deadline *time.Time + // Used to block cache from expiring assumedPod if binding still runs + bindingFinished bool +} + +type imageState struct { + // Size of the image + size int64 + // A set of node names for nodes having this image present + nodes sets.String +} + +// createImageStateSummary returns a summarizing snapshot of the given image's state. +func (cache *cacheImpl) createImageStateSummary(state *imageState) *framework.ImageStateSummary { + return &framework.ImageStateSummary{ + Size: state.size, + NumNodes: len(state.nodes), + } +} + +func newCache(ttl, period time.Duration, stop <-chan struct{}) *cacheImpl { + return &cacheImpl{ + ttl: ttl, + period: period, + stop: stop, + + nodes: make(map[string]*nodeInfoListItem), + nodeTree: newNodeTree(nil), + assumedPods: make(sets.String), + podStates: make(map[string]*podState), + imageStates: make(map[string]*imageState), + } +} + +// newNodeInfoListItem initializes a new nodeInfoListItem. +func newNodeInfoListItem(ni *framework.NodeInfo) *nodeInfoListItem { + return &nodeInfoListItem{ + info: ni, + } +} + +// moveNodeInfoToHead moves a NodeInfo to the head of "cache.nodes" doubly +// linked list. The head is the most recently updated NodeInfo. +// We assume cache lock is already acquired. +func (cache *cacheImpl) moveNodeInfoToHead(name string) { + ni, ok := cache.nodes[name] + if !ok { + klog.ErrorS(nil, "No node info with given name found in the cache", "node", klog.KRef("", name)) + return + } + // if the node info list item is already at the head, we are done. 
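+	// Otherwise detach ni from its current position by re-linking its
+	// neighbours, then splice it in ahead of the current head below.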
+ if ni == cache.headNode { + return + } + + if ni.prev != nil { + ni.prev.next = ni.next + } + if ni.next != nil { + ni.next.prev = ni.prev + } + if cache.headNode != nil { + cache.headNode.prev = ni + } + ni.next = cache.headNode + ni.prev = nil + cache.headNode = ni +} + +// removeNodeInfoFromList removes a NodeInfo from the "cache.nodes" doubly +// linked list. +// We assume cache lock is already acquired. +func (cache *cacheImpl) removeNodeInfoFromList(name string) { + ni, ok := cache.nodes[name] + if !ok { + klog.ErrorS(nil, "No node info with given name found in the cache", "node", klog.KRef("", name)) + return + } + + if ni.prev != nil { + ni.prev.next = ni.next + } + if ni.next != nil { + ni.next.prev = ni.prev + } + // if the removed item was at the head, we must update the head. + if ni == cache.headNode { + cache.headNode = ni.next + } + delete(cache.nodes, name) +} + +// Dump produces a dump of the current scheduler cache. This is used for +// debugging purposes only and shouldn't be confused with UpdateSnapshot +// function. +// This method is expensive, and should be only used in non-critical path. +func (cache *cacheImpl) Dump() *Dump { + cache.mu.RLock() + defer cache.mu.RUnlock() + + nodes := make(map[string]*framework.NodeInfo, len(cache.nodes)) + for k, v := range cache.nodes { + nodes[k] = v.info.Clone() + } + + return &Dump{ + Nodes: nodes, + AssumedPods: cache.assumedPods.Union(nil), + } +} + +// UpdateSnapshot takes a snapshot of cached NodeInfo map. This is called at +// beginning of every scheduling cycle. +// The snapshot only includes Nodes that are not deleted at the time this function is called. +// nodeInfo.Node() is guaranteed to be not nil for all the nodes in the snapshot. +// This function tracks generation number of NodeInfo and updates only the +// entries of an existing snapshot that have changed after the snapshot was taken. +// +//nolint:gocyclo +func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error { + cache.mu.Lock() + defer cache.mu.Unlock() + + // Get the last generation of the snapshot. + snapshotGeneration := nodeSnapshot.generation + + // NodeInfoList and HavePodsWithAffinityNodeInfoList must be re-created if a node was added + // or removed from the cache. + updateAllLists := false + // HavePodsWithAffinityNodeInfoList must be re-created if a node changed its + // status from having pods with affinity to NOT having pods with affinity or the other + // way around. + updateNodesHavePodsWithAffinity := false + // HavePodsWithRequiredAntiAffinityNodeInfoList must be re-created if a node changed its + // status from having pods with required anti-affinity to NOT having pods with required + // anti-affinity or the other way around. + updateNodesHavePodsWithRequiredAntiAffinity := false + // usedPVCSet must be re-created whenever the head node generation is greater than + // last snapshot generation. + updateUsedPVCSet := false + + // Start from the head of the NodeInfo doubly linked list and update snapshot + // of NodeInfos updated after the last snapshot. + for node := cache.headNode; node != nil; node = node.next { + if node.info.Generation <= snapshotGeneration { + // all the nodes are updated before the existing snapshot. We are done. 
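+			// The list is ordered from most to least recently updated, so every
+			// remaining item has a generation at or below the snapshot's as well.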
+ break + } + if np := node.info.Node(); np != nil { + existing, ok := nodeSnapshot.nodeInfoMap[np.Name] + if !ok { + updateAllLists = true + existing = &framework.NodeInfo{} + nodeSnapshot.nodeInfoMap[np.Name] = existing + } + clone := node.info.Clone() + // We track nodes that have pods with affinity, here we check if this node changed its + // status from having pods with affinity to NOT having pods with affinity or the other + // way around. + if (len(existing.PodsWithAffinity) > 0) != (len(clone.PodsWithAffinity) > 0) { + updateNodesHavePodsWithAffinity = true + } + if (len(existing.PodsWithRequiredAntiAffinity) > 0) != (len(clone.PodsWithRequiredAntiAffinity) > 0) { + updateNodesHavePodsWithRequiredAntiAffinity = true + } + if !updateUsedPVCSet { + if len(existing.PVCRefCounts) != len(clone.PVCRefCounts) { + updateUsedPVCSet = true + } else { + for pvcKey := range clone.PVCRefCounts { + if _, found := existing.PVCRefCounts[pvcKey]; !found { + updateUsedPVCSet = true + break + } + } + } + } + // We need to preserve the original pointer of the NodeInfo struct since it + // is used in the NodeInfoList, which we may not update. + *existing = *clone + } + } + // Update the snapshot generation with the latest NodeInfo generation. + if cache.headNode != nil { + nodeSnapshot.generation = cache.headNode.info.Generation + } + + // Comparing to pods in nodeTree. + // Deleted nodes get removed from the tree, but they might remain in the nodes map + // if they still have non-deleted Pods. + if len(nodeSnapshot.nodeInfoMap) > cache.nodeTree.numNodes { + cache.removeDeletedNodesFromSnapshot(nodeSnapshot) + updateAllLists = true + } + + if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity || updateUsedPVCSet { + cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists) + } + + if len(nodeSnapshot.nodeInfoList) != cache.nodeTree.numNodes { + errMsg := fmt.Sprintf("snapshot state is not consistent, length of NodeInfoList=%v not equal to length of nodes in tree=%v "+ + ", length of NodeInfoMap=%v, length of nodes in cache=%v"+ + ", trying to recover", + len(nodeSnapshot.nodeInfoList), cache.nodeTree.numNodes, + len(nodeSnapshot.nodeInfoMap), len(cache.nodes)) + klog.ErrorS(nil, errMsg) + // We will try to recover by re-creating the lists for the next scheduling cycle, but still return an + // error to surface the problem, the error will likely cause a failure to the current scheduling cycle. 
+ cache.updateNodeInfoSnapshotList(nodeSnapshot, true) + return fmt.Errorf(errMsg) + } + + return nil +} + +func (cache *cacheImpl) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) { + snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) + snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) + snapshot.usedPVCSet = sets.NewString() + if updateAll { + // Take a snapshot of the nodes order in the tree + snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) + nodesList, err := cache.nodeTree.list() + if err != nil { + klog.ErrorS(err, "Error occurred while retrieving the list of names of the nodes from node tree") + } + for _, nodeName := range nodesList { + if nodeInfo := snapshot.nodeInfoMap[nodeName]; nodeInfo != nil { + snapshot.nodeInfoList = append(snapshot.nodeInfoList, nodeInfo) + if len(nodeInfo.PodsWithAffinity) > 0 { + snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo) + } + if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 { + snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo) + } + for key := range nodeInfo.PVCRefCounts { + snapshot.usedPVCSet.Insert(key) + } + } else { + klog.ErrorS(nil, "Node exists in nodeTree but not in NodeInfoMap, this should not happen", "node", klog.KRef("", nodeName)) + } + } + } else { + for _, nodeInfo := range snapshot.nodeInfoList { + if len(nodeInfo.PodsWithAffinity) > 0 { + snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo) + } + if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 { + snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo) + } + for key := range nodeInfo.PVCRefCounts { + snapshot.usedPVCSet.Insert(key) + } + } + } +} + +// If certain nodes were deleted after the last snapshot was taken, we should remove them from the snapshot. +func (cache *cacheImpl) removeDeletedNodesFromSnapshot(snapshot *Snapshot) { + toDelete := len(snapshot.nodeInfoMap) - cache.nodeTree.numNodes + for name := range snapshot.nodeInfoMap { + if toDelete <= 0 { + break + } + if n, ok := cache.nodes[name]; !ok || n.info.Node() == nil { + delete(snapshot.nodeInfoMap, name) + toDelete-- + } + } +} + +// NodeCount returns the number of nodes in the cache. +// DO NOT use outside of tests. +func (cache *cacheImpl) NodeCount() int { + cache.mu.RLock() + defer cache.mu.RUnlock() + return len(cache.nodes) +} + +// PodCount returns the number of pods in the cache (including those from deleted nodes). +// DO NOT use outside of tests. +func (cache *cacheImpl) PodCount() (int, error) { + cache.mu.RLock() + defer cache.mu.RUnlock() + // podFilter is expected to return true for most or all of the pods. We + // can avoid expensive array growth without wasting too much memory by + // pre-allocating capacity. 
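+	// No filtering is applied here; the count is simply the sum of the
+	// per-node pod slices.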
+ count := 0 + for _, n := range cache.nodes { + count += len(n.info.Pods) + } + return count, nil +} + +func (cache *cacheImpl) AssumePod(pod *corev1.Pod) error { + key, err := framework.GetPodKey(pod) + if err != nil { + return err + } + + cache.mu.Lock() + defer cache.mu.Unlock() + if _, ok := cache.podStates[key]; ok { + return fmt.Errorf("pod %v(%v) is in the cache, so can't be assumed", key, klog.KObj(pod)) + } + + return cache.addPod(pod, true) +} + +func (cache *cacheImpl) FinishBinding(pod *corev1.Pod) error { + return cache.finishBinding(pod, time.Now()) +} + +// finishBinding exists to make tests deterministic by injecting now as an argument +func (cache *cacheImpl) finishBinding(pod *corev1.Pod, now time.Time) error { + key, err := framework.GetPodKey(pod) + if err != nil { + return err + } + + cache.mu.RLock() + defer cache.mu.RUnlock() + + klog.V(5).InfoS("Finished binding for pod, can be expired", "podKey", key, "pod", klog.KObj(pod)) + currState, ok := cache.podStates[key] + if ok && cache.assumedPods.Has(key) { + if cache.ttl == time.Duration(0) { + currState.deadline = nil + } else { + dl := now.Add(cache.ttl) + currState.deadline = &dl + } + currState.bindingFinished = true + } + return nil +} + +func (cache *cacheImpl) ForgetPod(pod *corev1.Pod) error { + key, err := framework.GetPodKey(pod) + if err != nil { + return err + } + + cache.mu.Lock() + defer cache.mu.Unlock() + + currState, ok := cache.podStates[key] + if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName { + return fmt.Errorf("pod %v(%v) was assumed on %v but assigned to %v", key, klog.KObj(pod), pod.Spec.NodeName, currState.pod.Spec.NodeName) + } + + // Only assumed pod can be forgotten. + if ok && cache.assumedPods.Has(key) { + return cache.removePod(pod) + } + return fmt.Errorf("pod %v(%v) wasn't assumed so cannot be forgotten", key, klog.KObj(pod)) +} + +// Assumes that lock is already acquired. +func (cache *cacheImpl) addPod(pod *corev1.Pod, assumePod bool) error { + key, err := framework.GetPodKey(pod) + if err != nil { + return err + } + n, ok := cache.nodes[pod.Spec.NodeName] + if !ok { + n = newNodeInfoListItem(framework.NewNodeInfo()) + cache.nodes[pod.Spec.NodeName] = n + } + n.info.AddPod(pod) + cache.moveNodeInfoToHead(pod.Spec.NodeName) + ps := &podState{ + pod: pod, + } + cache.podStates[key] = ps + if assumePod { + cache.assumedPods.Insert(key) + } + return nil +} + +// Assumes that lock is already acquired. +func (cache *cacheImpl) updatePod(oldPod, newPod *corev1.Pod) error { + if err := cache.removePod(oldPod); err != nil { + return err + } + return cache.addPod(newPod, false) +} + +// Assumes that lock is already acquired. +// Removes a pod from the cached node info. If the node information was already +// removed and there are no more pods left in the node, cleans up the node from +// the cache. 
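+// The pod's entries in podStates and assumedPods are removed unconditionally,
+// even when its node is no longer tracked.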
+func (cache *cacheImpl) removePod(pod *corev1.Pod) error { + key, err := framework.GetPodKey(pod) + if err != nil { + return err + } + + n, ok := cache.nodes[pod.Spec.NodeName] + if !ok { + klog.ErrorS(nil, "Node not found when trying to remove pod", "node", klog.KRef("", pod.Spec.NodeName), "podKey", key, "pod", klog.KObj(pod)) + } else { + if err := n.info.RemovePod(pod); err != nil { + return err + } + if len(n.info.Pods) == 0 && n.info.Node() == nil { + cache.removeNodeInfoFromList(pod.Spec.NodeName) + } else { + cache.moveNodeInfoToHead(pod.Spec.NodeName) + } + } + + delete(cache.podStates, key) + delete(cache.assumedPods, key) + return nil +} + +func (cache *cacheImpl) AddPod(pod *corev1.Pod) error { + key, err := framework.GetPodKey(pod) + if err != nil { + return err + } + + cache.mu.Lock() + defer cache.mu.Unlock() + + currState, ok := cache.podStates[key] + switch { + case ok && cache.assumedPods.Has(key): + // When assuming, we've already added the Pod to cache, + // Just update here to make sure the Pod's status is up-to-date. + if err = cache.updatePod(currState.pod, pod); err != nil { + klog.ErrorS(err, "Error occurred while updating pod") + } + if currState.pod.Spec.NodeName != pod.Spec.NodeName { + // The pod was added to a different node than it was assumed to. + klog.InfoS("Pod was added to a different node than it was assumed", "podKey", key, "pod", klog.KObj(pod), "assumedNode", klog.KRef("", pod.Spec.NodeName), "currentNode", klog.KRef("", currState.pod.Spec.NodeName)) + return nil + } + case !ok: + // Pod was expired. We should add it back. + if err = cache.addPod(pod, false); err != nil { + klog.ErrorS(err, "Error occurred while adding pod") + } + default: + return fmt.Errorf("pod %v(%v) was already in added state", key, klog.KObj(pod)) + } + return nil +} + +func (cache *cacheImpl) UpdatePod(oldPod, newPod *corev1.Pod) error { + key, err := framework.GetPodKey(oldPod) + if err != nil { + return err + } + + cache.mu.Lock() + defer cache.mu.Unlock() + + currState, ok := cache.podStates[key] + // An assumed pod won't have Update/Remove event. It needs to have Add event + // before Update event, in which case the state would change from Assumed to Added. + if ok && !cache.assumedPods.Has(key) { + if currState.pod.Spec.NodeName != newPod.Spec.NodeName { + klog.ErrorS(nil, "Pod updated on a different node than previously added to", "podKey", key, "pod", klog.KObj(oldPod)) + klog.ErrorS(nil, "scheduler cache is corrupted and can badly affect scheduling decisions") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } + return cache.updatePod(oldPod, newPod) + } + return fmt.Errorf("pod %v(%v) is not added to scheduler cache, so cannot be updated", key, klog.KObj(oldPod)) +} + +func (cache *cacheImpl) RemovePod(pod *corev1.Pod) error { + key, err := framework.GetPodKey(pod) + if err != nil { + return err + } + + cache.mu.Lock() + defer cache.mu.Unlock() + + currState, ok := cache.podStates[key] + if !ok { + return fmt.Errorf("pod %v(%v) is not found in scheduler cache, so cannot be removed from it", key, klog.KObj(pod)) + } + if currState.pod.Spec.NodeName != pod.Spec.NodeName { + klog.ErrorS(nil, "Pod was added to a different node than it was assumed", "podKey", key, "pod", klog.KObj(pod), "assumedNode", klog.KRef("", pod.Spec.NodeName), "currentNode", klog.KRef("", currState.pod.Spec.NodeName)) + if pod.Spec.NodeName != "" { + // An empty NodeName is possible when the scheduler misses a Delete + // event and it gets the last known state from the informer cache. 
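+			// With a non-empty NodeName the mismatch cannot be explained by a
+			// missed event, so the cache is treated as corrupted and the
+			// process exits.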
+ klog.ErrorS(nil, "scheduler cache is corrupted and can badly affect scheduling decisions") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } + } + return cache.removePod(currState.pod) +} + +func (cache *cacheImpl) IsAssumedPod(pod *corev1.Pod) (bool, error) { + key, err := framework.GetPodKey(pod) + if err != nil { + return false, err + } + + cache.mu.RLock() + defer cache.mu.RUnlock() + + return cache.assumedPods.Has(key), nil +} + +// GetPod might return a pod for which its node has already been deleted from +// the main cache. This is useful to properly process pod update events. +func (cache *cacheImpl) GetPod(pod *corev1.Pod) (*corev1.Pod, error) { + key, err := framework.GetPodKey(pod) + if err != nil { + return nil, err + } + + cache.mu.RLock() + defer cache.mu.RUnlock() + + podState, ok := cache.podStates[key] + if !ok { + return nil, fmt.Errorf("pod %v(%v) does not exist in scheduler cache", key, klog.KObj(pod)) + } + + return podState.pod, nil +} + +func (cache *cacheImpl) AddNode(node *corev1.Node) *framework.NodeInfo { + cache.mu.Lock() + defer cache.mu.Unlock() + + n, ok := cache.nodes[node.Name] + if !ok { + n = newNodeInfoListItem(framework.NewNodeInfo()) + cache.nodes[node.Name] = n + } else { + cache.removeNodeImageStates(n.info.Node()) + } + cache.moveNodeInfoToHead(node.Name) + + cache.nodeTree.addNode(node) + cache.addNodeImageStates(node, n.info) + n.info.SetNode(node) + return n.info.Clone() +} + +func (cache *cacheImpl) UpdateNode(oldNode, newNode *corev1.Node) *framework.NodeInfo { + cache.mu.Lock() + defer cache.mu.Unlock() + + n, ok := cache.nodes[newNode.Name] + if !ok { + n = newNodeInfoListItem(framework.NewNodeInfo()) + cache.nodes[newNode.Name] = n + cache.nodeTree.addNode(newNode) + } else { + cache.removeNodeImageStates(n.info.Node()) + } + cache.moveNodeInfoToHead(newNode.Name) + + cache.nodeTree.updateNode(oldNode, newNode) + cache.addNodeImageStates(newNode, n.info) + n.info.SetNode(newNode) + return n.info.Clone() +} + +// RemoveNode removes a node from the cache's tree. +// The node might still have pods because their deletion events didn't arrive +// yet. Those pods are considered removed from the cache, being the node tree +// the source of truth. +// However, we keep a ghost node with the list of pods until all pod deletion +// events have arrived. A ghost node is skipped from snapshots. +func (cache *cacheImpl) RemoveNode(node *corev1.Node) error { + cache.mu.Lock() + defer cache.mu.Unlock() + + n, ok := cache.nodes[node.Name] + if !ok { + return fmt.Errorf("node %v is not found", node.Name) + } + n.info.RemoveNode() + // We remove NodeInfo for this node only if there aren't any pods on this node. + // We can't do it unconditionally, because notifications about pods are delivered + // in a different watch, and thus can potentially be observed later, even though + // they happened before node removal. + if len(n.info.Pods) == 0 { + cache.removeNodeInfoFromList(node.Name) + } else { + cache.moveNodeInfoToHead(node.Name) + } + if err := cache.nodeTree.removeNode(node); err != nil { + return err + } + cache.removeNodeImageStates(node) + return nil +} + +// addNodeImageStates adds states of the images on given node to the given nodeInfo and update the imageStates in +// scheduler cache. This function assumes the lock to scheduler cache has been acquired. 
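+// For every image name in node.Status.Images it records the node in
+// imageStates and attaches a fresh ImageStateSummary (image size and the
+// number of nodes holding the image) to nodeInfo.ImageStates.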
+func (cache *cacheImpl) addNodeImageStates(node *corev1.Node, nodeInfo *framework.NodeInfo) { + newSum := make(map[string]*framework.ImageStateSummary) + + for _, image := range node.Status.Images { + for _, name := range image.Names { + // update the entry in imageStates + state, ok := cache.imageStates[name] + if !ok { + state = &imageState{ + size: image.SizeBytes, + nodes: sets.NewString(node.Name), + } + cache.imageStates[name] = state + } else { + state.nodes.Insert(node.Name) + } + // create the imageStateSummary for this image + if _, ok := newSum[name]; !ok { + newSum[name] = cache.createImageStateSummary(state) + } + } + } + nodeInfo.ImageStates = newSum +} + +// removeNodeImageStates removes the given node record from image entries having the node +// in imageStates cache. After the removal, if any image becomes free, i.e., the image +// is no longer available on any node, the image entry will be removed from imageStates. +func (cache *cacheImpl) removeNodeImageStates(node *corev1.Node) { + if node == nil { + return + } + + for _, image := range node.Status.Images { + for _, name := range image.Names { + state, ok := cache.imageStates[name] + if ok { + state.nodes.Delete(node.Name) + if len(state.nodes) == 0 { + // Remove the unused image to make sure the length of + // imageStates represents the total number of different + // images on all nodes + delete(cache.imageStates, name) + } + } + } + } +} + +func (cache *cacheImpl) run() { + go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop) +} + +func (cache *cacheImpl) cleanupExpiredAssumedPods() { + cache.cleanupAssumedPods(time.Now()) +} + +// cleanupAssumedPods exists for making test deterministic by taking time as input argument. +// It also reports metrics on the cache size for nodes, pods, and assumed pods. +func (cache *cacheImpl) cleanupAssumedPods(now time.Time) { + cache.mu.Lock() + defer cache.mu.Unlock() + + // The size of assumedPods should be small + for key := range cache.assumedPods { + ps, ok := cache.podStates[key] + if !ok { + klog.ErrorS(nil, "Key found in assumed set but not in podStates, potentially a logical error") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } + if !ps.bindingFinished { + klog.V(5).InfoS("Could not expire cache for pod as binding is still in progress", "podKey", key, "pod", klog.KObj(ps.pod)) + continue + } + if cache.ttl != 0 && now.After(*ps.deadline) { + klog.InfoS("Pod expired", "podKey", key, "pod", klog.KObj(ps.pod)) + if err := cache.removePod(ps.pod); err != nil { + klog.ErrorS(err, "ExpirePod failed", "podKey", key, "pod", klog.KObj(ps.pod)) + } + } + } +} diff --git a/pkg/util/lifted/scheduler/cache/interface.go b/pkg/util/lifted/scheduler/cache/interface.go new file mode 100644 index 000000000..7c162451a --- /dev/null +++ b/pkg/util/lifted/scheduler/cache/interface.go @@ -0,0 +1,127 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +// This code is directly lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package. +// For reference: +// https://github.com/kubernetes/kubernetes/blob/release-1.25/pkg/scheduler/internal/cache/interface.go + +package cache + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + + "github.com/karmada-io/karmada/pkg/util/lifted/scheduler/framework" +) + +// Cache collects pods' information and provides node-level aggregated information. +// It's intended for generic scheduler to do efficient lookup. +// Cache's operations are pod centric. It does incremental updates based on pod events. +// Pod events are sent via network. We don't have guaranteed delivery of all events: +// We use Reflector to list and watch from remote. +// Reflector might be slow and do a relist, which would lead to missing events. +// +// State Machine of a pod's events in scheduler's cache: +// +// +-------------------------------------------+ +----+ +// | Add | | | +// | | | | Update +// + Assume Add v v | +// +// Initial +--------> Assumed +------------+---> Added <--+ +// +// ^ + + | + +// | | | | | +// | | | Add | | Remove +// | | | | | +// | | | + | +// +----------------+ +-----------> Expired +----> Deleted +// Forget Expire +// +// Note that an assumed pod can expire, because if we haven't received Add event notifying us +// for a while, there might be some problems and we shouldn't keep the pod in cache anymore. +// +// Note that "Initial", "Expired", and "Deleted" pods do not actually exist in cache. +// Based on existing use cases, we are making the following assumptions: +// - No pod would be assumed twice +// - A pod could be added without going through scheduler. In this case, we will see Add but not Assume event. +// - If a pod wasn't added, it wouldn't be removed or updated. +// - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue, +// a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache. +type Cache interface { + // NodeCount returns the number of nodes in the cache. + // DO NOT use outside of tests. + NodeCount() int + + // PodCount returns the number of pods in the cache (including those from deleted nodes). + // DO NOT use outside of tests. + PodCount() (int, error) + + // AssumePod assumes a pod scheduled and aggregates the pod's information into its node. + // The implementation also decides the policy to expire pod before being confirmed (receiving Add event). + // After expiration, its information would be subtracted. + AssumePod(pod *corev1.Pod) error + + // FinishBinding signals that cache for assumed pod can be expired + FinishBinding(pod *corev1.Pod) error + + // ForgetPod removes an assumed pod from cache. + ForgetPod(pod *corev1.Pod) error + + // AddPod either confirms a pod if it's assumed, or adds it back if it's expired. + // If added back, the pod's information would be added again. + AddPod(pod *corev1.Pod) error + + // UpdatePod removes oldPod's information and adds newPod's information. + UpdatePod(oldPod, newPod *corev1.Pod) error + + // RemovePod removes a pod. The pod's information would be subtracted from assigned node. + RemovePod(pod *corev1.Pod) error + + // GetPod returns the pod from the cache with the same namespace and the + // same name of the specified pod. + GetPod(pod *corev1.Pod) (*corev1.Pod, error) + + // IsAssumedPod returns true if the pod is assumed and not expired. 
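+	// A minimal usage sketch (illustrative only; "c" is any Cache value):
+	//   _ = c.AssumePod(pod)     // IsAssumedPod(pod) now reports true
+	//   _ = c.FinishBinding(pod) // the assumed pod may expire after the ttl
+	//   _ = c.AddPod(pod)        // confirmed by an Add event; no longer assumed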
+ IsAssumedPod(pod *corev1.Pod) (bool, error) + + // AddNode adds overall information about node. + // It returns a clone of added NodeInfo object. + AddNode(node *corev1.Node) *framework.NodeInfo + + // UpdateNode updates overall information about node. + // It returns a clone of updated NodeInfo object. + UpdateNode(oldNode, newNode *corev1.Node) *framework.NodeInfo + + // RemoveNode removes overall information about node. + RemoveNode(node *corev1.Node) error + + // UpdateSnapshot updates the passed infoSnapshot to the current contents of Cache. + // The node info contains aggregated information of pods scheduled (including assumed to be) + // on this node. + // The snapshot only includes Nodes that are not deleted at the time this function is called. + // nodeinfo.Node() is guaranteed to be not nil for all the nodes in the snapshot. + UpdateSnapshot(nodeSnapshot *Snapshot) error + + // Dump produces a dump of the current cache. + Dump() *Dump +} + +// Dump is a dump of the cache state. +type Dump struct { + AssumedPods sets.String + Nodes map[string]*framework.NodeInfo +} diff --git a/pkg/util/lifted/scheduler/cache/node_tree.go b/pkg/util/lifted/scheduler/cache/node_tree.go new file mode 100644 index 000000000..f5ae8943f --- /dev/null +++ b/pkg/util/lifted/scheduler/cache/node_tree.go @@ -0,0 +1,147 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This code is directly lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package. +// For reference: +// https://github.com/kubernetes/kubernetes/blob/release-1.25/pkg/scheduler/internal/cache/node_tree.go + +package cache + +import ( + "errors" + "fmt" + + corev1 "k8s.io/api/core/v1" + utilnode "k8s.io/component-helpers/node/topology" + "k8s.io/klog/v2" +) + +// nodeTree is a tree-like data structure that holds node names in each zone. Zone names are +// keys to "NodeTree.tree" and values of "NodeTree.tree" are arrays of node names. +// NodeTree is NOT thread-safe, any concurrent updates/reads from it must be synchronized by the caller. +// It is used only by schedulerCache, and should stay as such. +type nodeTree struct { + tree map[string][]string // a map from zone (region-zone) to an array of nodes in the zone. + zones []string // a list of all the zones in the tree (keys) + numNodes int +} + +// newNodeTree creates a NodeTree from nodes. +func newNodeTree(nodes []*corev1.Node) *nodeTree { + nt := &nodeTree{ + tree: make(map[string][]string, len(nodes)), + } + for _, n := range nodes { + nt.addNode(n) + } + return nt +} + +// addNode adds a node and its corresponding zone to the tree. If the zone already exists, the node +// is added to the array of nodes in that zone. 
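+// For example (hypothetical names): adding node-a and node-b from zone-x and
+// node-c from zone-y yields tree[zone-x] = [node-a, node-b] and
+// tree[zone-y] = [node-c]; list() then walks the zones round-robin and
+// returns node-a, node-c, node-b.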
+func (nt *nodeTree) addNode(n *corev1.Node) { + zone := utilnode.GetZoneKey(n) + if na, ok := nt.tree[zone]; ok { + for _, nodeName := range na { + if nodeName == n.Name { + klog.InfoS("Node already exists in the NodeTree", "node", klog.KObj(n)) + return + } + } + nt.tree[zone] = append(na, n.Name) + } else { + nt.zones = append(nt.zones, zone) + nt.tree[zone] = []string{n.Name} + } + klog.V(2).InfoS("Added node in listed group to NodeTree", "node", klog.KObj(n), "zone", zone) + nt.numNodes++ +} + +// removeNode removes a node from the NodeTree. +func (nt *nodeTree) removeNode(n *corev1.Node) error { + zone := utilnode.GetZoneKey(n) + if na, ok := nt.tree[zone]; ok { + for i, nodeName := range na { + if nodeName == n.Name { + nt.tree[zone] = append(na[:i], na[i+1:]...) + if len(nt.tree[zone]) == 0 { + nt.removeZone(zone) + } + klog.V(2).InfoS("Removed node in listed group from NodeTree", "node", klog.KObj(n), "zone", zone) + nt.numNodes-- + return nil + } + } + } + klog.ErrorS(nil, "Node in listed group was not found", "node", klog.KObj(n), "zone", zone) + return fmt.Errorf("node %q in group %q was not found", n.Name, zone) +} + +// removeZone removes a zone from tree. +// This function must be called while writer locks are hold. +func (nt *nodeTree) removeZone(zone string) { + delete(nt.tree, zone) + for i, z := range nt.zones { + if z == zone { + nt.zones = append(nt.zones[:i], nt.zones[i+1:]...) + return + } + } +} + +// updateNode updates a node in the NodeTree. +func (nt *nodeTree) updateNode(old, new *corev1.Node) { + var oldZone string + if old != nil { + oldZone = utilnode.GetZoneKey(old) + } + newZone := utilnode.GetZoneKey(new) + // If the zone ID of the node has not changed, we don't need to do anything. Name of the node + // cannot be changed in an update. + if oldZone == newZone { + return + } + _ = nt.removeNode(old) // No error checking. We ignore whether the old node exists or not. + nt.addNode(new) +} + +// list returns the list of names of the node. NodeTree iterates over zones and in each zone iterates +// over nodes in a round robin fashion. +func (nt *nodeTree) list() ([]string, error) { + if len(nt.zones) == 0 { + return nil, nil + } + nodesList := make([]string, 0, nt.numNodes) + numExhaustedZones := 0 + nodeIndex := 0 + for len(nodesList) < nt.numNodes { + if numExhaustedZones >= len(nt.zones) { // all zones are exhausted. + return nodesList, errors.New("all zones exhausted before reaching count of nodes expected") + } + for zoneIndex := 0; zoneIndex < len(nt.zones); zoneIndex++ { + na := nt.tree[nt.zones[zoneIndex]] + if nodeIndex >= len(na) { // If the zone is exhausted, continue + if nodeIndex == len(na) { // If it is the first time the zone is exhausted + numExhaustedZones++ + } + continue + } + nodesList = append(nodesList, na[nodeIndex]) + } + nodeIndex++ + } + return nodesList, nil +} diff --git a/pkg/util/lifted/scheduler/cache/snapshot.go b/pkg/util/lifted/scheduler/cache/snapshot.go new file mode 100644 index 000000000..5b31adc91 --- /dev/null +++ b/pkg/util/lifted/scheduler/cache/snapshot.go @@ -0,0 +1,205 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This code is directly lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package. +// For reference: +// https://github.com/kubernetes/kubernetes/blob/release-1.25/pkg/scheduler/internal/cache/snapshot.go + +package cache + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + + "github.com/karmada-io/karmada/pkg/util/lifted/scheduler/framework" +) + +// Snapshot is a snapshot of cache NodeInfo and NodeTree order. The scheduler takes a +// snapshot at the beginning of each scheduling cycle and uses it for its operations in that cycle. +type Snapshot struct { + // nodeInfoMap a map of node name to a snapshot of its NodeInfo. + nodeInfoMap map[string]*framework.NodeInfo + // nodeInfoList is the list of nodes as ordered in the cache's nodeTree. + nodeInfoList []*framework.NodeInfo + // havePodsWithAffinityNodeInfoList is the list of nodes with at least one pod declaring affinity terms. + havePodsWithAffinityNodeInfoList []*framework.NodeInfo + // havePodsWithRequiredAntiAffinityNodeInfoList is the list of nodes with at least one pod declaring + // required anti-affinity terms. + havePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo + // usedPVCSet contains a set of PVC names that have one or more scheduled pods using them, + // keyed in the format "namespace/name". + usedPVCSet sets.String + generation int64 +} + +var _ framework.SharedLister = &Snapshot{} + +// NewEmptySnapshot initializes a Snapshot struct and returns it. +func NewEmptySnapshot() *Snapshot { + return &Snapshot{ + nodeInfoMap: make(map[string]*framework.NodeInfo), + usedPVCSet: sets.NewString(), + } +} + +// NewSnapshot initializes a Snapshot struct and returns it. +func NewSnapshot(pods []*corev1.Pod, nodes []*corev1.Node) *Snapshot { + nodeInfoMap := createNodeInfoMap(pods, nodes) + nodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap)) + havePodsWithAffinityNodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap)) + havePodsWithRequiredAntiAffinityNodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap)) + for _, v := range nodeInfoMap { + nodeInfoList = append(nodeInfoList, v) + if len(v.PodsWithAffinity) > 0 { + havePodsWithAffinityNodeInfoList = append(havePodsWithAffinityNodeInfoList, v) + } + if len(v.PodsWithRequiredAntiAffinity) > 0 { + havePodsWithRequiredAntiAffinityNodeInfoList = append(havePodsWithRequiredAntiAffinityNodeInfoList, v) + } + } + + s := NewEmptySnapshot() + s.nodeInfoMap = nodeInfoMap + s.nodeInfoList = nodeInfoList + s.havePodsWithAffinityNodeInfoList = havePodsWithAffinityNodeInfoList + s.havePodsWithRequiredAntiAffinityNodeInfoList = havePodsWithRequiredAntiAffinityNodeInfoList + s.usedPVCSet = createUsedPVCSet(pods) + + return s +} + +// createNodeInfoMap obtains a list of pods and pivots that list into a map +// where the keys are node names and the values are the aggregated information +// for that node. 
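+// Nodes without any pods still get an entry, and each entry's ImageStates is
+// derived from an image-existence map built across all of the given nodes.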
+func createNodeInfoMap(pods []*corev1.Pod, nodes []*corev1.Node) map[string]*framework.NodeInfo { + nodeNameToInfo := make(map[string]*framework.NodeInfo) + for _, pod := range pods { + nodeName := pod.Spec.NodeName + if _, ok := nodeNameToInfo[nodeName]; !ok { + nodeNameToInfo[nodeName] = framework.NewNodeInfo() + } + nodeNameToInfo[nodeName].AddPod(pod) + } + imageExistenceMap := createImageExistenceMap(nodes) + + for _, node := range nodes { + if _, ok := nodeNameToInfo[node.Name]; !ok { + nodeNameToInfo[node.Name] = framework.NewNodeInfo() + } + nodeInfo := nodeNameToInfo[node.Name] + nodeInfo.SetNode(node) + nodeInfo.ImageStates = getNodeImageStates(node, imageExistenceMap) + } + return nodeNameToInfo +} + +func createUsedPVCSet(pods []*corev1.Pod) sets.String { + usedPVCSet := sets.NewString() + for _, pod := range pods { + if pod.Spec.NodeName == "" { + continue + } + + for _, v := range pod.Spec.Volumes { + if v.PersistentVolumeClaim == nil { + continue + } + + key := framework.GetNamespacedName(pod.Namespace, v.PersistentVolumeClaim.ClaimName) + usedPVCSet.Insert(key) + } + } + return usedPVCSet +} + +// getNodeImageStates returns the given node's image states based on the given imageExistence map. +func getNodeImageStates(node *corev1.Node, imageExistenceMap map[string]sets.String) map[string]*framework.ImageStateSummary { + imageStates := make(map[string]*framework.ImageStateSummary) + + for _, image := range node.Status.Images { + for _, name := range image.Names { + imageStates[name] = &framework.ImageStateSummary{ + Size: image.SizeBytes, + NumNodes: len(imageExistenceMap[name]), + } + } + } + return imageStates +} + +// createImageExistenceMap returns a map recording on which nodes the images exist, keyed by the images' names. +func createImageExistenceMap(nodes []*corev1.Node) map[string]sets.String { + imageExistenceMap := make(map[string]sets.String) + for _, node := range nodes { + for _, image := range node.Status.Images { + for _, name := range image.Names { + if _, ok := imageExistenceMap[name]; !ok { + imageExistenceMap[name] = sets.NewString(node.Name) + } else { + imageExistenceMap[name].Insert(node.Name) + } + } + } + } + return imageExistenceMap +} + +// NodeInfos returns a NodeInfoLister. +func (s *Snapshot) NodeInfos() framework.NodeInfoLister { + return s +} + +// StorageInfos returns a StorageInfoLister. +func (s *Snapshot) StorageInfos() framework.StorageInfoLister { + return s +} + +// NumNodes returns the number of nodes in the snapshot. +func (s *Snapshot) NumNodes() int { + return len(s.nodeInfoList) +} + +// List returns the list of nodes in the snapshot. +func (s *Snapshot) List() ([]*framework.NodeInfo, error) { + return s.nodeInfoList, nil +} + +// HavePodsWithAffinityList returns the list of nodes with at least one pod with inter-pod affinity +func (s *Snapshot) HavePodsWithAffinityList() ([]*framework.NodeInfo, error) { + return s.havePodsWithAffinityNodeInfoList, nil +} + +// HavePodsWithRequiredAntiAffinityList returns the list of nodes with at least one pod with +// required inter-pod anti-affinity +func (s *Snapshot) HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) { + return s.havePodsWithRequiredAntiAffinityNodeInfoList, nil +} + +// Get returns the NodeInfo of the given node name. 
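+// Entries whose underlying *corev1.Node has been removed are reported as not found.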
+func (s *Snapshot) Get(nodeName string) (*framework.NodeInfo, error) { + if v, ok := s.nodeInfoMap[nodeName]; ok && v.Node() != nil { + return v, nil + } + return nil, fmt.Errorf("nodeinfo not found for node name %q", nodeName) +} + +// IsPVCUsedByPods returns true/false on whether the PVC is used by one or more scheduled pods, +// keyed in the format "namespace/name". +func (s *Snapshot) IsPVCUsedByPods(key string) bool { + return s.usedPVCSet.Has(key) +} diff --git a/pkg/util/lifted/scheduler/framework/listers.go b/pkg/util/lifted/scheduler/framework/listers.go new file mode 100644 index 000000000..a44cea4ce --- /dev/null +++ b/pkg/util/lifted/scheduler/framework/listers.go @@ -0,0 +1,46 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This code is directly lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package. +// For reference: +// https://github.com/kubernetes/kubernetes/blob/release-1.25/pkg/scheduler/framework/listers.go + +package framework + +// NodeInfoLister interface represents anything that can list/get NodeInfo objects from node name. +type NodeInfoLister interface { + // List returns the list of NodeInfos. + List() ([]*NodeInfo, error) + // HavePodsWithAffinityList returns the list of NodeInfos of nodes with pods with affinity terms. + HavePodsWithAffinityList() ([]*NodeInfo, error) + // HavePodsWithRequiredAntiAffinityList returns the list of NodeInfos of nodes with pods with required anti-affinity terms. + HavePodsWithRequiredAntiAffinityList() ([]*NodeInfo, error) + // Get returns the NodeInfo of the given node name. + Get(nodeName string) (*NodeInfo, error) +} + +// StorageInfoLister interface represents anything that handles storage-related operations and resources. +type StorageInfoLister interface { + // IsPVCUsedByPods returns true/false on whether the PVC is used by one or more scheduled pods, + // keyed in the format "namespace/name". + IsPVCUsedByPods(key string) bool +} + +// SharedLister groups scheduler-specific listers. +type SharedLister interface { + NodeInfos() NodeInfoLister + StorageInfos() StorageInfoLister +} diff --git a/pkg/util/lifted/parallelism.go b/pkg/util/lifted/scheduler/framework/parallelize/parallelism.go similarity index 98% rename from pkg/util/lifted/parallelism.go rename to pkg/util/lifted/scheduler/framework/parallelize/parallelism.go index f0c930dd1..1d171641c 100644 --- a/pkg/util/lifted/parallelism.go +++ b/pkg/util/lifted/scheduler/framework/parallelize/parallelism.go @@ -18,7 +18,7 @@ limitations under the License. 
// For reference: // https://github.com/kubernetes/kubernetes/blob/release-1.23/pkg/scheduler/framework/parallelize/parallelism.go -package lifted +package parallelize import ( "context" diff --git a/pkg/util/lifted/parallelism_test.go b/pkg/util/lifted/scheduler/framework/parallelize/parallelism_test.go similarity index 98% rename from pkg/util/lifted/parallelism_test.go rename to pkg/util/lifted/scheduler/framework/parallelize/parallelism_test.go index 3f7904a8b..3f30dc5a5 100644 --- a/pkg/util/lifted/parallelism_test.go +++ b/pkg/util/lifted/scheduler/framework/parallelize/parallelism_test.go @@ -18,7 +18,7 @@ limitations under the License. // For reference: // https://github.com/kubernetes/kubernetes/blob/release-1.23/pkg/scheduler/framework/parallelize/parallelism_test.go -package lifted +package parallelize import ( "fmt" diff --git a/pkg/util/lifted/scheduler/framework/types.go b/pkg/util/lifted/scheduler/framework/types.go new file mode 100644 index 000000000..6d9a6009a --- /dev/null +++ b/pkg/util/lifted/scheduler/framework/types.go @@ -0,0 +1,770 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This code is directly lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package. +// For reference: +// https://github.com/kubernetes/kubernetes/blob/release-1.25/pkg/scheduler/framework/types.go + +package framework + +import ( + "errors" + "fmt" + "sync/atomic" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + + "github.com/karmada-io/karmada/pkg/util" + schedutil "github.com/karmada-io/karmada/pkg/util/lifted/scheduler/util" +) + +var generation int64 + +// ActionType is an integer to represent one type of resource change. +// Different ActionTypes can be bit-wised to compose new semantics. +type ActionType int64 + +// Constants for ActionTypes. +const ( + Add ActionType = 1 << iota // 1 + Delete // 10 + // UpdateNodeXYZ is only applicable for Node events. + UpdateNodeAllocatable // 100 + UpdateNodeLabel // 1000 + UpdateNodeTaint // 10000 + UpdateNodeCondition // 100000 + + All ActionType = 1< 0 { + clone.Pods = append([]*PodInfo(nil), n.Pods...) + } + if len(n.UsedPorts) > 0 { + // HostPortInfo is a map-in-map struct + // make sure it's deep copied + for ip, portMap := range n.UsedPorts { + clone.UsedPorts[ip] = make(map[ProtocolPort]struct{}) + for protocolPort, v := range portMap { + clone.UsedPorts[ip][protocolPort] = v + } + } + } + if len(n.PodsWithAffinity) > 0 { + clone.PodsWithAffinity = append([]*PodInfo(nil), n.PodsWithAffinity...) + } + if len(n.PodsWithRequiredAntiAffinity) > 0 { + clone.PodsWithRequiredAntiAffinity = append([]*PodInfo(nil), n.PodsWithRequiredAntiAffinity...) 
+ } + for key, value := range n.PVCRefCounts { + clone.PVCRefCounts[key] = value + } + return clone +} + +// String returns representation of human readable format of this NodeInfo. +func (n *NodeInfo) String() string { + podKeys := make([]string, len(n.Pods)) + for i, p := range n.Pods { + podKeys[i] = p.Pod.Name + } + return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v, NonZeroRequest: %#v, UsedPort: %#v, AllocatableResource:%#v}", + podKeys, n.Requested, n.NonZeroRequested, n.UsedPorts, n.Allocatable) +} + +// AddPodInfo adds pod information to this NodeInfo. +// Consider using this instead of AddPod if a PodInfo is already computed. +func (n *NodeInfo) AddPodInfo(podInfo *PodInfo) { + n.Pods = append(n.Pods, podInfo) + if podWithAffinity(podInfo.Pod) { + n.PodsWithAffinity = append(n.PodsWithAffinity, podInfo) + } + if podWithRequiredAntiAffinity(podInfo.Pod) { + n.PodsWithRequiredAntiAffinity = append(n.PodsWithRequiredAntiAffinity, podInfo) + } + n.update(podInfo.Pod, 1) +} + +// AddPod is a wrapper around AddPodInfo. +func (n *NodeInfo) AddPod(pod *corev1.Pod) { + n.AddPodInfo(NewPodInfo(pod)) +} + +func podWithAffinity(p *corev1.Pod) bool { + affinity := p.Spec.Affinity + return affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil) +} + +func podWithRequiredAntiAffinity(p *corev1.Pod) bool { + affinity := p.Spec.Affinity + return affinity != nil && affinity.PodAntiAffinity != nil && + len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 +} + +func removeFromSlice(s []*PodInfo, k string) []*PodInfo { + for i := range s { + k2, err := GetPodKey(s[i].Pod) + if err != nil { + klog.ErrorS(err, "Cannot get pod key", "pod", klog.KObj(s[i].Pod)) + continue + } + if k == k2 { + // delete the element + s[i] = s[len(s)-1] + s = s[:len(s)-1] + break + } + } + return s +} + +// RemovePod subtracts pod information from this NodeInfo. +func (n *NodeInfo) RemovePod(pod *corev1.Pod) error { + k, err := GetPodKey(pod) + if err != nil { + return err + } + if podWithAffinity(pod) { + n.PodsWithAffinity = removeFromSlice(n.PodsWithAffinity, k) + } + if podWithRequiredAntiAffinity(pod) { + n.PodsWithRequiredAntiAffinity = removeFromSlice(n.PodsWithRequiredAntiAffinity, k) + } + + for i := range n.Pods { + k2, err := GetPodKey(n.Pods[i].Pod) + if err != nil { + klog.ErrorS(err, "Cannot get pod key", "pod", klog.KObj(n.Pods[i].Pod)) + continue + } + if k == k2 { + // delete the element + n.Pods[i] = n.Pods[len(n.Pods)-1] + n.Pods = n.Pods[:len(n.Pods)-1] + + n.update(pod, -1) + n.resetSlicesIfEmpty() + return nil + } + } + return fmt.Errorf("no corresponding pod %s in pods of node %s", pod.Name, n.node.Name) +} + +// update node info based on the pod and sign. +// The sign will be set to `+1` when AddPod and to `-1` when RemovePod. +func (n *NodeInfo) update(pod *corev1.Pod, sign int64) { + res, non0CPU, non0Mem := calculateResource(pod) + n.Requested.MilliCPU += sign * res.MilliCPU + n.Requested.Memory += sign * res.Memory + n.Requested.EphemeralStorage += sign * res.EphemeralStorage + if n.Requested.ScalarResources == nil && len(res.ScalarResources) > 0 { + n.Requested.ScalarResources = map[corev1.ResourceName]int64{} + } + for rName, rQuant := range res.ScalarResources { + n.Requested.ScalarResources[rName] += sign * rQuant + } + n.NonZeroRequested.MilliCPU += sign * non0CPU + n.NonZeroRequested.Memory += sign * non0Mem + + // Consume ports when pod added or release ports when pod removed. 
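+	// Both helpers below interpret their boolean as "add": true registers the
+	// pod's host ports and PVC references, false releases them.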
+ n.updateUsedPorts(pod, sign > 0) + n.updatePVCRefCounts(pod, sign > 0) + + n.Generation = nextGeneration() +} + +// resets the slices to nil so that we can do DeepEqual in unit tests. +func (n *NodeInfo) resetSlicesIfEmpty() { + if len(n.PodsWithAffinity) == 0 { + n.PodsWithAffinity = nil + } + if len(n.PodsWithRequiredAntiAffinity) == 0 { + n.PodsWithRequiredAntiAffinity = nil + } + if len(n.Pods) == 0 { + n.Pods = nil + } +} + +func max(a, b int64) int64 { + if a >= b { + return a + } + return b +} + +// resourceRequest = max(sum(podSpec.Containers), podSpec.InitContainers) + overHead +func calculateResource(pod *corev1.Pod) (res util.Resource, non0CPU int64, non0Mem int64) { + resPtr := &res + for _, c := range pod.Spec.Containers { + resPtr.Add(c.Resources.Requests) + non0CPUReq, non0MemReq := schedutil.GetNonzeroRequests(&c.Resources.Requests) + non0CPU += non0CPUReq + non0Mem += non0MemReq + // No non-zero resources for GPUs or opaque resources. + } + + for _, ic := range pod.Spec.InitContainers { + resPtr.SetMaxResource(ic.Resources.Requests) + non0CPUReq, non0MemReq := schedutil.GetNonzeroRequests(&ic.Resources.Requests) + non0CPU = max(non0CPU, non0CPUReq) + non0Mem = max(non0Mem, non0MemReq) + } + + // If Overhead is being utilized, add to the total requests for the pod + if pod.Spec.Overhead != nil { + resPtr.Add(pod.Spec.Overhead) + if _, found := pod.Spec.Overhead[corev1.ResourceCPU]; found { + non0CPU += pod.Spec.Overhead.Cpu().MilliValue() + } + + if _, found := pod.Spec.Overhead[corev1.ResourceMemory]; found { + non0Mem += pod.Spec.Overhead.Memory().Value() + } + } + + return +} + +// updateUsedPorts updates the UsedPorts of NodeInfo. +func (n *NodeInfo) updateUsedPorts(pod *corev1.Pod, add bool) { + for _, container := range pod.Spec.Containers { + for _, podPort := range container.Ports { + if add { + n.UsedPorts.Add(podPort.HostIP, string(podPort.Protocol), podPort.HostPort) + } else { + n.UsedPorts.Remove(podPort.HostIP, string(podPort.Protocol), podPort.HostPort) + } + } + } +} + +// updatePVCRefCounts updates the PVCRefCounts of NodeInfo. +func (n *NodeInfo) updatePVCRefCounts(pod *corev1.Pod, add bool) { + for _, v := range pod.Spec.Volumes { + if v.PersistentVolumeClaim == nil { + continue + } + + key := GetNamespacedName(pod.Namespace, v.PersistentVolumeClaim.ClaimName) + if add { + n.PVCRefCounts[key]++ + } else { + n.PVCRefCounts[key]-- + if n.PVCRefCounts[key] <= 0 { + delete(n.PVCRefCounts, key) + } + } + } +} + +// SetNode sets the overall node information. +func (n *NodeInfo) SetNode(node *corev1.Node) { + n.node = node + n.Allocatable = util.NewResource(node.Status.Allocatable) + n.Generation = nextGeneration() +} + +// RemoveNode removes the node object, leaving all other tracking information. +func (n *NodeInfo) RemoveNode() { + n.node = nil + n.Generation = nextGeneration() +} + +// GetPodKey returns the string key of a pod. +func GetPodKey(pod *corev1.Pod) (string, error) { + uid := string(pod.UID) + if len(uid) == 0 { + return "", errors.New("cannot get cache key for pod with empty UID") + } + return uid, nil +} + +// GetNamespacedName returns the string format of a namespaced resource name. +func GetNamespacedName(namespace, name string) string { + return fmt.Sprintf("%s/%s", namespace, name) +} + +// DefaultBindAllHostIP defines the default ip address used to bind to all host. +const DefaultBindAllHostIP = "0.0.0.0" + +// ProtocolPort represents a protocol port pair, e.g. tcp:80. 
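+// An empty Protocol is normalized to TCP by NewProtocolPort, so, illustratively,
+// NewProtocolPort("", 80) and NewProtocolPort("TCP", 80) describe the same
+// tcp:80 pair.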
+type ProtocolPort struct { + Protocol string + Port int32 +} + +// NewProtocolPort creates a ProtocolPort instance. +func NewProtocolPort(protocol string, port int32) *ProtocolPort { + pp := &ProtocolPort{ + Protocol: protocol, + Port: port, + } + + if len(pp.Protocol) == 0 { + pp.Protocol = string(corev1.ProtocolTCP) + } + + return pp +} + +// HostPortInfo stores mapping from ip to a set of ProtocolPort +type HostPortInfo map[string]map[ProtocolPort]struct{} + +// Add adds (ip, protocol, port) to HostPortInfo +func (h HostPortInfo) Add(ip, protocol string, port int32) { + if port <= 0 { + return + } + + h.sanitize(&ip, &protocol) + + pp := NewProtocolPort(protocol, port) + if _, ok := h[ip]; !ok { + h[ip] = map[ProtocolPort]struct{}{ + *pp: {}, + } + return + } + + h[ip][*pp] = struct{}{} +} + +// Remove removes (ip, protocol, port) from HostPortInfo +func (h HostPortInfo) Remove(ip, protocol string, port int32) { + if port <= 0 { + return + } + + h.sanitize(&ip, &protocol) + + pp := NewProtocolPort(protocol, port) + if m, ok := h[ip]; ok { + delete(m, *pp) + if len(h[ip]) == 0 { + delete(h, ip) + } + } +} + +// Len returns the total number of (ip, protocol, port) tuple in HostPortInfo +func (h HostPortInfo) Len() int { + length := 0 + for _, m := range h { + length += len(m) + } + return length +} + +// CheckConflict checks if the input (ip, protocol, port) conflicts with the existing +// ones in HostPortInfo. +func (h HostPortInfo) CheckConflict(ip, protocol string, port int32) bool { + if port <= 0 { + return false + } + + h.sanitize(&ip, &protocol) + + pp := NewProtocolPort(protocol, port) + + // If ip is 0.0.0.0 check all IP's (protocol, port) pair + if ip == DefaultBindAllHostIP { + for _, m := range h { + if _, ok := m[*pp]; ok { + return true + } + } + return false + } + + // If ip isn't 0.0.0.0, only check IP and 0.0.0.0's (protocol, port) pair + for _, key := range []string{DefaultBindAllHostIP, ip} { + if m, ok := h[key]; ok { + if _, ok2 := m[*pp]; ok2 { + return true + } + } + } + + return false +} + +// sanitize the parameters +func (h HostPortInfo) sanitize(ip, protocol *string) { + if len(*ip) == 0 { + *ip = DefaultBindAllHostIP + } + if len(*protocol) == 0 { + *protocol = string(corev1.ProtocolTCP) + } +} diff --git a/pkg/util/lifted/scheduler/util/pod_resources.go b/pkg/util/lifted/scheduler/util/pod_resources.go new file mode 100644 index 000000000..3e3c5a6bc --- /dev/null +++ b/pkg/util/lifted/scheduler/util/pod_resources.go @@ -0,0 +1,82 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This code is directly lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package. 
+// For reference: +// https://github.com/kubernetes/kubernetes/blob/release-1.25/pkg/scheduler/util/pod_resources.go + +package util + +import ( + corev1 "k8s.io/api/core/v1" +) + +// For each of these resources, a pod that doesn't request the resource explicitly +// will be treated as having requested the amount indicated below, for the purpose +// of computing priority only. This ensures that when scheduling zero-request pods, such +// pods will not all be scheduled to the node with the smallest in-use request, +// and that when scheduling regular pods, such pods will not see zero-request pods as +// consuming no resources whatsoever. We chose these values to be similar to the +// resources that we give to cluster addon pods (#10653). But they are pretty arbitrary. +// As described in #11713, we use request instead of limit to deal with resource requirements. +const ( + // DefaultMilliCPURequest defines default milli cpu request number. + DefaultMilliCPURequest int64 = 100 // 0.1 core + // DefaultMemoryRequest defines default memory request size. + DefaultMemoryRequest int64 = 200 * 1024 * 1024 // 200 MB +) + +// GetNonzeroRequests returns the default cpu and memory resource request if none is found or +// what is provided on the request. +func GetNonzeroRequests(requests *corev1.ResourceList) (int64, int64) { + return GetRequestForResource(corev1.ResourceCPU, requests, true), + GetRequestForResource(corev1.ResourceMemory, requests, true) +} + +// GetRequestForResource returns the requested values unless nonZero is true and there is no defined request +// for CPU and memory. +// If nonZero is true and the resource has no defined request for CPU or memory, it returns a default value. +func GetRequestForResource(resource corev1.ResourceName, requests *corev1.ResourceList, nonZero bool) int64 { + if requests == nil { + return 0 + } + switch resource { + case corev1.ResourceCPU: + // Override if un-set, but not if explicitly set to zero + if _, found := (*requests)[corev1.ResourceCPU]; !found && nonZero { + return DefaultMilliCPURequest + } + return requests.Cpu().MilliValue() + case corev1.ResourceMemory: + // Override if un-set, but not if explicitly set to zero + if _, found := (*requests)[corev1.ResourceMemory]; !found && nonZero { + return DefaultMemoryRequest + } + return requests.Memory().Value() + case corev1.ResourceEphemeralStorage: + quantity, found := (*requests)[corev1.ResourceEphemeralStorage] + if !found { + return 0 + } + return quantity.Value() + default: + quantity, found := (*requests)[resource] + if !found { + return 0 + } + return quantity.Value() + } +}
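+
+// Illustrative usage (not part of the lifted upstream file): for a container
+// that declares no requests at all,
+//
+//	cpu, mem := GetNonzeroRequests(&corev1.ResourceList{})
+//
+// yields cpu == DefaultMilliCPURequest (100) and mem == DefaultMemoryRequest
+// (209715200), while
+//
+//	GetRequestForResource(corev1.ResourceCPU, &corev1.ResourceList{}, false)
+//
+// returns 0, because the defaulting only applies when nonZero is true.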