Working with the Firehose delivery stream

Streaming Data with AWS Kinesis and Lambda

Maksim Pecherskiy

Data Engineer

Ready to create stream

KF414O.png

Streaming Data with AWS Kinesis and Lambda

Ready to create stream

YgVbe.png

Streaming Data with AWS Kinesis and Lambda

Ready to create stream

BBYB4.png

Streaming Data with AWS Kinesis and Lambda

Ready to create stream

Ue73e.png

Streaming Data with AWS Kinesis and Lambda

Ready to create stream

qUq0E.png

Streaming Data with AWS Kinesis and Lambda

Get Role ARN

qH99c.png

Streaming Data with AWS Kinesis and Lambda

Initialize boto3 client

import boto3

firehose = boto3.client('firehose', 
                        aws_access_key_id=AWS_KEY_ID, 
                        aws_secret_access_key=AWS_SECRET, 
                        region_name='us-east-1')
Streaming Data with AWS Kinesis and Lambda

Create the stream!

res = firehose.create_delivery_stream(

DeliveryStreamName = "gps-delivery-stream",
DeliveryStreamType = "DirectPut",
S3DestinationConfiguration = {
"RoleARN": "arn:aws:iam::0000000:role/firehoseDeliveryRole",
"BucketARN": "arn:aws:s3:::sd-vehicle-data"
}
)
Streaming Data with AWS Kinesis and Lambda

Create stream response

print(res['DeliveryStreamARN'])
# New Stream's ARN
"arn:aws:firehose:us-east-1:0000000:deliverystream/gps-delivery-stream"
Streaming Data with AWS Kinesis and Lambda

Stream is ready

z0Fp4.png

Streaming Data with AWS Kinesis and Lambda

Writing to stream

La0zZ.png

Streaming Data with AWS Kinesis and Lambda

Telematics hardware

OGHUHC.png

Streaming Data with AWS Kinesis and Lambda

Telematics data send

wCJz8.png

Streaming Data with AWS Kinesis and Lambda

Single record

{

'record_id': '939ed1d1-1740-420c-8906-445278573c7f', # <-- Unique record id
'timestamp': '4:25:06.000', # <-- time of measurement
'vin': '4FTEX4944AK844294', # <-- vehicle id
'lon': 106.9447146, # <-- vehicle location longitude
'lat': -6.3385652, # <-- vehicle location latitude
'speed': 25 # <-- vehicle speed
}
Streaming Data with AWS Kinesis and Lambda

Records coming in

vRg6A.png

Streaming Data with AWS Kinesis and Lambda

Another use case

gu6oT.png

Streaming Data with AWS Kinesis and Lambda

Another use case

XFD7E.png

Streaming Data with AWS Kinesis and Lambda

Patterns

h7hUU.png

Streaming Data with AWS Kinesis and Lambda

Sending a record

res = firehose.put_record(

DeliveryStreamName='gps-delivery-stream',
Record = { 'Data': payload }
)
Streaming Data with AWS Kinesis and Lambda

Sending a record

    Record = {
        'Data': payload 
    }
Streaming Data with AWS Kinesis and Lambda

Sending a record

What our Record Looks Like
record = {
 'record_id': '939ed1d1-1740-420c-8906-445278573c7f',
 'timestamp': '4:25:06.000','vin': '4FTEX4944AK844294',
 'lon': 106.9447146,'lat': -6.338565200000001,
 'speed': 25}
What we want to send (one string)
"939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 
4FTEX4944AK844294 106.9447146 -6.338565200000001 25"
Streaming Data with AWS Kinesis and Lambda

Sending a record

payload = " ".join(
    str(value) for value in record.values()
)
print(payload)
"939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 
4FTEX4944AK844294 106.9447146 -6.338565200000001 25"
Streaming Data with AWS Kinesis and Lambda

Putting it together

