Skip to content

Commit 5be025d

Browse files
jeffkpaynekaxil
authored andcommitted
[AIRFLOW-3002] Fix variable & tests in GoogleCloudBucketHelper (#3843)
1 parent 1f88a4c commit 5be025d

File tree

2 files changed

+55
-7
lines changed

2 files changed

+55
-7
lines changed

airflow/contrib/operators/dataflow_operator.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -361,17 +361,17 @@ def google_cloud_to_local(self, file_name):
361361
path_components = file_name[self.GCS_PREFIX_LENGTH:].split('/')
362362
if len(path_components) < 2:
363363
raise Exception(
364-
'Invalid Google Cloud Storage (GCS) object path: {}.'
364+
'Invalid Google Cloud Storage (GCS) object path: {}'
365365
.format(file_name))
366366

367367
bucket_id = path_components[0]
368368
object_id = '/'.join(path_components[1:])
369369
local_file = '/tmp/dataflow{}-{}'.format(str(uuid.uuid4())[:8],
370370
path_components[-1])
371-
file_size = self._gcs_hook.download(bucket_id, object_id, local_file)
371+
self._gcs_hook.download(bucket_id, object_id, local_file)
372372

373-
if os.stat(file_size).st_size > 0:
373+
if os.stat(local_file).st_size > 0:
374374
return local_file
375375
raise Exception(
376-
'Failed to download Google Cloud Storage GCS object: {}'
376+
'Failed to download Google Cloud Storage (GCS) object: {}'
377377
.format(file_name))

tests/contrib/operators/test_dataflow_operator.py

+51-3
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
# to you under the Apache License, Version 2.0 (the
88
# "License"); you may not use this file except in compliance
99
# with the License. You may obtain a copy of the License at
10-
#
10+
#
1111
# http://www.apache.org/licenses/LICENSE-2.0
12-
#
12+
#
1313
# Unless required by applicable law or agreed to in writing,
1414
# software distributed under the License is distributed on an
1515
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -207,5 +207,53 @@ def test_invalid_object_path(self, mock_parent_init):
207207
gcs_bucket_helper.google_cloud_to_local(file_name)
208208

209209
self.assertEquals(
210-
'Invalid Google Cloud Storage (GCS) object path: {}.'.format(file_name),
210+
'Invalid Google Cloud Storage (GCS) object path: {}'.format(file_name),
211+
str(context.exception))
212+
213+
@mock.patch(
214+
'airflow.contrib.operators.dataflow_operator.GoogleCloudBucketHelper.__init__'
215+
)
216+
def test_valid_object(self, mock_parent_init):
217+
218+
file_name = 'gs://test-bucket/path/to/obj.jar'
219+
mock_parent_init.return_value = None
220+
221+
gcs_bucket_helper = GoogleCloudBucketHelper()
222+
gcs_bucket_helper._gcs_hook = mock.Mock()
223+
224+
def _mock_download(bucket, object, filename=None):
225+
text_file_contents = 'text file contents'
226+
with open(filename, 'w') as text_file:
227+
text_file.write(text_file_contents)
228+
return text_file_contents
229+
230+
gcs_bucket_helper._gcs_hook.download.side_effect = _mock_download
231+
232+
local_file = gcs_bucket_helper.google_cloud_to_local(file_name)
233+
self.assertIn('obj.jar', local_file)
234+
235+
@mock.patch(
236+
'airflow.contrib.operators.dataflow_operator.GoogleCloudBucketHelper.__init__'
237+
)
238+
def test_empty_object(self, mock_parent_init):
239+
240+
file_name = 'gs://test-bucket/path/to/obj.jar'
241+
mock_parent_init.return_value = None
242+
243+
gcs_bucket_helper = GoogleCloudBucketHelper()
244+
gcs_bucket_helper._gcs_hook = mock.Mock()
245+
246+
def _mock_download(bucket, object, filename=None):
247+
text_file_contents = ''
248+
with open(filename, 'w') as text_file:
249+
text_file.write(text_file_contents)
250+
return text_file_contents
251+
252+
gcs_bucket_helper._gcs_hook.download.side_effect = _mock_download
253+
254+
with self.assertRaises(Exception) as context:
255+
gcs_bucket_helper.google_cloud_to_local(file_name)
256+
257+
self.assertEquals(
258+
'Failed to download Google Cloud Storage (GCS) object: {}'.format(file_name),
211259
str(context.exception))

0 commit comments

Comments
 (0)