Skip to content

Commit f87de81

Browse files
YingboWangAlice Berard
authored and
Alice Berard
committed
[AIRFLOW-2930] Fix celery excecutor scheduler crash (apache#3784)
Caused by an update in PR apache#3740. execute_command.apply_async(args=command, ...) -command is a list of short unicode strings and the above code pass multiple arguments to a function defined as taking only one argument. -command = ["airflow", "run", "dag323",...] -args = command = ["airflow", "run", "dag323", ...] -execute_command("airflow","run","dag3s3", ...) will be error and exit.
1 parent e826e1e commit f87de81

File tree

4 files changed

+7
-7
lines changed

4 files changed

+7
-7
lines changed

airflow/executors/celery_executor.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def execute_async(self, key, command,
8282
self.log.info("[celery] queuing {key} through celery, "
8383
"queue={queue}".format(**locals()))
8484
self.tasks[key] = execute_command.apply_async(
85-
args=command, queue=queue)
85+
args=[command], queue=queue)
8686
self.last_state[key] = celery_states.PENDING
8787

8888
def sync(self):

tests/executors/dask_executor.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ def assert_tasks_on_executor(self, executor):
5555
# start the executor
5656
executor.start()
5757

58-
success_command = ['true', ]
59-
fail_command = ['false', ]
58+
success_command = ['true', 'some_parameter']
59+
fail_command = ['false', 'some_parameter']
6060

6161
executor.execute_async(key='success', command=success_command)
6262
executor.execute_async(key='fail', command=fail_command)

tests/executors/test_celery_executor.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ def test_celery_integration(self):
3434
executor.start()
3535
with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):
3636

37-
success_command = ['true', ]
38-
fail_command = ['false', ]
37+
success_command = ['true', 'some_parameter']
38+
fail_command = ['false', 'some_parameter']
3939

4040
executor.execute_async(key='success', command=success_command)
4141
# errors are propagated for some reason

tests/executors/test_local_executor.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ def execution_parallelism(self, parallelism=0):
3333
executor.start()
3434

3535
success_key = 'success {}'
36-
success_command = ['true', ]
37-
fail_command = ['false', ]
36+
success_command = ['true', 'some_parameter']
37+
fail_command = ['false', 'some_parameter']
3838

3939
for i in range(self.TEST_SUCCESS_COMMANDS):
4040
key, command = success_key.format(i), success_command

0 commit comments

Comments
 (0)