Skip to content

refactor(backfill): Use of PBJ gRPC Client Helidon instead of Vanilla Helidon. #1499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion block-node/backfill/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
plugins {
id("org.hiero.gradle.module.library")
id("com.hedera.pbj.pbj-compiler") version "0.11.13"
id("com.hedera.pbj.pbj-compiler") version "0.11.14"
}

description = "Hiero Block Node Backfill Plugin"
Expand All @@ -25,6 +25,8 @@ mainModuleInfo {
runtimeOnly("org.apache.logging.log4j.slf4j2.impl")
runtimeOnly("io.helidon.logging.jul")
runtimeOnly("com.hedera.pbj.grpc.helidon.config")
runtimeOnly("com.hedera.pbj.grpc.client.helidon")
runtimeOnly("com.hedera.pbj.grpc.helidon")
}

testModuleInfo {
Expand Down
6 changes: 2 additions & 4 deletions block-node/backfill/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@
requires transitive com.swirlds.metrics.api;
requires transitive org.hiero.block.node.spi;
requires transitive org.hiero.block.protobuf.pbj;
requires transitive io.grpc.stub;
requires transitive io.grpc;
requires transitive io.helidon.grpc.core;
requires transitive io.helidon.webclient.grpc;
requires com.hedera.pbj.grpc.client.helidon;
requires org.hiero.block.node.base;
requires io.helidon.common.tls;
requires io.helidon.webclient.api;
requires io.helidon.webclient.grpc;
requires org.antlr.antlr4.runtime;
requires static transitive com.github.spotbugs.annotations;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
* @param initialDelay Initial delay in seconds before starting the backfill process, to give time for the system to stabilize
* @param perBlockProcessingTimeout Timeout in milliseconds for processing each block, to avoid blocking the backfill
* process indefinitely in case something unexpected happens, this would allow for self-recovery
* @param grpcOverallTimeout single timeout, in milliseconds, used when constructing the gRPC client; the same value is
* applied as the connectTimeout, readTimeout and pollWaitTime
* @param enableTLS if true, TLS is enabled on the gRPC client used to connect to block nodes
*/
@ConfigData("backfill")
public record BackfillConfiguration(
Expand All @@ -33,4 +35,6 @@ public record BackfillConfiguration(
@Loggable @ConfigProperty(defaultValue = "25") @Min(1) @Max(10_000) int fetchBatchSize,
@Loggable @ConfigProperty(defaultValue = "1000") @Min(100) int delayBetweenBatches,
@Loggable @ConfigProperty(defaultValue = "15000") @Min(5) int initialDelay,
@Loggable @ConfigProperty(defaultValue = "1000") @Min(500) int perBlockProcessingTimeout) {}
@Loggable @ConfigProperty(defaultValue = "1000") @Min(500) int perBlockProcessingTimeout,
@Loggable @ConfigProperty(defaultValue = "30000") @Min(10000) int grpcOverallTimeout,
@Loggable @ConfigProperty(defaultValue = "false") boolean enableTLS) {}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.hiero.block.api.ServerStatusRequest;
import org.hiero.block.api.ServerStatusResponse;
import org.hiero.block.internal.BlockUnparsed;
import org.hiero.block.node.backfill.client.BackfillSource;
import org.hiero.block.node.backfill.client.BackfillSourceConfig;
Expand Down Expand Up @@ -50,7 +52,10 @@ public class BackfillGrpcClient {
* This is used for exponential backoff in case of failures.
*/
private final int initialRetryDelayMs;

/** Connection timeout for gRPC calls to block nodes. NOTE: despite the field name, the value is in milliseconds. */
private final int connectionTimeoutSeconds;
/** Enable TLS for secure connections to block nodes. */
private final boolean enableTls;
/** Current status of the Block Node Clients */
private ConcurrentHashMap<BackfillSourceConfig, Status> nodeStatusMap = new ConcurrentHashMap<>();
/**
Expand All @@ -65,12 +70,19 @@ public class BackfillGrpcClient {
* @param blockNodePreferenceFilePath the path to the block node preference file
*/
public BackfillGrpcClient(
Path blockNodePreferenceFilePath, int maxRetries, Counter backfillRetriesCounter, int retryInitialDelayMs)
Path blockNodePreferenceFilePath,
int maxRetries,
Counter backfillRetriesCounter,
int retryInitialDelayMs,
int connectionTimeoutSeconds,
boolean enableTls)
throws IOException, ParseException {
this.blockNodeSource = BackfillSource.JSON.parse(Bytes.wrap(Files.readAllBytes(blockNodePreferenceFilePath)));
this.maxRetries = maxRetries;
this.initialRetryDelayMs = retryInitialDelayMs;
this.backfillRetries = backfillRetriesCounter;
this.connectionTimeoutSeconds = connectionTimeoutSeconds;
this.enableTls = enableTls;

for (BackfillSourceConfig node : blockNodeSource.nodes()) {
LOGGER.log(INFO, "Address: {0}, Port: {1}, Priority: {2}", node.address(), node.port(), node.priority());
Expand All @@ -84,10 +96,11 @@ public BackfillGrpcClient(
* @return a LongRange representing the intersection of the block range and the available blocks in the node.
*/
private LongRange getAvailableRangeInNode(BlockNodeClient node, LongRange blockRange) {
long firstAvailableBlock =
node.getBlockNodeServerStatusClient().getServerStatus().firstAvailableBlock();
long lastAvailableBlock =
node.getBlockNodeServerStatusClient().getServerStatus().lastAvailableBlock();

final ServerStatusResponse nodeStatus =
node.getBlockNodeServiceClient().serverStatus(new ServerStatusRequest());
long firstAvailableBlock = nodeStatus.firstAvailableBlock();
long lastAvailableBlock = nodeStatus.lastAvailableBlock();

long start = blockRange.start();
long end = blockRange.end();
Expand Down Expand Up @@ -148,7 +161,7 @@ public List<BlockUnparsed> fetchBlocks(LongRange blockRange) {

// Try to fetch blocks from this node
return currentNodeClient
.getBlockNodeSubscribeClient()
.getBlockstreamSubscribeUnparsedClient()
.getBatchOfBlocks(actualRange.start(), actualRange.end());
} catch (Exception e) {
if (attempt == maxRetries) {
Expand Down Expand Up @@ -195,7 +208,8 @@ public List<BlockUnparsed> fetchBlocks(LongRange blockRange) {
* @return a BlockNodeClient for the specified node
*/
private BlockNodeClient getNodeClient(BackfillSourceConfig node) {
return nodeClientMap.computeIfAbsent(node, BlockNodeClient::new);
return nodeClientMap.computeIfAbsent(
node, BlockNodeClient -> new BlockNodeClient(node, connectionTimeoutSeconds, enableTls));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
package org.hiero.block.node.backfill;

import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.INFO;
import static java.lang.System.Logger.Level.TRACE;

Expand Down Expand Up @@ -180,7 +181,9 @@ public void init(BlockNodeContext context, ServiceBuilder serviceBuilder) {
blockNodeSourcesPath,
backfillConfiguration.maxRetries(),
this.backfillRetries,
backfillConfiguration.initialRetryDelay());
backfillConfiguration.initialRetryDelay(),
backfillConfiguration.grpcOverallTimeout(),
backfillConfiguration.enableTLS());
LOGGER.log(TRACE, "Initialized gRPC client with sources path: {0}", blockNodeSourcesPath);
} catch (Exception e) {
LOGGER.log(INFO, "Failed to initialize gRPC client: {0}", e.getMessage());
Expand All @@ -206,7 +209,8 @@ public void start() {
TRACE,
"Scheduling backfill process to start in {0} milliseconds",
backfillConfiguration.initialDelay());
scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler = Executors.newScheduledThreadPool(
2); // Two threads: one for autonomous backfill, one for on-demand backfill
scheduler.scheduleAtFixedRate(
this::detectGaps,
backfillConfiguration.initialDelay(),
Expand Down Expand Up @@ -331,6 +335,11 @@ private void backfillGap(LongRange gap, BackfillType backfillType) throws Interr
// to avoid deadlocks, since blocks that fail verification are not persisted
getLatch(backfillType).set(new CountDownLatch(batchOfBlocks.size()));

if (batchOfBlocks.isEmpty()) {
LOGGER.log(DEBUG, "No blocks fetched for gap {0}, skipping", chunk);
continue; // Skip empty batches
}

// Process each fetched block
for (BlockUnparsed blockUnparsed : batchOfBlocks) {
long blockNumber = extractBlockNumber(blockUnparsed);
Expand All @@ -347,21 +356,25 @@ private void backfillGap(LongRange gap, BackfillType backfillType) throws Interr
boolean backfillFinished = getLatch(backfillType).get().await(timeout, TimeUnit.MILLISECONDS);

// Check if the backfill finished successfully
if (!backfillFinished) {
if (backfillFinished) {
// just log a victory message for each chunk
LOGGER.log(TRACE, "Successfully backfilled gap {0}", chunk);
} else {
LOGGER.log(TRACE, "Backfill for gap {0} did not finish in time", chunk);
backfillFetchErrors.increment();
// If it didn't finish, we will retry it later but move on to next chunk
} else {
// just log a victory message for each chunk
LOGGER.log(TRACE, "Successfully backfilled gap {0}", chunk);
}

// Cooldown between batches
Thread.sleep(backfillConfiguration.delayBetweenBatches());
}

LOGGER.log(
TRACE, "Completed backfilling of type {0} gap from {1} to {2}", backfillType, gap.start(), gap.end());
TRACE,
"Completed backfilling task (completion only) of type {0} gap from {1} to {2} ",
backfillType,
gap.start(),
gap.end());
if (backfillType.equals(BackfillType.ON_DEMAND)) {
onDemandBackfillStartBlock.set(-1); // Reset on-demand start block after backfill
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,37 +1,74 @@
// SPDX-License-Identifier: Apache-2.0
package org.hiero.block.node.backfill.client;

import com.hedera.pbj.grpc.client.helidon.PbjGrpcClient;
import com.hedera.pbj.grpc.client.helidon.PbjGrpcClientConfig;
import com.hedera.pbj.runtime.grpc.ServiceInterface;
import io.helidon.common.tls.Tls;
import io.helidon.webclient.grpc.GrpcClient;
import io.helidon.webclient.api.WebClient;
import io.helidon.webclient.grpc.GrpcClientProtocolConfig;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import org.hiero.block.api.BlockNodeServiceInterface;

public class BlockNodeClient {
private final BlockNodeServerStatusClient blockNodeServerStatusClient;
private final BlockNodeSubscribeClient blockNodeSubscribeClient;
// Options definition for all gRPC services in the block node client
private static record Options(Optional<String> authority, String contentType)
implements ServiceInterface.RequestOptions {}

public BlockNodeClient(BackfillSourceConfig blockNodeConfig) {
private static final BlockNodeClient.Options OPTIONS =
new BlockNodeClient.Options(Optional.empty(), ServiceInterface.RequestOptions.APPLICATION_GRPC);

// Initialize gRPC client with the block node configuration
GrpcClient grpcClient = GrpcClient.builder()
.tls(Tls.builder().enabled(false).build())
// block node services
private final BlockStreamSubscribeUnparsedClient blockStreamSubscribeUnparsedClient;
private final BlockNodeServiceInterface.BlockNodeServiceClient blockNodeServiceClient;

/**
* Constructs a BlockNodeClient using the provided configuration.
*
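* @param blockNodeConfig the configuration for the block node, including address and port
* @param timeoutMs timeout in milliseconds, applied to the gRPC client configuration, the connect timeout and the
* poll wait time
* @param enableTls whether TLS is enabled on the underlying gRPC client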
*/
public BlockNodeClient(BackfillSourceConfig blockNodeConfig, int timeoutMs, boolean enableTls) {

final Duration timeoutDuration = Duration.ofMillis(timeoutMs);

final Tls tls = Tls.builder().enabled(enableTls).build();
final PbjGrpcClientConfig grpcConfig =
new PbjGrpcClientConfig(timeoutDuration, tls, Optional.of(""), "application/grpc");

final WebClient webClient = WebClient.builder()
.baseUri("http://" + blockNodeConfig.address() + ":" + blockNodeConfig.port())
.protocolConfig(GrpcClientProtocolConfig.builder()
.tls(tls)
.protocolConfigs(List.of(GrpcClientProtocolConfig.builder()
.abortPollTimeExpired(false)
.pollWaitTime(Duration.ofSeconds(30))
.build())
.keepAlive(true)
.pollWaitTime(timeoutDuration)
.build()))
.connectTimeout(timeoutDuration)
.build();
// Initialize clients for server status and block subscription
this.blockNodeServerStatusClient = new BlockNodeServerStatusClient(grpcClient);
this.blockNodeSubscribeClient = new BlockNodeSubscribeClient(grpcClient);

PbjGrpcClient pbjGrpcClient = new PbjGrpcClient(webClient, grpcConfig);

// The same PbjGrpcClient (and therefore the same host connection) is shared by all service clients below.
blockNodeServiceClient = new BlockNodeServiceInterface.BlockNodeServiceClient(pbjGrpcClient, OPTIONS);
this.blockStreamSubscribeUnparsedClient = new BlockStreamSubscribeUnparsedClient(pbjGrpcClient);
}

public BlockNodeServerStatusClient getBlockNodeServerStatusClient() {
return blockNodeServerStatusClient;
/**
* Returns the BlockStreamSubscribeUnparsedClient for subscribing to block streams.
*
* @return the BlockStreamSubscribeUnparsedClient
*/
public BlockStreamSubscribeUnparsedClient getBlockstreamSubscribeUnparsedClient() {
return blockStreamSubscribeUnparsedClient;
}

public BlockNodeSubscribeClient getBlockNodeSubscribeClient() {
return blockNodeSubscribeClient;
/**
* Returns the BlockNodeServiceClient for accessing block node services.
*
* @return the BlockNodeServiceClient
*/
public BlockNodeServiceInterface.BlockNodeServiceClient getBlockNodeServiceClient() {
return blockNodeServiceClient;
}
}
Loading
Loading