Skip to content

Commit

Permalink
Support new object search (#676)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-khimov authored Jan 10, 2025
2 parents 5f693d2 + 71d457a commit 74d6159
Show file tree
Hide file tree
Showing 40 changed files with 2,539 additions and 959 deletions.
15 changes: 13 additions & 2 deletions client/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,17 @@ var (
proto.Clone(validProtoObjectIDs[2]).(*protorefs.ObjectID),
},
}
// correct ObjectService.SearchV2 response payload with required fields only.
validMinSearchV2ResponseBody = &protoobject.SearchV2Response_Body{}
// correct ObjectService.SearchV2 response payload with all fields.
validFullSearchV2ResponseBody = &protoobject.SearchV2Response_Body{
Result: []*protoobject.SearchV2Response_OIDWithMeta{
{Id: proto.Clone(validProtoObjectIDs[0]).(*protorefs.ObjectID), Attributes: []string{"val_1_1", "val_1_2"}},
{Id: proto.Clone(validProtoObjectIDs[1]).(*protorefs.ObjectID), Attributes: []string{"val_2_1", "val_2_2"}},
{Id: proto.Clone(validProtoObjectIDs[2]).(*protorefs.ObjectID), Attributes: []string{"val_3_1", "val_3_2"}},
},
Cursor: "any_cursor",
}
)

// Reputation service.
Expand Down Expand Up @@ -1840,7 +1851,7 @@ func checkSplitInfoTransport(s object.SplitInfo, m *protoobject.SplitInfo) error
return nil
}

