feat(sdk): Improve CLI experience for archiving experiments, managing recurring runs and listing resources (#6934)

This commit is contained in:
Alex Latchford 2021-12-01 10:07:13 -08:00 committed by GitHub
parent e0d7d1ba5b
commit ddbfcde16f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 522 additions and 111 deletions

View File

@ -8,6 +8,7 @@
* Merge v2 experimental change back to v2 namespace [\#6890](https://github.com/kubeflow/pipelines/pull/6890)
* Add ImporterSpec v2 [\#6917](https://github.com/kubeflow/pipelines/pull/6917)
* Add add set_env_variable for Pipeline task [\#6919](https://github.com/kubeflow/pipelines/pull/6919)
* Improve CLI experience for archiving experiments, managing recurring runs and listing resources [\#6934](https://github.com/kubeflow/pipelines/pull/6934)
## Breaking Changes

View File

@ -372,7 +372,7 @@ class Client(object):
config.refresh_api_key_hook(config)
return config
def set_user_namespace(self, namespace):
def set_user_namespace(self, namespace: str):
"""Set user namespace into local context setting file.
This function should only be used when Kubeflow Pipelines is in the multi-user mode.
@ -386,7 +386,7 @@ class Client(object):
with open(Client.LOCAL_KFP_CONTEXT, 'w') as f:
json.dump(self._context_setting, f)
def get_kfp_healthz(self):
def get_kfp_healthz(self) -> kfp_server_api.ApiGetHealthzResponse:
"""Gets healthz info of KFP deployment.
Returns:
@ -412,7 +412,7 @@ class Client(object):
'Failed to get healthz info attempt {} of 5.'.format(count))
time.sleep(5)
def get_user_namespace(self):
def get_user_namespace(self) -> str:
"""Get user namespace in context config.
Returns:
@ -420,7 +420,11 @@ class Client(object):
"""
return self._context_setting['namespace']
def create_experiment(self, name, description=None, namespace=None):
def create_experiment(
self,
name: str,
description: str = None,
namespace: str = None) -> kfp_server_api.ApiExperiment:
"""Create a new experiment.
Args:
@ -470,7 +474,7 @@ class Client(object):
IPython.display.display(IPython.display.HTML(html))
return experiment
def get_pipeline_id(self, name):
def get_pipeline_id(self, name) -> Optional[str]:
"""Find the id of a pipeline by name.
Args:
@ -497,12 +501,13 @@ class Client(object):
.format(name))
return None
def list_experiments(self,
page_token='',
page_size=10,
sort_by='',
namespace=None,
filter=None):
def list_experiments(
self,
page_token='',
page_size=10,
sort_by='',
namespace=None,
filter=None) -> kfp_server_api.ApiListExperimentsResponse:
"""List experiments.
Args:
@ -532,7 +537,7 @@ class Client(object):
def get_experiment(self,
experiment_id=None,
experiment_name=None,
namespace=None):
namespace=None) -> kfp_server_api.ApiExperiment:
"""Get details of an experiment.
Either experiment_id or experiment_name is required
@ -547,8 +552,8 @@ class Client(object):
Returns:
A response object including details of a experiment.
Throws:
Exception if experiment is not found or None of the arguments is provided
Raises:
kfp_server_api.ApiException: If experiment is not found or None of the arguments is provided
"""
namespace = namespace or self.get_user_namespace()
if experiment_id is None and experiment_name is None:
@ -581,6 +586,17 @@ class Client(object):
experiment_name))
return result.experiments[0]
def archive_experiment(self, experiment_id: str):
"""Archive experiment.
Args:
experiment_id: id of the experiment.
Raises:
kfp_server_api.ApiException: If experiment is not found.
"""
self._experiment_api.archive_experiment(experiment_id)
def delete_experiment(self, experiment_id):
"""Delete experiment.
@ -590,8 +606,8 @@ class Client(object):
Returns:
Object. If the method is called asynchronously, returns the request thread.
Throws:
Exception if experiment is not found.
Raises:
kfp_server_api.ApiException: If experiment is not found.
"""
return self._experiment_api.delete_experiment(id=experiment_id)
@ -643,7 +659,11 @@ class Client(object):
'pipelines.kubeflow.org/enable_caching'] = str(
enable_caching).lower()
def list_pipelines(self, page_token='', page_size=10, sort_by='', filter=None):
def list_pipelines(self,
page_token='',
page_size=10,
sort_by='',
filter=None) -> kfp_server_api.ApiListPipelinesResponse:
"""List pipelines.
Args:
@ -674,7 +694,7 @@ class Client(object):
pipeline_root: Optional[str] = None,
enable_caching: Optional[str] = None,
service_account: Optional[str] = None,
):
) -> kfp_server_api.ApiRun:
"""Run a specified pipeline.
Args:
@ -751,7 +771,7 @@ class Client(object):
enabled: bool = True,
enable_caching: Optional[bool] = None,
service_account: Optional[str] = None,
):
) -> kfp_server_api.ApiJob:
"""Create a recurring run.
Args:
@ -790,7 +810,11 @@ class Client(object):
Returns:
A Job object. Most important field is id.
Raises:
ValueError: If required parameters are not supplied.
"""
job_config = self._create_job_config(
experiment_id=experiment_id,
params=params,
@ -1060,7 +1084,7 @@ class Client(object):
)
return RunPipelineResult(self, run_info)
def delete_job(self, job_id):
def delete_job(self, job_id: str):
"""Deletes a job.
Args:
@ -1070,11 +1094,11 @@ class Client(object):
Object. If the method is called asynchronously, returns the request thread.
Raises:
ApiException: If the job is not found.
kfp_server_api.ApiException: If the job is not found.
"""
return self._job_api.delete_job(id=job_id)
def disable_job(self, job_id):
def disable_job(self, job_id: str):
"""Disables a job.
Args:
@ -1094,7 +1118,7 @@ class Client(object):
sort_by='',
experiment_id=None,
namespace=None,
filter=None):
filter=None) -> kfp_server_api.ApiListRunsResponse:
"""List runs, optionally can be filtered by experiment or namespace.
Args:
@ -1143,7 +1167,7 @@ class Client(object):
page_size=10,
sort_by='',
experiment_id=None,
filter=None):
filter=None) -> kfp_server_api.ApiListJobsResponse:
"""List recurring runs.
Args:
@ -1174,7 +1198,7 @@ class Client(object):
filter=filter)
return response
def get_recurring_run(self, job_id):
def get_recurring_run(self, job_id: str) -> kfp_server_api.ApiJob:
"""Get recurring_run details.
Args:
@ -1183,12 +1207,12 @@ class Client(object):
Returns:
A response object including details of a recurring_run.
Throws:
Exception if recurring_run is not found.
Raises:
kfp_server_api.ApiException: If recurring_run is not found.
"""
return self._job_api.get_job(id=job_id)
def get_run(self, run_id):
def get_run(self, run_id: str) -> kfp_server_api.ApiRun:
"""Get run details.
Args:
@ -1197,12 +1221,12 @@ class Client(object):
Returns:
A response object including details of a run.
Throws:
Exception if run is not found.
Raises:
kfp_server_api.ApiException: If run is not found.
"""
return self._run_api.get_run(run_id=run_id)
def wait_for_run_completion(self, run_id, timeout):
def wait_for_run_completion(self, run_id: str, timeout: int):
"""Waits for a run to complete.
Args:
@ -1262,7 +1286,7 @@ class Client(object):
pipeline_package_path: str = None,
pipeline_name: str = None,
description: str = None,
):
) -> kfp_server_api.ApiPipeline:
"""Uploads the pipeline to the Kubeflow Pipelines cluster.
Args:
@ -1283,14 +1307,15 @@ class Client(object):
IPython.display.display(IPython.display.HTML(html))
return response
def upload_pipeline_version(self,
pipeline_package_path,
pipeline_version_name: str,
pipeline_id: Optional[str] = None,
pipeline_name: Optional[str] = None,
description: Optional[str] = None):
"""Uploads a new version of the pipeline to the Kubeflow Pipelines
cluster.
def upload_pipeline_version(
self,
pipeline_package_path,
pipeline_version_name: str,
pipeline_id: Optional[str] = None,
pipeline_name: Optional[str] = None,
description: Optional[str] = None,
) -> kfp_server_api.ApiPipelineVersion:
"""Uploads a new version of the pipeline to the Kubeflow Pipelines cluster.
Args:
pipeline_package_path: Local path to the pipeline package.
@ -1298,11 +1323,13 @@ class Client(object):
pipeline_id: Optional. Id of the pipeline.
pipeline_name: Optional. Name of the pipeline.
description: Optional. Description of the pipeline version to be shown in the UI.
Returns:
Server response object containing pipleine id and other information.
Throws:
Raises:
ValueError when none or both of pipeline_id or pipeline_name are specified
Exception if pipeline id is not found.
kfp_server_api.ApiException: If pipeline id is not found.
"""
if all([pipeline_id, pipeline_name
@ -1337,7 +1364,7 @@ class Client(object):
IPython.display.display(IPython.display.HTML(html))
return response
def get_pipeline(self, pipeline_id):
def get_pipeline(self, pipeline_id: str) -> kfp_server_api.ApiPipeline:
"""Get pipeline details.
Args:
@ -1346,8 +1373,8 @@ class Client(object):
Returns:
A response object including details of a pipeline.
Throws:
Exception if pipeline is not found.
Raises:
kfp_server_api.ApiException: If pipeline is not found.
"""
return self._pipelines_api.get_pipeline(id=pipeline_id)
@ -1360,16 +1387,18 @@ class Client(object):
Returns:
Object. If the method is called asynchronously, returns the request thread.
Throws:
Exception if pipeline is not found.
Raises:
kfp_server_api.ApiException: If pipeline is not found.
"""
return self._pipelines_api.delete_pipeline(id=pipeline_id)
def list_pipeline_versions(self,
pipeline_id,
page_token='',
page_size=10,
sort_by=''):
def list_pipeline_versions(
self,
pipeline_id: str,
page_token: str = '',
page_size: int = 10,
sort_by: str = ''
) -> kfp_server_api.ApiListPipelineVersionsResponse:
"""Lists pipeline versions.
Args:
@ -1380,6 +1409,9 @@ class Client(object):
Returns:
A response object including a list of versions and next page token.
Raises:
kfp_server_api.ApiException: If pipeline is not found.
"""
return self._pipelines_api.list_pipeline_versions(
@ -1389,3 +1421,18 @@ class Client(object):
resource_key_type=kfp_server_api.models.api_resource_type
.ApiResourceType.PIPELINE,
resource_key_id=pipeline_id)
def delete_pipeline_version(self, version_id: str):
"""Delete pipeline version.
Args:
version_id: id of the pipeline version.
Returns:
Object. If the method is called asynchronously, returns the request thread.
Raises:
Exception if pipeline version is not found.
"""
return self._pipelines_api.delete_pipeline_version(
version_id=version_id)

