diff --git a/cmd/kubectl-delivery/app/server.go b/cmd/kubectl-delivery/app/server.go
index 8f97d89..680d789 100644
--- a/cmd/kubectl-delivery/app/server.go
+++ b/cmd/kubectl-delivery/app/server.go
@@ -108,6 +108,10 @@ func Run(opt *options.ServerOption) error {
 			continue
 		}
 		lines := strings.SplitN(string(line), " ", 2)
+		// When using Intel MPI or MPICH, the hostfile format is hostname:slots, so the line needs to be split on the colon.
+		if strings.Contains(lines[0], ":") {
+			lines = strings.SplitN(string(line), ":", 2)
+		}
 		if !strings.HasSuffix(lines[0], launcherPodSuffix) {
 			pods = append(pods, lines[0])
 		}
diff --git a/pkg/apis/kubeflow/v1alpha2/types.go b/pkg/apis/kubeflow/v1alpha2/types.go
index b29a16a..5395565 100644
--- a/pkg/apis/kubeflow/v1alpha2/types.go
+++ b/pkg/apis/kubeflow/v1alpha2/types.go
@@ -72,6 +72,11 @@ type MPIJobSpec struct {
 	// active. The policies specified in `RunPolicy` take precedence over
 	// the following fields: `BackoffLimit` and `ActiveDeadlineSeconds`.
 	RunPolicy *common.RunPolicy `json:"runPolicy,omitempty"`
+
+	// MPIDistribution specifies the name of the MPI framework that is used.
+	// Defaults to "OpenMPI".
+	// Options include "OpenMPI", "IntelMPI" and "MPICH".
+	MPIDistribution *MPIDistributionType `json:"mpiDistribution,omitempty"`
 }

 // MPIReplicaType is the type for MPIReplica.
@@ -84,3 +89,17 @@ const (
 	// MPIReplicaTypeWorker is the type for worker replicas.
 	MPIReplicaTypeWorker MPIReplicaType = "Worker"
 )
+
+// MPIDistributionType is the type for MPIDistribution.
+type MPIDistributionType string
+
+const (
+	// MPIDistributionTypeOpenMPI is the type for Open MPI.
+	MPIDistributionTypeOpenMPI MPIDistributionType = "OpenMPI"
+
+	// MPIDistributionTypeIntelMPI is the type for Intel MPI.
+	MPIDistributionTypeIntelMPI MPIDistributionType = "IntelMPI"
+
+	// MPIDistributionTypeMPICH is the type for MPICH.
+	MPIDistributionTypeMPICH MPIDistributionType = "MPICH"
+)
\ No newline at end of file
diff --git a/pkg/controllers/kubectl_delivery/controller.go b/pkg/controllers/kubectl_delivery/controller.go
index bb36c5f..50fbd78 100644
--- a/pkg/controllers/kubectl_delivery/controller.go
+++ b/pkg/controllers/kubectl_delivery/controller.go
@@ -15,7 +15,9 @@ package kubectl_delivery

 import (
+	"bufio"
 	"fmt"
+	"os"
 	"sync"
 	"time"
@@ -114,7 +116,10 @@ func (c *KubectlDeliveryController) Run(threadiness int, stopCh <-chan struct{})
 	if ok := cache.WaitForCacheSync(stopCh, c.podSynced); !ok {
 		return fmt.Errorf("failed to wait for caches to sync")
 	}
+	// Keep a copy of the pod names so that their IP addresses can be looked up later.
+	var workerPods []string
 	for name := range c.watchedPods {
+		workerPods = append(workerPods, name)
 		pod, err := c.podLister.Pods(c.namespace).Get(name)
 		if err != nil {
 			continue
 		}
@@ -139,6 +144,9 @@ func (c *KubectlDeliveryController) Run(threadiness int, stopCh <-chan struct{})
 			return nil
 		case <-ticker.C:
 			if len(c.watchedPods) == 0 {
+				if err := c.generateHosts("/etc/hosts", "/opt/kube/hosts", workerPods); err != nil {
+					return fmt.Errorf("error generating hosts file: %v", err)
+				}
 				klog.Info("Shutting down workers")
 				return nil
 			}
@@ -147,6 +155,43 @@ func (c *KubectlDeliveryController) Run(threadiness int, stopCh <-chan struct{})
 	}
 }
+
+// generateHosts records all workers' IP addresses in a hosts-format
+// file, which is sent to each worker pod before the remote process
+// manager is launched. It creates and writes the file at filePath and uses
+// the pod lister to look up IP addresses, so the cache must be synced before calling it.
+func (c *KubectlDeliveryController) generateHosts(localHostsPath string, filePath string, workerPods []string) error {
+	var hosts string
+	// First, open the local hosts file to read the launcher pod's own IP address.
+	fd, err := os.Open(localHostsPath)
+	if err != nil {
+		return fmt.Errorf("can't open file[%s]: %v", localHostsPath, err)
+	}
+	defer fd.Close()
+	// Read the last line of the hosts file -- the entry with the launcher pod's IP address.
+	scanner := bufio.NewScanner(fd)
+	for scanner.Scan() {
+		hosts = scanner.Text()
+	}
+	// Use the pod lister to look up the IP address of each worker.
+	for index := range workerPods {
+		pod, err := c.podLister.Pods(c.namespace).Get(workerPods[index])
+		if err != nil {
+			return fmt.Errorf("can't get IP address of node[%s]", workerPods[index])
+		}
+		hosts = fmt.Sprintf("%s\n%s\t%s", hosts, pod.Status.PodIP, pod.Name)
+	}
+	// Write the hosts-format records to the volume; they will be sent to the workers later.
+	fp, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
+	if err != nil {
+		return fmt.Errorf("can't create file[%s]: %v", filePath, err)
+	}
+	defer fp.Close()
+	if _, err := fp.WriteString(hosts); err != nil {
+		return fmt.Errorf("can't write file[%s]: %v", filePath, err)
+	}
+	return nil
+}
+
 // runWorker is a long-running function that will continually call the
 // processNextWorkItem function in order to read and process a message on the
 // work queue.
diff --git a/pkg/controllers/kubectl_delivery/controller_test.go b/pkg/controllers/kubectl_delivery/controller_test.go
index 71e789f..2f51714 100644
--- a/pkg/controllers/kubectl_delivery/controller_test.go
+++ b/pkg/controllers/kubectl_delivery/controller_test.go
@@ -15,6 +15,10 @@ package kubectl_delivery

 import (
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strconv"
 	"testing"

 	corev1 "k8s.io/api/core/v1"
@@ -38,3 +42,57 @@ func TestWaitingPodRunning(t *testing.T) {
 	f.setUpPods(fakePod)
 	f.run(namespace, podName)
 }
+
+// TestGeneratingHostsFile tests the hosts file generation.
+func TestGeneratingHostsFile(t *testing.T) {
+	namespace := "default"
+	podNames := []string{"test", "tester-2", "worker_3", "pod4"}
+	// content of the fake local hosts file
+	content := []byte(`# this line is a comment
+	127.0.0.1 localhost
+	::1 localhost
+	234.98.76.54 uselesshost
+	10.234.56.78 launcher.fullname.test launcher`)
+
+	// expected output content
+	expectedHosts := make(map[string]string)
+	expectedHosts["launcher"] = "10.234.56.78"
+	expectedHosts["launcher.fullname.test"] = "10.234.56.78"
+	f := newFixture(t)
+	// add fake worker pods
+	fakePod := &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: namespace,
+		},
+	}
+	fakePod.Status.Phase = corev1.PodRunning
+	for index := range podNames {
+		fakePod.Status.PodIP = "123.123.123." + strconv.Itoa(index)
+		fakePod.ObjectMeta.Name = podNames[index]
+		expectedHosts[fakePod.ObjectMeta.Name] = fakePod.Status.PodIP
+		f.setUpPods(fakePod.DeepCopy())
+	}
+	// set up a temp directory for the file-related tests
+	p, tmphf := f.setUpTmpDir("hostsGenerating", content)
+	defer os.RemoveAll(p) // clean up the temp directory
+	tmpof := filepath.Join(p, "output")
+	c, _ := f.newController(namespace, podNames)
+	// generate the new hosts file
+	err := c.generateHosts(tmphf, tmpof, podNames)
+	if err != nil {
+		t.Errorf("Error, cannot generate hosts for worker pods: %v", err)
+	}
+	// get the output file content
+	outputContent, err := ioutil.ReadFile(tmpof)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// parse the output content to avoid space/tab interference
+	generatedHosts := f.getResolvedHosts(outputContent)
+	// check the output
+	for hostname, expectedIP := range expectedHosts {
+		if resolvedIP, ok := generatedHosts[hostname]; !ok || resolvedIP != expectedIP {
+			t.Errorf("Error, generated hosts file incorrect. Host: %s, expected: %s, resolved: %s", hostname, expectedIP, resolvedIP)
+		}
+	}
+}
diff --git a/pkg/controllers/kubectl_delivery/utils_test.go b/pkg/controllers/kubectl_delivery/utils_test.go
index 24a9142..8c58a00 100644
--- a/pkg/controllers/kubectl_delivery/utils_test.go
+++ b/pkg/controllers/kubectl_delivery/utils_test.go
@@ -15,8 +15,13 @@ package kubectl_delivery

 import (
+	"bufio"
 	"fmt"
+	"io/ioutil"
+	"os"
 	"path"
+	"path/filepath"
+	"strings"
 	"testing"
 	"time"
@@ -107,3 +112,44 @@ func (f *fixture) setUpPods(p *corev1.Pod) {
 	f.podLister = append(f.podLister, p)
 	f.kubeObjects = append(f.kubeObjects, p)
 }
+
+// getResolvedHosts resolves a hosts file into a map object,
+// with the hostname as key and the IP address as value.
+func (f *fixture) getResolvedHosts(contentBytes []byte) map[string]string {
+	// create a scanner to read the content line by line
+	hostRecords := make(map[string]string)
+	contentStrReader := strings.NewReader(string(contentBytes))
+	scanner := bufio.NewScanner(contentStrReader)
+	for scanner.Scan() {
+		line := scanner.Text()
+		if len(line) == 0 || line[0] == '#' { // skip empty and comment lines
+			continue
+		}
+		lines := strings.Fields(line)
+		if len(lines) == 0 { // skip whitespace-only lines
+			continue
+		}
+		if len(lines) == 1 { // a record with a single field is malformed
+			f.t.Error("Error, generated hosts file has wrong format.")
+			continue
+		}
+		for i := 1; i < len(lines); i++ {
+			hostRecords[lines[i]] = lines[0] // use the map to record hosts
+		}
+	}
+	return hostRecords
+}
+
+// setUpTmpDir creates a temp directory containing a temp hosts file with the
+// provided content, and returns the paths of the directory and the hosts file.
+func (f *fixture) setUpTmpDir(dirName string, content []byte) (string, string) {
+	p, err := ioutil.TempDir(os.TempDir(), dirName)
+	if err != nil {
+		f.t.Fatal(err)
+	}
+	tmphf := filepath.Join(p, "hosts")
+	if err := ioutil.WriteFile(tmphf, content, 0644); err != nil {
+		f.t.Fatal(err)
+	}
+	return p, tmphf
+}
diff --git a/pkg/controllers/v1alpha2/mpi_job_controller.go b/pkg/controllers/v1alpha2/mpi_job_controller.go
index 0e559e2..611dce3 100644
--- a/pkg/controllers/v1alpha2/mpi_job_controller.go
+++ b/pkg/controllers/v1alpha2/mpi_job_controller.go
@@ -958,15 +958,27 @@ func (c *MPIJobController) doUpdateJobStatus(mpiJob *kubeflow.MPIJob) error {
 // resource. It also sets the appropriate OwnerReferences on the resource so
 // handleObject can discover the MPIJob resource that 'owns' it.
 func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32) *corev1.ConfigMap {
+	// This part is closely tied to the specific ssh commands that the MPI framework invokes,
+	// and may break when the MPI framework version changes.
+	// Leading option arguments are filtered out automatically by detecting a "-" prefix.
+	// To enable Intel MPI and MVAPICH2 to resolve pod names, the init container generates
+	// a hosts file containing all workers based on the pod list.
+	// kubectl is used to send it to each worker and append it to the end of the original hosts file.
 	kubexec := fmt.Sprintf(`#!/bin/sh
 set -x
 POD_NAME=$1
+while [ ${POD_NAME%%${POD_NAME#?}} = "-" ]
+do
 shift
-%s/kubectl exec ${POD_NAME}`, kubectlMountPath)
+POD_NAME=$1
+done
+shift
+%s/kubectl cp %s/hosts ${POD_NAME}:/etc/hosts_of_nodes
+%s/kubectl exec ${POD_NAME}`, kubectlMountPath, kubectlMountPath, kubectlMountPath)
 	if len(mpiJob.Spec.MainContainer) > 0 {
 		kubexec = fmt.Sprintf("%s --container %s", kubexec, mpiJob.Spec.MainContainer)
 	}
-	kubexec = fmt.Sprintf("%s -- /bin/sh -c \"$*\"", kubexec)
+	kubexec = fmt.Sprintf("%s -- /bin/sh -c \"cat /etc/hosts_of_nodes >> /etc/hosts && $*\"", kubexec)

 	// If no processing unit is specified, default to 1 slot.
 	slots := 1
@@ -974,8 +986,16 @@ shift
 		slots = int(*mpiJob.Spec.SlotsPerWorker)
 	}
 	var buffer bytes.Buffer
+	// The hostfile format differs between MPI frameworks.
+	// Intel MPI and MVAPICH2 use the "hostname:slots" syntax to indicate how many slots a node offers,
+	// while Open MPI uses the "hostname slots=n" syntax for the same purpose.
 	for i := 0; i < int(workerReplicas); i++ {
-		buffer.WriteString(fmt.Sprintf("%s%s-%d slots=%d\n", mpiJob.Name, workerSuffix, i, slots))
+		mpiDistribution := mpiJob.Spec.MPIDistribution
+		if mpiDistribution != nil && (*mpiDistribution == kubeflow.MPIDistributionTypeIntelMPI || *mpiDistribution == kubeflow.MPIDistributionTypeMPICH) {
+			buffer.WriteString(fmt.Sprintf("%s%s-%d:%d\n", mpiJob.Name, workerSuffix, i, slots))
+		} else {
+			buffer.WriteString(fmt.Sprintf("%s%s-%d slots=%d\n", mpiJob.Name, workerSuffix, i, slots))
+		}
 	}

 	return &corev1.ConfigMap{
@@ -1278,13 +1298,28 @@ func (c *MPIJobController) newLauncher(mpiJob *kubeflow.MPIJob, kubectlDeliveryI
 		return nil
 	}
 	container := podSpec.Spec.Containers[0]
+	// Different MPI frameworks use different environment variables
+	// to specify the paths of the remote task launcher and the hostfile.
+	mpiRshExecPathEnvName := "OMPI_MCA_plm_rsh_agent"
+	mpiHostfilePathEnvName := "OMPI_MCA_orte_default_hostfile"
+	// If MPIDistribution is not specified as "IntelMPI" or "MPICH",
+	// the default "OpenMPI" is assumed.
+	if mpiJob.Spec.MPIDistribution != nil {
+		if *mpiJob.Spec.MPIDistribution == kubeflow.MPIDistributionTypeIntelMPI {
+			mpiRshExecPathEnvName = "I_MPI_HYDRA_BOOTSTRAP_EXEC"
+			mpiHostfilePathEnvName = "I_MPI_HYDRA_HOST_FILE"
+		} else if *mpiJob.Spec.MPIDistribution == kubeflow.MPIDistributionTypeMPICH {
+			mpiRshExecPathEnvName = "HYDRA_LAUNCHER_EXEC"
+			mpiHostfilePathEnvName = "HYDRA_HOST_FILE"
+		}
+	}
 	container.Env = append(container.Env,
 		corev1.EnvVar{
-			Name:  "OMPI_MCA_plm_rsh_agent",
+			Name:  mpiRshExecPathEnvName,
 			Value: fmt.Sprintf("%s/%s", configMountPath, kubexecScriptName),
 		},
 		corev1.EnvVar{
-			Name:  "OMPI_MCA_orte_default_hostfile",
+			Name:  mpiHostfilePathEnvName,
 			Value: fmt.Sprintf("%s/%s", configMountPath, hostfileName),
 		},
 		// We overwrite these environment variables so that users will not
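
For reference only (not part of the diff): a minimal Go sketch of how a client could set the new field. The import path is assumed from the v1alpha2 package touched above, and the remaining MPIJob spec fields (replicas, slots, pod templates) are omitted for brevity.

package main

import (
	"fmt"

	// Assumed import path, based on pkg/apis/kubeflow/v1alpha2 in this diff.
	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v1alpha2"
)

func main() {
	// Selecting Intel MPI makes newConfigMap emit "hostname:slots" hostfile
	// entries and newLauncher set the I_MPI_HYDRA_* environment variables;
	// leaving MPIDistribution nil keeps the Open MPI defaults.
	dist := kubeflow.MPIDistributionTypeIntelMPI
	spec := kubeflow.MPIJobSpec{
		MPIDistribution: &dist,
	}
	fmt.Println("mpiDistribution:", *spec.MPIDistribution)
}

With this value set, the generated hostfile contains entries of the form "<job>-worker-0:2" instead of "<job>-worker-0 slots=2".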