Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sparse vectors support #20

Merged
merged 12 commits into from
Dec 7, 2023
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ ctrlc = "3.4.1"
futures = "0.3.29"
indicatif = "0.17.7"
memmap2 = "0.9.0"
qdrant-client = "1.6.0"
qdrant-client = { git = "https://github.com/qdrant/rust-client.git", branch = "dev" }
rand = "0.8.5"
serde = "1.0"
serde_json = "1.0"
Expand Down
16 changes: 12 additions & 4 deletions src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct Args {
#[clap(short, long)]
pub max_id: Option<usize>,

/// Number of dimensions in each vector
/// Number of dimensions in each dense vector or max dimension for sparse vectors
#[clap(short, long, default_value_t = 128)]
pub dim: usize,

Expand Down Expand Up @@ -115,9 +115,9 @@ pub struct Args {
#[clap(long, default_value_t = false)]
pub on_disk_payload: bool,

/// On disk hnsw
#[clap(long, default_value_t = false)]
pub on_disk_hnsw: bool,
/// On disk index
#[clap(long)]
pub on_disk_index: Option<bool>,

/// On disk vectors
#[clap(long)]
Expand Down Expand Up @@ -221,6 +221,14 @@ pub struct Args {
/// Skip un-indexed segments during search
#[clap(long)]
pub indexed_only: Option<bool>,

/// Whether to use sparse vectors and with how much sparsity
#[clap(long, value_name = "SPARSITY")]
pub sparse_vectors: Option<f64>,

/// Number of named vectors per point
#[clap(long, default_value_t = 1)]
pub sparse_vectors_per_point: usize,
}

#[derive(Copy, Clone, Debug)]
Expand Down
29 changes: 27 additions & 2 deletions src/common.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::args::Args;
use core::option::Option;
use core::option::Option::{None, Some};
use qdrant_client::client::{Payload, QdrantClient};
use qdrant_client::qdrant::r#match::MatchValue;
use qdrant_client::qdrant::{FieldCondition, Filter, Match, Range};
use qdrant_client::qdrant::{FieldCondition, Filter, Match, Range, Vector};
use rand::prelude::SliceRandom;
use rand::Rng;

Expand Down Expand Up @@ -111,7 +112,31 @@ pub fn random_filter(
have_any.then_some(filter)
}

pub fn random_vector(dim: usize) -> Vec<f32> {
pub fn random_vector(args: &Args) -> Vector {
random_dense_vector(args.dim).into()
}

/// Generate random sparse vector with random size and random values.
/// - `max_size` - maximum size of vector
/// - `sparsity` - how many non-zero values should be in vector
pub fn random_sparse_vector(max_size: usize, sparsity: f64) -> Vec<(u32, f32)> {
let mut rng = rand::thread_rng();
let size = rng.gen_range(1..max_size);
// (index, value)
let mut pairs = Vec::with_capacity(size);
for i in 1..=size {
// probability of skipping a dimension to make the vectors sparse
let skip = !rng.gen_bool(sparsity);
if skip {
continue;
}
// Only positive values are generated to make sure to hit the pruning path.
pairs.push((i as u32, rng.gen_range(0.0..10.0) as f32));
}
pairs
}

pub fn random_dense_vector(dim: usize) -> Vec<f32> {
let mut rng = rand::thread_rng();
(0..dim).map(|_| rng.gen_range(-1.0..1.0)).collect()
}
Expand Down
38 changes: 31 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod upsert;

use crate::args::QuantizationArg;
use crate::common::{
random_filter, random_payload, random_vector, INTEGERS_PAYLOAD_KEY, KEYWORD_PAYLOAD_KEY,
random_dense_vector, random_filter, random_payload, INTEGERS_PAYLOAD_KEY, KEYWORD_PAYLOAD_KEY,
};
use crate::fbin_reader::FBinReader;
use crate::search::SearchProcessor;
Expand All @@ -23,7 +23,8 @@ use qdrant_client::qdrant::vectors_config::Config;
use qdrant_client::qdrant::{
CollectionStatus, CompressionRatio, CreateCollection, Distance, FieldType, HnswConfigDiff,
OptimizersConfigDiff, ProductQuantization, QuantizationConfig, QuantizationType,
ScalarQuantization, VectorParams, VectorParamsMap, VectorsConfig,
ScalarQuantization, SparseIndexConfig, SparseVectorConfig, SparseVectorParams, VectorParams,
VectorParamsMap, VectorsConfig,
};
use rand::Rng;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -124,7 +125,7 @@ async fn recreate_collection(args: &Args, stopped: Arc<AtomicBool>) -> Result<()
..Default::default()
};

let vector_params = if args.vectors_per_point == 1 {
let dense_vector_params = if args.vectors_per_point == 1 {
Config::Params(_vector_param)
} else {
let params = (0..args.vectors_per_point)
Expand All @@ -134,14 +135,36 @@ async fn recreate_collection(args: &Args, stopped: Arc<AtomicBool>) -> Result<()
Config::ParamsMap(VectorParamsMap { map: params })
};

let vectors_config = Some(VectorsConfig {
config: Some(dense_vector_params),
});

let sparse_vectors_config = if args.sparse_vectors.is_some() {
let params = (0..args.sparse_vectors_per_point)
.map(|idx| {
(
format!("{idx}_sparse").to_string(),
SparseVectorParams {
index: Some(SparseIndexConfig {
full_scan_threshold: None,
on_disk: args.on_disk_index,
}),
},
)
})
.collect();

Some(SparseVectorConfig { map: params })
} else {
None
};

client
.create_collection(&CreateCollection {
collection_name: args.collection_name.clone(),
vectors_config: Some(VectorsConfig {
config: Some(vector_params),
}),
vectors_config,
hnsw_config: Some(HnswConfigDiff {
on_disk: Some(args.on_disk_hnsw),
on_disk: args.on_disk_index,
m: args.hnsw_m.map(|x| x as u64),
ef_construct: args.hnsw_ef_construct.map(|x| x as u64),
..Default::default()
Expand Down Expand Up @@ -200,6 +223,7 @@ async fn recreate_collection(args: &Args, stopped: Arc<AtomicBool>) -> Result<()
},
None => None,
},
sparse_vectors_config,
..Default::default()
})
.await?;
Expand Down
64 changes: 52 additions & 12 deletions src/search.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::common::{random_vector_name, retry_with_clients};
use crate::{random_filter, random_vector, Args};
use crate::common::{random_sparse_vector, random_vector_name, retry_with_clients};
use crate::{random_dense_vector, random_filter, Args};
use indicatif::ProgressBar;
use qdrant_client::client::QdrantClient;
use qdrant_client::qdrant::{QuantizationSearchParams, SearchParams, SearchPoints};
use qdrant_client::qdrant::{
QuantizationSearchParams, SearchParams, SearchPoints, SparseIndices, Vector,
};
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};

Expand All @@ -27,6 +29,32 @@ impl SearchProcessor {
}
}

fn get_sparse_query(&self) -> (Vec<f32>, Option<SparseIndices>, Option<String>) {
if let Some(sparsity) = self.args.sparse_vectors {
let sparse_vector: Vector = random_sparse_vector(self.args.dim, sparsity).into();
let query_vector = sparse_vector.data;
let sparse_indices = sparse_vector.indices;
let name = format!(
"{}_sparse",
random_vector_name(self.args.sparse_vectors_per_point)
);
(query_vector, sparse_indices, Some(name))
} else {
panic!("No sparse vectors configured")
}
}

fn get_dense_query(&self) -> (Vec<f32>, Option<SparseIndices>, Option<String>) {
let query_vector = random_dense_vector(self.args.dim);
let sparse_indices = None;
if self.args.vectors_per_point > 1 {
let name = random_vector_name(self.args.vectors_per_point);
(query_vector, sparse_indices, Some(name))
} else {
(query_vector, sparse_indices, None)
}
}

pub async fn search(
&self,
_req_id: usize,
Expand All @@ -36,21 +64,30 @@ impl SearchProcessor {
return Ok(());
}

let query_vector = random_vector(self.args.dim);
let start = std::time::Instant::now();

let has_sparse = self.args.sparse_vectors.is_some();
let has_dense = self.args.vectors_per_point > 0;

let use_sparse = match (has_sparse, has_dense) {
(true, true) => rand::random::<bool>(),
(true, false) => true,
(false, true) => false,
(false, false) => panic!("No sparse or dense vectors"),
};

let (query_vector, sparse_indices, vector_name) = if use_sparse {
self.get_sparse_query()
} else {
self.get_dense_query()
};

let query_filter = random_filter(
self.args.keywords,
self.args.float_payloads,
self.args.int_payloads,
);

let start = std::time::Instant::now();

let vector_name = if self.args.vectors_per_point > 1 {
Some(random_vector_name(self.args.vectors_per_point))
} else {
None
};

let request = SearchPoints {
collection_name: self.args.collection_name.to_string(),
vector: query_vector,
Expand All @@ -72,6 +109,9 @@ impl SearchProcessor {
vector_name,
with_vectors: None,
read_consistency: self.args.read_consistency.map(Into::into),
timeout: None,
shard_key_selector: None,
sparse_indices,
};

let res =
Expand Down
40 changes: 34 additions & 6 deletions src/upsert.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::common::retry_with_clients;
use crate::common::{random_sparse_vector, random_vector, retry_with_clients};
use crate::fbin_reader::FBinReader;
use crate::{random_payload, random_vector, Args};
use crate::{random_dense_vector, random_payload, Args};
use anyhow::Error;
use indicatif::ProgressBar;
use qdrant_client::client::QdrantClient;
use qdrant_client::qdrant::point_id::PointIdOptions;
use qdrant_client::qdrant::{PointId, PointStruct, PointsSelector, Vectors};
use qdrant_client::qdrant::vectors::VectorsOptions;
use qdrant_client::qdrant::{PointId, PointStruct, PointsSelector, Vector, Vectors};
use rand::Rng;
use std::cmp::min;
use std::collections::HashMap;
Expand Down Expand Up @@ -71,17 +72,44 @@ impl UpsertProcessor {

let vectors: Vectors = if let Some(reader) = &self.reader {
reader.read_vector(idx as usize).to_vec().into()
} else if self.args.vectors_per_point > 1 {
} else if self.args.vectors_per_point != 1 {
let vectors_map: HashMap<_, _> = (0..self.args.vectors_per_point)
.map(|i| {
let vector_name = format!("{}", i);
let vector = random_vector(self.args.dim);
let vector = random_vector(&self.args);
(vector_name, vector)
})
.collect();
vectors_map.into()
} else {
random_vector(self.args.dim).into()
random_dense_vector(self.args.dim).into()
};

let vectors: Vectors = if let Some(sparsity) = self.args.sparse_vectors {
let mut vectors_map: HashMap<_, _> = Default::default();

for i in 0..self.args.sparse_vectors_per_point {
let vector_name = format!("{}_sparse", i);
let vector = Vector::from(random_sparse_vector(self.args.dim, sparsity));
vectors_map.insert(vector_name, vector);
}

match vectors.vectors_options {
None => {}
Some(vectors) => match vectors {
VectorsOptions::Vector(vector) => {
vectors_map.insert("".to_string(), vector);
}
VectorsOptions::Vectors(vectors) => {
for (name, vector) in vectors.vectors.into_iter() {
vectors_map.insert(name, vector);
}
}
},
}
vectors_map.into()
} else {
vectors
};

points.push(PointStruct::new(
Expand Down