From 2c3b5d93ce7c77e9113a4901c1ab7f18c2fc58f5 Mon Sep 17 00:00:00 2001 From: Kevin Gurney Date: Tue, 14 Jan 2025 15:18:32 -0500 Subject: [PATCH] Add fromBytes and fromFile static "make" methods to arrow.io.ipc.RecordBatchStreamReader MATLAB class. --- .../ipc/proxy/record_batch_stream_reader.cc | 57 ++++++++++++++----- .../io/ipc/proxy/record_batch_stream_reader.h | 4 ++ .../+arrow/+io/+ipc/RecordBatchStreamReader.m | 22 +++++-- 3 files changed, 64 insertions(+), 19 deletions(-) diff --git a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc index f3c833484d38e..cbd9c5562e29c 100644 --- a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc +++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc @@ -15,8 +15,10 @@ // specific language governing permissions and limitations // under the License. +#include "arrow/matlab/buffer/matlab_buffer.h" #include "arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h" #include "arrow/io/file.h" +#include "arrow/io/memory.h" #include "arrow/matlab/error/error.h" #include "arrow/matlab/tabular/proxy/record_batch.h" #include "arrow/matlab/tabular/proxy/schema.h" @@ -36,6 +38,35 @@ RecordBatchStreamReader::RecordBatchStreamReader( REGISTER_METHOD(RecordBatchStreamReader, readTable); } +libmexclass::proxy::MakeResult RecordBatchStreamReader::fromFile(const libmexclass::proxy::FunctionArguments& constructor_arguments) { + const mda::StructArray opts = constructor_arguments[0]; + const mda::StringArray filename_mda = opts[0]["Filename"]; + const auto filename_utf16 = std::u16string(filename_mda[0]); + MATLAB_ASSIGN_OR_ERROR(const auto filename_utf8, + arrow::util::UTF16StringToUTF8(filename_utf16), + error::UNICODE_CONVERSION_ERROR_ID); + + MATLAB_ASSIGN_OR_ERROR(auto input_stream, arrow::io::ReadableFile::Open(filename_utf8), + error::FAILED_TO_OPEN_FILE_FOR_READ); + + MATLAB_ASSIGN_OR_ERROR(auto reader, + arrow::ipc::RecordBatchStreamReader::Open(input_stream), + error::IPC_RECORD_BATCH_READER_OPEN_FAILED); + + return std::make_shared(std::move(reader)); +} + +libmexclass::proxy::MakeResult RecordBatchStreamReader::fromBytes(const libmexclass::proxy::FunctionArguments& constructor_arguments) { + const mda::StructArray opts = constructor_arguments[0]; + const ::matlab::data::TypedArray bytes_mda = opts[0]["Bytes"]; + const auto matlab_buffer = std::make_shared(bytes_mda); + auto buffer_reader = std::make_shared(matlab_buffer); + MATLAB_ASSIGN_OR_ERROR(auto reader, + arrow::ipc::RecordBatchStreamReader::Open(buffer_reader), + error::IPC_RECORD_BATCH_READER_OPEN_FAILED); + return std::make_shared(std::move(reader)); +} + libmexclass::proxy::MakeResult RecordBatchStreamReader::make( const libmexclass::proxy::FunctionArguments& constructor_arguments) { namespace mda = ::matlab::data; @@ -44,20 +75,18 @@ libmexclass::proxy::MakeResult RecordBatchStreamReader::make( const mda::StructArray opts = constructor_arguments[0]; - const mda::StringArray filename_mda = opts[0]["Filename"]; - const auto filename_utf16 = std::u16string(filename_mda[0]); - MATLAB_ASSIGN_OR_ERROR(const auto filename_utf8, - arrow::util::UTF16StringToUTF8(filename_utf16), - error::UNICODE_CONVERSION_ERROR_ID); - - MATLAB_ASSIGN_OR_ERROR(auto input_stream, arrow::io::ReadableFile::Open(filename_utf8), - error::FAILED_TO_OPEN_FILE_FOR_READ); - - MATLAB_ASSIGN_OR_ERROR(auto reader, - arrow::ipc::RecordBatchStreamReader::Open(input_stream), - error::IPC_RECORD_BATCH_READER_OPEN_FAILED); - - return std::make_shared(std::move(reader)); + // Dispatch to the appropriate static "make" method depending + // on the input type. + const mda::StringArray type_mda = opts[0]["Type"]; + const auto type_utf16 = std::u16string(type_mda[0]); + if (type_utf16.equals(u"Bytes")) { + return RecordBatchStreamReader::fromBytes(constructor_arguments); + } else if (type_utf16.equals(u"Filename")) { + return RecordBatchStreamReader::fromFile(constructor_arguments); + } else { + // TODO: Create static error id string + return libmexclass::error::Error{"arrow:some:test:id", "Invalid construction type for RecordBatchStreamReader."}; + } } void RecordBatchStreamReader::getSchema(libmexclass::proxy::method::Context& context) { diff --git a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h index 56fb293987825..0492c46dc04cc 100644 --- a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h +++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h @@ -30,6 +30,10 @@ class RecordBatchStreamReader : public libmexclass::proxy::Proxy { static libmexclass::proxy::MakeResult make( const libmexclass::proxy::FunctionArguments& constructor_arguments); + static libmexclass::proxy::MakeResult fromFile( + const libmexclass::proxy::FunctionArguments& constructor_arguments); + static libmexclass::proxy::MakeResult fromBytes( + const libmexclass::proxy::FunctionArguments& constructor_arguments); protected: std::shared_ptr reader; diff --git a/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m index 60ca38eba9ad5..4ce58f1208546 100644 --- a/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m +++ b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m @@ -26,14 +26,26 @@ Schema end + methods (Static) + function obj = fromBytes(bytes) + args = struct(Bytes=bytes, Type="Bytes"); + proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader"; + obj.Proxy = arrow.internal.proxy.create(proxyName, args); + end + + function obj = fromFile(filename) + args = struct(Filename=filename, Type="File"); + proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader"; + obj.Proxy = arrow.internal.proxy.create(proxyName, args); + end + end + methods - function obj = RecordBatchStreamReader(filename) + function obj = RecordBatchStreamReader(proxy) arguments - filename(1, 1) string {mustBeNonzeroLengthText} + proxy(1, 1) libmexclass.proxy.Proxy end - args = struct(Filename=filename); - proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader"; - obj.Proxy = arrow.internal.proxy.create(proxyName, args); + obj.Proxy = proxy; end function schema = get.Schema(obj)