A transformational Lambda

Streaming Data with AWS Kinesis and Lambda

Maksim Pecherskiy

Data Engineer

In chapter 1...


In chapter 2...


In this chapter...


Previous approach


Processing once data is in S3 vs. Lambda transform

Processing once data is in S3

  • Uses a Lambda function triggered when an object is written to S3
  • Longer delay before the data can be transformed
  • Raw data is stored before it is cleaned
  • Requires the Firehose destination to be S3

Processing via Lambda transform

  • Uses a Lambda function invoked in the middle of the Firehose stream
  • Transformation happens in-stream, before delivery
  • Only cleaned data is stored
  • Allows other Firehose destinations
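For comparison, with the S3 approach the Lambda is triggered by an S3 event notification instead of receiving a batch of Firehose records. The sketch below reads the bucket and object key from such a notification, assuming the standard S3 event layout. `extract_s3_objects` and the bucket and key names are hypothetical, and fetching the object itself (for example with boto3) is left out.

```python
from urllib.parse import unquote_plus


def extract_s3_objects(event):
    """Return (bucket, key) pairs from an S3 event notification.

    Hypothetical helper: assumes the standard notification layout
    event["Records"][i]["s3"]["bucket"]["name"] and ["s3"]["object"]["key"].
    """
    objects = []
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded (a space becomes '+'), so decode them
        key = unquote_plus(record["s3"]["object"]["key"])
        objects.append((bucket, key))
    return objects


# Hypothetical sample event, trimmed to the fields used above
sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "gps-raw-data"},
                "object": {"key": "2020/07/15/gps+batch-1.txt"}}}
    ]
}

print(extract_s3_objects(sample_event))
# [('gps-raw-data', '2020/07/15/gps batch-1.txt')]
```

The extra hop (write raw object, get notified, read it back) is where the longer delay and the stored raw data in the S3 approach come from.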

Incoming data

record_id timestamp vin lon lat speed
939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.3385652 25
f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.580681 34.79925 37
ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.392392 36.097577 90
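Each record is a single line of space-separated values, in the column order of the header above, so the timestamp is the second field (index 1). A minimal parsing sketch, assuming fields are separated by exactly one space (the same assumption the handler's `split(" ")` makes). `COLUMNS` and `parse_line` are hypothetical names.

```python
# Column names taken from the header row of the incoming data
COLUMNS = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]


def parse_line(line):
    """Split one space-separated GPS line into a dict keyed by column name."""
    values = line.split(" ")
    if len(values) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} fields, got {len(values)}")
    row = dict(zip(COLUMNS, values))
    # Numeric columns arrive as text; convert them for downstream use
    row["lon"] = float(row["lon"])
    row["lat"] = float(row["lat"])
    row["speed"] = int(row["speed"])
    return row


line = "939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.3385652 25"
row = parse_line(line)
print(row["timestamp"], row["speed"])  # 4:25:06.000 25
```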

Transformational Lambda


Sample event

{
  "invocationId": "invocationIdExample",
  "deliveryStreamArn": "arn:aws:firehose:us-east-1:458912630:deliverystream/gps-delivery-stream",
  "region": "us-east-1",
  "records": [
    {
      "recordId": "49546986683135544286507457936321625675700192471156785154",
      "approximateArrivalTimestamp": 1495072949453,
      "data": "NjQuMjQyLjg4LjEwIC0gLSBbMDcvTWFyLzIwMDQ6MTY6MTA6MDIgLTA4MDBdICJHRVQgL21haWxtYW4vbGlzdGluZm8vaHNkaXZpc2lvbiBIVFRQLzEuMSIgMjAwIDYyOTE=="
    }
  ]
}
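To test the Lambda locally, an event of the same shape can be built from plain-text lines. This is a sketch: `make_firehose_event`, the default ARN, and the sequential record IDs are hypothetical placeholders, and only the fields shown in the sample event are filled in.

```python
import base64
import json


def make_firehose_event(lines, stream_arn="arn:aws:firehose:us-east-1:123456789012:deliverystream/example"):
    """Wrap plain-text lines in a Firehose-style transformation event (hypothetical test helper)."""
    return {
        "invocationId": "local-test",
        "deliveryStreamArn": stream_arn,
        "region": "us-east-1",
        "records": [
            {
                "recordId": str(i),
                "approximateArrivalTimestamp": 0,
                # Each record's payload travels base64-encoded, as in the sample event
                "data": base64.b64encode(line.encode()).decode(),
            }
            for i, line in enumerate(lines)
        ],
    }


event = make_firehose_event(
    ["ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.392392 36.097577 90"]
)
print(json.dumps(event, indent=2))
```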

Base64
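The `data` field of every record is base64-encoded, so the Lambda has to decode it before transforming and re-encode it afterwards. A minimal round trip with Python's standard `base64` module, using one of the incoming GPS lines as sample text:

```python
import base64

text = "f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.580681 34.79925 37"

# b64encode works on bytes and returns bytes, so encode the str first
encoded = base64.b64encode(text.encode())
print(type(encoded))        # <class 'bytes'>

# Decoding reverses the process; .decode() turns the bytes back into a str
decoded = base64.b64decode(encoded).decode()
print(decoded == text)      # True

# A str version of the encoded data, suitable for a JSON response
encoded_str = encoded.decode()
```

The bytes/str distinction matters when the Lambda returns its result: a JSON response can carry the `str` form, but not raw `bytes`.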


Processing the data

import base64
from datetime import datetime as dt

def convert_timestamp(record_time_val):
    # Get today's date as string
    today = dt.today().strftime("%Y-%m-%d")
    # Combine today's date with the record's time and make datetime object
    new_ts = dt.strptime(f"{today} {record_time_val}", "%Y-%m-%d %H:%M:%S.%f")
    # Convert the datetime object to a nicely formatted string
    return new_ts.strftime("%Y-%m-%dT%H:%M:%S")
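Because `convert_timestamp` reads the current date from the Lambda's clock rather than from the record, its output changes from day to day, which makes it awkward to test. A hypothetical variant, `convert_timestamp_on`, takes the date as a parameter so the same parsing and formatting steps can be checked against a fixed day:

```python
from datetime import date, datetime as dt


def convert_timestamp_on(record_time_val, day):
    """Like convert_timestamp, but takes the date explicitly (hypothetical variant)."""
    combined = dt.strptime(f"{day:%Y-%m-%d} {record_time_val}", "%Y-%m-%d %H:%M:%S.%f")
    # The output format has no %f, so the milliseconds are dropped
    return combined.strftime("%Y-%m-%dT%H:%M:%S")


print(convert_timestamp_on("4:25:06.000", date(2020, 7, 15)))
# 2020-07-15T04:25:06
```

Note that `%H` also accepts single-digit hours such as `4`, which is why the incoming `4:25:06.000` parses and comes out zero-padded as `04:25:06`.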

Processing the data

def lambda_handler(event, context):
    output = []

    for record in event['records']:
        # Decode the base64 payload into text
        payload = base64.b64decode(record['data'])
        payload = payload.decode()
        # Split into fields; field 1 is the timestamp
        payload = payload.split(" ")
        payload[1] = convert_timestamp(payload[1])

Processing the data

    for record in event['records']:
        ...
        payload = " ".join(payload)
        # b64encode returns bytes; decode to str so the response is JSON-serializable
        payload_enc = base64.b64encode(payload.encode()).decode()

        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': payload_enc
        })

    return {'records': output}
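Put together, the pieces above can be run locally against a hand-built event before deploying. A sketch, assuming `data` is returned as a base64 `str` (hence the `.decode()` after `b64encode`); the test event is hypothetical and trimmed to the fields the handler reads. No expected output is shown because the timestamp depends on the current date.

```python
import base64
import json
from datetime import datetime as dt


def convert_timestamp(record_time_val):
    # Attach today's date to the record's time of day and reformat it
    today = dt.today().strftime("%Y-%m-%d")
    new_ts = dt.strptime(f"{today} {record_time_val}", "%Y-%m-%d %H:%M:%S.%f")
    return new_ts.strftime("%Y-%m-%dT%H:%M:%S")


def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        fields = base64.b64decode(record["data"]).decode().split(" ")
        fields[1] = convert_timestamp(fields[1])  # field 1 is the timestamp
        payload_enc = base64.b64encode(" ".join(fields).encode()).decode()
        output.append({"recordId": record["recordId"], "result": "Ok", "data": payload_enc})
    return {"records": output}


# Hypothetical local test event, trimmed to the fields the handler reads
line = "939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.3385652 25"
test_event = {"records": [{"recordId": "1", "data": base64.b64encode(line.encode()).decode()}]}

result = lambda_handler(test_event, None)
print(json.dumps(result))  # would raise TypeError if 'data' were still bytes
print(base64.b64decode(result["records"][0]["data"]).decode())
```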

A review

import base64

def lambda_handler(event, context):
    output = []
    for record in event['records']:  # Iterate over the records
        payload = base64.b64decode(record['data']).decode()  # Decode the payload
        # Modify it
        payload_enc = base64.b64encode(payload.encode()).decode()  # Re-encode it as a str
        output.append({  # Put it in a dictionary
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': payload_enc,
        })
    return {'records': output}
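The template always reports `'Ok'`, but Firehose also accepts `'Dropped'` and `'ProcessingFailed'` as result values. A sketch of the same template that flags records it cannot handle instead of crashing the whole batch; the six-field check is a hypothetical validation rule based on the GPS columns, not part of the original handler.

```python
import base64


def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        try:
            payload = base64.b64decode(record["data"]).decode()
            fields = payload.split(" ")
            # Hypothetical validation: expect the six GPS columns
            if len(fields) != 6:
                raise ValueError("unexpected number of fields")
            # ... modify fields here ...
            result = "Ok"
            data = base64.b64encode(" ".join(fields).encode()).decode()
        except ValueError:
            # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
            # Pass the original data back unchanged and flag the record.
            result = "ProcessingFailed"
            data = record["data"]
        output.append({"recordId": record["recordId"], "result": result, "data": data})
    return {"records": output}


good = "939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.3385652 25"
event = {"records": [
    {"recordId": "1", "data": base64.b64encode(good.encode()).decode()},
    {"recordId": "2", "data": base64.b64encode(b"not a gps line").decode()},
]}
response = lambda_handler(event, None)
results = [r["result"] for r in response["records"]]
print(results)  # ['Ok', 'ProcessingFailed']
```

Every incoming `recordId` still appears exactly once in the output, whatever its result.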

Creating Lambda in AWS


Script for creating the Lambda function


Let's practice!
