Skip to content

Commit

Permalink
make operator's execution_timeout configurable (#22389)
Browse files Browse the repository at this point in the history
* make operator's execution_timeout configurable

By this commit, execution_timeout attribute of the
operators is now configurable globally via airflow.cfg.

* The default value is still `None`. Users are expected to
define a positive integer value to be passed into timedelta object
to set timeout in terms of seconds by default, via configuration.
* If the key is missing or is set to a non-positive value, then it is
considered as `None`.
* Added `gettimedelta` method to be used in abstractoperator
to get timedelta or None type object. The method raises exception
for the values that are not convertible to integer and/or the values
too large to be converted to C int.
* Sample config cases are added into unit tests.

Closes #18578

* raise error for non-positive execution_timeout

* By this commit, error raises for the values <= 0
instead of using fallback value
* Updated unit tests

* include OverflowError error message in exception

To be more clear to the user, added relevant error message
into to AirflowConfigException.

* rename default_execution_timeout

This parameter specifies the tasks' execution timeout,
so all configuration and variable names are now contains
`task` in it.

* update `version_added` for execution_timeout

* update execution_timeout description

fixed the description of default_task_execution_timeout
based on the recent changes

* update inline comment for non-positive value check

* update `gettimedelta` docstring

* allow non-positive values in gettimedelta

Before this commit, gettimedelta method was preventing
user to provide non-positive values. Now it is totally up to
users to provide a sensible value for this configuration

Co-authored-by: sercan.sagman <sercan.sagman@inventanalytics.com>
  • Loading branch information
sagmansercan and sercansagmaninvent authored Apr 11, 2022
1 parent 4201e6e commit a111a79
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 2 deletions.
9 changes: 9 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,15 @@
type: string
example: ~
default: "downstream"
- name: default_task_execution_timeout
description: |
The default task execution_timeout value for the operators. Expected an integer value to
be passed into timedelta as seconds. If not specified, then the value is considered as None,
meaning that the operators are never timed out by default.
version_added: 2.3.0
type: integer
example: ~
default: ""
- name: min_serialized_dag_update_interval
description: |
Updating serialized DAG can not be faster than a minimum interval to reduce database write rate.
Expand Down
5 changes: 5 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ default_task_retries = 0
# The weighting method used for the effective total priority weight of the task
default_task_weight_rule = downstream

# The default task execution_timeout value for the operators. Expected an integer value to
# be passed into timedelta as seconds. If not specified, then the value is considered as None,
# meaning that the operators are never timed out by default.
default_task_execution_timeout =

# Updating serialized DAG can not be faster than a minimum interval to reduce database write rate.
min_serialized_dag_update_interval = 30

Expand Down
35 changes: 35 additions & 0 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

import datetime
import functools
import json
import logging
Expand Down Expand Up @@ -608,6 +609,40 @@ def getjson(self, section, key, fallback=_UNSET, **kwargs) -> Union[dict, list,
except JSONDecodeError as e:
raise AirflowConfigException(f'Unable to parse [{section}] {key!r} as valid json') from e

def gettimedelta(self, section, key, fallback=None, **kwargs) -> Optional[datetime.timedelta]:
"""
Gets the config value for the given section and key, and converts it into datetime.timedelta object.
If the key is missing, then it is considered as `None`.
:param section: the section from the config
:param key: the key defined in the given section
:param fallback: fallback value when no config value is given, defaults to None
:raises AirflowConfigException: raised because ValueError or OverflowError
:return: datetime.timedelta(seconds=<config_value>) or None
"""
val = self.get(section, key, fallback=fallback, **kwargs)

if val:
# the given value must be convertible to integer
try:
int_val = int(val)
except ValueError:
raise AirflowConfigException(
f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
f'Current value: "{val}".'
)

try:
return datetime.timedelta(seconds=int_val)
except OverflowError as err:
raise AirflowConfigException(
f'Failed to convert value to timedelta in `seconds`. '
f'{err}. '
f'Please check "{key}" key in "{section}" section. Current value: "{val}".'
)

return fallback

def read(self, filenames, encoding=None):
super().read(filenames=filenames, encoding=encoding)

Expand Down
3 changes: 3 additions & 0 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@
conf.get("core", "default_task_weight_rule", fallback=WeightRule.DOWNSTREAM)
)
DEFAULT_TRIGGER_RULE: TriggerRule = TriggerRule.ALL_SUCCESS
DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta = conf.gettimedelta(
"core", "default_task_execution_timeout"
)


