diff --git a/docs/docs/using/.pages b/docs/docs/using/.pages index fbf30385..e73ee1aa 100644 --- a/docs/docs/using/.pages +++ b/docs/docs/using/.pages @@ -1,5 +1,6 @@ nav: - index.md - mcpgateway-wrapper.md + - mcpgateway-translate.md - Clients: clients - Agents: agents diff --git a/docs/docs/using/mcpgateway-translate.md b/docs/docs/using/mcpgateway-translate.md new file mode 100644 index 00000000..0164f232 --- /dev/null +++ b/docs/docs/using/mcpgateway-translate.md @@ -0,0 +1,139 @@ +# MCP Gateway StdIO to SSE Bridge (`mcpgateway-translate`) + +`mcpgateway-translate` is a lightweight bridge that connects a JSON-RPC server +running over StdIO to an HTTP/SSE interface, or consumes a remote SSE stream +and forwards messages to a local StdIO process. + +Supported modes: + +1. StdIO to SSE – serve a local subprocess over HTTP with SSE output +2. SSE to StdIO – subscribe to a remote SSE stream and forward messages to a local process + +--- + +## Features + +| Feature | Description | +|---------|-------------| +| Bidirectional bridging | Supports both StdIO to SSE and SSE to StdIO | +| Keep-alive frames | Emits `keepalive` events every 30 seconds | +| Endpoint bootstrapping | Sends a unique message POST endpoint per client session | +| CORS support | Configure allowed origins via `--cors` | +| OAuth2 support | Use `--oauth2Bearer` to authorize remote SSE connections | +| Health check | Provides a `/healthz` endpoint for liveness probes | +| Logging control | Adjustable log verbosity with `--logLevel` | +| Graceful shutdown | Cleans up subprocess and server on termination signals | + +--- + +## Quick Start + +### Expose a local StdIO server over SSE + +```bash +mcpgateway-translate \ + --stdio "uvenv run mcp-server-git" \ + --port 9000 +``` + +Access the SSE stream at: + +``` +http://localhost:9000/sse +``` + +### Bridge a remote SSE endpoint to a local process + +```bash +mcpgateway-translate \ + --sse "https://corp.example.com/mcp" \ + --oauth2Bearer "your-token" +``` + +--- + +## Command-Line Options + +``` +mcpgateway-translate [--stdio CMD | --sse URL | --streamableHttp URL] [options] +``` + +### Required (one of) + +* `--stdio ` + Start a local process whose stdout will be streamed as SSE and stdin will receive backchannel messages. + +* `--sse ` + Connect to a remote SSE stream and forward messages to a local subprocess. + +* `--streamableHttp ` + Not implemented in this build. Raises an error. + +### Optional + +* `--port ` + HTTP server port when using --stdio mode (default: 8000) + +* `--cors ` + One or more allowed origins for CORS (space-separated) + +* `--oauth2Bearer ` + Bearer token to include in Authorization header when connecting to remote SSE + +* `--logLevel ` + Logging level (default: info). Options: debug, info, warning, error, critical + +--- + +## HTTP API (when using --stdio) + +### GET /sse + +Streams JSON-RPC responses as SSE. Each connection receives: + +* `event: endpoint` – the URL for backchannel POST +* `event: keepalive` – periodic keepalive signal +* `event: message` – forwarded output from subprocess + +### POST /message + +Send a JSON-RPC message to the subprocess. Returns HTTP 202 on success, or 400 for invalid JSON. + +### GET /healthz + +Health check endpoint. Always responds with `ok`. + +--- + +## Example Use Cases + +### 1. Browser integration + +```bash +mcpgateway-translate \ + --stdio "uvenv run mcp-server-git" \ + --port 9000 \ + --cors "https://myapp.com" +``` + +Then connect the frontend to: + +``` +http://localhost:9000/sse +``` + +### 2. Connect remote server to local CLI tools + +```bash +mcpgateway-translate \ + --sse "https://corp.example.com/mcp" \ + --oauth2Bearer "$TOKEN" \ + --logLevel debug +``` + +--- + +## Notes + +* Only StdIO to SSE and SSE to StdIO bridging are implemented. +* Any use of `--streamableHttp` will raise a NotImplementedError. diff --git a/mcpgateway/translate.py b/mcpgateway/translate.py index daf5baee..c9d0e502 100644 --- a/mcpgateway/translate.py +++ b/mcpgateway/translate.py @@ -3,9 +3,13 @@ Copyright 2025 SPDX-License-Identifier: Apache-2.0 -Authors: Mihai Criveti +Authors: Mihai Criveti, Manav Gupta + +You can now run the bridge in either direction: + +- stdio to SSE (expose local stdio MCP server over SSE) +- SSE to stdio (bridge remote SSE endpoint to local stdio) -Only the stdio→SSE direction is implemented for now. Usage ----- @@ -16,17 +20,17 @@ curl -N http://localhost:9000/sse # receive the stream # 3. send a test echo request -curl -X POST http://localhost:9000/message \ - -H 'Content-Type: application/json' \ +curl -X POST http://localhost:9000/message \\ + -H 'Content-Type: application/json' \\ -d '{"jsonrpc":"2.0","id":1,"method":"echo","params":{"value":"hi"}}' # 4. proper MCP handshake and tool listing -curl -X POST http://localhost:9000/message \ - -H 'Content-Type: application/json' \ +curl -X POST http://localhost:9000/message \\ + -H 'Content-Type: application/json' \\ -d '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-03-26","capabilities":{},"clientInfo":{"name":"demo","version":"0.0.1"}}}' -curl -X POST http://localhost:9000/message \ - -H 'Content-Type: application/json' \ +curl -X POST http://localhost:9000/message \\ + -H 'Content-Type: application/json' \\ -d '{"jsonrpc":"2.0","id":2,"method":"tools/list"}' The SSE stream now emits JSON-RPC responses as `event: message` frames and sends @@ -162,9 +166,23 @@ def _build_fastapi( keep_alive: int = KEEP_ALIVE_INTERVAL, sse_path: str = "/sse", message_path: str = "/message", + cors_origins: Optional[List[str]] = None, ) -> FastAPI: app = FastAPI() + # Add CORS middleware if origins specified + if cors_origins: + # Third-Party + from fastapi.middleware.cors import CORSMiddleware + + app.add_middleware( + CORSMiddleware, + allow_origins=cors_origins, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + # ----- GET /sse ---------------------------------------------------------# @app.get(sse_path) async def get_sse(request: Request) -> EventSourceResponse: # noqa: D401 @@ -263,11 +281,11 @@ async def health() -> Response: # noqa: D401 def _parse_args(argv: Sequence[str]) -> argparse.Namespace: p = argparse.ArgumentParser( prog="mcpgateway.translate", - description="Bridges stdio JSON-RPC to SSE.", + description="Bridges stdio JSON-RPC to SSE or SSE to stdio.", ) src = p.add_mutually_exclusive_group(required=True) src.add_argument("--stdio", help='Command to run, e.g. "uv run mcp-server-git"') - src.add_argument("--sse", help="[NOT IMPLEMENTED]") + src.add_argument("--sse", help="Remote SSE endpoint URL") src.add_argument("--streamableHttp", help="[NOT IMPLEMENTED]") p.add_argument("--port", type=int, default=8000, help="HTTP port to bind") @@ -277,19 +295,28 @@ def _parse_args(argv: Sequence[str]) -> argparse.Namespace: choices=["debug", "info", "warning", "error", "critical"], help="Log level", ) + p.add_argument( + "--cors", + nargs="*", + help="CORS allowed origins (e.g., --cors https://app.example.com)", + ) + p.add_argument( + "--oauth2Bearer", + help="OAuth2 Bearer token for authentication", + ) args = p.parse_args(argv) - if args.sse or args.streamableHttp: - raise NotImplementedError("Only --stdio → SSE is available in this build.") + if args.streamableHttp: + raise NotImplementedError("Only --stdio → SSE and --sse → stdio are available in this build.") return args -async def _run_stdio_to_sse(cmd: str, port: int, log_level: str = "info") -> None: +async def _run_stdio_to_sse(cmd: str, port: int, log_level: str = "info", cors: Optional[List[str]] = None) -> None: pubsub = _PubSub() stdio = StdIOEndpoint(cmd, pubsub) await stdio.start() - app = _build_fastapi(pubsub, stdio) + app = _build_fastapi(pubsub, stdio, cors_origins=cors) config = uvicorn.Config( app, host="0.0.0.0", @@ -319,6 +346,43 @@ async def _shutdown() -> None: await _shutdown() # final cleanup +async def _run_sse_to_stdio(url: str, oauth2_bearer: Optional[str], log_level: str = "info") -> None: + # Third-Party + import httpx + + headers = {} + if oauth2_bearer: + headers["Authorization"] = f"Bearer {oauth2_bearer}" + + async with httpx.AsyncClient(headers=headers, timeout=None) as client: + process = await asyncio.create_subprocess_shell( + "cat", # Placeholder command; replace with actual stdio server command if needed + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=sys.stderr, + ) + + async def read_stdout(): + assert process.stdout + while True: + line = await process.stdout.readline() + if not line: + break + print(line.decode().rstrip()) + + async def pump_sse_to_stdio(): + async with client.stream("GET", url) as response: + async for line in response.aiter_lines(): + if line.startswith("data: "): + data = line[6:] + if data and data != "{}": + if process.stdin: + process.stdin.write((data + "\n").encode()) + await process.stdin.drain() + + await asyncio.gather(read_stdout(), pump_sse_to_stdio()) + + def main(argv: Optional[Sequence[str]] | None = None) -> None: # entry-point args = _parse_args(argv or sys.argv[1:]) logging.basicConfig( @@ -326,7 +390,10 @@ def main(argv: Optional[Sequence[str]] | None = None) -> None: # entry-point format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) try: - asyncio.run(_run_stdio_to_sse(args.stdio, args.port, args.logLevel)) + if args.stdio: + asyncio.run(_run_stdio_to_sse(args.stdio, args.port, args.logLevel, args.cors)) + elif args.sse: + asyncio.run(_run_sse_to_stdio(args.sse, args.oauth2Bearer, args.logLevel)) except KeyboardInterrupt: print("") # restore shell prompt sys.exit(0) diff --git a/pyproject.toml b/pyproject.toml index 1fa89b04..d07a7851 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -175,6 +175,7 @@ Changelog = "https://github.com/IBM/mcp-context-forge/blob/main/CHANGELOG.md" # -------------------------------------------------------------------- [project.scripts] mcpgateway = "mcpgateway.cli:main" +mcpgateway-translate = "mcpgateway.translate.cli:main" # -------------------------------------------------------------------- # 🔧 setuptools-specific configuration diff --git a/tests/unit/mcpgateway/test_translate.py b/tests/unit/mcpgateway/test_translate.py index 24417846..e8b84dec 100644 --- a/tests/unit/mcpgateway/test_translate.py +++ b/tests/unit/mcpgateway/test_translate.py @@ -212,9 +212,10 @@ def test_parse_args_ok(translate): assert ns.stdio == "echo hi" and ns.port == 9001 -def test_parse_args_not_implemented(translate): - with pytest.raises(NotImplementedError): - translate._parse_args(["--sse", "x"]) +def test_parse_args_sse_ok(translate): + ns = translate._parse_args(["--sse", "http://upstream.example/sse"]) + assert ns.sse == "http://upstream.example/sse" + assert ns.stdio is None @pytest.mark.asyncio