diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 8fed3247898b3..3acbdcc3d645f 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -361,6 +361,15 @@ type: string example: ~ default: "downstream" + - name: default_task_execution_timeout + description: | + The default task execution_timeout value for the operators. Expected an integer value to + be passed into timedelta as seconds. If not specified, then the value is considered as None, + meaning that the operators are never timed out by default. + version_added: 2.3.0 + type: integer + example: ~ + default: "" - name: min_serialized_dag_update_interval description: | Updating serialized DAG can not be faster than a minimum interval to reduce database write rate. diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 1982ee01a5760..706f83608ded5 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -210,6 +210,11 @@ default_task_retries = 0 # The weighting method used for the effective total priority weight of the task default_task_weight_rule = downstream +# The default task execution_timeout value for the operators. Expected an integer value to +# be passed into timedelta as seconds. If not specified, then the value is considered as None, +# meaning that the operators are never timed out by default. +default_task_execution_timeout = + # Updating serialized DAG can not be faster than a minimum interval to reduce database write rate. min_serialized_dag_update_interval = 30 diff --git a/airflow/configuration.py b/airflow/configuration.py index 8ba37a15289de..fd17245219fb2 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. +import datetime import functools import json import logging @@ -608,6 +609,40 @@ def getjson(self, section, key, fallback=_UNSET, **kwargs) -> Union[dict, list, except JSONDecodeError as e: raise AirflowConfigException(f'Unable to parse [{section}] {key!r} as valid json') from e + def gettimedelta(self, section, key, fallback=None, **kwargs) -> Optional[datetime.timedelta]: + """ + Gets the config value for the given section and key, and converts it into datetime.timedelta object. + If the key is missing, then it is considered as `None`. + + :param section: the section from the config + :param key: the key defined in the given section + :param fallback: fallback value when no config value is given, defaults to None + :raises AirflowConfigException: raised because ValueError or OverflowError + :return: datetime.timedelta(seconds=) or None + """ + val = self.get(section, key, fallback=fallback, **kwargs) + + if val: + # the given value must be convertible to integer + try: + int_val = int(val) + except ValueError: + raise AirflowConfigException( + f'Failed to convert value to int. Please check "{key}" key in "{section}" section. ' + f'Current value: "{val}".' + ) + + try: + return datetime.timedelta(seconds=int_val) + except OverflowError as err: + raise AirflowConfigException( + f'Failed to convert value to timedelta in `seconds`. ' + f'{err}. ' + f'Please check "{key}" key in "{section}" section. Current value: "{val}".' + ) + + return fallback + def read(self, filenames, encoding=None): super().read(filenames=filenames, encoding=encoding) diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py index ee4aa48416a45..ea80bed26636b 100644 --- a/airflow/models/abstractoperator.py +++ b/airflow/models/abstractoperator.py @@ -67,6 +67,9 @@ conf.get("core", "default_task_weight_rule", fallback=WeightRule.DOWNSTREAM) ) DEFAULT_TRIGGER_RULE: TriggerRule = TriggerRule.ALL_SUCCESS +DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta = conf.gettimedelta( + "core", "default_task_execution_timeout" +) class AbstractOperator(LoggingMixin, DAGNode): diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 083e075441c21..344f3760ed10e 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -65,6 +65,7 @@ DEFAULT_QUEUE, DEFAULT_RETRIES, DEFAULT_RETRY_DELAY, + DEFAULT_TASK_EXECUTION_TIMEOUT, DEFAULT_TRIGGER_RULE, DEFAULT_WEIGHT_RULE, AbstractOperator, @@ -201,7 +202,7 @@ def partial( queue: str = DEFAULT_QUEUE, pool: Optional[str] = None, pool_slots: int = DEFAULT_POOL_SLOTS, - execution_timeout: Optional[timedelta] = None, + execution_timeout: Optional[timedelta] = DEFAULT_TASK_EXECUTION_TIMEOUT, retry_delay: Union[timedelta, float] = DEFAULT_RETRY_DELAY, retry_exponential_backoff: bool = False, priority_weight: int = DEFAULT_PRIORITY_WEIGHT, @@ -706,7 +707,7 @@ def __init__( pool: Optional[str] = None, pool_slots: int = DEFAULT_POOL_SLOTS, sla: Optional[timedelta] = None, - execution_timeout: Optional[timedelta] = None, + execution_timeout: Optional[timedelta] = DEFAULT_TASK_EXECUTION_TIMEOUT, on_execute_callback: Optional[TaskStateChangeCallback] = None, on_failure_callback: Optional[TaskStateChangeCallback] = None, on_success_callback: Optional[TaskStateChangeCallback] = None, diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py index 046456a9aa7eb..68efdbbb911f8 100644 --- a/tests/core/test_configuration.py +++ b/tests/core/test_configuration.py @@ -16,6 +16,7 @@ # specific language governing permissions and limitations # under the License. import copy +import datetime import io import os import re @@ -837,3 +838,67 @@ def test_as_dict_respects_sensitive_cmds(self): else: assert 'sql_alchemy_conn' in conf_maintain_cmds['core'] assert conf_maintain_cmds['core']['sql_alchemy_conn'] == conf_conn + + def test_gettimedelta(self): + test_config = ''' +[invalid] +# non-integer value +key1 = str + +# fractional value +key2 = 300.99 + +# too large value for C int +key3 = 999999999999999 + +[valid] +# negative value +key4 = -1 + +# zero +key5 = 0 + +# positive value +key6 = 300 + +[default] +# Equals to None +key7 = +''' + test_conf = AirflowConfigParser(default_config=test_config) + with pytest.raises( + AirflowConfigException, + match=re.escape( + 'Failed to convert value to int. Please check "key1" key in "invalid" section. ' + 'Current value: "str".' + ), + ): + test_conf.gettimedelta("invalid", "key1") + + with pytest.raises( + AirflowConfigException, + match=re.escape( + 'Failed to convert value to int. Please check "key2" key in "invalid" section. ' + 'Current value: "300.99".' + ), + ): + test_conf.gettimedelta("invalid", "key2") + + with pytest.raises( + AirflowConfigException, + match=re.escape( + 'Failed to convert value to timedelta in `seconds`. ' + 'Python int too large to convert to C int. ' + 'Please check "key3" key in "invalid" section. Current value: "999999999999999".' + ), + ): + test_conf.gettimedelta("invalid", "key3") + + assert isinstance(test_conf.gettimedelta('valid', 'key4'), datetime.timedelta) + assert test_conf.gettimedelta('valid', 'key4') == datetime.timedelta(seconds=-1) + assert isinstance(test_conf.gettimedelta('valid', 'key5'), datetime.timedelta) + assert test_conf.gettimedelta('valid', 'key5') == datetime.timedelta(seconds=0) + assert isinstance(test_conf.gettimedelta('valid', 'key6'), datetime.timedelta) + assert test_conf.gettimedelta('valid', 'key6') == datetime.timedelta(seconds=300) + assert isinstance(test_conf.gettimedelta('default', 'key7'), type(None)) + assert test_conf.gettimedelta('default', 'key7') is None