pipelines/samples/contrib/azure-samples/databricks-pipelines/pipeline_cli.py

86 lines
2.4 KiB
Python

import os
import sys
import logging
import kfp
import fire
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)
class MyCLI:
"""
CLI for Kubeflow Pipelines.
This CLI allows us to compile and run our pipelines in Kubeflow Pipelines without accessing
Kubeflow portal.
"""
@staticmethod
def compile(
path
):
"""
Compile a Kubeflow pipeline.
Args:
path: Path to the pipeline (e.g. databricks_run_pipeline.py)
"""
logging.info("Compiling '%s'...", path)
compiled_path = f"{path}.tar.gz"
result = os.system(f"dsl-compile --py {path} --output {compiled_path}")
if result != 0:
logging.error("Failed to compile '%s' with error code %i.", path, result)
sys.exit(result)
return compiled_path
@staticmethod
def run(
path,
host,
params={}
):
"""
Run a compiled pipeline in Kubeflow.
Args:
path: Path to the compiled pipeline (e.g. databricks_run_pipeline.py.tar.gz)
host: Host name to use to talk to Kubeflow Pipelines (e.g. http://localhost:8080/pipeline)
params: Pipeline parameters (e.g. '{\"run_name\":\"test-run\",\"parameter\":\"10\"}')
"""
logging.info("Running '%s' in '%s'...", path, host)
client = kfp.Client(f"{host}")
try:
result = client.create_run_from_pipeline_package(
pipeline_file=path,
arguments=params
)
logging.info("View run: %s/#/runs/details/%s",
host,
result.run_id)
except Exception as ex:
logging.error("Failed to run '{%s}' with error:\n{%s}", path, ex)
sys.exit(1)
@staticmethod
def compile_run(
path,
host,
params={}
):
"""
Compile and run a Kubeflow pipeline.
Args:
path: Path to the pipeline (e.g. databricks_run_pipeline.py)
host: Host name to use to talk to Kubeflow Pipelines (e.g. http://localhost:8080/pipeline)
params: Pipeline parameters (e.g. '{\"run_name\":\"test-run\",\"parameter\":\"10\"}')
"""
compiled_path = MyCLI.compile(path)
MyCLI.run(compiled_path, host, params)
if __name__ == '__main__':
fire.Fire(MyCLI())