View File

@ -20,6 +20,7 @@ import typer
from kfp._client import Client
from kfp.cli.run import run
from kfp.cli.recurring_run import recurring_run
from kfp.cli.pipeline import pipeline
from kfp.cli.diagnose_me_cli import diagnose_me
from kfp.cli.experiment import experiment
@ -49,8 +50,8 @@ from kfp.cli import components
show_default=True,
help='The formatting style for command output.')
@click.pass_context
def cli(ctx, endpoint, iap_client_id, namespace, other_client_id,
other_client_secret, output):
def cli(ctx: click.Context, endpoint: str, iap_client_id: str, namespace: str,
other_client_id: str, other_client_secret: str, output: OutputFormat):
"""kfp is the command line interface to KFP service.
Feature stage:
@ -68,6 +69,7 @@ def cli(ctx, endpoint, iap_client_id, namespace, other_client_id,
def main():
logging.basicConfig(format='%(message)s', level=logging.INFO)
cli.add_command(run)
cli.add_command(recurring_run)
cli.add_command(pipeline)
cli.add_command(diagnose_me, 'diagnose_me')
cli.add_command(experiment)

View File

@ -35,7 +35,8 @@ def diagnose_me():
help='Namespace to use for Kubernetes cluster.all-namespaces is used if not specified.'
)
@click.pass_context
def diagnose_me(ctx, json, project_id, namespace):
def diagnose_me(ctx: click.Context, json: bool, project_id: str,
namespace: str):
"""Runs environment diagnostic with specified parameters.
Feature stage:

