Explain collect_set and collect_list aggregate functions in PySpark

The following tutorial explains how PySpark's collect_set() and collect_list() aggregate functions work in Databricks and describes the difference between the two functions.

Recipe Objective - Explain the collect_set() and collect_list() aggregate functions in PySpark in Databricks

Aggregate functions in Apache PySpark accept input as a Column type or a column name as a string, take several other arguments depending on the function, and return a Column type. Aggregate functions operate on a group of rows and calculate a single return value for every group. The PySpark SQL aggregate functions are grouped as "agg_funcs" in PySpark. The collect_set() function returns all values from the input column with duplicate values eliminated. The collect_list() function returns all values from the input column, including duplicates.
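As a minimal sketch of that difference (using a hypothetical single-column DataFrame, not the recipe dataset below):

# Hypothetical one-column DataFrame to contrast the two functions
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_set, collect_list
spark = SparkSession.builder.appName('collect_set vs collect_list').getOrCreate()
demo_df = spark.createDataFrame([(1,), (2,), (2,), (3,)], ["value"])
demo_df.select(collect_list("value")).show(truncate=False)  # [1, 2, 2, 3] - duplicates kept
demo_df.select(collect_set("value")).show(truncate=False)   # e.g. [1, 2, 3] - duplicates removed, ordering not guaranteed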

System Requirements

  • Python 3.0
  • Apache Spark 3.1.1

This recipe explains what the collect_set() and collect_list() functions are and how to use them in PySpark.

Implementing the collect_set() and collect_list() functions in Databricks in PySpark

# Importing packages
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_set, collect_list

The SparkSession, collect_set and collect_list packages are imported into the environment in order to perform the collect_set() and collect_list() functions in PySpark.


# Implementing the collect_set() and collect_list() functions in Databricks in PySpark
spark = SparkSession.builder.appName('PySpark collect_set() and collect_list()').getOrCreate()
Sample_Data = [("Rahul", "Technology", 8000),
("Prateek", "Finance", 7600),
("Ram", "Sales", 5100),
("Reetu", "Marketing", 4000),
("Himesh", "Sales", 2000),
("Shyam", "Finance", 3500),
("Harsh", "Finance", 4900),
("Ramesh", "Marketing", 4000),
("Raina", "Marketing", 3000),
("Ankit", "Sales", 5100)
]
Sample_schema = ["employee_name", "department", "salary"]
dataframe = spark.createDataFrame(data = Sample_Data, schema = Sample_schema)
dataframe.printSchema()
dataframe.show(truncate=False)
# Using collect_set() function
dataframe.select(collect_set("salary")).show(truncate=False)
# Using collect_list() function
dataframe.select(collect_list("salary")).show(truncate=False)


The "dataframe" value is created in which the Sample_data and Sample_schema are defined. The collect_set() function returns all values from the present salary column with the duplicate values eliminated. The last() function returns all the current salary column values with the duplicates.

