0

I have an Azure Durable function in which I need to read a few blobs in "blobs" path, upload a new blob and once that's complete, delete the blobs in "blobs" path

// Run the upload activity first; because the call is awaited, the delete activity
// below is not scheduled until the upload activity has returned its result.
filePath = await context.CallActivityAsync<string>(nameof(UpoadFunction));
// Then run the delete activity; its return value is not used.
await context.CallActivityAsync<string>(nameof(DeletFunction));

// UploadFunction calls SendAsync()

 /// <summary>
 /// Reads every user list stored under the "blobs" prefix, serializes the merged
 /// result to JSON and uploads it as "new.json".
 /// </summary>
 /// <returns>The name of the uploaded blob.</returns>
 public async Task<string> SendAsync()
 {
     const string outputName = "new.json";

     var mergedUsers = await ReadAsync("blobs");
     var payload = JsonSerializer.Serialize(mergedUsers);

     await UploadAsync(outputName, payload);
     return outputName;
 }



     /// <summary>
     /// Reads every blob whose name starts with <paramref name="path"/> and merges the
     /// user lists they contain into a single list.
     /// </summary>
     /// <param name="path">Blob name prefix to list.</param>
     /// <returns>All users from all matching blobs, flattened into one list.</returns>
     /// <exception cref="InvalidOperationException">
     /// A blob deserialized to <c>null</c> instead of a list of users.
     /// </exception>
     public async Task<List<AzureADUser>> ReadAsync(string path)
     {
         // One options instance is shared by all reads instead of allocating one per blob.
         var options = new JsonSerializerOptions
         {
             PropertyNameCaseInsensitive = true
         };

         // Start one read per listed blob; the reads run concurrently.
         var readTasks = new List<Task<List<AzureADUser>>>();
         await foreach (BlobItem blobItem in _containerClient.GetBlobsAsync(prefix: path))
         {
             readTasks.Add(ReadBlobAsync(blobItem.Name));
         }

         var userLists = await Task.WhenAll(readTasks);
         return userLists.SelectMany(userList => userList).ToList();

         // Opens, deserializes and then disposes a single blob stream, so no stream
         // outlives the read that needed it.
         async Task<List<AzureADUser>> ReadBlobAsync(string blobName)
         {
             await using var stream = await _containerClient.GetBlobClient(blobName).OpenReadAsync();
             var users = await JsonSerializer.DeserializeAsync<List<AzureADUser>>(stream, options);
             if (users == null)
             {
                 throw new InvalidOperationException($"Failed to deserialize blob '{blobName}'");
             }
             return users;
         }
     }



/// <summary>
/// Uploads <paramref name="content"/> as the blob named <paramref name="path"/>,
/// overwriting any existing blob, and optionally sets metadata on it.
/// </summary>
/// <param name="path">Name of the blob to write.</param>
/// <param name="content">Text content to upload.</param>
/// <param name="metadata">Optional metadata; ignored when null or empty.</param>
public async Task UploadAsync(string path, string content, Dictionary<string, string> metadata = null)
{
    var blobClient = _containerClient.GetBlobClient(path);
    await blobClient.UploadAsync(BinaryData.FromString(content), overwrite: true);

    if (metadata != null && metadata.Count > 0)
    {
        // Use the async overload so the calling thread is not blocked on network I/O.
        await blobClient.SetMetadataAsync(metadata);
    }
}

// DeletFunction calls DeleteAsync()
/// <summary>
/// Deletes every blob in the container whose name starts with the given prefix.
/// </summary>
/// <param name="path">
/// Blob name prefix to delete under. When null or empty, the prefix "blobs" is used.
/// </param>
public async Task DeleteAsync(string path)
{
    // An empty prefix would match every blob in the container, so fall back to the
    // default "blobs" prefix rather than listing (and deleting) everything.
    var prefix = string.IsNullOrEmpty(path) ? "blobs" : path;

    // Start one delete per listed blob, then wait for all of them to finish.
    var deleteTasks = new List<Task>();
    await foreach (BlobItem blobItem in _containerClient.GetBlobsAsync(prefix: prefix))
    {
        deleteTasks.Add(_containerClient.GetBlobClient(blobItem.Name).DeleteIfExistsAsync());
    }

    await Task.WhenAll(deleteTasks);
}

