Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

import static java.lang.Double.valueOf;
import static java.lang.Integer.parseInt;
import static java.lang.Long.parseLong;
import static java.lang.Thread.sleep;
import static java.time.LocalDateTime.now;
import static java.time.LocalDateTime.ofInstant;
import static java.util.concurrent.Executors.newFixedThreadPool;

public class ExecutorServiceRunner {
Expand All @@ -29,6 +31,7 @@ public class ExecutorServiceRunner {
private int numberOfThreads;
private int rampUpPeriod;
private int loopCount;
private long abortAfterTimeLapsedInSeconds;

private Double delayBetweenTwoThreadsInMilliSecs;

Expand All @@ -37,16 +40,22 @@ public ExecutorServiceRunner(String loadPropertiesFile) {
numberOfThreads = parseInt(properties.getProperty("number.of.threads"));
rampUpPeriod = parseInt(properties.getProperty("ramp.up.period.in.seconds"));
loopCount = parseInt(properties.getProperty("loop.count"));
abortAfterTimeLapsedInSeconds = parseLong(properties.getProperty("abort.after.time.lapsed.in.seconds", "-1"));

calculateAndSetDelayBetweenTwoThreadsInSecs(rampUpPeriod);

logLoadingProperties();
}

public ExecutorServiceRunner(int numberOfThreads, int loopCount, int rampUpPeriod) {
this(numberOfThreads, loopCount, rampUpPeriod, -1);
}

public ExecutorServiceRunner(int numberOfThreads, int loopCount, int rampUpPeriod, long abortAfterTimeLapsedInSeconds) {
this.numberOfThreads = numberOfThreads;
this.loopCount = loopCount;
this.rampUpPeriod = rampUpPeriod;
this.abortAfterTimeLapsedInSeconds = abortAfterTimeLapsedInSeconds;

calculateAndSetDelayBetweenTwoThreadsInSecs(this.rampUpPeriod);
logLoadingProperties();
Expand All @@ -69,12 +78,23 @@ public void runRunnables() {
throw new RuntimeException("No runnable(s) was found to run. You can add one or more runnables using 'addRunnable(Runnable runnable)'");
}

long startTime = System.currentTimeMillis();
ExecutorService executorService = newFixedThreadPool(numberOfThreads);

try {
for (int i = 0; i < loopCount; i++) {
// Check timeout before each loop iteration
if (isTimeoutExceeded(startTime)) {
throw new RuntimeException("Load test aborted: abort.after.time.lapsed.in.seconds of " + abortAfterTimeLapsedInSeconds + " seconds exceeded");
}

runnables.stream().forEach(thisFunction -> {
for (int j = 0; j < numberOfThreads; j++) {
// Check timeout before executing each thread
if (isTimeoutExceeded(startTime)) {
throw new RuntimeException("Load test aborted: abort.after.time.lapsed.in.seconds of " + abortAfterTimeLapsedInSeconds + " seconds exceeded");
}

try {
LOGGER.debug("Waiting for the next test flight to adjust the overall ramp up time, " +
"waiting time in the transit now = " + delayBetweenTwoThreadsInMilliSecs);
Expand All @@ -83,6 +103,11 @@ public void runRunnables() {
throw new RuntimeException(e);
}

// Check timeout again after sleep
if (isTimeoutExceeded(startTime)) {
throw new RuntimeException("Load test aborted: abort.after.time.lapsed.in.seconds of " + abortAfterTimeLapsedInSeconds + " seconds exceeded");
}

LOGGER.debug(Thread.currentThread().getName() + " Executor - *Start... Time = " + now());

executorService.execute(thisFunction);
Expand All @@ -96,10 +121,24 @@ public void runRunnables() {
} finally {
executorService.shutdown();
while (!executorService.isTerminated()) {
// Check timeout while waiting for tasks to finish
if (abortAfterTimeLapsedInSeconds > 0) {
long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
if (elapsedSeconds >= abortAfterTimeLapsedInSeconds) {
LOGGER.warn("Timeout reached while waiting for tasks to complete. Initiating shutdown...");
executorService.shutdownNow();
throw new RuntimeException("Load test aborted: abort.after.time.lapsed.in.seconds of " + abortAfterTimeLapsedInSeconds + " seconds exceeded");
}
}
// --------------------------------------
// wait for all tasks to finish execution
// --------------------------------------
//LOGGER.info("Still waiting for all threads to complete execution...");
try {
Thread.sleep(100); // Small sleep to prevent busy-waiting
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
LOGGER.debug("**Finished executing all threads**");
}
Expand All @@ -110,13 +149,24 @@ public void runRunnablesMulti() {
throw new RuntimeException("No runnable(s) was found to run. You can add one or more runnables using 'addRunnable(Runnable runnable)'");
}

long startTime = System.currentTimeMillis();
ExecutorService executorService = newFixedThreadPool(numberOfThreads);

try {
final AtomicInteger functionIndex = new AtomicInteger();

for (int i = 0; i < loopCount; i++) {
// Check timeout before each loop iteration
if (isTimeoutExceeded(startTime)) {
throw new RuntimeException("Load test aborted: abort.after.time.lapsed.in.seconds of " + abortAfterTimeLapsedInSeconds + " seconds exceeded");
}

for (int j = 0; j < numberOfThreads; j++) {
// Check timeout before executing each thread
if (isTimeoutExceeded(startTime)) {
throw new RuntimeException("Load test aborted: abort.after.time.lapsed.in.seconds of " + abortAfterTimeLapsedInSeconds + " seconds exceeded");
}

try {
LOGGER.debug("Waiting for the next test flight to adjust the overall ramp up time, " +
"waiting time in the transit now = " + delayBetweenTwoThreadsInMilliSecs);
Expand All @@ -125,6 +175,11 @@ public void runRunnablesMulti() {
throw new RuntimeException(e);
}

// Check timeout again after sleep
if (isTimeoutExceeded(startTime)) {
throw new RuntimeException("Load test aborted: abort.after.time.lapsed.in.seconds of " + abortAfterTimeLapsedInSeconds + " seconds exceeded");
}

LOGGER.debug(Thread.currentThread().getName() + " Executor - *Start... Time = " + now());

executorService.execute(runnables.get(functionIndex.getAndIncrement()));
Expand All @@ -142,10 +197,24 @@ public void runRunnablesMulti() {
} finally {
executorService.shutdown();
while (!executorService.isTerminated()) {
// Check timeout while waiting for tasks to finish
if (abortAfterTimeLapsedInSeconds > 0) {
long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
if (elapsedSeconds >= abortAfterTimeLapsedInSeconds) {
LOGGER.warn("Timeout reached while waiting for tasks to complete. Initiating shutdown...");
executorService.shutdownNow();
throw new RuntimeException("Load test aborted: abort.after.time.lapsed.in.seconds of " + abortAfterTimeLapsedInSeconds + " seconds exceeded");
}
}
// --------------------------------------
// wait for all tasks to finish execution
// --------------------------------------
//LOGGER.info("Still waiting for all threads to complete execution...");
try {
Thread.sleep(100); // Small sleep to prevent busy-waiting
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
LOGGER.warn("** Completed executing all virtual-user scenarios! **");
}
Expand All @@ -161,18 +230,29 @@ public void runCallableFutures() {
throw new RuntimeException("No callable(s) was found to run. You can add one or more callables using 'addCallable(Callable callable)'");
}

long startTime = System.currentTimeMillis();
ExecutorService executorService = newFixedThreadPool(numberOfThreads);

try {
executorService.invokeAll(callables).stream().forEach(future -> {
for (int j = 0; j < numberOfThreads; j++) {
// Check timeout before each thread execution
if (isTimeoutExceeded(startTime)) {
throw new RuntimeException("Load test aborted: abort.after.time.lapsed.in.seconds of " + abortAfterTimeLapsedInSeconds + " seconds exceeded");
}

try {
LOGGER.debug("Waiting in the transit for next test flight to adjust overall ramp up time, wait time now = " + delayBetweenTwoThreadsInMilliSecs);
sleep(delayBetweenTwoThreadsInMilliSecs.longValue());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

// Check timeout again after sleep
if (isTimeoutExceeded(startTime)) {
throw new RuntimeException("Load test aborted: abort.after.time.lapsed.in.seconds of " + abortAfterTimeLapsedInSeconds + " seconds exceeded");
}

LOGGER.debug(Thread.currentThread().getName() + " Future execution- Start.... Time = " + now());

execute(future);
Expand All @@ -185,8 +265,23 @@ public void runCallableFutures() {
} finally {
executorService.shutdown();
while (!executorService.isTerminated()) {
// Check timeout while waiting for tasks to finish
if (abortAfterTimeLapsedInSeconds > 0) {
long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
if (elapsedSeconds >= abortAfterTimeLapsedInSeconds) {
LOGGER.warn("Timeout reached while waiting for tasks to complete. Initiating shutdown...");
executorService.shutdownNow();
throw new RuntimeException("Load test aborted: abort.after.time.lapsed.in.seconds of " + abortAfterTimeLapsedInSeconds + " seconds exceeded");
}
}
// wait for all tasks to finish executing
// LOGGER.info("Still waiting for all threads to complete execution...");
try {
Thread.sleep(100); // Small sleep to prevent busy-waiting
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
LOGGER.warn("* Completed executing all virtual-user scenarios! *");
}
Expand Down Expand Up @@ -245,4 +340,13 @@ private void logLoadingProperties() {
"\n-----------------------------------\n");

}

private boolean isTimeoutExceeded(long startTime) {
if (abortAfterTimeLapsedInSeconds <= 0) {
return false; // No timeout configured
}

long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
return elapsedSeconds >= abortAfterTimeLapsedInSeconds;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package org.jsmart.zerocode.parallel;

import org.junit.Test;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.*;

public class ExecutorServiceRunnerTimeoutTest {

@Test(expected = RuntimeException.class)
public void testTimeoutFunctionality() {
// Create an ExecutorServiceRunner with 1 thread, 10 loops, 1 second ramp up
// and a timeout of 1 second
ExecutorServiceRunner runner = new ExecutorServiceRunner(1, 10, 1, 1); // 1 second timeout

// Add a runnable that will take some time to execute (more than timeout)
runner.addRunnable(() -> {
try {
// Sleep for 3 seconds which is more than the 1 second timeout
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

// This should throw a RuntimeException due to timeout
runner.runRunnables();
}

@Test
public void testTimeoutNotExceeded() {
// Create an ExecutorServiceRunner with 1 thread, 1 loop, 1 second ramp up
// and a timeout of 5 seconds
ExecutorServiceRunner runner = new ExecutorServiceRunner(1, 1, 1, 5); // 5 second timeout

AtomicBoolean taskExecuted = new AtomicBoolean(false);

// Add a runnable that completes before timeout
runner.addRunnable(() -> {
taskExecuted.set(true);
try {
// Sleep for 1 second which is less than the 5 second timeout
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

// This should complete without throwing an exception
runner.runRunnables();

assertTrue("Task should have been executed", taskExecuted.get());
}

@Test
public void testTimeoutWithMultipleRunnables() {
// Create an ExecutorServiceRunner with 2 threads, 2 loops, 1 second ramp up
// and a timeout of 1 second
ExecutorServiceRunner runner = new ExecutorServiceRunner(2, 2, 1, 1); // 1 second timeout

// Add runnables that will take more time than the timeout
for (int i = 0; i < 4; i++) {
runner.addRunnable(() -> {
try {
// Sleep for 2 seconds which is more than the 1 second timeout
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}

// This should throw a RuntimeException due to timeout
try {
runner.runRunnables();
fail("Expected RuntimeException due to timeout");
} catch (RuntimeException e) {
assertTrue("Exception message should mention timeout", e.getMessage().contains("abort.after.time.lapsed.in.seconds"));
}
}

@Test
public void testTimeoutDisabledWhenNegative() {
// Create an ExecutorServiceRunner with negative timeout value (disabled)
ExecutorServiceRunner runner = new ExecutorServiceRunner(1, 1, 1, -1); // No timeout

AtomicBoolean taskExecuted = new AtomicBoolean(false);

// Add a runnable that executes
runner.addRunnable(() -> {
taskExecuted.set(true);
try {
Thread.sleep(500); // Sleep for 0.5 seconds
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

// This should complete without throwing an exception
runner.runRunnables();

assertTrue("Task should have been executed", taskExecuted.get());
}
}