Skip to content

Commit 4ca0713

Browse files
authored
fix(core): Fix race condition in mutation map (#9473)
1 parent 12f338f commit 4ca0713

File tree

5 files changed

+146
-12
lines changed

5 files changed

+146
-12
lines changed

posting/index_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ func TestTokensTable(t *testing.T) {
173173
attr := x.AttrInRootNamespace("name")
174174
key := x.DataKey(attr, 1)
175175
l, err := getNew(key, ps, math.MaxUint64, false)
176+
l.mutationMap.readTs = 1
176177
require.NoError(t, err)
177178

178179
edge := &pb.DirectedEdge{

posting/list.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ func (mm *MutableLayer) setCurrentEntries(ts uint64, pl *pb.PostingList) {
155155
return
156156
}
157157
if mm.readTs != 0 {
158-
x.AssertTrue(mm.readTs == ts)
158+
x.AssertTruef(mm.readTs == ts, "List object reused for a different transaction %d %d", mm.readTs, ts)
159159
}
160160

161161
mm.readTs = ts
@@ -353,7 +353,7 @@ func (mm *MutableLayer) populateUidMap(pl *pb.PostingList) {
353353
// insertPosting inserts a new posting in the mutable layers. It updates the currentUids map.
354354
func (mm *MutableLayer) insertPosting(mpost *pb.Posting, hasCountIndex bool) {
355355
if mm.readTs != 0 {
356-
x.AssertTrue(mpost.StartTs == mm.readTs)
356+
x.AssertTruef(mpost.StartTs == mm.readTs, "Diffrenent read ts and start ts found %d %d", mpost.StartTs, mm.readTs)
357357
}
358358

359359
mm.readTs = mpost.StartTs
@@ -410,7 +410,7 @@ func (mm *MutableLayer) print() string {
410410
mm.deleteAllMarker)
411411
}
412412

413-
func (l *List) print() string {
413+
// Print returns a human-readable, single-line description of the list: its
// minTs, its plist (formatted with %+v) and the string form of its mutation
// map. It is used to add context to error messages.
func (l *List) Print() string {
414414
return fmt.Sprintf("minTs: %d, plist: %+v, mutationMap: %s", l.minTs, l.plist, l.mutationMap.print())
415415
}
416416

@@ -699,6 +699,7 @@ func (it *pIterator) posting() *pb.Posting {
699699
it.pidx++
700700
}
701701
it.uidPosting.Uid = uid
702+
it.uidPosting.ValType = pb.Posting_UID
702703
return it.uidPosting
703704
}
704705

@@ -1006,6 +1007,14 @@ func (l *List) setMutationAfterCommit(startTs, commitTs uint64, pl *pb.PostingLi
10061007
}
10071008
l.mutationMap.committedUidsTime = x.Max(l.mutationMap.committedUidsTime, commitTs)
10081009

1010+
if refresh {
1011+
newMap := make(map[uint64]*pb.Posting, len(l.mutationMap.committedUids))
1012+
for uid, post := range l.mutationMap.committedUids {
1013+
newMap[uid] = post
1014+
}
1015+
l.mutationMap.committedUids = newMap
1016+
}
1017+
10091018
for _, mpost := range pl.Postings {
10101019
if hasDeleteAll(mpost) {
10111020
l.mutationMap.deleteAllMarker = commitTs
@@ -1144,7 +1153,7 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e
11441153
// pitr iterates through immutable postings
11451154
err = pitr.seek(l, afterUid, deleteBelowTs)
11461155
if err != nil {
1147-
return errors.Wrapf(err, "cannot initialize iterator when calling List.iterate %v", l.print())
1156+
return errors.Wrapf(err, "cannot initialize iterator when calling List.iterate %v", l.Print())
11481157
}
11491158

11501159
loop:
@@ -1995,7 +2004,10 @@ func (l *List) findStaticValue(readTs uint64) *pb.PostingList {
19952004

19962005
// If we reach here, that means that there was no entry in mutation map which is less than readTs. That
19972006
// means we need to return l.plist
1998-
return l.plist
2007+
if l.plist != nil && len(l.plist.Postings) > 0 {
2008+
return l.plist
2009+
}
2010+
return nil
19992011
}
20002012

20012013
// Value returns the default value from the posting list. The default value is

posting/list_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ func TestGetSinglePosting(t *testing.T) {
124124

125125
res, err := l.StaticValue(1)
126126
require.NoError(t, err)
127+
fmt.Println(res, res == nil)
127128
require.Equal(t, res == nil, true)
128129

129130
l.plist = create_pl(1, 1)
@@ -254,7 +255,7 @@ func TestAddMutation_jchiu1(t *testing.T) {
254255
key := x.DataKey(x.AttrInRootNamespace(x.AttrInRootNamespace("value")), 12)
255256
ol, err := GetNoStore(key, math.MaxUint64)
256257
require.NoError(t, err)
257-
258+
ol.mutationMap.setTs(1)
258259
// Set value to cars and merge to BadgerDB.
259260
edge := &pb.DirectedEdge{
260261
Value: []byte("cars"),
@@ -297,6 +298,7 @@ func TestAddMutation_DelSet(t *testing.T) {
297298
key := x.DataKey(x.AttrInRootNamespace(x.AttrInRootNamespace("value")), 1534)
298299
ol, err := GetNoStore(key, math.MaxUint64)
299300
require.NoError(t, err)
301+
ol.mutationMap.setTs(1)
300302

301303
// DO sp*, don't commit
302304
// Del a value cars and but don't merge.
@@ -312,6 +314,7 @@ func TestAddMutation_DelSet(t *testing.T) {
312314
Value: []byte("newcars"),
313315
}
314316
ol1, err := GetNoStore(key, math.MaxUint64)
317+
ol1.mutationMap.setTs(2)
315318
require.NoError(t, err)
316319
txn = &Txn{StartTs: 2}
317320
addMutationHelper(t, ol1, edge, Set, txn)
@@ -323,6 +326,7 @@ func TestAddMutation_DelSet(t *testing.T) {
323326
func TestAddMutation_DelRead(t *testing.T) {
324327
key := x.DataKey(x.AttrInRootNamespace(x.AttrInRootNamespace("value")), 1543)
325328
ol, err := GetNoStore(key, math.MaxUint64)
329+
ol.mutationMap.setTs(1)
326330
require.NoError(t, err)
327331

328332
// Set value to newcars, and commit it
@@ -361,6 +365,7 @@ func TestAddMutation_DelRead(t *testing.T) {
361365
func TestAddMutation_jchiu2(t *testing.T) {
362366
key := x.DataKey(x.AttrInRootNamespace(x.AttrInRootNamespace("value")), 15)
363367
ol, err := GetNoStore(key, math.MaxUint64)
368+
ol.mutationMap.setTs(1)
364369
require.NoError(t, err)
365370

366371
// Del a value cars and but don't merge.
@@ -383,6 +388,7 @@ func TestAddMutation_jchiu2(t *testing.T) {
383388
func TestAddMutation_jchiu2_Commit(t *testing.T) {
384389
key := x.DataKey(x.AttrInRootNamespace(x.AttrInRootNamespace("value")), 16)
385390
ol, err := GetNoStore(key, math.MaxUint64)
391+
ol.mutationMap.setTs(1)
386392
require.NoError(t, err)
387393

388394
// Del a value cars and but don't merge.
@@ -408,6 +414,7 @@ func TestAddMutation_jchiu2_Commit(t *testing.T) {
408414
func TestAddMutation_jchiu3(t *testing.T) {
409415
key := x.DataKey(x.AttrInRootNamespace("value"), 29)
410416
ol, err := GetNoStore(key, math.MaxUint64)
417+
ol.mutationMap.setTs(1)
411418
require.NoError(t, err)
412419

413420
// Set value to cars and merge to BadgerDB.
@@ -448,6 +455,7 @@ func TestAddMutation_jchiu3(t *testing.T) {
448455
func TestAddMutation_mrjn1(t *testing.T) {
449456
key := x.DataKey(x.AttrInRootNamespace("value"), 21)
450457
ol, err := GetNoStore(key, math.MaxUint64)
458+
ol.mutationMap.setTs(1)
451459
require.NoError(t, err)
452460

453461
// Set a value cars and merge.

posting/mvcc.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,7 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
662662
l := new(List)
663663
l.key = key
664664
l.plist = new(pb.PostingList)
665+
l.mutationMap = newMutableLayer()
665666
l.minTs = 0
666667

667668
// We use the following block of code to trigger incremental rollup on this key.
@@ -708,9 +709,6 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
708709
return err
709710
}
710711
pl.CommitTs = item.Version()
711-
if l.mutationMap == nil {
712-
l.mutationMap = newMutableLayer()
713-
}
714712
l.mutationMap.insertCommittedPostings(pl)
715713
return nil
716714
})
@@ -783,9 +781,6 @@ func (ml *MemoryLayer) readFromDisk(key []byte, pstore *badger.DB, readTs uint64
783781
return l, err
784782
}
785783
if readUids {
786-
if l.mutationMap == nil {
787-
l.mutationMap = newMutableLayer()
788-
}
789784
if err := l.calculateUids(); err != nil {
790785
return nil, err
791786
}

worker/sort_test.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,124 @@ func TestEmptyTypeSchema(t *testing.T) {
9090
x.ParseNamespaceAttr(types[0].TypeName)
9191
}
9292

93+
// TestDeleteSetWithVarEdgeCorruptsData is a regression test for a uid edge
// that is deleted from one entity and set on another within one transaction.
//
// Sequence of events (timestamps are the literal values used below):
//  1. start=1, commit=3: set person/room values and the edge John -office-> room.
//  2. ts=4: rollup of the John/office data key.
//  3. start=6, commit=8: delete John -office-> room, set Amanda -office-> room.
//  4. read at ts=10: John's forward list must be empty and the reverse list of
//     the room must contain only Amanda.
func TestDeleteSetWithVarEdgeCorruptsData(t *testing.T) {
94+
// Set up a temporary directory for the Badger DB; it is removed when the test returns.
95+
dir, err := os.MkdirTemp("", "storetest_")
96+
require.NoError(t, err)
97+
defer os.RemoveAll(dir)
98+
99+
opt := badger.DefaultOptions(dir)
100+
ps, err := badger.OpenManaged(opt)
101+
require.NoError(t, err)
102+
posting.Init(ps, 0, false)
103+
Init(ps)
104+
105+
// Set schema: two hash-indexed string predicates and a uid predicate with
// @reverse and @count, so the office edge also maintains reverse and count data.
106+
schemaTxt := `
107+
room: string @index(hash) @upsert .
108+
person: string @index(hash) @upsert .
109+
office: uid @reverse @count .
110+
`
111+
err = schema.ParseBytes([]byte(schemaTxt), 1)
112+
require.NoError(t, err)
113+
114+
ctx := context.Background()
115+
attrRoom := x.AttrInRootNamespace("room")
116+
attrPerson := x.AttrInRootNamespace("person")
117+
attrOffice := x.AttrInRootNamespace("office")
118+
119+
uidRoom := uint64(1)
120+
uidJohn := uint64(2)
121+
122+
// runMutation (local helper) applies every edge in a transaction registered at
// startTs, commits it to disk at commitTs, flushes the writer and then updates
// the cached keys.
// Note: the call inside the loop takes (ctx, edge, txn), so it resolves to a
// different, outer runMutation; the local name is not yet in scope inside its
// own initializer, so this is not a recursive call.
runMutation := func(startTs, commitTs uint64, edges []*pb.DirectedEdge) {
123+
txn := posting.Oracle().RegisterStartTs(startTs)
124+
for _, edge := range edges {
125+
require.NoError(t, runMutation(ctx, edge, txn))
126+
}
127+
txn.Update()
128+
writer := posting.NewTxnWriter(ps)
129+
require.NoError(t, txn.CommitToDisk(writer, commitTs))
130+
require.NoError(t, writer.Flush())
131+
txn.UpdateCachedKeys(commitTs)
132+
}
133+
134+
// Initial mutation: Set John → Leopard
135+
runMutation(1, 3, []*pb.DirectedEdge{
136+
{
137+
Entity: uidJohn,
138+
Attr: attrPerson,
139+
Value: []byte("John Smith"),
140+
ValueType: pb.Posting_STRING,
141+
Op: pb.DirectedEdge_SET,
142+
},
143+
{
144+
Entity: uidRoom,
145+
Attr: attrRoom,
146+
Value: []byte("Leopard"),
147+
ValueType: pb.Posting_STRING,
148+
Op: pb.DirectedEdge_SET,
149+
},
150+
{
151+
Entity: uidJohn,
152+
Attr: attrOffice,
153+
ValueId: uidRoom,
154+
ValueType: pb.Posting_UID,
155+
Op: pb.DirectedEdge_SET,
156+
},
157+
})
158+
159+
// NOTE(review): rollup is defined elsewhere in the package; presumably it rolls
// up the posting list stored under key at ts 4, i.e. between the two commits —
// confirm against the helper.
key := x.DataKey(attrOffice, uidJohn)
160+
rollup(t, key, ps, 4)
161+
162+
// Second mutation: Remove John from Leopard, assign Amanda
163+
uidAmanda := uint64(3)
164+
165+
runMutation(6, 8, []*pb.DirectedEdge{
166+
{
167+
Entity: uidJohn,
168+
Attr: attrOffice,
169+
ValueId: uidRoom,
170+
ValueType: pb.Posting_UID,
171+
Op: pb.DirectedEdge_DEL,
172+
},
173+
{
174+
Entity: uidAmanda,
175+
Attr: attrPerson,
176+
Value: []byte("Amanda Anderson"),
177+
ValueType: pb.Posting_STRING,
178+
Op: pb.DirectedEdge_SET,
179+
},
180+
{
181+
Entity: uidAmanda,
182+
Attr: attrOffice,
183+
ValueId: uidRoom,
184+
ValueType: pb.Posting_UID,
185+
Op: pb.DirectedEdge_SET,
186+
},
187+
})
188+
189+
// Read and validate at ts 10 (after both commits): Amanda assigned, John unassigned
190+
txnRead := posting.Oracle().RegisterStartTs(10)
191+
192+
list, err := txnRead.Get(key)
193+
require.NoError(t, err)
194+
195+
uids, err := list.Uids(posting.ListOptions{ReadTs: 10})
196+
require.NoError(t, err)
197+
198+
// This assertion FAILS in the broken case where both Amanda and John are assigned
199+
require.Equal(t, 0, len(uids.Uids), "John should no longer have an office assigned")
200+
201+
// The reverse list of the room must reflect the same state as the forward edges.
keyRev := x.ReverseKey(attrOffice, uidRoom)
202+
listRev, err := txnRead.Get(keyRev)
203+
require.NoError(t, err)
204+
205+
reverseUids, err := listRev.Uids(posting.ListOptions{ReadTs: 10})
206+
require.NoError(t, err)
207+
208+
require.Equal(t, []uint64{uidAmanda}, reverseUids.Uids, "Only Amanda should be assigned on reverse edge")
209+
}
210+
93211
func TestGetScalarList(t *testing.T) {
94212
dir, err := os.MkdirTemp("", "storetest_")
95213
x.Check(err)

0 commit comments

Comments
 (0)