Serverless data workflow

Streaming Data with AWS Kinesis and Lambda

Maksim Pecherskiy

Data Engineer

A look back

z8HRk.png

Current partitioning

FMktI.png

Filtering out speeders

IPwGK.png

Aggregating by day

tvBXx.png

Open recordReaderS3

aIahM.png

Editing recordReaderS3

...
import pytz
from datetime import datetime
tz = pytz.timezone('America/Los_Angeles')

Editing recordReaderS3

def record_created_handler(event, context):
    ...
    ## Get top speeds that exceed the limit of 45
    too_fast = top_speeds.loc[top_speeds.speed > SPEED_ALERT_THRESHOLD, :]

    ## Generate object key, e.g. speeders/20200625/14H05M09S.csv
    fdate = datetime.now(tz).strftime("%Y%m%d/%HH%MM%SS")
    obj_key = f"speeders/{fdate}.csv"  # filename in speeders folder

    ## Write the object to S3 as a space-delimited CSV
    s3.put_object(Bucket='sd-vehicle-data', Key=obj_key,
                  Body=too_fast.to_csv(sep=" ", index=False))

Writing speeders by date and time

s3QY4.png
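
The date-first key layout is what later lets the aggregator select one day's files with a single prefix. A minimal sketch of that naming scheme, using only the standard library and a fixed, illustrative timestamp; the helper name `speeder_key` is hypothetical and not part of the course code:

```python
from datetime import datetime


def speeder_key(moment: datetime) -> str:
    """Build the object key for one batch of speeders."""
    # %H, %M and %S are each followed by a literal H, M or S character.
    fdate = moment.strftime("%Y%m%d/%HH%MM%SS")
    return f"speeders/{fdate}.csv"


# Fixed timestamp so the result is reproducible (illustrative value).
key = speeder_key(datetime(2020, 6, 25, 14, 5, 9))
print(key)  # speeders/20200625/14H05M09S.csv

# Every key written on the same day shares this prefix.
day_prefix = "speeders/20200625"
print(key.startswith(day_prefix))  # True
```

Because the date comes before the time in the key, a prefix such as `speeders/20200625` matches all of that day's files and nothing else.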

speederAggregator

tvBXx.png

Create speederAggregator

3pGYX.png

Add AWS data wrangler layer

7TO0V.png

speederAggregator Resources

sO0op.png

Create the timed trigger

4Qi4i.png

y0FkQ.png

cron(50 6 * * ? *)
  • On Minute 50
  • Of the 6th hour (UTC)
  • Of Every Day of the Month
  • Of Every Month
  • Of Every Day of the Week
  • Of Every Year
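
The six fields above can be pulled apart in a few lines. This is a rough sketch, not AWS's own parser: the `parse_schedule` helper and the example date are assumptions for illustration. In AWS cron expressions the `?` means "no specific value", and it is required in either the day-of-month or the day-of-week field. The sketch also converts the 06:50 UTC fire time to Pacific time using fixed offsets (UTC-7 for daylight time, UTC-8 for standard time) rather than `pytz`, so it runs with the standard library alone.

```python
from datetime import datetime, timedelta, timezone

FIELDS = ["minutes", "hours", "day_of_month", "month", "day_of_week", "year"]


def parse_schedule(expression: str) -> dict:
    """Split an expression like 'cron(50 6 * * ? *)' into named fields."""
    text = expression.strip()
    if not (text.startswith("cron(") and text.endswith(")")):
        raise ValueError("expected an expression of the form cron(...)")
    values = text[len("cron("):-1].split()
    if len(values) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields, got {len(values)}")
    return dict(zip(FIELDS, values))


schedule = parse_schedule("cron(50 6 * * ? *)")
print(schedule["minutes"], schedule["hours"])  # 50 6

# Fire time on an arbitrary example date, expressed in UTC.
fire_utc = datetime(2020, 6, 25, int(schedule["hours"]),
                    int(schedule["minutes"]), tzinfo=timezone.utc)

# Fixed offsets standing in for America/Los_Angeles.
PDT = timezone(timedelta(hours=-7))  # daylight time
PST = timezone(timedelta(hours=-8))  # standard time
local_pdt = fire_utc.astimezone(PDT).strftime("%Y-%m-%d %H:%M")
local_pst = fire_utc.astimezone(PST).strftime("%Y-%m-%d %H:%M")
print(local_pdt)  # 2020-06-24 23:50
print(local_pst)  # 2020-06-24 22:50
```

Under either offset the trigger fires late in the evening, Pacific time, of the previous calendar day, so a date computed in `America/Los_Angeles` at that moment still refers to the day that is just ending.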

speederAggregator callback

import boto3, pytz, pandas as pd
from datetime import datetime

s3 = boto3.client("s3" ...)
...

def speeder_aggregator(event, context):
  tz = pytz.timezone('America/Los_Angeles')
  # Today's date in local time, matching the speeders/ key prefix
  filter_date = datetime.now(tz).strftime("%Y%m%d")

speederAggregator callback

def speeder_aggregator(event, context):
  ...
  objects = s3.list_objects_v2(
      Bucket='sd-vehicle-data', Prefix=f'speeders/{filter_date}')
  day_data = []

  for obj in objects['Contents']:
      print(obj['Key'])
      day_record = s3.get_object(Bucket='sd-vehicle-data', Key = obj['Key'])
      day_data.append(
          pd.read_csv(day_record['Body'], delimiter = " "))

  # Concatenate new records into a single dataframe.
  data = pd.concat(day_data)
  data.columns = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]

awswrangler package

import boto3
import awswrangler as wr

session = boto3.Session(aws_access_key_id = AWS_KEY,
                        aws_secret_access_key = AWS_SECRET, region_name="us-east-1")
# Read every file under the day's prefix; the files were written with sep=" "
df = wr.s3.read_csv(f"s3://sd-vehicle-data/speeders/{filter_date}", sep=" ")
df.head()

Old way vs awswrangler

def speeder_aggregator(event, context):
  ...
  objects = s3.list_objects_v2(
      Bucket='sd-vehicle-data', Prefix=f'speeders/{filter_date}')
  day_data = []

  for obj in objects['Contents']:
      print(obj['Key'])
      day_record = s3.get_object(Bucket='sd-vehicle-data', Key = obj['Key'])
      day_data.append(
          pd.read_csv(day_record['Body'], delimiter = " "))

  # Concatenate new records into a single dataframe.
  data = pd.concat(day_data)
  data.columns = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]
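
Two details of `list_objects_v2` are worth keeping in mind with the loop above: a single call returns at most 1,000 keys, and the response has no `'Contents'` entry when nothing matches the prefix, so `objects['Contents']` would raise a `KeyError` on a day with no speeders. The sketch below shows one way to handle both. The `iter_keys` helper and the `FakeS3` class are hypothetical; `FakeS3` only imitates the response fields used here (`Contents`, `IsTruncated`, `NextContinuationToken`) so the example runs without AWS credentials.

```python
def iter_keys(client, bucket, prefix):
    """Yield every object key under a prefix, following continuation tokens."""
    kwargs = {"Bucket": bucket, "Prefix": prefix}
    while True:
        page = client.list_objects_v2(**kwargs)
        # 'Contents' is absent when the page holds no matching objects.
        for obj in page.get("Contents", []):
            yield obj["Key"]
        if not page.get("IsTruncated"):
            break
        kwargs["ContinuationToken"] = page["NextContinuationToken"]


class FakeS3:
    """Hypothetical stand-in for the boto3 S3 client; serves two keys per page."""

    def __init__(self, keys, page_size=2):
        self.keys = sorted(keys)
        self.page_size = page_size

    def list_objects_v2(self, Bucket, Prefix, ContinuationToken=None):
        matching = [k for k in self.keys if k.startswith(Prefix)]
        start = int(ContinuationToken or 0)
        chunk = matching[start:start + self.page_size]
        end = start + len(chunk)
        page = {"KeyCount": len(chunk), "IsTruncated": end < len(matching)}
        if chunk:
            page["Contents"] = [{"Key": k} for k in chunk]
        if page["IsTruncated"]:
            page["NextContinuationToken"] = str(end)
        return page


fake = FakeS3([
    "speeders/20200625/10H00M00S.csv",
    "speeders/20200625/11H00M00S.csv",
    "speeders/20200625/12H00M00S.csv",
    "speeders/20200626/09H00M00S.csv",
])
keys = list(iter_keys(fake, "sd-vehicle-data", "speeders/20200625"))
empty = list(iter_keys(fake, "sd-vehicle-data", "speeders/19990101"))
print(len(keys))  # 3
print(empty)      # []
```

With a real client, `iter_keys(s3, 'sd-vehicle-data', f'speeders/{filter_date}')` could replace the direct indexing into `objects['Contents']`.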

Old way vs awswrangler

import boto3
import awswrangler as wr

session = boto3.Session(aws_access_key_id = AWS_KEY,
                        aws_secret_access_key = AWS_SECRET, region_name="us-east-1")

def speeder_aggregator(event, context):
    ...
    # One call reads and concatenates every file under the day's prefix;
    # the files were written with sep=" "
    df = wr.s3.read_csv(f"s3://sd-vehicle-data/speeders/{filter_date}", sep=" ")
    df.head()

Writing aggregate speeders file

wr.s3.to_csv(df, 
             f"s3://sd-vehicle-data/speeders_daily/{filter_date}.csv", 
             boto3_session=session, 
             sep=" ", index=False)

Save and test

agF2b.png

Enable the trigger

COfYY.png

Review

u1Jwm.png

Review

7ZYPV.png

Review

  • Cron for scheduling
  • wr.s3.read_csv()
  • wr.s3.to_csv()

Let's practice!
