@@ -859,23 +859,17 @@ public void reset() {
859
859
860
860
861
861
/**
862
- * An abstract base implementation of {@link NestedMatcher} that looks for
863
- * a specific delimiter in a {@link DataBuffer}.
864
- * <p>
865
- * Uses a thread-local buffer to scan data in chunks, reducing memory
866
- * allocations and improving performance when processing large buffers.
867
- * </p>
862
+ * Base {@link NestedMatcher} implementation that scans a {@link DataBuffer}
863
+ * for a specific delimiter.
868
864
*
869
- * <p>
870
- * Each matcher keeps its own match state, so it is intended for
871
- * single-threaded use. The thread-local buffer ensures that multiple
872
- * threads can run their own matchers independently without interfering.
873
- * </p>
865
+ * <p>Relies on a per-instance reusable buffer to scan data in chunks,
866
+ * minimizing allocations and improving performance for large or streaming data.</p>
874
867
*
875
- * <p>
876
- * Subclasses can extend this class to add custom matching behavior while
877
- * reusing the built-in delimiter tracking and scanning logic.
878
- * </p>
868
+ * <p>Each matcher maintains its own state and buffer, ensuring safe use
869
+ * in reactive pipelines where execution may shift across threads.</p>
870
+ *
871
+ * <p>Subclasses may extend this class to customize matching strategies
872
+ * while reusing the built-in delimiter tracking and scanning logic.</p>
879
873
*
880
874
* @see NestedMatcher
881
875
* @see DataBuffer
@@ -886,8 +880,7 @@ private abstract static class AbstractNestedMatcher implements NestedMatcher {
886
880
887
881
private int matches = 0 ;
888
882
889
- // Thread-local chunk buffer to avoid per-call allocations
890
- private static final ThreadLocal <byte []> LOCAL_BUFFER = ThreadLocal .withInitial (() -> new byte [8 * 1024 ]);
883
+ private final byte [] localBuffer = new byte [8 * 1024 ]; // Reusable buffer per matcher instance
891
884
892
885
protected AbstractNestedMatcher (byte [] delimiter ) {
893
886
this .delimiter = delimiter ;
@@ -901,33 +894,29 @@ protected int getMatches() {
901
894
return this .matches ;
902
895
}
903
896
904
- protected static void releaseLocalBuffer () {
905
- LOCAL_BUFFER .remove ();
906
- }
907
-
908
897
@ Override
909
898
public int match (DataBuffer dataBuffer ) {
910
899
final int readPos = dataBuffer .readPosition ();
911
900
final int writePos = dataBuffer .writePosition ();
912
901
final int length = writePos - readPos ;
913
902
914
- final byte [] delimiter0 = this .delimiter ;
915
- final int delimiterLen = delimiter0 .length ;
916
- final byte delimiter1 = delimiter0 [0 ];
903
+ final byte [] delimiterBytes = this .delimiter ;
904
+ final int delimiterLength = delimiterBytes .length ;
905
+ final byte delimiterFirstByte = delimiterBytes [0 ];
917
906
918
- int matchIndex = this .matches ;
919
-
920
- final byte [] chunk = LOCAL_BUFFER .get ();
907
+ final byte [] chunk = localBuffer ;
921
908
final int chunkSize = Math .min (chunk .length , length );
922
909
910
+ int matchIndex = this .matches ;
911
+
923
912
try {
924
913
for (int offset = 0 ; offset < length ; offset += chunkSize ) {
925
914
int currentChunkSize = Math .min (chunkSize , length - offset );
926
915
927
916
dataBuffer .readPosition (readPos + offset );
928
917
dataBuffer .read (chunk , 0 , currentChunkSize );
929
918
930
- matchIndex = processChunk (chunk , currentChunkSize , delimiter0 , delimiterLen , delimiter1 , matchIndex , readPos , offset );
919
+ matchIndex = processChunk (chunk , currentChunkSize , delimiterBytes , delimiterLength , delimiterFirstByte , matchIndex , readPos , offset );
931
920
if (matchIndex < 0 ) {
932
921
return -(matchIndex + 1 ); // found, returning actual position
933
922
}
@@ -938,21 +927,20 @@ public int match(DataBuffer dataBuffer) {
938
927
}
939
928
finally {
940
929
dataBuffer .readPosition (readPos ); // restore original position
941
- releaseLocalBuffer ();
942
930
}
943
931
}
944
932
945
- private int processChunk (byte [] chunk , int currentChunkSize , byte [] delimiter0 , int delimiterLen , byte delimiter1 , int matchIndex , int readPos , int offset ) {
933
+ private int processChunk (byte [] chunk , int currentChunkSize , byte [] delimiterBytes , int delimiterLen , byte delimiterFirstByte , int matchIndex , int readPos , int offset ) {
946
934
int i = 0 ;
947
935
while (i < currentChunkSize ) {
948
936
if (matchIndex == 0 ) {
949
- i = findNextCandidate (chunk , i , currentChunkSize , delimiter1 );
937
+ i = findNextCandidate (chunk , i , currentChunkSize , delimiterFirstByte );
950
938
if (i >= currentChunkSize ) {
951
939
return matchIndex ; // no candidate in this chunk
952
940
}
953
941
}
954
942
955
- matchIndex = updateMatchIndex (chunk [i ], delimiter0 , delimiterLen , delimiter1 , matchIndex );
943
+ matchIndex = updateMatchIndex (chunk [i ], delimiterBytes , delimiterLen , delimiterFirstByte , matchIndex );
956
944
if (matchIndex == -1 ) {
957
945
return -(readPos + offset + i + 1 ); // return found delimiter position (encoded as negative)
958
946
}
@@ -961,24 +949,24 @@ private int processChunk(byte[] chunk, int currentChunkSize, byte[] delimiter0,
961
949
return matchIndex ;
962
950
}
963
951
964
- private int findNextCandidate (byte [] chunk , int start , int limit , byte delimiter1 ) {
952
+ private int findNextCandidate (byte [] chunk , int start , int limit , byte delimiterFirstByte ) {
965
953
int j = start ;
966
- while (j < limit && chunk [j ] != delimiter1 ) {
954
+ while (j < limit && chunk [j ] != delimiterFirstByte ) {
967
955
j ++;
968
956
}
969
957
return j ;
970
958
}
971
959
972
- private int updateMatchIndex (byte b , byte [] delimiter0 , int delimiterLen , byte delimiter1 , int matchIndex ) {
973
- if (b == delimiter0 [matchIndex ]) {
960
+ private int updateMatchIndex (byte b , byte [] delimiterBytes , int delimiterLen , byte delimiterFirstByte , int matchIndex ) {
961
+ if (b == delimiterBytes [matchIndex ]) {
974
962
matchIndex ++;
975
963
if (matchIndex == delimiterLen ) {
976
964
reset ();
977
965
return -1 ;
978
966
}
979
967
}
980
968
else {
981
- matchIndex = (b == delimiter1 ) ? 1 : 0 ;
969
+ matchIndex = (b == delimiterFirstByte ) ? 1 : 0 ;
982
970
}
983
971
return matchIndex ;
984
972
}
0 commit comments