
I want to download some XML files (about 3000 files, ~50 MB each, so ~150 GB in total), process them and upload the results to BigQuery using PySpark. During development I used a Jupyter notebook and a small number of files (10). I wrote fairly complex code and set up a cluster on Dataproc. My Dataproc cluster has 6 TB of HDFS, 10 nodes (4 cores each) and 120 GB of RAM.

def context():
    import os
    # Pull in the S3A connector so wholeTextFiles can read from s3a:// paths
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell'
    import pyspark
    conf = pyspark.SparkConf()

    conf = (conf.setMaster('local[*]')
            .set('spark.executor.memory', '4G')
            .set('spark.driver.memory', '45G')
            .set('spark.driver.maxResultSize', '10G')
            .set("spark.python.profile", "true"))
    sc = pyspark.SparkContext(conf=conf)
    return sc

def job(sc):
    from math import ceil  # used below to size the BigQuery upload batches
    print("Job started")
    RDDread = sc.wholeTextFiles("s3a://custom-bucket/*/*.gz")
    models = RDDread.flatMap(process_xmls).groupByKey()
    tracking_all = (models.filter(lambda x: x[0] == TrackInformation)
                    .flatMap(lambda x: x[1])
                    .map(lambda model: (model.flight_ref, model))
                    .groupByKey())
    tracking_merged = tracking_all.map(lambda x: x[1]).map(merge_ti)
    flight_plans = (models.filter(lambda x: x[0] == FlightPlan)
                    .flatMap(lambda x: x[1])
                    .map(lambda fp: (fp.flight_ref, fp)))
    fps_tracking = tracking_merged.union(flight_plans).groupByKey().filter(lambda x: len(x[1]) == 2)
    in_bq_batch = 1000
    n = fps_tracking.count()
    parts = ceil(n / in_bq_batch)
    many_n = fps_tracking.repartition(parts).mapPartitions(upload_fpm2)
    print("Job ended")
    return fps_tracking, tracking_merged, flight_plans, models, many_n

After about 200 `org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]` messages I get two errors: java.lang.OutOfMemoryError and MemoryError, mostly MemoryError. I thought I had only 2 partitions after RDDread, so I changed the read to sc.wholeTextFiles("s3a://custom-bucket/*/*.gz", minPartitions=40), and it broke even faster. I also tried adding persist(StorageLevel.DISK_ONLY) in a few random places.
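For reference, this is roughly what that attempt looked like; a sketch, not the exact notebook cell (the bucket path and process_xmls are the ones from the code above):

    from pyspark import StorageLevel

    # Re-read with more partitions and keep the grouped models on disk instead of in memory
    RDDread = sc.wholeTextFiles("s3a://custom-bucket/*/*.gz", minPartitions=40)
    models = RDDread.flatMap(process_xmls).groupByKey().persist(StorageLevel.DISK_ONLY)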

File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 684, in loads
    return s.decode("utf-8") if self.use_unicode else s
MemoryError
19/05/20 14:09:23 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/20 14:09:30 ERROR org.apache.spark.util.Utils: Uncaught exception in thread stdout writer for /opt/conda/default/bin/python
java.lang.OutOfMemoryError: Java heap space

What am I doing wrong and how can I debug my code?

1 Answer


You seem to be running Spark in local mode (local[*]). This means you are using a single JVM with 45 GB of RAM (spark.driver.memory) and that all your worker threads run inside that JVM. The spark.executor.memory option has no effect (see What does setMaster `local[*]` mean in Spark?).

You should set your Spark master either to the YARN scheduler (Dataproc clusters come with YARN), or, if you have no YARN, use standalone mode: https://spark.apache.org/docs/latest/spark-standalone.html.
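For example, on Dataproc (where YARN is already configured) the conf from the question could point at YARN instead of local mode. A minimal sketch, with the memory settings chosen only for illustration:

    import pyspark

    conf = (pyspark.SparkConf()
            .setMaster('yarn')                    # distribute executors across the 10 worker nodes
            .set('spark.executor.memory', '4G')   # now takes effect: one JVM per executor
            .set('spark.executor.cores', '2')
            .set('spark.driver.memory', '8G')
            .set('spark.driver.maxResultSize', '4G'))
    sc = pyspark.SparkContext(conf=conf)

Alternatively, leave the master out of the code entirely and pass it at submit time, e.g. spark-submit --master yarn job.py, or gcloud dataproc jobs submit pyspark on Dataproc.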
