diff --git a/block-node/backfill/build.gradle.kts b/block-node/backfill/build.gradle.kts index 8317ba093..f75c9b9a6 100644 --- a/block-node/backfill/build.gradle.kts +++ b/block-node/backfill/build.gradle.kts @@ -1,7 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 plugins { id("org.hiero.gradle.module.library") - id("com.hedera.pbj.pbj-compiler") version "0.11.13" + id("com.hedera.pbj.pbj-compiler") version "0.11.14" } description = "Hiero Block Node Backfill Plugin" @@ -25,6 +25,8 @@ mainModuleInfo { runtimeOnly("org.apache.logging.log4j.slf4j2.impl") runtimeOnly("io.helidon.logging.jul") runtimeOnly("com.hedera.pbj.grpc.helidon.config") + runtimeOnly("com.hedera.pbj.grpc.client.helidon") + runtimeOnly("com.hedera.pbj.grpc.helidon") } testModuleInfo { diff --git a/block-node/backfill/src/main/java/module-info.java b/block-node/backfill/src/main/java/module-info.java index 055684a57..0b0026368 100644 --- a/block-node/backfill/src/main/java/module-info.java +++ b/block-node/backfill/src/main/java/module-info.java @@ -20,13 +20,11 @@ requires transitive com.swirlds.metrics.api; requires transitive org.hiero.block.node.spi; requires transitive org.hiero.block.protobuf.pbj; - requires transitive io.grpc.stub; - requires transitive io.grpc; - requires transitive io.helidon.grpc.core; - requires transitive io.helidon.webclient.grpc; + requires com.hedera.pbj.grpc.client.helidon; requires org.hiero.block.node.base; requires io.helidon.common.tls; requires io.helidon.webclient.api; + requires io.helidon.webclient.grpc; requires org.antlr.antlr4.runtime; requires static transitive com.github.spotbugs.annotations; diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillConfiguration.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillConfiguration.java index a4f55c07f..847549e8a 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillConfiguration.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillConfiguration.java @@ -21,6 +21,8 @@ * @param initialDelay Initial delay in seconds before starting the backfill process, to give time for the system to stabilize * @param perBlockProcessingTimeout Timeout in milliseconds for processing each block, to avoid blocking the backfill * process indefinitely in case something unexpected happens, this would allow for self-recovery + * @param grpcOverallTimeout single timeout configuration for gRPC Client construction, connectTimeout, readTimeout and pollWaitTime + * @param enableTLS if enabled will assume block-node client supports tls connection. */ @ConfigData("backfill") public record BackfillConfiguration( @@ -33,4 +35,6 @@ public record BackfillConfiguration( @Loggable @ConfigProperty(defaultValue = "25") @Min(1) @Max(10_000) int fetchBatchSize, @Loggable @ConfigProperty(defaultValue = "1000") @Min(100) int delayBetweenBatches, @Loggable @ConfigProperty(defaultValue = "15000") @Min(5) int initialDelay, - @Loggable @ConfigProperty(defaultValue = "1000") @Min(500) int perBlockProcessingTimeout) {} + @Loggable @ConfigProperty(defaultValue = "1000") @Min(500) int perBlockProcessingTimeout, + @Loggable @ConfigProperty(defaultValue = "30000") @Min(10000) int grpcOverallTimeout, + @Loggable @ConfigProperty(defaultValue = "false") boolean enableTLS) {} diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java index bfe848472..27f18c8e8 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java @@ -15,6 +15,8 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import org.hiero.block.api.ServerStatusRequest; +import org.hiero.block.api.ServerStatusResponse; import org.hiero.block.internal.BlockUnparsed; import org.hiero.block.node.backfill.client.BackfillSource; import org.hiero.block.node.backfill.client.BackfillSourceConfig; @@ -50,7 +52,10 @@ public class BackfillGrpcClient { * This is used for exponential backoff in case of failures. */ private final int initialRetryDelayMs; - + /** Connection timeout in milliseconds for gRPC calls to block nodes. */ + private final int connectionTimeoutSeconds; + /** Enable TLS for secure connections to block nodes. */ + private final boolean enableTls; /** Current status of the Block Node Clients */ private ConcurrentHashMap nodeStatusMap = new ConcurrentHashMap<>(); /** @@ -65,12 +70,19 @@ public class BackfillGrpcClient { * @param blockNodePreferenceFilePath the path to the block node preference file */ public BackfillGrpcClient( - Path blockNodePreferenceFilePath, int maxRetries, Counter backfillRetriesCounter, int retryInitialDelayMs) + Path blockNodePreferenceFilePath, + int maxRetries, + Counter backfillRetriesCounter, + int retryInitialDelayMs, + int connectionTimeoutSeconds, + boolean enableTls) throws IOException, ParseException { this.blockNodeSource = BackfillSource.JSON.parse(Bytes.wrap(Files.readAllBytes(blockNodePreferenceFilePath))); this.maxRetries = maxRetries; this.initialRetryDelayMs = retryInitialDelayMs; this.backfillRetries = backfillRetriesCounter; + this.connectionTimeoutSeconds = connectionTimeoutSeconds; + this.enableTls = enableTls; for (BackfillSourceConfig node : blockNodeSource.nodes()) { LOGGER.log(INFO, "Address: {0}, Port: {1}, Priority: {2}", node.address(), node.port(), node.priority()); @@ -84,10 +96,11 @@ public BackfillGrpcClient( * @return a LongRange representing the intersection of the block range and the available blocks in the node. */ private LongRange getAvailableRangeInNode(BlockNodeClient node, LongRange blockRange) { - long firstAvailableBlock = - node.getBlockNodeServerStatusClient().getServerStatus().firstAvailableBlock(); - long lastAvailableBlock = - node.getBlockNodeServerStatusClient().getServerStatus().lastAvailableBlock(); + + final ServerStatusResponse nodeStatus = + node.getBlockNodeServiceClient().serverStatus(new ServerStatusRequest()); + long firstAvailableBlock = nodeStatus.firstAvailableBlock(); + long lastAvailableBlock = nodeStatus.lastAvailableBlock(); long start = blockRange.start(); long end = blockRange.end(); @@ -148,7 +161,7 @@ public List fetchBlocks(LongRange blockRange) { // Try to fetch blocks from this node return currentNodeClient - .getBlockNodeSubscribeClient() + .getBlockstreamSubscribeUnparsedClient() .getBatchOfBlocks(actualRange.start(), actualRange.end()); } catch (Exception e) { if (attempt == maxRetries) { @@ -195,7 +208,8 @@ public List fetchBlocks(LongRange blockRange) { * @return a BlockNodeClient for the specified node */ private BlockNodeClient getNodeClient(BackfillSourceConfig node) { - return nodeClientMap.computeIfAbsent(node, BlockNodeClient::new); + return nodeClientMap.computeIfAbsent( + node, BlockNodeClient -> new BlockNodeClient(node, connectionTimeoutSeconds, enableTls)); } /** diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillPlugin.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillPlugin.java index 77bb67b5c..fda366b00 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillPlugin.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillPlugin.java @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 package org.hiero.block.node.backfill; +import static java.lang.System.Logger.Level.DEBUG; import static java.lang.System.Logger.Level.INFO; import static java.lang.System.Logger.Level.TRACE; @@ -180,7 +181,9 @@ public void init(BlockNodeContext context, ServiceBuilder serviceBuilder) { blockNodeSourcesPath, backfillConfiguration.maxRetries(), this.backfillRetries, - backfillConfiguration.initialRetryDelay()); + backfillConfiguration.initialRetryDelay(), + backfillConfiguration.grpcOverallTimeout(), + backfillConfiguration.enableTLS()); LOGGER.log(TRACE, "Initialized gRPC client with sources path: {0}", blockNodeSourcesPath); } catch (Exception e) { LOGGER.log(INFO, "Failed to initialize gRPC client: {0}", e.getMessage()); @@ -206,7 +209,8 @@ public void start() { TRACE, "Scheduling backfill process to start in {0} milliseconds", backfillConfiguration.initialDelay()); - scheduler = Executors.newSingleThreadScheduledExecutor(); + scheduler = Executors.newScheduledThreadPool( + 2); // Two threads: one for autonomous backfill, one for on-demand backfill scheduler.scheduleAtFixedRate( this::detectGaps, backfillConfiguration.initialDelay(), @@ -331,6 +335,11 @@ private void backfillGap(LongRange gap, BackfillType backfillType) throws Interr // to avoid deadlocks, since blocks that fail verification are not persisted getLatch(backfillType).set(new CountDownLatch(batchOfBlocks.size())); + if (batchOfBlocks.isEmpty()) { + LOGGER.log(DEBUG, "No blocks fetched for gap {0}, skipping", chunk); + continue; // Skip empty batches + } + // Process each fetched block for (BlockUnparsed blockUnparsed : batchOfBlocks) { long blockNumber = extractBlockNumber(blockUnparsed); @@ -347,13 +356,13 @@ private void backfillGap(LongRange gap, BackfillType backfillType) throws Interr boolean backfillFinished = getLatch(backfillType).get().await(timeout, TimeUnit.MILLISECONDS); // Check if the backfill finished successfully - if (!backfillFinished) { + if (backfillFinished) { + // just log a victory message for each chunk + LOGGER.log(TRACE, "Successfully backfilled gap {0}", chunk); + } else { LOGGER.log(TRACE, "Backfill for gap {0} did not finish in time", chunk); backfillFetchErrors.increment(); // If it didn't finish, we will retry it later but move on to next chunk - } else { - // just log a victory message for each chunk - LOGGER.log(TRACE, "Successfully backfilled gap {0}", chunk); } // Cooldown between batches @@ -361,7 +370,11 @@ private void backfillGap(LongRange gap, BackfillType backfillType) throws Interr } LOGGER.log( - TRACE, "Completed backfilling of type {0} gap from {1} to {2}", backfillType, gap.start(), gap.end()); + TRACE, + "Completed backfilling task (completion only) of type {0} gap from {1} to {2} ", + backfillType, + gap.start(), + gap.end()); if (backfillType.equals(BackfillType.ON_DEMAND)) { onDemandBackfillStartBlock.set(-1); // Reset on-demand start block after backfill } diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java index d09ed1abd..9e1b9981e 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java @@ -1,37 +1,74 @@ // SPDX-License-Identifier: Apache-2.0 package org.hiero.block.node.backfill.client; +import com.hedera.pbj.grpc.client.helidon.PbjGrpcClient; +import com.hedera.pbj.grpc.client.helidon.PbjGrpcClientConfig; +import com.hedera.pbj.runtime.grpc.ServiceInterface; import io.helidon.common.tls.Tls; -import io.helidon.webclient.grpc.GrpcClient; +import io.helidon.webclient.api.WebClient; import io.helidon.webclient.grpc.GrpcClientProtocolConfig; import java.time.Duration; +import java.util.List; +import java.util.Optional; +import org.hiero.block.api.BlockNodeServiceInterface; public class BlockNodeClient { - private final BlockNodeServerStatusClient blockNodeServerStatusClient; - private final BlockNodeSubscribeClient blockNodeSubscribeClient; + // Options definition for all gRPC services in the block node client + private static record Options(Optional authority, String contentType) + implements ServiceInterface.RequestOptions {} - public BlockNodeClient(BackfillSourceConfig blockNodeConfig) { + private static final BlockNodeClient.Options OPTIONS = + new BlockNodeClient.Options(Optional.empty(), ServiceInterface.RequestOptions.APPLICATION_GRPC); - // Initialize gRPC client with the block node configuration - GrpcClient grpcClient = GrpcClient.builder() - .tls(Tls.builder().enabled(false).build()) + // block node services + private final BlockStreamSubscribeUnparsedClient blockStreamSubscribeUnparsedClient; + private final BlockNodeServiceInterface.BlockNodeServiceClient blockNodeServiceClient; + + /** + * Constructs a BlockNodeClient using the provided configuration. + * + * @param blockNodeConfig the configuration for the block node, including address and port + */ + public BlockNodeClient(BackfillSourceConfig blockNodeConfig, int timeoutMs, boolean enableTls) { + + final Duration timeoutDuration = Duration.ofMillis(timeoutMs); + + final Tls tls = Tls.builder().enabled(enableTls).build(); + final PbjGrpcClientConfig grpcConfig = + new PbjGrpcClientConfig(timeoutDuration, tls, Optional.of(""), "application/grpc"); + + final WebClient webClient = WebClient.builder() .baseUri("http://" + blockNodeConfig.address() + ":" + blockNodeConfig.port()) - .protocolConfig(GrpcClientProtocolConfig.builder() + .tls(tls) + .protocolConfigs(List.of(GrpcClientProtocolConfig.builder() .abortPollTimeExpired(false) - .pollWaitTime(Duration.ofSeconds(30)) - .build()) - .keepAlive(true) + .pollWaitTime(timeoutDuration) + .build())) + .connectTimeout(timeoutDuration) .build(); - // Initialize clients for server status and block subscription - this.blockNodeServerStatusClient = new BlockNodeServerStatusClient(grpcClient); - this.blockNodeSubscribeClient = new BlockNodeSubscribeClient(grpcClient); + + PbjGrpcClient pbjGrpcClient = new PbjGrpcClient(webClient, grpcConfig); + + // we reuse the host connection with many services. + blockNodeServiceClient = new BlockNodeServiceInterface.BlockNodeServiceClient(pbjGrpcClient, OPTIONS); + this.blockStreamSubscribeUnparsedClient = new BlockStreamSubscribeUnparsedClient(pbjGrpcClient); } - public BlockNodeServerStatusClient getBlockNodeServerStatusClient() { - return blockNodeServerStatusClient; + /** + * Returns the BlockStreamSubscribeUnparsedClient for subscribing to block streams. + * + * @return the BlockStreamSubscribeUnparsedClient + */ + public BlockStreamSubscribeUnparsedClient getBlockstreamSubscribeUnparsedClient() { + return blockStreamSubscribeUnparsedClient; } - public BlockNodeSubscribeClient getBlockNodeSubscribeClient() { - return blockNodeSubscribeClient; + /** + * Returns the BlockNodeServiceClient for accessing block node services. + * + * @return the BlockNodeServiceClient + */ + public BlockNodeServiceInterface.BlockNodeServiceClient getBlockNodeServiceClient() { + return blockNodeServiceClient; } } diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeServerStatusClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeServerStatusClient.java deleted file mode 100644 index e53761ab1..000000000 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeServerStatusClient.java +++ /dev/null @@ -1,138 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -package org.hiero.block.node.backfill.client; - -import static java.util.Objects.requireNonNull; - -import com.hedera.pbj.runtime.Codec; -import com.hedera.pbj.runtime.ParseException; -import com.hedera.pbj.runtime.io.buffer.Bytes; -import edu.umd.cs.findbugs.annotations.NonNull; -import io.grpc.MethodDescriptor; -import io.grpc.stub.StreamObserver; -import io.helidon.grpc.core.MarshallerSupplier; -import io.helidon.webclient.grpc.GrpcClient; -import io.helidon.webclient.grpc.GrpcClientMethodDescriptor; -import io.helidon.webclient.grpc.GrpcServiceClient; -import io.helidon.webclient.grpc.GrpcServiceDescriptor; -import java.io.IOException; -import java.io.InputStream; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; -import org.hiero.block.api.BlockNodeServiceInterface; -import org.hiero.block.api.ServerStatusRequest; -import org.hiero.block.api.ServerStatusResponse; - -public class BlockNodeServerStatusClient implements StreamObserver { - private final GrpcServiceClient serverStatusServiceClient; - private final String methodName = BlockNodeServiceInterface.BlockNodeServiceMethod.serverStatus.name(); - // Per Request State - private AtomicReference replyRef; - private AtomicReference errorRef; - private CountDownLatch latch; - - public BlockNodeServerStatusClient(final GrpcClient grpcClient) { - // create service client for server status - this.serverStatusServiceClient = grpcClient.serviceClient(GrpcServiceDescriptor.builder() - .serviceName(BlockNodeServiceInterface.FULL_NAME) - .putMethod( - methodName, - GrpcClientMethodDescriptor.unary(BlockNodeServiceInterface.FULL_NAME, methodName) - .requestType(ServerStatusRequest.class) - .responseType(ServerStatusResponse.class) - .marshallerSupplier( - new BlockNodeServerStatusClient.ServerStatusRequestResponseMarshaller - .Supplier()) - .build()) - .build()); - } - - public ServerStatusResponse getServerStatus() { - // reset state for the request - replyRef = new AtomicReference<>(); - errorRef = new AtomicReference<>(); - latch = new CountDownLatch(1); - // Call - serverStatusServiceClient.unary(methodName, new ServerStatusRequest(), this); - // wait for response or error - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - // Check for error - if (errorRef.get() != null) { - if (errorRef.get() instanceof RuntimeException re) { - throw re; - } - throw new RuntimeException(errorRef.get()); - } - // Check for reply - if (replyRef.get() != null) { - return replyRef.get(); - } - // If we reach here, it means we did not receive a reply or an error - throw new RuntimeException("Call to serverStatus completed w/o receiving a reply or an error explicitly."); - } - - @Override - public void onNext(ServerStatusResponse serverStatusResponse) { - if (replyRef.get() != null) { - throw new IllegalStateException( - "serverStatus is unary, but received more than one reply. The latest reply is: " - + serverStatusResponse); - } - replyRef.set(serverStatusResponse); - latch.countDown(); - } - - @Override - public void onError(Throwable throwable) { - errorRef.set(throwable); - latch.countDown(); - } - - @Override - public void onCompleted() { - latch.countDown(); - } - - public static class ServerStatusRequestResponseMarshaller implements MethodDescriptor.Marshaller { - private final Codec codec; - - ServerStatusRequestResponseMarshaller(@NonNull final Class clazz) { - requireNonNull(clazz); - - if (clazz == ServerStatusRequest.class) { - this.codec = (Codec) ServerStatusRequest.PROTOBUF; - } else if (clazz == ServerStatusResponse.class) { - this.codec = (Codec) ServerStatusResponse.PROTOBUF; - } else { - throw new IllegalArgumentException("Unsupported class: " + clazz.getName()); - } - } - - @Override - public InputStream stream(@NonNull final T obj) { - requireNonNull(obj); - return codec.toBytes(obj).toInputStream(); - } - - @Override - public T parse(@NonNull final InputStream inputStream) { - requireNonNull(inputStream); - - try (inputStream) { - return codec.parse(Bytes.wrap(inputStream.readAllBytes())); - } catch (final ParseException | IOException e) { - throw new RuntimeException(e); - } - } - - public static class Supplier implements MarshallerSupplier { - @Override - public MethodDescriptor.Marshaller get(@NonNull final Class clazz) { - return new ServerStatusRequestResponseMarshaller<>(clazz); - } - } - } -} diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeSubscribeClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeSubscribeClient.java deleted file mode 100644 index d71cb7548..000000000 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeSubscribeClient.java +++ /dev/null @@ -1,198 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -package org.hiero.block.node.backfill.client; - -import static java.util.Objects.requireNonNull; - -import com.hedera.hapi.block.stream.output.BlockHeader; -import com.hedera.pbj.runtime.Codec; -import com.hedera.pbj.runtime.ParseException; -import com.hedera.pbj.runtime.io.buffer.Bytes; -import edu.umd.cs.findbugs.annotations.NonNull; -import io.grpc.MethodDescriptor; -import io.grpc.stub.StreamObserver; -import io.helidon.grpc.core.MarshallerSupplier; -import io.helidon.webclient.grpc.GrpcClient; -import io.helidon.webclient.grpc.GrpcClientMethodDescriptor; -import io.helidon.webclient.grpc.GrpcServiceClient; -import io.helidon.webclient.grpc.GrpcServiceDescriptor; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import org.hiero.block.api.BlockStreamSubscribeServiceInterface; -import org.hiero.block.api.SubscribeStreamRequest; -import org.hiero.block.api.SubscribeStreamResponse; -import org.hiero.block.internal.BlockItemUnparsed; -import org.hiero.block.internal.BlockUnparsed; -import org.hiero.block.internal.SubscribeStreamResponseUnparsed; - -public class BlockNodeSubscribeClient implements StreamObserver { - private final GrpcServiceClient blockStreamSubscribeServiceClient; - private final String methodName = - BlockStreamSubscribeServiceInterface.BlockStreamSubscribeServiceMethod.subscribeBlockStream.name(); - // Per Request State - private List currentBlockItems; - private AtomicLong currentBlockNumber; - private AtomicReference> replyRef; - private AtomicReference errorRef; - private CountDownLatch latch; - - public BlockNodeSubscribeClient(GrpcClient grpcClient) { - // create service client for server status - this.blockStreamSubscribeServiceClient = grpcClient.serviceClient(GrpcServiceDescriptor.builder() - .serviceName(BlockStreamSubscribeServiceInterface.FULL_NAME) - .putMethod( - methodName, - GrpcClientMethodDescriptor.serverStreaming( - BlockStreamSubscribeServiceInterface.FULL_NAME, methodName) - .requestType(SubscribeStreamRequest.class) - .responseType(SubscribeStreamResponseUnparsed.class) - .marshallerSupplier(new BlockStreamSubscribeMarshaller.Supplier()) - .build()) - .build()); - } - - public List getBatchOfBlocks(long startBlockNumber, long endBlockNumber) { - // Validate input parameters - if (startBlockNumber < 0 || endBlockNumber < 0 || startBlockNumber > endBlockNumber) { - throw new IllegalArgumentException("Invalid block range: " + startBlockNumber + " to " + endBlockNumber); - } - // reset state for the request - currentBlockItems = new ArrayList<>(); - currentBlockNumber = new AtomicLong(startBlockNumber); - replyRef = new AtomicReference<>(); - errorRef = new AtomicReference<>(); - latch = new CountDownLatch(1); - // Create request - SubscribeStreamRequest request = SubscribeStreamRequest.newBuilder() - .startBlockNumber(startBlockNumber) - .endBlockNumber(endBlockNumber) - .build(); - // Call - blockStreamSubscribeServiceClient.serverStream(methodName, request, this); - - // wait for response or error - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while waiting for blocks", e); - } - if (errorRef.get() != null) { - throw new RuntimeException("Error fetching blocks", errorRef.get()); - } - return replyRef.get(); - } - - @Override - public void onNext(SubscribeStreamResponseUnparsed subscribeStreamResponse) { - if (subscribeStreamResponse.hasBlockItems()) { - List blockItems = - subscribeStreamResponse.blockItems().blockItems(); - // Check if is new Block - if (blockItems.getFirst().hasBlockHeader()) { - // verify is the expected block number - long expectedBlockNumber = currentBlockNumber.get(); - long actualBlockNumber = 0; - try { - actualBlockNumber = BlockHeader.PROTOBUF - .parse(blockItems.getFirst().blockHeaderOrThrow()) - .number(); - } catch (ParseException e) { - throw new RuntimeException(e); - } - if (actualBlockNumber != expectedBlockNumber) { - throw new IllegalStateException( - "Expected block number " + expectedBlockNumber + " but received " + actualBlockNumber); - } - // Create new Block and add to current block items - currentBlockItems = new ArrayList<>(blockItems); - } else { - // Add items to current block - currentBlockItems.addAll(blockItems); - } - - // Check if response contains block proof (end of block) - if (blockItems.getLast().hasBlockProof()) { - // Create Block from current items - BlockUnparsed block = - BlockUnparsed.newBuilder().blockItems(currentBlockItems).build(); - // Add to reply - List blocks = replyRef.get(); - if (blocks == null) { - blocks = new ArrayList<>(); - replyRef.set(blocks); - } - blocks.add(block); - // Reset current block items and number for next block - currentBlockItems = new ArrayList<>(); - currentBlockNumber.incrementAndGet(); - } - - } else if (subscribeStreamResponse.hasStatus()) { - // If response has code, set the status - SubscribeStreamResponse.Code codeStatus = subscribeStreamResponse.status(); - if (codeStatus != SubscribeStreamResponse.Code.SUCCESS) { - errorRef.set(new RuntimeException("Received error code: " + codeStatus)); - } - } else { - // If no block items and no code, this is unexpected - errorRef.set(new RuntimeException("Received unexpected response without block items or code")); - } - } - - @Override - public void onError(Throwable throwable) { - errorRef.set(throwable); - replyRef.set(null); - latch.countDown(); - } - - @Override - public void onCompleted() { - latch.countDown(); - } - - public static class BlockStreamSubscribeMarshaller implements MethodDescriptor.Marshaller { - private final Codec codec; - - public BlockStreamSubscribeMarshaller(@NonNull final Class clazz) { - requireNonNull(clazz); - - if (clazz == SubscribeStreamRequest.class) { - this.codec = (Codec) SubscribeStreamRequest.PROTOBUF; - } else if (clazz == SubscribeStreamResponseUnparsed.class) { - this.codec = (Codec) SubscribeStreamResponseUnparsed.PROTOBUF; - } else { - throw new IllegalArgumentException("Unsupported class: " + clazz.getName()); - } - } - - @Override - public InputStream stream(@NonNull final T obj) { - requireNonNull(obj); - return codec.toBytes(obj).toInputStream(); - } - - @Override - public T parse(@NonNull final InputStream inputStream) { - requireNonNull(inputStream); - - try (inputStream) { - return codec.parse(Bytes.wrap(inputStream.readAllBytes())); - } catch (final ParseException | IOException e) { - throw new RuntimeException(e); - } - } - - public static class Supplier implements MarshallerSupplier { - @Override - public MethodDescriptor.Marshaller get(@NonNull final Class clazz) { - return new BlockStreamSubscribeMarshaller<>(clazz); - } - } - } -} diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java new file mode 100644 index 000000000..0636f1203 --- /dev/null +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java @@ -0,0 +1,222 @@ +// SPDX-License-Identifier: Apache-2.0 +package org.hiero.block.node.backfill.client; + +import static java.lang.System.Logger.Level.DEBUG; +import static java.lang.System.Logger.Level.TRACE; +import static java.util.Objects.requireNonNull; +import static org.hiero.block.api.BlockStreamSubscribeServiceInterface.FULL_NAME; + +import com.hedera.hapi.block.stream.output.BlockHeader; +import com.hedera.pbj.runtime.ParseException; +import com.hedera.pbj.runtime.grpc.GrpcCall; +import com.hedera.pbj.runtime.grpc.GrpcClient; +import com.hedera.pbj.runtime.grpc.Pipeline; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Flow; +import org.hiero.block.api.SubscribeStreamRequest; +import org.hiero.block.api.SubscribeStreamResponse; +import org.hiero.block.internal.BlockItemUnparsed; +import org.hiero.block.internal.BlockUnparsed; +import org.hiero.block.internal.SubscribeStreamResponseUnparsed; + +/** + * Client for subscribing to block streams using unparsed responses. + *

+ * This implementation is request-isolated: + * it does not keep per-request mutable state on {@code this}. Each call to + * {@link #getBatchOfBlocks(long, long)} creates a per-request {@code RequestContext} + * and a per-request {@link Pipeline} that closes over that context. + * This design prevents late callbacks from one request from mutating the state of another + * and avoids the need for per-field atomic types. + * + *

Thread-safety: The instance is stateless across requests. You may invoke + * {@code getBatchOfBlocks} concurrently from multiple threads; each invocation uses + * its own context and pipeline. + */ +public class BlockStreamSubscribeUnparsedClient { + + private static final System.Logger LOGGER = System.getLogger(BlockStreamSubscribeUnparsedClient.class.getName()); + + // From constructor + private final GrpcClient grpcClient; + + /** + * Constructs a new client for subscribing to block streams. + * + * @param grpcClient the gRPC client to use for communication + */ + public BlockStreamSubscribeUnparsedClient(@NonNull final GrpcClient grpcClient) { + this.grpcClient = requireNonNull(grpcClient); + } + + /** + * Subscribes to a closed range of blocks and returns them as a list once the stream completes. + * + * @param startBlockNumber inclusive start + * @param endBlockNumber inclusive end + * @return list of received blocks (never {@code null}) + * @throws IllegalArgumentException on invalid range + * @throws RuntimeException on stream error or interruption + */ + public List getBatchOfBlocks(long startBlockNumber, long endBlockNumber) { + // Validate input parameters + if (startBlockNumber < 0 || endBlockNumber < 0 || startBlockNumber > endBlockNumber) { + throw new IllegalArgumentException("Invalid block range: " + startBlockNumber + " to " + endBlockNumber); + } + + // Build per-request context + final RequestContext ctx = new RequestContext(startBlockNumber); + + // Create request + final SubscribeStreamRequest request = SubscribeStreamRequest.newBuilder() + .startBlockNumber(startBlockNumber) + .endBlockNumber(endBlockNumber) + .build(); + + // Create a per-request pipeline that closes over `ctx` + final Pipeline pipeline = new SubscribePipeline(ctx); + + // Issue the call using the per-request pipeline + final GrpcCall call = grpcClient.createCall( + FULL_NAME + "/subscribeBlockStream", + SubscribeStreamRequest.PROTOBUF, + SubscribeStreamResponseUnparsed.PROTOBUF, + pipeline); + + call.sendRequest(request, true); + + // Wait for completion or error and return the blocks + return ctx.await(); + } + + /** + * Extracts the block number from a block header item. + */ + private static long extractBlockNumberFromBlockHeader(BlockItemUnparsed itemUnparsed) throws ParseException { + return BlockHeader.PROTOBUF.parse(itemUnparsed.blockHeaderOrThrow()).number(); + } + + /** + * Per-request state holder. All fields are confined to a single request. + * The {@link CountDownLatch} establishes happens-before from callback threads to the waiter. + */ + private static final class RequestContext { + final CountDownLatch done = new CountDownLatch(1); + final List blocks = new ArrayList<>(); + long expectedBlockNumber; + List currentBlockItems = new ArrayList<>(); + Throwable error; + + RequestContext(long startBlock) { + this.expectedBlockNumber = startBlock; + } + + void fail(Throwable t) { + this.error = t; + done.countDown(); + } + + void complete() { + done.countDown(); + } + + List await() { + try { + done.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (error != null) { + throw new RuntimeException("Error fetching blocks", error); + } + return blocks; + } + } + + /** + * Pipeline implementation for handling responses from the block stream subscription. + * It processes incoming {@link SubscribeStreamResponseUnparsed} messages and manages + * the state of the request context. + */ + private static final class SubscribePipeline implements Pipeline { + private final RequestContext ctx; + + /** + * Constructs a new pipeline for processing block stream subscription responses. + * + * @param ctx the request context to manage state across callbacks + */ + SubscribePipeline(RequestContext ctx) { + this.ctx = ctx; + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + LOGGER.log(TRACE, "received onSubscribe confirmation"); + // No backpressure negotiation needed for this pattern. + } + + @Override + public void onNext(SubscribeStreamResponseUnparsed resp) { + try { + if (resp.hasBlockItems()) { + final List frame = resp.blockItems().blockItems(); + + if (frame.getFirst().hasBlockHeader()) { + final long expected = ctx.expectedBlockNumber; + final long actual = extractBlockNumberFromBlockHeader(frame.getFirst()); + if (actual != expected) { + ctx.fail(new IllegalStateException( + "Expected block number " + expected + " but received " + actual)); + return; + } + // Start a new block: reuse the buffer and populate it. + ctx.currentBlockItems.clear(); + ctx.currentBlockItems.addAll(frame); + } else { + // Continuation: append to the same buffer. + ctx.currentBlockItems.addAll(frame); + } + + if (frame.getLast().hasBlockProof()) { + // Snapshot the current items to avoid retaining the large buffer in the finished block. + final List snapshot = List.copyOf(ctx.currentBlockItems); + ctx.blocks.add( + BlockUnparsed.newBuilder().blockItems(snapshot).build()); + ctx.currentBlockItems.clear(); + ctx.expectedBlockNumber++; + } + + } else if (resp.hasStatus()) { + final SubscribeStreamResponse.Code code = resp.status(); + if (code != SubscribeStreamResponse.Code.SUCCESS) { + ctx.fail(new RuntimeException("Received error code: " + code)); + } + } else { + ctx.fail(new RuntimeException("Received unexpected response without block items or code")); + } + } catch (ParseException e) { + LOGGER.log(DEBUG, "Parse error in block item", e); + ctx.fail(e); + } catch (RuntimeException e) { + LOGGER.log(DEBUG, "Runtime error processing SubscribeStreamResponseUnparsed", e); + ctx.fail(e); + } + } + + @Override + public void onError(Throwable throwable) { + LOGGER.log(TRACE, "received onError", throwable); + ctx.fail(throwable); + } + + @Override + public void onComplete() { + LOGGER.log(TRACE, "received onComplete"); + ctx.complete(); + } + } +} diff --git a/block-node/backfill/src/test/java/org/hiero/block/node/backfill/BackfillPluginTest.java b/block-node/backfill/src/test/java/org/hiero/block/node/backfill/BackfillPluginTest.java index 059833166..48614c391 100644 --- a/block-node/backfill/src/test/java/org/hiero/block/node/backfill/BackfillPluginTest.java +++ b/block-node/backfill/src/test/java/org/hiero/block/node/backfill/BackfillPluginTest.java @@ -58,16 +58,20 @@ void testBackfillPlugin() throws InterruptedException { // Config Override Map configOverride = BackfillConfigBuilder.NewBuilder() .backfillSourcePath(blockNodeSourcesPath) - .fetchBatchSize(20) + .fetchBatchSize(100) + .initialDelay(500) // start quickly .build(); // create a historical block facility for the plugin (should have a GAP) - final HistoricalBlockFacility historicalBlockFacilityForPlugin = getHistoricalBlockFacility(10, 20); + final HistoricalBlockFacility historicalBlockFacilityForPlugin = getHistoricalBlockFacility(200, 210); // start the plugin start(new BackfillPlugin(), historicalBlockFacilityForPlugin, configOverride); - CountDownLatch countDownLatch = new CountDownLatch(10); // 0 to 9 inclusive, so 10 blocks + // expected blocks to backfill + int expectedBlocksToBackfill = 200; // from 0 to 199 inclusive, so 200 blocks + + CountDownLatch countDownLatch = new CountDownLatch(expectedBlocksToBackfill); // 0 to 9 inclusive, so 10 blocks // register the backfill handler registerDefaultTestBackfillHandler(); // register the verification handler @@ -82,11 +86,11 @@ void testBackfillPlugin() throws InterruptedException { // Verify sent verifications assertEquals( - 10, + expectedBlocksToBackfill, blockMessaging.getSentPersistedNotifications().size(), "Should have sent 11 persisted notifications"); assertEquals( - 10, + expectedBlocksToBackfill, blockMessaging.getSentVerificationNotifications().size(), "Should have sent 11 verification notifications"); @@ -126,7 +130,7 @@ void testSecondarySourceBakcfill() throws InterruptedException { Map configOverride = BackfillConfigBuilder.NewBuilder() .backfillSourcePath(backfillSourcePath) .maxRetries(2) - .initialDelayMs(600) // start quickly + .initialDelay(500) // start quickly .build(); // create a historical block facility for the plugin (should have a GAP) @@ -289,6 +293,8 @@ void testBackfillOnDemandWhileAutonomousBackfillRunning() throws InterruptedExce // config override for test final Map configOverride = BackfillConfigBuilder.NewBuilder() .backfillSourcePath(backfillSourcePath) + .initialDelay(100) // start quickly + .scanInterval(500000) // scan every 500 seconds .build(); // create a historical block facility for the plugin (should have a GAP) @@ -331,8 +337,8 @@ public void handleVerification(VerificationNotification notification) { false, "test-backfill-handler"); - boolean startOnDemand = latch1.await(1, TimeUnit.MINUTES); // Wait until latch1.countDown() is called - assertTrue(startOnDemand, "Should have started on-demand backfill while autonomous backfill is running"); + boolean startAutonomous = latch1.await(1, TimeUnit.MINUTES); // Wait until latch1.countDown() is called + assertTrue(startAutonomous, "Should have started on-demand backfill while autonomous backfill is running"); // Trigger the on-demand backfill by sending a NewestBlockKnownToNetworkNotification NewestBlockKnownToNetworkNotification newestBlockNotification = new NewestBlockKnownToNetworkNotification(200L); this.blockMessaging.sendNewestBlockKnownToNetwork(newestBlockNotification); @@ -400,8 +406,8 @@ void testBackfillPartialAvailableSourcesForGap() throws InterruptedException { // config override Map config = BackfillConfigBuilder.NewBuilder() .backfillSourcePath(backfillSourcePath) - .initialDelayMs(100) // start quickly - .scanIntervalMs(500000) // scan every 500 seconds + .initialDelay(100) // start quickly + .scanInterval(500000) // scan every 500 seconds .build(); // create a historical block facility for the plugin (should have a GAP from 0 to 124) @@ -526,13 +532,14 @@ private static class BackfillConfigBuilder { // Fields with default valuess private String backfillSourcePath; private int fetchBatchSize = 10; - private int delayBetweenBatchesMs = 100; - private int initialDelayMs = 1000; - private int initialRetryDelayMs = 1000; + private int delayBetweenBatches = 100; + private int initialDelay = 500; + private int initialRetryDelay = 500; private int maxRetries = 3; private int scanIntervalMs = 60000; // 60 seconds private long startBlock = 0L; private long endBlock = -1L; // -1 means no end block, backfill until the latest block + private int perBlockProcessingTimeout = 500; // half second private BackfillConfigBuilder() { // private to force use of NewBuilder() @@ -552,18 +559,18 @@ public BackfillConfigBuilder fetchBatchSize(int value) { return this; } - public BackfillConfigBuilder delayBetweenBatchesMs(int value) { - this.delayBetweenBatchesMs = value; + public BackfillConfigBuilder delayBetweenBatches(int value) { + this.delayBetweenBatches = value; return this; } - public BackfillConfigBuilder initialDelayMs(int value) { - this.initialDelayMs = value; + public BackfillConfigBuilder initialDelay(int value) { + this.initialDelay = value; return this; } - public BackfillConfigBuilder initialRetryDelayMs(int value) { - this.initialRetryDelayMs = value; + public BackfillConfigBuilder initialRetryDelay(int value) { + this.initialRetryDelay = value; return this; } @@ -572,7 +579,7 @@ public BackfillConfigBuilder maxRetries(int value) { return this; } - public BackfillConfigBuilder scanIntervalMs(int value) { + public BackfillConfigBuilder scanInterval(int value) { this.scanIntervalMs = value; return this; } @@ -587,6 +594,11 @@ public BackfillConfigBuilder endBlock(long value) { return this; } + public BackfillConfigBuilder perBlockProcessingTimeout(int value) { + this.perBlockProcessingTimeout = value; + return this; + } + public Map build() { if (backfillSourcePath == null || backfillSourcePath.isBlank()) { throw new IllegalStateException("backfillSourcePath is required"); @@ -595,13 +607,14 @@ public Map build() { return Map.of( "backfill.blockNodeSourcesPath", backfillSourcePath, "backfill.fetchBatchSize", String.valueOf(fetchBatchSize), - "backfill.delayBetweenBatchesMs", String.valueOf(delayBetweenBatchesMs), - "backfill.initialDelayMs", String.valueOf(initialDelayMs), - "backfill.initialRetryDelayMs", String.valueOf(initialRetryDelayMs), + "backfill.delayBetweenBatches", String.valueOf(delayBetweenBatches), + "backfill.initialDelay", String.valueOf(initialDelay), + "backfill.initialRetryDelay", String.valueOf(initialRetryDelay), "backfill.maxRetries", String.valueOf(maxRetries), - "backfill.scanIntervalMs", String.valueOf(scanIntervalMs), + "backfill.scanInterval", String.valueOf(scanIntervalMs), "backfill.startBlock", String.valueOf(startBlock), - "backfill.endBlock", String.valueOf(endBlock)); + "backfill.endBlock", String.valueOf(endBlock), + "backfill.perBlockProcessingTimeout", String.valueOf(perBlockProcessingTimeout)); } } } diff --git a/block-node/protobuf-pbj/build.gradle.kts b/block-node/protobuf-pbj/build.gradle.kts index 98ed3a6a5..953eb2bbb 100644 --- a/block-node/protobuf-pbj/build.gradle.kts +++ b/block-node/protobuf-pbj/build.gradle.kts @@ -3,7 +3,7 @@ plugins { id("org.hiero.gradle.module.library") // When upgrading pbjVersion, also need to update pbjVersion on // hiero-dependency-versions/build.gradle.kts - id("com.hedera.pbj.pbj-compiler") version "0.11.13" + id("com.hedera.pbj.pbj-compiler") version "0.11.14" } description = "Hiero Block Node Protobuf PBJ API" diff --git a/hiero-dependency-versions/build.gradle.kts b/hiero-dependency-versions/build.gradle.kts index 84c904718..260423171 100644 --- a/hiero-dependency-versions/build.gradle.kts +++ b/hiero-dependency-versions/build.gradle.kts @@ -17,7 +17,7 @@ dependencies.constraints { val helidonVersion = "4.2.3" // When Upgrading pbjVersion, also need to update pbjCompiler version on // block-node/protobuf-pbj/build.gradle.kts - val pbjVersion = "0.11.13" + val pbjVersion = "0.11.14" val protobufVersion = "4.31.1" val swirldsVersion = "0.61.3" val mockitoVersion = "5.19.0" @@ -36,6 +36,9 @@ dependencies.constraints { because("com.google.protobuf.util") } api("com.google.protobuf:protoc:$protobufVersion") { because("google.proto") } + api("com.hedera.pbj:pbj-grpc-client-helidon:${pbjVersion}") { + because("com.hedera.pbj.grpc.client.helidon") + } api("com.hedera.pbj:pbj-grpc-helidon:${pbjVersion}") { because("com.hedera.pbj.grpc.helidon") } api("com.hedera.pbj:pbj-grpc-helidon-config:${pbjVersion}") { because("com.hedera.pbj.grpc.helidon.config")