
The problem statement is to get all managers of an employee, up to a given level, in Spark.

For example, in the dataset below:

EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID
1,Donald,OConnell,DOCONNEL,650.507.9833,21/06/2007,SH_CLERK,2600,,2,500
2,Douglas,Grant,DGRANT,650.507.9844,13/01/2008,SH_CLERK,2600,,3,50
3,Jennifer,Whalen,JWHALEN,515.123.4444,17/09/2003,AD_ASST,4400,,4,10
4,Michael,Hartstein,MHARTSTE,515.123.5555,17/02/2004,MK_MAN,13000,,5,20
5,Pat,Fay,PFAY,603.123.6666,17/08/2005,MK_REP,6000,,6,20
6,Susan,Mavris,SMAVRIS,515.123.7777,07/06/2002,HR_REP,6500,,7,40
7,Hermann,Baer,HBAER,515.123.8888,07/06/2002,PR_REP,10000,,8,70
8,Shelley,Higgins,SHIGGINS,515.123.8080,07/06/2002,AC_MGR,12008,,9,110
9,William,Gietz,WGIETZ,515.123.8181,07/06/2002,AC_ACCOUNT,8300,,,110

Given employee '1', the expected result is ['3', '4', '5', '6', '7', '8', '9'].
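To make the intent concrete, here is a minimal plain-Python sketch of the traversal, with no Spark involved. The managers dict and the manager_chain function are purely illustrative stand-ins for the CSV file and the recursion used further down:

# Plain-Python illustration: walk MANAGER_ID links starting from an employee
# until the requested level (or the top of the chain) is reached.
managers = {'1': '2', '2': '3', '3': '4', '4': '5', '5': '6',
            '6': '7', '7': '8', '8': '9', '9': None}

def manager_chain(emp_id, lvl):
    chain = []
    while lvl > 0 and managers.get(emp_id):
        emp_id = managers[emp_id]
        chain.append(emp_id)
        lvl -= 1
    return chain

print(manager_chain('1', 7))  # walks up seven levels starting from employee 1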

Below is the PySpark code I wrote, which fails with the error shown further down.

import sys    
import os    
from operator import add    
import re    
import pyspark.sql.functions as F    

os.environ['SPARK_HOME'] = "path"    
sys.path.append("path")    
sys.path.append("path")    

def recur_man(emp_id,lvl,list1):
    with open("path\\employee_1.txt") as f:
        for lines in f:
            if lines.split(',')[0] == emp_id:
                list1.append(lines.split(',')[9])
                lvl-=1
                recur_man(lines.split(',')[9],lvl,list1)
    return list1


try:    
    from pyspark import SparkContext    
    from pyspark import SparkConf    
    from pyspark.sql import SQLContext    
    from pyspark.sql.functions import *    
    from pyspark.sql.types import *    
    config = SparkConf().setAll([('spark.num.executors','10'),('spark.ui.port','4050')])    
    sc = SparkContext(conf=config)    
    sqlContext = SQLContext(sc)    
    rdd = sc.textFile("path\\employee_1.txt")    
    header = rdd.first()     
    header_mod = [x.encode("utf-8") for x in header.split(',')]    
    rdd = rdd.filter(lambda line: line!=header)    
    rdd = rdd.map(lambda line: line.split(','))    
    df1 = rdd.toDF(header_mod)    
    spark_recur_man = udf(lambda x: recur_man,ArrayType(StringType()))    
    list1 = []    
    df1.select('EMPLOYEE_ID',spark_recur_man('EMPLOYEE_ID',3,list1).alias('heirarchy')).show(truncate=False)    
    sc.stop()    
except ImportError as e:    
    print ("Error importing Spark Modules", e)    
    sys.exit(1)    

The error is as follows:

df1.select('EMPLOYEE_ID',spark_recur_man('EMPLOYEE_ID',3,list1).alias('heirarchy')).show(truncate=False)

File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\functions.py", line 1957, in wrapper File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\functions.py", line 1918, in call File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\column.py", line 60, in _to_seq File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\column.py", line 48, in _to_java_column File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\column.py", line 41, in _create_column_from_name File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py", line 1133, in call File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py", line 63, in deco File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py", line 323, in get_return_value py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace: py4j.Py4JException: Method col([class java.lang.Integer]) does not exist

Can anyone help me with the above issue?

Thanks in advance

1 Answer


Spark is not able to accept the type of the last parameter you pass to the UDF, i.e. the Python list list1; every argument in a UDF call has to be a Column object, not a plain Python value.
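The core of the fix is to pass only Column values to the UDF call, wrapping literals with lit(). A minimal sketch of that pattern (my_udf is illustrative; df1 is the DataFrame built in the question):

from pyspark.sql.functions import udf, lit
from pyspark.sql.types import ArrayType, StringType

# Illustrative UDF taking three arguments and returning an array of strings.
my_udf = udf(lambda emp, lvl, seed: [emp] * lvl, ArrayType(StringType()))

# 'EMPLOYEE_ID' resolves to a column; the plain values 3 and 0 are wrapped with
# lit() so PySpark does not try to call col(3), which raises the Py4JError above.
df1.select(my_udf('EMPLOYEE_ID', lit(3), lit(0)).alias('hierarchy')).show(truncate=False)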

Below is the full workaround you can try for now:

import sys
import os
import pyspark.sql.functions as F
import pyspark.sql.types as t

def recur_man(emp_id, lvl, list1):
    # lit(0) reaches the UDF as a plain integer, so swap the sentinel for a real list
    if type(list1) == int:
        list1 = []
    # walk up the manager chain by re-reading the employee file at each level
    with open(r"D:\MOCK_DATA\employee_1.txt") as f:
        for lines in f:
            if lines.split(',')[0] == emp_id:
                list1.append(lines.split(',')[9])  # field 9 is MANAGER_ID
                lvl -= 1
                recur_man(lines.split(',')[9], lvl, list1)
    return list1

try:    
    from pyspark import SparkContext    
    from pyspark import SparkConf    
    from pyspark.sql import SQLContext    
    from pyspark.sql.functions import *    
    from pyspark.sql.types import *        
    sc = SparkContext.getOrCreate()
    sqlContext = SQLContext(sc)    
    rdd = sc.textFile(r"D:\MOCK_DATA\employee_1.txt")
    header = rdd.first() 
    header_mod = [x.encode("utf-8") for x in header.split(',')]    
    rdd = rdd.filter(lambda line: line!=header)    
    rdd = rdd.map(lambda line: line.split(','))    
    df1 = rdd.toDF(header_mod)    
    # UDF that returns the array of manager ids for one employee id
    spark_recur_man = udf(lambda x, y, z: recur_man(x, y, z), t.ArrayType(t.StringType()))
    # pass a literal 0 instead of a Python list; the UDF swaps it for an empty list
    list1 = lit(0)
    # every argument in the UDF call is now a Column: 'EMPLOYEE_ID', lit(3) and lit(0)
    df1.select('EMPLOYEE_ID',spark_recur_man('EMPLOYEE_ID',lit(3),list1).alias('heirarchy')).show(truncate=False)
    sc.stop()    
except ImportError as e:    
    print ("Error importing Spark Modules", e)    
    sys.exit(1)
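The key change from the original attempt is that every argument in the UDF call is a Column: the level goes in as lit(3) and list1 as lit(0). A plain Python list cannot be wrapped with lit() on this Spark version, so the UDF treats the integer 0 as a sentinel and replaces it with a fresh list for each row.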