Streaming Data with AWS Kinesis and Lambda
Maksim Pecherskiy
Data Engineer
https://.../speederReporterApi?date=20200620
{
"queryStringParameters": {
"date": "20200620"
},
"isBase64Encoded": false
}
import json, boto3 import awswrangler as wr import pandas as pd
session = boto3.Session(aws_access_key_id = AWS_KEY, aws_secret_access_key = AWS_SECRET, region_name="us-east-1")
def lambda_handler(event, context): filter_date = event['queryStringParameters']['date']
df = wr.s3.read_csv( f"s3://sd-vehicle-data/speeders_daily/{filter_date}.csv", boto3_session=session, delimiter=" ")
def lambda_handler(event, context):
...
return {
'statusCode': 200,
'headers': {
"content-type" : "application/json"
},
'body': df.to_json()
}
https://.../default/speederReporterApi?date=20200621
[
{
"record_id":"939ed1d1-1740-420c-8906-445278573c7f",
"timestamp":"4:25:06.000",
"vin":"4FTEX4944AK844294",
"lon":106.9447146,
"lat":-6.3385652,
"speed":25
},
{
"record_id":"f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a",
...
},
...
]
def trigger_recalc():
lambda_client = boto3.client("s3",
aws_access_key_id = AWS_KEY,
aws_secret_access_key = AWS_SECRET,
region_name = 'us-east-1')
def lambda_handler(event, context):
...
def trigger_recalc():
...
lambda_client.invoke(
FunctionName='arn:aws:lambda:us-east-1:458913182630:function:speederAggregator',
InvocationType='Event', # (or RequestResponse)
)
Streaming Data with AWS Kinesis and Lambda