Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split out backend files for batch and iterator #63

Merged
merged 10 commits into from
Mar 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 13 additions & 172 deletions boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package db

import (
"bytes"
"fmt"
"os"
"path/filepath"
Expand All @@ -20,8 +19,7 @@ func init() {
}, false)
}

// BoltDB is a wrapper around etcd's fork of bolt
// (https://github.com/etcd-io/bbolt).
// BoltDB is a wrapper around etcd's fork of bolt (https://github.com/etcd-io/bbolt).
//
// NOTE: All operations (including Set, Delete) are synchronous by default. One
// can globally turn it off by using NoSync config option (not recommended).
Expand All @@ -32,6 +30,8 @@ type BoltDB struct {
db *bbolt.DB
}

var _ DB = (*BoltDB)(nil)

// NewBoltDB returns a BoltDB with default options.
func NewBoltDB(name, dir string) (DB, error) {
return NewBoltDBWithOpts(name, dir, bbolt.DefaultOptions)
Expand Down Expand Up @@ -62,6 +62,7 @@ func NewBoltDBWithOpts(name string, dir string, opts *bbolt.Options) (DB, error)
return &BoltDB{db: db}, nil
}

// Get implements DB.
func (bdb *BoltDB) Get(key []byte) (value []byte, err error) {
key = nonEmptyKey(nonNilBytes(key))
err = bdb.db.View(func(tx *bbolt.Tx) error {
Expand All @@ -77,6 +78,7 @@ func (bdb *BoltDB) Get(key []byte) (value []byte, err error) {
return
}

// Has implements DB.
func (bdb *BoltDB) Has(key []byte) (bool, error) {
bytes, err := bdb.Get(key)
if err != nil {
Expand All @@ -85,6 +87,7 @@ func (bdb *BoltDB) Has(key []byte) (bool, error) {
return bytes != nil, nil
}

// Set implements DB.
func (bdb *BoltDB) Set(key, value []byte) error {
key = nonEmptyKey(nonNilBytes(key))
value = nonNilBytes(value)
Expand All @@ -98,10 +101,12 @@ func (bdb *BoltDB) Set(key, value []byte) error {
return nil
}

// SetSync implements DB.
func (bdb *BoltDB) SetSync(key, value []byte) error {
return bdb.Set(key, value)
}

// Delete implements DB.
func (bdb *BoltDB) Delete(key []byte) error {
key = nonEmptyKey(nonNilBytes(key))
err := bdb.db.Update(func(tx *bbolt.Tx) error {
Expand All @@ -113,14 +118,17 @@ func (bdb *BoltDB) Delete(key []byte) error {
return nil
}

// DeleteSync implements DB.
func (bdb *BoltDB) DeleteSync(key []byte) error {
return bdb.Delete(key)
}

// Close implements DB.
func (bdb *BoltDB) Close() error {
return bdb.db.Close()
}

// Print implements DB.
func (bdb *BoltDB) Print() error {
stats := bdb.db.Stats()
fmt.Printf("%v\n", stats)
Expand All @@ -138,6 +146,7 @@ func (bdb *BoltDB) Print() error {
return nil
}

// Stats implements DB.
func (bdb *BoltDB) Stats() map[string]string {
stats := bdb.db.Stats()
m := make(map[string]string)
Expand All @@ -155,64 +164,14 @@ func (bdb *BoltDB) Stats() map[string]string {
return m
}

// boltDBBatch stores key values in sync.Map and dumps them to the underlying
// DB upon Write call.
type boltDBBatch struct {
db *BoltDB
ops []operation
}

// NewBatch returns a new batch.
// NewBatch implements DB.
func (bdb *BoltDB) NewBatch() Batch {
return &boltDBBatch{
ops: nil,
db: bdb,
}
}

// It is safe to modify the contents of the argument after Set returns but not
// before.
func (bdb *boltDBBatch) Set(key, value []byte) {
bdb.ops = append(bdb.ops, operation{opTypeSet, key, value})
}

// It is safe to modify the contents of the argument after Delete returns but
// not before.
func (bdb *boltDBBatch) Delete(key []byte) {
bdb.ops = append(bdb.ops, operation{opTypeDelete, key, nil})
}

// NOTE: the operation is synchronous (see BoltDB for reasons)
func (bdb *boltDBBatch) Write() error {
err := bdb.db.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(bucket)
for _, op := range bdb.ops {
key := nonEmptyKey(nonNilBytes(op.key))
switch op.opType {
case opTypeSet:
if putErr := b.Put(key, op.value); putErr != nil {
return putErr
}
case opTypeDelete:
if delErr := b.Delete(key); delErr != nil {
return delErr
}
}
}
return nil
})
if err != nil {
return err
}
return nil
}

func (bdb *boltDBBatch) WriteSync() error {
return bdb.Write()
}

func (bdb *boltDBBatch) Close() {}

// WARNING: Any concurrent writes or reads will block until the iterator is
// closed.
func (bdb *BoltDB) Iterator(start, end []byte) (Iterator, error) {
Expand All @@ -233,124 +192,6 @@ func (bdb *BoltDB) ReverseIterator(start, end []byte) (Iterator, error) {
return newBoltDBIterator(tx, start, end, true), nil
}

// boltDBIterator allows you to iterate on range of keys/values given some
// start / end keys (nil & nil will result in doing full scan).
type boltDBIterator struct {
tx *bbolt.Tx

itr *bbolt.Cursor
start []byte
end []byte

currentKey []byte
currentValue []byte

isInvalid bool
isReverse bool
}

func newBoltDBIterator(tx *bbolt.Tx, start, end []byte, isReverse bool) *boltDBIterator {
itr := tx.Bucket(bucket).Cursor()

var ck, cv []byte
if isReverse {
if end == nil {
ck, cv = itr.Last()
} else {
_, _ = itr.Seek(end) // after key
ck, cv = itr.Prev() // return to end key
}
} else {
if start == nil {
ck, cv = itr.First()
} else {
ck, cv = itr.Seek(start)
}
}

return &boltDBIterator{
tx: tx,
itr: itr,
start: start,
end: end,
currentKey: ck,
currentValue: cv,
isReverse: isReverse,
isInvalid: false,
}
}

func (itr *boltDBIterator) Domain() ([]byte, []byte) {
return itr.start, itr.end
}

func (itr *boltDBIterator) Valid() bool {
if itr.isInvalid {
return false
}

// iterated to the end of the cursor
if len(itr.currentKey) == 0 {
itr.isInvalid = true
return false
}

if itr.isReverse {
if itr.start != nil && bytes.Compare(itr.currentKey, itr.start) < 0 {
itr.isInvalid = true
return false
}
} else {
if itr.end != nil && bytes.Compare(itr.end, itr.currentKey) <= 0 {
itr.isInvalid = true
return false
}
}

// Valid
return true
}

func (itr *boltDBIterator) Next() {
itr.assertIsValid()
if itr.isReverse {
itr.currentKey, itr.currentValue = itr.itr.Prev()
} else {
itr.currentKey, itr.currentValue = itr.itr.Next()
}
}

func (itr *boltDBIterator) Key() []byte {
itr.assertIsValid()
return append([]byte{}, itr.currentKey...)
}

func (itr *boltDBIterator) Value() []byte {
itr.assertIsValid()
var value []byte
if itr.currentValue != nil {
value = append([]byte{}, itr.currentValue...)
}
return value
}

func (itr *boltDBIterator) Error() error {
return nil
}

func (itr *boltDBIterator) Close() {
err := itr.tx.Rollback()
if err != nil {
panic(err)
}
}

func (itr *boltDBIterator) assertIsValid() {
if !itr.Valid() {
panic("boltdb-iterator is invalid")
}
}

// nonEmptyKey returns a []byte("nil") if key is empty.
// WARNING: this may collude with "nil" user key!
func nonEmptyKey(key []byte) []byte {
Expand Down
52 changes: 52 additions & 0 deletions boltdb_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// +build boltdb

package db

import "github.com/etcd-io/bbolt"

// boltDBBatch stores operations internally and dumps them to BoltDB on Write().
type boltDBBatch struct {
db *BoltDB
ops []operation
}

var _ Batch = (*boltDBBatch)(nil)

// Set implements Batch.
func (b *boltDBBatch) Set(key, value []byte) {
b.ops = append(b.ops, operation{opTypeSet, key, value})
}

// Delete implements Batch.
func (b *boltDBBatch) Delete(key []byte) {
b.ops = append(b.ops, operation{opTypeDelete, key, nil})
}

// Write implements Batch.
func (b *boltDBBatch) Write() error {
return b.db.db.Batch(func(tx *bbolt.Tx) error {
bkt := tx.Bucket(bucket)
for _, op := range b.ops {
key := nonEmptyKey(nonNilBytes(op.key))
switch op.opType {
case opTypeSet:
if err := bkt.Put(key, op.value); err != nil {
return err
}
case opTypeDelete:
if err := bkt.Delete(key); err != nil {
return err
}
}
}
return nil
})
}

// WriteSync implements Batch.
func (b *boltDBBatch) WriteSync() error {
return b.Write()
}

// Close implements Batch.
func (b *boltDBBatch) Close() {}
Loading