Firehose teslim akışıyla çalışma

AWS Kinesis ve Lambda ile Akış Verileri

Maksim Pecherskiy

Data Engineer

Akış oluşturmaya hazır

KF414O.png

AWS Kinesis ve Lambda ile Akış Verileri

Akış oluşturmaya hazır

YgVbe.png

AWS Kinesis ve Lambda ile Akış Verileri

Akış oluşturmaya hazır

BBYB4.png

AWS Kinesis ve Lambda ile Akış Verileri

Akış oluşturmaya hazır

Ue73e.png

AWS Kinesis ve Lambda ile Akış Verileri

Akış oluşturmaya hazır

qUq0E.png

AWS Kinesis ve Lambda ile Akış Verileri

Role ARN alın

qH99c.png

AWS Kinesis ve Lambda ile Akış Verileri

boto3 istemcisini başlatın

import boto3

firehose = boto3.client('firehose', 
                        aws_access_key_id=AWS_KEY_ID, 
                        aws_secret_access_key=AWS_SECRET, 
                        region_name='us-east-1')
AWS Kinesis ve Lambda ile Akış Verileri

Akışı oluşturun!

res = firehose.create_delivery_stream(

DeliveryStreamName = "gps-delivery-stream",
DeliveryStreamType = "DirectPut",
S3DestinationConfiguration = {
"RoleARN": "arn:aws:iam::0000000:role/firehoseDeliveryRole",
"BucketARN": "arn:aws:s3:::sd-vehicle-data"
}
)
AWS Kinesis ve Lambda ile Akış Verileri

Akış oluşturma yanıtı

print(res['DeliveryStreamARN'])
# Yeni akışın ARN’i
"arn:aws:firehose:us-east-1:0000000:deliverystream/gps-delivery-stream"
AWS Kinesis ve Lambda ile Akış Verileri

Akış hazır

z0Fp4.png

AWS Kinesis ve Lambda ile Akış Verileri

Akışa yazma

La0zZ.png

AWS Kinesis ve Lambda ile Akış Verileri

Telematik donanımı

OGHUHC.png

AWS Kinesis ve Lambda ile Akış Verileri

Telematik veri gönderimi

wCJz8.png

AWS Kinesis ve Lambda ile Akış Verileri

Tek kayıt

{

'record_id': '939ed1d1-1740-420c-8906-445278573c7f', # <-- Benzersiz kayıt kimliği
'timestamp': '4:25:06.000', # <-- ölçüm zamanı
'vin': '4FTEX4944AK844294', # <-- araç kimliği
'lon': 106.9447146, # <-- araç boylamı
'lat': -6.3385652, # <-- araç enlemi
'speed': 25 # <-- araç hızı
}
AWS Kinesis ve Lambda ile Akış Verileri

Kayıtlar geliyor

vRg6A.png

AWS Kinesis ve Lambda ile Akış Verileri

Başka bir kullanım durumu

gu6oT.png

AWS Kinesis ve Lambda ile Akış Verileri

Başka bir kullanım durumu

XFD7E.png

AWS Kinesis ve Lambda ile Akış Verileri

Örüntüler

h7hUU.png

AWS Kinesis ve Lambda ile Akış Verileri

Kayıt gönderme

res = firehose.put_record(

DeliveryStreamName='gps-delivery-stream',
Record = { 'Data': payload }
)
AWS Kinesis ve Lambda ile Akış Verileri

Kayıt gönderme

    Record = {
        'Data': payload 
    }
AWS Kinesis ve Lambda ile Akış Verileri

Kayıt gönderme

Kaydımızın görünümü
record = {
 'record_id': '939ed1d1-1740-420c-8906-445278573c7f',
 'timestamp': '4:25:06.000','vin': '4FTEX4944AK844294',
 'lon': 106.9447146,'lat': -6.338565200000001,
 'speed': 25}
Göndermek istediğimiz (tek satır)
"939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 
4FTEX4944AK844294 106.9447146 -6.338565200000001 25"
AWS Kinesis ve Lambda ile Akış Verileri

Kayıt gönderme

payload = " ".join(
    str(value) for value in record.values()
)
print(payload)
"939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 
4FTEX4944AK844294 106.9447146 -6.338565200000001 25"
AWS Kinesis ve Lambda ile Akış Verileri

Birleştirme

