LegGasai 2025-06-22 07:21:44 +00:00 committed by GitHub
commit a329f6864e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
42 changed files with 13827 additions and 797 deletions


@ -106,9 +106,7 @@ generate-code:
manifests: controller-gen
go mod vendor
# volcano crd base
$(CONTROLLER_GEN) $(CRD_OPTIONS) paths="./vendor/volcano.sh/apis/pkg/apis/scheduling/v1beta1;./vendor/volcano.sh/apis/pkg/apis/batch/v1alpha1;./vendor/volcano.sh/apis/pkg/apis/bus/v1alpha1;./vendor/volcano.sh/apis/pkg/apis/nodeinfo/v1alpha1;./vendor/volcano.sh/apis/pkg/apis/topology/v1alpha1" output:crd:artifacts:config=config/crd/volcano/bases
# generate volcano job crd yaml without description to avoid yaml size limit when using `kubectl apply`
$(CONTROLLER_GEN) $(CRD_OPTIONS_EXCLUDE_DESCRIPTION) paths="./vendor/volcano.sh/apis/pkg/apis/batch/v1alpha1" output:crd:artifacts:config=config/crd/volcano/bases
$(CONTROLLER_GEN) $(CRD_OPTIONS_EXCLUDE_DESCRIPTION) paths="./vendor/volcano.sh/apis/pkg/apis/scheduling/v1beta1;./vendor/volcano.sh/apis/pkg/apis/batch/v1alpha1;./vendor/volcano.sh/apis/pkg/apis/bus/v1alpha1;./vendor/volcano.sh/apis/pkg/apis/nodeinfo/v1alpha1;./vendor/volcano.sh/apis/pkg/apis/topology/v1alpha1" output:crd:artifacts:config=config/crd/volcano/bases
# jobflow crd base
$(CONTROLLER_GEN) $(CRD_OPTIONS) paths="./vendor/volcano.sh/apis/pkg/apis/flow/v1alpha1" output:crd:artifacts:config=config/crd/jobflow/bases
# generate volcano jobflow crd yaml without description to avoid yaml size limit when using `kubectl apply`


@ -172,6 +172,10 @@ spec:
replicas:
format: int32
type: integer
reservationNodeNames:
items:
type: string
type: array
template:
properties:
metadata:


@ -190,6 +190,10 @@ spec:
replicas:
format: int32
type: integer
reservationNodeNames:
items:
type: string
type: array
template:
properties:
metadata:


@ -17,69 +17,32 @@ spec:
- name: v1alpha1
schema:
openAPIV3Schema:
description: Command defines command structure.
properties:
action:
description: Action defines the action that will be took to the target
object.
type: string
apiVersion:
description: |-
APIVersion defines the versioned schema of this representation of an object.
Servers should convert recognized schemas to the latest internal value, and
may reject unrecognized values.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
type: string
kind:
description: |-
Kind is a string value representing the REST resource this object represents.
Servers may infer this from the endpoint the client submits requests to.
Cannot be updated.
In CamelCase.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
message:
description: Human-readable message indicating details of this command.
type: string
metadata:
type: object
reason:
description: Unique, one-word, CamelCase reason for this command.
type: string
target:
description: TargetObject defines the target object of this command.
properties:
apiVersion:
description: API version of the referent.
type: string
blockOwnerDeletion:
description: |-
If true, AND if the owner has the "foregroundDeletion" finalizer, then
the owner cannot be deleted from the key-value store until this
reference is removed.
See https://kubernetes.io/docs/concepts/architecture/garbage-collection/#foreground-deletion
for how the garbage collector interacts with this field and enforces the foreground deletion.
Defaults to false.
To set this field, a user needs "delete" permission of the owner,
otherwise 422 (Unprocessable Entity) will be returned.
type: boolean
controller:
description: If true, this reference points to the managing controller.
type: boolean
kind:
description: |-
Kind of the referent.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
name:
description: |-
Name of the referent.
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names#names
type: string
uid:
description: |-
UID of the referent.
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names#uids
type: string
required:
- apiVersion


@ -19,31 +19,17 @@ spec:
- name: v1alpha1
schema:
openAPIV3Schema:
description: Numatopology is the Schema for the Numatopologies API
properties:
apiVersion:
description: |-
APIVersion defines the versioned schema of this representation of an object.
Servers should convert recognized schemas to the latest internal value, and
may reject unrecognized values.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
type: string
kind:
description: |-
Kind is a string value representing the REST resource this object represents.
Servers may infer this from the endpoint the client submits requests to.
Cannot be updated.
In CamelCase.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
metadata:
type: object
spec:
description: Specification of the numa information of the worker node
properties:
cpuDetail:
additionalProperties:
description: CPUInfo is the cpu topology detail
properties:
core:
minimum: 0
@ -55,35 +41,23 @@ spec:
minimum: 0
type: integer
type: object
description: |-
Specifies the cpu topology info
Key is cpu id
type: object
numares:
additionalProperties:
description: ResourceInfo is the sets about resource capacity and
allocatable
properties:
allocatable:
type: string
capacity:
type: integer
type: object
description: |-
Specifies the numa info for the resource
Key is resource name
type: object
policies:
additionalProperties:
type: string
description: Specifies the policy of the manager
type: object
resReserved:
additionalProperties:
type: string
description: |-
Specifies the reserved resource of the node
Key is resource name
type: object
type: object
type: object


@ -37,35 +37,16 @@ spec:
name: v1beta1
schema:
openAPIV3Schema:
description: PodGroup is a collection of Pod; used for batch workload.
properties:
apiVersion:
description: |-
APIVersion defines the versioned schema of this representation of an object.
Servers should convert recognized schemas to the latest internal value, and
may reject unrecognized values.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
type: string
kind:
description: |-
Kind is a string value representing the REST resource this object represents.
Servers may infer this from the endpoint the client submits requests to.
Cannot be updated.
In CamelCase.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
metadata:
type: object
spec:
description: |-
Specification of the desired behavior of the pod group.
More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#spec-and-status
properties:
minMember:
description: |-
MinMember defines the minimal number of members/tasks to run the pod group;
if there's not enough resources to start all tasks, the scheduler
will not start anyone.
format: int32
type: integer
minResources:
@ -75,102 +56,59 @@ spec:
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: |-
MinResources defines the minimal resource of members/tasks to run the pod group;
if there's not enough resources to start all tasks, the scheduler
will not start anyone.
type: object
minTaskMember:
additionalProperties:
format: int32
type: integer
description: |-
MinTaskMember defines the minimal number of pods to run each task in the pod group;
if there's not enough resources to start each task, the scheduler
will not start anyone.
type: object
networkTopology:
description: NetworkTopology defines the NetworkTopology config, this
field works in conjunction with network topology feature and hyperNode
CRD.
properties:
highestTierAllowed:
default: 1
description: HighestTierAllowed specifies the highest tier that
a job allowed to cross when scheduling.
type: integer
mode:
default: hard
description: Mode specifies the mode of the network topology constrain.
enum:
- hard
- soft
type: string
type: object
priorityClassName:
description: |-
If specified, indicates the PodGroup's priority. "system-node-critical" and
"system-cluster-critical" are two special keywords which indicate the
highest priorities with the former being the highest priority. Any other
name must be defined by creating a PriorityClass object with that name.
If not specified, the PodGroup priority will be default or zero if there is no
default.
type: string
queue:
default: default
description: |-
Queue defines the queue to allocate resource for PodGroup; if queue does not exist,
the PodGroup will not be scheduled. Defaults to `default` Queue with the lowest weight.
type: string
type: object
status:
description: |-
Status represents the current information about a pod group.
This data may not be up to date.
properties:
conditions:
description: The conditions of PodGroup.
items:
description: PodGroupCondition contains details for the current
state of this pod group.
properties:
lastTransitionTime:
description: Last time the phase transitioned from another to
current phase.
format: date-time
type: string
message:
description: Human-readable message indicating details about
last transition.
type: string
reason:
description: Unique, one-word, CamelCase reason for the phase's
last transition.
type: string
status:
description: Status is the status of the condition.
type: string
transitionID:
description: The ID of condition transition.
type: string
type:
description: Type is the type of the condition
type: string
type: object
type: array
failed:
description: The number of pods which reached phase Failed.
format: int32
type: integer
phase:
description: Current phase of PodGroup.
type: string
running:
description: The number of actively running pods.
format: int32
type: integer
succeeded:
description: The number of pods which reached phase Succeeded.
format: int32
type: integer
type: object


@ -24,38 +24,18 @@ spec:
name: v1beta1
schema:
openAPIV3Schema:
description: Queue is a queue of PodGroup.
properties:
apiVersion:
description: |-
APIVersion defines the versioned schema of this representation of an object.
Servers should convert recognized schemas to the latest internal value, and
may reject unrecognized values.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
type: string
kind:
description: |-
Kind is a string value representing the REST resource this object represents.
Servers may infer this from the endpoint the client submits requests to.
Cannot be updated.
In CamelCase.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
metadata:
type: object
spec:
description: |-
Specification of the desired behavior of the queue.
More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#spec-and-status
properties:
affinity:
description: If specified, the pod owned by the queue will be scheduled
with constraint
properties:
nodeGroupAffinity:
description: Describes nodegroup affinity scheduling rules for
the queue(e.g. putting pods of the queue in the nodes of the
nodegroup)
properties:
preferredDuringSchedulingIgnoredDuringExecution:
items:
@ -67,9 +47,6 @@ spec:
type: array
type: object
nodeGroupAntiAffinity:
description: Describes nodegroup anti-affinity scheduling rules
for the queue(e.g. avoid putting pods of the queue in the nodes
of the nodegroup).
properties:
preferredDuringSchedulingIgnoredDuringExecution:
items:
@ -88,7 +65,6 @@ spec:
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: ResourceList is a set of (resource name, quantity) pairs.
type: object
deserved:
additionalProperties:
@ -97,14 +73,9 @@ spec:
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: The amount of resources configured by the user. This
part of resource can be shared with other queues and reclaimed back.
type: object
extendClusters:
description: extendCluster indicate the jobs in this Queue will be
dispatched to these clusters.
items:
description: CluterSpec represents the template of Cluster
properties:
capacity:
additionalProperties:
@ -113,8 +84,6 @@ spec:
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: ResourceList is a set of (resource name, quantity)
pairs.
type: object
name:
type: string
@ -124,7 +93,6 @@ spec:
type: object
type: array
guarantee:
description: Guarantee indicate configuration about resource reservation
properties:
resource:
additionalProperties:
@ -133,24 +101,16 @@ spec:
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: The amount of cluster resource reserved for queue.
Just set either `percentage` or `resource`
type: object
type: object
parent:
description: Parent define the parent of queue
type: string
priority:
description: Priority define the priority of queue. Higher values
are prioritized for scheduling and considered later during reclamation.
format: int32
type: integer
reclaimable:
description: Reclaimable indicate whether the queue can be reclaimed
by other queue
type: boolean
type:
description: Type define the type of queue
type: string
weight:
default: 1
@ -160,7 +120,6 @@ spec:
type: integer
type: object
status:
description: The status of queue.
properties:
allocated:
additionalProperties:
@ -169,26 +128,19 @@ spec:
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: Allocated is allocated resources in queue
type: object
completed:
description: The number of `Completed` PodGroup in this queue.
format: int32
type: integer
inqueue:
description: The number of `Inqueue` PodGroup in this queue.
format: int32
type: integer
pending:
description: The number of 'Pending' PodGroup in this queue.
format: int32
type: integer
reservation:
description: Reservation is the profile of resource reservation for
queue
properties:
nodes:
description: Nodes are Locked nodes for queue
items:
type: string
type: array
@ -199,19 +151,14 @@ spec:
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: Resource is a list of total idle resource in locked
nodes.
type: object
type: object
running:
description: The number of 'Running' PodGroup in this queue.
format: int32
type: integer
state:
description: State is state of queue
type: string
unknown:
description: The number of 'Unknown' PodGroup in this queue.
format: int32
type: integer
type: object

File diff suppressed because it is too large.


@ -29,74 +29,35 @@ spec:
name: v1alpha1
schema:
openAPIV3Schema:
description: HyperNode represents a collection of nodes sharing similar network
topology or performance characteristics.
properties:
apiVersion:
description: |-
APIVersion defines the versioned schema of this representation of an object.
Servers should convert recognized schemas to the latest internal value, and
may reject unrecognized values.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
type: string
kind:
description: |-
Kind is a string value representing the REST resource this object represents.
Servers may infer this from the endpoint the client submits requests to.
Cannot be updated.
In CamelCase.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
metadata:
type: object
spec:
description: Spec defines the desired configuration of the HyperNode.
properties:
members:
description: Members defines a list of node groups or individual nodes
included in the HyperNode.
items:
description: MemberSpec represents a specific node or a hyperNodes
in the hyperNode.
properties:
selector:
description: Selector defines the selection rules for this member.
properties:
exactMatch:
description: ExactMatch defines the exact match criteria.
properties:
name:
description: Name specifies the exact name of the node
to match.
type: string
type: object
labelMatch:
description: LabelMatch defines the labels match criteria
(only take effect when Member Type is "Node").
properties:
matchExpressions:
description: matchExpressions is a list of label selector
requirements. The requirements are ANDed.
items:
description: |-
A label selector requirement is a selector that contains values, a key, and an operator that
relates the key and values.
properties:
key:
description: key is the label key that the selector
applies to.
type: string
operator:
description: |-
operator represents a key's relationship to a set of values.
Valid operators are In, NotIn, Exists and DoesNotExist.
type: string
values:
description: |-
values is an array of string values. If the operator is In or NotIn,
the values array must be non-empty. If the operator is Exists or DoesNotExist,
the values array must be empty. This array is replaced during a strategic
merge patch.
items:
type: string
type: array
@ -110,19 +71,12 @@ spec:
matchLabels:
additionalProperties:
type: string
description: |-
matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels
map is equivalent to an element of matchExpressions, whose key field is "key", the
operator is "In", and the values array contains only "value". The requirements are ANDed.
type: object
type: object
x-kubernetes-map-type: atomic
regexMatch:
description: RegexMatch defines the regex match criteria.
properties:
pattern:
description: Pattern defines the regex pattern to match
node names.
type: string
type: object
type: object
@ -135,7 +89,6 @@ spec:
rule: '(has(self.exactMatch) ? 1 : 0) + (has(self.regexMatch)
? 1 : 0) + (has(self.labelMatch) ? 1 : 0) <= 1'
type:
description: Type specifies the member type.
enum:
- Node
- HyperNode
@ -145,61 +98,37 @@ spec:
type: object
type: array
tier:
description: Tier categorizes the performance level of the HyperNode.
type: integer
required:
- tier
type: object
status:
description: Status provides the current state of the HyperNode.
properties:
conditions:
description: Conditions provide details about the current state of
the HyperNode.
items:
description: Condition contains details for one aspect of the current
state of this API Resource.
properties:
lastTransitionTime:
description: |-
lastTransitionTime is the last time the condition transitioned from one status to another.
This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable.
format: date-time
type: string
message:
description: |-
message is a human readable message indicating details about the transition.
This may be an empty string.
maxLength: 32768
type: string
observedGeneration:
description: |-
observedGeneration represents the .metadata.generation that the condition was set based upon.
For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date
with respect to the current state of the instance.
format: int64
minimum: 0
type: integer
reason:
description: |-
reason contains a programmatic identifier indicating the reason for the condition's last transition.
Producers of specific condition types may define expected values and meanings for this field,
and whether the values are considered a guaranteed API.
The value should be a CamelCase string.
This field may not be empty.
maxLength: 1024
minLength: 1
pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
type: string
status:
description: status of the condition, one of True, False, Unknown.
enum:
- "True"
- "False"
- Unknown
type: string
type:
description: type of condition in CamelCase or in foo.example.com/CamelCase.
maxLength: 316
pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
type: string
@ -212,8 +141,6 @@ spec:
type: object
type: array
nodeCount:
description: NodeCount is the total number of nodes currently in the
HyperNode.
format: int64
minimum: 0
type: integer

go.mod (9 changes)

@ -49,7 +49,8 @@ require (
sigs.k8s.io/controller-runtime v0.13.0
sigs.k8s.io/yaml v1.4.0
stathat.com/c/consistent v1.0.0
volcano.sh/apis v1.12.1
//volcano.sh/apis v1.12.0
volcano.sh/apis v0.0.0
)
require (
@ -113,7 +114,7 @@ require (
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/google/uuid v1.6.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
@ -211,3 +212,7 @@ replace (
k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.32.2
k8s.io/sample-controller => k8s.io/sample-controller v0.32.2
)
// replace volcano.sh/apis => ../volcano-apis
replace volcano.sh/apis => github.com/LegGasai/apis v0.0.0-20250608025211-f68ef56e19bb

go.sum (4 changes)

@ -6,6 +6,8 @@ github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/JeffAshton/win_pdh v0.0.0-20161109143554-76bb4ee9f0ab h1:UKkYhof1njT1/xq4SEg5z+VpTgjmNeHwPGRQl7takDI=
github.com/JeffAshton/win_pdh v0.0.0-20161109143554-76bb4ee9f0ab/go.mod h1:3VYc5hodBMJ5+l/7J4xAyMeuM2PNuepvHlGs8yilUCA=
github.com/LegGasai/apis v0.0.0-20250608025211-f68ef56e19bb h1:cOXexFLVkKIRWZ9srnD6KU/UEIPLjQip8UvWkOC8i8k=
github.com/LegGasai/apis v0.0.0-20250608025211-f68ef56e19bb/go.mod h1:0XNNnIOevJSYNiXRmwhXUrYCcCcWcBeTY0nxrlkk03A=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I=
@ -528,5 +530,3 @@ sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
volcano.sh/apis v1.12.1 h1:yq5dVj/g21vnWObCIKsJKPhMoThpzDrHDD/GMouYVxk=
volcano.sh/apis v1.12.1/go.mod h1:0XNNnIOevJSYNiXRmwhXUrYCcCcWcBeTY0nxrlkk03A=


@ -92,6 +92,7 @@ tail -n +2 ${VOLCANO_CRD_DIR}/bases/batch.volcano.sh_jobs.yaml > ${HELM_VOLCANO_
tail -n +2 ${VOLCANO_CRD_DIR}/bases/bus.volcano.sh_commands.yaml > ${HELM_VOLCANO_CRD_DIR}/bases/bus.volcano.sh_commands.yaml
tail -n +2 ${VOLCANO_CRD_DIR}/bases/scheduling.volcano.sh_podgroups.yaml > ${HELM_VOLCANO_CRD_DIR}/bases/scheduling.volcano.sh_podgroups.yaml
tail -n +2 ${VOLCANO_CRD_DIR}/bases/scheduling.volcano.sh_queues.yaml > ${HELM_VOLCANO_CRD_DIR}/bases/scheduling.volcano.sh_queues.yaml
tail -n +2 ${VOLCANO_CRD_DIR}/bases/scheduling.volcano.sh_reservations.yaml > ${HELM_VOLCANO_CRD_DIR}/bases/scheduling.volcano.sh_reservations.yaml
tail -n +2 ${VOLCANO_CRD_DIR}/bases/nodeinfo.volcano.sh_numatopologies.yaml > ${HELM_VOLCANO_CRD_DIR}/bases/nodeinfo.volcano.sh_numatopologies.yaml
tail -n +2 ${VOLCANO_CRD_DIR}/bases/topology.volcano.sh_hypernodes.yaml > ${HELM_VOLCANO_CRD_DIR}/bases/topology.volcano.sh_hypernodes.yaml
@ -136,6 +137,7 @@ ${HELM_BIN_DIR}/helm template ${VK_ROOT}/installer/helm/chart/volcano --namespac
-s templates/scheduler.yaml \
-s templates/scheduling_v1beta1_podgroup.yaml \
-s templates/scheduling_v1beta1_queue.yaml \
-s templates/scheduling_v1beta1_reservation.yaml \
-s templates/nodeinfo_v1alpha1_numatopologies.yaml \
-s templates/topology_v1alpha1_hypernodes.yaml \
-s templates/webhooks.yaml \


@ -171,6 +171,10 @@ spec:
replicas:
format: int32
type: integer
reservationNodeNames:
items:
type: string
type: array
template:
properties:
metadata:


@ -13,3 +13,4 @@ tiers:
- name: proportion
- name: nodeorder
- name: binpack
- name: reservation


@ -189,6 +189,10 @@ spec:
replicas:
format: int32
type: integer
reservationNodeNames:
items:
type: string
type: array
template:
properties:
metadata:


@ -16,69 +16,32 @@ spec:
- name: v1alpha1
schema:
openAPIV3Schema:
description: Command defines command structure.
properties:
action:
description: Action defines the action that will be took to the target
object.
type: string
apiVersion:
description: |-
APIVersion defines the versioned schema of this representation of an object.
Servers should convert recognized schemas to the latest internal value, and
may reject unrecognized values.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
type: string
kind:
description: |-
Kind is a string value representing the REST resource this object represents.
Servers may infer this from the endpoint the client submits requests to.
Cannot be updated.
In CamelCase.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
message:
description: Human-readable message indicating details of this command.
type: string
metadata:
type: object
reason:
description: Unique, one-word, CamelCase reason for this command.
type: string
target:
description: TargetObject defines the target object of this command.
properties:
apiVersion:
description: API version of the referent.
type: string
blockOwnerDeletion:
description: |-
If true, AND if the owner has the "foregroundDeletion" finalizer, then
the owner cannot be deleted from the key-value store until this
reference is removed.
See https://kubernetes.io/docs/concepts/architecture/garbage-collection/#foreground-deletion
for how the garbage collector interacts with this field and enforces the foreground deletion.
Defaults to false.
To set this field, a user needs "delete" permission of the owner,
otherwise 422 (Unprocessable Entity) will be returned.
type: boolean
controller:
description: If true, this reference points to the managing controller.
type: boolean
kind:
description: |-
Kind of the referent.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
name:
description: |-
Name of the referent.
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names#names
type: string
uid:
description: |-
UID of the referent.
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names#uids
type: string
required:
- apiVersion


@ -18,31 +18,17 @@ spec:
- name: v1alpha1
schema:
openAPIV3Schema:
description: Numatopology is the Schema for the Numatopologies API
properties:
apiVersion:
description: |-
APIVersion defines the versioned schema of this representation of an object.
Servers should convert recognized schemas to the latest internal value, and
may reject unrecognized values.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
type: string
kind:
description: |-
Kind is a string value representing the REST resource this object represents.
Servers may infer this from the endpoint the client submits requests to.
Cannot be updated.
In CamelCase.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
metadata:
type: object
spec:
description: Specification of the numa information of the worker node
properties:
cpuDetail:
additionalProperties:
description: CPUInfo is the cpu topology detail
properties:
core:
minimum: 0
@ -54,35 +40,23 @@ spec:
minimum: 0
type: integer
type: object
description: |-
Specifies the cpu topology info
Key is cpu id
type: object
numares:
additionalProperties:
description: ResourceInfo is the sets about resource capacity and
allocatable
properties:
allocatable:
type: string
capacity:
type: integer
type: object
description: |-
Specifies the numa info for the resource
Key is resource name
type: object
policies:
additionalProperties:
type: string
description: Specifies the policy of the manager
type: object
resReserved:
additionalProperties:
type: string
description: |-
Specifies the reserved resource of the node
Key is resource name
type: object
type: object
type: object


@ -36,35 +36,16 @@ spec:
name: v1beta1
schema:
openAPIV3Schema:
description: PodGroup is a collection of Pod; used for batch workload.
properties:
apiVersion:
description: |-
APIVersion defines the versioned schema of this representation of an object.
Servers should convert recognized schemas to the latest internal value, and
may reject unrecognized values.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
type: string
kind:
description: |-
Kind is a string value representing the REST resource this object represents.
Servers may infer this from the endpoint the client submits requests to.
Cannot be updated.
In CamelCase.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
metadata:
type: object
spec:
description: |-
Specification of the desired behavior of the pod group.
More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#spec-and-status
properties:
minMember:
description: |-
MinMember defines the minimal number of members/tasks to run the pod group;
if there's not enough resources to start all tasks, the scheduler
will not start anyone.
format: int32
type: integer
minResources:
@ -74,102 +55,59 @@ spec:
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: |-
MinResources defines the minimal resource of members/tasks to run the pod group;
if there's not enough resources to start all tasks, the scheduler
will not start anyone.
type: object
minTaskMember:
additionalProperties:
format: int32
type: integer
description: |-
MinTaskMember defines the minimal number of pods to run each task in the pod group;
if there's not enough resources to start each task, the scheduler
will not start anyone.
type: object
networkTopology:
description: NetworkTopology defines the NetworkTopology config, this
field works in conjunction with network topology feature and hyperNode
CRD.
properties:
highestTierAllowed:
default: 1
description: HighestTierAllowed specifies the highest tier that
a job allowed to cross when scheduling.
type: integer
mode:
default: hard
description: Mode specifies the mode of the network topology constrain.
enum:
- hard
- soft
type: string
type: object
priorityClassName:
description: |-
If specified, indicates the PodGroup's priority. "system-node-critical" and
"system-cluster-critical" are two special keywords which indicate the
highest priorities with the former being the highest priority. Any other
name must be defined by creating a PriorityClass object with that name.
If not specified, the PodGroup priority will be default or zero if there is no
default.
type: string
queue:
default: default
description: |-
Queue defines the queue to allocate resource for PodGroup; if queue does not exist,
the PodGroup will not be scheduled. Defaults to `default` Queue with the lowest weight.
type: string
type: object
status:
description: |-
Status represents the current information about a pod group.
This data may not be up to date.
properties:
conditions:
description: The conditions of PodGroup.
items:
description: PodGroupCondition contains details for the current
state of this pod group.
properties:
lastTransitionTime:
description: Last time the phase transitioned from another to
current phase.
format: date-time
type: string
message:
description: Human-readable message indicating details about
last transition.
type: string
reason:
description: Unique, one-word, CamelCase reason for the phase's
last transition.
type: string
status:
description: Status is the status of the condition.
type: string
transitionID:
description: The ID of condition transition.
type: string
type:
description: Type is the type of the condition
type: string
type: object
type: array
failed:
description: The number of pods which reached phase Failed.
format: int32
type: integer
phase:
description: Current phase of PodGroup.
type: string
running:
description: The number of actively running pods.
format: int32
type: integer
succeeded:
description: The number of pods which reached phase Succeeded.
format: int32
type: integer
type: object


@ -23,38 +23,18 @@ spec:
name: v1beta1
schema:
openAPIV3Schema:
description: Queue is a queue of PodGroup.
properties:
apiVersion:
description: |-
APIVersion defines the versioned schema of this representation of an object.
Servers should convert recognized schemas to the latest internal value, and
may reject unrecognized values.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
type: string
kind:
description: |-
Kind is a string value representing the REST resource this object represents.
Servers may infer this from the endpoint the client submits requests to.
Cannot be updated.
In CamelCase.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
metadata:
type: object
spec:
description: |-
Specification of the desired behavior of the queue.
More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#spec-and-status
properties:
affinity:
description: If specified, the pod owned by the queue will be scheduled
with constraint
properties:
nodeGroupAffinity:
description: Describes nodegroup affinity scheduling rules for
the queue(e.g. putting pods of the queue in the nodes of the
nodegroup)
properties:
preferredDuringSchedulingIgnoredDuringExecution:
items:
@ -66,9 +46,6 @@ spec:
type: array
type: object
nodeGroupAntiAffinity:
description: Describes nodegroup anti-affinity scheduling rules
for the queue(e.g. avoid putting pods of the queue in the nodes
of the nodegroup).
properties:
preferredDuringSchedulingIgnoredDuringExecution:
items:
@ -87,7 +64,6 @@ spec:
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: ResourceList is a set of (resource name, quantity) pairs.
type: object
deserved:
additionalProperties:
@ -96,14 +72,9 @@ spec:
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: The amount of resources configured by the user. This
part of resource can be shared with other queues and reclaimed back.
type: object
extendClusters:
description: extendCluster indicate the jobs in this Queue will be
dispatched to these clusters.
items:
description: CluterSpec represents the template of Cluster
properties:
capacity:
additionalProperties:
@ -112,8 +83,6 @@ spec:
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: ResourceList is a set of (resource name, quantity)
pairs.
type: object
name:
type: string
@ -123,7 +92,6 @@ spec:
type: object
type: array
guarantee:
description: Guarantee indicate configuration about resource reservation
properties:
resource:
additionalProperties:
@ -132,24 +100,16 @@ spec:
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: The amount of cluster resource reserved for queue.
Just set either `percentage` or `resource`
type: object
type: object
parent:
description: Parent define the parent of queue
type: string
priority:
description: Priority define the priority of queue. Higher values
are prioritized for scheduling and considered later during reclamation.
format: int32
type: integer
reclaimable:
description: Reclaimable indicate whether the queue can be reclaimed
by other queue
type: boolean
type:
description: Type define the type of queue
type: string
weight:
default: 1
@ -159,7 +119,6 @@ spec:
type: integer
type: object
status:
description: The status of queue.
properties:
allocated:
additionalProperties:
@ -168,26 +127,19 @@ spec:
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: Allocated is allocated resources in queue
type: object
completed:
description: The number of `Completed` PodGroup in this queue.
format: int32
type: integer
inqueue:
description: The number of `Inqueue` PodGroup in this queue.
format: int32
type: integer
pending:
description: The number of 'Pending' PodGroup in this queue.
format: int32
type: integer
reservation:
description: Reservation is the profile of resource reservation for
queue
properties:
nodes:
description: Nodes are Locked nodes for queue
items:
type: string
type: array
@ -198,19 +150,14 @@ spec:
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: Resource is a list of total idle resource in locked
nodes.
type: object
type: object
running:
description: The number of 'Running' PodGroup in this queue.
format: int32
type: integer
state:
description: State is state of queue
type: string
unknown:
description: The number of 'Unknown' PodGroup in this queue.
format: int32
type: integer
type: object

File diff suppressed because it is too large.


@ -28,74 +28,35 @@ spec:
name: v1alpha1
schema:
openAPIV3Schema:
description: HyperNode represents a collection of nodes sharing similar network
topology or performance characteristics.
properties:
apiVersion:
description: |-
APIVersion defines the versioned schema of this representation of an object.
Servers should convert recognized schemas to the latest internal value, and
may reject unrecognized values.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
type: string
kind:
description: |-
Kind is a string value representing the REST resource this object represents.
Servers may infer this from the endpoint the client submits requests to.
Cannot be updated.
In CamelCase.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
metadata:
type: object
spec:
description: Spec defines the desired configuration of the HyperNode.
properties:
members:
description: Members defines a list of node groups or individual nodes
included in the HyperNode.
items:
description: MemberSpec represents a specific node or a hyperNodes
in the hyperNode.
properties:
selector:
description: Selector defines the selection rules for this member.
properties:
exactMatch:
description: ExactMatch defines the exact match criteria.
properties:
name:
description: Name specifies the exact name of the node
to match.
type: string
type: object
labelMatch:
description: LabelMatch defines the labels match criteria
(only take effect when Member Type is "Node").
properties:
matchExpressions:
description: matchExpressions is a list of label selector
requirements. The requirements are ANDed.
items:
description: |-
A label selector requirement is a selector that contains values, a key, and an operator that
relates the key and values.
properties:
key:
description: key is the label key that the selector
applies to.
type: string
operator:
description: |-
operator represents a key's relationship to a set of values.
Valid operators are In, NotIn, Exists and DoesNotExist.
type: string
values:
description: |-
values is an array of string values. If the operator is In or NotIn,
the values array must be non-empty. If the operator is Exists or DoesNotExist,
the values array must be empty. This array is replaced during a strategic
merge patch.
items:
type: string
type: array
@ -109,19 +70,12 @@ spec:
matchLabels:
additionalProperties:
type: string
description: |-
matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels
map is equivalent to an element of matchExpressions, whose key field is "key", the
operator is "In", and the values array contains only "value". The requirements are ANDed.
type: object
type: object
x-kubernetes-map-type: atomic
regexMatch:
description: RegexMatch defines the regex match criteria.
properties:
pattern:
description: Pattern defines the regex pattern to match
node names.
type: string
type: object
type: object
@ -134,7 +88,6 @@ spec:
rule: '(has(self.exactMatch) ? 1 : 0) + (has(self.regexMatch)
? 1 : 0) + (has(self.labelMatch) ? 1 : 0) <= 1'
type:
description: Type specifies the member type.
enum:
- Node
- HyperNode
@ -144,61 +97,37 @@ spec:
type: object
type: array
tier:
description: Tier categorizes the performance level of the HyperNode.
type: integer
required:
- tier
type: object
status:
description: Status provides the current state of the HyperNode.
properties:
conditions:
description: Conditions provide details about the current state of
the HyperNode.
items:
description: Condition contains details for one aspect of the current
state of this API Resource.
properties:
lastTransitionTime:
description: |-
lastTransitionTime is the last time the condition transitioned from one status to another.
This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable.
format: date-time
type: string
message:
description: |-
message is a human readable message indicating details about the transition.
This may be an empty string.
maxLength: 32768
type: string
observedGeneration:
description: |-
observedGeneration represents the .metadata.generation that the condition was set based upon.
For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date
with respect to the current state of the instance.
format: int64
minimum: 0
type: integer
reason:
description: |-
reason contains a programmatic identifier indicating the reason for the condition's last transition.
Producers of specific condition types may define expected values and meanings for this field,
and whether the values are considered a guaranteed API.
The value should be a CamelCase string.
This field may not be empty.
maxLength: 1024
minLength: 1
pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
type: string
status:
description: status of the condition, one of True, False, Unknown.
enum:
- "True"
- "False"
- Unknown
type: string
type:
description: type of condition in CamelCase or in foo.example.com/CamelCase.
maxLength: 316
pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
type: string
@ -211,8 +140,6 @@ spec:
type: object
type: array
nodeCount:
description: NodeCount is the total number of nodes currently in the
HyperNode.
format: int64
minimum: 0
type: integer


@ -47,9 +47,15 @@ rules:
- apiGroups: ["batch.volcano.sh"]
resources: ["jobs"]
verbs: ["get", "list", "watch", "update", "delete"]
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
resources: ["reservations"]
verbs: ["get", "list", "watch", "update", "delete"]
- apiGroups: ["batch.volcano.sh"]
resources: ["jobs/status"]
verbs: ["update", "patch"]
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
resources: ["reservations/status"]
verbs: ["update", "patch"]
- apiGroups: [""]
resources: ["events"]
verbs: ["create", "list", "watch", "update", "patch"]


@ -0,0 +1 @@
{{- tpl ($.Files.Get (printf "crd/%s/scheduling.volcano.sh_reservations.yaml" (include "crd_version" .))) . }}

File diff suppressed because it is too large.


@ -22,6 +22,7 @@ import (
"k8s.io/klog/v2"
"volcano.sh/apis/pkg/apis/scheduling"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/framework"
@ -197,7 +198,12 @@ func (alloc *Action) allocateResources(queues *util.PriorityQueue, jobsMap map[a
pendingTasks[job.UID] = tasksQueue
}
} else {
stmt = alloc.allocateResourcesForTasks(tasks, job, queue, allNodes, "")
// TODO: this needs to be decoupled
if job.IsUseReservation() {
stmt = alloc.allocateResourcesForReservationTasks(tasks, job, jobs, allNodes)
} else {
stmt = alloc.allocateResourcesForTasks(tasks, job, queue, allNodes, "")
}
// There are still left tasks that need to be allocated when min available < replicas, put the job back
if tasks.Len() > 0 {
jobs.Push(job)
@ -591,4 +597,73 @@ func (alloc *Action) predicate(task *api.TaskInfo, node *api.NodeInfo) error {
return alloc.session.PredicateForAllocateAction(task, node)
}
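// allocateResourcesForReservationTasks binds each task of a reservation-backed job directly to the
// node held by its corresponding reservation task, skipping normal node selection; the statement is
// returned only when the job becomes ready, otherwise it is discarded unless the job is pipelined.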
func (alloc *Action) allocateResourcesForReservationTasks(tasks *util.PriorityQueue, job *api.JobInfo, jobs *util.PriorityQueue, allNodes []*api.NodeInfo) *framework.Statement {
klog.V(3).Infof("Allocating resources for reservation tasks in job <%s/%s> with <%d> tasks", job.Namespace, job.Name, tasks.Len())
ssn := alloc.session
stmt := framework.NewStatement(ssn)
for !tasks.Empty() {
task := tasks.Pop().(*api.TaskInfo)
if job.TaskHasFitErrors(task) {
klog.V(5).Infof("Task %s with role spec %s has already predicated failed, skip", task.Name, task.TaskRole)
continue
}
if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
fitErrors := api.NewFitErrors()
for _, ni := range allNodes {
fitErrors.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fitErrors
break
}
reservationTask := task.ReservationTaskInfo
if reservationTask == nil {
klog.Warningf("Task <%s/%s> does not have a corresponding reservation task", task.Namespace, task.Name)
continue
}
reservedNodeName := reservationTask.NodeName
if reservedNodeName == "" {
klog.Warningf("Node for reservation Task <%s/%s> not found", task.Namespace, task.Name)
continue
}
reservedNode, found := ssn.Nodes[reservedNodeName]
if !found {
klog.Warningf("Reserved node %s for Task <%s/%s> not found", reservedNodeName, task.Namespace, task.Name)
continue
}
klog.V(3).Infof("Binding Reservation Task <%s/%s> to reserved node <%v>", task.Namespace, task.Name, reservedNodeName)
if err := stmt.Allocate(task, reservedNode); err != nil {
klog.Errorf("Failed to bind reservation Task %v on reserved node %v in Session %v, err: %v", task.UID, reservedNode.Name, ssn.UID, err)
if rollbackErr := stmt.UnAllocate(task); rollbackErr != nil {
klog.Errorf("Failed to unallocate Task %v on reserved node %v in Session %v for %v.", task.UID, reservedNode.Name, ssn.UID, rollbackErr)
}
} else {
metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
}
if ssn.JobReady(job) && !tasks.Empty() {
jobs.Push(job)
break
}
}
if ssn.JobReady(job) {
return stmt
} else {
if !ssn.JobPipelined(job) {
stmt.Discard()
}
return nil
}
}
func (alloc *Action) UnInitialize() {}


@ -20,6 +20,7 @@ import (
"math"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"volcano.sh/volcano/pkg/scheduler/api"
)
@ -87,3 +88,58 @@ func Share(l, r float64) float64 {
return share
}
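// IsPodSpecMatch reports whether a task's PodSpec is compatible with a reservation's PodSpec by
// comparing scheduler name, node selector, affinity, tolerations, priority class and the
// container/init-container lists.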
func IsPodSpecMatch(taskPodSpec, resvPodSpec *v1.PodSpec) bool {
if taskPodSpec.SchedulerName != resvPodSpec.SchedulerName {
return false
}
if !apiequality.Semantic.DeepEqual(taskPodSpec.NodeSelector, resvPodSpec.NodeSelector) {
return false
}
if !apiequality.Semantic.DeepEqual(taskPodSpec.Affinity, resvPodSpec.Affinity) {
return false
}
if !apiequality.Semantic.DeepEqual(taskPodSpec.Tolerations, resvPodSpec.Tolerations) {
return false
}
if taskPodSpec.PriorityClassName != resvPodSpec.PriorityClassName {
return false
}
if !isContainerListEqual(taskPodSpec.Containers, resvPodSpec.Containers) {
return false
}
if !isContainerListEqual(taskPodSpec.InitContainers, resvPodSpec.InitContainers) {
return false
}
return true
}
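// isContainerListEqual compares two container lists by container name, ignoring order; two
// containers are considered equal when their image, resource requests and resource limits match.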
func isContainerListEqual(a, b []v1.Container) bool {
if len(a) != len(b) {
return false
}
containerMap := make(map[string]v1.Container, len(a))
for _, c := range a {
containerMap[c.Name] = c
}
for _, c := range b {
ref, ok := containerMap[c.Name]
if !ok {
return false
}
if c.Image != ref.Image {
return false
}
if !apiequality.Semantic.DeepEqual(c.Resources.Requests, ref.Resources.Requests) {
return false
}
if !apiequality.Semantic.DeepEqual(c.Resources.Limits, ref.Resources.Limits) {
return false
}
}
return true
}


@ -5,6 +5,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/resource"
"volcano.sh/volcano/pkg/scheduler/api"
)
@ -40,3 +41,274 @@ func TestMax(t *testing.T) {
t.Errorf("expected: %#v, got: %#v", expected, re)
}
}
func TestIsPodSpecMatch(t *testing.T) {
cpu500m := resource.MustParse("500m")
cpu250m := resource.MustParse("250m")
mem128Mi := resource.MustParse("128Mi")
tests := []struct {
name string
specA *v1.PodSpec
specB *v1.PodSpec
expectMatch bool
}{
{
name: "Same spec",
specA: &v1.PodSpec{
SchedulerName: "default-scheduler",
Containers: []v1.Container{
{
Name: "c1",
Image: "nginx",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: cpu500m,
},
},
},
},
},
specB: &v1.PodSpec{
SchedulerName: "default-scheduler",
Containers: []v1.Container{
{
Name: "c1",
Image: "nginx",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: cpu500m,
},
},
},
},
},
expectMatch: true,
},
{
name: "Different image",
specA: &v1.PodSpec{
SchedulerName: "default-scheduler",
Containers: []v1.Container{
{Name: "c1", Image: "nginx"},
},
},
specB: &v1.PodSpec{
SchedulerName: "default-scheduler",
Containers: []v1.Container{
{Name: "c1", Image: "busybox"},
},
},
expectMatch: false,
},
{
name: "Container Resources different",
specA: &v1.PodSpec{
SchedulerName: "default-scheduler",
Containers: []v1.Container{
{
Name: "c1",
Image: "nginx",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: cpu500m,
},
},
},
},
},
specB: &v1.PodSpec{
SchedulerName: "default-scheduler",
Containers: []v1.Container{
{
Name: "c1",
Image: "nginx",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: cpu250m,
},
},
},
},
},
expectMatch: false,
},
{
name: "The order of containers is different",
specA: &v1.PodSpec{
SchedulerName: "default-scheduler",
Containers: []v1.Container{
{
Name: "a",
Image: "nginx",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceMemory: mem128Mi,
},
},
},
{
Name: "b",
Image: "busybox",
},
},
},
specB: &v1.PodSpec{
SchedulerName: "default-scheduler",
Containers: []v1.Container{
{
Name: "b",
Image: "busybox",
},
{
Name: "a",
Image: "nginx",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceMemory: mem128Mi,
},
},
},
},
},
expectMatch: true,
},
{
name: "Different schedulerName",
specA: &v1.PodSpec{
SchedulerName: "custom-scheduler",
Containers: []v1.Container{
{Name: "c1", Image: "nginx"},
},
},
specB: &v1.PodSpec{
SchedulerName: "default-scheduler",
Containers: []v1.Container{
{Name: "c1", Image: "nginx"},
},
},
expectMatch: false,
},
{
name: "Different tolerations",
specA: &v1.PodSpec{
SchedulerName: "default-scheduler",
Tolerations: []v1.Toleration{
{Key: "key1", Operator: v1.TolerationOpExists},
},
Containers: []v1.Container{
{Name: "c1", Image: "nginx"},
},
},
specB: &v1.PodSpec{
SchedulerName: "default-scheduler",
Tolerations: []v1.Toleration{
{Key: "key2", Operator: v1.TolerationOpExists},
},
Containers: []v1.Container{
{Name: "c1", Image: "nginx"},
},
},
expectMatch: false,
},
{
name: "Different affinity",
specA: &v1.PodSpec{
SchedulerName: "default-scheduler",
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{Key: "zone", Operator: v1.NodeSelectorOpIn, Values: []string{"zoneA"}},
},
},
},
},
},
},
Containers: []v1.Container{
{Name: "c1", Image: "nginx"},
},
},
specB: &v1.PodSpec{
SchedulerName: "default-scheduler",
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{Key: "zone", Operator: v1.NodeSelectorOpIn, Values: []string{"zoneB"}},
},
},
},
},
},
},
Containers: []v1.Container{
{Name: "c1", Image: "nginx"},
},
},
expectMatch: false,
},
{
name: "Tolerations and Affinity are same",
specA: &v1.PodSpec{
SchedulerName: "default-scheduler",
Tolerations: []v1.Toleration{
{Key: "key1", Operator: v1.TolerationOpExists},
},
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{Key: "zone", Operator: v1.NodeSelectorOpIn, Values: []string{"zoneA"}},
},
},
},
},
},
},
Containers: []v1.Container{
{Name: "c1", Image: "nginx"},
},
},
specB: &v1.PodSpec{
SchedulerName: "default-scheduler",
Tolerations: []v1.Toleration{
{Key: "key1", Operator: v1.TolerationOpExists},
},
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{Key: "zone", Operator: v1.NodeSelectorOpIn, Values: []string{"zoneA"}},
},
},
},
},
},
},
Containers: []v1.Container{
{Name: "c1", Image: "nginx"},
},
},
expectMatch: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
match := IsPodSpecMatch(tt.specA, tt.specB)
if match != tt.expectMatch {
t.Errorf("Expected match = %v, got %v", tt.expectMatch, match)
}
})
}
}


@ -76,6 +76,12 @@ type TransactionContext struct {
Status TaskStatus
}
// ReservationContext holds all the fields needed by reservation scheduling
type ReservationContext struct {
ReservationTaskInfo *TaskInfo
ReservationNodeNames []string
}
// Clone returns a clone of TransactionContext
func (ctx *TransactionContext) Clone() *TransactionContext {
if ctx == nil {
@ -118,6 +124,7 @@ type TaskInfo struct {
InitResreq *Resource
TransactionContext
ReservationContext
// LastTransaction holds the context of last scheduling transaction
LastTransaction *TransactionContext
@ -235,6 +242,23 @@ func (ti *TaskInfo) ClearLastTxContext() {
ti.LastTransaction = nil
}
// IsReservationTask returns true if the task is only for reservation
func (ti *TaskInfo) IsReservationTask() bool {
if ti.Pod == nil || ti.Pod.Annotations == nil {
return false
}
annotationValue, ok := ti.Pod.Annotations[v1beta1.VolcanoGroupReservationOnlyAnnotationKey]
if !ok {
return false
}
return annotationValue == "true"
}
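// IsUseReservationTask returns true if the task has been matched to a reservation task.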
func (ti *TaskInfo) IsUseReservationTask() bool {
return ti.ReservationTaskInfo != nil
}
// Return if the pod of a task is scheduling gated by checking if length of sch gates is zero
// When the Pod is not yet created or sch gates field not set, return false
func calSchedulingGated(pod *v1.Pod) bool {
@ -291,6 +315,10 @@ func (ti *TaskInfo) Clone() *TaskInfo {
NodeName: ti.NodeName,
Status: ti.Status,
},
ReservationContext: ReservationContext{
ReservationTaskInfo: ti.ReservationTaskInfo,
ReservationNodeNames: ti.ReservationNodeNames,
},
LastTransaction: ti.LastTransaction.Clone(),
}
}
@ -1073,3 +1101,28 @@ func (ji *JobInfo) ResetFitErr() {
ji.JobFitErrors = ""
ji.NodesFitErrors = make(map[TaskID]*FitErrors)
}
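// GetReservationName returns the target reservation name recorded in the PodGroup annotation, or an empty string if none is set.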
func (ji *JobInfo) GetReservationName() string {
if ji.PodGroup == nil || ji.PodGroup.Annotations == nil {
return ""
}
annotationValue, ok := ji.PodGroup.Annotations[v1beta1.VolcanoGroupTargetReservationAnnotationKey]
if !ok {
return ""
}
return annotationValue
}
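// SetReservation records the reservation name in the PodGroup's target-reservation annotation.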
func (ji *JobInfo) SetReservation(reservation *ReservationInfo) {
if ji.PodGroup == nil {
return
}
if ji.PodGroup.Annotations == nil {
ji.PodGroup.Annotations = make(map[string]string)
}
ji.PodGroup.Annotations[v1beta1.VolcanoGroupTargetReservationAnnotationKey] = reservation.Reservation.Name
}
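// IsUseReservation returns true if the job has a target reservation set.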
func (ji *JobInfo) IsUseReservation() bool {
return ji.GetReservationName() != ""
}

View File

@ -0,0 +1,35 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import "volcano.sh/apis/pkg/apis/scheduling"
// ReservationInfo will have all info of a Reservation
type ReservationInfo struct {
JobID JobID
JobInfo *JobInfo
Reservation *scheduling.Reservation
}
// NewReservationInfo creates a new ReservationInfo
func NewReservationInfo(jobID JobID, jobInfo *JobInfo, reservation *scheduling.Reservation) *ReservationInfo {
return &ReservationInfo{
JobID: jobID,
JobInfo: jobInfo,
Reservation: reservation,
}
}

View File

@ -785,3 +785,22 @@ func ExceededPart(left, right *Resource) *Resource {
diff, _ := left.Diff(right, Zero)
return diff
}
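// ConvertResourceToResourceList converts a Resource into a v1.ResourceList, omitting zero-valued resources.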
func (r *Resource) ConvertResourceToResourceList() v1.ResourceList {
rl := v1.ResourceList{}
if r == nil {
return rl
}
if r.MilliCPU > 0 {
rl[v1.ResourceCPU] = *resource.NewMilliQuantity(int64(r.MilliCPU), resource.DecimalSI)
}
if r.Memory > 0 {
rl[v1.ResourceMemory] = *resource.NewQuantity(int64(r.Memory), resource.BinarySI)
}
for name, val := range r.ScalarResources {
if val > 0 {
rl[name] = *resource.NewQuantity(int64(val), resource.DecimalSI)
}
}
return rl
}

View File

@ -64,7 +64,9 @@ import (
vcinformer "volcano.sh/apis/pkg/client/informers/externalversions"
cpuinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/nodeinfo/v1alpha1"
vcinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"
topologyinformerv1alpha1 "volcano.sh/apis/pkg/client/informers/externalversions/topology/v1alpha1"
"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/features"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
@ -115,6 +117,7 @@ type SchedulerCache struct {
hyperNodeInformer topologyinformerv1alpha1.HyperNodeInformer
podGroupInformerV1beta1 vcinformerv1.PodGroupInformer
queueInformerV1beta1 vcinformerv1.QueueInformer
reservationInformerV1beta1 vcinformerv1.ReservationInformer
pvInformer infov1.PersistentVolumeInformer
pvcInformer infov1.PersistentVolumeClaimInformer
scInformer storagev1.StorageClassInformer
@ -144,10 +147,11 @@ type SchedulerCache struct {
NamespaceCollection map[string]*schedulingapi.NamespaceCollection
errTasks workqueue.TypedRateLimitingInterface[string]
nodeQueue workqueue.TypedRateLimitingInterface[string]
DeletedJobs workqueue.TypedRateLimitingInterface[*schedulingapi.JobInfo]
hyperNodesQueue workqueue.TypedRateLimitingInterface[string]
errTasks workqueue.TypedRateLimitingInterface[string]
nodeQueue workqueue.TypedRateLimitingInterface[string]
DeletedJobs workqueue.TypedRateLimitingInterface[*schedulingapi.JobInfo]
hyperNodesQueue workqueue.TypedRateLimitingInterface[string]
DeletedReservations workqueue.TypedRateLimitingInterface[*schedulingapi.ReservationInfo]
informerFactory informers.SharedInformerFactory
vcInformerFactory vcinformer.SharedInformerFactory
@ -174,6 +178,12 @@ type SchedulerCache struct {
// sharedDRAManager is used in DRA plugin, contains resourceClaimTracker, resourceSliceLister and deviceClassLister
sharedDRAManager k8sframework.SharedDRAManager
ReservationCache *ReservationCache
}
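// GetReservationCache returns the reservation cache held by the scheduler cache.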
func (sc *SchedulerCache) GetReservationCache() *ReservationCache {
return sc.ReservationCache
}
type multiSchedulerInfo struct {
@ -521,6 +531,7 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
nodeQueue: workqueue.NewTypedRateLimitingQueue[string](workqueue.DefaultTypedControllerRateLimiter[string]()),
DeletedJobs: workqueue.NewTypedRateLimitingQueue[*schedulingapi.JobInfo](workqueue.DefaultTypedControllerRateLimiter[*schedulingapi.JobInfo]()),
hyperNodesQueue: workqueue.NewTypedRateLimitingQueue[string](workqueue.DefaultTypedControllerRateLimiter[string]()),
DeletedReservations: workqueue.NewTypedRateLimitingQueue[*schedulingapi.ReservationInfo](workqueue.DefaultTypedControllerRateLimiter[*schedulingapi.ReservationInfo]()),
kubeClient: kubeClient,
vcClient: vcClient,
restConfig: config,
@ -574,6 +585,8 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
vcclient: sc.vcClient,
}
sc.ReservationCache = newReservationCache(sc.vcClient)
sc.binderRegistry = NewBinderRegistry()
// add all events handlers
@ -743,6 +756,14 @@ func (sc *SchedulerCache) addEventHandler() {
DeleteFunc: sc.DeleteQueueV1beta1,
})
// create informer(v1beta1) for Reservation information
sc.reservationInformerV1beta1 = vcinformers.Scheduling().V1beta1().Reservations()
sc.reservationInformerV1beta1.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sc.AddReservationV1beta1,
UpdateFunc: sc.UpdateReservationV1beta1,
DeleteFunc: sc.DeleteReservationV1beta1,
})
if utilfeature.DefaultFeatureGate.Enabled(features.ResourceTopology) {
sc.cpuInformer = vcinformers.Nodeinfo().V1alpha1().Numatopologies()
sc.cpuInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -786,6 +807,9 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
// Cleanup jobs.
go wait.Until(sc.processCleanupJob, 0, stopCh)
// Cleanup reservations.
go wait.Until(sc.processCleanupReservation, 0, stopCh)
go wait.Until(sc.processBindTask, time.Millisecond*20, stopCh)
// Get metrics data
@ -796,6 +820,8 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
}
klog.V(3).Infof("The interval for querying metrics data is %v", interval)
go wait.Until(sc.GetMetricsData, interval, stopCh)
go wait.Until(sc.gcExpiredReservationsOnce, time.Millisecond*20, stopCh)
}
// WaitForCacheSync sync the cache with the api server
@ -898,6 +924,9 @@ func (sc *SchedulerCache) Bind(ctx context.Context, bindContexts []*BindContext)
for _, bindContext := range bindContexts {
if reason, ok := errMsg[bindContext.TaskInfo.UID]; !ok {
if err := sc.executePostBind(ctx, bindContext); err != nil {
klog.Errorf("Failed to execute postBind for task %s/%s, err: %v", bindContext.TaskInfo.Namespace, bindContext.TaskInfo.Name, err)
}
sc.Recorder.Eventf(bindContext.TaskInfo.Pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", bindContext.TaskInfo.Namespace, bindContext.TaskInfo.Name, bindContext.TaskInfo.NodeName)
} else {
unschedulableMsg := fmt.Sprintf("failed to bind to node %s: %s", bindContext.TaskInfo.NodeName, reason)
@ -1199,6 +1228,12 @@ func (sc *SchedulerCache) processSyncHyperNode() {
// AddBindTask add task to be bind to a cache which consumes by go runtime
func (sc *SchedulerCache) AddBindTask(bindContext *BindContext) error {
klog.V(5).Infof("add bind task %v/%v", bindContext.TaskInfo.Namespace, bindContext.TaskInfo.Name)
// TODO: decouple reservation handling from the bind path.
if bindContext.TaskInfo.IsReservationTask() {
return sc.processReservationTask(bindContext.TaskInfo)
}
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
job, task, err := sc.findJobAndTask(bindContext.TaskInfo)
@ -1223,17 +1258,51 @@ func (sc *SchedulerCache) AddBindTask(bindContext *BindContext) error {
}
task.NumaInfo = bindContext.TaskInfo.NumaInfo.Clone()
// Add task to the node.
if err := node.AddTask(task); err != nil {
// After failing to update task to a node we need to revert task status from Releasing,
// otherwise task might be stuck in the Releasing state indefinitely.
if err := job.UpdateTaskStatus(task, originalStatus); err != nil {
klog.Errorf("Task <%s/%s> will be resynchronized after failing to revert status "+
"from %s to %s after failing to update Task on Node <%s>: %v",
task.Namespace, task.Name, task.Status, originalStatus, node.Name, err)
sc.resyncTask(task)
// TODO: decouple reservation handling from the bind path.
if bindContext.TaskInfo.IsUseReservationTask() {
reservationTask := bindContext.TaskInfo.ReservationTaskInfo
// Remove reservation task from the node if it exists to release the reserved resources on node.
if err := node.RemoveTask(reservationTask); err != nil {
// After failing to update task to a node we need to revert task status from Releasing,
// otherwise task might be stuck in the Releasing state indefinitely.
if err := job.UpdateTaskStatus(task, originalStatus); err != nil {
klog.Errorf("Task <%s/%s> will be resynchronized after failing to revert status "+
"from %s to %s after failing to update Task on Node <%s>: %v",
task.Namespace, task.Name, task.Status, originalStatus, node.Name, err)
sc.resyncTask(task)
}
return err
}
// Add task to the node.
if err := node.AddTask(task); err != nil {
// After failing to update task to a node we need to revert task status from Releasing,
// otherwise task might be stuck in the Releasing state indefinitely.
if err := job.UpdateTaskStatus(task, originalStatus); err != nil {
klog.Errorf("Task <%s/%s> will be resynchronized after failing to revert status "+
"from %s to %s after failing to update Task on Node <%s>: %v",
task.Namespace, task.Name, task.Status, originalStatus, node.Name, err)
sc.resyncTask(task)
}
if rollbackErr := node.AddTask(reservationTask); rollbackErr != nil {
klog.Errorf("Rollback reservation task failed: unable to add reservation task <%s/%s> back to node <%s>: %v",
reservationTask.Namespace, reservationTask.Name, node.Name, rollbackErr)
}
return err
}
} else {
// Add task to the node.
if err := node.AddTask(task); err != nil {
// After failing to update task to a node we need to revert task status from Releasing,
// otherwise task might be stuck in the Releasing state indefinitely.
if err := job.UpdateTaskStatus(task, originalStatus); err != nil {
klog.Errorf("Task <%s/%s> will be resynchronized after failing to revert status "+
"from %s to %s after failing to update Task on Node <%s>: %v",
task.Namespace, task.Name, task.Status, originalStatus, node.Name, err)
sc.resyncTask(task)
}
return err
}
return err
}
sc.BindFlowChannel <- bindContext
@ -1267,6 +1336,83 @@ func (sc *SchedulerCache) processBindTask() {
sc.BindTask()
}
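// processReservationTask marks a reservation placeholder task as Bound, adds it to its node in the cache, and syncs the reservation status.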
func (sc *SchedulerCache) processReservationTask(taskInfo *schedulingapi.TaskInfo) error {
klog.V(5).Infof("reserve task %v/%v", taskInfo.Namespace, taskInfo.Name)
job, task, err := sc.findJobAndTask(taskInfo)
if err != nil {
return err
}
node, found := sc.Nodes[taskInfo.NodeName]
if !found {
return fmt.Errorf("failed to reserve Task %v to host %v, host does not exist",
task.UID, taskInfo.NodeName)
}
originalStatus := task.Status
if err := job.UpdateTaskStatus(task, schedulingapi.Bound); err != nil {
return err
}
err = taskInfo.SetPodResourceDecision()
if err != nil {
return fmt.Errorf("set reserve task %v/%v resource decision failed, err %v", task.Namespace, task.Name, err)
}
task.NumaInfo = taskInfo.NumaInfo.Clone()
// Add task to the node.
if err := node.AddTask(task); err != nil {
// After failing to update task to a node we need to revert task status from Releasing,
// otherwise task might be stuck in the Releasing state indefinitely.
if err := job.UpdateTaskStatus(task, originalStatus); err != nil {
klog.Errorf("Reserve task <%s/%s> will be resynchronized after failing to revert status "+
"from %s to %s after failing to update Task on Node <%s>: %v",
task.Namespace, task.Name, task.Status, originalStatus, node.Name, err)
sc.resyncTask(task)
}
return err
}
// Sync to reservation cache
if err := sc.ReservationCache.SyncTaskStatus(task, job); err != nil {
return err
}
return nil
}
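// SyncBindToReservationTask marks the consumed reservation task as Succeeded and records the consuming job as the reservation's current owner.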
func (sc *SchedulerCache) SyncBindToReservationTask(task *schedulingapi.TaskInfo) error {
return sc.syncBindToReservationTask(task)
}
func (sc *SchedulerCache) syncBindToReservationTask(taskInfo *schedulingapi.TaskInfo) error {
klog.V(1).Infof("sync bind to reservation task %v/%v", taskInfo.Namespace, taskInfo.Name)
reservationTask := taskInfo.ReservationTaskInfo
reservationJobId := reservationTask.Job
reservationJob, ok := sc.Jobs[reservationJobId]
if !ok {
return fmt.Errorf("failed to find reservation job %s for reservation task %s", reservationJobId, reservationTask.UID)
}
if err := reservationJob.UpdateTaskStatus(reservationTask, schedulingapi.Succeeded); err != nil {
return err
}
jobId := taskInfo.Job
job, ok := sc.Jobs[jobId]
if !ok {
return fmt.Errorf("failed to find job %s", jobId)
}
if err := sc.ReservationCache.AllocateJobToReservation(reservationTask, job); err != nil {
return err
}
if err := sc.ReservationCache.SyncTaskStatus(reservationTask, reservationJob); err != nil {
return err
}
return nil
}
// executePreBind executes PreBind for one bindContext
func (sc *SchedulerCache) executePreBind(ctx context.Context, bindContext *BindContext) error {
executedPreBinders := make([]PreBinder, 0, len(sc.binderRegistry.preBinders))
@ -1315,6 +1461,24 @@ func (sc *SchedulerCache) executePreBinds(ctx context.Context, bindContexts []*B
return successfulBindContexts
}
// executePostBind executes PostBind for one bindContext
func (sc *SchedulerCache) executePostBind(ctx context.Context, bindContext *BindContext) error {
sc.binderRegistry.mu.RLock()
defer sc.binderRegistry.mu.RUnlock()
for _, postBinder := range sc.binderRegistry.postBinders {
if postBinder != nil {
if err := postBinder.PostBind(ctx, bindContext); err != nil {
klog.Errorf("PostBind failed for task %s/%s: %v",
bindContext.TaskInfo.Namespace, bindContext.TaskInfo.Name, err)
return err
}
}
}
return nil
}
// BindTask do k8s binding with a goroutine
func (sc *SchedulerCache) BindTask() {
klog.V(5).Infof("batch bind task count %d", sc.batchNum)
@ -1660,3 +1824,48 @@ func (sc *SchedulerCache) RegisterBinder(name string, binder interface{}) {
}
sc.binderRegistry.Register(name, binder)
}
func (sc *SchedulerCache) cleanReservation(reservation *schedulingapi.ReservationInfo) {
rs := reservation.Reservation
klog.V(3).Infof("Try to delete Reservation <%v:%v/%v>", rs.UID, rs.Namespace, rs.Name)
sc.DeletedReservations.Add(reservation)
}
func (sc *SchedulerCache) retryCleanReservation(reservation *schedulingapi.ReservationInfo) {
rs := reservation.Reservation
klog.V(3).Infof("Retry to delete Reservation <%v:%v/%v>", rs.UID, rs.Namespace, rs.Name)
sc.DeletedReservations.AddRateLimited(reservation)
}
func (sc *SchedulerCache) processCleanupReservation() {
reservation, shutdown := sc.DeletedReservations.Get()
if shutdown {
return
}
defer sc.DeletedReservations.Done(reservation)
if !isReservationNeedExpiration(reservation, time.Now()) {
klog.V(4).Infof("Reservation %s not expired or no need to clean yet, skipping", reservation.Reservation.Name)
sc.DeletedReservations.Forget(reservation)
return
}
err := sc.gcReservation(reservation)
if err != nil {
klog.Errorf("Failed to GC reservation %s: %v, and will retry...", reservation.Reservation.Name, err)
sc.retryCleanReservation(reservation)
} else {
sc.DeletedReservations.Forget(reservation)
}
}
func (sc *SchedulerCache) gcExpiredReservationsOnce() {
now := time.Now()
sc.ReservationCache.ScanExpiredReservations(now, func(reservation *schedulingapi.ReservationInfo) {
klog.V(4).Infof("Reservation %s expired, enqueuing for deletion", reservation.Reservation.Name)
sc.DeletedReservations.Add(reservation)
})
}

View File

@ -24,6 +24,7 @@ import (
"slices"
"strconv"
"github.com/google/uuid"
v1 "k8s.io/api/core/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
sv1 "k8s.io/api/storage/v1"
@ -31,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/component-helpers/storage/ephemeral"
@ -40,6 +42,8 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/utils/cpuset"
batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
"volcano.sh/apis/pkg/apis/helpers"
nodeinfov1alpha1 "volcano.sh/apis/pkg/apis/nodeinfo/v1alpha1"
"volcano.sh/apis/pkg/apis/scheduling"
@ -47,6 +51,8 @@ import (
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
topologyv1alpha1 "volcano.sh/apis/pkg/apis/topology/v1alpha1"
"volcano.sh/apis/pkg/apis/utils"
jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/metrics"
)
@ -770,6 +776,15 @@ func getJobID(pg *schedulingapi.PodGroup) schedulingapi.JobID {
return schedulingapi.JobID(fmt.Sprintf("%s/%s", pg.Namespace, pg.Name))
}
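// getPodGroupName returns the PodGroup name generated for a reservation: <reservation-name>-<reservation-UID>.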
func getPodGroupName(rs *scheduling.Reservation) string {
return fmt.Sprintf("%s-%s", rs.Name, string(rs.UID))
}
func getJobIDByReservation(rs *scheduling.Reservation) schedulingapi.JobID {
pgName := getPodGroupName(rs)
return schedulingapi.JobID(fmt.Sprintf("%s/%s", rs.Namespace, pgName))
}
// Assumes that lock is already acquired.
func (sc *SchedulerCache) setPodGroup(ss *schedulingapi.PodGroup) error {
job := getJobID(ss)
@ -817,24 +832,14 @@ func (sc *SchedulerCache) AddPodGroupV1beta1(obj interface{}) {
return
}
podgroup := scheduling.PodGroup{}
if err := scheme.Scheme.Convert(ss, &podgroup, nil); err != nil {
podgroup := &scheduling.PodGroup{}
if err := scheme.Scheme.Convert(ss, podgroup, nil); err != nil {
klog.Errorf("Failed to convert podgroup from %T to %T", ss, podgroup)
return
}
if podgroup.GetAnnotations() == nil {
podgroup.SetAnnotations(map[string]string{})
}
pg := &schedulingapi.PodGroup{PodGroup: podgroup, Version: schedulingapi.PodGroupVersionV1Beta1}
klog.V(4).Infof("Add PodGroup(%s) into cache, spec(%#v)", ss.Name, ss.Spec)
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
if err := sc.setPodGroup(pg); err != nil {
klog.Errorf("Failed to add PodGroup %s into cache: %v", ss.Name, err)
return
}
sc.addPodGroup(podgroup)
}
// UpdatePodGroupV1beta1 add podgroup to scheduler cache
@ -902,6 +907,19 @@ func (sc *SchedulerCache) DeletePodGroupV1beta1(obj interface{}) {
}
}
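// addPodGroup wraps the internal PodGroup and stores it in the scheduler cache. Assumes that lock is already acquired.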
func (sc *SchedulerCache) addPodGroup(podgroup *scheduling.PodGroup) {
if podgroup.GetAnnotations() == nil {
podgroup.SetAnnotations(map[string]string{})
}
pg := &schedulingapi.PodGroup{PodGroup: *podgroup, Version: schedulingapi.PodGroupVersionV1Beta1}
klog.V(4).Infof("Add PodGroup(%s) into cache, spec(%#v)", podgroup.Name, podgroup.Spec)
if err := sc.setPodGroup(pg); err != nil {
klog.Errorf("Failed to add PodGroup %s into cache: %v", podgroup.Name, err)
return
}
}
// AddQueueV1beta1 add queue to scheduler cache
func (sc *SchedulerCache) AddQueueV1beta1(obj interface{}) {
ss, ok := obj.(*schedulingv1beta1.Queue)
@ -1338,6 +1356,254 @@ func (sc *SchedulerCache) setCSIResourceOnNode(csiNode *sv1.CSINode, node *v1.No
}
}
// AddReservationV1beta1 adds a reservation to the scheduler cache
func (sc *SchedulerCache) AddReservationV1beta1(obj interface{}) {
ss, ok := obj.(*schedulingv1beta1.Reservation)
if !ok {
klog.Errorf("Cannot convert to *schedulingv1beta1.Reservation: %v", obj)
return
}
reservation := &scheduling.Reservation{}
if err := scheme.Scheme.Convert(ss, reservation, nil); err != nil {
klog.Errorf("Failed to convert reservation from %T to %T", ss, reservation)
return
}
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
klog.V(3).Infof("Add Reservation <%s/%s> into cache", ss.Namespace, ss.Name)
sc.addReservation(reservation)
}
// UpdateReservationV1beta1 updates a reservation in the scheduler cache
func (sc *SchedulerCache) UpdateReservationV1beta1(oldObj, newObj interface{}) {
// TODO: Reservation updates are not supported yet.
klog.V(3).Infof("Update Reservation is not supported yet, ignoring")
return
}
// DeleteReservationV1beta1 deletes a reservation from the scheduler cache
func (sc *SchedulerCache) DeleteReservationV1beta1(obj interface{}) {
var ss *schedulingv1beta1.Reservation
switch t := obj.(type) {
case *schedulingv1beta1.Reservation:
ss = t
case cache.DeletedFinalStateUnknown:
var ok bool
ss, ok = t.Obj.(*schedulingv1beta1.Reservation)
if !ok {
klog.Errorf("Cannot convert to *schedulingv1beta1.Reservation: %v", t.Obj)
return
}
default:
klog.Errorf("Cannot convert to Numatopo: %v", t)
return
}
reservation := &scheduling.Reservation{}
if err := scheme.Scheme.Convert(ss, reservation, nil); err != nil {
klog.Errorf("Failed to convert reservation from %T to %T", ss, reservation)
return
}
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
sc.deleteReservation(reservation)
klog.V(3).Infof("Delete Reservation <%s/%s> from cache", ss.Namespace, ss.Name)
}
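// addReservation initiates the reservation, creates placeholder tasks for each task template replica, and registers the ReservationInfo in the reservation cache.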
func (sc *SchedulerCache) addReservation(reservation *scheduling.Reservation) {
_, err := sc.getQueueByName(reservation.Spec.Queue)
if err != nil {
klog.Errorf(err.Error())
return
}
if isInitiated(reservation) {
klog.V(3).Infof("Reservation <%s/%s> is already initiated", reservation.Namespace, reservation.Name)
return
}
if _, err := sc.initiateReservation(reservation); err != nil {
klog.Errorf(err.Error())
return
}
// Create fake JobInfo and TaskInfos for Reservation
jobId := getJobIDByReservation(reservation)
if _, found := sc.Jobs[jobId]; !found {
sc.Jobs[jobId] = schedulingapi.NewJobInfo(jobId)
}
job := sc.Jobs[jobId]
for _, ts := range reservation.Spec.Tasks {
ts.Template.Name = ts.Name
tc := ts.Template.DeepCopy()
for i := 0; i < int(ts.Replicas); i++ {
newPod := createReservationPod(reservation, tc, i)
pi, err := sc.NewTaskInfo(newPod)
// Check the error before using the task info to avoid a nil dereference.
if err != nil {
klog.Errorf("Failed to create task in cache for reservation pod <%s/%s>: %v",
newPod.Namespace, newPod.Name, err)
return
}
pi.ReservationNodeNames = ts.ReservationNodeNames
pi.Status = schedulingapi.Pending
klog.V(5).Infof("Created TaskInfo: %+v", pi)
err = sc.addTask(pi)
if err != nil {
klog.Errorf("Failed to add taskInfo for pod <%s/%s> into cache: %v",
newPod.Namespace, newPod.Name, err)
return
}
klog.V(4).Infof("Added TaskInfo:%v for pod %s/%s to scheduler cache", pi, newPod.Namespace, newPod.Name)
}
}
klog.V(5).Infof("Job Tasks: %v", job.Tasks)
// Create ReservationInfo
reservationInfo := schedulingapi.NewReservationInfo(jobId, job, reservation)
// Add ReservationInfo into Reservation Cache
sc.ReservationCache.AddReservation(reservationInfo)
klog.V(4).Infof("Added ReservationInfo %s/%s to ReservationCache, UID: %s", reservation.Namespace, reservation.Name, reservationInfo.Reservation.UID)
}
func (sc *SchedulerCache) deleteReservation(ss *scheduling.Reservation) {
reservationInfo, ok := sc.ReservationCache.GetReservationById(ss.UID)
if !ok {
return
}
job := reservationInfo.JobInfo
// clean related tasks from reservation
tasks := job.Tasks
for _, task := range tasks {
if err := sc.deleteTask(task); err != nil {
klog.Errorf("Failed to delete task <%s/%s> for reservation <%s/%s> from cache: %v",
task.Namespace, task.Name, reservationInfo.Reservation.Namespace, reservationInfo.Reservation.Name, err)
} else {
klog.V(4).Infof("Delete task <%s/%s> for reservation <%s/%s> from cache",
task.Namespace, task.Name, reservationInfo.Reservation.Namespace, reservationInfo.Reservation.Name)
}
}
sc.ReservationCache.DeleteReservation(ss.UID)
// clean related podgroup from cache
if err := sc.deletePodGroup(job.UID); err != nil {
klog.Errorf("Failed to delete podgroup %s for reservation from cache: %v", ss.Name, err)
return
}
}
func (sc *SchedulerCache) gcReservation(reservation *schedulingapi.ReservationInfo) error {
job := reservation.JobInfo
// clean related tasks from reservation
tasks := job.Tasks
for _, task := range tasks {
if err := sc.deleteTask(task); err != nil {
klog.Errorf("Failed to delete task <%s/%s> for reservation <%s/%s> from cache: %v",
task.Namespace, task.Name, reservation.Reservation.Namespace, reservation.Reservation.Name, err)
} else {
klog.V(4).Infof("Delete task <%s/%s> for reservation <%s/%s> from cache",
task.Namespace, task.Name, reservation.Reservation.Namespace, reservation.Reservation.Name)
}
}
// clean related podgroup from cache
if err := sc.deletePodGroup(job.UID); err != nil {
klog.Errorf("Failed to delete podgroup %s for reservation from cache: %v", reservation.Reservation.Name, err)
return err
}
// gc reservation from cache
return sc.ReservationCache.GcExpiredReservation(reservation)
}
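// createReservationPod builds a placeholder pod for one replica of a reservation task template.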
func createReservationPod(reservation *scheduling.Reservation, template *v1.PodTemplateSpec, ix int) *v1.Pod {
templateCopy := template.DeepCopy()
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: jobhelpers.MakePodName(reservation.Name, template.Name, ix),
Namespace: reservation.Namespace,
UID: types.UID(uuid.New().String()),
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(reservation, helpers.ReservationKind),
},
Labels: templateCopy.Labels,
Annotations: templateCopy.Annotations,
},
Spec: templateCopy.Spec,
Status: v1.PodStatus{
Phase: v1.PodPending,
},
}
// If no scheduler name in Pod, use scheduler name from Reservation.
if len(pod.Spec.SchedulerName) == 0 {
pod.Spec.SchedulerName = reservation.Spec.SchedulerName
}
// Set default values so the reservation pod's spec matches the spec of pods that will later consume the reservation.
if pod.Spec.ServiceAccountName == "" {
pod.Spec.ServiceAccountName = "default"
}
if pod.Spec.AutomountServiceAccountToken == nil {
trueVal := true
pod.Spec.AutomountServiceAccountToken = &trueVal
}
defaultTolerations := []v1.Toleration{
{
Key: "node.kubernetes.io/not-ready",
Operator: v1.TolerationOpExists,
Effect: v1.TaintEffectNoExecute,
TolerationSeconds: intPtr(300),
},
{
Key: "node.kubernetes.io/unreachable",
Operator: v1.TolerationOpExists,
Effect: v1.TaintEffectNoExecute,
TolerationSeconds: intPtr(300),
},
}
pod.Spec.Tolerations = mergeTolerations(pod.Spec.Tolerations, defaultTolerations)
tsKey := templateCopy.Name
if len(tsKey) == 0 {
tsKey = batch.DefaultTaskSpec
}
if len(pod.Annotations) == 0 {
pod.Annotations = make(map[string]string)
}
index := strconv.Itoa(ix)
pod.Annotations[batch.TaskIndex] = index
pod.Annotations[batch.TaskSpecKey] = tsKey
pod.Annotations[schedulingv1beta1.KubeGroupNameAnnotationKey] = getPodGroupName(reservation)
pod.Annotations[batch.ReservationNameKey] = reservation.Name
pod.Annotations[batch.QueueNameKey] = reservation.Spec.Queue
pod.Annotations[batch.PodTemplateKey] = fmt.Sprintf("%s-%s", reservation.Name, template.Name)
// Mark the pod as reservation-only so the scheduler treats it as a resource placeholder.
pod.Annotations[schedulingv1beta1.VolcanoGroupReservationOnlyAnnotationKey] = "true"
if len(pod.Labels) == 0 {
pod.Labels = make(map[string]string)
}
// Set pod labels for Service.
pod.Labels[batch.TaskIndex] = index
pod.Labels[batch.ReservationNameKey] = reservation.Name
pod.Labels[batch.TaskSpecKey] = tsKey
pod.Labels[batch.JobNamespaceKey] = reservation.Namespace
pod.Labels[batch.QueueNameKey] = reservation.Spec.Queue
return pod
}
// AddHyperNode adds hyperNode name to the hyperNodesQueue.
func (sc *SchedulerCache) AddHyperNode(obj interface{}) {
hn, ok := obj.(*topologyv1alpha1.HyperNode)
@ -1390,3 +1656,95 @@ func (sc *SchedulerCache) updateHyperNode(hn *topologyv1alpha1.HyperNode) error
func (sc *SchedulerCache) deleteHyperNode(name string) error {
return sc.HyperNodesInfo.DeleteHyperNode(name)
}
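// getQueueByName returns the cached QueueInfo for the given queue name.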
func (sc *SchedulerCache) getQueueByName(name string) (*schedulingapi.QueueInfo, error) {
if queue, ok := sc.Queues[schedulingapi.QueueID(name)]; ok {
return queue, nil
}
return nil, fmt.Errorf("queue <%s> not found", name)
}
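// initiateReservation initializes the reservation status and creates the PodGroup backing the reservation.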
func (sc *SchedulerCache) initiateReservation(reservation *scheduling.Reservation) (*scheduling.Reservation, error) {
klog.V(3).Infof("Starting to initiate Reservation <%s/%s>", reservation.Namespace, reservation.Name)
reservationInstance, err := sc.initReservationStatus(reservation)
if err != nil {
return nil, err
}
if err := sc.createReservationPodGroup(reservationInstance); err != nil {
return nil, err
}
return reservationInstance, nil
}
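// initReservationStatus sets the initial Pending state and allocatable resources, then updates the reservation status in the API server.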
func (sc *SchedulerCache) initReservationStatus(reservation *scheduling.Reservation) (*scheduling.Reservation, error) {
if reservation.Status.State.Phase != "" {
return reservation, nil
}
reservation.Status.State.Phase = scheduling.ReservationPending
reservation.Status.State.LastTransitionTime = metav1.Now()
reservation.Status.MinAvailable = reservation.Spec.MinAvailable
reservationCondition := newCondition(reservation.Status.State.Phase, &reservation.Status.State.LastTransitionTime)
reservation.Status.Conditions = append(reservation.Status.Conditions, reservationCondition)
// calculate the resources
reservation.Status.Allocatable = calculateAllocatable(reservation)
reservationV1beta1 := &schedulingv1beta1.Reservation{}
if err := scheme.Scheme.Convert(reservation, reservationV1beta1, nil); err != nil {
klog.Errorf("Error while converting scheduling.Reservation to v1beta1.Reservation with error: %v", err)
return nil, err
}
newReservationV1beta1, err := sc.vcClient.SchedulingV1beta1().Reservations(reservation.Namespace).UpdateStatus(context.TODO(), reservationV1beta1, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update status of Reservation %v/%v: %v",
reservation.Namespace, reservation.Name, err)
return nil, err
}
newReservation := &scheduling.Reservation{}
if err := scheme.Scheme.Convert(newReservationV1beta1, newReservation, nil); err != nil {
klog.Errorf("Error while converting returned v1beta1.Reservation to scheduling.Reservation with error: %v", err)
return nil, err
}
return newReservation, nil
}
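// createReservationPodGroup builds a reservation-only PodGroup for the reservation and adds it to the cache.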
func (sc *SchedulerCache) createReservationPodGroup(reservation *scheduling.Reservation) error {
minTaskMember := map[string]int32{}
for _, task := range reservation.Spec.Tasks {
minTaskMember[task.Name] = task.Replicas
}
minReq := calculateAllocatable(reservation)
annotations := make(map[string]string)
for k, v := range reservation.Annotations {
annotations[k] = v
}
annotations[schedulingv1beta1.VolcanoGroupReservationOnlyAnnotationKey] = "true"
pg := &scheduling.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: reservation.Namespace,
// add reservation.UID into its name when create new PodGroup
Name: generateReservationPodGroupName(reservation),
Annotations: annotations,
Labels: reservation.Labels,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(reservation, helpers.ReservationKind),
},
},
Spec: scheduling.PodGroupSpec{
MinMember: reservation.Spec.MinAvailable,
MinTaskMember: minTaskMember,
Queue: reservation.Spec.Queue,
MinResources: &minReq,
},
}
sc.addPodGroup(pg)
return nil
}

View File

@ -21,14 +21,15 @@ func GetBindMethod() Binder {
// BinderRegistry is used to hold the registered binders, such as pre-binders, post-binders
type BinderRegistry struct {
mu sync.RWMutex
preBinders map[string]PreBinder
// Can add postBinders in the future
mu sync.RWMutex
preBinders map[string]PreBinder
postBinders map[string]PostBinder
}
func NewBinderRegistry() *BinderRegistry {
return &BinderRegistry{
preBinders: make(map[string]PreBinder),
preBinders: make(map[string]PreBinder),
postBinders: make(map[string]PostBinder),
}
}
@ -43,4 +44,9 @@ func (r *BinderRegistry) Register(name string, binder interface{}) {
klog.V(5).Infof("Register preBinder %s successfully", name)
r.preBinders[name] = pb
}
if pb, ok := binder.(PostBinder); ok {
klog.V(5).Infof("Register postBinder %s successfully", name)
r.postBinders[name] = pb
}
}

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"
vcclient "volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/scheduler/api"
)
@ -87,6 +88,11 @@ type Cache interface {
// SharedDRAManager returns the shared DRAManager
SharedDRAManager() framework.SharedDRAManager
// GetReservationCache returns the reservation cache
GetReservationCache() *ReservationCache
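// SyncBindToReservationTask syncs a successful bind of a task that consumes a reservation back to the reservation's placeholder task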
SyncBindToReservationTask(task *api.TaskInfo) error
}
// Binder interface for binding task and hostname
@ -117,3 +123,7 @@ type PreBinder interface {
// PreBindRollBack is called when the pre-bind or bind fails.
PreBindRollBack(ctx context.Context, bindCtx *BindContext)
}
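// PostBinder is the interface for binders that need to run logic after a task is successfully bound.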
type PostBinder interface {
PostBind(ctx context.Context, bindCtx *BindContext) error
}

pkg/scheduler/cache/reservation_cache.go (new file, 439 lines)
View File

@ -0,0 +1,439 @@
/*
Copyright 2021 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"context"
"fmt"
"sync"
"time"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"volcano.sh/apis/pkg/apis/batch/v1alpha1"
"volcano.sh/apis/pkg/apis/helpers"
"volcano.sh/apis/pkg/apis/scheduling"
vcclientset "volcano.sh/apis/pkg/client/clientset/versioned"
schedulerapi "volcano.sh/volcano/pkg/scheduler/api"
)
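// ReservationCache caches ReservationInfo objects indexed by UID and name and keeps reservation status in sync with the API server.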
type ReservationCache struct {
sync.RWMutex
vcClient vcclientset.Interface
reservations map[types.UID]*schedulerapi.ReservationInfo
nameToUID map[string]types.UID
}
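// newReservationCache creates an empty ReservationCache backed by the given Volcano client.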
func newReservationCache(vcClient vcclientset.Interface) *ReservationCache {
return &ReservationCache{
RWMutex: sync.RWMutex{},
vcClient: vcClient,
reservations: make(map[types.UID]*schedulerapi.ReservationInfo),
nameToUID: make(map[string]types.UID),
}
}
func (rc *ReservationCache) AddReservation(reservation *schedulerapi.ReservationInfo) {
rc.Lock()
defer rc.Unlock()
rc.reservations[reservation.Reservation.UID] = reservation
rc.nameToUID[reservation.Reservation.Name] = reservation.Reservation.UID
}
func (rc *ReservationCache) DeleteReservation(reservationId types.UID) {
rc.Lock()
defer rc.Unlock()
reservation, ok := rc.reservations[reservationId]
if ok {
delete(rc.reservations, reservation.Reservation.UID)
delete(rc.nameToUID, reservation.Reservation.Name)
}
}
func (rc *ReservationCache) GetReservationById(reservationId types.UID) (*schedulerapi.ReservationInfo, bool) {
return rc.getReservationInfo(reservationId)
}
func (rc *ReservationCache) getReservationInfo(reservationId types.UID) (*schedulerapi.ReservationInfo, bool) {
rc.RLock()
defer rc.RUnlock()
reservation, ok := rc.reservations[reservationId]
if !ok {
klog.Errorf("ReservationInfo with UID %s not found in cache", reservationId)
return nil, false
}
return reservation, true
}
func (rc *ReservationCache) GetReservationByName(name string) (*schedulerapi.ReservationInfo, bool) {
rc.RLock()
defer rc.RUnlock()
uid, ok := rc.nameToUID[name]
if !ok {
return nil, false
}
res, ok := rc.reservations[uid]
return res, ok
}
func (rc *ReservationCache) SyncTaskStatus(task *schedulerapi.TaskInfo, job *schedulerapi.JobInfo) error {
rc.Lock()
defer rc.Unlock()
reservation, ok := rc.getReservationByTask(task)
if !ok {
return fmt.Errorf("cannot find reservation from task <%s/%s>", task.Namespace, task.Name)
}
// re-calculate task status and update reservation status
if err := rc.syncReservation(reservation, job); err != nil {
klog.Errorf("Failed to update status of Reservation %v: %v",
reservation.Reservation.UID, err)
return err
}
return nil
}
// TODO: perform this status update asynchronously via a work queue.
func (rc *ReservationCache) syncReservation(reservation *schedulerapi.ReservationInfo, job *schedulerapi.JobInfo) error {
rsveV1beta1, err := rc.vcClient.SchedulingV1beta1().Reservations(reservation.Reservation.Namespace).Get(context.TODO(), reservation.Reservation.Name, metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed to get Reservation %s/%s: %v", reservation.Reservation.Namespace, reservation.Reservation.Name, err)
return err
}
rsve, err := ConvertToInternalReservation(rsveV1beta1)
if err != nil {
klog.Errorf("Failed to convert reservation from %T to %T", rsveV1beta1, rsve)
return err
}
oldStatus := reservation.Reservation.Status.DeepCopy()
taskStatusCount := make(map[string]v1alpha1.TaskState)
var pending, available, succeeded, failed int32
calculateTasksStatus(job, &taskStatusCount, &pending, &available, &succeeded, &failed)
newStatus := scheduling.ReservationStatus{
State: oldStatus.State,
MinAvailable: oldStatus.MinAvailable,
TaskStatusCount: taskStatusCount,
Pending: pending,
Available: available,
Succeeded: succeeded,
Failed: failed,
CurrentOwner: oldStatus.CurrentOwner,
Allocatable: oldStatus.Allocatable,
Allocated: job.Allocated.ConvertResourceToResourceList(),
}
rsve.Status = newStatus
rsve.Status.State = calculateReservationState(pending, available, succeeded, failed)
reservationCondition := newCondition(rsve.Status.State.Phase, &rsve.Status.State.LastTransitionTime)
rsve.Status.Conditions = append(rsve.Status.Conditions, reservationCondition)
newReservationV1, err := ConvertToV1beta1Reservation(rsve)
if err != nil {
klog.Errorf("Failed to convert reservation from %T to %T", rsve, newReservationV1)
return err
}
// update reservation status in API server
newReservationV1beta1, err := rc.vcClient.SchedulingV1beta1().Reservations(rsve.Namespace).UpdateStatus(context.TODO(), newReservationV1, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update status of Reservation %v/%v: %v",
rsve.Namespace, rsve.Name, err)
return err
}
newReservation, err := ConvertToInternalReservation(newReservationV1beta1)
if err != nil {
klog.Errorf("Failed to convert reservation from %T to %T", newReservationV1beta1, newReservation)
return err
}
// sync cache
reservation.Reservation = newReservation
return nil
}
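// AllocateJobToReservation records the target job (or its owning object) as the current owner of the reservation that the given task belongs to.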
func (rc *ReservationCache) AllocateJobToReservation(task *schedulerapi.TaskInfo, targetJob *schedulerapi.JobInfo) error {
klog.V(5).Infof("AllocateJobToReservation: task <%s/%s> targetJob <%s/%s>", task.Namespace, task.Name, targetJob.Namespace, targetJob.Name)
rc.Lock()
defer rc.Unlock()
reservation, ok := rc.getReservationByTask(task)
if !ok {
return fmt.Errorf("cannot find reservation from task <%s/%s>", task.Namespace, task.Name)
}
if reservation.Reservation.Status.CurrentOwner.Name == "" {
pg := targetJob.PodGroup
if len(pg.OwnerReferences) > 0 {
ownerRef := pg.OwnerReferences[0]
reservation.Reservation.Status.CurrentOwner = v1.ObjectReference{
Kind: ownerRef.Kind,
Namespace: pg.Namespace,
Name: ownerRef.Name,
UID: ownerRef.UID,
APIVersion: ownerRef.APIVersion,
}
} else {
reservation.Reservation.Status.CurrentOwner = v1.ObjectReference{
Namespace: targetJob.Namespace,
Name: targetJob.Name,
UID: types.UID(targetJob.UID),
}
}
klog.V(5).Infof("Setting current owner for reservation <%s/%s> to job <%s/%s>", reservation.Reservation.Namespace, reservation.Reservation.Name, targetJob.Namespace, targetJob.Name)
}
return nil
}
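// MatchReservationForPod returns the reservation whose owners reference the given pod by object reference, if any.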
func (rc *ReservationCache) MatchReservationForPod(pod *v1.Pod) (*schedulerapi.ReservationInfo, bool) {
rc.RLock()
defer rc.RUnlock()
for _, reservationInfo := range rc.reservations {
if reservationInfo == nil || reservationInfo.Reservation == nil {
continue
}
reservation := reservationInfo.Reservation
for _, owner := range reservation.Spec.Owners {
if owner.Object != nil &&
owner.Object.Kind == "Pod" &&
owner.Object.Name == pod.Name &&
owner.Object.Namespace == pod.Namespace {
return reservationInfo, true
}
}
}
return nil, false
}
// Assumes that lock is already acquired.
func (rc *ReservationCache) getReservationByTask(task *schedulerapi.TaskInfo) (*schedulerapi.ReservationInfo, bool) {
if task == nil || task.Pod == nil {
return nil, false
}
reservationUID := GetReservationUIDFromTask(task)
if reservationUID == "" {
return nil, false
}
reservation, ok := rc.reservations[reservationUID]
return reservation, ok
}
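// ScanExpiredReservations calls onExpired for every cached reservation that needs expiration at the given time.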
func (rc *ReservationCache) ScanExpiredReservations(now time.Time, onExpired func(*schedulerapi.ReservationInfo)) {
rc.RLock()
defer rc.RUnlock()
for _, reservation := range rc.reservations {
if isReservationNeedExpiration(reservation, now) {
onExpired(reservation)
}
}
}
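// GcExpiredReservation removes an expired reservation from the cache and marks it Failed with reason Expired in the API server.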
func (rc *ReservationCache) GcExpiredReservation(reservation *schedulerapi.ReservationInfo) error {
rc.Lock()
defer rc.Unlock()
// Remove reservation from cache
delete(rc.reservations, reservation.Reservation.UID)
delete(rc.nameToUID, reservation.Reservation.Name)
// Sync status to API server
rsveV1beta1, err := rc.vcClient.SchedulingV1beta1().Reservations(reservation.Reservation.Namespace).Get(context.TODO(), reservation.Reservation.Name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
klog.V(3).Infof("Reservation %s/%s not found in API server, maybe it's already deleted.",
reservation.Reservation.Namespace, reservation.Reservation.Name)
return nil
}
klog.Errorf("Failed to get Reservation %s/%s for GC: %v",
reservation.Reservation.Namespace, reservation.Reservation.Name, err)
return err
}
rsve, err := ConvertToInternalReservation(rsveV1beta1)
if err != nil {
klog.Errorf("Failed to convert reservation from %T to %T", rsveV1beta1, rsve)
return err
}
now := metav1.Now()
rsve.Status.State.Phase = scheduling.ReservationFailed
rsve.Status.State.Reason = "Expired"
rsve.Status.State.Message = "Reservation expired and was cleaned up by the scheduler"
rsve.Status.State.LastTransitionTime = now
reservationCondition := newCondition(rsve.Status.State.Phase, &rsve.Status.State.LastTransitionTime)
rsve.Status.Conditions = append(rsve.Status.Conditions, reservationCondition)
newReservationV1, err := ConvertToV1beta1Reservation(rsve)
if err != nil {
klog.Errorf("Failed to convert reservation from %T to %T", rsve, newReservationV1)
return err
}
_, err = rc.vcClient.SchedulingV1beta1().Reservations(rsve.Namespace).UpdateStatus(context.TODO(), newReservationV1, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update status of Reservation %v/%v: %v",
rsve.Namespace, rsve.Name, err)
return err
}
return nil
}
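// GetReservationUIDFromTask returns the UID of the Reservation that owns the task's pod, or an empty string if there is no Reservation owner reference.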
func GetReservationUIDFromTask(taskInfo *schedulerapi.TaskInfo) types.UID {
if taskInfo == nil || taskInfo.Pod == nil {
return ""
}
for _, ownerRef := range taskInfo.Pod.OwnerReferences {
if ownerRef.Kind == helpers.ReservationKind.Kind && ownerRef.APIVersion == helpers.ReservationKind.GroupVersion().String() {
return ownerRef.UID
}
}
return ""
}
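// calculateTasksStatus tallies the job's tasks into per-task pod-phase counts and pending/available/succeeded/failed totals.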
func calculateTasksStatus(job *schedulerapi.JobInfo, taskStatusCount *map[string]v1alpha1.TaskState, pending *int32, available *int32, succeeded *int32, failed *int32) {
for _, task := range job.Tasks {
if task == nil || task.Pod == nil {
continue
}
taskName, ok := task.Pod.Annotations[v1alpha1.TaskSpecKey]
if !ok {
continue
}
state, exists := (*taskStatusCount)[taskName]
if !exists {
state = v1alpha1.TaskState{Phase: make(map[v1.PodPhase]int32)}
}
switch task.Status {
case schedulerapi.Pending:
state.Phase[v1.PodPending]++
*pending++
case schedulerapi.Bound:
state.Phase[v1.PodRunning]++
*available++
case schedulerapi.Succeeded:
state.Phase[v1.PodSucceeded]++
*succeeded++
case schedulerapi.Failed:
state.Phase[v1.PodFailed]++
*failed++
default:
state.Phase[v1.PodUnknown]++
}
(*taskStatusCount)[taskName] = state
}
}
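// calculateReservationState derives the reservation phase, reason, and message from the task counters.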
func calculateReservationState(pending, available, succeeded, failed int32) scheduling.ReservationState {
now := metav1.Now()
total := pending + available + succeeded + failed
var phase scheduling.ReservationPhase
var reason, message string
switch {
case failed > 0:
phase = scheduling.ReservationFailed
reason = "TaskFailed"
message = fmt.Sprintf("Reservation failed: %d/%d task(s) failed", failed, total)
case succeeded == total && total > 0:
phase = scheduling.ReservationSucceeded
reason = "AllSucceeded"
message = fmt.Sprintf("Reservation succeeded: all %d task(s) have be allocated", total)
case available == total && total > 0:
phase = scheduling.ReservationAvailable
reason = "AllAvailable"
message = fmt.Sprintf("Reservation available: all %d task(s) are available", available)
case available > 0 && available < total:
phase = scheduling.ReservationWaiting
reason = "PartiallyAvailable"
message = fmt.Sprintf("Reservation waiting: %d/%d task(s) are available", available, total)
default:
// available == 0
phase = scheduling.ReservationPending
reason = "NoAvailableTasks"
message = fmt.Sprintf("Reservation pending: no tasks are available yet (total: %d)", total)
}
return scheduling.ReservationState{
Phase: phase,
Reason: reason,
Message: message,
LastTransitionTime: now,
}
}
func newCondition(status scheduling.ReservationPhase, lastTransitionTime *metav1.Time) scheduling.ReservationCondition {
return scheduling.ReservationCondition{
Status: status,
LastTransitionTime: lastTransitionTime,
}
}
func isReservationNeedExpiration(reservation *schedulerapi.ReservationInfo, now time.Time) bool {
// 1. Skip failed or succeeded reservations
rs := reservation.Reservation
if rs.Status.State.Phase == scheduling.ReservationFailed || rs.Status.State.Phase == scheduling.ReservationSucceeded {
return false
}
// 2. Skip if TTL is set to 0
if rs.Spec.TTL != nil && rs.Spec.TTL.Duration == 0 {
return false
}
// 3. Check expiration via Expires field (preferred)
if rs.Spec.Expires != nil {
expireAt := rs.Spec.Expires.Time.UTC()
if now.UTC().After(expireAt) {
return true
}
}
// 4. Fallback to TTL-based expiration
if rs.Spec.TTL != nil {
createAt := rs.CreationTimestamp.Time.UTC()
ttlExpireAt := createAt.Add(rs.Spec.TTL.Duration)
if now.UTC().After(ttlExpireAt) {
return true
}
}
return false
}

View File

@ -24,10 +24,13 @@ import (
"strings"
v1 "k8s.io/api/core/v1"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
"k8s.io/klog/v2"
"stathat.com/c/consistent"
scheduling "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/apis/pkg/apis/scheduling"
"volcano.sh/apis/pkg/apis/scheduling/scheme"
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/controllers/util"
)
type hyperNodeEventSource string
@ -81,7 +84,7 @@ func responsibleForNode(nodeName string, mySchedulerPodName string, c *consisten
}
// responsibleForPodGroup returns true if Job which PodGroup belongs is assigned to current scheduler in multi-schedulers scenario
func responsibleForPodGroup(pg *scheduling.PodGroup, mySchedulerPodName string, c *consistent.Consistent) bool {
func responsibleForPodGroup(pg *schedulingv1beta1.PodGroup, mySchedulerPodName string, c *consistent.Consistent) bool {
if c != nil {
var key string
if len(pg.OwnerReferences) != 0 {
@ -132,3 +135,68 @@ func getHyperNodeEventSource(source string) []string {
}
return parts
}
// mergeTolerations merges the original tolerations with the default tolerations.
func mergeTolerations(orig, defaults []v1.Toleration) []v1.Toleration {
exists := map[string]bool{}
for _, t := range orig {
key := tolerationKey(t)
exists[key] = true
}
for _, t := range defaults {
key := tolerationKey(t)
if !exists[key] {
orig = append(orig, t)
exists[key] = true
}
}
return orig
}
// tolerationKey generates a unique key for a toleration.
func tolerationKey(t v1.Toleration) string {
seconds := int64(0)
if t.TolerationSeconds != nil {
seconds = *t.TolerationSeconds
}
return fmt.Sprintf("%s/%s/%s/%d", t.Key, t.Operator, t.Effect, seconds)
}
// intPtr converts an int to a pointer to an int64.
func intPtr(i int) *int64 {
v := int64(i)
return &v
}
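// isInitiated returns true once the reservation has moved past the empty or Pending phase.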
func isInitiated(rc *scheduling.Reservation) bool {
if rc.Status.State.Phase == "" || rc.Status.State.Phase == scheduling.ReservationPending {
return false
}
return true
}
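// calculateAllocatable sums the resource requests of all reservation task templates, weighted by replica count.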
func calculateAllocatable(reservation *scheduling.Reservation) v1.ResourceList {
tasks := reservation.Spec.Tasks
total := v1.ResourceList{}
for _, task := range tasks {
total = quotav1.Add(total, util.CalTaskRequests(&v1.Pod{Spec: task.Template.Spec}, task.Replicas))
}
return total
}
func generateReservationPodGroupName(reservation *scheduling.Reservation) string {
return fmt.Sprintf("%s-%s", reservation.Name, string(reservation.UID))
}
func ConvertToInternalReservation(reservationV1beta1 *schedulingv1beta1.Reservation) (*scheduling.Reservation, error) {
reservation := &scheduling.Reservation{}
err := scheme.Scheme.Convert(reservationV1beta1, reservation, nil)
return reservation, err
}
func ConvertToV1beta1Reservation(reservation *scheduling.Reservation) (*schedulingv1beta1.Reservation, error) {
reservationV1beta1 := &schedulingv1beta1.Reservation{}
err := scheme.Scheme.Convert(reservation, reservationV1beta1, nil)
return reservationV1beta1, err
}

View File

@ -25,6 +25,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
@ -34,12 +35,13 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
"volcano.sh/apis/pkg/apis/scheduling"
schedulingscheme "volcano.sh/apis/pkg/apis/scheduling/scheme"
vcv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
vcclient "volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/api/helpers"
"volcano.sh/volcano/pkg/scheduler/cache"
"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/metrics"
@ -785,3 +787,188 @@ func (ssn *Session) String() string {
return msg
}
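// MatchReservationForPod tries to match a reservation for a single-task job by its pod and records the match on the job.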
func (ssn *Session) MatchReservationForPod(job *api.JobInfo) {
klog.V(3).Infof("MatchReservationForPod job <%v/%v>", job.Namespace, job.Name)
klog.V(5).Infof("[Debug]: %+v, tasks: %+v", job, job.Tasks)
reservationName := job.GetReservationName()
if reservationName != "" {
return
}
// try match a reservation for the job
if len(job.Tasks) != 1 {
return
}
var task *api.TaskInfo
for _, t := range job.Tasks {
task = t
break
}
if task == nil || task.Pod == nil {
return
}
pod := task.Pod
klog.V(3).Infof("MatchReservationForPod pod <%s/%s>", pod.Namespace, pod.Name)
// check if the pod has a reservation
reservation, ok := ssn.cache.GetReservationCache().MatchReservationForPod(pod)
if ok {
job.SetReservation(reservation)
klog.V(3).Infof("Job <%s/%s> has matched reservation <%s> for pod <%s/%s>", job.Namespace, job.Name, reservation.Reservation.Name, pod.Namespace, pod.Name)
}
}
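// CheckReservationAvailable returns true if the job does not target a reservation, already owns it, or the reservation is in the Available phase.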
func (ssn *Session) CheckReservationAvailable(job *api.JobInfo) bool {
reservationName := job.GetReservationName()
// not using reservation, return true
if reservationName == "" {
return true
}
reservationInfo, ok := ssn.cache.GetReservationCache().GetReservationByName(reservationName)
if !ok {
klog.V(4).Infof("Reservation %s is not available for job <%s/%s>", reservationName, job.Namespace, job.Name)
return false
}
// Only dereference the reservation info after the lookup succeeded to avoid a nil pointer panic.
reservation := reservationInfo.Reservation
owner := reservation.Status.CurrentOwner
if owner.Name == job.Name && owner.Namespace == job.Namespace {
return true
}
if reservation.Status.State.Phase != scheduling.ReservationAvailable {
klog.V(4).Infof("Reservation %s is not in available phase for job <%s/%s>", reservationName, job.Namespace, job.Name)
return false
}
return true
}
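// CheckReservationOwners returns true if the job matches one of the reservation's owners, either by object reference or by label selector.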
func (ssn *Session) CheckReservationOwners(job *api.JobInfo) bool {
reservationName := job.GetReservationName()
if reservationName == "" {
return true
}
reservationInfo, ok := ssn.cache.GetReservationCache().GetReservationByName(reservationName)
if !ok {
return false
}
owners := reservationInfo.Reservation.Spec.Owners
pg := job.PodGroup
for _, owner := range owners {
// 1. Match by object reference
if owner.Object != nil {
// convert to actual name in cache
actualName := fmt.Sprintf("%s-%s", owner.Object.Name, pg.UID)
if (actualName == pg.Name) && owner.Object.Namespace == pg.Namespace {
return true
}
if pg.OwnerReferences != nil {
for _, ownerRef := range pg.OwnerReferences {
if owner.Object.Name == ownerRef.Name {
return true
}
}
}
if owner.Object.Kind == "Pod" {
for _, task := range job.Tasks {
if task.Pod == nil {
continue
}
pod := task.Pod
if pod.Name == owner.Object.Name && pod.Namespace == owner.Object.Namespace {
return true
}
}
}
}
// 2. Match by label selector
if owner.LabelSelector != nil {
selector, err := metav1.LabelSelectorAsSelector(owner.LabelSelector)
if err != nil {
continue
}
if selector.Matches(labels.Set(pg.Labels)) {
return true
}
}
}
klog.V(1).Infof("The owner of Reservation %s is not matched with job <%s/%s> owners", reservationName, job.Namespace, job.Name)
return false
}
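// CheckReservationMatch verifies that every pending job task can be paired with a Bound reservation task whose pod spec matches, and records the pairing on the tasks.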
func (ssn *Session) CheckReservationMatch(job *api.JobInfo) bool {
reservationName := job.GetReservationName()
if reservationName == "" {
return true
}
reservationInfo, ok := ssn.cache.GetReservationCache().GetReservationByName(reservationName)
if !ok {
return false
}
resvTasks := reservationInfo.JobInfo.Tasks
jobTasks := job.Tasks
if len(resvTasks) != len(jobTasks) {
klog.V(5).Infof("Reservation %s tasks count %d is not equal to job %s tasks count %d", reservationName, len(resvTasks), job.Name, len(jobTasks))
return false
}
// jobTask -> resvTask
matched := make(map[*api.TaskInfo]*api.TaskInfo)
// resvTask -> bool
used := make(map[*api.TaskInfo]bool)
for _, task := range jobTasks {
// skip tasks that are not in pending or has reservation task info
if task.Status != api.Pending || task.ReservationTaskInfo != nil {
continue
}
// find a matching reservation task for every job task
found := false
for _, resvTask := range resvTasks {
if used[resvTask] {
continue
}
if resvTask.Status != api.Bound {
continue
}
if resvTask.Pod == nil || task.Pod == nil {
continue
}
klog.Infof("[debug] task pod: %+v", task.Pod.Spec)
klog.Infof("[debug] resv pod: %+v", resvTask.Pod.Spec)
if helpers.IsPodSpecMatch(&task.Pod.Spec, &resvTask.Pod.Spec) {
matched[task] = resvTask
used[resvTask] = true
found = true
break
}
}
if !found {
return false
}
}
for jobTask, resvTask := range matched {
jobTask.ReservationTaskInfo = resvTask
}
klog.V(1).Infof("[debug]: matched: %v", matched)
klog.V(1).Infof("[debug]: used: %v", used)
return true
}
func (ssn *Session) Cache() cache.Cache {
return ssn.cache
}

View File

@ -22,8 +22,8 @@ import (
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
schedulingv1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/cache"
"volcano.sh/volcano/pkg/scheduler/util"

View File

@ -276,6 +276,7 @@ func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err er
task.NodeName = hostname
if node, found := s.ssn.Nodes[hostname]; found {
// fixme: if task is using reservation, we should remove the resource reservation task uses first.
if err := node.AddTask(task); err != nil {
klog.Errorf("Failed to add task <%v/%v> to node <%v> when allocating in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, err)

View File

@ -36,6 +36,7 @@ import (
"volcano.sh/volcano/pkg/scheduler/plugins/priority"
"volcano.sh/volcano/pkg/scheduler/plugins/proportion"
"volcano.sh/volcano/pkg/scheduler/plugins/rescheduling"
"volcano.sh/volcano/pkg/scheduler/plugins/reservation"
"volcano.sh/volcano/pkg/scheduler/plugins/resourcequota"
"volcano.sh/volcano/pkg/scheduler/plugins/sla"
tasktopology "volcano.sh/volcano/pkg/scheduler/plugins/task-topology"
@ -64,6 +65,7 @@ func init() {
framework.RegisterPluginBuilder(pdb.PluginName, pdb.New)
framework.RegisterPluginBuilder(nodegroup.PluginName, nodegroup.New)
framework.RegisterPluginBuilder(networktopologyaware.PluginName, networktopologyaware.New)
framework.RegisterPluginBuilder(reservation.PluginName, reservation.New)
// Plugins for Queues
framework.RegisterPluginBuilder(proportion.PluginName, proportion.New)

View File

@ -0,0 +1,149 @@
/*
Copyright 2019 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reservation
import (
"context"
"fmt"
"k8s.io/klog/v2"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/cache"
"volcano.sh/volcano/pkg/scheduler/framework"
)
const (
// PluginName indicates name of volcano scheduler plugin.
PluginName = "reservation"
)
type reservationPlugin struct {
// Arguments given for the plugin
session *framework.Session
}
// New returns a reservationPlugin object
func New(arguments framework.Arguments) framework.Plugin {
return &reservationPlugin{}
}
func (rp *reservationPlugin) Name() string {
return PluginName
}
func (rp *reservationPlugin) OnSessionOpen(ssn *framework.Session) {
klog.V(5).Infof("Enter reservation plugin ...")
rp.session = ssn
defer func() {
klog.V(5).Infof("Leaving reservation plugin...")
}()
validJobFn := func(obj interface{}) *api.ValidateResult {
job, ok := obj.(*api.JobInfo)
if !ok {
return &api.ValidateResult{
Pass: false,
Message: fmt.Sprintf("Failed to convert <%v> to *JobInfo", obj),
}
}
ssn.MatchReservationForPod(job)
if valid := ssn.CheckReservationAvailable(job); !valid {
return &api.ValidateResult{
Pass: false,
Reason: v1beta1.InvalidReservationReason,
Message: fmt.Sprintf("Reservation specified by job <%s/%s> is not Available", job.Namespace, job.Name),
}
}
if ownerMatched := ssn.CheckReservationOwners(job); !ownerMatched {
return &api.ValidateResult{
Pass: false,
Reason: v1beta1.ReservationOwnerNotMatchReason,
Message: fmt.Sprintf(
"Reservation specified by job <%s/%s> is not owned by the job (ownership mismatch by ownerObject or label selectors)",
job.Namespace, job.Name,
),
}
}
if specMatched := ssn.CheckReservationMatch(job); !specMatched {
return &api.ValidateResult{
Pass: false,
Reason: v1beta1.ReservationSpecNotMatchReason,
Message: fmt.Sprintf(
"Reservation specified by job <%s/%s> does not match job task spec: task count or PodSpec does not match",
job.Namespace, job.Name,
),
}
}
return nil
}
ssn.AddJobValidFn(rp.Name(), validJobFn)
bestNodeFn := func(task *api.TaskInfo, scores map[float64][]*api.NodeInfo) *api.NodeInfo {
klog.V(5).Infof("[debug]: enter reservation best node function for task %s/%s", task.Namespace, task.Name)
if !task.IsReservationTask() {
return nil
}
reservationNodeNames := task.ReservationNodeNames
if len(reservationNodeNames) == 0 {
return nil
}
nodeSet := make(map[string]struct{})
for _, nodeList := range scores {
for _, node := range nodeList {
nodeSet[node.Name] = struct{}{}
}
}
// match reservation node names specified in given order with available nodes
for _, reserved := range reservationNodeNames {
if _, ok := nodeSet[reserved]; ok {
klog.V(5).Infof("[debug]: Found reservation node %s for task %s/%s", reserved, task.Namespace, task.Name)
return ssn.Nodes[reserved]
}
}
klog.V(5).Infof("[debug]: None of the specified reserved nodes are available for task %s/%s, falling back to scheduler default decision", task.Namespace, task.Name)
return nil
}
ssn.AddBestNodeFn(rp.Name(), bestNodeFn)
ssn.RegisterBinder(rp.Name(), rp)
}
func (rp *reservationPlugin) PostBind(ctx context.Context, bindCtx *cache.BindContext) error {
task := bindCtx.TaskInfo
if !task.IsUseReservationTask() {
return nil
}
if err := rp.session.Cache().SyncBindToReservationTask(task); err != nil {
klog.Errorf("Failed to sync task %s to reservation task, err: %v", task.Name, err)
}
return nil
}
func (rp *reservationPlugin) OnSessionClose(ssn *framework.Session) {}

View File

@ -0,0 +1 @@
package reservation