Una Lambda di trasformazione

Dati in streaming con AWS Kinesis e Lambda

Maksim Pecherskiy

Data Engineer

Nel capitolo 1...

2020-07-15_06-18.png

Dati in streaming con AWS Kinesis e Lambda

Nel capitolo 2...

2020-07-15_06-20.png

Dati in streaming con AWS Kinesis e Lambda

In questo capitolo...

2020-07-15_06-27.png

Dati in streaming con AWS Kinesis e Lambda

Approccio precedente

2020-07-14_07-42.png

Dati in streaming con AWS Kinesis e Lambda

Elaborazione in S3 vs trasformazione Lambda

Elaborazione una volta in S3

  • Usa una funzione Lambda attivata alla scrittura su S3
  • Maggior ritardo prima della trasformazione
  • I dati grezzi sono salvati prima della pulizia
  • Richiede S3 come destinazione di Firehose

Elaborazione via trasformazione Lambda

  • Usa una funzione Lambda a metà stream Firehose
  • Trasformazione immediata
  • Si salvano solo dati puliti
  • Consente altre destinazioni in Firehose
Dati in streaming con AWS Kinesis e Lambda

Dati in arrivo

record_id timestamp vin lon lat speed
939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.3385652 25
f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.580681 34.79925 37
ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.392392 36.097577 90
Dati in streaming con AWS Kinesis e Lambda

Lambda di trasformazione

2020-07-15_06-38.png

Dati in streaming con AWS Kinesis e Lambda

Evento di esempio

{
  "invocationId": "invocationIdExample",
  "deliveryStreamArn": "arn:aws:firehose:us-east-1:458912630:deliverystream/gps-delivery-stream",
  "region": "us-east-1",
  "records": [
    {

"recordId": "49546986683135544286507457936321625675700192471156785154",
"approximateArrivalTimestamp": 1495072949453,
"data": "NjQuMjQyLjg4LjEwIC0gLSBbMDcvTWFyLzIwMDQ6MTY6MTA6MDIgLTA4MDBdICJHRVQgL21haWxtYW4vbGlzdGluZm8vaHNkaXZpc2lvbiBIVFRQLzEuMSIgMjAwIDYyOTE=="
} ] }
Dati in streaming con AWS Kinesis e Lambda

Base64

2020-07-19_10-54.png

Dati in streaming con AWS Kinesis e Lambda

Elaborare i dati

import base64
from datetime import datetime as dt

def convert_timestamp(record_time_val):
# Get today's date as string today = dt.today().strftime("%Y-%m-%d")
# Combine today's date with the record's time and make datetime object new_ts = dt.strptime(f"{today} {record_time_val}", "%Y-%m-%d %H:%M:%S.%f")
# Convert the datetime object to a nicely formatted string return new_ts.strftime("%Y-%m-%dT%H:%M:%S")
Dati in streaming con AWS Kinesis e Lambda

Elaborare i dati

def lambda_handler(event, context):
    output = []

for record in event['records']: payload = base64.b64decode(record['data'])
payload = payload.decode()
payload = payload.split(" ")
payload[1] = convert_timestamp(payload[1])
Dati in streaming con AWS Kinesis e Lambda

Elaborare i dati

    for record in event['records']:
        ...
        payload = " ".join(payload)
        payload_enc = base64.b64encode(payload.encode())

output.append({
'recordId': record['recordId'],
'result': 'Ok',
'data': payload_enc
})
return {'records': output}
Dati in streaming con AWS Kinesis e Lambda

Ripasso

import base64
def lambda_handler(event, context):

output = [] for record in event['records']: # Iterate over the records
payload = base64.b64decode(record['data']).decode() # Decode the payload
# Modify it
payload_enc = base64.b64encode(payload.encode()) # Re-encode it
output.append({ # Put it in a dictionary 'recordId': record['recordId'], 'result': 'Ok', 'data': payload_enc, })
return {'records': output}
Dati in streaming con AWS Kinesis e Lambda

Creare Lambda in AWS

Dati in streaming con AWS Kinesis e Lambda

Script per creare la lambda

Dati in streaming con AWS Kinesis e Lambda

Passons à la pratique !

Dati in streaming con AWS Kinesis e Lambda

Preparing Video For Download...