228 lines
7.3 KiB
Python
228 lines
7.3 KiB
Python
# Copyright 2018 The Kubeflow Authors
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import sys
|
|
import subprocess
|
|
import time
|
|
import json
|
|
import click
|
|
import shutil
|
|
import datetime
|
|
from typing import List
|
|
|
|
import kfp_server_api
|
|
from kfp.deprecated._client import Client
|
|
from kfp.deprecated.cli.output import print_output, OutputFormat
|
|
|
|
|
|
@click.group()
def run() -> None:
    """manage run resources."""
    # No body is needed: this function only serves as the click group that
    # the subcommands in this file attach to via ``@run.command()``.
    # The docstring above is kept verbatim because click displays a command's
    # docstring as its help text.
|
|
|
|
|
|
@run.command()
@click.option(
    '-e', '--experiment-id', help='Parent experiment ID of listed runs.')
@click.option(
    '--page-token', default='', help="Token for starting of the page.")
@click.option(
    '-m', '--max-size', default=100, help="Max size of the listed runs.")
@click.option(
    '--sort-by',
    default="created_at desc",
    help="Can be '[field_name]', '[field_name] desc'. For example, 'name desc'."
)
@click.option(
    '--filter',
    help=(
        "filter: A url-encoded, JSON-serialized Filter protocol buffer "
        "(see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto))."
    ))
@click.pass_context
def list(ctx: click.Context, experiment_id: str, page_token: str, max_size: int,
         sort_by: str, filter: str):
    """list recent KFP runs."""
    # NOTE: the function name and the ``filter`` parameter shadow Python
    # builtins. click derives the subcommand name from the function name and
    # the keyword argument from the option name, so they are left untouched;
    # code below this definition must not rely on the builtin ``list``.
    api_client = ctx.obj['client']
    chosen_format = ctx.obj['output']

    # ``--max-size`` is forwarded to the API as the page size.
    listing = api_client.list_runs(
        experiment_id=experiment_id,
        page_token=page_token,
        page_size=max_size,
        sort_by=sort_by,
        filter=filter,
    )

    if listing and listing.runs:
        _print_runs(listing.runs, chosen_format)
        return

    # Nothing to show: emit an empty JSON array for JSON output so the result
    # stays machine-readable, and a plain message otherwise.
    wants_json = chosen_format == OutputFormat.json.name
    click.echo(json.dumps([]) if wants_json else 'No runs found.')
|
|
|
|
|
|
@run.command()
@click.option(
    '-e',
    '--experiment-name',
    required=True,
    help='Experiment name of the run.')
@click.option('-r', '--run-name', help='Name of the run.')
@click.option(
    '-f',
    '--package-file',
    type=click.Path(exists=True, dir_okay=False),
    help='Path of the pipeline package file.')
@click.option('-p', '--pipeline-id', help='ID of the pipeline template.')
@click.option('-n', '--pipeline-name', help='Name of the pipeline template.')
@click.option(
    '-w',
    '--watch',
    is_flag=True,
    default=False,
    help='Watch the run status until it finishes.')
@click.option('-v', '--version', help='ID of the pipeline version.')
@click.option(
    '-t',
    '--timeout',
    default=0,
    help='Wait for a run to complete until timeout in seconds.',
    type=int)
@click.argument('args', nargs=-1)
@click.pass_context
def submit(ctx: click.Context, experiment_name: str, run_name: str,
           package_file: str, pipeline_id: str, pipeline_name: str, watch: bool,
           timeout: int, version: str, args: List[str]):
    """submit a KFP run."""
    # The docstring above is the help text click shows for this command, so
    # the remaining documentation lives in comments.
    #
    # Flow: resolve the pipeline to run, validate the inputs, create the
    # experiment via ``client.create_experiment``, start the run, then either
    # wait for completion (``--timeout``) or display the run (optionally
    # watching it).
    client = ctx.obj['client']
    namespace = ctx.obj['namespace']
    output_format = ctx.obj['output']

    # Without an explicit run name, the experiment name is reused.
    if not run_name:
        run_name = experiment_name

    # An explicit pipeline ID wins; the name is only looked up as a fallback.
    if not pipeline_id and pipeline_name:
        pipeline_id = client.get_pipeline_id(name=pipeline_name)

    if not package_file and not pipeline_id and not version:
        click.echo(
            'You must provide one of [package_file, pipeline_id, version].',
            err=True)
        sys.exit(1)

    # Positional ARGS are pipeline parameters written as ``name=value``.
    # Only the first '=' separates name from value, so values may themselves
    # contain '='. An argument without '=' is rejected with a readable
    # message instead of the opaque ValueError that ``dict()`` would raise.
    arg_dict = {}
    for arg in args:
        param_name, separator, param_value = arg.partition('=')
        if not separator:
            click.echo(
                'Invalid argument {!r}: pipeline parameters must be given '
                'as name=value.'.format(arg),
                err=True)
            sys.exit(1)
        arg_dict[param_name] = param_value

    experiment = client.create_experiment(experiment_name)
    run = client.run_pipeline(
        experiment.id,
        run_name,
        package_file,
        arg_dict,
        pipeline_id,
        version_id=version)

    # A positive timeout takes precedence: the command waits for completion
    # and ``--watch`` is not consulted in that case.
    if timeout > 0:
        _wait_for_run_completion(client, run.id, timeout, output_format)
    else:
        _display_run(client, namespace, run.id, watch, output_format)
