diff --git a/Makefile b/Makefile index bc3c5d78..8729edfc 100644 --- a/Makefile +++ b/Makefile @@ -43,11 +43,20 @@ down: repo-clean: rm -rf ~/.m2/repository/com/studiomediatech/query-response-spring-amqp -.PHONY: demo run-demo +## +## Demo targets +## +## These targets are used to run the demo application and its components. +## They depend on the `install` target to ensure that the application is built +## and ready to run. The up target starts the necessary Docker containers, +## and the run-demo target orchestrates the execution of the demo components. +## Please note that the parallel execution of the demo components is what makes +## it possible to start the individual components in the background. +## +.PHONY: demo run-demo run-demo run-query run-response run-ui demo: install up ${MAKE} -j3 run-demo -.PHONY: run-demo run-query run-response run-ui run-demo: run-ui run-query run-response run-query: @@ -59,4 +68,4 @@ run-response: ${MAKE} -C examples/responding/ run-ui: - ${MAKE} -C ui/ + ${MAKE} demo -C ui/ diff --git a/docs/events.adoc b/docs/events.adoc index 231dafc4..d624ab31 100644 --- a/docs/events.adoc +++ b/docs/events.adoc @@ -1,13 +1,14 @@ = Query/Response Events -== Metrics or "Stats" +== Telemetry -Participating nodes in a Query/Response topology will broadcast events of -structured statistics for the UI and monitoring applications to consume. The +Participating nodes in a Query/Response network topology will broadcast events +with structured telemetry for the UI and monitoring applications to consume. The common data-shape encapsulates a way to communicate measurements, events and -node-identifying and describing meta-data. +node-identifying and describing meta data. -The entry structure and format is encoded as a JSON UTF-8 encoded string. +The entry structure and format is encoded as a JSON UTF-8 encoded string +payload. ```json { @@ -33,7 +34,7 @@ means that in practice there are operational metrics which may _come and go_ as well as logical statistics which may pertain to the topology or cluster of nodes. -=== Table of published statistics +=== Table of published telemetry [cols="2,2,1,1,3"] |=== diff --git a/ui-frontend/src/ws.js b/ui-frontend/src/ws.js index 45946cfe..16ff6ae1 100644 --- a/ui-frontend/src/ws.js +++ b/ui-frontend/src/ws.js @@ -40,8 +40,8 @@ const connectSocket = () => { }; sock.onmessage = (msg) => { - // console.log("RECEIVED EVENT", msg); - // console.log("RECEIVED EVENT PAYLOAD", JSON.stringify(JSON.parse(msg.data))); + //console.log("RECEIVED EVENT", msg); + console.log("RECEIVED EVENT PAYLOAD", JSON.stringify(JSON.parse(msg.data))); listeners.forEach((listener) => listener(msg)); }; } catch (err) { diff --git a/ui/Makefile b/ui/Makefile index cc0f16a2..69781a90 100644 --- a/ui/Makefile +++ b/ui/Makefile @@ -1,7 +1,7 @@ JAVA_HOME=$(shell unset JAVA_HOME; /usr/libexec/java_home -v 21) MVN := ../mvnw -.PHONY: run build test verify v clean tidy +.PHONY: run demo build test verify v clean tidy run: ${MVN} spring-boot:run @@ -10,14 +10,16 @@ test verify v: ${MVN} clean verify build: - ${MVN} clean prepare-package -DskipTests + ${MVN} clean prepare-package mkdir -p target/classes/static @$(MAKE) -C ../ui-frontend build cp -r ../ui-frontend/dist/* target/classes/static/ - ${MVN} package -DskipTests + ${MVN} package clean: rm -rf target/ tidy: ${MVN} formatter:format + +demo: build run \ No newline at end of file diff --git a/ui/pom.xml b/ui/pom.xml index 9a67f457..48b3c8d2 100644 --- a/ui/pom.xml +++ b/ui/pom.xml @@ -69,6 +69,22 @@ ${project.artifactId} + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.0.0 + + + org.apache.maven.plugins + maven-failsafe-plugin + 3.0.0 + + + + net.revelc.code.formatter @@ -81,7 +97,18 @@ - + + + + org.apache.maven.plugins + maven-failsafe-plugin + + + **/*IT.java + + + + org.springframework.boot spring-boot-maven-plugin diff --git a/ui/src/main/java/com/studiomediatech/QueryPublisher.java b/ui/src/main/java/com/studiomediatech/QueryPublisher.java index a03410b7..43720d36 100644 --- a/ui/src/main/java/com/studiomediatech/QueryPublisher.java +++ b/ui/src/main/java/com/studiomediatech/QueryPublisher.java @@ -1,13 +1,11 @@ package com.studiomediatech; import java.io.IOException; -import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.LinkedList; @@ -24,24 +22,23 @@ import org.springframework.context.event.EventListener; import org.springframework.util.StringUtils; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.studiomediatech.events.QueryRecordedEvent; import com.studiomediatech.queryresponse.QueryBuilder; -import com.studiomediatech.queryresponse.ui.QueryResponseUIApp; -import com.studiomediatech.queryresponse.ui.api.RestApiAdapter; -import com.studiomediatech.queryresponse.ui.api.WebSocketApiHandler; +import com.studiomediatech.queryresponse.ui.api.WebSocketApiHandlerPort; +import com.studiomediatech.queryresponse.ui.app.QueryResponseUIApp; +import com.studiomediatech.queryresponse.ui.app.adapter.RestApiAdapter; +import com.studiomediatech.queryresponse.ui.messaging.Stat; +import com.studiomediatech.queryresponse.ui.messaging.Stats; import com.studiomediatech.queryresponse.util.Loggable; public class QueryPublisher implements Loggable, RestApiAdapter { - private static final ObjectMapper MAPPER = new ObjectMapper(); - // This is a Fib! private static final int MAX_SIZE = 2584; private static final int SLIDING_WINDOW = 40; - private static final int DEFAULT_QUERY_TIMEOUT = 1500; + + private static final ObjectMapper MAPPER = new ObjectMapper(); static ToLongFunction statToLong = s -> ((Number) s.value()).longValue(); @@ -53,67 +50,16 @@ public class QueryPublisher implements Loggable, RestApiAdapter { private List tps = new LinkedList<>(); private final QueryBuilder queryBuilder; - private final WebSocketApiHandler handler; + private final WebSocketApiHandlerPort handler; private final Map nodes = new ConcurrentHashMap<>(); - public QueryPublisher(WebSocketApiHandler handler, QueryBuilder queryBuilder) { + public QueryPublisher(WebSocketApiHandlerPort handler, QueryBuilder queryBuilder) { this.handler = handler; this.queryBuilder = queryBuilder; } - @Override - public Map query(String q, int timeout, int limit) { - - List defaults = List.of("No responses"); - - final Collection responses; - - long start = System.nanoTime(); - - if (q.contains(" ")) { - responses = queryParsed(q, defaults); - } else { - responses = queryStrict(q, timeout, limit, defaults); - } - - return Map.of("response", responses, "duration", Duration.ofNanos(System.nanoTime() - start)); - } - - private Collection queryParsed(String q, List defaults) { - - var query = QueryRecordedEvent.valueOf(q, "none"); - - query.getQuery(); - query.getTimeout(); - - Optional maybe = query.getLimit(); - - if (maybe.isPresent()) { - - return queryBuilder.queryFor(query.getQuery(), Object.class).waitingFor(query.getTimeout()) - .takingAtMost(maybe.get()).orDefaults(defaults); - - } - - return queryBuilder.queryFor(query.getQuery(), Object.class).waitingFor(query.getTimeout()) - .orDefaults(defaults); - - } - - private Collection queryStrict(String q, int timeout, int limit, List defaults) { - long queryTimeout = timeout > 0 ? timeout : DEFAULT_QUERY_TIMEOUT; - - if (limit > 0) { - return queryBuilder.queryFor(q, Object.class).waitingFor(queryTimeout).takingAtMost(limit) - .orDefaults(defaults); - } - - return queryBuilder.queryFor(q, Object.class).waitingFor(queryTimeout).orDefaults(defaults); - - } - @EventListener void on(QueryRecordedEvent event) { @@ -142,7 +88,7 @@ void on(QueryRecordedEvent event) { void onQueryResponseStats(Message message) { try { - handle(MAPPER.readValue(message.getBody(), Stats.class).elements); + handle(MAPPER.readValue(message.getBody(), Stats.class).elements()); } catch (RuntimeException | IOException ex) { logger().error("Failed to consumed stats", ex); } @@ -158,17 +104,13 @@ protected void handle(Collection stats) { handleNodes(stats); } - @Override - public Map nodes() { - return Map.of("nodes", this.nodes, "timestamp", Instant.now(Clock.systemUTC())); - } - private void handleNodes(Collection stats) { Map> nodes = stats.stream().filter(stat -> StringUtils.hasText(stat.uuid())) .collect(Collectors.groupingBy(stat -> stat.uuid())); for (Entry> node : nodes.entrySet()) { + String uuid = node.getKey(); this.nodes.put(uuid, Instant.now()); @@ -332,18 +274,4 @@ private double calculateThroughput(String key, Collection source, List elements; - - @Override - public String toString() { - - return Optional.ofNullable(elements).orElse(Collections.emptyList()).stream().map(Object::toString) - .collect(Collectors.joining(", ")); - } - } - } diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/QueryResponseUIApp.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/QueryResponseUIApp.java deleted file mode 100644 index 71bd3631..00000000 --- a/ui/src/main/java/com/studiomediatech/queryresponse/ui/QueryResponseUIApp.java +++ /dev/null @@ -1,111 +0,0 @@ -package com.studiomediatech.queryresponse.ui; - -import org.springframework.amqp.core.AnonymousQueue; -import org.springframework.amqp.core.Binding; -import org.springframework.amqp.core.BindingBuilder; -import org.springframework.amqp.core.Queue; -import org.springframework.amqp.rabbit.connection.ConnectionNameStrategy; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Primary; -import org.springframework.core.annotation.Order; -import org.springframework.core.env.Environment; -import org.springframework.scheduling.TaskScheduler; -import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; -import org.springframework.web.socket.config.annotation.EnableWebSocket; -import org.springframework.web.socket.config.annotation.WebSocketConfigurer; -import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; - -import com.studiomediatech.QueryPublisher; -import com.studiomediatech.events.AsyncEventEmitter; -import com.studiomediatech.events.EventEmitter; -import com.studiomediatech.queryresponse.EnableQueryResponse; -import com.studiomediatech.queryresponse.QueryBuilder; -import com.studiomediatech.queryresponse.QueryResponseTopicExchange; -import com.studiomediatech.queryresponse.ui.api.WebSocketApiHandler; - -@SpringBootApplication -@EnableQueryResponse -@EnableScheduling -@EnableWebSocket -public class QueryResponseUIApp { - - public static final String QUERY_RESPONSE_STATS_QUEUE_BEAN = "queryResponseStatsQueue"; - - public static void main(String[] args) { - - SpringApplication.run(QueryResponseUIApp.class); - } - - @Order(10) - @Configuration - static class AppConfig { - - @Bean - ConnectionNameStrategy connectionNameStrategy(Environment env) { - - return connectionFactory -> env.getProperty("spring.application.name", "query-response-ui"); - } - - @Bean - @Primary - TaskScheduler taskScheduler() { - - return new ThreadPoolTaskScheduler(); - } - - @Bean - EventEmitter eventEmitter(TaskScheduler scheduler, ApplicationEventPublisher publisher) { - - return new AsyncEventEmitter(scheduler, publisher); - } - - @Bean - WebSocketApiHandler handler(EventEmitter emitter) { - - return new WebSocketApiHandler(emitter); - } - - @Bean - QueryPublisher querier(WebSocketApiHandler handler, QueryBuilder queryBuilder) { - - return new QueryPublisher(handler, queryBuilder); - } - - @Bean(QUERY_RESPONSE_STATS_QUEUE_BEAN) - Queue queryResponseStatsQueue() { - - return new AnonymousQueue(); - } - - @Bean - Binding queryResponseStatsQueueBinding(QueryResponseTopicExchange queryResponseTopicExchange) { - - return BindingBuilder.bind(queryResponseStatsQueue()).to(queryResponseTopicExchange) - .with("query-response/internal/stats"); - } - } - - @Order(100) - @Configuration - static class WebSocketConfig implements WebSocketConfigurer { - - private final WebSocketApiHandler webSocketHandler; - - public WebSocketConfig(WebSocketApiHandler webSocketHandler) { - - this.webSocketHandler = webSocketHandler; - } - - @Override - public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { - - // TODO: DO NOT ALLOW ORIGINS * !!! - registry.addHandler(webSocketHandler, "/ws").setAllowedOrigins("*"); - } - } -} diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/api/ApiConfig.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/api/ApiConfig.java new file mode 100644 index 00000000..9a2e9ec2 --- /dev/null +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/api/ApiConfig.java @@ -0,0 +1,16 @@ +package com.studiomediatech.queryresponse.ui.api; + +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.config.annotation.EnableWebSocket; + +/** + * Configuration for port components, providing access or information over published APIs, specifically over HTTP and + * WebSockets. + */ +@Configuration +@EnableWebSocket +@ComponentScan(basePackageClasses = ApiConfig.class) +public class ApiConfig { + // OK +} diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/api/RestApiController.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/api/RestApiController.java deleted file mode 100644 index 2864f654..00000000 --- a/ui/src/main/java/com/studiomediatech/queryresponse/ui/api/RestApiController.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.studiomediatech.queryresponse.ui.api; - -import java.util.Collections; -import java.util.Map; -import java.util.Optional; - -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; - -@RestController -public class RestApiController { - - private final RestApiAdapter adapter; - - public RestApiController(Optional maybe) { - this.adapter = maybe.orElse(RestApiAdapter.empty()); - } - - @GetMapping("/api") - public Map getApiRoot() { - return Map.of("version", "v1"); - } - - @GetMapping("/api/v1") - public Map none() { - return Collections.emptyMap(); - } - - @GetMapping(path = "/api/v1", params = "q") - public Map query(String q, // NOSONAR - @RequestParam(name = "timeout", required = false, defaultValue = "0") int timeout, - @RequestParam(name = "t", required = false, defaultValue = "0") int t, - @RequestParam(name = "limit", required = false, defaultValue = "0") int limit, - @RequestParam(name = "l", required = false, defaultValue = "0") int l) { - return adapter.query(q, Math.max(0, Math.max(timeout, t)), Math.max(0, Math.max(limit, l))); - } - - @GetMapping("/api/v1/nodes") - public Map nodes() { - return adapter.nodes(); - } -} diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/api/RestApiControllerPort.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/api/RestApiControllerPort.java new file mode 100644 index 00000000..47a63b0b --- /dev/null +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/api/RestApiControllerPort.java @@ -0,0 +1,92 @@ +package com.studiomediatech.queryresponse.ui.api; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; + +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import com.studiomediatech.queryresponse.ui.app.adapter.RestApiAdapter; + +@RestController +public class RestApiControllerPort { + + private final RestApiAdapter adapter; + + public RestApiControllerPort(Optional maybe) { + this.adapter = maybe.orElse(RestApiAdapter.empty()); + } + + @GetMapping("/api") + public Map showApi() { + return Response.from(Map.of("now", Instant.now())).withLinks("v0", "/api/v0", "v1", "/api/v1"); + } + + @GetMapping("/api/v0") + public Map v0() { + return Response.from(Map.of("version", "v0", "now", Instant.now())).withLinks("nodes", "/api/v0/nodes"); + } + + @GetMapping("/api/v0/nodes") + public Map nodes() { + return adapter.nodes(); + } + + @GetMapping(path = "/api/v1", params = "q") + public Map query(String q, // NOSONAR + @RequestParam(name = "timeout", required = false, defaultValue = "0") int timeout, + @RequestParam(name = "t", required = false, defaultValue = "0") int t, + @RequestParam(name = "limit", required = false, defaultValue = "0") int limit, + @RequestParam(name = "l", required = false, defaultValue = "0") int l) { + + int normalizedTimeout = Math.max(0, Math.max(timeout, t)); + int normalizedLimit = Math.max(0, Math.max(limit, l)); + + return adapter.query(q, normalizedTimeout, normalizedLimit); + } + + @GetMapping(path = "/api/v1", params = "!q") + public Map v1() { + return Response.from(Map.of("version", "v1", "now", Instant.now())).withLinks("query-response", + "/api/v1?q=query"); + } + + protected interface Response { + public static ResponseBuilder from(Map map) { + return new ResponseBuilder(map); + } + } + + static class ResponseBuilder { + + private final Map map; + + private ResponseBuilder(Map map) { + this.map = map; + } + + public Map withLinks(String... args) { + + var links = new ArrayList(); + + for (int i = 0; i < args.length; i = i + 2) { + var rel = args[i]; + var val = args[i + 1]; + links.add(new Link(rel, val)); + } + + var results = new LinkedHashMap<>(map); + results.put("_links", links); + return results; + } + } + + protected record Link(String rel, String href) { + // OK + } + +} diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/api/WebSocketApiHandler.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/api/WebSocketApiHandlerPort.java similarity index 92% rename from ui/src/main/java/com/studiomediatech/queryresponse/ui/api/WebSocketApiHandler.java rename to ui/src/main/java/com/studiomediatech/queryresponse/ui/api/WebSocketApiHandlerPort.java index 9dbf49bd..ff419c84 100644 --- a/ui/src/main/java/com/studiomediatech/queryresponse/ui/api/WebSocketApiHandler.java +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/api/WebSocketApiHandlerPort.java @@ -8,6 +8,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.springframework.stereotype.Component; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; @@ -17,22 +18,24 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.studiomediatech.Stat; -import com.studiomediatech.events.EventEmitter; import com.studiomediatech.events.QueryRecordedEvent; +import com.studiomediatech.queryresponse.ui.app.adapter.EventEmitterAdapter; +import com.studiomediatech.queryresponse.ui.app.adapter.WebSocketApiAdapter; +import com.studiomediatech.queryresponse.ui.messaging.Stat; import com.studiomediatech.queryresponse.util.Loggable; -public class WebSocketApiHandler extends TextWebSocketHandler implements Loggable { +@Component +public class WebSocketApiHandlerPort extends TextWebSocketHandler implements WebSocketApiAdapter, Loggable { private static final int SEND_TIME_LIMIT = 6 * 1000; private static final int SEND_BUFFER_SIZE_LIMIT = 512 * 1024; private final Map sessionsById = new ConcurrentHashMap<>(); - private final EventEmitter emitter; + private final EventEmitterAdapter emitter; private final ObjectMapper objectMapper; - public WebSocketApiHandler(EventEmitter emitter) { + public WebSocketApiHandlerPort(EventEmitterAdapter emitter) { this.emitter = emitter; diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/api/WebSocketConfig.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/api/WebSocketConfig.java new file mode 100644 index 00000000..b9a459a7 --- /dev/null +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/api/WebSocketConfig.java @@ -0,0 +1,23 @@ +package com.studiomediatech.queryresponse.ui.api; + +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; + +@Order(100) +@Configuration +class WebSocketConfig implements WebSocketConfigurer { + + private final WebSocketApiHandlerPort webSocketHandler; + + public WebSocketConfig(WebSocketApiHandlerPort webSocketHandler) { + this.webSocketHandler = webSocketHandler; + } + + @Override + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + // TODO: DO NOT ALLOW ORIGINS * !!! + registry.addHandler(webSocketHandler, "/ws").setAllowedOrigins("*"); + } +} \ No newline at end of file diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/QueryResponseUIApp.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/QueryResponseUIApp.java new file mode 100644 index 00000000..d1067c3e --- /dev/null +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/QueryResponseUIApp.java @@ -0,0 +1,28 @@ +package com.studiomediatech.queryresponse.ui.app; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.scheduling.annotation.EnableScheduling; + +import com.studiomediatech.queryresponse.ui.api.ApiConfig; +import com.studiomediatech.queryresponse.ui.infra.InfraConfig; +import com.studiomediatech.queryresponse.ui.messaging.MessagingConfig; + +@EnableScheduling +@SpringBootApplication +public class QueryResponseUIApp { + + public static final String QUERY_RESPONSE_STATS_QUEUE_BEAN = "queryResponseStatsBean"; + + public static void main(String[] args) { + SpringApplication.run(QueryResponseUIApp.class); + } + + @Configuration + @Import({ MessagingConfig.class, ApiConfig.class, InfraConfig.class }) + static class Setup { + // OK + } +} diff --git a/ui/src/main/java/com/studiomediatech/events/EventEmitter.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/adapter/EventEmitterAdapter.java similarity index 59% rename from ui/src/main/java/com/studiomediatech/events/EventEmitter.java rename to ui/src/main/java/com/studiomediatech/queryresponse/ui/app/adapter/EventEmitterAdapter.java index 2ff0299e..4e70d411 100644 --- a/ui/src/main/java/com/studiomediatech/events/EventEmitter.java +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/adapter/EventEmitterAdapter.java @@ -1,10 +1,9 @@ -package com.studiomediatech.events; +package com.studiomediatech.queryresponse.ui.app.adapter; /** * Declares the contract provided to clients, on how an event can be emitted. */ @FunctionalInterface -public interface EventEmitter { - +public interface EventEmitterAdapter { void emitEvent(Object event); } diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/adapter/QueryPublisherAdapter.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/adapter/QueryPublisherAdapter.java new file mode 100644 index 00000000..f6fe0d5b --- /dev/null +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/adapter/QueryPublisherAdapter.java @@ -0,0 +1,17 @@ +package com.studiomediatech.queryresponse.ui.app.adapter; + +import java.util.Collections; +import java.util.Map; + +public interface QueryPublisherAdapter { + + static QueryPublisherAdapter empty() { + return new QueryPublisherAdapter() { + // OK + }; + } + + default Map query(String q, int timeout, int limit) { + return Collections.emptyMap(); + } +} diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/api/RestApiAdapter.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/adapter/RestApiAdapter.java similarity index 87% rename from ui/src/main/java/com/studiomediatech/queryresponse/ui/api/RestApiAdapter.java rename to ui/src/main/java/com/studiomediatech/queryresponse/ui/app/adapter/RestApiAdapter.java index c0bfb5a4..8fc35e14 100644 --- a/ui/src/main/java/com/studiomediatech/queryresponse/ui/api/RestApiAdapter.java +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/adapter/RestApiAdapter.java @@ -1,4 +1,4 @@ -package com.studiomediatech.queryresponse.ui.api; +package com.studiomediatech.queryresponse.ui.app.adapter; import java.util.Collections; import java.util.Map; diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/adapter/TelemetryHandlerAdapter.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/adapter/TelemetryHandlerAdapter.java new file mode 100644 index 00000000..4ee13d6d --- /dev/null +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/adapter/TelemetryHandlerAdapter.java @@ -0,0 +1,20 @@ +package com.studiomediatech.queryresponse.ui.app.adapter; + +import com.studiomediatech.queryresponse.ui.messaging.Stats; +import com.studiomediatech.queryresponse.util.Loggable; + +/** + * Declares the capabilities of the incoming side for statistics to aggregate. + */ +public interface TelemetryHandlerAdapter extends Loggable { + + static TelemetryHandlerAdapter empty() { + return new TelemetryHandlerAdapter() { + // OK + }; + } + + default void handleConsumed(Stats stats) { + logger().warn("NOT YET HANDLING {}", stats); + } +} diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/adapter/WebSocketApiAdapter.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/adapter/WebSocketApiAdapter.java new file mode 100644 index 00000000..63c72091 --- /dev/null +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/adapter/WebSocketApiAdapter.java @@ -0,0 +1,19 @@ +package com.studiomediatech.queryresponse.ui.app.adapter; + +import java.util.Collection; + +import com.studiomediatech.queryresponse.ui.app.telemetry.Node; +import com.studiomediatech.queryresponse.util.Loggable; + +public interface WebSocketApiAdapter extends Loggable { + + static WebSocketApiAdapter empty() { + return new WebSocketApiAdapter() { + // OK + }; + } + + default void publishNodes(Collection nodes) { + logger().warn("NOT PUBLISHING NODES! {}", nodes); + } +} diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/telemetry/Node.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/telemetry/Node.java new file mode 100644 index 00000000..36abcd3b --- /dev/null +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/telemetry/Node.java @@ -0,0 +1,91 @@ +package com.studiomediatech.queryresponse.ui.app.telemetry; + +import java.util.Comparator; +import java.util.UUID; + +import org.springframework.util.Assert; + +import com.studiomediatech.queryresponse.util.Loggable; + +public class Node implements Loggable { + + public static final Comparator SORT = Comparator.comparing(Node::getName).thenComparing(Node::getUUID); + + protected final UUID uuid; + protected String name; + protected String pid; + protected String host; + protected String uptime; + + public Node(UUID uuid) { + this.uuid = uuid; + } + + public Node(Node other) { + this(other.uuid); + this.name = other.name; + this.pid = other.pid; + this.host = other.host; + this.uptime = other.uptime; + } + + public static Node from(UUID uuid) { + return new Node(uuid); + } + + public Node update(Node other) { + + Assert.isTrue(other.uuid.equals(this.uuid), "Must be same node."); + + logger().info("Updating {} with {}", this, other); + + this.host = other.host; + this.name = other.name; + this.uptime = other.uptime; + this.pid = other.pid; + + return this; + } + + @Override + public String toString() { + return "Node [uuid=%s]".formatted(uuid); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getPid() { + return pid; + } + + public void setPid(String pid) { + this.pid = pid; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public String getUptime() { + return uptime; + } + + public void setUptime(String uptime) { + this.uptime = uptime; + } + + public UUID getUUID() { + return uuid; + } + +} diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/telemetry/NodeRepository.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/telemetry/NodeRepository.java new file mode 100644 index 00000000..daff6285 --- /dev/null +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/telemetry/NodeRepository.java @@ -0,0 +1,14 @@ +package com.studiomediatech.queryresponse.ui.app.telemetry; + +import java.util.Collection; +import java.util.Optional; +import java.util.UUID; + +public interface NodeRepository { + + Optional findOneByUUID(UUID uuid); + + void save(Node node); + + Collection findAll(); +} diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/telemetry/TelemetryService.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/telemetry/TelemetryService.java new file mode 100644 index 00000000..e2fd1155 --- /dev/null +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/app/telemetry/TelemetryService.java @@ -0,0 +1,91 @@ +package com.studiomediatech.queryresponse.ui.app.telemetry; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import com.studiomediatech.queryresponse.ui.app.adapter.QueryPublisherAdapter; +import com.studiomediatech.queryresponse.ui.app.adapter.RestApiAdapter; +import com.studiomediatech.queryresponse.ui.app.adapter.TelemetryHandlerAdapter; +import com.studiomediatech.queryresponse.ui.app.adapter.WebSocketApiAdapter; +import com.studiomediatech.queryresponse.ui.messaging.Stat; +import com.studiomediatech.queryresponse.ui.messaging.Stats; +import com.studiomediatech.queryresponse.util.Loggable; + +public class TelemetryService implements Loggable, TelemetryHandlerAdapter, RestApiAdapter { + + private final WebSocketApiAdapter webSocketApiAdapter; + private final NodeRepository nodeRepository; + private final QueryPublisherAdapter queryPublisherAdapter; + + public TelemetryService(WebSocketApiAdapter webSocketApiAdapter, NodeRepository nodeRepository, + QueryPublisherAdapter queryPublisherAdapter) { + this.webSocketApiAdapter = webSocketApiAdapter; + this.nodeRepository = nodeRepository; + this.queryPublisherAdapter = queryPublisherAdapter; + } + + @Override + public Map query(String q, int timeout, int limit) { + return queryPublisherAdapter.query(q, timeout, limit); + } + + @Override + public Map nodes() { + return Map.of("elements", nodeRepository.findAll()); + } + + @Override + public void handleConsumed(Stats stats) { + + logger().info("Consumed stats with {} elements", stats.elements().size()); + + Collection nodes = parseToNodesCollection(stats); + updateNodes(nodes); + + // stats.elements().stream().filter(s -> s.timestamp() != + // null).map(Stat::toString) + // .forEach(str -> log().debug("STAT VALUE: {}", str)); + // + // stats.elements().stream().filter(s -> s.timestamp() == + // null).map(Stat::toString) + // .forEach(str -> log().debug("STAT INFO: {}", str)); + } + + private void updateNodes(Collection nodes) { + for (Node node : nodes) { + Optional maybe = nodeRepository.findOneByUUID(node.getUUID()); + if (maybe.isPresent()) { + Node updated = maybe.get().update(node); + nodeRepository.save(updated); + } else { + nodeRepository.save(node); + } + } + } + + protected Collection parseToNodesCollection(Stats stats) { + + Map nodes = new HashMap<>(); + + for (Stat stat : stats.elements()) { + + UUID uuid = UUID.fromString(stat.uuid()); + Node node = nodes.computeIfAbsent(uuid, key -> Node.from(key)); + + stat.whenKey("name", node::setName); + stat.whenKey("pid", node::setPid); + stat.whenKey("host", node::setHost); + stat.whenKey("uptime", node::setUptime); + } + + return nodes.values(); + } + + public void publishNodes() { + webSocketApiAdapter.publishNodes(nodeRepository.findAll()); + } + +} diff --git a/ui/src/main/java/com/studiomediatech/events/AsyncEventEmitter.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/infra/AsyncEventEmitter.java similarity index 80% rename from ui/src/main/java/com/studiomediatech/events/AsyncEventEmitter.java rename to ui/src/main/java/com/studiomediatech/queryresponse/ui/infra/AsyncEventEmitter.java index 35f9e300..91143fbd 100644 --- a/ui/src/main/java/com/studiomediatech/events/AsyncEventEmitter.java +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/infra/AsyncEventEmitter.java @@ -1,15 +1,17 @@ -package com.studiomediatech.events; +package com.studiomediatech.queryresponse.ui.infra; import org.springframework.context.ApplicationEventPublisher; import org.springframework.scheduling.TaskScheduler; +import com.studiomediatech.queryresponse.ui.app.adapter.EventEmitterAdapter; + import java.time.Instant; /** * An implementation that emits as a new scheduled task, ensuring that the calling thread is no longer blocked. */ -public class AsyncEventEmitter implements EventEmitter { +public class AsyncEventEmitter implements EventEmitterAdapter { // NOTE: Scheduled ASAP for any instant in the past. private static final Instant ASAP = Instant.EPOCH; diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/infra/InfraConfig.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/infra/InfraConfig.java new file mode 100644 index 00000000..35563b14 --- /dev/null +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/infra/InfraConfig.java @@ -0,0 +1,60 @@ +package com.studiomediatech.queryresponse.ui.infra; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; + +import com.studiomediatech.queryresponse.ui.app.adapter.EventEmitterAdapter; +import com.studiomediatech.queryresponse.ui.app.adapter.QueryPublisherAdapter; +import com.studiomediatech.queryresponse.ui.app.adapter.WebSocketApiAdapter; +import com.studiomediatech.queryresponse.ui.app.telemetry.NodeRepository; +import com.studiomediatech.queryresponse.ui.app.telemetry.TelemetryService; +import com.studiomediatech.queryresponse.ui.infra.repo.InMemoryNodeRepository; + +/** + * Configuration for components of the application, with adapters for the platform. + */ +@Configuration +@ComponentScan(basePackageClasses = InfraConfig.class) +public class InfraConfig { + + @Bean + @Primary + TaskScheduler taskScheduler() { + return new ThreadPoolTaskScheduler(); + } + + @Bean + EventEmitterAdapter eventEmitter(TaskScheduler scheduler, ApplicationEventPublisher publisher) { + return new AsyncEventEmitter(scheduler, publisher); + } + + @Bean + @ConditionalOnMissingBean + WebSocketApiAdapter emptyWebSocketApiAdapter() { + return WebSocketApiAdapter.empty(); + } + + @Bean + @ConditionalOnMissingBean + NodeRepository inMemoryNodeRepository() { + return new InMemoryNodeRepository(); + } + + @Bean + @ConditionalOnMissingBean + QueryPublisherAdapter emptyQueryPublisherAdapter() { + return QueryPublisherAdapter.empty(); + } + + @Bean + public TelemetryService telemetryService(WebSocketApiAdapter webSocketApiAdapter, NodeRepository nodeRepository, + QueryPublisherAdapter queryPublisherAdapter) { + return new TelemetryService(webSocketApiAdapter, nodeRepository, queryPublisherAdapter); + } +} diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/infra/adapter/TelemetryServiceAdapter.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/infra/adapter/TelemetryServiceAdapter.java new file mode 100644 index 00000000..50958189 --- /dev/null +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/infra/adapter/TelemetryServiceAdapter.java @@ -0,0 +1,24 @@ +package com.studiomediatech.queryresponse.ui.infra.adapter; + +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import com.studiomediatech.queryresponse.ui.app.telemetry.TelemetryService; +import com.studiomediatech.queryresponse.util.Loggable; + +@Component +public class TelemetryServiceAdapter implements Loggable { + + private final TelemetryService service; + + public TelemetryServiceAdapter(TelemetryService service) { + this.service = service; + } + + @Scheduled(fixedDelayString = "PT3S") + public void aFewSecondsHasPassed() { + logger().info("TIC TOC!"); + service.publishNodes(); + } + +} diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/infra/repo/InMemoryNodeRepository.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/infra/repo/InMemoryNodeRepository.java new file mode 100644 index 00000000..41ce606a --- /dev/null +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/infra/repo/InMemoryNodeRepository.java @@ -0,0 +1,31 @@ +package com.studiomediatech.queryresponse.ui.infra.repo; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import com.studiomediatech.queryresponse.ui.app.telemetry.Node; +import com.studiomediatech.queryresponse.ui.app.telemetry.NodeRepository; + +public class InMemoryNodeRepository implements NodeRepository { + + private final Map nodes = new ConcurrentHashMap<>(); + + @Override + public Optional findOneByUUID(UUID uuid) { + return Optional.ofNullable(nodes.get(uuid)); + } + + @Override + public void save(Node node) { + Node copy = new Node(node); + nodes.put(copy.getUUID(), copy); + } + + @Override + public Collection findAll() { + return nodes.values().stream().sorted(Node.SORT).toList(); + } +} diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/messaging/MessagingConfig.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/messaging/MessagingConfig.java new file mode 100644 index 00000000..e3c33832 --- /dev/null +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/messaging/MessagingConfig.java @@ -0,0 +1,51 @@ +package com.studiomediatech.queryresponse.ui.messaging; + +import org.springframework.amqp.core.AnonymousQueue; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbit.connection.ConnectionNameStrategy; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.env.Environment; + +import com.studiomediatech.queryresponse.EnableQueryResponse; +import com.studiomediatech.queryresponse.QueryResponseTopicExchange; + +/** + * Configuration for ports that enable messaging, specifically AMQP including Query/Response. + */ +@Configuration +@EnableQueryResponse +@ComponentScan(basePackageClasses = MessagingConfig.class) +public class MessagingConfig { + + static final String QUERY_RESPONSE_STATS_QUEUE_BEAN = "queryResponseStatsQueue"; + static final String QUERY_RESPONSE_QUERIES_QUEUE_BEAN = "queryResponseQueriesQueue"; + + protected static final String QUERY_RESPONSE_STATS_BINDING = "queryResponseStatsBinding"; + protected static final String QUERY_RESPONSE_TELEMETRY_BINDING = "queryResponseTelemetryBinding"; + + @Bean + ConnectionNameStrategy connectionNameStrategy(Environment env) { + return connectionFactory -> env.getProperty("spring.application.name", "query-response-ui"); + } + + @Bean(QUERY_RESPONSE_STATS_QUEUE_BEAN) + Queue queryResponseStatsQueue() { + return new AnonymousQueue(); + } + + @Bean(QUERY_RESPONSE_STATS_BINDING) + Binding queryResponseStatsQueueBinding(QueryResponseTopicExchange queryResponseTopicExchange) { + return BindingBuilder.bind(queryResponseStatsQueue()).to(queryResponseTopicExchange) + .with("query-response/internal/stats"); + } + + @Bean(QUERY_RESPONSE_TELEMETRY_BINDING) + Binding queryResponseTelemetryQueueBinding(QueryResponseTopicExchange queryResponseTopicExchange) { + return BindingBuilder.bind(queryResponseStatsQueue()).to(queryResponseTopicExchange) + .with("query-response/internal/telemetry"); + } +} diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/messaging/QueryPublisherPort.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/messaging/QueryPublisherPort.java new file mode 100644 index 00000000..f4a8d9a0 --- /dev/null +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/messaging/QueryPublisherPort.java @@ -0,0 +1,78 @@ +package com.studiomediatech.queryresponse.ui.messaging; + +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.springframework.stereotype.Component; + +import com.studiomediatech.events.QueryRecordedEvent; +import com.studiomediatech.queryresponse.QueryBuilder; +import com.studiomediatech.queryresponse.ui.app.adapter.QueryPublisherAdapter; +import com.studiomediatech.queryresponse.util.Loggable; + +@Component +public class QueryPublisherPort implements Loggable, QueryPublisherAdapter { + + private static final int DEFAULT_QUERY_TIMEOUT = 1500; + + private final QueryBuilder queryBuilder; + + public QueryPublisherPort(QueryBuilder queryBuilder) { + this.queryBuilder = queryBuilder; + } + + @Override + public Map query(String q, int timeout, int limit) { + + List defaults = List.of("No responses"); + + final Collection responses; + + long start = System.nanoTime(); + + if (q.contains(" ")) { + responses = queryParsed(q, defaults); + } else { + responses = queryStrict(q, timeout, limit, defaults); + } + + return Map.of("response", responses, "duration", Duration.ofNanos(System.nanoTime() - start)); + } + + private Collection queryParsed(String q, List defaults) { + + var query = QueryRecordedEvent.valueOf(q, "none"); + + query.getQuery(); + query.getTimeout(); + + Optional maybe = query.getLimit(); + + if (maybe.isPresent()) { + + return queryBuilder.queryFor(query.getQuery(), Object.class).waitingFor(query.getTimeout()) + .takingAtMost(maybe.get()).orDefaults(defaults); + + } + + return queryBuilder.queryFor(query.getQuery(), Object.class).waitingFor(query.getTimeout()) + .orDefaults(defaults); + + } + + private Collection queryStrict(String q, int timeout, int limit, List defaults) { + long queryTimeout = timeout > 0 ? timeout : DEFAULT_QUERY_TIMEOUT; + + if (limit > 0) { + return queryBuilder.queryFor(q, Object.class).waitingFor(queryTimeout).takingAtMost(limit) + .orDefaults(defaults); + } + + return queryBuilder.queryFor(q, Object.class).waitingFor(queryTimeout).orDefaults(defaults); + + } + +} diff --git a/ui/src/main/java/com/studiomediatech/Stat.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/messaging/Stat.java similarity index 71% rename from ui/src/main/java/com/studiomediatech/Stat.java rename to ui/src/main/java/com/studiomediatech/queryresponse/ui/messaging/Stat.java index d015e099..ae2505e5 100644 --- a/ui/src/main/java/com/studiomediatech/Stat.java +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/messaging/Stat.java @@ -1,4 +1,6 @@ -package com.studiomediatech; +package com.studiomediatech.queryresponse.ui.messaging; + +import java.util.function.Consumer; public record Stat(String key, Object value, Long timestamp, String uuid) { @@ -12,10 +14,10 @@ public record Stat(String key, Object value, Long timestamp, String uuid) { public static final String THROUGHPUT_RESPONSES = "throughput_responses"; public static final String AVG_THROUGHPUT = "avg_throughput"; - // @Override - // public String toString() { - // - // return key + "=" + value + (timestamp != null ? " " + timestamp : "") - // + (uuid != null ? " uuid=" + uuid : ""); - // } + @SuppressWarnings("unchecked") + public void whenKey(String key, Consumer target) { + if (this.key.equals(key)) { + target.accept((T) value); + } + } } \ No newline at end of file diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/messaging/Stats.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/messaging/Stats.java new file mode 100644 index 00000000..c80c46b4 --- /dev/null +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/messaging/Stats.java @@ -0,0 +1,6 @@ +package com.studiomediatech.queryresponse.ui.messaging; + +import java.util.Collection; + +public record Stats(Collection elements) { +} diff --git a/ui/src/main/java/com/studiomediatech/queryresponse/ui/messaging/TelemetryConsumerPort.java b/ui/src/main/java/com/studiomediatech/queryresponse/ui/messaging/TelemetryConsumerPort.java new file mode 100644 index 00000000..85bbe92c --- /dev/null +++ b/ui/src/main/java/com/studiomediatech/queryresponse/ui/messaging/TelemetryConsumerPort.java @@ -0,0 +1,48 @@ +package com.studiomediatech.queryresponse.ui.messaging; + +import java.io.IOException; +import java.util.Optional; + +import org.springframework.amqp.core.AcknowledgeMode; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.studiomediatech.queryresponse.ui.app.adapter.TelemetryHandlerAdapter; +import com.studiomediatech.queryresponse.util.Loggable; + +/** + * Consumes Query/Response telemetry messages from the internal topics, and directly delegates for handling via the + * abstract adapter. + */ +@Component +class TelemetryConsumerPort implements Loggable { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + /** + * {@link AcknowledgeMode#NONE} + */ + private static final String NONE = "NONE"; + private static final String CONSUMERS_MIN = "1"; + private static final String CONSUMERS_MAX = "7"; + + private final TelemetryHandlerAdapter adapter; + + public TelemetryConsumerPort(Optional maybe) { + this.adapter = maybe.orElse(TelemetryHandlerAdapter.empty()); + } + + @RabbitListener(// + queues = "#{@" + MessagingConfig.QUERY_RESPONSE_STATS_QUEUE_BEAN + "}", // + ackMode = NONE, concurrency = CONSUMERS_MIN + "-" + CONSUMERS_MAX) + void onQueryResponseStats(Message message) { + try { + Stats stats = MAPPER.readValue(message.getBody(), Stats.class); + adapter.handleConsumed(stats); + } catch (RuntimeException | IOException ex) { + logger().error("Failed to consumed telemetry message", ex); + } + } +} diff --git a/ui/src/main/resources/application.yaml b/ui/src/main/resources/application.yaml index ea22b08b..867e7a77 100644 --- a/ui/src/main/resources/application.yaml +++ b/ui/src/main/resources/application.yaml @@ -1,2 +1,2 @@ spring.application.name: query-response-ui -logging.level.com.studiomediatech.QueryPublisher: DEBUG \ No newline at end of file +logging.level.com.studiomediatech.queryresponse.ui: DEBUG diff --git a/ui/src/test/java/com/studiomediatech/queryresponse/ui/api/RestApiControllerTest.java b/ui/src/test/java/com/studiomediatech/queryresponse/ui/api/RestApiControllerPortTest.java similarity index 79% rename from ui/src/test/java/com/studiomediatech/queryresponse/ui/api/RestApiControllerTest.java rename to ui/src/test/java/com/studiomediatech/queryresponse/ui/api/RestApiControllerPortTest.java index 60f3cb7a..9013e2fc 100644 --- a/ui/src/test/java/com/studiomediatech/queryresponse/ui/api/RestApiControllerTest.java +++ b/ui/src/test/java/com/studiomediatech/queryresponse/ui/api/RestApiControllerPortTest.java @@ -13,23 +13,23 @@ import org.springframework.test.web.servlet.setup.MockMvcBuilders; @ExtendWith(MockitoExtension.class) -class RestApiControllerTest { +class RestApiControllerPortTest { MockMvc mockMvc; @BeforeEach void setup() { - mockMvc = MockMvcBuilders.standaloneSetup(new RestApiController(Optional.empty())).build(); + mockMvc = MockMvcBuilders.standaloneSetup(new RestApiControllerPort(Optional.empty())).build(); } @Test void ensureHandlesQueryRequest() throws Exception { - mockMvc.perform(get("/api/v1/").param("q", "hello")).andExpect(status().isOk()); + mockMvc.perform(get("/api/v1").param("q", "hello")).andExpect(status().isOk()); } @Test void ensureHandlesNodesRequest() throws Exception { - mockMvc.perform(get("/api/v1/nodes")).andExpect(status().isOk()); + mockMvc.perform(get("/api/v0/nodes")).andExpect(status().isOk()); } } diff --git a/ui/src/test/java/com/studiomediatech/queryresponse/ui/messaging/MessagingConfigIT.java b/ui/src/test/java/com/studiomediatech/queryresponse/ui/messaging/MessagingConfigIT.java new file mode 100644 index 00000000..4cb35d42 --- /dev/null +++ b/ui/src/test/java/com/studiomediatech/queryresponse/ui/messaging/MessagingConfigIT.java @@ -0,0 +1,43 @@ +package com.studiomediatech.queryresponse.ui.messaging; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.Queue; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.ApplicationContext; + +@SpringBootTest(classes = MessagingConfig.class) +class MessagingConfigIT { + + @Autowired + ApplicationContext ctx; + + @Test + void ensure_has_telemetry_queue_with_binding() { + + Queue queue = (Queue) ctx.getBean(MessagingConfig.QUERY_RESPONSE_STATS_QUEUE_BEAN); + assertThat(queue).isNotNull(); + + Binding binding = (Binding) ctx.getBean(MessagingConfig.QUERY_RESPONSE_STATS_BINDING); + assertThat(binding).isNotNull(); + assertThat(binding.getDestination()).isEqualTo(queue.getName()); + assertThat(binding.getRoutingKey()).isEqualTo("query-response/internal/stats"); + assertThat(binding.getExchange()).isEqualTo("query-response"); + } + + @Test + void ensure_has_telemetry_queue_with_modernised_binding() throws Exception { + + Queue queue = (Queue) ctx.getBean(MessagingConfig.QUERY_RESPONSE_STATS_QUEUE_BEAN); + assertThat(queue).isNotNull(); + + Binding binding = (Binding) ctx.getBean(MessagingConfig.QUERY_RESPONSE_TELEMETRY_BINDING); + assertThat(binding).isNotNull(); + assertThat(binding.getDestination()).isEqualTo(queue.getName()); + assertThat(binding.getRoutingKey()).isEqualTo("query-response/internal/telemetry"); + assertThat(binding.getExchange()).isEqualTo("query-response"); + } +}