Uso del delivery stream Firehose

Dati in streaming con AWS Kinesis e Lambda

Maksim Pecherskiy

Data Engineer

Pronto a creare lo stream

KF414O.png

Dati in streaming con AWS Kinesis e Lambda

Pronto a creare lo stream

YgVbe.png

Dati in streaming con AWS Kinesis e Lambda

Pronto a creare lo stream

BBYB4.png

Dati in streaming con AWS Kinesis e Lambda

Pronto a creare lo stream

Ue73e.png

Dati in streaming con AWS Kinesis e Lambda

Pronto a creare lo stream

qUq0E.png

Dati in streaming con AWS Kinesis e Lambda

Ottieni Role ARN

qH99c.png

Dati in streaming con AWS Kinesis e Lambda

Inizializza il client boto3

import boto3

firehose = boto3.client('firehose', 
                        aws_access_key_id=AWS_KEY_ID, 
                        aws_secret_access_key=AWS_SECRET, 
                        region_name='us-east-1')
Dati in streaming con AWS Kinesis e Lambda

Crea lo stream!

res = firehose.create_delivery_stream(

DeliveryStreamName = "gps-delivery-stream",
DeliveryStreamType = "DirectPut",
S3DestinationConfiguration = {
"RoleARN": "arn:aws:iam::0000000:role/firehoseDeliveryRole",
"BucketARN": "arn:aws:s3:::sd-vehicle-data"
}
)
Dati in streaming con AWS Kinesis e Lambda

Risposta creazione stream

print(res['DeliveryStreamARN'])
# ARN del nuovo stream
"arn:aws:firehose:us-east-1:0000000:deliverystream/gps-delivery-stream"
Dati in streaming con AWS Kinesis e Lambda

Stream pronto

z0Fp4.png

Dati in streaming con AWS Kinesis e Lambda

Scrittura nello stream

La0zZ.png

Dati in streaming con AWS Kinesis e Lambda

Hardware telematico

OGHUHC.png

Dati in streaming con AWS Kinesis e Lambda

Invio dati telematici

wCJz8.png

Dati in streaming con AWS Kinesis e Lambda

Record singolo

{

'record_id': '939ed1d1-1740-420c-8906-445278573c7f', # <-- ID record univoco
'timestamp': '4:25:06.000', # <-- ora della misura
'vin': '4FTEX4944AK844294', # <-- ID veicolo
'lon': 106.9447146, # <-- longitudine veicolo
'lat': -6.3385652, # <-- latitudine veicolo
'speed': 25 # <-- velocità veicolo
}
Dati in streaming con AWS Kinesis e Lambda

Arrivano i record

vRg6A.png

Dati in streaming con AWS Kinesis e Lambda

Altro caso d'uso

gu6oT.png

Dati in streaming con AWS Kinesis e Lambda

Altro caso d'uso

XFD7E.png

Dati in streaming con AWS Kinesis e Lambda

Pattern

h7hUU.png

Dati in streaming con AWS Kinesis e Lambda

Invio di un record

res = firehose.put_record(

DeliveryStreamName='gps-delivery-stream',
Record = { 'Data': payload }
)
Dati in streaming con AWS Kinesis e Lambda

Invio di un record

    Record = {
        'Data': payload 
    }
Dati in streaming con AWS Kinesis e Lambda

Invio di un record

Com'è il nostro record
record = {
 'record_id': '939ed1d1-1740-420c-8906-445278573c7f',
 'timestamp': '4:25:06.000','vin': '4FTEX4944AK844294',
 'lon': 106.9447146,'lat': -6.338565200000001,
 'speed': 25}
Cosa vogliamo inviare (una stringa)
"939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 
4FTEX4944AK844294 106.9447146 -6.338565200000001 25"
Dati in streaming con AWS Kinesis e Lambda

Invio di un record

payload = " ".join(
    str(value) for value in record.values()
)
print(payload)
"939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 
4FTEX4944AK844294 106.9447146 -6.338565200000001 25"
Dati in streaming con AWS Kinesis e Lambda

Mettiamo tutto insieme

