Building a Streaming Data Pipeline with Redshift Serverless and Kinesis
An End-To-End Tutorial for Beginners
In this article, I will talk about one of the most popular data pipeline design patterns: event streaming. Among other benefits, it enables lightning-fast data analytics and reporting dashboards that update in real time. I will demonstrate how to achieve this by building a streaming data pipeline with AWS Kinesis and Redshift, which can be deployed with just a few clicks using infrastructure as code. We will use AWS CloudFormation to describe our data platform architecture and simplify deployment.
Imagine that, as a data engineer, you are tasked with creating a data pipeline that connects server event streams with a data warehouse solution (Redshift) to transform the data and create an analytics dashboard.

What is a data pipeline?
A data pipeline is a sequence of data processing steps. The stages are connected by the logical flow of data: each stage generates an output that serves as the input for the following stage.
For example, event data can be created by a source at the back end and pushed into an event stream built with Kinesis Firehose or Kafka. The stream can then feed a number of different consumers or destinations. Streaming is a “must-have” solution for enterprise data because it enables real-time data processing and analytics.
In our use case we can set up an ELT streaming data pipeline into AWS Redshift. An AWS Firehose stream can offer this type of seamless integration, where streaming data is loaded directly into a data warehouse table. The data can then be transformed to build reports, for example with AWS QuickSight as a BI tool.

This tutorial assumes that readers are familiar with the AWS CLI and have basic Python knowledge.
Workflow
1. First, we will create a Kinesis data stream using AWS CloudFormation.
2. We will send sample data events to this event stream using AWS Lambda.
3. Finally, we will provision Redshift Serverless resources and test our streaming pipeline.
1. Create an AWS Kinesis Data Stream
AWS Kinesis Data Streams is Amazon's real-time data streaming solution. It offers great scalability and durability, and makes the data in a stream available to any number of consumers.
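To make the consumer side concrete, below is a minimal `boto3` sketch of reading records back from a stream. The stream name is an assumption matching the naming convention used later in this tutorial; in our pipeline, Redshift streaming ingestion will play the consumer role for us.
import json
import boto3

# Assumed stream name, matching the CloudFormation template below
STREAM_NAME = 'your-data-stream-staging'

kinesis_client = boto3.client('kinesis', region_name='eu-west-1')

# Read the oldest available records from the first shard of the stream
shards = kinesis_client.list_shards(StreamName=STREAM_NAME)['Shards']
shard_iterator = kinesis_client.get_shard_iterator(
    StreamName=STREAM_NAME,
    ShardId=shards[0]['ShardId'],
    ShardIteratorType='TRIM_HORIZON')['ShardIterator']

for record in kinesis_client.get_records(ShardIterator=shard_iterator, Limit=10)['Records']:
    print(json.loads(record['Data']))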
We can create it with a CloudFormation template. The command-line script below uses the AWS CLI to deploy it:
KINESIS_STACK=YourRedshiftDataStream
ENV=staging
aws \
cloudformation deploy \
--template-file kinesis-data-stream.yaml \
--stack-name $KINESIS_STACK \
--capabilities CAPABILITY_IAM \
--parameter-overrides \
"Environment"=$ENV
And the template `kinesis-data-stream.yaml` will look as follows:
AWSTemplateFormatVersion: 2010-09-09
Description: >
  Kinesis Data Stream resources for the streaming data pipeline.
  Repository - https://github.com/your_repository.
Parameters:
  Environment:
    AllowedValues:
      - staging
      - production
    Description: Target environment
    Type: String
    Default: 'staging'
Resources:
  MyKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: !Sub 'your-data-stream-${Environment}'
      RetentionPeriodHours: 24
      StreamModeDetails:
        StreamMode: ON_DEMAND
      # ShardCount: 1
      Tags:
        - Key: Environment
          Value: !Ref Environment
Very simple. If everything goes well, we will see our Kinesis stream being deployed.
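We can also confirm the deployment programmatically. A small `boto3` check (stream name taken from the template above) should report the stream as ACTIVE:
import boto3

kinesis_client = boto3.client('kinesis', region_name='eu-west-1')

# Lightweight status check; expect 'ACTIVE' once CloudFormation has finished
summary = kinesis_client.describe_stream_summary(StreamName='your-data-stream-staging')
print(summary['StreamDescriptionSummary']['StreamStatus'])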

2. Create an AWS Lambda function to simulate an event stream
Now we want to send some events to our Kinesis Data Stream. For this, we can create a serverless application with AWS Lambda. We will use the `boto3` library (the AWS SDK for Python) to build a data connector that writes to AWS Kinesis.

Our application folder structure can look like this:
.
├── app.py
├── config
│   └── staging.yaml
├── env.json
└── requirements.txt
Our `app.py` must be able to send events to the Kinesis Data Stream:
# Make sure boto3 is installed locally, i.e. pip install boto3
import json
import random
from datetime import datetime

import boto3

kinesis_client = boto3.client('kinesis', region_name='eu-west-1')

# Constants:
STREAM_NAME = "your-data-stream-staging"


def lambda_handler(event, context):
    processed = 0
    print(STREAM_NAME)
    try:
        print('Trying to send events to Kinesis...')
        for i in range(0, 5):
            data = get_data()
            print(i, " : ", data)
            kinesis_client.put_record(
                StreamName=STREAM_NAME,
                Data=json.dumps(data),
                PartitionKey="partitionkey")
            processed += 1
    except Exception as e:
        print(e)
    message = 'Successfully processed {} events.'.format(processed)
    return {
        'statusCode': 200,
        'body': {'lambdaResult': message}
    }
We would like to add a helper function to generate some random event data. For instance:
# Helpers:
def get_data():
    return {
        'event_time': datetime.now().isoformat(),
        'event_name': random.choice(['JOIN', 'LEAVE', 'OPEN_CHAT', 'SUBSCRIBE', 'SEND_MESSAGE']),
        'user': round(random.random() * 100)}
We can use the `python-lambda-local` library to run and test the AWS Lambda function locally, like so:
pip install python-lambda-local
cd stack
python-lambda-local -e events_connector/env.json -f lambda_handler events_connector/app.py event.json --timeout 10000
# -e is for environment variables if you choose to use them.
# event.json - sample JSON event to invoke our Lambda with.
`env.json` holds environment variables for the local run, and `event.json` is a sample event payload used to invoke the Lambda locally.
`config/staging.yaml` can contain any environment-specific settings our application might require in the future. For example:
# staging.yaml
Kinesis:
  DataStreamName: your-data-stream-staging
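For example, the Lambda could read these settings with `pyyaml` (added to `requirements.txt` below). This is just a hypothetical sketch of how the config file might be used:
import yaml

# Load environment-specific settings from the config file (hypothetical usage)
with open('config/staging.yaml') as config_file:
    config = yaml.safe_load(config_file)

print(config['Kinesis']['DataStreamName'])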
If you need a `requirements.txt`, it can look like this:
requests==2.28.1
pyyaml==6.0
boto3==1.26.90
python-lambda-local==0.1.13
Run this in your command line:
cd stack
pip install -r events_connector/requirements.txt
This approach is useful because we might want to deploy our serverless application in the cloud and schedule it. We can use a CloudFormation template for this.
When we use a CloudFormation template, the application can be deployed with a shell script like so:
PROFILE=your-aws-profile
STACK_NAME=YourStackNameLive
LAMBDA_BUCKET=your-lambdas-bucket.aws # Make sure it exists
date
TIME=`date +"%Y%m%d%H%M%S"`
base=${PWD##*/}
zp=$base".zip"
echo $zp
rm -f $zp
# Install dependencies into ./package and bundle them into the deployment archive
pip install --target ./package -r requirements.txt
cd package
zip -r ../${base}.zip .
cd $OLDPWD
# Add the Lambda source code, excluding __pycache__
zip -r $zp ./events_connector -x "*__pycache__*"
# Upload the package to S3 so CloudFormation can reference it
aws --profile $PROFILE s3 cp ./${base}.zip s3://${LAMBDA_BUCKET}/events_connector/${base}${TIME}.zip
aws --profile $PROFILE \
cloudformation deploy \
--template-file stack.yaml \
--stack-name $STACK_NAME \
--capabilities CAPABILITY_IAM \
--parameter-overrides \
"StackPackageS3Key"="events_connector/${base}${TIME}.zip" \
"Environment"="staging" \
"Testing"="false"
It is a flexible setup that allows us to create robust CI/CD pipelines.
3. Create Redshift Serverless resources
Now we need to create the Redshift Serverless resources for our streaming data pipeline. We can provision a Redshift Workgroup, create a Namespace and set up the other required resources either manually or with a CloudFormation template.
Redshift Serverless is a data warehouse solution that runs analytics workloads of any size without the need to manage data warehouse infrastructure. It is fast, generating insights from enormous volumes of data in seconds, and it scales automatically to provide quick performance for even the most demanding applications.
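A convenient way to talk to Redshift Serverless from Python is the Redshift Data API, which needs no persistent connection or JDBC/ODBC driver. A minimal sketch, assuming the workgroup and database names match the template defaults below:
import boto3

redshift_data = boto3.client('redshift-data', region_name='eu-west-1')

# Workgroup and database names are assumptions matching the template defaults below
response = redshift_data.execute_statement(
    WorkgroupName='redshiftworkgroup',
    Database='dev',
    Sql='SELECT current_user;')

print(response['Id'])  # statement id, can be polled with describe_statement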

In our case we can deploy the Redshift resources using the CloudFormation template below.
AWSTemplateFormatVersion: 2010-09-09
Parameters:
  DatabaseName:
    Description: The name of the first database in the Amazon Redshift Serverless environment.
    Type: String
    Default: dev
    MaxLength: 127
    AllowedPattern: '[a-zA-Z][a-zA-Z_0-9+.@-]*'
  AdminUsername:
    Description: The administrator's user name for the Redshift Serverless Namespace being created.
    Type: String
    Default: admin
    AllowedPattern: '[a-zA-Z][a-zA-Z_0-9+.@-]*'
  AdminUserPassword:
    Description: The password associated with the admin user.
    Type: String
    NoEcho: 'true'
    Default: Admin123
    MinLength: 8
    MaxLength: 64
    # AllowedPattern: '^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)[^\x00-\x20\x22\x27\x2f\x40\x5c\x7f-\uffff]+'
  NamespaceName:
    Description: A unique identifier that defines the Namespace.
    Default: rswg
    Type: String
    MinLength: 3
    MaxLength: 64
    AllowedPattern: '^[a-z0-9-]+$'
  WorkgroupName:
    Description: A unique identifier that defines the Workgroup.
    Default: redshiftworkgroup
    Type: String
    MinLength: 3
    MaxLength: 64
    AllowedPattern: '^[a-z0-9-]*$'
  BaseRPU:
    Description: Base RPU for the Redshift Serverless Workgroup.
    Type: Number
    MinValue: 8
    MaxValue: 512
    Default: 8
    AllowedValues: [8,16,32,40,48,56,64,72,80,88,96,104,112,120,128,136,144,152,160,168,176,184,192,200,208,216,224,232,240,248,256,264,272,280,288,296,304,312,320,328,336,344,352,360,368,376,384,392,400,408,416,424,432,440,448,456,464,472,480,488,496,504,512]
  PubliclyAccessible:
    Description: Whether the Redshift Serverless instance is publicly accessible.
    Type: String
    Default: true
    AllowedValues:
      - true
      - false
  SubnetId:
    Description: You must have at least three subnets, and they must span three Availability Zones.
    Type: List<AWS::EC2::Subnet::Id>
  SecurityGroupIds:
    Description: The list of SecurityGroupIds in your Virtual Private Cloud (VPC).
    Type: List<AWS::EC2::SecurityGroup::Id>
  LogExportsList:
    Description: Provide comma-separated values from the list "userlog","connectionlog","useractivitylog", e.g. userlog,connectionlog,useractivitylog. If left blank, LogExport is turned off.
    Type: CommaDelimitedList
    Default: userlog,connectionlog,useractivitylog
  EnhancedVpcRouting:
    Description: The value that specifies whether to enable enhanced virtual private cloud (VPC) routing, which forces Amazon Redshift Serverless to route traffic through your VPC.
    Type: String
    AllowedValues:
      - true
      - false
    Default: false
Metadata:
  'AWS::CloudFormation::Interface':
    ParameterGroups:
      - Label:
          default: Namespace parameters
        Parameters:
          - NamespaceName
          - DatabaseName
          - AdminUsername
          - AdminUserPassword
          - IAMRole
          - LogExportsList
      - Label:
          default: Workgroup parameters
        Parameters:
          - WorkgroupName
          - BaseRPU
          - PubliclyAccessible
          - SubnetId
          - SecurityGroupIds
          - EnhancedVpcRouting
    ParameterLabels:
      DatabaseName:
        default: "Database Name"
      AdminUsername:
        default: "Admin User Name"
      AdminUserPassword:
        default: "Admin User Password"
      NamespaceName:
        default: "Namespace"
      WorkgroupName:
        default: "Workgroup"
      BaseRPU:
        default: "Base RPU"
      PubliclyAccessible:
        default: "Publicly accessible"
      SubnetId:
        default: "Subnet Ids (Select 3 Subnet Ids spanning 3 AZs)"
      SecurityGroupIds:
        default: "Security Group Id"
      IAMRole:
        default: "Associate IAM Role"
      EnhancedVpcRouting:
        default: "Enhanced VPC Routing"
      LogExportsList:
        default: "Log Export List"
Resources:
  RedshiftAccessRole:
    Type: AWS::IAM::Role
    Properties:
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
        - arn:aws:iam::aws:policy/AmazonRedshiftAllCommandsFullAccess
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - redshift.amazonaws.com
            Action:
              - sts:AssumeRole
  RedshiftRolePolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: RedshiftRolePolicy
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action: s3:ListAllMyBuckets
            Resource: arn:aws:s3:::*
          - Effect: Allow
            Action:
              - 's3:Get*'
              - 's3:List*'
            Resource: '*'
          - Effect: Allow
            Action: cloudwatch:*
            Resource: "*"
          - Effect: Allow
            Action: kinesis:*
            Resource: "*"
      Roles:
        - !Ref RedshiftAccessRole
  RedshiftServerlessNamespace:
    DependsOn: RedshiftAccessRole
    Type: 'AWS::RedshiftServerless::Namespace'
    Properties:
      AdminUsername:
        Ref: AdminUsername
      AdminUserPassword:
        Ref: AdminUserPassword
      DbName:
        Ref: DatabaseName
      NamespaceName:
        Ref: NamespaceName
      IamRoles:
        - !GetAtt [ RedshiftAccessRole, Arn ]
      LogExports:
        Ref: LogExportsList
  RedshiftServerlessWorkgroup:
    Type: 'AWS::RedshiftServerless::Workgroup'
    Properties:
      WorkgroupName:
        Ref: WorkgroupName
      NamespaceName:
        Ref: NamespaceName
      BaseCapacity:
        Ref: BaseRPU
      PubliclyAccessible:
        Ref: PubliclyAccessible
      SubnetIds:
        Ref: SubnetId
      SecurityGroupIds:
        Ref: SecurityGroupIds
      EnhancedVpcRouting:
        Ref: EnhancedVpcRouting
    DependsOn:
      - RedshiftServerlessNamespace
Outputs:
  ServerlessNamespace:
    Description: Name of the namespace
    Value: !Ref NamespaceName
  ServerlessWorkgroup:
    Description: Name of the workgroup
    Value: !Ref WorkgroupName
Running the following in the command line will deploy this stack:
STACK=YourRedshiftServerless
SUBNETID=subnet-1,subnet-2,subnet-3
SECURITYGROUPIDS=sg-your-security-group
aws \
cloudformation deploy \
--template-file redshift-serverless.yaml \
--stack-name $STACK \
--capabilities CAPABILITY_IAM \
--parameter-overrides \
"SubnetId"=$SUBNETID \
"SecurityGroupIds"=$SECURITYGROUPIDS
Typically we would want to deploy databases in a private subnet. However, in the early stages of development, you might want direct access to Redshift from a dev machine.
This is not recommended for production environments, but in this development case you can start off by putting Redshift into the `default` VPC subnets.
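If you do open the workgroup up this way, the endpoint for your SQL client can be looked up with `boto3` (workgroup name assumed from the template defaults):
import boto3

redshift_serverless = boto3.client('redshift-serverless', region_name='eu-west-1')

# Assumes the default workgroup name from the template above
workgroup = redshift_serverless.get_workgroup(workgroupName='redshiftworkgroup')['workgroup']
print(workgroup['endpoint']['address'], workgroup['endpoint']['port'])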
Now that all the required pipeline resources have been successfully provisioned, we can connect our Kinesis stream to the Redshift data warehouse.
Then we can use SQL statements to create the `kinesis_data` external schema and a materialized view in Redshift:
CREATE EXTERNAL SCHEMA kinesis_data
FROM KINESIS
IAM_ROLE 'arn:aws:iam::123456789:role/rs3-RedshiftAccessRole-1TU31HQNXM0EK';

CREATE MATERIALIZED VIEW "your-stream-view" AUTO REFRESH YES AS
SELECT approximate_arrival_timestamp,
       partition_key,
       shard_id,
       sequence_number,
       refresh_time,
       JSON_PARSE(kinesis_data) AS payload
FROM kinesis_data."your-data-stream-staging";
The first statement registers AWS Kinesis as the source. The second creates a materialized view that exposes the event data sent by our application, with the raw record parsed into a `payload` column.
Make sure the AWS Redshift role has the `AmazonRedshiftAllCommandsFullAccess` AWS managed policy attached, plus a policy like the one below (this is what `RedshiftAccessRole` in our CloudFormation template provides):
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": "s3:ListAllMyBuckets",
            "Resource": "arn:aws:s3:::*",
            "Effect": "Allow"
        },
        {
            "Action": [
                "s3:Get*",
                "s3:List*"
            ],
            "Resource": "*",
            "Effect": "Allow"
        },
        {
            "Action": "cloudwatch:*",
            "Resource": "*",
            "Effect": "Allow"
        },
        {
            "Action": "kinesis:*",
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
}
That’s it. Everything is ready to run the application and simulate the event data stream. These events will appear straight away in the Redshift view we have just created.
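We can also verify this from code. A short sketch using the Redshift Data API, assuming the workgroup and database defaults from our template and the materialized view created above:
import time
import boto3

redshift_data = boto3.client('redshift-data', region_name='eu-west-1')

# Count events per event_name from the SUPER payload column of the materialized view
statement = redshift_data.execute_statement(
    WorkgroupName='redshiftworkgroup',
    Database='dev',
    Sql='SELECT payload.event_name::varchar, COUNT(*) FROM "your-stream-view" GROUP BY 1;')

# Wait for the query to finish, then print the result rows
while redshift_data.describe_statement(Id=statement['Id'])['Status'] not in ('FINISHED', 'FAILED', 'ABORTED'):
    time.sleep(1)

for row in redshift_data.get_statement_result(Id=statement['Id'])['Records']:
    print(row)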


Conclusion
We created a simple and reliable streaming data pipeline from a serverless application built with AWS Lambda to an AWS Redshift data warehouse, where data is ingested and transformed in real time. It lets us capture, process, and store data streams of any size with ease. This setup also works well for machine learning (ML) pipelines, where models examine the data and generate predictions as the streams flow to their destination. We used infrastructure as code to deploy the data pipeline resources, which is the preferable approach for deploying resources across different data environments.