Skip to content

Commit

Permalink
refactor: IOStream Base
Browse files Browse the repository at this point in the history
  • Loading branch information
MistEO committed Dec 6, 2023
1 parent e9bd186 commit 4ec1ced
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 107 deletions.
70 changes: 26 additions & 44 deletions source/MaaUtils/IOStream/ChildPipeIOStream.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "Utils/IOStream/ChildPipeIOStream.h"

#include "Utils/Time.hpp"
#include "Utils/Logger.h"

MAA_NS_BEGIN

Expand All @@ -11,8 +11,7 @@ ChildPipeIOStream::ChildPipeIOStream(const std::filesystem::path& exec, const st
,
boost::process::windows::create_no_window
#endif
),
buffer_(std::make_unique<char[]>(kBufferSize))
)
{}

ChildPipeIOStream::~ChildPipeIOStream()
Expand All @@ -22,53 +21,16 @@ ChildPipeIOStream::~ChildPipeIOStream()

bool ChildPipeIOStream::write(std::string_view data)
{
if (!child_.running()) {
if (is_open()) {
LogError << "not opened";
return false;
}

pout_ << data << std::endl;
return true;
}

std::string ChildPipeIOStream::read(duration_t timeout)
{
return read_some(std::numeric_limits<size_t>::max(), timeout);
}

std::string ChildPipeIOStream::read_some(size_t count, duration_t timeout)
{
auto start_time = std::chrono::steady_clock::now();
std::string result;

while (pin_.is_open() && result.size() < count && duration_since(start_time) < timeout) {
auto read_size = std::min(kBufferSize, count - result.size());
auto read_num = pin_.readsome(buffer_.get(), read_size);
result.append(buffer_.get(), read_num);
}

return result;
}

std::string ChildPipeIOStream::read_until(std::string_view delimiter, duration_t timeout)
{
auto start_time = std::chrono::steady_clock::now();

std::string result;

while (!result.ends_with(delimiter)) {
auto sub_timeout = timeout - duration_since<duration_t>(start_time);
if (sub_timeout < duration_t::zero()) {
break;
}

auto sub_str = read_some(1, sub_timeout);
result.append(std::move(sub_str));
}

return result;
}

int ChildPipeIOStream::release()
bool ChildPipeIOStream::release()
{
if (child_.running()) {
child_.terminate();
Expand All @@ -77,12 +39,32 @@ int ChildPipeIOStream::release()
child_.wait();
}

return child_.exit_code();
int code = child_.exit_code();

if (code != 0) {
LogError << "child exit with" << code;
return false;
}

return true;
}

bool ChildPipeIOStream::is_open() const
{
return pin_.is_open();
}

std::string ChildPipeIOStream::read_once(size_t max_count)
{
constexpr size_t kBufferSize = 128 * 1024;

if (!buffer_) {
buffer_ = std::make_unique<char[]>(kBufferSize);
}

auto read_size = std::min(kBufferSize, max_count);
auto read_num = pin_.readsome(buffer_.get(), read_size);
return std::string(buffer_.get(), read_num);
}

MAA_NS_END
44 changes: 44 additions & 0 deletions source/MaaUtils/IOStream/IOStream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#include "Utils/IOStream/IOStream.h"

#include "Utils/Time.hpp"

MAA_NS_BEGIN

std::string IOStream::read(duration_t timeout)
{
return read_some(std::numeric_limits<size_t>::max(), timeout);
}

std::string IOStream::read_some(size_t count, duration_t timeout)
{
auto start_time = std::chrono::steady_clock::now();
std::string result;

while (is_open() && result.size() < count && duration_since(start_time) < timeout) {
auto data = read_once(count - result.size());
result.append(data);
}

return result;
}

std::string IOStream::read_until(std::string_view delimiter, duration_t timeout)
{
auto start_time = std::chrono::steady_clock::now();

std::string result;

while (!result.ends_with(delimiter)) {
auto sub_timeout = timeout - duration_since<duration_t>(start_time);
if (sub_timeout < duration_t::zero()) {
break;
}

auto sub_str = read_some(1, sub_timeout);
result.append(std::move(sub_str));
}

return result;
}

MAA_NS_END
55 changes: 16 additions & 39 deletions source/MaaUtils/IOStream/SockIOStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,66 +64,43 @@ std::shared_ptr<SockIOStream> ClientSockIOFactory::connect()
return std::make_shared<SockIOStream>(std::move(sock));
}

SockIOStream::SockIOStream(boost::asio::ip::tcp::socket&& sock)
: sock_(std::move(sock)), buffer_(std::make_unique<char[]>(kBufferSize))
{}
SockIOStream::SockIOStream(boost::asio::ip::tcp::socket&& sock) : sock_(std::move(sock)) {}

SockIOStream::~SockIOStream()
{
sock_.close();
}
SockIOStream::~SockIOStream() {}

