122 lines
4.7 KiB
YAML
122 lines
4.7 KiB
YAML
name: Run component or pipeline
|
|
metadata:
|
|
annotations:
|
|
author: Alexey Volkov <alexey.volkov@ark-kun.com>
|
|
canonical_location: 'https://raw.githubusercontent.com/Ark-kun/pipeline_components/master/components/kfp/Run_component/component.yaml'
|
|
inputs:
|
|
- {name: component_url, type: Url}
|
|
- {name: arguments, type: JsonObject}
|
|
- {name: endpoint, type: String, optional: true}
|
|
- {name: wait_timeout_seconds, type: Float, optional: true}
|
|
outputs:
|
|
- {name: run_id, type: String}
|
|
- {name: run_object, type: JsonObject}
|
|
implementation:
|
|
container:
|
|
image: python:3.9
|
|
command:
|
|
- sh
|
|
- -c
|
|
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
|
|
'kfp==1.4.0' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
|
|
--no-warn-script-location 'kfp==1.4.0' --user) && "$0" "$@"
|
|
- sh
|
|
- -ec
|
|
- |
|
|
program_path=$(mktemp)
|
|
printf "%s" "$0" > "$program_path"
|
|
python3 -u "$program_path" "$@"
|
|
- |
|
|
def run_component_or_pipeline(
|
|
component_url,
|
|
arguments,
|
|
endpoint = None,
|
|
wait_timeout_seconds = None,
|
|
):
|
|
import json
|
|
import os
|
|
import kfp
|
|
from kfp_server_api import ApiClient
|
|
print('Loading component...')
|
|
op = kfp.components.load_component_from_url(component_url)
|
|
print('Loading component done.')
|
|
print('Submitting run...')
|
|
if not endpoint:
|
|
endpoint = 'http://' + os.environ['ML_PIPELINE_SERVICE_HOST'] + ':' + os.environ['ML_PIPELINE_SERVICE_PORT']
|
|
create_run_result = kfp.Client(host=endpoint).create_run_from_pipeline_func(op, arguments=arguments)
|
|
run_id = str(create_run_result.run_id)
|
|
print('Submitted run: ' + run_id)
|
|
run_url = f'{endpoint.rstrip("/")}/#/runs/details/{run_id}'
|
|
print(run_url)
|
|
print('Waiting for the run to finish...')
|
|
run_object = create_run_result.wait_for_run_completion(wait_timeout_seconds)
|
|
print('Run has finished.')
|
|
# sanitize_for_serialization uses correct field names and properly converts datetime values
|
|
run_dict = ApiClient().sanitize_for_serialization(run_object)
|
|
return (
|
|
run_id,
|
|
json.dumps(run_dict, indent=4),
|
|
)
|
|
|
|
def _serialize_json(obj) -> str:
|
|
if isinstance(obj, str):
|
|
return obj
|
|
import json
|
|
def default_serializer(obj):
|
|
if hasattr(obj, 'to_struct'):
|
|
return obj.to_struct()
|
|
else:
|
|
raise TypeError("Object of type '%s' is not JSON serializable and does not have .to_struct() method." % obj.__class__.__name__)
|
|
return json.dumps(obj, default=default_serializer, sort_keys=True)
|
|
|
|
def _serialize_str(str_value: str) -> str:
|
|
if not isinstance(str_value, str):
|
|
raise TypeError('Value "{}" has type "{}" instead of str.'.format(str(str_value), str(type(str_value))))
|
|
return str_value
|
|
|
|
import json
|
|
import argparse
|
|
_parser = argparse.ArgumentParser(prog='Run component or pipeline', description='')
|
|
_parser.add_argument("--component-url", dest="component_url", type=str, required=True, default=argparse.SUPPRESS)
|
|
_parser.add_argument("--arguments", dest="arguments", type=json.loads, required=True, default=argparse.SUPPRESS)
|
|
_parser.add_argument("--endpoint", dest="endpoint", type=str, required=False, default=argparse.SUPPRESS)
|
|
_parser.add_argument("--wait-timeout-seconds", dest="wait_timeout_seconds", type=float, required=False, default=argparse.SUPPRESS)
|
|
_parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2)
|
|
_parsed_args = vars(_parser.parse_args())
|
|
_output_files = _parsed_args.pop("_output_paths", [])
|
|
|
|
_outputs = run_component_or_pipeline(**_parsed_args)
|
|
|
|
_output_serializers = [
|
|
_serialize_str,
|
|
_serialize_json,
|
|
|
|
]
|
|
|
|
import os
|
|
for idx, output_file in enumerate(_output_files):
|
|
try:
|
|
os.makedirs(os.path.dirname(output_file))
|
|
except OSError:
|
|
pass
|
|
with open(output_file, 'w') as f:
|
|
f.write(_output_serializers[idx](_outputs[idx]))
|
|
args:
|
|
- --component-url
|
|
- {inputValue: component_url}
|
|
- --arguments
|
|
- {inputValue: arguments}
|
|
- if:
|
|
cond: {isPresent: endpoint}
|
|
then:
|
|
- --endpoint
|
|
- {inputValue: endpoint}
|
|
- if:
|
|
cond: {isPresent: wait_timeout_seconds}
|
|
then:
|
|
- --wait-timeout-seconds
|
|
- {inputValue: wait_timeout_seconds}
|
|
- '----output-paths'
|
|
- {outputPath: run_id}
|
|
- {outputPath: run_object}
|