How To Use The Airflow SparkSubmitOperator in DAGs?

This simple Airflow code example introduces you to the Airflow SparkSubmitOperator and helps you learn how to use it with DAGs.

Objective: How To Use The Airflow SparkSubmitOperator in DAGs?

This Airflow code example will help you understand how to use the SparkSubmitOperator in an Airflow DAG.

What is the Airflow SparkSubmitOperator?

The Apache Airflow SparkSubmitOperator is an operator that allows you to run Spark jobs from within your Airflow DAGs. It is a wrapper around the spark-submit binary. It offers various features that make it easy to use Apache Spark with Airflow, such as:

  • Support for different cluster managers and deploy modes,

  • The ability to pass arbitrary Spark configuration properties,

  • The ability to upload additional files to the executor running the job, and

  • The ability to submit additional jars to the job.
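Since the operator wraps spark-submit, each feature above corresponds to a spark-submit flag. The following is a minimal sketch of that mapping, not the operator's actual implementation: the helper build_spark_submit_command and the sample paths and property values are assumptions made for illustration, while --master, --deploy-mode, --conf, --files, and --jars are standard spark-submit options.

```python
def build_spark_submit_command(application, master="local", deploy_mode=None,
                               conf=None, files=None, jars=None):
    """Assemble a spark-submit argument list (illustrative helper, not Airflow code)."""
    command = ["spark-submit", "--master", master]
    if deploy_mode:
        command += ["--deploy-mode", deploy_mode]
    # Each Spark configuration property becomes its own --conf key=value pair.
    for key, value in (conf or {}).items():
        command += ["--conf", f"{key}={value}"]
    # Extra files and jars are passed as comma-separated lists.
    if files:
        command += ["--files", ",".join(files)]
    if jars:
        command += ["--jars", ",".join(jars)]
    # The application (script or jar) always comes last.
    command.append(application)
    return command


if __name__ == "__main__":
    # Sample values only; adjust the paths and properties to your setup.
    cmd = build_spark_submit_command(
        "/home/hduser/sparksubmit_basic.py",
        conf={"spark.executor.memory": "1g"},
        files=["/home/hduser/wordcount.txt"],
    )
    print(" ".join(cmd))
```

Printing the command this way can help you reason about which operator arguments you need before you write the DAG.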

System Requirements For The Airflow SparkSubmitOperator Example

  • Install Ubuntu on a virtual machine

  • Install Apache Airflow

  • Install the following provider packages if you are using the latest version of Airflow:

    pip3 install apache-airflow-providers-apache-spark
    pip3 install apache-airflow-providers-cncf-kubernetes
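After installing the packages, you may want to confirm that Python can find them. The snippet below is one possible check using only the standard library; the helper name is_importable is an assumption for this sketch, and the module names are the ones imported later in this example.

```python
import importlib.util


def is_importable(module_name):
    """Return True if Python can locate the given module."""
    try:
        # For dotted names, find_spec imports the parent packages first.
        return importlib.util.find_spec(module_name) is not None
    except ImportError:
        # Raised when a parent package (for example "airflow") is missing or broken.
        return False


if __name__ == "__main__":
    for name in ("airflow.providers.apache.spark", "pyspark"):
        status = "found" if is_importable(name) else "missing"
        print(f"{name}: {status}")
```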

In this Airflow code example, you will schedule a dag file to submit and run a spark job using the SparkSubmitOperator.

Before you create the DAG file, create a PySpark job file on your local machine, as shown below:

sudo gedit sparksubmit_basic.py

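In this sparksubmit_basic.py file, you will use sample code that counts the lines of a text file containing the letters 'a' and 'b'.

    from pyspark import SparkContext

    # Path to the input text file; point it to your own file
    logFilepath = "file:///home/hduser/wordcount.txt"

    # Run Spark in local mode with the application name "first app"
    sc = SparkContext("local", "first app")
    logData = sc.textFile(logFilepath).cache()

    # Count the lines that contain 'a' and the lines that contain 'b'
    numAs = logData.filter(lambda s: 'a' in s).count()
    numBs = logData.filter(lambda s: 'b' in s).count()
    print("Lines with a: %i, lines with b: %i" % (numAs, numBs))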

The code above reads a text file and counts its lines. Create a text file, add some text to it, and set logFilepath to the path of that file.
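If you want to predict what the Spark job should print for a given input, the same filter-and-count logic can be tried in plain Python first. This is a sketch without Spark; the function count_lines_containing and the sample lines are made up for illustration.

```python
def count_lines_containing(lines, letter):
    """Count how many lines contain the given character at least once."""
    return sum(1 for line in lines if letter in line)


if __name__ == "__main__":
    # Hypothetical file contents, one list entry per line.
    sample = ["apache airflow", "big data", "spark job", "hello"]
    num_as = count_lines_containing(sample, "a")
    num_bs = count_lines_containing(sample, "b")
    print("Lines with a: %i, lines with b: %i" % (num_as, num_bs))
    # prints "Lines with a: 3, lines with b: 2"
```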

To create a DAG file in the /airflow/dags folder, use the command below.

sudo gedit sparkoperator_demo.py

After creating the DAG file in the dags folder, follow the steps below to write it.

Steps Involved In The SparkSubmitOperator Airflow Example

The following steps will help you understand how to use the SparkSubmitOperator in Airflow DAGs with the help of a simple SparkSubmitOperator Airflow example.

Step 1: Importing Airflow SparkSubmitOperator And Python Modules

This step involves importing the Airflow SparkSubmitOperator and other Python dependencies needed for the workflow.

    import airflow
    from datetime import timedelta
    from airflow import DAG
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    from airflow.utils.dates import days_ago

Step 2: Defining Default Arguments

You must define all the default and DAG-specific arguments.

    default_args = {
        'owner': 'airflow',
        # 'start_date': airflow.utils.dates.days_ago(2),
        # 'end_date': datetime(),
        # 'depends_on_past': False,
        # 'email': ['airflow@example.com'],
        # 'email_on_failure': False,
        # 'email_on_retry': False,
        # If a task fails, retry it once after waiting
        # at least 5 minutes
        # 'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }
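Airflow applies the entries in default_args to every task in the DAG unless a task sets the same argument explicitly. The sketch below imitates that precedence with a plain dictionary merge; effective_task_args is a hypothetical helper, not an Airflow function, and the one-minute override is only a sample value.

```python
from datetime import timedelta

default_args = {
    "owner": "airflow",
    "retry_delay": timedelta(minutes=5),
}


def effective_task_args(defaults, **task_kwargs):
    """Illustrative merge: explicit task arguments win over DAG-level defaults."""
    merged = dict(defaults)      # copy so the shared defaults stay untouched
    merged.update(task_kwargs)   # task-level values override the defaults
    return merged


if __name__ == "__main__":
    args = effective_task_args(
        default_args,
        task_id="spark_submit_task",
        retry_delay=timedelta(minutes=1),  # sample override for this task only
    )
    print(args["owner"], args["retry_delay"])
```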


Step 3: Instantiate A DAG

The next step involves assigning the DAG name and configuring the schedule and DAG settings.

    dag_spark = DAG(
        dag_id="sparkoperator_demo",
        # Pass the default_args dictionary defined in the previous step
        default_args=default_args,
        # schedule_interval='0 0 * * *',
        schedule_interval='@once',
        dagrun_timeout=timedelta(minutes=60),
        description='use case of sparkoperator in airflow',
        start_date=airflow.utils.dates.days_ago(1)
    )
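The DAG above sets start_date with days_ago(1). In Airflow 2.x this helper returns, as far as its documented behavior goes, midnight UTC of the day N days before today. The sketch below reproduces that with the standard library only; the function name days_ago_utc and its now parameter are assumptions made for this illustration.

```python
from datetime import datetime, timedelta, timezone


def days_ago_utc(n, now=None):
    """Return midnight UTC of the day n days before `now` (stdlib approximation)."""
    now = now or datetime.now(timezone.utc)
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight - timedelta(days=n)


if __name__ == "__main__":
    # With a fixed "now", the result is easy to check by hand.
    fixed_now = datetime(2024, 3, 10, 15, 30, tzinfo=timezone.utc)
    print(days_ago_utc(1, now=fixed_now))  # prints 2024-03-09 00:00:00+00:00
```

If your Airflow version warns that days_ago is deprecated, a fixed timezone-aware datetime can be used as start_date instead.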

As shown in the table below, you can schedule by giving preset or cron format.

| preset   | meaning                                                      | cron      |
|----------|--------------------------------------------------------------|-----------|
| None     | Don't schedule; use exclusively "externally triggered" DAGs. |           |
| @once    | Schedule once and only once                                  |           |
| @hourly  | Run once an hour at the beginning of the hour                | 0 * * * * |
| @daily   | Run once a day at midnight                                   | 0 0 * * * |
| @weekly  | Run once a week at midnight on Sunday morning                | 0 0 * * 0 |
| @monthly | Run once a month at midnight on the first day of the month   | 0 0 1 * * |
| @yearly  | Run once a year at midnight of January 1                     | 0 0 1 1 * |

Note: Use schedule_interval=None and not schedule_interval='None' when you don't want to schedule your DAG.
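To make the preset-to-cron mapping and the None versus 'None' distinction concrete, here is a small stand-alone sketch. The CRON_PRESETS dictionary simply restates the presets listed above, and describe_schedule is a hypothetical helper, not part of Airflow.

```python
# Presets and their cron equivalents, copied from the schedule list above.
CRON_PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def describe_schedule(schedule_interval):
    """Explain how a schedule_interval value would be interpreted (illustrative)."""
    if schedule_interval is None:
        return "not scheduled; externally triggered only"
    if schedule_interval == "@once":
        return "runs once and only once"
    # Anything else is treated as a preset or as a raw cron expression.
    cron = CRON_PRESETS.get(schedule_interval, schedule_interval)
    return f"cron: {cron}"


if __name__ == "__main__":
    print(describe_schedule("@daily"))  # prints "cron: 0 0 * * *"
    print(describe_schedule(None))      # prints "not scheduled; externally triggered only"
    # The string 'None' is not the None object, so it is handled like a cron string.
    print(describe_schedule("None"))    # prints "cron: None"
```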

Step 4: Set The Airflow SparkSubmitOperator Tasks

The next step is setting up all the tasks you want in the workflow. In the code below, spark_submit_local is a task created by instantiating the SparkSubmitOperator. The application argument points to the PySpark job file you created earlier, and conn_id names the Airflow connection to Spark.

    spark_submit_local = SparkSubmitOperator(
        application='/home/hduser/sparksubmit_basic.py',
        conn_id='spark_local',
        task_id='spark_submit_task',
        dag=dag_spark
    )

Step 5: Setting Up Airflow SparkSubmitOperator Task Dependencies

In this step, you set up the dependencies, that is, the order in which the tasks should be executed. This DAG contains a single task, so there is no ordering to declare:

    spark_submit_local

    if __name__ == "__main__":
        dag_spark.cli()

The code above means that spark_submit_local is the only task that will execute.
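When a DAG has more than one task, Airflow lets you order them with the >> and << operators or with the set_downstream and set_upstream methods. The toy class below only imitates that syntax so you can see how the ordering is recorded; ToyTask is not an Airflow class, and the task names prepare_input and report_result are hypothetical.

```python
class ToyTask:
    """Toy stand-in for an Airflow task; it only records what runs after it."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        # `other` runs after this task.
        self.downstream.append(other.task_id)

    def set_upstream(self, other):
        # This task runs after `other`.
        other.set_downstream(self)

    def __rshift__(self, other):
        # a >> b means b runs after a; returning `other` allows a >> b >> c.
        self.set_downstream(other)
        return other

    def __lshift__(self, other):
        # a << b means a runs after b.
        self.set_upstream(other)
        return other


if __name__ == "__main__":
    prepare = ToyTask("prepare_input")
    submit = ToyTask("spark_submit_task")
    report = ToyTask("report_result")
    prepare >> submit >> report
    print(prepare.downstream, submit.downstream)
    # prints ['spark_submit_task'] ['report_result']
```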

A DAG is a Python file that organizes tasks and sets their execution context. DAGs do not perform any actual computation. Instead, tasks are the elements of Airflow that actually "do the work" you want performed. To create a complete data pipeline, you must write the configuration and organize the tasks in a specific order.

Step 6: Creating The Apache Airflow-Spark Connection

This step involves creating the Airflow connection to Spark, as shown below:

[Image: Airflow Spark Connection]

Go to the Admin tab and select Connections; a new window opens where you can create the Spark connection and pass its details, as below.

[Image: Create New Airflow Spark Connection]

Click on the plus button beside the action tab to create a connection in Airflow to connect to Spark.

[Image: Passing Details For New Airflow Spark Connection]

Fill in the Conn Id (spark_local, the conn_id used in the SparkSubmitOperator task), select Spark as the Conn Type, specify the host, and write the Spark home in the Extra field.
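As an alternative to the web UI, Airflow can also read connections from environment variables named AIRFLOW_CONN_ followed by the upper-cased connection id. The sketch below builds such a variable for the spark_local connection. It assumes an Airflow version that accepts JSON-encoded connection values (reportedly 2.3 and later) and assumes a host value of "local" to match the local Spark job; adjust both to your setup. The helper names are made up for this example.

```python
import json
import os


def connection_env_var(conn_id):
    """Name of the environment variable Airflow checks for this connection id."""
    return "AIRFLOW_CONN_" + conn_id.upper()


def spark_connection_json(host):
    """JSON description of a Spark connection (host value is an assumption)."""
    return json.dumps({"conn_type": "spark", "host": host})


if __name__ == "__main__":
    name = connection_env_var("spark_local")
    # Setting it here only affects this process; export it in the shell that
    # starts the Airflow scheduler to make it visible to your DAGs.
    os.environ[name] = spark_connection_json("local")
    print(name, os.environ[name])
```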

Step 7: Verifying The SparkSubmitOperator Airflow Tasks

Unpause the sparkoperator_demo DAG, as shown below:

[Image: Airflow DAG Folder]

Click on the "sparkoperator_demo" name to open the DAG, and then select the graph view; as shown in the image below, you have a task called spark_submit_task.

[Image: Airflow SparkSubmitOperator Task Dependency]

To check the log of how the task ran, click on spark_submit_task in the graph view. You will get the window below:

[Image: Airflow SparkSubmitOperator Task Output Log]

Click on the log tab to check the log file.

[Image: Spark_Submit_Task Output Log Details]

The above log file shows that the task has started running, and the image below shows the task's output.

[Image: Spark_Submit_Task Output]

Dive Deeper Into The Airflow SparkSubmitOperator With ProjectPro

The Airflow SparkSubmitOperator is a game-changer when it comes to seamlessly integrating Apache Spark into your data pipelines. Following the steps outlined in this tutorial, you have learned how to leverage the SparkSubmitOperator within your Airflow DAGs, enabling you to submit Spark jobs with ease and precision. To further enhance your Airflow expertise and dive deeper into SparkSubmitOperator, explore the Airflow projects offered by ProjectPro. These hands-on end-to-end solved projects provide a practical and immersive learning experience, equipping you with the knowledge and skills to effectively incorporate SparkSubmitOperator into real-world data workflows.
