İlk canlı lambda’nız!

AWS Kinesis ve Lambda ile Akış Verileri

Maksim Pecherskiy

Data Engineer

Önceki ders

StkGt.png

AWS Kinesis ve Lambda ile Akış Verileri

Bu ders

Zb4Va.png

AWS Kinesis ve Lambda ile Akış Verileri

Ortak günlük biçimi

bc75da5f-1bf6-444c-80ad-49c180e1b8de 23:16:06.000 3FTEX1G5XAK844393 -76.6990172 2.481207 40
ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.39239199999999 36.097577 90
f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.58068100000001 34.79925 37
AWS Kinesis ve Lambda ile Akış Verileri

S3 dosyalarını okuma

import pandas as pd

results in:

Response:
{
  "errorMessage": "Unable to import module 'lambda_function': No module named 'pandas'",
  "errorType": "Runtime.ImportModuleError"
}
AWS Kinesis ve Lambda ile Akış Verileri

İşleyiciyi güncelleme

#lambda_function.py
import json, boto3, pandas as pd
# İstemcileri başlatın
...

SPEED_ALERT_THRESHOLD = 45 ALERT_PHONE_NUMBER = "+1234567890"
# Yazılan kayıtlardan veri çerçevesi almak için yardımcı işlev def get_new_data(event): pd.read_csv()... ... return data
# Lambda işlevi işleyicisi def record_created_handler(event, context): data = get_new_data(event) ... sns.publish() ...
AWS Kinesis ve Lambda ile Akış Verileri

get_new_data()

def get_new_data(event):
    # Yeni nesne anahtarlarını saklamak için bir liste oluşturun.
    written_objects = []

    # Her S3 olay kaydını yineleyin.
    for record in event['Records']:

        # Kontrol edilecek değişkenleri alın
        event_name = record['eventName']
        bucket_name = record['s3']['bucket']['name']
        obj_key = record['s3']['object']['key']
AWS Kinesis ve Lambda ile Akış Verileri

get_new_data()

def get_new_data(event):
        ...
        # Olayın sd-vehicle-data kovasından geldiğini doğrulayın.
        if event_name == 'ObjectCreated:Put' and bucket_name == 'sd-vehicle-data':

obj = s3.get_object(Bucket=bucket_name, Key = obj_key) df = pd.read_csv(obj['Body'], delimiter = " ", names=["record_id", "timestamp", "vin", "lon", "lat", "speed"])
written_objects.append(df)
# Yeni kayıtları tek bir veri çerçevesinde birleştirin. return pd.concat(written_objects)
AWS Kinesis ve Lambda ile Akış Verileri

record_created_handler()

SPEED_ALERT_THRESHOLD = 45
ALERT_PHONE_NUMBER = "+1234567890"
...
def record_created_handler(event, context):
    # Yardımcı yöntemi çağırın
    data = get_new_data(event)

## En yüksek hızları alın top_speeds = data.groupby(['vin'])['speed'].max().reset_index()
## 45 sınırını aşan en yüksek hızları alın too_fast = top_speeds.loc[top_speeds.speed > SPEED_ALERT_THRESHOLD, :]
AWS Kinesis ve Lambda ile Akış Verileri

record_created_handler()

SPEED_ALERT_THRESHOLD = 45
ALERT_PHONE_NUMBER = "+1234567890"
...

def record_created_handler(event, context):
    ...
    ## SMS gönderin
    sns.publish(PhoneNumber=ALERT_PHONE_NUMBER,
        Message="Speeding Alert \n" + too_fast.to_string())

## Şimdilik hiçbir yere gitmiyor, ancak bir şey döndürmemiz gerekir. totals = data.groupby(['vin'])['speed'].max().reset_index() return totals.to_csv(sep=" ", index=False)
AWS Kinesis ve Lambda ile Akış Verileri

Lambda işlevini test edin

jBpMo.png

KodWB.png

AWS Kinesis ve Lambda ile Akış Verileri

Ortam değişkenleri ekleme

import os
os.environ.get("ENV_VARIABLE_NAME", "DEFAULT_VALUE")
import os
SPEED_ALERT_THRESHOLD = os.environ.get("SPEED_ALERT_THRESHOLD", 45)
ALERT_PHONE_NUMBER = os.environ.get("ALERT_PHONE_NUMBER", None)
...

def record_created_handler(event, context):
    ...
AWS Kinesis ve Lambda ile Akış Verileri

Ortam değişkenleri ekleme

7jD6x.png

AWS Kinesis ve Lambda ile Akış Verileri

Ortam değişkenleri ekleme

eDRTX.png

AWS Kinesis ve Lambda ile Akış Verileri

yKjWC.png

AWS Kinesis ve Lambda ile Akış Verileri

Tetikleyici ekleme

gLmbZD.png

AWS Kinesis ve Lambda ile Akış Verileri

Tetikleyici ekleme

7F08p.png

AWS Kinesis ve Lambda ile Akış Verileri

Tetikleyici ekleme

bPUtX.png

AWS Kinesis ve Lambda ile Akış Verileri

Gözden geçirme

ZAbfT.png

AWS Kinesis ve Lambda ile Akış Verileri

Gözden geçirme

m9jlv.png

AWS Kinesis ve Lambda ile Akış Verileri

Gözden geçirme

GTlyD.png

AWS Kinesis ve Lambda ile Akış Verileri

Gözden geçirme

XddOW.png

AWS Kinesis ve Lambda ile Akış Verileri

Ayo berlatih!

AWS Kinesis ve Lambda ile Akış Verileri

Preparing Video For Download...