Best Practices for building a RAG using the Parent Document Retriever (PDR) and metadata filtering with PDR.
Are you looking to build an efficient RAG on top of a CSV/Excel file? Then this article is definitely for you :)
In this article I will talk about best practices for building an efficient RAG, especially when you are using LangChain's Parent Document Retriever with a CSV/Excel file that has multiple columns as your dataset. By the way, why specifically the Parent Document Retriever?
Hahaha… in my experience, after running multiple experiments, the Parent Document Retriever works best when you have lengthy documents.
Let’s get started!!
Firstly, we have CSV/Excel data, which means that although one column acts as the source for answering questions, there may be many other columns (country name, date, and so on) that can be used as metadata to improve the answers. Let us first look at different tips to improve the answers, and later talk about using the metadata.
1. Creating a Custom CSVLoader: create a custom CSV loader similar to LangChain's CSVLoader, with a few customizations. This helps us define which column should be treated as "page content" and which columns should be treated as "metadata". You can then use this customized CSVLoader to load your CSV file. Let me show an example (a quick usage sketch follows right after the class):
import csv
from typing import Dict, List, Optional

from langchain.document_loaders.base import BaseLoader
from langchain.docstore.document import Document


class CSVLoader(BaseLoader):
    """Loads a CSV file into a list of documents.

    Each document represents one row of the CSV file. Every row is converted into a
    key/value pair and outputted to a new line in the document's page_content.

    The source for each document loaded from csv is set to the value of the
    `file_path` argument for all documents by default.
    You can override this by setting the `source_column` argument to the
    name of a column in the CSV file.
    The source of each document will then be set to the value of the column
    with the name specified in `source_column`.

    Output Example:
        .. code-block:: txt

            column1: value1
            column2: value2
            column3: value3
    """

    def __init__(
        self,
        file_path: str,
        source_column: Optional[str] = None,
        metadata_columns: Optional[List[str]] = None,  # < ADDED
        csv_args: Optional[Dict] = None,
        encoding: Optional[str] = None,
    ):
        self.file_path = file_path
        self.source_column = source_column
        self.encoding = encoding
        self.csv_args = csv_args or {}
        self.metadata_columns = metadata_columns  # < ADDED

    def load(self) -> List[Document]:
        """Load data into document objects."""
        docs = []
        with open(self.file_path, newline="", encoding=self.encoding) as csvfile:
            csv_reader = csv.DictReader(csvfile, **self.csv_args)  # type: ignore
            for i, row in enumerate(csv_reader):
                # Only the column holding the answer text becomes page_content;
                # replace "Source column name" with that column's name in your file.
                content = "\n".join(
                    f"{k.strip()}: {v.strip()}"
                    for k, v in row.items()
                    if k == "Source column name"
                )
                try:
                    source = (
                        row[self.source_column]
                        if self.source_column is not None
                        else self.file_path
                    )
                except KeyError:
                    raise ValueError(
                        f"Source column '{self.source_column}' not found in CSV file."
                    )
                metadata = {"source": source, "row": i}
                # ADDED TO SAVE METADATA
                if self.metadata_columns:
                    for k, v in row.items():
                        if k in self.metadata_columns:
                            metadata[k] = v
                # END OF ADDED CODE
                doc = Document(page_content=content, metadata=metadata)
                docs.append(doc)
        return docs
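For reference, here is a minimal usage sketch of the loader above. The file name and the column names ("answer_text", "Country", "ProductName") are placeholders for whatever your dataset actually contains:

# Usage sketch -- the file and column names below are placeholders.
loader = CSVLoader(
    file_path="data.csv",
    source_column="answer_text",                  # column used as the document "source"
    metadata_columns=["Country", "ProductName"],  # columns stored as metadata
)
documents = loader.load()
print(documents[0].page_content)
print(documents[0].metadata)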
2. Two splitters: usually we use only one text splitter to break long text into multiple smaller chunks, but in the case of the Parent Document Retriever we use two splitters. One is for the larger chunks with more context (let's call these larger chunks parents) and the other is for the smaller chunks with better semantic meaning (let's call these smaller chunks children).
Pro Tip ✨ Play with the chunk_size: the chunk size used while creating the child documents plays an important role in how the RAG system produces the answer. I would suggest experimenting with the chunk size until there is little overlap between the child documents created and the answers produced match your expectations.
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Larger "parent" chunks keep more context; smaller "child" chunks are what get embedded.
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000)
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400)
3. Storing both parent and child chunks, i.e. store the embeddings ✨: initializing the Parent Document Retriever and adding the documents to it happens at run time, which not only takes a long time but also costs us money, since the same embeddings are created every time we run the same query. You can use the code line below to store the created embeddings:
vectorstore.persist()
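Note that persist() is specific to vector stores that support on-disk persistence, such as Chroma. For context, here is a minimal sketch of the setup this assumes; the embedding model, collection name and persist directory are placeholders you should adapt:

from langchain.embeddings import OpenAIEmbeddings
from langchain.storage import InMemoryStore
from langchain.vectorstores import Chroma

# Child chunks (and their embeddings) live in the vector store,
# while the full parent chunks live in the docstore.
vectorstore = Chroma(
    collection_name="parent_docs",          # placeholder name
    embedding_function=OpenAIEmbeddings(),  # placeholder embedding model
    persist_directory="./chroma_db",        # persist() writes the embeddings here
)
store = InMemoryStore()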
Using the metadata: ✨
Now comes the interesting topic. By default, we cannot get LangChain's parent document retriever to use the metadata: even though there is a metadata parameter as per the documentation, applying it does not filter the retrieved documents based on the provided filter. Hence, we need to write a custom class which filters the relevant docs based on the metadata. There are two possible approaches for this:
- Fetch the retrieved relevant documents from the vector store and then apply the metadata filter on top of it.
- Apply metadata filtering while performing the vector search and return only the unique documents.
The second option is the correct way of solving this. So, always create a custom class with custom functions to retrieve the most relevant documents based on metadata filters. This way you not only apply the filters while searching but can also increase the number of documents retrieved (by default only 4 relevant docs are fetched). Let me show an example of how this can be done. I am considering Country and ProductName as two columns in my data which are loaded as metadata as explained above (Creating a Custom CSVLoader).
import uuid
from typing import Any, Dict, List, Optional

# Note: import paths may vary slightly depending on your LangChain version.
from langchain.callbacks.manager import CallbackManagerForRetrieverRun
from langchain.docstore.document import Document
from langchain.pydantic_v1 import Field
from langchain.schema import BaseRetriever
from langchain.schema.storage import BaseStore
from langchain.text_splitter import TextSplitter
from langchain.vectorstores.base import VectorStore


class ParentDocumentRetriever(BaseRetriever):
    vectorstore: VectorStore
    docstore: BaseStore[str, Document]
    id_key: str = "doc_id"
    search_kwargs: dict = Field(default_factory=dict)
    child_splitter: TextSplitter
    parent_splitter: Optional[TextSplitter] = None

    def _get_relevant_documents(
        self,
        query: str,
        *,
        run_manager: CallbackManagerForRetrieverRun,
        metadata_filter: Optional[Dict[str, Any]] = None,
    ) -> List[Document]:
        all_results = []
        if metadata_filter:
            unique_ids = set()
            # Iterate over each key-value pair in the metadata_filter
            for key, value in metadata_filter.items():
                # Perform the similarity search for the current key-value pair
                sub_docs = self.vectorstore.similarity_search(
                    query, k=10, filter={key: value}, **self.search_kwargs
                )
                ids = [d.metadata[self.id_key] for d in sub_docs]
                # Add unique document IDs to the set
                unique_ids.update(ids)
            # Retrieve parent documents from the docstore based on the unique IDs
            all_results = self.docstore.mget(list(unique_ids))
            print("Filtering documents with metadata:", metadata_filter)
            # Keep only the parents whose metadata matches every provided filter
            filtered_documents = []
            for document in all_results:
                if document is not None:
                    match = all(
                        any(value in document.metadata.get(key, []) for value in values)
                        if isinstance(document.metadata.get(key), list)
                        else document.metadata.get(key) in values
                        for key, values in metadata_filter.items()
                        if values
                    )
                    if match:
                        filtered_documents.append(document)
            docs = filtered_documents
        else:
            sub_docs = self.vectorstore.similarity_search(
                query, k=10, **self.search_kwargs
            )
            ids = []
            for d in sub_docs:
                if d.metadata[self.id_key] not in ids:
                    ids.append(d.metadata[self.id_key])
            docs = self.docstore.mget(ids)
        return [d for d in docs if d is not None]

    def add_documents(
        self,
        documents: List[Document],
        ids: Optional[List[str]] = None,
        add_to_docstore: bool = True,
    ) -> None:
        if self.parent_splitter is not None:
            documents = self.parent_splitter.split_documents(documents)
        if ids is None:
            doc_ids = [str(uuid.uuid4()) for _ in documents]
            if not add_to_docstore:
                raise ValueError(
                    "If ids are not passed in, `add_to_docstore` MUST be True"
                )
        else:
            if len(documents) != len(ids):
                raise ValueError(
                    "Got uneven list of documents and ids. "
                    "If `ids` is provided, should be same length as `documents`."
                )
            doc_ids = ids
        docs = []
        full_docs = []
        for i, doc in enumerate(documents):
            _id = doc_ids[i]
            sub_docs = self.child_splitter.split_documents([doc])
            for _doc in sub_docs:
                _doc.metadata[self.id_key] = _id
            docs.extend(sub_docs)
            full_docs.append((_id, doc))
        self.vectorstore.add_documents(docs)
        if add_to_docstore:
            self.docstore.mset(full_docs)
Let me explain what happens when you call the retriever's get_relevant_documents function with a metadata filter (Country and ProductName in this example):
parent_retriever = ParentDocumentRetriever(vectorstore=vectorstore,
docstore=store,
child_splitter=child_splitter,
parent_splitter=parent_splitter,
)
parent_retriever.get_relevant_documents(query, metadata_filter={"Country":"Canada","ProductName":"Sample"})
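One thing to remember before querying as above: the documents loaded by the custom CSVLoader have to be added to the retriever once (this is the expensive step whose embeddings we persisted earlier):

# Splits the loaded docs into parent/child chunks, embeds the child chunks
# and stores the parent chunks in the docstore.
parent_retriever.add_documents(documents)
vectorstore.persist()  # optional, if your vector store supports persistence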
For each key-value pair in the metadata_filter you provide, the vectorstore performs a similarity search with that filter applied; the results are combined and only the unique parent documents are kept, and these unique documents are then used for answering the query. In other words, the vector search happens n times, where n is the number of key-value pairs in the metadata_filter dictionary, and the unique docs are filtered afterwards.
Once you have the most relevant docs from the retriever, it's time to chain them:
context = parent_retriever.get_relevant_documents(query, metadata_filter={"Country":"Canada","ProductName":"Sample"})
response = llm_chain({"context": context, "question": query})
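In case you are wondering what llm_chain looks like here, this is a minimal sketch; the model and the prompt wording are placeholders, so use whatever chain you already have. Note that context is a list of Documents, so you may want to join their page_content into a single string before passing it in.

from langchain.chains import LLMChain
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate

# Placeholder prompt and model -- adapt both to your own setup.
prompt = PromptTemplate(
    input_variables=["context", "question"],
    template=(
        "Answer the question using only the context below.\n\n"
        "Context:\n{context}\n\n"
        "Question: {question}"
    ),
)
llm_chain = LLMChain(llm=ChatOpenAI(temperature=0), prompt=prompt)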
The response generated will now be as expected, with the correct metadata filters applied.
Hope this was helpful!! :)