From 020e89da262336341b0820a2fc65078480c624ab Mon Sep 17 00:00:00 2001 From: Yota Toyama Date: Tue, 16 Jul 2024 17:24:53 +0900 Subject: [PATCH 1/2] Fix --- src/prime_server.cpp | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/prime_server.cpp b/src/prime_server.cpp index 1aee966..3231675 100644 --- a/src/prime_server.cpp +++ b/src/prime_server.cpp @@ -144,7 +144,7 @@ server_t::server_t( uint32_t request_timeout, const health_check_matcher_t& health_check_matcher, const std::string& health_check_response) - : client(context, ZMQ_STREAM), proxy(context, ZMQ_DEALER), loopback(context, ZMQ_SUB), + : client(context, ZMQ_STREAM), proxy(context, ZMQ_DEALER), loopback(context, ZMQ_PULL), interrupt(context, ZMQ_PUB), log(log), max_request_size(max_request_size), request_timeout(request_timeout), request_id(0), health_check_matcher(health_check_matcher), health_check_response(health_check_response.size(), health_check_response.data()) { @@ -158,9 +158,6 @@ server_t::server_t( proxy.setsockopt(ZMQ_SNDHWM, &disabled, sizeof(disabled)); proxy.connect(proxy_endpoint.c_str()); - // TODO: consider making this into a pull socket so we dont lose any results due to timing - loopback.setsockopt(ZMQ_RCVHWM, &disabled, sizeof(disabled)); - loopback.setsockopt(ZMQ_SUBSCRIBE, "", 0); loopback.bind(result_endpoint.c_str()); interrupt.setsockopt(ZMQ_SNDHWM, &disabled, sizeof(disabled)); @@ -468,7 +465,7 @@ worker_t::worker_t(zmq::context_t& context, const cleanup_function_t& cleanup_function, const std::string& heart_beat) : upstream_proxy(context, ZMQ_DEALER), downstream_proxy(context, ZMQ_DEALER), - loopback(context, ZMQ_PUB), interrupt(context, ZMQ_SUB), work_function(work_function), + loopback(context, ZMQ_PUSH), interrupt(context, ZMQ_SUB), work_function(work_function), cleanup_function(cleanup_function), heart_beat_interval(5000), heart_beat(heart_beat), job(std::numeric_limits::max()) { @@ -482,7 +479,6 @@ worker_t::worker_t(zmq::context_t& context, downstream_proxy.setsockopt(ZMQ_SNDHWM, &disabled, sizeof(disabled)); downstream_proxy.connect(downstream_proxy_endpoint.c_str()); - loopback.setsockopt(ZMQ_SNDHWM, &disabled, sizeof(disabled)); loopback.connect(result_endpoint.c_str()); interrupt.setsockopt(ZMQ_RCVHWM, &disabled, sizeof(disabled)); From 7c75d3358b1a54ca9b47bc676791dc7ff3844320 Mon Sep 17 00:00:00 2001 From: Yota Toyama Date: Tue, 30 Jul 2024 17:45:51 +0900 Subject: [PATCH 2/2] Revert socket options --- src/prime_server.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/prime_server.cpp b/src/prime_server.cpp index 3231675..8d5d9e8 100644 --- a/src/prime_server.cpp +++ b/src/prime_server.cpp @@ -158,6 +158,7 @@ server_t::server_t( proxy.setsockopt(ZMQ_SNDHWM, &disabled, sizeof(disabled)); proxy.connect(proxy_endpoint.c_str()); + loopback.setsockopt(ZMQ_RCVHWM, &disabled, sizeof(disabled)); loopback.bind(result_endpoint.c_str()); interrupt.setsockopt(ZMQ_SNDHWM, &disabled, sizeof(disabled)); @@ -479,6 +480,7 @@ worker_t::worker_t(zmq::context_t& context, downstream_proxy.setsockopt(ZMQ_SNDHWM, &disabled, sizeof(disabled)); downstream_proxy.connect(downstream_proxy_endpoint.c_str()); + loopback.setsockopt(ZMQ_SNDHWM, &disabled, sizeof(disabled)); loopback.connect(result_endpoint.c_str()); interrupt.setsockopt(ZMQ_RCVHWM, &disabled, sizeof(disabled));