Skip to content
This repository was archived by the owner on Jun 26, 2023. It is now read-only.

Datastore based pinner #4

Merged
merged 29 commits into from
Nov 30, 2020
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9fc54f6
start restructuring for move to datastore pinner
aschmahmann Sep 30, 2020
be03825
Store pins in datastore instead of in mdag
gammazero Oct 26, 2020
77c9a1d
Import and Export functions. Cid stored as bytes. Revise indexer inte…
gammazero Oct 27, 2020
5398bb2
add name index
gammazero Oct 27, 2020
922fab7
add benchmarks
gammazero Oct 27, 2020
fdf37b1
Use dirty flag to determine when to rebuild indexes
gammazero Oct 28, 2020
a2720f1
Fix benchmarks
gammazero Oct 28, 2020
37293b6
Do not keep pinned CID sets in memory (no-cache implementation)
gammazero Oct 29, 2020
0244724
Add comments and unit test
gammazero Oct 29, 2020
22a61da
Speed up pinning by avoiding 2nd recursive check if no changes
gammazero Oct 29, 2020
1662bb8
correct log level
gammazero Oct 29, 2020
cad8378
improve import/export unit test
gammazero Oct 29, 2020
d1a44d7
Update returns error if from CID is not pinned, even when from and to…
gammazero Nov 16, 2020
9bb7a0c
test update of same pin
gammazero Nov 16, 2020
eb32271
Cleanup and better test coverage
gammazero Nov 17, 2020
22254b2
Change requested in review
gammazero Nov 19, 2020
d787682
Additional changes from review
gammazero Nov 19, 2020
0435ac4
Removed New in favor of only having LoadPinner
gammazero Nov 19, 2020
dde41e9
Indexer encodes index and key to allow arbitrary strings
gammazero Nov 19, 2020
0a5737f
Use int64 for dirty count and remove unused const
gammazero Nov 19, 2020
fb419e7
use base64 encoding
gammazero Nov 20, 2020
00764f0
Encode using multibase
gammazero Nov 20, 2020
a6d812c
Changes from review
gammazero Nov 24, 2020
86f36c2
Rename LoadPinner to New for both pinners
gammazero Nov 24, 2020
7a128c6
Check context when loading pinner and during iterative operations
gammazero Nov 24, 2020
cd2065d
indexer.New takes ds.Key
gammazero Nov 24, 2020
9c335cd
Change pin encoding. Add unit test
gammazero Nov 24, 2020
9fafc51
switch to atlas pin encoding
aschmahmann Nov 30, 2020
2586c60
removed type annotations from pin struct
aschmahmann Nov 30, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
*~
*.log

# Test binary, build with `go test -c`
*.test

# Output of the go coverage tool
*.out
8 changes: 8 additions & 0 deletions dsindex/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package dsindex

import "errors"

// ErrEmptyKey is the error reported when an index key is the empty string.
var ErrEmptyKey = errors.New("key is empty")

// ErrEmptyValue is the error reported when an index value is the empty string.
var ErrEmptyValue = errors.New("value is empty")
282 changes: 282 additions & 0 deletions dsindex/indexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
// Package dsindex provides secondary indexing functionality for a datastore.
package dsindex

import (
"context"
"fmt"
"path"

ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
"github.com/multiformats/go-multibase"
)

// Indexer maintains a secondary index. An index is a collection of key-value
// mappings where the key is the secondary index that maps to one or more
// values, where each value is a unique key being indexed.
//
// The implementation in this package rejects an empty key with ErrEmptyKey
// and an empty value with ErrEmptyValue wherever a specific key or value is
// required (Add, Delete, DeleteKey, HasValue, Search).
type Indexer interface {
// Add adds the specified value to the key.
Add(ctx context.Context, key, value string) error

// Delete deletes the specified value from the key. If the value is not in
// the datastore, this method returns no error.
Delete(ctx context.Context, key, value string) error

// DeleteKey deletes all values in the given key. If a key is not in the
// datastore, this method returns no error. Returns a count of values that
// were deleted.
DeleteKey(ctx context.Context, key string) (count int, err error)

// DeleteAll deletes all keys managed by this Indexer. Returns a count of
// the values that were deleted.
DeleteAll(ctx context.Context) (count int, err error)

// ForEach calls the function for each value in the specified index, until
// there are no more values, or until the function returns false. If index
// is the empty string, then all keys are iterated. The function receives
// the key an entry is stored under followed by the entry's value.
ForEach(ctx context.Context, index string, fn func(key, value string) bool) error

// HasValue determines if the key contains the specified value.
HasValue(ctx context.Context, key, value string) (bool, error)

// HasAny determines if any value is in the specified key. If key is the
// empty string, then all values are searched.
HasAny(ctx context.Context, key string) (bool, error)

// Search returns all values for the given key.
Search(ctx context.Context, key string) (values []string, err error)
}

