Natural Language Processing with PySpark and Spark-NLP
Diving into the text of Financial Services Consumer Complaints
The question: what words (from complaints) are distinctly Equifax-y?
Today we’re diving deeper into the US Consumer Financial Protection Bureau’s Financial Services Consumer Complaint database to look at the text of the complaints filed against companies. The question: what words (from complaints) are distinctly Equifax-y? We’re going to be looking at text cleaning, tokenization, and lemmatization with Spark-NLP, counting with PySpark, and tf-idf (term frequency-inverse document frequency) analysis.
I used John Snow LABS’ Spark-NLP library. You can learn more from them, or from Wikipedia (probably also written by them, but in a different style).
Installing Spark-NLP
John Snow LABS provides a couple of different quick start guides — here and here — that I found useful together.
If you haven’t already installed PySpark (note: PySpark version 2.4.4 is the only supported version):
$ conda install pyspark==2.4.4
$ conda install -c johnsnowlabs spark-nlp
If you already have PySpark, make sure to install spark-nlp in the same channel as PySpark (you can check the channel from conda list). In my case, PySpark is installed on my conda-forge channel, so I used
$ conda install -c johnsnowlabs spark-nlp --channel conda-forge
I already had PySpark installed and set up for use with Jupyter notebooks, but if you don’t, you may need to set some additional environment variables in the terminal (as mentioned in the second quick start guide, but not the first, so …)
$ export SPARK_HOME=/path/to/your/spark/folder
$ export PYSPARK_PYTHON=python3
$ export PYSPARK_DRIVER_PYTHON=jupyter
$ export PYSPARK_DRIVER_PYTHON_OPTS=notebook
Getting Started with Spark-NLP
If you are looking to play around with pre-installed data sets, and therefore don’t need to access the spark session, you can get started with the following two lines:
import sparknlp
sparknlp.start()
In my case, I need the SparkSession to load my data from the parquet file, so I’ll add .config('spark.jars.packages', 'com.johnsnowlabs.nlp:spark-nlp_2.11:2.3.5') to my SparkSession.builder
from pyspark.sql import SparkSession

# start spark session configured for spark nlp
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('Spark NLP') \
    .config('spark.jars.packages',
            'com.johnsnowlabs.nlp:spark-nlp_2.11:2.3.5') \
    .getOrCreate()
That’s it! You’re up and going!
Stop Words
Spark-NLP does not come with a built-in stop words dictionary, so I chose to use the NLTK English Language stop words, as well as the ‘xxxx’ redacting string found in my data set.
from nltk.corpus import stopwords

eng_stopwords = stopwords.words('english')
eng_stopwords.append('xxxx')
Setting up your text manipulation pipeline
To start, import your tools:
from sparknlp.base import Finisher, DocumentAssembler
from sparknlp.annotator import (Tokenizer, Normalizer,
                                LemmatizerModel, StopWordsCleaner)
from pyspark.ml import Pipeline
Most projects are going to need DocumentAssembler to convert the text into a Spark-NLP annotator-ready form at the beginning, and Finisher to convert back to human-readable form at the end. You can select the annotators you need from the annotator docs.
Before you set up the pipeline, we need to initialize the annotators with the proper inputs. We’ll use the typical Spark ML format of .setInputCols(list of columns) and .setOutputCol(output column name), along with other functions specific to the annotator (see docs). Each output column will be the input column for the following annotator.
documentAssembler = DocumentAssembler() \
    .setInputCol('consumer_complaint_narrative') \
    .setOutputCol('document')

tokenizer = Tokenizer() \
    .setInputCols(['document']) \
    .setOutputCol('token')

# note normalizer defaults to changing all words to lowercase.
# Use .setLowercase(False) to maintain input case.
normalizer = Normalizer() \
    .setInputCols(['token']) \
    .setOutputCol('normalized') \
    .setLowercase(True)

# note that lemmatizer needs a dictionary. So I used the pre-trained
# model (note that it defaults to english)
lemmatizer = LemmatizerModel.pretrained() \
    .setInputCols(['normalized']) \
    .setOutputCol('lemma')

stopwords_cleaner = StopWordsCleaner() \
    .setInputCols(['lemma']) \
    .setOutputCol('clean_lemma') \
    .setCaseSensitive(False) \
    .setStopWords(eng_stopwords)

# finisher converts tokens to human-readable output
finisher = Finisher() \
    .setInputCols(['clean_lemma']) \
    .setCleanAnnotations(False)
Now we’re ready to define the pipeline:
pipeline = Pipeline() \
    .setStages([
        documentAssembler,
        tokenizer,
        normalizer,
        lemmatizer,
        stopwords_cleaner,
        finisher
    ])
Using the pipeline
I import and select my data, and then use pipeline.fit(data).transform(data). For example:
# import data
df = spark.read.load('../data/consumer_complaints.parquet',
                     inferSchema='true', header='true')

# select equifax text data as test
data = df.filter((df['company'] == 'EQUIFAX, INC.')
                 & (df['consumer_complaint_narrative'].isNotNull()))
data = data.select('consumer_complaint_narrative')

# transform text with the pipeline
equifax = pipeline.fit(data).transform(data)
This returns a DataFrame with the added columns specified in the pipeline’s annotators. So equifax.columns returns:
['consumer_complaint_narrative',
 'document',
 'token',
 'normalized',
 'lemma',
 'clean_lemma',
 'finished_clean_lemma']
When we look more closely at the output of the finisher, in this case the “finished_clean_lemma” column, we see that each record is a list of words, e.g. [address, never, …], [pay, satisfied, …].
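To make the stage-by-stage flow concrete, here is a rough plain-Python approximation of what the tokenizer, normalizer, lemmatizer, and stop words cleaner do to a single narrative. This is not Spark-NLP code: the whitespace split, the regex cleanup, the tiny TOY_LEMMAS dictionary, and the short TOY_STOPWORDS set are all stand-ins I made up for illustration, and the real annotators differ in the details.

```python
import re

# Hypothetical stand-ins for the pretrained lemma dictionary and the stop word list.
TOY_LEMMAS = {'paid': 'pay', 'called': 'call', 'addresses': 'address'}
TOY_STOPWORDS = {'i', 'the', 'and', 'them', 'my', 'xxxx'}

def toy_pipeline(text):
    # "tokenizer": split the narrative on whitespace
    tokens = text.split()
    # "normalizer": lowercase and strip everything that is not a letter
    normalized = [re.sub(r'[^a-z]', '', t.lower()) for t in tokens]
    normalized = [t for t in normalized if t]  # drop tokens that became empty
    # "lemmatizer": look each token up in the toy dictionary, else keep it
    lemmas = [TOY_LEMMAS.get(t, t) for t in normalized]
    # "stop words cleaner": remove stop words, including the 'xxxx' redaction
    return [t for t in lemmas if t not in TOY_STOPWORDS]

print(toy_pipeline('I called XXXX and paid them; my addresses never changed.'))
# → ['call', 'pay', 'address', 'never', 'changed']
```

Each list returned here plays the role of one row of the finished_clean_lemma column.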
Count vectorizing the text
In order to get each word on the same level, I used the pyspark.sql explode function.
from pyspark.sql.functions import explode, col

# explode the pipeline output so that each word gets its own row
equifax_words = equifax.withColumn('exploded_text',
                                   explode(col('finished_clean_lemma')))
Now the text is ready to .groupby().count() to get the count for each word.
I then converted the result to pandas and used a dictionary comprehension to convert the table into a dictionary (this may not be the most elegant strategy).
counts = equifax_words.groupby('exploded_text').count()
counts_pd = counts.toPandas()
equifax_dict = {counts_pd.loc[i, 'exploded_text']:
                counts_pd.loc[i, 'count']
                for i in range(counts_pd.shape[0])}
Full disclosure: even using Spark running on all four cores, doing this for the top 20 complaint earners took a significant amount of time. This is a lot of computation!
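For intuition, the explode-then-count step computes the same thing as flattening the per-complaint word lists and counting the words. Here is a minimal plain-Python sketch on made-up data; the records list is hypothetical and stands in for the finished_clean_lemma column, and this approach would not scale to the full data set the way Spark does.

```python
from collections import Counter
from itertools import chain

# Hypothetical stand-in for the finished_clean_lemma column: one list per complaint.
records = [['address', 'never', 'pay'], ['pay', 'satisfied'], ['pay', 'address']]

# "explode" = flatten the lists; "groupby().count()" = count each word
word_counts = dict(Counter(chain.from_iterable(records)))
print(word_counts)
# → {'address': 2, 'never': 1, 'pay': 3, 'satisfied': 1}
```

If the counts are already in pandas, dict(zip(counts_pd['exploded_text'], counts_pd['count'])) should build the same dictionary as the comprehension above with less indexing.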
Tf-idf
Now that I have the text from each company’s complaints count vectorized (aka converted into a dictionary of {word1: count1, word2: count2…}), I am ready to get the tf-idf for each word in each company’s word set.
Helper functions:
def term_frequency(BoW_dict):
    tot_words = sum(BoW_dict.values())
    freq_dict = {word: BoW_dict[word]/tot_words
                 for word in BoW_dict.keys()}
    return freq_dict

from math import log

def inverse_document_frequency(list_of_dicts):
    tot_docs = len(list_of_dicts)
    words = set([w for w_dict in list_of_dicts
                 for w in w_dict.keys()])
    idf_dict = {word: log(float(tot_docs)/
                          (1.0 + sum([1 for w_dict in list_of_dicts
                                      if word in w_dict.keys()])))
                for word in words}
    return idf_dict

def tf_idf(list_of_dicts):
    words = set([w for w_dict in list_of_dicts
                 for w in w_dict.keys()])
    tf_idf_dicts = []
    idfs = inverse_document_frequency(list_of_dicts)
    for i, w_dict in enumerate(list_of_dicts):
        w_dict.update({word: 0 for word in words
                       if word not in w_dict.keys()})
        tf = term_frequency(w_dict)
        tf_idf_dicts.append({word: tf[word]*idfs[word]
                             for word in words})
    return tf_idf_dicts
Putting it together:
list_of_word_dicts = [company_complaint_word_counts_dict[company]
                      for company in companies]
tf_idf_by_company_list = tf_idf(list_of_word_dicts)
tf_idf_by_company_dict = {c: tf_dict
                          for c, tf_dict in zip(companies, tf_idf_by_company_list)}
What’s Unique About the Complaints for each of our Top Companies
To find the words that set each company apart, I found the words with the top tf-idf scores for my companies of interest.
Unfortunately, this revealed that I hadn’t done enough to clean my data. All of the highest tf-idf score words were typos or sets of words smashed together (e.g. ‘tobe’, ‘calledthem’, etc.), and sometimes 1000-character strings without spaces. So here, I added a filter based on the nltk.corpus words.words() list. Unfortunately, tobe is actually a word, so it still shows up in some of the top results. Given the issue with missing spaces in the dataset, I doubt that people were talking about “an outer garment traditionally worn in some parts of north and central Africa, consisting of a length of cloth that is sewn into a long loose skirt or is draped around the body and fastened over one shoulder” (Collins Dictionary).
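Here is a minimal sketch of that filter-then-rank step, assuming each company’s scores live in a plain {word: score} dictionary like the ones tf_idf returns. The top_words helper, the toy scores, and the small vocabulary set are hypothetical; in practice the vocabulary would be a set built from the nltk.corpus words.words() list mentioned above.

```python
import heapq

def top_words(scores, vocabulary, n=5):
    """Return the n highest-scoring words that also appear in vocabulary."""
    # keep only words found in the dictionary, dropping smashed-together tokens
    known = {w: s for w, s in scores.items() if w in vocabulary}
    # rank the remaining words by score, highest first
    return heapq.nlargest(n, known, key=known.get)

# Made-up scores and vocabulary, for illustration only.
toy_scores = {'calledthem': 0.9, 'reseller': 0.7, 'tobe': 0.6, 'teller': 0.2}
toy_vocab = {'reseller', 'tobe', 'teller', 'merchant'}
print(top_words(toy_scores, toy_vocab, n=2))
# → ['reseller', 'tobe']
```

Note how ‘calledthem’ is dropped while ‘tobe’ survives, which mirrors the problem described above.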
Let’s take a look at some of the top tf-idf scoring words for our top ten companies:
Equifax: reseller, accuser, reinsertion, certifiably, certifiable, runner
Experian: reseller, accuser, certifiably, certifiable, runner, reinsertion
Transunion: reseller, accuser, certifiably, certifiable, compliantly, reinsertion
BofA: merchant, foreclose, teller, platinum, mellon(?), firearm, mesne
Wells Fargo: preservation, foreclose, appraisal, preservationist, teller, dealer
JP Morgan Chase: sapphire, southwest, merchant, airway, explorer, teller
CitiBank: depot, promotional, goodyear, prestige, merchant, dividend
Capital One: kohl, quicksilver, savor, orchard, merchant, venture, rebill
Navient Solutions: pioneer, mae, unsubsidized, diploma, recertify, graduation
Ocwen Financial: homeward, foreclose, suspense, hooligan, duplex
We do see the different categories of financial institutions, with the credit bureaus producing different results from the banks that do more mortgage lending and storefront banking, and from the U.S. Department of Education loan servicer Navient.
At the same time, this result is pretty disheartening. These most important words don’t feel super insightful. I’m thinking:
- Maybe we need to look at more than these top words, like the top 100?
- Alternatively, tf-idf might not be the right tool for this job. With SO many complaints for these companies, many words appear at least once in all or nearly all of the companies’ “corpus” of complaints. This factor, the inverse document frequency, is overpowering the term frequency. I wonder how the results would vary if I used all of the companies, instead of just the top 20.
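To see how the idf factor behaves with only 20 documents, here is a small sketch that evaluates the same smoothed formula used in inverse_document_frequency above, log(N / (1 + df)), for a few document frequencies. The idf helper name and the choice of df values are mine, for illustration only.

```python
from math import log

def idf(tot_docs, doc_freq):
    # Same smoothed form as the inverse_document_frequency helper:
    # log(total documents / (1 + documents containing the word))
    return log(float(tot_docs) / (1.0 + doc_freq))

# 20 "documents" (one per company), word appearing in df of them
for df in (1, 10, 19, 20):
    print(df, round(idf(20, df), 3))
# → 1 2.303
# → 10 0.598
# → 19 0.0
# → 20 -0.049
```

With this formula, a word used by 19 of the 20 companies scores exactly zero and a word used by all 20 scores slightly negative, so only words confined to a few companies can rise to the top, no matter how often they are used.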
As always, find more (code) in the GitHub repo.
Happy Coding!
