Streaming Data with AWS Kinesis and Lambda
Maksim Pecherskiy
Data Engineer
import boto3

# Firehose client used by the rest of the walkthrough. AWS_KEY_ID and
# AWS_SECRET must already be defined before this runs.
firehose = boto3.client(
    'firehose',
    region_name='us-east-1',
    aws_access_key_id=AWS_KEY_ID,
    aws_secret_access_key=AWS_SECRET,
)

# Create a DirectPut delivery stream: producers push records to it with
# put_record, and Firehose writes them into the sd-vehicle-data bucket using
# the firehoseDeliveryRole IAM role.
res = firehose.create_delivery_stream(
    DeliveryStreamName="gps-delivery-stream",
    DeliveryStreamType="DirectPut",
    S3DestinationConfiguration=dict(
        RoleARN="arn:aws:iam::0000000:role/firehoseDeliveryRole",
        BucketARN="arn:aws:s3:::sd-vehicle-data",
    ),
)
print(res['DeliveryStreamARN'])
# New stream's ARN, e.g.:
# "arn:aws:firehose:us-east-1:0000000:deliverystream/gps-delivery-stream"
# Example of a single GPS record as a Python dict. This literal is only
# illustrative: it is evaluated and discarded, not bound to any name.
{
'record_id': '939ed1d1-1740-420c-8906-445278573c7f', # <-- Unique record id
'timestamp': '4:25:06.000', # <-- time of measurement (time of day as H:MM:SS.mmm; no date part)
'vin': '4FTEX4944AK844294', # <-- vehicle id
'lon': 106.9447146, # <-- vehicle location longitude
'lat': -6.3385652, # <-- vehicle location latitude
'speed': 25 # <-- vehicle speed (units not stated here)
}
# First version of sending one record to the delivery stream.
# NOTE(review): `payload` is not defined yet at this point in the script; it
# is first built further down (" ".join over the record's values), so running
# this line as-is raises NameError. Run it only after payload exists.
# This version also sends no trailing newline. Records presumably run together
# in the delivered S3 object without it; the later put_record call appends "\n"
# for that reason.
res = firehose.put_record(
DeliveryStreamName='gps-delivery-stream',
Record = { 'Data': payload }
)
# Shape of the Record argument: a dict whose 'Data' key holds the record text.
Record = {
'Data': payload
}
# Sample GPS record to be sent to the delivery stream.
record = {
    'record_id': '939ed1d1-1740-420c-8906-445278573c7f',
    'timestamp': '4:25:06.000',
    'vin': '4FTEX4944AK844294',
    'lon': 106.9447146,
    'lat': -6.338565200000001,
    'speed': 25,
}
# Target payload:
# "939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.338565200000001 25"

# Flatten the record into one space-delimited line. Field order follows the
# dict's insertion order, which is the order the column names are given in
# when the data is read back from S3.
# NOTE(review): a space delimiter only works while no value contains a space.
payload = " ".join(str(value) for value in record.values())
print(payload)
# Output:
# "939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.338565200000001 25"
# Sample GPS record, flattened to a single space-delimited line.
record = {
    'record_id': '939ed1d1-1740-420c-8906-445278573c7f',
    'timestamp': '4:25:06.000',
    'vin': '4FTEX4944AK844294',
    'lon': 106.9447146,
    'lat': -6.338565200000001,
    'speed': 25,
}
payload = " ".join(map(str, record.values()))
# -> "939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.338565200000001 25"

