diff --git a/pcpostprocess/hergQC.py b/pcpostprocess/hergQC.py index f5c4e4a..b17fd2d 100644 --- a/pcpostprocess/hergQC.py +++ b/pcpostprocess/hergQC.py @@ -75,8 +75,7 @@ def __init__(self, sampling_rate=5, plot_dir=None, voltage=np.array([]), n_sweeps=None, removal_time=5): # TODO docstring - if plot_dir is not None: - self.plot_dir = plot_dir + self._plot_dir = plot_dir self._n_qc = 16 @@ -134,8 +133,15 @@ def __init__(self, sampling_rate=5, plot_dir=None, voltage=np.array([]), self._debug = True - def set_trace(self, before, after, qc_vals_before, - qc_vals_after, n_sweeps): + @property + def plot_dir(self): + return self._plot_dir + + @plot_dir.setter + def plot_dir(self, path): + self._plot_dir = path + + def set_trace(self, before, after, qc_vals_before, qc_vals_after, n_sweeps): self._before = before self._qc_vals_before = qc_vals_before self._after = after @@ -154,7 +160,7 @@ def run_qc(self, voltage_steps, times, @param before is the before-drug current trace @param after is the post-drug current trace @param qc_vals_before is an array of values for each pre-drug sweep where each row is (Rseal, Cm, Rseries) - @param qc_vals_before is an array of values for each post-drug sweep where each row is (Rseal, Cm, Rseries) + @param qc_vals_after is an array of values for each post-drug sweep where each row is (Rseal, Cm, Rseries) @n_sweeps is the number of sweeps to be considered """ @@ -241,7 +247,7 @@ def run_qc(self, voltage_steps, times, QC['qc6.1.subtracted'].append(qc6_1) QC['qc6.2.subtracted'].append(qc6_2) - if self._debug: + if self.plot_dir and self._debug: fig = plt.figure(figsize=(8, 5)) ax = fig.subplots() ax.plot(times, (before - after).T, label='subtracted') diff --git a/tests/test_herg_qc.py b/tests/test_herg_qc.py index 88799a1..dec3876 100644 --- a/tests/test_herg_qc.py +++ b/tests/test_herg_qc.py @@ -1,5 +1,6 @@ import logging import os +import copy import string import unittest @@ -12,44 +13,60 @@ class TestHergQC(unittest.TestCase): def setUp(self): - filepath = os.path.join('tests', 'test_data', '13112023_MW2_FF', - 'staircaseramp (2)_2kHz_15.01.07') + base_path = os.path.join("tests", "test_data", "13112023_MW2_FF") - self.all_wells = [ - lab + str(i).zfill(2) for lab in string.ascii_uppercase[:16] - for i in range(1, 25)] + label_before = "staircaseramp (2)_2kHz_15.01.07" + label_after = "staircaseramp (2)_2kHz_15.11.33" - filepath2 = os.path.join('tests', 'test_data', '13112023_MW2_FF', - 'staircaseramp (2)_2kHz_15.11.33') + path_before = os.path.join(base_path, label_before) + path_after = os.path.join(base_path, label_after) - json_file = "staircaseramp (2)_2kHz_15.01.07" - json_file2 = "staircaseramp (2)_2kHz_15.11.33" + trace_before = Trace(path_before, json_file=label_before) + trace_after = Trace(path_after, json_file=label_after) - self.output_dir = os.path.join('test_output', 'test_herg_qc') + sweeps = [0, 1] + self.trace_sweeps_before = trace_before.get_trace_sweeps(sweeps) + self.trace_sweeps_after = trace_after.get_trace_sweeps(sweeps) + + self.qc_vals_before = trace_before.get_onboard_QC_values(sweeps=sweeps) + self.qc_vals_after = trace_after.get_onboard_QC_values(sweeps=sweeps) + + self.voltage = trace_before.get_voltage() + self.times = trace_after.get_times() - if not os.path.exists(self.output_dir): - os.makedirs(self.output_dir) + sampling_rate = int(1.0 / (self.times[1] - self.times[0])) # in kHz + + #  Assume that there are no discontinuities at the start or end of ramps + voltage_protocol = trace_before.get_voltage_protocol() + self.voltage_steps = [ + tstart + for tstart, _, vstart, vend in voltage_protocol.get_all_sections() + if vend == vstart + ] - self.test_trace_before = Trace(filepath, json_file) - self.test_trace_after = Trace(filepath2, json_file2) + plot_dir = "test_output" + if not os.path.exists(plot_dir): + os.makedirs(plot_dir) - self.voltage = self.test_trace_before.get_voltage() - self.times = self.test_trace_after.get_times() + self.hergqc = hERGQC( + sampling_rate=sampling_rate, + plot_dir=plot_dir, + voltage=self.voltage, + ) - # Calculate sampling rate in (use kHz) - self.sampling_rate = int(1.0 / (self.times[1] - self.times[0])) + def test_qc_inputs(self): + self.assertTrue(np.all(np.isfinite(self.voltage))) + self.assertTrue(np.all(np.isfinite(self.times))) def test_qc1(self): def passed(result): return all([x for x, _ in result]) - plot_dir = os.path.join(self.output_dir, "test_qc1") + hergqc = copy.deepcopy(self.hergqc) + plot_dir = os.path.join(hergqc.plot_dir, "test_qc1") if not os.path.exists(plot_dir): os.makedirs(plot_dir) - - hergqc = hERGQC(sampling_rate=self.sampling_rate, - plot_dir=plot_dir, - voltage=self.voltage) + hergqc.plot_dir = plot_dir # qc1 checks that rseal, cm, rseries are within range rseal_lo, rseal_hi = 1e8, 1e12 @@ -88,46 +105,62 @@ def passed(result): self.assertEqual( passed(hergqc.qc1(rseal, cm, rseries)), expected, - f"({rseal}, {cm}, {rseries})", + f"QC1: {rseal}, {cm}, {rseries}", ) - # TODO: Test on select data + # Test on select data + test_matrix = [ + ("A01", True), + ("A02", True), + ("A03", True), + ("A04", True), + ("A05", True), + ("A10", False), + ("A12", False), + ("A13", False), + ("A16", False), + ("A19", False), + ] + + for well, expected in test_matrix: + with self.subTest(well): + qc_vals_before = np.array(self.qc_vals_before[well])[0, :] + self.assertEqual( + passed(hergqc.qc1(*qc_vals_before)), + expected, + f"QC1: {well} (before) {qc_vals_before}", + ) def test_qc2(self): - plot_dir = os.path.join(self.output_dir, "test_qc2") + hergqc = copy.deepcopy(self.hergqc) + plot_dir = os.path.join(hergqc.plot_dir, "test_qc2") if not os.path.exists(plot_dir): os.makedirs(plot_dir) - - hergqc = hERGQC(sampling_rate=self.sampling_rate, - plot_dir=plot_dir, - voltage=self.voltage) + hergqc.plot_dir = plot_dir # qc2 checks that raw and subtracted SNR are above a minimum threshold test_matrix = [ - (40, True), # snr = 130286 (10, True), # snr = 8082 (1, True), # snr = 74 - (0.601, True), # snr = 25.07 + (0.601, True), # snr = 25.07 (0.6, False), # snr = 24.98 (0.5, False), # snr = 17 (0.1, False), # snr = 0.5 ] for i, expected in test_matrix: - recording = np.asarray([0, 0.1] * (NOISE_LEN//2) + [i] * 500) + recording = np.asarray([0, 0.1] * (NOISE_LEN // 2) + [i] * 500) result = hergqc.qc2(recording) self.assertEqual(result[0], expected, f"({i}: {result[1]})") # TODO: Test on select data def test_qc3(self): - plot_dir = os.path.join(self.output_dir, "test_qc3") + hergqc = copy.deepcopy(self.hergqc) + plot_dir = os.path.join(hergqc.plot_dir, "test_qc3") if not os.path.exists(plot_dir): os.makedirs(plot_dir) - - hergqc = hERGQC(sampling_rate=self.sampling_rate, - plot_dir=plot_dir, - voltage=self.voltage) + hergqc.plot_dir = plot_dir # qc3 checks that rmsd of two sweeps are similar @@ -135,17 +168,17 @@ def test_qc3(self): test_matrix = [ (-10, False), (-9, False), - (-8, False), # rmsdc - rmsd = -0.6761186497920804 - (-7, True), # rmsdc - rmsd = 0.25355095037585507 - (0, True), # rmsdc - rmsd = 6.761238263598085 - (8, True), # rmsdc - rmsd = 0.6761272774054383 - (9, False), # rmsdc - rmsd = -0.08451158778363332 + (-8, False), # rmsdc - rmsd = -0.6761186497920804 + (-7, True), # rmsdc - rmsd = 0.25355095037585507 + (0, True), # rmsdc - rmsd = 6.761238263598085 + (8, True), # rmsdc - rmsd = 0.6761272774054383 + (9, False), # rmsdc - rmsd = -0.08451158778363332 (10, False), ] - recording1 = np.asarray([0, 0.1] * (NOISE_LEN//2) + [40] * 500) + recording1 = np.asarray([0, 0.1] * (NOISE_LEN // 2) + [40] * 500) for i, expected in test_matrix: - recording2 = np.asarray([0, 0.1] * (NOISE_LEN//2) + [40 + i] * 500) + recording2 = np.asarray([0, 0.1] * (NOISE_LEN // 2) + [40 + i] * 500) result = hergqc.qc3(recording1, recording2) self.assertEqual(result[0], expected, f"({i}: {result[1]})") @@ -157,9 +190,9 @@ def test_qc3(self): (1000, True), ] - recording1 = np.asarray([0, 0.1] * (NOISE_LEN//2) + [40] * 500) + recording1 = np.asarray([0, 0.1] * (NOISE_LEN // 2) + [40] * 500) for i, expected in test_matrix: - recording2 = np.asarray([0, 0.1 * i] * (NOISE_LEN//2) + [40] * 500) + recording2 = np.asarray([0, 0.1 * i] * (NOISE_LEN // 2) + [40] * 500) result = hergqc.qc3(recording1, recording2) self.assertEqual(result[0], expected, f"({i}: {result[1]})") @@ -169,13 +202,11 @@ def test_qc4(self): def passed(result): return all([x for x, _ in result]) - plot_dir = os.path.join(self.output_dir, "test_qc1") + hergqc = copy.deepcopy(self.hergqc) + plot_dir = os.path.join(hergqc.plot_dir, "test_qc4") if not os.path.exists(plot_dir): os.makedirs(plot_dir) - - hergqc = hERGQC( - sampling_rate=self.sampling_rate, plot_dir=plot_dir, voltage=self.voltage - ) + hergqc.plot_dir = plot_dir # qc4 checks that rseal, cm, rseries are similar before/after E-4031 change r_lo, r_hi = 1e6, 3e7 @@ -262,13 +293,11 @@ def passed(result): # TODO: Test on select data def test_qc5(self): - plot_dir = os.path.join(self.output_dir, "test_qc5") + hergqc = copy.deepcopy(self.hergqc) + plot_dir = os.path.join(hergqc.plot_dir, "test_qc5") if not os.path.exists(plot_dir): os.makedirs(plot_dir) - - hergqc = hERGQC( - sampling_rate=self.sampling_rate, plot_dir=plot_dir, voltage=self.voltage - ) + hergqc.plot_dir = plot_dir # qc5 checks that the maximum current during the second half of the # staircase changes by at least 75% of the raw trace after E-4031 addition @@ -291,13 +320,11 @@ def test_qc5(self): # TODO: Test on select data def test_qc5_1(self): - plot_dir = os.path.join(self.output_dir, "test_qc5_1") + hergqc = copy.deepcopy(self.hergqc) + plot_dir = os.path.join(hergqc.plot_dir, "test_qc5_1") if not os.path.exists(plot_dir): os.makedirs(plot_dir) - - hergqc = hERGQC( - sampling_rate=self.sampling_rate, plot_dir=plot_dir, voltage=self.voltage - ) + hergqc.plot_dir = plot_dir # qc5_1 checks that the RMSD to zero of staircase protocol changes # by at least 50% of the raw trace after E-4031 addition. @@ -321,13 +348,11 @@ def test_qc5_1(self): # TODO: Test on select data def test_qc6(self): - plot_dir = os.path.join(self.output_dir, "test_qc2") + hergqc = copy.deepcopy(self.hergqc) + plot_dir = os.path.join(hergqc.plot_dir, "test_qc6") if not os.path.exists(plot_dir): os.makedirs(plot_dir) - - hergqc = hERGQC( - sampling_rate=self.sampling_rate, plot_dir=plot_dir, voltage=self.voltage - ) + hergqc.plot_dir = plot_dir # qc6 checks that the first step up to +40 mV, before the staircase, in # the subtracted trace is bigger than -2 x estimated noise level. @@ -349,56 +374,47 @@ def test_qc6(self): # TODO: Test on select data def test_run_qc(self): - self.assertTrue(np.all(np.isfinite(self.voltage))) - self.assertTrue(np.all(np.isfinite(self.times))) - - plot_dir = os.path.join(self.output_dir, - 'test_run_qc') + # Spot check a few wells; could check all, but it's time consuming. + hergqc = copy.deepcopy(self.hergqc) + plot_dir = os.path.join(hergqc.plot_dir, "test_run_qc") if not os.path.exists(plot_dir): os.makedirs(plot_dir) + hergqc.plot_dir = plot_dir - hergqc = hERGQC(sampling_rate=self.sampling_rate, - plot_dir=plot_dir, - voltage=self.voltage) - - sweeps = [0, 1] - before = self.test_trace_before.get_trace_sweeps(sweeps) - after = self.test_trace_after.get_trace_sweeps(sweeps) - qc_vals_before = self.test_trace_before.get_onboard_QC_values(sweeps=sweeps) - qc_vals_after = self.test_trace_after.get_onboard_QC_values(sweeps=sweeps) - - # Spot check a few wells - # We could check all of the wells but it's time consuming - - test_wells = ['A01', 'A02', 'A03'] - - voltage_protocol = self.test_trace_before.get_voltage_protocol() + test_matrix = [ + ("A01", True), + ("A02", True), + ("A03", True), + ("A04", False), + ("A05", False), + ("D01", False), + ] - for well in test_wells: + for well, expected in test_matrix: with self.subTest(well): # Take values from the first sweep only - qc_vals_before_well = np.array(qc_vals_before[well])[0, :] - qc_vals_after_well = np.array(qc_vals_after[well])[0, :] - - before_well = np.array(before[well]) - after_well = np.array(after[well]) - - #  Assume that there are no discontinuities at the start or end of ramps - voltage_steps = [tstart - for tstart, tend, vstart, vend in - voltage_protocol.get_all_sections() if vend == vstart] - - QC = hergqc.run_qc(voltage_steps, - self.times, before_well, after_well, - qc_vals_before_well, - qc_vals_after_well, n_sweeps=2) + before = np.array(self.trace_sweeps_before[well]) + after = np.array(self.trace_sweeps_after[well]) + + qc_vals_before = np.array(self.qc_vals_before[well])[0, :] + qc_vals_after = np.array(self.qc_vals_after[well])[0, :] + + QC = hergqc.run_qc( + voltage_steps=self.voltage_steps, + times=self.times, + before=before, + after=after, + qc_vals_before=qc_vals_before, + qc_vals_after=qc_vals_after, + n_sweeps=2, + ) logging.debug(well, QC.all_passed()) trace = "" for label, result in QC.items(): if not QC.qc_passed(label): - trace += f"{label}: {result}\n" - print(f"Testing Well, {well}") - self.assertTrue(QC.all_passed(), trace) + trace += f"{well} {label}: {result}\n" + + self.assertEqual(QC.all_passed(), expected, trace)