Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
* @author Christian Tzolov
* @author Alexandros Pappas
* @author Dariusz Jędrzejczyk
* @author Yanming Zhou
* @see McpServerTransport
* @see ServerSentEvent
*/
Expand Down Expand Up @@ -391,14 +392,19 @@ private Mono<ServerResponse> handleMessage(ServerRequest request) {
}

if (request.queryParam("sessionId").isEmpty()) {
return ServerResponse.badRequest().bodyValue(new McpError("Session ID missing in message endpoint"));
return ServerResponse.badRequest()
.bodyValue(McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST)
.message("Session ID missing in message endpoint")
.build());
}

McpServerSession session = sessions.get(request.queryParam("sessionId").get());

if (session == null) {
return ServerResponse.status(HttpStatus.NOT_FOUND)
.bodyValue(new McpError("Session not found: " + request.queryParam("sessionId").get()));
.bodyValue(McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST)
.message("Session not found: " + request.queryParam("sessionId").get())
.build());
}

McpTransportContext transportContext = this.contextExtractor.extract(request, new DefaultMcpTransportContext());
Expand All @@ -412,12 +418,17 @@ private Mono<ServerResponse> handleMessage(ServerRequest request) {
// - the error is signalled on the SSE connection
// return ServerResponse.ok().build();
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.bodyValue(new McpError(error.getMessage()));
.bodyValue(McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR)
.message(error.getMessage())
.build());
});
}
catch (IllegalArgumentException | IOException e) {
logger.error("Failed to deserialize message: {}", e.getMessage());
return ServerResponse.badRequest().bodyValue(new McpError("Invalid message format"));
return ServerResponse.badRequest()
.bodyValue(McpError.builder(McpSchema.ErrorCodes.PARSE_ERROR)
.message("Invalid message format")
.build());
}
}).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* Implementation of a WebFlux based {@link McpStatelessServerTransport}.
*
* @author Dariusz Jędrzejczyk
* @author Yanming Zhou
*/
public class WebFluxStatelessServerTransport implements McpStatelessServerTransport {

Expand Down Expand Up @@ -102,7 +103,10 @@ private Mono<ServerResponse> handlePost(ServerRequest request) {
List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept();
if (!(acceptHeaders.contains(MediaType.APPLICATION_JSON)
&& acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM))) {
return ServerResponse.badRequest().build();
return ServerResponse.badRequest()
.bodyValue(McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST)
.message("Invalid Accept headers. Expected application/json and text/event-stream")
.build());
}

return request.bodyToMono(String.class).<ServerResponse>flatMap(body -> {
Expand All @@ -121,12 +125,17 @@ else if (message instanceof McpSchema.JSONRPCNotification jsonrpcNotification) {
}
else {
return ServerResponse.badRequest()
.bodyValue(new McpError("The server accepts either requests or notifications"));
.bodyValue(McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST)
.message("The server accepts either requests or notifications")
.build());
}
}
catch (IllegalArgumentException | IOException e) {
logger.error("Failed to deserialize message: {}", e.getMessage());
return ServerResponse.badRequest().bodyValue(new McpError("Invalid message format"));
return ServerResponse.badRequest()
.bodyValue(McpError.builder(McpSchema.ErrorCodes.PARSE_ERROR)
.message("Invalid message format")
.build());
}
}).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
* Implementation of a WebFlux based {@link McpStreamableServerTransportProvider}.
*
* @author Dariusz Jędrzejczyk
* @author Yanming Zhou
*/
public class WebFluxStreamableServerTransportProvider implements McpStreamableServerTransportProvider {

Expand Down Expand Up @@ -171,12 +172,17 @@ private Mono<ServerResponse> handleGet(ServerRequest request) {
return Mono.defer(() -> {
List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept();
if (!acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM)) {
return ServerResponse.badRequest().build();
return ServerResponse.badRequest()
.bodyValue(McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST)
.message("Invalid Accept headers. Expected text/event-stream")
.build());
}

if (!request.headers().asHttpHeaders().containsKey(HttpHeaders.MCP_SESSION_ID)) {
return ServerResponse.badRequest().build(); // TODO: say we need a session
// id
return ServerResponse.badRequest()
.bodyValue(McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST)
.message("Missing header " + HttpHeaders.MCP_SESSION_ID)
.build());
}

String sessionId = request.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID);
Expand Down Expand Up @@ -226,7 +232,10 @@ private Mono<ServerResponse> handlePost(ServerRequest request) {
List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept();
if (!(acceptHeaders.contains(MediaType.APPLICATION_JSON)
&& acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM))) {
return ServerResponse.badRequest().build();
return ServerResponse.badRequest()
.bodyValue(McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST)
.message("Invalid Accept headers. Expected application/json and text/event-stream")
.build());
}

