diff --git a/core/include/minifi-cpp/core/ProcessSession.h b/core/include/minifi-cpp/core/ProcessSession.h index cbc74f7323..6bd971813f 100644 --- a/core/include/minifi-cpp/core/ProcessSession.h +++ b/core/include/minifi-cpp/core/ProcessSession.h @@ -32,7 +32,6 @@ #include "ProcessContext.h" #include "core/logging/LoggerFactory.h" -#include "core/Deprecated.h" #include "FlowFile.h" #include "WeakReference.h" #include "minifi-cpp/provenance/Provenance.h" diff --git a/extension-utils/include/core/Deprecated.h b/extension-utils/include/core/Deprecated.h deleted file mode 100644 index 4dd7e54a25..0000000000 --- a/extension-utils/include/core/Deprecated.h +++ /dev/null @@ -1,29 +0,0 @@ -/** - - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBMINIFI_INCLUDE_CORE_DEPRECATED_H_ -#define LIBMINIFI_INCLUDE_CORE_DEPRECATED_H_ - -#ifdef _MSC_VER -#define DEPRECATED(v, ev) __declspec(deprecated) -#elif defined(__GNUC__) | defined(__clang__) -#define DEPRECATED(v, ev) __attribute__((__deprecated__)) -#else -#define DEPRECATED(v, ev) -#endif - -#endif /* LIBMINIFI_INCLUDE_CORE_DEPRECATED_H_ */ diff --git a/extension-utils/include/utils/OpenTelemetryLogDataModelUtils.h b/extension-utils/include/utils/OpenTelemetryLogDataModelUtils.h index c9c639116b..d46d5598a5 100644 --- a/extension-utils/include/utils/OpenTelemetryLogDataModelUtils.h +++ b/extension-utils/include/utils/OpenTelemetryLogDataModelUtils.h @@ -20,7 +20,7 @@ #include #include "rapidjson/document.h" -#include "NetworkInterfaceInfo.h" +#include "utils/net/NetworkInterfaceInfo.h" #include "utils/net/DNS.h" namespace org::apache::nifi::minifi::utils { diff --git a/extension-utils/include/utils/StagingQueue.h b/extension-utils/include/utils/StagingQueue.h deleted file mode 100644 index 6751c53010..0000000000 --- a/extension-utils/include/utils/StagingQueue.h +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include -#include "MinifiConcurrentQueue.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace utils { - -namespace internal { -template -struct default_allocator { - T operator()(size_t max_size) const { - return T::allocate(max_size); - } -}; -} // namespace internal - -/** - * Purpose: A FIFO container that allows chunked processing while trying to enforce - * soft limits like max chunk size and max total size. The "head" chunk might be - * modified in a thread-safe manner (usually appending to it) before committing it - * thus making it available for dequeuing. - */ -template> -class StagingQueue { - using Item = typename std::decay().commit())>::type; - - static_assert(std::is_same()(std::declval())), ActiveItem>::value, - "Allocator::operator(size_t) must return an ActiveItem"); - static_assert(std::is_same().size()), size_t>::value, - "Item::size must return size_t"); - static_assert(std::is_same().size()), size_t>::value, - "ActiveItem::size must return size_t"); - - template - struct FunctorCallHelper; - - template - struct FunctorCallHelper()(std::declval())), bool>::value>::type> { - static bool call(Functor&& fn, Arg&& arg) { - return std::forward(fn)(std::forward(arg)); - } - }; - - template - struct FunctorCallHelper()(std::declval())), void>::value>::type> { - static bool call(Functor&& fn, Arg&& arg) { - std::forward(fn)(std::forward(arg)); - return false; - } - }; - - static ActiveItem allocateActiveItem(const Allocator& allocator, size_t max_item_size) { - // max_size is a soft limit, i.e. reaching max_size is an indicator - // that that item should be committed, we cannot guarantee that only - // max_size content is in the item, since max_size is the "trigger limit", - // presumable each item would contain (at the trigger point) a little - // more than max_size content, that is the reasoning behind "* 3 / 2" - return allocator(max_item_size * 3 / 2); - } - - public: - StagingQueue(size_t max_size, size_t max_item_size, Allocator allocator = {}) - : max_size_(max_size), - max_item_size_(max_item_size), - active_item_(allocateActiveItem(allocator, max_item_size)), - allocator_(allocator) {} - - void commit() { - std::unique_lock lock{active_item_mutex_}; - if (active_item_.size() == 0) { - // nothing to commit - return; - } - commit(lock); - } - - /** - * Allows thread-safe modification of the "live" instance. - * @tparam Functor - * @param fn callable which can modify the instance, should return true - * if it would like to force a commit - */ - template - void modify(Functor&& fn) { - std::unique_lock lock{active_item_mutex_}; - size_t original_size = active_item_.size(); - bool should_commit = FunctorCallHelper::call(std::forward(fn), active_item_); - size_t new_size = active_item_.size(); - if (new_size >= original_size) { - total_size_ += new_size - original_size; - } else { - total_size_ -= original_size - new_size; - } - if (should_commit || new_size > max_item_size_) { - commit(lock); - } - } - - template - bool tryDequeue(Item& out, const std::chrono::duration& time) { - if (time == std::chrono::duration{0}) { - return tryDequeue(out); - } - if (queue_.dequeueWaitFor(out, time)) { - total_size_ -= out.size(); - return true; - } - return false; - } - - bool tryDequeue(Item& out) { - if (queue_.tryDequeue(out)) { - total_size_ -= out.size(); - return true; - } - return false; - } - - size_t getMaxSize() const { - return max_size_; - } - - size_t getMaxItemSize() const { - return max_item_size_; - } - - void discardOverflow() { - while (total_size_ > max_size_) { - Item item; - if (!queue_.tryDequeue(item)) { - break; - } - total_size_ -= item.size(); - } - } - - size_t size() const { - return total_size_; - } - - size_t itemCount() const { - return queue_.size(); - } - - private: - void commit(std::unique_lock& /*lock*/) { - queue_.enqueue(active_item_.commit()); - active_item_ = allocateActiveItem(allocator_, max_item_size_); - } - - const size_t max_size_; - const size_t max_item_size_; - std::atomic total_size_{0}; - - std::mutex active_item_mutex_; - ActiveItem active_item_; - - const Allocator allocator_; - - ConditionConcurrentQueue queue_; -}; - -} // namespace utils -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org diff --git a/extension-utils/src/Exception.cpp b/extension-utils/src/Exception.cpp deleted file mode 100644 index c9e622077c..0000000000 --- a/extension-utils/src/Exception.cpp +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "Exception.h" - -#ifndef WIN32 -#include -#endif // !WIN32 - -#include - -namespace org::apache::nifi::minifi { -std::string getCurrentExceptionTypeName() { -#ifndef WIN32 - const std::type_info* exception_type = abi::__cxa_current_exception_type(); - if (exception_type) { - return exception_type->name(); - } -#endif // !WIN32 - try { - std::rethrow_exception(std::current_exception()); - } catch (const std::exception& ex) { - return typeid(ex).name(); - } catch (...) { } - - return {}; -} -} // namespace org::apache::nifi::minifi diff --git a/extension-utils/src/utils/NetworkInterfaceInfo.cpp b/extension-utils/src/utils/NetworkInterfaceInfo.cpp deleted file mode 100644 index f61b9ba821..0000000000 --- a/extension-utils/src/utils/NetworkInterfaceInfo.cpp +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "utils/NetworkInterfaceInfo.h" -#include "utils/net/Socket.h" -#include "core/logging/LoggerFactory.h" -#ifdef WIN32 -#include -#pragma comment(lib, "IPHLPAPI.lib") -#include "utils/OsUtils.h" -#include "utils/UnicodeConversion.h" -#else -#include -#include -#include -#include -#include -#include -#endif - -namespace org::apache::nifi::minifi::utils { - -std::shared_ptr NetworkInterfaceInfo::logger_ = core::logging::LoggerFactory::getLogger(); - -#ifdef WIN32 - -NetworkInterfaceInfo::NetworkInterfaceInfo(const IP_ADAPTER_ADDRESSES* adapter) - : name_(to_string(adapter->FriendlyName)), - running_(adapter->OperStatus == IfOperStatusUp), - loopback_(adapter->IfType == IF_TYPE_SOFTWARE_LOOPBACK) { - for (auto unicast_address = adapter->FirstUnicastAddress; unicast_address != nullptr; unicast_address = unicast_address->Next) { - if (unicast_address->Address.lpSockaddr->sa_family == AF_INET) { - ip_v4_addresses_.push_back(net::sockaddr_ntop(unicast_address->Address.lpSockaddr)); - } else if (unicast_address->Address.lpSockaddr->sa_family == AF_INET6) { - ip_v6_addresses_.push_back(net::sockaddr_ntop(unicast_address->Address.lpSockaddr)); - } - } -} -#else -NetworkInterfaceInfo::NetworkInterfaceInfo(const struct ifaddrs* ifa) - : name_(ifa->ifa_name), - running_(ifa->ifa_flags & IFF_RUNNING), - loopback_(ifa->ifa_flags & IFF_LOOPBACK) { - if (ifa->ifa_addr->sa_family == AF_INET) { - ip_v4_addresses_.push_back(net::sockaddr_ntop(ifa->ifa_addr)); - } else if (ifa->ifa_addr->sa_family == AF_INET6) { - ip_v6_addresses_.push_back(net::sockaddr_ntop(ifa->ifa_addr)); - } -} -#endif - -namespace { -struct HasName { - explicit HasName(const std::string& name) : name_(name) {} - bool operator()(const NetworkInterfaceInfo& network_interface_info) { - return network_interface_info.getName() == name_; - } - const std::string& name_; -}; -} - -std::vector NetworkInterfaceInfo::getNetworkInterfaceInfos(const std::function& filter, - const std::optional max_interfaces) { - std::vector network_adapters; -#ifdef WIN32 - ULONG buffer_length = sizeof(IP_ADAPTER_ADDRESSES); - auto get_adapters_err = GetAdaptersAddresses(0, 0, nullptr, nullptr, &buffer_length); - if (ERROR_BUFFER_OVERFLOW != get_adapters_err) { - logger_->log_error("GetAdaptersAddresses failed: {}", get_adapters_err); - return network_adapters; - } - std::vector bytes(buffer_length, 0); - auto* adapter = reinterpret_cast(bytes.data()); - get_adapters_err = GetAdaptersAddresses(0, 0, nullptr, adapter, &buffer_length); - if (NO_ERROR != get_adapters_err) { - logger_->log_error("GetAdaptersAddresses failed: {}", get_adapters_err); - return network_adapters; - } - while (adapter != nullptr) { - NetworkInterfaceInfo interface_info(adapter); - if (filter(interface_info)) { - auto it = std::find_if(network_adapters.begin(), network_adapters.end(), HasName(interface_info.getName())); - if (it == network_adapters.end()) { - network_adapters.emplace_back(std::move(interface_info)); - } else { - interface_info.moveAddressesInto(*it); - } - } - if (max_interfaces.has_value() && network_adapters.size() >= max_interfaces.value()) - return network_adapters; - adapter = adapter->Next; - } -#else - struct ifaddrs* interface_addresses = nullptr; - auto cleanup = gsl::finally([&interface_addresses] { freeifaddrs(interface_addresses); }); - if (getifaddrs(&interface_addresses) == -1) { - logger_->log_error("getifaddrs failed: {}", std::strerror(errno)); - return network_adapters; - } - - for (struct ifaddrs* ifa = interface_addresses; ifa != nullptr; ifa = ifa->ifa_next) { - if (!ifa->ifa_addr) - continue; - NetworkInterfaceInfo interface_info(ifa); - if (filter(interface_info)) { - auto it = std::find_if(network_adapters.begin(), network_adapters.end(), HasName(interface_info.getName())); - if (it == network_adapters.end()) { - network_adapters.emplace_back(std::move(interface_info)); - } else { - interface_info.moveAddressesInto(*it); - } - } - if (max_interfaces.has_value() && network_adapters.size() >= max_interfaces.value()) - return network_adapters; - } -#endif - return network_adapters; -} - -namespace { -void move_append(std::vector &&source, std::vector &destination) { - destination.reserve(destination.size() + source.size()); - std::move(std::begin(source), std::end(source), std::back_inserter(destination)); - source.clear(); -} -} // namespace - -void NetworkInterfaceInfo::moveAddressesInto(NetworkInterfaceInfo& destination) { - move_append(std::move(ip_v4_addresses_), destination.ip_v4_addresses_); - move_append(std::move(ip_v6_addresses_), destination.ip_v6_addresses_); -} - -} // namespace org::apache::nifi::minifi::utils diff --git a/extensions/rocksdb-repos/controllers/RocksDbStateStorage.h b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.h index 0805a93af3..6c9f8a85ac 100644 --- a/extensions/rocksdb-repos/controllers/RocksDbStateStorage.h +++ b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.h @@ -20,7 +20,7 @@ #include #include -#include "controllers/keyvalue/AutoPersistor.h" +#include "utils/AutoPersistor.h" #include "controllers/keyvalue/KeyValueStateStorage.h" #include "core/Core.h" #include "core/logging/Logger.h" diff --git a/extensions/standard-processors/controllers/PersistentMapStateStorage.h b/extensions/standard-processors/controllers/PersistentMapStateStorage.h index 689d336c0e..86096a07b5 100644 --- a/extensions/standard-processors/controllers/PersistentMapStateStorage.h +++ b/extensions/standard-processors/controllers/PersistentMapStateStorage.h @@ -22,7 +22,7 @@ #include #include -#include "controllers/keyvalue/AutoPersistor.h" +#include "utils/AutoPersistor.h" #include "core/Core.h" #include "properties/Configure.h" #include "InMemoryKeyValueStorage.h" diff --git a/extensions/standard-processors/processors/AppendHostInfo.cpp b/extensions/standard-processors/processors/AppendHostInfo.cpp index b2d0b3f291..33d9196865 100644 --- a/extensions/standard-processors/processors/AppendHostInfo.cpp +++ b/extensions/standard-processors/processors/AppendHostInfo.cpp @@ -31,7 +31,7 @@ #include "core/ProcessSession.h" #include "core/FlowFile.h" #include "core/Resource.h" -#include "utils/NetworkInterfaceInfo.h" +#include "utils/net/NetworkInterfaceInfo.h" #include "utils/net/DNS.h" namespace org::apache::nifi::minifi::processors { diff --git a/libminifi/include/controllers/keyvalue/AutoPersistor.h b/libminifi/include/controllers/keyvalue/AutoPersistor.h deleted file mode 100644 index 3cefe8c514..0000000000 --- a/libminifi/include/controllers/keyvalue/AutoPersistor.h +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include -#include -#include -#include -#include -#include - -#include "core/ConfigurableComponent.h" -#include "core/Core.h" -#include "core/logging/LoggerFactory.h" -#include "utils/Export.h" - -namespace org::apache::nifi::minifi::controllers { - -/** - * Persists in given intervals. - * Has an own thread, so stop() must be called before destruction of data used by persist_. - */ -class AutoPersistor { - public: - ~AutoPersistor(); - - void start(bool always_persist, std::chrono::milliseconds auto_persistence_interval, std::function persist); - void stop(); - - [[nodiscard]] bool isAlwaysPersisting() const { - return always_persist_; - } - - private: - void persistingThreadFunc(); - - bool always_persist_ = false; - std::chrono::milliseconds auto_persistence_interval_{0}; - std::thread persisting_thread_; - bool running_ = false; - std::mutex persisting_mutex_; - std::condition_variable persisting_cv_; - std::function persist_; - std::shared_ptr logger_ = core::logging::LoggerFactory::getLogger(); -}; - -} // namespace org::apache::nifi::minifi::controllers diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h index 745ee97dfa..14adb56751 100644 --- a/libminifi/include/core/ProcessSession.h +++ b/libminifi/include/core/ProcessSession.h @@ -34,7 +34,6 @@ #include "FlowFileRecord.h" #include "Exception.h" #include "core/logging/LoggerFactory.h" -#include "core/Deprecated.h" #include "FlowFile.h" #include "WeakReference.h" #include "provenance/Provenance.h" diff --git a/libminifi/include/core/state/nodes/DeviceInformation.h b/libminifi/include/core/state/nodes/DeviceInformation.h index 88480c7b7d..d7ebf47c82 100644 --- a/libminifi/include/core/state/nodes/DeviceInformation.h +++ b/libminifi/include/core/state/nodes/DeviceInformation.h @@ -48,8 +48,6 @@ #include "core/state/nodes/MetricsBase.h" #include "Connection.h" -#include "utils/OsUtils.h" -#include "utils/NetworkInterfaceInfo.h" #include "utils/SystemCpuUsageTracker.h" #include "utils/Export.h" diff --git a/libminifi/include/utils/CallBackTimer.h b/libminifi/include/utils/CallBackTimer.h deleted file mode 100644 index c5fe96b2c6..0000000000 --- a/libminifi/include/utils/CallBackTimer.h +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef LIBMINIFI_INCLUDE_UTILS_CALLBACKTIMER_H_ -#define LIBMINIFI_INCLUDE_UTILS_CALLBACKTIMER_H_ - -#include -#include -#include -#include -#include - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace utils { - -class CallBackTimer { - public: - CallBackTimer(std::chrono::milliseconds interval, const std::function& func); - ~CallBackTimer(); - - void stop(); - - void start(); - - bool is_running() const; - - private: - bool execute_; - std::function func_; - std::thread thd_; - mutable std::mutex mtx_; - mutable std::mutex cv_mtx_; - std::condition_variable cv_; - - const std::chrono::milliseconds interval_; -}; - -} // namespace utils -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org - -#endif // LIBMINIFI_INCLUDE_UTILS_CALLBACKTIMER_H_ - diff --git a/libminifi/include/utils/ClassUtils.h b/libminifi/include/utils/ClassUtils.h deleted file mode 100644 index 3726651669..0000000000 --- a/libminifi/include/utils/ClassUtils.h +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include -#include - -namespace org::apache::nifi::minifi::utils::ClassUtils { - -/** - * Shortens class names via the canonical representation ( package with name ) - * @param class_name input class name - * @param out output class name that is shortened. - * @return true if out has been updated, false otherwise - */ -bool shortenClassName(std::string_view class_name, std::string &out); - -} // namespace org::apache::nifi::minifi::utils::ClassUtils diff --git a/libminifi/include/utils/CollectionUtils.h b/libminifi/include/utils/CollectionUtils.h deleted file mode 100644 index 46b3f4f832..0000000000 --- a/libminifi/include/utils/CollectionUtils.h +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace utils { - -namespace internal { - -template -struct find_in_range { - static auto call(const T& range, const Arg& arg) -> decltype(std::find(range.begin(), range.end(), arg)) { - return std::find(range.begin(), range.end(), arg); - } -}; - -template -struct find_in_range().find(std::declval()), void())> { - static auto call(const T& range, const Arg& arg) -> decltype(range.find(arg)) { - return range.find(arg); - } -}; - -} // namespace internal - -template -bool haveCommonItem(const T& a, const U& b) { - using Item = typename T::value_type; - return std::any_of(a.begin(), a.end(), [&] (const Item& item) { - return internal::find_in_range::call(b, item) != b.end(); - }); -} - -} // namespace utils -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org diff --git a/libminifi/include/utils/Cron.h b/libminifi/include/utils/Cron.h deleted file mode 100644 index 97b78e3d99..0000000000 --- a/libminifi/include/utils/Cron.h +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include -#include -#include -#include -#include -#include -#include "date/tz.h" -#include "Exception.h" - -namespace org::apache::nifi::minifi::utils { -class BadCronExpression : public minifi::Exception { - public: - explicit BadCronExpression(const std::string& errmsg) : minifi::Exception(errmsg) {} -}; - -class CronField { - public: - virtual ~CronField() = default; - - [[nodiscard]] virtual bool matches(date::local_seconds time_point) const = 0; - virtual bool operator==(const CronField&) const { throw std::runtime_error("not implemented"); } -}; - -class Cron { - public: - explicit Cron(const std::string& expression); - - [[nodiscard]] std::optional calculateNextTrigger(date::local_seconds start) const; - - std::unique_ptr second_; - std::unique_ptr minute_; - std::unique_ptr hour_; - std::unique_ptr day_; - std::unique_ptr month_; - std::unique_ptr day_of_week_; - std::unique_ptr year_; -}; - -} // namespace org::apache::nifi::minifi::utils diff --git a/libminifi/include/utils/Error.h b/libminifi/include/utils/Error.h deleted file mode 100644 index 77ef4a6aca..0000000000 --- a/libminifi/include/utils/Error.h +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -namespace org::apache::nifi::minifi::utils { - -std::error_code getLastError(); - -} // namespace org::apache::nifi::minifi::utils diff --git a/libminifi/include/utils/Export.h b/libminifi/include/utils/Export.h deleted file mode 100644 index 175821d93c..0000000000 --- a/libminifi/include/utils/Export.h +++ /dev/null @@ -1,35 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#ifdef WIN32 - #ifdef LIBMINIFI - #define MINIFIAPI __declspec(dllexport) - #else - #define MINIFIAPI __declspec(dllimport) - #endif - #ifdef MODULE_NAME - #define EXTENSIONAPI __declspec(dllexport) - #else - #define EXTENSIONAPI __declspec(dllimport) - #endif -#else - #define MINIFIAPI - #define EXTENSIONAPI -#endif diff --git a/libminifi/include/utils/FifoExecutor.h b/libminifi/include/utils/FifoExecutor.h deleted file mode 100644 index 1aca59d560..0000000000 --- a/libminifi/include/utils/FifoExecutor.h +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include - -#include "utils/MinifiConcurrentQueue.h" - -namespace org::apache::nifi::minifi::utils { - -namespace detail { -class WorkerThread final { - public: - WorkerThread(); - - WorkerThread(const WorkerThread&) = delete; - WorkerThread(WorkerThread&&) = delete; - WorkerThread& operator=(WorkerThread) = delete; - - ~WorkerThread(); - - template - void enqueue(Args&&... args) { task_queue_.enqueue(std::forward(args)...); } - - private: - void run() noexcept; - - utils::ConditionConcurrentQueue> task_queue_; - std::thread thread_; -}; -} // namespace detail - -/** - * Executes arbitrary functions with no parameters asynchronously on an internal thread, returning a future to the result. - */ -class FifoExecutor final { - public: - template - auto enqueue(Func func) -> std::future { - using result_type = decltype(func()); - std::packaged_task task{std::move(func)}; - auto future = task.get_future(); - worker_thread_.enqueue(std::move(task)); - return future; - } - private: - detail::WorkerThread worker_thread_; -}; - -} // namespace org::apache::nifi::minifi::utils diff --git a/libminifi/include/utils/FileMutex.h b/libminifi/include/utils/FileMutex.h deleted file mode 100644 index 54596ead2e..0000000000 --- a/libminifi/include/utils/FileMutex.h +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include -#include -#include "utils/gsl.h" - -#ifdef WIN32 -#include -#endif - -namespace org::apache::nifi::minifi::utils { - -// Warning: this will write the pid of the current process into the file -class FileMutex { - public: - explicit FileMutex(std::filesystem::path path); - ~FileMutex() { - gsl_Expects(!file_handle_.has_value()); - } - - FileMutex(const FileMutex&) = delete; - FileMutex(FileMutex&&) = delete; - FileMutex& operator=(const FileMutex&) = delete; - FileMutex& operator=(FileMutex&&) = delete; - - void lock(); - void unlock(); - - private: - std::filesystem::path path_; - - std::mutex mtx_; -#ifdef WIN32 - std::optional file_handle_; -#else - std::optional file_handle_; -#endif -}; - -} // namespace org::apache::nifi::minifi::utils diff --git a/libminifi/include/utils/IntervalSwitch.h b/libminifi/include/utils/IntervalSwitch.h deleted file mode 100644 index 0052b4e006..0000000000 --- a/libminifi/include/utils/IntervalSwitch.h +++ /dev/null @@ -1,74 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include - -#include "utils/gsl.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace utils { - -enum class IntervalSwitchState { - LOWER, - UPPER, -}; - -namespace detail { -struct SwitchReturn { - IntervalSwitchState state; - bool switched; -}; -} // namespace detail - -template> -class IntervalSwitch { - public: - IntervalSwitch(T lower_threshold, T upper_threshold, const IntervalSwitchState initial_state = IntervalSwitchState::UPPER) - :lower_threshold_{std::move(lower_threshold)}, upper_threshold_{std::move(upper_threshold)}, state_{initial_state} { - gsl_Expects(!less_(upper_threshold_, lower_threshold_)); - } - - detail::SwitchReturn operator()(const T& value) { - const auto old_state = state_; - if (less_(value, lower_threshold_)) { - state_ = IntervalSwitchState::LOWER; - } else if (!less_(value, upper_threshold_)) { - state_ = IntervalSwitchState::UPPER; - } - return {state_, state_ != old_state}; - } - - private: - T lower_threshold_; - T upper_threshold_; - Comp less_; - - IntervalSwitchState state_; -}; - -} // namespace utils -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org diff --git a/libminifi/include/utils/JsonCallback.h b/libminifi/include/utils/JsonCallback.h deleted file mode 100644 index c455e6c348..0000000000 --- a/libminifi/include/utils/JsonCallback.h +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include -#include -#include -#include - -#include "rapidjson/stream.h" -#include "rapidjson/writer.h" -#include "rapidjson/prettywriter.h" - -#include "io/StreamPipe.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace utils { - -class JsonInputCallback { - public: - explicit JsonInputCallback(rapidjson::Document& document) : document_(document) {} - int64_t operator()(const std::shared_ptr& stream) { - std::string content; - content.resize(stream->size()); - const auto read_ret = stream->read(as_writable_bytes(std::span(content))); - if (io::isError(read_ret)) { - return -1; - } - rapidjson::ParseResult parse_result = document_.Parse(content.data()); - if (parse_result.IsError()) - return -1; - - return read_ret; - } - private: - rapidjson::Document& document_; -}; - -class JsonOutputCallback { - public: - explicit JsonOutputCallback(rapidjson::Document&& root, std::optional decimal_places) - : root_(std::move(root)), decimal_places_(decimal_places) {} - - int64_t operator()(const std::shared_ptr& stream) const { - rapidjson::StringBuffer buffer; - rapidjson::Writer writer(buffer); - if (decimal_places_.has_value()) - writer.SetMaxDecimalPlaces(decimal_places_.value()); - root_.Accept(writer); - const auto write_return = stream->write(reinterpret_cast(buffer.GetString()), buffer.GetSize()); - return !io::isError(write_return) ? gsl::narrow(write_return) : -1; - } - - protected: - rapidjson::Document root_; - std::optional decimal_places_; -}; - -class PrettyJsonOutputCallback { - public: - explicit PrettyJsonOutputCallback(rapidjson::Document&& root, std::optional decimal_places) - : root_(std::move(root)), decimal_places_(decimal_places) {} - - int64_t operator()(const std::shared_ptr& stream) const { - rapidjson::StringBuffer buffer; - rapidjson::PrettyWriter writer(buffer); - if (decimal_places_.has_value()) - writer.SetMaxDecimalPlaces(decimal_places_.value()); - root_.Accept(writer); - const auto write_return = stream->write(reinterpret_cast(buffer.GetString()), buffer.GetSize()); - return !io::isError(write_return) ? gsl::narrow(write_return) : -1; - } - - protected: - rapidjson::Document root_; - std::optional decimal_places_; -}; - -} // namespace utils -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org diff --git a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h deleted file mode 100644 index 0d68fe5680..0000000000 --- a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include -#include -#include -#include -#include - -#include "core/logging/Logger.h" -#include "io/InputStream.h" -#include "io/OutputStream.h" -#include "io/StreamPipe.h" - -namespace org::apache::nifi::minifi::utils { - -class LineByLineInputOutputStreamCallback { - public: - using CallbackType = std::function; - explicit LineByLineInputOutputStreamCallback(CallbackType callback); - int64_t operator()(const std::shared_ptr& input, const std::shared_ptr& output); - - private: - int64_t readInput(io::InputStream& stream); - void readLine(); - [[nodiscard]] bool isLastLine() const { return !next_line_.has_value(); } - - CallbackType callback_; - std::vector input_; - std::vector::iterator current_pos_{}; - std::optional current_line_; - std::optional next_line_; -}; - -} // namespace org::apache::nifi::minifi::utils diff --git a/libminifi/include/utils/LogUtils.h b/libminifi/include/utils/LogUtils.h deleted file mode 100644 index c8d852c9b7..0000000000 --- a/libminifi/include/utils/LogUtils.h +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include -#include - -#include "utils/Enum.h" - -namespace org::apache::nifi::minifi::utils::LogUtils { - -enum class LogLevelOption { - LOGGING_TRACE, - LOGGING_DEBUG, - LOGGING_INFO, - LOGGING_WARN, - LOGGING_ERROR, - LOGGING_CRITICAL, - LOGGING_OFF -}; - -inline LogLevelOption mapToLogLevelOption(core::logging::LOG_LEVEL level) { - switch (level) { - case core::logging::trace: return LogLevelOption::LOGGING_TRACE; - case core::logging::debug: return LogLevelOption::LOGGING_DEBUG; - case core::logging::info: return LogLevelOption::LOGGING_INFO; - case core::logging::warn: return LogLevelOption::LOGGING_WARN; - case core::logging::err: return LogLevelOption::LOGGING_ERROR; - case core::logging::critical: return LogLevelOption::LOGGING_CRITICAL; - case core::logging::off: return LogLevelOption::LOGGING_OFF; - } - throw std::invalid_argument(fmt::format("Invalid LOG_LEVEL {}", magic_enum::enum_underlying(level))); -} - -inline core::logging::LOG_LEVEL mapToLogLevel(LogLevelOption option) { - switch (option) { - case LogLevelOption::LOGGING_TRACE: return core::logging::trace; - case LogLevelOption::LOGGING_DEBUG: return core::logging::debug; - case LogLevelOption::LOGGING_INFO: return core::logging::info; - case LogLevelOption::LOGGING_WARN: return core::logging::warn; - case LogLevelOption::LOGGING_ERROR: return core::logging::err; - case LogLevelOption::LOGGING_CRITICAL: return core::logging::critical; - case LogLevelOption::LOGGING_OFF: return core::logging::off; - } - throw std::invalid_argument(fmt::format("Invalid LogLevelOption {}", magic_enum::enum_underlying(option))); -} - -} // namespace org::apache::nifi::minifi::utils::LogUtils - -namespace magic_enum::customize { -using LogLevelOption = org::apache::nifi::minifi::utils::LogUtils::LogLevelOption; - -template <> -constexpr customize_t enum_name(LogLevelOption value) noexcept { - switch (value) { - case LogLevelOption::LOGGING_TRACE: - return "TRACE"; - case LogLevelOption::LOGGING_DEBUG: - return "DEBUG"; - case LogLevelOption::LOGGING_INFO: - return "INFO"; - case LogLevelOption::LOGGING_WARN: - return "WARN"; - case LogLevelOption::LOGGING_ERROR: - return "ERROR"; - case LogLevelOption::LOGGING_CRITICAL: - return "CRITICAL"; - case LogLevelOption::LOGGING_OFF: - return "OFF"; - } - return invalid_tag; -} -} // namespace magic_enum::customize diff --git a/libminifi/include/utils/MapUtils.h b/libminifi/include/utils/MapUtils.h deleted file mode 100644 index 55011d78df..0000000000 --- a/libminifi/include/utils/MapUtils.h +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace utils { -namespace MapUtils { - -/** - * Return a set of keys from a map - */ -template