
I have a multiline flat file which I wish to convert to a 4-column DataFrame (or an RDD of arrays) via PySpark. The Spark Scala code is,

// from pyspark.sql import SparkSession  -- Scala equivalent below
// from pyspark import SparkContext      -- Scala equivalent below
import org.apache.spark.sql.SparkSession
import org.apache.spark.mllib.rdd.RDDFunctions._

val path = "/mypath/file"
val spark = SparkSession.builder.appName("findApp").getOrCreate()
val rdd = spark.sparkContext.textFile(path).sliding(4, 4).toDF("x", "y", "z", "a")

There is no sliding() function in PySpark. What is the equivalent? The input is

A
B
C
D
A2
B2
C2
D2

The desired output is

x y z a
A B C D
A2 B2 C2 D2

I should add that each data set is around 50 million records, and there are a couple of hundred data sets, so it's over 2 terabytes of data in total because one column holds >300 features. I like the pandas code by @GoodMan.
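To make the target concrete: sliding(4, 4) in Scala emits non-overlapping windows of four consecutive elements. A rough RDD-level sketch of the same grouping in PySpark, assuming the file's line order is the desired row order (the path is the placeholder from the snippet above), could use zipWithIndex:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('findApp').getOrCreate()
path = '/mypath/file'  # placeholder path from the question

# zipWithIndex pairs each line with its position in the file, so
# index // 4 identifies the group of four consecutive lines and
# index % 4 identifies the slot within that group.
rows = (
    spark.sparkContext.textFile(path)
    .zipWithIndex()                                 # (line, position)
    .map(lambda p: (p[1] // 4, (p[1] % 4, p[0])))   # (group id, (slot, value))
    .groupByKey()
    .sortByKey()                                    # keep groups in file order
    .map(lambda kv: tuple(v for _, v in sorted(kv[1])))
)
df = rows.toDF(["x", "y", "z", "a"])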

2 Answers


Feel free to modernize it. Since I am not aware of how your data is sorted, feel free to change the window function. You can also change the number of columns if required.

from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import row_number, expr, first

# Create a SparkSession
spark = SparkSession.builder.appName("Data Transformation").getOrCreate()

data = ["A", "B", "C", "D", "A2", "B2", "C2", "D2"]

schema = "col1 string, id int"

# Pair every value with a dummy id so the RDD matches the two-column schema
data_rdd = spark.sparkContext.parallelize(data)
new_rdd = data_rdd.map(lambda x: (x, 1))
df = new_rdd.toDF(schema)

# Number the rows, then bucket them into groups of four:
# "column" is the position within a group, "row" is the group id
mywindow = Window.orderBy("col1")
df_row = df.withColumn("column", row_number().over(mywindow)) \
           .withColumn("row", expr("ceiling(column/4)"))

df_row.createOrReplaceTempView("data")

# Spread each group of four values across four columns
# (each intermediate row has exactly one non-null column)
df_final = spark.sql("""
select row,
       case when column % 4 = 1 then col1 else null end as col1,
       case when column % 4 = 2 then col1 else null end as col2,
       case when column % 4 = 3 then col1 else null end as col3,
       case when column % 4 = 0 then col1 else null end as col4
from data order by col1
""")

# Collapse each group to a single row by keeping the non-null value per column
df_final2 = df_final.groupBy("row").agg(
    first("col1", ignorenulls=True).alias("x"),
    first("col2", ignorenulls=True).alias("y"),
    first("col3", ignorenulls=True).alias("z"),
    first("col4", ignorenulls=True).alias("a"),
)

df_final2.show()

spark.stop()
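One design note: Window.orderBy("col1") numbers the values in lexical order (A, A2, B, B2, …) rather than in file order, so for the sample input the groups come out as (A, A2, B, B2) and (C, C2, D, D2). That is what the caveat above about changing the window function refers to; ordering the window by a true line index instead (for example, the zipWithIndex sketch in the question) reproduces the desired output exactly.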
import pandas as pd

string = """A
B
C
D
A2
B2
C2
D2"""
data = string.split("\n")
col = 4
# Chop the flat list into consecutive rows of four
df = pd.DataFrame([data[i:i+col] for i in range(0, len(data), col)],
                  columns=list("xyza"))

Output:

x y z a
A B C D
A2 B2 C2 D2
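For a real file rather than an inline string, the same four-column fold can be expressed with a numpy reshape — a minimal sketch, assuming one value per line and a line count divisible by 4 (the path is the hypothetical one from the question):

import pandas as pd

path = "/mypath/file"  # placeholder path from the question

# Read one value per line, then fold the flat column into rows of four
values = pd.read_csv(path, header=None)[0].to_numpy()
df = pd.DataFrame(values.reshape(-1, 4), columns=list("xyza"))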


This is very nice pandas indeed, thank you. The data sets are around 50 million records; whether pandas can cope with that I'm not sure, but I love the pandas code.
