Skip to content

Commit acba512

Browse files
simon28082mangalaman93
authored andcommitted
Allow stream custom maxsize per batch
1 parent 16b63df commit acba512

File tree

2 files changed

+72
-1
lines changed

2 files changed

+72
-1
lines changed

stream.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,14 @@ type Stream struct {
6262
// Note: Calls to ChooseKey are concurrent.
6363
ChooseKey func(item *Item) bool
6464

65+
// MaxSize is the maximum allowed size of a stream batch. This is a soft limit
66+
// as a single list that is still over the limit will have to be sent as is since it
67+
// cannot be split further. This limit prevents the framework from creating batches
68+
// so big that sending them causes issues (e.g running into the max size gRPC limit).
69+
// If necessary, set it up before the Stream starts synchronisation
70+
// This is not a concurrency-safe setting
71+
MaxSize uint64
72+
6573
// KeyToList, similar to ChooseKey, is only invoked on the highest version of the value. It
6674
// is upto the caller to iterate over the versions and generate zero, one or more KVs. It
6775
// is expected that the user would advance the iterator to go through the versions of the
@@ -315,7 +323,7 @@ func (st *Stream) streamKVs(ctx context.Context) error {
315323
// Send the batch immediately if it already exceeds the maximum allowed size.
316324
// If the size of the batch exceeds maxStreamSize, break from the loop to
317325
// avoid creating a batch that is so big that certain limits are reached.
318-
if batch.LenNoPadding() > int(maxStreamSize) {
326+
if batch.LenNoPadding() > int(st.MaxSize) {
319327
break loop
320328
}
321329
select {
@@ -452,6 +460,7 @@ func (db *DB) newStream() *Stream {
452460
db: db,
453461
NumGo: db.opt.NumGoroutines,
454462
LogPrefix: "Badger.Stream",
463+
MaxSize: maxStreamSize,
455464
}
456465
}
457466

stream_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,68 @@ func TestStream(t *testing.T) {
171171
require.NoError(t, db.Close())
172172
}
173173

174+
175+
func TestStreamMaxSize(t *testing.T) {
176+
if !*manual {
177+
t.Skip("Skipping test meant to be run manually.")
178+
return
179+
}
180+
// Set the maxStreamSize to 1MB for the duration of the test so that the it can use a smaller
181+
// dataset than it would otherwise need.
182+
originalMaxStreamSize := maxStreamSize
183+
maxStreamSize = 1 << 20
184+
defer func() {
185+
maxStreamSize = originalMaxStreamSize
186+
}()
187+
188+
testSize := int(1e6)
189+
dir, err := os.MkdirTemp("", "badger-big-test")
190+
require.NoError(t, err)
191+
defer removeDir(dir)
192+
193+
db, err := OpenManaged(DefaultOptions(dir))
194+
require.NoError(t, err)
195+
196+
var count int
197+
wb := db.NewWriteBatchAt(5)
198+
for _, prefix := range []string{"p0", "p1", "p2"} {
199+
for i := 1; i <= testSize; i++ {
200+
require.NoError(t, wb.SetEntry(NewEntry(keyWithPrefix(prefix, i), value(i))))
201+
count++
202+
}
203+
}
204+
require.NoError(t, wb.Flush())
205+
206+
stream := db.NewStreamAt(math.MaxUint64)
207+
stream.LogPrefix = "Testing"
208+
c := &collector{}
209+
stream.Send = c.Send
210+
211+
// default value
212+
require.Equal(t, stream.MaxSize, maxStreamSize)
213+
214+
// reset maxsize
215+
stream.MaxSize = 1024 * 1024 * 50
216+
217+
// Test case 1. Retrieve everything.
218+
err = stream.Orchestrate(ctxb)
219+
require.NoError(t, err)
220+
require.Equal(t, 3*testSize, len(c.kv), "Expected 30000. Got: %d", len(c.kv))
221+
222+
m := make(map[string]int)
223+
for _, kv := range c.kv {
224+
prefix, ki := keyToInt(kv.Key)
225+
expected := value(ki)
226+
require.Equal(t, expected, kv.Value)
227+
m[prefix]++
228+
}
229+
require.Equal(t, 3, len(m))
230+
for pred, count := range m {
231+
require.Equal(t, testSize, count, "Count mismatch for pred: %s", pred)
232+
}
233+
require.NoError(t, db.Close())
234+
}
235+
174236
func TestStreamWithThreadId(t *testing.T) {
175237
dir, err := os.MkdirTemp("", "badger-test")
176238
require.NoError(t, err)

0 commit comments

Comments
 (0)