Commit 7142ae0

[AIRFLOW-2895] Prevent scheduler from spamming heartbeats/logs

Reverts most of AIRFLOW-2027 until the issues with it can be fixed.
Closes #3747 from aoen/revert_min_file_parsing_time_commit

1 parent 404be4b · commit 7142ae0

7 files changed: +34 −49 lines changed

README.md (+1)

@@ -246,6 +246,7 @@ Currently **officially** using Airflow:
 1. [Tile](https://tile.com/) [[@ranjanmanish](https://github.com/ranjanmanish)]
 1. [Tokopedia](https://www.tokopedia.com/) [@topedmaria](https://github.com/topedmaria)
 1. [Twine Labs](https://www.twinelabs.com/) [[@ivorpeles](https://github.com/ivorpeles)]
+1. [Twitter](https://www.twitter.com/) [[@aoen](https://github.com/aoen)]
 1. [T2 Systems](http://t2systems.com) [[@unclaimedpants](https://github.com/unclaimedpants)]
 1. [Ubisoft](https://www.ubisoft.com/) [[@Walkoss](https://github.com/Walkoss)]
 1. [United Airlines](https://www.united.com/) [[@ilopezfr](https://github.com/ilopezfr)]

UPDATING.md (+6 −1)

@@ -12,6 +12,11 @@ so you might need to update your config.
 
 `task_runner = StandardTaskRunner`
 
+### min_file_parsing_loop_time config option temporarily disabled
+
+The scheduler.min_file_parsing_loop_time config option has been temporarily removed due to
+some bugs.
+
 ## Airflow 1.10
 
 Installation and upgrading requires setting `SLUGIFY_USES_TEXT_UNIDECODE=yes` in your environment or

@@ -428,7 +433,7 @@ indefinitely. This is only available on the command line.
 After how much time should an updated DAG be picked up from the filesystem.
 
 #### min_file_parsing_loop_time
-
+CURRENTLY DISABLED DUE TO A BUG
 How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
 
 #### dag_dir_list_interval

airflow/config_templates/default_airflow.cfg (−3)

@@ -435,9 +435,6 @@ run_duration = -1
 # after how much time (seconds) a new DAGs should be picked up from the filesystem
 min_file_process_interval = 0
 
-# How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
-min_file_parsing_loop_time = 1
-
 # How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
 dag_dir_list_interval = 300
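
Operators' note: with the option gone from the template, code that still reads it will fail at lookup. A minimal sketch of a guarded read follows; it is not part of this commit, and it assumes Airflow's config parser raises AirflowConfigException for a missing section/key.

```python
# Sketch only (not in this commit): guard a read of the removed option.
from airflow import configuration as conf
from airflow.exceptions import AirflowConfigException

try:
    loop_time = conf.getint('scheduler', 'min_file_parsing_loop_time')
except AirflowConfigException:
    # The option was removed by this commit; fall back to one second,
    # matching the scheduler's new default processor_poll_interval.
    loop_time = 1
```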

airflow/jobs.py (+23 −19)

@@ -533,8 +533,7 @@ def __init__(
             num_runs=-1,
             file_process_interval=conf.getint('scheduler',
                                               'min_file_process_interval'),
-            min_file_parsing_loop_time=conf.getint('scheduler',
-                                                   'min_file_parsing_loop_time'),
+            processor_poll_interval=1.0,
             run_duration=None,
             do_pickle=False,
             log=None,

@@ -549,6 +548,8 @@ def __init__(
         :type subdir: unicode
         :param num_runs: The number of times to try to schedule each DAG file.
             -1 for unlimited within the run_duration.
+        :param processor_poll_interval: The number of seconds to wait between
+            polls of running processors
         :param run_duration: how long to run (in seconds) before exiting
         :type run_duration: int
         :param do_pickle: once a DAG object is obtained by executing the Python

@@ -565,6 +566,7 @@ def __init__(
 
         self.num_runs = num_runs
         self.run_duration = run_duration
+        self._processor_poll_interval = processor_poll_interval
 
         self.do_pickle = do_pickle
         super(SchedulerJob, self).__init__(*args, **kwargs)

@@ -592,10 +594,7 @@ def __init__(
 
         self.file_process_interval = file_process_interval
 
-        # Wait until at least this many seconds have passed before parsing files once all
-        # files have finished parsing.
-        self.min_file_parsing_loop_time = min_file_parsing_loop_time
-
+        self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         if run_duration is None:
             self.run_duration = conf.getint('scheduler',
                                             'run_duration')

@@ -1557,16 +1556,18 @@ def _execute(self):
         # DAGs in parallel. By processing them in separate processes,
         # we can get parallelism and isolation from potentially harmful
         # user code.
-        self.log.info("Processing files using up to %s processes at a time",
-                      self.max_threads)
+        self.log.info(
+            "Processing files using up to %s processes at a time",
+            self.max_threads)
         self.log.info("Running execute loop for %s seconds", self.run_duration)
         self.log.info("Processing each file at most %s times", self.num_runs)
-        self.log.info("Process each file at most once every %s seconds",
-                      self.file_process_interval)
-        self.log.info("Wait until at least %s seconds have passed between file parsing "
-                      "loops", self.min_file_parsing_loop_time)
-        self.log.info("Checking for new files in %s every %s seconds",
-                      self.subdir, self.dag_dir_list_interval)
+        self.log.info(
+            "Process each file at most once every %s seconds",
+            self.file_process_interval)
+        self.log.info(
+            "Checking for new files in %s every %s seconds",
+            self.subdir,
+            self.dag_dir_list_interval)
 
         # Build up a list of Python files that could contain DAGs
         self.log.info("Searching for files in %s", self.subdir)

@@ -1582,7 +1583,6 @@ def processor_factory(file_path):
                                                     known_file_paths,
                                                     self.max_threads,
                                                     self.file_process_interval,
-                                                    self.min_file_parsing_loop_time,
                                                     self.num_runs,
                                                     processor_factory)
 
@@ -1734,13 +1734,17 @@ def _execute_helper(self, processor_manager):
             last_stat_print_time = timezone.utcnow()
 
             loop_end_time = time.time()
-            self.log.debug("Ran scheduling loop in %.2f seconds",
-                           loop_end_time - loop_start_time)
+            self.log.debug(
+                "Ran scheduling loop in %.2f seconds",
+                loop_end_time - loop_start_time)
+            self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
+            time.sleep(self._processor_poll_interval)
 
             # Exit early for a test mode
             if processor_manager.max_runs_reached():
-                self.log.info("Exiting loop as all files have been processed %s times",
-                              self.num_runs)
+                self.log.info(
+                    "Exiting loop as all files have been processed %s times",
+                    self.num_runs)
                 break
 
             # Stop any processors
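
Taken together, the jobs.py changes move the throttling out of the parsing manager and into the scheduler's own loop. The sketch below shows the resulting control flow under stated assumptions: scheduler_loop stands in for SchedulerJob._execute_helper, and FakeProcessorManager is a hypothetical stub for the processor manager, not code from this commit.

```python
import time


class FakeProcessorManager(object):
    """Hypothetical stub; the real manager lives in airflow/utils/dag_processing.py."""

    def __init__(self, max_runs=3):
        self._runs = 0
        self._max_runs = max_runs

    def heartbeat(self):
        self._runs += 1  # pretend every DAG file was parsed once

    def max_runs_reached(self):
        return self._runs >= self._max_runs


def scheduler_loop(processor_manager, processor_poll_interval=1.0):
    """Post-revert pacing: one fixed sleep per scheduling loop."""
    while True:
        loop_start_time = time.time()
        processor_manager.heartbeat()  # parse DAG files, queue work
        loop_end_time = time.time()
        print("Ran scheduling loop in %.2f seconds"
              % (loop_end_time - loop_start_time))
        # New in this commit: the scheduler sleeps here, once per loop,
        # so it cannot spin and spam heartbeats/logs; heartbeat() itself
        # no longer sleeps.
        time.sleep(processor_poll_interval)
        if processor_manager.max_runs_reached():
            break


scheduler_loop(FakeProcessorManager(), processor_poll_interval=0.1)
```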

airflow/utils/dag_processing.py (+4 −21)

@@ -326,7 +326,6 @@ def __init__(self,
                  file_paths,
                  parallelism,
                  process_file_interval,
-                 min_file_parsing_loop_time,
                  max_runs,
                  processor_factory):
         """

@@ -340,9 +339,6 @@ def __init__(self,
         :param process_file_interval: process a file at most once every this
             many seconds
         :type process_file_interval: float
-        :param min_file_parsing_loop_time: wait until at least this many seconds have
-            passed before parsing files once all files have finished parsing.
-        :type min_file_parsing_loop_time: float
         :param max_runs: The number of times to parse and schedule each file. -1
             for unlimited.
         :type max_runs: int

@@ -358,7 +354,6 @@ def __init__(self,
         self._dag_directory = dag_directory
         self._max_runs = max_runs
         self._process_file_interval = process_file_interval
-        self._min_file_parsing_loop_time = min_file_parsing_loop_time
         self._processor_factory = processor_factory
         # Map from file path to the processor
         self._processors = {}

@@ -529,24 +524,12 @@ def heartbeat(self):
         file_paths_in_progress = self._processors.keys()
         now = timezone.utcnow()
         file_paths_recently_processed = []
-
-        longest_parse_duration = 0
         for file_path in self._file_paths:
             last_finish_time = self.get_last_finish_time(file_path)
-            if last_finish_time is not None:
-                duration = now - last_finish_time
-                longest_parse_duration = max(duration.total_seconds(),
-                                             longest_parse_duration)
-                if duration.total_seconds() < self._process_file_interval:
-                    file_paths_recently_processed.append(file_path)
-
-        sleep_length = max(self._min_file_parsing_loop_time - longest_parse_duration,
-                           0)
-        if sleep_length > 0:
-            self.log.debug("Sleeping for %.2f seconds to prevent excessive "
-                           "logging",
-                           sleep_length)
-            time.sleep(sleep_length)
+            if (last_finish_time is not None and
+                (now - last_finish_time).total_seconds() <
+                    self._process_file_interval):
+                file_paths_recently_processed.append(file_path)
 
         files_paths_at_run_limit = [file_path
                                     for file_path, num_runs in self._run_count.items()
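
What survives in heartbeat() is only the recently-processed filter. The standalone sketch below restates that retained condition with plain datetimes instead of the manager's get_last_finish_time and timezone.utcnow; the helper name recently_processed is illustrative, not from the commit.

```python
from datetime import datetime, timedelta


def recently_processed(file_paths, last_finish_times, process_file_interval):
    """Illustrative helper: return paths whose last parse finished less than
    process_file_interval seconds ago; heartbeat() skips re-parsing these."""
    now = datetime.utcnow()
    recent = []
    for file_path in file_paths:
        last_finish_time = last_finish_times.get(file_path)
        if (last_finish_time is not None and
                (now - last_finish_time).total_seconds() <
                process_file_interval):
            recent.append(file_path)
    return recent


# A file parsed half a second ago is filtered out when the interval is 1s;
# a file that has never finished parsing is not.
finish_times = {'dags/a.py': datetime.utcnow() - timedelta(seconds=0.5)}
assert recently_processed(['dags/a.py', 'dags/b.py'], finish_times, 1) == \
    ['dags/a.py']
```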

scripts/ci/kubernetes/kube/configmaps.yaml (−3)

@@ -57,9 +57,6 @@ data:
     statsd_port = 8125
     statsd_prefix = airflow
 
-    # How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
-    min_file_parsing_loop_time = 1
-
     print_stats_interval = 30
     scheduler_zombie_task_threshold = 300
     max_tis_per_query = 0

tests/utils/test_dag_processing.py (−2)

@@ -32,7 +32,6 @@ def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
             parallelism=1,
             process_file_interval=1,
             max_runs=1,
-            min_file_parsing_loop_time=0,
             processor_factory=MagicMock().return_value)
 
         mock_processor = MagicMock()

@@ -52,7 +51,6 @@ def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
             parallelism=1,
             process_file_interval=1,
             max_runs=1,
-            min_file_parsing_loop_time=0,
             processor_factory=MagicMock().return_value)
 
         mock_processor = MagicMock()
