diff --git a/core/parachain/pvf/kagome_pvf_worker.cpp b/core/parachain/pvf/kagome_pvf_worker.cpp index 9ad170c465..c431086b2b 100644 --- a/core/parachain/pvf/kagome_pvf_worker.cpp +++ b/core/parachain/pvf/kagome_pvf_worker.cpp @@ -5,7 +5,6 @@ */ #include -#include #include #include #include @@ -23,6 +22,7 @@ #include #include +#include #include #include #include @@ -62,6 +62,8 @@ } namespace kagome::parachain { + using unix = boost::asio::local::stream_protocol; + namespace { // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) static kagome::log::Logger logger; @@ -229,26 +231,21 @@ namespace kagome::parachain { } #endif - outcome::result readStdin(std::span out) { - std::cin.read( - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - reinterpret_cast(out.data()), - // NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions) - out.size()); - if (not std::cin.good()) { - return std::errc::io_error; - } - return outcome::success(); - } - template - outcome::result decodeInput() { + outcome::result decodeInput(unix::socket &socket) { // NOLINTNEXTLINE(cppcoreguidelines-pro-type-member-init,hicpp-member-init) std::array length_bytes; - OUTCOME_TRY(readStdin(length_bytes)); + boost::system::error_code ec; + boost::asio::read(socket, boost::asio::buffer(length_bytes), ec); + if (ec) { + return ec; + } OUTCOME_TRY(message_length, scale::decode(length_bytes)); std::vector packed_message(message_length, 0); - OUTCOME_TRY(readStdin(packed_message)); + boost::asio::read(socket, boost::asio::buffer(packed_message), ec); + if (ec) { + return ec; + } return scale::decode(packed_message); } @@ -282,8 +279,16 @@ namespace kagome::parachain { } } - outcome::result pvf_worker_main_outcome() { - OUTCOME_TRY(input_config, decodeInput()); + outcome::result pvf_worker_main_outcome( + const std::string &unix_socket_path) { + boost::asio::io_context io_context; + unix::socket socket{io_context}; + boost::system::error_code ec; + socket.connect(unix_socket_path, ec); + if (ec) { + return ec; + } + OUTCOME_TRY(input_config, decodeInput(socket)); kagome::log::tuneLoggingSystem(input_config.log_params); SL_VERBOSE(logger, "Cache directory: {}", input_config.cache_dir); @@ -347,7 +352,7 @@ namespace kagome::parachain { OUTCOME_TRY(factory, createModuleFactory(injector, input_config.engine)); std::shared_ptr module; while (true) { - OUTCOME_TRY(input, decodeInput()); + OUTCOME_TRY(input, decodeInput(socket)); if (auto *code_params = std::get_if(&input)) { auto &path = code_params->path; @@ -370,17 +375,14 @@ namespace kagome::parachain { OUTCOME_TRY(instance->resetEnvironment()); OUTCOME_TRY(len, scale::encode(result.size())); - std::cout.write( - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - reinterpret_cast(len.data()), - // NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions) - len.size()); - std::cout.write( - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - reinterpret_cast(result.data()), - // NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions) - result.size()); - std::cout.flush(); + boost::asio::write(socket, boost::asio::buffer(len), ec); + if (ec) { + return ec; + } + boost::asio::write(socket, boost::asio::buffer(result), ec); + if (ec) { + return ec; + } } } @@ -399,14 +401,12 @@ namespace kagome::parachain { } kagome::log::setLoggingSystem(logging_system); logger = kagome::log::createLogger("PVF Worker", "parachain"); - - if (!checkEnvVarsEmpty(env)) { - logger->error( - "PVF worker processes must not have any environment variables."); + if (argc < 2) { + SL_ERROR(logger, "missing unix socket path arg"); return EXIT_FAILURE; } - - if (auto r = pvf_worker_main_outcome(); not r) { + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + if (auto r = pvf_worker_main_outcome(argv[1]); not r) { SL_ERROR(logger, "PVF worker process failed: {}", r.error()); return EXIT_FAILURE; } diff --git a/core/parachain/pvf/kagome_pvf_worker_injector.hpp b/core/parachain/pvf/kagome_pvf_worker_injector.hpp index 9893aa1336..0f10ab2929 100644 --- a/core/parachain/pvf/kagome_pvf_worker_injector.hpp +++ b/core/parachain/pvf/kagome_pvf_worker_injector.hpp @@ -15,11 +15,14 @@ #include "crypto/ed25519/ed25519_provider_impl.hpp" #include "crypto/elliptic_curves/elliptic_curves_impl.hpp" #include "crypto/hasher/hasher_impl.hpp" +#include "crypto/key_store.hpp" #include "crypto/pbkdf2/impl/pbkdf2_provider_impl.hpp" #include "crypto/secp256k1/secp256k1_provider_impl.hpp" #include "crypto/sr25519/sr25519_provider_impl.hpp" #include "host_api/impl/host_api_factory_impl.hpp" #include "injector/bind_by_lambda.hpp" +#include "offchain/offchain_persistent_storage.hpp" +#include "offchain/offchain_worker_pool.hpp" #include "parachain/pvf/pvf_worker_types.hpp" #include "runtime/binaryen/instance_environment_factory.hpp" #include "runtime/binaryen/module/module_factory_impl.hpp" @@ -27,6 +30,7 @@ #include "runtime/common/runtime_properties_cache_impl.hpp" #include "runtime/memory_provider.hpp" #include "runtime/module.hpp" +#include "runtime/runtime_instances_pool.hpp" #include "runtime/wasm_compiler_definitions.hpp" // this header-file is generated #include "storage/trie/serialization/trie_serializer_impl.hpp" #include "storage/trie/trie_storage.hpp" diff --git a/core/parachain/pvf/workers.cpp b/core/parachain/pvf/workers.cpp index 5551676a6d..4aaffafb95 100644 --- a/core/parachain/pvf/workers.cpp +++ b/core/parachain/pvf/workers.cpp @@ -6,8 +6,7 @@ #include "parachain/pvf/workers.hpp" -#include -#include +#include #include #include #include @@ -15,26 +14,19 @@ #include "application/app_configuration.hpp" #include "common/main_thread_pool.hpp" +#include "filesystem/common.hpp" #include "parachain/pvf/pvf_worker_types.hpp" #include "utils/get_exe_path.hpp" #include "utils/weak_macro.hpp" namespace kagome::parachain { - constexpr auto kMetricQueueSize = "kagome_pvf_queue_size"; + using unix = boost::asio::local::stream_protocol; - struct AsyncPipe : boost::process::async_pipe { - using async_pipe::async_pipe; - using lowest_layer_type = AsyncPipe; - }; + constexpr auto kMetricQueueSize = "kagome_pvf_queue_size"; struct ProcessAndPipes : std::enable_shared_from_this { - AsyncPipe pipe_stdin; - // NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members) - AsyncPipe &writer; - AsyncPipe pipe_stdout; - // NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members) - AsyncPipe &reader; boost::process::child process; + std::optional socket; std::shared_ptr writing = std::make_shared(); std::shared_ptr reading = std::make_shared(); @@ -44,23 +36,18 @@ namespace kagome::parachain { ProcessAndPipes(boost::asio::io_context &io_context, const std::string &exe, + const std::string &unix_socket_path, const Config &config) - : pipe_stdin{io_context}, - writer{pipe_stdin}, - pipe_stdout{io_context}, - reader{pipe_stdout}, - process{ - exe, - boost::process::args({"pvf-worker"}), - boost::process::env(boost::process::environment()), + : process{ + exe, + boost::process::args({"pvf-worker", unix_socket_path}), + boost::process::env(boost::process::environment()), // LSAN doesn't work in secure mode #ifdef KAGOME_WITH_ASAN - boost::process::env["ASAN_OPTIONS"] = - config.disable_lsan ? "detect_leaks=0" : "", + boost::process::env["ASAN_OPTIONS"] = + config.disable_lsan ? "detect_leaks=0" : "", #endif - boost::process::std_out > pipe_stdout, - boost::process::std_in < pipe_stdin, - } { + } { } void write(Buffer data, auto cb) { @@ -68,7 +55,7 @@ namespace kagome::parachain { scale::encode(data.size()).value()); *writing = std::move(data); boost::asio::async_write( - writer, + *socket, libp2p::asioBuffer(*len), [WEAK_SELF, cb, len](boost::system::error_code ec, size_t) mutable { WEAK_LOCK(self); @@ -76,7 +63,7 @@ namespace kagome::parachain { return cb(ec); } boost::asio::async_write( - self->writer, + *self->socket, libp2p::asioBuffer(*self->writing), [weak_self, cb](boost::system::error_code ec, size_t) mutable { WEAK_LOCK(self); @@ -95,7 +82,7 @@ namespace kagome::parachain { void read(auto cb) { auto len = std::make_shared>(); boost::asio::async_read( - reader, + *socket, libp2p::asioBuffer(*len), [WEAK_SELF, cb{std::move(cb)}, len](boost::system::error_code ec, size_t) mutable { @@ -109,7 +96,7 @@ namespace kagome::parachain { } self->reading->resize(len_res.value()); boost::asio::async_read( - self->reader, + *self->socket, libp2p::asioBuffer(*self->reading), [cb{std::move(cb)}, reading{self->reading}]( boost::system::error_code ec, size_t) mutable { @@ -162,20 +149,46 @@ namespace kagome::parachain { #if defined(__linux__) && defined(KAGOME_WITH_ASAN) config.disable_lsan = !worker_config_.force_disable_secure_mode; #endif - auto process = - std::make_shared(*io_context_, exe_, config); - process->writeScale( - worker_config_, - [WEAK_SELF, job{std::move(job)}, used{std::move(used)}, process]( - outcome::result r) mutable { - WEAK_LOCK(self); - if (not r) { - return job.cb(r.error()); - } - self->writeCode(std::move(job), - {.process = std::move(process)}, - std::move(used)); - }); + auto unix_socket_path = filesystem::unique_path( + std::filesystem::path{worker_config_.cache_dir} + / "unix_socket.%%%%%%"); + std::error_code ec; + std::filesystem::remove(unix_socket_path, ec); + if (ec) { + return job.cb(ec); + } + auto acceptor = std::make_shared( + *io_context_, unix_socket_path.native()); + auto process = std::make_shared( + *io_context_, exe_, unix_socket_path, config); + acceptor->async_accept([WEAK_SELF, + job{std::move(job)}, + used, + unix_socket_path, + acceptor, + process{std::move(process)}]( + boost::system::error_code ec, + unix::socket &&socket) mutable { + std::error_code ec2; + std::filesystem::remove(unix_socket_path, ec2); + WEAK_LOCK(self); + if (ec) { + return job.cb(ec); + } + process->socket = std::move(socket); + process->writeScale( + self->worker_config_, + [weak_self, job{std::move(job)}, used{std::move(used)}, process]( + outcome::result r) mutable { + WEAK_LOCK(self); + if (not r) { + return job.cb(r.error()); + } + self->writeCode(std::move(job), + {.process = std::move(process)}, + std::move(used)); + }); + }); return; } findFree(std::move(job));