AIP-45 Remove dag parsing in airflow run local #21877
```diff
@@ -149,6 +149,7 @@ def __init__(
         self.processor_agent: Optional[DagFileProcessorAgent] = None

         self.dagbag = DagBag(dag_folder=self.subdir, read_dags_from_db=True, load_op_links=False)
+        self._paused_dag_without_running_dagruns: Set = set()

         if conf.getboolean('smart_sensor', 'use_smart_sensor'):
             compatible_sensors = set(
```
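The `read_dags_from_db=True` flag in the context above is what lets the scheduler avoid re-parsing DAG files: it looks up serialized DAGs written by the DAG-file processor instead of importing `.py` files. A toy illustration of that lookup pattern (the `FakeSerializedDagStore` class is hypothetical, not Airflow's real API):

```python
# Toy stand-in for a serialized-DAG store such as SerializedDagModel.
# Hypothetical class; illustrates the lookup pattern, not Airflow internals.
class FakeSerializedDagStore:
    def __init__(self):
        self._rows = {}

    def write(self, dag_id, payload):
        # The DAG-file processor parses files and writes serialized DAGs here.
        self._rows[dag_id] = payload

    def get_dag(self, dag_id):
        # The scheduler reads from the store and never parses .py files.
        # Returns None when no serialized row exists, so callers must check.
        return self._rows.get(dag_id)


store = FakeSerializedDagStore()
store.write("example_dag", {"dag_id": "example_dag", "tasks": ["t1"]})
print(store.get_dag("example_dag"))      # {'dag_id': 'example_dag', 'tasks': ['t1']}
print(store.get_dag("missing") is None)  # True
```

This mirrors the `SerializedDagModel.get_dag(dag_id)` / `if dag is None: continue` pattern used later in this diff.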
```diff
@@ -764,6 +765,26 @@ def _execute(self) -> None:
             self.log.exception("Exception when executing DagFileProcessorAgent.end")
         self.log.info("Exited execute loop")

+    def _update_dag_run_state_for_paused_dags(self):
+        try:
+            paused_dag_ids = DagModel.get_all_paused_dag_ids()
+            for dag_id in paused_dag_ids:
+                if dag_id in self._paused_dag_without_running_dagruns:
+                    continue
+                dag = SerializedDagModel.get_dag(dag_id)
+                if dag is None:
+                    continue
+                dag_runs = DagRun.find(dag_id=dag_id, state=State.RUNNING)
+                for dag_run in dag_runs:
+                    dag_run.dag = dag
+                    _, callback_to_run = dag_run.update_state(execute_callbacks=False)
+                    if callback_to_run:
+                        self._send_dag_callbacks_to_processor(dag, callback_to_run)
+                self._paused_dag_without_running_dagruns.add(dag_id)
+        except Exception as e:  # should not fail the scheduler
+            self.log.exception('Failed to update dag run state for paused dags due to %s', str(e))
+
     def _run_scheduler_loop(self) -> None:
         """
         The actual scheduler loop. The main steps in the loop are:
```

Review thread on lines +772 to +773 (the `if dag_id in self._paused_dag_without_running_dagruns: continue` guard; pingzh marked this conversation as resolved):

> dag_ids are never removed from this set. If I unpause the DAG after this has run and then later re-pause it, then this method skips it.

> good catch. let me open a PR.
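The staleness issue flagged in that review thread can be reproduced with a standalone sketch (class and method names here are hypothetical, not Airflow's API): once a dag_id enters the guard set it is never evicted, so a dag that is unpaused and later re-paused is silently skipped.

```python
# Toy reproduction of the guard-set behavior flagged in the review thread.
# Hypothetical names; only the set-membership logic mirrors the PR.
class PausedDagFinalizer:
    def __init__(self):
        # Mirrors the scheduler's self._paused_dag_without_running_dagruns.
        self._handled = set()
        self.finalized = []

    def run_once(self, paused_dag_ids):
        for dag_id in paused_dag_ids:
            if dag_id in self._handled:
                continue  # already finalized once; skipped on every later pass
            self.finalized.append(dag_id)  # stand-in for update_state() work
            self._handled.add(dag_id)


f = PausedDagFinalizer()
f.run_once(["dag_a"])  # paused: finalized on this pass
f.run_once([])         # unpaused: nothing to do, but "dag_a" is not evicted
f.run_once(["dag_a"])  # re-paused: silently skipped -- the reported bug
print(f.finalized)     # ['dag_a']
```

Removing the dag_id from the set when the dag is no longer paused (the follow-up PR mentioned in the thread) would restore the expected behavior.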
```diff
@@ -809,6 +830,7 @@ def _run_scheduler_loop(self) -> None:
                 conf.getfloat('scheduler', 'zombie_detection_interval', fallback=10.0),
                 self._find_zombies,
             )
+            timers.call_regular_interval(60.0, self._update_dag_run_state_for_paused_dags)

             for loop_count in itertools.count(start=1):
                 with Stats.timer() as timer:
```
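`call_regular_interval` registers a callback that the scheduler loop re-runs on a fixed period. A minimal sketch of those semantics using the stdlib `sched` module (this `TinyEventScheduler` is an assumption for illustration, not Airflow's `EventScheduler` implementation):

```python
# Minimal sketch of call_regular_interval semantics on top of stdlib sched.
# TinyEventScheduler is a hypothetical toy, not Airflow's EventScheduler.
import sched
import time


class TinyEventScheduler(sched.scheduler):
    def call_regular_interval(self, delay, action):
        """Enqueue `action` to run every `delay` seconds, re-arming itself."""
        def repeat():
            action()
            self.enter(delay, 1, repeat)  # re-enqueue for the next interval

        self.enter(delay, 1, repeat)


calls = []
s = TinyEventScheduler(time.monotonic, time.sleep)
s.call_regular_interval(0.01, lambda: calls.append(time.monotonic()))

# The real scheduler loop keeps running; here we drain due events briefly.
deadline = time.monotonic() + 0.06
while time.monotonic() < deadline:
    s.run(blocking=False)  # execute events that are due, then return
    time.sleep(0.005)

print(len(calls) >= 2)  # True: the action repeated on its interval
```

With a 60-second interval, as in the diff, the paused-dag-run check adds negligible load to each scheduler loop iteration.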
Why did this have to move out of LocalTaskJob?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since no callbacks are executed in the LocalTaskJob, we need to rely on the scheduler to check and update the paused dag runs.
I'm not too happy about this change -- the point of having this in LocalTaskJob is so that the state of a dag run updates quickly after the task finishes. By moving it to the scheduler we've lost that.
Instead of moving this to the scheduler, could we move it "down" into the raw task process instead? Or I wonder if there is some way with AIP-44 to add a callback from the LTJ if needed.
Final option: if there are no callbacks we can still safely do this here, so maybe we only need to do this in the scheduler in the case of callbacks?
Good point on having the LocalTaskJob mark the dag run quickly. My concern is that it leaks the scheduler's responsibility, and there is also a chance that the LTJ fails before marking the dag run state, which leaves dag run states unhandled (this could be rare). Also, since the dag is paused, it is not a big deal if the dag run state is updated a little late.
As we discussed in the email thread "[DISCUSSION] let scheduler heal tasks stuck in queued state", we will need to define the responsibility of each component in Airflow (scheduler, executor, LTJ (`airflow run --local`), `airflow run --raw`) in terms of the state machine. We can have a thorough discussion there. Let me know your thoughts.
I quite agree that there is no need to update paused tasks' state immediately; this makes this part of the change far more appealing. Since this change is merged: @ashb, if you still have some concerns, we can continue discussing here and maybe make a follow-up change in case we think of a scenario where it might be problematic.