Skip to content

Commit

Permalink
Implement RecordBatchFileReader::readRecordBatchAtIndex(...)
Browse files Browse the repository at this point in the history
  • Loading branch information
sgilmore10 committed Jun 13, 2024
1 parent 35c7cad commit ec77d85
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 2 deletions.
2 changes: 2 additions & 0 deletions matlab/src/cpp/arrow/matlab/error/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,5 +244,7 @@ static const char* C_EXPORT_FAILED = "arrow:c:export:ExportFailed";
static const char* C_IMPORT_FAILED = "arrow:c:import:ImportFailed";
static const char* IPC_RECORD_BATCH_WRITE_FAILED = "arrow:io:ipc:FailedToWriteRecordBatch";
static const char* IPC_RECORD_BATCH_READER_OPEN_FAILED = "arrow:io:ipc:FailedToOpenRecordBatchReader";
static const char* IPC_RECORD_BATCH_READ_INVALID_INDEX = "arrow:io:ipc:InvalidIndex";
static const char* IPC_RECORD_BATCH_READ_FAILED = "arrow:io:ipc:ReadFailed";

} // namespace arrow::matlab::error
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,31 @@
#include "arrow/io/file.h"
#include "arrow/matlab/error/error.h"
#include "arrow/matlab/io/ipc/proxy/record_batch_file_reader.h"
#include "arrow/matlab/tabular/proxy/record_batch.h"
#include "arrow/matlab/tabular/proxy/schema.h"
#include "arrow/util/utf8.h"

#include "libmexclass/proxy/ProxyManager.h"

namespace arrow::matlab::io::ipc::proxy {

namespace {
libmexclass::error::Error makeInvalidNumericIndexError(const int32_t matlab_index,
const int32_t num_batches) {
std::stringstream error_message_stream;
error_message_stream << "Invalid record batch index: ";
error_message_stream << matlab_index;
error_message_stream << ". Record batch index must be between 1 and the number of record batches (";
error_message_stream << num_batches;
error_message_stream << ").";
return libmexclass::error::Error{error::IPC_RECORD_BATCH_READ_INVALID_INDEX, error_message_stream.str()};
}
}

RecordBatchFileReader::RecordBatchFileReader(const std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader)
: reader{std::move(reader)} {
REGISTER_METHOD(RecordBatchFileReader, getNumRecordBatches);
REGISTER_METHOD(RecordBatchFileReader, getSchema);

}

libmexclass::proxy::MakeResult RecordBatchFileReader::make(const libmexclass::proxy::FunctionArguments& constructor_arguments) {
Expand Down Expand Up @@ -75,9 +88,33 @@ void RecordBatchFileReader::getSchema(libmexclass::proxy::method::Context& conte
mda::ArrayFactory factory;
const auto schema_proxy_id_mda = factory.createScalar(schema_proxy_id);
context.outputs[0] = schema_proxy_id_mda;

}

void RecordBatchFileReader::readRecordBatchAtIndex(libmexclass::proxy::method::Context& context) {
namespace mda = ::matlab::data;
using RecordBatchProxy = arrow::matlab::tabular::proxy::RecordBatch;

mda::StructArray opts = context.inputs[0];
const mda::TypedArray<int32_t> matlab_index_mda = opts[0]["Index"];

const auto matlab_index = matlab_index_mda[0];
const auto num_record_batches = reader->num_record_batches();
if (matlab_index < 1 || matlab_index > num_record_batches) {
context.error = makeInvalidNumericIndexError(matlab_index, num_record_batches);
return;
}
const auto arrow_index = matlab_index - 1;

MATLAB_ASSIGN_OR_ERROR_WITH_CONTEXT(const auto record_batch, reader->ReadRecordBatch(arrow_index),
context, error::IPC_RECORD_BATCH_READ_FAILED);

auto record_batch_proxy = std::make_shared<RecordBatchProxy>(std::move(record_batch));
const auto record_batch_proxy_id = libmexclass::proxy::ProxyManager::manageProxy(record_batch_proxy);

mda::ArrayFactory factory;
const auto record_batch_proxyy_id_mda = factory.createScalar(record_batch_proxy_id);
context.outputs[0] = record_batch_proxyy_id_mda;
}



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ class RecordBatchFileReader : public libmexclass::proxy::Proxy {

void getSchema(libmexclass::proxy::method::Context& context);

void readRecordBatchAtIndex(libmexclass::proxy::method::Context& context);



};

Expand Down

0 comments on commit ec77d85

Please sign in to comment.