Building a Data Lake on PB scale with Apache Spark
How we deal with Big Data at Emplifi
Professionally, I have spent the last four years on the data engineering team at Emplifi (formerly Socialbakers), and one of the largest projects I have been working on is building a distributed data storage system that currently holds nearly one PB of data. Its goal is to provide data analysts and researchers with tables of data they can analyze and research efficiently. As you can probably imagine, building and maintaining such a Data Lake is not a trivial task, considering that not only does the data change frequently, but its schema also evolves over time, with dozens or even hundreds of fields at different levels of nesting.
In this article, I would like to share my experience and highlights from this journey mainly on the technical level.
The data we process on a daily basis originates from social networks such as Facebook, Twitter, Instagram, YouTube, LinkedIn, and TikTok. The processed datasets are mainly public profiles and the posts published on these networks. A smaller portion of the data also comes from internal systems. Our storage system is built on AWS S3, and we call it the Data Lake because we store data here in raw form as well as in preprocessed, processed, and aggregated form. The raw data consists mainly of compressed JSON files, while the processed and aggregated datasets are stored in the Apache Parquet format and exposed to the users in the form of (Hive metastore) tables.
Data Flow
The data passes through several points in the company infrastructure. First, it gets downloaded from the social networks using public and, in some cases, private APIs. Next, it lands in DynamoDB, a key-value distributed database service on AWS that we call the primary database. DynamoDB is good at dealing with frequent updates of records, which is very useful for our use case: the data comes from social networks, where each record, such as a post, evolves over time as it collects interactions and changes frequently. Each update in DynamoDB is also saved to S3 in the stage layer of our Data Lake. This is the starting point for a series of steps implemented in Apache Spark that lead to consistent tables our data analysts and researchers can access efficiently with standard analytical tools. In the next sections, I will describe these steps in more detail.
Structure of the Data
The incoming data to the stage is a continuous stream of records saved as compressed JSON files. Each record has two substructures, oldImage and newImage, where the first is what the record looked like before the update while the latter contains the data after the update. Each record (post) might be downloaded several times throughout the day from the social network, so this stream contains duplicates in the sense that each id that uniquely identifies a post may occur here multiple times. We need to consider only the latest version for each id to correctly update the final table. Here you can see a simple example of the data structure:
{
  'newImage': {
    'id': 1,
    'version': 101,
    'created_time': '2023-01-01 10:00:00',
    'interactions': 51,
    MANY OTHER FIELDS...
  },
  'oldImage': {
    'id': 1,
    'version': 100,
    'created_time': '2023-01-01 10:00:00',
    'interactions': 50,
    MANY OTHER FIELDS...
  }
}
Data Lake — Three Layers of Abstraction
Our Data Lake is built on S3 (the distributed object store on AWS) and has three levels of abstraction, which we call the stage, target, and mart. In the stage layer, we keep the raw data, which comes mostly from DynamoDB but also from other databases such as Elasticsearch, Postgres, and MongoDB, and from various internal APIs. This data arrives mostly in compressed JSON format and in some cases in CSV. These datasets are preprocessed on a daily basis and saved in the Apache Parquet format, also in the stage.
These Parquet files are then processed and used to update our tables in the target, either on a daily basis or, in cases where data freshness is not so crucial, on a weekly basis. As opposed to the stage data, the target tables are directly accessed by our data users.
For some tables where the data structure is not very user-friendly, we create new tables in which we transform the data so it can be easily queried, and we save them in the mart layer. We don’t do that in the target because there we want to keep the tables as mirrors of the source systems from which the data comes, so the users are not confused by, for example, the data in DynamoDB differing from the data in the target.
Having tables that are efficient to query is a high priority for us. It is quite painful for a data analyst to have to wait several minutes for a query to finish. We achieve that with varying degrees of success at the two levels:
- In the target: our hands are tied a little because we guarantee the same structure of the data as in the original database. We can, however, achieve quite a lot with a custom organization of the files under the table, using techniques such as partitioning, bucketing, and sorting. See the next sections for more details.
- In the mart: we can do even more: simplify the nested structures, and pre-join or pre-aggregate the data based on the typical queries that are executed against the table.
Stage
The stage is the entry point for the data that should eventually be exposed to the users. The raw data that arrives here is processed on a daily basis with the following three steps in the processing pipeline:
- schema evolution
- data cleansing
- data deduplication
Schema Evolution — stage layer
It is not unusual for the structure of the incoming data to change over time: new fields might be added to the dataset, or the datatype of some field may change. This is quite a problem if you don’t have a solid schema-evolution step in place. Apache Spark allows you to read the data either by providing the schema or by letting Spark infer it (so-called schema-on-read).
Usually, you have some expectations of what the schema should be (ideally the same as the schema of the already existing target table), but providing this schema to Spark means missing any new fields that might appear in a new increment of the data. Also, if the data type of some field changes, Spark will read it as NULL (if the two datatypes are not compatible), drop the rows, or fail entirely, depending on the specified mode, as shown in the short example after this list (also see my other article about JSON schema evolution). On the other hand, letting Spark infer the schema seems more reasonable; however, there are also some drawbacks and points that you need to consider:
- timestamps will be inferred as strings (with the exception of Spark 3.0, where they can be inferred as timestamps)
- map type is inferred as a struct
- with no other logic in place, all new fields might be propagated to the final table which might be undesirable — someone should confirm that the new field can be exposed to the users (maybe the new field is produced just by some error)
- with no other logic in place, if the data type is different from the type in the target table, merging the increment to the table in the target will fail
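For illustration, this is roughly what providing an expected schema together with an explicit mode looks like; expected_schema and input_path are placeholders:

# The 'mode' option controls what happens to records that do not match the provided schema:
# PERMISSIVE (the default) sets malformed fields to NULL, DROPMALFORMED drops the whole record,
# and FAILFAST fails the read.
df = (
    spark.read
    .format('json')
    .schema(expected_schema)
    .option('mode', 'FAILFAST')
    .load(input_path)
)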
We approach this problem with an in-house tool for schema comparison. We have a versioning system (a schema registry) for the schemas of all datasets. During processing, we first let Spark infer the schema and then compare it with the versioned schema. The tool detects all new fields and reports them to our notification channels. Also, if a data type changes, the algorithm decides whether the new type can be safely cast to the versioned type without any loss of data. A new schema is created based on this comparison, the data is read again using this new schema, and the changed fields are cast to the versioned types.
The only problem occurs if the new data type cannot be safely cast. In that case, we proceed with the new data type but report an error in the notification channel, because the following job in the target layer will fail because of it. We then need to manually investigate why it happened: whether it is a one-time change caused by some error, or a permanent change in the data structure, which means that we will have to adjust the final table accordingly.
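To give an idea of how such a comparison can work, here is a minimal, flat-schema sketch (not our actual tool): it diffs a Spark-inferred schema against the versioned one, collects new fields for manual approval, and checks whether changed types can be cast safely. The SAFE_CASTS set is an illustrative simplification; the real implementation also handles nested structs.

from pyspark.sql.types import StructType

# Type changes considered lossless for the purpose of this illustration.
SAFE_CASTS = {
    ('integer', 'long'),
    ('integer', 'double'),
    ('long', 'double'),
    ('float', 'double'),
}

def compare_schemas(inferred: StructType, versioned: StructType):
    """Return fields that are new in the inferred schema and type changes that cannot be cast safely."""
    versioned_types = {f.name: f.dataType for f in versioned.fields}
    new_fields, unsafe_changes = [], []
    for field in inferred.fields:
        if field.name not in versioned_types:
            # Candidate for manual approval before it is promoted to the final table.
            new_fields.append(field.name)
        elif field.dataType != versioned_types[field.name]:
            pair = (field.dataType.typeName(), versioned_types[field.name].typeName())
            if pair not in SAFE_CASTS:
                # Reported in the notification channel as an error.
                unsafe_changes.append((field.name,) + pair)
    return new_fields, unsafe_changes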
Data Cleansing
After reading the data into a DataFrame with the evolved schema and casting the types, we apply a set of filters that exclude records with NULL values in crucial fields such as id, version, created_time, and so on. These fields are necessary for correct deduplication of the data and correct merging into the final table in the target layer. We refer to these records as corrupted and save them in a separate folder where we can investigate why the values are missing.
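A minimal sketch of this filtering step might look as follows; the field names match the example above, while df and corrupted_path are placeholder assumptions:

from functools import reduce
from pyspark.sql.functions import col

# Fields that must be present for deduplication and merging to work.
required_fields = ['id', 'version', 'created_time']

# A record is considered corrupted if any of the required fields is NULL.
is_corrupted = reduce(lambda a, b: a | b, [col(f).isNull() for f in required_fields])

corrupted_df = df.filter(is_corrupted)
clean_df = df.filter(~is_corrupted)

# Keep the corrupted records aside so we can investigate why the values are missing.
corrupted_df.write.mode('append').format('parquet').save(corrupted_path)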
Data Deduplication
As mentioned before, the data in the stage may contain multiple versions of a record with the same id. This is because individual records (mostly posts from social networks) are continuously changing, and we download them multiple times throughout the day.
The processing job in the stage layer runs in the early morning, and its goal is to take the increments (newly arrived data) for the previous day and prepare them for the target job that will merge them into the final table. After schema evolution and data cleansing, we also need to deduplicate the data in the sense that for each id we keep only the latest version:
from pyspark.sql.functions import col, desc, row_number
from pyspark.sql.window import Window

# Read the daily increment using the evolved schema and keep only the new image.
increment_df = (
    spark.read
    .format('json')
    .option('path', input_path)
    .schema(evolved_schema)
    .load()
    .select('newImage.*')
)

# For each id, keep only the record with the highest version.
w = Window.partitionBy('id').orderBy(desc('version'))

result_df = (
    increment_df
    .withColumn('r', row_number().over(w))
    .filter(col('r') == 1)
    .drop('r')
)
As you can see from the code, we select only the data from the newImage and deduplicate it using the window function row_number() (for details about using the window functions in Spark see my other article).
Target
The goal of the target layer is to create tables that are mirrors of DynamoDB, Elasticsearch, and other internal databases, and that can be directly accessed by data analysts and researchers using standard analytical tools such as SQL or the DataFrame API in Spark or Pandas, so the entire ecosystem of Python libraries for data science can be used on top of them. Note that this would be very difficult to achieve by accessing the original databases directly.
Technically, in the target, we take the Parquet files from the stage and upsert them into the existing tables. We also want to provide the analysts with tables that are prepared for analytical queries, so we invest some effort in the layout of the tables. A table itself is just an abstraction on top of the files, for which, again, we use the Apache Parquet format.
Tables Layout
We use the concept of copy-on-write, which means that the entire table with all its files is overwritten during each update. This is quite expensive, especially for large tables; however, it allows us to keep the files compact and organized as required for efficient reads.
The tables that represent posts from social networks are partitioned by the time dimension; typical partition columns are created_year and created_month, both derived from the created_time of the post. This partitioning speeds up reading the data because the analysts are usually interested in recent data, and processing engines such as Spark or Presto make sure that only the queried partitions are scanned and the rest are skipped. The tables are also bucketed by the profile_id column because they can be joined on this column with the tables that represent profiles. Thanks to the bucketing, queries with aggregations and joins on this column will not induce a shuffle in Spark, which improves the overall efficiency of such queries. Also, bucket pruning can take place if the query contains a filter on the profile_id column (for more information about bucketing, see my other article).
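As a rough illustration of this layout, the write could look like the following sketch; the table name, the number of buckets, and output_path are placeholder assumptions, not our production values:

# Write a posts table partitioned by the time dimension and bucketed by profile_id.
(
    result_df
    .write
    .mode('overwrite')
    .format('parquet')
    .partitionBy('created_year', 'created_month')   # enables partition pruning on recent data
    .bucketBy(64, 'profile_id')                     # 64 buckets is an arbitrary example value
    .sortBy('profile_id')
    .option('path', output_path)
    .saveAsTable('target.posts')                    # hypothetical table name
)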
Merging the Increment
The processing job in the target layer runs in the early morning after the stage job, and its goal is to merge the increments from the previous day into the target table. We read both the increment and the table into DataFrames, union them, and, using the window function row_number similarly to what we did in the stage, select the most recent version of each record to be saved in the new snapshot. Thus, if a particular id is already in the table, it will be replaced by the record coming from the increment if that one has a higher version:
# Read the deduplicated increment prepared by the stage job.
stage_df = (
    spark.read
    .format('parquet')
    .schema(input_schema)
    .option('path', input_path)
    .load()
)

# The current snapshot of the target table.
target_df = spark.table(table_name)

# For each id, keep the record with the highest version across both datasets.
w = Window.partitionBy('id').orderBy(desc('version'))

result_df = (
    stage_df.unionByName(target_df)
    .withColumn('r', row_number().over(w))
    .filter(col('r') == 1)
    .drop('r')
)
This will not work, however, if the schemas of stage_df and target_df differ, so we need to apply a schema evolution step yet again.
Schema Evolution — target layer
Similarly to how we evolved the schema in the stage, we need to do it in the target as well. To merge the increment into the target table successfully, both need to have the same schema. This won’t be the case, however, if new fields were added to the increment.
We control whether these new fields are added to the table using a schema that we call the input_schema. This input_schema is also versioned, and if we want to promote new fields to the table, we create a new version of the input_schema that contains them. It is a manual step, which seems reasonable considering that someone should confirm that the new fields can be exposed to the data users anyway.
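Conceptually, applying the input_schema means keeping exactly the versioned fields: anything not yet promoted is dropped, and newly promoted fields that are missing from older data are added as NULLs. A minimal, flat-schema sketch of that idea (not our actual framework code):

from pyspark.sql.functions import col, lit

def conform_to_input_schema(df, input_schema):
    """Select exactly the versioned fields, adding any missing ones as NULL columns."""
    existing = set(df.columns)
    projected = [
        col(f.name) if f.name in existing
        else lit(None).cast(f.dataType).alias(f.name)
        for f in input_schema.fields
    ]
    return df.select(projected)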
Next, we also need to change the schema of the table to be the same as the schema of the increment. Spark SQL provides a way to add a new column to a table using ALTER TABLE table_name ADD COLUMNS, but it doesn’t provide a way to add nested fields.
Thus, we implemented this functionality in our framework ourselves. In this situation, the target job creates an empty table with the new schema at a temporary location. After that, we point the empty table to the location of the original table where the data sits, drop the original table, and rename the new one. Thus, we obtain a table with the same data but a modified schema:
# Create an empty table with the new schema at a temporary location.
(
    spark.createDataFrame([], new_schema)
    .write
    .mode('overwrite')
    .option('path', table_location_temp)
    .saveAsTable(table_name_temp)
)

# Point the new table to the data of the original table and register its partitions.
spark.sql(f"ALTER TABLE {table_name_temp} SET LOCATION '{table_location}'")
spark.sql(f"MSCK REPAIR TABLE {table_name_temp}")

# Replace the original table with the new one.
spark.sql(f"DROP TABLE {table_name}")
spark.sql(f"ALTER TABLE {table_name_temp} RENAME TO {table_name}")
Atomic Writes
One of the big challenges from the beginning of developing our Data Lake was having atomic writes in place. The problem is that simply overwriting a table using…
(
    df.write
    .mode('overwrite')
    .format('parquet')
    .option('path', output_path)
    .saveAsTable(table_name)
)
…is not atomic. If the Spark job fails for any reason during the write, the table may stop existing or the S3 prefix may end up containing partially written data. In our framework, we implemented atomicity by always saving the table to a different location under a new name, table_name_tmp, and swapping the table names only after the write succeeds, so the new snapshot becomes available to the users only after it is fully written. If the job fails, we never swap the names but instead start the process from the beginning in a new location:
import posixpath
import time

# Each snapshot is written to a new, timestamped location.
output_path = posixpath.join(output_path, str(int(time.time())))

# Write the new snapshot under a temporary table name.
(
    df.write
    .mode('overwrite')
    .format('parquet')
    .option('path', output_path)
    .saveAsTable(table_name_tmp)
)

# Swap the names only after the write has succeeded.
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
spark.sql(f"ALTER TABLE {table_name_tmp} RENAME TO {table_name}")
The path where we save the data contains the timestamp of the moment of the write, and we always keep the last couple of snapshots, which allows for so-called time travel: if we realize that the newly created table is corrupted for some reason (due to some bug), we can point it back to any of the previous snapshots.
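Rolling back then amounts to repointing the table to an older snapshot location. A minimal sketch, where previous_snapshot_path is a placeholder for one of the retained snapshot paths:

# Point the table back to a previously written snapshot and refresh its partition metadata.
spark.sql(f"ALTER TABLE {table_name} SET LOCATION '{previous_snapshot_path}'")
spark.sql(f"MSCK REPAIR TABLE {table_name}")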
Data Quality
The consistency and quality of the data in the target tables are crucial since the tables are directly exposed to the data users. We achieve this by checking the tables after each update to make sure the data is not corrupted due to some bug in the code, a corrupted export from the original database, or some other problem that could have happened along the way as the data passed through the transformation steps before reaching the final table.
We use the data quality framework Great Expectations, which can be integrated with Spark. For each dataset, we define a set of expectations that are validated after the new snapshot of the table is created. If some crucial expectations fail, we can time-travel the table back to the previous snapshot and investigate the failed expectations in detail.
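Purely as an illustration, a few expectations validated through the legacy SparkDFDataset API might look like the sketch below; the exact API differs between Great Expectations versions, and this is not our production setup:

from great_expectations.dataset import SparkDFDataset

# Wrap the freshly written snapshot in a Great Expectations dataset.
ge_df = SparkDFDataset(spark.table(table_name))

# A few basic expectations on the crucial fields.
ge_df.expect_column_values_to_not_be_null('id')
ge_df.expect_column_values_to_be_unique('id')
ge_df.expect_column_values_to_not_be_null('version')

# Validate all registered expectations at once.
results = ge_df.validate()
if not results['success']:
    # In our pipeline, a failure like this would trigger a time travel back to the previous snapshot.
    raise ValueError('Data quality validation failed')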
Mart
Even though accessing the data in the target tables is quite efficient, some of the tables are not very user-friendly. This is natural because, as mentioned before, we keep the structure of the data in the target tables the same as in the original databases, where the data is not necessarily stored for analytical purposes. This relates mostly to columns inside nested data structures such as arrays or structs. Sometimes it is too cumbersome for people using SQL to transform these fields in their queries, so for this purpose we create additional tables derived from the target tables in which the data structure is transformed so it can be queried more easily. Sometimes these tables are also joined with other tables; they are simply tailored for a specific group of users. These derived tables are saved in the mart layer of the lake. Another purpose of the mart is to hold tables with aggregations and results computed by the analysts.
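As a simplified example of such a transformation, a mart job might flatten an array of structs from a target table into one row per element; the table and column names here are hypothetical:

from pyspark.sql.functions import col, explode

# Hypothetical target table with an array-of-structs column called 'comments'.
posts_df = spark.table('target.posts')

# One row per comment, with the nested fields promoted to top-level columns.
comments_flat_df = (
    posts_df
    .select('id', 'profile_id', explode('comments').alias('comment'))
    .select(
        'id',
        'profile_id',
        col('comment.author_id').alias('comment_author_id'),
        col('comment.created_time').alias('comment_created_time'),
    )
)

comments_flat_df.write.mode('overwrite').format('parquet').saveAsTable('mart.post_comments')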
Environment, Jobs, Orchestration
All the processing in the stage, target, and mart layers is implemented in PySpark, with some custom logic for handling schema evolution implemented in Python. The code runs on the Databricks platform, and for each datasource we have one job in the stage layer and one job in the target layer; for some datasources there are also jobs in the mart layer. All the jobs are orchestrated using Apache Airflow, which allows us to define the dependencies between them.
The core logic for deduplication, schema evolution, atomic data saving, and so on is implemented in classes and modules and packaged as a Python wheel that is imported into the jobs, so the code is reused following software engineering best practices.
All steps in the lake are fairly automated, with just a few manual steps related to ingesting a new datasource. In that situation, we add some configuration parameters to a config file and add the schema of the new datasource to the schema registry. The git pipeline creates the jobs in Databricks, and they are launched by Airflow according to a predefined schedule. The other manual steps are related to monitoring and approving new fields in the incoming data and possibly debugging changed datatypes that cannot be safely cast to the versioned ones.
As mentioned before, the data users can access the target and mart tables using either SQL or the DataFrame API in Spark, which integrates easily with Pandas and consequently with the entire ecosystem of Python libraries for data science and machine learning. Analysts who prefer SQL can use Querybook, a user-friendly notebook interface connected to the Presto engine, while scientists who prefer Spark can access the tables in the notebook environment in Databricks.
Acknowledgment
Building and maintaining the data lake has been a collaborative effort of the entire data engineering team. I would like to thank all my colleagues not only for the beneficial and productive cooperation on the Data Lake project and the many interesting thoughts and ideas we came up with during the last four years, but also for their review of and helpful comments on this article.
Conclusion
Having a robust and reliable data lake seems a necessity for a company that deals with big data. Traditional data warehousing concepts are no longer sufficient because the data volume is huge, comes from different databases and systems, and has complex structures. There are, however, several challenges that one has to deal with when implementing such a storage system.
ACID transactions that we take for granted in a relational database are no longer a given, and one has to deal with low-level concepts such as file organization in the storage system. With some additional effort, the ACID properties can be brought back; we achieved that for atomicity. An alternative to this approach is to use a more advanced table format such as Delta or Iceberg that provides ACID guarantees.
The changing schema of the data, whether because of some error during the export or because the entities really change, is quite a problem that may cause data loss if not treated with caution. Internally, we developed a tool that we are considering open-sourcing in the future. It allows us to version schemas for all datasources so we can see the history of how their schemas evolved, to compare the schemas, to perform various actions with them, and to handle the schema evolution painlessly.
Maintaining the data lake requires continuous discussion with the data users, who are usually analysts, researchers, and data scientists. Knowing what kind of queries are executed against a table allows us to optimize it heavily using a custom layout for the files in terms of partitioning, bucketing, and even sorting.