PySpark DataFrame Comparison: A Must-Know Skill for Data Scientists & Engineers
How to create a custom .equals() method in Pyspark for comparing DataFrames
Introduction
Checking for equality is commonly used for situations such as unit testing. In unit testing the tester verify that individual units or components of the code (such as functions or methods) produce the expected output for given inputs. When the code involves data processing, checking for equality of two data objects becomes a fundamental part of unit testing. When you have a function that processes data and produces an output, you can use unit tests to compare the function’s output with the expected output.
If you are a Pandas user, you might be familiar with the .equals() method. Pandas’ .equals() method allows users to test the output and expected Pandas DataFrames or Series for equality. In essence it is testing if the objects contain the same elements, in the same order and have the same shape. Here’s an example of how .equals() is used in a unit test.
import pandas as pd
# Example function to be tested
def add_one_to_column(df, column_name):
df[column_name] = df[column_name] + 1
return df
# Define a function for testing
def test_add_one_to_column():
# Input DataFrame
input_df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
# Expected DataFrame after applying the function
expected_df = pd.DataFrame({'A': [2, 3, 4], 'B': [4, 5, 6]})
# Apply the function
result_df = add_one_to_column(input_df.copy(), 'A')
# Check if the result matches the expected DataFrame
if result_df.equals(expected_df):
print("Test passed: The output matches the expected result.")
else:
print("Test failed: The output does not match the expected result.")
# Run the test function
test_add_one_to_column()While Python’s Pandas library is excellent for working with small to medium-sized datasets, PySpark, an open-source framework for distributed data processing and analysis, is useful when handling massive datasets that exceed the memory capacity of a single machine. However, PySpark lacks a built-in function for comparing PySpark DataFrames equality, limiting its ability to ensure data integrity and consistency. To address this gap, this article will demonstrate how to create a custom .equals() function for PySpark DataFrames. This will enable data scientists and engineers to validate and compare PySpark DataFrames.
Custom Equality Function
The custom .equals() function checks for equality by comparing the following:
- Column names
- Column data types
- Data values
Compare Column Names
This function check if the column names in two DataFrames are identical.
def compare_column_names(df1, df2):
"""
Compare the column names of two PySpark DataFrames.
Args:
df1 (DataFrame): First DataFrame for comparison.
df2 (DataFrame): Second DataFrame for comparison.
Returns:
bool: True if the column names are identical, False otherwise.
"""
return df1.columns == df2.columnsHere is what’s going on:
df.columns: Retrieves the list of column names in each of the DataFrame==: Compares if the two list of column names are identical. It will returnTrueif the list are identical andFalseif they are not. This comparison also takes into account of the column ordering.
Let’s create a positive and negative test case for the compare_column_names function.
# Positive Test Case
data1 = [(1, "Alice"), (2, "Bob")]
data2 = [(3, "Carol"), (4, "David")]
df1 = spark.createDataFrame(data1, ["ID", "Name"])
df2 = spark.createDataFrame(data2, ["ID", "Name"])
result_positive = compare_column_names(df1, df2)
print("Positive Test Case Result:", result_positive)
# Negative Test Case
data3 = [(1, "Alice", 25), (2, "Bob", 30)]
df3 = spark.createDataFrame(data3, ["ID", "Name", "Age"])
result_negative = compare_column_names(df1, df3)
print("Negative Test Case Result:", result_negative)If the function works as expected, the positive test case will print True and negative test case will print False.
Compare Data Type
This function checks if the data types for the corresponding column names are identical.
def compare_column_data_types(df1, df2):
"""
Compare column data types between two PySpark DataFrames.
Args:
df1 (DataFrame): The first DataFrame for comparison.
df2 (DataFrame): The second DataFrame for comparison.
Returns:
bool: True if data types for corresponding column names are identical, False otherwise.
"""
# Get column names and data types for both DataFrames
schema1 = df1.dtypes
schema2 = df2.dtypes
# Check if data types are identical
return schema1 == schema2
Here is what’s going on:
df.dtypes: Retrieves the list of (column name, data type) type from each of the DataFrame==: Compares if the two list of column names and data types are identical. It will returnTrueif the list are identical andFalseif they are not. This comparison also takes into account of the column ordering.
Here’s a positive and negative test case for the compare_column_data_types function.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Positive Test Case
schema1 = StructType([
StructField("ID", IntegerType(), True),
StructField("Name", StringType(), True)])
schema2 = StructType([
StructField("ID", IntegerType(), True),
StructField("Name", StringType(), True)])
df1 = spark.createDataFrame([], schema1)
df2 = spark.createDataFrame([], schema2)
result_positive = compare_column_data_types(df1, df2)
print("Positive Test Case Result:", result_positive)
# Negative Test Case
schema3 = StructType([
StructField("ID", IntegerType(), True),
StructField("Age", IntegerType(), True)])
df3 = spark.createDataFrame([], schema3)
result_negative = compare_column_data_types(df1, df3)
print("Negative Test Case Result:", result_negative)If the function works as expected, the positive test case will print True and negative test case will print False.
Compare Data Values
This function compares if the values of the element in the DataFrames are identical.
def compare_dataframe_values(df1, df2):
"""
Compare the values of elements in two PySpark DataFrames.
Args:
df1 (DataFrame): The first DataFrame for comparison.
df2 (DataFrame): The second DataFrame for comparison.
Returns:
bool: True if the values of elements in the DataFrames are identical, False otherwise.
"""
# Collect data from both DataFrames as lists of rows
data1 = df1.collect()
data2 = df2.collect()
# Compare the collected data
return data1 == data2
Here’s what’s going on:
df.collect(): collects the into a list of rows.[Row(ID=1, Name='Alice'), Row(ID=2, Name='Bob')]==: Compares if the values in the two list are identical. It is worth noting that the ordering of elements in the list matters and the column names e.g.IDandNameare not compared.
from pyspark.sql import Row
# Positive Test Case
data1 = [Row(ID=1, Name="Alice"), Row(ID=2, Name="Bob")]
data2 = [Row(ID=1, Name="Alice"), Row(ID=2, Name="Bob")]
df1 = spark.createDataFrame(data1)
df2 = spark.createDataFrame(data2)
result_positive = compare_dataframe_values(df1, df2)
print("Positive Test Case Result:", result_positive) # Should print True
# Negative Test Case
data3 = [Row(ID=1, Name="Alice"), Row(ID=3, Name="Carol")]
df3 = spark.createDataFrame(data3)
# Check if values are identical
result_negative = compare_dataframe_values(df1, df3)
print("Negative Test Case Result:", result_negative)If the function works as expected, the positive test case will print True and negative test case will print False.
Putting it all together
Let’s put all of them together in a single .equals() function.
def equals(df1, df2):
"""
Compare colum names, data types and values of two PySpark DataFrames.
Args:
df1 (DataFrame): First DataFrame for comparison.
df2 (DataFrame): Second DataFrame for comparison.
Returns:
bool: True if both DataFrames are identical in terms of
column names, column data types, and data values; False otherwise.
"""
return (
compare_column_names(df1, df2) and
compare_column_data_types(df1, df2) and
compare_dataframe_values(df1, df2))
# monkey patch
pyspark.sql.DataFrame.equals = equalsLet’s test it out!
# Positive Test Case
data1 = [(1, "Alice"), (2, "Bob")]
data2 = [(1, "Alice"), (2, "Bob")]
df1 = spark.createDataFrame(data1, ["ID", "Name"])
df2 = spark.createDataFrame(data2, ["ID", "Name"])
result_positive = df1.equals(df2)
print("Positive Test Case Result:", result_positive) # output: True
# Negative Test Case
data3 = [(1, "Alice"), (3, "Carol")]
df3 = spark.createDataFrame(data3, ["ID", "Name"])
result_negative = df1.equals(df3)
print("Negative Test Case Result:", result_negative) #output: FalseThe positive test case outputs True and negative test case outputs False.
We can also use .equals() in an assert statement.
assert df1.equals(df3), 'Dataframe not equal'
# output: AssertionError: Dataframe not equalConclusion
In this article, we explored the scenarios where an equality function is useful when working with PySpark DataFrames. We raised the need for a tool that checks for equality of column names, data types, and the actual data elements between two PySpark DataFrames. To fulfil this need, we crafted a custom equals() function that empowers data engineers, data scientists, and developers to verify the integrity of their PySpark DataFrames.
In Plain English
Thank you for being a part of our community! Before you go:
- Be sure to clap and follow the writer! 👏
- You can find even more content at PlainEnglish.io 🚀
- Sign up for our free weekly newsletter. 🗞️
- Follow us on Twitter(X), LinkedIn, YouTube, and Discord.





