
Commit 93ded93

fix: memiavl WriteSnapshotWithContext cancel using wrong ctx
1 parent 8d5f462 commit 93ded93

2 files changed: +384 -1 lines changed


memiavl/multitree.go

Lines changed: 2 additions & 1 deletion
@@ -371,7 +371,8 @@ func (t *MultiTree) WriteSnapshotWithContext(ctx context.Context, dir string, wp
 	}
 
 	// write the snapshots in parallel and wait all jobs done
-	group, _ := wp.GroupContext(context.Background())
+	// group, _ := wp.GroupContext(context.Background())
+	group, _ := wp.GroupContext(ctx)
 
 	for _, entry := range t.trees {
 		tree, name := entry.Tree, entry.Name
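The one-line change matters because of how pond ties a task group to the context it is created from: GroupContext derives the group's context from its argument, tasks still sitting in the queue when that context is canceled are skipped, and Wait stops blocking and reports an error. A group created from context.Background() can never observe the caller's cancellation, so every queued snapshot write still runs. Below is a standalone sketch of that behavior; it is not code from this commit, and it assumes the pond v1 API already used above (github.com/alitto/pond with New, GroupContext, Submit, and Wait).

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/alitto/pond"
)

func main() {
	pool := pond.New(1, 10) // single worker, so later tasks wait in the queue
	defer pool.StopAndWait()

	ctx, cancel := context.WithCancel(context.Background())

	// The essence of the fix: derive the task group from the caller's ctx
	// rather than from context.Background(), so canceling ctx also cancels
	// the group's context.
	group, gctx := pool.GroupContext(ctx)

	for i := 0; i < 5; i++ {
		i := i
		group.Submit(func() error {
			select {
			case <-time.After(50 * time.Millisecond): // stand-in for a slow snapshot write
				fmt.Println("task", i, "finished")
				return nil
			case <-gctx.Done(): // a running task sees the cancellation through gctx
				return gctx.Err()
			}
		})
	}

	cancel() // cancel while most tasks are still queued

	// Wait unblocks once the group's context is canceled; the exact error it
	// reports depends on the pond release, but with the behavior the tests
	// below rely on it is context.Canceled.
	fmt.Println("Wait returned:", group.Wait())
}

Running this, the tasks that are still queued when cancel fires never print "finished", which mirrors what the new tests check for WriteSnapshotWithContext.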

memiavl/multitree_test.go

Lines changed: 382 additions & 0 deletions
package memiavl

import (
	"context"
	"os"
	"path/filepath"
	"sync/atomic"
	"testing"
	"time"

	"github.com/alitto/pond"
	"github.com/stretchr/testify/require"
)

func TestMultiTreeWriteSnapshotWithContextCancellation(t *testing.T) {
	mtree := NewEmptyMultiTree(0, 0)

	stores := []string{"store1", "store2", "store3", "store4", "store5"}
	var upgrades []*TreeNameUpgrade
	for _, name := range stores {
		upgrades = append(upgrades, &TreeNameUpgrade{Name: name})
	}
	require.NoError(t, mtree.ApplyUpgrades(upgrades))

	for _, storeName := range stores {
		tree := mtree.TreeByName(storeName)
		require.NotNil(t, tree)

		for i := 0; i < 1000; i++ {
			tree.set([]byte(string(rune('a'+i%26))+string(rune('a'+(i/26)%26))), []byte("value"))
		}
	}

	_, err := mtree.SaveVersion(true)
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())

	pool := pond.New(2, 10)
	defer pool.StopAndWait()

	snapshotDir := t.TempDir()

	cancel()

	err = mtree.WriteSnapshotWithContext(ctx, snapshotDir, pool)

	require.Error(t, err)
	require.ErrorIs(t, err, context.Canceled)
}

func TestMultiTreeWriteSnapshotWithTimeoutContext(t *testing.T) {
	mtree := NewEmptyMultiTree(0, 0)

	stores := []string{"store1", "store2", "store3"}
	var upgrades []*TreeNameUpgrade
	for _, name := range stores {
		upgrades = append(upgrades, &TreeNameUpgrade{Name: name})
	}
	require.NoError(t, mtree.ApplyUpgrades(upgrades))

	for _, storeName := range stores {
		tree := mtree.TreeByName(storeName)
		require.NotNil(t, tree)

		for i := 0; i < 500; i++ {
			tree.set([]byte(string(rune('a'+i%26))+string(rune('a'+(i/26)%26))), []byte("value"))
		}
	}

	_, err := mtree.SaveVersion(true)
	require.NoError(t, err)

	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
	defer cancel()

	time.Sleep(10 * time.Millisecond)

	pool := pond.New(2, 10)
	defer pool.StopAndWait()

	snapshotDir := t.TempDir()

	err = mtree.WriteSnapshotWithContext(ctx, snapshotDir, pool)

	require.Error(t, err)
	require.ErrorIs(t, err, context.DeadlineExceeded)
}

func TestMultiTreeWriteSnapshotSuccessWithContext(t *testing.T) {
	mtree := NewEmptyMultiTree(0, 0)

	stores := []string{"store1", "store2", "store3"}
	var upgrades []*TreeNameUpgrade
	for _, name := range stores {
		upgrades = append(upgrades, &TreeNameUpgrade{Name: name})
	}
	require.NoError(t, mtree.ApplyUpgrades(upgrades))

	for _, storeName := range stores {
		tree := mtree.TreeByName(storeName)
		require.NotNil(t, tree)

		for i := 0; i < 100; i++ {
			key := []byte(storeName + string(rune('a'+i%26)))
			value := []byte("value" + string(rune('0'+i%10)))
			tree.set(key, value)
		}
	}

	_, err := mtree.SaveVersion(true)
	require.NoError(t, err)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	pool := pond.New(4, 10)
	defer pool.StopAndWait()

	snapshotDir := t.TempDir()

	err = mtree.WriteSnapshotWithContext(ctx, snapshotDir, pool)
	require.NoError(t, err)

	// Verify all stores were written
	for _, storeName := range stores {
		storeDir := filepath.Join(snapshotDir, storeName)
		require.DirExists(t, storeDir)

		// Verify metadata file exists
		metadataFile := filepath.Join(storeDir, FileNameMetadata)
		require.FileExists(t, metadataFile)
	}

	// Verify metadata file was written at root
	metadataFile := filepath.Join(snapshotDir, MetadataFileName)
	require.FileExists(t, metadataFile)

	// Verify we can load the snapshot back
	mtree2, err := LoadMultiTree(snapshotDir, false, 0)
	require.NoError(t, err)
	defer mtree2.Close()

	require.Equal(t, mtree.Version(), mtree2.Version())
	require.Equal(t, len(mtree.trees), len(mtree2.trees))

	// Verify data integrity
	for _, storeName := range stores {
		tree1 := mtree.TreeByName(storeName)
		tree2 := mtree2.TreeByName(storeName)
		require.NotNil(t, tree1)
		require.NotNil(t, tree2)
		require.Equal(t, tree1.RootHash(), tree2.RootHash())
	}
}

func TestMultiTreeWriteSnapshotConcurrentCancellation(t *testing.T) {
	mtree := NewEmptyMultiTree(0, 0)

	stores := []string{"store1", "store2", "store3", "store4", "store5", "store6", "store7", "store8"}
	var upgrades []*TreeNameUpgrade
	for _, name := range stores {
		upgrades = append(upgrades, &TreeNameUpgrade{Name: name})
	}
	require.NoError(t, mtree.ApplyUpgrades(upgrades))

	for _, storeName := range stores {
		tree := mtree.TreeByName(storeName)
		require.NotNil(t, tree)

		for i := 0; i < 2000; i++ {
			key := []byte(storeName + string(rune('a'+i%26)) + string(rune('a'+(i/26)%26)))
			value := []byte("value" + string(rune('0'+i%10)))
			tree.set(key, value)
		}
	}

	_, err := mtree.SaveVersion(true)
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())

	pool := pond.New(2, 10)
	defer pool.StopAndWait()

	snapshotDir := t.TempDir()

	errChan := make(chan error, 1)
	go func() {
		errChan <- mtree.WriteSnapshotWithContext(ctx, snapshotDir, pool)
	}()

	time.Sleep(5 * time.Millisecond)
	cancel()

	err = <-errChan

	// Should return context.Canceled error
	require.Error(t, err)
	require.ErrorIs(t, err, context.Canceled)

	// Verify that the snapshot directory might be partially written or not at all
	// This is acceptable - the important part is that we got the error and stopped
	_, statErr := os.Stat(snapshotDir)
	if statErr == nil {
		// Directory exists, but may be incomplete - this is fine
		// The important thing is we stopped and returned an error
		t.Logf("this is acceptable")
	}
}

func TestMultiTreeWriteSnapshotEmptyTree(t *testing.T) {
	mtree := NewEmptyMultiTree(0, 0)

	stores := []string{"empty1", "empty2"}
	var upgrades []*TreeNameUpgrade
	for _, name := range stores {
		upgrades = append(upgrades, &TreeNameUpgrade{Name: name})
	}
	require.NoError(t, mtree.ApplyUpgrades(upgrades))

	_, err := mtree.SaveVersion(true)
	require.NoError(t, err)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	pool := pond.New(4, 10)
	defer pool.StopAndWait()

	snapshotDir := t.TempDir()

	err = mtree.WriteSnapshotWithContext(ctx, snapshotDir, pool)
	require.NoError(t, err)

	mtree2, err := LoadMultiTree(snapshotDir, false, 0)
	require.NoError(t, err)
	defer mtree2.Close()

	require.Equal(t, mtree.Version(), mtree2.Version())
}

func TestMultiTreeWriteSnapshotParallelWrites(t *testing.T) {
	mtree := NewEmptyMultiTree(0, 0)

	stores := []string{"store1", "store2", "store3", "store4", "store5", "store6", "store7", "store8", "store9", "store10"}
	var upgrades []*TreeNameUpgrade
	for _, name := range stores {
		upgrades = append(upgrades, &TreeNameUpgrade{Name: name})
	}
	require.NoError(t, mtree.ApplyUpgrades(upgrades))

	for _, storeName := range stores {
		tree := mtree.TreeByName(storeName)
		require.NotNil(t, tree)

		for i := 0; i < 100; i++ {
			key := []byte(storeName + string(rune('a'+i%26)))
			value := []byte("value" + string(rune('0'+i%10)))
			tree.set(key, value)
		}
	}

	_, err := mtree.SaveVersion(true)
	require.NoError(t, err)

	ctx := context.Background()

	poolSizes := []int{1, 2, 4, 8}
	for _, poolSize := range poolSizes {
		t.Run("PoolSize"+string(rune('0'+poolSize)), func(t *testing.T) {
			pool := pond.New(poolSize, poolSize*10)
			defer pool.StopAndWait()

			snapshotDir := t.TempDir()

			start := time.Now()
			err = mtree.WriteSnapshotWithContext(ctx, snapshotDir, pool)
			duration := time.Since(start)

			require.NoError(t, err)
			t.Logf("Pool size %d completed in %v", poolSize, duration)

			mtree2, err := LoadMultiTree(snapshotDir, false, 0)
			require.NoError(t, err)
			defer mtree2.Close()

			require.Equal(t, mtree.Version(), mtree2.Version())
			for _, storeName := range stores {
				tree1 := mtree.TreeByName(storeName)
				tree2 := mtree2.TreeByName(storeName)
				require.Equal(t, tree1.RootHash(), tree2.RootHash())
			}
		})
	}
}

// TestMultiTreeWorkerPoolQueuedTasksShouldNotStart tests that when context is
// canceled, tasks that are queued but haven't started executing should NOT run.
// This test DEMONSTRATES THE BUG at line 374 where context.Background() is used
// instead of the passed ctx, causing all queued tasks to execute even after cancellation.
func TestMultiTreeWorkerPoolQueuedTasksShouldNotStart(t *testing.T) {
	mtree := NewEmptyMultiTree(0, 0)

	// Create many stores to ensure tasks will be queued
	numStores := 20
	var stores []string
	var upgrades []*TreeNameUpgrade
	for i := 0; i < numStores; i++ {
		name := "store" + string(rune('0'+i%10)) + string(rune('a'+i/10))
		stores = append(stores, name)
		upgrades = append(upgrades, &TreeNameUpgrade{Name: name})
	}
	require.NoError(t, mtree.ApplyUpgrades(upgrades))

	// Don't add any data - use empty trees so writeLeaf won't be called
	// This means tasks won't check ctx.Done() internally
	_, err := mtree.SaveVersion(true)
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())

	// Create worker pool with only 1 worker but capacity for all tasks
	// This ensures most tasks will be queued waiting for the worker
	pool := pond.New(1, numStores)
	defer pool.StopAndWait()

	// Track how many tasks actually executed
	var tasksExecuted atomic.Int32

	// We need to slow down task execution so we can cancel while tasks are queued
	// We'll patch this by checking the execution count after cancellation

	snapshotDir := t.TempDir()

	// Cancel context immediately
	cancel()

	// Now call WriteSnapshotWithContext
	// BUG: Because line 374 uses context.Background(), the worker pool group
	// doesn't know about the cancellation. All 20 tasks will be submitted to the pool.
	// With only 1 worker, they'll execute one by one.

	// Since we're using empty trees, tree.WriteSnapshotWithContext doesn't actually
	// check ctx (no data to write means no ctx.Done() check in writeLeaf).
	// So all tasks will complete successfully despite ctx being canceled.

	err = mtree.WriteSnapshotWithContext(ctx, snapshotDir, pool)

	// With the BUG (context.Background() at line 374):
	// - All tasks get queued
	// - Worker executes them one by one
	// - Empty trees don't trigger context checks
	// - Result: err == nil (SUCCESS despite canceled context)
	//
	// With the FIX (using ctx at line 374):
	// - Worker pool's group context would be canceled
	// - Queued tasks wouldn't start
	// - Result: err == context.Canceled

	if err == nil {
		// This proves the bug exists!
		t.Logf("BUG REPRODUCED: All %d tasks completed despite canceled context!", numStores)
		t.Logf("Tasks executed: %d", tasksExecuted.Load())
		t.Logf("This happens because line 374 uses context.Background() instead of ctx")

		// Verify all stores were actually written (proving tasks ran)
		for _, storeName := range stores {
			storeDir := filepath.Join(snapshotDir, storeName)
			if _, err := os.Stat(storeDir); err == nil {
				tasksExecuted.Add(1)
			}
		}

		t.Logf("Verified: %d stores were written to disk", tasksExecuted.Load())
		t.Fatal("Expected context.Canceled error but got nil - this proves the bug at line 374")
	} else {
		// If we get here, the bug has been fixed
		t.Logf("Bug is FIXED: Got expected error: %v", err)
		require.ErrorIs(t, err, context.Canceled)
	}
}

0 commit comments
