Add Helm hook to upgrade CRDs

Signed-off-by: Yi Chen <github@chenyicn.net>
This commit is contained in:
Yi Chen 2024-12-24 14:25:13 +08:00
parent 92deff0be9
commit 023391d24c
16 changed files with 729 additions and 1 deletions

View File

@ -2,7 +2,6 @@
.idea/
.vscode/
bin/
charts/
docs/
config/
examples/

View File

@ -55,6 +55,8 @@ USER ${SPARK_UID}:${SPARK_GID}
COPY --from=builder /workspace/bin/spark-operator /usr/bin/spark-operator
COPY --from=builder /workspace/charts/spark-operator-chart/crds /etc/spark-operator/crds
COPY entrypoint.sh /usr/bin/
ENTRYPOINT ["/usr/bin/entrypoint.sh"]

View File

@ -83,6 +83,9 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command docum
| image.tag | string | If not set, the chart appVersion will be used. | Image tag. |
| image.pullPolicy | string | `"IfNotPresent"` | Image pull policy. |
| image.pullSecrets | list | `[]` | Image pull secrets for private image registry. |
| hook.upgradeCrd | bool | `true` | Specifies whether to update CRDs with a Helm hook job. |
| hook.resources | object | `{"limits":{"cpu":"100m","memory":"64Mi"},"requests":{"cpu":"100m","memory":"64Mi"}}` | Resource requests and limits for hook containers. |
| hook.securityContext | object | `{"allowPrivilegeEscalation":false,"capabilities":{"drop":["ALL"]},"privileged":false,"readOnlyRootFilesystem":true,"runAsNonRoot":true}` | Security context for hook containers. |
| controller.replicas | int | `1` | Number of replicas of controller. |
| controller.workers | int | `10` | Reconcile concurrency, higher values might increase memory usage. |
| controller.logLevel | string | `"info"` | Configure the verbosity of logging, can be one of `debug`, `info`, `error`. |

View File

@ -0,0 +1,65 @@
{{/*
Copyright 2024 The Kubeflow authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/}}

{{/*
Create the base name shared by all Helm hook resources.
*/}}
{{- define "spark-operator.hook.name" -}}
{{- include "spark-operator.fullname" . }}-hook
{{- end -}}

{{/*
Common labels for the Helm hook
*/}}
{{- define "spark-operator.hook.labels" -}}
{{ include "spark-operator.labels" . }}
app.kubernetes.io/component: hook
{{- end -}}

{{/*
Selector labels for the Helm hook.
Currently identical to the common hook labels.
*/}}
{{- define "spark-operator.hook.selectorLabels" -}}
{{ include "spark-operator.hook.labels" . }}
{{- end -}}

{{/*
Create the name of the service account to be used by the Helm hooks.
*/}}
{{- define "spark-operator.hook.serviceAccountName" -}}
{{ include "spark-operator.hook.name" . }}
{{- end -}}

{{/*
Create the name of the cluster role to be used by the Helm hooks.
*/}}
{{- define "spark-operator.hook.clusterRoleName" -}}
{{ include "spark-operator.hook.name" . }}
{{- end -}}

{{/*
Create the name of the cluster role binding to be used by the Helm hooks.
*/}}
{{- define "spark-operator.hook.clusterRoleBindingName" -}}
{{ include "spark-operator.hook.clusterRoleName" . }}
{{- end -}}

{{/*
Create the name of the Helm hook job.
*/}}
{{- define "spark-operator.hook.jobName" -}}
{{ include "spark-operator.hook.name" . }}
{{- end -}}

View File

