Submit Databricks job using Rest API in Airflow

Chetanya-Patil
3 min read · Nov 23, 2023


My experience using the REST API endpoint /api/2.1/jobs/runs/submit to submit jobs to Databricks from Airflow.

Apache Airflow is a very popular data tool for orchestrating tasks and creating data pipelines.

Here, we are going to use REST API endpoints to submit our jobs to a Databricks cluster, with the request issued from Airflow code. For this we will use the requests library, so we have a Python script that submits our requests to the Databricks cluster through the REST API.

In Airflow, we have the PythonOperator, which helps us execute Python code as tasks.

Python Operator

The PythonOperator is one of the core operators that allows you to define a custom Python function or callable to be executed as a task in a DAG. This operator is quite versatile and is commonly used for running Python code as part of an Airflow workflow.
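As a quick, self-contained illustration (a toy DAG, separate from the Databricks example below), a PythonOperator task only needs a task_id and a python_callable pointing at a Python function:

# Minimal PythonOperator example
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def greet():
    print("Hello from a PythonOperator task")

dag = DAG("python_operator_demo",
          start_date=datetime(2021, 1, 1),
          schedule=None)

greet_task = PythonOperator(
    task_id="greet",
    python_callable=greet,
    dag=dag
)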

To learn how to submit a job to Databricks using the REST API, please visit:
https://medium.com/@chetanyapatil/submit-databricks-job-using-rest-api-launched-by-runs-submit-api-57011cf644e1

Sample Airflow code with the PythonOperator:


# Imports
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from utils.databricks_job_submitter import DatabricksJobSubmitter

# Define the job config for the job to submit.
# Please refer to the sample job_config in https://medium.com/@chetanyapatil/submit-databricks-job-using-rest-api-launched-by-runs-submit-api-57011cf644e1
job_config = {

}

# Default arguments which we pass to the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=2)
}

# Instance of DAG
dag = DAG("databricks_dag",
          start_date=datetime(2021, 1, 1),
          schedule='@daily',
          default_args=default_args)


submit_job = PythonOperator(
    task_id="trigger_spark_jar_task",
    # Pass the run method of DatabricksJobSubmitter as the callable (not its result)
    # and hand job_config over through op_kwargs.
    python_callable=DatabricksJobSubmitter().run,
    op_kwargs={"job_config": job_config},
    dag=dag
)

submit_job

In the Airflow code above, the PythonOperator uses python_callable to execute Python code as a task; through it we call the run method, which contains the complete logic for submitting our job to the Databricks cluster via the REST API endpoint.


# Imports
import requests


# DatabricksJobSubmitter class
class DatabricksJobSubmitter:
    def __init__(self):
        self.host = "https://adb-178*****86047.7.azuredatabricks.net"

    def run(self, job_config):
        print("Running job on Databricks")
        # Add your Databricks workspace access token here
        auth_token = "<databricks_access token>"
        self.headers = {"Authorization": f"Bearer {auth_token}"}

        self.job_run(job_config)

    def job_run(self, job_params):
        submit_url = "{host}/api/2.1/jobs/runs/submit".format(host=self.host)
        print(submit_url)

        try:
            submit_job = requests.post(submit_url,
                                       headers=self.headers,
                                       json=job_params)

            # Printing status code
            print(submit_job.status_code)
            response = submit_job.json()
            print(response)  # we will get {'run_id': 844875314241917} as response
            run_id = response["run_id"]
            print(run_id)
        except Exception as e:
            print("Error in submitting spark job at {} with params {}, error {}".format(submit_url, job_params, e))

To get information about a Databricks job submitted by using the jobs/runs/submit API endpoint, you can use the jobs/runs/get API endpoint with the run_id parameter. Here's a basic example in Python using the requests library:

import requests

# Set your Databricks workspace URL and access token
workspace_url = "https://<your-databricks-workspace-url>"
token = "<your-databricks-access-token>"

# Replace <your-run-id> with the actual run ID obtained after submitting the job
run_id = "<your-run-id>"

# Get information about the submitted job
get_run_response = requests.get(
    f"{workspace_url}/api/2.1/jobs/runs/get",
    headers={"Authorization": f"Bearer {token}"},
    params={"run_id": run_id},
)

# Extract and print job run information
job_run_info = get_run_response.json()  # runs/get returns the run object directly
print("Job Run Information:")
print(f"Run ID: {job_run_info['run_id']}")
print(f"Job ID: {job_run_info['job_id']}")
print(f"Cluster ID: {job_run_info['cluster_instance']['cluster_id']}")
print(f"State: {job_run_info['state']['life_cycle_state']}")
print(f"Result State: {job_run_info['state']['result_state']}")
print(f"Start Time: {job_run_info['start_time']}")
print(f"End Time: {job_run_info['end_time']}")

This script retrieves information about a specific run using the jobs/runs/get API endpoint and prints relevant details such as run ID, job ID, cluster ID, state, start time, and end time. We can modify the script based on the specific information we need.
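If you also want to wait until the run finishes, a simple option is to poll jobs/runs/get until life_cycle_state reaches a terminal value. Below is a minimal sketch reusing the workspace_url, token, and run_id variables from the snippet above; the wait_for_run helper is introduced here purely for illustration:

import time

import requests

def wait_for_run(workspace_url, token, run_id, poll_seconds=30):
    """Poll jobs/runs/get until the run reaches a terminal life_cycle_state."""
    terminal_states = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}
    while True:
        response = requests.get(
            f"{workspace_url}/api/2.1/jobs/runs/get",
            headers={"Authorization": f"Bearer {token}"},
            params={"run_id": run_id},
        )
        state = response.json()["state"]
        life_cycle_state = state["life_cycle_state"]
        print(f"Run {run_id} is in state {life_cycle_state}")
        if life_cycle_state in terminal_states:
            # result_state is only present once the run has terminated
            return state.get("result_state")
        time.sleep(poll_seconds)

# Fail loudly if the Databricks run did not succeed
result_state = wait_for_run(workspace_url, token, run_id)
if result_state != "SUCCESS":
    raise Exception(f"Databricks run {run_id} finished with result {result_state}")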

To know more about the response we get after submitting a job to the Databricks cluster, please visit:
< Add blog post link here>

Thanks for Reading!

If you like my work and want to support me…

  1. The BEST way to support me is by following me on Medium.
  2. I share content about #dataengineering. Let’s connect on LinkedIn.
  3. Feel free to give claps so I know how helpful this post was for you.

#databricksapi #restapi #databricks
