Revert all codes related to batch jobs

This commit is contained in:
TommyLike 2019-04-19 14:55:29 +08:00
parent 40edc7f963
commit 4b74ec8f29
74 changed files with 1194 additions and 6099 deletions

View File

@ -1,37 +1,27 @@
dist: xenial
language: go
go:
- '1.11.x'
sudo: required
env:
- HOME=/home/travis
services:
- docker
go:
- "1.11"
go_import_path: github.com/kubernetes-sigs/kube-batch
jobs:
include:
- stage: Golint & Gofmt
before_script:
- go get -u golang.org/x/lint/golint
script:
- make verify
- stage: E2E Tests
before_script:
# Download kubectl
- sudo apt-get update && sudo apt-get install -y apt-transport-https
- curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
- echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | sudo tee -a /etc/apt/sources.list.d/kubernetes.list
- sudo apt-get update
- sudo apt-get install -y kubectl
# Download kind binary (0.2.0)
- sudo curl -o /usr/local/bin/kind -L https://github.com/kubernetes-sigs/kind/releases/download/0.2.0/kind-linux-amd64
- sudo chmod +x /usr/local/bin/kind
script:
- make vkctl
- make images
- make e2e-kind
after_failure:
# Echo logs and upload
- test -f volcano-admission.log && echo "******<<admission logs>>******" && cat volcano-admission.log
- test -f volcano-controller.log && echo "******<<controller logs>>******" && cat volcano-controller.log
- test -f volcano-scheduler.log && echo "******<<scheduler logs>>******" && cat volcano-scheduler.log
install:
- go get -u golang.org/x/lint/golint
before_script:
- export TEST_LOG_LEVEL=4
script:
- make verify
- make
- make run-test
- make e2e

View File

@ -4,61 +4,36 @@ REPO_PATH=github.com/kubernetes-sigs/kube-batch
GitSHA=`git rev-parse HEAD`
Date=`date "+%Y-%m-%d %H:%M:%S"`
REL_OSARCH="linux/amd64"
IMAGE_PREFIX=kubesigs/vk
LD_FLAGS=" \
-X '${REPO_PATH}/pkg/version.GitSHA=${GitSHA}' \
-X '${REPO_PATH}/pkg/version.Built=${Date}' \
-X '${REPO_PATH}/pkg/version.Version=${RELEASE_VER}'"
.EXPORT_ALL_VARIABLES:
all: kube-batch vk-controllers vk-admission vkctl
kube-batch: init
go build -ldflags ${LD_FLAGS} -o=${BIN_DIR}/kube-batch ./cmd/kube-batch
vk-controllers: init
go build -ldflags ${LD_FLAGS} -o=${BIN_DIR}/vk-controllers ./cmd/controllers
vk-admission: init
go build -ldflags ${LD_FLAGS} -o=${BIN_DIR}/vk-admission ./cmd/admission
vkctl: init
go build -ldflags ${LD_FLAGS} -o=${BIN_DIR}/vkctl ./cmd/cli
verify: generate-code
hack/verify-gofmt.sh
hack/verify-goimports.sh
hack/verify-golint.sh
hack/verify-gencode.sh
hack/verify-boilerplate.sh
hack/verify-spelling.sh
init:
mkdir -p ${BIN_DIR}
generate-code:
go build -o ${BIN_DIR}/deepcopy-gen ./cmd/deepcopy-gen/
${BIN_DIR}/deepcopy-gen -i ./pkg/apis/scheduling/v1alpha1/ -O zz_generated.deepcopy
rel_bins:
go get github.com/mitchellh/gox
#Build kube-batch binary
CGO_ENABLED=0 gox -osarch=${REL_OSARCH} -ldflags ${LD_FLAGS} \
-output=${BIN_DIR}/{{.OS}}/{{.Arch}}/kube-batch ./cmd/kube-batch
#Build job controller & job admission
#TODO: Add version support in job controller and admission to make LD_FLAGS work
for name in controllers admission; do\
CGO_ENABLED=0 gox -osarch=${REL_OSARCH} -ldflags ${LD_FLAGS} -output ${BIN_DIR}/{{.OS}}/{{.Arch}}/vk-$$name ./cmd/$$name; \
done
images: rel_bins
#Build kube-batch images
cp ${BIN_DIR}/${REL_OSARCH}/kube-batch ./deployment/images/
cp ./_output/bin/${REL_OSARCH}/kube-batch ./deployment/images/
docker build ./deployment/images -t kubesigs/kube-batch:${RELEASE_VER}
rm -f ./deployment/images/kube-batch
#Build job controller and admission images
for name in controllers admission; do\
cp ${BIN_DIR}/${REL_OSARCH}/vk-$$name ./deployment/images/$$name/; \
docker build --no-cache -t $(IMAGE_PREFIX)-$$name:$(RELEASE_VER) ./deployment/images/$$name; \
rm deployment/images/$$name/vk-$$name; \
done
run-test:
hack/make-rules/test.sh $(WHAT) $(TESTS)
@ -66,12 +41,6 @@ run-test:
e2e: kube-batch
hack/run-e2e.sh
generate-code:
./hack/update-gencode.sh
e2e-kind: vkctl images
./hack/run-e2e-kind.sh
coverage:
KUBE_COVER=y hack/make-rules/test.sh $(WHAT) $(TESTS)

View File

@ -1,154 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
"encoding/json"
"flag"
"fmt"
"k8s.io/api/admissionregistration/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
admissionregistrationv1beta1client "k8s.io/client-go/kubernetes/typed/admissionregistration/v1beta1"
)
// Config contains the admission-controller server config.
type Config struct {
    Master                    string // address of the Kubernetes API server; overrides any value in Kubeconfig
    Kubeconfig                string // path to a kubeconfig file with authorization and master location
    CertFile                  string // x509 server certificate for HTTPS (may include concatenated CA cert)
    KeyFile                   string // x509 private key matching CertFile
    CaCertFile                string // CA certificate used to patch the webhook configurations' caBundle
    Port                      int    // HTTPS listen port; validated by CheckPortOrDie
    MutateWebhookConfigName   string // name of the MutatingWebhookConfiguration resource to patch
    MutateWebhookName         string // name of the webhook entry inside the mutating config
    ValidateWebhookConfigName string // name of the ValidatingWebhookConfiguration resource to patch
    ValidateWebhookName       string // name of the webhook entry inside the validating config
}
// NewConfig returns a zero-valued Config; callers populate it via AddFlags
// followed by flag parsing.
func NewConfig() *Config {
    return &Config{}
}
// AddFlags registers the admission-controller command-line flags on the
// standard flag.CommandLine set. The webhook-name defaults match the
// manifests shipped with the Helm chart.
func (c *Config) AddFlags() {
    flag.StringVar(&c.Master, "master", c.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
    flag.StringVar(&c.Kubeconfig, "kubeconfig", c.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
    flag.StringVar(&c.CertFile, "tls-cert-file", c.CertFile, ""+
        "File containing the default x509 Certificate for HTTPS. (CA cert, if any, concatenated "+
        "after server cert).")
    flag.StringVar(&c.KeyFile, "tls-private-key-file", c.KeyFile, "File containing the default x509 private key matching --tls-cert-file.")
    flag.StringVar(&c.CaCertFile, "ca-cert-file", c.CaCertFile, "File containing the x509 Certificate for HTTPS.")
    flag.IntVar(&c.Port, "port", 443, "the port used by admission-controller-server.")
    flag.StringVar(&c.MutateWebhookConfigName, "mutate-webhook-config-name", "volcano-mutate-job",
        "Name of the mutatingwebhookconfiguration resource in Kubernetes.")
    flag.StringVar(&c.MutateWebhookName, "mutate-webhook-name", "mutatejob.volcano.sh",
        "Name of the webhook entry in the webhook config.")
    // Fixed help text: this flag names the VALIDATING webhook configuration;
    // the previous description said "mutatingwebhookconfiguration" (copy-paste).
    flag.StringVar(&c.ValidateWebhookConfigName, "validate-webhook-config-name", "volcano-validate-job",
        "Name of the validatingwebhookconfiguration resource in Kubernetes.")
    flag.StringVar(&c.ValidateWebhookName, "validate-webhook-name", "validatejob.volcano.sh",
        "Name of the webhook entry in the webhook config.")
}
// CheckPortOrDie verifies that the configured port is a valid TCP port
// number; it returns an error when the value is outside 1-65535.
func (c *Config) CheckPortOrDie() error {
    if c.Port >= 1 && c.Port <= 65535 {
        return nil
    }
    return fmt.Errorf("the port should be in the range of 1 and 65535")
}
// PatchMutateWebhookConfig patches a CA bundle into the specified webhook config.
//
// It fetches the MutatingWebhookConfiguration named webhookConfigName, sets
// ClientConfig.CABundle on the entry whose Name equals webhookName, and sends
// the difference to the API server as a strategic-merge patch. An error is
// returned if the named webhook entry does not exist in the configuration.
func PatchMutateWebhookConfig(client admissionregistrationv1beta1client.MutatingWebhookConfigurationInterface,
    webhookConfigName, webhookName string, caBundle []byte) error {
    config, err := client.Get(webhookConfigName, metav1.GetOptions{})
    if err != nil {
        return err
    }
    // Snapshot the unmodified object so a two-way merge patch can be computed.
    prev, err := json.Marshal(config)
    if err != nil {
        return err
    }
    found := false
    for i, w := range config.Webhooks {
        if w.Name == webhookName {
            config.Webhooks[i].ClientConfig.CABundle = caBundle[:]
            found = true
            break
        }
    }
    if !found {
        return apierrors.NewInternalError(fmt.Errorf(
            "webhook entry %q not found in config %q", webhookName, webhookConfigName))
    }
    curr, err := json.Marshal(config)
    if err != nil {
        return err
    }
    patch, err := strategicpatch.CreateTwoWayMergePatch(prev, curr, v1beta1.MutatingWebhookConfiguration{})
    if err != nil {
        return err
    }
    // Skip the API call when the bundle is already in place (empty patch).
    if string(patch) != "{}" {
        _, err = client.Patch(webhookConfigName, types.StrategicMergePatchType, patch)
    }
    return err
}
// PatchValidateWebhookConfig patches a CA bundle into the specified webhook config.
//
// Mirror of PatchMutateWebhookConfig for ValidatingWebhookConfiguration: it
// fetches the config named webhookConfigName, sets ClientConfig.CABundle on
// the entry whose Name equals webhookName, and applies the difference as a
// strategic-merge patch. An error is returned if the entry is missing.
// NOTE(review): this duplicates PatchMutateWebhookConfig except for the
// client/config types; the typed client-go interfaces prevent easy sharing.
func PatchValidateWebhookConfig(client admissionregistrationv1beta1client.ValidatingWebhookConfigurationInterface,
    webhookConfigName, webhookName string, caBundle []byte) error {
    config, err := client.Get(webhookConfigName, metav1.GetOptions{})
    if err != nil {
        return err
    }
    // Snapshot the unmodified object so a two-way merge patch can be computed.
    prev, err := json.Marshal(config)
    if err != nil {
        return err
    }
    found := false
    for i, w := range config.Webhooks {
        if w.Name == webhookName {
            config.Webhooks[i].ClientConfig.CABundle = caBundle[:]
            found = true
            break
        }
    }
    if !found {
        return apierrors.NewInternalError(fmt.Errorf(
            "webhook entry %q not found in config %q", webhookName, webhookConfigName))
    }
    curr, err := json.Marshal(config)
    if err != nil {
        return err
    }
    patch, err := strategicpatch.CreateTwoWayMergePatch(prev, curr, v1beta1.ValidatingWebhookConfiguration{})
    if err != nil {
        return err
    }
    // Skip the API call when the bundle is already in place (empty patch).
    if string(patch) != "{}" {
        _, err = client.Patch(webhookConfigName, types.StrategicMergePatchType, patch)
    }
    return err
}

View File

@ -1,122 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
import (
"crypto/tls"
"encoding/json"
"io/ioutil"
"net/http"
"github.com/golang/glog"
"k8s.io/api/admission/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
appConf "github.com/kubernetes-sigs/kube-batch/cmd/admission/app/options"
admissioncontroller "github.com/kubernetes-sigs/kube-batch/pkg/admission"
)
const (
    // CONTENTTYPE is the HTTP header key inspected on incoming admission requests.
    CONTENTTYPE = "Content-Type"
    // APPLICATIONJSON is the only Content-Type value Serve accepts.
    APPLICATIONJSON = "application/json"
)
// GetClient gets a clientset with in-cluster config.
//
// When either --master or --kubeconfig was supplied, the clientset is built
// from those flags; otherwise the in-cluster service-account config is used.
// Any failure is fatal (glog.Fatal terminates the process).
func GetClient(c *appConf.Config) *kubernetes.Clientset {
    var config *rest.Config
    var err error
    if c.Master != "" || c.Kubeconfig != "" {
        config, err = clientcmd.BuildConfigFromFlags(c.Master, c.Kubeconfig)
    } else {
        config, err = rest.InClusterConfig()
    }
    if err != nil {
        glog.Fatal(err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        glog.Fatal(err)
    }
    return clientset
}
// ConfigTLS builds the server TLS configuration from the configured
// certificate/key pair; a load failure is fatal.
//
// NOTE(review): the clientset parameter is unused here; it is kept only for
// the existing call sites' signature — confirm before removing.
func ConfigTLS(config *appConf.Config, clientset *kubernetes.Clientset) *tls.Config {
    sCert, err := tls.LoadX509KeyPair(config.CertFile, config.KeyFile)
    if err != nil {
        glog.Fatal(err)
    }
    return &tls.Config{
        Certificates: []tls.Certificate{sCert},
    }
}
// Serve handles one admission HTTP request: it reads the body, decodes it
// into an AdmissionReview, invokes admit, and writes the resulting
// AdmissionReview back as JSON. Requests without an application/json
// Content-Type are dropped without a response body.
func Serve(w http.ResponseWriter, r *http.Request, admit admissioncontroller.AdmitFunc) {
    var body []byte
    if r.Body != nil {
        if data, err := ioutil.ReadAll(r.Body); err == nil {
            body = data
        } else {
            // Previously a read failure was silently swallowed; log it so a
            // truncated request is diagnosable. Decoding the empty body below
            // still produces an error response for the caller.
            glog.Errorf("Failed to read request body: %v", err)
        }
    }

    // verify the content type is accurate
    contentType := r.Header.Get(CONTENTTYPE)
    if contentType != APPLICATIONJSON {
        glog.Errorf("contentType=%s, expect application/json", contentType)
        return
    }

    var reviewResponse *v1beta1.AdmissionResponse
    ar := v1beta1.AdmissionReview{}
    deserializer := admissioncontroller.Codecs.UniversalDeserializer()
    if _, _, err := deserializer.Decode(body, nil, &ar); err != nil {
        // Malformed review: answer with an error AdmissionResponse instead
        // of invoking the admit function.
        reviewResponse = admissioncontroller.ToAdmissionResponse(err)
    } else {
        reviewResponse = admit(ar)
    }
    glog.V(3).Infof("sending response: %v", reviewResponse)

    response := createResponse(reviewResponse, &ar)
    resp, err := json.Marshal(response)
    if err != nil {
        glog.Error(err)
    }
    if _, err := w.Write(resp); err != nil {
        glog.Error(err)
    }
}
// createResponse wraps reviewResponse in an AdmissionReview, copying the
// request UID so the API server can correlate the answer, and strips the
// (possibly large) Object/OldObject payloads from the incoming review.
func createResponse(reviewResponse *v1beta1.AdmissionResponse, ar *v1beta1.AdmissionReview) v1beta1.AdmissionReview {
    response := v1beta1.AdmissionReview{}
    // ar.Request is nil when the review failed to decode; guard the
    // dereferences below (previously this panicked on malformed input).
    if reviewResponse != nil {
        response.Response = reviewResponse
        if ar.Request != nil {
            response.Response.UID = ar.Request.UID
        }
    }
    // reset the Object and OldObject, they are not needed in a response.
    if ar.Request != nil {
        ar.Request.Object = runtime.RawExtension{}
        ar.Request.OldObject = runtime.RawExtension{}
    }
    return response
}

View File

@ -1,75 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"flag"
"fmt"
"io/ioutil"
"net/http"
"os"
"strconv"
"github.com/kubernetes-sigs/kube-batch/cmd/admission/app"
appConf "github.com/kubernetes-sigs/kube-batch/cmd/admission/app/options"
admissioncontroller "github.com/kubernetes-sigs/kube-batch/pkg/admission"
)
// serveJobs is the HTTP handler for job validation requests; it delegates to
// the shared Serve helper with the AdmitJobs admit function.
func serveJobs(w http.ResponseWriter, r *http.Request) {
    app.Serve(w, r, admissioncontroller.AdmitJobs)
}
// serveMutateJobs is the HTTP handler for job mutation requests; it delegates
// to the shared Serve helper with the MutateJobs admit function.
func serveMutateJobs(w http.ResponseWriter, r *http.Request) {
    app.Serve(w, r, admissioncontroller.MutateJobs)
}
// main starts the admission-controller HTTPS server: it registers the
// validate/mutate handlers, patches the CA bundle into both webhook
// configurations (best effort), and serves TLS on the configured port.
func main() {
    config := appConf.NewConfig()
    config.AddFlags()
    flag.Parse()

    http.HandleFunc(admissioncontroller.AdmitJobPath, serveJobs)
    http.HandleFunc(admissioncontroller.MutateJobPath, serveMutateJobs)

    if err := config.CheckPortOrDie(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
    addr := ":" + strconv.Itoa(config.Port)

    clientset := app.GetClient(config)
    caCertPem, err := ioutil.ReadFile(config.CaCertFile)
    if err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
    } else {
        // patch caBundle in webhook; failures are reported but not fatal so
        // the server can still come up and be patched out-of-band.
        if err = appConf.PatchMutateWebhookConfig(clientset.AdmissionregistrationV1beta1().MutatingWebhookConfigurations(),
            config.MutateWebhookConfigName, config.MutateWebhookName, caCertPem); err != nil {
            fmt.Fprintf(os.Stderr, "%v\n", err)
        }
        if err = appConf.PatchValidateWebhookConfig(clientset.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations(),
            config.ValidateWebhookConfigName, config.ValidateWebhookName, caCertPem); err != nil {
            fmt.Fprintf(os.Stderr, "%v\n", err)
        }
    }

    server := &http.Server{
        Addr:      addr,
        TLSConfig: app.ConfigTLS(config, clientset),
    }
    // Previously the returned error was discarded, so a failed bind or bad
    // TLS setup exited silently with status 0.
    if err := server.ListenAndServeTLS("", ""); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}

View File

@ -1,72 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"github.com/spf13/cobra"
"github.com/kubernetes-sigs/kube-batch/pkg/cli/job"
)
// buildJobCmd assembles the "job" command group with its run/list/suspend/
// resume sub-commands, each delegating to the matching pkg/cli/job action and
// reporting failures through checkError.
func buildJobCmd() *cobra.Command {
    jobCmd := &cobra.Command{
        Use:   "job",
        Short: "Operations related to the volcano job",
    }

    subCommands := []struct {
        use       string
        short     string
        action    func() error
        initFlags func(*cobra.Command)
    }{
        {"run", "creates jobs", job.RunJob, job.InitRunFlags},
        {"list", "lists all the jobs", job.ListJobs, job.InitListFlags},
        {"suspend", "creates a job command to abort job", job.SuspendJob, job.InitSuspendFlags},
        {"resume", "creates command to resume job", job.ResumeJob, job.InitResumeFlags},
    }

    for _, sub := range subCommands {
        action := sub.action // capture per iteration for the closure below
        subCmd := &cobra.Command{
            Use:   sub.use,
            Short: sub.short,
            Run: func(cmd *cobra.Command, args []string) {
                checkError(cmd, action())
            },
        }
        sub.initFlags(subCmd)
        jobCmd.AddCommand(subCmd)
    }

    return jobCmd
}

View File

@ -1,62 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"fmt"
"time"
"github.com/golang/glog"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/wait"
)
var logFlushFreq = pflag.Duration("log-flush-frequency", 5*time.Second, "Maximum number of seconds between log flushes")
// main is the vkctl entry point: it wires up the job and queue command
// groups under the root command and executes it.
func main() {
    // The default glog flush interval is 30 seconds, which is frighteningly long.
    go wait.Until(glog.Flush, *logFlushFreq, wait.NeverStop)
    defer glog.Flush()

    rootCmd := cobra.Command{
        Use: "vkctl",
    }

    rootCmd.AddCommand(buildJobCmd())
    rootCmd.AddCommand(buildQueueCmd())

    if err := rootCmd.Execute(); err != nil {
        // Trailing newline added so the message does not run into the shell
        // prompt (matches checkError's output format).
        fmt.Printf("Failed to execute command: %v\n", err)
    }
}
// checkError reports a failed sub-command invocation on stdout, prefixing the
// error with the command path (root command excluded). It is a no-op when
// err is nil.
func checkError(cmd *cobra.Command, err error) {
    if err == nil {
        return
    }
    msg := "Failed to"
    // Ignore the root command.
    for cur := cmd; cur.Parent() != nil; cur = cur.Parent() {
        msg += fmt.Sprintf(" %s", cur.Name())
    }
    fmt.Printf("%s: %v\n", msg, err)
}

