terminal

Python SDK

Async-first Python SDK for building AI agents with persistent memory, local SQLite storage, cloud sync, and full type hints.

pip install rekall-agent-sdk (requires Python 3.9+)

Installation

pip install rekall-agent-sdk

Requirements

Python 3.9 or higher is required. The SDK depends on aiohttp, aiosqlite, and pydantic. All dependencies are installed automatically.

Initialization

Create a RekallAgent instance and call initialize() to set up local storage and connect to the cloud.

agent.py
import os
from rekall import RekallAgent
agent = RekallAgent(
api_key=os.environ["REKALL_API_KEY"],
agent_id="my-assistant",
storage_path="./data/rekall.db",
sync_enabled=True,
sync_interval_seconds=30,
sync_strategy="last-write-wins",
base_url="https://api.rekall.ai/v1", # Default
)
await agent.initialize()
# When done, shut down cleanly
await agent.shutdown()

Environment variables

If api_key is not provided, the SDK reads from the REKALL_API_KEY environment variable automatically.

Creating Memories

Store memories locally and sync them to the cloud. All seven memory types are supported.

Episodic memory
memory = await agent.memories.create(
type="episodic",
content="User asked about quarterly revenue trends for Q3 2024.",
context="agent",
importance=0.8,
metadata={
"conversation_id": "conv_abc123",
"user_id": "user_456",
"tags": ["finance", "revenue", "Q3"],
},
)
print(memory.id) # "mem_..."
print(memory.created_at) # datetime object
print(memory.sync_status) # "pending" | "synced" | "conflict"
Semantic memory with entities
memory = await agent.memories.create(
type="semantic",
content="Acme Corp is a Fortune 500 company headquartered in San Francisco.",
entities=[
{"name": "Acme Corp", "type": "organization"},
{"name": "San Francisco", "type": "location"},
],
relationships=[
{
"source": "Acme Corp",
"target": "San Francisco",
"type": "headquartered_in",
},
],
)
Procedural memory
memory = await agent.memories.create(
type="procedural",
content="Deploy to production workflow",
steps=[
{"order": 1, "instruction": "Run the test suite", "command": "pytest"},
{"order": 2, "instruction": "Build the package", "command": "python -m build"},
{"order": 3, "instruction": "Deploy to staging", "command": "deploy --env staging"},
{"order": 4, "instruction": "Run smoke tests", "command": "pytest tests/smoke/"},
{"order": 5, "instruction": "Promote to production", "command": "deploy --env prod"},
],
)

Searching Memories

Semantic search with optional type, context, and metadata filters.

Semantic search
results = await agent.memories.search(
query="What do we know about the customer onboarding process?",
type="procedural",
context="agent",
limit=10,
threshold=0.7,
)
for result in results:
print(result.memory.content)
print(f"Score: {result.score:.2f}")
print(f"Source: {result.source}") # "local" or "cloud"
Filtered search
results = await agent.memories.search(
query="revenue",
filters={
"tags": {"contains": "finance"},
"created_after": "2024-06-01T00:00:00Z",
"importance": {"gte": 0.5},
},
sort="relevance", # "relevance" | "recency" | "importance"
)

Entity Management

Create, query, and manage entities and relationships in the knowledge graph.

# Create an entity
entity = await agent.entities.create(
name="Acme Corp",
type="organization",
attributes={
"industry": "Technology",
"founded": 2010,
"website": "https://acme.example.com",
},
)
# Create a relationship
await agent.entities.relate(
source_id=entity.id,
target_id="ent_person_456",
type="employs",
attributes={"role": "CEO", "since": "2018-01-01"},
)
# Query the knowledge graph
graph = await agent.entities.graph(
root_id=entity.id,
depth=2,
types=["employs", "headquartered_in", "acquired"],
)
for node in graph.nodes:
print(f"{node.name} ({node.type})")
for edge in graph.edges:
print(f"{edge.source} --{edge.type}--> {edge.target}")
# List entities by type
orgs = await agent.entities.list(type="organization", limit=50)

Agent Patterns

Common patterns for using the SDK in agent workflows.

Spawn and message child agents
# Spawn a child agent
child = await agent.spawn(
agent_id="research-assistant",
config={
"model": "claude-3-opus",
"system_prompt": "You are a research assistant...",
},
memory={"context": "agent", "inherit_parent": True},
)
# Send a message to the child
response = await agent.send(
child.agent_id,
message={
"type": "task",
"payload": {"query": "Research latest AI safety papers"},
},
)
# Terminate the child
await agent.terminate(
child.agent_id,
preserve_memories=True,
)
Conversation loop with memory
async def handle_message(user_message: str) -> str:
    """Answer a user message using stored memories as context.

    Retrieves memories relevant to the message, feeds them to an LLM
    (via ``generate_response``, defined elsewhere), stores the exchange
    as a new episodic memory, and returns the assistant's reply.
    """
    # Search for relevant context
    context = await agent.memories.search(
        query=user_message,
        limit=5,
        threshold=0.6,
    )
    # Build prompt with memory context
    memory_context = "\n".join(
        f"- {r.memory.content}" for r in context
    )
    # Generate response (using your LLM of choice)
    response = await generate_response(
        system=f"Relevant memories:\n{memory_context}",
        user=user_message,
    )
    # Store the interaction as a memory
    await agent.memories.create(
        type="episodic",
        content=f"User: {user_message}\nAssistant: {response}",
        importance=0.6,
        metadata={"turn": "conversation"},
    )
    return response

