Lambda transformasional

Streaming Data dengan AWS Kinesis dan Lambda

Maksim Pecherskiy

Data Engineer

Di bab 1...

2020-07-15_06-18.png

Streaming Data dengan AWS Kinesis dan Lambda

Di bab 2...

2020-07-15_06-20.png

Streaming Data dengan AWS Kinesis dan Lambda

Di bab ini...

2020-07-15_06-27.png

Streaming Data dengan AWS Kinesis dan Lambda

Pendekatan sebelumnya

2020-07-14_07-42.png

Streaming Data dengan AWS Kinesis dan Lambda

Pemrosesan sekali di S3 vs transformasi Lambda

Pemrosesan sekali di S3

  • Menggunakan fungsi Lambda yang dipicu saat objek ditulis di S3
  • Jeda lebih lama hingga data bisa ditransformasi
  • Data mentah disimpan sebelum dibersihkan
  • Firehose harus ke tujuan S3

Pemrosesan via transformasi Lambda

  • Menggunakan fungsi Lambda yang dipicu di tengah aliran Firehose
  • Transformasi langsung terjadi
  • Hanya data bersih yang disimpan
  • Memungkinkan tujuan lain di Firehose
Streaming Data dengan AWS Kinesis dan Lambda

Data masuk

record_id timestamp vin lon lat speed
939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.3385652 25
f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.580681 34.79925 37
ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.392392 36.097577 90
Streaming Data dengan AWS Kinesis dan Lambda

Lambda transformasional

2020-07-15_06-38.png

Streaming Data dengan AWS Kinesis dan Lambda

Contoh event

{
  "invocationId": "invocationIdExample",
  "deliveryStreamArn": "arn:aws:firehose:us-east-1:458912630:deliverystream/gps-delivery-stream",
  "region": "us-east-1",
  "records": [
    {

"recordId": "49546986683135544286507457936321625675700192471156785154",
"approximateArrivalTimestamp": 1495072949453,
"data": "NjQuMjQyLjg4LjEwIC0gLSBbMDcvTWFyLzIwMDQ6MTY6MTA6MDIgLTA4MDBdICJHRVQgL21haWxtYW4vbGlzdGluZm8vaHNkaXZpc2lvbiBIVFRQLzEuMSIgMjAwIDYyOTE=="
} ] }
Streaming Data dengan AWS Kinesis dan Lambda

Base64

2020-07-19_10-54.png

Streaming Data dengan AWS Kinesis dan Lambda

Memroses data

import base64
from datetime import datetime as dt

def convert_timestamp(record_time_val):
# Dapatkan tanggal hari ini sebagai string today = dt.today().strftime("%Y-%m-%d")
# Gabungkan tanggal hari ini dengan waktu pada record dan buat objek datetime new_ts = dt.strptime(f"{today} {record_time_val}", "%Y-%m-%d %H:%M:%S.%f")
# Ubah objek datetime menjadi string berformat rapi return new_ts.strftime("%Y-%m-%dT%H:%M:%S")
Streaming Data dengan AWS Kinesis dan Lambda

Memroses data

def lambda_handler(event, context):
    output = []

for record in event['records']: payload = base64.b64decode(record['data'])
payload = payload.decode()
payload = payload.split(" ")
payload[1] = convert_timestamp(payload[1])
Streaming Data dengan AWS Kinesis dan Lambda

Memroses data

    for record in event['records']:
        ...
        payload = " ".join(payload)
        payload_enc = base64.b64encode(payload.encode())

output.append({
'recordId': record['recordId'],
'result': 'Ok',
'data': payload_enc
})
return {'records': output}
Streaming Data dengan AWS Kinesis dan Lambda

Tinjauan singkat

import base64
def lambda_handler(event, context):

output = [] for record in event['records']: # Iterasi tiap record
payload = base64.b64decode(record['data']).decode() # Dekode payload
# Ubah isinya
payload_enc = base64.b64encode(payload.encode()) # Enkode ulang
output.append({ # Masukkan ke dictionary 'recordId': record['recordId'], 'result': 'Ok', 'data': payload_enc, })
return {'records': output}
Streaming Data dengan AWS Kinesis dan Lambda

Membuat Lambda di AWS

Streaming Data dengan AWS Kinesis dan Lambda

Skrip untuk membuat Lambda

Streaming Data dengan AWS Kinesis dan Lambda

Ayo berlatih!

Streaming Data dengan AWS Kinesis dan Lambda

Preparing Video For Download...