The issue I notice is that if I leave the DeletFunction call uncommented, fewer blobs are read and the uploaded blob contains fewer objects. If I comment out the DeletFunction call, all the blobs are read and the uploaded blob contains all the objects. What am I missing?

await context.CallActivityAsync<string>(nameof(DeletFunction));
1
  • Make sure to call DeleteFunction only after the upload is completed, so the blobs will not delete while you are still using them. Commented Apr 10 at 16:54

1 Answer 1

0

Deleting blobs only after reading blobs using c#

The problem is that your code reads and deletes blobs at the same time, so some blobs are deleted before they have been fully read.

ReadAsync() returns the deserialized data, but the blob streams may still be in use or not yet fully processed at that point.
If DeleteAsync() runs afterwards, it may delete blobs before ReadAsync() has completely finished working with them.

So I removed the separate upload and delete steps and combined read, upload, and delete into one function so that they run in the right order.

ProcessBlobsActivity.cs:

    /// <summary>
    /// Activity that merges the user lists stored under a blob prefix into a single
    /// JSON blob and then deletes the source blobs it merged.
    /// </summary>
    public class ProcessBlobsActivity
    {
        // Shared by every read instead of allocating new options per blob.
        private static readonly JsonSerializerOptions SerializerOptions = new JsonSerializerOptions
        {
            PropertyNameCaseInsensitive = true
        };

        /// <summary>
        /// Reads all blobs under <paramref name="prefix"/>, uploads the merged users as
        /// "{prefix}combined-users.json", then deletes exactly the blobs that were read.
        /// </summary>
        /// <param name="prefix">Blob name prefix to process.</param>
        /// <param name="context">Function context used to obtain a logger.</param>
        /// <returns>
        /// The name of the uploaded blob, or "No users found" when nothing was uploaded.
        /// </returns>
        [Function("ProcessBlobsActivity")]
        public async Task<string> Run(
            [ActivityTrigger] string prefix,
            FunctionContext context)
        {
            var logger = context.GetLogger("ProcessBlobsActivity");
            // Placeholder: supply the storage connection string (ideally from configuration).
            var connectionString = <Your Connection string>;
            var containerClient = new BlobContainerClient(connectionString, "test1");
            var outputBlobName = $"{prefix}combined-users.json";
            logger.LogInformation("Starting blob processing for prefix: {0}", prefix);
            var (users, sourceBlobNames) = await ReadBlobsAsync(containerClient, prefix, outputBlobName, logger);
            if (users.Count == 0)
            {
                logger.LogWarning("No users found in blobs. Uploading skipped.");
                return "No users found";
            }
            await UploadCombinedUsersAsync(containerClient, outputBlobName, users, logger);
            // Delete only the blobs that were read. The output blob lives under the same
            // prefix, so re-listing the prefix here would delete the result just uploaded
            // (and any blob that arrived after the read).
            await DeleteOriginalBlobsAsync(containerClient, prefix, sourceBlobNames, logger);
            logger.LogInformation("Processing complete. Output blob: {0}", outputBlobName);
            return outputBlobName;
        }

        /// <summary>
        /// Reads and deserializes each blob under <paramref name="prefix"/>, one at a time,
        /// skipping <paramref name="outputBlobName"/> so a previous run's output is not
        /// merged back in as input.
        /// </summary>
        /// <returns>The merged users and the names of the blobs that were read.</returns>
        private async Task<(List<AzureADUser> Users, List<string> BlobNames)> ReadBlobsAsync(
            BlobContainerClient containerClient, string prefix, string outputBlobName, ILogger logger)
        {
            var users = new List<AzureADUser>();
            var blobNames = new List<string>();
            await foreach (var blob in containerClient.GetBlobsAsync(prefix: prefix))
            {
                if (blob.Name == outputBlobName)
                {
                    continue;
                }
                blobNames.Add(blob.Name);
                var blobClient = containerClient.GetBlobClient(blob.Name);
                logger.LogInformation("Reading blob: {0}", blob.Name);
                using var stream = await blobClient.OpenReadAsync();
                var deserialized = await JsonSerializer.DeserializeAsync<List<AzureADUser>>(stream, SerializerOptions);
                if (deserialized != null)
                {
                    users.AddRange(deserialized);
                }
                else
                {
                    logger.LogWarning("Blob contained no user list: {0}", blob.Name);
                }
            }
            if (blobNames.Count == 0)
                logger.LogWarning("No blobs found under prefix: {0}", prefix);
            return (users, blobNames);
        }

        /// <summary>
        /// Serializes <paramref name="users"/> to JSON and uploads it as
        /// <paramref name="blobName"/>, overwriting any existing blob.
        /// </summary>
        private async Task UploadCombinedUsersAsync(BlobContainerClient containerClient, string blobName, List<AzureADUser> users, ILogger logger)
        {
            var blobClient = containerClient.GetBlobClient(blobName);
            var json = JsonSerializer.Serialize(users);
            await blobClient.UploadAsync(BinaryData.FromString(json), overwrite: true);
            logger.LogInformation("Uploaded combined users to blob: {0}", blobName);
        }

        /// <summary>
        /// Deletes the blobs named in <paramref name="blobNames"/>, one at a time.
        /// </summary>
        private async Task DeleteOriginalBlobsAsync(BlobContainerClient containerClient, string prefix, List<string> blobNames, ILogger logger)
        {
            if (blobNames.Count == 0)
            {
                logger.LogWarning("No blobs to delete under prefix: {0}", prefix);
                return;
            }
            foreach (var blobName in blobNames)
            {
                logger.LogInformation("Deleting blob: {0}", blobName);
                await containerClient.GetBlobClient(blobName).DeleteIfExistsAsync();
            }
        }
    }