View File

@ -1,7 +1,10 @@
import click
import json
from typing import List
from kfp.cli.output import print_output, OutputFormat
import kfp_server_api
from kfp_server_api.models.api_experiment import ApiExperiment
@click.group()
@ -14,7 +17,7 @@ def experiment():
@click.option('-d', '--description', help="Description of the experiment.")
@click.argument("name")
@click.pass_context
def create(ctx, description, name):
def create(ctx: click.Context, description: str, name: str):
"""Create an experiment."""
client = ctx.obj["client"]
output_format = ctx.obj["output"]
@ -24,16 +27,33 @@ def create(ctx, description, name):
@experiment.command()
@click.option(
'--page-token', default='', help="Token for starting of the page.")
@click.option(
'-m', '--max-size', default=100, help="Max size of the listed experiments.")
@click.option(
'--sort-by',
default="created_at desc",
help="Can be '[field_name]', '[field_name] desc'. For example, 'name desc'."
)
@click.option(
'--filter',
help=(
"filter: A url-encoded, JSON-serialized Filter protocol buffer "
"(see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto))."
))
@click.pass_context
def list(ctx, max_size):
def list(ctx: click.Context, page_token: str, max_size: int, sort_by: str,
filter: str):
"""List experiments."""
client = ctx.obj['client']
output_format = ctx.obj['output']
response = client.list_experiments(
page_size=max_size, sort_by="created_at desc")
page_token=page_token,
page_size=max_size,
sort_by=sort_by,
filter=filter)
if response.experiments:
_display_experiments(response.experiments, output_format)
else:
@ -47,7 +67,7 @@ def list(ctx, max_size):
@experiment.command()
@click.argument("experiment-id")
@click.pass_context
def get(ctx, experiment_id):
def get(ctx: click.Context, experiment_id: str):
"""Get detailed information about an experiment."""
client = ctx.obj["client"]
output_format = ctx.obj["output"]
@ -59,7 +79,7 @@ def get(ctx, experiment_id):
@experiment.command()
@click.argument("experiment-id")
@click.pass_context
def delete(ctx, experiment_id):
def delete(ctx: click.Context, experiment_id: str):
"""Delete an experiment."""
confirmation = "Caution. The RunDetails page could have an issue" \
@ -74,7 +94,8 @@ def delete(ctx, experiment_id):
click.echo("{} is deleted.".format(experiment_id))
def _display_experiments(experiments, output_format):
def _display_experiments(experiments: List[ApiExperiment],
output_format: OutputFormat):
headers = ["Experiment ID", "Name", "Created at"]
data = [
[exp.id, exp.name, exp.created_at.isoformat()] for exp in experiments
@ -82,7 +103,8 @@ def _display_experiments(experiments, output_format):
print_output(data, headers, output_format, table_format="grid")
def _display_experiment(exp, output_format):
def _display_experiment(exp: kfp_server_api.ApiExperiment,
output_format: OutputFormat):
table = [
["ID", exp.id],
["Name", exp.name],
@ -94,3 +116,27 @@ def _display_experiment(exp, output_format):
print_output(table, [], output_format, table_format="plain")
elif output_format == OutputFormat.json.name:
print_output(dict(table), [], output_format)
@experiment.command()
@click.option(
"--experiment-id",
default=None,
help="The ID of the experiment to archive, can only supply either an experiment ID or name.")
@click.option(
"--experiment-name",
default=None,
help="The name of the experiment to archive, can only supply either an experiment ID or name.")
@click.pass_context
def archive(ctx: click.Context, experiment_id: str, experiment_name: str):
"""Archive an experiment"""
client = ctx.obj["client"]
if (experiment_id is None) == (experiment_name is None):
raise ValueError('Either experiment_id or experiment_name is required')
if not experiment_id:
experiment = client.get_experiment(experiment_name=experiment_name)
experiment_id = experiment.id
client.archive_experiment(experiment_id=experiment_id)

View File

@ -14,7 +14,9 @@
import click
import json
from typing import List, Optional
import kfp_server_api
from kfp.cli.output import print_output, OutputFormat
@ -29,7 +31,10 @@ def pipeline():
@click.option("-d", "--description", help="Description for the pipeline.")
@click.argument("package-file")
@click.pass_context
def upload(ctx, pipeline_name, package_file, description=None):
def upload(ctx: click.Context,
pipeline_name: str,
package_file: str,
description: str = None):
"""Upload a KFP pipeline."""
client = ctx.obj["client"]
output_format = ctx.obj["output"]
@ -50,11 +55,11 @@ def upload(ctx, pipeline_name, package_file, description=None):
required=True)
@click.argument("package-file")
@click.pass_context
def upload_version(ctx,
package_file,
pipeline_version,
pipeline_id=None,
pipeline_name=None):
def upload_version(ctx: click.Context,
package_file: str,
pipeline_version: str,
pipeline_id: Optional[str] = None,
pipeline_name: Optional[str] = None):
"""Upload a version of the KFP pipeline."""
client = ctx.obj["client"]
output_format = ctx.obj["output"]
@ -72,15 +77,32 @@ def upload_version(ctx,
@pipeline.command()
@click.option(
"-m", "--max-size", default=100, help="Max size of the listed pipelines.")
'--page-token', default='', help="Token for starting of the page.")
@click.option(
'-m', '--max-size', default=100, help="Max size of the listed pipelines.")
@click.option(
'--sort-by',
default="created_at desc",
help="Can be '[field_name]', '[field_name] desc'. For example, 'name desc'."
)
@click.option(
'--filter',
help=(
"filter: A url-encoded, JSON-serialized Filter protocol buffer "
"(see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto))."
))
@click.pass_context
def list(ctx, max_size):
def list(ctx: click.Context, page_token: str, max_size: int, sort_by: str,
filter: str):
"""List uploaded KFP pipelines."""
client = ctx.obj["client"]
output_format = ctx.obj["output"]
response = client.list_pipelines(
page_size=max_size, sort_by="created_at desc")
page_token=page_token,
page_size=max_size,
sort_by=sort_by,
filter=filter)
if response.pipelines:
_print_pipelines(response.pipelines, output_format)
else:
@ -94,15 +116,36 @@ def list(ctx, max_size):
@pipeline.command()
@click.argument("pipeline-id")
@click.option(
"-m", "--max-size", default=10, help="Max size of the listed pipelines.")
'--page-token', default='', help="Token for starting of the page.")
@click.option(
'-m',
'--max-size',
default=100,
help="Max size of the listed pipeline versions.")
@click.option(
'--sort-by',
default="created_at desc",
help="Can be '[field_name]', '[field_name] desc'. For example, 'name desc'."
)
@click.option(
'--filter',
help=(
"filter: A url-encoded, JSON-serialized Filter protocol buffer "
"(see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto))."
))
@click.pass_context
def list_versions(ctx, pipeline_id, max_size):
"""List versions of an uploaded KFP pipeline."""
def list_versions(ctx: click.Context, pipeline_id: str, page_token: str,
max_size: int, sort_by: str, filter: str):
"""List versions of an uploaded KFP pipeline"""
client = ctx.obj["client"]
output_format = ctx.obj["output"]
response = client.list_pipeline_versions(
pipeline_id=pipeline_id, page_size=max_size, sort_by="created_at desc")
pipeline_id,
page_token=page_token,
page_size=max_size,
sort_by=sort_by,
filter=filter)
if response.versions:
_print_pipeline_versions(response.versions, output_format)
else:
@ -113,11 +156,30 @@ def list_versions(ctx, pipeline_id, max_size):
click.echo(msg)
@pipeline.command()
@click.argument("version-id")
@click.pass_context
def delete_version(ctx: click.Context, version_id: str):
"""Delete pipeline version.
Args:
version_id: id of the pipeline version.
Returns:
Object. If the method is called asynchronously, returns the request thread.
Throws:
Exception if pipeline version is not found.
"""
client = ctx.obj["client"]
return client.delete_pipeline_version(version_id)
@pipeline.command()
@click.argument("pipeline-id")
@click.pass_context
def get(ctx, pipeline_id):
"""Get detailed information about an uploaded KFP pipeline."""
def get(ctx: click.Context, pipeline_id: str):
"""Get detailed information about an uploaded KFP pipeline"""
client = ctx.obj["client"]
output_format = ctx.obj["output"]
@ -128,36 +190,43 @@ def get(ctx, pipeline_id):
@pipeline.command()
@click.argument("pipeline-id")
@click.pass_context
def delete(ctx, pipeline_id):
def delete(ctx: click.Context, pipeline_id: str):
"""Delete an uploaded KFP pipeline."""
client = ctx.obj["client"]
client.delete_pipeline(pipeline_id)
click.echo("{} is deleted".format(pipeline_id))
click.echo(f"{pipeline_id} is deleted")
def _print_pipelines(pipelines, output_format):
def _print_pipelines(pipelines: List[kfp_server_api.ApiPipeline],
output_format: OutputFormat):
headers = ["Pipeline ID", "Name", "Uploaded at"]
data = [[pipeline.id, pipeline.name,
pipeline.created_at.isoformat()] for pipeline in pipelines]
print_output(data, headers, output_format, table_format="grid")
def _print_pipeline_versions(versions, output_format):
headers = ["Version ID", "Version name", "Uploaded at"]
data = [[version.id, version.name,
version.created_at.isoformat()] for version in versions]
def _print_pipeline_versions(versions: List[kfp_server_api.ApiPipelineVersion],
output_format: OutputFormat):
headers = ["Version ID", "Version name", "Uploaded at", "Pipeline ID"]
data = [[
version.id, version.name,
version.created_at.isoformat(),
next(rr
for rr in version.resource_references
if rr.key.type == kfp_server_api.ApiResourceType.PIPELINE).id
]
for version in versions]
print_output(data, headers, output_format, table_format="grid")
def _display_pipeline(pipeline, output_format):
def _display_pipeline(pipeline: kfp_server_api.ApiPipeline,
output_format: OutputFormat):
# Pipeline information
table = [
["ID", pipeline.id],
["Name", pipeline.name],
["Description", pipeline.description],
["Uploaded at", pipeline.created_at.isoformat()],
]
table = [["Pipeline ID", pipeline.id], ["Name", pipeline.name],
["Description", pipeline.description],
["Uploaded at", pipeline.created_at.isoformat()],
["Version ID", pipeline.default_version.id]]
# Pipeline parameter details
headers = ["Parameter Name", "Default Value"]
@ -179,13 +248,14 @@ def _display_pipeline(pipeline, output_format):
print_output(output, [], output_format)
def _display_pipeline_version(version, output_format):
pipeline_id = version.resource_references[0].key.id
table = [
["Pipeline ID", pipeline_id],
["Version Name", version.name],
["Uploaded at", version.created_at.isoformat()],
]
def _display_pipeline_version(version: kfp_server_api.ApiPipelineVersion,
output_format: OutputFormat):
pipeline_id = next(
rr for rr in version.resource_references
if rr.key.type == kfp_server_api.ApiResourceType.PIPELINE).id
table = [["Pipeline ID", pipeline_id], ["Version name", version.name],
["Uploaded at", version.created_at.isoformat()],
["Version ID", version.id]]
if output_format == OutputFormat.table.name:
print_output([], ["Pipeline Version Details"], output_format)

