diff --git a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java index cdf5bce14..30b664da1 100644 --- a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java +++ b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java @@ -39,8 +39,9 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi { public RetryingKafkaConnectClient(ClustersProperties.ConnectCluster config, @Nullable ClustersProperties.TruststoreConfig truststoreConfig, - DataSize maxBuffSize) { - super(new RetryingApiClient(config, truststoreConfig, maxBuffSize)); + DataSize maxBuffSize, + Duration responseTimeout) { + super(new RetryingApiClient(config, truststoreConfig, maxBuffSize, responseTimeout)); } private static Retry conflictCodeRetry() { @@ -318,14 +319,16 @@ private static class RetryingApiClient extends ApiClient { public RetryingApiClient(ClustersProperties.ConnectCluster config, ClustersProperties.TruststoreConfig truststoreConfig, - DataSize maxBuffSize) { - super(buildWebClient(maxBuffSize, config, truststoreConfig), null, null); + DataSize maxBuffSize, + Duration responseTimeout) { + super(buildWebClient(maxBuffSize, responseTimeout, config, truststoreConfig), null, null); setBasePath(config.getAddress()); setUsername(config.getUsername()); setPassword(config.getPassword()); } public static WebClient buildWebClient(DataSize maxBuffSize, + Duration responseTimeout, ClustersProperties.ConnectCluster config, ClustersProperties.TruststoreConfig truststoreConfig) { return new WebClientConfigurator() @@ -341,6 +344,7 @@ public static WebClient buildWebClient(DataSize maxBuffSize, config.getPassword() ) .configureBufferSize(maxBuffSize) + .configureResponseTimeout(responseTimeout) .build(); } } diff --git a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java index 6fbfce1d7..23be86369 100644 --- a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java +++ b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java @@ -77,6 +77,7 @@ public static class PollingProperties { Integer pollTimeoutMs; Integer maxPageSize; Integer defaultPageSize; + Integer responseTimeoutMs; } @Data diff --git a/api/src/main/java/io/kafbat/ui/config/WebclientProperties.java b/api/src/main/java/io/kafbat/ui/config/WebclientProperties.java index 8b147c898..b43c1f65a 100644 --- a/api/src/main/java/io/kafbat/ui/config/WebclientProperties.java +++ b/api/src/main/java/io/kafbat/ui/config/WebclientProperties.java @@ -13,6 +13,7 @@ public class WebclientProperties { String maxInMemoryBufferSize; + Integer responseTimeoutMs; @PostConstruct public void validate() { diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java index a74a33ce4..f8c528f90 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java @@ -16,6 +16,7 @@ import io.kafbat.ui.util.KafkaServicesValidation; import io.kafbat.ui.util.ReactiveFailover; import io.kafbat.ui.util.WebClientConfigurator; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -37,13 +38,18 @@ public class KafkaClusterFactory { private static final DataSize DEFAULT_WEBCLIENT_BUFFER = DataSize.parse("20MB"); + private static final Duration DEFAULT_RESPONSE_TIMEOUT = Duration.ofSeconds(20); private final DataSize webClientMaxBuffSize; + private final Duration responseTimeout; public KafkaClusterFactory(WebclientProperties webclientProperties) { this.webClientMaxBuffSize = Optional.ofNullable(webclientProperties.getMaxInMemoryBufferSize()) .map(DataSize::parse) .orElse(DEFAULT_WEBCLIENT_BUFFER); + this.responseTimeout = Optional.ofNullable(webclientProperties.getResponseTimeoutMs()) + .map(Duration::ofMillis) + .orElse(DEFAULT_RESPONSE_TIMEOUT); } public KafkaCluster create(ClustersProperties properties, @@ -147,7 +153,8 @@ private ReactiveFailover connectClient(ClustersProperties url -> new RetryingKafkaConnectClient( connectCluster.toBuilder().address(url).build(), cluster.getSsl(), - webClientMaxBuffSize + webClientMaxBuffSize, + responseTimeout ), ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER, "No alive connect instances available", diff --git a/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java b/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java index 1c289f54f..170530be1 100644 --- a/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java +++ b/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java @@ -10,6 +10,7 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import java.io.FileInputStream; import java.security.KeyStore; +import java.time.Duration; import java.util.function.Consumer; import javax.annotation.Nullable; import javax.net.ssl.KeyManagerFactory; @@ -144,6 +145,11 @@ public WebClientConfigurator configureCodecs(Consumer con return this; } + public WebClientConfigurator configureResponseTimeout(Duration responseTimeout) { + httpClient = httpClient.responseTimeout(responseTimeout); + return this; + } + public WebClient build() { return builder.clientConnector(new ReactorClientHttpConnector(httpClient)).build(); } diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index 24530bcb4..454d78c7c 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -4245,6 +4245,9 @@ components: maxInMemoryBufferSize: type: string description: "examples: 20, 12KB, 5MB" + responseTimeoutMs: + type: integer + description: "general response timeout in milliseconds for all http requests" kafka: type: object properties: