I have an Azure Durable Function that is triggered when users send SMS messages. I'm using the function to collect multiple messages if they arrive within a short duration, and create one consolidated message.

The first message triggers the function and calls the start_new method of the durable function orchestration client; subsequent messages raise a new-message event.

The issue I'm facing is that if two messages arrive very close together, both of them start the orchestrator, because the second message arrives before the orchestration for the first message has been initialized. The Durable Functions runtime then starts a new orchestration for each message, and since both use the same predetermined instance ID, one of the messages overwrites the other.
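
To make the timing concrete, here is a toy simulation of the check-then-start race using plain asyncio. It is only a sketch: `FakeClient`, `handle`, and `simulate` are hypothetical names, and the in-memory client is an assumption standing in for the real orchestration client, not the Durable Functions API.

```python
import asyncio


class FakeClient:
    """Toy in-memory stand-in for an orchestration client (hypothetical, not the real SDK)."""

    def __init__(self):
        self.instances = {}   # instance_id -> most recent input
        self.start_calls = 0

    async def get_status(self, instance_id):
        await asyncio.sleep(0)  # yield control, as a real network call would
        return "Running" if instance_id in self.instances else None

    async def start_new(self, instance_id, client_input):
        await asyncio.sleep(0)  # yield control before the instance is recorded
        self.start_calls += 1
        self.instances[instance_id] = client_input  # a second start overwrites the first


async def handle(client, instance_id, payload):
    # Check-then-act: nothing makes these two steps atomic.
    if await client.get_status(instance_id) is None:
        await client.start_new(instance_id, payload)
        return "started"
    return "event"


async def simulate():
    client = FakeClient()
    # Two handlers for the same phone number run concurrently.
    results = await asyncio.gather(
        handle(client, "message-collector:555", {"text": "first"}),
        handle(client, "message-collector:555", {"text": "second"}),
    )
    return results, client.start_calls
```

In this toy model both handlers see "no status" before either start has been recorded, so both take the start path, which is the behaviour I am seeing.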

Is this a known issue and how can I get around it?

Here's the code I have:

Queue Trigger (runs when a new message arrives on a message queue)


import json
import asyncio

import azure.functions as func

import azure.durable_functions as df
from azure.durable_functions.models.OrchestrationRuntimeStatus import OrchestrationRuntimeStatus

from lib.logtrail import get_logger
logtrail = get_logger("on_new_message_fragment")


async def is_function_running(orchestrator, instance_id):
    status = await orchestrator.get_status(instance_id)

    if status is None:
        return False

    return status.runtime_status not in [
        OrchestrationRuntimeStatus.Completed,
        OrchestrationRuntimeStatus.Terminated,
        OrchestrationRuntimeStatus.Failed,
        OrchestrationRuntimeStatus.Canceled,
        OrchestrationRuntimeStatus.Suspended,
    ]


async def main(message: func.QueueMessage, starter: str):
    orchestrator = df.DurableOrchestrationClient(starter)

    payload = json.loads(message.get_body().decode("utf-8"))

    user_phone = payload["phone"]

    instance_id = f"message-collector:{user_phone}"

    # check if the orchestration is already running...
    if await is_function_running(orchestrator, instance_id):
        logtrail.info(f"{instance_id} is running; appending {payload}")

        await orchestrator.raise_event(instance_id, "new-message", payload)

    # ...otherwise try to start a new one...
    else:
        try:
            logtrail.info(f"{instance_id} is NOT running; starting new with {payload}")

            await orchestrator.start_new(
                "collect_messages",
                instance_id=instance_id,
                client_input=payload
            )

        # ...and if even that fails, try sending an event again
        except Exception as e:
            if "already exists" in str(e).lower():
                logtrail.info(f"Race condition for {instance_id}; sending event instead")

                await orchestrator.raise_event(instance_id, "new-message", payload)

            else:
                logtrail.error(f"Failed to start new orchestration: {e}")

                raise

The Orchestrator Function

import os
import json

import azure.durable_functions as df

from datetime import timedelta

from lib.logtrail import get_logger
logtrail = get_logger("message_collector")


# buffer messages by waiting for new messages for 10 seconds,
# then send them
def collect_messages(context: df.DurableOrchestrationContext):
    user_message = context.get_input()

    # user_phone = user_message.get("phone")

    timeout_duration = int(os.getenv("MESSAGE_COLLECTOR_TIMEOUT_DURATION", 10)) # seconds

    full_message = { **user_message }

    while True:
        message_task = context.wait_for_external_event("new-message")
        timeout_task = context.create_timer(context.current_utc_datetime + timedelta(seconds=timeout_duration))

        completed_task = yield context.task_any([message_task, timeout_task])

        if completed_task == timeout_task:
            break

        timeout_task.cancel()

        new_message = json.loads(completed_task.result) # same schema as the original user_message

        full_message = yield context.call_activity("append_message_fragment", {
            "full_message": full_message,
            "message_fragment": new_message
        })

    yield context.call_activity("send_message_for_analysis", full_message)


main = df.Orchestrator.create(collect_messages)
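
For reference, the buffering behaviour I expect from the orchestrator can be sketched without Durable Functions at all. This is a minimal illustration in plain asyncio, assuming fragments arrive on an in-memory queue; `collect` and `demo` are hypothetical names, and fragments are joined as plain strings rather than the dict payloads used above.

```python
import asyncio


async def collect(queue, first, timeout):
    """Gather fragments until `timeout` seconds pass without a new one."""
    fragments = [first]
    while True:
        try:
            # The wait restarts for every fragment, like the timer
            # that the orchestrator recreates on each loop iteration.
            fragment = await asyncio.wait_for(queue.get(), timeout)
        except asyncio.TimeoutError:
            break  # quiet period elapsed; stop collecting
        fragments.append(fragment)
    return " ".join(fragments)


async def demo():
    queue = asyncio.Queue()
    queue.put_nowait("world")
    queue.put_nowait("again")
    return await collect(queue, "hello", timeout=0.05)
```

Running `asyncio.run(demo())` returns one consolidated string built from the first message and the two queued fragments.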
  • I created a diagram to help me understand the code. Here is a diagram for the queue trigger. Here is a diagram for the orchestration. (Commented Jul 3 at 15:51)
