@@ -349,10 +349,10 @@ def _launch_process(result_queue,
349
349
:param file_path: the file to process
350
350
:type file_path: unicode
351
351
:param pickle_dags: whether to pickle the DAGs found in the file and
352
- save them to the DB
352
+ save them to the DB
353
353
:type pickle_dags: bool
354
354
:param dag_id_white_list: if specified, only examine DAG IDs that are
355
- in this list
355
+ in this list
356
356
:type dag_id_white_list: list[unicode]
357
357
:param thread_name: the name to use for the process that is launched
358
358
:type thread_name: unicode
@@ -424,6 +424,7 @@ def start(self):
424
424
def terminate (self , sigkill = False ):
425
425
"""
426
426
Terminate (and then kill) the process launched to process the file.
427
+
427
428
:param sigkill: whether to issue a SIGKILL if SIGTERM doesn't work.
428
429
:type sigkill: bool
429
430
"""
@@ -452,6 +453,7 @@ def pid(self):
452
453
def exit_code (self ):
453
454
"""
454
455
After the process is finished, this can be called to get the return code
456
+
455
457
:return: the exit code of the process
456
458
:rtype: int
457
459
"""
@@ -463,6 +465,7 @@ def exit_code(self):
463
465
def done (self ):
464
466
"""
465
467
Check if the process launched to process this file is done.
468
+
466
469
:return: whether the process is finished running
467
470
:rtype: bool
468
471
"""
@@ -544,16 +547,18 @@ def __init__(
544
547
:param dag_ids: if specified, only schedule tasks with these DAG IDs
545
548
:type dag_ids: list[unicode]
546
549
:param subdir: directory containing Python files with Airflow DAG
547
- definitions, or a specific path to a file
550
+ definitions, or a specific path to a file
548
551
:type subdir: unicode
549
552
:param num_runs: The number of times to try to schedule each DAG file.
550
- -1 for unlimited within the run_duration.
553
+ -1 for unlimited within the run_duration.
554
+ :type num_runs: int
551
555
:param processor_poll_interval: The number of seconds to wait between
552
- polls of running processors
556
+ polls of running processors
557
+ :type processor_poll_interval: int
553
558
:param run_duration: how long to run (in seconds) before exiting
554
559
:type run_duration: int
555
560
:param do_pickle: once a DAG object is obtained by executing the Python
556
- file, whether to serialize the DAG object to the DB
561
+ file, whether to serialize the DAG object to the DB
557
562
:type do_pickle: bool
558
563
"""
559
564
# for BaseJob compatibility
@@ -783,7 +788,7 @@ def update_import_errors(session, dagbag):
783
788
def create_dag_run (self , dag , session = None ):
784
789
"""
785
790
This method checks whether a new DagRun needs to be created
786
- for a DAG based on scheduling interval
791
+ for a DAG based on scheduling interval.
787
792
Returns DagRun if one is scheduled. Otherwise returns None.
788
793
"""
789
794
if dag .schedule_interval :
@@ -991,7 +996,7 @@ def _change_state_for_tis_without_dagrun(self,
991
996
:param new_state: set TaskInstances to this state
992
997
:type new_state: State
993
998
:param simple_dag_bag: TaskInstances associated with DAGs in the
994
- simple_dag_bag and with states in the old_state will be examined
999
+ simple_dag_bag and with states in the old_state will be examined
995
1000
:type simple_dag_bag: SimpleDagBag
996
1001
"""
997
1002
tis_changed = 0
@@ -1062,7 +1067,7 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
1062
1067
dag concurrency, executor state, and priority.
1063
1068
1064
1069
:param simple_dag_bag: TaskInstances associated with DAGs in the
1065
- simple_dag_bag will be fetched from the DB and executed
1070
+ simple_dag_bag will be fetched from the DB and executed
1066
1071
:type simple_dag_bag: SimpleDagBag
1067
1072
:param executor: the executor that runs task instances
1068
1073
:type executor: BaseExecutor
@@ -1373,7 +1378,7 @@ def _execute_task_instances(self,
1373
1378
3. Enqueue the TIs in the executor.
1374
1379
1375
1380
:param simple_dag_bag: TaskInstances associated with DAGs in the
1376
- simple_dag_bag will be fetched from the DB and executed
1381
+ simple_dag_bag will be fetched from the DB and executed
1377
1382
:type simple_dag_bag: SimpleDagBag
1378
1383
:param states: Execute TaskInstances in these states
1379
1384
:type states: Tuple[State]
@@ -1482,7 +1487,7 @@ def _log_file_processing_stats(self,
1482
1487
Print out stats about how files are getting processed.
1483
1488
1484
1489
:param known_file_paths: a list of file paths that may contain Airflow
1485
- DAG definitions
1490
+ DAG definitions
1486
1491
:type known_file_paths: list[unicode]
1487
1492
:param processor_manager: manager for the file processors
1488
1493
:type processor_manager: DagFileProcessorManager
@@ -1788,7 +1793,7 @@ def process_file(self, file_path, pickle_dags=False, session=None):
1788
1793
:param file_path: the path to the Python file that should be executed
1789
1794
:type file_path: unicode
1790
1795
:param pickle_dags: whether to serialize the DAGs found in the file and
1791
- save them to the db
1796
+ save them to the db
1792
1797
:type pickle_dags: bool
1793
1798
:return: a list of SimpleDags made from the Dags found in the file
1794
1799
:rtype: list[SimpleDag]
@@ -2027,6 +2032,7 @@ def _update_counters(self, ti_status):
2027
2032
"""
2028
2033
Updates the counters per state of the tasks that were running. Can re-add
2029
2034
to tasks to run in case required.
2035
+
2030
2036
:param ti_status: the internal status of the backfill job tasks
2031
2037
:type ti_status: BackfillJob._DagRunTaskStatus
2032
2038
"""
@@ -2071,6 +2077,7 @@ def _manage_executor_state(self, running):
2071
2077
"""
2072
2078
Checks if the executor agrees with the state of task instances
2073
2079
that are running
2080
+
2074
2081
:param running: dict of key, task to verify
2075
2082
"""
2076
2083
executor = self .executor
@@ -2102,6 +2109,7 @@ def _get_dag_run(self, run_date, session=None):
2102
2109
Returns a dag run for the given run date, which will be matched to an existing
2103
2110
dag run if available or create a new dag run otherwise. If the max_active_runs
2104
2111
limit is reached, this function will return None.
2112
+
2105
2113
:param run_date: the execution date for the dag run
2106
2114
:type run_date: datetime
2107
2115
:param session: the database session object
@@ -2161,6 +2169,7 @@ def _task_instances_for_dag_run(self, dag_run, session=None):
2161
2169
"""
2162
2170
Returns a map of task instance key to task instance object for the tasks to
2163
2171
run in the given dag run.
2172
+
2164
2173
:param dag_run: the dag run to get the tasks from
2165
2174
:type dag_run: models.DagRun
2166
2175
:param session: the database session object
@@ -2226,6 +2235,7 @@ def _process_backfill_task_instances(self,
2226
2235
Process a set of task instances from a set of dag runs. Special handling is done
2227
2236
to account for different task instance states that could be present when running
2228
2237
them in a backfill process.
2238
+
2229
2239
:param ti_status: the internal status of the job
2230
2240
:type ti_status: BackfillJob._DagRunTaskStatus
2231
2241
:param executor: the executor to run the task instances
@@ -2463,6 +2473,7 @@ def _execute_for_run_dates(self, run_dates, ti_status, executor, pickle_id,
2463
2473
Computes the dag runs and their respective task instances for
2464
2474
the given run dates and executes the task instances.
2465
2475
Returns a list of execution dates of the dag runs that were executed.
2476
+
2466
2477
:param run_dates: Execution dates for dag runs
2467
2478
:type run_dates: list
2468
2479
:param ti_status: internal BackfillJob status structure to track tis progress
0 commit comments