Workflow dati serverless

Dati in streaming con AWS Kinesis e Lambda

Maksim Pecherskiy

Data Engineer

Ripasso

z8HRk.png

Dati in streaming con AWS Kinesis e Lambda

Partizionamento attuale

FMktI.png

Dati in streaming con AWS Kinesis e Lambda

Filtrare gli speeders

IPwGK.png

Dati in streaming con AWS Kinesis e Lambda

Aggregare per giorno

tvBXx.png

Dati in streaming con AWS Kinesis e Lambda

Apri recordReaderS3

aIahM.png

Dati in streaming con AWS Kinesis e Lambda

Modifica recordReaderS3

...
import pytz
from datetime import datetime
tz = pytz.timezone('America/Los_Angeles')
Dati in streaming con AWS Kinesis e Lambda

Modifica recordReaderS3

def record_created_handler(event, context):

... ## Prendi le velocità oltre il limite di 45 too_fast = top_speeds.loc[top_speeds.speed > SPEED_ALERT_THRESHOLD :]
## Genera la chiave dell'oggetto fdate = datetime.now(tz).strftime("%Y%m%d/%HH%MM%SS") obj_key = f"speeders/{fdate}.csv" # filename nella cartella speeders
## Scrivi l'oggetto su S3 s3.put_object(Bucket='sd-vehicle-data', Key=obj_key, Body=too_fast.to_csv(sep=" ", index=False) )
Dati in streaming con AWS Kinesis e Lambda

Scrittura dei speeders per data e ora

s3QY4.png

Dati in streaming con AWS Kinesis e Lambda

speederAggregator

tvBXx.png

Dati in streaming con AWS Kinesis e Lambda

Crea speederAggregator

3pGYX.png

Dati in streaming con AWS Kinesis e Lambda

Aggiungi layer AWS Data Wrangler

7TO0V.png

Dati in streaming con AWS Kinesis e Lambda

Risorse di speederAggregator

sO0op.png

Dati in streaming con AWS Kinesis e Lambda

Crea il trigger pianificato

4Qi4i.png

Dati in streaming con AWS Kinesis e Lambda

y0FkQ.png

cron(50 6 * * ? *)
  • Al minuto 50
  • Dell'ora 6 (UTC)
  • Di ogni giorno del mese
  • Di ogni mese
  • Di ogni giorno della settimana
  • Di ogni anno
Dati in streaming con AWS Kinesis e Lambda

callback di speederAggregator

import boto3, pytz, pandas as pd

s3 = boto3.client("s3" ...)
...

def speeder_aggregator(event, context): tz = pytz.timezone('America/Los_Angeles') filter_date = datetime.now(tz).strftime("%Y%m%d")
Dati in streaming con AWS Kinesis e Lambda

callback di speederAggregator

def speeder_aggregator(event, context):
  ...
  objects = s3.list_objects_v2(
      Bucket='sd-vehicle-data', Prefix=f'speeders/{filter_date}')
  day_data = []

for obj in objects['Contents']: print(obj['Key']) day_record = s3.get_object(Bucket='sd-vehicle-data', Key = obj['Key']) day_data.append( pd.read_csv(day_record['Body'], delimiter = " ")) # Concatena i nuovi record in un unico dataframe. data = pd.concat(day_data) data.columns = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]
Dati in streaming con AWS Kinesis e Lambda

Pacchetto awswrangler

import awswrangler as wr

session = boto3.Session(aws_access_key_id = AWS_KEY, aws_secret_access_key = AWS_SECRET, region_name="us-east-1")
df = wr.s3.read_csv(f"s3://sd-vehicle-data/speeders/{filter_date}") df.head()
Dati in streaming con AWS Kinesis e Lambda

Vecchio metodo vs awswrangler

def speeder_aggregator(event, context):
  ...
  objects = s3.list_objects_v2(
      Bucket='sd-vehicle-data', Prefix=f'speeders/{filter_date}')
  day_data = []

  for obj in objects['Contents']:
      print(obj['Key'])
      day_record = s3.get_object(Bucket='sd-vehicle-data', Key = obj['Key'])
      day_data.append(
          pd.read_csv(day_record['Body'], delimiter = " "))

  # Concatena i nuovi record in un unico dataframe.
  data = pd.concat(day_data)
  data.columns = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]
Dati in streaming con AWS Kinesis e Lambda

Vecchio metodo vs awswrangler

import awswrangler as wr

session = boto3.Session(aws_access_key_id = AWS_KEY, 
                        aws_secret_access_key = AWS_SECRET, region_name="us-east-1")
def speeder_aggregator(event, context):
    df = wr.s3.read_csv(f"s3://sd-vehicle-data/speeders/{filter_date}")      
    df.head()
Dati in streaming con AWS Kinesis e Lambda

Scrivi file aggregato degli speeders

wr.s3.to_csv(df, 
             f"s3://sd-vehicle-data/speeders_daily/{filter_date}.csv", 
             boto3_session=session, 
             sep=" ", index=False)
Dati in streaming con AWS Kinesis e Lambda

Salva e testa

agF2b.png

Dati in streaming con AWS Kinesis e Lambda

Abilita il trigger

COfYY.png

Dati in streaming con AWS Kinesis e Lambda

Riepilogo

u1Jwm.png

Dati in streaming con AWS Kinesis e Lambda

Riepilogo

7ZYPV.png

Dati in streaming con AWS Kinesis e Lambda

Riepilogo

  • Cron per la pianificazione
  • wr.s3.read_csv()
  • wr.s3.write_csv()
Dati in streaming con AWS Kinesis e Lambda

¡Vamos a practicar!

Dati in streaming con AWS Kinesis e Lambda

Preparing Video For Download...