A transformational Lambda

Streaming Data with AWS Kinesis and Lambda

Maksim Pecherskiy

Data Engineer

In chapter 1...

In chapter 2...

In this chapter...

Previous approach

Processing once data lands in S3 vs. Lambda transform

Processing once data lands in S3

  • Uses a Lambda function triggered when an object is written to S3
  • Longer delay before data is transformed
  • Raw data is stored before it is cleaned
  • Requires the Firehose destination to be S3

Processing via Lambda transform

  • Uses a Lambda function that Firehose invokes mid-stream
  • Data is transformed in the stream, before delivery
  • Only cleaned data is stored
  • Allows Firehose destinations other than S3
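To run a Lambda mid-stream, the delivery stream needs a processing configuration that points at the function. A minimal sketch, assuming boto3 is used to manage the stream; `build_lambda_processing_config` is a hypothetical helper and the ARN is a placeholder. The dictionary follows the shape of the `ProcessingConfiguration` parameter that `firehose.create_delivery_stream` accepts inside `ExtendedS3DestinationConfiguration`.

```python
def build_lambda_processing_config(lambda_arn):
    """Return a Firehose ProcessingConfiguration that routes records through a Lambda."""
    return {
        "Enabled": True,
        "Processors": [
            {
                "Type": "Lambda",
                "Parameters": [
                    # The only required parameter: which function to invoke
                    {"ParameterName": "LambdaArn", "ParameterValue": lambda_arn},
                ],
            }
        ],
    }


# Hypothetical ARN, for illustration only
arn = "arn:aws:lambda:us-east-1:123456789012:function:transform-gps"
config = build_lambda_processing_config(arn)
print(config["Processors"][0]["Type"])  # Lambda
```

The returned dictionary would be passed as `ProcessingConfiguration` when creating or updating the delivery stream, so no extra S3 trigger is involved.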

Incoming data

record_id timestamp vin lon lat speed
939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.3385652 25
f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.580681 34.79925 37
ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.392392 36.097577 90
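Each incoming record is one line of space-separated fields, so the timestamp always sits at index 1. A small sketch that parses a line into named, typed fields, assuming single spaces between fields and an integer speed as in the three sample rows; `GpsRecord` and `parse_record` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class GpsRecord:
    record_id: str
    timestamp: str
    vin: str
    lon: float
    lat: float
    speed: int


def parse_record(line):
    """Split one space-separated record into typed fields (column order as in the table)."""
    record_id, timestamp, vin, lon, lat, speed = line.strip().split(" ")
    return GpsRecord(record_id, timestamp, vin, float(lon), float(lat), int(speed))


sample = "939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.3385652 25"
rec = parse_record(sample)
print(rec.timestamp, rec.speed)  # 4:25:06.000 25
```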

Transformational Lambda

Sample event

{
  "invocationId": "invocationIdExample",
  "deliveryStreamArn": "arn:aws:firehose:us-east-1:458912630:deliverystream/gps-delivery-stream",
  "region": "us-east-1",
  "records": [
    {
      "recordId": "49546986683135544286507457936321625675700192471156785154",
      "approximateArrivalTimestamp": 1495072949453,
      "data": "NjQuMjQyLjg4LjEwIC0gLSBbMDcvTWFyLzIwMDQ6MTY6MTA6MDIgLTA4MDBdICJHRVQgL21haWxtYW4vbGlzdGluZm8vaHNkaXZpc2lvbiBIVFRQLzEuMSIgMjAwIDYyOTE=="
    }
  ]
}
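To exercise the handler locally, an event with the same shape as the sample can be built from plain-text lines. A sketch with a hypothetical helper `make_test_event`; the simple numeric record IDs are an assumption for local testing, since real IDs are long opaque strings generated by Firehose.

```python
import base64


def make_test_event(lines, stream_arn="arn:aws:firehose:us-east-1:458912630:deliverystream/gps-delivery-stream"):
    """Build a Firehose-style transformation event from plain-text lines."""
    records = []
    for i, line in enumerate(lines):
        records.append({
            # Simple IDs are enough locally; each must still be unique
            "recordId": str(i),
            "approximateArrivalTimestamp": 1495072949453,
            # Firehose delivers each record's payload base64-encoded
            "data": base64.b64encode(line.encode()).decode(),
        })
    return {
        "invocationId": "invocationIdExample",
        "deliveryStreamArn": stream_arn,
        "region": "us-east-1",
        "records": records,
    }


event = make_test_event(["a b", "c d"])
print(len(event["records"]))  # 2
```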

Base64

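Firehose passes each record's `data` to the Lambda as a base64 string and expects base64 back. In Python, `base64.b64encode` takes bytes and returns bytes, which is why the handler needs `.encode()` and `.decode()` around it. A short round trip on one of the sample records:

```python
import base64

line = "939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.3385652 25"

# b64encode works on bytes and returns bytes, so encode the str first
encoded_bytes = base64.b64encode(line.encode())
# Turn the base64 bytes into a str so it can go into a JSON response
encoded_str = encoded_bytes.decode()

# The other direction: b64decode accepts the ASCII str and returns bytes
decoded = base64.b64decode(encoded_str).decode()
print(decoded == line)  # True
```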

Processing the data

import base64
from datetime import datetime as dt

def convert_timestamp(record_time_val):
    # Get today's date as a string
    today = dt.today().strftime("%Y-%m-%d")
    # Combine today's date with the record's time and make a datetime object
    new_ts = dt.strptime(f"{today} {record_time_val}", "%Y-%m-%d %H:%M:%S.%f")
    # Convert the datetime object to a nicely formatted string
    return new_ts.strftime("%Y-%m-%dT%H:%M:%S")
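Because `convert_timestamp` calls `dt.today()`, the date it attaches depends on when the function runs, and Lambda execution environments use UTC by default. For unit tests it can help to pass the date in explicitly. A sketch with a hypothetical `convert_timestamp_on` that otherwise mirrors the function above:

```python
from datetime import date, datetime


def convert_timestamp_on(record_time_val, on_date):
    """Like convert_timestamp, but the date is passed in instead of taken from today()."""
    combined = f"{on_date.isoformat()} {record_time_val}"
    # %H accepts single-digit hours such as "4", and %f accepts "000"
    new_ts = datetime.strptime(combined, "%Y-%m-%d %H:%M:%S.%f")
    return new_ts.strftime("%Y-%m-%dT%H:%M:%S")


print(convert_timestamp_on("4:25:06.000", date(2020, 7, 15)))  # 2020-07-15T04:25:06
```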

Processing the data

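def lambda_handler(event, context):
    output = []

    for record in event['records']:
        # Decode the base64 payload into a string
        payload = base64.b64decode(record['data'])
        payload = payload.decode()
        # Split the space-separated record into its fields
        payload = payload.split(" ")
        # Field 1 is the time of day; turn it into a full timestamp
        payload[1] = convert_timestamp(payload[1])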

Processing the data

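    for record in event['records']:
        ...
        # Re-join the fields into one space-separated string
        payload = " ".join(payload)
        # Re-encode to base64; .decode() turns the bytes into a str
        # so the response can be serialized to JSON
        payload_enc = base64.b64encode(payload.encode()).decode()

        # Return the same recordId with a status and the new data
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': payload_enc
        })

    return {'records': output}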
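Firehose requires the response to contain every input `recordId`, each with a `result` of `Ok`, `Dropped`, or `ProcessingFailed`. A sketch of a hypothetical checker, `check_firehose_response`, for local tests; it also flags a `data` value left as bytes (the raw output of `base64.b64encode`), which cannot be serialized to JSON.

```python
VALID_RESULTS = {"Ok", "Dropped", "ProcessingFailed"}


def check_firehose_response(event, response):
    """Return a list of problems with a transformation response (empty list = looks valid)."""
    problems = []
    in_ids = [r["recordId"] for r in event["records"]]
    out_ids = [r["recordId"] for r in response.get("records", [])]
    # Every input record must come back exactly once
    if sorted(in_ids) != sorted(out_ids):
        problems.append("recordIds in the response do not match the input")
    for r in response.get("records", []):
        if r.get("result") not in VALID_RESULTS:
            problems.append(f"record {r.get('recordId')}: invalid result {r.get('result')!r}")
        # Successful records must carry base64 text, not bytes
        if r.get("result") == "Ok" and not isinstance(r.get("data"), str):
            problems.append(f"record {r.get('recordId')}: data must be a base64 str")
    return problems
```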

A review

import base64

def lambda_handler(event, context):
    output = []
    for record in event['records']:  # Iterate over the records
        payload = base64.b64decode(record['data']).decode()  # Decode the payload
        # Modify it
        payload_enc = base64.b64encode(payload.encode()).decode()  # Re-encode it
        output.append({  # Put it in a dictionary
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': payload_enc,
        })
    return {'records': output}
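Putting the pieces together, a sketch of the complete GPS handler with one addition that is not in the slides: a record that cannot be decoded or parsed is returned with result `ProcessingFailed` and its original data, instead of failing the whole invocation. The error handling and the sample records are assumptions for illustration.

```python
import base64
from datetime import datetime as dt


def convert_timestamp(record_time_val):
    # Combine today's date with the record's time, as in the slides
    today = dt.today().strftime("%Y-%m-%d")
    new_ts = dt.strptime(f"{today} {record_time_val}", "%Y-%m-%d %H:%M:%S.%f")
    return new_ts.strftime("%Y-%m-%dT%H:%M:%S")


def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        try:
            fields = base64.b64decode(record["data"]).decode().split(" ")
            fields[1] = convert_timestamp(fields[1])
            output.append({
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(" ".join(fields).encode()).decode(),
            })
        except (ValueError, IndexError):
            # Assumption: report a malformed record as failed, data unchanged
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    return {"records": output}


# Local smoke test with one good and one malformed record
line = "939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.3385652 25"
event = {"records": [
    {"recordId": "1", "data": base64.b64encode(line.encode()).decode()},
    {"recordId": "2", "data": base64.b64encode(b"not-a-record").decode()},
]}
result = lambda_handler(event, None)
print([r["result"] for r in result["records"]])  # ['Ok', 'ProcessingFailed']
```

Catching `ValueError` also covers bad base64 (`binascii.Error`), non-UTF-8 bytes (`UnicodeDecodeError`), and unparseable times from `strptime`, since all three are subclasses of it.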

Creating Lambda in AWS

Script to create the Lambda function
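The function can also be created from a script. A minimal sketch, assuming boto3 and an existing IAM execution role; `build_deployment_zip` and `create_transform_lambda` are hypothetical helpers, and the function name, role ARN, runtime, and timeout are placeholders to adjust.

```python
import io
import zipfile


def build_deployment_zip(source_code, filename="lambda_function.py"):
    """Zip the handler source in memory and return the bytes for Code={'ZipFile': ...}."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(filename, source_code)
    return buffer.getvalue()


def create_transform_lambda(lambda_client, function_name, role_arn, zip_bytes):
    """Create the function; lambda_client is a boto3 client, e.g. boto3.client('lambda')."""
    return lambda_client.create_function(
        FunctionName=function_name,
        Runtime="python3.12",
        Role=role_arn,
        # <module name>.<function name> inside the zip
        Handler="lambda_function.lambda_handler",
        Code={"ZipFile": zip_bytes},
        # Seconds; the 3-second default can be short for large batches
        Timeout=60,
    )
```

Usage might look like `create_transform_lambda(boto3.client("lambda"), "gps-transform", role_arn, build_deployment_zip(open("lambda_function.py").read()))`, after which the function's ARN goes into the delivery stream's processing configuration.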

Let's practice!