agentlattice — Python SDK
Governance-aware SDK for AI agents. Wraps agent actions with audit trails, approval gates, and circuit breaker integration.
Install
pip install agentlattice
For realtime event subscriptions: pip install agentlattice[realtime]
Method Inventory
Core
| Capability | Async | Sync | Details |
|---|---|---|---|
| Execute action | await al.execute("pr.open") | al.execute_sync("pr.open") | Reference |
| Gate (block until approved) | await al.gate("pr.merge") | al.gate_sync("pr.merge") | Reference |
Delegation
| Capability | Async | Sync | Details |
|---|---|---|---|
| Delegate sub-agent | async with al.delegate("reader", capabilities=[...], ttl=300) as sub: | with al.delegate_sync("reader", ...) as sub: | Reference |
| Parallel fan-out | async with AgentLattice.parallel(...) as [r1, r2]: | n/a | Reference |
| Execute as child | await sub.execute("read_data") | sub.execute_sync("read_data") | Reference |
| Gate as child | await sub.gate("read_data") | sub.gate_sync("read_data") | Reference |
| Chain delegation | async with sub.delegate("sub-reader", ...) as s: | with sub.delegate_sync(...) as s: | Reference |
Introspection & Governance
| Capability | Async | Sync | Details |
|---|---|---|---|
| Agent identity & state | await al.whoami() | al.whoami_sync() | Reference |
| List policies | await al.policies(scope="org") | al.policies_sync(scope="org") | Reference |
| Dry-run policy check | await al.can_i("pr.merge") | al.can_i_sync("pr.merge") | Reference |
| Query audit trail | await al.audit(limit=50) | al.audit_sync(limit=50) | Reference |
| Verify audit integrity | await al.verify() | al.verify_sync() | Reference |
| List delegations | await al.delegations(role="parent") | al.delegations_sync(role="parent") | Reference |
| Revoke delegation | await al.revoke("del-abc123") | al.revoke_sync("del-abc123") | Reference |
| Report action outcome | await al.report("evt-abc123", outcome) | al.report_sync("evt-abc123", outcome) | Reference |
| Governance posture score | await al.posture() | al.posture_sync() | Reference |
Realtime
| Capability | Usage | Details |
|---|---|---|
| Subscribe to events | await al.subscribe("org-id", callback) | Reference |
| Disconnect | await al.unsubscribe() | Reference |
Types & Errors
| Export | Description | Details |
|---|---|---|
| ActionResult | Return type of execute(): status, audit ID, conditions | Reference |
| ActionOptions | Options for execute() and gate(): data_accessed, metadata, event_id | Reference |
| ConditionResult | Individual policy rule evaluation: field, operator, expected, result | Reference |
| ReportOutcome | Outcome for report(): status, message | Reference |
| GovernanceEvent | Realtime event payload: event_type, agent_id, data | Reference |
| AgentLatticeError | Base error: code, message, details | Reference |
| AgentLatticeDeniedError | Policy denial or reviewer rejection: reason, policy, conditions | Reference |
| AgentLatticeTimeoutError | Approval window expired: approval_id | Reference |
Quickstart
import asyncio
import os

from agentlattice import AgentLattice

al = AgentLattice(api_key=os.environ["AL_API_KEY"])

async def main():
    # gate() blocks until approved; your agent code only runs if it passes
    await al.gate("pr.merge")
    await merge_pr(42)  # merge_pr is your own function

asyncio.run(main())
Handle denials and timeouts:
import asyncio
import os

from agentlattice import AgentLattice, AgentLatticeDeniedError, AgentLatticeTimeoutError

al = AgentLattice(api_key=os.environ["AL_API_KEY"])

async def main():
    try:
        await al.gate("code.commit")
    except AgentLatticeDeniedError as e:
        print(f"Denied: {e.reason}, policy: {e.policy}")
        print(f"Failed conditions: {e.conditions}")
    except AgentLatticeTimeoutError as e:
        print(f"Approval timed out: {e.approval_id}")

asyncio.run(main())
Sync usage
If you're not in an async context:
import os
from agentlattice import AgentLattice
al = AgentLattice(api_key=os.environ["AL_API_KEY"])
al.gate_sync("code.commit")
Core API
AgentLattice(api_key, *, base_url?, gate_poll_timeout_ms?, gate_poll_interval_ms?)
| Parameter | Default | Description |
|---|---|---|
| api_key | required | Bearer token. Pass via os.environ["AL_API_KEY"]; never hardcode it |
| base_url | https://www.agentlattice.io | Override for self-hosted or staging |
| gate_poll_timeout_ms | 8 * 60 * 60 * 1000 (8 hours) | How long gate() polls for approval |
| gate_poll_interval_ms | 5000 | Poll interval in ms |
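The two polling parameters act roughly as a deadline and a sleep interval. The sketch below illustrates that kind of loop under stated assumptions; it is not the SDK's implementation. `poll_until_decided` and `check_status` are hypothetical names, and the "pending" status string is an assumption made for the example.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def poll_until_decided(
    check_status: Callable[[], Awaitable[str]],
    timeout_ms: int = 8 * 60 * 60 * 1000,
    interval_ms: int = 5000,
) -> str:
    """Poll check_status() until it stops returning "pending" or the deadline passes.

    Hypothetical helper for illustration; the real gate() may behave differently.
    """
    deadline = time.monotonic() + timeout_ms / 1000
    while True:
        status = await check_status()
        if status != "pending":
            return status
        # Give up if the next poll would land after the deadline.
        if time.monotonic() + interval_ms / 1000 > deadline:
            raise TimeoutError("approval window expired")
        await asyncio.sleep(interval_ms / 1000)
```

A shorter interval surfaces a decision sooner at the cost of more requests; the timeout bounds how long the caller stays blocked.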
await al.execute(action_type, options?)
Submit an action and return immediately. Returns an ActionResult. Does not block.
import os

from agentlattice import AgentLattice, ActionOptions

al = AgentLattice(api_key=os.environ["AL_API_KEY"])

result = await al.execute("pr.open", ActionOptions(
    data_accessed=[{"type": "source_code", "count": 5, "sensitivity": "low"}],
    metadata={"pr_number": 42},
    event_id="my-idempotency-key",  # optional: dedup by your own key
))

if result.status == "executed":
    print(f"Auto-executed under policy: {result.policy_name}")
elif result.status == "requested":
    print(f"Awaiting approval: {result.approval_id}")
elif result.status == "denied":
    print(f"Denied ({result.denial_reason}): {result.conditions_evaluated}")
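Because event_id deduplicates by a caller-supplied key, one option is to derive that key deterministically from the action itself, so a retried call reuses the same value. The sketch below assumes event_id accepts an arbitrary string; `make_event_id` is a hypothetical helper, not part of the SDK.

```python
import hashlib
import json
from typing import Any


def make_event_id(action_type: str, metadata: dict[str, Any]) -> str:
    """Derive a deterministic key from the action type and its metadata.

    Hypothetical helper; assumes event_id accepts any stable string.
    """
    # sort_keys makes the JSON form independent of dict insertion order.
    canonical = json.dumps({"action": action_type, "metadata": metadata}, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:32]
```

The result could then be passed as `ActionOptions(event_id=make_event_id("pr.open", {"pr_number": 42}))`. Two logically distinct actions with identical metadata would collide under this scheme, so include whatever distinguishes them.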
await al.gate(action_type, options?)
Submit an action and block until it is approved or denied.
try:
    await al.gate("pr.merge")
except AgentLatticeDeniedError as e:
    # e.reason: "CONDITIONS_DENIED" | "POLICY_TAMPERED" | "DENIED_BY_REVIEWER"
    # e.policy: name of the governing policy (if conditions-based)
    # e.conditions: list of ConditionResult (which rule failed and why)
    # e.approval_id: set if a human reviewer denied it
    ...
except AgentLatticeTimeoutError as e:
    # e.approval_id: the approval that timed out
    ...
Sync: al.gate_sync(action_type, options?) / al.execute_sync(action_type, options?)
ActionResult
@dataclass
class ActionResult:
    status: str                                  # "executed" | "requested" | "denied" | ...
    audit_event_id: str
    approval_id: str | None                      # set when status="requested"
    message: str | None
    timeout_at: str | None
    denial_reason: str | None                    # "CONDITIONS_DENIED" | "POLICY_TAMPERED" | ...
    policy_name: str | None                      # governing policy name
    conditions_evaluated: list[ConditionResult]  # which rules passed/failed
ConditionResult
@dataclass
class ConditionResult:
field: str # e.g. "pr_size"
operator: str # e.g. "lt"
expected: Any # e.g. 100
result: bool # True = passed, False = failed
Agents can read conditions_evaluated to understand exactly which governance
rule blocked them and adapt their behavior accordingly.
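As a minimal sketch of reading conditions_evaluated, the helper below turns failed conditions into short messages an agent could log or feed back into its planning. The `ConditionResult` class here is a local stand-in that mirrors the documented fields so the example runs on its own, and `describe_failures` is a hypothetical name.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ConditionResult:  # local stand-in mirroring the fields documented above
    field: str
    operator: str
    expected: Any
    result: bool


def describe_failures(conditions: list[ConditionResult]) -> list[str]:
    """Return one human-readable line per failed condition."""
    return [
        f"{c.field} must be {c.operator} {c.expected}"
        for c in conditions
        if not c.result
    ]
```

With the SDK, the same function could be applied to `result.conditions_evaluated` or to `e.conditions` on a denial.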
Delegation — Scoped Sub-Agents
Create short-lived child agents with narrowed capabilities. Cleanup is automatic.
async with al.delegate("data-processor",
    capabilities=["read_data", "write_results"],
    ttl=300
) as sub:
    await sub.execute("read_data")
    result = await sub.execute("write_results")
Sync version:
with al.delegate_sync("data-processor",
    capabilities=["read_data"], ttl=300
) as sub:
    sub.execute_sync("read_data")
al.delegate(name, *, capabilities, ttl)
| Parameter | Description |
|---|---|
| name | Display name for the ephemeral child agent |
| capabilities | List of action types the child can perform (must be subset of parent's) |
| ttl | Time-to-live in seconds (max 86400 = 24 hours) |
Returns an async context manager yielding a DelegatedAgent. Cleanup runs
in __aexit__ regardless of exceptions.
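The two constraints in the table (capabilities must be a subset of the parent's, ttl at most 86400 seconds) can be checked locally before a delegation is requested. This is a sketch only: `check_delegation` is a hypothetical helper, and whether the SDK performs any client-side validation is not stated here.

```python
MAX_TTL_SECONDS = 86400  # 24 hours, the documented maximum


def check_delegation(parent_caps: set[str], child_caps: list[str], ttl: int) -> None:
    """Raise ValueError if a delegation request would break the documented rules."""
    extra = sorted(set(child_caps) - parent_caps)
    if extra:
        raise ValueError(f"capabilities not held by parent: {extra}")
    # Treating ttl <= 0 as invalid is an assumption, not documented behaviour.
    if not 0 < ttl <= MAX_TTL_SECONDS:
        raise ValueError(f"ttl must be between 1 and {MAX_TTL_SECONDS} seconds")
```

Failing fast like this gives a clearer error than a rejected request, but the server remains the authority on what a parent may delegate.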
DelegatedAgent
| Method | Description |
|---|---|
| sub.execute(action_type, options?) | Execute an action as the child |
| sub.gate(action_type, options?) | Execute + block until approved |
| sub.delegate(name, *, capabilities, ttl) | Chain further with narrower scope |
All methods have _sync variants.
Parallel fan-out
async with AgentLattice.parallel(
    al.delegate("reader-1", capabilities=["read_data"], ttl=60),
    al.delegate("reader-2", capabilities=["read_data"], ttl=60),
) as [r1, r2]:
    result_1 = await r1.execute("read_data")
    result_2 = await r2.execute("read_data")
See docs/ephemeral-agents.md for architecture details, security model, and cleanup guarantees.
Introspection & Governance
These methods give agents visibility into their own identity, policies, audit trails, and governance posture. Every method has a *_sync() variant.
await al.whoami()
Returns this agent's identity, active policies, and delegation relationships.
info = await al.whoami()
print(f"Agent: {info.name} ({info.config_id})")
print(f"Circuit breaker: {info.cb_state}")
print(f"Policies: {[p.name for p in info.policies]}")
await al.policies(scope?)
List policies. scope="own" (default) returns policies for this agent. scope="org" returns all org policies.
result = await al.policies(scope="org")
for p in result.policies:
    print(f"{p.name}: {p.action_type} (approval={p.approval_required})")
await al.can_i(action_type)
Dry-run policy check. No audit event created, no approval requested.
check = await al.can_i("pr.merge")
if check.allowed:
    print("Good to go")
elif check.needs_approval:
    print(f"Will need approval under policy: {check.policy_name}")
await al.audit(cursor?, limit?, action_type?)
Query the audit trail with pagination.
page = await al.audit(limit=50, action_type="pr.merge")
for event in page.events:
    print(f"{event.timestamp}: {event.action_type} -> {event.status}")
if page.next_cursor:
    next_page = await al.audit(cursor=page.next_cursor)
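To walk the whole trail rather than a single page, the cursor can be followed in a loop. The sketch below is independent of the SDK so it runs on its own: `collect_all` is a hypothetical helper, and a page is modelled as an `(events, next_cursor)` tuple instead of the object that audit() returns.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

# A page is modelled as (events, next_cursor). The SDK returns an object with
# .events and .next_cursor, so a real fetch function would unpack those.
Page = tuple[list[Any], Optional[str]]


async def collect_all(fetch: Callable[[Optional[str]], Awaitable[Page]]) -> list[Any]:
    """Follow next_cursor until it is empty and return every event."""
    events: list[Any] = []
    cursor: Optional[str] = None
    while True:
        page_events, cursor = await fetch(cursor)
        events.extend(page_events)
        if not cursor:
            return events
```

With the SDK, `fetch` might wrap `al.audit(cursor=cursor, limit=50)` and return `(page.events, page.next_cursor)`, assuming audit() accepts `cursor=None` for the first page. For long trails, processing each page as it arrives avoids holding everything in memory.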
await al.verify()
Verify the org's audit chain integrity (tamper detection).
result = await al.verify()
if not result.chain_valid:
    print(f"Chain broken at row: {result.first_broken_at_row_id}")
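How the audit chain is constructed is not described here. For intuition only, a common tamper-evidence technique is a hash chain, where each row's hash covers its own payload plus the previous row's hash, so changing any row invalidates that row's stored hash. The sketch below shows that generic technique; the row layout and function names are assumptions and may not match AgentLattice's scheme.

```python
import hashlib
from typing import Optional


def row_hash(prev_hash: str, payload: str) -> str:
    """Hash a row's payload together with the previous row's hash."""
    return hashlib.sha256(f"{prev_hash}|{payload}".encode("utf-8")).hexdigest()


def build_chain(payloads: list[str]) -> list[dict]:
    """Build rows of the form {"id", "payload", "hash"} linked by hash."""
    rows, prev = [], ""
    for i, payload in enumerate(payloads):
        prev = row_hash(prev, payload)
        rows.append({"id": i, "payload": payload, "hash": prev})
    return rows


def first_broken_row(rows: list[dict]) -> Optional[int]:
    """Return the id of the first row whose hash does not match, else None."""
    prev = ""
    for row in rows:
        if row_hash(prev, row["payload"]) != row["hash"]:
            return row["id"]
        prev = row["hash"]
    return None
```

Verification recomputes every hash from the start, which is why a result such as first_broken_at_row_id can point at the earliest inconsistent row.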
await al.delegations(role?, active_only?)
List delegations. Filter by role="parent" or role="child", and pass active_only=False to include expired delegations.
result = await al.delegations(role="parent", active_only=False)
for d in result.delegations:
    print(f"{d.id}: active={d.active}, expires={d.expires_at}")
await al.revoke(delegation_id)
Revoke a delegation by ID.
result = await al.revoke("del-abc123")
assert result.revoked
await al.report(audit_event_id, outcome)
Report the outcome of a previously executed action. Closes the audit loop.
from agentlattice import ReportOutcome
result = await al.report(
    "evt-abc123",
    ReportOutcome(status="success", message="Deployed to prod"),
)
print(f"Reported at: {result.reported_at}")
Outcome statuses: "success", "failure", "partial".
await al.posture()
Get the org's governance posture score (0-100).
result = await al.posture()
print(f"Score: {result.score}/100")
for name, comp in result.components.items():
    print(f"  {name}: {comp.score}/{comp.max}")
Realtime Events
Subscribe to governance events via WebSocket. Requires pip install agentlattice[realtime].
import os

from agentlattice import AgentLattice, GovernanceEvent

al = AgentLattice(api_key=os.environ["AL_API_KEY"])

def on_event(event: GovernanceEvent):
    print(f"[{event.event_type}] agent={event.agent_id} data={event.data}")

await al.subscribe("org-id", on_event)

# Filter to specific event types
await al.subscribe("org-id", on_event, events=["action.denied", "anomaly.detected"])

# Filter to a specific agent
await al.subscribe("org-id", on_event, agent_filter="agent-123")

# Cleanup
await al.unsubscribe()
Event types: action.denied, action.executed, delegation.expired, delegation.revoked, policy.changed, anomaly.detected, enforcement.triggered.
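When one callback has to handle several of these event types, a small dispatcher keeps the routing in one place. In the sketch below, `GovernanceEvent` is a local stand-in with the documented fields so the example runs without the SDK, and `make_dispatcher` is a hypothetical helper.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class GovernanceEvent:  # local stand-in with the documented fields
    event_type: str
    agent_id: str
    data: dict[str, Any] = field(default_factory=dict)


Handler = Callable[[GovernanceEvent], None]


def make_dispatcher(handlers: dict[str, Handler]) -> Handler:
    """Return a callback that routes each event to the handler for its type."""
    def on_event(event: GovernanceEvent) -> None:
        handler = handlers.get(event.event_type)
        if handler is not None:
            handler(event)  # event types without a handler are ignored
    return on_event
```

The returned `on_event` has the same shape as the callback passed to subscribe(), so it could be used there directly.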
Patterns
Self-correcting agent
conditions_evaluated is what makes this more than a compliance layer — it
closes the feedback loop so agents can adapt, not just fail.
import os

from agentlattice import AgentLattice, AgentLatticeDeniedError, ActionOptions

al = AgentLattice(api_key=os.environ["AL_API_KEY"])

async def commit_with_governance(files: list[str]) -> None:
    try:
        await al.gate("pr.open", ActionOptions(
            metadata={"pr_size": len(files), "repo": "acme/backend"},
        ))
        await open_pr(files)
    except AgentLatticeDeniedError as e:
        if e.reason == "CONDITIONS_DENIED":
            # Find which rules failed and why
            failed = [c for c in e.conditions if not c.result]
            # e.g. [ConditionResult(field="pr_size", operator="lt", expected=50, result=False)]
            for rule in failed:
                # Only split when there is more than one file, so the recursion terminates
                if rule.field == "pr_size" and len(files) > 1:
                    # Policy says PRs must be < 50 files: split and retry
                    mid = len(files) // 2
                    await commit_with_governance(files[:mid])
                    await commit_with_governance(files[mid:])
                    return
        # Reviewer denial, policy tamper, or an unrecoverable condition: no retry
        raise
Error Hierarchy
AgentLatticeError              base: code, message, details
  AgentLatticeDeniedError      approval_id?, reason?, policy?, conditions?
  AgentLatticeTimeoutError     approval_id
Reliability
execute() and gate() retry automatically on 5xx and network errors — up to 3 attempts with 1s/2s/4s exponential backoff. Each request has a 30-second timeout. Pass event_id to make retries idempotent.
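The retry behaviour described above follows the usual exponential backoff pattern. The sketch below shows that pattern in isolation and is not the SDK's code: `retry_with_backoff` is a hypothetical helper, retryable failures are simplified to `ConnectionError`, and the `sleep` parameter exists only so the delays can be observed without waiting.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    call: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Retry call() on ConnectionError, doubling the delay after each failure."""
    for attempt in range(attempts):
        try:
            return await call()
        except ConnectionError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            await sleep(base_delay * 2 ** attempt)
    raise ValueError("attempts must be at least 1")
```

Retrying is only safe when a repeated request cannot apply the action twice, which is the role event_id plays.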
execute_sync() and gate_sync() are safe in async runtimes (LangGraph, FastAPI, CrewAI) — they detect a running event loop and dispatch to a thread pool automatically.
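Detecting a running event loop and falling back to a worker thread can be done with the standard library alone. The sketch below illustrates that general approach under stated assumptions; `run_sync` is a hypothetical helper and the SDK's internals may differ.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Coroutine, TypeVar

T = TypeVar("T")


def run_sync(make_coro: Callable[[], Coroutine[Any, Any, T]]) -> T:
    """Run a coroutine to completion from synchronous code.

    If an event loop is already running in this thread, asyncio.run() would
    raise, so the coroutine is run on a fresh loop in a worker thread instead.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(make_coro())  # no loop running: run directly
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(lambda: asyncio.run(make_coro())).result()
```

In this sketch the calling thread still blocks on `.result()`, so when called from inside a coroutine it stalls that event loop until the work finishes. Prefer the async methods when already in async code.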
License
MIT