Zach Quinn


Automate Your BigQuery Schema Definitions With 5 Lines of Python

Tired of manually writing my BigQuery schemas, I wrote a function that makes schema definition less time-consuming.

Automation in human form. Photo by Possessed Photography on Unsplash

Optimizing BigQuery Schema Definitions

One thing I’ve learned while programming is that I’d rather do more work now to do less work later.

That’s why, if I find myself performing the same operation over and over, I start exploring whether a function or another approach could spare me the mundane, repetitive work.

Schema definition, while important, is one of the most mundane, repetitive tasks I do as a data engineer. While it can be completed in the BigQuery UI, I primarily use the Python client, so I define my schemas manually to ensure GCP doesn’t mis-parse anything.

In doing so, I often end up with some pretty ugly, lengthy code that, even with a config file, can become unwieldy.

If you’re dealing with nested fields, it can become downright unpleasant, like the below snippet:

from google.cloud import bigquery

schema = [
    bigquery.SchemaField("abstract", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("web_url", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("snippet", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("lead_paragraph", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("print_section", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("print_page", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("source", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("multimedia", "RECORD", mode="REPEATED", fields=[
        bigquery.SchemaField("rank", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("caption", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("credit", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("type", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("url", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("height", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("width", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("legacy", "RECORD", mode="REPEATED", fields=[
            bigquery.SchemaField("xlarge", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("xlargewidth", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("xlargeheight", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("crop_name", "STRING", mode="NULLABLE"),
        ]),
    ]),
    bigquery.SchemaField("headline", "RECORD", mode="REPEATED", fields=[
        bigquery.SchemaField("main", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("content_kicker", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("print_headline", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("name", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("seo", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("sub", "STRING", mode="NULLABLE"),
    ]),
]

I realized recently that writing ‘bigquery.SchemaField’ out every time is unnecessary. Remembering the right parentheses-and-comma combination was annoying and prone to syntax errors.

So I took some time and made my life (and, hopefully, now yours) easier.

Below is my process. Feel free to copy/paste, replicate and integrate.

Pardon the interruption: For more Python, SQL and cloud computing walkthroughs, follow Pipeline: Your Data Engineering Resource.

To receive my latest writing, you can follow me as well.

Creating the Schema Definition Function

Depending on how thorough you are with your schema definition, there are two elements we use repeatedly:

  • Field (e.g. “print_headline”)
  • Type (e.g. “STRING”)

The first bit of the function creates a placeholder line used as the basis for a loop.

schema = bigquery.SchemaField(fields, types)

We’ll store our desired fields and types as lists in separate variables.

field_list = ["print_section", "print_page", "source"]
type_list = ["STRING", "INTEGER", "STRING"]
schema = bigquery.SchemaField(fields, types)

We only need one loop. Note how we’re combining the two lists, field_list and type_list, using the zip() function.

field_list = ["print_section", "print_page", "source"]
type_list = ["STRING", "INTEGER", "STRING"]
for fields, types in zip(field_list, type_list):
    schema = bigquery.SchemaField(fields, types)

The final step is to store the output: we define an empty list before the loop and append each SchemaField to it as the schema.

field_list = ["print_section", "print_page", "source"]
type_list = ["STRING", "INTEGER", "STRING"]
schema_list = []
for fields, types in zip(field_list, type_list):
    schema = bigquery.SchemaField(fields, types)
    schema_list.append(schema)

If we’re executing this in a script and not in a function, we can leave this as is (but it will be more than five lines).

However, if we want to use this as a function, we’ll want to remove the hard-coded field_list and type_list variables, since we’ll supply those as arguments when we call the function, as seen in this function definition.

def create_schema(field_list: list, type_list: list):
    
    schema_list = []
    
    for fields, types in zip(field_list, type_list):
        schema = bigquery.SchemaField(fields, types)
        schema_list.append(schema)
    return schema_list

Calling the Function

Once you have the function defined, you can call it within your script.

This is what the output should look like:

Python output of create_schema function. Screenshot and code by the author.
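Since the screenshot doesn’t carry over here, below is a minimal sketch of a call and roughly what the printed result looks like (the exact SchemaField repr varies by google-cloud-bigquery version; the field values are the sample ones from earlier):

from google.cloud import bigquery

field_list = ["print_section", "print_page", "source"]
type_list = ["STRING", "INTEGER", "STRING"]

schema_list = create_schema(field_list, type_list)
print(schema_list)
# Prints something like:
# [SchemaField('print_section', 'STRING', 'NULLABLE', None, None, (), None),
#  SchemaField('print_page', 'INTEGER', 'NULLABLE', None, None, (), None),
#  SchemaField('source', 'STRING', 'NULLABLE', None, None, (), None)]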

The obvious place to call this function would be when configuring the job for the final upload to BigQuery.

bq_client = bigquery.Client()
dataset_ref = bq_client.dataset(dataset_id)
dataset_table_id = dataset_ref.table(table_id)

job_config = bigquery.LoadJobConfig()
job_config.write_disposition = 'WRITE_TRUNCATE'
job_config.source_format = bigquery.SourceFormat.CSV
job_config.autodetect = False
# Calling create_schema()
job_config.schema = create_schema(field_list, type_list)
job_config.ignore_unknown_values = True
job = bq_client.load_table_from_dataframe(
    df,
    dataset_table_id,
    location='US',
    job_config=job_config)

job.result()

For a neater script, I’ll compile the above code into a function.

def bq_load(df, dataset_id: str, table_id: str, schema):
    bq_client = bigquery.Client()
    dataset_ref = bq_client.dataset(dataset_id)
    dataset_table_id = dataset_ref.table(table_id)

    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = 'WRITE_TRUNCATE'
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.autodetect = False
    # The schema built by create_schema() is passed in here
    job_config.schema = schema
    job_config.ignore_unknown_values = True
    job = bq_client.load_table_from_dataframe(
        df,
        dataset_table_id,
        location='US',
        job_config=job_config)

    return job.result()

Finally, we can call the create_schema function inside the bq_load call. To keep the script tidy, we can store variables like the dataset ID and the field and type lists in a config file.

def main():

    # Function that makes the final data frame.
    final_df = make_df()
    # BQ load function. Note how we can call 'create_schema' inside.
    df_to_bq = bq_load(final_df, cfg.dataset_id, cfg.table_id,
                       create_schema(cfg.field_list, cfg.type_list))
    return 'Success!'
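The article doesn’t show the config file itself. As a minimal sketch, a cfg.py module matching the cfg.* references above might look like this (all values are assumed placeholders):

# cfg.py -- hypothetical config module for the pipeline above
dataset_id = "my_dataset"    # assumed placeholder; your BigQuery dataset
table_id = "nyt_articles"    # assumed placeholder; your destination table
field_list = ["print_section", "print_page", "source"]
type_list = ["STRING", "INTEGER", "STRING"]

The main script would then pull these in with a plain ‘import cfg’.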

Recap & Takeaway

By applying fairly basic Python concepts like loops and list operations, we can create a simple but efficient way to avoid repetitious schema writing.

Before you try this on your own, I encourage you to keep the following in mind:

  • Be careful when compiling and combining your field lists and type lists. Ensure all fields and types are correct, just as you would when defining a schema by hand.
  • Defining this operation as a function makes it easy to repeat for multiple tables. Just make sure you clearly label your list variables.
  • If you need to define complex data types like nested or repeated fields, you’ll need to adjust the logic to reflect the levels of nesting you’ll encounter; one possible approach is sketched below.
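The article stops short of implementing nested fields, but here’s a minimal sketch of one way to extend the idea. The name create_schema_nested, the mode_list parameter, and the subfields mapping are illustrative assumptions, not part of the original function:

from google.cloud import bigquery

def create_schema_nested(field_list, type_list, mode_list=None, subfields=None):
    # mode_list defaults every field to NULLABLE; subfields maps a RECORD
    # field's name to its own (field_list, type_list) pair of leaf fields.
    mode_list = mode_list or ["NULLABLE"] * len(field_list)
    subfields = subfields or {}
    schema_list = []
    for field, type_, mode in zip(field_list, type_list, mode_list):
        if field in subfields:
            child_fields, child_types = subfields[field]
            nested = create_schema_nested(child_fields, child_types)
            schema_list.append(
                bigquery.SchemaField(field, type_, mode=mode, fields=nested))
        else:
            schema_list.append(bigquery.SchemaField(field, type_, mode=mode))
    return schema_list

# Example: a REPEATED RECORD with two leaf fields, mirroring the
# 'legacy' record from the snippet at the top of the article.
schema = create_schema_nested(
    ["source", "legacy"],
    ["STRING", "RECORD"],
    mode_list=["NULLABLE", "REPEATED"],
    subfields={"legacy": (["xlarge", "xlargewidth"], ["STRING", "INTEGER"])})

This sketch covers one level of nesting with NULLABLE leaves; deeper trees would need subfields entries that themselves carry mode and further nesting information.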

Again, this is not mind-blowing code. But I hope this walkthrough has helped demonstrate how to automate your schema definitions and shown you how to think about optimizing repetitive tasks using Python.

Create a job-worthy data portfolio. Learn how with my free project guide.
