
I have a Python function:

def get_log_probability(string, transition_log_probabilities):
    string = ngrams(string, 2)
    terms = [transition_log_probabilities[bigram] for bigram in string]
    log_probability = sum(terms) / len(terms) if len(terms) > 0 else sum(terms)
    return log_probability

I want to use this function on a PySpark DataFrame column, with transition_log_probabilities as a constant, as follows:

transition_log_probabilities = {('a', 'a'): -3.688879454113936,
                                ('a', 'b'): -3.688879454113936,
                                ('a', 'c'): -3.688879454113936,
                                ('b', 'a'): -3.688879454113936,
                                ('b', 'b'): -3.688879454113936,
                                ('b', 'c'): -3.688879454113936,
                                ('c', 'a'): -3.688879454113936,
                                ('c', 'b'): -3.688879454113936,
                                ('c', 'c'): -3.688879454113936}
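
For reference, on "abc" the function averages the log probabilities of the string's bigrams. A quick check of that arithmetic, assuming nltk's ngrams:

from nltk import ngrams

# "abc" yields two bigrams: [('a', 'b'), ('b', 'c')]
list(ngrams("abc", 2))

# Their average equals the value reported below.
(transition_log_probabilities[('a', 'b')]
 + transition_log_probabilities[('b', 'c')]) / 2   # -3.688879454113936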

So I convert it to a PySpark UDF:

def get_log_prob_udf(dictionary):
    return udf(lambda string: get_log_probability(string, dictionary), FloatType())
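
(The snippets above don't show their imports; they presumably rely on the standard PySpark ones:)

from pyspark.sql.functions import udf, col
from pyspark.sql.types import FloatType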

Even though get_log_probability("abc", transition_log_probabilities) works and returns -3.688879454113936, when I apply the UDF to my PySpark DataFrame as follows:

df = df \
.withColumn("string_log_probability", get_log_prob_udf(transition_log_probabilities)(col('string')))

it doesn't work and throws the following error:

An error occurred while calling o3463.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 
182.0 failed 1 times, most recent failure: Lost task 0.0 in stage 182.0 (TID 774) 
(kubernetes.docker.internal executor driver): net.razorvine.pickle.PickleException: 
expected zero arguments for construction of ClassDict (for numpy.dtype)

Does anyone know how to solve it? Thank you very much.

  • You defined transition_log_probabilities as a dictionary, but what does it look like in a df column? Commented Mar 21, 2022 at 12:25
  • It is not a df column. It is just a constant, exactly the dictionary shown in my question. Commented Mar 21, 2022 at 12:36
  • OK, what does the col('string') column look like? Can you provide some examples of your df? Commented Mar 21, 2022 at 12:44
  • My df can be demonstrated as follows: df = spark.createDataFrame([(1, "bc"), (2, "aa"), (3, "ca")], ["id", "string"]) Commented Mar 21, 2022 at 12:54

1 Answer


Hope this is the outcome you are looking for.

from pyspark.sql import functions as F, types as T
from nltk import ngrams

df = spark.createDataFrame([(1, "bc"), (2, "aa"), (3, "ca")], ["id", "string"])

transition_log_probabilities = {('a', 'a'): -3.688879454113936,
                                ('a', 'b'): -3.688879454113936,
                                ('a', 'c'): -3.688879454113936,
                                ('b', 'a'): -3.688879454113936,
                                ('b', 'b'): -3.688879454113936,
                                ('b', 'c'): -3.688879454113936,
                                ('c', 'a'): -3.688879454113936,
                                ('c', 'b'): -3.688879454113936,
                                ('c', 'c'): -3.688879454113936}

def get_log_probability(string):
    # Read the dictionary as a module-level constant instead of
    # passing it through a UDF factory.
    bigrams = ngrams(string, 2)
    terms = [transition_log_probabilities[bigram] for bigram in bigrams]
    return sum(terms) / len(terms) if len(terms) > 0 else sum(terms)

get_log_prob_udf = F.udf(get_log_probability, T.FloatType())

df.withColumn('string_log_probability', get_log_prob_udf(F.col('string'))).show()
+---+------+----------------------+
| id|string|string_log_probability|
+---+------+----------------------+
|  1|    bc|            -3.6888795|
|  2|    aa|            -3.6888795|
|  3|    ca|            -3.6888795|
+---+------+----------------------+
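
As an aside: that PickleException about numpy.dtype usually means the UDF returned a NumPy scalar (e.g. numpy.float64) rather than a native Python float, which Spark cannot unpickle on the JVM side. If your real transition_log_probabilities values come from NumPy, a defensive cast inside the UDF is a minimal fix. A sketch under that assumption:

def get_log_probability(string):
    bigrams = ngrams(string, 2)
    terms = [transition_log_probabilities[bigram] for bigram in bigrams]
    result = sum(terms) / len(terms) if len(terms) > 0 else 0.0
    # Cast defensively in case the dictionary values are numpy.float64.
    return float(result)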