Python SDK
Async-first Python SDK for building AI agents with persistent memory, local SQLite storage, cloud sync, and full type hints.
pip install rekall-agent-sdk (Python 3.9+)
Installation
pip install rekall-agent-sdk
Requirements
Python 3.9 or higher is required. The SDK depends on aiohttp, aiosqlite, and pydantic. All dependencies are installed automatically.
Initialization
Create a RekallAgent instance and call initialize() to set up local storage and connect to the cloud.
import osfrom rekall import RekallAgentagent = RekallAgent(api_key=os.environ["REKALL_API_KEY"],agent_id="my-assistant",storage_path="./data/rekall.db",sync_enabled=True,sync_interval_seconds=30,sync_strategy="last-write-wins",base_url="https://api.rekall.ai/v1", # Default)await agent.initialize()# When done, shut down cleanlyawait agent.shutdown()
Environment variables
If api_key is not provided, the SDK reads from the REKALL_API_KEY environment variable automatically.
Creating Memories
Store memories locally and sync them to the cloud. All seven memory types are supported.
memory = await agent.memories.create(type="episodic",content="User asked about quarterly revenue trends for Q3 2024.",context="agent",importance=0.8,metadata={"conversation_id": "conv_abc123","user_id": "user_456","tags": ["finance", "revenue", "Q3"],},)print(memory.id) # "mem_..."print(memory.created_at) # datetime objectprint(memory.sync_status) # "pending" | "synced" | "conflict"
memory = await agent.memories.create(type="semantic",content="Acme Corp is a Fortune 500 company headquartered in San Francisco.",entities=[{"name": "Acme Corp", "type": "organization"},{"name": "San Francisco", "type": "location"},],relationships=[{"source": "Acme Corp","target": "San Francisco","type": "headquartered_in",},],)
memory = await agent.memories.create(type="procedural",content="Deploy to production workflow",steps=[{"order": 1, "instruction": "Run the test suite", "command": "pytest"},{"order": 2, "instruction": "Build the package", "command": "python -m build"},{"order": 3, "instruction": "Deploy to staging", "command": "deploy --env staging"},{"order": 4, "instruction": "Run smoke tests", "command": "pytest tests/smoke/"},{"order": 5, "instruction": "Promote to production", "command": "deploy --env prod"},],)
Searching Memories
Semantic search with optional type, context, and metadata filters.
results = await agent.memories.search(query="What do we know about the customer onboarding process?",type="procedural",context="agent",limit=10,threshold=0.7,)for result in results:print(result.memory.content)print(f"Score: {result.score:.2f}")print(f"Source: {result.source}") # "local" or "cloud"
results = await agent.memories.search(query="revenue",filters={"tags": {"contains": "finance"},"created_after": "2024-06-01T00:00:00Z","importance": {"gte": 0.5},},sort="relevance", # "relevance" | "recency" | "importance")
Entity Management
Create, query, and manage entities and relationships in the knowledge graph.
# Create an entityentity = await agent.entities.create(name="Acme Corp",type="organization",attributes={"industry": "Technology","founded": 2010,"website": "https://acme.example.com",},)# Create a relationshipawait agent.entities.relate(source_id=entity.id,target_id="ent_person_456",type="employs",attributes={"role": "CEO", "since": "2018-01-01"},)# Query the knowledge graphgraph = await agent.entities.graph(root_id=entity.id,depth=2,types=["employs", "headquartered_in", "acquired"],)for node in graph.nodes:print(f"{node.name} ({node.type})")for edge in graph.edges:print(f"{edge.source} --{edge.type}--> {edge.target}")# List entities by typeorgs = await agent.entities.list(type="organization", limit=50)
Agent Patterns
Common patterns for using the SDK in agent workflows.
# Spawn a child agentchild = await agent.spawn(agent_id="research-assistant",config={"model": "claude-3-opus","system_prompt": "You are a research assistant...",},memory={"context": "agent", "inherit_parent": True},)# Send a message to the childresponse = await agent.send(child.agent_id,message={"type": "task","payload": {"query": "Research latest AI safety papers"},},)# Terminate the childawait agent.terminate(child.agent_id,preserve_memories=True,)
async def handle_message(user_message: str) -> str:
    """Answer a user message using memory context, then record the turn."""
    # Search for relevant context
    context = await agent.memories.search(
        query=user_message,
        limit=5,
        threshold=0.6,
    )

    # Build prompt with memory context
    memory_context = "\n".join(f"- {r.memory.content}" for r in context)

    # Generate response (using your LLM of choice)
    response = await generate_response(
        system=f"Relevant memories:\n{memory_context}",
        user=user_message,
    )

    # Store the interaction as a memory
    await agent.memories.create(
        type="episodic",
        content=f"User: {user_message}\nAssistant: {response}",
        importance=0.6,
        metadata={"turn": "conversation"},
    )

    return response
Async / Await Usage
The SDK is async-first. All I/O methods are coroutines and must be awaited. Use asyncio to run your agent.
import asyncio

from rekall import RekallAgent

async def main():
    agent = RekallAgent(
        agent_id="my-assistant",
        storage_path="./rekall.db",
    )
    await agent.initialize()
    try:
        memory = await agent.memories.create(
            type="episodic",
            content="Agent started successfully.",
        )
        print(f"Created memory: {memory.id}")

        results = await agent.memories.search(query="agent started")
        print(f"Found {len(results)} results")
    finally:
        # Always shut down so pending sync work is flushed.
        await agent.shutdown()

asyncio.run(main())
import asyncio# Run multiple searches concurrentlyresults = await asyncio.gather(agent.memories.search(query="revenue trends"),agent.memories.search(query="customer feedback"),agent.entities.list(type="organization"),)revenue_results, feedback_results, org_entities = results
Context Managers
Use the agent as an async context manager for automatic initialization and cleanup.
import asyncio  # Required for asyncio.run below (missing in the original snippet)

from rekall import RekallAgent

async def main():
    async with RekallAgent(
        agent_id="my-assistant",
        storage_path="./rekall.db",
    ) as agent:
        # Agent is initialized and ready
        memory = await agent.memories.create(
            type="episodic",
            content="Created within context manager.",
        )
        print(f"Memory: {memory.id}")
    # Agent is automatically shut down here

asyncio.run(main())
Recommended pattern
Using the context manager ensures that local storage is properly closed and pending sync operations are flushed, even if an exception occurs.
Type Hints
The SDK uses Pydantic models for all inputs and outputs. Full type hints are provided for IDE autocompletion and static analysis with mypy or pyright.
from rekall.types import (
    # Core types
    Memory,
    MemoryType,     # Literal["episodic", "semantic", "procedural", ...]
    MemoryContext,  # Literal["agent", "user", "hive"]
    MemoryCreateInput,
    MemorySearchInput,
    MemorySearchResult,
    # Entity types
    Entity,
    EntityCreateInput,
    Relationship,
    GraphQueryResult,
    # Agent types
    AgentConfig,
    AgentStatus,
    SpawnOptions,
    # Sync types
    SyncStatus,
    SyncStrategy,  # Literal["last-write-wins", "cloud-wins", "local-wins", "manual"]
)
from datetime import datetime
# Any and Literal are used below; the original snippet imported only
# Optional from typing. Optional is kept for compatibility.
from typing import Any, Literal, Optional

from pydantic import BaseModel


class Memory(BaseModel):
    """Pydantic model for a stored memory record."""

    id: str
    type: MemoryType        # e.g. "episodic" | "semantic" | "procedural" | ...
    content: str
    context: MemoryContext  # "agent" | "user" | "hive"
    importance: float
    metadata: dict[str, Any]
    entities: list[Entity]
    relationships: list[Relationship]
    created_at: datetime
    updated_at: datetime
    accessed_at: datetime
    access_count: int
    decay_factor: float
    sync_status: Literal["pending", "synced", "conflict"]
Error Handling
The SDK raises typed exceptions for all API and storage errors.
from rekall.exceptions import (RekallError,RekallAuthError,RekallNotFoundError,RekallValidationError,RekallRateLimitError,RekallServerError,)try:memory = await agent.memories.get("mem_nonexistent")except RekallNotFoundError as e:print(f"Not found: {e.resource_id}")except RekallRateLimitError as e:print(f"Rate limited. Retry after {e.retry_after_seconds}s")await asyncio.sleep(e.retry_after_seconds)except RekallAuthError:print("Invalid API key")except RekallError as e:print(f"API error {e.status_code}: {e.code} - {e.message}")
Offline error handling
When operating offline, API errors are not raised for write operations — the SDK queues changes locally. Errors will surface during the next sync attempt via the on_sync_error callback.
Progressive Disclosure (Infinite Context)
The Python SDK provides the same progressive disclosure pattern for token-efficient memory retrieval.
# Step 1: Get lightweight indexindex = await agent.recall_index("project requirements", token_budget=2000)print(f"Found {len(index.entries)} memories using {index.index_tokens} tokens")# Step 2: Select relevant memories by scorerelevant_ids = [e.id for e in index.entries if e.score > 0.7]# Step 3: Load full contentmemories = await agent.recall_full(relevant_ids)# Or use smart_recall for automatic handlingmemories = await agent.smart_recall("project requirements",token_budget=4000,progressive=True,)# Capture observations for future retrievalawait agent.observe(content=file_content,tool_name="file_read",tags=["config", "deployment"],)
