Skip to content

Commit ad7c9a1

Browse files
committed
BE: Implement a mechanism to skip SSL verification. Resolves #53
1 parent c06385d commit ad7c9a1

9 files changed

+85
-46
lines changed

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,22 +35,31 @@ public class ClustersProperties {
3535
public static class Cluster {
3636
String name;
3737
String bootstrapServers;
38+
39+
TruststoreConfig ssl;
40+
3841
String schemaRegistry;
3942
SchemaRegistryAuth schemaRegistryAuth;
4043
KeystoreConfig schemaRegistrySsl;
44+
4145
String ksqldbServer;
4246
KsqldbServerAuth ksqldbServerAuth;
4347
KeystoreConfig ksqldbServerSsl;
48+
4449
List<ConnectCluster> kafkaConnect;
45-
MetricsConfigData metrics;
46-
Map<String, Object> properties;
47-
boolean readOnly = false;
50+
4851
List<SerdeConfig> serde;
4952
String defaultKeySerde;
5053
String defaultValueSerde;
51-
List<Masking> masking;
54+
55+
MetricsConfigData metrics;
56+
Map<String, Object> properties;
57+
boolean readOnly = false;
58+
5259
Long pollingThrottleRate;
53-
TruststoreConfig ssl;
60+
61+
List<Masking> masking;
62+
5463
AuditProperties audit;
5564
}
5665

@@ -99,6 +108,16 @@ public static class SchemaRegistryAuth {
99108
public static class TruststoreConfig {
100109
String truststoreLocation;
101110
String truststorePassword;
111+
boolean verifySsl = true;
112+
}
113+
114+
@Data
115+
@NoArgsConstructor
116+
@AllArgsConstructor
117+
@ToString(exclude = {"keystorePassword"})
118+
public static class KeystoreConfig {
119+
String keystoreLocation;
120+
String keystorePassword;
102121
}
103122

104123
@Data
@@ -118,15 +137,6 @@ public static class KsqldbServerAuth {
118137
String password;
119138
}
120139

121-
@Data
122-
@NoArgsConstructor
123-
@AllArgsConstructor
124-
@ToString(exclude = {"keystorePassword"})
125-
public static class KeystoreConfig {
126-
String keystoreLocation;
127-
String keystorePassword;
128-
}
129-
130140
@Data
131141
public static class Masking {
132142
Type type;
@@ -182,6 +192,7 @@ private void flattenClusterProperties() {
182192
}
183193
}
184194

195+
@SuppressWarnings("unchecked")
185196
private Map<String, Object> flattenClusterProperties(@Nullable String prefix,
186197
@Nullable Map<String, Object> propertiesMap) {
187198
Map<String, Object> flattened = new HashMap<>();

api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import io.kafbat.ui.config.ClustersProperties;
44
import io.kafbat.ui.model.KafkaCluster;
5-
import io.kafbat.ui.util.SslPropertiesUtil;
5+
import io.kafbat.ui.util.KafkaClientSslPropertiesUtil;
66
import java.io.Closeable;
77
import java.time.Instant;
88
import java.util.Map;
@@ -42,7 +42,7 @@ public Mono<ReactiveAdminClient> get(KafkaCluster cluster) {
4242
private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
4343
return Mono.fromSupplier(() -> {
4444
Properties properties = new Properties();
45-
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
45+
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
4646
properties.putAll(cluster.getProperties());
4747
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
4848
properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);

api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import io.kafbat.ui.model.SortOrderDTO;
1111
import io.kafbat.ui.service.rbac.AccessControlService;
1212
import io.kafbat.ui.util.ApplicationMetrics;
13-
import io.kafbat.ui.util.SslPropertiesUtil;
13+
import io.kafbat.ui.util.KafkaClientSslPropertiesUtil;
1414
import java.util.ArrayList;
1515
import java.util.Collection;
1616
import java.util.Comparator;
@@ -254,7 +254,7 @@ public EnhancedConsumer createConsumer(KafkaCluster cluster) {
254254
public EnhancedConsumer createConsumer(KafkaCluster cluster,
255255
Map<String, Object> properties) {
256256
Properties props = new Properties();
257-
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
257+
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
258258
props.putAll(cluster.getProperties());
259259
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafbat-ui-consumer-" + System.currentTimeMillis());
260260
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());

api/src/main/java/io/kafbat/ui/service/MessagesService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.kafbat.ui.model.TopicMessageEventDTO;
2424
import io.kafbat.ui.serdes.ConsumerRecordDeserializer;
2525
import io.kafbat.ui.serdes.ProducerRecordCreator;
26-
import io.kafbat.ui.util.SslPropertiesUtil;
26+
import io.kafbat.ui.util.KafkaClientSslPropertiesUtil;
2727
import java.time.Instant;
2828
import java.time.OffsetDateTime;
2929
import java.time.ZoneOffset;
@@ -199,7 +199,7 @@ private Mono<RecordMetadata> sendMessageImpl(KafkaCluster cluster,
199199
public static KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster,
200200
Map<String, Object> additionalProps) {
201201
Properties properties = new Properties();
202-
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
202+
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
203203
properties.putAll(cluster.getProperties());
204204
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
205205
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

api/src/main/java/io/kafbat/ui/service/ksql/KsqlApiClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,8 @@ private Flux<KsqlResponseTable> executeSelect(String ksql, Map<String, String> s
130130
* Some version of ksqldb (?..0.24) can cut off json streaming without respect proper array ending like <p/>
131131
* <code>[{"header":{"queryId":"...","schema":"..."}}, ]</code>
132132
* which will cause json parsing error and will be propagated to UI.
133-
* This is a know issue(https://github.com/confluentinc/ksql/issues/8746), but we don't know when it will be fixed.
134-
* To workaround this we need to check DecodingException err msg.
133+
* This is a known issue(<a href="https://github.com/confluentinc/ksql/issues/8746">...</a>), but we don't know when it will be fixed.
134+
* To work around this we need to check DecodingException err msg.
135135
*/
136136
private boolean isUnexpectedJsonArrayEndCharException(Throwable th) {
137137
return th instanceof DecodingException
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.kafbat.ui.util;
2+
3+
import io.kafbat.ui.config.ClustersProperties;
4+
import java.util.Properties;
5+
import javax.annotation.Nullable;
6+
import org.apache.kafka.common.config.SslConfigs;
7+
8+
public final class KafkaClientSslPropertiesUtil {
9+
10+
private KafkaClientSslPropertiesUtil() {
11+
}
12+
13+
public static void addKafkaSslProperties(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
14+
Properties sink) {
15+
if (truststoreConfig == null) {
16+
return;
17+
}
18+
19+
if (!truststoreConfig.isVerifySsl()) {
20+
sink.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
21+
}
22+
23+
if (truststoreConfig.getTruststoreLocation() == null) {
24+
return;
25+
}
26+
27+
sink.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.getTruststoreLocation());
28+
29+
if (truststoreConfig.getTruststorePassword() != null) {
30+
sink.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.getTruststorePassword());
31+
}
32+
33+
}
34+
35+
}

api/src/main/java/io/kafbat/ui/util/KafkaServicesValidation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public static Mono<ApplicationPropertyValidationDTO> validateClusterConnection(S
6565
@Nullable
6666
TruststoreConfig ssl) {
6767
Properties properties = new Properties();
68-
SslPropertiesUtil.addKafkaSslProperties(ssl, properties);
68+
KafkaClientSslPropertiesUtil.addKafkaSslProperties(ssl, properties);
6969
properties.putAll(clusterProps);
7070
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
7171
// editing properties to make validation faster

api/src/main/java/io/kafbat/ui/util/SslPropertiesUtil.java

Lines changed: 0 additions & 23 deletions
This file was deleted.

api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import io.kafbat.ui.exception.ValidationException;
88
import io.netty.handler.ssl.SslContext;
99
import io.netty.handler.ssl.SslContextBuilder;
10+
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
1011
import java.io.FileInputStream;
1112
import java.security.KeyStore;
1213
import java.util.function.Consumer;
@@ -45,6 +46,10 @@ private static ObjectMapper defaultOM() {
4546

4647
public WebClientConfigurator configureSsl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
4748
@Nullable ClustersProperties.KeystoreConfig keystoreConfig) {
49+
if (truststoreConfig != null && !truststoreConfig.isVerifySsl()) {
50+
return configureNoSsl();
51+
}
52+
4853
return configureSsl(
4954
keystoreConfig != null ? keystoreConfig.getKeystoreLocation() : null,
5055
keystoreConfig != null ? keystoreConfig.getKeystorePassword() : null,
@@ -97,6 +102,17 @@ private WebClientConfigurator configureSsl(
97102
return this;
98103
}
99104

105+
@SneakyThrows
106+
public WebClientConfigurator configureNoSsl() {
107+
var contextBuilder = SslContextBuilder.forClient();
108+
contextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
109+
110+
SslContext context = contextBuilder.build();
111+
112+
httpClient = httpClient.secure(t -> t.sslContext(context));
113+
return this;
114+
}
115+
100116
public WebClientConfigurator configureBasicAuth(@Nullable String username, @Nullable String password) {
101117
if (username != null && password != null) {
102118
builder.defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth(username, password));

0 commit comments

Comments
 (0)