From bffcb30351ef9d59102f32558b6476042b1eb8bf Mon Sep 17 00:00:00 2001 From: "sercan.sagman" Date: Mon, 21 Mar 2022 02:52:24 +0300 Subject: [PATCH 1/9] make operator's execution_timeout configurable By this commit, execution_timeout attribute of the operators is now configurable globally via airflow.cfg. * The default value is still `None`. Users are expected to define a positive integer value to be passed into timedelta object to set timeout in terms of seconds by default, via configuration. * If the key is missing or is set to a non-positive value, then it is considered as `None`. * Added `gettimedelta` method to be used in abstractoperator to get timedelta or None type object. The method raises exception for the values that are not convertible to integer and/or the values too large to be converted to C int. * Sample config cases are added into unit tests. Closes #18578 --- airflow/config_templates/config.yml | 9 +++ airflow/config_templates/default_airflow.cfg | 5 ++ airflow/configuration.py | 38 ++++++++++++ airflow/models/abstractoperator.py | 1 + airflow/models/baseoperator.py | 5 +- tests/core/test_configuration.py | 64 ++++++++++++++++++++ 6 files changed, 120 insertions(+), 2 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 8fed3247898b3..232ec5c256885 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -361,6 +361,15 @@ type: string example: ~ default: "downstream" + - name: default_execution_timeout + description: | + The default execution_timeout value for the operators. Expected positive integer value to + be passed into timedelta as seconds. The missing and the non-positive values are considered as None, + means that the operators are never timed out. + version_added: 2.3.1 + type: integer + example: ~ + default: "" - name: min_serialized_dag_update_interval description: | Updating serialized DAG can not be faster than a minimum interval to reduce database write rate. diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 1982ee01a5760..e5ba801255750 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -210,6 +210,11 @@ default_task_retries = 0 # The weighting method used for the effective total priority weight of the task default_task_weight_rule = downstream +# The default execution_timeout value for the operators. Expected positive integer value to +# be passed into timedelta as seconds. The missing and the non-positive values are considered as None, +# means that the operators are never timed out. +default_execution_timeout = + # Updating serialized DAG can not be faster than a minimum interval to reduce database write rate. min_serialized_dag_update_interval = 30 diff --git a/airflow/configuration.py b/airflow/configuration.py index 8ba37a15289de..f52d9514b96f9 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. +import datetime import functools import json import logging @@ -608,6 +609,43 @@ def getjson(self, section, key, fallback=_UNSET, **kwargs) -> Union[dict, list, except JSONDecodeError as e: raise AirflowConfigException(f'Unable to parse [{section}] {key!r} as valid json') from e + def gettimedelta(self, section, key, fallback=None, **kwargs) -> Optional[datetime.timedelta]: + """ + Gets the config value for the given section and key, and converts it into datetime.timedelta object. + If the key is missing or set to a non-positive value, then it is considered as `None`. + + :param section: the section from the config + :param key: the key defined in the given section + :param fallback: fallback value when no config value is given, defaults to None + :raises AirflowConfigException: raised because ValueError or OverflowError + :return: datetime.timedelta(seconds=) or None + """ + val = self.get(section, key, fallback=fallback, **kwargs) + + if val: + # the given value must be convertible to integer + try: + int_val = int(val) + except ValueError: + raise AirflowConfigException( + f'Failed to convert value to int. Please check "{key}" key in "{section}" section. ' + f'Current value: "{val}".' + ) + + # the given value must be positive. Otherwise fallback value is returned. + if int_val <= 0: + return fallback + + try: + return datetime.timedelta(seconds=int_val) + except OverflowError: + raise AirflowConfigException( + f'Failed to convert value to timedelta in `seconds`. ' + f'Please check "{key}" key in "{section}" section. Current value: "{val}".' + ) + + return fallback + def read(self, filenames, encoding=None): super().read(filenames=filenames, encoding=encoding) diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py index ee4aa48416a45..80d660a4fd2ed 100644 --- a/airflow/models/abstractoperator.py +++ b/airflow/models/abstractoperator.py @@ -67,6 +67,7 @@ conf.get("core", "default_task_weight_rule", fallback=WeightRule.DOWNSTREAM) ) DEFAULT_TRIGGER_RULE: TriggerRule = TriggerRule.ALL_SUCCESS +DEFAULT_EXECUTION_TIMEOUT: datetime.timedelta = conf.gettimedelta("core", "default_execution_timeout") class AbstractOperator(LoggingMixin, DAGNode): diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 083e075441c21..05975ace04367 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -59,6 +59,7 @@ from airflow.exceptions import AirflowException, TaskDeferred from airflow.lineage import apply_lineage, prepare_lineage from airflow.models.abstractoperator import ( + DEFAULT_EXECUTION_TIMEOUT, DEFAULT_OWNER, DEFAULT_POOL_SLOTS, DEFAULT_PRIORITY_WEIGHT, @@ -201,7 +202,7 @@ def partial( queue: str = DEFAULT_QUEUE, pool: Optional[str] = None, pool_slots: int = DEFAULT_POOL_SLOTS, - execution_timeout: Optional[timedelta] = None, + execution_timeout: Optional[timedelta] = DEFAULT_EXECUTION_TIMEOUT, retry_delay: Union[timedelta, float] = DEFAULT_RETRY_DELAY, retry_exponential_backoff: bool = False, priority_weight: int = DEFAULT_PRIORITY_WEIGHT, @@ -706,7 +707,7 @@ def __init__( pool: Optional[str] = None, pool_slots: int = DEFAULT_POOL_SLOTS, sla: Optional[timedelta] = None, - execution_timeout: Optional[timedelta] = None, + execution_timeout: Optional[timedelta] = DEFAULT_EXECUTION_TIMEOUT, on_execute_callback: Optional[TaskStateChangeCallback] = None, on_failure_callback: Optional[TaskStateChangeCallback] = None, on_success_callback: Optional[TaskStateChangeCallback] = None, diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py index 046456a9aa7eb..cbe8a33ebe487 100644 --- a/tests/core/test_configuration.py +++ b/tests/core/test_configuration.py @@ -16,6 +16,7 @@ # specific language governing permissions and limitations # under the License. import copy +import datetime import io import os import re @@ -837,3 +838,66 @@ def test_as_dict_respects_sensitive_cmds(self): else: assert 'sql_alchemy_conn' in conf_maintain_cmds['core'] assert conf_maintain_cmds['core']['sql_alchemy_conn'] == conf_conn + + def test_gettimedelta(self): + test_config = ''' +[invalid] +# non-integer value +key1 = str + +# fractional value +key2 = 300.99 + +# too large value for C int +key3 = 999999999999999 + +[valid] +# Equals to None +key4 = -1 + +# zero +key5 = 0 + +# valid seconds +key6 = 300 + +[default] +# Equals to None +key7 = +''' + test_conf = AirflowConfigParser(default_config=test_config) + with pytest.raises( + AirflowConfigException, + match=re.escape( + 'Failed to convert value to int. Please check "key1" key in "invalid" section. ' + 'Current value: "str".' + ), + ): + test_conf.gettimedelta("invalid", "key1") + + with pytest.raises( + AirflowConfigException, + match=re.escape( + 'Failed to convert value to int. Please check "key2" key in "invalid" section. ' + 'Current value: "300.99".' + ), + ): + test_conf.gettimedelta("invalid", "key2") + + with pytest.raises( + AirflowConfigException, + match=re.escape( + 'Failed to convert value to timedelta in `seconds`. ' + 'Please check "key3" key in "invalid" section. Current value: "999999999999999".' + ), + ): + test_conf.gettimedelta("invalid", "key3") + + assert isinstance(test_conf.gettimedelta('valid', 'key4'), type(None)) + assert test_conf.gettimedelta('valid', 'key4') is None + assert isinstance(test_conf.gettimedelta('valid', 'key5'), type(None)) + assert test_conf.gettimedelta('valid', 'key5') is None + assert isinstance(test_conf.gettimedelta('valid', 'key6'), datetime.timedelta) + assert test_conf.gettimedelta('valid', 'key6') == datetime.timedelta(seconds=300) + assert isinstance(test_conf.gettimedelta('default', 'key7'), type(None)) + assert test_conf.gettimedelta('default', 'key7') is None From 20361351149b7beaf672e178aca64519450b0396 Mon Sep 17 00:00:00 2001 From: "sercan.sagman" Date: Mon, 21 Mar 2022 21:00:33 +0300 Subject: [PATCH 2/9] raise error for non-positive execution_timeout * By this commit, error raises for the values <= 0 instead of using fallback value * Updated unit tests --- airflow/configuration.py | 6 +++++- tests/core/test_configuration.py | 26 +++++++++++++++++++++----- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/airflow/configuration.py b/airflow/configuration.py index f52d9514b96f9..bab51cb995aa7 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -634,7 +634,11 @@ def gettimedelta(self, section, key, fallback=None, **kwargs) -> Optional[dateti # the given value must be positive. Otherwise fallback value is returned. if int_val <= 0: - return fallback + raise AirflowConfigException( + f'Failed to convert value to timedelta in `seconds`. ' + f'Value must be greater than zero. ' + f'Please check "{key}" key in "{section}" section. Current value: "{val}".' + ) try: return datetime.timedelta(seconds=int_val) diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py index cbe8a33ebe487..a070668bb448a 100644 --- a/tests/core/test_configuration.py +++ b/tests/core/test_configuration.py @@ -851,13 +851,13 @@ def test_gettimedelta(self): # too large value for C int key3 = 999999999999999 -[valid] # Equals to None key4 = -1 # zero key5 = 0 +[valid] # valid seconds key6 = 300 @@ -893,10 +893,26 @@ def test_gettimedelta(self): ): test_conf.gettimedelta("invalid", "key3") - assert isinstance(test_conf.gettimedelta('valid', 'key4'), type(None)) - assert test_conf.gettimedelta('valid', 'key4') is None - assert isinstance(test_conf.gettimedelta('valid', 'key5'), type(None)) - assert test_conf.gettimedelta('valid', 'key5') is None + with pytest.raises( + AirflowConfigException, + match=re.escape( + 'Failed to convert value to timedelta in `seconds`. ' + 'Value must be greater than zero. ' + 'Please check "key4" key in "invalid" section. Current value: "-1".' + ), + ): + test_conf.gettimedelta("invalid", "key4") + + with pytest.raises( + AirflowConfigException, + match=re.escape( + 'Failed to convert value to timedelta in `seconds`. ' + 'Value must be greater than zero. ' + 'Please check "key5" key in "invalid" section. Current value: "0".' + ), + ): + test_conf.gettimedelta("invalid", "key5") + assert isinstance(test_conf.gettimedelta('valid', 'key6'), datetime.timedelta) assert test_conf.gettimedelta('valid', 'key6') == datetime.timedelta(seconds=300) assert isinstance(test_conf.gettimedelta('default', 'key7'), type(None)) From 047d8c3e03c478c00fa1cef339e0d71868657543 Mon Sep 17 00:00:00 2001 From: "sercan.sagman" Date: Mon, 21 Mar 2022 21:03:46 +0300 Subject: [PATCH 3/9] include OverflowError error message in exception To be more clear to the user, added relevant error message into to AirflowConfigException. --- airflow/configuration.py | 3 ++- tests/core/test_configuration.py | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/airflow/configuration.py b/airflow/configuration.py index bab51cb995aa7..a15fdf08eed70 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -642,9 +642,10 @@ def gettimedelta(self, section, key, fallback=None, **kwargs) -> Optional[dateti try: return datetime.timedelta(seconds=int_val) - except OverflowError: + except OverflowError as err: raise AirflowConfigException( f'Failed to convert value to timedelta in `seconds`. ' + f'{err}. ' f'Please check "{key}" key in "{section}" section. Current value: "{val}".' ) diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py index a070668bb448a..8a96c6b8b4483 100644 --- a/tests/core/test_configuration.py +++ b/tests/core/test_configuration.py @@ -888,6 +888,7 @@ def test_gettimedelta(self): AirflowConfigException, match=re.escape( 'Failed to convert value to timedelta in `seconds`. ' + 'Python int too large to convert to C int. ' 'Please check "key3" key in "invalid" section. Current value: "999999999999999".' ), ): From fd7d42e94acbfea36847f0e6f5dc9c84f8a63a4a Mon Sep 17 00:00:00 2001 From: "sercan.sagman" Date: Mon, 21 Mar 2022 21:10:06 +0300 Subject: [PATCH 4/9] rename default_execution_timeout This parameter specifies the tasks' execution timeout, so all configuration and variable names are now contains `task` in it. --- airflow/config_templates/config.yml | 4 ++-- airflow/config_templates/default_airflow.cfg | 4 ++-- airflow/models/abstractoperator.py | 4 +++- airflow/models/baseoperator.py | 6 +++--- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 232ec5c256885..121d3fc8cab76 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -361,9 +361,9 @@ type: string example: ~ default: "downstream" - - name: default_execution_timeout + - name: default_task_execution_timeout description: | - The default execution_timeout value for the operators. Expected positive integer value to + The default task execution_timeout value for the operators. Expected positive integer value to be passed into timedelta as seconds. The missing and the non-positive values are considered as None, means that the operators are never timed out. version_added: 2.3.1 diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index e5ba801255750..ba204fe2c83a9 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -210,10 +210,10 @@ default_task_retries = 0 # The weighting method used for the effective total priority weight of the task default_task_weight_rule = downstream -# The default execution_timeout value for the operators. Expected positive integer value to +# The default task execution_timeout value for the operators. Expected positive integer value to # be passed into timedelta as seconds. The missing and the non-positive values are considered as None, # means that the operators are never timed out. -default_execution_timeout = +default_task_execution_timeout = # Updating serialized DAG can not be faster than a minimum interval to reduce database write rate. min_serialized_dag_update_interval = 30 diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py index 80d660a4fd2ed..ea80bed26636b 100644 --- a/airflow/models/abstractoperator.py +++ b/airflow/models/abstractoperator.py @@ -67,7 +67,9 @@ conf.get("core", "default_task_weight_rule", fallback=WeightRule.DOWNSTREAM) ) DEFAULT_TRIGGER_RULE: TriggerRule = TriggerRule.ALL_SUCCESS -DEFAULT_EXECUTION_TIMEOUT: datetime.timedelta = conf.gettimedelta("core", "default_execution_timeout") +DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta = conf.gettimedelta( + "core", "default_task_execution_timeout" +) class AbstractOperator(LoggingMixin, DAGNode): diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 05975ace04367..344f3760ed10e 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -59,13 +59,13 @@ from airflow.exceptions import AirflowException, TaskDeferred from airflow.lineage import apply_lineage, prepare_lineage from airflow.models.abstractoperator import ( - DEFAULT_EXECUTION_TIMEOUT, DEFAULT_OWNER, DEFAULT_POOL_SLOTS, DEFAULT_PRIORITY_WEIGHT, DEFAULT_QUEUE, DEFAULT_RETRIES, DEFAULT_RETRY_DELAY, + DEFAULT_TASK_EXECUTION_TIMEOUT, DEFAULT_TRIGGER_RULE, DEFAULT_WEIGHT_RULE, AbstractOperator, @@ -202,7 +202,7 @@ def partial( queue: str = DEFAULT_QUEUE, pool: Optional[str] = None, pool_slots: int = DEFAULT_POOL_SLOTS, - execution_timeout: Optional[timedelta] = DEFAULT_EXECUTION_TIMEOUT, + execution_timeout: Optional[timedelta] = DEFAULT_TASK_EXECUTION_TIMEOUT, retry_delay: Union[timedelta, float] = DEFAULT_RETRY_DELAY, retry_exponential_backoff: bool = False, priority_weight: int = DEFAULT_PRIORITY_WEIGHT, @@ -707,7 +707,7 @@ def __init__( pool: Optional[str] = None, pool_slots: int = DEFAULT_POOL_SLOTS, sla: Optional[timedelta] = None, - execution_timeout: Optional[timedelta] = DEFAULT_EXECUTION_TIMEOUT, + execution_timeout: Optional[timedelta] = DEFAULT_TASK_EXECUTION_TIMEOUT, on_execute_callback: Optional[TaskStateChangeCallback] = None, on_failure_callback: Optional[TaskStateChangeCallback] = None, on_success_callback: Optional[TaskStateChangeCallback] = None, From de97218bf56d97bdec4a85fdbbbd284e65561a3c Mon Sep 17 00:00:00 2001 From: "sercan.sagman" Date: Mon, 21 Mar 2022 21:11:24 +0300 Subject: [PATCH 5/9] update `version_added` for execution_timeout --- airflow/config_templates/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 121d3fc8cab76..debc3f367e1bf 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -366,7 +366,7 @@ The default task execution_timeout value for the operators. Expected positive integer value to be passed into timedelta as seconds. The missing and the non-positive values are considered as None, means that the operators are never timed out. - version_added: 2.3.1 + version_added: 2.3.0 type: integer example: ~ default: "" From 413f77d7a45f30c7a232b35dc1c61286acb914d3 Mon Sep 17 00:00:00 2001 From: "sercan.sagman" Date: Tue, 22 Mar 2022 00:28:15 +0300 Subject: [PATCH 6/9] update execution_timeout description fixed the description of default_task_execution_timeout based on the recent changes --- airflow/config_templates/config.yml | 4 ++-- airflow/config_templates/default_airflow.cfg | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index debc3f367e1bf..a431a52f892ff 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -364,8 +364,8 @@ - name: default_task_execution_timeout description: | The default task execution_timeout value for the operators. Expected positive integer value to - be passed into timedelta as seconds. The missing and the non-positive values are considered as None, - means that the operators are never timed out. + be passed into timedelta as seconds. If not specified, then the value is considered as None, + meaning that the operators are never timed out by default. version_added: 2.3.0 type: integer example: ~ diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index ba204fe2c83a9..33d665dd4f0ac 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -211,8 +211,8 @@ default_task_retries = 0 default_task_weight_rule = downstream # The default task execution_timeout value for the operators. Expected positive integer value to -# be passed into timedelta as seconds. The missing and the non-positive values are considered as None, -# means that the operators are never timed out. +# be passed into timedelta as seconds. If not specified, then the value is considered as None, +# meaning that the operators are never timed out by default. default_task_execution_timeout = # Updating serialized DAG can not be faster than a minimum interval to reduce database write rate. From 07ae2b1dd0764beb40aa2ed7cf789c7dc7fb8587 Mon Sep 17 00:00:00 2001 From: "sercan.sagman" Date: Tue, 22 Mar 2022 00:43:46 +0300 Subject: [PATCH 7/9] update inline comment for non-positive value check --- airflow/configuration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/configuration.py b/airflow/configuration.py index a15fdf08eed70..138ac64743c58 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -632,7 +632,7 @@ def gettimedelta(self, section, key, fallback=None, **kwargs) -> Optional[dateti f'Current value: "{val}".' ) - # the given value must be positive. Otherwise fallback value is returned. + # the given value must be positive. if int_val <= 0: raise AirflowConfigException( f'Failed to convert value to timedelta in `seconds`. ' From 39a571b1a0dd9017f9c6c45e445a4d69f8e2e3a8 Mon Sep 17 00:00:00 2001 From: "sercan.sagman" Date: Tue, 22 Mar 2022 00:47:24 +0300 Subject: [PATCH 8/9] update `gettimedelta` docstring --- airflow/configuration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/configuration.py b/airflow/configuration.py index 138ac64743c58..feee48ebe09c7 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -612,7 +612,7 @@ def getjson(self, section, key, fallback=_UNSET, **kwargs) -> Union[dict, list, def gettimedelta(self, section, key, fallback=None, **kwargs) -> Optional[datetime.timedelta]: """ Gets the config value for the given section and key, and converts it into datetime.timedelta object. - If the key is missing or set to a non-positive value, then it is considered as `None`. + If the key is missing, then it is considered as `None`. :param section: the section from the config :param key: the key defined in the given section From 7f30f2536680c49ff0010c34ce348178cfebba83 Mon Sep 17 00:00:00 2001 From: "sercan.sagman" Date: Sun, 10 Apr 2022 15:43:28 +0300 Subject: [PATCH 9/9] allow non-positive values in gettimedelta Before this commit, gettimedelta method was preventing user to provide non-positive values. Now it is totally up to users to provide a sensible value for this configuration --- airflow/config_templates/config.yml | 2 +- airflow/config_templates/default_airflow.cfg | 2 +- airflow/configuration.py | 8 ------ tests/core/test_configuration.py | 30 +++++--------------- 4 files changed, 9 insertions(+), 33 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index a431a52f892ff..3acbdcc3d645f 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -363,7 +363,7 @@ default: "downstream" - name: default_task_execution_timeout description: | - The default task execution_timeout value for the operators. Expected positive integer value to + The default task execution_timeout value for the operators. Expected an integer value to be passed into timedelta as seconds. If not specified, then the value is considered as None, meaning that the operators are never timed out by default. version_added: 2.3.0 diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 33d665dd4f0ac..706f83608ded5 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -210,7 +210,7 @@ default_task_retries = 0 # The weighting method used for the effective total priority weight of the task default_task_weight_rule = downstream -# The default task execution_timeout value for the operators. Expected positive integer value to +# The default task execution_timeout value for the operators. Expected an integer value to # be passed into timedelta as seconds. If not specified, then the value is considered as None, # meaning that the operators are never timed out by default. default_task_execution_timeout = diff --git a/airflow/configuration.py b/airflow/configuration.py index feee48ebe09c7..fd17245219fb2 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -632,14 +632,6 @@ def gettimedelta(self, section, key, fallback=None, **kwargs) -> Optional[dateti f'Current value: "{val}".' ) - # the given value must be positive. - if int_val <= 0: - raise AirflowConfigException( - f'Failed to convert value to timedelta in `seconds`. ' - f'Value must be greater than zero. ' - f'Please check "{key}" key in "{section}" section. Current value: "{val}".' - ) - try: return datetime.timedelta(seconds=int_val) except OverflowError as err: diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py index 8a96c6b8b4483..68efdbbb911f8 100644 --- a/tests/core/test_configuration.py +++ b/tests/core/test_configuration.py @@ -851,14 +851,14 @@ def test_gettimedelta(self): # too large value for C int key3 = 999999999999999 -# Equals to None +[valid] +# negative value key4 = -1 # zero key5 = 0 -[valid] -# valid seconds +# positive value key6 = 300 [default] @@ -894,26 +894,10 @@ def test_gettimedelta(self): ): test_conf.gettimedelta("invalid", "key3") - with pytest.raises( - AirflowConfigException, - match=re.escape( - 'Failed to convert value to timedelta in `seconds`. ' - 'Value must be greater than zero. ' - 'Please check "key4" key in "invalid" section. Current value: "-1".' - ), - ): - test_conf.gettimedelta("invalid", "key4") - - with pytest.raises( - AirflowConfigException, - match=re.escape( - 'Failed to convert value to timedelta in `seconds`. ' - 'Value must be greater than zero. ' - 'Please check "key5" key in "invalid" section. Current value: "0".' - ), - ): - test_conf.gettimedelta("invalid", "key5") - + assert isinstance(test_conf.gettimedelta('valid', 'key4'), datetime.timedelta) + assert test_conf.gettimedelta('valid', 'key4') == datetime.timedelta(seconds=-1) + assert isinstance(test_conf.gettimedelta('valid', 'key5'), datetime.timedelta) + assert test_conf.gettimedelta('valid', 'key5') == datetime.timedelta(seconds=0) assert isinstance(test_conf.gettimedelta('valid', 'key6'), datetime.timedelta) assert test_conf.gettimedelta('valid', 'key6') == datetime.timedelta(seconds=300) assert isinstance(test_conf.gettimedelta('default', 'key7'), type(None))