1
1
use async_trait:: async_trait;
2
- use mongodb:: bson:: { doc, Document } ;
2
+ use mongodb:: bson:: { doc, DateTime , Document } ;
3
3
use mongodb:: { options:: ClientOptions , Client } ;
4
4
use serde:: { de:: DeserializeOwned , Serialize } ;
5
5
use tracing:: { event, span, trace, warn, Level } ;
@@ -18,6 +18,31 @@ pub struct MongoDbConn {
18
18
pub index : MongoDbIndex ,
19
19
}
20
20
21
+ impl MongoDbConn {
22
+ /// Creates a TTL index on the expiry field.
23
+ ///
24
+ /// NOTE: This function will need to be called in the main function when initialising a MongoDB connection.
25
+ pub async fn create_ttl_index ( & self ) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
26
+ let collection = self
27
+ . client
28
+ . database ( & self . index . db_name )
29
+ . collection :: < Document > ( & self . index . coll_name ) ;
30
+
31
+ // Create TTL index on the 'expiry' field
32
+ let index_model = mongodb:: IndexModel :: builder ( )
33
+ . keys ( doc ! { "expiry" : 1 } )
34
+ . options ( Some (
35
+ mongodb:: options:: IndexOptions :: builder ( )
36
+ . expire_after ( Some ( std:: time:: Duration :: from_secs ( 0 ) ) )
37
+ . build ( ) ,
38
+ ) )
39
+ . build ( ) ;
40
+
41
+ collection. create_index ( index_model, None ) . await ?;
42
+ Ok ( ( ) )
43
+ }
44
+ }
45
+
21
46
#[ async_trait]
22
47
impl KvStoreConnection for MongoDbConn {
23
48
async fn init ( url : & str ) -> Result < Self , Box < dyn std:: error:: Error + Send + Sync > > {
@@ -47,7 +72,7 @@ impl KvStoreConnection for MongoDbConn {
47
72
Ok ( MongoDbConn { client, index } )
48
73
}
49
74
50
- async fn set_data < T : Serialize + std:: marker:: Send > (
75
+ async fn set_data < T : Serialize + std:: marker:: Send + DeserializeOwned > (
51
76
& mut self ,
52
77
key : & str ,
53
78
value : T ,
@@ -61,20 +86,32 @@ impl KvStoreConnection for MongoDbConn {
61
86
. database ( & self . index . db_name )
62
87
. collection :: < Document > ( & self . index . coll_name ) ;
63
88
64
- let document = match mongodb:: bson:: to_document ( & value) {
65
- Ok ( document) => document,
66
- Err ( e) => {
67
- event ! ( Level :: ERROR , "Failed to serialize data with error: {e}" ) ;
68
- Document :: new ( )
69
- }
89
+ // Check if the document with the given key exists
90
+ let filter = doc ! { "_id" : key } ;
91
+ let existing_doc = collection. find_one ( filter. clone ( ) , None ) . await ?;
92
+
93
+ let mut vec: Vec < T > = if let Some ( doc) = existing_doc {
94
+ // Deserialize the existing data
95
+ mongodb:: bson:: from_bson ( doc. get ( "data" ) . unwrap ( ) . clone ( ) ) ?
96
+ } else {
97
+ Vec :: new ( )
70
98
} ;
71
99
72
- let filter = doc ! { "_id" : key } ;
100
+ // Append the new data to the vec
101
+ vec. push ( value) ;
102
+
103
+ // Serialize the vec back to a BSON array
104
+ let serialized_vec = mongodb:: bson:: to_bson ( & vec) ?;
105
+
106
+ // Create or update the document
107
+ let update = doc ! {
108
+ "$set" : { "data" : serialized_vec }
109
+ } ;
73
110
match collection
74
- . replace_one (
111
+ . update_one (
75
112
filter,
76
- document . clone ( ) ,
77
- mongodb:: options:: ReplaceOptions :: builder ( )
113
+ update ,
114
+ mongodb:: options:: UpdateOptions :: builder ( )
78
115
. upsert ( true )
79
116
. build ( ) ,
80
117
)
@@ -86,32 +123,122 @@ impl KvStoreConnection for MongoDbConn {
86
123
}
87
124
} ;
88
125
89
- trace ! ( "Data set successfully" ) ;
126
+ trace ! ( "Data set successfully with expiry" ) ;
127
+
128
+ Ok ( ( ) )
129
+ }
130
+
131
+ async fn set_data_with_expiry < T : Serialize + std:: marker:: Send + DeserializeOwned > (
132
+ & mut self ,
133
+ key : & str ,
134
+ value : T ,
135
+ seconds : usize ,
136
+ ) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
137
+ // Tracing
138
+ let span = span ! ( Level :: TRACE , "MongoDbConn::set_data_with_expiry" ) ;
139
+ let _enter = span. enter ( ) ;
140
+
141
+ let collection = self
142
+ . client
143
+ . database ( & self . index . db_name )
144
+ . collection :: < Document > ( & self . index . coll_name ) ;
145
+
146
+ // Check if the document with the given key exists
147
+ let filter = doc ! { "_id" : key } ;
148
+ let existing_doc = collection. find_one ( filter. clone ( ) , None ) . await ?;
149
+
150
+ let mut vec: Vec < T > = if let Some ( doc) = existing_doc {
151
+ // Deserialize the existing data
152
+ mongodb:: bson:: from_bson ( doc. get ( "data" ) . unwrap ( ) . clone ( ) ) ?
153
+ } else {
154
+ Vec :: new ( )
155
+ } ;
156
+
157
+ // Append the new data to the vec
158
+ vec. push ( value) ;
159
+
160
+ // Serialize the vec back to a BSON array
161
+ let serialized_vec = mongodb:: bson:: to_bson ( & vec) ?;
162
+
163
+ // Calculate the expiry time
164
+ let expiry_time = ( seconds * 1000 ) as i64 ;
165
+ let expiry_bson_datetime = DateTime :: from_millis ( expiry_time) ;
166
+
167
+ // Create or update the document with the new expiry time
168
+ let update = doc ! {
169
+ "$set" : {
170
+ "data" : serialized_vec,
171
+ "expiry" : expiry_bson_datetime,
172
+ }
173
+ } ;
174
+ collection
175
+ . update_one (
176
+ filter,
177
+ update,
178
+ mongodb:: options:: UpdateOptions :: builder ( )
179
+ . upsert ( true )
180
+ . build ( ) ,
181
+ )
182
+ . await ?;
183
+
184
+ trace ! ( "Data set successfully with expiry" ) ;
185
+
186
+ Ok ( ( ) )
187
+ }
188
+
189
+ async fn delete_data (
190
+ & mut self ,
191
+ key : & str ,
192
+ ) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
193
+ // Tracing
194
+ let span = span ! ( Level :: TRACE , "MongoDbConn::delete_data" ) ;
195
+ let _enter = span. enter ( ) ;
196
+
197
+ let collection = self
198
+ . client
199
+ . database ( & self . index . db_name )
200
+ . collection :: < Document > ( & self . index . coll_name ) ;
201
+
202
+ let filter = doc ! { "_id" : key } ;
203
+ match collection. delete_one ( filter, None ) . await {
204
+ Ok ( _) => ( ) ,
205
+ Err ( e) => {
206
+ event ! ( Level :: ERROR , "Failed to delete data with error: {e}" ) ;
207
+ }
208
+ } ;
209
+
210
+ trace ! ( "Data deleted successfully" ) ;
90
211
91
212
Ok ( ( ) )
92
213
}
93
214
94
215
async fn get_data < T : DeserializeOwned > (
95
216
& mut self ,
96
217
key : & str ,
97
- ) -> Result < Option < T > , Box < dyn std:: error:: Error + Send + Sync > > {
218
+ ) -> Result < Option < Vec < T > > , Box < dyn std:: error:: Error + Send + Sync > > {
98
219
// Tracing
99
220
let span = span ! ( Level :: TRACE , "MongoDbConn::get_data" ) ;
100
221
let _enter = span. enter ( ) ;
101
222
102
223
let collection = self
103
224
. client
104
225
. database ( & self . index . db_name )
105
- . collection :: < Document > ( & self . index . coll_name ) ; // Change to your actual collection name
226
+ . collection :: < Document > ( & self . index . coll_name ) ;
106
227
228
+ // Check if the document with the given key exists
107
229
let filter = doc ! { "_id" : key } ;
108
- let result = collection. find_one ( filter, None ) . await ?;
109
-
110
- trace ! ( "Data retrieved successfully" ) ;
230
+ let doc_find = match collection. find_one ( filter. clone ( ) , None ) . await {
231
+ Ok ( doc) => doc,
232
+ Err ( e) => {
233
+ event ! ( Level :: ERROR , "Failed to get data with error: {e}" ) ;
234
+ return Ok ( None ) ;
235
+ }
236
+ } ;
111
237
112
- if let Some ( document) = result {
113
- let deserialized: T = mongodb:: bson:: from_document ( document) ?;
114
- return Ok ( Some ( deserialized) ) ;
238
+ if let Some ( doc) = doc_find {
239
+ // Deserialize the existing data
240
+ let vec: Vec < T > = mongodb:: bson:: from_bson ( doc. get ( "data" ) . unwrap ( ) . clone ( ) ) ?;
241
+ return Ok ( Some ( vec) ) ;
115
242
}
116
243
117
244
warn ! ( "Data unsuccessfully deserialized" ) ;
0 commit comments