Skip to content

Commit 2f32ab9

Browse files
authored
refactor(backfill): Use of PBJ gRPC Client Helidon instead of Vanilla Helidon. (#1499)
Signed-off-by: Alfredo Gutierrez Grajeda <alfredo@hashgraph.com>
1 parent 69a735f commit 2f32ab9

File tree

12 files changed

+372
-402
lines changed

12 files changed

+372
-402
lines changed

block-node/backfill/build.gradle.kts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
plugins {
33
id("org.hiero.gradle.module.library")
4-
id("com.hedera.pbj.pbj-compiler") version "0.11.13"
4+
id("com.hedera.pbj.pbj-compiler") version "0.11.14"
55
}
66

77
description = "Hiero Block Node Backfill Plugin"
@@ -25,6 +25,8 @@ mainModuleInfo {
2525
runtimeOnly("org.apache.logging.log4j.slf4j2.impl")
2626
runtimeOnly("io.helidon.logging.jul")
2727
runtimeOnly("com.hedera.pbj.grpc.helidon.config")
28+
runtimeOnly("com.hedera.pbj.grpc.client.helidon")
29+
runtimeOnly("com.hedera.pbj.grpc.helidon")
2830
}
2931

3032
testModuleInfo {

block-node/backfill/src/main/java/module-info.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,11 @@
2020
requires transitive com.swirlds.metrics.api;
2121
requires transitive org.hiero.block.node.spi;
2222
requires transitive org.hiero.block.protobuf.pbj;
23-
requires transitive io.grpc.stub;
24-
requires transitive io.grpc;
25-
requires transitive io.helidon.grpc.core;
26-
requires transitive io.helidon.webclient.grpc;
23+
requires com.hedera.pbj.grpc.client.helidon;
2724
requires org.hiero.block.node.base;
2825
requires io.helidon.common.tls;
2926
requires io.helidon.webclient.api;
27+
requires io.helidon.webclient.grpc;
3028
requires org.antlr.antlr4.runtime;
3129
requires static transitive com.github.spotbugs.annotations;
3230

block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillConfiguration.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
* @param initialDelay Initial delay in seconds before starting the backfill process, to give time for the system to stabilize
2222
* @param perBlockProcessingTimeout Timeout in milliseconds for processing each block, to avoid blocking the backfill
2323
* process indefinitely in case something unexpected happens, this would allow for self-recovery
24+
* @param grpcOverallTimeout single timeout configuration for gRPC Client construction, connectTimeout, readTimeout and pollWaitTime
25+
* @param enableTLS if enabled will assume block-node client supports tls connection.
2426
*/
2527
@ConfigData("backfill")
2628
public record BackfillConfiguration(
@@ -33,4 +35,6 @@ public record BackfillConfiguration(
3335
@Loggable @ConfigProperty(defaultValue = "25") @Min(1) @Max(10_000) int fetchBatchSize,
3436
@Loggable @ConfigProperty(defaultValue = "1000") @Min(100) int delayBetweenBatches,
3537
@Loggable @ConfigProperty(defaultValue = "15000") @Min(5) int initialDelay,
36-
@Loggable @ConfigProperty(defaultValue = "1000") @Min(500) int perBlockProcessingTimeout) {}
38+
@Loggable @ConfigProperty(defaultValue = "1000") @Min(500) int perBlockProcessingTimeout,
39+
@Loggable @ConfigProperty(defaultValue = "30000") @Min(10000) int grpcOverallTimeout,
40+
@Loggable @ConfigProperty(defaultValue = "false") boolean enableTLS) {}

block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import java.util.List;
1616
import java.util.concurrent.ConcurrentHashMap;
1717
import java.util.concurrent.TimeUnit;
18+
import org.hiero.block.api.ServerStatusRequest;
19+
import org.hiero.block.api.ServerStatusResponse;
1820
import org.hiero.block.internal.BlockUnparsed;
1921
import org.hiero.block.node.backfill.client.BackfillSource;
2022
import org.hiero.block.node.backfill.client.BackfillSourceConfig;
@@ -50,7 +52,10 @@ public class BackfillGrpcClient {
5052
* This is used for exponential backoff in case of failures.
5153
*/
5254
private final int initialRetryDelayMs;
53-
55+
/** Connection timeout in milliseconds for gRPC calls to block nodes. */
56+
private final int connectionTimeoutSeconds;
57+
/** Enable TLS for secure connections to block nodes. */
58+
private final boolean enableTls;
5459
/** Current status of the Block Node Clients */
5560
private ConcurrentHashMap<BackfillSourceConfig, Status> nodeStatusMap = new ConcurrentHashMap<>();
5661
/**
@@ -65,12 +70,19 @@ public class BackfillGrpcClient {
6570
* @param blockNodePreferenceFilePath the path to the block node preference file
6671
*/
6772
public BackfillGrpcClient(
68-
Path blockNodePreferenceFilePath, int maxRetries, Counter backfillRetriesCounter, int retryInitialDelayMs)
73+
Path blockNodePreferenceFilePath,
74+
int maxRetries,
75+
Counter backfillRetriesCounter,
76+
int retryInitialDelayMs,
77+
int connectionTimeoutSeconds,
78+
boolean enableTls)
6979
throws IOException, ParseException {
7080
this.blockNodeSource = BackfillSource.JSON.parse(Bytes.wrap(Files.readAllBytes(blockNodePreferenceFilePath)));
7181
this.maxRetries = maxRetries;
7282
this.initialRetryDelayMs = retryInitialDelayMs;
7383
this.backfillRetries = backfillRetriesCounter;
84+
this.connectionTimeoutSeconds = connectionTimeoutSeconds;
85+
this.enableTls = enableTls;
7486

7587
for (BackfillSourceConfig node : blockNodeSource.nodes()) {
7688
LOGGER.log(INFO, "Address: {0}, Port: {1}, Priority: {2}", node.address(), node.port(), node.priority());
@@ -84,10 +96,11 @@ public BackfillGrpcClient(
8496
* @return a LongRange representing the intersection of the block range and the available blocks in the node.
8597
*/
8698
private LongRange getAvailableRangeInNode(BlockNodeClient node, LongRange blockRange) {
87-
long firstAvailableBlock =
88-
node.getBlockNodeServerStatusClient().getServerStatus().firstAvailableBlock();
89-
long lastAvailableBlock =
90-
node.getBlockNodeServerStatusClient().getServerStatus().lastAvailableBlock();
99+
100+
final ServerStatusResponse nodeStatus =
101+
node.getBlockNodeServiceClient().serverStatus(new ServerStatusRequest());
102+
long firstAvailableBlock = nodeStatus.firstAvailableBlock();
103+
long lastAvailableBlock = nodeStatus.lastAvailableBlock();
91104

92105
long start = blockRange.start();
93106
long end = blockRange.end();
@@ -148,7 +161,7 @@ public List<BlockUnparsed> fetchBlocks(LongRange blockRange) {
148161

149162
// Try to fetch blocks from this node
150163
return currentNodeClient
151-
.getBlockNodeSubscribeClient()
164+
.getBlockstreamSubscribeUnparsedClient()
152165
.getBatchOfBlocks(actualRange.start(), actualRange.end());
153166
} catch (Exception e) {
154167
if (attempt == maxRetries) {
@@ -195,7 +208,8 @@ public List<BlockUnparsed> fetchBlocks(LongRange blockRange) {
195208
* @return a BlockNodeClient for the specified node
196209
*/
197210
private BlockNodeClient getNodeClient(BackfillSourceConfig node) {
198-
return nodeClientMap.computeIfAbsent(node, BlockNodeClient::new);
211+
return nodeClientMap.computeIfAbsent(
212+
node, BlockNodeClient -> new BlockNodeClient(node, connectionTimeoutSeconds, enableTls));
199213
}
200214

201215
/**

block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillPlugin.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
package org.hiero.block.node.backfill;
33

4+
import static java.lang.System.Logger.Level.DEBUG;
45
import static java.lang.System.Logger.Level.INFO;
56
import static java.lang.System.Logger.Level.TRACE;
67

@@ -180,7 +181,9 @@ public void init(BlockNodeContext context, ServiceBuilder serviceBuilder) {
180181
blockNodeSourcesPath,
181182
backfillConfiguration.maxRetries(),
182183
this.backfillRetries,
183-
backfillConfiguration.initialRetryDelay());
184+
backfillConfiguration.initialRetryDelay(),
185+
backfillConfiguration.grpcOverallTimeout(),
186+
backfillConfiguration.enableTLS());
184187
LOGGER.log(TRACE, "Initialized gRPC client with sources path: {0}", blockNodeSourcesPath);
185188
} catch (Exception e) {
186189
LOGGER.log(INFO, "Failed to initialize gRPC client: {0}", e.getMessage());
@@ -206,7 +209,8 @@ public void start() {
206209
TRACE,
207210
"Scheduling backfill process to start in {0} milliseconds",
208211
backfillConfiguration.initialDelay());
209-
scheduler = Executors.newSingleThreadScheduledExecutor();
212+
scheduler = Executors.newScheduledThreadPool(
213+
2); // Two threads: one for autonomous backfill, one for on-demand backfill
210214
scheduler.scheduleAtFixedRate(
211215
this::detectGaps,
212216
backfillConfiguration.initialDelay(),
@@ -331,6 +335,11 @@ private void backfillGap(LongRange gap, BackfillType backfillType) throws Interr
331335
// to avoid deadlocks, since blocks that fail verification are not persisted
332336
getLatch(backfillType).set(new CountDownLatch(batchOfBlocks.size()));
333337

338+
if (batchOfBlocks.isEmpty()) {
339+
LOGGER.log(DEBUG, "No blocks fetched for gap {0}, skipping", chunk);
340+
continue; // Skip empty batches
341+
}
342+
334343
// Process each fetched block
335344
for (BlockUnparsed blockUnparsed : batchOfBlocks) {
336345
long blockNumber = extractBlockNumber(blockUnparsed);
@@ -347,21 +356,25 @@ private void backfillGap(LongRange gap, BackfillType backfillType) throws Interr
347356
boolean backfillFinished = getLatch(backfillType).get().await(timeout, TimeUnit.MILLISECONDS);
348357

349358
// Check if the backfill finished successfully
350-
if (!backfillFinished) {
359+
if (backfillFinished) {
360+
// just log a victory message for each chunk
361+
LOGGER.log(TRACE, "Successfully backfilled gap {0}", chunk);
362+
} else {
351363
LOGGER.log(TRACE, "Backfill for gap {0} did not finish in time", chunk);
352364
backfillFetchErrors.increment();
353365
// If it didn't finish, we will retry it later but move on to next chunk
354-
} else {
355-
// just log a victory message for each chunk
356-
LOGGER.log(TRACE, "Successfully backfilled gap {0}", chunk);
357366
}
358367

359368
// Cooldown between batches
360369
Thread.sleep(backfillConfiguration.delayBetweenBatches());
361370
}
362371

363372
LOGGER.log(
364-
TRACE, "Completed backfilling of type {0} gap from {1} to {2}", backfillType, gap.start(), gap.end());
373+
TRACE,
374+
"Completed backfilling task (completion only) of type {0} gap from {1} to {2} ",
375+
backfillType,
376+
gap.start(),
377+
gap.end());
365378
if (backfillType.equals(BackfillType.ON_DEMAND)) {
366379
onDemandBackfillStartBlock.set(-1); // Reset on-demand start block after backfill
367380
}
Lines changed: 55 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,74 @@
11
// SPDX-License-Identifier: Apache-2.0
22
package org.hiero.block.node.backfill.client;
33

4+
import com.hedera.pbj.grpc.client.helidon.PbjGrpcClient;
5+
import com.hedera.pbj.grpc.client.helidon.PbjGrpcClientConfig;
6+
import com.hedera.pbj.runtime.grpc.ServiceInterface;
47
import io.helidon.common.tls.Tls;
5-
import io.helidon.webclient.grpc.GrpcClient;
8+
import io.helidon.webclient.api.WebClient;
69
import io.helidon.webclient.grpc.GrpcClientProtocolConfig;
710
import java.time.Duration;
11+
import java.util.List;
12+
import java.util.Optional;
13+
import org.hiero.block.api.BlockNodeServiceInterface;
814

915
public class BlockNodeClient {
10-
private final BlockNodeServerStatusClient blockNodeServerStatusClient;
11-
private final BlockNodeSubscribeClient blockNodeSubscribeClient;
16+
// Options definition for all gRPC services in the block node client
17+
private static record Options(Optional<String> authority, String contentType)
18+
implements ServiceInterface.RequestOptions {}
1219

13-
public BlockNodeClient(BackfillSourceConfig blockNodeConfig) {
20+
private static final BlockNodeClient.Options OPTIONS =
21+
new BlockNodeClient.Options(Optional.empty(), ServiceInterface.RequestOptions.APPLICATION_GRPC);
1422

15-
// Initialize gRPC client with the block node configuration
16-
GrpcClient grpcClient = GrpcClient.builder()
17-
.tls(Tls.builder().enabled(false).build())
23+
// block node services
24+
private final BlockStreamSubscribeUnparsedClient blockStreamSubscribeUnparsedClient;
25+
private final BlockNodeServiceInterface.BlockNodeServiceClient blockNodeServiceClient;
26+
27+
/**
28+
* Constructs a BlockNodeClient using the provided configuration.
29+
*
30+
* @param blockNodeConfig the configuration for the block node, including address and port
31+
*/
32+
public BlockNodeClient(BackfillSourceConfig blockNodeConfig, int timeoutMs, boolean enableTls) {
33+
34+
final Duration timeoutDuration = Duration.ofMillis(timeoutMs);
35+
36+
final Tls tls = Tls.builder().enabled(enableTls).build();
37+
final PbjGrpcClientConfig grpcConfig =
38+
new PbjGrpcClientConfig(timeoutDuration, tls, Optional.of(""), "application/grpc");
39+
40+
final WebClient webClient = WebClient.builder()
1841
.baseUri("http://" + blockNodeConfig.address() + ":" + blockNodeConfig.port())
19-
.protocolConfig(GrpcClientProtocolConfig.builder()
42+
.tls(tls)
43+
.protocolConfigs(List.of(GrpcClientProtocolConfig.builder()
2044
.abortPollTimeExpired(false)
21-
.pollWaitTime(Duration.ofSeconds(30))
22-
.build())
23-
.keepAlive(true)
45+
.pollWaitTime(timeoutDuration)
46+
.build()))
47+
.connectTimeout(timeoutDuration)
2448
.build();
25-
// Initialize clients for server status and block subscription
26-
this.blockNodeServerStatusClient = new BlockNodeServerStatusClient(grpcClient);
27-
this.blockNodeSubscribeClient = new BlockNodeSubscribeClient(grpcClient);
49+
50+
PbjGrpcClient pbjGrpcClient = new PbjGrpcClient(webClient, grpcConfig);
51+
52+
// we reuse the host connection with many services.
53+
blockNodeServiceClient = new BlockNodeServiceInterface.BlockNodeServiceClient(pbjGrpcClient, OPTIONS);
54+
this.blockStreamSubscribeUnparsedClient = new BlockStreamSubscribeUnparsedClient(pbjGrpcClient);
2855
}
2956

30-
public BlockNodeServerStatusClient getBlockNodeServerStatusClient() {
31-
return blockNodeServerStatusClient;
57+
/**
58+
* Returns the BlockStreamSubscribeUnparsedClient for subscribing to block streams.
59+
*
60+
* @return the BlockStreamSubscribeUnparsedClient
61+
*/
62+
public BlockStreamSubscribeUnparsedClient getBlockstreamSubscribeUnparsedClient() {
63+
return blockStreamSubscribeUnparsedClient;
3264
}
3365

34-
public BlockNodeSubscribeClient getBlockNodeSubscribeClient() {
35-
return blockNodeSubscribeClient;
66+
/**
67+
* Returns the BlockNodeServiceClient for accessing block node services.
68+
*
69+
* @return the BlockNodeServiceClient
70+
*/
71+
public BlockNodeServiceInterface.BlockNodeServiceClient getBlockNodeServiceClient() {
72+
return blockNodeServiceClient;
3673
}
3774
}

0 commit comments

Comments
 (0)