return request.bodyToMono(String.class).<ServerResponse>flatMap(body -> {
Expand Down Expand Up @@ -258,15 +267,20 @@ private Mono<ServerResponse> handlePost(ServerRequest request) {
}

if (!request.headers().asHttpHeaders().containsKey(HttpHeaders.MCP_SESSION_ID)) {
return ServerResponse.badRequest().bodyValue(new McpError("Session ID missing"));
return ServerResponse.badRequest()
.bodyValue(McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST)
.message("Session ID missing")
.build());
}

String sessionId = request.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID);
McpStreamableServerSession session = sessions.get(sessionId);

if (session == null) {
return ServerResponse.status(HttpStatus.NOT_FOUND)
.bodyValue(new McpError("Session not found: " + sessionId));
.bodyValue(McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST)
.message("Session not found: " + sessionId)
.build());
}

if (message instanceof McpSchema.JSONRPCResponse jsonrpcResponse) {
Expand All @@ -292,12 +306,18 @@ else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
ServerSentEvent.class);
}
else {
return ServerResponse.badRequest().bodyValue(new McpError("Unknown message type"));
return ServerResponse.badRequest()
.bodyValue(McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST)
.message("Unknown message type")
.build());
}
}
catch (IllegalArgumentException | IOException e) {
logger.error("Failed to deserialize message: {}", e.getMessage());
return ServerResponse.badRequest().bodyValue(new McpError("Invalid message format"));
return ServerResponse.badRequest()
.bodyValue(McpError.builder(McpSchema.ErrorCodes.PARSE_ERROR)
.message("Invalid message format")
.build());
}
})
.switchIfEmpty(ServerResponse.badRequest().build())
Expand All @@ -313,8 +333,10 @@ private Mono<ServerResponse> handleDelete(ServerRequest request) {

return Mono.defer(() -> {
if (!request.headers().asHttpHeaders().containsKey(HttpHeaders.MCP_SESSION_ID)) {
return ServerResponse.badRequest().build(); // TODO: say we need a session
// id
return ServerResponse.badRequest()
.bodyValue(McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST)
.message("Missing header " + HttpHeaders.MCP_SESSION_ID)
.build());
}

if (this.disallowDelete) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
*
* @author Christian Tzolov
* @author Alexandros Pappas
* @author Yanming Zhou
* @see McpServerTransportProvider
* @see RouterFunction
*/
Expand Down Expand Up @@ -386,14 +387,20 @@ private ServerResponse handleMessage(ServerRequest request) {
}

if (request.param("sessionId").isEmpty()) {
return ServerResponse.badRequest().body(new McpError("Session ID missing in message endpoint"));
return ServerResponse.badRequest()
.body(McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST)
.message("Session ID missing in message endpoint")
.build());
}

String sessionId = request.param("sessionId").get();
McpServerSession session = sessions.get(sessionId);

if (session == null) {
return ServerResponse.status(HttpStatus.NOT_FOUND).body(new McpError("Session not found: " + sessionId));
return ServerResponse.status(HttpStatus.NOT_FOUND)
.body(McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST)
.message("Session not found: " + sessionId)
.build());
}

try {
Expand All @@ -413,11 +420,13 @@ private ServerResponse handleMessage(ServerRequest request) {
}
catch (IllegalArgumentException | IOException e) {
logger.error("Failed to deserialize message: {}", e.getMessage());
return ServerResponse.badRequest().body(new McpError("Invalid message format"));
return ServerResponse.badRequest()
.body(McpError.builder(McpSchema.ErrorCodes.PARSE_ERROR).message("Invalid message format").build());
}
catch (Exception e) {
logger.error("Error handling message: {}", e.getMessage());
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).body(new McpError(e.getMessage()));
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR).message(e.getMessage()).build());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
* {@link io.modelcontextprotocol.server.transport.WebFluxStatelessServerTransport}
*
* @author Christian Tzolov
* @author Yanming Zhou
*/
public class WebMvcStatelessServerTransport implements McpStatelessServerTransport {

Expand Down Expand Up @@ -106,7 +107,10 @@ private ServerResponse handlePost(ServerRequest request) {
List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept();
if (!(acceptHeaders.contains(MediaType.APPLICATION_JSON)
&& acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM))) {
return ServerResponse.badRequest().build();
return ServerResponse.badRequest()
.body(McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST)
.message("Invalid Accept headers. Expected application/json and text/event-stream")
.build());
}