@ -0,0 +1,58 @@
{{/*
Copyright 2024 The Kubeflow authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/}}

{{- if .Values.hook.upgradeCrd }}
apiVersion: batch/v1
kind: Job
metadata:
  name: {{ include "spark-operator.hook.jobName" . }}
  namespace: {{ .Release.Namespace }}
  labels:
    {{- include "spark-operator.hook.labels" . | nindent 4 }}
  annotations:
    # Run before install/upgrade so CRDs are up to date before the operator starts.
    helm.sh/hook: pre-install,pre-upgrade
    # Keep failed jobs for debugging; successful jobs are cleaned up.
    helm.sh/hook-delete-policy: before-hook-creation,hook-succeeded
    # Run after the service account (weight 1) and RBAC (weight 2) hooks.
    helm.sh/hook-weight: "3"
spec:
  template:
    metadata:
      labels:
        {{- include "spark-operator.hook.selectorLabels" . | nindent 8 }}
    spec:
      containers:
      - name: spark-operator-hook
        image: {{ include "spark-operator.image" . }}
        {{- with .Values.image.pullPolicy }}
        imagePullPolicy: {{ . }}
        {{- end }}
        args:
        - hook
        - start
        - --upgrade-crds
        - --crds-path
        # CRD manifests baked into the operator image at build time.
        - /etc/spark-operator/crds
        {{- with .Values.hook.resources }}
        resources:
          {{- toYaml . | nindent 10 }}
        {{- end }}
        {{- with .Values.hook.securityContext }}
        securityContext:
          {{- toYaml . | nindent 10 }}
        {{- end }}
      {{- with .Values.image.pullSecrets }}
      imagePullSecrets:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      serviceAccountName: {{ include "spark-operator.hook.serviceAccountName" . }}
      # Do not retry pods in place; the hook job itself is re-created on each release.
      restartPolicy: Never
{{- end }}

View File

@ -0,0 +1,61 @@
{{/*
Copyright 2024 The Kubeflow authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/}}

{{- if .Values.hook.upgradeCrd }}
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  # ClusterRoles are cluster-scoped; no namespace is set on metadata.
  name: {{ include "spark-operator.hook.clusterRoleName" . }}
  labels:
    {{- include "spark-operator.hook.labels" . | nindent 4 }}
  annotations:
    helm.sh/hook: pre-install,pre-upgrade
    helm.sh/hook-delete-policy: before-hook-creation,hook-succeeded,hook-failed
    # Created after the service account (weight 1) and before the job (weight 3).
    helm.sh/hook-weight: "2"
rules:
# The hook only needs to read and update the two Spark operator CRDs.
- apiGroups:
  - apiextensions.k8s.io
  resources:
  - customresourcedefinitions
  resourceNames:
  - sparkapplications.sparkoperator.k8s.io
  - scheduledsparkapplications.sparkoperator.k8s.io
  verbs:
  - get
  - update
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  # ClusterRoleBindings are cluster-scoped; no namespace is set on metadata.
  name: {{ include "spark-operator.hook.clusterRoleBindingName" . }}
  labels:
    {{- include "spark-operator.hook.labels" . | nindent 4 }}
  annotations:
    helm.sh/hook: pre-install,pre-upgrade
    helm.sh/hook-delete-policy: before-hook-creation,hook-succeeded,hook-failed
    helm.sh/hook-weight: "2"
subjects:
- kind: ServiceAccount
  name: {{ include "spark-operator.hook.serviceAccountName" . }}
  namespace: {{ .Release.Namespace }}
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: {{ include "spark-operator.hook.clusterRoleName" . }}
{{- end }}

View File

@ -0,0 +1,29 @@
{{/*
Copyright 2024 The Kubeflow authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/}}
{{/*
Service account used by the CRD-upgrade hook job. Only rendered when
`hook.upgradeCrd` is enabled.
*/}}
{{- if .Values.hook.upgradeCrd }}
apiVersion: v1
kind: ServiceAccount
metadata:
  name: {{ include "spark-operator.hook.serviceAccountName" . }}
  namespace: {{ .Release.Namespace }}
  labels:
    {{- include "spark-operator.hook.labels" . | nindent 4 }}
  annotations:
    # Run before install/upgrade, ahead of the RBAC (weight 2) and job (weight 3) hooks.
    helm.sh/hook: pre-install,pre-upgrade
    # Recreate on each run; delete once the hook succeeds or fails.
    helm.sh/hook-delete-policy: before-hook-creation,hook-succeeded,hook-failed
    helm.sh/hook-weight: "1"
{{- end }}

