235 lines
7.0 KiB
Python
235 lines
7.0 KiB
Python
# Copyright 2019 The Kubeflow Authors
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from kubernetes import client as k8s_client
|
|
import kfp.dsl as dsl
|
|
import json
|
|
from string import Template
|
|
|
|
# TFJob manifest submitted by the "train" step. Only the `$name` fields are
# filled in by string.Template; the `{{workflow.*}}` placeholders are left
# untouched in the manifest handed to the pipeline.
_TFJOB_TEMPLATE = Template("""
{
    "apiVersion": "kubeflow.org/v1beta1",
    "kind": "TFJob",
    "metadata": {
        "name": "mnist-train-{{workflow.uid}}",
        "ownerReferences": [
            {
                "apiVersion": "argoproj.io/v1alpha1",
                "kind": "Workflow",
                "controller": true,
                "name": "{{workflow.name}}",
                "uid": "{{workflow.uid}}"
            }
        ]
    },
    "spec": {
        "tfReplicaSpecs": {
            "Worker": {
                "replicas": 1,
                "template": {
                    "spec": {
                        "containers": [
                            {
                                "image": "$dockerrepotraining:$dockertagtraining",
                                "name": "tensorflow",
                                "volumeMounts": [
                                    {
                                        "mountPath": "/data",
                                        "name": "persistent-storage"
                                    }
                                ]
                            }
                        ],
                        "restartPolicy": "OnFailure",
                        "volumes": [
                            {
                                "name": "persistent-storage",
                                "persistentVolumeClaim": {
                                    "claimName": "$modelpvc"
                                }
                            }
                        ]
                    }
                }
            }
        }
    }
}
""")

# SeldonDeployment manifest submitted by the "serve" step. The serving
# container mounts the same model PVC (at /data) that the TFJob mounts.
_SELDON_DEPLOYMENT_TEMPLATE = Template("""
{
    "apiVersion": "machinelearning.seldon.io/v1alpha2",
    "kind": "SeldonDeployment",
    "metadata": {
        "labels": {
            "app": "seldon"
        },
        "name": "mnist-classifier"
    },
    "spec": {
        "annotations": {
            "deployment_version": "v1",
            "project_name": "MNIST Example"
        },
        "name": "mnist-classifier",
        "predictors": [
            {
                "annotations": {
                    "predictor_version": "v1"
                },
                "componentSpecs": [
                    {
                        "spec": {
                            "containers": [
                                {
                                    "image": "$dockerreposerving:$dockertagserving",
                                    "imagePullPolicy": "Always",
                                    "name": "mnist-classifier",
                                    "volumeMounts": [
                                        {
                                            "mountPath": "/data",
                                            "name": "persistent-storage"
                                        }
                                    ]
                                }
                            ],
                            "terminationGracePeriodSeconds": 1,
                            "volumes": [
                                {
                                    "name": "persistent-storage",
                                    "persistentVolumeClaim": {
                                        "claimName": "$modelpvc"
                                    }
                                }
                            ]
                        }
                    }
                ],
                "graph": {
                    "children": [],
                    "endpoint": {
                        "type": "REST"
                    },
                    "name": "mnist-classifier",
                    "type": "MODEL"
                },
                "name": "mnist-classifier",
                "replicas": 1
            }
        ]
    }
}
""")


