agentix.agent

agent

The agent loop.

Small and shared: call the model, and if it asked for tools, run them and feed the results back, until the model produces a final answer or a budget is hit. Everything load-bearing (the model, the tools, the policy) is injected. Here the loop enforces only the resource budgets; the security guards plug in later (P3) without changing this control flow.
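The control flow described above can be sketched roughly as follows. This is a minimal illustration, not the agentix implementation: `ToolCall`, `ModelReply`, `run_loop`, `fake_model`, the message dictionaries, and the `max_steps` budget are all hypothetical stand-ins chosen to keep the example self-contained.

```python
import asyncio
from dataclasses import dataclass, field

# Hypothetical stand-ins for illustration only; not agentix's real types.
@dataclass
class ToolCall:
    id: str
    name: str
    args: dict

@dataclass
class ModelReply:
    text: str = ""
    tool_calls: list = field(default_factory=list)

async def run_loop(model, tools, request, max_steps=4):
    """Call the model, run any requested tools, repeat until an answer or the budget."""
    messages = [{"role": "user", "content": request}]
    for _ in range(max_steps):  # assumed resource budget: a step limit
        reply = await model(messages)
        if not reply.tool_calls:
            return {"status": "done", "answer": reply.text}
        for call in reply.tool_calls:
            result = tools[call.name](**call.args)
            # Feed each tool result back so the next model call can see it.
            messages.append({"role": "tool", "call_id": call.id, "content": str(result)})
    return {"status": "budget_exceeded", "answer": None}

async def fake_model(messages):
    # Requests one tool call first, then answers from the tool result.
    if messages[-1]["role"] == "tool":
        return ModelReply(text=f"The sum is {messages[-1]['content']}.")
    return ModelReply(tool_calls=[ToolCall("c1", "add", {"a": 2, "b": 3})])

def add(a, b):
    return a + b

outcome = asyncio.run(run_loop(fake_model, {"add": add}, "What is 2 + 3?"))
print(outcome)
```

Because the model and the tools are plain arguments, the same loop can be exercised in tests with fakes like `fake_model`, which is one practical benefit of injecting everything load-bearing.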

Agent

Agent(
    *,
    model: ModelFn,
    system_prompt: str,
    policy: AgentPolicy | None = None,
    tools: ToolRegistry | Iterable[Tool] | None = None,
    tool_executor: ToolExecutor | None = None,
    tool_schemas: Sequence[ToolSchema] | None = None,
    guards: Iterable[Guard] | None = None,
    confirm_fn: ConfirmFn | None = None,
    suspend_on_confirm: bool = False,
    events: AgentEvents | None = None,
    store: Store | None = None,
    model_limiter: Limiter | None = None,
    context_strategy: ContextStrategy | None = None,
    output_validator: OutputValidator | None = None,
    max_output_retries: int = 1,
    response_model: Any = None,
    memory: Memory | None = None,
    memory_limit: int = 5,
    remember_exchange: bool = False,
)

Drives the async agent loop around an injected model and tool executor.

Minimal usage:

agent = Agent(model=my_model, system_prompt="...")
outcome = await agent.run("Summarize today's tickets.")

The easiest way to add tools is the tools= argument: pass @tool functions (or a ToolRegistry from agentix.tools) and the agent derives both the executor and the schemas the model sees:

agent = Agent(model=m, system_prompt="...", tools=[get_weather, add])

For full control (e.g. a sandboxed executor) supply tool_executor and tool_schemas directly instead.
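To make "derives both the executor and the schemas" more concrete, here is one way such a derivation could work, using only the standard library. This is a sketch under assumptions: `derive_schema`, the schema layout, and the name-to-callable `executor` dictionary are hypothetical and may differ from what agentix actually builds.

```python
import inspect

# Hypothetical helper for illustration; not the agentix implementation.
_JSON_TYPES = {int: "integer", float: "number", str: "string", bool: "boolean"}

def derive_schema(fn):
    """Build a minimal JSON-schema-like description from a function signature."""
    params = inspect.signature(fn).parameters
    properties = {
        # Unannotated or unknown parameter types fall back to "string".
        name: {"type": _JSON_TYPES.get(p.annotation, "string")}
        for name, p in params.items()
    }
    # Parameters without a default value are treated as required.
    required = [n for n, p in params.items() if p.default is inspect.Parameter.empty]
    return {
        "name": fn.__name__,
        "description": inspect.getdoc(fn) or "",
        "parameters": {"type": "object", "properties": properties, "required": required},
    }

def add(a: int, b: int = 0) -> int:
    """Add two integers."""
    return a + b

schema = derive_schema(add)       # what the model would be shown
executor = {add.__name__: add}    # how a call by name would be dispatched
print(schema)
```

Supplying tool_executor and tool_schemas directly amounts to providing these two halves yourself, for example to route calls through a sandbox.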

run async

run(
    user_request: str | list[ContentPart],
    *,
    run_id: str | None = None,
    interrupt: Interrupt | None = None,
) -> AgentOutcome

Run the loop to completion. If run_id is given and a store is configured, the run is checkpointed after every step (resumable). Pass an Interrupt to stop the run at its next safe boundary.
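The interplay of per-step checkpointing and interruption at a safe boundary might look like the sketch below. Everything here is an assumption made for illustration: `StopFlag` stands in for Interrupt, a plain dictionary stands in for the store, and the checkpoint layout is invented.

```python
import asyncio

class StopFlag:
    """Hypothetical stand-in for an interrupt: a flag checked between steps."""
    def __init__(self):
        self.requested = False

    def request(self):
        self.requested = True

async def run_steps(steps, store, run_id=None, stop=None):
    state = {"next_step": 0, "results": []}
    for i, step in enumerate(steps):
        # Safe boundary: only stop between steps, never in the middle of one.
        if stop is not None and stop.requested:
            return {"status": "interrupted", **state}
        state["results"].append(await step())
        state["next_step"] = i + 1
        if run_id is not None:
            # Checkpoint a copy after every step so the run can be reloaded later.
            store[run_id] = {"next_step": state["next_step"], "results": list(state["results"])}
    return {"status": "done", **state}

stop = StopFlag()

async def one():
    return 1

async def two():
    stop.request()  # something asks the run to stop while step two executes
    return 2

async def three():
    return 3

store = {}
result = asyncio.run(run_steps([one, two, three], store, run_id="r1", stop=stop))
print(result, store)
```

In this sketch, step two still finishes and is checkpointed before the stop takes effect, so the saved state always describes a completed step.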

resume async

resume(
    run_id: str,
    *,
    decisions: Mapping[str, bool] | None = None,
    store: Store | None = None,
    interrupt: Interrupt | None = None,
) -> AgentOutcome

Reload a checkpointed run and continue the loop from where it stopped.

For a run suspended awaiting confirmation (suspend_on_confirm), pass decisions mapping each pending call.id to True (approve) or False (deny). A pending call with no entry is denied (fail closed). This may be called on a fresh Agent in a later process — the paused state lives entirely in the store.
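The fail-closed rule for decisions can be illustrated in isolation. The helper below is hypothetical (`resolve_decisions` is not an agentix function); it only shows how pending call ids without an entry end up denied.

```python
from typing import Iterable, Mapping, Optional

def resolve_decisions(
    pending_ids: Iterable[str],
    decisions: Optional[Mapping[str, bool]] = None,
) -> dict:
    """Map every pending call id to approve (True) or deny (False).

    Ids missing from `decisions` are denied, so forgetting an entry
    can never approve a call by accident.
    """
    decisions = decisions or {}
    return {call_id: decisions.get(call_id, False) is True for call_id in pending_ids}

resolved = resolve_decisions(["call_a", "call_b", "call_c"], {"call_a": True, "call_b": False})
print(resolved)
```

Comparing with `is True` is a deliberate choice in this sketch: a truthy non-boolean value such as the string "yes" would still be treated as a denial.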

run_sync

run_sync(
    user_request: str | list[ContentPart],
    *,
    run_id: str | None = None,
) -> AgentOutcome

Blocking convenience wrapper for scripts/CLIs. Do not call from inside a running event loop.
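The restriction exists because a blocking wrapper of this kind typically starts its own event loop, and `asyncio.run` raises `RuntimeError` when a loop is already running in the current thread. The sketch below shows the general pattern with a hypothetical `run_blocking` helper; it is not the agentix source.

```python
import asyncio

def run_blocking(coro):
    """Run a coroutine to completion, refusing if a loop is already running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so it is safe to start one.
        return asyncio.run(coro)
    coro.close()  # avoid a "coroutine was never awaited" warning
    raise RuntimeError("called from a running event loop; await the async method instead")

async def answer():
    await asyncio.sleep(0)
    return "ok"

# From a plain script or CLI this works.
result = run_blocking(answer())

async def nested():
    # From inside a running loop the wrapper refuses.
    try:
        run_blocking(answer())
    except RuntimeError:
        return "refused"
    return "ran"

nested_result = asyncio.run(nested())
print(result, nested_result)
```

Inside async code (a web handler, a notebook with a live loop), the async variants run and resume are the ones to await.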

resume_sync

resume_sync(
    run_id: str,
    *,
    decisions: Mapping[str, bool] | None = None,
    store: Store | None = None,
) -> AgentOutcome

Blocking wrapper around resume. Do not call from inside a running event loop.

stream async

stream(
    user_request: str | list[ContentPart],
    *,
    run_id: str | None = None,
    interrupt: Interrupt | None = None,
) -> AsyncIterator[AgentStreamEvent]

Run the loop, yielding events as they happen: AnswerDelta text chunks, ToolStarted/ToolFinished around tool calls, and a final Done carrying the outcome.

Note: on_answer egress guards (e.g. PII redaction) cannot un-send deltas that have already been streamed. The deltas are raw; only Done.outcome.answer is passed through the guards. Use run if you need the user-facing text itself redacted before it is emitted.
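The difference between raw deltas and the guarded final answer is easy to see with a fake stream. The event classes below are simplified stand-ins that reuse the documented names (`AnswerDelta`, `Done`); their fields, the `Outcome` class, `fake_stream`, and the toy `redact` guard are assumptions made for illustration.

```python
import asyncio
from dataclasses import dataclass

# Simplified stand-ins; the real event types may carry more fields.
@dataclass
class AnswerDelta:
    text: str

@dataclass
class Outcome:
    answer: str

@dataclass
class Done:
    outcome: Outcome

def redact(text):
    # Toy egress guard: mask one known email address.
    return text.replace("alice@example.com", "[email]")

async def fake_stream():
    chunks = ["Contact ", "alice@example.com", " today."]
    for chunk in chunks:
        yield AnswerDelta(chunk)                 # raw: emitted before any guard runs
    yield Done(Outcome(redact("".join(chunks))))  # guarded: final answer only

async def consume():
    streamed, final = [], None
    async for event in fake_stream():
        if isinstance(event, AnswerDelta):
            streamed.append(event.text)
        elif isinstance(event, Done):
            final = event.outcome.answer
    return "".join(streamed), final

streamed_text, final_answer = asyncio.run(consume())
print(streamed_text)
print(final_answer)
```

A consumer that displays deltas live has already shown the unredacted address by the time Done arrives, which is why the non-streaming path is the safer choice when redaction must hold.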