mirror of https://github.com/kubeflow/examples.git
Financial example v0.7 (#693)
* Update financial time series example to Kubeflow v0.7 * Move from GRPC to HTTP for serving client request for financial time series example * Update Tensorflow version to 1.15 on financial time series example * Update KFP pipeline to show accuracy metric and remove deprecated dsl.PipelineParam * Split train and deploy step and add conditional step to deploy in KFP * Clean up readme and add visuals for financial time series example
This commit is contained in:
parent
c20eafc4fc
commit
d93c18f66e
|
@ -1,3 +1 @@
|
|||
vcs.xml
|
||||
kubeflow_ks_app/*
|
||||
kubeflow_repo/*
|
||||
|
|
|
@ -73,7 +73,14 @@
|
|||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"!pip3 install google-cloud-bigquery==1.6.0 pandas==0.23.4 matplotlib==3.0.3 scipy==1.2.1"
|
||||
"!pip3 install google-cloud-bigquery==1.6.0 pandas==0.23.4 matplotlib==3.0.3 scipy==1.2.1 --user"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"You might need to restart the kernel if the `import bigquery` fails."
|
||||
]
|
||||
},
|
||||
{
|
||||
|
@ -1161,7 +1168,7 @@
|
|||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.6.4"
|
||||
"version": "3.6.8"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
|
|
|
@ -4,63 +4,62 @@ Using Kubeflow for Financial Time Series
|
|||
In this example, we will walk through the exploration, training and serving of a machine learning model by leveraging Kubeflow's main components.
|
||||
We will use the [Machine Learning with Financial Time Series Data](https://cloud.google.com/solutions/machine-learning-with-financial-time-series-data) use case.
|
||||
|
||||
## Goals
|
||||
|
||||
There are two primary goals for this tutorial:
|
||||
|
||||
* Demonstrate an End-to-End kubeflow example
|
||||
* Present a financial time series model example
|
||||
|
||||
By the end of this tutorial, you should learn how to:
|
||||
|
||||
* Setup a Kubeflow cluster
|
||||
* Spawn a Jupyter Notebook on the cluster
|
||||
* Train a time-series model using TensorFlow and GPUs on the cluster
|
||||
* Serve the model using [TF Serving](https://www.kubeflow.org/docs/components/serving/tfserving_new/)
|
||||
* Query the model via your local machine
|
||||
* Automate the steps 1/ preprocess, 2/ train and 3/ model deployment through a kubeflow pipeline
|
||||
|
||||
### Pre-requisites
|
||||
You can use a Google Cloud Shell to follow the steps outlined below.
|
||||
In that case you can skip the requirements below as these depencies are pre-installed with the exception that you might still need to install ksonnet via these [instructions](https://www.kubeflow.org/docs/guides/components/ksonnet/).
|
||||
In that case you can skip the requirements below as these depencies are pre-installed.
|
||||
You might also need to install ```uuid-runtime``` via ```sudo apt-get install uuid-runtime```.
|
||||
|
||||
Alternatively, you can work from your local environment.
|
||||
In that case you will need a Linux or Mac environment with Python 3.6.x and install the following requirements
|
||||
* Install [Cloud SDK](https://cloud.google.com/sdk/)
|
||||
* Install [gcloud](https://cloud.google.com/sdk/gcloud/)
|
||||
* Install [ksonnet](https://ksonnet.io/#get-started) version 0.11.0 or later
|
||||
* Install [kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/)
|
||||
In that case you will need a Linux or Mac environment with Python 3.6.x and install the [Cloud SDK](https://cloud.google.com/sdk/).
|
||||
|
||||
Independent of the machine that you are using, you will need access to a Google Cloud Project and its GKE resources.
|
||||
|
||||
### Deploying Kubeflow on GKE
|
||||
The full deployment script for Kubeflow on GKE will create a cluster for you with machines that already have all the appropiate permissions.
|
||||
Please follow the instructions on how to deploy Kubeflow to GKE on the [getting-started-GKE](https://v0-2.kubeflow.org/docs/started/getting-started-gke/) page from the `examples/financial_time_series` directory.
|
||||
Please follow the instructions on how to deploy Kubeflow to GKE on the
|
||||
[Deploy using CLI](https://www.kubeflow.org/docs/gke/deploy/deploy-cli/) page with the following exceptions:
|
||||
|
||||
- After the step `kfctl build -V -f ${CONFIG_URI}` make sure you add
|
||||
'https://www.googleapis.com/auth/cloud-platform' to the `VM_OAUTH_SCOPES` in the file `{KF_NAME}/gcp_config/cluster.ninja`. This will allow the machines to make use of the BigQuery API, which we need for our use case as the data is stored in BigQuery, and to store data on Google Cloud Storage.
|
||||
- After the step `kfctl build -V -f ${CONFIG_URI}` make sure you set `enableNodeAutoprovisioning` to false in `{KF_NAME}/gcp_config/cluster-kubeflow.yaml` as we will work with our dedicated gpu-pool that Kubeflow deployment foresees.
|
||||
The [node autoprovisioning](https://cloud.google.com/kubernetes-engine/docs/how-to/node-auto-provisioning) can be useful to autoscale the cluster with non-user defined node pools.
|
||||
|
||||
|
||||
### Cloning the Examples
|
||||
|
||||
Clone the examples repository and change directory to the financial time series example:
|
||||
```
|
||||
git clone https://github.com/kubeflow/examples.git
|
||||
cd examples/financial_time_series/
|
||||
<follow instructions for deploying GKE>
|
||||
```
|
||||
After the step `${KUBEFLOW_SRC}/scripts/kfctl.sh generate platform` make sure you add 'https://www.googleapis.com/auth/cloud-platform' to the `VM_OAUTH_SCOPES` in the file `{KFAPP}/gcp_config/cluster.ninja`. This will allow the machines to make use of the BigQuery API, which we need for our use case as the data is stored in BigQuery, and to store data on Google Cloud Storage.
|
||||
Also we will set `enableNodeAutoprovisioning` to false in this file as we will work with our dedicated gpu-pool.
|
||||
The [node autoprivioning](https://cloud.google.com/kubernetes-engine/docs/how-to/node-auto-provisioning) can be useful to autoscale the cluster with non-user defined node pools.
|
||||
|
||||
Next to this, we also need to update the `{KFAPP}/gcp_config/iam_bindings.yaml` by adding the roles 'roles/bigquery.admin' and 'roles/storage.admin' for the VM service account so that it is authorized to create a BigQuery job and write files to Google Cloud Storage.
|
||||
Last but not least, we also need to update the `cluster-kubeflow.yaml` to enable GPUs on our cluster, set `gpu-pool-max-nodes` to 1 instead of 0.
|
||||
|
||||
Once the script is finished, you should a new folder in your directory`with the following subfolders.
|
||||
```
|
||||
$ financial_time_series
|
||||
.
|
||||
├── tensorflow_model
|
||||
└── <kubeflow_src>
|
||||
├── deployment
|
||||
├── <kf_app>
|
||||
├── kubeflow
|
||||
└── scripts
|
||||
```
|
||||
Next, we can easily verify the status of the pods by running ```kubectl get pods```.
|
||||
|
||||
### Explore the Kubeflow UI
|
||||
After some time (about 10-15 minutes), an endpoint should now be available at `https://<kf_app>.endpoints.<project_id>.cloud.goog/`.
|
||||
After some time (about 10-15 minutes), an endpoint should now be available at `https://<KF_NAME>.endpoints.<project_id>.cloud.goog/`.
|
||||
From this page you can navigate between the different Kubeflow components.
|
||||
|
||||
### Exploration via tf-hub
|
||||
The TF-hub component of Kubeflow allows us to leverage [JupyterHub](https://github.com/jupyterhub/jupyterhub) to investigate the data and start building a feasible machine learning model for the specific problem.
|
||||
From the Kubeflow starting page, you can click on the `Jupyterhub` tab.
|
||||
After filling in a dummy username and password you are prompted to select parameters to spawn a JupyterHub.
|
||||
In this case, we will just leave the default settings and hit spawn.
|
||||
### Exploration via Jupyter Hub
|
||||
The [JupyterHub](https://github.com/jupyterhub/jupyterhub) component of Kubeflow allows us to spin up Jupyter Notebooks quite easily.
|
||||
In the notebook we will investigate the data and start building a feasible machine learning model for the specific problem.
|
||||
|
||||
The following steps for running the Jupyter Notebook work better on a local machine kernel as the Google Cloud Shell is not meant to stand up a web socket service and is not configured for that.
|
||||
Note that this is not a compulsory step in order to be able to follow the next sections, so if you are working on a Google Cloud Shell you can simply investigate the notebook via the link below.
|
||||
|
||||
You can simply upload the [notebook](https://github.com/kubeflow/examples/blob/master/financial_time_series/Financial%20Time%20Series%20with%20Finance%20Data.ipynb) and walk through it step by step to better understand the problem and suggested solution(s).
|
||||
From the Kubeflow starting page, you can click on the `Notebook Servers` tab.
|
||||
Make sure you select a namespace on the top left and hit the 'new server'
|
||||
button. You can just fill in an appropiate name and leave all the options to
|
||||
the defaults. You can simply upload the [notebook](https://github.com/kubeflow/examples/blob/master/financial_time_series/Financial%20Time%20Series%20with%20Finance%20Data.ipynb) and walk through it step by step to better understand the problem and suggested solution(s).
|
||||
In this example, the goal is not focus on the notebook itself but rather on how this notebook is being translated in more scalable training jobs and later on serving.
|
||||
|
||||
### Training at scale with TF-jobs
|
||||
|
@ -69,45 +68,31 @@ In the folder ```tensorflow-model``` you can find these scripts together with a
|
|||
Subsequently we will build a docker image on Google Cloud by running following command:
|
||||
|
||||
```
|
||||
cd tensorflow-model/
|
||||
cd tensorflow_model/
|
||||
export TRAIN_PATH=gcr.io/<project>/<image-name>/cpu:v1
|
||||
gcloud builds submit --tag $TRAIN_PATH .
|
||||
```
|
||||
|
||||
Now that we have an image ready on Google Cloud Container Registry, it's time we start launching a training job.
|
||||
|
||||
```
|
||||
cd ../<kubeflow_src>/<kf_app>/ks_app
|
||||
ks generate tf-job-simple train
|
||||
```
|
||||
This Ksonnet protoytype needs to be slightly modified to our needs, you can simply copy an updated version of this prototype by copying the updated version from the repository.
|
||||
```
|
||||
cp ../../../tensorflow_model/CPU/train.jsonnet ./components/train.jsonnet
|
||||
```
|
||||
|
||||
Now we need to define the parameters which are currently set as placeholders in the training job prototype.
|
||||
Note that this introduces a flexible and clean way of working, by changing the parameters you can easily launch another training job without maintaining multiple YAML files in your repository.
|
||||
We will create a bucket to store our data and model artifacts:
|
||||
|
||||
```
|
||||
# create storage bucket that will be used to store models
|
||||
BUCKET_NAME=<your-bucket-name>
|
||||
gsutil mb gs://$BUCKET_NAME/
|
||||
# set parameters
|
||||
export TRAINING_NAME=trainingjob1
|
||||
ks param set train name $TRAINING_NAME
|
||||
ks param set train namespace "default"
|
||||
ks param set train image $TRAIN_PATH
|
||||
ks param set train workingDir "opt/workdir"
|
||||
ks param set train args -- python,run_preprocess_and_train.py,--model=FlatModel,--epochs=30001,--bucket=$BUCKET_NAME,--version=1
|
||||
```
|
||||
|
||||
You can verify the parameter settings in the params.libsonnet in the directory kubeflow_ks_app/components.
|
||||
This file keeps track of all the parameters used to instantiate components from prototypes.
|
||||
Now that we have an image ready on Google Cloud Container Registry, it's time we start launching a training job.
|
||||
Please have a look at the tfjob resource in `CPU/tfjob1.yaml` and update the
|
||||
image and bucket reference. In this case we
|
||||
are using a very simple definition of a
|
||||
[TF-job](https://www.kubeflow.org/docs/components/training/tftraining/),
|
||||
it only has a single worker as we are not doing any advanced training set-up (e.g. distributed training).
|
||||
|
||||
Next we can launch the tf-job to our Kubeflow cluster and follow the progress via the logs of the pod.
|
||||
|
||||
```
|
||||
ks apply default -c train
|
||||
POD_NAME=$(kubectl get pods --selector=tf_job_name=$TRAINING_NAME,tf-replica-type=worker \
|
||||
kubectl apply -f CPU/tfjob1.yaml
|
||||
POD_NAME=$(kubectl get pods --selector=tf-job-name=tfjob-flat \
|
||||
--template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}')
|
||||
kubectl logs -f $POD_NAME
|
||||
```
|
||||
|
@ -117,83 +102,85 @@ In the logs you can see that the trained model is being exported to google cloud
|
|||
|
||||
### Deploy and serve with TF-serving
|
||||
Once the model is trained, the next step will be to deploy it and serve requests.
|
||||
Kubeflow comes with a TF-serving module which you can use to deploy your model with only a few commands.
|
||||
We will use the standard TF-serving module that Kubeflow offers.
|
||||
Please have a look at the serving manifest `tfserving.yaml` and update the bucket name.
|
||||
We will use a ClusterIP to expose the service only inside the cluster. To
|
||||
reach out securely from outside of the cluster, you could use the secured
|
||||
set-up via the istio ingress-gateway, which
|
||||
Kubeflow offers out-of-the-box. For more information, see the
|
||||
[documentation](https://www.kubeflow.org/docs/components/serving/tfserving_new/).
|
||||
|
||||
```
|
||||
ks generate tf-serving serve --name=tf-serving
|
||||
ks param set serve modelPath gs://$BUCKET_NAME/model/
|
||||
ks apply default -c serve
|
||||
kubectl apply -f tfserving.yaml
|
||||
```
|
||||
|
||||
After running these commands, a deployment and service will be launched on Kubernetes that will enable you to easily send requests to get predictions from your module.
|
||||
Let's check if the model is loaded successfully.
|
||||
|
||||
```
|
||||
POD=`kubectl get pods --selector=app=tf-serving | awk '{print $1}' | tail -1`
|
||||
POD=`kubectl get pods --selector=app=model | awk '{print $1}' | tail -1`
|
||||
kubectl logs -f $POD
|
||||
```
|
||||
|
||||
We will do a local test via GRPC to illustrate how to get results from this serving component. Once the pod is up we can set up port-forwarding to our localhost.
|
||||
We will do a local test via HTTP to illustrate how to get results from this serving component. Once the pod is up we can set up port-forwarding to our localhost.
|
||||
```
|
||||
kubectl port-forward $POD 9000:9000 2>&1 >/dev/null &
|
||||
kubectl port-forward $POD 8500:8500 2>&1 >/dev/null &
|
||||
```
|
||||
|
||||
Now the only thing we need to do is send a request to ```localhost:9000``` with the expected input of the saved model and it will return a prediction.
|
||||
Now the only thing we need to do is send a request to ```localhost:8500``` with the expected input of the saved model and it will return a prediction.
|
||||
The saved model expects a time series from closing stocks and spits out the prediction as a 0 (S&P closes positive) or 1 (S&P closes negative) together with the version of the saved model which was memorized upon saving the model.
|
||||
Let's start with a script that populates a request with random numbers to test the service.
|
||||
|
||||
```
|
||||
cd ../../../tensorflow_model
|
||||
pip3 install numpy tensorflow-serving-api
|
||||
pip3 install numpy requests
|
||||
python3 -m serving_requests.request_random
|
||||
```
|
||||
|
||||
The output should return an integer, 0 or 1 as explained above, and a string that represents the version.
|
||||
The output should return an integer, 0 or 1 as explained above, and a string that represents the tag of the model.
|
||||
There is another script available that builds a more practical request, with time series data of closing stocks for a certain date.
|
||||
In the following script, the same date is used as the one used at the end of the notebook ```Machine Learning with Financial Time Series Data.ipynb``` for comparison reasons.
|
||||
|
||||
```
|
||||
pip3 install pandas
|
||||
pip3 install -r requirements.txt
|
||||
python3 -m serving_requests.request
|
||||
```
|
||||
|
||||
The response should indicate that S&P index is expected to close positive (0) but from the actual data (which is prospected in the notebook mentioned above) we can see that it actually closed negative that day.
|
||||
Let's get back to training and see if we can improve our accuracy.
|
||||
|
||||
### Running another tf-job and serving update
|
||||
### Running another TF-job and serving update
|
||||
Most likely a single training job will never be sufficient. It is very common to create a continuous training pipeline to iterate training and verify the output.
|
||||
Submitting another training job with Kubeflow is very easy.
|
||||
By simply adjusting the parameters we can instantiate another component from the ```train.jsonnet```prototype.
|
||||
Please have a look at the serving manifest `CPU/tfjob2.yaml` and update the
|
||||
image and bucket reference.
|
||||
This time, we will train a more complex neural network with several hidden layers.
|
||||
```
|
||||
cd ../<kubeflow_src>/<kf_app>/ks_app
|
||||
export TRAINING_NAME=trainingjob2
|
||||
ks param set train name $TRAINING_NAME
|
||||
ks param set train args -- python,run_preprocess_and_train.py,--model=DeepModel,--epochs=30001,--bucket=$BUCKET_NAME,--version=2
|
||||
ks apply default -c train
|
||||
```
|
||||
|
||||
Verify the logs or use ```kubectl describe tfjobs trainingjob2```
|
||||
|
||||
```
|
||||
POD_NAME=$(kubectl get pods --selector=tf_job_name=$TRAINING_NAME,tf-replica-type=worker \
|
||||
kubectl apply -f CPU/tfjob2.yaml
|
||||
```
|
||||
|
||||
Verify the logs via:
|
||||
|
||||
```
|
||||
POD_NAME=$(kubectl get pods --selector=tf-job-name=tfjob-deep \
|
||||
--template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}')
|
||||
kubectl logs -f $POD_NAME
|
||||
```
|
||||
|
||||
You should notice that the training now takes a few minutes instead of less than one minute, however the accuracy on the test set is now 72%.
|
||||
Our training job uploads the trained model to the serving directory of our running tf-serving component.
|
||||
The tf-serving component watches this serving directory and automatically loads the model of the folder with the highest version (as integer).
|
||||
Since the newer version has a higher number than the previous one, our tf-serving should have switched to this new model.
|
||||
You should notice that the training now takes a few minutes instead of less than one minute.
|
||||
The accuracy on the test set is now 72%.
|
||||
Our training job uploads the trained model to the serving directory of our running TF-serving component.
|
||||
Let's see if we get a response from the new version and if the new model gets it right this time.
|
||||
|
||||
```
|
||||
cd ../../../tensorflow_model
|
||||
python3 -m serving_requests.request
|
||||
```
|
||||
|
||||
The response returns the updated version number '2' and predicts the correct output 1, which means the S&P index closes negative, hurray!
|
||||
The response returns the model tag 'v2' and predicts the correct output 1, which means the S&P index closes negative, hurray!
|
||||
|
||||
### Running TF-job on a GPU
|
||||
|
||||
Can we also run the tf-job on a GPU?
|
||||
Can we also run the TF-job on a GPU?
|
||||
Imagine the training job does not just take a few minutes but rather hours or days.
|
||||
In this case we can reduce the training time by using a GPU. The GKE deployment script for Kubeflow automatically adds a GPU-pool that can scale as needed so you don’t need to pay for a GPU when you don’t need it.
|
||||
Note that the Kubeflow deployment also installs the necessary Nvidia drivers for you so there is no need for you to worry about extra GPU device plugins.
|
||||
|
@ -206,79 +193,68 @@ export TRAIN_PATH_GPU=gcr.io/<project-name>/<image-name>/gpu:v1
|
|||
gcloud builds submit --tag $TRAIN_PATH_GPU .
|
||||
```
|
||||
|
||||
Also the train.jsonnet will need to be slightly adjusted to make it flexible to also run on GPUs.
|
||||
You can simply copy the adjusted jsonnet by running following command.
|
||||
|
||||
```
|
||||
cp GPU/train.jsonnet ../<kubeflow_src>/<kf_app>/ks_app/components/train.jsonnet
|
||||
```
|
||||
|
||||
|
||||
|
||||
Subsequently, the parameters must be updated to fit with new prototype in ```train.jsonnet```.
|
||||
```
|
||||
cd ../<kubeflow_src>/<kf_app>/ks_app
|
||||
export TRAINING_NAME=trainingjobgpu
|
||||
ks param set train name $TRAINING_NAME
|
||||
ks param set train gpuImage $TRAIN_PATH_GPU
|
||||
ks param set train num_gpu 1
|
||||
ks param set train args -- python,run_preprocess_and_train.py,--model=DeepModel,--epochs=30001,--bucket=$BUCKET_NAME,--version=3
|
||||
```
|
||||
|
||||
Please have a look at the slightly altered training job manifest `GPU/tfjob3
|
||||
.yaml` and update the image and bucket reference.
|
||||
Note that the container now has a nodeSelector to point to the GPU-pool.
|
||||
Next we can deploy the tf-job to our GPU by simply running following command.
|
||||
|
||||
```
|
||||
ks apply default -c train
|
||||
kubectl apply -f GPU/tfjob3.yaml
|
||||
```
|
||||
|
||||
First the pod will be unschedulable as there are no gpu-pool nodes available. This demand will be recognized by the kubernetes cluster and a node will be created on the gpu-pool automatically.
|
||||
Once the pod is up, you can check the logs and verify that the training time is significantly reduced compared to the previous tf-job.
|
||||
Once the pod is up, you can check the logs and verify that the training time is reduced compared to the previous tf-job.
|
||||
|
||||
```
|
||||
POD_NAME=$(kubectl get pods --selector=tf_job_name=$TRAINING_NAME,tf-replica-type=worker \
|
||||
POD_NAME=$(kubectl get pods --selector=tf-job-name=tfjob-deep-gpu \
|
||||
--template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}')
|
||||
kubectl logs -f $POD_NAME
|
||||
```
|
||||
|
||||
### Kubeflow Pipelines
|
||||
Up to now, we clustered the preprocessing and training in a single script to illustrate the TFJobs.
|
||||
In practice, most often the preprocessing and training step will separated and they will need to run sequentially each time.
|
||||
In this way, we decouple the preprocessing from the training and can iterate faster different ML flows.
|
||||
Up to now, we clustered the preprocessing, training and deploy in a single script to illustrate the TFJobs.
|
||||
In practice, most often the preprocessing, training and deploy step will separated and they will need to run sequentially.
|
||||
Kubeflow pipelines offers an easy way of chaining these steps together and we will illustrate that here.
|
||||
As you can see, the script `run_preprocess_and_train.py` was using the two scripts `run_preprocess.py` and `run_train.py` underlying.
|
||||
The idea here is that these two steps will be containerized and chained together by Kubeflow pipelines.
|
||||
As you can see, the script `run_preprocess_train_deploy.py` was using the scripts `run_preprocess.py`, `run_train.py` and `run_deploy.py` underlying.
|
||||
The idea here is that these three steps will be containerized and chained together by Kubeflow pipelines.
|
||||
We will also introduce a condition that we will only deploy the model if the accuracy on the test set surpasses a treshold of 70%.
|
||||
|
||||
KFP asks us to compile our pipeline Python3 file into a domain-specific-language.
|
||||
Kubeflow Pipelines asks us to compile our pipeline Python3 file into a domain-specific-language.
|
||||
We do that with a tool called dsl-compile that comes with the Python3 SDK. So, first install that SDK:
|
||||
|
||||
```
|
||||
pip3 install python-dateutil https://storage.googleapis.com/ml-pipeline/release/0.1.2/kfp.tar.gz --upgrade
|
||||
pip3 install python-dateutil kfp==0.1.36
|
||||
```
|
||||
|
||||
Update the `ml_pipeline.py` with the cpu image path that you built in the previous steps and your bucket name.
|
||||
Please inspect the `ml_pipline.py` and update the `ml_pipeline.py` with the cpu image path that you built in the previous steps.
|
||||
Then, compile the DSL, using:
|
||||
|
||||
```
|
||||
cd ../../../tensorflow_model
|
||||
python3 ml_pipeline.py
|
||||
```
|
||||
|
||||
Now a file `ml_pipeline.py.tar_gz` is generated that we can upload to the kubeflow pipelines UI.
|
||||
We will navigate again back to the Kubeflow UI homepage on `https://<kf_app>.endpoints.<project_id>.cloud.goog/` and click on the Pipeline dashboard.
|
||||
We will navigate again back to the Kubeflow UI homepage on `https://<KF_NAME>.endpoints.<project_id>.cloud.goog/` and click on the 'Pipelines' in the menu on the left side.
|
||||
|
||||
|
||||
Once the browser is open, upload the tar.gz file. This simply makes the graph available.
|
||||
Next we can create a run and specify the params for the run. Make sure to specify version to 4 to check if this run creates a new saved model.
|
||||
Once the page is open, click 'Upload pipeline' and select the tar.gz file.
|
||||
If you click on the pipeline you can inspect the Directed Acyclic Graph (DAG).
|
||||
|
||||

|
||||
|
||||
Next we can click on the pipeline and create a run. For each run you need to specify the params that you want to use.
|
||||
When the pipeline is running, you can inspect the logs:
|
||||
|
||||

|
||||

|
||||
|
||||
This run with the less advanced model does not surpass the accuracy threshold and there is no deploy step.
|
||||
Note that you can also see the accuracy metrics across the different runs from the Experiments page.
|
||||

|
||||
|
||||
Also check that the more advanced model surpassed the accuracy threshold and was deployed by TF-serving.
|
||||

|
||||
|
||||
|
||||
### Clean up
|
||||
To clean up, we will simply run the Kubeflow deletion bash script which takes care of deleting all components in a correct manner.
|
||||
|
||||
```
|
||||
cd ../<kubeflow_src>/<kf_app>
|
||||
${KUBEFLOW_REPO}/scripts/kfctl.sh delete all
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
||||
To clean up, follow the instructions ['Delete using CLI'](https://www.kubeflow.org/docs/gke/deploy/delete-cli/) so that all components are
|
||||
deleted in a correct manner.
|
||||
|
|
Binary file not shown.
After Width: | Height: | Size: 153 KiB |
Binary file not shown.
After Width: | Height: | Size: 222 KiB |
Binary file not shown.
Before Width: | Height: | Size: 366 KiB |
Binary file not shown.
After Width: | Height: | Size: 79 KiB |
Binary file not shown.
After Width: | Height: | Size: 97 KiB |
|
@ -12,9 +12,20 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
FROM tensorflow/tensorflow:1.8.0-devel-py3
|
||||
FROM tensorflow/tensorflow:1.15.0-py3
|
||||
|
||||
RUN pip3 install google-cloud-storage==1.10.0 \
|
||||
# install gcloud
|
||||
RUN apt-get update && apt-get install -y jq
|
||||
|
||||
RUN curl https://dl.google.com/dl/cloudsdk/release/google-cloud-sdk.tar.gz > /tmp/google-cloud-sdk.tar.gz
|
||||
RUN mkdir -p /usr/local/gcloud
|
||||
RUN tar -C /usr/local/gcloud -xvf /tmp/google-cloud-sdk.tar.gz
|
||||
RUN /usr/local/gcloud/google-cloud-sdk/install.sh
|
||||
|
||||
ENV PATH $PATH:/usr/local/gcloud/google-cloud-sdk/bin
|
||||
|
||||
# install python packages
|
||||
RUN pip3 install google-cloud-storage==1.17.0 \
|
||||
google-cloud-bigquery==1.6.0 \
|
||||
pandas==0.23.4
|
||||
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
apiVersion: kubeflow.org/v1
|
||||
kind: TFJob
|
||||
metadata:
|
||||
name: tfjob-flat
|
||||
namespace: kubeflow
|
||||
spec:
|
||||
tfReplicaSpecs:
|
||||
Worker:
|
||||
replicas: 1
|
||||
restartPolicy: OnFailure
|
||||
template:
|
||||
spec:
|
||||
containers:
|
||||
- name: tensorflow
|
||||
image: gcr.io/<project>/<image-name>/cpu:v1
|
||||
command:
|
||||
- python
|
||||
- run_preprocess_train_deploy.py
|
||||
- --model=FlatModel
|
||||
- --epochs=30001
|
||||
- --bucket=<BUCKET_NAME>
|
||||
- --tag=v1
|
||||
workingDir: "/opt/workdir"
|
||||
env:
|
||||
- name: GOOGLE_APPLICATION_CREDENTIALS
|
||||
value: "/secret/gcp-credentials/user-gcp-sa.json"
|
||||
volumeMounts:
|
||||
- name: sa
|
||||
mountPath: "/secret/gcp-credentials"
|
||||
readOnly: true
|
||||
volumes:
|
||||
- name: sa
|
||||
secret:
|
||||
secretName: user-gcp-sa
|
|
@ -0,0 +1,34 @@
|
|||
apiVersion: kubeflow.org/v1
|
||||
kind: TFJob
|
||||
metadata:
|
||||
name: tfjob-deep
|
||||
namespace: kubeflow
|
||||
spec:
|
||||
tfReplicaSpecs:
|
||||
Worker:
|
||||
replicas: 1
|
||||
restartPolicy: OnFailure
|
||||
template:
|
||||
spec:
|
||||
containers:
|
||||
- name: tensorflow
|
||||
image: gcr.io/<project>/<image-name>/cpu:v1
|
||||
command:
|
||||
- python
|
||||
- run_preprocess_train_deploy.py
|
||||
- --model=DeepModel
|
||||
- --epochs=30001
|
||||
- --bucket=<BUCKET_NAME>
|
||||
- --tag=v2
|
||||
workingDir: "/opt/workdir"
|
||||
env:
|
||||
- name: GOOGLE_APPLICATION_CREDENTIALS
|
||||
value: "/secret/gcp-credentials/user-gcp-sa.json"
|
||||
volumeMounts:
|
||||
- name: sa
|
||||
mountPath: "/secret/gcp-credentials"
|
||||
readOnly: true
|
||||
volumes:
|
||||
- name: sa
|
||||
secret:
|
||||
secretName: user-gcp-sa
|
|
@ -1,48 +0,0 @@
|
|||
local env = std.extVar("__ksonnet/environments");
|
||||
local params = std.extVar("__ksonnet/params").components.train;
|
||||
|
||||
local k = import "k.libsonnet";
|
||||
|
||||
local name = params.name;
|
||||
local namespace = env.namespace;
|
||||
local image = params.image;
|
||||
|
||||
local argsParam = params.args;
|
||||
local args =
|
||||
if argsParam == "null" then
|
||||
[]
|
||||
else
|
||||
std.split(argsParam, ",");
|
||||
|
||||
local tfjob = {
|
||||
apiVersion: "kubeflow.org/v1beta1",
|
||||
kind: "TFJob",
|
||||
metadata: {
|
||||
name: name,
|
||||
namespace: namespace,
|
||||
},
|
||||
spec: {
|
||||
tfReplicaSpecs: {
|
||||
Worker: {
|
||||
replicas: 1,
|
||||
template: {
|
||||
spec: {
|
||||
containers: [
|
||||
{
|
||||
args: args,
|
||||
image: image,
|
||||
name: "tensorflow",
|
||||
workingDir: "/opt/workdir",
|
||||
},
|
||||
],
|
||||
restartPolicy: "OnFailure",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
k.core.v1.list.new([
|
||||
tfjob,
|
||||
])
|
|
@ -12,9 +12,20 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
FROM tensorflow/tensorflow:1.8.0-devel-py3
|
||||
FROM tensorflow/tensorflow:1.15.0-py3
|
||||
|
||||
RUN pip3 install google-cloud-storage==1.10.0 \
|
||||
# install gcloud
|
||||
RUN apt-get update && apt-get install -y jq
|
||||
|
||||
RUN curl https://dl.google.com/dl/cloudsdk/release/google-cloud-sdk.tar.gz > /tmp/google-cloud-sdk.tar.gz
|
||||
RUN mkdir -p /usr/local/gcloud
|
||||
RUN tar -C /usr/local/gcloud -xvf /tmp/google-cloud-sdk.tar.gz
|
||||
RUN /usr/local/gcloud/google-cloud-sdk/install.sh
|
||||
|
||||
ENV PATH $PATH:/usr/local/gcloud/google-cloud-sdk/bin
|
||||
|
||||
# install python packages
|
||||
RUN pip3 install google-cloud-storage==1.17.0 \
|
||||
google-cloud-bigquery==1.6.0 \
|
||||
pandas==0.23.4
|
||||
|
||||
|
|
|
@ -12,9 +12,19 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
FROM tensorflow/tensorflow:1.8.0-devel-gpu-py3
|
||||
FROM tensorflow/tensorflow:1.15.0-gpu-py3
|
||||
|
||||
RUN pip3 install google-cloud-storage==1.10.0 \
|
||||
# install gcloud
|
||||
RUN apt-get update && apt-get install -y jq
|
||||
|
||||
RUN curl https://dl.google.com/dl/cloudsdk/release/google-cloud-sdk.tar.gz > /tmp/google-cloud-sdk.tar.gz
|
||||
RUN mkdir -p /usr/local/gcloud
|
||||
RUN tar -C /usr/local/gcloud -xvf /tmp/google-cloud-sdk.tar.gz
|
||||
RUN /usr/local/gcloud/google-cloud-sdk/install.sh
|
||||
|
||||
ENV PATH $PATH:/usr/local/gcloud/google-cloud-sdk/bin
|
||||
|
||||
RUN pip3 install google-cloud-storage==1.17.0 \
|
||||
google-cloud-bigquery==1.6.0 \
|
||||
pandas==0.23.4
|
||||
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
apiVersion: kubeflow.org/v1
|
||||
kind: TFJob
|
||||
metadata:
|
||||
name: tfjob-deep-gpu
|
||||
namespace: kubeflow
|
||||
spec:
|
||||
tfReplicaSpecs:
|
||||
Worker:
|
||||
replicas: 1
|
||||
restartPolicy: OnFailure
|
||||
template:
|
||||
spec:
|
||||
containers:
|
||||
- name: tensorflow
|
||||
image: gcr.io/<project>/<image-name>/gpu:v1
|
||||
command:
|
||||
- python
|
||||
- run_preprocess_train_deploy.py
|
||||
- --model=DeepModel
|
||||
- --epochs=30001
|
||||
- --bucket=<BUCKET_NAME>
|
||||
- --tag=v3
|
||||
workingDir: "/opt/workdir"
|
||||
env:
|
||||
- name: GOOGLE_APPLICATION_CREDENTIALS
|
||||
value: "/secret/gcp-credentials/user-gcp-sa.json"
|
||||
resources:
|
||||
limits:
|
||||
nvidia.com/gpu: 1
|
||||
volumeMounts:
|
||||
- name: sa
|
||||
mountPath: "/secret/gcp-credentials"
|
||||
readOnly: true
|
||||
volumes:
|
||||
- name: sa
|
||||
secret:
|
||||
secretName: user-gcp-sa
|
|
@ -1,53 +0,0 @@
|
|||
local env = std.extVar("__ksonnet/environments");
|
||||
local params = std.extVar("__ksonnet/params").components.train;
|
||||
|
||||
local k = import "k.libsonnet";
|
||||
|
||||
local name = params.name;
|
||||
local namespace = env.namespace;
|
||||
local image = params.image;
|
||||
|
||||
local argsParam = params.args;
|
||||
local args =
|
||||
if argsParam == "null" then
|
||||
[]
|
||||
else
|
||||
std.split(argsParam, ",");
|
||||
|
||||
local tfjob = {
|
||||
apiVersion: "kubeflow.org/v1beta1",
|
||||
kind: "TFJob",
|
||||
metadata: {
|
||||
name: name,
|
||||
namespace: namespace,
|
||||
},
|
||||
spec: {
|
||||
tfReplicaSpecs: {
|
||||
Worker: {
|
||||
replicas: 1,
|
||||
template: {
|
||||
spec: {
|
||||
containers: [
|
||||
{
|
||||
args: args,
|
||||
image: if params.num_gpu > 0 then params.gpuImage else params.cpuImage,
|
||||
name: "tensorflow",
|
||||
[if params.num_gpu > 0 then "resources"]: {
|
||||
limits: {
|
||||
"nvidia.com/gpu": params.num_gpu,
|
||||
},
|
||||
},
|
||||
workingDir: "/opt/workdir",
|
||||
},
|
||||
],
|
||||
restartPolicy: "OnFailure",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
k.core.v1.list.new([
|
||||
tfjob,
|
||||
])
|
|
@ -94,6 +94,9 @@ def tf_calc_confusion_metrics(true_pos, true_neg, false_pos, false_neg):
|
|||
print('F1 Score = ', f1_score)
|
||||
print('Accuracy = ', accuracy)
|
||||
|
||||
return {'precision': precision, 'recall': recall, 'f1': f1_score,
|
||||
'accuracy': accuracy}
|
||||
|
||||
|
||||
def tf_confusion_matrix(model, actual_classes, session, feed_dict):
|
||||
"""Calculates confusion matrix when training.
|
||||
|
@ -116,4 +119,4 @@ def tf_confusion_matrix(model, actual_classes, session, feed_dict):
|
|||
feed_dict
|
||||
)
|
||||
|
||||
tf_calc_confusion_metrics(true_pos, true_neg, false_pos, false_neg)
|
||||
return tf_calc_confusion_metrics(true_pos, true_neg, false_pos, false_neg)
|
||||
|
|
|
@ -37,3 +37,10 @@ def download_blob(bucket_name, source_blob_name, destination_file_name):
|
|||
print('Blob {} downloaded to {}.'.format(
|
||||
source_blob_name,
|
||||
destination_file_name))
|
||||
|
||||
|
||||
def list_blobs(bucket_name, prefix, delimiter=None):
|
||||
"""Lists all the blobs in the bucket."""
|
||||
storage_client = storage.Client()
|
||||
return storage_client.list_blobs(bucket_name, prefix=prefix,
|
||||
delimiter=delimiter)
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
import kfp.dsl as dsl
|
||||
import kfp.gcp as gcp
|
||||
|
||||
|
||||
class Preprocess(dsl.ContainerOp):
|
||||
|
@ -32,20 +33,39 @@ class Preprocess(dsl.ContainerOp):
|
|||
file_outputs={'blob-path': '/blob_path.txt'}
|
||||
)
|
||||
|
||||
|
||||
class Train(dsl.ContainerOp):
|
||||
|
||||
def __init__(self, name, blob_path, version, bucket, model):
|
||||
def __init__(self, name, blob_path, tag, bucket, model):
|
||||
super(Train, self).__init__(
|
||||
name=name,
|
||||
# image needs to be a compile-time string
|
||||
image='gcr.io/<project>/<image-name>/cpu:v1',
|
||||
command=['python3', 'run_train.py'],
|
||||
arguments=[
|
||||
'--version', version,
|
||||
'--tag', tag,
|
||||
'--blob_path', blob_path,
|
||||
'--bucket', bucket,
|
||||
'--model', model
|
||||
]
|
||||
'--model', model,
|
||||
'--kfp'
|
||||
],
|
||||
file_outputs={'mlpipeline_metrics': '/mlpipeline-metrics.json',
|
||||
'accuracy': '/tmp/accuracy'}
|
||||
)
|
||||
|
||||
|
||||
class Deploy(dsl.ContainerOp):
|
||||
|
||||
def __init__(self, name, tag, bucket):
|
||||
super(Deploy, self).__init__(
|
||||
name=name,
|
||||
# image needs to be a compile-time string
|
||||
image='gcr.io/<project>/<image-name>/cpu:v1',
|
||||
command=['python3', 'run_deploy.py'],
|
||||
arguments=[
|
||||
'--tag', tag,
|
||||
'--bucket', bucket,
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
|
@ -53,18 +73,22 @@ class Train(dsl.ContainerOp):
|
|||
name='financial time series',
|
||||
description='Train Financial Time Series'
|
||||
)
|
||||
def train_and_deploy(
|
||||
bucket=dsl.PipelineParam('bucket', value='<bucket>'),
|
||||
cutoff_year=dsl.PipelineParam('cutoff-year', value='2010'),
|
||||
version=dsl.PipelineParam('version', value='4'),
|
||||
model=dsl.PipelineParam('model', value='DeepModel')
|
||||
def preprocess_train_deploy(
|
||||
bucket: str = '<bucket>',
|
||||
cutoff_year: str = '2010',
|
||||
tag: str = '4',
|
||||
model: str = 'DeepModel'
|
||||
):
|
||||
"""Pipeline to train financial time series model"""
|
||||
preprocess_op = Preprocess('preprocess', bucket, cutoff_year)
|
||||
preprocess_op = Preprocess('preprocess', bucket, cutoff_year).apply(
|
||||
gcp.use_gcp_secret('user-gcp-sa'))
|
||||
#pylint: disable=unused-variable
|
||||
train_op = Train('train and deploy', preprocess_op.output, version, bucket, model)
|
||||
train_op = Train('train', preprocess_op.output, tag,
|
||||
bucket, model).apply(gcp.use_gcp_secret('user-gcp-sa'))
|
||||
with dsl.Condition(train_op.outputs['accuracy'] > 0.7):
|
||||
deploy_op = Deploy('deploy', tag, bucket).apply(gcp.use_gcp_secret('user-gcp-sa'))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import kfp.compiler as compiler
|
||||
compiler.Compiler().compile(train_and_deploy, __file__ + '.tar.gz')
|
||||
compiler.Compiler().compile(preprocess_train_deploy, __file__ + '.tar.gz')
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
google-cloud-storage==1.10.0
|
||||
google-cloud-storage==1.17.0
|
||||
google-cloud-bigquery==1.6.0
|
||||
pandas==0.23.4
|
||||
numpy==1.15.2
|
||||
tensorflow==1.8.0
|
||||
numpy==1.16.0
|
||||
tensorflow==1.15.0
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
"""Module for deploying a machine learning model to TF serving.
|
||||
|
||||
Scripts that performs the steps to deploy a model with TF serving
|
||||
"""
|
||||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
import subprocess
|
||||
|
||||
from helpers import storage as storage_helper
|
||||
|
||||
|
||||
def parse_arguments(argv):
|
||||
"""Parse command line arguments
|
||||
Args:
|
||||
argv (list): list of command line arguments including program name
|
||||
Returns:
|
||||
The parsed arguments as returned by argparse.ArgumentParser
|
||||
"""
|
||||
parser = argparse.ArgumentParser(description='Preprocessing')
|
||||
|
||||
parser.add_argument('--bucket',
|
||||
type=str,
|
||||
help='GCS bucket where preprocessed data is saved',
|
||||
default='<your-bucket-name>')
|
||||
|
||||
parser.add_argument('--tag',
|
||||
type=str,
|
||||
help='tag of the model',
|
||||
required=True)
|
||||
|
||||
args, _ = parser.parse_known_args(args=argv[1:])
|
||||
|
||||
return args
|
||||
|
||||
|
||||
def run_deploy(argv=None):
|
||||
"""Runs the retrieval and preprocessing of the data.
|
||||
|
||||
Args:
|
||||
args: args that are passed when submitting the training
|
||||
|
||||
"""
|
||||
args = parse_arguments(sys.argv if argv is None else argv)
|
||||
logging.info('start deploying model %s ..', args.tag)
|
||||
|
||||
# get latest active version for TF serving directory
|
||||
serving_dir = 'tfserving'
|
||||
blobs = storage_helper.list_blobs(args.bucket, prefix=serving_dir)
|
||||
version = set()
|
||||
for blob in blobs:
|
||||
version.add(int(blob.name.split('/')[1]))
|
||||
if version:
|
||||
new_version = max(version)+1
|
||||
else:
|
||||
new_version = 1
|
||||
|
||||
# copy the files
|
||||
logging.info('deploying model %s as model number %s on TF serving', args.tag, new_version)
|
||||
subprocess.call(['gcloud', 'auth', 'activate-service-account',
|
||||
'--key-file', '/secret/gcp-credentials/user-gcp-sa.json'])
|
||||
src_folder = 'gs://{}/models/{}'.format(args.bucket, args.tag)
|
||||
target_folder = 'gs://{}/tfserving/{}'.format(args.bucket, new_version)
|
||||
subprocess.call(['gsutil', 'cp', '-r', src_folder, target_folder])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
run_deploy()
|
|
@ -9,6 +9,7 @@ import sys
|
|||
|
||||
from run_preprocess import run_preprocess
|
||||
from run_train import run_training
|
||||
from run_deploy import run_deploy
|
||||
|
||||
|
||||
|
||||
|
@ -43,10 +44,10 @@ def parse_arguments(argv):
|
|||
help='number of epochs to train',
|
||||
default=30001)
|
||||
|
||||
parser.add_argument('--version',
|
||||
parser.add_argument('--tag',
|
||||
type=str,
|
||||
help='version (stored for serving)',
|
||||
default='1')
|
||||
help='tag of the model',
|
||||
default='v1')
|
||||
|
||||
args, _ = parser.parse_known_args(args=argv[1:])
|
||||
|
||||
|
@ -65,6 +66,7 @@ def run_preprocess_and_train(argv=None):
|
|||
run_preprocess(sys.argv)
|
||||
sys.argv.append('--blob_path=data/data_{}.csv'.format(args.cutoff_year))
|
||||
run_training(sys.argv)
|
||||
run_deploy(sys.argv)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
|
@ -3,6 +3,7 @@
|
|||
Scripts that performs all the steps to train the ML model.
|
||||
"""
|
||||
import logging
|
||||
import json
|
||||
import os
|
||||
import argparse
|
||||
import time
|
||||
|
@ -10,6 +11,7 @@ import shutil
|
|||
import sys
|
||||
import pandas as pd
|
||||
import tensorflow as tf
|
||||
from tensorflow.python.lib.io import file_io
|
||||
|
||||
#pylint: disable=no-name-in-module
|
||||
from helpers import preprocess, models, metrics
|
||||
|
@ -36,10 +38,10 @@ def parse_arguments(argv):
|
|||
help='number of epochs to train',
|
||||
default=30001)
|
||||
|
||||
parser.add_argument('--version',
|
||||
parser.add_argument('--tag',
|
||||
type=str,
|
||||
help='version (stored for serving)',
|
||||
default='1')
|
||||
help='tag of the model',
|
||||
default='v1')
|
||||
|
||||
parser.add_argument('--bucket',
|
||||
type=str,
|
||||
|
@ -51,6 +53,11 @@ def parse_arguments(argv):
|
|||
help='GCS blob path where data is saved',
|
||||
default='data')
|
||||
|
||||
parser.add_argument('--kfp',
|
||||
dest='kfp',
|
||||
action='store_true',
|
||||
help='Kubeflow pipelines flag')
|
||||
|
||||
args, _ = parser.parse_known_args(args=argv[1:])
|
||||
|
||||
return args
|
||||
|
@ -110,14 +117,15 @@ def run_training(argv=None):
|
|||
}
|
||||
)
|
||||
if i % 5000 == 0:
|
||||
print(i, sess.run(
|
||||
train_acc = sess.run(
|
||||
accuracy,
|
||||
feed_dict={
|
||||
feature_data: training_test_data['training_predictors_tf'].values,
|
||||
actual_classes: training_test_data['training_classes_tf'].values.reshape(
|
||||
len(training_test_data['training_classes_tf'].values), 2)
|
||||
}
|
||||
))
|
||||
)
|
||||
print(i, train_acc)
|
||||
time_dct['end'] = time.time()
|
||||
logging.info('training took {0:.2f} sec'.format(time_dct['end'] - time_dct['start']))
|
||||
|
||||
|
@ -128,24 +136,42 @@ def run_training(argv=None):
|
|||
actual_classes: training_test_data['test_classes_tf'].values.reshape(
|
||||
len(training_test_data['test_classes_tf'].values), 2)
|
||||
}
|
||||
metrics.tf_confusion_matrix(model, actual_classes, sess, feed_dict)
|
||||
test_acc = metrics.tf_confusion_matrix(model, actual_classes, sess,
|
||||
feed_dict)['accuracy']
|
||||
|
||||
# create signature for TensorFlow Serving
|
||||
logging.info('Exporting model for tensorflow-serving...')
|
||||
|
||||
export_path = os.path.join("model", args.version)
|
||||
export_path = os.path.join("models", args.tag)
|
||||
tf.saved_model.simple_save(
|
||||
sess,
|
||||
export_path,
|
||||
inputs={'predictors': feature_data},
|
||||
outputs={'prediction': tf.argmax(model, 1),
|
||||
'model-version': tf.constant([str(args.version)])}
|
||||
'model-tag': tf.constant([str(args.tag)])}
|
||||
)
|
||||
|
||||
# save model on GCS
|
||||
logging.info("uploading to " + args.bucket + "/" + export_path)
|
||||
storage_helper.upload_to_storage(args.bucket, export_path)
|
||||
|
||||
if args.kfp:
|
||||
metrics_info = {
|
||||
'metrics': [{
|
||||
'name': 'accuracy-train',
|
||||
'numberValue': float(train_acc),
|
||||
'format': "PERCENTAGE"
|
||||
}, {
|
||||
'name': 'accuracy-test',
|
||||
'numberValue': float(test_acc),
|
||||
'format': "PERCENTAGE"
|
||||
}]}
|
||||
with file_io.FileIO('/mlpipeline-metrics.json', 'w') as f:
|
||||
json.dump(metrics_info, f)
|
||||
|
||||
with open("/tmp/accuracy", "w") as output_file:
|
||||
output_file.write(str(float(test_acc)))
|
||||
|
||||
# remove local files
|
||||
shutil.rmtree(export_path)
|
||||
shutil.rmtree(temp_folder)
|
||||
|
|
|
@ -26,11 +26,7 @@ def send_pratical_request(date="2014-08-12"):
|
|||
training_test_data[training_test_data.columns[2:]].values[index],
|
||||
axis=0).astype(np.float32)
|
||||
|
||||
# send request
|
||||
value, version = request_helper.send_request(input_tensor)
|
||||
# print response
|
||||
print("Prediction : " + str(value))
|
||||
print("Version of model : " + str(version))
|
||||
request_helper.send_request(input_tensor)
|
||||
|
||||
|
||||
send_pratical_request()
|
||||
|
|
|
@ -1,12 +1,9 @@
|
|||
""" Module that builds the request and processes the response from the tf-server.
|
||||
|
||||
Uses GRPC protocol to send a request to the tf-server and processes it.
|
||||
Uses HTTP protocol to send a request to the tf-server and processes it.
|
||||
"""
|
||||
|
||||
from grpc.beta import implementations
|
||||
import tensorflow as tf
|
||||
from tensorflow_serving.apis import predict_pb2
|
||||
from tensorflow_serving.apis import prediction_service_pb2_grpc
|
||||
import requests
|
||||
|
||||
|
||||
def send_request(input_tensor):
|
||||
|
@ -20,28 +17,13 @@ def send_request(input_tensor):
|
|||
str: version of the ML model
|
||||
|
||||
"""
|
||||
host = '127.0.0.1'
|
||||
port = 8500
|
||||
model_name = "finance-model"
|
||||
path = 'http://{}:{}/v1/models/{}'.format(host, port, model_name)
|
||||
payoad = {'instances': input_tensor.tolist()}
|
||||
|
||||
# server settings
|
||||
server_host = '127.0.0.1'
|
||||
server_port = 9000
|
||||
server_name = "tf-serving"
|
||||
timeout = 10.0
|
||||
result = requests.post(url=path + ':predict', json=payoad).json()[
|
||||
'predictions'][0]
|
||||
|
||||
print("connecting to:%s:%i" % (server_host, server_port))
|
||||
|
||||
# initialize to server connection
|
||||
channel = implementations.insecure_channel(server_host, server_port)
|
||||
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel._channel) # pylint: disable=protected-access
|
||||
|
||||
# build request
|
||||
request = predict_pb2.PredictRequest()
|
||||
request.model_spec.name = server_name # pylint: disable=no-member
|
||||
request.model_spec.signature_name = 'serving_default' # pylint: disable=no-member
|
||||
request.inputs['predictors'].CopyFrom( # pylint: disable=no-member
|
||||
tf.contrib.util.make_tensor_proto(input_tensor, shape=input_tensor.shape))
|
||||
|
||||
# retrieve results
|
||||
result = stub.Predict(request, timeout)
|
||||
resultval = result.outputs['prediction'].int64_val
|
||||
version = result.outputs['model-version'].string_val
|
||||
return resultval[0], version[0]
|
||||
print(result)
|
||||
|
|
|
@ -13,10 +13,6 @@ def send_random_request():
|
|||
# create random input
|
||||
input_tensor = np.random.rand(1, 24).astype(np.float32)
|
||||
# send request
|
||||
value, version = request_helper.send_request(input_tensor)
|
||||
# print response
|
||||
print("Prediction : " + str(value))
|
||||
print("Version of model : " + str(version))
|
||||
|
||||
request_helper.send_request(input_tensor)
|
||||
|
||||
send_random_request()
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
labels:
|
||||
app: model
|
||||
name: tfserving-service
|
||||
namespace: kubeflow
|
||||
spec:
|
||||
ports:
|
||||
- name: grpc-tf-serving
|
||||
port: 9000
|
||||
targetPort: 9000
|
||||
- name: http-tf-serving
|
||||
port: 8500
|
||||
targetPort: 8500
|
||||
selector:
|
||||
app: model
|
||||
type: ClusterIP
|
||||
---
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
labels:
|
||||
app: model
|
||||
name: tfserving
|
||||
namespace: kubeflow
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: model
|
||||
template:
|
||||
metadata:
|
||||
annotations:
|
||||
sidecar.istio.io/inject: "true"
|
||||
labels:
|
||||
app: model
|
||||
version: v1
|
||||
spec:
|
||||
containers:
|
||||
- args:
|
||||
- --port=9000
|
||||
- --rest_api_port=8500
|
||||
- --model_name=finance-model
|
||||
- --model_base_path=gs://<BUCKET_NAME>/tfserving/
|
||||
command:
|
||||
- /usr/bin/tensorflow_model_server
|
||||
image: tensorflow/serving:1.11.1
|
||||
imagePullPolicy: IfNotPresent
|
||||
livenessProbe:
|
||||
initialDelaySeconds: 30
|
||||
periodSeconds: 30
|
||||
tcpSocket:
|
||||
port: 9000
|
||||
name: model
|
||||
ports:
|
||||
- containerPort: 9000
|
||||
- containerPort: 8500
|
||||
resources:
|
||||
limits:
|
||||
cpu: "4"
|
||||
memory: 4Gi
|
||||
requests:
|
||||
cpu: "1"
|
||||
memory: 1Gi
|
||||
env:
|
||||
- name: GOOGLE_APPLICATION_CREDENTIALS
|
||||
value: "/etc/secrets/user-gcp-sa.json"
|
||||
volumeMounts:
|
||||
- name: sa
|
||||
mountPath: "/etc/secrets"
|
||||
readOnly: true
|
||||
volumes:
|
||||
- name: sa
|
||||
secret:
|
||||
secretName: user-gcp-sa
|
||||
|
Loading…
Reference in New Issue