Empty expand() crashes the scheduler #23435

Closed
2 tasks done
jedcunningham opened this issue May 3, 2022 · 1 comment · Fixed by #23463
Labels: affected_version:2.3 (Issues Reported for 2.3), area:core, area:dynamic-task-mapping (AIP-42), kind:bug (This is clearly a bug)

@jedcunningham
Member

Apache Airflow version

2.3.0 (latest released)

What happened

I've found a DAG that will crash the scheduler:

from airflow.decorators import task

@task
def hello():
    return "hello"

# expand() is called with no mapped arguments
hello.expand()
[2022-05-03 03:41:23,779] {scheduler_job.py:753} ERROR - Exception when executing SchedulerJob._run_scheduler_loop                 
Traceback (most recent call last):                                                                                                 
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 736, in _execute                     
    self._run_scheduler_loop()                                                                                                     
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 824, in _run_scheduler_loop          
    num_queued_tis = self._do_scheduling(session)                                                                                  
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 906, in _do_scheduling               
    callback_to_run = self._schedule_dag_run(dag_run, session)                                                                     
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1148, in _schedule_dag_run           
    schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)                              
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 68, in wrapper                            
    return func(*args, **kwargs)                                                                                                   
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 522, in update_state                      
    info = self.task_instance_scheduling_decisions(session)                                                                        
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 68, in wrapper                            
    return func(*args, **kwargs)                                                                                                   
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 661, in task_instance_scheduling_decisions
    session=session,                                                                                                               
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 714, in _get_ready_tis                    
    expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)                                            
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/mappedoperator.py", line 609, in expand_mapped_task        
    operator.mul, self._resolve_map_lengths(run_id, session=session).values()                                                      
TypeError: reduce() of empty sequence with no initial value                                                                        
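
The last frame of the traceback can be reproduced outside Airflow. This is a minimal sketch, assuming the mapping of map lengths is empty when `expand()` receives no arguments; `map_lengths` is a stand-in dict, not Airflow's actual data, and the exact wording of the error message varies between Python versions.

```python
import functools
import operator

# Stand-in for the mapping of mapped-argument names to their lengths.
# Assumed to be empty because expand() was called with no arguments.
map_lengths = {}

try:
    # Same call shape as the crashing line: no initial value is given,
    # so reduce() has nothing to return for an empty input.
    functools.reduce(operator.mul, map_lengths.values())
except TypeError as exc:
    error_message = str(exc)

print(error_message)
```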

What you think should happen instead

A user DAG shouldn't crash the scheduler. This specific case could likely be an ImportError at parse time, but it makes me think we might be missing some exception handling?

How to reproduce

No response

Operating System

Debian

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

jedcunningham added the kind:bug and area:core labels May 3, 2022
@uranusjr
Member

uranusjr commented May 3, 2022

We should make this a parse-time error, because expanding nothing makes no sense anyway. If we really want this to work (which I assume would just mean expanding to one task), it can easily be fixed by passing 1 as the initial value at the end of the currently crashing reduce call.
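
Both options from this comment can be sketched in plain Python. The function names `total_map_length` and `validate_expand_kwargs` are hypothetical and chosen for illustration; they are not Airflow's API, and the actual fix may look different.

```python
import functools
import operator


def total_map_length(map_lengths):
    """Return the product of all mapped lengths, or 1 when nothing is mapped."""
    # The trailing 1 is reduce()'s initial value, so an empty input yields 1
    # instead of raising TypeError.
    return functools.reduce(operator.mul, map_lengths.values(), 1)


def validate_expand_kwargs(kwargs):
    """Hypothetical parse-time check that rejects expand() with no arguments."""
    if not kwargs:
        raise TypeError("no arguments to expand against")
    return kwargs


print(total_map_length({}))                # 1
print(total_map_length({"x": 3, "y": 2}))  # 6
```

With the initial value, an empty `expand()` would map to a single task. With the parse-time check, the DAG file would fail to import and the scheduler would never see the task.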