View File

@ -0,0 +1,110 @@
#
# Copyright 2024 The Kubeflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# helm-unittest suite for the CRD-upgrade hook Job template (hook/job.yaml).
suite: Test hook job
templates:
  - hook/job.yaml
release:
  name: spark-operator
  namespace: spark-operator
tests:
  # The Job is gated on `hook.upgradeCrd`; disabling it must render nothing.
  - it: Should not create hook job if `hook.upgradeCrd` is false
    set:
      hook:
        upgradeCrd: false
    asserts:
      - hasDocuments:
          count: 0
  - it: Should create hook job by default
    asserts:
      - containsDocument:
          apiVersion: batch/v1
          kind: Job
          name: spark-operator-hook
  - it: Should use the specified image repository if `image.registry`, `image.repository` and `image.tag` are set
    set:
      image:
        registry: test-registry
        repository: test-repository
        tag: test-tag
    asserts:
      - equal:
          path: spec.template.spec.containers[?(@.name=="spark-operator-hook")].image
          value: test-registry/test-repository:test-tag
  - it: Should use the specified image pull policy if `image.pullPolicy` is set
    set:
      image:
        pullPolicy: Always
    asserts:
      - equal:
          path: spec.template.spec.containers[*].imagePullPolicy
          value: Always
  - it: Should add resources if `hook.resources` is set
    set:
      hook:
        resources:
          requests:
            memory: 64Mi
            cpu: 250m
          limits:
            memory: 128Mi
            cpu: 500m
    asserts:
      - equal:
          path: spec.template.spec.containers[?(@.name=="spark-operator-hook")].resources
          value:
            requests:
              memory: 64Mi
              cpu: 250m
            limits:
              memory: 128Mi
              cpu: 500m
  # The values set here are rendered verbatim into the container securityContext;
  # the suite only checks template output, not Kubernetes field validity.
  - it: Should add container securityContext if `hook.securityContext` is set
    set:
      hook:
        securityContext:
          readOnlyRootFilesystem: true
          runAsUser: 1000
          runAsGroup: 2000
          fsGroup: 3000
          allowPrivilegeEscalation: false
          capabilities:
            drop:
              - ALL
          runAsNonRoot: true
          privileged: false
    asserts:
      - equal:
          path: spec.template.spec.containers[?(@.name=="spark-operator-hook")].securityContext
          value:
            readOnlyRootFilesystem: true
            runAsUser: 1000
            runAsGroup: 2000
            fsGroup: 3000
            allowPrivilegeEscalation: false
            capabilities:
              drop:
                - ALL
            runAsNonRoot: true
            privileged: false

View File

@ -0,0 +1,75 @@
#
# Copyright 2024 The Kubeflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# helm-unittest suite for the CRD-upgrade hook RBAC template (hook/rbac.yaml).
suite: Test hook rbac
templates:
  - hook/rbac.yaml
release:
  name: spark-operator
  namespace: spark-operator
tests:
  # Both rendered documents are gated on `hook.upgradeCrd`.
  - it: Should not create hook RBAC resources if `hook.upgradeCrd` is false
    set:
      hook:
        upgradeCrd: false
    asserts:
      - hasDocuments:
          count: 0
  # Document 0 is the ClusterRole.
  - it: Should create hook ClusterRole by default
    documentIndex: 0
    asserts:
      - containsDocument:
          apiVersion: rbac.authorization.k8s.io/v1
          kind: ClusterRole
          name: spark-operator-hook
      - contains:
          path: rules
          content:
            apiGroups:
              - apiextensions.k8s.io
            resources:
              - customresourcedefinitions
            resourceNames:
              - sparkapplications.sparkoperator.k8s.io
              - scheduledsparkapplications.sparkoperator.k8s.io
            verbs:
              - get
              - update
  # Document 1 is the ClusterRoleBinding.
  - it: Should create hook ClusterRoleBinding by default
    documentIndex: 1
    asserts:
      - containsDocument:
          apiVersion: rbac.authorization.k8s.io/v1
          kind: ClusterRoleBinding
          name: spark-operator-hook
      - contains:
          path: subjects
          content:
            kind: ServiceAccount
            name: spark-operator-hook
            namespace: spark-operator
          count: 1
      - equal:
          path: roleRef
          value:
            apiGroup: rbac.authorization.k8s.io
            kind: ClusterRole
            name: spark-operator-hook

