"""Submit a Job with implicit cluster creation to Databricks. Then submit a Run for that Job."""
|
|
import kfp.dsl as dsl
|
|
import kfp.compiler as compiler
|
|
import databricks
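
# The helper functions below wrap the ops exposed by the `databricks` package
# imported above (CreateJobOp, SubmitRunOp, DeleteRunOp, DeleteJobOp).


# Create a Databricks Job on a new, implicitly created cluster, with the
# SparkPi example jar attached as a library.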
def create_job(job_name):
    return databricks.CreateJobOp(
        name="createjob",
        job_name=job_name,
        new_cluster={
            "spark_version": "5.3.x-scala2.11",
            "node_type_id": "Standard_D3_v2",
            "num_workers": 2
        },
        libraries=[{"jar": "dbfs:/docs/sparkpi.jar"}],
        spark_jar_task={
            "main_class_name": "org.apache.spark.examples.SparkPi"
        }
    )
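

# Submit a one-time Run of the Job; `parameter` is passed through jar_params
# to the jar's main class (for SparkPi, the number of slices to compute).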
def submit_run(run_name, job_name, parameter):
    return databricks.SubmitRunOp(
        name="submitrun",
        run_name=run_name,
        job_name=job_name,
        jar_params=[parameter]
    )
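

# Clean-up: delete the Run.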
def delete_run(run_name):
    return databricks.DeleteRunOp(
        name="deleterun",
        run_name=run_name
    )
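

# Clean-up: delete the Job itself.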
def delete_job(job_name):
    return databricks.DeleteJobOp(
        name="deletejob",
        job_name=job_name
    )
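

# The pipeline wires the steps together; explicit .after() calls enforce the
# create-job -> submit-run -> delete-run -> delete-job order.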
@dsl.pipeline(
    name="DatabricksJob",
    description="A toy pipeline that computes an approximation to pi with Azure Databricks."
)
def calc_pipeline(job_name="test-job", run_name="test-job-run", parameter="10"):
    create_job_task = create_job(job_name)
    submit_run_task = submit_run(run_name, job_name, parameter)
    submit_run_task.after(create_job_task)
    delete_run_task = delete_run(run_name)
    delete_run_task.after(submit_run_task)
    delete_job_task = delete_job(job_name)
    delete_job_task.after(delete_run_task)
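

# Compile the pipeline into a .tar.gz package that can be uploaded to
# Kubeflow Pipelines.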
if __name__ == "__main__":
|
|
compiler.Compiler()._create_and_write_workflow(
|
|
pipeline_func=calc_pipeline,
|
|
package_path=__file__ + ".tar.gz")
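
# Example usage (a sketch; assumes the kfp SDK and a reachable Kubeflow
# Pipelines endpoint, file name is illustrative):
#   python databricks_job_pipeline.py   # writes databricks_job_pipeline.py.tar.gz
#   Upload the .tar.gz in the Kubeflow Pipelines UI, or submit it with
#   kfp.Client().create_run_from_pipeline_package(<package>, arguments={}).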