agentix.concurrency¶
concurrency ¶
Concurrency control for running agents at scale.
At "thousands of agents" the scarce resources are the provider connection pool, provider rate limits, and memory (one transcript per in-flight run). agentix imposes no cap by default; these primitives let you add one.
- :class:
Limiter— a shareable async semaphore. Inject it into anAgent(model_limiter=) to bound concurrent model calls across a whole fleet of agents — the right tool for a web server where each request spawns its ownrun()and there's no single place to gather. - :func:
bounded_gather— run many awaitables with a hard concurrency cap. The right tool for batch jobs, where it also bounds peak memory by limiting how many runs are alive at once.
Both bind to the event loop on first use — create and share them within one
running loop (don't reuse one Limiter across separate asyncio.run calls).
Limiter ¶
Bases: AbstractAsyncContextManager['Limiter']
An async concurrency limiter usable as an async with context.
Share one instance across many agents to cap their combined in-flight operations (e.g. concurrent provider requests)::
limiter = Limiter(50)
agent = Agent(model=..., system_prompt=..., model_limiter=limiter)
bounded_gather
async
¶
Like :func:asyncio.gather, but at most limit awaitables run at once.
Results are returned in the original order. Use this to launch many agent runs without flooding the provider or holding every transcript in memory::
outcomes = await bounded_gather(
[agent.run(q) for q in questions], limit=20
)