
Commit adc19f9

XD-DENG authored and Alice Berard committed
[AIRFLOW-2916] Arg verify for AwsHook() & S3 sensors/operators (apache#3764)
This adds an optional `verify` argument, which is useful when 1. users want to use a CA cert bundle other than the one used by botocore, or 2. users want the equivalent of '--no-verify-ssl'. This is especially useful with on-premises S3 or other object storage implementations, such as IBM's Cloud Object Storage. The default value is `None`, which is also the default in boto3, so backward compatibility is preserved. Reference: https://boto3.readthedocs.io/en/latest/reference/core/session.html
1 parent: f87de81 · commit: adc19f9

11 files changed (+133, -15)
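In short, the commit threads a new `verify` keyword from the hooks, operators and sensors down to boto3. A usage sketch (the connection IDs and bundle path below are hypothetical examples, not part of the commit):

    from airflow.hooks.S3_hook import S3Hook

    # Default: verify=None falls through to boto3's normal certificate checks,
    # so existing code behaves exactly as before.
    hook = S3Hook(aws_conn_id='aws_default')

    # Equivalent of '--no-verify-ssl', e.g. for an on-premises S3 endpoint
    # with a self-signed certificate:
    hook = S3Hook(aws_conn_id='my_onprem_s3', verify=False)

    # Or verify against a custom CA cert bundle instead of botocore's:
    hook = S3Hook(aws_conn_id='my_onprem_s3', verify='/etc/ssl/certs/onprem-ca.pem')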

airflow/contrib/hooks/aws_hook.py

+6 -3
@@ -84,8 +84,9 @@ class AwsHook(BaseHook):
     This class is a thin wrapper around the boto3 python library.
     """
 
-    def __init__(self, aws_conn_id='aws_default'):
+    def __init__(self, aws_conn_id='aws_default', verify=None):
         self.aws_conn_id = aws_conn_id
+        self.verify = verify
 
     def _get_credentials(self, region_name):
         aws_access_key_id = None
@@ -162,12 +163,14 @@ def _get_credentials(self, region_name):
     def get_client_type(self, client_type, region_name=None):
         session, endpoint_url = self._get_credentials(region_name)
 
-        return session.client(client_type, endpoint_url=endpoint_url)
+        return session.client(client_type, endpoint_url=endpoint_url,
+                              verify=self.verify)
 
     def get_resource_type(self, resource_type, region_name=None):
         session, endpoint_url = self._get_credentials(region_name)
 
-        return session.resource(resource_type, endpoint_url=endpoint_url)
+        return session.resource(resource_type, endpoint_url=endpoint_url,
+                                verify=self.verify)
 
     def get_session(self, region_name=None):
         """Get the underlying boto3.session."""

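With the hook change above, every boto3 client or resource the hook creates inherits its `verify` setting. A minimal sketch of the effect (connection ID and bundle path are illustrative):

    from airflow.contrib.hooks.aws_hook import AwsHook

    hook = AwsHook(aws_conn_id='aws_default', verify='/etc/ssl/certs/onprem-ca.pem')
    ec2_client = hook.get_client_type('ec2')    # boto3 client built with verify=...
    s3_resource = hook.get_resource_type('s3')  # boto3 resource built with verify=...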
airflow/contrib/operators/gcs_to_s3.py

+13 -1
@@ -47,6 +47,16 @@ class GoogleCloudStorageToS3Operator(GoogleCloudStorageListOperator):
     :type dest_aws_conn_id: str
     :param dest_s3_key: The base S3 key to be used to store the files. (templated)
     :type dest_s3_key: str
+    :param dest_verify: Whether or not to verify SSL certificates for the S3 connection.
+        By default SSL certificates are verified.
+        You can provide the following values:
+        - False: do not validate SSL certificates. SSL will still be used
+          (unless use_ssl is False), but SSL certificates will not be
+          verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+          You can specify this argument if you want to use a different
+          CA cert bundle than the one used by botocore.
+    :type dest_verify: bool or str
     """
     template_fields = ('bucket', 'prefix', 'delimiter', 'dest_s3_key')
     ui_color = '#f0eee4'
@@ -60,6 +70,7 @@ def __init__(self,
                 delegate_to=None,
                 dest_aws_conn_id=None,
                 dest_s3_key=None,
+                dest_verify=None,
                 replace=False,
                 *args,
                 **kwargs):
@@ -75,12 +86,13 @@ def __init__(self,
         )
         self.dest_aws_conn_id = dest_aws_conn_id
         self.dest_s3_key = dest_s3_key
+        self.dest_verify = dest_verify
         self.replace = replace
 
     def execute(self, context):
         # use the super to list all files in a Google Cloud Storage bucket
         files = super(GoogleCloudStorageToS3Operator, self).execute(context)
-        s3_hook = S3Hook(aws_conn_id=self.dest_aws_conn_id)
+        s3_hook = S3Hook(aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify)
 
         if not self.replace:
             # if we are not replacing -> list all files in the S3 bucket

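In this operator only the destination side talks to S3, hence the `dest_verify` name. A hedged task-definition sketch (all values illustrative):

    gcs_to_s3 = GoogleCloudStorageToS3Operator(
        task_id='gcs_to_onprem_s3',
        bucket='my-gcs-bucket',
        dest_aws_conn_id='my_onprem_s3',
        dest_s3_key='s3://my-bucket/data/',
        dest_verify=False,  # skip certificate checks for a self-signed endpoint
    )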
airflow/contrib/operators/s3_list_operator.py

+13 -1
@@ -38,6 +38,16 @@ class S3ListOperator(BaseOperator):
     :type delimiter: string
     :param aws_conn_id: The connection ID to use when connecting to S3 storage.
     :type aws_conn_id: string
+    :param verify: Whether or not to verify SSL certificates for the S3 connection.
+        By default SSL certificates are verified.
+        You can provide the following values:
+        - False: do not validate SSL certificates. SSL will still be used
+          (unless use_ssl is False), but SSL certificates will not be
+          verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+          You can specify this argument if you want to use a different
+          CA cert bundle than the one used by botocore.
+    :type verify: bool or str
 
     **Example**:
         The following operator would list all the files
@@ -61,16 +71,18 @@ def __init__(self,
                 prefix='',
                 delimiter='',
                 aws_conn_id='aws_default',
+                verify=None,
                 *args,
                 **kwargs):
         super(S3ListOperator, self).__init__(*args, **kwargs)
         self.bucket = bucket
         self.prefix = prefix
         self.delimiter = delimiter
         self.aws_conn_id = aws_conn_id
+        self.verify = verify
 
     def execute(self, context):
-        hook = S3Hook(aws_conn_id=self.aws_conn_id)
+        hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
 
         self.log.info(
             'Getting the list of files from bucket: {0} in prefix: {1} (Delimiter {2})'.

airflow/contrib/operators/s3_to_gcs_operator.py

+13 -1
@@ -41,6 +41,16 @@ class S3ToGoogleCloudStorageOperator(S3ListOperator):
     :type delimiter: string
     :param aws_conn_id: The source S3 connection
     :type aws_conn_id: string
+    :param verify: Whether or not to verify SSL certificates for the S3 connection.
+        By default SSL certificates are verified.
+        You can provide the following values:
+        - False: do not validate SSL certificates. SSL will still be used
+          (unless use_ssl is False), but SSL certificates will not be
+          verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+          You can specify this argument if you want to use a different
+          CA cert bundle than the one used by botocore.
+    :type verify: bool or str
     :param dest_gcs_conn_id: The destination connection ID to use
         when connecting to Google Cloud Storage.
     :type dest_gcs_conn_id: string
@@ -80,6 +90,7 @@ def __init__(self,
                 prefix='',
                 delimiter='',
                 aws_conn_id='aws_default',
+                verify=None,
                 dest_gcs_conn_id=None,
                 dest_gcs=None,
                 delegate_to=None,
@@ -98,6 +109,7 @@ def __init__(self,
         self.dest_gcs = dest_gcs
         self.delegate_to = delegate_to
         self.replace = replace
+        self.verify = verify
 
         if dest_gcs and not self._gcs_object_is_directory(self.dest_gcs):
             self.log.info(
@@ -146,7 +158,7 @@ def execute(self, context):
                 'There are no new files to sync. Have a nice day!')
 
         if files:
-            hook = S3Hook(aws_conn_id=self.aws_conn_id)
+            hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
 
             for file in files:
                 # GCS hook builds its own in-memory file so we have to create

airflow/operators/redshift_to_s3_operator.py

+13 -1
@@ -39,6 +39,16 @@ class RedshiftToS3Transfer(BaseOperator):
     :type redshift_conn_id: string
     :param aws_conn_id: reference to a specific S3 connection
     :type aws_conn_id: string
+    :param verify: Whether or not to verify SSL certificates for the S3 connection.
+        By default SSL certificates are verified.
+        You can provide the following values:
+        - False: do not validate SSL certificates. SSL will still be used
+          (unless use_ssl is False), but SSL certificates will not be
+          verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+          You can specify this argument if you want to use a different
+          CA cert bundle than the one used by botocore.
+    :type verify: bool or str
     :param unload_options: reference to a list of UNLOAD options
     :type unload_options: list
     """
@@ -56,6 +66,7 @@ def __init__(
             s3_key,
             redshift_conn_id='redshift_default',
             aws_conn_id='aws_default',
+            verify=None,
             unload_options=tuple(),
             autocommit=False,
             parameters=None,
@@ -68,6 +79,7 @@ def __init__(
         self.s3_key = s3_key
         self.redshift_conn_id = redshift_conn_id
         self.aws_conn_id = aws_conn_id
+        self.verify = verify
         self.unload_options = unload_options
         self.autocommit = autocommit
         self.parameters = parameters
@@ -79,7 +91,7 @@ def __init__(
 
     def execute(self, context):
         self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
-        self.s3 = S3Hook(aws_conn_id=self.aws_conn_id)
+        self.s3 = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
         credentials = self.s3.get_credentials()
         unload_options = '\n\t\t\t'.join(self.unload_options)

airflow/operators/s3_file_transform_operator.py

+19 -2
@@ -47,6 +47,17 @@ class S3FileTransformOperator(BaseOperator):
     :type source_s3_key: str
     :param source_aws_conn_id: source s3 connection
     :type source_aws_conn_id: str
+    :param source_verify: Whether or not to verify SSL certificates for the S3 connection.
+        By default SSL certificates are verified.
+        You can provide the following values:
+        - False: do not validate SSL certificates. SSL will still be used
+          (unless use_ssl is False), but SSL certificates will not be
+          verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+          You can specify this argument if you want to use a different
+          CA cert bundle than the one used by botocore.
+        This is also applicable to ``dest_verify``.
+    :type source_verify: bool or str
     :param dest_s3_key: The key to be written from S3. (templated)
     :type dest_s3_key: str
     :param dest_aws_conn_id: destination s3 connection
@@ -71,14 +82,18 @@ def __init__(
             transform_script=None,
             select_expression=None,
             source_aws_conn_id='aws_default',
+            source_verify=None,
             dest_aws_conn_id='aws_default',
+            dest_verify=None,
             replace=False,
             *args, **kwargs):
         super(S3FileTransformOperator, self).__init__(*args, **kwargs)
         self.source_s3_key = source_s3_key
         self.source_aws_conn_id = source_aws_conn_id
+        self.source_verify = source_verify
         self.dest_s3_key = dest_s3_key
         self.dest_aws_conn_id = dest_aws_conn_id
+        self.dest_verify = dest_verify
         self.replace = replace
         self.transform_script = transform_script
         self.select_expression = select_expression
@@ -88,8 +103,10 @@ def execute(self, context):
             raise AirflowException(
                 "Either transform_script or select_expression must be specified")
 
-        source_s3 = S3Hook(aws_conn_id=self.source_aws_conn_id)
-        dest_s3 = S3Hook(aws_conn_id=self.dest_aws_conn_id)
+        source_s3 = S3Hook(aws_conn_id=self.source_aws_conn_id,
+                           verify=self.source_verify)
+        dest_s3 = S3Hook(aws_conn_id=self.dest_aws_conn_id,
+                         verify=self.dest_verify)
 
         self.log.info("Downloading source S3 file %s", self.source_s3_key)
         if not source_s3.check_for_key(self.source_s3_key):

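Because S3FileTransformOperator reads from one S3 connection and writes to another, it grows a pair of settings, `source_verify` and `dest_verify`, which may differ, e.g. AWS S3 on the source side and a self-signed on-premises store on the destination. A sketch with illustrative values:

    transform = S3FileTransformOperator(
        task_id='transform_file',
        source_s3_key='s3://source-bucket/in.csv',
        source_aws_conn_id='aws_default',
        source_verify=None,  # keep boto3's default verification
        dest_s3_key='s3://dest-bucket/out.csv',
        dest_aws_conn_id='my_onprem_s3',
        dest_verify='/etc/ssl/certs/onprem-ca.pem',
        transform_script='/usr/local/bin/transform.py',
    )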
airflow/operators/s3_to_hive_operator.py

+13 -1
@@ -78,6 +78,16 @@ class S3ToHiveTransfer(BaseOperator):
     :type delimiter: str
     :param aws_conn_id: source s3 connection
     :type aws_conn_id: str
+    :param verify: Whether or not to verify SSL certificates for the S3 connection.
+        By default SSL certificates are verified.
+        You can provide the following values:
+        - False: do not validate SSL certificates. SSL will still be used
+          (unless use_ssl is False), but SSL certificates will not be
+          verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+          You can specify this argument if you want to use a different
+          CA cert bundle than the one used by botocore.
+    :type verify: bool or str
     :param hive_cli_conn_id: destination hive connection
     :type hive_cli_conn_id: str
     :param input_compressed: Boolean to determine if file decompression is
@@ -107,6 +117,7 @@ def __init__(
             check_headers=False,
             wildcard_match=False,
             aws_conn_id='aws_default',
+            verify=None,
             hive_cli_conn_id='hive_cli_default',
             input_compressed=False,
             tblproperties=None,
@@ -125,6 +136,7 @@ def __init__(
         self.wildcard_match = wildcard_match
         self.hive_cli_conn_id = hive_cli_conn_id
         self.aws_conn_id = aws_conn_id
+        self.verify = verify
         self.input_compressed = input_compressed
         self.tblproperties = tblproperties
         self.select_expression = select_expression
@@ -136,7 +148,7 @@ def __init__(
 
     def execute(self, context):
         # Downloading file from S3
-        self.s3 = S3Hook(aws_conn_id=self.aws_conn_id)
+        self.s3 = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
         self.hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
         self.log.info("Downloading S3 file")

airflow/operators/s3_to_redshift_operator.py

+13 -1
@@ -39,6 +39,16 @@ class S3ToRedshiftTransfer(BaseOperator):
     :type redshift_conn_id: string
     :param aws_conn_id: reference to a specific S3 connection
     :type aws_conn_id: string
+    :param verify: Whether or not to verify SSL certificates for the S3 connection.
+        By default SSL certificates are verified.
+        You can provide the following values:
+        - False: do not validate SSL certificates. SSL will still be used
+          (unless use_ssl is False), but SSL certificates will not be
+          verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+          You can specify this argument if you want to use a different
+          CA cert bundle than the one used by botocore.
+    :type verify: bool or str
     :param copy_options: reference to a list of COPY options
     :type copy_options: list
     """
@@ -56,6 +66,7 @@ def __init__(
             s3_key,
             redshift_conn_id='redshift_default',
             aws_conn_id='aws_default',
+            verify=None,
             copy_options=tuple(),
             autocommit=False,
             parameters=None,
@@ -67,13 +78,14 @@ def __init__(
         self.s3_key = s3_key
         self.redshift_conn_id = redshift_conn_id
         self.aws_conn_id = aws_conn_id
+        self.verify = verify
         self.copy_options = copy_options
         self.autocommit = autocommit
         self.parameters = parameters
 
     def execute(self, context):
         self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
-        self.s3 = S3Hook(aws_conn_id=self.aws_conn_id)
+        self.s3 = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
         credentials = self.s3.get_credentials()
         copy_options = '\n\t\t\t'.join(self.copy_options)

airflow/sensors/s3_key_sensor.py

+13 -1
@@ -43,6 +43,16 @@ class S3KeySensor(BaseSensorOperator):
     :type wildcard_match: bool
     :param aws_conn_id: a reference to the s3 connection
     :type aws_conn_id: str
+    :param verify: Whether or not to verify SSL certificates for the S3 connection.
+        By default SSL certificates are verified.
+        You can provide the following values:
+        - False: do not validate SSL certificates. SSL will still be used
+          (unless use_ssl is False), but SSL certificates will not be
+          verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+          You can specify this argument if you want to use a different
+          CA cert bundle than the one used by botocore.
+    :type verify: bool or str
     """
     template_fields = ('bucket_key', 'bucket_name')
 
@@ -52,6 +62,7 @@ def __init__(self,
                 bucket_name=None,
                 wildcard_match=False,
                 aws_conn_id='aws_default',
+                verify=None,
                 *args,
                 **kwargs):
         super(S3KeySensor, self).__init__(*args, **kwargs)
@@ -76,10 +87,11 @@ def __init__(self,
         self.bucket_key = bucket_key
         self.wildcard_match = wildcard_match
         self.aws_conn_id = aws_conn_id
+        self.verify = verify
 
     def poke(self, context):
         from airflow.hooks.S3_hook import S3Hook
-        hook = S3Hook(aws_conn_id=self.aws_conn_id)
+        hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
         full_url = "s3://" + self.bucket_name + "/" + self.bucket_key
         self.log.info('Poking for key : {full_url}'.format(**locals()))
         if self.wildcard_match:

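The sensor gets the same treatment, e.g. poking an object store that presents a private CA (values are hypothetical):

    wait_for_key = S3KeySensor(
        task_id='wait_for_key',
        bucket_name='my-bucket',
        bucket_key='data/part-*',
        wildcard_match=True,
        aws_conn_id='my_onprem_s3',
        verify='/etc/ssl/certs/onprem-ca.pem',
    )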