fix(mcp-bridge): unique wire ids to avoid concurrent session collisions
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 11s
All checks were successful
Build & Deploy / build-and-deploy (push) Successful in 11s
LiteLLM opens a fresh MCP session per tool call. Two concurrent sessions both send id=1 (tools/list + tools/call) which overwrite each other in the shared pending-map keyed by the inbound id. One of the two calls never gets a response and times out. Assign a monotonic demux-N wire id per forwardMcpCall. The caller's original id is echoed back in the HTTP response by the route handler. SB-48 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -81,6 +81,12 @@ export function isCustomerOnline(customerId: string): boolean {
|
|||||||
* Forward a JSON-RPC MCP call to the customer's WebSocket and return the result.
|
* Forward a JSON-RPC MCP call to the customer's WebSocket and return the result.
|
||||||
* Throws if customer is offline or times out.
|
* Throws if customer is offline or times out.
|
||||||
*/
|
*/
|
||||||
|
// Monotonic counter to ensure each WS-forwarded call has a unique id.
|
||||||
|
// Caller-provided ids (e.g. from LiteLLM) can collide across concurrent
|
||||||
|
// sessions — LiteLLM opens a fresh session per tool call and each uses
|
||||||
|
// id=1. We wrap with our own id on the wire and translate back.
|
||||||
|
let nextWireId = 1;
|
||||||
|
|
||||||
export async function forwardMcpCall(
|
export async function forwardMcpCall(
|
||||||
customerId: string,
|
customerId: string,
|
||||||
request: { method: string; params?: unknown; id: string | number },
|
request: { method: string; params?: unknown; id: string | number },
|
||||||
@@ -91,19 +97,28 @@ export async function forwardMcpCall(
|
|||||||
throw new McpBridgeError("tool_unavailable: customer not online", -32001);
|
throw new McpBridgeError("tool_unavailable: customer not online", -32001);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const wireId = `demux-${nextWireId++}`;
|
||||||
|
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
const timer = setTimeout(() => {
|
const timer = setTimeout(() => {
|
||||||
session.pending.delete(request.id);
|
session.pending.delete(wireId);
|
||||||
reject(new McpBridgeError(`tool_unavailable: timeout after ${timeoutMs}ms`, -32002));
|
reject(new McpBridgeError(`tool_unavailable: timeout after ${timeoutMs}ms`, -32002));
|
||||||
}, timeoutMs);
|
}, timeoutMs);
|
||||||
|
|
||||||
session.pending.set(request.id, { resolve, reject, timer });
|
session.pending.set(wireId, { resolve, reject, timer });
|
||||||
|
|
||||||
try {
|
try {
|
||||||
session.ws.send(JSON.stringify({ jsonrpc: "2.0", ...request }));
|
session.ws.send(
|
||||||
|
JSON.stringify({
|
||||||
|
jsonrpc: "2.0",
|
||||||
|
method: request.method,
|
||||||
|
params: request.params,
|
||||||
|
id: wireId,
|
||||||
|
})
|
||||||
|
);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
clearTimeout(timer);
|
clearTimeout(timer);
|
||||||
session.pending.delete(request.id);
|
session.pending.delete(wireId);
|
||||||
reject(err);
|
reject(err);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user