Updates based on testing (#160)
* Updates based on testing

* flake8

* ignore row order on dataframe comparison

* Resolve differences in test results

* Add Antonio's recent changes

* Updates based on feedback

* SeqCountsJob now uses TRIntegratedJob output
charles-cowart authored Jan 2, 2025
1 parent b6fb6ff commit 1937477
Showing 31 changed files with 404 additions and 153 deletions.
63 changes: 41 additions & 22 deletions sequence_processing_pipeline/Commands.py
@@ -1,7 +1,8 @@
import glob
import gzip
import os
from sequence_processing_pipeline.util import iter_paired_files
from sequence_processing_pipeline.util import (iter_paired_files,
determine_orientation)


def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
@@ -14,21 +15,17 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
:return: The number of output-files created, size of largest bin.
'''

# SPP root paths are of the form:
# ../72dc6080-c453-418e-8a47-1ccd6d646a30/ConvertJob, and contain only
# directories named after projects e.g:
# 72dc6080-c453-418e-8a47-1ccd6d646a30/ConvertJob/Tell_Seq_15196.
    # Each of these directories contains R1 and R2 fastq files. Hence, path
# is now the following:
# add one more level to account for project_names nested under ConvertJob
# dir.
# this will ignore the _I1_ reads that appear in the integrated result.
fastq_paths = glob.glob(data_location_path + '*/*/*.fastq.gz')

# case-specific filter for TellSeq output directories that also contain
# _I1_ files. Ensure paths are still sorted afterwards.
fastq_paths = [x for x in fastq_paths if '_I1_001.fastq.gz' not in x]
fastq_paths = sorted(fastq_paths)
# to prevent issues w/filenames like the ones below from being mistaken
# for R1 or R2 files, use determine_orientation().
# LS_8_22_2014_R2_SRE_S2_L007_I1_001.fastq.gz
# LS_8_22_2014_R1_SRE_S3_L007_I1_001.fastq.gz

# since the names of all fastq files are being scanned for orientation,
# collect all of them instead of mistakenly pre-filtering some files.
# fastq_paths = glob.glob(data_location_path + '/*/*_R?_*.fastq.gz')
fastq_paths = glob.glob(data_location_path + '/*/*.fastq.gz')
fastq_paths = [x for x in fastq_paths
if determine_orientation(x) in ['R1', 'R2']]

# convert from GB and halve as we sum R1
max_size = (int(max_file_list_size_in_gb) * (2 ** 30) / 2)
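
For context, a hedged sketch of the filename check the new filter above relies on. determine_orientation() lives in sequence_processing_pipeline/util.py and its real implementation may differ; the assumed behavior is that it scans the underscore-delimited filename tokens from the right, so a sample name that happens to contain 'R1' or 'R2' cannot shadow the true read token:

    import os
    import re

    def determine_orientation_sketch(path):
        # assumed behavior, not the library's actual code: return the
        # right-most token matching R1/R2/I1/I2, else None.
        tokens = os.path.basename(path).split('_')
        for token in reversed(tokens):
            if re.fullmatch(r'[RI][12]', token):
                return token
        return None

    # LS_8_22_2014_R1_SRE_S3_L007_I1_001.fastq.gz -> 'I1' (filtered out)
    # LS_8_22_2014_R1_SRE_S3_L007_R2_001.fastq.gz -> 'R2' (kept)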
@@ -114,21 +111,43 @@ def demux(id_map, fp, out_d, task, maxtask):
openfps[idx] = current_fp

# setup a parser
id_ = iter(fp)
seq_id = iter(fp)
seq = iter(fp)
dumb = iter(fp)
qual = iter(fp)

for i, s, d, q in zip(id_, seq, dumb, qual):
fname_encoded, id_ = i.split(delimiter, 1)
for i, s, d, q in zip(seq_id, seq, dumb, qual):
# '@1', 'LH00444:84:227CNHLT4:7:1101:41955:2443/1'
# '@1', 'LH00444:84:227CNHLT4:7:1101:41955:2443/1 BX:Z:TATGACACATGCGGCCCT' # noqa
        # '@baz/1'

fname_encoded, sid = i.split(delimiter, 1)

if fname_encoded not in openfps:
continue

orientation = id_[-2] # -1 is \n
current_fp = openfps[fname_encoded]
id_ = rec + id_
current_fp[orientation].write(id_)

# remove '\n' from sid and split on all whitespace.
tmp = sid.strip().split()

if len(tmp) == 1:
# sequence id line contains no optional metadata.
# don't change sid.
# -1 is \n
orientation = sid[-2]
sid = rec + sid
elif len(tmp) == 2:
sid = tmp[0]
metadata = tmp[1]
# no '\n'
orientation = sid[-1]
# hexdump confirms separator is ' ', not '\t'
sid = rec + sid + ' ' + metadata + '\n'
else:
raise ValueError(f"'{sid}' is not a recognized form")

current_fp[orientation].write(sid)
current_fp[orientation].write(s)
current_fp[orientation].write(d)
current_fp[orientation].write(q)
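
An aside on the parser above: a Python file object is its own iterator, so the four iter(fp) calls all return the same advancing handle, and zip() therefore consumes one four-line FASTQ record per pass. A toy run of the two-token (BX-tagged) branch; delimiter is defined elsewhere in Commands.py, and the values below are assumed:

    from io import StringIO

    delimiter = '\t'
    fp = StringIO(
        '@1\tLH00444:84:227CNHLT4:7:1101:41955:2443/1 BX:Z:TATGACACATGCGGCCCT\n'
        'ACGT\n'
        '+\n'
        'FFFF\n')

    for i, s, d, q in zip(iter(fp), iter(fp), iter(fp), iter(fp)):
        fname_encoded, sid = i.split(delimiter, 1)
        sid, metadata = sid.strip().split()
        # sid ends in /1 or /2, so its last character is the orientation
        print(fname_encoded, sid[-1], metadata)
        # -> @1 1 BX:Z:TATGACACATGCGGCCCT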
2 changes: 2 additions & 0 deletions sequence_processing_pipeline/FastQCJob.py
@@ -278,11 +278,13 @@ def _generate_job_script(self):
lines.append(f"#SBATCH -n {self.nprocs}")
lines.append("#SBATCH --time %d" % self.wall_time_limit)
lines.append(f"#SBATCH --mem {self.jmem}")
lines.append('#SBATCH --constraint="intel"')

lines.append("#SBATCH --array 1-%d%%%d" % (
len(self.commands), self.pool_size))

lines.append("set -x")
lines.append("set +e")
lines.append('date')
lines.append('hostname')
lines.append('echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}')
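
One subtlety in the array directive above: the doubled %% in the old-style format string escapes to a literal '%', which Slurm reads as the array-throttle separator (run tasks 1..N with at most pool_size running at once). With illustrative, assumed values:

    n_commands, pool_size = 10, 4
    print("#SBATCH --array 1-%d%%%d" % (n_commands, pool_size))
    # prints: #SBATCH --array 1-10%4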
51 changes: 32 additions & 19 deletions sequence_processing_pipeline/GenPrepFileJob.py
@@ -13,7 +13,7 @@
class GenPrepFileJob(Job):
def __init__(self, run_dir, convert_job_path, qc_job_path, output_path,
input_file_path, seqpro_path, modules_to_load,
qiita_job_id, is_amplicon=False):
qiita_job_id, reports_path, is_amplicon=False):

super().__init__(run_dir,
output_path,
@@ -31,13 +31,22 @@ def __init__(self, run_dir, convert_job_path, qc_job_path, output_path,
self.commands = []
self.has_replicates = False
self.replicate_count = 0
self.reports_path = reports_path

# make the 'root' of your run_directory
makedirs(join(self.output_path, self.run_id), exist_ok=True)
# copy bcl-convert's Stats-equivalent directory to the
# run_directory
copytree(join(convert_job_path, 'Reports'),
join(self.output_path, self.run_id, 'Reports'))

# This directory will already exist on restarts, hence avoid
# copying.
reports_dir = join(self.output_path, self.run_id, 'Reports')

if exists(reports_dir):
self.is_restart = True
else:
self.is_restart = False
copytree(self.reports_path, reports_dir)

# extracting from either convert_job_path or qc_job_path should
# produce equal results.
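
The restart guard above matters because shutil.copytree() raises FileExistsError when its destination already exists (dirs_exist_ok defaults to False), so an unconditional copy would crash a resumed job. A minimal sketch of the pattern, with a hypothetical helper name:

    from os.path import exists, join
    from shutil import copytree

    def copy_reports_once(reports_path, output_path, run_id):
        # True when a prior (interrupted) run already created the directory
        reports_dir = join(output_path, run_id, 'Reports')
        is_restart = exists(reports_dir)
        if not is_restart:
            copytree(reports_path, reports_dir)
        return is_restart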
@@ -52,22 +61,26 @@ def __init__(self, run_dir, convert_job_path, qc_job_path, output_path,

dst = join(self.output_path, self.run_id, project)

if self.is_amplicon:
if exists(amplicon_seq_dir):
makedirs(dst, exist_ok=True)
symlink(amplicon_seq_dir, join(dst, 'amplicon'))
else:
if exists(filtered_seq_dir):
makedirs(dst, exist_ok=True)
symlink(filtered_seq_dir, join(dst, 'filtered_sequences'))

if exists(trimmed_seq_dir):
makedirs(dst, exist_ok=True)
symlink(trimmed_seq_dir, join(dst, 'trimmed_sequences'))

if exists(fastp_rept_dir):
makedirs(dst, exist_ok=True)
symlink(fastp_rept_dir, join(dst, 'json'))
if not self.is_restart:
# these will already be created if restarted.
if self.is_amplicon:
if exists(amplicon_seq_dir):
makedirs(dst, exist_ok=True)
symlink(amplicon_seq_dir, join(dst, 'amplicon'))
else:
if exists(filtered_seq_dir):
makedirs(dst, exist_ok=True)
symlink(filtered_seq_dir, join(dst,
'filtered_sequences'))

if exists(trimmed_seq_dir):
makedirs(dst, exist_ok=True)
symlink(trimmed_seq_dir, join(dst,
'trimmed_sequences'))

if exists(fastp_rept_dir):
makedirs(dst, exist_ok=True)
symlink(fastp_rept_dir, join(dst, 'json'))
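
The same restart rationale applies to the symlinks: os.symlink() raises FileExistsError when the link name already exists, so links created by a first, interrupted attempt must not be re-made. A condensed sketch of the per-directory pattern above, with a hypothetical helper name:

    from os import makedirs, symlink
    from os.path import exists, join

    def link_if_present(src_dir, dst, link_name):
        # no-op when the source directory was never produced for this project
        if exists(src_dir):
            makedirs(dst, exist_ok=True)
            symlink(src_dir, join(dst, link_name))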

# seqpro usage:
# seqpro path/to/run_dir path/to/sample/sheet /path/to/fresh/output_dir