Files
matrix-ai-agent/memory-service/migrate_json.py
Christian Gick 4cd7a0262e feat: Replace JSON memory with pgvector semantic search (MAT-11)
Add memory-service (FastAPI + pgvector) for semantic memory storage.
Bot now queries relevant memories per conversation instead of dumping all 50.
Includes migration script for existing JSON files.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 06:25:50 +02:00

109 lines
3.7 KiB
Python

#!/usr/bin/env python3
"""One-time migration: read JSON memory files, embed each fact, insert into pgvector."""
import asyncio
import json
import logging
import os
import sys
import time
import asyncpg
import httpx
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("migrate")
DB_DSN = os.environ.get("DATABASE_URL", "postgresql://memory:memory@memory-db:5432/memories")
LITELLM_URL = os.environ.get("LITELLM_BASE_URL", "")
LITELLM_KEY = os.environ.get("LITELLM_API_KEY", "not-needed")
EMBED_MODEL = os.environ.get("EMBED_MODEL", "text-embedding-3-small")
MEMORIES_DIR = os.environ.get("MEMORIES_DIR", "/data/memories")
async def embed(text: str) -> list[float]:
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
f"{LITELLM_URL}/embeddings",
json={"model": EMBED_MODEL, "input": text},
headers={"Authorization": f"Bearer {LITELLM_KEY}"},
)
resp.raise_for_status()
return resp.json()["data"][0]["embedding"]
async def main():
if not os.path.isdir(MEMORIES_DIR):
logger.error("MEMORIES_DIR %s does not exist", MEMORIES_DIR)
sys.exit(1)
json_files = [f for f in os.listdir(MEMORIES_DIR) if f.endswith(".json")]
if not json_files:
logger.info("No JSON memory files found in %s", MEMORIES_DIR)
return
logger.info("Found %d memory files to migrate", len(json_files))
pool = await asyncpg.create_pool(DB_DSN, min_size=1, max_size=5)
total_migrated = 0
total_skipped = 0
for filename in json_files:
filepath = os.path.join(MEMORIES_DIR, filename)
try:
with open(filepath) as f:
memories = json.load(f)
except (json.JSONDecodeError, OSError) as e:
logger.warning("Skipping %s: %s", filename, e)
continue
if not memories:
continue
# The filename is a hash of the user_id — we need to find the user_id
# from the fact entries or use the hash as identifier.
# Since JSON files are named by sha256(user_id)[:16].json, we can't
# reverse the hash. We'll need to scan bot-data for user_keys.json
# to build a mapping, or just use the hash as user_id placeholder.
#
# Better approach: read all facts and check if any contain user identity.
# For now, use the filename hash as a temporary user_id marker.
# The bot will re-associate on next interaction.
user_hash = filename.replace(".json", "")
for mem in memories:
fact = mem.get("fact", "").strip()
if not fact:
continue
try:
embedding = await embed(fact)
except Exception as e:
logger.warning("Embedding failed for fact '%s': %s", fact[:50], e)
total_skipped += 1
continue
vec_literal = "[" + ",".join(str(v) for v in embedding) + "]"
created_at = mem.get("created", time.time())
source_room = mem.get("source_room", "")
async with pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO memories (user_id, fact, source_room, created_at, embedding)
VALUES ($1, $2, $3, $4, $5::vector)
""",
user_hash, fact, source_room, created_at, vec_literal,
)
total_migrated += 1
logger.info("Migrated %s: %d facts", filename, len(memories))
await pool.close()
logger.info("Migration complete: %d migrated, %d skipped", total_migrated, total_skipped)
if __name__ == "__main__":
asyncio.run(main())