From 49fa64322306eca1cecba767b2bd796f6c4c7e54 Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Thu, 20 Jun 2024 17:42:26 +0200 Subject: [PATCH] WIP --- cli/utils.go | 1 + internal/core/block/block.go | 19 ++-- internal/core/crdt/base.go | 13 ++- internal/core/crdt/base_test.go | 4 +- internal/core/crdt/composite.go | 31 ++++++- internal/core/crdt/counter.go | 32 ++++++- internal/core/crdt/ipld_union.go | 13 +++ internal/core/crdt/lwwreg.go | 62 ++++++++++--- internal/core/crdt/lwwreg_test.go | 2 +- internal/core/delta.go | 6 ++ internal/db/collection.go | 1 + internal/db/context.go | 3 - internal/db/fetcher/versioned.go | 1 + internal/db/merge.go | 8 ++ internal/encryption/config.go | 16 ++++ internal/encryption/context.go | 18 ++++ internal/encryption/encryptor.go | 6 +- internal/merkle/clock/clock.go | 43 ++++++++- internal/merkle/clock/clock_test.go | 4 +- internal/merkle/crdt/lwwreg.go | 14 +-- internal/merkle/crdt/merklecrdt.go | 2 + internal/merkle/crdt/merklecrdt_test.go | 2 +- internal/planner/explain.go | 2 +- internal/planner/multi.go | 2 +- internal/planner/planner.go | 2 +- tests/integration/encryption/commit_test.go | 10 +-- tests/integration/encryption/peer_test.go | 96 +++++++++++++++++++++ tests/integration/utils2.go | 1 + 28 files changed, 359 insertions(+), 55 deletions(-) create mode 100644 internal/encryption/config.go diff --git a/cli/utils.go b/cli/utils.go index 95a43cc9b8..a5a2a804c1 100644 --- a/cli/utils.go +++ b/cli/utils.go @@ -171,6 +171,7 @@ func setContextDocEncryptionKey(cmd *cobra.Command, shouldEncrypt bool, txn data if txn != nil { ctx = encryption.ContextWithStore(ctx, txn) } + ctx = encryption.SetContextConfig(cmd.Context(), encryption.DocEncConfig{IsEncrypted: true}) cmd.SetContext(ctx) } diff --git a/internal/core/block/block.go b/internal/core/block/block.go index c9a3f629c2..d2642d81e6 100644 --- a/internal/core/block/block.go +++ b/internal/core/block/block.go @@ -103,6 +103,8 @@ type Block struct { Delta crdt.CRDT // Links are the links to other blocks in the DAG. Links []DAGLink + // TODO: see if it's better to keep it here instead of inside of delta + IsEncrypted *bool } // IPLDSchemaBytes returns the IPLD schema representation for the block. @@ -111,8 +113,9 @@ type Block struct { func (b Block) IPLDSchemaBytes() []byte { return []byte(` type Block struct { - delta CRDT - links [ DAGLink ] + delta CRDT + links [ DAGLink ] + isEncrypted optional Bool }`) } @@ -143,19 +146,9 @@ func New(delta core.Delta, links []DAGLink, heads ...cid.Cid) *Block { blockLinks = append(blockLinks, links...) - var crdtDelta crdt.CRDT - switch delta := delta.(type) { - case *crdt.LWWRegDelta: - crdtDelta = crdt.CRDT{LWWRegDelta: delta} - case *crdt.CompositeDAGDelta: - crdtDelta = crdt.CRDT{CompositeDAGDelta: delta} - case *crdt.CounterDelta: - crdtDelta = crdt.CRDT{CounterDelta: delta} - } - return &Block{ Links: blockLinks, - Delta: crdtDelta, + Delta: crdt.NewCRDT(delta), } } diff --git a/internal/core/crdt/base.go b/internal/core/crdt/base.go index 9958b7b1ac..3e6cfcd23a 100644 --- a/internal/core/crdt/base.go +++ b/internal/core/crdt/base.go @@ -27,7 +27,8 @@ import ( // tasks that all the CRDTs need to implement anyway type baseCRDT struct { store datastore.DSReaderWriter - key core.DataStoreKey + // encryptionKeyStore + key core.DataStoreKey // schemaVersionKey is the schema version datastore key at the time of commit. // @@ -35,19 +36,29 @@ type baseCRDT struct { schemaVersionKey core.CollectionSchemaVersionKey fieldName string + + // isEncrypted is a flag to indicate if the CRDT is encrypted + //isEncrypted bool } +// IsEncrypted returns true if the CRDT is encrypted +/*func (base baseCRDT) IsEncrypted() bool { + return base.isEncrypted +}*/ + func newBaseCRDT( store datastore.DSReaderWriter, key core.DataStoreKey, schemaVersionKey core.CollectionSchemaVersionKey, fieldName string, + isEncrypted bool, ) baseCRDT { return baseCRDT{ store: store, key: key, schemaVersionKey: schemaVersionKey, fieldName: fieldName, + //isEncrypted: isEncrypted, } } diff --git a/internal/core/crdt/base_test.go b/internal/core/crdt/base_test.go index 661c5cb7ce..50a4b0fb9a 100644 --- a/internal/core/crdt/base_test.go +++ b/internal/core/crdt/base_test.go @@ -29,11 +29,11 @@ func newSeededDS() datastore.DSReaderWriter { } func exampleBaseCRDT() baseCRDT { - return newBaseCRDT(newSeededDS(), core.DataStoreKey{}, core.CollectionSchemaVersionKey{}, "") + return newBaseCRDT(newSeededDS(), core.DataStoreKey{}, core.CollectionSchemaVersionKey{}, "", false) } func TestBaseCRDTNew(t *testing.T) { - base := newBaseCRDT(newDS(), core.DataStoreKey{}, core.CollectionSchemaVersionKey{}, "") + base := newBaseCRDT(newDS(), core.DataStoreKey{}, core.CollectionSchemaVersionKey{}, "", false) if base.store == nil { t.Error("newBaseCRDT needs to init store") } diff --git a/internal/core/crdt/composite.go b/internal/core/crdt/composite.go index 58372cfb49..db3f15cbe2 100644 --- a/internal/core/crdt/composite.go +++ b/internal/core/crdt/composite.go @@ -54,6 +54,35 @@ func (delta *CompositeDAGDelta) IPLDSchemaBytes() []byte { }`) } +func (delta *CompositeDAGDelta) GetData() []byte { + return nil +} + +func (delta *CompositeDAGDelta) SetData([]byte) { +} + +func (delta *CompositeDAGDelta) GetDocID() []byte { + return delta.DocID +} + +func (delta *CompositeDAGDelta) GetFieldName() string { + return delta.FieldName +} + +func (delta *CompositeDAGDelta) GetSchemaVersionID() string { + return delta.SchemaVersionID +} + +func (delta *CompositeDAGDelta) Clone() core.Delta { + return &CompositeDAGDelta{ + DocID: delta.DocID, + FieldName: delta.FieldName, + Priority: delta.Priority, + SchemaVersionID: delta.SchemaVersionID, + Status: delta.Status, + } +} + // GetPriority gets the current priority for this delta. func (delta *CompositeDAGDelta) GetPriority() uint64 { return delta.Priority @@ -77,7 +106,7 @@ func NewCompositeDAG( key core.DataStoreKey, fieldName string, ) CompositeDAG { - return CompositeDAG{newBaseCRDT(store, key, schemaVersionKey, fieldName)} + return CompositeDAG{newBaseCRDT(store, key, schemaVersionKey, fieldName, false)} } // Value is a no-op for a CompositeDAG. diff --git a/internal/core/crdt/counter.go b/internal/core/crdt/counter.go index 4aa9a40793..a19a3b7676 100644 --- a/internal/core/crdt/counter.go +++ b/internal/core/crdt/counter.go @@ -64,6 +64,36 @@ func (delta *CounterDelta) IPLDSchemaBytes() []byte { }`) } +func (delta *CounterDelta) GetData() []byte { + return nil +} + +func (delta *CounterDelta) SetData([]byte) { +} + +func (delta *CounterDelta) GetDocID() []byte { + return delta.DocID +} + +func (delta *CounterDelta) GetFieldName() string { + return delta.FieldName +} + +func (delta *CounterDelta) GetSchemaVersionID() string { + return delta.SchemaVersionID +} + +func (delta *CounterDelta) Clone() core.Delta { + return &CounterDelta{ + DocID: delta.DocID, + FieldName: delta.FieldName, + Priority: delta.Priority, + Nonce: delta.Nonce, + SchemaVersionID: delta.SchemaVersionID, + Data: delta.Data, + } +} + // GetPriority gets the current priority for this delta. func (delta *CounterDelta) GetPriority() uint64 { return delta.Priority @@ -93,7 +123,7 @@ func NewCounter( allowDecrement bool, kind client.ScalarKind, ) Counter { - return Counter{newBaseCRDT(store, key, schemaVersionKey, fieldName), allowDecrement, kind} + return Counter{newBaseCRDT(store, key, schemaVersionKey, fieldName, false), allowDecrement, kind} } // Value gets the current counter value diff --git a/internal/core/crdt/ipld_union.go b/internal/core/crdt/ipld_union.go index 361a41b150..66c4a8e451 100644 --- a/internal/core/crdt/ipld_union.go +++ b/internal/core/crdt/ipld_union.go @@ -19,6 +19,19 @@ type CRDT struct { CounterDelta *CounterDelta } +// NewCRDT returns a new CRDT. +func NewCRDT(delta core.Delta) CRDT { + switch d := delta.(type) { + case *LWWRegDelta: + return CRDT{LWWRegDelta: d} + case *CompositeDAGDelta: + return CRDT{CompositeDAGDelta: d} + case *CounterDelta: + return CRDT{CounterDelta: d} + } + return CRDT{} +} + // IPLDSchemaBytes returns the IPLD schema representation for the CRDT. // // This needs to match the [CRDT] struct or [mustSetSchema] will panic on init. diff --git a/internal/core/crdt/lwwreg.go b/internal/core/crdt/lwwreg.go index 15baf4060f..6c69079382 100644 --- a/internal/core/crdt/lwwreg.go +++ b/internal/core/crdt/lwwreg.go @@ -20,7 +20,6 @@ import ( "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/internal/core" "github.com/sourcenetwork/defradb/internal/db/base" - "github.com/sourcenetwork/defradb/internal/encryption" ) // LWWRegDelta is a single delta operation for an LWWRegister @@ -34,6 +33,8 @@ type LWWRegDelta struct { // It can be used to identify the collection datastructure state at the time of commit. SchemaVersionID string Data []byte + // TODO: move this one layer above + //IsEncrypted bool } var _ core.Delta = (*LWWRegDelta)(nil) @@ -44,14 +45,44 @@ var _ core.Delta = (*LWWRegDelta)(nil) func (delta LWWRegDelta) IPLDSchemaBytes() []byte { return []byte(` type LWWRegDelta struct { - docID Bytes - fieldName String - priority Int - schemaVersionID String - data Bytes + docID Bytes + fieldName String + priority Int + schemaVersionID String + data Bytes }`) } +func (delta *LWWRegDelta) GetData() []byte { + return delta.Data +} + +func (delta *LWWRegDelta) SetData(data []byte) { + delta.Data = data +} + +func (delta *LWWRegDelta) GetDocID() []byte { + return delta.DocID +} + +func (delta *LWWRegDelta) GetFieldName() string { + return delta.FieldName +} + +func (delta *LWWRegDelta) GetSchemaVersionID() string { + return delta.SchemaVersionID +} + +func (delta *LWWRegDelta) Clone() core.Delta { + return &LWWRegDelta{ + DocID: delta.DocID, + FieldName: delta.FieldName, + Priority: delta.Priority, + SchemaVersionID: delta.SchemaVersionID, + Data: delta.Data, + } +} + // GetPriority gets the current priority for this delta. func (delta *LWWRegDelta) GetPriority() uint64 { return delta.Priority @@ -76,8 +107,9 @@ func NewLWWRegister( schemaVersionKey core.CollectionSchemaVersionKey, key core.DataStoreKey, fieldName string, + isEncrypted bool, ) LWWRegister { - return LWWRegister{newBaseCRDT(store, key, schemaVersionKey, fieldName)} + return LWWRegister{newBaseCRDT(store, key, schemaVersionKey, fieldName, isEncrypted)} } // Value gets the current register value @@ -99,6 +131,7 @@ func (reg LWWRegister) Set(value []byte) *LWWRegDelta { DocID: []byte(reg.key.DocID), FieldName: reg.fieldName, SchemaVersionID: reg.schemaVersionKey.SchemaVersionID, + //IsEncrypted: reg.isEncrypted, } } @@ -114,11 +147,16 @@ func (reg LWWRegister) Merge(ctx context.Context, delta core.Delta) error { data := d.Data - var err error - data, err = encryption.DecryptDoc(ctx, string(d.DocID), 0, data) - if err != nil { - return err - } + /*if d.IsEncrypted { + plainText, err := encryption.DecryptDoc(ctx, string(d.DocID), 0, data) + if err != nil { + return err + } + if plainText == nil { + return nil + } + data = plainText + }*/ return reg.setValue(ctx, data, d.GetPriority()) } diff --git a/internal/core/crdt/lwwreg_test.go b/internal/core/crdt/lwwreg_test.go index 2083a5b800..7ec424ff26 100644 --- a/internal/core/crdt/lwwreg_test.go +++ b/internal/core/crdt/lwwreg_test.go @@ -28,7 +28,7 @@ func newMockStore() datastore.DSReaderWriter { func setupLWWRegister() LWWRegister { store := newMockStore() key := core.DataStoreKey{DocID: "AAAA-BBBB"} - return NewLWWRegister(store, core.CollectionSchemaVersionKey{}, key, "") + return NewLWWRegister(store, core.CollectionSchemaVersionKey{}, key, "", false) } func setupLoadedLWWRegster(ctx context.Context) LWWRegister { diff --git a/internal/core/delta.go b/internal/core/delta.go index 16cba6ce53..ff857c046a 100644 --- a/internal/core/delta.go +++ b/internal/core/delta.go @@ -15,4 +15,10 @@ package core type Delta interface { GetPriority() uint64 SetPriority(uint64) + GetData() []byte + SetData([]byte) + GetDocID() []byte + GetFieldName() string + GetSchemaVersionID() string + Clone() Delta } diff --git a/internal/db/collection.go b/internal/db/collection.go index bbdc4c117f..1ca6da305d 100644 --- a/internal/db/collection.go +++ b/internal/db/collection.go @@ -652,6 +652,7 @@ func (c *collection) save( fieldDescription.Kind, fieldKey, fieldDescription.Name, + false, ) if err != nil { return cid.Undef, err diff --git a/internal/db/context.go b/internal/db/context.go index 7b71758b0c..8ad51c86ce 100644 --- a/internal/db/context.go +++ b/internal/db/context.go @@ -89,9 +89,6 @@ func SetContextTxn(ctx context.Context, txn datastore.Txn) context.Context { return context.WithValue(ctx, txnContextKey{}, txn) } -// TryGetContextTxn returns an identity and a bool indicating if the -// identity was retrieved from the given context. - // GetContextIdentity returns the identity from the given context. // // If an identity does not exist `NoIdentity` is returned. diff --git a/internal/db/fetcher/versioned.go b/internal/db/fetcher/versioned.go index 6ce8f94ebc..1f2f9a9314 100644 --- a/internal/db/fetcher/versioned.go +++ b/internal/db/fetcher/versioned.go @@ -408,6 +408,7 @@ func (vf *VersionedFetcher) processBlock( kind, dsKey, fieldName, + false, ) if err != nil { return err diff --git a/internal/db/merge.go b/internal/db/merge.go index 7f78deb77e..3c19599400 100644 --- a/internal/db/merge.go +++ b/internal/db/merge.go @@ -265,6 +265,9 @@ func (mp *mergeProcessor) loadComposites( func (mp *mergeProcessor) mergeComposites(ctx context.Context) error { for e := mp.composites.Front(); e != nil; e = e.Next() { block := e.Value.(*coreblock.Block) + // we can check block.IsEncrypted and pass it with the context + // we can keep track if composite is encrypted and also to double check in processBlock + // to make sure field-level encryption is used or not. link, err := block.GenerateLink() if err != nil { return err @@ -283,6 +286,8 @@ func (mp *mergeProcessor) processBlock( block *coreblock.Block, blockLink cidlink.Link, ) error { + // in this method we can keep track if composite or fields need to be encrypted and possibly pass around + // through the context crdt, err := mp.initCRDTForType(block.Delta.GetFieldName()) if err != nil { return err @@ -294,6 +299,8 @@ func (mp *mergeProcessor) processBlock( return nil } + // we want to decrypt block before calling ProcessBlock because ProcessBlock is also called by + // AddDelta err = crdt.Clock().ProcessBlock(ctx, block, blockLink) if err != nil { return err @@ -359,6 +366,7 @@ func (mp *mergeProcessor) initCRDTForType( fd.Kind, mp.dsKey.WithFieldId(fd.ID.String()), field, + false, ) if err != nil { return nil, err diff --git a/internal/encryption/config.go b/internal/encryption/config.go new file mode 100644 index 0000000000..ddb4a3815a --- /dev/null +++ b/internal/encryption/config.go @@ -0,0 +1,16 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package encryption + +// DocEncConfig is the configuration for document encryption. +type DocEncConfig struct { + IsEncrypted bool +} diff --git a/internal/encryption/context.go b/internal/encryption/context.go index d79e62b359..f3f1fec113 100644 --- a/internal/encryption/context.go +++ b/internal/encryption/context.go @@ -14,11 +14,15 @@ import ( "context" "github.com/sourcenetwork/defradb/datastore" + "github.com/sourcenetwork/immutable" ) // docEncContextKey is the key type for document encryption context values. type docEncContextKey struct{} +// configContextKey is the key type for encryption context values. +type configContextKey struct{} + // TryGetContextDocEnc returns a document encryption and a bool indicating if // it was retrieved from the given context. func TryGetContextEncryptor(ctx context.Context) (*DocEncryptor, bool) { @@ -46,3 +50,17 @@ func ContextWithStore(ctx context.Context, txn datastore.Txn) context.Context { encryptor.SetStore(txn.Encstore()) return ctx } + +// GetContextConfig returns the doc encryption config from the given context. +func GetContextConfig(ctx context.Context) immutable.Option[DocEncConfig] { + encConfig, ok := ctx.Value(configContextKey{}).(DocEncConfig) + if ok { + return immutable.Some(encConfig) + } + return immutable.None[DocEncConfig]() +} + +// SetContextConfig returns a new context with the encryption value set. +func SetContextConfig(ctx context.Context, encConfig DocEncConfig) context.Context { + return context.WithValue(ctx, configContextKey{}, encConfig) +} diff --git a/internal/encryption/encryptor.go b/internal/encryption/encryptor.go index 97d30a2af4..f22cee25ee 100644 --- a/internal/encryption/encryptor.go +++ b/internal/encryption/encryptor.go @@ -90,7 +90,7 @@ func (d *DocEncryptor) Decrypt(docID string, fieldID uint32, cipherText []byte) return nil, err } if len(encKey) == 0 { - return cipherText, nil + return nil, nil } return DecryptAES(cipherText, encKey) } @@ -113,7 +113,7 @@ func (d *DocEncryptor) fetchEncryptionKey(docID string, fieldID uint32) ([]byte, func EncryptDoc(ctx context.Context, docID string, fieldID uint32, plainText []byte) ([]byte, error) { enc, ok := TryGetContextEncryptor(ctx) if !ok { - return plainText, nil + return nil, nil } return enc.Encrypt(docID, fieldID, plainText) } @@ -121,7 +121,7 @@ func EncryptDoc(ctx context.Context, docID string, fieldID uint32, plainText []b func DecryptDoc(ctx context.Context, docID string, fieldID uint32, cipherText []byte) ([]byte, error) { enc, ok := TryGetContextEncryptor(ctx) if !ok { - return cipherText, nil + return nil, nil } return enc.Decrypt(docID, fieldID, cipherText) } diff --git a/internal/merkle/clock/clock.go b/internal/merkle/clock/clock.go index 06cccb6467..f826f767c9 100644 --- a/internal/merkle/clock/clock.go +++ b/internal/merkle/clock/clock.go @@ -24,6 +24,8 @@ import ( "github.com/sourcenetwork/defradb/datastore" "github.com/sourcenetwork/defradb/internal/core" coreblock "github.com/sourcenetwork/defradb/internal/core/block" + "github.com/sourcenetwork/defradb/internal/core/crdt" + "github.com/sourcenetwork/defradb/internal/encryption" ) var ( @@ -85,12 +87,32 @@ func (mc *MerkleClock) AddDelta( delta.SetPriority(height) block := coreblock.New(delta, links, heads...) + // TODO: check context instead of height check + if height == 1 { + //toEncrypt := true + //block.IsEncrypted = &toEncrypt + } else if len(heads) > 0 { + // to test this we need 2 peers: 1 updates encrypted doc, the second updates with unecnrypted flag + // we don't have unencryption yet + //bytes, err := mc.dagstore.AsIPLDStorage().Get(ctx, heads[0].KeyString()) + //prevBlock, err :=coreblock.GetFromBytes(bytes) + //block.IsEncrypted + // could do block.EncryptDelta(encKey) + } + + // block needs to be encrypted here + // we could call block.WithEncryptedDelta(ctx) // Write the new block to the dag store. - link, err := mc.putBlock(ctx, block) + dagBlock, err := encryptBlockIfNeeded(ctx, block) + if err != nil { + return cidlink.Link{}, nil, err + } + link, err := mc.putBlock(ctx, dagBlock) if err != nil { return cidlink.Link{}, nil, err } + // block doesn't need to be encrypted here // merge the delta and update the state err = mc.ProcessBlock( ctx, @@ -109,6 +131,25 @@ func (mc *MerkleClock) AddDelta( return link, b, err } +func encryptBlockIfNeeded(ctx context.Context, block *coreblock.Block) (*coreblock.Block, error) { + encConf := encryption.GetContextConfig(ctx) + if encConf.HasValue() && encConf.Value().IsEncrypted { + delta := block.Delta.GetDelta() + cloned := delta.Clone() + // TODO: should I change EncryptDoc to accept DocID as `[]byte`? + bytes, err := encryption.EncryptDoc(ctx, string(cloned.GetDocID()), 0, cloned.GetData()) + if err != nil { + return nil, err + } + cloned.SetData(bytes) + isEncrypted := true + return &coreblock.Block{Delta: crdt.NewCRDT(cloned), Links: block.Links, IsEncrypted: &isEncrypted}, nil + //return &coreblock.Block{Delta: crdt.NewCRDT(cloned), Links: block.Links, IsEncrypted: true}, nil + } + + return block, nil +} + // ProcessBlock merges the delta CRDT and updates the state accordingly. func (mc *MerkleClock) ProcessBlock( ctx context.Context, diff --git a/internal/merkle/clock/clock_test.go b/internal/merkle/clock/clock_test.go index c9a51a7a1e..c70e37f47e 100644 --- a/internal/merkle/clock/clock_test.go +++ b/internal/merkle/clock/clock_test.go @@ -33,7 +33,7 @@ func newTestMerkleClock() *MerkleClock { s := newDS() multistore := datastore.MultiStoreFrom(s) - reg := crdt.NewLWWRegister(multistore.Rootstore(), core.CollectionSchemaVersionKey{}, core.DataStoreKey{}, "") + reg := crdt.NewLWWRegister(multistore.Rootstore(), core.CollectionSchemaVersionKey{}, core.DataStoreKey{}, "", false) return NewMerkleClock( multistore.Headstore(), multistore.DAGstore(), @@ -45,7 +45,7 @@ func newTestMerkleClock() *MerkleClock { func TestNewMerkleClock(t *testing.T) { s := newDS() multistore := datastore.MultiStoreFrom(s) - reg := crdt.NewLWWRegister(multistore.Rootstore(), core.CollectionSchemaVersionKey{}, core.DataStoreKey{}, "") + reg := crdt.NewLWWRegister(multistore.Rootstore(), core.CollectionSchemaVersionKey{}, core.DataStoreKey{}, "", false) clk := NewMerkleClock(multistore.Headstore(), multistore.DAGstore(), core.HeadStoreKey{}, reg) if clk.headstore != multistore.Headstore() { diff --git a/internal/merkle/crdt/lwwreg.go b/internal/merkle/crdt/lwwreg.go index bc1a6af640..dc59226393 100644 --- a/internal/merkle/crdt/lwwreg.go +++ b/internal/merkle/crdt/lwwreg.go @@ -18,7 +18,6 @@ import ( "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/internal/core" corecrdt "github.com/sourcenetwork/defradb/internal/core/crdt" - "github.com/sourcenetwork/defradb/internal/encryption" "github.com/sourcenetwork/defradb/internal/merkle/clock" ) @@ -36,8 +35,9 @@ func NewMerkleLWWRegister( schemaVersionKey core.CollectionSchemaVersionKey, key core.DataStoreKey, fieldName string, + isEncrypted bool, ) *MerkleLWWRegister { - register := corecrdt.NewLWWRegister(store.Datastore(), schemaVersionKey, key, fieldName) + register := corecrdt.NewLWWRegister(store.Datastore(), schemaVersionKey, key, fieldName, isEncrypted) clk := clock.NewMerkleClock(store.Headstore(), store.DAGstore(), key.ToHeadStoreKey(), register) base := &baseMerkleCRDT{clock: clk, crdt: register} return &MerkleLWWRegister{ @@ -57,10 +57,12 @@ func (mlwwreg *MerkleLWWRegister) Save(ctx context.Context, data any) (cidlink.L return cidlink.Link{}, nil, err } - bytes, err = encryption.EncryptDoc(ctx, value.DocID, 0, bytes) - if err != nil { - return cidlink.Link{}, nil, err - } + /*if mlwwreg.reg.IsEncrypted() { + bytes, err = encryption.EncryptDoc(ctx, value.DocID, 0, bytes) + if err != nil { + return cidlink.Link{}, nil, err + } + }*/ // Set() call on underlying LWWRegister CRDT // persist/publish delta diff --git a/internal/merkle/crdt/merklecrdt.go b/internal/merkle/crdt/merklecrdt.go index abc0ffeb51..76624a1d5c 100644 --- a/internal/merkle/crdt/merklecrdt.go +++ b/internal/merkle/crdt/merklecrdt.go @@ -78,6 +78,7 @@ func InstanceWithStore( kind client.FieldKind, key core.DataStoreKey, fieldName string, + isEncrypted bool, ) (MerkleCRDT, error) { switch cType { case client.LWW_REGISTER: @@ -86,6 +87,7 @@ func InstanceWithStore( schemaVersionKey, key, fieldName, + isEncrypted, ), nil case client.PN_COUNTER, client.P_COUNTER: return NewMerkleCounter( diff --git a/internal/merkle/crdt/merklecrdt_test.go b/internal/merkle/crdt/merklecrdt_test.go index bd42223509..593a9a7a67 100644 --- a/internal/merkle/crdt/merklecrdt_test.go +++ b/internal/merkle/crdt/merklecrdt_test.go @@ -31,7 +31,7 @@ func newTestBaseMerkleCRDT() (*baseMerkleCRDT, datastore.DSReaderWriter) { s := newDS() multistore := datastore.MultiStoreFrom(s) - reg := crdt.NewLWWRegister(multistore.Datastore(), core.CollectionSchemaVersionKey{}, core.DataStoreKey{}, "") + reg := crdt.NewLWWRegister(multistore.Datastore(), core.CollectionSchemaVersionKey{}, core.DataStoreKey{}, "", false) clk := clock.NewMerkleClock(multistore.Headstore(), multistore.DAGstore(), core.HeadStoreKey{}, reg) return &baseMerkleCRDT{clock: clk, crdt: reg}, multistore.Rootstore() } diff --git a/internal/planner/explain.go b/internal/planner/explain.go index f6d3f57209..34c3b3b644 100644 --- a/internal/planner/explain.go +++ b/internal/planner/explain.go @@ -342,7 +342,7 @@ func collectExecuteExplainInfo(executedPlan planNode) (map[string]any, error) { // Note: This function only fails if the collection of the datapoints goes wrong, otherwise // even if plan execution fails this function would return the collected datapoints. func (p *Planner) executeAndExplainRequest( - ctx context.Context, + _ context.Context, plan planNode, ) ([]map[string]any, error) { executionSuccess := false diff --git a/internal/planner/multi.go b/internal/planner/multi.go index 27d6886d7c..de220c43e5 100644 --- a/internal/planner/multi.go +++ b/internal/planner/multi.go @@ -131,7 +131,7 @@ func (p *parallelNode) Next() (bool, error) { return orNext, nil } -func (p *parallelNode) nextMerge(index int, plan planNode) (bool, error) { +func (p *parallelNode) nextMerge(_ int, plan planNode) (bool, error) { if next, err := plan.Next(); !next { return false, err } diff --git a/internal/planner/planner.go b/internal/planner/planner.go index f7a875af70..0f513a045c 100644 --- a/internal/planner/planner.go +++ b/internal/planner/planner.go @@ -528,7 +528,7 @@ func walkAndFindPlanType[T planNode](planNode planNode) (T, bool) { // executeRequest executes the plan graph that represents the request that was made. func (p *Planner) executeRequest( - ctx context.Context, + _ context.Context, planNode planNode, ) ([]map[string]any, error) { if err := planNode.Start(); err != nil { diff --git a/tests/integration/encryption/commit_test.go b/tests/integration/encryption/commit_test.go index d856d82b82..95568e489f 100644 --- a/tests/integration/encryption/commit_test.go +++ b/tests/integration/encryption/commit_test.go @@ -53,7 +53,7 @@ func TestDocEncryption_ShouldStoreCommitsDeltaEncrypted(t *testing.T) { `, Results: []map[string]any{ { - "cid": "bafyreicv422zhiuqefs32wp7glrqsbjpy76hgem4ivagm2ttuli43wluci", + "cid": "bafyreidrbl46bz5nuzuby6s4zqvzliq4gyup3pq6ipy7ljm5o7l5hxtjhm", "collectionID": int64(1), "delta": encrypt(testUtils.CBORValue(21)), "docID": "bae-c9fb0fa4-1195-589c-aa54-e68333fb90b3", @@ -63,7 +63,7 @@ func TestDocEncryption_ShouldStoreCommitsDeltaEncrypted(t *testing.T) { "links": []map[string]any{}, }, { - "cid": "bafyreie6i4dw5jh6bp2anszqkmuwfslsemzatrflipetljhtpjhjn3zbum", + "cid": "bafyreighzsctnwzhw57nbzici6dbvohozwet5w2baey3p4dxtxp7wxybui", "collectionID": int64(1), "delta": encrypt(testUtils.CBORValue("John")), "docID": "bae-c9fb0fa4-1195-589c-aa54-e68333fb90b3", @@ -73,7 +73,7 @@ func TestDocEncryption_ShouldStoreCommitsDeltaEncrypted(t *testing.T) { "links": []map[string]any{}, }, { - "cid": "bafyreia747gvxxbowag2mob2up34zwh364olc7ocab3nunj2ikdxq7srom", + "cid": "bafyreidzfgvlx6eaj4furwl3mpvxp3wslbvzs4hvknivhpjw7g275k5v5i", "collectionID": int64(1), "delta": nil, "docID": "bae-c9fb0fa4-1195-589c-aa54-e68333fb90b3", @@ -82,11 +82,11 @@ func TestDocEncryption_ShouldStoreCommitsDeltaEncrypted(t *testing.T) { "height": int64(1), "links": []map[string]any{ { - "cid": "bafyreicv422zhiuqefs32wp7glrqsbjpy76hgem4ivagm2ttuli43wluci", + "cid": "bafyreidrbl46bz5nuzuby6s4zqvzliq4gyup3pq6ipy7ljm5o7l5hxtjhm", "name": "age", }, { - "cid": "bafyreie6i4dw5jh6bp2anszqkmuwfslsemzatrflipetljhtpjhjn3zbum", + "cid": "bafyreighzsctnwzhw57nbzici6dbvohozwet5w2baey3p4dxtxp7wxybui", "name": "name", }, }, diff --git a/tests/integration/encryption/peer_test.go b/tests/integration/encryption/peer_test.go index 6793f3991f..d810833ba7 100644 --- a/tests/integration/encryption/peer_test.go +++ b/tests/integration/encryption/peer_test.go @@ -63,3 +63,99 @@ func TestDocEncryptionPeer_IfPeerHasNoKey_ShouldNotFetch(t *testing.T) { testUtils.ExecuteTestCase(t, test) } +func TestDocEncryptionPeer_UponSync_ShouldSyncEncryptedDAG(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + age: Int + } + `, + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Doc: `{ + "name": "John", + "age": 21 + }`, + IsEncrypted: true, + }, + testUtils.WaitForSync{}, + testUtils.Request{ + NodeID: immutable.Some(1), + Request: ` + query { + commits { + cid + collectionID + delta + docID + fieldId + fieldName + height + links { + cid + name + } + } + } + `, + Results: []map[string]any{ + { + "cid": "bafyreicv422zhiuqefs32wp7glrqsbjpy76hgem4ivagm2ttuli43wluci", + "collectionID": int64(1), + "delta": encrypt(testUtils.CBORValue(21)), + "docID": "bae-c9fb0fa4-1195-589c-aa54-e68333fb90b3", + "fieldId": "1", + "fieldName": "age", + "height": int64(1), + "links": []map[string]any{}, + }, + { + "cid": "bafyreie6i4dw5jh6bp2anszqkmuwfslsemzatrflipetljhtpjhjn3zbum", + "collectionID": int64(1), + "delta": encrypt(testUtils.CBORValue("John")), + "docID": "bae-c9fb0fa4-1195-589c-aa54-e68333fb90b3", + "fieldId": "2", + "fieldName": "name", + "height": int64(1), + "links": []map[string]any{}, + }, + { + "cid": "bafyreia747gvxxbowag2mob2up34zwh364olc7ocab3nunj2ikdxq7srom", + "collectionID": int64(1), + "delta": nil, + "docID": "bae-c9fb0fa4-1195-589c-aa54-e68333fb90b3", + "fieldId": "C", + "fieldName": nil, + "height": int64(1), + "links": []map[string]any{ + { + "cid": "bafyreicv422zhiuqefs32wp7glrqsbjpy76hgem4ivagm2ttuli43wluci", + "name": "age", + }, + { + "cid": "bafyreie6i4dw5jh6bp2anszqkmuwfslsemzatrflipetljhtpjhjn3zbum", + "name": "name", + }, + }, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/utils2.go b/tests/integration/utils2.go index aaa23efe26..c88ee3755c 100644 --- a/tests/integration/utils2.go +++ b/tests/integration/utils2.go @@ -1228,6 +1228,7 @@ func makeContextForDocCreate(ctx context.Context, action *CreateDoc, txn datasto ctx = db.SetContextIdentity(ctx, action.Identity) if action.IsEncrypted { ctx = encryption.Context(ctx) + ctx = encryption.SetContextConfig(ctx, encryption.DocEncConfig{IsEncrypted: true}) } if txn != nil { ctx = encryption.ContextWithStore(ctx, txn)