add lifted files

Signed-off-by: Garrybest <garrybest@foxmail.com>
Garrybest 2022-10-30 22:04:06 +08:00
parent 40becff2a1
commit f52043b447
11 changed files with 2303 additions and 3 deletions


@@ -0,0 +1,157 @@
// This code is mostly lifted from the Kubernetes codebase to establish the internal cache.
// https://github.com/kubernetes/kubernetes/blob/release-1.25/pkg/scheduler/eventhandlers.go
package server
import (
"fmt"
corev1 "k8s.io/api/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
func addAllEventHandlers(es *AccurateSchedulerEstimatorServer, informerFactory informers.SharedInformerFactory) {
// scheduled pod cache
informerFactory.Core().V1().Pods().Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *corev1.Pod:
return assignedPod(t)
case cache.DeletedFinalStateUnknown:
if _, ok := t.Obj.(*corev1.Pod); ok {
// The carried object may be stale, so we don't use it to check if
// it's assigned or not. Attempting to clean up anyway.
return true
}
utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod", obj))
return false
default:
utilruntime.HandleError(fmt.Errorf("unable to handle object: %T", obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: es.addPodToCache,
UpdateFunc: es.updatePodInCache,
DeleteFunc: es.deletePodFromCache,
},
},
)
informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: es.addNodeToCache,
UpdateFunc: es.updateNodeInCache,
DeleteFunc: es.deleteNodeFromCache,
},
)
}
func (es *AccurateSchedulerEstimatorServer) addPodToCache(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", obj)
return
}
klog.V(3).InfoS("Add event for scheduled pod", "pod", klog.KObj(pod))
if err := es.Cache.AddPod(pod); err != nil {
klog.ErrorS(err, "Estimator cache AddPod failed", "pod", klog.KObj(pod))
}
}
func (es *AccurateSchedulerEstimatorServer) updatePodInCache(oldObj, newObj interface{}) {
oldPod, ok := oldObj.(*corev1.Pod)
if !ok {
klog.ErrorS(nil, "Cannot convert oldObj to *v1.Pod", "oldObj", oldObj)
return
}
newPod, ok := newObj.(*corev1.Pod)
if !ok {
klog.ErrorS(nil, "Cannot convert newObj to *v1.Pod", "newObj", newObj)
return
}
klog.V(4).InfoS("Update event for scheduled pod", "pod", klog.KObj(oldPod))
if err := es.Cache.UpdatePod(oldPod, newPod); err != nil {
klog.ErrorS(err, "Estimator cache UpdatePod failed", "pod", klog.KObj(oldPod))
}
}
func (es *AccurateSchedulerEstimatorServer) deletePodFromCache(obj interface{}) {
var pod *corev1.Pod
switch t := obj.(type) {
case *corev1.Pod:
pod = t
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*corev1.Pod)
if !ok {
klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", t.Obj)
return
}
default:
klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", t)
return
}
klog.V(3).InfoS("Delete event for scheduled pod", "pod", klog.KObj(pod))
if err := es.Cache.RemovePod(pod); err != nil {
klog.ErrorS(err, "Estimator cache RemovePod failed", "pod", klog.KObj(pod))
}
}
func (es *AccurateSchedulerEstimatorServer) addNodeToCache(obj interface{}) {
node, ok := obj.(*corev1.Node)
if !ok {
klog.ErrorS(nil, "Cannot convert to *v1.Node", "obj", obj)
return
}
es.Cache.AddNode(node)
klog.V(3).InfoS("Add event for node", "node", klog.KObj(node))
}
func (es *AccurateSchedulerEstimatorServer) updateNodeInCache(oldObj, newObj interface{}) {
oldNode, ok := oldObj.(*corev1.Node)
if !ok {
klog.ErrorS(nil, "Cannot convert oldObj to *v1.Node", "oldObj", oldObj)
return
}
newNode, ok := newObj.(*corev1.Node)
if !ok {
klog.ErrorS(nil, "Cannot convert newObj to *v1.Node", "newObj", newObj)
return
}
es.Cache.UpdateNode(oldNode, newNode)
}
func (es *AccurateSchedulerEstimatorServer) deleteNodeFromCache(obj interface{}) {
var node *corev1.Node
switch t := obj.(type) {
case *corev1.Node:
node = t
case cache.DeletedFinalStateUnknown:
var ok bool
node, ok = t.Obj.(*corev1.Node)
if !ok {
klog.ErrorS(nil, "Cannot convert to *v1.Node", "obj", t.Obj)
return
}
default:
klog.ErrorS(nil, "Cannot convert to *v1.Node", "obj", t)
return
}
klog.V(3).InfoS("Delete event for node", "node", klog.KObj(node))
if err := es.Cache.RemoveNode(node); err != nil {
klog.ErrorS(err, "Scheduler cache RemoveNode failed")
}
}
// assignedPod selects pods that are assigned (scheduled and running).
func assignedPod(pod *corev1.Pod) bool {
return len(pod.Spec.NodeName) != 0
}
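
For orientation, here is a minimal, self-contained sketch of the same FilteringResourceEventHandler pattern run against a fake clientset. The package layout, pod names, and node name are illustrative assumptions rather than part of the lifted code; only pods whose NodeName is already set reach the handler, mirroring assignedPod above.

package main

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// A fake clientset stands in for a real cluster connection.
	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 0)

	// Same filtering idea as addAllEventHandlers: only pods that already
	// carry a NodeName (i.e. assigned pods) reach the handler.
	factory.Core().V1().Pods().Informer().AddEventHandler(cache.FilteringResourceEventHandler{
		FilterFunc: func(obj interface{}) bool {
			pod, ok := obj.(*corev1.Pod)
			return ok && len(pod.Spec.NodeName) != 0
		},
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				fmt.Println("scheduled pod observed:", obj.(*corev1.Pod).Name)
			},
		},
	})

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	factory.WaitForCacheSync(stop)

	// "unassigned" is filtered out; "assigned" triggers AddFunc.
	for name, node := range map[string]string{"unassigned": "", "assigned": "node-1"} {
		_, _ = client.CoreV1().Pods("default").Create(context.TODO(), &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{Name: name},
			Spec:       corev1.PodSpec{NodeName: node},
		}, metav1.CreateOptions{})
	}
	time.Sleep(500 * time.Millisecond) // give the shared informer time to deliver events
}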


@@ -44,7 +44,6 @@ package lifted
| objectwatcher.go | https://github.com/kubernetes-sigs/kubefed/blob/master/pkg/controller/util/propagatedversion.go#L35-L43 | func ObjectVersion | N |
| objectwatcher.go | https://github.com/kubernetes-sigs/kubefed/blob/master/pkg/controller/util/propagatedversion.go#L45-L59 | func ObjectNeedsUpdate | N |
| objectwatcher.go | https://github.com/kubernetes-sigs/kubefed/blob/master/pkg/controller/util/meta.go#L63-L80 | func objectMetaObjEquivalent | Y |
-| parallelism_test.go | https://github.com/kubernetes/kubernetes/blob/release-1.23/pkg/scheduler/framework/parallelize/parallelism_test.go | func TestChunkSize | N |
| podtemplate.go | https://github.com/kubernetes/kubernetes/blob/release-1.23/pkg/controller/controller_utils.go#L466-L472 | func getPodsLabelSet | N |
| podtemplate.go | https://github.com/kubernetes/kubernetes/blob/release-1.23/pkg/controller/controller_utils.go#L474-L478 | func getPodsFinalizers | N |
| podtemplate.go | https://github.com/kubernetes/kubernetes/blob/release-1.23/pkg/controller/controller_utils.go#L480-L486 | func getPodsAnnotationSet | N |

pkg/util/lifted/scheduler/cache/cache.go (vendored)

@@ -0,0 +1,767 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This code is directly lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package.
// For reference:
// https://github.com/kubernetes/kubernetes/blob/release-1.25/pkg/scheduler/internal/cache/cache.go
package cache
import (
"fmt"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/pkg/util/lifted/scheduler/framework"
)
var (
cleanAssumedPeriod = 1 * time.Second
)
// New returns a Cache implementation.
// It automatically starts a go routine that manages expiration of assumed pods.
// "ttl" is how long the assumed pod will get expired, "0" means pod will never expire.
// "stop" is the channel that would close the background goroutine.
func New(ttl time.Duration, stop <-chan struct{}) Cache {
cache := newCache(ttl, cleanAssumedPeriod, stop)
cache.run()
return cache
}
// nodeInfoListItem holds a NodeInfo pointer and acts as an item in a doubly
// linked list. When a NodeInfo is updated, it goes to the head of the list.
// The items closer to the head are the most recently updated items.
type nodeInfoListItem struct {
info *framework.NodeInfo
next *nodeInfoListItem
prev *nodeInfoListItem
}
type cacheImpl struct {
stop <-chan struct{}
ttl time.Duration
period time.Duration
// This mutex guards all fields within this cache struct.
mu sync.RWMutex
// a set of assumed pod keys.
// The key could further be used to get an entry in podStates.
assumedPods sets.String
// a map from pod key to podState.
podStates map[string]*podState
nodes map[string]*nodeInfoListItem
// headNode points to the most recently updated NodeInfo in "nodes". It is the
// head of the linked list.
headNode *nodeInfoListItem
nodeTree *nodeTree
// A map from image name to its imageState.
imageStates map[string]*imageState
}
type podState struct {
pod *corev1.Pod
// Used by assumedPod to determine expiration.
// If deadline is nil, assumedPod will never expire.
deadline *time.Time
// Used to block cache from expiring assumedPod if binding still runs
bindingFinished bool
}
type imageState struct {
// Size of the image
size int64
// A set of node names for nodes having this image present
nodes sets.String
}
// createImageStateSummary returns a summarizing snapshot of the given image's state.
func (cache *cacheImpl) createImageStateSummary(state *imageState) *framework.ImageStateSummary {
return &framework.ImageStateSummary{
Size: state.size,
NumNodes: len(state.nodes),
}
}
func newCache(ttl, period time.Duration, stop <-chan struct{}) *cacheImpl {
return &cacheImpl{
ttl: ttl,
period: period,
stop: stop,
nodes: make(map[string]*nodeInfoListItem),
nodeTree: newNodeTree(nil),
assumedPods: make(sets.String),
podStates: make(map[string]*podState),
imageStates: make(map[string]*imageState),
}
}
// newNodeInfoListItem initializes a new nodeInfoListItem.
func newNodeInfoListItem(ni *framework.NodeInfo) *nodeInfoListItem {
return &nodeInfoListItem{
info: ni,
}
}
// moveNodeInfoToHead moves a NodeInfo to the head of "cache.nodes" doubly
// linked list. The head is the most recently updated NodeInfo.
// We assume cache lock is already acquired.
func (cache *cacheImpl) moveNodeInfoToHead(name string) {
ni, ok := cache.nodes[name]
if !ok {
klog.ErrorS(nil, "No node info with given name found in the cache", "node", klog.KRef("", name))
return
}
// if the node info list item is already at the head, we are done.
if ni == cache.headNode {
return
}
if ni.prev != nil {
ni.prev.next = ni.next
}
if ni.next != nil {
ni.next.prev = ni.prev
}
if cache.headNode != nil {
cache.headNode.prev = ni
}
ni.next = cache.headNode
ni.prev = nil
cache.headNode = ni
}
// removeNodeInfoFromList removes a NodeInfo from the "cache.nodes" doubly
// linked list.
// We assume cache lock is already acquired.
func (cache *cacheImpl) removeNodeInfoFromList(name string) {
ni, ok := cache.nodes[name]
if !ok {
klog.ErrorS(nil, "No node info with given name found in the cache", "node", klog.KRef("", name))
return
}
if ni.prev != nil {
ni.prev.next = ni.next
}
if ni.next != nil {
ni.next.prev = ni.prev
}
// if the removed item was at the head, we must update the head.
if ni == cache.headNode {
cache.headNode = ni.next
}
delete(cache.nodes, name)
}
// Dump produces a dump of the current scheduler cache. This is used for
// debugging purposes only and shouldn't be confused with UpdateSnapshot
// function.
// This method is expensive, and should be only used in non-critical path.
func (cache *cacheImpl) Dump() *Dump {
cache.mu.RLock()
defer cache.mu.RUnlock()
nodes := make(map[string]*framework.NodeInfo, len(cache.nodes))
for k, v := range cache.nodes {
nodes[k] = v.info.Clone()
}
return &Dump{
Nodes: nodes,
AssumedPods: cache.assumedPods.Union(nil),
}
}
// UpdateSnapshot takes a snapshot of cached NodeInfo map. This is called at
// beginning of every scheduling cycle.
// The snapshot only includes Nodes that are not deleted at the time this function is called.
// nodeInfo.Node() is guaranteed to be not nil for all the nodes in the snapshot.
// This function tracks generation number of NodeInfo and updates only the
// entries of an existing snapshot that have changed after the snapshot was taken.
//
//nolint:gocyclo
func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error {
cache.mu.Lock()
defer cache.mu.Unlock()
// Get the last generation of the snapshot.
snapshotGeneration := nodeSnapshot.generation
// NodeInfoList and HavePodsWithAffinityNodeInfoList must be re-created if a node was added
// or removed from the cache.
updateAllLists := false
// HavePodsWithAffinityNodeInfoList must be re-created if a node changed its
// status from having pods with affinity to NOT having pods with affinity or the other
// way around.
updateNodesHavePodsWithAffinity := false
// HavePodsWithRequiredAntiAffinityNodeInfoList must be re-created if a node changed its
// status from having pods with required anti-affinity to NOT having pods with required
// anti-affinity or the other way around.
updateNodesHavePodsWithRequiredAntiAffinity := false
// usedPVCSet must be re-created whenever the head node generation is greater than
// last snapshot generation.
updateUsedPVCSet := false
// Start from the head of the NodeInfo doubly linked list and update snapshot
// of NodeInfos updated after the last snapshot.
for node := cache.headNode; node != nil; node = node.next {
if node.info.Generation <= snapshotGeneration {
// all the nodes are updated before the existing snapshot. We are done.
break
}
if np := node.info.Node(); np != nil {
existing, ok := nodeSnapshot.nodeInfoMap[np.Name]
if !ok {
updateAllLists = true
existing = &framework.NodeInfo{}
nodeSnapshot.nodeInfoMap[np.Name] = existing
}
clone := node.info.Clone()
// We track nodes that have pods with affinity, here we check if this node changed its
// status from having pods with affinity to NOT having pods with affinity or the other
// way around.
if (len(existing.PodsWithAffinity) > 0) != (len(clone.PodsWithAffinity) > 0) {
updateNodesHavePodsWithAffinity = true
}
if (len(existing.PodsWithRequiredAntiAffinity) > 0) != (len(clone.PodsWithRequiredAntiAffinity) > 0) {
updateNodesHavePodsWithRequiredAntiAffinity = true
}
if !updateUsedPVCSet {
if len(existing.PVCRefCounts) != len(clone.PVCRefCounts) {
updateUsedPVCSet = true
} else {
for pvcKey := range clone.PVCRefCounts {
if _, found := existing.PVCRefCounts[pvcKey]; !found {
updateUsedPVCSet = true
break
}
}
}
}
// We need to preserve the original pointer of the NodeInfo struct since it
// is used in the NodeInfoList, which we may not update.
*existing = *clone
}
}
// Update the snapshot generation with the latest NodeInfo generation.
if cache.headNode != nil {
nodeSnapshot.generation = cache.headNode.info.Generation
}
// Comparing to pods in nodeTree.
// Deleted nodes get removed from the tree, but they might remain in the nodes map
// if they still have non-deleted Pods.
if len(nodeSnapshot.nodeInfoMap) > cache.nodeTree.numNodes {
cache.removeDeletedNodesFromSnapshot(nodeSnapshot)
updateAllLists = true
}
if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity || updateUsedPVCSet {
cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists)
}
if len(nodeSnapshot.nodeInfoList) != cache.nodeTree.numNodes {
errMsg := fmt.Sprintf("snapshot state is not consistent, length of NodeInfoList=%v not equal to length of nodes in tree=%v "+
", length of NodeInfoMap=%v, length of nodes in cache=%v"+
", trying to recover",
len(nodeSnapshot.nodeInfoList), cache.nodeTree.numNodes,
len(nodeSnapshot.nodeInfoMap), len(cache.nodes))
klog.ErrorS(nil, errMsg)
// We will try to recover by re-creating the lists for the next scheduling cycle, but still return an
// error to surface the problem, the error will likely cause a failure to the current scheduling cycle.
cache.updateNodeInfoSnapshotList(nodeSnapshot, true)
return fmt.Errorf(errMsg)
}
return nil
}
func (cache *cacheImpl) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
snapshot.usedPVCSet = sets.NewString()
if updateAll {
// Take a snapshot of the nodes order in the tree
snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
nodesList, err := cache.nodeTree.list()
if err != nil {
klog.ErrorS(err, "Error occurred while retrieving the list of names of the nodes from node tree")
}
for _, nodeName := range nodesList {
if nodeInfo := snapshot.nodeInfoMap[nodeName]; nodeInfo != nil {
snapshot.nodeInfoList = append(snapshot.nodeInfoList, nodeInfo)
if len(nodeInfo.PodsWithAffinity) > 0 {
snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
}
if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
}
for key := range nodeInfo.PVCRefCounts {
snapshot.usedPVCSet.Insert(key)
}
} else {
klog.ErrorS(nil, "Node exists in nodeTree but not in NodeInfoMap, this should not happen", "node", klog.KRef("", nodeName))
}
}
} else {
for _, nodeInfo := range snapshot.nodeInfoList {
if len(nodeInfo.PodsWithAffinity) > 0 {
snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
}
if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
}
for key := range nodeInfo.PVCRefCounts {
snapshot.usedPVCSet.Insert(key)
}
}
}
}
// If certain nodes were deleted after the last snapshot was taken, we should remove them from the snapshot.
func (cache *cacheImpl) removeDeletedNodesFromSnapshot(snapshot *Snapshot) {
toDelete := len(snapshot.nodeInfoMap) - cache.nodeTree.numNodes
for name := range snapshot.nodeInfoMap {
if toDelete <= 0 {
break
}
if n, ok := cache.nodes[name]; !ok || n.info.Node() == nil {
delete(snapshot.nodeInfoMap, name)
toDelete--
}
}
}
// NodeCount returns the number of nodes in the cache.
// DO NOT use outside of tests.
func (cache *cacheImpl) NodeCount() int {
cache.mu.RLock()
defer cache.mu.RUnlock()
return len(cache.nodes)
}
// PodCount returns the number of pods in the cache (including those from deleted nodes).
// DO NOT use outside of tests.
func (cache *cacheImpl) PodCount() (int, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()
// Count the pods on every node tracked in the cache, including nodes that
// were deleted but still hold pods.
count := 0
for _, n := range cache.nodes {
count += len(n.info.Pods)
}
return count, nil
}
func (cache *cacheImpl) AssumePod(pod *corev1.Pod) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
if _, ok := cache.podStates[key]; ok {
return fmt.Errorf("pod %v(%v) is in the cache, so can't be assumed", key, klog.KObj(pod))
}
return cache.addPod(pod, true)
}
func (cache *cacheImpl) FinishBinding(pod *corev1.Pod) error {
return cache.finishBinding(pod, time.Now())
}
// finishBinding exists to make tests deterministic by injecting now as an argument
func (cache *cacheImpl) finishBinding(pod *corev1.Pod, now time.Time) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.RLock()
defer cache.mu.RUnlock()
klog.V(5).InfoS("Finished binding for pod, can be expired", "podKey", key, "pod", klog.KObj(pod))
currState, ok := cache.podStates[key]
if ok && cache.assumedPods.Has(key) {
if cache.ttl == time.Duration(0) {
currState.deadline = nil
} else {
dl := now.Add(cache.ttl)
currState.deadline = &dl
}
currState.bindingFinished = true
}
return nil
}
func (cache *cacheImpl) ForgetPod(pod *corev1.Pod) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName {
return fmt.Errorf("pod %v(%v) was assumed on %v but assigned to %v", key, klog.KObj(pod), pod.Spec.NodeName, currState.pod.Spec.NodeName)
}
// Only assumed pod can be forgotten.
if ok && cache.assumedPods.Has(key) {
return cache.removePod(pod)
}
return fmt.Errorf("pod %v(%v) wasn't assumed so cannot be forgotten", key, klog.KObj(pod))
}
// Assumes that lock is already acquired.
func (cache *cacheImpl) addPod(pod *corev1.Pod, assumePod bool) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
n = newNodeInfoListItem(framework.NewNodeInfo())
cache.nodes[pod.Spec.NodeName] = n
}
n.info.AddPod(pod)
cache.moveNodeInfoToHead(pod.Spec.NodeName)
ps := &podState{
pod: pod,
}
cache.podStates[key] = ps
if assumePod {
cache.assumedPods.Insert(key)
}
return nil
}
// Assumes that lock is already acquired.
func (cache *cacheImpl) updatePod(oldPod, newPod *corev1.Pod) error {
if err := cache.removePod(oldPod); err != nil {
return err
}
return cache.addPod(newPod, false)
}
// Assumes that lock is already acquired.
// Removes a pod from the cached node info. If the node information was already
// removed and there are no more pods left in the node, cleans up the node from
// the cache.
func (cache *cacheImpl) removePod(pod *corev1.Pod) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
klog.ErrorS(nil, "Node not found when trying to remove pod", "node", klog.KRef("", pod.Spec.NodeName), "podKey", key, "pod", klog.KObj(pod))
} else {
if err := n.info.RemovePod(pod); err != nil {
return err
}
if len(n.info.Pods) == 0 && n.info.Node() == nil {
cache.removeNodeInfoFromList(pod.Spec.NodeName)
} else {
cache.moveNodeInfoToHead(pod.Spec.NodeName)
}
}
delete(cache.podStates, key)
delete(cache.assumedPods, key)
return nil
}
func (cache *cacheImpl) AddPod(pod *corev1.Pod) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
switch {
case ok && cache.assumedPods.Has(key):
// When assuming, we've already added the Pod to cache,
// Just update here to make sure the Pod's status is up-to-date.
if err = cache.updatePod(currState.pod, pod); err != nil {
klog.ErrorS(err, "Error occurred while updating pod")
}
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
// The pod was added to a different node than it was assumed to.
klog.InfoS("Pod was added to a different node than it was assumed", "podKey", key, "pod", klog.KObj(pod), "assumedNode", klog.KRef("", pod.Spec.NodeName), "currentNode", klog.KRef("", currState.pod.Spec.NodeName))
return nil
}
case !ok:
// Pod was expired. We should add it back.
if err = cache.addPod(pod, false); err != nil {
klog.ErrorS(err, "Error occurred while adding pod")
}
default:
return fmt.Errorf("pod %v(%v) was already in added state", key, klog.KObj(pod))
}
return nil
}
func (cache *cacheImpl) UpdatePod(oldPod, newPod *corev1.Pod) error {
key, err := framework.GetPodKey(oldPod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
// An assumed pod won't have Update/Remove event. It needs to have Add event
// before Update event, in which case the state would change from Assumed to Added.
if ok && !cache.assumedPods.Has(key) {
if currState.pod.Spec.NodeName != newPod.Spec.NodeName {
klog.ErrorS(nil, "Pod updated on a different node than previously added to", "podKey", key, "pod", klog.KObj(oldPod))
klog.ErrorS(nil, "scheduler cache is corrupted and can badly affect scheduling decisions")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
return cache.updatePod(oldPod, newPod)
}
return fmt.Errorf("pod %v(%v) is not added to scheduler cache, so cannot be updated", key, klog.KObj(oldPod))
}
func (cache *cacheImpl) RemovePod(pod *corev1.Pod) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
if !ok {
return fmt.Errorf("pod %v(%v) is not found in scheduler cache, so cannot be removed from it", key, klog.KObj(pod))
}
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
klog.ErrorS(nil, "Pod was added to a different node than it was assumed", "podKey", key, "pod", klog.KObj(pod), "assumedNode", klog.KRef("", pod.Spec.NodeName), "currentNode", klog.KRef("", currState.pod.Spec.NodeName))
if pod.Spec.NodeName != "" {
// An empty NodeName is possible when the scheduler misses a Delete
// event and it gets the last known state from the informer cache.
klog.ErrorS(nil, "scheduler cache is corrupted and can badly affect scheduling decisions")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}
return cache.removePod(currState.pod)
}
func (cache *cacheImpl) IsAssumedPod(pod *corev1.Pod) (bool, error) {
key, err := framework.GetPodKey(pod)
if err != nil {
return false, err
}
cache.mu.RLock()
defer cache.mu.RUnlock()
return cache.assumedPods.Has(key), nil
}
// GetPod might return a pod for which its node has already been deleted from
// the main cache. This is useful to properly process pod update events.
func (cache *cacheImpl) GetPod(pod *corev1.Pod) (*corev1.Pod, error) {
key, err := framework.GetPodKey(pod)
if err != nil {
return nil, err
}
cache.mu.RLock()
defer cache.mu.RUnlock()
podState, ok := cache.podStates[key]
if !ok {
return nil, fmt.Errorf("pod %v(%v) does not exist in scheduler cache", key, klog.KObj(pod))
}
return podState.pod, nil
}
func (cache *cacheImpl) AddNode(node *corev1.Node) *framework.NodeInfo {
cache.mu.Lock()
defer cache.mu.Unlock()
n, ok := cache.nodes[node.Name]
if !ok {
n = newNodeInfoListItem(framework.NewNodeInfo())
cache.nodes[node.Name] = n
} else {
cache.removeNodeImageStates(n.info.Node())
}
cache.moveNodeInfoToHead(node.Name)
cache.nodeTree.addNode(node)
cache.addNodeImageStates(node, n.info)
n.info.SetNode(node)
return n.info.Clone()
}
func (cache *cacheImpl) UpdateNode(oldNode, newNode *corev1.Node) *framework.NodeInfo {
cache.mu.Lock()
defer cache.mu.Unlock()
n, ok := cache.nodes[newNode.Name]
if !ok {
n = newNodeInfoListItem(framework.NewNodeInfo())
cache.nodes[newNode.Name] = n
cache.nodeTree.addNode(newNode)
} else {
cache.removeNodeImageStates(n.info.Node())
}
cache.moveNodeInfoToHead(newNode.Name)
cache.nodeTree.updateNode(oldNode, newNode)
cache.addNodeImageStates(newNode, n.info)
n.info.SetNode(newNode)
return n.info.Clone()
}
// RemoveNode removes a node from the cache's tree.
// The node might still have pods because their deletion events didn't arrive
// yet. Those pods are considered removed from the cache, with the node tree
// being the source of truth.
// However, we keep a ghost node with the list of pods until all pod deletion
// events have arrived. A ghost node is skipped from snapshots.
func (cache *cacheImpl) RemoveNode(node *corev1.Node) error {
cache.mu.Lock()
defer cache.mu.Unlock()
n, ok := cache.nodes[node.Name]
if !ok {
return fmt.Errorf("node %v is not found", node.Name)
}
n.info.RemoveNode()
// We remove NodeInfo for this node only if there aren't any pods on this node.
// We can't do it unconditionally, because notifications about pods are delivered
// in a different watch, and thus can potentially be observed later, even though
// they happened before node removal.
if len(n.info.Pods) == 0 {
cache.removeNodeInfoFromList(node.Name)
} else {
cache.moveNodeInfoToHead(node.Name)
}
if err := cache.nodeTree.removeNode(node); err != nil {
return err
}
cache.removeNodeImageStates(node)
return nil
}
// addNodeImageStates adds states of the images on given node to the given nodeInfo and update the imageStates in
// scheduler cache. This function assumes the lock to scheduler cache has been acquired.
func (cache *cacheImpl) addNodeImageStates(node *corev1.Node, nodeInfo *framework.NodeInfo) {
newSum := make(map[string]*framework.ImageStateSummary)
for _, image := range node.Status.Images {
for _, name := range image.Names {
// update the entry in imageStates
state, ok := cache.imageStates[name]
if !ok {
state = &imageState{
size: image.SizeBytes,
nodes: sets.NewString(node.Name),
}
cache.imageStates[name] = state
} else {
state.nodes.Insert(node.Name)
}
// create the imageStateSummary for this image
if _, ok := newSum[name]; !ok {
newSum[name] = cache.createImageStateSummary(state)
}
}
}
nodeInfo.ImageStates = newSum
}
// removeNodeImageStates removes the given node record from image entries having the node
// in imageStates cache. After the removal, if any image becomes free, i.e., the image
// is no longer available on any node, the image entry will be removed from imageStates.
func (cache *cacheImpl) removeNodeImageStates(node *corev1.Node) {
if node == nil {
return
}
for _, image := range node.Status.Images {
for _, name := range image.Names {
state, ok := cache.imageStates[name]
if ok {
state.nodes.Delete(node.Name)
if len(state.nodes) == 0 {
// Remove the unused image to make sure the length of
// imageStates represents the total number of different
// images on all nodes
delete(cache.imageStates, name)
}
}
}
}
}
func (cache *cacheImpl) run() {
go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}
func (cache *cacheImpl) cleanupExpiredAssumedPods() {
cache.cleanupAssumedPods(time.Now())
}
// cleanupAssumedPods exists to make tests deterministic by taking the current time as an input argument.
func (cache *cacheImpl) cleanupAssumedPods(now time.Time) {
cache.mu.Lock()
defer cache.mu.Unlock()
// The size of assumedPods should be small
for key := range cache.assumedPods {
ps, ok := cache.podStates[key]
if !ok {
klog.ErrorS(nil, "Key found in assumed set but not in podStates, potentially a logical error")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
if !ps.bindingFinished {
klog.V(5).InfoS("Could not expire cache for pod as binding is still in progress", "podKey", key, "pod", klog.KObj(ps.pod))
continue
}
if cache.ttl != 0 && now.After(*ps.deadline) {
klog.InfoS("Pod expired", "podKey", key, "pod", klog.KObj(ps.pod))
if err := cache.removePod(ps.pod); err != nil {
klog.ErrorS(err, "ExpirePod failed", "podKey", key, "pod", klog.KObj(ps.pod))
}
}
}
}
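
To make the lifecycle above concrete, a rough usage sketch of the lifted cache package (object names and the 30-second TTL are assumptions; the lifted GetPodKey helper is assumed to key pods by UID, hence the explicit UID): assume a pod, finish binding, confirm it with AddPod, then take a snapshot as a scheduling cycle would.

package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"

	"github.com/karmada-io/karmada/pkg/util/lifted/scheduler/cache"
)

func main() {
	stop := make(chan struct{})
	defer close(stop)

	// 30s TTL for assumed pods; the background goroutine expires them.
	schedCache := cache.New(30*time.Second, stop)

	node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
	schedCache.AddNode(node)

	// The pod key is assumed to be derived from the UID, so set one explicitly.
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "web", Namespace: "default", UID: types.UID("uid-1")},
		Spec:       corev1.PodSpec{NodeName: "node-1"},
	}

	// Assume -> FinishBinding -> Add mirrors the state machine documented in interface.go.
	if err := schedCache.AssumePod(pod); err != nil {
		panic(err)
	}
	_ = schedCache.FinishBinding(pod)
	_ = schedCache.AddPod(pod) // confirms the assumed pod

	snapshot := cache.NewEmptySnapshot()
	if err := schedCache.UpdateSnapshot(snapshot); err != nil {
		panic(err)
	}
	fmt.Println("nodes in snapshot:", snapshot.NumNodes()) // prints: nodes in snapshot: 1
}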


@@ -0,0 +1,127 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This code is directly lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package.
// For reference:
// https://github.com/kubernetes/kubernetes/blob/release-1.25/pkg/scheduler/internal/cache/interface.go
package cache
import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"github.com/karmada-io/karmada/pkg/util/lifted/scheduler/framework"
)
// Cache collects pods' information and provides node-level aggregated information.
// It's intended for generic scheduler to do efficient lookup.
// Cache's operations are pod centric. It does incremental updates based on pod events.
// Pod events are sent via network. We don't have guaranteed delivery of all events:
// We use Reflector to list and watch from remote.
// Reflector might be slow and do a relist, which would lead to missing events.
//
// State Machine of a pod's events in scheduler's cache:
//
//   +-------------------------------------------+  +----+
//   |                            Add            |  |    |
//   |                                           |  |    | Update
//   +      Assume                Add            v  v    |
//
// Initial +--------> Assumed +------------+---> Added <--+
//
//   ^                +   +               |       +
//   |                |   |               |       |
//   |                |   |           Add |       | Remove
//   |                |   |               |       |
//   |                |   |               +       |
//   +----------------+   +-----------> Expired   +----> Deleted
//        Forget             Expire
//
// Note that an assumed pod can expire, because if we haven't received Add event notifying us
// for a while, there might be some problems and we shouldn't keep the pod in cache anymore.
//
// Note that "Initial", "Expired", and "Deleted" pods do not actually exist in cache.
// Based on existing use cases, we are making the following assumptions:
// - No pod would be assumed twice
// - A pod could be added without going through scheduler. In this case, we will see Add but not Assume event.
// - If a pod wasn't added, it wouldn't be removed or updated.
// - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
// a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
type Cache interface {
// NodeCount returns the number of nodes in the cache.
// DO NOT use outside of tests.
NodeCount() int
// PodCount returns the number of pods in the cache (including those from deleted nodes).
// DO NOT use outside of tests.
PodCount() (int, error)
// AssumePod assumes a pod scheduled and aggregates the pod's information into its node.
// The implementation also decides the policy for expiring an assumed pod before it is confirmed (i.e. before its Add event is received).
// After expiration, its information would be subtracted.
AssumePod(pod *corev1.Pod) error
// FinishBinding signals that cache for assumed pod can be expired
FinishBinding(pod *corev1.Pod) error
// ForgetPod removes an assumed pod from cache.
ForgetPod(pod *corev1.Pod) error
// AddPod either confirms a pod if it's assumed, or adds it back if it's expired.
// If added back, the pod's information would be added again.
AddPod(pod *corev1.Pod) error
// UpdatePod removes oldPod's information and adds newPod's information.
UpdatePod(oldPod, newPod *corev1.Pod) error
// RemovePod removes a pod. The pod's information would be subtracted from assigned node.
RemovePod(pod *corev1.Pod) error
// GetPod returns the pod from the cache with the same namespace and the
// same name of the specified pod.
GetPod(pod *corev1.Pod) (*corev1.Pod, error)
// IsAssumedPod returns true if the pod is assumed and not expired.
IsAssumedPod(pod *corev1.Pod) (bool, error)
// AddNode adds overall information about node.
// It returns a clone of added NodeInfo object.
AddNode(node *corev1.Node) *framework.NodeInfo
// UpdateNode updates overall information about node.
// It returns a clone of updated NodeInfo object.
UpdateNode(oldNode, newNode *corev1.Node) *framework.NodeInfo
// RemoveNode removes overall information about node.
RemoveNode(node *corev1.Node) error
// UpdateSnapshot updates the passed infoSnapshot to the current contents of Cache.
// The node info contains aggregated information of pods scheduled (including assumed to be)
// on this node.
// The snapshot only includes Nodes that are not deleted at the time this function is called.
// nodeinfo.Node() is guaranteed to be not nil for all the nodes in the snapshot.
UpdateSnapshot(nodeSnapshot *Snapshot) error
// Dump produces a dump of the current cache.
Dump() *Dump
}
// Dump is a dump of the cache state.
type Dump struct {
AssumedPods sets.String
Nodes map[string]*framework.NodeInfo
}
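
A short sketch of consuming the cache purely through this interface, exercising the Assume and Forget transitions from the state machine above; the pod fields are illustrative assumptions.

package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"

	schedcache "github.com/karmada-io/karmada/pkg/util/lifted/scheduler/cache"
)

// reportAssumed accepts the Cache interface, so it works with any implementation.
func reportAssumed(c schedcache.Cache) {
	dump := c.Dump()
	fmt.Printf("%d node(s), %d assumed pod key(s): %v\n",
		len(dump.Nodes), dump.AssumedPods.Len(), dump.AssumedPods.List())
}

func main() {
	stop := make(chan struct{})
	defer close(stop)
	c := schedcache.New(10*time.Second, stop)

	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "web", Namespace: "default", UID: types.UID("uid-1")},
		Spec:       corev1.PodSpec{NodeName: "node-1"},
	}
	_ = c.AssumePod(pod)
	reportAssumed(c) // 1 assumed pod

	// ForgetPod drops the assumed pod again (the "Forget" edge in the state machine).
	_ = c.ForgetPod(pod)
	reportAssumed(c) // 0 assumed pods
}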


@@ -0,0 +1,147 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This code is directly lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package.
// For reference:
// https://github.com/kubernetes/kubernetes/blob/release-1.25/pkg/scheduler/internal/cache/node_tree.go
package cache
import (
"errors"
"fmt"
corev1 "k8s.io/api/core/v1"
utilnode "k8s.io/component-helpers/node/topology"
"k8s.io/klog/v2"
)
// nodeTree is a tree-like data structure that holds node names in each zone. Zone names are
// keys to "NodeTree.tree" and values of "NodeTree.tree" are arrays of node names.
// NodeTree is NOT thread-safe, any concurrent updates/reads from it must be synchronized by the caller.
// It is used only by schedulerCache, and should stay as such.
type nodeTree struct {
tree map[string][]string // a map from zone (region-zone) to an array of nodes in the zone.
zones []string // a list of all the zones in the tree (keys)
numNodes int
}
// newNodeTree creates a NodeTree from nodes.
func newNodeTree(nodes []*corev1.Node) *nodeTree {
nt := &nodeTree{
tree: make(map[string][]string, len(nodes)),
}
for _, n := range nodes {
nt.addNode(n)
}
return nt
}
// addNode adds a node and its corresponding zone to the tree. If the zone already exists, the node
// is added to the array of nodes in that zone.
func (nt *nodeTree) addNode(n *corev1.Node) {
zone := utilnode.GetZoneKey(n)
if na, ok := nt.tree[zone]; ok {
for _, nodeName := range na {
if nodeName == n.Name {
klog.InfoS("Node already exists in the NodeTree", "node", klog.KObj(n))
return
}
}
nt.tree[zone] = append(na, n.Name)
} else {
nt.zones = append(nt.zones, zone)
nt.tree[zone] = []string{n.Name}
}
klog.V(2).InfoS("Added node in listed group to NodeTree", "node", klog.KObj(n), "zone", zone)
nt.numNodes++
}
// removeNode removes a node from the NodeTree.
func (nt *nodeTree) removeNode(n *corev1.Node) error {
zone := utilnode.GetZoneKey(n)
if na, ok := nt.tree[zone]; ok {
for i, nodeName := range na {
if nodeName == n.Name {
nt.tree[zone] = append(na[:i], na[i+1:]...)
if len(nt.tree[zone]) == 0 {
nt.removeZone(zone)
}
klog.V(2).InfoS("Removed node in listed group from NodeTree", "node", klog.KObj(n), "zone", zone)
nt.numNodes--
return nil
}
}
}
klog.ErrorS(nil, "Node in listed group was not found", "node", klog.KObj(n), "zone", zone)
return fmt.Errorf("node %q in group %q was not found", n.Name, zone)
}
// removeZone removes a zone from tree.
// This function must be called while the writer lock is held.
func (nt *nodeTree) removeZone(zone string) {
delete(nt.tree, zone)
for i, z := range nt.zones {
if z == zone {
nt.zones = append(nt.zones[:i], nt.zones[i+1:]...)
return
}
}
}
// updateNode updates a node in the NodeTree.
func (nt *nodeTree) updateNode(old, new *corev1.Node) {
var oldZone string
if old != nil {
oldZone = utilnode.GetZoneKey(old)
}
newZone := utilnode.GetZoneKey(new)
// If the zone ID of the node has not changed, we don't need to do anything. Name of the node
// cannot be changed in an update.
if oldZone == newZone {
return
}
_ = nt.removeNode(old) // No error checking. We ignore whether the old node exists or not.
nt.addNode(new)
}
// list returns the list of node names. NodeTree iterates over zones and in each zone iterates
// over nodes in a round robin fashion.
func (nt *nodeTree) list() ([]string, error) {
if len(nt.zones) == 0 {
return nil, nil
}
nodesList := make([]string, 0, nt.numNodes)
numExhaustedZones := 0
nodeIndex := 0
for len(nodesList) < nt.numNodes {
if numExhaustedZones >= len(nt.zones) { // all zones are exhausted.
return nodesList, errors.New("all zones exhausted before reaching count of nodes expected")
}
for zoneIndex := 0; zoneIndex < len(nt.zones); zoneIndex++ {
na := nt.tree[nt.zones[zoneIndex]]
if nodeIndex >= len(na) { // If the zone is exhausted, continue
if nodeIndex == len(na) { // If it is the first time the zone is exhausted
numExhaustedZones++
}
continue
}
nodesList = append(nodesList, na[nodeIndex])
}
nodeIndex++
}
return nodesList, nil
}
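
The zone key that buckets nodes in nodeTree comes from the node's topology labels via GetZoneKey. A small sketch (label values are made up) showing two nodes in one region landing under different zone keys:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilnode "k8s.io/component-helpers/node/topology"
)

func main() {
	// Two nodes in the same region but different zones get different zone keys,
	// and therefore land in different buckets of nodeTree.tree.
	for _, zone := range []string{"zone-a", "zone-b"} {
		node := &corev1.Node{
			ObjectMeta: metav1.ObjectMeta{
				Name: "node-" + zone,
				Labels: map[string]string{
					corev1.LabelTopologyRegion: "region-1",
					corev1.LabelTopologyZone:   zone,
				},
			},
		}
		fmt.Printf("node %q -> zone key %q\n", node.Name, utilnode.GetZoneKey(node))
	}
}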


@@ -0,0 +1,205 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This code is directly lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package.
// For reference:
// https://github.com/kubernetes/kubernetes/blob/release-1.25/pkg/scheduler/internal/cache/snapshot.go
package cache
import (
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"github.com/karmada-io/karmada/pkg/util/lifted/scheduler/framework"
)
// Snapshot is a snapshot of cache NodeInfo and NodeTree order. The scheduler takes a
// snapshot at the beginning of each scheduling cycle and uses it for its operations in that cycle.
type Snapshot struct {
// nodeInfoMap is a map of node name to a snapshot of its NodeInfo.
nodeInfoMap map[string]*framework.NodeInfo
// nodeInfoList is the list of nodes as ordered in the cache's nodeTree.
nodeInfoList []*framework.NodeInfo
// havePodsWithAffinityNodeInfoList is the list of nodes with at least one pod declaring affinity terms.
havePodsWithAffinityNodeInfoList []*framework.NodeInfo
// havePodsWithRequiredAntiAffinityNodeInfoList is the list of nodes with at least one pod declaring
// required anti-affinity terms.
havePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo
// usedPVCSet contains a set of PVC names that have one or more scheduled pods using them,
// keyed in the format "namespace/name".
usedPVCSet sets.String
generation int64
}
var _ framework.SharedLister = &Snapshot{}
// NewEmptySnapshot initializes a Snapshot struct and returns it.
func NewEmptySnapshot() *Snapshot {
return &Snapshot{
nodeInfoMap: make(map[string]*framework.NodeInfo),
usedPVCSet: sets.NewString(),
}
}
// NewSnapshot initializes a Snapshot struct and returns it.
func NewSnapshot(pods []*corev1.Pod, nodes []*corev1.Node) *Snapshot {
nodeInfoMap := createNodeInfoMap(pods, nodes)
nodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap))
havePodsWithAffinityNodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap))
havePodsWithRequiredAntiAffinityNodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap))
for _, v := range nodeInfoMap {
nodeInfoList = append(nodeInfoList, v)
if len(v.PodsWithAffinity) > 0 {
havePodsWithAffinityNodeInfoList = append(havePodsWithAffinityNodeInfoList, v)
}
if len(v.PodsWithRequiredAntiAffinity) > 0 {
havePodsWithRequiredAntiAffinityNodeInfoList = append(havePodsWithRequiredAntiAffinityNodeInfoList, v)
}
}
s := NewEmptySnapshot()
s.nodeInfoMap = nodeInfoMap
s.nodeInfoList = nodeInfoList
s.havePodsWithAffinityNodeInfoList = havePodsWithAffinityNodeInfoList
s.havePodsWithRequiredAntiAffinityNodeInfoList = havePodsWithRequiredAntiAffinityNodeInfoList
s.usedPVCSet = createUsedPVCSet(pods)
return s
}
// createNodeInfoMap obtains a list of pods and pivots that list into a map
// where the keys are node names and the values are the aggregated information
// for that node.
func createNodeInfoMap(pods []*corev1.Pod, nodes []*corev1.Node) map[string]*framework.NodeInfo {
nodeNameToInfo := make(map[string]*framework.NodeInfo)
for _, pod := range pods {
nodeName := pod.Spec.NodeName
if _, ok := nodeNameToInfo[nodeName]; !ok {
nodeNameToInfo[nodeName] = framework.NewNodeInfo()
}
nodeNameToInfo[nodeName].AddPod(pod)
}
imageExistenceMap := createImageExistenceMap(nodes)
for _, node := range nodes {
if _, ok := nodeNameToInfo[node.Name]; !ok {
nodeNameToInfo[node.Name] = framework.NewNodeInfo()
}
nodeInfo := nodeNameToInfo[node.Name]
nodeInfo.SetNode(node)
nodeInfo.ImageStates = getNodeImageStates(node, imageExistenceMap)
}
return nodeNameToInfo
}
func createUsedPVCSet(pods []*corev1.Pod) sets.String {
usedPVCSet := sets.NewString()
for _, pod := range pods {
if pod.Spec.NodeName == "" {
continue
}
for _, v := range pod.Spec.Volumes {
if v.PersistentVolumeClaim == nil {
continue
}
key := framework.GetNamespacedName(pod.Namespace, v.PersistentVolumeClaim.ClaimName)
usedPVCSet.Insert(key)
}
}
return usedPVCSet
}
// getNodeImageStates returns the given node's image states based on the given imageExistence map.
func getNodeImageStates(node *corev1.Node, imageExistenceMap map[string]sets.String) map[string]*framework.ImageStateSummary {
imageStates := make(map[string]*framework.ImageStateSummary)
for _, image := range node.Status.Images {
for _, name := range image.Names {
imageStates[name] = &framework.ImageStateSummary{
Size: image.SizeBytes,
NumNodes: len(imageExistenceMap[name]),
}
}
}
return imageStates
}
// createImageExistenceMap returns a map recording on which nodes the images exist, keyed by the images' names.
func createImageExistenceMap(nodes []*corev1.Node) map[string]sets.String {
imageExistenceMap := make(map[string]sets.String)
for _, node := range nodes {
for _, image := range node.Status.Images {
for _, name := range image.Names {
if _, ok := imageExistenceMap[name]; !ok {
imageExistenceMap[name] = sets.NewString(node.Name)
} else {
imageExistenceMap[name].Insert(node.Name)
}
}
}
}
return imageExistenceMap
}
// NodeInfos returns a NodeInfoLister.
func (s *Snapshot) NodeInfos() framework.NodeInfoLister {
return s
}
// StorageInfos returns a StorageInfoLister.
func (s *Snapshot) StorageInfos() framework.StorageInfoLister {
return s
}
// NumNodes returns the number of nodes in the snapshot.
func (s *Snapshot) NumNodes() int {
return len(s.nodeInfoList)
}
// List returns the list of nodes in the snapshot.
func (s *Snapshot) List() ([]*framework.NodeInfo, error) {
return s.nodeInfoList, nil
}
// HavePodsWithAffinityList returns the list of nodes with at least one pod with inter-pod affinity
func (s *Snapshot) HavePodsWithAffinityList() ([]*framework.NodeInfo, error) {
return s.havePodsWithAffinityNodeInfoList, nil
}
// HavePodsWithRequiredAntiAffinityList returns the list of nodes with at least one pod with
// required inter-pod anti-affinity
func (s *Snapshot) HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) {
return s.havePodsWithRequiredAntiAffinityNodeInfoList, nil
}
// Get returns the NodeInfo of the given node name.
func (s *Snapshot) Get(nodeName string) (*framework.NodeInfo, error) {
if v, ok := s.nodeInfoMap[nodeName]; ok && v.Node() != nil {
return v, nil
}
return nil, fmt.Errorf("nodeinfo not found for node name %q", nodeName)
}
// IsPVCUsedByPods returns true/false on whether the PVC is used by one or more scheduled pods,
// keyed in the format "namespace/name".
func (s *Snapshot) IsPVCUsedByPods(key string) bool {
return s.usedPVCSet.Has(key)
}
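
A brief sketch of building a Snapshot directly from pod and node lists and querying it through its lister methods; the object names and the PVC are illustrative assumptions.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	schedcache "github.com/karmada-io/karmada/pkg/util/lifted/scheduler/cache"
)

func main() {
	nodes := []*corev1.Node{
		{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}},
		{ObjectMeta: metav1.ObjectMeta{Name: "node-2"}},
	}
	pods := []*corev1.Pod{
		{
			ObjectMeta: metav1.ObjectMeta{Name: "db", Namespace: "default"},
			Spec: corev1.PodSpec{
				NodeName: "node-1",
				Volumes: []corev1.Volume{{
					Name: "data",
					VolumeSource: corev1.VolumeSource{
						PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ClaimName: "data-pvc"},
					},
				}},
			},
		},
	}

	snapshot := schedcache.NewSnapshot(pods, nodes)
	fmt.Println("nodes:", snapshot.NumNodes())                               // 2
	fmt.Println("pvc in use:", snapshot.IsPVCUsedByPods("default/data-pvc")) // true

	infos, _ := snapshot.NodeInfos().List()
	for _, ni := range infos {
		fmt.Printf("%s runs %d pod(s)\n", ni.Node().Name, len(ni.Pods))
	}
}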


@@ -0,0 +1,46 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This code is directly lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package.
// For reference:
// https://github.com/kubernetes/kubernetes/blob/release-1.25/pkg/scheduler/framework/listers.go
package framework
// NodeInfoLister interface represents anything that can list/get NodeInfo objects from node name.
type NodeInfoLister interface {
// List returns the list of NodeInfos.
List() ([]*NodeInfo, error)
// HavePodsWithAffinityList returns the list of NodeInfos of nodes with pods with affinity terms.
HavePodsWithAffinityList() ([]*NodeInfo, error)
// HavePodsWithRequiredAntiAffinityList returns the list of NodeInfos of nodes with pods with required anti-affinity terms.
HavePodsWithRequiredAntiAffinityList() ([]*NodeInfo, error)
// Get returns the NodeInfo of the given node name.
Get(nodeName string) (*NodeInfo, error)
}
// StorageInfoLister interface represents anything that handles storage-related operations and resources.
type StorageInfoLister interface {
// IsPVCUsedByPods returns true/false on whether the PVC is used by one or more scheduled pods,
// keyed in the format "namespace/name".
IsPVCUsedByPods(key string) bool
}
// SharedLister groups scheduler-specific listers.
type SharedLister interface {
NodeInfos() NodeInfoLister
StorageInfos() StorageInfoLister
}
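
Estimator or plugin code can depend on SharedLister rather than on a concrete type; the Snapshot in the cache package above is one implementation. A minimal sketch (the helper function name is an assumption):

package main

import (
	"fmt"

	schedcache "github.com/karmada-io/karmada/pkg/util/lifted/scheduler/cache"
	"github.com/karmada-io/karmada/pkg/util/lifted/scheduler/framework"
)

// countNodes only needs the SharedLister abstraction, so any implementation works.
func countNodes(lister framework.SharedLister) (int, error) {
	infos, err := lister.NodeInfos().List()
	if err != nil {
		return 0, err
	}
	return len(infos), nil
}

func main() {
	// An empty Snapshot satisfies framework.SharedLister.
	var lister framework.SharedLister = schedcache.NewEmptySnapshot()

	n, err := countNodes(lister)
	fmt.Println("nodes:", n, "err:", err)                                               // nodes: 0 err: <nil>
	fmt.Println("pvc used:", lister.StorageInfos().IsPVCUsedByPods("default/data-pvc")) // false
}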


@@ -18,7 +18,7 @@ limitations under the License.
// For reference:
// https://github.com/kubernetes/kubernetes/blob/release-1.23/pkg/scheduler/framework/parallelize/parallelism.go
-package lifted
+package parallelize
import (
"context"


@@ -18,7 +18,7 @@ limitations under the License.
// For reference:
// https://github.com/kubernetes/kubernetes/blob/release-1.23/pkg/scheduler/framework/parallelize/parallelism_test.go
-package lifted
+package parallelize
import (
"fmt"


@@ -0,0 +1,770 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This code is directly lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package.
// For reference:
// https://github.com/kubernetes/kubernetes/blob/release-1.25/pkg/scheduler/framework/types.go
package framework
import (
"errors"
"fmt"
"sync/atomic"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"github.com/karmada-io/karmada/pkg/util"
schedutil "github.com/karmada-io/karmada/pkg/util/lifted/scheduler/util"
)
var generation int64
// ActionType is an integer to represent one type of resource change.
// Different ActionTypes can be bit-wised to compose new semantics.
type ActionType int64
// Constants for ActionTypes.
const (
Add ActionType = 1 << iota // 1
Delete // 10
// UpdateNodeXYZ is only applicable for Node events.
UpdateNodeAllocatable // 100
UpdateNodeLabel // 1000
UpdateNodeTaint // 10000
UpdateNodeCondition // 100000
All ActionType = 1<<iota - 1 // 111111
// Use the general Update type if you don't know, or don't care about, the specific sub-Update type to use.
Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition
)
// GVK is short for group/version/kind, which can uniquely represent a particular API resource.
type GVK string
// Constants for GVKs.
const (
Pod GVK = "Pod"
Node GVK = "Node"
PersistentVolume GVK = "PersistentVolume"
PersistentVolumeClaim GVK = "PersistentVolumeClaim"
StorageClass GVK = "storage.k8s.io/StorageClass"
CSINode GVK = "storage.k8s.io/CSINode"
CSIDriver GVK = "storage.k8s.io/CSIDriver"
CSIStorageCapacity GVK = "storage.k8s.io/CSIStorageCapacity"
WildCard GVK = "*"
)
// ClusterEvent abstracts how a system resource's state gets changed.
// Resource represents the standard API resources such as Pod, Node, etc.
// ActionType denotes the specific change such as Add, Update or Delete.
type ClusterEvent struct {
Resource GVK
ActionType ActionType
Label string
}
// IsWildCard returns true if ClusterEvent follows WildCard semantics
func (ce ClusterEvent) IsWildCard() bool {
return ce.Resource == WildCard && ce.ActionType == All
}
// QueuedPodInfo is a Pod wrapper with additional information related to
// the pod's status in the scheduling queue, such as the timestamp when
// it's added to the queue.
type QueuedPodInfo struct {
*PodInfo
// The time pod added to the scheduling queue.
Timestamp time.Time
// Number of schedule attempts before successfully scheduled.
// It's used to record the # attempts metric.
Attempts int
// The time when the pod is added to the queue for the first time. The pod may be added
// back to the queue multiple times before it's successfully scheduled.
// It shouldn't be updated once initialized. It's used to record the e2e scheduling
// latency for a pod.
InitialAttemptTimestamp time.Time
// If a Pod failed in a scheduling cycle, record the plugin names it failed by.
UnschedulablePlugins sets.String
}
// DeepCopy returns a deep copy of the QueuedPodInfo object.
func (pqi *QueuedPodInfo) DeepCopy() *QueuedPodInfo {
return &QueuedPodInfo{
PodInfo: pqi.PodInfo.DeepCopy(),
Timestamp: pqi.Timestamp,
Attempts: pqi.Attempts,
InitialAttemptTimestamp: pqi.InitialAttemptTimestamp,
}
}
// PodInfo is a wrapper to a Pod with additional pre-computed information to
// accelerate processing. This information is typically immutable (e.g., pre-processed
// inter-pod affinity selectors).
type PodInfo struct {
Pod *corev1.Pod
RequiredAffinityTerms []AffinityTerm
RequiredAntiAffinityTerms []AffinityTerm
PreferredAffinityTerms []WeightedAffinityTerm
PreferredAntiAffinityTerms []WeightedAffinityTerm
ParseError error
}
// DeepCopy returns a deep copy of the PodInfo object.
func (pi *PodInfo) DeepCopy() *PodInfo {
return &PodInfo{
Pod: pi.Pod.DeepCopy(),
RequiredAffinityTerms: pi.RequiredAffinityTerms,
RequiredAntiAffinityTerms: pi.RequiredAntiAffinityTerms,
PreferredAffinityTerms: pi.PreferredAffinityTerms,
PreferredAntiAffinityTerms: pi.PreferredAntiAffinityTerms,
ParseError: pi.ParseError,
}
}
// Update creates a fully new PodInfo by default, and only updates the pod in place when the PodInfo
// has already been instantiated and the passed pod is the exact same one as the original pod.
func (pi *PodInfo) Update(pod *corev1.Pod) {
if pod != nil && pi.Pod != nil && pi.Pod.UID == pod.UID {
// PodInfo includes immutable information, and so it is safe to update the pod in place if it is
// the exact same pod
pi.Pod = pod
return
}
var preferredAffinityTerms []corev1.WeightedPodAffinityTerm
var preferredAntiAffinityTerms []corev1.WeightedPodAffinityTerm
if affinity := pod.Spec.Affinity; affinity != nil {
if a := affinity.PodAffinity; a != nil {
preferredAffinityTerms = a.PreferredDuringSchedulingIgnoredDuringExecution
}
if a := affinity.PodAntiAffinity; a != nil {
preferredAntiAffinityTerms = a.PreferredDuringSchedulingIgnoredDuringExecution
}
}
// Attempt to parse the affinity terms
var parseErrs []error
requiredAffinityTerms, err := getAffinityTerms(pod, getPodAffinityTerms(pod.Spec.Affinity))
if err != nil {
parseErrs = append(parseErrs, fmt.Errorf("requiredAffinityTerms: %w", err))
}
requiredAntiAffinityTerms, err := getAffinityTerms(pod,
getPodAntiAffinityTerms(pod.Spec.Affinity))
if err != nil {
parseErrs = append(parseErrs, fmt.Errorf("requiredAntiAffinityTerms: %w", err))
}
weightedAffinityTerms, err := getWeightedAffinityTerms(pod, preferredAffinityTerms)
if err != nil {
parseErrs = append(parseErrs, fmt.Errorf("preferredAffinityTerms: %w", err))
}
weightedAntiAffinityTerms, err := getWeightedAffinityTerms(pod, preferredAntiAffinityTerms)
if err != nil {
parseErrs = append(parseErrs, fmt.Errorf("preferredAntiAffinityTerms: %w", err))
}
pi.Pod = pod
pi.RequiredAffinityTerms = requiredAffinityTerms
pi.RequiredAntiAffinityTerms = requiredAntiAffinityTerms
pi.PreferredAffinityTerms = weightedAffinityTerms
pi.PreferredAntiAffinityTerms = weightedAntiAffinityTerms
pi.ParseError = utilerrors.NewAggregate(parseErrs)
}
// AffinityTerm is a processed version of v1.PodAffinityTerm.
type AffinityTerm struct {
Namespaces sets.String
Selector labels.Selector
TopologyKey string
NamespaceSelector labels.Selector
}
// Matches returns true if the pod matches the label selector and namespaces or namespace selector.
func (at *AffinityTerm) Matches(pod *corev1.Pod, nsLabels labels.Set) bool {
if at.Namespaces.Has(pod.Namespace) || at.NamespaceSelector.Matches(nsLabels) {
return at.Selector.Matches(labels.Set(pod.Labels))
}
return false
}
// WeightedAffinityTerm is a "processed" representation of v1.WeightedAffinityTerm.
type WeightedAffinityTerm struct {
AffinityTerm
Weight int32
}
func newAffinityTerm(pod *corev1.Pod, term *corev1.PodAffinityTerm) (*AffinityTerm, error) {
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
return nil, err
}
namespaces := getNamespacesFromPodAffinityTerm(pod, term)
nsSelector, err := metav1.LabelSelectorAsSelector(term.NamespaceSelector)
if err != nil {
return nil, err
}
return &AffinityTerm{Namespaces: namespaces, Selector: selector, TopologyKey: term.TopologyKey, NamespaceSelector: nsSelector}, nil
}
// getAffinityTerms receives a Pod and affinity terms and returns the namespaces and
// selectors of the terms.
func getAffinityTerms(pod *corev1.Pod, v1Terms []corev1.PodAffinityTerm) ([]AffinityTerm, error) {
if v1Terms == nil {
return nil, nil
}
var terms []AffinityTerm
for i := range v1Terms {
t, err := newAffinityTerm(pod, &v1Terms[i])
if err != nil {
// We get here if the label selector failed to process
return nil, err
}
terms = append(terms, *t)
}
return terms, nil
}
// getWeightedAffinityTerms returns the list of processed affinity terms.
func getWeightedAffinityTerms(pod *corev1.Pod, v1Terms []corev1.WeightedPodAffinityTerm) ([]WeightedAffinityTerm, error) {
if v1Terms == nil {
return nil, nil
}
var terms []WeightedAffinityTerm
for i := range v1Terms {
t, err := newAffinityTerm(pod, &v1Terms[i].PodAffinityTerm)
if err != nil {
// We get here if the label selector failed to process
return nil, err
}
terms = append(terms, WeightedAffinityTerm{AffinityTerm: *t, Weight: v1Terms[i].Weight})
}
return terms, nil
}
// NewPodInfo returns a new PodInfo.
func NewPodInfo(pod *corev1.Pod) *PodInfo {
pInfo := &PodInfo{}
pInfo.Update(pod)
return pInfo
}
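// Illustrative usage sketch (not part of the lifted Kubernetes code): Update only swaps
// the stored pod pointer when the UID matches; otherwise it re-parses all affinity terms.
// The UID and label below are hypothetical.
func examplePodInfoUpdate() *PodInfo {
	pod := &corev1.Pod{}
	pod.UID = "uid-1"
	pi := NewPodInfo(pod)

	updated := pod.DeepCopy()
	updated.Labels = map[string]string{"version": "v2"}
	pi.Update(updated) // same UID: fast path, affinity terms are not re-parsed
	return pi
}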
func getPodAffinityTerms(affinity *corev1.Affinity) (terms []corev1.PodAffinityTerm) {
if affinity != nil && affinity.PodAffinity != nil {
if len(affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
terms = affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
}
return terms
}
func getPodAntiAffinityTerms(affinity *corev1.Affinity) (terms []corev1.PodAffinityTerm) {
if affinity != nil && affinity.PodAntiAffinity != nil {
if len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
terms = affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
}
return terms
}
// getNamespacesFromPodAffinityTerm returns a set of namespace names according to the
// namespaces indicated in podAffinityTerm. If the term specifies neither namespaces nor
// a namespace selector, it falls back to the given pod's namespace.
func getNamespacesFromPodAffinityTerm(pod *corev1.Pod, podAffinityTerm *corev1.PodAffinityTerm) sets.String {
names := sets.String{}
if len(podAffinityTerm.Namespaces) == 0 && podAffinityTerm.NamespaceSelector == nil {
names.Insert(pod.Namespace)
} else {
names.Insert(podAffinityTerm.Namespaces...)
}
return names
}
// ImageStateSummary provides summarized information about the state of an image.
type ImageStateSummary struct {
// Size of the image
Size int64
// Used to track how many nodes have this image
NumNodes int
}
// NodeInfo is node level aggregated information.
type NodeInfo struct {
// Overall node information.
node *corev1.Node
// Pods running on the node.
Pods []*PodInfo
// The subset of pods with affinity.
PodsWithAffinity []*PodInfo
// The subset of pods with required anti-affinity.
PodsWithRequiredAntiAffinity []*PodInfo
// Ports allocated on the node.
UsedPorts HostPortInfo
// Total requested resources of all pods on this node. This includes assumed
// pods, which scheduler has sent for binding, but may not be scheduled yet.
Requested *util.Resource
// Total requested resources of all pods on this node with a minimum value
// applied to each container's CPU and memory requests. This does not reflect
// the actual resource requests for this node, but is used to avoid scheduling
// many zero-request pods onto one node.
NonZeroRequested *util.Resource
// We store allocatedResources (which is Node.Status.Allocatable.*) explicitly
// as int64, to avoid conversions and accessing map.
Allocatable *util.Resource
// ImageStates holds the entry of an image if and only if this image is on the node. The entry can be used for
// checking an image's existence and advanced usage (e.g., image locality scheduling policy) based on the image
// state information.
ImageStates map[string]*ImageStateSummary
// PVCRefCounts contains a mapping of PVC names to the number of pods on the node using it.
// Keys are in the format "namespace/name".
PVCRefCounts map[string]int
// Whenever NodeInfo changes, generation is bumped.
// This is used to avoid cloning it if the object didn't change.
Generation int64
}
// nextGeneration: Let's make sure history never forgets the name...
// Increments the generation number monotonically ensuring that generation numbers never collide.
// Collision of the generation numbers would be particularly problematic if a node was deleted and
// added back with the same name. See issue#63262.
func nextGeneration() int64 {
return atomic.AddInt64(&generation, 1)
}
// NewNodeInfo returns a ready to use empty NodeInfo object.
// If any pods are given in arguments, their information will be aggregated in
// the returned object.
func NewNodeInfo(pods ...*corev1.Pod) *NodeInfo {
ni := &NodeInfo{
Requested: &util.Resource{},
NonZeroRequested: &util.Resource{},
Allocatable: &util.Resource{},
Generation: nextGeneration(),
UsedPorts: make(HostPortInfo),
ImageStates: make(map[string]*ImageStateSummary),
PVCRefCounts: make(map[string]int),
}
for _, pod := range pods {
ni.AddPod(pod)
}
return ni
}
// Node returns overall information about this node.
func (n *NodeInfo) Node() *corev1.Node {
if n == nil {
return nil
}
return n.node
}
// Clone returns a copy of this NodeInfo.
func (n *NodeInfo) Clone() *NodeInfo {
clone := &NodeInfo{
node: n.node,
Requested: n.Requested.Clone(),
NonZeroRequested: n.NonZeroRequested.Clone(),
Allocatable: n.Allocatable.Clone(),
UsedPorts: make(HostPortInfo),
ImageStates: n.ImageStates,
PVCRefCounts: make(map[string]int),
Generation: n.Generation,
}
if len(n.Pods) > 0 {
clone.Pods = append([]*PodInfo(nil), n.Pods...)
}
if len(n.UsedPorts) > 0 {
// HostPortInfo is a map-in-map struct
// make sure it's deep copied
for ip, portMap := range n.UsedPorts {
clone.UsedPorts[ip] = make(map[ProtocolPort]struct{})
for protocolPort, v := range portMap {
clone.UsedPorts[ip][protocolPort] = v
}
}
}
if len(n.PodsWithAffinity) > 0 {
clone.PodsWithAffinity = append([]*PodInfo(nil), n.PodsWithAffinity...)
}
if len(n.PodsWithRequiredAntiAffinity) > 0 {
clone.PodsWithRequiredAntiAffinity = append([]*PodInfo(nil), n.PodsWithRequiredAntiAffinity...)
}
for key, value := range n.PVCRefCounts {
clone.PVCRefCounts[key] = value
}
return clone
}
// String returns a human-readable representation of this NodeInfo.
func (n *NodeInfo) String() string {
podKeys := make([]string, len(n.Pods))
for i, p := range n.Pods {
podKeys[i] = p.Pod.Name
}
return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v, NonZeroRequest: %#v, UsedPort: %#v, AllocatableResource:%#v}",
podKeys, n.Requested, n.NonZeroRequested, n.UsedPorts, n.Allocatable)
}
// AddPodInfo adds pod information to this NodeInfo.
// Consider using this instead of AddPod if a PodInfo is already computed.
func (n *NodeInfo) AddPodInfo(podInfo *PodInfo) {
n.Pods = append(n.Pods, podInfo)
if podWithAffinity(podInfo.Pod) {
n.PodsWithAffinity = append(n.PodsWithAffinity, podInfo)
}
if podWithRequiredAntiAffinity(podInfo.Pod) {
n.PodsWithRequiredAntiAffinity = append(n.PodsWithRequiredAntiAffinity, podInfo)
}
n.update(podInfo.Pod, 1)
}
// AddPod is a wrapper around AddPodInfo.
func (n *NodeInfo) AddPod(pod *corev1.Pod) {
n.AddPodInfo(NewPodInfo(pod))
}
func podWithAffinity(p *corev1.Pod) bool {
affinity := p.Spec.Affinity
return affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil)
}
func podWithRequiredAntiAffinity(p *corev1.Pod) bool {
affinity := p.Spec.Affinity
return affinity != nil && affinity.PodAntiAffinity != nil &&
len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0
}
func removeFromSlice(s []*PodInfo, k string) []*PodInfo {
for i := range s {
k2, err := GetPodKey(s[i].Pod)
if err != nil {
klog.ErrorS(err, "Cannot get pod key", "pod", klog.KObj(s[i].Pod))
continue
}
if k == k2 {
// delete the element
s[i] = s[len(s)-1]
s = s[:len(s)-1]
break
}
}
return s
}
// RemovePod subtracts pod information from this NodeInfo.
func (n *NodeInfo) RemovePod(pod *corev1.Pod) error {
k, err := GetPodKey(pod)
if err != nil {
return err
}
if podWithAffinity(pod) {
n.PodsWithAffinity = removeFromSlice(n.PodsWithAffinity, k)
}
if podWithRequiredAntiAffinity(pod) {
n.PodsWithRequiredAntiAffinity = removeFromSlice(n.PodsWithRequiredAntiAffinity, k)
}
for i := range n.Pods {
k2, err := GetPodKey(n.Pods[i].Pod)
if err != nil {
klog.ErrorS(err, "Cannot get pod key", "pod", klog.KObj(n.Pods[i].Pod))
continue
}
if k == k2 {
// delete the element
n.Pods[i] = n.Pods[len(n.Pods)-1]
n.Pods = n.Pods[:len(n.Pods)-1]
n.update(pod, -1)
n.resetSlicesIfEmpty()
return nil
}
}
return fmt.Errorf("no corresponding pod %s in pods of node %s", pod.Name, n.node.Name)
}
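// Illustrative usage sketch (not part of the lifted Kubernetes code): adding and then
// removing a pod keeps the aggregated request counters and the generation in sync.
// The node and pod names below are hypothetical.
func exampleNodeInfoAddRemove() error {
	node := &corev1.Node{}
	node.Name = "node-1"
	ni := NewNodeInfo()
	ni.SetNode(node)

	pod := &corev1.Pod{}
	pod.Name = "pod-1"
	pod.UID = "uid-1"
	pod.Spec.NodeName = node.Name
	ni.AddPod(pod)

	// RemovePod subtracts the pod's requests again and returns an error if the pod
	// was never accounted on this NodeInfo.
	return ni.RemovePod(pod)
}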
// update adjusts the node info based on the given pod and sign.
// The sign is +1 when a pod is added (AddPod) and -1 when a pod is removed (RemovePod).
func (n *NodeInfo) update(pod *corev1.Pod, sign int64) {
res, non0CPU, non0Mem := calculateResource(pod)
n.Requested.MilliCPU += sign * res.MilliCPU
n.Requested.Memory += sign * res.Memory
n.Requested.EphemeralStorage += sign * res.EphemeralStorage
if n.Requested.ScalarResources == nil && len(res.ScalarResources) > 0 {
n.Requested.ScalarResources = map[corev1.ResourceName]int64{}
}
for rName, rQuant := range res.ScalarResources {
n.Requested.ScalarResources[rName] += sign * rQuant
}
n.NonZeroRequested.MilliCPU += sign * non0CPU
n.NonZeroRequested.Memory += sign * non0Mem
// Consume ports when pod added or release ports when pod removed.
n.updateUsedPorts(pod, sign > 0)
n.updatePVCRefCounts(pod, sign > 0)
n.Generation = nextGeneration()
}
// resetSlicesIfEmpty resets the pod slices to nil so that DeepEqual comparisons work in unit tests.
func (n *NodeInfo) resetSlicesIfEmpty() {
if len(n.PodsWithAffinity) == 0 {
n.PodsWithAffinity = nil
}
if len(n.PodsWithRequiredAntiAffinity) == 0 {
n.PodsWithRequiredAntiAffinity = nil
}
if len(n.Pods) == 0 {
n.Pods = nil
}
}
func max(a, b int64) int64 {
if a >= b {
return a
}
return b
}
// resourceRequest = max(sum(podSpec.Containers), podSpec.InitContainers) + overHead
func calculateResource(pod *corev1.Pod) (res util.Resource, non0CPU int64, non0Mem int64) {
resPtr := &res
for _, c := range pod.Spec.Containers {
resPtr.Add(c.Resources.Requests)
non0CPUReq, non0MemReq := schedutil.GetNonzeroRequests(&c.Resources.Requests)
non0CPU += non0CPUReq
non0Mem += non0MemReq
// No non-zero resources for GPUs or opaque resources.
}
for _, ic := range pod.Spec.InitContainers {
resPtr.SetMaxResource(ic.Resources.Requests)
non0CPUReq, non0MemReq := schedutil.GetNonzeroRequests(&ic.Resources.Requests)
non0CPU = max(non0CPU, non0CPUReq)
non0Mem = max(non0Mem, non0MemReq)
}
// If Overhead is being utilized, add to the total requests for the pod
if pod.Spec.Overhead != nil {
resPtr.Add(pod.Spec.Overhead)
if _, found := pod.Spec.Overhead[corev1.ResourceCPU]; found {
non0CPU += pod.Spec.Overhead.Cpu().MilliValue()
}
if _, found := pod.Spec.Overhead[corev1.ResourceMemory]; found {
non0Mem += pod.Spec.Overhead.Memory().Value()
}
}
return
}
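// Illustrative usage sketch (not part of the lifted Kubernetes code): for a pod whose
// containers carry no explicit requests, calculateResource reports zero requested
// resources while the non-zero values fall back to the scheduler defaults
// (DefaultMilliCPURequest / DefaultMemoryRequest).
func exampleCalculateResource() (int64, int64) {
	pod := &corev1.Pod{}
	pod.Spec.Containers = []corev1.Container{{Name: "app"}} // no explicit requests
	_, non0CPU, non0Mem := calculateResource(pod)
	return non0CPU, non0Mem
}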
// updateUsedPorts updates the UsedPorts of NodeInfo.
func (n *NodeInfo) updateUsedPorts(pod *corev1.Pod, add bool) {
for _, container := range pod.Spec.Containers {
for _, podPort := range container.Ports {
if add {
n.UsedPorts.Add(podPort.HostIP, string(podPort.Protocol), podPort.HostPort)
} else {
n.UsedPorts.Remove(podPort.HostIP, string(podPort.Protocol), podPort.HostPort)
}
}
}
}
// updatePVCRefCounts updates the PVCRefCounts of NodeInfo.
func (n *NodeInfo) updatePVCRefCounts(pod *corev1.Pod, add bool) {
for _, v := range pod.Spec.Volumes {
if v.PersistentVolumeClaim == nil {
continue
}
key := GetNamespacedName(pod.Namespace, v.PersistentVolumeClaim.ClaimName)
if add {
n.PVCRefCounts[key]++
} else {
n.PVCRefCounts[key]--
if n.PVCRefCounts[key] <= 0 {
delete(n.PVCRefCounts, key)
}
}
}
}
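// Illustrative usage sketch (not part of the lifted Kubernetes code): PVC reference
// counts are keyed by "namespace/claimName" and dropped once no pod on the node uses
// the claim. The namespace and claim name below are hypothetical.
func examplePVCRefCount() int {
	ni := NewNodeInfo()
	pod := &corev1.Pod{}
	pod.Namespace = "default"
	pod.Spec.Volumes = []corev1.Volume{{
		Name: "data",
		VolumeSource: corev1.VolumeSource{
			PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ClaimName: "my-claim"},
		},
	}}
	ni.updatePVCRefCounts(pod, true)
	return ni.PVCRefCounts["default/my-claim"] // 1
}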
// SetNode sets the overall node information.
func (n *NodeInfo) SetNode(node *corev1.Node) {
n.node = node
n.Allocatable = util.NewResource(node.Status.Allocatable)
n.Generation = nextGeneration()
}
// RemoveNode removes the node object, leaving all other tracking information.
func (n *NodeInfo) RemoveNode() {
n.node = nil
n.Generation = nextGeneration()
}
// GetPodKey returns the string key of a pod.
func GetPodKey(pod *corev1.Pod) (string, error) {
uid := string(pod.UID)
if len(uid) == 0 {
return "", errors.New("cannot get cache key for pod with empty UID")
}
return uid, nil
}
// GetNamespacedName returns the string format of a namespaced resource name.
func GetNamespacedName(namespace, name string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}
// DefaultBindAllHostIP defines the default IP address used to bind to all hosts.
const DefaultBindAllHostIP = "0.0.0.0"
// ProtocolPort represents a protocol port pair, e.g. tcp:80.
type ProtocolPort struct {
Protocol string
Port int32
}
// NewProtocolPort creates a ProtocolPort instance.
func NewProtocolPort(protocol string, port int32) *ProtocolPort {
pp := &ProtocolPort{
Protocol: protocol,
Port: port,
}
if len(pp.Protocol) == 0 {
pp.Protocol = string(corev1.ProtocolTCP)
}
return pp
}
// HostPortInfo stores mapping from ip to a set of ProtocolPort
type HostPortInfo map[string]map[ProtocolPort]struct{}
// Add adds (ip, protocol, port) to HostPortInfo
func (h HostPortInfo) Add(ip, protocol string, port int32) {
if port <= 0 {
return
}
h.sanitize(&ip, &protocol)
pp := NewProtocolPort(protocol, port)
if _, ok := h[ip]; !ok {
h[ip] = map[ProtocolPort]struct{}{
*pp: {},
}
return
}
h[ip][*pp] = struct{}{}
}
// Remove removes (ip, protocol, port) from HostPortInfo
func (h HostPortInfo) Remove(ip, protocol string, port int32) {
if port <= 0 {
return
}
h.sanitize(&ip, &protocol)
pp := NewProtocolPort(protocol, port)
if m, ok := h[ip]; ok {
delete(m, *pp)
if len(h[ip]) == 0 {
delete(h, ip)
}
}
}
// Len returns the total number of (ip, protocol, port) tuple in HostPortInfo
func (h HostPortInfo) Len() int {
length := 0
for _, m := range h {
length += len(m)
}
return length
}
// CheckConflict checks if the input (ip, protocol, port) conflicts with the existing
// ones in HostPortInfo.
func (h HostPortInfo) CheckConflict(ip, protocol string, port int32) bool {
if port <= 0 {
return false
}
h.sanitize(&ip, &protocol)
pp := NewProtocolPort(protocol, port)
// If ip is 0.0.0.0, check every registered IP's (protocol, port) pairs.
if ip == DefaultBindAllHostIP {
for _, m := range h {
if _, ok := m[*pp]; ok {
return true
}
}
return false
}
// If ip isn't 0.0.0.0, only check the given IP's and 0.0.0.0's (protocol, port) pairs.
for _, key := range []string{DefaultBindAllHostIP, ip} {
if m, ok := h[key]; ok {
if _, ok2 := m[*pp]; ok2 {
return true
}
}
}
return false
}
// sanitize normalizes the ip and protocol parameters, defaulting them to 0.0.0.0 and TCP when empty.
func (h HostPortInfo) sanitize(ip, protocol *string) {
if len(*ip) == 0 {
*ip = DefaultBindAllHostIP
}
if len(*protocol) == 0 {
*protocol = string(corev1.ProtocolTCP)
}
}
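// Illustrative usage sketch (not part of the lifted Kubernetes code): an empty IP is
// sanitized to the wildcard 0.0.0.0, which then conflicts with any concrete host IP
// using the same protocol/port pair. The port and IP below are hypothetical.
func exampleHostPortConflict() bool {
	hpi := make(HostPortInfo)
	hpi.Add("", "TCP", 8080)                          // stored under 0.0.0.0 after sanitize
	return hpi.CheckConflict("10.0.0.1", "TCP", 8080) // true: the wildcard already owns the port
}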

View File

@ -0,0 +1,82 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This code is directly lifted from the Kubernetes codebase in order to avoid relying on the k8s.io/kubernetes package.
// For reference:
// https://github.com/kubernetes/kubernetes/blob/release-1.25/pkg/scheduler/util/pod_resources.go
package util
import (
corev1 "k8s.io/api/core/v1"
)
// For each of these resources, a pod that doesn't request the resource explicitly
// will be treated as having requested the amount indicated below, for the purpose
// of computing priority only. This ensures that when scheduling zero-request pods, such
// pods will not all be scheduled to the node with the smallest in-use request,
// and that when scheduling regular pods, such pods will not see zero-request pods as
// consuming no resources whatsoever. We chose these values to be similar to the
// resources that we give to cluster addon pods (#10653). But they are pretty arbitrary.
// As described in #11713, we use request instead of limit to deal with resource requirements.
const (
// DefaultMilliCPURequest defines default milli cpu request number.
DefaultMilliCPURequest int64 = 100 // 0.1 core
// DefaultMemoryRequest defines default memory request size.
DefaultMemoryRequest int64 = 200 * 1024 * 1024 // 200 MB
)
// GetNonzeroRequests returns the requested CPU and memory values, falling back to the
// default CPU and memory requests when none are specified.
func GetNonzeroRequests(requests *corev1.ResourceList) (int64, int64) {
return GetRequestForResource(corev1.ResourceCPU, requests, true),
GetRequestForResource(corev1.ResourceMemory, requests, true)
}
// GetRequestForResource returns the requested value for the given resource.
// If nonZero is true and there is no explicit CPU or memory request, it returns the
// corresponding default value instead of zero.
func GetRequestForResource(resource corev1.ResourceName, requests *corev1.ResourceList, nonZero bool) int64 {
if requests == nil {
return 0
}
switch resource {
case corev1.ResourceCPU:
// Override if un-set, but not if explicitly set to zero
if _, found := (*requests)[corev1.ResourceCPU]; !found && nonZero {
return DefaultMilliCPURequest
}
return requests.Cpu().MilliValue()
case corev1.ResourceMemory:
// Override if un-set, but not if explicitly set to zero
if _, found := (*requests)[corev1.ResourceMemory]; !found && nonZero {
return DefaultMemoryRequest
}
return requests.Memory().Value()
case corev1.ResourceEphemeralStorage:
quantity, found := (*requests)[corev1.ResourceEphemeralStorage]
if !found {
return 0
}
return quantity.Value()
default:
quantity, found := (*requests)[resource]
if !found {
return 0
}
return quantity.Value()
}
}
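// Illustrative usage sketch (not part of the lifted Kubernetes code): with an empty
// request list and non-zero defaulting enabled, the scheduler falls back to
// DefaultMilliCPURequest and DefaultMemoryRequest.
func exampleNonzeroDefaults() (int64, int64) {
	requests := corev1.ResourceList{} // no explicit requests
	return GetNonzeroRequests(&requests)
}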