@@ -826,6 +826,7 @@ func (db *DB) writeRequests(reqs []*request) error {
826826 }
827827 count += len (b .Entries )
828828 var i uint64
829+ var err error
829830 for err = db .ensureRoomForWrite (); err == errNoRoom ; err = db .ensureRoomForWrite () {
830831 i ++
831832 if i % 100 == 0 {
@@ -1010,10 +1011,16 @@ func arenaSize(opt Options) int64 {
10101011
10111012// buildL0Table builds a new table from the memtable.
10121013func buildL0Table (ft flushTask , bopts table.Options ) * table.Builder {
1013- iter := ft .mt .sl .NewIterator ()
1014+ var iter y.Iterator
1015+ if ft .itr != nil {
1016+ iter = ft .itr
1017+ } else {
1018+ iter = ft .mt .sl .NewUniIterator (false )
1019+ }
10141020 defer iter .Close ()
1021+
10151022 b := table .NewTableBuilder (bopts )
1016- for iter .SeekToFirst (); iter .Valid (); iter .Next () {
1023+ for iter .Rewind (); iter .Valid (); iter .Next () {
10171024 if len (ft .dropPrefixes ) > 0 && hasAnyPrefixes (iter .Key (), ft .dropPrefixes ) {
10181025 continue
10191026 }
@@ -1029,16 +1036,13 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
10291036
10301037type flushTask struct {
10311038 mt * memTable
1039+ itr y.Iterator
10321040 dropPrefixes [][]byte
10331041}
10341042
10351043// handleFlushTask must be run serially.
10361044func (db * DB ) handleFlushTask (ft flushTask ) error {
1037- // There can be a scenario, when empty memtable is flushed.
1038- if ft .mt .sl .Empty () {
1039- return nil
1040- }
1041-
1045+ // ft.mt could be nil with ft.itr being the valid field.
10421046 bopts := buildTableOptions (db )
10431047 builder := buildL0Table (ft , bopts )
10441048 defer builder .Close ()
@@ -1074,11 +1078,51 @@ func (db *DB) handleFlushTask(ft flushTask) error {
10741078func (db * DB ) flushMemtable (lc * z.Closer ) error {
10751079 defer lc .Done ()
10761080
1081+ var sz int64
1082+ var itrs []y.Iterator
1083+ var mts []* memTable
1084+ slurp := func () {
1085+ for {
1086+ select {
1087+ case more , ok := <- db .flushChan :
1088+ if ! ok {
1089+ return
1090+ }
1091+ if more .mt == nil {
1092+ continue
1093+ }
1094+ sl := more .mt .sl
1095+ itrs = append (itrs , sl .NewUniIterator (false ))
1096+ mts = append (mts , more .mt )
1097+
1098+ sz += sl .MemSize ()
1099+ if sz > db .opt .MemTableSize {
1100+ return
1101+ }
1102+ default :
1103+ return
1104+ }
1105+ }
1106+ }
1107+
10771108 for ft := range db .flushChan {
10781109 if ft .mt == nil {
10791110 // We close db.flushChan now, instead of sending a nil ft.mt.
10801111 continue
10811112 }
1113+ sz = ft .mt .sl .MemSize ()
1114+ // Reset of itrs, mts etc. is being done below.
1115+ y .AssertTrue (len (itrs ) == 0 && len (mts ) == 0 )
1116+ itrs = append (itrs , ft .mt .sl .NewUniIterator (false ))
1117+ mts = append (mts , ft .mt )
1118+
1119+ // Pick more memtables, so we can really fill up the L0 table.
1120+ slurp ()
1121+
1122+ // db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz)
1123+ ft .mt = nil
1124+ ft .itr = table .NewMergeIterator (itrs , false )
1125+
10821126 for {
10831127 err := db .handleFlushTask (ft )
10841128 if err == nil {
@@ -1089,9 +1133,11 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
10891133 // which would arrive here would match db.imm[0], because we acquire a
10901134 // lock over DB when pushing to flushChan.
10911135 // TODO: This logic is dirty AF. Any change and this could easily break.
1092- y .AssertTrue (ft .mt == db .imm [0 ])
1093- db .imm = db .imm [1 :]
1094- ft .mt .DecrRef () // Return memory.
1136+ for _ , mt := range mts {
1137+ y .AssertTrue (mt == db .imm [0 ])
1138+ db .imm = db .imm [1 :]
1139+ mt .DecrRef () // Return memory.
1140+ }
10951141 db .lock .Unlock ()
10961142
10971143 break
@@ -1100,6 +1146,8 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
11001146 db .opt .Errorf ("Failure while flushing memtable to disk: %v. Retrying...\n " , err )
11011147 time .Sleep (time .Second )
11021148 }
1149+ // Reset everything.
1150+ itrs , mts , sz = itrs [:0 ], mts [:0 ], 0
11031151 }
11041152 return nil
11051153}
0 commit comments