Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce TCP API #433

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions eventuals/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ cc_library(
"event-loop.h",
"filesystem.h",
"signal.h",
"tcp.h",
"tcp-acceptor.h",
"tcp-base.h",
"tcp-socket.h",
"tcp-ssl.h",
"tcp-ssl-context.h",
"tcp-ssl-context-builder.h",
"tcp-ssl-socket.h",
"timer.h",
],
copts = copts(),
Expand Down
31 changes: 25 additions & 6 deletions eventuals/event-loop.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ EventLoop::EventLoop()
check_.data = this;

uv_check_start(&check_, [](uv_check_t* check) {
((EventLoop*) check->data)->Check();
// Poll ASIO handles before scheduling.
((EventLoop*) check->data)->AsioPoll();
((EventLoop*) check->data)->Check(); // Schedules waiters.
});

// NOTE: we unreference 'check_' so that when we run the event
Expand Down Expand Up @@ -219,14 +221,21 @@ EventLoop::~EventLoop() {
static constexpr size_t ITERATIONS = 100000;
size_t iterations = ITERATIONS;

auto alive = Alive();
CHECK(uv_loop_alive(&loop_))
<< "should still have check and async handles to close";

CHECK(alive) << "should still have check and async handles to close";
size_t active_handles = 0;

do {
alive = uv_run(&loop_, UV_RUN_NOWAIT);
active_handles = uv_run(&loop_, UV_RUN_NOWAIT);

if (alive && --iterations == 0) {
// Is needed for the following 'io_context.run()'.
io_context_.restart();

// BLOCKS! Returns 0 ONLY if there are no active handles left.
active_handles += io_context_.run_one();

if (active_handles > 0 && --iterations == 0) {
std::ostringstream out;

out << "destructing EventLoop with active handles:\n";
Expand Down Expand Up @@ -264,9 +273,12 @@ EventLoop::~EventLoop() {

LOG(WARNING) << out.str();

// NOTE: there's currently no way for us to print out active
// ASIO handles.

iterations = ITERATIONS;
}
} while (alive);
} while (active_handles > 0);

CHECK_EQ(uv_loop_close(&loop_), 0);
}
Expand Down Expand Up @@ -387,6 +399,13 @@ void EventLoop::Check() {

////////////////////////////////////////////////////////////////////////

void EventLoop::AsioPoll() {
io_context().restart();
io_context().poll();
}

////////////////////////////////////////////////////////////////////////

} // namespace eventuals

////////////////////////////////////////////////////////////////////////
40 changes: 30 additions & 10 deletions eventuals/event-loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <string>
#include <tuple>

#include "asio.hpp"
#include "eventuals/callback.h"
#include "eventuals/closure.h"
#include "eventuals/lazy.h"
Expand Down Expand Up @@ -247,7 +248,8 @@ class EventLoop final : public Scheduler {
error_ = uv_timer_start(
timer(),
[](uv_timer_t* timer) {
auto& continuation = *(Continuation*) timer->data;
auto& continuation =
*(Continuation*) timer->data;
CHECK_EQ(timer, continuation.timer());
CHECK_EQ(
&continuation,
Expand All @@ -266,8 +268,10 @@ class EventLoop final : public Scheduler {
if (!continuation.error_) {
continuation.k_.Start();
} else {
continuation.k_.Fail(std::runtime_error(
uv_strerror(continuation.error_)));
continuation.k_.Fail(
std::runtime_error(
uv_strerror(
continuation.error_)));
}
});
}
Expand Down Expand Up @@ -482,7 +486,15 @@ class EventLoop final : public Scheduler {
in_event_loop_ = false;

status = future.wait_for(std::chrono::nanoseconds::zero());
} while (status != std::future_status::ready || waiters_.load() != nullptr);
} while (
status != std::future_status::ready
|| waiters_.load() != nullptr);
}

template <typename T, typename... Ts>
void RunUntil(std::future<T>& future, std::future<Ts...>& futures) {
RunUntil(future);
RunUntil(futures);
}

void RunWhileWaiters() {
Expand Down Expand Up @@ -517,10 +529,6 @@ class EventLoop final : public Scheduler {
template <typename E>
[[nodiscard]] auto Schedule(std::string&& name, E e);

bool Alive() {
return uv_loop_alive(&loop_);
}

bool Running() {
return running_.load();
}
Expand All @@ -533,6 +541,10 @@ class EventLoop final : public Scheduler {
return &loop_;
}

asio::io_context& io_context() {
return io_context_;
}

Clock& clock() {
return clock_;
}
Expand Down Expand Up @@ -849,7 +861,8 @@ class EventLoop final : public Scheduler {
CHECK(!continuation.error_);
continuation.error_ = uv_poll_stop(poll);
if (status == 0 && !continuation.error_) {
continuation.k_.Body(static_cast<PollEvents>(events));
continuation.k_.Body(
static_cast<PollEvents>(events));
} else {
continuation.completed_ = true;
if (!continuation.error_) {
Expand Down Expand Up @@ -1045,10 +1058,14 @@ class EventLoop final : public Scheduler {

void Check();

void AsioPoll();

uv_loop_t loop_ = {};
uv_check_t check_ = {};
uv_async_t async_ = {};

asio::io_context io_context_;

std::atomic<bool> running_ = false;

static inline thread_local bool in_event_loop_ = false;
Expand Down Expand Up @@ -1258,7 +1275,10 @@ template <typename E>
////////////////////////////////////////////////////////////////////////
template <typename E>
[[nodiscard]] auto EventLoop::Schedule(std::string&& name, E e) {
return _EventLoopSchedule::Composable<E>{std::move(e), this, std::move(name)};
return _EventLoopSchedule::Composable<E>{
std::move(e),
this,
std::move(name)};
}

////////////////////////////////////////////////////////////////////////
Expand Down
Loading