feat(sdk): port cli code from v1 to v2 (#7547)

* port cli

* fix docker mock
This commit is contained in:
Connor McCarthy 2022-04-13 14:19:52 -06:00 committed by GitHub
parent 94960cf5ab
commit d46fafe4ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 2750 additions and 3 deletions

View File

@ -0,0 +1,13 @@
# Copyright 2018 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

82
sdk/python/kfp/cli/cli.py Normal file
View File

@ -0,0 +1,82 @@
# Copyright 2018 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys
import click
import typer
from kfp.cli import components
from kfp.cli.diagnose_me_cli import diagnose_me
from kfp.cli.experiment import experiment
from kfp.cli.output import OutputFormat
from kfp.cli.pipeline import pipeline
from kfp.cli.recurring_run import recurring_run
from kfp.cli.run import run
from kfp.client import Client
_NO_CLIENT_COMMANDS = ['diagnose_me', 'components']
@click.group()
@click.option('--endpoint', help='Endpoint of the KFP API service to connect.')
@click.option('--iap-client-id', help='Client ID for IAP protected endpoint.')
@click.option(
'-n',
'--namespace',
default='kubeflow',
show_default=True,
help='Kubernetes namespace to connect to the KFP API.')
@click.option(
'--other-client-id',
help='Client ID for IAP protected endpoint to obtain the refresh token.')
@click.option(
'--other-client-secret',
help='Client ID for IAP protected endpoint to obtain the refresh token.')
@click.option(
'--output',
type=click.Choice(list(map(lambda x: x.name, OutputFormat))),
default=OutputFormat.table.name,
show_default=True,
help='The formatting style for command output.')
@click.pass_context
def cli(ctx: click.Context, endpoint: str, iap_client_id: str, namespace: str,
other_client_id: str, other_client_secret: str, output: OutputFormat):
"""kfp is the command line interface to KFP service.
Feature stage:
[Alpha](https://github.com/kubeflow/pipelines/blob/07328e5094ac2981d3059314cc848fbb71437a76/docs/release/feature-stages.md#alpha)
"""
if ctx.invoked_subcommand in _NO_CLIENT_COMMANDS:
# Do not create a client for these subcommands
return
ctx.obj['client'] = Client(endpoint, iap_client_id, namespace,
other_client_id, other_client_secret)
ctx.obj['namespace'] = namespace
ctx.obj['output'] = output
def main():
logging.basicConfig(format='%(message)s', level=logging.INFO)
cli.add_command(run)
cli.add_command(recurring_run)
cli.add_command(pipeline)
cli.add_command(diagnose_me, 'diagnose_me')
cli.add_command(experiment)
cli.add_command(typer.main.get_command(components.app))
try:
cli(obj={}, auto_envvar_prefix='KFP')
except Exception as e:
click.echo(str(e), err=True)
sys.exit(1)

View File

@ -0,0 +1,409 @@
# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import configparser
import contextlib
import enum
import pathlib
import shutil
import subprocess
import tempfile
from typing import Any, List, Optional
_DOCKER_IS_PRESENT = True
try:
import docker
except ImportError:
_DOCKER_IS_PRESENT = False
import kfp as kfp
import typer
from kfp.components import component_factory, kfp_config, utils
_REQUIREMENTS_TXT = 'requirements.txt'
_DOCKERFILE = 'Dockerfile'
_DOCKERFILE_TEMPLATE = '''
FROM {base_image}
WORKDIR {component_root_dir}
COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
{maybe_copy_kfp_package}
RUN pip install --no-cache-dir {kfp_package_path}
COPY . .
'''
_DOCKERIGNORE = '.dockerignore'
# Location in which to write out shareable YAML for components.
_COMPONENT_METADATA_DIR = 'component_metadata'
_DOCKERIGNORE_TEMPLATE = '''
{}/
'''.format(_COMPONENT_METADATA_DIR)
# Location at which v2 Python function-based components will stored
# in containerized components.
_COMPONENT_ROOT_DIR = pathlib.Path('/usr/local/src/kfp/components')
@contextlib.contextmanager
def _registered_modules():
registered_modules = {}
component_factory.REGISTERED_MODULES = registered_modules
try:
yield registered_modules
finally:
component_factory.REGISTERED_MODULES = None
class _Engine(str, enum.Enum):
"""Supported container build engines."""
DOCKER = 'docker'
KANIKO = 'kaniko'
CLOUD_BUILD = 'cloudbuild'
app = typer.Typer()
def _info(message: Any):
info = typer.style('INFO', fg=typer.colors.GREEN)
typer.echo('{}: {}'.format(info, message))
def _warning(message: Any):
info = typer.style('WARNING', fg=typer.colors.YELLOW)
typer.echo('{}: {}'.format(info, message))
def _error(message: Any):
info = typer.style('ERROR', fg=typer.colors.RED)
typer.echo('{}: {}'.format(info, message))
class _ComponentBuilder():
"""Helper class for building containerized v2 KFP components."""
def __init__(
self,
context_directory: pathlib.Path,
kfp_package_path: Optional[pathlib.Path] = None,
component_filepattern: str = '**/*.py',
):
"""ComponentBuilder builds containerized components.
Args:
context_directory: Directory containing one or more Python files
with one or more KFP v2 components.
kfp_package_path: Path to a pip-installable location for KFP.
This can either be pointing to KFP SDK root directory located in
a local clone of the KFP repo, or a git+https location.
If left empty, defaults to KFP on PyPi.
"""
self._context_directory = context_directory
self._dockerfile = self._context_directory / _DOCKERFILE
self._component_filepattern = component_filepattern
self._components: List[
component_factory.component_factory.ComponentInfo] = []
# This is only set if we need to install KFP from local copy.
self._maybe_copy_kfp_package = ''
if kfp_package_path is None:
self._kfp_package_path = 'kfp=={}'.format(kfp.__version__)
elif kfp_package_path.is_dir():
_info('Building KFP package from local directory {}'.format(
typer.style(str(kfp_package_path), fg=typer.colors.CYAN)))
temp_dir = pathlib.Path(tempfile.mkdtemp())
try:
subprocess.run([
'python3',
kfp_package_path / 'setup.py',
'bdist_wheel',
'--dist-dir',
str(temp_dir),
],
cwd=kfp_package_path)
wheel_files = list(temp_dir.glob('*.whl'))
if len(wheel_files) != 1:
_error('Failed to find built KFP wheel under {}'.format(
temp_dir))
raise typer.Exit(1)
wheel_file = wheel_files[0]
shutil.copy(wheel_file, self._context_directory)
self._kfp_package_path = wheel_file.name
self._maybe_copy_kfp_package = 'COPY {wheel_name} {wheel_name}'.format(
wheel_name=self._kfp_package_path)
except subprocess.CalledProcessError as e:
_error('Failed to build KFP wheel locally:\n{}'.format(e))
raise typer.Exit(1)
finally:
_info('Cleaning up temporary directory {}'.format(temp_dir))
shutil.rmtree(temp_dir)
else:
self._kfp_package_path = kfp_package_path
_info('Building component using KFP package path: {}'.format(
typer.style(str(self._kfp_package_path), fg=typer.colors.CYAN)))
self._context_directory_files = [
file.name
for file in self._context_directory.glob('*')
if file.is_file()
]
self._component_files = [
file for file in self._context_directory.glob(
self._component_filepattern) if file.is_file()
]
self._base_image = None
self._target_image = None
self._load_components()
def _load_components(self):
if not self._component_files:
_error(
'No component files found matching pattern `{}` in directory {}'
.format(self._component_filepattern, self._context_directory))
raise typer.Exit(1)
for python_file in self._component_files:
with _registered_modules() as component_modules:
module_name = python_file.name[:-len('.py')]
module_directory = python_file.parent
utils.load_module(
module_name=module_name, module_directory=module_directory)
formatted_module_file = typer.style(
str(python_file), fg=typer.colors.CYAN)
if not component_modules:
_error('No KFP components found in file {}'.format(
formatted_module_file))
raise typer.Exit(1)
_info('Found {} component(s) in file {}:'.format(
len(component_modules), formatted_module_file))
for name, component in component_modules.items():
_info('{}: {}'.format(name, component))
self._components.append(component)
base_images = set([info.base_image for info in self._components])
target_images = set([info.target_image for info in self._components])
if len(base_images) != 1:
_error('Found {} unique base_image values {}. Components'
' must specify the same base_image and target_image.'.format(
len(base_images), base_images))
raise typer.Exit(1)
self._base_image = base_images.pop()
if self._base_image is None:
_error('Did not find a base_image specified in any of the'
' components. A base_image must be specified in order to'
' build the component.')
raise typer.Exit(1)
_info('Using base image: {}'.format(
typer.style(self._base_image, fg=typer.colors.YELLOW)))
if len(target_images) != 1:
_error('Found {} unique target_image values {}. Components'
' must specify the same base_image and'
' target_image.'.format(len(target_images), target_images))
raise typer.Exit(1)
self._target_image = target_images.pop()
if self._target_image is None:
_error('Did not find a target_image specified in any of the'
' components. A target_image must be specified in order'
' to build the component.')
raise typer.Exit(1)
_info('Using target image: {}'.format(
typer.style(self._target_image, fg=typer.colors.YELLOW)))
def _maybe_write_file(self,
filename: str,
contents: str,
overwrite: bool = False):
formatted_filename = typer.style(filename, fg=typer.colors.CYAN)
if filename in self._context_directory_files:
_info('Found existing file {} under {}.'.format(
formatted_filename, self._context_directory))
if not overwrite:
_info('Leaving this file untouched.')
return
else:
_warning(
'Overwriting existing file {}'.format(formatted_filename))
else:
_warning('{} not found under {}. Creating one.'.format(
formatted_filename, self._context_directory))
filepath = self._context_directory / filename
with open(filepath, 'w') as f:
f.write('# Generated by KFP.\n{}'.format(contents))
_info('Generated file {}.'.format(filepath))
def maybe_generate_requirements_txt(self):
self._maybe_write_file(_REQUIREMENTS_TXT, '')
def maybe_generate_dockerignore(self):
self._maybe_write_file(_DOCKERIGNORE, _DOCKERIGNORE_TEMPLATE)
def write_component_files(self):
for component_info in self._components:
filename = (
component_info.output_component_file or
component_info.function_name + '.yaml')
container_filename = (
self._context_directory / _COMPONENT_METADATA_DIR / filename)
container_filename.parent.mkdir(exist_ok=True, parents=True)
component_info.component_spec.save_to_component_yaml(
str(container_filename))
def generate_kfp_config(self):
config = kfp_config.KFPConfig(config_directory=self._context_directory)
for component_info in self._components:
relative_path = component_info.module_path.relative_to(
self._context_directory)
config.add_component(
function_name=component_info.function_name, path=relative_path)
config.save()
def maybe_generate_dockerfile(self, overwrite_dockerfile: bool = False):
dockerfile_contents = _DOCKERFILE_TEMPLATE.format(
base_image=self._base_image,
maybe_copy_kfp_package=self._maybe_copy_kfp_package,
component_root_dir=_COMPONENT_ROOT_DIR,
kfp_package_path=self._kfp_package_path)
self._maybe_write_file(_DOCKERFILE, dockerfile_contents,
overwrite_dockerfile)
def build_image(self, push_image: bool = True):
_info('Building image {} using Docker...'.format(
typer.style(self._target_image, fg=typer.colors.YELLOW)))
client = docker.from_env()
docker_log_prefix = typer.style('Docker', fg=typer.colors.CYAN)
try:
context = str(self._context_directory)
logs = client.api.build(
path=context,
dockerfile='Dockerfile',
tag=self._target_image,
decode=True,
)
for log in logs:
message = log.get('stream', '').rstrip('\n')
if message:
_info('{}: {}'.format(docker_log_prefix, message))
except docker.errors.BuildError as e:
for log in e.build_log:
message = log.get('message', '').rstrip('\n')
if message:
_error('{}: {}'.format(docker_log_prefix, message))
_error('{}: {}'.format(docker_log_prefix, e))
raise typer.Exit(1)
if not push_image:
return
_info('Pushing image {}...'.format(
typer.style(self._target_image, fg=typer.colors.YELLOW)))
try:
response = client.images.push(
self._target_image, stream=True, decode=True)
for log in response:
status = log.get('status', '').rstrip('\n')
layer = log.get('id', '')
if status:
_info('{}: {} {}'.format(docker_log_prefix, layer, status))
except docker.errors.BuildError as e:
_error('{}: {}'.format(docker_log_prefix, e))
raise e
_info('Built and pushed component container {}'.format(
typer.style(self._target_image, fg=typer.colors.YELLOW)))
@app.callback()
def components():
"""Builds shareable, containerized components."""
pass
@app.command()
def build(components_directory: pathlib.Path = typer.Argument(
...,
help="Path to a directory containing one or more Python"
" files with KFP v2 components. The container will be built"
" with this directory as the context."),
component_filepattern: str = typer.Option(
'**/*.py',
help="Filepattern to use when searching for KFP components. The"
" default searches all Python files in the specified directory."),
engine: _Engine = typer.Option(
_Engine.DOCKER,
help="Engine to use to build the component's container."),
kfp_package_path: Optional[pathlib.Path] = typer.Option(
None, help="A pip-installable path to the KFP package."),
overwrite_dockerfile: bool = typer.Option(
False,
help="Set this to true to always generate a Dockerfile"
" as part of the build process"),
push_image: bool = typer.Option(
True, help="Push the built image to its remote repository.")):
"""Builds containers for KFP v2 Python-based components."""
components_directory = components_directory.resolve()
if not components_directory.is_dir():
_error('{} does not seem to be a valid directory.'.format(
components_directory))
raise typer.Exit(1)
if engine != _Engine.DOCKER:
_error('Currently, only `docker` is supported for --engine.')
raise typer.Exit(1)
if engine == _Engine.DOCKER:
if not _DOCKER_IS_PRESENT:
_error(
'The `docker` Python package was not found in the current'
' environment. Please run `pip install docker` to install it.'
' Optionally, you can also install KFP with all of its'
' optional dependencies by running `pip install kfp[all]`.')
raise typer.Exit(1)
builder = _ComponentBuilder(
context_directory=components_directory,
kfp_package_path=kfp_package_path,
component_filepattern=component_filepattern,
)
builder.write_component_files()
builder.generate_kfp_config()
builder.maybe_generate_requirements_txt()
builder.maybe_generate_dockerignore()
builder.maybe_generate_dockerfile(overwrite_dockerfile=overwrite_dockerfile)
builder.build_image(push_image=push_image)
if __name__ == '__main__':
app()

View File

@ -0,0 +1,494 @@
# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for `components` command group in KFP CLI."""
import contextlib
import importlib
import pathlib
import sys
import textwrap
import unittest
from typing import List, Optional, Union
from unittest import mock
from typer import testing
# Docker is an optional install, but we need the import to succeed for tests.
# So we patch it before importing kfp.cli.components.
try:
import docker # pylint: disable=unused-import
except ImportError:
sys.modules['docker'] = mock.Mock()
from kfp.cli import components
from kfp.deprecated.cli import components
_COMPONENT_TEMPLATE = '''
from kfp.dsl import *
@component(
base_image={base_image},
target_image={target_image},
output_component_file={output_component_file})
def {func_name}():
pass
'''
def _make_component(func_name: str,
base_image: Optional[str] = None,
target_image: Optional[str] = None,
output_component_file: Optional[str] = None) -> str:
return textwrap.dedent('''
from kfp.dsl import *
@component(
base_image={base_image},
target_image={target_image},
output_component_file={output_component_file})
def {func_name}():
pass
''').format(
base_image=repr(base_image),
target_image=repr(target_image),
output_component_file=repr(output_component_file),
func_name=func_name)
def _write_file(filename: str, file_contents: str):
filepath = pathlib.Path(filename)
filepath.parent.mkdir(exist_ok=True, parents=True)
filepath.write_text(file_contents)
def _write_components(filename: str, component_template: Union[List[str], str]):
if isinstance(component_template, list):
file_contents = '\n\n'.join(component_template)
else:
file_contents = component_template
_write_file(filename=filename, file_contents=file_contents)
class Test(unittest.TestCase):
def setUp(self) -> None:
self._runner = testing.CliRunner()
components._DOCKER_IS_PRESENT = True
patcher = mock.patch('docker.from_env')
self._docker_client = patcher.start().return_value
self._docker_client.images.build.return_value = [{
'stream': 'Build logs'
}]
self._docker_client.images.push.return_value = [{'status': 'Pushed'}]
self.addCleanup(patcher.stop)
self._app = components.app
with contextlib.ExitStack() as stack:
stack.enter_context(self._runner.isolated_filesystem())
self._working_dir = pathlib.Path.cwd()
self.addCleanup(stack.pop_all().close)
return super().setUp()
def assertFileExists(self, path: str):
path_under_test_dir = self._working_dir / path
self.assertTrue(path_under_test_dir, f'File {path} does not exist!')
def assertFileExistsAndContains(self, path: str, expected_content: str):
self.assertFileExists(path)
path_under_test_dir = self._working_dir / path
got_content = path_under_test_dir.read_text()
self.assertEqual(got_content, expected_content)
def testKFPConfigForSingleFile(self):
preprocess_component = _make_component(
func_name='preprocess', target_image='custom-image')
train_component = _make_component(
func_name='train', target_image='custom-image')
_write_components('components.py',
[preprocess_component, train_component])
result = self._runner.invoke(
self._app,
['build', str(self._working_dir)],
)
self.assertEqual(result.exit_code, 0)
self.assertFileExistsAndContains(
'kfp_config.ini',
textwrap.dedent('''\
[Components]
preprocess = components.py
train = components.py
'''))
def testKFPConfigForSingleFileUnderNestedDirectory(self):
preprocess_component = _make_component(
func_name='preprocess', target_image='custom-image')
train_component = _make_component(
func_name='train', target_image='custom-image')
_write_components('dir1/dir2/dir3/components.py',
[preprocess_component, train_component])
result = self._runner.invoke(
self._app,
['build', str(self._working_dir)],
)
self.assertEqual(result.exit_code, 0)
self.assertFileExistsAndContains(
'kfp_config.ini',
textwrap.dedent('''\
[Components]
preprocess = dir1/dir2/dir3/components.py
train = dir1/dir2/dir3/components.py
'''))
def testKFPConfigForMultipleFiles(self):
component = _make_component(
func_name='preprocess', target_image='custom-image')
_write_components('preprocess_component.py', component)
component = _make_component(
func_name='train', target_image='custom-image')
_write_components('train_component.py', component)
result = self._runner.invoke(
self._app,
['build', str(self._working_dir)],
)
self.assertEqual(result.exit_code, 0)
self.assertFileExistsAndContains(
'kfp_config.ini',
textwrap.dedent('''\
[Components]
preprocess = preprocess_component.py
train = train_component.py
'''))
def testKFPConfigForMultipleFilesUnderNestedDirectories(self):
component = _make_component(
func_name='preprocess', target_image='custom-image')
_write_components('preprocess/preprocess_component.py', component)
component = _make_component(
func_name='train', target_image='custom-image')
_write_components('train/train_component.py', component)
result = self._runner.invoke(
self._app,
['build', str(self._working_dir)],
)
self.assertEqual(result.exit_code, 0)
self.assertFileExistsAndContains(
'kfp_config.ini',
textwrap.dedent('''\
[Components]
preprocess = preprocess/preprocess_component.py
train = train/train_component.py
'''))
def testTargetImageMustBeTheSameInAllComponents(self):
component_one = _make_component(func_name='one', target_image='image-1')
component_two = _make_component(func_name='two', target_image='image-1')
_write_components('one_two/one_two.py', [component_one, component_two])
component_three = _make_component(
func_name='three', target_image='image-2')
component_four = _make_component(
func_name='four', target_image='image-3')
_write_components('three_four/three_four.py',
[component_three, component_four])
result = self._runner.invoke(
self._app,
['build', str(self._working_dir)],
)
self.assertEqual(result.exit_code, 1)
def testTargetImageMustBeTheSameInAllComponents(self):
component_one = _make_component(
func_name='one', base_image='image-1', target_image='target-image')
component_two = _make_component(
func_name='two', base_image='image-1', target_image='target-image')
_write_components('one_two/one_two.py', [component_one, component_two])
component_three = _make_component(
func_name='three',
base_image='image-2',
target_image='target-image')
component_four = _make_component(
func_name='four', base_image='image-3', target_image='target-image')
_write_components('three_four/three_four.py',
[component_three, component_four])
result = self._runner.invoke(
self._app,
['build', str(self._working_dir)],
)
self.assertEqual(result.exit_code, 1)
def testComponentFilepatternCanBeUsedToRestrictDiscovery(self):
component = _make_component(
func_name='preprocess', target_image='custom-image')
_write_components('preprocess/preprocess_component.py', component)
component = _make_component(
func_name='train', target_image='custom-image')
_write_components('train/train_component.py', component)
result = self._runner.invoke(
self._app,
[
'build',
str(self._working_dir), '--component-filepattern=train/*'
],
)
self.assertEqual(result.exit_code, 0)
self.assertFileExistsAndContains(
'kfp_config.ini',
textwrap.dedent('''\
[Components]
train = train/train_component.py
'''))
def testEmptyRequirementsTxtFileIsGenerated(self):
component = _make_component(
func_name='train', target_image='custom-image')
_write_components('components.py', component)
result = self._runner.invoke(self._app,
['build', str(self._working_dir)])
self.assertEqual(result.exit_code, 0)
self.assertFileExistsAndContains('requirements.txt',
'# Generated by KFP.\n')
def testExistingRequirementsTxtFileIsUnchanged(self):
component = _make_component(
func_name='train', target_image='custom-image')
_write_components('components.py', component)
_write_file('requirements.txt', 'Some pre-existing content')
result = self._runner.invoke(self._app,
['build', str(self._working_dir)])
self.assertEqual(result.exit_code, 0)
self.assertFileExistsAndContains('requirements.txt',
'Some pre-existing content')
def testDockerignoreFileIsGenerated(self):
component = _make_component(
func_name='train', target_image='custom-image')
_write_components('components.py', component)
result = self._runner.invoke(self._app,
['build', str(self._working_dir)])
self.assertEqual(result.exit_code, 0)
self.assertFileExistsAndContains(
'.dockerignore',
textwrap.dedent('''\
# Generated by KFP.
component_metadata/
'''))
def testExistingDockerignoreFileIsUnchanged(self):
component = _make_component(
func_name='train', target_image='custom-image')
_write_components('components.py', component)
_write_file('.dockerignore', 'Some pre-existing content')
result = self._runner.invoke(self._app,
['build', str(self._working_dir)])
self.assertEqual(result.exit_code, 0)
self.assertFileExistsAndContains('.dockerignore',
'Some pre-existing content')
def testDockerEngineIsSupported(self):
component = _make_component(
func_name='train', target_image='custom-image')
_write_components('components.py', component)
result = self._runner.invoke(
self._app,
['build', str(self._working_dir), '--engine=docker'])
self.assertEqual(result.exit_code, 0)
self._docker_client.api.build.assert_called_once()
self._docker_client.images.push.assert_called_once_with(
'custom-image', stream=True, decode=True)
def testKanikoEngineIsNotSupported(self):
component = _make_component(
func_name='train', target_image='custom-image')
_write_components('components.py', component)
result = self._runner.invoke(
self._app,
['build', str(self._working_dir), '--engine=kaniko'],
)
self.assertEqual(result.exit_code, 1)
self._docker_client.api.build.assert_not_called()
self._docker_client.images.push.assert_not_called()
def testCloudBuildEngineIsNotSupported(self):
component = _make_component(
func_name='train', target_image='custom-image')
_write_components('components.py', component)
result = self._runner.invoke(
self._app,
['build', str(self._working_dir), '--engine=cloudbuild'],
)
self.assertEqual(result.exit_code, 1)
self._docker_client.api.build.assert_not_called()
self._docker_client.images.push.assert_not_called()
def testDockerClientIsCalledToBuildAndPushByDefault(self):
component = _make_component(
func_name='train', target_image='custom-image')
_write_components('components.py', component)
result = self._runner.invoke(
self._app,
['build', str(self._working_dir)],
)
self.assertEqual(result.exit_code, 0)
self._docker_client.api.build.assert_called_once()
self._docker_client.images.push.assert_called_once_with(
'custom-image', stream=True, decode=True)
def testDockerClientIsCalledToBuildButSkipsPushing(self):
component = _make_component(
func_name='train', target_image='custom-image')
_write_components('components.py', component)
result = self._runner.invoke(
self._app,
['build', str(self._working_dir), '--no-push-image'],
)
self.assertEqual(result.exit_code, 0)
self._docker_client.api.build.assert_called_once()
self._docker_client.images.push.assert_not_called()
@mock.patch('kfp.__version__', '1.2.3')
def testDockerfileIsCreatedCorrectly(self):
component = _make_component(
func_name='train', target_image='custom-image')
_write_components('components.py', component)
result = self._runner.invoke(
self._app,
['build', str(self._working_dir)],
)
self.assertEqual(result.exit_code, 0)
self._docker_client.api.build.assert_called_once()
self.assertFileExistsAndContains(
'Dockerfile',
textwrap.dedent('''\
# Generated by KFP.
FROM python:3.7
WORKDIR /usr/local/src/kfp/components
COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir kfp==1.8.11
COPY . .
'''))
def testExistingDockerfileIsUnchangedByDefault(self):
component = _make_component(
func_name='train', target_image='custom-image')
_write_components('components.py', component)
_write_file('Dockerfile', 'Existing Dockerfile contents')
result = self._runner.invoke(
self._app,
['build', str(self._working_dir)],
)
self.assertEqual(result.exit_code, 0)
self._docker_client.api.build.assert_called_once()
self.assertFileExistsAndContains('Dockerfile',
'Existing Dockerfile contents')
@mock.patch('kfp.__version__', '1.2.3')
def testExistingDockerfileCanBeOverwritten(self):
component = _make_component(
func_name='train', target_image='custom-image')
_write_components('components.py', component)
_write_file('Dockerfile', 'Existing Dockerfile contents')
result = self._runner.invoke(
self._app,
['build', str(self._working_dir), '--overwrite-dockerfile'],
)
self.assertEqual(result.exit_code, 0)
self._docker_client.api.build.assert_called_once()
self.assertFileExistsAndContains(
'Dockerfile',
textwrap.dedent('''\
# Generated by KFP.
FROM python:3.7
WORKDIR /usr/local/src/kfp/components
COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir kfp==1.8.11
COPY . .
'''))
def testDockerfileCanContainCustomKFPPackage(self):
component = _make_component(
func_name='train', target_image='custom-image')
_write_components('components.py', component)
result = self._runner.invoke(
self._app,
[
'build',
str(self._working_dir),
'--kfp-package-path=/Some/localdir/containing/kfp/source'
],
)
self.assertEqual(result.exit_code, 0)
self._docker_client.api.build.assert_called_once()
self.assertFileExistsAndContains(
'Dockerfile',
textwrap.dedent('''\
# Generated by KFP.
FROM python:3.7
WORKDIR /usr/local/src/kfp/components
COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir /Some/localdir/containing/kfp/source
COPY . .
'''))
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,13 @@
# Copyright 2019 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License,Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -0,0 +1,72 @@
# Lint as: python3
# Copyright 2019 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License,Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Functions for diagnostic data collection from development development."""
import enum
from kfp.cli.diagnose_me import utility
class Commands(enum.Enum):
"""Enum for gcloud and gsutil commands."""
PIP3_LIST = 1
PYTHON3_PIP_LIST = 2
PIP3_VERSION = 3
PYHYON3_PIP_VERSION = 4
WHICH_PYHYON3 = 5
WHICH_PIP3 = 6
_command_string = {
Commands.PIP3_LIST: 'pip3 list',
Commands.PYTHON3_PIP_LIST: 'python3 -m pip list',
Commands.PIP3_VERSION: 'pip3 -V',
Commands.PYHYON3_PIP_VERSION: 'python3 -m pip -V',
Commands.WHICH_PYHYON3: 'which python3',
Commands.WHICH_PIP3: 'which pip3',
}
def get_dev_env_configuration(
configuration: Commands,
human_readable: bool = False) -> utility.ExecutorResponse:
"""Captures the specified environment configuration.
Captures the developement environment configuration including PIP version and
Phython version as specifeid by configuration
Args:
configuration: Commands for specific information to be retrieved
- PIP3LIST: captures pip3 freeze results
- PYTHON3PIPLIST: captuers python3 -m pip freeze results
- PIP3VERSION: captuers pip3 -V results
- PYHYON3PIPVERSION: captuers python3 -m pip -V results
human_readable: If true all output will be in human readable form insted of
Json.
Returns:
A utility.ExecutorResponse with the output results for the specified
command.
"""
command_list = _command_string[configuration].split(' ')
if not human_readable and configuration not in (
Commands.PIP3_VERSION,
Commands.PYHYON3_PIP_VERSION,
Commands.WHICH_PYHYON3,
Commands.WHICH_PIP3,
):
command_list.extend(['--format', 'json'])
return utility.ExecutorResponse().execute_command(command_list)

View File

@ -0,0 +1,63 @@
# Lint as: python3
# Copyright 2019 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License,Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Integration tests for diagnose_me.dev_env."""
import unittest
from typing import Text
from unittest import mock
from kfp.cli.diagnose_me import dev_env, utility
class DevEnvTest(unittest.TestCase):
def test_Commands(self):
"""Verify commands are formaated properly."""
for command in dev_env.Commands:
self.assertIsInstance(dev_env._command_string[command], Text)
self.assertNotIn('\t', dev_env._command_string[command])
self.assertNotIn('\n', dev_env._command_string[command])
@mock.patch.object(utility, 'ExecutorResponse', autospec=True)
def test_dev_env_configuration(self, mock_executor_response):
"""Tests dev_env command execution."""
dev_env.get_dev_env_configuration(dev_env.Commands.PIP3_LIST)
mock_executor_response().execute_command.assert_called_with(
['pip3', 'list', '--format', 'json'])
@mock.patch.object(utility, 'ExecutorResponse', autospec=True)
def test_dev_env_configuration_human_readable(self, mock_executor_response):
"""Tests dev_env command execution."""
dev_env.get_dev_env_configuration(
dev_env.Commands.PIP3_LIST, human_readable=True)
mock_executor_response().execute_command.assert_called_with(
['pip3', 'list'])
@mock.patch.object(utility, 'ExecutorResponse', autospec=True)
def test_dev_env_configuration_version(self, mock_executor_response):
"""Tests dev_env command execution."""
# human readable = false should not set format flag for version calls
dev_env.get_dev_env_configuration(
dev_env.Commands.PIP3_VERSION, human_readable=False)
mock_executor_response().execute_command.assert_called_with(
['pip3', '-V'])
dev_env.get_dev_env_configuration(
dev_env.Commands.PYHYON3_PIP_VERSION, human_readable=False)
mock_executor_response().execute_command.assert_called_with(
['python3', '-m', 'pip', '-V'])
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,153 @@
# Lint as: python3
# Copyright 2019 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License,Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Functions for collecting GCP related environment configurations."""
import enum
from typing import List, Optional, Text
from kfp.cli.diagnose_me import utility
class Commands(enum.Enum):
"""Enum for gcloud and gsutil commands."""
GET_APIS = 1
GET_CONTAINER_CLUSTERS = 2
GET_CONTAINER_IMAGES = 3
GET_DISKS = 4
GET_GCLOUD_DEFAULT = 5
GET_NETWORKS = 6
GET_QUOTAS = 7
GET_SCOPES = 8
GET_SERVICE_ACCOUNTS = 9
GET_STORAGE_BUCKETS = 10
GET_GCLOUD_VERSION = 11
GET_AUTH_LIST = 12
_command_string = {
Commands.GET_APIS: 'services list',
Commands.GET_CONTAINER_CLUSTERS: 'container clusters list',
Commands.GET_CONTAINER_IMAGES: 'container images list',
Commands.GET_DISKS: 'compute disks list',
Commands.GET_GCLOUD_DEFAULT: 'config list --all',
Commands.GET_NETWORKS: 'compute networks list',
Commands.GET_QUOTAS: 'compute regions list',
Commands.GET_SCOPES: 'compute instances list',
Commands.GET_SERVICE_ACCOUNTS: 'iam service-accounts list',
Commands.GET_STORAGE_BUCKETS: 'ls',
Commands.GET_GCLOUD_VERSION: 'version',
Commands.GET_AUTH_LIST: 'auth list',
}
def execute_gcloud_command(
gcloud_command_list: List[Text],
project_id: Optional[Text] = None,
human_readable: Optional[bool] = False) -> utility.ExecutorResponse:
"""Function for invoking gcloud command.
Args:
gcloud_command_list: a command string list to be past to gcloud example
format is ['config', 'list', '--all']
project_id: specificies the project to run the commands against if not
provided provided will use gcloud default project if one is configured
otherwise will return an error message.
human_readable: If false sets parameter --format json for all calls,
otherwie output will be in human readable format.
Returns:
utility.ExecutorResponse with outputs from stdout,stderr and execution code.
"""
command_list = ['gcloud']
command_list.extend(gcloud_command_list)
if not human_readable:
command_list.extend(['--format', 'json'])
if project_id is not None:
command_list.extend(['--project', project_id])
return utility.ExecutorResponse().execute_command(command_list)
def execute_gsutil_command(
gsutil_command_list: List[Text],
project_id: Optional[Text] = None) -> utility.ExecutorResponse:
"""Function for invoking gsutil command.
This function takes in a gsutil parameter list and returns the results as a
list of dictionaries.
Args:
gsutil_command_list: a command string list to be past to gsutil example
format is ['config', 'list', '--all']
project_id: specific project to check the QUOTASs for,if no project id is
provided will use gcloud default project if one is configured otherwise
will return an erro massage.
Returns:
utility.ExecutorResponse with outputs from stdout,stderr and execution code.
"""
command_list = ['gsutil']
command_list.extend(gsutil_command_list)
if project_id is not None:
command_list.extend(['-p', project_id])
return utility.ExecutorResponse().execute_command(command_list)
def get_gcp_configuration(
configuration: Commands,
project_id: Optional[Text] = None,
human_readable: Optional[bool] = False) -> utility.ExecutorResponse:
"""Captures the specified environment configuration.
Captures the environment configuration for the specified setting such as
NETWORKSing configuration, project QUOTASs, etc.
Args:
configuration: Commands for specific information to be retrieved
- APIS: Captures a complete list of enabled APISs and their configuration
details under the specified project.
- CONTAINER_CLUSTERS: List all visible k8 clusters under the project.
- CONTAINER_IMAGES: List of all container images under the project
container repo.
- DISKS: List of storage allocated by the project including notebook
instances as well as k8 pds with corresponding state.
- GCLOUD_DEFAULT: Environment default configuration for gcloud
- NETWORKS: List all NETWORKSs and their configuration under the project.
- QUOTAS: Captures a complete list of QUOTASs for project per
region,returns the results as a list of dictionaries.
- SCOPES: list of SCOPESs for each compute resources in the project.
- SERVICE_ACCOUNTS: List of all service accounts that are enabled under
this project.
- STORAGE_BUCKETS: list of buckets and corresponding access information.
project_id: specific project to check the QUOTASs for,if no project id is
provided will use gcloud default project if one is configured otherwise
will return an error message.
human_readable: If true all output will be in human readable form insted of
Json.
Returns:
A utility.ExecutorResponse with the output results for the specified
command.
"""
# storage bucket call requires execute_gsutil_command
if configuration is Commands.GET_STORAGE_BUCKETS:
return execute_gsutil_command(
[_command_string[Commands.GET_STORAGE_BUCKETS]], project_id)
# For all other cases can execute the command directly
return execute_gcloud_command(_command_string[configuration].split(' '),
project_id, human_readable)

View File

@ -0,0 +1,87 @@
# Lint as: python3
# Copyright 2019 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License,Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for diagnose_me.gcp."""
import unittest
from typing import Text
from unittest import mock
from kfp.cli.diagnose_me import gcp, utility
class GoogleCloudTest(unittest.TestCase):
@mock.patch.object(gcp, 'execute_gcloud_command', autospec=True)
def test_project_configuration_gcloud(self, mock_execute_gcloud_command):
"""Tests gcloud commands."""
gcp.get_gcp_configuration(gcp.Commands.GET_APIS)
mock_execute_gcloud_command.assert_called_once_with(
['services', 'list'], project_id=None, human_readable=False)
@mock.patch.object(gcp, 'execute_gsutil_command', autospec=True)
def test_project_configuration_gsutil(self, mock_execute_gsutil_command):
"""Test Gsutil commands."""
gcp.get_gcp_configuration(gcp.Commands.GET_STORAGE_BUCKETS)
mock_execute_gsutil_command.assert_called_once_with(['ls'],
project_id=None)
def test_Commands(self):
"""Verify commands are formaated properly."""
for command in gcp.Commands:
self.assertIsInstance(gcp._command_string[command], Text)
self.assertNotIn('\t', gcp._command_string[command])
self.assertNotIn('\n', gcp._command_string[command])
@mock.patch.object(utility, 'ExecutorResponse', autospec=True)
def test_execute_gsutil_command(self, mock_executor_response):
"""Test execute_gsutil_command."""
gcp.execute_gsutil_command(
[gcp._command_string[gcp.Commands.GET_STORAGE_BUCKETS]])
mock_executor_response().execute_command.assert_called_once_with(
['gsutil', 'ls'])
gcp.execute_gsutil_command(
[gcp._command_string[gcp.Commands.GET_STORAGE_BUCKETS]],
project_id='test_project')
mock_executor_response().execute_command.assert_called_with(
['gsutil', 'ls', '-p', 'test_project'])
@mock.patch.object(utility, 'ExecutorResponse', autospec=True)
def test_execute_gcloud_command(self, mock_executor_response):
"""Test execute_gcloud_command."""
gcp.execute_gcloud_command(
gcp._command_string[gcp.Commands.GET_APIS].split(' '))
mock_executor_response().execute_command.assert_called_once_with(
['gcloud', 'services', 'list', '--format', 'json'])
gcp.execute_gcloud_command(
gcp._command_string[gcp.Commands.GET_APIS].split(' '),
project_id='test_project')
# verify project id is added correctly
mock_executor_response().execute_command.assert_called_with([
'gcloud', 'services', 'list', '--format', 'json', '--project',
'test_project'
])
# verify human_readable removes json fromat flag
gcp.execute_gcloud_command(
gcp._command_string[gcp.Commands.GET_APIS].split(' '),
project_id='test_project',
human_readable=True)
mock_executor_response().execute_command.assert_called_with(
['gcloud', 'services', 'list', '--project', 'test_project'])
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,125 @@
# Lint as: python3
# Copyright 2019 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License,Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Functions for collecting diagnostic information on Kubernetes cluster."""
import enum
from typing import List, Text
from kfp.cli.diagnose_me import utility
class Commands(enum.Enum):
"""Enum for kubernetes commands."""
GET_CONFIGURED_CONTEXT = 1
GET_PODS = 2
GET_PVCS = 3
GET_PVS = 4
GET_SECRETS = 5
GET_SERVICES = 6
GET_KUBECTL_VERSION = 7
GET_CONFIG_MAPS = 8
_command_string = {
Commands.GET_CONFIGURED_CONTEXT: 'config view',
Commands.GET_PODS: 'get pods',
Commands.GET_PVCS: 'get pvc',
Commands.GET_PVS: 'get pv',
Commands.GET_SECRETS: 'get secrets',
Commands.GET_SERVICES: 'get services',
Commands.GET_KUBECTL_VERSION: 'version',
Commands.GET_CONFIG_MAPS: 'get configmaps',
}
def execute_kubectl_command(
kubectl_command_list: List[Text],
human_readable: bool = False) -> utility.ExecutorResponse:
"""Invokes the kubectl command.
Args:
kubectl_command_list: a command string list to be past to kubectl example
format is ['config', 'view']
human_readable: If false sets parameter -o json for all calls, otherwie
output will be in human readable format.
Returns:
utility.ExecutorResponse with outputs from stdout,stderr and execution code.
"""
command_list = ['kubectl']
command_list.extend(kubectl_command_list)
if not human_readable:
command_list.extend(['-o', 'json'])
return utility.ExecutorResponse().execute_command(command_list)
def get_kubectl_configuration(
configuration: Commands,
kubernetes_context: Text = None,
namespace: Text = None,
human_readable: bool = False) -> utility.ExecutorResponse:
"""Captures the specified environment configuration.
Captures the environment state for the specified setting such as current
context, active pods, etc and returns it in as a dictionary format. if no
context is specified the system will use the current_context or error out of
none is specified.
Args:
configuration:
- K8_CONFIGURED_CONTEXT: returns all k8 configuration available in the
current env including current_context.
- PODS: returns all pods and their status details.
- PVCS: returns all PersistentVolumeClaim and their status details.
- SECRETS: returns all accessible k8 secrests.
- PVS: returns all PersistentVolume and their status details.
- SERVICES: returns all services and their status details.
kubernetes_context: Context to use to retrieve cluster specific commands, if
set to None calls will rely on current_context configured.
namespace: default name space to be used for the commaand, if not specifeid
--all-namespaces will be used.
human_readable: If true all output will be in human readable form insted of
Json.
Returns:
A list of dictionaries matching gcloud / gsutil output for the specified
configuration,or an error message if any occurs during execution.
"""
if configuration in (Commands.GET_CONFIGURED_CONTEXT,
Commands.GET_KUBECTL_VERSION):
return execute_kubectl_command(
(_command_string[configuration]).split(' '), human_readable)
execution_command = _command_string[configuration].split(' ')
if kubernetes_context:
execution_command.extend(['--context', kubernetes_context])
if namespace:
execution_command.extend(['--namespace', namespace])
else:
execution_command.extend(['--all-namespaces'])
return execute_kubectl_command(execution_command, human_readable)
def _get_kfp_runtime() -> Text:
"""Captures the current version of kpf in k8 cluster.
Returns:
Returns the run-time version of kfp in as a string.
"""
# TODO(chavoshi) needs to be implemented.
raise NotImplementedError

View File

@ -0,0 +1,77 @@
# Lint as: python3
# Copyright 2019 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License,Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for diagnose_me.kubernetes_cluster."""
import unittest
from typing import Text
from unittest import mock
from kfp.cli.diagnose_me import kubernetes_cluster as dkc
from kfp.cli.diagnose_me import utility
class KubernetesClusterTest(unittest.TestCase):
@mock.patch.object(dkc, 'execute_kubectl_command', autospec=True)
def test_project_configuration_gcloud(self, mock_execute_kubectl_command):
"""Tests gcloud commands."""
dkc.get_kubectl_configuration(dkc.Commands.GET_PODS)
mock_execute_kubectl_command.assert_called_once_with(
['get', 'pods', '--all-namespaces'], human_readable=False)
dkc.get_kubectl_configuration(dkc.Commands.GET_CONFIGURED_CONTEXT)
mock_execute_kubectl_command.assert_called_with(['config', 'view'],
human_readable=False)
dkc.get_kubectl_configuration(dkc.Commands.GET_KUBECTL_VERSION)
mock_execute_kubectl_command.assert_called_with(['version'],
human_readable=False)
dkc.get_kubectl_configuration(
dkc.Commands.GET_PODS, kubernetes_context='test_context')
mock_execute_kubectl_command.assert_called_with(
['get', 'pods', '--context', 'test_context', '--all-namespaces'],
human_readable=False)
dkc.get_kubectl_configuration(
dkc.Commands.GET_PODS, kubernetes_context='test_context')
mock_execute_kubectl_command.assert_called_with(
['get', 'pods', '--context', 'test_context', '--all-namespaces'],
human_readable=False)
def test_Commands(self):
"""Verify commands are formaated properly."""
for command in dkc.Commands:
self.assertIsInstance(dkc._command_string[command], Text)
self.assertNotIn('\t', dkc._command_string[command])
self.assertNotIn('\n', dkc._command_string[command])
@mock.patch.object(utility, 'ExecutorResponse', autospec=True)
def test_execute_kubectl_command(self, mock_executor_response):
"""Test execute_gsutil_command."""
dkc.execute_kubectl_command(
[dkc._command_string[dkc.Commands.GET_KUBECTL_VERSION]])
mock_executor_response().execute_command.assert_called_once_with(
['kubectl', 'version', '-o', 'json'])
dkc.execute_kubectl_command(
[dkc._command_string[dkc.Commands.GET_KUBECTL_VERSION]],
human_readable=True)
mock_executor_response().execute_command.assert_called_with(
['kubectl', 'version'])
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,91 @@
# Lint as: python3
# Copyright 2019 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License,Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Supporting tools and classes for diagnose_me."""
import json
import subprocess
from typing import List, Text
class ExecutorResponse(object):
"""Class for keeping track of output of _executor methods.
Data model for executing commands and capturing their response. This class
defines the data model layer for execution results, based on MVC design
pattern.
TODO() This class should be extended to contain data structure to better
represent the underlying data instaed of dict for various response types.
"""
def execute_command(self, command_list: List[Text]):
"""Executes the command in command_list.
sets values for _stdout,_std_err, and returncode accordingly.
TODO(): This method is kept in ExecutorResponse for simplicity, however this
deviates from MVP design pattern. It should be factored out in future.
Args:
command_list: A List of strings that represts the command and parameters
to be executed.
Returns:
Instance of utility.ExecutorResponse.
"""
try:
# TODO() switch to process.run to simplify the code.
process = subprocess.Popen(
command_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
self._stdout = stdout.decode('utf-8')
self._stderr = stderr.decode('utf-8')
self._returncode = process.returncode
except OSError as e:
self._stderr = e
self._stdout = ''
self._returncode = e.errno
self._parse_raw_input()
return self
def _parse_raw_input(self):
"""Parses the raw input and popluates _json and _parsed properies."""
try:
self._parsed_output = json.loads(self._stdout)
self._json = self._stdout
except json.JSONDecodeError:
self._json = json.dumps(self._stdout)
self._parsed_output = self._stdout
@property
def parsed_output(self) -> Text:
"""Json load results of stdout or raw results if stdout was not
Json."""
return self._parsed_output
@property
def has_error(self) -> bool:
"""Returns true if execution error code was not 0."""
return self._returncode != 0
@property
def json_output(self) -> Text:
"""Run results in stdout in json format."""
return self._parsed_output
@property
def stderr(self):
return self._stderr

View File

@ -0,0 +1,44 @@
# Lint as: python3
# Copyright 2019 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License,Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for diagnose_me.utility."""
import unittest
from kfp.cli.diagnose_me import utility
class UtilityTest(unittest.TestCase):
def test_parse_raw_input_json(self):
"""Testing json stdout is correctly parsed."""
response = utility.ExecutorResponse()
response._stdout = '{"key":"value"}'
response._parse_raw_input()
self.assertEqual(response._json, '{"key":"value"}')
self.assertEqual(response._parsed_output, {'key': 'value'})
def test_parse_raw_input_text(self):
"""Testing non-json stdout is correctly parsed."""
response = utility.ExecutorResponse()
response._stdout = 'non-json string'
response._parse_raw_input()
self.assertEqual(response._json, '"non-json string"')
self.assertEqual(response._parsed_output, 'non-json string')
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,107 @@
# Lint as: python3
"""CLI interface for KFP diagnose_me tool."""
import json as json_library
import sys
from typing import Dict, Text
import click
from kfp.cli.diagnose_me import dev_env, gcp
from kfp.cli.diagnose_me import kubernetes_cluster as k8
from kfp.cli.diagnose_me import utility
@click.group()
def diagnose_me():
"""Prints diagnoses information for KFP environment."""
pass
@diagnose_me.command()
@click.option(
'-j',
'--json',
is_flag=True,
help='Output in Json format, human readable format is set by default.')
@click.option(
'-p',
'--project-id',
type=Text,
help='Target project id. It will use environment default if not specified.')
@click.option(
'-n',
'--namespace',
type=Text,
help='Namespace to use for Kubernetes cluster.all-namespaces is used if not specified.'
)
@click.pass_context
def diagnose_me(ctx: click.Context, json: bool, project_id: str,
namespace: str):
"""Runs environment diagnostic with specified parameters.
Feature stage:
[Alpha](https://github.com/kubeflow/pipelines/blob/07328e5094ac2981d3059314cc848fbb71437a76/docs/release/feature-stages.md#alpha)
"""
# validate kubectl, gcloud , and gsutil exist
local_env_gcloud_sdk = gcp.get_gcp_configuration(
gcp.Commands.GET_GCLOUD_VERSION,
project_id=project_id,
human_readable=False)
for app in ['Google Cloud SDK', 'gsutil', 'kubectl']:
if app not in local_env_gcloud_sdk.json_output:
raise RuntimeError(
'%s is not installed, gcloud, gsutil and kubectl are required '
% app + 'for this app to run. Please follow instructions at ' +
'https://cloud.google.com/sdk/install to install the SDK.')
click.echo('Collecting diagnostic information ...', file=sys.stderr)
# default behaviour dump all configurations
results = {}
for gcp_command in gcp.Commands:
results[gcp_command] = gcp.get_gcp_configuration(
gcp_command, project_id=project_id, human_readable=not json)
for k8_command in k8.Commands:
results[k8_command] = k8.get_kubectl_configuration(
k8_command, human_readable=not json)
for dev_env_command in dev_env.Commands:
results[dev_env_command] = dev_env.get_dev_env_configuration(
dev_env_command, human_readable=not json)
print_to_sdtout(results, not json)
def print_to_sdtout(results: Dict[str, utility.ExecutorResponse],
human_readable: bool):
"""Viewer to print the ExecutorResponse results to stdout.
Args:
results: A dictionary with key:command names and val: Execution response
human_readable: Print results in human readable format. If set to True
command names will be printed as visual delimiters in new lines. If False
results are printed as a dictionary with command as key.
"""
output_dict = {}
human_readable_result = []
for key, val in results.items():
if val.has_error:
output_dict[
key.
name] = 'Following error occurred during the diagnoses: %s' % val.stderr
continue
output_dict[key.name] = val.json_output
human_readable_result.append('================ %s ===================' %
(key.name))
human_readable_result.append(val.parsed_output)
if human_readable:
result = '\n'.join(human_readable_result)
else:
result = json_library.dumps(
output_dict, sort_keys=True, indent=2, separators=(',', ': '))
click.echo(result)

View File

@ -0,0 +1,144 @@
import json
from typing import List
import click
import kfp_server_api
from kfp.cli.output import OutputFormat, print_output
from kfp_server_api.models.api_experiment import ApiExperiment
@click.group()
def experiment():
"""Manage experiment resources."""
pass
@experiment.command()
@click.option('-d', '--description', help="Description of the experiment.")
@click.argument("name")
@click.pass_context
def create(ctx: click.Context, description: str, name: str):
"""Create an experiment."""
client = ctx.obj["client"]
output_format = ctx.obj["output"]
response = client.create_experiment(name, description=description)
_display_experiment(response, output_format)
@experiment.command()
@click.option(
'--page-token', default='', help="Token for starting of the page.")
@click.option(
'-m', '--max-size', default=100, help="Max size of the listed experiments.")
@click.option(
'--sort-by',
default="created_at desc",
help="Can be '[field_name]', '[field_name] desc'. For example, 'name desc'."
)
@click.option(
'--filter',
help=(
"filter: A url-encoded, JSON-serialized Filter protocol buffer "
"(see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto))."
))
@click.pass_context
def list(ctx: click.Context, page_token: str, max_size: int, sort_by: str,
filter: str):
"""List experiments."""
client = ctx.obj['client']
output_format = ctx.obj['output']
response = client.list_experiments(
page_token=page_token,
page_size=max_size,
sort_by=sort_by,
filter=filter)
if response.experiments:
_display_experiments(response.experiments, output_format)
else:
if output_format == OutputFormat.json.name:
msg = json.dumps([])
else:
msg = "No experiments found"
click.echo(msg)
@experiment.command()
@click.argument("experiment-id")
@click.pass_context
def get(ctx: click.Context, experiment_id: str):
"""Get detailed information about an experiment."""
client = ctx.obj["client"]
output_format = ctx.obj["output"]
response = client.get_experiment(experiment_id)
_display_experiment(response, output_format)
@experiment.command()
@click.argument("experiment-id")
@click.pass_context
def delete(ctx: click.Context, experiment_id: str):
"""Delete an experiment."""
confirmation = "Caution. The RunDetails page could have an issue" \
" when it renders a run that has no experiment." \
" Do you want to continue?"
if not click.confirm(confirmation):
return
client = ctx.obj["client"]
client.delete_experiment(experiment_id)
click.echo("{} is deleted.".format(experiment_id))
def _display_experiments(experiments: List[ApiExperiment],
output_format: OutputFormat):
headers = ["Experiment ID", "Name", "Created at"]
data = [
[exp.id, exp.name, exp.created_at.isoformat()] for exp in experiments
]
print_output(data, headers, output_format, table_format="grid")
def _display_experiment(exp: kfp_server_api.ApiExperiment,
output_format: OutputFormat):
table = [
["ID", exp.id],
["Name", exp.name],
["Description", exp.description],
["Created at", exp.created_at.isoformat()],
]
if output_format == OutputFormat.table.name:
print_output([], ["Experiment Details"], output_format)
print_output(table, [], output_format, table_format="plain")
elif output_format == OutputFormat.json.name:
print_output(dict(table), [], output_format)
@experiment.command()
@click.option(
"--experiment-id",
default=None,
help="The ID of the experiment to archive, can only supply either an experiment ID or name."
)
@click.option(
"--experiment-name",
default=None,
help="The name of the experiment to archive, can only supply either an experiment ID or name."
)
@click.pass_context
def archive(ctx: click.Context, experiment_id: str, experiment_name: str):
"""Archive an experiment."""
client = ctx.obj["client"]
if (experiment_id is None) == (experiment_name is None):
raise ValueError('Either experiment_id or experiment_name is required')
if not experiment_id:
experiment = client.get_experiment(experiment_name=experiment_name)
experiment_id = experiment.id
client.archive_experiment(experiment_id=experiment_id)

View File

@ -0,0 +1,63 @@
# Copyright 2020 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from enum import Enum, unique
from typing import Union
import click
from tabulate import tabulate
@unique
class OutputFormat(Enum):
"""Enumerated class with the allowed output format constants."""
table = "table"
json = "json"
def print_output(data: Union[list, dict],
headers: list,
output_format: str,
table_format: str = "simple"):
"""Prints the output from the cli command execution based on the specified
format.
Args:
data (Union[list, dict]): Nested list of values representing the rows to be printed.
headers (list): List of values representing the column names to be printed
for the ``data``.
output_format (str): The desired formatting of the text from the command output.
table_format (str): The formatting for the table ``output_format``.
Default value set to ``simple``.
Returns:
None: Prints the output.
Raises:
NotImplementedError: If the ``output_format`` is unknown.
"""
if output_format == OutputFormat.table.name:
click.echo(tabulate(data, headers=headers, tablefmt=table_format))
elif output_format == OutputFormat.json.name:
if not headers:
output = data
else:
output = []
for row in data:
output.append(dict(zip(headers, row)))
click.echo(json.dumps(output, indent=4))
else:
raise NotImplementedError(
"Unknown Output Format: {}".format(output_format))

View File

@ -0,0 +1,264 @@
# Copyright 2019 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import List, Optional
import click
import kfp_server_api
from kfp.cli.output import OutputFormat, print_output
@click.group()
def pipeline():
"""Manage pipeline resources."""
pass
@pipeline.command()
@click.option("-p", "--pipeline-name", help="Name of the pipeline.")
@click.option("-d", "--description", help="Description for the pipeline.")
@click.argument("package-file")
@click.pass_context
def upload(ctx: click.Context,
pipeline_name: str,
package_file: str,
description: str = None):
"""Upload a KFP pipeline."""
client = ctx.obj["client"]
output_format = ctx.obj["output"]
if not pipeline_name:
pipeline_name = package_file.split(".")[0]
pipeline = client.upload_pipeline(package_file, pipeline_name, description)
_display_pipeline(pipeline, output_format)
@pipeline.command()
@click.option("-p", "--pipeline-id", help="ID of the pipeline", required=False)
@click.option("-n", "--pipeline-name", help="Name of pipeline", required=False)
@click.option(
"-v",
"--pipeline-version",
help="Name of the pipeline version",
required=True)
@click.argument("package-file")
@click.pass_context
def upload_version(ctx: click.Context,
package_file: str,
pipeline_version: str,
pipeline_id: Optional[str] = None,
pipeline_name: Optional[str] = None):
"""Upload a version of the KFP pipeline."""
client = ctx.obj["client"]
output_format = ctx.obj["output"]
if bool(pipeline_id) == bool(pipeline_name):
raise ValueError("Need to supply 'pipeline-name' or 'pipeline-id'")
if pipeline_name is not None:
pipeline_id = client.get_pipeline_id(name=pipeline_name)
if pipeline_id is None:
raise ValueError("Can't find a pipeline with name: %s" %
pipeline_name)
version = client.pipeline_uploads.upload_pipeline_version(
package_file, name=pipeline_version, pipelineid=pipeline_id)
_display_pipeline_version(version, output_format)
@pipeline.command()
@click.option(
'--page-token', default='', help="Token for starting of the page.")
@click.option(
'-m', '--max-size', default=100, help="Max size of the listed pipelines.")
@click.option(
'--sort-by',
default="created_at desc",
help="Can be '[field_name]', '[field_name] desc'. For example, 'name desc'."
)
@click.option(
'--filter',
help=(
"filter: A url-encoded, JSON-serialized Filter protocol buffer "
"(see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto))."
))
@click.pass_context
def list(ctx: click.Context, page_token: str, max_size: int, sort_by: str,
filter: str):
"""List uploaded KFP pipelines."""
client = ctx.obj["client"]
output_format = ctx.obj["output"]
response = client.list_pipelines(
page_token=page_token,
page_size=max_size,
sort_by=sort_by,
filter=filter)
if response.pipelines:
_print_pipelines(response.pipelines, output_format)
else:
if output_format == OutputFormat.json.name:
msg = json.dumps([])
else:
msg = "No pipelines found"
click.echo(msg)
@pipeline.command()
@click.argument("pipeline-id")
@click.option(
'--page-token', default='', help="Token for starting of the page.")
@click.option(
'-m',
'--max-size',
default=100,
help="Max size of the listed pipeline versions.")
@click.option(
'--sort-by',
default="created_at desc",
help="Can be '[field_name]', '[field_name] desc'. For example, 'name desc'."
)
@click.option(
'--filter',
help=(
"filter: A url-encoded, JSON-serialized Filter protocol buffer "
"(see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto))."
))
@click.pass_context
def list_versions(ctx: click.Context, pipeline_id: str, page_token: str,
max_size: int, sort_by: str, filter: str):
"""List versions of an uploaded KFP pipeline."""
client = ctx.obj["client"]
output_format = ctx.obj["output"]
response = client.list_pipeline_versions(
pipeline_id,
page_token=page_token,
page_size=max_size,
sort_by=sort_by,
filter=filter)
if response.versions:
_print_pipeline_versions(response.versions, output_format)
else:
if output_format == OutputFormat.json.name:
msg = json.dumps([])
else:
msg = "No pipeline or version found"
click.echo(msg)
@pipeline.command()
@click.argument("version-id")
@click.pass_context
def delete_version(ctx: click.Context, version_id: str):
"""Delete pipeline version.
Args:
version_id: id of the pipeline version.
Returns:
Object. If the method is called asynchronously, returns the request thread.
Throws:
Exception if pipeline version is not found.
"""
client = ctx.obj["client"]
return client.delete_pipeline_version(version_id)
@pipeline.command()
@click.argument("pipeline-id")
@click.pass_context
def get(ctx: click.Context, pipeline_id: str):
"""Get detailed information about an uploaded KFP pipeline."""
client = ctx.obj["client"]
output_format = ctx.obj["output"]
pipeline = client.get_pipeline(pipeline_id)
_display_pipeline(pipeline, output_format)
@pipeline.command()
@click.argument("pipeline-id")
@click.pass_context
def delete(ctx: click.Context, pipeline_id: str):
"""Delete an uploaded KFP pipeline."""
client = ctx.obj["client"]
client.delete_pipeline(pipeline_id)
click.echo(f"{pipeline_id} is deleted")
def _print_pipelines(pipelines: List[kfp_server_api.ApiPipeline],
output_format: OutputFormat):
headers = ["Pipeline ID", "Name", "Uploaded at"]
data = [[pipeline.id, pipeline.name,
pipeline.created_at.isoformat()] for pipeline in pipelines]
print_output(data, headers, output_format, table_format="grid")
def _print_pipeline_versions(versions: List[kfp_server_api.ApiPipelineVersion],
output_format: OutputFormat):
headers = ["Version ID", "Version name", "Uploaded at", "Pipeline ID"]
data = [[
version.id, version.name,
version.created_at.isoformat(),
next(rr
for rr in version.resource_references
if rr.key.type == kfp_server_api.ApiResourceType.PIPELINE).key.id
]
for version in versions]
print_output(data, headers, output_format, table_format="grid")
def _display_pipeline(pipeline: kfp_server_api.ApiPipeline,
output_format: OutputFormat):
# Pipeline information
table = [["Pipeline ID", pipeline.id], ["Name", pipeline.name],
["Description", pipeline.description],
["Uploaded at", pipeline.created_at.isoformat()],
["Version ID", pipeline.default_version.id]]
# Pipeline parameter details
headers = ["Parameter Name", "Default Value"]
data = []
if pipeline.parameters is not None:
data = [[param.name, param.value] for param in pipeline.parameters]
if output_format == OutputFormat.table.name:
print_output([], ["Pipeline Details"], output_format)
print_output(table, [], output_format, table_format="plain")
print_output(data, headers, output_format, table_format="grid")
elif output_format == OutputFormat.json.name:
output = dict()
output["Pipeline Details"] = dict(table)
params = []
for item in data:
params.append(dict(zip(headers, item)))
output["Pipeline Parameters"] = params
print_output(output, [], output_format)
def _display_pipeline_version(version: kfp_server_api.ApiPipelineVersion,
output_format: OutputFormat):
pipeline_id = next(
rr for rr in version.resource_references
if rr.key.type == kfp_server_api.ApiResourceType.PIPELINE).key.id
table = [["Pipeline ID", pipeline_id], ["Version name", version.name],
["Uploaded at", version.created_at.isoformat()],
["Version ID", version.id]]
if output_format == OutputFormat.table.name:
print_output([], ["Pipeline Version Details"], output_format)
print_output(table, [], output_format, table_format="plain")
elif output_format == OutputFormat.json.name:
print_output(dict(table), [], output_format)

View File

@ -0,0 +1,216 @@
# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import Any, Dict, List, Optional
import click
import kfp_server_api
from kfp.cli.output import OutputFormat, print_output
@click.group()
def recurring_run():
"""Manage recurring-run resources."""
pass
@recurring_run.command()
@click.option(
'--catchup/--no-catchup',
help='Whether the recurring run should catch up if behind schedule.',
type=bool)
@click.option(
'--cron-expression',
help='A cron expression representing a set of times, using 6 space-separated fields, e.g. "0 0 9 ? * 2-6".'
)
@click.option('--description', help='The description of the recurring run.')
@click.option(
'--enable-caching/--disable-caching',
help='Optional. Whether or not to enable caching for the run.',
type=bool)
@click.option(
'--enabled/--disabled',
help='A bool indicating whether the recurring run is enabled or disabled.',
type=bool)
@click.option(
'--end-time',
help='The RFC3339 time string of the time when to end the job.')
@click.option(
'--experiment-id',
help='The ID of the experiment to create the recurring run under, can only supply either an experiment ID or name.'
)
@click.option(
'--experiment-name',
help='The name of the experiment to create the recurring run under, can only supply either an experiment ID or name.'
)
@click.option('--job-name', help='The name of the recurring run.')
@click.option(
'--interval-second',
help='Integer indicating the seconds between two recurring runs in for a periodic schedule.'
)
@click.option(
'--max-concurrency',
help='Integer indicating how many jobs can be run in parallel.',
type=int)
@click.option(
'--pipeline-id',
help='The ID of the pipeline to use to create the recurring run.')
@click.option(
'--pipeline-package-path',
help='Local path of the pipeline package(the filename should end with one of the following .tar.gz, .tgz, .zip, .yaml, .yml).'
)
@click.option(
'--start-time',
help='The RFC3339 time string of the time when to start the job.')
@click.option('--version-id', help='The id of a pipeline version.')
@click.argument("args", nargs=-1)
@click.pass_context
def create(ctx: click.Context,
job_name: str,
experiment_id: Optional[str] = None,
experiment_name: Optional[str] = None,
catchup: Optional[bool] = None,
cron_expression: Optional[str] = None,
enabled: Optional[bool] = None,
description: Optional[str] = None,
enable_caching: Optional[bool] = None,
end_time: Optional[str] = None,
interval_second: Optional[int] = None,
max_concurrency: Optional[int] = None,
params: Optional[dict] = None,
pipeline_package_path: Optional[str] = None,
pipeline_id: Optional[str] = None,
start_time: Optional[str] = None,
version_id: Optional[str] = None,
args: Optional[List[str]] = None):
"""Create a recurring run."""
client = ctx.obj["client"]
output_format = ctx.obj['output']
if (experiment_id is None) == (experiment_name is None):
raise ValueError('Either experiment_id or experiment_name is required')
if not experiment_id:
experiment = client.create_experiment(experiment_name)
experiment_id = experiment.id
# Ensure we only split on the first equals char so the value can contain
# equals signs too.
split_args: List = [arg.split("=", 1) for arg in args or []]
params: Dict[str, Any] = dict(split_args)
recurring_run = client.create_recurring_run(
cron_expression=cron_expression,
description=description,
enabled=enabled,
enable_caching=enable_caching,
end_time=end_time,
experiment_id=experiment_id,
interval_second=interval_second,
job_name=job_name,
max_concurrency=max_concurrency,
no_catchup=not catchup,
params=params,
pipeline_package_path=pipeline_package_path,
pipeline_id=pipeline_id,
start_time=start_time,
version_id=version_id)
_display_recurring_run(recurring_run, output_format)
@recurring_run.command()
@click.option(
'-e',
'--experiment-id',
help='Parent experiment ID of listed recurring runs.')
@click.option(
'--page-token', default='', help="Token for starting of the page.")
@click.option(
'-m',
'--max-size',
default=100,
help="Max size of the listed recurring runs.")
@click.option(
'--sort-by',
default="created_at desc",
help="Can be '[field_name]', '[field_name] desc'. For example, 'name desc'."
)
@click.option(
'--filter',
help=(
"filter: A url-encoded, JSON-serialized Filter protocol buffer "
"(see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto))."
))
@click.pass_context
def list(ctx: click.Context, experiment_id: str, page_token: str, max_size: int,
sort_by: str, filter: str):
"""List recurring runs."""
client = ctx.obj['client']
output_format = ctx.obj['output']
response = client.list_recurring_runs(
experiment_id=experiment_id,
page_token=page_token,
page_size=max_size,
sort_by=sort_by,
filter=filter)
if response.jobs:
_display_recurring_runs(response.jobs, output_format)
else:
if output_format == OutputFormat.json.name:
msg = json.dumps([])
else:
msg = "No recurring runs found"
click.echo(msg)
@recurring_run.command()
@click.argument("job-id")
@click.pass_context
def get(ctx: click.Context, job_id: str):
"""Get detailed information about an experiment."""
client = ctx.obj["client"]
output_format = ctx.obj["output"]
response = client.get_recurring_run(job_id)
_display_recurring_run(response, output_format)
@recurring_run.command()
@click.argument("job-id")
@click.pass_context
def delete(ctx: click.Context, job_id: str):
"""Delete a recurring run."""
client = ctx.obj["client"]
client.delete_job(job_id)
def _display_recurring_runs(recurring_runs: List[kfp_server_api.ApiJob],
output_format: OutputFormat):
headers = ["Recurring Run ID", "Name"]
data = [[rr.id, rr.name] for rr in recurring_runs]
print_output(data, headers, output_format, table_format="grid")
def _display_recurring_run(recurring_run: kfp_server_api.ApiJob,
output_format: OutputFormat):
table = [
["Recurring Run ID", recurring_run.id],
["Name", recurring_run.name],
]
if output_format == OutputFormat.table.name:
print_output([], ["Recurring Run Details"], output_format)
print_output(table, [], output_format, table_format="plain")
elif output_format == OutputFormat.json.name:
print_output(dict(table), [], output_format)

228
sdk/python/kfp/cli/run.py Normal file
View File

@ -0,0 +1,228 @@
# Copyright 2018 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import shutil
import subprocess
import sys
import time
from typing import List
import click
import kfp_server_api
from kfp.cli.output import OutputFormat, print_output
from kfp.client import Client
@click.group()
def run():
"""manage run resources."""
pass
@run.command()
@click.option(
'-e', '--experiment-id', help='Parent experiment ID of listed runs.')
@click.option(
'--page-token', default='', help="Token for starting of the page.")
@click.option(
'-m', '--max-size', default=100, help="Max size of the listed runs.")
@click.option(
'--sort-by',
default="created_at desc",
help="Can be '[field_name]', '[field_name] desc'. For example, 'name desc'."
)
@click.option(
'--filter',
help=(
"filter: A url-encoded, JSON-serialized Filter protocol buffer "
"(see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto))."
))
@click.pass_context
def list(ctx: click.Context, experiment_id: str, page_token: str, max_size: int,
sort_by: str, filter: str):
"""list recent KFP runs."""
client = ctx.obj['client']
output_format = ctx.obj['output']
response = client.list_runs(
experiment_id=experiment_id,
page_token=page_token,
page_size=max_size,
sort_by=sort_by,
filter=filter)
if response and response.runs:
_print_runs(response.runs, output_format)
else:
if output_format == OutputFormat.json.name:
msg = json.dumps([])
else:
msg = 'No runs found.'
click.echo(msg)
@run.command()
@click.option(
'-e',
'--experiment-name',
required=True,
help='Experiment name of the run.')
@click.option('-r', '--run-name', help='Name of the run.')
@click.option(
'-f',
'--package-file',
type=click.Path(exists=True, dir_okay=False),
help='Path of the pipeline package file.')
@click.option('-p', '--pipeline-id', help='ID of the pipeline template.')
@click.option('-n', '--pipeline-name', help='Name of the pipeline template.')
@click.option(
'-w',
'--watch',
is_flag=True,
default=False,
help='Watch the run status until it finishes.')
@click.option('-v', '--version', help='ID of the pipeline version.')
@click.option(
'-t',
'--timeout',
default=0,
help='Wait for a run to complete until timeout in seconds.',
type=int)
@click.argument('args', nargs=-1)
@click.pass_context
def submit(ctx: click.Context, experiment_name: str, run_name: str,
package_file: str, pipeline_id: str, pipeline_name: str, watch: bool,
timeout: int, version: str, args: List[str]):
"""submit a KFP run."""
client = ctx.obj['client']
namespace = ctx.obj['namespace']
output_format = ctx.obj['output']
if not run_name:
run_name = experiment_name
if not pipeline_id and pipeline_name:
pipeline_id = client.get_pipeline_id(name=pipeline_name)
if not package_file and not pipeline_id and not version:
click.echo(
'You must provide one of [package_file, pipeline_id, version].',
err=True)
sys.exit(1)
arg_dict = dict(arg.split('=', maxsplit=1) for arg in args)
experiment = client.create_experiment(experiment_name)
run = client.run_pipeline(
experiment.id,
run_name,
package_file,
arg_dict,
pipeline_id,
version_id=version)
if timeout > 0:
_wait_for_run_completion(client, run.id, timeout, output_format)
else:
_display_run(client, namespace, run.id, watch, output_format)
@run.command()
@click.option(
'-w',
'--watch',
is_flag=True,
default=False,
help='Watch the run status until it finishes.')
@click.option(
'-d',
'--detail',
is_flag=True,
default=False,
help='Get detailed information of the run in json format.')
@click.argument('run-id')
@click.pass_context
def get(ctx: click.Context, watch: bool, detail: bool, run_id: str):
"""display the details of a KFP run."""
client = ctx.obj['client']
namespace = ctx.obj['namespace']
output_format = ctx.obj['output']
_display_run(client, namespace, run_id, watch, output_format, detail)
def _display_run(client: click.Context,
namespace: str,
run_id: str,
watch: bool,
output_format: OutputFormat,
detail: bool = False):
run = client.get_run(run_id).run
if detail:
data = {
key:
value.isoformat() if isinstance(value, datetime.datetime) else value
for key, value in run.to_dict().items()
if key not in ['pipeline_spec'
] # useless but too much detailed field
}
click.echo(data)
return
_print_runs([run], output_format)
if not watch:
return
argo_path = shutil.which('argo')
if not argo_path:
raise RuntimeError(
"argo isn't found in $PATH. It's necessary for watch. "
"Please make sure it's installed and available. "
"Installation instructions be found here - "
"https://github.com/argoproj/argo-workflows/releases")
argo_workflow_name = None
while True:
time.sleep(1)
run_detail = client.get_run(run_id)
run = run_detail.run
if run_detail.pipeline_runtime and run_detail.pipeline_runtime.workflow_manifest:
manifest = json.loads(run_detail.pipeline_runtime.workflow_manifest)
if manifest['metadata'] and manifest['metadata']['name']:
argo_workflow_name = manifest['metadata']['name']
break
if run_detail.run.status in ['Succeeded', 'Skipped', 'Failed', 'Error']:
click.echo('Run is finished with status {}.'.format(
run_detail.run.status))
return
if argo_workflow_name:
subprocess.run(
[argo_path, 'watch', argo_workflow_name, '-n', namespace])
_print_runs([run], output_format)
def _wait_for_run_completion(client: Client, run_id: str, timeout: int,
output_format: OutputFormat):
run_detail = client.wait_for_run_completion(run_id, timeout)
_print_runs([run_detail.run], output_format)
def _print_runs(runs: List[kfp_server_api.ApiRun], output_format: OutputFormat):
headers = ['run id', 'name', 'status', 'created at', 'experiment id']
data = [[
run.id, run.name, run.status,
run.created_at.isoformat(),
next(rr
for rr in run.resource_references
if rr.key.type == kfp_server_api.ApiResourceType.EXPERIMENT).key.id
]
for run in runs]
print_output(data, headers, output_format, table_format='grid')

View File

@ -17,15 +17,17 @@ import importlib
import pathlib
import sys
import textwrap
from typing import List, Optional, Union
import unittest
from typing import List, Optional, Union
from unittest import mock
from typer import testing
# Docker is an optional install, but we need the import to succeed for tests.
# So we patch it before importing kfp.cli.components.
if importlib.util.find_spec('docker') is None:
try:
import docker # pylint: disable=unused-import
except ImportError:
sys.modules['docker'] = mock.Mock()
from kfp.deprecated.cli import components

View File

@ -91,6 +91,6 @@ setuptools.setup(
'console_scripts': [
'dsl-compile = kfp.compiler.main:main',
'dsl-compile-deprecated = kfp.deprecated.compiler.main:main',
'kfp=kfp.__main__:main',
'kfp=kfp.cli.cli:main',
]
})