
Commit fc6fc12

kaxil authored and Alice Berard committed

[AIRFLOW-3438] Fix default values in BigQuery Hook & BigQueryOperator (apache#4274)

1 parent 4b851e9 commit fc6fc12

File tree: 3 files changed (+92, -9)

airflow/contrib/hooks/bigquery_hook.py (+4, -1)

@@ -566,7 +566,7 @@ def run_query(self,
         :param labels a dictionary containing labels for the job/query,
             passed to BigQuery
         :type labels: dict
-        :param schema_update_options: Allows the schema of the desitination
+        :param schema_update_options: Allows the schema of the destination
             table to be updated as a side effect of the query job.
         :type schema_update_options: tuple
         :param priority: Specifies a priority for the query.
@@ -582,6 +582,9 @@ def run_query(self,
         :type cluster_fields: list of str
         """
 
+        if time_partitioning is None:
+            time_partitioning = {}
+
         if not api_resource_configs:
             api_resource_configs = self.api_resource_configs
         else:
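
The hook-side change follows the standard Python idiom for mutable defaults: default the parameter to None and normalize inside the body. A minimal sketch of that idiom, with a reduced signature that is illustrative rather than the hook's actual API:

# Sketch of the None-default idiom used above; run_query_sketch and its
# reduced signature are illustrative, not the hook's real interface.
def run_query_sketch(sql, time_partitioning=None):
    # A literal `time_partitioning={}` default would be evaluated once at
    # function definition and shared by every call, so mutations would
    # leak across invocations. None is a safe immutable sentinel.
    if time_partitioning is None:
        time_partitioning = {}
    time_partitioning.setdefault('type', 'DAY')
    return sql, time_partitioning

# Each call gets its own fresh dict:
assert run_query_sketch('q1')[1] is not run_query_sketch('q2')[1]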

airflow/contrib/operators/bigquery_operator.py (+5, -7)

@@ -106,13 +106,13 @@ class BigQueryOperator(BaseOperator):
     @apply_defaults
     def __init__(self,
                  sql,
-                 destination_dataset_table=False,
+                 destination_dataset_table=None,
                  write_disposition='WRITE_EMPTY',
                  allow_large_results=False,
                  flatten_results=None,
                  bigquery_conn_id='bigquery_default',
                  delegate_to=None,
-                 udf_config=False,
+                 udf_config=None,
                  use_legacy_sql=True,
                  maximum_billing_tier=None,
                  maximum_bytes_billed=None,
@@ -144,10 +144,8 @@ def __init__(self,
         self.labels = labels
         self.bq_cursor = None
         self.priority = priority
-        if time_partitioning is None:
-            self.time_partitioning = {}
-        if api_resource_configs is None:
-            self.api_resource_configs = {}
+        self.time_partitioning = time_partitioning
+        self.api_resource_configs = api_resource_configs
         self.cluster_fields = cluster_fields
 
     def execute(self, context):
@@ -160,7 +158,7 @@ def execute(self, context):
         conn = hook.get_conn()
         self.bq_cursor = conn.cursor()
         self.bq_cursor.run_query(
-            self.sql,
+            sql=self.sql,
             destination_dataset_table=self.destination_dataset_table,
             write_disposition=self.write_disposition,
             allow_large_results=self.allow_large_results,
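
The removed constructor branches were the actual bug: the attributes were assigned only when the arguments were None, so passing a real value left them unset and any later attribute access failed. A reduced reproduction, with hypothetical class names standing in for the old and new operator:

# Reduced reproduction of the constructor bug fixed above; the class
# names are hypothetical stand-ins, not Airflow classes.
class OperatorBefore:
    def __init__(self, time_partitioning=None):
        if time_partitioning is None:      # attribute assigned only here,
            self.time_partitioning = {}    # never when a value is passed

class OperatorAfter:
    def __init__(self, time_partitioning=None):
        # Store whatever was passed; the hook now normalizes None to {}.
        self.time_partitioning = time_partitioning

broken = OperatorBefore(time_partitioning={'type': 'DAY'})
# broken.time_partitioning  -> AttributeError: attribute was never set
fixed = OperatorAfter(time_partitioning={'type': 'DAY'})
assert fixed.time_partitioning == {'type': 'DAY'}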

tests/contrib/operators/test_bigquery_operator.py (+83, -1)

@@ -21,7 +21,8 @@
 
 from airflow.contrib.operators.bigquery_operator import \
     BigQueryCreateExternalTableOperator, BigQueryCreateEmptyTableOperator, \
-    BigQueryDeleteDatasetOperator, BigQueryCreateEmptyDatasetOperator
+    BigQueryDeleteDatasetOperator, BigQueryCreateEmptyDatasetOperator, \
+    BigQueryOperator
 
 try:
     from unittest import mock
@@ -143,3 +144,84 @@ def test_execute(self, mock_hook):
             project_id=TEST_PROJECT_ID,
             dataset_reference={}
         )
+
+
+class BigQueryOperatorTest(unittest.TestCase):
+    @mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')
+    def test_execute(self, mock_hook):
+        operator = BigQueryOperator(
+            task_id=TASK_ID,
+            sql='Select * from test_table',
+            destination_dataset_table=None,
+            write_disposition='WRITE_EMPTY',
+            allow_large_results=False,
+            flatten_results=None,
+            bigquery_conn_id='bigquery_default',
+            udf_config=None,
+            use_legacy_sql=True,
+            maximum_billing_tier=None,
+            maximum_bytes_billed=None,
+            create_disposition='CREATE_IF_NEEDED',
+            schema_update_options=(),
+            query_params=None,
+            labels=None,
+            priority='INTERACTIVE',
+            time_partitioning=None,
+            api_resource_configs=None,
+            cluster_fields=None,
+        )
+
+        operator.execute(None)
+        mock_hook.return_value \
+            .get_conn() \
+            .cursor() \
+            .run_query \
+            .assert_called_once_with(
+                sql='Select * from test_table',
+                destination_dataset_table=None,
+                write_disposition='WRITE_EMPTY',
+                allow_large_results=False,
+                flatten_results=None,
+                udf_config=None,
+                maximum_billing_tier=None,
+                maximum_bytes_billed=None,
+                create_disposition='CREATE_IF_NEEDED',
+                schema_update_options=(),
+                query_params=None,
+                labels=None,
+                priority='INTERACTIVE',
+                time_partitioning=None,
+                api_resource_configs=None,
+                cluster_fields=None,
+            )
+
+    @mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')
+    def test_bigquery_operator_defaults(self, mock_hook):
+        operator = BigQueryOperator(
+            task_id=TASK_ID,
+            sql='Select * from test_table',
+        )
+
+        operator.execute(None)
+        mock_hook.return_value \
+            .get_conn() \
+            .cursor() \
+            .run_query \
+            .assert_called_once_with(
+                sql='Select * from test_table',
+                destination_dataset_table=None,
+                write_disposition='WRITE_EMPTY',
+                allow_large_results=False,
+                flatten_results=None,
+                udf_config=None,
+                maximum_billing_tier=None,
+                maximum_bytes_billed=None,
+                create_disposition='CREATE_IF_NEEDED',
+                schema_update_options=(),
+                query_params=None,
+                labels=None,
+                priority='INTERACTIVE',
+                time_partitioning=None,
+                api_resource_configs=None,
+                cluster_fields=None,
+            )
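
The chained assertion in these tests works because MagicMock memoizes return values: repeated identical calls return the same child mock, so the `run_query` the test inspects is the very object the operator invoked. A bare sketch of that standard unittest.mock behavior, independent of Airflow:

# Why the chained assertion above works: MagicMock memoizes return
# values, shown here on a bare mock rather than BigQueryHook.
from unittest import mock

m = mock.MagicMock()
m.get_conn().cursor().run_query(sql='q')          # the "operator" side
assert m.get_conn().cursor() is m.get_conn().cursor()
m.get_conn().cursor().run_query.assert_called_once_with(sql='q')  # the "test" side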
