Skip to content

Commit 6fe71e1

Browse files
authored
Added Model Context Protocol (#1093)
1 parent 1290771 commit 6fe71e1

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+630
-152
lines changed

api/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ dependencies {
6262
implementation libs.netty.common
6363
implementation libs.netty.handler
6464

65+
implementation libs.modelcontextprotocol.spring.webflux
66+
implementation libs.victools.jsonschema.generator
6567

6668
// Google Managed Service for Kafka IAM support
6769
implementation (libs.google.managed.kafka.login.handler) {

api/src/main/java/io/kafbat/ui/config/CustomWebFilter.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,17 @@ public class CustomWebFilter implements WebFilter {
1717

1818
final String path = exchange.getRequest().getPath().pathWithinApplication().value();
1919

20+
ServerWebExchange filterExchange = exchange;
21+
2022
if (path.startsWith("/ui") || path.isEmpty() || path.equals("/")) {
21-
return chain.filter(
22-
exchange.mutate().request(
23-
exchange.getRequest().mutate()
24-
.path(basePath + "/index.html")
25-
.contextPath(basePath)
26-
.build()
27-
).build()
28-
);
23+
filterExchange = exchange.mutate().request(
24+
exchange.getRequest().mutate()
25+
.path(basePath + "/index.html")
26+
.contextPath(basePath)
27+
.build()
28+
).build();
2929
}
3030

31-
return chain.filter(exchange);
31+
return chain.filter(filterExchange).contextWrite(ctx -> ctx.put(ServerWebExchange.class, exchange));
3232
}
3333
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.kafbat.ui.config;
2+
3+
import com.github.victools.jsonschema.generator.OptionPreset;
4+
import com.github.victools.jsonschema.generator.SchemaGenerator;
5+
import com.github.victools.jsonschema.generator.SchemaGeneratorConfigBuilder;
6+
import com.github.victools.jsonschema.generator.SchemaVersion;
7+
import org.springframework.context.annotation.Bean;
8+
import org.springframework.context.annotation.Configuration;
9+
10+
@Configuration
11+
public class JsonSchemaConfig {
12+
@Bean
13+
public SchemaGenerator schemaGenerator() {
14+
SchemaGeneratorConfigBuilder configBuilder =
15+
new SchemaGeneratorConfigBuilder(SchemaVersion.DRAFT_2020_12, OptionPreset.PLAIN_JSON);
16+
return new SchemaGenerator(configBuilder.build());
17+
}
18+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package io.kafbat.ui.config;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import io.kafbat.ui.service.mcp.McpSpecificationGenerator;
5+
import io.kafbat.ui.service.mcp.McpTool;
6+
import io.modelcontextprotocol.server.McpAsyncServer;
7+
import io.modelcontextprotocol.server.McpServer;
8+
import io.modelcontextprotocol.server.McpServerFeatures.AsyncPromptSpecification;
9+
import io.modelcontextprotocol.server.McpServerFeatures.AsyncToolSpecification;
10+
import io.modelcontextprotocol.server.transport.WebFluxSseServerTransportProvider;
11+
import io.modelcontextprotocol.spec.McpSchema;
12+
import java.util.ArrayList;
13+
import java.util.List;
14+
import lombok.RequiredArgsConstructor;
15+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
16+
import org.springframework.context.annotation.Bean;
17+
import org.springframework.context.annotation.Configuration;
18+
import org.springframework.web.reactive.function.server.RouterFunction;
19+
20+
@Configuration
21+
@RequiredArgsConstructor
22+
@ConditionalOnProperty(value = "mcp.enabled", havingValue = "true")
23+
public class McpConfig {
24+
25+
private final List<McpTool> mcpTools;
26+
private final McpSpecificationGenerator mcpSpecificationGenerator;
27+
28+
// SSE transport
29+
@Bean
30+
public WebFluxSseServerTransportProvider sseServerTransport(ObjectMapper mapper) {
31+
return new WebFluxSseServerTransportProvider(mapper, "/mcp/message", "/mcp/sse");
32+
}
33+
34+
// Router function for SSE transport used by Spring WebFlux to start an HTTP
35+
// server.
36+
37+
@Bean
38+
public RouterFunction<?> mcpRouterFunction(WebFluxSseServerTransportProvider transport) {
39+
return transport.getRouterFunction();
40+
}
41+
42+
@Bean
43+
public McpAsyncServer mcpServer(WebFluxSseServerTransportProvider transport) {
44+
45+
// Configure server capabilities with resource support
46+
var capabilities = McpSchema.ServerCapabilities.builder()
47+
.resources(false, true)
48+
.tools(true) // Tool support with list changes notifications
49+
.prompts(false) // Prompt support with list changes notifications
50+
.logging() // Logging support
51+
.build();
52+
53+
// Create the server with both tool and resource capabilities
54+
return McpServer.async(transport)
55+
.serverInfo("Kafka UI MCP", "0.0.1")
56+
.capabilities(capabilities)
57+
.tools(tools())
58+
.build();
59+
}
60+
61+
private List<AsyncToolSpecification> tools() {
62+
List<AsyncToolSpecification> tools = new ArrayList<>();
63+
for (McpTool mcpTool : mcpTools) {
64+
tools.addAll(mcpSpecificationGenerator.convertTool(mcpTool));
65+
}
66+
return tools;
67+
}
68+
}

api/src/main/java/io/kafbat/ui/controller/AclsController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.kafbat.ui.model.rbac.AccessContext;
1212
import io.kafbat.ui.model.rbac.permission.AclAction;
1313
import io.kafbat.ui.service.acl.AclsService;
14+
import io.kafbat.ui.service.mcp.McpTool;
1415
import java.util.Optional;
1516
import lombok.RequiredArgsConstructor;
1617
import org.apache.kafka.common.resource.PatternType;
@@ -24,7 +25,7 @@
2425

2526
@RestController
2627
@RequiredArgsConstructor
27-
public class AclsController extends AbstractController implements AclsApi {
28+
public class AclsController extends AbstractController implements AclsApi, McpTool {
2829

2930
private final AclsService aclsService;
3031

api/src/main/java/io/kafbat/ui/controller/BrokersController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.kafbat.ui.model.rbac.AccessContext;
1212
import io.kafbat.ui.model.rbac.permission.ClusterConfigAction;
1313
import io.kafbat.ui.service.BrokerService;
14+
import io.kafbat.ui.service.mcp.McpTool;
1415
import java.util.List;
1516
import java.util.Map;
1617
import javax.annotation.Nullable;
@@ -25,7 +26,7 @@
2526
@RestController
2627
@RequiredArgsConstructor
2728
@Slf4j
28-
public class BrokersController extends AbstractController implements BrokersApi {
29+
public class BrokersController extends AbstractController implements BrokersApi, McpTool {
2930
private static final String BROKER_ID = "brokerId";
3031

3132
private final BrokerService brokerService;

api/src/main/java/io/kafbat/ui/controller/ClientQuotasController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.kafbat.ui.model.ClientQuotasDTO;
77
import io.kafbat.ui.model.rbac.AccessContext;
88
import io.kafbat.ui.model.rbac.permission.ClientQuotaAction;
9+
import io.kafbat.ui.service.mcp.McpTool;
910
import io.kafbat.ui.service.quota.ClientQuotaRecord;
1011
import io.kafbat.ui.service.quota.ClientQuotaService;
1112
import java.math.BigDecimal;
@@ -22,7 +23,7 @@
2223

2324
@RestController
2425
@RequiredArgsConstructor
25-
public class ClientQuotasController extends AbstractController implements ClientQuotasApi {
26+
public class ClientQuotasController extends AbstractController implements ClientQuotasApi, McpTool {
2627

2728
private static final Comparator<ClientQuotaRecord> QUOTA_RECORDS_COMPARATOR =
2829
Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::user))

api/src/main/java/io/kafbat/ui/controller/ClustersController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.kafbat.ui.model.ClusterStatsDTO;
77
import io.kafbat.ui.model.rbac.AccessContext;
88
import io.kafbat.ui.service.ClusterService;
9+
import io.kafbat.ui.service.mcp.McpTool;
910
import lombok.RequiredArgsConstructor;
1011
import lombok.extern.slf4j.Slf4j;
1112
import org.springframework.http.ResponseEntity;
@@ -17,7 +18,7 @@
1718
@RestController
1819
@RequiredArgsConstructor
1920
@Slf4j
20-
public class ClustersController extends AbstractController implements ClustersApi {
21+
public class ClustersController extends AbstractController implements ClustersApi, McpTool {
2122
private final ClusterService clusterService;
2223

2324
@Override

api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.kafbat.ui.model.rbac.permission.TopicAction;
2020
import io.kafbat.ui.service.ConsumerGroupService;
2121
import io.kafbat.ui.service.OffsetsResetService;
22+
import io.kafbat.ui.service.mcp.McpTool;
2223
import java.util.Map;
2324
import java.util.Optional;
2425
import java.util.function.Supplier;
@@ -35,7 +36,7 @@
3536
@RestController
3637
@RequiredArgsConstructor
3738
@Slf4j
38-
public class ConsumerGroupsController extends AbstractController implements ConsumerGroupsApi {
39+
public class ConsumerGroupsController extends AbstractController implements ConsumerGroupsApi, McpTool {
3940

4041
private final ConsumerGroupService consumerGroupService;
4142
private final OffsetsResetService offsetsResetService;

api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.kafbat.ui.model.rbac.AccessContext;
2121
import io.kafbat.ui.model.rbac.permission.ConnectAction;
2222
import io.kafbat.ui.service.KafkaConnectService;
23+
import io.kafbat.ui.service.mcp.McpTool;
2324
import java.util.Comparator;
2425
import java.util.Map;
2526
import java.util.Set;
@@ -35,7 +36,7 @@
3536
@RestController
3637
@RequiredArgsConstructor
3738
@Slf4j
38-
public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
39+
public class KafkaConnectController extends AbstractController implements KafkaConnectApi, McpTool {
3940
private static final Set<ConnectorActionDTO> RESTART_ACTIONS
4041
= Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
4142
private static final String CONNECTOR_NAME = "connectorName";

api/src/main/java/io/kafbat/ui/controller/KsqlController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.kafbat.ui.model.rbac.AccessContext;
1111
import io.kafbat.ui.model.rbac.permission.KsqlAction;
1212
import io.kafbat.ui.service.ksql.KsqlServiceV2;
13+
import io.kafbat.ui.service.mcp.McpTool;
1314
import java.util.List;
1415
import java.util.Map;
1516
import java.util.Optional;
@@ -24,7 +25,7 @@
2425
@RestController
2526
@RequiredArgsConstructor
2627
@Slf4j
27-
public class KsqlController extends AbstractController implements KsqlApi {
28+
public class KsqlController extends AbstractController implements KsqlApi, McpTool {
2829

2930
private final KsqlServiceV2 ksqlServiceV2;
3031

api/src/main/java/io/kafbat/ui/controller/MessagesController.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@
2626
import io.kafbat.ui.serde.api.Serde;
2727
import io.kafbat.ui.service.DeserializationService;
2828
import io.kafbat.ui.service.MessagesService;
29+
import io.kafbat.ui.service.mcp.McpTool;
2930
import java.util.List;
3031
import java.util.Optional;
3132
import javax.validation.Valid;
3233
import lombok.RequiredArgsConstructor;
3334
import lombok.extern.slf4j.Slf4j;
35+
import org.springframework.http.HttpStatus;
3436
import org.springframework.http.ResponseEntity;
3537
import org.springframework.web.bind.annotation.RestController;
3638
import org.springframework.web.server.ServerWebExchange;
@@ -41,7 +43,7 @@
4143
@RestController
4244
@RequiredArgsConstructor
4345
@Slf4j
44-
public class MessagesController extends AbstractController implements MessagesApi {
46+
public class MessagesController extends AbstractController implements MessagesApi, McpTool {
4547

4648
private final MessagesService messagesService;
4749
private final DeserializationService deserializationService;
@@ -148,8 +150,8 @@ public Mono<ResponseEntity<Void>> sendTopicMessages(
148150

149151
return validateAccess(context).then(
150152
createTopicMessage.flatMap(msg ->
151-
messagesService.sendMessage(getCluster(clusterName), topicName, msg).then()
152-
).map(ResponseEntity::ok)
153+
messagesService.sendMessage(getCluster(clusterName), topicName, msg)
154+
).map(m -> new ResponseEntity<Void>(HttpStatus.OK))
153155
).doOnEach(sig -> audit(context, sig));
154156
}
155157

api/src/main/java/io/kafbat/ui/controller/SchemasController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.kafbat.ui.model.rbac.AccessContext;
1414
import io.kafbat.ui.model.rbac.permission.SchemaAction;
1515
import io.kafbat.ui.service.SchemaRegistryService;
16+
import io.kafbat.ui.service.mcp.McpTool;
1617
import java.util.List;
1718
import java.util.Map;
1819
import javax.validation.Valid;
@@ -28,7 +29,7 @@
2829
@RestController
2930
@RequiredArgsConstructor
3031
@Slf4j
31-
public class SchemasController extends AbstractController implements SchemasApi {
32+
public class SchemasController extends AbstractController implements SchemasApi, McpTool {
3233

3334
private static final Integer DEFAULT_PAGE_SIZE = 25;
3435

api/src/main/java/io/kafbat/ui/controller/TopicsController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.kafbat.ui.model.rbac.AccessContext;
3030
import io.kafbat.ui.service.TopicsService;
3131
import io.kafbat.ui.service.analyze.TopicAnalysisService;
32+
import io.kafbat.ui.service.mcp.McpTool;
3233
import java.util.Comparator;
3334
import java.util.List;
3435
import java.util.Map;
@@ -46,7 +47,7 @@
4647
@RestController
4748
@RequiredArgsConstructor
4849
@Slf4j
49-
public class TopicsController extends AbstractController implements TopicsApi {
50+
public class TopicsController extends AbstractController implements TopicsApi, McpTool {
5051

5152
private static final Integer DEFAULT_PAGE_SIZE = 25;
5253

api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public static Predicate<TopicMessageDTO> noop() {
5656

5757
public static Predicate<TopicMessageDTO> containsStringFilter(String string) {
5858
return msg -> StringUtils.contains(msg.getKey(), string)
59-
|| StringUtils.contains(msg.getContent(), string) || headersContains(msg, string);
59+
|| StringUtils.contains(msg.getValue(), string) || headersContains(msg, string);
6060
}
6161

6262
private static boolean headersContains(TopicMessageDTO msg, String searchString) {
@@ -126,9 +126,9 @@ private static Map<String, Map<String, Object>> recordToArgs(TopicMessageDTO top
126126
args.put("keyAsText", topicMessage.getKey());
127127
}
128128

129-
if (topicMessage.getContent() != null) {
130-
args.put("value", parseToJsonOrReturnAsIs(topicMessage.getContent()));
131-
args.put("valueAsText", topicMessage.getContent());
129+
if (topicMessage.getValue() != null) {
130+
args.put("value", parseToJsonOrReturnAsIs(topicMessage.getValue()));
131+
args.put("valueAsText", topicMessage.getValue());
132132
}
133133

134134
args.put("headers", Objects.requireNonNullElse(topicMessage.getHeaders(), emptyMap()));

api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,15 +99,15 @@ private void fillValue(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec
9999
try {
100100
var deserResult = valueDeserializer.deserialize(
101101
new RecordHeadersImpl(rec.headers()), rec.value().get());
102-
message.setContent(deserResult.getResult());
102+
message.setValue(deserResult.getResult());
103103
message.setValueSerde(valueSerdeName);
104104
message.setValueDeserializeProperties(deserResult.getAdditionalProperties());
105105
} catch (Exception e) {
106106
log.trace("Error deserializing key for value topic: {}, partition {}, offset {}, with serde {}",
107107
rec.topic(), rec.partition(), rec.offset(), valueSerdeName, e);
108108
var deserResult = fallbackValueDeserializer.deserialize(
109109
new RecordHeadersImpl(rec.headers()), rec.value().get());
110-
message.setContent(deserResult.getResult());
110+
message.setValue(deserResult.getResult());
111111
message.setValueSerde(fallbackSerdeName);
112112
}
113113
}

api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -143,15 +143,15 @@ private Mono<List<ConsumerGroupDescription>> loadSortedDescriptions(ReactiveAdmi
143143
case STATE -> {
144144
ToIntFunction<ConsumerGroupListing> statesPriorities =
145145
cg -> switch (cg.state().orElse(ConsumerGroupState.UNKNOWN)) {
146-
case STABLE -> 0;
147-
case COMPLETING_REBALANCE -> 1;
148-
case PREPARING_REBALANCE -> 2;
149-
case EMPTY -> 3;
150-
case DEAD -> 4;
151-
case UNKNOWN -> 5;
152-
case ASSIGNING -> 6;
153-
case RECONCILING -> 7;
154-
};
146+
case STABLE -> 0;
147+
case COMPLETING_REBALANCE -> 1;
148+
case PREPARING_REBALANCE -> 2;
149+
case EMPTY -> 3;
150+
case DEAD -> 4;
151+
case UNKNOWN -> 5;
152+
case ASSIGNING -> 6;
153+
case RECONCILING -> 7;
154+
};
155155
var comparator = Comparator.comparingInt(statesPriorities);
156156
yield loadDescriptionsByListings(ac, groups, comparator, pageNum, perPage, sortOrderDto);
157157
}

api/src/main/java/io/kafbat/ui/service/DeserializationService.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,19 @@ public DeserializationService(Environment env,
4141
}
4242
}
4343

44+
public ClusterSerdes getSerdesFor(String clusterName) {
45+
return clusterSerdes.get(clusterName);
46+
}
47+
4448
private ClusterSerdes getSerdesFor(KafkaCluster cluster) {
45-
return clusterSerdes.get(cluster.getName());
49+
return getSerdesFor(cluster.getName());
4650
}
4751

4852
private Serde.Serializer getSerializer(KafkaCluster cluster,
4953
String topic,
5054
Serde.Target type,
5155
String serdeName) {
52-
var serdes = getSerdesFor(cluster);
56+
var serdes = getSerdesFor(cluster.getName());
5357
var serde = serdes.serdeForName(serdeName)
5458
.orElseThrow(() -> new ValidationException(
5559
String.format("Serde %s not found", serdeName)));

0 commit comments

Comments
 (0)