Parallel Programming with Dask in Python
James Fulton
Climate Informatics Researcher
import glob
video_filenames = glob.glob("*.mp4")
print(video_filenames)
['me_at_the_zoo.mp4', 'life_goes_on.mp4', 'guitar.mp4', 'hurt.mp4', ...]
import glob
video_filenames = glob.glob("*.mp4")
import dask.bag as db
filename_bag = db.from_sequence(video_filenames)
filename_bag.take(1)[0]
'me_at_the_zoo.mp4'
# Loads a single video
load_mp4("video.mp4")
{'video': array(
[[[ 51, 57, 37, ..., 227, 238, 168],
...,
[ 83, 125, 129, ..., 222, 148, 208]]]),
'audio': array([ 7. , 9. , 9.5, ..., -544.5, -551. , -558. ]),
'filename': 'video.mp4'}
data_bag = filename_bag.map(load_mp4)
data_bag.take(1)[0]
{'video': array(
[[[126, 162, 203, ..., 63, 58, 8],
...,
[ 58, 222, 170, ..., 234, 63, 81]]]),
'audio': array([-203.5, -209. , -207. , ..., -222.5, -233. , -248.5]),
'filename': 'me_at_the_zoo.mp4'}
data_bag = filename_bag.map(load_mp4)
# Create empty list
data_list = []
# Add delayed loaded files to list
for file in video_filenames:
    data_list.append(dask.delayed(load_mp4)(file))
# Convert list of delayed objects to dask bag
data_bag = db.from_delayed(data_list)
# Convert dask bag to list of delayed objects
data_list = data_bag.to_delayed()
transcribed_bag = data_bag.map(transcribe_audio)
transcribed_bag.take(1)[0]
{'video': array(
[[[126, 162, 203, ..., 63, 58, 8],
...,
[ 58, 222, 170, ..., 234, 63, 81]]]),
'audio': array([-203.5, -209. , -207. , ..., -222.5, -233. , -248.5]),
'filename': 'me_at_the_zoo.mp4',
'transcript': "All right, so here we are in front of the, uh, elephants ...",
}
# Apply custom function to remove videos with no spoken words
clean_bag = transcribed_bag.filter(transcript_is_not_blank)
# Apply sentiment analysis to transcripts
sentiment_bag = clean_bag.map(analyze_transcript_sentiment)
# Remove unwanted elements from bag
keys_to_drop = ['video', 'audio']
final_bag = sentiment_bag.map(filter_dictionary, keys_to_drop=keys_to_drop)
# Convert to Dask DataFrame
df = final_bag.to_dataframe()
df.compute()
              filename              transcript sentiment
0    me_at_the_zoo.mp4  All right, so here ...  positive
...                ...                     ...       ...
# Import scipy module for .wav files
from scipy.io import wavfile
# Load sampling frequency and audio array
sample_freq, audio = wavfile.read(filename)
# Samples per second
print(sample_freq)
44100
# The audio data
print(audio)
array([ 148, 142, 150, ..., -542, -546, -559], dtype=int16)
Parallel Programming with Dask in Python