La tua prima Lambda live!

Dati in streaming con AWS Kinesis e Lambda

Maksim Pecherskiy

Data Engineer

Lezione precedente

Architettura della lezione precedente

Dati in streaming con AWS Kinesis e Lambda

Questa lezione

Obiettivi della lezione

Dati in streaming con AWS Kinesis e Lambda

Common log format

bc75da5f-1bf6-444c-80ad-49c180e1b8de 23:16:06.000 3FTEX1G5XAK844393 -76.6990172 2.481207 40
ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.39239199999999 36.097577 90
f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.58068100000001 34.79925 37
Dati in streaming con AWS Kinesis e Lambda

Lettura di file S3

import pandas as pd

results in:

Response:
{
  "errorMessage": "Unable to import module 'lambda_function': No module named 'pandas'",
  "errorType": "Runtime.ImportModuleError"
}
Dati in streaming con AWS Kinesis e Lambda

Aggiornare l'handler

#lambda_function.py
import json, boto3, pandas as pd
# Inizializza i client
...

SPEED_ALERT_THRESHOLD = 45 ALERT_PHONE_NUMBER = "+1234567890"
# Funzione di supporto per ottenere un dataframe dai record scritti def get_new_data(event): pd.read_csv()... ... return data
# Handler della Lambda def record_created_handler(event, context): data = get_new_data(event) ... sns.publish() ...
Dati in streaming con AWS Kinesis e Lambda

get_new_data()

def get_new_data(event):
    # Crea una lista per salvare le chiavi dei nuovi oggetti.
    written_objects = []

    # Itera su ogni record evento S3.
    for record in event['Records']:

        # Variabili da verificare
        event_name = record['eventName']
        bucket_name = record['s3']['bucket']['name']
        obj_key = record['s3']['object']['key']
Dati in streaming con AWS Kinesis e Lambda

get_new_data()

def get_new_data(event):
        ...
        # Verifica che l'evento provenga dal bucket sd-vehicle-data.
        if event_name == 'ObjectCreated:Put' and bucket_name == 'sd-vehicle-data':

obj = s3.get_object(Bucket=bucket_name, Key = obj_key) df = pd.read_csv(obj['Body'], delimiter = " ", names=["record_id", "timestamp", "vin", "lon", "lat", "speed"])
written_objects.append(df)
# Concatena i nuovi record in un unico dataframe. return pd.concat(written_objects)
Dati in streaming con AWS Kinesis e Lambda

record_created_handler()

SPEED_ALERT_THRESHOLD = 45
ALERT_PHONE_NUMBER = "+1234567890"
...
def record_created_handler(event, context):
    # Chiama l'helper
    data = get_new_data(event)

## Velocità massime top_speeds = data.groupby(['vin'])['speed'].max().reset_index()
## Oltre il limite di 45 too_fast = top_speeds.loc[top_speeds.speed > SPEED_ALERT_THRESHOLD, :]
Dati in streaming con AWS Kinesis e Lambda

record_created_handler()

SPEED_ALERT_THRESHOLD = 45
ALERT_PHONE_NUMBER = "+1234567890"
...

def record_created_handler(event, context):
    ...
    ## Invia SMS
    sns.publish(PhoneNumber=ALERT_PHONE_NUMBER,
        Message="Speeding Alert \n" + too_fast.to_string())

## Per ora non viene usato, ma dobbiamo restituire qualcosa. totals = data.groupby(['vin'])['speed'].max().reset_index() return totals.to_csv(sep=" ", index=False)
Dati in streaming con AWS Kinesis e Lambda

Testa la funzione Lambda

Schermata Lambda: test riuscito

Output SMS di esempio

Dati in streaming con AWS Kinesis e Lambda

Aggiungere variabili d'ambiente

import os
os.environ.get("ENV_VARIABLE_NAME", "DEFAULT_VALUE")
import os
SPEED_ALERT_THRESHOLD = os.environ.get("SPEED_ALERT_THRESHOLD", 45)
ALERT_PHONE_NUMBER = os.environ.get("ALERT_PHONE_NUMBER", None)
...

def record_created_handler(event, context):
    ...
Dati in streaming con AWS Kinesis e Lambda

Aggiungere variabili d'ambiente

Console Lambda: variabili d'ambiente

Dati in streaming con AWS Kinesis e Lambda

Aggiungere variabili d'ambiente

Impostazione variabili in console

Dati in streaming con AWS Kinesis e Lambda

Aggiungi trigger

Dati in streaming con AWS Kinesis e Lambda

Aggiungere un trigger

Selezione bucket S3 come trigger

Dati in streaming con AWS Kinesis e Lambda

Aggiungere un trigger

Configura evento ObjectCreated

Dati in streaming con AWS Kinesis e Lambda

Aggiungere un trigger

Rivedi configurazione trigger

Dati in streaming con AWS Kinesis e Lambda

Ripasso

Passi chiave: variabili, trigger, test

Dati in streaming con AWS Kinesis e Lambda

Ripasso

Flusso: S3 -> Lambda -> SNS

Dati in streaming con AWS Kinesis e Lambda

Ripasso

Best practice Lambda

Dati in streaming con AWS Kinesis e Lambda

Ripasso

Passi successivi

Dati in streaming con AWS Kinesis e Lambda

¡Vamos a practicar!

Dati in streaming con AWS Kinesis e Lambda

Preparing Video For Download...