Skip to content

Commit 540bbe4

Browse files
antgonzaElDeveloper
authored andcommitted
adding recover jobs script (#2135)
* adding recover jobs script * small fixes * addressing @elDevelopers comments
1 parent a058102 commit 540bbe4

File tree

1 file changed

+137
-0
lines changed

1 file changed

+137
-0
lines changed

scripts/qiita-recover-jobs

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
#!/usr/bin/env python
2+
3+
# -----------------------------------------------------------------------------
4+
# Copyright (c) 2014--, The Qiita Development Team.
5+
#
6+
# Distributed under the terms of the BSD 3-clause License.
7+
#
8+
# The full license is in the file LICENSE, distributed with this software.
9+
# -----------------------------------------------------------------------------
10+
from subprocess import check_output
11+
from qiita_db.sql_connection import TRN
12+
from qiita_db.processing_job import ProcessingJob
13+
from time import sleep
14+
15+
16+
SLEEP_TIME = 2
17+
18+
19+
def _submit_jobs(jids_to_recover, recover_type):
20+
len_jids_to_recover = len(jids_to_recover) - 1
21+
for i, j in enumerate(jids_to_recover):
22+
print 'recovering %s: %d/%d' % (recover_type, len_jids_to_recover, i)
23+
job = ProcessingJob(j)
24+
job._set_status('in_construction')
25+
job.submit()
26+
sleep(SLEEP_TIME)
27+
28+
29+
def qiita_recover_jobs():
30+
# Step 1: get current jobs running in the queue
31+
qiita_jobs = [line.split()[0] for line in check_output("qstat").split("\n")
32+
# just retriving 'qiita' and ignoring [] (ipython workers)
33+
if 'qiita' in line and '[]' not in line and
34+
# and private jobs
35+
'private' not in line]
36+
qiita_jids = []
37+
for qj in qiita_jobs:
38+
# to retrieve info about the jobs we need to use the fullname, so
39+
# appending .ucsd.edu
40+
args = ["qstat", "-f", "%s.ucsd.edu" % qj]
41+
# the name is the last string of the line and has .txt prepended
42+
qji = [line.split()[-1].split(".")[0]
43+
for line in check_output(args).split("\n")
44+
if 'Job_Name' in line]
45+
qiita_jids.extend(qji)
46+
qiita_jids = set(qiita_jids)
47+
48+
sql = """SELECT processing_job_id
49+
FROM qiita.processing_job
50+
JOIN qiita.processing_job_status
51+
USING (processing_job_status_id)
52+
WHERE processing_job_status = %s"""
53+
sql_validators = """SELECT processing_job_id, array_agg(validator_id)
54+
FROM qiita.processing_job_validator
55+
WHERE processing_job_id in %s
56+
GROUP BY processing_job_id"""
57+
58+
# Step 2: recover jobs that are in queue status
59+
with TRN:
60+
recover_type = 'queued'
61+
TRN.add(sql, [recover_type])
62+
jids = set(TRN.execute_fetchflatten())
63+
jids_to_recover = jids - qiita_jids
64+
65+
_submit_jobs(jids_to_recover, recover_type)
66+
67+
# Step 3: recover jobs that are running, note that there are several steps
68+
# to recover this group: 3.1. check if they have validators,
69+
# 3.2. if so, recover validators, 3. recover failed jobs
70+
with TRN:
71+
# getting all "running" jobs, discard those in qiita_jids
72+
recover_type = 'running'
73+
TRN.add(sql, [recover_type])
74+
jids = set(TRN.execute_fetchflatten())
75+
jids_to_recover = jids - qiita_jids
76+
77+
# 3.1, and 3.2: checking which jobs have validators, and recover them
78+
79+
TRN.add(sql_validators, [tuple(jids_to_recover)])
80+
jids_validator = TRN.execute_fetchindex()
81+
jobs_with_validators = []
82+
for j, validators in jids_validator:
83+
validators = validators[1:-1].split(',')
84+
# we append the main job as it shouldn't be touch
85+
# and all their validators
86+
jobs_with_validators.append(j)
87+
jobs_with_validators.extend(validators)
88+
job = ProcessingJob(j)
89+
status = set([ProcessingJob(v).status
90+
for v in validators if v not in qiita_jids])
91+
# if there are no status, that means that the validators weren't
92+
# created and we should rerun from scratch (Step 4)
93+
if not bool(status):
94+
continue
95+
# it multiple status in the validators, it's a complex behaivor
96+
# and needs a case by case solution
97+
if len(status) != 1:
98+
print ("Job '%s' has too many validators status (%d), check "
99+
"them by hand" % (j, len(status)))
100+
continue
101+
status = list(status)[0]
102+
103+
if status == 'waiting':
104+
print "releasing job validators: %s" % j
105+
job.release_validators()
106+
sleep(SLEEP_TIME)
107+
elif status == 'running':
108+
_submit_jobs(validators, recover_type + ' validator, running')
109+
elif status == 'error':
110+
# in this case is the same process than before but we need
111+
# to split the set in_construction and submit in 2 steps,
112+
# however, we can still submit via _submit_jobs
113+
for v in validators:
114+
vjob = ProcessingJob(v)
115+
vjob._set_status('in_construction')
116+
_submit_jobs(validators + ' validator, error')
117+
else:
118+
print ("Check the status of this job %s : %s and validators"
119+
"%s." % (j, status, validators))
120+
121+
jids_to_recover = jids_to_recover - set(jobs_with_validators)
122+
123+
# Step 4: Finally, we recover all the leftover jobs
124+
for i, j in enumerate(jids_to_recover):
125+
job = ProcessingJob(j)
126+
status = job.status
127+
128+
if status == 'waiting':
129+
print "releasing job validators: %s" % j
130+
job.release_validators()
131+
sleep(SLEEP_TIME)
132+
elif 'running' == status:
133+
_submit_jobs([j], 'main_job, running')
134+
135+
136+
if __name__ == '__main__':
137+
qiita_recover_jobs()

0 commit comments

Comments
 (0)