From 26ca0eb194c504b183cdd8f69763de7f7bcd2741 Mon Sep 17 00:00:00 2001 From: Suryandaru Triandana Date: Mon, 24 Feb 2014 08:58:57 +0700 Subject: [PATCH] leveldb: Implements slice support --- README.md | 24 +- leveldb/bench_test.go | 2 +- leveldb/corrupt_test.go | 2 +- leveldb/db.go | 13 +- leveldb/db_iter.go | 60 ++-- leveldb/db_snapshot.go | 10 +- leveldb/db_test.go | 20 +- leveldb/db_util.go | 3 +- leveldb/doc.go | 24 +- leveldb/empty_test.go | 62 ---- leveldb/external_test.go | 58 ++++ leveldb/leveldb_suite_test.go | 20 ++ leveldb/session.go | 4 +- leveldb/sorted_test.go | 530 ------------------------------- leveldb/table.go | 44 ++- leveldb/testutil/db.go | 44 ++- leveldb/testutil/iter.go | 1 + leveldb/testutil/kvtest.go | 15 - leveldb/testutil/storage.go | 580 ++++++++++++++++++++++++++++++++++ leveldb/testutil/util.go | 39 ++- leveldb/testutil_test.go | 58 ++++ leveldb/version.go | 9 +- 22 files changed, 939 insertions(+), 683 deletions(-) delete mode 100644 leveldb/empty_test.go create mode 100644 leveldb/external_test.go create mode 100644 leveldb/leveldb_suite_test.go delete mode 100644 leveldb/sorted_test.go create mode 100644 leveldb/testutil/storage.go create mode 100644 leveldb/testutil_test.go diff --git a/README.md b/README.md index 9bb6bf9f..3776aad7 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ Read or modify the database content: Iterate over database content: - iter := db.NewIterator(nil) + iter := db.NewIterator(nil, nil) for iter.Next() { // Remember that the contents of the returned slice should not be modified, and // only valid until the next call to Next. @@ -44,6 +44,28 @@ Iterate over database content: err = iter.Error() ... +Seek-then-Iterate: + + iter := db.NewIterator(nil, nil) + for ok := iter.Seek(key); ok; ok = iter.Next() { + // Use key/value. + ... + } + iter.Release() + err = iter.Error() + ... + +Iterate over subset of database content: + + iter := db.NewIterator(&util.Range{Start: []byte("foo"), Limit: []byte("xoo")}, nil) + for iter.Next() { + // Use key/value. + ... + } + iter.Release() + err = iter.Error() + ... 
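As a minimal sketch of the new two-argument form (assuming `db` is an open `*leveldb.DB`, `fmt` is imported, and the key names are made up for illustration): a `util.Range` is half-open, so `Start` is inclusive, `Limit` is exclusive, and either field may be left nil to keep that end unbounded.

    // Keys >= "b" and < "d" — "beta" is visited, "alpha" and "delta" are not.
    for _, k := range [][]byte{[]byte("alpha"), []byte("beta"), []byte("delta")} {
        if err := db.Put(k, k, nil); err != nil {
            // handle error
        }
    }
    iter := db.NewIterator(&util.Range{Start: []byte("b"), Limit: []byte("d")}, nil)
    for iter.Next() {
        fmt.Printf("%s -> %s\n", iter.Key(), iter.Value())
    }
    iter.Release()
    if err := iter.Error(); err != nil {
        // handle error
    }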
+ Batch writes: batch := new(leveldb.Batch) diff --git a/leveldb/bench_test.go b/leveldb/bench_test.go index 79933048..ea6801a8 100644 --- a/leveldb/bench_test.go +++ b/leveldb/bench_test.go @@ -240,7 +240,7 @@ func (p *dbBench) seeks() { } func (p *dbBench) newIter() iterator.Iterator { - iter := p.db.NewIterator(p.ro) + iter := p.db.NewIterator(nil, p.ro) err := iter.Error() if err != nil { p.b.Fatal("cannot create iterator: ", err) diff --git a/leveldb/corrupt_test.go b/leveldb/corrupt_test.go index 5a69351b..42f60227 100644 --- a/leveldb/corrupt_test.go +++ b/leveldb/corrupt_test.go @@ -133,7 +133,7 @@ func (h *dbCorruptHarness) check(min, max int) { db := p.db var n, badk, badv, missed, good int - iter := db.NewIterator(p.ro) + iter := db.NewIterator(nil, p.ro) for iter.Next() { k := 0 fmt.Sscanf(string(iter.Key()), "%d", &k) diff --git a/leveldb/db.go b/leveldb/db.go index e0267a4e..e11dda30 100644 --- a/leveldb/db.go +++ b/leveldb/db.go @@ -231,7 +231,7 @@ func Recover(p storage.Storage, o *opt.Options) (db *DB, err error) { } t := newTFile(f, uint64(size), nil, nil) - iter := s.tops.newIterator(t, nil) + iter := s.tops.newIterator(t, nil, nil) // min ikey if iter.First() { t.min = iter.Key() @@ -266,7 +266,7 @@ func Recover(p storage.Storage, o *opt.Options) (db *DB, err error) { // extract largest seq number from newest table if nt != nil { var lseq uint64 - iter := s.tops.newIterator(nt, nil) + iter := s.tops.newIterator(nt, nil, nil) for iter.Next() { seq, _, ok := iKey(iter.Key()).parseNum() if !ok { @@ -473,17 +473,22 @@ func (d *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) { // underlying DB. The resultant key/value pairs are guaranteed to be // consistent. // +// Slice allows slicing the iterator to only contains keys in the given +// range. A nil Range.Start is treated as a key before all keys in the +// DB. And a nil Range.Limit is treated as a key after all keys in +// the DB. +// // The iterator must be released after use, by calling Release method. // // Also read Iterator documentation of the leveldb/iterator package. -func (d *DB) NewIterator(ro *opt.ReadOptions) iterator.Iterator { +func (d *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { if err := d.ok(); err != nil { return iterator.NewEmptyIterator(err) } p := d.newSnapshot() defer p.Release() - return p.NewIterator(ro) + return p.NewIterator(slice, ro) } // GetSnapshot returns a latest snapshot of the underlying DB. A snapshot diff --git a/leveldb/db_iter.go b/leveldb/db_iter.go index 83ff3832..9598a468 100644 --- a/leveldb/db_iter.go +++ b/leveldb/db_iter.go @@ -20,18 +20,18 @@ var ( errInvalidIkey = errors.New("leveldb: Iterator: invalid internal key") ) -func (db *DB) newRawIterator(ro *opt.ReadOptions) iterator.Iterator { +func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { s := db.s em, fm := db.getMems() v := s.version() - ti := v.getIterators(ro) + ti := v.getIterators(slice, ro) n := len(ti) + 2 i := make([]iterator.Iterator, 0, n) - i = append(i, em.NewIterator(nil)) + i = append(i, em.NewIterator(slice)) if fm != nil { - i = append(i, fm.NewIterator(nil)) + i = append(i, fm.NewIterator(slice)) } i = append(i, ti...) 
mi := iterator.NewMergedIterator(i, s.cmp, true) @@ -39,13 +39,25 @@ func (db *DB) newRawIterator(ro *opt.ReadOptions) iterator.Iterator { return mi } -func (db *DB) newIterator(seq uint64, ro *opt.ReadOptions) *dbIter { - rawIter := db.newRawIterator(ro) +func (db *DB) newIterator(seq uint64, slice *util.Range, ro *opt.ReadOptions) *dbIter { + var slice_ *util.Range + if slice != nil { + slice_ = &util.Range{} + if slice.Start != nil { + slice_.Start = newIKey(slice.Start, kMaxSeq, tSeek) + } + if slice.Limit != nil { + slice_.Limit = newIKey(slice.Limit, kMaxSeq, tSeek) + } + } + rawIter := db.newRawIterator(slice_, ro) iter := &dbIter{ cmp: db.s.cmp.cmp, iter: rawIter, seq: seq, strict: db.s.o.GetStrict(opt.StrictIterator) || ro.GetStrict(opt.StrictIterator), + key: make([]byte, 0), + value: make([]byte, 0), } runtime.SetFinalizer(iter, (*dbIter).Release) return iter @@ -193,25 +205,27 @@ func (i *dbIter) Next() bool { func (i *dbIter) prev() bool { i.dir = dirBackward del := true - for { - ukey, seq, t, ok := parseIkey(i.iter.Key()) - if ok { - if seq <= i.seq { - if !del && i.cmp.Compare(ukey, i.key) < 0 { - return true - } - del = (t == tDel) - if !del { - i.key = append(i.key[:0], ukey...) - i.value = append(i.value[:0], i.iter.Value()...) + if i.iter.Valid() { + for { + ukey, seq, t, ok := parseIkey(i.iter.Key()) + if ok { + if seq <= i.seq { + if !del && i.cmp.Compare(ukey, i.key) < 0 { + return true + } + del = (t == tDel) + if !del { + i.key = append(i.key[:0], ukey...) + i.value = append(i.value[:0], i.iter.Value()...) + } } + } else if i.strict { + i.setErr(errInvalidIkey) + return false + } + if !i.iter.Prev() { + break } - } else if i.strict { - i.setErr(errInvalidIkey) - return false - } - if !i.iter.Prev() { - break } } if del { diff --git a/leveldb/db_snapshot.go b/leveldb/db_snapshot.go index fa621c00..8300ccd3 100644 --- a/leveldb/db_snapshot.go +++ b/leveldb/db_snapshot.go @@ -12,6 +12,7 @@ import ( "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/util" ) type snapshotElement struct { @@ -119,12 +120,17 @@ func (p *Snapshot) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error // underlying DB. The resultant key/value pairs are guaranteed to be // consistent. // +// Slice allows slicing the iterator to only contains keys in the given +// range. A nil Range.Start is treated as a key before all keys in the +// DB. And a nil Range.Limit is treated as a key after all keys in +// the DB. +// // The iterator must be released after use, by calling Release method. // Releasing the snapshot doesn't mean releasing the iterator too, the // iterator would be still valid until released. // // Also read Iterator documentation of the leveldb/iterator package. -func (p *Snapshot) NewIterator(ro *opt.ReadOptions) iterator.Iterator { +func (p *Snapshot) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { db := p.db if err := db.ok(); err != nil { return iterator.NewEmptyIterator(err) @@ -134,7 +140,7 @@ func (p *Snapshot) NewIterator(ro *opt.ReadOptions) iterator.Iterator { if p.released { return iterator.NewEmptyIterator(ErrSnapshotReleased) } - return db.newIterator(p.elem.seq, ro) + return db.newIterator(p.elem.seq, slice, ro) } // Release releases the snapshot. 
This will not release any returned diff --git a/leveldb/db_test.go b/leveldb/db_test.go index 54421acf..777a6a25 100644 --- a/leveldb/db_test.go +++ b/leveldb/db_test.go @@ -224,7 +224,7 @@ func (h *dbHarness) allEntriesFor(key, want string) { ucmp := db.s.cmp.cmp ikey := newIKey([]byte(key), kMaxSeq, tVal) - iter := db.newRawIterator(new(opt.ReadOptions)) + iter := db.newRawIterator(nil, nil) if !iter.Seek(ikey) && iter.Error() != nil { t.Error("AllEntries: error during seek, err: ", iter.Error()) return @@ -276,7 +276,7 @@ func (h *dbHarness) getKeyVal(want string) { t.Fatal("GetSnapshot: got error: ", err) } res := "" - iter := s.NewIterator(new(opt.ReadOptions)) + iter := s.NewIterator(nil, nil) for iter.Next() { res += fmt.Sprintf("(%s->%s)", string(iter.Key()), string(iter.Value())) } @@ -766,7 +766,7 @@ func TestDb_IterMultiWithDelete(t *testing.T) { h.delete("b") h.get("b", false) - iter := h.db.NewIterator(new(opt.ReadOptions)) + iter := h.db.NewIterator(nil, nil) iter.Seek([]byte("c")) testKeyVal(t, iter, "c->vc") iter.Prev() @@ -775,7 +775,7 @@ func TestDb_IterMultiWithDelete(t *testing.T) { h.compactMem() - iter = h.db.NewIterator(new(opt.ReadOptions)) + iter = h.db.NewIterator(nil, nil) iter.Seek([]byte("c")) testKeyVal(t, iter, "c->vc") iter.Prev() @@ -791,7 +791,7 @@ func TestDb_IteratorPinsRef(t *testing.T) { h.put("foo", "hello") // Get iterator that will yield the current contents of the DB. - iter := h.db.NewIterator(new(opt.ReadOptions)) + iter := h.db.NewIterator(nil, nil) // Write to force compactions h.put("foo", "newvalue1") @@ -1424,7 +1424,7 @@ func TestDb_ClosedIsClosed(t *testing.T) { h.put("k", "v") h.getVal("k", "v") - iter = db.NewIterator(h.ro) + iter = db.NewIterator(nil, h.ro) iter.Seek([]byte("k")) testKeyVal(t, iter, "k->v") @@ -1436,7 +1436,7 @@ func TestDb_ClosedIsClosed(t *testing.T) { h.getValr(snap, "k", "v") - iter2 = snap.NewIterator(h.ro) + iter2 = snap.NewIterator(nil, h.ro) iter2.Seek([]byte("k")) testKeyVal(t, iter2, "k->v") @@ -1470,10 +1470,10 @@ func TestDb_ClosedIsClosed(t *testing.T) { _, err = db.GetSnapshot() assertErr(t, err, true) - iter3 := db.NewIterator(h.ro) + iter3 := db.NewIterator(nil, h.ro) assertErr(t, iter3.Error(), true) - iter3 = snap.NewIterator(h.ro) + iter3 = snap.NewIterator(nil, h.ro) assertErr(t, iter3.Error(), true) assertErr(t, db.Delete([]byte("k"), h.wo), true) @@ -1725,7 +1725,7 @@ func TestDb_Concurrent2(t *testing.T) { for i := 0; i < n2; i++ { closeWg.Add(1) go func(i int) { - it := h.db.NewIterator(nil) + it := h.db.NewIterator(nil, nil) var pk []byte for it.Next() { kk := it.Key() diff --git a/leveldb/db_util.go b/leveldb/db_util.go index ff11b34d..0fbd66df 100644 --- a/leveldb/db_util.go +++ b/leveldb/db_util.go @@ -10,13 +10,14 @@ import ( "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/storage" + "github.com/syndtr/goleveldb/leveldb/util" ) // Reader is the interface that wraps basic Get and NewIterator methods. // This interface implemented by both DB and Snapshot. 
type Reader interface { Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) - NewIterator(ro *opt.ReadOptions) iterator.Iterator + NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator } type Sizes []uint64 diff --git a/leveldb/doc.go b/leveldb/doc.go index 51c4b0d7..ac9ea3d0 100644 --- a/leveldb/doc.go +++ b/leveldb/doc.go @@ -25,7 +25,7 @@ // // Iterate over database content: // -// iter := db.NewIterator(nil) +// iter := db.NewIterator(nil, nil) // for iter.Next() { // // Remember that the contents of the returned slice should not be modified, and // // only valid until the next call to Next. @@ -37,6 +37,28 @@ // err = iter.Error() // ... // +// Seek-then-Iterate: +// +// iter := db.NewIterator(nil, nil) +// for ok := iter.Seek(key); ok; ok = iter.Next() { +// // Use key/value. +// ... +// } +// iter.Release() +// err = iter.Error() +// ... +// +// Iterate over subset of database content: +// +// iter := db.NewIterator(&util.Range{Start: []byte("foo"), Limit: []byte("xoo")}, nil) +// for iter.Next() { +// // Use key/value. +// ... +// } +// iter.Release() +// err = iter.Error() +// ... +// // Batch writes: // // batch := new(leveldb.Batch) diff --git a/leveldb/empty_test.go b/leveldb/empty_test.go deleted file mode 100644 index 71538256..00000000 --- a/leveldb/empty_test.go +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright (c) 2012, Suryandaru Triandana -// All rights reserved. -// -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -package leveldb - -import "testing" - -func TestIter_Empty(t *testing.T) { - cc := []struct { - name string - c stConstructor - }{ - {"table", &stConstructor_Table{}}, - {"memdb", &stConstructor_MemDB{}}, - {"merged", &stConstructor_MergedMemDB{}}, - {"db", &stConstructor_DB{}}, - } - - for _, p := range cc { - c, name := p.c, p.name - func() { - err := c.init(t, new(stHarnessOpt)) - if err != nil { - t.Error(name+": error when initializing constructor:", err.Error()) - return - } - defer c.close() - size, err := c.finish() - if err != nil { - t.Error(name+": error when finishing constructor:", err.Error()) - return - } - t.Logf(name+": final size is %d bytes", size) - iter := c.newIterator() - defer iter.Release() - if iter.Valid() { - t.Error(name + ": Valid() return true") - } - if iter.Next() { - t.Error(name + ": Next() return true") - } - if iter.Prev() { - t.Error(name + ": Prev() return true") - } - if iter.Seek(nil) { - t.Error(name + ": Seek(nil) return true") - } - if iter.Seek([]byte("v")) { - t.Error(name + ": Seek('v') return true") - } - if iter.First() { - t.Error(name + ": First() return true") - } - if iter.Last() { - t.Error(name + ": Last() return true") - } - }() - } -} diff --git a/leveldb/external_test.go b/leveldb/external_test.go new file mode 100644 index 00000000..274472f0 --- /dev/null +++ b/leveldb/external_test.go @@ -0,0 +1,58 @@ +// Copyright (c) 2014, Suryandaru Triandana +// All rights reserved. +// +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package leveldb + +import ( + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" + + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/testutil" +) + +var _ = testutil.Defer(func() { + Describe("Leveldb external", func() { + o := &opt.Options{ + BlockCache: opt.NoCache, + BlockRestartInterval: 5, + BlockSize: 50, + Compression: opt.NoCompression, + MaxOpenFiles: 0, + Strict: opt.StrictAll, + WriteBuffer: 1000, + } + + Describe("write test", func() { + It("should do write correctly", func(done Done) { + db := newTestingDB(o, nil, nil) + t := testutil.DBTesting{ + DB: db, + Deleted: testutil.KeyValue_Generate(nil, 1000, 1, 30, 5, 5).Clone(), + } + testutil.DoDBTesting(&t) + db.TestClose() + done <- true + }, 9.0) + }) + + Describe("read test", func() { + testutil.AllKeyValueTesting(nil, func(kv testutil.KeyValue) testutil.DB { + // Building the DB. + db := newTestingDB(o, nil, nil) + kv.IterateShuffled(nil, func(i int, key, value []byte) { + err := db.TestPut(key, value) + Expect(err).NotTo(HaveOccurred()) + }) + testutil.Defer("teardown", func() { + db.TestClose() + }) + + return db + }) + }) + }) +}) diff --git a/leveldb/leveldb_suite_test.go b/leveldb/leveldb_suite_test.go new file mode 100644 index 00000000..245b1fd4 --- /dev/null +++ b/leveldb/leveldb_suite_test.go @@ -0,0 +1,20 @@ +package leveldb + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/syndtr/goleveldb/leveldb/testutil" +) + +func TestLeveldb(t *testing.T) { + testutil.RunDefer() + + RegisterFailHandler(Fail) + RunSpecs(t, "Leveldb Suite") + + RegisterTestingT(t) + testutil.RunDefer("teardown") +} diff --git a/leveldb/session.go b/leveldb/session.go index dd38305c..f9d078e8 100644 --- a/leveldb/session.go +++ b/leveldb/session.go @@ -365,10 +365,10 @@ func (c *compaction) newIterator() iterator.Iterator { if level+i == 0 { for _, t := range tt { - its = append(its, s.tops.newIterator(t, ro)) + its = append(its, s.tops.newIterator(t, nil, ro)) } } else { - it := iterator.NewIndexedIterator(tt.newIndexIterator(s.tops, icmp, ro), strict, true) + it := iterator.NewIndexedIterator(tt.newIndexIterator(s.tops, icmp, nil, ro), strict, true) its = append(its, it) } } diff --git a/leveldb/sorted_test.go b/leveldb/sorted_test.go deleted file mode 100644 index ec20aa56..00000000 --- a/leveldb/sorted_test.go +++ /dev/null @@ -1,530 +0,0 @@ -// Copyright (c) 2012, Suryandaru Triandana -// All rights reserved. -// -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. 
- -package leveldb - -import ( - "bytes" - "fmt" - "math/rand" - "runtime" - "strings" - "testing" - - "github.com/syndtr/goleveldb/leveldb/comparer" - "github.com/syndtr/goleveldb/leveldb/iterator" - "github.com/syndtr/goleveldb/leveldb/memdb" - "github.com/syndtr/goleveldb/leveldb/opt" - "github.com/syndtr/goleveldb/leveldb/table" - "github.com/syndtr/goleveldb/leveldb/util" -) - -type stConstructor interface { - init(t *testing.T, ho *stHarnessOpt) error - add(key, value string) error - finish() (int, error) - newIterator() iterator.Iterator - customTest(h *stHarness) - close() -} - -type stConstructor_Table struct { - t *testing.T - buf bytes.Buffer - o *opt.Options - writer *table.Writer - reader *table.Reader -} - -func (tc *stConstructor_Table) init(t *testing.T, ho *stHarnessOpt) error { - tc.t = t - tc.o = &opt.Options{ - BlockSize: 512, - BlockRestartInterval: 3, - } - tc.writer = table.NewWriter(&tc.buf, tc.o) - return nil -} - -func (tc *stConstructor_Table) add(key, value string) error { - return tc.writer.Append([]byte(key), []byte(value)) -} - -func (tc *stConstructor_Table) finish() (size int, err error) { - err = tc.writer.Close() - if err != nil { - return - } - tc.t.Logf("table: contains %d entries and %d blocks", tc.writer.EntriesLen(), tc.writer.BlocksLen()) - size = tc.buf.Len() - if csize := int(tc.writer.BytesLen()); csize != size { - tc.t.Errorf("table: invalid calculated size, calculated=%d actual=%d", csize, size) - } - tc.reader = table.NewReader(bytes.NewReader(tc.buf.Bytes()), int64(size), nil, tc.o) - return -} - -func (tc *stConstructor_Table) newIterator() iterator.Iterator { - return tc.reader.NewIterator(nil, nil) -} - -func (tc *stConstructor_Table) customTest(h *stHarness) { - for i := range h.keys { - key, value, err := tc.reader.Find([]byte(h.keys[i]), nil) - if err != nil { - h.t.Errorf("table: CustomTest: Find: %v", err) - continue - } - if string(key) != h.keys[i] { - h.t.Errorf("table: CustomTest: Find: invalid key, got=%q want=%q", - shorten(string(key)), shorten(h.keys[i])) - } - if string(value) != h.values[i] { - h.t.Errorf("table: CustomTest: Find: invalid value, got=%q want=%q", - shorten(string(value)), shorten(h.values[i])) - } - } -} - -func (tc *stConstructor_Table) close() {} - -type stConstructor_MemDB struct { - t *testing.T - db *memdb.DB -} - -func (mc *stConstructor_MemDB) init(t *testing.T, ho *stHarnessOpt) error { - ho.Randomize = true - mc.t = t - mc.db = memdb.New(comparer.DefaultComparer, 0) - return nil -} - -func (mc *stConstructor_MemDB) add(key, value string) error { - mc.db.Put([]byte(key), []byte(value)) - return nil -} - -func (mc *stConstructor_MemDB) finish() (size int, err error) { - return int(mc.db.Size()), nil -} - -func (mc *stConstructor_MemDB) newIterator() iterator.Iterator { - return mc.db.NewIterator(nil) -} - -func (mc *stConstructor_MemDB) customTest(h *stHarness) {} -func (mc *stConstructor_MemDB) close() {} - -type stConstructor_MergedMemDB struct { - t *testing.T - db [3]*memdb.DB -} - -func (mc *stConstructor_MergedMemDB) init(t *testing.T, ho *stHarnessOpt) error { - ho.Randomize = true - mc.t = t - for i := range mc.db { - mc.db[i] = memdb.New(comparer.DefaultComparer, 0) - } - return nil -} - -func (mc *stConstructor_MergedMemDB) add(key, value string) error { - mc.db[rand.Intn(99999)%len(mc.db)].Put([]byte(key), []byte(value)) - return nil -} - -func (mc *stConstructor_MergedMemDB) finish() (size int, err error) { - for i, db := range mc.db { - mc.t.Logf("merged: db[%d] size: %d", i, db.Size()) - 
size += db.Size() - } - return -} - -func (mc *stConstructor_MergedMemDB) newIterator() iterator.Iterator { - var ii []iterator.Iterator - for _, db := range mc.db { - ii = append(ii, db.NewIterator(nil)) - } - return iterator.NewMergedIterator(ii, comparer.DefaultComparer, true) -} - -func (mc *stConstructor_MergedMemDB) customTest(h *stHarness) {} -func (mc *stConstructor_MergedMemDB) close() {} - -type stConstructor_DB struct { - t *testing.T - stor *testStorage - ro *opt.ReadOptions - wo *opt.WriteOptions - db *DB -} - -func (dc *stConstructor_DB) init(t *testing.T, ho *stHarnessOpt) (err error) { - ho.Randomize = true - dc.t = t - dc.stor = newTestStorage(t) - dc.ro = nil - dc.wo = nil - dc.db, err = Open(dc.stor, &opt.Options{ - WriteBuffer: 2800, - }) - if err != nil { - dc.stor.Close() - } - return -} - -func (dc *stConstructor_DB) add(key, value string) error { - return dc.db.Put([]byte(key), []byte(value), dc.wo) -} - -func (dc *stConstructor_DB) finish() (size int, err error) { - iter := dc.db.NewIterator(dc.ro) - defer iter.Release() - var r util.Range - if iter.First() { - r.Start = append([]byte{}, iter.Key()...) - } - if iter.Last() { - r.Limit = append([]byte{}, iter.Key()...) - } - err = iter.Error() - if err != nil { - return - } - sizes, err := dc.db.GetApproximateSizes([]util.Range{r}) - size = int(sizes.Sum()) - return -} - -func (dc *stConstructor_DB) newIterator() iterator.Iterator { - return dc.db.NewIterator(dc.ro) -} - -func (dc *stConstructor_DB) customTest(h *stHarness) {} - -func (dc *stConstructor_DB) close() { - if err := dc.db.Close(); err != nil { - dc.t.Errorf("leveldb: db close: %v", err) - } - if err := dc.stor.Close(); err != nil { - dc.t.Errorf("leveldb: storage close: %v", err) - } - dc.db = nil - dc.stor = nil - runtime.GC() -} - -type stHarnessOpt struct { - Randomize bool -} - -type stHarness struct { - t *testing.T - - keys, values []string -} - -func newStHarness(t *testing.T) *stHarness { - return &stHarness{t: t} -} - -func (h *stHarness) add(key, value string) { - h.keys = append(h.keys, key) - h.values = append(h.values, value) -} - -func (h *stHarness) testAll() { - h.test("table", &stConstructor_Table{}) - h.test("memdb", &stConstructor_MemDB{}) - h.test("merged", &stConstructor_MergedMemDB{}) - h.test("leveldb", &stConstructor_DB{}) -} - -func (h *stHarness) test(name string, c stConstructor) { - ho := new(stHarnessOpt) - - err := c.init(h.t, ho) - if err != nil { - h.t.Error("error when initializing constructor:", err.Error()) - return - } - defer c.close() - - keys, values := h.keys, h.values - if ho.Randomize { - m := len(h.keys) - times := m * 2 - r1, r2 := rand.New(rand.NewSource(0xdeadbeef)), rand.New(rand.NewSource(0xbeefface)) - keys, values = make([]string, m), make([]string, m) - copy(keys, h.keys) - copy(values, h.values) - for n := 0; n < times; n++ { - i, j := r1.Intn(99999)%m, r2.Intn(99999)%m - if i == j { - continue - } - keys[i], keys[j] = keys[j], keys[i] - values[i], values[j] = values[j], values[i] - } - } - - for i := range keys { - err = c.add(keys[i], values[i]) - if err != nil { - h.t.Error("error when adding key/value:", err) - return - } - } - - var size int - size, err = c.finish() - if err != nil { - h.t.Error("error when finishing constructor:", err) - return - } - - h.t.Logf(name+": final size is %d bytes", size) - h.testScan(name, c) - h.testSeek(name, c) - c.customTest(h) - h.t.Log(name + ": test is done") -} - -func (h *stHarness) testScan(name string, c stConstructor) { - it := c.newIterator() - for i := 
0; i < 3; i++ { - if it.Prev() { - h.t.Errorf(name+": SortedTest: Scan: Backward: expecting eof (it=%d)", i) - } else if it.Valid() { - h.t.Errorf(name+": SortedTest: Scan: Backward: Valid != false (it=%d)", i) - } - } - it.Release() - - it = c.newIterator() - defer it.Release() - var first, last bool - -first: - for i := range h.keys { - if !it.Next() { - h.t.Error(name + ": SortedTest: Scan: Forward: unxepected eof") - } else if !it.Valid() { - h.t.Error(name + ": SortedTest: Scan: Forward: Valid != true") - } - rkey, rval := string(it.Key()), string(it.Value()) - if rkey != h.keys[i] { - h.t.Errorf(name+": SortedTest: Scan: Forward: key are invalid, got=%q want=%q", - shorten(rkey), shorten(h.keys[i])) - } - if rval != h.values[i] { - h.t.Errorf(name+": SortedTest: Scan: Forward: value are invalid, got=%q want=%q", - shorten(rval), shorten(h.values[i])) - } - } - - if !first { - first = true - if !it.First() { - h.t.Error(name + ": SortedTest: Scan: ToFirst: unxepected eof") - } else if !it.Valid() { - h.t.Error(name + ": SortedTest: Scan: ToFirst: Valid != true") - } - rkey, rval := string(it.Key()), string(it.Value()) - if rkey != h.keys[0] { - h.t.Errorf(name+": SortedTest: Scan: ToFirst: key are invalid, got=%q want=%q", - shorten(rkey), shorten(h.keys[0])) - } - if rval != h.values[0] { - h.t.Errorf(name+": SortedTest: Scan: ToFirst: value are invalid, got=%q want=%q", - shorten(rval), shorten(h.values[0])) - } - if it.Prev() { - h.t.Error(name + ": SortedTest: Scan: ToFirst: Prev: expecting eof") - } else if it.Valid() { - h.t.Error(name + ": SortedTest: Scan: ToFirst: Prev: Valid != false") - } - goto first - } - -last: - for i := 0; i < 3; i++ { - if it.Next() { - h.t.Errorf(name+": SortedTest: Scan: Forward: expecting eof (it=%d)", i) - } else if it.Valid() { - h.t.Errorf(name+": SortedTest: Scan: Forward: Valid != false (it=%d)", i) - } - } - - for i := len(h.keys) - 1; i >= 0; i-- { - if !it.Prev() { - h.t.Error(name + ": SortedTest: Scan: Backward: unxepected eof") - } else if !it.Valid() { - h.t.Error(name + ": SortedTest: Scan: Backward: Valid != true") - } - rkey, rval := string(it.Key()), string(it.Value()) - if rkey != h.keys[i] { - h.t.Errorf(name+": SortedTest: Scan: Backward: key are invalid, got=%q want=%q", - shorten(rkey), shorten(h.keys[i])) - } - if rval != h.values[i] { - h.t.Errorf(name+": SortedTest: Scan: Backward: value are invalid, got=%q want=%q", - shorten(rval), shorten(h.values[i])) - } - } - - if !last { - last = true - if !it.Last() { - h.t.Error(name + ": SortedTest: Scan: ToLast: unxepected eof") - } else if !it.Valid() { - h.t.Error(name + ": SortedTest: Scan: ToLast: Valid != true") - } - i := len(h.keys) - 1 - rkey, rval := string(it.Key()), string(it.Value()) - if rkey != h.keys[i] { - h.t.Errorf(name+": SortedTest: Scan: ToLast: key are invalid, got=%q want=%q", - shorten(rkey), shorten(h.keys[i])) - } - if rval != h.values[i] { - h.t.Errorf(name+": SortedTest: Scan: ToLast: value are invalid, got=%q want=%q", - shorten(rval), shorten(h.values[i])) - } - goto last - } - - for i := 0; i < 3; i++ { - if it.Prev() { - h.t.Errorf(name+": SortedTest: Scan: Backward: expecting eof (it=%d)", i) - } else if it.Valid() { - h.t.Errorf(name+": SortedTest: Scan: Backward: Valid != false (it=%d)", i) - } - } -} - -func (h *stHarness) testSeek(name string, c stConstructor) { - it := c.newIterator() - defer it.Release() - - for i, key := range h.keys { - if !it.Seek([]byte(key)) { - h.t.Errorf(name+": SortedTest: Seek: key %q is not found, err: %v", - 
shorten(key), it.Error()) - continue - } else if !it.Valid() { - h.t.Error(name + ": SortedTest: Seek: Valid != true") - } - - for j := i; j >= 0; j-- { - rkey, rval := string(it.Key()), string(it.Value()) - if rkey != h.keys[j] { - h.t.Errorf(name+": SortedTest: Seek: key are invalid, got=%q want=%q", - shorten(rkey), shorten(h.keys[j])) - } - if rval != h.values[j] { - h.t.Errorf(name+": SortedTest: Seek: value are invalid, got=%q want=%q", - shorten(rval), shorten(h.values[j])) - } - ret := it.Prev() - if j == 0 && ret { - h.t.Error(name + ": SortedTest: Seek: Backward: expecting eof") - } else if j > 0 && !ret { - h.t.Error(name+": SortedTest: Seek: Backward: unxepected eof, err: ", it.Error()) - } - } - } -} - -func TestSorted_EmptyKey(t *testing.T) { - h := newStHarness(t) - h.add("", "v") - h.testAll() -} - -func TestSorted_EmptyValue(t *testing.T) { - h := newStHarness(t) - h.add("abc", "") - h.add("abcd", "") - h.testAll() -} - -func TestSorted_Single(t *testing.T) { - h := newStHarness(t) - h.add("abc", "v") - h.testAll() -} - -func TestSorted_SingleBig(t *testing.T) { - h := newStHarness(t) - h.add("big1", strings.Repeat("1", 200000)) - h.testAll() -} - -func TestSorted_Multi(t *testing.T) { - h := newStHarness(t) - h.add("a", "v") - h.add("aa", "v1") - h.add("aaa", "v2") - h.add("aaacccccccccc", "v2") - h.add("aaaccccccccccd", "v3") - h.add("aaaccccccccccf", "v4") - h.add("aaaccccccccccfg", "v5") - h.add("ab", "v6") - h.add("abc", "v7") - h.add("abcd", "v8") - h.add("accccccccccccccc", "v9") - h.add("b", "v10") - h.add("bb", "v11") - h.add("bc", "v12") - h.add("c", "v13") - h.add("c1", "v13") - h.add("czzzzzzzzzzzzzz", "v14") - h.add("fffffffffffffff", "v15") - h.add("g11", "v15") - h.add("g111", "v15") - h.add("g111\xff", "v15") - h.add("zz", "v16") - h.add("zzzzzzz", "v16") - h.add("zzzzzzzzzzzzzzzz", "v16") - h.testAll() -} - -func TestSorted_SpecialKey(t *testing.T) { - h := newStHarness(t) - h.add("\xff\xff", "v3") - h.testAll() -} - -func TestSorted_GeneratedShort(t *testing.T) { - h := newStHarness(t) - h.add("", "v") - n := 0 - for c := byte('a'); c <= byte('o'); c++ { - for i := 1; i < 10; i++ { - key := bytes.Repeat([]byte{c}, i) - h.add(string(key), "v"+fmt.Sprint(n)) - n++ - } - } - h.testAll() -} - -func TestSorted_GeneratedLong(t *testing.T) { - h := newStHarness(t) - n := 0 - for c := byte('a'); c <= byte('o'); c++ { - for i := 150; i < 180; i++ { - key := bytes.Repeat([]byte{c}, i) - h.add(string(key), "v"+fmt.Sprint(n)) - n++ - } - } - h.testAll() -} diff --git a/leveldb/table.go b/leveldb/table.go index 9d17aad8..82ed1f26 100644 --- a/leveldb/table.go +++ b/leveldb/table.go @@ -16,6 +16,7 @@ import ( "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/storage" "github.com/syndtr/goleveldb/leveldb/table" + "github.com/syndtr/goleveldb/leveldb/util" ) // table file @@ -103,7 +104,13 @@ func (tf tFiles) size() (sum uint64) { return sum } -func (tf tFiles) search(key iKey, icmp *iComparer) int { +func (tf tFiles) searchMin(key iKey, icmp *iComparer) int { + return sort.Search(len(tf), func(i int) bool { + return icmp.Compare(tf[i].min, key) >= 0 + }) +} + +func (tf tFiles) searchMax(key iKey, icmp *iComparer) int { return sort.Search(len(tf), func(i int) bool { return icmp.Compare(tf[i].max, key) >= 0 }) @@ -125,7 +132,7 @@ func (tf tFiles) isOverlaps(min, max []byte, disjSorted bool, icmp *iComparer) b var idx int if len(min) > 0 { // Find the earliest possible internal key for min - idx = tf.search(newIKey(min, kMaxSeq, tSeek), icmp) + 
idx = tf.searchMax(newIKey(min, kMaxSeq, tSeek), icmp) } if idx >= len(tf) { @@ -179,28 +186,45 @@ func (tf tFiles) getRange(icmp *iComparer) (min, max iKey) { return } -func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, ro *opt.ReadOptions) iterator.IteratorIndexer { +func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, slice *util.Range, ro *opt.ReadOptions) iterator.IteratorIndexer { + if slice != nil { + var start, limit int + if slice.Start != nil { + start = tf.searchMax(iKey(slice.Start), icmp) + } + if slice.Limit != nil { + limit = tf.searchMin(iKey(slice.Limit), icmp) + } else { + limit = tf.Len() + } + tf = tf[start:limit] + } return iterator.NewArrayIndexer(&tFilesArrayIndexer{ tFiles: tf, tops: tops, icmp: icmp, + slice: slice, ro: ro, }) } type tFilesArrayIndexer struct { tFiles - tops *tOps - icmp *iComparer - ro *opt.ReadOptions + tops *tOps + icmp *iComparer + slice *util.Range + ro *opt.ReadOptions } func (a *tFilesArrayIndexer) Search(key []byte) int { - return a.search(iKey(key), a.icmp) + return a.searchMax(iKey(key), a.icmp) } func (a *tFilesArrayIndexer) Get(i int) iterator.Iterator { - return a.tops.newIterator(a.tFiles[i], a.ro) + if i == 0 || i == a.Len()-1 { + return a.tops.newIterator(a.tFiles[i], a.slice, a.ro) + } + return a.tops.newIterator(a.tFiles[i], nil, a.ro) } type tFilesSortByKey struct { @@ -325,12 +349,12 @@ func (t *tOps) getApproximateOffset(f *tFile, key []byte) (offset uint64, err er return } -func (t *tOps) newIterator(f *tFile, ro *opt.ReadOptions) iterator.Iterator { +func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { c, err := t.lookup(f) if err != nil { return iterator.NewEmptyIterator(err) } - iter := c.Value().(*table.Reader).NewIterator(nil, ro) + iter := c.Value().(*table.Reader).NewIterator(slice, ro) iter.SetReleaser(c) return iter } diff --git a/leveldb/testutil/db.go b/leveldb/testutil/db.go index 710faf17..4b87b5ef 100644 --- a/leveldb/testutil/db.go +++ b/leveldb/testutil/db.go @@ -12,9 +12,32 @@ import ( . 
"github.com/onsi/gomega" + "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/util" ) +type DB interface{} + +type Put interface { + TestPut(key []byte, value []byte) error +} + +type Delete interface { + TestDelete(key []byte) error +} + +type Find interface { + TestFind(key []byte) (rkey, rvalue []byte, err error) +} + +type Get interface { + TestGet(key []byte) (value []byte, err error) +} + +type NewIterator interface { + TestNewIterator(slice *util.Range) iterator.Iterator +} + type DBAct int func (a DBAct) String() string { @@ -41,14 +64,6 @@ const ( DBDeleteNA ) -type Put interface { - TestPut(key []byte, value []byte) error -} - -type Delete interface { - TestDelete(key []byte) error -} - type DBTesting struct { Rand *rand.Rand DB interface { @@ -185,4 +200,17 @@ func DoDBTesting(t *DBTesting) { t.PutRandom() } t.RandomAct((t.Deleted.Len() + t.Present.Len()) * 10) + + // Additional iterator testing + if db, ok := t.DB.(NewIterator); ok { + iter := db.TestNewIterator(nil) + Expect(iter.Error()).NotTo(HaveOccurred()) + + it := IteratorTesting{ + KeyValue: t.Present, + Iter: iter, + } + + DoIteratorTesting(&it) + } } diff --git a/leveldb/testutil/iter.go b/leveldb/testutil/iter.go index 3120d5d5..df6d9db6 100644 --- a/leveldb/testutil/iter.go +++ b/leveldb/testutil/iter.go @@ -99,6 +99,7 @@ func (t *IteratorTesting) IsLast() bool { func (t *IteratorTesting) TestKV() { t.init() key, value := t.Index(t.Pos) + Expect(t.Iter.Key()).NotTo(BeNil()) Expect(t.Iter.Key()).Should(Equal(key), "Key is invalid, %s", t.text()) Expect(t.Iter.Value()).Should(Equal(value), "Value for key %q, %s", key, t.text()) } diff --git a/leveldb/testutil/kvtest.go b/leveldb/testutil/kvtest.go index 4bacf8e2..4fc75b6f 100644 --- a/leveldb/testutil/kvtest.go +++ b/leveldb/testutil/kvtest.go @@ -13,24 +13,9 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/util" ) -type DB interface{} - -type Find interface { - TestFind(key []byte) (rkey, rvalue []byte, err error) -} - -type Get interface { - TestGet(key []byte) (value []byte, err error) -} - -type NewIterator interface { - TestNewIterator(slice *util.Range) iterator.Iterator -} - func KeyValueTesting(rnd *rand.Rand, p DB, kv KeyValue) { if rnd == nil { rnd = NewRand() diff --git a/leveldb/testutil/storage.go b/leveldb/testutil/storage.go new file mode 100644 index 00000000..bd5fc12e --- /dev/null +++ b/leveldb/testutil/storage.go @@ -0,0 +1,580 @@ +// Copyright (c) 2014, Suryandaru Triandana +// All rights reserved. +// +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package testutil + +import ( + "bytes" + "fmt" + "io" + "os" + "path/filepath" + "runtime" + "strings" + "sync" + + . 
"github.com/onsi/gomega" + + "github.com/syndtr/goleveldb/leveldb/storage" + "github.com/syndtr/goleveldb/leveldb/util" +) + +var ( + storageMu sync.Mutex + storageUseFS bool = true + storageKeepFS bool = false + storageNum int +) + +type StorageMode int + +const ( + ModeOpen StorageMode = 1 << iota + ModeCreate + ModeRemove + ModeRead + ModeWrite + ModeSync + ModeClose +) + +const ( + modeOpen = iota + modeCreate + modeRemove + modeRead + modeWrite + modeSync + modeClose + + modeCount +) + +const ( + typeManifest = iota + typeJournal + typeTable + + typeCount +) + +const flattenCount = modeCount * typeCount + +func flattenType(m StorageMode, t storage.FileType) int { + var x int + switch m { + case ModeOpen: + x = modeOpen + case ModeCreate: + x = modeCreate + case ModeRemove: + x = modeRemove + case ModeRead: + x = modeRead + case ModeWrite: + x = modeWrite + case ModeSync: + x = modeSync + case ModeClose: + x = modeClose + default: + panic("invalid storage mode") + } + x *= typeCount + switch t { + case storage.TypeManifest: + return x + typeManifest + case storage.TypeJournal: + return x + typeJournal + case storage.TypeTable: + return x + typeTable + default: + panic("invalid file type") + } +} + +func listFlattenType(m StorageMode, t storage.FileType) []int { + ret := make([]int, 0, flattenCount) + add := func(x int) { + x *= typeCount + switch { + case t&storage.TypeManifest != 0: + ret = append(ret, x+typeManifest) + case t&storage.TypeJournal != 0: + ret = append(ret, x+typeJournal) + case t&storage.TypeTable != 0: + ret = append(ret, x+typeTable) + } + } + switch { + case m&ModeOpen != 0: + add(modeOpen) + case m&ModeCreate != 0: + add(modeCreate) + case m&ModeRemove != 0: + add(modeRemove) + case m&ModeRead != 0: + add(modeRead) + case m&ModeWrite != 0: + add(modeWrite) + case m&ModeSync != 0: + add(modeSync) + case m&ModeClose != 0: + add(modeClose) + } + return ret +} + +func packFile(num uint64, t storage.FileType) uint64 { + if num>>(64-typeCount) != 0 { + panic("overflow") + } + return num<> typeCount, storage.FileType(x) & storage.TypeAll +} + +type emulatedError struct { + err error +} + +func (err emulatedError) Error() string { + return fmt.Sprintf("emulated storage error: %v", err.err) +} + +type storageLock struct { + s *Storage + r util.Releaser +} + +func (l storageLock) Release() { + l.r.Release() + l.s.logI("storage lock released") +} + +type reader struct { + f *file + storage.Reader +} + +func (r *reader) Read(p []byte) (n int, err error) { + err = r.f.s.emulateError(ModeRead, r.f.Type()) + if err == nil { + r.f.s.stall(ModeRead, r.f.Type()) + n, err = r.Reader.Read(p) + } + r.f.s.count(ModeRead, r.f.Type(), n) + if err != nil && err != io.EOF { + r.f.s.logI("read error, num=%d type=%v n=%d err=%v", r.f.Num(), r.f.Type(), n, err) + } + return +} + +func (r *reader) ReadAt(p []byte, off int64) (n int, err error) { + err = r.f.s.emulateError(ModeRead, r.f.Type()) + if err == nil { + r.f.s.stall(ModeRead, r.f.Type()) + n, err = r.Reader.ReadAt(p, off) + } + r.f.s.count(ModeRead, r.f.Type(), n) + if err != nil && err != io.EOF { + r.f.s.logI("readAt error, num=%d type=%v offset=%d n=%d err=%v", r.f.Num(), r.f.Type(), off, n, err) + } + return +} + +func (r *reader) Close() (err error) { + return r.f.doClose(r.Reader) +} + +type writer struct { + f *file + storage.Writer +} + +func (w *writer) Write(p []byte) (n int, err error) { + err = w.f.s.emulateError(ModeWrite, w.f.Type()) + if err == nil { + w.f.s.stall(ModeWrite, w.f.Type()) + n, err = w.Writer.Write(p) + } + 
w.f.s.count(ModeWrite, w.f.Type(), n) + if err != nil && err != io.EOF { + w.f.s.logI("write error, num=%d type=%v n=%d err=%v", w.f.Num(), w.f.Type(), n, err) + } + return +} + +func (w *writer) Sync() (err error) { + err = w.f.s.emulateError(ModeSync, w.f.Type()) + if err == nil { + w.f.s.stall(ModeSync, w.f.Type()) + err = w.Writer.Sync() + } + w.f.s.count(ModeSync, w.f.Type(), 0) + if err != nil { + w.f.s.logI("sync error, num=%d type=%v err=%v", w.f.Num(), w.f.Type(), err) + } + return +} + +func (w *writer) Close() (err error) { + return w.f.doClose(w.Writer) +} + +type file struct { + s *Storage + storage.File +} + +func (f *file) pack() uint64 { + return packFile(f.Num(), f.Type()) +} + +func (f *file) assertOpen() { + ExpectWithOffset(2, f.s.opens).NotTo(HaveKey(f.pack()), "File open, num=%d type=%v writer=%v", f.Num(), f.Type(), f.s.opens[f.pack()]) +} + +func (f *file) doClose(closer io.Closer) (err error) { + err = f.s.emulateError(ModeClose, f.Type()) + if err == nil { + f.s.stall(ModeClose, f.Type()) + } + f.s.mu.Lock() + defer f.s.mu.Unlock() + if err == nil { + ExpectWithOffset(2, f.s.opens).To(HaveKey(f.pack()), "File closed, num=%d type=%v", f.Num(), f.Type()) + err = closer.Close() + } + f.s.countNB(ModeClose, f.Type(), 0) + writer := f.s.opens[f.pack()] + if err != nil { + f.s.logISkip(1, "file close failed, num=%d type=%v writer=%v err=%v", f.Num(), f.Type(), writer, err) + } else { + f.s.logISkip(1, "file closed, num=%d type=%v writer=%v", f.Num(), f.Type(), writer) + delete(f.s.opens, f.pack()) + } + return +} + +func (f *file) Open() (r storage.Reader, err error) { + err = f.s.emulateError(ModeOpen, f.Type()) + if err == nil { + f.s.stall(ModeOpen, f.Type()) + } + f.s.mu.Lock() + defer f.s.mu.Unlock() + if err == nil { + f.assertOpen() + f.s.countNB(ModeOpen, f.Type(), 0) + r, err = f.File.Open() + } + if err != nil { + f.s.logI("file open failed, num=%d type=%v err=%v", f.Num(), f.Type(), err) + } else { + f.s.logI("file opened, num=%d type=%v", f.Num(), f.Type()) + f.s.opens[f.pack()] = false + r = &reader{f, r} + } + return +} + +func (f *file) Create() (w storage.Writer, err error) { + err = f.s.emulateError(ModeCreate, f.Type()) + if err == nil { + f.s.stall(ModeCreate, f.Type()) + } + f.s.mu.Lock() + defer f.s.mu.Unlock() + if err == nil { + f.assertOpen() + f.s.countNB(ModeCreate, f.Type(), 0) + w, err = f.File.Create() + } + if err != nil { + f.s.logI("file create failed, num=%d type=%v err=%v", f.Num(), f.Type(), err) + } else { + f.s.logI("file created, num=%d type=%v", f.Num(), f.Type()) + f.s.opens[f.pack()] = true + w = &writer{f, w} + } + return +} + +func (f *file) Remove() (err error) { + err = f.s.emulateError(ModeRemove, f.Type()) + if err == nil { + f.s.stall(ModeRemove, f.Type()) + } + f.s.mu.Lock() + defer f.s.mu.Unlock() + if err == nil { + f.assertOpen() + f.s.countNB(ModeRemove, f.Type(), 0) + err = f.File.Remove() + } + if err != nil { + f.s.logI("file remove failed, num=%d type=%v err=%v", f.Num(), f.Type(), err) + } else { + f.s.logI("file removed, num=%d type=%v", f.Num(), f.Type()) + } + return +} + +type Storage struct { + storage.Storage + closeFn func() error + + lmu sync.Mutex + lb bytes.Buffer + + mu sync.Mutex + // Open files, true=writer, false=reader + opens map[uint64]bool + counters [flattenCount]int + bytesCounter [flattenCount]int64 + emulatedError [flattenCount]error + stallCond sync.Cond + stalled [flattenCount]bool +} + +func (s *Storage) log(skip int, str string) { + s.lmu.Lock() + defer s.lmu.Unlock() + _, file, line, ok 
:= runtime.Caller(skip + 2) + if ok { + // Truncate file name at last file name separator. + if index := strings.LastIndex(file, "/"); index >= 0 { + file = file[index+1:] + } else if index = strings.LastIndex(file, "\\"); index >= 0 { + file = file[index+1:] + } + } else { + file = "???" + line = 1 + } + fmt.Fprintf(&s.lb, "%s:%d: ", file, line) + lines := strings.Split(str, "\n") + if l := len(lines); l > 1 && lines[l-1] == "" { + lines = lines[:l-1] + } + for i, line := range lines { + if i > 0 { + s.lb.WriteString("\n\t") + } + s.lb.WriteString(line) + } + s.lb.WriteByte('\n') +} + +func (s *Storage) logISkip(skip int, format string, args ...interface{}) { + pc, _, _, ok := runtime.Caller(skip + 1) + if ok { + if f := runtime.FuncForPC(pc); f != nil { + fname := f.Name() + if index := strings.LastIndex(fname, "."); index >= 0 { + fname = fname[index+1:] + } + format = fname + ": " + format + } + } + s.log(skip+1, fmt.Sprintf(format, args...)) +} + +func (s *Storage) logI(format string, args ...interface{}) { + s.logISkip(1, format, args...) +} + +func (s *Storage) Log(str string) { + s.log(1, "Log: "+str) +} + +func (s *Storage) Lock() (r util.Releaser, err error) { + r, err = s.Storage.Lock() + if err != nil { + s.logI("storage locking failed, err=%v", err) + } else { + s.logI("storage locked") + r = storageLock{s, r} + } + return +} + +func (s *Storage) GetFile(num uint64, t storage.FileType) storage.File { + return &file{s, s.Storage.GetFile(num, t)} +} + +func (s *Storage) GetFiles(t storage.FileType) (files []storage.File, err error) { + rfiles, err := s.Storage.GetFiles(t) + if err != nil { + s.logI("get files failed, err=%v", err) + return + } + files = make([]storage.File, len(rfiles)) + for i, f := range rfiles { + files[i] = &file{s, f} + } + s.logI("get files, type=0x%x count=%d", int(t), len(files)) + return +} + +func (s *Storage) GetManifest() (f storage.File, err error) { + manifest, err := s.Storage.GetManifest() + if err != nil { + if !os.IsNotExist(err) { + s.logI("get manifest failed, err=%v", err) + } + return + } + s.logI("get manifest, num=%d", manifest.Num()) + return &file{s, manifest}, nil +} + +func (s *Storage) SetManifest(f storage.File) error { + f_, ok := f.(*file) + ExpectWithOffset(1, ok).To(BeTrue()) + ExpectWithOffset(1, f_.Type()).To(Equal(storage.TypeManifest)) + err := s.Storage.SetManifest(f_.File) + if err != nil { + s.logI("set manifest failed, err=%v", err) + } else { + s.logI("set manifest, num=%d", f_.Num()) + } + return err +} + +func (s *Storage) openFiles() string { + out := "Open files:" + for x, writer := range s.opens { + num, t := unpackFile(x) + out += fmt.Sprintf("\n ยท num=%d type=%v writer=%v", num, t, writer) + } + return out +} + +func (s *Storage) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + ExpectWithOffset(1, s.opens).To(BeEmpty(), s.openFiles()) + err := s.Storage.Close() + if err != nil { + s.logI("storage closing failed, err=%v", err) + } else { + s.logI("storage closed") + } + if s.closeFn != nil { + if err1 := s.closeFn(); err1 != nil { + s.logI("close func error, err=%v", err1) + } + } + return err +} + +func (s *Storage) countNB(m StorageMode, t storage.FileType, n int) { + s.counters[flattenType(m, t)]++ + s.bytesCounter[flattenType(m, t)] += int64(n) +} + +func (s *Storage) count(m StorageMode, t storage.FileType, n int) { + s.mu.Lock() + defer s.mu.Unlock() + s.countNB(m, t, n) +} + +func (s *Storage) ResetCounter(m StorageMode, t storage.FileType) { + for _, x := range listFlattenType(m, t) { + s.counters[x] 
= 0 + s.bytesCounter[x] = 0 + } +} + +func (s *Storage) Counter(m StorageMode, t storage.FileType) (count int, bytes int64) { + for _, x := range listFlattenType(m, t) { + count += s.counters[x] + bytes += s.bytesCounter[x] + } + return +} + +func (s *Storage) emulateError(m StorageMode, t storage.FileType) error { + s.mu.Lock() + defer s.mu.Unlock() + err := s.emulatedError[flattenType(m, t)] + if err != nil { + return emulatedError{err} + } + return nil +} + +func (s *Storage) EmulateError(m StorageMode, t storage.FileType, err error) { + s.mu.Lock() + defer s.mu.Unlock() + for _, x := range listFlattenType(m, t) { + s.emulatedError[x] = err + } +} + +func (s *Storage) stall(m StorageMode, t storage.FileType) { + x := flattenType(m, t) + s.mu.Lock() + defer s.mu.Unlock() + for s.stalled[x] { + s.stallCond.Wait() + } +} + +func (s *Storage) Stall(m StorageMode, t storage.FileType) { + s.mu.Lock() + defer s.mu.Unlock() + for _, x := range listFlattenType(m, t) { + s.stalled[x] = true + } +} + +func (s *Storage) Release(m StorageMode, t storage.FileType) { + s.mu.Lock() + defer s.mu.Unlock() + for _, x := range listFlattenType(m, t) { + s.stalled[x] = false + } + s.stallCond.Broadcast() +} + +func NewStorage() *Storage { + var stor storage.Storage + var closeFn func() error + if storageUseFS { + for { + storageMu.Lock() + num := storageNum + storageNum++ + storageMu.Unlock() + path := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldb-test%d0%d0%d", os.Getuid(), os.Getpid(), num)) + if _, err := os.Stat(path); os.IsNotExist(err) { + stor, err = storage.OpenFile(path) + ExpectWithOffset(1, err).NotTo(HaveOccurred(), "creating storage at %s", path) + closeFn = func() error { + if storageKeepFS { + return nil + } + return os.RemoveAll(path) + } + break + } + } + } else { + stor = storage.NewMemStorage() + } + s := &Storage{ + Storage: stor, + closeFn: closeFn, + opens: make(map[uint64]bool), + } + s.stallCond.L = &s.mu + return s +} diff --git a/leveldb/testutil/util.go b/leveldb/testutil/util.go index 1f0966af..38fe25d5 100644 --- a/leveldb/testutil/util.go +++ b/leveldb/testutil/util.go @@ -10,6 +10,7 @@ import ( "bytes" "flag" "math/rand" + "reflect" "sync" "github.com/onsi/ginkgo/config" @@ -18,21 +19,43 @@ import ( ) var ( - runfn []func() + runfn = make(map[string][]func()) runmu sync.Mutex ) -func Defer(fn func()) bool { - runmu.Lock() - runfn = append(runfn, fn) - runmu.Unlock() +func Defer(args ...interface{}) bool { + var ( + group string + fn func() + ) + for _, arg := range args { + v := reflect.ValueOf(arg) + switch v.Kind() { + case reflect.String: + group = v.String() + case reflect.Func: + r := reflect.ValueOf(&fn).Elem() + r.Set(v) + } + } + if fn != nil { + runmu.Lock() + runfn[group] = append(runfn[group], fn) + runmu.Unlock() + } return true } -func RunDefer() bool { +func RunDefer(groups ...string) bool { + if len(groups) == 0 { + groups = append(groups, "") + } runmu.Lock() - runfn_ := runfn - runfn = nil + var runfn_ []func() + for _, group := range groups { + runfn_ = append(runfn_, runfn[group]...) + delete(runfn, group) + } runmu.Unlock() for _, fn := range runfn_ { fn() diff --git a/leveldb/testutil_test.go b/leveldb/testutil_test.go new file mode 100644 index 00000000..c1402fda --- /dev/null +++ b/leveldb/testutil_test.go @@ -0,0 +1,58 @@ +// Copyright (c) 2014, Suryandaru Triandana +// All rights reserved. +// +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package leveldb + +import ( + . 
"github.com/onsi/gomega" + + "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/testutil" + "github.com/syndtr/goleveldb/leveldb/util" +) + +type testingDB struct { + *DB + ro *opt.ReadOptions + wo *opt.WriteOptions + stor *testutil.Storage +} + +func (t *testingDB) TestPut(key []byte, value []byte) error { + return t.Put(key, value, t.wo) +} + +func (t *testingDB) TestDelete(key []byte) error { + return t.Delete(key, t.wo) +} + +func (t *testingDB) TestGet(key []byte) (value []byte, err error) { + return t.Get(key, t.ro) +} + +func (t *testingDB) TestNewIterator(slice *util.Range) iterator.Iterator { + return t.NewIterator(slice, t.ro) +} + +func (t *testingDB) TestClose() { + err := t.Close() + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + err = t.stor.Close() + ExpectWithOffset(1, err).NotTo(HaveOccurred()) +} + +func newTestingDB(o *opt.Options, ro *opt.ReadOptions, wo *opt.WriteOptions) *testingDB { + stor := testutil.NewStorage() + db, err := Open(stor, o) + Expect(err).NotTo(HaveOccurred()) + return &testingDB{ + DB: db, + ro: ro, + wo: wo, + stor: stor, + } +} diff --git a/leveldb/version.go b/leveldb/version.go index 6ad27239..52ba2907 100644 --- a/leveldb/version.go +++ b/leveldb/version.go @@ -13,6 +13,7 @@ import ( "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/util" ) var levelMaxSize [kNumLevels]float64 @@ -124,7 +125,7 @@ func (v *version) get(key iKey, ro *opt.ReadOptions) (value []byte, cstate bool, tmp.sortByNum() ts = tmp } else { - i := ts.search(key, icmp) + i := ts.searchMax(key, icmp) if i >= len(ts) || ucmp.Compare(ukey, ts[i].min.ukey()) < 0 { continue } @@ -174,13 +175,13 @@ func (v *version) get(key iKey, ro *opt.ReadOptions) (value []byte, cstate bool, return } -func (v *version) getIterators(ro *opt.ReadOptions) (its []iterator.Iterator) { +func (v *version) getIterators(slice *util.Range, ro *opt.ReadOptions) (its []iterator.Iterator) { s := v.s icmp := s.cmp // Merge all level zero files together since they may overlap for _, t := range v.tables[0] { - it := s.tops.newIterator(t, ro) + it := s.tops.newIterator(t, slice, ro) its = append(its, it) } @@ -190,7 +191,7 @@ func (v *version) getIterators(ro *opt.ReadOptions) (its []iterator.Iterator) { continue } - it := iterator.NewIndexedIterator(tt.newIndexIterator(s.tops, icmp, ro), strict, true) + it := iterator.NewIndexedIterator(tt.newIndexIterator(s.tops, icmp, slice, ro), strict, true) its = append(its, it) }