View File

@ -0,0 +1,213 @@
# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict, List, Optional
import json
import click
from kfp.cli.output import print_output, OutputFormat
import kfp_server_api
@click.group()
def recurring_run():
"""Manage recurring-run resources"""
pass
@recurring_run.command()
@click.option(
'--catchup/--no-catchup',
help='Whether the recurring run should catch up if behind schedule.',
type=bool)
@click.option(
'--cron-expression',
help='A cron expression representing a set of times, using 6 space-separated fields, e.g. "0 0 9 ? * 2-6".'
)
@click.option('--description', help='The description of the recurring run.')
@click.option(
'--enable-caching/--disable-caching',
help='Optional. Whether or not to enable caching for the run.',
type=bool)
@click.option(
'--enabled/--disabled',
help='A bool indicating whether the recurring run is enabled or disabled.',
type=bool)
@click.option(
'--end-time',
help='The RFC3339 time string of the time when to end the job.')
@click.option(
'--experiment-id',
help='The ID of the experiment to create the recurring run under, can only supply either an experiment ID or name.')
@click.option(
'--experiment-name',
help='The name of the experiment to create the recurring run under, can only supply either an experiment ID or name.')
@click.option('--job-name', help='The name of the recurring run.')
@click.option(
'--interval-second',
help='Integer indicating the seconds between two recurring runs in for a periodic schedule.'
)
@click.option(
'--max-concurrency',
help='Integer indicating how many jobs can be run in parallel.',
type=int)
@click.option(
'--pipeline-id',
help='The ID of the pipeline to use to create the recurring run.')
@click.option(
'--pipeline-package-path',
help='Local path of the pipeline package(the filename should end with one of the following .tar.gz, .tgz, .zip, .yaml, .yml).'
)
@click.option(
'--start-time',
help='The RFC3339 time string of the time when to start the job.')
@click.option('--version-id', help='The id of a pipeline version.')
@click.argument("args", nargs=-1)
@click.pass_context
def create(ctx: click.Context,
job_name: str,
experiment_id: Optional[str] = None,
experiment_name: Optional[str] = None,
catchup: Optional[bool] = None,
cron_expression: Optional[str] = None,
enabled: Optional[bool] = None,
description: Optional[str] = None,
enable_caching: Optional[bool] = None,
end_time: Optional[str] = None,
interval_second: Optional[int] = None,
max_concurrency: Optional[int] = None,
params: Optional[dict] = None,
pipeline_package_path: Optional[str] = None,
pipeline_id: Optional[str] = None,
start_time: Optional[str] = None,
version_id: Optional[str] = None,
args: Optional[List[str]] = None):
"""Create a recurring run."""
client = ctx.obj["client"]
output_format = ctx.obj['output']
if (experiment_id is None) == (experiment_name is None):
raise ValueError('Either experiment_id or experiment_name is required')
if not experiment_id:
experiment = client.create_experiment(experiment_name)
experiment_id = experiment.id
# Ensure we only split on the first equals char so the value can contain
# equals signs too.
split_args: List = [arg.split("=", 1) for arg in args or []]
params: Dict[str, Any] = dict(split_args)
recurring_run = client.create_recurring_run(
cron_expression=cron_expression,
description=description,
enabled=enabled,
enable_caching=enable_caching,
end_time=end_time,
experiment_id=experiment_id,
interval_second=interval_second,
job_name=job_name,
max_concurrency=max_concurrency,
no_catchup=not catchup,
params=params,
pipeline_package_path=pipeline_package_path,
pipeline_id=pipeline_id,
start_time=start_time,
version_id=version_id)
_display_recurring_run(recurring_run, output_format)
@recurring_run.command()
@click.option(
'-e',
'--experiment-id',
help='Parent experiment ID of listed recurring runs.')
@click.option(
'--page-token', default='', help="Token for starting of the page.")
@click.option(
'-m',
'--max-size',
default=100,
help="Max size of the listed recurring runs.")
@click.option(
'--sort-by',
default="created_at desc",
help="Can be '[field_name]', '[field_name] desc'. For example, 'name desc'."
)
@click.option(
'--filter',
help=(
"filter: A url-encoded, JSON-serialized Filter protocol buffer "
"(see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto))."
))
@click.pass_context
def list(ctx: click.Context, experiment_id: str, page_token: str, max_size: int,
sort_by: str, filter: str):
"""List recurring runs"""
client = ctx.obj['client']
output_format = ctx.obj['output']
response = client.list_recurring_runs(
experiment_id=experiment_id,
page_token=page_token,
page_size=max_size,
sort_by=sort_by,
filter=filter)
if response.jobs:
_display_recurring_runs(response.jobs, output_format)
else:
if output_format == OutputFormat.json.name:
msg = json.dumps([])
else:
msg = "No recurring runs found"
click.echo(msg)
@recurring_run.command()
@click.argument("job-id")
@click.pass_context
def get(ctx: click.Context, job_id: str):
"""Get detailed information about an experiment"""
client = ctx.obj["client"]
output_format = ctx.obj["output"]
response = client.get_recurring_run(job_id)
_display_recurring_run(response, output_format)
@recurring_run.command()
@click.argument("job-id")
@click.pass_context
def delete(ctx: click.Context, job_id: str):
"""Delete a recurring run"""
client = ctx.obj["client"]
client.delete_job(job_id)
def _display_recurring_runs(recurring_runs: List[kfp_server_api.ApiJob],
output_format: OutputFormat):
headers = ["Recurring Run ID", "Name"]
data = [[rr.id, rr.name] for rr in recurring_runs]
print_output(data, headers, output_format, table_format="grid")
def _display_recurring_run(recurring_run: kfp_server_api.ApiJob,
output_format: OutputFormat):
table = [
["Recurring Run ID", recurring_run.id],
["Name", recurring_run.name],
]
if output_format == OutputFormat.table.name:
print_output([], ["Recurring Run Details"], output_format)
print_output(table, [], output_format, table_format="plain")
elif output_format == OutputFormat.json.name:
print_output(dict(table), [], output_format)

