[Sample] Add back visualization in XGBoost sample (#2384)
* init.
* add delete op
* Add new transform and analyze op
* Save.
* Partial work
* partial work
* WIP
* Clean up code.
* Disable xgboost cm check for now.
* Update commit SHA
* Update kw arg
* Correct the url format.
* Switch to gcp component for create cluster.
* add secret
* fix create cluster op
* fix path problem
* Move display name
* no op
* Improve
* doc
* Solve
* add back visualization
* fix
* Fix naming
* add back check for cm
* remove todo
* Update readme
* fix naming
* fix flakiness
* Revert "fix flakiness"
This reverts commit 81ba4460
This commit is contained in:
parent
ac5a04ab9e
commit
5c008f600c
|
|
@ -3,7 +3,10 @@
|
|||
The `xgboost_training_cm.py` pipeline creates XGBoost models on structured data in CSV format. Both classification and regression are supported.
|
||||
|
||||
The pipeline starts by creating a Google DataProc cluster, and then running analysis, transformation, distributed training and
|
||||
prediction in the created cluster. Finally, a delete cluster operation runs to destroy the cluster it creates
|
||||
prediction in the created cluster.
|
||||
Then a single-node confusion-matrix and ROC aggregator is used (for the classification case) to
|
||||
provide the confusion matrix data, and ROC data to the front end, respectively.
|
||||
Finally, a delete cluster operation runs to destroy the cluster it creates
|
||||
in the beginning. The delete cluster operation is used as an exit handler, meaning it will run regardless of whether the pipeline fails
|
||||
or not.
|
||||
|
||||
|
|
@ -45,3 +48,11 @@ Delete Cluster:
|
|||
|
||||
The container file is located [here](https://github.com/kubeflow/pipelines/tree/master/components/gcp/container)
|
||||
|
||||
For visualization, we use confusion matrix and ROC.
|
||||
Confusion Matrix:
|
||||
[source code](https://github.com/kubeflow/pipelines/tree/master/components/local/confusion_matrix/src),
|
||||
[container](https://github.com/kubeflow/pipelines/tree/master/components/local/confusion_matrix)
|
||||
and ROC:
|
||||
[source code](https://github.com/kubeflow/pipelines/tree/master/components/local/roc/src),
|
||||
[container](https://github.com/kubeflow/pipelines/tree/master/components/local/roc)
|
||||
|
||||
|
|
|
|||
|
|
@ -22,7 +22,9 @@ from kfp import gcp
|
|||
import os
|
||||
import subprocess
|
||||
|
||||
# TODO(numerology): add ROC and CM back once UI metadata is enabled in this sample.
|
||||
confusion_matrix_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/e598176c02f45371336ccaa819409e8ec83743df/components/local/confusion_matrix/component.yaml')
|
||||
|
||||
roc_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/e598176c02f45371336ccaa819409e8ec83743df/components/local/roc/component.yaml')
|
||||
|
||||
dataproc_create_cluster_op = components.load_component_from_url(
|
||||
'https://raw.githubusercontent.com/kubeflow/pipelines/677fbaa281125fd604b81eab2488513efee7b600/components/gcp/dataproc/create_cluster/component.yaml')
|
||||
|
|
@ -211,6 +213,7 @@ def xgb_train_pipeline(
|
|||
target='resolution',
|
||||
rounds=200,
|
||||
workers=2,
|
||||
true_label='ACTION',
|
||||
):
|
||||
output_template = str(output) + '/' + dsl.RUN_ID_PLACEHOLDER + '/data'
|
||||
|
||||
|
|
@ -227,7 +230,7 @@ def xgb_train_pipeline(
|
|||
region=region,
|
||||
name=cluster_name
|
||||
)):
|
||||
create_cluster_op = dataproc_create_cluster_op(
|
||||
_create_cluster_op = dataproc_create_cluster_op(
|
||||
project_id=project,
|
||||
region=region,
|
||||
name=cluster_name,
|
||||
|
|
@ -238,16 +241,16 @@ def xgb_train_pipeline(
|
|||
image_version='1.2'
|
||||
)
|
||||
|
||||
analyze_op = dataproc_analyze_op(
|
||||
_analyze_op = dataproc_analyze_op(
|
||||
project=project,
|
||||
region=region,
|
||||
cluster_name=cluster_name,
|
||||
schema=schema,
|
||||
train_data=train_data,
|
||||
output=output_template
|
||||
).after(create_cluster_op).set_display_name('Analyzer')
|
||||
).after(_create_cluster_op).set_display_name('Analyzer')
|
||||
|
||||
transform_op = dataproc_transform_op(
|
||||
_transform_op = dataproc_transform_op(
|
||||
project=project,
|
||||
region=region,
|
||||
cluster_name=cluster_name,
|
||||
|
|
@ -256,9 +259,9 @@ def xgb_train_pipeline(
|
|||
target=target,
|
||||
analysis=analyze_output,
|
||||
output=output_template
|
||||
).after(analyze_op).set_display_name('Transformer')
|
||||
).after(_analyze_op).set_display_name('Transformer')
|
||||
|
||||
train_op = dataproc_train_op(
|
||||
_train_op = dataproc_train_op(
|
||||
project=project,
|
||||
region=region,
|
||||
cluster_name=cluster_name,
|
||||
|
|
@ -269,9 +272,9 @@ def xgb_train_pipeline(
|
|||
workers=workers,
|
||||
rounds=rounds,
|
||||
output=train_output
|
||||
).after(transform_op).set_display_name('Trainer')
|
||||
).after(_transform_op).set_display_name('Trainer')
|
||||
|
||||
predict_op = dataproc_predict_op(
|
||||
_predict_op = dataproc_predict_op(
|
||||
project=project,
|
||||
region=region,
|
||||
cluster_name=cluster_name,
|
||||
|
|
@ -280,7 +283,19 @@ def xgb_train_pipeline(
|
|||
target=target,
|
||||
analysis=analyze_output,
|
||||
output=predict_output
|
||||
).after(train_op).set_display_name('Predictor')
|
||||
).after(_train_op).set_display_name('Predictor')
|
||||
|
||||
_cm_op = confusion_matrix_op(
|
||||
predictions=os.path.join(predict_output, 'part-*.csv'),
|
||||
output_dir=output_template
|
||||
).after(_predict_op)
|
||||
|
||||
_roc_op = roc_op(
|
||||
predictions_dir=os.path.join(predict_output, 'part-*.csv'),
|
||||
true_class=true_label,
|
||||
true_score_column=true_label,
|
||||
output_dir=output_template
|
||||
).after(_predict_op)
|
||||
|
||||
dsl.get_pipeline_conf().add_op_transformer(
|
||||
gcp.use_gcp_secret('user-gcp-sa'))
|
||||
|
|
|
|||
|
|
@ -152,21 +152,20 @@ class PySampleChecker(object):
|
|||
exit(1)
|
||||
|
||||
###### Validate the results for specific test cases ######
|
||||
#TODO: Add result check for tfx-cab-classification after launch.
|
||||
# if self._testname == 'xgboost_training_cm':
|
||||
# # For xgboost sample, check its confusion matrix.
|
||||
# cm_tar_path = './confusion_matrix.tar.gz'
|
||||
# utils.get_artifact_in_minio(workflow_json, 'confusion-matrix', cm_tar_path,
|
||||
# 'mlpipeline-ui-metadata')
|
||||
# with tarfile.open(cm_tar_path) as tar_handle:
|
||||
# file_handles = tar_handle.getmembers()
|
||||
# assert len(file_handles) == 1
|
||||
#
|
||||
# with tar_handle.extractfile(file_handles[0]) as f:
|
||||
# cm_data = f.read()
|
||||
# utils.add_junit_test(self._test_cases, 'confusion matrix format',
|
||||
# (len(cm_data) > 0),
|
||||
# 'the confusion matrix file is empty')
|
||||
if self._testname == 'xgboost_training_cm':
|
||||
# For xgboost sample, check its confusion matrix.
|
||||
cm_tar_path = './confusion_matrix.tar.gz'
|
||||
utils.get_artifact_in_minio(workflow_json, 'confusion-matrix', cm_tar_path,
|
||||
'mlpipeline-ui-metadata')
|
||||
with tarfile.open(cm_tar_path) as tar_handle:
|
||||
file_handles = tar_handle.getmembers()
|
||||
assert len(file_handles) == 1
|
||||
|
||||
with tar_handle.extractfile(file_handles[0]) as f:
|
||||
cm_data = f.read()
|
||||
utils.add_junit_test(self._test_cases, 'confusion matrix format',
|
||||
(len(cm_data) > 0),
|
||||
'the confusion matrix file is empty')
|
||||
|
||||
###### Delete Job ######
|
||||
#TODO: add deletion when the backend API offers the interface.
|
||||
|
|
|
|||
Loading…
Reference in New Issue