feat: Add Atlassian tools and agentic tool-calling loop

- Add AtlassianClient class: fetches per-user OAuth tokens from portal,
  calls Jira and Confluence REST APIs on behalf of users
- Add 7 Atlassian tools: confluence_search, confluence_read_page,
  jira_search, jira_get_issue, jira_create_issue, jira_add_comment,
  jira_transition
- Replace single LLM call with agentic loop (max 5 iterations)
  that feeds tool results back to the model
- Add PORTAL_URL and BOT_API_KEY env vars to docker-compose
- Update system prompt with Atlassian tool guidance

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Christian Gick
2026-02-26 10:15:15 +02:00
parent 08a3c4a9cc
commit 48f6e7dd17
2 changed files with 477 additions and 15 deletions

490
bot.py
View File

@@ -67,6 +67,9 @@ MEMORY_SERVICE_URL = os.environ.get("MEMORY_SERVICE_URL", "http://memory-service
CONFLUENCE_URL = os.environ.get("CONFLUENCE_BASE_URL", "")
CONFLUENCE_USER = os.environ.get("CONFLUENCE_USER", "")
CONFLUENCE_TOKEN = os.environ.get("CONFLUENCE_TOKEN", "")
PORTAL_URL = os.environ.get("PORTAL_URL", "")
BOT_API_KEY = os.environ.get("BOT_API_KEY", "")
MAX_TOOL_ITERATIONS = 5
SYSTEM_PROMPT = """You are a helpful AI assistant in a Matrix chat room.
Keep answers concise but thorough. Use markdown formatting when helpful.
@@ -84,6 +87,9 @@ IMPORTANT RULES — FOLLOW THESE STRICTLY:
- You can see and analyze images that users send. Describe what you see when asked about an image.
- You can read and analyze PDF documents that users send. Summarize content and answer questions about them.
- You can generate images when asked — use the generate_image tool for any image creation, drawing, or illustration requests.
- You can search Confluence and Jira using tools. When users ask about documentation, wiki pages, tickets, or tasks, use the appropriate tool.
- When creating Jira issues, always confirm the project key and summary with the user before creating.
- If a user's Atlassian account is not connected, tell them to connect it at matrixhost.eu/settings and provide the link.
- If user memories are provided, use them to personalize responses. Address users by name if known.
- When asked to translate, provide ONLY the translation with no explanation."""
@@ -102,6 +108,122 @@ IMAGE_GEN_TOOLS = [{
}
}]
ATLASSIAN_TOOLS = [
{
"type": "function",
"function": {
"name": "confluence_search",
"description": "Search Confluence wiki pages using CQL (Confluence Query Language). Use when the user asks about documentation, wiki, or knowledge base content.",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query text"},
"limit": {"type": "integer", "description": "Max results to return (default 5)", "default": 5},
},
"required": ["query"],
},
},
},
{
"type": "function",
"function": {
"name": "confluence_read_page",
"description": "Read the full content of a Confluence page by its ID. Use after searching to get page details.",
"parameters": {
"type": "object",
"properties": {
"page_id": {"type": "string", "description": "The Confluence page ID"},
},
"required": ["page_id"],
},
},
},
{
"type": "function",
"function": {
"name": "jira_search",
"description": "Search Jira issues using JQL (Jira Query Language). Use when the user asks about tickets, tasks, bugs, or project issues.",
"parameters": {
"type": "object",
"properties": {
"jql": {"type": "string", "description": "JQL query string"},
"limit": {"type": "integer", "description": "Max results (default 10)", "default": 10},
},
"required": ["jql"],
},
},
},
{
"type": "function",
"function": {
"name": "jira_get_issue",
"description": "Get detailed information about a specific Jira issue by its key (e.g. CF-123).",
"parameters": {
"type": "object",
"properties": {
"issue_key": {"type": "string", "description": "Jira issue key like CF-123"},
},
"required": ["issue_key"],
},
},
},
{
"type": "function",
"function": {
"name": "jira_create_issue",
"description": "Create a new Jira issue. Always confirm project and summary with user before creating.",
"parameters": {
"type": "object",
"properties": {
"project": {"type": "string", "description": "Project key (e.g. CF)"},
"summary": {"type": "string", "description": "Issue title/summary"},
"issue_type": {"type": "string", "description": "Issue type (default: Task)", "default": "Task"},
"description": {"type": "string", "description": "Issue description (optional)"},
},
"required": ["project", "summary"],
},
},
},
{
"type": "function",
"function": {
"name": "jira_add_comment",
"description": "Add a comment to an existing Jira issue.",
"parameters": {
"type": "object",
"properties": {
"issue_key": {"type": "string", "description": "Jira issue key like CF-123"},
"comment": {"type": "string", "description": "Comment text"},
},
"required": ["issue_key", "comment"],
},
},
},
{
"type": "function",
"function": {
"name": "jira_transition",
"description": "Change the status of a Jira issue (e.g. move to 'In Progress', 'Done').",
"parameters": {
"type": "object",
"properties": {
"issue_key": {"type": "string", "description": "Jira issue key like CF-123"},
"status": {"type": "string", "description": "Target status name (e.g. 'In Progress', 'Done')"},
},
"required": ["issue_key", "status"],
},
},
},
]
ALL_TOOLS = IMAGE_GEN_TOOLS + ATLASSIAN_TOOLS
ATLASSIAN_NOT_CONNECTED_MSG = (
"Your Atlassian account is not connected. "
"Please connect it at [matrixhost.eu/settings](https://matrixhost.eu/settings?connect=atlassian) "
"to use Jira and Confluence features."
)
HELP_TEXT = """**AI Bot Commands**
- `!ai help` — Show this help
- `!ai models` — List available models
@@ -254,6 +376,281 @@ class MemoryClient:
return []
class AtlassianClient:
"""Fetches per-user Atlassian tokens from the portal and calls Atlassian REST APIs."""
def __init__(self, portal_url: str, bot_api_key: str):
self.portal_url = portal_url.rstrip("/")
self.bot_api_key = bot_api_key
self.enabled = bool(portal_url and bot_api_key)
# Cache cloud IDs per user token to avoid repeated lookups
self._cloud_id_cache: dict[str, str] = {}
async def get_token(self, matrix_user_id: str) -> str | None:
"""Fetch user's Atlassian token from the portal."""
if not self.enabled:
return None
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
f"{self.portal_url}/api/bot/tokens",
params={"matrix_user_id": matrix_user_id, "provider": "atlassian"},
headers={"Authorization": f"Bearer {self.bot_api_key}"},
)
resp.raise_for_status()
data = resp.json()
return data.get("access_token") if data.get("connected") else None
except Exception:
logger.warning("Failed to fetch Atlassian token for %s", matrix_user_id, exc_info=True)
return None
async def _get_cloud_id(self, token: str) -> str | None:
"""Get the Atlassian Cloud ID for the user's instance."""
if token in self._cloud_id_cache:
return self._cloud_id_cache[token]
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
"https://api.atlassian.com/oauth/token/accessible-resources",
headers={"Authorization": f"Bearer {token}"},
)
resp.raise_for_status()
resources = resp.json()
if resources:
cloud_id = resources[0]["id"]
self._cloud_id_cache[token] = cloud_id
return cloud_id
except Exception:
logger.warning("Failed to fetch Atlassian cloud ID", exc_info=True)
return None
async def confluence_search(self, token: str, query: str, limit: int = 5) -> str:
cloud_id = await self._get_cloud_id(token)
if not cloud_id:
return "Error: Could not determine Atlassian Cloud instance."
try:
async with httpx.AsyncClient(timeout=15.0) as client:
resp = await client.get(
f"https://api.atlassian.com/ex/confluence/{cloud_id}/wiki/rest/api/content/search",
params={"cql": f'text ~ "{query}"', "limit": str(limit), "expand": "metadata.labels"},
headers={"Authorization": f"Bearer {token}"},
)
resp.raise_for_status()
data = resp.json()
results = data.get("results", [])
if not results:
return f"No Confluence pages found for: {query}"
lines = []
for r in results:
title = r.get("title", "Untitled")
page_id = r.get("id", "")
space = r.get("_expandable", {}).get("space", "").split("/")[-1]
url = f"https://api.atlassian.com/ex/confluence/{cloud_id}/wiki{r.get('_links', {}).get('webui', '')}"
lines.append(f"- **{title}** (ID: {page_id}, Space: {space})\n {url}")
return f"Found {len(results)} pages:\n" + "\n".join(lines)
except Exception as e:
return f"Confluence search error: {e}"
async def confluence_read_page(self, token: str, page_id: str) -> str:
cloud_id = await self._get_cloud_id(token)
if not cloud_id:
return "Error: Could not determine Atlassian Cloud instance."
try:
async with httpx.AsyncClient(timeout=15.0) as client:
resp = await client.get(
f"https://api.atlassian.com/ex/confluence/{cloud_id}/wiki/rest/api/content/{page_id}",
params={"expand": "body.storage,version"},
headers={"Authorization": f"Bearer {token}"},
)
resp.raise_for_status()
data = resp.json()
title = data.get("title", "Untitled")
html_body = data.get("body", {}).get("storage", {}).get("value", "")
# Strip HTML tags for a readable summary
from bs4 import BeautifulSoup
text = BeautifulSoup(html_body, "lxml").get_text(separator="\n", strip=True)
# Truncate for LLM context
if len(text) > 8000:
text = text[:8000] + "\n...(truncated)"
return f"**{title}**\n\n{text}"
except Exception as e:
return f"Failed to read Confluence page {page_id}: {e}"
async def jira_search(self, token: str, jql: str, limit: int = 10) -> str:
cloud_id = await self._get_cloud_id(token)
if not cloud_id:
return "Error: Could not determine Atlassian Cloud instance."
try:
async with httpx.AsyncClient(timeout=15.0) as client:
resp = await client.get(
f"https://api.atlassian.com/ex/jira/{cloud_id}/rest/api/3/search/jql",
params={"jql": jql, "maxResults": str(limit), "fields": "summary,status,assignee,priority,created,updated"},
headers={"Authorization": f"Bearer {token}"},
)
resp.raise_for_status()
data = resp.json()
issues = data.get("issues", [])
if not issues:
return f"No Jira issues found for: {jql}"
lines = []
for issue in issues:
key = issue["key"]
fields = issue.get("fields", {})
summary = fields.get("summary", "")
status = fields.get("status", {}).get("name", "")
assignee = fields.get("assignee", {})
assignee_name = assignee.get("displayName", "Unassigned") if assignee else "Unassigned"
priority = fields.get("priority", {}).get("name", "") if fields.get("priority") else ""
lines.append(f"- **{key}**: {summary} [{status}] (Assignee: {assignee_name}, Priority: {priority})")
return f"Found {data.get('total', len(issues))} issues:\n" + "\n".join(lines)
except Exception as e:
return f"Jira search error: {e}"
async def jira_get_issue(self, token: str, issue_key: str) -> str:
cloud_id = await self._get_cloud_id(token)
if not cloud_id:
return "Error: Could not determine Atlassian Cloud instance."
try:
async with httpx.AsyncClient(timeout=15.0) as client:
resp = await client.get(
f"https://api.atlassian.com/ex/jira/{cloud_id}/rest/api/3/issue/{issue_key}",
params={"fields": "summary,status,assignee,priority,description,created,updated,comment"},
headers={"Authorization": f"Bearer {token}"},
)
resp.raise_for_status()
data = resp.json()
fields = data.get("fields", {})
summary = fields.get("summary", "")
status = fields.get("status", {}).get("name", "")
assignee = fields.get("assignee", {})
assignee_name = assignee.get("displayName", "Unassigned") if assignee else "Unassigned"
priority = fields.get("priority", {}).get("name", "") if fields.get("priority") else ""
desc = fields.get("description")
# ADF description — extract text nodes
desc_text = ""
if desc and isinstance(desc, dict):
desc_text = self._extract_adf_text(desc)
elif isinstance(desc, str):
desc_text = desc
if len(desc_text) > 4000:
desc_text = desc_text[:4000] + "...(truncated)"
comments = fields.get("comment", {}).get("comments", [])
comment_lines = []
for c in comments[-5:]: # Last 5 comments
author = c.get("author", {}).get("displayName", "Unknown")
body = self._extract_adf_text(c.get("body", {})) if isinstance(c.get("body"), dict) else str(c.get("body", ""))
comment_lines.append(f" - **{author}**: {body[:500]}")
result = f"**{issue_key}: {summary}**\nStatus: {status} | Assignee: {assignee_name} | Priority: {priority}"
if desc_text:
result += f"\n\nDescription:\n{desc_text}"
if comment_lines:
result += f"\n\nRecent comments:\n" + "\n".join(comment_lines)
return result
except Exception as e:
return f"Failed to fetch {issue_key}: {e}"
async def jira_create_issue(self, token: str, project: str, summary: str,
issue_type: str = "Task", description: str = "") -> str:
cloud_id = await self._get_cloud_id(token)
if not cloud_id:
return "Error: Could not determine Atlassian Cloud instance."
try:
body: dict = {
"fields": {
"project": {"key": project},
"summary": summary,
"issuetype": {"name": issue_type},
}
}
if description:
body["fields"]["description"] = {
"type": "doc",
"version": 1,
"content": [{"type": "paragraph", "content": [{"type": "text", "text": description}]}],
}
async with httpx.AsyncClient(timeout=15.0) as client:
resp = await client.post(
f"https://api.atlassian.com/ex/jira/{cloud_id}/rest/api/3/issue",
json=body,
headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
)
resp.raise_for_status()
data = resp.json()
return f"Created **{data['key']}**: {summary}"
except httpx.HTTPStatusError as e:
return f"Failed to create issue: {e.response.text}"
except Exception as e:
return f"Failed to create issue: {e}"
async def jira_add_comment(self, token: str, issue_key: str, comment: str) -> str:
cloud_id = await self._get_cloud_id(token)
if not cloud_id:
return "Error: Could not determine Atlassian Cloud instance."
try:
body = {
"body": {
"type": "doc",
"version": 1,
"content": [{"type": "paragraph", "content": [{"type": "text", "text": comment}]}],
}
}
async with httpx.AsyncClient(timeout=15.0) as client:
resp = await client.post(
f"https://api.atlassian.com/ex/jira/{cloud_id}/rest/api/3/issue/{issue_key}/comment",
json=body,
headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
)
resp.raise_for_status()
return f"Comment added to {issue_key}."
except Exception as e:
return f"Failed to add comment to {issue_key}: {e}"
async def jira_transition(self, token: str, issue_key: str, status: str) -> str:
cloud_id = await self._get_cloud_id(token)
if not cloud_id:
return "Error: Could not determine Atlassian Cloud instance."
try:
async with httpx.AsyncClient(timeout=15.0) as client:
# First, get available transitions
resp = await client.get(
f"https://api.atlassian.com/ex/jira/{cloud_id}/rest/api/3/issue/{issue_key}/transitions",
headers={"Authorization": f"Bearer {token}"},
)
resp.raise_for_status()
transitions = resp.json().get("transitions", [])
# Find matching transition (case-insensitive)
target = None
for t in transitions:
if t["name"].lower() == status.lower() or t["to"]["name"].lower() == status.lower():
target = t
break
if not target:
available = ", ".join(t["name"] for t in transitions)
return f"Cannot transition {issue_key} to '{status}'. Available transitions: {available}"
# Execute transition
resp = await client.post(
f"https://api.atlassian.com/ex/jira/{cloud_id}/rest/api/3/issue/{issue_key}/transitions",
json={"transition": {"id": target["id"]}},
headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
)
resp.raise_for_status()
return f"Transitioned {issue_key} to **{target['to']['name']}**."
except Exception as e:
return f"Failed to transition {issue_key}: {e}"
@staticmethod
def _extract_adf_text(adf: dict) -> str:
"""Recursively extract text from Atlassian Document Format."""
if not isinstance(adf, dict):
return str(adf)
parts = []
if adf.get("type") == "text":
parts.append(adf.get("text", ""))
for child in adf.get("content", []):
parts.append(AtlassianClient._extract_adf_text(child))
return " ".join(parts).strip()
class Bot:
def __init__(self):
config = AsyncClientConfig(
@@ -274,6 +671,7 @@ class Bot:
self.active_callers: dict[str, set[str]] = {} # room_id → set of caller user IDs
self.rag = DocumentRAG(WILDFILES_BASE_URL, WILDFILES_ORG)
self.memory = MemoryClient(MEMORY_SERVICE_URL)
self.atlassian = AtlassianClient(PORTAL_URL, BOT_API_KEY)
self.llm = AsyncOpenAI(base_url=LITELLM_URL, api_key=LITELLM_KEY) if LITELLM_URL else None
self.user_keys: dict[str, str] = self._load_user_keys() # matrix_user_id -> api_key
self.room_models: dict[str, str] = {} # room_id -> model name
@@ -1495,6 +1893,38 @@ class Bot:
finally:
self._pending_connects.pop(sender, None)
async def _execute_tool(self, tool_name: str, args: dict, sender: str, room_id: str) -> str:
"""Execute a tool call and return the result as a string."""
# Image generation — no Atlassian token needed
if tool_name == "generate_image":
await self._generate_and_send_image(room_id, args.get("prompt", ""))
return "Image generated and sent to the room."
# Atlassian tools — need per-user token
token = await self.atlassian.get_token(sender) if sender else None
if not token:
return ATLASSIAN_NOT_CONNECTED_MSG
if tool_name == "confluence_search":
return await self.atlassian.confluence_search(token, args["query"], args.get("limit", 5))
elif tool_name == "confluence_read_page":
return await self.atlassian.confluence_read_page(token, args["page_id"])
elif tool_name == "jira_search":
return await self.atlassian.jira_search(token, args["jql"], args.get("limit", 10))
elif tool_name == "jira_get_issue":
return await self.atlassian.jira_get_issue(token, args["issue_key"])
elif tool_name == "jira_create_issue":
return await self.atlassian.jira_create_issue(
token, args["project"], args["summary"],
args.get("issue_type", "Task"), args.get("description", ""),
)
elif tool_name == "jira_add_comment":
return await self.atlassian.jira_add_comment(token, args["issue_key"], args["comment"])
elif tool_name == "jira_transition":
return await self.atlassian.jira_transition(token, args["issue_key"], args["status"])
else:
return f"Unknown tool: {tool_name}"
async def _respond_with_ai(self, room, user_message: str, sender: str = None, image_data: tuple = None) -> str | None:
"""Send AI response and return the reply text (or None on failure)."""
model = self.room_models.get(room.room_id, DEFAULT_MODEL)
@@ -1569,24 +1999,54 @@ class Bot:
else:
messages.append({"role": "user", "content": user_message})
try:
resp = await self.llm.chat.completions.create(
model=model,
messages=messages,
max_tokens=2048,
tools=IMAGE_GEN_TOOLS if not image_data else None,
)
choice = resp.choices[0]
reply = choice.message.content or ""
# Determine available tools (no tools when analyzing images)
tools = ALL_TOOLS if not image_data else None
if choice.message.tool_calls:
try:
reply = ""
# Agentic tool-calling loop: iterate up to MAX_TOOL_ITERATIONS
for iteration in range(MAX_TOOL_ITERATIONS):
resp = await self.llm.chat.completions.create(
model=model,
messages=messages,
max_tokens=2048,
tools=tools,
)
choice = resp.choices[0]
reply = choice.message.content or ""
if not choice.message.tool_calls:
# No tool calls — final text response
break
# Process tool calls and feed results back
# Append the assistant message with tool_calls
assistant_msg = {"role": "assistant", "content": reply or None, "tool_calls": []}
for tc in choice.message.tool_calls:
if tc.function.name == "generate_image":
assistant_msg["tool_calls"].append({
"id": tc.id,
"type": "function",
"function": {"name": tc.function.name, "arguments": tc.function.arguments},
})
messages.append(assistant_msg)
# Execute each tool and append results
for tc in choice.message.tool_calls:
try:
args = json.loads(tc.function.arguments)
await self._generate_and_send_image(room.room_id, args["prompt"])
if reply:
await self._send_text(room.room_id, reply)
else:
except json.JSONDecodeError:
args = {}
result = await self._execute_tool(tc.function.name, args, sender, room.room_id)
messages.append({
"role": "tool",
"tool_call_id": tc.id,
"content": result,
})
logger.info("Tool %s executed (iter %d) for %s", tc.function.name, iteration, sender)
# Send final reply
if reply:
await self._send_text(room.room_id, reply)
# Extract and store new memories (after reply sent, with timeout)