
I am trying to run a subquery in PySpark. I see that it is possible using SQL statements, but is there any built-in support for this using the "where" or "filter" operations?

Consider these test DataFrames:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

sqlContext = SparkSession.builder.appName('test').enableHiveSupport().getOrCreate()
tst = sqlContext.createDataFrame([(1, 2), (4, 3), (1, 4), (1, 5), (1, 6)], schema=['sample', 'time'])
tst_sub = sqlContext.createDataFrame([(1, 2), (4, 3), (1, 4)], schema=['sample', 'time'])
#%% using where to query the df
tst.where(F.col('time')>4).show()
+------+----+
|sample|time|
+------+----+
|     1|   5|
|     1|   6|
+------+----+

Here you can see that the where function works fine. When I try to do the same using a subquery, like this:

#%% using where with subquery
tst.where(F.col('time')>F.max(tst_sub.select('time'))).show()

I get this error:

AttributeError                            Traceback (most recent call last)
----> 1 tst.where(F.col('time')>F.max(tst_sub.select('time'))).show()

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4744.12781922/lib/spark/python/pyspark/sql/functions.py in _(col)
     42 def _(col):
     43     sc = SparkContext._active_spark_context
---> 44     jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
     45     return Column(jc)
     46 _.__name__ = name

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4744.12781922/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1246
   1247     def __call__(self, *args):
-> 1248         args_command, temp_args = self._build_args(*args)
   1249
   1250         command = proto.CALL_COMMAND_NAME +\

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4744.12781922/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in _build_args(self, *args)
   1216
   1217         args_command = "".join(
-> 1218             [get_command_part(arg, self.pool) for arg in new_args])
   1219
   1220         return args_command, temp_args

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4744.12781922/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in <listcomp>(.0)
   1216
   1217         args_command = "".join(
-> 1218             [get_command_part(arg, self.pool) for arg in new_args])
   1219
   1220         return args_command, temp_args

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4744.12781922/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_command_part(parameter, python_proxy_pool)
    296         command_part += ";" + interface
    297     else:
--> 298         command_part = REFERENCE_TYPE + parameter._get_object_id()
    299
    300     command_part += "\n"

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4744.12781922/lib/spark/python/pyspark/sql/dataframe.py in __getattr__(self, name)
   1298         if name not in self.columns:
   1299             raise AttributeError(
-> 1300                 "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
   1301         jc = self._jdf.apply(name)
   1302         return Column(jc)

AttributeError: 'DataFrame' object has no attribute '_get_object_id'

When I register the DataFrames as tables and perform a SQL query, it works fine:

tst.createOrReplaceTempView("tst")
tst_sub.createOrReplaceTempView("tst_sub")
sqlContext.sql("SELECT * FROM tst WHERE time > (SELECT max(time) FROM tst_sub)").show()
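
This gives the same result as the direct filter above:

+------+----+
|sample|time|
+------+----+
|     1|   5|
|     1|   6|
+------+----+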

Is there any way to perform a subquery on the DataFrames directly in PySpark, using filter, where, or any other method?

1 Answer


You need to collect the max time into a numerical variable in Python before putting it in the filter:

tst.where(F.col('time') > tst_sub.select(F.max('time')).head()[0]).show()
+------+----+
|sample|time|
+------+----+
|     1|   5|
|     1|   6|
+------+----+
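
Equivalently, you can pull the scalar out into a plain Python variable first and then reuse it in the filter; here is a small sketch using the same example DataFrames:

# collect the max time from tst_sub as a plain Python number (4 for the sample data)
max_time = tst_sub.select(F.max('time')).head()[0]
tst.where(F.col('time') > max_time).show()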

5 Comments

Thanks a lot for the answer. Will there be any performance issue when I do the collect? Are subqueries faster because they do not need this?
You're just collecting a single number, so performance shouldn't be an issue. But I prefer using subqueries because they don't require collect.
Exactly, I prefer to avoid collect too. In this example the query is simple, but I'm not sure if collect is always a good idea, so I am searching for the syntax for using subqueries without any SQL-type commands. Any idea?
It's not possible to use subqueries with the DataFrame API. You need to use the SQL API for subqueries.
Thanks, this was what I was looking for. Can you please add this line to your answer? I will mark it as accepted.
