agentix.agent
agent
The agent loop.
Small and shared: call the model, and if it asked for tools, run them and feed the results back, until the model produces a final answer or a budget is hit. Everything load-bearing — the model, the tools, the policy — is injected. The loop only enforces the resource budgets here; the security guards plug in later (P3) without changing this control flow.
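The control flow described above can be sketched in a few lines. This is a toy illustration, not agentix's code: `ModelReply`, `toy_loop`, `fake_model`, and the `max_steps` budget are hypothetical names invented for the example, and the real loop also handles policy, events, and checkpointing.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ModelReply:
    """Hypothetical reply shape: a final answer, or a list of requested tool calls."""
    answer: Optional[str] = None
    tool_calls: list = field(default_factory=list)  # list of (tool_name, kwargs)


async def toy_loop(model, tools, user_request, max_steps=4):
    """Call the model; if it asked for tools, run them and feed the results back."""
    messages = [("user", user_request)]
    for _ in range(max_steps):  # assumed step budget
        reply = await model(messages)
        if not reply.tool_calls:
            return reply.answer  # the model produced a final answer
        for name, kwargs in reply.tool_calls:
            result = tools[name](**kwargs)
            messages.append(("tool", f"{name} -> {result}"))
    return None  # budget hit before a final answer


async def fake_model(messages):
    # Injected stand-in model: request one tool call, then answer from its result.
    if messages[-1][0] == "user":
        return ModelReply(tool_calls=[("add", {"a": 2, "b": 3})])
    return ModelReply(answer=f"Result: {messages[-1][1]}")


async def greedy_model(messages):
    # Stand-in model that never stops asking for tools, to exercise the budget.
    return ModelReply(tool_calls=[("add", {"a": 1, "b": 1})])


def add(a, b):
    return a + b


def run_toy(model=fake_model):
    return asyncio.run(toy_loop(model, {"add": add}, "What is 2 + 3?"))
```

Because the model and the tools are passed in, the same loop runs unchanged against a fake model in tests and a real one in production, which is the point of injecting everything load-bearing.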
Agent
Agent(
*,
model: ModelFn,
system_prompt: str,
policy: AgentPolicy | None = None,
tools: ToolRegistry | Iterable[Tool] | None = None,
tool_executor: ToolExecutor | None = None,
tool_schemas: Sequence[ToolSchema] | None = None,
guards: Iterable[Guard] | None = None,
confirm_fn: ConfirmFn | None = None,
suspend_on_confirm: bool = False,
events: AgentEvents | None = None,
store: Store | None = None,
model_limiter: Limiter | None = None,
context_strategy: ContextStrategy | None = None,
output_validator: OutputValidator | None = None,
max_output_retries: int = 1,
response_model: Any = None,
memory: Memory | None = None,
memory_limit: int = 5,
remember_exchange: bool = False,
)
Drives the async agent loop around an injected model and tool executor.
Minimal usage::

    agent = Agent(model=my_model, system_prompt="...")
    outcome = await agent.run("Summarize today's tickets.")

The easiest way to add tools is the tools= argument: pass @tool
functions (or a :class:`~agentix.tools.ToolRegistry`) and the agent derives
both the executor and the schemas the model sees::

    agent = Agent(model=m, system_prompt="...", tools=[get_weather, add])
For full control (e.g. a sandboxed executor) supply tool_executor and
tool_schemas directly instead.
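This reference does not show how schemas are derived from tool functions. One common approach, sketched here purely as an assumption and not as agentix's implementation, is to read the function signature with the standard `inspect` module. `sketch_schema` and the schema layout are hypothetical.

```python
import inspect


def sketch_schema(fn):
    """Build a minimal schema dict from a function signature (illustrative only)."""
    type_names = {int: "integer", float: "number", str: "string", bool: "boolean"}
    properties = {}
    required = []
    for name, param in inspect.signature(fn).parameters.items():
        # Unannotated or unknown types fall back to "string" in this sketch.
        properties[name] = {"type": type_names.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default means the model must supply it
    return {
        "name": fn.__name__,
        "description": (fn.__doc__ or "").strip(),
        "parameters": {"type": "object", "properties": properties, "required": required},
    }


def add(a: int, b: int = 0) -> int:
    """Add two integers."""
    return a + b


schema = sketch_schema(add)
```

Supplying tool_schemas by hand, as the text suggests, is the route to take when a derived schema of this kind is too coarse for the executor in use.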
run (async)
run(
user_request: str | list[ContentPart],
*,
run_id: str | None = None,
interrupt: Interrupt | None = None,
) -> AgentOutcome
Run the loop to completion. If run_id is given and a store is
configured, the run is checkpointed after every step (resumable). Pass an
Interrupt to stop the run at its next safe boundary.
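The checkpoint-per-step and safe-boundary behaviour can be pictured with a small stand-alone sketch. `ToyInterrupt`, `run_steps`, and the dict used as a store are hypothetical stand-ins; the real `Interrupt` and `Store` interfaces are not shown in this reference.

```python
class ToyInterrupt:
    """Hypothetical stand-in for an interrupt flag."""

    def __init__(self):
        self.requested = False

    def request(self):
        self.requested = True


def run_steps(steps, store, run_id, interrupt=None, start=0):
    """Run steps in order, checkpointing after each one.

    The interrupt is only checked between steps, so a step is never cut off
    halfway: that gap between steps is the "safe boundary" in this sketch.
    """
    for index in range(start, len(steps)):
        if interrupt is not None and interrupt.requested:
            return "interrupted"
        steps[index]()
        store[run_id] = index + 1  # checkpoint: index of the next step to run
    return "done"


log = []
interrupt = ToyInterrupt()
steps = [
    lambda: log.append("a"),
    lambda: (log.append("b"), interrupt.request()),  # interrupt arrives during step 2
    lambda: log.append("c"),
]
store = {}
first = run_steps(steps, store, "r1", interrupt)
checkpoint = store["r1"]
second = run_steps(steps, store, "r1", start=checkpoint)  # continue from the checkpoint
```

In this toy run the second step finishes and is checkpointed before the interrupt takes effect, so continuing from the stored index neither repeats nor skips work.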
resume (async)
resume(
run_id: str,
*,
decisions: Mapping[str, bool] | None = None,
store: Store | None = None,
interrupt: Interrupt | None = None,
) -> AgentOutcome
Reload a checkpointed run and continue the loop from where it stopped.
For a run suspended awaiting confirmation (suspend_on_confirm),
pass decisions mapping each pending call.id to True (approve)
or False (deny). A pending call with no entry is denied (fail closed).
This may be called on a fresh Agent in a later process — the paused
state lives entirely in the store.
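The fail-closed rule for decisions is easy to state as code. The helper below is a hypothetical illustration of the rule as described, not a function from agentix.

```python
from typing import Iterable, Mapping, Optional


def resolve_decisions(
    pending_ids: Iterable[str],
    decisions: Optional[Mapping[str, bool]] = None,
) -> dict:
    """Map every pending call id to approve (True) or deny (False).

    A call id missing from `decisions` is denied, so forgetting an entry
    can never approve a tool call by accident.
    """
    decisions = decisions or {}
    return {call_id: decisions.get(call_id, False) is True for call_id in pending_ids}


resolved = resolve_decisions(["c1", "c2", "c3"], {"c1": True, "c2": False})
# "c3" has no entry and is therefore denied.
```

Treating only an explicit True as approval means a missing key, None, or any other stray value all fall on the deny side.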
run_sync
Blocking convenience wrapper for scripts/CLIs. Do not call from inside a running event loop.
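The restriction follows from how blocking wrappers around coroutines usually work. Assuming the wrapper is built on `asyncio.run` (an assumption; the implementation is not shown here), calling it from a running event loop raises `RuntimeError`. The function names below are hypothetical.

```python
import asyncio


async def work():
    return "ok"


def blocking_wrapper():
    """Sketch of a sync wrapper: drive the coroutine on a fresh event loop."""
    coro = work()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise


async def call_from_inside_loop():
    """Return True if the wrapper refuses to run inside an active loop."""
    try:
        blocking_wrapper()
    except RuntimeError:
        return True
    return False


from_script = blocking_wrapper()                     # fine: no loop is running
from_loop = asyncio.run(call_from_inside_loop())     # wrapper raises inside the loop
```

Inside async code, awaiting the async method directly avoids the problem.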
resume_sync
resume_sync(
run_id: str,
*,
decisions: Mapping[str, bool] | None = None,
store: Store | None = None,
) -> AgentOutcome
Blocking wrapper around :meth:`resume`. Do not call from inside a
running event loop.
stream (async)
stream(
user_request: str | list[ContentPart],
*,
run_id: str | None = None,
interrupt: Interrupt | None = None,
) -> AsyncIterator[AgentStreamEvent]
Run the loop, yielding events as they happen: AnswerDelta text
chunks, ToolStarted/ToolFinished around tool calls, and a final
Done carrying the outcome.
Note: on_answer egress guards (e.g. PII redaction) cannot un-send
deltas that have already been streamed. The deltas are raw; only
Done.outcome.answer is passed through the guards. Use :meth:`run` if you
need the user-facing text itself redacted before it is emitted.
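A toy stream makes the difference between raw deltas and the guarded final answer concrete. `Delta`, `Finished`, `redact_emails`, and `toy_stream` are hypothetical stand-ins for AnswerDelta, Done, and an egress guard; none of this is agentix code.

```python
import asyncio
import re
from dataclasses import dataclass


@dataclass
class Delta:
    """Hypothetical stand-in for AnswerDelta."""
    text: str


@dataclass
class Finished:
    """Hypothetical stand-in for Done, carrying the final answer."""
    answer: str


def redact_emails(text):
    """Toy egress guard: mask anything that looks like an email address."""
    return re.sub(r"\S+@\S+", "[redacted]", text)


async def toy_stream(chunks):
    """Yield raw chunks as they arrive, then one final event with guarded text."""
    for chunk in chunks:
        yield Delta(chunk)  # already sent; a later guard cannot take it back
    yield Finished(redact_emails("".join(chunks)))


async def collect(chunks):
    streamed, final = "", None
    async for event in toy_stream(chunks):
        if isinstance(event, Delta):
            streamed += event.text
        else:
            final = event.answer
    return streamed, final


streamed, final = asyncio.run(collect(["Mail ", "bob@example.com", " today"]))
```

Here `streamed` still contains the address while `final` does not, which is why a UI that must never display unredacted text should render only the final answer.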