Skip to content

Implemented Transport-Translation Bridge (mcpgateway.translate) #128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/docs/using/.pages
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
nav:
- index.md
- mcpgateway-wrapper.md
- mcpgateway-translate.md
- Clients: clients
- Agents: agents
139 changes: 139 additions & 0 deletions docs/docs/using/mcpgateway-translate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# MCP Gateway StdIO to SSE Bridge (`mcpgateway-translate`)

`mcpgateway-translate` is a lightweight bridge that connects a JSON-RPC server
running over StdIO to an HTTP/SSE interface, or consumes a remote SSE stream
and forwards messages to a local StdIO process.

Supported modes:

1. StdIO to SSE – serve a local subprocess over HTTP with SSE output
2. SSE to StdIO – subscribe to a remote SSE stream and forward messages to a local process

---

## Features

| Feature | Description |
|---------|-------------|
| Bidirectional bridging | Supports both StdIO to SSE and SSE to StdIO |
| Keep-alive frames | Emits `keepalive` events every 30 seconds |
| Endpoint bootstrapping | Sends a unique message POST endpoint per client session |
| CORS support | Configure allowed origins via `--cors` |
| OAuth2 support | Use `--oauth2Bearer` to authorize remote SSE connections |
| Health check | Provides a `/healthz` endpoint for liveness probes |
| Logging control | Adjustable log verbosity with `--logLevel` |
| Graceful shutdown | Cleans up subprocess and server on termination signals |

---

## Quick Start

### Expose a local StdIO server over SSE

```bash
mcpgateway-translate \
--stdio "uvenv run mcp-server-git" \
--port 9000
```

Access the SSE stream at:

```
http://localhost:9000/sse
```

### Bridge a remote SSE endpoint to a local process

```bash
mcpgateway-translate \
--sse "https://corp.example.com/mcp" \
--oauth2Bearer "your-token"
```

---

## Command-Line Options

```
mcpgateway-translate [--stdio CMD | --sse URL | --streamableHttp URL] [options]
```

### Required (one of)

* `--stdio <command>`
Start a local process whose stdout will be streamed as SSE and stdin will receive backchannel messages.

* `--sse <url>`
Connect to a remote SSE stream and forward messages to a local subprocess.

* `--streamableHttp <url>`
Not implemented in this build. Raises an error.

### Optional

* `--port <number>`
HTTP server port when using --stdio mode (default: 8000)

* `--cors <origins>`
One or more allowed origins for CORS (space-separated)

* `--oauth2Bearer <token>`
Bearer token to include in Authorization header when connecting to remote SSE

* `--logLevel <level>`
Logging level (default: info). Options: debug, info, warning, error, critical

---

## HTTP API (when using --stdio)

### GET /sse

Streams JSON-RPC responses as SSE. Each connection receives:

* `event: endpoint` – the URL for backchannel POST
* `event: keepalive` – periodic keepalive signal
* `event: message` – forwarded output from subprocess

### POST /message

Send a JSON-RPC message to the subprocess. Returns HTTP 202 on success, or 400 for invalid JSON.

### GET /healthz

Health check endpoint. Always responds with `ok`.

---

## Example Use Cases

### 1. Browser integration

```bash
mcpgateway-translate \
--stdio "uvenv run mcp-server-git" \
--port 9000 \
--cors "https://myapp.com"
```

Then connect the frontend to:

```
http://localhost:9000/sse
```

### 2. Connect remote server to local CLI tools

```bash
mcpgateway-translate \
--sse "https://corp.example.com/mcp" \
--oauth2Bearer "$TOKEN" \
--logLevel debug
```

---

## Notes

* Only StdIO to SSE and SSE to StdIO bridging are implemented.
* Any use of `--streamableHttp` will raise a NotImplementedError.
97 changes: 82 additions & 15 deletions mcpgateway/translate.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@

Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Mihai Criveti
Authors: Mihai Criveti, Manav Gupta

You can now run the bridge in either direction:

- stdio to SSE (expose local stdio MCP server over SSE)
- SSE to stdio (bridge remote SSE endpoint to local stdio)

Only the stdio→SSE direction is implemented for now.

Usage
-----
Expand All @@ -16,17 +20,17 @@
curl -N http://localhost:9000/sse # receive the stream

# 3. send a test echo request
curl -X POST http://localhost:9000/message \
-H 'Content-Type: application/json' \
curl -X POST http://localhost:9000/message \\
-H 'Content-Type: application/json' \\
-d '{"jsonrpc":"2.0","id":1,"method":"echo","params":{"value":"hi"}}'

# 4. proper MCP handshake and tool listing
curl -X POST http://localhost:9000/message \
-H 'Content-Type: application/json' \
curl -X POST http://localhost:9000/message \\
-H 'Content-Type: application/json' \\
-d '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-03-26","capabilities":{},"clientInfo":{"name":"demo","version":"0.0.1"}}}'

curl -X POST http://localhost:9000/message \
-H 'Content-Type: application/json' \
curl -X POST http://localhost:9000/message \\
-H 'Content-Type: application/json' \\
-d '{"jsonrpc":"2.0","id":2,"method":"tools/list"}'

