Code Snippet:
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import lit

# glueContext, logger, glue_mongoconnection_name and
# mongodb_alert_summary_collection are defined earlier in the Glue job.
def write_alert_summary_df_docdb(alert_df, database, collection):
    try:
        # Pass 1: update documents that already exist (no upsert).
        update_df = DynamicFrame.fromDF(
            alert_df.select("*", lit("update").alias("status")), glueContext, "update_df"
        )
        glueContext.write_dynamic_frame.from_options(
            frame=update_df,
            connection_type="mongodb",
            connection_options={
                "connectionName": glue_mongoconnection_name,
                "database": database,
                "collection": collection,
                "operationType": "update",
                "updateKey": "id",
                "upsertDocument": "false",
                "updateOperators": {
                    "$set": {
                        "alert_generated_time": "$$ROOT.alert_generated_time",
                        "status": "$$ROOT.status"
                    }
                }
            }
        )
        print(f"[UPDATE] Write Summary Data to {database}.{collection} \n")

        # Pass 2: insert documents that do not exist yet (upsert with $setOnInsert).
        insert_df = DynamicFrame.fromDF(
            alert_df.select("*", lit("new").alias("status")), glueContext, "insert_df"
        )
        glueContext.write_dynamic_frame.from_options(
            frame=insert_df,
            connection_type="mongodb",
            connection_options={
                "connectionName": glue_mongoconnection_name,
                "database": database,
                "collection": mongodb_alert_summary_collection,
                "operationType": "update",
                "updateKey": "id",
                "upsertDocument": "true",
                "updateOperators": {
                    "$setOnInsert": {
                        "tenantid": "$$ROOT.tenantid",
                        "alert_generated_time": "$$ROOT.alert_generated_time",
                        "status": "$$ROOT.status"
                    }
                }
            }
        )
        print(f"[NEW] Write Summary Data to {database}.{collection} \n")
        return "done"
    except Exception as e:
        logger.error(f"Upsert failed: {str(e)}")
        raise
Note:
- alert_df has 2 records.
- The MongoDB collection is new and has no documents in it.
Expected Behaviour:
Let's assume the MongoDB collection is empty. When update_df is written, no document matches on id and upsert is false, so this operation does nothing. Next comes insert_df, which is the same as update_df except for the value of the status column. Since the collection holds no data, both records are inserted, leaving 2 records in the MongoDB collection.
Now let's take the case where the same code is rerun.
The collection now holds the same 2 records with status = new.
This time, when update_df is written, it gets a match on id and updates the listed fields. When insert_df is then written, it also gets a match on id, and because that write only uses $setOnInsert, it changes nothing.
So after the 2nd run, there should be 2 records with status = update.
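To make the expectation above concrete, here is a small in-memory model of the semantics I am assuming (update without upsert using $set, then upsert using $setOnInsert). It does not touch Glue or MongoDB; apply_update, run_job and the sample rows are made up for illustration.

```python
def apply_update(collection, record, operators, upsert):
    """Apply one update keyed on "id" to an in-memory collection (a dict)."""
    key = record["id"]
    if key in collection:
        # Match found: only $set is applied; $setOnInsert is ignored.
        collection[key].update(operators.get("$set", {}))
    elif upsert:
        # No match and upsert enabled: insert with $set and $setOnInsert fields.
        doc = {"id": key}
        doc.update(operators.get("$set", {}))
        doc.update(operators.get("$setOnInsert", {}))
        collection[key] = doc
    # No match and upsert disabled: nothing happens.


def run_job(collection, alerts):
    """Mirror the two write passes of the snippet."""
    for alert in alerts:  # pass 1: update only
        ops = {"$set": {"alert_generated_time": alert["alert_generated_time"],
                        "status": "update"}}
        apply_update(collection, alert, ops, upsert=False)
    for alert in alerts:  # pass 2: insert if missing
        ops = {"$setOnInsert": {"tenantid": alert["tenantid"],
                                "alert_generated_time": alert["alert_generated_time"],
                                "status": "new"}}
        apply_update(collection, alert, ops, upsert=True)


# Hypothetical sample rows standing in for alert_df.
alerts = [
    {"id": 1, "tenantid": "t1", "alert_generated_time": "2024-01-01T00:00:00"},
    {"id": 2, "tenantid": "t1", "alert_generated_time": "2024-01-01T00:05:00"},
]
collection = {}
run_job(collection, alerts)
first_run = sorted(doc["status"] for doc in collection.values())
run_job(collection, alerts)
second_run = sorted(doc["status"] for doc in collection.values())
print(first_run, second_run)
```

Under this model the collection stays at 2 documents, with status new after the first run and status update after the second.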
Current Behaviour: After the first run, i.e. when the collection is empty, there are 4 records in the collection, 2 each with status new and update. Rerunning the code snippet adds 4 more records, for a total of 8.
So there seems to be some issue with Dynamic Frames, as the write doesn't respond to the parameters provided for upsert.
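For what it's worth, the counts I see match what a writer would produce if it ignored the update options and simply inserted every row. This is only a model that reproduces the numbers, not a confirmed explanation; run_job_append_only and the placeholder rows are hypothetical.

```python
def run_job_append_only(collection, alerts):
    """Model a writer that ignores the update options and inserts every row."""
    for status in ("update", "new"):  # the two write passes in the snippet
        for alert in alerts:
            collection.append({**alert, "status": status})


alerts = [{"id": 1}, {"id": 2}]  # placeholder rows standing in for alert_df
collection = []
run_job_append_only(collection, alerts)
count_after_first = len(collection)
run_job_append_only(collection, alerts)
count_after_second = len(collection)
print(count_after_first, count_after_second)
```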
How/why did I use the aforementioned parameters?
Ans - I could not find any documentation on using Dynamic Frames (AWS Glue) to UPSERT/AGGREGATE/FILTER against MongoDB. However, with the help of this documentation - https://www.mongodb.com/docs/spark-connector/master/batch-mode/batch-read-config/#overview
I tweaked my filtering code and it worked.
Filtering Code:
documents_dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="mongodb",
    connection_options={
        "connectionName": glue_mongoconnection_name,
        "database": db_name,
        "collection": coll_name,
        "aggregation.pipeline": f"""[
            {{
                "$match": {{
                    "alerttype": "{alert_type}",
                    "alertkey": "{alert_key}",
                    "status": "Enabled"
                }}
            }}
        ]"""
    }
)
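As a side note on the filtering code, the pipeline string could also be built with json.dumps rather than an f-string with doubled braces, which avoids broken JSON if a value contains a quote. A minimal sketch; build_match_pipeline is a hypothetical helper name and the sample values are made up.

```python
import json


def build_match_pipeline(alert_type, alert_key, status="Enabled"):
    """Return the $match aggregation pipeline as a JSON string."""
    pipeline = [
        {
            "$match": {
                "alerttype": alert_type,
                "alertkey": alert_key,
                "status": status,
            }
        }
    ]
    # json.dumps escapes quotes and other special characters in the values.
    return json.dumps(pipeline)


# Made-up values; the second one contains a quote on purpose.
pipeline_json = build_match_pipeline("cpu", 'key"with-quote')
print(pipeline_json)
```

The resulting string would then be passed as the value of "aggregation.pipeline" in connection_options.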
Assuming this documentation holds for Glue Dynamic Frames as well, and drawing on my prior knowledge of writing Spark DataFrames in UPSERT mode, I wrote the UPSERT code above. But that code doesn't work.
My ask is for a way to UPSERT into a MongoDB collection using Dynamic Frames, or a better way of doing it if there is one.
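One fallback I am considering, in case Dynamic Frames cannot do this, is to bypass the Glue writer and issue the updates through a MongoDB driver. The sketch below only builds the per-row update specs as plain dicts, so it runs without any driver installed; build_upsert_specs and the sample rows are hypothetical, and I have not verified this approach against Glue or DocumentDB.

```python
def build_upsert_specs(rows, key="id"):
    """Build, per row, an update-only spec followed by an insert-if-missing spec."""
    specs = []
    for row in rows:
        match = {key: row[key]}
        # Existing document: refresh the timestamp and mark it as updated.
        specs.append({
            "filter": match,
            "update": {"$set": {"alert_generated_time": row["alert_generated_time"],
                                "status": "update"}},
            "upsert": False,
        })
        # Missing document: insert it with status "new"; no-op if it exists.
        specs.append({
            "filter": match,
            "update": {"$setOnInsert": {"tenantid": row["tenantid"],
                                        "alert_generated_time": row["alert_generated_time"],
                                        "status": "new"}},
            "upsert": True,
        })
    return specs


# Made-up rows standing in for the contents of alert_df.
rows = [
    {"id": 1, "tenantid": "t1", "alert_generated_time": "2024-01-01T00:00:00"},
    {"id": 2, "tenantid": "t1", "alert_generated_time": "2024-01-01T00:05:00"},
]
specs = build_upsert_specs(rows)
print(len(specs))
```

With pymongo, each spec could presumably be turned into UpdateOne(spec["filter"], spec["update"], upsert=spec["upsert"]) and sent in order with Collection.bulk_write, for example from DataFrame.foreachPartition, but I would still prefer a Dynamic Frames solution if one exists.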