How to test streaming async responses with httpx AsyncClient #2006

@seweissman

First check

  • I added a very descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the FastAPI documentation, with the integrated search.
  • I already searched in Google "How to X in FastAPI" and didn't find any information.
  • I already read and followed all the tutorial in the docs and didn't find an answer.
  • I already checked if it is not related to FastAPI but to Pydantic.
  • I already checked if it is not related to FastAPI but to Swagger UI.
  • I already checked if it is not related to FastAPI but to ReDoc.
  • After submitting this, I commit to one of:
    • Read open issues with questions until I find 2 issues where I can help someone and add a comment to help there.
    • I already hit the "watch" button in this repository to receive notifications and I commit to help at least 2 people that ask questions in the future.
    • Implement a Pull Request for a confirmed bug.

Question

I have an async endpoint that streams an arbitrarily large amount of output and I want to write a test to check a portion of its streaming response. I have tried to adapt the FastAPI async tests example to use the httpx.AsyncClient.stream method, but it appears that the call to stream is happening synchronously in the test. How can I do this correctly?

Example

The example app below defines two endpoints:

  • stream_yes_infinite - Streams an infinite stream of "y"s.
  • stream_yes_truncate - Streams a truncated stream of "y"s, with the length of the output controlled by the request parameter nlines.

There are two tests:

  • test_stream_yes_infinite - Calls stream_yes_infinite with AsyncClient.stream. Loops over the output with aiter_lines, asserting that each line is "y" until we have read 1000 lines, then breaks out of the loop.
  • test_stream_yes_truncate - Calls stream_yes_truncate with AsyncClient.stream. Loops over all of the output with aiter_lines, asserting that each line is "n". This test should fail right away.
# stream_yes.py
import pytest
import httpx
import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import itertools

app = FastAPI()

@app.get("/stream_yes_infinite")
async def get_stream_yes_infinite():
    """
    Returns an infinite stream of "y" followed by newline
    """
    y_gen = itertools.repeat("y\n")
    return StreamingResponse(y_gen)

@app.get("/stream_yes_truncate")
async def get_stream_yes_truncate(nlines: int):
    """
    Returns a truncated stream of "y" followed by newline.
    """
    y_gen = itertools.repeat("y\n", nlines)
    return StreamingResponse(y_gen)

# This test should pass but it hangs forever and prints no output
@pytest.mark.asyncio
async def test_stream_yes_infinite():
    """Get the first 1000 lines from the infinite stream and test that the output is always 'y' """
    max_lines = 1000
    i = 0
    async with httpx.AsyncClient(app=app, base_url="http://test") as aclient:
        async with aclient.stream("GET", "/stream_yes_infinite") as response:
            async for line in response.aiter_lines():
                if i >= max_lines:
                    break
                assert line.strip() == "y"
                print(line.strip())
                i += 1

# This test fails as expected, but time to failure is dependent on the value of the nlines parameter.
@pytest.mark.asyncio
async def test_stream_yes_truncate():
    """Get the output of the truncated stream and test that the output is always 'n' """
    async with httpx.AsyncClient(app=app, base_url="http://test") as aclient:
        async with aclient.stream("GET", "/stream_yes_truncate", params={"nlines": 1000}) as response:
            async for line in response.aiter_lines():
                # The test should fail because the output of stream_yes_truncate is "y"
                assert line.strip() == "n"
                print(line.strip())

if __name__ == "__main__":

    # Run: uvicorn stream_yes:app --reload

    # Check that we can stream output from the app running in uvicorn
    async def fetch_stream():
        max_lines = 1000
        i = 0
        async with httpx.AsyncClient() as aclient:
            async with aclient.stream("GET", "http://localhost:8000/stream_yes_infinite") as r:
                async for line in r.aiter_lines():
                    if i >= max_lines:
                        break
                    print(line.strip())
                    i += 1

    # asyncio.run creates the event loop, runs the coroutine, and closes the loop.
    asyncio.run(fetch_stream())
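
The loops above count lines by hand to cap how much of the stream is read. A small helper can make that bound explicit. This is only a sketch: `take` and `endless_y` are hypothetical names that do not appear in the original file, and `endless_y` stands in for `response.aiter_lines()`. It does not change the hanging behaviour described below; it only tidies the bounded read.

```python
import asyncio


async def take(aiterable, n):
    """Yield at most `n` items from an async iterable, then stop."""
    if n <= 0:
        return
    count = 0
    async for item in aiterable:
        yield item
        count += 1
        if count >= n:
            break


async def endless_y():
    """Hypothetical stand-in for response.aiter_lines() on an infinite stream."""
    while True:
        yield "y\n"
        # Give other tasks a chance to run between lines.
        await asyncio.sleep(0)


async def first_lines(n):
    """Collect the first `n` lines, with surrounding whitespace stripped."""
    return [line.strip() async for line in take(endless_y(), n)]
```

In a test, the loop body would then become `async for line in take(response.aiter_lines(), 1000): assert line.strip() == "y"`.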

Description

Things that work as expected:

  • Run uvicorn stream_yes:app --reload
    • Open the browser and call the endpoint /stream_yes_infinite.
      • The browser successfully streams the output of the infinite stream.
    • Run python stream_yes.py.
      • The httpx client streams the response from the app when run in uvicorn (this works with the synchronous client as well).

Things that don't work as expected:

  • Run pytest -s stream_yes.py -k test_stream_yes_infinite.
    • I expected: The test to print a bunch of "y"s and pass.
    • But what happens is: The test prints no output and hangs forever.
  • Run pytest -s stream_yes.py -k test_stream_yes_truncate.
    • I expected: The test to fail right away.
    • But what happens is: The test takes more and more time to fail as the nlines parameter is increased.
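
One possible explanation, offered as an assumption rather than a confirmed diagnosis: with `AsyncClient(app=app)`, httpx calls the ASGI app in-process, and that transport appears to wait for the app to finish and collect the whole body before returning the response. An infinite generator would then never return, and a finite one would take time proportional to nlines. The sketch below sidesteps the client entirely and drives an ASGI app directly as a background task, reading chunks through a bounded queue and cancelling the app once enough lines have arrived. It uses only the standard library. `yes_app` and `read_first_lines` are hypothetical names, `yes_app` is a stand-in for the FastAPI endpoint, and the approach has not been checked against FastAPI itself.

```python
import asyncio
import contextlib


async def yes_app(scope, receive, send):
    """Hypothetical stand-in for the endpoint: streams "y" lines forever."""
    await send({"type": "http.response.start", "status": 200, "headers": []})
    while True:
        await send({"type": "http.response.body", "body": b"y\n", "more_body": True})


async def read_first_lines(app, path, max_lines, timeout=5.0):
    """Run `app` as a task and return up to `max_lines` lines of its body."""
    scope = {
        "type": "http",
        "asgi": {"version": "3.0"},
        "http_version": "1.1",
        "method": "GET",
        "scheme": "http",
        "path": path,
        "raw_path": path.encode(),
        "query_string": b"",
        "root_path": "",
        "headers": [],
        "server": ("test", 80),
        "client": ("testclient", 50000),
    }
    # maxsize=1 gives backpressure: the app waits in send() until a chunk is read.
    chunks = asyncio.Queue(maxsize=1)
    disconnected = asyncio.Event()
    request_sent = False

    async def receive():
        nonlocal request_sent
        if not request_sent:
            request_sent = True
            return {"type": "http.request", "body": b"", "more_body": False}
        # Later calls wait until the reader is done, then report a disconnect.
        await disconnected.wait()
        return {"type": "http.disconnect"}

    async def send(message):
        if message["type"] == "http.response.body":
            await chunks.put(message.get("body", b""))
            if not message.get("more_body", False):
                await chunks.put(None)  # sentinel: response finished

    task = asyncio.create_task(app(scope, receive, send))
    lines = []
    buffer = b""
    try:
        while len(lines) < max_lines:
            # Fail with a timeout instead of hanging if the app sends nothing.
            chunk = await asyncio.wait_for(chunks.get(), timeout)
            if chunk is None:
                break
            buffer += chunk
            while b"\n" in buffer and len(lines) < max_lines:
                line, buffer = buffer.split(b"\n", 1)
                lines.append(line.decode())
    finally:
        # Stop the app: signal a disconnect, then cancel whatever is still running.
        disconnected.set()
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task
    return lines
```

A test could then call `await read_first_lines(app, "/stream_yes_infinite", 1000)` and assert on the returned list. Query parameters are not handled here; `query_string` would need to be filled in for an endpoint such as stream_yes_truncate.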

Environment

  • OS: macOS
  • FastAPI Version: 0.61.0
  • Python version: 3.8.0
  • pytest version: 6.0.1
  • pytest-asyncio version: 0.14.0
  • httpx version: 0.14.3
