
I have a function app with an HTTP trigger function that takes an orchestrator function name. I have two orchestrator functions for differing workflows. Both orchestrator functions need to read a list of files from blob storage and upload data to blob storage after the activity functions execute. Following best practices, I should have the blob I/O functions (list, upload) as activity functions and call them from the orchestrator (deterministic, no I/O in the orchestrator).

I have looked at the fan out/fan in example for durable functions from Azure (https://github.com/Azure/azure-functions-durable-python/blob/dev/samples-v2/fan_in_fan_out/function_app.py). However, in this example BlobConnection() is called inside the BlobUpload() function. Ideally I want a class that accepts a blob container name and creates a connection, so that functions such as List, Upload, and Download can share this connection and I am not creating the connection in each activity function.

Can I somehow have a class as a function? Or do I need to recreate the connection string in each blob I/O activity function? Thanks

1 Answer


You can use a helper function in your durable function app. I have created a get_blob_service_client helper function for the connection:

def get_blob_service_client():
    # Build the client from the storage connection string that the
    # Functions host already provides (AzureWebJobsStorage)
    connect_str = os.getenv('AzureWebJobsStorage')
    return BlobServiceClient.from_connection_string(connect_str)
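
If you specifically want the class-based design you described, you can also wrap the connection in a small plain Python class and use it from your activity functions. A minimal sketch, assuming the same AzureWebJobsStorage connection string; the BlobContainer class and its method names are illustrative, not part of the Azure SDK:

    import os
    from azure.storage.blob import BlobServiceClient
    from azure.core.exceptions import ResourceExistsError

    class BlobContainer:
        """Shares one connection across list/upload/download operations."""
        def __init__(self, container_name: str):
            connect_str = os.getenv("AzureWebJobsStorage")
            service = BlobServiceClient.from_connection_string(connect_str)
            self._container = service.get_container_client(container_name)
            try:
                self._container.create_container()
            except ResourceExistsError:
                pass  # Container already exists

        def list(self):
            # Names of all blobs in the container
            return [b.name for b in self._container.list_blobs()]

        def upload(self, blob_name: str, data):
            self._container.upload_blob(name=blob_name, data=data, overwrite=True)

        def download(self, blob_name: str) -> bytes:
            return self._container.download_blob(blob_name).readall()

The class itself cannot be registered as an activity function, but each activity can instantiate it (or use a shared module-level instance) instead of rebuilding the connection inline.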

I have made some changes to the sample you linked.

function_app.py

    import os
    import pathlib
    import logging
    
    from azure.storage.blob import BlobServiceClient
    from azure.core.exceptions import ResourceExistsError
    
    import azure.functions as func
    import azure.durable_functions as df
    
    myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
    
    def get_blob_service_client():
        
        connect_str = os.getenv('AzureWebJobsStorage')
        return BlobServiceClient.from_connection_string(connect_str)
    
    
    @myApp.route(route="orchestrators/{functionName}")
    @myApp.durable_client_input(client_name="client")
    async def HttpStart(req: func.HttpRequest, client):
        
        instance_id = await client.start_new(req.route_params["functionName"])
    
        logging.info(f"Started orchestration with ID = '{instance_id}'.")
    
        return client.create_check_status_response(req, instance_id)
    
    @myApp.orchestration_trigger(context_name="context")
    def E2_BackupSiteContent(context: df.DurableOrchestrationContext):
        root_directory: str = r"C:\Users\<your username>\Desktop\test"

        if not root_directory:
            raise Exception("A directory path is required as input")
    
        files = yield context.call_activity("E2_GetFileList", root_directory)
        tasks = []
        for file in files:
            tasks.append(context.call_activity("E2_CopyFileToBlob", file))
        
        results = yield context.task_all(tasks)
        total_bytes = sum(results)
        return total_bytes
    
    
    
    @myApp.activity_trigger(input_name="rootDirectory")
    def E2_GetFileList(rootDirectory):
        all_file_paths = []
        # Walk the directory tree and collect every file's full path
        for path, _, files in os.walk(rootDirectory):
            for name in files:
                file_path = os.path.join(path, name)
                all_file_paths.append(file_path)

        return all_file_paths
    
    @myApp.activity_trigger(input_name="filePath")
    def E2_CopyFileToBlob(filePath):
        blob_service_client = get_blob_service_client()
        container_name = "test"

        # Create the container on first use; ignore the error if it already exists
        try:
            blob_service_client.create_container(container_name)
        except ResourceExistsError:
            pass

        parent_dir, fname = pathlib.Path(filePath).parts[-2:]  # Get last two path components
        blob_name = parent_dir + "_" + fname
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

        # Upload the file contents (not just the path string)
        with open(filePath, "rb") as data:
            blob_client.upload_blob(data)
        byte_count = os.path.getsize(filePath)

        return byte_count
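
If you want to exercise it end to end, here is a minimal sketch of starting the orchestration against a locally running Functions host. It assumes the default local port 7071 and the requests package; the URL follows the orchestrators/{functionName} route defined above:

    import requests

    # Kick off the E2_BackupSiteContent orchestration via the HTTP trigger
    resp = requests.post("http://localhost:7071/api/orchestrators/E2_BackupSiteContent")
    status = resp.json()

    # create_check_status_response returns management URLs, including one
    # for polling the orchestration status and final output
    print(requests.get(status["statusQueryGetUri"]).json())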

It ran perfectly on my local machine and uploaded the files to the container "test".


As you can see, the files in "root_directory" have been uploaded to the storage account.


2 Comments

Thanks. This makes sense. So basically you write normal Python functions and wrap them with an activity trigger so they can be called from orchestrators, but they can also be imported into other modules and unit tests.
Yes, you can use this method: write the code that is used again and again as a normal function instead of redefining it in each activity.
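
For example, here is a minimal sketch of that pattern, assuming the file-walking logic is pulled out of E2_GetFileList into a plain helper (get_file_list is an illustrative name, not from the sample):

    import os
    import tempfile
    import unittest

    def get_file_list(root_directory):
        # Same logic as the body of the E2_GetFileList activity
        all_file_paths = []
        for path, _, files in os.walk(root_directory):
            for name in files:
                all_file_paths.append(os.path.join(path, name))
        return all_file_paths

    class GetFileListTests(unittest.TestCase):
        def test_finds_nested_files(self):
            with tempfile.TemporaryDirectory() as root:
                os.makedirs(os.path.join(root, "sub"))
                expected = [os.path.join(root, "a.txt"),
                            os.path.join(root, "sub", "b.txt")]
                for file_path in expected:
                    with open(file_path, "w") as f:
                        f.write("x")
                self.assertEqual(sorted(get_file_list(root)), sorted(expected))

    if __name__ == "__main__":
        unittest.main()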
