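Run the following command to install the Kubeflow Pipelines (KFP) SDK. This notebook uses the KFP v1 DSL (`dsl.ContainerOp`, `dsl.graph_component`), which was removed in KFP v2, so a 1.x release of the SDK is required. If you run this command in a Jupyter
    notebook, restart the kernel after installing the SDK.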

In [None]:
%pip install kfp --upgrade
# to install tekton compiler uncomment the line below
# %pip install kfp_tekton

Import Packages

In [None]:
import json
import time
import yaml

import kfp
import kfp.components as comp
import kfp.dsl as dsl

In [None]:
# Value of the SparkApplication's "applicationstate" output (read from the
# k8s get component) at which status polling stops.
SPARK_COMPLETED_STATE: str = "COMPLETED"
# Kubernetes resource kind passed to the k8s get component when fetching the job.
SPARK_APPLICATION_KIND: str = "sparkapplications"

In [None]:

def get_spark_job_definition(manifest_path="spark-job.yaml"):
    """
    Read a Spark Operator job manifest and return it as a dictionary, with the
    current epoch time injected into the job name to make it unique.

    The manifest's ``metadata.name`` is used as a ``str.format`` template: an
    ``{epoch}`` placeholder in it is replaced by the current Unix time in whole
    seconds (e.g. ``spark-pi-{epoch}`` -> ``spark-pi-1700000000``). A name
    without the placeholder is left unchanged.

    :param manifest_path: path to the YAML manifest; defaults to
        ``spark-job.yaml`` relative to the current working directory
    :return: dictionary defining the spark job
    :raises FileNotFoundError: if ``manifest_path`` does not exist
    :raises KeyError: if the manifest has no ``metadata`` / ``name`` entry
    """
    # Read manifest file
    with open(manifest_path, "r", encoding="utf-8") as stream:
        spark_job_manifest = yaml.safe_load(stream)

    # NOTE: this is called from the pipeline function, i.e. when the pipeline
    # is compiled. A compiled pipeline package therefore carries a fixed name,
    # and every run of that same package reuses it.
    epoch = int(time.time())
    metadata = spark_job_manifest["metadata"]
    metadata["name"] = metadata["name"].format(epoch=epoch)

    return spark_job_manifest

In [None]:
def print_op(msg):
    """
    Build a pipeline step that echoes ``msg`` to the container's stdout.

    :param msg: text to print (may embed pipeline parameters via an f-string)
    :return: a ``dsl.ContainerOp`` running ``echo msg`` in an Alpine image
    """
    echo_command = ["echo", msg]
    return dsl.ContainerOp(
        name="Print message.",
        image="alpine:3.6",
        command=echo_command,
    )

In [None]:
@dsl.graph_component  # Graph component decorator is used to annotate recursive functions
def graph_component_spark_app_status(input_application_name):
    """
    Poll a SparkApplication until its state equals ``SPARK_COMPLETED_STATE``.

    The loop is expressed as a recursive graph component. Each iteration does
    three things:
    - waits 5 seconds;
    - fetches the application with the k8s "get" component;
    - recurses on the returned name if ``applicationstate`` is not
      ``SPARK_COMPLETED_STATE``.

    :param input_application_name: name (or pipeline parameter) of the
        SparkApplication to watch
    """
    # This Python body runs once, while the pipeline graph is being built
    # (compile time). A ``time.sleep`` here would only slow compilation and
    # would never delay polling at runtime, so the wait is a container step.
    wait_op = dsl.ContainerOp(
        name="Wait before polling",
        image="alpine:3.6",
        command=["sleep", "5"],
    )
    # Without this, a cached sleep step would be skipped and the wait lost.
    wait_op.execution_options.caching_strategy.max_cache_staleness = "P0D"

    k8s_get_op = comp.load_component_from_file("k8s-get-component.yaml")
    check_spark_application_status_op = k8s_get_op(
        name=input_application_name,
        kind=SPARK_APPLICATION_KIND
    )
    check_spark_application_status_op.after(wait_op)
    # Remove cache so every iteration re-reads the live application state
    check_spark_application_status_op.execution_options.caching_strategy.max_cache_staleness = "P0D"

    # NOTE(review): only SPARK_COMPLETED_STATE ends the recursion; any other
    # terminal state (e.g. a failed application) keeps polling — confirm.
    with dsl.Condition(check_spark_application_status_op.outputs["applicationstate"] != SPARK_COMPLETED_STATE):
        graph_component_spark_app_status(check_spark_application_status_op.outputs["name"])

In [None]:
@dsl.pipeline(
    name="Spark Operator job pipeline",
    description="Spark Operator job pipeline"
)
def spark_job_pipeline():
    """
    Submit a Spark Operator job, wait for it to complete, then print a message.

    Steps:
      1. The ``k8s-apply-component.yaml`` component applies the manifest from
         ``get_spark_job_definition()``, serialized as JSON.
      2. ``graph_component_spark_app_status`` polls the application until its
         state equals ``SPARK_COMPLETED_STATE``.
      3. ``print_op`` echoes a completion message naming the job.

    Caching is disabled (``max_cache_staleness = "P0D"``) on the apply and
    print steps, so every run re-executes them.
    """
    # The manifest is read when this function runs (pipeline compile time).
    manifest_json = json.dumps(get_spark_job_definition())
    apply_component = comp.load_component_from_file("k8s-apply-component.yaml")

    spark_job_op = apply_component(object=manifest_json)
    spark_job_name = spark_job_op.outputs["name"]
    spark_job_op.execution_options.caching_strategy.max_cache_staleness = "P0D"

    status_op = graph_component_spark_app_status(spark_job_name)
    status_op.after(spark_job_op)

    completion_message = print_op(f"Job {spark_job_name} is completed.")
    completion_message.after(status_op)
    completion_message.execution_options.caching_strategy.max_cache_staleness = "P0D"


### Compile and run your pipeline

After defining the pipeline in Python as described in the preceding section, use one of the following options to compile the pipeline and submit it to the Kubeflow Pipelines service.

#### Option 1: Compile and then upload in UI

1.  Run the following to compile your pipeline and save it as `spark_job_pipeline.yaml`. 

For Argo (Default)

In [None]:
# Create the pipeline file for the Argo backend (the default). If you use Tekton, use the block below instead.
if __name__ == "__main__":
    from kfp import compiler
    import logging

    logging.basicConfig(level=logging.INFO)

    # Compile the pipeline into "<pipeline function name>.yaml"
    pipeline_func = spark_job_pipeline
    pipeline_filename = f"{pipeline_func.__name__}.yaml"
    compiler.Compiler().compile(pipeline_func, pipeline_filename)
    logging.info(f"Generated pipeline file: {pipeline_filename}.")

For Tekton

In [None]:
# uncomment the block below to create pipeline file for tekton

# if __name__ == '__main__':
#     from kfp_tekton.compiler import TektonCompiler
#     import logging
#     logging.basicConfig(level=logging.INFO)
#     pipeline_func = spark_job_pipeline
#     pipeline_filename = pipeline_func.__name__ + ".yaml"
#     TektonCompiler().compile(pipeline_func, pipeline_filename)
#     logging.info(f"Generated pipeline file: {pipeline_filename}.")

2.  Upload and run your `spark_job_pipeline.yaml` using the Kubeflow Pipelines user interface.
See the guide to [getting started with the UI][quickstart].

[quickstart]: https://www.kubeflow.org/docs/components/pipelines/overview/quickstart

#### Option 2: run the pipeline using Kubeflow Pipelines SDK client

1.  Create an instance of the [`kfp.Client` class][kfp-client] following steps in [connecting to Kubeflow Pipelines using the SDK client][connect-api].

[kfp-client]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.client.html#kfp.Client
[connect-api]: https://www.kubeflow.org/docs/components/pipelines/sdk/connect-api

In [None]:
# Adjust the constructor arguments (e.g. the host URL) to match your Kubeflow deployment, per the connect-api guide linked above.
client = kfp.Client() # change arguments accordingly

In [None]:
# KFP 1.x requires an `arguments` mapping; this pipeline takes no parameters.
client.create_run_from_pipeline_func(
    spark_job_pipeline,
    arguments={},
)