Mike Shakhomirov

Summary

The provided content outlines a tutorial for building a streaming data pipeline using AWS Kinesis and Redshift Serverless, emphasizing the use of infrastructure as code for deployment and the real-time analytics capabilities of the pipeline.

Abstract

The article "Building a Streaming Data Pipeline with Redshift Serverless and Kinesis" serves as a comprehensive guide for beginners to construct a robust data pipeline. It details the process of setting up an event streaming architecture that connects server event streams to a Redshift data warehouse, enabling real-time data analytics. The tutorial emphasizes the use of AWS CloudFormation for infrastructure as code deployment, simplifying the provisioning of resources such as Kinesis Data Streams and AWS Lambda functions to simulate event streams. It also covers the creation of a Redshift Serverless cluster and the integration of these components to facilitate the transformation and visualization of data in real-time. The article underscores the scalability, durability, and performance benefits of using AWS services for streaming data pipelines and highlights the importance of continuous integration and deployment practices in data platform management.

Opinions

  • The author advocates for the use of AWS CloudFormation and infrastructure as code to streamline the deployment of data pipelines.
  • The article suggests that AWS Redshift Serverless is a powerful solution for analytics workloads due to its automatic scaling and fast performance.
  • The author implies that direct access to Redshift from a development machine, while not recommended for production, can be beneficial during early development stages.
  • The tutorial promotes the idea that integrating AWS Kinesis with Redshift enables the creation of a flexible and reliable streaming data pipeline.
  • The author expresses a preference for using AWS managed policies, such as AmazonRedshiftAllCommandsFullAccess, to simplify role configuration for Redshift.
  • The article conveys the importance of robust CI/CD pipelines for data engineering and ML Ops, suggesting that they are essential for modern data platforms.

Building a Streaming Data Pipeline with Redshift Serverless and Kinesis

An End-To-End Tutorial for Beginners

Photo by Sebastian Pandelache on Unsplash

In this article, I will talk about one of the most popular data pipeline design patterns: event streaming. Among other benefits, it enables lightning-fast data analytics and reporting dashboards that update in real time. I will demonstrate how this can be achieved by building a streaming data pipeline with AWS Kinesis and Redshift that can be deployed with just a few clicks using infrastructure as code. We will use AWS CloudFormation to describe our data platform architecture and simplify deployment.

Imagine that as a data engineer, you are tasked to create a data pipeline that connects server event streams with a data warehouse solution (Redshift) to transform the data and create an analytics dashboard.

Pipeline Infrastructure. Image by author.

What is a data pipeline?

A data pipeline is a sequence of data processing steps. Because the stages are connected by a logical data flow, each stage produces an output that serves as the input for the next.

I previously wrote about it in this article:

For example, event data can be created by a source at the back end and pushed into an event stream built with Kinesis Firehose or Kafka. The stream can then feed a number of different consumers or destinations. Streaming is a "must-have" solution for enterprise data because it enables real-time data processing and analytics.

In our use-case scenario we can set up an ELT streaming data pipeline into AWS Redshift. An AWS Firehose stream can offer this type of seamless integration, where streaming data is loaded directly into a data warehouse table. The data can then be transformed to create reports, with AWS QuickSight as a BI tool, for example.

Added BI component. Image by author.

This tutorial assumes that readers are familiar with the AWS CLI and have at least basic Python knowledge.
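
Before starting, it is worth confirming which AWS account and identity your local credentials resolve to. A quick check with `boto3` (assuming your credentials are already configured for the AWS CLI):

import boto3

# Confirm which AWS account and identity the local credentials resolve to.
sts = boto3.client('sts')
identity = sts.get_caller_identity()
print(identity['Account'], identity['Arn'])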

Workflow

1. First, we will create a Kinesis data stream using AWS CloudFormation.

2. We will send sample data events to this event stream using AWS Lambda.

3. Finally, we will provision a Redshift Serverless cluster and test our streaming pipeline.

Create an AWS Kinesis Data Stream

AWS Kinesis Data Streams is Amazon's real-time data streaming service. It offers great scalability and durability, and the data in a stream is available to any number of consumers.

We can create it with a CloudFormation template. The command-line script below uses the AWS CLI to deploy it:

KINESIS_STACK=YourRedshiftDataStream
ENV=staging
aws \
cloudformation deploy \
--template-file kinesis-data-stream.yaml \
--stack-name $KINESIS_STACK \
--capabilities CAPABILITY_IAM \
--parameter-overrides \
"Environment"=$ENV

And the template `kinesis-data-stream.yaml` looks as follows:

AWSTemplateFormatVersion: 2010-09-09
Description: >
  Kinesis Data Stream resources for the streaming data pipeline.
  Repository - https://github.com/your_repository.

Parameters:
  Environment:
    AllowedValues:
      - staging
      - production
    Description: Target environment
    Type: String
    Default: 'staging'

Resources:
  MyKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties: 
      Name: !Sub 'your-data-stream-${Environment}'
      RetentionPeriodHours: 24 
      StreamModeDetails:
        StreamMode: ON_DEMAND
      # ShardCount: 1
      Tags: 
        -
          Key: Environment
          Value: !Ref Environment

Very simple. If everything goes well we will see our Kinesis stream being deployed:

Stream created. Image by author.
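
You can also confirm from code that the stream exists and is active. A minimal `boto3` check, assuming the staging stream name created by the template above:

import boto3

# Confirm the Kinesis stream created by the CloudFormation stack is active.
kinesis = boto3.client('kinesis', region_name='eu-west-1')

summary = kinesis.describe_stream_summary(StreamName='your-data-stream-staging')
print(summary['StreamDescriptionSummary']['StreamStatus'])       # expect ACTIVE
print(summary['StreamDescriptionSummary']['StreamModeDetails'])  # {'StreamMode': 'ON_DEMAND'}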

Create an AWS Lambda function to simulate an event stream

Now we want to send some events to our Kinesis Data Stream. For this, we can create a serverless application with AWS Lambda. We will use the `boto3` library (the AWS SDK for Python) to build a data connector with AWS Kinesis at the source.

Running app locally to simulate event stream. Image by author.

Our application folder structure can look like this:

.
├── app.py
├── config
│   └── staging.yaml
├── env.json
└── requirements.txt

Our `app.py` must be able to send events to the Kinesis Data Stream:

# Make sure boto3 is installed locally, i.e. pip install boto3
import json
import random
from datetime import datetime  # used by the get_data() helper below

import boto3

kinesis_client = boto3.client('kinesis', region_name='eu-west-1')
# Constants:
STREAM_NAME = "your-data-stream-staging"

def lambda_handler(event, context):
    processed = 0
    print(STREAM_NAME)
    try:
        print('Trying to send events to Kinesis...')
        for i in range(0, 5):
            data = get_data()
            print(i, " : ", data)
            kinesis_client.put_record(
                StreamName=STREAM_NAME,
                Data=json.dumps(data),
                PartitionKey="partitionkey")
            processed += 1
    except Exception as e:
        print(e)
    message = 'Successfully processed {} events.'.format(processed)
    return {
        'statusCode': 200,
        'body': { 'lambdaResult': message }
    }


We would like to add a helper function to generate some random event data. For instance:

# Helpers:
def get_data():
    return {
        'event_time': datetime.now().isoformat(),
        'event_name': random.choice(['JOIN', 'LEAVE', 'OPEN_CHAT', 'SUBSCRIBE', 'SEND_MESSAGE']),
        'user': round(random.random() * 100)}

We can use the `python-lambda-local` library to run and test the Lambda function locally, like so:

pip install python-lambda-local
cd stack
python-lambda-local -e events_connector/env.json -f lambda_handler events_connector/app.py event.json --timeout 10000
# -e is for environment variables if you choose to use them.
#  event.json - sample JSON event to invoke our Lambda with.

`env.json` contains environment variables for the local run (passed with `-e`), while `event.json` is a sample event payload used to invoke the Lambda.
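
After invoking the handler, you can double-check that the events actually reached the stream with a quick `boto3` read. This is a minimal sketch, assuming the staging stream created earlier:

import json
import boto3

# Read a few records back from each shard to confirm the events arrived.
kinesis = boto3.client('kinesis', region_name='eu-west-1')
stream_name = 'your-data-stream-staging'

for shard in kinesis.list_shards(StreamName=stream_name)['Shards']:
    iterator = kinesis.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard['ShardId'],
        ShardIteratorType='TRIM_HORIZON',  # read from the oldest available record
    )['ShardIterator']
    for record in kinesis.get_records(ShardIterator=iterator, Limit=5)['Records']:
        print(shard['ShardId'], json.loads(record['Data']))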

`config/staging.yaml` can contain any environment-specific settings our application might require in the future. For example:

# staging.yaml
Kinesis:
  DataStreamName: your-data-stream-staging
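
The application could then load this file with `pyyaml` (already listed in `requirements.txt`). A minimal sketch, assuming the file lives at `config/staging.yaml` as in the folder structure above:

import yaml  # provided by the pyyaml package

# Load environment-specific settings, e.g. the Kinesis stream name.
with open('config/staging.yaml') as config_file:
    config = yaml.safe_load(config_file)

stream_name = config['Kinesis']['DataStreamName']
print(stream_name)  # your-data-stream-staging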

If you need a `requirements.txt`, it can look like this:

requests==2.28.1
pyyaml==6.0
boto3==1.26.90
python-lambda-local==0.1.13

Run this in your command line:

cd stack
pip install -r events_connector/requirements.txt

This approach is useful because we might want to deploy our serverless application in the cloud and schedule it. We can use a CloudFormation template for this. I previously wrote about it here:

When we use a CloudFormation template the application can be deployed with a shell script like so:

PROFILE=your-aws-profile
STACK_NAME=YourStackNameLive
LAMBDA_BUCKET=your-lambdas-bucket.aws # Make sure it exists
 
date
 
TIME=`date +"%Y%m%d%H%M%S"`
 
base=${PWD##*/}
zp=$base".zip"
echo $zp
 
rm -f $zp

pip install --target ./package -r requirements.txt

cd package
zip -r ../${base}.zip .

cd $OLDPWD

zip -r $zp ./events_connector -x "*__pycache__*"
 
aws --profile $PROFILE s3 cp ./${base}.zip s3://${LAMBDA_BUCKET}/events_connector/${base}${TIME}.zip

aws --profile $PROFILE \
cloudformation deploy \
--template-file stack.yaml \
--stack-name $STACK_NAME \
--capabilities CAPABILITY_IAM \
--parameter-overrides \
"StackPackageS3Key"="events_connector/${base}${TIME}.zip" \
"Environment"="staging" \
"Testing"="false"

It is a flexible setup that allows us to create robust CI/CD pipelines. I previously created one in the post below.

Create Redshift Serverless resources

Now we need to create a Redshift Serverless cluster for our streaming data pipeline. We can provision a Redshift Workgroup, a Namespace, and the other required resources either manually or with a CloudFormation template.

Redshift Serverless is a data warehouse solution that lets us run analytics workloads of any size without managing data warehouse infrastructure. Redshift is fast and generates insights from enormous volumes of data in seconds, and it scales automatically to provide quick performance for even the most demanding applications.


In our case we can deploy the Redshift resources using CloudFormation template definitions.

AWSTemplateFormatVersion: 2010-09-09
Parameters:
  DatabaseName:
    Description: The name of the first database in the Amazon Redshift Serverless environment.
    Type: String
    Default: dev
    MaxLength: 127
    AllowedPattern: '[a-zA-Z][a-zA-Z_0-9+.@-]*'
  AdminUsername:
    Description: The administrator's user name for Redshift Serverless Namespace being created.
    Type: String
    Default: admin
    AllowedPattern: '[a-zA-Z][a-zA-Z_0-9+.@-]*'
  AdminUserPassword:
    Description: The password associated with admin user.
    Type: String
    NoEcho: 'true'
    Default: Admin123
    MinLength: 8
    MaxLength: 64
    # AllowedPattern: '^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)[^\x00-\x20\x22\x27\x2f\x40\x5c\x7f-\uffff]+'
  NamespaceName:
    Description: A unique identifier that defines the Namespace.
    Default: rswg
    Type: String
    MinLength: 3
    MaxLength: 64
    AllowedPattern: '^[a-z0-9-]+$'
  WorkgroupName:
    Description: A unique identifier that defines the Workgroup.
    Default: redshiftworkgroup
    Type: String
    MinLength: 3
    MaxLength: 64
    AllowedPattern: '^[a-z0-9-]*$'
  BaseRPU:
    Description: Base RPU for Redshift Serverless Workgroup.
    Type: Number
    MinValue: 8
    MaxValue: 512
    Default: 8
    AllowedValues: [8,16,32,40,48,56,64,72,80,88,96,104,112,120,128,136,144,152,160,168,176,184,192,200,208,216,224,232,240,248,256,264,272,280,288,296,304,312,320,328,336,344,352,360,368,376,384,392,400,408,416,424,432,440,448,456,464,472,480,488,496,504,512]
  PubliclyAccessible:
    Description: Redshift Serverless instance to be publicly accessible.
    Type: String
    Default: true
    AllowedValues:
      - true
      - false

  SubnetId:
    Description: You must have at least three subnets, and they must span across three Availability Zones
    Type: List<AWS::EC2::Subnet::Id>
  SecurityGroupIds:
    Description: The list of SecurityGroupIds in your Virtual Private Cloud (VPC).
    Type: List<AWS::EC2::SecurityGroup::Id>
  LogExportsList:
    Description: Provide comma-separated values from the list "userlog","connectionlog","useractivitylog", e.g. userlog,connectionlog,useractivitylog. If left blank, LogExport is turned off.
    Type: CommaDelimitedList 
    Default: userlog,connectionlog,useractivitylog
  EnhancedVpcRouting:
    Description: The value that specifies whether to enable enhanced virtual private cloud (VPC) routing, which forces Amazon Redshift Serverless to route traffic through your VPC.
    Type: String
    AllowedValues:
      - true
      - false
    Default: false    
Metadata:
  'AWS::CloudFormation::Interface':
    ParameterGroups:
      - Label:
          default: Namespace parameters
        Parameters:
          - NamespaceName
          - DatabaseName
          - AdminUsername
          - AdminUserPassword
          - IAMRole
          - LogExportsList          
      - Label:
          default: Workgroup parameters
        Parameters:
            - WorkgroupName
            - BaseRPU
            - PubliclyAccessible
            - SubnetId
            - SecurityGroupIds
            - EnhancedVpcRouting            
    ParameterLabels:
      DatabaseName:
        default: "Database Name"
      AdminUsername:
        default: "Admin User Name"
      AdminUserPassword:
        default: "Admin User Password"
      NamespaceName:
        default: "Namespace"
      WorkgroupName:
        default: "Workgroup"
      BaseRPU:
        default: "Base RPU"
      PubliclyAccessible:
        default: "Publicly accessible"
      SubnetId:
        default: "Subnet Ids (Select 3 Subnet Ids spanning 3 AZs)"
      SecurityGroupIds:
        default: "Security Group Id"
      IAMRole:
        default: "Associate IAM Role"
      EnhancedVpcRouting:
        default: "Enhanced VPC Routing"  
      LogExportsList:
        default: "Log Export List"
Resources:
  RedshiftAccessRole:
    Type: AWS::IAM::Role
    Properties:
      ManagedPolicyArns: 
          - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
          - arn:aws:iam::aws:policy/AmazonRedshiftAllCommandsFullAccess

      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          -
            Effect: Allow
            Principal:
              Service:
                - redshift.amazonaws.com
            Action:
              - sts:AssumeRole
  RedshiftRolePolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: RedshiftRolePolicy
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          -
            Effect: Allow
            Action: s3:ListAllMyBuckets
            Resource: arn:aws:s3:::*
          -
            Effect: Allow
            Action:
              - 's3:Get*'
              - 's3:List*'
            Resource: '*'
          -
            Effect: Allow
            Action: cloudwatch:*
            Resource: "*"
          -
            Effect: Allow
            Action: kinesis:*
            Resource: "*"
      Roles:
        - !Ref RedshiftAccessRole
  RedshiftServerlessNamespace:
    DependsOn: RedshiftAccessRole
    Type: 'AWS::RedshiftServerless::Namespace'
    Properties:
      AdminUsername:
        Ref: AdminUsername
      AdminUserPassword:
        Ref: AdminUserPassword
      DbName:
        Ref: DatabaseName
      NamespaceName:
        Ref: NamespaceName
      IamRoles:
        - !GetAtt [ RedshiftAccessRole, Arn ]
      LogExports:
        Ref: LogExportsList        
  RedshiftServerlessWorkgroup:
    Type: 'AWS::RedshiftServerless::Workgroup'
    Properties:
      WorkgroupName:
        Ref: WorkgroupName
      NamespaceName:
        Ref: NamespaceName
      BaseCapacity:
        Ref: BaseRPU
      PubliclyAccessible:
        Ref: PubliclyAccessible
      SubnetIds:
        Ref: SubnetId
      SecurityGroupIds:
        Ref: SecurityGroupIds
      EnhancedVpcRouting:
        Ref: EnhancedVpcRouting        
    DependsOn:
      - RedshiftServerlessNamespace
Outputs:
  ServerlessNamespace:
    Description: Name of the namespace
    Value: !Ref NamespaceName
  ServerlessWorkgroup:
    Description: Name of the workgroup
    Value: !Ref WorkgroupName

So if we run this in the command line it will deploy this stack:

STACK=YourRedshiftServerless
SUBNETID=subnet-1,subnet-2,subnet-3
SECURITYGROUPIDS=sg-your-security-group
aws \
cloudformation deploy \
--template-file redshift-serverless.yaml \
--stack-name $STACK \
--capabilities CAPABILITY_IAM \
--parameter-overrides \
"SubnetId"=$SUBNETID \
"SecurityGroupIds"=$SECURITYGROUPIDS

Typically we would want to deploy databases in a private subnet. However, in the early stages of development, you might want direct access to Redshift from your dev machine.

This is not recommended for production environments, but in this development case, you can start off by putting Redshift into your `default` VPC subnet.
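
If you do take that shortcut, the subnet and security group IDs for the deploy script above can be looked up with `boto3` rather than in the console. A minimal sketch, assuming a default VPC exists in the region:

import boto3

# Look up the default VPC's subnets and its default security group to use as
# SUBNETID and SECURITYGROUPIDS in the Redshift Serverless deploy script above.
ec2 = boto3.client('ec2', region_name='eu-west-1')

subnets = ec2.describe_subnets(
    Filters=[{'Name': 'default-for-az', 'Values': ['true']}])['Subnets']
print(','.join(subnet['SubnetId'] for subnet in subnets))  # SUBNETID

vpc_id = subnets[0]['VpcId']
groups = ec2.describe_security_groups(
    Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]},
             {'Name': 'group-name', 'Values': ['default']}])['SecurityGroups']
print(groups[0]['GroupId'])  # SECURITYGROUPIDS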

Now that all required pipeline resources have been successfully provisioned, we can connect our Kinesis stream to the Redshift data warehouse.

Then we can use SQL statements to create the `kinesis_data` schema in Redshift:

CREATE EXTERNAL SCHEMA kinesis_data
FROM KINESIS
IAM_ROLE 'arn:aws:iam::123456789:role/rs3-RedshiftAccessRole-1TU31HQNXM0EK';
CREATE MATERIALIZED VIEW "your-stream-view" AUTO REFRESH YES AS
    SELECT approximate_arrival_timestamp,
           partition_key,
           shard_id,
           sequence_number,
           refresh_time,
           JSON_PARSE(kinesis_data) as payload
      FROM kinesis_data."your-data-stream-staging";

The first statement sets AWS Kinesis as the source. The second one creates a materialized view exposing the event data from our application.

Make sure to create the AWS Redshift role with the `AmazonRedshiftAllCommandsFullAccess` AWS managed policy attached. The role's inline policy, as defined in the CloudFormation template above, looks like this:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": "s3:ListAllMyBuckets",
            "Resource": "arn:aws:s3:::*",
            "Effect": "Allow"
        },
        {
            "Action": [
                "s3:Get*",
                "s3:List*"
            ],
            "Resource": "*",
            "Effect": "Allow"
        },
        {
            "Action": "cloudwatch:*",
            "Resource": "*",
            "Effect": "Allow"
        },
        {
            "Action": "kinesis:*",
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
}

That’s it. Everything is ready to run the application to simulate the event data stream. These events will appear straight away in the Redshift view we have just created:

Application running locally. Image by author.
Example view with events from our application. Image by author.
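
If you prefer to query the view from code rather than the Redshift query editor, the Redshift Data API works with the Serverless workgroup. A minimal sketch, assuming the workgroup and database names from the template above and an IAM identity that is allowed to use the Data API:

import time
import boto3

# Query the streaming materialized view through the Redshift Data API.
client = boto3.client('redshift-data', region_name='eu-west-1')

statement = client.execute_statement(
    WorkgroupName='redshiftworkgroup',
    Database='dev',
    Sql='SELECT approximate_arrival_timestamp, payload '
        'FROM "your-stream-view" '
        'ORDER BY approximate_arrival_timestamp DESC LIMIT 5;',
)

# Poll until the statement finishes, then print the rows.
while True:
    description = client.describe_statement(Id=statement['Id'])
    if description['Status'] in ('FINISHED', 'FAILED', 'ABORTED'):
        break
    time.sleep(1)

if description['Status'] == 'FINISHED':
    result = client.get_statement_result(Id=statement['Id'])
    for row in result['Records']:
        print(row)
else:
    print(description.get('Error'))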

Conclusion

We created a simple and reliable streaming data pipeline from a serverless application built with AWS Lambda to an AWS Redshift data warehouse, where data is transformed and ingested in real time. It captures, processes, and stores data streams of any size with ease. It is great for any machine learning (ML) pipeline where models examine data and produce predictions as streams flow to their destination. We used infrastructure as code to deploy the data pipeline resources, which is the preferred approach when deploying resources across different data environments.
