215 lines
7.4 KiB
Python
215 lines
7.4 KiB
Python
# Copyright 2021 The Kubeflow Authors
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from typing import Any, Dict, List, Optional
|
|
import json
|
|
import click
|
|
from kfp.deprecated.cli.output import print_output, OutputFormat
|
|
import kfp_server_api
|
|
|
|
|
|
@click.group()
def recurring_run():
    """Manage recurring-run resources."""
    # Container group only: the sub-commands below attach themselves through
    # ``@recurring_run.command()``. The docstring above is what click shows
    # as this group's help text, so keep it short and user-facing.
|
|
|
|
|
|
@recurring_run.command()
@click.option(
    '--catchup/--no-catchup',
    help='Whether the recurring run should catch up if behind schedule.',
    type=bool)
@click.option(
    '--cron-expression',
    help='A cron expression representing a set of times, using 6 space-separated fields, e.g. "0 0 9 ? * 2-6".'
)
@click.option('--description', help='The description of the recurring run.')
@click.option(
    '--enable-caching/--disable-caching',
    help='Optional. Whether or not to enable caching for the run.',
    type=bool)
@click.option(
    '--enabled/--disabled',
    help='A bool indicating whether the recurring run is enabled or disabled.',
    type=bool)
@click.option(
    '--end-time',
    help='The RFC3339 time string of the time when to end the job.')
@click.option(
    '--experiment-id',
    help='The ID of the experiment to create the recurring run under, can only supply either an experiment ID or name.'
)
@click.option(
    '--experiment-name',
    help='The name of the experiment to create the recurring run under, can only supply either an experiment ID or name.'
)
@click.option('--job-name', help='The name of the recurring run.')
@click.option(
    '--interval-second',
    help='Integer indicating the seconds between two recurring runs in for a periodic schedule.',
    # Parse as int so click rejects non-numeric input up front and the value
    # handed to the client matches the ``Optional[int]`` annotation below.
    type=int)
@click.option(
    '--max-concurrency',
    help='Integer indicating how many jobs can be run in parallel.',
    type=int)
@click.option(
    '--pipeline-id',
    help='The ID of the pipeline to use to create the recurring run.')
@click.option(
    '--pipeline-package-path',
    help='Local path of the pipeline package(the filename should end with one of the following .tar.gz, .tgz, .zip, .yaml, .yml).'
)
@click.option(
    '--start-time',
    help='The RFC3339 time string of the time when to start the job.')
@click.option('--version-id', help='The id of a pipeline version.')
@click.argument("args", nargs=-1)
@click.pass_context
def create(ctx: click.Context,
           job_name: str,
           experiment_id: Optional[str] = None,
           experiment_name: Optional[str] = None,
           catchup: Optional[bool] = None,
           cron_expression: Optional[str] = None,
           enabled: Optional[bool] = None,
           description: Optional[str] = None,
           enable_caching: Optional[bool] = None,
           end_time: Optional[str] = None,
           interval_second: Optional[int] = None,
           max_concurrency: Optional[int] = None,
           params: Optional[dict] = None,
           pipeline_package_path: Optional[str] = None,
           pipeline_id: Optional[str] = None,
           start_time: Optional[str] = None,
           version_id: Optional[str] = None,
           args: Optional[List[str]] = None):
    """Create a recurring run."""
    # NOTE: the docstring above doubles as the click help text, so the
    # developer-facing documentation lives in comments instead.
    #
    # Exactly one of --experiment-id / --experiment-name must be supplied.
    # When only the name is given, the experiment is obtained through
    # ``client.create_experiment(experiment_name)`` and its ``id`` is used.
    #
    # Positional ARGS are pipeline parameters in ``NAME=VALUE`` form. The
    # ``params`` parameter is not bound to any click option declared above;
    # whatever is passed in is ignored and rebuilt from ARGS.
    #
    # Raises:
    #   ValueError: if neither or both of the experiment identifiers are
    #     given, or if an ARGS entry does not contain an ``=`` separator.
    client = ctx.obj["client"]
    output_format = ctx.obj['output']

    if (experiment_id is None) == (experiment_name is None):
        raise ValueError('Either experiment_id or experiment_name is required')
    if not experiment_id:
        experiment = client.create_experiment(experiment_name)
        experiment_id = experiment.id

    # Split on the first equals char only, so the value itself can contain
    # equals signs. Entries without any "=" used to fail inside ``dict()``
    # with an opaque "dictionary update sequence element" error; report the
    # offending argument explicitly instead (same exception type).
    params = {}
    for arg in args or ():
        name, separator, value = arg.partition("=")
        if not separator:
            raise ValueError(
                f'Invalid pipeline parameter {arg!r}: expected NAME=VALUE')
        params[name] = value

    recurring_run = client.create_recurring_run(
        cron_expression=cron_expression,
        description=description,
        enabled=enabled,
        enable_caching=enable_caching,
        end_time=end_time,
        experiment_id=experiment_id,
        interval_second=interval_second,
        job_name=job_name,
        max_concurrency=max_concurrency,
        # The client takes the inverted flag; an unset/False ``catchup``
        # therefore becomes ``no_catchup=True``.
        no_catchup=not catchup,
        params=params,
        pipeline_package_path=pipeline_package_path,
        pipeline_id=pipeline_id,
        start_time=start_time,
        version_id=version_id)

    _display_recurring_run(recurring_run, output_format)
