A transformational Lambda

Streaming Data with AWS Kinesis and Lambda

Maksim Pecherskiy

Data Engineer

In chapter 1...


In chapter 2...


In this chapter...


Previous approach


Processing once data is in S3 vs. Lambda transform

Processing once data is in S3

  • Uses a Lambda function triggered when an object is written to S3
  • Longer delay before data can be transformed
  • Raw data is stored before it is cleaned
  • Requires the Firehose destination to be S3

Processing via Lambda transform

  • Uses a Lambda function that Firehose invokes mid-stream, before delivery
  • Data is transformed in flight, before it reaches the destination
  • Only cleaned data is stored
  • Works with Firehose destinations other than S3
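The S3-triggered approach looks different in practice. Its function receives a notification that points at the stored object, not the records themselves. The sketch below pulls the bucket and key out of such an event. The helper name `object_locations` is hypothetical, and so are the bucket and key used with it. Before it could clean anything, the function would still have to fetch the object, for example with boto3's S3 `get_object`.

```python
from urllib.parse import unquote_plus


def object_locations(event):
    """Return (bucket, key) pairs for each object in an S3 notification event.

    Hypothetical helper for the 'process once in S3' approach: the event only
    says where the raw data landed; the data itself must be read separately.
    """
    locations = []
    for rec in event["Records"]:  # note: capital "Records" in S3 events
        bucket = rec["s3"]["bucket"]["name"]
        # Keys arrive URL-encoded (a space becomes "+"), so decode them first
        key = unquote_plus(rec["s3"]["object"]["key"])
        locations.append((bucket, key))
    return locations
```

By contrast, a Firehose transformation event carries the records inline under a lowercase `records` key. That is why the transform approach needs no extra read from S3.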

Incoming data

record_id timestamp vin lon lat speed
939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.3385652 25
f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.580681 34.79925 37
ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.392392 36.097577 90
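Each incoming record is one line of space-separated values in the column order shown above. The timestamp is the second field (index 1), and it holds only a time of day. The minimal parser below makes that layout explicit. It assumes single-space separators and exactly these six columns. The function name `parse_record` and the numeric conversions are illustrative choices, not part of the course code.

```python
FIELDS = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]


def parse_record(line):
    """Split one space-separated GPS record into a dict keyed by column name."""
    values = line.strip().split(" ")
    if len(values) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields, got {len(values)}")
    rec = dict(zip(FIELDS, values))
    # Convert the numeric columns; record_id, timestamp and vin stay strings
    rec["lon"] = float(rec["lon"])
    rec["lat"] = float(rec["lat"])
    rec["speed"] = int(rec["speed"])
    return rec


row = parse_record(
    "939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 "
    "4FTEX4944AK844294 106.9447146 -6.3385652 25"
)
print(row["timestamp"], row["speed"])  # 4:25:06.000 25
```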

Transformational Lambda


Sample event

{
  "invocationId": "invocationIdExample",
  "deliveryStreamArn": "arn:aws:firehose:us-east-1:458912630:deliverystream/gps-delivery-stream",
  "region": "us-east-1",
  "records": [
    {
      "recordId": "49546986683135544286507457936321625675700192471156785154",
      "approximateArrivalTimestamp": 1495072949453,
      "data": "NjQuMjQyLjg4LjEwIC0gLSBbMDcvTWFyLzIwMDQ6MTY6MTA6MDIgLTA4MDBdICJHRVQgL21haWxtYW4vbGlzdGluZm8vaHNkaXZpc2lvbiBIVFRQLzEuMSIgMjAwIDYyOTE=="
    }
  ]
}

Base64
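The `data` field of each record in the event is Base64 text, not the raw line. A transform has to decode it, change it, and Base64-encode the result again. Here is a small round-trip sketch using Python's standard `base64` module. The helper names `to_b64` and `from_b64` are illustrative. `b64encode` returns bytes, so the sketch decodes them to a `str`, the form a JSON response can carry.

```python
import base64


def to_b64(text):
    """Encode a str as UTF-8 bytes, then as a Base64 str."""
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


def from_b64(data):
    """Decode a Base64 str back to the original UTF-8 str."""
    return base64.b64decode(data).decode("utf-8")


print(to_b64("hello"))      # aGVsbG8=
print(from_b64("aGVsbG8="))  # hello
```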


Processing the data

import base64
from datetime import datetime as dt

def convert_timestamp(record_time_val):
    # Get today's date as a string
    today = dt.today().strftime("%Y-%m-%d")
    # Combine today's date with the record's time and make a datetime object
    new_ts = dt.strptime(f"{today} {record_time_val}", "%Y-%m-%d %H:%M:%S.%f")
    # Convert the datetime object to a nicely formatted string
    return new_ts.strftime("%Y-%m-%dT%H:%M:%S")
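`convert_timestamp` reads today's date on every call, so its output depends on when it runs. That makes it awkward to test. It also means a record stamped shortly before midnight but processed just after would get the next day's date. The variant below takes the date as an argument and gives deterministic results. The name `convert_timestamp_on` is hypothetical. The input and output formats match the function above.

```python
from datetime import date, datetime


def convert_timestamp_on(record_time_val, day):
    """Hypothetical testable variant of convert_timestamp: the date is passed in."""
    # Parse the time-of-day field, e.g. "4:25:06.000"
    t = datetime.strptime(record_time_val, "%H:%M:%S.%f").time()
    # Attach it to the given date and format it without fractional seconds
    return datetime.combine(day, t).strftime("%Y-%m-%dT%H:%M:%S")


print(convert_timestamp_on("4:25:06.000", date(2020, 7, 15)))  # 2020-07-15T04:25:06
```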

Processing the data

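def lambda_handler(event, context):
    output = []

    for record in event['records']:
        # Decode the Base64 payload into a str
        payload = base64.b64decode(record['data'])
        payload = payload.decode()
        # Split the space-separated record into its fields
        payload = payload.split(" ")
        # Field 1 holds the time of day; turn it into a full timestamp
        payload[1] = convert_timestamp(payload[1])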

Processing the data

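    for record in event['records']:
        ...
        # Join the fields back into one line and re-encode it as Base64
        payload = " ".join(payload)
        # .decode() turns the Base64 bytes into a str so the response is JSON-serializable
        payload_enc = base64.b64encode(payload.encode()).decode()

        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': payload_enc
        })
    return {'records': output}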
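One way to check the handler before deploying is to call it locally with a hand-built event. This sketch restates the two functions so it runs on its own. It adds a hypothetical helper, `make_test_event`, that wraps raw lines in the `records` structure from the sample event. The helper leaves out `deliveryStreamArn`, `region` and `approximateArrivalTimestamp`, since the handler does not read them. The sketch calls `.decode()` on the Base64 output so that the returned dict survives `json.dumps`, the kind of serialization a Lambda response goes through.

```python
import base64
import json
from datetime import datetime as dt


def convert_timestamp(record_time_val):
    today = dt.today().strftime("%Y-%m-%d")
    new_ts = dt.strptime(f"{today} {record_time_val}", "%Y-%m-%d %H:%M:%S.%f")
    return new_ts.strftime("%Y-%m-%dT%H:%M:%S")


def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        fields = base64.b64decode(record["data"]).decode().split(" ")
        fields[1] = convert_timestamp(fields[1])
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            # Base64 as a str, not bytes, so the response can be JSON-encoded
            "data": base64.b64encode(" ".join(fields).encode()).decode(),
        })
    return {"records": output}


def make_test_event(lines):
    """Hypothetical helper: wrap raw lines in a Firehose-style test event."""
    return {
        "invocationId": "local-test",
        "records": [
            {"recordId": str(i), "data": base64.b64encode(line.encode()).decode()}
            for i, line in enumerate(lines)
        ],
    }


if __name__ == "__main__":
    event = make_test_event([
        "939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 "
        "4FTEX4944AK844294 106.9447146 -6.3385652 25",
    ])
    result = lambda_handler(event, None)
    print(json.dumps(result, indent=2))
    for rec in result["records"]:
        # Decode again to inspect the transformed line
        print(base64.b64decode(rec["data"]).decode())
```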

A review

import base64

def lambda_handler(event, context):
    output = []
    for record in event['records']:  # Iterate over the records
        payload = base64.b64decode(record['data']).decode()  # Decode the payload
        # Modify it
        payload_enc = base64.b64encode(payload.encode()).decode()  # Re-encode it as a Base64 str
        output.append({  # Put it in a dictionary
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': payload_enc,
        })
    return {'records': output}
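The review template always reports `'result': 'Ok'`. Firehose's transformation response also accepts `'Dropped'` and `'ProcessingFailed'`. Every incoming `recordId` should still appear in the response, including records that fail. The sketch below flags records that cannot be parsed as `ProcessingFailed` and returns their original data unchanged. According to the Firehose documentation, such records go to an S3 error location rather than the normal destination; check the docs for your destination type. The `transform` step here, rounding lon/lat to four decimals, is a hypothetical stand-in for "Modify it".

```python
import base64


def transform(line):
    """Hypothetical transform: round lon/lat (fields 3 and 4) to 4 decimal places."""
    fields = line.split(" ")
    if len(fields) != 6:
        raise ValueError(f"expected 6 fields, got {len(fields)}")
    # float() raises ValueError if a coordinate is not numeric
    fields[3] = f"{float(fields[3]):.4f}"
    fields[4] = f"{float(fields[4]):.4f}"
    return " ".join(fields)


def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        try:
            payload = base64.b64decode(record["data"]).decode()
            payload = transform(payload)
            output.append({
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(payload.encode()).decode(),
            })
        except ValueError:
            # binascii.Error and UnicodeDecodeError are ValueError subclasses too.
            # Keep the original data and mark the record as failed.
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    return {"records": output}
```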

Creating Lambda in AWS


Script to create the Lambda
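The script itself is not reproduced in these slides. What follows is only a sketch of one way to do it with boto3, assuming you deploy from Python rather than the console. It packages the handler source as an in-memory `.zip` and passes it to the Lambda client's `create_function`. Several values are assumptions: the function name `gps-transform`, the region, the `python3.12` runtime, the 60-second timeout, and the role ARN in the usage comment. The IAM role must already exist and allow Lambda to assume it.

```python
import io
import zipfile


def build_deployment_zip(source, filename="lambda_function.py"):
    """Package handler source code as an in-memory .zip deployment package."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(filename, source)
    return buffer.getvalue()


def create_transform_lambda(source, role_arn, function_name="gps-transform"):
    """Create the function with boto3 (needs AWS credentials and an existing role).

    function_name, region, runtime and timeout are assumed values.
    """
    import boto3  # imported here so the packaging helper works without boto3

    client = boto3.client("lambda", region_name="us-east-1")
    return client.create_function(
        FunctionName=function_name,
        Runtime="python3.12",
        Role=role_arn,
        Handler="lambda_function.lambda_handler",  # <module>.<function>
        Code={"ZipFile": build_deployment_zip(source)},
        Timeout=60,
    )


# Usage (hypothetical role ARN; handler saved as lambda_function.py):
# with open("lambda_function.py") as f:
#     create_transform_lambda(f.read(), "arn:aws:iam::123456789012:role/my-lambda-role")
```

Attaching the new function to the delivery stream as its Lambda processor is a separate step and is not shown here.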


Let's practice!