The SSE stream now emits JSON-RPC responses as `event: message` frames and sends
Expand Down Expand Up @@ -162,9 +166,23 @@ def _build_fastapi(
keep_alive: int = KEEP_ALIVE_INTERVAL,
sse_path: str = "/sse",
message_path: str = "/message",
cors_origins: Optional[List[str]] = None,
) -> FastAPI:
app = FastAPI()

# Add CORS middleware if origins specified
if cors_origins:
# Third-Party
from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
CORSMiddleware,
allow_origins=cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

# ----- GET /sse ---------------------------------------------------------#
@app.get(sse_path)
async def get_sse(request: Request) -> EventSourceResponse: # noqa: D401
Expand Down Expand Up @@ -263,11 +281,11 @@ async def health() -> Response: # noqa: D401
def _parse_args(argv: Sequence[str]) -> argparse.Namespace:
p = argparse.ArgumentParser(
prog="mcpgateway.translate",
description="Bridges stdio JSON-RPC to SSE.",
description="Bridges stdio JSON-RPC to SSE or SSE to stdio.",
)
src = p.add_mutually_exclusive_group(required=True)
src.add_argument("--stdio", help='Command to run, e.g. "uv run mcp-server-git"')
src.add_argument("--sse", help="[NOT IMPLEMENTED]")
src.add_argument("--sse", help="Remote SSE endpoint URL")
src.add_argument("--streamableHttp", help="[NOT IMPLEMENTED]")

p.add_argument("--port", type=int, default=8000, help="HTTP port to bind")
Expand All @@ -277,19 +295,28 @@ def _parse_args(argv: Sequence[str]) -> argparse.Namespace:
choices=["debug", "info", "warning", "error", "critical"],
help="Log level",
)
p.add_argument(
"--cors",
nargs="*",
help="CORS allowed origins (e.g., --cors https://app.example.com)",
)
p.add_argument(
"--oauth2Bearer",
help="OAuth2 Bearer token for authentication",
)

args = p.parse_args(argv)
if args.sse or args.streamableHttp:
raise NotImplementedError("Only --stdio β†’ SSE is available in this build.")
if args.streamableHttp:
raise NotImplementedError("Only --stdio β†’ SSE and --sse β†’ stdio are available in this build.")
return args


async def _run_stdio_to_sse(cmd: str, port: int, log_level: str = "info") -> None:
async def _run_stdio_to_sse(cmd: str, port: int, log_level: str = "info", cors: Optional[List[str]] = None) -> None:
pubsub = _PubSub()
stdio = StdIOEndpoint(cmd, pubsub)
await stdio.start()

app = _build_fastapi(pubsub, stdio)
app = _build_fastapi(pubsub, stdio, cors_origins=cors)
config = uvicorn.Config(
app,
host="0.0.0.0",
Expand Down Expand Up @@ -319,14 +346,54 @@ async def _shutdown() -> None:
await _shutdown() # final cleanup


async def _run_sse_to_stdio(url: str, oauth2_bearer: Optional[str], log_level: str = "info") -> None:
# Third-Party
import httpx

headers = {}
if oauth2_bearer:
headers["Authorization"] = f"Bearer {oauth2_bearer}"

async with httpx.AsyncClient(headers=headers, timeout=None) as client:
process = await asyncio.create_subprocess_shell(
"cat", # Placeholder command; replace with actual stdio server command if needed
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=sys.stderr,
)

async def read_stdout():
assert process.stdout
while True:
line = await process.stdout.readline()
if not line:
break
print(line.decode().rstrip())

async def pump_sse_to_stdio():
async with client.stream("GET", url) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
data = line[6:]
if data and data != "{}":
if process.stdin:
process.stdin.write((data + "\n").encode())
await process.stdin.drain()

await asyncio.gather(read_stdout(), pump_sse_to_stdio())


def main(argv: Optional[Sequence[str]] | None = None) -> None: # entry-point
args = _parse_args(argv or sys.argv[1:])
logging.basicConfig(
level=getattr(logging, args.logLevel.upper(), logging.INFO),
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
try:
asyncio.run(_run_stdio_to_sse(args.stdio, args.port, args.logLevel))
if args.stdio:
asyncio.run(_run_stdio_to_sse(args.stdio, args.port, args.logLevel, args.cors))
elif args.sse:
asyncio.run(_run_sse_to_stdio(args.sse, args.oauth2Bearer, args.logLevel))
except KeyboardInterrupt:
print("") # restore shell prompt
sys.exit(0)
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ Changelog = "https://github.com/IBM/mcp-context-forge/blob/main/CHANGELOG.md"
# --------------------------------------------------------------------
[project.scripts]
mcpgateway = "mcpgateway.cli:main"
mcpgateway-translate = "mcpgateway.translate.cli:main"

# --------------------------------------------------------------------
# πŸ”§ setuptools-specific configuration
Expand Down
7 changes: 4 additions & 3 deletions tests/unit/mcpgateway/test_translate.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,10 @@ def test_parse_args_ok(translate):
assert ns.stdio == "echo hi" and ns.port == 9001


def test_parse_args_not_implemented(translate):
with pytest.raises(NotImplementedError):
translate._parse_args(["--sse", "x"])
def test_parse_args_sse_ok(translate):
ns = translate._parse_args(["--sse", "http://upstream.example/sse"])
assert ns.sse == "http://upstream.example/sse"
assert ns.stdio is None


@pytest.mark.asyncio
Expand Down
Loading