Data aggregation on a DataFrame grouped on multiple keys in PySpark

This recipe helps you perform data aggregation on a DataFrame grouped on multiple keys in PySpark. PySpark is the Python interface to Apache Spark, and it supports most of Spark's features, such as Spark SQL and DataFrames.

Recipe Objective: How to perform data aggregation on a DataFrame grouped on multiple keys in PySpark?

Data merging and data aggregation are essential parts of the day-to-day activities of most big data platforms. In this scenario, we will perform data aggregation on a DataFrame grouped on multiple keys.


System requirements :

  • Install Ubuntu in a virtual machine
  • Set up a single-node Hadoop machine
  • Install PySpark or Spark on Ubuntu
  • The code below can be run in a Jupyter Notebook or any Python console.

Step 1: Prepare a Dataset

Here we use a comma-separated values (CSV) dataset of sales records, which we read into a Jupyter Notebook from the local filesystem. Download the dataset used in this scenario into your local Downloads folder.

A preview of the dataset:

[Screenshot: bigdata_1.jpg]

Step 2: Import the modules

In this scenario, we import the PySpark and PySpark SQL modules and create a Spark session, as shown below:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col, array_contains

Create a Spark session as below:

spark = SparkSession.builder.appName('Aggregation on multi cols').getOrCreate()

Read the CSV file as shown below:

Salesdf = spark.read.csv("/home/bigdata/Downloads/salesrecords.csv")
Salesdf.printSchema()

The output of the above lines:

[Screenshot: bigdata_2.jpg]
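Note that when no schema is supplied, spark.read.csv reads every column as a string. As an alternative to hand-writing a schema, Spark can also infer the column types by sampling the file; a minimal sketch (the extra pass over the data makes this slower than supplying an explicit schema):

# Let Spark infer column types by scanning the file
inferredDf = spark.read.csv(
    "/home/bigdata/Downloads/salesrecords.csv",
    header=True,       # treat the first row as column names
    inferSchema=True   # sample the data to guess each column's type
)
inferredDf.printSchema()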

Step 3: Create a schema

Here we create a StructField for each column and combine them into a schema for the full DataFrame using a StructType object, as in the code below:

schema = StructType() \
    .add("Region", StringType(), True) \
    .add("Country", StringType(), True) \
    .add("Item Type", StringType(), True) \
    .add("Sales Channel", StringType(), True) \
    .add("Order Priority", StringType(), True) \
    .add("Order ID", IntegerType(), True) \
    .add("Units Sold", IntegerType(), True) \
    .add("Unit Price", DoubleType(), True) \
    .add("Unit Cost", DoubleType(), True) \
    .add("Total Revenue", DoubleType(), True) \
    .add("Total Cost", DoubleType(), True) \
    .add("Total Profit", DoubleType(), True)
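The same schema can also be written as an explicit list of StructField objects, which some readers find easier to scan; a minimal, equivalent sketch:

# Equivalent schema built from an explicit list of StructFields
schema = StructType([
    StructField("Region", StringType(), True),
    StructField("Country", StringType(), True),
    StructField("Item Type", StringType(), True),
    StructField("Sales Channel", StringType(), True),
    StructField("Order Priority", StringType(), True),
    StructField("Order ID", IntegerType(), True),
    StructField("Units Sold", IntegerType(), True),
    StructField("Unit Price", DoubleType(), True),
    StructField("Unit Cost", DoubleType(), True),
    StructField("Total Revenue", DoubleType(), True),
    StructField("Total Cost", DoubleType(), True),
    StructField("Total Profit", DoubleType(), True),
])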

Step 4: Read CSV file

Here we read the CSV file from the local path where we downloaded it, applying the schema created above, as in the code below:

SalesSchemaDf = spark.read.format("csv") \
    .option("header", True) \
    .schema(schema) \
    .load("/home/bigdata/Downloads/salesrecords.csv")
SalesSchemaDf.printSchema()

The output of the above lines:

[Screenshot: bigdata_3.jpg]

To show the top four rows of the DataFrame:

SalesSchemaDf.show(4)

The output of the above line:

[Screenshot: bigdata_4.jpg]
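Before aggregating, the imported col() helper can be used to filter rows or project columns. A minimal sketch, assuming "Europe" appears as a value in the Region column (a hypothetical filter value for illustration):

from pyspark.sql.functions import col

# Keep one region and a few columns ("Europe" is an assumed value)
europeSales = SalesSchemaDf.filter(col("Region") == "Europe") \
    .select("Country", "Item Type", "Total Profit")
europeSales.show(4)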

Step 5: Perform data aggregation on multiple key columns

We group the data on the Region and Country columns using the groupby() function. After that, we apply the mean() function to each numeric column.

Calculate the mean values grouped by Region and Country, as shown below:

groupOnRegionCountry = SalesSchemaDf.groupby(["Region", "Country"]).mean()
groupOnRegionCountry.show(5)

The output of the above lines:

[Screenshot: bigdata_5.jpg]
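Calling mean() with no arguments averages every numeric column, which can be noisy. To aggregate only the columns of interest and name the results, groupby() can be combined with agg(); a minimal sketch using the built-in avg and sum functions:

from pyspark.sql.functions import avg, sum as _sum

# Average unit price and total units sold per (Region, Country) pair
regionCountryStats = SalesSchemaDf.groupby("Region", "Country").agg(
    avg("Unit Price").alias("avg_unit_price"),
    _sum("Units Sold").alias("total_units_sold")
)
regionCountryStats.show(5)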

To calculate the sum of Total Profit per country using the sum() function together with groupby():

sumofTotalprofit = SalesSchemaDf.groupby("Country").sum("Total Profit")
sumofTotalprofit.show()

The output of the above lines:

[Screenshot: bigdata_6.jpg]
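To see the most profitable countries first, the grouped sum can be renamed with alias() and sorted in descending order; a minimal sketch:

from pyspark.sql.functions import sum as _sum, desc

# Total profit per country, highest first
topProfit = SalesSchemaDf.groupby("Country") \
    .agg(_sum("Total Profit").alias("sum_total_profit")) \
    .orderBy(desc("sum_total_profit"))
topProfit.show(10)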

Here we find the count of distinct (Region, Country, Item Type) combinations, as shown below:

from pyspark.sql.functions import countDistinct
dist_counts = SalesSchemaDf.select(countDistinct("Region", "Country", "Item Type"))
dist_counts.show(5)

The output of the above code:

[Screenshot: bigdata_7.jpg]
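countDistinct() inside select() returns one global count of distinct (Region, Country, Item Type) combinations. To compute distinct counts per group instead, it can also be used inside agg(); a minimal sketch counting the distinct item types sold in each region:

from pyspark.sql.functions import countDistinct

# Number of distinct item types sold per region
distinctItemsPerRegion = SalesSchemaDf.groupby("Region").agg(
    countDistinct("Item Type").alias("distinct_item_types")
)
distinctItemsPerRegion.show()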

Conclusion

Here we learned to perform data aggregation on a DataFrame grouped on multiple keys in PySpark.