try {
Expand All @@ -124,7 +128,9 @@ private ServerResponse handlePost(ServerRequest request) {
catch (Exception e) {
logger.error("Failed to handle request: {}", e.getMessage());
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(new McpError("Failed to handle request: " + e.getMessage()));
.body(McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR)
.message("Failed to handle request: " + e.getMessage())
.build());
}
}
else if (message instanceof McpSchema.JSONRPCNotification jsonrpcNotification) {
Expand All @@ -137,22 +143,29 @@ else if (message instanceof McpSchema.JSONRPCNotification jsonrpcNotification) {
catch (Exception e) {
logger.error("Failed to handle notification: {}", e.getMessage());
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(new McpError("Failed to handle notification: " + e.getMessage()));
.body(McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR)
.message("Failed to handle notification: " + e.getMessage())
.build());
}
}
else {
return ServerResponse.badRequest()
.body(new McpError("The server accepts either requests or notifications"));
.body(McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST)
.message("The server accepts either requests or notifications")
.build());
}
}
catch (IllegalArgumentException | IOException e) {
logger.error("Failed to deserialize message: {}", e.getMessage());
return ServerResponse.badRequest().body(new McpError("Invalid message format"));
return ServerResponse.badRequest()
.body(McpError.builder(McpSchema.ErrorCodes.PARSE_ERROR).message("Invalid message format").build());
}
catch (Exception e) {
logger.error("Unexpected error handling message: {}", e.getMessage());
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(new McpError("Unexpected error: " + e.getMessage()));
.body(McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR)
.message("Unexpected error: " + e.getMessage())
.build());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
*
* @author Christian Tzolov
* @author Dariusz Jędrzejczyk
* @author Yanming Zhou
* @see McpStreamableServerTransportProvider
* @see RouterFunction
*/
Expand Down Expand Up @@ -235,7 +236,7 @@ private ServerResponse handleGet(ServerRequest request) {

List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept();
if (!acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM)) {
return ServerResponse.badRequest().body("Invalid Accept header. Expected TEXT_EVENT_STREAM");
return ServerResponse.badRequest().body("Invalid Accept header. Expected text/event-stream");
}

McpTransportContext transportContext = this.contextExtractor.extract(request, new DefaultMcpTransportContext());
Expand Down Expand Up @@ -319,7 +320,9 @@ private ServerResponse handlePost(ServerRequest request) {
if (!acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM)
|| !acceptHeaders.contains(MediaType.APPLICATION_JSON)) {
return ServerResponse.badRequest()
.body(new McpError("Invalid Accept headers. Expected TEXT_EVENT_STREAM and APPLICATION_JSON"));
.body(McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST)
.message("Invalid Accept headers. Expected application/json and text/event-stream")
.build());
}

McpTransportContext transportContext = this.contextExtractor.extract(request, new DefaultMcpTransportContext());
Expand Down Expand Up @@ -349,21 +352,25 @@ private ServerResponse handlePost(ServerRequest request) {
}
catch (Exception e) {
logger.error("Failed to initialize session: {}", e.getMessage());
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).body(new McpError(e.getMessage()));
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR).message(e.getMessage()).build());
}
}

// Handle other messages that require a session
if (!request.headers().asHttpHeaders().containsKey(HttpHeaders.MCP_SESSION_ID)) {
return ServerResponse.badRequest().body(new McpError("Session ID missing"));
return ServerResponse.badRequest()
.body(McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST).message("Session ID missing").build());
}

String sessionId = request.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID);
McpStreamableServerSession session = this.sessions.get(sessionId);

if (session == null) {
return ServerResponse.status(HttpStatus.NOT_FOUND)
.body(new McpError("Session not found: " + sessionId));
.body(McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST)
.message("Session not found: " + sessionId)
.build());
}

if (message instanceof McpSchema.JSONRPCResponse jsonrpcResponse) {
Expand Down Expand Up @@ -404,16 +411,20 @@ else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
}
else {
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(new McpError("Unknown message type"));
.body(McpError.builder(McpSchema.ErrorCodes.INVALID_REQUEST)
.message("Unknown message type")
.build());
}
}
catch (IllegalArgumentException | IOException e) {
logger.error("Failed to deserialize message: {}", e.getMessage());
return ServerResponse.badRequest().body(new McpError("Invalid message format"));
return ServerResponse.badRequest()
.body(McpError.builder(McpSchema.ErrorCodes.PARSE_ERROR).message("Invalid message format").build());
}
catch (Exception e) {
logger.error("Error handling message: {}", e.getMessage());
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).body(new McpError(e.getMessage()));
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR).message(e.getMessage()).build());
}
}

Expand Down Expand Up @@ -451,7 +462,8 @@ private ServerResponse handleDelete(ServerRequest request) {
}
catch (Exception e) {
logger.error("Failed to delete session {}: {}", sessionId, e.getMessage());
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).body(new McpError(e.getMessage()));
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR).message(e.getMessage()).build());
}
}

Expand Down
Loading