Hugo Lu




Putting events onto a queue and listening to them yourself is a bit like this, I guess. Credit David Hoffman

Cloud skills for Data Engineers: Google Cloud (Part 1)

How to use Google Pub/Sub

About me

Hello 👋 I’m Hugo Lu — I started my career working in M&A in London before doing data engineering at JUUL and London-based Fintech, Codat. I’m now CEO at Orchestra, which is a tool to help Data Teams release data into Production reliably and efficiently. ⭐️

Introduction

I’ve mentioned Google Pub/Sub a lot in my articles, and with good reason. Fundamentally, if you (the data team) can get the software team to just send you events with all the information in them you’ll ever need, then your life just got A LOT easier. You have a big incentive to do this, but with one snag: how do you ingest events? There are many options, and Google Pub/Sub is but one! So let’s dive in.

Pub/Sub in a nutshell

Pub/Sub is a way to get events into a data warehouse without using a data ingestion tool. As I alluded to, this is also often best practice, because you (the data team) have to speak to your colleagues in software engineering (the data producers). By having this dialogue you are already more advanced than 70% of data teams, as you instil better lines of communication and a better data culture. You also get to do less transformation and fundamentally get software engineers to do some of your work for you.

Furthermore, you realise that any time you need an event from an internal system, there is a single replicable thing to do (get events). Events are fantastic because they typically have all the data you ever need, can easily be aggregated, and are cheap! Streaming events into BigQuery is CHEAP. No need for CDC on a massive table. No need for flat copying 1bn rows using a data ingest tool. Just use events.

Getting set-up

This is what it should look like in the Google Cloud console — I assume you already have this:

Setting up a topic

A topic is an entity that allows messages to be put onto it. It is similar to a queue in Azure. An important thing to bear in mind is that you should enforce the schema on messages that get put on the topic. This is called a data contract, and you can read more about that here from Andrew Jones.
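To make the data contract idea concrete, here is a minimal sketch in plain Python of the kind of check a schema enforces. The field names are taken from the example message published later in this article; the `EVENT_SCHEMA` dict and the `validate_event` helper are hypothetical and not part of Pub/Sub, which does this validation for you once a schema is attached to the topic.

```python
# Hypothetical contract: field name -> required Python type.
EVENT_SCHEMA = {"name": str, "id": int, "email": str, "numbers": str}


def validate_event(event: dict) -> list:
    """Return a list of contract violations (an empty list means the event is valid)."""
    errors = []
    for field, expected_type in EVENT_SCHEMA.items():
        if field not in event:
            errors.append(f"missing field: {field}")
        elif not isinstance(event[field], expected_type):
            errors.append(f"{field} should be {expected_type.__name__}")
    for field in event:
        if field not in EVENT_SCHEMA:
            errors.append(f"unexpected field: {field}")
    return errors


good = {"name": "Message", "id": 100, "email": "someone@example.com", "numbers": "123455"}
bad = {"name": "Message", "id": "100"}  # id has the wrong type, two fields are missing

print(validate_event(good))
print(validate_event(bad))
```

The point is that a bad message is rejected at the door rather than discovered later in the warehouse.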

The first thing you’ll do, then, is actually define a schema. Hit that “Use a schema” checkbox and build out a schema. This uses something you might have heard of called Google Protocol Buffers, or “Protobuf”. It’s not that complicated.

After doing that, you’ll get something like this.

Some options

The revisions section is arguably not even best practice. If people keep changing the schema, it may be sensible to just force them to publish to a different topic for radical changes. If something changes from a string to an integer, however, the important thing is that the destinations you set up for the data are partitioned in such a way that you don’t get differently-formatted data in the same data store. Therefore I generally like to be pretty strict on allowing revisions.

Enabling message retention is normally unnecessary. It is costly and not recommended.

Encryption refers to which key is used to encrypt the messages: a Google-managed key by default, or a customer-managed key if you need one. You can read more about that here.

Ok — hit create.

Subscriptions

Ok, head over to subscriptions on the LHS. You’ll see BQ made a subscription for you. Click the three dots on the RHS and hit edit to see some more cool stuff.

A wild subscription appeared
The settings for a subscription

The subscription is an object that listens to messages on the topic and does stuff with them. This configuration is very powerful. You can retain messages, expire the subscription, and do cool stuff with the messages like push them to cloud storage or anything really. Select “Write to BigQuery”, and create a new dataset and table. You’ll also be able to select “write metadata” on the table, which will write the metadata from the message (e.g. the time it was delivered), so you can leave those fields out of the schema you specified in the first part. This also ensures there is a common metadata structure across all events, which is desirable.

Ensure you create the table in the relevant dataset in BigQuery too. Google Cloud can auto-create a dataset for you, but not a table (stupid). Ensure you add this schema to the table if you selected write metadata:

You will also need to add columns here that mirror the schema you set in Protobuf.
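As a rough sketch of what the finished table schema could look like, the snippet below builds a BigQuery-style JSON schema. The metadata column names and types are an assumption based on my reading of the Pub/Sub BigQuery subscription docs, so double check them before creating the table. The event columns simply mirror the example message published later in this article.

```python
import json

# Metadata columns expected when "write metadata" is selected.
# Assumption: names and types as I recall them from the Pub/Sub docs; verify first.
# A "data" column for the raw message body is assumed unnecessary here because
# the topic schema is used to map fields to columns.
METADATA_COLUMNS = [
    {"name": "subscription_name", "type": "STRING"},
    {"name": "message_id", "type": "STRING"},
    {"name": "publish_time", "type": "TIMESTAMP"},
    {"name": "attributes", "type": "STRING"},
]

# Columns mirroring the topic schema (taken from this article's example message).
EVENT_COLUMNS = [
    {"name": "name", "type": "STRING"},
    {"name": "id", "type": "INTEGER"},
    {"name": "email", "type": "STRING"},
    {"name": "numbers", "type": "STRING"},
]

table_schema = METADATA_COLUMNS + EVENT_COLUMNS
print(json.dumps(table_schema, indent=2))
```

The printed JSON is in the shape BigQuery accepts when you edit a table schema as text.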

Also see:

Dead letter queue

If the message cannot be delivered (maybe you destroyed your BigQuery table) then typically it gets put on something known as a “Dead Letter Queue”. This is basically just another topic, such that every topic has another topic for failed messages (this gets auto-made in Azure, not so in Google Cloud). The best practice thing to do is to ensure there is also a subscription for the dead letter queue, handling the messages that cannot be delivered. For example, you might have a table called {event_name} and maybe the dead letter queue ingests data to a separate table called {event_name_dl}. That way, you have all the data getting ingested but it’s separate, so you don’t get corrupted data.
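The dead letter pattern itself is easy to picture with a toy, in-memory simulation. This is not the Pub/Sub API: `deliver`, `route` and `MAX_DELIVERY_ATTEMPTS` are hypothetical names, and the two lists stand in for the {event_name} and {event_name_dl} tables.

```python
MAX_DELIVERY_ATTEMPTS = 5  # hypothetical limit for this toy example


def deliver(message: dict, table: list) -> None:
    """Pretend to write a message to the main table; reject malformed ones."""
    if not isinstance(message.get("id"), int):
        raise ValueError("id must be an integer")
    table.append(message)


def route(messages: list) -> tuple:
    """Try each message up to MAX_DELIVERY_ATTEMPTS times, then dead-letter it."""
    event_table, dead_letter_table = [], []
    for message in messages:
        for _ in range(MAX_DELIVERY_ATTEMPTS):
            try:
                deliver(message, event_table)
                break  # delivered, stop retrying
            except ValueError:
                pass  # failed attempt, try again
        else:
            # Every attempt failed: keep the message, but away from the good data.
            dead_letter_table.append(message)
    return event_table, dead_letter_table


events, dead_letters = route([{"id": 1}, {"id": "oops"}, {"id": 3}])
print(len(events), len(dead_letters))
```

Nothing is lost, and the main table only ever holds messages that were delivered cleanly.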

Google Cloud has created a service account to operate the subscription; however, that service account does not have access to BigQuery. You’ll need to add it in IAM. See a nice error message post from StackExchange below:

I also got stuck. This is how I unstuck myself.

But the problem is that you cannot see this service account (even after selecting “Include Google-provided role grants”).

Solution:

* Visit https://console.cloud.google.com/iam-admin/iam

* Choose your project

* Click “Add+” and enter the principal email

* Provide the Roles as required (double check docs):

  * BigQuery Data Editor

  * BigQuery Metadata Viewer

Success
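If you prefer the command line to the console, the same grants can be sketched as `gcloud` commands. The snippet below only builds the command strings. It assumes the principal follows the usual format for the Google-managed Pub/Sub service account, so check it against the email in your own error message. The project id and project number are placeholders.

```python
def pubsub_service_agent(project_number: str) -> str:
    """Email of the Google-managed Pub/Sub service account (assumed format)."""
    return f"service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com"


def grant_commands(project_id: str, project_number: str) -> list:
    """Build one gcloud command per BigQuery role named in the steps above."""
    member = f"serviceAccount:{pubsub_service_agent(project_number)}"
    roles = ["roles/bigquery.dataEditor", "roles/bigquery.metadataViewer"]
    return [
        f"gcloud projects add-iam-policy-binding {project_id} "
        f"--member={member} --role={role}"
        for role in roles
    ]


# "my-project" and "123456789012" are placeholders, not real identifiers.
for command in grant_commands("my-project", "123456789012"):
    print(command)
```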

Sending an event onto the topic

To send an event onto the topic, you’ll need to ensure you have the relevant credentials set in the right way as an environment variable. This is non-trivial. There are multiple hacky ways to do it; ignore them all and do it properly by creating a new service account and assigning it the role of Cloud Pub/Sub Service Agent. Go to Keys and download the JSON file. This is important.

If you see something like this, you’re on the right track

You’ll then need to head over to the Topic and add the Pub/Sub roles to the service account for that topic:

Go to the topic, the permissions are weirdly on the RHS
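Before publishing anything, a quick sanity check on the key file can save some debugging. This is a minimal sketch using only the standard library; `check_credentials` is a hypothetical helper, and it only confirms the file looks like a service account key, not that the account has the right roles.

```python
import json
import os


def check_credentials(path: str) -> str:
    """Return the service account email in a key file, or raise ValueError."""
    with open(path, encoding="utf-8") as handle:
        key = json.load(handle)
    if key.get("type") != "service_account":
        raise ValueError("not a service account key file")
    if "client_email" not in key or "private_key" not in key:
        raise ValueError("key file is missing client_email or private_key")
    return key["client_email"]


path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
if path and os.path.isfile(path):
    try:
        print("Publishing as", check_credentials(path))
    except ValueError as error:
        print("Problem with the key file:", error)
else:
    print("GOOGLE_APPLICATION_CREDENTIALS is not set or does not point to a file")
```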

Ok now you’re ready.

First run pip install google-cloud-pubsub.

See here.

Then, you need code like the below:

"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
import json
import os
from concurrent import futures
from typing import Callable

from google.cloud import pubsub_v1

project_id = "yourid"
topic_id = "yourtopic id"
# Path to the service account key (json file) you downloaded earlier.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "Route to the creds you downloaded"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []


def get_callback(
    publish_future: pubsub_v1.publisher.futures.Future,
    data: str,
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
    def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
        try:
            # Wait 60 seconds for the publish call to succeed, then print the message id.
            print(publish_future.result(timeout=60))
        except futures.TimeoutError:
            print(f"Publishing {data} timed out.")

    return callback


for i in range(10):
    id_ = i + 100  # ids 100 to 109, one per message
    # Note - the string you put into bytes needs to be a properly formatted json string.
    # json.dumps does this for you (JSON.stringify() etc. in other languages).
    data = json.dumps(
        {
            "name": "Message",
            "id": id_,
            "email": "[email protected]",
            "numbers": "123455",
        }
    )

    # When you publish a message, the client returns a future.
    publish_future = publisher.publish(topic_path, data.encode("utf-8"))
    # Non-blocking. Publish failures are handled in the callback function.
    publish_future.add_done_callback(get_callback(publish_future, data))
    publish_futures.append(publish_future)

# Wait for all the publish futures to resolve before exiting.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with error handler to {topic_path}.")
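A short aside on why the JSON formatting matters. Swapping single quotes for double quotes on a Python dict's string form only works for very simple payloads. The sketch below uses a made-up event to show where that trick breaks and what `json.dumps` does instead.

```python
import json

# Made-up event with an apostrophe and two Python-only literals.
event = {"name": "O'Brien", "active": True, "referrer": None}

# The quote-swapping trick mangles the apostrophe and leaves True/None untouched.
hacked = str(event).replace("'", '"')
try:
    json.loads(hacked)
    hack_is_valid = True
except json.JSONDecodeError:
    hack_is_valid = False

# json.dumps escapes strings properly and maps True/None to true/null.
proper = json.dumps(event)

print("hack produces valid JSON:", hack_is_valid)
print(proper)
```

A message that is not valid JSON will not match the schema on the topic, so it is worth getting this right on the publishing side.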

And that’s it!

Head to the topic and you’ll see:

Some stuff got sent

And in Bigquery:

Woohoo

To make this really best practice, you should ensure you are streaming into a partitioned table. More on that here: https://stackoverflow.com/questions/61114455/pub-sub-on-daily-table

Also if you want to do this in Azure, it’s a little more complicated but I wrote this up too: Azure Service Bus.

Conclusion

This article covered the practical steps you need to take to get started ingesting event-style data using Google Pub/Sub and BigQuery.

This is a very, very powerful thing to do. The trickiest and most important sources of data in tech companies (and similar) come from proprietary databases. These are owned by Software Engineering teams, and in many ways you (the data engineer, the analytics engineer, the head of data) are at their mercy if you piss them off. Asking them to open up their networks so a tool like Fivetran can tunnel in will definitely not make them happier.

Instead, impress them. Impress them by demonstrating your knowledge of reasonably intermediate engineering concepts like message buses and Google Pub/Sub. Propose a few lines of code they can use to send you events, and use these for analysis instead of replicating tables.

Not only is this architecturally elegant but culturally it’s enormously beneficial to be on speaking terms with your software engineers. Taking an events-first approach to data / data ingestion is also beneficial because event data is intuitive to understand and fun to work with.

Hope you enjoy. Feel free to connect on Linkedin 👍

References

  1. https://www.googlecloudcommunity.com/gc/Data-Analytics/BigQuery-Subscription-Error-Incompatible-schema-type-DOUBLE-vs/m-p/454809
  2. Publishing docs: https://cloud.google.com/pubsub/docs/publisher#python