pipelines/components/contrib/kfp/Run_component/component.yaml

name: Run component or pipeline
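# Loads a component (or graph/pipeline component) from component_url, submits it
# as a run to the given Kubeflow Pipelines endpoint (defaulting to the in-cluster
# ml-pipeline service), waits for the run to finish, and outputs the run ID and
# the serialized run object.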
metadata:
  annotations:
    author: Alexey Volkov <alexey.volkov@ark-kun.com>
    canonical_location: 'https://raw.githubusercontent.com/Ark-kun/pipeline_components/master/components/kfp/Run_component/component.yaml'
inputs:
- {name: component_url, type: Url}
- {name: arguments, type: JsonObject}
- {name: endpoint, type: String, optional: true}
- {name: wait_timeout_seconds, type: Float, optional: true}
outputs:
- {name: run_id, type: String}
- {name: run_object, type: JsonObject}
implementation:
  container:
    image: python:3.9
    command:
    - sh
    - -c
    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
      'kfp==1.4.0' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
      --no-warn-script-location 'kfp==1.4.0' --user) && "$0" "$@"
    - sh
    - -ec
    - |
      program_path=$(mktemp)
      printf "%s" "$0" > "$program_path"
      python3 -u "$program_path" "$@"
    - |
      def run_component_or_pipeline(
          component_url,
          arguments,
          endpoint = None,
          wait_timeout_seconds = None,
      ):
          import json
          import os
          import kfp
          from kfp_server_api import ApiClient

          print('Loading component...')
          op = kfp.components.load_component_from_url(component_url)
          print('Loading component done.')

          print('Submitting run...')
          if not endpoint:
              endpoint = 'http://' + os.environ['ML_PIPELINE_SERVICE_HOST'] + ':' + os.environ['ML_PIPELINE_SERVICE_PORT']
          create_run_result = kfp.Client(host=endpoint).create_run_from_pipeline_func(op, arguments=arguments)
          run_id = str(create_run_result.run_id)
          print('Submitted run: ' + run_id)
          run_url = f'{endpoint.rstrip("/")}/#/runs/details/{run_id}'
          print(run_url)

          print('Waiting for the run to finish...')
          run_object = create_run_result.wait_for_run_completion(wait_timeout_seconds)
          print('Run has finished.')
          # sanitize_for_serialization uses correct field names and properly converts datetime values
          run_dict = ApiClient().sanitize_for_serialization(run_object)
          return (
              run_id,
              json.dumps(run_dict, indent=4),
          )
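
      # The _serialize_* helpers below convert each returned output value to a
      # string before it is written to the corresponding output file.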
      def _serialize_json(obj) -> str:
          if isinstance(obj, str):
              return obj
          import json
          def default_serializer(obj):
              if hasattr(obj, 'to_struct'):
                  return obj.to_struct()
              else:
                  raise TypeError("Object of type '%s' is not JSON serializable and does not have .to_struct() method." % obj.__class__.__name__)
          return json.dumps(obj, default=default_serializer, sort_keys=True)

      def _serialize_str(str_value: str) -> str:
          if not isinstance(str_value, str):
              raise TypeError('Value "{}" has type "{}" instead of str.'.format(str(str_value), str(type(str_value))))
          return str_value

      import json
      import argparse
      _parser = argparse.ArgumentParser(prog='Run component or pipeline', description='')
      _parser.add_argument("--component-url", dest="component_url", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--arguments", dest="arguments", type=json.loads, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--endpoint", dest="endpoint", type=str, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("--wait-timeout-seconds", dest="wait_timeout_seconds", type=float, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2)
      _parsed_args = vars(_parser.parse_args())
      _output_files = _parsed_args.pop("_output_paths", [])

      _outputs = run_component_or_pipeline(**_parsed_args)

      _output_serializers = [
          _serialize_str,
          _serialize_json,
      ]

      import os
      for idx, output_file in enumerate(_output_files):
          try:
              os.makedirs(os.path.dirname(output_file))
          except OSError:
              pass
          with open(output_file, 'w') as f:
              f.write(_output_serializers[idx](_outputs[idx]))
    args:
    - --component-url
    - {inputValue: component_url}
    - --arguments
    - {inputValue: arguments}
    - if:
        cond: {isPresent: endpoint}
        then:
        - --endpoint
        - {inputValue: endpoint}
    - if:
        cond: {isPresent: wait_timeout_seconds}
        then:
        - --wait-timeout-seconds
        - {inputValue: wait_timeout_seconds}
    - '----output-paths'
    - {outputPath: run_id}
    - {outputPath: run_object}
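
# Example usage (a minimal sketch, assuming the KFP v1.x SDK on the client side;
# the target component URL and its arguments below are hypothetical placeholders):
#
#   import kfp
#
#   run_op = kfp.components.load_component_from_url(
#       'https://raw.githubusercontent.com/Ark-kun/pipeline_components/master/components/kfp/Run_component/component.yaml')
#
#   def my_pipeline():
#       run_op(
#           component_url='https://example.com/some_component.yaml',
#           arguments={'input_1': 'value_1'},
#       )
#
#   # Submit the wrapper pipeline itself (pass host='...' to kfp.Client() when
#   # running outside the cluster).
#   kfp.Client().create_run_from_pipeline_func(my_pipeline, arguments={})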