record = {
 'record_id': '939ed1d1-1740-420c-8906-445278573c7f',
 'timestamp': '4:25:06.000','vin': '4FTEX4944AK844294',
 'lon': 106.9447146,'lat': -6.338565200000001, 'speed': 25}

payload = " ".join( str(value) for value in record.values() )
#"939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.338565200000001 25"
Streaming Data with AWS Kinesis and Lambda

Putting it together

res = firehose.put_record(
    DeliveryStreamName='gps-delivery-stream',
    Record = {
        'Data': payload + "\n" #<-- Line break!
    }
)
Streaming Data with AWS Kinesis and Lambda

Created files

McYNJ.png

Streaming Data with AWS Kinesis and Lambda

Sample data

939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.338565200000001 25
f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.58068100000001 34.79925 37
ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.39239199999999 36.097577 90
bc75da5f-1bf6-444c-80ad-49c180e1b8de 23:16:06.000 3FTEX1G5XAK844393 -76.6990172 2.481207 40
7bdcf779-444e-4313-83da-140461933aeb 22:28:44.000 5FTEX1MAXAK844295 -47.0145295 -21.4649238 40
Streaming Data with AWS Kinesis and Lambda

Created files

X0px9.png

Streaming Data with AWS Kinesis and Lambda

Create S3 client

# Create boto3 S3 client.
s3 = boto3.client('s3', 
                  aws_access_key_id=AWS_KEY_ID, 
                  aws_secret_access_key=AWS_SECRET, 
                  region_name='us-east-1')
Streaming Data with AWS Kinesis and Lambda

Read data into DataFrame

# Get the object from S3
obj_data = s3.get_object(Bucket='sd-vehicle-data', Key=KEY_YOU_COPIED)
# Read read the object into a DataFrame
vehicle_data = pd.read_csv(
    data['Body'], 
    delimiter = " ", 
    names=["record_id", "timestamp", "vin", "lon", "lat", "speed"]))
Streaming Data with AWS Kinesis and Lambda

vehicle_data

record_id timestamp vin lon lat speed
0 939ed1d1... 4:25:06.000 4FTEX4944AK844294 106.945 -6.33857 25
1 f29a5b3d... 8:10:47.000 3FTEX1G5XAK844393 108.581 34.7993 37
2 ff8e7131... 20:26:44.000 2LAXX1C8XAK844292 114.392 36.0976 90
3 bc75da5f... 23:16:06.000 3FTEX1G5XAK844393 -76.699 2.48121 40
4 7bdcf779... 22:28:44.000 5FTEX1MAXAK844295 -47.0145 -21.4649 40
Streaming Data with AWS Kinesis and Lambda

Review

qUq0E.png

Streaming Data with AWS Kinesis and Lambda

Review

res = firehose.create_delivery_stream(

DeliveryStreamName = "gps-delivery-stream",
DeliveryStreamType = "DirectPut",
S3DestinationConfiguration = {
"RoleARN": "arn:aws:iam::0000000:role/firehoseDeliveryRole",
"BucketARN": "arn:aws:s3:::sd-vehicle-data",
}
)
Streaming Data with AWS Kinesis and Lambda

Review

wCJz8.png

Streaming Data with AWS Kinesis and Lambda

Review

Streaming Data with AWS Kinesis and Lambda

Review

record_id timestamp vin lon lat speed
0 939ed1d1... 4:25:06.000 4FTEX4944AK844294 106.945 -6.33857 25
1 f29a5b3d... 8:10:47.000 3FTEX1G5XAK844393 108.581 34.7993 37
2 ff8e7131... 20:26:44.000 2LAXX1C8XAK844292 114.392 36.0976 90
3 bc75da5f... 23:16:06.000 3FTEX1G5XAK844393 -76.699 2.48121 40
4 7bdcf779... 22:28:44.000 5FTEX1MAXAK844295 -47.0145 -21.4649 40
Streaming Data with AWS Kinesis and Lambda

Let's practice!

Streaming Data with AWS Kinesis and Lambda

Preparing Video For Download...