Commit db9bb7f

[AIRFLOW-2981] Fix TypeError in dataflow operators (#3831)
- Fix TypeError in dataflow operators when using GCS jar or py_file
1 parent f279151 commit db9bb7f
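
The code path touched here is the one that stages a GCS-hosted artifact locally before launching the job: DataFlowJavaOperator accepts a gs:// jar and DataFlowPythonOperator a gs:// py_file, and GoogleCloudBucketHelper.google_cloud_to_local downloads that object to a local temp file. A hedged usage sketch with made-up bucket, project and DAG names (none of them taken from this commit):

    # Hedged usage sketch: a gs:// jar (or py_file) is what routes execution
    # through GoogleCloudBucketHelper.google_cloud_to_local before the job starts.
    # DAG id, bucket, jar path and project below are hypothetical.
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

    dag = DAG('dataflow_gcs_jar_example',
              start_date=datetime(2018, 9, 1),
              schedule_interval=None)

    launch_job = DataFlowJavaOperator(
        task_id='launch_wordcount',
        jar='gs://example-bucket/binaries/wordcount.jar',  # GCS jar, downloaded locally first
        options={'project': 'example-gcp-project'},        # hypothetical project
        dag=dag)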

2 files changed: +29 -6 lines changed
airflow/contrib/operators/dataflow_operator.py

+3 -3

@@ -16,7 +16,7 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+import os
 import re
 import uuid
 import copy
@@ -359,7 +359,7 @@ def google_cloud_to_local(self, file_name):
         # Extracts bucket_id and object_id by first removing 'gs://' prefix and
         # then split the remaining by path delimiter '/'.
         path_components = file_name[self.GCS_PREFIX_LENGTH:].split('/')
-        if path_components < 2:
+        if len(path_components) < 2:
             raise Exception(
                 'Invalid Google Cloud Storage (GCS) object path: {}.'
                 .format(file_name))
@@ -370,7 +370,7 @@ def google_cloud_to_local(self, file_name):
                                                 path_components[-1])
         file_size = self._gcs_hook.download(bucket_id, object_id, local_file)

-        if file_size > 0:
+        if os.stat(file_size).st_size > 0:
             return local_file
         raise Exception(
             'Failed to download Google Cloud Storage GCS object: {}'
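
The TypeError named in the commit title comes from the old comparison of a list with an int: path_components is a list, and under Python 3 an ordering comparison between a list and an int raises TypeError instead of quietly evaluating as it did under Python 2. A minimal sketch of the failure and of the corrected length check (the example path is made up):

    # Minimal sketch of the bug fixed above; the example path is hypothetical.
    GCS_PREFIX_LENGTH = len('gs://')  # length of the prefix the helper strips off

    file_name = 'gs://example-bucket/binaries/wordcount.jar'
    path_components = file_name[GCS_PREFIX_LENGTH:].split('/')
    print(path_components)            # ['example-bucket', 'binaries', 'wordcount.jar']

    # Old check: on Python 3 this raises
    # TypeError: '<' not supported between instances of 'list' and 'int'
    try:
        if path_components < 2:
            pass
    except TypeError as exc:
        print(exc)

    # Fixed check from the diff: compare the number of path components instead.
    print(len(path_components) < 2)   # False, so the path is accepted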

tests/contrib/operators/test_dataflow_operator.py

+26 -3

@@ -20,9 +20,10 @@

 import unittest

-from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator, \
-    DataFlowJavaOperator, DataflowTemplateOperator
-from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
+from airflow.contrib.operators.dataflow_operator import \
+    DataFlowPythonOperator, DataFlowJavaOperator, \
+    DataflowTemplateOperator, GoogleCloudBucketHelper
+
 from airflow.version import version

 try:
@@ -186,3 +187,25 @@ def test_exec(self, dataflow_mock):
         }
         start_template_hook.assert_called_once_with(TASK_ID, expected_options,
                                                     PARAMETERS, TEMPLATE)
+
+
+class GoogleCloudBucketHelperTest(unittest.TestCase):
+
+    @mock.patch(
+        'airflow.contrib.operators.dataflow_operator.GoogleCloudBucketHelper.__init__'
+    )
+    def test_invalid_object_path(self, mock_parent_init):
+
+        # This is just the path of a bucket hence invalid filename
+        file_name = 'gs://test-bucket'
+        mock_parent_init.return_value = None
+
+        gcs_bucket_helper = GoogleCloudBucketHelper()
+        gcs_bucket_helper._gcs_hook = mock.Mock()
+
+        with self.assertRaises(Exception) as context:
+            gcs_bucket_helper.google_cloud_to_local(file_name)
+
+        self.assertEquals(
+            'Invalid Google Cloud Storage (GCS) object path: {}.'.format(file_name),
+            str(context.exception))
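
The new test patches GoogleCloudBucketHelper.__init__ so the helper can be constructed in a unit test without real GCP credentials, then attaches a mocked hook by hand. A standalone sketch of that same pattern, with a stand-in Helper class rather than the Airflow one:

    # Standalone sketch of the patched-__init__ pattern used by the new test.
    # Helper is a hypothetical stand-in, not GoogleCloudBucketHelper itself,
    # and the patch target assumes this file is run directly as a script.
    import unittest
    from unittest import mock


    class Helper(object):
        def __init__(self):
            # The real helper would build a GCS hook from a connection here.
            raise RuntimeError('no credentials available in unit tests')


    class PatchedInitTest(unittest.TestCase):

        @mock.patch('__main__.Helper.__init__')
        def test_builds_without_credentials(self, mock_init):
            mock_init.return_value = None   # a patched __init__ must return None
            helper = Helper()               # constructor side effects are skipped
            helper._gcs_hook = mock.Mock()  # attach a fake hook by hand
            self.assertIsInstance(helper, Helper)


    if __name__ == '__main__':
        unittest.main()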
