Edwin Tan

Summary

The article provides a guide on creating a custom .equals() method for comparing PySpark DataFrames, which is essential for data integrity checks in large-scale data processing tasks.

Abstract

The article titled "PySpark DataFrame Comparison: A Must-Know Skill for Data Scientists & Engineers" addresses the lack of a built-in equality function in PySpark by demonstrating how to implement a custom .equals() method. This method is crucial for unit testing and ensuring the accuracy of data processing outputs, especially when dealing with large datasets that don't fit into the memory of a single machine. The custom function checks for equality in column names, data types, and data values between two DataFrames. The article includes code examples and test cases to illustrate the implementation and usage of the custom equality function, emphasizing the importance of data integrity and consistency in distributed data processing frameworks like PySpark.

Opinions

  • The author emphasizes the necessity of equality checks in data processing, particularly for unit testing.
  • Pandas' .equals() method is highlighted as a useful tool for smaller datasets, but the absence of a similar function in PySpark is seen as a limitation.
  • The custom .equals() function for PySpark is presented as a solution to ensure data integrity and consistency when working with massive datasets.
  • The article suggests that the ability to compare DataFrames is a fundamental skill for data professionals working with PySpark.
  • The author provides a comprehensive approach to comparing DataFrames by considering column names, data types, and values, ensuring a thorough comparison.
  • The use of monkey patching to add the custom .equals() method to PySpark's DataFrame class is suggested, indicating a preference for seamless integration with existing PySpark workflows.

PySpark DataFrame Comparison: A Must-Know Skill for Data Scientists & Engineers

How to create a custom .equals() method in PySpark for comparing DataFrames

Photo by Randy Fath on Unsplash

Introduction

Equality checks are common in situations such as unit testing. In unit testing, the tester verifies that individual units or components of the code (such as functions or methods) produce the expected output for given inputs. When the code involves data processing, checking two data objects for equality becomes a fundamental part of unit testing: you run the function under test and compare its output with the expected output.

If you are a Pandas user, you might be familiar with the .equals() method, which tests whether two DataFrames or Series (for example, a function’s output and the expected result) are equal. In essence, it checks that the objects have the same shape and contain the same elements in the same order; corresponding columns must also have the same dtype, and NaNs in the same location are treated as equal. Here’s an example of how .equals() is used in a unit test.

import pandas as pd

# Example function to be tested
def add_one_to_column(df, column_name):
    df[column_name] = df[column_name] + 1
    return df
# Define a function for testing
def test_add_one_to_column():
    # Input DataFrame
    input_df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
    # Expected DataFrame after applying the function
    expected_df = pd.DataFrame({'A': [2, 3, 4], 'B': [4, 5, 6]})
    # Apply the function
    result_df = add_one_to_column(input_df.copy(), 'A')
    # Check if the result matches the expected DataFrame
    if result_df.equals(expected_df):
        print("Test passed: The output matches the expected result.")
    else:
        print("Test failed: The output does not match the expected result.")

# Run the test function
test_add_one_to_column()
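The sketch below shows two practical consequences of how Pandas’ .equals() works: an int64 column does not equal a float64 column holding the same values, and the bare boolean it returns does not explain a failure. For tests, Pandas also provides pandas.testing.assert_frame_equal, which raises an AssertionError describing the mismatch and accepts check_dtype=False to compare values only. The DataFrames and variable names are illustrative.

```python
import pandas as pd

int_df = pd.DataFrame({"A": [1, 2, 3]})
float_df = pd.DataFrame({"A": [1.0, 2.0, 3.0]})  # same values, float64 dtype

# .equals() requires corresponding columns to have the same dtype,
# so int64 vs float64 makes it return False despite equal values
same_with_equals = int_df.equals(float_df)
print("equals():", same_with_equals)

# assert_frame_equal raises an AssertionError that explains the mismatch
try:
    pd.testing.assert_frame_equal(int_df, float_df)
    mismatch_reported = False
except AssertionError as err:
    mismatch_reported = True
    print("assert_frame_equal:", err)

# check_dtype=False compares the values only, so this call passes silently
pd.testing.assert_frame_equal(int_df, float_df, check_dtype=False)
```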

While Python’s Pandas library is excellent for small to medium-sized datasets, PySpark, an open-source framework for distributed data processing and analysis, is useful for massive datasets that exceed the memory capacity of a single machine. However, PySpark DataFrames have no built-in .equals() method that tells you whether two DataFrames are equal, which makes data integrity and consistency harder to verify. (Spark 3.5 added pyspark.testing.assertDataFrameEqual, but it raises an error on a mismatch rather than returning a boolean, and older versions have no equivalent.) To address this gap, this article demonstrates how to create a custom .equals() function for PySpark DataFrames, enabling data scientists and engineers to validate and compare them.

Custom Equality Function

The custom .equals() function checks for equality by comparing the following:

  1. Column names
  2. Column data types
  3. Data values

Compare Column Names

This function checks whether the column names of two DataFrames are identical.

def compare_column_names(df1, df2):
    """
    Compare the column names of two PySpark DataFrames.
    
    Args:
        df1 (DataFrame): First DataFrame for comparison.
        df2 (DataFrame): Second DataFrame for comparison.
        
    Returns:
        bool: True if the column names are identical, False otherwise.
    """
    return df1.columns == df2.columns

Here is what’s going on:

  • df.columns: returns the list of column names of each DataFrame.
  • ==: compares the two lists of column names, returning True if they are identical and False otherwise. The comparison is order-sensitive: ["ID", "Name"] and ["Name", "ID"] are not equal.
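Because df.columns is an ordinary Python list, the check is easy to relax. The hypothetical helper compare_column_names_relaxed below (not part of the article’s equals()) can optionally ignore column order and letter case; the latter may matter because Spark resolves column names case-insensitively by default (spark.sql.caseSensitive is false). SimpleNamespace objects stand in for DataFrames here, since only the .columns attribute is used.

```python
from types import SimpleNamespace


def compare_column_names_relaxed(df1, df2, ignore_order=False, ignore_case=False):
    """Compare df.columns of two DataFrames with optional relaxations.

    Hypothetical helper: works with any object exposing a `.columns` list,
    such as a PySpark DataFrame.
    """
    cols1, cols2 = list(df1.columns), list(df2.columns)
    if ignore_case:
        cols1 = [c.lower() for c in cols1]
        cols2 = [c.lower() for c in cols2]
    if ignore_order:
        # sorted() keeps duplicates, unlike set(), so ["A", "A"] != ["A"]
        return sorted(cols1) == sorted(cols2)
    return cols1 == cols2


# Stand-ins for DataFrames; only the .columns attribute is read
a = SimpleNamespace(columns=["ID", "Name"])
b = SimpleNamespace(columns=["Name", "ID"])
c = SimpleNamespace(columns=["id", "name"])

print(compare_column_names_relaxed(a, b))                     # False
print(compare_column_names_relaxed(a, b, ignore_order=True))  # True
print(compare_column_names_relaxed(a, c, ignore_case=True))   # True
```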

Let’s create a positive and negative test case for the compare_column_names function.

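from pyspark.sql import SparkSession

# The examples in this article assume a SparkSession named spark
spark = SparkSession.builder.getOrCreate()

# Positive Test Case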
data1 = [(1, "Alice"), (2, "Bob")]
data2 = [(3, "Carol"), (4, "David")]

df1 = spark.createDataFrame(data1, ["ID", "Name"])
df2 = spark.createDataFrame(data2, ["ID", "Name"])
result_positive = compare_column_names(df1, df2)
print("Positive Test Case Result:", result_positive)
# Negative Test Case
data3 = [(1, "Alice", 25), (2, "Bob", 30)]
df3 = spark.createDataFrame(data3, ["ID", "Name", "Age"])
result_negative = compare_column_names(df1, df3)
print("Negative Test Case Result:", result_negative)

If the function works as expected, the positive test case will print True and the negative test case will print False.

Compare Data Types

This function checks if the data types for the corresponding column names are identical.

def compare_column_data_types(df1, df2):
    """
    Compare column data types between two PySpark DataFrames.

    Args:
        df1 (DataFrame): The first DataFrame for comparison.
        df2 (DataFrame): The second DataFrame for comparison.
    Returns:
        bool: True if data types for corresponding column names are identical, False otherwise.
    """
    # Get column names and data types for both DataFrames
    schema1 = df1.dtypes
    schema2 = df2.dtypes
    # Check if data types are identical
    return schema1 == schema2

Here is what’s going on:

  • df.dtypes: returns a list of (column name, data type) tuples for each DataFrame, e.g. [('ID', 'int'), ('Name', 'string')].
  • ==: compares the two lists of column names and data types, returning True if they are identical and False otherwise. Because each tuple includes the column name, the check also fails when names differ, and like the column-name check it is order-sensitive.
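When compare_column_data_types returns False, it does not say which column caused it. A hypothetical diff_dtypes helper, sketched below, turns two df.dtypes lists into readable messages. The example uses literal lists in the shape df.dtypes returns ('int' is how Spark prints IntegerType and 'bigint' is LongType); the helper ignores column order and assumes column names are unique.

```python
def diff_dtypes(dtypes1, dtypes2):
    """Describe how two df.dtypes lists differ.

    Hypothetical helper: pass df1.dtypes and df2.dtypes, which PySpark returns
    as lists of (column name, type string) tuples. Column order is ignored and
    column names are assumed to be unique.
    """
    types1, types2 = dict(dtypes1), dict(dtypes2)
    messages = []
    # Columns in the first list: missing from the second, or typed differently
    for name, dtype in dtypes1:
        if name not in types2:
            messages.append(f"{name}: only in first ({dtype})")
        elif types2[name] != dtype:
            messages.append(f"{name}: {dtype} vs {types2[name]}")
    # Columns that appear only in the second list
    for name, dtype in dtypes2:
        if name not in types1:
            messages.append(f"{name}: only in second ({dtype})")
    return messages


# Literal lists in the shape df.dtypes returns
left = [("ID", "int"), ("Name", "string")]
right = [("ID", "bigint"), ("Age", "int")]
print(diff_dtypes(left, right))
# ['ID: int vs bigint', 'Name: only in first (string)', 'Age: only in second (int)']
```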

Here’s a positive and negative test case for the compare_column_data_types function.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Positive Test Case
schema1 = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Name", StringType(), True)])

schema2 = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Name", StringType(), True)])

df1 = spark.createDataFrame([], schema1)
df2 = spark.createDataFrame([], schema2)
result_positive = compare_column_data_types(df1, df2)
print("Positive Test Case Result:", result_positive)

# Negative Test Case: same column names, but ID is a string instead of an integer
schema3 = StructType([
    StructField("ID", StringType(), True),
    StructField("Name", StringType(), True)])
df3 = spark.createDataFrame([], schema3)
result_negative = compare_column_data_types(df1, df3)
print("Negative Test Case Result:", result_negative)

If the function works as expected, the positive test case will print True and the negative test case will print False.
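Note that df.dtypes contains only column names and type strings, so it ignores the nullable flag (the third argument of StructField). Comparing df.schema instead also compares nullability and field metadata. A hypothetical variant with a check_nullable switch might look like the sketch below; the commented usage assumes the spark session and the df1 built from schema1 above.

```python
def compare_column_types(df1, df2, check_nullable=False):
    """Compare column types of two PySpark DataFrames.

    Hypothetical variant of compare_column_data_types. With
    check_nullable=False it compares df.dtypes (names and type strings only);
    with check_nullable=True it compares the full StructType schemas, which
    also include each field's nullable flag and metadata.
    """
    if check_nullable:
        return df1.schema == df2.schema
    return df1.dtypes == df2.dtypes


# Usage sketch (assumes the spark session and the df1 built from schema1 above):
# from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# strict_schema = StructType([
#     StructField("ID", IntegerType(), False),  # ID declared non-nullable
#     StructField("Name", StringType(), True)])
# df_strict = spark.createDataFrame([], strict_schema)
# compare_column_types(df1, df_strict)                       # True
# compare_column_types(df1, df_strict, check_nullable=True)  # False
```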

Compare Data Values

This function checks whether two DataFrames contain identical values in the same row order.

def compare_dataframe_values(df1, df2):
    """
    Compare the values of elements in two PySpark DataFrames.

    Args:
        df1 (DataFrame): The first DataFrame for comparison.
        df2 (DataFrame): The second DataFrame for comparison.
    Returns:
        bool: True if the values of elements in the DataFrames are identical, False otherwise.
    """
    # Collect data from both DataFrames as lists of rows
    data1 = df1.collect()
    data2 = df2.collect()
    # Compare the collected data
    return data1 == data2

Here’s what’s going on:

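  • df.collect(): returns all rows of the DataFrame to the driver as a list of Row objects, e.g. [Row(ID=1, Name='Alice'), Row(ID=2, Name='Bob')]. Since every row is loaded into the driver’s memory, this is only practical for DataFrames that fit on one machine, such as unit-test data.
  • ==: compares the two lists of rows, returning True if they are identical and False otherwise. The comparison is order-sensitive, and Spark does not guarantee row order after operations such as joins or aggregations, so you may need to sort both DataFrames (for example with orderBy) before comparing. Column names such as ID and Name are not compared, because Row objects compare like plain tuples.

Here’s a positive and negative test case for the compare_dataframe_values function.
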
from pyspark.sql import Row

# Positive Test Case
data1 = [Row(ID=1, Name="Alice"), Row(ID=2, Name="Bob")]
data2 = [Row(ID=1, Name="Alice"), Row(ID=2, Name="Bob")]
df1 = spark.createDataFrame(data1)
df2 = spark.createDataFrame(data2)
result_positive = compare_dataframe_values(df1, df2)
print("Positive Test Case Result:", result_positive)  # Should print True


# Negative Test Case
data3 = [Row(ID=1, Name="Alice"), Row(ID=3, Name="Carol")]
df3 = spark.createDataFrame(data3)
# Check if values are identical
result_negative = compare_dataframe_values(df1, df3)
print("Negative Test Case Result:", result_negative)

If the function works as expected, the positive test case will print True and the negative test case will print False.
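When row order is not meaningful, an alternative to sorting is to compare the collected rows as multisets. The hypothetical helper below does this with collections.Counter. It relies on PySpark Row objects being tuples, so it only works when every value is hashable (array or map columns, which collect as lists or dicts, are not). Plain tuples stand in for Row objects in the example.

```python
from collections import Counter


def compare_rows_ignoring_order(rows1, rows2):
    """Compare two collected row lists as multisets (order ignored).

    Hypothetical helper intended for the lists returned by df.collect().
    Duplicates still matter: each row must appear the same number of times.
    """
    return Counter(rows1) == Counter(rows2)


# Plain tuples stand in for collected Row objects here
first = [(1, "Alice"), (2, "Bob"), (2, "Bob")]
second = [(2, "Bob"), (1, "Alice"), (2, "Bob")]
third = [(2, "Bob"), (1, "Alice")]

print(first == second)                             # False: order differs
print(compare_rows_ignoring_order(first, second))  # True
print(compare_rows_ignoring_order(first, third))   # False: duplicate count differs
```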

Putting it all together

Let’s combine the three checks into a single .equals() function.

def equals(df1, df2):
    """
    Compare column names, data types and values of two PySpark DataFrames.

    Args:
        df1 (DataFrame): First DataFrame for comparison.
        df2 (DataFrame): Second DataFrame for comparison.

    Returns:
        bool: True if both DataFrames are identical in terms of
              column names, column data types, and data values; False otherwise.
    """
    # `and` short-circuits, so the costly value comparison only runs
    # when the names and types already match
    return (
        compare_column_names(df1, df2)
        and compare_column_data_types(df1, df2)
        and compare_dataframe_values(df1, df2))


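# Monkey patch: attach equals() to PySpark's DataFrame class,
# so every DataFrame gains a df1.equals(df2) method
from pyspark.sql import DataFrame

DataFrame.equals = equals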
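Because compare_dataframe_values relies on collect(), equals() pulls every row of both DataFrames into the driver, which suits small test fixtures but not the large datasets that motivate PySpark. A sketch of an alternative, assuming Spark 2.4 or later for DataFrame.exceptAll, keeps the row comparison on the cluster: exceptAll returns the rows of one DataFrame that are missing from the other, counting duplicates, so two empty differences mean both DataFrames hold the same rows in any order. The name equals_distributed is hypothetical, and comparing df.dtypes first covers both the name and type checks, which matters because exceptAll matches columns by position.

```python
def equals_distributed(df1, df2):
    """Order-insensitive DataFrame equality that keeps row comparison on the cluster.

    Hypothetical alternative to equals(); assumes Spark 2.4+ for exceptAll.
    """
    # df.dtypes holds (name, type) pairs, so this covers both the column-name
    # and data-type checks; exceptAll itself matches columns by position only
    if df1.dtypes != df2.dtypes:
        return False
    # exceptAll keeps duplicates: rows of one side not matched on the other.
    # take(1) brings at most one row back to the driver.
    only_in_first = df1.exceptAll(df2).take(1)
    only_in_second = df2.exceptAll(df1).take(1)
    return len(only_in_first) == 0 and len(only_in_second) == 0


# Usage sketch (assumes df1, df2 and df3 from the value-comparison tests above):
# equals_distributed(df1, df2)  # True
# equals_distributed(df1, df3)  # False
```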

Let’s test it out!

# Positive Test Case
data1 = [(1, "Alice"), (2, "Bob")]
data2 = [(1, "Alice"), (2, "Bob")]

df1 = spark.createDataFrame(data1, ["ID", "Name"])
df2 = spark.createDataFrame(data2, ["ID", "Name"])
result_positive = df1.equals(df2)
print("Positive Test Case Result:", result_positive) # output: True
# Negative Test Case
data3 = [(1, "Alice"), (3, "Carol")]
df3 = spark.createDataFrame(data3, ["ID", "Name"])
result_negative = df1.equals(df3)
print("Negative Test Case Result:", result_negative) #output: False

The positive test case outputs True and the negative test case outputs False.

We can also use .equals() in an assert statement.

assert df1.equals(df3), 'Dataframe not equal'

# output: AssertionError: Dataframe not equal
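A bare True/False makes assertion failures hard to diagnose, as the generic message above shows. A hypothetical describe_differences helper could run the same three checks but return a list of messages, so the assertion message states what differed. Like compare_dataframe_values it relies on collect(), so it is meant for test-sized data; the commented usage assumes df1 and df3 from the example above.

```python
def describe_differences(df1, df2, max_rows=5):
    """Return human-readable differences between two DataFrames.

    Hypothetical helper: performs the same three checks as equals(), but
    reports what failed instead of returning a bare boolean. It calls
    collect(), so it is intended for small test DataFrames.
    """
    problems = []
    if df1.columns != df2.columns:
        problems.append(f"column names differ: {df1.columns} vs {df2.columns}")
    elif df1.dtypes != df2.dtypes:
        problems.append(f"column types differ: {df1.dtypes} vs {df2.dtypes}")
    else:
        rows1, rows2 = df1.collect(), df2.collect()
        if len(rows1) != len(rows2):
            problems.append(f"row counts differ: {len(rows1)} vs {len(rows2)}")
        # Positional comparison, matching compare_dataframe_values
        mismatches = [
            (i, r1, r2) for i, (r1, r2) in enumerate(zip(rows1, rows2)) if r1 != r2
        ]
        for i, r1, r2 in mismatches[:max_rows]:
            problems.append(f"row {i} differs: {r1} vs {r2}")
    return problems


# Usage sketch (assumes df1 and df3 from the example above):
# differences = describe_differences(df1, df3)
# assert not differences, "\n".join(differences)
```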

Conclusion

In this article, we explored the scenarios where an equality function is useful when working with PySpark DataFrames. We raised the need for a tool that checks for equality of column names, data types, and the actual data elements between two PySpark DataFrames. To fulfil this need, we crafted a custom equals() function that empowers data engineers, data scientists, and developers to verify the integrity of their PySpark DataFrames.

In Plain English
