Code Snippet :

def write_alert_summary_df_docdb(alert_df,database,collection):
    try:
        
        update_df = DynamicFrame.fromDF(alert_df.select("*",lit("update").alias("status")), glueContext,"update_df")
        
        
        glueContext.write_dynamic_frame.from_options(
            frame = update_df,
            connection_type="mongodb",
            connection_options={
                "connectionName": glue_mongoconnection_name,
                "database": database,
                "collection": collection,
                "operationType": "update",
                "updateKey": "id",
                "upsertDocument": "false",
                "updateOperators": {
                    "$set": {
                        "alert_generated_time": "$$ROOT.alert_generated_time",
                        "status": "$$ROOT.status"
                    }
                }
            }
        )
        print (f"[UPDATE] Write Summary Data to {database}.{collection} \n")
        
        insert_df = DynamicFrame.fromDF(alert_df.select("*",lit("new").alias("status")), glueContext,"insert_df")
        
        glueContext.write_dynamic_frame.from_options(
            frame=insert_df,
            connection_type="mongodb",
            connection_options={
                "connectionName": glue_mongoconnection_name,
                "database": database,
                "collection": collection,
                "operationType": "update",
                "updateKey": "id",
                "upsertDocument": "true",
                "updateOperators": {
                    "$setOnInsert": {
                        "tenantid": "$$ROOT.tenantid",
                        "alert_generated_time": "$$ROOT.alert_generated_time",
                        "status": "$$ROOT.status"
                    }
                }
            }
        )
        
        print (f"[NEW] Write Summary Data to {database}.{collection} \n")
        
        return "done"
    except Exception as e:
        logger.error(f"Upsert failed: {str(e)}")
        raise

Note :

  1. alert_df has 2 records.
  2. The MongoDB collection is new and contains no documents.

Expected Behaviour :

Assume the MongoDB collection is empty. When update_df is written, no document matches on id and upsert is false, so this operation does nothing. Next comes insert_df, which is identical to update_df except for the value of the status column. Since the collection is empty, both records are inserted, leaving 2 documents in the collection with status = new.

Now let's take the case where the same code is rerun.

The collection now holds the same 2 documents with status = new.

When update_df is written this time, each record matches an existing document on id, so the columns listed under $set are updated. When insert_df is written next, each record again matches on id, so $setOnInsert changes nothing and no document is inserted.

Hence, after the 2nd run, there should still be 2 documents, both with status = update.
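To make the expectation concrete, here is a minimal sketch that simulates the two write passes against a plain Python dict instead of a real collection. It only models the semantics I am assuming for $set without upsert and $setOnInsert with upsert; it does not call Glue or MongoDB. The helper names (apply_update, run_job) and the sample records are made up for illustration.

```python
from typing import Dict, Iterable, List

Doc = Dict[str, str]


def apply_update(store: Dict[str, Doc], records: Iterable[Doc], operator: str,
                 fields: List[str], upsert: bool) -> None:
    """Apply one update pass keyed on "id" to an in-memory stand-in for a collection."""
    for record in records:
        key = record["id"]
        changes = {name: record[name] for name in fields}
        if key in store:
            # $set modifies a matched document; $setOnInsert leaves it untouched.
            if operator == "$set":
                store[key].update(changes)
        elif upsert:
            # No match and upsert enabled: create the document from the key plus fields.
            store[key] = {"id": key, **changes}
        # No match and upsert disabled: nothing happens.


def run_job(store: Dict[str, Doc], alerts: List[Doc]) -> None:
    """Mirror the snippet: an update pass without upsert, then an insert-if-missing pass."""
    updates = [{**alert, "status": "update"} for alert in alerts]
    inserts = [{**alert, "status": "new"} for alert in alerts]
    apply_update(store, updates, "$set",
                 ["alert_generated_time", "status"], upsert=False)
    apply_update(store, inserts, "$setOnInsert",
                 ["tenantid", "alert_generated_time", "status"], upsert=True)


# Hypothetical sample data standing in for the 2 records in alert_df.
alerts = [
    {"id": "a1", "tenantid": "t1", "alert_generated_time": "2024-01-01T00:00:00Z"},
    {"id": "a2", "tenantid": "t1", "alert_generated_time": "2024-01-01T00:05:00Z"},
]

collection: Dict[str, Doc] = {}

run_job(collection, alerts)
first_run = sorted(doc["status"] for doc in collection.values())

run_job(collection, alerts)
second_run = sorted(doc["status"] for doc in collection.values())

print(first_run)   # ['new', 'new']
print(second_run)  # ['update', 'update']
```

In this model the document count stays at 2 across both runs, which is the behaviour I expected from the real write.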

Current Behaviour : After the first run, i.e. with the collection initially empty, there are 4 documents in the collection: 2 with status = new and 2 with status = update. Rerunning the snippet adds 4 more documents, for a total of 8.

So it looks as though the Dynamic Frame writer ignores the upsert parameters I provide and simply inserts every record.

How/Why did I use the aforementioned parameters?

Ans - I could not find any documentation on using Dynamic Frames (AWS Glue) to UPSERT/AGGREGATE/FILTER against MongoDB. However, with the help of this documentation - https://www.mongodb.com/docs/spark-connector/master/batch-mode/batch-read-config/#overview

I tweaked my filtering code and it worked.

Filtering Code :

documents_dynamic_frame = glueContext.create_dynamic_frame.from_options(
            connection_type="mongodb",
            connection_options={
                "connectionName": glue_mongoconnection_name,
                "database": db_name,
                "collection": coll_name,
                "aggregation.pipeline": f"""[
                    {{
                        "$match": {{
                            "alerttype": "{alert_type}",
                            "alertkey": "{alert_key}",
                            "status": "Enabled"
                        }}
                    }}
                ]"""
            }
        )
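As a side note, the pipeline string can also be built with json.dumps instead of an f-string with escaped braces, which avoids broken JSON if alert_type or alert_key ever contains a quote. This is only a sketch: build_match_pipeline is a hypothetical helper name, and the sample arguments are made up.

```python
import json


def build_match_pipeline(alert_type: str, alert_key: str, status: str = "Enabled") -> str:
    """Return the $match pipeline as a JSON string for the "aggregation.pipeline" option."""
    pipeline = [
        {
            "$match": {
                "alerttype": alert_type,
                "alertkey": alert_key,
                "status": status,
            }
        }
    ]
    # json.dumps handles quoting and escaping of the values.
    return json.dumps(pipeline)


# Hypothetical values; the second one contains quotes on purpose.
pipeline_json = build_match_pipeline("cpu", 'disk "C" usage')
print(pipeline_json)
```

The result would then be passed as "aggregation.pipeline": build_match_pipeline(alert_type, alert_key) in the connection_options shown above.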

Assuming that documentation also applies to writes, and drawing on my prior experience of writing Spark DataFrames in upsert mode, I wrote the UPSERT code above. But that code doesn't work.

How can I UPSERT into a MongoDB collection using Dynamic Frames, or is there a better way of doing it?
