Streaming Data with AWS Kinesis and Lambda
Maksim Pecherskiy
Data Engineer
Sample records (space-delimited: record_id, timestamp, vin, lon, lat, speed):
bc75da5f-1bf6-444c-80ad-49c180e1b8de 23:16:06.000 3FTEX1G5XAK844393 -76.6990172 2.481207 40
ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.39239199999999 36.097577 90
f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.58068100000001 34.79925 37
import pandas as pd
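Deploying a Lambda function that contains this import, without packaging pandas alongside the function code, results in: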
Response:
{
"errorMessage": "Unable to import module 'lambda_function': No module named 'pandas'",
"errorType": "Runtime.ImportModuleError"
}
# lambda_function.py -- outline of the module; function bodies are elided.
import json

import boto3
import pandas as pd

# Initialize clients (the handler code uses an S3 client `s3` and an SNS
# client `sns`).
s3 = boto3.client("s3")
sns = boto3.client("sns")

SPEED_ALERT_THRESHOLD = 45
ALERT_PHONE_NUMBER = "+1234567890"


# Helper function to get dataframe from written records
def get_new_data(event):
    """Return the newly written records as a DataFrame (outline only).

    The elided body calls ``pd.read_csv()`` and returns the result as ``data``.
    """
    ...


# Lambda function handler
def record_created_handler(event, context):
    """Load the new records, then alert via ``sns.publish()`` (outline only)."""
    data = get_new_data(event)
    ...  # analyse ``data``, then call sns.publish()
def get_new_data(event, bucket="sd-vehicle-data", s3_client=None):
    """Load the vehicle records written by an S3 event into one DataFrame.

    Every record in ``event['Records']`` that is an ``ObjectCreated:Put`` on
    *bucket* is downloaded and parsed as space-delimited text with the columns
    record_id, timestamp, vin, lon, lat, speed.

    Args:
        event: S3 event notification payload received by the Lambda handler.
        bucket: Only objects created in this bucket are read.
        s3_client: Object exposing ``get_object(Bucket=..., Key=...)``;
            defaults to the module-level ``s3`` client.

    Returns:
        A DataFrame with the records of all matching objects, concatenated in
        event order. When no record matches, an empty DataFrame with the same
        columns is returned (``pd.concat`` would raise ValueError on an empty
        list).

    Raises:
        KeyError: if the event or one of its records lacks the expected keys.
    """
    from urllib.parse import unquote_plus

    columns = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]
    # Resolve the module-level client only when no client was injected.
    client = s3 if s3_client is None else s3_client

    # Create a list to store the DataFrame of each new object.
    written_objects = []

    # Iterate over each S3 event record.
    for record in event['Records']:
        event_name = record['eventName']
        bucket_name = record['s3']['bucket']['name']
        # Object keys in S3 event notifications are URL-encoded (a space
        # arrives as '+'), so decode before handing the key back to S3.
        obj_key = unquote_plus(record['s3']['object']['key'])

        # NOTE(review): only PutObject uploads are handled; multipart uploads
        # arrive as ObjectCreated:CompleteMultipartUpload and are skipped --
        # confirm this is intended.
        if event_name == 'ObjectCreated:Put' and bucket_name == bucket:
            obj = client.get_object(Bucket=bucket_name, Key=obj_key)
            df = pd.read_csv(obj['Body'], delimiter=" ", names=columns)
            written_objects.append(df)

    if not written_objects:
        return pd.DataFrame(columns=columns)

    # Concatenate new records into a single dataframe.
    return pd.concat(written_objects)
SPEED_ALERT_THRESHOLD = 45
ALERT_PHONE_NUMBER = "+1234567890"


def record_created_handler(event, context):
    """Alert on vehicles whose top speed exceeds SPEED_ALERT_THRESHOLD.

    Loads the records referenced by *event* through ``get_new_data``, takes
    each vehicle's maximum speed, and sends one SMS via ``sns.publish``
    listing the vehicles over the threshold.

    Args:
        event: Event payload handed to the function; passed unchanged to
            ``get_new_data``.
        context: Invocation context (not used here).

    Returns:
        Space-separated CSV text (header ``vin speed``) with the top speed of
        every vehicle in the batch.
    """
    # Call the helper method.
    data = get_new_data(event)

    # Get the top speed of each vehicle.
    top_speeds = data.groupby(['vin'])['speed'].max().reset_index()

    # Keep only the vehicles whose top speed exceeds the limit.
    too_fast = top_speeds.loc[top_speeds.speed > SPEED_ALERT_THRESHOLD, :]

    # Send SMS -- only when a vehicle actually exceeded the limit; otherwise
    # every invocation would text an "Empty DataFrame" alert.
    if not too_fast.empty:
        sns.publish(
            PhoneNumber=ALERT_PHONE_NUMBER,
            Message="Speeding Alert \n" + too_fast.to_string(),
        )

    # This doesn't go anywhere yet, but we need to return something. The
    # per-vehicle maxima are exactly top_speeds, so reuse them rather than
    # grouping the data a second time.
    return top_speeds.to_csv(sep=" ", index=False)
import os
# Read an environment variable, falling back to a default when it is unset.
# os.getenv(key, default) is equivalent to os.environ.get(key, default); the
# value (a str, or the default) is not kept -- this line only shows the call.
os.getenv("ENV_VARIABLE_NAME", "DEFAULT_VALUE")
import os
# Alert settings come from environment variables. os.environ values are always
# strings, so the threshold is converted to a number; otherwise comparing it
# against numeric speeds (speed > "45") would fail.
SPEED_ALERT_THRESHOLD = float(os.environ.get("SPEED_ALERT_THRESHOLD", 45))
# None when the variable is unset.
# NOTE(review): sns.publish(PhoneNumber=None) will presumably be rejected by
# parameter validation -- confirm, and guard in the handler.
ALERT_PHONE_NUMBER = os.environ.get("ALERT_PHONE_NUMBER", None)
...
def record_created_handler(event, context):
    # Handler body elided in this excerpt.
    ...
Streaming Data with AWS Kinesis and Lambda