
Commit 9323a8e

KevinYang21 authored and kaxil committed
[Airflow-2760] Decouple DAG parsing loop from scheduler loop (#3873)
1 parent 095f0ab · commit 9323a8e

13 files changed: +1108 −424 lines

UPDATING.md (+10)
@@ -24,6 +24,16 @@ assists users migrating to a new version.
 
 ## Airflow 1.10.1
 
+### New `dag_processor_manager_log_location` config option
+
+The DAG parsing manager log is now written to a file by default; its location is
+controlled by the new `dag_processor_manager_log_location` config option in the core section.
+
+### New `sync_parallelism` config option in celery section
+
+The new `sync_parallelism` config option controls how many processes CeleryExecutor will use to
+fetch celery task state in parallel. The default value is max(1, number of cores - 1).
+
 ### StatsD Metrics
 
 The `scheduler_heartbeat` metric has been changed from a gauge to a counter. Each loop of the scheduler will increment the counter by 1. This provides a higher degree of visibility and allows for better integration with Prometheus using the [StatsD Exporter](https://github.com/prometheus/statsd_exporter). Scheduler upness can be determined by graphing and alerting using a rate. If the scheduler goes down, the rate will drop to 0.
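Two of the notes above lend themselves to short, hedged illustrations. First, the documented `sync_parallelism` default of max(1, number of cores - 1); the helper name below is hypothetical, not part of this commit:

import multiprocessing

def default_sync_parallelism():
    # Hypothetical helper: leave one core for the main process,
    # but never drop below a single sync process.
    return max(1, multiprocessing.cpu_count() - 1)

Second, the counter semantics of `scheduler_heartbeat`, sketched with the community `statsd` package (an assumption; Airflow wires up its own StatsD client internally). Each scheduler loop increments the counter by 1, so a rate over the series stays positive while the scheduler is alive and drops to 0 when it dies:

from statsd import StatsClient

# Illustrative client setup; host, port, and prefix are assumptions.
stats = StatsClient(host='localhost', port=8125, prefix='airflow')
stats.incr('scheduler_heartbeat', 1)  # emitted once per scheduler loop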

airflow/config_templates/airflow_local_settings.py (+42 −1)
@@ -20,6 +20,7 @@
 import os
 
 from airflow import configuration as conf
+from airflow.utils.file import mkdirs
 
 # TODO: Logging format and level should be configured
 # in this file instead of from airflow.cfg. Currently
@@ -38,7 +39,11 @@
 
 PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')
 
+DAG_PROCESSOR_MANAGER_LOG_LOCATION = \
+    conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')
+
 FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE')
+
 PROCESSOR_FILENAME_TEMPLATE = conf.get('core', 'LOG_PROCESSOR_FILENAME_TEMPLATE')
 
 # Storage bucket url for remote logging
@@ -79,7 +84,7 @@
             'formatter': 'airflow',
             'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
             'filename_template': PROCESSOR_FILENAME_TEMPLATE,
-        },
+        }
     },
     'loggers': {
         'airflow.processor': {
@@ -104,6 +109,26 @@
     }
 }
 
+DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
+    'handlers': {
+        'processor_manager': {
+            'class': 'logging.handlers.RotatingFileHandler',
+            'formatter': 'airflow',
+            'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
+            'mode': 'a',
+            'maxBytes': 104857600,  # 100MB
+            'backupCount': 5
+        }
+    },
+    'loggers': {
+        'airflow.processor_manager': {
+            'handlers': ['processor_manager'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        }
+    }
+}
+
 REMOTE_HANDLERS = {
     's3': {
         'task': {
@@ -172,6 +197,22 @@
 
 REMOTE_LOGGING = conf.get('core', 'remote_logging')
 
+# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
+# This is to avoid exceptions when initializing RotatingFileHandler multiple times
+# in multiple processes.
+if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True':
+    DEFAULT_LOGGING_CONFIG['handlers'] \
+        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'])
+    DEFAULT_LOGGING_CONFIG['loggers'] \
+        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['loggers'])
+
+    # Manually create log directory for processor_manager handler as RotatingFileHandler
+    # will only create file but not the directory.
+    processor_manager_handler_config = DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'][
+        'processor_manager']
+    directory = os.path.dirname(processor_manager_handler_config['filename'])
+    mkdirs(directory, 0o755)
+
 if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
     DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
 elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
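The pattern above — keep the DAG-parsing handler/logger group in its own dict, merge it into the base config only when the CONFIG_PROCESSOR_MANAGER_LOGGER environment variable is set, and pre-create the log directory because RotatingFileHandler creates the file but not its parent directories — can be exercised standalone. A minimal self-contained sketch (the base config and /tmp path are illustrative assumptions, not Airflow code):

import logging
import logging.config
import os

BASE_LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow': {'format': '%(asctime)s %(levelname)s - %(message)s'},
    },
    'handlers': {'console': {'class': 'logging.StreamHandler'}},
    'loggers': {},
}

DAG_PARSING_LOGGING_CONFIG = {
    'handlers': {
        'processor_manager': {
            'class': 'logging.handlers.RotatingFileHandler',
            'formatter': 'airflow',
            'filename': '/tmp/airflow/logs/dag_processor_manager/dag_processor_manager.log',
            'mode': 'a',
            'maxBytes': 104857600,  # 100MB, as in the commit
            'backupCount': 5,
        },
    },
    'loggers': {
        'airflow.processor_manager': {
            'handlers': ['processor_manager'],
            'level': 'INFO',
            'propagate': False,
        },
    },
}

# Merge only in the process that owns the manager log, mirroring the
# CONFIG_PROCESSOR_MANAGER_LOGGER guard above.
if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True':
    BASE_LOGGING_CONFIG['handlers'].update(DAG_PARSING_LOGGING_CONFIG['handlers'])
    BASE_LOGGING_CONFIG['loggers'].update(DAG_PARSING_LOGGING_CONFIG['loggers'])
    # RotatingFileHandler creates the log file, not its parent directory.
    log_dir = os.path.dirname(
        DAG_PARSING_LOGGING_CONFIG['handlers']['processor_manager']['filename'])
    if not os.path.isdir(log_dir):
        os.makedirs(log_dir, 0o755)

logging.config.dictConfig(BASE_LOGGING_CONFIG)
logging.getLogger('airflow.processor_manager').info('processor manager started')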

airflow/config_templates/default_airflow.cfg (+1)
@@ -68,6 +68,7 @@ simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
 # Log filename format
 log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log
 log_processor_filename_template = {{{{ filename }}}}.log
+dag_processor_manager_log_location = {AIRFLOW_HOME}/logs/dag_processor_manager/dag_processor_manager.log
 
 # Hostname by providing a path to a callable, which will resolve the hostname
 hostname_callable = socket:getfqdn
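One detail worth noting in the template above: the doubled %% is ConfigParser escaping for a literal %, and the quadruple braces survive one round of str.format-style substitution (the same pass that appears to expand {AIRFLOW_HOME}), leaving the Jinja expression {{ ti.dag_id }} in the generated airflow.cfg. A small sketch of that escaping assumption:

# Demonstrates the brace-escaping assumption only; this is not
# Airflow's config-generation code.
template = (
    'log_filename_template = {{{{ ti.dag_id }}}}.log\n'
    'dag_processor_manager_log_location = '
    '{AIRFLOW_HOME}/logs/dag_processor_manager/dag_processor_manager.log'
)
print(template.format(AIRFLOW_HOME='/home/user/airflow'))
# Output:
# log_filename_template = {{ ti.dag_id }}.log
# dag_processor_manager_log_location = /home/user/airflow/logs/dag_processor_manager/dag_processor_manager.log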

airflow/config_templates/default_test.cfg (+1)
@@ -39,6 +39,7 @@ logging_level = INFO
 fab_logging_level = WARN
 log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log
 log_processor_filename_template = {{{{ filename }}}}.log
+dag_processor_manager_log_location = {AIRFLOW_HOME}/logs/dag_processor_manager/dag_processor_manager.log
 executor = SequentialExecutor
 sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db
 load_examples = True

airflow/executors/base_executor.py (+1 −1)
@@ -141,7 +141,7 @@ def heartbeat(self):
                     queue=queue,
                     executor_config=ti.executor_config)
             else:
-                self.logger.info(
+                self.log.info(
                     'Task is already running, not sending to '
                     'executor: {}'.format(key))
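The one-line fix above reflects that Airflow executors obtain their logger through a logging mixin that exposes a `log` property; `self.logger` was the older attribute name. A rough sketch of that mixin pattern (an approximation for illustration, not the Airflow source):

import logging

logging.basicConfig(level=logging.INFO)

class LoggingMixin(object):
    @property
    def log(self):
        # Name the logger after the concrete class so log records
        # identify which component emitted them.
        return logging.getLogger(
            '{}.{}'.format(self.__class__.__module__,
                           self.__class__.__name__))

class BaseExecutor(LoggingMixin):
    def heartbeat(self, key):
        # Mirrors the corrected call site: self.log, not self.logger.
        self.log.info(
            'Task is already running, not sending to '
            'executor: {}'.format(key))

BaseExecutor().heartbeat(('example_dag', 'example_task', 1))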
