From 8b2fe88e18f1c650b06f0275336fa7918400bd03 Mon Sep 17 00:00:00 2001 From: Spyros Zoupanos Date: Tue, 7 Nov 2017 18:29:34 +0100 Subject: [PATCH 1/8] Trying to reproduce the problem --- aiida/backends/tests/workflows.py | 26 ++++++++++++++++++++++++++ aiida/daemon/tasks.py | 2 +- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/aiida/backends/tests/workflows.py b/aiida/backends/tests/workflows.py index 714beca46a..2b6e92bb82 100644 --- a/aiida/backends/tests/workflows.py +++ b/aiida/backends/tests/workflows.py @@ -15,6 +15,8 @@ from aiida.workflows.test import WFTestEmpty from aiida.orm.implementation import get_workflow_info from aiida.workflows.test import WFTestSimpleWithSubWF +from aiida.workflows.wf_demo import WorkflowDemo +from aiida.daemon.workflowmanager import execute_steps class TestWorkflowBasic(AiidaTestCase): @@ -152,6 +154,30 @@ def test_wf_get_state(self): # it is a valid state self.assertIn(wf.get_state(), wf_states) + def test_old_wf_results(self): + class FailingWorkflowDemo(WorkflowDemo): + def generate_calc(self): + from aiida.orm import Code, Computer, CalculationFactory + from aiida.common.datastructures import calc_states + + CustomCalc = CalculationFactory( + 'simpleplugins.templatereplacer') + + computer = Computer.get("localhost") + + calc = CustomCalc(computer=computer, withmpi=True) + calc.set_resources( + {"num_machines": 1, "num_mpiprocs_per_machine": 1}) + calc.store() + calc._set_state(calc_states.FAILED) + + return calc + + wf = FailingWorkflowDemo() + wf.start() + while wf.is_running(): + execute_steps() + def tearDown(self): """ diff --git a/aiida/daemon/tasks.py b/aiida/daemon/tasks.py index fd09e60c89..6da72a7202 100644 --- a/aiida/daemon/tasks.py +++ b/aiida/daemon/tasks.py @@ -139,5 +139,5 @@ def manual_tick_all(): submit_jobs() update_jobs() retrieve_jobs() - execute_steps() # legacy workflows + workflow_stepper() # legacy workflows tick_workflow_engine() From cd64f722721d148fdb32f27adf8a1d3823723fad Mon Sep 17 00:00:00 2001 From: Spyros Zoupanos Date: Tue, 7 Nov 2017 19:56:34 +0100 Subject: [PATCH 2/8] More proger with the test --- aiida/backends/tests/workflows.py | 30 +++++++++++------------------- aiida/workflows/test.py | 27 +++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 19 deletions(-) diff --git a/aiida/backends/tests/workflows.py b/aiida/backends/tests/workflows.py index 2b6e92bb82..24eb137afb 100644 --- a/aiida/backends/tests/workflows.py +++ b/aiida/backends/tests/workflows.py @@ -16,7 +16,7 @@ from aiida.orm.implementation import get_workflow_info from aiida.workflows.test import WFTestSimpleWithSubWF from aiida.workflows.wf_demo import WorkflowDemo -from aiida.daemon.workflowmanager import execute_steps +from aiida.daemon.tasks import workflow_stepper class TestWorkflowBasic(AiidaTestCase): @@ -154,29 +154,21 @@ def test_wf_get_state(self): # it is a valid state self.assertIn(wf.get_state(), wf_states) - def test_old_wf_results(self): - class FailingWorkflowDemo(WorkflowDemo): - def generate_calc(self): - from aiida.orm import Code, Computer, CalculationFactory - from aiida.common.datastructures import calc_states + def test_failing_calc_in_wf(self): + from time import sleep + from aiida.daemon.workflowmanager import execute_steps + from aiida.workflows.test import WFTestSimple, WFTestSimpleWithSubWF + from aiida.orm.workflow import Workflow + from aiida.workflows.test import FailingWFTestSimple - CustomCalc = CalculationFactory( - 'simpleplugins.templatereplacer') - - computer = Computer.get("localhost") - - calc = CustomCalc(computer=computer, withmpi=True) - calc.set_resources( - {"num_machines": 1, "num_mpiprocs_per_machine": 1}) - calc.store() - calc._set_state(calc_states.FAILED) - - return calc + wf = FailingWFTestSimple() + wf.store() - wf = FailingWorkflowDemo() wf.start() while wf.is_running(): + # workflow_stepper() execute_steps() + sleep(1) def tearDown(self): diff --git a/aiida/workflows/test.py b/aiida/workflows/test.py index babfdb1d60..9ec0a69e64 100644 --- a/aiida/workflows/test.py +++ b/aiida/workflows/test.py @@ -40,6 +40,33 @@ def second_step(self): self.next(self.exit) +class FailingWFTestSimple(WFTestSimple): + @Workflow.step + def start(self): + # Testing calculations + self.attach_calculation(self.generate_calc()) + + # Test process + self.next(self.second_step) + + def generate_calc(self): + from aiida.orm import Code, Computer, CalculationFactory + from aiida.common.datastructures import calc_states + + CustomCalc = CalculationFactory( + 'simpleplugins.templatereplacer') + + computer = Computer.get("localhost") + + calc = CustomCalc(computer=computer, withmpi=True) + calc.set_resources( + {"num_machines": 1, "num_mpiprocs_per_machine": 1}) + calc.store() + calc._set_state(calc_states.FAILED) + + return calc + + class WFTestSimpleWithSubWF(Workflow): def __init__(self, **kwargs): super(WFTestSimpleWithSubWF, self).__init__(**kwargs) From ae80cf611a14c483077a25d6c081b8a023026270 Mon Sep 17 00:00:00 2001 From: Spyros Zoupanos Date: Wed, 8 Nov 2017 11:05:09 +0100 Subject: [PATCH 3/8] More progress on reproduction --- aiida/backends/tests/workflows.py | 20 +++++++++++++---- aiida/workflows/test.py | 36 +++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/aiida/backends/tests/workflows.py b/aiida/backends/tests/workflows.py index 24eb137afb..59adbc2f6e 100644 --- a/aiida/backends/tests/workflows.py +++ b/aiida/backends/tests/workflows.py @@ -159,16 +159,28 @@ def test_failing_calc_in_wf(self): from aiida.daemon.workflowmanager import execute_steps from aiida.workflows.test import WFTestSimple, WFTestSimpleWithSubWF from aiida.orm.workflow import Workflow - from aiida.workflows.test import FailingWFTestSimple - - wf = FailingWFTestSimple() + from aiida.workflows.test import FailingWFTestSimple, FailingWFTestSimpleWithSubWF + from aiida.daemon.tasks import manual_tick_all + # wf = FailingWFTestSimple() + wf = FailingWFTestSimpleWithSubWF() wf.store() wf.start() while wf.is_running(): # workflow_stepper() execute_steps() - sleep(1) + # manual_tick_all() + sleep(.1) + + pks = [wf.pk] + workflows = get_workflow_list(pks) + print "=====> ", workflows + + tab_size = 2 # how many spaces to use for indentation of subworkflows + for w in workflows: + if not w.is_subworkflow() or w.pk in pks: + print "\n".join(get_workflow_info(w, tab_size=tab_size, + depth=16)) def tearDown(self): diff --git a/aiida/workflows/test.py b/aiida/workflows/test.py index 9ec0a69e64..6f3e439d78 100644 --- a/aiida/workflows/test.py +++ b/aiida/workflows/test.py @@ -49,6 +49,18 @@ def start(self): # Test process self.next(self.second_step) + @Workflow.step + def second_step(self): + # Testing calculations + self.attach_calculation(self.generate_calc()) + + # Test process + self.next(self.third_step) + + @Workflow.step + def third_step(self): + self.next(self.exit) + def generate_calc(self): from aiida.orm import Code, Computer, CalculationFactory from aiida.common.datastructures import calc_states @@ -67,6 +79,30 @@ def generate_calc(self): return calc +class FailingWFTestSimpleWithSubWF(Workflow): + def __init__(self, **kwargs): + super(FailingWFTestSimpleWithSubWF, self).__init__(**kwargs) + + @Workflow.step + def start(self): + self.attach_calculation(generate_calc()) + + # Create two subworkflows + w = FailingWFTestSimple() + w.start() + self.attach_workflow(w) + + w = FailingWFTestSimple() + w.start() + self.attach_workflow(w) + + self.next(self.second) + + @Workflow.step + def second(self): + self.next(self.exit) + + class WFTestSimpleWithSubWF(Workflow): def __init__(self, **kwargs): super(WFTestSimpleWithSubWF, self).__init__(**kwargs) From ae1d956eba9fa77ce3e947a8f04c5ea45c0833cb Mon Sep 17 00:00:00 2001 From: Spyros Zoupanos Date: Wed, 8 Nov 2017 11:25:43 +0100 Subject: [PATCH 4/8] Problem seems to be reproduced --- aiida/backends/tests/workflows.py | 12 +++++++++--- aiida/workflows/test.py | 3 +++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/aiida/backends/tests/workflows.py b/aiida/backends/tests/workflows.py index 59adbc2f6e..fdef5c1326 100644 --- a/aiida/backends/tests/workflows.py +++ b/aiida/backends/tests/workflows.py @@ -165,17 +165,23 @@ def test_failing_calc_in_wf(self): wf = FailingWFTestSimpleWithSubWF() wf.store() + pks = [wf.pk] + wf.start() while wf.is_running(): # workflow_stepper() execute_steps() # manual_tick_all() sleep(.1) + self.print_wf(pks) - pks = [wf.pk] - workflows = get_workflow_list(pks) - print "=====> ", workflows + print "=====> ", pks + self.print_wf(pks) + + + def print_wf(self, pks): + workflows = get_workflow_list(pks) tab_size = 2 # how many spaces to use for indentation of subworkflows for w in workflows: if not w.is_subworkflow() or w.pk in pks: diff --git a/aiida/workflows/test.py b/aiida/workflows/test.py index 6f3e439d78..a5a3fdbe4c 100644 --- a/aiida/workflows/test.py +++ b/aiida/workflows/test.py @@ -53,6 +53,9 @@ def start(self): def second_step(self): # Testing calculations self.attach_calculation(self.generate_calc()) + from aiida.common.datastructures import wf_states + # self.set_state(wf_states.ERROR) + raise Exception('Test exception') # Test process self.next(self.third_step) From 2db05a8b69bbd9347439f2dfd459abc20a6f4940 Mon Sep 17 00:00:00 2001 From: Spyros Zoupanos Date: Wed, 8 Nov 2017 11:50:58 +0100 Subject: [PATCH 5/8] This should be the fix for the error. --- aiida/backends/tests/workflows.py | 1 - aiida/orm/implementation/sqlalchemy/workflow.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/aiida/backends/tests/workflows.py b/aiida/backends/tests/workflows.py index fdef5c1326..0188596e07 100644 --- a/aiida/backends/tests/workflows.py +++ b/aiida/backends/tests/workflows.py @@ -175,7 +175,6 @@ def test_failing_calc_in_wf(self): sleep(.1) self.print_wf(pks) - print "=====> ", pks self.print_wf(pks) diff --git a/aiida/orm/implementation/sqlalchemy/workflow.py b/aiida/orm/implementation/sqlalchemy/workflow.py index f65b175e13..0d8d01d16a 100644 --- a/aiida/orm/implementation/sqlalchemy/workflow.py +++ b/aiida/orm/implementation/sqlalchemy/workflow.py @@ -637,6 +637,7 @@ def wrapper(cls, *args, **kwargs): wrapped_method)) cls.append_to_report("full traceback: {0}".format(traceback.format_exc())) method_step.set_state(wf_states.ERROR) + cls.set_state(wf_states.ERROR) return None out = wrapper From 365fe7a8c98ab18b0a0e356c09f797e4c26f307a Mon Sep 17 00:00:00 2001 From: Spyros Zoupanos Date: Wed, 8 Nov 2017 12:12:19 +0100 Subject: [PATCH 6/8] Cleaning up --- aiida/backends/tests/workflows.py | 72 ++++++++++++++++++------------- aiida/workflows/test.py | 22 +--------- 2 files changed, 44 insertions(+), 50 deletions(-) diff --git a/aiida/backends/tests/workflows.py b/aiida/backends/tests/workflows.py index 0188596e07..67831e3503 100644 --- a/aiida/backends/tests/workflows.py +++ b/aiida/backends/tests/workflows.py @@ -155,37 +155,49 @@ def test_wf_get_state(self): self.assertIn(wf.get_state(), wf_states) def test_failing_calc_in_wf(self): - from time import sleep + import logging from aiida.daemon.workflowmanager import execute_steps - from aiida.workflows.test import WFTestSimple, WFTestSimpleWithSubWF - from aiida.orm.workflow import Workflow - from aiida.workflows.test import FailingWFTestSimple, FailingWFTestSimpleWithSubWF - from aiida.daemon.tasks import manual_tick_all - # wf = FailingWFTestSimple() - wf = FailingWFTestSimpleWithSubWF() - wf.store() - - pks = [wf.pk] - - wf.start() - while wf.is_running(): - # workflow_stepper() - execute_steps() - # manual_tick_all() - sleep(.1) - self.print_wf(pks) - - print "=====> ", pks - self.print_wf(pks) - - - def print_wf(self, pks): - workflows = get_workflow_list(pks) - tab_size = 2 # how many spaces to use for indentation of subworkflows - for w in workflows: - if not w.is_subworkflow() or w.pk in pks: - print "\n".join(get_workflow_info(w, tab_size=tab_size, - depth=16)) + from aiida.workflows.test import (FailingWFTestSimple, + FailingWFTestSimpleWithSubWF) + + try: + # First of all, I re-enable logging in case it was disabled by + # mistake by a previous test (e.g. one that disables and reenables + # again, but that failed) + logging.disable(logging.NOTSET) + # Temporarily disable logging to the stream handler (i.e. screen) + # because otherwise fix_calc_states will print warnings + handler = next((h for h in logging.getLogger('aiida').handlers if + isinstance(h, logging.StreamHandler)), None) + if handler: + original_level = handler.level + handler.setLevel(logging.ERROR) + + # Testing the error propagation of a simple workflow + wf = FailingWFTestSimple() + wf.store() + step_no = 0 + wf.start() + while wf.is_running(): + execute_steps() + step_no += 1 + self.assertLess(step_no, 5, "This workflow should have stopped " + "since it is failing") + + # Testing the error propagation of a workflow with subworkflows + wf = FailingWFTestSimpleWithSubWF() + wf.store() + + step_no = 0 + wf.start() + while wf.is_running(): + execute_steps() + step_no += 1 + self.assertLess(step_no, 5, "This workflow should have stopped " + "since it is failing") + finally: + if handler: + handler.setLevel(original_level) def tearDown(self): diff --git a/aiida/workflows/test.py b/aiida/workflows/test.py index a5a3fdbe4c..d6e0389674 100644 --- a/aiida/workflows/test.py +++ b/aiida/workflows/test.py @@ -52,9 +52,8 @@ def start(self): @Workflow.step def second_step(self): # Testing calculations - self.attach_calculation(self.generate_calc()) - from aiida.common.datastructures import wf_states - # self.set_state(wf_states.ERROR) + self.attach_calculation(generate_calc()) + # Raise a test exception that should make the workflow to stop raise Exception('Test exception') # Test process @@ -64,23 +63,6 @@ def second_step(self): def third_step(self): self.next(self.exit) - def generate_calc(self): - from aiida.orm import Code, Computer, CalculationFactory - from aiida.common.datastructures import calc_states - - CustomCalc = CalculationFactory( - 'simpleplugins.templatereplacer') - - computer = Computer.get("localhost") - - calc = CustomCalc(computer=computer, withmpi=True) - calc.set_resources( - {"num_machines": 1, "num_mpiprocs_per_machine": 1}) - calc.store() - calc._set_state(calc_states.FAILED) - - return calc - class FailingWFTestSimpleWithSubWF(Workflow): def __init__(self, **kwargs): From 3f600f0778f1cd31c826a5ac87c8948ed650ceec Mon Sep 17 00:00:00 2001 From: Spyros Zoupanos Date: Wed, 8 Nov 2017 12:18:16 +0100 Subject: [PATCH 7/8] Some minor cleaning --- aiida/backends/tests/workflows.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/aiida/backends/tests/workflows.py b/aiida/backends/tests/workflows.py index 67831e3503..78e6157319 100644 --- a/aiida/backends/tests/workflows.py +++ b/aiida/backends/tests/workflows.py @@ -15,8 +15,6 @@ from aiida.workflows.test import WFTestEmpty from aiida.orm.implementation import get_workflow_info from aiida.workflows.test import WFTestSimpleWithSubWF -from aiida.workflows.wf_demo import WorkflowDemo -from aiida.daemon.tasks import workflow_stepper class TestWorkflowBasic(AiidaTestCase): @@ -155,6 +153,11 @@ def test_wf_get_state(self): self.assertIn(wf.get_state(), wf_states) def test_failing_calc_in_wf(self): + """ + This test checks that a workflow (but also a workflow with + sub-workflows) that has an exception at one of its steps stops + properly and it is not left as RUNNING. + """ import logging from aiida.daemon.workflowmanager import execute_steps from aiida.workflows.test import (FailingWFTestSimple, @@ -199,7 +202,6 @@ def test_failing_calc_in_wf(self): if handler: handler.setLevel(original_level) - def tearDown(self): """ Cleaning the database after each test. Since I don't From c2c2a42d49308cd6ebfdafa11be82ba299a90279 Mon Sep 17 00:00:00 2001 From: Spyros Zoupanos Date: Thu, 9 Nov 2017 14:53:20 +0100 Subject: [PATCH 8/8] Putting back execute_steps() in the manual_tick_all() --- aiida/daemon/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiida/daemon/tasks.py b/aiida/daemon/tasks.py index 6da72a7202..fd09e60c89 100644 --- a/aiida/daemon/tasks.py +++ b/aiida/daemon/tasks.py @@ -139,5 +139,5 @@ def manual_tick_all(): submit_jobs() update_jobs() retrieve_jobs() - workflow_stepper() # legacy workflows + execute_steps() # legacy workflows tick_workflow_engine()