pipelines/sdk/python/kfp/deprecated/cli/recurring_run.py

215 lines
7.4 KiB
Python

# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict, List, Optional
import json
import click
from kfp.deprecated.cli.output import print_output, OutputFormat
import kfp_server_api
@click.group()
def recurring_run():
"""Manage recurring-run resources."""
@recurring_run.command()
@click.option(
'--catchup/--no-catchup',
help='Whether the recurring run should catch up if behind schedule.',
type=bool)
@click.option(
'--cron-expression',
help='A cron expression representing a set of times, using 6 space-separated fields, e.g. "0 0 9 ? * 2-6".'
)
@click.option('--description', help='The description of the recurring run.')
@click.option(
'--enable-caching/--disable-caching',
help='Optional. Whether or not to enable caching for the run.',
type=bool)
@click.option(
'--enabled/--disabled',
help='A bool indicating whether the recurring run is enabled or disabled.',
type=bool)
@click.option(
'--end-time',
help='The RFC3339 time string of the time when to end the job.')
@click.option(
'--experiment-id',
help='The ID of the experiment to create the recurring run under, can only supply either an experiment ID or name.'
)
@click.option(
'--experiment-name',
help='The name of the experiment to create the recurring run under, can only supply either an experiment ID or name.'
)
@click.option('--job-name', help='The name of the recurring run.')
@click.option(
'--interval-second',
help='Integer indicating the seconds between two recurring runs in for a periodic schedule.'
)
@click.option(
'--max-concurrency',
help='Integer indicating how many jobs can be run in parallel.',
type=int)
@click.option(
'--pipeline-id',
help='The ID of the pipeline to use to create the recurring run.')
@click.option(
'--pipeline-package-path',
help='Local path of the pipeline package(the filename should end with one of the following .tar.gz, .tgz, .zip, .yaml, .yml).'
)
@click.option(
'--start-time',
help='The RFC3339 time string of the time when to start the job.')
@click.option('--version-id', help='The id of a pipeline version.')
@click.argument("args", nargs=-1)
@click.pass_context
def create(ctx: click.Context,
job_name: str,
experiment_id: Optional[str] = None,
experiment_name: Optional[str] = None,
catchup: Optional[bool] = None,
cron_expression: Optional[str] = None,
enabled: Optional[bool] = None,
description: Optional[str] = None,
enable_caching: Optional[bool] = None,
end_time: Optional[str] = None,
interval_second: Optional[int] = None,
max_concurrency: Optional[int] = None,
params: Optional[dict] = None,
pipeline_package_path: Optional[str] = None,
pipeline_id: Optional[str] = None,
start_time: Optional[str] = None,
version_id: Optional[str] = None,
args: Optional[List[str]] = None):
"""Create a recurring run."""
client = ctx.obj["client"]
output_format = ctx.obj['output']
if (experiment_id is None) == (experiment_name is None):
raise ValueError('Either experiment_id or experiment_name is required')
if not experiment_id:
experiment = client.create_experiment(experiment_name)
experiment_id = experiment.id
# Ensure we only split on the first equals char so the value can contain
# equals signs too.
split_args: List = [arg.split("=", 1) for arg in args or []]
params: Dict[str, Any] = dict(split_args)
recurring_run = client.create_recurring_run(
cron_expression=cron_expression,
description=description,
enabled=enabled,
enable_caching=enable_caching,
end_time=end_time,
experiment_id=experiment_id,
interval_second=interval_second,
job_name=job_name,
max_concurrency=max_concurrency,
no_catchup=not catchup,
params=params,
pipeline_package_path=pipeline_package_path,
pipeline_id=pipeline_id,
start_time=start_time,
version_id=version_id)
_display_recurring_run(recurring_run, output_format)
@recurring_run.command()
@click.option(
'-e',
'--experiment-id',
help='Parent experiment ID of listed recurring runs.')
@click.option(
'--page-token', default='', help="Token for starting of the page.")
@click.option(
'-m',
'--max-size',
default=100,
help="Max size of the listed recurring runs.")
@click.option(
'--sort-by',
default="created_at desc",
help="Can be '[field_name]', '[field_name] desc'. For example, 'name desc'."
)
@click.option(
'--filter',
help=(
"filter: A url-encoded, JSON-serialized Filter protocol buffer "
"(see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto))."
))
@click.pass_context
def list(ctx: click.Context, experiment_id: str, page_token: str, max_size: int,
sort_by: str, filter: str):
"""List recurring runs."""
client = ctx.obj['client']
output_format = ctx.obj['output']
response = client.list_recurring_runs(
experiment_id=experiment_id,
page_token=page_token,
page_size=max_size,
sort_by=sort_by,
filter=filter)
if response.jobs:
_display_recurring_runs(response.jobs, output_format)
else:
if output_format == OutputFormat.json.name:
msg = json.dumps([])
else:
msg = "No recurring runs found"
click.echo(msg)
@recurring_run.command()
@click.argument("job-id")
@click.pass_context
def get(ctx: click.Context, job_id: str):
"""Get detailed information about an experiment."""
client = ctx.obj["client"]
output_format = ctx.obj["output"]
response = client.get_recurring_run(job_id)
_display_recurring_run(response, output_format)
@recurring_run.command()
@click.argument("job-id")
@click.pass_context
def delete(ctx: click.Context, job_id: str):
"""Delete a recurring run."""
client = ctx.obj["client"]
client.delete_job(job_id)
def _display_recurring_runs(recurring_runs: List[kfp_server_api.ApiJob],
output_format: OutputFormat):
headers = ["Recurring Run ID", "Name"]
data = [[rr.id, rr.name] for rr in recurring_runs]
print_output(data, headers, output_format, table_format="grid")
def _display_recurring_run(recurring_run: kfp_server_api.ApiJob,
output_format: OutputFormat):
table = [
["Recurring Run ID", recurring_run.id],
["Name", recurring_run.name],
]
if output_format == OutputFormat.table.name:
print_output([], ["Recurring Run Details"], output_format)
print_output(table, [], output_format, table_format="plain")
elif output_format == OutputFormat.json.name:
print_output(dict(table), [], output_format)