Mike Shakhomirov


Abstract

'</span>, event_params.value.string_value, <span class="hljs-keyword">NULL</span>) <span class="hljs-keyword">as</span> use_extra_steps_virtual_currency_name

<span class="hljs-keyword">FROM</span> firebase<span class="hljs-operator">-</span>public<span class="hljs-operator">-</span>project.analytics_153293282.events_<span class="hljs-operator">*</span> , <span class="hljs-built_in">UNNEST</span>(event_params) <span class="hljs-keyword">AS</span> event_params , <span class="hljs-built_in">UNNEST</span>(user_properties) <span class="hljs-keyword">AS</span> user_properties <span class="hljs-keyword">WHERE</span> _TABLE_SUFFIX <span class="hljs-operator">>=</span> <span class="hljs-string">'20181003'</span> <span class="hljs-keyword">and</span> _TABLE_SUFFIX <span class="hljs-operator"><=</span> <span class="hljs-string">'20181003'</span> <span class="hljs-keyword">and</span> event_name <span class="hljs-keyword">in</span> (<span class="hljs-string">'use_extra_steps'</span>, <span class="hljs-string">'completed_5_levels'</span>) ) <span class="hljs-keyword">select</span> d.<span class="hljs-operator">*</span> ,e.event_category <span class="hljs-keyword">from</span> data d <span class="hljs-keyword">join</span> event_category e <span class="hljs-keyword">on</span> e.event_name <span class="hljs-operator">=</span> d.event_name <span class="hljs-keyword">order</span> <span class="hljs-keyword">by</span> user_pseudo_id , event_name , event_timestamp ;

<span class="hljs-keyword">select</span> <span class="hljs-operator"></span> <span class="hljs-keyword">from</span> your<span class="hljs-operator">-</span>project.analytics.ml_data_20181003 <span class="hljs-keyword">where</span> event_category <span class="hljs-operator">=</span> <span class="hljs-number">1</span> ; <span class="hljs-keyword">select</span> <span class="hljs-operator"></span> <span class="hljs-keyword">from</span> your<span class="hljs-operator">-</span>project.analytics.ml_data_20181003 <span class="hljs-keyword">where</span> event_category <span class="hljs-operator">=</span> <span class="hljs-number">2</span> ;</pre></div><p id="771d">In the query results, you will see that we can use <code><b>event_category</b></code> <b>as a partition</b> to <b>avoid</b> a full table scan in the future.</p><blockquote id="c1ec"><p>We processed raw data just once and now can create an externally partitioned data lake bucket with a Hive partition layout</p></blockquote><figure id="405c"><img src="https://cdn-images-1.readmedium.com/v2/resize:fit:800/1*OsLP5qT987-OckrAotahtg.png"><figcaption>Image by author</figcaption></figure><p id="c6ff">We can <b>loop through</b> each wildcard table and each <code>event_category</code> partition to export the data if needed.</p><p id="8de2">We know that certain operations allow <b>suffixing</b> of the <b>table ID</b> with a <b>partition decorator</b>, such as <code><b>sample_table20190123</b></code>. So in our case, it will be:</p><div id="7fdc"><pre>bq <span class="hljs-built_in">head</span> --max_rows=10 <span class="hljs-string">'your-project:analytics.ml_data_201810021'</span></pre></div><p id="4c6b">We can use it to export data to the data lake with <code>category</code> partition, i.e. 
<code>gs://firebase-events-archive-avro/dt=2018–10–03/category=1/partitionKey/events_.avro</code></p><p id="688c" type="7">I’ll explain how to do it in the next step.</p><h2 id="f776">What is a Hive partitioning layout?</h2><p id="772d">It is just a way to format object names in the data lake.</p><p id="2406">Should we choose to use <b><i>externally partitioned data</i></b> later, we would want to store it in cloud storage using the <b>default Hive partitioning layout</b>.</p><p id="2572">In this case, we can create externally partitioned tables on <b>Avro, CSV, JSON, ORC, and Parquet</b> files and</p><blockquote id="9a9c"><p>use data lake as a source layer for Hadoop and EMR tools.</p></blockquote><p id="802f">Example:</p><div id="828b"><pre>gs://events-export-avro/public-project/avro_external_test/dt=2018-10-01/lang=en/partitionKey gs://events-export-avro/public-project/avro_external_test/dt=2018-10-02/lang=fr/partitionKey</pre></div><h2 id="9596">How to choose the right Big Data file format?</h2><p id="083c" type="7">Avro, CSV, JSON, ORC or Parquet?</p><p id="4f53">It could be difficult to determine which format would be superior to the other because each offers advantages and different forms of compression.</p><p id="4dc4">When we need a <b>better compression</b> ratio, then <b>ORC</b> or <b>Parquet</b> would suit us better. It actually depends on which tool we are going to use to run analytical queries on our data. <b>ORC</b> is better optimized for <b>HIVE</b> and <b>Pig</b> framework workloads, whereas <b>Parquet</b> is a default file format for <b>Spark</b>.</p><p id="45f7">I previously wrote about it here:</p><div id="9435" class="link-block"> <a href="https://mydataschool.com/blog/external-tables-and-data-file-formats/"> <div> <div> <h2>Data Warehouse, Big Data Export and External Tables</h2> <div><h3>It's never just a data warehouse Yet another way to improve our data solution. 
If your data platform architecture…</h3></div> <div><p>mydataschool.com</p></div> </div> <div> <div style="background-image: url(https://miro.readmedium.com/v2/resize:fit:320/0*766L1bsIXpchea6m)"></div> </div> </div> </a> </div><p id="5796">When all fields must be accessible, row-based storage makes <b>AVRO</b> a better option. It proved to be very fast with <i>write-intensive</i> queries and has advanced schema evolution support. Therefore, it might be a better choice for the <b>landing area and data loading</b>.</p><h2 id="832a">Step 2. Export data to Cloud Storage</h2><p id="3690">In many modern data warehouse solutions, there is a feature to export data to storage using <b>SQL</b>. So, in theory we could do something like that using BigQuery and a public Firebase project:</p><div id="61a0"><pre>EXPORT DATA OPTIONS ( uri <span class="hljs-operator">=</span> <span class="hljs-string">'gs://firebase-events-export/public-project/dt=2018-10-01/partitionKey/.json'</span>, format <span class="hljs-operator">=</span> <span class="hljs-string">'JSON'</span>, overwrite <span class="hljs-operator">=</span> <span class="hljs-literal">true</span> ) <span class="hljs-keyword">AS</span> ( <span class="hljs-keyword">SELECT</span> <span class="hljs-operator">*</span> <span class="hljs-keyword">FROM</span> firebase<span class="hljs-operator">-</span>public<span class="hljs-operator">-</span>project.analytics_153293282.events_20181001 );</pre></div><p id="0aaf"><code>uri</code> option defines the output storage layout, i.e. 
<code>uri = 'gs://firebase-events-export/public-project/dt=2018–10–01/partitionKey/*.avro'</code></p><p id="d2a8">There is one thing to consider, though… When we use SQL and <code><b>SELECT * …</b></code> it will <b>do a full scan of that table</b>.</p><p id="8eff" type="7">So the export is not entirely free in this case.</p><p id="5409">In fact, it is a common misconception, i.e., in BigQuery documentation, it is free, but we will have to pay for the query we use in data export operation.</p><h2 id="beae">How to export data from BigQuery for free</h2><p id="506a">We can use a <b>shared pool</b> to export data from <b>BigQuery</b> dataset to Cloud Storage for free. Let’s do some coding with <b>Python</b>. I’ll put a link about the shared pool and data export at the bottom of this article.</p><p id="fa2b">We would want to create a simple microservice that works in a Directed Acyclic Graph (DAG) and maybe schedule it to export data after 60 days.</p><blockquote id="d264"><p>What is DAG?</p></blockquote><p id="73d3"><i>There are lots of clever mathematical words behind this term but in data engineering, we mean a data pipeline with some actions triggered by some events or outcomes of other actions.</i></p><p id="3671">Our app folder would look like that:</p><div id="2bd4"><pre>. ├── stack └── bq_extractor ├── app.py ├── bq_extractor_env ├── event.json └── requirements.txt</pre></div><p id="a747">Let’s create a virtual environment with all the required libraries we are going to use. 
Our <code><b>requirements.txt</b></code> will have these Python libraries installed:</p><div id="d1e2"><pre><span class="hljs-attr">google-auth</span>==<span class="hljs-number">2.15</span>.<span class="hljs-number">0</span> <span class="hljs-attr">google-cloud-bigquery</span>==<span class="hljs-number">3.4</span>.<span class="hljs-number">0</span> <span class="hljs-attr">requests</span>==<span class="hljs-number">2.28</span>.<span class="hljs-number">1</span> <span class="hljs-attr">pyyaml</span>==<span class="hljs-number">6.0</span> <span class="hljs-attr">python-lambda-local</span>==<span class="hljs-number">0.1</span>.<span class="hljs-number">13</span></pre></div><p id="a6f2">Let’s install them.</p><div id="57fa"><pre><span class="hljs-built_in">cd</span> stack <span class="hljs-built_in">cd</span> bq_extractor virtualenv bq_extractor_env <span class="hljs-built_in">source</span> bq_extractor_env/bin/activate pip install -r requirements.txt</pre></div><p id="017b">Now let’s create our microservice. I’ve quickly scribbled this snippet below for this article. Feel free to change the code according to your needs. It will use <code><b>google</b></code> libraries to authenticate BigQuery client and run the <code><b>export</b></code> job.</p><div id="c2bf"><pre><span class="hljs-comment"># https://googleapis.dev/python/bigquery/latest/index.html</span> <span class="hljs-keyword">import</span> json <span class="hljs-keyword">import</span> requests <span class="hljs-keyword">from</span> datetime <span class="hljs-keyword">import</span> datetime, date, timedelta <span class="hljs-keyword">from</span> google.api_core <span class="hljs-keyword">import</span> retry <span class="hljs-keyword">from</span> google.cloud <span class="hljs-keyword">import</span> bigquery <span class="hljs-keyword">from</span> google.oauth2 <span class="hljs-keyword">import</span> service_account

<span class="hljs-comment"># Test your service locally by running</span> <span class="hljs-comment"># python-lambda-local -f lambda_handler -t 10 app.py event.json</span> <span class="hljs-comment"># It should be able to do a request</span> response = requests.get(<span class="hljs-string">'https://api.github.com'</span>) <span class="hljs-built_in">print</span>(response)

<span class="hljs-comment"># Paste your JSON service account credentials here:</span> service_acount_str = { <span class="hljs-string">"type"</span>: <span class="hljs-string">"service_account"</span>, <span class="hljs-string">"project_id"</span>: <span class="hljs-string">"your-project"</span>, <span class="hljs-string">"private_key_id"</span>: <span class="hljs-string">""</span>, <span class="hljs-string">"private_key"</span>: <span class="hljs-string">"-----BEGIN PRIVATE KEY----...\n-----END PRIVATE KEY-----\n"</span>, <span class="hljs-string">"client_email"</span>: <span class="hljs-string">"[email protected]"</span>, <span class="hljs-string">"client_id"</span>: <span class="hljs-string">"123"</span>, <span class="hljs-string">"auth_uri"</span>: <span class="hljs-string">"https://accounts.google.com/o/oauth2/auth"</span>, <span class="hljs-string">"token_uri"</span>: <span class="hljs-string">"https://oauth2.googleapis.com/token"</span>, <span class="hljs-string">"auth_provider_x509_cert_url"</span>: <span class="hljs-string">"https://www.googleapis.com/oauth2/v1/certs"</span>, <span class="hljs-string">"client_x509_cert_url"</span>: <span class="hljs-string">"https://www.googleapis.com/robot/v1/metadata/x509/bigquery-adminsdk%40client.iam.gserviceaccount.com"</span> }

credentials = service_account.Credentials.from_service_account_info(service_acount_str) <span class="hljs-comment"># ? https://googleapis.dev/python/google-api-core/latest/auth.html#overview</span> <span class="hljs-built_in">print</span>(credentials.project_id)

<span class="hljs-comment"># Simple function to check connectivity:</span> <span class="hljs-keyword">def</span> <span class="hljs-title function_">bigquery_hello</span>(<span class="hljs-params">txt</span>): client = bigquery.Client(credentials=credentials, project=credentials.project_id) QUERY = (<span class="hljs-string">'SELECT "{} nice to meet you";'</span>.<span class="hljs-built_in">format</span>(txt)) query_job = client.query(QUERY) <span class="hljs-comment"># API request</span> rows = query_job.result() <span class="hljs-comment"># Waits for query to finish</span> greet = <span class="hljs-built_in">list</span>(rows)[<span class="hljs-number">0</span>][<span class="hljs-number">0</span>] <span class="hljs-keyword">return</span> greet

<span class="hljs-comment"># Main helper function</span> <span class="hljs-keyword">def</span> <span class="hljs-title function_">export_table_to_storage</span>(<span class="hljs-params">table_name, bucket_partition</span>): <span class="hljs-comment"># Connect to BigQuery to run jobs programmatically</span> client = bigquery.Client(credentials=credentials, project=credentials.project_id)

<span class="hljs-comment"># Public project source and test staging bucket</span>
project = <span class="hljs-string">'firebase-public-project'</span>
dataset_id = <span class="hljs-string">'analytics_153293282'</span>
bucket_name = <span class="hljs-string">'firebase-events-archive-avro'</span>

destination_uri = <span class="hljs-string">"gs://{}/{}/partitionKey/events_*.avro"</span>.<span class="hljs-built_in">format</span>(bucket_name, bucket_partition)
dataset_ref = bigquery.DatasetReference(project, dataset_id)
table_ref = dataset_ref.table(table_name)
job_config = bigquery.job.ExtractJobConfig()
<span class="hljs-comment"># job_config.compression = bigquery.Compression.GZIP</span>
<span class="hljs-comment"># https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.ExtractJobConfig</span>
job_config.destination_format = bigquery.DestinationFormat.AVRO
job_config.compression = bigquery.Compression.SNAPPY

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    <span class="hljs-comment"># Location must match that of the source table.</span>
    location=<span class="hljs-string">"US"</span>,
    job_config=job_config,
)  <span class="hljs-comment"># API request</span>
<span class="hljs-comment"># extract_job.result()  # Waits for job to complete. Calling client.extract_table starts the job. No need to wait to finish</span>
<span class="hljs-built_in">print</span>(<span class="hljs-string">"Export table to {}"</span>.<span class="hljs-built_in">format</span>(destination_uri))

<span class="hljs-keyword">def</span> <span class="hljs-title function_">lambda_handler</span>(<span class="hljs-params">event, context</span>): <span class="hljs-built_in">print</span>(event)

start_date = date(<span class="hljs-number">2018</span>,<span class="hljs-number">9</span>,<span class="hljs-number">1</span>)
end_date = date(<span class="hljs-number">2018</span>,<span class="hljs-number">9</span>,<span class="hljs-number">3</span>)

dates= [start_date+timedelta(days=x) <span class="hljs-keyword">for</span> x <span class="hljs-keyword">in</span> <span class="hljs-built_in">range</span>((end_date-start_date).days)]
<span class="hljs-keyword">for</span> dt <span class="hljs-keyword">in</span> dates:
    table_name = dt.strftime(<span class="hljs-string">'events_%Y%m%d'</span>)
    partition_name = dt.strftime(<span class="hljs-string">'dt=%Y-%m-%d'</span>)
    export_table_to_storage(table_name, partition_name)

bigquery_message = bigquery_hello(<span class="hljs-string">'it is '</span>)

message = <span class="hljs-string">'Hello {} {}, {}!'</span>.<span class="hljs-built_in">format</span>(event[<span class="hljs-string">'first_name'</span>], event[<span class="hljs-string">'last_name'</span>], bigquery_message)  
<span class="hljs-keyword">return</span> { 
    <span class="hljs-string">'message'</span> : message
}</pre></div><p id="dd4f">Our service will connect to BigQuery to run jobs programmatically including <code><b>`extract`</b></code>.</p><p id="3884">Let’s create our Cloud Storage bucket first. We can do it with a web console or using command line tools. If we have <code><b>`gsutil`</b></code> installed run this in the command line:</p><div id="f124"><pre>gsutil mb -c archive -l US-CENTRAL1 -p your-project-name gs://firebase-events-archive-avro</pre></div><p id="4bb4">Now let’s run our microservice.</p><p id="59ae">When <code><b>app.py</b></code> is ready, you can test it locally:</p><div id="82c2"><pre><span class="hljs-comment"># Test your service locally by running in command line</span>

python-lambda-<span class="hljs-keyword">local</span> -f lambda_handler -t <span class="hljs-number">10</span> app.py event.json</pre></div><figure id="e971"><img src="https://cdn-images-1.readmedium.com/v2/resize:fit:800/1*-xNXos_lRjtAYG-ZH7k66w.png"><figcaption>Image by author</figcaption></figure><p id="a8f1">Let’s list our bucket to see if data is there:</p><div id="b687"><pre>gsutil <span class="hljs-built_in">ls</span> gs://firebase-events-archive-avro/</pre></div><figure id="27de"><img src="https://cdn-images-1.readmedium.com/v2/resize:fit:800/1*AkXpdhay9rGzJyRXOnWs4A.png"><figcaption>Image by author</figcaption></figure><h2 id="01b6">Use this bash script to get all directory sizes in the bucket</h2><p id="1706">For example, we might want to validate that the export operation actually worked.</p><div id="9ca8"><pre>gsutil <span class="hljs-built_in">ls</span> -l gs://firebase-events-archive-avro/ | xargs -I{} gsutil <span class="hljs-built_in">du</span> -sh {}</pre></div><figure id="e805"><img src="https://cdn-images-1.readmedium.com/v2/resize:fit:800/1*yYP-KMBS2OY3KZ4HiBkSXA.png"><figcaption>Image by author</figcaption></figure><h2 id="365f">How to add an extra bucket partition with the Hive layout?</h2><p id="a6bd">In case we need to export data with an extra <code><b>category</b></code> <b>partition key,</b> we would want to use something like this:</p><div id="bf00"><pre> category_number = <span class="hljs-number">1</span> <span class="hljs-keyword">for</span> dt <span class="hljs-keyword">in</span> dates: table_name = dt.strftime(<span class="hljs-string">'ml_data_%Y%m%d$1'</span>) partition_name = dt.strftime(<span class="hljs-string">'dt=%Y-%m-%d'</span>) category = <span class="hljs-string">"category={}"</span>.<span class="hljs-built_in">format</span>(category_number) export_table_to_storage(table_name, partition_name, category)</pre></div><p id="4495">We can simply <b>loop through</b> <i>all dates</i> and <i>all categories</i> to create a data 
lake output like this:</p><p id="7f00"><code>gs://firebase-events-archive-avro/dt=2018–10-03/category=1/partitionKey/events_*.avro</code></p><p id="187a">The output would be:</p><div id="7f87"><pre><span class="hljs-string">Export</span> <span class="hljs-string">table</span> <span class="hljs-string">to</span> <span class="hljs-string">gs://firebase-events-archive-avro/dt=2018-10-03/category=1/partitionKey/events_.avro</span> [<span class="hljs-string">root</span> <span class="hljs-bullet">-</span> <span class="hljs-string">INFO</span> <span class="hljs-bullet">-</span> <span class="hljs-number">2023-02-01 15:30:22</span>,<span class="hljs-number">716</span>] <span class="hljs-attr">END RequestId:</span> <span class="hljs-string">ac865a0b-05dd-4ef3-9b5f-9bfe7f0ce5b7</span> [<span class="hljs-string">root</span> <span class="hljs-bullet">-</span> <span class="hljs-string">INFO</span> <span class="hljs-bullet">-</span> <span class="hljs-number">2023-02-01 15:30:22</span>,<span class="hljs-number">717</span>] <span class="hljs-attr">REPORT RequestId: ac865a0b-05dd-4ef3-9b5f-9bfe7f0ce5b7 Duration:</span> <span class="hljs-number">2284.26 </span><span class="hljs-string">ms</span></pre></div><p id="a364">Let’s validate the numbers in case we choose to load it back:</p><div id="7c57"><pre><span class="hljs-keyword">select</span> <span class="hljs-built_in">count</span>(<span class="hljs-operator"></span>) <span class="hljs-keyword">from</span> your<span class="hljs-operator">-</span>project.analytics.ml_data_20181003 <span class="hljs-keyword">where</span> event_category <span class="hljs-operator">=</span> <span class="hljs-number">1</span> ;

LOAD DATA <span class="hljs-keyword">INTO</span> your<span class="hljs-operator">-</span>project.source.ml_data_20181003_1 <span class="hljs-keyword">FROM</span> FILES( format<span class="hljs-operator">=</span><span class="hljs-string">'AVRO'</span>, uris <span class="hljs-operator">=</span> [<span class="hljs-string">'gs://firebase-events-archive-avro/dt=2018-10-03/category=1/*'</span>] ) ;

<span class="hljs-keyword">select</span> <span class="hljs-built_in">count</span>(<span class="hljs-operator">*</span>) <span class="hljs-keyword">from</span> your<span class="hljs-operator">-</span>project.source.ml_data_20181003_1 ;</pre></div><h2 id="8250">Conclusion</h2><p id="e789">I hope this story will be useful for you. It’s a real-life data engineering scenario where we need to <b>prepare raw event data</b> and <b>pass it to a machine learning service</b> further down the pipeline. <b>Data modeling</b> is one of the essential skills in data engineering. This article tells how we apply it to optimize dataset schemas, partitions, and storage when data is no longer needed.</p><blockquote id="b757"><p>If there is a way to do it for free, then why not?</p></blockquote><p id="874b">We created a simple microservice with AWS Lambda to export the data, but there is so much more we can do with it. We can connect it to the API gateway, create another web service to orchestrate pipelines (i.e., DataHub), use other events as triggers, etc.</p><p id="fe8b">After the required DML transformations on our event data, it is stored in the data lake where other ML services can access it and process it in a more efficient and scalable way to train machine learning models.</p><h2 id="6275">Repository</h2><p id="da81"><a href="https://github.com/mshakhomirov/bigquery_extractor">https://github.com/mshakhomirov/bigquery_extractor</a></p><h2 id="a4c1">Recommended read</h2><ol><li><a href="https://cloud.google.com/bigquery/docs/managing-partitioned-table-data">https://cloud.google.com/bigquery/docs/managing-partitioned-table-data</a></li><li><a href="https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client">https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client</a></li><li><a 
href="https://cloud.google.com/bigquery/docs/external-data-cloud-storage">https://cloud.google.com/bigquery/docs/external-data-cloud-storage</a></li><li><a href="https://cloud.google.com/bigquery/docs/hive-partitioned-queries">https://cloud.google.com/bigquery/docs/hive-partitioned-queries</a></li><li><a href="https://cloud.google.com/bigquery/docs/exporting-data">https://cloud.google.com/bigquery/docs/exporting-data</a></li><li><a href="https://cloud.google.com/bigquery/quotas#export_jobs">https://cloud.google.com/bigquery/quotas#export_jobs</a></li></ol></article></body>

Supercharge Your Data Engineering Skills with This Machine Learning Pipeline

Data modeling, Python, DAGs, Big Data file formats, costs… It covers everything

Photo by Peter Olexa on Unsplash

This is a real-life scenario in which I was tasked with creating a highly scalable machine learning pipeline from raw event data sent by a mobile application.

The story offers a set of advanced techniques that might be useful for interview preparation.

Learn how to work with raw data, transform it, enrich it to prepare for machine learning, export it to the data lake, and archive the raw data when it is no longer needed.

Everything featured in this story assumes you have a Google Cloud Platform (GCP) account and you are familiar with basic Python and data warehousing concepts.

If not, don’t worry. I’ll try to explain it in detail.

Data Pipeline

Data pipelines are not always straightforward, and I wrote about it before.

This is how it works in real life.

Let’s imagine we have a huge amount of raw event data (Big Data) coming from our mobile application. The application itself is built for iOS and Android, and we connect it to Google Firebase (Google Analytics 4) to gather user engagement data.

Now we want to use that dataset to activate user behavior or predictions, e.g., user churn, personas, notifications, etc. At some point, I had to unload about 150 TB of data to a Cloud Storage archive to optimize BigQuery storage costs. We would also like to transform the raw event data and create a dataset for the machine learning (ML) pipeline.

Image by author

Data will flow as described below:

  • Mobile app sends event data to Firebase
  • Firebase outputs data into the BigQuery dataset
  • We QA and transform the data with SQL to create a new dataset for ML
  • We export the ML dataset to the Cloud Storage bucket (standard class)
  • We archive raw event data in Cloud Storage (archive type)
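The flow above is a small dependency graph, so it can be sketched with Python's standard `graphlib` module. The step names below are hypothetical labels for the bullets, not names from any real service, and the assumption that archiving waits for the transformation step follows from the raw data being used once before it is archived.

```python
from graphlib import TopologicalSorter

# Hypothetical step names; each step maps to the set of steps it depends on.
PIPELINE = {
    "firebase_collect": set(),
    "bigquery_export": {"firebase_collect"},
    "transform_ml_dataset": {"bigquery_export"},
    "export_ml_to_storage": {"transform_ml_dataset"},
    "archive_raw_events": {"transform_ml_dataset"},
}


def execution_order(pipeline):
    """Return the steps in an order that respects every dependency."""
    return list(TopologicalSorter(pipeline).static_order())


order = execution_order(PIPELINE)
print(order)
```

The last two steps have no dependency on each other, so an orchestrator would be free to run them in either order or in parallel.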

Sometimes data pipelines are a bit unconventional, like in this case. A typical data pipeline would start in the data lake.

Data lakes are cheaper to run compared to data warehouse solutions

However, this particular pipeline starts with Firebase event export to BigQuery. It’s a natural data integration that exists in the GCP ecosystem. No coding knowledge is required, and we can connect it with no problem at all.

Why export data to archive or cloud storage?

Until a certain point, we never thought about data exports from the data warehouse per se. Storage there is already optimized: after 90 days without modification, tables and partitions move to long-term storage (similar to a near-line storage class), which is about 50% cheaper than standard (or active storage in BigQuery terms).

However, these `events_` wildcard tables are very heavy, and over a couple of years, they might grow into a huge dataset with petabytes of data.

Here’s a BigQuery long-term storage cost example to consider. Even though the storage class changes after 90 days, there is still potential for cost optimization:

Image by author
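To make the comparison concrete, here is a rough back-of-the-envelope sketch. The per-GB prices are illustrative assumptions rather than quoted rates (check the current pricing pages), and retrieval or early-deletion charges for archive storage are ignored. Only the 150 TB figure and the "50% cheaper" ratio come from this story.

```python
# Illustrative prices in USD per GB per month. These are assumptions,
# not quoted rates; verify them against the current pricing pages.
ACTIVE_PRICE = 0.02
LONG_TERM_PRICE = ACTIVE_PRICE * 0.5  # "50% cheaper" after 90 days
ARCHIVE_PRICE = 0.0012                # assumed archive-class price


def monthly_cost(size_tb, price_per_gb):
    """Monthly storage cost in USD for a dataset of size_tb terabytes."""
    return size_tb * 1024 * price_per_gb


SIZE_TB = 150  # the amount of raw data mentioned earlier in this story

for label, price in (
    ("BigQuery active", ACTIVE_PRICE),
    ("BigQuery long-term", LONG_TERM_PRICE),
    ("Cloud Storage archive", ARCHIVE_PRICE),
):
    print(f"{label:<22} ${monthly_cost(SIZE_TB, price):,.2f} per month")
```

Under these assumed prices, the archive class is still far cheaper than long-term storage, which is the gap the rest of this story tries to capture.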

Why export data for ML?

We would want to train a bespoke machine-learning model with Spark/PySpark. Of course, you can rightfully mention that BigQuery has its own built-in ML capabilities.

However, it might not be enough if we are on a flat rate pricing model and our model training application requires more compute power.

In this case, we would need something that scales well and can work with data lake data. Ideally, it has to be partitioned and have a certain partitioning layout, i.e., Hive. I wrote about how to add a Hive partitioning layout in this story:

Step 1. Create ML dataset from Firebase / Google Analytics 4 event data

We can use some publicly available Firebase data from `firebase-public-project`.

For example, Google has a sample dataset for a mobile game app called “Flood It!” (Android, iOS) and you can find it here: https://console.cloud.google.com/bigquery?p=firebase-public-project&d=analytics_153293282&t=events_20181003&page=table&_ga=2.124992394.-1293267939.1657258995

This dataset contains 5.7M events from over 15k users. Open the link above and click Preview. It won’t cost anything to run a Preview on any table:

Image by author

It looks fairly simple, and it is only 9.7 MB of data. We have app users identified by their device IDs (`user_pseudo_id`), and we have event parameters from the user engagement data.

One major requirement for machine learning data would be to have this dataset available externally in a data lake partitioned by `date` and `event_name_category`.

In this case, the data science team won’t need to load all raw event data to train the model and will be able to pull only the required event types.

We would want to create a new table using the DML statement and transformations we need. We will use the raw data only once to generate this dataset. After that, raw data go to the archive, and ML data will be transferred to the ML landing area in Cloud Storage.

During this operation, we are going to `unnest` the `event_params` and `user_properties` we need. Let’s take a look at this example for just one day of data and only two events, i.e. `event_name in ('use_extra_steps', 'completed_5_levels')`. The partitioning column must be a top-level field.

Unfortunately, we cannot use a leaf field from a RECORD (STRUCT) as the partitioning column, and a table can be partitioned by only one column, so `partition by (dt, event_category)` will not work.

However, we are working with just one day of data (a wildcard table), so we can simply partition it by `event_category` and export it to Storage, i.e.

gs://firebase-events-export/public-project/dt=2018-10-02/category=1/partitionKey

After that, we can create a custom script to loop through the categories and export data to the data lake with `category=1/partitionKey`.
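As a minimal sketch of how such a script might build the object prefix, the helper below assembles a Hive-style path from a date and a category number. The function name and its `root` parameter are hypothetical. The bucket and folder names are the ones used in the example path above.

```python
from datetime import date


def hive_partition_prefix(bucket, dt, category, root=""):
    """Build a Hive-style prefix: gs://bucket/root/dt=.../category=.../partitionKey."""
    parts = [
        root.strip("/"),
        f"dt={dt.isoformat()}",
        f"category={category}",
        "partitionKey",
    ]
    # Drop the root segment when it is empty.
    return f"gs://{bucket}/" + "/".join(p for p in parts if p)


prefix = hive_partition_prefix(
    "firebase-events-export", date(2018, 10, 2), 1, root="public-project"
)
print(prefix)
# gs://firebase-events-export/public-project/dt=2018-10-02/category=1/partitionKey
```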

Here is a sample script to do it. Feel free to add any nested parameters from events, etc. It can be scheduled daily to run just once and save a lot of money:

create table if not exists `your-project.analytics.ml_data_20181003` (
 
  dt                DATE 
, event_timestamp   TIMESTAMP
, user_id           STRING
, user_pseudo_id    STRING
, platform          STRING
, language          STRING
, country           STRING
, event_name        STRING
, use_extra_steps_virtual_currency_name        STRING
, plays_quickplay       STRING
, event_category        INT64
)
partition by range_bucket(event_category, generate_array(0, 10, 1))
cluster by user_id, user_pseudo_id
;

INSERT `your-project.analytics.ml_data_20181003`

with 
event_category as (
   select 
      1 as event_category
      ,'use_extra_steps' as event_name
   union all 

   select
      2 as event_category
      ,'completed_5_levels' as event_name
)

,data as (
SELECT
   PARSE_DATE('%Y%m%d', event_date) as dt
 , timestamp_micros(event_timestamp) as event_timestamp
 , user_id            
 , user_pseudo_id     
 , platform           
 , device.language    
 , geo.country        
 , event_name         
 , IF(user_properties.key = 'plays_quickplay', user_properties.value.string_value, NULL)                      as plays_quickplay
 , IF(event_params.key = 'virtual_currency_name', event_params.value.string_value, NULL)                      as use_extra_steps_virtual_currency_name


FROM `firebase-public-project.analytics_153293282.events_*`
     , UNNEST(event_params)    AS event_params
     , UNNEST(user_properties) AS user_properties
WHERE
   _TABLE_SUFFIX >= '20181003'
and _TABLE_SUFFIX <= '20181003'
and event_name in ('use_extra_steps', 'completed_5_levels')
)
select d.*  ,e.event_category
from data d
join event_category e on e.event_name = d.event_name
order by
    user_pseudo_id
    , event_name
    , event_timestamp
;

select * from `your-project.analytics.ml_data_20181003` where event_category = 1
;
select * from `your-project.analytics.ml_data_20181003` where event_category = 2
;

In the query results, you will see that we can use `event_category` as a partition to avoid a full table scan in the future.
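The `event_category` CTE in the script hard-codes two events. If the list of events grows, one option is to generate that CTE from a mapping. This is a minimal sketch: the helper name is hypothetical, and it assumes the event names come from trusted configuration, since they are interpolated straight into SQL.

```python
def event_category_cte(categories):
    """Render the event_category CTE from a {event_name: category_number} dict.

    Event names are assumed to be trusted; they are not escaped.
    """
    selects = [
        f"select {number} as event_category, '{name}' as event_name"
        for name, number in categories.items()
    ]
    body = "\n  union all\n  ".join(selects)
    return f"event_category as (\n  {body}\n)"


cte = event_category_cte({"use_extra_steps": 1, "completed_5_levels": 2})
print(cte)
```

The rendered text can then be placed after `with` in the `INSERT` statement in place of the hand-written CTE.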

We processed raw data just once and now can create an externally partitioned data lake bucket with a Hive partition layout

Image by author

We can loop through each wildcard table and each `event_category` partition to export the data if needed.

We know that certain operations allow suffixing of the table ID with a partition decorator, such as sample_table$20190123. So in our case, it will be:

bq head --max_rows=10 'your-project:analytics.ml_data_20181003$1'
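Looping through every daily table and every category partition could look like the sketch below. The generator name and the `ml_data_` default prefix are assumptions for illustration. It only builds the decorated table IDs and does not call BigQuery.

```python
from datetime import date, timedelta


def decorated_tables(start, end, categories, prefix="ml_data_"):
    """Yield (table_id, dt, category) for each day from start to end inclusive."""
    days = (end - start).days + 1
    for offset in range(days):
        dt = start + timedelta(days=offset)
        for category in categories:
            # "$<n>" is the partition decorator for the integer-range partition.
            yield f"{prefix}{dt:%Y%m%d}${category}", dt, category


tables = list(decorated_tables(date(2018, 10, 3), date(2018, 10, 4), [1, 2]))
for table_id, dt, category in tables:
    print(table_id, dt.isoformat(), category)
```

Each yielded table ID can be passed to an extract job together with a matching `dt=.../category=...` destination prefix.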

We can use it to export data to the data lake with `category` partition, i.e. gs://firebase-events-archive-avro/dt=2018-10-03/category=1/partitionKey/events_*.avro

I’ll explain how to do it in the next step.

What is a Hive partitioning layout?

It is just a way to format object names in the data lake.

Should we choose to use externally partitioned data later, we would want to store it in cloud storage using the default Hive partitioning layout.

In this case, we can create externally partitioned tables on Avro, CSV, JSON, ORC, and Parquet files and use the data lake as a source layer for Hadoop and EMR tools.

Example:

gs://events-export-avro/public-project/avro_external_test/dt=2018-10-01/lang=en/partitionKey
gs://events-export-avro/public-project/avro_external_test/dt=2018-10-02/lang=fr/partitionKey
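
The layout above is only a naming convention, so it can be built and parsed with plain string handling. Below is a minimal sketch, assuming the bucket and path from the example; the helper names `hive_object_prefix` and `parse_hive_partitions` are hypothetical and not part of any Google library.

```python
from urllib.parse import urlparse


def hive_object_prefix(bucket, base_path, partitions):
    """Build gs://bucket/base_path/key=value/... from ordered partition keys."""
    parts = [p.strip("/") for p in (bucket, base_path) if p]
    parts += ["{}={}".format(key, value) for key, value in partitions.items()]
    return "gs://" + "/".join(parts)


def parse_hive_partitions(uri):
    """Return the key=value path segments of an object URI as a dict."""
    result = {}
    for segment in urlparse(uri).path.split("/"):
        key, sep, value = segment.partition("=")
        if sep and key:
            result[key] = value
    return result


# Hypothetical usage with the example layout from this article
uri = hive_object_prefix(
    "events-export-avro",
    "public-project/avro_external_test",
    {"dt": "2018-10-01", "lang": "en"},
)
print(uri)
print(parse_hive_partitions(uri + "/partitionKey/events_000000000000.avro"))
```

The partition keys (`dt`, `lang`) come straight from the object name, which is why a query engine can skip whole prefixes without opening any files.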

How to choose the right Big Data file format?

Avro, CSV, JSON, ORC or Parquet?

It can be difficult to say which format is superior, because each offers different advantages and forms of compression.

When we need a better compression ratio, ORC or Parquet would suit us better. The choice actually depends on which tool we are going to use to run analytical queries on our data: ORC is better optimized for Hive and Pig workloads, whereas Parquet is the default file format for Spark.

When all fields must be accessible, row-based storage makes Avro a better option. It proved to be very fast with write-intensive operations and has advanced schema evolution support. Therefore, it might be a better choice for the landing area and data loading.

Step 2. Export data to Cloud Storage

Many modern data warehouse solutions have a feature to export data to storage using SQL. So, in theory, we could do something like this using BigQuery and a public Firebase project:

EXPORT DATA
  OPTIONS (
    uri = 'gs://firebase-events-export/public-project/dt=2018-10-01/partitionKey/*.json',
    format = 'JSON', 
    overwrite = true
)
AS (
SELECT *
FROM `firebase-public-project.analytics_153293282.events_20181001`
);

The `uri` option defines the output storage layout, i.e. `uri = 'gs://firebase-events-export/public-project/dt=2018-10-01/partitionKey/*.json'`

There is one thing to consider, though… When we use SQL and `SELECT * …` it will do a full scan of that table.

So the `export` is not entirely free in this case.

In fact, this is a common misconception: the BigQuery documentation says that exporting data is free, but with `EXPORT DATA` we still have to pay for the query used in the export operation.

How to export data from BigQuery for free

We can use the shared slot pool to export data from a BigQuery dataset to Cloud Storage for free. Let’s do some coding with Python. I’ll put a link about the shared pool and data export at the bottom of this article.

We would want to create a simple microservice that works in a Directed Acyclic Graph (DAG) and maybe schedule it to export data after 60 days.
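
To illustrate the "export after 60 days" idea, here is a minimal sketch that picks the daily tables old enough to be archived. It assumes table names follow the `events_YYYYMMDD` pattern used in this article; the function name `tables_due_for_export` and the `retention_days` parameter are hypothetical.

```python
from datetime import date, datetime, timedelta


def tables_due_for_export(table_names, today, retention_days=60, prefix="events_"):
    """Return daily tables whose date suffix is at least retention_days old."""
    cutoff = today - timedelta(days=retention_days)
    due = []
    for name in table_names:
        if not name.startswith(prefix):
            continue
        try:
            table_date = datetime.strptime(name[len(prefix):], "%Y%m%d").date()
        except ValueError:
            continue  # skip names without a plain date suffix, e.g. intraday tables
        if table_date <= cutoff:
            due.append(name)
    return sorted(due)


# Hypothetical usage: on 2018-12-01 the cutoff date is 2018-10-02
candidates = [
    "events_20181003",
    "events_20181001",
    "events_intraday_20181003",
    "events_20181002",
]
print(tables_due_for_export(candidates, today=date(2018, 12, 1)))
```

A scheduler could run this daily and pass each returned table name to the export job.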

What is a DAG?

There is a lot of clever mathematics behind this term, but in data engineering we mean a data pipeline in which actions are triggered by events or by the outcomes of other actions.
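
As a rough illustration, a DAG can be written as a mapping from each task to the tasks it depends on, and the standard library can work out a valid execution order. This is a sketch only: the task names are hypothetical, and `graphlib` requires Python 3.9 or later.

```python
from graphlib import CycleError, TopologicalSorter

# Each key is a task; its value is the set of tasks that must finish first.
pipeline = {
    "transform_events": set(),
    "export_to_storage": {"transform_events"},
    "validate_row_counts": {"export_to_storage"},
    "notify": {"validate_row_counts"},
}


def execution_order(dag):
    """Return the tasks in an order that respects every dependency."""
    return list(TopologicalSorter(dag).static_order())


def is_acyclic(dag):
    """A pipeline is a valid DAG only if no task depends on itself, even indirectly."""
    try:
        execution_order(dag)
        return True
    except CycleError:
        return False


order = execution_order(pipeline)
print(order)
print(is_acyclic({"a": {"b"}, "b": {"a"}}))
```

The "acyclic" part matters in practice: if two tasks wait on each other, no execution order exists and the pipeline can never start.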

Our app folder would look like this:

.
├── stack
    └── bq_extractor
        ├── app.py
        ├── bq_extractor_env
        ├── event.json
        └── requirements.txt

Let’s create a virtual environment with all the required libraries we are going to use. Our `requirements.txt` lists these Python libraries:

google-auth==2.15.0
google-cloud-bigquery==3.4.0
requests==2.28.1
pyyaml==6.0
python-lambda-local==0.1.13

Let’s install them.

cd stack
cd bq_extractor
virtualenv bq_extractor_env
source bq_extractor_env/bin/activate
pip install -r requirements.txt

Now let’s create our microservice. I’ve quickly scribbled this snippet below for this article. Feel free to change the code according to your needs. It will use `google` libraries to authenticate BigQuery client and run the `export` job.

# https://googleapis.dev/python/bigquery/latest/index.html
import json
import requests
from datetime import datetime, date, timedelta
from google.api_core import retry
from google.cloud import bigquery
from google.oauth2 import service_account

# Test your service locally by running
# python-lambda-local -f lambda_handler -t 10 app.py event.json
# It should be able to do a request
response = requests.get('https://api.github.com')
print(response)

# Paste your JSON service account credentials here:
service_acount_str = { "type": "service_account", "project_id": "your-project", "private_key_id": "", "private_key": "-----BEGIN PRIVATE KEY----...\n-----END PRIVATE KEY-----\n", "client_email": "[email protected]", "client_id": "123", "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/bigquery-adminsdk%40client.iam.gserviceaccount.com" }


credentials = service_account.Credentials.from_service_account_info(service_acount_str)
# ? https://googleapis.dev/python/google-api-core/latest/auth.html#overview
print(credentials.project_id)

# Simple function to check connectivity:
def bigquery_hello(txt):
    client = bigquery.Client(credentials=credentials, project=credentials.project_id)
    QUERY = ('SELECT "{} nice to meet you";'.format(txt))
    query_job = client.query(QUERY)  # API request
    rows = query_job.result()  # Waits for query to finish
    greet = list(rows)[0][0]
    return greet

# Main helper function
def export_table_to_storage(table_name, bucket_partition):
    # Connect to BigQuery to run jobs programmatically
    client = bigquery.Client(credentials=credentials, project=credentials.project_id)

    # Public project source and test staging bucket
    project = 'firebase-public-project'
    dataset_id = 'analytics_153293282'
    bucket_name = 'firebase-events-archive-avro'

    destination_uri = "gs://{}/{}/partitionKey/events_*.avro".format(bucket_name, bucket_partition)
    dataset_ref = bigquery.DatasetReference(project, dataset_id)
    table_ref = dataset_ref.table(table_name)
    job_config = bigquery.job.ExtractJobConfig()
    # job_config.compression = bigquery.Compression.GZIP
    # https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.ExtractJobConfig
    job_config.destination_format = bigquery.DestinationFormat.AVRO
    job_config.compression = bigquery.Compression.SNAPPY

    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        # Location must match that of the source table.
        location="US",
        job_config=job_config,
    )  # API request
    # extract_job.result()  # Waits for job to complete. Calling client.extract_table starts the job. No need to wait to finish
    print("Export table to {}".format(destination_uri))

def lambda_handler(event, context):
    print(event)

    start_date = date(2018,9,1)
    end_date = date(2018,9,3)
    
    dates= [start_date+timedelta(days=x) for x in range((end_date-start_date).days)]
    for dt in dates:
        table_name = dt.strftime('events_%Y%m%d')
        partition_name = dt.strftime('dt=%Y-%m-%d')
        export_table_to_storage(table_name, partition_name)

    bigquery_message = bigquery_hello('it is ')

    message = 'Hello {} {}, {}!'.format(event['first_name'], event['last_name'], bigquery_message)  
    return { 
        'message' : message
    }

Our service will connect to BigQuery to run jobs programmatically including `extract`.
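
One detail worth noting in the handler: `range((end_date-start_date).days)` stops one day before `end_date`, so the loop above exports 2018-09-01 and 2018-09-02 only. Below is a minimal sketch of a date generator where that choice is explicit. The function name `daily_exports` and the `inclusive` flag are hypothetical; the table and partition name formats are the ones used in `lambda_handler`.

```python
from datetime import date, timedelta


def daily_exports(start_date, end_date, inclusive=True):
    """Yield (table_name, partition_name) for each day in the range."""
    days = (end_date - start_date).days + (1 if inclusive else 0)
    for offset in range(days):
        dt = start_date + timedelta(days=offset)
        yield dt.strftime("events_%Y%m%d"), dt.strftime("dt=%Y-%m-%d")


# Hypothetical usage with the dates from the handler
pairs = list(daily_exports(date(2018, 9, 1), date(2018, 9, 3)))
for table_name, partition_name in pairs:
    print(table_name, partition_name)
```

With `inclusive=False` the generator reproduces the behaviour of the original loop.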

Let’s create our Cloud Storage bucket first. We can do it in the web console or with command line tools. If we have `gsutil` installed, we can run this in the command line:

gsutil mb -c archive -l US-CENTRAL1 -p your-project-name gs://firebase-events-archive-avro

Now let’s run our microservice.

When app.py is ready, you can test it locally:

# Test your service locally by running in command line
python-lambda-local -f lambda_handler -t 10 app.py event.json

Let’s list our bucket to see if data is there:

gsutil ls gs://firebase-events-archive-avro/

Use this bash script to get all directory sizes in the bucket

For example, we might want to validate that the export operation actually worked.

gsutil ls -l gs://firebase-events-archive-avro/ | xargs -I{} gsutil du -sh  {}

How to add an extra bucket partition with the Hive layout?

In case we need to export data with an extra `category` partition key, we would want to use something like this:

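    category_number = 1
    for dt in dates:
        # Partition decorator: ml_data_YYYYMMDD$<category>
        table_name = "{}${}".format(dt.strftime('ml_data_%Y%m%d'), category_number)
        partition_name = dt.strftime('dt=%Y-%m-%d')
        category = "category={}".format(category_number)
        # export_table_to_storage() takes two arguments, so both partition keys go into one prefix
        export_table_to_storage(table_name, "{}/{}".format(partition_name, category))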

We can simply loop through all dates and all categories to create a data lake output like this:

`gs://firebase-events-archive-avro/dt=2018-10-03/category=1/partitionKey/events_*.avro`

The output would be:

Export table to gs://firebase-events-archive-avro/dt=2018-10-03/category=1/partitionKey/events_*.avro
[root - INFO - 2023-02-01 15:30:22,716] END RequestId: ac865a0b-05dd-4ef3-9b5f-9bfe7f0ce5b7
[root - INFO - 2023-02-01 15:30:22,717] REPORT RequestId: ac865a0b-05dd-4ef3-9b5f-9bfe7f0ce5b7  Duration: 2284.26 ms
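
The loop over all dates and all categories can be sketched as a small generator that pairs each partition decorator with its destination URI. This is an illustration under stated assumptions: the helper name `export_targets` is hypothetical, while the bucket name, table prefix, and path layout are the ones used in this article.

```python
from datetime import date, timedelta
from itertools import product

BUCKET = "firebase-events-archive-avro"


def export_targets(start_date, end_date, categories, bucket=BUCKET):
    """Yield (table_id_with_decorator, destination_uri) for every date and category."""
    day_count = (end_date - start_date).days + 1  # end_date is included
    days = [start_date + timedelta(days=n) for n in range(day_count)]
    for dt, category in product(days, categories):
        table_id = "ml_data_{}${}".format(dt.strftime("%Y%m%d"), category)
        uri = "gs://{}/dt={}/category={}/partitionKey/events_*.avro".format(
            bucket, dt.isoformat(), category
        )
        yield table_id, uri


# Hypothetical usage: one day, two event categories
targets = list(export_targets(date(2018, 10, 3), date(2018, 10, 3), [1, 2]))
for table_id, uri in targets:
    print(table_id, "->", uri)
```

Each pair could then be handed to an extract job, one job per partition.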

Let’s validate the numbers in case we choose to load it back:

select count(*) 
from `your-project.analytics.ml_data_20181003` 
where event_category = 1
;

LOAD DATA INTO `your-project.source.ml_data_20181003_1`
 FROM FILES(
   format='AVRO',
   uris = ['gs://firebase-events-archive-avro/dt=2018-10-03/category=1/*']
 )
;

select count(*) 
from `your-project.source.ml_data_20181003_1`
;

Conclusion

I hope this story will be useful for you. It’s a real-life data engineering scenario where we need to prepare raw event data and pass it to a machine learning service further down the pipeline. Data modeling is one of the essential skills in data engineering. This article shows how we apply it to optimize dataset schemas, partitions, and storage when data is no longer needed.

If there is a way to do it for free, then why not?

We created a simple microservice with AWS Lambda to export the data, but there is so much more we can do with it. We can connect it to the API gateway, create another web service to orchestrate pipelines (i.e., DataHub), use other events as triggers, etc.

After the required DML transformations on our event data, it is stored in the data lake where other ML services can access it and process it in a more efficient and scalable way to train machine learning models.

Repository

https://github.com/mshakhomirov/bigquery_extractor

Recommended read

  1. https://cloud.google.com/bigquery/docs/managing-partitioned-table-data
  2. https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client
  3. https://cloud.google.com/bigquery/docs/external-data-cloud-storage
  4. https://cloud.google.com/bigquery/docs/hive-partitioned-queries
  5. https://cloud.google.com/bigquery/docs/exporting-data
  6. https://cloud.google.com/bigquery/quotas#export_jobs