Automate Your BigQuery Schema Definitions With 5 Lines of Python
Tired of manually writing my BigQuery schemas, I wrote a function that makes schema definition less time-consuming.
Optimizing BigQuery Schema Definitions
One thing I’ve learned while programming is that I’d rather do more work now to do less work later.
That’s why, if I find myself performing a redundant operation, I begin exploring the possibility of writing a function or implementing another method to avoid doing mundane, repetitive work.
Schema definition, while important, is one of the most mundane, repetitive tasks I do as a data engineer. This task can be completed in the BigQuery UI, but since I primarily use the Python client, I define my schemas manually to ensure that GCP doesn't parse anything incorrectly.
In doing so, I often end up with some pretty ugly, lengthy code that, even with a config file, can become unruly.
If you’re dealing with nested fields, it can become downright unpleasant, like the below snippet:
schema = [
    bigquery.SchemaField("abstract", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("web_url", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("snippet", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("lead_paragraph", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("print_section", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("print_page", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("source", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("multimedia", "RECORD", mode="REPEATED", fields=[
        bigquery.SchemaField("rank", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("caption", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("credit", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("type", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("url", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("height", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("width", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("legacy", "RECORD", mode="REPEATED", fields=[
            bigquery.SchemaField("xlarge", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("xlargewidth", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("xlargeheight", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("crop_name", "STRING", mode="NULLABLE")
        ])
    ]),
    bigquery.SchemaField("headline", "RECORD", mode="REPEATED", fields=[
        bigquery.SchemaField("main", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("content_kicker", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("print_headline", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("name", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("seo", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("sub", "STRING", mode="NULLABLE")
    ])
]
I recently realized that writing 'bigquery.SchemaField' out every time is unnecessary. Remembering the right parentheses-and-comma combination was annoying and prone to syntax errors.
So I took some time and made my life (and, hopefully, now yours) easier.
Below is my process. Feel free to copy/paste, replicate and integrate.
Creating the Schema Definition Function
Depending on how thorough you are with your schema definition, there are two elements we use repeatedly:
- Field (e.g. "print_headline")
- Type (e.g. "STRING")
The first bit of the function creates a placeholder line used as the basis for a loop.
schema = bigquery.SchemaField(fields, types)
We’ll store our desired fields and types as lists in separate variables.
field_list = ["print_section", "print_page", "source"]
type_list = ["STRING", "INTEGER", "STRING"]
schema = bigquery.SchemaField(fields, types)
We only need one loop. Note how we combine the two lists, field_list and type_list, using the zip() function.
field_list = ["print_section", "print_page", "source"]
type_list = ["STRING", "INTEGER", "STRING"]
for fields, types in zip(field_list, type_list):
    schema = bigquery.SchemaField(fields, types)
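One caveat worth keeping in mind: zip() stops at the shorter of its inputs, so a missing type silently drops a field instead of raising an error. A quick pure-Python illustration:

```python
field_list = ["print_section", "print_page", "source"]
type_list = ["STRING", "INTEGER", "STRING"]

# zip() pairs the lists element by element.
pairs = list(zip(field_list, type_list))
print(pairs)

# Caveat: zip() stops at the shorter list. If a type were accidentally
# omitted, the last field would be silently dropped rather than erroring.
short_types = ["STRING", "INTEGER"]
print(list(zip(field_list, short_types)))  # "source" disappears

# A cheap guard to run before building a schema:
assert len(field_list) == len(type_list), "field/type lists are misaligned"
```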
The final step is to store the output: we define an empty list before the loop, then append each SchemaField to it.
field_list = ["print_section", "print_page", "source"]
type_list = ["STRING", "INTEGER", "STRING"]
schema_list = []
for fields, types in zip(field_list, type_list):
    schema = bigquery.SchemaField(fields, types)
    schema_list.append(schema)
If we’re executing this in a script and not in a function, we can leave this as is (but it will be more than five lines).
However, if we want to use this as a function, we'll want to eliminate the field_list and type_list variables, since we'll supply those inputs when we call the function, as seen in the function definition below.
def create_schema(field_list: list, type_list: list):
    schema_list = []
    for fields, types in zip(field_list, type_list):
        schema = bigquery.SchemaField(fields, types)
        schema_list.append(schema)
    return schema_list
Calling the Function
Once you have the function defined, you can call it within your script.
The obvious place to call this function would be when configuring the job for the final upload to BigQuery.
bq_client = bigquery.Client()
dataset_ref = bq_client.dataset(dataset_id)
dataset_table_id = dataset_ref.table(table_id)

job_config = bigquery.LoadJobConfig()
job_config.write_disposition = 'WRITE_TRUNCATE'
job_config.source_format = bigquery.SourceFormat.CSV
job_config.autodetect = False
# Calling create_schema()
job_config.schema = create_schema(field_list, type_list)
job_config.ignore_unknown_values = True

job = bq_client.load_table_from_dataframe(
    df,
    dataset_table_id,
    location='US',
    job_config=job_config)
job.result()
For a neater script, I'll wrap the above code in a function.
def bq_load(df, dataset_id: str, table_id: str, schema):
    bq_client = bigquery.Client()
    dataset_ref = bq_client.dataset(dataset_id)
    dataset_table_id = dataset_ref.table(table_id)

    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = 'WRITE_TRUNCATE'
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.autodetect = False
    # Schema is supplied by create_schema() at call time.
    job_config.schema = schema
    job_config.ignore_unknown_values = True

    job = bq_client.load_table_from_dataframe(df,
                                              dataset_table_id,
                                              location='US',
                                              job_config=job_config)
    return job.result()
Finally, we can call the create_schema function inside the call to bq_load, passing its result as the schema argument. To keep the script tidy, we can store variables in a config file.
def main():
    # Function that makes the final data frame.
    final_df = make_df()
    # BQ load function. Note how we can call 'create_schema' inside.
    df_to_bq = bq_load(final_df, cfg.dataset_id, cfg.table_id,
                       create_schema(cfg.field_list, cfg.type_list))
    return 'Success!'
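The cfg module above is whatever you make it; a minimal, hypothetical config.py (every value here is a placeholder) could be as simple as module-level variables:

```python
# config.py — a minimal, hypothetical config module (imported as `cfg`).
# All values below are placeholders; swap in your own project's names.
dataset_id = "my_dataset"     # your BigQuery dataset ID
table_id = "nyt_articles"     # your destination table
field_list = ["print_section", "print_page", "source"]
type_list = ["STRING", "INTEGER", "STRING"]
```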
Recap & Takeaway
By applying fairly basic Python concepts like loops and list operations, we can create a simple but efficient way to avoid repetitious schema writing.
Before you try this on your own, I encourage you to keep the following in mind:
- Be careful when compiling and combining your field lists and type lists. Ensure all fields and types are correct, just as you would when defining a schema regularly.
- Defining this operation as a function makes it easy to repeat the operation for multiple tables. Just make sure you clearly label your list variables.
- If you need to define complex data types like nested or repeated fields, you'll need to adjust the logic to reflect the levels of nesting you'll encounter.
Again, this is not mind-blowing code. But I hope that this walk-through has helped demonstrate how to automate your schema definitions and also shown you how to think about optimizing repetitive tasks using Python.