See LICENSE for details
"""
import datetime
+ import hashlib
import io
import logging
import os
from queue import Empty, Queue
from tempfile import NamedTemporaryFile
from threading import Thread
- from typing import Optional
+ from typing import Dict, Optional

import psycopg2

from . import common, version, wal
from .basebackup_delta import DeltaBaseBackup
from .common import (
-     BackupFailure, BaseBackupFormat, BaseBackupMode, connection_string_using_pgpass,
+     BackupFailure, BaseBackupFormat, BaseBackupMode, connection_string_using_pgpass, extract_pghoard_bb_v2_metadata,
    replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking,
    set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess
)
from .patchedtarfile import tarfile
+ from .rohmu.delta.common import EMBEDDED_FILE_SIZE

BASEBACKUP_NAME = "pghoard_base_backup"
EMPTY_DIRS = [
@@ -82,6 +84,26 @@ def from_config(config) -> "CompressionData":
        return CompressionData(algorithm=algorithm, level=level)


+ class HashFile:
+     def __init__(self, *, path):
+         self._file = open(path, "rb")
+         self.hash = hashlib.blake2s()
+
+     def __enter__(self):
+         return self
+
+     def __exit__(self, t, v, tb):
+         self._file.close()
+
+     def read(self, n=None):
+         data = self._file.read(n)
+         self.hash.update(data)
+         return data
+
+     def __getattr__(self, attr):
+         return getattr(self._file, attr)
+
+
class PGBaseBackup(Thread):
    def __init__(
        self,
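
A minimal usage sketch of the HashFile wrapper introduced above (illustrative only, not part of the patch): tarfile streams the file content through the wrapper's read() method, so the BLAKE2s digest is available as soon as the file has been written into the archive. The file and archive names below are made up for illustration.

import tarfile

with tarfile.TarFile("chunk.tar", mode="w") as tar, HashFile(path="base/16384/2619") as fileobj:
    ti = tar.gettarinfo(name="base/16384/2619", arcname="pgdata/base/16384/2619")
    tar.addfile(ti, fileobj=fileobj)          # tarfile reads via HashFile.read(), updating the digest
    print(fileobj.hash.hexdigest(), ti.size)  # content hash and size, as later recorded in delta_stats
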
@@ -126,6 +148,8 @@ def run(self):
                self.run_local_tar_basebackup()
            elif basebackup_mode == BaseBackupMode.delta:
                self.run_local_tar_basebackup(delta=True)
+             elif basebackup_mode == BaseBackupMode.local_tar_delta_stats:
+                 self.run_local_tar_basebackup(with_delta_stats=True)
            elif basebackup_mode == BaseBackupMode.pipe:
                self.run_piped_basebackup()
            else:
@@ -409,7 +433,7 @@ def get_control_entries_for_tar(self, *, metadata, pg_control, backup_label):
        ti.mtime = mtime
        yield ti, None, False

-     def write_files_to_tar(self, *, files, tar):
+     def write_files_to_tar(self, *, files, tar, delta_stats=None):
        for archive_path, local_path, missing_ok in files:
            if not self.running:
                raise BackupFailure("thread termination requested")
@@ -419,7 +443,18 @@ def write_files_to_tar(self, *, files, tar):
                continue

            try:
-                 tar.add(local_path, arcname=archive_path, recursive=False)
+                 if delta_stats is None:
+                     tar.add(local_path, arcname=archive_path, recursive=False)
+                 else:
+                     if os.path.isdir(local_path):
+                         tar.add(local_path, arcname=archive_path, recursive=False)
+                     else:
+                         with HashFile(path=local_path) as fileobj:
+                             ti = tar.gettarinfo(name=local_path, arcname=archive_path)
+                             tar.addfile(ti, fileobj=fileobj)
+                             if ti.size > EMBEDDED_FILE_SIZE:
+                                 # Tiny files are not uploaded separately, they are embedded into the manifest, so skip them
+                                 delta_stats[fileobj.hash.hexdigest()] = ti.size
            except (FileNotFoundError if missing_ok else NoException):
                self.log.warning("File %r went away while writing to tar, ignoring", local_path)
@@ -508,7 +543,15 @@ def compression_data(self) -> CompressionData:
        return CompressionData.from_config(self.config)

    def tar_one_file(
-         self, *, temp_dir, chunk_path, files_to_backup, callback_queue, filetype="basebackup_chunk", extra_metadata=None
+         self,
+         *,
+         temp_dir,
+         chunk_path,
+         files_to_backup,
+         callback_queue,
+         filetype="basebackup_chunk",
+         extra_metadata=None,
+         delta_stats=None
    ):
        start_time = time.monotonic()
@@ -522,7 +565,7 @@ def tar_one_file(
                fileobj=raw_output_obj
            ) as output_obj:
                with tarfile.TarFile(fileobj=output_obj, mode="w") as output_tar:
-                     self.write_files_to_tar(files=files_to_backup, tar=output_tar)
+                     self.write_files_to_tar(files=files_to_backup, tar=output_tar, delta_stats=delta_stats)

                input_size = output_obj.tell()
@@ -585,13 +628,14 @@ def wait_for_chunk_transfer_to_complete(self, chunk_count, upload_results, chunk
            )
            return False

-     def handle_single_chunk(self, *, chunk_callback_queue, chunk_path, chunks, index, temp_dir):
+     def handle_single_chunk(self, *, chunk_callback_queue, chunk_path, chunks, index, temp_dir, delta_stats=None):
        one_chunk_files = chunks[index]
        chunk_name, input_size, result_size = self.tar_one_file(
            callback_queue=chunk_callback_queue,
            chunk_path=chunk_path,
            temp_dir=temp_dir,
            files_to_backup=one_chunk_files,
+             delta_stats=delta_stats,
        )
        self.log.info(
            "Queued backup chunk %r for transfer, chunks on disk (including partial): %r, current: %r, total chunks: %r",
@@ -604,7 +648,9 @@ def handle_single_chunk(self, *, chunk_callback_queue, chunk_path, chunks, index
            "files": [chunk[0] for chunk in one_chunk_files]
        }

-     def create_and_upload_chunks(self, chunks, data_file_format, temp_base_dir):
+     def create_and_upload_chunks(
+         self, chunks, data_file_format, temp_base_dir, delta_stats: Optional[Dict[str, int]] = None
+     ):
        start_time = time.monotonic()
        chunk_files = []
        upload_results = []
@@ -633,6 +679,7 @@ def create_and_upload_chunks(self, chunks, data_file_format, temp_base_dir):
                chunks=chunks,
                index=i,
                temp_dir=temp_base_dir,
+                 delta_stats=delta_stats,
            )
            pending_compress_and_encrypt_tasks.append(task)
            self.chunks_on_disk += 1
@@ -650,7 +697,31 @@ def create_and_upload_chunks(self, chunks, data_file_format, temp_base_dir):

        return chunk_files

-     def run_local_tar_basebackup(self, delta=False):
+     def fetch_all_data_files_hashes(self):
+         hashes: Dict[str, int] = {}
+
+         for backup in self.get_remote_basebackups_info(self.site):
+             if backup["metadata"].get("format") != BaseBackupFormat.v2:
+                 continue
+
+             key = os.path.join(self.site_config["prefix"], "basebackup", backup["name"])
+             bmeta_compressed = self.storage.get_contents_to_string(key)[0]
+
+             with rohmufile.file_reader(
+                 fileobj=io.BytesIO(bmeta_compressed),
+                 metadata=backup["metadata"],
+                 key_lookup=lambda key_id: self.site_config["encryption_keys"][key_id]["private"]
+             ) as input_obj:
+                 meta = extract_pghoard_bb_v2_metadata(input_obj)
+
+             if "delta_stats" not in meta:
+                 continue
+
+             hashes.update(meta["delta_stats"]["hashes"])
+
+         return hashes
+
+     def run_local_tar_basebackup(self, delta=False, with_delta_stats=False):
        control_files_metadata_extra = {}
        pgdata = self.site_config["pg_data_directory"]
        if not os.path.isdir(pgdata):
@@ -756,13 +827,53 @@ def run_local_tar_basebackup(self, delta=False):
                    pgdata=pgdata, tablespaces=tablespaces, target_chunk_size=target_chunk_size
                )
                chunks_count = len(chunks)
+
+                 delta_stats: Optional[Dict[str, int]] = None
+                 if with_delta_stats:
+                     delta_stats = {}
+
                # Tar up the chunks and submit them for upload; note that we start from chunk 1 here; chunk 0
                # is reserved for special files and metadata and will be generated last.
-                 chunk_files = self.create_and_upload_chunks(chunks, data_file_format, temp_base_dir)
+                 chunk_files = self.create_and_upload_chunks(
+                     chunks, data_file_format, temp_base_dir, delta_stats=delta_stats
+                 )

                total_size_plain = sum(item["input_size"] for item in chunk_files)
                total_size_enc = sum(item["result_size"] for item in chunk_files)

+                 if with_delta_stats:
+                     control_files_metadata_extra["delta_stats"] = {"hashes": delta_stats}
+
+                     existing_hashes = self.fetch_all_data_files_hashes()
+                     new_hashes = {k: delta_stats[k] for k in set(delta_stats).difference(set(existing_hashes))}
+
+                     planned_upload_size = sum(new_hashes.values())
+                     planned_upload_count = len(new_hashes)
+
+                     if existing_hashes:
+                         # Send ratio metrics for every backup except for the first one
+                         planned_total_size = sum(delta_stats.values())
+                         planned_total_count = len(delta_stats)
+                         if planned_total_count:
+                             self.metrics.gauge(
+                                 "pghoard.planned_delta_backup_changed_data_files_ratio",
+                                 planned_upload_count / planned_total_count
+                             )
+                         if planned_total_size:
+                             self.metrics.gauge(
+                                 "pghoard.planned_delta_backup_changed_data_size_ratio",
+                                 planned_upload_size / planned_total_size
+                             )
+                         self.metrics.gauge(
+                             "pghoard.planned_delta_backup_remained_data_size_raw",
+                             planned_total_size - planned_upload_size,
+                         )
+
+                     self.metrics.increase("pghoard.planned_delta_backup_total_size", inc_value=planned_upload_size)
+                     self.metrics.gauge("pghoard.planned_delta_backup_upload_size", planned_upload_size)
+                     self.metrics.increase("pghoard.planned_delta_backup_total_files", inc_value=planned_upload_count)
+                     self.metrics.gauge("pghoard.planned_delta_backup_upload_files", planned_upload_count)
+
                control_files_metadata_extra["chunks"] = chunk_files

                # Everything is now tarred up, grab the latest pg_control and stop the backup process
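
A small worked example of the planned-upload arithmetic behind these metrics (hash keys and sizes below are made up for illustration): only hashes not seen in any earlier v2 backup count as planned uploads, and the ratio gauges compare them against everything collected for the current backup.

# Illustrative values only; real keys are blake2s hex digests collected by write_files_to_tar.
delta_stats = {"aa": 8192, "bb": 16384, "cc": 4096}    # data files in the backup being taken
existing_hashes = {"aa": 8192, "bb": 16384}            # hashes found in earlier v2 backups

new_hashes = {k: delta_stats[k] for k in set(delta_stats).difference(existing_hashes)}
planned_upload_size = sum(new_hashes.values())                          # 4096
planned_upload_count = len(new_hashes)                                  # 1
changed_files_ratio = planned_upload_count / len(delta_stats)           # 1 / 3
changed_size_ratio = planned_upload_size / sum(delta_stats.values())    # 4096 / 28672 ≈ 0.14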