|
|
|
|
|
|
@run.command()
@click.option(
    '-w',
    '--watch',
    is_flag=True,
    default=False,
    help='Watch the run status until it finishes.')
@click.option(
    '-d',
    '--detail',
    is_flag=True,
    default=False,
    help='Get detailed information of the run in json format.')
@click.argument('run-id')
@click.pass_context
def get(ctx: click.Context, watch: bool, detail: bool, run_id: str):
    """display the details of a KFP run."""
    # Thin wrapper: pull the shared settings out of the click context and
    # hand everything to ``_display_run``, which does the actual work.
    settings = ctx.obj
    client = settings['client']
    namespace = settings['namespace']
    output_format = settings['output']

    _display_run(
        client, namespace, run_id, watch, output_format, detail=detail)
|
|
|
|
|
|
def _display_run(client: Client,
                 namespace: str,
                 run_id: str,
                 watch: bool,
                 output_format: OutputFormat,
                 detail: bool = False):
    """Print a run and, if requested, follow it with ``argo watch``.

    Args:
        client: KFP client; only its ``get_run`` method is used here.
        namespace: Namespace passed to ``argo watch`` through ``-n``.
        run_id: ID of the run to display.
        watch: When true, poll the run until its workflow name is known and
            then hand over to ``argo watch``.
        output_format: Forwarded to ``_print_runs``.
        detail: When true, echo the run's fields (without ``pipeline_spec``)
            and return without printing the table or watching.

    Raises:
        RuntimeError: If ``watch`` is set and no ``argo`` executable is found
            on ``$PATH``.
    """
    run = client.get_run(run_id).run

    if detail:
        # Top-level datetimes are converted to ISO-8601 strings.
        # NOTE(review): the ``--detail`` help text promises JSON, but this
        # echoes the dict's Python repr. It is left as is because switching
        # to ``json.dumps`` would change the CLI output.
        data = {
            key:
            value.isoformat() if isinstance(value, datetime.datetime) else value
            for key, value in run.to_dict().items()
            if key != 'pipeline_spec'  # useless but too much detailed field
        }
        click.echo(data)
        return

    _print_runs([run], output_format)
    if not watch:
        return

    argo_path = shutil.which('argo')
    if not argo_path:
        raise RuntimeError(
            "argo isn't found in $PATH. It's necessary for watch. "
            "Please make sure it's installed and available. "
            "Installation instructions can be found here - "
            "https://github.com/argoproj/argo-workflows/releases")

    # Poll once per second until the run exposes a workflow manifest with a
    # name, or the run reaches a terminal status first.
    while True:
        time.sleep(1)
        run_detail = client.get_run(run_id)
        runtime = run_detail.pipeline_runtime
        if runtime and runtime.workflow_manifest:
            manifest = json.loads(runtime.workflow_manifest)
            # ``.get`` tolerates a manifest without metadata or a name; the
            # loop simply keeps polling in that case.
            metadata = manifest.get('metadata') or {}
            argo_workflow_name = metadata.get('name')
            if argo_workflow_name:
                break
        if run_detail.run.status in ('Succeeded', 'Skipped', 'Failed', 'Error'):
            click.echo('Run is finished with status {}.'.format(
                run_detail.run.status))
            return

    # The loop only exits through ``break``, so the workflow name is set.
    subprocess.run([argo_path, 'watch', argo_workflow_name, '-n', namespace])

    # Fetch the run again so the final table shows its state after the watch
    # rather than the snapshot taken before ``argo watch`` started.
    run = client.get_run(run_id).run
    _print_runs([run], output_format)
|
|
|
|
|
|
def _wait_for_run_completion(client: Client, run_id: str, timeout: int,
                             output_format: OutputFormat):
    """Wait for a run through the client, then print the resulting run.

    Args:
        client: KFP client whose ``wait_for_run_completion`` is called.
        run_id: ID of the run to wait for.
        timeout: Passed straight to ``client.wait_for_run_completion``; the
            ``--timeout`` option describes it as seconds.
        output_format: Forwarded to ``_print_runs``.
    """
    finished_run = client.wait_for_run_completion(run_id, timeout).run
    _print_runs([finished_run], output_format)
|
|
|
|
|
|
def _experiment_id_of(run: kfp_server_api.ApiRun) -> str:
    """Return the ID of the experiment referenced by ``run``, or ''."""
    # ``or ()`` covers a run whose ``resource_references`` is None.
    for reference in run.resource_references or ():
        if reference.key.type == kfp_server_api.ApiResourceType.EXPERIMENT:
            return reference.key.id
    return ''


def _print_runs(runs: List[kfp_server_api.ApiRun], output_format: OutputFormat):
    """Print one row per run using ``print_output`` with a grid table format.

    Each row holds the run's id, name, status, creation time in ISO-8601
    form, and the ID of its first EXPERIMENT resource reference. A run with
    no such reference gets an empty experiment id instead of aborting the
    whole listing with ``StopIteration``.

    Args:
        runs: Runs to print.
        output_format: Forwarded to ``print_output``.
    """
    headers = ['run id', 'name', 'status', 'created at', 'experiment id']
    data = [[
        run.id,
        run.name,
        run.status,
        run.created_at.isoformat(),
        _experiment_id_of(run),
    ] for run in runs]
    print_output(data, headers, output_format, table_format='grid')
|