
Commit 51cefe9
Commit message: fix some bug
Parent: b29296e

4 files changed: +55, -64 lines

DAGflow/DAG.py (+10, -4)

@@ -125,7 +125,7 @@ def __init__(self, id, script, work_dir=".", type="sge", option=""):
         self.script = script
         self.type = type
         self._option = option
-        self.done = os.path.join(work_dir, "%s_done" % id)
+        self.done = os.path.join(self.work_dir, "%s_done" % id)

         self.depends = []
         self.status = None
@@ -157,9 +157,9 @@ def run_time(self):
         """
         if self.end_time and self.start_time:
             _time = self.end_time - self.start_time
-            return "%ss" % _time
+            return "%s" % int(_time)
         else:
-            return "0s"
+            return "0"

     """
     Functions to describe relationship between tasks
@@ -298,15 +298,21 @@ def kill(self):
         return 1

     def check_done(self):
+        """
+        check the status of done task
+        :return: success 1 or fail 0
+        """
         if os.path.isfile(self.done):
             self.status = "success"
             self.end_time = time.time()
             LOG.info("task %r finished by %s seconds" % (self.id, self.run_time))
+
+            return 1
         else:
             self.status = "failed"
             LOG.info("task %r run but failed" % self.id)

-        return 1
+            return 0

     """
     methods to read and write Task from json
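A minimal sketch of the Task behaviour after these changes; the id, script and work_dir values below are made up for illustration and are not part of the commit:

```python
# Sketch only: the "demo" values are hypothetical, not from this commit.
from DAGflow import Task

task = Task(id="demo", script="echo demo", work_dir="demo_dir",
            type="local", option={})

# check_done() now reports the result: 1 if the "<work_dir>/<id>_done"
# marker file exists (the path is now built from self.work_dir), 0 otherwise.
if task.check_done():
    # run_time now renders whole seconds with no trailing "s",
    # once start and end times have been recorded
    print("finished in %s seconds" % task.run_time)
else:
    print("task %r failed" % task.id)
```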

DAGflow/do_DAG.py (+30, -27)

@@ -23,7 +23,7 @@
 import time
 import json
 import signal
-from .DAG import Task, DAG
+from DAGflow import DAG


 LOG = logging.getLogger(__name__)
@@ -98,7 +98,7 @@ def ps():
     return r


-def update_task_status(tasks):
+def update_task_status(tasks, stop_on_failure):
     """

     :param tasks:
@@ -133,12 +133,19 @@ def update_task_status(tasks):

         # check recent done tasks on sge
         if task.type == "sge" and task.run_id not in sge_running_task:
-            task.check_done()
+            status = task.check_done()
+
+            if not status and stop_on_failure:
+                LOG.info("Task %r failed, stop all tasks" % task.id)
+                del_online_tasks()
             continue
         elif task.type == "local":
-            if task.run_id.poll():
-                task.check_done()
+            if not task.run_id.poll():
+                status = task.check_done()

+                if not status and stop_on_failure:
+                    LOG.info("Task %r failed, stop all tasks" % task.id)
+                    del_online_tasks()
             continue
         else:
             pass
@@ -192,7 +199,11 @@ def submit_tasks(tasks, concurrent_tasks):
     return tasks


-def qdel_online_tasks(signum, frame):
+def del_task_hander(signum, frame):
+    del_online_tasks()
+
+
+def del_online_tasks():
     LOG.info("delete all running jobs, please wait")
     time.sleep(3)

@@ -201,44 +212,37 @@ def qdel_online_tasks(signum, frame):
         if task.status == "running":
             task.kill()

-    write_tasks(TASKS, TASK_NAME + ".json")
+    write_tasks(TASKS)

     sys.exit("sorry, the program exit")


-def write_tasks(tasks, filename):
+def write_tasks(tasks):
     failed_tasks = []

-    tasks_json = OrderedDict()
-
     for id, task in tasks.items():

         if task.status != "success":
             failed_tasks.append(task.id)

-        tasks_json.update(task.to_json())
-
-    with open(filename, "w") as out:
-        json.dump(tasks_json, out, indent=2)
-
     if failed_tasks:
         LOG.info("""\
 The following tasks were failed:
 %s
-The tasks were save in %s, you can resub it.
-""" % ("\n".join([i for i in failed_tasks]), filename))
+""" % "\n".join([i for i in failed_tasks]))
         sys.exit("sorry, the program exit with some jobs failed")
     else:
         LOG.info("All jobs were done!")


-def do_dag(dag, concurrent_tasks, refresh_time, log_name=""):
+def do_dag(dag, concurrent_tasks=200, refresh_time=60, stop_on_failure=False):

+    dag.to_json()
     start = time.time()

     logging.basicConfig(level=logging.DEBUG,
                         format="[%(levelname)s] %(asctime)s %(message)s",
-                        filename=log_name,
+                        filename="%s.log" % dag.id,
                         filemode='w',
                         )

@@ -253,15 +257,13 @@ def do_dag(dag, concurrent_tasks, refresh_time, log_name=""):
     global TASKS
     TASKS = dag.tasks

-    signal.signal(signal.SIGINT, qdel_online_tasks)
-    signal.signal(signal.SIGTERM, qdel_online_tasks)
+    signal.signal(signal.SIGINT, del_task_hander)
+    signal.signal(signal.SIGTERM, del_task_hander)
     # signal.signal(signal.SIGKILL, qdel_online_tasks)

     for id, task in TASKS.items():
         task.init()

-    failed_json = TASK_NAME + ".json"
-
     loop = 0

     while 1:
@@ -292,10 +294,10 @@ def do_dag(dag, concurrent_tasks, refresh_time, log_name=""):
         else:
             time.sleep(refresh_time)
             loop += 1
-            update_task_status(TASKS)
+            update_task_status(TASKS, stop_on_failure)

     # write failed
-    write_tasks(TASKS, failed_json)
+    write_tasks(TASKS)
     totalTime = time.time() - start
     LOG.info('Total time:' + time.strftime("%H:%M:%S", time.gmtime(totalTime)))

@@ -314,7 +316,8 @@ def get_args():

     parser.add_argument("json", help="The json file contain DAG information")
     parser.add_argument("-m", "--max_task", type=int, default=200, help="concurrent_tasks")
-    parser.add_argument("-r", "--refresh", type=int, default=30, help="refresh time of task status (seconds)")
+    parser.add_argument("-r", "--refresh", type=int, default=60, help="refresh time of task status (seconds)")
+    parser.add_argument("-s", "--stopOnFailure", action="store_true", help="stop all tasks when any task failure")
     args = parser.parse_args()

     return args
@@ -327,7 +330,7 @@ def main():
     TASK_NAME = os.path.splitext(os.path.basename(args.json))[0]
     print(TASK_NAME)
     dag = DAG.from_json(args.json)
-    do_dag(dag, args.max_task, args.refresh, TASK_NAME+".log")
+    do_dag(dag, args.max_task, args.refresh, args.stopOnFailure)


 if __name__ == "__main__":
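A hedged sketch of how the reworked scheduler entry point can be called after this commit; my_dag is a placeholder for any DAG assembled as in the README:

```python
# Sketch only: my_dag is a placeholder; build it with Task objects as in the README.
from DAGflow import DAG
from DAGflow.do_DAG import do_dag

my_dag = DAG("blast")
# ... add tasks and dependencies here ...

# concurrent_tasks and refresh_time now default to 200 and 60 seconds;
# stop_on_failure=True deletes all running jobs as soon as one task fails.
# do_dag() also calls dag.to_json() itself and logs to "<dag.id>.log".
do_dag(my_dag, stop_on_failure=True)
```

On the command line the same switch is exposed as -s/--stopOnFailure, e.g. `python -m DAGflow.do_DAG blast.json -m 100 -r 30 -s` (file name and values here are illustrative).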

README.md (+9, -14)

@@ -36,7 +36,8 @@ To complete this work, a workflow as following is needed
 At first, you should write your workflow script
 ```python
 import os
-from DAGflow.DAG import DAG, Task
+from DAGflow import DAG, Task
+from DAGflow.do_DAG import do_dag


 inputs = ['1.fasta', "2.fasta", "3.fasta", "4.fasta"]
@@ -47,7 +48,7 @@ db = os.path.abspath(db)
 my_dag = DAG("blast")
 # create the first task 'make_db'
 make_db = Task(
-    task_id="make_db",  # your task id, should be unique
+    id="make_db",  # your task id, should be unique
     work_dir="",  # you task work directory
     type="local",  # the way your task run. if "sge", task will submit with qsub
     option={},  # the option of "sge" or "local"
@@ -64,7 +65,7 @@ n = 1
 for fn in inputs:
     task_id = "blast_%s" % n
     task = Task(
-        task_id=task_id,
+        id=task_id,
         work_dir=task_id,
         type="sge",
         option={
@@ -82,7 +83,7 @@ for fn in inputs:

 # add blast_join task to join blast results
 blast_join = Task(
-    task_id="blast_join",
+    id="blast_join",
     work_dir="",
     type="local",  # option is default
     script="cat */*.m6 > blast.all.m6"
@@ -93,10 +94,8 @@ my_dag.add_task(blast_join)
 blast_join.set_upstream(*blast_tasks)

 # all of you tasks were added to you workflow, you can run it
-# write you workflow to a json file
-js = my_dag.to_json()
-# submit you workflow tasks with do_DAG
-os.system("python -m DAGflow.do_DAG %s" % js)
+do_dag(my_dag)
+
 ```
 Now, your workflow script is completed, you can name it as 'workflow.py'
 ### Run you workflow
@@ -106,15 +105,11 @@ python workflow.py
 ```
 ### Re-run your workflow if it was break in the middle
 For some reason, you workflow was broken with some tasks undone.
-You can use the following commands to re-run the undone jobs.
-```commandline
-python -m DAGflow.do_DAG blast.json
-# note that the blast.json is in you work directory, 'blast' is your DAG id.
-```
+You can use the same command `python workflow.py` to re-run the undone jobs.
 ### Add workflow to workflow
 Sometimes you may want to add a workflow to another workflow, this can be down as following:
 ```python
-from DAGflow.DAG import *
+from DAGflow import *


 # two workflow wf1 and wf2
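The hunk above stops where the README's "Add workflow to workflow" example begins; a minimal sketch of that pattern with the new imports (both DAGs and their tasks are illustrative, and add_dag is the existing DAG method also visible in the removed test code below):

```python
# Sketch only: wf1/wf2 and their contents are illustrative.
from DAGflow import *
from DAGflow.do_DAG import do_dag

wf1 = DAG("wf1")
wf2 = DAG("wf2")
# ... add Task objects to both workflows ...

wf1.add_dag(wf2)  # attach wf2 to wf1, as the removed test code did
do_dag(wf1)       # run the combined workflow with the new defaults
```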

test/test_add_dag.py (+6, -19)

@@ -4,39 +4,26 @@
 """


-from DAGflow.DAG import *
+from DAGflow import *

 workflow = DAG("test")

 task1 = Task(
-    task_id="task1",
+    id="task1",
     work_dir="",
     type="sge",
-    script="echo running workflow task 1"
+    script="echo running workflow task 1\nsleep 5"
 )

 workflow.add_task(task1)

 task2 = Task(
-    task_id="task2",
+    id="task2",
     work_dir="task2",
     type="local",
-    script="echo running workflow task 3"
+    script="echo running workflow task 2\n sleep 5"
 )
 workflow.add_task(task2)
 task2.set_upstream(task1)

-wf2 = DAG("test2")
-
-task3 = Task(
-    task_id="task3",
-    work_dir=".",
-    type="local",
-    script="echo running workflow task 3"
-)
-
-wf2.add_task(task3)
-
-workflow.add_dag(wf2)
-
-workflow.print_task()
+do_dag(workflow, 200, 10, "test")
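With the new do_dag signature, the three positional arguments after workflow bind to concurrent_tasks=200, refresh_time=10 and stop_on_failure="test"; since "test" is truthy, this test run stops all jobs on the first failed task.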
