How to perform Spark Streaming with JSON Files from a directory

Recipe Objective: How to perform Spark Streaming with JSON Files from a directory?

Working with streaming data is different from working with batch data. With streaming data, we never have the complete dataset for analysis, as data arrives continuously. Apache Spark provides a streaming API that lets us analyze streaming data in much the same way we work with batch data. Apache Spark Structured Streaming is built on top of the Spark SQL API to leverage its optimizations. A Spark streaming job processes data in real time from input sources and writes the results to external storage systems.

Spark Streaming is a scalable, high-throughput, fault-tolerant stream-processing system that supports both batch and streaming workloads. It extends the core Spark API to process real-time data from sources like Kafka and Flume.


Implementation Info:

  1. Databricks Community Edition
  2. Spark-Scala
  3. JSON file 1
  4. JSON file 2
  5. JSON file 3
  6. JSON file 4
  7. JSON file 5
  8. Storage - Databricks File System (DBFS)

Step 1: Uploading data to DBFS

Follow the steps below to upload data files from your local machine to DBFS:

  1. Click Create in the Databricks menu.
  2. Click Table in the drop-down menu; this opens the Create New Table UI.
  3. In the UI, specify the folder name in which you want to save your files.
  4. Click Browse to upload and select the files from your local machine.
  5. The uploaded file path looks like /FileStore/tables/your folder name/your file.

Refer to the image below for an example.

bigdata_01.PNG

Step 2: Reading JSON Files from Directory

Spark Streaming has three major components: input sources, a processing engine, and a sink (destination). The Spark Streaming engine processes incoming data from various input sources, such as Kafka, Flume, or HDFS/S3/any file system. Sinks store the processed data; examples are HDFS/file systems, relational databases, and NoSQL DBs.

Here we are using the file system as the source for streaming. Spark reads files written to a directory as a stream of data. Files are processed in order of file modification time; if the latestFirst option is set to true, the order is reversed. Supported file formats are text, CSV, JSON, ORC, and Parquet. Note that the files must be placed atomically in the given directory, which can be achieved by a file move operation in most file systems.
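The ordering rule can be illustrated with a plain-Scala sketch. This mimics, but does not use, Spark's file source; the file names and timestamps are invented for illustration:

```scala
import java.io.File
import java.nio.file.Files

// Create three files with distinct modification times to mimic a source directory.
val dir = Files.createTempDirectory("spark_stream_demo").toFile
val names = Seq("a.json", "b.json", "c.json")
names.zipWithIndex.foreach { case (name, i) =>
  val f = new File(dir, name)
  Files.write(f.toPath, "{}".getBytes)
  f.setLastModified(1000L * (i + 1)) // a.json oldest, c.json newest
}

// Default order: oldest modification time first, as Spark's file source does;
// latestFirst reverses it.
def processingOrder(dir: File, latestFirst: Boolean = false): Seq[String] = {
  val sorted = dir.listFiles().sortBy(_.lastModified()).map(_.getName).toSeq
  if (latestFirst) sorted.reverse else sorted
}

println(processingOrder(dir))                     // oldest first
println(processingOrder(dir, latestFirst = true)) // newest first
```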

In our example, the JSON files are placed in the /FileStore/tables/spark_stream directory. In Databricks, the notebook itself creates a Spark session when it is executed, so there is no need to define one explicitly. Here we define a custom schema and impose it on the data while reading the JSON files. We can check whether a DataFrame/Dataset holds streaming data by calling df.isStreaming.

While reading files from the directory, we set the property maxFilesPerTrigger = 2 so that Spark reads at most two files per micro-batch.
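As a rough analogue (plain Scala, not the actual Spark internals), maxFilesPerTrigger = 2 splits the list of pending files into micro-batches of at most two files each:

```scala
// Hypothetical file names standing in for the five JSON files in the source directory.
val pendingFiles = Seq("file1.json", "file2.json", "file3.json", "file4.json", "file5.json")

// With maxFilesPerTrigger = 2, each micro-batch picks up at most two pending files.
val maxFilesPerTrigger = 2
val batches = pendingFiles.grouped(maxFilesPerTrigger).toList

batches.zipWithIndex.foreach { case (batch, i) =>
  println(s"Batch ${i + 1}: ${batch.mkString(", ")}")
}
// Five files at two per trigger yield three batches (2 + 2 + 1).
```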

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(List(
  StructField("RecordNumber", IntegerType, true),
  StructField("Zipcode", StringType, true),
  StructField("ZipCodeType", StringType, true),
  StructField("City", StringType, true),
  StructField("State", StringType, true),
  StructField("LocationType", StringType, true),
  StructField("Lat", StringType, true),
  StructField("Long", StringType, true),
  StructField("Xaxis", StringType, true),
  StructField("Yaxis", StringType, true),
  StructField("Zaxis", StringType, true),
  StructField("WorldRegion", StringType, true),
  StructField("Country", StringType, true),
  StructField("LocationText", StringType, true),
  StructField("Location", StringType, true),
  StructField("Decommisioned", StringType, true)))

val df = spark.readStream
  .schema(schema)
  .option("maxFilesPerTrigger", 2)
  .json("/FileStore/tables/spark_stream")

df.printSchema()
println("Streaming DataFrame : " + df.isStreaming)

Refer to the image below for reference.

bigdata_02.PNG

Step 3: Basic transformation

Now we perform a basic transformation on df to generate a new column "address" and drop the "City", "State", "Country", and "Zipcode" columns. We must import org.apache.spark.sql.functions._ before doing any column-level operations on the DataFrame.

import org.apache.spark.sql.functions._

val final_df = df
  .withColumn("address", concat_ws(",", col("City"), col("State"), col("Country"), col("Zipcode")))
  .drop("City", "State", "Country", "Zipcode")

final_df.printSchema()
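For intuition, concat_ws joins its arguments with the separator and skips nulls. Here is a pure-Scala analogue; this is not the Spark function itself, and the sample row values are invented:

```scala
// Pure-Scala analogue of concat_ws(",", City, State, Country, Zipcode).
// Like Spark's concat_ws, missing (None/null) values are skipped,
// so no empty slots appear between separators.
def concatWs(sep: String, cols: Option[String]*): String =
  cols.flatten.mkString(sep)

// Invented sample row values for illustration.
val address = concatWs(",", Some("Springfield"), Some("IL"), Some("US"), Some("62704"))
println(address) // Springfield,IL,US,62704

// A missing State is simply skipped:
println(concatWs(",", Some("Springfield"), None, Some("US"), Some("62704"))) // Springfield,US,62704
```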

Refer to the image below for reference.

bigdata_03.PNG

Step 4: Output to the console

After processing the streaming data, Spark needs to write it to persistent storage. Spark supports several output modes for writing streaming data. Here we use "append" since we did not perform any aggregation over the data.

Append Mode: In this mode, Spark outputs only the rows newly processed since the last trigger.

Update Mode: In this mode, Spark outputs only the rows updated since the last trigger. Without aggregation on the streaming data, this behaves like append mode.

Complete Mode: In this mode, Spark outputs the entire result table on every trigger; this mode requires an aggregation.
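As a toy illustration (plain Scala, not the Spark API), suppose the streaming result table grows across two triggers; append emits only the new rows, while complete re-emits everything. The row values are invented:

```scala
// Rows in the result "table" after each trigger (invented data).
val afterTrigger1 = Seq("row1", "row2")
val afterTrigger2 = Seq("row1", "row2", "row3")

// Append mode: emit only rows added since the last trigger.
val appendOutput = afterTrigger2.diff(afterTrigger1)

// Complete mode: emit the entire result table on every trigger.
val completeOutput = afterTrigger2

println(s"append emits:   ${appendOutput.mkString(", ")}")
println(s"complete emits: ${completeOutput.mkString(", ")}")
```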

final_df.writeStream
  .format("console")
  .outputMode("append")
  .start()            // Start the computation
  .awaitTermination()

Refer to the image below for the output. While reading, we specified that only two files be processed per trigger, and there are five files in the source directory, so we get three batches.

bigdata_04.PNG

Conclusion

You have learned how to read a stream of JSON files from a directory using a Scala example. Spark Structured Streaming uses readStream to read and writeStream to write a streaming DataFrame/Dataset, and you have seen the difference between the complete and append output modes.

