Introduction to Azure Databricks for Kubeflow Pipelines
The Azure Databricks package provides a set of Kubeflow Pipelines tasks (Ops) for manipulating Databricks resources through the Azure Databricks Operator for Kubernetes. This gives a much simpler and less error-prone user experience than manipulating those resources with the generic ResourceOp.
Supported Ops
- CreateClusterOp, to create a cluster in Databricks.
- DeleteClusterOp, to delete a cluster created with CreateClusterOp.
- CreateJobOp, to create a Spark job in Databricks.
- DeleteJobOp, to delete a job created with CreateJobOp.
- SubmitRunOp, to submit a job run in Databricks.
- DeleteRunOp, to delete a run submitted with SubmitRunOp.
- CreateSecretScopeOp, to create a secret scope in Databricks.
- DeleteSecretScopeOp, to delete a secret scope created with CreateSecretScopeOp.
- ImportWorkspaceItemOp, to import an item into a Databricks Workspace.
- DeleteWorkspaceItemOp, to delete an item imported with ImportWorkspaceItemOp.
- CreateDbfsBlockOp, to create a DBFS block in Databricks.
- DeleteDbfsBlockOp, to delete a DBFS block created with CreateDbfsBlockOp.
Each of these Ops can be created in either of two ways (sketched after this list):
- By passing the complete Databricks spec for the Op within a Python Dictionary.
- By using named parameters.
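Both styles can be used inside a @dsl.pipeline function, as in the full example below. Here is a minimal sketch using SubmitRunOp; the named-parameter form mirrors that example, while the dictionary form assumes the Op accepts the complete Databricks spec through a spec argument (check the package tests for the exact parameter names):

import databricks

run_spec = {
    "new_cluster": {
        "spark_version": "5.3.x-scala2.11",
        "node_type_id": "Standard_D3_v2",
        "num_workers": 2
    },
    "libraries": [{"jar": "dbfs:/docs/sparkpi.jar"}],
    "spark_jar_task": {
        "main_class_name": "org.apache.spark.examples.SparkPi",
        "parameters": ["10"]
    }
}

# Option 1: pass the complete Databricks spec as a Python dictionary
# (the 'spec' parameter name is an assumption; see the package tests).
run_from_spec = databricks.SubmitRunOp(
    name="submitrun", run_name="test-run", spec=run_spec)

# Option 2: pass the same fields as named parameters,
# as in the full sample pipeline below.
run_from_params = databricks.SubmitRunOp(
    name="submitrun",
    run_name="test-run",
    new_cluster=run_spec["new_cluster"],
    libraries=run_spec["libraries"],
    spark_jar_task=run_spec["spark_jar_task"]
)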
Setup
- Create an Azure Databricks workspace
- Deploy the Azure Databricks Operator for Kubernetes
- Install the Kubeflow Pipelines SDK
- Install the Databricks package (a quick import check is sketched at the end of this section):
pip install -e "git+https://github.com/kubeflow/pipelines#egg=kfp-azure-databricks&subdirectory=samples/contrib/azure-samples/kfp-azure-databricks" --upgrade
To uninstall the Databricks package, use:
pip uninstall kfp-azure-databricks
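After installation, a quick sanity check is to import the package and list the Ops it exports at the top level of the databricks module (the sample pipelines below rely on this import):

import databricks

# Should print the available Op classes, e.g. SubmitRunOp, CreateClusterOp, ...
print([name for name in dir(databricks) if name.endswith("Op")])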
Example
The following sample pipeline will submit a one-time job run with implicit cluster creation to Azure Databricks:
import kfp.dsl as dsl
import databricks

@dsl.pipeline(
    name="DatabricksRun",
    description="A toy pipeline that computes an approximation to pi with Databricks."
)
def calc_pipeline(run_name="test-run", parameter="10"):
    submit_run_task = databricks.SubmitRunOp(
        name="submitrun",
        run_name=run_name,
        new_cluster={
            "spark_version": "5.3.x-scala2.11",
            "node_type_id": "Standard_D3_v2",
            "num_workers": 2
        },
        libraries=[{"jar": "dbfs:/docs/sparkpi.jar"}],
        spark_jar_task={
            "main_class_name": "org.apache.spark.examples.SparkPi",
            "parameters": [parameter]
        }
    )

    delete_run_task = databricks.DeleteRunOp(
        name="deleterun",
        run_name=run_name
    )
    delete_run_task.after(submit_run_task)
This sample is based on the article Create a spark-submit job. The job points to the library sparkpi.jar, which you may upload to the Databricks File System (DBFS) using the DBFS CLI.
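To run the sample, compile the pipeline with the Kubeflow Pipelines SDK and upload the resulting package through the Kubeflow Pipelines UI, or submit it directly with the SDK client. A minimal sketch, assuming a reachable Kubeflow Pipelines endpoint (the host below is a placeholder):

import kfp
import kfp.compiler as compiler

# Compile into a package that can be uploaded through the Kubeflow Pipelines UI.
compiler.Compiler().compile(calc_pipeline, "databricks_run_pipeline.tar.gz")

# Or submit the run directly; replace the host with your own KFP endpoint.
client = kfp.Client(host="http://localhost:8080")
client.create_run_from_pipeline_func(
    calc_pipeline,
    arguments={"run_name": "test-run", "parameter": "10"})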
Example using ResourceOp
This sample pipeline shows the code required to submit the same one-time job run with implicit cluster creation to Azure Databricks, but using the generic ResourceOp instead of this package:
import kfp.dsl as dsl
import kfp.compiler as compiler

@dsl.pipeline(
    name="DatabricksRun",
    description="A toy pipeline that computes an approximation to pi with Databricks."
)
def calc_pipeline(run_name="test-run", parameter="10"):
    submit_run_task = dsl.ResourceOp(
        name="submitrun",
        k8s_resource={
            "apiVersion": "databricks.microsoft.com/v1alpha1",
            "kind": "Run",
            "metadata": {
                "name": run_name,
            },
            "spec": {
                "run_name": run_name,
                "new_cluster": {
                    "spark_version": "5.3.x-scala2.11",
                    "node_type_id": "Standard_D3_v2",
                    "num_workers": 2
                },
                "libraries": [{"jar": "dbfs:/docs/sparkpi.jar"}],
                "spark_jar_task": {
                    "main_class_name": "org.apache.spark.examples.SparkPi",
                    "parameters": [parameter]
                }
            },
        },
        action="create",
        success_condition="status.metadata.state.life_cycle_state in (TERMINATED, SKIPPED, INTERNAL_ERROR)",
        attribute_outputs={
            "name": "{.metadata.name}",
            "job_id": "{.status.metadata.job_id}",
            "number_in_job": "{.status.metadata.number_in_job}",
            "run_id": "{.status.metadata.run_id}",
            "run_name": "{.status.metadata.run_name}",
            "life_cycle_state": "{.status.metadata.state.life_cycle_state}",
            "result_state": "{.status.metadata.state.result_state}",
            "notebook_output_result": "{.status.notebook_output.result}",
            "notebook_output_truncated": "{.status.notebook_output.truncated}",
            "error": "{.status.error}"
        }
    )

    delete_run_task = dsl.ResourceOp(
        name="deleterun",
        k8s_resource={
            "apiVersion": "databricks.microsoft.com/v1alpha1",
            "kind": "Run",
            "metadata": {
                "name": run_name
            }
        },
        action="delete"
    )
    delete_run_task.after(submit_run_task)
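The attribute_outputs above expose fields of the Run resource as task outputs, which a downstream step can consume like any other Kubeflow Pipelines output. For illustration, a hypothetical step inside the same pipeline function could echo the run id (the echo-run-id step and the alpine image are not part of this package):

    # Hypothetical downstream step, placed inside calc_pipeline after submit_run_task.
    echo_task = dsl.ContainerOp(
        name="echo-run-id",
        image="alpine:3.10",
        command=["sh", "-c"],
        arguments=["echo run_id: %s" % submit_run_task.outputs["run_id"]]
    )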
Additional examples
More sample pipelines can be found in the folder samples/contrib/azure-samples/databricks-pipelines and in the tests of this package under samples/contrib/azure-samples/kfp-azure-databricks/tests.
Additional information
- Kubeflow Pipelines
- Azure Databricks documentation
- Azure Databricks Operator for Kubernetes
- Golang SDK for DataBricks REST API 2.0 and Azure DataBricks REST API 2.0, used by the Azure Databricks Operator.
- Databricks REST API 2.0
- Azure Databricks REST API 2.0
The following articles describe the spec fields supported by each group of Databricks Ops:
- Cluster Ops: Azure Databricks Cluster API
- Job Ops: Azure Databricks Jobs API
- Run Ops: Azure Databricks Jobs API - Runs Submit
- Secret Scope Ops: Azure Databricks Secrets API
- Workspace Item Ops: Azure Databricks Workspace API
- DbfsBlock Ops: Azure Databricks DBFS API