Pipeline

Machine Learning with PySpark

Andrew Collier

Data Scientist, Fathom Data

Leakage?

The fit() method

Call it on training data only.

The transform() method

Call it on both training and testing data.
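To make the split concrete, here is a plain-Python sketch (not PySpark's API) of a hypothetical `MeanCenterer` stage. Its `fit()` learns a statistic from the training data only, and its `transform()` applies that stored statistic to any dataset. In PySpark, `fit()` returns a separate model object instead of the estimator itself; the sketch collapses the two for brevity.

```python
class MeanCenterer:
    """Hypothetical stage (not part of PySpark): subtracts a learned mean."""

    def fit(self, values):
        # Learn the statistic from the training data only
        self.mean_ = sum(values) / len(values)
        return self

    def transform(self, values):
        # Apply the stored statistic; nothing new is learned here
        return [v - self.mean_ for v in values]


train = [1.0, 2.0, 3.0]
test = [10.0, 11.0]

centerer = MeanCenterer().fit(train)   # fit() sees training data only
train_out = centerer.transform(train)  # transform() is applied to both...
test_out = centerer.transform(test)    # ...training and testing data
print(centerer.mean_, train_out, test_out)
```

The testing data is shifted by the training mean, so nothing about the testing data influences what the stage learned.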


A leaky model

Model in which testing data is used for training


A watertight model

Model in which only training data is used for training
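The difference between the leaky and the watertight model can be sketched in plain Python with a single learned statistic (a mean). The helper names `fit_mean` and `center` and the numbers are hypothetical, chosen only to show where the statistic comes from in each case.

```python
def fit_mean(values):
    # "fit": learn a single statistic from the data it is given
    return sum(values) / len(values)


def center(values, mean):
    # "transform": apply a previously learned statistic
    return [v - mean for v in values]


train = [0.0, 2.0, 4.0]
test = [10.0, 14.0]

# Leaky: the statistic is learned from training AND testing data,
# so information about the testing data flows into the model.
leaky_mean = fit_mean(train + test)

# Watertight: the statistic is learned from training data only.
tight_mean = fit_mean(train)

print(leaky_mean, center(test, leaky_mean))
print(tight_mean, center(test, tight_mean))
```

Because the leaky version has already "seen" the testing data, any evaluation on that data will tend to look better than it should.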


Pipeline

A pipeline consists of a series of operations.

A pipeline with multiple stages

You could apply each operation individually... or you could just apply the pipeline!
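The idea behind a pipeline can be sketched in plain Python. This is a simplified, hypothetical model of what `pyspark.ml.Pipeline` does, not its implementation. During `fit()`, each estimator is fitted on the output of the previous stages and then used to transform the data for the next stage, while plain transformers are used as they are. The result is a fitted pipeline whose `transform()` applies every fitted stage in order. Unlike Spark, this sketch also transforms the data after the final stage during `fit()`, which is harmless here. The classes `Center`, `Shift`, `SimplePipeline` and `SimplePipelineModel` are hypothetical.

```python
class Center:
    """Hypothetical estimator: fit() returns a fitted transformer."""

    def fit(self, rows):
        mean = sum(rows) / len(rows)
        return Shift(-mean)


class Shift:
    """Hypothetical transformer: adds a fixed offset, has no fit()."""

    def __init__(self, offset):
        self.offset = offset

    def transform(self, rows):
        return [r + self.offset for r in rows]


class SimplePipeline:
    """Plain-Python sketch of the idea behind pyspark.ml.Pipeline."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted on the already-transformed training
            # data; plain transformers are used as they are.
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)
            fitted.append(model)
        return SimplePipelineModel(fitted)


class SimplePipelineModel:
    """Holds fitted stages; transform() never re-fits anything."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


pipeline = SimplePipeline(stages=[Center(), Shift(100.0)])
model = pipeline.fit([1.0, 2.0, 3.0])   # training data
print(model.transform([10.0]))          # testing data
```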


Cars model: Steps

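from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression

# Categorical 'type' -> numeric index -> one-hot dummy variables
indexer = StringIndexer(inputCol='type', outputCol='type_idx')
onehot = OneHotEncoder(inputCols=['type_idx'], outputCols=['type_dummy'])
# Combine the predictors into a single 'features' vector
assemble = VectorAssembler(inputCols=['mass', 'cyl', 'type_dummy'], outputCol='features')
# Linear regression predicting the 'consumption' column
regression = LinearRegression(labelCol='consumption')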
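What the indexing, encoding and assembly steps produce can be sketched in plain Python. This mimics two PySpark defaults: `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`), and `OneHotEncoder` drops the last category (`dropLast=True`), so k categories give k - 1 dummy columns. The type labels, mass and cyl values are hypothetical, and the sketch assumes no ties in label frequency.

```python
from collections import Counter


def fit_string_indexer(labels):
    # Most frequent label gets index 0 (assumes no frequency ties)
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: -counts[label])
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, n_categories, drop_last=True):
    # With drop_last, the last category is encoded as all zeros,
    # so n categories yield n - 1 dummy columns.
    size = n_categories - 1 if drop_last else n_categories
    return [1.0 if i == index else 0.0 for i in range(size)]


# Hypothetical 'type' values for a handful of cars
types = ["Midsize", "Small", "Midsize", "Large", "Small", "Midsize"]
mapping = fit_string_indexer(types)
n = len(mapping)

# Assemble [mass, cyl, *type_dummy] for one hypothetical car
mass, cyl, car_type = 1200.0, 4.0, "Large"
features = [mass, cyl] + one_hot(mapping[car_type], n)
print(mapping, features)
```

The least frequent label lands in the dropped last position, so its dummy columns are all zero.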

Cars model: Applying steps

Training data

# Keep the fitted models separate from the (unfitted) estimators
indexer_model = indexer.fit(cars_train)
train_prep = indexer_model.transform(cars_train)
onehot_model = onehot.fit(train_prep)
train_prep = onehot_model.transform(train_prep)
train_prep = assemble.transform(train_prep)  # VectorAssembler has no fit()
# Fit model to training data
regression_model = regression.fit(train_prep)

Testing data

# Reuse the fitted models: nothing is fitted on testing data
test_prep = indexer_model.transform(cars_test)
test_prep = onehot_model.transform(test_prep)
test_prep = assemble.transform(test_prep)
# Make predictions on testing data
predictions = regression_model.transform(test_prep)

Cars model: Pipeline

Combine steps into a pipeline.

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[indexer, onehot, assemble, regression])

Training data

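# fit() runs every stage on the training data and returns a fitted PipelineModel
pipeline = pipeline.fit(cars_train)

Testing data

# transform() applies the fitted stages, in order, to the testing data
predictions = pipeline.transform(cars_test)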

Cars model: Stages

Access individual fitted stages using the .stages attribute of the fitted pipeline.

# The fitted LinearRegressionModel (fourth stage -> index 3)
pipeline.stages[3]

print(pipeline.stages[3].intercept)
4.19433571782916
print(pipeline.stages[3].coefficients)
DenseVector([0.0028, 0.2705, -1.1813, -1.3696, -1.1751, -1.1553, -1.8894])
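The intercept and coefficients combine into a prediction as the intercept plus the dot product of the coefficients with the feature vector, whose order follows the `VectorAssembler` inputs: mass, cyl, then the type dummies. The sketch below does that arithmetic in plain Python with the printed (rounded) values and a hypothetical feature vector, so it only approximates what the fitted model's `transform()` would return for such a row.

```python
intercept = 4.19433571782916
# Coefficients as printed above (rounded), in feature order:
# mass, cyl, then the one-hot type dummies
coefficients = [0.0028, 0.2705, -1.1813, -1.3696, -1.1751, -1.1553, -1.8894]

# Hypothetical feature vector: mass, cyl, then a one-hot type dummy
features = [1000.0, 4.0, 1.0, 0.0, 0.0, 0.0, 0.0]

# Linear regression prediction: intercept + dot(coefficients, features)
prediction = intercept + sum(c * x for c, x in zip(coefficients, features))
print(round(prediction, 3))
```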

Pipelines streamline your workflow!
