Introduction to Spark SQL in Python
Mark Plutowski
Data Scientist
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# NOTE(review): deliberately broken UDF, kept for demonstration.
# x.indices[0] yields a numpy integer (not a Python int); Spark cannot
# serialize that as IntegerType, so evaluating this UDF raises a
# Py4JJavaError — the int() cast in first_udf below is the fix.
bad_udf = udf(lambda x:
x.indices[0]
if (x and hasattr(x, "toArray") and x.numNonzeros())
else 0,
IntegerType())
# Trigger evaluation of bad_udf with first(); the serialization failure
# surfaces as a py4j error, whose class and message we report.
try:
    labeled = df.select(bad_udf('outvec').alias('label'))
    labeled.first()
except Exception as err:
    print(type(err))
    print(err.errmsg)
<class 'py4j.protocol.Py4JJavaError'>
An error occurred while calling o90.collectToPython.
def _first_nonzero_index(vec):
    """Return the first nonzero index of a sparse vector as a Python int.

    Falls back to 0 when the value is missing, empty, or not vector-like.
    The int() cast is essential: the raw numpy index is not serializable
    as a Spark IntegerType.
    """
    if vec and hasattr(vec, "toArray") and vec.numNonzeros():
        return int(vec.indices[0])
    return 0

# Register the helper as a Spark SQL UDF with an integer return type.
first_udf = udf(_first_nonzero_index, IntegerType())
+-------+--------------------+-----+--------------------+-------------------+
|endword| doc|count| features| outvec|
+-------+--------------------+-----+--------------------+-------------------+
| it|[please, do, not,...| 1149|(12847,[15,47,502...| (12847,[7],[1.0])|
| holmes|[start, of, the, ...| 107|(12847,[0,3,183,1...|(12847,[145],[1.0])|
| i|[the, adventures,...| 103|(12847,[0,3,35,14...| (12847,[11],[1.0])|
+-------+--------------------+-----+--------------------+-------------------+
df.withColumn('label', k_udf('outvec')).drop('outvec').show(3)
+-------+--------------------+-----+--------------------+-----+
|endword| doc|count| features|label|
+-------+--------------------+-----+--------------------+-----+
| it|[please, do, not,...| 1149|(12847,[15,47,502...| 7|
| holmes|[start, of, the, ...| 107|(12847,[0,3,183,1...| 145|
| i|[the, adventures,...| 103|(12847,[0,3,35,14...| 11|
+-------+--------------------+-----+--------------------+-----+
from pyspark.ml.feature import CountVectorizer
# Term-frequency vectorizer over the 'words' array column: fit() learns
# the vocabulary from df, transform() appends a sparse 'features' column.
cv = CountVectorizer(inputCol='words',
outputCol="features")
model = cv.fit(df)
result = model.transform(df)
# result is a lazy DataFrame; printing it shows only the schema,
# not the data (see the printed schema on the next line).
print(result)
DataFrame[words: array<string>, features: vector]
# Dense string array on left, sparse double vector on right
+-------------------------+--------------------------------------+
|words |features |
+-------------------------+--------------------------------------+
|[Hello, world] |(10,[7,9],[1.0,1.0]) |
|[How, are, you?] |(10,[1,3,4],[1.0,1.0,1.0]) |
|[I, am, fine, thank, you]|(10,[0,2,5,6,8],[1.0,1.0,1.0,1.0,1.0])|
+-------------------------+--------------------------------------+
Introduction to Spark SQL in Python