|
|
|
|
|
|
@recurring_run.command()
@click.option(
    '-e',
    '--experiment-id',
    help='Parent experiment ID of listed recurring runs.')
@click.option(
    '--page-token', default='', help="Token for starting of the page.")
@click.option(
    '-m',
    '--max-size',
    default=100,
    help="Max size of the listed recurring runs.")
@click.option(
    '--sort-by',
    default="created_at desc",
    help="Can be '[field_name]', '[field_name] desc'. For example, 'name desc'."
)
@click.option(
    '--filter',
    help=(
        "filter: A url-encoded, JSON-serialized Filter protocol buffer "
        "(see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto))."
    ))
@click.pass_context
def list(ctx: click.Context, experiment_id: str, page_token: str, max_size: int,
         sort_by: str, filter: str):
    """List recurring runs."""
    # The function and ``filter`` parameter names shadow builtins on purpose:
    # click derives the command name and the keyword argument from them.
    api_client = ctx.obj['client']
    fmt = ctx.obj['output']

    listing = api_client.list_recurring_runs(
        experiment_id=experiment_id,
        page_token=page_token,
        page_size=max_size,
        sort_by=sort_by,
        filter=filter)

    if listing.jobs:
        _display_recurring_runs(listing.jobs, fmt)
        return

    # Nothing to show: emit an empty JSON array for JSON consumers, and a
    # human-readable notice for every other output format.
    wants_json = fmt == OutputFormat.json.name
    click.echo(json.dumps([]) if wants_json else "No recurring runs found")
|
|
|
|
|
|
@recurring_run.command()
@click.argument("job-id")
@click.pass_context
def get(ctx: click.Context, job_id: str):
    """Get detailed information about a recurring run."""
    # The docstring is the click help text; it previously said "an
    # experiment", although this command looks up a recurring run by job ID.
    client = ctx.obj["client"]
    output_format = ctx.obj["output"]

    response = client.get_recurring_run(job_id)
    _display_recurring_run(response, output_format)
|
|
|
|
|
|
@recurring_run.command()
@click.argument("job-id")
@click.pass_context
def delete(ctx: click.Context, job_id: str):
    """Delete a recurring run."""
    # Delegates straight to the API client stored on the click context;
    # nothing is printed by this command itself.
    ctx.obj["client"].delete_job(job_id)
|
|
|
|
|
|
def _display_recurring_runs(recurring_runs: List[kfp_server_api.ApiJob],
                            output_format: OutputFormat):
    """Print an ID/name row for each recurring run using ``print_output``.

    Args:
        recurring_runs: Jobs to list; only ``id`` and ``name`` are read.
        output_format: Output format selector forwarded to ``print_output``.
    """
    rows = []
    for job in recurring_runs:
        rows.append([job.id, job.name])
    column_titles = ["Recurring Run ID", "Name"]
    print_output(rows, column_titles, output_format, table_format="grid")
|
|
|
|
|
|
def _display_recurring_run(recurring_run: kfp_server_api.ApiJob,
                           output_format: OutputFormat):
    """Print the ID and name of a single recurring run.

    Args:
        recurring_run: Job to display; only ``id`` and ``name`` are read.
        output_format: Compared against ``OutputFormat.<member>.name``, so
            callers presumably pass the format's string name (verify against
            how ``ctx.obj['output']`` is populated). Any value other than
            the table or json name prints nothing.
    """
    details = [
        ["Recurring Run ID", recurring_run.id],
        ["Name", recurring_run.name],
    ]

    if output_format == OutputFormat.table.name:
        # Table mode: a header-only table as the title, then the key/value
        # rows without a header.
        print_output([], ["Recurring Run Details"], output_format)
        print_output(details, [], output_format, table_format="plain")
        return

    if output_format == OutputFormat.json.name:
        # JSON mode: collapse the key/value rows into a single mapping.
        as_mapping = dict(details)
        print_output(as_mapping, [], output_format)
|