20

I have an S3 bucket that is constantly being filled with new data, and I am using Athena and Glue to query that data. The problem is that if Glue doesn't know a new partition has been created, it doesn't know it needs to search there. Making an API call to run the Glue crawler every time I need a new partition is too expensive, so the best solution would be to tell Glue directly that a new partition has been added, i.e. to register the new partition in its catalog table. I looked through the AWS documentation but had no luck. I am using Java with AWS. Any help?

5
  • Too expensive in terms of computation or money? Commented Jun 1, 2018 at 10:00
  • Money-wise. It's not a difficult operation, so it wouldn't use that much CPU. Commented Jun 1, 2018 at 13:13
  • Then, if you know when new partitions are added, try option #3 from my answer. Commented Jun 2, 2018 at 8:05
  • @Gudzo - Can you accept my answer if it helped? Commented Feb 26, 2020 at 5:51
  • @Gudzo Hello? Checking in if you can accept my solution Commented Jun 9, 2020 at 22:52

4 Answers

37

You may want to use the batch_create_partition() Glue API to register new partitions. It doesn't require any expensive operation like MSCK REPAIR TABLE or re-crawling.

I had a similar use case, for which I wrote a Python script that does the following:

Step 1 - Fetch the table information and parse the necessary information from it which is required to register the partitions.

import sys
import logging

import boto3

# Assumed setup (not shown in the original snippet): a logger, the Glue client,
# and the l_catalog_id / l_database / l_table identifiers used below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
l_client = boto3.client('glue')

# Fetching table information from glue catalog
logger.info("Fetching table info for {}.{}".format(l_database, l_table))
try:
    response = l_client.get_table(
        CatalogId=l_catalog_id,
        DatabaseName=l_database,
        Name=l_table
    )
except Exception as error:
    logger.error("Exception while fetching table info for {}.{} - {}"
                 .format(l_database, l_table, error))
    sys.exit(-1)

# Parsing table info required to create partitions from table
input_format = response['Table']['StorageDescriptor']['InputFormat']
output_format = response['Table']['StorageDescriptor']['OutputFormat']
table_location = response['Table']['StorageDescriptor']['Location']
serde_info = response['Table']['StorageDescriptor']['SerdeInfo']
partition_keys = response['Table']['PartitionKeys']

Step 2 - Generate a list of dictionaries, where each dictionary contains the information needed to create a single partition. All dictionaries have the same structure, but their partition-specific values (year, month, day, hour) change.

from datetime import datetime, timedelta

def generate_partition_input_list(start_date, num_of_days, table_location,
                                  input_format, output_format, serde_info):
    input_list = []  # Initializing empty list
    today = datetime.utcnow().date()
    if start_date > today:  # To handle scenarios if any future partitions are created manually
        start_date = today
    end_date = today + timedelta(days=num_of_days)  # Getting end date till which partitions needs to be created
    logger.info("Partitions to be created from {} to {}".format(start_date, end_date))

    for input_date in date_range(start_date, end_date):
        # Formatting partition values by padding required zeroes and converting into string
        year = str(input_date)[0:4].zfill(4)
        month = str(input_date)[5:7].zfill(2)
        day = str(input_date)[8:10].zfill(2)
        for hour in range(24):  # Looping over 24 hours to generate partition input for 24 hours for a day
            hour = str('{:02d}'.format(hour))  # Padding zero to make sure that hour is in two digits
            part_location = "{}{}/{}/{}/{}/".format(table_location, year, month, day, hour)
            input_dict = {
                'Values': [
                    year, month, day, hour
                ],
                'StorageDescriptor': {
                    'Location': part_location,
                    'InputFormat': input_format,
                    'OutputFormat': output_format,
                    'SerdeInfo': serde_info
                }
            }
            input_list.append(input_dict.copy())
    return input_list
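
The date_range() helper used above isn't included in the answer; a minimal sketch of what it presumably does (yielding every date from start_date up to, but not including, end_date) could be:

# Hypothetical helper - not part of the original answer
from datetime import timedelta

def date_range(start_date, end_date):
    # Yield each date from start_date up to (but not including) end_date
    for offset in range((end_date - start_date).days):
        yield start_date + timedelta(days=offset)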

Step 3 - Call the batch_create_partition() API

for each_input in break_list_into_chunks(partition_input_list, 100):
    create_partition_response = client.batch_create_partition(
        CatalogId=catalog_id,
        DatabaseName=l_database,
        TableName=l_table,
        PartitionInputList=each_input
    )

There is a limit of 100 partitions per batch_create_partition() call, so if you are creating more than 100 partitions you will need to break your list into chunks and iterate over them.
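
The break_list_into_chunks() helper isn't shown in the answer either; a simple sketch that yields slices of at most chunk_size items would be:

# Hypothetical helper - not part of the original answer
def break_list_into_chunks(input_list, chunk_size):
    # Yield successive slices of at most chunk_size elements
    for i in range(0, len(input_list), chunk_size):
        yield input_list[i:i + chunk_size]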

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.batch_create_partition


1 Comment

Hey @conetfun, I tried to find the limit on the number of partitions in a single API call. Is there any official documentation where it has been mentioned?

12
  1. You can configure your Glue crawler to be triggered every 5 minutes (see the boto3 sketch after this list).

  2. You can create a Lambda function which will either run on a schedule, or be triggered by an event from your bucket (e.g. a putObject event), and that function could call Athena to discover partitions:

     import boto3
    
     athena = boto3.client('athena')
    
     def lambda_handler(event, context):
         athena.start_query_execution(
             QueryString = "MSCK REPAIR TABLE mytable",
             ResultConfiguration = {
                 'OutputLocation': "s3://some-bucket/_athena_results"
             }
         )
  3. Use Athena to add partitions manually. You can also run SQL queries via the API, as in my Lambda example above.

    Example from Athena manual:

     ALTER TABLE orders ADD
       PARTITION (dt = '2016-05-14', country = 'IN') LOCATION 's3://mystorage/path/to/INDIA_14_May_2016'
       PARTITION (dt = '2016-05-15', country = 'IN') LOCATION 's3://mystorage/path/to/INDIA_15_May_2016';
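
For option 1, here is a minimal boto3 sketch (not from the original answer) of putting an existing crawler on a 5-minute schedule; 'my-crawler' is a placeholder name:

import boto3

glue = boto3.client('glue')

# Schedule an existing crawler to run every 5 minutes (Glue cron expression, UTC).
# 'my-crawler' is a placeholder - replace it with your crawler's name.
glue.update_crawler(
    Name='my-crawler',
    Schedule='cron(0/5 * * * ? *)'
)

# Or trigger a one-off run on demand instead of using a schedule.
glue.start_crawler(Name='my-crawler')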
    

2 Comments

"You can configure you're glue catalog to get triggered every 5 mins" ... what does that mean please ?
Oh, sorry - I meant to write "You can configure you're glue crawler to get triggered every 5 mins". I will update my answer.

6

This question is old but I wanted to put it out there that someone could have s3:ObjectCreated:Put notifications trigger a Lambda function which registers new partitions when data arrives on S3. I would even expand this function to handle deprecations based on object deletes and so on. Here's a blog post by AWS which details S3 event notifications: https://aws.amazon.com/blogs/aws/s3-event-notification/
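
As a rough illustration (not from the original answer), a Lambda handler along these lines could register a partition from the S3 object key, assuming the key layout matches the table's year/month/day/hour partition keys; the database and table names are placeholders:

import boto3

glue = boto3.client('glue')

# Placeholder identifiers - replace with your own catalog database and table
DATABASE = 'my_database'
TABLE = 'my_table'

def lambda_handler(event, context):
    # Assumes keys laid out as <prefix>/<year>/<month>/<day>/<hour>/<file>,
    # matching partition keys (year, month, day, hour) on the Glue table
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        year, month, day, hour = key.split('/')[-5:-1]

        # Reuse the table's storage descriptor, re-pointed at the partition prefix
        table = glue.get_table(DatabaseName=DATABASE, Name=TABLE)['Table']
        sd = dict(table['StorageDescriptor'])
        sd['Location'] = "s3://{}/{}/".format(bucket, key.rsplit('/', 1)[0])

        try:
            glue.create_partition(
                DatabaseName=DATABASE,
                TableName=TABLE,
                PartitionInput={'Values': [year, month, day, hour],
                                'StorageDescriptor': sd}
            )
        except glue.exceptions.AlreadyExistsException:
            pass  # Already registered by an earlier object in the same prefix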


4

AWS Glue recently added a RecrawlPolicy that only crawls the new folders/partitions that you add to your S3 bucket.

https://docs.aws.amazon.com/glue/latest/dg/incremental-crawls.html

This should help you minimize crawling all the data again and again. From what I read, you can enable incremental crawls while setting up your crawler, or when editing an existing one. One thing to note, however, is that incremental crawls require the schema of the new data to be more or less the same as the existing schema.
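
As a rough sketch (assuming an existing crawler named 'my-crawler', a placeholder), switching it to incremental crawls with boto3 could look like this:

import boto3

glue = boto3.client('glue')

# Only crawl folders added since the last crawl; 'my-crawler' is a placeholder name
glue.update_crawler(
    Name='my-crawler',
    RecrawlPolicy={'RecrawlBehavior': 'CRAWL_NEW_FOLDERS_ONLY'}
)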

2 Comments

That might work for some very simple schemas ... but just forget Crawlers for more complex schemas and go for the strategies listed above.
@MáriodeSáVera true, for complex structures the crawler can't infer the schema correctly, i.e. if a CSV file has field values like "01" then the crawler infers it as 1. In such cases crawler -> recrawl doesn't make any sense.
