Skip to content

Commit 368b2bd

Browse files
authored
NIFI-14631 When stopping a Stateless Process Group, wait up to 10 seconds before interrupting (apache#9992)
Signed-off-by: David Handermann <exceptionfactory@apache.org>
1 parent 1ac5f65 commit 368b2bd

File tree

4 files changed

+58
-13
lines changed

4 files changed

+58
-13
lines changed

nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.nifi.stateless.repository.StatelessProvenanceRepository;
5757

5858
import java.io.IOException;
59+
import java.time.Duration;
5960
import java.util.ArrayList;
6061
import java.util.Collections;
6162
import java.util.EnumSet;
@@ -71,6 +72,10 @@
7172
import java.util.stream.Collectors;
7273

7374
public class StatelessFlowTask {
75+
// The amount of time after a user stops a Stateless Group that we will wait before we disable the ProcessSession,
76+
// throwing Exceptions on all ProcessSession access (effectively performing the same functionality as Terminating a processor).
77+
private static final Duration GRACEFUL_STOP_DURATION = Duration.ofSeconds(10);
78+
7479
private static final Set<ProvenanceEventType> eventTypesToKeepOnFailure = EnumSet.of(ProvenanceEventType.SEND, ProvenanceEventType.REMOTE_INVOCATION);
7580
private final StatelessGroupNode statelessGroupNode;
7681
private final StatelessDataflow flow;
@@ -88,6 +93,8 @@ public class StatelessFlowTask {
8893
private List<RepositoryRecord> outputRepositoryRecords;
8994
private List<ProvenanceEventRecord> cloneProvenanceEvents;
9095

96+
// State that is updated during invocation but do not need to be guarded by synchronized block
97+
private volatile long shutdownInitiationTime = 0L;
9198

9299
private StatelessFlowTask(final Builder builder) {
93100
this.statelessGroupNode = builder.statelessGroupNode;
@@ -147,12 +154,24 @@ private boolean isRunAsFastAsPossible(final ProcessorNode procNode) {
147154
}
148155

149156
public void shutdown() {
150-
this.flow.shutdown(false, true);
157+
this.shutdownInitiationTime = System.nanoTime();
158+
this.flow.shutdown(false, true, GRACEFUL_STOP_DURATION);
151159
}
152160

153161
private boolean isAbort() {
154162
final ScheduledState desiredState = statelessGroupNode.getDesiredState();
155-
return desiredState != ScheduledState.RUNNING && desiredState != ScheduledState.RUN_ONCE;
163+
final boolean stopped = desiredState != ScheduledState.RUNNING && desiredState != ScheduledState.RUN_ONCE;
164+
if (!stopped) {
165+
return false;
166+
}
167+
168+
final long shutdownTime = shutdownInitiationTime;
169+
if (shutdownTime == 0) {
170+
return false;
171+
}
172+
173+
final long abortTime = shutdownTime + GRACEFUL_STOP_DURATION.toNanos();
174+
return System.nanoTime() >= abortTime;
156175
}
157176

158177

nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ private void initialize(final ScheduledExecutorService executor, final Schedulin
234234
} catch (final Exception e) {
235235
for (final StandardStatelessFlow flow : createdFlows) {
236236
try {
237-
flow.shutdown(false, true);
237+
flow.shutdown(false, true, Duration.ofMillis(0));
238238
} catch (final Exception ex) {
239239
logger.error("Failed to shutdown Stateless Flow {}", flow, ex);
240240
}
@@ -371,7 +371,7 @@ public boolean isEnableControllerServices() {
371371
try {
372372
dataflow.initialize(initializationContext);
373373
} catch (final Exception e) {
374-
dataflow.shutdown(true, true);
374+
dataflow.shutdown(true, true, Duration.ofMillis(0));
375375
throw e;
376376
}
377377

nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.nifi.reporting.BulletinRepository;
2323

2424
import java.io.InputStream;
25+
import java.time.Duration;
2526
import java.util.Map;
2627
import java.util.Set;
2728

@@ -68,10 +69,16 @@ default DataflowTrigger trigger() {
6869
void initialize(StatelessDataflowInitializationContext initializationContext);
6970

7071
default void shutdown() {
71-
shutdown(true, false);
72+
shutdown(true, false, Duration.ofMillis(0));
7273
}
7374

74-
void shutdown(boolean triggerComponentShutdown, boolean interruptProcessors);
75+
/**
76+
* Shuts down the dataflow, stopping all components and releasing all resources.
77+
* @param triggerComponentShutdown whether or not to trigger the shutdown of components (e.g., invoking @OnShutdown methods)
78+
* @param interruptProcessors whether or not to interrupt any processors and tasks that are running
79+
* @param gracefulShutdownPeriod if interruptProcessors is true, this specifies the amount of time to wait for processors to finish before interrupting them.
80+
*/
81+
void shutdown(boolean triggerComponentShutdown, boolean interruptProcessors, Duration gracefulShutdownPeriod);
7582

7683
StatelessDataflowValidation performValidation();
7784

nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import java.util.Optional;
8888
import java.util.Set;
8989
import java.util.concurrent.BlockingQueue;
90+
import java.util.concurrent.CompletableFuture;
9091
import java.util.concurrent.ExecutorService;
9192
import java.util.concurrent.Executors;
9293
import java.util.concurrent.Future;
@@ -353,7 +354,7 @@ private void startReportingTask(final ReportingTaskNode taskNode) {
353354
}
354355

355356
@Override
356-
public void shutdown(final boolean triggerComponentShutdown, final boolean interruptProcessors) {
357+
public void shutdown(final boolean triggerComponentShutdown, final boolean interruptProcessors, final Duration gracefulShutdownDuration) {
357358
if (shutdown) {
358359
return;
359360
}
@@ -362,18 +363,36 @@ public void shutdown(final boolean triggerComponentShutdown, final boolean inter
362363
logger.info("Shutting down dataflow {}", rootGroup.getName());
363364

364365
if (runDataflowExecutor != null) {
365-
if (interruptProcessors) {
366-
runDataflowExecutor.shutdownNow();
367-
} else {
368-
runDataflowExecutor.shutdown();
369-
}
366+
runDataflowExecutor.shutdown();
370367
}
371368
if (backgroundTaskExecutor != null) {
372369
backgroundTaskExecutor.shutdown();
373370
}
374371

375372
logger.info("Stopping all components");
376-
rootGroup.stopComponents().join();
373+
final CompletableFuture<Void> stopComponentsFuture = rootGroup.stopComponents();
374+
375+
// Wait for the graceful shutdown period for all processors to stop. If the processors do not stop within this time,
376+
// then interrupt them.
377+
if (runDataflowExecutor != null && interruptProcessors) {
378+
if (gracefulShutdownDuration.isZero()) {
379+
logger.info("Shutting down all components immediately without waiting for graceful shutdown period");
380+
runDataflowExecutor.shutdownNow();
381+
} else {
382+
try {
383+
stopComponentsFuture.get(gracefulShutdownDuration.toMillis(), TimeUnit.MILLISECONDS);
384+
} catch (final Exception e) {
385+
logger.warn("Stateless flow failed to stop all components gracefully after {} millis. Interrupting all running components.", gracefulShutdownDuration.toMillis(), e);
386+
if (e instanceof InterruptedException) {
387+
Thread.interrupted();
388+
}
389+
390+
runDataflowExecutor.shutdownNow();
391+
}
392+
}
393+
}
394+
395+
stopComponentsFuture.join();
377396
rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::shutdown);
378397

379398
if (triggerComponentShutdown) {

0 commit comments

Comments
 (0)