Skip to content

Commit 10a09e6

Browse files
authored
Allow stream custom maxsize per batch (#2063)
1 parent ca7c9d4 commit 10a09e6

File tree

3 files changed

+72
-2
lines changed

3 files changed

+72
-2
lines changed

.github/workflows/ci-golang-lint.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ jobs:
2626
with:
2727
go-version: ${{ env.GOVERSION }}
2828
- name: golang-lint
29-
uses: golangci/golangci-lint-action@v6
29+
uses: golangci/golangci-lint-action@v6.1.1
3030
with:
3131
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
3232
version: latest

stream.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,14 @@ type Stream struct {
6262
// Note: Calls to ChooseKey are concurrent.
6363
ChooseKey func(item *Item) bool
6464

65+
// MaxSize is the maximum allowed size of a stream batch. This is a soft limit
66+
// as a single list that is still over the limit will have to be sent as is since it
67+
// cannot be split further. This limit prevents the framework from creating batches
68+
// so big that sending them causes issues (e.g. running into the max size gRPC limit).
69+
// If necessary, set it before the Stream starts synchronisation.
70+
// This setting is not concurrency-safe.
71+
MaxSize uint64
72+
6573
// KeyToList, similar to ChooseKey, is only invoked on the highest version of the value. It
6674
// is up to the caller to iterate over the versions and generate zero, one or more KVs. It
6775
// is expected that the user would advance the iterator to go through the versions of the
@@ -315,7 +323,7 @@ func (st *Stream) streamKVs(ctx context.Context) error {
315323
// Send the batch immediately if it already exceeds the maximum allowed size.
316324
// If the size of the batch exceeds maxStreamSize, break from the loop to
317325
// avoid creating a batch that is so big that certain limits are reached.
318-
if batch.LenNoPadding() > int(maxStreamSize) {
326+
if uint64(batch.LenNoPadding()) > st.MaxSize {
319327
break loop
320328
}
321329
select {
@@ -452,6 +460,7 @@ func (db *DB) newStream() *Stream {
452460
db: db,
453461
NumGo: db.opt.NumGoroutines,
454462
LogPrefix: "Badger.Stream",
463+
MaxSize: maxStreamSize,
455464
}
456465
}
457466

stream_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,67 @@ func TestStream(t *testing.T) {
171171
require.NoError(t, db.Close())
172172
}
173173

174+
func TestStreamMaxSize(t *testing.T) {
175+
if !*manual {
176+
t.Skip("Skipping test meant to be run manually.")
177+
return
178+
}
179+
// Set the maxStreamSize to 1MB for the duration of the test so that it can use a smaller
180+
// dataset than it would otherwise need.
181+
originalMaxStreamSize := maxStreamSize
182+
maxStreamSize = 1 << 20
183+
defer func() {
184+
maxStreamSize = originalMaxStreamSize
185+
}()
186+
187+
testSize := int(1e6)
188+
dir, err := os.MkdirTemp("", "badger-big-test")
189+
require.NoError(t, err)
190+
defer removeDir(dir)
191+
192+
db, err := OpenManaged(DefaultOptions(dir))
193+
require.NoError(t, err)
194+
195+
var count int
196+
wb := db.NewWriteBatchAt(5)
197+
for _, prefix := range []string{"p0", "p1", "p2"} {
198+
for i := 1; i <= testSize; i++ {
199+
require.NoError(t, wb.SetEntry(NewEntry(keyWithPrefix(prefix, i), value(i))))
200+
count++
201+
}
202+
}
203+
require.NoError(t, wb.Flush())
204+
205+
stream := db.NewStreamAt(math.MaxUint64)
206+
stream.LogPrefix = "Testing"
207+
c := &collector{}
208+
stream.Send = c.Send
209+
210+
// default value
211+
require.Equal(t, stream.MaxSize, maxStreamSize)
212+
213+
// reset maxsize
214+
stream.MaxSize = 1024 * 1024 * 50
215+
216+
// Test case 1. Retrieve everything.
217+
err = stream.Orchestrate(ctxb)
218+
require.NoError(t, err)
219+
require.Equal(t, 3*testSize, len(c.kv), "Expected 30000. Got: %d", len(c.kv))
220+
221+
m := make(map[string]int)
222+
for _, kv := range c.kv {
223+
prefix, ki := keyToInt(kv.Key)
224+
expected := value(ki)
225+
require.Equal(t, expected, kv.Value)
226+
m[prefix]++
227+
}
228+
require.Equal(t, 3, len(m))
229+
for pred, count := range m {
230+
require.Equal(t, testSize, count, "Count mismatch for pred: %s", pred)
231+
}
232+
require.NoError(t, db.Close())
233+
}
234+
174235
func TestStreamWithThreadId(t *testing.T) {
175236
dir, err := os.MkdirTemp("", "badger-test")
176237
require.NoError(t, err)

0 commit comments

Comments
 (0)