From 674e89db66385a41b97bea4d5336316cf119b1e6 Mon Sep 17 00:00:00 2001 From: Alfredo Gutierrez Grajeda Date: Fri, 15 Aug 2025 13:26:23 -0600 Subject: [PATCH 1/7] Refactored the BlockNodeClient to use PBJ gRPC Client Helidon, instead of Vanilla Helidon. On the case of BlockStreamSubscribe I also refactored the custom client to use PBJ but with Unparsed response Codec. Signed-off-by: Alfredo Gutierrez Grajeda --- block-node/backfill/build.gradle.kts | 4 +- .../backfill/src/main/java/module-info.java | 1 + .../node/backfill/BackfillGrpcClient.java | 13 +- .../node/backfill/client/BlockNodeClient.java | 66 +++-- .../client/BlockNodeSubscribeClient.java | 198 --------------- .../BlockStreamSubscribeUnparsedClient.java | 238 ++++++++++++++++++ .../node/backfill/BackfillPluginTest.java | 57 +++-- block-node/protobuf-pbj/build.gradle.kts | 2 +- hiero-dependency-versions/build.gradle.kts | 5 +- 9 files changed, 339 insertions(+), 245 deletions(-) delete mode 100644 block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeSubscribeClient.java create mode 100644 block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java diff --git a/block-node/backfill/build.gradle.kts b/block-node/backfill/build.gradle.kts index 8317ba093..f75c9b9a6 100644 --- a/block-node/backfill/build.gradle.kts +++ b/block-node/backfill/build.gradle.kts @@ -1,7 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 plugins { id("org.hiero.gradle.module.library") - id("com.hedera.pbj.pbj-compiler") version "0.11.13" + id("com.hedera.pbj.pbj-compiler") version "0.11.14" } description = "Hiero Block Node Backfill Plugin" @@ -25,6 +25,8 @@ mainModuleInfo { runtimeOnly("org.apache.logging.log4j.slf4j2.impl") runtimeOnly("io.helidon.logging.jul") runtimeOnly("com.hedera.pbj.grpc.helidon.config") + runtimeOnly("com.hedera.pbj.grpc.client.helidon") + runtimeOnly("com.hedera.pbj.grpc.helidon") } testModuleInfo { diff --git a/block-node/backfill/src/main/java/module-info.java b/block-node/backfill/src/main/java/module-info.java index 055684a57..aa815f002 100644 --- a/block-node/backfill/src/main/java/module-info.java +++ b/block-node/backfill/src/main/java/module-info.java @@ -24,6 +24,7 @@ requires transitive io.grpc; requires transitive io.helidon.grpc.core; requires transitive io.helidon.webclient.grpc; + requires com.hedera.pbj.grpc.client.helidon; requires org.hiero.block.node.base; requires io.helidon.common.tls; requires io.helidon.webclient.api; diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java index bfe848472..da5cac3df 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java @@ -15,6 +15,7 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import org.hiero.block.api.ServerStatusRequest; import org.hiero.block.internal.BlockUnparsed; import org.hiero.block.node.backfill.client.BackfillSource; import org.hiero.block.node.backfill.client.BackfillSourceConfig; @@ -84,10 +85,12 @@ public BackfillGrpcClient( * @return a LongRange representing the intersection of the block range and the available blocks in the node. */ private LongRange getAvailableRangeInNode(BlockNodeClient node, LongRange blockRange) { - long firstAvailableBlock = - node.getBlockNodeServerStatusClient().getServerStatus().firstAvailableBlock(); - long lastAvailableBlock = - node.getBlockNodeServerStatusClient().getServerStatus().lastAvailableBlock(); + long firstAvailableBlock = node.getBlockNodeServiceClient() + .serverStatus(new ServerStatusRequest()) + .firstAvailableBlock(); + long lastAvailableBlock = node.getBlockNodeServiceClient() + .serverStatus(new ServerStatusRequest()) + .lastAvailableBlock(); long start = blockRange.start(); long end = blockRange.end(); @@ -148,7 +151,7 @@ public List fetchBlocks(LongRange blockRange) { // Try to fetch blocks from this node return currentNodeClient - .getBlockNodeSubscribeClient() + .getBlockstreamSubscribeUnparsedClient() .getBatchOfBlocks(actualRange.start(), actualRange.end()); } catch (Exception e) { if (attempt == maxRetries) { diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java index d09ed1abd..627636e38 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java @@ -1,37 +1,71 @@ // SPDX-License-Identifier: Apache-2.0 package org.hiero.block.node.backfill.client; +import com.hedera.pbj.grpc.client.helidon.PbjGrpcClient; +import com.hedera.pbj.grpc.client.helidon.PbjGrpcClientConfig; +import com.hedera.pbj.runtime.grpc.ServiceInterface; import io.helidon.common.tls.Tls; -import io.helidon.webclient.grpc.GrpcClient; +import io.helidon.webclient.api.WebClient; import io.helidon.webclient.grpc.GrpcClientProtocolConfig; import java.time.Duration; +import java.util.List; +import java.util.Optional; +import org.hiero.block.api.BlockNodeServiceInterface; public class BlockNodeClient { - private final BlockNodeServerStatusClient blockNodeServerStatusClient; - private final BlockNodeSubscribeClient blockNodeSubscribeClient; + // Options definition for all gRPC services in the block node client + private record Options(Optional authority, String contentType) implements ServiceInterface.RequestOptions {} + private static final BlockNodeClient.Options OPTIONS = + new BlockNodeClient.Options(Optional.empty(), ServiceInterface.RequestOptions.APPLICATION_GRPC); + // block node services + + private final BlockStreamSubscribeUnparsedClient blockStreamSubscribeUnparsedClient; + private final BlockNodeServiceInterface.BlockNodeServiceClient blockNodeServiceClient; + + /** + * Constructs a BlockNodeClient using the provided configuration. + * + * @param blockNodeConfig the configuration for the block node, including address and port + */ public BlockNodeClient(BackfillSourceConfig blockNodeConfig) { - // Initialize gRPC client with the block node configuration - GrpcClient grpcClient = GrpcClient.builder() - .tls(Tls.builder().enabled(false).build()) + final Tls tls = Tls.builder().enabled(false).build(); + final PbjGrpcClientConfig grpcConfig = + new PbjGrpcClientConfig(Duration.ofSeconds(30), tls, Optional.of(""), "application/grpc"); + + final WebClient webClient = WebClient.builder() .baseUri("http://" + blockNodeConfig.address() + ":" + blockNodeConfig.port()) - .protocolConfig(GrpcClientProtocolConfig.builder() + .tls(tls) + .protocolConfigs(List.of(GrpcClientProtocolConfig.builder() .abortPollTimeExpired(false) .pollWaitTime(Duration.ofSeconds(30)) - .build()) - .keepAlive(true) + .build())) + .connectTimeout(Duration.ofSeconds(30)) .build(); - // Initialize clients for server status and block subscription - this.blockNodeServerStatusClient = new BlockNodeServerStatusClient(grpcClient); - this.blockNodeSubscribeClient = new BlockNodeSubscribeClient(grpcClient); + + PbjGrpcClient pbjGrpcClient = new PbjGrpcClient(webClient, grpcConfig); + + // we reuse the host connection with many services. + blockNodeServiceClient = new BlockNodeServiceInterface.BlockNodeServiceClient(pbjGrpcClient, OPTIONS); + this.blockStreamSubscribeUnparsedClient = new BlockStreamSubscribeUnparsedClient(pbjGrpcClient, OPTIONS); } - public BlockNodeServerStatusClient getBlockNodeServerStatusClient() { - return blockNodeServerStatusClient; + /** + * Returns the BlockStreamSubscribeUnparsedClient for subscribing to block streams. + * + * @return the BlockStreamSubscribeUnparsedClient + */ + public BlockStreamSubscribeUnparsedClient getBlockstreamSubscribeUnparsedClient() { + return blockStreamSubscribeUnparsedClient; } - public BlockNodeSubscribeClient getBlockNodeSubscribeClient() { - return blockNodeSubscribeClient; + /** + * Returns the BlockNodeServiceClient for accessing block node services. + * + * @return the BlockNodeServiceClient + */ + public BlockNodeServiceInterface.BlockNodeServiceClient getBlockNodeServiceClient() { + return blockNodeServiceClient; } } diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeSubscribeClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeSubscribeClient.java deleted file mode 100644 index d71cb7548..000000000 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeSubscribeClient.java +++ /dev/null @@ -1,198 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -package org.hiero.block.node.backfill.client; - -import static java.util.Objects.requireNonNull; - -import com.hedera.hapi.block.stream.output.BlockHeader; -import com.hedera.pbj.runtime.Codec; -import com.hedera.pbj.runtime.ParseException; -import com.hedera.pbj.runtime.io.buffer.Bytes; -import edu.umd.cs.findbugs.annotations.NonNull; -import io.grpc.MethodDescriptor; -import io.grpc.stub.StreamObserver; -import io.helidon.grpc.core.MarshallerSupplier; -import io.helidon.webclient.grpc.GrpcClient; -import io.helidon.webclient.grpc.GrpcClientMethodDescriptor; -import io.helidon.webclient.grpc.GrpcServiceClient; -import io.helidon.webclient.grpc.GrpcServiceDescriptor; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import org.hiero.block.api.BlockStreamSubscribeServiceInterface; -import org.hiero.block.api.SubscribeStreamRequest; -import org.hiero.block.api.SubscribeStreamResponse; -import org.hiero.block.internal.BlockItemUnparsed; -import org.hiero.block.internal.BlockUnparsed; -import org.hiero.block.internal.SubscribeStreamResponseUnparsed; - -public class BlockNodeSubscribeClient implements StreamObserver { - private final GrpcServiceClient blockStreamSubscribeServiceClient; - private final String methodName = - BlockStreamSubscribeServiceInterface.BlockStreamSubscribeServiceMethod.subscribeBlockStream.name(); - // Per Request State - private List currentBlockItems; - private AtomicLong currentBlockNumber; - private AtomicReference> replyRef; - private AtomicReference errorRef; - private CountDownLatch latch; - - public BlockNodeSubscribeClient(GrpcClient grpcClient) { - // create service client for server status - this.blockStreamSubscribeServiceClient = grpcClient.serviceClient(GrpcServiceDescriptor.builder() - .serviceName(BlockStreamSubscribeServiceInterface.FULL_NAME) - .putMethod( - methodName, - GrpcClientMethodDescriptor.serverStreaming( - BlockStreamSubscribeServiceInterface.FULL_NAME, methodName) - .requestType(SubscribeStreamRequest.class) - .responseType(SubscribeStreamResponseUnparsed.class) - .marshallerSupplier(new BlockStreamSubscribeMarshaller.Supplier()) - .build()) - .build()); - } - - public List getBatchOfBlocks(long startBlockNumber, long endBlockNumber) { - // Validate input parameters - if (startBlockNumber < 0 || endBlockNumber < 0 || startBlockNumber > endBlockNumber) { - throw new IllegalArgumentException("Invalid block range: " + startBlockNumber + " to " + endBlockNumber); - } - // reset state for the request - currentBlockItems = new ArrayList<>(); - currentBlockNumber = new AtomicLong(startBlockNumber); - replyRef = new AtomicReference<>(); - errorRef = new AtomicReference<>(); - latch = new CountDownLatch(1); - // Create request - SubscribeStreamRequest request = SubscribeStreamRequest.newBuilder() - .startBlockNumber(startBlockNumber) - .endBlockNumber(endBlockNumber) - .build(); - // Call - blockStreamSubscribeServiceClient.serverStream(methodName, request, this); - - // wait for response or error - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while waiting for blocks", e); - } - if (errorRef.get() != null) { - throw new RuntimeException("Error fetching blocks", errorRef.get()); - } - return replyRef.get(); - } - - @Override - public void onNext(SubscribeStreamResponseUnparsed subscribeStreamResponse) { - if (subscribeStreamResponse.hasBlockItems()) { - List blockItems = - subscribeStreamResponse.blockItems().blockItems(); - // Check if is new Block - if (blockItems.getFirst().hasBlockHeader()) { - // verify is the expected block number - long expectedBlockNumber = currentBlockNumber.get(); - long actualBlockNumber = 0; - try { - actualBlockNumber = BlockHeader.PROTOBUF - .parse(blockItems.getFirst().blockHeaderOrThrow()) - .number(); - } catch (ParseException e) { - throw new RuntimeException(e); - } - if (actualBlockNumber != expectedBlockNumber) { - throw new IllegalStateException( - "Expected block number " + expectedBlockNumber + " but received " + actualBlockNumber); - } - // Create new Block and add to current block items - currentBlockItems = new ArrayList<>(blockItems); - } else { - // Add items to current block - currentBlockItems.addAll(blockItems); - } - - // Check if response contains block proof (end of block) - if (blockItems.getLast().hasBlockProof()) { - // Create Block from current items - BlockUnparsed block = - BlockUnparsed.newBuilder().blockItems(currentBlockItems).build(); - // Add to reply - List blocks = replyRef.get(); - if (blocks == null) { - blocks = new ArrayList<>(); - replyRef.set(blocks); - } - blocks.add(block); - // Reset current block items and number for next block - currentBlockItems = new ArrayList<>(); - currentBlockNumber.incrementAndGet(); - } - - } else if (subscribeStreamResponse.hasStatus()) { - // If response has code, set the status - SubscribeStreamResponse.Code codeStatus = subscribeStreamResponse.status(); - if (codeStatus != SubscribeStreamResponse.Code.SUCCESS) { - errorRef.set(new RuntimeException("Received error code: " + codeStatus)); - } - } else { - // If no block items and no code, this is unexpected - errorRef.set(new RuntimeException("Received unexpected response without block items or code")); - } - } - - @Override - public void onError(Throwable throwable) { - errorRef.set(throwable); - replyRef.set(null); - latch.countDown(); - } - - @Override - public void onCompleted() { - latch.countDown(); - } - - public static class BlockStreamSubscribeMarshaller implements MethodDescriptor.Marshaller { - private final Codec codec; - - public BlockStreamSubscribeMarshaller(@NonNull final Class clazz) { - requireNonNull(clazz); - - if (clazz == SubscribeStreamRequest.class) { - this.codec = (Codec) SubscribeStreamRequest.PROTOBUF; - } else if (clazz == SubscribeStreamResponseUnparsed.class) { - this.codec = (Codec) SubscribeStreamResponseUnparsed.PROTOBUF; - } else { - throw new IllegalArgumentException("Unsupported class: " + clazz.getName()); - } - } - - @Override - public InputStream stream(@NonNull final T obj) { - requireNonNull(obj); - return codec.toBytes(obj).toInputStream(); - } - - @Override - public T parse(@NonNull final InputStream inputStream) { - requireNonNull(inputStream); - - try (inputStream) { - return codec.parse(Bytes.wrap(inputStream.readAllBytes())); - } catch (final ParseException | IOException e) { - throw new RuntimeException(e); - } - } - - public static class Supplier implements MarshallerSupplier { - @Override - public MethodDescriptor.Marshaller get(@NonNull final Class clazz) { - return new BlockStreamSubscribeMarshaller<>(clazz); - } - } - } -} diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java new file mode 100644 index 000000000..5f69fa318 --- /dev/null +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java @@ -0,0 +1,238 @@ +// SPDX-License-Identifier: Apache-2.0 +package org.hiero.block.node.backfill.client; + +import static java.lang.System.Logger.Level.TRACE; +import static java.util.Objects.requireNonNull; +import static org.hiero.block.api.BlockStreamSubscribeServiceInterface.FULL_NAME; + +import com.hedera.hapi.block.stream.output.BlockHeader; +import com.hedera.pbj.runtime.Codec; +import com.hedera.pbj.runtime.ParseException; +import com.hedera.pbj.runtime.grpc.GrpcCall; +import com.hedera.pbj.runtime.grpc.GrpcClient; +import com.hedera.pbj.runtime.grpc.Pipeline; +import com.hedera.pbj.runtime.grpc.ServiceInterface; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.hiero.block.api.SubscribeStreamRequest; +import org.hiero.block.api.SubscribeStreamResponse; +import org.hiero.block.internal.BlockItemUnparsed; +import org.hiero.block.internal.BlockUnparsed; +import org.hiero.block.internal.SubscribeStreamResponseUnparsed; + +/** + * Client for subscribing to block streams using unparsed responses. + * This client handles the subscription and processes incoming block items, + * accumulating them into blocks until a complete block is received. + *

+ * Might be thought as a similar to BlockNodeServiceInterface.BlockStreamSubscribeServiceClient + * but specifically for unparsed responses and with convenience abstractions for handling closed range + * requests and responses without dealing directly with streams and pipelines. + */ +public class BlockStreamSubscribeUnparsedClient implements Pipeline { + // Logger + private static final System.Logger LOGGER = System.getLogger(BlockStreamSubscribeUnparsedClient.class.getName()); + + // from constructor + private final GrpcClient grpcClient; + private final ServiceInterface.RequestOptions requestOptions; + + // Per Request State + private List currentBlockItems; + private AtomicLong currentBlockNumber; + private AtomicReference> replyRef; + private AtomicReference errorRef; + private CountDownLatch latch; + + public BlockStreamSubscribeUnparsedClient( + @NonNull final GrpcClient grpcClient, @NonNull final ServiceInterface.RequestOptions requestOptions) { + this.grpcClient = requireNonNull(grpcClient); + this.requestOptions = requireNonNull(requestOptions); + } + + /** + * Subscribes to a batch of blocks from the block stream. + *

+ * This method sends a request to subscribe to a range of blocks specified by the start and end block numbers. + * It waits for the whole response, accumulating block items into blocks, until the end block is reached or an error occurs. + * Then it returns a list of blocks that were received during the subscription. + *

+ * meant to be used for closed ranges of blocks where start and end are both specified and used within a specific batch size + * that avoids overwhelming the client with too many blocks at once. and long wait times. + * + * @param startBlockNumber the starting block number (inclusive) + * @param endBlockNumber the ending block number (inclusive) + * @return a list of blocks received during the subscription + * @throws IllegalArgumentException if the start or end block number is invalid or endBlock is less than startBlock + */ + public List getBatchOfBlocks(long startBlockNumber, long endBlockNumber) { + // Validate input parameters + if (startBlockNumber < 0 || endBlockNumber < 0 || startBlockNumber > endBlockNumber) { + throw new IllegalArgumentException("Invalid block range: " + startBlockNumber + " to " + endBlockNumber); + } + // reset state for the request + currentBlockItems = new ArrayList<>(); + currentBlockNumber = new AtomicLong(startBlockNumber); + replyRef = new AtomicReference<>(); + errorRef = new AtomicReference<>(); + latch = new CountDownLatch(1); + // Create request + SubscribeStreamRequest request = SubscribeStreamRequest.newBuilder() + .startBlockNumber(startBlockNumber) + .endBlockNumber(endBlockNumber) + .build(); + // Call + final GrpcCall call = grpcClient.createCall( + FULL_NAME + "/subscribeBlockStream", + getSubscribeStreamRequestCodec(requestOptions), + getSubscribeStreamResponseUnparsedCodec(requestOptions), + this); + call.sendRequest(request, true); + + // wait for response or error + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for blocks", e); + } + if (errorRef.get() != null) { + throw new RuntimeException("Error fetching blocks", errorRef.get()); + } + return replyRef.get(); + } + + /** + * Does nothing on subscription. + * @param subscription a new subscription + */ + @Override + public void onSubscribe(Flow.Subscription subscription) { + LOGGER.log(TRACE, "received onSubscribe confirmation"); + // No action needed on subscription + } + + /** + * Handles incoming SubscribeStreamResponseUnparsed messages. + * Accumulates block items into blocks and manages state transitions. + *

+ * Processes the response by checking if it contains block items or status codes. + * If it contains block items, it checks if the first item is a block header, + * indicating the start of a new block. It verifies the block number and accumulates items + * into the current block. + * If the last item is a block proof, it finalizes the current block, + * adds it to the reply list, + * and resets the current block items and number for the next block. + * If the response contains a status code, + * it sets the error reference if the code indicates an error. + * If the response does not contain block items or a status code, + * it sets an error indicating an unexpected response. + * @param subscribeStreamResponse the response to process + */ + @Override + public void onNext(SubscribeStreamResponseUnparsed subscribeStreamResponse) { + if (subscribeStreamResponse.hasBlockItems()) { + List blockItems = + subscribeStreamResponse.blockItems().blockItems(); + // Check if is new Block + if (blockItems.getFirst().hasBlockHeader()) { + // verify is the expected block number + long expectedBlockNumber = currentBlockNumber.get(); + long actualBlockNumber = 0; + try { + actualBlockNumber = BlockHeader.PROTOBUF + .parse(blockItems.getFirst().blockHeaderOrThrow()) + .number(); + } catch (ParseException e) { + throw new RuntimeException(e); + } + if (actualBlockNumber != expectedBlockNumber) { + throw new IllegalStateException( + "Expected block number " + expectedBlockNumber + " but received " + actualBlockNumber); + } + // Create new Block and add to current block items + currentBlockItems = new ArrayList<>(blockItems); + } else { + // Add items to current block + currentBlockItems.addAll(blockItems); + } + + // Check if response contains block proof (end of block) + if (blockItems.getLast().hasBlockProof()) { + // Create Block from current items + BlockUnparsed block = + BlockUnparsed.newBuilder().blockItems(currentBlockItems).build(); + // Add to reply + List blocks = replyRef.get(); + if (blocks == null) { + blocks = new ArrayList<>(); + replyRef.set(blocks); + } + blocks.add(block); + // Reset current block items and number for next block + currentBlockItems = new ArrayList<>(); + currentBlockNumber.incrementAndGet(); + } + + } else if (subscribeStreamResponse.hasStatus()) { + // If response has code, set the status + SubscribeStreamResponse.Code codeStatus = subscribeStreamResponse.status(); + if (codeStatus != SubscribeStreamResponse.Code.SUCCESS) { + errorRef.set(new RuntimeException("Received error code: " + codeStatus)); + } + } else { + // If no block items and no code, this is unexpected + errorRef.set(new RuntimeException("Received unexpected response without block items or code")); + } + } + + /** + * Handles errors during the subscription. + * Sets the error reference and releases the latch to signal completion. + * @param throwable the error encountered + */ + @Override + public void onError(Throwable throwable) { + LOGGER.log(TRACE, "received onError", throwable); + errorRef.set(throwable); + replyRef.set(null); + latch.countDown(); + } + + @Override + public void onComplete() { + LOGGER.log(TRACE, "received onComplete"); + latch.countDown(); + } + + private static Codec getSubscribeStreamRequestCodec( + @NonNull final ServiceInterface.RequestOptions options) { + Objects.requireNonNull(options); + + // Default to protobuf, and don't error out if both are set: + if (options.isJson() && !options.isProtobuf()) { + return SubscribeStreamRequest.JSON; + } else { + return SubscribeStreamRequest.PROTOBUF; + } + } + + @NonNull + private static Codec getSubscribeStreamResponseUnparsedCodec( + @NonNull final ServiceInterface.RequestOptions options) { + Objects.requireNonNull(options); + + // Default to protobuf, and don't error out if both are set: + if (options.isJson() && !options.isProtobuf()) { + return SubscribeStreamResponseUnparsed.JSON; + } else { + return SubscribeStreamResponseUnparsed.PROTOBUF; + } + } +} diff --git a/block-node/backfill/src/test/java/org/hiero/block/node/backfill/BackfillPluginTest.java b/block-node/backfill/src/test/java/org/hiero/block/node/backfill/BackfillPluginTest.java index 059833166..4c31da15b 100644 --- a/block-node/backfill/src/test/java/org/hiero/block/node/backfill/BackfillPluginTest.java +++ b/block-node/backfill/src/test/java/org/hiero/block/node/backfill/BackfillPluginTest.java @@ -58,16 +58,20 @@ void testBackfillPlugin() throws InterruptedException { // Config Override Map configOverride = BackfillConfigBuilder.NewBuilder() .backfillSourcePath(blockNodeSourcesPath) - .fetchBatchSize(20) + .fetchBatchSize(100) + .initialDelay(500) // start quickly .build(); // create a historical block facility for the plugin (should have a GAP) - final HistoricalBlockFacility historicalBlockFacilityForPlugin = getHistoricalBlockFacility(10, 20); + final HistoricalBlockFacility historicalBlockFacilityForPlugin = getHistoricalBlockFacility(200, 210); // start the plugin start(new BackfillPlugin(), historicalBlockFacilityForPlugin, configOverride); - CountDownLatch countDownLatch = new CountDownLatch(10); // 0 to 9 inclusive, so 10 blocks + // expected blocks to backfill + int expectedBlocksToBackfill = 200; // from 0 to 199 inclusive, so 200 blocks + + CountDownLatch countDownLatch = new CountDownLatch(expectedBlocksToBackfill); // 0 to 9 inclusive, so 10 blocks // register the backfill handler registerDefaultTestBackfillHandler(); // register the verification handler @@ -82,11 +86,11 @@ void testBackfillPlugin() throws InterruptedException { // Verify sent verifications assertEquals( - 10, + expectedBlocksToBackfill, blockMessaging.getSentPersistedNotifications().size(), "Should have sent 11 persisted notifications"); assertEquals( - 10, + expectedBlocksToBackfill, blockMessaging.getSentVerificationNotifications().size(), "Should have sent 11 verification notifications"); @@ -126,7 +130,7 @@ void testSecondarySourceBakcfill() throws InterruptedException { Map configOverride = BackfillConfigBuilder.NewBuilder() .backfillSourcePath(backfillSourcePath) .maxRetries(2) - .initialDelayMs(600) // start quickly + .initialDelay(500) // start quickly .build(); // create a historical block facility for the plugin (should have a GAP) @@ -400,8 +404,8 @@ void testBackfillPartialAvailableSourcesForGap() throws InterruptedException { // config override Map config = BackfillConfigBuilder.NewBuilder() .backfillSourcePath(backfillSourcePath) - .initialDelayMs(100) // start quickly - .scanIntervalMs(500000) // scan every 500 seconds + .initialDelay(100) // start quickly + .scanInterval(500000) // scan every 500 seconds .build(); // create a historical block facility for the plugin (should have a GAP from 0 to 124) @@ -526,13 +530,14 @@ private static class BackfillConfigBuilder { // Fields with default valuess private String backfillSourcePath; private int fetchBatchSize = 10; - private int delayBetweenBatchesMs = 100; - private int initialDelayMs = 1000; - private int initialRetryDelayMs = 1000; + private int delayBetweenBatches = 100; + private int initialDelay = 500; + private int initialRetryDelay = 500; private int maxRetries = 3; private int scanIntervalMs = 60000; // 60 seconds private long startBlock = 0L; private long endBlock = -1L; // -1 means no end block, backfill until the latest block + private int perBlockProcessingTimeout = 500; // half second private BackfillConfigBuilder() { // private to force use of NewBuilder() @@ -552,18 +557,18 @@ public BackfillConfigBuilder fetchBatchSize(int value) { return this; } - public BackfillConfigBuilder delayBetweenBatchesMs(int value) { - this.delayBetweenBatchesMs = value; + public BackfillConfigBuilder delayBetweenBatches(int value) { + this.delayBetweenBatches = value; return this; } - public BackfillConfigBuilder initialDelayMs(int value) { - this.initialDelayMs = value; + public BackfillConfigBuilder initialDelay(int value) { + this.initialDelay = value; return this; } - public BackfillConfigBuilder initialRetryDelayMs(int value) { - this.initialRetryDelayMs = value; + public BackfillConfigBuilder initialRetryDelay(int value) { + this.initialRetryDelay = value; return this; } @@ -572,7 +577,7 @@ public BackfillConfigBuilder maxRetries(int value) { return this; } - public BackfillConfigBuilder scanIntervalMs(int value) { + public BackfillConfigBuilder scanInterval(int value) { this.scanIntervalMs = value; return this; } @@ -587,6 +592,11 @@ public BackfillConfigBuilder endBlock(long value) { return this; } + public BackfillConfigBuilder perBlockProcessingTimeout(int value) { + this.perBlockProcessingTimeout = value; + return this; + } + public Map build() { if (backfillSourcePath == null || backfillSourcePath.isBlank()) { throw new IllegalStateException("backfillSourcePath is required"); @@ -595,13 +605,14 @@ public Map build() { return Map.of( "backfill.blockNodeSourcesPath", backfillSourcePath, "backfill.fetchBatchSize", String.valueOf(fetchBatchSize), - "backfill.delayBetweenBatchesMs", String.valueOf(delayBetweenBatchesMs), - "backfill.initialDelayMs", String.valueOf(initialDelayMs), - "backfill.initialRetryDelayMs", String.valueOf(initialRetryDelayMs), + "backfill.delayBetweenBatches", String.valueOf(delayBetweenBatches), + "backfill.initialDelay", String.valueOf(initialDelay), + "backfill.initialRetryDelay", String.valueOf(initialRetryDelay), "backfill.maxRetries", String.valueOf(maxRetries), - "backfill.scanIntervalMs", String.valueOf(scanIntervalMs), + "backfill.scanInterval", String.valueOf(scanIntervalMs), "backfill.startBlock", String.valueOf(startBlock), - "backfill.endBlock", String.valueOf(endBlock)); + "backfill.endBlock", String.valueOf(endBlock), + "backfill.perBlockProcessingTimeout", String.valueOf(perBlockProcessingTimeout)); } } } diff --git a/block-node/protobuf-pbj/build.gradle.kts b/block-node/protobuf-pbj/build.gradle.kts index 98ed3a6a5..953eb2bbb 100644 --- a/block-node/protobuf-pbj/build.gradle.kts +++ b/block-node/protobuf-pbj/build.gradle.kts @@ -3,7 +3,7 @@ plugins { id("org.hiero.gradle.module.library") // When upgrading pbjVersion, also need to update pbjVersion on // hiero-dependency-versions/build.gradle.kts - id("com.hedera.pbj.pbj-compiler") version "0.11.13" + id("com.hedera.pbj.pbj-compiler") version "0.11.14" } description = "Hiero Block Node Protobuf PBJ API" diff --git a/hiero-dependency-versions/build.gradle.kts b/hiero-dependency-versions/build.gradle.kts index 84c904718..260423171 100644 --- a/hiero-dependency-versions/build.gradle.kts +++ b/hiero-dependency-versions/build.gradle.kts @@ -17,7 +17,7 @@ dependencies.constraints { val helidonVersion = "4.2.3" // When Upgrading pbjVersion, also need to update pbjCompiler version on // block-node/protobuf-pbj/build.gradle.kts - val pbjVersion = "0.11.13" + val pbjVersion = "0.11.14" val protobufVersion = "4.31.1" val swirldsVersion = "0.61.3" val mockitoVersion = "5.19.0" @@ -36,6 +36,9 @@ dependencies.constraints { because("com.google.protobuf.util") } api("com.google.protobuf:protoc:$protobufVersion") { because("google.proto") } + api("com.hedera.pbj:pbj-grpc-client-helidon:${pbjVersion}") { + because("com.hedera.pbj.grpc.client.helidon") + } api("com.hedera.pbj:pbj-grpc-helidon:${pbjVersion}") { because("com.hedera.pbj.grpc.helidon") } api("com.hedera.pbj:pbj-grpc-helidon-config:${pbjVersion}") { because("com.hedera.pbj.grpc.helidon.config") From cbb1487e7c84c64efe85b2796585862e8f2ae2ee Mon Sep 17 00:00:00 2001 From: Alfredo Gutierrez Grajeda Date: Fri, 15 Aug 2025 16:46:00 -0600 Subject: [PATCH 2/7] small but important optimization removing vestigial class that is no longer used Signed-off-by: Alfredo Gutierrez Grajeda --- .../node/backfill/BackfillGrpcClient.java | 12 +- .../client/BlockNodeServerStatusClient.java | 138 ------------------ .../BlockStreamSubscribeUnparsedClient.java | 11 +- 3 files changed, 10 insertions(+), 151 deletions(-) delete mode 100644 block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeServerStatusClient.java diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java index da5cac3df..e17f9d49d 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java @@ -16,6 +16,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.hiero.block.api.ServerStatusRequest; +import org.hiero.block.api.ServerStatusResponse; import org.hiero.block.internal.BlockUnparsed; import org.hiero.block.node.backfill.client.BackfillSource; import org.hiero.block.node.backfill.client.BackfillSourceConfig; @@ -85,12 +86,11 @@ public BackfillGrpcClient( * @return a LongRange representing the intersection of the block range and the available blocks in the node. */ private LongRange getAvailableRangeInNode(BlockNodeClient node, LongRange blockRange) { - long firstAvailableBlock = node.getBlockNodeServiceClient() - .serverStatus(new ServerStatusRequest()) - .firstAvailableBlock(); - long lastAvailableBlock = node.getBlockNodeServiceClient() - .serverStatus(new ServerStatusRequest()) - .lastAvailableBlock(); + + final ServerStatusResponse nodeStatus = + node.getBlockNodeServiceClient().serverStatus(new ServerStatusRequest()); + long firstAvailableBlock = nodeStatus.firstAvailableBlock(); + long lastAvailableBlock = nodeStatus.lastAvailableBlock(); long start = blockRange.start(); long end = blockRange.end(); diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeServerStatusClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeServerStatusClient.java deleted file mode 100644 index e53761ab1..000000000 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeServerStatusClient.java +++ /dev/null @@ -1,138 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -package org.hiero.block.node.backfill.client; - -import static java.util.Objects.requireNonNull; - -import com.hedera.pbj.runtime.Codec; -import com.hedera.pbj.runtime.ParseException; -import com.hedera.pbj.runtime.io.buffer.Bytes; -import edu.umd.cs.findbugs.annotations.NonNull; -import io.grpc.MethodDescriptor; -import io.grpc.stub.StreamObserver; -import io.helidon.grpc.core.MarshallerSupplier; -import io.helidon.webclient.grpc.GrpcClient; -import io.helidon.webclient.grpc.GrpcClientMethodDescriptor; -import io.helidon.webclient.grpc.GrpcServiceClient; -import io.helidon.webclient.grpc.GrpcServiceDescriptor; -import java.io.IOException; -import java.io.InputStream; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; -import org.hiero.block.api.BlockNodeServiceInterface; -import org.hiero.block.api.ServerStatusRequest; -import org.hiero.block.api.ServerStatusResponse; - -public class BlockNodeServerStatusClient implements StreamObserver { - private final GrpcServiceClient serverStatusServiceClient; - private final String methodName = BlockNodeServiceInterface.BlockNodeServiceMethod.serverStatus.name(); - // Per Request State - private AtomicReference replyRef; - private AtomicReference errorRef; - private CountDownLatch latch; - - public BlockNodeServerStatusClient(final GrpcClient grpcClient) { - // create service client for server status - this.serverStatusServiceClient = grpcClient.serviceClient(GrpcServiceDescriptor.builder() - .serviceName(BlockNodeServiceInterface.FULL_NAME) - .putMethod( - methodName, - GrpcClientMethodDescriptor.unary(BlockNodeServiceInterface.FULL_NAME, methodName) - .requestType(ServerStatusRequest.class) - .responseType(ServerStatusResponse.class) - .marshallerSupplier( - new BlockNodeServerStatusClient.ServerStatusRequestResponseMarshaller - .Supplier()) - .build()) - .build()); - } - - public ServerStatusResponse getServerStatus() { - // reset state for the request - replyRef = new AtomicReference<>(); - errorRef = new AtomicReference<>(); - latch = new CountDownLatch(1); - // Call - serverStatusServiceClient.unary(methodName, new ServerStatusRequest(), this); - // wait for response or error - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - // Check for error - if (errorRef.get() != null) { - if (errorRef.get() instanceof RuntimeException re) { - throw re; - } - throw new RuntimeException(errorRef.get()); - } - // Check for reply - if (replyRef.get() != null) { - return replyRef.get(); - } - // If we reach here, it means we did not receive a reply or an error - throw new RuntimeException("Call to serverStatus completed w/o receiving a reply or an error explicitly."); - } - - @Override - public void onNext(ServerStatusResponse serverStatusResponse) { - if (replyRef.get() != null) { - throw new IllegalStateException( - "serverStatus is unary, but received more than one reply. The latest reply is: " - + serverStatusResponse); - } - replyRef.set(serverStatusResponse); - latch.countDown(); - } - - @Override - public void onError(Throwable throwable) { - errorRef.set(throwable); - latch.countDown(); - } - - @Override - public void onCompleted() { - latch.countDown(); - } - - public static class ServerStatusRequestResponseMarshaller implements MethodDescriptor.Marshaller { - private final Codec codec; - - ServerStatusRequestResponseMarshaller(@NonNull final Class clazz) { - requireNonNull(clazz); - - if (clazz == ServerStatusRequest.class) { - this.codec = (Codec) ServerStatusRequest.PROTOBUF; - } else if (clazz == ServerStatusResponse.class) { - this.codec = (Codec) ServerStatusResponse.PROTOBUF; - } else { - throw new IllegalArgumentException("Unsupported class: " + clazz.getName()); - } - } - - @Override - public InputStream stream(@NonNull final T obj) { - requireNonNull(obj); - return codec.toBytes(obj).toInputStream(); - } - - @Override - public T parse(@NonNull final InputStream inputStream) { - requireNonNull(inputStream); - - try (inputStream) { - return codec.parse(Bytes.wrap(inputStream.readAllBytes())); - } catch (final ParseException | IOException e) { - throw new RuntimeException(e); - } - } - - public static class Supplier implements MarshallerSupplier { - @Override - public MethodDescriptor.Marshaller get(@NonNull final Class clazz) { - return new ServerStatusRequestResponseMarshaller<>(clazz); - } - } - } -} diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java index 5f69fa318..2099480b8 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java @@ -15,7 +15,6 @@ import edu.umd.cs.findbugs.annotations.NonNull; import java.util.ArrayList; import java.util.List; -import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicLong; @@ -129,10 +128,8 @@ public void onSubscribe(Flow.Subscription subscription) { * If the last item is a block proof, it finalizes the current block, * adds it to the reply list, * and resets the current block items and number for the next block. - * If the response contains a status code, - * it sets the error reference if the code indicates an error. - * If the response does not contain block items or a status code, - * it sets an error indicating an unexpected response. + * If the response contains a status code, different from SUCCESS, + * it sets the error reference accordingly. * @param subscribeStreamResponse the response to process */ @Override @@ -213,7 +210,7 @@ public void onComplete() { private static Codec getSubscribeStreamRequestCodec( @NonNull final ServiceInterface.RequestOptions options) { - Objects.requireNonNull(options); + requireNonNull(options); // Default to protobuf, and don't error out if both are set: if (options.isJson() && !options.isProtobuf()) { @@ -226,7 +223,7 @@ private static Codec getSubscribeStreamRequestCodec( @NonNull private static Codec getSubscribeStreamResponseUnparsedCodec( @NonNull final ServiceInterface.RequestOptions options) { - Objects.requireNonNull(options); + requireNonNull(options); // Default to protobuf, and don't error out if both are set: if (options.isJson() && !options.isProtobuf()) { From 46490dc797bdea45b71b4bb4ab5d1b3de44f7352 Mon Sep 17 00:00:00 2001 From: Alfredo Gutierrez Grajeda Date: Fri, 15 Aug 2025 20:16:39 -0600 Subject: [PATCH 3/7] enabling actual parallel backfilling of multiple flow types and fixing misleading log removing unneeded modules Signed-off-by: Alfredo Gutierrez Grajeda --- .../backfill/src/main/java/module-info.java | 5 +---- .../block/node/backfill/BackfillPlugin.java | 22 ++++++++++++++----- .../node/backfill/client/BlockNodeClient.java | 1 + 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/block-node/backfill/src/main/java/module-info.java b/block-node/backfill/src/main/java/module-info.java index aa815f002..0b0026368 100644 --- a/block-node/backfill/src/main/java/module-info.java +++ b/block-node/backfill/src/main/java/module-info.java @@ -20,14 +20,11 @@ requires transitive com.swirlds.metrics.api; requires transitive org.hiero.block.node.spi; requires transitive org.hiero.block.protobuf.pbj; - requires transitive io.grpc.stub; - requires transitive io.grpc; - requires transitive io.helidon.grpc.core; - requires transitive io.helidon.webclient.grpc; requires com.hedera.pbj.grpc.client.helidon; requires org.hiero.block.node.base; requires io.helidon.common.tls; requires io.helidon.webclient.api; + requires io.helidon.webclient.grpc; requires org.antlr.antlr4.runtime; requires static transitive com.github.spotbugs.annotations; diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillPlugin.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillPlugin.java index 77bb67b5c..fd11754a2 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillPlugin.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillPlugin.java @@ -206,7 +206,8 @@ public void start() { TRACE, "Scheduling backfill process to start in {0} milliseconds", backfillConfiguration.initialDelay()); - scheduler = Executors.newSingleThreadScheduledExecutor(); + scheduler = Executors.newScheduledThreadPool( + 2); // Two threads: one for autonomous backfill, one for on-demand backfill scheduler.scheduleAtFixedRate( this::detectGaps, backfillConfiguration.initialDelay(), @@ -331,6 +332,11 @@ private void backfillGap(LongRange gap, BackfillType backfillType) throws Interr // to avoid deadlocks, since blocks that fail verification are not persisted getLatch(backfillType).set(new CountDownLatch(batchOfBlocks.size())); + if (batchOfBlocks.isEmpty()) { + LOGGER.log(TRACE, "No blocks fetched for gap {0}, skipping", chunk); + continue; // Skip empty batches + } + // Process each fetched block for (BlockUnparsed blockUnparsed : batchOfBlocks) { long blockNumber = extractBlockNumber(blockUnparsed); @@ -347,13 +353,13 @@ private void backfillGap(LongRange gap, BackfillType backfillType) throws Interr boolean backfillFinished = getLatch(backfillType).get().await(timeout, TimeUnit.MILLISECONDS); // Check if the backfill finished successfully - if (!backfillFinished) { + if (backfillFinished) { + // just log a victory message for each chunk + LOGGER.log(TRACE, "Successfully backfilled gap {0}", chunk); + } else { LOGGER.log(TRACE, "Backfill for gap {0} did not finish in time", chunk); backfillFetchErrors.increment(); // If it didn't finish, we will retry it later but move on to next chunk - } else { - // just log a victory message for each chunk - LOGGER.log(TRACE, "Successfully backfilled gap {0}", chunk); } // Cooldown between batches @@ -361,7 +367,11 @@ private void backfillGap(LongRange gap, BackfillType backfillType) throws Interr } LOGGER.log( - TRACE, "Completed backfilling of type {0} gap from {1} to {2}", backfillType, gap.start(), gap.end()); + TRACE, + "Completed backfilling task (does not mean success or failure, just completion) of type {0} gap from {1} to {2} ", + backfillType, + gap.start(), + gap.end()); if (backfillType.equals(BackfillType.ON_DEMAND)) { onDemandBackfillStartBlock.set(-1); // Reset on-demand start block after backfill } diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java index 627636e38..32005a49a 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java @@ -15,6 +15,7 @@ public class BlockNodeClient { // Options definition for all gRPC services in the block node client private record Options(Optional authority, String contentType) implements ServiceInterface.RequestOptions {} + private static final BlockNodeClient.Options OPTIONS = new BlockNodeClient.Options(Optional.empty(), ServiceInterface.RequestOptions.APPLICATION_GRPC); From ea1921f86d0ff414a3854395b93ebfb94a48860e Mon Sep 17 00:00:00 2001 From: Alfredo Gutierrez Grajeda Date: Sat, 16 Aug 2025 13:05:34 -0600 Subject: [PATCH 4/7] once enabled the multi-thread scheduler to be able to run On-Demand and Autonomous at the same time, found out that the connection to initiate a batch request most happen synchronously, so On-Demand should only have to wait for the current batch to be fetched before starting. Signed-off-by: Alfredo Gutierrez Grajeda --- .../client/BlockStreamSubscribeUnparsedClient.java | 11 +++++++++++ .../hiero/block/node/backfill/BackfillPluginTest.java | 6 ++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java index 2099480b8..9b392ea8c 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java @@ -75,6 +75,17 @@ public List getBatchOfBlocks(long startBlockNumber, long endBlock if (startBlockNumber < 0 || endBlockNumber < 0 || startBlockNumber > endBlockNumber) { throw new IllegalArgumentException("Invalid block range: " + startBlockNumber + " to " + endBlockNumber); } + // only start a new request if previous one is not in progress + if (latch != null && latch.getCount() > 0) { + // wait for response or error + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for blocks", e); + } + } + // reset state for the request currentBlockItems = new ArrayList<>(); currentBlockNumber = new AtomicLong(startBlockNumber); diff --git a/block-node/backfill/src/test/java/org/hiero/block/node/backfill/BackfillPluginTest.java b/block-node/backfill/src/test/java/org/hiero/block/node/backfill/BackfillPluginTest.java index 4c31da15b..48614c391 100644 --- a/block-node/backfill/src/test/java/org/hiero/block/node/backfill/BackfillPluginTest.java +++ b/block-node/backfill/src/test/java/org/hiero/block/node/backfill/BackfillPluginTest.java @@ -293,6 +293,8 @@ void testBackfillOnDemandWhileAutonomousBackfillRunning() throws InterruptedExce // config override for test final Map configOverride = BackfillConfigBuilder.NewBuilder() .backfillSourcePath(backfillSourcePath) + .initialDelay(100) // start quickly + .scanInterval(500000) // scan every 500 seconds .build(); // create a historical block facility for the plugin (should have a GAP) @@ -335,8 +337,8 @@ public void handleVerification(VerificationNotification notification) { false, "test-backfill-handler"); - boolean startOnDemand = latch1.await(1, TimeUnit.MINUTES); // Wait until latch1.countDown() is called - assertTrue(startOnDemand, "Should have started on-demand backfill while autonomous backfill is running"); + boolean startAutonomous = latch1.await(1, TimeUnit.MINUTES); // Wait until latch1.countDown() is called + assertTrue(startAutonomous, "Should have started on-demand backfill while autonomous backfill is running"); // Trigger the on-demand backfill by sending a NewestBlockKnownToNetworkNotification NewestBlockKnownToNetworkNotification newestBlockNotification = new NewestBlockKnownToNetworkNotification(200L); this.blockMessaging.sendNewestBlockKnownToNetwork(newestBlockNotification); From 123a494bdb9ad4378d2f1bb01c2c167462525e67 Mon Sep 17 00:00:00 2001 From: Alfredo Gutierrez Grajeda Date: Mon, 18 Aug 2025 12:18:55 -0600 Subject: [PATCH 5/7] PR Review Feedback Signed-off-by: Alfredo Gutierrez Grajeda --- .../node/backfill/BackfillConfiguration.java | 4 +- .../node/backfill/BackfillGrpcClient.java | 13 +- .../block/node/backfill/BackfillPlugin.java | 5 +- .../node/backfill/client/BlockNodeClient.java | 11 +- .../BlockStreamSubscribeUnparsedClient.java | 291 ++++++++---------- 5 files changed, 158 insertions(+), 166 deletions(-) diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillConfiguration.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillConfiguration.java index a4f55c07f..7bc6a6d1a 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillConfiguration.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillConfiguration.java @@ -21,6 +21,7 @@ * @param initialDelay Initial delay in seconds before starting the backfill process, to give time for the system to stabilize * @param perBlockProcessingTimeout Timeout in milliseconds for processing each block, to avoid blocking the backfill * process indefinitely in case something unexpected happens, this would allow for self-recovery + * @param grpcOverallTimeout single timeout configuration for gRPC Client construction, connectTimeout, readTimeout and pollWaitTime */ @ConfigData("backfill") public record BackfillConfiguration( @@ -33,4 +34,5 @@ public record BackfillConfiguration( @Loggable @ConfigProperty(defaultValue = "25") @Min(1) @Max(10_000) int fetchBatchSize, @Loggable @ConfigProperty(defaultValue = "1000") @Min(100) int delayBetweenBatches, @Loggable @ConfigProperty(defaultValue = "15000") @Min(5) int initialDelay, - @Loggable @ConfigProperty(defaultValue = "1000") @Min(500) int perBlockProcessingTimeout) {} + @Loggable @ConfigProperty(defaultValue = "1000") @Min(500) int perBlockProcessingTimeout, + @Loggable @ConfigProperty(defaultValue = "30000") @Min(10000) int grpcOverallTimeout) {} diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java index e17f9d49d..26a94ae02 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java @@ -53,6 +53,9 @@ public class BackfillGrpcClient { */ private final int initialRetryDelayMs; + /** Connection timeout in milliseconds for gRPC calls to block nodes. */ + private final int connectionTimeoutSeconds; + /** Current status of the Block Node Clients */ private ConcurrentHashMap nodeStatusMap = new ConcurrentHashMap<>(); /** @@ -67,12 +70,17 @@ public class BackfillGrpcClient { * @param blockNodePreferenceFilePath the path to the block node preference file */ public BackfillGrpcClient( - Path blockNodePreferenceFilePath, int maxRetries, Counter backfillRetriesCounter, int retryInitialDelayMs) + Path blockNodePreferenceFilePath, + int maxRetries, + Counter backfillRetriesCounter, + int retryInitialDelayMs, + int connectionTimeoutSeconds) throws IOException, ParseException { this.blockNodeSource = BackfillSource.JSON.parse(Bytes.wrap(Files.readAllBytes(blockNodePreferenceFilePath))); this.maxRetries = maxRetries; this.initialRetryDelayMs = retryInitialDelayMs; this.backfillRetries = backfillRetriesCounter; + this.connectionTimeoutSeconds = connectionTimeoutSeconds; for (BackfillSourceConfig node : blockNodeSource.nodes()) { LOGGER.log(INFO, "Address: {0}, Port: {1}, Priority: {2}", node.address(), node.port(), node.priority()); @@ -198,7 +206,8 @@ public List fetchBlocks(LongRange blockRange) { * @return a BlockNodeClient for the specified node */ private BlockNodeClient getNodeClient(BackfillSourceConfig node) { - return nodeClientMap.computeIfAbsent(node, BlockNodeClient::new); + return nodeClientMap.computeIfAbsent( + node, BlockNodeClient -> new BlockNodeClient(node, connectionTimeoutSeconds)); } /** diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillPlugin.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillPlugin.java index fd11754a2..668503288 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillPlugin.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillPlugin.java @@ -180,7 +180,8 @@ public void init(BlockNodeContext context, ServiceBuilder serviceBuilder) { blockNodeSourcesPath, backfillConfiguration.maxRetries(), this.backfillRetries, - backfillConfiguration.initialRetryDelay()); + backfillConfiguration.initialRetryDelay(), + backfillConfiguration.grpcOverallTimeout()); LOGGER.log(TRACE, "Initialized gRPC client with sources path: {0}", blockNodeSourcesPath); } catch (Exception e) { LOGGER.log(INFO, "Failed to initialize gRPC client: {0}", e.getMessage()); @@ -368,7 +369,7 @@ private void backfillGap(LongRange gap, BackfillType backfillType) throws Interr LOGGER.log( TRACE, - "Completed backfilling task (does not mean success or failure, just completion) of type {0} gap from {1} to {2} ", + "Completed backfilling task (completion only) of type {0} gap from {1} to {2} ", backfillType, gap.start(), gap.end()); diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java index 32005a49a..1ccb93620 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java @@ -20,7 +20,6 @@ private record Options(Optional authority, String contentType) implement new BlockNodeClient.Options(Optional.empty(), ServiceInterface.RequestOptions.APPLICATION_GRPC); // block node services - private final BlockStreamSubscribeUnparsedClient blockStreamSubscribeUnparsedClient; private final BlockNodeServiceInterface.BlockNodeServiceClient blockNodeServiceClient; @@ -29,20 +28,22 @@ private record Options(Optional authority, String contentType) implement * * @param blockNodeConfig the configuration for the block node, including address and port */ - public BlockNodeClient(BackfillSourceConfig blockNodeConfig) { + public BlockNodeClient(BackfillSourceConfig blockNodeConfig, int timeoutMs) { + + final Duration timeoutDuration = Duration.ofMillis(timeoutMs); final Tls tls = Tls.builder().enabled(false).build(); final PbjGrpcClientConfig grpcConfig = - new PbjGrpcClientConfig(Duration.ofSeconds(30), tls, Optional.of(""), "application/grpc"); + new PbjGrpcClientConfig(timeoutDuration, tls, Optional.of(""), "application/grpc"); final WebClient webClient = WebClient.builder() .baseUri("http://" + blockNodeConfig.address() + ":" + blockNodeConfig.port()) .tls(tls) .protocolConfigs(List.of(GrpcClientProtocolConfig.builder() .abortPollTimeExpired(false) - .pollWaitTime(Duration.ofSeconds(30)) + .pollWaitTime(timeoutDuration) .build())) - .connectTimeout(Duration.ofSeconds(30)) + .connectTimeout(timeoutDuration) .build(); PbjGrpcClient pbjGrpcClient = new PbjGrpcClient(webClient, grpcConfig); diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java index 9b392ea8c..8596d33fc 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java @@ -8,6 +8,7 @@ import com.hedera.hapi.block.stream.output.BlockHeader; import com.hedera.pbj.runtime.Codec; import com.hedera.pbj.runtime.ParseException; +import com.hedera.pbj.runtime.UncheckedParseException; import com.hedera.pbj.runtime.grpc.GrpcCall; import com.hedera.pbj.runtime.grpc.GrpcClient; import com.hedera.pbj.runtime.grpc.Pipeline; @@ -17,8 +18,6 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Flow; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import org.hiero.block.api.SubscribeStreamRequest; import org.hiero.block.api.SubscribeStreamResponse; import org.hiero.block.internal.BlockItemUnparsed; @@ -27,28 +26,26 @@ /** * Client for subscribing to block streams using unparsed responses. - * This client handles the subscription and processes incoming block items, - * accumulating them into blocks until a complete block is received. *

- * Might be thought as a similar to BlockNodeServiceInterface.BlockStreamSubscribeServiceClient - * but specifically for unparsed responses and with convenience abstractions for handling closed range - * requests and responses without dealing directly with streams and pipelines. + * This implementation is request-isolated: + * it does not keep per-request mutable state on {@code this}. Each call to + * {@link #getBatchOfBlocks(long, long)} creates a per-request {@code RequestContext} + * and a per-request {@link Pipeline} that closes over that context. + * This design prevents late callbacks from one request from mutating the state of another + * and avoids the need for per-field atomic types. + * + *

Thread-safety: The instance is stateless across requests. You may invoke + * {@code getBatchOfBlocks} concurrently from multiple threads; each invocation uses + * its own context and pipeline. */ -public class BlockStreamSubscribeUnparsedClient implements Pipeline { - // Logger +public class BlockStreamSubscribeUnparsedClient { + private static final System.Logger LOGGER = System.getLogger(BlockStreamSubscribeUnparsedClient.class.getName()); - // from constructor + // From constructor private final GrpcClient grpcClient; private final ServiceInterface.RequestOptions requestOptions; - // Per Request State - private List currentBlockItems; - private AtomicLong currentBlockNumber; - private AtomicReference> replyRef; - private AtomicReference errorRef; - private CountDownLatch latch; - public BlockStreamSubscribeUnparsedClient( @NonNull final GrpcClient grpcClient, @NonNull final ServiceInterface.RequestOptions requestOptions) { this.grpcClient = requireNonNull(grpcClient); @@ -56,173 +53,118 @@ public BlockStreamSubscribeUnparsedClient( } /** - * Subscribes to a batch of blocks from the block stream. - *

- * This method sends a request to subscribe to a range of blocks specified by the start and end block numbers. - * It waits for the whole response, accumulating block items into blocks, until the end block is reached or an error occurs. - * Then it returns a list of blocks that were received during the subscription. - *

- * meant to be used for closed ranges of blocks where start and end are both specified and used within a specific batch size - * that avoids overwhelming the client with too many blocks at once. and long wait times. + * Subscribes to a closed range of blocks and returns them as a list once the stream completes. * - * @param startBlockNumber the starting block number (inclusive) - * @param endBlockNumber the ending block number (inclusive) - * @return a list of blocks received during the subscription - * @throws IllegalArgumentException if the start or end block number is invalid or endBlock is less than startBlock + * @param startBlockNumber inclusive start + * @param endBlockNumber inclusive end + * @return list of received blocks (never {@code null}) + * @throws IllegalArgumentException on invalid range + * @throws RuntimeException on stream error or interruption */ public List getBatchOfBlocks(long startBlockNumber, long endBlockNumber) { // Validate input parameters if (startBlockNumber < 0 || endBlockNumber < 0 || startBlockNumber > endBlockNumber) { throw new IllegalArgumentException("Invalid block range: " + startBlockNumber + " to " + endBlockNumber); } - // only start a new request if previous one is not in progress - if (latch != null && latch.getCount() > 0) { - // wait for response or error - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while waiting for blocks", e); - } - } - // reset state for the request - currentBlockItems = new ArrayList<>(); - currentBlockNumber = new AtomicLong(startBlockNumber); - replyRef = new AtomicReference<>(); - errorRef = new AtomicReference<>(); - latch = new CountDownLatch(1); + // Build per-request context + final RequestContext ctx = new RequestContext(startBlockNumber); + // Create request - SubscribeStreamRequest request = SubscribeStreamRequest.newBuilder() + final SubscribeStreamRequest request = SubscribeStreamRequest.newBuilder() .startBlockNumber(startBlockNumber) .endBlockNumber(endBlockNumber) .build(); - // Call - final GrpcCall call = grpcClient.createCall( - FULL_NAME + "/subscribeBlockStream", - getSubscribeStreamRequestCodec(requestOptions), - getSubscribeStreamResponseUnparsedCodec(requestOptions), - this); - call.sendRequest(request, true); - - // wait for response or error - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while waiting for blocks", e); - } - if (errorRef.get() != null) { - throw new RuntimeException("Error fetching blocks", errorRef.get()); - } - return replyRef.get(); - } - /** - * Does nothing on subscription. - * @param subscription a new subscription - */ - @Override - public void onSubscribe(Flow.Subscription subscription) { - LOGGER.log(TRACE, "received onSubscribe confirmation"); - // No action needed on subscription - } + // Create a per-request pipeline that closes over `ctx` + final Pipeline pipeline = new Pipeline<>() { + @Override + public void onSubscribe(Flow.Subscription subscription) { + LOGGER.log(TRACE, "received onSubscribe confirmation"); + // No backpressure negotiation needed for this pattern. + } - /** - * Handles incoming SubscribeStreamResponseUnparsed messages. - * Accumulates block items into blocks and manages state transitions. - *

- * Processes the response by checking if it contains block items or status codes. - * If it contains block items, it checks if the first item is a block header, - * indicating the start of a new block. It verifies the block number and accumulates items - * into the current block. - * If the last item is a block proof, it finalizes the current block, - * adds it to the reply list, - * and resets the current block items and number for the next block. - * If the response contains a status code, different from SUCCESS, - * it sets the error reference accordingly. - * @param subscribeStreamResponse the response to process - */ - @Override - public void onNext(SubscribeStreamResponseUnparsed subscribeStreamResponse) { - if (subscribeStreamResponse.hasBlockItems()) { - List blockItems = - subscribeStreamResponse.blockItems().blockItems(); - // Check if is new Block - if (blockItems.getFirst().hasBlockHeader()) { - // verify is the expected block number - long expectedBlockNumber = currentBlockNumber.get(); - long actualBlockNumber = 0; - try { - actualBlockNumber = BlockHeader.PROTOBUF - .parse(blockItems.getFirst().blockHeaderOrThrow()) - .number(); - } catch (ParseException e) { - throw new RuntimeException(e); + @Override + public void onNext(SubscribeStreamResponseUnparsed subscribeStreamResponse) { + if (subscribeStreamResponse.hasBlockItems()) { + final List blockItems = + subscribeStreamResponse.blockItems().blockItems(); + + // Start of a new block + if (blockItems.getFirst().hasBlockHeader()) { + final long expected = ctx.expectedBlockNumber; + final long actual = extractBlockNumberFromBlockHeader(blockItems.getFirst()); + if (actual != expected) { + ctx.fail(new IllegalStateException( + "Expected block number " + expected + " but received " + actual)); + return; + } + // Begin a new block with the header and following items in this frame + ctx.currentBlockItems = new ArrayList<>(blockItems); + } else { + // Continuation of the current block + ctx.currentBlockItems.addAll(blockItems); + } + + // End of block + if (blockItems.getLast().hasBlockProof()) { + ctx.blocks.add(BlockUnparsed.newBuilder() + .blockItems(ctx.currentBlockItems) + .build()); + ctx.currentBlockItems = new ArrayList<>(); + ctx.expectedBlockNumber++; + } + + } else if (subscribeStreamResponse.hasStatus()) { + final SubscribeStreamResponse.Code code = subscribeStreamResponse.status(); + if (code != SubscribeStreamResponse.Code.SUCCESS) { + ctx.fail(new RuntimeException("Received error code: " + code)); + } + } else { + ctx.fail(new RuntimeException("Received unexpected response without block items or code")); } - if (actualBlockNumber != expectedBlockNumber) { - throw new IllegalStateException( - "Expected block number " + expectedBlockNumber + " but received " + actualBlockNumber); - } - // Create new Block and add to current block items - currentBlockItems = new ArrayList<>(blockItems); - } else { - // Add items to current block - currentBlockItems.addAll(blockItems); } - // Check if response contains block proof (end of block) - if (blockItems.getLast().hasBlockProof()) { - // Create Block from current items - BlockUnparsed block = - BlockUnparsed.newBuilder().blockItems(currentBlockItems).build(); - // Add to reply - List blocks = replyRef.get(); - if (blocks == null) { - blocks = new ArrayList<>(); - replyRef.set(blocks); - } - blocks.add(block); - // Reset current block items and number for next block - currentBlockItems = new ArrayList<>(); - currentBlockNumber.incrementAndGet(); + @Override + public void onError(Throwable throwable) { + LOGGER.log(TRACE, "received onError", throwable); + ctx.fail(throwable); } - } else if (subscribeStreamResponse.hasStatus()) { - // If response has code, set the status - SubscribeStreamResponse.Code codeStatus = subscribeStreamResponse.status(); - if (codeStatus != SubscribeStreamResponse.Code.SUCCESS) { - errorRef.set(new RuntimeException("Received error code: " + codeStatus)); + @Override + public void onComplete() { + LOGGER.log(TRACE, "received onComplete"); + ctx.complete(); } - } else { - // If no block items and no code, this is unexpected - errorRef.set(new RuntimeException("Received unexpected response without block items or code")); - } + }; + + // Issue the call using the per-request pipeline + final GrpcCall call = grpcClient.createCall( + FULL_NAME + "/subscribeBlockStream", + getSubscribeStreamRequestCodec(requestOptions), + getSubscribeStreamResponseUnparsedCodec(requestOptions), + pipeline); + + call.sendRequest(request, true); + + // Wait for completion or error and return the blocks + return ctx.await(); } /** - * Handles errors during the subscription. - * Sets the error reference and releases the latch to signal completion. - * @param throwable the error encountered + * Extracts the block number from a block header item. */ - @Override - public void onError(Throwable throwable) { - LOGGER.log(TRACE, "received onError", throwable); - errorRef.set(throwable); - replyRef.set(null); - latch.countDown(); - } - - @Override - public void onComplete() { - LOGGER.log(TRACE, "received onComplete"); - latch.countDown(); + private static long extractBlockNumberFromBlockHeader(BlockItemUnparsed itemUnparsed) { + try { + return BlockHeader.PROTOBUF.parse(itemUnparsed.blockHeaderOrThrow()).number(); + } catch (ParseException e) { + throw new UncheckedParseException(e); + } } private static Codec getSubscribeStreamRequestCodec( @NonNull final ServiceInterface.RequestOptions options) { requireNonNull(options); - // Default to protobuf, and don't error out if both are set: if (options.isJson() && !options.isProtobuf()) { return SubscribeStreamRequest.JSON; @@ -235,7 +177,6 @@ private static Codec getSubscribeStreamRequestCodec( private static Codec getSubscribeStreamResponseUnparsedCodec( @NonNull final ServiceInterface.RequestOptions options) { requireNonNull(options); - // Default to protobuf, and don't error out if both are set: if (options.isJson() && !options.isProtobuf()) { return SubscribeStreamResponseUnparsed.JSON; @@ -243,4 +184,42 @@ private static Codec getSubscribeStreamResponse return SubscribeStreamResponseUnparsed.PROTOBUF; } } + + /** + * Per-request state holder. All fields are confined to a single request. + * The {@link CountDownLatch} establishes happens-before from callback threads to the waiter. + */ + private static final class RequestContext { + final CountDownLatch done = new CountDownLatch(1); + final List blocks = new ArrayList<>(); + long expectedBlockNumber; + List currentBlockItems = new ArrayList<>(); + Throwable error; + + RequestContext(long startBlock) { + this.expectedBlockNumber = startBlock; + } + + void fail(Throwable t) { + this.error = t; + done.countDown(); + } + + void complete() { + done.countDown(); + } + + List await() { + try { + done.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for blocks", e); + } + if (error != null) { + throw new RuntimeException("Error fetching blocks", error); + } + return blocks; + } + } } From 3757c75fc913a97373c9dca1b0054b63cf4663ef Mon Sep 17 00:00:00 2001 From: Alfredo Gutierrez Grajeda Date: Tue, 19 Aug 2025 10:56:00 -0600 Subject: [PATCH 6/7] Optimization improvements over the PR Review process Signed-off-by: Alfredo Gutierrez Grajeda --- .../node/backfill/BackfillConfiguration.java | 4 +- .../node/backfill/BackfillGrpcClient.java | 10 +- .../block/node/backfill/BackfillPlugin.java | 6 +- .../node/backfill/client/BlockNodeClient.java | 7 +- .../BlockStreamSubscribeUnparsedClient.java | 177 ++++++++---------- 5 files changed, 98 insertions(+), 106 deletions(-) diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillConfiguration.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillConfiguration.java index 7bc6a6d1a..847549e8a 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillConfiguration.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillConfiguration.java @@ -22,6 +22,7 @@ * @param perBlockProcessingTimeout Timeout in milliseconds for processing each block, to avoid blocking the backfill * process indefinitely in case something unexpected happens, this would allow for self-recovery * @param grpcOverallTimeout single timeout configuration for gRPC Client construction, connectTimeout, readTimeout and pollWaitTime + * @param enableTLS if enabled will assume block-node client supports tls connection. */ @ConfigData("backfill") public record BackfillConfiguration( @@ -35,4 +36,5 @@ public record BackfillConfiguration( @Loggable @ConfigProperty(defaultValue = "1000") @Min(100) int delayBetweenBatches, @Loggable @ConfigProperty(defaultValue = "15000") @Min(5) int initialDelay, @Loggable @ConfigProperty(defaultValue = "1000") @Min(500) int perBlockProcessingTimeout, - @Loggable @ConfigProperty(defaultValue = "30000") @Min(10000) int grpcOverallTimeout) {} + @Loggable @ConfigProperty(defaultValue = "30000") @Min(10000) int grpcOverallTimeout, + @Loggable @ConfigProperty(defaultValue = "false") boolean enableTLS) {} diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java index 26a94ae02..27f18c8e8 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillGrpcClient.java @@ -52,10 +52,10 @@ public class BackfillGrpcClient { * This is used for exponential backoff in case of failures. */ private final int initialRetryDelayMs; - /** Connection timeout in milliseconds for gRPC calls to block nodes. */ private final int connectionTimeoutSeconds; - + /** Enable TLS for secure connections to block nodes. */ + private final boolean enableTls; /** Current status of the Block Node Clients */ private ConcurrentHashMap nodeStatusMap = new ConcurrentHashMap<>(); /** @@ -74,13 +74,15 @@ public BackfillGrpcClient( int maxRetries, Counter backfillRetriesCounter, int retryInitialDelayMs, - int connectionTimeoutSeconds) + int connectionTimeoutSeconds, + boolean enableTls) throws IOException, ParseException { this.blockNodeSource = BackfillSource.JSON.parse(Bytes.wrap(Files.readAllBytes(blockNodePreferenceFilePath))); this.maxRetries = maxRetries; this.initialRetryDelayMs = retryInitialDelayMs; this.backfillRetries = backfillRetriesCounter; this.connectionTimeoutSeconds = connectionTimeoutSeconds; + this.enableTls = enableTls; for (BackfillSourceConfig node : blockNodeSource.nodes()) { LOGGER.log(INFO, "Address: {0}, Port: {1}, Priority: {2}", node.address(), node.port(), node.priority()); @@ -207,7 +209,7 @@ public List fetchBlocks(LongRange blockRange) { */ private BlockNodeClient getNodeClient(BackfillSourceConfig node) { return nodeClientMap.computeIfAbsent( - node, BlockNodeClient -> new BlockNodeClient(node, connectionTimeoutSeconds)); + node, BlockNodeClient -> new BlockNodeClient(node, connectionTimeoutSeconds, enableTls)); } /** diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillPlugin.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillPlugin.java index 668503288..fda366b00 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillPlugin.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/BackfillPlugin.java @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 package org.hiero.block.node.backfill; +import static java.lang.System.Logger.Level.DEBUG; import static java.lang.System.Logger.Level.INFO; import static java.lang.System.Logger.Level.TRACE; @@ -181,7 +182,8 @@ public void init(BlockNodeContext context, ServiceBuilder serviceBuilder) { backfillConfiguration.maxRetries(), this.backfillRetries, backfillConfiguration.initialRetryDelay(), - backfillConfiguration.grpcOverallTimeout()); + backfillConfiguration.grpcOverallTimeout(), + backfillConfiguration.enableTLS()); LOGGER.log(TRACE, "Initialized gRPC client with sources path: {0}", blockNodeSourcesPath); } catch (Exception e) { LOGGER.log(INFO, "Failed to initialize gRPC client: {0}", e.getMessage()); @@ -334,7 +336,7 @@ private void backfillGap(LongRange gap, BackfillType backfillType) throws Interr getLatch(backfillType).set(new CountDownLatch(batchOfBlocks.size())); if (batchOfBlocks.isEmpty()) { - LOGGER.log(TRACE, "No blocks fetched for gap {0}, skipping", chunk); + LOGGER.log(DEBUG, "No blocks fetched for gap {0}, skipping", chunk); continue; // Skip empty batches } diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java index 1ccb93620..c22cc9eb2 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java @@ -14,7 +14,8 @@ public class BlockNodeClient { // Options definition for all gRPC services in the block node client - private record Options(Optional authority, String contentType) implements ServiceInterface.RequestOptions {} + private static record Options(Optional authority, String contentType) + implements ServiceInterface.RequestOptions {} private static final BlockNodeClient.Options OPTIONS = new BlockNodeClient.Options(Optional.empty(), ServiceInterface.RequestOptions.APPLICATION_GRPC); @@ -28,11 +29,11 @@ private record Options(Optional authority, String contentType) implement * * @param blockNodeConfig the configuration for the block node, including address and port */ - public BlockNodeClient(BackfillSourceConfig blockNodeConfig, int timeoutMs) { + public BlockNodeClient(BackfillSourceConfig blockNodeConfig, int timeoutMs, boolean enableTls) { final Duration timeoutDuration = Duration.ofMillis(timeoutMs); - final Tls tls = Tls.builder().enabled(false).build(); + final Tls tls = Tls.builder().enabled(enableTls).build(); final PbjGrpcClientConfig grpcConfig = new PbjGrpcClientConfig(timeoutDuration, tls, Optional.of(""), "application/grpc"); diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java index 8596d33fc..bad7a0c7e 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java @@ -1,14 +1,13 @@ // SPDX-License-Identifier: Apache-2.0 package org.hiero.block.node.backfill.client; +import static java.lang.System.Logger.Level.DEBUG; import static java.lang.System.Logger.Level.TRACE; import static java.util.Objects.requireNonNull; import static org.hiero.block.api.BlockStreamSubscribeServiceInterface.FULL_NAME; import com.hedera.hapi.block.stream.output.BlockHeader; -import com.hedera.pbj.runtime.Codec; import com.hedera.pbj.runtime.ParseException; -import com.hedera.pbj.runtime.UncheckedParseException; import com.hedera.pbj.runtime.grpc.GrpcCall; import com.hedera.pbj.runtime.grpc.GrpcClient; import com.hedera.pbj.runtime.grpc.Pipeline; @@ -44,12 +43,11 @@ public class BlockStreamSubscribeUnparsedClient { // From constructor private final GrpcClient grpcClient; - private final ServiceInterface.RequestOptions requestOptions; public BlockStreamSubscribeUnparsedClient( @NonNull final GrpcClient grpcClient, @NonNull final ServiceInterface.RequestOptions requestOptions) { this.grpcClient = requireNonNull(grpcClient); - this.requestOptions = requireNonNull(requestOptions); + ServiceInterface.RequestOptions requestOptions1 = requireNonNull(requestOptions); } /** @@ -77,72 +75,13 @@ public List getBatchOfBlocks(long startBlockNumber, long endBlock .build(); // Create a per-request pipeline that closes over `ctx` - final Pipeline pipeline = new Pipeline<>() { - @Override - public void onSubscribe(Flow.Subscription subscription) { - LOGGER.log(TRACE, "received onSubscribe confirmation"); - // No backpressure negotiation needed for this pattern. - } - - @Override - public void onNext(SubscribeStreamResponseUnparsed subscribeStreamResponse) { - if (subscribeStreamResponse.hasBlockItems()) { - final List blockItems = - subscribeStreamResponse.blockItems().blockItems(); - - // Start of a new block - if (blockItems.getFirst().hasBlockHeader()) { - final long expected = ctx.expectedBlockNumber; - final long actual = extractBlockNumberFromBlockHeader(blockItems.getFirst()); - if (actual != expected) { - ctx.fail(new IllegalStateException( - "Expected block number " + expected + " but received " + actual)); - return; - } - // Begin a new block with the header and following items in this frame - ctx.currentBlockItems = new ArrayList<>(blockItems); - } else { - // Continuation of the current block - ctx.currentBlockItems.addAll(blockItems); - } - - // End of block - if (blockItems.getLast().hasBlockProof()) { - ctx.blocks.add(BlockUnparsed.newBuilder() - .blockItems(ctx.currentBlockItems) - .build()); - ctx.currentBlockItems = new ArrayList<>(); - ctx.expectedBlockNumber++; - } - - } else if (subscribeStreamResponse.hasStatus()) { - final SubscribeStreamResponse.Code code = subscribeStreamResponse.status(); - if (code != SubscribeStreamResponse.Code.SUCCESS) { - ctx.fail(new RuntimeException("Received error code: " + code)); - } - } else { - ctx.fail(new RuntimeException("Received unexpected response without block items or code")); - } - } - - @Override - public void onError(Throwable throwable) { - LOGGER.log(TRACE, "received onError", throwable); - ctx.fail(throwable); - } - - @Override - public void onComplete() { - LOGGER.log(TRACE, "received onComplete"); - ctx.complete(); - } - }; + final Pipeline pipeline = new SubscribePipeline(ctx); // Issue the call using the per-request pipeline final GrpcCall call = grpcClient.createCall( FULL_NAME + "/subscribeBlockStream", - getSubscribeStreamRequestCodec(requestOptions), - getSubscribeStreamResponseUnparsedCodec(requestOptions), + SubscribeStreamRequest.PROTOBUF, + SubscribeStreamResponseUnparsed.PROTOBUF, pipeline); call.sendRequest(request, true); @@ -154,35 +93,8 @@ public void onComplete() { /** * Extracts the block number from a block header item. */ - private static long extractBlockNumberFromBlockHeader(BlockItemUnparsed itemUnparsed) { - try { - return BlockHeader.PROTOBUF.parse(itemUnparsed.blockHeaderOrThrow()).number(); - } catch (ParseException e) { - throw new UncheckedParseException(e); - } - } - - private static Codec getSubscribeStreamRequestCodec( - @NonNull final ServiceInterface.RequestOptions options) { - requireNonNull(options); - // Default to protobuf, and don't error out if both are set: - if (options.isJson() && !options.isProtobuf()) { - return SubscribeStreamRequest.JSON; - } else { - return SubscribeStreamRequest.PROTOBUF; - } - } - - @NonNull - private static Codec getSubscribeStreamResponseUnparsedCodec( - @NonNull final ServiceInterface.RequestOptions options) { - requireNonNull(options); - // Default to protobuf, and don't error out if both are set: - if (options.isJson() && !options.isProtobuf()) { - return SubscribeStreamResponseUnparsed.JSON; - } else { - return SubscribeStreamResponseUnparsed.PROTOBUF; - } + private static long extractBlockNumberFromBlockHeader(BlockItemUnparsed itemUnparsed) throws ParseException { + return BlockHeader.PROTOBUF.parse(itemUnparsed.blockHeaderOrThrow()).number(); } /** @@ -214,7 +126,6 @@ List await() { done.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while waiting for blocks", e); } if (error != null) { throw new RuntimeException("Error fetching blocks", error); @@ -222,4 +133,78 @@ List await() { return blocks; } } + + private static final class SubscribePipeline implements Pipeline { + private final RequestContext ctx; + + SubscribePipeline(RequestContext ctx) { + this.ctx = ctx; + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + LOGGER.log(TRACE, "received onSubscribe confirmation"); + // No backpressure negotiation needed for this pattern. + } + + @Override + public void onNext(SubscribeStreamResponseUnparsed resp) { + try { + if (resp.hasBlockItems()) { + final List frame = resp.blockItems().blockItems(); + + if (frame.getFirst().hasBlockHeader()) { + final long expected = ctx.expectedBlockNumber; + final long actual = extractBlockNumberFromBlockHeader(frame.getFirst()); + if (actual != expected) { + ctx.fail(new IllegalStateException( + "Expected block number " + expected + " but received " + actual)); + return; + } + // Start a new block: reuse the buffer and populate it. + ctx.currentBlockItems.clear(); + ctx.currentBlockItems.addAll(frame); + } else { + // Continuation: append to the same buffer. + ctx.currentBlockItems.addAll(frame); + } + + if (frame.getLast().hasBlockProof()) { + // Snapshot the current items to avoid retaining the large buffer in the finished block. + final List snapshot = List.copyOf(ctx.currentBlockItems); + ctx.blocks.add( + BlockUnparsed.newBuilder().blockItems(snapshot).build()); + ctx.currentBlockItems.clear(); + ctx.expectedBlockNumber++; + } + + } else if (resp.hasStatus()) { + final SubscribeStreamResponse.Code code = resp.status(); + if (code != SubscribeStreamResponse.Code.SUCCESS) { + ctx.fail(new RuntimeException("Received error code: " + code)); + } + } else { + ctx.fail(new RuntimeException("Received unexpected response without block items or code")); + } + } catch (ParseException e) { + LOGGER.log(DEBUG, "Parse error in block item", e); + ctx.fail(e); + } catch (RuntimeException e) { + LOGGER.log(DEBUG, "Runtime error processing SubscribeStreamResponseUnparsed", e); + ctx.fail(e); + } + } + + @Override + public void onError(Throwable throwable) { + LOGGER.log(TRACE, "received onError", throwable); + ctx.fail(throwable); + } + + @Override + public void onComplete() { + LOGGER.log(TRACE, "received onComplete"); + ctx.complete(); + } + } } From ff1c5a6c69be7c491094500b0ad4397a1dfd40b9 Mon Sep 17 00:00:00 2001 From: Alfredo Gutierrez Grajeda Date: Tue, 19 Aug 2025 11:00:25 -0600 Subject: [PATCH 7/7] further cleanup Signed-off-by: Alfredo Gutierrez Grajeda --- .../node/backfill/client/BlockNodeClient.java | 2 +- .../BlockStreamSubscribeUnparsedClient.java | 20 +++++++++++++++---- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java index c22cc9eb2..9e1b9981e 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockNodeClient.java @@ -51,7 +51,7 @@ public BlockNodeClient(BackfillSourceConfig blockNodeConfig, int timeoutMs, bool // we reuse the host connection with many services. blockNodeServiceClient = new BlockNodeServiceInterface.BlockNodeServiceClient(pbjGrpcClient, OPTIONS); - this.blockStreamSubscribeUnparsedClient = new BlockStreamSubscribeUnparsedClient(pbjGrpcClient, OPTIONS); + this.blockStreamSubscribeUnparsedClient = new BlockStreamSubscribeUnparsedClient(pbjGrpcClient); } /** diff --git a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java index bad7a0c7e..0636f1203 100644 --- a/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java +++ b/block-node/backfill/src/main/java/org/hiero/block/node/backfill/client/BlockStreamSubscribeUnparsedClient.java @@ -11,7 +11,6 @@ import com.hedera.pbj.runtime.grpc.GrpcCall; import com.hedera.pbj.runtime.grpc.GrpcClient; import com.hedera.pbj.runtime.grpc.Pipeline; -import com.hedera.pbj.runtime.grpc.ServiceInterface; import edu.umd.cs.findbugs.annotations.NonNull; import java.util.ArrayList; import java.util.List; @@ -44,10 +43,13 @@ public class BlockStreamSubscribeUnparsedClient { // From constructor private final GrpcClient grpcClient; - public BlockStreamSubscribeUnparsedClient( - @NonNull final GrpcClient grpcClient, @NonNull final ServiceInterface.RequestOptions requestOptions) { + /** + * Constructs a new client for subscribing to block streams. + * + * @param grpcClient the gRPC client to use for communication + */ + public BlockStreamSubscribeUnparsedClient(@NonNull final GrpcClient grpcClient) { this.grpcClient = requireNonNull(grpcClient); - ServiceInterface.RequestOptions requestOptions1 = requireNonNull(requestOptions); } /** @@ -134,9 +136,19 @@ List await() { } } + /** + * Pipeline implementation for handling responses from the block stream subscription. + * It processes incoming {@link SubscribeStreamResponseUnparsed} messages and manages + * the state of the request context. + */ private static final class SubscribePipeline implements Pipeline { private final RequestContext ctx; + /** + * Constructs a new pipeline for processing block stream subscription responses. + * + * @param ctx the request context to manage state across callbacks + */ SubscribePipeline(RequestContext ctx) { this.ctx = ctx; }