Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@
* THE SOFTWARE.
*/
package com.iluwatar.logaggregation;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

/**
* Responsible for collecting and buffering logs from different services. Once the logs reach a
* certain threshold or after a certain time interval, they are flushed to the central log store.
Expand All @@ -41,30 +43,49 @@
public class LogAggregator {

private static final int BUFFER_THRESHOLD = 3;
private static final int FLUSH_INTERVAL_SECONDS = 5;
private static final int SHUTDOWN_TIMEOUT_SECONDS = 10;

private final CentralLogStore centralLogStore;
private final ConcurrentLinkedQueue<LogEntry> buffer = new ConcurrentLinkedQueue<>();
private final BlockingQueue<LogEntry> buffer = new LinkedBlockingQueue<>();
private final LogLevel minLogLevel;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);
private final AtomicInteger logCount = new AtomicInteger(0);

private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private volatile boolean running = true;
/**
* constructor of LogAggregator.
*
* @param centralLogStore central log store implement
* @param minLogLevel min log level to store log
*/
public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
this.centralLogStore = centralLogStore;
this.centralLogStore = centralLogStore;
this.minLogLevel = minLogLevel;
startBufferFlusher();
startPeriodicFlusher();

// Add shutdown hook for graceful termination
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
stop();
} catch (InterruptedException e) {
LOGGER.warn("Shutdown interrupted", e);
Thread.currentThread().interrupt();
}
}));
}

/**
* Collects a given log entry, and filters it by the defined log level.
*
* @param logEntry The log entry to collect.
*/
public void collectLog(LogEntry logEntry) {
public void collectLog(LogEntry logEntry) {
if (!running) {
LOGGER.warn("LogAggregator is shutting down. Skipping log entry.");
return;
}

if (logEntry.getLevel() == null || minLogLevel == null) {
LOGGER.warn("Log level or threshold level is null. Skipping.");
return;
Expand All @@ -75,10 +96,17 @@ public void collectLog(LogEntry logEntry) {
return;
}

buffer.offer(logEntry);
// BlockingQueue.offer() is non-blocking and thread-safe
boolean added = buffer.offer(logEntry);
if (!added) {
LOGGER.warn("Failed to add log entry to buffer - queue may be full");
return;
}

// Check if immediate flush is needed due to threshold
if (logCount.incrementAndGet() >= BUFFER_THRESHOLD) {
flushBuffer();
// Schedule immediate flush instead of blocking current thread
scheduledExecutor.execute(this::flushBuffer);
}
}

Expand All @@ -87,33 +115,126 @@ public void collectLog(LogEntry logEntry) {
*
* @throws InterruptedException If any thread has interrupted the current thread.
*/
public void stop() throws InterruptedException {
executorService.shutdownNow();
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
LOGGER.error("Log aggregator did not terminate.");
public void stop() throws InterruptedException {
LOGGER.info("Stopping LogAggregator...");
running = false;

// Shutdown the scheduler gracefully
scheduledExecutor.shutdown();

try {
// Wait for scheduled tasks to complete
if (!scheduledExecutor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
LOGGER.warn("Scheduler did not terminate gracefully, forcing shutdown");
scheduledExecutor.shutdownNow();

// Wait a bit more for tasks to respond to interruption
if (!scheduledExecutor.awaitTermination(2, TimeUnit.SECONDS)) {
LOGGER.error("Scheduler did not terminate after forced shutdown");
}
}
} finally {
// Final flush of any remaining logs
flushBuffer();
shutdownLatch.countDown();
LOGGER.info("LogAggregator stopped successfully");
}
flushBuffer();
}



/**
* Waits for the LogAggregator to complete shutdown.
* Useful for testing or controlled shutdown scenarios.
*
* @throws InterruptedException If any thread has interrupted the current thread.
*/
public void awaitShutdown() throws InterruptedException {
shutdownLatch.await();
}


private void flushBuffer() {
LogEntry logEntry;
while ((logEntry = buffer.poll()) != null) {
centralLogStore.storeLog(logEntry);
logCount.decrementAndGet();
if (!running && buffer.isEmpty()) {
return;
}

try {
List<LogEntry> batch = new ArrayList<>();
int drained = 0;

// Drain up to a reasonable batch size for efficiency
LogEntry logEntry;
while ((logEntry = buffer.poll()) != null && drained < 100) {
batch.add(logEntry);
drained++;
}

if (!batch.isEmpty()) {
LOGGER.debug("Flushing {} log entries to central store", batch.size());

// Process the batch
for (LogEntry entry : batch) {
centralLogStore.storeLog(entry);
logCount.decrementAndGet();
}

LOGGER.debug("Successfully flushed {} log entries", batch.size());
}
} catch (Exception e) {
LOGGER.error("Error occurred while flushing buffer", e);
}
}

private void startBufferFlusher() {
executorService.execute(

/**
* Starts the periodic buffer flusher using ScheduledExecutorService.
* This eliminates the busy-waiting loop with Thread.sleep().
*/
private void startPeriodicFlusher() {
scheduledExecutor.scheduleAtFixedRate(
() -> {
while (!Thread.currentThread().isInterrupted()) {
if (running) {
try {
Thread.sleep(5000); // Flush every 5 seconds.
flushBuffer();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
LOGGER.error("Error in periodic flush", e);
}
}
});
},
FLUSH_INTERVAL_SECONDS, // Initial delay
FLUSH_INTERVAL_SECONDS, // Period
TimeUnit.SECONDS
);

LOGGER.info("Periodic log flusher started with interval of {} seconds", FLUSH_INTERVAL_SECONDS);
}
/**
* Gets the current number of buffered log entries.
* Useful for monitoring and testing.
*
* @return Current buffer size
*/
public int getBufferSize() {
return buffer.size();
}

/**
* Gets the current log count.
* Useful for monitoring and testing.
*
* @return Current log count
*/
public int getLogCount() {
return logCount.get();
}

/**
* Checks if the LogAggregator is currently running.
*
* @return true if running, false if stopped or stopping
*/
public boolean isRunning() {
return running;
}
}
Loading
Loading