Explain Window Aggregate and Analytic Functions in Spark SQL

Recipe Objective: Explain Window Aggregate and Analytic Functions in Spark SQL

Window functions operate on a group of rows, referred to as a window, and calculate a return value for each row based on that group. They are helpful for tasks such as calculating a moving average, computing a cumulative statistic, or accessing the value of rows relative to the position of the current row. Here we focus on the aggregate functions max, min, avg, sum, and count, and on the analytical functions cumulative distribution, lag, and lead. These operations are carried out over the rows within a window, where a window is a group of rows grouped by the values of a specific column or columns.

Learn Spark SQL for Relational Big Data Processing

Implementation Info:

  1. Databricks Community Edition
  2. Spark-Scala
  3. storage - Databricks File System(DBFS)

The planned learning flow is as below:

  1. Create a test DataFrame
  2. Aggregate Functions (max, min, avg, sum, count)
  3. Cumulative distribution Function
  4. Lag Function
  5. Lead Function

1. Create a test DataFrame

Here, we create salary_df with the columns "dept", "emp_id", and "salary". This DataFrame is used to group rows department-wise and apply aggregate functions to find the max, min, avg, and sum of salaries in each department, as well as the count of employees in each department.

println("creating a DataFrame")
val salary_df = Seq(
    ("sales", 1, 50000), ("personnel", 2, 39000),
    ("sales", 3, 48000), ("sales", 4, 48000),
    ("personnel", 5, 35000), ("personnel", 12, 36000),
    ("develop", 7, 42000), ("develop", 8, 60000),
    ("develop", 9, 45000), ("develop", 10, 52000),
    ("develop", 11, 52000), ("engineering", 13, 42000),
    ("engineering", 15, 60000), ("engineering", 21, 45000))
  .toDF("dept", "emp_id", "salary")
salary_df.show()

[Output: bigdata_1.PNG]

2. Aggregate Functions (max, min, avg, sum, count)

The window is created by doing partitionBy on "dept". Hence, the group of records belonging to a particular department acts as a window partition. Here I apply the aggregate functions max, min, avg, sum, and count to calculate the respective values over each partition.

//Aggregate functions
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

println("Aggregate Functions max, min, avg, sum, count")
val window = Window.partitionBy("dept")
val aggregate_df = salary_df
  .withColumn("max_salary_dept", max("salary").over(window))
  .withColumn("min_salary_dept", min("salary").over(window))
  .withColumn("avg_salary_dept", avg("salary").over(window))
  .withColumn("sum_salary_dept", sum("salary").over(window))
  .withColumn("num_employee_dept", count("emp_id").over(window))
aggregate_df.show()
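To make the per-partition semantics concrete, here is a minimal plain-Scala sketch (no Spark involved; the collection mimics a subset of salary_df) showing that, unlike a groupBy aggregation, every row keeps its identity and additionally carries the aggregates of its own department:

```scala
// Plain-Scala illustration of aggregate-over-window semantics (not Spark):
// each row is kept and annotated with its department's aggregates.
val rows = Seq(("sales", 1, 50000), ("sales", 3, 48000), ("sales", 4, 48000),
               ("personnel", 2, 39000), ("personnel", 5, 35000))
val byDept = rows.groupBy { case (dept, _, _) => dept }   // the "window partitions"
val withAggs = rows.map { case (dept, id, sal) =>
  val sals = byDept(dept).map(_._3)
  // (dept, emp_id, salary, max, min, avg, sum, count) for the department
  (dept, id, sal, sals.max, sals.min, sals.sum.toDouble / sals.size, sals.sum, sals.size)
}
withAggs.foreach(println)
```

Every "sales" row ends up with the same max (50000), min (48000), sum, and count, which is exactly what the Spark output shows for the window version.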

[Output: bigdata_2.PNG]

3. Cumulative distribution Function

This function returns the cumulative distribution of values within a window partition. For a given row, let N be the number of rows in the partition whose value is less than or equal to the current row's value, and let M be the total number of rows in the partition; then cume_dist = N / M. In the given example, the develop department's lowest salary is 42000 and the develop window contains 5 rows, so the cume_dist of emp_id 7 is 1/5 = 0.2.

//Cumulative distribution function
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

println("Cumulative distribution function")
val window = Window.partitionBy("dept").orderBy("salary")
val cume_dist_df = salary_df.withColumn("cume_dist_val", cume_dist().over(window))
cume_dist_df.show()
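The N/M formula can be checked by hand. The following plain-Scala sketch (illustration only, not the Spark API) computes it for the develop department's five salaries from salary_df:

```scala
// Hand-computing cume_dist for one partition: N / M, where N = number of rows
// with salary <= current salary and M = total rows in the partition.
val developSalaries = Seq(42000, 60000, 45000, 52000, 52000)  // develop dept
val m = developSalaries.size.toDouble
val cumeDist = developSalaries.map { s =>
  val n = developSalaries.count(_ <= s)
  s -> n / m
}
cumeDist.foreach(println)
```

The lowest salary, 42000 (emp_id 7), yields 1/5 = 0.2, and the two ties at 52000 both yield 4/5 = 0.8, matching how cume_dist handles duplicates.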

[Output: bigdata_3.PNG]

4. Lag Function

This function returns the value from the row that is offset rows before the current row. The lag function takes three arguments, lag(col, offset = 1, default = None): col is the column the function is applied to, offset is how many rows to look back, and default is the value returned when there is no preceding row. Below, I apply an offset of 1 to create a new column lag_salary, then use it to generate a trend column that compares each row's salary with the preceding row's salary within the department: if the preceding salary is higher than the current one, the trend is "HIGH"; if it is lower, "LOW"; if they are equal, "SAME"; and the first row of each partition, which has no preceding row, is marked "REFERENCE".

//Lag function
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

println("Lag function")
val window = Window.partitionBy("dept").orderBy("emp_id")
var lag_df = salary_df.withColumn("lag_salary", lag("salary", 1).over(window))
//Order employees by emp_id within each department and compare each salary
//with the immediately preceding employee's salary
lag_df = lag_df.withColumn("trend_col",
  when(col("lag_salary") > col("salary"), lit("HIGH"))
  .when(col("lag_salary") < col("salary"), lit("LOW"))
  .when(col("lag_salary") === col("salary"), lit("SAME"))
  .when(col("lag_salary").isNull, lit("REFERENCE")))
lag_df.show()
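The shift-by-one behaviour of lag can be sketched in plain Scala (illustration only, not the Spark API), using the develop department's salaries from salary_df ordered by emp_id:

```scala
// Plain-Scala sketch of lag(salary, 1) over one ordered partition:
// pair each salary with the previous row's salary; the first row has none.
val salaries = Seq(42000, 60000, 45000, 52000, 52000)  // develop, ordered by emp_id
val lagged = (None +: salaries.init.map(Some(_))).zip(salaries)
val trend = lagged.map {
  case (None, _)                       => "REFERENCE"  // no preceding row
  case (Some(prev), cur) if prev > cur => "HIGH"
  case (Some(prev), cur) if prev < cur => "LOW"
  case _                               => "SAME"
}
println(trend)
```

This reproduces the trend_col values for the develop partition: the first row is "REFERENCE", and each later row is labelled by comparing its salary with the one before it.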

[Output: bigdata_4.PNG]

5. Lead Function

This function returns the value from the row that is offset rows after the current row. The lead function takes three arguments, lead(col, offset = 1, default = None): col is the column the function is applied to, offset is how many rows to look forward, and default is the value returned when there is no following row. Below, I apply an offset of 1 to create a new column lead_salary, then generate a trend column that compares each row's salary with the next row's salary within the department: if the current salary is higher than the next one, the trend is "HIGH"; if it is lower, "LOW"; if they are equal, "SAME"; and the last row of each partition, which has no following row, is marked "REFERENCE".

//Window analytical functions
//Lead function
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

println("Lead function")
val window = Window.partitionBy("dept").orderBy("emp_id")
var lead_df = salary_df.withColumn("lead_salary", lead("salary", 1).over(window))
//Order employees by emp_id within each department and compare each salary
//with the immediately following employee's salary
lead_df = lead_df.withColumn("trend_col",
  when(col("lead_salary") < col("salary"), lit("HIGH"))
  .when(col("lead_salary") > col("salary"), lit("LOW"))
  .when(col("lead_salary") === col("salary"), lit("SAME"))
  .when(col("lead_salary").isNull, lit("REFERENCE")))
lead_df.show()
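Symmetrically to lag, lead's behaviour can be sketched in plain Scala (illustration only, not the Spark API), again using the develop department's salaries ordered by emp_id:

```scala
// Plain-Scala sketch of lead(salary, 1) over one ordered partition:
// pair each salary with the next row's salary; the last row has none.
val salaries = Seq(42000, 60000, 45000, 52000, 52000)  // develop, ordered by emp_id
val led = salaries.zip(salaries.tail.map(Some(_)) :+ None)
val trend = led.map {
  case (_, None)                       => "REFERENCE"  // no following row
  case (cur, Some(next)) if next < cur => "HIGH"
  case (cur, Some(next)) if next > cur => "LOW"
  case _                               => "SAME"
}
println(trend)
```

Here the last row of the partition becomes "REFERENCE", and each earlier row is labelled by comparing its salary with the one after it.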

[Output: bigdata_5.PNG]

Conclusion

Thus we learned about the aggregate functions max, min, avg, sum, and count, and the analytical functions cumulative distribution, lag, and lead, all of which are carried out over the rows within a window (a group of rows grouped by the values of a specific column or columns).

