I have an Azure Durable Function that is triggered when users send SMS messages. I use it to collect multiple messages that arrive within a short window and combine them into one consolidated message.
The first message triggers the function, which calls the `start_new` method of the Durable Functions orchestration client; subsequent messages raise a `new-message` event instead.
The issue I'm facing is that if two messages arrive very close together, both of them start the orchestrator, because the second message arrives before the orchestration for the first message has been initialized. The Durable Functions runtime therefore starts a new orchestration for each message, and because both use the same predetermined orchestration instance ID, one message overwrites the other.
Is this a known issue, and how can I work around it?
Here's the code I have:
Queue Trigger (runs when a new message arrives on the message queue)
import json
import asyncio
import azure.functions as func
import azure.durable_functions as df
from azure.durable_functions.models.OrchestrationRuntimeStatus import OrchestrationRuntimeStatus
from lib.logtrail import get_logger
# Module-level logger for this queue-triggered client; get_logger comes from
# the project's lib.logtrail module.
logtrail = get_logger("on_new_message_fragment")
async def is_function_running(orchestrator, instance_id):
    """Return True if orchestration ``instance_id`` exists and is not stopped.

    Args:
        orchestrator: the ``df.DurableOrchestrationClient`` used to query status.
        instance_id: orchestration instance ID to look up.

    Returns:
        False when there is no usable status for the instance: either
        ``get_status`` returned None, or the returned status object has no
        ``runtime_status`` (the way the client can report an instance that was
        never created). Also False when the runtime status is Completed,
        Terminated, Failed, Canceled or Suspended. True for every other status
        (e.g. Pending, Running, ContinuedAsNew).
    """
    status = await orchestrator.get_status(instance_id)
    # A status object with runtime_status None means "no such instance"; the
    # previous code treated it as running and then raise_event failed with a
    # not-found error instead of a new orchestration being started.
    if status is None or status.runtime_status is None:
        return False
    # NOTE(review): Suspended counts as "not running", so the caller will call
    # start_new for a suspended instance rather than sending it an event --
    # confirm that is intended.
    stopped_states = (
        OrchestrationRuntimeStatus.Completed,
        OrchestrationRuntimeStatus.Terminated,
        OrchestrationRuntimeStatus.Failed,
        OrchestrationRuntimeStatus.Canceled,
        OrchestrationRuntimeStatus.Suspended,
    )
    return status.runtime_status not in stopped_states
async def main(message: func.QueueMessage, starter: str):
    """Route one queued SMS fragment to the per-phone collector orchestration.

    The queue message body must be a UTF-8 JSON object with a ``"phone"`` key.
    The orchestration instance ID is derived from that key, so every fragment
    from one phone number targets the same ``collect_messages`` instance. If
    that instance is running, the fragment is delivered as a ``"new-message"``
    event. Otherwise a new instance is started with the fragment as its input.

    The instance can change state between the status check and the
    start/raise call, so delivery is retried for up to ``max_attempts`` rounds:

    * ``start_new`` fails with "already exists": another invocation won the
      race, so the fragment is sent as an event instead.
    * ``raise_event`` fails because the instance is gone or not found: it
      finished (or never existed) after the check, so the next round
      re-checks and starts a new instance.

    Raises:
        Exception: any other client error is re-raised unchanged, so the queue
            trigger can retry the message.
        RuntimeError: if the fragment could not be delivered within
            ``max_attempts`` rounds.

    NOTE(review): the "already exists" fallback only runs if ``start_new``
    refuses to overwrite an existing instance. Depending on the host's Durable
    Functions settings, it may instead silently replace a running instance
    with the same ID, which loses the first of two near-simultaneous
    fragments. Confirm the overridable-existing-instance setting in host.json.
    """
    orchestrator = df.DurableOrchestrationClient(starter)
    payload = json.loads(message.get_body().decode("utf-8"))
    user_phone = payload["phone"]
    instance_id = f"message-collector:{user_phone}"

    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        # check if the orchestration is already running...
        if not await is_function_running(orchestrator, instance_id):
            # ...otherwise try to start a new one...
            logtrail.info(f"{instance_id} is NOT running; starting new with {payload}")
            try:
                await orchestrator.start_new(
                    "collect_messages",
                    instance_id=instance_id,
                    client_input=payload,
                )
                return
            except Exception as e:
                if "already exists" not in str(e).lower():
                    logtrail.error(f"Failed to start new orchestration: {e}")
                    raise
                # ...and if even that fails, fall through and send an event.
                logtrail.info(f"Race condition for {instance_id}; sending event instead")
        else:
            logtrail.info(f"{instance_id} is running; appending {payload}")

        # Reached when the instance looked running, or start_new lost the race.
        try:
            await orchestrator.raise_event(instance_id, "new-message", payload)
            return
        except Exception as e:
            # Text of the client's HTTP 410 ("is gone") / 404 ("No instance with
            # ID ... found") errors -- verify against the installed SDK version.
            error_text = str(e).lower()
            if "is gone" not in error_text and "no instance with id" not in error_text:
                logtrail.error(f"Failed to raise event for {instance_id}: {e}")
                raise
            logtrail.info(
                f"{instance_id} stopped before the event was delivered; "
                f"retrying (attempt {attempt}/{max_attempts})"
            )

    raise RuntimeError(
        f"Could not deliver message to {instance_id} after {max_attempts} attempts"
    )
