Working with the Firehose delivery stream

Datenstreaming mit AWS Kinesis und Lambda

Maksim Pecherskiy

Data Engineer

Ready to create stream

KF414O.png

Datenstreaming mit AWS Kinesis und Lambda

Ready to create stream

YgVbe.png

Datenstreaming mit AWS Kinesis und Lambda

Ready to create stream

BBYB4.png

Datenstreaming mit AWS Kinesis und Lambda

Ready to create stream

Ue73e.png

Datenstreaming mit AWS Kinesis und Lambda

Ready to create stream

qUq0E.png

Datenstreaming mit AWS Kinesis und Lambda

Get Role ARN

qH99c.png

Datenstreaming mit AWS Kinesis und Lambda

Initialize boto3 client

import boto3

firehose = boto3.client('firehose', 
                        aws_access_key_id=AWS_KEY_ID, 
                        aws_secret_access_key=AWS_SECRET, 
                        region_name='us-east-1')
Datenstreaming mit AWS Kinesis und Lambda

Create the stream!

res = firehose.create_delivery_stream(

DeliveryStreamName = "gps-delivery-stream",
DeliveryStreamType = "DirectPut",
S3DestinationConfiguration = {
"RoleARN": "arn:aws:iam::0000000:role/firehoseDeliveryRole",
"BucketARN": "arn:aws:s3:::sd-vehicle-data"
}
)
Datenstreaming mit AWS Kinesis und Lambda

Create stream response

print(res['DeliveryStreamARN'])
# New Stream's ARN
"arn:aws:firehose:us-east-1:0000000:deliverystream/gps-delivery-stream"
Datenstreaming mit AWS Kinesis und Lambda

Stream is ready

z0Fp4.png

Datenstreaming mit AWS Kinesis und Lambda

Writing to stream

La0zZ.png

Datenstreaming mit AWS Kinesis und Lambda

Telematics hardware

OGHUHC.png

Datenstreaming mit AWS Kinesis und Lambda

Telematics data send

wCJz8.png

Datenstreaming mit AWS Kinesis und Lambda

Single record

{

'record_id': '939ed1d1-1740-420c-8906-445278573c7f', # <-- Unique record id
'timestamp': '4:25:06.000', # <-- time of measurement
'vin': '4FTEX4944AK844294', # <-- vehicle id
'lon': 106.9447146, # <-- vehicle location longitude
'lat': -6.3385652, # <-- vehicle location latitude
'speed': 25 # <-- vehicle speed
}
Datenstreaming mit AWS Kinesis und Lambda

Records coming in

vRg6A.png

Datenstreaming mit AWS Kinesis und Lambda

Another use case

gu6oT.png

Datenstreaming mit AWS Kinesis und Lambda

Another use case

XFD7E.png

Datenstreaming mit AWS Kinesis und Lambda

Patterns

h7hUU.png

Datenstreaming mit AWS Kinesis und Lambda

Sending a record

res = firehose.put_record(

DeliveryStreamName='gps-delivery-stream',
Record = { 'Data': payload }
)
Datenstreaming mit AWS Kinesis und Lambda

Sending a record

    Record = {
        'Data': payload 
    }
Datenstreaming mit AWS Kinesis und Lambda

Sending a record

What our Record Looks Like
record = {
 'record_id': '939ed1d1-1740-420c-8906-445278573c7f',
 'timestamp': '4:25:06.000','vin': '4FTEX4944AK844294',
 'lon': 106.9447146,'lat': -6.338565200000001,
 'speed': 25}
What we want to send (one string)
"939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 
4FTEX4944AK844294 106.9447146 -6.338565200000001 25"
Datenstreaming mit AWS Kinesis und Lambda

Sending a record

payload = " ".join(
    str(value) for value in record.values()
)
print(payload)
"939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 
4FTEX4944AK844294 106.9447146 -6.338565200000001 25"
Datenstreaming mit AWS Kinesis und Lambda

Putting it together

