@@ -12,18 +12,17 @@ from qiita_db.sql_connection import TRN
1212from qiita_db .processing_job import ProcessingJob
1313import pandas as pd
1414from time import sleep
15- from datetime import timedelta
1615from math import ceil
16+ from io import StringIO
17+
1718
18- QIITA_QUEUE_LOG = '/home/qiita/qiita-queues-logs-DONT-DELETE.log'
1919SLEEP_TIME = 6
2020CHANCES = 3
2121SQL = """SELECT processing_job_id
2222 FROM qiita.processing_job
2323 JOIN qiita.processing_job_status
2424 USING (processing_job_status_id)
2525 WHERE processing_job_status = %s"""
26- ARRAY_COMMANDS = set (['Woltka v0.1.1' ])
2726
2827
2928def _submit_jobs (jids_to_recover , recover_type ):
@@ -42,37 +41,17 @@ def _submit_jobs(jids_to_recover, recover_type):
4241
4342
def _retrieve_queue_jobs():
    """Return the set of qiita job ids currently sitting in the queues.

    Parses the pipe-separated output of ``squeue -o %all``, keeps only
    the rows whose GROUP is 'qiita', strips the 'merge-' prefix from the
    job names and returns the unique ids as a set.
    """
    # getting all the jobs in the queues
    raw = check_output(['squeue', '-o', '%all']).decode('ascii')
    all_jobs = pd.read_csv(StringIO(raw), sep='|')

    # just keeping the qiita jobs
    qiita_rows = all_jobs[all_jobs.GROUP == 'qiita']

    # ignore the merge-jobs and get unique values
    unique_names = qiita_rows.NAME.str.replace('merge-', '').unique()

    return set(unique_names)
7655
7756
7857def _get_jids_to_recover (recover_type ):
@@ -84,57 +63,35 @@ def _get_jids_to_recover(recover_type):
8463 return jids_to_recover
8564
8665
87- def _parse_queue_values (d ):
88- max_mem = 0
89- max_pmem = 0
90- max_vmem = 0
91- max_wt = timedelta (hours = 0 , minutes = 0 , seconds = 0 )
92- d = d .split (',' )
93- for dd in d :
94- if dd .startswith ('mem' ):
95- v = int (dd [4 :- 2 ])
96- if v > max_mem :
97- max_mem = v
98- elif dd .startswith ('pmem' ):
99- v = int (dd [5 :- 2 ])
100- if v > max_pmem :
101- max_pmem = v
102- elif dd .startswith ('vmem' ):
103- v = int (dd [5 :- 2 ])
104- if v > max_mem :
105- max_mem = v
106- elif dd .startswith ('walltime' ):
107- v = map (int , dd [9 :].split (':' ))
108- v = timedelta (hours = v [0 ], minutes = v [1 ], seconds = v [2 ])
109- if v > max_wt :
110- max_wt = v
111- return max_mem , max_pmem , max_vmem , max_wt
112-
113-
def _qiita_queue_log_parse(jids_to_recover):
    """Retrieve scheduler accounting stats for the given qiita jobs.

    Parameters
    ----------
    jids_to_recover : iterable of str
        The qiita processing job ids to look up.

    Returns
    -------
    list of (ProcessingJob, dict)
        One tuple per job. The dict holds 'exit-code', 'mem-requested',
        'time-requested', 'mem-used' and 'time-used'; all values are
        None when the job has no external_id (never reached the
        scheduler).
    """
    def _sacct_first_row(job_id):
        # sacct -p emits a '|' separated table; only the first row is
        # relevant here
        out = check_output([
            'sacct', '-p',
            '--format=ExitCode,ReqMem,MaxRSS,CPUTimeRAW,TimelimitRaw',
            '-j', job_id]).decode('ascii')
        return pd.read_csv(StringIO(out), sep='|').iloc[0].to_dict()

    results = []
    for jid in jids_to_recover:
        job = ProcessingJob(jid)
        if job.external_id:
            # the '.batch' step entry carries the usage numbers, while
            # the parent entry carries the time limit — presumably why
            # both are queried; confirm against sacct output if changed
            bvals = _sacct_first_row(f'{job.external_id}.batch')
            vals = _sacct_first_row(f'{job.external_id}')
            data = {
                'exit-code': bvals['ExitCode'],
                'mem-requested': bvals['ReqMem'],
                'time-requested': vals['TimelimitRaw'],
                'mem-used': bvals['MaxRSS'],
                'time-used': bvals['CPUTimeRAW']}
        else:
            # job never made it to the scheduler; no stats available
            data = {
                'exit-code': None,
                'mem-requested': None,
                'time-requested': None,
                'mem-used': None,
                'time-used': None}
        # bug fix: the original called results.append(job, data) —
        # list.append takes exactly one argument, so this raised
        # TypeError; append a single (job, data) tuple instead
        results.append((job, data))

    return results
14097
0 commit comments