Skip to content

Commit 469d731

Browse files
ryanyuanashb
authored andcommitted
[AIRFLOW-4931] Add KMS Encryption Configuration to BigQuery Hook and Operators (apache#5567)
Add KMS to BigQuery (cherry picked from commit 682aea2)
1 parent 17159f4 commit 469d731

File tree

6 files changed

+343
-16
lines changed

6 files changed

+343
-16
lines changed

airflow/contrib/hooks/bigquery_hook.py

+78-6
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ def __init__(self,
226226
self.location = location
227227
self.num_retries = num_retries
228228

229+
# pylint: disable=too-many-arguments
229230
def create_empty_table(self,
230231
project_id,
231232
dataset_id,
@@ -235,6 +236,7 @@ def create_empty_table(self,
235236
cluster_fields=None,
236237
labels=None,
237238
view=None,
239+
encryption_configuration=None,
238240
num_retries=None):
239241
"""
240242
Creates a new, empty table in the dataset.
@@ -280,6 +282,13 @@ def create_empty_table(self,
280282
"useLegacySql": False
281283
}
282284
285+
:param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
286+
**Example**: ::
287+
288+
encryption_configuration = {
289+
"kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key"
290+
}
291+
:type encryption_configuration: dict
283292
:return: None
284293
"""
285294

@@ -308,6 +317,9 @@ def create_empty_table(self,
308317
if view:
309318
table_resource['view'] = view
310319

320+
if encryption_configuration:
321+
table_resource["encryptionConfiguration"] = encryption_configuration
322+
311323
num_retries = num_retries if num_retries else self.num_retries
312324

313325
self.log.info('Creating Table %s:%s.%s',
@@ -342,7 +354,8 @@ def create_external_table(self,
342354
allow_quoted_newlines=False,
343355
allow_jagged_rows=False,
344356
src_fmt_configs=None,
345-
labels=None
357+
labels=None,
358+
encryption_configuration=None
346359
):
347360
"""
348361
Creates a new external table in the dataset with the data in Google
@@ -405,6 +418,13 @@ def create_external_table(self,
405418
:type src_fmt_configs: dict
406419
:param labels: a dictionary containing labels for the table, passed to BigQuery
407420
:type labels: dict
421+
:param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
422+
**Example**: ::
423+
424+
encryption_configuration = {
425+
"kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key"
426+
}
427+
:type encryption_configuration: dict
408428
"""
409429

410430
if src_fmt_configs is None:
@@ -508,6 +528,9 @@ def create_external_table(self,
508528
if labels:
509529
table_resource['labels'] = labels
510530

531+
if encryption_configuration:
532+
table_resource["encryptionConfiguration"] = encryption_configuration
533+
511534
try:
512535
self.service.tables().insert(
513536
projectId=project_id,
@@ -535,7 +558,8 @@ def patch_table(self,
535558
schema=None,
536559
time_partitioning=None,
537560
view=None,
538-
require_partition_filter=None):
561+
require_partition_filter=None,
562+
encryption_configuration=None):
539563
"""
540564
Patch information in an existing table.
541565
It only updates fileds that are provided in the request object.
@@ -587,6 +611,13 @@ def patch_table(self,
587611
:param require_partition_filter: [Optional] If true, queries over the this table require a
588612
partition filter. If false, queries over the table
589613
:type require_partition_filter: bool
614+
:param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
615+
**Example**: ::
616+
617+
encryption_configuration = {
618+
"kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key"
619+
}
620+
:type encryption_configuration: dict
590621
591622
"""
592623

@@ -612,6 +643,8 @@ def patch_table(self,
612643
table_resource['view'] = view
613644
if require_partition_filter is not None:
614645
table_resource['requirePartitionFilter'] = require_partition_filter
646+
if encryption_configuration:
647+
table_resource["encryptionConfiguration"] = encryption_configuration
615648

616649
self.log.info('Patching Table %s:%s.%s',
617650
project_id, dataset_id, table_id)
@@ -631,7 +664,7 @@ def patch_table(self,
631664
'BigQuery job failed. Error was: {}'.format(err.content)
632665
)
633666

634-
def run_query(self,
667+
def run_query(self, # pylint: disable=too-many-locals,too-many-arguments
635668
bql=None,
636669
sql=None,
637670
destination_dataset_table=None,
@@ -650,7 +683,8 @@ def run_query(self,
650683
time_partitioning=None,
651684
api_resource_configs=None,
652685
cluster_fields=None,
653-
location=None):
686+
location=None,
687+
encryption_configuration=None):
654688
"""
655689
Executes a BigQuery SQL query. Optionally persists results in a BigQuery
656690
table. See here:
@@ -725,6 +759,13 @@ def run_query(self,
725759
US and EU. See details at
726760
https://cloud.google.com/bigquery/docs/locations#specifying_your_location
727761
:type location: str
762+
:param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
763+
**Example**: ::
764+
765+
encryption_configuration = {
766+
"kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key"
767+
}
768+
:type encryption_configuration: dict
728769
"""
729770

730771
if time_partitioning is None:
@@ -867,6 +908,11 @@ def run_query(self,
867908
'labels', labels, configuration)
868909
configuration['labels'] = labels
869910

911+
if encryption_configuration:
912+
configuration["query"][
913+
"destinationEncryptionConfiguration"
914+
] = encryption_configuration
915+
870916
return self.run_with_configuration(configuration)
871917

872918
def run_extract( # noqa
@@ -942,7 +988,8 @@ def run_copy(self,
942988
destination_project_dataset_table,
943989
write_disposition='WRITE_EMPTY',
944990
create_disposition='CREATE_IF_NEEDED',
945-
labels=None):
991+
labels=None,
992+
encryption_configuration=None):
946993
"""
947994
Executes a BigQuery copy command to copy data from one BigQuery table
948995
to another. See here:
@@ -968,6 +1015,13 @@ def run_copy(self,
9681015
:param labels: a dictionary containing labels for the job/query,
9691016
passed to BigQuery
9701017
:type labels: dict
1018+
:param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
1019+
**Example**: ::
1020+
1021+
encryption_configuration = {
1022+
"kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key"
1023+
}
1024+
:type encryption_configuration: dict
9711025
"""
9721026
source_project_dataset_tables = ([
9731027
source_project_dataset_tables
@@ -1008,6 +1062,11 @@ def run_copy(self,
10081062
if labels:
10091063
configuration['labels'] = labels
10101064

1065+
if encryption_configuration:
1066+
configuration["copy"][
1067+
"destinationEncryptionConfiguration"
1068+
] = encryption_configuration
1069+
10111070
return self.run_with_configuration(configuration)
10121071

10131072
def run_load(self,
@@ -1028,7 +1087,8 @@ def run_load(self,
10281087
src_fmt_configs=None,
10291088
time_partitioning=None,
10301089
cluster_fields=None,
1031-
autodetect=False):
1090+
autodetect=False,
1091+
encryption_configuration=None):
10321092
"""
10331093
Executes a BigQuery load command to load data from Google Cloud Storage
10341094
to BigQuery. See here:
@@ -1098,6 +1158,13 @@ def run_load(self,
10981158
by one or more columns. This is only available in combination with
10991159
time_partitioning. The order of columns given determines the sort order.
11001160
:type cluster_fields: list[str]
1161+
:param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
1162+
**Example**: ::
1163+
1164+
encryption_configuration = {
1165+
"kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key"
1166+
}
1167+
:type encryption_configuration: dict
11011168
"""
11021169

11031170
# bigquery only allows certain source formats
@@ -1189,6 +1256,11 @@ def run_load(self,
11891256
if max_bad_records:
11901257
configuration['load']['maxBadRecords'] = max_bad_records
11911258

1259+
if encryption_configuration:
1260+
configuration["load"][
1261+
"destinationEncryptionConfiguration"
1262+
] = encryption_configuration
1263+
11921264
# if following fields are not specified in src_fmt_configs,
11931265
# honor the top-level params for backward-compatibility
11941266
if 'skipLeadingRows' not in src_fmt_configs:

airflow/contrib/operators/bigquery_operator.py

+33-3
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,13 @@ class BigQueryOperator(BaseOperator):
130130
US and EU. See details at
131131
https://cloud.google.com/bigquery/docs/locations#specifying_your_location
132132
:type location: str
133+
:param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
134+
**Example**: ::
135+
136+
encryption_configuration = {
137+
"kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key"
138+
}
139+
:type encryption_configuration: dict
133140
"""
134141

135142
template_fields = ('bql', 'sql', 'destination_dataset_table', 'labels')
@@ -140,7 +147,7 @@ class BigQueryOperator(BaseOperator):
140147
BigQueryConsoleLink(),
141148
)
142149

143-
# pylint: disable=too-many-arguments
150+
# pylint: disable=too-many-arguments, too-many-locals
144151
@apply_defaults
145152
def __init__(self,
146153
bql=None,
@@ -164,6 +171,7 @@ def __init__(self,
164171
api_resource_configs=None,
165172
cluster_fields=None,
166173
location=None,
174+
encryption_configuration=None,
167175
*args,
168176
**kwargs):
169177
super(BigQueryOperator, self).__init__(*args, **kwargs)
@@ -189,6 +197,7 @@ def __init__(self,
189197
self.api_resource_configs = api_resource_configs
190198
self.cluster_fields = cluster_fields
191199
self.location = location
200+
self.encryption_configuration = encryption_configuration
192201

193202
# TODO remove `bql` in Airflow 2.0
194203
if self.bql:
@@ -233,6 +242,7 @@ def execute(self, context):
233242
time_partitioning=self.time_partitioning,
234243
api_resource_configs=self.api_resource_configs,
235244
cluster_fields=self.cluster_fields,
245+
encryption_configuration=self.encryption_configuration
236246
)
237247
elif isinstance(self.sql, Iterable):
238248
job_id = [
@@ -253,6 +263,7 @@ def execute(self, context):
253263
time_partitioning=self.time_partitioning,
254264
api_resource_configs=self.api_resource_configs,
255265
cluster_fields=self.cluster_fields,
266+
encryption_configuration=self.encryption_configuration
256267
)
257268
for s in self.sql]
258269
else:
@@ -354,7 +365,13 @@ class BigQueryCreateEmptyTableOperator(BaseOperator):
354365
google_cloud_storage_conn_id='airflow-service-account'
355366
)
356367
:type labels: dict
368+
:param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
369+
**Example**: ::
357370
371+
encryption_configuration = {
372+
"kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key"
373+
}
374+
:type encryption_configuration: dict
358375
"""
359376
template_fields = ('dataset_id', 'table_id', 'project_id',
360377
'gcs_schema_object', 'labels')
@@ -373,6 +390,7 @@ def __init__(self,
373390
google_cloud_storage_conn_id='google_cloud_default',
374391
delegate_to=None,
375392
labels=None,
393+
encryption_configuration=None,
376394
*args, **kwargs):
377395

378396
super(BigQueryCreateEmptyTableOperator, self).__init__(*args, **kwargs)
@@ -387,6 +405,7 @@ def __init__(self,
387405
self.delegate_to = delegate_to
388406
self.time_partitioning = {} if time_partitioning is None else time_partitioning
389407
self.labels = labels
408+
self.encryption_configuration = encryption_configuration
390409

391410
def execute(self, context):
392411
bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
@@ -414,7 +433,8 @@ def execute(self, context):
414433
table_id=self.table_id,
415434
schema_fields=schema_fields,
416435
time_partitioning=self.time_partitioning,
417-
labels=self.labels
436+
labels=self.labels,
437+
encryption_configuration=self.encryption_configuration
418438
)
419439

420440

@@ -490,6 +510,13 @@ class BigQueryCreateExternalTableOperator(BaseOperator):
490510
:type src_fmt_configs: dict
491511
:param labels: a dictionary containing labels for the table, passed to BigQuery
492512
:type labels: dict
513+
:param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
514+
**Example**: ::
515+
516+
encryption_configuration = {
517+
"kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key"
518+
}
519+
:type encryption_configuration: dict
493520
"""
494521
template_fields = ('bucket', 'source_objects',
495522
'schema_object', 'destination_project_dataset_table', 'labels')
@@ -516,6 +543,7 @@ def __init__(self,
516543
delegate_to=None,
517544
src_fmt_configs=None,
518545
labels=None,
546+
encryption_configuration=None,
519547
*args, **kwargs):
520548

521549
super(BigQueryCreateExternalTableOperator, self).__init__(*args, **kwargs)
@@ -543,6 +571,7 @@ def __init__(self,
543571

544572
self.src_fmt_configs = src_fmt_configs if src_fmt_configs is not None else dict()
545573
self.labels = labels
574+
self.encryption_configuration = encryption_configuration
546575

547576
def execute(self, context):
548577
bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
@@ -577,7 +606,8 @@ def execute(self, context):
577606
allow_quoted_newlines=self.allow_quoted_newlines,
578607
allow_jagged_rows=self.allow_jagged_rows,
579608
src_fmt_configs=self.src_fmt_configs,
580-
labels=self.labels
609+
labels=self.labels,
610+
encryption_configuration=self.encryption_configuration
581611
)
582612

583613

airflow/contrib/operators/bigquery_to_bigquery.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@ class BigQueryToBigQueryOperator(BaseOperator):
5252
:param labels: a dictionary containing labels for the job/query,
5353
passed to BigQuery
5454
:type labels: dict
55+
:param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
56+
**Example**: ::
57+
58+
encryption_configuration = {
59+
"kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key"
60+
}
61+
:type encryption_configuration: dict
5562
"""
5663
template_fields = ('source_project_dataset_tables',
5764
'destination_project_dataset_table', 'labels')
@@ -67,6 +74,7 @@ def __init__(self,
6774
bigquery_conn_id='bigquery_default',
6875
delegate_to=None,
6976
labels=None,
77+
encryption_configuration=None,
7078
*args,
7179
**kwargs):
7280
super(BigQueryToBigQueryOperator, self).__init__(*args, **kwargs)
@@ -77,6 +85,7 @@ def __init__(self,
7785
self.bigquery_conn_id = bigquery_conn_id
7886
self.delegate_to = delegate_to
7987
self.labels = labels
88+
self.encryption_configuration = encryption_configuration
8089

8190
def execute(self, context):
8291
self.log.info(
@@ -92,4 +101,5 @@ def execute(self, context):
92101
destination_project_dataset_table=self.destination_project_dataset_table,
93102
write_disposition=self.write_disposition,
94103
create_disposition=self.create_disposition,
95-
labels=self.labels)
104+
labels=self.labels,
105+
encryption_configuration=self.encryption_configuration)

0 commit comments

Comments
 (0)