View File

@ -18,8 +18,10 @@ import json
import click
import shutil
import datetime
from typing import List
import kfp
import kfp_server_api
from kfp._client import Client
from kfp.cli.output import print_output, OutputFormat
@ -33,16 +35,32 @@ def run():
@click.option(
'-e', '--experiment-id', help='Parent experiment ID of listed runs.')
@click.option(
'-m', '--max-size', default=100, help='Max size of the listed runs.')
'--page-token', default='', help="Token for starting of the page.")
@click.option(
'-m', '--max-size', default=100, help="Max size of the listed runs.")
@click.option(
'--sort-by',
default="created_at desc",
help="Can be '[field_name]', '[field_name] desc'. For example, 'name desc'."
)
@click.option(
'--filter',
help=(
"filter: A url-encoded, JSON-serialized Filter protocol buffer "
"(see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto))."
))
@click.pass_context
def list(ctx, experiment_id, max_size):
def list(ctx: click.Context, experiment_id: str, page_token: str, max_size: int,
sort_by: str, filter: str):
"""list recent KFP runs."""
client = ctx.obj['client']
output_format = ctx.obj['output']
response = client.list_runs(
experiment_id=experiment_id,
page_token=page_token,
page_size=max_size,
sort_by='created_at desc')
sort_by=sort_by,
filter=filter)
if response and response.runs:
_print_runs(response.runs, output_format)
else:
@ -82,8 +100,9 @@ def list(ctx, experiment_id, max_size):
type=int)
@click.argument('args', nargs=-1)
@click.pass_context
def submit(ctx, experiment_name, run_name, package_file, pipeline_id,
pipeline_name, watch, timeout, version, args):
def submit(ctx: click.Context, experiment_name: str, run_name: str,
package_file: str, pipeline_id: str, pipeline_name: str, watch: bool,
timeout: int, version: str, args: List[str]):
"""submit a KFP run."""
client = ctx.obj['client']
namespace = ctx.obj['namespace']
@ -131,7 +150,7 @@ def submit(ctx, experiment_name, run_name, package_file, pipeline_id,
help='Get detailed information of the run in json format.')
@click.argument('run-id')
@click.pass_context
def get(ctx, watch, detail, run_id):
def get(ctx: click.Context, watch: bool, detail: bool, run_id: str):
"""display the details of a KFP run."""
client = ctx.obj['client']
namespace = ctx.obj['namespace']
@ -140,7 +159,12 @@ def get(ctx, watch, detail, run_id):
_display_run(client, namespace, run_id, watch, output_format, detail)
def _display_run(client, namespace, run_id, watch, output_format, detail=False):
def _display_run(client: click.Context,
namespace: str,
run_id: str,
watch: bool,
output_format: OutputFormat,
detail: bool = False):
run = client.get_run(run_id).run
if detail:
@ -185,13 +209,20 @@ def _display_run(client, namespace, run_id, watch, output_format, detail=False):
_print_runs([run], output_format)
def _wait_for_run_completion(client, run_id, timeout, output_format):
def _wait_for_run_completion(client: Client, run_id: str, timeout: int,
output_format: OutputFormat):
run_detail = client.wait_for_run_completion(run_id, timeout)
_print_runs([run_detail.run], output_format)
def _print_runs(runs, output_format):
headers = ['run id', 'name', 'status', 'created at']
data = [[run.id, run.name, run.status,
run.created_at.isoformat()] for run in runs]
def _print_runs(runs: List[kfp_server_api.ApiRun], output_format: OutputFormat):
headers = ['run id', 'name', 'status', 'created at', 'experiment id']
data = [[
run.id, run.name, run.status,
run.created_at.isoformat(),
next(rr
for rr in run.resource_references
if rr.key.type == kfp_server_api.ApiResourceType.EXPERIMENT).id
]
for run in runs]
print_output(data, headers, output_format, table_format='grid')