View File

@ -1,54 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
"fmt"
"github.com/spf13/pflag"
)
// ServerOption is the main context object for the controller manager.
type ServerOption struct {
    Master               string // address of the Kubernetes API server; overrides any value in Kubeconfig
    Kubeconfig           string // path to a kubeconfig file
    EnableLeaderElection bool   // run leader election before starting the main loop
    LockObjectNamespace  string // namespace of the election lock object; required when EnableLeaderElection is set
}
// NewServerOption creates a zero-valued ServerOption; callers populate it
// via AddFlags followed by flag parsing.
func NewServerOption() *ServerOption {
    return &ServerOption{}
}
// AddFlags adds flags for a specific CMServer to the specified FlagSet.
// Each flag defaults to the field's current (zero) value.
func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
    fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
    fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
    fs.BoolVar(&s.EnableLeaderElection, "leader-elect", s.EnableLeaderElection, "Start a leader election client and gain leadership before "+
        "executing the main loop. Enable this when running replicated kar-scheduler for high availability.")
    fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object.")
}
// CheckOptionOrDie validates that lock-object-namespace is provided whenever
// leader election is enabled; the election lock object lives in that
// namespace.
func (s *ServerOption) CheckOptionOrDie() error {
    if s.EnableLeaderElection && s.LockObjectNamespace == "" {
        // Message fixed: the namespace is a string flag, so the failure mode
        // is "empty", not "nil".
        return fmt.Errorf("lock-object-namespace must not be empty when LeaderElection is enabled")
    }
    return nil
}

View File

@ -1,53 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"fmt"
"os"
"time"
"github.com/golang/glog"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/flag"
"github.com/kubernetes-sigs/kube-batch/cmd/controllers/app"
"github.com/kubernetes-sigs/kube-batch/cmd/controllers/app/options"
)
var logFlushFreq = pflag.Duration("log-flush-frequency", 5*time.Second, "Maximum number of seconds between log flushes")
// main is the vk-controllers entry point: it parses and validates the server
// options, starts the periodic glog flusher, and runs the controller app.
func main() {
    s := options.NewServerOption()
    // Flags must be registered before InitFlags parses the command line.
    s.AddFlags(pflag.CommandLine)
    flag.InitFlags()
    if err := s.CheckOptionOrDie(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
    // The default glog flush interval is 30 seconds, which is frighteningly long.
    go wait.Until(glog.Flush, *logFlushFreq, wait.NeverStop)
    defer glog.Flush()
    if err := app.Run(s); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}

View File

@ -1,19 +0,0 @@
# Copyright 2019 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM alpine:latest
ADD vk-admission /vk-admission
ENTRYPOINT ["/vk-admission"]

View File

@ -1,19 +0,0 @@
# Copyright 2019 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM alpine:latest
ADD vk-controllers /vk-controllers
ENTRYPOINT ["/vk-controllers"]

View File

@ -1,4 +0,0 @@
name: volcano
version: 0.0.1
description: volcano
apiVersion: v1

View File

@ -1,11 +0,0 @@
actions: "reclaim, allocate, backfill, preempt"
tiers:
- plugins:
- name: priority
- name: gang
- name: conformance
- plugins:
- name: drf
- name: predicates
- name: proportion
- name: nodeorder

View File

@ -1,11 +0,0 @@
actions: "allocate, backfill"
tiers:
- plugins:
- name: priority
- name: gang
- name: conformance
- plugins:
- name: drf
- name: predicates
- name: proportion
- name: nodeorder

View File

@ -1,133 +0,0 @@
#!/bin/bash
#TODO: this file is used for release, should not place it here
set -e
# usage prints the help text and exits successfully. The heredoc body is the
# user-visible output and must be left exactly as written.
usage() {
cat <<EOF
Generate certificate suitable for use with an admission controller service.
This script uses k8s' CertificateSigningRequest API to a generate a
certificate signed by k8s CA suitable for use with webhook
services. This requires permissions to create and approve CSR. See
https://kubernetes.io/docs/tasks/tls/managing-tls-in-a-cluster for
detailed explanation and additional instructions.
The server key/cert k8s CA cert are stored in a k8s secret.
usage: ${0} [OPTIONS]
The following flags are required.
--service Service name of webhook.
--namespace Namespace where webhook service and secret reside.
--secret Secret name for CA certificate and server certificate/key pair.
EOF
exit 0
}
# Parse command-line flags; each flag consumes the next argument as its value,
# and anything unrecognized prints usage and exits.
while [[ $# -gt 0 ]]; do
case ${1} in
--service)
service="$2"
shift
;;
--secret)
secret="$2"
shift
;;
--namespace)
namespace="$2"
shift
;;
*)
usage
;;
esac
shift
done

# Expansions are now quoted: the previous unquoted `[ -z ${service} ]` relied
# on one-argument test semantics when the variable was unset and broke for
# values containing whitespace.
if [ -z "${service}" ]; then
echo "'--service' must be specified"
exit 1
fi

[ -z "${secret}" ] && secret=volcano-admission-secret
[ -z "${namespace}" ] && namespace=default

if [ ! -x "$(command -v openssl)" ]; then
echo "openssl not found"
exit 1
fi
csrName=${service}.${namespace}
# Work in a throwaway directory for the key, CSR config, and certificates.
tmpdir=$(mktemp -d)
echo "creating certs in tmpdir ${tmpdir} "
# OpenSSL request config: SANs cover the service's in-cluster DNS names so the
# cert is valid however the webhook service is addressed.
cat <<EOF >> ${tmpdir}/csr.conf
[req]
req_extensions = v3_req
distinguished_name = req_distinguished_name
[req_distinguished_name]
[ v3_req ]
basicConstraints = CA:FALSE
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
extendedKeyUsage = serverAuth
subjectAltName = @alt_names
[alt_names]
DNS.1 = ${service}
DNS.2 = ${service}.${namespace}
DNS.3 = ${service}.${namespace}.svc
EOF
# Generate the server private key and a CSR with the config above.
openssl genrsa -out ${tmpdir}/server-key.pem 2048
openssl req -new -key ${tmpdir}/server-key.pem -subj "/CN=${service}.${namespace}.svc" -out ${tmpdir}/server.csr -config ${tmpdir}/csr.conf
# clean-up any previously created CSR for our service. Ignore errors if not present.
kubectl delete csr ${csrName} 2>/dev/null || true
# create server cert/key CSR and send to k8s API
cat <<EOF | kubectl create -f -
apiVersion: certificates.k8s.io/v1beta1
kind: CertificateSigningRequest
metadata:
  name: ${csrName}
spec:
  groups:
  - system:authenticated
  request: $(cat ${tmpdir}/server.csr | base64 | tr -d '\n')
  usages:
  - digital signature
  - key encipherment
  - server auth
EOF
# verify CSR has been created. The kubectl call is guarded by `if` because
# under `set -e` the previous form (bare command followed by a `$?` check)
# aborted the whole script on the first transient failure instead of
# retrying. A short sleep avoids busy-waiting against the API server.
while true; do
if kubectl get csr ${csrName}; then
break
fi
sleep 1
done
# approve and fetch the signed certificate
kubectl certificate approve ${csrName}
# verify certificate has been signed
for x in $(seq 20); do
serverCert=$(kubectl get csr ${csrName} -o jsonpath='{.status.certificate}')
if [[ ${serverCert} != '' ]]; then
break
fi
sleep 1
done
if [[ ${serverCert} == '' ]]; then
echo "ERROR: After approving csr ${csrName}, the signed certificate did not appear on the resource. Giving up after 20 attempts." >&2
exit 1
fi
echo ${serverCert} | openssl base64 -d -A -out ${tmpdir}/server-cert.pem
# ca cert
kubectl get configmap -n kube-system extension-apiserver-authentication -o=jsonpath='{.data.client-ca-file}' >> ${tmpdir}/ca-cert.pem
# create the secret with CA cert and server cert/key
kubectl create secret generic ${secret} \
--from-file=tls.key=${tmpdir}/server-key.pem \
--from-file=tls.crt=${tmpdir}/server-cert.pem \
--from-file=ca.crt=${tmpdir}/ca-cert.pem \
--dry-run -o yaml |
kubectl -n ${namespace} apply -f -

View File

@ -1,7 +0,0 @@
name: "gen-admission-secret"
version: "1.0.0"
usage: "Generate valid cert for admission server"
description: This plugin provides signed cert to admission server.
ignoreFlags: false
useTunnel: false
command: "$HELM_PLUGIN_DIR/gen-admission-secret.sh"

View File

@ -1,50 +0,0 @@
apiVersion: admissionregistration.k8s.io/v1beta1
kind: ValidatingWebhookConfiguration
metadata:
name: {{ .Release.Name }}-validate-job
annotations:
"helm.sh/hook": pre-install,pre-upgrade,post-delete
webhooks:
- clientConfig:
service:
name: {{ .Release.Name }}-admission-service
namespace: {{ .Release.Namespace }}
path: /jobs
failurePolicy: Ignore
name: validatejob.volcano.sh
namespaceSelector: {}
rules:
- apiGroups:
- "batch.volcano.sh"
apiVersions:
- "v1alpha1"
operations:
- CREATE
- UPDATE
resources:
- jobs
---
apiVersion: admissionregistration.k8s.io/v1beta1
kind: MutatingWebhookConfiguration
metadata:
name: {{ .Release.Name }}-mutate-job
annotations:
"helm.sh/hook": pre-install,pre-upgrade,post-delete
webhooks:
- clientConfig:
service:
name: {{ .Release.Name }}-admission-service
namespace: {{ .Release.Namespace }}
path: /mutating-jobs
failurePolicy: Ignore
name: mutatejob.volcano.sh
namespaceSelector: {}
rules:
- apiGroups:
- "batch.volcano.sh"
apiVersions:
- "v1alpha1"
operations:
- CREATE
resources:
- jobs

View File

