feat(sdk)!: make CLI output consistent, readable, and usable (#7739)

* fix cli upload pipeline version

* organize requirements.in

* generate requirements.txt

* add pip-tools to requirements-dev

* improve CLI output

* update release notes
This commit is contained in:
Connor McCarthy 2022-05-19 18:03:23 -06:00 committed by GitHub
parent 0932905ff5
commit b0db428165
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 499 additions and 383 deletions

View File

@ -2,8 +2,10 @@
## Major Features and Improvements
* feat(sdk): Implement Registry Client [\#7597](https://github.com/kubeflow/pipelines/pull/7597)
* Write compiled JSON with formatting (multiline with indentation) [\#7712](https://github.com/kubeflow/pipelines/pull/7712)
## Breaking Changes
* Make CLI output consistent, readable, and usable [\#7739](https://github.com/kubeflow/pipelines/pull/7739)
### For Pipeline Authors
@ -12,6 +14,7 @@
## Deprecations
## Bug Fixes and Other Changes
* Fix CLI upload pipeline version [\#7722](https://github.com/kubeflow/pipelines/pull/7722)
## Documentation Updates
# Current Version (2.0.0-alpha.3)

View File

@ -136,25 +136,29 @@ def dsl_compile(
f'Failed to parse --pipeline-parameters argument: {pipeline_parameters}'
)
raise e
package_path = os.path.join(os.getcwd(), output)
_compile_pipeline_function(
pipeline_funcs=pipeline_funcs,
function_name=function_name,
pipeline_parameters=parsed_parameters,
package_path=output,
package_path=package_path,
disable_type_check=disable_type_check,
)
finally:
del sys.path[0]
click.echo(package_path)
def main():
logging.basicConfig(format='%(message)s', level=logging.INFO)
try:
dsl_compile.help = '(Deprecated. Please use `kfp dsl compile` instead.)\n\n' + dsl_compile.help
logging.error(
'`dsl-compile` is deprecated. Please use `kfp dsl compile` instead.'
)
click.echo(
'`dsl-compile` is deprecated. Please use `kfp dsl compile` instead.',
err=True)
dsl_compile(obj={}, auto_envvar_prefix='KFP')
except Exception as e:

View File

@ -1,11 +1,6 @@
import json
from typing import List
import click
import kfp_server_api
from kfp import client
from kfp.cli.output import OutputFormat
from kfp.cli.output import print_output
from kfp.cli import output
from kfp.cli.utils import parsing
from kfp_server_api.models.api_experiment import ApiExperiment
@ -26,12 +21,15 @@ def experiment():
@click.pass_context
def create(ctx: click.Context, description: str, name: str):
"""Create an experiment."""
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
experiment = client.create_experiment(name, description=description)
_display_experiment(experiment, output_format)
click.echo(f'Created experiment {experiment.id}.')
experiment = client_obj.create_experiment(name, description=description)
output.print_output(
experiment,
output.ModelType.EXPERIMENT,
output_format,
)
@experiment.command()
@ -55,22 +53,19 @@ def create(ctx: click.Context, description: str, name: str):
def list(ctx: click.Context, page_token: str, max_size: int, sort_by: str,
filter: str):
"""List experiments."""
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
response = client.list_experiments(
response = client_obj.list_experiments(
page_token=page_token,
page_size=max_size,
sort_by=sort_by,
filter=filter)
if response.experiments:
_display_experiments(response.experiments, output_format)
else:
if output_format == OutputFormat.json.name:
msg = json.dumps([])
else:
msg = 'No experiments found'
click.echo(msg)
output.print_output(
response.experiments or [],
output.ModelType.EXPERIMENT,
output_format,
)
@experiment.command()
@ -78,11 +73,15 @@ def list(ctx: click.Context, page_token: str, max_size: int, sort_by: str,
@click.pass_context
def get(ctx: click.Context, experiment_id: str):
"""Get information about an experiment."""
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
response = client.get_experiment(experiment_id)
_display_experiment(response, output_format)
experiment = client_obj.get_experiment(experiment_id)
output.print_output(
experiment,
output.ModelType.EXPERIMENT,
output_format,
)
@experiment.command()
@ -97,34 +96,11 @@ def delete(ctx: click.Context, experiment_id: str):
if not click.confirm(confirmation):
return
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
client.delete_experiment(experiment_id)
click.echo(f'Deleted experiment {experiment_id}.')
def _display_experiments(experiments: List[ApiExperiment],
output_format: OutputFormat):
headers = ['Experiment ID', 'Name', 'Created at']
data = [
[exp.id, exp.name, exp.created_at.isoformat()] for exp in experiments
]
print_output(data, headers, output_format, table_format='grid')
def _display_experiment(exp: kfp_server_api.ApiExperiment,
output_format: OutputFormat):
table = [
['ID', exp.id],
['Name', exp.name],
['Description', exp.description],
['Created at', exp.created_at.isoformat()],
]
if output_format == OutputFormat.table.name:
print_output([], ['Experiment Details'], output_format)
print_output(table, [], output_format, table_format='plain')
elif output_format == OutputFormat.json.name:
print_output(dict(table), [], output_format)
client_obj.delete_experiment(experiment_id)
output.print_deleted_text('experiment', experiment_id, output_format)
either_option_required = 'Either --experiment-id or --experiment-name is required.'
@ -144,18 +120,26 @@ either_option_required = 'Either --experiment-id or --experiment-name is require
@click.pass_context
def archive(ctx: click.Context, experiment_id: str, experiment_name: str):
"""Archive an experiment."""
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
if (experiment_id is None) == (experiment_name is None):
raise ValueError(
'Either --experiment-id or --experiment-name is required.')
raise ValueError(either_option_required)
if not experiment_id:
experiment = client.get_experiment(experiment_name=experiment_name)
experiment = client_obj.get_experiment(experiment_name=experiment_name)
experiment_id = experiment.id
client.archive_experiment(experiment_id=experiment_id)
click.echo(f'Archived experiment {experiment_id}.')
client_obj.archive_experiment(experiment_id=experiment_id)
if experiment_id:
experiment = client_obj.get_experiment(experiment_id=experiment_id)
else:
experiment = client_obj.get_experiment(experiment_name=experiment_name)
output.print_output(
experiment,
output.ModelType.EXPERIMENT,
output_format,
)
@experiment.command()
@ -172,15 +156,23 @@ def archive(ctx: click.Context, experiment_id: str, experiment_name: str):
@click.pass_context
def unarchive(ctx: click.Context, experiment_id: str, experiment_name: str):
"""Unarchive an experiment."""
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
if (experiment_id is None) == (experiment_name is None):
raise ValueError(
'Either --expriment-id or --experiment-name is required.')
raise ValueError(either_option_required)
if not experiment_id:
experiment = client.get_experiment(experiment_name=experiment_name)
experiment = client_obj.get_experiment(experiment_name=experiment_name)
experiment_id = experiment.id
client.unarchive_experiment(experiment_id=experiment_id)
click.echo(f'Unarchived experiment {experiment_id}.')
client_obj.unarchive_experiment(experiment_id=experiment_id)
if experiment_id:
experiment = client_obj.get_experiment(experiment_id=experiment_id)
else:
experiment = client_obj.get_experiment(experiment_name=experiment_name)
output.print_output(
experiment,
output.ModelType.EXPERIMENT,
output_format,
)

View File

@ -1,4 +1,4 @@
# Copyright 2020 The Kubeflow Authors
# Copyright 2020-2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -12,46 +12,260 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
import datetime
import enum
import json
from enum import Enum, unique
from typing import Union
from typing import Any, Dict, Union
import click
from tabulate import tabulate
import kfp_server_api
import tabulate
KFP_TABLE_FORMAT = 'custom-simple'
tabulate._table_formats.update({ # type: ignore
KFP_TABLE_FORMAT:
tabulate.TableFormat(
lineabove=None,
linebelowheader=None,
linebetweenrows=None,
linebelow=None,
headerrow=tabulate.DataRow('', ' ', ''),
datarow=tabulate.DataRow('', ' ', ''),
padding=0,
with_header_hide=['lineabove', 'linebelow'])
})
@unique
class OutputFormat(Enum):
@enum.unique
class OutputFormat(enum.Enum):
"""Enumerated class with the allowed output format constants."""
table = "table"
json = "json"
table = 'table'
json = 'json'
def print_output(data: Union[list, dict],
headers: list,
output_format: OutputFormat,
table_format: str = "simple"):
"""Prints the output from the cli command execution based on the specified
format.
RUN_STORAGE_STATE_MAP = {
kfp_server_api.ApiRunStorageState.AVAILABLE: 'Available',
kfp_server_api.ApiRunStorageState.ARCHIVED: 'Archived',
}
EXPERIMENT_STORAGE_STATE_MAP = {
kfp_server_api.ApiExperimentStorageState.AVAILABLE: 'Available',
kfp_server_api.ApiExperimentStorageState.ARCHIVED: 'Archived',
kfp_server_api.ApiExperimentStorageState.UNSPECIFIED: 'Unspecified',
}
def snake_to_header(string: str) -> str:
"""Converts a snake case string to a table header by replacing underscores
with spaces and making uppercase.
Args:
data (Union[list, dict]): Nested list of values representing the rows to be printed.
headers (list): List of values representing the column names to be printed
for the ``data``.
output_format (str): The desired formatting of the text from the command output.
table_format (str): The formatting for the table ``output_format``.
Default value set to ``simple``.
string (str): The snake case string.
Returns:
None: Prints the output.
str: The header.
"""
return string.replace('_', ' ').upper()
@dataclasses.dataclass
class ExperimentData:
id: str
name: str
created_at: str
state: str
def transform_experiment(exp: kfp_server_api.ApiExperiment) -> Dict[str, Any]:
return dataclasses.asdict(
ExperimentData(
id=exp.id,
name=exp.name,
created_at=exp.created_at.isoformat(),
state=EXPERIMENT_STORAGE_STATE_MAP.get(
exp.storage_state, EXPERIMENT_STORAGE_STATE_MAP[
kfp_server_api.ApiExperimentStorageState.AVAILABLE])))
@dataclasses.dataclass
class PipelineData:
id: str
name: str
created_at: str
default_version: str
def transform_pipeline(pipeline: kfp_server_api.ApiPipeline) -> Dict[str, Any]:
default_version_id = pipeline.default_version.id if hasattr(
pipeline,
'default_version') and pipeline.default_version is not None and hasattr(
pipeline.default_version, 'id') else None
return dataclasses.asdict(
PipelineData(
id=pipeline.id,
name=pipeline.name,
created_at=pipeline.created_at.isoformat(),
default_version=default_version_id))
@dataclasses.dataclass
class PipelineVersionData:
id: str
name: str
created_at: str
parent_id: str
def transform_pipeline_version(
pipeline_version: kfp_server_api.ApiPipelineVersion) -> Dict[str, Any]:
parent_id = next(
rr for rr in pipeline_version.resource_references
if rr.relationship == kfp_server_api.ApiRelationship.OWNER).key.id
return dataclasses.asdict(
PipelineVersionData(
id=pipeline_version.id,
name=pipeline_version.name,
created_at=pipeline_version.created_at.isoformat(),
parent_id=parent_id,
))
@dataclasses.dataclass
class RunData:
id: str
name: str
created_at: str
status: str
state: str
def transform_run(
run: Union[kfp_server_api.ApiRun, kfp_server_api.ApiRunDetail]
) -> Dict[str, Any]:
return dataclasses.asdict((RunData(
id=run.id,
name=run.name,
created_at=run.created_at.isoformat(),
status=run.status,
state=RUN_STORAGE_STATE_MAP.get(
run.storage_state,
RUN_STORAGE_STATE_MAP[kfp_server_api.ApiRunStorageState.AVAILABLE]))
))
@dataclasses.dataclass
class JobData:
id: str
name: str
created_at: str
experiment_id: str
status: str
def transform_job(recurring_run: kfp_server_api.ApiJob) -> Dict[str, Any]:
experiment_id = next(
rr for rr in recurring_run.resource_references
if rr.key.type == kfp_server_api.ApiResourceType.EXPERIMENT).key.id
return dataclasses.asdict(
JobData(
id=recurring_run.id,
name=recurring_run.name,
created_at=recurring_run.created_at.isoformat(),
experiment_id=experiment_id,
status=recurring_run.status))
@enum.unique
class ModelType(enum.Enum):
"""Enumerated class with the allowed output format constants."""
EXPERIMENT = 'EXPERIMENT'
PIPELINE = 'PIPELINE'
PIPELINE_VERSION = 'PIPELINE_VERSION'
RUN = 'RUN'
JOB = 'JOB'
transformer_map = {
ModelType.EXPERIMENT: transform_experiment,
ModelType.PIPELINE: transform_pipeline,
ModelType.PIPELINE_VERSION: transform_pipeline_version,
ModelType.RUN: transform_run,
ModelType.JOB: transform_job,
}
dataclass_map = {
ModelType.EXPERIMENT: ExperimentData,
ModelType.PIPELINE: PipelineData,
ModelType.PIPELINE_VERSION: PipelineVersionData,
ModelType.RUN: RunData,
ModelType.JOB: JobData,
}
class DatetimeEncoder(json.JSONEncoder):
"""JSON encoder for serializing datetime objects."""
def default(self, obj: Any) -> Any:
if isinstance(obj, datetime.datetime):
return obj.isoformat()
return json.JSONEncoder.default(self, obj)
def print_output(resources: list, model_type: ModelType,
output_format: str) -> None:
"""Prints output in tabular or JSON format, using click.echo.
Args:
resources (list): List of same-type resources to print.
output_format (str): One of 'table' or 'json'.
Raises:
NotImplementedError: If the ``output_format`` is unknown.
NotImplementedError: If the output format is not one of 'table' or 'json'.
"""
if isinstance(resources, list):
single_resource = False
else:
resources = [resources]
single_resource = True
if output_format == OutputFormat.table.name:
transformer = transformer_map[model_type]
output_headers = dataclass_map[ # type: ignore
model_type].__dataclass_fields__.keys()
resources = [transformer(r) for r in resources]
data = [list(resource.values()) for resource in resources]
headers = [snake_to_header(header) for header in output_headers]
click.echo(
tabulate.tabulate(data, headers=headers, tablefmt='custom-simple'))
elif output_format == OutputFormat.json.name:
data = resources[0].to_dict() if single_resource else [
resources.to_dict() for resources in resources
]
click.echo(json.dumps(data, indent=2, cls=DatetimeEncoder), nl=False)
else:
raise NotImplementedError(f'Unknown output format: {output_format}.')
def print_deleted_text(resource_type: str, resource_id: str,
output_format: str) -> None:
"""Prints a standardized output for deletion actions, using click.echo.
Args:
resource_type (str): The type of resource (e.g. 'experiment') deleted.
resource_id (str): The ID of the resource deleted.
output_format (str): The format for the output (one of 'table' or 'json').
Raises:
NotImplementedError: If the output format is not one of 'table' or 'json'.
"""
if output_format == OutputFormat.table.name:
click.echo(tabulate(data, headers=headers, tablefmt=table_format))
click.echo(f'{resource_type.capitalize()} {resource_id} deleted.')
elif output_format == OutputFormat.json.name:
output = [dict(zip(headers, row)) for row in data] if headers else data
click.echo(json.dumps(output, indent=4))
click.echo(json.dumps(resource_id, indent=2), nl=False)
else:
raise NotImplementedError(f"Unknown Output Format: {output_format}")
raise NotImplementedError(f'Unknown output format: {output_format}.')

View File

@ -12,15 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from optparse import Option
from typing import Any, Dict, List, Optional, Union
from typing import Optional
import click
import kfp_server_api
from kfp import client
from kfp.cli.output import OutputFormat
from kfp.cli.output import print_output
from kfp.cli import output
from kfp.cli.utils import deprecated_alias_group
from kfp.cli.utils import parsing
@ -52,12 +48,16 @@ def create(ctx: click.Context,
package_file: str,
description: str = None):
"""Upload a pipeline."""
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
pipeline = client.upload_pipeline(package_file, pipeline_name, description)
_display_pipeline(pipeline, output_format)
click.echo(f'Created pipeline {pipeline.id}.')
pipeline = client_obj.upload_pipeline(package_file, pipeline_name,
description)
output.print_output(
pipeline,
output.ModelType.PIPELINE,
output_format,
)
either_option_required = 'Either --pipeline-id or --pipeline-name is required.'
@ -98,23 +98,26 @@ def create_version(ctx: click.Context,
pipeline_name: Optional[str] = None,
description: Optional[str] = None):
"""Upload a version of a pipeline."""
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
if bool(pipeline_id) == bool(pipeline_name):
raise ValueError(either_option_required)
if pipeline_name is not None:
pipeline_id = client.get_pipeline_id(name=pipeline_name)
pipeline_id = client_obj.get_pipeline_id(name=pipeline_name)
if pipeline_id is None:
raise ValueError(
f"Can't find a pipeline with name: {pipeline_name}")
version = client.upload_pipeline_version(
version = client_obj.upload_pipeline_version(
pipeline_package_path=package_file,
pipeline_version_name=pipeline_version,
pipeline_id=pipeline_id,
pipeline_name=pipeline_name,
description=description)
_display_pipeline_version(version, output_format)
click.echo(f'Created pipeline version {version.id}.')
output.print_output(
version,
output.ModelType.PIPELINE,
output_format,
)
@pipeline.command()
@ -138,22 +141,19 @@ def create_version(ctx: click.Context,
def list(ctx: click.Context, page_token: str, max_size: int, sort_by: str,
filter: str):
"""List pipelines."""
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
response = client.list_pipelines(
response = client_obj.list_pipelines(
page_token=page_token,
page_size=max_size,
sort_by=sort_by,
filter=filter)
if response.pipelines:
_print_pipelines(response.pipelines, output_format)
else:
if output_format == OutputFormat.json.name:
msg = json.dumps([])
else:
msg = 'No pipelines found'
click.echo(msg)
output.print_output(
response.pipelines or [],
output.ModelType.PIPELINE,
output_format,
)
@pipeline.command()
@ -182,48 +182,36 @@ def list(ctx: click.Context, page_token: str, max_size: int, sort_by: str,
def list_versions(ctx: click.Context, pipeline_id: str, page_token: str,
max_size: int, sort_by: str, filter: str):
"""List versions of a pipeline."""
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
response = client.list_pipeline_versions(
response = client_obj.list_pipeline_versions(
pipeline_id,
page_token=page_token,
page_size=max_size,
sort_by=sort_by,
filter=filter)
if response.versions:
_print_pipeline_versions(response.versions, output_format)
else:
if output_format == OutputFormat.json.name:
msg = json.dumps([])
else:
msg = 'No pipeline or version found'
click.echo(msg)
output.print_output(
response.versions or [],
output.ModelType.PIPELINE,
output_format,
)
@pipeline.command()
@click.argument('version-id')
@click.pass_context
def delete_version(ctx: click.Context, version_id: str):
"""Delete a version of a pipeline.
Args:
version_id: id of the pipeline version.
Returns:
Object. If the method is called asynchronously, returns the request thread.
Throws:
Exception if pipeline version is not found.
"""
"""Delete a version of a pipeline."""
confirmation = f'Are you sure you want to delete pipeline version {version_id}?'
if not click.confirm(confirmation):
return
client = ctx.obj['client']
res = client.delete_pipeline_version(version_id)
click.echo(f'Deleted pipeline version {version_id}.')
return res
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
client_obj.delete_pipeline_version(version_id)
output.print_deleted_text('pipeline version', version_id, output_format)
@pipeline.command()
@ -231,11 +219,15 @@ def delete_version(ctx: click.Context, version_id: str):
@click.pass_context
def get(ctx: click.Context, pipeline_id: str):
"""Get information about a pipeline."""
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
pipeline = client.get_pipeline(pipeline_id)
_display_pipeline(pipeline, output_format)
pipeline = client_obj.get_pipeline(pipeline_id)
output.print_output(
pipeline,
output.ModelType.PIPELINE,
output_format,
)
@pipeline.command()
@ -243,11 +235,15 @@ def get(ctx: click.Context, pipeline_id: str):
@click.pass_context
def get_version(ctx: click.Context, version_id: str):
"""Get information about a version of a pipeline."""
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
version = client.get_pipeline_version(version_id=version_id)
_display_pipeline_version(version, output_format)
version = client_obj.get_pipeline_version(version_id=version_id)
output.print_output(
version,
output.ModelType.PIPELINE,
output_format,
)
@pipeline.command()
@ -255,75 +251,12 @@ def get_version(ctx: click.Context, version_id: str):
@click.pass_context
def delete(ctx: click.Context, pipeline_id: str):
"""Delete a pipeline."""
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
confirmation = f'Are you sure you want to delete pipeline {pipeline_id}?'
if not click.confirm(confirmation):
return
client.delete_pipeline(pipeline_id)
click.echo(f'Deleted pipeline {pipeline_id}.')
def _print_pipelines(pipelines: List[kfp_server_api.ApiPipeline],
output_format: OutputFormat):
headers = ['Pipeline ID', 'Name', 'Uploaded at']
data = [[pipeline.id, pipeline.name,
pipeline.created_at.isoformat()] for pipeline in pipelines]
print_output(data, headers, output_format, table_format='grid')
def _print_pipeline_versions(versions: List[kfp_server_api.ApiPipelineVersion],
output_format: OutputFormat):
headers = ['Version ID', 'Version name', 'Uploaded at', 'Pipeline ID']
data = [[
version.id, version.name,
version.created_at.isoformat(),
next(rr
for rr in version.resource_references
if rr.key.type == kfp_server_api.ApiResourceType.PIPELINE).key.id
]
for version in versions]
print_output(data, headers, output_format, table_format='grid')
def _display_pipeline(pipeline: kfp_server_api.ApiPipeline,
output_format: OutputFormat):
# Pipeline information
table = [['Pipeline ID', pipeline.id], ['Name', pipeline.name],
['Description', pipeline.description],
['Uploaded at', pipeline.created_at.isoformat()],
['Version ID', pipeline.default_version.id]]
# Pipeline parameter details
headers = ['Parameter Name', 'Default Value']
data = []
if pipeline.parameters is not None:
data = [[param.name, param.value] for param in pipeline.parameters]
if output_format == OutputFormat.table.name:
print_output([], ['Pipeline Details'], output_format)
print_output(table, [], output_format, table_format='plain')
print_output(data, headers, output_format, table_format='grid')
elif output_format == OutputFormat.json.name:
OutputType = Dict[str, Union[Dict[str, str], List[Dict[str, Any]]]]
output: OutputType = {'Pipeline Details': dict(table)}
params = [dict(zip(headers, item)) for item in data]
output['Pipeline Parameters'] = params
print_output(output, [], output_format)
def _display_pipeline_version(version: kfp_server_api.ApiPipelineVersion,
output_format: OutputFormat):
pipeline_id = next(
rr for rr in version.resource_references
if rr.key.type == kfp_server_api.ApiResourceType.PIPELINE).key.id
table = [['Pipeline ID', pipeline_id], ['Version name', version.name],
['Uploaded at', version.created_at.isoformat()],
['Version ID', version.id]]
if output_format == OutputFormat.table.name:
print_output([], ['Pipeline Version Details'], output_format)
print_output(table, [], output_format, table_format='plain')
elif output_format == OutputFormat.json.name:
print_output(dict(table), [], output_format)
client_obj.delete_pipeline(pipeline_id)
output.print_deleted_text('pipeline', pipeline_id, output_format)

View File

@ -12,14 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import Any, Dict, List, Optional
import click
import kfp_server_api
from kfp import client
from kfp.cli.output import OutputFormat
from kfp.cli.output import print_output
from kfp.cli import output
from kfp.cli.utils import parsing
@ -121,7 +118,7 @@ def create(ctx: click.Context,
version_id: Optional[str] = None,
args: Optional[List[str]] = None):
"""Create a recurring run."""
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
if enable_caching is not None:
@ -138,14 +135,14 @@ def create(ctx: click.Context,
if (experiment_id is None) == (experiment_name is None):
raise ValueError(either_option_required)
if not experiment_id:
experiment = client.create_experiment(experiment_name)
experiment = client_obj.create_experiment(experiment_name)
experiment_id = experiment.id
# Ensure we only split on the first equals char so the value can contain
# equals signs too.
split_args: List = [arg.split('=', 1) for arg in args or []]
params: Dict[str, Any] = dict(split_args)
recurring_run = client.create_recurring_run(
recurring_run = client_obj.create_recurring_run(
cron_expression=cron_expression,
description=description,
enabled=enabled,
@ -161,8 +158,11 @@ def create(ctx: click.Context,
pipeline_id=pipeline_id,
start_time=start_time,
version_id=version_id)
_display_recurring_run(recurring_run, output_format)
click.echo(f'Created job {recurring_run.id}.')
output.print_output(
recurring_run,
output.ModelType.JOB,
output_format,
)
@recurring_run.command()
@ -193,23 +193,20 @@ def create(ctx: click.Context,
def list(ctx: click.Context, experiment_id: str, page_token: str, max_size: int,
sort_by: str, filter: str):
"""List recurring runs."""
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
response = client.list_recurring_runs(
response = client_obj.list_recurring_runs(
experiment_id=experiment_id,
page_token=page_token,
page_size=max_size,
sort_by=sort_by,
filter=filter)
if response.jobs:
_display_recurring_runs(response.jobs, output_format)
else:
if output_format == OutputFormat.json.name:
msg = json.dumps([])
else:
msg = 'No recurring runs found'
click.echo(msg)
output.print_output(
response.jobs or [],
output.ModelType.JOB,
output_format,
)
@recurring_run.command()
@ -217,11 +214,15 @@ def list(ctx: click.Context, experiment_id: str, page_token: str, max_size: int,
@click.pass_context
def get(ctx: click.Context, job_id: str):
"""Get information about a recurring run."""
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
response = client.get_recurring_run(job_id)
_display_recurring_run(response, output_format)
recurring_run = client_obj.get_recurring_run(job_id)
output.print_output(
recurring_run,
output.ModelType.JOB,
output_format,
)
@recurring_run.command()
@ -229,12 +230,13 @@ def get(ctx: click.Context, job_id: str):
@click.pass_context
def delete(ctx: click.Context, job_id: str):
"""Delete a recurring run."""
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
confirmation = f'Are you sure you want to delete job {job_id}?'
if not click.confirm(confirmation):
return
client.delete_job(job_id)
click.echo(f'Deleted job {job_id}.')
client_obj.delete_job(job_id)
output.print_deleted_text('job', job_id, output_format)
@recurring_run.command()
@ -242,9 +244,17 @@ def delete(ctx: click.Context, job_id: str):
@click.pass_context
def enable(ctx: click.Context, job_id: str):
"""Enable a recurring run."""
client = ctx.obj['client']
client.enable_job(job_id=job_id)
click.echo(f'Enabled job {job_id}.')
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
client_obj.enable_job(job_id=job_id)
# TODO: add wait option, since enable takes time to complete
recurring_run = client_obj.get_recurring_run(job_id=job_id)
output.print_output(
recurring_run,
output.ModelType.JOB,
output_format,
)
@recurring_run.command()
@ -252,26 +262,14 @@ def enable(ctx: click.Context, job_id: str):
@click.pass_context
def disable(ctx: click.Context, job_id: str):
"""Disable a recurring run."""
client = ctx.obj['client']
client.disable_job(job_id=job_id)
click.echo(f'Disabled job {job_id}.')
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
def _display_recurring_runs(recurring_runs: List[kfp_server_api.ApiJob],
output_format: OutputFormat):
headers = ['Recurring Run ID', 'Name']
data = [[rr.id, rr.name] for rr in recurring_runs]
print_output(data, headers, output_format, table_format='grid')
def _display_recurring_run(recurring_run: kfp_server_api.ApiJob,
output_format: OutputFormat):
table = [
['Recurring Run ID', recurring_run.id],
['Name', recurring_run.name],
]
if output_format == OutputFormat.table.name:
print_output([], ['Recurring Run Details'], output_format)
print_output(table, [], output_format, table_format='plain')
elif output_format == OutputFormat.json.name:
print_output(dict(table), [], output_format)
client_obj.disable_job(job_id=job_id)
# TODO: add wait option, since disable takes time to complete
recurring_run = client_obj.get_recurring_run(job_id=job_id)
output.print_output(
recurring_run,
output.ModelType.JOB,
output_format,
)

View File

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import shutil
import subprocess
@ -20,10 +19,8 @@ import time
from typing import List
import click
import kfp_server_api
from kfp import client
from kfp.cli.output import OutputFormat
from kfp.cli.output import print_output
from kfp.cli import output
from kfp.cli.utils import deprecated_alias_group
from kfp.cli.utils import parsing
@ -60,22 +57,19 @@ def run():
def list(ctx: click.Context, experiment_id: str, page_token: str, max_size: int,
sort_by: str, filter: str):
"""List pipeline runs."""
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
response = client.list_runs(
response = client_obj.list_runs(
experiment_id=experiment_id,
page_token=page_token,
page_size=max_size,
sort_by=sort_by,
filter=filter)
if response and response.runs:
_print_runs(response.runs, output_format)
else:
if output_format == OutputFormat.json.name:
msg = json.dumps([])
else:
msg = 'No runs found.'
click.echo(msg)
output.print_output(
response.runs or [],
output.ModelType.RUN,
output_format,
)
@run.command()
@ -121,14 +115,14 @@ def create(ctx: click.Context, experiment_name: str, run_name: str,
package_file: str, pipeline_id: str, pipeline_name: str, watch: bool,
timeout: int, version: str, args: List[str]):
"""Submit a pipeline run."""
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
namespace = ctx.obj['namespace']
output_format = ctx.obj['output']
if not run_name:
run_name = experiment_name
if not pipeline_id and pipeline_name:
pipeline_id = client.get_pipeline_id(name=pipeline_name)
pipeline_id = client_obj.get_pipeline_id(name=pipeline_name)
if not package_file and not pipeline_id and not version:
click.echo(
@ -138,8 +132,8 @@ def create(ctx: click.Context, experiment_name: str, run_name: str,
arg_dict = dict(arg.split('=', maxsplit=1) for arg in args)
experiment = client.create_experiment(experiment_name)
run = client.run_pipeline(
experiment = client_obj.create_experiment(experiment_name)
run = client_obj.run_pipeline(
experiment_id=experiment.id,
job_name=run_name,
pipeline_package_path=package_file,
@ -147,10 +141,14 @@ def create(ctx: click.Context, experiment_name: str, run_name: str,
pipeline_id=pipeline_id,
version_id=version)
if timeout > 0:
_wait_for_run_completion(client, run.id, timeout, output_format)
run_detail = client_obj.wait_for_run_completion(run.id, timeout)
output.print_output(
run_detail.run,
output.ModelType.RUN,
output_format,
)
else:
_display_run(client, namespace, run.id, watch, output_format)
click.echo(f'Created run {run.id}.')
display_run(client_obj, namespace, run.id, watch, output_format)
@run.command()
@ -170,11 +168,15 @@ def create(ctx: click.Context, experiment_name: str, run_name: str,
@click.pass_context
def get(ctx: click.Context, watch: bool, detail: bool, run_id: str):
"""Get information about a pipeline run."""
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
namespace = ctx.obj['namespace']
output_format = ctx.obj['output']
_display_run(client, namespace, run_id, watch, output_format, detail)
if detail:
output_format = 'json'
click.echo(
'The --detail/-d flag is deprecated. Please use --output=json instead.',
err=True)
display_run(client_obj, namespace, run_id, watch, output_format)
@run.command()
@ -182,18 +184,16 @@ def get(ctx: click.Context, watch: bool, detail: bool, run_id: str):
@click.pass_context
def archive(ctx: click.Context, run_id: str):
"""Archive a pipeline run."""
client = ctx.obj['client']
if run_id is None:
click.echo('You must provide a run-id.', err=True)
sys.exit(1)
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
get_response = client.get_run(run_id=run_id)
if get_response.run.storage_state == 'STORAGESTATE_ARCHIVED':
click.echo('Run is already archived.', err=True)
sys.exit(1)
client.archive_run(run_id=run_id)
click.echo(f'Archived run {run_id}.')
client_obj.archive_run(run_id=run_id)
run = client_obj.get_run(run_id=run_id)
output.print_output(
run.run,
output.ModelType.RUN,
output_format,
)
@run.command()
@ -201,18 +201,15 @@ def archive(ctx: click.Context, run_id: str):
@click.pass_context
def unarchive(ctx: click.Context, run_id: str):
"""Unarchive a pipeline run."""
client = ctx.obj['client']
if run_id is None:
click.echo('You must provide a run-id.', err=True)
sys.exit(1)
get_response = client.get_run(run_id=run_id)
if get_response.run.storage_state is None:
click.echo('Run is not archived.', err=True)
sys.exit(1)
client.unarchive_run(run_id=run_id)
click.echo(f'Unarchived run {run_id}.')
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
client_obj.unarchive_run(run_id=run_id)
run = client_obj.get_run(run_id=run_id)
output.print_output(
run.run,
output.ModelType.RUN,
output_format,
)
@run.command()
@ -225,32 +222,22 @@ def delete(ctx: click.Context, run_id: str):
if not click.confirm(confirmation):
return
client = ctx.obj['client']
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
client.delete_run(run_id)
click.echo(f'Deleted run {run_id}.')
client_obj.delete_run(run_id=run_id)
output.print_deleted_text('run', run_id, output_format)
def _display_run(client: client.Client,
namespace: str,
run_id: str,
watch: bool,
output_format: OutputFormat,
detail: bool = False):
def display_run(client: client.Client, namespace: str, run_id: str, watch: bool,
output_format: str):
run = client.get_run(run_id).run
if detail:
data = {
key:
value.isoformat() if isinstance(value, datetime.datetime) else value
for key, value in run.to_dict().items()
if key not in ['pipeline_spec'
] # useless but too much detailed field
}
click.echo(data)
return
_print_runs([run], output_format)
output.print_output(
run,
output.ModelType.RUN,
output_format,
)
if not watch:
return
argo_path = shutil.which('argo')
@ -277,23 +264,8 @@ def _display_run(client: client.Client,
if argo_workflow_name:
subprocess.run(
[argo_path, 'watch', argo_workflow_name, '-n', namespace])
_print_runs([run], output_format)
def _wait_for_run_completion(client: client.Client, run_id: str, timeout: int,
output_format: OutputFormat):
run_detail = client.wait_for_run_completion(run_id, timeout)
_print_runs([run_detail.run], output_format)
def _print_runs(runs: List[kfp_server_api.ApiRun], output_format: OutputFormat):
headers = ['run id', 'name', 'status', 'created at', 'experiment id']
data = [[
run.id, run.name, run.status,
run.created_at.isoformat(),
next(rr
for rr in run.resource_references
if rr.key.type == kfp_server_api.ApiResourceType.EXPERIMENT).key.id
]
for run in runs]
print_output(data, headers, output_format, table_format='grid')
output.print_output(
run,
output.ModelType.RUN,
output_format,
)

View File

@ -1,8 +1,9 @@
docformatter==1.4
mypy==0.941
pip-tools==6.0.0
pylint==2.12.2
types-protobuf==3.19.15
types-PyYAML==6.0.5
types-requests==2.27.14
types-tabulate==0.8.6
yapf==0.32.0
yapf==0.32.0

View File

@ -4,6 +4,7 @@
## kfp ##
absl-py>=0.9,<2
click>=7.1.2,<9
docstring-parser>=0.7.3,<1
# kfp.Client GCP auth
# Pin google-api-core version for the bug fixing in 1.31.5
@ -26,6 +27,7 @@ kubernetes>=8.0.0,<19
protobuf>=3.13.0,<4
PyYAML>=5.3,<6
requests-toolbelt>=0.8.0,<1
tabulate>=0.8.6,<1
## kfp.deprecated ##
cloudpickle>=2.0.0,<3
@ -36,8 +38,6 @@ strip-hints>=0.1.8,<1
uritemplate>=3.0.1,<4
## kfp.deprecated.cli ##
click>=7.1.2,<9
tabulate>=0.8.6,<1
typer>=0.3.2,<1.0
## standard library backports ##

View File

@ -4,20 +4,20 @@
#
# pip-compile --no-emit-index-url requirements.in
#
absl-py==0.11.0
absl-py==1.0.0
# via -r requirements.in
attrs==21.2.0
attrs==21.4.0
# via jsonschema
cachetools==4.2.4
cachetools==5.0.0
# via google-auth
certifi==2021.5.30
certifi==2021.10.8
# via
# kfp-server-api
# kubernetes
# requests
charset-normalizer==2.0.6
charset-normalizer==2.0.12
# via requests
click==8.0.3
click==8.1.3
# via
# -r requirements.in
# typer
@ -25,33 +25,33 @@ cloudpickle==2.0.0
# via -r requirements.in
deprecated==1.2.13
# via -r requirements.in
docstring-parser==0.11
docstring-parser==0.14.1
# via -r requirements.in
fire==0.4.0
# via -r requirements.in
google-api-core==2.6.0
google-api-core==2.7.3
# via
# -r requirements.in
# google-cloud-core
# google-cloud-storage
google-auth==1.35.0
google-auth==2.6.6
# via
# -r requirements.in
# google-api-core
# google-cloud-core
# google-cloud-storage
# kubernetes
google-cloud-core==2.1.0
google-cloud-core==2.3.0
# via google-cloud-storage
google-cloud-storage==2.2.1
google-cloud-storage==2.3.0
# via -r requirements.in
google-crc32c==1.3.0
# via google-resumable-media
google-resumable-media==2.3.2
# via google-cloud-storage
googleapis-common-protos==1.53.0
googleapis-common-protos==1.56.0
# via google-api-core
idna==3.2
idna==3.3
# via requests
importlib-metadata==4.11.3
# via
@ -61,13 +61,13 @@ jsonschema==3.2.0
# via -r requirements.in
kfp-pipeline-spec==0.1.14
# via -r requirements.in
kfp-server-api==2.0.0a0
kfp-server-api==2.0.0a2
# via -r requirements.in
kubernetes==18.20.0
# via -r requirements.in
oauthlib==3.1.1
oauthlib==3.2.0
# via requests-oauthlib
protobuf==3.17.3
protobuf==3.20.1
# via
# -r requirements.in
# google-api-core
@ -80,7 +80,7 @@ pyasn1==0.4.8
# rsa
pyasn1-modules==0.2.8
# via google-auth
pyrsistent==0.18.0
pyrsistent==0.18.1
# via jsonschema
python-dateutil==2.8.2
# via
@ -90,18 +90,18 @@ pyyaml==5.4.1
# via
# -r requirements.in
# kubernetes
requests==2.26.0
requests==2.27.1
# via
# google-api-core
# google-cloud-storage
# kubernetes
# requests-oauthlib
# requests-toolbelt
requests-oauthlib==1.3.0
requests-oauthlib==1.3.1
# via kubernetes
requests-toolbelt==0.9.1
# via -r requirements.in
rsa==4.7.2
rsa==4.8
# via google-auth
six==1.16.0
# via
@ -111,7 +111,6 @@ six==1.16.0
# jsonschema
# kfp-server-api
# kubernetes
# protobuf
# python-dateutil
strip-hints==0.1.10
# via -r requirements.in
@ -119,24 +118,24 @@ tabulate==0.8.9
# via -r requirements.in
termcolor==1.1.0
# via fire
typer==0.4.0
typer==0.4.1
# via -r requirements.in
typing-extensions==3.10.0.2 ; python_version < "3.9"
typing-extensions==4.2.0 ; python_version < "3.9"
# via
# -r requirements.in
# importlib-metadata
uritemplate==3.0.1
# via -r requirements.in
urllib3==1.26.7
urllib3==1.26.9
# via
# kfp-server-api
# kubernetes
# requests
websocket-client==1.2.1
websocket-client==1.3.2
# via kubernetes
wheel==0.37.0
wheel==0.37.1
# via strip-hints
wrapt==1.13.1
wrapt==1.14.1
# via deprecated
zipp==3.8.0
# via importlib-metadata