Skip to content

Commit

Permalink
Add fromBytes and fromFile static "make" methods to arrow.io.ipc.Reco…
Browse files Browse the repository at this point in the history
…rdBatchStreamReader MATLAB class.
  • Loading branch information
kevingurney committed Jan 14, 2025
1 parent 8b75373 commit 2c3b5d9
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
// specific language governing permissions and limitations
// under the License.

#include "arrow/matlab/buffer/matlab_buffer.h"
#include "arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h"
#include "arrow/io/file.h"
#include "arrow/io/memory.h"
#include "arrow/matlab/error/error.h"
#include "arrow/matlab/tabular/proxy/record_batch.h"
#include "arrow/matlab/tabular/proxy/schema.h"
Expand All @@ -36,6 +38,35 @@ RecordBatchStreamReader::RecordBatchStreamReader(
REGISTER_METHOD(RecordBatchStreamReader, readTable);
}

libmexclass::proxy::MakeResult RecordBatchStreamReader::fromFile(const libmexclass::proxy::FunctionArguments& constructor_arguments) {
const mda::StructArray opts = constructor_arguments[0];
const mda::StringArray filename_mda = opts[0]["Filename"];
const auto filename_utf16 = std::u16string(filename_mda[0]);
MATLAB_ASSIGN_OR_ERROR(const auto filename_utf8,
arrow::util::UTF16StringToUTF8(filename_utf16),
error::UNICODE_CONVERSION_ERROR_ID);

MATLAB_ASSIGN_OR_ERROR(auto input_stream, arrow::io::ReadableFile::Open(filename_utf8),
error::FAILED_TO_OPEN_FILE_FOR_READ);

MATLAB_ASSIGN_OR_ERROR(auto reader,
arrow::ipc::RecordBatchStreamReader::Open(input_stream),
error::IPC_RECORD_BATCH_READER_OPEN_FAILED);

return std::make_shared<RecordBatchStreamReaderProxy>(std::move(reader));
}

libmexclass::proxy::MakeResult RecordBatchStreamReader::fromBytes(const libmexclass::proxy::FunctionArguments& constructor_arguments) {
const mda::StructArray opts = constructor_arguments[0];
const ::matlab::data::TypedArray<uint8_t> bytes_mda = opts[0]["Bytes"];
const auto matlab_buffer = std::make_shared<matlab::arrow::MatlabBuffer>(bytes_mda);
auto buffer_reader = std::make_shared<arrow::io::memory::BufferReader>(matlab_buffer);
MATLAB_ASSIGN_OR_ERROR(auto reader,
arrow::ipc::RecordBatchStreamReader::Open(buffer_reader),
error::IPC_RECORD_BATCH_READER_OPEN_FAILED);
return std::make_shared<RecordBatchStreamReaderProxy>(std::move(reader));
}

libmexclass::proxy::MakeResult RecordBatchStreamReader::make(
const libmexclass::proxy::FunctionArguments& constructor_arguments) {
namespace mda = ::matlab::data;
Expand All @@ -44,20 +75,18 @@ libmexclass::proxy::MakeResult RecordBatchStreamReader::make(

const mda::StructArray opts = constructor_arguments[0];

const mda::StringArray filename_mda = opts[0]["Filename"];
const auto filename_utf16 = std::u16string(filename_mda[0]);
MATLAB_ASSIGN_OR_ERROR(const auto filename_utf8,
arrow::util::UTF16StringToUTF8(filename_utf16),
error::UNICODE_CONVERSION_ERROR_ID);

MATLAB_ASSIGN_OR_ERROR(auto input_stream, arrow::io::ReadableFile::Open(filename_utf8),
error::FAILED_TO_OPEN_FILE_FOR_READ);

MATLAB_ASSIGN_OR_ERROR(auto reader,
arrow::ipc::RecordBatchStreamReader::Open(input_stream),
error::IPC_RECORD_BATCH_READER_OPEN_FAILED);

return std::make_shared<RecordBatchStreamReaderProxy>(std::move(reader));
// Dispatch to the appropriate static "make" method depending
// on the input type.
const mda::StringArray type_mda = opts[0]["Type"];
const auto type_utf16 = std::u16string(type_mda[0]);
if (type_utf16.equals(u"Bytes")) {
return RecordBatchStreamReader::fromBytes(constructor_arguments);
} else if (type_utf16.equals(u"Filename")) {
return RecordBatchStreamReader::fromFile(constructor_arguments);
} else {
// TODO: Create static error id string
return libmexclass::error::Error{"arrow:some:test:id", "Invalid construction type for RecordBatchStreamReader."};
}
}

void RecordBatchStreamReader::getSchema(libmexclass::proxy::method::Context& context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ class RecordBatchStreamReader : public libmexclass::proxy::Proxy {

static libmexclass::proxy::MakeResult make(
const libmexclass::proxy::FunctionArguments& constructor_arguments);
static libmexclass::proxy::MakeResult fromFile(
const libmexclass::proxy::FunctionArguments& constructor_arguments);
static libmexclass::proxy::MakeResult fromBytes(
const libmexclass::proxy::FunctionArguments& constructor_arguments);

protected:
std::shared_ptr<arrow::ipc::RecordBatchStreamReader> reader;
Expand Down
22 changes: 17 additions & 5 deletions matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,26 @@
Schema
end

methods (Static)
function obj = fromBytes(bytes)
args = struct(Bytes=bytes, Type="Bytes");
proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader";
obj.Proxy = arrow.internal.proxy.create(proxyName, args);
end

function obj = fromFile(filename)
args = struct(Filename=filename, Type="File");
proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader";
obj.Proxy = arrow.internal.proxy.create(proxyName, args);
end
end

methods
function obj = RecordBatchStreamReader(filename)
function obj = RecordBatchStreamReader(proxy)
arguments
filename(1, 1) string {mustBeNonzeroLengthText}
proxy(1, 1) libmexclass.proxy.Proxy
end
args = struct(Filename=filename);
proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader";
obj.Proxy = arrow.internal.proxy.create(proxyName, args);
obj.Proxy = proxy;
end

function schema = get.Schema(obj)
Expand Down

0 comments on commit 2c3b5d9

Please sign in to comment.