# Source: pipelines/samples/contrib/seldon/mnist_tf.py (235 lines, 7.0 KiB, Python)

# Copyright 2019 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kubernetes import client as k8s_client
import kfp.dsl as dsl
import json
from string import Template
# Manifest of the TFJob that runs the training image. The $-placeholders are
# filled in by mnist_tf via string.Template; the {{workflow.*}} markers contain
# no "$" and therefore pass through substitution unchanged (presumably they are
# resolved by the workflow engine at run time -- confirm against Argo docs).
_TFJOB_MANIFEST = Template("""
{
"apiVersion": "kubeflow.org/v1beta1",
"kind": "TFJob",
"metadata": {
"name": "mnist-train-{{workflow.uid}}",
"ownerReferences": [
{
"apiVersion": "argoproj.io/v1alpha1",
"kind": "Workflow",
"controller": true,
"name": "{{workflow.name}}",
"uid": "{{workflow.uid}}"
}
]
},
"spec": {
"tfReplicaSpecs": {
"Worker": {
"replicas": 1,
"template": {
"spec": {
"containers": [
{
"image": "$dockerrepotraining:$dockertagtraining",
"name": "tensorflow",
"volumeMounts": [
{
"mountPath": "/data",
"name": "persistent-storage"
}
]
}
],
"restartPolicy": "OnFailure",
"volumes": [
{
"name": "persistent-storage",
"persistentVolumeClaim": {
"claimName": "$modelpvc"
}
}
]
}
}
}
}
}
}
""")

# Manifest of the SeldonDeployment that serves the model. It mounts the same
# claim ($modelpvc) at /data as the training job does.
_SELDON_DEPLOYMENT_MANIFEST = Template("""
{
"apiVersion": "machinelearning.seldon.io/v1alpha2",
"kind": "SeldonDeployment",
"metadata": {
"labels": {
"app": "seldon"
},
"name": "mnist-classifier"
},
"spec": {
"annotations": {
"deployment_version": "v1",
"project_name": "MNIST Example"
},
"name": "mnist-classifier",
"predictors": [
{
"annotations": {
"predictor_version": "v1"
},
"componentSpecs": [
{
"spec": {
"containers": [
{
"image": "$dockerreposerving:$dockertagserving",
"imagePullPolicy": "Always",
"name": "mnist-classifier",
"volumeMounts": [
{
"mountPath": "/data",
"name": "persistent-storage"
}
]
}
],
"terminationGracePeriodSeconds": 1,
"volumes": [
{
"name": "persistent-storage",
"persistentVolumeClaim": {
"claimName": "$modelpvc"
}
}
]
}
}
],
"graph": {
"children": [],
"endpoint": {
"type": "REST"
},
"name": "mnist-classifier",
"type": "MODEL"
},
"name": "mnist-classifier",
"replicas": 1
}
]
}
}
""")


