Toolbox

Small building blocks next to the HTTP stack: Relay for in-process pub/sub, and stario.toys for development-only helpers. They do not replace a message broker or production observability; they cover local coordination and debugging UX. Relay assumes comfort with asyncio tasks; Toys assumes Datastar is loaded on the page when using inspector UIs.

Relay

Relay is in-process only: one Python process, dotted subject strings, and * wildcards. It is not a message broker. There is no persistence, no replay, and no delivery across processes, containers, or machines. If you need multiple workers or hosts, use something like NATS, Redis pub/sub, Kafka, or a cloud queue behind an interface your handlers already use. Relay stays the zero-dependency option for single-process apps and tests.
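If you expect to outgrow a single process, one way to keep that swap cheap is to have handlers depend on a small interface rather than on Relay directly. A minimal sketch; the PubSub name and method shapes here are assumptions for illustration, not a stario API:

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class PubSub(Protocol):
    """Hypothetical seam: handlers call this, so a broker-backed
    implementation can replace Relay without touching handler code."""

    def publish(self, subject: str, data: object) -> None: ...

    def subscribe(self, pattern: str) -> object: ...
```

A NATS or Redis adapter would then implement the same two methods behind the scenes.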

If no subscriber is registered for any pattern that a publish resolves to, that publish is a no-op for fan-out (there is no backlog for unheard messages). subscribe() does not validate pattern strings; patterns that never appear in the matcher’s lookup simply never receive events.

Wildcards and what each subscriber sees

A * stands in for a whole segment and may appear only as the final one (chat.*, room.*, *). Matching walks dotted paths: publish("a.b.c", …) is checked against the patterns a.b.c, a.b.*, a.*, and *. Subscribers receive (subject, data), where subject is the published string, not the pattern they registered. Patterns outside this family never match: there is no runtime error; they simply see no messages.
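The walk described above can be sketched as a pure function (illustrative only, not Relay's actual lookup code):

```python
def candidate_patterns(subject: str) -> list[str]:
    """Patterns a publish on `subject` is checked against: the exact
    subject, each ancestor prefix with a trailing "*", then "*" itself."""
    parts = subject.split(".")
    out = [subject]
    for i in range(len(parts) - 1, 0, -1):
        out.append(".".join(parts[:i]) + ".*")
    out.append("*")
    return out


candidate_patterns("a.b.c")  # → ["a.b.c", "a.b.*", "a.*", "*"]
candidate_patterns("chat")   # → ["chat", "*"]  (note: no "chat.*")
```

This also shows why a publish on exactly chat never reaches a chat.* subscriber: chat.* is not among its candidates.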

prefix.* vs the bare prefix subject: a subscriber on chat.* receives publishes whose subject has a dot after chat (for example chat.room1). A publish whose subject is exactly chat matches the exact pattern chat and the catch-all *, but not chat.*. If you need both a room namespace and a top-level chat channel, model them as distinct subjects (for example chat vs chat.rooms.*) or publish under consistent depths.

```python
from stario import Relay

relay = Relay[str]()

# Subscriber A: pattern "chat.*"  →  receives publishes whose subject has prefix "chat."
#   ✓ relay.publish("chat.room1", "x")
#   ✓ relay.publish("chat.room2.thread", "y")   # deeper segments still under chat.*
#   ✗ relay.publish("dm.room1", "z")            # not under chat
#   ✗ relay.publish("chatter.all", "w")         # "chat" ≠ "chatter"

# Subscriber B: pattern "chat.room1"  →  exact segment path only
#   ✓ relay.publish("chat.room1", "a")
#   ✗ relay.publish("chat.room1.extra", "b")    # extra segment → different subject

# Subscriber C: pattern "*"  →  catches every publish in this process
#   ✓ any relay.publish("anything.here", ...)
```

Subscription styles: async with + async for

Relay.subscribe(pattern) returns a RelaySubscription: an async context manager (register / unregister) and an async iterable (drain the queue).

The usual pattern is to enter the context first so you do not miss an early publish, then iterate on the same object. async for subject, payload in sub does not register again; async with already ran __aenter__, and the loop only awaits queue.get().

```python
async with relay.subscribe("chat.*") as sub:
    # Registered before any code that might publish.
    async for subject, payload in sub:
        # Already subscribed; this loop does not re-enter the context manager.
        ...
```

You can also write async for subject, payload in relay.subscribe("chat.*"):, which auto-enters on the first iteration and exits when the loop ends. That is fine for small demos. If ordering matters, subscribe before starting tasks that publish: another coroutine can still publish before your first await if you only use the shorthand.

```python
# Equivalent to wrapping the whole for-loop in async with ... (register → loop → unregister).
async for subject, payload in relay.subscribe("chat.*"):
    ...
```

Use Relay[PayloadType]() when you want shared payload typing across publishers and consumers.

End-to-end example

```python
import asyncio

from stario import Relay

relay = Relay[dict]()


async def main():
    # Subscribe before any publish you need to receive. Relay has no buffer for a
    # pattern until at least one subscriber exists—an unmatched publish is a no-op.
    async with relay.subscribe("chat.*") as sub:
        # Synchronous publish: thread-safe; no await; now that we're registered, this
        # fan-out is queued on this subscription.
        relay.publish(
            "chat.control",
            {"kind": "joined", "room": "room1"},
        )

        async for subject, payload in sub:
            # First event should be the join we just sent. If we had published
            # "chat.room1" above the async with block, it would have been dropped and
            # this loop would never see it.
            assert subject == "chat.control"
            assert payload["kind"] == "joined"
            break

        relay.publish("chat.room1", {"text": "hi", "from": "ada"})
        async for subject, payload in sub:
            assert subject == "chat.room1"
            assert payload["text"] == "hi"
            break


asyncio.run(main())
```

Typical uses: fan-out between handlers and SSE loops in one process, cache-invalidation hints, small chat or live-UI prototypes, tests that drain a subscription on the same event loop.
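For the test case, it helps to bound the drain so a missed publish fails fast instead of hanging the suite. A sketch of that shape, using a plain asyncio.Queue to stand in for a subscription's queue (drain_one and demo are illustrative names, not stario APIs):

```python
import asyncio


async def drain_one(queue: asyncio.Queue, timeout: float = 1.0):
    # Bound the wait: a dropped or missing publish raises TimeoutError
    # instead of blocking the test forever.
    return await asyncio.wait_for(queue.get(), timeout)


async def demo() -> str:
    q: asyncio.Queue = asyncio.Queue()
    q.put_nowait(("chat.room1", {"text": "hi"}))
    subject, payload = await drain_one(q)
    return subject
```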

Implementation notes (current behaviour)

  • Under a threading.Lock, publish snapshots matching (pattern → subscribers), then releases the lock before touching queues.

  • Each subscription remembers the asyncio event loop it registered on. When publish runs:

    • If this thread has no running loop (get_running_loop() fails), every enqueue uses loop.call_soon_threadsafe(queue.put_nowait, msg) so worker threads stay safe.

    • If this thread’s running loop is the same object as the subscriber’s loop, put_nowait runs directly.

    • Otherwise call_soon_threadsafe schedules the put on the subscriber’s loop.

  • If a loop is already closed, the put is skipped (shutdown).

None of this crosses process boundaries; scaling out still means an external broker you operate.
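The dispatch branches above can be sketched as a standalone function (illustrative, not Relay's source):

```python
import asyncio


def enqueue(sub_loop: asyncio.AbstractEventLoop, queue: asyncio.Queue, msg) -> None:
    # Per-subscription dispatch, following the implementation notes above.
    if sub_loop.is_closed():
        return  # shutdown: skip the put
    try:
        running = asyncio.get_running_loop()
    except RuntimeError:
        running = None  # called from a plain worker thread with no loop
    if running is sub_loop:
        queue.put_nowait(msg)  # same loop: enqueue directly
    else:
        # No loop on this thread, or a different loop: hand off thread-safely.
        sub_loop.call_soon_threadsafe(queue.put_nowait, msg)
```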

class Relay()

In-process pub/sub: dotted subjects, * wildcards, sync publish safe from any thread (threading.Lock).

Each subscription binds to its loop. publish uses put_nowait when called on that loop's thread; otherwise it uses call_soon_threadsafe so other threads can enqueue safely.

Relay.publish(subject, data)

Copy subscriber list under lock; enqueue on the loop (direct or call_soon_threadsafe) outside it.

Relay.subscribe(pattern)

Return a subscription handle: register with async with, then async for messages, or async for directly (registers on first iteration, unsubscribes when the loop ends).

class RelaySubscription(_relay, pattern)

Registered queue on a Relay: async with then async for, or async for alone (auto enter/exit for the loop).

Fields

  • _relay (Relay): the relay this subscription registers with
  • pattern (str): the dotted pattern passed to subscribe
  • _queue (Queue): default: factory
  • _entry (Union): default: None

Toys

stario.toys is for debugging only. Do not rely on it in production; helpers may change or disappear between releases without deprecation.

toy_inspector

Also exported as stario.toys.inspector (alias of toy_inspector).

toy_inspector(position="top-right") returns HTML for a fixed overlay that shows live Datastar signal JSON (ds.json_signals() in a draggable panel). It assumes Datastar is already loaded on the page (see ModuleScript: from stario import datastar as ds, then ds.ModuleScript(), or equivalent); otherwise updates will not run.

position is one of top-left, top-right, bottom-left, bottom-right for initial placement; users can drag the panel. Signal text in the <pre> stays selectable.

```python
from stario.toys import toy_inspector
from stario import html as h

# Inside a view that already includes Datastar on the layout
def dev_layout(*children):
    return h.Body(
        ...,
        toy_inspector("bottom-right"),
        *children,
    )
```

Limit it to developer builds or guard with an environment flag so production users never depend on the overlay.
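One shape for that guard; the STARIO_DEV flag name and maybe_inspector helper are assumptions for illustration, not a stario convention:

```python
import os


def maybe_inspector(position: str = "bottom-right") -> str:
    # Render the overlay only when the developer flag is set.
    if os.environ.get("STARIO_DEV") != "1":
        return ""  # production: no overlay markup at all
    from stario.toys import toy_inspector  # imported only in dev builds
    return toy_inspector(position)
```

Returning an empty string keeps layouts unconditional: they always include the call, and production markup simply contains nothing.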

toy_inspector(position='top-right')

Debug overlay: renders live signal JSON; draggable via Datastar events (drag state kept in the element's dataset).