0

I'm trying to move ServiceBus triggered message to deadletter queue inside once of the Azure durable function activity.

[FunctionName(nameof(CaviContainerStart))]
public async Task CaviContainerStart(
[ServiceBusTrigger(
    topicName: "cavitopic",
    subscriptionName: "monitor",
    Connection = "ServiceBusConnection"
)]
    ServiceBusReceivedMessage message,
[DurableClient] IDurableOrchestrationClient starter){


string messageJson = SerializeMessage(message);

// Function input comes from the request content.
string instanceId = await starter.StartNewAsync(
    nameof(CaviContainerOrchestrator), null,
    messageJson
);

_logger.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId);

}

// Convert to JSON

public string SerializeMessage(ServiceBusReceivedMessage message)
{
    var messageInfo = new ServiceBusMessageInput
    {
        MessageId = message.MessageId,
        Body = message.Body.ToString, // Convert body to Base64 string
        EnqueuedTime = message.EnqueuedTime,
        ApplicationProperties = message.ApplicationProperties.ToDictionary(
    kvp => kvp.Key,
    kvp => (object)kvp.Value // Convert value to object for serialization
),
        SequenceNumber = message.SequenceNumber
    }; ;
    return JsonConvert.SerializeObject(messageInfo);
}

Inside the orchestrator need to deserialize it to ServiceBusReceivedMessage. I'm getting empty ServiceBusReceivedMessage object. Is it a correct way to deserialize it?

[FunctionName(nameof(CaviContainerOrchestrator))]
public async Task CaviContainerOrchestrator(
    [OrchestrationTrigger] IDurableOrchestrationContext context
)
{
 string messageJson = context.GetInput<string>();

 // Deserialize to ServiceBusReceivedMessage
 ServiceBusReceivedMessage messageInfo = JsonConvert.DeserializeObject<ServiceBusReceivedMessage>(messageJson);
}

Note: Since message will be deleted from ServiceBus topic once ServiceBus starter function got triggered there is no way to fetch message based on messageid. Inside the durable function to move message to deadletter queue, I guess only way to serialize the object while passing as an input and deserialize it in the orchestrator. It would be helpful to know the way to pass servicebus message as an input to the orchestrator.

1 Answer 1

0

I am able to pass Service Bus message as an input to the orchestrator using below code.

using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Azure.Messaging.ServiceBus;
using System.Linq;


namespace _78855769
{
    public class Function1
    {
        [FunctionName(nameof(CaviContainerOrchestrator))]
        public async Task CaviContainerOrchestrator(
        [OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
        {
            string messageJson = context.GetInput<string>();
            ServiceBusMessageInput messageInfo = JsonConvert.DeserializeObject<ServiceBusMessageInput>(messageJson);

            log.LogInformation($"Message Info: {$"Message Info: {JsonConvert.SerializeObject(messageInfo, Formatting.Indented)}"}");
        }

        [FunctionName(nameof(CaviContainerStart))]
        public async Task CaviContainerStart(
        [ServiceBusTrigger(
        topicName: "cavitopic",
        subscriptionName: "monitor",
        Connection = "ServiceBusConnectionString"
        )]
        ServiceBusReceivedMessage message,
        [DurableClient] IDurableOrchestrationClient starter, ILogger log)
        {
            string messageJson = SerializeMessage(message);

            string instanceId = await starter.StartNewAsync(
                nameof(CaviContainerOrchestrator), null, messageJson
            );

            log.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId);
        }

        public string SerializeMessage(ServiceBusReceivedMessage message)
        {
            var messageInfo = new ServiceBusMessageInput
            {
                MessageId = message.MessageId,
                Body = Convert.ToBase64String(message.Body.ToArray()), 
                EnqueuedTime = message.EnqueuedTime,
                ApplicationProperties = message.ApplicationProperties.ToDictionary(
                    kvp => kvp.Key,
                    kvp => (object)kvp.Value
                ),
                SequenceNumber = message.SequenceNumber
            };
            return JsonConvert.SerializeObject(messageInfo);
        }
    }
}

I can get the Service Bus message info.

Azure Functions Core Tools
Core Tools Version:       4.0.5907 Commit hash: N/A +807e89766a92b14fd07b9f0bc2bea1d8777ab209 (64-bit)
Function Runtime Version: 4.834.3.22875

[2024-08-12T08:25:36.236Z] Found C:\Users\******\78855769\78855769.csproj. Using for user secrets file configuration.

Functions:

        CaviContainerOrchestrator: orchestrationTrigger

        CaviContainerStart: serviceBusTrigger

For detailed output, run func with --verbose flag.
[2024-08-12T08:25:44.691Z] Host lock lease acquired by instance ID '0000000000000000000000000D2022A4'.
[2024-08-12T08:26:07.946Z] Executing 'CaviContainerStart' (Reason='(null)', Id=00e74b68-ea8c-46a9-9e05-8a2b24f6dc7d)
[2024-08-12T08:26:07.951Z] Trigger Details: MessageId: cb704fdc7f7e478ab08ca69046bd54b4, SequenceNumber: 6, DeliveryCount: 1, EnqueuedTimeUtc: 2024-08-12T08:26:07.7090000+00:00, LockedUntilUtc: 2024-08-12T08:27:07.7250000+00:00, SessionId: (null)
[2024-08-12T08:26:08.057Z] Started orchestration with ID = '7aaef82c166f43abaa7882d743f1393e'.
[2024-08-12T08:26:08.077Z] Executed 'CaviContainerStart' (Succeeded, Id=00e74b68-ea8c-46a9-9e05-8a2b24f6dc7d, Duration=196ms)
[2024-08-12T08:26:08.215Z] Executing 'CaviContainerOrchestrator' (Reason='(null)', Id=c08cd84e-a982-4666-ab58-b7fb5a6d7954)
[2024-08-12T08:26:08.253Z] Message Info: Message Info: {
[2024-08-12T08:26:08.256Z]   "MessageId": "cb704fdc7f7e478ab08ca69046bd54b4",
[2024-08-12T08:26:08.259Z]   "Body": "SGVsbG8=",
[2024-08-12T08:26:08.260Z]   "EnqueuedTime": "2024-08-12T08:26:07.709+00:00",
[2024-08-12T08:26:08.261Z]   "ApplicationProperties": {},
[2024-08-12T08:26:08.262Z]   "SequenceNumber": 6
[2024-08-12T08:26:08.263Z] }
[2024-08-12T08:26:08.268Z] Executed 'CaviContainerOrchestrator' (Succeeded, Id=c08cd84e-a982-4666-ab58-b7fb5a6d7954, Duration=58ms)
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.