Skip to content

Commit

Permalink
Add Discovery API and local mode (#368)
Browse files Browse the repository at this point in the history
* add score calculations

* fix type errors, route scorers accordingly

* generate grpc client (only discovery stuff)

* generate rest client

* discover preprocessing + local mode handling

* implement discover for remote client

* add failing tests, fix some stuff

* update rest client and local mode

* fixes and more tests

* fix mypy errors

* generate async client

* tiny edit docstring

* simplify custom distance calculation,
fix euclidean case for recommend best score

* fix python version for generating grpc client

* update grpc (select only discovery changes)

* generate grpc: add also ShardKey changes,
fix conversion changes for deprecated PointsUpdateOperation operations

* fix coverage on conversions

* generate async client

* generate async client with python 3.11

* change dict -> typings.Dict

* add deprecated field handling for grpc.PointsUpdateOperation

* update grpc client

* fix grpc conversion issues and review comments

* increase coverage

* also test for prefer_grpc = True

* fix has_field

* unflake test_context_many_pairs

* don't convert if None in model conversions

* move docstring to qdrant_client.py

* handle request timeout on all other interfaces

* oopsie: define timeout param in search_batch

* generate async client
  • Loading branch information
coszio authored Nov 22, 2023
1 parent d5b00c7 commit d3332ea
Show file tree
Hide file tree
Showing 31 changed files with 3,191 additions and 848 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ dist
local_cache/*/*
.python-version
docs/source/examples/local_cache/*
docs/source/examples/path/to/db/*
docs/source/examples/path/to/db/*
1,055 changes: 573 additions & 482 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ fastembed = [

[tool.poetry.group.dev.dependencies]
pytest = "^7.1"
grpcio-tools = "^1.46.0"
grpcio-tools = [
# the proper version to use for generating grpc client is python 3.10.10
{ version = "1.48.2", python = "3.10" },
{ version = "^1.46" }
]
coverage = "^6.3.3"
pytest-asyncio = "^0.21.0"
pytest-timeout = "^2.1.0"
Expand Down
23 changes: 23 additions & 0 deletions qdrant_client/async_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,29 @@ async def recommend_groups(
) -> types.GroupsResult:
raise NotImplementedError()

async def discover(
    self,
    collection_name: str,
    target: Optional[types.TargetVector] = None,
    context: Optional[Sequence[types.ContextExamplePair]] = None,
    query_filter: Optional[types.Filter] = None,
    search_params: Optional[types.SearchParams] = None,
    limit: int = 10,
    offset: int = 0,
    with_payload: Union[bool, List[str], types.PayloadSelector] = True,
    with_vectors: Union[bool, List[str]] = False,
    using: Optional[str] = None,
    lookup_from: Optional[types.LookupLocation] = None,
    consistency: Optional[types.ReadConsistency] = None,
    timeout: Optional[int] = None,
    **kwargs: Any,
) -> List[types.ScoredPoint]:
    """Abstract discovery search: use a target and/or context pairs to find points.

    This is an interface stub; concrete clients (remote gRPC/REST, local mode)
    override it. The signature mirrors the concrete implementations — including
    `timeout`, which the concrete clients accept — so the base interface and its
    implementations stay in sync.

    Raises:
        NotImplementedError: always, in the base class.
    """
    raise NotImplementedError()

async def discover_batch(
    self,
    collection_name: str,
    requests: Sequence[types.DiscoverRequest],
    consistency: Optional[types.ReadConsistency] = None,
    timeout: Optional[int] = None,
    **kwargs: Any,
) -> List[List[types.ScoredPoint]]:
    """Abstract batched discovery search; overridden by concrete clients.

    `consistency` and `timeout` are declared explicitly (with defaults) so the
    base interface matches the concrete implementations instead of hiding these
    options inside `**kwargs`.

    Raises:
        NotImplementedError: always, in the base class.
    """
    raise NotImplementedError()

async def scroll(
self,
collection_name: str,
Expand Down
144 changes: 140 additions & 4 deletions qdrant_client/async_qdrant_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ async def search_batch(
self,
collection_name: str,
requests: Sequence[types.SearchRequest],
timeout: Optional[int] = None,
consistency: Optional[types.ReadConsistency] = None,
**kwargs: Any,
) -> List[List[types.ScoredPoint]]:
Expand All @@ -191,13 +192,19 @@ async def search_batch(
- 'majority' - query all replicas, but return values present in the majority of replicas
- 'quorum' - query the majority of replicas, return values present in all of them
- 'all' - query all replicas, and return values present in all replicas
timeout:
Overrides global timeout for this search. Unit is seconds.
Returns:
List of search responses
"""
assert len(kwargs) == 0, f"Unknown arguments: {list(kwargs.keys())}"
return await self._client.search_batch(
collection_name=collection_name, requests=requests, consistency=consistency, **kwargs
collection_name=collection_name,
requests=requests,
consistency=consistency,
timeout=timeout,
**kwargs,
)

async def search(
Expand All @@ -215,6 +222,7 @@ async def search(
score_threshold: Optional[float] = None,
append_payload: bool = True,
consistency: Optional[types.ReadConsistency] = None,
timeout: Optional[int] = None,
**kwargs: Any,
) -> List[types.ScoredPoint]:
"""Search for closest vectors in collection taking into account filtering conditions
Expand Down Expand Up @@ -258,6 +266,8 @@ async def search(
- 'majority' - query all replicas, but return values present in the majority of replicas
- 'quorum' - query the majority of replicas, return values present in all of them
- 'all' - query all replicas, and return values present in all replicas
timeout:
Overrides global timeout for this search. Unit is seconds.
Examples:
Expand Down Expand Up @@ -294,6 +304,7 @@ async def search(
score_threshold=score_threshold,
append_payload=append_payload,
consistency=consistency,
timeout=timeout,
**kwargs,
)

Expand All @@ -313,6 +324,7 @@ async def search_groups(
score_threshold: Optional[float] = None,
with_lookup: Optional[types.WithLookupInterface] = None,
consistency: Optional[types.ReadConsistency] = None,
timeout: Optional[int] = None,
**kwargs: Any,
) -> types.GroupsResult:
"""Search for closest vectors grouped by payload field.
Expand Down Expand Up @@ -363,6 +375,8 @@ async def search_groups(
- 'majority' - query all replicas, but return values present in the majority of replicas
- 'quorum' - query the majority of replicas, return values present in all of them
- 'all' - query all replicas, and return values present in all replicas
timeout:
Overrides global timeout for this search. Unit is seconds.
Returns:
List of groups with not more than `group_size` hits in each group.
Expand All @@ -380,8 +394,9 @@ async def search_groups(
with_payload=with_payload,
with_vectors=with_vectors,
score_threshold=score_threshold,
consistency=consistency,
with_lookup=with_lookup,
consistency=consistency,
timeout=timeout,
**kwargs,
)

Expand All @@ -390,6 +405,7 @@ async def recommend_batch(
collection_name: str,
requests: Sequence[types.RecommendRequest],
consistency: Optional[types.ReadConsistency] = None,
timeout: Optional[int] = None,
**kwargs: Any,
) -> List[List[types.ScoredPoint]]:
"""Perform multiple recommend requests in batch mode
Expand All @@ -404,13 +420,19 @@ async def recommend_batch(
- 'majority' - query all replicas, but return values present in the majority of replicas
- 'quorum' - query the majority of replicas, return values present in all of them
- 'all' - query all replicas, and return values present in all replicas
timeout:
Overrides global timeout for this search. Unit is seconds.
Returns:
List of recommend responses
"""
assert len(kwargs) == 0, f"Unknown arguments: {list(kwargs.keys())}"
return await self._client.recommend_batch(
collection_name=collection_name, requests=requests, consistency=consistency, **kwargs
collection_name=collection_name,
requests=requests,
consistency=consistency,
timeout=timeout,
**kwargs,
)

async def recommend(
Expand All @@ -429,6 +451,7 @@ async def recommend(
lookup_from: Optional[types.LookupLocation] = None,
strategy: Optional[types.RecommendStrategy] = None,
consistency: Optional[types.ReadConsistency] = None,
timeout: Optional[int] = None,
**kwargs: Any,
) -> List[types.ScoredPoint]:
"""Recommend points: search for similar points based on already stored in Qdrant examples.
Expand Down Expand Up @@ -495,6 +518,8 @@ async def recommend(
- 'average_vector' - calculates average vector of all examples and uses it for search
- 'best_score' - finds the result which is closer to positive examples and further from negative
timeout:
Overrides global timeout for this search. Unit is seconds.
Returns:
List of recommended points with similarity scores.
Expand All @@ -515,6 +540,7 @@ async def recommend(
lookup_from=lookup_from,
consistency=consistency,
strategy=strategy,
timeout=timeout,
**kwargs,
)

Expand All @@ -536,6 +562,7 @@ async def recommend_groups(
with_lookup: Optional[types.WithLookupInterface] = None,
strategy: Optional[types.RecommendStrategy] = None,
consistency: Optional[types.ReadConsistency] = None,
timeout: Optional[int] = None,
**kwargs: Any,
) -> types.GroupsResult:
"""Recommend point groups: search for similar points based on already stored in Qdrant examples
Expand Down Expand Up @@ -609,6 +636,8 @@ async def recommend_groups(
- 'average_vector' - calculates average vector of all examples and uses it for search
- 'best_score' - finds the result which is closer to positive examples and further from negative
timeout:
Overrides global timeout for this search. Unit is seconds.
Returns:
List of groups with not more than `group_size` hits in each group.
Expand All @@ -630,9 +659,116 @@ async def recommend_groups(
with_vectors=with_vectors,
using=using,
lookup_from=lookup_from,
consistency=consistency,
with_lookup=with_lookup,
strategy=strategy,
consistency=consistency,
timeout=timeout,
**kwargs,
)

async def discover(
    self,
    collection_name: str,
    target: Optional[types.TargetVector] = None,
    context: Optional[Sequence[types.ContextExamplePair]] = None,
    query_filter: Optional[types.Filter] = None,
    search_params: Optional[types.SearchParams] = None,
    limit: int = 10,
    offset: int = 0,
    with_payload: Union[bool, List[str], types.PayloadSelector] = True,
    with_vectors: Union[bool, List[str]] = False,
    using: Optional[str] = None,
    lookup_from: Optional[types.LookupLocation] = None,
    consistency: Optional[types.ReadConsistency] = None,
    timeout: Optional[int] = None,
    **kwargs: Any,
) -> List[types.ScoredPoint]:
    """
    Use context and a target to find the most similar points, constrained by the context.

    Args:
        collection_name:
            Name of the collection to search in.
        target:
            Look for vectors closest to this.

            When using the target (with or without context), the integer part of the score represents the rank with respect to the context, while the decimal part of the score relates to the distance to the target.
        context:
            Pairs of { positive, negative } examples to constrain the search.

            When using only the context (without a target), a special search - called context search - is performed where pairs of points are used to generate a loss that guides the search towards the zone where most positive examples overlap. This means that the score minimizes the scenario of finding a point closer to a negative than to a positive part of a pair.

            Since the score of a context relates to loss, the maximum score a point can get is 0.0, and it becomes normal that many points can have a score of 0.0.

            For discovery search (when including a target), the context part of the score for each pair is calculated +1 if the point is closer to a positive than to a negative part of a pair, and -1 otherwise.
        query_filter:
            Look only for points which satisfy these conditions.
        search_params:
            Additional search params.
        limit:
            Max number of results to return.
        offset:
            Offset of the first result to return. May be used to paginate results. Note: large offset values may cause performance issues.
        with_payload:
            Select which payload to return with the response. Default: None
        with_vectors:
            Whether to return the point vector with the result.
        using:
            Define which vector to use for recommendation, if not specified - try to use default vector.
        lookup_from:
            The location used to lookup vectors. If not specified - use current collection. Note: the other collection should have the same vector size as the current collection.
        consistency:
            Read consistency of the search. Defines how many replicas should be queried before returning the result. Values:

            - int - number of replicas to query, values should present in all queried replicas
            - 'majority' - query all replicas, but return values present in the majority of replicas
            - 'quorum' - query the majority of replicas, return values present in all of them
            - 'all' - query all replicas, and return values present in all replicas
        timeout:
            Overrides global timeout for this search. Unit is seconds.

    Returns:
        List of discovered points with discovery or context scores, accordingly.
    """
    # Delegate to the configured backend (remote gRPC/REST client or local mode).
    return await self._client.discover(
        collection_name=collection_name,
        target=target,
        context=context,
        query_filter=query_filter,
        search_params=search_params,
        limit=limit,
        offset=offset,
        with_payload=with_payload,
        with_vectors=with_vectors,
        using=using,
        lookup_from=lookup_from,
        consistency=consistency,
        timeout=timeout,
        **kwargs,
    )

async def discover_batch(
    self,
    collection_name: str,
    requests: Sequence[types.DiscoverRequest],
    consistency: Optional[types.ReadConsistency] = None,
    timeout: Optional[int] = None,
    **kwargs: Any,
) -> List[List[types.ScoredPoint]]:
    """Perform multiple discover requests in batch mode.

    Args:
        collection_name:
            Name of the collection to search in.
        requests:
            List of discover requests to perform.
        consistency:
            Read consistency of the search. Same semantics as in `discover`.
        timeout:
            Overrides global timeout for this search. Unit is seconds.

    Returns:
        List of discover responses, one per request, in the same order as `requests`.
    """
    # Delegate to the configured backend (remote gRPC/REST client or local mode).
    return await self._client.discover_batch(
        collection_name=collection_name,
        requests=requests,
        consistency=consistency,
        timeout=timeout,
        **kwargs,
    )

Expand Down
Loading

0 comments on commit d3332ea

Please sign in to comment.