diff --git a/.github/workflows/ci-golang-lint.yml b/.github/workflows/ci-golang-lint.yml index 21161ecc1..fe3c0f684 100644 --- a/.github/workflows/ci-golang-lint.yml +++ b/.github/workflows/ci-golang-lint.yml @@ -26,7 +26,7 @@ jobs: with: go-version: ${{ env.GOVERSION }} - name: golang-lint - uses: golangci/golangci-lint-action@v6 + uses: golangci/golangci-lint-action@v6.1.1 with: # Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version. version: latest diff --git a/stream.go b/stream.go index 21c5e9926..d755ba99d 100644 --- a/stream.go +++ b/stream.go @@ -62,6 +62,14 @@ type Stream struct { // Note: Calls to ChooseKey are concurrent. ChooseKey func(item *Item) bool + // MaxSize is the maximum allowed size of a stream batch. This is a soft limit + // as a single list that is still over the limit will have to be sent as is since it + // cannot be split further. This limit prevents the framework from creating batches + // so big that sending them causes issues (e.g. running into the max size gRPC limit). + // If necessary, set it before the Stream starts synchronisation. + // This is not a concurrency-safe setting. + MaxSize uint64 + // KeyToList, similar to ChooseKey, is only invoked on the highest version of the value. It // is upto the caller to iterate over the versions and generate zero, one or more KVs. It // is expected that the user would advance the iterator to go through the versions of the @@ -315,7 +323,7 @@ func (st *Stream) streamKVs(ctx context.Context) error { // Send the batch immediately if it already exceeds the maximum allowed size. // If the size of the batch exceeds maxStreamSize, break from the loop to // avoid creating a batch that is so big that certain limits are reached. 
-			if batch.LenNoPadding() > int(maxStreamSize) {
+			if uint64(batch.LenNoPadding()) > st.MaxSize {
 				break loop
 			}
 			select {
@@ -452,6 +460,7 @@ func (db *DB) newStream() *Stream {
 		db:        db,
 		NumGo:     db.opt.NumGoroutines,
 		LogPrefix: "Badger.Stream",
+		MaxSize:   maxStreamSize,
 	}
 }
 
diff --git a/stream_test.go b/stream_test.go
index 10af62836..a910f544f 100644
--- a/stream_test.go
+++ b/stream_test.go
@@ -171,6 +171,67 @@ func TestStream(t *testing.T) {
 	require.NoError(t, db.Close())
 }
 
+// TestStreamMaxSize verifies the default and an overridden Stream.MaxSize on a full stream run.
+func TestStreamMaxSize(t *testing.T) {
+	if !*manual {
+		t.Skip("Skipping test meant to be run manually.")
+	}
+	// Set the maxStreamSize to 1MB for the duration of the test so that it can use a smaller
+	// dataset than it would otherwise need.
+	originalMaxStreamSize := maxStreamSize
+	maxStreamSize = 1 << 20
+	defer func() {
+		maxStreamSize = originalMaxStreamSize
+	}()
+
+	testSize := int(1e6)
+	dir, err := os.MkdirTemp("", "badger-big-test")
+	require.NoError(t, err)
+	defer removeDir(dir)
+
+	db, err := OpenManaged(DefaultOptions(dir))
+	require.NoError(t, err)
+
+	var count int
+	wb := db.NewWriteBatchAt(5)
+	for _, prefix := range []string{"p0", "p1", "p2"} {
+		for i := 1; i <= testSize; i++ {
+			require.NoError(t, wb.SetEntry(NewEntry(keyWithPrefix(prefix, i), value(i))))
+			count++
+		}
+	}
+	require.NoError(t, wb.Flush())
+
+	stream := db.NewStreamAt(math.MaxUint64)
+	stream.LogPrefix = "Testing"
+	c := &collector{}
+	stream.Send = c.Send
+
+	// A new stream must pick up the package-level default (expected value goes first).
+	require.Equal(t, maxStreamSize, stream.MaxSize)
+
+	// Override the limit for this stream only.
+	stream.MaxSize = 1024 * 1024 * 50
+
+	// Test case 1. Retrieve everything.
+	err = stream.Orchestrate(ctxb)
+	require.NoError(t, err)
+	require.Equal(t, count, len(c.kv), "Expected %d. Got: %d", count, len(c.kv))
+
+	m := make(map[string]int)
+	for _, kv := range c.kv {
+		prefix, ki := keyToInt(kv.Key)
+		expected := value(ki)
+		require.Equal(t, expected, kv.Value)
+		m[prefix]++
+	}
+	require.Equal(t, 3, len(m))
+	for pred, got := range m {
+		require.Equal(t, testSize, got, "Count mismatch for pred: %s", pred)
+	}
+	require.NoError(t, db.Close())
+}
+
 func TestStreamWithThreadId(t *testing.T) {
 	dir, err := os.MkdirTemp("", "badger-test")
 	require.NoError(t, err)