API Reference¶
Core¶
GantryEngine¶
gantrygraph.engine.engine.GantryEngine(*, llm, perception=None, tools=None, approval_callback=None, on_event=None, max_steps=50, guardrail=None, system_prompt=None, memory=None, checkpointer=None, enable_suspension=False, budget=None, workspace_policy=None)
¶
Autonomous agent engine backed by LangGraph.
Composes a perception source, a set of tools/MCP connectors, and an LLM
into a self-correcting observe → think → act → review loop.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| llm | BaseChatModel | Any LangChain chat model. | required |
| perception | BasePerception \| None | How the agent observes its environment. | None |
| tools | list[_AnyTool] \| None | Tools and/or MCP connectors made available to the agent. | None |
| approval_callback | ApprovalCallback \| None | Called before every tool execution. Return a boolean approval decision. | None |
| on_event | EventCallback \| None | Called after each node transition with a GantryEvent. | None |
| max_steps | int | Hard upper bound on act-node executions. | 50 |
| guardrail | GuardrailPolicy \| None | Optional GuardrailPolicy gating named tools behind human approval. | None |
| system_prompt | str \| None | Prepended as a system message to every run. | None |
| memory | BaseMemory \| None | Optional long-term memory backend. Past experiences are recalled at the start of each run and the result is stored automatically on completion. | None |
| checkpointer | Any | LangGraph checkpointer for state persistence. When provided, suspended runs can be persisted and later resumed via resume(). | None |
| enable_suspension | bool | Use LangGraph interrupt()-based suspension for human-in-the-loop approvals. | False |
| budget | BudgetPolicy \| None | Optional BudgetPolicy enforcing step and wall-clock limits. | None |
| workspace_policy | WorkspacePolicy \| None | Optional WorkspacePolicy restricting file and shell access to a workspace directory. | None |
Example::

    agent = GantryEngine(
        llm=ChatAnthropic(model="claude-sonnet-4-6"),
        tools=[ShellTool(workspace="/tmp")],
        on_event=lambda e: print(e),
        max_steps=20,
    )
    result = agent.run("List the 5 largest files in /tmp")
arun(task, *, thread_id=None)
async
¶
Primary async entry point.
Enters the resource lifecycle, runs the graph to completion, stores the result in long-term memory (if configured), and returns the final answer as a string.
When suspension is enabled and an approval is needed, raises
AgentSuspended with the thread_id to resume from.
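For example, a minimal suspend/resume sketch. my_llm and my_checkpointer are placeholders for any LangChain chat model and any LangGraph checkpointer:

.. code-block:: python

    import asyncio

    from gantrygraph import GantryEngine
    from gantrygraph.engine.engine import AgentSuspended

    async def main() -> None:
        agent = GantryEngine(
            llm=my_llm,
            enable_suspension=True,
            checkpointer=my_checkpointer,  # assumed: any LangGraph checkpointer
        )
        try:
            answer = await agent.arun("Clean up /tmp")
        except AgentSuspended as s:
            # Gather a human decision out-of-band, then resume the saved thread.
            answer = await agent.resume(s.thread_id, approved=True)
        print(answer)

    asyncio.run(main())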
astream_events(task)
async
¶
Stream GantryEvents as they are emitted during execution.
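For instance, assuming agent is a configured GantryEngine:

.. code-block:: python

    async for event in agent.astream_events("Summarise /tmp"):
        print(f"[step {event.step}] {event.event_type}")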
get_graph()
¶
Return the compiled LangGraph — the official escape hatch for loop customisation.
Use this when the default observe → think → act → review topology
does not fit your use case. The returned CompiledStateGraph is a
standard LangGraph object; you can call ainvoke, astream,
get_state, etc. on it directly.
Pattern A — inspect or stream the existing graph:

.. code-block:: python

    compiled = agent.get_graph()

    # Invoke directly with a custom initial state
    result = await compiled.ainvoke({
        "task": "my task",
        "messages": [],
        "step_count": 0,
        "is_done": False,
    })

    # Stream individual node outputs
    async for chunk in compiled.astream(initial_state):
        print(chunk)
Pattern B — build a fully custom loop using gantrygraph's node primitives:

.. code-block:: python

    from functools import partial

    from gantrygraph.engine import (
        act_node, observe_node, review_node,
        should_continue, think_node,
    )
    from gantrygraph.core.state import GantryState
    from langgraph.graph import END, START, StateGraph

    async def my_pre_act_hook(state: GantryState) -> dict:
        """Validate tool calls before execution."""
        return {}

    graph = StateGraph(GantryState)
    graph.add_node("observe", partial(observe_node, perception=None, event_cb=None))
    graph.add_node("think", partial(think_node, llm_with_tools=my_llm, event_cb=None))
    graph.add_node("pre_act", my_pre_act_hook)  # custom hook
    graph.add_node("act", partial(act_node, tool_map=tool_map,
                                  approval_cb=None, guardrail=None,
                                  event_cb=None, use_interrupt=False))
    graph.add_node("review", review_node)
    graph.add_edge(START, "observe")
    graph.add_edge("observe", "think")
    graph.add_edge("think", "pre_act")
    graph.add_edge("pre_act", "act")
    graph.add_edge("act", "review")
    graph.add_conditional_edges(
        "review",
        partial(should_continue, max_steps=30),
        {"observe": "observe", END: END},
    )
    compiled = graph.compile()
All node functions are exported from gantrygraph.engine and take their
configuration via keyword-only arguments (bound with functools.partial),
so they remain pure and testable in isolation.
resume(thread_id, *, approved=True)
async
¶
Resume a suspended agent run.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | The thread_id carried by the AgentSuspended exception. | required |
| approved | bool | Decision to pass back to the interrupted node. | True |

Returns:

| Type | Description |
|---|---|
| str | The agent's final answer after resuming. |
run(task, *, thread_id=None)
¶
Synchronous entry point. Blocks until the task completes.
AgentSuspended¶
gantrygraph.engine.engine.AgentSuspended(thread_id, data=None)
¶
Bases: Exception
Raised by arun() when the agent is suspended awaiting human approval.
Resume execution with GantryEngine.resume(thread_id, approved=True/False).
GantryConfig¶
gantrygraph.config.GantryConfig
¶
Bases: BaseModel
Full declarative configuration for a GantryEngine instance.
All fields have sensible defaults so you only need to set what you change.
Attributes:

| Name | Type | Description |
|---|---|---|
| max_steps | int | Hard upper bound on act-node executions. |
| system_prompt | str \| None | Optional extra system prompt prepended to every run. |
| workspace | str \| None | If set, attaches filesystem and shell tools locked to this directory. |
| shell_allowed_commands | list[str] \| None | Allowlist of commands the shell tool may run. |
| shell_timeout | float | Per-command wall-clock limit (seconds). |
| perception | Literal['none', 'desktop', 'web'] | Perception backend to attach. |
| browser_headless | bool | Run browser in headless mode. |
| memory | Literal['none', 'in_memory', 'chroma'] | Long-term memory backend. |
| memory_persist_directory | str \| None | On-disk path for the ChromaDB store. |
| memory_collection | str | ChromaDB collection name. |
| telemetry_service_name | str | OpenTelemetry service name. |
| telemetry_otlp_endpoint | str \| None | If set, enables OTel export to this OTLP gRPC endpoint (e.g. Grafana Alloy, Datadog agent, Jaeger). |
| enable_suspension | bool | Enable LangGraph interrupt-based HITL. |
| guardrail_requires_approval | list[str] | Tool names that require human approval before execution. |
| max_wall_seconds | float \| None | Wall-clock timeout for the entire run. |
build(llm, *, approval_callback=None, on_event=None, extra_tools=None)
¶
Assemble and return a configured GantryEngine.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| llm | Any | Any LangChain chat model. | required |
| approval_callback | Any | Optional HITL callback (as for GantryEngine's approval_callback). | None |
| on_event | Any | Optional event callback; merged with the OTel callback when telemetry export is enabled. | None |
| extra_tools | list[Any] \| None | Additional tools appended after the built-in set. | None |

Returns:

| Type | Description |
|---|---|
| GantryEngine | A ready-to-use GantryEngine. |
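For example, assembling an engine from an in-code config (a sketch; my_llm stands in for any LangChain chat model):

.. code-block:: python

    from gantrygraph.config import GantryConfig

    config = GantryConfig(max_steps=30, workspace="/app", memory="in_memory")
    agent = config.build(my_llm)
    print(agent.run("List the files in /app"))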
from_env(prefix='CLAW_')
classmethod
¶
Load config from environment variables.
Each field maps to {prefix}{FIELD_NAME_UPPERCASE}.
Example::

    CLAW_MAX_STEPS=30
    CLAW_WORKSPACE=/app
    CLAW_MEMORY=chroma
    CLAW_MEMORY_PERSIST_DIRECTORY=/data/memory
    CLAW_TELEMETRY_OTLP_ENDPOINT=http://collector:4317
    CLAW_ENABLE_SUSPENSION=true
    CLAW_GUARDRAIL_REQUIRES_APPROVAL=shell_run,file_delete
    CLAW_DESKTOP_MAX_RESOLUTION=1280,720
from_yaml(path)
classmethod
¶
Load config from a YAML file.
Requires pyyaml::

    pip install pyyaml

Example YAML::

    max_steps: 30
    workspace: /app
    memory: chroma
    memory_persist_directory: /data/memory
    telemetry_otlp_endpoint: http://localhost:4317
    guardrail_requires_approval:
      - shell_run
      - file_delete
State & Events¶
GantryState¶
gantrygraph.core.state.GantryState
¶
Bases: TypedDict
LangGraph state dict for the gantrygraph agent loop.
All graph nodes receive the full state and return a partial update dict.
The messages field uses the add_messages reducer, so a node can
append a new message by returning {"messages": [new_msg]} without
reading the current list first.
Fields¶
- task: The original task string passed to GantryEngine.run().
- messages: Full conversation history, auto-appended via the reducer.
- step_count: Number of act-node executions so far; used by the budget guard.
- is_done: Set to True by the review node to terminate the loop.
- last_error: Most recent tool error message (for self-correction context).
- last_observation: Raw PerceptionResult.model_dump() from the last observe node; stored so nodes can access it without re-capturing.
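For example, a custom node can append to the history by returning only the new message (a sketch):

.. code-block:: python

    from langchain_core.messages import HumanMessage

    from gantrygraph.core.state import GantryState

    async def annotate_node(state: GantryState) -> dict:
        # The add_messages reducer appends; no need to copy the full list.
        note = HumanMessage(content=f"Completed step {state['step_count']}.")
        return {"messages": [note]}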
GantryEvent¶
gantrygraph.core.events.GantryEvent(event_type, step, data=dict())
dataclass
¶
Emitted by GantryEngine at each state transition for observability.
Pass an on_event callback to GantryEngine to receive these events.
The callback may be either a plain function or an async coroutine.
Example::

    def my_logger(event: GantryEvent) -> None:
        print(f"[step {event.step}] {event.event_type}: {event.data}")

    agent = GantryEngine(..., on_event=my_logger)
PerceptionResult¶
gantrygraph.core.events.PerceptionResult
¶
Bases: BaseModel
Serialisable snapshot from any BasePerception.
Passed to the agent as a LangChain multimodal message via
to_message_content(). Pydantic is used here so the result
can be stored in GantryState.last_observation as a plain dict
via .model_dump().
to_message_content()
¶
Convert to LangChain multimodal message content blocks.
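A text-only sketch (the accessibility_tree field is used the same way in the BasePerception examples below):

.. code-block:: python

    from gantrygraph.core.events import PerceptionResult

    result = PerceptionResult(accessibility_tree="HTTP 200\nservice healthy")
    blocks = result.to_message_content()  # LangChain multimodal content blocks
    snapshot = result.model_dump()        # plain dict for GantryState.last_observation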
Base Classes¶
BasePerception¶
gantrygraph.core.base_perception.BasePerception
¶
Bases: ABC
Abstract base class for all perception sources.
Subclass this to add new ways for an agent to observe its environment (desktop screenshot, web page, terminal output, REST API, database state — anything that can be turned into text or an image).
The engine calls observe() once per loop iteration and attaches
the result as a multimodal HumanMessage before invoking the LLM.
If you return a screenshot_b64, the LLM receives a vision block;
if you return an accessibility_tree, it receives a text block; you
can return both at the same time.
Optionally override close() to release resources on shutdown (file
handles, network sockets, subprocesses). GantryEngine calls it
automatically at the end of every arun() / run() call.
Minimal example — observe a REST API:
.. code-block:: python

    import httpx

    from gantrygraph.core.base_perception import BasePerception
    from gantrygraph.core.events import PerceptionResult

    class APIStatusPerception(BasePerception):
        def __init__(self, url: str) -> None:
            self._url = url
            self._client = httpx.AsyncClient()

        async def observe(self) -> PerceptionResult:
            resp = await self._client.get(self._url)
            return PerceptionResult(
                accessibility_tree=f"HTTP {resp.status_code}\n{resp.text[:2000]}"
            )

        async def close(self) -> None:
            await self._client.aclose()

    agent = GantryEngine(
        llm=my_llm,
        perception=APIStatusPerception("https://api.example.com/status"),
    )

Terminal / subprocess example:

.. code-block:: python

    import asyncio

    from gantrygraph.core.base_perception import BasePerception
    from gantrygraph.core.events import PerceptionResult

    class TerminalPerception(BasePerception):
        async def observe(self) -> PerceptionResult:
            proc = await asyncio.create_subprocess_shell(
                "ps aux --sort=-%cpu | head -20",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.DEVNULL,
            )
            stdout, _ = await proc.communicate()
            return PerceptionResult(accessibility_tree=stdout.decode())

Combining multiple sources — use MultiPerception:

.. code-block:: python

    from gantrygraph.perception import MultiPerception, DesktopScreen, WebPage

    agent = GantryEngine(
        llm=my_llm,
        perception=MultiPerception([DesktopScreen(), WebPage("https://example.com")]),
    )
BaseAction¶
gantrygraph.core.base_action.BaseAction
¶
Bases: ABC
Abstract base class for an action set — a bundle of LangChain tools.
Subclass this to expose a group of related tools to the agent.
GantryEngine collects tools from all registered action sets,
flattens them into one registry, and binds them to the LLM via
llm.bind_tools().
Grouping tools into a BaseAction subclass is the recommended
pattern when your tools share state (e.g., an HTTP session, a DB
connection) or when you want them to appear as a logical unit.
For single stateless tools, use the @gantry_tool decorator instead.
Optionally override close() to release resources on shutdown.
GantryEngine calls it automatically at the end of every run.
Minimal example — HTTP client tools:
.. code-block:: python

    import httpx
    from langchain_core.tools import StructuredTool

    from gantrygraph.core.base_action import BaseAction

    class HTTPTools(BaseAction):
        def __init__(self, base_url: str) -> None:
            self._client = httpx.AsyncClient(base_url=base_url)

        def get_tools(self) -> list:
            async def http_get(path: str) -> str:
                """Make a GET request to *path* and return the response body."""
                r = await self._client.get(path)
                return r.text

            async def http_post(path: str, body: str) -> str:
                """POST *body* (JSON string) to *path*, return the response."""
                r = await self._client.post(path, content=body)
                return r.text

            return [
                StructuredTool.from_function(coroutine=http_get, name="http_get"),
                StructuredTool.from_function(coroutine=http_post, name="http_post"),
            ]

        async def close(self) -> None:
            await self._client.aclose()

    agent = GantryEngine(llm=my_llm, tools=[HTTPTools("https://api.example.com")])

Using @gantry_tool for simple stateless tools:

.. code-block:: python

    from gantrygraph import gantry_tool

    @gantry_tool
    def calculator(expression: str) -> str:
        """Evaluate a Python math expression and return the result."""
        return str(eval(expression))  # noqa: S307

    agent = GantryEngine(llm=my_llm, tools=[calculator])
BaseMCPConnector¶
gantrygraph.core.base_mcp.BaseMCPConnector
¶
Bases: ABC
Abstract base class for MCP server connections.
MCP connectors own a subprocess (or network connection) lifetime.
They must be used as async context managers: the server starts in
__aenter__ and shuts down in __aexit__.
GantryEngine enters all connectors automatically inside its
_lifecycle() context manager before building the graph, so user
code never needs to manage the lifecycle manually.
Implement this ABC when you need to wrap an MCP-compatible server that
is not covered by the built-in MCPClient.
Standalone usage (e.g., in scripts):
.. code-block:: python

    async with MCPClient("npx -y @mcp/github") as client:
        tools = client.get_tools()  # list[BaseTool]
        print([t.name for t in tools])

Passing to GantryEngine (lifecycle managed automatically):

.. code-block:: python

    agent = GantryEngine(
        llm=my_llm,
        tools=[MCPClient("npx -y @mcp/github")],
    )
    agent.run("Open a pull request")

Custom connector example (Python-based MCP server):

.. code-block:: python

    from types import TracebackType

    from langchain_core.tools import BaseTool, StructuredTool

    from gantrygraph.core.base_mcp import BaseMCPConnector

    class InProcessMCPConnector(BaseMCPConnector):
        """Wraps an in-process FastMCP server (no subprocess)."""

        def __init__(self) -> None:
            self._tools: list[BaseTool] = []

        async def __aenter__(self) -> "InProcessMCPConnector":
            # Spin up the in-process server and discover its tools.
            self._tools = [
                StructuredTool.from_function(
                    func=lambda x: x.upper(), name="shout",
                    description="Convert text to uppercase.",
                )
            ]
            return self

        async def __aexit__(
            self,
            exc_type: type[BaseException] | None,
            exc_val: BaseException | None,
            exc_tb: TracebackType | None,
        ) -> None:
            self._tools = []  # cleanup

        def get_tools(self) -> list[BaseTool]:
            return self._tools
__aenter__()
abstractmethod
async
¶
Start the MCP server subprocess and initialise the client session.
__aexit__(exc_type, exc_val, exc_tb)
abstractmethod
async
¶
Shut down the MCP server subprocess.
get_tools()
abstractmethod
¶
Return dynamically generated StructuredTool instances.
Only valid after __aenter__ has been called.
BaseMemory¶
gantrygraph.memory.base.BaseMemory
¶
Bases: ABC
Pluggable long-term memory backend.
Subclasses must implement add() and search().
All methods are async so implementations can use I/O-bound vector DBs.
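A minimal illustrative backend. The add()/search() signatures below are assumptions made for the sketch, not taken from the source; check the ABC's actual abstract methods before implementing:

.. code-block:: python

    from gantrygraph.memory.base import BaseMemory

    class ListMemory(BaseMemory):
        """Naive substring-match backend (illustrative only)."""

        def __init__(self) -> None:
            self._entries: list[str] = []

        # Assumed signatures; the real ABC may differ.
        async def add(self, text: str) -> None:
            self._entries.append(text)

        async def search(self, query: str, k: int = 5) -> list[str]:
            hits = [e for e in self._entries if query.lower() in e.lower()]
            return hits[:k]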
Security¶
GuardrailPolicy¶
gantrygraph.security.policies.GuardrailPolicy
¶
Bases: BaseModel
Configures which tools require explicit human approval before execution.
Pass this to GantryEngine together with an approval_callback to
gate dangerous operations.
Example::

    from gantrygraph.security import GuardrailPolicy

    policy = GuardrailPolicy(
        requires_approval={"shell_run", "file_delete"},
    )
    agent = GantryEngine(..., guardrail=policy,
                         approval_callback=my_slack_approval_fn)
WorkspacePolicy¶
gantrygraph.security.policies.WorkspacePolicy
¶
Bases: BaseModel
Restrict filesystem and shell operations to a specific directory.
Pass to GantryEngine via workspace_policy= to automatically add
FileSystemTools and ShellTool locked to workspace_path.
This is more declarative than listing the tools manually.
Example::

    from gantrygraph import GantryEngine
    from gantrygraph.security import WorkspacePolicy

    agent = GantryEngine(
        llm=my_llm,
        workspace_policy=WorkspacePolicy(workspace_path="/home/user/project"),
    )
    # Equivalent to:
    # GantryEngine(llm=..., tools=[FileSystemTools("/home/user/project"),
    #                              ShellTool("/home/user/project")])
Note
allow_read_outside and allow_write_outside are reserved for
future fine-grained enforcement. Currently the workspace boundary is
enforced at the tool level (path traversal blocked in FileSystemTools).
BudgetPolicy¶
gantrygraph.security.policies.BudgetPolicy
¶
Bases: BaseModel
Hard limits to prevent runaway costs and infinite loops.
Pass to GantryEngine via budget= to enforce spending limits.
Enforcement

- max_steps: caps GantryEngine.max_steps (whichever is lower wins).
- max_wall_seconds: wraps the full arun() call in asyncio.wait_for; raises TimeoutError on breach.
- max_tokens: stored but not currently enforced by gantrygraph. Configure token limits on the LLM itself (e.g. max_tokens= in ChatAnthropic).

Example::

    from gantrygraph import GantryEngine
    from gantrygraph.security import BudgetPolicy

    agent = GantryEngine(
        llm=my_llm,
        budget=BudgetPolicy(max_steps=30, max_wall_seconds=120.0),
    )
Swarm¶
GantrySupervisor¶
gantrygraph.swarm.supervisor.GantrySupervisor(*, llm, worker_factory=None, workers=None, max_workers=5)
¶
Decompose a task into parallel subtasks and synthesise the results.

The supervisor uses the LLM twice:

- Decompose — break the input task into N independent subtasks, optionally assigning each to a named specialist worker.
- Synthesise — merge all worker answers into a final response.

Worker agents run concurrently via asyncio.gather.
Homogeneous workers (original API — all workers share the same tools):
.. code-block:: python

    from gantrygraph.swarm import GantrySupervisor
    from gantrygraph import GantryEngine
    from langchain_anthropic import ChatAnthropic

    llm = ChatAnthropic(model="claude-sonnet-4-6")
    supervisor = GantrySupervisor(
        llm=llm,
        worker_factory=lambda: GantryEngine(llm=llm, tools=[...]),
        max_workers=4,
    )
    result = await supervisor.run("Analyse these 10 documents and summarise findings")
Heterogeneous workers (new API — each worker has its own tools):
.. code-block:: python

    from gantrygraph.swarm import GantrySupervisor, WorkerSpec
    from gantrygraph import GantryEngine
    from gantrygraph.actions import ShellTool, FileSystemTools
    from langchain_anthropic import ChatAnthropic

    llm = ChatAnthropic(model="claude-sonnet-4-6")
    supervisor = GantrySupervisor(
        llm=llm,
        workers=[
            WorkerSpec(
                name="shell_expert",
                engine=GantryEngine(llm=llm, tools=[ShellTool(workspace="/tmp")]),
                description="Runs shell commands and explores the filesystem.",
            ),
            WorkerSpec(
                name="file_editor",
                engine=GantryEngine(llm=llm, tools=[FileSystemTools(workspace="/tmp")]),
                description="Reads, writes, and edits files.",
            ),
        ],
    )
    result = await supervisor.run(
        "Find all .log files in /tmp and summarise their contents."
    )
When workers is provided the supervisor asks the LLM to assign
each subtask to the most appropriate specialist by name. If a
subtask is not assigned (or the name is unrecognised), it falls back
to the first worker in the list.
run(task)
async
¶
Decompose task, run workers concurrently, and synthesise results.
WorkerSpec¶
gantrygraph.swarm.worker.WorkerSpec(name, engine, description='')
dataclass
¶
Named specialist worker with its own pre-configured engine.
Use WorkerSpec when different subtasks require different tools,
LLMs, or configurations. Pass a list of WorkerSpec instances
to GantrySupervisor instead of a worker_factory.
The supervisor LLM reads all description fields and automatically
routes each decomposed subtask to the most appropriate specialist.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Short identifier used in routing (e.g. "shell_expert"). | required |
| engine | Any | A fully-configured GantryEngine. | required |
| description | str | One-sentence description of what this worker can do. Used by the supervisor LLM to route subtasks. | '' |
Example::

    from gantrygraph import GantryEngine
    from gantrygraph.actions import ShellTool, FileSystemTools
    from gantrygraph.swarm import GantrySupervisor, WorkerSpec
    from langchain_anthropic import ChatAnthropic

    llm = ChatAnthropic(model="claude-sonnet-4-6")
    supervisor = GantrySupervisor(
        llm=llm,
        workers=[
            WorkerSpec(
                name="shell_expert",
                engine=GantryEngine(llm=llm, tools=[ShellTool(workspace="/tmp")]),
                description="Runs shell commands, explores the filesystem, executes scripts.",
            ),
            WorkerSpec(
                name="file_editor",
                engine=GantryEngine(llm=llm, tools=[FileSystemTools(workspace="/tmp")]),
                description="Reads, writes, and edits files.",
            ),
        ],
    )
    result = await supervisor.run(
        "Explore /tmp, find all .log files, then read their first 10 lines."
    )
MCP¶
MCPClient¶
gantrygraph.mcp.client.MCPClient(server_command, env=None)
¶
Bases: BaseMCPConnector
Connect to an MCP server process and expose its tools as LangChain tools.
The server is started as a subprocess via stdio transport when the client
is used as an async context manager. GantryEngine manages this lifecycle
automatically.
Example — standalone::

    async with MCPClient("npx -y @modelcontextprotocol/server-filesystem /tmp") as c:
        tools = c.get_tools()
        result = await tools[0].ainvoke({"path": "/tmp"})

Example — with engine (lifecycle managed automatically)::

    agent = GantryEngine(
        ...,
        tools=[MCPClient("npx -y @mcp/github")],
    )
MCPToolRegistry¶
gantrygraph.mcp.registry.MCPToolRegistry(clients)
¶
Bases: BaseMCPConnector
Manage multiple MCPClient instances as a single pluggable unit.
Useful when an agent needs tools from several MCP servers — pass
the registry to GantryEngine instead of individual clients.
Example::

    registry = MCPToolRegistry([
        MCPClient("npx -y @mcp/github"),
        MCPClient("npx -y @mcp/sqlite ./db.sqlite"),
    ])
    agent = GantryEngine(..., tools=[registry])
Memory¶
InMemoryVector¶
gantrygraph.memory.in_memory.InMemoryVector()
¶
Bases: BaseMemory
Ephemeral in-process memory backed by trigram Jaccard similarity.
All entries are held in RAM and lost when the process exits. Safe for single-threaded asyncio use (no locking needed).
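A usage sketch (my_llm stands in for any LangChain chat model):

.. code-block:: python

    from gantrygraph.memory.in_memory import InMemoryVector

    agent = GantryEngine(llm=my_llm, memory=InMemoryVector())
    # Past results are recalled at the start of each run and the final
    # answer is stored automatically on completion.
    agent.run("Summarise today's log files")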
Telemetry¶
OTelExporter¶
gantrygraph.telemetry.otel.OTelExporter(service_name='gantry-agent', otlp_endpoint=None)
¶
Converts GantryEvent callbacks into OpenTelemetry spans.
Usage::

    exporter = OTelExporter(service_name="qa-agent")
    agent = GantryEngine(llm=..., on_event=exporter.as_event_callback())
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | OpenTelemetry service name for emitted spans. | 'gantry-agent' |
| otlp_endpoint | str \| None | gRPC endpoint for an OTLP collector (e.g. http://localhost:4317). | None |
as_event_callback()
¶
Return a GantryEvent callback that creates OTel spans.
The returned callback is stateful: the same instance tracks the root span across a full run. Do not share a single callback instance between concurrent agent runs.
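A sketch of the per-run pattern that note implies (my_llm is a placeholder):

.. code-block:: python

    exporter = OTelExporter(service_name="qa-agent")

    async def run_one(task: str) -> str:
        # A fresh callback per run keeps each run's root span separate.
        agent = GantryEngine(llm=my_llm, on_event=exporter.as_event_callback())
        return await agent.arun(task)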
force_flush(timeout_ms=5000)
¶
Flush pending spans to the exporter (call before process exit).
Decorators¶
gantry_tool¶
gantrygraph.tool.gantry_tool(fn=None, *, name=None, description=None)
¶
Decorate a function (sync or async) and return a LangChain BaseTool.
Can be used bare (@gantry_tool) or with keyword arguments
(@gantry_tool(name="x", description="y")).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| fn | Callable[..., Any] \| None | The function to wrap. Passed automatically when the decorator is used bare. Leave as None when calling with keyword arguments. | None |
| name | str \| None | Tool name visible to the LLM. Defaults to the function's __name__. | None |
| description | str \| None | Tool description visible to the LLM. Defaults to the function's docstring. Required if the function has no docstring. | None |

Returns:

| Type | Description |
|---|---|
| BaseTool \| Callable[[Callable[..., Any]], BaseTool] | A BaseTool (when the function is directly decorated), or a decorator factory (when keyword arguments are provided). |

Raises:

| Type | Description |
|---|---|
| ValueError | If neither a docstring nor an explicit description is provided. |
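Both decorator forms in one sketch:

.. code-block:: python

    from gantrygraph import gantry_tool

    @gantry_tool
    def word_count(text: str) -> str:
        """Count the words in *text*."""
        return str(len(text.split()))

    @gantry_tool(name="shout", description="Convert text to uppercase.")
    def make_upper(text: str) -> str:
        return text.upper()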
Graph primitives¶
These are exported from gantrygraph for custom graph topologies:
gantrygraph.engine.nodes.observe_node(state, *, perception, event_cb)
async
¶
Capture the current environment and append it as a HumanMessage.
gantrygraph.engine.nodes.think_node(state, *, llm_with_tools, event_cb)
async
¶
Invoke the LLM with the full message history and get the next action.
gantrygraph.engine.nodes.act_node(state, *, tool_map, approval_cb, guardrail, event_cb, use_interrupt=False)
async
¶
Execute tool calls from the last AIMessage.
For each tool call:

1. Check the guardrail approval list — pause and ask if required.
2. Locate the tool in the registry — return an error message if not found.
3. Execute the tool — catch all exceptions and return them as error messages so the LLM can self-correct on the next think step.
When use_interrupt is True and a checkpointer is configured on the
graph, tool calls that need approval use LangGraph's interrupt() to
suspend execution and persist state. Resume via
GantryEngine.resume(thread_id, approved=True).
gantrygraph.engine.nodes.review_node(state)
¶
Decide whether the task is complete.
Termination condition: the last message is an AIMessage with no tool calls, meaning the LLM chose to stop calling tools and produce a final answer. This is a pure function — no I/O.
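Roughly, the check is equivalent to this illustrative sketch (not the actual implementation):

.. code-block:: python

    from langchain_core.messages import AIMessage

    def review_sketch(state: dict) -> dict:
        last = state["messages"][-1]
        # Done when the LLM answered without requesting any tool calls.
        return {"is_done": isinstance(last, AIMessage) and not last.tool_calls}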
gantrygraph.engine.nodes.should_continue(state, *, max_steps)
¶
Conditional edge: loop back to observe, or terminate.
gantrygraph.engine.graph.build_graph(*, perception, llm_with_tools, tool_map, approval_cb, guardrail, event_cb, max_steps, memory=None, use_interrupt=False, checkpointer=None)
¶
Build and compile the gantrygraph agent StateGraph.
Nodes are bound to configuration via functools.partial so the node
functions themselves remain pure and testable without constructing a full
engine.
Graph structure::

    START → memory_recall → observe → think → act → review → should_continue
                                                                ↙         ↘
                                                            observe       END
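A usage sketch against the signature above; my_llm_with_tools and tool_map are placeholders assumed to exist:

.. code-block:: python

    from gantrygraph.engine.graph import build_graph

    compiled = build_graph(
        perception=None,
        llm_with_tools=my_llm_with_tools,
        tool_map=tool_map,
        approval_cb=None,
        guardrail=None,
        event_cb=None,
        max_steps=30,
    )
    result = await compiled.ainvoke(
        {"task": "my task", "messages": [], "step_count": 0, "is_done": False}
    )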