Skip to content

Commit 79046db

Browse files
committed
Ported core part of original logstash-plugins#410 PR
1 parent 0833d76 commit 79046db

File tree

1 file changed

+79
-5
lines changed

1 file changed

+79
-5
lines changed

src/main/java/org/logstash/beats/BeatsParser.java

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33

44
import io.netty.buffer.ByteBuf;
55
import io.netty.buffer.ByteBufOutputStream;
6+
import io.netty.buffer.PooledByteBufAllocator;
67
import io.netty.channel.ChannelHandlerContext;
78
import io.netty.handler.codec.ByteToMessageDecoder;
9+
import io.netty.handler.codec.DecoderException;
810
import org.apache.logging.log4j.LogManager;
911
import org.apache.logging.log4j.Logger;
1012

@@ -14,12 +16,14 @@
1416
import java.util.HashMap;
1517
import java.util.List;
1618
import java.util.Map;
19+
import java.util.concurrent.TimeUnit;
1720
import java.util.zip.Inflater;
1821
import java.util.zip.InflaterOutputStream;
1922

2023

2124
public class BeatsParser extends ByteToMessageDecoder {
2225
private final static Logger logger = LogManager.getLogger(BeatsParser.class);
26+
private final static long maxDirectMemory = io.netty.util.internal.PlatformDependent.maxDirectMemory();
2327

2428
private Batch batch;
2529

@@ -45,15 +49,18 @@ private enum States {
4549
private int requiredBytes = 0;
4650
private int sequence = 0;
4751
private boolean decodingCompressedBuffer = false;
52+
private long usedDirectMemory;
53+
private boolean closeCalled = false;
4854

4955
@Override
5056
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws InvalidFrameProtocolException, IOException {
51-
if(!hasEnoughBytes(in)) {
52-
if (decodingCompressedBuffer){
57+
if (!hasEnoughBytes(in)) {
58+
if (decodingCompressedBuffer) {
5359
throw new InvalidFrameProtocolException("Insufficient bytes in compressed content to decode: " + currentState);
5460
}
5561
return;
5662
}
63+
usedDirectMemory = ((PooledByteBufAllocator) ctx.alloc()).metric().usedDirectMemory();
5764

5865
switch (currentState) {
5966
case READ_HEADER: {
@@ -178,6 +185,13 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
178185

179186
case READ_COMPRESSED_FRAME: {
180187
logger.trace("Running: READ_COMPRESSED_FRAME");
188+
189+
if (usedDirectMemory + requiredBytes > maxDirectMemory * 0.90) {
190+
ctx.channel().config().setAutoRead(false);
191+
ctx.close();
192+
closeCalled = true;
193+
throw new IOException("not enough memory to decompress this from " + ctx.channel().id());
194+
}
181195
inflateCompressedFrame(ctx, in, (buffer) -> {
182196
transition(States.READ_HEADER);
183197

@@ -188,16 +202,20 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
188202
}
189203
} finally {
190204
decodingCompressedBuffer = false;
205+
ctx.channel().config().setAutoRead(false);
206+
ctx.channel().eventLoop().schedule(() -> {
207+
ctx.channel().config().setAutoRead(true);
208+
}, 5, TimeUnit.MILLISECONDS);
191209
transition(States.READ_HEADER);
192210
}
193211
});
194212
break;
195213
}
196214
case READ_JSON: {
197215
logger.trace("Running: READ_JSON");
198-
((V2Batch)batch).addMessage(sequence, in, requiredBytes);
199-
if(batch.isComplete()) {
200-
if(logger.isTraceEnabled()) {
216+
((V2Batch) batch).addMessage(sequence, in, requiredBytes);
217+
if (batch.isComplete()) {
218+
if (logger.isTraceEnabled()) {
201219
logger.trace("Sending batch size: " + this.batch.size() + ", windowSize: " + batch.getBatchSize() + " , seq: " + sequence);
202220
}
203221
out.add(batch);
@@ -256,6 +274,62 @@ private void batchComplete() {
256274
batch = null;
257275
}
258276

277+
@Override
278+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
279+
//System.out.println("channelRead(" + ctx.channel().isActive() + ": " + ctx.channel().id() + ":" + currentState + ":" + decodingCompressedBuffer);
280+
if (closeCalled) {
281+
((ByteBuf) msg).release();
282+
//if(batch != null) batch.release();
283+
return;
284+
}
285+
usedDirectMemory = ((PooledByteBufAllocator) ctx.alloc()).metric().usedDirectMemory();
286+
287+
// If we're just beginning a new frame on this channel,
288+
// don't accumulate more data for 25 ms if usage of direct memory is above 20%
289+
//
290+
// The goal here is to avoid thundering herd: many beats connecting and sending data
291+
// at the same time. As some channels progress to other states they'll use more memory
292+
// but also give it back once a full batch is read.
293+
if ((!decodingCompressedBuffer) && (this.currentState != States.READ_COMPRESSED_FRAME)) {
294+
if (usedDirectMemory > (maxDirectMemory * 0.40)) {
295+
ctx.channel().config().setAutoRead(false);
296+
//System.out.println("pausing reads on " + ctx.channel().id());
297+
ctx.channel().eventLoop().schedule(() -> {
298+
//System.out.println("resuming reads on " + ctx.channel().id());
299+
ctx.channel().config().setAutoRead(true);
300+
}, 200, TimeUnit.MILLISECONDS);
301+
} else {
302+
//System.out.println("no need to pause reads on " + ctx.channel().id());
303+
}
304+
} else if (usedDirectMemory > maxDirectMemory * 0.90) {
305+
ctx.channel().config().setAutoRead(false);
306+
ctx.close();
307+
closeCalled = true;
308+
((ByteBuf) msg).release();
309+
if (batch != null) batch.release();
310+
throw new IOException("about to explode, cut them all down " + ctx.channel().id());
311+
}
312+
super.channelRead(ctx, msg);
313+
}
314+
315+
@Override
316+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
317+
System.out.println(cause.getClass().toString() + ":" + ctx.channel().id().toString() + ":" + this.currentState + "|" + cause.getMessage());
318+
if (cause instanceof DecoderException) {
319+
ctx.channel().config().setAutoRead(false);
320+
if (!closeCalled) ctx.close();
321+
} else if (cause instanceof OutOfMemoryError) {
322+
cause.printStackTrace();
323+
ctx.channel().config().setAutoRead(false);
324+
if (!closeCalled) ctx.close();
325+
} else if (cause instanceof IOException) {
326+
ctx.channel().config().setAutoRead(false);
327+
if (!closeCalled) ctx.close();
328+
} else {
329+
super.exceptionCaught(ctx, cause);
330+
}
331+
}
332+
259333
@FunctionalInterface
260334
private interface CheckedConsumer<T> {
261335
void accept(T t) throws IOException;

0 commit comments

Comments
 (0)