
I have CSV data that is crawled via a Glue crawler and ends up in one table.

I'm trying to run an ETL job that re-partitions the data on disk by components of the date column and converts the CSV to Parquet.

i.e. I have a column named "date" in my data, and I want to partition the data into year, month, and day partitions on S3.

I am able to convert to Parquet and get it to partition correctly on the serial number value (a different column), but it writes the value "__HIVE_DEFAULT_PARTITION__" for the year, month, and day partitions derived from the date column.

I am able to partition on other columns (like serial-number), but year/month/day are not in the original data set. My approach has been to derive those values from the date column as new columns in the data set and tell the write_dynamic_frame function to partition by them, but that isn't working.

I'm new to Spark/PySpark and Glue in general, so there is a very real possibility that I'm missing something simple.

Thanks to anyone who offers assistance.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql import functions as F
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job


args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "my_database", table_name = "my_table", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("date", "date", "date", "date"), ("serial-number", "string", "serial-number", "string")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

to_spark_df4 = dropnullfields3.toDF()

with_file_name_df5 = to_spark_df4.withColumn("input_file_name", F.input_file_name()) \
    .withColumn('year', F.year(F.col("date").cast("date"))) \
    .withColumn('month', F.month(F.col("date").cast("date"))) \
    .withColumn('day', F.dayofmonth(F.col("date").cast("date")))

back_to_glue_df8 = DynamicFrame.fromDF(with_file_name_df5, glueContext, "back_to_glue_df8")


datasink4 = glueContext.write_dynamic_frame.from_options(
    frame = back_to_glue_df8,
    connection_type = "s3",
    connection_options = {
        "path": "s3://output/path",
        "partitionKeys": ["serial-number", "year", "month", "day"]
    },
    format = "parquet",
    transformation_ctx = "datasink4")
job.commit()

The result is that my keys in S3 end up looking like this:

serial-number=1234567890/year=__HIVE_DEFAULT_PARTITION__/month=__HIVE_DEFAULT_PARTITION__/day=__HIVE_DEFAULT_PARTITION__/part-01571-273027e4-72ba-45ff-ac15-c0bb2f342e58.c000.snappy.parquet


  • __HIVE_DEFAULT_PARTITION__ is written when the partition key has a NULL value. Was the date field populated? When you do .withColumn('year', F.year(F.col("date").cast("date"))), I notice you have an earlier ApplyMapping that maps the fields and types from date to date; I wonder if you need to do that after the DataFrame is converted back to a Glue DynamicFrame? Looking at those autogenerated ApplyMapping steps and reading the docs, apart from renaming columns or changing types I wonder what the point of them is. Perhaps it's a hack to provide Glue with field metadata. (See the quick null-check sketch after these comments.) Commented Aug 9, 2019 at 13:16
  • I read in the Athena docs that partition types must be primitive types; it doesn't enumerate them, but I assume that means the integers returned from those year, month, dayofmonth functions should be OK. I thought perhaps I had read that the types needed to be strings, but here's a valid example very close to yours in native PySpark: stackoverflow.com/a/41739138/1335793 And here's a comment about using ApplyMapping after your newly appended columns: github.com/aws-samples/aws-glue-samples/issues/… Commented Aug 9, 2019 at 13:28
  • Hi @Larry, were you able to resolve this problem? I am having the same problem. Could you please share your solution? Commented Jun 10, 2022 at 8:31
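
Following up on the first comment, here's a minimal diagnostic sketch (not part of the original post) for checking whether the date column ends up NULL after the mapping step. It reuses the applymapping1 frame from the question and could be dropped into the job temporarily:

# Quick null check on the mapped frame from the question (applymapping1).
# A non-zero NULL count here would explain the __HIVE_DEFAULT_PARTITION__ keys.
from pyspark.sql.functions import col

check_df = applymapping1.toDF()
total_rows = check_df.count()
null_dates = check_df.filter(col("date").isNull()).count()
print("rows: {}, rows with NULL date: {}".format(total_rows, null_dates))

# Peek at a few raw values to confirm the date format the crawler detected.
check_df.select("date").show(5, truncate=False)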

1 Answer


I run a job very similar to yours. I hope you've managed to solve it by now, but anyway, here's the solution to your predicament:

Basic solution:

from pyspark.sql.functions import year, month, dayofmonth

###### rest of your code until ApplyMapping included ######

# add year, month & day columns, non zero-padded
df = applymapping1.toDF()   # convert the DynamicFrame from your ApplyMapping step to a Spark DataFrame
df = df.withColumn('year', year(df.date))\
       .withColumn('month', month(df.date))\
       .withColumn('day', dayofmonth(df.date))
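
To finish the job, here's a rough sketch of the write-back step, reusing glueContext, the output path, and the partition keys from your question (partitioned_dyf is just an illustrative name, and the commented-out to_date guard with its 'yyyy-MM-dd' format is an assumption that only applies if your date column is actually still a string):

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import to_date

# Optional guard: if "date" is still a string, parse it with an explicit format.
# An unparseable format silently produces NULLs, which is exactly what shows up
# as __HIVE_DEFAULT_PARTITION__ in the S3 keys.
# df = df.withColumn('date', to_date(df.date, 'yyyy-MM-dd'))

# Back to a DynamicFrame and out to S3, partitioned by the new columns.
partitioned_dyf = DynamicFrame.fromDF(df, glueContext, "partitioned_dyf")
glueContext.write_dynamic_frame.from_options(
    frame = partitioned_dyf,
    connection_type = "s3",
    connection_options = {
        "path": "s3://output/path",
        "partitionKeys": ["serial-number", "year", "month", "day"]
    },
    format = "parquet")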

Additional note:

If you need to run queries on Athena where you want to select a range of dates, I would suggest you avoid nested partitioning (year -> month -> day) and instead use a flat partitioning scheme. The reason is that the query becomes much simpler to write. Here's the Python code to get the flat schema:

from pyspark.sql.functions import date_format

###### rest of your code until ApplyMapping included ######

df = applymapping1.toDF()   # convert the DynamicFrame from your ApplyMapping step to a Spark DataFrame
df = df.withColumn('date_2', date_format(df.date, 'yyyy-MM-dd'))

# date_2 is because column "date" already exists,
# but we want the partitioning one to be in a string format.
# You can later drop the original column if you wish.
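
The write step is then the same as before, just with the flat key (again only a sketch, reusing the names and output path from the question; keeping serial-number as the first partition key is optional):

from awsglue.dynamicframe import DynamicFrame

flat_dyf = DynamicFrame.fromDF(df, glueContext, "flat_dyf")
glueContext.write_dynamic_frame.from_options(
    frame = flat_dyf,
    connection_type = "s3",
    connection_options = {
        "path": "s3://output/path",
        "partitionKeys": ["serial-number", "date_2"]
    },
    format = "parquet")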

Let's say you now want to query your data from the 15th of March to the 3rd of April 2020.

Here are the SQL queries for each partitioning scheme.

Nested schema

SELECT item_1, item_2
  FROM my_table
 WHERE year = 2020
   AND (
          (month = 3 AND day >= 15)
       OR (month = 4 AND day <= 3)
       )

Flat schema

SELECT item_1, item_2
  FROM my_table
 WHERE date_2 BETWEEN '2020-03-15' AND '2020-04-03'

Also, since the date_2 partitioning column is stored as a string, you'll be able to run queries using the LIKE operator.

For example, if you want to query all data from each April in your database, you can do:

SELECT item_1, item_2
  FROM my_table
 WHERE date_2 LIKE '%-04-%'

