Event-Driven Scheduled Jobs Using GCP Cloud Tasks

At our company, Interviewer.AI, we have been using AWS Lambda and GCP Cloud Functions extensively for scheduled tasks. For example, a daily event scheduled via AWS CloudWatch Events triggers a Lambda that sends out daily insights and reports to our users.

So far, this setup has worked pretty well for us.
Recently, we started working on a product feature that sends email reminders to our candidates at a specified future time (think of it as X hours after the candidate applied for a job). While the original workflow with scheduled cron timing would still work to a certain extent, it would mean scanning the DB unnecessarily and processing information that might not be needed.

Since each email is tied to an “event” (the creation of a candidate entry), this is a good use case for an event-driven, asynchronous architecture. We started thinking about using some sort of queueing service. However, we quickly realized that a normal queueing or publisher/subscriber service such as AWS SQS or SNS has a downside: these services usually don’t have a “temporal” element. Unless you wait and publish the message to the queue, say, 48 hours down the road, the message will be consumed straight away by the downstream services.
This is when we were introduced to GCP Cloud Tasks by a Google Customer Engineer. In a nutshell, it’s a task queue service that comes with the feature we need most for this problem: scheduling. Other benefits include task de-duplication, configurable downstream rate-limiting, and the ability to store messages in the queue for up to 30 days.
A summary of the implementation is below:

1. Create a Cloud Function with an HTTP handler to handle the task.
I will skip showing the function logic; in our case, it does 2 things:
- Send out the reminder email via the Sendgrid API
- Once processed, create a new scheduled task and put it back in the same queue for the 2nd reminder
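For readers who want a starting point, the two steps above could be sketched roughly as follows. This is a minimal sketch under stated assumptions, not our production code: `handle_reminder`, `send_email`, `enqueue_task`, the payload keys (`email`, `reminder_number`), and the 24-hour gap before the second reminder are all hypothetical. The email sender and the enqueue function are passed in as arguments so the logic can be exercised without Sendgrid or Cloud Tasks.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical values: gap before the follow-up and total reminder count.
SECOND_REMINDER_DELAY = timedelta(hours=24)
MAX_REMINDERS = 2


def handle_reminder(payload, send_email, enqueue_task, now=None):
    """Process one reminder task and schedule the follow-up if needed.

    payload      -- dict decoded from the task's JSON body (keys are assumed)
    send_email   -- callable(email, reminder_number), e.g. a Sendgrid wrapper
    enqueue_task -- callable(payload, schedule_time), e.g. the application's
                    task-creation function
    """
    now = now or datetime.now(timezone.utc)
    reminder_number = payload.get('reminder_number', 1)

    send_email(payload['email'], reminder_number)

    # After the first reminder, put a second one back on the same queue.
    if reminder_number < MAX_REMINDERS:
        next_payload = dict(payload, reminder_number=reminder_number + 1)
        enqueue_task(next_payload, now + SECOND_REMINDER_DELAY)
        return 'sent, next reminder scheduled'
    return 'sent, no further reminders'
```

Inside the actual Cloud Function entry point, this would presumably be called with the decoded request body (for example `request.get_json()`) and with `create_http_task_with_token` as the `enqueue_task` argument.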
2. At the same time, there are 2 things we can do to further secure the Cloud Function against unauthorized calls.
- Remove allUsers from the Cloud Functions Invoker permission
- Create a service account and grant it the Cloud Functions Invoker role; this service account's email address will be used as a parameter in subsequent task creation
Here’s what it looks like in the permission panel of our Cloud Function:
3. Create a task queue in GCP
gcloud tasks queues create sample-queue
(you can supply additional parameters like --max-dispatches-per-second to control the rate of message dispatching)
4. In the application logic, create (“enqueue”) the task in the task queue created in point 3.
Here are 2 snippets from our application that use the Python / Node.js client libraries to create the task in the task queue.
Python
import json

from google.cloud import tasks_v2beta3
from google.protobuf.timestamp_pb2 import Timestamp

# TODO: UPDATE WITH YOUR OWN CONFIG BELOW
CONFIG = {
    'CLOUD_TASKS': {
        'QUEUE': '',  # queue name
        'FUNCTION_URL': '',  # Cloud function URL
        'SERVICE_ACCOUNT_EMAIL': '',  # SA created in point 2
    },
    'GCP_CONFIG': {
        'PROJECT': '',
        'LOCATION': ''
    }
}


def create_http_task_with_token(payload, payload_date):
    """Enqueue an HTTP task that calls the Cloud Function at payload_date.

    payload      -- JSON-serializable dict sent as the request body
    payload_date -- datetime of the dispatch (naive values are treated as UTC)
    """
    client = tasks_v2beta3.CloudTasksClient()
    parent = client.queue_path(
        project=CONFIG['GCP_CONFIG']['PROJECT'],
        location=CONFIG['GCP_CONFIG']['LOCATION'],
        queue=CONFIG['CLOUD_TASKS']['QUEUE'],
    )

    # Cloud Tasks expects a protobuf Timestamp, not a Python datetime
    timestamp = Timestamp()
    timestamp.FromDatetime(payload_date)

    task = {
        'http_request': {
            'http_method': 'POST',
            'url': CONFIG['CLOUD_TASKS']['FUNCTION_URL'],
            # OIDC token so the call is made as the service account from point 2
            'oidc_token': {
                'service_account_email': CONFIG['CLOUD_TASKS']['SERVICE_ACCOUNT_EMAIL'],
            },
            'headers': {
                'Content-Type': 'application/json'
            },
            'body': json.dumps(payload).encode()
        },
        'schedule_time': timestamp
    }
    response = client.create_task(parent=parent, task=task)
    return response
Node
const events = require('events');
const winston = require('winston');
const { CloudTasksClient } = require('@google-cloud/tasks');

// TODO: UPDATE WITH YOUR OWN CONFIG BELOW
const config = {
  CLOUD_TASKS: {
    QUEUE: '',
    FUNCTION_URL: '',
    SERVICE_ACCOUNT_EMAIL: '',
  },
  GCP_CONFIG: {
    PROJECT: '',
    LOCATION: '',
  },
};

const em = new events.EventEmitter();
const client = new CloudTasksClient();
const parent = client.queuePath(
  config.GCP_CONFIG.PROJECT,
  config.GCP_CONFIG.LOCATION,
  config.CLOUD_TASKS.QUEUE,
);

em.on('SomeEvent', async (val) => {
  const payload = JSON.stringify({
    key: val,
  });
  // Encode the JSON body as base64 for the request's bytes field
  const body = Buffer.from(payload).toString('base64');
  const task = {
    httpRequest: {
      httpMethod: 'POST',
      url: config.CLOUD_TASKS.FUNCTION_URL,
      oidcToken: {
        serviceAccountEmail: config.CLOUD_TASKS.SERVICE_ACCOUNT_EMAIL,
      },
      headers: {
        'Content-Type': 'application/json',
      },
      body,
    },
    scheduleTime: {
      // Whole seconds since the epoch, 1 day later
      seconds: Math.floor(Date.now() / 1000) + 86400,
    },
  };
  try {
    await client.createTask({ parent, task });
  } catch (err) {
    winston.error(`Error in creating task: ${err}`);
  }
});

// you can then emit the event above in the application flow
3 things we noticed:
- For Python, we have to convert the Python datetime using the Google Protobuf Timestamp class before passing it in as the schedule_time parameter. Node, on the other hand, can derive the seconds value directly from JavaScript's native Date object.
- For task deduplication, you can use the “name” key in the task object to create the task with the specified name. The name has to be in the following format:
projects/{PROJECT_NAME}/locations/{LOCATION}/queues/{QUEUE_ID}/tasks/{TASK_NAME}
- As per each language's best practices, Python uses snake_case while JavaScript uses camelCase for the keys of the task object.
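To make the deduplication point more concrete, here is a minimal sketch of how a deterministic task name could be built. The helper `build_task_name` and the choice of key (candidate ID plus reminder number) are assumptions for illustration, not something taken from our application. The key is hashed so the task ID only contains hex characters, which stays within the characters Cloud Tasks accepts for task IDs (letters, digits, hyphens, underscores).

```python
import hashlib


def build_task_name(project, location, queue, candidate_id, reminder_number):
    """Return a deterministic, fully qualified Cloud Tasks task name."""
    # Hypothetical business key: one task per candidate per reminder.
    key = f'{candidate_id}:{reminder_number}'
    task_id = hashlib.sha256(key.encode()).hexdigest()
    return (
        f'projects/{project}/locations/{location}'
        f'/queues/{queue}/tasks/{task_id}'
    )


# Example values only; substitute your own project, location and queue.
name = build_task_name('my-project', 'us-central1', 'sample-queue', 'cand-42', 1)
```

Because the same inputs always produce the same name, setting `task['name'] = name` before calling `create_task` should cause Cloud Tasks to reject a second attempt to enqueue the same reminder.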
Summary
We use Google Cloud Tasks to support the queueing of event-driven scheduled tasks. Another potential use case is scheduling a reminder/notification to a trial user a couple of days before the subscription expires (vs. scanning the entire user base to see which accounts expire in X days). The event-driven nature of this solution allows us to be more efficient and only consume the resources we need for a given task.
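As a rough illustration of the trial-expiry use case, the schedule time can be computed once, at the moment the trial starts, instead of being discovered later by a scan. The function name `trial_reminder_time` and the 2-day default are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def trial_reminder_time(expires_at, days_before=2, now=None):
    """Return when to send the reminder, or None if that moment has passed."""
    now = now or datetime.now(timezone.utc)
    remind_at = expires_at - timedelta(days=days_before)
    return remind_at if remind_at > now else None
```

A non-None result could then be passed as `payload_date` to `create_http_task_with_token`. Given the 30-day limit mentioned earlier, trials that end further out than that would likely need a different approach.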
