import subprocess
import time
from concurrent.futures import ThreadPoolExecutor
+from dataclasses import dataclass
from queue import Empty, Queue
from tempfile import NamedTemporaryFile
from threading import Thread
+from typing import Optional

import psycopg2

# pylint: disable=superfluous-parens
from . import common, version, wal
+from .basebackup_delta import DeltaBaseBackup
from .common import (
-    connection_string_using_pgpass, replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking,
+    BackupFailure, BaseBackupFormat, BaseBackupMode, connection_string_using_pgpass,
+    replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking,
    set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess
)
from .patchedtarfile import tarfile
]


-class BackupFailure(Exception):
-    """Backup failed - post a failure to callback_queue and allow the thread to terminate"""
-
-
class NoException(BaseException):
    """Exception that's never raised, used in conditional except blocks"""


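+# Per-site encryption settings: the key id and the matching RSA public key
+# used to encrypt backup data.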
+@dataclass(frozen=True)
+class EncryptionData:
+    encryption_key_id: Optional[str]
+    rsa_public_key: Optional[str]
+
+    @staticmethod
+    def from_site_config(site_config) -> "EncryptionData":
+        encryption_key_id = site_config["encryption_key_id"]
+        if encryption_key_id:
+            rsa_public_key = site_config["encryption_keys"][encryption_key_id]["public"]
+        else:
+            rsa_public_key = None
+
+        return EncryptionData(encryption_key_id=encryption_key_id, rsa_public_key=rsa_public_key)
+
+
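+# Compression algorithm and level, taken from the top-level "compression"
+# section of the configuration (shared by all sites).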
+@dataclass(frozen=True)
+class CompressionData:
+    algorithm: str
+    level: int
+
+    @staticmethod
+    def from_config(config) -> "CompressionData":
+        algorithm = config["compression"]["algorithm"]
+        level = config["compression"]["level"]
+        return CompressionData(algorithm=algorithm, level=level)
+
+
class PGBaseBackup(Thread):
    def __init__(
        self,
@@ -63,10 +91,12 @@ def __init__(
        basebackup_path,
        compression_queue,
        metrics,
+        storage,
        transfer_queue=None,
        callback_queue=None,
        pg_version_server=None,
-        metadata=None
+        metadata=None,
+        get_remote_basebackups_info=None
    ):
        super().__init__()
        self.log = logging.getLogger("PGBaseBackup")
@@ -84,15 +114,19 @@ def __init__(
        self.pid = None
        self.pg_version_server = pg_version_server
        self.latest_activity = datetime.datetime.utcnow()
+        self.storage = storage
+        self.get_remote_basebackups_info = get_remote_basebackups_info

    def run(self):
        try:
-            basebackup_mode = self.config["backup_sites"][self.site]["basebackup_mode"]
-            if basebackup_mode == "basic":
+            basebackup_mode = self.site_config["basebackup_mode"]
+            if basebackup_mode == BaseBackupMode.basic:
                self.run_basic_basebackup()
-            elif basebackup_mode == "local-tar":
+            elif basebackup_mode == BaseBackupMode.local_tar:
                self.run_local_tar_basebackup()
-            elif basebackup_mode == "pipe":
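+            # Delta mode re-uses the local-tar flow; DeltaBaseBackup uploads
+            # only data not already present in backup storage.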
+            elif basebackup_mode == BaseBackupMode.delta:
+                self.run_local_tar_basebackup(delta=True)
+            elif basebackup_mode == BaseBackupMode.pipe:
                self.run_piped_basebackup()
            else:
                raise errors.InvalidConfigurationError("Unsupported basebackup_mode {!r}".format(basebackup_mode))
@@ -129,7 +163,7 @@ def get_paths_for_backup(basebackup_path):

    def get_command_line(self, output_name):
        command = [
-            self.config["backup_sites"][self.site]["pg_basebackup_path"],
+            self.site_config["pg_basebackup_path"],
            "--format",
            "tar",
            "--label",
@@ -139,7 +173,7 @@ def get_command_line(self, output_name):
            output_name,
        ]

-        if self.config["backup_sites"][self.site]["active_backup_mode"] == "standalone_hot_backup":
+        if self.site_config["active_backup_mode"] == "standalone_hot_backup":
            if self.pg_version_server >= 100000:
                command.extend(["--wal-method=fetch"])
            else:
@@ -169,9 +203,9 @@ def check_command_success(self, proc, output_file):

    def basebackup_compression_pipe(self, proc, basebackup_path):
        rsa_public_key = None
-        encryption_key_id = self.config["backup_sites"][self.site]["encryption_key_id"]
+        encryption_key_id = self.site_config["encryption_key_id"]
        if encryption_key_id:
-            rsa_public_key = self.config["backup_sites"][self.site]["encryption_keys"][encryption_key_id]["public"]
+            rsa_public_key = self.site_config["encryption_keys"][encryption_key_id]["public"]
        compression_algorithm = self.config["compression"]["algorithm"]
        compression_level = self.config["compression"]["level"]
        self.log.debug("Compressing basebackup directly to file: %r", basebackup_path)
@@ -461,25 +495,30 @@ def add_entry(archive_path, local_path, *, missing_ok):
            yield from add_directory(archive_path, local_path, missing_ok=False)
        yield archive_path, local_path, False, "leave"

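+    # Convenience accessors that centralize lookups previously repeated as
+    # self.config["backup_sites"][self.site] at every call site.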
+    @property
+    def site_config(self):
+        return self.config["backup_sites"][self.site]
+
+    @property
+    def encryption_data(self) -> EncryptionData:
+        return EncryptionData.from_site_config(self.site_config)
+
+    @property
+    def compression_data(self) -> CompressionData:
+        return CompressionData.from_config(self.config)
+
    def tar_one_file(
        self, *, temp_dir, chunk_path, files_to_backup, callback_queue, filetype="basebackup_chunk", extra_metadata=None
    ):
        start_time = time.monotonic()

-        site_config = self.config["backup_sites"][self.site]
-        encryption_key_id = site_config["encryption_key_id"]
-        if encryption_key_id:
-            rsa_public_key = site_config["encryption_keys"][encryption_key_id]["public"]
-        else:
-            rsa_public_key = None
-
        with NamedTemporaryFile(dir=temp_dir, prefix=os.path.basename(chunk_path), suffix=".tmp") as raw_output_obj:
            # pylint: disable=bad-continuation
            with rohmufile.file_writer(
-                compression_algorithm=self.config["compression"]["algorithm"],
-                compression_level=self.config["compression"]["level"],
-                compression_threads=site_config["basebackup_compression_threads"],
-                rsa_public_key=rsa_public_key,
+                compression_algorithm=self.compression_data.algorithm,
+                compression_level=self.compression_data.level,
+                compression_threads=self.site_config["basebackup_compression_threads"],
+                rsa_public_key=self.encryption_data.rsa_public_key,
                fileobj=raw_output_obj
            ) as output_obj:
                with tarfile.TarFile(fileobj=output_obj, mode="w") as output_tar:
@@ -492,7 +531,7 @@ def tar_one_file(
            os.link(raw_output_obj.name, chunk_path)

        rohmufile.log_compression_result(
-            encrypted=bool(encryption_key_id),
+            encrypted=bool(self.encryption_data.encryption_key_id),
            elapsed=time.monotonic() - start_time,
            original_size=input_size,
            result_size=result_size,
@@ -505,16 +544,16 @@ def tar_one_file(
                "pghoard.compressed_size_ratio",
                size_ratio,
                tags={
-                    "algorithm": self.config["compression"]["algorithm"],
+                    "algorithm": self.compression_data.algorithm,
                    "site": self.site,
                    "type": "basebackup",
                }
            )

        metadata = {
-            "compression-algorithm": self.config["compression"]["algorithm"],
-            "encryption-key-id": encryption_key_id,
-            "format": "pghoard-bb-v2",
+            "compression-algorithm": self.compression_data.algorithm,
+            "encryption-key-id": self.encryption_data.encryption_key_id,
+            "format": BaseBackupFormat.v2,
            "original-file-size": input_size,
            "host": socket.gethostname(),
        }
@@ -573,9 +612,8 @@ def create_and_upload_chunks(self, chunks, data_file_format, temp_base_dir):
        self.chunks_on_disk = 0
        i = 0

-        site_config = self.config["backup_sites"][self.site]
-        max_chunks_on_disk = site_config["basebackup_chunks_in_progress"]
-        threads = site_config["basebackup_threads"]
+        max_chunks_on_disk = self.site_config["basebackup_chunks_in_progress"]
+        threads = self.site_config["basebackup_threads"]
        with ThreadPoolExecutor(max_workers=threads) as tpe:
            pending_compress_and_encrypt_tasks = []
            while i < len(chunks):
@@ -612,8 +650,9 @@ def create_and_upload_chunks(self, chunks, data_file_format, temp_base_dir):

        return chunk_files

-    def run_local_tar_basebackup(self):
-        pgdata = self.config["backup_sites"][self.site]["pg_data_directory"]
+    def run_local_tar_basebackup(self, delta=False):
+        control_files_metadata_extra = {}
+        pgdata = self.site_config["pg_data_directory"]
        if not os.path.isdir(pgdata):
            raise errors.InvalidConfigurationError("pg_data_directory {!r} does not exist".format(pgdata))

@@ -622,7 +661,7 @@ def run_local_tar_basebackup(self):
        data_file_format = "{}/{}.{{0:08d}}.pghoard".format(compressed_base, os.path.basename(compressed_base)).format

        # Default to 2GB chunks of uncompressed data
-        target_chunk_size = self.config["backup_sites"][self.site]["basebackup_chunk_size"]
+        target_chunk_size = self.site_config["basebackup_chunk_size"]

        self.log.debug("Connecting to database to start backup process")
        connection_string = connection_string_using_pgpass(self.connection_info)
@@ -686,13 +725,45 @@ def run_local_tar_basebackup(self):
            self.log.info("Starting to backup %r and %r tablespaces to %r", pgdata, len(tablespaces), compressed_base)
            start_time = time.monotonic()

-            total_file_count, chunks = self.find_and_split_files_to_backup(
-                pgdata=pgdata, tablespaces=tablespaces, target_chunk_size=target_chunk_size
-            )
+            if delta:
+                delta_backup = DeltaBaseBackup(
+                    storage=self.storage,
+                    site=self.site,
+                    site_config=self.site_config,
+                    transfer_queue=self.transfer_queue,
+                    metrics=self.metrics,
+                    encryption_data=self.encryption_data,
+                    compression_data=self.compression_data,
+                    get_remote_basebackups_info=self.get_remote_basebackups_info,
+                    parallel=self.site_config["basebackup_threads"],
+                    temp_base_dir=temp_base_dir,
+                    compressed_base=compressed_base
+                )
+                total_size_plain, total_size_enc, manifest, total_file_count = delta_backup.run(
+                    pgdata=pgdata,
+                    src_iterate_func=lambda: (
+                        item[1]
+                        for item in self.find_files_to_backup(pgdata=pgdata, tablespaces=tablespaces)
+                        if not item[1].endswith(".pem")  # Exclude files like "dh1024.pem"
+                    ),
+                )
+
+                chunks_count = total_file_count
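+                # Delta backups describe their contents with a manifest rather
+                # than a chunk list; stash it with the control file metadata.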
+                control_files_metadata_extra["manifest"] = manifest.jsondict()
+                self.metadata["format"] = BaseBackupFormat.delta_v1
+            else:
+                total_file_count, chunks = self.find_and_split_files_to_backup(
+                    pgdata=pgdata, tablespaces=tablespaces, target_chunk_size=target_chunk_size
+                )
+                chunks_count = len(chunks)
+                # Tar up the chunks and submit them for upload; note that we start from chunk 1 here; chunk 0
+                # is reserved for special files and metadata and will be generated last.
+                chunk_files = self.create_and_upload_chunks(chunks, data_file_format, temp_base_dir)

-            # Tar up the chunks and submit them for upload; note that we start from chunk 1 here; chunk 0
-            # is reserved for special files and metadata and will be generated last.
-            chunk_files = self.create_and_upload_chunks(chunks, data_file_format, temp_base_dir)
+                total_size_plain = sum(item["input_size"] for item in chunk_files)
+                total_size_enc = sum(item["result_size"] for item in chunk_files)
+
+                control_files_metadata_extra["chunks"] = chunk_files

            # Everything is now tarred up, grab the latest pg_control and stop the backup process
            with open(os.path.join(pgdata, "global", "pg_control"), "rb") as fp:
@@ -709,14 +780,16 @@ def run_local_tar_basebackup(self):
                db_conn.commit()
                backup_stopped = True

-            total_size_plain = sum(item["input_size"] for item in chunk_files)
-            total_size_enc = sum(item["result_size"] for item in chunk_files)
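+            # Report duration under a per-mode metric name, e.g.
+            # pghoard.backup_time_delta, so modes can be monitored separately.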
+            backup_time = time.monotonic() - start_time
+            self.metrics.gauge(
+                "pghoard.backup_time_{}".format(self.site_config["basebackup_mode"]),
+                backup_time,
+            )

            self.log.info(
                "Basebackup generation finished, %r files, %r chunks, "
-                "%r byte input, %r byte output, took %r seconds, waiting to upload", total_file_count, len(chunk_files),
-                total_size_plain, total_size_enc,
-                time.monotonic() - start_time
+                "%r byte input, %r byte output, took %r seconds, waiting to upload", total_file_count, chunks_count,
+                total_size_plain, total_size_enc, backup_time
            )

        finally:
@@ -740,13 +813,13 @@ def run_local_tar_basebackup(self):
            "backup_end_wal_segment": backup_end_wal_segment,
            "backup_start_time": backup_start_time,
            "backup_start_wal_segment": backup_start_wal_segment,
-            "chunks": chunk_files,
            "pgdata": pgdata,
            "pghoard_object": "basebackup",
            "pghoard_version": version.__version__,
            "tablespaces": tablespaces,
            "host": socket.gethostname(),
        }
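+        # Mode-specific extras: chunk list for regular local-tar backups,
+        # delta manifest for delta backups.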
+        metadata.update(control_files_metadata_extra)
        control_files = list(
            self.get_control_entries_for_tar(
                metadata=metadata,