diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 4c3c430e954..4079e4d9a3c 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -61,6 +61,7 @@ * * @author Arjen Poutsma * @author Brian Clozel + * @author Nabil Fawwaz Elqayyim * @since 5.0 */ public abstract class DataBufferUtils { @@ -859,7 +860,20 @@ public void reset() { /** - * Base class for a {@link NestedMatcher}. + * Base {@link NestedMatcher} implementation that scans a {@link DataBuffer} + * for a specific delimiter. + * + *
Relies on a per-instance reusable buffer to scan data in chunks, + * minimizing allocations and improving performance for large or streaming data.
+ * + *Each matcher maintains its own state and buffer, ensuring safe use + * in reactive pipelines where execution may shift across threads.
+ * + *Subclasses may extend this class to customize matching strategies + * while reusing the built-in delimiter tracking and scanning logic.
+ * + * @see NestedMatcher + * @see DataBuffer */ private abstract static class AbstractNestedMatcher implements NestedMatcher { @@ -867,6 +881,7 @@ private abstract static class AbstractNestedMatcher implements NestedMatcher { private int matches = 0; + private final byte[] localBuffer = new byte[8 * 1024]; // Reusable buffer per matcher instance protected AbstractNestedMatcher(byte[] delimiter) { this.delimiter = delimiter; @@ -882,14 +897,79 @@ protected int getMatches() { @Override public int match(DataBuffer dataBuffer) { - for (int pos = dataBuffer.readPosition(); pos < dataBuffer.writePosition(); pos++) { - byte b = dataBuffer.getByte(pos); - if (match(b)) { + final int readPos = dataBuffer.readPosition(); + final int writePos = dataBuffer.writePosition(); + final int length = writePos - readPos; + + final byte[] delimiterBytes = this.delimiter; + final int delimiterLength = delimiterBytes.length; + final byte delimiterFirstByte = delimiterBytes[0]; + + final byte[] chunk = localBuffer; + final int chunkSize = Math.min(chunk.length, length); + + int matchIndex = this.matches; + + try { + for (int offset = 0; offset < length; offset += chunkSize) { + int currentChunkSize = Math.min(chunkSize, length - offset); + + dataBuffer.readPosition(readPos + offset); + dataBuffer.read(chunk, 0, currentChunkSize); + + matchIndex = processChunk(chunk, currentChunkSize, delimiterBytes, delimiterLength, delimiterFirstByte, matchIndex, readPos, offset); + if (matchIndex < 0) { + return -(matchIndex + 1); // found, returning actual position + } + } + + this.matches = matchIndex; + return -1; + } + finally { + dataBuffer.readPosition(readPos); // restore original position + } + } + + private int processChunk(byte[] chunk, int currentChunkSize, byte[] delimiterBytes, int delimiterLen, byte delimiterFirstByte, int matchIndex, int readPos, int offset) { + int i = 0; + while (i < currentChunkSize) { + if (matchIndex == 0) { + i = findNextCandidate(chunk, i, currentChunkSize, delimiterFirstByte); + if (i >= currentChunkSize) { + return matchIndex; // no candidate in this chunk + } + } + + matchIndex = updateMatchIndex(chunk[i], delimiterBytes, delimiterLen, delimiterFirstByte, matchIndex); + if (matchIndex == -1) { + return -(readPos + offset + i + 1); // return found delimiter position (encoded as negative) + } + i++; + } + return matchIndex; + } + + private int findNextCandidate(byte[] chunk, int start, int limit, byte delimiterFirstByte) { + int j = start; + while (j < limit && chunk[j] != delimiterFirstByte) { + j++; + } + return j; + } + + private int updateMatchIndex(byte b, byte[] delimiterBytes, int delimiterLen, byte delimiterFirstByte, int matchIndex) { + if (b == delimiterBytes[matchIndex]) { + matchIndex++; + if (matchIndex == delimiterLen) { reset(); - return pos; + return -1; } } - return -1; + else { + matchIndex = (b == delimiterFirstByte) ? 1 : 0; + } + return matchIndex; } @Override @@ -1026,7 +1106,7 @@ private static class ReadCompletionHandler implements CompletionHandler