Skip to content

Commit

Permalink
Update the datastore interface to return the number of deleted relati…
Browse files Browse the repository at this point in the history
…onships
  • Loading branch information
josephschorr committed Mar 10, 2025
1 parent d909242 commit b1eda0f
Show file tree
Hide file tree
Showing 13 changed files with 84 additions and 69 deletions.
14 changes: 7 additions & 7 deletions internal/datastore/crdb/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func exactRelationshipClause(r tuple.Relationship) sq.Eq {
}
}

func (rwt *crdbReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
func (rwt *crdbReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (uint64, bool, error) {
// Add clauses for the ResourceFilter
query := rwt.queryDeleteTuples()

Expand All @@ -401,7 +401,7 @@ func (rwt *crdbReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1
}
if filter.OptionalResourceIdPrefix != "" {
if strings.Contains(filter.OptionalResourceIdPrefix, "%") {
return false, fmt.Errorf("unable to delete relationships with a prefix containing the %% character")
return 0, false, fmt.Errorf("unable to delete relationships with a prefix containing the %% character")
}

query = query.Where(sq.Like{colObjectID: filter.OptionalResourceIdPrefix + "%"})
Expand Down Expand Up @@ -434,24 +434,24 @@ func (rwt *crdbReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1

sql, args, err := query.ToSql()
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
return 0, false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

modified, err := rwt.tx.Exec(ctx, sql, args...)
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
return 0, false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

rwt.relCountChange -= modified.RowsAffected()
rowsAffected, err := safecast.ToUint64(modified.RowsAffected())
if err != nil {
return false, spiceerrors.MustBugf("could not cast RowsAffected to uint64: %v", err)
return 0, false, spiceerrors.MustBugf("could not cast RowsAffected to uint64: %v", err)
}
if delLimit > 0 && rowsAffected == delLimit {
return true, nil
return rowsAffected, true, nil
}

return false, nil
return rowsAffected, false, nil
}

func (rwt *crdbReadWriteTXN) WriteNamespaces(ctx context.Context, newConfigs ...*core.NamespaceDefinition) error {
Expand Down
16 changes: 8 additions & 8 deletions internal/datastore/memdb/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,13 @@ func (rwt *memdbReadWriteTx) toCaveatReference(mutation tuple.RelationshipUpdate
return cr
}

func (rwt *memdbReadWriteTx) DeleteRelationships(_ context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
func (rwt *memdbReadWriteTx) DeleteRelationships(_ context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (uint64, bool, error) {
rwt.mustLock()
defer rwt.Unlock()

tx, err := rwt.txSource()
if err != nil {
return false, err
return 0, false, err
}

delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...)
Expand All @@ -153,16 +153,16 @@ func (rwt *memdbReadWriteTx) DeleteRelationships(_ context.Context, filter *v1.R
}

// caller must already hold the concurrent access lock
func (rwt *memdbReadWriteTx) deleteWithLock(tx *memdb.Txn, filter *v1.RelationshipFilter, limit uint64) (bool, error) {
func (rwt *memdbReadWriteTx) deleteWithLock(tx *memdb.Txn, filter *v1.RelationshipFilter, limit uint64) (uint64, bool, error) {
// Create an iterator to find the relevant tuples
dsFilter, err := datastore.RelationshipsFilterFromPublicFilter(filter)
if err != nil {
return false, err
return 0, false, err
}

bestIter, err := iteratorForFilter(tx, dsFilter)
if err != nil {
return false, err
return 0, false, err
}
filteredIter := memdb.NewFilterIterator(bestIter, relationshipFilterFilterFunc(filter))

Expand All @@ -174,7 +174,7 @@ func (rwt *memdbReadWriteTx) deleteWithLock(tx *memdb.Txn, filter *v1.Relationsh
for row := filteredIter.Next(); row != nil; row = filteredIter.Next() {
rt, err := row.(*relationship).Relationship()
if err != nil {
return false, err
return 0, false, err
}
mutations = append(mutations, tuple.Delete(rt))
counter++
Expand All @@ -185,7 +185,7 @@ func (rwt *memdbReadWriteTx) deleteWithLock(tx *memdb.Txn, filter *v1.Relationsh
}
}

return metLimit, rwt.write(tx, mutations...)
return counter, metLimit, rwt.write(tx, mutations...)
}

func (rwt *memdbReadWriteTx) RegisterCounter(ctx context.Context, name string, filter *core.RelationshipFilter) error {
Expand Down Expand Up @@ -320,7 +320,7 @@ func (rwt *memdbReadWriteTx) DeleteNamespaces(_ context.Context, nsNames ...stri
}

// Delete the relationships from the namespace
if _, err := rwt.deleteWithLock(tx, &v1.RelationshipFilter{
if _, _, err := rwt.deleteWithLock(tx, &v1.RelationshipFilter{
ResourceType: nsName,
}, 0); err != nil {
return fmt.Errorf("unable to delete relationships from deleted namespace: %w", err)
Expand Down
16 changes: 8 additions & 8 deletions internal/datastore/mysql/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func (rwt *mysqlReadWriteTXN) WriteRelationships(ctx context.Context, mutations
return nil
}

func (rwt *mysqlReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
func (rwt *mysqlReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (uint64, bool, error) {
// Add clauses for the ResourceFilter
query := rwt.DeleteRelsQuery
if filter.ResourceType != "" {
Expand All @@ -353,7 +353,7 @@ func (rwt *mysqlReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v
}
if filter.OptionalResourceIdPrefix != "" {
if strings.Contains(filter.OptionalResourceIdPrefix, "%") {
return false, fmt.Errorf("unable to delete relationships with a prefix containing the %% character")
return 0, false, fmt.Errorf("unable to delete relationships with a prefix containing the %% character")
}

query = query.Where(sq.Like{colObjectID: filter.OptionalResourceIdPrefix + "%"})
Expand Down Expand Up @@ -385,29 +385,29 @@ func (rwt *mysqlReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v

querySQL, args, err := query.ToSql()
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
return 0, false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

modified, err := rwt.tx.ExecContext(ctx, querySQL, args...)
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
return 0, false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

rowsAffected, err := modified.RowsAffected()
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
return 0, false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

uintRowsAffected, err := safecast.ToUint64(rowsAffected)
if err != nil {
return false, spiceerrors.MustBugf("rowsAffected was negative: %v", err)
return 0, false, spiceerrors.MustBugf("rowsAffected was negative: %v", err)
}

if delLimit > 0 && uintRowsAffected == delLimit {
return true, nil
return uintRowsAffected, true, nil
}

return false, nil
return uintRowsAffected, false, nil
}

func (rwt *mysqlReadWriteTXN) WriteNamespaces(ctx context.Context, newNamespaces ...*core.NamespaceDefinition) error {
Expand Down
40 changes: 26 additions & 14 deletions internal/datastore/postgres/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,20 +419,21 @@ func handleWriteError(err error) error {
return fmt.Errorf(errUnableToWriteRelationships, err)
}

func (rwt *pgReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
func (rwt *pgReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (uint64, bool, error) {
delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...)
if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 {
return rwt.deleteRelationshipsWithLimit(ctx, filter, *delOpts.DeleteLimit)
}

return false, rwt.deleteRelationships(ctx, filter)
numDeleted, err := rwt.deleteRelationships(ctx, filter)
return numDeleted, false, err
}

func (rwt *pgReadWriteTXN) deleteRelationshipsWithLimit(ctx context.Context, filter *v1.RelationshipFilter, limit uint64) (bool, error) {
func (rwt *pgReadWriteTXN) deleteRelationshipsWithLimit(ctx context.Context, filter *v1.RelationshipFilter, limit uint64) (uint64, bool, error) {
// validate the limit
intLimit, err := safecast.ToInt64(limit)
if err != nil {
return false, fmt.Errorf("limit argument could not safely be cast to int64: %w", err)
return 0, false, fmt.Errorf("limit argument could not safely be cast to int64: %w", err)
}

// Construct a select query for the relationships to be removed.
Expand All @@ -449,7 +450,7 @@ func (rwt *pgReadWriteTXN) deleteRelationshipsWithLimit(ctx context.Context, fil
}
if filter.OptionalResourceIdPrefix != "" {
if strings.Contains(filter.OptionalResourceIdPrefix, "%") {
return false, fmt.Errorf("unable to delete relationships with a prefix containing the %% character")
return 0, false, fmt.Errorf("unable to delete relationships with a prefix containing the %% character")
}

query = query.Where(sq.Like{colObjectID: filter.OptionalResourceIdPrefix + "%"})
Expand All @@ -470,7 +471,7 @@ func (rwt *pgReadWriteTXN) deleteRelationshipsWithLimit(ctx context.Context, fil

selectSQL, args, err := query.ToSql()
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
return 0, false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

args = append(args, rwt.newXID)
Expand All @@ -493,13 +494,18 @@ func (rwt *pgReadWriteTXN) deleteRelationshipsWithLimit(ctx context.Context, fil

result, err := rwt.tx.Exec(ctx, cteSQL, args...)
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
return 0, false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

return result.RowsAffected() == intLimit, nil
numDeleted, err := safecast.ToUint64(result.RowsAffected())
if err != nil {
return 0, false, fmt.Errorf("unable to cast rows affected to uint64: %w", err)
}

return numDeleted, result.RowsAffected() == intLimit, nil
}

func (rwt *pgReadWriteTXN) deleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error {
func (rwt *pgReadWriteTXN) deleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) (uint64, error) {
// Add clauses for the ResourceFilter
query := deleteTuple
if filter.ResourceType != "" {
Expand All @@ -513,7 +519,7 @@ func (rwt *pgReadWriteTXN) deleteRelationships(ctx context.Context, filter *v1.R
}
if filter.OptionalResourceIdPrefix != "" {
if strings.Contains(filter.OptionalResourceIdPrefix, "%") {
return fmt.Errorf("unable to delete relationships with a prefix containing the %% character")
return 0, fmt.Errorf("unable to delete relationships with a prefix containing the %% character")
}

query = query.Where(sq.Like{colObjectID: filter.OptionalResourceIdPrefix + "%"})
Expand All @@ -532,14 +538,20 @@ func (rwt *pgReadWriteTXN) deleteRelationships(ctx context.Context, filter *v1.R

sql, args, err := query.Set(colDeletedXid, rwt.newXID).ToSql()
if err != nil {
return fmt.Errorf(errUnableToDeleteRelationships, err)
return 0, fmt.Errorf(errUnableToDeleteRelationships, err)
}

result, err := rwt.tx.Exec(ctx, sql, args...)
if err != nil {
return 0, fmt.Errorf(errUnableToDeleteRelationships, err)
}

if _, err := rwt.tx.Exec(ctx, sql, args...); err != nil {
return fmt.Errorf(errUnableToDeleteRelationships, err)
numDeleted, err := safecast.ToUint64(result.RowsAffected())
if err != nil {
return 0, fmt.Errorf("unable to cast rows affected to uint64: %w", err)
}

return nil
return numDeleted, nil
}

func (rwt *pgReadWriteTXN) WriteNamespaces(ctx context.Context, newConfigs ...*core.NamespaceDefinition) error {
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/proxy/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (rwt *observableRWT) DeleteNamespaces(ctx context.Context, nsNames ...strin
return rwt.delegate.DeleteNamespaces(ctx, nsNames...)
}

func (rwt *observableRWT) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, options ...options.DeleteOptionsOption) (bool, error) {
func (rwt *observableRWT) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, options ...options.DeleteOptionsOption) (uint64, bool, error) {
ctx, closer := observe(ctx, "DeleteRelationships", trace.WithAttributes(
filterToAttributes(filter)...,
))
Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/proxy/proxy_test/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,9 @@ func (dm *MockReadWriteTransaction) WriteRelationships(_ context.Context, mutati
return args.Error(0)
}

func (dm *MockReadWriteTransaction) DeleteRelationships(_ context.Context, filter *v1.RelationshipFilter, options ...options.DeleteOptionsOption) (bool, error) {
func (dm *MockReadWriteTransaction) DeleteRelationships(_ context.Context, filter *v1.RelationshipFilter, options ...options.DeleteOptionsOption) (uint64, bool, error) {
args := dm.Called(filter)
return false, args.Error(0)
return 0, false, args.Error(0)
}

func (dm *MockReadWriteTransaction) WriteNamespaces(_ context.Context, newConfigs ...*core.NamespaceDefinition) error {
Expand Down
24 changes: 12 additions & 12 deletions internal/datastore/spanner/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,51 +147,51 @@ func spannerMutation(
return
}

func (rwt spannerReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
limitReached, err := deleteWithFilter(ctx, rwt.spannerRWT, filter, opts...)
func (rwt spannerReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (uint64, bool, error) {
numDeleted, limitReached, err := deleteWithFilter(ctx, rwt.spannerRWT, filter, opts...)
if err != nil {
return false, fmt.Errorf(errUnableToDeleteRelationships, err)
return 0, false, fmt.Errorf(errUnableToDeleteRelationships, err)
}

return limitReached, nil
return numDeleted, limitReached, nil
}

func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) {
func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (uint64, bool, error) {
delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...)
var delLimit uint64
if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 {
delLimit = *delOpts.DeleteLimit
if delLimit > inLimit {
return false, spiceerrors.MustBugf("delete limit %d exceeds maximum of %d in spanner", delLimit, inLimit)
return 0, false, spiceerrors.MustBugf("delete limit %d exceeds maximum of %d in spanner", delLimit, inLimit)
}
}

var numDeleted int64
if delLimit > 0 {
nu, err := deleteWithFilterAndLimit(ctx, rwt, filter, delLimit)
if err != nil {
return false, err
return 0, false, err
}
numDeleted = nu
} else {
nu, err := deleteWithFilterAndNoLimit(ctx, rwt, filter)
if err != nil {
return false, err
return 0, false, err
}

numDeleted = nu
}

uintNumDeleted, err := safecast.ToUint64(numDeleted)
if err != nil {
return false, spiceerrors.MustBugf("numDeleted was negative: %v", err)
return 0, false, spiceerrors.MustBugf("numDeleted was negative: %v", err)
}

if delLimit > 0 && uintNumDeleted == delLimit {
return true, nil
return uintNumDeleted, true, nil
}

return false, nil
return uintNumDeleted, false, nil
}

func deleteWithFilterAndLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, delLimit uint64) (int64, error) {
Expand Down Expand Up @@ -391,7 +391,7 @@ func (rwt spannerReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...
// Ensure the namespace exists.

relFilter := &v1.RelationshipFilter{ResourceType: nsName}
if _, err := deleteWithFilter(ctx, rwt.spannerRWT, relFilter); err != nil {
if _, _, err := deleteWithFilter(ctx, rwt.spannerRWT, relFilter); err != nil {
return fmt.Errorf(errUnableToDeleteConfig, err)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/services/v1/relationships.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ func (ps *permissionServer) DeleteRelationships(ctx context.Context, req *v1.Del
// Delete with the specified limit.
if req.OptionalLimit > 0 {
deleteLimit := uint64(req.OptionalLimit)
reachedLimit, err := rwt.DeleteRelationships(ctx, req.RelationshipFilter, options.WithDeleteLimit(&deleteLimit))
_, reachedLimit, err := rwt.DeleteRelationships(ctx, req.RelationshipFilter, options.WithDeleteLimit(&deleteLimit))
if err != nil {
return err
}
Expand All @@ -489,7 +489,7 @@ func (ps *permissionServer) DeleteRelationships(ctx context.Context, req *v1.Del
}

// Otherwise, kick off an unlimited deletion.
_, err = rwt.DeleteRelationships(ctx, req.RelationshipFilter)
_, _, err = rwt.DeleteRelationships(ctx, req.RelationshipFilter)
return err
}, options.WithMetadata(req.OptionalTransactionMetadata))
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/testfixtures/validating.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,9 @@ func (vrwt validatingReadWriteTransaction) WriteRelationships(ctx context.Contex
return vrwt.delegate.WriteRelationships(ctx, mutations)
}

func (vrwt validatingReadWriteTransaction) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, options ...options.DeleteOptionsOption) (bool, error) {
func (vrwt validatingReadWriteTransaction) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, options ...options.DeleteOptionsOption) (uint64, bool, error) {
if err := filter.Validate(); err != nil {
return false, err
return 0, false, err
}

return vrwt.delegate.DeleteRelationships(ctx, filter, options...)
Expand Down
Loading

0 comments on commit b1eda0f

Please sign in to comment.