record = {
 'record_id': '939ed1d1-1740-420c-8906-445278573c7f',
 'timestamp': '4:25:06.000','vin': '4FTEX4944AK844294',
 'lon': 106.9447146,'lat': -6.338565200000001, 'speed': 25}

payload = " ".join( str(value) for value in record.values() )
#"939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.338565200000001 25"
Datenstreaming mit AWS Kinesis und Lambda

Putting it together

res = firehose.put_record(
    DeliveryStreamName='gps-delivery-stream',
    Record = {
        'Data': payload + "\n" #<-- Line break!
    }
)
Datenstreaming mit AWS Kinesis und Lambda

Created files

McYNJ.png

Datenstreaming mit AWS Kinesis und Lambda

Sample data

939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.338565200000001 25
f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.58068100000001 34.79925 37
ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.39239199999999 36.097577 90
bc75da5f-1bf6-444c-80ad-49c180e1b8de 23:16:06.000 3FTEX1G5XAK844393 -76.6990172 2.481207 40
7bdcf779-444e-4313-83da-140461933aeb 22:28:44.000 5FTEX1MAXAK844295 -47.0145295 -21.4649238 40
Datenstreaming mit AWS Kinesis und Lambda

Created files

X0px9.png

Datenstreaming mit AWS Kinesis und Lambda

Create S3 client

# Create boto3 S3 client.
s3 = boto3.client('s3', 
                  aws_access_key_id=AWS_KEY_ID, 
                  aws_secret_access_key=AWS_SECRET, 
                  region_name='us-east-1')
Datenstreaming mit AWS Kinesis und Lambda

Read data into DataFrame

# Get the object from S3
obj_data = s3.get_object(Bucket='sd-vehicle-data', Key=KEY_YOU_COPIED)
# Read read the object into a DataFrame
vehicle_data = pd.read_csv(
    data['Body'], 
    delimiter = " ", 
    names=["record_id", "timestamp", "vin", "lon", "lat", "speed"]))
Datenstreaming mit AWS Kinesis und Lambda

vehicle_data

record_id timestamp vin lon lat speed
0 939ed1d1... 4:25:06.000 4FTEX4944AK844294 106.945 -6.33857 25
1 f29a5b3d... 8:10:47.000 3FTEX1G5XAK844393 108.581 34.7993 37
2 ff8e7131... 20:26:44.000 2LAXX1C8XAK844292 114.392 36.0976 90
3 bc75da5f... 23:16:06.000 3FTEX1G5XAK844393 -76.699 2.48121 40
4 7bdcf779... 22:28:44.000 5FTEX1MAXAK844295 -47.0145 -21.4649 40
Datenstreaming mit AWS Kinesis und Lambda

Review

qUq0E.png

Datenstreaming mit AWS Kinesis und Lambda

Review

res = firehose.create_delivery_stream(

DeliveryStreamName = "gps-delivery-stream",
DeliveryStreamType = "DirectPut",
S3DestinationConfiguration = {
"RoleARN": "arn:aws:iam::0000000:role/firehoseDeliveryRole",
"BucketARN": "arn:aws:s3:::sd-vehicle-data",
}
)
Datenstreaming mit AWS Kinesis und Lambda

Review

wCJz8.png

Datenstreaming mit AWS Kinesis und Lambda

Review

Datenstreaming mit AWS Kinesis und Lambda

Review

record_id timestamp vin lon lat speed
0 939ed1d1... 4:25:06.000 4FTEX4944AK844294 106.945 -6.33857 25
1 f29a5b3d... 8:10:47.000 3FTEX1G5XAK844393 108.581 34.7993 37
2 ff8e7131... 20:26:44.000 2LAXX1C8XAK844292 114.392 36.0976 90
3 bc75da5f... 23:16:06.000 3FTEX1G5XAK844393 -76.699 2.48121 40
4 7bdcf779... 22:28:44.000 5FTEX1MAXAK844295 -47.0145 -21.4649 40
Datenstreaming mit AWS Kinesis und Lambda

Let's practice!

Datenstreaming mit AWS Kinesis und Lambda

Preparing Video For Download...