feat: scheduled reminders + less aggressive article summary

Add scheduled messages/reminders system:
- New scheduled_messages table in memory-service with CRUD endpoints
- schedule_message, list_reminders, cancel_reminder tools for the bot
- Background scheduler loop (30s) sends due reminders automatically
- Supports one-time, daily, weekly, weekdays, monthly repeat patterns

Make article URL handling non-blocking:
- Show 3 options (discuss, text summary, audio) instead of forcing audio wizard
- Default to passing article context to AI if user just keeps chatting
- New AWAITING_LANGUAGE state for cleaner audio flow FSM

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Christian Gick
2026-03-09 06:55:14 +02:00
parent 19abea01ca
commit 964a3f6075
4 changed files with 558 additions and 8 deletions

View File

@@ -84,7 +84,11 @@ class ArticleSummaryHandler:
return await self._check_for_url(room_id, sender, body) return await self._check_for_url(room_id, sender, body)
elif session.state == ArticleState.URL_DETECTED: elif session.state == ArticleState.URL_DETECTED:
# Waiting for language selection # Waiting for user to pick action (discuss, text summary, audio)
return await self._on_action_choice(room_id, sender, body, body_lower)
elif session.state == ArticleState.AWAITING_LANGUAGE:
# Audio flow: waiting for language selection
return self._on_language(room_id, sender, body_lower) return self._on_language(room_id, sender, body_lower)
elif session.state == ArticleState.LANGUAGE: elif session.state == ArticleState.LANGUAGE:
@@ -143,10 +147,11 @@ class ArticleSummaryHandler:
return ( return (
f"**Found:** {session.title} (~{read_time} min read){topics_hint}\n\n" f"**Found:** {session.title} (~{read_time} min read){topics_hint}\n\n"
f"Want an audio summary? What language?\n" f"What would you like to do?\n"
f"1️⃣ English\n" f"1\ufe0f\u20e3 **Discuss** \u2014 I'll read the article and we can talk about it\n"
f"2️⃣ German\n\n" f"2\ufe0f\u20e3 **Text summary** \u2014 Quick written summary\n"
f"_(or say \"cancel\" to skip)_" f"3\ufe0f\u20e3 **Audio summary** \u2014 Blinkist-style MP3\n\n"
f"_(or just keep chatting \u2014 I won't interrupt)_"
) )
def _on_language( def _on_language(
@@ -224,6 +229,79 @@ class ArticleSummaryHandler:
self.sessions.touch(sender, room_id) self.sessions.touch(sender, room_id)
return "__GENERATE__" return "__GENERATE__"
async def _on_action_choice(
self, room_id: str, sender: str, body: str, body_lower: str
) -> str | None:
"""Handle user's choice after URL detection: discuss, text summary, or audio."""
session = self.sessions.get(sender, room_id)
# Option 1: Discuss — reset FSM, return article context for AI handler
if body_lower in ("1", "discuss", "diskutieren", "besprechen"):
article_context = session.content[:8000]
title = session.title
self.sessions.reset(sender, room_id)
return f"__DISCUSS__{title}\n{article_context}"
# Option 2: Text summary — generate and return text, no TTS
if body_lower in ("2", "text", "text summary", "zusammenfassung"):
return await self._generate_text_summary(room_id, sender)
# Option 3: Audio summary — enter language selection (existing flow)
if body_lower in ("3", "audio", "audio summary"):
return self._prompt_language(room_id, sender)
# Anything else — user is just chatting, reset and pass through with article context
article_context = session.content[:8000]
title = session.title
self.sessions.reset(sender, room_id)
return f"__DISCUSS__{title}\n{article_context}"
def _prompt_language(self, room_id: str, sender: str) -> str:
"""Present language selection for audio summary."""
session = self.sessions.get(sender, room_id)
session.state = ArticleState.AWAITING_LANGUAGE
self.sessions.touch(sender, room_id)
return (
"What language for the audio summary?\n"
"1\ufe0f\u20e3 English\n"
"2\ufe0f\u20e3 German"
)
async def _generate_text_summary(self, room_id: str, sender: str) -> str | None:
"""Generate a text-only summary of the article."""
session = self.sessions.get(sender, room_id)
try:
resp = await self.llm.chat.completions.create(
model=self.model,
messages=[
{
"role": "system",
"content": (
"Summarize this article concisely in 3-5 paragraphs. "
"Respond in the same language as the article."
),
},
{
"role": "user",
"content": f"Article: {session.title}\n\n{session.content[:12000]}",
},
],
max_tokens=1000,
temperature=0.3,
)
summary = resp.choices[0].message.content.strip()
session.summary_text = summary
session.state = ArticleState.COMPLETE
self.sessions.touch(sender, room_id)
return (
f"**Summary: {session.title}**\n\n{summary}\n\n"
f"_Ask follow-up questions or share a new link._"
)
except Exception:
logger.warning("Text summary failed", exc_info=True)
self.sessions.reset(sender, room_id)
return None
async def generate_and_post(self, bot, room_id: str, sender: str) -> None: async def generate_and_post(self, bot, room_id: str, sender: str) -> None:
"""Run the full pipeline: summarize → TTS → upload MP3.""" """Run the full pipeline: summarize → TTS → upload MP3."""
session = self.sessions.get(sender, room_id) session = self.sessions.get(sender, room_id)

View File

@@ -10,6 +10,7 @@ from enum import Enum, auto
class ArticleState(Enum): class ArticleState(Enum):
IDLE = auto() IDLE = auto()
URL_DETECTED = auto() URL_DETECTED = auto()
AWAITING_LANGUAGE = auto() # Audio flow: waiting for language selection
LANGUAGE = auto() LANGUAGE = auto()
DURATION = auto() DURATION = auto()
TOPICS = auto() TOPICS = auto()

237
bot.py
View File

@@ -114,7 +114,8 @@ IMPORTANT RULES — FOLLOW THESE STRICTLY:
- When creating Jira issues, always confirm the project key and summary with the user before creating. - When creating Jira issues, always confirm the project key and summary with the user before creating.
- If a user's Atlassian account is not connected, tell them to connect it at matrixhost.eu/settings and provide the link. - If a user's Atlassian account is not connected, tell them to connect it at matrixhost.eu/settings and provide the link.
- If user memories are provided, use them to personalize responses. Address users by name if known. - If user memories are provided, use them to personalize responses. Address users by name if known.
- When asked to translate, provide ONLY the translation with no explanation.""" - When asked to translate, provide ONLY the translation with no explanation.
- You can set reminders and scheduled messages. When users ask to be reminded of something, use the schedule_message tool. Parse natural language times like "in 2 hours", "tomorrow at 9am", "every Monday" into ISO 8601 datetime with Europe/Berlin timezone (unless user specifies otherwise)."""
IMAGE_GEN_TOOLS = [{ IMAGE_GEN_TOOLS = [{
"type": "function", "type": "function",
@@ -334,7 +335,58 @@ ROOM_TOOLS = [{
}, },
}] }]
ALL_TOOLS = IMAGE_GEN_TOOLS + WEB_SEARCH_TOOLS + ATLASSIAN_TOOLS + ROOM_TOOLS SCHEDULER_TOOLS = [
{
"type": "function",
"function": {
"name": "schedule_message",
"description": (
"Schedule a reminder or message to be sent at a future time. "
"Use when the user says things like 'remind me', 'erinnere mich', "
"'send a message at', 'every Monday at 8am', etc. "
"Parse the user's natural language time into an ISO 8601 datetime. "
"Default timezone is Europe/Berlin unless the user specifies otherwise."
),
"parameters": {
"type": "object",
"properties": {
"message": {"type": "string", "description": "The reminder text to send"},
"datetime_iso": {"type": "string", "description": "When to send, ISO 8601 format (e.g. 2026-03-10T09:00:00+01:00)"},
"repeat": {
"type": "string",
"description": "Repeat pattern: 'once' (default), 'daily', 'weekly', 'weekdays', 'monthly'",
"enum": ["once", "daily", "weekly", "weekdays", "monthly"],
},
},
"required": ["message", "datetime_iso"],
},
},
},
{
"type": "function",
"function": {
"name": "list_reminders",
"description": "List all active/pending reminders for the user. Use when they ask 'what reminders do I have?' or 'show my reminders'.",
"parameters": {"type": "object", "properties": {}},
},
},
{
"type": "function",
"function": {
"name": "cancel_reminder",
"description": "Cancel a scheduled reminder by its ID number. Use when the user says 'cancel reminder #3' or 'lösche Erinnerung 3'.",
"parameters": {
"type": "object",
"properties": {
"reminder_id": {"type": "integer", "description": "The reminder ID to cancel"},
},
"required": ["reminder_id"],
},
},
},
]
ALL_TOOLS = IMAGE_GEN_TOOLS + WEB_SEARCH_TOOLS + ATLASSIAN_TOOLS + ROOM_TOOLS + SCHEDULER_TOOLS
ATLASSIAN_NOT_CONNECTED_MSG = ( ATLASSIAN_NOT_CONNECTED_MSG = (
"Your Atlassian account is not connected. " "Your Atlassian account is not connected. "
@@ -579,6 +631,95 @@ class MemoryClient:
return [] return []
async def create_scheduled(self, user_id: str, room_id: str, message_text: str,
scheduled_at: float, repeat: str = "once") -> dict:
if not self.enabled:
return {"error": "Memory service not configured"}
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.post(
f"{self.base_url}/scheduled/create",
json={
"user_id": user_id, "room_id": room_id,
"message_text": message_text, "scheduled_at": scheduled_at,
"repeat_pattern": repeat,
},
headers=self._headers(),
)
resp.raise_for_status()
return resp.json()
except httpx.HTTPStatusError as e:
detail = e.response.json().get("detail", str(e)) if e.response else str(e)
logger.warning("Schedule create failed: %s", detail)
return {"error": detail}
except Exception:
logger.warning("Schedule create failed", exc_info=True)
return {"error": "Failed to create reminder"}
async def list_scheduled(self, user_id: str) -> list[dict]:
if not self.enabled:
return []
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
f"{self.base_url}/scheduled/{user_id}",
headers=self._headers(),
)
resp.raise_for_status()
return resp.json().get("reminders", [])
except Exception:
logger.warning("Schedule list failed", exc_info=True)
return []
async def cancel_scheduled(self, user_id: str, reminder_id: int) -> dict:
if not self.enabled:
return {"error": "Memory service not configured"}
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.delete(
f"{self.base_url}/scheduled/{user_id}/{reminder_id}",
headers=self._headers(),
)
resp.raise_for_status()
return resp.json()
except httpx.HTTPStatusError as e:
detail = e.response.json().get("detail", str(e)) if e.response else str(e)
return {"error": detail}
except Exception:
logger.warning("Schedule cancel failed", exc_info=True)
return {"error": "Failed to cancel reminder"}
async def get_due_messages(self) -> list[dict]:
if not self.enabled:
return []
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.post(
f"{self.base_url}/scheduled/due",
headers=self._headers(),
)
resp.raise_for_status()
return resp.json().get("due", [])
except Exception:
logger.warning("Get due messages failed", exc_info=True)
return []
async def mark_sent(self, reminder_id: int) -> dict:
if not self.enabled:
return {}
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.post(
f"{self.base_url}/scheduled/{reminder_id}/mark-sent",
headers=self._headers(),
)
resp.raise_for_status()
return resp.json()
except Exception:
logger.warning("Mark sent failed for #%d", reminder_id, exc_info=True)
return {}
class AtlassianClient: class AtlassianClient:
"""Fetches per-user Atlassian tokens from the portal and calls Atlassian REST APIs.""" """Fetches per-user Atlassian tokens from the portal and calls Atlassian REST APIs."""
@@ -1114,6 +1255,9 @@ class Bot:
self.client.add_to_device_callback(self.on_key_verification, KeyVerificationMac) self.client.add_to_device_callback(self.on_key_verification, KeyVerificationMac)
self.client.add_to_device_callback(self.on_key_verification, KeyVerificationCancel) self.client.add_to_device_callback(self.on_key_verification, KeyVerificationCancel)
# Start reminder scheduler
asyncio.create_task(self._reminder_scheduler())
await self.client.sync_forever(timeout=30000, full_state=True) await self.client.sync_forever(timeout=30000, full_state=True)
async def _ensure_cross_signing(self): async def _ensure_cross_signing(self):
@@ -1257,6 +1401,25 @@ class Bot:
except Exception as e: except Exception as e:
logger.error("RAG key injection failed: %s", e, exc_info=True) logger.error("RAG key injection failed: %s", e, exc_info=True)
async def _reminder_scheduler(self):
"""Background loop: check for due reminders every 30 seconds, send them."""
await asyncio.sleep(10) # Wait for bot to fully initialize
logger.info("Reminder scheduler started")
while True:
try:
due = await self.memory.get_due_messages()
for msg in due:
try:
reminder_text = f"\u23f0 **Erinnerung:** {msg['message_text']}"
await self._send_text(msg["room_id"], reminder_text)
await self.memory.mark_sent(msg["id"])
logger.info("Sent reminder #%d to %s", msg["id"], msg["room_id"])
except Exception:
logger.warning("Failed to send reminder #%d", msg["id"], exc_info=True)
except Exception:
logger.warning("Reminder scheduler check failed", exc_info=True)
await asyncio.sleep(30)
async def on_invite(self, room, event: InviteMemberEvent): async def on_invite(self, room, event: InviteMemberEvent):
if event.state_key != BOT_USER: if event.state_key != BOT_USER:
return return
@@ -1785,6 +1948,12 @@ class Bot:
) )
finally: finally:
await self.client.room_typing(room.room_id, typing_state=False) await self.client.room_typing(room.room_id, typing_state=False)
return
elif summary_response.startswith("__DISCUSS__"):
# Extract article context, enrich the user message for AI
article_info = summary_response[len("__DISCUSS__"):]
body = f"[Article context: {article_info[:6000]}]\n\nUser message: {body}"
# Fall through to normal AI handler with enriched context
elif summary_response: elif summary_response:
await self._send_text(room.room_id, summary_response) await self._send_text(room.room_id, summary_response)
return return
@@ -2245,6 +2414,17 @@ class Bot:
room_id, args.get("query", ""), args.get("limit", 200) room_id, args.get("query", ""), args.get("limit", 200)
) )
# Scheduler tools — no Atlassian auth needed
if tool_name == "schedule_message":
return await self._schedule_message(
sender, room_id, args.get("message", ""),
args.get("datetime_iso", ""), args.get("repeat", "once")
)
if tool_name == "list_reminders":
return await self._list_reminders(sender)
if tool_name == "cancel_reminder":
return await self._cancel_reminder(sender, args.get("reminder_id", 0))
# Atlassian tools — need per-user token # Atlassian tools — need per-user token
token = await self.atlassian.get_token(sender) if sender else None token = await self.atlassian.get_token(sender) if sender else None
if not token: if not token:
@@ -2319,6 +2499,59 @@ class Bot:
logger.warning("Room history search failed", exc_info=True) logger.warning("Room history search failed", exc_info=True)
return "Failed to search room history." return "Failed to search room history."
async def _schedule_message(self, sender: str, room_id: str,
message: str, datetime_iso: str, repeat: str) -> str:
"""Parse datetime, validate, store via memory service."""
if not message:
return "No reminder message provided."
if not datetime_iso:
return "No datetime provided."
try:
from datetime import datetime as dt, timezone
parsed = dt.fromisoformat(datetime_iso)
if parsed.tzinfo is None:
# Default to Europe/Berlin
import zoneinfo
parsed = parsed.replace(tzinfo=zoneinfo.ZoneInfo("Europe/Berlin"))
ts = parsed.timestamp()
except Exception:
return f"Could not parse datetime: {datetime_iso}. Use ISO 8601 format."
if ts <= time.time():
return "That time has already passed. Please specify a future time."
result = await self.memory.create_scheduled(sender, room_id, message, ts, repeat or "once")
if "error" in result:
return f"Failed to create reminder: {result['error']}"
time_str = parsed.strftime("%B %d, %Y at %H:%M")
tz_name = str(parsed.tzinfo) if parsed.tzinfo else "Europe/Berlin"
repeat_str = f" (repeats {repeat})" if repeat and repeat != "once" else ""
return f"Reminder #{result.get('id', '?')} set for {time_str} ({tz_name}){repeat_str}: {message}"
async def _list_reminders(self, sender: str) -> str:
"""List user's active reminders."""
reminders = await self.memory.list_scheduled(sender)
if not reminders:
return "You have no active reminders."
from datetime import datetime as dt, timezone
lines = []
for r in reminders:
t = dt.fromtimestamp(r["scheduled_at"], tz=timezone.utc)
time_str = t.strftime("%Y-%m-%d %H:%M UTC")
repeat = f" ({r['repeat_pattern']})" if r["repeat_pattern"] != "once" else ""
lines.append(f"**#{r['id']}** — {time_str}{repeat}: {r['message_text']}")
return f"**Your reminders ({len(lines)}):**\n" + "\n".join(lines)
async def _cancel_reminder(self, sender: str, reminder_id: int) -> str:
"""Cancel a reminder by ID."""
if not reminder_id:
return "Please provide a reminder ID to cancel."
result = await self.memory.cancel_scheduled(sender, reminder_id)
if "error" in result:
return f"Could not cancel reminder #{reminder_id}: {result['error']}"
return f"Reminder #{reminder_id} cancelled."
# -- Escalation patterns for model routing -- # -- Escalation patterns for model routing --
_ESCALATION_KEYWORDS = re.compile( _ESCALATION_KEYWORDS = re.compile(
r"\b(debug|architecture|algorithm|regex|sql|refactor|optimize|migration" r"\b(debug|architecture|algorithm|regex|sql|refactor|optimize|migration"

View File

@@ -72,6 +72,34 @@ def _decrypt(ciphertext: str, user_id: str) -> str:
return ciphertext return ciphertext
class ScheduleRequest(BaseModel):
user_id: str
room_id: str
message_text: str
scheduled_at: float # Unix timestamp
repeat_pattern: str = "once" # once | daily | weekly | weekdays | monthly
@field_validator('user_id')
@classmethod
def user_id_not_empty(cls, v):
if not v or not v.strip():
raise ValueError("user_id is required")
return v.strip()
@field_validator('repeat_pattern')
@classmethod
def valid_pattern(cls, v):
allowed = {"once", "daily", "weekly", "weekdays", "monthly"}
if v not in allowed:
raise ValueError(f"repeat_pattern must be one of {allowed}")
return v
class ScheduleCancelRequest(BaseModel):
id: int
user_id: str
class StoreRequest(BaseModel): class StoreRequest(BaseModel):
user_id: str user_id: str
fact: str fact: str
@@ -237,6 +265,27 @@ async def _init_db():
await conn.execute(""" await conn.execute("""
CREATE INDEX IF NOT EXISTS idx_chunks_user_room ON conversation_chunks (user_id, room_id) CREATE INDEX IF NOT EXISTS idx_chunks_user_room ON conversation_chunks (user_id, room_id)
""") """)
# Scheduled messages table for reminders
await conn.execute("""
CREATE TABLE IF NOT EXISTS scheduled_messages (
id BIGSERIAL PRIMARY KEY,
user_id TEXT NOT NULL,
room_id TEXT NOT NULL,
message_text TEXT NOT NULL,
scheduled_at DOUBLE PRECISION NOT NULL,
created_at DOUBLE PRECISION NOT NULL,
status TEXT DEFAULT 'pending',
repeat_pattern TEXT DEFAULT 'once',
repeat_interval_seconds INTEGER DEFAULT 0,
last_sent_at DOUBLE PRECISION DEFAULT 0
)
""")
await conn.execute("""
CREATE INDEX IF NOT EXISTS idx_scheduled_user_id ON scheduled_messages (user_id)
""")
await conn.execute("""
CREATE INDEX IF NOT EXISTS idx_scheduled_status ON scheduled_messages (status, scheduled_at)
""")
finally: finally:
await owner_conn.close() await owner_conn.close()
# Create restricted pool for all request handlers (RLS applies) # Create restricted pool for all request handlers (RLS applies)
@@ -269,10 +318,12 @@ async def health():
async with owner_pool.acquire() as conn: async with owner_pool.acquire() as conn:
mem_count = await conn.fetchval("SELECT count(*) FROM memories") mem_count = await conn.fetchval("SELECT count(*) FROM memories")
chunk_count = await conn.fetchval("SELECT count(*) FROM conversation_chunks") chunk_count = await conn.fetchval("SELECT count(*) FROM conversation_chunks")
sched_count = await conn.fetchval("SELECT count(*) FROM scheduled_messages WHERE status = 'pending'")
return { return {
"status": "ok", "status": "ok",
"total_memories": mem_count, "total_memories": mem_count,
"total_chunks": chunk_count, "total_chunks": chunk_count,
"pending_reminders": sched_count,
"encryption": "on" if ENCRYPTION_KEY else "off", "encryption": "on" if ENCRYPTION_KEY else "off",
} }
except Exception as e: except Exception as e:
@@ -525,3 +576,190 @@ async def count_user_chunks(user_id: str, _: None = Depends(verify_token)):
"SELECT count(*) FROM conversation_chunks WHERE user_id = $1", user_id, "SELECT count(*) FROM conversation_chunks WHERE user_id = $1", user_id,
) )
return {"user_id": user_id, "count": count} return {"user_id": user_id, "count": count}
# --- Scheduled Messages ---
import calendar
import datetime
def _compute_repeat_interval(pattern: str) -> int:
"""Compute repeat_interval_seconds from pattern name."""
return {
"once": 0,
"daily": 86400,
"weekly": 604800,
"weekdays": 86400, # special handling in mark-sent
"monthly": 0, # special handling in mark-sent
}.get(pattern, 0)
def _next_scheduled_at(current_ts: float, pattern: str) -> float:
"""Compute the next scheduled_at timestamp for recurring patterns."""
dt = datetime.datetime.fromtimestamp(current_ts, tz=datetime.timezone.utc)
if pattern == "daily":
return current_ts + 86400.0
elif pattern == "weekly":
return current_ts + 604800.0
elif pattern == "weekdays":
next_dt = dt + datetime.timedelta(days=1)
while next_dt.weekday() >= 5: # Skip Sat(5), Sun(6)
next_dt += datetime.timedelta(days=1)
return next_dt.timestamp()
elif pattern == "monthly":
month = dt.month + 1
year = dt.year + (month - 1) // 12
month = (month - 1) % 12 + 1
day = min(dt.day, calendar.monthrange(year, month)[1])
return dt.replace(year=year, month=month, day=day).timestamp()
return current_ts
MAX_REMINDERS_PER_USER = 50
@app.post("/scheduled/create")
async def create_scheduled(req: ScheduleRequest, _: None = Depends(verify_token)):
"""Create a new scheduled message/reminder."""
now = time.time()
if req.scheduled_at <= now:
raise HTTPException(400, "scheduled_at must be in the future")
# Check max reminders per user
await _ensure_pool()
async with owner_pool.acquire() as conn:
count = await conn.fetchval(
"SELECT count(*) FROM scheduled_messages WHERE user_id = $1 AND status = 'pending'",
req.user_id,
)
if count >= MAX_REMINDERS_PER_USER:
raise HTTPException(400, f"Maximum {MAX_REMINDERS_PER_USER} active reminders per user")
msg_text = req.message_text[:2000] # Truncate long messages
interval = _compute_repeat_interval(req.repeat_pattern)
row_id = await conn.fetchval(
"""
INSERT INTO scheduled_messages
(user_id, room_id, message_text, scheduled_at, created_at, status, repeat_pattern, repeat_interval_seconds)
VALUES ($1, $2, $3, $4, $5, 'pending', $6, $7)
RETURNING id
""",
req.user_id, req.room_id, msg_text, req.scheduled_at, now,
req.repeat_pattern, interval,
)
logger.info("Created reminder #%d for %s at %.0f (%s)", row_id, req.user_id, req.scheduled_at, req.repeat_pattern)
return {"id": row_id, "created": True}
@app.get("/scheduled/{user_id}")
async def list_scheduled(user_id: str, _: None = Depends(verify_token)):
"""List all pending/active reminders for a user."""
await _ensure_pool()
async with owner_pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT id, message_text, scheduled_at, repeat_pattern, status
FROM scheduled_messages
WHERE user_id = $1 AND status = 'pending'
ORDER BY scheduled_at
""",
user_id,
)
return {
"user_id": user_id,
"reminders": [
{
"id": r["id"],
"message_text": r["message_text"],
"scheduled_at": r["scheduled_at"],
"repeat_pattern": r["repeat_pattern"],
"status": r["status"],
}
for r in rows
],
}
@app.delete("/scheduled/{user_id}/{reminder_id}")
async def cancel_scheduled(user_id: str, reminder_id: int, _: None = Depends(verify_token)):
"""Cancel a reminder. Only the owner can cancel."""
await _ensure_pool()
async with owner_pool.acquire() as conn:
result = await conn.execute(
"""
UPDATE scheduled_messages SET status = 'cancelled'
WHERE id = $1 AND user_id = $2 AND status = 'pending'
""",
reminder_id, user_id,
)
count = int(result.split()[-1])
if count == 0:
raise HTTPException(404, "Reminder not found or already cancelled")
logger.info("Cancelled reminder #%d for %s", reminder_id, user_id)
return {"cancelled": True, "id": reminder_id}
@app.post("/scheduled/due")
async def get_due_messages(_: None = Depends(verify_token)):
"""Return all messages that are due (scheduled_at <= now, status = pending)."""
now = time.time()
await _ensure_pool()
async with owner_pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT id, user_id, room_id, message_text, scheduled_at, repeat_pattern
FROM scheduled_messages
WHERE scheduled_at <= $1 AND status = 'pending'
ORDER BY scheduled_at
LIMIT 100
""",
now,
)
return {
"due": [
{
"id": r["id"],
"user_id": r["user_id"],
"room_id": r["room_id"],
"message_text": r["message_text"],
"scheduled_at": r["scheduled_at"],
"repeat_pattern": r["repeat_pattern"],
}
for r in rows
],
}
@app.post("/scheduled/{reminder_id}/mark-sent")
async def mark_sent(reminder_id: int, _: None = Depends(verify_token)):
"""Mark a reminder as sent. For recurring, compute next scheduled_at."""
now = time.time()
await _ensure_pool()
async with owner_pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT repeat_pattern, scheduled_at FROM scheduled_messages WHERE id = $1",
reminder_id,
)
if not row:
raise HTTPException(404, "Reminder not found")
if row["repeat_pattern"] == "once":
await conn.execute(
"UPDATE scheduled_messages SET status = 'sent', last_sent_at = $1 WHERE id = $2",
now, reminder_id,
)
else:
next_at = _next_scheduled_at(row["scheduled_at"], row["repeat_pattern"])
await conn.execute(
"""
UPDATE scheduled_messages
SET scheduled_at = $1, last_sent_at = $2
WHERE id = $3
""",
next_at, now, reminder_id,
)
logger.info("Marked reminder #%d as sent (pattern=%s)", reminder_id, row["repeat_pattern"])
return {"marked": True}