Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-3175] Fix docstring format in airflow/jobs.py #4025

Merged
merged 1 commit into from
Oct 9, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,10 @@ def _launch_process(result_queue,
:param file_path: the file to process
:type file_path: unicode
:param pickle_dags: whether to pickle the DAGs found in the file and
save them to the DB
save them to the DB
:type pickle_dags: bool
:param dag_id_white_list: if specified, only examine DAG ID's that are
in this list
in this list
:type dag_id_white_list: list[unicode]
:param thread_name: the name to use for the process that is launched
:type thread_name: unicode
Expand Down Expand Up @@ -424,6 +424,7 @@ def start(self):
def terminate(self, sigkill=False):
"""
Terminate (and then kill) the process launched to process the file.

:param sigkill: whether to issue a SIGKILL if SIGTERM doesn't work.
:type sigkill: bool
"""
Expand Down Expand Up @@ -452,6 +453,7 @@ def pid(self):
def exit_code(self):
"""
After the process is finished, this can be called to get the return code

:return: the exit code of the process
:rtype: int
"""
Expand All @@ -463,6 +465,7 @@ def exit_code(self):
def done(self):
"""
Check if the process launched to process this file is done.

:return: whether the process is finished running
:rtype: bool
"""
Expand Down Expand Up @@ -544,16 +547,18 @@ def __init__(
:param dag_ids: if specified, only schedule tasks with these DAG IDs
:type dag_ids: list[unicode]
:param subdir: directory containing Python files with Airflow DAG
definitions, or a specific path to a file
definitions, or a specific path to a file
:type subdir: unicode
:param num_runs: The number of times to try to schedule each DAG file.
-1 for unlimited within the run_duration.
-1 for unlimited within the run_duration.
:type num_runs: int
:param processor_poll_interval: The number of seconds to wait between
polls of running processors
polls of running processors
:type processor_poll_interval: int
:param run_duration: how long to run (in seconds) before exiting
:type run_duration: int
:param do_pickle: once a DAG object is obtained by executing the Python
file, whether to serialize the DAG object to the DB
file, whether to serialize the DAG object to the DB
:type do_pickle: bool
"""
# for BaseJob compatibility
Expand Down Expand Up @@ -782,7 +787,7 @@ def update_import_errors(session, dagbag):
def create_dag_run(self, dag, session=None):
"""
This method checks whether a new DagRun needs to be created
for a DAG based on scheduling interval
for a DAG based on scheduling interval.
Returns DagRun if one is scheduled. Otherwise returns None.
"""
if dag.schedule_interval and conf.getboolean('scheduler', 'USE_JOB_SCHEDULE'):
Expand Down Expand Up @@ -990,7 +995,7 @@ def _change_state_for_tis_without_dagrun(self,
:param new_state: set TaskInstances to this state
:type new_state: State
:param simple_dag_bag: TaskInstances associated with DAGs in the
simple_dag_bag and with states in the old_state will be examined
simple_dag_bag and with states in the old_state will be examined
:type simple_dag_bag: SimpleDagBag
"""
tis_changed = 0
Expand Down Expand Up @@ -1061,7 +1066,7 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
dag concurrency, executor state, and priority.

:param simple_dag_bag: TaskInstances associated with DAGs in the
simple_dag_bag will be fetched from the DB and executed
simple_dag_bag will be fetched from the DB and executed
:type simple_dag_bag: SimpleDagBag
:param executor: the executor that runs task instances
:type executor: BaseExecutor
Expand Down Expand Up @@ -1374,7 +1379,7 @@ def _execute_task_instances(self,
3. Enqueue the TIs in the executor.

:param simple_dag_bag: TaskInstances associated with DAGs in the
simple_dag_bag will be fetched from the DB and executed
simple_dag_bag will be fetched from the DB and executed
:type simple_dag_bag: SimpleDagBag
:param states: Execute TaskInstances in these states
:type states: Tuple[State]
Expand Down Expand Up @@ -1483,7 +1488,7 @@ def _log_file_processing_stats(self,
Print out stats about how files are getting processed.

:param known_file_paths: a list of file paths that may contain Airflow
DAG definitions
DAG definitions
:type known_file_paths: list[unicode]
:param processor_manager: manager for the file processors
:type stats: DagFileProcessorManager
Expand Down Expand Up @@ -1789,7 +1794,7 @@ def process_file(self, file_path, pickle_dags=False, session=None):
:param file_path: the path to the Python file that should be executed
:type file_path: unicode
:param pickle_dags: whether serialize the DAGs found in the file and
save them to the db
save them to the db
:type pickle_dags: bool
:return: a list of SimpleDags made from the Dags found in the file
:rtype: list[SimpleDag]
Expand Down Expand Up @@ -2028,6 +2033,7 @@ def _update_counters(self, ti_status):
"""
Updates the counters per state of the tasks that were running. Can re-add
to tasks to run in case required.

:param ti_status: the internal status of the backfill job tasks
:type ti_status: BackfillJob._DagRunTaskStatus
"""
Expand Down Expand Up @@ -2072,6 +2078,7 @@ def _manage_executor_state(self, running):
"""
Checks if the executor agrees with the state of task instances
that are running

:param running: dict of key, task to verify
"""
executor = self.executor
Expand Down Expand Up @@ -2103,6 +2110,7 @@ def _get_dag_run(self, run_date, session=None):
Returns a dag run for the given run date, which will be matched to an existing
dag run if available or create a new dag run otherwise. If the max_active_runs
limit is reached, this function will return None.

:param run_date: the execution date for the dag run
:type run_date: datetime
:param session: the database session object
Expand Down Expand Up @@ -2162,6 +2170,7 @@ def _task_instances_for_dag_run(self, dag_run, session=None):
"""
Returns a map of task instance key to task instance object for the tasks to
run in the given dag run.

:param dag_run: the dag run to get the tasks from
:type dag_run: models.DagRun
:param session: the database session object
Expand Down Expand Up @@ -2227,6 +2236,7 @@ def _process_backfill_task_instances(self,
Process a set of task instances from a set of dag runs. Special handling is done
to account for different task instance states that could be present when running
them in a backfill process.

:param ti_status: the internal status of the job
:type ti_status: BackfillJob._DagRunTaskStatus
:param executor: the executor to run the task instances
Expand Down Expand Up @@ -2464,6 +2474,7 @@ def _execute_for_run_dates(self, run_dates, ti_status, executor, pickle_id,
Computes the dag runs and their respective task instances for
the given run dates and executes the task instances.
Returns a list of execution dates of the dag runs that were executed.

:param run_dates: Execution dates for dag runs
:type run_dates: list
:param ti_status: internal BackfillJob status structure to tis track progress
Expand Down