I am trying to use httpx in a FastAPI endpoint to download files from a server and return them as a StreamingResponse. For some processing, I need the response headers along with the data. I want to stream the file data, so I came up with this attempt, boiled down to an MRE:

from typing import AsyncIterable

import httpx
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


class FileStream:
    def __init__(self, headers: dict[str, str], stream: AsyncIterable[bytes]):
        self.headers = headers
        self.stream = stream


async def get_file_stream(url) -> FileStream:
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url) as response:

            async def chunk_generator() -> AsyncIterable[bytes]:
                async for chunk in response.aiter_bytes():
                    yield chunk

            return FileStream(response.headers, chunk_generator())


@app.get("/download")
async def download_file():
    file_stream = await get_file_stream(url=some_url)

    headers = {}
    media_type = "application/octet-stream"
    # some code setting headers and media_type based on file_stream.headers

    return StreamingResponse(file_stream.stream, media_type=media_type, headers=headers)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8000, log_level="debug")

This leads to the error httpx.StreamClosed: Attempted to read or stream content, but the stream has been closed. To my understanding, this is caused by the scoping of the context managers in get_file_stream, so I tried to solve it with a wrapping context manager:

from contextlib import asynccontextmanager
from typing import AsyncIterable, AsyncIterator

import httpx
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


class FileStream:
    def __init__(self, headers: dict[str, str], stream: AsyncIterable[bytes]):
        self.headers = headers
        self.stream = stream


@asynccontextmanager
async def get_file_stream(url) -> AsyncIterator[FileStream]:
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url) as response:

            async def chunk_generator() -> AsyncIterable[bytes]:
                async for chunk in response.aiter_bytes():
                    yield chunk

            yield FileStream(response.headers, chunk_generator())


@app.get("/download")
async def download_file():
    async with get_file_stream(url=some_url) as file_stream:

        headers = {}
        media_type = "application/octet-stream"
        # some code setting headers and media_type based on file_stream.headers

        return StreamingResponse(file_stream.stream, media_type=media_type, headers=headers)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000, log_level="debug")

However, this leads to the same error. I seem to be missing something here. Any hints on how to solve this?

Update: The problem seems to be with how FastAPI handles StreamingResponse objects. The resources are indeed closed before the response streaming starts. I am trying to find a solid workaround.

The behavior is still the same with FastAPI 0.116.1.
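
The ordering described in the update can be reproduced without httpx or FastAPI. The following is a minimal sketch using only the standard library; FakeResponse and open_stream are hypothetical stand-ins for the httpx response and client.stream(...), not real httpx APIs. It shows that returning from inside an `async with` block closes the resource before the caller starts iterating.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class FakeResponse:
    """Hypothetical stand-in for an httpx streaming response."""

    def __init__(self) -> None:
        self.closed = False

    async def aiter_bytes(self) -> AsyncIterator[bytes]:
        # The body of an async generator only runs once iteration starts.
        if self.closed:
            raise RuntimeError("stream has been closed")
        for chunk in (b"a", b"b"):
            yield chunk


@asynccontextmanager
async def open_stream() -> AsyncIterator[FakeResponse]:
    response = FakeResponse()
    try:
        yield response
    finally:
        # Runs as soon as the `async with` block is left.
        response.closed = True


async def endpoint() -> AsyncIterator[bytes]:
    async with open_stream() as response:
        # Returning leaves the `async with` block, so the stream is closed
        # before the caller gets a chance to iterate over it.
        return response.aiter_bytes()


async def consume() -> str:
    body = await endpoint()
    try:
        return repr([chunk async for chunk in body])
    except RuntimeError as exc:
        return f"error: {exc}"


result = asyncio.run(consume())
print(result)
```

The same thing happens in the endpoint: the function returns the StreamingResponse object first, and the body iterator is only consumed afterwards.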

Don't use async with client.stream(...) inside your endpoint. That guarantees the stream will close too early. Instead, create the httpx client and response, then hand over a generator that yields the chunks and, in a finally: block, closes the response and the client when iteration is done. Commented Sep 17 at 7:41

1 Answer

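The issue is context management. Try moving the httpx calls into the generator itself. The context managers are then entered when iteration starts and exit only once the generator finishes, so the stream should stay open while the data is being read.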

Relevant Code Updates:

async def get_file_stream(url: str) -> FileStream:
    async def chunk_generator() -> AsyncIterable[bytes]:
        async with httpx.AsyncClient() as client:
            async with client.stream("GET", url) as response:
                async for chunk in response.aiter_bytes():
                    yield chunk
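    # Note: the request is only sent once iteration starts, so the upstream
    # headers are not available at this point and an empty dict is passed.
    return FileStream({}, chunk_generator())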


@app.get("/download")
async def download_file():

    file_stream = await get_file_stream(some_url)

    headers = {}
    media_type = "application/octet-stream"

    return StreamingResponse(
        file_stream.stream,
        media_type=media_type,
        headers=headers,
    )