Working with the Firehose delivery stream

Streaming Data with AWS Kinesis and Lambda

Maksim Pecherskiy

Data Engineer

Ready to create stream

qUq0E.png

Get Role ARN

qH99c.png

Initialize boto3 client

import boto3

firehose = boto3.client('firehose', 
                        aws_access_key_id=AWS_KEY_ID, 
                        aws_secret_access_key=AWS_SECRET, 
                        region_name='us-east-1')

Create the stream!

res = firehose.create_delivery_stream(
    DeliveryStreamName="gps-delivery-stream",
    DeliveryStreamType="DirectPut",
    S3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::0000000:role/firehoseDeliveryRole",
        "BucketARN": "arn:aws:s3:::sd-vehicle-data"
    }
)

Create stream response

print(res['DeliveryStreamARN'])
# New stream's ARN:
# arn:aws:firehose:us-east-1:0000000:deliverystream/gps-delivery-stream

Stream is ready

z0Fp4.png
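
A delivery stream takes a short while to move from CREATING to ACTIVE after `create_delivery_stream` returns. The following is a minimal sketch of polling for that state with `describe_delivery_stream`; the helper name `wait_until_active` and its `delay`, `max_attempts`, and `sleep` parameters are assumptions made for illustration, not part of the course code.

```python
import time


def wait_until_active(firehose, stream_name, delay=5, max_attempts=24,
                      sleep=time.sleep):
    """Poll describe_delivery_stream until the stream reports ACTIVE.

    Hypothetical helper: returns True once the stream is ACTIVE and
    False if it is still CREATING after max_attempts polls.
    """
    for _ in range(max_attempts):
        desc = firehose.describe_delivery_stream(
            DeliveryStreamName=stream_name
        )
        status = desc["DeliveryStreamDescription"]["DeliveryStreamStatus"]
        if status == "ACTIVE":
            return True
        if status != "CREATING":
            # For example CREATING_FAILED or DELETING: waiting will not help.
            raise RuntimeError(f"{stream_name} is in state {status}")
        sleep(delay)
    return False
```

With the client created earlier, `wait_until_active(firehose, "gps-delivery-stream")` would block until the stream can accept records or the attempts run out.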

Writing to stream

La0zZ.png

Telematics hardware

OGHUHC.png

Telematics data send

wCJz8.png

Single record

{
    'record_id': '939ed1d1-1740-420c-8906-445278573c7f',  # <-- unique record id
    'timestamp': '4:25:06.000',                           # <-- time of measurement
    'vin': '4FTEX4944AK844294',                           # <-- vehicle id
    'lon': 106.9447146,                                   # <-- vehicle location longitude
    'lat': -6.3385652,                                    # <-- vehicle location latitude
    'speed': 25                                           # <-- vehicle speed
}
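
As an illustration of how such a record might be assembled on the sending side, here is a small sketch. The `make_record` helper is hypothetical, and generating `record_id` with `uuid.uuid4()` is an assumption; the slides do not say how the telematics hardware produces its ids.

```python
import uuid


def make_record(timestamp, vin, lon, lat, speed):
    """Build one telematics record with the six fields shown above.

    Hypothetical helper: the unique id is generated here with uuid4,
    which is an assumption about how ids are produced.
    """
    return {
        "record_id": str(uuid.uuid4()),  # unique record id
        "timestamp": timestamp,          # time of measurement
        "vin": vin,                      # vehicle id
        "lon": lon,                      # longitude
        "lat": lat,                      # latitude
        "speed": speed,                  # vehicle speed
    }


record = make_record("4:25:06.000", "4FTEX4944AK844294",
                     106.9447146, -6.3385652, 25)
```

Because Python dictionaries preserve insertion order, the keys come out in the same order as the fields listed on the slide.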

Records coming in

vRg6A.png

Another use case

gu6oT.png

Another use case

XFD7E.png

Patterns

h7hUU.png

Sending a record

res = firehose.put_record(
    DeliveryStreamName='gps-delivery-stream',
    Record={'Data': payload}
)

Sending a record

    Record = {
        'Data': payload 
    }

Sending a record

What our record looks like:
record = {
    'record_id': '939ed1d1-1740-420c-8906-445278573c7f',
    'timestamp': '4:25:06.000',
    'vin': '4FTEX4944AK844294',
    'lon': 106.9447146,
    'lat': -6.338565200000001,
    'speed': 25
}
What we want to send (one string):
"939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.338565200000001 25"

Sending a record

payload = " ".join(
    str(value) for value in record.values()
)
print(payload)
# 939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.338565200000001 25
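
Joining `record.values()` relies on the dictionary having been built in the intended column order. A slightly more defensive variant, sketched here under the assumption that the six field names never change, names the columns explicitly; `FIELDS` and `to_payload` are hypothetical names introduced for this example.

```python
# Column order for the space-delimited payload (assumed fixed).
FIELDS = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]


def to_payload(record, fields=FIELDS):
    """Serialize a record to one space-delimited string in a fixed order."""
    return " ".join(str(record[name]) for name in fields)


# The keys are deliberately out of order to show that the output is stable.
record = {
    "speed": 25,
    "lat": -6.338565200000001,
    "lon": 106.9447146,
    "vin": "4FTEX4944AK844294",
    "timestamp": "4:25:06.000",
    "record_id": "939ed1d1-1740-420c-8906-445278573c7f",
}

payload = to_payload(record)
print(payload)
```

This only works while no value contains a space, since the space is also the delimiter.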

Putting it together

record = {
 'record_id': '939ed1d1-1740-420c-8906-445278573c7f',
 'timestamp': '4:25:06.000','vin': '4FTEX4944AK844294',
 'lon': 106.9447146,'lat': -6.338565200000001, 'speed': 25}

payload = " ".join( str(value) for value in record.values() )
#"939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.338565200000001 25"

Putting it together

res = firehose.put_record(
    DeliveryStreamName='gps-delivery-stream',
    Record = {
        'Data': payload + "\n" #<-- Line break!
    }
)
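
The two steps, building the payload and calling `put_record`, can be wrapped in one function. This is only a sketch: `send_record` is a hypothetical helper, and encoding the payload to UTF-8 bytes before sending is a choice made here rather than something the slides require.

```python
def send_record(firehose, stream_name, record):
    """Serialize one record and write it to a Firehose delivery stream.

    Hypothetical helper. The trailing newline keeps each record on its
    own line in the files Firehose writes to S3.
    """
    payload = " ".join(str(value) for value in record.values()) + "\n"
    return firehose.put_record(
        DeliveryStreamName=stream_name,
        Record={"Data": payload.encode("utf-8")},
    )
```

A call would then look like `send_record(firehose, "gps-delivery-stream", record)`, with `firehose` being the client created earlier.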

Created files

McYNJ.png

Sample data

939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.338565200000001 25
f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.58068100000001 34.79925 37
ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.39239199999999 36.097577 90
bc75da5f-1bf6-444c-80ad-49c180e1b8de 23:16:06.000 3FTEX1G5XAK844393 -76.6990172 2.481207 40
7bdcf779-444e-4313-83da-140461933aeb 22:28:44.000 5FTEX1MAXAK844295 -47.0145295 -21.4649238 40
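
To see why the space delimiter and the trailing line break matter, the stored lines can be split back into records. The sketch below uses only the standard library; `FIELDS` and `parse_line` are hypothetical names, and converting `lon`, `lat`, and `speed` to numbers is an assumption about the types wanted downstream.

```python
# Column order of each stored line (assumed to match the sender).
FIELDS = ["record_id", "timestamp", "vin", "lon", "lat", "speed"]


def parse_line(line):
    """Turn one space-delimited line back into a record dictionary."""
    record = dict(zip(FIELDS, line.split()))
    record["lon"] = float(record["lon"])
    record["lat"] = float(record["lat"])
    record["speed"] = int(record["speed"])
    return record


# Two of the sample lines above, as they would appear in the S3 object.
sample = (
    "939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 "
    "4FTEX4944AK844294 106.9447146 -6.338565200000001 25\n"
    "f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 "
    "3FTEX1G5XAK844393 108.58068100000001 34.79925 37\n"
)

records = [parse_line(line) for line in sample.splitlines() if line]
```

Without the line break added before sending, both records would land on a single line and could not be separated this way.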

Created files

X0px9.png

Create S3 client

# Create boto3 S3 client.
s3 = boto3.client('s3', 
                  aws_access_key_id=AWS_KEY_ID, 
                  aws_secret_access_key=AWS_SECRET, 
                  region_name='us-east-1')
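
Instead of copying an object key from the console, the key could be looked up with the S3 client. This is a rough sketch using `list_objects_v2`; the helper name `latest_key` and the idea of picking the most recently modified object are assumptions, and the sketch reads only the first page of results (up to 1,000 keys).

```python
def latest_key(s3, bucket, prefix=""):
    """Return the key of the most recently modified object, or None.

    Hypothetical helper: looks only at the first page returned by
    list_objects_v2, which is enough for a small bucket.
    """
    resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    objects = resp.get("Contents", [])
    if not objects:
        return None
    newest = max(objects, key=lambda obj: obj["LastModified"])
    return newest["Key"]
```

With the client above, `latest_key(s3, "sd-vehicle-data")` would return a key that can be passed to `get_object`.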

Read data into DataFrame

import pandas as pd

# Get the object from S3
obj_data = s3.get_object(Bucket='sd-vehicle-data', Key=KEY_YOU_COPIED)
# Read the object into a DataFrame
vehicle_data = pd.read_csv(
    obj_data['Body'],
    delimiter=" ",
    names=["record_id", "timestamp", "vin", "lon", "lat", "speed"])

vehicle_data

   record_id    timestamp     vin                lon       lat       speed
0  939ed1d1...  4:25:06.000   4FTEX4944AK844294  106.945   -6.33857  25
1  f29a5b3d...  8:10:47.000   3FTEX1G5XAK844393  108.581   34.7993   37
2  ff8e7131...  20:26:44.000  2LAXX1C8XAK844292  114.392   36.0976   90
3  bc75da5f...  23:16:06.000  3FTEX1G5XAK844393  -76.699   2.48121   40
4  7bdcf779...  22:28:44.000  5FTEX1MAXAK844295  -47.0145  -21.4649  40

Review

qUq0E.png

Review

res = firehose.create_delivery_stream(
    DeliveryStreamName="gps-delivery-stream",
    DeliveryStreamType="DirectPut",
    S3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::0000000:role/firehoseDeliveryRole",
        "BucketARN": "arn:aws:s3:::sd-vehicle-data",
    }
)

Review

wCJz8.png

Review

   record_id    timestamp     vin                lon       lat       speed
0  939ed1d1...  4:25:06.000   4FTEX4944AK844294  106.945   -6.33857  25
1  f29a5b3d...  8:10:47.000   3FTEX1G5XAK844393  108.581   34.7993   37
2  ff8e7131...  20:26:44.000  2LAXX1C8XAK844292  114.392   36.0976   90
3  bc75da5f...  23:16:06.000  3FTEX1G5XAK844393  -76.699   2.48121   40
4  7bdcf779...  22:28:44.000  5FTEX1MAXAK844295  -47.0145  -21.4649  40

Let's practice!
