Skip to content

Commit cb38c32

Browse files
authored
feat: BlockNodeContext Executor Service Factory Method (#1083)
Signed-off-by: Atanas Atanasov <a.v.atanasov98@gmail.com>
1 parent db6c7cc commit cb38c32

File tree

16 files changed

+299
-61
lines changed

16 files changed

+299
-61
lines changed

block-node/app/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ testModuleInfo {
6969
requires("org.junit.jupiter.api")
7070
requires("org.junit.jupiter.params")
7171
requires("org.mockito")
72+
requires("org.assertj.core")
7273
}
7374

7475
// Vals

block-node/app/src/main/java/org/hiero/block/node/app/BlockNodeApp.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.hiero.block.node.spi.blockmessaging.BlockMessagingFacility;
3333
import org.hiero.block.node.spi.health.HealthFacility;
3434
import org.hiero.block.node.spi.historicalblocks.LongRange;
35+
import org.hiero.block.node.spi.threading.ThreadPoolManager;
3536

3637
/** Main class for the block node server */
3738
public class BlockNodeApp implements HealthFacility {
@@ -129,9 +130,17 @@ public class BlockNodeApp implements HealthFacility {
129130
// ==== METRICS ================================================================================================
130131
metricsProvider = new DefaultMetricsProvider(configuration);
131132
final Metrics metrics = metricsProvider.createGlobalMetrics();
133+
// ==== THREAD POOL MANAGER ====================================================================================
134+
final ThreadPoolManager threadPoolManager = new DefaultThreadPoolManager();
132135
// ==== CONTEXT ================================================================================================
133136
blockNodeContext = new BlockNodeContext(
134-
configuration, metrics, this, blockMessagingService, historicalBlockFacility, serviceLoader);
137+
configuration,
138+
metrics,
139+
this,
140+
blockMessagingService,
141+
historicalBlockFacility,
142+
serviceLoader,
143+
threadPoolManager);
135144
// ==== CREATE ROUTING BUILDERS ================================================================================
136145
// Create HTTP & GRPC routing builders
137146
final ServiceBuilderImpl serviceBuilder = new ServiceBuilderImpl();
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package org.hiero.block.node.app;
3+
4+
import edu.umd.cs.findbugs.annotations.NonNull;
5+
import edu.umd.cs.findbugs.annotations.Nullable;
6+
import java.lang.Thread.Builder.OfPlatform;
7+
import java.util.concurrent.ExecutorService;
8+
import java.util.concurrent.LinkedBlockingQueue;
9+
import java.util.concurrent.ThreadFactory;
10+
import java.util.concurrent.ThreadPoolExecutor;
11+
import java.util.concurrent.TimeUnit;
12+
import org.hiero.block.common.utils.Preconditions;
13+
import org.hiero.block.node.spi.threading.ThreadPoolManager;
14+
15+
/**
16+
* The default implementation of the {@link ThreadPoolManager} interface. This
17+
* implementation is used systemwide to manage the thread pools.
18+
*/
19+
final class DefaultThreadPoolManager implements ThreadPoolManager {
20+
/**
21+
* {@inheritDoc}
22+
*/
23+
@NonNull
24+
@Override
25+
public ExecutorService createSingleThreadExecutor(@NonNull final String threadName) {
26+
return createSingleThreadExecutor(threadName, null);
27+
}
28+
29+
/**
30+
* {@inheritDoc}
31+
*/
32+
@NonNull
33+
@Override
34+
public ExecutorService createSingleThreadExecutor(
35+
@NonNull final String threadName,
36+
@Nullable final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
37+
Preconditions.requireNotBlank(threadName);
38+
final OfPlatform factoryBuilder = Thread.ofPlatform().name(threadName);
39+
if (uncaughtExceptionHandler != null) {
40+
factoryBuilder.uncaughtExceptionHandler(uncaughtExceptionHandler);
41+
}
42+
final ThreadFactory factory = factoryBuilder.factory();
43+
return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);
44+
}
45+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package org.hiero.block.node.app;
3+
4+
import static org.assertj.core.api.Assertions.assertThat;
5+
6+
import java.lang.Thread.UncaughtExceptionHandler;
7+
import java.util.concurrent.ExecutorService;
8+
import java.util.concurrent.ThreadPoolExecutor;
9+
import java.util.concurrent.TimeUnit;
10+
import org.assertj.core.api.InstanceOfAssertFactories;
11+
import org.junit.jupiter.api.BeforeEach;
12+
import org.junit.jupiter.api.DisplayName;
13+
import org.junit.jupiter.api.Nested;
14+
import org.junit.jupiter.api.Test;
15+
16+
/**
17+
* Test class for {@link DefaultThreadPoolManager}.
18+
*/
19+
@DisplayName("DefaultThreadPoolManager Tests")
20+
class DefaultThreadPoolManagerTest {
21+
/** The instance under test. */
22+
private DefaultThreadPoolManager toTest;
23+
24+
/**
25+
* Setup before each test.
26+
*/
27+
@BeforeEach
28+
void setUp() {
29+
toTest = new DefaultThreadPoolManager();
30+
}
31+
32+
/**
33+
* Functionality tests for {@link DefaultThreadPoolManager}.
34+
*/
35+
@Nested
36+
@DisplayName("Functionality Tests")
37+
final class FunctionalityTests {
38+
/**
39+
* This test aims to verify that the
40+
* {@link DefaultThreadPoolManager#createSingleThreadExecutor(String)}
41+
* creates a single thread executor correctly, each invocation creates
42+
* a new instance with the proper setup.
43+
*/
44+
@Test
45+
@DisplayName(
46+
"Test createSingleThreadExecutor(String) will correctly create a single thread executor, new instance on every invocation")
47+
void testCreateSingleThreadExecutor() {
48+
final ExecutorService actual = toTest.createSingleThreadExecutor("testThreadName");
49+
assertThat(actual)
50+
.isNotNull()
51+
.isExactlyInstanceOf(ThreadPoolExecutor.class)
52+
.asInstanceOf(InstanceOfAssertFactories.type(ThreadPoolExecutor.class))
53+
.returns(0L, executor -> executor.getKeepAliveTime(TimeUnit.MILLISECONDS))
54+
.returns(1, ThreadPoolExecutor::getCorePoolSize)
55+
.returns(1, ThreadPoolExecutor::getMaximumPoolSize);
56+
final ExecutorService actual2 = toTest.createSingleThreadExecutor("testThreadName2");
57+
assertThat(actual2)
58+
.isNotNull()
59+
.isExactlyInstanceOf(ThreadPoolExecutor.class)
60+
.asInstanceOf(InstanceOfAssertFactories.type(ThreadPoolExecutor.class))
61+
.returns(0L, executor -> executor.getKeepAliveTime(TimeUnit.MILLISECONDS))
62+
.returns(1, ThreadPoolExecutor::getCorePoolSize)
63+
.returns(1, ThreadPoolExecutor::getMaximumPoolSize)
64+
.isNotSameAs(actual);
65+
}
66+
67+
/**
68+
* This test aims to verify that the
69+
* {@link DefaultThreadPoolManager#createSingleThreadExecutor(String, UncaughtExceptionHandler)}
70+
* creates a single thread executor correctly, each invocation creates
71+
* a new instance with the proper setup.
72+
*/
73+
@Test
74+
@DisplayName(
75+
"Test createSingleThreadExecutor(String, UncaughtExceptionHandler) will correctly create a single thread executor, new instance on every invocation")
76+
void testCreateSingleThreadExecutorWithUncaughtExceptionHandler() {
77+
final UncaughtExceptionHandler expectedHandler = (t, e) -> {
78+
// Handle the exception
79+
};
80+
final ExecutorService actual = toTest.createSingleThreadExecutor("testThreadName", expectedHandler);
81+
assertThat(actual)
82+
.isNotNull()
83+
.isExactlyInstanceOf(ThreadPoolExecutor.class)
84+
.asInstanceOf(InstanceOfAssertFactories.type(ThreadPoolExecutor.class))
85+
.returns(0L, executor -> executor.getKeepAliveTime(TimeUnit.MILLISECONDS))
86+
.returns(1, ThreadPoolExecutor::getCorePoolSize)
87+
.returns(1, ThreadPoolExecutor::getMaximumPoolSize)
88+
.returns(expectedHandler, executor -> executor.getThreadFactory()
89+
.newThread(() -> {})
90+
.getUncaughtExceptionHandler());
91+
final ExecutorService actual2 = toTest.createSingleThreadExecutor("testThreadName2", expectedHandler);
92+
assertThat(actual2)
93+
.isNotNull()
94+
.isExactlyInstanceOf(ThreadPoolExecutor.class)
95+
.asInstanceOf(InstanceOfAssertFactories.type(ThreadPoolExecutor.class))
96+
.returns(0L, executor -> executor.getKeepAliveTime(TimeUnit.MILLISECONDS))
97+
.returns(1, ThreadPoolExecutor::getCorePoolSize)
98+
.returns(1, ThreadPoolExecutor::getMaximumPoolSize)
99+
.returns(expectedHandler, executor -> executor.getThreadFactory()
100+
.newThread(() -> {})
101+
.getUncaughtExceptionHandler())
102+
.isNotSameAs(actual);
103+
}
104+
}
105+
}

block-node/app/src/testFixtures/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
// SPDX-License-Identifier: Apache-2.0
22
module org.hiero.block.node.app.test.fixtures {
3+
exports org.hiero.block.node.app.fixtures.async;
34
exports org.hiero.block.node.app.fixtures.blocks;
45
exports org.hiero.block.node.app.fixtures.plugintest;
56

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package org.hiero.block.node.app.fixtures.async;
3+
4+
import edu.umd.cs.findbugs.annotations.NonNull;
5+
import edu.umd.cs.findbugs.annotations.Nullable;
6+
import java.util.Objects;
7+
import java.util.concurrent.ExecutorService;
8+
import org.hiero.block.node.spi.threading.ThreadPoolManager;
9+
10+
/**
11+
* A very simplified version of the {@link ThreadPoolManager} that is used only
12+
* for testing. This class will return the same executor service that was passed
13+
* to it in the constructor. This is useful for testing purposes where we want
14+
* to control the executor service that is used in the tests.
15+
*
16+
* @param <T> the type of executor service
17+
*/
18+
public record TestThreadPoolManager<T extends ExecutorService>(@NonNull T executor) implements ThreadPoolManager {
19+
public TestThreadPoolManager {
20+
Objects.requireNonNull(executor);
21+
}
22+
23+
/**
24+
* Test implementation, always returns the same executor service that was
25+
* passed to the constructor of this class.
26+
*
27+
* @return the executor service that was passed to the constructor
28+
*/
29+
@NonNull
30+
@Override
31+
public ExecutorService createSingleThreadExecutor(@NonNull final String threadName) {
32+
return createSingleThreadExecutor(threadName, null);
33+
}
34+
35+
/**
36+
* Test implementation, always returns the same executor service that was
37+
* passed to the constructor of this class.
38+
*
39+
* @return the executor service that was passed to the constructor
40+
*/
41+
@NonNull
42+
@Override
43+
public ExecutorService createSingleThreadExecutor(
44+
@NonNull final String threadName,
45+
@Nullable final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
46+
return executor;
47+
}
48+
}

block-node/app/src/testFixtures/java/org/hiero/block/node/app/fixtures/plugintest/PluginTestBase.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
import io.helidon.webserver.http.HttpService;
1111
import java.util.Collections;
1212
import java.util.Map;
13+
import java.util.concurrent.LinkedBlockingQueue;
14+
import org.hiero.block.node.app.fixtures.async.BlockingSerialExecutor;
15+
import org.hiero.block.node.app.fixtures.async.TestThreadPoolManager;
1316
import org.hiero.block.node.spi.BlockNodeContext;
1417
import org.hiero.block.node.spi.BlockNodePlugin;
1518
import org.hiero.block.node.spi.ServiceBuilder;
@@ -46,6 +49,9 @@ public abstract class PluginTestBase<P extends BlockNodePlugin> {
4649
protected BlockNodeContext blockNodeContext;
4750
/** The test block messaging facility, for mocking out the messaging service. */
4851
protected TestBlockMessagingFacility blockMessaging = new TestBlockMessagingFacility();
52+
/** The test thread pool manager */
53+
protected TestThreadPoolManager<BlockingSerialExecutor> testThreadPoolManager =
54+
new TestThreadPoolManager<>(new BlockingSerialExecutor(new LinkedBlockingQueue<>()));
4955
/** The plugin to be tested */
5056
protected P plugin;
5157

@@ -92,7 +98,8 @@ public void start(P plugin, HistoricalBlockFacility historicalBlockFacility, Map
9298
healthFacility,
9399
blockMessaging,
94100
historicalBlockFacility,
95-
new ServiceLoaderFunction());
101+
new ServiceLoaderFunction(),
102+
testThreadPoolManager);
96103
// if the subclass implements ServiceBuilder, use it otherwise create a mock
97104
ServiceBuilder mockServiceBuilder = (this instanceof ServiceBuilder)
98105
? (ServiceBuilder) this
@@ -125,10 +132,11 @@ public void registerGrpcService(@NonNull ServiceInterface service) {}
125132
}
126133

127134
/**
128-
* Tears down the test fixture by stopping the metrics provider.
135+
* Teardown after each.
129136
*/
130137
@AfterEach
131138
public void tearDown() {
132139
metricsProvider.stop();
140+
testThreadPoolManager.executor().shutdownNow();
133141
}
134142
}

block-node/block-providers/files.historic/src/main/java/org/hiero/block/node/blocks/files/historic/BlocksFilesHistoricPlugin.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import java.util.Objects;
1010
import java.util.concurrent.CopyOnWriteArrayList;
1111
import java.util.concurrent.ExecutorService;
12-
import java.util.concurrent.Executors;
1312
import org.hiero.block.node.base.ranges.ConcurrentLongRangeSet;
1413
import org.hiero.block.node.spi.BlockNodeContext;
1514
import org.hiero.block.node.spi.ServiceBuilder;
@@ -28,7 +27,7 @@ public final class BlocksFilesHistoricPlugin implements BlockProviderPlugin, Blo
2827
/** The logger for this class. */
2928
private final System.Logger LOGGER = System.getLogger(getClass().getName());
3029
/** The executor service for moving blocks to zip files in a background thread. */
31-
private final ExecutorService zipMoveExecutorService;
30+
private ExecutorService zipMoveExecutorService;
3231
/** The block node context. */
3332
private BlockNodeContext context;
3433
/** The zip block archive. */
@@ -40,19 +39,6 @@ public final class BlocksFilesHistoricPlugin implements BlockProviderPlugin, Blo
4039
/** List of all zip ranges that are in progress, so we do not start a duplicate job. */
4140
private final CopyOnWriteArrayList<LongRange> inProgressZipRanges = new CopyOnWriteArrayList<>();
4241

43-
/** Constructor, used for normal plugin loading */
44-
public BlocksFilesHistoricPlugin() {
45-
this.zipMoveExecutorService = Executors.newSingleThreadExecutor();
46-
}
47-
48-
/**
49-
* Constructor, used only for testing temporarily, need to introduce the
50-
* executor factory method in the test context to remove this.
51-
*/
52-
BlocksFilesHistoricPlugin(final ExecutorService zipMoveExecutorService) {
53-
this.zipMoveExecutorService = Objects.requireNonNull(zipMoveExecutorService);
54-
}
55-
5642
// ==== BlockProviderPlugin Methods ================================================================================
5743

5844
/**
@@ -81,6 +67,8 @@ public void init(final BlockNodeContext context, final ServiceBuilder serviceBui
8167
// register to listen to block notifications
8268
context.blockMessaging().registerBlockNotificationHandler(this, false, "Blocks Files Historic");
8369
numberOfBlocksPerZipFile = (int) Math.pow(10, config.powersOfTenPerZipFileContents());
70+
// create the executor service for moving blocks to zip files
71+
zipMoveExecutorService = context.threadPoolManager().createSingleThreadExecutor("FilesHistoricZipMove");
8472
zipBlockArchive = new ZipBlockArchive(context, config);
8573
// get the first and last block numbers from the zipBlockArchive
8674
final long firstZippedBlock = zipBlockArchive.minStoredBlockNumber();

0 commit comments

Comments
 (0)