View File

@ -0,0 +1,40 @@
#
# Copyright 2024 The Kubeflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# helm-unittest suite for the CRD-upgrade hook service account template
# (hook/serviceaccount.yaml).
suite: Test hook service account
templates:
  - hook/serviceaccount.yaml
release:
  name: spark-operator
  namespace: spark-operator
tests:
  # The service account is gated on `hook.upgradeCrd`.
  - it: Should not create hook service account if `hook.upgradeCrd` is false
    set:
      hook:
        upgradeCrd: false
    asserts:
      - hasDocuments:
          count: 0
  - it: Should create hook service account by default
    asserts:
      - containsDocument:
          apiVersion: v1
          kind: ServiceAccount
          name: spark-operator-hook

View File

@ -41,6 +41,29 @@ image:
pullSecrets: []
# - name: <secret-name>
# Configuration for the Helm hook that upgrades the CRDs before install/upgrade.
hook:
  # -- Specifies whether to update CRDs with a Helm hook job.
  upgradeCrd: true
  # -- Resource requests and limits for hook containers.
  resources:
    requests:
      cpu: 100m
      memory: 64Mi
    limits:
      cpu: 100m
      memory: 64Mi
  # -- Security context for hook containers.
  securityContext:
    readOnlyRootFilesystem: true
    privileged: false
    allowPrivilegeEscalation: false
    runAsNonRoot: true
    capabilities:
      drop:
      - ALL
controller:
# -- Number of replicas of controller.
replicas: 1

33
cmd/operator/hook/root.go Normal file
View File

