diff --git a/boltdb.go b/boltdb.go index 0c28bf291..c1a430082 100644 --- a/boltdb.go +++ b/boltdb.go @@ -3,7 +3,6 @@ package db import ( - "bytes" "fmt" "os" "path/filepath" @@ -20,8 +19,7 @@ func init() { }, false) } -// BoltDB is a wrapper around etcd's fork of bolt -// (https://github.com/etcd-io/bbolt). +// BoltDB is a wrapper around etcd's fork of bolt (https://github.com/etcd-io/bbolt). // // NOTE: All operations (including Set, Delete) are synchronous by default. One // can globally turn it off by using NoSync config option (not recommended). @@ -32,6 +30,8 @@ type BoltDB struct { db *bbolt.DB } +var _ DB = (*BoltDB)(nil) + // NewBoltDB returns a BoltDB with default options. func NewBoltDB(name, dir string) (DB, error) { return NewBoltDBWithOpts(name, dir, bbolt.DefaultOptions) @@ -62,6 +62,7 @@ func NewBoltDBWithOpts(name string, dir string, opts *bbolt.Options) (DB, error) return &BoltDB{db: db}, nil } +// Get implements DB. func (bdb *BoltDB) Get(key []byte) (value []byte, err error) { key = nonEmptyKey(nonNilBytes(key)) err = bdb.db.View(func(tx *bbolt.Tx) error { @@ -77,6 +78,7 @@ func (bdb *BoltDB) Get(key []byte) (value []byte, err error) { return } +// Has implements DB. func (bdb *BoltDB) Has(key []byte) (bool, error) { bytes, err := bdb.Get(key) if err != nil { @@ -85,6 +87,7 @@ func (bdb *BoltDB) Has(key []byte) (bool, error) { return bytes != nil, nil } +// Set implements DB. func (bdb *BoltDB) Set(key, value []byte) error { key = nonEmptyKey(nonNilBytes(key)) value = nonNilBytes(value) @@ -98,10 +101,12 @@ func (bdb *BoltDB) Set(key, value []byte) error { return nil } +// SetSync implements DB. func (bdb *BoltDB) SetSync(key, value []byte) error { return bdb.Set(key, value) } +// Delete implements DB. func (bdb *BoltDB) Delete(key []byte) error { key = nonEmptyKey(nonNilBytes(key)) err := bdb.db.Update(func(tx *bbolt.Tx) error { @@ -113,14 +118,17 @@ func (bdb *BoltDB) Delete(key []byte) error { return nil } +// DeleteSync implements DB. 
func (bdb *BoltDB) DeleteSync(key []byte) error { return bdb.Delete(key) } +// Close implements DB. func (bdb *BoltDB) Close() error { return bdb.db.Close() } +// Print implements DB. func (bdb *BoltDB) Print() error { stats := bdb.db.Stats() fmt.Printf("%v\n", stats) @@ -138,6 +146,7 @@ func (bdb *BoltDB) Print() error { return nil } +// Stats implements DB. func (bdb *BoltDB) Stats() map[string]string { stats := bdb.db.Stats() m := make(map[string]string) @@ -155,14 +164,7 @@ func (bdb *BoltDB) Stats() map[string]string { return m } -// boltDBBatch stores key values in sync.Map and dumps them to the underlying -// DB upon Write call. -type boltDBBatch struct { - db *BoltDB - ops []operation -} - -// NewBatch returns a new batch. +// NewBatch implements DB. func (bdb *BoltDB) NewBatch() Batch { return &boltDBBatch{ ops: nil, @@ -170,49 +172,6 @@ func (bdb *BoltDB) NewBatch() Batch { } } -// It is safe to modify the contents of the argument after Set returns but not -// before. -func (bdb *boltDBBatch) Set(key, value []byte) { - bdb.ops = append(bdb.ops, operation{opTypeSet, key, value}) -} - -// It is safe to modify the contents of the argument after Delete returns but -// not before. 
-func (bdb *boltDBBatch) Delete(key []byte) { - bdb.ops = append(bdb.ops, operation{opTypeDelete, key, nil}) -} - -// NOTE: the operation is synchronous (see BoltDB for reasons) -func (bdb *boltDBBatch) Write() error { - err := bdb.db.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(bucket) - for _, op := range bdb.ops { - key := nonEmptyKey(nonNilBytes(op.key)) - switch op.opType { - case opTypeSet: - if putErr := b.Put(key, op.value); putErr != nil { - return putErr - } - case opTypeDelete: - if delErr := b.Delete(key); delErr != nil { - return delErr - } - } - } - return nil - }) - if err != nil { - return err - } - return nil -} - -func (bdb *boltDBBatch) WriteSync() error { - return bdb.Write() -} - -func (bdb *boltDBBatch) Close() {} - // WARNING: Any concurrent writes or reads will block until the iterator is // closed. func (bdb *BoltDB) Iterator(start, end []byte) (Iterator, error) { @@ -233,124 +192,6 @@ func (bdb *BoltDB) ReverseIterator(start, end []byte) (Iterator, error) { return newBoltDBIterator(tx, start, end, true), nil } -// boltDBIterator allows you to iterate on range of keys/values given some -// start / end keys (nil & nil will result in doing full scan). 
-type boltDBIterator struct { - tx *bbolt.Tx - - itr *bbolt.Cursor - start []byte - end []byte - - currentKey []byte - currentValue []byte - - isInvalid bool - isReverse bool -} - -func newBoltDBIterator(tx *bbolt.Tx, start, end []byte, isReverse bool) *boltDBIterator { - itr := tx.Bucket(bucket).Cursor() - - var ck, cv []byte - if isReverse { - if end == nil { - ck, cv = itr.Last() - } else { - _, _ = itr.Seek(end) // after key - ck, cv = itr.Prev() // return to end key - } - } else { - if start == nil { - ck, cv = itr.First() - } else { - ck, cv = itr.Seek(start) - } - } - - return &boltDBIterator{ - tx: tx, - itr: itr, - start: start, - end: end, - currentKey: ck, - currentValue: cv, - isReverse: isReverse, - isInvalid: false, - } -} - -func (itr *boltDBIterator) Domain() ([]byte, []byte) { - return itr.start, itr.end -} - -func (itr *boltDBIterator) Valid() bool { - if itr.isInvalid { - return false - } - - // iterated to the end of the cursor - if len(itr.currentKey) == 0 { - itr.isInvalid = true - return false - } - - if itr.isReverse { - if itr.start != nil && bytes.Compare(itr.currentKey, itr.start) < 0 { - itr.isInvalid = true - return false - } - } else { - if itr.end != nil && bytes.Compare(itr.end, itr.currentKey) <= 0 { - itr.isInvalid = true - return false - } - } - - // Valid - return true -} - -func (itr *boltDBIterator) Next() { - itr.assertIsValid() - if itr.isReverse { - itr.currentKey, itr.currentValue = itr.itr.Prev() - } else { - itr.currentKey, itr.currentValue = itr.itr.Next() - } -} - -func (itr *boltDBIterator) Key() []byte { - itr.assertIsValid() - return append([]byte{}, itr.currentKey...) -} - -func (itr *boltDBIterator) Value() []byte { - itr.assertIsValid() - var value []byte - if itr.currentValue != nil { - value = append([]byte{}, itr.currentValue...) 
- } - return value -} - -func (itr *boltDBIterator) Error() error { - return nil -} - -func (itr *boltDBIterator) Close() { - err := itr.tx.Rollback() - if err != nil { - panic(err) - } -} - -func (itr *boltDBIterator) assertIsValid() { - if !itr.Valid() { - panic("boltdb-iterator is invalid") - } -} - // nonEmptyKey returns a []byte("nil") if key is empty. // WARNING: this may collude with "nil" user key! func nonEmptyKey(key []byte) []byte { diff --git a/boltdb_batch.go b/boltdb_batch.go new file mode 100644 index 000000000..a5996fe38 --- /dev/null +++ b/boltdb_batch.go @@ -0,0 +1,52 @@ +// +build boltdb + +package db + +import "github.com/etcd-io/bbolt" + +// boltDBBatch stores operations internally and dumps them to BoltDB on Write(). +type boltDBBatch struct { + db *BoltDB + ops []operation +} + +var _ Batch = (*boltDBBatch)(nil) + +// Set implements Batch. +func (b *boltDBBatch) Set(key, value []byte) { + b.ops = append(b.ops, operation{opTypeSet, key, value}) +} + +// Delete implements Batch. +func (b *boltDBBatch) Delete(key []byte) { + b.ops = append(b.ops, operation{opTypeDelete, key, nil}) +} + +// Write implements Batch. +func (b *boltDBBatch) Write() error { + return b.db.db.Batch(func(tx *bbolt.Tx) error { + bkt := tx.Bucket(bucket) + for _, op := range b.ops { + key := nonEmptyKey(nonNilBytes(op.key)) + switch op.opType { + case opTypeSet: + if err := bkt.Put(key, op.value); err != nil { + return err + } + case opTypeDelete: + if err := bkt.Delete(key); err != nil { + return err + } + } + } + return nil + }) +} + +// WriteSync implements Batch. +func (b *boltDBBatch) WriteSync() error { + return b.Write() +} + +// Close implements Batch. 
+func (b *boltDBBatch) Close() {} diff --git a/boltdb_iterator.go b/boltdb_iterator.go new file mode 100644 index 000000000..4f56a0da0 --- /dev/null +++ b/boltdb_iterator.go @@ -0,0 +1,137 @@ +// +build boltdb + +package db + +import ( + "bytes" + + "github.com/etcd-io/bbolt" +) + +// boltDBIterator allows you to iterate on range of keys/values given some +// start / end keys (nil & nil will result in doing full scan). +type boltDBIterator struct { + tx *bbolt.Tx + + itr *bbolt.Cursor + start []byte + end []byte + + currentKey []byte + currentValue []byte + + isInvalid bool + isReverse bool +} + +var _ Iterator = (*boltDBIterator)(nil) + +// newBoltDBIterator creates a new boltDBIterator. +func newBoltDBIterator(tx *bbolt.Tx, start, end []byte, isReverse bool) *boltDBIterator { + itr := tx.Bucket(bucket).Cursor() + + var ck, cv []byte + if isReverse { + if end == nil { + ck, cv = itr.Last() + } else { + _, _ = itr.Seek(end) // after key + ck, cv = itr.Prev() // return to end key + } + } else { + if start == nil { + ck, cv = itr.First() + } else { + ck, cv = itr.Seek(start) + } + } + + return &boltDBIterator{ + tx: tx, + itr: itr, + start: start, + end: end, + currentKey: ck, + currentValue: cv, + isReverse: isReverse, + isInvalid: false, + } +} + +// Domain implements Iterator. +func (itr *boltDBIterator) Domain() ([]byte, []byte) { + return itr.start, itr.end +} + +// Valid implements Iterator. +func (itr *boltDBIterator) Valid() bool { + if itr.isInvalid { + return false + } + + // iterated to the end of the cursor + if len(itr.currentKey) == 0 { + itr.isInvalid = true + return false + } + + if itr.isReverse { + if itr.start != nil && bytes.Compare(itr.currentKey, itr.start) < 0 { + itr.isInvalid = true + return false + } + } else { + if itr.end != nil && bytes.Compare(itr.end, itr.currentKey) <= 0 { + itr.isInvalid = true + return false + } + } + + // Valid + return true +} + +// Next implements Iterator. 
+func (itr *boltDBIterator) Next() { + itr.assertIsValid() + if itr.isReverse { + itr.currentKey, itr.currentValue = itr.itr.Prev() + } else { + itr.currentKey, itr.currentValue = itr.itr.Next() + } +} + +// Key implements Iterator. +func (itr *boltDBIterator) Key() []byte { + itr.assertIsValid() + return append([]byte{}, itr.currentKey...) +} + +// Value implements Iterator. +func (itr *boltDBIterator) Value() []byte { + itr.assertIsValid() + var value []byte + if itr.currentValue != nil { + value = append([]byte{}, itr.currentValue...) + } + return value +} + +// Error implements Iterator. +func (itr *boltDBIterator) Error() error { + return nil +} + +// Close implements Iterator. +func (itr *boltDBIterator) Close() { + err := itr.tx.Rollback() + if err != nil { + panic(err) + } +} + +func (itr *boltDBIterator) assertIsValid() { + if !itr.Valid() { + panic("boltdb-iterator is invalid") + } +} diff --git a/c_level_db.go b/c_level_db.go deleted file mode 100644 index ca558fb9e..000000000 --- a/c_level_db.go +++ /dev/null @@ -1,339 +0,0 @@ -// +build cleveldb - -package db - -import ( - "bytes" - "fmt" - "path/filepath" - - "github.com/jmhodges/levigo" -) - -func init() { - dbCreator := func(name string, dir string) (DB, error) { - return NewCLevelDB(name, dir) - } - registerDBCreator(CLevelDBBackend, dbCreator, false) -} - -var _ DB = (*CLevelDB)(nil) - -type CLevelDB struct { - db *levigo.DB - ro *levigo.ReadOptions - wo *levigo.WriteOptions - woSync *levigo.WriteOptions -} - -func NewCLevelDB(name string, dir string) (*CLevelDB, error) { - dbPath := filepath.Join(dir, name+".db") - - opts := levigo.NewOptions() - opts.SetCache(levigo.NewLRUCache(1 << 30)) - opts.SetCreateIfMissing(true) - db, err := levigo.Open(dbPath, opts) - if err != nil { - return nil, err - } - ro := levigo.NewReadOptions() - wo := levigo.NewWriteOptions() - woSync := levigo.NewWriteOptions() - woSync.SetSync(true) - database := &CLevelDB{ - db: db, - ro: ro, - wo: wo, - woSync: woSync, - } 
- return database, nil -} - -// Implements DB. -func (db *CLevelDB) Get(key []byte) ([]byte, error) { - key = nonNilBytes(key) - res, err := db.db.Get(db.ro, key) - if err != nil { - return nil, err - } - return res, nil -} - -// Implements DB. -func (db *CLevelDB) Has(key []byte) (bool, error) { - bytes, err := db.Get(key) - if err != nil { - return false, err - } - return bytes != nil, nil -} - -// Implements DB. -func (db *CLevelDB) Set(key []byte, value []byte) error { - key = nonNilBytes(key) - value = nonNilBytes(value) - if err := db.db.Put(db.wo, key, value); err != nil { - return err - } - return nil -} - -// Implements DB. -func (db *CLevelDB) SetSync(key []byte, value []byte) error { - key = nonNilBytes(key) - value = nonNilBytes(value) - if err := db.db.Put(db.woSync, key, value); err != nil { - return err - } - return nil -} - -// Implements DB. -func (db *CLevelDB) Delete(key []byte) error { - key = nonNilBytes(key) - if err := db.db.Delete(db.wo, key); err != nil { - return err - } - return nil -} - -// Implements DB. -func (db *CLevelDB) DeleteSync(key []byte) error { - key = nonNilBytes(key) - if err := db.db.Delete(db.woSync, key); err != nil { - return err - } - return nil -} - -func (db *CLevelDB) DB() *levigo.DB { - return db.db -} - -// Implements DB. -func (db *CLevelDB) Close() error { - db.db.Close() - db.ro.Close() - db.wo.Close() - db.woSync.Close() - return nil -} - -// Implements DB. -func (db *CLevelDB) Print() error { - itr, err := db.Iterator(nil, nil) - if err != nil { - return err - } - defer itr.Close() - for ; itr.Valid(); itr.Next() { - key := itr.Key() - value := itr.Value() - fmt.Printf("[%X]:\t[%X]\n", key, value) - } - return nil -} - -// Implements DB. 
-func (db *CLevelDB) Stats() map[string]string { - keys := []string{ - "leveldb.aliveiters", - "leveldb.alivesnaps", - "leveldb.blockpool", - "leveldb.cachedblock", - "leveldb.num-files-at-level{n}", - "leveldb.openedtables", - "leveldb.sstables", - "leveldb.stats", - } - - stats := make(map[string]string, len(keys)) - for _, key := range keys { - str := db.db.PropertyValue(key) - stats[key] = str - } - return stats -} - -//---------------------------------------- -// Batch - -// Implements DB. -func (db *CLevelDB) NewBatch() Batch { - batch := levigo.NewWriteBatch() - return &cLevelDBBatch{db, batch} -} - -type cLevelDBBatch struct { - db *CLevelDB - batch *levigo.WriteBatch -} - -// Implements Batch. -func (mBatch *cLevelDBBatch) Set(key, value []byte) { - mBatch.batch.Put(key, value) -} - -// Implements Batch. -func (mBatch *cLevelDBBatch) Delete(key []byte) { - mBatch.batch.Delete(key) -} - -// Implements Batch. -func (mBatch *cLevelDBBatch) Write() error { - if err := mBatch.db.db.Write(mBatch.db.wo, mBatch.batch); err != nil { - return err - } - return nil -} - -// Implements Batch. -func (mBatch *cLevelDBBatch) WriteSync() error { - if err := mBatch.db.db.Write(mBatch.db.woSync, mBatch.batch); err != nil { - return err - } - return nil -} - -// Implements Batch. -func (mBatch *cLevelDBBatch) Close() { - mBatch.batch.Close() -} - -//---------------------------------------- -// Iterator -// NOTE This is almost identical to db/go_level_db.Iterator -// Before creating a third version, refactor. 
- -func (db *CLevelDB) Iterator(start, end []byte) (Iterator, error) { - itr := db.db.NewIterator(db.ro) - return newCLevelDBIterator(itr, start, end, false), nil -} - -func (db *CLevelDB) ReverseIterator(start, end []byte) (Iterator, error) { - itr := db.db.NewIterator(db.ro) - return newCLevelDBIterator(itr, start, end, true), nil -} - -var _ Iterator = (*cLevelDBIterator)(nil) - -type cLevelDBIterator struct { - source *levigo.Iterator - start, end []byte - isReverse bool - isInvalid bool -} - -func newCLevelDBIterator(source *levigo.Iterator, start, end []byte, isReverse bool) *cLevelDBIterator { - if isReverse { - if end == nil { - source.SeekToLast() - } else { - source.Seek(end) - if source.Valid() { - eoakey := source.Key() // end or after key - if bytes.Compare(end, eoakey) <= 0 { - source.Prev() - } - } else { - source.SeekToLast() - } - } - } else { - if start == nil { - source.SeekToFirst() - } else { - source.Seek(start) - } - } - return &cLevelDBIterator{ - source: source, - start: start, - end: end, - isReverse: isReverse, - isInvalid: false, - } -} - -func (itr cLevelDBIterator) Domain() ([]byte, []byte) { - return itr.start, itr.end -} - -func (itr cLevelDBIterator) Valid() bool { - - // Once invalid, forever invalid. - if itr.isInvalid { - return false - } - - // Panic on DB error. No way to recover. - itr.assertNoError() - - // If source is invalid, invalid. - if !itr.source.Valid() { - itr.isInvalid = true - return false - } - - // If key is end or past it, invalid. - var start = itr.start - var end = itr.end - var key = itr.source.Key() - if itr.isReverse { - if start != nil && bytes.Compare(key, start) < 0 { - itr.isInvalid = true - return false - } - } else { - if end != nil && bytes.Compare(end, key) <= 0 { - itr.isInvalid = true - return false - } - } - - // It's valid. 
- return true -} - -func (itr cLevelDBIterator) Key() []byte { - itr.assertNoError() - itr.assertIsValid() - return itr.source.Key() -} - -func (itr cLevelDBIterator) Value() []byte { - itr.assertNoError() - itr.assertIsValid() - return itr.source.Value() -} - -func (itr cLevelDBIterator) Next() { - itr.assertNoError() - itr.assertIsValid() - if itr.isReverse { - itr.source.Prev() - } else { - itr.source.Next() - } -} - -func (itr cLevelDBIterator) Error() error { - return itr.source.GetError() -} - -func (itr cLevelDBIterator) Close() { - itr.source.Close() -} - -func (itr cLevelDBIterator) assertNoError() { - err := itr.source.GetError() - if err != nil { - panic(err) - } -} - -func (itr cLevelDBIterator) assertIsValid() { - if !itr.Valid() { - panic("cLevelDBIterator is invalid") - } -} diff --git a/cleveldb.go b/cleveldb.go new file mode 100644 index 000000000..066126c1f --- /dev/null +++ b/cleveldb.go @@ -0,0 +1,176 @@ +// +build cleveldb + +package db + +import ( + "fmt" + "path/filepath" + + "github.com/jmhodges/levigo" +) + +func init() { + dbCreator := func(name string, dir string) (DB, error) { + return NewCLevelDB(name, dir) + } + registerDBCreator(CLevelDBBackend, dbCreator, false) +} + +// CLevelDB uses the C LevelDB database via a Go wrapper. +type CLevelDB struct { + db *levigo.DB + ro *levigo.ReadOptions + wo *levigo.WriteOptions + woSync *levigo.WriteOptions +} + +var _ DB = (*CLevelDB)(nil) + +// NewCLevelDB creates a new CLevelDB. 
+func NewCLevelDB(name string, dir string) (*CLevelDB, error) { + dbPath := filepath.Join(dir, name+".db") + + opts := levigo.NewOptions() + opts.SetCache(levigo.NewLRUCache(1 << 30)) + opts.SetCreateIfMissing(true) + db, err := levigo.Open(dbPath, opts) + if err != nil { + return nil, err + } + ro := levigo.NewReadOptions() + wo := levigo.NewWriteOptions() + woSync := levigo.NewWriteOptions() + woSync.SetSync(true) + database := &CLevelDB{ + db: db, + ro: ro, + wo: wo, + woSync: woSync, + } + return database, nil +} + +// Get implements DB. +func (db *CLevelDB) Get(key []byte) ([]byte, error) { + key = nonNilBytes(key) + res, err := db.db.Get(db.ro, key) + if err != nil { + return nil, err + } + return res, nil +} + +// Has implements DB. +func (db *CLevelDB) Has(key []byte) (bool, error) { + bytes, err := db.Get(key) + if err != nil { + return false, err + } + return bytes != nil, nil +} + +// Set implements DB. +func (db *CLevelDB) Set(key []byte, value []byte) error { + key = nonNilBytes(key) + value = nonNilBytes(value) + if err := db.db.Put(db.wo, key, value); err != nil { + return err + } + return nil +} + +// SetSync implements DB. +func (db *CLevelDB) SetSync(key []byte, value []byte) error { + key = nonNilBytes(key) + value = nonNilBytes(value) + if err := db.db.Put(db.woSync, key, value); err != nil { + return err + } + return nil +} + +// Delete implements DB. +func (db *CLevelDB) Delete(key []byte) error { + key = nonNilBytes(key) + if err := db.db.Delete(db.wo, key); err != nil { + return err + } + return nil +} + +// DeleteSync implements DB. +func (db *CLevelDB) DeleteSync(key []byte) error { + key = nonNilBytes(key) + if err := db.db.Delete(db.woSync, key); err != nil { + return err + } + return nil +} + +// FIXME This should not be exposed +func (db *CLevelDB) DB() *levigo.DB { + return db.db +} + +// Close implements DB. 
+func (db *CLevelDB) Close() error { + db.db.Close() + db.ro.Close() + db.wo.Close() + db.woSync.Close() + return nil +} + +// Print implements DB. +func (db *CLevelDB) Print() error { + itr, err := db.Iterator(nil, nil) + if err != nil { + return err + } + defer itr.Close() + for ; itr.Valid(); itr.Next() { + key := itr.Key() + value := itr.Value() + fmt.Printf("[%X]:\t[%X]\n", key, value) + } + return nil +} + +// Stats implements DB. +func (db *CLevelDB) Stats() map[string]string { + keys := []string{ + "leveldb.aliveiters", + "leveldb.alivesnaps", + "leveldb.blockpool", + "leveldb.cachedblock", + "leveldb.num-files-at-level{n}", + "leveldb.openedtables", + "leveldb.sstables", + "leveldb.stats", + } + + stats := make(map[string]string, len(keys)) + for _, key := range keys { + str := db.db.PropertyValue(key) + stats[key] = str + } + return stats +} + +// NewBatch implements DB. +func (db *CLevelDB) NewBatch() Batch { + batch := levigo.NewWriteBatch() + return &cLevelDBBatch{db, batch} +} + +// Iterator implements DB. +func (db *CLevelDB) Iterator(start, end []byte) (Iterator, error) { + itr := db.db.NewIterator(db.ro) + return newCLevelDBIterator(itr, start, end, false), nil +} + +// ReverseIterator implements DB. +func (db *CLevelDB) ReverseIterator(start, end []byte) (Iterator, error) { + itr := db.db.NewIterator(db.ro) + return newCLevelDBIterator(itr, start, end, true), nil +} diff --git a/cleveldb_batch.go b/cleveldb_batch.go new file mode 100644 index 000000000..c98fe3354 --- /dev/null +++ b/cleveldb_batch.go @@ -0,0 +1,42 @@ +// +build cleveldb + +package db + +import "github.com/jmhodges/levigo" + +// cLevelDBBatch is a LevelDB batch. +type cLevelDBBatch struct { + db *CLevelDB + batch *levigo.WriteBatch +} + +// Set implements Batch. +func (b *cLevelDBBatch) Set(key, value []byte) { + b.batch.Put(key, value) +} + +// Delete implements Batch. +func (b *cLevelDBBatch) Delete(key []byte) { + b.batch.Delete(key) +} + +// Write implements Batch. 
+func (b *cLevelDBBatch) Write() error { + if err := b.db.db.Write(b.db.wo, b.batch); err != nil { + return err + } + return nil +} + +// WriteSync implements Batch. +func (b *cLevelDBBatch) WriteSync() error { + if err := b.db.db.Write(b.db.woSync, b.batch); err != nil { + return err + } + return nil +} + +// Close implements Batch. +func (b *cLevelDBBatch) Close() { + b.batch.Close() +} diff --git a/cleveldb_iterator.go b/cleveldb_iterator.go new file mode 100644 index 000000000..951fd0407 --- /dev/null +++ b/cleveldb_iterator.go @@ -0,0 +1,140 @@ +// +build cleveldb + +package db + +import ( + "bytes" + + "github.com/jmhodges/levigo" +) + +// cLevelDBIterator is a cLevelDB iterator. +type cLevelDBIterator struct { + source *levigo.Iterator + start, end []byte + isReverse bool + isInvalid bool +} + +var _ Iterator = (*cLevelDBIterator)(nil) + +func newCLevelDBIterator(source *levigo.Iterator, start, end []byte, isReverse bool) *cLevelDBIterator { + if isReverse { + if end == nil { + source.SeekToLast() + } else { + source.Seek(end) + if source.Valid() { + eoakey := source.Key() // end or after key + if bytes.Compare(end, eoakey) <= 0 { + source.Prev() + } + } else { + source.SeekToLast() + } + } + } else { + if start == nil { + source.SeekToFirst() + } else { + source.Seek(start) + } + } + return &cLevelDBIterator{ + source: source, + start: start, + end: end, + isReverse: isReverse, + isInvalid: false, + } +} + +// Domain implements Iterator. +func (itr cLevelDBIterator) Domain() ([]byte, []byte) { + return itr.start, itr.end +} + +// Valid implements Iterator. +func (itr cLevelDBIterator) Valid() bool { + + // Once invalid, forever invalid. + if itr.isInvalid { + return false + } + + // Panic on DB error. No way to recover. + itr.assertNoError() + + // If source is invalid, invalid. + if !itr.source.Valid() { + itr.isInvalid = true + return false + } + + // If key is end or past it, invalid. 
+ var start = itr.start + var end = itr.end + var key = itr.source.Key() + if itr.isReverse { + if start != nil && bytes.Compare(key, start) < 0 { + itr.isInvalid = true + return false + } + } else { + if end != nil && bytes.Compare(end, key) <= 0 { + itr.isInvalid = true + return false + } + } + + // It's valid. + return true +} + +// Key implements Iterator. +func (itr cLevelDBIterator) Key() []byte { + itr.assertNoError() + itr.assertIsValid() + return itr.source.Key() +} + +// Value implements Iterator. +func (itr cLevelDBIterator) Value() []byte { + itr.assertNoError() + itr.assertIsValid() + return itr.source.Value() +} + +// Next implements Iterator. +func (itr cLevelDBIterator) Next() { + itr.assertNoError() + itr.assertIsValid() + if itr.isReverse { + itr.source.Prev() + } else { + itr.source.Next() + } +} + +// Error implements Iterator. +func (itr cLevelDBIterator) Error() error { + return itr.source.GetError() +} + +// Close implements Iterator. +func (itr cLevelDBIterator) Close() { + itr.source.Close() +} + +func (itr cLevelDBIterator) assertNoError() { + err := itr.source.GetError() + if err != nil { + panic(err) + } +} + +func (itr cLevelDBIterator) assertIsValid() { + if !itr.Valid() { + panic("cLevelDBIterator is invalid") + } +} diff --git a/c_level_db_test.go b/cleveldb_test.go similarity index 90% rename from c_level_db_test.go rename to cleveldb_test.go index 57f3ebb4f..e893c09af 100644 --- a/c_level_db_test.go +++ b/cleveldb_test.go @@ -78,18 +78,6 @@ func BenchmarkRandomReadsWrites2(b *testing.B) { db.Close() } -/* -func int642Bytes(i int64) []byte { - buf := make([]byte, 8) - binary.BigEndian.PutUint64(buf, uint64(i)) - return buf -} - -func bytes2Int64(buf []byte) int64 { - return int64(binary.BigEndian.Uint64(buf)) -} -*/ - func TestCLevelDBBackend(t *testing.T) { name := fmt.Sprintf("test_%x", randStr(12)) // Can't use "" (current directory) or "./" here because levigo.Open returns: diff --git a/go_level_db.go b/go_level_db.go deleted 
file mode 100644 index 126096f96..000000000 --- a/go_level_db.go +++ /dev/null @@ -1,353 +0,0 @@ -package db - -import ( - "bytes" - "fmt" - "path/filepath" - - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/errors" - "github.com/syndtr/goleveldb/leveldb/iterator" - "github.com/syndtr/goleveldb/leveldb/opt" -) - -func init() { - dbCreator := func(name string, dir string) (DB, error) { - return NewGoLevelDB(name, dir) - } - registerDBCreator(GoLevelDBBackend, dbCreator, false) -} - -var _ DB = (*GoLevelDB)(nil) - -type GoLevelDB struct { - db *leveldb.DB -} - -func NewGoLevelDB(name string, dir string) (*GoLevelDB, error) { - return NewGoLevelDBWithOpts(name, dir, nil) -} - -func NewGoLevelDBWithOpts(name string, dir string, o *opt.Options) (*GoLevelDB, error) { - dbPath := filepath.Join(dir, name+".db") - db, err := leveldb.OpenFile(dbPath, o) - if err != nil { - return nil, err - } - database := &GoLevelDB{ - db: db, - } - return database, nil -} - -// Implements DB. -func (db *GoLevelDB) Get(key []byte) ([]byte, error) { - key = nonNilBytes(key) - res, err := db.db.Get(key, nil) - if err != nil { - if err == errors.ErrNotFound { - return nil, nil - } - return nil, err - } - return res, nil -} - -// Implements DB. -func (db *GoLevelDB) Has(key []byte) (bool, error) { - bytes, err := db.Get(key) - if err != nil { - return false, err - } - return bytes != nil, nil -} - -// Implements DB. -func (db *GoLevelDB) Set(key []byte, value []byte) error { - key = nonNilBytes(key) - value = nonNilBytes(value) - if err := db.db.Put(key, value, nil); err != nil { - return err - } - return nil -} - -// Implements DB. -func (db *GoLevelDB) SetSync(key []byte, value []byte) error { - key = nonNilBytes(key) - value = nonNilBytes(value) - if err := db.db.Put(key, value, &opt.WriteOptions{Sync: true}); err != nil { - return err - } - return nil -} - -// Implements DB. 
-func (db *GoLevelDB) Delete(key []byte) error { - key = nonNilBytes(key) - if err := db.db.Delete(key, nil); err != nil { - return err - } - return nil -} - -// Implements DB. -func (db *GoLevelDB) DeleteSync(key []byte) error { - key = nonNilBytes(key) - err := db.db.Delete(key, &opt.WriteOptions{Sync: true}) - if err != nil { - return err - } - return nil -} - -func (db *GoLevelDB) DB() *leveldb.DB { - return db.db -} - -// Implements DB. -func (db *GoLevelDB) Close() error { - if err := db.db.Close(); err != nil { - return err - } - return nil -} - -// Implements DB. -func (db *GoLevelDB) Print() error { - str, err := db.db.GetProperty("leveldb.stats") - if err != nil { - return err - } - fmt.Printf("%v\n", str) - - itr := db.db.NewIterator(nil, nil) - for itr.Next() { - key := itr.Key() - value := itr.Value() - fmt.Printf("[%X]:\t[%X]\n", key, value) - } - return nil -} - -// Implements DB. -func (db *GoLevelDB) Stats() map[string]string { - keys := []string{ - "leveldb.num-files-at-level{n}", - "leveldb.stats", - "leveldb.sstables", - "leveldb.blockpool", - "leveldb.cachedblock", - "leveldb.openedtables", - "leveldb.alivesnaps", - "leveldb.aliveiters", - } - - stats := make(map[string]string) - for _, key := range keys { - str, err := db.db.GetProperty(key) - if err == nil { - stats[key] = str - } - } - return stats -} - -//---------------------------------------- -// Batch - -// Implements DB. -func (db *GoLevelDB) NewBatch() Batch { - batch := new(leveldb.Batch) - return &goLevelDBBatch{db, batch} -} - -type goLevelDBBatch struct { - db *GoLevelDB - batch *leveldb.Batch -} - -// Implements Batch. -func (mBatch *goLevelDBBatch) Set(key, value []byte) { - mBatch.batch.Put(key, value) -} - -// Implements Batch. -func (mBatch *goLevelDBBatch) Delete(key []byte) { - mBatch.batch.Delete(key) -} - -// Implements Batch. 
-func (mBatch *goLevelDBBatch) Write() error { - err := mBatch.db.db.Write(mBatch.batch, &opt.WriteOptions{Sync: false}) - if err != nil { - return err - } - return nil -} - -// Implements Batch. -func (mBatch *goLevelDBBatch) WriteSync() error { - err := mBatch.db.db.Write(mBatch.batch, &opt.WriteOptions{Sync: true}) - if err != nil { - return err - } - return nil -} - -// Implements Batch. -func (mBatch *goLevelDBBatch) Close() { - mBatch.batch.Reset() -} - -//---------------------------------------- -// Iterator -// NOTE This is almost identical to db/c_level_db.Iterator -// Before creating a third version, refactor. - -// Implements DB. -func (db *GoLevelDB) Iterator(start, end []byte) (Iterator, error) { - itr := db.db.NewIterator(nil, nil) - return newGoLevelDBIterator(itr, start, end, false), nil -} - -// Implements DB. -func (db *GoLevelDB) ReverseIterator(start, end []byte) (Iterator, error) { - itr := db.db.NewIterator(nil, nil) - return newGoLevelDBIterator(itr, start, end, true), nil -} - -type goLevelDBIterator struct { - source iterator.Iterator - start []byte - end []byte - isReverse bool - isInvalid bool -} - -var _ Iterator = (*goLevelDBIterator)(nil) - -func newGoLevelDBIterator(source iterator.Iterator, start, end []byte, isReverse bool) *goLevelDBIterator { - if isReverse { - if end == nil { - source.Last() - } else { - valid := source.Seek(end) - if valid { - eoakey := source.Key() // end or after key - if bytes.Compare(end, eoakey) <= 0 { - source.Prev() - } - } else { - source.Last() - } - } - } else { - if start == nil { - source.First() - } else { - source.Seek(start) - } - } - return &goLevelDBIterator{ - source: source, - start: start, - end: end, - isReverse: isReverse, - isInvalid: false, - } -} - -// Implements Iterator. -func (itr *goLevelDBIterator) Domain() ([]byte, []byte) { - return itr.start, itr.end -} - -// Implements Iterator. -func (itr *goLevelDBIterator) Valid() bool { - - // Once invalid, forever invalid. 
- if itr.isInvalid { - return false - } - - // Panic on DB error. No way to recover. - itr.assertNoError() - - // If source is invalid, invalid. - if !itr.source.Valid() { - itr.isInvalid = true - return false - } - - // If key is end or past it, invalid. - var start = itr.start - var end = itr.end - var key = itr.source.Key() - - if itr.isReverse { - if start != nil && bytes.Compare(key, start) < 0 { - itr.isInvalid = true - return false - } - } else { - if end != nil && bytes.Compare(end, key) <= 0 { - itr.isInvalid = true - return false - } - } - - // Valid - return true -} - -// Implements Iterator. -func (itr *goLevelDBIterator) Key() []byte { - // Key returns a copy of the current key. - // See https://github.com/syndtr/goleveldb/blob/52c212e6c196a1404ea59592d3f1c227c9f034b2/leveldb/iterator/iter.go#L88 - itr.assertNoError() - itr.assertIsValid() - return cp(itr.source.Key()) -} - -// Implements Iterator. -func (itr *goLevelDBIterator) Value() []byte { - // Value returns a copy of the current value. - // See https://github.com/syndtr/goleveldb/blob/52c212e6c196a1404ea59592d3f1c227c9f034b2/leveldb/iterator/iter.go#L88 - itr.assertNoError() - itr.assertIsValid() - return cp(itr.source.Value()) -} - -// Implements Iterator. -func (itr *goLevelDBIterator) Next() { - itr.assertNoError() - itr.assertIsValid() - if itr.isReverse { - itr.source.Prev() - } else { - itr.source.Next() - } -} - -func (itr *goLevelDBIterator) Error() error { - return itr.source.Error() -} - -// Implements Iterator. 
-func (itr *goLevelDBIterator) Close() { - itr.source.Release() -} - -func (itr *goLevelDBIterator) assertNoError() { - err := itr.source.Error() - if err != nil { - panic(err) - } -} - -func (itr goLevelDBIterator) assertIsValid() { - if !itr.Valid() { - panic("goLevelDBIterator is invalid") - } -} diff --git a/goleveldb.go b/goleveldb.go new file mode 100644 index 000000000..0c54cdee9 --- /dev/null +++ b/goleveldb.go @@ -0,0 +1,170 @@ +package db + +import ( + "fmt" + "path/filepath" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb/opt" +) + +func init() { + dbCreator := func(name string, dir string) (DB, error) { + return NewGoLevelDB(name, dir) + } + registerDBCreator(GoLevelDBBackend, dbCreator, false) +} + +type GoLevelDB struct { + db *leveldb.DB +} + +var _ DB = (*GoLevelDB)(nil) + +func NewGoLevelDB(name string, dir string) (*GoLevelDB, error) { + return NewGoLevelDBWithOpts(name, dir, nil) +} + +func NewGoLevelDBWithOpts(name string, dir string, o *opt.Options) (*GoLevelDB, error) { + dbPath := filepath.Join(dir, name+".db") + db, err := leveldb.OpenFile(dbPath, o) + if err != nil { + return nil, err + } + database := &GoLevelDB{ + db: db, + } + return database, nil +} + +// Get implements DB. +func (db *GoLevelDB) Get(key []byte) ([]byte, error) { + key = nonNilBytes(key) + res, err := db.db.Get(key, nil) + if err != nil { + if err == errors.ErrNotFound { + return nil, nil + } + return nil, err + } + return res, nil +} + +// Has implements DB. +func (db *GoLevelDB) Has(key []byte) (bool, error) { + bytes, err := db.Get(key) + if err != nil { + return false, err + } + return bytes != nil, nil +} + +// Set implements DB. +func (db *GoLevelDB) Set(key []byte, value []byte) error { + key = nonNilBytes(key) + value = nonNilBytes(value) + if err := db.db.Put(key, value, nil); err != nil { + return err + } + return nil +} + +// SetSync implements DB. 
+func (db *GoLevelDB) SetSync(key []byte, value []byte) error { + key = nonNilBytes(key) + value = nonNilBytes(value) + if err := db.db.Put(key, value, &opt.WriteOptions{Sync: true}); err != nil { + return err + } + return nil +} + +// Delete implements DB. +func (db *GoLevelDB) Delete(key []byte) error { + key = nonNilBytes(key) + if err := db.db.Delete(key, nil); err != nil { + return err + } + return nil +} + +// DeleteSync implements DB. +func (db *GoLevelDB) DeleteSync(key []byte) error { + key = nonNilBytes(key) + err := db.db.Delete(key, &opt.WriteOptions{Sync: true}) + if err != nil { + return err + } + return nil +} + +func (db *GoLevelDB) DB() *leveldb.DB { + return db.db +} + +// Close implements DB. +func (db *GoLevelDB) Close() error { + if err := db.db.Close(); err != nil { + return err + } + return nil +} + +// Print implements DB. +func (db *GoLevelDB) Print() error { + str, err := db.db.GetProperty("leveldb.stats") + if err != nil { + return err + } + fmt.Printf("%v\n", str) + + itr := db.db.NewIterator(nil, nil) + for itr.Next() { + key := itr.Key() + value := itr.Value() + fmt.Printf("[%X]:\t[%X]\n", key, value) + } + return nil +} + +// Stats implements DB. +func (db *GoLevelDB) Stats() map[string]string { + keys := []string{ + "leveldb.num-files-at-level{n}", + "leveldb.stats", + "leveldb.sstables", + "leveldb.blockpool", + "leveldb.cachedblock", + "leveldb.openedtables", + "leveldb.alivesnaps", + "leveldb.aliveiters", + } + + stats := make(map[string]string) + for _, key := range keys { + str, err := db.db.GetProperty(key) + if err == nil { + stats[key] = str + } + } + return stats +} + +// NewBatch implements DB. +func (db *GoLevelDB) NewBatch() Batch { + batch := new(leveldb.Batch) + return &goLevelDBBatch{db, batch} +} + +// Iterator implements DB. 
+func (db *GoLevelDB) Iterator(start, end []byte) (Iterator, error) { + itr := db.db.NewIterator(nil, nil) + return newGoLevelDBIterator(itr, start, end, false), nil +} + +// ReverseIterator implements DB. +func (db *GoLevelDB) ReverseIterator(start, end []byte) (Iterator, error) { + itr := db.db.NewIterator(nil, nil) + return newGoLevelDBIterator(itr, start, end, true), nil +} diff --git a/goleveldb_batch.go b/goleveldb_batch.go new file mode 100644 index 000000000..ec290fe10 --- /dev/null +++ b/goleveldb_batch.go @@ -0,0 +1,46 @@ +package db + +import ( + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" +) + +type goLevelDBBatch struct { + db *GoLevelDB + batch *leveldb.Batch +} + +var _ Batch = (*goLevelDBBatch)(nil) + +// Set implements Batch. +func (b *goLevelDBBatch) Set(key, value []byte) { + b.batch.Put(key, value) +} + +// Delete implements Batch. +func (b *goLevelDBBatch) Delete(key []byte) { + b.batch.Delete(key) +} + +// Write implements Batch. +func (b *goLevelDBBatch) Write() error { + err := b.db.db.Write(b.batch, &opt.WriteOptions{Sync: false}) + if err != nil { + return err + } + return nil +} + +// WriteSync implements Batch. +func (b *goLevelDBBatch) WriteSync() error { + err := b.db.db.Write(b.batch, &opt.WriteOptions{Sync: true}) + if err != nil { + return err + } + return nil +} + +// Close implements Batch. 
+func (b *goLevelDBBatch) Close() { + b.batch.Reset() +} diff --git a/goleveldb_iterator.go b/goleveldb_iterator.go new file mode 100644 index 000000000..3a13c4d6f --- /dev/null +++ b/goleveldb_iterator.go @@ -0,0 +1,143 @@ +package db + +import ( + "bytes" + + "github.com/syndtr/goleveldb/leveldb/iterator" +) + +type goLevelDBIterator struct { + source iterator.Iterator + start []byte + end []byte + isReverse bool + isInvalid bool +} + +var _ Iterator = (*goLevelDBIterator)(nil) + +func newGoLevelDBIterator(source iterator.Iterator, start, end []byte, isReverse bool) *goLevelDBIterator { + if isReverse { + if end == nil { + source.Last() + } else { + valid := source.Seek(end) + if valid { + eoakey := source.Key() // end or after key + if bytes.Compare(end, eoakey) <= 0 { + source.Prev() + } + } else { + source.Last() + } + } + } else { + if start == nil { + source.First() + } else { + source.Seek(start) + } + } + return &goLevelDBIterator{ + source: source, + start: start, + end: end, + isReverse: isReverse, + isInvalid: false, + } +} + +// Domain implements Iterator. +func (itr *goLevelDBIterator) Domain() ([]byte, []byte) { + return itr.start, itr.end +} + +// Valid implements Iterator. +func (itr *goLevelDBIterator) Valid() bool { + + // Once invalid, forever invalid. + if itr.isInvalid { + return false + } + + // Panic on DB error. No way to recover. + itr.assertNoError() + + // If source is invalid, invalid. + if !itr.source.Valid() { + itr.isInvalid = true + return false + } + + // If key is end or past it, invalid. + var start = itr.start + var end = itr.end + var key = itr.source.Key() + + if itr.isReverse { + if start != nil && bytes.Compare(key, start) < 0 { + itr.isInvalid = true + return false + } + } else { + if end != nil && bytes.Compare(end, key) <= 0 { + itr.isInvalid = true + return false + } + } + + // Valid + return true +} + +// Key implements Iterator. +func (itr *goLevelDBIterator) Key() []byte { + // Key returns a copy of the current key. 
+ // See https://github.com/syndtr/goleveldb/blob/52c212e6c196a1404ea59592d3f1c227c9f034b2/leveldb/iterator/iter.go#L88 + itr.assertNoError() + itr.assertIsValid() + return cp(itr.source.Key()) +} + +// Value implements Iterator. +func (itr *goLevelDBIterator) Value() []byte { + // Value returns a copy of the current value. + // See https://github.com/syndtr/goleveldb/blob/52c212e6c196a1404ea59592d3f1c227c9f034b2/leveldb/iterator/iter.go#L88 + itr.assertNoError() + itr.assertIsValid() + return cp(itr.source.Value()) +} + +// Next implements Iterator. +func (itr *goLevelDBIterator) Next() { + itr.assertNoError() + itr.assertIsValid() + if itr.isReverse { + itr.source.Prev() + } else { + itr.source.Next() + } +} + +// Error implements Iterator. +func (itr *goLevelDBIterator) Error() error { + return itr.source.Error() +} + +// Close implements Iterator. +func (itr *goLevelDBIterator) Close() { + itr.source.Release() +} + +func (itr *goLevelDBIterator) assertNoError() { + err := itr.source.Error() + if err != nil { + panic(err) + } +} + +func (itr goLevelDBIterator) assertIsValid() { + if !itr.Valid() { + panic("goLevelDBIterator is invalid") + } +} diff --git a/go_level_db_test.go b/goleveldb_test.go similarity index 100% rename from go_level_db_test.go rename to goleveldb_test.go diff --git a/prefix_db.go b/prefix_db.go deleted file mode 100644 index fd6e5800e..000000000 --- a/prefix_db.go +++ /dev/null @@ -1,342 +0,0 @@ -package db - -import ( - "bytes" - "fmt" - "sync" -) - -// IteratePrefix is a convenience function for iterating over a key domain -// restricted by prefix. -func IteratePrefix(db DB, prefix []byte) (Iterator, error) { - var start, end []byte - if len(prefix) == 0 { - start = nil - end = nil - } else { - start = cp(prefix) - end = cpIncr(prefix) - } - itr, err := db.Iterator(start, end) - if err != nil { - return nil, err - } - return itr, nil -} - -/* -TODO: Make test, maybe rename. 
-// Like IteratePrefix but the iterator strips the prefix from the keys. -func IteratePrefixStripped(db DB, prefix []byte) Iterator { - start, end := ... - return newPrefixIterator(prefix, start, end, IteratePrefix(db, prefix)) -} -*/ - -//---------------------------------------- -// prefixDB - -type PrefixDB struct { - mtx sync.Mutex - prefix []byte - db DB -} - -// NewPrefixDB lets you namespace multiple DBs within a single DB. -func NewPrefixDB(db DB, prefix []byte) *PrefixDB { - return &PrefixDB{ - prefix: prefix, - db: db, - } -} - -// Implements atomicSetDeleter. -func (pdb *PrefixDB) Mutex() *sync.Mutex { - return &(pdb.mtx) -} - -// Implements DB. -func (pdb *PrefixDB) Get(key []byte) ([]byte, error) { - pdb.mtx.Lock() - defer pdb.mtx.Unlock() - - pkey := pdb.prefixed(key) - value, err := pdb.db.Get(pkey) - if err != nil { - return nil, err - } - return value, nil -} - -// Implements DB. -func (pdb *PrefixDB) Has(key []byte) (bool, error) { - pdb.mtx.Lock() - defer pdb.mtx.Unlock() - - ok, err := pdb.db.Has(pdb.prefixed(key)) - if err != nil { - return ok, err - } - - return ok, nil -} - -// Implements DB. -func (pdb *PrefixDB) Set(key []byte, value []byte) error { - pdb.mtx.Lock() - defer pdb.mtx.Unlock() - - pkey := pdb.prefixed(key) - if err := pdb.db.Set(pkey, value); err != nil { - return err - } - return nil -} - -// Implements DB. -func (pdb *PrefixDB) SetSync(key []byte, value []byte) error { - pdb.mtx.Lock() - defer pdb.mtx.Unlock() - - return pdb.db.SetSync(pdb.prefixed(key), value) -} - -// Implements DB. -func (pdb *PrefixDB) Delete(key []byte) error { - pdb.mtx.Lock() - defer pdb.mtx.Unlock() - - return pdb.db.Delete(pdb.prefixed(key)) -} - -// Implements DB. -func (pdb *PrefixDB) DeleteSync(key []byte) error { - pdb.mtx.Lock() - defer pdb.mtx.Unlock() - - return pdb.db.DeleteSync(pdb.prefixed(key)) -} - -// Implements DB. 
-func (pdb *PrefixDB) Iterator(start, end []byte) (Iterator, error) { - pdb.mtx.Lock() - defer pdb.mtx.Unlock() - - var pstart, pend []byte - pstart = append(cp(pdb.prefix), start...) - if end == nil { - pend = cpIncr(pdb.prefix) - } else { - pend = append(cp(pdb.prefix), end...) - } - itr, err := pdb.db.Iterator(pstart, pend) - if err != nil { - return nil, err - } - - return newPrefixIterator(pdb.prefix, start, end, itr) -} - -// Implements DB. -func (pdb *PrefixDB) ReverseIterator(start, end []byte) (Iterator, error) { - pdb.mtx.Lock() - defer pdb.mtx.Unlock() - - var pstart, pend []byte - pstart = append(cp(pdb.prefix), start...) - if end == nil { - pend = cpIncr(pdb.prefix) - } else { - pend = append(cp(pdb.prefix), end...) - } - ritr, err := pdb.db.ReverseIterator(pstart, pend) - if err != nil { - return nil, err - } - - return newPrefixIterator(pdb.prefix, start, end, ritr) -} - -// Implements DB. -// Panics if the underlying DB is not an -// atomicSetDeleter. -func (pdb *PrefixDB) NewBatch() Batch { - pdb.mtx.Lock() - defer pdb.mtx.Unlock() - - return newPrefixBatch(pdb.prefix, pdb.db.NewBatch()) -} - -// Implements DB. -func (pdb *PrefixDB) Close() error { - pdb.mtx.Lock() - defer pdb.mtx.Unlock() - - return pdb.db.Close() -} - -// Implements DB. -func (pdb *PrefixDB) Print() error { - fmt.Printf("prefix: %X\n", pdb.prefix) - - itr, err := pdb.Iterator(nil, nil) - if err != nil { - return err - } - defer itr.Close() - for ; itr.Valid(); itr.Next() { - key := itr.Key() - value := itr.Value() - fmt.Printf("[%X]:\t[%X]\n", key, value) - } - return nil -} - -// Implements DB. 
-func (pdb *PrefixDB) Stats() map[string]string { - stats := make(map[string]string) - stats["prefixdb.prefix.string"] = string(pdb.prefix) - stats["prefixdb.prefix.hex"] = fmt.Sprintf("%X", pdb.prefix) - source := pdb.db.Stats() - for key, value := range source { - stats["prefixdb.source."+key] = value - } - return stats -} - -func (pdb *PrefixDB) prefixed(key []byte) []byte { - return append(cp(pdb.prefix), key...) -} - -//---------------------------------------- -// prefixBatch - -type prefixBatch struct { - prefix []byte - source Batch -} - -func newPrefixBatch(prefix []byte, source Batch) prefixBatch { - return prefixBatch{ - prefix: prefix, - source: source, - } -} - -func (pb prefixBatch) Set(key, value []byte) { - pkey := append(cp(pb.prefix), key...) - pb.source.Set(pkey, value) -} - -func (pb prefixBatch) Delete(key []byte) { - pkey := append(cp(pb.prefix), key...) - pb.source.Delete(pkey) -} - -func (pb prefixBatch) Write() error { - return pb.source.Write() -} - -func (pb prefixBatch) WriteSync() error { - return pb.source.WriteSync() -} - -func (pb prefixBatch) Close() { - pb.source.Close() -} - -//---------------------------------------- -// prefixIterator - -var _ Iterator = (*prefixIterator)(nil) - -// Strips prefix while iterating from Iterator. 
-type prefixIterator struct { - prefix []byte - start []byte - end []byte - source Iterator - valid bool -} - -func newPrefixIterator(prefix, start, end []byte, source Iterator) (*prefixIterator, error) { - - pitrInvalid := &prefixIterator{ - prefix: prefix, - start: start, - end: end, - source: source, - valid: false, - } - - if !source.Valid() { - return pitrInvalid, nil - } - key := source.Key() - - if !bytes.HasPrefix(key, prefix) { - return pitrInvalid, nil - } - return &prefixIterator{ - prefix: prefix, - start: start, - end: end, - source: source, - valid: true, - }, nil -} - -func (itr *prefixIterator) Domain() (start []byte, end []byte) { - return itr.start, itr.end -} - -func (itr *prefixIterator) Valid() bool { - return itr.valid && itr.source.Valid() -} - -func (itr *prefixIterator) Next() { - if !itr.valid { - panic("prefixIterator invalid; cannot call Next()") - } - itr.source.Next() - - if !itr.source.Valid() || !bytes.HasPrefix(itr.source.Key(), itr.prefix) { - itr.valid = false - } -} - -func (itr *prefixIterator) Key() (key []byte) { - if !itr.valid { - panic("prefixIterator invalid; cannot call Key()") - } - key = itr.source.Key() - return stripPrefix(key, itr.prefix) -} - -func (itr *prefixIterator) Value() (value []byte) { - if !itr.valid { - panic("prefixIterator invalid; cannot call Value()") - } - value = itr.source.Value() - return value -} - -func (itr *prefixIterator) Error() error { - return itr.source.Error() -} - -func (itr *prefixIterator) Close() { - itr.source.Close() -} - -//---------------------------------------- - -func stripPrefix(key []byte, prefix []byte) (stripped []byte) { - if len(key) < len(prefix) { - panic("should not happen") - } - if !bytes.Equal(key[:len(prefix)], prefix) { - panic("should not happen") - } - return key[len(prefix):] -} diff --git a/prefixdb.go b/prefixdb.go new file mode 100644 index 000000000..2fcacc1a9 --- /dev/null +++ b/prefixdb.go @@ -0,0 +1,174 @@ +package db + +import ( + "fmt" + "sync" +) + 
+// PrefixDB wraps a namespace of another database as a logical database. +type PrefixDB struct { + mtx sync.Mutex + prefix []byte + db DB +} + +var _ DB = (*PrefixDB)(nil) + +// NewPrefixDB lets you namespace multiple DBs within a single DB. +func NewPrefixDB(db DB, prefix []byte) *PrefixDB { + return &PrefixDB{ + prefix: prefix, + db: db, + } +} + +// Get implements DB. +func (pdb *PrefixDB) Get(key []byte) ([]byte, error) { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + pkey := pdb.prefixed(key) + value, err := pdb.db.Get(pkey) + if err != nil { + return nil, err + } + return value, nil +} + +// Has implements DB. +func (pdb *PrefixDB) Has(key []byte) (bool, error) { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + ok, err := pdb.db.Has(pdb.prefixed(key)) + if err != nil { + return ok, err + } + + return ok, nil +} + +// Set implements DB. +func (pdb *PrefixDB) Set(key []byte, value []byte) error { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + pkey := pdb.prefixed(key) + if err := pdb.db.Set(pkey, value); err != nil { + return err + } + return nil +} + +// SetSync implements DB. +func (pdb *PrefixDB) SetSync(key []byte, value []byte) error { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + return pdb.db.SetSync(pdb.prefixed(key), value) +} + +// Delete implements DB. +func (pdb *PrefixDB) Delete(key []byte) error { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + return pdb.db.Delete(pdb.prefixed(key)) +} + +// DeleteSync implements DB. +func (pdb *PrefixDB) DeleteSync(key []byte) error { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + return pdb.db.DeleteSync(pdb.prefixed(key)) +} + +// Iterator implements DB. +func (pdb *PrefixDB) Iterator(start, end []byte) (Iterator, error) { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + var pstart, pend []byte + pstart = append(cp(pdb.prefix), start...) + if end == nil { + pend = cpIncr(pdb.prefix) + } else { + pend = append(cp(pdb.prefix), end...) 
+ } + itr, err := pdb.db.Iterator(pstart, pend) + if err != nil { + return nil, err + } + + return newPrefixIterator(pdb.prefix, start, end, itr) +} + +// ReverseIterator implements DB. +func (pdb *PrefixDB) ReverseIterator(start, end []byte) (Iterator, error) { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + var pstart, pend []byte + pstart = append(cp(pdb.prefix), start...) + if end == nil { + pend = cpIncr(pdb.prefix) + } else { + pend = append(cp(pdb.prefix), end...) + } + ritr, err := pdb.db.ReverseIterator(pstart, pend) + if err != nil { + return nil, err + } + + return newPrefixIterator(pdb.prefix, start, end, ritr) +} + +// NewBatch implements DB. +func (pdb *PrefixDB) NewBatch() Batch { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + return newPrefixBatch(pdb.prefix, pdb.db.NewBatch()) +} + +// Close implements DB. +func (pdb *PrefixDB) Close() error { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + return pdb.db.Close() +} + +// Print implements DB. +func (pdb *PrefixDB) Print() error { + fmt.Printf("prefix: %X\n", pdb.prefix) + + itr, err := pdb.Iterator(nil, nil) + if err != nil { + return err + } + defer itr.Close() + for ; itr.Valid(); itr.Next() { + key := itr.Key() + value := itr.Value() + fmt.Printf("[%X]:\t[%X]\n", key, value) + } + return nil +} + +// Stats implements DB. +func (pdb *PrefixDB) Stats() map[string]string { + stats := make(map[string]string) + stats["prefixdb.prefix.string"] = string(pdb.prefix) + stats["prefixdb.prefix.hex"] = fmt.Sprintf("%X", pdb.prefix) + source := pdb.db.Stats() + for key, value := range source { + stats["prefixdb.source."+key] = value + } + return stats +} + +func (pdb *PrefixDB) prefixed(key []byte) []byte { + return append(cp(pdb.prefix), key...) 
+} diff --git a/prefixdb_batch.go b/prefixdb_batch.go new file mode 100644 index 000000000..a3547de18 --- /dev/null +++ b/prefixdb_batch.go @@ -0,0 +1,42 @@ +package db + +type prefixDBBatch struct { + prefix []byte + source Batch +} + +var _ Batch = (*prefixDBBatch)(nil) + +func newPrefixBatch(prefix []byte, source Batch) prefixDBBatch { + return prefixDBBatch{ + prefix: prefix, + source: source, + } +} + +// Set implements Batch. +func (pb prefixDBBatch) Set(key, value []byte) { + pkey := append(cp(pb.prefix), key...) + pb.source.Set(pkey, value) +} + +// Delete implements Batch. +func (pb prefixDBBatch) Delete(key []byte) { + pkey := append(cp(pb.prefix), key...) + pb.source.Delete(pkey) +} + +// Write implements Batch. +func (pb prefixDBBatch) Write() error { + return pb.source.Write() +} + +// WriteSync implements Batch. +func (pb prefixDBBatch) WriteSync() error { + return pb.source.WriteSync() +} + +// Close implements Batch. +func (pb prefixDBBatch) Close() { + pb.source.Close() +} diff --git a/prefixdb_iterator.go b/prefixdb_iterator.go new file mode 100644 index 000000000..9b50d4cd4 --- /dev/null +++ b/prefixdb_iterator.go @@ -0,0 +1,127 @@ +package db + +import "bytes" + +// IteratePrefix is a convenience function for iterating over a key domain +// restricted by prefix. +func IteratePrefix(db DB, prefix []byte) (Iterator, error) { + var start, end []byte + if len(prefix) == 0 { + start = nil + end = nil + } else { + start = cp(prefix) + end = cpIncr(prefix) + } + itr, err := db.Iterator(start, end) + if err != nil { + return nil, err + } + return itr, nil +} + +/* +TODO: Make test, maybe rename. +// Like IteratePrefix but the iterator strips the prefix from the keys. +func IteratePrefixStripped(db DB, prefix []byte) Iterator { + start, end := ... + return newPrefixIterator(prefix, start, end, IteratePrefix(db, prefix)) +} +*/ + +// Strips prefix while iterating from Iterator. 
+type prefixDBIterator struct {
+	prefix []byte
+	start  []byte
+	end    []byte
+	source Iterator
+	valid  bool
+}
+
+var _ Iterator = (*prefixDBIterator)(nil)
+
+func newPrefixIterator(prefix, start, end []byte, source Iterator) (*prefixDBIterator, error) {
+	pitrInvalid := &prefixDBIterator{
+		prefix: prefix,
+		start:  start,
+		end:    end,
+		source: source,
+		valid:  false,
+	}
+
+	if !source.Valid() {
+		return pitrInvalid, nil
+	}
+	key := source.Key()
+
+	if !bytes.HasPrefix(key, prefix) {
+		return pitrInvalid, nil
+	}
+	return &prefixDBIterator{
+		prefix: prefix,
+		start:  start,
+		end:    end,
+		source: source,
+		valid:  true,
+	}, nil
+}
+
+// Domain implements Iterator.
+func (itr *prefixDBIterator) Domain() (start []byte, end []byte) {
+	return itr.start, itr.end
+}
+
+// Valid implements Iterator.
+func (itr *prefixDBIterator) Valid() bool {
+	return itr.valid && itr.source.Valid()
+}
+
+// Next implements Iterator.
+func (itr *prefixDBIterator) Next() {
+	if !itr.valid {
+		panic("prefixIterator invalid; cannot call Next()")
+	}
+	itr.source.Next()
+
+	if !itr.source.Valid() || !bytes.HasPrefix(itr.source.Key(), itr.prefix) {
+		itr.valid = false
+	}
+}
+
+// Key implements Iterator.
+func (itr *prefixDBIterator) Key() (key []byte) {
+	if !itr.valid {
+		panic("prefixIterator invalid; cannot call Key()")
+	}
+	key = itr.source.Key()
+	return stripPrefix(key, itr.prefix)
+}
+
+// Value implements Iterator.
+func (itr *prefixDBIterator) Value() (value []byte) {
+	if !itr.valid {
+		panic("prefixIterator invalid; cannot call Value()")
+	}
+	value = itr.source.Value()
+	return value
+}
+
+// Error implements Iterator.
+func (itr *prefixDBIterator) Error() error {
+	return itr.source.Error()
+}
+
+// Close implements Iterator.
+func (itr *prefixDBIterator) Close() { + itr.source.Close() +} + +func stripPrefix(key []byte, prefix []byte) (stripped []byte) { + if len(key) < len(prefix) { + panic("should not happen") + } + if !bytes.Equal(key[:len(prefix)], prefix) { + panic("should not happen") + } + return key[len(prefix):] +} diff --git a/prefix_db_test.go b/prefixdb_test.go similarity index 100% rename from prefix_db_test.go rename to prefixdb_test.go diff --git a/remotedb/batch.go b/remotedb/batch.go new file mode 100644 index 000000000..5fb92f30c --- /dev/null +++ b/remotedb/batch.go @@ -0,0 +1,54 @@ +package remotedb + +import ( + "github.com/pkg/errors" + + db "github.com/tendermint/tm-db" + protodb "github.com/tendermint/tm-db/remotedb/proto" +) + +type batch struct { + db *RemoteDB + ops []*protodb.Operation +} + +var _ db.Batch = (*batch)(nil) + +// Set implements Batch. +func (b *batch) Set(key, value []byte) { + op := &protodb.Operation{ + Entity: &protodb.Entity{Key: key, Value: value}, + Type: protodb.Operation_SET, + } + b.ops = append(b.ops, op) +} + +// Delete implements Batch. +func (b *batch) Delete(key []byte) { + op := &protodb.Operation{ + Entity: &protodb.Entity{Key: key}, + Type: protodb.Operation_DELETE, + } + b.ops = append(b.ops, op) +} + +// Write implements Batch. +func (b *batch) Write() error { + if _, err := b.db.dc.BatchWrite(b.db.ctx, &protodb.Batch{Ops: b.ops}); err != nil { + return errors.Errorf("remoteDB.BatchWrite: %v", err) + } + return nil +} + +// WriteSync implements Batch. +func (b *batch) WriteSync() error { + if _, err := b.db.dc.BatchWriteSync(b.db.ctx, &protodb.Batch{Ops: b.ops}); err != nil { + return errors.Errorf("RemoteDB.BatchWriteSync: %v", err) + } + return nil +} + +// Close implements Batch. 
+func (b *batch) Close() {
+	b.ops = nil
+}
diff --git a/remotedb/iterator.go b/remotedb/iterator.go
new file mode 100644
index 000000000..77b252512
--- /dev/null
+++ b/remotedb/iterator.go
@@ -0,0 +1,131 @@
+package remotedb
+
+import (
+	"fmt"
+
+	db "github.com/tendermint/tm-db"
+	protodb "github.com/tendermint/tm-db/remotedb/proto"
+)
+
+func makeIterator(dic protodb.DB_IteratorClient) db.Iterator {
+	return &iterator{dic: dic}
+}
+
+func makeReverseIterator(dric protodb.DB_ReverseIteratorClient) db.Iterator {
+	return &reverseIterator{dric: dric}
+}
+
+type reverseIterator struct {
+	dric protodb.DB_ReverseIteratorClient
+	cur  *protodb.Iterator
+}
+
+var _ db.Iterator = (*reverseIterator)(nil)
+
+// Valid implements Iterator.
+func (rItr *reverseIterator) Valid() bool {
+	return rItr.cur != nil && rItr.cur.Valid
+}
+
+// Domain implements Iterator.
+func (rItr *reverseIterator) Domain() (start, end []byte) {
+	if rItr.cur == nil || rItr.cur.Domain == nil {
+		return nil, nil
+	}
+	return rItr.cur.Domain.Start, rItr.cur.Domain.End
+}
+
+// Next implements Iterator.
+func (rItr *reverseIterator) Next() {
+	var err error
+	rItr.cur, err = rItr.dric.Recv()
+	if err != nil {
+		panic(fmt.Sprintf("RemoteDB.ReverseIterator.Next error: %v", err))
+	}
+}
+
+// Key implements Iterator.
+func (rItr *reverseIterator) Key() []byte {
+	if rItr.cur == nil {
+		panic("key does not exist")
+	}
+	return rItr.cur.Key
+}
+
+// Value implements Iterator.
+func (rItr *reverseIterator) Value() []byte {
+	if rItr.cur == nil {
+		panic("key does not exist")
+	}
+	return rItr.cur.Value
+}
+
+// Error implements Iterator.
+func (rItr *reverseIterator) Error() error {
+	return nil
+}
+
+// Close implements Iterator.
+func (rItr *reverseIterator) Close() {}
+
+// iterator implements the db.Iterator by retrieving
+// streamed iterators from the remote backend as
+// needed. It is NOT safe for concurrent usage,
+// matching the behavior of other iterators.
+type iterator struct {
+	dic protodb.DB_IteratorClient
+	cur *protodb.Iterator
+}
+
+var _ db.Iterator = (*iterator)(nil)
+
+// Valid implements Iterator.
+func (itr *iterator) Valid() bool {
+	return itr.cur != nil && itr.cur.Valid
+}
+
+// Domain implements Iterator.
+func (itr *iterator) Domain() (start, end []byte) {
+	if itr.cur == nil || itr.cur.Domain == nil {
+		return nil, nil
+	}
+	return itr.cur.Domain.Start, itr.cur.Domain.End
+}
+
+// Next implements Iterator.
+func (itr *iterator) Next() {
+	var err error
+	itr.cur, err = itr.dic.Recv()
+	if err != nil {
+		panic(fmt.Sprintf("remoteDB.Iterator.Next error: %v", err))
+	}
+}
+
+// Key implements Iterator.
+func (itr *iterator) Key() []byte {
+	if itr.cur == nil {
+		return nil
+	}
+	return itr.cur.Key
+}
+
+// Value implements Iterator.
+func (itr *iterator) Value() []byte {
+	if itr.cur == nil {
+		panic("current position is not valid")
+	}
+	return itr.cur.Value
+}
+
+// Error implements Iterator.
+func (itr *iterator) Error() error {
+	return nil
+}
+
+// Close implements Iterator.
+func (itr *iterator) Close() { + err := itr.dic.CloseSend() + if err != nil { + panic(fmt.Sprintf("Error closing iterator: %v", err)) + } +} diff --git a/remotedb/remotedb.go b/remotedb/remotedb.go index bef708673..9da3dd08b 100644 --- a/remotedb/remotedb.go +++ b/remotedb/remotedb.go @@ -128,155 +128,3 @@ func (rd *RemoteDB) Iterator(start, end []byte) (db.Iterator, error) { } return makeIterator(dic), nil } - -func makeIterator(dic protodb.DB_IteratorClient) db.Iterator { - return &iterator{dic: dic} -} - -func makeReverseIterator(dric protodb.DB_ReverseIteratorClient) db.Iterator { - return &reverseIterator{dric: dric} -} - -type reverseIterator struct { - dric protodb.DB_ReverseIteratorClient - cur *protodb.Iterator -} - -var _ db.Iterator = (*iterator)(nil) - -func (rItr *reverseIterator) Valid() bool { - return rItr.cur != nil && rItr.cur.Valid -} - -func (rItr *reverseIterator) Domain() (start, end []byte) { - if rItr.cur == nil || rItr.cur.Domain == nil { - return nil, nil - } - return rItr.cur.Domain.Start, rItr.cur.Domain.End -} - -// Next advances the current reverseIterator -func (rItr *reverseIterator) Next() { - var err error - rItr.cur, err = rItr.dric.Recv() - if err != nil { - panic(fmt.Sprintf("RemoteDB.ReverseIterator.Next error: %v", err)) - } -} - -func (rItr *reverseIterator) Key() []byte { - if rItr.cur == nil { - panic("key does not exist") - } - return rItr.cur.Key -} - -func (rItr *reverseIterator) Value() []byte { - if rItr.cur == nil { - panic("key does not exist") - } - return rItr.cur.Value -} - -func (rItr *reverseIterator) Error() error { - return nil -} - -func (rItr *reverseIterator) Close() {} - -// iterator implements the db.Iterator by retrieving -// streamed iterators from the remote backend as -// needed. It is NOT safe for concurrent usage, -// matching the behavior of other iterators. 
-type iterator struct { - dic protodb.DB_IteratorClient - cur *protodb.Iterator -} - -var _ db.Iterator = (*iterator)(nil) - -func (itr *iterator) Valid() bool { - return itr.cur != nil && itr.cur.Valid -} - -func (itr *iterator) Domain() (start, end []byte) { - if itr.cur == nil || itr.cur.Domain == nil { - return nil, nil - } - return itr.cur.Domain.Start, itr.cur.Domain.End -} - -// Next advances the current iterator -func (itr *iterator) Next() { - var err error - itr.cur, err = itr.dic.Recv() - if err != nil { - panic(fmt.Sprintf("remoteDB.Iterator.Next error: %v", err)) - } -} - -func (itr *iterator) Key() []byte { - if itr.cur == nil { - return nil - } - return itr.cur.Key -} - -func (itr *iterator) Value() []byte { - if itr.cur == nil { - panic("current poisition is not valid") - } - return itr.cur.Value -} - -func (itr *iterator) Error() error { - return nil -} - -func (itr *iterator) Close() { - err := itr.dic.CloseSend() - if err != nil { - panic(fmt.Sprintf("Error closing iterator: %v", err)) - } -} - -type batch struct { - db *RemoteDB - ops []*protodb.Operation -} - -var _ db.Batch = (*batch)(nil) - -func (bat *batch) Set(key, value []byte) { - op := &protodb.Operation{ - Entity: &protodb.Entity{Key: key, Value: value}, - Type: protodb.Operation_SET, - } - bat.ops = append(bat.ops, op) -} - -func (bat *batch) Delete(key []byte) { - op := &protodb.Operation{ - Entity: &protodb.Entity{Key: key}, - Type: protodb.Operation_DELETE, - } - bat.ops = append(bat.ops, op) -} - -func (bat *batch) Write() error { - if _, err := bat.db.dc.BatchWrite(bat.db.ctx, &protodb.Batch{Ops: bat.ops}); err != nil { - return errors.Errorf("remoteDB.BatchWrite: %v", err) - } - return nil -} - -func (bat *batch) WriteSync() error { - if _, err := bat.db.dc.BatchWriteSync(bat.db.ctx, &protodb.Batch{Ops: bat.ops}); err != nil { - return errors.Errorf("RemoteDB.BatchWriteSync: %v", err) - } - return nil -} - -func (bat *batch) Close() { - bat.ops = nil -} diff --git a/rocks_db.go 
b/rocksdb.go similarity index 50% rename from rocks_db.go rename to rocksdb.go index 409ff5af3..b4bcf1def 100644 --- a/rocks_db.go +++ b/rocksdb.go @@ -3,7 +3,6 @@ package db import ( - "bytes" "fmt" "path/filepath" "runtime" @@ -18,8 +17,7 @@ func init() { registerDBCreator(RocksDBBackend, dbCreator, false) } -var _ DB = (*RocksDB)(nil) - +// RocksDB is a RocksDB backend. type RocksDB struct { db *gorocksdb.DB ro *gorocksdb.ReadOptions @@ -27,6 +25,8 @@ type RocksDB struct { woSync *gorocksdb.WriteOptions } +var _ DB = (*RocksDB)(nil) + func NewRocksDB(name string, dir string) (*RocksDB, error) { // default rocksdb option, good enough for most cases, including heavy workloads. // 1GB table cache, 512MB write buffer(may use 50% more on heavy workloads). @@ -63,7 +63,7 @@ func NewRocksDBWithOptions(name string, dir string, opts *gorocksdb.Options) (*R return database, nil } -// Implements DB. +// Get implements DB. func (db *RocksDB) Get(key []byte) ([]byte, error) { key = nonNilBytes(key) res, err := db.db.Get(db.ro, key) @@ -73,7 +73,7 @@ func (db *RocksDB) Get(key []byte) ([]byte, error) { return moveSliceToBytes(res), nil } -// Implements DB. +// Has implements DB. func (db *RocksDB) Has(key []byte) (bool, error) { bytes, err := db.Get(key) if err != nil { @@ -82,7 +82,7 @@ func (db *RocksDB) Has(key []byte) (bool, error) { return bytes != nil, nil } -// Implements DB. +// Set implements DB. func (db *RocksDB) Set(key []byte, value []byte) error { key = nonNilBytes(key) value = nonNilBytes(value) @@ -93,7 +93,7 @@ func (db *RocksDB) Set(key []byte, value []byte) error { return nil } -// Implements DB. +// SetSync implements DB. func (db *RocksDB) SetSync(key []byte, value []byte) error { key = nonNilBytes(key) value = nonNilBytes(value) @@ -104,7 +104,7 @@ func (db *RocksDB) SetSync(key []byte, value []byte) error { return nil } -// Implements DB. +// Delete implements DB. 
func (db *RocksDB) Delete(key []byte) error { key = nonNilBytes(key) err := db.db.Delete(db.wo, key) @@ -114,7 +114,7 @@ func (db *RocksDB) Delete(key []byte) error { return nil } -// Implements DB. +// DeleteSync implements DB. func (db *RocksDB) DeleteSync(key []byte) error { key = nonNilBytes(key) err := db.db.Delete(db.woSync, key) @@ -128,7 +128,7 @@ func (db *RocksDB) DB() *gorocksdb.DB { return db.db } -// Implements DB. +// Close implements DB. func (db *RocksDB) Close() error { db.ro.Destroy() db.wo.Destroy() @@ -137,7 +137,7 @@ func (db *RocksDB) Close() error { return nil } -// Implements DB. +// Print implements DB. func (db *RocksDB) Print() error { itr, err := db.Iterator(nil, nil) if err != nil { @@ -152,7 +152,7 @@ func (db *RocksDB) Print() error { return nil } -// Implements DB. +// Stats implements DB. func (db *RocksDB) Stats() map[string]string { keys := []string{"rocksdb.stats"} stats := make(map[string]string, len(keys)) @@ -162,199 +162,20 @@ func (db *RocksDB) Stats() map[string]string { return stats } -//---------------------------------------- -// Batch - -// Implements DB. +// NewBatch implements DB. func (db *RocksDB) NewBatch() Batch { batch := gorocksdb.NewWriteBatch() return &rocksDBBatch{db, batch} } -type rocksDBBatch struct { - db *RocksDB - batch *gorocksdb.WriteBatch -} - -// Implements Batch. -func (mBatch *rocksDBBatch) Set(key, value []byte) { - mBatch.batch.Put(key, value) -} - -// Implements Batch. -func (mBatch *rocksDBBatch) Delete(key []byte) { - mBatch.batch.Delete(key) -} - -// Implements Batch. -func (mBatch *rocksDBBatch) Write() error { - err := mBatch.db.db.Write(mBatch.db.wo, mBatch.batch) - if err != nil { - return err - } - return nil -} - -// Implements Batch. -func (mBatch *rocksDBBatch) WriteSync() error { - err := mBatch.db.db.Write(mBatch.db.woSync, mBatch.batch) - if err != nil { - return err - } - return nil -} - -// Implements Batch. 
-func (mBatch *rocksDBBatch) Close() { - mBatch.batch.Destroy() -} - -//---------------------------------------- -// Iterator -// NOTE This is almost identical to db/go_level_db.Iterator -// Before creating a third version, refactor. - +// Iterator implements DB. func (db *RocksDB) Iterator(start, end []byte) (Iterator, error) { itr := db.db.NewIterator(db.ro) return newRocksDBIterator(itr, start, end, false), nil } +// ReverseIterator implements DB. func (db *RocksDB) ReverseIterator(start, end []byte) (Iterator, error) { itr := db.db.NewIterator(db.ro) return newRocksDBIterator(itr, start, end, true), nil } - -var _ Iterator = (*rocksDBIterator)(nil) - -type rocksDBIterator struct { - source *gorocksdb.Iterator - start, end []byte - isReverse bool - isInvalid bool -} - -func newRocksDBIterator(source *gorocksdb.Iterator, start, end []byte, isReverse bool) *rocksDBIterator { - if isReverse { - if end == nil { - source.SeekToLast() - } else { - source.Seek(end) - if source.Valid() { - eoakey := moveSliceToBytes(source.Key()) // end or after key - if bytes.Compare(end, eoakey) <= 0 { - source.Prev() - } - } else { - source.SeekToLast() - } - } - } else { - if start == nil { - source.SeekToFirst() - } else { - source.Seek(start) - } - } - return &rocksDBIterator{ - source: source, - start: start, - end: end, - isReverse: isReverse, - isInvalid: false, - } -} - -func (itr rocksDBIterator) Domain() ([]byte, []byte) { - return itr.start, itr.end -} - -func (itr rocksDBIterator) Valid() bool { - - // Once invalid, forever invalid. - if itr.isInvalid { - return false - } - - // Panic on DB error. No way to recover. - itr.assertNoError() - - // If source is invalid, invalid. - if !itr.source.Valid() { - itr.isInvalid = true - return false - } - - // If key is end or past it, invalid. 
- var start = itr.start - var end = itr.end - var key = moveSliceToBytes(itr.source.Key()) - if itr.isReverse { - if start != nil && bytes.Compare(key, start) < 0 { - itr.isInvalid = true - return false - } - } else { - if end != nil && bytes.Compare(end, key) <= 0 { - itr.isInvalid = true - return false - } - } - - // It's valid. - return true -} - -func (itr rocksDBIterator) Key() []byte { - itr.assertNoError() - itr.assertIsValid() - return moveSliceToBytes(itr.source.Key()) -} - -func (itr rocksDBIterator) Value() []byte { - itr.assertNoError() - itr.assertIsValid() - return moveSliceToBytes(itr.source.Value()) -} - -func (itr rocksDBIterator) Next() { - itr.assertNoError() - itr.assertIsValid() - if itr.isReverse { - itr.source.Prev() - } else { - itr.source.Next() - } -} - -func (itr rocksDBIterator) Error() error { - return itr.source.Err() -} - -func (itr rocksDBIterator) Close() { - itr.source.Close() -} - -func (itr rocksDBIterator) assertNoError() { - if err := itr.source.Err(); err != nil { - panic(err) - } -} - -func (itr rocksDBIterator) assertIsValid() { - if !itr.Valid() { - panic("rocksDBIterator is invalid") - } -} - -// moveSliceToBytes will free the slice and copy out a go []byte -// This function can be applied on *Slice returned from Key() and Value() -// of an Iterator, because they are marked as freed. -func moveSliceToBytes(s *gorocksdb.Slice) []byte { - defer s.Free() - if !s.Exists() { - return nil - } - v := make([]byte, len(s.Data())) - copy(v, s.Data()) - return v -} diff --git a/rocksdb_batch.go b/rocksdb_batch.go new file mode 100644 index 000000000..085ec51ce --- /dev/null +++ b/rocksdb_batch.go @@ -0,0 +1,45 @@ +// +build rocksdb + +package db + +import "github.com/tecbot/gorocksdb" + +type rocksDBBatch struct { + db *RocksDB + batch *gorocksdb.WriteBatch +} + +var _ Batch = (*rocksDBBatch)(nil) + +// Set implements Batch. 
+func (mBatch *rocksDBBatch) Set(key, value []byte) { + mBatch.batch.Put(key, value) +} + +// Delete implements Batch. +func (mBatch *rocksDBBatch) Delete(key []byte) { + mBatch.batch.Delete(key) +} + +// Write implements Batch. +func (mBatch *rocksDBBatch) Write() error { + err := mBatch.db.db.Write(mBatch.db.wo, mBatch.batch) + if err != nil { + return err + } + return nil +} + +// WriteSync implements Batch. +func (mBatch *rocksDBBatch) WriteSync() error { + err := mBatch.db.db.Write(mBatch.db.woSync, mBatch.batch) + if err != nil { + return err + } + return nil +} + +// Close implements Batch. +func (mBatch *rocksDBBatch) Close() { + mBatch.batch.Destroy() +} diff --git a/rocksdb_iterator.go b/rocksdb_iterator.go new file mode 100644 index 000000000..301e49d15 --- /dev/null +++ b/rocksdb_iterator.go @@ -0,0 +1,151 @@ +// +build rocksdb + +package db + +import ( + "bytes" + + "github.com/tecbot/gorocksdb" +) + +type rocksDBIterator struct { + source *gorocksdb.Iterator + start, end []byte + isReverse bool + isInvalid bool +} + +var _ Iterator = (*rocksDBIterator)(nil) + +func newRocksDBIterator(source *gorocksdb.Iterator, start, end []byte, isReverse bool) *rocksDBIterator { + if isReverse { + if end == nil { + source.SeekToLast() + } else { + source.Seek(end) + if source.Valid() { + eoakey := moveSliceToBytes(source.Key()) // end or after key + if bytes.Compare(end, eoakey) <= 0 { + source.Prev() + } + } else { + source.SeekToLast() + } + } + } else { + if start == nil { + source.SeekToFirst() + } else { + source.Seek(start) + } + } + return &rocksDBIterator{ + source: source, + start: start, + end: end, + isReverse: isReverse, + isInvalid: false, + } +} + +// Domain implements Iterator. +func (itr rocksDBIterator) Domain() ([]byte, []byte) { + return itr.start, itr.end +} + +// Valid implements Iterator. +func (itr rocksDBIterator) Valid() bool { + + // Once invalid, forever invalid. + if itr.isInvalid { + return false + } + + // Panic on DB error. 
No way to recover. + itr.assertNoError() + + // If source is invalid, invalid. + if !itr.source.Valid() { + itr.isInvalid = true + return false + } + + // If key is end or past it, invalid. + var start = itr.start + var end = itr.end + var key = moveSliceToBytes(itr.source.Key()) + if itr.isReverse { + if start != nil && bytes.Compare(key, start) < 0 { + itr.isInvalid = true + return false + } + } else { + if end != nil && bytes.Compare(end, key) <= 0 { + itr.isInvalid = true + return false + } + } + + // It's valid. + return true +} + +// Key implements Iterator. +func (itr rocksDBIterator) Key() []byte { + itr.assertNoError() + itr.assertIsValid() + return moveSliceToBytes(itr.source.Key()) +} + +// Value implements Iterator. +func (itr rocksDBIterator) Value() []byte { + itr.assertNoError() + itr.assertIsValid() + return moveSliceToBytes(itr.source.Value()) +} + +// Next implements Iterator. +func (itr rocksDBIterator) Next() { + itr.assertNoError() + itr.assertIsValid() + if itr.isReverse { + itr.source.Prev() + } else { + itr.source.Next() + } +} + +// Error implements Iterator. +func (itr rocksDBIterator) Error() error { + return itr.source.Err() +} + +// Close implements Iterator. +func (itr rocksDBIterator) Close() { + itr.source.Close() +} + +func (itr rocksDBIterator) assertNoError() { + if err := itr.source.Err(); err != nil { + panic(err) + } +} + +func (itr rocksDBIterator) assertIsValid() { + if !itr.Valid() { + panic("rocksDBIterator is invalid") + } +} + +// moveSliceToBytes will free the slice and copy out a go []byte +// This function can be applied on *Slice returned from Key() and Value() +// of an Iterator, because they are marked as freed. +func moveSliceToBytes(s *gorocksdb.Slice) []byte { + defer s.Free() + if !s.Exists() { + return nil + } + v := make([]byte, len(s.Data())) + copy(v, s.Data()) + return v +} diff --git a/rocks_db_test.go b/rocksdb_test.go similarity index 100% rename from rocks_db_test.go rename to rocksdb_test.go