diff --git a/src/db/mod.rs b/src/db/mod.rs
index 2916a4bfa..71e9a92c8 100644
--- a/src/db/mod.rs
+++ b/src/db/mod.rs
@@ -19,6 +19,12 @@ use crate::{
     concern::{ReadConcern, WriteConcern},
     cursor::Cursor,
     error::{Error, ErrorKind, Result},
+    gridfs::{
+        options::GridFsBucketOptions,
+        GridFsBucket,
+        DEFAULT_BUCKET_NAME,
+        DEFAULT_CHUNK_SIZE_BYTES,
+    },
     operation::{Aggregate, AggregateTarget, Create, DropDatabase, ListCollections, RunCommand},
     options::{
         AggregateOptions,
@@ -564,4 +570,26 @@ impl Database {
             .execute_watch_with_session(pipeline, options, target, None, session)
             .await
     }
+
+    /// Creates a new GridFsBucket in the database with the given options.
+    pub fn gridfs_bucket(&self, options: impl Into<Option<GridFsBucketOptions>>) -> GridFsBucket {
+        let mut options = options.into().unwrap_or_default();
+        options.read_concern = options
+            .read_concern
+            .or_else(|| self.read_concern().cloned());
+        options.write_concern = options
+            .write_concern
+            .or_else(|| self.write_concern().cloned());
+        options.selection_criteria = options
+            .selection_criteria
+            .or_else(|| self.selection_criteria().cloned());
+        options.bucket_name = options
+            .bucket_name
+            .or_else(|| Some(DEFAULT_BUCKET_NAME.to_string()));
+        options.chunk_size_bytes = options.chunk_size_bytes.or(Some(DEFAULT_CHUNK_SIZE_BYTES));
+        GridFsBucket {
+            db: self.clone(),
+            options,
+        }
+    }
 }
diff --git a/src/gridfs.rs b/src/gridfs.rs
new file mode 100644
index 000000000..545a95828
--- /dev/null
+++ b/src/gridfs.rs
@@ -0,0 +1,331 @@
+pub mod options;
+
+use core::task::{Context, Poll};
+use std::pin::Pin;
+
+use crate::{
+    concern::{ReadConcern, WriteConcern},
+    cursor::Cursor,
+    error::Result,
+    selection_criteria::SelectionCriteria,
+    Database,
+};
+use bson::{oid::ObjectId, Bson, DateTime, Document};
+use options::*;
+use serde::{Deserialize, Serialize};
+use tokio::io::ReadBuf;
+
+pub const DEFAULT_BUCKET_NAME: &str = "fs";
+pub const DEFAULT_CHUNK_SIZE_BYTES: u32 = 255 * 1024;
+
+// Contained in a "chunks" collection for each user file
+struct Chunk {
+    id: ObjectId,
+    files_id: Bson,
+    n: u32,
+    // default size is 255 KiB
+    data: Vec<u8>,
+}
+
+/// A collection in which information about stored files is stored. There will be one files
+/// collection document per stored file.
+#[derive(Serialize, Deserialize)]
+pub struct FilesCollectionDocument {
+    id: Bson,
+    length: i64,
+    chunk_size: u32,
+    upload_date: DateTime,
+    filename: String,
+    metadata: Document,
+}
+
+/// Struct for storing GridFS managed files within a [`Database`].
+pub struct GridFsBucket {
+    // Contains a "chunks" collection
+    pub(crate) db: Database,
+    pub(crate) options: GridFsBucketOptions,
+}
+
+// TODO: RUST-1395 Add documentation and example code for this struct.
+pub struct GridFsUploadStream {
+    files_id: Bson,
+}
+
+impl GridFsUploadStream {
+    /// Gets the file `id` for the stream.
+    pub fn files_id(&self) -> &Bson {
+        &self.files_id
+    }
+
+    /// Consumes the stream and uploads data in the stream to the server.
+    pub async fn finish(self) {
+        todo!()
+    }
+
+    /// Aborts the upload and discards the upload stream.
+    pub async fn abort(self) {
+        todo!()
+    }
+}
+
+impl tokio::io::AsyncWrite for GridFsUploadStream {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<std::io::Result<usize>> {
+        todo!()
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
+        todo!()
+    }
+
+    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
+        todo!()
+    }
+}
+
+impl futures_util::AsyncWrite for GridFsUploadStream {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<core::result::Result<usize, std::io::Error>> {
+        todo!()
+    }
+
+    fn poll_flush(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<core::result::Result<(), std::io::Error>> {
+        todo!()
+    }
+
+    fn poll_close(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<core::result::Result<(), std::io::Error>> {
+        todo!()
+    }
+}
+
+pub struct GridFsDownloadStream {
+    files_id: Bson,
+}
+
+impl GridFsDownloadStream {
+    /// Gets the file `id` for the stream.
+    pub fn files_id(&self) -> &Bson {
+        &self.files_id
+    }
+}
+
+impl tokio::io::AsyncRead for GridFsDownloadStream {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut ReadBuf<'_>,
+    ) -> Poll<std::io::Result<()>> {
+        todo!()
+    }
+}
+
+impl futures_util::io::AsyncRead for GridFsDownloadStream {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut [u8],
+    ) -> Poll<core::result::Result<usize, std::io::Error>> {
+        todo!()
+    }
+}
+
+impl GridFsBucket {
+    /// Gets the read concern of the [`GridFsBucket`].
+    pub fn read_concern(&self) -> Option<&ReadConcern> {
+        self.options.read_concern.as_ref()
+    }
+
+    /// Gets the write concern of the [`GridFsBucket`].
+    pub fn write_concern(&self) -> Option<&WriteConcern> {
+        self.options.write_concern.as_ref()
+    }
+
+    /// Gets the selection criteria of the [`GridFsBucket`].
+    pub fn selection_criteria(&self) -> Option<&SelectionCriteria> {
+        self.options.selection_criteria.as_ref()
+    }
+
+    /// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to.
+    /// The application provides a custom file id.
+    ///
+    /// Returns a [`GridFsUploadStream`] to which the application will write the contents.
+    pub async fn open_upload_stream_with_id(
+        &self,
+        id: Bson,
+        filename: String,
+        options: impl Into<Option<GridFsUploadOptions>>,
+    ) -> Result<GridFsUploadStream> {
+        todo!()
+    }
+
+    /// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to.
+    /// The driver generates a unique [`Bson::ObjectId`] for the file id.
+    ///
+    /// Returns a [`GridFsUploadStream`] to which the application will write the contents.
+    pub async fn open_upload_stream(
+        &self,
+        filename: String,
+        options: impl Into<Option<GridFsUploadOptions>>,
+    ) -> Result<GridFsUploadStream> {
+        self.open_upload_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, options)
+            .await
+    }
+
+    /// Uploads a user file to a GridFS bucket. The application supplies a custom file id. Uses the
+    /// `tokio` crate's `AsyncRead` trait for the `source`.
+    pub async fn upload_from_tokio_reader_with_id(
+        &self,
+        id: Bson,
+        filename: String,
+        source: impl tokio::io::AsyncRead,
+        options: impl Into<Option<GridFsUploadOptions>>,
+    ) {
+        todo!()
+    }
+
+    /// Uploads a user file to a GridFS bucket. The application supplies a custom file id. Uses the
+    /// `futures-0.3` crate's `AsyncRead` trait for the `source`.
+    pub async fn upload_from_futures_0_3_reader_with_id(
+        &self,
+        id: Bson,
+        filename: String,
+        source: impl futures_util::AsyncRead,
+        options: impl Into<Option<GridFsUploadOptions>>,
+    ) {
+        todo!()
+    }
+
+    /// Uploads a user file to a GridFS bucket. The driver generates a unique [`Bson::ObjectId`] for
+    /// the file id. Uses the `tokio` crate's `AsyncRead` trait for the `source`.
+    pub async fn upload_from_tokio_reader(
+        &self,
+        filename: String,
+        source: impl tokio::io::AsyncRead,
+        options: impl Into<Option<GridFsUploadOptions>>,
+    ) {
+        self.upload_from_tokio_reader_with_id(
+            Bson::ObjectId(ObjectId::new()),
+            filename,
+            source,
+            options,
+        )
+        .await
+    }
+
+    /// Uploads a user file to a GridFS bucket. The driver generates a unique [`Bson::ObjectId`] for
+    /// the file id. Uses the `futures-0.3` crate's `AsyncRead` trait for the `source`.
+    pub async fn upload_from_futures_0_3_reader(
+        &self,
+        filename: String,
+        source: impl futures_util::AsyncRead,
+        options: impl Into<Option<GridFsUploadOptions>>,
+    ) {
+        self.upload_from_futures_0_3_reader_with_id(
+            Bson::ObjectId(ObjectId::new()),
+            filename,
+            source,
+            options,
+        )
+        .await
+    }
+
+    /// Opens and returns a [`GridFsDownloadStream`] from which the application can read
+    /// the contents of the stored file specified by `id`.
+    pub async fn open_download_stream(&self, id: Bson) -> Result<GridFsDownloadStream> {
+        todo!()
+    }
+
+    /// Opens and returns a [`GridFsDownloadStream`] from which the application can read
+    /// the contents of the stored file specified by `filename` and the revision
+    /// in `options`.
+    pub async fn open_download_stream_by_name(
+        &self,
+        filename: String,
+        options: impl Into<Option<GridFsDownloadByNameOptions>>,
+    ) -> Result<GridFsDownloadStream> {
+        todo!()
+    }
+
+    /// Downloads the contents of the stored file specified by `id` and writes
+    /// the contents to the `destination`. Uses the `tokio` crate's `AsyncWrite`
+    /// trait for the `destination`.
+    pub async fn download_to_tokio_writer(
+        &self,
+        id: Bson,
+        destination: impl tokio::io::AsyncWrite,
+    ) {
+        todo!()
+    }
+
+    /// Downloads the contents of the stored file specified by `id` and writes
+    /// the contents to the `destination`. Uses the `futures-0.3` crate's `AsyncWrite`
+    /// trait for the `destination`.
+    pub async fn download_to_futures_0_3_writer(
+        &self,
+        id: Bson,
+        destination: impl futures_util::AsyncWrite,
+    ) {
+        todo!()
+    }
+
+    /// Downloads the contents of the stored file specified by `filename` and by
+    /// the revision in `options` and writes the contents to the `destination`. Uses the
+    /// `tokio` crate's `AsyncWrite` trait for the `destination`.
+    pub async fn download_to_tokio_writer_by_name(
+        &self,
+        filename: String,
+        destination: impl tokio::io::AsyncWrite,
+        options: impl Into<Option<GridFsDownloadByNameOptions>>,
+    ) {
+        todo!()
+    }
+
+    /// Downloads the contents of the stored file specified by `filename` and by
+    /// the revision in `options` and writes the contents to the `destination`. Uses the
+    /// `futures-0.3` crate's `AsyncWrite` trait for the `destination`.
+    pub async fn download_to_futures_0_3_writer_by_name(
+        &self,
+        filename: String,
+        destination: impl futures_util::AsyncWrite,
+        options: impl Into<Option<GridFsDownloadByNameOptions>>,
+    ) {
+        todo!()
+    }
+
+    /// Given an `id`, deletes the stored file's files collection document and
+    /// associated chunks from a [`GridFsBucket`].
+    pub async fn delete(&self, id: Bson) {
+        todo!()
+    }
+
+    /// Finds and returns the files collection documents that match the filter.
+    pub async fn find(
+        &self,
+        filter: Document,
+        options: impl Into<Option<GridFsFindOptions>>,
+    ) -> Result<Cursor<FilesCollectionDocument>> {
+        todo!()
+    }
+
+    /// Renames the stored file with the specified `id`.
+    pub async fn rename(&self, id: Bson, new_filename: String) {
+        todo!()
+    }
+
+    /// Drops the files associated with this bucket.
+    pub async fn drop(&self) {
+        todo!()
+    }
+}
diff --git a/src/gridfs/options.rs b/src/gridfs/options.rs
new file mode 100644
index 000000000..cc4a31014
--- /dev/null
+++ b/src/gridfs/options.rs
@@ -0,0 +1,89 @@
+use crate::{
+    concern::{ReadConcern, WriteConcern},
+    selection_criteria::SelectionCriteria,
+};
+use serde::Deserialize;
+use std::time::Duration;
+use typed_builder::TypedBuilder;
+
+use bson::Document;
+
+/// Contains the options for creating a [`GridFsBucket`].
+#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)]
+#[builder(field_defaults(setter(into)))]
+#[non_exhaustive]
+pub struct GridFsBucketOptions {
+    /// The bucket name. Defaults to 'fs'.
+    pub bucket_name: Option<String>,
+
+    /// The chunk size in bytes used to break the user file into chunks. Defaults to 255 KiB.
+    pub chunk_size_bytes: Option<u32>,
+
+    /// The write concern. Defaults to the write concern of the database.
+    pub write_concern: Option<WriteConcern>,
+
+    /// The read concern. Defaults to the read concern of the database.
+    pub read_concern: Option<ReadConcern>,
+
+    /// The selection criteria. Defaults to the selection criteria of the database.
+    pub selection_criteria: Option<SelectionCriteria>,
+}
+
+/// Contains the options for creating a [`GridFsUploadStream`] to upload a file to a
+/// [`GridFsBucket`].
+#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)]
+#[builder(field_defaults(setter(into)))]
+#[non_exhaustive]
+pub struct GridFsUploadOptions {
+    /// The number of bytes per chunk of this file. Defaults to the `chunk_size_bytes` specified
+    /// in the [`GridFsBucketOptions`].
+    pub chunk_size_bytes: Option<u32>,
+
+    /// User data for the 'metadata' field of the files collection document.
+    pub metadata: Option<Document>,
+}
+
+/// Contains the options for creating a [`GridFsDownloadStream`] to retrieve a stored file
+/// from a [`GridFsBucket`].
+#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)]
+#[builder(field_defaults(setter(into)))]
+#[non_exhaustive]
+pub struct GridFsDownloadByNameOptions {
+    /// Which revision (documents with the same filename and different `upload_date`)
+    /// of the file to retrieve. Defaults to -1 (the most recent revision).
+    ///
+    /// Revision numbers are defined as follows:
+    /// 0 = the original stored file
+    /// 1 = the first revision
+    /// 2 = the second revision
+    /// etc...
+    /// -2 = the second most recent revision
+    /// -1 = the most recent revision
+    pub revision: Option<i32>,
+}
+
+/// Contains the options for performing a find operation on a files collection.
+#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)]
+#[builder(field_defaults(setter(into)))]
+#[non_exhaustive]
+pub struct GridFsFindOptions {
+    /// Enables writing to temporary files on the server. When set to true, the
+    /// server can write temporary data to disk while executing the find operation
+    /// on the files collection.
+    pub allow_disk_use: Option<bool>,
+
+    /// The number of documents to return per batch.
+    pub batch_size: Option<u32>,
+
+    /// The maximum number of documents to return.
+    pub limit: Option<i64>,
+
+    /// The maximum amount of time to allow the query to run.
+    pub max_time: Option<Duration>,
+
+    /// The number of documents to skip before returning.
+    pub skip: Option<u64>,
+
+    /// The order by which to sort results. Defaults to not sorting.
+    pub sort: Option<Document>,
+}
diff --git a/src/lib.rs b/src/lib.rs
index 68b803005..738496ff4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -324,8 +324,9 @@ mod cursor;
 mod db;
 pub mod error;
 pub mod event;
-mod index;
+mod gridfs;
 mod hello;
+mod index;
 mod operation;
 pub mod results;
 pub(crate) mod runtime;
@@ -343,18 +344,18 @@ mod test;
 extern crate derive_more;
 
 pub use crate::{
-    client::{Client, session::ClientSession},
+    client::{session::ClientSession, Client},
     coll::Collection,
-    cursor::{Cursor, session::{SessionCursor, SessionCursorStream}},
+    cursor::{
+        session::{SessionCursor, SessionCursorStream},
+        Cursor,
+    },
     db::Database,
 };
-pub use {coll::Namespace, index::IndexModel, client::session::ClusterTime, sdam::public::*};
+pub use {client::session::ClusterTime, coll::Namespace, index::IndexModel, sdam::public::*};
 
-#[cfg(all(
-    feature = "tokio-runtime",
-    feature = "async-std-runtime",
-))]
+#[cfg(all(feature = "tokio-runtime", feature = "async-std-runtime",))]
 compile_error!(
     "`tokio-runtime` and `async-std-runtime` can't both be enabled; either disable \
     `async-std-runtime` or set `default-features = false` in your Cargo.toml"