Async / Await Usage

The SDK is async-first. All I/O methods are coroutines and must be awaited. Use asyncio to run your agent.

Running the agent
import asyncio

from rekall import RekallAgent


async def main() -> None:
    """Initialize the agent, create and search a memory, then shut down."""
    agent = RekallAgent(
        agent_id="my-assistant",
        storage_path="./rekall.db",
    )
    await agent.initialize()
    try:
        memory = await agent.memories.create(
            type="episodic",
            content="Agent started successfully.",
        )
        print(f"Created memory: {memory.id}")
        results = await agent.memories.search(query="agent started")
        print(f"Found {len(results)} results")
    finally:
        # Always release local storage, even if an operation above raised.
        await agent.shutdown()


asyncio.run(main())
Concurrent operations
import asyncio
# Run multiple searches concurrently
results = await asyncio.gather(
agent.memories.search(query="revenue trends"),
agent.memories.search(query="customer feedback"),
agent.entities.list(type="organization"),
)
revenue_results, feedback_results, org_entities = results

Context Managers

Use the agent as an async context manager for automatic initialization and cleanup.

import asyncio  # needed for asyncio.run below

from rekall import RekallAgent


async def main() -> None:
    """Run the agent inside an async context manager."""
    async with RekallAgent(
        agent_id="my-assistant",
        storage_path="./rekall.db",
    ) as agent:
        # Agent is initialized and ready
        memory = await agent.memories.create(
            type="episodic",
            content="Created within context manager.",
        )
        print(f"Memory: {memory.id}")
    # Agent is automatically shut down here


asyncio.run(main())

Recommended pattern

Using the context manager ensures that local storage is properly closed and pending sync operations are flushed, even if an exception occurs.

Type Hints

The SDK uses Pydantic models for all inputs and outputs. Full type hints are provided for IDE autocompletion and static analysis with mypy or pyright.

Key types
from rekall.types import (
    # Core types
    Memory,
    MemoryType,  # Literal["episodic", "semantic", "procedural", ...]
    MemoryContext,  # Literal["agent", "user", "hive"]
    MemoryCreateInput,
    MemorySearchInput,
    MemorySearchResult,
    # Entity types
    Entity,
    EntityCreateInput,
    Relationship,
    GraphQueryResult,
    # Agent types
    AgentConfig,
    AgentStatus,
    SpawnOptions,
    # Sync types
    SyncStatus,
    SyncStrategy,  # Literal["last-write-wins", "cloud-wins", "local-wins", "manual"]
)
Memory model
from datetime import datetime
from typing import Any, Literal  # Any and Literal are used in the fields below

from pydantic import BaseModel


class Memory(BaseModel):
    """A stored memory record as returned by the SDK.

    MemoryType, MemoryContext, Entity, and Relationship come from
    ``rekall.types``.
    """

    id: str
    type: MemoryType
    content: str
    context: MemoryContext
    importance: float
    metadata: dict[str, Any]
    entities: list[Entity]
    relationships: list[Relationship]
    created_at: datetime
    updated_at: datetime
    accessed_at: datetime
    access_count: int
    decay_factor: float
    sync_status: Literal["pending", "synced", "conflict"]

Error Handling

The SDK raises typed exceptions for all API and storage errors.

from rekall.exceptions import (
RekallError,
RekallAuthError,
RekallNotFoundError,
RekallValidationError,
RekallRateLimitError,
RekallServerError,
)
try:
memory = await agent.memories.get("mem_nonexistent")
except RekallNotFoundError as e:
print(f"Not found: {e.resource_id}")
except RekallRateLimitError as e:
print(f"Rate limited. Retry after {e.retry_after_seconds}s")
await asyncio.sleep(e.retry_after_seconds)
except RekallAuthError:
print("Invalid API key")
except RekallError as e:
print(f"API error {e.status_code}: {e.code} - {e.message}")

Offline error handling

When operating offline, API errors are not raised for write operations — the SDK queues changes locally. Any errors surface during the next sync attempt via the on_sync_error callback.

Progressive Disclosure (Infinite Context)

The Python SDK provides the same progressive disclosure pattern for token-efficient memory retrieval.

# Step 1: Get lightweight index (summaries only, capped by token_budget)
index = await agent.recall_index("project requirements", token_budget=2000)
print(f"Found {len(index.entries)} memories using {index.index_tokens} tokens")
# Step 2: Select relevant memories by score
relevant_ids = [e.id for e in index.entries if e.score > 0.7]
# Step 3: Load full content only for the selected ids
memories = await agent.recall_full(relevant_ids)
# Or use smart_recall for automatic handling of steps 1-3
memories = await agent.smart_recall(
"project requirements",
token_budget=4000,
progressive=True,
)
# Capture observations for future retrieval
# NOTE(review): file_content is not defined in this snippet — it stands in
# for data obtained elsewhere (e.g. the output of a file_read tool call).
await agent.observe(
content=file_content,
tool_name="file_read",
tags=["config", "deployment"],
)
Rekall
rekall