Python SDK · v0.1
Python SDK
Sync and async Python client for SapixDB. Works with FastAPI, Django, Flask, and plain scripts. Python 3.9+.
Installation
pip
pip install sapixdb
uv
uv add sapixdb
poetry
poetry add sapixdb
Quick Start — Sync
Python
from sapixdb import SapixClient
db = SapixClient(url="http://localhost:7475", agent="my-app")
# Check connection
print(db.ping()) # True
# Write a record
record = db.collection("products").write({
"name": "Classic T-Shirt",
"price": 29.99,
"stock": 100,
})
print(record.id) # "nuc_abc123"
print(record.hash) # "sha3:e7f2a1..." — cryptographic proof
# Read latest records
products = db.collection("products").latest()
# Filter
shirts = db.collection("products").find({"category": "apparel"})
# Time travel
from datetime import datetime, timedelta, timezone
yesterday = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
snapshot = db.collection("orders").as_of(yesterday).latest()
Async Client
AsyncSapixClient has an identical API to SapixClient — every method is just awaitable. Use it with FastAPI, asyncio scripts, or any async framework.
Python — asyncio
import asyncio
from sapixdb import AsyncSapixClient
async def main():
    db = AsyncSapixClient(url="http://localhost:7475", agent="my-app")
    record = await db.collection("products").write({"name": "T-Shirt", "price": 29.99})
    products = await db.collection("products").latest()
    # Async context manager
    async with AsyncSapixClient(url="http://localhost:7475", agent="my-app") as db:
        shirts = await db.collection("products").find({"category": "apparel"})
asyncio.run(main())
FastAPI Integration
Python — main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from sapixdb import AsyncSapixClient, SapixNotFoundError
app = FastAPI()
db = AsyncSapixClient(url="http://localhost:7475", agent="store")
class ProductIn(BaseModel):
    name: str
    price: float
    stock: int = 0
    category: str | None = None
@app.post("/products", status_code=201)
async def create_product(body: ProductIn):
    record = await db.collection("products").write(body.model_dump())
    return {"id": record.id, "hash": record.hash, "timestamp": record.timestamp}
@app.get("/products")
async def list_products(category: str | None = None):
    filter_ = {"category": category} if category else None
    items = await db.collection("products").latest(filter=filter_)
    return [{"id": r.id, **r.data} for r in items]
@app.get("/products/{record_id}")
async def get_product(record_id: str):
    try:
        r = await db.collection("products").get(record_id)
        return {"id": r.id, **r.data}
    except SapixNotFoundError:
        raise HTTPException(status_code=404, detail="Product not found")
Collection API
.write(data) → WriteResult
Append a new record. Returns WriteResult with id, hash, prev_hash, timestamp. Nothing is ever overwritten — every write is permanent.
.write_batch(records) → list[WriteResult]
Write multiple records. Async version runs them concurrently with asyncio.gather.
.get(record_id) → NucleotideRecord
Fetch by nucleotide ID. Raises SapixNotFoundError if missing.
.latest(*, filter?, limit?) → list[NucleotideRecord]
Current (most recent) version of every record in the collection.
.history(*, filter?, limit?) → list[NucleotideRecord]
Full append-only history — every version ever written.
.find(filter, *, limit?) → list[NucleotideRecord]
Filter records (latest version only).
.find_one(filter) → NucleotideRecord | None
First match or None. Never raises on empty result.
.as_of(timestamp) → CollectionQuery
Scope reads to a point in time. Returns a query object with .latest(), .find(), .find_one(), .all().
Time Travel
Python
from datetime import datetime, timedelta, timezone
# 30 minutes ago
ts = (datetime.now(timezone.utc) - timedelta(minutes=30)).isoformat()
snapshot = db.collection("orders").as_of(ts).find({"customer_id": "cust_001"})
# Returns orders exactly as they existed 30 minutes ago
Graph Relationships
db.graph.relate(src, dst, edge_type, weight=1.0) → None
Create a typed directed edge. Cleanest way to express relationships.
db.graph.traverse(from_id, *, depth=1, direction='outbound') → TraverseResult
Walk the graph. Returns TraverseResult with .nodes and .edges. Direction: "outbound" | "inbound" | "both".
db.graph.neighbors(node_id, direction='outbound') → list[NucleotideRecord]
Direct neighbours — depth=1 shortcut.
Python
# Link order → customer
db.graph.relate(order.id, customer.id, "placed_by")
db.graph.relate(order.id, product.id, "contains")
# Find everything connected to a customer (depth 2)
result = db.graph.traverse(customer.id, depth=2, direction="inbound")
print(result.nodes) # list of NucleotideRecord
print(result.edges) # list of GraphEdge
Agent Ingest
Python — log every AI decision
# Log AI agent decisions permanently and immutably
db.ingest("ai_decisions", {
"model": "gpt-4o",
"action": "approve_loan",
"confidence": 0.94,
"applicant": "cust_001",
"reasoning": "Credit score 780, DTI 28%",
})
# Every decision is cryptographically signed — you can always prove
# what the AI decided, when, and why.
Error Handling
Python
from sapixdb import SapixError, SapixNetworkError, SapixNotFoundError
try:
    record = db.collection("orders").get("nuc_missing")
