Sunucusuz veri iş akışı

AWS Kinesis ve Lambda ile Akış Verileri

Maksim Pecherskiy

Data Engineer

Geriye bakış

z8HRk.png

AWS Kinesis ve Lambda ile Akış Verileri

Geçerli bölümleme

FMktI.png

AWS Kinesis ve Lambda ile Akış Verileri

Hız ihlalcilerini filtreleme

IPwGK.png

AWS Kinesis ve Lambda ile Akış Verileri

Güne göre toplama

tvBXx.png

AWS Kinesis ve Lambda ile Akış Verileri

recordReaderS3'ü açın

aIahM.png

AWS Kinesis ve Lambda ile Akış Verileri

recordReaderS3'ü düzenleme

...
import pytz
from datetime import datetime
tz = pytz.timezone('America/Los_Angeles')
AWS Kinesis ve Lambda ile Akış Verileri

recordReaderS3'ü düzenleme

def record_created_handler(event, context):

... ## Get top speeds that exceed the limit of 45 too_fast = top_speeds.loc[top_speeds.speed > SPEED_ALERT_THRESHOLD :]
## Generate object key fdate = datetime.now(tz).strftime("%Y%m%d/%HH%MM%SS") obj_key = f"speeders/{fdate}.csv" # filename in speeders folder
## Write the object to S3 s3.put_object(Bucket='sd-vehicle-data', Key=obj_key, Body=too_fast.to_csv(sep=" ", index=False) )
AWS Kinesis ve Lambda ile Akış Verileri

Hız ihlallerini tarih ve saate göre yazma

s3QY4.png

AWS Kinesis ve Lambda ile Akış Verileri

speederAggregator

tvBXx.png

AWS Kinesis ve Lambda ile Akış Verileri

speederAggregator oluşturma

3pGYX.png

AWS Kinesis ve Lambda ile Akış Verileri

AWS data wrangler katmanı ekleme

7TO0V.png

AWS Kinesis ve Lambda ile Akış Verileri

speederAggregator Kaynakları

sO0op.png

AWS Kinesis ve Lambda ile Akış Verileri

Zamanlanmış tetikleyici oluşturma

4Qi4i.png

AWS Kinesis ve Lambda ile Akış Verileri

y0FkQ.png

cron(50 6 * * ? *)
    1. dakikada
    1. saatte (UTC)
  • Ayın her günü
  • Her ay
  • Haftanın her günü
  • Her yıl
AWS Kinesis ve Lambda ile Akış Verileri

speederAggregator geri çağrısı

import boto3, pytz, pandas as pd

s3 = boto3.client("s3" ...)
...

def speeder_aggregator(event, context): tz = pytz.timezone('America/Los_Angeles') filter_date = datetime.now(tz).strftime("%Y%m%d")
AWS Kinesis ve Lambda ile Akış Verileri

speederAggregator geri çağrısı

def speeder_aggregator(event, context):
  ...
  objects = s3.list_objects_v2(
      Bucket='sd-vehicle-data', Prefix=f'speeders/{filter_date}')
  day_data = []

for obj in objects['Contents']: print(obj['Key']) day_record = s3.get_object(Bucket='sd-vehicle-data', Key = obj['Key']) day_data.append( pd.read_csv(day_record['Body'], delimiter = " ")) # Concatenate new records into a single dataframe. data = pd.concat(day_data) data.columns = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]
AWS Kinesis ve Lambda ile Akış Verileri

awswrangler paketi

import awswrangler as wr

session = boto3.Session(aws_access_key_id = AWS_KEY, aws_secret_access_key = AWS_SECRET, region_name="us-east-1")
df = wr.s3.read_csv(f"s3://sd-vehicle-data/speeders/{filter_date}") df.head()
AWS Kinesis ve Lambda ile Akış Verileri

Eski yöntem vs awswrangler

def speeder_aggregator(event, context):
  ...
  objects = s3.list_objects_v2(
      Bucket='sd-vehicle-data', Prefix=f'speeders/{filter_date}')
  day_data = []

  for obj in objects['Contents']:
      print(obj['Key'])
      day_record = s3.get_object(Bucket='sd-vehicle-data', Key = obj['Key'])
      day_data.append(
          pd.read_csv(day_record['Body'], delimiter = " "))

  # Concatenate new records into a single dataframe.
  data = pd.concat(day_data)
  data.columns = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]
AWS Kinesis ve Lambda ile Akış Verileri

Eski yöntem vs awswrangler

import awswrangler as wr

session = boto3.Session(aws_access_key_id = AWS_KEY, 
                        aws_secret_access_key = AWS_SECRET, region_name="us-east-1")
def speeder_aggregator(event, context):
    df = wr.s3.read_csv(f"s3://sd-vehicle-data/speeders/{filter_date}")      
    df.head()
AWS Kinesis ve Lambda ile Akış Verileri

Toplu hız ihlali dosyasını yazma

wr.s3.to_csv(df, 
             f"s3://sd-vehicle-data/speeders_daily/{filter_date}.csv", 
             boto3_session=session, 
             sep=" ", index=False)
AWS Kinesis ve Lambda ile Akış Verileri

Kaydet ve test et

agF2b.png

AWS Kinesis ve Lambda ile Akış Verileri

Tetikleyiciyi etkinleştirme

COfYY.png

AWS Kinesis ve Lambda ile Akış Verileri

Gözden geçirme

u1Jwm.png

AWS Kinesis ve Lambda ile Akış Verileri

Gözden geçirme

7ZYPV.png

AWS Kinesis ve Lambda ile Akış Verileri

Gözden geçirme

  • Zamanlama için cron
  • wr.s3.read_csv()
  • wr.s3.write_csv()
AWS Kinesis ve Lambda ile Akış Verileri

Vamos praticar!

AWS Kinesis ve Lambda ile Akış Verileri

Preparing Video For Download...