@@ -755,13 +755,6 @@ var requestPool = sync.Pool{
755755}
756756
757757func (db * DB ) writeToLSM (b * request ) error {
758- // We should check the length of b.Prts and b.Entries only when badger is not
759- // running in InMemory mode. In InMemory mode, we don't write anything to the
760- // value log and that's why the length of b.Ptrs will always be zero.
761- if ! db .opt .InMemory && len (b .Ptrs ) != len (b .Entries ) {
762- return errors .Errorf ("Ptrs and Entries don't match: %+v" , b )
763- }
764-
765758 for i , entry := range b .Entries {
766759 var err error
767760 if entry .skipVlogAndSetThreshold (db .valueThreshold ()) {
@@ -826,6 +819,7 @@ func (db *DB) writeRequests(reqs []*request) error {
826819 }
827820 count += len (b .Entries )
828821 var i uint64
822+ var err error
829823 for err = db .ensureRoomForWrite (); err == errNoRoom ; err = db .ensureRoomForWrite () {
830824 i ++
831825 if i % 100 == 0 {
@@ -1010,10 +1004,16 @@ func arenaSize(opt Options) int64 {
10101004
10111005// buildL0Table builds a new table from the memtable.
10121006func buildL0Table (ft flushTask , bopts table.Options ) * table.Builder {
1013- iter := ft .mt .sl .NewIterator ()
1007+ var iter y.Iterator
1008+ if ft .itr != nil {
1009+ iter = ft .itr
1010+ } else {
1011+ iter = ft .mt .sl .NewUniIterator (false )
1012+ }
10141013 defer iter .Close ()
1014+
10151015 b := table .NewTableBuilder (bopts )
1016- for iter .SeekToFirst (); iter .Valid (); iter .Next () {
1016+ for iter .Rewind (); iter .Valid (); iter .Next () {
10171017 if len (ft .dropPrefixes ) > 0 && hasAnyPrefixes (iter .Key (), ft .dropPrefixes ) {
10181018 continue
10191019 }
@@ -1029,16 +1029,13 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
10291029
10301030type flushTask struct {
10311031 mt * memTable
1032+ itr y.Iterator
10321033 dropPrefixes [][]byte
10331034}
10341035
10351036// handleFlushTask must be run serially.
10361037func (db * DB ) handleFlushTask (ft flushTask ) error {
1037- // There can be a scenario, when empty memtable is flushed.
1038- if ft .mt .sl .Empty () {
1039- return nil
1040- }
1041-
1038+ // ft.mt could be nil with ft.itr being the valid field.
10421039 bopts := buildTableOptions (db )
10431040 builder := buildL0Table (ft , bopts )
10441041 defer builder .Close ()
@@ -1074,11 +1071,48 @@ func (db *DB) handleFlushTask(ft flushTask) error {
10741071func (db * DB ) flushMemtable (lc * z.Closer ) error {
10751072 defer lc .Done ()
10761073
1074+ var sz int64
1075+ var itrs []y.Iterator
1076+ var mts []* memTable
1077+ slurp := func () {
1078+ for {
1079+ select {
1080+ case more := <- db .flushChan :
1081+ if more .mt == nil {
1082+ return
1083+ }
1084+ sl := more .mt .sl
1085+ itrs = append (itrs , sl .NewUniIterator (false ))
1086+ mts = append (mts , more .mt )
1087+
1088+ sz += sl .MemSize ()
1089+ if sz > db .opt .MemTableSize {
1090+ return
1091+ }
1092+ default :
1093+ return
1094+ }
1095+ }
1096+ }
1097+
10771098 for ft := range db .flushChan {
10781099 if ft .mt == nil {
10791100 // We close db.flushChan now, instead of sending a nil ft.mt.
10801101 continue
10811102 }
1103+ sz = ft .mt .sl .MemSize ()
1104+ // Reset of itrs, mts etc. is being done below.
1105+ y .AssertTrue (len (itrs ) == 0 && len (mts ) == 0 )
1106+ itrs = append (itrs , ft .mt .sl .NewUniIterator (false ))
1107+ mts = append (mts , ft .mt )
1108+
1109+ // Pick more memtables, so we can really fill up the L0 table.
1110+ slurp ()
1111+
1112+ // db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz)
1113+ ft .mt = nil
1114+ ft .itr = table .NewMergeIterator (itrs , false )
1115+
10821116 for {
10831117 err := db .handleFlushTask (ft )
10841118 if err == nil {
@@ -1089,9 +1123,11 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
10891123 // which would arrive here would match db.imm[0], because we acquire a
10901124 // lock over DB when pushing to flushChan.
10911125 // TODO: This logic is dirty AF. Any change and this could easily break.
1092- y .AssertTrue (ft .mt == db .imm [0 ])
1093- db .imm = db .imm [1 :]
1094- ft .mt .DecrRef () // Return memory.
1126+ for _ , mt := range mts {
1127+ y .AssertTrue (mt == db .imm [0 ])
1128+ db .imm = db .imm [1 :]
1129+ mt .DecrRef () // Return memory.
1130+ }
10951131 db .lock .Unlock ()
10961132
10971133 break
@@ -1100,6 +1136,8 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
11001136 db .opt .Errorf ("Failure while flushing memtable to disk: %v. Retrying...\n " , err )
11011137 time .Sleep (time .Second )
11021138 }
1139+ // Reset everything.
1140+ itrs , mts , sz = itrs [:0 ], mts [:0 ], 0
11031141 }
11041142 return nil
11051143}
0 commit comments