Introduction to Azure Databricks for Kubeflow Pipelines

The Azure Databricks package provides a set of Kubeflow Pipelines tasks (Ops) which let you manipulate Databricks resources using the Azure Databricks Operator for Kubernetes. This gives a much nicer and less error-prone user experience than manipulating those Databricks resources directly with ResourceOp.

Supported Ops

  • CreateClusterOp, to create a cluster in Databricks.
  • DeleteClusterOp, to delete a cluster created with CreateClusterOp.
  • CreateJobOp, to create a Spark job in Databricks.
  • DeleteJobOp, to delete a job created with CreateJobOp.
  • SubmitRunOp, to submit a job run in Databricks.
  • DeleteRunOp, to delete a run submitted with SubmitRunOp.
  • CreateSecretScopeOp, to create a secret scope in Databricks.
  • DeleteSecretScopeOp, to delete a secret scope created with CreateSecretScopeOp.
  • ImportWorkspaceItemOp, to import an item into a Databricks Workspace.
  • DeleteWorkspaceItemOp, to delete an item imported with ImportWorkspaceItemOp.
  • CreateDbfsBlockOp, to create a DBFS block in Databricks.
  • DeleteDbfsBlockOp, to delete a DBFS block created with CreateDbfsBlockOp.

For each of these Ops, there are two ways a Kubeflow user can create them (see the sketch after this list):

  1. By passing the complete Databricks spec for the Op in a Python dictionary.
  2. By using named parameters.
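
As a minimal sketch of the two styles, here is SubmitRunOp created both ways inside a pipeline function. The spec parameter name in option 1 is an assumption, so check the Op's signature in the package; the named-parameter form in option 2 matches the Example section below:

import kfp.dsl as dsl
import databricks

@dsl.pipeline(name="SubmitRunTwoWays")
def submit_run_pipeline():
    # Option 1: pass the complete Databricks run spec in a dictionary.
    # NOTE: the `spec` parameter name is an assumption, not confirmed here.
    run_from_spec = databricks.SubmitRunOp(
        name="submitrun-spec",
        run_name="test-run",
        spec={
            "new_cluster": {
                "spark_version": "5.3.x-scala2.11",
                "node_type_id": "Standard_D3_v2",
                "num_workers": 2
            },
            "libraries": [{"jar": "dbfs:/docs/sparkpi.jar"}],
            "spark_jar_task": {
                "main_class_name": "org.apache.spark.examples.SparkPi",
                "parameters": ["10"]
            }
        }
    )

    # Option 2: pass the same fields as named parameters
    # (this is the style used in the Example section below).
    run_from_params = databricks.SubmitRunOp(
        name="submitrun-params",
        run_name="test-run",
        new_cluster={
            "spark_version": "5.3.x-scala2.11",
            "node_type_id": "Standard_D3_v2",
            "num_workers": 2
        },
        libraries=[{"jar": "dbfs:/docs/sparkpi.jar"}],
        spark_jar_task={
            "main_class_name": "org.apache.spark.examples.SparkPi",
            "parameters": ["10"]
        }
    )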

Setup

  1. Create an Azure Databricks workspace
  2. Deploy the Azure Databricks Operator for Kubernetes
  3. Install the Kubeflow Pipelines SDK
  4. Install the Databricks package:
pip install -e "git+https://github.com/kubeflow/pipelines#egg=kfp-azure-databricks&subdirectory=samples/contrib/azure-samples/kfp-azure-databricks" --upgrade
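
To sanity-check the setup, you can verify that the operator's custom resource definitions are installed and that the package imports, for example:

kubectl get crds | grep databricks.microsoft.com
python -c "import databricks"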

To uninstall the Databricks package, use:

pip uninstall kfp-azure-databricks

Example

The following sample pipeline submits a one-time job run to Azure Databricks, with implicit cluster creation:

import kfp.dsl as dsl
import databricks

@dsl.pipeline(
    name="DatabricksRun",
    description="A toy pipeline that computes an approximation to pi with Databricks."
)
def calc_pipeline(run_name="test-run", parameter="10"):
    # Submit a one-time run; new_cluster makes Databricks create a cluster
    # just for this run (implicit cluster creation).
    submit_run_task = databricks.SubmitRunOp(
        name="submitrun",
        run_name=run_name,
        new_cluster={
            "spark_version": "5.3.x-scala2.11",
            "node_type_id": "Standard_D3_v2",
            "num_workers": 2
        },
        libraries=[{"jar": "dbfs:/docs/sparkpi.jar"}],
        spark_jar_task={
            "main_class_name": "org.apache.spark.examples.SparkPi",
            "parameters": [parameter]
        }
    )

    # Delete the Run resource once the submit step has finished.
    delete_run_task = databricks.DeleteRunOp(
        name="deleterun",
        run_name=run_name
    )
    delete_run_task.after(submit_run_task)

This sample is based on the following article: Create a spark-submit job, which points to the library sparkpi.jar. You can upload that library to the Databricks File System (DBFS) using the DBFS CLI.
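
For example, assuming the Databricks CLI is installed and configured, the jar could be copied to DBFS with something like:

dbfs cp sparkpi.jar dbfs:/docs/sparkpi.jar

To turn the pipeline function above into a workflow you can submit to Kubeflow Pipelines, one option is to compile it with the KFP SDK compiler, for example (the output file name is just illustrative):

import kfp.compiler as compiler

compiler.Compiler().compile(calc_pipeline, "databricks_run_pipeline.tar.gz")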

Example using ResourceOp

The following sample pipeline shows the code that would be required to submit the same one-time job run with implicit cluster creation to Azure Databricks, but using ResourceOp directly instead of this package:

import kfp.dsl as dsl
import kfp.compiler as compiler

@dsl.pipeline(
    name="DatabricksRun",
    description="A toy pipeline that computes an approximation to pi with Databricks."
)
def calc_pipeline(run_name="test-run", parameter="10"):
    submit_run_task = dsl.ResourceOp(
        name="submitrun",
        k8s_resource={
            "apiVersion": "databricks.microsoft.com/v1alpha1",
            "kind": "Run",
            "metadata": {
                "name":run_name,
            },
            "spec":{
                "run_name": run_name,
                "new_cluster": {
                    "spark_version": "5.3.x-scala2.11",
                    "node_type_id": "Standard_D3_v2",
                    "num_workers": 2
                },
                "libraries": [{"jar": "dbfs:/docs/sparkpi.jar"}],
                "spark_jar_task": {
                    "main_class_name": "com.databricks.ComputeModels",
                    "parameters": [parameter]
                }
            },
        },
        action="create",
        success_condition="status.metadata.state.life_cycle_state in (TERMINATED, SKIPPED, INTERNAL_ERROR)",
        attribute_outputs={
            "name": "{.metadata.name}",
            "job_id": "{.status.metadata.job_id}",
            "number_in_job": "{.status.metadata.number_in_job}",
            "run_id": "{.status.metadata.run_id}",
            "run_name": "{.status.metadata.run_name}",
            "life_cycle_state": "{.status.metadata.state.life_cycle_state}",
            "result_state": "{.status.metadata.state.result_state}",
            "notebook_output_result": "{.status.notebook_output.result}",
            "notebook_output_truncated": "{.status.notebook_output.truncated}",
            "error": "{.status.error}"
        }
    )

    delete_run_task = dsl.ResourceOp(
        name="deleterun",
        k8s_resource={
            "apiVersion": "databricks.microsoft.com/v1alpha1",
            "kind": "Run",
            "metadata": {
                "name": run_name
            }
        },
        action="delete"
    )
    delete_run_task.after(submit_run_task)
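
The attribute_outputs above expose fields of the Run's status as outputs of the submit step, so other steps in the pipeline can consume them. As a rough sketch (the echo container is purely illustrative), inside the same pipeline function you could add:

    show_run_id_task = dsl.ContainerOp(
        name="showrunid",
        image="alpine",
        # Consuming the output creates the dependency on submit_run_task.
        command=["echo", submit_run_task.outputs["run_id"]]
    )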

Additional examples

More sample pipelines can be found in the folder samples/contrib/azure-samples/databricks-pipelines and in the tests of this package: samples/contrib/azure-samples/kfp-azure-databricks/tests.

Additional information

The following articles provide information on the spec fields supported by each of the Databricks Ops: