Alur kerja data tanpa server

Streaming Data dengan AWS Kinesis dan Lambda

Maksim Pecherskiy

Data Engineer

Kilasan balik

z8HRk.png

Streaming Data dengan AWS Kinesis dan Lambda

Partisi saat ini

FMktI.png

Streaming Data dengan AWS Kinesis dan Lambda

Menyaring pelanggar kecepatan

IPwGK.png

Streaming Data dengan AWS Kinesis dan Lambda

Mengagregasi per hari

tvBXx.png

Streaming Data dengan AWS Kinesis dan Lambda

Buka recordReaderS3

aIahM.png

Streaming Data dengan AWS Kinesis dan Lambda

Mengedit recordReaderS3

...
import pytz
from datetime import datetime
tz = pytz.timezone('America/Los_Angeles')
Streaming Data dengan AWS Kinesis dan Lambda

Mengedit recordReaderS3

def record_created_handler(event, context):

... ## Dapatkan kecepatan puncak yang melebihi batas 45 too_fast = top_speeds.loc[top_speeds.speed > SPEED_ALERT_THRESHOLD :]
## Hasilkan object key fdate = datetime.now(tz).strftime("%Y%m%d/%HH%MM%SS") obj_key = f"speeders/{fdate}.csv" # nama file di folder speeders
## Tulis objek ke S3 s3.put_object(Bucket='sd-vehicle-data', Key=obj_key, Body=too_fast.to_csv(sep=" ", index=False) )
Streaming Data dengan AWS Kinesis dan Lambda

Menulis pelanggar kecepatan per tanggal dan waktu

s3QY4.png

Streaming Data dengan AWS Kinesis dan Lambda

speederAggregator

tvBXx.png

Streaming Data dengan AWS Kinesis dan Lambda

Buat speederAggregator

3pGYX.png

Streaming Data dengan AWS Kinesis dan Lambda

Tambah layer AWS data wrangler

7TO0V.png

Streaming Data dengan AWS Kinesis dan Lambda

Sumber daya speederAggregator

sO0op.png

Streaming Data dengan AWS Kinesis dan Lambda

Buat pemicu terjadwal

4Qi4i.png

Streaming Data dengan AWS Kinesis dan Lambda

y0FkQ.png

cron(50 6 * * ? *)
  • Pada menit ke-50
  • Pada jam ke-6 (UTC)
  • Setiap hari dalam bulan
  • Setiap bulan
  • Setiap hari dalam minggu
  • Setiap tahun
Streaming Data dengan AWS Kinesis dan Lambda

Callback speederAggregator

import boto3, pytz, pandas as pd

s3 = boto3.client("s3" ...)
...

def speeder_aggregator(event, context): tz = pytz.timezone('America/Los_Angeles') filter_date = datetime.now(tz).strftime("%Y%m%d")
Streaming Data dengan AWS Kinesis dan Lambda

Callback speederAggregator

def speeder_aggregator(event, context):
  ...
  objects = s3.list_objects_v2(
      Bucket='sd-vehicle-data', Prefix=f'speeders/{filter_date}')
  day_data = []

for obj in objects['Contents']: print(obj['Key']) day_record = s3.get_object(Bucket='sd-vehicle-data', Key = obj['Key']) day_data.append( pd.read_csv(day_record['Body'], delimiter = " ")) # Gabungkan rekaman baru menjadi satu dataframe. data = pd.concat(day_data) data.columns = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]
Streaming Data dengan AWS Kinesis dan Lambda

Paket awswrangler

import awswrangler as wr

session = boto3.Session(aws_access_key_id = AWS_KEY, aws_secret_access_key = AWS_SECRET, region_name="us-east-1")
df = wr.s3.read_csv(f"s3://sd-vehicle-data/speeders/{filter_date}") df.head()
Streaming Data dengan AWS Kinesis dan Lambda

Cara lama vs awswrangler

def speeder_aggregator(event, context):
  ...
  objects = s3.list_objects_v2(
      Bucket='sd-vehicle-data', Prefix=f'speeders/{filter_date}')
  day_data = []

  for obj in objects['Contents']:
      print(obj['Key'])
      day_record = s3.get_object(Bucket='sd-vehicle-data', Key = obj['Key'])
      day_data.append(
          pd.read_csv(day_record['Body'], delimiter = " "))

  # Gabungkan rekaman baru menjadi satu dataframe.
  data = pd.concat(day_data)
  data.columns = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]
Streaming Data dengan AWS Kinesis dan Lambda

Cara lama vs awswrangler

import awswrangler as wr

session = boto3.Session(aws_access_key_id = AWS_KEY, 
                        aws_secret_access_key = AWS_SECRET, region_name="us-east-1")
def speeder_aggregator(event, context):
    df = wr.s3.read_csv(f"s3://sd-vehicle-data/speeders/{filter_date}")      
    df.head()
Streaming Data dengan AWS Kinesis dan Lambda

Menulis file agregat pelanggar kecepatan

wr.s3.to_csv(df, 
             f"s3://sd-vehicle-data/speeders_daily/{filter_date}.csv", 
             boto3_session=session, 
             sep=" ", index=False)
Streaming Data dengan AWS Kinesis dan Lambda

Simpan dan uji

agF2b.png

Streaming Data dengan AWS Kinesis dan Lambda

Aktifkan pemicu

COfYY.png

Streaming Data dengan AWS Kinesis dan Lambda

Tinjauan

u1Jwm.png

Streaming Data dengan AWS Kinesis dan Lambda

Tinjauan

7ZYPV.png

Streaming Data dengan AWS Kinesis dan Lambda

Tinjauan

  • Cron untuk penjadwalan
  • wr.s3.read_csv()
  • wr.s3.write_csv()
Streaming Data dengan AWS Kinesis dan Lambda

Ayo berlatih!

Streaming Data dengan AWS Kinesis dan Lambda

Preparing Video For Download...