How to initiate a streaming query in Pyspark

This recipe helps you initiate a streaming query in Pyspark

Recipe Objective: How to Initiate a streaming query in Pyspark?

In most big data scenarios, data merging and data aggregation are essential parts of the day-to-day activities in big data platforms. Spark Streaming is an extension of the core Spark API that allows data engineers and data scientists to process real-time data from various sources, including (but not limited to) Kafka, Flume, and Amazon Kinesis. This processed data can be pushed out to file systems, databases, and live dashboards. In this scenario, we are going to initiate a streaming query in Pyspark.

System requirements :

  • Install Ubuntu in a virtual machine
  • Set up a single-node Hadoop machine
  • Install pyspark or spark on Ubuntu
  • The code below can be run in a Jupyter notebook or any Python console.

Step 1: Import the modules

In this scenario, we are going to import the pyspark and pyspark SQL modules and create a Spark session as below:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession.builder \
    .master("local") \
    .appName("pyspark_stream_setup") \
    .getOrCreate()

Step 2: Create Schema

Here we are going to create a schema, which will later be applied to the streaming DataFrame.

schooldrivers_Schema = StructType() \
    .add("school_year", "string") \
    .add("vendor_name", "string") \
    .add("type_of_service", "string") \
    .add("active_employees", "string") \
    .add("job_type", "string")

Then we are going to print the content of the schema. Please follow the below code.

schooldrivers_Schema.simpleString()

The output of the code:

[Image bigdata_1.jpg: the printed schema string]

In the output, we can see all five columns just added to the schema.

Step 3: Create Dataframe from Streaming

Here we are going to create the DataFrame from streaming as shown in the below code.

schooldrivers_stream_Df = spark.readStream \
    .option("sep", ",") \
    .schema(schooldrivers_Schema) \
    .csv("/home/hduser/school_drives")

To make sure the DataFrame is in streaming mode, we check its isStreaming property (note that it is a property, not a method), as shown below.

schooldrivers_stream_Df.isStreaming

The output confirms that the DataFrame is in streaming mode:

[Image bigdata_2.jpg: isStreaming returns True]

Step 4: Write to Stream

Here we will create a stream using the writeStream method, which will write into the console and keep on appending the incoming data.

query = schooldrivers_stream_Df.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

When you run this code, you will not see any output immediately. Once you place the drivers-1.csv file into the school_drives directory, the query picks it up, and you can observe the following output in the console where you started the Jupyter notebook.

[Image bigdata_3.jpg: console output of the running streaming query]

Once drivers-1.csv lands in the directory, the query emits a micro-batch containing its rows:

[Image bigdata_4.jpg: batch output after placing drivers-1.csv]

Conclusion

In this recipe, we learned how to initiate a streaming query in Pyspark.