record = {
 'record_id': '939ed1d1-1740-420c-8906-445278573c7f',
 'timestamp': '4:25:06.000','vin': '4FTEX4944AK844294',
 'lon': 106.9447146,'lat': -6.338565200000001, 'speed': 25}

payload = " ".join( str(value) for value in record.values() )
#"939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.338565200000001 25"
Dati in streaming con AWS Kinesis e Lambda

Mettiamo tutto insieme

res = firehose.put_record(
    DeliveryStreamName='gps-delivery-stream',
    Record = {
        'Data': payload + "\n" #<-- A capo!
    }
)
Dati in streaming con AWS Kinesis e Lambda

File creati

McYNJ.png

Dati in streaming con AWS Kinesis e Lambda

Dati di esempio

939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.338565200000001 25
f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.58068100000001 34.79925 37
ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.39239199999999 36.097577 90
bc75da5f-1bf6-444c-80ad-49c180e1b8de 23:16:06.000 3FTEX1G5XAK844393 -76.6990172 2.481207 40
7bdcf779-444e-4313-83da-140461933aeb 22:28:44.000 5FTEX1MAXAK844295 -47.0145295 -21.4649238 40
Dati in streaming con AWS Kinesis e Lambda

File creati

X0px9.png

Dati in streaming con AWS Kinesis e Lambda

Crea client S3

# Crea il client S3 boto3.
s3 = boto3.client('s3', 
                  aws_access_key_id=AWS_KEY_ID, 
                  aws_secret_access_key=AWS_SECRET, 
                  region_name='us-east-1')
Dati in streaming con AWS Kinesis e Lambda

Leggi i dati in un DataFrame

# Prendi l'oggetto da S3
obj_data = s3.get_object(Bucket='sd-vehicle-data', Key=KEY_YOU_COPIED)
# Leggi l'oggetto in un DataFrame
vehicle_data = pd.read_csv(
    data['Body'], 
    delimiter = " ", 
    names=["record_id", "timestamp", "vin", "lon", "lat", "speed"]))
Dati in streaming con AWS Kinesis e Lambda

vehicle_data

record_id timestamp vin lon lat speed
0 939ed1d1... 4:25:06.000 4FTEX4944AK844294 106.945 -6.33857 25
1 f29a5b3d... 8:10:47.000 3FTEX1G5XAK844393 108.581 34.7993 37
2 ff8e7131... 20:26:44.000 2LAXX1C8XAK844292 114.392 36.0976 90
3 bc75da5f... 23:16:06.000 3FTEX1G5XAK844393 -76.699 2.48121 40
4 7bdcf779... 22:28:44.000 5FTEX1MAXAK844295 -47.0145 -21.4649 40
Dati in streaming con AWS Kinesis e Lambda

Ripasso

qUq0E.png

Dati in streaming con AWS Kinesis e Lambda

Ripasso

res = firehose.create_delivery_stream(

DeliveryStreamName = "gps-delivery-stream",
DeliveryStreamType = "DirectPut",
S3DestinationConfiguration = {
"RoleARN": "arn:aws:iam::0000000:role/firehoseDeliveryRole",
"BucketARN": "arn:aws:s3:::sd-vehicle-data",
}
)
Dati in streaming con AWS Kinesis e Lambda

Ripasso

wCJz8.png

Dati in streaming con AWS Kinesis e Lambda

Ripasso

Dati in streaming con AWS Kinesis e Lambda

Ripasso

record_id timestamp vin lon lat speed
0 939ed1d1... 4:25:06.000 4FTEX4944AK844294 106.945 -6.33857 25
1 f29a5b3d... 8:10:47.000 3FTEX1G5XAK844393 108.581 34.7993 37
2 ff8e7131... 20:26:44.000 2LAXX1C8XAK844292 114.392 36.0976 90
3 bc75da5f... 23:16:06.000 3FTEX1G5XAK844393 -76.699 2.48121 40
4 7bdcf779... 22:28:44.000 5FTEX1MAXAK844295 -47.0145 -21.4649 40
Dati in streaming con AWS Kinesis e Lambda

Esercitiamoci!

Dati in streaming con AWS Kinesis e Lambda

Preparing Video For Download...