@ -0,0 +1,33 @@
/*
Copyright 2024 The Kubeflow authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hook
import (
"github.com/spf13/cobra"
)
// NewCommand returns the parent "hook" command that groups the Helm hook
// subcommands of the operator.
func NewCommand() *cobra.Command {
	hookCmd := &cobra.Command{
		Use:   "hook",
		Short: "Spark operator hook",
		RunE: func(cmd *cobra.Command, _ []string) error {
			// Invoked without a subcommand there is nothing to do; print usage.
			return cmd.Help()
		},
	}
	hookCmd.AddCommand(NewStartCommand())
	return hookCmd
}

179
cmd/operator/hook/start.go Normal file
View File

@ -0,0 +1,179 @@
/*
Copyright 2024 The Kubeflow authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hook
import (
"context"
"flag"
"fmt"
"io/fs"
"os"
"path/filepath"
"github.com/spf13/cobra"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
logzap "sigs.k8s.io/controller-runtime/pkg/log/zap"
"github.com/kubeflow/spark-operator/pkg/util"
)
var (
	// scheme knows how to encode and decode the API types the hook works with.
	scheme = runtime.NewScheme()
	// logger is the package-level logger for the hook command.
	logger = ctrl.Log.WithName("")
)

var (
	// upgradeCrds toggles upgrading the CRDs (bound to --upgrade-crds).
	upgradeCrds bool
	// crdsPath is the directory containing the CRD manifests (bound to --crds-path).
	crdsPath string
	// development toggles development-friendly log output.
	development bool
	// zapOptions holds the zap logging flags bound in NewStartCommand.
	zapOptions = logzap.Options{}
)

// init registers the client-go core types and the apiextensions types with the
// scheme so the client can decode CustomResourceDefinition objects.
func init() {
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
	utilruntime.Must(apiextensionsv1.AddToScheme(scheme))
}
// NewStartCommand creates the "start" subcommand that runs the Helm hook
// actions. Currently the only action is upgrading the SparkApplication and
// ScheduledSparkApplication CRDs in the cluster.
func NewStartCommand() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "start",
		Short: "Start hook",
		PreRunE: func(_ *cobra.Command, _ []string) error {
			// --crds-path is only meaningful when CRDs are actually upgraded;
			// do not demand it otherwise.
			if upgradeCrds && crdsPath == "" {
				return fmt.Errorf("--crds-path is required when --upgrade-crds is set")
			}
			return nil
		},
		RunE: func(_ *cobra.Command, _ []string) error {
			return start()
		},
	}

	cmd.Flags().BoolVar(&upgradeCrds, "upgrade-crds", false, "Upgrade SparkApplication and ScheduledSparkApplication CRDs")
	cmd.Flags().StringVar(&crdsPath, "crds-path", "", "Path to the CRDs directory")

	// Expose the controller-runtime and zap logging flags on this command.
	flagSet := flag.NewFlagSet("hook", flag.ExitOnError)
	ctrl.RegisterFlags(flagSet)
	zapOptions.BindFlags(flagSet)
	cmd.Flags().AddGoFlagSet(flagSet)

	return cmd
}
// start runs the hook actions selected via flags after configuring logging.
// Today the only action is upgrading the CRDs when --upgrade-crds is set.
func start() error {
	setupLog()

	if upgradeCrds {
		if err := upgradeCRDs(); err != nil {
			// %w keeps the underlying error inspectable via errors.Is/As.
			return fmt.Errorf("failed to upgrade CRDs: %w", err)
		}
	}

	return nil
}
// upgradeCRDs reads every CRD manifest (*.yaml) under crdsPath and updates the
// corresponding CustomResourceDefinition in the cluster when its spec differs
// from the manifest. A CRD that fails to read, fetch, or update is logged and
// skipped so one bad manifest does not abort the whole upgrade.
func upgradeCRDs() error {
	ctx := context.TODO()

	// Create the client rest config. Use kubeConfig if given, otherwise assume in-cluster.
	cfg, err := ctrl.GetConfig()
	if err != nil {
		// Return the error instead of exiting so the caller reports it uniformly.
		return fmt.Errorf("failed to get kube config: %w", err)
	}

	k8sClient, err := client.New(cfg, client.Options{Scheme: scheme})
	if err != nil {
		return fmt.Errorf("failed to create k8s client: %w", err)
	}

	// Find all CRD files. WalkDir avoids the per-entry os.Lstat that Walk does.
	crdFiles := []string{}
	if err := filepath.WalkDir(crdsPath, func(path string, entry fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if !entry.IsDir() && filepath.Ext(path) == ".yaml" {
			crdFiles = append(crdFiles, path)
		}
		return nil
	}); err != nil {
		return fmt.Errorf("failed to walk crds path: %w", err)
	}

	// Loop through each CRD file and update the CRD if it has changed.
	for _, crdFile := range crdFiles {
		crd := &apiextensionsv1.CustomResourceDefinition{}
		if err := util.ReadObjectFromFile(crd, crdFile); err != nil {
			logger.Error(err, "Failed to read CRD from file", "file", crdFile)
			continue
		}
		// Apply defaults so the spec comparison below is not polluted by fields
		// the API server would default anyway.
		apiextensionsv1.SetDefaults_CustomResourceDefinition(crd)

		oldCrd := &apiextensionsv1.CustomResourceDefinition{}
		if err := k8sClient.Get(ctx, client.ObjectKey{Name: crd.Name}, oldCrd); err != nil {
			// Log the name from the manifest; oldCrd.Name is empty when Get fails.
			logger.Error(err, "Failed to get CRD", "name", crd.Name)
			continue
		}

		if equality.Semantic.DeepEqual(oldCrd.Spec, crd.Spec) {
			logger.Info("Skip updating CRD as its specification does not change", "name", crd.Name)
			continue
		}

		// Keep the live object's metadata (resourceVersion etc.); replace only the spec.
		newCrd := oldCrd.DeepCopy()
		newCrd.Spec = crd.Spec
		if err := k8sClient.Update(ctx, newCrd); err != nil {
			logger.Error(err, "Failed to update CRD", "name", crd.Name)
			continue
		}
		logger.Info("Updated CRD", "name", crd.Name)
	}
	return nil
}
// setupLog Configures the logging system
func setupLog() {
ctrl.SetLogger(logzap.New(
logzap.UseFlagOptions(&zapOptions),
func(o *logzap.Options) {
o.Development = development
}, func(o *logzap.Options) {
o.ZapOpts = append(o.ZapOpts, zap.AddCaller())
}, func(o *logzap.Options) {
var config zapcore.EncoderConfig
if !development {
config = zap.NewProductionEncoderConfig()
} else {
config = zap.NewDevelopmentEncoderConfig()
}
config.EncodeLevel = zapcore.CapitalColorLevelEncoder
config.EncodeTime = zapcore.ISO8601TimeEncoder
config.EncodeCaller = zapcore.ShortCallerEncoder
o.Encoder = zapcore.NewConsoleEncoder(config)
}),
)
}

View File

@ -23,6 +23,7 @@ import (
"github.com/spf13/cobra"
"github.com/kubeflow/spark-operator/cmd/operator/controller"
"github.com/kubeflow/spark-operator/cmd/operator/hook"
"github.com/kubeflow/spark-operator/cmd/operator/version"
"github.com/kubeflow/spark-operator/cmd/operator/webhook"
)
@ -37,6 +38,7 @@ func NewCommand() *cobra.Command {
}
command.AddCommand(controller.NewCommand())
command.AddCommand(webhook.NewCommand())
command.AddCommand(hook.NewCommand())
command.AddCommand(version.NewCommand())
return command
}

View File

@ -118,3 +118,17 @@ func WriteObjectToFile(obj interface{}, filePath string) error {
return nil
}
// ReadObjectFromFile unmarshals the YAML document at filePath into obj.
// obj must be a pointer to a type the YAML decoder can populate.
func ReadObjectFromFile(obj interface{}, filePath string) error {
	// "data" instead of "bytes": avoid shadowing the standard library package name.
	data, err := os.ReadFile(filePath)
	if err != nil {
		// %w preserves the underlying error for errors.Is/As; the message now
		// matches the actual operation (read, not open).
		return fmt.Errorf("failed to read %s: %w", filePath, err)
	}
	if err := yaml.Unmarshal(data, obj); err != nil {
		return fmt.Errorf("failed to unmarshal %s: %w", filePath, err)
	}
	return nil
}

View File

@ -22,6 +22,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/kubeflow/spark-operator/pkg/common"
@ -201,3 +202,37 @@ spec:
Expect(os.Remove(file)).NotTo(HaveOccurred())
})
})
// Round-trip test: write an object to a YAML file, read it back, and verify
// the two are semantically equal.
var _ = Describe("ReadObjectFromFile", func() {
	It("Should read object from the given file", func() {
		oldObj := &corev1.PodTemplateSpec{
			ObjectMeta: metav1.ObjectMeta{
				Name: "test-pod",
				Labels: map[string]string{
					"key1": "value1",
					"key2": "value2",
				},
				Annotations: map[string]string{
					"key3": "value3",
					"key4": "value4",
				},
			},
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{
					{
						Name:  "test-container",
						Image: "test-image",
					},
				},
			},
		}

		file := "pod-template.yaml"
		Expect(util.WriteObjectToFile(oldObj, file)).To(Succeed())
		// Remove the file even if an assertion below fails, matching the
		// cleanup done by the WriteObjectToFile spec.
		DeferCleanup(func() {
			Expect(os.Remove(file)).NotTo(HaveOccurred())
		})

		newObj := &corev1.PodTemplateSpec{}
		Expect(util.ReadObjectFromFile(newObj, file)).To(Succeed())
		// The original Expect had no matcher attached and could never fail;
		// assert the round-trip actually preserved the object.
		Expect(equality.Semantic.DeepEqual(oldObj, newObj)).To(BeTrue())
	})
})