Commit
Merge pull request #1 from scap3yvt/828-feature-add-the-ability-to-split-csvs-for-trainingvalidationtesting-as-a-separate-script

scap3yvt authored Mar 24, 2024
2 parents 3e0480a + 96efef5 commit 2234933
Showing 7 changed files with 368 additions and 273 deletions.
6 changes: 3 additions & 3 deletions GANDLF/compute/training_loop.py
@@ -244,9 +244,9 @@ def training_loop(
params["validation_data"] = validation_data
params["testing_data"] = testing_data
testingDataDefined = True
if params["testing_data"] is None:
# testing_data = validation_data
testingDataDefined = False
if not isinstance(testing_data, pd.DataFrame):
if params["testing_data"] is None:
testingDataDefined = False

# Setup a few variables for tracking
best_loss = 1e7
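The rewritten guard only rechecks params["testing_data"] when the testing_data argument is not already a DataFrame, so a DataFrame handed over by the new split path always counts as defined. A minimal standalone sketch of that behavior (is_testing_defined is a hypothetical helper, not part of GANDLF):

import pandas as pd

def is_testing_defined(testing_data) -> bool:
    # A real DataFrame short-circuits the check; only non-DataFrame
    # values (e.g. None) can mark the testing data as undefined.
    testingDataDefined = True
    if not isinstance(testing_data, pd.DataFrame):
        if testing_data is None:
            testingDataDefined = False
    return testingDataDefined

assert is_testing_defined(pd.DataFrame({"SubjectID": ["001"]}))
assert not is_testing_defined(None)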
6 changes: 6 additions & 0 deletions GANDLF/config_manager.py
@@ -634,6 +634,12 @@ def _parseConfig(
"nested_training" in params
), "The parameter 'nested_training' needs to be defined"
# initialize defaults for nested training
params["nested_training"]["stratified"] = params["nested_training"].get(
"stratified", False
)
params["nested_training"]["stratified"] = params["nested_training"].get(
"proportional", params["nested_training"]["stratified"]
)
params["nested_training"]["testing"] = params["nested_training"].get("testing", -5)
params["nested_training"]["validation"] = params["nested_training"].get(
"validation", -5
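The two chained .get calls make "proportional" act as an alias that, when supplied, overrides "stratified". A standalone sketch of that resolution order (resolve_stratified is a hypothetical helper for illustration):

def resolve_stratified(nested_training: dict) -> bool:
    # "stratified" defaults to False; "proportional", if present,
    # takes precedence over whatever "stratified" resolved to.
    stratified = nested_training.get("stratified", False)
    return nested_training.get("proportional", stratified)

assert resolve_stratified({}) is False
assert resolve_stratified({"stratified": True}) is True
assert resolve_stratified({"proportional": True}) is True
assert resolve_stratified({"stratified": True, "proportional": False}) is False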
333 changes: 79 additions & 254 deletions GANDLF/training_manager.py
@@ -1,10 +1,9 @@
 import pandas as pd
-import os, sys, pickle, subprocess, shutil
-from sklearn.model_selection import KFold
+import os, pickle, shutil
 from pathlib import Path
 
 from GANDLF.compute import training_loop
-from GANDLF.utils import get_dataframe
+from GANDLF.utils import get_dataframe, split_data
 
 
 def TrainingManager(
@@ -44,269 +43,95 @@ def TrainingManager(
             )
             parameters = pickle.load(open(currentModelConfigPickle, "rb"))
 
-    # check for single fold training
-    singleFoldValidation = False
-    singleFoldTesting = False
-    noTestingData = False
-    # if the user wants a single fold training
-    if parameters["nested_training"]["testing"] < 0:
-        parameters["nested_training"]["testing"] = abs(
-            parameters["nested_training"]["testing"]
-        )
-        singleFoldTesting = True
+    dataframe_split = split_data(dataframe, parameters)
 
-    # if the user wants a single fold training
-    if parameters["nested_training"]["validation"] < 0:
-        parameters["nested_training"]["validation"] = abs(
-            parameters["nested_training"]["validation"]
-        )
-        singleFoldValidation = True
+    last_indeces, _, _, _ = dataframe_split[-1]
 
-    # this is the condition where testing data is not to be kept
-    if parameters["nested_training"]["testing"] == 1:
-        noTestingData = True
-        singleFoldTesting = True
-        # put 2 just so that the first for-loop does not fail
-        parameters["nested_training"]["testing"] = 2
-
-    # initialize the kfold structures
-    kf_testing = KFold(n_splits=parameters["nested_training"]["testing"])
-    kf_validation = KFold(n_splits=parameters["nested_training"]["validation"])
-
-    currentTestingFold = 0
-
-    # split across subjects
-    subjectIDs_full = (
-        dataframe[dataframe.columns[parameters["headers"]["subjectIDHeader"]]]
-        .unique()
-        .tolist()
-    )
-
-    # get the indeces for kfold splitting
-    trainingData_full = dataframe
-
-    # start the kFold train for testing
-    for trainAndVal_index, testing_index in kf_testing.split(subjectIDs_full):
-        # ensure the validation fold is initialized per-testing split
-        currentValidationFold = 0
-
-        trainingAndValidationData = pd.DataFrame()  # initialize the variable
-        testingData = pd.DataFrame()  # initialize the variable
-        # get the current training and testing data
-        if noTestingData:
-            # don't consider the split indeces for this case
-            trainingAndValidationData = trainingData_full
-            testingData = None
-        else:
-            # loop over all trainAndVal_index and construct new dataframe
-            for subject_idx in trainAndVal_index:
-                trainingAndValidationData = trainingAndValidationData._append(
-                    trainingData_full[
-                        trainingData_full[
-                            trainingData_full.columns[
-                                parameters["headers"]["subjectIDHeader"]
-                            ]
-                        ]
-                        == subjectIDs_full[subject_idx]
-                    ]
-                )
-
-            # loop over all testing_index and construct new dataframe
-            for subject_idx in testing_index:
-                testingData = testingData._append(
-                    trainingData_full[
-                        trainingData_full[
-                            trainingData_full.columns[
-                                parameters["headers"]["subjectIDHeader"]
-                            ]
-                        ]
-                        == subjectIDs_full[subject_idx]
-                    ]
-                )
+    # check the last indeces to see if single fold training is requested
+    singleFoldTesting = True if last_indeces[0] == 0 else False
+    singleFoldValidation = True if last_indeces[1] == 0 else False
 
+    for (
+        testing_and_valid_indeces,
+        trainingData,
+        validationData,
+        testingData,
+    ) in dataframe_split:
         # the output of the current fold is only needed if multi-fold training is happening
-        if singleFoldTesting:
-            currentOutputFolder = outputDir
-        else:
-            currentOutputFolder = os.path.join(
-                outputDir, "testing_" + str(currentTestingFold)
-            )
-        Path(currentOutputFolder).mkdir(parents=True, exist_ok=True)
+        currentTestingOutputFolder = outputDir
+        if not singleFoldTesting:
+            currentTestingOutputFolder = os.path.join(
+                outputDir, "testing_" + str(testing_and_valid_indeces[0])
+            )
+        Path(currentTestingOutputFolder).mkdir(parents=True, exist_ok=True)
 
-        # save the current training+validation and testing datasets
-        if noTestingData:
-            print(
-                "WARNING: Testing data is empty, which will result in scientifically incorrect results; use at your own risk."
-            )
-            current_training_subject_indeces_full = subjectIDs_full
-            currentTestingDataPickle = "None"
-        else:
-            currentTrainingAndValidationDataPickle = os.path.join(
-                currentOutputFolder, "data_trainAndVal.pkl"
-            )
-            currentTestingDataPickle = os.path.join(
-                currentOutputFolder, "data_testing.pkl"
-            )
-
-            if (not os.path.exists(currentTestingDataPickle)) or reset or resume:
-                testingData.to_pickle(currentTestingDataPickle)
-            else:
-                if os.path.exists(currentTestingDataPickle):
-                    print(
-                        "Using previously saved testing data",
-                        currentTestingDataPickle,
-                        flush=True,
-                    )
-                    testingData = pd.read_pickle(currentTestingDataPickle)
-
-            if (
-                (not os.path.exists(currentTrainingAndValidationDataPickle))
-                or reset
-                or resume
-            ):
-                trainingAndValidationData.to_pickle(
-                    currentTrainingAndValidationDataPickle
-                )
-            else:
-                if os.path.exists(currentTrainingAndValidationDataPickle):
-                    print(
-                        "Using previously saved training+validation data",
-                        currentTrainingAndValidationDataPickle,
-                        flush=True,
-                    )
-                    trainingAndValidationData = pd.read_pickle(
-                        currentTrainingAndValidationDataPickle
-                    )
-
-            current_training_subject_indeces_full = (
-                trainingAndValidationData[
-                    trainingAndValidationData.columns[
-                        parameters["headers"]["subjectIDHeader"]
-                    ]
-                ]
-                .unique()
-                .tolist()
-            )
-
-        # start the kFold train for validation
-        for train_index, val_index in kf_validation.split(
-            current_training_subject_indeces_full
-        ):
-            # the output of the current fold is only needed if multi-fold training is happening
-            if singleFoldValidation:
-                currentValOutputFolder = currentOutputFolder
-            else:
-                currentValOutputFolder = os.path.join(
-                    currentOutputFolder, str(currentValidationFold)
-                )
-            Path(currentValOutputFolder).mkdir(parents=True, exist_ok=True)
-
-            trainingData = pd.DataFrame()  # initialize the variable
-            validationData = pd.DataFrame()  # initialize the variable
-
-            # loop over all train_index and construct new dataframe
-            for subject_idx in train_index:
-                trainingData = trainingData._append(
-                    trainingData_full[
-                        trainingData_full[
-                            trainingData_full.columns[
-                                parameters["headers"]["subjectIDHeader"]
-                            ]
-                        ]
-                        == subjectIDs_full[subject_idx]
-                    ]
-                )
-
-            # loop over all val_index and construct new dataframe
-            for subject_idx in val_index:
-                validationData = validationData._append(
-                    trainingData_full[
-                        trainingData_full[
-                            trainingData_full.columns[
-                                parameters["headers"]["subjectIDHeader"]
-                            ]
-                        ]
-                        == subjectIDs_full[subject_idx]
-                    ]
-                )
-
-            # # write parameters to pickle - this should not change for the different folds, so keeping is independent
-            ## pickle/unpickle data
-            # pickle the data
-            currentTrainingDataPickle = os.path.join(
-                currentValOutputFolder, "data_training.pkl"
-            )
-            currentValidationDataPickle = os.path.join(
-                currentValOutputFolder, "data_validation.pkl"
-            )
-            if (not os.path.exists(currentTrainingDataPickle)) or reset or resume:
-                trainingData.to_pickle(currentTrainingDataPickle)
-                trainingData.to_csv(
-                    currentTrainingDataPickle.replace(".pkl", ".csv"), index=False
-                )
-            else:
-                trainingData = get_dataframe(currentTrainingDataPickle)
-            if (not os.path.exists(currentValidationDataPickle)) or reset or resume:
-                validationData.to_pickle(currentValidationDataPickle)
-                validationData.to_csv(
-                    currentValidationDataPickle.replace(".pkl", ".csv"), index=False
-                )
-            else:
-                validationData = get_dataframe(currentValidationDataPickle)
-
-            # parallel_compute_command is an empty string, thus no parallel computing requested
-            if (not parameters["parallel_compute_command"]) or (singleFoldValidation):
-                training_loop(
-                    training_data=trainingData,
-                    validation_data=validationData,
-                    output_dir=currentValOutputFolder,
-                    device=device,
-                    params=parameters,
-                    testing_data=testingData,
-                )
-
-            else:
-                # call qsub here
-                parallel_compute_command_actual = parameters[
-                    "parallel_compute_command"
-                ].replace("${outputDir}", currentValOutputFolder)
-
-                if not ("python" in parallel_compute_command_actual):
-                    sys.exit(
-                        "The 'parallel_compute_command_actual' needs to have the python from the virtual environment, which is usually '${GANDLF_dir}/venv/bin/python'"
-                    )
-
-                command = (
-                    parallel_compute_command_actual
-                    + " -m GANDLF.training_loop -train_loader_pickle "
-                    + currentTrainingDataPickle
-                    + " -val_loader_pickle "
-                    + currentValidationDataPickle
-                    + " -parameter_pickle "
-                    + currentModelConfigPickle
-                    + " -device "
-                    + str(device)
-                    + " -outputDir "
-                    + currentValOutputFolder
-                    + " -testing_loader_pickle "
-                    + currentTestingDataPickle
-                )
-
-                print(
-                    "Submitting job for testing split "
-                    + str(currentTestingFold)
-                    + " and validation split "
-                    + str(currentValidationFold)
-                )
-                subprocess.Popen(command, shell=True).wait()
-
-            if singleFoldValidation:
-                break
-            currentValidationFold += 1  # go to next fold
-
-        if singleFoldTesting:
-            break
-        currentTestingFold += 1  # go to next fold
+        currentValidationOutputFolder = currentTestingOutputFolder
+        if not singleFoldValidation:
+            currentValidationOutputFolder = os.path.join(
+                currentTestingOutputFolder, str(testing_and_valid_indeces[1])
+            )
+        Path(currentValidationOutputFolder).mkdir(parents=True, exist_ok=True)
+
+        # initialize the dataframes and save them to disk
+        data_dict = {
+            "training": trainingData,
+            "validation": validationData,
+            "testing": testingData,
+        }
+        data_dict_files = {}
+        for data_type, data in data_dict.items():
+            data_dict_files[data_type] = None
+            if data is not None:
+                currentDataPickle = os.path.join(
+                    currentValidationOutputFolder, "data_" + data_type + ".pkl"
+                )
+                data_dict_files[data_type] = currentDataPickle
+                if (not os.path.exists(currentDataPickle)) or reset or resume:
+                    data.to_pickle(currentDataPickle)
+                    data.to_csv(currentDataPickle.replace(".pkl", ".csv"), index=False)
+                else:
+                    # read the data from the pickle if present
+                    data_dict[data_type] = get_dataframe(currentDataPickle)
+
+        # parallel_compute_command is an empty string, thus no parallel computing requested
+        if not parameters["parallel_compute_command"]:
+            training_loop(
+                training_data=data_dict["training"],
+                validation_data=data_dict["validation"],
+                output_dir=currentValidationOutputFolder,
+                device=device,
+                params=parameters,
+                testing_data=data_dict["testing"],
+            )
+        else:
+            # call hpc command here
+            parallel_compute_command_actual = parameters[
+                "parallel_compute_command"
+            ].replace("${outputDir}", currentValidationOutputFolder)
+
+            assert (
+                "python" in parallel_compute_command_actual
+            ), "The 'parallel_compute_command_actual' needs to have the python from the virtual environment, which is usually '${GANDLF_dir}/venv/bin/python'"
+
+            command = (
+                parallel_compute_command_actual
+                + " -m GANDLF.training_loop -train_loader_pickle "
+                + data_dict_files["training"]
+                + " -val_loader_pickle "
+                + data_dict_files["validation"]
+                + " -parameter_pickle "
+                + currentModelConfigPickle
+                + " -device "
+                + str(device)
+                + " -outputDir "
+                + currentValidationOutputFolder
+                + " -testing_loader_pickle "
+                + data_dict_files["testing"]
+            )
+
+            print("Running command: ", command, flush=True)
+            os.system(command)
 
 
 def TrainingManager_split(
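As consumed above, split_data(dataframe, parameters) appears to return one entry per (testing fold, validation fold) pair, each shaped as ((testing_index, validation_index), training_df, validation_df, testing_df), with an index of 0 in the last entry signaling single-fold mode and testing_df of None when no testing split was requested. A sketch of a consumer that relies only on that contract (summarize_folds is hypothetical):

from GANDLF.utils import split_data

def summarize_folds(dataframe, parameters):
    # dataframe and parameters are assumed to be the same objects
    # TrainingManager receives (parsed data CSV plus parsed config).
    for indeces, train_df, val_df, test_df in split_data(dataframe, parameters):
        n_test = 0 if test_df is None else len(test_df)  # testing may be absent
        print(
            "testing fold", indeces[0], "validation fold", indeces[1], ":",
            len(train_df), "train /", len(val_df), "val /", n_test, "test rows",
        )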
2 changes: 2 additions & 0 deletions GANDLF/utils/__init__.py
@@ -66,3 +66,5 @@
     save_model,
     optimize_and_save_model,
 )
+
+from .data_splitter import split_data
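With the re-export in place, the splitter can also be driven on its own, which matches the intent in the branch name (issue 828: split CSVs for training/validation/testing as a separate script). A sketch under the same assumptions as above (make_split_csvs and its file naming are illustrative, not GANDLF API):

import os
import pandas as pd
from GANDLF.utils import split_data

def make_split_csvs(data_csv: str, parameters: dict, output_dir: str) -> None:
    # parameters is assumed to come from GANDLF's config parsing, so it
    # carries the "nested_training" and "headers" sections the splitter uses.
    dataframe = pd.read_csv(data_csv)
    os.makedirs(output_dir, exist_ok=True)
    for indeces, train_df, val_df, test_df in split_data(dataframe, parameters):
        tag = "test" + str(indeces[0]) + "_val" + str(indeces[1])
        train_df.to_csv(os.path.join(output_dir, "train_" + tag + ".csv"), index=False)
        val_df.to_csv(os.path.join(output_dir, "val_" + tag + ".csv"), index=False)
        if test_df is not None:
            test_df.to_csv(os.path.join(output_dir, "test_" + tag + ".csv"), index=False)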