// indexer is a simple implementation of Indexer. This implementation relies
// on the underlying data store to support efficient querying by prefix.
//
// TODO: Consider adding caching
type indexer struct {
// dstore holds every index entry. New sets it to the caller's datastore
// wrapped in a namespace rooted at the index name.
dstore ds.Datastore
}

// New builds an Indexer on top of dstore. Every entry written by the returned
// Indexer is stored in a namespace rooted at the datastore key made from name.
//
// Indexer operations are not persisted until the caller invokes dstore.Sync.
func New(dstore ds.Datastore, name string) Indexer {
	root := ds.NewKey(name)
	wrapped := namespace.Wrap(dstore, root)
	return &indexer{dstore: wrapped}
}

// Add records value under key by writing an empty entry whose datastore key
// is the encoded key with the encoded value as its child. It returns
// ErrEmptyKey or ErrEmptyValue when the respective argument is empty.
func (x *indexer) Add(ctx context.Context, key, value string) error {
	switch {
	case key == "":
		return ErrEmptyKey
	case value == "":
		return ErrEmptyValue
	}

	entryKey := ds.NewKey(encode(key)).ChildString(encode(value))
	return x.dstore.Put(entryKey, []byte{})
}

// Delete removes the entry for value under key from the datastore. It returns
// ErrEmptyKey or ErrEmptyValue when the respective argument is empty;
// otherwise it returns whatever the datastore's Delete returns.
func (x *indexer) Delete(ctx context.Context, key, value string) error {
	switch {
	case key == "":
		return ErrEmptyKey
	case value == "":
		return ErrEmptyValue
	}

	entryKey := ds.NewKey(encode(key)).ChildString(encode(value))
	return x.dstore.Delete(entryKey)
}

// DeleteKey removes every entry stored under key and reports how many entries
// were removed. It returns ErrEmptyKey when key is empty.
func (x *indexer) DeleteKey(ctx context.Context, key string) (int, error) {
	if len(key) == 0 {
		return 0, ErrEmptyKey
	}
	prefix := encode(key)
	return x.deletePrefix(ctx, prefix)
}

// DeleteAll removes every entry in this index by deleting with an empty
// prefix, and reports how many entries were removed.
func (x *indexer) DeleteAll(ctx context.Context) (int, error) {
	const everything = "" // an empty prefix selects all entries
	return x.deletePrefix(ctx, everything)
}

// ForEach calls fn with the decoded key and value of each entry stored under
// key, stopping when the entries are exhausted or when fn returns false. If
// key is the empty string, every entry in the index is visited.
//
// It returns an error if the query cannot be started, if the query yields an
// error result, if an entry's key or value cannot be decoded, or if ctx is
// done before iteration completes. The query results are always closed
// before returning.
func (x *indexer) ForEach(ctx context.Context, key string, fn func(key, value string) bool) error {
	if key != "" {
		key = encode(key)
	}

	results, err := x.dstore.Query(query.Query{
		Prefix:   key,
		KeysOnly: true,
	})
	if err != nil {
		return err
	}
	defer results.Close()

	for {
		r, ok := results.NextSync()
		if !ok {
			return nil
		}
		if r.Error != nil {
			return r.Error
		}
		// Stop promptly if the caller has given up on the operation.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		// Entries are stored as <encoded key>/<encoded value>, so the last
		// path element is the value and its parent is the index key.
		//
		// Errors are returned directly rather than assigned and returned
		// after the loop: a := declaration in this scope would shadow the
		// function-level err and silently drop decode failures.
		decIdx, err := decode(path.Base(path.Dir(r.Entry.Key)))
		if err != nil {
			return fmt.Errorf("cannot decode index: %w", err)
		}
		decKey, err := decode(path.Base(r.Entry.Key))
		if err != nil {
			return fmt.Errorf("cannot decode key: %w", err)
		}
		if !fn(decIdx, decKey) {
			return nil
		}
	}
}

// HasValue reports whether an entry for value exists under key. It returns
// ErrEmptyKey or ErrEmptyValue when the respective argument is empty.
func (x *indexer) HasValue(ctx context.Context, key, value string) (bool, error) {
	if len(key) == 0 {
		return false, ErrEmptyKey
	}
	if len(value) == 0 {
		return false, ErrEmptyValue
	}

	entryKey := ds.NewKey(encode(key)).ChildString(encode(value))
	return x.dstore.Has(entryKey)
}

