@@ -529,27 +529,34 @@ def test_startup_walk_for_missed_uncompressed_files(self):
529
529
assert self .pghoard .compression_queue .qsize () == 2
530
530
assert self .pghoard .transfer_queue .qsize () == 0
531
531
532
- def test_startup_walk_for_missed_uncompressed_files_timeline (self ):
532
+ @pytest .mark .parametrize (
533
+ "file_type, file_name" , [(FileType .Wal , "000000010000000000000004" ), (FileType .Timeline , "00000002.history" )]
534
+ )
535
+ def test_startup_walk_for_missed_uncompressed_file_type (self , file_type : FileType , file_name : str ):
533
536
compressed_wal_path , _ = self .pghoard .create_backup_site_paths (self .test_site )
534
537
uncompressed_wal_path = compressed_wal_path + "_incoming"
535
- with open (os .path .join (uncompressed_wal_path , "00000002.history" ), "wb" ) as fp :
538
+ with open (os .path .join (uncompressed_wal_path , file_name ), "wb" ) as fp :
536
539
fp .write (b"foo" )
537
540
self .pghoard .startup_walk_for_missed_files ()
538
541
assert self .pghoard .compression_queue .qsize () == 1
539
542
assert self .pghoard .transfer_queue .qsize () == 0
540
543
compress_event = self .pghoard .compression_queue .get (timeout = 1.0 )
541
- assert compress_event .file_type == FileType . Timeline
544
+ assert compress_event .file_type == file_type
542
545
543
- def test_startup_walk_for_missed_uncompressed_files_wal (self ):
546
+ @pytest .mark .parametrize (
547
+ "file_type, file_name" , [(FileType .Wal , "000000010000000000000005" ), (FileType .Timeline , "00000003.history" )]
548
+ )
549
+ def test_startup_walk_for_missed_compressed_file_type (self , file_type : FileType , file_name : str ):
544
550
compressed_wal_path , _ = self .pghoard .create_backup_site_paths (self .test_site )
545
- uncompressed_wal_path = compressed_wal_path + "_incoming"
546
- with open (os .path .join (uncompressed_wal_path , "000000010000000000000004" ), "wb" ) as fp :
551
+ with open (os .path .join (compressed_wal_path , file_name ), "wb" ) as fp :
547
552
fp .write (b"foo" )
553
+ with open (os .path .join (compressed_wal_path , f"{ file_name } .metadata" ), "wb" ) as fp :
554
+ fp .write (b"{}" )
548
555
self .pghoard .startup_walk_for_missed_files ()
549
- assert self .pghoard .compression_queue .qsize () == 1
550
- assert self .pghoard .transfer_queue .qsize () == 0
551
- compress_event = self .pghoard .compression_queue .get (timeout = 1.0 )
552
- assert compress_event .file_type == FileType . Wal
556
+ assert self .pghoard .compression_queue .qsize () == 0
557
+ assert self .pghoard .transfer_queue .qsize () == 1
558
+ upload_event = self .pghoard .transfer_queue .get (timeout = 1.0 )
559
+ assert upload_event .file_type == file_type
553
560
554
561
555
562
class TestPGHoardWithPG :
@@ -597,8 +604,6 @@ def test_pause_on_disk_full(self, db, pghoard_separate_volume, caplog):
597
604
# MiB so if logic for automatically suspending pg_receive(xlog|wal) wasn't working the volume
598
605
# would certainly fill up and the files couldn't be processed. Now this should work fine.
599
606
for _ in range (16 ):
600
- # Note: do not combine two function call in one select, PG executes it differently and
601
- # sometimes looks like it generates less WAL files than we wanted
602
607
switch_wal (conn )
603
608
conn .close ()
604
609
@@ -625,6 +630,10 @@ def test_surviving_pg_receivewal_hickup(self, db, pghoard):
625
630
if pghoard .receivexlogs [pghoard .test_site ].is_alive ():
626
631
pghoard .receivexlogs [pghoard .test_site ].join ()
627
632
del pghoard .receivexlogs [pghoard .test_site ]
633
+ # stopping the thread is not enough, it's possible that killed receiver will leave incomplete partial files
634
+ # around, pghoard is capable of cleaning those up but needs to be restarted, for the test it should be OK
635
+ # just to call startup_walk_for_missed_files, so it takes care of cleaning up
636
+ pghoard .startup_walk_for_missed_files ()
628
637
629
638
n_xlogs = pghoard .transfer_agent_state [pghoard .test_site ]["upload" ]["xlog" ]["xlogs_since_basebackup" ]
630
639
0 commit comments