The Orchestrator Function
import os
import json
import azure.durable_functions as df
from datetime import timedelta
from lib.logtrail import get_logger
# Module-level logger for the orchestrator; get_logger comes from the project's
# lib.logtrail module.
logtrail = get_logger("message_collector")
# buffer messages by waiting for new messages for
# MESSAGE_COLLECTOR_TIMEOUT_DURATION seconds (default 10), then send them
def collect_messages(context: df.DurableOrchestrationContext):
    """Orchestrator: merge SMS fragments that arrive close together, then send once.

    The orchestration input is the first fragment, a mapping (presumably the
    JSON payload the queue-trigger client passed as ``client_input``). The
    orchestrator then repeatedly waits for whichever comes first: a
    ``"new-message"`` external event, or a timer of
    ``MESSAGE_COLLECTOR_TIMEOUT_DURATION`` seconds (default 10). A new timer
    is created on every iteration, so the window slides: collection stops only
    after a full quiet period with no new fragment. Each fragment is merged
    into the running message by the ``append_message_fragment`` activity. The
    merged message is finally passed to the ``send_message_for_analysis``
    activity.

    NOTE(review): after the loop exits, this orchestration no longer waits for
    "new-message" events, but it presumably still reports a non-terminal
    status while send_message_for_analysis runs. An event raised in that
    window is never consumed here, so that fragment is likely lost. TODO
    confirm, and consider draining pending events after sending.

    NOTE(review): os.getenv is read inside the orchestrator body, so each
    replay re-reads it. If the variable changes while an instance is in
    flight, a replay could build a different timer. Consider passing the
    timeout in via the orchestration input instead.
    """
    user_message = context.get_input()
    # user_phone = user_message.get("phone")
    timeout_duration = int(os.getenv("MESSAGE_COLLECTOR_TIMEOUT_DURATION", 10)) # seconds
    # Shallow copy of the first fragment; this becomes the accumulated message.
    full_message = { **user_message }
    while True:
        message_task = context.wait_for_external_event("new-message")
        timeout_task = context.create_timer(context.current_utc_datetime + timedelta(seconds=timeout_duration))
        completed_task = yield context.task_any([message_task, timeout_task])
        if completed_task == timeout_task:
            # Quiet period elapsed with no new fragment: stop collecting.
            break
        # A fragment arrived first: cancel this round's timer; the next
        # iteration creates a fresh one, restarting the window.
        timeout_task.cancel()
        # NOTE(review): assumes the event result is a JSON string; confirm for the SDK version in use.
        new_message = json.loads(completed_task.result) # same schema as the original user_message
        full_message = yield context.call_activity("append_message_fragment", {
            "full_message": full_message,
            "message_fragment": new_message
        })
    yield context.call_activity("send_message_for_analysis", full_message)
# Wrap the generator with df.Orchestrator.create and expose it as `main`
# (presumably the entry point named in this function's configuration).
main = df.Orchestrator.create(collect_messages)