// HasAny reports whether at least one entry exists under key. With an empty
// key, the whole index is considered. Iteration stops at the first entry.
func (x *indexer) HasAny(ctx context.Context, key string) (bool, error) {
	found := false
	stopAtFirst := func(string, string) bool {
		found = true
		return false
	}
	err := x.ForEach(ctx, key, stopAtFirst)
	return found, err
}

// Search returns the decoded values of all entries stored under key. It
// returns ErrEmptyKey when key is empty, and a nil slice when the key has no
// entries. If any value fails to decode, no values are returned.
func (x *indexer) Search(ctx context.Context, key string) ([]string, error) {
	if key == "" {
		return nil, ErrEmptyKey
	}

	entries, err := x.queryPrefix(ctx, encode(key))
	switch {
	case err != nil:
		return nil, err
	case len(entries) == 0:
		return nil, nil
	}

	found := make([]string, 0, len(entries))
	for _, entry := range entries {
		// The value is the final path element of the entry's datastore key.
		value, decErr := decode(path.Base(entry.Key))
		if decErr != nil {
			return nil, fmt.Errorf("cannot decode value: %v", decErr)
		}
		found = append(found, value)
	}
	return found, nil
}

// SyncIndex synchronizes the keys in the target Indexer to match those of the
// ref Indexer. The name portion of the stored data is not synchronized, only
// the key/value portion of the indexes.
//
// Entries are compared as (key, value) pairs, so a value that appears under
// several keys is handled correctly: every target pair absent from ref is
// deleted, and every ref pair absent from target is added.
//
// It returns true if target was modified. If ref contains no entries, target
// is left untouched and false is returned. The first error from iterating
// either index, or from a Delete or Add on target, aborts the sync and is
// returned; target may then be partially updated.
func SyncIndex(ctx context.Context, ref, target Indexer) (bool, error) {
	type entry struct{ key, value string }

	// Start by assuming every reference entry is missing from target.
	missing := make(map[entry]struct{})
	err := ref.ForEach(ctx, "", func(key, value string) bool {
		missing[entry{key, value}] = struct{}{}
		return true
	})
	if err != nil {
		return false, err
	}
	if len(missing) == 0 {
		// NOTE(review): an empty reference never clears target; confirm
		// that callers rely on this before changing it.
		return false, nil
	}

	// Walk target: entries also in ref are already in sync and are removed
	// from missing; anything else is stale.
	var stale []entry
	err = target.ForEach(ctx, "", func(key, value string) bool {
		e := entry{key, value}
		if _, ok := missing[e]; ok {
			delete(missing, e)
		} else {
			stale = append(stale, e)
		}
		return true
	})
	if err != nil {
		return false, err
	}

	// Remove entries that no longer exist in the reference.
	for _, e := range stale {
		if err = target.Delete(ctx, e.key, e.value); err != nil {
			return false, err
		}
	}

	// What remains in missing are entries that need to be added.
	for e := range missing {
		if err = target.Add(ctx, e.key, e.value); err != nil {
			return false, err
		}
	}

	return len(missing) != 0 || len(stale) != 0, nil
}

// deletePrefix deletes every entry whose datastore key matches prefix and
// returns the number of entries deleted. If the query or any single delete
// fails, it stops and returns a count of 0 with that error; entries deleted
// before the failure are not restored.
func (x *indexer) deletePrefix(ctx context.Context, prefix string) (int, error) {
	matches, err := x.queryPrefix(ctx, prefix)
	if err != nil {
		return 0, err
	}

	for _, match := range matches {
		if delErr := x.dstore.Delete(ds.NewKey(match.Key)); delErr != nil {
			return 0, delErr
		}
	}
	return len(matches), nil
}

// queryPrefix runs a keys-only query for prefix against the datastore and
// returns all resulting entries at once.
func (x *indexer) queryPrefix(ctx context.Context, prefix string) ([]query.Entry, error) {
	results, err := x.dstore.Query(query.Query{Prefix: prefix, KeysOnly: true})
	if err != nil {
		return nil, err
	}
	return results.Rest()
}

// encode returns data as a multibase Base64url string. It panics if the
// encoder rejects the encoding, which indicates a programming error.
func encode(data string) string {
	encoded, err := multibase.Encode(multibase.Base64url, []byte(data))
	if err == nil {
		return encoded
	}
	// programming error; using unsupported encoding
	panic(err.Error())
}

// decode reverses encode: it multibase-decodes data and returns the decoded
// bytes as a string. The encoding reported by the decoder is ignored.
func decode(data string) (string, error) {
	_, raw, decErr := multibase.Decode(data)
	if decErr != nil {
		return "", decErr
	}
	return string(raw), nil
}
Loading