Skip to content

Commit

Permalink
updated getBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
harshil-goel committed Feb 18, 2025
1 parent 1dfb2e0 commit 0faedd8
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 91 deletions.
8 changes: 4 additions & 4 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ func (db *DB) getMemTables() ([]*memTable, func()) {
// do that. For every get("fooX") call where X is the version, we will search
// for "fooX" in all the levels of the LSM tree. This is expensive but it
// removes the overhead of handling move keys completely.
func (db *DB) getBatch(keys [][]byte, done []bool) ([]y.ValueStruct, error) {
func (db *DB) getBatch(keys [][]byte, keysRead []bool) ([]y.ValueStruct, error) {
if db.IsClosed() {
return []y.ValueStruct{}, ErrDBClosed
}
Expand All @@ -761,7 +761,7 @@ func (db *DB) getBatch(keys [][]byte, done []bool) ([]y.ValueStruct, error) {
y.NumGetsAdd(db.opt.MetricsEnabled, 1)
// For memtable, we need to check every memtable each time
for j, key := range keys {
if done[j] {
if keysRead[j] {
continue
}
version := y.ParseTs(key)
Expand All @@ -776,15 +776,15 @@ func (db *DB) getBatch(keys [][]byte, done []bool) ([]y.ValueStruct, error) {
if vs.Version == version {
y.NumGetsWithResultsAdd(db.opt.MetricsEnabled, 1)
maxVs[j] = vs
done[j] = true
keysRead[j] = true
break
}
if maxVs[j].Version < vs.Version {
maxVs[j] = vs
}
}
}
return db.lc.getBatch(keys, maxVs, 0, done)
return db.lc.getBatch(keys, maxVs, 0, keysRead)
}

func (db *DB) get(key []byte) (y.ValueStruct, error) {
Expand Down
138 changes: 55 additions & 83 deletions level_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,29 @@ func (s *levelHandler) getTableForKey(key []byte) ([]*table.Table, func() error)
return []*table.Table{tbl}, tbl.DecrRef
}

func (s *levelHandler) getBatch(keys [][]byte, done []bool) ([]y.ValueStruct, error) {
// Find the table for which the key is in, and then seek it
getForKey := func(key []byte) (y.ValueStruct, func() error, []*table.Iterator) {
// checkInsideIterator seeks it to key and, when the iterator lands on an entry with the same key whose
// timestamp is newer than maxVs.Version, overwrites *maxVs with a copy of that entry's value stamped with
// that timestamp. Otherwise maxVs is left untouched. Each call is counted as one LSM get for this level.
func (s *levelHandler) checkInsideIterator(key []byte, it *table.Iterator, maxVs *y.ValueStruct) {
	y.NumLSMGetsAdd(s.db.opt.MetricsEnabled, s.strLevel, 1)

	it.Seek(key)
	// Nothing to do if the seek ran off the end or landed on a different key.
	if !it.Valid() || !y.SameKey(key, it.Key()) {
		return
	}

	foundTs := y.ParseTs(it.Key())
	if foundTs <= maxVs.Version {
		// The caller already holds an equal or newer version.
		return
	}

	candidate := it.ValueCopy()
	candidate.Version = foundTs
	*maxVs = candidate
}

func (s *levelHandler) getBatch(keys [][]byte, keysRead []bool) ([]y.ValueStruct, error) {
// Find the table that contains the key, and then seek within it. There's a good chance that the next key to be
// searched is in the same table as well. Hence, we store the iterators found. If we don't find the results
// in the given table, we would need to search again. Worst case, this function could be a little worse than
// getting the n keys in n different get calls.
createIteratorsForEachTable := func(key []byte) (y.ValueStruct, func() error, []*table.Iterator) {
tables, decr := s.getTableForKey(key)
keyNoTs := y.ParseKey(key)
itrs := make([]*table.Iterator, 0)
Expand All @@ -279,94 +299,56 @@ func (s *levelHandler) getBatch(keys [][]byte, done []bool) ([]y.ValueStruct, er

it := th.NewIterator(0)
itrs = append(itrs, it)

y.NumLSMGetsAdd(s.db.opt.MetricsEnabled, s.strLevel, 1)
it.Seek(key)
if !it.Valid() {
continue
}
if y.SameKey(key, it.Key()) {
if version := y.ParseTs(it.Key()); maxVs.Version < version {
maxVs = it.ValueCopy()
maxVs.Version = version
}
}
s.checkInsideIterator(key, it, &maxVs)
}

return maxVs, decr, itrs
}

// Use old results from getForKey and find in those tables.
findInIter := func(key []byte, itrs []*table.Iterator) y.ValueStruct {
// Use old results from createIteratorsForEachTable and find in those tables.
findInIterators := func(key []byte, itrs []*table.Iterator) y.ValueStruct {
var maxVs y.ValueStruct

for _, it := range itrs {
it.Seek(key)
if !it.Valid() {
continue
}
if y.SameKey(key, it.Key()) {
if version := y.ParseTs(it.Key()); maxVs.Version < version {
maxVs = it.ValueCopy()
maxVs.Version = version
}
}
s.checkInsideIterator(key, it, &maxVs)
}

return maxVs
}

results := make([]y.ValueStruct, len(keys))
// For L0, we need to search all tables each time, so we can just call get() as required
if s.level == 0 {
var err error
for i, key := range keys {
if done[i] {
continue
}
results[i], err = s.get(key)
if err != nil {
return results, err
}

decr := func() error { return nil }
var itrs []*table.Iterator

close_iters := func() {
for _, itr := range itrs {
itr.Close()
}
return results, nil
} else {
decr := func() error { return nil }
var itrs []*table.Iterator
}

started := false
for i := 0; i < len(keys); i++ {
if done[i] {
continue
}
if !started {
var maxVs y.ValueStruct
maxVs, decr, itrs = getForKey(keys[0])
results[i] = maxVs
started = true
} else {
results[i] = findInIter(keys[i], itrs)
// If we can't find in the current tables, maybe the
// data is there in other tables
if len(results[i].Value) == 0 {
for i := 0; i < len(itrs); i++ {
itrs[i].Close()
}
err := decr()
if err != nil {
return nil, err
}
results[i], decr, itrs = getForKey(keys[i])
defer close_iters()

for i := 0; i < len(keys); i++ {
if keysRead[i] {
continue
}
// If there are no iterators present, create new iterators
if len(itrs) == 0 {
results[i], decr, itrs = createIteratorsForEachTable(keys[i])
} else {
results[i] = findInIterators(keys[i], itrs)
// If we can't find in the current tables, then data is there in other tables. We would
// then need to close iterators, call decr() and then recreate new iterators.
if len(results[i].Value) == 0 {
close_iters()
if err := decr(); err != nil {
return nil, err
}
results[i], decr, itrs = createIteratorsForEachTable(keys[i])
}
}

for i := 0; i < len(itrs); i++ {
itrs[i].Close()
}
return results, decr()
}

return results, decr()
}

// get returns value for a given key or the key after that. If not found, return nil.
Expand All @@ -385,17 +367,7 @@ func (s *levelHandler) get(key []byte) (y.ValueStruct, error) {
it := th.NewIterator(0)
defer it.Close()

y.NumLSMGetsAdd(s.db.opt.MetricsEnabled, s.strLevel, 1)
it.Seek(key)
if !it.Valid() {
continue
}
if y.SameKey(key, it.Key()) {
if version := y.ParseTs(it.Key()); maxVs.Version < version {
maxVs = it.ValueCopy()
maxVs.Version = version
}
}
s.checkInsideIterator(key, it, &maxVs)
}
return maxVs, decr()
}
Expand Down
8 changes: 4 additions & 4 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -1594,7 +1594,7 @@ func (s *levelsController) close() error {
return y.Wrap(err, "levelsController.Close")
}

func (s *levelsController) getBatch(keys [][]byte, maxVs []y.ValueStruct, startLevel int, done []bool) (
func (s *levelsController) getBatch(keys [][]byte, maxVs []y.ValueStruct, startLevel int, keysRead []bool) (
[]y.ValueStruct, error) {
if s.kv.IsClosed() {
return []y.ValueStruct{}, ErrDBClosed
Expand All @@ -1609,15 +1609,15 @@ func (s *levelsController) getBatch(keys [][]byte, maxVs []y.ValueStruct, startL
if h.level < startLevel {
continue
}
vs, err := h.getBatch(keys, done) // Calls h.RLock() and h.RUnlock().
vs, err := h.getBatch(keys, keysRead) // Calls h.RLock() and h.RUnlock().
if err != nil {
return []y.ValueStruct{}, y.Wrapf(err, "get keys: %q", keys)
}

for i, v := range vs {
// done is only updated by this function or the one in db. levelHandler will
// not update done. No need to do anything if done is set.
if done[i] {
if keysRead[i] {
continue
}
if v.Value == nil && v.Meta == 0 {
Expand All @@ -1627,7 +1627,7 @@ func (s *levelsController) getBatch(keys [][]byte, maxVs []y.ValueStruct, startL
version := y.ParseTs(keys[i])
if v.Version == version {
maxVs[i] = v
done[i] = true
keysRead[i] = true
}
if maxVs[i].Version < v.Version {
maxVs[i] = v
Expand Down

0 comments on commit 0faedd8

Please sign in to comment.