Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make operator's execution_timeout configurable #22389

Merged
merged 9 commits into from
Apr 11, 2022
9 changes: 9 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,15 @@
type: string
example: ~
default: "downstream"
- name: default_task_execution_timeout
description: |
The default execution_timeout value for operators. Expects an integer number of seconds,
which is passed into timedelta. If not specified, the value is treated as None,
meaning that operators never time out by default.
version_added: 2.3.0
type: integer
example: ~
default: ""
- name: min_serialized_dag_update_interval
description: |
Updating serialized DAG can not be faster than a minimum interval to reduce database write rate.
Expand Down
5 changes: 5 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ default_task_retries = 0
# The weighting method used for the effective total priority weight of the task
default_task_weight_rule = downstream

# The default execution_timeout value for operators. Expects an integer number of seconds,
# which is passed into timedelta. If not specified, the value is treated as None,
# meaning that operators never time out by default.
default_task_execution_timeout =

# Updating serialized DAG can not be faster than a minimum interval to reduce database write rate.
min_serialized_dag_update_interval = 30

Expand Down
35 changes: 35 additions & 0 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

import datetime
import functools
import json
import logging
Expand Down Expand Up @@ -608,6 +609,40 @@ def getjson(self, section, key, fallback=_UNSET, **kwargs) -> Union[dict, list,
except JSONDecodeError as e:
raise AirflowConfigException(f'Unable to parse [{section}] {key!r} as valid json') from e

def gettimedelta(self, section, key, fallback=None, **kwargs) -> Optional[datetime.timedelta]:
    """
    Read a config value and convert it into a ``datetime.timedelta`` of that many seconds.

    The raw value is looked up through ``self.get``. An empty (falsy) value is treated as
    "not configured" and ``fallback`` is returned unchanged.

    :param section: the section from the config
    :param key: the key defined in the given section
    :param fallback: value returned when no config value is given, defaults to None
    :raises AirflowConfigException: if the value cannot be converted to an integer, or the
        integer cannot be represented as a timedelta; the underlying ``ValueError`` or
        ``OverflowError`` is attached as the exception's cause
    :return: datetime.timedelta(seconds=<config_value>), or ``fallback`` when the value is empty
    """
    val = self.get(section, key, fallback=fallback, **kwargs)

    # An empty value means the option is unset; hand back the fallback untouched.
    if not val:
        return fallback

    # The given value must be convertible to integer (fractional values are rejected).
    try:
        int_val = int(val)
    except ValueError as err:
        # Chain the original error so the traceback shows the real conversion failure.
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        ) from err

    try:
        return datetime.timedelta(seconds=int_val)
    except OverflowError as err:
        raise AirflowConfigException(
            f'Failed to convert value to timedelta in `seconds`. '
            f'{err}. '
            f'Please check "{key}" key in "{section}" section. Current value: "{val}".'
        ) from err

def read(self, filenames, encoding=None):
super().read(filenames=filenames, encoding=encoding)

Expand Down
3 changes: 3 additions & 0 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@
conf.get("core", "default_task_weight_rule", fallback=WeightRule.DOWNSTREAM)
)
DEFAULT_TRIGGER_RULE: TriggerRule = TriggerRule.ALL_SUCCESS
# Default ``execution_timeout`` for operators, read from the
# ``[core] default_task_execution_timeout`` option via ``conf.gettimedelta``.
# NOTE(review): ``gettimedelta`` is annotated as returning an Optional value, so this
# constant may be None despite the non-Optional annotation below -- confirm and widen.
DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta = conf.gettimedelta(
"core", "default_task_execution_timeout"
)


class AbstractOperator(LoggingMixin, DAGNode):
Expand Down
5 changes: 3 additions & 2 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
DEFAULT_QUEUE,
DEFAULT_RETRIES,
DEFAULT_RETRY_DELAY,
DEFAULT_TASK_EXECUTION_TIMEOUT,
DEFAULT_TRIGGER_RULE,
DEFAULT_WEIGHT_RULE,
AbstractOperator,
Expand Down Expand Up @@ -201,7 +202,7 @@ def partial(
queue: str = DEFAULT_QUEUE,
pool: Optional[str] = None,
pool_slots: int = DEFAULT_POOL_SLOTS,
execution_timeout: Optional[timedelta] = None,
execution_timeout: Optional[timedelta] = DEFAULT_TASK_EXECUTION_TIMEOUT,
retry_delay: Union[timedelta, float] = DEFAULT_RETRY_DELAY,
retry_exponential_backoff: bool = False,
priority_weight: int = DEFAULT_PRIORITY_WEIGHT,
Expand Down Expand Up @@ -706,7 +707,7 @@ def __init__(
pool: Optional[str] = None,
pool_slots: int = DEFAULT_POOL_SLOTS,
sla: Optional[timedelta] = None,
execution_timeout: Optional[timedelta] = None,
execution_timeout: Optional[timedelta] = DEFAULT_TASK_EXECUTION_TIMEOUT,
on_execute_callback: Optional[TaskStateChangeCallback] = None,
on_failure_callback: Optional[TaskStateChangeCallback] = None,
on_success_callback: Optional[TaskStateChangeCallback] = None,
Expand Down
65 changes: 65 additions & 0 deletions tests/core/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.
import copy
import datetime
import io
import os
import re
Expand Down Expand Up @@ -837,3 +838,67 @@ def test_as_dict_respects_sensitive_cmds(self):
else:
assert 'sql_alchemy_conn' in conf_maintain_cmds['core']
assert conf_maintain_cmds['core']['sql_alchemy_conn'] == conf_conn

def test_gettimedelta(self):
    """Exercise ``gettimedelta`` with invalid, valid and empty config values."""
    test_config = '''
[invalid]
# non-integer value
key1 = str

# fractional value
key2 = 300.99

# too large value for C int
key3 = 999999999999999

[valid]
# negative value
key4 = -1

# zero
key5 = 0

# positive value
key6 = 300

[default]
# Equals to None
key7 =
'''
    parser = AirflowConfigParser(default_config=test_config)

    # Each invalid key must be rejected with its exact, fully-escaped error message.
    rejected = {
        "key1": (
            'Failed to convert value to int. Please check "key1" key in "invalid" section. '
            'Current value: "str".'
        ),
        "key2": (
            'Failed to convert value to int. Please check "key2" key in "invalid" section. '
            'Current value: "300.99".'
        ),
        "key3": (
            'Failed to convert value to timedelta in `seconds`. '
            'Python int too large to convert to C int. '
            'Please check "key3" key in "invalid" section. Current value: "999999999999999".'
        ),
    }
    for bad_key, message in rejected.items():
        with pytest.raises(AirflowConfigException, match=re.escape(message)):
            parser.gettimedelta("invalid", bad_key)

    # Negative, zero and positive integers all convert to a timedelta of that many seconds.
    accepted = {"key4": -1, "key5": 0, "key6": 300}
    for good_key, seconds in accepted.items():
        converted = parser.gettimedelta('valid', good_key)
        assert isinstance(converted, datetime.timedelta)
        assert converted == datetime.timedelta(seconds=seconds)

    # An empty value resolves to None.
    unset = parser.gettimedelta('default', 'key7')
    assert isinstance(unset, type(None))
    assert unset is None