@@ -820,7 +820,27 @@ impl BTreeCursor {
820
820
( self . find_cell ( page, int_key) , page. page_type ( ) )
821
821
} ;
822
822
823
- // TODO: if overwrite drop cell
823
+ // check if we have an exact match on the cell at cell_idx:
824
+ // (`update` support/insert record to same existing rowid)
825
+ if cell_idx < page. get_contents ( ) . cell_count ( ) {
826
+ // parse the existing cell to get the stored key
827
+ if let BTreeCell :: TableLeafCell ( tbl_leaf) = page. get_contents ( ) . cell_get (
828
+ cell_idx,
829
+ self . pager . clone ( ) ,
830
+ payload_overflow_threshold_max ( page_type, self . usable_space ( ) as u16 ) ,
831
+ payload_overflow_threshold_min ( page_type, self . usable_space ( ) as u16 ) ,
832
+ self . usable_space ( ) ,
833
+ ) ? {
834
+ if tbl_leaf. _rowid == int_key {
835
+ tracing:: trace!( "insert_into_page: found exact match with cell_idx={cell_idx}, overwriting" ) ;
836
+ // we are "overwriting in place" the old cell, not removing the pointers
837
+ self . overwrite_cell ( page. clone ( ) , cell_idx, record) ?;
838
+ // advance to the next cell
839
+ self . move_to ( SeekKey :: TableRowId ( int_key) , SeekOp :: GT ) ?;
840
+ return Ok ( CursorResult :: Ok ( ( ) ) ) ;
841
+ }
842
+ }
843
+ }
824
844
825
845
// insert cell
826
846
let mut cell_payload: Vec < u8 > = Vec :: new ( ) ;
@@ -2201,6 +2221,183 @@ impl BTreeCursor {
2201
2221
pub fn table_id ( & self ) -> usize {
2202
2222
self . root_page
2203
2223
}
2224
+
2225
+ pub fn overwrite_cell (
2226
+ & mut self ,
2227
+ page_ref : PageRef ,
2228
+ cell_idx : usize ,
2229
+ record : & Record ,
2230
+ ) -> Result < CursorResult < ( ) > > {
2231
+ // build the new payload
2232
+ let page_type = page_ref. get ( ) . contents . as_ref ( ) . unwrap ( ) . page_type ( ) ;
2233
+ let mut new_payload = Vec :: new ( ) ;
2234
+ fill_cell_payload (
2235
+ page_type,
2236
+ self . rowid . get ( ) ,
2237
+ & mut new_payload,
2238
+ record,
2239
+ self . usable_space ( ) as u16 ,
2240
+ self . pager . clone ( ) ,
2241
+ ) ;
2242
+
2243
+ // figure out old cell offset & size
2244
+ let ( old_offset, old_local_size) = {
2245
+ let page = page_ref. get ( ) . contents . as_ref ( ) . unwrap ( ) ;
2246
+ page. cell_get_raw_region (
2247
+ cell_idx,
2248
+ payload_overflow_threshold_max ( page_type, self . usable_space ( ) as u16 ) ,
2249
+ payload_overflow_threshold_min ( page_type, self . usable_space ( ) as u16 ) ,
2250
+ self . usable_space ( ) ,
2251
+ )
2252
+ } ;
2253
+
2254
+ // if it all fits in local space and old_local_size is enough, do an in-place copy
2255
+ // (like sqlite btreeOverwriteContent local portion)
2256
+ if new_payload. len ( ) <= old_local_size {
2257
+ self . overwrite_content (
2258
+ page_ref. clone ( ) ,
2259
+ old_offset,
2260
+ & new_payload,
2261
+ 0 ,
2262
+ new_payload. len ( ) ,
2263
+ ) ?;
2264
+ // if there's leftover local space (old_local_size > new_payload.len()), zero it or free it
2265
+ let remaining = old_local_size - new_payload. len ( ) ;
2266
+ if remaining > 0 {
2267
+ let buf = page_ref. get ( ) . contents . as_mut ( ) . unwrap ( ) . as_ptr ( ) ;
2268
+ for i in 0 ..remaining {
2269
+ buf[ old_offset + new_payload. len ( ) + i] = 0 ;
2270
+ }
2271
+ }
2272
+ Ok ( CursorResult :: Ok ( ( ) ) )
2273
+ } else {
2274
+ // otherwise, we have to handle overflow (or page-splitting)... overflow for simplicity
2275
+ self . overwrite_overflow_cell (
2276
+ page_ref,
2277
+ cell_idx,
2278
+ old_offset,
2279
+ old_local_size,
2280
+ & new_payload,
2281
+ )
2282
+ }
2283
+ }
2284
+
2285
+ pub fn overwrite_content (
2286
+ & mut self ,
2287
+ page_ref : PageRef ,
2288
+ dest_offset : usize ,
2289
+ new_payload : & [ u8 ] ,
2290
+ src_offset : usize ,
2291
+ amount : usize ,
2292
+ ) -> Result < CursorResult < ( ) > > {
2293
+ return_if_locked ! ( page_ref) ;
2294
+ page_ref. set_dirty ( ) ;
2295
+ self . pager . add_dirty ( page_ref. get ( ) . id ) ;
2296
+
2297
+ let buf = page_ref. get ( ) . contents . as_mut ( ) . unwrap ( ) . as_ptr ( ) ;
2298
+
2299
+ // if new_payload doesn't have enough data, we fill with zeros
2300
+ let n_data = new_payload. len ( ) . saturating_sub ( src_offset) ;
2301
+ if n_data == 0 {
2302
+ // everything is zeros
2303
+ for i in 0 ..amount {
2304
+ if buf[ dest_offset + i] != 0 {
2305
+ // do page-write if needed
2306
+ // ...
2307
+ buf[ dest_offset + i] = 0 ;
2308
+ }
2309
+ }
2310
+ } else {
2311
+ let copy_len = n_data. min ( amount) ;
2312
+ // copy the overlapping portion
2313
+ buf[ dest_offset..dest_offset + copy_len]
2314
+ . copy_from_slice ( & new_payload[ src_offset..src_offset + copy_len] ) ;
2315
+
2316
+ // if copy_len < amount => fill remainder with 0
2317
+ if copy_len < amount {
2318
+ for i in copy_len..amount {
2319
+ buf[ dest_offset + i] = 0 ;
2320
+ }
2321
+ }
2322
+ }
2323
+ Ok ( CursorResult :: Ok ( ( ) ) )
2324
+ }
2325
+
2326
+ fn overwrite_overflow_cell (
2327
+ & mut self ,
2328
+ page_ref : PageRef ,
2329
+ cell_idx : usize ,
2330
+ old_offset : usize ,
2331
+ old_local_size : usize ,
2332
+ new_payload : & [ u8 ] ,
2333
+ ) -> Result < CursorResult < ( ) > > {
2334
+ // possibly defragment
2335
+ let page = page_ref. get ( ) . contents . as_mut ( ) . unwrap ( ) ;
2336
+ defragment_page ( page, self . usable_space ( ) as u16 ) ;
2337
+
2338
+ // re-check how many bytes we can store locally
2339
+ let ( local_needed, _) = compute_local_needed ( new_payload. len ( ) , self . usable_space ( ) as u16 ) ;
2340
+ let local_area_size = old_local_size;
2341
+
2342
+ let local_bytes = ( local_needed. min ( old_local_size) ) . saturating_sub ( 4 ) ;
2343
+
2344
+ // overwrite that many bytes
2345
+ self . overwrite_content ( page_ref. clone ( ) , old_offset, new_payload, 0 , local_bytes) ?;
2346
+
2347
+ // if local_bytes < local_area_size => zero out leftover
2348
+ if local_bytes < local_area_size {
2349
+ let leftover = local_area_size - local_bytes;
2350
+ // zero the leftover area (for now)
2351
+ for i in 0 ..leftover {
2352
+ page. as_ptr ( ) [ old_offset + local_bytes + i] = 0 ;
2353
+ }
2354
+ }
2355
+
2356
+ let remainder_start = if local_bytes < 4 { 4 } else { local_bytes } ;
2357
+
2358
+ let remainder = & new_payload[ remainder_start..] ;
2359
+ // now we must store new_payload[local_bytes..] in overflow
2360
+ if !remainder. is_empty ( ) {
2361
+ let overflow_page = allocate_overflow_page ( self . pager . clone ( ) ) ;
2362
+ let arr = overflow_page. get ( ) . id . to_be_bytes ( ) ;
2363
+ let buf = page. as_ptr ( ) ;
2364
+ let pointer_offset = old_offset + local_bytes;
2365
+ if pointer_offset + 4 > buf. len ( ) {
2366
+ panic ! (
2367
+ "Overflow pointer exceeds buffer! pointer_offset: {}, buf.len(): {}" ,
2368
+ pointer_offset,
2369
+ buf. len( )
2370
+ ) ;
2371
+ }
2372
+ buf[ pointer_offset..pointer_offset + 4 ] . copy_from_slice ( & arr) ;
2373
+ }
2374
+ // no remainder => store 0 as next overflow pointer
2375
+ let pointer_offset = old_offset + local_bytes. saturating_sub ( 4 ) ;
2376
+ for b in & mut page. as_ptr ( ) [ pointer_offset..pointer_offset + 4 ] {
2377
+ * b = 0 ;
2378
+ }
2379
+ // zero out the rest of the local area
2380
+ Ok ( CursorResult :: Ok ( ( ) ) )
2381
+ }
2382
+ }
2383
+
2384
+ // Compute how many bytes of a new payload to store locally on a b-tree table leaf page.
2385
+ // returns (local_size, needs_overflow).
2386
+ fn compute_local_needed ( total_payload_size : usize , usable_space : u16 ) -> ( usize , bool ) {
2387
+ // Compute the min/max thresholds.
2388
+ let max_local = payload_overflow_threshold_max ( PageType :: TableLeaf , usable_space) ;
2389
+ let min_local = payload_overflow_threshold_min ( PageType :: TableLeaf , usable_space) ;
2390
+
2391
+ // if the entire record fits within max_local, store it all on page.
2392
+ if total_payload_size <= max_local {
2393
+ return ( total_payload_size, false ) ;
2394
+ }
2395
+ let c = ( usable_space as isize - 4 ) . max ( 1 ) as usize ;
2396
+ let mut local = min_local + ( ( total_payload_size - min_local) % ( c - 1 ) ) ;
2397
+ if local > max_local {
2398
+ local = min_local;
2399
+ }
2400
+ ( local, true )
2204
2401
}
2205
2402
2206
2403
impl PageStack {
0 commit comments