Serverless data workflow

Streaming de données avec AWS Kinesis et Lambda

Maksim Pecherskiy

Data Engineer

A look back

z8HRk.png

Streaming de données avec AWS Kinesis et Lambda

Current partitioning

FMktI.png

Streaming de données avec AWS Kinesis et Lambda

Filtering out speeders

IPwGK.png

Streaming de données avec AWS Kinesis et Lambda

Aggregating by day

tvBXx.png

Streaming de données avec AWS Kinesis et Lambda

Open recordReaderS3

aIahM.png

Streaming de données avec AWS Kinesis et Lambda

Editing recordReaderS3

...
import pytz
from datetime import datetime
tz = pytz.timezone('America/Los_Angeles')
Streaming de données avec AWS Kinesis et Lambda

Editing recordReaderS3

def record_created_handler(event, context):

... ## Get top speeds that exceed the limit of 45 too_fast = top_speeds.loc[top_speeds.speed > SPEED_ALERT_THRESHOLD :]
## Generate object key fdate = datetime.now(tz).strftime("%Y%m%d/%HH%MM%SS") obj_key = f"speeders/{fdate}.csv" # filename in speeders folder
## Write the object to S3 s3.put_object(Bucket='sd-vehicle-data', Key=obj_key, Body=too_fast.to_csv(sep=" ", index=False) )
Streaming de données avec AWS Kinesis et Lambda

Writing speeders by date and time

s3QY4.png

Streaming de données avec AWS Kinesis et Lambda

speederAggregator

tvBXx.png

Streaming de données avec AWS Kinesis et Lambda

Create speederAggregator

3pGYX.png

Streaming de données avec AWS Kinesis et Lambda

Add AWS data wrangler layer

7TO0V.png

Streaming de données avec AWS Kinesis et Lambda

speederAggregator Resources

sO0op.png

Streaming de données avec AWS Kinesis et Lambda

Create the timed trigger

4Qi4i.png

Streaming de données avec AWS Kinesis et Lambda

y0FkQ.png

cron(50 6 * * ? *)
  • On Minute 50
  • Of the 6th hour (UTC)
  • Of Every Day of the Month
  • Of Every Month
  • Of Every Day of the Week
  • Of Every Year
Streaming de données avec AWS Kinesis et Lambda

speederAggregator callback

import boto3, pytz, pandas as pd

s3 = boto3.client("s3" ...)
...

def speeder_aggregator(event, context): tz = pytz.timezone('America/Los_Angeles') filter_date = datetime.now(tz).strftime("%Y%m%d")
Streaming de données avec AWS Kinesis et Lambda

speederAggregator callback

def speeder_aggregator(event, context):
  ...
  objects = s3.list_objects_v2(
      Bucket='sd-vehicle-data', Prefix=f'speeders/{filter_date}')
  day_data = []

for obj in objects['Contents']: print(obj['Key']) day_record = s3.get_object(Bucket='sd-vehicle-data', Key = obj['Key']) day_data.append( pd.read_csv(day_record['Body'], delimiter = " ")) # Concatenate new records into a single dataframe. data = pd.concat(day_data) data.columns = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]
Streaming de données avec AWS Kinesis et Lambda

awswrangler package

import awswrangler as wr

session = boto3.Session(aws_access_key_id = AWS_KEY, aws_secret_access_key = AWS_SECRET, region_name="us-east-1")
df = wr.s3.read_csv(f"s3://sd-vehicle-data/speeders/{filter_date}") df.head()
Streaming de données avec AWS Kinesis et Lambda

Old way vs awswrangler

def speeder_aggregator(event, context):
  ...
  objects = s3.list_objects_v2(
      Bucket='sd-vehicle-data', Prefix=f'speeders/{filter_date}')
  day_data = []

  for obj in objects['Contents']:
      print(obj['Key'])
      day_record = s3.get_object(Bucket='sd-vehicle-data', Key = obj['Key'])
      day_data.append(
          pd.read_csv(day_record['Body'], delimiter = " "))

  # Concatenate new records into a single dataframe.
  data = pd.concat(day_data)
  data.columns = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]
Streaming de données avec AWS Kinesis et Lambda

Old way vs awswrangler

import awswrangler as wr

session = boto3.Session(aws_access_key_id = AWS_KEY, 
                        aws_secret_access_key = AWS_SECRET, region_name="us-east-1")
def speeder_aggregator(event, context):
    df = wr.s3.read_csv(f"s3://sd-vehicle-data/speeders/{filter_date}")      
    df.head()
Streaming de données avec AWS Kinesis et Lambda

Writing aggregate speeders file

wr.s3.to_csv(df, 
             f"s3://sd-vehicle-data/speeders_daily/{filter_date}.csv", 
             boto3_session=session, 
             sep=" ", index=False)
Streaming de données avec AWS Kinesis et Lambda

Save and test

agF2b.png

Streaming de données avec AWS Kinesis et Lambda

Enable the trigger

COfYY.png

Streaming de données avec AWS Kinesis et Lambda

Review

u1Jwm.png

Streaming de données avec AWS Kinesis et Lambda

Review

7ZYPV.png

Streaming de données avec AWS Kinesis et Lambda

Review

  • Cron for scheduling
  • wr.s3.read_csv()
  • wr.s3.write_csv()
Streaming de données avec AWS Kinesis et Lambda

Let's practice!

Streaming de données avec AWS Kinesis et Lambda

Preparing Video For Download...