Skip to content

Commit 063fa89

Browse files
authored
ESQL: Add a stream view to BreakingBytesRefBuilder (#133148)
Adds a `.stream()` method to get a `StreamOutput` view into `BreakingBytesRefBuilder`. This is useful in `COLLECT` because we need to write `XContent` into a `BreakingBytesRefBuilder` per document.
1 parent 5add49e commit 063fa89

File tree

2 files changed

+130
-19
lines changed

2 files changed

+130
-19
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/BreakingBytesRefBuilder.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.lucene.util.BytesRef;
1313
import org.apache.lucene.util.RamUsageEstimator;
1414
import org.elasticsearch.common.breaker.CircuitBreaker;
15+
import org.elasticsearch.common.io.stream.StreamOutput;
1516
import org.elasticsearch.core.Releasable;
1617

1718
/**
@@ -162,6 +163,13 @@ public long ramBytesUsed() {
162163
return SHALLOW_SIZE + bytesArrayRamBytesUsed(bytes.bytes.length);
163164
}
164165

166+
/**
167+
* Builds a {@link StreamOutput} view into the {@link BreakingBytesRefBuilder}.
168+
*/
169+
public StreamOutput stream() {
170+
return new Stream();
171+
}
172+
165173
private static long bytesArrayRamBytesUsed(long capacity) {
166174
return RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + capacity);
167175
}
@@ -170,4 +178,32 @@ private static long bytesArrayRamBytesUsed(long capacity) {
170178
public void close() {
171179
breaker.addWithoutBreaking(-ramBytesUsed());
172180
}
181+
182+
private class Stream extends StreamOutput {
183+
@Override
184+
public long position() {
185+
return length();
186+
}
187+
188+
@Override
189+
public void writeByte(byte b) {
190+
append(b);
191+
}
192+
193+
@Override
194+
public void writeBytes(byte[] b, int offset, int length) {
195+
append(b, offset, length);
196+
}
197+
198+
@Override
199+
public void flush() {}
200+
201+
/**
202+
* Closes this stream to further operations. NOOP because we don't want to
203+
* close the builder when we close.
204+
*/
205+
@Override
206+
public void close() {}
207+
}
208+
173209
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/BreakingBytesRefBuilderTests.java

Lines changed: 94 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,16 @@
1414
import org.apache.lucene.util.RamUsageEstimator;
1515
import org.elasticsearch.common.breaker.CircuitBreaker;
1616
import org.elasticsearch.common.breaker.CircuitBreakingException;
17+
import org.elasticsearch.common.io.stream.BytesRefStreamOutput;
18+
import org.elasticsearch.common.io.stream.StreamOutput;
1719
import org.elasticsearch.common.unit.ByteSizeValue;
1820
import org.elasticsearch.common.util.MockBigArrays;
1921
import org.elasticsearch.test.ESTestCase;
22+
import org.elasticsearch.xcontent.XContentBuilder;
23+
import org.elasticsearch.xcontent.json.JsonXContent;
2024

25+
import java.io.IOException;
26+
import java.io.UncheckedIOException;
2127
import java.util.function.Supplier;
2228

2329
import static org.hamcrest.Matchers.equalTo;
@@ -34,11 +40,6 @@ public void testAddByte() {
3440
testAgainstOracle(() -> new TestIteration() {
3541
final byte b = randomByte();
3642

37-
@Override
38-
public int size() {
39-
return 1;
40-
}
41-
4243
@Override
4344
public void applyToBuilder(BreakingBytesRefBuilder builder) {
4445
builder.append(b);
@@ -55,11 +56,6 @@ public void testAddBytesRef() {
5556
testAgainstOracle(() -> new TestIteration() {
5657
final BytesRef ref = new BytesRef(randomAlphaOfLengthBetween(1, 100));
5758

58-
@Override
59-
public int size() {
60-
return ref.length;
61-
}
62-
6359
@Override
6460
public void applyToBuilder(BreakingBytesRefBuilder builder) {
6561
builder.append(ref);
@@ -90,11 +86,6 @@ public void testGrow() {
9086
final int length = between(1, 100);
9187
final byte b = randomByte();
9288

93-
@Override
94-
public int size() {
95-
return length;
96-
}
97-
9889
@Override
9990
public void applyToBuilder(BreakingBytesRefBuilder builder) {
10091
builder.grow(builder.length() + length);
@@ -111,9 +102,90 @@ public void applyToOracle(BytesRefBuilder oracle) {
111102
});
112103
}
113104

114-
interface TestIteration {
115-
int size();
105+
public void testStream() {
106+
testAgainstOracle(() -> switch (between(0, 3)) {
107+
case 0 -> new XContentTestIteration() {
108+
@Override
109+
protected void apply(XContentBuilder builder) throws IOException {
110+
// Noop
111+
}
112+
113+
@Override
114+
public String toString() {
115+
return "noop";
116+
}
117+
};
118+
case 1 -> new XContentTestIteration() {
119+
private final String value = randomAlphanumericOfLength(10);
120+
121+
@Override
122+
protected void apply(XContentBuilder builder) throws IOException {
123+
builder.value(value);
124+
}
125+
126+
@Override
127+
public String toString() {
128+
return '"' + value + '"';
129+
}
130+
};
131+
case 2 -> new XContentTestIteration() {
132+
private final long value = randomLong();
133+
134+
@Override
135+
protected void apply(XContentBuilder builder) throws IOException {
136+
builder.value(value);
137+
}
138+
139+
@Override
140+
public String toString() {
141+
return Long.toString(value);
142+
}
143+
};
144+
case 3 -> new XContentTestIteration() {
145+
private final String name = randomAlphanumericOfLength(5);
146+
private final String value = randomAlphanumericOfLength(5);
147+
148+
@Override
149+
protected void apply(XContentBuilder builder) throws IOException {
150+
builder.startObject().field(name, value).endObject();
151+
}
152+
153+
@Override
154+
public String toString() {
155+
return name + ": " + value;
156+
}
157+
};
158+
default -> throw new UnsupportedOperationException();
159+
});
160+
}
161+
162+
private abstract static class XContentTestIteration implements TestIteration {
163+
protected abstract void apply(XContentBuilder builder) throws IOException;
164+
165+
@Override
166+
public void applyToBuilder(BreakingBytesRefBuilder builder) {
167+
applyToStream(builder.stream());
168+
}
169+
170+
@Override
171+
public void applyToOracle(BytesRefBuilder oracle) {
172+
BytesRefStreamOutput out = new BytesRefStreamOutput();
173+
applyToStream(out);
174+
oracle.append(out.get());
175+
}
176+
177+
private void applyToStream(StreamOutput out) {
178+
try {
179+
try (XContentBuilder builder = new XContentBuilder(JsonXContent.jsonXContent, out)) {
180+
apply(builder);
181+
}
182+
} catch (IOException e) {
183+
throw new UncheckedIOException(e);
184+
}
185+
}
186+
}
116187

188+
interface TestIteration {
117189
void applyToBuilder(BreakingBytesRefBuilder builder);
118190

119191
void applyToOracle(BytesRefBuilder oracle);
@@ -131,7 +203,11 @@ private void testAgainstOracle(Supplier<TestIteration> iterations) {
131203
assertThat(builder.bytesRefView(), equalTo(oracle.get()));
132204
while (true) {
133205
TestIteration iteration = iterations.get();
134-
int targetSize = builder.length() + iteration.size();
206+
207+
int prevOracle = oracle.length();
208+
iteration.applyToOracle(oracle);
209+
int size = oracle.length() - prevOracle;
210+
int targetSize = builder.length() + size;
135211
boolean willResize = targetSize >= builder.bytes().length;
136212
if (willResize) {
137213
long resizeMemoryUsage = BreakingBytesRefBuilder.SHALLOW_SIZE + ramForArray(builder.bytes().length);
@@ -143,7 +219,6 @@ private void testAgainstOracle(Supplier<TestIteration> iterations) {
143219
}
144220
}
145221
iteration.applyToBuilder(builder);
146-
iteration.applyToOracle(oracle);
147222
assertThat(builder.bytesRefView(), equalTo(oracle.get()));
148223
assertThat(
149224
builder.ramBytesUsed(),

0 commit comments

Comments
 (0)