mirror of https://github.com/kubeflow/examples.git
Kaggle to kfp (#938)

* Add files via upload
* Kaggle to kfp: converted the Kaggle Facial-Keypoint-Detection notebook to a Kubeflow pipeline

parent 97cb872bcf
commit 7a02695ac4
README.md
@@ -0,0 +1,43 @@
# Objective
Here we convert the code from the Kaggle Facial Keypoints Detection competition (https://www.kaggle.com/competitions/facial-keypoints-detection) into a kfp pipeline.
The objective of this task is to predict keypoint positions on face images.

# Testing environment
The pipeline was tested on `Kubeflow 1.4` with `kfp 1.1.2` and should also be compatible with earlier Kubeflow releases. The kfp SDK version used for testing is 1.1.2, which can be installed with `pip install kfp==1.1.2`.

# Components used

## Docker
Docker is used to create the environment in which each component runs.

## Kubeflow pipelines
Kubeflow Pipelines connects the Docker components together into a pipeline. Each pipeline is a reproducible workflow: we pass the input arguments once and the entire workflow runs end to end. The full pipeline definition for this example is in `my_pipeline.py` below; a minimal sketch of the idea follows.
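To illustrate the idea only (this is not this repo's pipeline, which appears in `my_pipeline.py` below), here is a minimal kfp sketch with two chained container steps; the images and commands are placeholders:
```
import kfp.compiler as compiler
from kfp import dsl

@dsl.pipeline(name='sketch', description='two chained container steps')
def sketch_pipeline():
    # First step: a container that would train a model.
    train = dsl.ContainerOp(
        name='train',
        image='python:3.8',  # placeholder image
        command=['python3', '-c', 'print("train")'])
    # Second step: runs only after the first one finishes.
    evaluate = dsl.ContainerOp(
        name='evaluate',
        image='python:3.8',  # placeholder image
        command=['python3', '-c', 'print("evaluate")'])
    evaluate.after(train)

# Compile the pipeline into an uploadable YAML workflow.
compiler.Compiler().compile(sketch_pipeline, 'sketch.yaml')
```
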
# Docker
We start by creating a Docker account on Docker Hub (https://hub.docker.com/), signing up with an individual email address. After signup is complete, log in to Docker from your terminal with `docker login`, using your username and password.

## Build train image
Navigate to the `train` directory, create a folder named `my_data`, put your `training.zip` and `test.zip` data from the Kaggle competition into this folder, and build the Docker image using:
```
docker build -t <docker_username>/<docker_imagename>:<tag> .
```
In my case this is:
```
docker build -t hubdocker76/demotrain:v1 .
```

## Build evaluate image
Navigate to the `eval` directory and build the Docker image using:
```
docker build -t <docker_username>/<docker_imagename>:<tag> .
```
In my case this is:
```
docker build -t hubdocker76/demoeval:v2 .
```
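
The cluster pulls these images from Docker Hub at run time, so push them after building (using the tags from above):
```
docker push hubdocker76/demotrain:v1
docker push hubdocker76/demoeval:v2
```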
# Kubeflow pipelines

Navigate to the `generate-pipeline` directory and run `python3 my_pipeline.py`. This compiles the pipeline to a YAML file (the compiler writes `my_pipeline.py.yaml`; in this example it has been renamed to `face_pipeline_01.yaml`). Upload this YAML to the Kubeflow Pipelines UI and create a Run from it.
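
As an alternative to the UI, the compiled YAML can be uploaded and run from Python. A minimal sketch, assuming the kfp 1.1.2 SDK and a reachable KFP API endpoint (the host and parameter values below are hypothetical):
```
import kfp

# Connect to the KFP API server; the host depends on your Kubeflow deployment.
client = kfp.Client(host='http://localhost:8080')

# Group runs under an experiment.
experiment = client.create_experiment('facial-keypoints')

# Start a run from the compiled package, passing the three pipeline parameters.
run = client.run_pipeline(
    experiment_id=experiment.id,
    job_name='face-pipeline-run',
    pipeline_package_path='face_pipeline_01.yaml',
    params={'trial': 1, 'epoch': 10, 'patience': 3},
)
```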
|  | @ -0,0 +1,14 @@ | |||
FROM ubuntu:bionic

# System setup: Python 3 and git.
RUN apt-get update && apt-get upgrade -y
RUN mkdir -p /tensorflow/models
RUN apt-get install -y git python3-pip
RUN pip3 install --upgrade pip

# Python dependencies for model evaluation.
RUN pip3 install tensorflow
RUN pip3 install jupyter
RUN pip3 install matplotlib
RUN pip3 install kfp==1.1.2
RUN pip3 install opencv-python-headless
RUN pip3 install pandas keras
RUN pip3 install scikit-learn
RUN pip3 install autokeras

# Copy the component code into the image.
COPY . /
|  | @ -0,0 +1,28 @@ | |||
from tensorflow.keras.models import load_model
import autokeras as ak
import pandas as pd
import numpy as np

# Load the AutoKeras model exported by the train step onto the shared PVC.
loaded_model = load_model("/data/model_autokeras", custom_objects=ak.CUSTOM_OBJECTS)

# Print model summary
print(loaded_model.summary())

test_dir = '/data/test.csv'
test = pd.read_csv(test_dir)

# Each row's 'Image' column holds space-separated pixel values of a 96x96 grayscale image.
X_test = []
for img in test['Image']:
    X_test.append(np.asarray(img.split(), dtype=float).reshape(96, 96, 1))
X_test = np.reshape(X_test, (-1, 96, 96, 1))
X_test = np.asarray(X_test).astype('float32')

# Predict keypoint locations.
y_pred = loaded_model.predict(X_test)

# Create the Kaggle submission file on the shared PVC.
y_pred = y_pred.reshape(-1,)
submission = pd.DataFrame({'Location': y_pred})
submission.to_csv('/data/submission.csv', index=True, index_label='RowId')
|  | @ -0,0 +1,93 @@ | |||
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: face-pipeline-
  annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.1.2, pipelines.kubeflow.org/pipeline_compilation_time: '2022-03-27T11:03:51.876586',
    pipelines.kubeflow.org/pipeline_spec: '{"description": "pipeline to detect facial
      landmarks", "inputs": [{"name": "trial"}, {"name": "epoch"}, {"name": "patience"}],
      "name": "face pipeline"}'}
  labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.1.2}
spec:
  entrypoint: face-pipeline
  templates:
  - name: evaluate
    container:
      command: [python3, eval.py]
      image: hubdocker76/demoeval:v2
      volumeMounts:
      - {mountPath: /data, name: pvc}
    inputs:
      parameters:
      - {name: pvc-name}
    volumes:
    - name: pvc
      persistentVolumeClaim: {claimName: '{{inputs.parameters.pvc-name}}'}
  - name: face-pipeline
    inputs:
      parameters:
      - {name: epoch}
      - {name: patience}
      - {name: trial}
    dag:
      tasks:
      - name: evaluate
        template: evaluate
        dependencies: [pvc, train]
        arguments:
          parameters:
          - {name: pvc-name, value: '{{tasks.pvc.outputs.parameters.pvc-name}}'}
      - {name: pvc, template: pvc}
      - name: train
        template: train
        dependencies: [pvc]
        arguments:
          parameters:
          - {name: epoch, value: '{{inputs.parameters.epoch}}'}
          - {name: patience, value: '{{inputs.parameters.patience}}'}
          - {name: pvc-name, value: '{{tasks.pvc.outputs.parameters.pvc-name}}'}
          - {name: trial, value: '{{inputs.parameters.trial}}'}
  - name: pvc
    resource:
      action: create
      manifest: |
        apiVersion: v1
        kind: PersistentVolumeClaim
        metadata:
          name: '{{workflow.name}}-pvc'
        spec:
          accessModes:
          - ReadWriteOnce
          resources:
            requests:
              storage: 1Gi
    outputs:
      parameters:
      - name: pvc-manifest
        valueFrom: {jsonPath: '{}'}
      - name: pvc-name
        valueFrom: {jsonPath: '{.metadata.name}'}
      - name: pvc-size
        valueFrom: {jsonPath: '{.status.capacity.storage}'}
  - name: train
    container:
      args: [--trial, '{{inputs.parameters.trial}}', --epoch, '{{inputs.parameters.epoch}}',
        --patience, '{{inputs.parameters.patience}}']
      command: [python3, train.py]
      image: hubdocker76/demotrain:v1
      volumeMounts:
      - {mountPath: /data, name: pvc}
    inputs:
      parameters:
      - {name: epoch}
      - {name: patience}
      - {name: pvc-name}
      - {name: trial}
    volumes:
    - name: pvc
      persistentVolumeClaim: {claimName: '{{inputs.parameters.pvc-name}}'}
  arguments:
    parameters:
    - {name: trial}
    - {name: epoch}
    - {name: patience}
  serviceAccountName: pipeline-runner
|  | @ -0,0 +1,42 @@ | |||
import kfp
from kfp import dsl

# Train component: creates a 1Gi PVC and runs train.py with the pipeline arguments.
def SendMsg(trial, epoch, patience):
    vop = dsl.VolumeOp(name="pvc",
                       resource_name="pvc", size='1Gi',
                       modes=dsl.VOLUME_MODE_RWO)

    return dsl.ContainerOp(
        name='Train',
        image='hubdocker76/demotrain:v1',
        command=['python3', 'train.py'],
        arguments=[
            '--trial', trial,
            '--epoch', epoch,
            '--patience', patience
        ],
        pvolumes={
            '/data': vop.volume
        }
    )

# Evaluate component: mounts the same PVC and runs eval.py on the trained model.
def GetMsg(comp1):
    return dsl.ContainerOp(
        name='Evaluate',
        image='hubdocker76/demoeval:v2',
        pvolumes={
            '/data': comp1.pvolumes['/data']
        },
        command=['python3', 'eval.py']
    )

@dsl.pipeline(
    name='face pipeline',
    description='pipeline to detect facial landmarks')
def passing_parameter(trial, epoch, patience):
    comp1 = SendMsg(trial, epoch, patience)
    comp2 = GetMsg(comp1)

if __name__ == '__main__':
    import kfp.compiler as compiler
    # Compiles to 'my_pipeline.py.yaml' (renamed to face_pipeline_01.yaml in this repo).
    compiler.Compiler().compile(passing_parameter, __file__ + '.yaml')
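
# For quick iteration you could also skip the YAML round-trip and submit a run
# directly from this script. Sketch only; the host and argument values below
# are hypothetical and depend on your Kubeflow deployment:
#
#   client = kfp.Client(host='http://localhost:8080')
#   client.create_run_from_pipeline_func(
#       passing_parameter,
#       arguments={'trial': 1, 'epoch': 10, 'patience': 3})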
|  | @ -0,0 +1,14 @@ | |||
FROM ubuntu:bionic

# System setup: Python 3 and git.
RUN apt-get update && apt-get upgrade -y
RUN mkdir -p /tensorflow/models
RUN apt-get install -y git python3-pip
RUN pip3 install --upgrade pip

# Python dependencies for model training.
RUN pip3 install tensorflow
RUN pip3 install jupyter
RUN pip3 install matplotlib
RUN pip3 install kfp==1.1.2
RUN pip3 install opencv-python-headless
RUN pip3 install pandas keras
RUN pip3 install scikit-learn
RUN pip3 install autokeras

# Copy the component code (including my_data/) into the image.
COPY . /
|  | @ -0,0 +1,79 @@ | |||
import argparse
from zipfile import ZipFile

import numpy as np
import pandas as pd
import autokeras as ak

# Declare input arguments.
parser = argparse.ArgumentParser()
parser.add_argument('--trial', type=int)
parser.add_argument('--epoch', type=int)
parser.add_argument('--patience', type=int)

args = vars(parser.parse_args())

trials = args['trial']
epochs = args['epoch']
patience = args['patience']

# Run metadata (not used below).
project = "Facial-keypoints"
run_id = "1.8"
resume_run = True

MAX_TRIALS = trials
EPOCHS = epochs
PATIENCE = patience  # parsed but not used by the AutoKeras fit below

# Data extraction: unzip the archives bundled in the image and save them
# to the attached external PVC at /data.
base_dir = 'my_data/'
train_dir_zip = base_dir + 'training.zip'
test_dir_zip = base_dir + 'test.zip'

with ZipFile(train_dir_zip, 'r') as zipObj:
    zipObj.extractall('/data')
    print("Train archive unzipped")
with ZipFile(test_dir_zip, 'r') as zipObj:
    zipObj.extractall('/data')
    print("Test archive unzipped")

# Data preprocessing.
train_dir = '/data/training.csv'
test_dir = '/data/test.csv'
train = pd.read_csv(train_dir)
test = pd.read_csv(test_dir)

# Drop rows with missing keypoints.
train = train.dropna()
train = train.reset_index(drop=True)

X_train = []
Y_train = []

# The 'Image' column holds space-separated pixel values of a 96x96 grayscale image.
for img in train['Image']:
    X_train.append(np.asarray(img.split(), dtype=float).reshape(96, 96, 1))
X_train = np.reshape(X_train, (-1, 96, 96, 1))
X_train = np.asarray(X_train).astype('float32')

# The first 30 columns are the target keypoint coordinates (15 x/y pairs).
for i in range(len(train)):
    Y_train.append(np.asarray(train.iloc[i][0:30].to_numpy()))
Y_train = np.asarray(Y_train).astype('float32')

# Data training: AutoKeras searches over MAX_TRIALS candidate architectures.
reg = ak.ImageRegressor(max_trials=MAX_TRIALS)
reg.fit(X_train, Y_train, validation_split=0.15, epochs=EPOCHS)

# Export the trained model to the externally attached PVC.
my_model = reg.export_model()
my_model.save('/data/model_autokeras', save_format="tf")