@@ -201,7 +201,6 @@ def __init__(self, name=None, job_id=None, status=None, priority=None, loaded_da
             priority = loaded_data['priority']

         self.rerun_only = False
-        self.script_name_wrapper = None
         self.delay_end = None
         self.wrapper_type = None
         self._wrapper_queue = None
@@ -247,7 +246,6 @@ def __init__(self, name=None, job_id=None, status=None, priority=None, loaded_da
         self._status = None
         self.status = status
         self.prev_status = status
-        self.old_status = self.status
         self.new_status = status
         self.priority = priority
         self._parents = set()
@@ -258,7 +256,6 @@ def __init__(self, name=None, job_id=None, status=None, priority=None, loaded_da
         self._tmp_path = os.path.join(
             BasicConfig.LOCAL_ROOT_DIR, self.expid, BasicConfig.LOCAL_TMP_DIR)
         self._log_path = Path(f"{self._tmp_path}/LOG_{self.expid}")
-        self.write_start = False
         self._platform = None
         self.check = 'true'
         self.check_warnings = False
@@ -289,7 +286,6 @@ def __init__(self, name=None, job_id=None, status=None, priority=None, loaded_da
         self.log_recovered = False
         self.submit_time_timestamp = None  # for wrappers, all jobs inside a wrapper are submitted at the same time
         self.start_time_timestamp = None
-        self.finish_time_timestamp = None  # for wrappers, with inner_retrials, the submission time should be the last finish_time of the previous retrial
         self._script = None  # Inline code to be executed
         self._log_recovery_retries = None
         self.ready_date = None
@@ -1466,7 +1462,6 @@ def update_status(self, as_conf, failed_file=False):
         :param failed_file: boolean, if True, checks if the job failed
         :return:
         """
-        self.log_avaliable = False
         previous_status = self.status
         self.prev_status = previous_status
         new_status = self.new_status
@@ -1813,8 +1808,7 @@ def process_scheduler_parameters(self, job_platform, chunk):
                     self.het['CUSTOM_DIRECTIVES'].append(json.loads(custom_directive))
                 self.custom_directives = self.het['CUSTOM_DIRECTIVES'][0]
             else:
-                if type(self.custom_directives) is str:  # TODO This is a workaround for the time being, just defined for tests passing without more issues
-                    self.custom_directives = json.loads(self.custom_directives)
+                self.custom_directives = json.loads(self.custom_directives)
             if len(self.het['CUSTOM_DIRECTIVES']) < self.het['HETSIZE']:
                 for x in range(self.het['HETSIZE'] - len(self.het['CUSTOM_DIRECTIVES'])):
                     self.het['CUSTOM_DIRECTIVES'].append(self.custom_directives)
@@ -2428,7 +2422,6 @@ def create_wrapped_script(self, as_conf, wrapper_tag='wrapped'):
                 '%(?<!%%)' + variable + '%(?!%%)', '', template_content, flags=re.I)
         template_content = template_content.replace("%%", "%")
         script_name = '{0}.{1}.cmd'.format(self.name, wrapper_tag)
-        self.script_name_wrapper = '{0}.{1}.cmd'.format(self.name, wrapper_tag)
         open(os.path.join(self._tmp_path, script_name),
              'w').write(template_content)
         os.chmod(os.path.join(self._tmp_path, script_name), 0o755)
@@ -2557,12 +2550,10 @@ def write_end_time(self, completed, count=-1):
         if end_time > 0:
             # noinspection PyTypeChecker
             f.write(date2str(datetime.datetime.fromtimestamp(float(end_time)), 'S'))
-            self.finish_time_timestamp = date2str(datetime.datetime.fromtimestamp(end_time), 'S')
             # date2str(datetime.datetime.fromtimestamp(end_time), 'S')
             finish_time = end_time
         else:
             f.write(date2str(datetime.datetime.now(), 'S'))
-            self.finish_time_timestamp = date2str(datetime.datetime.now(), 'S')
             finish_time = time.time()
         f.write(' ')
         if completed:
@@ -2583,42 +2574,6 @@ def write_end_time(self, completed, count=-1):
             thread_write_finish.start()


-    def write_total_stat_by_retries(self, total_stats, first_retrial=False):
-        """
-        Writes all data to TOTAL_STATS file
-        :param total_stats: data gathered by the wrapper
-        :type total_stats: dict
-        :param first_retrial: True if this is the first retry, False otherwise
-        :type first_retrial: bool
-
-        """
-        path = os.path.join(self._tmp_path, self.name + '_TOTAL_STATS')
-        f = open(path, 'a')
-        if first_retrial:
-            f.write(" " + date2str(datetime.datetime.fromtimestamp(total_stats[0]), 'S') + ' ' + date2str(datetime.datetime.fromtimestamp(total_stats[1]), 'S') + ' ' + total_stats[2])
-        else:
-            f.write('\n' + date2str(datetime.datetime.fromtimestamp(total_stats[0]), 'S') + ' ' + date2str(datetime.datetime.fromtimestamp(total_stats[0]), 'S') + ' ' + date2str(datetime.datetime.fromtimestamp(total_stats[1]), 'S') + ' ' + total_stats[2])
-        out, err = self.local_logs
-        path_out = os.path.join(self._tmp_path, 'LOG_' + str(self.expid), out)
-        # Launch first as simple non-threaded function
-
-        exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
-        exp_history.write_start_time(self.name, start=total_stats[0], status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), qos=self.queue, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name),
-                                     children=self.children_names_str)
-        if not first_retrial:
-            exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
-            exp_history.write_submit_time(self.name, submit=total_stats[0], status=Status.VALUE_TO_KEY.get(self.status, "UNKNOWN"), ncpus=self.processors,
-                                          wallclock=self.wallclock, qos=self.queue, date=self.date, member=self.member, section=self.section, chunk=self.chunk,
-                                          platform=self.platform_name, job_id=self.id, wrapper_queue=self._wrapper_queue, wrapper_code=get_job_package_code(self.expid, self.name),
-                                          children=self.children_names_str)
-        exp_history = ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR)
-        job_data_dc = exp_history.write_finish_time(self.name, finish=total_stats[1], status=total_stats[2], job_id=self.id, out_file=out, err_file=err)
-        # Launch second as threaded function only for slurm
-        if job_data_dc and type(self.platform) is not str and self.platform.type == "slurm":
-            thread_write_finish = Thread(target=ExperimentHistory(self.expid, jobdata_dir_path=BasicConfig.JOBDATA_DIR, historiclog_dir_path=BasicConfig.HISTORICAL_LOG_DIR).write_platform_data_after_finish, args=(job_data_dc, self.platform))
-            thread_write_finish.name = "JOB_data_{}".format(self.name)
-            thread_write_finish.start()
-
     def check_started_after(self, date_limit):
         """
         Checks if the job started after the given date