Skip to content

agentix.concurrency

concurrency

Concurrency control for running agents at scale.

At "thousands of agents" the scarce resources are the provider connection pool, provider rate limits, and memory (one transcript per in-flight run). agentix imposes no cap by default; these primitives let you add one.

  • :class:Limiter — a shareable async semaphore. Inject it into an Agent (model_limiter=) to bound concurrent model calls across a whole fleet of agents — the right tool for a web server where each request spawns its own run() and there's no single place to gather.
  • :func:bounded_gather — run many awaitables with a hard concurrency cap. The right tool for batch jobs, where it also bounds peak memory by limiting how many runs are alive at once.

Both bind to the event loop on first use — create and share them within one running loop (don't reuse one Limiter across separate asyncio.run calls).

Limiter

Limiter(max_concurrency: int)

Bases: AbstractAsyncContextManager['Limiter']

An async concurrency limiter usable as an async with context.

Share one instance across many agents to cap their combined in-flight operations (e.g. concurrent provider requests)::

limiter = Limiter(50)
agent = Agent(model=..., system_prompt=..., model_limiter=limiter)

bounded_gather async

bounded_gather(
    aws: Sequence[Awaitable[_T]], *, limit: int
) -> list[_T]

Like :func:asyncio.gather, but at most limit awaitables run at once.

Results are returned in the original order. Use this to launch many agent runs without flooding the provider or holding every transcript in memory::

outcomes = await bounded_gather(
    [agent.run(q) for q in questions], limit=20
)