Testing

Run the HTTP stack in-process with stario.testing.TestClient: same status codes, headers, cookies, and bodies as production, without a listening socket. Always use async with TestClient(...) so shutdown runs (disconnect, drain_tasks). Practical recipes: Testing with TestClient.

pytest-asyncio: set asyncio_mode = "auto" under [tool.pytest.ini_options] in your app’s pyproject.toml, or mark each async test with @pytest.mark.asyncio. In strict mode, async fixtures usually need @pytest_asyncio.fixture; see Testing with TestClient and pytest-asyncio.
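As a sketch of the first option, the pytest table in pyproject.toml might look like the fragment below. TOML requires the value to be a quoted string; any other pytest settings you already have are assumed to live in the same table.

```toml
[tool.pytest.ini_options]
# Collect every `async def test_...` as an asyncio test, no per-test marker needed.
asyncio_mode = "auto"
```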

Bootstrap vs inline App: pass your real bootstrap (same entry as stario serve) for integration tests, or construct a small App in the test for narrow cases. With a bootstrap factory, client.app is the live App only after you enter the context manager. Use app_factory=lambda: build_app() when tests must not share mutable state on one App (see TestClient API).
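Why a factory helps with isolation can be sketched without stario at all. FakeApp, build_app, and run_test_with below are hypothetical stand-ins (not stario APIs); the point is only that one shared instance carries state from test to test, while a fresh instance per test does not.

```python
from dataclasses import dataclass, field


@dataclass
class FakeApp:
    """Hypothetical stand-in for an app object that holds mutable state."""

    items: list[str] = field(default_factory=list)


def build_app() -> FakeApp:
    # Hypothetical factory: every call returns a brand-new, empty app.
    return FakeApp()


def run_test_with(app: FakeApp, name: str) -> list[str]:
    # Simulates a test that mutates app state, then reports what it sees.
    app.items.append(name)
    return list(app.items)


shared = build_app()
leaky = [run_test_with(shared, "a"), run_test_with(shared, "b")]
isolated = [run_test_with(build_app(), "a"), run_test_with(build_app(), "b")]
```

With the shared instance the second test sees the first test's item; with the factory each test starts from an empty app.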


1. Request and response (buffered)

Use the buffered calls (await client.get(...), client.post(...), client.request(...)) when the handler finishes the HTTP exchange in one go. Assert on what the client sent (params, json, headers, cookies) and on what came back (r.status_code, r.text, r.json(), r.headers, r.cookies). That is the usual “did the handler return what I wanted?” test.

python
from myapp.app import bootstrap
from stario.testing import TestClient
 
 
async def test_list_items():
    async with TestClient(bootstrap) as client:
        r = await client.get("/items", params={"page": "2"})
        assert r.status_code == 200
        assert "items" in r.json()
 
 
async def test_create_item():
    async with TestClient(bootstrap) as client:
        r = await client.post("/items", json={"name": "cup"})
        assert r.status_code == 201
        assert r.json()["name"] == "cup"

For a tiny app without bootstrap, build an App in the test and pass it to TestClient the same way:

python
from stario import App, Context, Writer
from stario.testing import TestClient
 
 
async def test_ping_inline():
    app = App()
 
    async def ping(c: Context, w: Writer) -> None:
        w.write_headers(200)
        w.end()
 
    app.get("/ping", ping)
 
    async with TestClient(app) as client:
        assert (await client.get("/ping")).status_code == 200

client.app.url_for("name") builds paths the same way as in handlers—useful for named routes without hard-coding strings (Runtime — Reverse URLs). r.span_id is the root request span id for client.tracer when you assert on telemetry (Telemetry).

Full argument lists (files, follow_redirects, timeout, …) and TestResponse fields are in the API below. Request shape: Request.


2. Long-lived response (stream)

When the handler keeps the connection open (SSE, chunked bodies, incremental work), use async with client.stream("GET", path, ...) as r. Read the body all at once with await r.body(), incrementally with async for chunk in r.iter_bytes(), or as SSE with async for ev in r.iter_events(). Event dicts include only the keys present in the frame (data may be omitted; multiple data: lines are joined with newlines). Leaving the async with block disconnects that request (w.disconnected becomes true) and waits for the handler to finish.
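The event-dict rules above (only keys that appear in the frame, several data: lines joined with newlines) can be illustrated with a small stdlib-only parser. This is a sketch of the general text/event-stream framing, not stario's implementation; parse_sse is a hypothetical helper name.

```python
def parse_sse(raw: bytes) -> list[dict[str, str]]:
    """Split a text/event-stream payload into one dict per frame."""
    events: list[dict[str, str]] = []
    text = raw.decode("utf-8").replace("\r\n", "\n")
    for frame in text.split("\n\n"):  # a blank line terminates each frame
        event: dict[str, str] = {}
        data_lines: list[str] = []
        for line in frame.split("\n"):
            if not line or line.startswith(":"):
                continue  # empty line or SSE comment
            name, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # one optional space after the colon
            if name == "data":
                data_lines.append(value)
            else:
                event[name] = value
        if data_lines:
            # Several data: lines in one frame become one newline-joined string.
            event["data"] = "\n".join(data_lines)
        if event:
            events.append(event)
    return events
```

A frame without any data: line yields a dict with no "data" key, which is why assertions should compare whole dicts rather than assume every key exists.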

stream does not follow redirects; buffered calls do by default. On streams, iter_bytes() expects identity-encoded wire bytes; send Accept-Encoding: identity on the request. If the handler sets a non-identity Content-Encoding, iteration can raise. See TestStreamResponse in the API for body(), iter_bytes(), iter_events().
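Why identity encoding matters for chunk-by-chunk reads can be shown with the standard library alone. The sketch below assumes nothing about stario; split_chunks is a hypothetical helper that mimics a body arriving in pieces.

```python
import gzip


def split_chunks(payload: bytes, size: int) -> list[bytes]:
    # Cut a payload into fixed-size pieces, like a body arriving in several writes.
    return [payload[i : i + size] for i in range(0, len(payload), size)]


plain = b"hello " * 50

# Identity encoding: the wire bytes are the payload, so joining chunks restores it.
wire_identity = split_chunks(plain, 64)

# gzip encoding: each wire chunk is a slice of compressed data, so joining the
# raw chunks does not give the payload; the whole stream must be decompressed.
wire_gzip = split_chunks(gzip.compress(plain), 16)
```

Joining wire_identity gives back plain, while joining wire_gzip only gives back compressed bytes that still need gzip.decompress.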

Below, a tiny App defines two routes: one writes chunked bytes (join with iter_bytes()), the other emits a minimal SSE line (parse with iter_events()). The same TestClient.stream pattern applies when bootstrap wires the routes—swap TestClient(app) for TestClient(bootstrap).

python
from stario import App, Context, Writer
from stario.testing import TestClient
 
 
async def test_stream_joins_byte_chunks():
    app = App()
 
    async def chunked(c: Context, w: Writer) -> None:
        w.write_headers(200)
        w.write(b"hel")
        w.write(b"lo")
        w.end()
 
    app.get("/chunked", chunked)
 
    async with TestClient(app) as client:
        async with client.stream(
            "GET", "/chunked", headers={"Accept-Encoding": "identity"}
        ) as r:
            assert r.status_code == 200
            parts = [chunk async for chunk in r.iter_bytes()]
    assert b"".join(parts) == b"hello"
 
 
async def test_stream_parses_sse_events():
    app = App()
 
    async def sse(c: Context, w: Writer) -> None:
        w.headers.set("content-type", "text/event-stream")
        w.write_headers(200)
        w.write(b"event: ping\ndata: hello\n\n")
        w.end()
 
    app.get("/events", sse)
 
    async with TestClient(app) as client:
        async with client.stream(
            "GET", "/events", headers={"Accept-Encoding": "identity"}
        ) as r:
            events = [e async for e in r.iter_events()]
    assert events == [{"event": "ping", "data": "hello"}]

To assert on telemetry for the same request, use r.span_id to scope queries on client.tracer (Telemetry, TestTracer API): for example has_attribute(r.span_id, ...), or find_span(..., root_id=r.span_id) for child spans opened while the stream is open. On TestClient, client.exchanges lists recent buffered round-trips when you need to inspect traffic without re-reading bodies elsewhere.


3. Background work after the response

Buffered get / post calls return once the response is finished on the wire; work scheduled with app.create_task can still be running. To wait until the app’s task queue is idle and tracer spans have settled, await client.drain_tasks() after the request. This is the same work that runs at client shutdown. Do not call drain_tasks from inside a coroutine scheduled with app.create_task: it can deadlock, for the same reason as App.join_tasks().
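The deadlock risk comes from a task waiting on a set of tasks that includes itself. The sketch below reproduces that with plain asyncio; TaskQueue is a hypothetical stand-in for an app's task registry, not stario's implementation, and the timeout exists only so the demonstration terminates.

```python
import asyncio


class TaskQueue:
    """Hypothetical registry that tracks background tasks until they finish."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()

    def create_task(self, coro) -> asyncio.Task:
        task = asyncio.get_running_loop().create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def join(self) -> None:
        # Wait until every tracked task has finished.
        while self._tasks:
            await asyncio.wait(set(self._tasks))


async def demo() -> list[str]:
    queue = TaskQueue()
    log: list[str] = []

    async def safe_job() -> None:
        await asyncio.sleep(0.01)
        log.append("job done")

    async def self_joining_job() -> None:
        # join() waits for this very task, so without the timeout it would
        # never return.
        try:
            await asyncio.wait_for(queue.join(), timeout=0.05)
        except asyncio.TimeoutError:
            log.append("join from inside a task timed out")

    queue.create_task(safe_job())
    await queue.join()  # safe: called from outside the tracked tasks
    log.append("joined from outside")

    queue.create_task(self_joining_job())
    await queue.join()
    return log
```

Joining from the test body works because the test coroutine is not one of the tracked tasks.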

If you need assertions on work that finishes after the handler returns, use a child span: start() in the handler, end() when the background task finishes, then inspect client.tracer after drain_tasks() or after exiting async with TestClient—attributes on the root request span alone will not capture late work.

python
import asyncio
 
import stario.responses as responses
from stario import App, Context, Writer
from stario.testing import TestClient
 
 
async def test_response_then_background():
    app = App()
 
    async def handler(c: Context, w: Writer) -> None:
        child = c.span.step("work.after_response")
        child.start()
 
        async def bg():
            try:
                await asyncio.sleep(0.02)
                child.attr("work.result", "done")
            finally:
                child.end()
 
        c.app.create_task(bg())
        responses.empty(w, 204)
 
    app.post("/action", handler)
 
    async with TestClient(app) as client:
        r = await client.post("/action")
        assert r.status_code == 204
        await client.drain_tasks()
 
    step = client.tracer.find_span("work.after_response", root_id=r.span_id)
    assert step is not None
    assert step.attributes.get("work.result") == "done"

API

class TestClient(app_or_bootstrap, *, app_factory=None, base_url='http://testserver', headers=None, cookies=None, follow_redirects=True, max_redirects=20, compression=CompressionConfig(min_size=512, zstd_level=3, zstd_window_log=None, brotli_level=4, brotli_window_log=None, gzip_level=6, gzip_window_bits=None), request_timeout=30.0)

Async HTTP client: exercise an App in-process.

Pass a fully wired app or the same bootstrap callable your program uses in production. For a bootstrap, enter async with TestClient(bootstrap) before calling request / stream / …; then app is the live application.

Buffered requests — request waits for the entire response body and returns TestResponse. The get / head / … helpers are thin wrappers with the same keyword arguments. Redirects are followed up to max_redirects unless overridden per call.

Streaming — stream provides TestStreamResponse after headers; it does not follow redirects. Prefer Accept-Encoding: identity when using TestStreamResponse.iter_bytes. Leaving the stream block disconnects that exchange and awaits the handler.

Telemetry — each response exposes span_id; finished data is on tracer. Buffered calls append TestExchange rows to exchanges.

Exit — buffered exchanges are disconnected, drain_tasks runs, then bootstrap teardown (if any) matches normal app shutdown.
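The redirect handling for buffered requests described above can be pictured as a loop bounded by max_redirects. This is a minimal sketch under assumptions: Reply, follow, and send are hypothetical names, intermediate replies are assumed to end up in history, and raising RuntimeError at the limit is an illustration rather than stario's documented behaviour.

```python
from dataclasses import dataclass, field
from typing import Callable

REDIRECT_STATUSES = {301, 302, 303, 307, 308}


@dataclass
class Reply:
    """Hypothetical buffered reply; only the fields this sketch needs."""

    status_code: int
    headers: dict[str, str] = field(default_factory=dict)
    history: list["Reply"] = field(default_factory=list)


def follow(send: Callable[[str], Reply], url: str, max_redirects: int = 20) -> Reply:
    """Call send(url), following Location headers up to max_redirects hops."""
    history: list[Reply] = []
    reply = send(url)
    while reply.status_code in REDIRECT_STATUSES and "location" in reply.headers:
        if len(history) >= max_redirects:
            # Assumption: the real client's error type may differ.
            raise RuntimeError(f"more than {max_redirects} redirects")
        history.append(reply)
        reply = send(reply.headers["location"])
    reply.history = history
    return reply


# Usage: a dict lookup plays the role of the server.
routes = {"/old": Reply(302, {"location": "/new"}), "/new": Reply(200)}
final = follow(routes.__getitem__, "/old")
```

A streaming call skips this loop entirely, which is why a 3xx reply on stream has to be handled by reading Location yourself.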

async TestClient.drain_tasks()

Wait until the app’s task queue is idle (App.join_tasks) and the tracer has no open spans.

Called automatically after signalling disconnect when exiting the client context. Unsafe to call from work scheduled via app.create_task (can deadlock).

async TestClient.request(method, url, *, params=None, headers=None, cookies=None, json=None, data=None, files=None, content=None, follow_redirects=None, timeout=None)

Issue an arbitrary HTTP method and return a fully buffered TestResponse.

TestClient.stream(method, url, *, params=None, headers=None, cookies=None, json=None, data=None, files=None, content=None, timeout=None)

Start a request and yield TestStreamResponse once headers are available.

Does not follow redirects (inspect Location yourself). On context exit, signals client disconnect for this exchange and awaits the handler coroutine. Use Accept-Encoding: identity when reading iter_bytes so the body is not compressed.

async TestClient.get(url, **kwargs)

async TestClient.head(url, **kwargs)

async TestClient.post(url, **kwargs)

async TestClient.put(url, **kwargs)

async TestClient.patch(url, **kwargs)

async TestClient.delete(url, **kwargs)

async TestClient.options(url, **kwargs)

class TestResponse(status_code, url, headers, content, request, span_id, _disconnect_future, history=<factory>, cookies=<factory>)

Buffered HTTP result from TestClient.get, TestClient.post, etc.

Fields

  • status_code (int)
  • url (str)
  • headers (Headers)
  • content (bytes)
  • request (ClientRequest)
  • span_id (UUID)
  • _disconnect_future (Future)
  • history (list), default: factory
  • cookies (dict), default: factory

The body is fully buffered; Content-Encoding is decoded when present. Pair span_id with TestClient.tracer for assertions.

TestResponse.json()

TestResponse.raise_for_status()

class TestStreamResponse(status_code, url, headers, request, span_id, cookies, _sink, _body_start, _chunked, _content_length, _disconnect_future, _app_task)

Streaming result from TestClient.stream once response headers exist.

Fields

  • status_code (int)
  • url (str)
  • headers (Headers)
  • request (ClientRequest)
  • span_id (UUID)
  • cookies (dict)
  • _sink (_GrowingSink)
  • _body_start (int)
  • _chunked (bool)
  • _content_length (Union)
  • _disconnect_future (Future)
  • _app_task (Task)

Use body, iter_bytes, or iter_events to read the entity body. Leaving the stream context disconnects this exchange and awaits the app task.

async TestStreamResponse.body()

Concatenate all body chunks after transfer decoding (same as iterating iter_bytes).

async TestStreamResponse.iter_bytes()

Yield decoded body chunks as they arrive (chunked / fixed-length / until close).

Only identity Content-Encoding is supported; request Accept-Encoding: identity or use buffered methods for compression.
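"Decoded" here refers to transfer framing, not compression. As an illustration of the chunked case, the stdlib-only sketch below strips HTTP/1.1 chunked framing from a complete body; decode_chunked is a hypothetical helper, it ignores trailers, and it is not how stario reads the wire.

```python
def decode_chunked(wire: bytes) -> list[bytes]:
    """Return the data chunks of a complete HTTP/1.1 chunked body."""
    chunks: list[bytes] = []
    pos = 0
    while True:
        line_end = wire.index(b"\r\n", pos)
        # The size line is hexadecimal; anything after ';' is a chunk extension.
        size_field = wire[pos:line_end].split(b";", 1)[0]
        size = int(size_field.decode("ascii"), 16)
        pos = line_end + 2
        if size == 0:
            break  # zero-length chunk marks the end of the body
        chunks.append(wire[pos : pos + size])
        pos += size + 2  # skip the chunk data and its trailing CRLF
    return chunks
```

For the two-write handler shown earlier, the framed bytes 3\r\nhel\r\n2\r\nlo\r\n0\r\n\r\n decode to the chunks b"hel" and b"lo".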

async TestStreamResponse.iter_events()

Parse text/event-stream into dicts (keys such as event, id, data).

class TestTracer()

Test-side view of telemetry for TestClient.

Implements the stario.telemetry.Tracer protocol for request dispatch (create, start, add_event, end, … — see that protocol for handler-time usage). Assertions in tests should use the query helpers below; they only return finished span snapshots.

TestTracer.add_event(span_id, name, attributes=None, /, *, body=None)

TestTracer.create(name, attributes=None, /, *, parent_id=None)

TestTracer.end(span_id)

TestTracer.fail(span_id, message)

TestTracer.find_span(name, *, root_id=None, parent_id=None)

First finished span named name, in start-time order.

When several requests reuse the same span name, pass root_id=r.span_id (from the matching TestResponse) so you match the right subtree. parent_id requires an exact parent link.
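The difference between root_id (anywhere in that request's subtree) and parent_id (direct parent only) can be sketched over a toy span list. Span, root_of, and this find_span are hypothetical models with integer ids, written only to mirror the lookup rules described above; the real tracer uses its own span type.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Span:
    """Hypothetical finished-span record."""

    id: int
    name: str
    start: float
    parent_id: Optional[int] = None


def root_of(span: Span, by_id: dict[int, Span]) -> int:
    # Walk parent links up to the span that has no parent.
    while span.parent_id is not None:
        span = by_id[span.parent_id]
    return span.id


def find_span(spans, name, *, root_id=None, parent_id=None) -> Optional[Span]:
    by_id = {s.id: s for s in spans}
    for span in sorted(spans, key=lambda s: s.start):  # start-time order
        if span.name != name:
            continue
        if parent_id is not None and span.parent_id != parent_id:
            continue  # parent_id needs an exact parent link
        if root_id is not None and root_of(span, by_id) != root_id:
            continue  # root_id accepts any depth under that root
        return span
    return None


# Two requests (ids 1 and 3) that both open a span named "db.query".
spans = [
    Span(1, "request", 0.0),
    Span(2, "db.query", 0.1, parent_id=1),
    Span(3, "request", 1.0),
    Span(4, "db.query", 1.1, parent_id=3),
    Span(5, "db.row", 1.2, parent_id=4),
]
```

Without root_id the first request's span wins; passing the second request's id selects its own subtree.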

TestTracer.get_event(span_id, event_name, *, index=0)

The index-th TelemetryEvent named event_name, or None.

TestTracer.get_events(span_id, *, name=None)

Events recorded on a finished span; filter with name when set.

TestTracer.get_span(span_id)

Return the finished TelemetrySpan for span_id, or None.

Open spans and unknown ids yield None.

TestTracer.has_attribute(span_id, key, value=<unset>)

Whether the finished span’s attributes include key.

Pass value to require equality; omit it to assert presence only.
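The default for value is a private sentinel object rather than None, which lets None itself be a value you can require. The pattern can be sketched as follows; _UNSET and attrs_match are hypothetical names, and the function takes a plain dict instead of a span id to stay self-contained.

```python
_UNSET = object()  # unique marker meaning "no value argument was passed"


def attrs_match(attributes: dict, key: str, value=_UNSET) -> bool:
    """Presence check when value is omitted, equality check otherwise."""
    if key not in attributes:
        return False
    if value is _UNSET:
        return True  # caller only asked whether the key exists
    return attributes[key] == value


attrs = {"work.result": None, "http.status": 204}
```

If None were the default, a call that requires the value None could not be told apart from a presence-only check.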

TestTracer.has_event(span_id, event_name)

Whether get_events(span_id) contains at least one event_name.

TestTracer.has_open_spans()

True while any span created through this tracer has not been ended.

TestTracer.set_attribute(span_id, name, value)

TestTracer.set_attributes(span_id, attributes)

TestTracer.set_name(span_id, name)

TestTracer.start(span_id)