Explain the Joins functions in PySpark in Databricks

This recipe explains the join functions in PySpark in Databricks.

Recipe Objective - Explain the Joins functions in PySpark in Databricks

In PySpark, join() is widely used to combine two DataFrames, and multiple DataFrames can be joined by chaining these calls. PySpark supports all the basic join types available in traditional SQL: INNER, LEFT OUTER, RIGHT OUTER, LEFT ANTI, LEFT SEMI, SELF JOIN, and CROSS. PySpark joins are wide transformations that involve shuffling data across the network. PySpark SQL joins come with optimizations by default; however, there are still some performance issues to consider when using them.

The join() operation takes several parameters as input and returns a DataFrame. The "other" parameter defines the right side of the join. The "on" parameter defines a string for the join column name or a join expression. The "how" parameter defaults to inner and must be one of inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_anti, and left_semi.

Resilient Distributed Datasets, or RDDs, are the fundamental data structure of Apache Spark, developed by The Apache Software Foundation. An RDD is an immutable distributed collection of objects; each dataset is divided into logical partitions, which may be computed on different nodes of the cluster. The RDD concept was introduced in 2011.

A Dataset is a data structure in Spark SQL that is strongly typed and maps to a relational schema. It represents structured queries with encoders and is an extension of the DataFrame API. Spark Datasets provide both type safety and an object-oriented programming interface. The Dataset concept was introduced in 2015.


System Requirements

  • Python (3.0 version)
  • Apache Spark (3.1.1 version)

This recipe explains what joins are and demonstrates their usage in PySpark.

Implementing the Joins in Databricks in PySpark

# Importing packages
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

The SparkSession and col are imported into the environment to use the join functions in PySpark.

# Implementing the Joins in Databricks in PySpark
spark = SparkSession.builder.appName('Joins PySpark').getOrCreate()
Emp = [(1,"Ram",-2,"2019","11","M",2000), \
(2,"Pooja",2,"2011","30","F",5000), \
(3,"Shyam",1,"2011","20","M",2000), \
(4,"Madhavi",2,"2006","20","F",3000), \
(5,"Brown",2,"2011","30","",-2), \
(6,"Brown",2,"2011","40","",-2) \
]
Emp_Columns = ["emp_id","name","superior_emp_id","year_joined", \
"emp_dept_id","gender","salary"]
Emp_Dataframe = spark.createDataFrame(data = Emp, schema = Emp_Columns)
Emp_Dataframe.printSchema()
Emp_Dataframe.show(truncate=False)
Dept = [("Marketing",10), \
("Finance",20), \
("IT",30), \
("Sales",40) \
]
Dept_Columns = ["dept_name","dept_id"]
Dept_Dataframe = spark.createDataFrame(data = Dept, schema = Dept_Columns)
Dept_Dataframe.printSchema()
Dept_Dataframe.show(truncate=False)
# Using inner join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"inner") \
.show(truncate=False)
# Using outer join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"outer") \
.show(truncate=False)
# Using full join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"full") \
.show(truncate=False)
# Using full outer join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"fullouter") \
.show(truncate=False)
# Using left join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"left") \
.show(truncate=False)
# Using left outer join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"leftouter") \
.show(truncate=False)
# Using right join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"right") \
.show(truncate=False)
# Using right outer join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"rightouter") \
.show(truncate=False)
# Using left semi join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"leftsemi") \
.show(truncate=False)
# Using left anti join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"leftanti") \
.show(truncate=False)
# Using self join
Emp_Dataframe.alias("emp1").join(Emp_Dataframe.alias("emp2"), \
col("emp1.superior_emp_id") == col("emp2.emp_id"),"inner") \
.select(col("emp1.emp_id"),col("emp1.name"), \
col("emp2.emp_id").alias("superior_emp_id"), \
col("emp2.name").alias("superior_emp_name")) \
.show(truncate=False)

The Spark session is defined. The "Emp" and "Emp_Columns" lists are defined, and the "Emp_Dataframe" is created from them. Similarly, "Dept" and "Dept_Columns" are defined, and the "Dept_Dataframe" is created from them. Both DataFrames are printed using the show() function.

The inner join is performed on the two datasets on the key columns, and rows whose keys don't match are dropped from both datasets: the inner join drops "emp_dept_id" 11 from the "emp" dataset and "dept_id" 10 from the "dept" dataset.

The outer, full, and full outer joins return all rows from both sides. The "emp" dataset's "emp_dept_id" value 11 doesn't have a matching record in "dept", so the dept columns are null for that row, and "dept_id" 10 doesn't have a matching record in "emp", so the emp columns are null for that row.

The left and left outer joins keep every row from the left dataset: "emp_dept_id" 11 doesn't have a record in the "dept" dataset, so that row contains null in the "dept" columns (dept_name & dept_id), while "dept_id" 10 from the "dept" dataset is dropped from the results.

The right and right outer joins keep every row from the right dataset: "dept_id" 10 doesn't have a match in the left "emp" dataset, so that row contains null in the "emp" columns, and "emp_dept_id" 11 is dropped because no match was found in "dept".

The left semi join is similar to the inner join, the difference being that it returns only the columns from the left dataset and ignores all the columns from the right dataset. The left anti join works as the exact opposite of the left semi join and returns only the left dataset's columns for the non-matched records.

The self join is built using the other join types: the emp dataset is joined with itself, via aliases, to find the superior's emp_id and name for every employee.