func checkObjectSearchFilterTransport(f object.SearchFilter, m *protoobject.SearchRequest_Body_Filter) error {
func checkObjectSearchFilterTransport(f object.SearchFilter, m *protoobject.SearchFilter) error {
// 1. matcher
var expMatcher protoobject.MatchType
switch m := f.Operation(); m {
Expand Down Expand Up @@ -1877,7 +1888,7 @@ func checkObjectSearchFilterTransport(f object.SearchFilter, m *protoobject.Sear
return nil
}

func checkObjectSearchFiltersTransport(fs []object.SearchFilter, ms []*protoobject.SearchRequest_Body_Filter) error {
func checkObjectSearchFiltersTransport(fs []object.SearchFilter, ms []*protoobject.SearchFilter) error {
if v1, v2 := len(fs), len(ms); v1 != v2 {
return fmt.Errorf("number of attributes (client: %d, message: %d)", v1, v2)
}
Expand Down
2 changes: 1 addition & 1 deletion client/object_replicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func BenchmarkPrepareReplicationMessage(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
for range b.N {
_, err = prepareReplicateMessage(id, bytes.NewReader(bObj), signer, true)
require.NoError(b, err)
}
Expand Down
215 changes: 214 additions & 1 deletion client/object_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"slices"
"time"

"github.com/nspcc-dev/neofs-sdk-go/bearer"
Expand All @@ -16,11 +17,223 @@ import (
protoobject "github.com/nspcc-dev/neofs-sdk-go/proto/object"
"github.com/nspcc-dev/neofs-sdk-go/proto/refs"
protosession "github.com/nspcc-dev/neofs-sdk-go/proto/session"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/stat"
"github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/nspcc-dev/neofs-sdk-go/version"
)

const (
defaultSearchObjectsQueryVersion = 1

maxSearchObjectsCount = 1000
maxSearchObjectsFilterCount = 8
maxSearchObjectsAttrCount = 4
)

// SearchResultItem groups data of an object matching particular search query.
type SearchResultItem struct {
ID oid.ID
Attributes []string
}

// SearchObjectsOptions groups optional parameters of [Client.SearchObjects].
type SearchObjectsOptions struct {
prmCommonMeta
sessionToken *session.Object
bearerToken *bearer.Token
noForwarding bool

count uint32
}

// DisableForwarding disables request forwarding by the server and limits
// execution to its local storage. Mostly used for system purposes.
func (x *SearchObjectsOptions) DisableForwarding() { x.noForwarding = true }

// WithSessionToken specifies session token to attach to the request. The token
// must be issued for the request signer and target the requested container and
// operation.
func (x *SearchObjectsOptions) WithSessionToken(st session.Object) { x.sessionToken = &st }

// WithBearerToken specifies bearer token to attach to the request. The token
// must be issued by the container owner for the request signer.
func (x *SearchObjectsOptions) WithBearerToken(bt bearer.Token) { x.bearerToken = &bt }

// SetCount limits the search result to a given number. Must be in [1, 1000]
// range. Defaults to 1000.
func (x *SearchObjectsOptions) SetCount(count uint32) { x.count = count }

// SearchObjects selects objects from given container by applying specified
// filters, collects values of requested attributes and returns the result
// sorted. Elements are compared by attributes' values lexicographically in
// priority from first to last, closing with the default sorting by IDs. System
// attributes can be included using special aliases like
// [object.FilterPayloadSize]. SearchObjects also returns opaque continuation
// cursor: when passed to a repeat call, it specifies where to continue the
// operation from. To start the search anew, pass an empty cursor.
//
// Max number of filters is 8. Max number of attributes is 4. If attributes are
// specified, filters must include the 1st of them.
//
// Note that if requested attribute is missing in the matching object,
// corresponding element in its [SearchResultItem.Attributes] is empty.
func (c *Client) SearchObjects(ctx context.Context, cnr cid.ID, filters object.SearchFilters, attrs []string, cursor string,
signer neofscrypto.Signer, opts SearchObjectsOptions) ([]SearchResultItem, string, error) {
var err error
if c.prm.statisticCallback != nil {
startTime := time.Now()
defer func() {
c.sendStatistic(stat.MethodObjectSearchV2, time.Since(startTime), err)
}()
}

switch {
case signer == nil:
return nil, "", ErrMissingSigner
case cnr.IsZero():
err = cid.ErrZero
return nil, "", err
case opts.count > maxSearchObjectsCount:
err = fmt.Errorf("count is out of [1, %d] range", maxSearchObjectsCount)
return nil, "", err
case len(filters) > maxSearchObjectsFilterCount:
err = fmt.Errorf("more than %d filters", maxSearchObjectsFilterCount)
return nil, "", err
case len(attrs) > 0:
if len(attrs) > maxSearchObjectsAttrCount {
err = fmt.Errorf("more than %d attributes", maxSearchObjectsAttrCount)
return nil, "", err
}
for i := range attrs {
if attrs[i] == "" {
err = fmt.Errorf("empty attribute #%d", i)
return nil, "", err
}
for j := i + 1; j < len(attrs); j++ {
if attrs[i] == attrs[j] {
err = fmt.Errorf("duplicated attribute %q", attrs[i])
return nil, "", err
}
}
}
if !slices.ContainsFunc(filters, func(f object.SearchFilter) bool { return f.Header() == attrs[0] }) {
err = fmt.Errorf("attribute %q is requested but not filtered", attrs[0])
return nil, "", err
}
}

if opts.count == 0 {
opts.count = maxSearchObjectsCount
}

req := &protoobject.SearchV2Request{
Body: &protoobject.SearchV2Request_Body{
ContainerId: cnr.ProtoMessage(),
Version: defaultSearchObjectsQueryVersion,
Filters: filters.ProtoMessage(),
Cursor: cursor,
Count: opts.count,
Attributes: attrs,
},
MetaHeader: &protosession.RequestMetaHeader{
Version: version.Current().ProtoMessage(),
},
}
writeXHeadersToMeta(opts.xHeaders, req.MetaHeader)
if opts.noForwarding {
req.MetaHeader.Ttl = localRequestTTL
} else {
req.MetaHeader.Ttl = defaultRequestTTL
}
if opts.sessionToken != nil {
req.MetaHeader.SessionToken = opts.sessionToken.ProtoMessage()
}
if opts.bearerToken != nil {
req.MetaHeader.BearerToken = opts.bearerToken.ProtoMessage()
}

buf := c.buffers.Get().(*[]byte)
defer func() { c.buffers.Put(buf) }()

req.VerifyHeader, err = neofscrypto.SignRequestWithBuffer[*protoobject.SearchV2Request_Body](signer, req, *buf)
if err != nil {
err = fmt.Errorf("%w: %w", errSignRequest, err)
return nil, "", err
}

resp, err := c.object.SearchV2(ctx, req)
if err != nil {
err = rpcErr(err)
return nil, "", err
}

if c.prm.cbRespInfo != nil {
err = c.prm.cbRespInfo(ResponseMetaInfo{
key: resp.GetVerifyHeader().GetBodySignature().GetKey(),
epoch: resp.GetMetaHeader().GetEpoch(),
})
if err != nil {
err = fmt.Errorf("%w: %w", errResponseCallback, err)
return nil, "", err
}
}

if err = neofscrypto.VerifyResponseWithBuffer[*protoobject.SearchV2Response_Body](resp, *buf); err != nil {
err = fmt.Errorf("%w: %w", errResponseSignatures, err)
return nil, "", err
}

if err = apistatus.ToError(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, "", err
}

if resp.Body == nil {
return nil, "", nil
}

n := uint32(len(resp.Body.Result))
const cursorField = "cursor"
if n == 0 {
if resp.Body.Cursor != "" {
err = newErrInvalidResponseField(cursorField, errors.New("set while result is empty"))
return nil, "", err
}
return nil, "", nil
}
if cursor != "" && resp.Body.Cursor == cursor {
err = newErrInvalidResponseField(cursorField, errors.New("repeats the initial one"))
return nil, "", err
}
const resultField = "result"
if n > opts.count {
err = newErrInvalidResponseField(resultField, fmt.Errorf("more items than requested: %d", n))
return nil, "", err
}

res := make([]SearchResultItem, n)
for i, r := range resp.Body.Result {
switch {
case r == nil:
err = newErrInvalidResponseField(resultField, fmt.Errorf("nil element #%d", i))
return nil, "", err
case r.Id == nil:
err = newErrInvalidResponseField(resultField, fmt.Errorf("invalid element #%d: missing ID", i))
return nil, "", err
case len(r.Attributes) != len(attrs):
err = newErrInvalidResponseField(resultField, fmt.Errorf("invalid element #%d: wrong attribute count %d", i, len(r.Attributes)))
return nil, "", err
}
if err = res[i].ID.FromProtoMessage(r.Id); err != nil {
err = newErrInvalidResponseField(resultField, fmt.Errorf("invalid element #%d: invalid ID: %w", i, err))
return nil, "", err
}
res[i].Attributes = r.Attributes
}

return res, resp.Body.Cursor, nil
}

// PrmObjectSearch groups optional parameters of ObjectSearch operation.
type PrmObjectSearch struct {
sessionContainer
Expand Down Expand Up @@ -216,7 +429,7 @@ func (c *Client) ObjectSearchInit(ctx context.Context, containerID cid.ID, signe
req := &protoobject.SearchRequest{
Body: &protoobject.SearchRequest_Body{
ContainerId: containerID.ProtoMessage(),
Version: 1,
Version: defaultSearchObjectsQueryVersion,
Filters: prm.filters.ProtoMessage(),
},
MetaHeader: &protosession.RequestMetaHeader{
Expand Down
Loading

0 comments on commit 74d6159

Please sign in to comment.