Skip to content

Commit dbc0b0c

Browse files
committed
initial refactor of batching logic
Signed-off-by: Atanas Atanasov <a.v.atanasov98@gmail.com>
1 parent 4dfc44a commit dbc0b0c

File tree

4 files changed

+251
-151
lines changed

4 files changed

+251
-151
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package org.hiero.block.node.app.fixtures.blocks;
3+
4+
import com.hedera.hapi.block.stream.Block;
5+
import com.hedera.hapi.block.stream.BlockItem;
6+
import java.util.List;
7+
import java.util.Objects;
8+
import org.hiero.block.node.spi.historicalblocks.BlockAccessor;
9+
10+
/**
11+
* A simple in-memory block accessor class. This class is intended to be used
12+
* for testing purposes only!
13+
*/
14+
public final class InMemoryBlockAccessor implements BlockAccessor {
15+
/** Internal in-memory reference of the accessible block */
16+
private final Block block;
17+
/** The block number of the accessible block */
18+
private final long blockNumber;
19+
20+
/**
21+
* Constructor.
22+
*
23+
* @param blockItems the items of the block; must not be empty, first item must be a
24+
* {@link com.hedera.hapi.block.stream.output.BlockHeader} and last item
25+
* must be a {@link com.hedera.hapi.block.stream.BlockProof}.
26+
* @throws IllegalArgumentException if preconditions are not met
27+
*/
28+
public InMemoryBlockAccessor(final List<BlockItem> blockItems) {
29+
if (blockItems.isEmpty()) {
30+
throw new IllegalArgumentException("Block items list cannot be empty");
31+
}
32+
// Ensure the first item is a BlockHeader
33+
if (!blockItems.getFirst().hasBlockHeader()) {
34+
throw new IllegalArgumentException("First block item must be a BlockHeader");
35+
}
36+
// Ensure the last item is a BlockProof
37+
if (!blockItems.getLast().hasBlockProof()) {
38+
throw new IllegalArgumentException("Last block item must be a BlockProof");
39+
}
40+
// Create a Block from the provided block items
41+
block = Block.newBuilder().items(blockItems).build();
42+
blockNumber =
43+
Objects.requireNonNull(block.items().getFirst().blockHeader()).number();
44+
}
45+
46+
/**
47+
* {@inheritDoc}
48+
*/
49+
@Override
50+
public long blockNumber() {
51+
return blockNumber;
52+
}
53+
54+
/**
55+
* {@inheritDoc}
56+
*/
57+
@Override
58+
public Block block() {
59+
return block;
60+
}
61+
}

block-node/block-providers/files.historic/src/main/java/org/hiero/block/node/blocks/files/historic/BlocksFilesHistoricPlugin.java

Lines changed: 87 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.io.IOException;
99
import java.lang.System.Logger.Level;
1010
import java.nio.file.Files;
11+
import java.util.ArrayList;
1112
import java.util.List;
1213
import java.util.Objects;
1314
import java.util.concurrent.CopyOnWriteArrayList;
@@ -21,6 +22,7 @@
2122
import org.hiero.block.node.spi.historicalblocks.BlockAccessor;
2223
import org.hiero.block.node.spi.historicalblocks.BlockProviderPlugin;
2324
import org.hiero.block.node.spi.historicalblocks.BlockRangeSet;
25+
import org.hiero.block.node.spi.historicalblocks.HistoricalBlockFacility;
2426
import org.hiero.block.node.spi.historicalblocks.LongRange;
2527