bool SockIOStream::write(std::string_view data)
{
if (!sock_.is_open()) {
if (!is_open()) {
LogError << "not opened";
return false;
}

sock_.write_some(boost::asio::buffer(data));
return true;
}

std::string SockIOStream::read(duration_t timeout)
bool SockIOStream::release()
{
return read_some(std::numeric_limits<size_t>::max(), timeout);
sock_.close();
return true;
}

std::string SockIOStream::read_some(size_t count, duration_t timeout)
bool SockIOStream::is_open() const
{
auto start_time = std::chrono::steady_clock::now();
std::string result;

while (sock_.is_open() && result.size() < count && duration_since(start_time) < timeout) {
auto read_size = std::min(kBufferSize, count - result.size());
auto read_num = sock_.read_some(boost::asio::mutable_buffer(buffer_.get(), read_size));
result.append(buffer_.get(), read_num);
}

return result;
return sock_.is_open();
}

std::string SockIOStream::read_until(std::string_view delimiter, duration_t timeout)
std::string SockIOStream::read_once(size_t max_count)
{
auto start_time = std::chrono::steady_clock::now();

std::string result;
constexpr size_t kBufferSize = 128 * 1024;

while (!result.ends_with(delimiter)) {
auto sub_timeout = timeout - duration_since<duration_t>(start_time);
if (sub_timeout < duration_t::zero()) {
break;
}

auto sub_str = read_some(1, sub_timeout);
result.append(std::move(sub_str));
if (!buffer_) {
buffer_ = std::make_unique<char[]>(kBufferSize);
}

return result;
}

bool SockIOStream::is_open() const
{
return sock_.is_open();
auto read_size = std::min(kBufferSize, max_count);
auto read_num = sock_.read_some(boost::asio::mutable_buffer(buffer_.get(), read_size));
return std::string(buffer_.get(), read_num);
}

MAA_NS_END
2 changes: 1 addition & 1 deletion source/include/LibraryHolder/LibraryHolder.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ inline bool LibraryHolder<T>::load_library(const std::filesystem::path& libname)
boost::dll::fs::error_code ec;
module_.load(libname, ec, boost::dll::load_mode::append_decorations | boost::dll::load_mode::search_system_folders);

if (ec.value() != boost::system::errc::success) {
if (ec) {
auto message = ec.message();
LogError << "Failed to load library" << VAR(libname) << VAR(message);
return false;
Expand Down
19 changes: 7 additions & 12 deletions source/include/Utils/IOStream/ChildPipeIOStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,27 @@

#include <memory>

#include "IOStream.h"
#include "MaaFramework/MaaPort.h"
#include "Utils/Boost.hpp"
#include "Utils/NonCopyable.hpp"

MAA_NS_BEGIN

class MAA_UTILS_API ChildPipeIOStream : public NonCopyButMovable
class MAA_UTILS_API ChildPipeIOStream : public IOStream
{
using duration_t = std::chrono::milliseconds;

static constexpr size_t kBufferSize = 128 * 1024;

public:
ChildPipeIOStream(const std::filesystem::path& exec, const std::vector<std::string>& args);

virtual ~ChildPipeIOStream();

public:
bool write(std::string_view data);
virtual bool write(std::string_view data) override;

std::string read(duration_t timeout = duration_t::max());
std::string read_some(size_t count, duration_t timeout = duration_t::max());
std::string read_until(std::string_view delimiter, duration_t timeout = duration_t::max());
virtual bool release() override;
virtual bool is_open() const override;

int release();
bool is_open() const;
protected:
virtual std::string read_once(size_t max_count) override;

private:
boost::process::ipstream pin_;
Expand Down
33 changes: 33 additions & 0 deletions source/include/Utils/IOStream/IOStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

#include <chrono>
#include <string>
#include <string_view>

#include "MaaFramework/MaaPort.h"
#include "Utils/NonCopyable.hpp"

MAA_NS_BEGIN

class MAA_UTILS_API IOStream : public NonCopyButMovable
{
using duration_t = std::chrono::milliseconds;

public:
virtual ~IOStream() = default;

public:
virtual bool write(std::string_view data) = 0;

virtual std::string read(duration_t timeout = duration_t::max());
virtual std::string read_some(size_t count, duration_t timeout = duration_t::max());
virtual std::string read_until(std::string_view delimiter, duration_t timeout = duration_t::max());

virtual bool release() = 0;
virtual bool is_open() const = 0;

protected:
virtual std::string read_once(size_t max_count) = 0;
};

MAA_NS_END
19 changes: 8 additions & 11 deletions source/include/Utils/IOStream/SockIOStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <memory>

#include "IOStream.h"
#include "MaaFramework/MaaPort.h"
#include "Utils/Boost.hpp"
#include "Utils/NonCopyable.hpp"
Expand Down Expand Up @@ -40,24 +41,20 @@ class MAA_UTILS_API ClientSockIOFactory : public NonCopyButMovable
boost::asio::ip::tcp::endpoint endpoint_;
};

class MAA_UTILS_API SockIOStream : public NonCopyButMovable
class MAA_UTILS_API SockIOStream : public IOStream
{
using duration_t = std::chrono::milliseconds;

static constexpr size_t kBufferSize = 128 * 1024;

public:
SockIOStream(boost::asio::ip::tcp::socket&& sock);
~SockIOStream();
virtual ~SockIOStream() override;

public:
bool write(std::string_view data);
virtual bool write(std::string_view data) override;

std::string read(duration_t timeout = duration_t::max());
std::string read_some(size_t count, duration_t timeout = duration_t::max());
std::string read_until(std::string_view delimiter, duration_t timeout = duration_t::max());
virtual bool release() override;
virtual bool is_open() const override;

bool is_open() const;
protected:
virtual std::string read_once(size_t max_count) override;

private:
boost::asio::ip::tcp::socket sock_;
Expand Down

0 comments on commit 4ec1ced

Please sign in to comment.