@ -1,100 +0,0 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: {{ .Release.Name }}-admission
namespace: {{ .Release.Namespace }}
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: {{ .Release.Name }}-admission
namespace: {{ .Release.Namespace }}
rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "list", "watch"]
- apiGroups: ["admissionregistration.k8s.io"]
resources: ["mutatingwebhookconfigurations"]
verbs: ["get", "list", "watch", "patch"]
- apiGroups: ["admissionregistration.k8s.io"]
resources: ["validatingwebhookconfigurations"]
verbs: ["get", "list", "watch", "patch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: {{ .Release.Name }}-admission-role
namespace: {{ .Release.Namespace }}
subjects:
- kind: ServiceAccount
name: {{ .Release.Name }}-admission
namespace: {{ .Release.Namespace }}
roleRef:
kind: ClusterRole
name: {{ .Release.Name }}-admission
apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: volcano-admission
name: {{ .Release.Name }}-admission
namespace: {{ .Release.Namespace }}
spec:
replicas: 1
selector:
matchLabels:
app: volcano-admission
template:
metadata:
labels:
app: volcano-admission
spec:
serviceAccount: {{ .Release.Name }}-admission
{{ if .Values.basic.image_pull_secret }}
imagePullSecrets:
- name: {{ .Values.basic.image_pull_secret }}
{{ end }}
containers:
- args:
- --tls-cert-file=/admission.local.config/certificates/tls.crt
- --tls-private-key-file=/admission.local.config/certificates/tls.key
- --ca-cert-file=/admission.local.config/certificates/ca.crt
- --mutate-webhook-config-name={{ .Release.Name }}-mutate-job
- --validate-webhook-config-name={{ .Release.Name }}-validate-job
- --alsologtostderr
- --port=443
- -v=4
- 2>&1
image: {{.Values.basic.admission_image_name}}:{{.Values.basic.image_tag_version}}
imagePullPolicy: IfNotPresent
name: admission
volumeMounts:
- mountPath: /admission.local.config/certificates
name: admission-certs
readOnly: true
volumes:
- name: admission-certs
secret:
defaultMode: 420
secretName: {{.Values.basic.admission_secret_name}}
---
apiVersion: v1
kind: Service
metadata:
labels:
app: volcano-admission
name: {{ .Release.Name }}-admission-service
namespace: {{ .Release.Namespace }}
spec:
ports:
- port: 443
protocol: TCP
targetPort: 443
selector:
app: volcano-admission
sessionAffinity: None

View File

@ -1,172 +0,0 @@
# CustomResourceDefinition for batch.volcano.sh/v1alpha1 Job.
# The "helm.sh/hook": crd-install annotation asks Helm to create the CRD
# before rendering the rest of the chart's resources.
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: jobs.batch.volcano.sh
  annotations:
    "helm.sh/hook": crd-install
spec:
  group: batch.volcano.sh
  names:
    kind: Job
    plural: jobs
  scope: Namespaced
  # OpenAPI v3 schema used by the apiserver to validate Job objects.
  validation:
    openAPIV3Schema:
      properties:
        apiVersion:
          description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources'
          type: string
        kind:
          description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds'
          type: string
        metadata:
          type: object
        spec:
          description: Specification of the desired behavior of a cron job, including the minAvailable
          properties:
            input:
              description: The volume mount for input of Job
              properties:
                volumeClaim:
                  description: VolumeClaim defines the PVC used by the VolumeMount.
                  type: object
                mountPath:
                  description: Path within the container at which the volume should be mounted. Must not contain ':'.
                  type: string
              required:
                - mountPath
              type: object
            minAvailable:
              description: The minimal available pods to run for this Job
              format: int32
              type: integer
            output:
              description: The volume mount for output of Job
              properties:
                volumeClaim:
                  description: VolumeClaim defines the PVC used by the VolumeMount.
                  type: object
                mountPath:
                  description: Path within the container at which the volume should be mounted. Must not contain ':'.
                  type: string
              required:
                - mountPath
              type: object
            # Job-level default lifecycle policies; task-level policies override these.
            policies:
              description: Specifies the default lifecycle of tasks
              items:
                properties:
                  action:
                    description: The action that will be taken to the PodGroup according to Event. One of "Restart", "None". Default to None.
                    type: string
                  event:
                    description: The Event recorded by scheduler; the controller takes actions according to this Event.
                    type: string
                  timeout:
                    description: Timeout is the grace period for controller to take actions. Default to nil (take action immediately).
                    type: object
                type: object
              type: array
            schedulerName:
              description: SchedulerName is the default value of `tasks.template.spec.schedulerName`.
              type: string
            tasks:
              description: Tasks specifies the task specification of Job
              items:
                properties:
                  name:
                    description: Name specifies the name of tasks
                    type: string
                  policies:
                    description: Specifies the lifecycle of task
                    items:
                      properties:
                        action:
                          description: The action that will be taken to the PodGroup according to Event. One of "Restart", "None". Default to None.
                          type: string
                        event:
                          description: The Event recorded by scheduler; the controller takes actions according to this Event.
                          type: string
                        timeout:
                          description: Timeout is the grace period for controller to take actions. Default to nil (take action immediately).
                          type: object
                      type: object
                    type: array
                  replicas:
                    description: Replicas specifies the replicas of this TaskSpec in Job
                    format: int32
                    type: integer
                  template:
                    description: Specifies the pod that will be created for this TaskSpec when executing a Job
                    type: object
                type: object
              type: array
          type: object
        status:
          description: Current status of Job
          properties:
            # NOTE(review): capitalized "Succeeded" matches the Go json tag
            # `json:"Succeeded"` and is inconsistent with the other lowercase keys.
            Succeeded:
              description: The number of pods which reached phase Succeeded.
              format: int32
              type: integer
            failed:
              description: The number of pods which reached phase Failed.
              format: int32
              type: integer
            minAvailable:
              description: The minimal available pods to run for this Job
              format: int32
              type: integer
            pending:
              description: The number of pending pods.
              format: int32
              type: integer
            running:
              description: The number of running pods.
              format: int32
              type: integer
            version:
              description: Job's current version
              format: int32
              type: integer
            state:
              description: Current state of Job.
              properties:
                message:
                  description: Human-readable message indicating details about last transition.
                  type: string
                phase:
                  description: The phase of Job
                  type: string
                reason:
                  description: Unique, one-word, CamelCase reason for the condition's last transition.
                  type: string
              type: object
          type: object
  version: v1alpha1
  # Enables the /status subresource so status updates use a dedicated endpoint.
  subresources:
    status: {}
status:
  acceptedNames:
    kind: ""
    plural: ""
  conditions: []
  storedVersions: []

View File

@ -1,46 +0,0 @@
# CustomResourceDefinition for bus.volcano.sh/v1alpha1 Command.
# Commands carry user-issued actions (e.g. via vkctl) that the controller
# applies to a target object.
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: commands.bus.volcano.sh
  annotations:
    "helm.sh/hook": crd-install
spec:
  group: bus.volcano.sh
  names:
    kind: Command
    plural: commands
  scope: Namespaced
  validation:
    openAPIV3Schema:
      properties:
        action:
          description: Action defines the action that will be took to the target object.
          type: string
        apiVersion:
          description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources'
          type: string
        kind:
          description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds'
          type: string
        message:
          description: Human-readable message indicating details of this command.
          type: string
        metadata:
          type: object
        reason:
          description: Unique, one-word, CamelCase reason for this command.
          type: string
        target:
          description: TargetObject defines the target object of this command.
          type: object
  version: v1alpha1
status:
  acceptedNames:
    kind: ""
    plural: ""
  conditions: []
  storedVersions: []

View File

@ -1,90 +0,0 @@
# ServiceAccount, ClusterRole and binding granting the Volcano controllers
# the permissions they need to reconcile Jobs, Commands, Pods and PodGroups.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: {{ .Release.Name }}-controllers
  namespace: {{ .Release.Namespace }}
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: {{ .Release.Name }}-controllers
  namespace: {{ .Release.Namespace }}
rules:
  # Register/remove the batch/bus CRDs at startup.
  - apiGroups: ["apiextensions.k8s.io"]
    resources: ["customresourcedefinitions"]
    verbs: ["create", "get", "list", "watch", "delete"]
  - apiGroups: ["batch"]
    resources: ["jobs"]
    verbs: ["create", "get", "list", "watch", "delete", "update"]
  # Volcano Jobs and their status subresource.
  - apiGroups: ["batch.volcano.sh"]
    resources: ["jobs"]
    verbs: ["get", "list", "watch", "update", "delete"]
  - apiGroups: ["batch.volcano.sh"]
    resources: ["jobs/status"]
    verbs: ["update", "patch"]
  # Consume user-issued Commands.
  - apiGroups: ["bus.volcano.sh"]
    resources: ["commands"]
    verbs: ["get", "list", "watch", "delete"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["create", "list", "watch", "update", "patch"]
  # Manage the Pods that implement Job tasks.
  - apiGroups: [""]
    resources: ["pods"]
    verbs: ["create", "get", "list", "watch", "update", "bind", "delete"]
  - apiGroups: [""]
    resources: ["persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "create"]
  - apiGroups: [""]
    resources: ["services", "configmaps"]
    verbs: ["get", "list", "watch", "create", "delete"]
  # Create PodGroups for gang scheduling.
  - apiGroups: ["scheduling.incubator.k8s.io"]
    resources: ["podgroups"]
    verbs: ["get", "list", "watch", "create", "delete"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: {{ .Release.Name }}-controllers-role
  namespace: {{ .Release.Namespace }}
subjects:
  - kind: ServiceAccount
    name: {{ .Release.Name }}-controllers
    namespace: {{ .Release.Namespace }}
roleRef:
  kind: ClusterRole
  name: {{ .Release.Name }}-controllers
  apiGroup: rbac.authorization.k8s.io
---
# Deployment running the Volcano controller manager.
kind: Deployment
apiVersion: apps/v1
metadata:
  name: {{ .Release.Name }}-controllers
  namespace: {{ .Release.Namespace }}
  labels:
    app: volcano-controller
spec:
  replicas: 1
  selector:
    matchLabels:
      app: volcano-controller
  template:
    metadata:
      labels:
        app: volcano-controller
    spec:
      serviceAccount: {{ .Release.Name }}-controllers
      {{ if .Values.basic.image_pull_secret }}
      imagePullSecrets:
        - name: {{ .Values.basic.image_pull_secret }}
      {{ end }}
      containers:
        - name: {{ .Release.Name }}-controllers
          image: {{.Values.basic.controller_image_name}}:{{.Values.basic.image_tag_version}}
          args:
            - --alsologtostderr
            - -v=4
            # FIX: dropped the trailing "2>&1" entry. Container args are passed
            # to the binary via exec without a shell, so shell redirection syntax
            # is handed to the process as a meaningless positional argument.
          imagePullPolicy: "IfNotPresent"

View File

@ -1,6 +0,0 @@
# Default scheduling Queue; jobs that do not name a queue land here.
# Weight controls the proportional share of cluster resources.
apiVersion: scheduling.incubator.k8s.io/v1alpha1
kind: Queue
metadata:
  name: default
spec:
  weight: 1

View File

@ -1,115 +0,0 @@
# ConfigMap, ServiceAccount, ClusterRole and binding for the Volcano scheduler.
# The ConfigMap embeds every file under config/ as scheduler configuration.
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ .Release.Name }}-scheduler-configmap
data:
  {{- (.Files.Glob "config/*").AsConfig | nindent 2 }}
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: {{ .Release.Name }}-scheduler
  namespace: {{ .Release.Namespace }}
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: {{ .Release.Name }}-scheduler
  namespace: {{ .Release.Namespace }}
rules:
  - apiGroups: ["apiextensions.k8s.io"]
    resources: ["customresourcedefinitions"]
    verbs: ["create", "get", "list", "watch", "delete"]
  - apiGroups: ["batch.volcano.sh"]
    resources: ["jobs"]
    verbs: ["get", "list", "watch", "update", "delete"]
  - apiGroups: ["batch.volcano.sh"]
    resources: ["jobs/status"]
    verbs: ["update", "patch"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["create", "list", "watch", "update", "patch"]
  # Pod lifecycle and binding permissions required for scheduling decisions.
  - apiGroups: [""]
    resources: ["pods", "pods/status"]
    verbs: ["create", "get", "list", "watch", "update", "bind", "updateStatus", "delete"]
  - apiGroups: [""]
    resources: ["pods/binding"]
    verbs: ["create"]
  - apiGroups: [""]
    resources: ["persistentvolumeclaims"]
    verbs: ["list", "watch"]
  - apiGroups: [""]
    resources: ["persistentvolumes"]
    verbs: ["list", "watch"]
  - apiGroups: ["scheduling.incubator.k8s.io"]
    resources: ["podgroups"]
    verbs: ["list", "watch", "update"]
  - apiGroups: [""]
    resources: ["namespaces"]
    verbs: ["list", "watch"]
  - apiGroups: ["storage.k8s.io"]
    resources: ["storageclasses"]
    verbs: ["list", "watch"]
  - apiGroups: [""]
    resources: ["nodes"]
    verbs: ["list", "watch"]
  - apiGroups: ["policy"]
    resources: ["poddisruptionbudgets"]
    verbs: ["list", "watch"]
  - apiGroups: ["scheduling.incubator.k8s.io"]
    resources: ["queues"]
    verbs: ["get", "list", "watch", "create", "delete"]
  - apiGroups: ["scheduling.k8s.io"]
    resources: ["priorityclasses"]
    verbs: ["get", "list", "watch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: {{ .Release.Name }}-scheduler-role
  namespace: {{ .Release.Namespace }}
subjects:
  - kind: ServiceAccount
    name: {{ .Release.Name }}-scheduler
    namespace: {{ .Release.Namespace }}
roleRef:
  kind: ClusterRole
  name: {{ .Release.Name }}-scheduler
  apiGroup: rbac.authorization.k8s.io
---
# Deployment running the Volcano (kube-batch) scheduler; its configuration
# file is mounted from the scheduler ConfigMap.
kind: Deployment
apiVersion: apps/v1
metadata:
  name: {{ .Release.Name }}-scheduler
  namespace: {{ .Release.Namespace }}
  labels:
    app: volcano-scheduler
spec:
  replicas: 1
  selector:
    matchLabels:
      app: volcano-scheduler
  template:
    metadata:
      labels:
        app: volcano-scheduler
    spec:
      serviceAccount: {{ .Release.Name }}-scheduler
      containers:
        - name: {{ .Release.Name }}-scheduler
          image: {{.Values.basic.scheduler_image_name}}:{{.Values.basic.image_tag_version}}
          args:
            - --alsologtostderr
            - --scheduler-conf={{.Values.basic.scheduler_conf_file}}
            - -v=3
            # FIX: dropped the trailing "2>&1" entry. Container args are passed
            # to the binary via exec without a shell, so shell redirection syntax
            # is handed to the process as a meaningless positional argument.
          imagePullPolicy: "IfNotPresent"
          volumeMounts:
            - name: scheduler-config
              mountPath: /volcano.scheduler
      volumes:
        - name: scheduler-config
          configMap:
            name: {{ .Release.Name }}-scheduler-configmap

View File

@ -1,41 +0,0 @@
# CustomResourceDefinition for scheduling.incubator.k8s.io/v1alpha1 PodGroup,
# the gang-scheduling unit consumed by the scheduler.
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: podgroups.scheduling.incubator.k8s.io
  annotations:
    "helm.sh/hook": crd-install
spec:
  group: scheduling.incubator.k8s.io
  names:
    kind: PodGroup
    plural: podgroups
  scope: Namespaced
  validation:
    openAPIV3Schema:
      properties:
        apiVersion:
          type: string
        kind:
          type: string
        metadata:
          type: object
        spec:
          properties:
            # Minimum number of member pods that must be schedulable together.
            minMember:
              format: int32
              type: integer
          type: object
        status:
          properties:
            succeeded:
              format: int32
              type: integer
            failed:
              format: int32
              type: integer
            running:
              format: int32
              type: integer
          type: object
      type: object
  version: v1alpha1

View File

@ -1,29 +0,0 @@
# CustomResourceDefinition for scheduling.incubator.k8s.io/v1alpha1 Queue.
# Queues are cluster-scoped and weighted for proportional resource sharing.
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: queues.scheduling.incubator.k8s.io
  annotations:
    "helm.sh/hook": crd-install
spec:
  group: scheduling.incubator.k8s.io
  names:
    kind: Queue
    plural: queues
  scope: Cluster
  validation:
    openAPIV3Schema:
      properties:
        apiVersion:
          type: string
        kind:
          type: string
        metadata:
          type: object
        spec:
          properties:
            weight:
              format: int32
              type: integer
          type: object
      type: object
  version: v1alpha1

View File

@ -1,8 +0,0 @@
# Default chart values for the Volcano Helm chart.
basic:
  # Image tag applied to all Volcano component images below.
  image_tag_version: "latest"
  controller_image_name: "kubesigs/vk-controllers"
  scheduler_image_name: "kubesigs/kube-batch"
  admission_image_name: "kubesigs/vk-admission"
  # Secret holding the admission webhook TLS certificates.
  admission_secret_name: "volcano-admission-secret"
  # Optional imagePullSecret name; empty string means none is used.
  image_pull_secret: ""
  # Path of the scheduler configuration file inside the scheduler container.
  scheduler_conf_file: "/volcano.scheduler/kube-batch.conf"

View File

@ -1,645 +0,0 @@
# Job API
[@k82cn](http://github.com/k82cn); Dec 27, 2018
## Motivation
`Job` is the fundamental object of high performance workload; this document provides the definition of `Job` in Volcano.
## Scope
### In Scope
* Define the API of Job
* Define the behaviour of Job
* Clarify the interaction with other features
### Out of Scope
* Volumes: volume management is out of scope for job management related features
* Network: the addressing between tasks will be described in other project
## Function Detail
The definition of `Job` follows Kubernetes's style, e.g. Status, Spec; the following sections will only describe
the major functions of `Job`; refer to the [Appendix](#appendix) section for the whole definition of `Job`.
### Multiple Pod Template
As most jobs of high performance workloads include different types of tasks, e.g. TensorFlow (ps/worker), Spark (driver/executor);
`Job` introduces `taskSpecs` to support multiple pod templates, defined as follows. The `Policies` will be described in the
[Error Handling](#error-handling) section.
```go
// JobSpec describes how the job execution will look like and when it will actually run
type JobSpec struct {
...
// Tasks specifies the task specification of Job
// +optional
Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,5,opt,name=tasks"`
}
// TaskSpec specifies the task specification of Job
type TaskSpec struct {
// Name specifies the name of task
Name string `json:"name,omitempty" protobuf:"bytes,1,opt,name=name"`
// Replicas specifies the replicas of this TaskSpec in Job
Replicas int32 `json:"replicas,omitempty" protobuf:"bytes,2,opt,name=replicas"`
// Specifies the pod that will be created for this TaskSpec
// when executing a Job
Template v1.PodTemplateSpec `json:"template,omitempty" protobuf:"bytes,3,opt,name=template"`
// Specifies the lifecycle of tasks
// +optional
Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,4,opt,name=policies"`
}
```
`JobController` will create Pods based on the templates and replicas in `spec.tasks`;
the controlled `OwnerReference` of Pod will be set to the `Job`. The following is
an example YAML with multiple pod template.
```yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: tf-job
spec:
tasks:
- name: "ps"
replicas: 2
template:
spec:
containers:
- name: ps
image: ps-img
- name: "worker"
replicas: 5
template:
spec:
containers:
- name: worker
image: worker-img
```
### Job Input/Output
Most high performance workloads handle data, which is considered the input/output of a Job.
The following types are introduced for a Job's input/output.
```go
type VolumeSpec struct {
MountPath string `json:"mountPath" protobuf:"bytes,1,opt,name=mountPath"`
// VolumeClaim defines the PVC used by the VolumeSpec.
// + optional
VolumeClaim *PersistentVolumeClaim `json:"claim,omitempty" protobuf:"bytes,2,opt,name=claim"`
}
type JobSpec struct{
...
// The volume mount for input of Job
// +optional
Input *VolumeSpec `json:"input,omitempty" protobuf:"bytes,3,opt,name=input"`
// The volume mount for output of Job
// +optional
Output *VolumeSpec `json:"output,omitempty" protobuf:"bytes,4,opt,name=output"`
}
```
The `Input` & `Output` of a Job can be `nil`, which means users will manage data themselves. If `input.volumeClaim`/`output.volumeClaim` is `nil`,
an `emptyDir` volume will be used for each Task/Pod.
### Conditions and Phases
The following phases are introduced to give a simple, high-level summary of where the Job is in its lifecycle; and the conditions array,
the reason and message field contain more detail about the job's status.
```go
type JobPhase string
const (
// Pending is the phase that job is pending in the queue, waiting for scheduling decision
Pending JobPhase = "Pending"
// Aborting is the phase that job is aborted, waiting for releasing pods
Aborting JobPhase = "Aborting"
// Aborted is the phase that job is aborted by user or error handling
Aborted JobPhase = "Aborted"
// Running is the phase that minimal available tasks of Job are running
Running JobPhase = "Running"
// Restarting is the phase that the Job is restarted, waiting for pod releasing and recreating
Restarting JobPhase = "Restarting"
// Completed is the phase that all tasks of Job are completed successfully
Completed JobPhase = "Completed"
// Terminating is the phase that the Job is terminated, waiting for releasing pods
Terminating JobPhase = "Terminating"
// Terminated is the phase that the job is finished unexpected, e.g. events
Terminated JobPhase = "Terminated"
)
// JobState contains details for the current state of the job.
type JobState struct {
// The phase of Job.
// +optional
Phase JobPhase `json:"phase,omitempty" protobuf:"bytes,1,opt,name=phase"`
// Unique, one-word, CamelCase reason for the phase's last transition.
// +optional
Reason string `json:"reason,omitempty" protobuf:"bytes,2,opt,name=reason"`
// Human-readable message indicating details about last transition.
// +optional
Message string `json:"message,omitempty" protobuf:"bytes,3,opt,name=message"`
}
// JobStatus represents the current state of a Job
type JobStatus struct {
// Current state of Job.
State JobState `json:"state,omitempty" protobuf:"bytes,1,opt,name=state"`
......
}
```
The following table shows the available transitions between different phases. The phase cannot transfer to the target
phase if the cell is empty.
| From \ To | Pending | Aborted | Running | Completed | Terminated |
| ------------- | ------- | ------- | ------- | --------- | ---------- |
| Pending | * | * | * | | |
| Aborted | * | * | | | |
| Running | | * | * | * | * |
| Completed | | | | * | |
| Terminated | | | | | * |
`Restarting`, `Aborting` and `Terminating` are temporary states to avoid race conditions, e.g. there'll be several
`PodEvictedEvent`s because of `TerminateJobAction` which should not be handled again.
### Error Handling
After Job was created in system, there'll be several events related to the Job, e.g. Pod succeeded, Pod failed;
and some events are critical to the Job, e.g. Pod of MPIJob failed. So `LifecyclePolicy` is introduced to handle different
events based on user's configuration.
```go
// Event is the type of Event related to the Job
type Event string
const (
// AllEvents means all event
AllEvents Event = "*"
// PodFailedEvent is triggered if Pod was failed
PodFailedEvent Event = "PodFailed"
// PodEvictedEvent is triggered if Pod was deleted
PodEvictedEvent Event = "PodEvicted"
// These below are several events can lead to job 'Unknown'
// 1. Task Unschedulable, this is triggered when part of
// pods can't be scheduled while some are already running in gang-scheduling case.
JobUnknownEvent Event = "Unknown"
// OutOfSyncEvent is triggered if Pod/Job were updated
OutOfSyncEvent Event = "OutOfSync"
// CommandIssuedEvent is triggered if a command is raised by user
CommandIssuedEvent Event = "CommandIssued"
)
// Action is the type of event handling
type Action string
const (
// AbortJobAction if this action is set, the whole job will be aborted:
// all Pod of Job will be evicted, and no Pod will be recreated
AbortJobAction Action = "AbortJob"
// RestartJobAction if this action is set, the whole job will be restarted
RestartJobAction Action = "RestartJob"
// RestartTaskAction if this action is set, only the task will be restarted; default action.
// This action can not work together with job level events, e.g. JobUnschedulable
RestartTaskAction Action = "RestartTask"
// TerminateJobAction if this action is set, the whole job will be terminated
// and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
TerminateJobAction Action = "TerminateJob"
// ResumeJobAction is the action to resume an aborted job.
ResumeJobAction Action = "ResumeJob"
// SyncJobAction is the action to sync Job/Pod status.
SyncJobAction Action = "SyncJob"
)
// LifecyclePolicy specifies the lifecycle and error handling of task and job.
type LifecyclePolicy struct {
Event Event `json:"event,omitempty" protobuf:"bytes,1,opt,name=event"`
Action Action `json:"action,omitempty" protobuf:"bytes,2,opt,name=action"`
Timeout *metav1.Duration `json:"timeout,omitempty" protobuf:"bytes,3,opt,name=timeout"`
}
```
Both `JobSpec` and `TaskSpec` include lifecycle policy: the policies in `JobSpec` are the default policy if no policies
in `TaskSpec`; the policies in `TaskSpec` will overwrite defaults.
```go
// JobSpec describes how the job execution will look like and when it will actually run
type JobSpec struct {
...
// Specifies the default lifecycle of tasks
// +optional
Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,5,opt,name=policies"`
// Tasks specifies the task specification of Job
// +optional
Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,6,opt,name=tasks"`
}
// TaskSpec specifies the task specification of Job
type TaskSpec struct {
...
// Specifies the lifecycle of tasks
// +optional
Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,4,opt,name=policies"`
}
```
The following examples demonstrate the usage of `LifecyclePolicy` for job and task.
For the training job of machine learning framework, the whole job should be restarted if any task was failed or evicted.
To simplify the configuration, a job level `LifecyclePolicy` is set as follows. As no `LifecyclePolicy` is set for any
task, all tasks will use the policies in `spec.policies`.
```yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: tf-job
spec:
# If any event here, restart the whole job.
policies:
- event: "*"
action: RestartJob
tasks:
- name: "ps"
replicas: 1
template:
spec:
containers:
- name: ps
image: ps-img
- name: "worker"
replicas: 5
template:
spec:
containers:
- name: worker
image: worker-img
...
```
Some BigData frameworks (e.g. Spark) may have different requirements. Take Spark as an example: the whole job will be restarted
if 'driver' tasks fail, while only the task is restarted if 'executor' tasks fail. As `RestartTask` is the default action for
task events, `RestartJob` is set for the driver in `spec.tasks.policies` as follows.
```yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: spark-job
spec:
tasks:
- name: "driver"
replicas: 1
policies:
- event: "*"
action: RestartJob
template:
spec:
containers:
- name: driver
image: driver-img
- name: "executor"
replicas: 5
template:
spec:
containers:
- name: executor
image: executor-img
```
## Features Interaction
### Admission Controller
The following validations must be included to make sure expected behaviours:
* `spec.minAvailable` <= sum(`spec.taskSpecs.replicas`)
* no duplicated name in `spec.taskSpecs` array
* no duplicated event handler in `LifecyclePolicy` array, both job policies and task policies
### CoScheduling
CoScheduling (or Gang-scheduling) is required by most of high performance workload, e.g. TF training job, MPI job.
The `spec.minAvailable` is used to identify how many pods will be scheduled together. The default value of `spec.minAvailable`
is the sum of `spec.tasks.replicas`. The admission controller web hook will check `spec.minAvailable` against
the sum of `spec.tasks.replicas`; the job creation will be rejected if `spec.minAvailable` > sum(`spec.tasks.replicas`).
If `spec.minAvailable` < sum(`spec.tasks.replicas`), the pod of `spec.tasks` will be created randomly;
refer to [Task Priority with Job](#task-priority-within-job) section on how to create tasks in order.
```yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: tf-job
spec:
# minAvailable to run job
minAvailable: 6
tasks:
- name: "ps"
replicas: 1
template:
spec:
containers:
- name: "ps"
image: "ps-img"
- name: "worker"
replicas: 5
template:
spec:
containers:
- name: "worker"
image: "worker-img"
```
### Task Priority within Job
In addition to multiple pod templates, the priority of each task may be different. The `PriorityClass` of `PodTemplate` is reused
to define the priority of a task within a job. This is an example of running a spark job: 1 driver with 5 executors; the driver's
priority is `master-pri`, which is higher than normal pods; as `spec.minAvailable` is 3, the scheduler will make sure one driver
with 2 executors will be scheduled if there are not enough resources.
```yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: spark-job
spec:
minAvailable: 3
tasks:
- name: "driver"
replicas: 1
template:
spec:
priorityClass: "master-pri"
containers:
- name: driver
image: driver-img
- name: "executor"
replicas: 5
template:
spec:
containers:
- name: executor
image: executor-img
```
**NOTE**: although the scheduler will make sure high priority pods within a job are scheduled first, there's still a race
condition between different kubelets in which a low priority pod may be launched early; the job/task dependency will be introduced
later to handle this kind of race condition.
### Resource sharing between Job
By default, the `spec.minAvailable` is set to the sum of `spec.tasks.replicas`; if it's set to a smaller value,
the pods beyond `spec.minAvailable` will share resources between jobs.
```yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: spark-job
spec:
minAvailable: 3
tasks:
- name: "driver"
replicas: 1
template:
spec:
priorityClass: "master-pri"
containers:
- name: driver
image: driver-img
- name: "executor"
replicas: 5
template:
spec:
containers:
- name: executor
image: executor-img
```
## Appendix
```go
type Job struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
// Specification of the desired behavior of a cron job, including the minAvailable
// +optional
Spec JobSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
// Current status of Job
// +optional
Status JobStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}
// JobSpec describes how the job execution will look like and when it will actually run
type JobSpec struct {
// SchedulerName is the default value of `taskSpecs.template.spec.schedulerName`.
// +optional
SchedulerName string `json:"schedulerName,omitempty" protobuf:"bytes,1,opt,name=schedulerName"`
// The minimal available pods to run for this Job
// +optional
MinAvailable int32 `json:"minAvailable,omitempty" protobuf:"bytes,2,opt,name=minAvailable"`
// The volume mount for input of Job
Input *VolumeSpec `json:"input,omitempty" protobuf:"bytes,3,opt,name=input"`
// The volume mount for output of Job
Output *VolumeSpec `json:"output,omitempty" protobuf:"bytes,4,opt,name=output"`
// Tasks specifies the task specification of Job
// +optional
Tasks []TaskSpec `json:"taskSpecs,omitempty" protobuf:"bytes,5,opt,name=taskSpecs"`
// Specifies the default lifecycle of tasks
// +optional
Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,6,opt,name=policies"`
}
// VolumeSpec defines the specification of Volume, e.g. PVC
type VolumeSpec struct {
MountPath string `json:"mountPath" protobuf:"bytes,1,opt,name=mountPath"`
// VolumeClaim defines the PVC used by the VolumeMount.
VolumeClaim *v1.PersistentVolumeClaimSpec `json:"claim,omitempty" protobuf:"bytes,1,opt,name=claim"`
}
// Event represent the phase of Job, e.g. pod-failed.
type Event string
const (
// AllEvent means all event
AllEvents Event = "*"
// PodFailedEvent is triggered if Pod was failed
PodFailedEvent Event = "PodFailed"
// PodEvictedEvent is triggered if Pod was deleted
PodEvictedEvent Event = "PodEvicted"
// These below are several events can lead to job 'Unknown'
// 1. Task Unschedulable, this is triggered when part of
// pods can't be scheduled while some are already running in gang-scheduling case.
JobUnknownEvent Event = "Unknown"
// OutOfSyncEvent is triggered if Pod/Job were updated
OutOfSyncEvent Event = "OutOfSync"
// CommandIssuedEvent is triggered if a command is raised by user
CommandIssuedEvent Event = "CommandIssued"
)
// Action is the action that Job controller will take according to the event.
type Action string
const (
// AbortJobAction if this action is set, the whole job will be aborted:
// all Pod of Job will be evicted, and no Pod will be recreated
AbortJobAction Action = "AbortJob"
// RestartJobAction if this action is set, the whole job will be restarted
RestartJobAction Action = "RestartJob"
// RestartTaskAction if this action is set, only the task will be restarted; default action.
// This action can not work together with job level events, e.g. JobUnschedulable
RestartTaskAction Action = "RestartTask"
// TerminateJobAction if this action is set, the whole job will be terminated
// and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
TerminateJobAction Action = "TerminateJob"
// ResumeJobAction is the action to resume an aborted job.
ResumeJobAction Action = "ResumeJob"
// SyncJobAction is the action to sync Job/Pod status.
SyncJobAction Action = "SyncJob"
)
// LifecyclePolicy specifies the lifecycle and error handling of task and job.
type LifecyclePolicy struct {
// The action that will be taken to the PodGroup according to Event.
// One of "Restart", "None".
// Default to None.
// +optional
Action Action `json:"action,omitempty" protobuf:"bytes,1,opt,name=action"`
// The Event recorded by scheduler; the controller takes actions
// according to this Event.
// +optional
Event Event `json:"event,omitempty" protobuf:"bytes,2,opt,name=event"`
// Timeout is the grace period for controller to take actions.
// Default to nil (take action immediately).
// +optional
Timeout *metav1.Duration `json:"timeout,omitempty" protobuf:"bytes,3,opt,name=timeout"`
}
// TaskSpec specifies the task specification of Job
type TaskSpec struct {
// Name specifies the name of tasks
Name string `json:"name,omitempty" protobuf:"bytes,1,opt,name=name"`
// Replicas specifies the replicas of this TaskSpec in Job
Replicas int32 `json:"replicas,omitempty" protobuf:"bytes,2,opt,name=replicas"`
// Specifies the pod that will be created for this TaskSpec
// when executing a Job
Template v1.PodTemplateSpec `json:"template,omitempty" protobuf:"bytes,3,opt,name=template"`
// Specifies the lifecycle of task
// +optional
Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,4,opt,name=policies"`
}
type JobPhase string
const (
// Pending is the phase that job is pending in the queue, waiting for scheduling decision
Pending JobPhase = "Pending"
// Aborting is the phase that job is aborted, waiting for releasing pods
Aborting JobPhase = "Aborting"
// Aborted is the phase that job is aborted by user or error handling
Aborted JobPhase = "Aborted"
// Running is the phase that minimal available tasks of Job are running
Running JobPhase = "Running"
// Restarting is the phase that the Job is restarted, waiting for pod releasing and recreating
Restarting JobPhase = "Restarting"
// Completed is the phase that all tasks of Job are completed successfully
Completed JobPhase = "Completed"
// Terminating is the phase that the Job is terminated, waiting for releasing pods
Terminating JobPhase = "Terminating"
// Terminated is the phase that the job is finished unexpected, e.g. events
Terminated JobPhase = "Terminated"
)
// JobState contains details for the current state of the job.
type JobState struct {
// Phase is the phase of the Job.
// +optional
Phase JobPhase `json:"phase,omitempty" protobuf:"bytes,1,opt,name=phase"`
// Reason is a unique, one-word, CamelCase reason for the phase's last transition.
// +optional
Reason string `json:"reason,omitempty" protobuf:"bytes,2,opt,name=reason"`
// Message is a human-readable message indicating details about the last transition.
// +optional
Message string `json:"message,omitempty" protobuf:"bytes,3,opt,name=message"`
}
// JobStatus represents the current status of a Job.
type JobStatus struct {
// State is the current state of the Job.
State JobState `json:"state,omitempty" protobuf:"bytes,1,opt,name=state"`
// Pending is the number of pending pods.
// +optional
Pending int32 `json:"pending,omitempty" protobuf:"bytes,2,opt,name=pending"`
// Running is the number of running pods.
// +optional
Running int32 `json:"running,omitempty" protobuf:"bytes,3,opt,name=running"`
// Succeeded is the number of pods which reached phase Succeeded.
// NOTE(review): the json tag was "Succeeded" (capitalized), inconsistent with
// every sibling field and with the protobuf name; fixed to lowercase.
// +optional
Succeeded int32 `json:"succeeded,omitempty" protobuf:"bytes,4,opt,name=succeeded"`
// Failed is the number of pods which reached phase Failed.
// +optional
Failed int32 `json:"failed,omitempty" protobuf:"bytes,5,opt,name=failed"`
// MinAvailable is the minimal number of available pods to run for this Job.
// +optional
MinAvailable int32 `json:"minAvailable,omitempty" protobuf:"bytes,6,opt,name=minAvailable"`
// Terminating is the number of pods which reached phase Terminating.
// +optional
Terminating int32 `json:"terminating,omitempty" protobuf:"bytes,7,opt,name=terminating"`
}
// JobList is a collection of Jobs.
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type JobList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
// Items is the list of Jobs.
Items []Job `json:"items" protobuf:"bytes,2,rep,name=items"`
}
```

Binary file not shown.

Before

Width:  |  Height:  |  Size: 144 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 27 KiB

View File

@ -1,172 +0,0 @@
![volcano-logo](../images/volcano-logo.png)
# Volcano
[![Build Status](https://travis-ci.org/kubernetes-sigs/kube-batch.svg?branch=master)](https://travis-ci.org/kubernetes-sigs/kube-batch)
[![Go Report Card](https://goreportcard.com/badge/github.com/kubernetes-sigs/kube-batch)](https://goreportcard.com/report/github.com/kubernetes-sigs/kube-batch)
[![RepoSize](https://img.shields.io/github/repo-size/kubernetes-sigs/kube-batch.svg)](https://github.com/kubernetes-sigs/kube-batch)
[![Release](https://img.shields.io/github/release/kubernetes-sigs/kube-batch.svg)](https://github.com/kubernetes-sigs/kube-batch/releases)
[![LICENSE](https://img.shields.io/github/license/kubernetes-sigs/kube-batch.svg)](https://github.com/kubernetes-sigs/kube-batch/blob/master/LICENSE)
Volcano is a system for running high performance workloads on
Kubernetes. It provides a suite of mechanisms currently missing from
Kubernetes that are commonly required by many classes of high
performance workload including:
1. machine learning/deep learning,
2. bioinformatics/genomics, and
3. other "big data" applications.
These types of applications typically run on generalized domain
frameworks like Tensorflow, Spark, PyTorch, MPI, etc, which Volcano integrates with.
Some examples of the mechanisms and features that Volcano adds to Kubernetes are:
1. Job management extensions and improvements, e.g:
1. Multi-pod jobs
2. Lifecycle management extensions including suspend/resume and
restart.
3. Improved error handling
4. Indexed jobs
5. Task dependencies
2. Scheduling extensions, e.g:
1. Co-scheduling
2. Fair-share scheduling
3. Queue scheduling
4. Preemption and reclaims
    5. Reservations and backfills
6. Topology-based scheduling
3. Runtime extensions, e.g:
    1. Support for specialized container runtimes like Singularity,
with GPU accelerator extensions and enhanced security features.
4. Other
1. Data locality awareness and intelligent scheduling
2. Optimizations for data throughput, round-trip latency, etc.
Volcano builds upon a decade and a half of experience running a wide
variety of high performance workloads at scale using several systems
and platforms, combined with best-of-breed ideas and practices from
the open source community.
## Overall Architecture
![volcano](../images/volcano-intro.png)
## Quick Start Guide
The easiest way to deploy Volcano is to use the Helm chart.
### Pre-requisites
First of all, clone the repo to your local path:
```
# mkdir -p $GOPATH/src/github.com/kubernetes-sigs
# cd $GOPATH/src/github.com/kubernetes-sigs
# git clone https://github.com/kubernetes-sigs/kube-batch
```
### 1. Volcano Image
Official images are available on [DockerHub](https://hub.docker.com/u/kubesigs); however, you can
build them locally with the command:
```
cd $GOPATH/src/github.com/kubernetes-sigs/kube-batch
make images
## Verify your images
# docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
volcanosh/volcano-admission latest a83338506638 8 seconds ago 41.4MB
volcanosh/volcano-scheduler latest faa3c2a25ac3 9 seconds ago 49.6MB
volcanosh/volcano-controllers latest 7b11606ebfb8 10 seconds ago 44.2MB
```
**NOTE**: You need ensure the images are correctly loaded in your kubernetes cluster, for
example, if you are using [kind cluster](https://github.com/kubernetes-sigs/kind),
try command ```kind load docker-image <image-name>:<tag> ``` for each of the images.
### 2. Helm charts
Second, install the required helm plugin and generate a valid
certificate. Volcano uses the helm plugin **gen-admission-secret** to
generate the certificate that the admission service needs to communicate with
the Kubernetes API server.
Alternatively you can also follow the youtube tutorial over [here](https://www.youtube.com/watch?v=hsXXmWSUtyo) to checkout the installation step.
```
#1. Install helm plugin
helm plugin install installer/chart/volcano/plugins/gen-admission-secret
#2. Generate secret within service name
helm gen-admission-secret --service <specified-name>-admission-service --namespace <namespace>
## For eg:
kubectl create namespace volcano-trial
helm gen-admission-secret --service volcano-trial-admission-service --namespace volcano-trial
```
Finally, install helm chart.
```
helm install installer/chart/volcano --namespace <namespace> --name <specified-name>
For eg :
helm install installer/chart/volcano --namespace volcano-trial --name volcano-trial
```
**NOTE**:The ```<specified-name>``` used in the two commands above should be identical.
To Verify your installation run the following commands:
```
#1. Verify the Running Pods
# kubectl get pods --namespace <namespace>
NAME READY STATUS RESTARTS AGE
<specified-name>-admission-84fd9b9dd8-9trxn 1/1 Running 0 43s
<specified-name>-controllers-75dcc8ff89-42v6r 1/1 Running 0 43s
<specified-name>-scheduler-b94cdb867-89pm2 1/1 Running 0 43s
#2. Verify the Services
# kubectl get services --namespace <namespace>
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
<specified-name>-admission-service ClusterIP 10.105.78.53 <none> 443/TCP 91s
```
## Developing
### E2E Test
Volcano also utilizes a [kind cluster](https://github.com/kubernetes-sigs/kind) to provide a simple way to
cover E2E tests. Make sure you have the kubectl and kind binaries installed in your local environment
before running the tests. Command as below:
```
make e2e-kind
```
In case of debugging, you can keep the kubernetes cluster environment after tests via:
```
CLEANUP_CLUSTER=-1 make e2e-kind
```
And if only parts of the tests are focused, please execute:
```
TEST_FILE=<test-file-name> make e2e-kind
```
Command above will finally be translated
into: ``go test ./test/e2e/volcano -v -timeout 30m -args --ginkgo.regexScansFilePath=true --ginkgo.focus=<test-file-name>``
## Community, discussion, contribution, and support
You can reach the maintainers of this project at:
Slack: [#volcano-sh](http://t.cn/Efa7LKx)
Mailing List: https://groups.google.com/forum/#!forum/volcano-sh

View File

@ -1,27 +0,0 @@
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: test-job-webhook-disallow
spec:
schedulerName: kube-batch
minAvailable: 1
tasks:
- replicas: 2
name: task-1
template:
metadata:
name: web-1
spec:
containers:
- image: nginx
imagePullPolicy: IfNotPresent
name: nginx
resources:
requests:
cpu: "1"
restartPolicy: OnFailure
policies:
- event: PodFailed
action: RestartJob
- event: PodFailed ## this job will be rejected because a single event must not be mapped to different actions
action: AbortJob

View File

@ -1,36 +0,0 @@
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: test-job-webhook-disallow
spec:
schedulerName: kube-batch
minAvailable: 4
tasks:
- replicas: 2
name: task-1
template:
metadata:
name: web-1
spec:
containers:
- image: nginx
imagePullPolicy: IfNotPresent
name: nginx
resources:
requests:
cpu: "1"
restartPolicy: OnFailure
- replicas: 2
name: task-1 ## this job will be rejected due to duplicated task name
template:
metadata:
name: web-1
spec:
containers:
- image: nginx
imagePullPolicy: IfNotPresent
name: nginx
resources:
requests:
cpu: "1"
restartPolicy: OnFailure

View File

@ -1,17 +0,0 @@
FROM ubuntu:16.04
# MAINTAINER is deprecated since Docker 1.13; LABEL is the supported replacement.
LABEL maintainer="volcano <maintainer@volcano.sh>"
# Install OpenMPI, ssh and the build toolchain needed to compile the example.
RUN apt-get update --fix-missing \
 && apt-get install -y libopenmpi-dev openmpi-bin \
 && apt-get install -y git \
 && apt-get install -y build-essential \
 && apt-get install -y ssh \
 && apt-get clean \
 && rm -rf /var/lib/apt/lists/*
# Build the mpi-hello-world sample, keep only the binary, then drop the toolchain.
RUN git clone https://github.com/wesleykendall/mpitutorial \
 && cd mpitutorial/tutorials/mpi-hello-world/code \
 && make \
 && cp mpi_hello_world /home/ \
 && apt-get autoremove -y git \
 && apt-get autoremove -y build-essential \
 && rm -rf "/mpitutorial"
# Start an ssh daemon by default (MPI ranks communicate over ssh).
CMD mkdir -p /var/run/sshd; /usr/sbin/sshd;

View File

@ -1,5 +0,0 @@
## MPI Example
This is an example for deploying the mpi job in Volcano. You can follow the youtube tutorial over [here](https://www.youtube.com/watch?v=hsXXmWSUtyo) to checkout the installation step.
<iframe width="560" height="315" src="https://www.youtube.com/embed/hsXXmWSUtyo" frameborder="0" allow="accelerometer; autoplay; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>

View File

@ -1,90 +0,0 @@
################################################
# #
# Demo for running MPI tasks on volcano #
# #
################################################
#
# This yaml used to demonstrate how to running a hello-world MPI
# task via Volcano Job, the MPI program is directly brought from
# official website (https://github.com/wesleykendall/mpitutorial/tree/gh-pages/tutorials/mpi-hello-world)
# more details are located in `Dockerfile`.
#
# There are two plugins that make MPI works in cluster.
# 1. **ENV**: env plugin used to generate host file for MPI communicating between pods.
# 2. **SSH**: ssh plugin used to generate required key and config for ssh tools.
#
# When Job is running, you can ensure the mpi-hello-world works correctly from master's stdout via:
#
# kubectl logs lm-mpi-job-mpimaster-0
#
# and output would below:
#
# -------------------------------------------------------------------------
# [[40437,1],1]: A high-performance Open MPI point-to-point messaging module
# was unable to find any relevant network interfaces:
#
# Module: OpenFabrics (openib)
# Host: lm-mpi-job-mpiworker-1
#
# Another transport will be used instead, although this may result in
# lower performance.
# --------------------------------------------------------------------------
# Hello world from processor lm-mpi-job-mpiworker-1, rank 1 out of 2 processors
# Hello world from processor lm-mpi-job-mpiworker-0, rank 0 out of 2 processors
#
# **NOTE**: There are two sleep commands, before and after the MPI execution: the former
# waits for the worker pods to become ready, and the latter guarantees that
# logs are captured before the container exits.
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: lm-mpi-job
spec:
minAvailable: 3
schedulerName: kube-batch
plugins:
ssh: []
env: []
tasks:
- replicas: 1
name: mpimaster
policies:
- event: TaskCompleted
action: CompleteJob
template:
spec:
containers:
- command:
- /bin/sh
- -c
- |
sleep 10s
mkdir -p /var/run/sshd; /usr/sbin/sshd;
mpiexec --allow-run-as-root --hostfile /etc/volcano/mpiworker.host -np 2 mpi_hello_world;
sleep 10s
image: volcanosh/example-mpi:0.0.1
name: mpimaster
ports:
- containerPort: 22
name: mpijob-port
workingDir: /home
restartPolicy: OnFailure
- replicas: 2
name: mpiworker
template:
spec:
containers:
- command:
- /bin/sh
- -c
- |
mkdir -p /var/run/sshd; /usr/sbin/sshd -D;
image: volcanosh/example-mpi:0.0.1
name: mpiworker
ports:
- containerPort: 22
name: mpijob-port
workingDir: /home
restartPolicy: OnFailure
---

View File

@ -1,35 +0,0 @@
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: test-job
spec:
minAvailable: 3
schedulerName: kube-batch
policies:
- event: PodEvicted
action: RestartJob
input:
mountPath: "/myinput"
output:
mountPath: "/myoutput"
volumeClaim:
accessModes: [ "ReadWriteOnce" ]
storageClassName: "my-storage-class"
resources:
requests:
storage: 1Gi
tasks:
- replicas: 6
name: "default-nginx"
template:
metadata:
name: web
spec:
containers:
- image: nginx
imagePullPolicy: IfNotPresent
name: nginx
resources:
requests:
cpu: "1"
restartPolicy: OnFailure

View File

@ -1,11 +0,0 @@
# Scheduler configuration: an ordered action pipeline plus tiered plugins
# (presumably consumed by the kube-batch/volcano scheduler — verify against
# the scheduler's config loader).
actions: "reclaim, allocate, backfill, preempt"
tiers:
- plugins:
- name: priority
- name: gang
- name: conformance
- plugins:
- name: drf
- name: predicates
- name: proportion
- name: nodeorder

View File

@ -1,36 +0,0 @@
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
name: test-job-webhook-disallow
spec:
schedulerName: kube-batch
minAvailable: 5 ## this job will be rejected because minAvailable is greater than total replicas in tasks
tasks:
- replicas: 2
name: task-1
template:
metadata:
name: web-1
spec:
containers:
- image: nginx
imagePullPolicy: IfNotPresent
name: nginx
resources:
requests:
cpu: "1"
restartPolicy: OnFailure
- replicas: 2
name: task-2
template:
metadata:
name: web-2
spec:
containers:
- image: nginx
imagePullPolicy: IfNotPresent
name: nginx
resources:
requests:
cpu: "1"
restartPolicy: OnFailure

View File

@ -1,12 +0,0 @@
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
name: default-as-admin
subjects:
- kind: ServiceAccount
name: default
namespace: kube-system
roleRef:
kind: ClusterRole
name: cluster-admin
apiGroup: rbac.authorization.k8s.io

View File

@ -1,18 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// +k8s:deepcopy-gen=package
package v1alpha1

View File

@ -1,25 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
const (
// TaskSpecKey is the annotation/label key recording the task-spec name
// (presumably set on pods created by a Job — verify against the controllers).
TaskSpecKey = "volcano.sh/task-spec"
// JobNameKey is the annotation/label key recording the owning Job's name.
JobNameKey = "volcano.sh/job-name"
// JobNamespaceKey is the annotation/label key recording the owning Job's namespace.
JobNamespaceKey = "volcano.sh/job-namespace"
// DefaultTaskSpec is the fallback task-spec name.
DefaultTaskSpec = "default"
// JobVersion is the annotation/label key recording the Job's version.
JobVersion = "volcano.sh/job-version"
)

View File

@ -1,50 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
var (
// SchemeBuilder collects the functions that register this package's types with a runtime.Scheme.
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
// AddToScheme applies all the stored registration functions to a scheme.
AddToScheme = SchemeBuilder.AddToScheme
)
// GroupName is the group name used in this package.
const GroupName = "batch.volcano.sh"
// SchemeGroupVersion is the group version used to register these objects.
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha1"}
// Resource takes an unqualified resource and returns a Group-qualified GroupResource.
func Resource(resource string) schema.GroupResource {
	gvr := SchemeGroupVersion.WithResource(resource)
	return gvr.GroupResource()
}
// addKnownTypes registers the types defined in this package with the supplied scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
	knownTypes := []runtime.Object{
		&Job{},
		&JobList{},
	}
	scheme.AddKnownTypes(SchemeGroupVersion, knownTypes...)
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}

View File

@ -1,254 +0,0 @@
// +build !ignore_autogenerated
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by deepcopy-gen. DO NOT EDIT.
package v1alpha1
import (
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Job) DeepCopyInto(out *Job) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
in.Spec.DeepCopyInto(&out.Spec)
in.Status.DeepCopyInto(&out.Status)
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Job.
func (in *Job) DeepCopy() *Job {
if in == nil {
return nil
}
out := new(Job)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *Job) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *JobList) DeepCopyInto(out *JobList) {
*out = *in
out.TypeMeta = in.TypeMeta
out.ListMeta = in.ListMeta
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]Job, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobList.
func (in *JobList) DeepCopy() *JobList {
if in == nil {
return nil
}
out := new(JobList)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *JobList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *JobSpec) DeepCopyInto(out *JobSpec) {
*out = *in
if in.Input != nil {
in, out := &in.Input, &out.Input
*out = new(VolumeSpec)
(*in).DeepCopyInto(*out)
}
if in.Output != nil {
in, out := &in.Output, &out.Output
*out = new(VolumeSpec)
(*in).DeepCopyInto(*out)
}
if in.Tasks != nil {
in, out := &in.Tasks, &out.Tasks
*out = make([]TaskSpec, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.Policies != nil {
in, out := &in.Policies, &out.Policies
*out = make([]LifecyclePolicy, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.Plugins != nil {
in, out := &in.Plugins, &out.Plugins
*out = make(map[string][]string, len(*in))
for key, val := range *in {
var outVal []string
if val == nil {
(*out)[key] = nil
} else {
in, out := &val, &outVal
*out = make([]string, len(*in))
copy(*out, *in)
}
(*out)[key] = outVal
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobSpec.
func (in *JobSpec) DeepCopy() *JobSpec {
if in == nil {
return nil
}
out := new(JobSpec)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *JobState) DeepCopyInto(out *JobState) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobState.
func (in *JobState) DeepCopy() *JobState {
if in == nil {
return nil
}
out := new(JobState)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *JobStatus) DeepCopyInto(out *JobStatus) {
*out = *in
out.State = in.State
if in.ControlledResources != nil {
in, out := &in.ControlledResources, &out.ControlledResources
*out = make(map[string]string, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobStatus.
func (in *JobStatus) DeepCopy() *JobStatus {
if in == nil {
return nil
}
out := new(JobStatus)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LifecyclePolicy) DeepCopyInto(out *LifecyclePolicy) {
*out = *in
if in.ExitCode != nil {
in, out := &in.ExitCode, &out.ExitCode
*out = new(int32)
**out = **in
}
if in.Timeout != nil {
in, out := &in.Timeout, &out.Timeout
*out = new(v1.Duration)
**out = **in
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LifecyclePolicy.
func (in *LifecyclePolicy) DeepCopy() *LifecyclePolicy {
if in == nil {
return nil
}
out := new(LifecyclePolicy)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TaskSpec) DeepCopyInto(out *TaskSpec) {
*out = *in
in.Template.DeepCopyInto(&out.Template)
if in.Policies != nil {
in, out := &in.Policies, &out.Policies
*out = make([]LifecyclePolicy, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TaskSpec.
func (in *TaskSpec) DeepCopy() *TaskSpec {
if in == nil {
return nil
}
out := new(TaskSpec)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *VolumeSpec) DeepCopyInto(out *VolumeSpec) {
*out = *in
if in.VolumeClaim != nil {
in, out := &in.VolumeClaim, &out.VolumeClaim
*out = new(corev1.PersistentVolumeClaimSpec)
(*in).DeepCopyInto(*out)
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VolumeSpec.
func (in *VolumeSpec) DeepCopy() *VolumeSpec {
if in == nil {
return nil
}
out := new(VolumeSpec)
in.DeepCopyInto(out)
return out
}

View File

@ -1,18 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// +k8s:deepcopy-gen=package
package v1alpha1

View File

@ -1,50 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
var (
// SchemeBuilder collects the functions that register this package's types with a runtime.Scheme.
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
// AddToScheme applies all the stored registration functions to a scheme.
AddToScheme = SchemeBuilder.AddToScheme
)
// GroupName is the group name used in this package.
const GroupName = "bus.volcano.sh"
// SchemeGroupVersion is the group version used to register these objects.
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha1"}
// Resource takes an unqualified resource and returns a Group-qualified GroupResource.
func Resource(resource string) schema.GroupResource {
	withResource := SchemeGroupVersion.WithResource(resource)
	return withResource.GroupResource()
}
// addKnownTypes registers the types defined in this package with the supplied scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
	for _, obj := range []runtime.Object{&Command{}, &CommandList{}} {
		scheme.AddKnownTypes(SchemeGroupVersion, obj)
	}
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}

View File

@ -1,50 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// Command describes an action to be applied to a target object.
type Command struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
// Action defines the action that will be taken on the target object.
Action string `json:"action,omitempty" protobuf:"bytes,2,opt,name=action"`
// TargetObject defines the target object of this command.
TargetObject *metav1.OwnerReference `json:"target,omitempty" protobuf:"bytes,3,opt,name=target"`
// Reason is a unique, one-word, CamelCase reason for this command.
// +optional
Reason string `json:"reason,omitempty" protobuf:"bytes,4,opt,name=reason"`
// Message is a human-readable message indicating details of this command.
// BUG FIX: the protobuf tag used field number 4, duplicating Reason's; field
// numbers must be unique within a message, so Message now uses 5.
// +optional
Message string `json:"message,omitempty" protobuf:"bytes,5,opt,name=message"`
}
// CommandList is a collection of Commands.
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type CommandList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
// Items is the list of Commands.
Items []Command `json:"items" protobuf:"bytes,2,rep,name=items"`
}

View File

@ -1,90 +0,0 @@
// +build !ignore_autogenerated
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by deepcopy-gen. DO NOT EDIT.
package v1alpha1
import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Command) DeepCopyInto(out *Command) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
if in.TargetObject != nil {
in, out := &in.TargetObject, &out.TargetObject
*out = new(v1.OwnerReference)
(*in).DeepCopyInto(*out)
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Command.
func (in *Command) DeepCopy() *Command {
if in == nil {
return nil
}
out := new(Command)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *Command) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *CommandList) DeepCopyInto(out *CommandList) {
*out = *in
out.TypeMeta = in.TypeMeta
out.ListMeta = in.ListMeta
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]Command, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CommandList.
func (in *CommandList) DeepCopy() *CommandList {
if in == nil {
return nil
}
out := new(CommandList)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *CommandList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}

View File

@ -1,28 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package env
const (
// ConfigMapTaskHostFmt is the format for a task's host-file key in the ConfigMap ("<task>.host").
ConfigMapTaskHostFmt = "%s.host"
// ConfigMapMountPath is the path where the ConfigMap is mounted in containers.
ConfigMapMountPath = "/etc/volcano"
// TaskVkIndex is the name of the environment variable carrying the task's index
// (presumably injected into task pods — verify against the env plugin).
TaskVkIndex = "VK_TASK_INDEX"
)

View File

@ -1,33 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ssh
const (
// SSHPrivateKey is the private key file name.
SSHPrivateKey = "id_rsa"
// SSHPublicKey is the public key file name.
SSHPublicKey = "id_rsa.pub"
// SSHAuthorizedKeys is the authorized-keys file name.
SSHAuthorizedKeys = "authorized_keys"
// SSHConfig is the ssh client config file name.
SSHConfig = "config"
// SSHAbsolutePath is the absolute path of root's ssh folder.
SSHAbsolutePath = "/root/.ssh"
// SSHRelativePath is the relative path of the ssh folder.
SSHRelativePath = ".ssh"
)

View File

@ -1,39 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
vkv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/batch/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/controllers/apis"
)
// abortedState handles a job whose phase is Aborted.
type abortedState struct {
	job *apis.JobInfo
}

// Execute reacts to an action on an Aborted job: a Resume request syncs the
// job back toward Restarting; every other action (re)kills the job's pods.
func (s *abortedState) Execute(action vkv1.Action) error {
	if action == vkv1.ResumeJobAction {
		return SyncJob(s.job, func(status vkv1.JobStatus) vkv1.JobState {
			return vkv1.JobState{Phase: vkv1.Restarting}
		})
	}
	return KillJob(s.job, nil)
}

View File

@ -1,51 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
vkv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/batch/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/controllers/apis"
)
// abortingState handles a job whose phase is Aborting.
type abortingState struct {
	job *apis.JobInfo
}

// Execute reacts to an action on an Aborting job: Resume re-syncs toward
// Restarting; otherwise keep killing pods, moving to Aborted once none remain.
func (s *abortingState) Execute(action vkv1.Action) error {
	if action == vkv1.ResumeJobAction {
		// Already leaving the aborted track; just sync toward Restarting.
		return SyncJob(s.job, func(status vkv1.JobStatus) vkv1.JobState {
			return vkv1.JobState{Phase: vkv1.Restarting}
		})
	}
	return KillJob(s.job, func(status vkv1.JobStatus) vkv1.JobState {
		// Remain in Aborting while any pod is still alive.
		alive := status.Terminating != 0 || status.Pending != 0 || status.Running != 0
		if alive {
			return vkv1.JobState{Phase: vkv1.Aborting}
		}
		return vkv1.JobState{Phase: vkv1.Aborted}
	})
}

View File

@ -1,41 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
vkv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/batch/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/controllers/apis"
)
// completingState handles a job whose phase is Completing.
type completingState struct {
	job *apis.JobInfo
}

// Execute ignores the specific action: a Completing job only winds down its
// pods, becoming Completed once no pod is alive.
func (s *completingState) Execute(action vkv1.Action) error {
	nextState := func(status vkv1.JobStatus) vkv1.JobState {
		// Any alive pod keeps the job in Completing.
		if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 {
			return vkv1.JobState{Phase: vkv1.Completing}
		}
		return vkv1.JobState{Phase: vkv1.Completed}
	}
	return KillJob(s.job, nextState)
}

View File

@ -1,67 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
vkv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/batch/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/controllers/apis"
)
// NextStateFn computes, from the job's current status, the state it should
// transition to next.
type NextStateFn func(status vkv1.JobStatus) vkv1.JobState
// ActionFn applies an action to a job, using fn to decide the resulting state.
type ActionFn func(job *apis.JobInfo, fn NextStateFn) error
var (
	// SyncJob will create or delete Pods according to Job's spec.
	// NOTE(review): wired up elsewhere by the job controller — nil until then.
	SyncJob ActionFn
	// KillJob kill all Pods of Job.
	// NOTE(review): wired up elsewhere by the job controller — nil until then.
	KillJob ActionFn
)
// State is the per-phase behavior of the job state machine.
type State interface {
	// Execute executes the actions based on current state.
	Execute(act vkv1.Action) error
}
// NewState returns the State implementation matching the job's current phase.
// An empty or unrecognized phase is treated as Pending.
func NewState(jobInfo *apis.JobInfo) State {
	switch jobInfo.Job.Status.State.Phase {
	case vkv1.Running:
		return &runningState{job: jobInfo}
	case vkv1.Restarting:
		return &restartingState{job: jobInfo}
	case vkv1.Terminated, vkv1.Completed:
		return &finishedState{job: jobInfo}
	case vkv1.Terminating:
		return &terminatingState{job: jobInfo}
	case vkv1.Aborting:
		return &abortingState{job: jobInfo}
	case vkv1.Aborted:
		return &abortedState{job: jobInfo}
	case vkv1.Completing:
		return &completingState{job: jobInfo}
	default:
		// vkv1.Pending, and the default for anything unknown.
		return &pendingState{job: jobInfo}
	}
}

View File

@ -1,31 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
vkv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/batch/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/controllers/apis"
)
// finishedState handles a job whose phase is Completed or Terminated.
type finishedState struct {
	job *apis.JobInfo
}

// Execute always kills the whole job: a finished job only gets cleaned up,
// regardless of the requested action.
func (s *finishedState) Execute(action vkv1.Action) error {
	return KillJob(s.job, nil)
}

View File

@ -1,76 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
vkv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/batch/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/controllers/apis"
)
// pendingState handles a job whose phase is Pending.
type pendingState struct {
	job *apis.JobInfo
}

// Execute reacts to an action on a Pending job. Restart/Abort/Complete kill
// the pods and move through the matching transient phase while pods are still
// terminating; any other action syncs the job, promoting it to Running once
// MinAvailable pods are up.
func (s *pendingState) Execute(action vkv1.Action) error {
	switch action {
	case vkv1.RestartJobAction:
		return KillJob(s.job, func(status vkv1.JobStatus) vkv1.JobState {
			if status.Terminating != 0 {
				return vkv1.JobState{Phase: vkv1.Restarting}
			}
			return vkv1.JobState{Phase: vkv1.Pending}
		})
	case vkv1.AbortJobAction:
		return KillJob(s.job, func(status vkv1.JobStatus) vkv1.JobState {
			if status.Terminating != 0 {
				return vkv1.JobState{Phase: vkv1.Aborting}
			}
			return vkv1.JobState{Phase: vkv1.Pending}
		})
	case vkv1.CompleteJobAction:
		return KillJob(s.job, func(status vkv1.JobStatus) vkv1.JobState {
			if status.Terminating != 0 {
				return vkv1.JobState{Phase: vkv1.Completing}
			}
			return vkv1.JobState{Phase: vkv1.Completed}
		})
	}
	return SyncJob(s.job, func(status vkv1.JobStatus) vkv1.JobState {
		// Promote to Running once enough pods are up.
		if s.job.Job.Spec.MinAvailable <= status.Running {
			return vkv1.JobState{Phase: vkv1.Running}
		}
		return vkv1.JobState{Phase: vkv1.Pending}
	})
}

View File

@ -1,44 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
vkv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/batch/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/controllers/apis"
)
// restartingState handles a job whose phase is Restarting.
type restartingState struct {
	job *apis.JobInfo
}

// Execute syncs a Restarting job: it stays Restarting while any pod is
// terminating, then becomes Running or Pending depending on whether
// MinAvailable pods are up.
func (s *restartingState) Execute(action vkv1.Action) error {
	return SyncJob(s.job, func(status vkv1.JobStatus) vkv1.JobState {
		if status.Terminating != 0 {
			return vkv1.JobState{Phase: vkv1.Restarting}
		}
		if status.Running >= s.job.Job.Spec.MinAvailable {
			return vkv1.JobState{Phase: vkv1.Running}
		}
		return vkv1.JobState{Phase: vkv1.Pending}
	})
}

View File

@ -1,86 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
vkv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/batch/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/controllers/apis"
)
// runningState handles a job whose phase is Running.
type runningState struct {
	job *apis.JobInfo
}

// Execute reacts to an action on a Running job. Restart/Abort/Terminate/
// Complete kill the pods and move through the matching transient phase while
// pods are still terminating; any other action syncs the job, marking it
// Completed once every task has finished.
func (s *runningState) Execute(action vkv1.Action) error {
	switch action {
	case vkv1.RestartJobAction:
		return KillJob(s.job, func(status vkv1.JobStatus) vkv1.JobState {
			if status.Terminating != 0 {
				return vkv1.JobState{Phase: vkv1.Restarting}
			}
			return vkv1.JobState{Phase: vkv1.Running}
		})
	case vkv1.AbortJobAction:
		return KillJob(s.job, func(status vkv1.JobStatus) vkv1.JobState {
			if status.Terminating != 0 {
				return vkv1.JobState{Phase: vkv1.Aborting}
			}
			return vkv1.JobState{Phase: vkv1.Running}
		})
	case vkv1.TerminateJobAction:
		return KillJob(s.job, func(status vkv1.JobStatus) vkv1.JobState {
			if status.Terminating != 0 {
				return vkv1.JobState{Phase: vkv1.Terminating}
			}
			return vkv1.JobState{Phase: vkv1.Running}
		})
	case vkv1.CompleteJobAction:
		return KillJob(s.job, func(status vkv1.JobStatus) vkv1.JobState {
			if status.Terminating != 0 {
				return vkv1.JobState{Phase: vkv1.Completing}
			}
			return vkv1.JobState{Phase: vkv1.Completed}
		})
	}
	return SyncJob(s.job, func(status vkv1.JobStatus) vkv1.JobState {
		// Every task accounted for (succeeded or failed) means the job is done.
		if status.Succeeded+status.Failed == TotalTasks(s.job.Job) {
			return vkv1.JobState{Phase: vkv1.Completed}
		}
		return vkv1.JobState{Phase: vkv1.Running}
	})
}

View File

@ -1,41 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
vkv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/batch/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/controllers/apis"
)
// terminatingState handles a job whose phase is Terminating.
type terminatingState struct {
	job *apis.JobInfo
}

// Execute ignores the specific action: a Terminating job only winds down its
// pods, becoming Terminated once no pod is alive.
func (s *terminatingState) Execute(action vkv1.Action) error {
	nextState := func(status vkv1.JobStatus) vkv1.JobState {
		// Any alive pod keeps the job in Terminating.
		if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 {
			return vkv1.JobState{Phase: vkv1.Terminating}
		}
		return vkv1.JobState{Phase: vkv1.Terminated}
	}
	return KillJob(s.job, nextState)
}

View File

@ -1,32 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
vkv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/batch/v1alpha1"
)
// TotalTasks returns total number of tasks in a job
// TotalTasks returns the sum of Replicas over every task spec in the job.
func TotalTasks(job *vkv1.Job) int32 {
	var total int32
	for i := range job.Spec.Tasks {
		total += job.Spec.Tasks[i].Replicas
	}
	return total
}

View File

@ -1,156 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/api/errors"
"github.com/kubernetes-sigs/kube-batch/pkg/apis/batch/v1alpha1"
)
// E2E specs for the admission webhook: each spec submits an invalid Job and
// asserts that admission rejects it with HTTP 500 and the expected message.
var _ = Describe("Job E2E Test: Test Admission service", func() {
	// Two tasks sharing the same name must be rejected.
	It("Duplicated Task Name", func() {
		jobName := "job-duplicated"
		namespace := "test"
		context := initTestContext()
		defer cleanupTestContext(context)
		rep := clusterSize(context, oneCPU)
		_, err := createJobInner(context, &jobSpec{
			namespace: namespace,
			name:      jobName,
			tasks: []taskSpec{
				{
					img:  defaultNginxImage,
					req:  oneCPU,
					min:  rep,
					rep:  rep,
					name: "duplicated",
				},
				{
					img:  defaultNginxImage,
					req:  oneCPU,
					min:  rep,
					rep:  rep,
					name: "duplicated",
				},
			},
		})
		Expect(err).To(HaveOccurred())
		stError, ok := err.(*errors.StatusError)
		Expect(ok).To(Equal(true))
		Expect(stError.ErrStatus.Code).To(Equal(int32(500)))
		Expect(stError.ErrStatus.Message).To(ContainSubstring("duplicated task name"))
	})
	// Two lifecycle policies reacting to the same event must be rejected.
	It("Duplicated Policy Event", func() {
		jobName := "job-policy-duplicated"
		namespace := "test"
		context := initTestContext()
		defer cleanupTestContext(context)
		rep := clusterSize(context, oneCPU)
		_, err := createJobInner(context, &jobSpec{
			namespace: namespace,
			name:      jobName,
			tasks: []taskSpec{
				{
					img:  defaultNginxImage,
					req:  oneCPU,
					min:  rep,
					rep:  rep,
					name: "taskname",
				},
			},
			policies: []v1alpha1.LifecyclePolicy{
				{
					Event:  v1alpha1.PodFailedEvent,
					Action: v1alpha1.AbortJobAction,
				},
				{
					Event:  v1alpha1.PodFailedEvent,
					Action: v1alpha1.RestartJobAction,
				},
			},
		})
		Expect(err).To(HaveOccurred())
		stError, ok := err.(*errors.StatusError)
		Expect(ok).To(Equal(true))
		Expect(stError.ErrStatus.Code).To(Equal(int32(500)))
		Expect(stError.ErrStatus.Message).To(ContainSubstring("duplicate event PodFailed"))
	})
	// Job-level minAvailable larger than the sum of task replicas must be rejected.
	It("Min Available illegal", func() {
		jobName := "job-min-illegal"
		namespace := "test"
		context := initTestContext()
		defer cleanupTestContext(context)
		rep := clusterSize(context, oneCPU)
		_, err := createJobInner(context, &jobSpec{
			min:       rep * 2,
			namespace: namespace,
			name:      jobName,
			tasks: []taskSpec{
				{
					img:  defaultNginxImage,
					req:  oneCPU,
					min:  rep,
					rep:  rep,
					name: "taskname",
				},
			},
		})
		Expect(err).To(HaveOccurred())
		stError, ok := err.(*errors.StatusError)
		Expect(ok).To(Equal(true))
		Expect(stError.ErrStatus.Code).To(Equal(int32(500)))
		Expect(stError.ErrStatus.Message).To(ContainSubstring("'minAvailable' should not be greater than total replicas in tasks"))
	})
	// Referencing a plugin that is not registered must be rejected.
	It("Job Plugin illegal", func() {
		jobName := "job-plugin-illegal"
		namespace := "test"
		context := initTestContext()
		defer cleanupTestContext(context)
		_, err := createJobInner(context, &jobSpec{
			min:       1,
			namespace: namespace,
			name:      jobName,
			plugins: map[string][]string{
				"big_plugin": {},
			},
			tasks: []taskSpec{
				{
					img:  defaultNginxImage,
					req:  oneCPU,
					min:  1,
					rep:  1,
					name: "taskname",
				},
			},
		})
		Expect(err).To(HaveOccurred())
		stError, ok := err.(*errors.StatusError)
		Expect(ok).To(Equal(true))
		Expect(stError.ErrStatus.Code).To(Equal(int32(500)))
		Expect(stError.ErrStatus.Message).To(ContainSubstring("unable to find job plugin: big_plugin"))
	})
})

View File

@ -1,64 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"fmt"
"os/exec"
"strings"
. "github.com/onsi/gomega"
)
// ResumeJob runs "job resume --name <name> [--namespace <namespace>]" via the
// volcano CLI and returns the command output. An empty name fails the spec.
func ResumeJob(name string, namespace string) string {
	Expect(name).NotTo(Equal(""), "Job name should not be empty in Resume job command")
	args := []string{"job", "resume", "--name", name}
	if namespace != "" {
		args = append(args, "--namespace", namespace)
	}
	return RunCliCommand(args)
}
// SuspendJob runs "job suspend --name <name> [--namespace <namespace>]" via
// the volcano CLI and returns the command output. An empty name fails the spec.
func SuspendJob(name string, namespace string) string {
	Expect(name).NotTo(Equal(""), "Job name should not be empty in Suspend job command")
	args := []string{"job", "suspend", "--name", name}
	if namespace != "" {
		args = append(args, "--namespace", namespace)
	}
	return RunCliCommand(args)
}
// ListJobs runs "job list [--namespace <namespace>]" via the volcano CLI and
// returns the command output.
func ListJobs(namespace string) string {
	args := []string{"job", "list"}
	if namespace != "" {
		args = append(args, "--namespace", namespace)
	}
	return RunCliCommand(args)
}
// RunCliCommand executes the volcano CLI binary with the given arguments,
// appending --master (when configured) and --kubeconfig flags, and returns
// the command's stdout. A non-zero exit fails the current spec.
func RunCliCommand(command []string) string {
	if masterURL() != "" {
		command = append(command, "--master", masterURL())
	}
	command = append(command, "--kubeconfig", kubeconfigPath(homeDir()))
	output, err := exec.Command(VolcanoCliBinary(), command...).Output()
	// Join with a space so the failure message shows a readable command line;
	// the original joined with "" which ran all arguments together.
	Expect(err).NotTo(HaveOccurred(),
		fmt.Sprintf("Command %s failed to execute: %s", strings.Join(command, " "), err))
	return string(output)
}

View File

@ -1,155 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"bytes"
"fmt"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctlJob "github.com/kubernetes-sigs/kube-batch/pkg/cli/job"
jobUtil "github.com/kubernetes-sigs/kube-batch/pkg/controllers/job"
)
// E2E specs for the "vkctl job" subcommands (list / suspend / resume),
// exercising them against a live cluster through the CLI binary.
var _ = Describe("Job E2E Test: Test Job Command", func() {
	// "job list" output must match what PrintJobs renders for the same jobs.
	It("List running jobs", func() {
		var outBuffer bytes.Buffer
		jobName := "test-job"
		namespace := "test"
		context := initTestContext()
		defer cleanupTestContext(context)
		rep := clusterSize(context, oneCPU)
		job := createJob(context, &jobSpec{
			namespace: namespace,
			name:      jobName,
			tasks: []taskSpec{
				{
					img: defaultNginxImage,
					req: oneCPU,
					min: rep,
					rep: rep,
				},
			},
		})
		//Pod is running
		err := waitJobReady(context, job)
		Expect(err).NotTo(HaveOccurred())
		//Job Status is running
		err = waitJobStateReady(context, job)
		Expect(err).NotTo(HaveOccurred())
		//Command outputs are identical
		outputs := ListJobs(namespace)
		jobs, err := context.vkclient.BatchV1alpha1().Jobs(namespace).List(metav1.ListOptions{})
		ctlJob.PrintJobs(jobs, &outBuffer)
		Expect(outputs).To(Equal(outBuffer.String()), "List command result should be:\n %s",
			outBuffer.String())
	})
	// Suspending a running job aborts it and deletes its pods; resuming
	// brings it back to Running.
	It("Suspend running job&Resume aborted job", func() {
		jobName := "test-suspend-running-job"
		taskName := "long-live-task"
		namespace := "test"
		context := initTestContext()
		defer cleanupTestContext(context)
		job := createJob(context, &jobSpec{
			namespace: namespace,
			name:      jobName,
			tasks: []taskSpec{
				{
					name: taskName,
					img:  defaultNginxImage,
					min:  1,
					rep:  1,
				},
			},
		})
		//Job is running
		err := waitJobReady(context, job)
		Expect(err).NotTo(HaveOccurred())
		err = waitJobStateReady(context, job)
		Expect(err).NotTo(HaveOccurred())
		//Suspend job and wait status change
		SuspendJob(jobName, namespace)
		err = waitJobStateAborted(context, job)
		Expect(err).NotTo(HaveOccurred())
		//Pod is gone
		podName := jobUtil.MakePodName(jobName, taskName, 0)
		_, err = context.kubeclient.CoreV1().Pods(namespace).Get(podName, metav1.GetOptions{})
		Expect(apierrors.IsNotFound(err)).To(BeTrue(),
			"Job related pod should be deleted when aborting job.")
		//Resume job
		ResumeJob(jobName, namespace)
		//Job is running again
		err = waitJobReady(context, job)
		Expect(err).NotTo(HaveOccurred())
		err = waitJobStateReady(context, job)
		Expect(err).NotTo(HaveOccurred())
	})
	// Suspending a still-pending job (request is sized to exceed the cluster,
	// so it can never schedule) also aborts it and removes its pod.
	It("Suspend pending job", func() {
		context := initTestContext()
		defer cleanupTestContext(context)
		rep := clusterSize(context, oneCPU) * 2
		jobName := "test-suspend-pending-job"
		namespace := "test"
		taskName := "long-live-task"
		job := createJob(context, &jobSpec{
			namespace: namespace,
			name:      jobName,
			tasks: []taskSpec{
				{
					name: taskName,
					img:  defaultNginxImage,
					req:  cpuResource(fmt.Sprintf("%dm", 1000*rep)),
					min:  1,
					rep:  1,
				},
			},
		})
		//Job is pending
		err := waitJobPending(context, job)
		Expect(err).NotTo(HaveOccurred())
		err = waitJobStatePending(context, job)
		Expect(err).NotTo(HaveOccurred())
		//Suspend job and wait status change
		SuspendJob(jobName, namespace)
		err = waitJobStateAborted(context, job)
		Expect(err).NotTo(HaveOccurred())
		//Pod is gone
		podName := jobUtil.MakePodName(jobName, taskName, 0)
		_, err = context.kubeclient.CoreV1().Pods(namespace).Get(podName, metav1.GetOptions{})
		Expect(apierrors.IsNotFound(err)).To(BeTrue(),
			"Job related pod should be deleted when job aborted.")
	})
})

View File

@ -25,5 +25,5 @@ import (
// TestE2E wires the Ginkgo suite into the standard "go test" runner.
func TestE2E(t *testing.T) {
	RegisterFailHandler(Fail)
	// Run the suite exactly once; the duplicated RunSpecs call (leftover from
	// the volcano -> kube-batch revert) would have run the suite twice.
	RunSpecs(t, "kube-batch Test Suite")
}

417
test/e2e/job.go Normal file
View File

@ -0,0 +1,417 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
// E2E specs for PodGroup-based job scheduling: gang scheduling, preemption,
// priorities, and best-effort tasks.
var _ = Describe("Job E2E Test", func() {
	// A job with min=2 becomes ready once the scheduler admits it.
	It("Schedule Job", func() {
		context := initTestContext()
		defer cleanupTestContext(context)
		rep := clusterSize(context, oneCPU)
		_, pg := createJob(context, &jobSpec{
			name: "qj-1",
			tasks: []taskSpec{
				{
					img: "busybox",
					req: oneCPU,
					min: 2,
					rep: rep,
				},
			},
		})
		err := waitPodGroupReady(context, pg)
		checkError(context, err)
	})
	// Several small jobs sharing the cluster all become ready.
	It("Schedule Multiple Jobs", func() {
		context := initTestContext()
		defer cleanupTestContext(context)
		rep := clusterSize(context, oneCPU)
		job := &jobSpec{
			tasks: []taskSpec{
				{
					img: "busybox",
					req: oneCPU,
					min: 2,
					rep: rep,
				},
			},
		}
		job.name = "mqj-1"
		_, pg1 := createJob(context, job)
		job.name = "mqj-2"
		_, pg2 := createJob(context, job)
		job.name = "mqj-3"
		_, pg3 := createJob(context, job)
		err := waitPodGroupReady(context, pg1)
		checkError(context, err)
		err = waitPodGroupReady(context, pg2)
		checkError(context, err)
		err = waitPodGroupReady(context, pg3)
		checkError(context, err)
	})
	// A gang job stays unschedulable while a ReplicaSet occupies the cluster,
	// and becomes ready only after the ReplicaSet is removed.
	It("Gang scheduling", func() {
		context := initTestContext()
		defer cleanupTestContext(context)
		rep := clusterSize(context, oneCPU)/2 + 1
		replicaset := createReplicaSet(context, "rs-1", rep, "nginx", oneCPU)
		err := waitReplicaSetReady(context, replicaset.Name)
		checkError(context, err)
		job := &jobSpec{
			name:      "gang-qj",
			namespace: "test",
			tasks: []taskSpec{
				{
					img: "busybox",
					req: oneCPU,
					min: rep,
					rep: rep,
				},
			},
		}
		_, pg := createJob(context, job)
		err = waitPodGroupPending(context, pg)
		checkError(context, err)
		err = waitPodGroupUnschedulable(context, pg)
		checkError(context, err)
		err = deleteReplicaSet(context, replicaset.Name)
		checkError(context, err)
		err = waitPodGroupReady(context, pg)
		checkError(context, err)
	})
	// When the first gang job fills the cluster, a second identical gang job
	// must stay pending and must not evict the first.
	It("Gang scheduling: Full Occupied", func() {
		context := initTestContext()
		defer cleanupTestContext(context)
		rep := clusterSize(context, oneCPU)
		job := &jobSpec{
			namespace: "test",
			tasks: []taskSpec{
				{
					img: "nginx",
					req: oneCPU,
					min: rep,
					rep: rep,
				},
			},
		}
		job.name = "gang-fq-qj1"
		_, pg1 := createJob(context, job)
		err := waitPodGroupReady(context, pg1)
		checkError(context, err)
		job.name = "gang-fq-qj2"
		_, pg2 := createJob(context, job)
		err = waitPodGroupPending(context, pg2)
		checkError(context, err)
		err = waitPodGroupReady(context, pg1)
		checkError(context, err)
	})
	// A second job preempts half of the first job's tasks.
	It("Preemption", func() {
		context := initTestContext()
		defer cleanupTestContext(context)
		slot := oneCPU
		rep := clusterSize(context, slot)
		job := &jobSpec{
			tasks: []taskSpec{
				{
					img: "nginx",
					req: slot,
					min: 1,
					rep: rep,
				},
			},
		}
		job.name = "preemptee-qj"
		_, pg1 := createJob(context, job)
		err := waitTasksReady(context, pg1, int(rep))
		checkError(context, err)
		job.name = "preemptor-qj"
		_, pg2 := createJob(context, job)
		err = waitTasksReady(context, pg1, int(rep)/2)
		checkError(context, err)
		err = waitTasksReady(context, pg2, int(rep)/2)
		checkError(context, err)
	})
	// Two preemptor jobs split the cluster three ways with the preemptee.
	It("Multiple Preemption", func() {
		context := initTestContext()
		defer cleanupTestContext(context)
		slot := oneCPU
		rep := clusterSize(context, slot)
		job := &jobSpec{
			tasks: []taskSpec{
				{
					img: "nginx",
					req: slot,
					min: 1,
					rep: rep,
				},
			},
		}
		job.name = "preemptee-qj"
		_, pg1 := createJob(context, job)
		err := waitTasksReady(context, pg1, int(rep))
		checkError(context, err)
		job.name = "preemptor-qj1"
		_, pg2 := createJob(context, job)
		job.name = "preemptor-qj2"
		_, pg3 := createJob(context, job)
		// NOTE: the stale checkError calls that followed each createJob were
		// removed — they re-checked the err from the earlier wait, not any
		// new condition. Readiness of pg2/pg3 is verified below.
		err = waitTasksReady(context, pg1, int(rep)/3)
		checkError(context, err)
		err = waitTasksReady(context, pg2, int(rep)/3)
		checkError(context, err)
		err = waitTasksReady(context, pg3, int(rep)/3)
		checkError(context, err)
	})
	// A job mixing resourced and best-effort (no request) tasks becomes ready.
	It("Schedule BestEffort Job", func() {
		context := initTestContext()
		defer cleanupTestContext(context)
		slot := oneCPU
		rep := clusterSize(context, slot)
		job := &jobSpec{
			name: "test",
			tasks: []taskSpec{
				{
					img: "nginx",
					req: slot,
					min: 2,
					rep: rep,
				},
				{
					img: "nginx",
					min: 2,
					rep: rep / 2,
				},
			},
		}
		_, pg := createJob(context, job)
		err := waitPodGroupReady(context, pg)
		checkError(context, err)
	})
	// A second full-size gang job must stay unschedulable without triggering
	// any preemption event against the first.
	It("Statement", func() {
		context := initTestContext()
		defer cleanupTestContext(context)
		slot := oneCPU
		rep := clusterSize(context, slot)
		job := &jobSpec{
			namespace: "test",
			tasks: []taskSpec{
				{
					img: "nginx",
					req: slot,
					min: rep,
					rep: rep,
				},
			},
		}
		job.name = "st-qj-1"
		_, pg1 := createJob(context, job)
		err := waitPodGroupReady(context, pg1)
		checkError(context, err)
		now := time.Now()
		job.name = "st-qj-2"
		_, pg2 := createJob(context, job)
		err = waitPodGroupUnschedulable(context, pg2)
		checkError(context, err)
		// No preemption event
		evicted, err := podGroupEvicted(context, pg1, now)()
		checkError(context, err)
		Expect(evicted).NotTo(BeTrue())
	})
	// With limited capacity, the master-priority task is scheduled ahead of
	// the worker-priority tasks.
	It("TaskPriority", func() {
		context := initTestContext()
		defer cleanupTestContext(context)
		slot := oneCPU
		rep := clusterSize(context, slot)
		replicaset := createReplicaSet(context, "rs-1", rep/2, "nginx", slot)
		err := waitReplicaSetReady(context, replicaset.Name)
		checkError(context, err)
		_, pg := createJob(context, &jobSpec{
			name: "multi-pod-job",
			tasks: []taskSpec{
				{
					img: "nginx",
					pri: workerPriority,
					min: rep/2 - 1,
					rep: rep,
					req: slot,
				},
				{
					img: "nginx",
					pri: masterPriority,
					min: 1,
					rep: 1,
					req: slot,
				},
			},
		})
		expected := map[string]int{ // fixed typo: was "expteced"
			masterPriority: 1,
			workerPriority: int(rep/2) - 1,
		}
		err = waitTasksReadyEx(context, pg, expected)
		checkError(context, err)
	})
	// With minMember overridden to 1, the schedulable small task runs even
	// though the large task cannot fit.
	It("Try to fit unassigned task with different resource requests in one loop", func() {
		context := initTestContext()
		defer cleanupTestContext(context)
		slot := oneCPU
		rep := clusterSize(context, slot)
		minMemberOverride := int32(1)
		replicaset := createReplicaSet(context, "rs-1", rep-1, "nginx", slot)
		err := waitReplicaSetReady(context, replicaset.Name)
		checkError(context, err)
		_, pg := createJob(context, &jobSpec{
			name: "multi-task-diff-resource-job",
			tasks: []taskSpec{
				{
					img: "nginx",
					pri: masterPriority,
					min: 1,
					rep: 1,
					req: twoCPU,
				},
				{
					img: "nginx",
					pri: workerPriority,
					min: 1,
					rep: 1,
					req: halfCPU,
				},
			},
			minMember: &minMemberOverride,
		})
		err = waitPodGroupPending(context, pg)
		checkError(context, err)
		// task_1 has been scheduled
		err = waitTasksReady(context, pg, int(minMemberOverride))
		checkError(context, err)
	})
	// Once capacity frees up, the higher-priority job is scheduled first.
	It("Job Priority", func() {
		context := initTestContext()
		defer cleanupTestContext(context)
		slot := oneCPU
		rep := clusterSize(context, slot)
		replicaset := createReplicaSet(context, "rs-1", rep, "nginx", slot)
		err := waitReplicaSetReady(context, replicaset.Name)
		checkError(context, err)
		job1 := &jobSpec{
			name: "pri-job-1",
			pri:  workerPriority,
			tasks: []taskSpec{
				{
					img: "nginx",
					req: oneCPU,
					min: rep/2 + 1,
					rep: rep,
				},
			},
		}
		job2 := &jobSpec{
			name: "pri-job-2",
			pri:  masterPriority,
			tasks: []taskSpec{
				{
					img: "nginx",
					req: oneCPU,
					min: rep/2 + 1,
					rep: rep,
				},
			},
		}
		createJob(context, job1)
		_, pg2 := createJob(context, job2)
		// Delete ReplicaSet
		err = deleteReplicaSet(context, replicaset.Name)
		checkError(context, err)
		err = waitPodGroupReady(context, pg2)
		checkError(context, err)
	})
})

View File

@ -1,288 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
// Job E2E Test exercises core scheduler behavior end-to-end against a live
// test cluster: plain scheduling, gang scheduling, preemption, best-effort
// tasks, and statement (all-or-nothing, no-eviction) semantics.
var _ = Describe("Job E2E Test", func() {
	// A job with min=2 should become ready on a cluster that fits rep one-CPU tasks.
	It("Schedule Job", func() {
		context := initTestContext()
		defer cleanupTestContext(context)
		rep := clusterSize(context, oneCPU)
		job := createJob(context, &jobSpec{
			name: "qj-1",
			tasks: []taskSpec{
				{
					img: defaultBusyBoxImage,
					req: oneCPU,
					min: 2,
					rep: rep,
				},
			},
		})
		err := waitJobReady(context, job)
		Expect(err).NotTo(HaveOccurred())
	})

	// Three identical jobs submitted back-to-back should all become ready.
	// The same spec struct is reused; only its name is rewritten per job.
	It("Schedule Multiple Jobs", func() {
		context := initTestContext()
		defer cleanupTestContext(context)

		rep := clusterSize(context, oneCPU)

		job := &jobSpec{
			tasks: []taskSpec{
				{
					img: defaultBusyBoxImage,
					req: oneCPU,
					min: 2,
					rep: rep,
				},
			},
		}

		job.name = "mqj-1"
		job1 := createJob(context, job)
		job.name = "mqj-2"
		job2 := createJob(context, job)
		job.name = "mqj-3"
		job3 := createJob(context, job)

		err := waitJobReady(context, job1)
		Expect(err).NotTo(HaveOccurred())

		err = waitJobReady(context, job2)
		Expect(err).NotTo(HaveOccurred())

		err = waitJobReady(context, job3)
		Expect(err).NotTo(HaveOccurred())
	})

	// With just over half the cluster occupied by a ReplicaSet, a gang job
	// needing min=rep must stay pending/unschedulable; once the ReplicaSet
	// is deleted the whole gang becomes ready.
	It("Gang scheduling", func() {
		context := initTestContext()
		defer cleanupTestContext(context)
		rep := clusterSize(context, oneCPU)/2 + 1

		replicaset := createReplicaSet(context, "rs-1", rep, defaultNginxImage, oneCPU)
		err := waitReplicaSetReady(context, replicaset.Name)
		Expect(err).NotTo(HaveOccurred())

		jobSpec := &jobSpec{
			name:      "gang-qj",
			namespace: "test",
			tasks: []taskSpec{
				{
					img: defaultBusyBoxImage,
					req: oneCPU,
					min: rep,
					rep: rep,
				},
			},
		}

		job := createJob(context, jobSpec)
		err = waitJobPending(context, job)
		Expect(err).NotTo(HaveOccurred())

		err = waitJobUnschedulable(context, job)
		Expect(err).NotTo(HaveOccurred())

		err = deleteReplicaSet(context, replicaset.Name)
		Expect(err).NotTo(HaveOccurred())

		err = waitJobReady(context, job)
		Expect(err).NotTo(HaveOccurred())
	})

	// When the first gang job fills the whole cluster, a second identical
	// gang job stays pending and the first one must remain ready (no eviction).
	It("Gang scheduling: Full Occupied", func() {
		context := initTestContext()
		defer cleanupTestContext(context)
		rep := clusterSize(context, oneCPU)

		job := &jobSpec{
			namespace: "test",
			tasks: []taskSpec{
				{
					img: defaultNginxImage,
					req: oneCPU,
					min: rep,
					rep: rep,
				},
			},
		}

		job.name = "gang-fq-qj1"
		job1 := createJob(context, job)
		err := waitJobReady(context, job1)
		Expect(err).NotTo(HaveOccurred())

		job.name = "gang-fq-qj2"
		job2 := createJob(context, job)
		err = waitJobPending(context, job2)
		Expect(err).NotTo(HaveOccurred())

		err = waitJobReady(context, job1)
		Expect(err).NotTo(HaveOccurred())
	})

	// A preemptor job (min=1) should reclaim roughly half the cluster from a
	// preemptee job that initially runs all rep tasks.
	It("Preemption", func() {
		context := initTestContext()
		defer cleanupTestContext(context)

		slot := oneCPU
		rep := clusterSize(context, slot)

		job := &jobSpec{
			tasks: []taskSpec{
				{
					img: defaultNginxImage,
					req: slot,
					min: 1,
					rep: rep,
				},
			},
		}

		job.name = "preemptee-qj"
		job1 := createJob(context, job)
		err := waitTasksReady(context, job1, int(rep))
		Expect(err).NotTo(HaveOccurred())

		job.name = "preemptor-qj"
		job2 := createJob(context, job)
		err = waitTasksReady(context, job1, int(rep)/2)
		Expect(err).NotTo(HaveOccurred())

		err = waitTasksReady(context, job2, int(rep)/2)
		Expect(err).NotTo(HaveOccurred())
	})

	// Two preemptor jobs should each end up with about a third of the
	// cluster, leaving the original job with the remaining third.
	It("Multiple Preemption", func() {
		context := initTestContext()
		defer cleanupTestContext(context)

		slot := oneCPU
		rep := clusterSize(context, slot)

		job := &jobSpec{
			tasks: []taskSpec{
				{
					img: defaultNginxImage,
					req: slot,
					min: 1,
					rep: rep,
				},
			},
		}

		job.name = "multipreemptee-qj"
		job1 := createJob(context, job)
		err := waitTasksReady(context, job1, int(rep))
		Expect(err).NotTo(HaveOccurred())

		job.name = "multipreemptor-qj1"
		job2 := createJob(context, job)
		// NOTE(review): err is not reassigned after createJob here, so this
		// re-checks the earlier waitTasksReady error — confirm intent.
		Expect(err).NotTo(HaveOccurred())

		job.name = "multipreemptor-qj2"
		job3 := createJob(context, job)
		// NOTE(review): same stale-err check as above — confirm intent.
		Expect(err).NotTo(HaveOccurred())

		err = waitTasksReady(context, job1, int(rep)/3)
		Expect(err).NotTo(HaveOccurred())

		err = waitTasksReady(context, job2, int(rep)/3)
		Expect(err).NotTo(HaveOccurred())

		err = waitTasksReady(context, job3, int(rep)/3)
		Expect(err).NotTo(HaveOccurred())
	})

	// A job mixing resourced tasks and best-effort tasks (second task has no
	// req) should still become ready as a whole.
	It("Schedule BestEffort Job", func() {
		context := initTestContext()
		defer cleanupTestContext(context)

		slot := oneCPU
		rep := clusterSize(context, slot)

		spec := &jobSpec{
			name: "test",
			tasks: []taskSpec{
				{
					img: defaultNginxImage,
					req: slot,
					min: 2,
					rep: rep,
				},
				{
					img: defaultNginxImage,
					min: 2,
					rep: rep / 2,
				},
			},
		}

		job := createJob(context, spec)
		err := waitJobReady(context, job)
		Expect(err).NotTo(HaveOccurred())
	})

	// With the cluster full, a second gang job must stay unschedulable and
	// must NOT cause any eviction of the running job after `now`.
	It("Statement", func() {
		context := initTestContext()
		defer cleanupTestContext(context)

		slot := oneCPU
		rep := clusterSize(context, slot)

		spec := &jobSpec{
			namespace: "test",
			tasks: []taskSpec{
				{
					img: defaultNginxImage,
					req: slot,
					min: rep,
					rep: rep,
				},
			},
		}

		spec.name = "st-qj-1"
		job1 := createJob(context, spec)
		err := waitJobReady(context, job1)
		Expect(err).NotTo(HaveOccurred())

		now := time.Now()

		spec.name = "st-qj-2"
		job2 := createJob(context, spec)
		err = waitJobUnschedulable(context, job2)
		Expect(err).NotTo(HaveOccurred())

		// No preemption event
		evicted, err := jobEvicted(context, job1, now)()
		Expect(err).NotTo(HaveOccurred())
		Expect(evicted).NotTo(BeTrue())
	})
})

View File

@ -1,4 +0,0 @@
The tests in this folder use the original Kubernetes ``Job`` object
instead of our ``jobs.batch.volcano.sh`` object. Once the feature they
depend on (priorityClass) is supported in our Job resource, they should be
upgraded and enabled again.

View File

@ -1,29 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube_batch
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
// TestE2E is the standard `go test` entry point for this package: it wires
// Gomega failures into Ginkgo via RegisterFailHandler, then runs the whole
// "volcano Test Suite" spec tree.
func TestE2E(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "volcano Test Suite")
}

View File

@ -1,152 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube_batch
import (
. "github.com/onsi/ginkgo"
)
// Job E2E Test covers priority behavior at two levels: task priority within
// a single PodGroup, multi-task jobs with heterogeneous resource requests,
// and job-level priority between two competing jobs.
var _ = Describe("Job E2E Test", func() {
	// With half the cluster pre-occupied, a job mixing master- and
	// worker-priority tasks should get exactly one master task and
	// rep/2-1 worker tasks ready.
	It("TaskPriority", func() {
		context := initTestContext()
		defer cleanupTestContext(context)

		slot := oneCPU
		rep := clusterSize(context, slot)

		replicaset := createReplicaSet(context, "rs-1", rep/2, "nginx", slot)
		err := waitReplicaSetReady(context, replicaset.Name)
		checkError(context, err)

		_, pg := createJob(context, &jobSpec{
			name: "multi-pod-job",
			tasks: []taskSpec{
				{
					img: "nginx",
					pri: workerPriority,
					min: rep/2 - 1,
					rep: rep,
					req: slot,
				},
				{
					img: "nginx",
					pri: masterPriority,
					min: 1,
					rep: 1,
					req: slot,
				},
			},
		})

		// Expected ready-task count per priority class.
		// (Fixed misspelled local "expteced" -> "expected".)
		expected := map[string]int{
			masterPriority: 1,
			workerPriority: int(rep/2) - 1,
		}

		err = waitTasksReadyEx(context, pg, expected)
		checkError(context, err)
	})

	// Only one free CPU remains; with minMember overridden to 1 the PodGroup
	// stays pending overall, but the schedulable task should still run.
	It("Try to fit unassigned task with different resource requests in one loop", func() {
		context := initTestContext()
		defer cleanupTestContext(context)

		slot := oneCPU
		rep := clusterSize(context, slot)
		minMemberOverride := int32(1)

		replicaset := createReplicaSet(context, "rs-1", rep-1, "nginx", slot)
		err := waitReplicaSetReady(context, replicaset.Name)
		checkError(context, err)

		_, pg := createJob(context, &jobSpec{
			name: "multi-task-diff-resource-job",
			tasks: []taskSpec{
				{
					img: "nginx",
					pri: masterPriority,
					min: 1,
					rep: 1,
					req: twoCPU,
				},
				{
					img: "nginx",
					pri: workerPriority,
					min: 1,
					rep: 1,
					req: halfCPU,
				},
			},
			minMember: &minMemberOverride,
		})

		err = waitPodGroupPending(context, pg)
		checkError(context, err)

		// task_1 has been scheduled
		err = waitTasksReady(context, pg, int(minMemberOverride))
		checkError(context, err)
	})

	// Two jobs each need rep/2+1 slots, so only one can run; after the
	// blocking ReplicaSet is removed, the higher-priority job must win.
	It("Job Priority", func() {
		context := initTestContext()
		defer cleanupTestContext(context)

		slot := oneCPU
		rep := clusterSize(context, slot)

		replicaset := createReplicaSet(context, "rs-1", rep, "nginx", slot)
		err := waitReplicaSetReady(context, replicaset.Name)
		checkError(context, err)

		job1 := &jobSpec{
			name: "pri-job-1",
			pri:  workerPriority,
			tasks: []taskSpec{
				{
					img: "nginx",
					req: oneCPU,
					min: rep/2 + 1,
					rep: rep,
				},
			},
		}

		job2 := &jobSpec{
			name: "pri-job-2",
			pri:  masterPriority,
			tasks: []taskSpec{
				{
					img: "nginx",
					req: oneCPU,
					min: rep/2 + 1,
					rep: rep,
				},
			},
		}

		createJob(context, job1)
		_, pg2 := createJob(context, job2)

		// Delete ReplicaSet
		err = deleteReplicaSet(context, replicaset.Name)
		checkError(context, err)

		// The master-priority job (pg2) should be the one to become ready.
		err = waitPodGroupReady(context, pg2)
		checkError(context, err)
	})
})

File diff suppressed because it is too large Load Diff

View File

@ -1,77 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
vkv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/batch/v1alpha1"
)
// MPI E2E Test runs a two-task (master/worker) MPI job using the "ssh" and
// "env" job plugins and expects it to traverse the full happy-path phase
// sequence through to Completed.
var _ = Describe("MPI E2E Test", func() {
	It("will run and complete finally", func() {
		context := initTestContext()
		defer cleanupTestContext(context)

		slot := oneCPU

		spec := &jobSpec{
			name: "mpi",
			// Finish the whole job once a task completes (the mpimaster).
			policies: []vkv1.LifecyclePolicy{
				{
					Action: vkv1.CompleteJobAction,
					Event:  vkv1.TaskCompletedEvent,
				},
			},
			// "ssh" and "env" plugins are enabled with no extra arguments;
			// presumably they provide inter-pod SSH and task env setup —
			// confirm against the plugin implementations.
			plugins: map[string][]string{
				"ssh": {},
				"env": {},
			},
			tasks: []taskSpec{
				{
					name:       "mpimaster",
					img:        defaultMPIImage,
					req:        slot,
					min:        1,
					rep:        1,
					workingDir: "/home",
					//Need sometime waiting for worker node ready
					command: `sleep 5;
mkdir -p /var/run/sshd; /usr/sbin/sshd;
mpiexec --allow-run-as-root --hostfile /etc/volcano/mpiworker.host -np 2 mpi_hello_world > /home/re`,
				},
				{
					name:       "mpiworker",
					img:        defaultMPIImage,
					req:        slot,
					min:        2,
					rep:        2,
					workingDir: "/home",
					// Workers just run sshd in the foreground for mpiexec.
					command: "mkdir -p /var/run/sshd; /usr/sbin/sshd -D;",
				},
			},
		}

		job := createJob(context, spec)

		// The job must pass through every listed phase, in order.
		err := waitJobStates(context, job, []vkv1.JobPhase{
			vkv1.Pending, vkv1.Running, vkv1.Completing, vkv1.Completed})
		Expect(err).NotTo(HaveOccurred())
	})
})

238
test/e2e/nodeorder.go Normal file
View File

@ -0,0 +1,238 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeletapi "k8s.io/kubernetes/pkg/kubelet/apis"
)
// NodeOrder E2E Test verifies the scheduler's node-scoring behavior:
// preferred node affinity, preferred pod affinity, and least-requested
// resource spreading.
var _ = Describe("NodeOrder E2E Test", func() {
	// A preferred node-affinity term pointing at the first worker node
	// should cause all of the job's pods to land on that node.
	It("Node Affinity Test", func() {
		context := initTestContext()
		defer cleanupTestContext(context)

		nodeNames := getAllWorkerNodes(context)

		var preferredSchedulingTermSlice []v1.PreferredSchedulingTerm
		// Match on the hostname label of the first worker node.
		nodeSelectorRequirement := v1.NodeSelectorRequirement{Key: kubeletapi.LabelHostname, Operator: v1.NodeSelectorOpIn, Values: []string{nodeNames[0]}}
		nodeSelectorTerm := v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{nodeSelectorRequirement}}
		schedulingTerm := v1.PreferredSchedulingTerm{Weight: 100, Preference: nodeSelectorTerm}
		preferredSchedulingTermSlice = append(preferredSchedulingTermSlice, schedulingTerm)

		slot := oneCPU
		_, rep := computeNode(context, oneCPU)
		Expect(rep).NotTo(Equal(0))

		affinity := &v1.Affinity{
			NodeAffinity: &v1.NodeAffinity{
				PreferredDuringSchedulingIgnoredDuringExecution: preferredSchedulingTermSlice,
			},
		}

		job := &jobSpec{
			name: "pa-job",
			tasks: []taskSpec{
				{
					img:      "nginx",
					req:      slot,
					min:      1,
					rep:      1,
					affinity: affinity,
				},
			},
		}

		_, pg := createJob(context, job)
		err := waitPodGroupReady(context, pg)
		checkError(context, err)

		pods := getPodOfPodGroup(context, pg)
		//All pods should be scheduled in particular node
		for _, pod := range pods {
			Expect(pod.Spec.NodeName).To(Equal(nodeNames[0]))
		}
	})

	// A second job with preferred pod affinity to the first job's label
	// should be co-located on the same node as the first job's pod.
	It("Pod Affinity Test", func() {
		context := initTestContext()
		defer cleanupTestContext(context)

		var preferredSchedulingTermSlice []v1.WeightedPodAffinityTerm
		// Prefer nodes (by hostname topology) already running pods labeled test=e2e.
		labelSelectorRequirement := metav1.LabelSelectorRequirement{Key: "test", Operator: metav1.LabelSelectorOpIn, Values: []string{"e2e"}}
		labelSelector := &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{labelSelectorRequirement}}
		podAffinityTerm := v1.PodAffinityTerm{LabelSelector: labelSelector, TopologyKey: "kubernetes.io/hostname"}
		weightedPodAffinityTerm := v1.WeightedPodAffinityTerm{Weight: 100, PodAffinityTerm: podAffinityTerm}
		preferredSchedulingTermSlice = append(preferredSchedulingTermSlice, weightedPodAffinityTerm)

		labels := make(map[string]string)
		labels["test"] = "e2e"

		job1 := &jobSpec{
			name: "pa-job1",
			tasks: []taskSpec{
				{
					img:    "nginx",
					req:    halfCPU,
					min:    1,
					rep:    1,
					labels: labels,
				},
			},
		}

		_, pg1 := createJob(context, job1)
		err := waitPodGroupReady(context, pg1)
		checkError(context, err)

		pods := getPodOfPodGroup(context, pg1)
		nodeName := pods[0].Spec.NodeName

		affinity := &v1.Affinity{
			PodAffinity: &v1.PodAffinity{
				PreferredDuringSchedulingIgnoredDuringExecution: preferredSchedulingTermSlice,
			},
		}

		job2 := &jobSpec{
			name: "pa-job2",
			tasks: []taskSpec{
				{
					img:      "nginx",
					req:      halfCPU,
					min:      1,
					rep:      1,
					affinity: affinity,
				},
			},
		}

		_, pg2 := createJob(context, job2)
		err = waitPodGroupReady(context, pg2)
		checkError(context, err)

		podsWithAffinity := getPodOfPodGroup(context, pg2)
		// All Pods Should be Scheduled in same node
		nodeNameWithAffinity := podsWithAffinity[0].Spec.NodeName
		Expect(nodeNameWithAffinity).To(Equal(nodeName))
	})

	// Pin load onto the first two worker nodes with required node affinity,
	// then verify an unconstrained job is scored onto some other node
	// (least-requested spreading).
	It("Least Requested Resource Test", func() {
		context := initTestContext()
		defer cleanupTestContext(context)

		nodeNames := getAllWorkerNodes(context)

		// Hard-require node 0 for the first filler job.
		affinityNodeOne := &v1.Affinity{
			NodeAffinity: &v1.NodeAffinity{
				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
					NodeSelectorTerms: []v1.NodeSelectorTerm{
						{
							MatchExpressions: []v1.NodeSelectorRequirement{
								{
									Key:      kubeletapi.LabelHostname,
									Operator: v1.NodeSelectorOpIn,
									Values:   []string{nodeNames[0]},
								},
							},
						},
					},
				},
			},
		}

		job1 := &jobSpec{
			name: "pa-job",
			tasks: []taskSpec{
				{
					img:      "nginx",
					req:      halfCPU,
					min:      3,
					rep:      3,
					affinity: affinityNodeOne,
				},
			},
		}

		//Schedule Job in first Node
		_, pg1 := createJob(context, job1)
		err := waitPodGroupReady(context, pg1)
		checkError(context, err)

		// Hard-require node 1 for the second filler job.
		affinityNodeTwo := &v1.Affinity{
			NodeAffinity: &v1.NodeAffinity{
				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
					NodeSelectorTerms: []v1.NodeSelectorTerm{
						{
							MatchExpressions: []v1.NodeSelectorRequirement{
								{
									Key:      kubeletapi.LabelHostname,
									Operator: v1.NodeSelectorOpIn,
									Values:   []string{nodeNames[1]},
								},
							},
						},
					},
				},
			},
		}

		job2 := &jobSpec{
			name: "pa-job1",
			tasks: []taskSpec{
				{
					img:      "nginx",
					req:      halfCPU,
					min:      3,
					rep:      3,
					affinity: affinityNodeTwo,
				},
			},
		}

		//Schedule Job in Second Node
		_, pg2 := createJob(context, job2)
		err = waitPodGroupReady(context, pg2)
		checkError(context, err)

		testJob := &jobSpec{
			name: "pa-test-job",
			tasks: []taskSpec{
				{
					img: "nginx",
					req: oneCPU,
					min: 1,
					rep: 1,
				},
			},
		}

		//This job should be scheduled in third node
		_, pg3 := createJob(context, testJob)
		err = waitPodGroupReady(context, pg3)
		checkError(context, err)

		pods := getPodOfPodGroup(context, pg3)
		// The unconstrained pod must avoid the two loaded nodes.
		// NOTE(review): assumes the cluster has at least three worker nodes;
		// nodeNames[1] indexing above panics otherwise — confirm test env.
		for _, pod := range pods {
			Expect(pod.Spec.NodeName).NotTo(Equal(nodeNames[0]))
			Expect(pod.Spec.NodeName).NotTo(Equal(nodeNames[1]))
		}
	})
})

View File

@ -22,7 +22,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/api"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
)
var _ = Describe("Predicates E2E Test", func() {
@ -41,7 +41,7 @@ var _ = Describe("Predicates E2E Test", func() {
{
MatchFields: []v1.NodeSelectorRequirement{
{
Key: api.NodeFieldSelectorKeyNodeName,
Key: schedulerapi.NodeFieldSelectorKeyNodeName,
Operator: v1.NodeSelectorOpIn,
Values: []string{nodeName},
},
@ -52,11 +52,11 @@ var _ = Describe("Predicates E2E Test", func() {
},
}
spec := &jobSpec{
name: "na-spec",
job := &jobSpec{
name: "na-job",
tasks: []taskSpec{
{
img: defaultNginxImage,
img: "nginx",
req: slot,
min: 1,
rep: 1,
@ -65,11 +65,11 @@ var _ = Describe("Predicates E2E Test", func() {
},
}
job := createJob(context, spec)
err := waitJobReady(context, job)
Expect(err).NotTo(HaveOccurred())
_, pg := createJob(context, job)
err := waitPodGroupReady(context, pg)
checkError(context, err)
pods := getTasksOfJob(context, job)
pods := getPodOfPodGroup(context, pg)
for _, pod := range pods {
Expect(pod.Spec.NodeName).To(Equal(nodeName))
}
@ -81,11 +81,11 @@ var _ = Describe("Predicates E2E Test", func() {
nn := clusterNodeNumber(context)
spec := &jobSpec{
name: "hp-spec",
job := &jobSpec{
name: "hp-job",
tasks: []taskSpec{
{
img: defaultNginxImage,
img: "nginx",
min: int32(nn),
req: oneCPU,
rep: int32(nn * 2),
@ -94,13 +94,13 @@ var _ = Describe("Predicates E2E Test", func() {
},
}
job := createJob(context, spec)
_, pg := createJob(context, job)
err := waitTasksReady(context, job, nn)
Expect(err).NotTo(HaveOccurred())
err := waitTasksReady(context, pg, nn)
checkError(context, err)
err = waitTasksPending(context, job, nn)
Expect(err).NotTo(HaveOccurred())
err = waitTasksPending(context, pg, nn)
checkError(context, err)
})
It("Pod Affinity", func() {
@ -126,11 +126,11 @@ var _ = Describe("Predicates E2E Test", func() {
},
}
spec := &jobSpec{
name: "pa-spec",
job := &jobSpec{
name: "pa-job",
tasks: []taskSpec{
{
img: defaultNginxImage,
img: "nginx",
req: slot,
min: rep,
rep: rep,
@ -140,11 +140,11 @@ var _ = Describe("Predicates E2E Test", func() {
},
}
job := createJob(context, spec)
err := waitJobReady(context, job)
Expect(err).NotTo(HaveOccurred())
_, pg := createJob(context, job)
err := waitPodGroupReady(context, pg)
checkError(context, err)
pods := getTasksOfJob(context, job)
pods := getPodOfPodGroup(context, pg)
// All pods should be scheduled to the same node.
nodeName := pods[0].Spec.NodeName
for _, pod := range pods {
@ -165,13 +165,13 @@ var _ = Describe("Predicates E2E Test", func() {
}
err := taintAllNodes(context, taints)
Expect(err).NotTo(HaveOccurred())
checkError(context, err)
spec := &jobSpec{
name: "tt-spec",
job := &jobSpec{
name: "tt-job",
tasks: []taskSpec{
{
img: defaultNginxImage,
img: "nginx",
req: oneCPU,
min: 1,
rep: 1,
@ -179,15 +179,15 @@ var _ = Describe("Predicates E2E Test", func() {
},
}
job := createJob(context, spec)
err = waitJobPending(context, job)
Expect(err).NotTo(HaveOccurred())
_, pg := createJob(context, job)
err = waitPodGroupPending(context, pg)
checkError(context, err)
err = removeTaintsFromAllNodes(context, taints)
Expect(err).NotTo(HaveOccurred())
checkError(context, err)
err = waitJobReady(context, job)
Expect(err).NotTo(HaveOccurred())
err = waitPodGroupReady(context, pg)
checkError(context, err)
})
})

View File

@ -1,5 +1,5 @@
/*
Copyright 2019 The Kubernetes Authors.
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -20,21 +20,23 @@ import (
"fmt"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("Queue E2E Test", func() {
var _ = Describe("Predicates E2E Test", func() {
It("Reclaim", func() {
context := initTestContext()
defer cleanupTestContext(context)
createQueues(context)
defer deleteQueues(context)
slot := oneCPU
rep := clusterSize(context, slot)
spec := &jobSpec{
job := &jobSpec{
tasks: []taskSpec{
{
img: defaultNginxImage,
img: "nginx",
req: slot,
min: 1,
rep: rep,
@ -42,11 +44,11 @@ var _ = Describe("Queue E2E Test", func() {
},
}
spec.name = "q1-qj-1"
spec.queue = "q1"
job1 := createJob(context, spec)
err := waitJobReady(context, job1)
Expect(err).NotTo(HaveOccurred())
job.name = "q1-qj-1"
job.queue = "q1"
_, pg1 := createJob(context, job)
err := waitPodGroupReady(context, pg1)
checkError(context, err)
expected := int(rep) / 2
// Reduce one pod to tolerate decimal fraction.
@ -54,17 +56,17 @@ var _ = Describe("Queue E2E Test", func() {
expected--
} else {
err := fmt.Errorf("expected replica <%d> is too small", expected)
Expect(err).NotTo(HaveOccurred())
checkError(context, err)
}
spec.name = "q2-qj-2"
spec.queue = "q2"
job2 := createJob(context, spec)
err = waitTasksReady(context, job2, expected)
Expect(err).NotTo(HaveOccurred())
job.name = "q2-qj-2"
job.queue = "q2"
_, pg2 := createJob(context, job)
err = waitTasksReady(context, pg2, expected)
checkError(context, err)
err = waitTasksReady(context, job1, expected)
Expect(err).NotTo(HaveOccurred())
err = waitTasksReady(context, pg1, expected)
checkError(context, err)
})
})

File diff suppressed because it is too large Load Diff