Your first live lambda!

Streaming Data with AWS Kinesis and Lambda

Maksim Pecherskiy

Data Engineer

Last lesson

StkGt.png

This lesson

Zb4Va.png

Common log format

bc75da5f-1bf6-444c-80ad-49c180e1b8de 23:16:06.000 3FTEX1G5XAK844393 -76.6990172 2.481207 40
ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.39239199999999 36.097577 90
f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.58068100000001 34.79925 37
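Each record is one space-delimited line with no header row: a record ID, a timestamp, the vehicle's VIN, longitude, latitude, and speed. A minimal sketch of parsing these lines with pandas, using the same column names that get_new_data() assigns later in the lesson. The SAMPLE_LOG string and the parse_log helper are hypothetical, meant only for local experimentation.

```python
import io

import pandas as pd

# The three sample records shown above
SAMPLE_LOG = """\
bc75da5f-1bf6-444c-80ad-49c180e1b8de 23:16:06.000 3FTEX1G5XAK844393 -76.6990172 2.481207 40
ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.39239199999999 36.097577 90
f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.58068100000001 34.79925 37
"""

# Column names used by get_new_data(); the files themselves carry no header row
COLUMNS = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]


def parse_log(text):
    """Hypothetical helper: parse space-delimited vehicle records into a DataFrame."""
    return pd.read_csv(io.StringIO(text), delimiter=" ", names=COLUMNS)


df = parse_log(SAMPLE_LOG)
print(df[["vin", "speed"]])
```

Because `names` is passed, pandas treats the first line as data rather than as a header.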

Reading S3 files

import pandas as pd

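results in an import error, because pandas is not part of the Lambda Python runtime and has to be packaged with the function (for example, as a Lambda layer):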

Response:
{
  "errorMessage": "Unable to import module 'lambda_function': No module named 'pandas'",
  "errorType": "Runtime.ImportModuleError"
}

Updating the handler

# lambda_function.py
import json

import boto3
import pandas as pd
# Initialize clients
...

SPEED_ALERT_THRESHOLD = 45
ALERT_PHONE_NUMBER = "+1234567890"

# Helper function to get dataframe from written records
def get_new_data(event):
    pd.read_csv()...
    ...
    return data

# Lambda function handler
def record_created_handler(event, context):
    data = get_new_data(event)
    ...
    sns.publish()
    ...

get_new_data()

def get_new_data(event):
    # Create a list to store new object keys.
    written_objects = []

    # Iterate over each S3 event record.
    for record in event['Records']:

        # Extract the event type, bucket name, and object key to check
        event_name = record['eventName']
        bucket_name = record['s3']['bucket']['name']
        obj_key = record['s3']['object']['key']
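get_new_data() reads only three fields from each record in the event. A trimmed-down sketch of that shape follows; real S3 notifications carry many more fields, and the object key here is hypothetical. One detail worth knowing: S3 URL-encodes object keys in event notifications, so a key containing a space arrives with `+` in its place. The slide code passes the key through unchanged, which works for simple keys; this sketch decodes it with `urllib.parse.unquote_plus`. The extract_targets helper is illustrative only.

```python
from urllib.parse import unquote_plus

# Minimal S3 "Put" notification; the key is a hypothetical example
sample_event = {
    "Records": [
        {
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "sd-vehicle-data"},
                "object": {"key": "vehicle+data/records.txt"},
            },
        }
    ]
}


def extract_targets(event):
    """Hypothetical helper: return (event_name, bucket, key) for each record."""
    targets = []
    for record in event["Records"]:
        event_name = record["eventName"]
        bucket_name = record["s3"]["bucket"]["name"]
        # Keys are URL-encoded in notifications ('+' stands for a space)
        obj_key = unquote_plus(record["s3"]["object"]["key"])
        targets.append((event_name, bucket_name, obj_key))
    return targets


print(extract_targets(sample_event))
```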

get_new_data()

def get_new_data(event):
        ...
        # Verify that event is created from sd-vehicle-data bucket.
        if event_name == 'ObjectCreated:Put' and bucket_name == 'sd-vehicle-data':
            obj = s3.get_object(Bucket=bucket_name, Key=obj_key)
            df = pd.read_csv(obj['Body'], delimiter=" ",
                             names=["record_id", "timestamp", "vin", "lon", "lat", "speed"])
            written_objects.append(df)

    # Concatenate new records into a single dataframe.
    return pd.concat(written_objects)
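To try get_new_data() without AWS, the S3 client can be replaced by a stub whose get_object() returns a file-like `Body`, which is all pd.read_csv() needs. A minimal sketch, assuming the hypothetical FakeS3 class, object keys, and put_event helper below. One deliberate deviation from the slide: pd.concat() raises ValueError when given an empty list (for example, when no record matches the bucket check), so this version returns an empty DataFrame in that case.

```python
import io

import pandas as pd

COLUMNS = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]


class FakeS3:
    """Hypothetical stand-in for the boto3 S3 client, serving canned objects."""

    def __init__(self, objects):
        self.objects = objects  # maps (bucket, key) -> raw bytes

    def get_object(self, Bucket, Key):
        # Like the real client, expose the payload as a file-like 'Body'
        return {"Body": io.BytesIO(self.objects[(Bucket, Key)])}


s3 = FakeS3({
    ("sd-vehicle-data", "batch-1.txt"): (
        b"bc75da5f-1bf6-444c-80ad-49c180e1b8de 23:16:06.000 3FTEX1G5XAK844393 -76.6990172 2.481207 40\n"
        b"ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.39239199999999 36.097577 90\n"
    ),
    ("sd-vehicle-data", "batch-2.txt"): (
        b"f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.58068100000001 34.79925 37\n"
    ),
})


def get_new_data(event):
    written_objects = []
    for record in event["Records"]:
        event_name = record["eventName"]
        bucket_name = record["s3"]["bucket"]["name"]
        obj_key = record["s3"]["object"]["key"]
        if event_name == "ObjectCreated:Put" and bucket_name == "sd-vehicle-data":
            obj = s3.get_object(Bucket=bucket_name, Key=obj_key)
            df = pd.read_csv(obj["Body"], delimiter=" ", names=COLUMNS)
            written_objects.append(df)
    # Guard added in this sketch: pd.concat([]) would raise ValueError
    if not written_objects:
        return pd.DataFrame(columns=COLUMNS)
    return pd.concat(written_objects)


def put_event(*keys):
    """Hypothetical helper: build a minimal S3 Put event for keys in sd-vehicle-data."""
    return {"Records": [
        {"eventName": "ObjectCreated:Put",
         "s3": {"bucket": {"name": "sd-vehicle-data"}, "object": {"key": k}}}
        for k in keys
    ]}


data = get_new_data(put_event("batch-1.txt", "batch-2.txt"))
print(data[["vin", "speed"]])
```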

record_created_handler()

SPEED_ALERT_THRESHOLD = 45
ALERT_PHONE_NUMBER = "+1234567890"
...
def record_created_handler(event, context):
    # Call the helper method
    data = get_new_data(event)

    # Get the top speed recorded for each vehicle
    top_speeds = data.groupby(['vin'])['speed'].max().reset_index()
    # Keep only vehicles whose top speed exceeds SPEED_ALERT_THRESHOLD
    too_fast = top_speeds.loc[top_speeds.speed > SPEED_ALERT_THRESHOLD, :]
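Applied to the three sample records, the groupby keeps one row per VIN with that vehicle's highest speed, and the filter keeps only rows above the threshold. A minimal sketch, assuming a hypothetical DataFrame holding just the `vin` and `speed` columns of the sample log.

```python
import pandas as pd

SPEED_ALERT_THRESHOLD = 45

# Hypothetical sample: vin and speed of the three example records
data = pd.DataFrame({
    "vin": ["3FTEX1G5XAK844393", "2LAXX1C8XAK844292", "3FTEX1G5XAK844393"],
    "speed": [40, 90, 37],
})

# Highest recorded speed per vehicle, as a two-column DataFrame
top_speeds = data.groupby(["vin"])["speed"].max().reset_index()
# Vehicles whose top speed is above the threshold
too_fast = top_speeds.loc[top_speeds.speed > SPEED_ALERT_THRESHOLD, :]
print(too_fast)
```

Vehicle 3FTEX1G5XAK844393 has two readings (40 and 37); its maximum, 40, stays under 45, so only 2LAXX1C8XAK844292 (90) is flagged.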

record_created_handler()

SPEED_ALERT_THRESHOLD = 45
ALERT_PHONE_NUMBER = "+1234567890"
...

def record_created_handler(event, context):
    ...
    ## Send an SMS only if at least one vehicle exceeded the threshold
    if not too_fast.empty:
        sns.publish(PhoneNumber=ALERT_PHONE_NUMBER,
                    Message="Speeding Alert \n" + too_fast.to_string())

    ## This doesn't go anywhere yet, but we need to return something.
    totals = data.groupby(['vin'])['speed'].max().reset_index()
    return totals.to_csv(sep=" ", index=False)
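The handler produces two pieces of text: the SMS body passed to sns.publish() and the space-separated summary it returns. A sketch of both, built from a hypothetical sample DataFrame and without calling SNS, so the output can be inspected locally.

```python
import pandas as pd

SPEED_ALERT_THRESHOLD = 45

# Hypothetical sample: vin and speed of the three example records
data = pd.DataFrame({
    "vin": ["3FTEX1G5XAK844393", "2LAXX1C8XAK844292", "3FTEX1G5XAK844393"],
    "speed": [40, 90, 37],
})

top_speeds = data.groupby(["vin"])["speed"].max().reset_index()
too_fast = top_speeds.loc[top_speeds.speed > SPEED_ALERT_THRESHOLD, :]

# Text the handler would send as the SMS body
message = "Speeding Alert \n" + too_fast.to_string()

# Text the handler returns: per-vehicle maxima, space-separated, no index
totals = data.groupby(["vin"])["speed"].max().reset_index()
result = totals.to_csv(sep=" ", index=False)

print(message)
print(result)
```

Since `to_csv()` is called without a path, it returns the CSV as a string instead of writing a file, which is why it can serve directly as the handler's return value.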

Test the Lambda function

jBpMo.png

KodWB.png

Adding environment variables

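import os
# General pattern: read a variable, falling back to a default if it is unset
os.environ.get("ENV_VARIABLE_NAME", "DEFAULT_VALUE")

import os
# Environment variable values are strings, so cast the threshold to int
SPEED_ALERT_THRESHOLD = int(os.environ.get("SPEED_ALERT_THRESHOLD", 45))
ALERT_PHONE_NUMBER = os.environ.get("ALERT_PHONE_NUMBER", None)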
...

def record_created_handler(event, context):
    ...
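Environment variables set in the Lambda console always reach the code as strings. Only the fallback default (45) is an int, so without a cast the threshold's type depends on whether the variable is set, and comparing the numeric `speed` column against the string "60" raises a TypeError in pandas. A minimal sketch of reading the settings with an explicit conversion; read_config is a hypothetical helper name.

```python
import os


def read_config():
    """Hypothetical helper: read alert settings, converting types explicitly."""
    # os.environ values are strings; int() also accepts the int default
    threshold = int(os.environ.get("SPEED_ALERT_THRESHOLD", 45))
    phone = os.environ.get("ALERT_PHONE_NUMBER")  # None if the variable is unset
    return threshold, phone


# Simulate a value configured in the console
os.environ["SPEED_ALERT_THRESHOLD"] = "60"
print(read_config())
```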

Adding environment variables

7jD6x.png

Adding environment variables

eDRTX.png

yKjWC.png

Adding a trigger

gLmbZD.png

Adding a trigger

7F08p.png

Adding a trigger

bPUtX.png
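Outside the console, a similar trigger can be described as an S3 bucket notification configuration that invokes the function on object Puts, matching the `ObjectCreated:Put` check in get_new_data(). A sketch under stated assumptions: the function ARN is hypothetical, and applying the configuration is left commented out because it needs AWS credentials, plus a resource-based permission that lets S3 invoke the function.

```python
def build_notification_config(lambda_arn):
    """Return an S3 notification configuration that invokes a Lambda on object Puts."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": lambda_arn,
                # Corresponds to eventName 'ObjectCreated:Put' in the handler
                "Events": ["s3:ObjectCreated:Put"],
            }
        ]
    }


# Hypothetical ARN; substitute the ARN of your own function
config = build_notification_config(
    "arn:aws:lambda:us-east-1:123456789012:function:my-function"
)

# To apply it (requires credentials). Note that this call replaces the
# bucket's entire existing notification configuration:
# import boto3
# boto3.client("s3").put_bucket_notification_configuration(
#     Bucket="sd-vehicle-data", NotificationConfiguration=config)
print(config)
```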

Review

ZAbfT.png

Review

m9jlv.png

Review

GTlyD.png

Review

XddOW.png

Let's practice!
