Streaming Data with AWS Kinesis and Lambda
Maksim Pecherskiy
Data Engineer
record_id | timestamp | vin | lon | lat | speed |
---|---|---|---|---|---|
939ed1d1-1740-420c-8906-445278573c7f | 4:25:06.000 | 4FTEX4944AK844294 | 106.9447146 | -6.3385652 | 25 |
f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a | 8:10:47.000 | 3FTEX1G5XAK844393 | 108.580681 | 34.79925 | 37 |
ff8e7131-408d-463b-8d07-d016419b0656 | 20:26:44.000 | 2LAXX1C8XAK844292 | 114.392392 | 36.097577 | 90 |
{ "invocationId": "invocationIdExample", "deliveryStreamArn": "arn:aws:firehose:us-east-1:458912630:deliverystream/gps-delivery-stream", "region": "us-east-1", "records": [ {
"recordId": "49546986683135544286507457936321625675700192471156785154",
"approximateArrivalTimestamp": 1495072949453,
"data": "NjQuMjQyLjg4LjEwIC0gLSBbMDcvTWFyLzIwMDQ6MTY6MTA6MDIgLTA4MDBdICJHRVQgL21haWxtYW4vbGlzdGluZm8vaHNkaXZpc2lvbiBIVFRQLzEuMSIgMjAwIDYyOTE=="
} ] }
import base64
from datetime import datetime as dt
def convert_timestamp(record_time_val):
    """Expand a time-of-day string into a full timestamp dated today.

    Args:
        record_time_val: Time of day in ``%H:%M:%S.%f`` form,
            e.g. ``"4:25:06.000"``.

    Returns:
        A string formatted as ``YYYY-MM-DDTHH:MM:SS``. The date part is
        whatever ``dt.today()`` reports when the function runs, and the
        fractional seconds of the input are dropped.

    Raises:
        ValueError: If ``record_time_val`` does not match ``%H:%M:%S.%f``.
    """
    # Get today's date as string
    today = dt.today().strftime("%Y-%m-%d")
    # Combine today's date with the record's time and make datetime object
    new_ts = dt.strptime(f"{today} {record_time_val}", "%Y-%m-%d %H:%M:%S.%f")
    # Convert the datetime object to a nicely formatted string
    return new_ts.strftime("%Y-%m-%dT%H:%M:%S")
def lambda_handler(event, context):
    """Rewrite the second field of every record with a full timestamp.

    Each entry in ``event['records']`` is base64-decoded, split on single
    spaces, has its field at index 1 replaced by
    ``convert_timestamp(...)`` of that field, and is then re-joined and
    base64-encoded again.

    Args:
        event: Dict with a ``'records'`` list; each record needs a
            ``'recordId'`` and a base64-encoded ``'data'`` entry.
            NOTE(review): assumes the decoded payload is space-separated
            with the time of day as its second field -- confirm against
            the producer.
        context: Unused by this handler.

    Returns:
        ``{'records': [...]}`` with one dict per input record holding the
        original ``'recordId'``, ``'result': 'Ok'`` and the re-encoded
        ``'data'`` as a base64 text string.

    Raises:
        IndexError: If a decoded payload has fewer than two fields.
        ValueError: Propagated from ``convert_timestamp`` when the time
            field cannot be parsed.
    """
    output = []
    for record in event['records']:
        # Decode the payload into a list of space-separated fields.
        payload = base64.b64decode(record['data'])
        payload = payload.decode()
        payload = payload.split(" ")
        # Replace the time field with a full timestamp.
        payload[1] = convert_timestamp(payload[1])
        payload = " ".join(payload)
        # b64encode returns bytes; decode to str so the returned dict can
        # be serialized to JSON.
        payload_enc = base64.b64encode(payload.encode()).decode()
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': payload_enc,
        })
    return {'records': output}
import base64


def lambda_handler(event, context):
    """Template handler: decode each record, modify it, re-encode it.

    As written, the payload passes through unchanged; the ``# Modify it``
    marker shows where a transformation belongs.

    Args:
        event: Dict with a ``'records'`` list; each record needs a
            ``'recordId'`` and a base64-encoded ``'data'`` entry.
        context: Unused by this handler.

    Returns:
        ``{'records': [...]}`` with one dict per input record holding the
        original ``'recordId'``, ``'result': 'Ok'`` and the re-encoded
        ``'data'`` as a base64 text string.
    """
    output = []
    for record in event['records']:  # Iterate over the records
        payload = base64.b64decode(record['data']).decode()  # Decode the payload
        # Modify it
        # Re-encode it; b64encode returns bytes, so decode to str to keep
        # the returned dict JSON-serializable.
        payload_enc = base64.b64encode(payload.encode()).decode()
        output.append({  # Put it in a dictionary
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': payload_enc,
        })
    return {'records': output}
Streaming Data with AWS Kinesis and Lambda