# Terminate the record with a newline so that records sent one after another
# end up on separate lines in the delivered S3 object.
res = firehose.put_record(
    DeliveryStreamName='gps-delivery-stream',
    Record={'Data': f"{payload}\n"},
)
939ed1d1-1740-420c-8906-445278573c7f 4:25:06.000 4FTEX4944AK844294 106.9447146 -6.338565200000001 25
f29a5b3d-d0fa-43c0-9e1a-e2a5cdb8be7a 8:10:47.000 3FTEX1G5XAK844393 108.58068100000001 34.79925 37
ff8e7131-408d-463b-8d07-d016419b0656 20:26:44.000 2LAXX1C8XAK844292 114.39239199999999 36.097577 90
bc75da5f-1bf6-444c-80ad-49c180e1b8de 23:16:06.000 3FTEX1G5XAK844393 -76.6990172 2.481207 40
7bdcf779-444e-4313-83da-140461933aeb 22:28:44.000 5FTEX1MAXAK844295 -47.0145295 -21.4649238 40
import pandas as pd  # needed for read_csv below

# Create boto3 S3 client.
s3 = boto3.client('s3',
    aws_access_key_id=AWS_KEY_ID,
    aws_secret_access_key=AWS_SECRET,
    region_name='us-east-1')
# Get the delivered object from the bucket the delivery stream writes to.
# KEY_YOU_COPIED must hold the key of a delivered object (presumably copied
# from the S3 console) -- it is a placeholder, not defined in this snippet.
obj_data = s3.get_object(Bucket='sd-vehicle-data', Key=KEY_YOU_COPIED)
# Read the object body into a DataFrame. Records are space-delimited with no
# header row, so the column names are supplied explicitly, in the same order
# the producer joined the fields.
vehicle_data = pd.read_csv(
    obj_data['Body'],
    delimiter=" ",
    names=["record_id", "timestamp", "vin", "lon", "lat", "speed"],
)
|   | record_id | timestamp | vin | lon | lat | speed |
|---|---|---|---|---|---|---|
| 0 | 939ed1d1... | 4:25:06.000 | 4FTEX4944AK844294 | 106.945 | -6.33857 | 25 |
| 1 | f29a5b3d... | 8:10:47.000 | 3FTEX1G5XAK844393 | 108.581 | 34.7993 | 37 |
| 2 | ff8e7131... | 20:26:44.000 | 2LAXX1C8XAK844292 | 114.392 | 36.0976 | 90 |
| 3 | bc75da5f... | 23:16:06.000 | 3FTEX1G5XAK844393 | -76.699 | 2.48121 | 40 |
| 4 | 7bdcf779... | 22:28:44.000 | 5FTEX1MAXAK844295 | -47.0145 | -21.4649 | 40 |
# Recap of the delivery-stream creation call: a DirectPut stream that
# delivers into the sd-vehicle-data bucket via the firehoseDeliveryRole role.
# NOTE(review): if this runs in the same session as an earlier
# create_delivery_stream call, the name "gps-delivery-stream" is already taken
# and AWS will likely reject the request (ResourceInUseException). Guard or
# skip it when running this file top-to-bottom as a script.
destination = {
    "RoleARN": "arn:aws:iam::0000000:role/firehoseDeliveryRole",
    "BucketARN": "arn:aws:s3:::sd-vehicle-data",
}
res = firehose.create_delivery_stream(
    DeliveryStreamName="gps-delivery-stream",
    DeliveryStreamType="DirectPut",
    S3DestinationConfiguration=destination,
)
|   | record_id | timestamp | vin | lon | lat | speed |
|---|---|---|---|---|---|---|
| 0 | 939ed1d1... | 4:25:06.000 | 4FTEX4944AK844294 | 106.945 | -6.33857 | 25 |
| 1 | f29a5b3d... | 8:10:47.000 | 3FTEX1G5XAK844393 | 108.581 | 34.7993 | 37 |
| 2 | ff8e7131... | 20:26:44.000 | 2LAXX1C8XAK844292 | 114.392 | 36.0976 | 90 |
| 3 | bc75da5f... | 23:16:06.000 | 3FTEX1G5XAK844393 | -76.699 | 2.48121 | 40 |
| 4 | 7bdcf779... | 22:28:44.000 | 5FTEX1MAXAK844295 | -47.0145 | -21.4649 | 40 |
Streaming Data with AWS Kinesis and Lambda