2628
/**
@@ -195,9 +197,13 @@ public BlockRangeSet availableBlocks() {
195197
*/
196198
@Override
197199
public void handlePersisted(PersistedNotification notification) {
200+
// all operations here until we release the caller thread are queued up
201+
// by the block messaging facility, so we can safely perform
202+
// calculations and decide whether or not to submit a task
198203
if (notification.blockProviderPriority() > defaultPriority()) {
199204
attemptZipping();
200-
} // todo this is not enough of an assertion that the blocks will be coming from the right place
205+
}
206+
// @todo(1069) this is not enough of an assertion that the blocks will be coming from the right place
201207
// as notifications are async and things can happen, when we get the accessors later, we should
202208
// be able to get accessors only from places that have higher priority than us. We should probably
203209
// have that as a feature in the block accessor api. (meaning we should be able to query the
@@ -207,12 +213,20 @@ public void handlePersisted(PersistedNotification notification) {
207213
// ==== Private Methods ============================================================================================
208214
private void attemptZipping() {
209215
// compute the min and max block in next batch to zip
210-
long minBlockNumber = availableBlocks().max() + 1;
216+
// since we ensure no gaps in the zip file are possible, and also we
217+
// have a power of 10 number of blocks per zip file, it is safe to
218+
// simply add +1 to the latest available block number and have that as
219+
// the start of the next batch of blocks to zip
220+
long minBlockNumber = availableBlocks.max() + 1;
211221
long maxBlockNumber = minBlockNumber + numberOfBlocksPerZipFile - 1;
212-
// check all those blocks are available
222+
// make sure the historical block facility has a higher max than our
223+
// desired batch max
213224
final BlockRangeSet historicalAvailable =
214225
context.historicalBlockProvider().availableBlocks();
215226
// while we can zip blocks, we must keep zipping
227+
// we loop here because the historical block facility can have
228+
// multiple batches of blocks potentially available for zipping, so we
229+
// need to queue them all up
216230
while (historicalAvailable.max() >= maxBlockNumber) {
217231
if (historicalAvailable.contains(minBlockNumber, maxBlockNumber)) {
218232
final LongRange batchRange = new LongRange(minBlockNumber, maxBlockNumber);
@@ -234,17 +248,19 @@ private void attemptZipping() {
234248
private void startMovingBatchOfBlocksToZipFile(final LongRange batchRange) {
235249
// check if the batch of blocks is already in progress
236250
if (inProgressZipRanges.contains(batchRange)) {
251+
// if the batch is in progress, we must not submit a task
237252
LOGGER.log(
238253
System.Logger.Level.DEBUG,
239254
"Batch of blocks[{%1} -> {%2}] is already in progress",
240255
batchRange.start(),
241256
batchRange.end());
242-
return;
257+
} else {
258+
// if the batch is not in progress, we must submit a task
259+
// add the batch of blocks to the in progress ranges
260+
inProgressZipRanges.add(batchRange);
261+
// move the batch of blocks to a zip file (submit a task)
262+
zipMoveExecutorService.submit(() -> moveBatchOfBlocksToZipFile(batchRange));
243263
}
244-
// add the batch of blocks to the in progress ranges
245-
inProgressZipRanges.add(batchRange);
246-
// move the batch of blocks to a zip file
247-
zipMoveExecutorService.submit(() -> moveBatchOfBlocksToZipFile(batchRange));
248264
}
249265

250266
/**
@@ -253,47 +269,70 @@ private void startMovingBatchOfBlocksToZipFile(final LongRange batchRange) {
253269
* @param batchRange The range of blocks to move to zip file.
254270
*/
255271
private void moveBatchOfBlocksToZipFile(final LongRange batchRange) {
256-
final long batchFirstBlockNumber = batchRange.start();
257-
final long batchLastBlockNumber = batchRange.end();
258-
// move the batch of blocks to a zip file
259272
try {
260-
LOGGER.log(
261-
System.Logger.Level.DEBUG,
262-
"Moving batch of blocks[%d -> %d] to zip file",
263-
batchFirstBlockNumber,
264-
batchLastBlockNumber);
265-
266-
// Write the zip file and get result with file size
267-
final long zipFileSize = zipBlockArchive.writeNewZipFile(batchFirstBlockNumber);
268-
// Metrics updates
269-
// Update total bytes stored with the new zip file size
270-
totalBytesStored.addAndGet(zipFileSize);
271-
// Increment the blocks written counter
272-
long blockCount = batchLastBlockNumber - batchFirstBlockNumber + 1;
273-
blocksWrittenCounter.add(blockCount);
274-
} catch (final IOException e) {
275-
LOGGER.log(
276-
System.Logger.Level.ERROR,
277-
"Failed to move batch of blocks[" + batchFirstBlockNumber + " -> " + batchLastBlockNumber
278-
+ "] to zip file",
279-
e);
280-
return;
273+
// first off, let's create our batch of blocks
274+
final long batchFirstBlockNumber = batchRange.start();
275+
final long batchLastBlockNumber = batchRange.end();
276+
final List<BlockAccessor> batch = new ArrayList<>(numberOfBlocksPerZipFile);
277+
final HistoricalBlockFacility historicalBlockFacility = context.historicalBlockProvider();
278+
// gather batch, if there are no gaps, then we can proceed with zipping
279+
for (long blockNumber = batchFirstBlockNumber; blockNumber <= batchLastBlockNumber; blockNumber++) {
280+
final BlockAccessor currentAccessor = historicalBlockFacility.block(blockNumber);
281+
if (currentAccessor == null) {
282+
break;
283+
} else {
284+
batch.add(currentAccessor);
285+
}
286+
}
287+
// if there are any gaps, then we cannot zip the batch
288+
if (batch.size() != numberOfBlocksPerZipFile) {
289+
// we have a gap in the batch, so we cannot zip it
290+
LOGGER.log(
291+
System.Logger.Level.DEBUG,
292+
"Batch of blocks[%d -> %d] has a gap, skipping zipping",
293+
batchFirstBlockNumber,
294+
batchLastBlockNumber);
295+
} else {
296+
// move the batch of blocks to a zip file
297+
try {
298+
LOGGER.log(
299+
System.Logger.Level.DEBUG,
300+
"Moving batch of blocks[%d -> %d] to zip file",
301+
batchFirstBlockNumber,
302+
batchLastBlockNumber);
303+
// Write the zip file and get result with file size
304+
final long zipFileSize = zipBlockArchive.writeNewZipFile(batch);
305+
// Metrics updates
306+
// Update total bytes stored with the new zip file size
307+
totalBytesStored.addAndGet(zipFileSize);
308+
// Increment the blocks written counter
309+
blocksWrittenCounter.add(numberOfBlocksPerZipFile);
310+
} catch (final IOException e) {
311+
LOGGER.log(
312+
System.Logger.Level.ERROR,
313+
"Failed to move batch of blocks[" + batchFirstBlockNumber + " -> " + batchLastBlockNumber
314+
+ "] to zip file",
315+
e);
316+
return;
317+
}
318+
// if we have reached here, then the batch of blocks has been zipped,
319+
// now we need to make some updates
320+
// update the first and last block numbers
321+
availableBlocks.add(batchFirstBlockNumber, batchLastBlockNumber);
322+
// log done
323+
LOGGER.log(
324+
Level.INFO,
325+
"Moved batch of blocks[%d -> %d] to zip file",
326+
batchFirstBlockNumber,
327+
batchLastBlockNumber);
328+
// now all the blocks are in the zip file and accessible, send notification
329+
context.blockMessaging()
330+
.sendBlockPersisted(new PersistedNotification(
331+
batchFirstBlockNumber, batchLastBlockNumber, defaultPriority()));
332+
}
333+
} finally {
334+
// always make sure to remove the batch of blocks from in progress ranges
335+
inProgressZipRanges.remove(batchRange);
281336
}
282-
// if we have reached here, then the batch of blocks has been zipped,
283-
// now we need to make some updates
284-
// update the first and last block numbers
285-
availableBlocks.add(batchFirstBlockNumber, batchLastBlockNumber);
286-
// log done
287-
LOGGER.log(
288-
System.Logger.Level.INFO,
289-
"Moved batch of blocks[%d -> %d] to zip file",
290-
batchFirstBlockNumber,
291-
batchLastBlockNumber);
292-
// now all the blocks are in the zip file and accessible, send notification
293-
context.blockMessaging()
294-
.sendBlockPersisted(
295-
new PersistedNotification(batchFirstBlockNumber, batchLastBlockNumber, defaultPriority()));
296-
// remove the batch of blocks from in progress ranges
297-
inProgressZipRanges.remove(batchRange);
298337
}
299338
}

block-node/block-providers/files.historic/src/main/java/org/hiero/block/node/blocks/files/historic/ZipBlockArchive.java

Lines changed: 18 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,15 @@
2121
import java.util.List;
2222
import java.util.Objects;
2323
import java.util.Optional;
24-
import java.util.stream.IntStream;
2524
import java.util.stream.Stream;
2625
import java.util.zip.CRC32;
26+
import java.util.zip.Deflater;
2727
import java.util.zip.ZipEntry;
2828
import java.util.zip.ZipOutputStream;
2929
import org.hiero.block.node.base.BlockFile;
3030
import org.hiero.block.node.spi.BlockNodeContext;
3131
import org.hiero.block.node.spi.historicalblocks.BlockAccessor;
3232
import org.hiero.block.node.spi.historicalblocks.BlockAccessor.Format;
33-
import org.hiero.block.node.spi.historicalblocks.HistoricalBlockFacility;
3433

3534
/**
3635
* The ZipBlockArchive class provides methods for creating and managing zip files containing blocks.
@@ -43,69 +42,50 @@ class ZipBlockArchive {
4342
private final BlockNodeContext context;
4443
/** The configuration for the historic files. */
4544
private final FilesHistoricConfig config;
46-
/** The historical block facility. */
47-
private final HistoricalBlockFacility historicalBlockFacility;
48-
/** The number of blocks per zip file. */
49-
private final int numberOfBlocksPerZipFile;
5045
/** The format for the blocks. */
5146
private final Format format;
5247

5348
/**
5449
* Constructor for ZipBlockArchive.
5550
*
56-
* @param context The block node context
5751
* @param filesHistoricConfig Configuration to be used internally
5852
*/
5953
ZipBlockArchive(@NonNull final BlockNodeContext context, @NonNull final FilesHistoricConfig filesHistoricConfig) {
6054
this.context = Objects.requireNonNull(context);
6155
this.config = Objects.requireNonNull(filesHistoricConfig);
62-
this.historicalBlockFacility = Objects.requireNonNull(context.historicalBlockProvider());
63-
numberOfBlocksPerZipFile = (int) Math.pow(10, this.config.powersOfTenPerZipFileContents());
6456
format = switch (this.config.compression()) {
6557
case ZSTD -> Format.ZSTD_PROTOBUF;
6658
case NONE -> Format.PROTOBUF;
6759
};
6860
}
6961

7062
/**
71-
* Write a new zip file containing blocks, reads the batch of blocks from the HistoricalBlockFacility.
63+
* Write a new zip file containing the input batch of blocks.
7264
*
73-
* @param firstBlockNumber The first block number to write
65+
* @return The size of the zip file created
7466
* @throws IOException If an error occurs writing the block
75-
* @return The size of the zip file created, 0 if no blocks were written.
7667
*/
77-
long writeNewZipFile(long firstBlockNumber) throws IOException {
78-
final long lastBlockNumber = firstBlockNumber + numberOfBlocksPerZipFile - 1;
79-
// compute block path
80-
final BlockPath firstBlockPath = computeBlockPath(config, firstBlockNumber);
68+
long writeNewZipFile(List<BlockAccessor> batch) throws IOException {
69+
// compute zip path
70+
final BlockPath firstBlockPath =
71+
computeBlockPath(config, batch.getFirst().blockNumber());
8172
// create directories
8273
Files.createDirectories(firstBlockPath.dirPath());
83-
// create list for all block accessors, so we can delete files after we are done
84-
final List<BlockAccessor> blockAccessors = IntStream.rangeClosed((int) firstBlockNumber, (int) lastBlockNumber)
85-
.mapToObj(historicalBlockFacility::block)
86-
.toList();
87-
// create zip file path
88-
try (ZipOutputStream zipOutputStream = new ZipOutputStream(new BufferedOutputStream(
89-
Files.newOutputStream(
90-
firstBlockPath.zipFilePath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE),
74+
// create zip file
75+
final Path zipFilePath = firstBlockPath.zipFilePath();
76+
try (final ZipOutputStream zipOutputStream = new ZipOutputStream(new BufferedOutputStream(
77+
Files.newOutputStream(zipFilePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE),
9178
1024 * 1204))) {
9279
// don't compress the zip file as files are already compressed
9380
zipOutputStream.setMethod(ZipOutputStream.STORED);
94-
// todo should we not also set the level to Deflater.NO_COMPRESSION
95-
for (long blockNumber = firstBlockNumber; blockNumber <= lastBlockNumber; blockNumber++) {
81+
zipOutputStream.setLevel(Deflater.NO_COMPRESSION);
82+
for (final BlockAccessor blockAccessor : batch) {
9683
// compute block filename
97-
// todo should we also not append the compression extension to the filename?
98-
// todo I feel like the accessor should generally be getting us the block file name
99-
// what if the file is zstd compressed but the current runtime compression is none?
100-
// then the file name would be wrong? For now appending, maybe a slight cleanup is in order for this
101-
// logic.
102-
final String blockFileName = BlockFile.blockFileName(blockNumber, config.compression());
103-
// get block accessor
104-
final BlockAccessor blockAccessor = blockAccessors.get((int) (blockNumber - firstBlockNumber));
84+
final String blockFileName = BlockFile.blockFileName(blockAccessor.blockNumber(), config.compression());
10585
// get the bytes to write, we have to do this as we need to know the size
10686
final Bytes bytes = blockAccessor.blockBytes(format);
10787
// calculate CRC-32 checksum
108-
CRC32 crc = new CRC32();
88+
final CRC32 crc = new CRC32();
10989
crc.update(bytes.toByteArray());
11090
// create zip entry
11191
final ZipEntry zipEntry = new ZipEntry(blockFileName);
@@ -119,9 +99,10 @@ long writeNewZipFile(long firstBlockNumber) throws IOException {
11999
zipOutputStream.closeEntry();
120100
}
121101
}
102+
// if we have reached here, this means that the zip file was created
103+
// successfully
122104
// return the size of the zip file created
123-
// todo(1217): consider case when no zip is created and return 0
124-
return Files.size(firstBlockPath.zipFilePath());
105+
return Files.size(zipFilePath);
125106
}
126107

127108
/**

0 commit comments

Comments
 (0)