Streaming Data with AWS Kinesis and Lambda
Maksim Pecherskiy
Data Engineer
...
import pytz
from datetime import datetime
tz = pytz.timezone('America/Los_Angeles')
def record_created_handler(event, context):
    """Upload the records that exceed the speed threshold to S3 as a CSV.

    Filters ``top_speeds`` down to rows whose ``speed`` is above
    ``SPEED_ALERT_THRESHOLD`` and writes them, space-separated and without
    the index, to the ``sd-vehicle-data`` bucket under a timestamped key in
    the ``speeders/`` folder.

    Args:
        event: Handler event argument (not used in the visible code).
        context: Handler context argument (not used in the visible code).
    """
    ...  # elided in the original; must define `top_speeds`

    ## Get top speeds that exceed the limit of 45
    # The comma matters: `.loc[mask, :]` selects rows by mask and all columns,
    # whereas `.loc[mask :]` would be parsed as a slice starting at `mask`.
    too_fast = top_speeds.loc[top_speeds.speed > SPEED_ALERT_THRESHOLD, :]

    ## Generate object key
    # The literal H/M/S after each directive are kept in the key,
    # e.g. "speeders/20200510/14H05M30S.csv".
    fdate = datetime.now(tz).strftime("%Y%m%d/%HH%MM%SS")
    obj_key = f"speeders/{fdate}.csv"  # filename in speeders folder

    ## Write the object to S3
    s3.put_object(
        Bucket='sd-vehicle-data',
        Key=obj_key,
        Body=too_fast.to_csv(sep=" ", index=False),
    )
cron(50 6 * * ? *)
import boto3, pytz, pandas as pd s3 = boto3.client("s3" ...) ...
def speeder_aggregator(event, context):
    """Compute today's date string used to filter the speeder objects.

    Args:
        event: Handler event argument (not used in the visible code).
        context: Handler context argument (not used in the visible code).
    """
    # Same time zone as the handler that writes the speeder keys, so the
    # "%Y%m%d" date here lines up with the date portion of those keys.
    tz = pytz.timezone('America/Los_Angeles')
    filter_date = datetime.now(tz).strftime("%Y%m%d")
def speeder_aggregator(event, context):
    """Load every speeder CSV for ``filter_date`` into a single DataFrame.

    Lists the objects in the ``sd-vehicle-data`` bucket whose key starts with
    ``speeders/<filter_date>``, reads each one as a space-delimited CSV, and
    concatenates them into ``data``.

    Args:
        event: Handler event argument (not used in the visible code).
        context: Handler context argument (not used in the visible code).
    """
    ...  # elided in the original; must define `filter_date`

    # NOTE: list_objects_v2 returns at most 1,000 keys per call; paginate if
    # a single day can produce more objects than that.
    objects = s3.list_objects_v2(
        Bucket='sd-vehicle-data', Prefix=f'speeders/{filter_date}')

    # 'Contents' is absent from the response when no key matches the prefix.
    contents = objects.get('Contents', [])
    if not contents:
        # pd.concat raises ValueError on an empty list, so stop early.
        print(f"No objects found under speeders/{filter_date}")
        return

    day_data = []
    for obj in contents:
        print(obj['Key'])
        day_record = s3.get_object(Bucket='sd-vehicle-data', Key=obj['Key'])
        day_data.append(
            pd.read_csv(day_record['Body'], delimiter=" "))

    # Concatenate new records into a single dataframe.
    data = pd.concat(day_data)
    data.columns = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]
import awswrangler as wr
# Session with explicit credentials and region, passed to awswrangler below.
session = boto3.Session(aws_access_key_id=AWS_KEY,
                        aws_secret_access_key=AWS_SECRET,
                        region_name="us-east-1")

# Read every object under the day's prefix into one DataFrame. The speeder
# files are written with sep=" ", so the same separator is needed to parse
# them back into columns.
df = wr.s3.read_csv(f"s3://sd-vehicle-data/speeders/{filter_date}",
                    boto3_session=session,
                    sep=" ")
df.head()
def speeder_aggregator(event, context):
    """Load every speeder CSV for ``filter_date`` into a single DataFrame.

    Lists the objects in the ``sd-vehicle-data`` bucket whose key starts with
    ``speeders/<filter_date>``, reads each one as a space-delimited CSV, and
    concatenates them into ``data``.

    Args:
        event: Handler event argument (not used in the visible code).
        context: Handler context argument (not used in the visible code).
    """
    ...  # elided in the original; must define `filter_date`

    # NOTE: list_objects_v2 returns at most 1,000 keys per call; paginate if
    # a single day can produce more objects than that.
    objects = s3.list_objects_v2(
        Bucket='sd-vehicle-data', Prefix=f'speeders/{filter_date}')

    # 'Contents' is absent from the response when no key matches the prefix.
    contents = objects.get('Contents', [])
    if not contents:
        # pd.concat raises ValueError on an empty list, so stop early.
        print(f"No objects found under speeders/{filter_date}")
        return

    day_data = []
    for obj in contents:
        print(obj['Key'])
        day_record = s3.get_object(Bucket='sd-vehicle-data', Key=obj['Key'])
        day_data.append(
            pd.read_csv(day_record['Body'], delimiter=" "))

    # Concatenate new records into a single dataframe.
    data = pd.concat(day_data)
    data.columns = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]
import awswrangler as wr
# boto3 session carrying explicit credentials and region; handed to
# awswrangler calls through their ``boto3_session`` argument.
session = boto3.Session(
    aws_access_key_id=AWS_KEY,
    aws_secret_access_key=AWS_SECRET,
    region_name="us-east-1",
)
def speeder_aggregator(event, context):
    """Combine the day's speeder CSVs into one daily CSV using awswrangler.

    Reads every object under ``s3://sd-vehicle-data/speeders/<filter_date>``
    into one DataFrame and writes it, space-separated and without the index,
    to ``s3://sd-vehicle-data/speeders_daily/<filter_date>.csv``.

    Args:
        event: Handler event argument (not used in the visible code).
        context: Handler context argument (not used in the visible code).
    """
    # NOTE(review): `filter_date` is not defined in this snippet; it must be
    # in scope before this runs. Confirm where it is computed.

    # Use the same session and separator for the read as for the write below:
    # the speeder files are space-separated, so the default "," would not
    # split them into columns.
    df = wr.s3.read_csv(f"s3://sd-vehicle-data/speeders/{filter_date}",
                        boto3_session=session,
                        sep=" ")
    df.head()
    wr.s3.to_csv(df,
                 f"s3://sd-vehicle-data/speeders_daily/{filter_date}.csv",
                 boto3_session=session,
                 sep=" ", index=False)
wr.s3.read_csv()
wr.s3.to_csv()
Streaming Data with AWS Kinesis and Lambda