Serverless data workflow

Streaming Data with AWS Kinesis and Lambda

Maksim Pecherskiy

Data Engineer

A look back

z8HRk.png

Current partitioning

FMktI.png

Filtering out speeders

IPwGK.png

Aggregating by day

tvBXx.png

Open recordReaderS3

aIahM.png

Editing recordReaderS3

...
import pytz
from datetime import datetime
tz = pytz.timezone('America/Los_Angeles')

Editing recordReaderS3

def record_created_handler(event, context):
  ...
  ## Get top speeds that exceed the limit of 45
  too_fast = top_speeds.loc[top_speeds.speed > SPEED_ALERT_THRESHOLD, :]

  ## Generate object key
  fdate = datetime.now(tz).strftime("%Y%m%d/%HH%MM%SS")
  obj_key = f"speeders/{fdate}.csv"  # filename in speeders folder

  ## Write the object to S3
  s3.put_object(Bucket='sd-vehicle-data', Key=obj_key,
                Body=too_fast.to_csv(sep=" ", index=False))
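
To make the key layout concrete, here is a minimal sketch of the key-building step on its own. The helper name make_speeder_key and the sample timestamp are assumptions made for illustration; only the format string and the speeders/ prefix come from the slide.

```python
from datetime import datetime


def make_speeder_key(ts: datetime) -> str:
    """Build a key shaped like speeders/<YYYYMMDD>/<HH>H<MM>M<SS>S.csv."""
    # %H, %M and %S are strftime codes; the H, M and S that follow them
    # are literal letters that end up in the key.
    fdate = ts.strftime("%Y%m%d/%HH%MM%SS")
    return f"speeders/{fdate}.csv"


# Hypothetical timestamp, used only to show the resulting key.
key = make_speeder_key(datetime(2020, 3, 14, 6, 50, 5))
print(key)  # speeders/20200314/06H50M05S.csv
```

Because the date comes first in the key, all files written on one day share the prefix speeders/<YYYYMMDD>, which is what the aggregator later filters on.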

Writing speeders by date and time

s3QY4.png

speederAggregator

tvBXx.png

Create speederAggregator

3pGYX.png

Add AWS data wrangler layer

7TO0V.png

speederAggregator Resources

sO0op.png

Create the timed trigger

4Qi4i.png

y0FkQ.png

cron(50 6 * * ? *)
  • On Minute 50
  • Of the 6th hour (UTC)
  • Of Every Day of the Month
  • Of Every Month
  • Of Every Day of the Week
  • Of Every Year
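
The field order in the bullets above can be sketched as a small parser. This is an illustration only: parse_aws_cron and the field names are hypothetical, not part of any AWS library, and the sketch splits the expression without validating the individual values.

```python
def parse_aws_cron(expression: str) -> dict:
    """Split an expression such as 'cron(50 6 * * ? *)' into six named fields."""
    # Field order as listed on the slide: minute, hour, day of month,
    # month, day of week, year.
    names = ["minutes", "hours", "day_of_month", "month", "day_of_week", "year"]
    text = expression.strip()
    if not (text.startswith("cron(") and text.endswith(")")):
        raise ValueError("expected an expression of the form cron(...)")
    values = text[len("cron("):-1].split()
    if len(values) != len(names):
        raise ValueError(f"expected {len(names)} fields, got {len(values)}")
    return dict(zip(names, values))


fields = parse_aws_cron("cron(50 6 * * ? *)")
for name, value in fields.items():
    print(f"{name:>12}: {value}")
```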

speederAggregator callback

import boto3, pytz, pandas as pd
from datetime import datetime

s3 = boto3.client("s3" ...)
...

def speeder_aggregator(event, context):
  tz = pytz.timezone('America/Los_Angeles')
  filter_date = datetime.now(tz).strftime("%Y%m%d")
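
The time zone matters because the trigger fires at 06:50 UTC, when the UTC date is already one day ahead of the date in Los Angeles. The sketch below shows the difference with the standard library only. It assumes a fixed UTC-8 offset as a stand-in for America/Los_Angeles during standard time; the helper name filter_date_for and the sample run time are made up for illustration.

```python
from datetime import datetime, timedelta, timezone

# Assumption: a fixed UTC-8 offset stands in for America/Los_Angeles
# (standard time). The slide code uses pytz, which also handles DST.
PST = timezone(timedelta(hours=-8))


def filter_date_for(run_time_utc: datetime, tz=PST) -> str:
    """Return the YYYYMMDD string for the local date at the time of the run."""
    return run_time_utc.astimezone(tz).strftime("%Y%m%d")


# Hypothetical run time matching the 06:50 UTC schedule.
run_time = datetime(2020, 1, 2, 6, 50, tzinfo=timezone.utc)
print(run_time.strftime("%Y%m%d"))  # 20200102 (UTC date)
print(filter_date_for(run_time))    # 20200101 (local date, 22:50 the evening before)
```

Formatting the UTC time directly would point the aggregator at a folder that is still empty, so the local date is the one to filter on.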

speederAggregator callback

def speeder_aggregator(event, context):
  ...
  objects = s3.list_objects_v2(
      Bucket='sd-vehicle-data', Prefix=f'speeders/{filter_date}')
  day_data = []

  for obj in objects['Contents']:
      print(obj['Key'])
      day_record = s3.get_object(Bucket='sd-vehicle-data', Key = obj['Key'])
      day_data.append(
          pd.read_csv(day_record['Body'], delimiter = " "))

  # Concatenate new records into a single dataframe.
  data = pd.concat(day_data)
  data.columns = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]
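
Two details of list_objects_v2 are worth keeping in mind with this loop: a single call returns at most 1,000 keys, and the response has no 'Contents' entry when nothing matches the prefix. A hedged sketch of a listing helper that covers both cases follows. The name list_keys is hypothetical, and the client is passed in as a parameter so the function can be tried with a stub instead of a real S3 connection.

```python
def list_keys(s3_client, bucket: str, prefix: str) -> list:
    """Return every object key under a prefix, following pagination."""
    # boto3 S3 clients expose a paginator for list_objects_v2 that keeps
    # requesting pages until all matching keys have been returned.
    paginator = s3_client.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # "Contents" is absent when no object matches the prefix.
        for obj in page.get("Contents", []):
            keys.append(obj["Key"])
    return keys
```

With a real client, a call could look like list_keys(s3, 'sd-vehicle-data', f'speeders/{filter_date}'), and the returned keys would feed the get_object loop.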

awswrangler package

import awswrangler as wr

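session = boto3.Session(aws_access_key_id = AWS_KEY,
                        aws_secret_access_key = AWS_SECRET,
                        region_name="us-east-1")
# Pass the session explicitly; sep=" " matches the space-delimited speeder files.
df = wr.s3.read_csv(f"s3://sd-vehicle-data/speeders/{filter_date}",
                    boto3_session=session, sep=" ")
df.head()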

Old way vs awswrangler

def speeder_aggregator(event, context):
  ...
  objects = s3.list_objects_v2(
      Bucket='sd-vehicle-data', Prefix=f'speeders/{filter_date}')
  day_data = []

  for obj in objects['Contents']:
      print(obj['Key'])
      day_record = s3.get_object(Bucket='sd-vehicle-data', Key = obj['Key'])
      day_data.append(
          pd.read_csv(day_record['Body'], delimiter = " "))

  # Concatenate new records into a single dataframe.
  data = pd.concat(day_data)
  data.columns = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]

Old way vs awswrangler

import awswrangler as wr

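session = boto3.Session(aws_access_key_id = AWS_KEY,
                        aws_secret_access_key = AWS_SECRET,
                        region_name="us-east-1")

def speeder_aggregator(event, context):
    ...
    # Pass the session explicitly; sep=" " matches the space-delimited speeder files.
    df = wr.s3.read_csv(f"s3://sd-vehicle-data/speeders/{filter_date}",
                        boto3_session=session, sep=" ")
    df.head()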

Writing aggregate speeders file

wr.s3.to_csv(df, 
             f"s3://sd-vehicle-data/speeders_daily/{filter_date}.csv", 
             boto3_session=session, 
             sep=" ", index=False)

Save and test

agF2b.png

Enable the trigger

COfYY.png

Review

u1Jwm.png

Review

7ZYPV.png

Review

  • Cron for scheduling
  • wr.s3.read_csv()
  • wr.s3.to_csv()

Let's practice!
