I have a function app with an HTTP trigger function which takes an orchestrator function name. I have 2 orchestrator functions for differing workflows. Both orchestrator functions need to read a list of files from blob storage and upload data to blob storage after the activity functions have run. Following best practices, I should have the blob I/O functions (list, upload) as activity functions and call them from the orchestrator (deterministic, no I/O in the orchestrator).

I have looked at the Azure fan out/fan in example with durable functions (https://github.com/Azure/azure-functions-durable-python/blob/dev/samples-v2/fan_in_fan_out/function_app.py). However, in this example BlobConnection() is called in the BlobUpload() function. Ideally I want a class that accepts a blob container name and creates a connection, so that functions such as List, Upload and Download can share that connection and I am not creating the connection in each activity function (rough sketch of what I mean below).

Can I somehow have a class as a function? Or do I need to recreate the connection string in each blob I/O activity function? Thanks
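For illustration, this is roughly the shape I have in mind; the class name and method names are placeholders, not working code:

class BlobContainer:
    def __init__(self, container_name):
        # create the connection once, based on the container name
        ...

    def list(self):
        ...

    def upload(self, blob_name, data):
        ...

    def download(self, blob_name):
        ...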
1 Answer
You can use a helper function in your durable function app.
I have created a get_blob_service_client helper function for the connection.
def get_blob_service_client():
    connect_str = os.getenv('AzureWebJobsStorage')
    return BlobServiceClient.from_connection_string(connect_str)
I have made some changes to the sample function app you linked to.
function_app.py
import os
import pathlib
import logging
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
import azure.functions as func
import azure.durable_functions as df
myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
def get_blob_service_client():
    connect_str = os.getenv('AzureWebJobsStorage')
    return BlobServiceClient.from_connection_string(connect_str)


@myApp.route(route="orchestrators/{functionName}")
@myApp.durable_client_input(client_name="client")
async def HttpStart(req: func.HttpRequest, client):
    instance_id = await client.start_new(req.route_params["functionName"])
    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    return client.create_check_status_response(req, instance_id)


@myApp.orchestration_trigger(context_name="context")
def E2_BackupSiteContent(context: df.DurableOrchestrationContext):
    root_directory: str = r"C:\Users\< your username>\Desktop\test"
    if not root_directory:
        raise Exception("A directory path is required as input")

    files = yield context.call_activity("E2_GetFileList", root_directory)
    tasks = []
    for file in files:
        tasks.append(context.call_activity("E2_CopyFileToBlob", file))

    results = yield context.task_all(tasks)
    total_bytes = sum(results)
    return total_bytes


@myApp.activity_trigger(input_name="rootDirectory")
def E2_GetFileList(rootDirectory):
    all_file_paths = []
    for path, _, files in os.walk(rootDirectory):
        for name in files:
            file_path = os.path.join(path, name)
            all_file_paths.append(file_path)
    return all_file_paths


@myApp.activity_trigger(input_name="filePath")
def E2_CopyFileToBlob(filePath):
    blob_service_client = get_blob_service_client()
    container_name = "test"
    try:
        blob_service_client.create_container(container_name)
    except ResourceExistsError:
        pass

    parent_dir, fname = pathlib.Path(filePath).parts[-2:]  # Get last two path components
    blob_name = parent_dir + "_" + fname
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

    # Upload the file contents (not just the path string) to the blob
    with open(filePath, "rb") as data:
        blob_client.upload_blob(data)

    byte_count = os.path.getsize(filePath)
    return byte_count
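If you would still prefer the class you described, you can wrap the same helper in a small class in the same function_app.py and call it from each activity. A rough sketch (the BlobContainer name and its methods are my own, not part of the sample):

class BlobContainer:
    """Thin wrapper that creates the container client once per instance."""

    def __init__(self, container_name: str):
        service_client = get_blob_service_client()
        self._container = service_client.get_container_client(container_name)
        try:
            self._container.create_container()
        except ResourceExistsError:
            pass

    def list(self):
        # Names of all blobs in the container
        return [blob.name for blob in self._container.list_blobs()]

    def upload(self, blob_name: str, data) -> None:
        self._container.upload_blob(name=blob_name, data=data, overwrite=True)

    def download(self, blob_name: str) -> bytes:
        return self._container.download_blob(blob_name).readall()

Each activity can then create a BlobContainer("test") and call list/upload/download without repeating the connection-string code; the orchestrator itself still only calls the activities and never touches storage directly.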
It ran without issues locally and uploaded the files to the container "test":

[Screenshot: blobs uploaded to the "test" container]

Files in "root_directory":

[Screenshot: files in the local root directory]

As you can see, the files have been uploaded to the storage account.
2 Comments
David Hurley
Thanks. This makes sense. So basically you write normal Python functions and wrap them with an activity trigger so that they can be called from orchestrators but also imported into other functions and unit tests.
Vivek Vaibhav Shandilya
Yes, you can use this method: write the logic that is used again and again as a normal function, and call it from wherever it is needed.
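For example, because get_blob_service_client is a plain function, it can be unit tested with the standard library's mocking tools. A minimal sketch, assuming the file above is importable as function_app (with the azure-functions and azure-functions-durable packages installed so the imports succeed):

import os
import unittest
from unittest.mock import MagicMock, patch

import function_app  # the module shown in the answer above


class TestGetBlobServiceClient(unittest.TestCase):
    @patch("function_app.BlobServiceClient")
    def test_uses_connection_string_from_env(self, mock_blob_service_client):
        fake_client = MagicMock()
        mock_blob_service_client.from_connection_string.return_value = fake_client

        # Provide a dummy connection string via the environment variable the helper reads
        with patch.dict(os.environ, {"AzureWebJobsStorage": "UseDevelopmentStorage=true"}):
            result = function_app.get_blob_service_client()

        mock_blob_service_client.from_connection_string.assert_called_once_with(
            "UseDevelopmentStorage=true"
        )
        self.assertIs(result, fake_client)


if __name__ == "__main__":
    unittest.main()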