Your first live Lambda!

Streaming Data with AWS Kinesis and Lambda

Maksim Pecherskiy

Data Engineer

Last lesson

StkGt.png


This lesson

Zb4Va.png


Common log format

bc75da5f-1bf6-444c-80ad-49c180e1b8de 23:16:06.000 3FTEX1G5XAK844393 -76.6990172 2.481207 40
ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.39239199999999 36.097577 90
f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.58068100000001 34.79925 37
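
Each record is a single space-delimited line with six fields. A minimal sketch of parsing one line by hand, assuming the field order record_id, timestamp, vin, lon, lat, speed (the column names the helper later passes to pd.read_csv). VehicleRecord and parse_record are hypothetical names used only for illustration.

```python
from typing import NamedTuple


class VehicleRecord(NamedTuple):
    """One telemetry record; field order is assumed from the sample lines."""
    record_id: str
    timestamp: str
    vin: str
    lon: float
    lat: float
    speed: int


def parse_record(line: str) -> VehicleRecord:
    # Fields are separated by single spaces and contain no embedded spaces.
    record_id, timestamp, vin, lon, lat, speed = line.split()
    return VehicleRecord(record_id, timestamp, vin, float(lon), float(lat), int(speed))


sample = (
    "bc75da5f-1bf6-444c-80ad-49c180e1b8de 23:16:06.000 "
    "3FTEX1G5XAK844393 -76.6990172 2.481207 40"
)
record = parse_record(sample)
print(record.vin, record.speed)  # prints: 3FTEX1G5XAK844393 40
```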

Reading S3 files

import pandas as pd

results in:

Response:
{
  "errorMessage": "Unable to import module 'lambda_function': No module named 'pandas'",
  "errorType": "Runtime.ImportModuleError"
}

Updating the handler

#lambda_function.py
import json, boto3, pandas as pd
# Initialize clients
...

SPEED_ALERT_THRESHOLD = 45
ALERT_PHONE_NUMBER = "+1234567890"

# Helper function to get dataframe from written records
def get_new_data(event):
    pd.read_csv()...
    ...
    return data

# Lambda function handler
def record_created_handler(event, context):
    data = get_new_data(event)
    ...
    sns.publish()
    ...

get_new_data()

def get_new_data(event):
    # Create a list to store new object keys.
    written_objects = []

    # Iterate over each S3 event record.
    for record in event['Records']:

        # Get the variables to check for
        event_name = record['eventName']
        bucket_name = record['s3']['bucket']['name']
        obj_key = record['s3']['object']['key']

get_new_data()

def get_new_data(event):
        ...
        # Verify that event is created from sd-vehicle-data bucket.
        if event_name == 'ObjectCreated:Put' and bucket_name == 'sd-vehicle-data':

            obj = s3.get_object(Bucket=bucket_name, Key=obj_key)
            df = pd.read_csv(obj['Body'], delimiter=" ",
                             names=["record_id", "timestamp", "vin", "lon", "lat", "speed"])
            written_objects.append(df)
    # Concatenate new records into a single dataframe.
    return pd.concat(written_objects)
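
The filtering step can be tried without AWS access by feeding it a hand-built event. The sketch below is an illustration, not the course's code: created_object_keys and the sample keys are hypothetical, and the event only includes the fields the helper reads. It also decodes the key with unquote_plus, because S3 event notifications URL-encode object keys (a space arrives as "+").

```python
from urllib.parse import unquote_plus


def created_object_keys(event, bucket_name="sd-vehicle-data"):
    """Return decoded keys of objects newly put into the given bucket."""
    keys = []
    for record in event.get("Records", []):
        # Skip anything that is not a plain PUT into the expected bucket.
        if record["eventName"] != "ObjectCreated:Put":
            continue
        if record["s3"]["bucket"]["name"] != bucket_name:
            continue
        keys.append(unquote_plus(record["s3"]["object"]["key"]))
    return keys


# Hypothetical event containing only the fields read above.
sample_event = {
    "Records": [
        {"eventName": "ObjectCreated:Put",
         "s3": {"bucket": {"name": "sd-vehicle-data"},
                "object": {"key": "incoming/batch+1.txt"}}},
        {"eventName": "ObjectRemoved:Delete",
         "s3": {"bucket": {"name": "sd-vehicle-data"},
                "object": {"key": "incoming/old.txt"}}},
        {"eventName": "ObjectCreated:Put",
         "s3": {"bucket": {"name": "some-other-bucket"},
                "object": {"key": "unrelated.txt"}}},
    ]
}

keys = created_object_keys(sample_event)
print(keys)  # prints: ['incoming/batch 1.txt']
```

If no record matches, the list is empty. pd.concat raises a ValueError on an empty list, so the helper may need to handle that case before concatenating.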

record_created_handler()

SPEED_ALERT_THRESHOLD = 45
ALERT_PHONE_NUMBER = "+1234567890"
...
def record_created_handler(event, context):
    # Call the helper method
    data = get_new_data(event)

    ## Get the top speeds
    top_speeds = data.groupby(['vin'])['speed'].max().reset_index()
    ## Get top speeds that exceed the limit of 45
    too_fast = top_speeds.loc[top_speeds.speed > SPEED_ALERT_THRESHOLD, :]
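
To see what the aggregation produces, the same two statements can be run locally on the three sample records from the log format slide. This is a sketch that assumes pandas is installed. It reads from an in-memory string instead of an S3 object body.

```python
import io

import pandas as pd

SPEED_ALERT_THRESHOLD = 45

# The three sample records shown earlier, joined into one space-delimited text.
raw = "\n".join([
    "bc75da5f-1bf6-444c-80ad-49c180e1b8de 23:16:06.000 3FTEX1G5XAK844393 -76.6990172 2.481207 40",
    "ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.39239199999999 36.097577 90",
    "f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.58068100000001 34.79925 37",
])

data = pd.read_csv(
    io.StringIO(raw),
    delimiter=" ",
    names=["record_id", "timestamp", "vin", "lon", "lat", "speed"],
)

# One row per vehicle, holding its highest recorded speed.
top_speeds = data.groupby(["vin"])["speed"].max().reset_index()

# Keep only vehicles whose top speed exceeds the threshold.
too_fast = top_speeds.loc[top_speeds.speed > SPEED_ALERT_THRESHOLD, :]
print(too_fast.to_string(index=False))
```

With this data, vehicle 3FTEX1G5XAK844393 peaks at 40 and is filtered out, while 2LAXX1C8XAK844292 at 90 remains in too_fast.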

record_created_handler()

SPEED_ALERT_THRESHOLD = 45
ALERT_PHONE_NUMBER = "+1234567890"
...

def record_created_handler(event, context):
    ...
    ## Send SMS
    sns.publish(PhoneNumber=ALERT_PHONE_NUMBER,
        Message="Speeding Alert \n" + too_fast.to_string())

    ## This doesn't go anywhere yet, but we need to return something.
    totals = data.groupby(['vin'])['speed'].max().reset_index()
    return totals.to_csv(sep=" ", index=False)
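
As written, the handler publishes on every invocation, even when no vehicle exceeded the threshold. One possible refinement, sketched under the assumption that an empty too_fast should send nothing: send_speed_alert and RecordingClient are hypothetical names, and the stub stands in for the boto3 SNS client so the logic can be run without AWS credentials.

```python
import pandas as pd


def send_speed_alert(sns_client, too_fast, phone_number):
    """Publish an SMS only when there is something to report.

    Returns True if a message was published, False otherwise.
    """
    if too_fast.empty or not phone_number:
        return False
    sns_client.publish(
        PhoneNumber=phone_number,
        Message="Speeding Alert \n" + too_fast.to_string(),
    )
    return True


class RecordingClient:
    """Test stub that records publish() calls instead of calling SNS."""

    def __init__(self):
        self.calls = []

    def publish(self, **kwargs):
        self.calls.append(kwargs)


client = RecordingClient()
nobody = pd.DataFrame({"vin": [], "speed": []})
speeder = pd.DataFrame({"vin": ["2LAXX1C8XAK844292"], "speed": [90]})

sent_empty = send_speed_alert(client, nobody, "+1234567890")
sent_alert = send_speed_alert(client, speeder, "+1234567890")
print(sent_empty, sent_alert, len(client.calls))  # prints: False True 1
```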

Test the Lambda function

jBpMo.png

KodWB.png


Adding environment variables

import os
os.environ.get("ENV_VARIABLE_NAME", "DEFAULT_VALUE")
import os
# Environment variables are always strings, so cast the threshold to int.
SPEED_ALERT_THRESHOLD = int(os.environ.get("SPEED_ALERT_THRESHOLD", 45))
ALERT_PHONE_NUMBER = os.environ.get("ALERT_PHONE_NUMBER", None)
...

def record_created_handler(event, context):
    ...
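
One detail worth checking when moving constants into environment variables: os.environ.get returns a string whenever the variable is set, so a numeric setting such as SPEED_ALERT_THRESHOLD needs an explicit cast before it is compared with the speed column. A minimal sketch, where read_int_env is a hypothetical helper:

```python
import os


def read_int_env(name, default):
    """Read an integer setting, falling back to default when it is unset."""
    raw = os.environ.get(name)
    return default if raw is None else int(raw)


# Simulate the value being set in the Lambda console.
os.environ["SPEED_ALERT_THRESHOLD"] = "60"
threshold = read_int_env("SPEED_ALERT_THRESHOLD", 45)

# Simulate the variable being absent.
del os.environ["SPEED_ALERT_THRESHOLD"]
fallback = read_int_env("SPEED_ALERT_THRESHOLD", 45)

print(threshold, fallback)  # prints: 60 45
```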

Adding environment variables

7jD6x.png


Adding environment variables

eDRTX.png


yKjWC.png


Adding a trigger

gLmbZD.png


Adding a trigger

7F08p.png


Adding a trigger

bPUtX.png


Review

ZAbfT.png


Review

m9jlv.png


Review

GTlyD.png


Review

XddOW.png


Let's practice!
