Using any data in Dask bags

Parallel Programming with Dask in Python

James Fulton

Climate Informatics Researcher

Complex mixed data formats

A diagram showing a dataset which includes both video and sound.

Parallel Programming with Dask in Python

Creating a Dask bag

import glob

video_filenames = glob.glob("*.mp4")
print(video_filenames)
['me_at_the_zoo.mp4', 'life_goes_on.mp4', 'guitar.mp4', 'hurt.mp4', ...]
Parallel Programming with Dask in Python

Creating a Dask bag

import glob

video_filenames = glob.glob("*.mp4")
import dask.bag as db

filename_bag = db.from_sequence(video_filenames)
filename_bag.take(1)[0]
'me_at_the_zoo.mp4'
Parallel Programming with Dask in Python

Loading custom data

# Loads a single video
load_mp4("video.mp4")
{'video': array(
         [[[ 51,  57,  37, ..., 227, 238, 168],
         ...,
         [ 83, 125, 129, ..., 222, 148, 208]]]),
 'audio': array([   7. ,    9. ,    9.5, ..., -544.5, -551. , -558. ]),
 'filename': 'video.mp4'}
Parallel Programming with Dask in Python

Loading custom data

data_bag = filename_bag.map(load_mp4)
data_bag.take(1)[0]
{'video': array(
         [126, 162, 203, ...,  63,  58,   8],
         ...,
         [ 58, 222, 170, ..., 234,  63,  81]]]),
 'audio': array([-203.5, -209. , -207. , ..., -222.5, -233. , -248.5]),
 'filename': 'me_at_the_zoo.mp4'}
Parallel Programming with Dask in Python

Loading custom data

data_bag = filename_bag.map(load_mp4)
# Create empty list
data_list = []

# Add delayed loaded files to list
for file in video_filenames:
    data_list.append(dask.delayed(load_mp4)(file))
Parallel Programming with Dask in Python

List of delayed objects vs. Dask bag

# Convert list of delayed objects to dask bag
data_bag = db.from_delayed(data_list)
# Convert dask bag to list of delayed objects
data_list = data_bag.to_delayed()
Parallel Programming with Dask in Python

Further analysis

transcribed_bag = data_bag.map(transcribe_audio)
transcribed_bag.take(1)[0]
{'video': array(
         [126, 162, 203, ...,  63,  58,   8],
         ...,
         [ 58, 222, 170, ..., 234,  63,  81]]]),
 'audio': array([-203.5, -209. , -207. , ..., -222.5, -233. , -248.5]),
 'filename': 'me_at_the_zoo.mp4'
 'transcript': "All right, so here we are in front of the, uh, elephants ...",
}
Parallel Programming with Dask in Python

Further analysis

# Apply custom function to remove videos with no spoken words
clean_bag = transcribed_bag.filter(transcript_is_not_blank)

# Apply sentiment analysis to transcripts
sentiment_bag = clean_bag.map(analyze_transcript_sentiment)
# Remove unwanted elements from bag
keys_to_drop = ['video', 'audio']
final_bag = sentiment_bag.map(filter_dictionary, keys_to_drop=keys_to_drop)

# Convert to Dask DataFrame
df = final_bag.to_dataframe()
Parallel Programming with Dask in Python

Results

df.compute()
            filename              transcript        sentiment
0  me_at_the_zoo.mp4  All right, so here ...         positive
...              ...                     ...              ...
Parallel Programming with Dask in Python

Using .wav files

# Import scipy module for .wav files
from scipy.io import wavfile

# Load sampling frequency and audio array
sample_freq, audio = wavfile.read(filename)
Parallel Programming with Dask in Python

Using .wav files

# Samples per second
print(sample_freq)
44100
# The audio data
print(audio)
array([ 148,  142,  150, ..., -542, -546, -559], dtype=int16)
Parallel Programming with Dask in Python

Let's practice!

Parallel Programming with Dask in Python

Preparing Video For Download...