# Example derived from https://github.com/kubeflow/example-seldon
# This example is TF, but the R and SKLearn flows are similar - see
# kubeflow/example-seldon.
# Push access to the chosen docker repo is needed - see the note on the
# secret inside the function.
# Requires seldon v0.3.0 or higher.
#
# Steps, in order: create two PVCs (model storage and a shared working
# directory), clone the training code, build and push the training image,
# run a TFJob, clone the serving code, build and push the serving image,
# and finally create a SeldonDeployment.
#
# Parameters:
#   docker_secret         name of the k8s secret holding the docker config
#   training_repo/branch  git repository and branch holding the training code
#   training_files        glob (relative to the clone) copied into /workspace
#   docker_repo_training  destination repository of the training image
#   docker_tag_training   destination tag of the training image
#   serving_*             the same settings for the serving image
@dsl.pipeline(
    name="Seldon MNIST TF",
    description=(
        "Example of training and serving seldon MNIST TF model. "
        "Requires docker secret as per kubeflow/example-seldon. "
        "Simpler version is mnist_tf_nopush.py"
    ),
)
def mnist_tf(docker_secret='docker-config',
             training_repo='https://github.com/kubeflow/example-seldon.git',
             training_branch='master',
             training_files='./example-seldon/models/tf_mnist/train/*',
             docker_repo_training='seldonio/deepmnistclassifier_trainer',
             docker_tag_training='0.3',
             serving_repo='https://github.com/kubeflow/example-seldon.git',
             serving_branch='master',
             serving_files='./example-seldon/models/tf_mnist/runtime/*',
             docker_repo_serving='seldonio/deepmnistclassifier_runtime',
             docker_tag_serving='0.3'):

    # Images are pushed by the build steps, so a docker secret is needed.
    # Create it from a local config with:
    # `kubectl create secret generic docker-config --from-file=config.json=${DOCKERHOME}/config.json --type=kubernetes.io/config`
    docker_config_volume = k8s_client.V1Volume(
        name="docker-config-secret",
        secret=k8s_client.V1SecretVolumeSource(secret_name=docker_secret),
    )

    # Volume on which the model is stored (mounted by training and serving).
    model_volume_op = dsl.VolumeOp(
        name="modelpvc",
        resource_name="modelpvc",
        size="50Mi",
        modes=dsl.VOLUME_MODE_RWO,
    )

    # A second volume used as the working directory shared between steps.
    workdir_volume_op = dsl.VolumeOp(
        name="wkdirpvc",
        resource_name="wkdirpvc",
        size="50Mi",
        modes=dsl.VOLUME_MODE_RWO,
    )

    # Clone the training code and copy it into /workspace, which is where
    # kaniko (the next step) expects to find it.
    clone_training_cmd = (
        "git clone --depth 1 --branch {} {}; cp {} /workspace; ls /workspace/;"
    ).format(training_branch, training_repo, training_files)
    clone_training = dsl.ContainerOp(
        name="clone",
        image="alpine/git:latest",
        command=["sh", "-c"],
        arguments=[clone_training_cmd],
        pvolumes={"/workspace": workdir_volume_op.volume},
    )

    # Build and push the training image from the files in /workspace.
    training_image = "{}:{}".format(docker_repo_training, docker_tag_training)
    build_training = dsl.ContainerOp(
        name="build",
        image="gcr.io/kaniko-project/executor:latest",
        arguments=["--dockerfile", "Dockerfile", "--destination", training_image],
        pvolumes={
            "/workspace": clone_training.pvolume,
            "/root/.docker/": docker_config_volume,
        },
    )

    # Render the TFJob manifest and submit it once the image has been built.
    tfjob_manifest = json.loads(
        _TFJOB_TEMPLATE.substitute(
            dockerrepotraining=str(docker_repo_training),
            dockertagtraining=str(docker_tag_training),
            modelpvc=model_volume_op.outputs["name"],
        )
    )
    train = dsl.ResourceOp(
        name="train",
        k8s_resource=tfjob_manifest,
        success_condition='status.replicaStatuses.Worker.succeeded == 1',
    )
    train.after(build_training)

    # Prepare the serving code: empty the shared working directory, then
    # clone and copy the serving files into it. Runs only after training.
    clone_serving_cmd = (
        "rm -rf /workspace/*; git clone --depth 1 --branch {} {}; "
        "cp {} /workspace; ls /workspace/;"
    ).format(serving_branch, serving_repo, serving_files)
    clone_serving = dsl.ContainerOp(
        name="clone_serving",
        image="alpine/git:latest",
        command=["sh", "-c"],
        arguments=[clone_serving_cmd],
        pvolumes={"/workspace": workdir_volume_op.volume},
    )
    clone_serving.after(train)

    # Build and push the serving image.
    serving_image = "{}:{}".format(docker_repo_serving, docker_tag_serving)
    build_serving = dsl.ContainerOp(
        name="build_serving",
        image="gcr.io/kaniko-project/executor:latest",
        arguments=["--dockerfile", "Dockerfile", "--destination", serving_image],
        pvolumes={
            "/workspace": clone_serving.pvolume,
            "/root/.docker/": docker_config_volume,
        },
    )
    build_serving.after(clone_serving)

    # Render the SeldonDeployment manifest and create it as the final step.
    seldon_manifest = json.loads(
        _SELDON_DEPLOYMENT_TEMPLATE.substitute(
            dockerreposerving=str(docker_repo_serving),
            dockertagserving=str(docker_tag_serving),
            modelpvc=model_volume_op.outputs["name"],
        )
    )
    dsl.ResourceOp(
        name='serve',
        k8s_resource=seldon_manifest,
        success_condition='status.state == Available',
    ).after(build_serving)
|
|
|
|
|
|
if __name__ == "__main__":
    # When run as a script, compile the pipeline into an archive named
    # after this file with a ".tar.gz" suffix appended.
    from kfp import compiler

    package_path = "{}.tar.gz".format(__file__)
    compiler.Compiler().compile(mnist_tf, package_path)
|