Skip to content

Commit

Permalink
ENH: Update Read CSV Filter caching to handle modified files. (#1078)
Browse files Browse the repository at this point in the history
* Adjust filter to re-read the headers from the header line every time.
* Adjust the filter to only re-read the file when the following occurs:

1. File path changes
2. File is modified
3. Headers mode is set to LINE and the header line has changed.

---------

Signed-off-by: Joey Kleingers <[email protected]>
  • Loading branch information
joeykleingers authored Sep 23, 2024
1 parent 9eeeacb commit 92d6cd6
Showing 1 changed file with 55 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct ReadCSVFileFilterCache
usize TotalLines = 0;
usize HeadersLine = 0;
std::string Headers;
fs::file_time_type LastModifiedTime;
};

std::atomic_int32_t s_InstanceId = 0;
Expand Down Expand Up @@ -323,6 +324,47 @@ IFilter::PreflightResult readHeaders(const std::string& inputFilePath, usize hea
return {};
}

Result<> cacheHeaders(const ReadCSVData& readCsvData)
{
std::fstream in(readCsvData.inputFilePath.c_str(), std::ios_base::in);
if(!in.is_open())
{
return MakeErrorResult(to_underlying(IssueCodes::FILE_NOT_OPEN), fmt::format("Could not open file for reading: {}", readCsvData.inputFilePath));
}

usize currentLine = 0;
while(!in.eof())
{
std::string line;
std::getline(in, line);
currentLine++;

if(currentLine == readCsvData.headersLine)
{
s_HeaderCache[s_InstanceId].Headers = line;
s_HeaderCache[s_InstanceId].HeadersLine = readCsvData.headersLine;
break;
}
}

return {};
}

Result<> cacheFullFile(const ReadCSVData& readCsvData)
{
s_HeaderCache[s_InstanceId].FilePath = readCsvData.inputFilePath;
auto result = cacheHeaders(readCsvData);
if(result.invalid())
{
return result;
}

s_HeaderCache[s_InstanceId].TotalLines = nx::core::FileUtilities::LinesInFile(readCsvData.inputFilePath);
s_HeaderCache[s_InstanceId].LastModifiedTime = fs::last_write_time(readCsvData.inputFilePath);

return {};
}

} // namespace

namespace nx::core
Expand Down Expand Up @@ -431,57 +473,28 @@ IFilter::PreflightResult ReadCSVFileFilter::preflightImpl(const DataStructure& d
}

StringVector headers;
if(readCSVData.inputFilePath != s_HeaderCache[s_InstanceId].FilePath)
auto lastModifiedTime = fs::last_write_time(readCSVData.inputFilePath);
if(readCSVData.inputFilePath != s_HeaderCache[s_InstanceId].FilePath || lastModifiedTime > s_HeaderCache[s_InstanceId].LastModifiedTime)
{
int64 lineCount = nx::core::FileUtilities::LinesInFile(inputFilePath);
if(lineCount < 0)
{
return {MakeErrorResult<OutputActions>(to_underlying(IssueCodes::FILE_NOT_OPEN), fmt::format("Could not open file for reading: {}", inputFilePath)), {}};
}
std::fstream in(inputFilePath.c_str(), std::ios_base::in);
if(!in.is_open())
// File path changed or file was modified
auto result = cacheFullFile(readCSVData);
if(result.invalid())
{
return {MakeErrorResult<OutputActions>(to_underlying(IssueCodes::FILE_NOT_OPEN), fmt::format("Could not open file for reading: {}", inputFilePath)), {}};
return {ConvertResultTo<OutputActions>(ConvertResult(std::move(result)), {})};
}

s_HeaderCache[s_InstanceId].FilePath = readCSVData.inputFilePath;

usize currentLine = 0;
while(!in.eof())
{
std::string line;
std::getline(in, line);
currentLine++;

if(headerMode == ReadCSVData::HeaderMode::LINE && currentLine == readCSVData.headersLine)
{
s_HeaderCache[s_InstanceId].Headers = line;
s_HeaderCache[s_InstanceId].HeadersLine = readCSVData.headersLine;
break;
}
}

headers = StringUtilities::split(s_HeaderCache[s_InstanceId].Headers, readCSVData.delimiters, readCSVData.consecutiveDelimiters);
s_HeaderCache[s_InstanceId].TotalLines = lineCount;
}
else if(headerMode == ReadCSVData::HeaderMode::LINE)
else if(headerMode == ReadCSVData::HeaderMode::LINE && readCSVData.headersLine != s_HeaderCache[s_InstanceId].HeadersLine)
{
if(readCSVData.headersLine != s_HeaderCache[s_InstanceId].HeadersLine)
// We are in header line mode and the header line number changed
auto result = cacheHeaders(readCSVData);
if(result.invalid())
{
IFilter::PreflightResult result = readHeaders(readCSVData.inputFilePath, readCSVData.headersLine, s_HeaderCache[s_InstanceId]);
if(result.outputActions.invalid())
{
return result;
}
return {ConvertResultTo<OutputActions>(ConvertResult(std::move(result)), {})};
}

headers = StringUtilities::split(s_HeaderCache[s_InstanceId].Headers, readCSVData.delimiters, readCSVData.consecutiveDelimiters);
}

if(headerMode == ReadCSVData::HeaderMode::CUSTOM)
{
headers = readCSVData.customHeaders;
}
headers = (headerMode == ReadCSVData::HeaderMode::LINE) ? StringUtilities::split(s_HeaderCache[s_InstanceId].Headers, readCSVData.delimiters, readCSVData.consecutiveDelimiters) :
readCSVData.customHeaders;

usize totalLines = s_HeaderCache[s_InstanceId].TotalLines;

Expand Down

0 comments on commit 92d6cd6

Please sign in to comment.