-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add transactional support to leveldb datastore. #17
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
1.1.2: QmPnXsHj9W8WpDDwj2iogRcnVL6d5ANtK9SAJLgKpeBMq8 | ||
1.2.0: QmcxDvw8NnJsfdEcfrypwHkLeVxZY2rT8iiWsUuBnw93gb |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,16 +9,21 @@ import ( | |
"github.com/jbenet/goprocess" | ||
"github.com/syndtr/goleveldb/leveldb" | ||
"github.com/syndtr/goleveldb/leveldb/errors" | ||
"github.com/syndtr/goleveldb/leveldb/iterator" | ||
"github.com/syndtr/goleveldb/leveldb/opt" | ||
"github.com/syndtr/goleveldb/leveldb/storage" | ||
"github.com/syndtr/goleveldb/leveldb/util" | ||
) | ||
|
||
type datastore struct { | ||
*accessor | ||
DB *leveldb.DB | ||
path string | ||
} | ||
|
||
var _ ds.Datastore = (*datastore)(nil) | ||
var _ ds.TxnDatastore = (*datastore)(nil) | ||
|
||
// Options is an alias of syndtr/goleveldb/opt.Options which might be extended | ||
// in the future. | ||
type Options opt.Options | ||
|
@@ -49,21 +54,34 @@ func NewDatastore(path string, opts *Options) (*datastore, error) { | |
} | ||
|
||
return &datastore{ | ||
DB: db, | ||
path: path, | ||
accessor: &accessor{ldb: db}, | ||
DB: db, | ||
path: path, | ||
}, nil | ||
} | ||
|
||
// Returns ErrInvalidType if value is not of type []byte. | ||
// An extraction of the common interface between LevelDB Transactions and the DB itself. | ||
// | ||
// Note: using sync = false. | ||
// see http://godoc.org/github.com/syndtr/goleveldb/leveldb/opt#WriteOptions | ||
func (d *datastore) Put(key ds.Key, value []byte) (err error) { | ||
return d.DB.Put(key.Bytes(), value, nil) | ||
// It allows to plug in either inside the `accessor`. | ||
type levelDbOps interface { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 🎉 |
||
Put(key, value []byte, wo *opt.WriteOptions) error | ||
Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) | ||
Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) | ||
Delete(key []byte, wo *opt.WriteOptions) error | ||
NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator | ||
} | ||
|
||
// Datastore operations using either the DB or a transaction as the backend. | ||
type accessor struct { | ||
ldb levelDbOps | ||
} | ||
|
||
func (a *accessor) Put(key ds.Key, value []byte) (err error) { | ||
return a.ldb.Put(key.Bytes(), value, nil) | ||
} | ||
|
||
func (d *datastore) Get(key ds.Key) (value []byte, err error) { | ||
val, err := d.DB.Get(key.Bytes(), nil) | ||
func (a *accessor) Get(key ds.Key) (value []byte, err error) { | ||
val, err := a.ldb.Get(key.Bytes(), nil) | ||
if err != nil { | ||
if err == leveldb.ErrNotFound { | ||
return nil, ds.ErrNotFound | ||
|
@@ -73,40 +91,40 @@ func (d *datastore) Get(key ds.Key) (value []byte, err error) { | |
return val, nil | ||
} | ||
|
||
func (d *datastore) Has(key ds.Key) (exists bool, err error) { | ||
return d.DB.Has(key.Bytes(), nil) | ||
func (a *accessor) Has(key ds.Key) (exists bool, err error) { | ||
return a.ldb.Has(key.Bytes(), nil) | ||
} | ||
|
||
func (d *datastore) Delete(key ds.Key) (err error) { | ||
func (a *accessor) Delete(key ds.Key) (err error) { | ||
// leveldb Delete will not return an error if the key doesn't | ||
// exist (see https://github.com/syndtr/goleveldb/issues/109), | ||
// so check that the key exists first and if not return an | ||
// error | ||
exists, err := d.DB.Has(key.Bytes(), nil) | ||
exists, err := a.ldb.Has(key.Bytes(), nil) | ||
if !exists { | ||
return ds.ErrNotFound | ||
} else if err != nil { | ||
return err | ||
} | ||
return d.DB.Delete(key.Bytes(), nil) | ||
return a.ldb.Delete(key.Bytes(), nil) | ||
} | ||
|
||
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { | ||
return d.QueryNew(q) | ||
func (a *accessor) Query(q dsq.Query) (dsq.Results, error) { | ||
return a.queryNew(q) | ||
} | ||
|
||
func (d *datastore) QueryNew(q dsq.Query) (dsq.Results, error) { | ||
func (a *accessor) queryNew(q dsq.Query) (dsq.Results, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. generally approve re-scoping this, but just make sure it's not breaking anything unexpectedly |
||
if len(q.Filters) > 0 || | ||
len(q.Orders) > 0 || | ||
q.Limit > 0 || | ||
q.Offset > 0 { | ||
return d.QueryOrig(q) | ||
return a.queryOrig(q) | ||
} | ||
var rnge *util.Range | ||
if q.Prefix != "" { | ||
rnge = util.BytesPrefix([]byte(q.Prefix)) | ||
} | ||
i := d.DB.NewIterator(rnge, nil) | ||
i := a.ldb.NewIterator(rnge, nil) | ||
return dsq.ResultsFromIterator(q, dsq.Iterator{ | ||
Next: func() (dsq.Result, bool) { | ||
ok := i.Next() | ||
|
@@ -130,7 +148,7 @@ func (d *datastore) QueryNew(q dsq.Query) (dsq.Results, error) { | |
}), nil | ||
} | ||
|
||
func (d *datastore) QueryOrig(q dsq.Query) (dsq.Results, error) { | ||
func (a *accessor) queryOrig(q dsq.Query) (dsq.Results, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The exported There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah, there we go! |
||
// we can use multiple iterators concurrently. see: | ||
// https://godoc.org/github.com/syndtr/goleveldb/leveldb#DB.NewIterator | ||
// advance the iterator only if the reader reads | ||
|
@@ -140,7 +158,7 @@ func (d *datastore) QueryOrig(q dsq.Query) (dsq.Results, error) { | |
// that resources should be reclaimed. | ||
qrb := dsq.NewResultBuilder(q) | ||
qrb.Process.Go(func(worker goprocess.Process) { | ||
d.runQuery(worker, qrb) | ||
a.runQuery(worker, qrb) | ||
}) | ||
|
||
// go wait on the worker (without signaling close) | ||
|
@@ -157,13 +175,12 @@ func (d *datastore) QueryOrig(q dsq.Query) (dsq.Results, error) { | |
return qr, nil | ||
} | ||
|
||
func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) { | ||
|
||
func (a *accessor) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) { | ||
var rnge *util.Range | ||
if qrb.Query.Prefix != "" { | ||
rnge = util.BytesPrefix([]byte(qrb.Query.Prefix)) | ||
} | ||
i := d.DB.NewIterator(rnge, nil) | ||
i := a.ldb.NewIterator(rnge, nil) | ||
defer i.Release() | ||
|
||
// advance iterator for offset | ||
|
@@ -261,3 +278,26 @@ func (b *leveldbBatch) Delete(key ds.Key) error { | |
b.b.Delete(key.Bytes()) | ||
return nil | ||
} | ||
|
||
// A leveldb transaction embedding the accessor backed by the transaction. | ||
type transaction struct { | ||
*accessor | ||
tx *leveldb.Transaction | ||
} | ||
|
||
func (t *transaction) Commit() error { | ||
return t.tx.Commit() | ||
} | ||
|
||
func (t *transaction) Discard() { | ||
t.tx.Discard() | ||
} | ||
|
||
func (d *datastore) NewTransaction(readOnly bool) (ds.Txn, error) { | ||
tx, err := d.DB.OpenTransaction() | ||
if err != nil { | ||
return nil, err | ||
} | ||
accessor := &accessor{tx} | ||
return &transaction{accessor, tx}, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This
accessor
is created with the LevelDB DB itself. Theaccessor
onNewTransaction()
is created with the transaction. I'm not married to the naming.