Machine Learning with PySpark
Andrew Collier
Data Scientist, Fathom Data
+-----+-------+-------+------+----+----+------+------+----+-----------+
|maker|  model| origin|  type| cyl|size|weight|length| rpm|consumption|
+-----+-------+-------+------+----+----+------+------+----+-----------+
|Mazda|   RX-7|non-USA|Sporty|null| 1.3|  2895| 169.0|6500|       9.41|
|  Geo|  Metro|non-USA| Small|   3| 1.0|  1695| 151.0|5700|        4.7|
| Ford|Festiva|    USA| Small|   4| 1.3|  1845| 141.0|5000|       7.13|
+-----+-------+-------+------+----+----+------+------+----+-----------+
Remove the maker and model fields.
# Either drop the columns you don't want...
cars = cars.drop('maker', 'model')

# ... or select the columns you want to retain.
cars = cars.select('origin', 'type', 'cyl', 'size', 'weight', 'length', 'rpm', 'consumption')
+-------+------+----+----+------+------+----+-----------+
| origin|  type| cyl|size|weight|length| rpm|consumption|
+-------+------+----+----+------+------+----+-----------+
|non-USA|Sporty|null| 1.3|  2895| 169.0|6500|       9.41|
|non-USA| Small|   3| 1.0|  1695| 151.0|5700|        4.7|
|    USA| Small|   4| 1.3|  1845| 141.0|5000|       7.13|
+-------+------+----+----+------+------+----+-----------+
# How many missing values?
cars.filter('cyl IS NULL').count()
1
Drop records with missing values in the cylinders column.
cars = cars.filter('cyl IS NOT NULL')
Drop records with missing values in any column.
cars = cars.dropna()
from pyspark.sql.functions import round

# Create a new 'mass' column (pounds to kilograms)
cars = cars.withColumn('mass', round(cars.weight / 2.205, 0))

# Convert length from inches to metres
cars = cars.withColumn('length', round(cars.length * 0.0254, 3))
+-------+-----+---+----+------+------+----+-----------+-----+
| origin| type|cyl|size|weight|length| rpm|consumption| mass|
+-------+-----+---+----+------+------+----+-----------+-----+
|non-USA|Small|  3| 1.0|  1695| 3.835|5700|        4.7|769.0|
|    USA|Small|  4| 1.3|  1845| 3.581|5000|       7.13|837.0|
|non-USA|Small|  3| 1.3|  1965| 4.089|6000|       5.47|891.0|
+-------+-----+---+----+------+------+----+-----------+-----+
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='type', outputCol='type_idx')

# Assign index values to strings
indexer = indexer.fit(cars)

# Create column with index values
cars = indexer.transform(cars)
Use stringOrderType to change the order.
+-------+--------+
|   type|type_idx|
+-------+--------+
|Midsize|     0.0|   <- most frequent value
|  Small|     1.0|
|Compact|     2.0|
| Sporty|     3.0|
|  Large|     4.0|
|    Van|     5.0|   <- least frequent value
+-------+--------+
# Index country of origin:
#
# USA -> 0
# non-USA -> 1
#
cars = StringIndexer(
inputCol="origin",
outputCol="label"
).fit(cars).transform(cars)
+-------+-----+
| origin|label|
+-------+-----+
|    USA|  0.0|
|non-USA|  1.0|
+-------+-----+
Use a vector assembler to transform the data.
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['cyl', 'size'], outputCol='features')
assembler.transform(cars)
+---+----+---------+
|cyl|size| features|
+---+----+---------+
|  3| 1.0|[3.0,1.0]|
|  4| 1.3|[4.0,1.3]|
|  3| 1.3|[3.0,1.3]|
+---+----+---------+