record = {
 'record_id': '939ed1d1-1740-420c-8906-445278573c7f',
 'timestamp': '4:25:06.000','vin': '4FTEX4944AK844294',
 'lon': 106.9447146,'lat': -6.338565200000001, 'speed': 25}

payload = " ".join( str(value) for value in record.values() )
#"939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.338565200000001 25"
AWS Kinesis ve Lambda ile Akış Verileri

Birleştirme

res = firehose.put_record(
    DeliveryStreamName='gps-delivery-stream',
    Record = {
        'Data': payload + "\n" #<-- Satır sonu!
    }
)
AWS Kinesis ve Lambda ile Akış Verileri

Oluşturulan dosyalar

McYNJ.png

AWS Kinesis ve Lambda ile Akış Verileri

Örnek veriler

939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.338565200000001 25
f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.58068100000001 34.79925 37
ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.39239199999999 36.097577 90
bc75da5f-1bf6-444c-80ad-49c180e1b8de 23:16:06.000 3FTEX1G5XAK844393 -76.6990172 2.481207 40
7bdcf779-444e-4313-83da-140461933aeb 22:28:44.000 5FTEX1MAXAK844295 -47.0145295 -21.4649238 40
AWS Kinesis ve Lambda ile Akış Verileri

Oluşturulan dosyalar

X0px9.png

AWS Kinesis ve Lambda ile Akış Verileri

S3 istemcisi oluşturun

# boto3 S3 istemcisi oluşturun.
s3 = boto3.client('s3', 
                  aws_access_key_id=AWS_KEY_ID, 
                  aws_secret_access_key=AWS_SECRET, 
                  region_name='us-east-1')
AWS Kinesis ve Lambda ile Akış Verileri

Veriyi DataFrame’e okuyun

# Nesneyi S3'ten alın
obj_data = s3.get_object(Bucket='sd-vehicle-data', Key=KEY_YOU_COPIED)
# Nesneyi DataFrame'e okuyun
vehicle_data = pd.read_csv(
    data['Body'], 
    delimiter = " ", 
    names=["record_id", "timestamp", "vin", "lon", "lat", "speed"]))
AWS Kinesis ve Lambda ile Akış Verileri

vehicle_data

record_id timestamp vin lon lat speed
0 939ed1d1... 4:25:06.000 4FTEX4944AK844294 106.945 -6.33857 25
1 f29a5b3d... 8:10:47.000 3FTEX1G5XAK844393 108.581 34.7993 37
2 ff8e7131... 20:26:44.000 2LAXX1C8XAK844292 114.392 36.0976 90
3 bc75da5f... 23:16:06.000 3FTEX1G5XAK844393 -76.699 2.48121 40
4 7bdcf779... 22:28:44.000 5FTEX1MAXAK844295 -47.0145 -21.4649 40
AWS Kinesis ve Lambda ile Akış Verileri

Gözden geçirme

qUq0E.png

AWS Kinesis ve Lambda ile Akış Verileri

Gözden geçirme

res = firehose.create_delivery_stream(

DeliveryStreamName = "gps-delivery-stream",
DeliveryStreamType = "DirectPut",
S3DestinationConfiguration = {
"RoleARN": "arn:aws:iam::0000000:role/firehoseDeliveryRole",
"BucketARN": "arn:aws:s3:::sd-vehicle-data",
}
)
AWS Kinesis ve Lambda ile Akış Verileri

Gözden geçirme

wCJz8.png

AWS Kinesis ve Lambda ile Akış Verileri

Gözden geçirme

AWS Kinesis ve Lambda ile Akış Verileri

Gözden geçirme

record_id timestamp vin lon lat speed
0 939ed1d1... 4:25:06.000 4FTEX4944AK844294 106.945 -6.33857 25
1 f29a5b3d... 8:10:47.000 3FTEX1G5XAK844393 108.581 34.7993 37
2 ff8e7131... 20:26:44.000 2LAXX1C8XAK844292 114.392 36.0976 90
3 bc75da5f... 23:16:06.000 3FTEX1G5XAK844393 -76.699 2.48121 40
4 7bdcf779... 22:28:44.000 5FTEX1MAXAK844295 -47.0145 -21.4649 40
AWS Kinesis ve Lambda ile Akış Verileri

Hadi pratik yapalım!

AWS Kinesis ve Lambda ile Akış Verileri

Preparing Video For Download...