Serverless data workflow

Streaming Data with AWS Kinesis and Lambda

Maksim Pecherskiy

Data Engineer

A look back

z8HRk.png

Current partitioning

FMktI.png

Filtering out speeders

IPwGK.png

Aggregating by day

tvBXx.png

Open recordReaderS3

aIahM.png

Editing recordReaderS3

...
import pytz
from datetime import datetime
tz = pytz.timezone('America/Los_Angeles')

Editing recordReaderS3

def record_created_handler(event, context):
    ...
    # Get top speeds that exceed the limit of 45
    too_fast = top_speeds.loc[top_speeds.speed > SPEED_ALERT_THRESHOLD, :]

    # Generate object key: date folder, then time of day (H, M, S are literal letters)
    fdate = datetime.now(tz).strftime("%Y%m%d/%HH%MM%SS")
    obj_key = f"speeders/{fdate}.csv"  # filename in speeders folder

    # Write the object to S3 as a space-delimited CSV
    s3.put_object(Bucket='sd-vehicle-data', Key=obj_key,
                  Body=too_fast.to_csv(sep=" ", index=False))
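
To see what keys this naming scheme yields, here is a minimal sketch that uses a fixed timestamp in place of `datetime.now(tz)`. The helper name `speeder_key` and the sample timestamp are assumptions for illustration. Note that the extra `H`, `M`, and `S` in the format string are literal characters, not format codes.

```python
from datetime import datetime

def speeder_key(moment: datetime) -> str:
    """Build the S3 object key for one batch of speeders (hypothetical helper)."""
    fdate = moment.strftime("%Y%m%d/%HH%MM%SS")
    return f"speeders/{fdate}.csv"

sample = datetime(2020, 9, 10, 14, 30, 15)  # made-up timestamp
key = speeder_key(sample)
print(key)  # speeders/20200910/14H30M15S.csv

# Every key written on the same day shares one date prefix.
day_prefix = f"speeders/{sample.strftime('%Y%m%d')}"
print(key.startswith(day_prefix))  # True
```

Because the date is its own path segment, all of a day's files can be selected with a single prefix.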

Writing speeders by date and time

s3QY4.png

speederAggregator

tvBXx.png

Create speederAggregator

3pGYX.png

Add AWS data wrangler layer

7TO0V.png

speederAggregator Resources

sO0op.png

Create the timed trigger

4Qi4i.png

y0FkQ.png

cron(50 6 * * ? *)
  • On Minute 50
  • Of the 6th hour (UTC)
  • Of Every Day of the Month
  • Of Every Month
  • Of Every Day of the Week
  • Of Every Year
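
The field breakdown above can be sketched in code. This is an illustrative parser only: `parse_aws_cron` is a hypothetical helper, the run date is made up, and the fixed UTC-7 and UTC-8 offsets stand in for Pacific daylight and standard time.

```python
from datetime import datetime, timedelta, timezone

# Field order of an AWS schedule expression, left to right.
FIELDS = ["minutes", "hours", "day_of_month", "month", "day_of_week", "year"]

def parse_aws_cron(expression: str) -> dict:
    """Split 'cron(...)' into its six named fields (hypothetical helper)."""
    inner = expression.strip()
    if inner.startswith("cron(") and inner.endswith(")"):
        inner = inner[len("cron("):-1]
    parts = inner.split()
    if len(parts) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields, got {len(parts)}")
    return dict(zip(FIELDS, parts))

schedule = parse_aws_cron("cron(50 6 * * ? *)")
print(schedule)

# The schedule is in UTC; see what wall-clock time that is further west.
run_utc = datetime(2020, 9, 11, int(schedule["hours"]),
                   int(schedule["minutes"]), tzinfo=timezone.utc)
for label, offset in (("UTC-7", -7), ("UTC-8", -8)):
    local = run_utc.astimezone(timezone(timedelta(hours=offset)))
    print(label, local.strftime("%Y-%m-%d %H:%M"))
# UTC-7 2020-09-10 23:50
# UTC-8 2020-09-10 22:50
```

Under these assumptions the job fires late in the evening Pacific time, so a date computed in `America/Los_Angeles` still refers to the day being aggregated.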

speederAggregator callback

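import boto3, pytz, pandas as pd
from datetime import datetime

s3 = boto3.client("s3" ...)
...

def speeder_aggregator(event, context):
    # Use the local date so the prefix matches the keys written by recordReaderS3
    tz = pytz.timezone('America/Los_Angeles')
    filter_date = datetime.now(tz).strftime("%Y%m%d")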

speederAggregator callback

def speeder_aggregator(event, context):
  ...
  objects = s3.list_objects_v2(
      Bucket='sd-vehicle-data', Prefix=f'speeders/{filter_date}')
  day_data = []

  for obj in objects['Contents']:
      print(obj['Key'])
      day_record = s3.get_object(Bucket='sd-vehicle-data', Key = obj['Key'])
      day_data.append(
          pd.read_csv(day_record['Body'], delimiter = " "))

  # Concatenate new records into a single dataframe.
  data = pd.concat(day_data)
  data.columns = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]
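
One caveat with the loop above: `list_objects_v2` returns at most 1,000 keys per call, and a response for a prefix with no matches has no `Contents` entry at all. A minimal sketch of a more defensive key collector follows. `collect_keys` is a hypothetical helper, and the sample pages are made-up stand-ins for S3 responses.

```python
from typing import Iterable, List

def collect_keys(pages: Iterable[dict]) -> List[str]:
    """Gather object keys from a sequence of list_objects_v2-style pages."""
    keys = []
    for page in pages:
        # 'Contents' is absent when a page has no matching objects.
        for obj in page.get("Contents", []):
            keys.append(obj["Key"])
    return keys

# Made-up pages shaped like S3 listing responses.
fake_pages = [
    {"Contents": [{"Key": "speeders/20200910/14H30M15S.csv"}]},
    {},  # no matching objects on this page
    {"Contents": [{"Key": "speeders/20200910/15H02M41S.csv"}]},
]
print(collect_keys(fake_pages))
```

With boto3, the pages could come from `s3.get_paginator("list_objects_v2").paginate(Bucket='sd-vehicle-data', Prefix=f'speeders/{filter_date}')`.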

awswrangler package

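import boto3
import awswrangler as wr

session = boto3.Session(aws_access_key_id = AWS_KEY,
                        aws_secret_access_key = AWS_SECRET, region_name="us-east-1")
# Read every file under the day's prefix; the files are space-delimited
df = wr.s3.read_csv(f"s3://sd-vehicle-data/speeders/{filter_date}",
                    boto3_session=session, sep=" ")
df.head()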

Old way vs awswrangler

def speeder_aggregator(event, context):
  ...
  objects = s3.list_objects_v2(
      Bucket='sd-vehicle-data', Prefix=f'speeders/{filter_date}')
  day_data = []

  for obj in objects['Contents']:
      print(obj['Key'])
      day_record = s3.get_object(Bucket='sd-vehicle-data', Key = obj['Key'])
      day_data.append(
          pd.read_csv(day_record['Body'], delimiter = " "))

  # Concatenate new records into a single dataframe.
  data = pd.concat(day_data)
  data.columns = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]

Old way vs awswrangler

import awswrangler as wr

session = boto3.Session(aws_access_key_id = AWS_KEY, 
                        aws_secret_access_key = AWS_SECRET, region_name="us-east-1")
def speeder_aggregator(event, context):
    ...
    # One call replaces the list, get, read, and concat steps
    df = wr.s3.read_csv(f"s3://sd-vehicle-data/speeders/{filter_date}",
                        boto3_session=session, sep=" ")
    df.head()

Writing aggregate speeders file

wr.s3.to_csv(df, 
             f"s3://sd-vehicle-data/speeders_daily/{filter_date}.csv", 
             boto3_session=session, 
             sep=" ", index=False)
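
The delimiter has to match on both sides of the pipeline: the per-batch files are written with `sep=" "`, so they must be read and rewritten the same way. The sketch below shows the idea with the standard library only, as a stand-in for the pandas and awswrangler calls. The sample rows and the `merge_parts` helper are made up for illustration.

```python
import csv
import io

def merge_parts(parts, delimiter=" "):
    """Merge delimited text parts that share a header (hypothetical helper)."""
    header = None
    rows = []
    for text in parts:
        reader = csv.reader(io.StringIO(text), delimiter=delimiter)
        part_header = next(reader)  # each part starts with its own header row
        if header is None:
            header = part_header
        rows.extend(reader)
    if header is None:
        return ""  # nothing to merge
    out = io.StringIO()
    writer = csv.writer(out, delimiter=delimiter, lineterminator="\n")
    writer.writerow(header)
    writer.writerows(rows)
    return out.getvalue()

# Two made-up batches, space-delimited like the speeder files.
part_a = "vin speed\nAAA111 52\n"
part_b = "vin speed\nBBB222 61\nCCC333 48\n"
daily = merge_parts([part_a, part_b])
print(daily)
```

Reading these parts with the default comma delimiter would instead treat each line as a single column.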

Save and test

agF2b.png

Enable the trigger

COfYY.png

Review

u1Jwm.png

Review

7ZYPV.png

Review

  • Cron for scheduling
  • wr.s3.read_csv()
  • wr.s3.to_csv()

Let's practice!
