Dönüştürücü Lambda

AWS Kinesis ve Lambda ile Akış Verileri

Maksim Pecherskiy

Data Engineer

Bölüm 1'de...

2020-07-15_06-18.png

AWS Kinesis ve Lambda ile Akış Verileri

Bölüm 2'de...

2020-07-15_06-20.png

AWS Kinesis ve Lambda ile Akış Verileri

Bu bölümde...

2020-07-15_06-27.png

AWS Kinesis ve Lambda ile Akış Verileri

Önceki yaklaşım

2020-07-14_07-42.png

AWS Kinesis ve Lambda ile Akış Verileri

S3'te işleme vs Lambda dönüşümü

S3'te tek seferde işleme

  • S3'e yazmada tetiklenen bir Lambda kullanır
  • Dönüşüm için daha uzun gecikme
  • Ham veri temizlikten önce saklanır
  • Firehose hedefinin S3 olmasını gerektirir

Lambda dönüşümü ile işleme

  • Firehose akışı ortasında tetiklenen bir Lambda kullanır
  • Dönüşüm anlıktır
  • Yalnızca temiz veri saklanır
  • Firehose'da başka hedeflere izin verir
AWS Kinesis ve Lambda ile Akış Verileri

Gelen veriler

record_id timestamp vin lon lat speed
939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.3385652 25
f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.580681 34.79925 37
ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.392392 36.097577 90
AWS Kinesis ve Lambda ile Akış Verileri

Dönüştürücü Lambda

2020-07-15_06-38.png

AWS Kinesis ve Lambda ile Akış Verileri

Örnek olay

{
  "invocationId": "invocationIdExample",
  "deliveryStreamArn": "arn:aws:firehose:us-east-1:458912630:deliverystream/gps-delivery-stream",
  "region": "us-east-1",
  "records": [
    {

"recordId": "49546986683135544286507457936321625675700192471156785154",
"approximateArrivalTimestamp": 1495072949453,
"data": "NjQuMjQyLjg4LjEwIC0gLSBbMDcvTWFyLzIwMDQ6MTY6MTA6MDIgLTA4MDBdICJHRVQgL21haWxtYW4vbGlzdGluZm8vaHNkaXZpc2lvbiBIVFRQLzEuMSIgMjAwIDYyOTE=="
} ] }
AWS Kinesis ve Lambda ile Akış Verileri

Base64

2020-07-19_10-54.png

AWS Kinesis ve Lambda ile Akış Verileri

Veriyi işleme

import base64
from datetime import datetime as dt

def convert_timestamp(record_time_val):
# Bugünün tarihini metin olarak al today = dt.today().strftime("%Y-%m-%d")
# Bugünün tarihi ile kaydın saatini birleştir ve datetime nesnesi yap new_ts = dt.strptime(f"{today} {record_time_val}", "%Y-%m-%d %H:%M:%S.%f")
# Datetime nesnesini düzgün biçimlendirilmiş metne çevir return new_ts.strftime("%Y-%m-%dT%H:%M:%S")
AWS Kinesis ve Lambda ile Akış Verileri

Veriyi işleme

def lambda_handler(event, context):
    output = []

for record in event['records']: payload = base64.b64decode(record['data'])
payload = payload.decode()
payload = payload.split(" ")
payload[1] = convert_timestamp(payload[1])
AWS Kinesis ve Lambda ile Akış Verileri

Veriyi işleme

    for record in event['records']:
        ...
        payload = " ".join(payload)
        payload_enc = base64.b64encode(payload.encode())

output.append({
'recordId': record['recordId'],
'result': 'Ok',
'data': payload_enc
})
return {'records': output}
AWS Kinesis ve Lambda ile Akış Verileri

Kısa özet

import base64
def lambda_handler(event, context):

output = [] for record in event['records']: # Kayıtlar üzerinde yinele
payload = base64.b64decode(record['data']).decode() # Yükü çöz
# Değiştir
payload_enc = base64.b64encode(payload.encode()) # Yeniden kodla
output.append({ # Sözlüğe ekle 'recordId': record['recordId'], 'result': 'Ok', 'data': payload_enc, })
return {'records': output}
AWS Kinesis ve Lambda ile Akış Verileri

AWS'de Lambda oluşturma

AWS Kinesis ve Lambda ile Akış Verileri

Lambda oluşturma betiği

AWS Kinesis ve Lambda ile Akış Verileri

Hadi pratik yapalım!

AWS Kinesis ve Lambda ile Akış Verileri

Preparing Video For Download...