Skip to content

Commit 067ec04

Browse files
YingboWangashb
authored andcommitted
[AIRFLOW-2930] Fix celery excecutor scheduler crash (apache#3784)
Caused by an update in PR apache#3740. execute_command.apply_async(args=command, ...) -command is a list of short unicode strings and the above code pass multiple arguments to a function defined as taking only one argument. -command = ["airflow", "run", "dag323",...] -args = command = ["airflow", "run", "dag323", ...] -execute_command("airflow","run","dag3s3", ...) will be error and exit.
1 parent ad2bc2c commit 067ec04

File tree

4 files changed

+7
-7
lines changed

4 files changed

+7
-7
lines changed

airflow/executors/celery_executor.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ def execute_async(self, key, command,
141141
self.log.info("[celery] queuing {key} through celery, "
142142
"queue={queue}".format(**locals()))
143143
self.tasks[key] = execute_command.apply_async(
144-
args=command, queue=queue)
144+
args=[command], queue=queue)
145145
self.last_state[key] = celery_states.PENDING
146146

147147
def _num_tasks_per_process(self):

tests/executors/dask_executor.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ def assert_tasks_on_executor(self, executor):
5555
# start the executor
5656
executor.start()
5757

58-
success_command = ['true', ]
59-
fail_command = ['false', ]
58+
success_command = ['true', 'some_parameter']
59+
fail_command = ['false', 'some_parameter']
6060

6161
executor.execute_async(key='success', command=success_command)
6262
executor.execute_async(key='fail', command=fail_command)

tests/executors/test_celery_executor.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ def test_celery_integration(self):
4141
executor.start()
4242
with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):
4343

44-
success_command = ['true', ]
45-
fail_command = ['false', ]
44+
success_command = ['true', 'some_parameter']
45+
fail_command = ['false', 'some_parameter']
4646

4747
executor.execute_async(key='success', command=success_command)
4848
# errors are propagated for some reason

tests/executors/test_local_executor.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ def execution_parallelism(self, parallelism=0):
3333
executor.start()
3434

3535
success_key = 'success {}'
36-
success_command = ['true', ]
37-
fail_command = ['false', ]
36+
success_command = ['true', 'some_parameter']
37+
fail_command = ['false', 'some_parameter']
3838

3939
for i in range(self.TEST_SUCCESS_COMMANDS):
4040
key, command = success_key.format(i), success_command

0 commit comments

Comments
 (0)