@dsl.pipeline(
    name="Seldon MNIST TF",
    description="Example of training and serving seldon MNIST TF model. Requires docker secret as per kubeflow/example-seldon. Simpler version is mnist_tf_nopush.py",
)
# Pipeline that builds a training image, trains MNIST as a TFJob, builds a
# serving image and deploys it as a SeldonDeployment.
#
# Derived from https://github.com/kubeflow/example-seldon. This variant is
# TensorFlow; the R and SKLearn flows are similar (see kubeflow/example-seldon).
# Push access to the chosen docker repositories is needed (see the secret note
# inside the function). Requires seldon v0.3.0 or higher.
#
# Parameters:
#   docker_secret         name of the Kubernetes secret mounted at
#                         /root/.docker/ in both image-build steps
#   training_repo/branch  git repository and branch cloned for the trainer
#   training_files        shell glob (relative to the clone's parent directory)
#                         copied into /workspace before the trainer build
#   docker_repo_training  repository of the trainer image (build destination
#   docker_tag_training   and TFJob container image) and its tag
#   serving_repo/branch   git repository and branch cloned for the runtime
#   serving_files         shell glob copied into /workspace before the
#                         serving build
#   docker_repo_serving   repository of the serving image (build destination
#   docker_tag_serving    and SeldonDeployment container image) and its tag
def mnist_tf(
    docker_secret='docker-config',
    training_repo='https://github.com/kubeflow/example-seldon.git',
    training_branch='master',
    training_files='./example-seldon/models/tf_mnist/train/*',
    docker_repo_training='seldonio/deepmnistclassifier_trainer',
    docker_tag_training='0.3',
    serving_repo='https://github.com/kubeflow/example-seldon.git',
    serving_branch='master',
    serving_files='./example-seldon/models/tf_mnist/runtime/*',
    docker_repo_serving='seldonio/deepmnistclassifier_runtime',
    docker_tag_serving='0.3'
):
    # The build steps push images, so they need docker credentials. Create the
    # secret from a local config with:
    # `kubectl create secret generic docker-config --from-file=config.json=${DOCKERHOME}/config.json --type=kubernetes.io/config`
    registry_credentials = k8s_client.V1Volume(
        name="docker-config-secret",
        secret=k8s_client.V1SecretVolumeSource(secret_name=docker_secret),
    )

    # Volume that holds the model; its claim name is substituted into both
    # manifests so training and serving mount the same claim.
    model_store = dsl.VolumeOp(
        name="modelpvc",
        resource_name="modelpvc",
        size="50Mi",
        modes=dsl.VOLUME_MODE_RWO,
    )

    # Second volume, used as the working directory shared between steps.
    workspace = dsl.VolumeOp(
        name="wkdirpvc",
        resource_name="wkdirpvc",
        size="50Mi",
        modes=dsl.VOLUME_MODE_RWO,
    )

    # --- training image -----------------------------------------------------
    # Clone the training code and copy it to /workspace, which is where the
    # kaniko build step that follows expects to find it.
    fetch_training_script = "".join([
        "git clone --depth 1 --branch ", str(training_branch),
        " ", str(training_repo),
        "; cp ", str(training_files),
        " /workspace; ls /workspace/;",
    ])
    fetch_training = dsl.ContainerOp(
        name="clone",
        image="alpine/git:latest",
        command=["sh", "-c"],
        arguments=[fetch_training_script],
        pvolumes={"/workspace": workspace.volume},
    )

    # Build the trainer image from the workspace Dockerfile and push it.
    training_image = ":".join((str(docker_repo_training), str(docker_tag_training)))
    build_training = dsl.ContainerOp(
        name="build",
        image="gcr.io/kaniko-project/executor:latest",
        arguments=["--dockerfile", "Dockerfile", "--destination", training_image],
        pvolumes={
            "/workspace": fetch_training.pvolume,
            "/root/.docker/": registry_credentials,
        },
    )

    # --- training job -------------------------------------------------------
    tfjob_manifest = json.loads(
        _TFJOB_MANIFEST.substitute(
            dockerrepotraining=str(docker_repo_training),
            dockertagtraining=str(docker_tag_training),
            modelpvc=model_store.outputs["name"],
        )
    )
    run_training = dsl.ResourceOp(
        name="train",
        k8s_resource=tfjob_manifest,
        success_condition='status.replicaStatuses.Worker.succeeded == 1',
    )
    run_training.after(build_training)

    # --- serving image ------------------------------------------------------
    # Reuse the working volume: wipe it, then stage the serving code there.
    fetch_serving_script = "".join([
        "rm -rf /workspace/*; git clone --depth 1 --branch ", str(serving_branch),
        " ", str(serving_repo),
        "; cp ", str(serving_files),
        " /workspace; ls /workspace/;",
    ])
    fetch_serving = dsl.ContainerOp(
        name="clone_serving",
        image="alpine/git:latest",
        command=["sh", "-c"],
        arguments=[fetch_serving_script],
        pvolumes={"/workspace": workspace.volume},
    )
    fetch_serving.after(run_training)

    serving_image = ":".join((str(docker_repo_serving), str(docker_tag_serving)))
    build_serving_image = dsl.ContainerOp(
        name="build_serving",
        image="gcr.io/kaniko-project/executor:latest",
        arguments=["--dockerfile", "Dockerfile", "--destination", serving_image],
        pvolumes={
            "/workspace": fetch_serving.pvolume,
            "/root/.docker/": registry_credentials,
        },
    )
    build_serving_image.after(fetch_serving)

    # --- deployment ---------------------------------------------------------
    seldon_manifest = json.loads(
        _SELDON_DEPLOYMENT_MANIFEST.substitute(
            dockerreposerving=str(docker_repo_serving),
            dockertagserving=str(docker_tag_serving),
            modelpvc=model_store.outputs["name"],
        )
    )
    deploy = dsl.ResourceOp(
        name='serve',
        k8s_resource=seldon_manifest,
        success_condition='status.state == Available',
    )
    deploy.after(build_serving_image)
if __name__ == "__main__":
    # Run as a script: compile the pipeline into a package whose path is this
    # file's own path with ".tar.gz" appended.
    import kfp.compiler as compiler

    package_path = __file__ + ".tar.gz"
    compiler.Compiler().compile(mnist_tf, package_path)