Explain the Joins functions in PySpark in Databricks


Recipe Objective - Explain the join functions in PySpark in Databricks

In PySpark, join() is widely used to combine two DataFrames, and because it returns a DataFrame, multiple DataFrames can be combined easily by chaining join calls. PySpark supports all the basic join types available in traditional SQL: INNER, LEFT OUTER, RIGHT OUTER, LEFT ANTI, LEFT SEMI, CROSS, and SELF JOIN. Joins are wide transformations that involve shuffling data across the network. PySpark SQL joins come with optimizations by default, but there are still performance considerations to keep in mind when using them.

The join() operation takes several parameters and returns a DataFrame. The "other" parameter defines the right side of the join. The "on" parameter defines the join column name (or a join expression). The "how" parameter defaults to "inner" and must be one of inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_anti, or left_semi.

Resilient Distributed Datasets (RDDs) are the fundamental data structure of Apache Spark, developed by The Apache Software Foundation. An RDD is an immutable distributed collection of objects; each dataset is divided into logical partitions, which may be computed on different nodes of the cluster. The RDD concept was introduced in 2011. The Dataset is a strongly typed data structure in Spark SQL that maps to a relational schema. It represents structured queries with encoders and is an extension of the DataFrame API, providing both type safety and an object-oriented programming interface. The Dataset concept was introduced in 2015.


System Requirements

  • Python (3.0 version)
  • Apache Spark (3.1.1 version)

This recipe explains what joins are and demonstrates their usage in PySpark.

Implementing the Joins in Databricks in PySpark

# Importing packages
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

SparkSession and col are imported into the environment to use the join functions in PySpark.

# Implementing the Joins in Databricks in PySpark
spark = SparkSession.builder.appName('Joins PySpark').getOrCreate()
Emp = [(1,"Ram",-2,"2019","11","M",2000), \
(2,"Pooja",2,"2011","30","F",5000), \
(3,"Shyam",1,"2011","20","M",2000), \
(4,"Madhavi",2,"2006","20","F",3000), \
(5,"Brown",2,"2011","30","",-2), \
(6,"Brown",2,"2011","40","",-2) \
]
Emp_Columns = ["emp_id","name","superior_emp_id","year_joined", \
"emp_dept_id","gender","salary"]
Emp_Dataframe = spark.createDataFrame(data = Emp, schema = Emp_Columns)
Emp_Dataframe.printSchema()
Emp_Dataframe.show(truncate=False)
Dept = [("Marketing",10), \
("Finance",20), \
("IT",30), \
("Sales",40) \
]
Dept_Columns = ["dept_name","dept_id"]
Dept_Dataframe = spark.createDataFrame(data = Dept, schema = Dept_Columns)
Dept_Dataframe.printSchema()
Dept_Dataframe.show(truncate=False)
# Using inner join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"inner") \
.show(truncate=False)
# Using outer join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"outer") \
.show(truncate=False)
# Using full join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"full") \
.show(truncate=False)
# Using full outer join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"fullouter") \
.show(truncate=False)
# Using left join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"left") \
.show(truncate=False)
# Using left outer join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"leftouter") \
.show(truncate=False)
# Using right join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"right") \
.show(truncate=False)
# Using right outer join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"rightouter") \
.show(truncate=False)
# Using left semi join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"leftsemi") \
.show(truncate=False)
# Using left anti join
Emp_Dataframe.join(Dept_Dataframe, Emp_Dataframe.emp_dept_id == Dept_Dataframe.dept_id,"leftanti") \
.show(truncate=False)
# Using self join
Emp_Dataframe.alias("emp1").join(Emp_Dataframe.alias("emp2"), \
col("emp1.superior_emp_id") == col("emp2.emp_id"),"inner") \
.select(col("emp1.emp_id"),col("emp1.name"), \
col("emp2.emp_id").alias("superior_emp_id"), \
col("emp2.name").alias("superior_emp_name")) \
.show(truncate=False)

The Spark session is created first. "Emp" and "Emp_Columns" are defined, and "Emp_Dataframe" is created from them; similarly, "Dept" and "Dept_Columns" are defined and used to create "Dept_Dataframe". Both DataFrames are printed using the show() function.

The inner join is performed on the two datasets on the key columns: rows whose keys don't match are dropped from both datasets, so with this data the inner join drops the "emp" record with emp_dept_id "11" and the "dept" record with dept_id 10.

The outer, full, and full outer joins return all rows from both sides: the "emp" record with emp_dept_id "11" has no match in "dept", so its dept columns are null, and dept_id 10 has no record in "emp", so its emp columns are null.

The left and left outer joins keep every row from "emp": the record with emp_dept_id "11" has no match in "dept", so it contains null in the dept columns (dept_name and dept_id), while dept_id 10 from "dept" is dropped from the result.

The right and right outer joins keep every row from "dept": dept_id 10 has no match in "emp", so that record contains null in the emp columns, while emp_dept_id "11" is dropped because no match was found on the left.

The left semi join is similar to the inner join, the difference being that it returns all the columns from the left dataset and none from the right. The left anti join is the exact opposite of the left semi: it returns the left dataset's columns only for the non-matched records. The self join is built using the other join types: the emp dataset is joined with itself to find the superior's emp_id and name for every employee.

