I have CSV data that is crawled via a Glue crawler and ends up in one table.
I'm trying to run an ETL job that re-partitions the data on S3 by components of the date column and converts the CSV to Parquet.
i.e. I have a column named "date" in my data, and I want to partition the data into year, month, and day partitions on S3.
I am able to convert to Parquet and get it to partition correctly on the serial-number value (a different column), but it puts the value "__HIVE_DEFAULT_PARTITION__" in for year, month, and day in the date-related partitions.
I am able to partition on other columns (like serial-number), but year/month/day are not in the original data set, so my approach has been to derive them from the date column as new columns and tell write_dynamic_frame to partition by those columns. That isn't working.
I'm new to Spark/PySpark and Glue in general, so there is a very real possibility that I'm missing something simple.
Thanks to anyone who offers assistance.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql import functions as F
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "my_database", table_name = "my_table", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("date", "date", "date", "date"), ("serial-number", "string", "serial-number", "string")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
to_spark_df4 = dropnullfields3.toDF()
with_file_name_df5 = (to_spark_df4
    .withColumn("input_file_name", F.input_file_name())
    .withColumn("year", F.year(F.col("date").cast("date")))
    .withColumn("month", F.month(F.col("date").cast("date")))
    .withColumn("day", F.dayofmonth(F.col("date").cast("date"))))
back_to_glue_df8 = DynamicFrame.fromDF(with_file_name_df5, glueContext, "back_to_glue_df8")
datasink4 = glueContext.write_dynamic_frame.from_options(
    frame = back_to_glue_df8,
    connection_type = "s3",
    connection_options = {"path": "s3://output/path", "partitionKeys": ["serial-number", "year", "month", "day"]},
    format = "parquet",
    transformation_ctx = "datasink4")
job.commit()
The result is that my keys in S3 end up looking like this:
serial-number=1234567890/year=__HIVE_DEFAULT_PARTITION__/month=__HIVE_DEFAULT_PARTITION__/day=__HIVE_DEFAULT_PARTITION__/part-01571-273027e4-72ba-45ff-ac15-c0bb2f342e58.c000.snappy.parquet
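To narrow down where the NULLs come from, a quick check like this could be run just before the write step (a minimal sketch reusing the dataframes from the job above; nothing here is Glue-specific):

# Sketch: inspect the raw date values and count NULLs in the derived partition columns.
to_spark_df4.select("date").show(5, truncate=False)
null_counts = with_file_name_df5.select(
    F.count(F.when(F.col("date").isNull(), 1)).alias("null_date"),
    F.count(F.when(F.col("year").isNull(), 1)).alias("null_year"),
    F.count(F.when(F.col("month").isNull(), 1)).alias("null_month"),
    F.count(F.when(F.col("day").isNull(), 1)).alias("null_day"))
null_counts.show()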
`__HIVE_DEFAULT_PARTITION__` is created if the partitionKey has a NULL value. Was the `date` field populated? When you do this: `.withColumn('year', F.year(F.col("date").cast("date")))` I notice you have a previous `ApplyMapping` that maps the fields and types from `date` to date; I wonder if you need to do that _after_ the dataframe is converted back to a Glue DynamicFrame? I look at those `ApplyMapping` autogenerated steps and read the docs, and apart from renaming columns or changing types I wonder what the point of it is? Perhaps it's a hack to provide Glue with field metadata. The `year`, `month`, `dayofmonth` functions should be OK. I thought perhaps I had read that the types needed to be strings, but here's a valid example very close to yours in native PySpark: stackoverflow.com/a/41739138/1335793 Here's a comment about using ApplyMapping after your newly appended columns: github.com/aws-samples/aws-glue-samples/issues/…
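One possibility worth checking (this is an assumption on my part, not something confirmed in the question): if the crawler catalogued the CSV `date` column as a string, then the `("date", "date", "date", "date")` mapping, or the later `.cast("date")` on an unexpected format, can silently produce NULLs, which then show up as `__HIVE_DEFAULT_PARTITION__`. Here is a sketch that maps from the string type and parses the format explicitly, using the names from the question and an assumed `yyyy-MM-dd` format:

# Sketch only: assumes the catalog type of "date" is string and the values
# look like yyyy-MM-dd; adjust the format string to match the actual data.
applymapping1 = ApplyMapping.apply(
    frame = datasource0,
    mappings = [("date", "string", "date", "string"),
                ("serial-number", "string", "serial-number", "string")],
    transformation_ctx = "applymapping1")

df = applymapping1.toDF()
df = (df
    .withColumn("date", F.to_date(F.col("date"), "yyyy-MM-dd"))
    .withColumn("year", F.year(F.col("date")))
    .withColumn("month", F.month(F.col("date")))
    .withColumn("day", F.dayofmonth(F.col("date"))))
# df.filter(F.col("date").isNull()).count() should be 0 before converting back
# to a DynamicFrame and writing with partitionKeys.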