mirror of https://github.com/kubeflow/examples.git
Update Financial Time Series example to v0.4.0 and add Kubeflow pipelines (#535)
This commit is contained in: parent 7a6dc7b911, commit 9332e7226a
@@ -77,6 +77,7 @@ This example covers the following concepts:
4. Deploy and serve with TF-serving
5. Iterate training and serving
6. Training on GPU
7. Using Kubeflow Pipelines to automate the ML workflow

## Component-focused

@@ -20,80 +20,47 @@ Independent of the machine that you are using, you will need access to a Google

### Deploying Kubeflow on GKE
The full deployment script for Kubeflow on GKE will create a cluster for you with machines that already have all the appropriate permissions.
Please follow the instructions on how to deploy Kubeflow to GKE on the [getting-started-GKE](https://v0-2.kubeflow.org/docs/started/getting-started-gke/) page, working from the `examples/financial_time_series` directory.
Make sure to replace the placeholder parameters between ```<>``` with proper values in the commands.

```
git clone https://github.com/kubeflow/examples.git
cd examples/financial_time_series/
<follow instructions for deploying GKE>
```

After the step `${KUBEFLOW_SRC}/scripts/kfctl.sh generate platform`, make sure you add 'https://www.googleapis.com/auth/cloud-platform' to the `VM_OAUTH_SCOPES` in the file `{KFAPP}/gcp_config/cluster.jinja`. This allows the machines to use the BigQuery API, which we need for our use case as the data is stored in BigQuery, and to store data on Google Cloud Storage.
We will also set `enableNodeAutoprovisioning` to false in this file, as we will work with our own dedicated gpu-pool.
[Node auto-provisioning](https://cloud.google.com/kubernetes-engine/docs/how-to/node-auto-provisioning) can be useful to autoscale the cluster with non-user-defined node pools.

Next to this, we also need to update `{KFAPP}/gcp_config/iam_bindings.yaml` by adding the roles 'roles/bigquery.admin' and 'roles/storage.admin' for the VM service account, so that it is authorized to create BigQuery jobs and write files to Google Cloud Storage.
Last but not least, we also need to update `cluster-kubeflow.yaml` to enable GPUs on our cluster: set `gpu-pool-max-nodes` to 1 instead of 0.

Once the script is finished, you should see a new folder in your directory with the following subfolders.
```
$ tree
.
├── tensorflow_model
└── <kubeflow_src>
    ├── deployment
    ├── <kf_app>
    ├── kubeflow
    └── scripts
```
Next, we can easily verify the status of the pods by running ```kubectl get pods```.

Note that the corresponding services were also generated, as you can verify in the output of ```kubectl get svc```.

### Explore the Kubeflow UI
After some time (about 10-15 minutes), an endpoint should now be available at `https://<kf_app>.endpoints.<project_id>.cloud.goog/`.
From this page you can navigate between the different Kubeflow components.

### Exploration via tf-hub
The tf-hub component of Kubeflow allows us to leverage [JupyterHub](https://github.com/jupyterhub/jupyterhub) to investigate the data and start building a feasible machine learning model for the specific problem.
In order to access this component, we will set up port-forwarding between the tf-hub pod and our machine.
```
POD=`kubectl get pods --selector=app=tf-hub | awk '{print $1}' | tail -1`
kubectl port-forward $POD 8000:8000 2>&1 >/dev/null &
```
You should now be able to access tf-hub via ```localhost:8000```.
From the Kubeflow starting page, you can click on the `JupyterHub` tab.
After filling in a dummy username and password, you are prompted to select parameters to spawn a JupyterHub instance.
In this case, we will just leave the default settings and hit spawn.

The following steps for running the Jupyter notebook work better on a local machine kernel, as the Google Cloud Shell is not meant to stand up a web socket service and is not configured for that.
Note that this is not a compulsory step for being able to follow the next sections, so if you are working on a Google Cloud Shell you can simply investigate the notebook via the link below.

Once the JupyterHub instance is ready, we will launch a terminal on the instance to install the required packages that our code uses.
In order to launch a terminal, click 'new' > 'terminal' and subsequently install the required packages.
```
pip3 install google-cloud-bigquery==1.6.0 --user
```

Once the package is installed, navigate back to the JupyterHub home screen. Our JupyterHub instance should be ready to run the code from the slightly adjusted notebook ```Machine Learning with Financial Time Series Data.ipynb```, which is available [here](https://github.com/kubeflow/examples/blob/master/financial_time_series/Financial%20Time%20Series%20with%20Finance%20Data.ipynb).
You can simply upload the notebook and walk through it step by step to better understand the problem and the suggested solution(s).
In this example, the goal is not to focus on the notebook itself but rather on how this notebook is translated into more scalable training jobs and, later on, serving.

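As a quick, optional sanity check that the notebook environment can actually reach BigQuery, you can run a small query from a notebook cell. The sketch below is not part of the example; it simply reuses the `bingo-ml-1.market_data` tables that the example's preprocessing code queries.

```
# Optional sanity check from a notebook cell: confirm BigQuery access works.
# Uses the same table that the example's preprocessing code queries.
from google.cloud import bigquery

client = bigquery.Client()
query = 'SELECT Date, Close FROM `bingo-ml-1.market_data.snp` ORDER BY Date LIMIT 5'
for row in client.query(query).result():
  print(row.Date, row.Close)
```
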
### Training at scale with TF-jobs

@@ -110,12 +77,12 @@ gcloud builds submit --tag $TRAIN_PATH .
Now that we have an image ready on Google Cloud Container Registry, it's time to launch a training job.

```
cd ../<kubeflow_src>/<kf_app>/ks_app
ks generate tf-job-simple train
```
This ksonnet prototype needs to be slightly modified for our needs; you can simply copy an updated version of this prototype from the repository.
```
cp ../../../tensorflow_model/CPU/train.jsonnet ./components/train.jsonnet
```

Now we need to define the parameters which are currently set as placeholders in the training job prototype.

@@ -123,53 +90,49 @@ Note that this introduces a flexible and clean way of working, by changing the p

```
# create storage bucket that will be used to store models
BUCKET_NAME=<your-bucket-name>
gsutil mb gs://$BUCKET_NAME/
# set parameters
export TRAINING_NAME=trainingjob1
ks param set train name $TRAINING_NAME
ks param set train namespace "default"
export TRAIN_PATH=gcr.io/<project-name>/<image-name>/cpu:v1
ks param set train image $TRAIN_PATH
ks param set train workingDir "opt/workdir"
ks param set train args -- python,run_preprocess_and_train.py,--model=FlatModel,--epochs=30001,--bucket=$BUCKET_NAME,--version=1
```

You can verify the parameter settings in the `params.libsonnet` file in the `components` directory of the ksonnet app.
This file keeps track of all the parameters used to instantiate components from prototypes.
Next we can launch the tf-job to our Kubeflow cluster and follow the progress via the logs of the pod.

```
ks apply default -c train
POD_NAME=$(kubectl get pods --selector=tf_job_name=$TRAINING_NAME,tf-replica-type=worker \
      --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}')
kubectl logs -f $POD_NAME
```

In the logs you can see that the trained model is being exported to Google Cloud Storage. This saved model will be used later on for serving requests. With these parameters, the accuracy on the test set is approximately 60%.
Alternatively, you can also port-forward the ambassador and check the progress on ```localhost:8080```.
The ambassador functions as the central point of the Kubeflow deployment and monitors the different components. From the ambassador, you can see the Jupyter notebooks, tf-jobs and Kubernetes resources.

```
POD=`kubectl get pods --selector=service=ambassador | awk '{print $1}' | tail -1`
kubectl port-forward $POD 8080:80 2>&1 >/dev/null &
```

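Before moving on to serving, you can optionally confirm that the exported SavedModel files landed in your bucket. This is a hypothetical check, not part of the example; the `model/` prefix matches the export path used by the updated training script.

```
# Optional check: list the exported SavedModel files in the bucket.
from google.cloud import storage

client = storage.Client()
bucket = client.get_bucket('<your-bucket-name>')   # same value as $BUCKET_NAME
for blob in bucket.list_blobs(prefix='model/'):
  print(blob.name)   # e.g. model/1/saved_model.pb plus the variables/ files
```
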
### Deploy and serve with TF-serving
Once the model is trained, the next step is to deploy it and serve requests.
Kubeflow comes with a tf-serving module which you can use to deploy your model with only a few commands.
```
ks generate tf-serving serve --name=tf-serving
ks param set serve modelPath gs://$BUCKET_NAME/model/
ks apply default -c serve
```

After running these commands, a deployment and service will be launched on Kubernetes that enable you to easily send requests to get predictions from your model.
Let's check if the model is loaded successfully.

```
POD=`kubectl get pods --selector=app=tf-serving | awk '{print $1}' | tail -1`
kubectl logs -f $POD
```

We will do a local test via gRPC to illustrate how to get results from this serving component. Once the pod is up, we can set up port-forwarding to our localhost.
```
kubectl port-forward $POD 9000:9000 2>&1 >/dev/null &
```

@@ -177,9 +140,9 @@ Now the only thing we need to do is send a request to ```localhost:9000``` with
The saved model expects a time series of closing stocks and spits out the prediction as a 0 (S&P closes positive) or 1 (S&P closes negative), together with the version of the saved model, which was memorized upon saving the model.
Let's start with a script that populates a request with random numbers to test the service.
```
cd ../../../tensorflow_model
pip3 install numpy tensorflow-serving-api
python3 -m serving_requests.request_random
```

The output should return an integer, 0 or 1 as explained above, and a string that represents the version.
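If you want to see what such a request looks like in code, the sketch below sends a single prediction over gRPC. It is an illustrative approximation of what `serving_requests/request_random.py` does; the model name, signature name and input tensor key are assumptions that may need to be adapted to the actual script.

```
# Hedged sketch of a gRPC request to the tf-serving component on localhost:9000.
# Model/signature/input names are assumptions; see serving_requests/request_random.py
# for the exact values used by the example. TensorFlow is needed for make_tensor_proto.
import grpc
import numpy as np
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc

channel = grpc.insecure_channel('localhost:9000')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

request = predict_pb2.PredictRequest()
request.model_spec.name = 'tf-serving'            # assumed: the --name given to ks generate
request.model_spec.signature_name = 'serving_default'
data = np.random.rand(1, 24).astype(np.float32)   # the model uses 24 predictors
request.inputs['inputs'].CopyFrom(tf.make_tensor_proto(data, shape=data.shape))

response = stub.Predict(request, 10.0)            # 10 second timeout
print(response)
```
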
@@ -188,10 +151,10 @@ In the following script, the same date is used as the one used at the end of the
```
pip3 install pandas
python3 -m serving_requests.request
```

The response should indicate that the S&P index is expected to close positive (0), but from the actual data (which is inspected in the notebook mentioned above) we can see that it actually closed negative that day.
Let's get back to training and see if we can improve our accuracy.

### Running another tf-job and serving update

@@ -200,16 +163,17 @@ Submitting another training job with Kubeflow is very easy.
By simply adjusting the parameters, we can instantiate another component from the ```train.jsonnet``` prototype.
This time, we will train a more complex neural network with several hidden layers.
```
cd ../<kubeflow_src>/<kf_app>/ks_app
export TRAINING_NAME=trainingjob2
ks param set train name $TRAINING_NAME
ks param set train args -- python,run_preprocess_and_train.py,--model=DeepModel,--epochs=30001,--bucket=$BUCKET_NAME,--version=2
ks apply default -c train
```

Verify the logs or use ```kubectl describe tfjobs trainingjob2```:

```
POD_NAME=$(kubectl get pods --selector=tf_job_name=$TRAINING_NAME,tf-replica-type=worker \
      --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}')
kubectl logs -f $POD_NAME
```

@@ -221,8 +185,8 @@ Since the newer version has a higher number than the previous one, our tf-servin
Let's see if we get a response from the new version and if the new model gets it right this time.

```
cd ../../../tensorflow_model
python3 -m serving_requests.request
```

The response returns the updated version number '2' and predicts the correct output 1, which means the S&P index closes negative, hurray!

@@ -231,10 +195,12 @@ The response returns the updated version number '2' and predicts the correct ou

Can we also run the tf-job on a GPU?
Imagine the training job does not just take a few minutes but rather hours or days.
In this case we can reduce the training time by using a GPU. The GKE deployment script for Kubeflow automatically adds a GPU-pool that can scale as needed, so you don’t need to pay for a GPU when you don’t need it.
Note that the Kubeflow deployment also installs the necessary NVIDIA drivers for you, so there is no need for you to worry about extra GPU device plugins.

We will need another image that installs ```tensorflow-gpu``` and has the necessary drivers.

```
cd tensorflow_model
cp GPU/Dockerfile ./Dockerfile
export TRAIN_PATH_GPU=gcr.io/<project-name>/<image-name>/gpu:v1
gcloud builds submit --tag $TRAIN_PATH_GPU .
```

@@ -244,59 +210,75 @@ Also the train.jsonnet will need to be slightly adjusted to make it flexible to
You can simply copy the adjusted jsonnet by running the following command.

```
cp GPU/train.jsonnet ../<kubeflow_src>/<kf_app>/ks_app/components/train.jsonnet
```

Subsequently, the parameters must be updated to fit the new prototype in ```train.jsonnet```.
```
cd ../<kubeflow_src>/<kf_app>/ks_app
export TRAINING_NAME=trainingjobgpu
ks param set train name $TRAINING_NAME
ks param delete train image
ks param set train cpuImage $TRAIN_PATH
ks param set train gpuImage $TRAIN_PATH_GPU
ks param set train num_gpu 1
ks param set train args -- python,run_preprocess_and_train.py,--model=DeepModel,--epochs=30001,--bucket=$BUCKET_NAME,--version=3
```

Next we can deploy the tf-job to our GPU by simply running the following command.

```
ks apply default -c train
```

First the pod will be unschedulable as there are no gpu-pool nodes available. This demand will be recognized by the Kubernetes cluster and a node will be created on the gpu-pool automatically.
Once the pod is up, you can check the logs and verify that the training time is significantly reduced compared to the previous tf-job.
```
POD_NAME=$(kubectl get pods --selector=tf_job_name=$TRAINING_NAME,tf-replica-type=worker \
      --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}')
kubectl logs -f $POD_NAME
```

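If you want to confirm from inside the pod that TensorFlow actually picked up the accelerator, a small hypothetical check like the one below can be run in the GPU training container (it is not part of the example code).

```
# Hypothetical GPU visibility check for the tensorflow-gpu image (TF 1.x API).
import tensorflow as tf

print(tf.test.gpu_device_name())    # e.g. '/device:GPU:0' on a gpu-pool node, '' otherwise
print(tf.test.is_gpu_available())   # True once the NVIDIA drivers are available on the node
```
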
### Kubeflow Pipelines
Up to now, we bundled the preprocessing and training in a single script to illustrate TFJobs.
In practice, the preprocessing and training steps will most often be separated, and they will need to run sequentially each time.
In this way, we decouple the preprocessing from the training and can iterate faster over different ML workflows.
Kubeflow Pipelines offers an easy way of chaining these steps together, and we will illustrate that here.
As you can see, the script `run_preprocess_and_train.py` was using the two scripts `run_preprocess.py` and `run_train.py` underneath.
The idea here is that these two steps will be containerized and chained together by Kubeflow Pipelines.

KFP asks us to compile our pipeline Python3 file into a domain-specific language (DSL).
We do that with a tool called dsl-compile that comes with the Python3 SDK. So, first install that SDK:
```
pip3 install python-dateutil https://storage.googleapis.com/ml-pipeline/release/0.1.2/kfp.tar.gz --upgrade
```

Update `ml_pipeline.py` with the cpu image path that you built in the previous steps ($TRAIN_PATH) and your bucket name.
Then, compile the DSL, using:
```
cd ../../../tensorflow_model
python3 ml_pipeline.py
```

Now a file `ml_pipeline.py.tar.gz` is generated that we can upload to the Kubeflow Pipelines UI.
We will navigate back to the Kubeflow UI homepage on `https://<kf_app>.endpoints.<project_id>.cloud.goog/` and click on the Pipeline dashboard.

Once the browser is open, upload the tar.gz file. This simply makes the graph available.
Next we can create a run and specify the params for the run. Make sure to set the version to 4 to check whether this run creates a new saved model.
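As an alternative to clicking through the UI, the compiled package can also be uploaded and run programmatically with the same SDK that was installed above. The sketch below is an assumption-heavy example: the in-cluster default host, the experiment and run names are placeholders, and the client API may differ slightly between KFP releases. The parameter names match `ml_pipeline.py`.

```
# Hypothetical programmatic run of the compiled pipeline via the KFP SDK.
import kfp

client = kfp.Client()  # assumes in-cluster access; otherwise pass host='<pipeline endpoint>'
experiment = client.create_experiment('financial-time-series')
run = client.run_pipeline(
    experiment.id,
    job_name='train-and-deploy-v4',
    pipeline_package_path='ml_pipeline.py.tar.gz',
    params={'bucket': '<your-bucket-name>',
            'cutoff-year': '2010',
            'version': '4',
            'model': 'DeepModel'})
print(run.id)
```
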
When the pipeline is running, you can inspect the logs:

![Pipeline UI](./docs/img/pipeline.png)

### Clean up
To clean up, we will simply run the Kubeflow deletion script, which takes care of deleting all components in the correct manner.

```
cd ../<kubeflow_src>/<kf_app>
${KUBEFLOW_REPO}/scripts/kfctl.sh delete all
```

### Summary
Kubeflow makes it easy for everyone to develop, deploy, and manage portable, scalable ML everywhere, and supports the full lifecycle of an ML product, including iteration via Jupyter notebooks.
It removes the need for expertise in a large number of areas, reducing the barrier to entry for developing and maintaining ML products.

If you want to get started with Kubeflow, make sure to check out the updated information on https://www.kubeflow.org.

(Binary image file added, 366 KiB; not shown.)

@@ -16,8 +16,7 @@ FROM tensorflow/tensorflow:1.8.0-devel-py3

RUN pip3 install google-cloud-storage==1.10.0 \
    google-cloud-bigquery==1.6.0 \
    pandas==0.23.4

COPY . /opt/workdir
WORKDIR /opt/workdir

@@ -15,7 +15,7 @@ local args =
  std.split(argsParam, ",");

local tfjob = {
  apiVersion: "kubeflow.org/v1beta1",
  kind: "TFJob",
  metadata: {
    name: name,

@@ -16,8 +16,7 @@ FROM tensorflow/tensorflow:1.8.0-devel-py3

RUN pip3 install google-cloud-storage==1.10.0 \
    google-cloud-bigquery==1.6.0 \
    pandas==0.23.4

COPY . /opt/workdir
WORKDIR /opt/workdir

@@ -16,8 +16,7 @@ FROM tensorflow/tensorflow:1.8.0-devel-gpu-py3

RUN pip3 install google-cloud-storage==1.10.0 \
    google-cloud-bigquery==1.6.0 \
    pandas==0.23.4

COPY . /opt/workdir
WORKDIR /opt/workdir

@@ -15,7 +15,7 @@ local args =
  std.split(argsParam, ",");

local tfjob = {
  apiVersion: "kubeflow.org/v1beta1",
  kind: "TFJob",
  metadata: {
    name: name,

@@ -7,7 +7,7 @@ import numpy as np
import pandas as pd


def load_data(tickers, year_cutoff=None):
  """Load stock market data (close values for each day) for given tickers.

  Args:

@@ -23,8 +23,11 @@ def load_data(tickers):
  # get the data
  bq_query = {}
  for ticker in tickers:
    query = 'SELECT Date, Close from `bingo-ml-1.market_data.{}`'.format(ticker)
    if year_cutoff:
      query += ' WHERE EXTRACT(YEAR FROM Date) >= {}'.format(year_cutoff)
    bq_query[ticker] = bigquery_client.query(query)

  results = {}
  for ticker in tickers:
    results[ticker] = bq_query[ticker].result().to_dataframe().set_index('Date')
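To illustrate the new `year_cutoff` argument, here is a hypothetical call that mirrors how `run_preprocess.py` uses these helpers:

```
# Hypothetical call of the updated loader, mirroring run_preprocess.py.
from helpers import preprocess

tickers = ['snp', 'nyse', 'djia', 'nikkei', 'hangseng', 'ftse', 'dax', 'aord']
closing_data = preprocess.load_data(tickers, year_cutoff=2010)  # only rows from 2010 onwards
time_series = preprocess.preprocess_data(closing_data)
print(time_series.head())
```
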
@@ -0,0 +1,39 @@
"""Module that handles downloads and uploads to Google Cloud Storage.

Helper functions to perform uploads and downloads from Google Cloud Storage.
"""
import os

from google.cloud import storage


def upload_to_storage(bucket, export_path):
  """Upload files from export path to Google Cloud Storage.

  Args:
    bucket (str): Google Cloud Storage bucket
    export_path (str): export path

  Returns:

  """
  client = storage.Client()
  bucket = client.get_bucket(bucket)
  if bucket:
    for root, _, files in os.walk(export_path):
      for file in files:
        path = os.path.join(root, file)
        blob = bucket.blob(path)
        blob.upload_from_filename(path)


def download_blob(bucket_name, source_blob_name, destination_file_name):
  """Downloads a blob from the bucket."""
  storage_client = storage.Client()
  bucket = storage_client.get_bucket(bucket_name)
  blob = bucket.blob(source_blob_name)

  blob.download_to_filename(destination_file_name)

  print('Blob {} downloaded to {}.'.format(
      source_blob_name,
      destination_file_name))
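For reference, a hypothetical usage of this helper module (bucket name and paths are placeholders) mirrors how the preprocessing and training scripts call it:

```
# Hypothetical usage of the storage helpers; bucket and paths are placeholders.
from helpers import storage as storage_helper

# push every file under model/4 to gs://<your-bucket-name>/model/4/...
storage_helper.upload_to_storage('<your-bucket-name>', 'model/4')

# fetch a single preprocessed csv back to a local file
storage_helper.download_blob('<your-bucket-name>', 'data/data_2010.csv', 'data_2010.csv')
```
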
@@ -0,0 +1,70 @@
#!/usr/bin/env python3
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import kfp.dsl as dsl


class Preprocess(dsl.ContainerOp):

  def __init__(self, name, bucket, cutoff_year):
    super(Preprocess, self).__init__(
        name=name,
        # image needs to be a compile-time string
        image='gcr.io/<project>/<image-name>/cpu:v1',
        command=['python3', 'run_preprocess.py'],
        arguments=[
            '--bucket', bucket,
            '--cutoff_year', cutoff_year,
            '--kfp'
        ],
        file_outputs={'blob-path': '/blob_path.txt'}
    )


class Train(dsl.ContainerOp):

  def __init__(self, name, blob_path, version, bucket, model):
    super(Train, self).__init__(
        name=name,
        # image needs to be a compile-time string
        image='gcr.io/<project>/<image-name>/cpu:v1',
        command=['python3', 'run_train.py'],
        arguments=[
            '--version', version,
            '--blob_path', blob_path,
            '--bucket', bucket,
            '--model', model
        ]
    )


@dsl.pipeline(
    name='financial time series',
    description='Train Financial Time Series'
)
def train_and_deploy(
    bucket=dsl.PipelineParam('bucket', value='<bucket>'),
    cutoff_year=dsl.PipelineParam('cutoff-year', value='2010'),
    version=dsl.PipelineParam('version', value='4'),
    model=dsl.PipelineParam('model', value='DeepModel')
):
  """Pipeline to train financial time series model"""
  preprocess_op = Preprocess('preprocess', bucket, cutoff_year)
  #pylint: disable=unused-variable
  train_op = Train('train and deploy', preprocess_op.output, version, bucket, model)


if __name__ == '__main__':
  import kfp.compiler as compiler
  compiler.Compiler().compile(train_and_deploy, __file__ + '.tar.gz')
@@ -0,0 +1,77 @@
"""Module for running the data retrieval and preprocessing.

Script that performs all the steps to get the training data and perform preprocessing.
"""
import logging
import argparse
import sys
import shutil
import os

#pylint: disable=no-name-in-module
from helpers import preprocess
from helpers import storage as storage_helper


def parse_arguments(argv):
  """Parse command line arguments
  Args:
    argv (list): list of command line arguments including program name
  Returns:
    The parsed arguments as returned by argparse.ArgumentParser
  """
  parser = argparse.ArgumentParser(description='Preprocessing')

  parser.add_argument('--bucket',
                      type=str,
                      help='GCS bucket where preprocessed data is saved',
                      default='<your-bucket-name>')

  parser.add_argument('--cutoff_year',
                      type=str,
                      help='Cutoff year for the stock data',
                      default='2010')

  parser.add_argument('--kfp',
                      dest='kfp',
                      action='store_true',
                      help='Kubeflow pipelines flag')

  args, _ = parser.parse_known_args(args=argv[1:])

  return args


def run_preprocess(argv=None):
  """Runs the retrieval and preprocessing of the data.

  Args:
    args: args that are passed when submitting the training

  Returns:

  """
  logging.info('starting preprocessing of data..')
  args = parse_arguments(sys.argv if argv is None else argv)
  tickers = ['snp', 'nyse', 'djia', 'nikkei', 'hangseng', 'ftse', 'dax', 'aord']
  closing_data = preprocess.load_data(tickers, args.cutoff_year)
  time_series = preprocess.preprocess_data(closing_data)
  logging.info('preprocessing of data complete..')

  logging.info('starting uploading of the preprocessed data on GCS..')
  temp_folder = 'data'
  if not os.path.exists(temp_folder):
    os.mkdir(temp_folder)
  file_path = os.path.join(temp_folder, 'data_{}.csv'.format(args.cutoff_year))
  time_series.to_csv(file_path, index=False)
  storage_helper.upload_to_storage(args.bucket, temp_folder)
  shutil.rmtree(temp_folder)
  if args.kfp:
    with open("/blob_path.txt", "w") as output_file:
      output_file.write(file_path)
  logging.info('upload of the preprocessed data on GCS completed..')


if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO)
  run_preprocess()
@@ -0,0 +1,72 @@
"""Module for running the training of the machine learning model.

Script that performs all the steps to train the ML model.
"""
import logging
import argparse
import sys

from run_preprocess import run_preprocess
from run_train import run_training


def parse_arguments(argv):
  """Parse command line arguments
  Args:
    argv (list): list of command line arguments including program name
  Returns:
    The parsed arguments as returned by argparse.ArgumentParser
  """
  parser = argparse.ArgumentParser(description='Preprocess and Train')

  parser.add_argument('--cutoff_year',
                      type=str,
                      help='Cutoff year for the stock data',
                      default='2010')

  parser.add_argument('--bucket',
                      type=str,
                      help='GCS bucket to store data and ML models',
                      default='<your-bucket-name>')

  parser.add_argument('--model',
                      type=str,
                      help='model to be used for training',
                      default='DeepModel',
                      choices=['FlatModel', 'DeepModel'])

  parser.add_argument('--epochs',
                      type=int,
                      help='number of epochs to train',
                      default=30001)

  parser.add_argument('--version',
                      type=str,
                      help='version (stored for serving)',
                      default='1')

  args, _ = parser.parse_known_args(args=argv[1:])

  return args


def run_preprocess_and_train(argv=None):
  """Runs the ML model pipeline.

  Args:
    args: args that are passed when submitting the training

  Returns:

  """
  args = parse_arguments(sys.argv if argv is None else argv)
  run_preprocess(sys.argv)
  sys.argv.append('--blob_path=data/data_{}.csv'.format(args.cutoff_year))
  run_training(sys.argv)


if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO)
  run_preprocess_and_train()
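For a quick local smoke test of this combined script outside of Kubeflow, a hypothetical invocation could look like the sketch below; it assumes Google Cloud credentials are available and that the bucket already exists.

```
# Hypothetical local smoke test of the combined preprocess-and-train script.
# Assumes GOOGLE_APPLICATION_CREDENTIALS is set and <your-bucket-name> exists.
import sys
from run_preprocess_and_train import run_preprocess_and_train

sys.argv = ['run_preprocess_and_train.py',
            '--model', 'FlatModel',
            '--epochs', '1001',          # keep it short for a smoke test
            '--bucket', '<your-bucket-name>',
            '--version', '1']
run_preprocess_and_train()
```
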
@@ -6,45 +6,58 @@ import logging
import os
import argparse
import time
import shutil
import sys
import pandas as pd
import tensorflow as tf

#pylint: disable=no-name-in-module
from helpers import preprocess, models, metrics
from helpers import storage as storage_helper


def parse_arguments(argv):
  """Parse command line arguments
  Args:
    argv (list): list of command line arguments including program name
  Returns:
    The parsed arguments as returned by argparse.ArgumentParser
  """
  parser = argparse.ArgumentParser(description='Training')

  parser.add_argument('--model',
                      type=str,
                      help='model to be used for training',
                      default='DeepModel',
                      choices=['FlatModel', 'DeepModel'])

  parser.add_argument('--epochs',
                      type=int,
                      help='number of epochs to train',
                      default=30001)

  parser.add_argument('--version',
                      type=str,
                      help='version (stored for serving)',
                      default='1')

  parser.add_argument('--bucket',
                      type=str,
                      help='GCS bucket to store data and ML models',
                      default='<your-bucket-name>')

  parser.add_argument('--blob_path',
                      type=str,
                      help='GCS blob path where data is saved',
                      default='data')

  args, _ = parser.parse_known_args(args=argv[1:])

  return args


def run_training(argv=None):
  """Runs the ML model training.

  Args:
    args: args that are passed when submitting the training
@@ -53,12 +66,20 @@ def run_training(args):

  """
  # parse args
  logging.info('parsing args...')
  args = parse_arguments(sys.argv if argv is None else argv)
  logging.info('getting the ML model...')
  model = getattr(models, args.model)(nr_predictors=24, nr_classes=2)

  # get the data
  logging.info('getting the data...')
  temp_folder = 'data'
  if not os.path.exists(temp_folder):
    os.mkdir(temp_folder)
  file_path = os.path.join(temp_folder, 'data.csv')
  storage_helper.download_blob(args.bucket, args.blob_path, file_path)
  time_series = pd.read_csv(file_path)
  training_test_data = preprocess.train_test_split(time_series, 0.8)

  # define training objective
  logging.info('defining the training objective...')

@@ -112,7 +133,7 @@ def run_training(args):
  # create signature for TensorFlow Serving
  logging.info('Exporting model for tensorflow-serving...')

  export_path = os.path.join("model", args.version)
  tf.saved_model.simple_save(
      sess,
      export_path,

@@ -123,37 +144,13 @@ def run_training(args):

  # save model on GCS
  logging.info("uploading to " + args.bucket + "/" + export_path)
  storage_helper.upload_to_storage(args.bucket, export_path)

  # remove local files
  shutil.rmtree(export_path)
  shutil.rmtree(temp_folder)


if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO)
  run_training()

@@ -4,8 +4,9 @@ Obtains the prediction for a given date in the test.
"""
import numpy as np

#pylint: disable=no-name-in-module
from helpers import preprocess
from . import request_helper  #pylint: disable=relative-beyond-top-level


def send_pratical_request(date="2014-08-12"):