@@ -2,6 +2,7 @@ use async_trait::async_trait;
2
2
use mongodb:: bson:: { doc, DateTime , Document } ;
3
3
use mongodb:: { options:: ClientOptions , Client } ;
4
4
use serde:: { de:: DeserializeOwned , Serialize } ;
5
+ use std:: collections:: HashMap ;
5
6
use tracing:: { event, span, trace, warn, Level } ;
6
7
7
8
use super :: handler:: KvStoreConnection ;
@@ -75,6 +76,7 @@ impl KvStoreConnection for MongoDbConn {
75
76
async fn set_data < T : Serialize + std:: marker:: Send + DeserializeOwned > (
76
77
& mut self ,
77
78
key : & str ,
79
+ value_id : & str ,
78
80
value : T ,
79
81
) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
80
82
// Tracing
@@ -90,22 +92,22 @@ impl KvStoreConnection for MongoDbConn {
90
92
let filter = doc ! { "_id" : key } ;
91
93
let existing_doc = collection. find_one ( filter. clone ( ) , None ) . await ?;
92
94
93
- let mut vec : Vec < T > = if let Some ( doc) = existing_doc {
95
+ let mut mapping : HashMap < String , T > = if let Some ( doc) = existing_doc {
94
96
if doc. contains_key ( "data" ) {
95
97
// Deserialize the existing data
96
98
mongodb:: bson:: from_bson ( doc. get ( "data" ) . unwrap ( ) . clone ( ) ) ?
97
99
} else {
98
- Vec :: new ( )
100
+ HashMap :: new ( )
99
101
}
100
102
} else {
101
- Vec :: new ( )
103
+ HashMap :: new ( )
102
104
} ;
103
105
104
106
// Append the new data to the vec
105
- vec . push ( value) ;
107
+ mapping . insert ( value_id . to_string ( ) , value) ;
106
108
107
109
// Serialize the vec back to a BSON array
108
- let serialized_vec = mongodb:: bson:: to_bson ( & vec ) ?;
110
+ let serialized_vec = mongodb:: bson:: to_bson ( & mapping ) ?;
109
111
110
112
// Create or update the document
111
113
let update = doc ! {
@@ -135,6 +137,7 @@ impl KvStoreConnection for MongoDbConn {
135
137
async fn set_data_with_expiry < T : Serialize + std:: marker:: Send + DeserializeOwned > (
136
138
& mut self ,
137
139
key : & str ,
140
+ value_id : & str ,
138
141
value : T ,
139
142
seconds : usize ,
140
143
) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
@@ -151,18 +154,18 @@ impl KvStoreConnection for MongoDbConn {
151
154
let filter = doc ! { "_id" : key } ;
152
155
let existing_doc = collection. find_one ( filter. clone ( ) , None ) . await ?;
153
156
154
- let mut vec : Vec < T > = if let Some ( doc) = existing_doc {
157
+ let mut mapping : HashMap < String , T > = if let Some ( doc) = existing_doc {
155
158
// Deserialize the existing data
156
159
mongodb:: bson:: from_bson ( doc. get ( "data" ) . unwrap ( ) . clone ( ) ) ?
157
160
} else {
158
- Vec :: new ( )
161
+ HashMap :: new ( )
159
162
} ;
160
163
161
164
// Append the new data to the vec
162
- vec . push ( value) ;
165
+ mapping . insert ( value_id . to_string ( ) , value) ;
163
166
164
167
// Serialize the vec back to a BSON array
165
- let serialized_vec = mongodb:: bson:: to_bson ( & vec ) ?;
168
+ let serialized_vec = mongodb:: bson:: to_bson ( & mapping ) ?;
166
169
167
170
// Calculate the expiry time
168
171
let expiry_time = ( seconds * 1000 ) as i64 ;
@@ -190,36 +193,68 @@ impl KvStoreConnection for MongoDbConn {
190
193
Ok ( ( ) )
191
194
}
192
195
193
- async fn delete_data (
196
+ async fn del_data (
194
197
& mut self ,
195
198
key : & str ,
199
+ value_id : Option < & str > ,
196
200
) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
197
201
// Tracing
198
- let span = span ! ( Level :: TRACE , "MongoDbConn::delete_data " ) ;
202
+ let span = span ! ( Level :: TRACE , "MongoDbConn::del_data " ) ;
199
203
let _enter = span. enter ( ) ;
200
204
201
205
let collection = self
202
206
. client
203
207
. database ( & self . index . db_name )
204
208
. collection :: < Document > ( & self . index . coll_name ) ;
205
209
210
+ // Build the filter based on the key
206
211
let filter = doc ! { "_id" : key } ;
207
- match collection. delete_one ( filter, None ) . await {
208
- Ok ( _) => ( ) ,
209
- Err ( e) => {
210
- event ! ( Level :: ERROR , "Failed to delete data with error: {e}" ) ;
211
- }
212
- } ;
213
212
214
- trace ! ( "Data deleted successfully" ) ;
213
+ // If value_id is provided, we need to fetch the document and update it
214
+ if let Some ( value_id) = value_id {
215
+ let update = doc ! {
216
+ "$unset" : {
217
+ & format!( "data.{}" , value_id) : ""
218
+ }
219
+ } ;
220
+
221
+ match collection. find_one_and_update ( filter, update, None ) . await {
222
+ Ok ( result) => {
223
+ if let Some ( _) = result {
224
+ // Document was found and updated, log success or handle as needed
225
+ trace ! ( "Data updated successfully" ) ;
226
+ } else {
227
+ // Document not found
228
+ event ! ( Level :: ERROR , "Document not found for key: {}" , key) ;
229
+ }
230
+ }
231
+ Err ( e) => {
232
+ // Handle error from MongoDB
233
+ event ! ( Level :: ERROR , "Failed to update data with error: {:?}" , e) ;
234
+ return Err ( Box :: new ( e) ) ;
235
+ }
236
+ }
237
+ } else {
238
+ // value_id is None, so delete the entire document
239
+ match collection. delete_one ( filter. clone ( ) , None ) . await {
240
+ Ok ( _) => {
241
+ trace ! ( "Data deleted successfully" ) ;
242
+ }
243
+ Err ( e) => {
244
+ event ! ( Level :: ERROR , "Failed to delete data with error: {:?}" , e) ;
245
+ return Err ( Box :: new ( e) ) ;
246
+ }
247
+ } ;
248
+ }
215
249
216
250
Ok ( ( ) )
217
251
}
218
252
219
- async fn get_data < T : DeserializeOwned > (
253
+ async fn get_data < T : Clone + DeserializeOwned > (
220
254
& mut self ,
221
255
key : & str ,
222
- ) -> Result < Option < Vec < T > > , Box < dyn std:: error:: Error + Send + Sync > > {
256
+ value_id : Option < & str > ,
257
+ ) -> Result < Option < HashMap < String , T > > , Box < dyn std:: error:: Error + Send + Sync > > {
223
258
// Tracing
224
259
let span = span ! ( Level :: TRACE , "MongoDbConn::get_data" ) ;
225
260
let _enter = span. enter ( ) ;
@@ -241,8 +276,22 @@ impl KvStoreConnection for MongoDbConn {
241
276
242
277
if let Some ( doc) = doc_find {
243
278
// Deserialize the existing data
244
- let vec: Vec < T > = mongodb:: bson:: from_bson ( doc. get ( "data" ) . unwrap ( ) . clone ( ) ) ?;
245
- return Ok ( Some ( vec) ) ;
279
+ let mapping: HashMap < String , T > =
280
+ mongodb:: bson:: from_bson ( doc. get ( "data" ) . unwrap ( ) . clone ( ) ) ?;
281
+
282
+ if let Some ( id) = value_id {
283
+ // If value_id is provided, return only the value with the given ID
284
+ if let Some ( value) = mapping. get ( id) {
285
+ let mut result: HashMap < String , T > = HashMap :: new ( ) ;
286
+ result. insert ( id. to_string ( ) , value. clone ( ) ) ;
287
+ return Ok ( Some ( result) ) ;
288
+ } else {
289
+ // Value with the given ID not found
290
+ event ! ( Level :: ERROR , "Value with ID {id} not found for key {key}" ) ;
291
+ return Ok ( None ) ;
292
+ }
293
+ }
294
+ return Ok ( Some ( mapping) ) ;
246
295
}
247
296
248
297
warn ! ( "Data unsuccessfully deserialized" ) ;
0 commit comments