6 changes: 3 additions & 3 deletions .github/workflows/qiita-ci.yml
@@ -252,7 +252,7 @@ jobs:
   lint:
     runs-on: ubuntu-latest
     steps:
-    - name: flake8
+    - name: ruff
       uses: actions/setup-python@v2
       with:
         python-version: "3.9"
@@ -262,5 +262,5 @@ jobs:
       uses: actions/checkout@v2
     - name: lint
       run: |
-        pip install -q flake8
-        flake8 qiita_* setup.py scripts/qiita* notebooks/*/*.py
+        pip install -q ruff
+        ruff check qiita_* setup.py scripts/qiita* notebooks/*/*.py
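
Note: since this change swaps flake8 for ruff in CI, a quick local sanity check of the migration can help. The sketch below is not part of the PR and makes assumptions: both linters are installed, PATHS is a hypothetical subset of the CI globs, and only rough finding counts are compared because the two tools' output formats differ.

from subprocess import run

# Hypothetical subset of the paths CI lints (qiita_*, setup.py,
# scripts/qiita*, notebooks/*/*.py).
PATHS = ["qiita_db", "setup.py"]

def count_findings(cmd):
    # Both linters exit non-zero when they report problems, so inspect
    # stdout instead of checking the return code.
    out = run(cmd + PATHS, capture_output=True, text=True).stdout
    return sum(1 for line in out.splitlines() if line.strip())

print("flake8:", count_findings(["flake8"]))
print("ruff:  ", count_findings(["ruff", "check"]))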
299 changes: 185 additions & 114 deletions notebooks/resource-allocation/generate-allocation-summary-arrays.py

Large diffs are not rendered by default.

123 changes: 68 additions & 55 deletions notebooks/resource-allocation/generate-allocation-summary.py
@@ -1,35 +1,44 @@
-from subprocess import check_output
-import pandas as pd
-
 from datetime import datetime, timedelta
 from io import StringIO
 from json import loads
 from os.path import join
+from subprocess import check_output
+
+import pandas as pd
 
-from qiita_db.util import MaxRSS_helper
 from qiita_db.exceptions import QiitaDBUnknownIDError
 from qiita_db.processing_job import ProcessingJob
 from qiita_db.software import Software
+from qiita_db.util import MaxRSS_helper
 
 all_commands = [c for s in Software.iter(False) for c in s.commands]
 
 # retrieving only the numeric external_id means that we are only focusing
 # on barnacle2/slurm jobs
-main_jobs = [j for c in all_commands for j in c.processing_jobs
-             if j.status == 'success' and j.external_id.isnumeric()]
-
-sacct = ['sacct', '-p', '--format=JobID,ElapsedRaw,MaxRSS,Submit,Start,MaxRSS,'
-         'CPUTimeRAW,ReqMem,AllocCPUs,AveVMSize', '-j']
+main_jobs = [
+    j
+    for c in all_commands
+    for j in c.processing_jobs
+    if j.status == "success" and j.external_id.isnumeric()
+]
+
+sacct = [
+    "sacct",
+    "-p",
+    "--format=JobID,ElapsedRaw,MaxRSS,Submit,Start,MaxRSS,"
+    "CPUTimeRAW,ReqMem,AllocCPUs,AveVMSize",
+    "-j",
+]
 
 data = []
 for i, j in enumerate(main_jobs):
     if i % 1000 == 0:
-        print(f'{i}/{len(main_jobs)}')
+        print(f"{i}/{len(main_jobs)}")
     eid = j.external_id
-    extra_info = ''
-    rvals = StringIO(check_output(sacct + [eid]).decode('ascii'))
-    _d = pd.read_csv(rvals, sep='|')
-    _d['QiitaID'] = j.id
+    extra_info = ""
+    rvals = StringIO(check_output(sacct + [eid]).decode("ascii"))
+    _d = pd.read_csv(rvals, sep="|")
+    _d["QiitaID"] = j.id
     cmd = j.command
     s = j.command.software
     try:
@@ -43,44 +52,49 @@
     except TypeError as e:
         # similar to the except above, except that for these 2 commands, we have
         # the study_id as None
-        if cmd.name in {'create_sample_template', 'delete_sample_template',
-                        'list_remote_files'}:
+        if cmd.name in {
+            "create_sample_template",
+            "delete_sample_template",
+            "list_remote_files",
+        }:
             continue
         else:
             raise e
 
     sname = s.name
 
-    if cmd.name == 'release_validators':
-        ej = ProcessingJob(j.parameters.values['job'])
+    if cmd.name == "release_validators":
+        ej = ProcessingJob(j.parameters.values["job"])
         extra_info = ej.command.name
         samples, columns, input_size = ej.shape
-    elif cmd.name == 'complete_job':
-        artifacts = loads(j.parameters.values['payload'])['artifacts']
+    elif cmd.name == "complete_job":
+        artifacts = loads(j.parameters.values["payload"])["artifacts"]
         if artifacts is not None:
-            extra_info = ','.join({
-                x['artifact_type'] for x in artifacts.values()
-                if 'artifact_type' in x})
-    elif cmd.name == 'Validate':
-        input_size = sum([len(x) for x in loads(
-            j.parameters.values['files']).values()])
+            extra_info = ",".join(
+                {x["artifact_type"] for x in artifacts.values() if "artifact_type" in x}
+            )
+    elif cmd.name == "Validate":
+        input_size = sum([len(x) for x in loads(j.parameters.values["files"]).values()])
         sname = f"{sname} - {j.parameters.values['artifact_type']}"
-    elif cmd.name == 'Alpha rarefaction curves [alpha_rarefaction]':
+    elif cmd.name == "Alpha rarefaction curves [alpha_rarefaction]":
         extra_info = j.parameters.values[
-            ('The number of rarefaction depths to include between min_depth '
-             'and max_depth. (steps)')]
-
-    _d['external_id'] = eid
-    _d['sId'] = s.id
-    _d['sName'] = sname
-    _d['sVersion'] = s.version
-    _d['cId'] = cmd.id
-    _d['cName'] = cmd.name
-    _d['samples'] = samples
-    _d['columns'] = columns
-    _d['input_size'] = input_size
-    _d['extra_info'] = extra_info
-    _d.drop(columns=['Unnamed: 10'], inplace=True)
+            (
+                "The number of rarefaction depths to include between min_depth "
+                "and max_depth. (steps)"
+            )
+        ]
+
+    _d["external_id"] = eid
+    _d["sId"] = s.id
+    _d["sName"] = sname
+    _d["sVersion"] = s.version
+    _d["cId"] = cmd.id
+    _d["cName"] = cmd.name
+    _d["samples"] = samples
+    _d["columns"] = columns
+    _d["input_size"] = input_size
+    _d["extra_info"] = extra_info
+    _d.drop(columns=["Unnamed: 10"], inplace=True)
     data.append(_d)
 
 data = pd.concat(data)
@@ -100,34 +114,33 @@
 # external_id but separate from external_id.batch.
 # Here we are going to merge all this info into a single row + some
 # other columns
-date_fmt = '%Y-%m-%dT%H:%M:%S'
+date_fmt = "%Y-%m-%dT%H:%M:%S"
 
 df = []
-for eid, __df in data.groupby('external_id'):
+for eid, __df in data.groupby("external_id"):
     tmp = __df.iloc[1].copy()
     # Calculating WaitTime, basically how long the job took to start;
     # this is useful for some general profiling
-    tmp['WaitTime'] = datetime.strptime(
-        __df.iloc[0].Start, date_fmt) - datetime.strptime(
-        __df.iloc[0].Submit, date_fmt)
+    tmp["WaitTime"] = datetime.strptime(
+        __df.iloc[0].Start, date_fmt
+    ) - datetime.strptime(__df.iloc[0].Submit, date_fmt)
     df.append(tmp)
 df = pd.DataFrame(df)
 
 # This is important as we are transforming the MaxRSS to its raw value,
 # so we need to confirm that there are no other suffixes
-print('Make sure that only 0/K/M exist', set(
-    df.MaxRSS.apply(lambda x: str(x)[-1])))
+print("Make sure that only 0/K/M exist", set(df.MaxRSS.apply(lambda x: str(x)[-1])))
 
 # Generating new columns
-df['MaxRSSRaw'] = df.MaxRSS.apply(lambda x: MaxRSS_helper(str(x)))
-df['ElapsedRawTime'] = df.ElapsedRaw.apply(
-    lambda x: timedelta(seconds=float(x)))
+df["MaxRSSRaw"] = df.MaxRSS.apply(lambda x: MaxRSS_helper(str(x)))
+df["ElapsedRawTime"] = df.ElapsedRaw.apply(lambda x: timedelta(seconds=float(x)))
 
 # Thu, Apr 27, 2023 was the first time Jeff and I changed the old allocations
 # (from barnacle) to a better allocation, so we use job 1265533 as the
 # before/after cutoff and only use the latest jobs for the newest version
-df['updated'] = df.external_id.apply(
-    lambda x: 'after' if int(x) >= 1265533 else 'before')
+df["updated"] = df.external_id.apply(
+    lambda x: "after" if int(x) >= 1265533 else "before"
+)
 
-fn = join('/panfs', 'qiita', f'jobs_{df.Start.max()[:10]}.tsv.gz')
-df.to_csv(fn, sep='\t', index=False)
+fn = join("/panfs", "qiita", f"jobs_{df.Start.max()[:10]}.tsv.gz")
+df.to_csv(fn, sep="\t", index=False)
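
Note: the conversion step above relies on MaxRSS_helper from qiita_db.util, which is why the script first prints the set of trailing characters to confirm that only 0/K/M occur. The actual helper lives in qiita_db and is not shown in this diff; the sketch below is only an assumption about its behavior (including whether the multipliers are 1024- or 1000-based), under a hypothetical name.

# Hedged sketch of what a MaxRSS conversion helper presumably does:
# turn sacct strings such as "1024K" or "2.5M" into a raw number.
def max_rss_to_raw(value: str) -> float:
    multipliers = {"K": 2**10, "M": 2**20}
    if value and value[-1] in multipliers:
        return float(value[:-1]) * multipliers[value[-1]]
    return float(value)  # plain numbers (including "0") pass through

assert max_rss_to_raw("2K") == 2048.0
assert max_rss_to_raw("0") == 0.0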