I have a dataframe with a column of dense vectors, i.e. multiclass classification prediction probabilities. I want to convert that column to a numpy array, but I am running into shape mismatches. Here are the things I have tried.
One answer I found on here did convert the values into a numpy array, but while the original dataframe had 4653 observations, the shape of the numpy array was (4712, 21). I don't understand how it grew, and in another attempt with the same code the numpy array shape came out smaller than the count of the original dataframe. I don't understand why.
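The pattern from that answer was roughly the following (reconstructed from memory, so the exact code may have differed):

import numpy as np

# collect the vectors on the driver and stack them into one array
prob_array = np.array(
    predictions.select("probability")
    .rdd.map(lambda row: row[0].toArray())
    .collect()
)
print(prob_array.shape)  # came back as (4712, 21) even though the dataframe has 4653 rows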
I also tried predictions.select("probability").toPandas().values.shape, but again the shape was mismatched. (I used the count() method of the pyspark dataframe to check the length of the dataframe.) I also tried a UDF with the toArray() method of the column of the pyspark dataframe, which resulted in a strange error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 116.0 failed 4 times, most recent failure: Lost task 2.3 in stage 116.0 (TID 6254, 10.2.1.54, executor 0): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.core.multiarray._reconstruct)
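The UDF was along these lines (a sketch; the column name prob_array and the lambda are my reconstruction):

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

# v.toArray() returns a numpy.ndarray, not a plain Python list
to_array = udf(lambda v: v.toArray(), ArrayType(DoubleType()))
predictions = predictions.withColumn("prob_array", to_array("probability"))
predictions.select("prob_array").show()  # this is where the PickleException surfaces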
Here is what I am doing:
from pprint import pprint
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

rf = RandomForestClassifier(
    featuresCol="features",
    labelCol=TARGET_COL,
    predictionCol=TARGET_COL + "_predicted",
    # impurity="entropy",
    # maxDepth=5,
    # numTrees=1000,
    # minInfoGain=0.2,
    # subsamplingRate=0.8
)
evaluator = MulticlassClassificationEvaluator(
    predictionCol=TARGET_COL + "_predicted",
    labelCol=TARGET_COL,
    metricName="accuracy"
)
paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [3, 5, 7, 9, 11]) \
    .addGrid(rf.numTrees, [20, 50, 100, 200, 500]) \
    .addGrid(rf.minInfoGain, [0.0, 0.2, 0.5, 1.0]) \
    .addGrid(rf.subsamplingRate, [0.5, 0.8, 1.0]) \
    .addGrid(rf.impurity, ["entropy", "gini"]) \
    .build()
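# the single-value grid below overrides the larger grid above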
paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [3]) \
    .addGrid(rf.numTrees, [2]) \
    .addGrid(rf.minInfoGain, [0.0]) \
    .addGrid(rf.subsamplingRate, [0.5]) \
    .addGrid(rf.impurity, ["entropy"]) \
    .build()
tvs = TrainValidationSplit(estimator=rf,
                           estimatorParamMaps=paramGrid,
                           evaluator=evaluator,
                           trainRatio=0.8)
print("~~~~~~~~~~~ Model Training Started ~~~~~~~~~~~")
model = tvs.fit(train_df)
best_model = model.bestModel
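# read the winning hyperparameters off the underlying Java estimator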
print(best_model._java_obj.parent().getImpurity())
print(best_model._java_obj.parent().getMaxDepth())
print(best_model._java_obj.parent().getNumTrees())
print(best_model._java_obj.parent().getMinInfoGain())
print(best_model._java_obj.parent().getSubsamplingRate())
prob_array = []
predictions = model.transform(test_df)
print(predictions.count())
print(test_df.count())
# a single row converts cleanly: DenseVector -> numpy array
pprint(predictions.select("probability").head(1)[0].probability)
pprint(predictions.select("probability").head(1)[0].probability.toArray())
pprint(type(predictions.select("probability").head(1)[0].probability.toArray()))
pprint(predictions.select("probability").head(1)[0].probability.toArray().shape)
print(predictions.select("probability").count())
# the whole column via pandas is where the shape stops matching count()
print(predictions.select("probability").toPandas())
print(predictions.select("probability").toPandas().values.shape)
This is what the probability column looks like (first rows):

[Row(probability=DenseVector([0.2066, 0.1184, 0.1138, 0.1158, 0.0876, 0.0548, 0.0628, 0.0713, 0.041, 0.0306, 0.0258, 0.0271, 0.0177, 0.0081, 0.0085, 0.0044, 0.0032, 0.0015, 0.0005, 0.0005, 0.0])), Row(probability=DenseVector([0.1902, 0.0679, 0.1281, 0.0939, 0.0719, 0.0205, 0.0977, 0.0471, 0.0946, 0.0491, 0.0425, 0.0292, 0.0113, 0.0328, 0.0098, 0.0048, 0.0029, 0.0036, 0.0016, 0.0002, 0.0003]))]
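What I ultimately want is a plain 2-D array with one row per observation, something like this (a sketch of the goal, not code I have gotten to behave consistently):

import numpy as np

# collect the column on the driver and stack the vectors row by row
rows = predictions.select("probability").collect()
prob_array = np.array([row.probability.toArray() for row in rows])
print(prob_array.shape)  # I expect the row count here to match predictions.count()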