feat: add HTTP API for E2EE Matrix notifications
- POST /notify: send encrypted message to any room - GET /messages: read decrypted messages from any room - GET /health: health check - Authenticated via BOT_API_KEY header - Port 9100 exposed in docker-compose Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
80
bot.py
80
bot.py
@@ -9,6 +9,7 @@ import re
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from aiohttp import web
|
||||
import sentry_sdk
|
||||
import docx
|
||||
import fitz # pymupdf
|
||||
@@ -1404,6 +1405,9 @@ class Bot:
|
||||
asyncio.create_task(self.cron_scheduler.start())
|
||||
logger.info("Cron scheduler task created")
|
||||
|
||||
# Start HTTP API server for notify/messages
|
||||
asyncio.create_task(self._start_api_server())
|
||||
|
||||
await self.client.sync_forever(timeout=30000, full_state=True)
|
||||
|
||||
async def _cleanup_own_devices(self, current_device_id: str, access_token: str, keep: int = 3, max_age_days: int = 30):
|
||||
@@ -4057,7 +4061,83 @@ class Bot:
|
||||
if not isinstance(mac, ToDeviceError):
|
||||
await self.client.to_device(mac)
|
||||
|
||||
# ─── HTTP API for external integrations (deploy notifications, session messages) ───
|
||||
|
||||
async def _start_api_server(self):
|
||||
"""Start aiohttp server for /notify and /messages endpoints."""
|
||||
app = web.Application()
|
||||
app.router.add_post('/notify', self._api_notify)
|
||||
app.router.add_get('/messages', self._api_messages)
|
||||
app.router.add_get('/health', self._api_health)
|
||||
runner = web.AppRunner(app)
|
||||
await runner.setup()
|
||||
self._api_runner = runner
|
||||
site = web.TCPSite(runner, '0.0.0.0', 9100)
|
||||
await site.start()
|
||||
logger.info("HTTP API server started on port 9100")
|
||||
|
||||
def _api_check_auth(self, request: web.Request) -> bool:
|
||||
auth = request.headers.get('Authorization', '')
|
||||
return auth == f'Bearer {BOT_API_KEY}' and BOT_API_KEY
|
||||
|
||||
async def _api_health(self, request: web.Request) -> web.Response:
|
||||
return web.json_response({'status': 'ok'})
|
||||
|
||||
async def _api_notify(self, request: web.Request) -> web.Response:
|
||||
"""Send an encrypted message to a Matrix room.
|
||||
POST /notify {"room_id": "!abc...", "message": "text"}
|
||||
"""
|
||||
if not self._api_check_auth(request):
|
||||
return web.json_response({'error': 'unauthorized'}, status=401)
|
||||
try:
|
||||
body = await request.json()
|
||||
except Exception:
|
||||
return web.json_response({'error': 'invalid json'}, status=400)
|
||||
room_id = body.get('room_id')
|
||||
message = body.get('message')
|
||||
if not room_id or not message:
|
||||
return web.json_response({'error': 'room_id and message required'}, status=400)
|
||||
try:
|
||||
await self._send_text(room_id, message)
|
||||
return web.json_response({'status': 'sent'})
|
||||
except Exception as e:
|
||||
logger.error("Notify error: %s", e)
|
||||
return web.json_response({'error': str(e)}, status=500)
|
||||
|
||||
async def _api_messages(self, request: web.Request) -> web.Response:
|
||||
"""Read decrypted messages from a Matrix room.
|
||||
GET /messages?room_id=!abc...&limit=10
|
||||
"""
|
||||
if not self._api_check_auth(request):
|
||||
return web.json_response({'error': 'unauthorized'}, status=401)
|
||||
room_id = request.query.get('room_id')
|
||||
limit = int(request.query.get('limit', '10'))
|
||||
if not room_id:
|
||||
return web.json_response({'error': 'room_id required'}, status=400)
|
||||
try:
|
||||
room = self.client.rooms.get(room_id)
|
||||
if not room:
|
||||
return web.json_response({'error': 'room not found (bot not joined?)'}, status=404)
|
||||
messages = []
|
||||
for event in room.timeline[-limit:]:
|
||||
msg = {
|
||||
'sender': event.sender,
|
||||
'timestamp': getattr(event, 'server_timestamp', 0),
|
||||
'type': type(event).__name__,
|
||||
}
|
||||
if hasattr(event, 'body'):
|
||||
msg['body'] = event.body
|
||||
elif hasattr(event, 'source'):
|
||||
msg['body'] = event.source.get('content', {}).get('body', '[encrypted/unknown]')
|
||||
messages.append(msg)
|
||||
return web.json_response({'messages': messages})
|
||||
except Exception as e:
|
||||
logger.error("Messages error: %s", e)
|
||||
return web.json_response({'error': str(e)}, status=500)
|
||||
|
||||
async def cleanup(self):
|
||||
if hasattr(self, '_api_runner'):
|
||||
await self._api_runner.cleanup()
|
||||
if self.cron_scheduler:
|
||||
await self.cron_scheduler.stop()
|
||||
await self.client.close()
|
||||
|
||||
Reference in New Issue
Block a user