@@ -592,9 +592,11 @@ def run_query(self,
592
592
}
593
593
594
594
if destination_dataset_table :
595
- assert '.' in destination_dataset_table , (
596
- 'Expected destination_dataset_table in the format of '
597
- '<dataset>.<table>. Got: {}' ).format (destination_dataset_table )
595
+ if '.' not in destination_dataset_table :
596
+ raise ValueError (
597
+ 'Expected destination_dataset_table name in the format of '
598
+ '<dataset>.<table>. Got: {}' .format (
599
+ destination_dataset_table ))
598
600
destination_project , destination_dataset , destination_table = \
599
601
_split_tablename (table_input = destination_dataset_table ,
600
602
default_project_id = self .project_id )
@@ -610,7 +612,9 @@ def run_query(self,
610
612
}
611
613
})
612
614
if udf_config :
613
- assert isinstance (udf_config , list )
615
+ if not isinstance (udf_config , list ):
616
+ raise TypeError ("udf_config argument must have a type 'list'"
617
+ " not {}" .format (type (udf_config )))
614
618
configuration ['query' ].update ({
615
619
'userDefinedFunctionResources' : udf_config
616
620
})
@@ -1153,10 +1157,10 @@ def run_table_delete(self, deletion_dataset_table,
1153
1157
:type ignore_if_missing: boolean
1154
1158
:return:
1155
1159
"""
1156
-
1157
- assert '.' in deletion_dataset_table , (
1158
- 'Expected deletion_dataset_table in the format of '
1159
- '<dataset>.<table>. Got: {}' ) .format (deletion_dataset_table )
1160
+ if '.' not in deletion_dataset_table :
1161
+ raise ValueError (
1162
+ 'Expected deletion_dataset_table name in the format of '
1163
+ '<dataset>.<table>. Got: {}' .format (deletion_dataset_table ) )
1160
1164
deletion_project , deletion_dataset , deletion_table = \
1161
1165
_split_tablename (table_input = deletion_dataset_table ,
1162
1166
default_project_id = self .project_id )
@@ -1284,14 +1288,10 @@ def run_grant_dataset_view_access(self,
1284
1288
# if view is already in access, do nothing.
1285
1289
self .log .info (
1286
1290
'Table %s:%s.%s already has authorized view access to %s:%s dataset.' ,
1287
- view_project , view_dataset , view_table , source_project ,
1288
- source_dataset )
1291
+ view_project , view_dataset , view_table , source_project , source_dataset )
1289
1292
return source_dataset_resource
1290
1293
1291
- def delete_dataset (self ,
1292
- project_id ,
1293
- dataset_id
1294
- ):
1294
+ def delete_dataset (self , project_id , dataset_id ):
1295
1295
"""
1296
1296
Delete a dataset of Big query in your project.
1297
1297
:param project_id: The name of the project where we have the dataset .
@@ -1308,9 +1308,8 @@ def delete_dataset(self,
1308
1308
self .service .datasets ().delete (
1309
1309
projectId = project_id ,
1310
1310
datasetId = dataset_id ).execute ()
1311
-
1312
- self .log .info ('Dataset deleted successfully: In project %s Dataset %s' ,
1313
- project_id , dataset_id )
1311
+ self .log .info ('Dataset deleted successfully: In project %s '
1312
+ 'Dataset %s' , project_id , dataset_id )
1314
1313
1315
1314
except HttpError as err :
1316
1315
raise AirflowException (
@@ -1518,14 +1517,17 @@ def _bq_cast(string_field, bq_type):
1518
1517
elif bq_type == 'FLOAT' or bq_type == 'TIMESTAMP' :
1519
1518
return float (string_field )
1520
1519
elif bq_type == 'BOOLEAN' :
1521
- assert string_field in set (['true' , 'false' ])
1520
+ if string_field not in ['true' , 'false' ]:
1521
+ raise ValueError ("{} must have value 'true' or 'false'" .format (
1522
+ string_field ))
1522
1523
return string_field == 'true'
1523
1524
else :
1524
1525
return string_field
1525
1526
1526
1527
1527
1528
def _split_tablename (table_input , default_project_id , var_name = None ):
1528
- assert default_project_id is not None , "INTERNAL: No default project is specified"
1529
+ if not default_project_id :
1530
+ raise ValueError ("INTERNAL: No default project is specified" )
1529
1531
1530
1532
def var_print (var_name ):
1531
1533
if var_name is None :
@@ -1537,7 +1539,6 @@ def var_print(var_name):
1537
1539
raise Exception (('{var}Use either : or . to specify project '
1538
1540
'got {input}' ).format (
1539
1541
var = var_print (var_name ), input = table_input ))
1540
-
1541
1542
cmpt = table_input .rsplit (':' , 1 )
1542
1543
project_id = None
1543
1544
rest = table_input
@@ -1555,8 +1556,10 @@ def var_print(var_name):
1555
1556
1556
1557
cmpt = rest .split ('.' )
1557
1558
if len (cmpt ) == 3 :
1558
- assert project_id is None , ("{var}Use either : or . to specify project"
1559
- ).format (var = var_print (var_name ))
1559
+ if project_id :
1560
+ raise ValueError (
1561
+ "{var}Use either : or . to specify project" .format (
1562
+ var = var_print (var_name )))
1560
1563
project_id = cmpt [0 ]
1561
1564
dataset_id = cmpt [1 ]
1562
1565
table_id = cmpt [2 ]
@@ -1586,10 +1589,10 @@ def _cleanse_time_partitioning(destination_dataset_table, time_partitioning_in):
1586
1589
# if it is a partitioned table ($ is in the table name) add partition load option
1587
1590
time_partitioning_out = {}
1588
1591
if destination_dataset_table and '$' in destination_dataset_table :
1589
- assert not time_partitioning_in .get ('field' ), (
1590
- "Cannot specify field partition and partition name "
1591
- "(dataset.table$partition) at the same time "
1592
- )
1592
+ if time_partitioning_in .get ('field' ):
1593
+ raise ValueError (
1594
+ "Cannot specify field partition and partition name "
1595
+ "(dataset.table$partition) at the same time" )
1593
1596
time_partitioning_out ['type' ] = 'DAY'
1594
1597
1595
1598
time_partitioning_out .update (time_partitioning_in )
0 commit comments