class AbstractOperator(LoggingMixin, DAGNode):
Expand Down
5 changes: 3 additions & 2 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
DEFAULT_QUEUE,
DEFAULT_RETRIES,
DEFAULT_RETRY_DELAY,
DEFAULT_TASK_EXECUTION_TIMEOUT,
DEFAULT_TRIGGER_RULE,
DEFAULT_WEIGHT_RULE,
AbstractOperator,
Expand Down Expand Up @@ -201,7 +202,7 @@ def partial(
queue: str = DEFAULT_QUEUE,
pool: Optional[str] = None,
pool_slots: int = DEFAULT_POOL_SLOTS,
execution_timeout: Optional[timedelta] = None,
execution_timeout: Optional[timedelta] = DEFAULT_TASK_EXECUTION_TIMEOUT,
retry_delay: Union[timedelta, float] = DEFAULT_RETRY_DELAY,
retry_exponential_backoff: bool = False,
priority_weight: int = DEFAULT_PRIORITY_WEIGHT,
Expand Down Expand Up @@ -706,7 +707,7 @@ def __init__(
pool: Optional[str] = None,
pool_slots: int = DEFAULT_POOL_SLOTS,
sla: Optional[timedelta] = None,
execution_timeout: Optional[timedelta] = None,
execution_timeout: Optional[timedelta] = DEFAULT_TASK_EXECUTION_TIMEOUT,
on_execute_callback: Optional[TaskStateChangeCallback] = None,
on_failure_callback: Optional[TaskStateChangeCallback] = None,
on_success_callback: Optional[TaskStateChangeCallback] = None,
Expand Down
65 changes: 65 additions & 0 deletions tests/core/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.
import copy
import datetime
import io
import os
import re
Expand Down Expand Up @@ -837,3 +838,67 @@ def test_as_dict_respects_sensitive_cmds(self):
else:
assert 'sql_alchemy_conn' in conf_maintain_cmds['core']
assert conf_maintain_cmds['core']['sql_alchemy_conn'] == conf_conn

def test_gettimedelta(self):
test_config = '''
[invalid]
# non-integer value
key1 = str
# fractional value
key2 = 300.99
# too large value for C int
key3 = 999999999999999
[valid]
# negative value
key4 = -1
# zero
key5 = 0
# positive value
key6 = 300
[default]
# Equals to None
key7 =
'''
test_conf = AirflowConfigParser(default_config=test_config)
with pytest.raises(
AirflowConfigException,
match=re.escape(
'Failed to convert value to int. Please check "key1" key in "invalid" section. '
'Current value: "str".'
),
):
test_conf.gettimedelta("invalid", "key1")

with pytest.raises(
AirflowConfigException,
match=re.escape(
'Failed to convert value to int. Please check "key2" key in "invalid" section. '
'Current value: "300.99".'
),
):
test_conf.gettimedelta("invalid", "key2")

with pytest.raises(
AirflowConfigException,
match=re.escape(
'Failed to convert value to timedelta in `seconds`. '
'Python int too large to convert to C int. '
'Please check "key3" key in "invalid" section. Current value: "999999999999999".'
),
):
test_conf.gettimedelta("invalid", "key3")

assert isinstance(test_conf.gettimedelta('valid', 'key4'), datetime.timedelta)
assert test_conf.gettimedelta('valid', 'key4') == datetime.timedelta(seconds=-1)
assert isinstance(test_conf.gettimedelta('valid', 'key5'), datetime.timedelta)
assert test_conf.gettimedelta('valid', 'key5') == datetime.timedelta(seconds=0)
assert isinstance(test_conf.gettimedelta('valid', 'key6'), datetime.timedelta)
assert test_conf.gettimedelta('valid', 'key6') == datetime.timedelta(seconds=300)
assert isinstance(test_conf.gettimedelta('default', 'key7'), type(None))
assert test_conf.gettimedelta('default', 'key7') is None

0 comments on commit a111a79

Please sign in to comment.