feat(controller): Integrate DependsOn API (#2484)

* feat(controller): Integrate DependsOn API

Signed-off-by: Andrey Velichkevich <andrey.velichkevich@gmail.com>

* Use Go for unit test

Signed-off-by: Andrey Velichkevich <andrey.velichkevich@gmail.com>

* Update Makefile

Co-authored-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
Signed-off-by: Andrey Velichkevich <andrey.velichkevich@gmail.com>

* Update Makefile

Co-authored-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
Signed-off-by: Andrey Velichkevich <andrey.velichkevich@gmail.com>

* Fix integration test

Signed-off-by: Andrey Velichkevich <andrey.velichkevich@gmail.com>

* Fix e2e

Signed-off-by: Andrey Velichkevich <andrey.velichkevich@gmail.com>

* Exit 1 if e2e fails

Signed-off-by: Andrey Velichkevich <andrey.velichkevich@gmail.com>

---------

Signed-off-by: Andrey Velichkevich <andrey.velichkevich@gmail.com>
Co-authored-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
Andrey Velichkevich 2025-03-07 01:11:34 +00:00 committed by GitHub
parent 870dac6cef
commit 9ac32413c3
16 changed files with 173 additions and 169 deletions
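Before the per-file changes, a brief orientation: the PR drops JobSet's `startupPolicy: InOrder` from the runtime templates and instead declares an explicit dependency from the trainer ReplicatedJob on the initializer ReplicatedJob. A minimal Go sketch of that wiring, using the names and status shown in the test wrappers below (the surrounding JobSet spec and Pod template are elided, and the standalone program is only illustrative):

```go
package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	jobsetv1alpha2 "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

func main() {
	// The trainer-node ReplicatedJob now waits for the initializer Job to
	// complete, instead of relying on startupPolicy: InOrder.
	trainerNode := jobsetv1alpha2.ReplicatedJob{
		Name: "trainer-node",
		DependsOn: []jobsetv1alpha2.DependsOn{
			{Name: "initializer", Status: jobsetv1alpha2.DependencyComplete},
		},
		// The trainer Pod template is elided in this sketch.
		Template: batchv1.JobTemplateSpec{},
	}
	fmt.Printf("%s depends on %s\n", trainerNode.Name, trainerNode.DependsOn[0].Name)
}
```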

View File

@ -1,7 +1,6 @@
name: E2E Test
on:
- push
- pull_request
jobs:
@ -51,7 +50,7 @@ jobs:
- name: Run e2e with Go
run: |
make test-e2e || kubectl logs -n kubeflow-system -l app.kubernetes.io/name=trainer
make test-e2e || (kubectl logs -n kubeflow-system -l app.kubernetes.io/name=trainer && exit 1)
- name: Run e2e test for example Notebooks
run: |

View File

@ -1,7 +1,6 @@
name: Unit and Integration Test - Go
on:
- push
- pull_request
jobs:

View File

@ -1,7 +1,6 @@
name: Unit and Integration Test - Python
on:
- push
- pull_request
jobs:

View File

@ -117,8 +117,8 @@ test: ## Run Go unit test.
go test $(shell go list ./... | grep -v '/test/') -coverprofile cover.out
.PHONY: test-integration
test-integration: envtest jobset-operator-crd scheduler-plugins-crd ## Run Go integration test.
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(K8S_VERSION) -p path)" go test ./test/integration/... -coverprofile cover.out
test-integration: ginkgo envtest jobset-operator-crd scheduler-plugins-crd ## Run Go integration test.
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(K8S_VERSION) -p path)" $(GINKGO) -v ./test/integration/... -coverprofile cover.out
.PHONY: test-python
test-python: ## Run Python unit test.
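Running the integration tests through the Ginkgo CLI assumes the suite's `go test` entry point hands control to Ginkgo. A minimal bootstrap of that shape (package and suite names here are illustrative, not taken from this repository) looks like:

```go
package integration_test

import (
	"testing"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"
)

// TestAPIs bridges the standard Go test runner and the Ginkgo runner, so the
// suite works both with `go test ./test/integration/...` and with `ginkgo`.
func TestAPIs(t *testing.T) {
	gomega.RegisterFailHandler(ginkgo.Fail)
	ginkgo.RunSpecs(t, "Trainer integration suite")
}
```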

View File

@ -770,10 +770,8 @@ spec:
numNodes: 1
template:
spec:
startupPolicy:
startupPolicyOrder: InOrder
replicatedJobs:
- name: Initializer
- name: initializer
template:
spec:
template:
@ -802,7 +800,10 @@ spec:
- name: model-initializer
persistentVolumeClaim:
claimName: model-initializer
- name: Node
- name: trainer-node
dependsOn:
- name: initializer
status: Complete
template:
spec:
template:
@ -1446,10 +1447,8 @@ spec:
numNodes: 1
template:
spec:
startupPolicy:
startupPolicyOrder: InOrder
replicatedJobs:
- name: Initializer
- name: initializer
template:
spec:
template:
@ -1480,7 +1479,10 @@ spec:
- name: model-initializer
persistentVolumeClaim:
claimName: model-initializer
- name: Node
- name: trainer-node
dependsOn:
- name: initializer
status: Complete
template:
spec:
template:
@ -1531,10 +1533,8 @@ spec:
numNodes: 1
template:
spec:
startupPolicy:
startupPolicyOrder: InOrder
replicatedJobs:
- name: Initializer
- name: initializer
template:
spec:
template:
@ -1565,7 +1565,10 @@ spec:
- name: model-initializer
persistentVolumeClaim:
claimName: model-initializer
- name: Node
- name: trainer-node
dependsOn:
- name: initializer
status: Complete
template:
spec:
template:
@ -1650,8 +1653,6 @@ spec:
mpiImplementation: OpenMPI
numProcPerNode: 5
template:
startupPolicy:
startupPolicyOrder: InOrder
replicatedJobs:
- name: launcher
template:
@ -1664,6 +1665,9 @@ spec:
command:
- mpirun launch-job
- name: trainer-node
dependsOn:
- name: launcher
status: Ready
template:
spec:
template:
@ -1726,28 +1730,28 @@ On the other hand, the Internal APIs are not exposed and could not add any opera
- `Start Manager`: Start Manager.
- Extension Point
- `WatchExtension`: This registers arbitrary reconciler builders for watching any kind of resources
and triggering TrainJob reconciliations.
- `PreExecution Phase`:
- Extension Point:
- `CustomValidation`: This registers validators for validating any kind of resources to Admission Validating Webhook Servers
when TrainJob is created and updated.
- `Build Phase`:
- Internal API:
- `ComponentDeployer`: This deploys the built components (resources) to the cluster, which is performed as part of the reconciler.
- Extension Point:
- `EnforcePodGroupPolicy`: This configures PodGroup-specific parameters (e.g., specified in TrainingRuntime `.spec.podGroupPolicy`)
to any kind of resources like PodSpec.
- `EnforceMLPolicy`: This configures machine learning framework-specific parameters (e.g., specified in TrainingRuntime `.spec.mlPolicy`)
to any kind of resources like PodSpec.
- `ComponentBuilder`: This builds Kubernetes resources leveraging `RuntimeInfo` and `TrainJob`.
`RuntimeInfo` is an abstracted object extracted from runtimes like TrainingRuntime and ClusterTrainingRuntime.
- `PostExecution Phase`:
- Internal API:
- `SuspendedCondition`: Check if the TrainJob is in a suspended state, and then add the `Suspended` condition to the TrainJob.
- `CreatedCondition`: Check if the TrainJob is in a created state, and then add the `Created` condition to the TrainJob.
- Extension Point:
- `TerminalCondition`: Check if the TrainJob is in a terminated state, and then add the `Complete` condition with
a propagated terminal reason and message from child Jobs to the TrainJob (see the interface sketch after this list).
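To make the extension points above concrete, here is a hedged Go sketch of what plugin interfaces of this shape could look like; the interface and method names are illustrative assumptions, not the framework's actual API:

```go
package framework

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"
)

// RuntimeInfo is a simplified stand-in for the abstraction extracted from a
// TrainingRuntime or ClusterTrainingRuntime; the real type carries much more.
type RuntimeInfo struct {
	Name   string
	Labels map[string]string
}

// CustomValidationPlugin is an illustrative shape for the CustomValidation
// extension point: validate resources when a TrainJob is created or updated.
type CustomValidationPlugin interface {
	Validate(ctx context.Context, oldObj, newObj client.Object) (warnings []string, errs []error)
}

// EnforceMLPolicyPlugin is an illustrative shape for the EnforceMLPolicy
// extension point: apply ML framework-specific settings to the runtime info.
type EnforceMLPolicyPlugin interface {
	EnforceMLPolicy(info *RuntimeInfo, trainJob client.Object) error
}

// ComponentBuilderPlugin is an illustrative shape for the ComponentBuilder
// extension point: build the Kubernetes objects for a TrainJob from RuntimeInfo.
type ComponentBuilderPlugin interface {
	Build(ctx context.Context, info *RuntimeInfo, trainJob client.Object) ([]client.Object, error)
}
```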
## Migration from Kubeflow Training V1

View File

@ -14,9 +14,6 @@ spec:
sshAuthMountPath: /root/.ssh
template:
spec:
# TODO (andreyvelich): Use dependsOn when it is released.
startupPolicy:
startupPolicyOrder: InOrder
replicatedJobs:
- name: launcher
template:
@ -30,8 +27,11 @@ spec:
command:
- /bin/sh
- -c
- "echo 'launcher runs for 10 seconds' && sleep 100"
- "echo 'launcher runs for 15 seconds' && sleep 15"
- name: trainer-node
dependsOn:
- name: launcher
status: Ready
template:
spec:
template:
@ -42,4 +42,4 @@ spec:
command:
- /bin/sh
- -c
- "echo 'launcher runs for 10 seconds' && sleep 100"
- "echo 'trainer node runs for 15 seconds' && sleep 15"

View File

@ -47,7 +47,7 @@ func TestClusterTrainingRuntimeNewObjects(t *testing.T) {
"succeeded to build PodGroup and JobSet with NumNodes from the Runtime and container from the Trainer.": {
clusterTrainingRuntime: testingutil.MakeClusterTrainingRuntimeWrapper("test-runtime").RuntimeSpec(
testingutil.MakeTrainingRuntimeSpecWrapper(testingutil.MakeClusterTrainingRuntimeWrapper("test-runtime").Spec).
InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
WithMLPolicy(
testingutil.MakeMLPolicyWrapper().
WithNumNodes(100).
@ -69,7 +69,7 @@ func TestClusterTrainingRuntimeNewObjects(t *testing.T) {
Obj(),
wantObjs: []runtime.Object{
testingutil.MakeJobSetWrapper(metav1.NamespaceDefault, "test-job").
InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
NumNodes(100).
ContainerTrainer("test:trainjob", []string{"trainjob"}, []string{"trainjob"}, resRequests).
Suspend(true).
@ -78,9 +78,9 @@ func TestClusterTrainingRuntimeNewObjects(t *testing.T) {
Obj(),
testingutil.MakeSchedulerPluginsPodGroup(metav1.NamespaceDefault, "test-job").
ControllerReference(trainer.SchemeGroupVersion.WithKind(trainer.TrainJobKind), "test-job", "uid").
MinMember(101). // 101 replicas = 100 Trainer nodes + 1 Initializer.
MinMember(101). // 101 replicas = 100 Trainer nodes + 1 Initializers.
MinResources(corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("101"), // Every replica has 1 CPU = 101 CPUs in total.
corev1.ResourceCPU: resource.MustParse("102"), // Trainer node has 100 CPUs + 2 CPUs from 2 initializer containers.
}).
SchedulingTimeout(120).
Obj(),

View File

@ -55,7 +55,7 @@ func TestTrainingRuntimeNewObjects(t *testing.T) {
Annotation("conflictAnnotation", "overridden").
RuntimeSpec(
testingutil.MakeTrainingRuntimeSpecWrapper(testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "test-runtime").Spec).
InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
WithMLPolicy(
testingutil.MakeMLPolicyWrapper().
WithNumNodes(100).
@ -79,7 +79,7 @@ func TestTrainingRuntimeNewObjects(t *testing.T) {
Obj(),
wantObjs: []runtime.Object{
testingutil.MakeJobSetWrapper(metav1.NamespaceDefault, "test-job").
InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
NumNodes(30).
ContainerTrainer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
Suspend(true).
@ -92,10 +92,8 @@ func TestTrainingRuntimeNewObjects(t *testing.T) {
ControllerReference(trainer.SchemeGroupVersion.WithKind(trainer.TrainJobKind), "test-job", "uid").
MinMember(31). // 31 replicas = 30 Trainer nodes + 1 Initializer.
MinResources(corev1.ResourceList{
// Every replica has 1 CPU = 31 CPUs in total.
// Initializer uses InitContainers which execute sequentially.
// Thus, the MinResources is equal to the maximum from the initContainer resources.
corev1.ResourceCPU: resource.MustParse("31"),
// Trainer node has 30 CPUs + 2 CPUs from 2 initializer containers.
corev1.ResourceCPU: resource.MustParse("32"),
}).
SchedulingTimeout(120).
Obj(),
@ -172,7 +170,7 @@ func TestTrainingRuntimeNewObjects(t *testing.T) {
"succeeded to build JobSet with dataset and model initializer from the TrainJob.": {
trainingRuntime: testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "test-runtime").RuntimeSpec(
testingutil.MakeTrainingRuntimeSpecWrapper(testingutil.MakeTrainingRuntimeWrapper(metav1.NamespaceDefault, "test-runtime").Spec).
InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
WithMLPolicy(
testingutil.MakeMLPolicyWrapper().
WithNumNodes(100).
@ -221,8 +219,8 @@ func TestTrainingRuntimeNewObjects(t *testing.T) {
testingutil.MakeJobSetWrapper(metav1.NamespaceDefault, "test-job").
NumNodes(100).
ContainerTrainer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
InitContainerDatasetInitializerEnv(
ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
ContainerDatasetInitializerEnv(
[]corev1.EnvVar{
{
Name: jobsetplugin.InitializerEnvStorageUri,
@ -234,7 +232,7 @@ func TestTrainingRuntimeNewObjects(t *testing.T) {
},
},
).
InitContainerDatasetInitializerEnvFrom(
ContainerDatasetInitializerEnvFrom(
[]corev1.EnvFromSource{
{
SecretRef: &corev1.SecretEnvSource{
@ -245,7 +243,7 @@ func TestTrainingRuntimeNewObjects(t *testing.T) {
},
},
).
InitContainerModelInitializerEnv(
ContainerModelInitializerEnv(
[]corev1.EnvVar{
{
Name: jobsetplugin.InitializerEnvStorageUri,
@ -257,7 +255,7 @@ func TestTrainingRuntimeNewObjects(t *testing.T) {
},
},
).
InitContainerModelInitializerEnvFrom(
ContainerModelInitializerEnvFrom(
[]corev1.EnvFromSource{
{
SecretRef: &corev1.SecretEnvSource{

View File

@ -40,13 +40,10 @@ func NewBuilder(jobSet *jobsetv1alpha2ac.JobSetApplyConfiguration) *Builder {
func (b *Builder) Initializer(trainJob *trainer.TrainJob) *Builder {
for i, rJob := range b.Spec.ReplicatedJobs {
if *rJob.Name == constants.JobInitializer {
// TODO (andreyvelich): Currently, we use initContainers for the initializers.
// Once JobSet supports execution policy for the ReplicatedJobs, we should migrate to containers.
// Ref: https://github.com/kubernetes-sigs/jobset/issues/672
for j, container := range rJob.Template.Spec.Template.Spec.InitContainers {
for j, container := range rJob.Template.Spec.Template.Spec.Containers {
// Update values for the dataset initializer container.
if *container.Name == constants.ContainerDatasetInitializer && trainJob.Spec.DatasetConfig != nil {
env := &b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].Env
env := &b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Env
// Update the dataset initializer envs.
if storageUri := trainJob.Spec.DatasetConfig.StorageUri; storageUri != nil {
apply.UpsertEnvVar(env, corev1ac.EnvVar().
@ -56,7 +53,7 @@ func (b *Builder) Initializer(trainJob *trainer.TrainJob) *Builder {
apply.UpsertEnvVars(env, apply.EnvVars(trainJob.Spec.DatasetConfig.Env...))
// Update the dataset initializer secret reference.
if trainJob.Spec.DatasetConfig.SecretRef != nil {
b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].
b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].
WithEnvFrom(corev1ac.EnvFromSource().
WithSecretRef(corev1ac.SecretEnvSource().
WithName(trainJob.Spec.DatasetConfig.SecretRef.Name)))
@ -68,7 +65,7 @@ func (b *Builder) Initializer(trainJob *trainer.TrainJob) *Builder {
trainJob.Spec.ModelConfig != nil &&
trainJob.Spec.ModelConfig.Input != nil {
// Update the model initializer envs.
env := &b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].Env
env := &b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Env
if storageUri := trainJob.Spec.ModelConfig.Input.StorageUri; storageUri != nil {
apply.UpsertEnvVar(env, corev1ac.EnvVar().
WithName(InitializerEnvStorageUri).
@ -77,7 +74,7 @@ func (b *Builder) Initializer(trainJob *trainer.TrainJob) *Builder {
apply.UpsertEnvVars(env, apply.EnvVars(trainJob.Spec.ModelConfig.Input.Env...))
// Update the model initializer secret reference.
if trainJob.Spec.ModelConfig.Input.SecretRef != nil {
b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].
b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].
WithEnvFrom(corev1ac.EnvFromSource().
WithSecretRef(corev1ac.SecretEnvSource().
WithName(trainJob.Spec.ModelConfig.Input.SecretRef.Name)))

View File

@ -15,12 +15,6 @@ const (
)
var (
// This is the temporary container that we use in the initializer ReplicatedJob.
// TODO (andreyvelich): Once JobSet supports execution policy, we can remove it.
ContainerBusyBox corev1.Container = corev1.Container{
Name: "busybox",
Image: "busybox:stable-glibc",
}
// VolumeMountModelInitializer is the volume mount for the model initializer container.
// TODO (andreyvelich): Add validation to check that initializer ReplicatedJob has the following volumes.

View File

@ -55,7 +55,7 @@ func MakeJobSetWrapper(namespace, name string) *JobSetWrapper {
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
InitContainers: []corev1.Container{
Containers: []corev1.Container{
{
Name: constants.ContainerDatasetInitializer,
VolumeMounts: []corev1.VolumeMount{
@ -69,9 +69,6 @@ func MakeJobSetWrapper(namespace, name string) *JobSetWrapper {
},
},
},
Containers: []corev1.Container{
jobsetplugin.ContainerBusyBox,
},
Volumes: []corev1.Volume{
jobsetplugin.VolumeInitializer,
},
@ -82,6 +79,12 @@ func MakeJobSetWrapper(namespace, name string) *JobSetWrapper {
},
{
Name: constants.JobTrainerNode,
DependsOn: []jobsetv1alpha2.DependsOn{
{
Name: constants.JobInitializer,
Status: jobsetv1alpha2.DependencyComplete,
},
},
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
@ -168,15 +171,15 @@ func (j *JobSetWrapper) ContainerTrainerEnv(env []corev1.EnvVar) *JobSetWrapper
return j
}
func (j *JobSetWrapper) InitContainerDatasetModelInitializer(image string, command []string, args []string, res corev1.ResourceList) *JobSetWrapper {
func (j *JobSetWrapper) ContainerDatasetModelInitializer(image string, command []string, args []string, res corev1.ResourceList) *JobSetWrapper {
for i, rJob := range j.Spec.ReplicatedJobs {
if rJob.Name == constants.JobInitializer {
for k, container := range rJob.Template.Spec.Template.Spec.InitContainers {
for k, container := range rJob.Template.Spec.Template.Spec.Containers {
if container.Name == constants.ContainerDatasetInitializer || container.Name == constants.ContainerModelInitializer {
j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].Image = image
j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].Command = command
j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].Args = args
j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].Resources.Requests = res
j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[k].Image = image
j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[k].Command = command
j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[k].Args = args
j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[k].Resources.Requests = res
}
}
}
@ -184,12 +187,12 @@ func (j *JobSetWrapper) InitContainerDatasetModelInitializer(image string, comma
return j
}
func (j *JobSetWrapper) InitContainerDatasetInitializerEnv(env []corev1.EnvVar) *JobSetWrapper {
func (j *JobSetWrapper) ContainerDatasetInitializerEnv(env []corev1.EnvVar) *JobSetWrapper {
for i, rJob := range j.Spec.ReplicatedJobs {
if rJob.Name == constants.JobInitializer {
for k, container := range rJob.Template.Spec.Template.Spec.InitContainers {
for k, container := range rJob.Template.Spec.Template.Spec.Containers {
if container.Name == constants.ContainerDatasetInitializer {
j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].Env = env
j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[k].Env = env
}
}
@ -198,12 +201,12 @@ func (j *JobSetWrapper) InitContainerDatasetInitializerEnv(env []corev1.EnvVar)
return j
}
func (j *JobSetWrapper) InitContainerDatasetInitializerEnvFrom(envFrom []corev1.EnvFromSource) *JobSetWrapper {
func (j *JobSetWrapper) ContainerDatasetInitializerEnvFrom(envFrom []corev1.EnvFromSource) *JobSetWrapper {
for i, rJob := range j.Spec.ReplicatedJobs {
if rJob.Name == constants.JobInitializer {
for k, container := range rJob.Template.Spec.Template.Spec.InitContainers {
for k, container := range rJob.Template.Spec.Template.Spec.Containers {
if container.Name == constants.ContainerDatasetInitializer {
j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].EnvFrom = envFrom
j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[k].EnvFrom = envFrom
}
}
}
@ -211,12 +214,12 @@ func (j *JobSetWrapper) InitContainerDatasetInitializerEnvFrom(envFrom []corev1.
return j
}
func (j *JobSetWrapper) InitContainerModelInitializerEnv(env []corev1.EnvVar) *JobSetWrapper {
func (j *JobSetWrapper) ContainerModelInitializerEnv(env []corev1.EnvVar) *JobSetWrapper {
for i, rJob := range j.Spec.ReplicatedJobs {
if rJob.Name == constants.JobInitializer {
for k, container := range rJob.Template.Spec.Template.Spec.InitContainers {
for k, container := range rJob.Template.Spec.Template.Spec.Containers {
if container.Name == constants.ContainerModelInitializer {
j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].Env = env
j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[k].Env = env
}
}
}
@ -224,12 +227,12 @@ func (j *JobSetWrapper) InitContainerModelInitializerEnv(env []corev1.EnvVar) *J
return j
}
func (j *JobSetWrapper) InitContainerModelInitializerEnvFrom(envFrom []corev1.EnvFromSource) *JobSetWrapper {
func (j *JobSetWrapper) ContainerModelInitializerEnvFrom(envFrom []corev1.EnvFromSource) *JobSetWrapper {
for i, rJob := range j.Spec.ReplicatedJobs {
if rJob.Name == constants.JobInitializer {
for k, container := range rJob.Template.Spec.Template.Spec.InitContainers {
for k, container := range rJob.Template.Spec.Template.Spec.Containers {
if container.Name == constants.ContainerModelInitializer {
j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[k].EnvFrom = envFrom
j.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[k].EnvFrom = envFrom
}
}
}
@ -500,7 +503,7 @@ func MakeTrainingRuntimeWrapper(namespace, name string) *TrainingRuntimeWrapper
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
InitContainers: []corev1.Container{
Containers: []corev1.Container{
{
Name: constants.ContainerDatasetInitializer,
VolumeMounts: []corev1.VolumeMount{
@ -514,9 +517,6 @@ func MakeTrainingRuntimeWrapper(namespace, name string) *TrainingRuntimeWrapper
},
},
},
Containers: []corev1.Container{
jobsetplugin.ContainerBusyBox,
},
Volumes: []corev1.Volume{
jobsetplugin.VolumeInitializer,
},
@ -527,6 +527,12 @@ func MakeTrainingRuntimeWrapper(namespace, name string) *TrainingRuntimeWrapper
},
{
Name: constants.JobTrainerNode,
DependsOn: []jobsetv1alpha2.DependsOn{
{
Name: constants.JobInitializer,
Status: jobsetv1alpha2.DependencyComplete,
},
},
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
@ -605,7 +611,7 @@ func MakeClusterTrainingRuntimeWrapper(name string) *ClusterTrainingRuntimeWrapp
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
InitContainers: []corev1.Container{
Containers: []corev1.Container{
{
Name: constants.ContainerDatasetInitializer,
VolumeMounts: []corev1.VolumeMount{
@ -619,9 +625,6 @@ func MakeClusterTrainingRuntimeWrapper(name string) *ClusterTrainingRuntimeWrapp
},
},
},
Containers: []corev1.Container{
jobsetplugin.ContainerBusyBox,
},
Volumes: []corev1.Volume{
jobsetplugin.VolumeInitializer,
},
@ -632,6 +635,12 @@ func MakeClusterTrainingRuntimeWrapper(name string) *ClusterTrainingRuntimeWrapp
},
{
Name: constants.JobTrainerNode,
DependsOn: []jobsetv1alpha2.DependsOn{
{
Name: constants.JobInitializer,
Status: jobsetv1alpha2.DependencyComplete,
},
},
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
@ -719,15 +728,15 @@ func (s *TrainingRuntimeSpecWrapper) ContainerTrainerEnv(env []corev1.EnvVar) *T
return s
}
func (s *TrainingRuntimeSpecWrapper) InitContainerDatasetModelInitializer(image string, command []string, args []string, res corev1.ResourceList) *TrainingRuntimeSpecWrapper {
func (s *TrainingRuntimeSpecWrapper) ContainerDatasetModelInitializer(image string, command []string, args []string, res corev1.ResourceList) *TrainingRuntimeSpecWrapper {
for i, rJob := range s.Template.Spec.ReplicatedJobs {
if rJob.Name == constants.JobInitializer {
for j, container := range rJob.Template.Spec.Template.Spec.InitContainers {
for j, container := range rJob.Template.Spec.Template.Spec.Containers {
if container.Name == constants.ContainerDatasetInitializer || container.Name == constants.ContainerModelInitializer {
s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].Image = image
s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].Command = command
s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].Args = args
s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.InitContainers[j].Resources.Requests = res
s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Image = image
s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Command = command
s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Args = args
s.Template.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Resources.Requests = res
}
}
}

View File

@ -106,7 +106,7 @@ class TrainerClient:
)
# TODO (andreyvelich): Currently, the labels must be present.
if metadata.labels:
if metadata.labels and ml_policy.num_nodes:
# Get the Trainer container resources.
resources = None
for job in runtime.spec.template.spec.replicated_jobs: # type: ignore
@ -119,21 +119,24 @@ class TrainerClient:
if container.name == constants.CONTAINER_TRAINER:
resources = container.resources
# TODO (andreyvelich): Currently, only Torch is supported for NumProcPerNode.
num_procs = (
ml_policy.torch.num_proc_per_node if ml_policy.torch else None
)
# Get the accelerator for the Trainer nodes.
# TODO (andreyvelich): Currently, we get the accelerator type from
# the runtime labels.
_, accelerator_count = utils.get_container_devices(
resources, num_procs
)
if accelerator_count != constants.UNKNOWN:
accelerator_count = str(
int(accelerator_count) * int(ml_policy.num_nodes) # type: ignore
)
_, accelerator_count = utils.get_container_devices(resources)
# NumProcPerNode from Torch or MPI overrides accelerator count.
if (
ml_policy.torch
and ml_policy.torch.num_proc_per_node
and ml_policy.torch.num_proc_per_node.isdigit()
):
accelerator_count = ml_policy.torch.num_proc_per_node
elif ml_policy.mpi and ml_policy.mpi.num_proc_per_node:
accelerator_count = str(ml_policy.mpi.num_proc_per_node)
if accelerator_count.isdigit():
accelerator_count = int(accelerator_count) * ml_policy.num_nodes
result.append(
types.Runtime(
@ -148,7 +151,7 @@ class TrainerClient:
if constants.ACCELERATOR_KEY in metadata.labels
else constants.UNKNOWN
),
accelerator_count=accelerator_count,
accelerator_count=str(accelerator_count),
)
)
@ -495,63 +498,60 @@ class TrainerClient:
components=[],
)
# Select Pods created by appropriate JobSet, and Initializer, Launcher, or Trainer Job.
label_selector = "{}={},{} in ({}, {}, {})".format(
constants.JOBSET_NAME_KEY,
name,
constants.REPLICATED_JOB_KEY,
constants.JOB_INITIALIZER,
constants.JOB_LAUNCHER,
constants.JOB_TRAINER_NODE,
)
# Add the TrainJob components, e.g. trainer nodes and initializer.
try:
response = self.core_api.list_namespaced_pod(
namespace,
label_selector=f"{constants.JOBSET_NAME_KEY}={name}",
label_selector=label_selector,
async_req=True,
).get(constants.DEFAULT_TIMEOUT)
for pod in response.items:
labels = pod.metadata.labels
# The Pods might have these containers:
# dataset-initializer, model-initializer, launcher, and trainer
for c in pod.spec.containers:
device, device_count = utils.get_container_devices(c.resources)
if c.name == constants.CONTAINER_TRAINER:
# For Trainer numProcPerNode overrides container resources.
for env in c.env:
if (
env.name == constants.TORCH_ENV_NUM_PROC_PER_NODE
and env.value.isdigit()
):
device_count = env.value
# For Trainer node Job, component name is equal to <Job_Name>-<Index>
component_name = "{}-{}".format(
constants.JOB_TRAINER_NODE,
labels[constants.JOB_INDEX_KEY],
)
elif (
c.name == constants.CONTAINER_DATASET_INITIALIZER
or c.name == constants.CONTAINER_MODEL_INITIALIZER
or c.name == constants.CONTAINER_LAUNCHER
):
# For Initializer or Launcher Job, component is equal to container name.
component_name = c.name
# Component can be Trainer or Initializer.
if labels[constants.REPLICATED_JOB_KEY] == constants.JOB_TRAINER_NODE:
name = f"{constants.JOB_TRAINER_NODE}-{labels[constants.JOB_INDEX_KEY]}"
else:
name = labels[constants.REPLICATED_JOB_KEY]
# TODO (andreyvelich): This can be refactored once we use containers for init Job.
# Initializer Pod must have the dataset and/or model initializer containers.
if name == constants.JOB_INITIALIZER:
device_count = "0"
# TODO (andreyvelich): Currently, we use the InitContainers for initializers.
for container in pod.spec.init_containers:
if (
container.name == constants.CONTAINER_DATASET_INITIALIZER
or container.name == constants.CONTAINER_MODEL_INITIALIZER
):
device, dc = utils.get_container_devices(
container.resources
)
# If resources are not set in containers, we can't get the device.
if device == constants.UNKNOWN:
device_count = device
break
device_count = str(int(device_count) + int(dc))
# Trainer Pod must have the trainer container.
else:
for container in pod.spec.containers:
if container.name == constants.CONTAINER_TRAINER:
num_procs = None
# Get the num procs per node if it is set.
for env in container.env:
if env.name == constants.TORCH_ENV_NUM_PROC_PER_NODE:
num_procs = env.value
device, device_count = utils.get_container_devices(
container.resources, num_procs
)
c = types.Component(
name=name,
component = types.Component(
name=component_name,
status=pod.status.phase if pod.status else None, # type: ignore
device=device,
device_count=device_count,
pod_name=pod.metadata.name,
)
train_job.components.append(c)
train_job.components.append(component)
except multiprocessing.TimeoutError:
raise TimeoutError(
f"Timeout to list {constants.TRAINJOB_KIND}'s components: {namespace}/{name}"

View File

@ -108,6 +108,12 @@ DATASET_PATH = os.path.join(WORKSPACE_PATH, "dataset")
# The path where initializer downloads model.
MODEL_PATH = os.path.join(WORKSPACE_PATH, "model")
# The Job name for the launcher (e.g. mpirun launcher).
JOB_LAUNCHER = "launcher"
# The container name for the launcher
CONTAINER_LAUNCHER = "launcher"
# The Job name for the trainer nodes.
JOB_TRAINER_NODE = "trainer-node"

View File

@ -44,7 +44,6 @@ def get_default_target_namespace() -> str:
def get_container_devices(
resources: Optional[models.IoK8sApiCoreV1ResourceRequirements],
num_procs: Optional[str] = None,
) -> Tuple[str, str]:
"""
Get the device type and device count for the given container.
@ -75,10 +74,6 @@ def get_container_devices(
f"Unknown device type in the container resources: {resources.limits}"
)
# Num procs override the container resources for the Trainer node.
if num_procs and num_procs.isdigit():
device_count = num_procs
return device, device_count

View File

@ -14,6 +14,10 @@ import (
"github.com/kubeflow/trainer/test/util"
)
const (
torchRuntime = "torch-distributed"
)
var _ = ginkgo.Describe("TrainJob e2e", func() {
// Each test runs in a separate namespace.
var ns *corev1.Namespace
@ -45,7 +49,7 @@ var _ = ginkgo.Describe("TrainJob e2e", func() {
ginkgo.It("should create TrainJob with PyTorch runtime reference", func() {
// Create a TrainJob.
trainJob := testingutil.MakeTrainJobWrapper(ns.Name, "e2e-test").
RuntimeRef(trainer.SchemeGroupVersion.WithKind(trainer.ClusterTrainingRuntimeKind), "torch-distributed").
RuntimeRef(trainer.SchemeGroupVersion.WithKind(trainer.ClusterTrainingRuntimeKind), torchRuntime).
Obj()
ginkgo.By("Create a TrainJob with torch-distributed runtime reference", func() {

View File

@ -113,7 +113,7 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() {
Obj(),
).
ContainerTrainer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
PodGroupPolicyCoscheduling(&trainer.CoschedulingPodGroupPolicySource{ScheduleTimeoutSeconds: ptr.To[int32](100)}).
Obj()).
Obj()
@ -134,9 +134,9 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() {
Replicas(1).
NumNodes(100).
ContainerTrainer("test:trainJob", []string{"trainjob"}, []string{"trainjob"}, resRequests).
InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
InitContainerDatasetInitializerEnv([]corev1.EnvVar{{Name: jobsetplugin.InitializerEnvStorageUri, Value: "hf://trainjob-dataset"}}).
InitContainerModelInitializerEnv([]corev1.EnvVar{{Name: jobsetplugin.InitializerEnvStorageUri, Value: "hf://trainjob-model"}}).
ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
ContainerDatasetInitializerEnv([]corev1.EnvVar{{Name: jobsetplugin.InitializerEnvStorageUri, Value: "hf://trainjob-dataset"}}).
ContainerModelInitializerEnv([]corev1.EnvVar{{Name: jobsetplugin.InitializerEnvStorageUri, Value: "hf://trainjob-model"}}).
Suspend(true).
Label("testingKey", "testingVal").
Annotation("testingKey", "testingVal").
@ -150,8 +150,8 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() {
testingutil.MakeSchedulerPluginsPodGroup(ns.Name, trainJobKey.Name).
MinMember(101). // 101 replicas = 100 Trainer nodes + 1 Initializer.
MinResources(corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("101"), // 1 CPU and 4Gi per replica.
corev1.ResourceMemory: resource.MustParse("404Gi"),
corev1.ResourceCPU: resource.MustParse("102"), // 100 CPUs for Trainer + 2 CPUs for Initializer.
corev1.ResourceMemory: resource.MustParse("408Gi"),
}).
SchedulingTimeout(100).
ControllerReference(trainer.SchemeGroupVersion.WithKind(trainer.TrainJobKind), trainJobKey.Name, string(trainJob.UID)).
@ -189,9 +189,9 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() {
Replicas(1).
NumNodes(100).
ContainerTrainer(updatedImageName, []string{"trainjob"}, []string{"trainjob"}, resRequests).
InitContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
InitContainerDatasetInitializerEnv([]corev1.EnvVar{{Name: jobsetplugin.InitializerEnvStorageUri, Value: "hf://trainjob-dataset"}}).
InitContainerModelInitializerEnv([]corev1.EnvVar{{Name: jobsetplugin.InitializerEnvStorageUri, Value: "hf://trainjob-model"}}).
ContainerDatasetModelInitializer("test:runtime", []string{"runtime"}, []string{"runtime"}, resRequests).
ContainerDatasetInitializerEnv([]corev1.EnvVar{{Name: jobsetplugin.InitializerEnvStorageUri, Value: "hf://trainjob-dataset"}}).
ContainerModelInitializerEnv([]corev1.EnvVar{{Name: jobsetplugin.InitializerEnvStorageUri, Value: "hf://trainjob-model"}}).
Suspend(true).
Label("testingKey", "testingVal").
Annotation("testingKey", "testingVal").
@ -205,8 +205,8 @@ var _ = ginkgo.Describe("TrainJob controller", ginkgo.Ordered, func() {
testingutil.MakeSchedulerPluginsPodGroup(ns.Name, trainJobKey.Name).
MinMember(101).
MinResources(corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("101"), // 1 CPU and 4Gi per 101 replica.
corev1.ResourceMemory: resource.MustParse("404Gi"),
corev1.ResourceCPU: resource.MustParse("102"), // 100 CPUs for Trainer + 2 CPUs for Initializer.
corev1.ResourceMemory: resource.MustParse("408Gi"),
}).
SchedulingTimeout(100).
ControllerReference(trainer.SchemeGroupVersion.WithKind(trainer.TrainJobKind), trainJobKey.Name, string(trainJob.UID)).