OrchestratorFunction.cs:

/// <summary>
/// Orchestrator that runs the "ProcessBlobsActivity" activity for the "blobs/" prefix
/// and returns the activity's result.
/// </summary>
[Function("OrchestratorFunction")]
        public static async Task<string> RunOrchestrator(
            [OrchestrationTrigger] TaskOrchestrationContext context)
        {
            const string activityName = "ProcessBlobsActivity";
            const string blobPrefix = "blobs/";

            var activityResult = await context.CallActivityAsync<string>(activityName, blobPrefix);
            return activityResult;
        }
        /// <summary>
        /// HTTP starter: schedules a new "OrchestratorFunction" instance and returns the
        /// check-status response for it.
        /// </summary>
        /// <param name="req">Incoming HTTP request.</param>
        /// <param name="client">Durable Task client used to schedule the orchestration.</param>
        /// <param name="executionContext">Function context used to obtain a logger.</param>
        /// <returns>The check-status HTTP response for the new instance.</returns>
        [Function("OrchestratorFunction_HttpStart")]
        public static async Task<HttpResponseData> HttpStart(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
            [DurableClient] DurableTaskClient client,
            FunctionContext executionContext)
        {
            var logger = executionContext.GetLogger("HttpStart");
            string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("OrchestratorFunction");
            // Message template plus argument (not an interpolated string) so the instance
            // ID is captured as a structured log property.
            logger.LogInformation("Started orchestration with ID = '{InstanceId}'.", instanceId);
            return await client.CreateCheckStatusResponseAsync(req, instanceId);
        }

Output:

Upload

Upload

Upload

Reference:

Delete and restore a blob with .NET

Sign up to request clarification or add additional context in comments.

1 Comment

Posting this yet again: please stop asking people if a solution works or if you can further support them - this is not an official support channel (and you've now posted a follow-up twice for this answer).

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.