except SapixNotFoundError as e:
    print(f"Not found: {e.record_id}") # record ID that was missing
except SapixNetworkError:
    print("Cannot reach SapixDB — is it running?")
except SapixError as e:
    print(f"Error {e.status}: {e}") # HTTP status + message
Full Example: Online Store
Python — store.py
from sapixdb import SapixClient
db = SapixClient(url="http://localhost:7475", agent="store")
# 1. Add product
shirt = db.collection("products").write({
"sku": "SHIRT-001", "name": "Classic T-Shirt",
"price": 29.99, "stock": 200, "category": "apparel",
})
# 2. Register customer
customer = db.collection("customers").write({
"name": "Alice Johnson", "email": "[email protected]",
})
# 3. Place order
order = db.collection("orders").write({
"customer_id": customer.id,
"items": [{"product_id": shirt.id, "qty": 2, "unit_price": 29.99}],
"total": 59.98,
"status": "placed",
})
# 4. Link in graph
db.graph.relate(order.id, customer.id, "placed_by")
db.graph.relate(order.id, shirt.id, "contains")
# 5. Ship (append — "placed" version is preserved forever)
db.collection("orders").write({
"customer_id": customer.id,
"status": "shipped",
"tracking": "UPS-1Z999AA10123456784",
})
# 6. Audit: what was the status when it was placed?
original = db.collection("orders").as_of(order.timestamp).find_one(
{"customer_id": customer.id}
)
print(original.data["status"]) # "placed" — not "shipped"
Realtime Subscriptions (SSE)
Subscribe to live writes on any agent using Server-Sent Events. subscribe_agent() and subscribe_global() are Python generators — iterate them with a for loop (sync) or async for (async). The connection stays open until you break out of the loop or the server closes it.
.subscribe_agent(agent_id, *, since, filter_field) → Iterator[StreamEvent]
Blocking generator that yields StreamEvent objects for every write to the named agent. since is an HLC timestamp for backfill; filter_field limits events to those whose JSON payload contains that field name.
.subscribe_global(*, agents, filter_field) → Iterator[StreamEvent]
Blocking generator for all agents. agents restricts to a list of agent IDs.
Python — sync per-agent stream
from sapixdb import SapixClient, StreamEvent
db = SapixClient(url="http://localhost:7475", agent="my-app")
# Blocking loop — runs until the server closes the stream
for event in db.subscribe_agent("orders"):
    print(f"New record: {event.record_id} payload={event.payload}")
    if event.payload and event.payload.get("status") == "shipped":
        send_shipping_notification(event.payload)
Python — sync with HLC backfill + field filter
last_seen_hlc = 1748304000000 # replay writes after this HLC timestamp first
for event in db.subscribe_agent(
    "transactions",
    since=last_seen_hlc,
    filter_field="risk_score", # only events whose payload has risk_score
):
    if event.payload["risk_score"] >= 0.8:
        flag_for_review(event)
Python — async (FastAPI / asyncio)
from sapixdb import AsyncSapixClient
async def stream_handler():
    async with AsyncSapixClient(url="http://localhost:7475", agent="my-app") as db:
        async for event in db.subscribe_agent("orders"):
            await process_event(event)
# Global stream, two agents only
async def global_stream():
    async with AsyncSapixClient(url="http://localhost:7475", agent="my-app") as db:
        async for event in db.subscribe_global(agents=["orders", "payments"]):
            print(event.agent_id, event.record_id, event.timestamp_ms)
Python — StreamEvent fields
from sapixdb import StreamEvent
event: StreamEvent
event.agent_id # str — which agent was written to
event.event_type # str — always "record.written"
event.record_id # str — unique record ID
event.content_hash # str — hex content hash (cryptographic proof)
event.timestamp_ms # int — write time as Unix milliseconds
event.payload # Any — decoded JSON payload (None for raw writes)
Also available: JavaScript / TypeScript and Go SDKs
npm install sapixdb (JS/TS) · go get github.com/sapixdb/sapixdb-go (Go) — same realtime API, every language.