From 3b5de797d1a07060aabe8666ba9fd7e3364cdc68 Mon Sep 17 00:00:00 2001 From: Juliusz Stasiewicz Date: Tue, 9 Jun 2020 17:41:57 +0200 Subject: [PATCH 1/9] build: Added choice between C++ standard versions This required supplying std::optional from a 3rd party implementation (by Andrzej Krzemienski). The selection of `optional` is done on the basis of C++ standard version, e.g.: cmake -DCASS_CPP_STANDARD=17 .. will use std::optional, while `-DCASS_CPP_STANDARD=11` will fall back to the 3rd party impl. --- CMakeLists.txt | 12 +- driver_config.hpp.in | 1 + LICENSE.txt => licenses/apache-2.0.txt | 0 licenses/boost-1.0.txt | 23 + src/CMakeLists.txt | 9 + src/optional.hpp | 31 + src/optional/optional_akrzemi.hpp | 1095 ++++++++++++++++++++++++ src/optional/optional_std.hpp | 14 + 8 files changed, 1179 insertions(+), 6 deletions(-) rename LICENSE.txt => licenses/apache-2.0.txt (100%) create mode 100644 licenses/boost-1.0.txt create mode 100644 src/optional.hpp create mode 100644 src/optional/optional_akrzemi.hpp create mode 100644 src/optional/optional_std.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index c6150948b..1abac30ef 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 2.8.12) +cmake_minimum_required(VERSION 3.1) project(cassandra C CXX) set(CASS_ROOT_DIR ${CMAKE_CURRENT_SOURCE_DIR}) @@ -43,10 +43,12 @@ option(CASS_USE_KERBEROS "Use Kerberos" OFF) option(CASS_USE_LIBSSH2 "Use libssh2 for integration tests" OFF) option(CASS_USE_OPENSSL "Use OpenSSL" ON) option(CASS_USE_STATIC_LIBS "Link static libraries when building executables" OFF) -option(CASS_USE_STD_ATOMIC "Use C++11 atomics library" OFF) +option(CASS_USE_STD_ATOMIC "Use std::atomic library" ON) option(CASS_USE_ZLIB "Use zlib" ON) option(CASS_USE_TIMERFD "Use timerfd (Linux only)" ON) +set(CASS_CPP_STANDARD "11" CACHE STRING "C++ standard (11, 14, 17, etc.)") + # Handle testing dependencies if(CASS_BUILD_TESTS) # Enable integration and unit tests @@ -161,12 +163,10 @@ endif() # Top-level compiler flags #------------------------ +set (CMAKE_CXX_STANDARD ${CASS_CPP_STANDARD}) + if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang" OR "${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU") - # Enable C++11 support to use std::atomic - if(CASS_USE_STD_ATOMIC) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") - endif() # OpenSSL is deprecated on later versions of Mac OS X. The long-term solution # is to provide a CommonCryto implementation. diff --git a/driver_config.hpp.in b/driver_config.hpp.in index 0aa5f6f05..9694e12c6 100644 --- a/driver_config.hpp.in +++ b/driver_config.hpp.in @@ -4,6 +4,7 @@ #cmakedefine HAVE_KERBEROS #cmakedefine HAVE_OPENSSL #cmakedefine HAVE_STD_ATOMIC +#cmakedefine CASS_CPP_STANDARD @CASS_CPP_STANDARD@ #cmakedefine HAVE_BOOST_ATOMIC #cmakedefine HAVE_NOSIGPIPE #cmakedefine HAVE_SIGTIMEDWAIT diff --git a/LICENSE.txt b/licenses/apache-2.0.txt similarity index 100% rename from LICENSE.txt rename to licenses/apache-2.0.txt diff --git a/licenses/boost-1.0.txt b/licenses/boost-1.0.txt new file mode 100644 index 000000000..127a5bc39 --- /dev/null +++ b/licenses/boost-1.0.txt @@ -0,0 +1,23 @@ +Boost Software License - Version 1.0 - August 17th, 2003 + +Permission is hereby granted, free of charge, to any person or organization +obtaining a copy of the software and accompanying documentation covered by +this license (the "Software") to use, reproduce, display, distribute, +execute, and transmit the Software, and to prepare derivative works of the +Software, and to permit third-parties to whom the Software is furnished to +do so, all subject to the following: + +The copyright notices in the Software and this entire statement, including +the above license grant, this restriction and the following disclaimer, +must be included in all copies of the Software, in whole or in part, and +all derivative works of the Software, unless such copies or derivative +works are solely in the form of machine-executable object code generated by +a source language processor. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT +SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE +FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, +ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 06b84b759..cfedd739f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -36,6 +36,15 @@ else() endif() endif() +# Determine `optional` library to include +if(CMAKE_CXX_STANDARD LESS 17) + message(STATUS "Using akrzemi's `optional` implementation") + list(APPEND SOURCES optional/optional_akrzemi.hpp) +else() + message(STATUS "Using std::optional library") + list(APPEND SOURCES optional/optional_std.hpp) +endif() + add_subdirectory(third_party/curl) add_subdirectory(third_party/hdr_histogram) add_subdirectory(third_party/http-parser) diff --git a/src/optional.hpp b/src/optional.hpp new file mode 100644 index 000000000..18d941345 --- /dev/null +++ b/src/optional.hpp @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2020 ScyllaDB + * + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#ifndef DATASTAX_INTERNAL_OPTIONAL_HPP +#define DATASTAX_INTERNAL_OPTIONAL_HPP + +#include "driver_config.hpp" + +#if CASS_CPP_STANDARD >= 17 + #include "optional/optional_std.hpp" +#else + #include "optional/optional_akrzemi.hpp" +#endif + +#endif /* DATASTAX_INTERNAL_OPTIONAL_HPP */ diff --git a/src/optional/optional_akrzemi.hpp b/src/optional/optional_akrzemi.hpp new file mode 100644 index 000000000..6f0f63f82 --- /dev/null +++ b/src/optional/optional_akrzemi.hpp @@ -0,0 +1,1095 @@ +// Copyright (C) 2011 - 2012 Andrzej Krzemienski. +// +// Use, modification, and distribution is subject to the Boost Software +// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// +// The idea and interface is based on Boost.Optional library +// authored by Fernando Luis Cacciola Carballal + +/* + * Modified by ScyllaDB + * Copyright (C) 2020 ScyllaDB + * + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +# ifndef OPTIONAL_AKRZEMI_HPP +# define OPTIONAL_AKRZEMI_HPP + +# include +# include +# include +# include +# include +# include +# include + +# define TR2_OPTIONAL_REQUIRES(...) typename enable_if<__VA_ARGS__::value, bool>::type = false + +# if defined __GNUC__ // NOTE: GNUC is also defined for Clang +# if (__GNUC__ == 4) && (__GNUC_MINOR__ >= 8) +# define TR2_OPTIONAL_GCC_4_8_AND_HIGHER___ +# elif (__GNUC__ > 4) +# define TR2_OPTIONAL_GCC_4_8_AND_HIGHER___ +# endif + +# if (__GNUC__ == 4) && (__GNUC_MINOR__ >= 7) +# define TR2_OPTIONAL_GCC_4_7_AND_HIGHER___ +# elif (__GNUC__ > 4) +# define TR2_OPTIONAL_GCC_4_7_AND_HIGHER___ +# endif + +# if (__GNUC__ == 4) && (__GNUC_MINOR__ == 8) && (__GNUC_PATCHLEVEL__ >= 1) +# define TR2_OPTIONAL_GCC_4_8_1_AND_HIGHER___ +# elif (__GNUC__ == 4) && (__GNUC_MINOR__ >= 9) +# define TR2_OPTIONAL_GCC_4_8_1_AND_HIGHER___ +# elif (__GNUC__ > 4) +# define TR2_OPTIONAL_GCC_4_8_1_AND_HIGHER___ +# endif +# endif + +# if defined __clang_major__ +# if (__clang_major__ == 3 && __clang_minor__ >= 5) +# define TR2_OPTIONAL_CLANG_3_5_AND_HIGHTER_ +# elif (__clang_major__ > 3) +# define TR2_OPTIONAL_CLANG_3_5_AND_HIGHTER_ +# endif +# if defined TR2_OPTIONAL_CLANG_3_5_AND_HIGHTER_ +# define TR2_OPTIONAL_CLANG_3_4_2_AND_HIGHER_ +# elif (__clang_major__ == 3 && __clang_minor__ == 4 && __clang_patchlevel__ >= 2) +# define TR2_OPTIONAL_CLANG_3_4_2_AND_HIGHER_ +# endif +# endif + +# if defined _MSC_VER +# if (_MSC_VER >= 1900) +# define TR2_OPTIONAL_MSVC_2015_AND_HIGHER___ +# endif +# endif + +# if defined __clang__ +# if (__clang_major__ > 2) || (__clang_major__ == 2) && (__clang_minor__ >= 9) +# define OPTIONAL_HAS_THIS_RVALUE_REFS 1 +# else +# define OPTIONAL_HAS_THIS_RVALUE_REFS 0 +# endif +# elif defined TR2_OPTIONAL_GCC_4_8_1_AND_HIGHER___ +# define OPTIONAL_HAS_THIS_RVALUE_REFS 1 +# elif defined TR2_OPTIONAL_MSVC_2015_AND_HIGHER___ +# define OPTIONAL_HAS_THIS_RVALUE_REFS 1 +# else +# define OPTIONAL_HAS_THIS_RVALUE_REFS 0 +# endif + + +# if defined TR2_OPTIONAL_GCC_4_8_1_AND_HIGHER___ +# define OPTIONAL_HAS_CONSTEXPR_INIT_LIST 1 +# define OPTIONAL_CONSTEXPR_INIT_LIST constexpr +# else +# define OPTIONAL_HAS_CONSTEXPR_INIT_LIST 0 +# define OPTIONAL_CONSTEXPR_INIT_LIST +# endif + +# if defined TR2_OPTIONAL_CLANG_3_5_AND_HIGHTER_ && (defined __cplusplus) && (__cplusplus != 201103L) +# define OPTIONAL_HAS_MOVE_ACCESSORS 1 +# else +# define OPTIONAL_HAS_MOVE_ACCESSORS 0 +# endif + +// In C++11 constexpr implies const, so we need to make non-const members also non-constexpr +# if (defined __cplusplus) && (__cplusplus == 201103L) +# define OPTIONAL_MUTABLE_CONSTEXPR +# else +# define OPTIONAL_MUTABLE_CONSTEXPR constexpr +# endif + +namespace std{ + +namespace akrzemi{ + +// BEGIN workaround for missing is_trivially_destructible +# if defined TR2_OPTIONAL_GCC_4_8_AND_HIGHER___ + // leave it: it is already there +# elif defined TR2_OPTIONAL_CLANG_3_4_2_AND_HIGHER_ + // leave it: it is already there +# elif defined TR2_OPTIONAL_MSVC_2015_AND_HIGHER___ + // leave it: it is already there +# elif defined TR2_OPTIONAL_DISABLE_EMULATION_OF_TYPE_TRAITS + // leave it: the user doesn't want it +# else + template + using is_trivially_destructible = std::has_trivial_destructor; +# endif +// END workaround for missing is_trivially_destructible + +# if (defined TR2_OPTIONAL_GCC_4_7_AND_HIGHER___) + // leave it; our metafunctions are already defined. +# elif defined TR2_OPTIONAL_CLANG_3_4_2_AND_HIGHER_ + // leave it; our metafunctions are already defined. +# elif defined TR2_OPTIONAL_MSVC_2015_AND_HIGHER___ + // leave it: it is already there +# elif defined TR2_OPTIONAL_DISABLE_EMULATION_OF_TYPE_TRAITS + // leave it: the user doesn't want it +# else + + +// workaround for missing traits in GCC and CLANG +template +struct is_nothrow_move_constructible +{ + constexpr static bool value = std::is_nothrow_constructible::value; +}; + + +template +struct is_assignable +{ + template + constexpr static bool has_assign(...) { return false; } + + template () = std::declval(), true)) > + // the comma operator is necessary for the cases where operator= returns void + constexpr static bool has_assign(bool) { return true; } + + constexpr static bool value = has_assign(true); +}; + + +template +struct is_nothrow_move_assignable +{ + template + struct has_nothrow_move_assign { + constexpr static bool value = false; + }; + + template + struct has_nothrow_move_assign { + constexpr static bool value = noexcept( std::declval() = std::declval() ); + }; + + constexpr static bool value = has_nothrow_move_assign::value>::value; +}; +// end workaround + + +# endif + + + +// 20.5.4, optional for object types +template class optional; + +// 20.5.5, optional for lvalue reference types +template class optional; + + +// workaround: std utility functions aren't constexpr yet +template inline constexpr T&& constexpr_forward(typename std::remove_reference::type& t) noexcept +{ + return static_cast(t); +} + +template inline constexpr T&& constexpr_forward(typename std::remove_reference::type&& t) noexcept +{ + static_assert(!std::is_lvalue_reference::value, "!!"); + return static_cast(t); +} + +template inline constexpr typename std::remove_reference::type&& constexpr_move(T&& t) noexcept +{ + return static_cast::type&&>(t); +} + + +#if defined NDEBUG +# define TR2_OPTIONAL_ASSERTED_EXPRESSION(CHECK, EXPR) (EXPR) +#else +# define TR2_OPTIONAL_ASSERTED_EXPRESSION(CHECK, EXPR) ((CHECK) ? (EXPR) : ([]{assert(!#CHECK);}(), (EXPR))) +#endif + + +namespace detail_ +{ + +// static_addressof: a constexpr version of addressof +template +struct has_overloaded_addressof +{ + template + constexpr static bool has_overload(...) { return false; } + + template ().operator&()) > + constexpr static bool has_overload(bool) { return true; } + + constexpr static bool value = has_overload(true); +}; + +template )> +constexpr T* static_addressof(T& ref) +{ + return &ref; +} + +template )> +T* static_addressof(T& ref) +{ + return std::addressof(ref); +} + + +// the call to convert(b) has return type A and converts b to type A iff b decltype(b) is implicitly convertible to A +template +constexpr U convert(U v) { return v; } + + +namespace swap_ns +{ + using std::swap; + + template + void adl_swap(T& t, T& u) noexcept(noexcept(swap(t, u))) + { + swap(t, u); + } + +} // namespace swap_ns + +} // namespace detail + + +constexpr struct trivial_init_t{} trivial_init{}; + + +// 20.5.6, In-place construction +constexpr struct in_place_t{} in_place{}; + + +// 20.5.7, Disengaged state indicator +struct nullopt_t +{ + struct init{}; + constexpr explicit nullopt_t(init){} +}; +constexpr nullopt_t nullopt{nullopt_t::init()}; + + +// 20.5.8, class bad_optional_access +class bad_optional_access : public logic_error { +public: + explicit bad_optional_access(const string& what_arg) : logic_error{what_arg} {} + explicit bad_optional_access(const char* what_arg) : logic_error{what_arg} {} +}; + + +template +union storage_t +{ + unsigned char dummy_; + T value_; + + constexpr storage_t( trivial_init_t ) noexcept : dummy_() {}; + + template + constexpr storage_t( Args&&... args ) : value_(constexpr_forward(args)...) {} + + ~storage_t(){} +}; + + +template +union constexpr_storage_t +{ + unsigned char dummy_; + T value_; + + constexpr constexpr_storage_t( trivial_init_t ) noexcept : dummy_() {}; + + template + constexpr constexpr_storage_t( Args&&... args ) : value_(constexpr_forward(args)...) {} + + ~constexpr_storage_t() = default; +}; + + +template +struct optional_base +{ + bool init_; + storage_t storage_; + + constexpr optional_base() noexcept : init_(false), storage_(trivial_init) {}; + + explicit constexpr optional_base(const T& v) : init_(true), storage_(v) {} + + explicit constexpr optional_base(T&& v) : init_(true), storage_(constexpr_move(v)) {} + + template explicit optional_base(in_place_t, Args&&... args) + : init_(true), storage_(constexpr_forward(args)...) {} + + template >)> + explicit optional_base(in_place_t, std::initializer_list il, Args&&... args) + : init_(true), storage_(il, std::forward(args)...) {} + + ~optional_base() { if (init_) storage_.value_.T::~T(); } +}; + + +template +struct constexpr_optional_base +{ + bool init_; + constexpr_storage_t storage_; + + constexpr constexpr_optional_base() noexcept : init_(false), storage_(trivial_init) {}; + + explicit constexpr constexpr_optional_base(const T& v) : init_(true), storage_(v) {} + + explicit constexpr constexpr_optional_base(T&& v) : init_(true), storage_(constexpr_move(v)) {} + + template explicit constexpr constexpr_optional_base(in_place_t, Args&&... args) + : init_(true), storage_(constexpr_forward(args)...) {} + + template >)> + OPTIONAL_CONSTEXPR_INIT_LIST explicit constexpr_optional_base(in_place_t, std::initializer_list il, Args&&... args) + : init_(true), storage_(il, std::forward(args)...) {} + + ~constexpr_optional_base() = default; +}; + +template +using OptionalBase = typename std::conditional< + is_trivially_destructible::value, // if possible + constexpr_optional_base::type>, // use base with trivial destructor + optional_base::type> +>::type; + + + +template +class optional : private OptionalBase +{ + static_assert( !std::is_same::type, nullopt_t>::value, "bad T" ); + static_assert( !std::is_same::type, in_place_t>::value, "bad T" ); + + + constexpr bool initialized() const noexcept { return OptionalBase::init_; } + typename std::remove_const::type* dataptr() { return std::addressof(OptionalBase::storage_.value_); } + constexpr const T* dataptr() const { return detail_::static_addressof(OptionalBase::storage_.value_); } + +# if OPTIONAL_HAS_THIS_RVALUE_REFS == 1 + constexpr const T& contained_val() const& { return OptionalBase::storage_.value_; } +# if OPTIONAL_HAS_MOVE_ACCESSORS == 1 + OPTIONAL_MUTABLE_CONSTEXPR T&& contained_val() && { return std::move(OptionalBase::storage_.value_); } + OPTIONAL_MUTABLE_CONSTEXPR T& contained_val() & { return OptionalBase::storage_.value_; } +# else + T& contained_val() & { return OptionalBase::storage_.value_; } + T&& contained_val() && { return std::move(OptionalBase::storage_.value_); } +# endif +# else + constexpr const T& contained_val() const { return OptionalBase::storage_.value_; } + T& contained_val() { return OptionalBase::storage_.value_; } +# endif + + void clear() noexcept { + if (initialized()) dataptr()->T::~T(); + OptionalBase::init_ = false; + } + + template + void initialize(Args&&... args) noexcept(noexcept(T(std::forward(args)...))) + { + assert(!OptionalBase::init_); + ::new (static_cast(dataptr())) T(std::forward(args)...); + OptionalBase::init_ = true; + } + + template + void initialize(std::initializer_list il, Args&&... args) noexcept(noexcept(T(il, std::forward(args)...))) + { + assert(!OptionalBase::init_); + ::new (static_cast(dataptr())) T(il, std::forward(args)...); + OptionalBase::init_ = true; + } + +public: + typedef T value_type; + + // 20.5.5.1, constructors + constexpr optional() noexcept : OptionalBase() {}; + constexpr optional(nullopt_t) noexcept : OptionalBase() {}; + + optional(const optional& rhs) + : OptionalBase() + { + if (rhs.initialized()) { + ::new (static_cast(dataptr())) T(*rhs); + OptionalBase::init_ = true; + } + } + + optional(optional&& rhs) noexcept(is_nothrow_move_constructible::value) + : OptionalBase() + { + if (rhs.initialized()) { + ::new (static_cast(dataptr())) T(std::move(*rhs)); + OptionalBase::init_ = true; + } + } + + constexpr optional(const T& v) : OptionalBase(v) {} + + constexpr optional(T&& v) : OptionalBase(constexpr_move(v)) {} + + template + explicit constexpr optional(in_place_t, Args&&... args) + : OptionalBase(in_place_t{}, constexpr_forward(args)...) {} + + template >)> + OPTIONAL_CONSTEXPR_INIT_LIST explicit optional(in_place_t, std::initializer_list il, Args&&... args) + : OptionalBase(in_place_t{}, il, constexpr_forward(args)...) {} + + // 20.5.4.2, Destructor + ~optional() = default; + + // 20.5.4.3, assignment + optional& operator=(nullopt_t) noexcept + { + clear(); + return *this; + } + + optional& operator=(const optional& rhs) + { + if (initialized() == true && rhs.initialized() == false) clear(); + else if (initialized() == false && rhs.initialized() == true) initialize(*rhs); + else if (initialized() == true && rhs.initialized() == true) contained_val() = *rhs; + return *this; + } + + optional& operator=(optional&& rhs) + noexcept(is_nothrow_move_assignable::value && is_nothrow_move_constructible::value) + { + if (initialized() == true && rhs.initialized() == false) clear(); + else if (initialized() == false && rhs.initialized() == true) initialize(std::move(*rhs)); + else if (initialized() == true && rhs.initialized() == true) contained_val() = std::move(*rhs); + return *this; + } + + template + auto operator=(U&& v) + -> typename enable_if + < + is_same::type, T>::value, + optional& + >::type + { + if (initialized()) { contained_val() = std::forward(v); } + else { initialize(std::forward(v)); } + return *this; + } + + + template + void emplace(Args&&... args) + { + clear(); + initialize(std::forward(args)...); + } + + template + void emplace(initializer_list il, Args&&... args) + { + clear(); + initialize(il, std::forward(args)...); + } + + // 20.5.4.4, Swap + void swap(optional& rhs) noexcept(is_nothrow_move_constructible::value + && noexcept(detail_::swap_ns::adl_swap(declval(), declval()))) + { + if (initialized() == true && rhs.initialized() == false) { rhs.initialize(std::move(**this)); clear(); } + else if (initialized() == false && rhs.initialized() == true) { initialize(std::move(*rhs)); rhs.clear(); } + else if (initialized() == true && rhs.initialized() == true) { using std::swap; swap(**this, *rhs); } + } + + // 20.5.4.5, Observers + + explicit constexpr operator bool() const noexcept { return initialized(); } + constexpr bool has_value() const noexcept { return initialized(); } + + constexpr T const* operator ->() const { + return TR2_OPTIONAL_ASSERTED_EXPRESSION(initialized(), dataptr()); + } + +# if OPTIONAL_HAS_MOVE_ACCESSORS == 1 + + OPTIONAL_MUTABLE_CONSTEXPR T* operator ->() { + assert (initialized()); + return dataptr(); + } + + constexpr T const& operator *() const& { + return TR2_OPTIONAL_ASSERTED_EXPRESSION(initialized(), contained_val()); + } + + OPTIONAL_MUTABLE_CONSTEXPR T& operator *() & { + assert (initialized()); + return contained_val(); + } + + OPTIONAL_MUTABLE_CONSTEXPR T&& operator *() && { + assert (initialized()); + return constexpr_move(contained_val()); + } + + constexpr T const& value() const& { + return initialized() ? contained_val() : (throw bad_optional_access("bad optional access"), contained_val()); + } + + OPTIONAL_MUTABLE_CONSTEXPR T& value() & { + return initialized() ? contained_val() : (throw bad_optional_access("bad optional access"), contained_val()); + } + + OPTIONAL_MUTABLE_CONSTEXPR T&& value() && { + if (!initialized()) throw bad_optional_access("bad optional access"); + return std::move(contained_val()); + } + +# else + + T* operator ->() { + assert (initialized()); + return dataptr(); + } + + constexpr T const& operator *() const { + return TR2_OPTIONAL_ASSERTED_EXPRESSION(initialized(), contained_val()); + } + + T& operator *() { + assert (initialized()); + return contained_val(); + } + + constexpr T const& value() const { + return initialized() ? contained_val() : (throw bad_optional_access("bad optional access"), contained_val()); + } + + T& value() { + return initialized() ? contained_val() : (throw bad_optional_access("bad optional access"), contained_val()); + } + +# endif + +# if OPTIONAL_HAS_THIS_RVALUE_REFS == 1 + + template + constexpr T value_or(V&& v) const& + { + return *this ? **this : detail_::convert(constexpr_forward(v)); + } + +# if OPTIONAL_HAS_MOVE_ACCESSORS == 1 + + template + OPTIONAL_MUTABLE_CONSTEXPR T value_or(V&& v) && + { + return *this ? constexpr_move(const_cast&>(*this).contained_val()) : detail_::convert(constexpr_forward(v)); + } + +# else + + template + T value_or(V&& v) && + { + return *this ? constexpr_move(const_cast&>(*this).contained_val()) : detail_::convert(constexpr_forward(v)); + } + +# endif + +# else + + template + constexpr T value_or(V&& v) const + { + return *this ? **this : detail_::convert(constexpr_forward(v)); + } + +# endif + + // 20.6.3.6, modifiers + void reset() noexcept { clear(); } +}; + + +template +class optional +{ + static_assert( !std::is_same::value, "bad T" ); + static_assert( !std::is_same::value, "bad T" ); + T* ref; + +public: + + // 20.5.5.1, construction/destruction + constexpr optional() noexcept : ref(nullptr) {} + + constexpr optional(nullopt_t) noexcept : ref(nullptr) {} + + constexpr optional(T& v) noexcept : ref(detail_::static_addressof(v)) {} + + optional(T&&) = delete; + + constexpr optional(const optional& rhs) noexcept : ref(rhs.ref) {} + + explicit constexpr optional(in_place_t, T& v) noexcept : ref(detail_::static_addressof(v)) {} + + explicit optional(in_place_t, T&&) = delete; + + ~optional() = default; + + // 20.5.5.2, mutation + optional& operator=(nullopt_t) noexcept { + ref = nullptr; + return *this; + } + + // optional& operator=(const optional& rhs) noexcept { + // ref = rhs.ref; + // return *this; + // } + + // optional& operator=(optional&& rhs) noexcept { + // ref = rhs.ref; + // return *this; + // } + + template + auto operator=(U&& rhs) noexcept + -> typename enable_if + < + is_same::type, optional>::value, + optional& + >::type + { + ref = rhs.ref; + return *this; + } + + template + auto operator=(U&& rhs) noexcept + -> typename enable_if + < + !is_same::type, optional>::value, + optional& + >::type + = delete; + + void emplace(T& v) noexcept { + ref = detail_::static_addressof(v); + } + + void emplace(T&&) = delete; + + + void swap(optional& rhs) noexcept + { + std::swap(ref, rhs.ref); + } + + // 20.5.5.3, observers + constexpr T* operator->() const { + return TR2_OPTIONAL_ASSERTED_EXPRESSION(ref, ref); + } + + constexpr T& operator*() const { + return TR2_OPTIONAL_ASSERTED_EXPRESSION(ref, *ref); + } + + constexpr T& value() const { + return ref ? *ref : (throw bad_optional_access("bad optional access"), *ref); + } + + explicit constexpr operator bool() const noexcept { + return ref != nullptr; + } + + constexpr bool has_value() const noexcept { + return ref != nullptr; + } + + template + constexpr typename decay::type value_or(V&& v) const + { + return *this ? **this : detail_::convert::type>(constexpr_forward(v)); + } + + // x.x.x.x, modifiers + void reset() noexcept { ref = nullptr; } +}; + + +template +class optional +{ + static_assert( sizeof(T) == 0, "optional rvalue references disallowed" ); +}; + + +// 20.5.8, Relational operators +template constexpr bool operator==(const optional& x, const optional& y) +{ + return bool(x) != bool(y) ? false : bool(x) == false ? true : *x == *y; +} + +template constexpr bool operator!=(const optional& x, const optional& y) +{ + return !(x == y); +} + +template constexpr bool operator<(const optional& x, const optional& y) +{ + return (!y) ? false : (!x) ? true : *x < *y; +} + +template constexpr bool operator>(const optional& x, const optional& y) +{ + return (y < x); +} + +template constexpr bool operator<=(const optional& x, const optional& y) +{ + return !(y < x); +} + +template constexpr bool operator>=(const optional& x, const optional& y) +{ + return !(x < y); +} + + +// 20.5.9, Comparison with nullopt +template constexpr bool operator==(const optional& x, nullopt_t) noexcept +{ + return (!x); +} + +template constexpr bool operator==(nullopt_t, const optional& x) noexcept +{ + return (!x); +} + +template constexpr bool operator!=(const optional& x, nullopt_t) noexcept +{ + return bool(x); +} + +template constexpr bool operator!=(nullopt_t, const optional& x) noexcept +{ + return bool(x); +} + +template constexpr bool operator<(const optional&, nullopt_t) noexcept +{ + return false; +} + +template constexpr bool operator<(nullopt_t, const optional& x) noexcept +{ + return bool(x); +} + +template constexpr bool operator<=(const optional& x, nullopt_t) noexcept +{ + return (!x); +} + +template constexpr bool operator<=(nullopt_t, const optional&) noexcept +{ + return true; +} + +template constexpr bool operator>(const optional& x, nullopt_t) noexcept +{ + return bool(x); +} + +template constexpr bool operator>(nullopt_t, const optional&) noexcept +{ + return false; +} + +template constexpr bool operator>=(const optional&, nullopt_t) noexcept +{ + return true; +} + +template constexpr bool operator>=(nullopt_t, const optional& x) noexcept +{ + return (!x); +} + + + +// 20.5.10, Comparison with T +template constexpr bool operator==(const optional& x, const T& v) +{ + return bool(x) ? *x == v : false; +} + +template constexpr bool operator==(const T& v, const optional& x) +{ + return bool(x) ? v == *x : false; +} + +template constexpr bool operator!=(const optional& x, const T& v) +{ + return bool(x) ? *x != v : true; +} + +template constexpr bool operator!=(const T& v, const optional& x) +{ + return bool(x) ? v != *x : true; +} + +template constexpr bool operator<(const optional& x, const T& v) +{ + return bool(x) ? *x < v : true; +} + +template constexpr bool operator>(const T& v, const optional& x) +{ + return bool(x) ? v > *x : true; +} + +template constexpr bool operator>(const optional& x, const T& v) +{ + return bool(x) ? *x > v : false; +} + +template constexpr bool operator<(const T& v, const optional& x) +{ + return bool(x) ? v < *x : false; +} + +template constexpr bool operator>=(const optional& x, const T& v) +{ + return bool(x) ? *x >= v : false; +} + +template constexpr bool operator<=(const T& v, const optional& x) +{ + return bool(x) ? v <= *x : false; +} + +template constexpr bool operator<=(const optional& x, const T& v) +{ + return bool(x) ? *x <= v : true; +} + +template constexpr bool operator>=(const T& v, const optional& x) +{ + return bool(x) ? v >= *x : true; +} + + +// Comparison of optional with T +template constexpr bool operator==(const optional& x, const T& v) +{ + return bool(x) ? *x == v : false; +} + +template constexpr bool operator==(const T& v, const optional& x) +{ + return bool(x) ? v == *x : false; +} + +template constexpr bool operator!=(const optional& x, const T& v) +{ + return bool(x) ? *x != v : true; +} + +template constexpr bool operator!=(const T& v, const optional& x) +{ + return bool(x) ? v != *x : true; +} + +template constexpr bool operator<(const optional& x, const T& v) +{ + return bool(x) ? *x < v : true; +} + +template constexpr bool operator>(const T& v, const optional& x) +{ + return bool(x) ? v > *x : true; +} + +template constexpr bool operator>(const optional& x, const T& v) +{ + return bool(x) ? *x > v : false; +} + +template constexpr bool operator<(const T& v, const optional& x) +{ + return bool(x) ? v < *x : false; +} + +template constexpr bool operator>=(const optional& x, const T& v) +{ + return bool(x) ? *x >= v : false; +} + +template constexpr bool operator<=(const T& v, const optional& x) +{ + return bool(x) ? v <= *x : false; +} + +template constexpr bool operator<=(const optional& x, const T& v) +{ + return bool(x) ? *x <= v : true; +} + +template constexpr bool operator>=(const T& v, const optional& x) +{ + return bool(x) ? v >= *x : true; +} + +// Comparison of optional with T +template constexpr bool operator==(const optional& x, const T& v) +{ + return bool(x) ? *x == v : false; +} + +template constexpr bool operator==(const T& v, const optional& x) +{ + return bool(x) ? v == *x : false; +} + +template constexpr bool operator!=(const optional& x, const T& v) +{ + return bool(x) ? *x != v : true; +} + +template constexpr bool operator!=(const T& v, const optional& x) +{ + return bool(x) ? v != *x : true; +} + +template constexpr bool operator<(const optional& x, const T& v) +{ + return bool(x) ? *x < v : true; +} + +template constexpr bool operator>(const T& v, const optional& x) +{ + return bool(x) ? v > *x : true; +} + +template constexpr bool operator>(const optional& x, const T& v) +{ + return bool(x) ? *x > v : false; +} + +template constexpr bool operator<(const T& v, const optional& x) +{ + return bool(x) ? v < *x : false; +} + +template constexpr bool operator>=(const optional& x, const T& v) +{ + return bool(x) ? *x >= v : false; +} + +template constexpr bool operator<=(const T& v, const optional& x) +{ + return bool(x) ? v <= *x : false; +} + +template constexpr bool operator<=(const optional& x, const T& v) +{ + return bool(x) ? *x <= v : true; +} + +template constexpr bool operator>=(const T& v, const optional& x) +{ + return bool(x) ? v >= *x : true; +} + + +// 20.5.12, Specialized algorithms +template +void swap(optional& x, optional& y) noexcept(noexcept(x.swap(y))) +{ + x.swap(y); +} + + +template +constexpr optional::type> make_optional(T&& v) +{ + return optional::type>(constexpr_forward(v)); +} + +template +constexpr optional make_optional(reference_wrapper v) +{ + return optional(v.get()); +} + + +} // namespace akrzemi +} // namespace std + + +namespace std +{ + template + struct hash> + { + typedef typename hash::result_type result_type; + typedef akrzemi::optional argument_type; + + constexpr result_type operator()(argument_type const& arg) const { + return arg ? std::hash{}(*arg) : result_type{}; + } + }; + + template + struct hash> + { + typedef typename hash::result_type result_type; + typedef akrzemi::optional argument_type; + + constexpr result_type operator()(argument_type const& arg) const { + return arg ? std::hash{}(*arg) : result_type{}; + } + }; +} + +# undef TR2_OPTIONAL_REQUIRES +# undef TR2_OPTIONAL_ASSERTED_EXPRESSION + +namespace datastax { namespace internal { + +template +using CassOptional = std::akrzemi::optional; +constexpr auto CassNullopt = std::akrzemi::nullopt; + +}} // namespace datastax::internal + +# endif /* OPTIONAL_AKRZEMI_HPP */ diff --git a/src/optional/optional_std.hpp b/src/optional/optional_std.hpp new file mode 100644 index 000000000..da42340e4 --- /dev/null +++ b/src/optional/optional_std.hpp @@ -0,0 +1,14 @@ +# ifndef OPTIONAL_STD_HPP +# define OPTIONAL_STD_HPP + +# include + +namespace datastax { namespace internal { + +template +using CassOptional = std::optional; +constexpr auto CassNullopt = std::nullopt; + +}} // namespace datastax::internal + +# endif /* OPTIONAL_STD_HPP */ From 92eb3c7dbe22222df230e3f6595cef4a58725c63 Mon Sep 17 00:00:00 2001 From: Juliusz Stasiewicz Date: Tue, 9 Jun 2020 19:16:33 +0200 Subject: [PATCH 2/9] Added class `ShardingInfo` --- src/sharding_info.cpp | 80 +++++++++++++++++++++++++++++++++++++++++++ src/sharding_info.hpp | 59 +++++++++++++++++++++++++++++++ 2 files changed, 139 insertions(+) create mode 100644 src/sharding_info.cpp create mode 100644 src/sharding_info.hpp diff --git a/src/sharding_info.cpp b/src/sharding_info.cpp new file mode 100644 index 000000000..114842821 --- /dev/null +++ b/src/sharding_info.cpp @@ -0,0 +1,80 @@ +/* + * Copyright (C) 2020 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "sharding_info.hpp" + +#include + +namespace datastax { namespace internal { namespace core { + +const String ShardingInfo::SCYLLA_SHARD_PARAM_KEY = "SCYLLA_SHARD"; +const String ShardingInfo::SCYLLA_NR_SHARDS_PARAM_KEY = "SCYLLA_NR_SHARDS"; +const String ShardingInfo::SCYLLA_PARTITIONER = "SCYLLA_PARTITIONER"; +const String ShardingInfo::SCYLLA_SHARDING_ALGORITHM = "SCYLLA_SHARDING_ALGORITHM"; +const String ShardingInfo::SCYLLA_SHARDING_IGNORE_MSB = "SCYLLA_SHARDING_IGNORE_MSB"; + +ShardingInfo::ShardingInfo(size_t shards_count, String partitioner, String sharding_algorithm, int sharding_ignore_MSB) noexcept + : shards_count_(shards_count) + , partitioner_(std::move(partitioner)) + , sharding_algorithm_(std::move(sharding_algorithm)) + , sharding_ignore_MSB_ (sharding_ignore_MSB) {} + +size_t ShardingInfo::get_shards_count() const { + return shards_count_; +} + +int32_t ShardingInfo::shard_id(int64_t token) const { + token += std::numeric_limits::min(); + token <<= sharding_ignore_MSB_; + const int64_t tokLo = token & 0xffffffffL; + const int64_t tokHi = (token >> 32) & 0xffffffffL; + const int64_t mul1 = tokLo * shards_count_; + const int64_t mul2 = tokHi * shards_count_; // logically shifted 32 bits + const int64_t sum = (mul1 >> 32) + mul2; + return (int32_t) (sum >> 32); +} + +CassOptional ShardingInfo::parse_sharding_info(const StringMultimap& params) { + const auto shard_id = parse_int(params, SCYLLA_SHARD_PARAM_KEY); + const auto shards_count = parse_int(params, SCYLLA_NR_SHARDS_PARAM_KEY); + const auto partitioner = parse_string(params, SCYLLA_PARTITIONER); + const auto sharding_algorithm = parse_string(params, SCYLLA_SHARDING_ALGORITHM); + const auto sharding_ignore_MSB = parse_int(params, SCYLLA_SHARDING_IGNORE_MSB); + + if (!shard_id || !shards_count || !partitioner || !sharding_algorithm || !sharding_ignore_MSB + || *partitioner != "org.apache.cassandra.dht.Murmur3Partitioner" + || *sharding_algorithm != "biased-token-round-robin") { + return CassNullopt; + } + return ConnectionShardingInfo{*shard_id, ShardingInfo{(size_t)(*shards_count), *partitioner, *sharding_algorithm, *sharding_ignore_MSB}}; +} + +CassOptional ShardingInfo::parse_string(const StringMultimap& params, const String& key) { + if (!params.count(key) || params.at(key).size() != 1u) { + return CassNullopt; + } + return params.at(key)[0]; +} + +CassOptional ShardingInfo::parse_int(const StringMultimap& params, const String& key) { + const auto val = parse_string(params, key); + if (!val) { + return CassNullopt; + } + return std::atoi(val->c_str()); +} + +}}} // namespace datastax::internal::core diff --git a/src/sharding_info.hpp b/src/sharding_info.hpp new file mode 100644 index 000000000..cceb2cff5 --- /dev/null +++ b/src/sharding_info.hpp @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2020 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef DATASTAX_INTERNAL_SHARDING_INFO_HPP +#define DATASTAX_INTERNAL_SHARDING_INFO_HPP + +#include "decoder.hpp" +#include "optional.hpp" + +namespace datastax { namespace internal { namespace core { + +struct ConnectionShardingInfo; + +class ShardingInfo final { +public: + size_t get_shards_count() const; + int32_t shard_id(int64_t token) const; + + static CassOptional parse_sharding_info(const StringMultimap& params); + +private: + ShardingInfo(size_t shards_count, String partitioner, String sharding_algorithm, int sharding_ignore_MSB) noexcept; + + static const String SCYLLA_SHARD_PARAM_KEY; + static const String SCYLLA_NR_SHARDS_PARAM_KEY; + static const String SCYLLA_PARTITIONER; + static const String SCYLLA_SHARDING_ALGORITHM; + static const String SCYLLA_SHARDING_IGNORE_MSB; + + static CassOptional parse_string(const StringMultimap& params, const String& key); + static CassOptional parse_int(const StringMultimap& params, const String& key); + + size_t shards_count_; + String partitioner_; + String sharding_algorithm_; + int sharding_ignore_MSB_; +}; + +struct ConnectionShardingInfo final { + int32_t shard_id; + ShardingInfo sharding_info; +}; + +}}} // namespace datastax::internal::core + +#endif From f68c5d11511448c72e66e96c2f6e45306a1819c1 Mon Sep 17 00:00:00 2001 From: Juliusz Stasiewicz Date: Sun, 14 Jun 2020 15:57:01 +0200 Subject: [PATCH 3/9] Added `shard_id` to Connection and `ShardingInfo` field to Host. --- src/connection.hpp | 8 ++++++++ src/host.hpp | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/src/connection.hpp b/src/connection.hpp index d8878f027..f5b9cdfec 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -18,6 +18,7 @@ #include "request_callback.hpp" #include "socket.hpp" #include "stream_manager.hpp" +#include "optional.hpp" #ifndef DATASTAX_INTERNAL_CONNECTION_HPP #define DATASTAX_INTERNAL_CONNECTION_HPP @@ -195,6 +196,11 @@ class Connection : public RefCounted { */ void set_listener(ConnectionListener* listener = NULL); + int32_t shard_id() const { return shard_id_; } + void set_shard_id(int32_t shard_id) { + shard_id_ = shard_id; + } + /** * Start heartbeats to keep the connection alive and to detect a network or * server-side failure. @@ -241,6 +247,8 @@ class Connection : public RefCounted { ProtocolVersion protocol_version_; String keyspace_; + int32_t shard_id_ = 0; + unsigned int idle_timeout_secs_; unsigned int heartbeat_interval_secs_; bool heartbeat_outstanding_; diff --git a/src/host.hpp b/src/host.hpp index 28665aeed..e8c8c66fd 100644 --- a/src/host.hpp +++ b/src/host.hpp @@ -29,6 +29,8 @@ #include "scoped_ptr.hpp" #include "spin_lock.hpp" #include "vector.hpp" +#include "sharding_info.hpp" +#include "optional.hpp" #include #include @@ -124,6 +126,11 @@ class Host : public RefCounted { dc_id_ = dc_id; } + CassOptional sharding_info() const { return sharding_info_opt_; } + void set_sharding_info(ShardingInfo si) { + sharding_info_opt_ = std::move(si); + } + const String& partitioner() const { return partitioner_; } const Vector& tokens() const { return tokens_; } @@ -212,6 +219,7 @@ class Host : public RefCounted { Vector tokens_; Atomic connection_count_; Atomic inflight_request_count_; + CassOptional sharding_info_opt_; ScopedPtr latency_tracker_; From ae957dc20985af6e23347036a9bf627d150144a8 Mon Sep 17 00:00:00 2001 From: Juliusz Stasiewicz Date: Sun, 14 Jun 2020 16:01:48 +0200 Subject: [PATCH 4/9] cosmetics: unified order of includes and include guards --- src/connection.hpp | 6 +++--- src/connector.hpp | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/connection.hpp b/src/connection.hpp index f5b9cdfec..4dcdf18b3 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -14,15 +14,15 @@ limitations under the License. */ +#ifndef DATASTAX_INTERNAL_CONNECTION_HPP +#define DATASTAX_INTERNAL_CONNECTION_HPP + #include "event_response.hpp" #include "request_callback.hpp" #include "socket.hpp" #include "stream_manager.hpp" #include "optional.hpp" -#ifndef DATASTAX_INTERNAL_CONNECTION_HPP -#define DATASTAX_INTERNAL_CONNECTION_HPP - namespace datastax { namespace internal { namespace core { class ResponseMessage; diff --git a/src/connector.hpp b/src/connector.hpp index 7e22e72c6..0b3af70ec 100644 --- a/src/connector.hpp +++ b/src/connector.hpp @@ -14,14 +14,14 @@ limitations under the License. */ +#ifndef DATASTAX_INTERNAL_CONNECTION_CONNECTOR_HPP +#define DATASTAX_INTERNAL_CONNECTION_CONNECTOR_HPP + #include "auth.hpp" #include "callback.hpp" #include "connection.hpp" #include "socket_connector.hpp" -#ifndef DATASTAX_INTERNAL_CONNECTION_CONNECTOR_HPP -#define DATASTAX_INTERNAL_CONNECTION_CONNECTOR_HPP - namespace datastax { namespace internal { namespace core { class AuthResponseRequest; From 47511c51798d331050624150f2e8dbdfb9260a6b Mon Sep 17 00:00:00 2001 From: Juliusz Stasiewicz Date: Sun, 14 Jun 2020 16:16:38 +0200 Subject: [PATCH 5/9] Added ProtocolVersion::supports_sharding() --- src/protocol.cpp | 5 +++++ src/protocol.hpp | 7 +++++++ 2 files changed, 12 insertions(+) diff --git a/src/protocol.cpp b/src/protocol.cpp index 787282a84..31f9502d1 100644 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -92,3 +92,8 @@ bool ProtocolVersion::supports_result_metadata_id() const { assert(value_ > 0 && "Invalid protocol version"); return is_protocol_at_least_v5_or_dse_v2(value_); } + +bool ProtocolVersion::supports_sharding() const { + assert(value_ > 0 && "Invalid protocol version"); + return *this >= ProtocolVersion(CASS_PROTOCOL_VERSION_V4); +} diff --git a/src/protocol.hpp b/src/protocol.hpp index 739ec9a49..4a4e43505 100644 --- a/src/protocol.hpp +++ b/src/protocol.hpp @@ -131,6 +131,13 @@ class ProtocolVersion { */ bool supports_result_metadata_id() const; + /** + * Scylla-specific: check to see if shard-awareness is supported by the current protocol version. + * + * @return true if supported, otherwise false. + */ + bool supports_sharding() const; + public: bool operator<(ProtocolVersion version) const { return value_ < version.value_; } bool operator>(ProtocolVersion version) const { return value_ > version.value_; } From ff30f9953de447817b7cef9aa034030e67a86338 Mon Sep 17 00:00:00 2001 From: Juliusz Stasiewicz Date: Sun, 14 Jun 2020 18:05:38 +0200 Subject: [PATCH 6/9] Setting ShardingInfo on Host and shard_id on ctrl. connection --- src/connector.cpp | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/connector.cpp b/src/connector.cpp index 76cf411b5..b5ff74d94 100644 --- a/src/connector.cpp +++ b/src/connector.cpp @@ -282,6 +282,18 @@ void Connector::on_supported(ResponseMessage* response) { SupportedResponse* supported = static_cast(response->response_body().get()); supported_options_ = supported->supported_options(); + if (connection_->protocol_version().supports_sharding()) { + auto conn_sharding_info_opt = ShardingInfo::parse_sharding_info(supported_options_); + if (conn_sharding_info_opt) { + connection_->set_shard_id(conn_sharding_info_opt->shard_id); + connection_->host()->set_sharding_info(std::move(conn_sharding_info_opt->sharding_info)); + } else { + LOG_ERROR("Could not retrieve sharding info from control connection to %s." + " Continuing WITHOUT SHARD-AWARENESS.", + connection_->address().to_string().c_str()); + } + } + connection_->write_and_flush(RequestCallback::Ptr(new StartupCallback( this, Request::ConstPtr(new StartupRequest(settings_.application_name, settings_.application_version, settings_.client_id, From 138e47522c97fcd3bb77d2399539dc07656be6d8 Mon Sep 17 00:00:00 2001 From: Juliusz Stasiewicz Date: Fri, 26 Jun 2020 05:50:34 +0200 Subject: [PATCH 7/9] Added routing to shards on top of TokenAware policy At this point metrics indicate that cross-shard ops are reduced, but the implementation is still raw. The reasons: When connection-to-the-right-shard is being searched among per-host connections, it is done by linear search. Connecting logic does not attempt to reconnect until all shards are hit. Topology change events are not accounted for. --- src/connection_pool.cpp | 24 ++++++++++++++++++------ src/connection_pool.hpp | 2 +- src/connection_pool_manager.cpp | 4 ++-- src/connection_pool_manager.hpp | 9 ++++++++- src/pooled_connection.hpp | 2 ++ src/request_handler.cpp | 11 ++++++++++- src/request_processor.cpp | 4 ++-- 7 files changed, 43 insertions(+), 13 deletions(-) diff --git a/src/connection_pool.cpp b/src/connection_pool.cpp index d22d73c11..ced2c4502 100644 --- a/src/connection_pool.cpp +++ b/src/connection_pool.cpp @@ -95,13 +95,25 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo } } -PooledConnection::Ptr ConnectionPool::find_least_busy() const { - PooledConnection::Vec::const_iterator it = - std::min_element(connections_.begin(), connections_.end(), least_busy_comp); - if (it == connections_.end() || (*it)->is_closing()) { - return PooledConnection::Ptr(); +PooledConnection::Ptr ConnectionPool::find_least_busy(int64_t token) const { + if (token == CASS_INT64_MIN) { + PooledConnection::Vec::const_iterator it = + std::min_element(connections_.begin(), connections_.end(), least_busy_comp); + if (it == connections_.end() || (*it)->is_closing()) { + return PooledConnection::Ptr(); + } + return *it; + } + + const auto desired_shard_num = host_->sharding_info()->shard_id(token); + const auto conn_it = std::find_if(connections_.begin(), connections_.end(), [desired_shard_num] (const PooledConnection::Ptr& c) { + return c->shard_id() == desired_shard_num; + }); + if (conn_it != connections_.end()) { + return *conn_it; } - return *it; + + return find_least_busy(CASS_INT64_MIN); } bool ConnectionPool::has_connections() const { return !connections_.empty(); } diff --git a/src/connection_pool.hpp b/src/connection_pool.hpp index cd6a5e89c..db3e2c184 100644 --- a/src/connection_pool.hpp +++ b/src/connection_pool.hpp @@ -135,7 +135,7 @@ class ConnectionPool : public RefCounted { * * @return The least busy connection or null if no connection is available. */ - PooledConnection::Ptr find_least_busy() const; + PooledConnection::Ptr find_least_busy(int64_t token) const; /** * Determine if the pool has any valid connections. diff --git a/src/connection_pool_manager.cpp b/src/connection_pool_manager.cpp index 74c50243f..fa8872162 100644 --- a/src/connection_pool_manager.cpp +++ b/src/connection_pool_manager.cpp @@ -62,12 +62,12 @@ ConnectionPoolManager::ConnectionPoolManager(const ConnectionPool::Map& pools, u } } -PooledConnection::Ptr ConnectionPoolManager::find_least_busy(const Address& address) const { +PooledConnection::Ptr ConnectionPoolManager::find_least_busy(const Address& address, int64_t token) const { ConnectionPool::Map::const_iterator it = pools_.find(address); if (it == pools_.end()) { return PooledConnection::Ptr(); } - return it->second->find_least_busy(); + return it->second->find_least_busy(token); } bool ConnectionPoolManager::has_connections(const Address& address) const { diff --git a/src/connection_pool_manager.hpp b/src/connection_pool_manager.hpp index 1633e1cde..e734c7ddd 100644 --- a/src/connection_pool_manager.hpp +++ b/src/connection_pool_manager.hpp @@ -84,7 +84,14 @@ class ConnectionPoolManager * @return The least busy connection for a host or null if no connections are * available. */ - PooledConnection::Ptr find_least_busy(const Address& address) const; + PooledConnection::Ptr find_least_busy(const Address& address, int64_t token) const; + + /** + * Non-token-aware version of `find_least_busy()`, kept only for testing. + */ + PooledConnection::Ptr find_least_busy(const Address& address) const { + return find_least_busy(address, CASS_INT64_MIN); + } /** * Determine if a pool has any valid connections. diff --git a/src/pooled_connection.hpp b/src/pooled_connection.hpp index 2a902da2b..c6de6b9d9 100644 --- a/src/pooled_connection.hpp +++ b/src/pooled_connection.hpp @@ -79,6 +79,8 @@ class PooledConnection */ bool is_closing() const; + int32_t shard_id() const { return connection_->shard_id(); } + public: const String& keyspace() const { return connection_->keyspace(); } // Test only diff --git a/src/request_handler.cpp b/src/request_handler.cpp index 30b2e4d1e..9591006f5 100644 --- a/src/request_handler.cpp +++ b/src/request_handler.cpp @@ -29,6 +29,7 @@ #include "result_response.hpp" #include "row.hpp" #include "session.hpp" +#include "token_map_impl.hpp" #include @@ -357,8 +358,16 @@ void RequestHandler::internal_retry(RequestExecution* request_execution) { bool is_done = false; while (!is_done && request_execution->current_host()) { + int64_t token = CASS_INT64_MIN; + const RoutableRequest* routable_req = dynamic_cast(request()); + if (routable_req) { + String routing_key; + routable_req->get_routing_key(&routing_key); + token = Murmur3Partitioner::hash(routing_key); + } + PooledConnection::Ptr connection = - manager_->find_least_busy(request_execution->current_host()->address()); + manager_->find_least_busy(request_execution->current_host()->address(), token); if (connection) { int32_t result = connection->write(request_execution); diff --git a/src/request_processor.cpp b/src/request_processor.cpp index 47e22fbca..261bbd190 100644 --- a/src/request_processor.cpp +++ b/src/request_processor.cpp @@ -378,7 +378,7 @@ bool RequestProcessor::on_prepare_all(const RequestHandler::Ptr& request_handler PrepareAllCallback::Ptr prepare_all_callback( new PrepareAllCallback(address, prepare_all_handler)); - PooledConnection::Ptr connection(connection_pool_manager_->find_least_busy(address)); + PooledConnection::Ptr connection(connection_pool_manager_->find_least_busy(address, CASS_INT64_MIN)); if (connection) { connection->write(prepare_all_callback.get()); } @@ -580,7 +580,7 @@ bool RequestProcessor::write_wait_callback(const RequestHandler::Ptr& request_ha const Host::Ptr& current_host, const RequestCallback::Ptr& callback) { PooledConnection::Ptr connection( - connection_pool_manager_->find_least_busy(current_host->address())); + connection_pool_manager_->find_least_busy(current_host->address(), CASS_INT64_MIN)); if (connection && connection->write(callback.get()) > 0) { // Stop the original request timer now that we have a response and // are waiting for the maximum wait time of the handler. From 690db35f871cbc8c15728e3a674ff833f772e01a Mon Sep 17 00:00:00 2001 From: Juliusz Stasiewicz Date: Wed, 19 Aug 2020 10:59:27 +0200 Subject: [PATCH 8/9] Signal usage of shard-aware driver --- src/cluster.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/cluster.cpp b/src/cluster.cpp index f03eaabd0..63b3e621c 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -25,6 +25,8 @@ #include "speculative_execution.hpp" #include "utils.hpp" +#include + using namespace datastax; using namespace datastax::internal::core; @@ -235,6 +237,10 @@ Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* list , local_dc_(local_dc) , supported_options_(supported_options) , is_recording_events_(settings.disable_events_on_startup) { + static const auto optimized_msg = "===== Using optimized driver!!! =====\n"; + std::cout << optimized_msg; + LOG_INFO(optimized_msg); + inc_ref(); connection_->set_listener(this); From ddec8591920b5a7cf4b32e25de1376d9ec6512b0 Mon Sep 17 00:00:00 2001 From: Juliusz Stasiewicz Date: Wed, 19 Aug 2020 10:59:49 +0200 Subject: [PATCH 9/9] connection_pool: picking the right connection and reconnecting The reconnection is being scheduled until we reach the desired number of connections per shard. The other change is that instead of picking the 'least busy' connection from the host's pool, we pick the least busy one from the "shards pool". --- src/connection_pool.cpp | 105 ++++++++++++++++++++++++++-------------- src/connection_pool.hpp | 6 ++- 2 files changed, 75 insertions(+), 36 deletions(-) diff --git a/src/connection_pool.cpp b/src/connection_pool.cpp index ced2c4502..aee0ac273 100644 --- a/src/connection_pool.cpp +++ b/src/connection_pool.cpp @@ -22,6 +22,7 @@ #include "utils.hpp" #include +#include using namespace datastax; using namespace datastax::internal::core; @@ -77,46 +78,70 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo set_pointer_keys(reconnection_schedules_); set_pointer_keys(to_flush_); + if (host->sharding_info()) { + const auto hosts_shard_cnt = host->sharding_info()->get_shards_count(); + connections_by_shard_.resize(hosts_shard_cnt); + num_connections_per_shard_ = settings_.num_connections_per_host / hosts_shard_cnt + + (settings_.num_connections_per_host % hosts_shard_cnt ? 1u : 0u); + } else { + connections_by_shard_.resize(1); + num_connections_per_shard_ = settings_.num_connections_per_host; + } + for (Connection::Vec::const_iterator it = connections.begin(), end = connections.end(); it != end; ++it) { const Connection::Ptr& connection(*it); if (!connection->is_closing()) { - add_connection(PooledConnection::Ptr(new PooledConnection(this, connection))); + if (connections_by_shard_[connection->shard_id()].size() < num_connections_per_shard_) { + add_connection(PooledConnection::Ptr(new PooledConnection(this, connection))); + } else { + connection->close(); + } } } notify_up_or_down(); // We had non-critical errors or some connections closed - assert(connections.size() <= settings_.num_connections_per_host); - size_t needed = settings_.num_connections_per_host - connections_.size(); + size_t needed = num_connections_per_shard_ * connections_by_shard_.size() + - std::accumulate(connections_by_shard_.begin(), connections_by_shard_.end(), 0u, + [] (size_t acc, const PooledConnection::Vec& v) { + return acc + v.size(); + }); for (size_t i = 0; i < needed; ++i) { schedule_reconnect(); } } PooledConnection::Ptr ConnectionPool::find_least_busy(int64_t token) const { - if (token == CASS_INT64_MIN) { - PooledConnection::Vec::const_iterator it = - std::min_element(connections_.begin(), connections_.end(), least_busy_comp); - if (it == connections_.end() || (*it)->is_closing()) { - return PooledConnection::Ptr(); + if (token == CASS_INT64_MIN || !host_->sharding_info()) { + // We got a placeholder token, or a sensible token that is useless without the sharding info. + // In both cases we return the least busy connection of the *entire pool* (or NULL). + PooledConnection::Ptr least_busy; // NULL by default + for (const auto& shard_pool : connections_by_shard_) { + for (const auto& conn : shard_pool) { + if (!conn->is_closing()) { + least_busy = least_busy ? std::min(least_busy, conn, least_busy_comp) : conn; + } + } } - return *it; + return least_busy; } - const auto desired_shard_num = host_->sharding_info()->shard_id(token); - const auto conn_it = std::find_if(connections_.begin(), connections_.end(), [desired_shard_num] (const PooledConnection::Ptr& c) { - return c->shard_id() == desired_shard_num; - }); - if (conn_it != connections_.end()) { - return *conn_it; + // Otherwise, find the least busy connection pointing to the right shard (if possible) + const auto& shard_pool = connections_by_shard_[host_->sharding_info()->shard_id(token)]; + PooledConnection::Vec::const_iterator it = + std::min_element(shard_pool.begin(), shard_pool.end(), least_busy_comp); + if (it == shard_pool.end() || (*it)->is_closing()) { + return find_least_busy(CASS_INT64_MIN); } - - return find_least_busy(CASS_INT64_MIN); + return *it; } -bool ConnectionPool::has_connections() const { return !connections_.empty(); } +bool ConnectionPool::has_connections() const { + return std::any_of(connections_by_shard_.begin(), connections_by_shard_.end(), + [] (const PooledConnection::Vec& v) { return !v.empty(); }); +} void ConnectionPool::flush() { for (DenseHashSet::const_iterator it = to_flush_.begin(), @@ -154,8 +179,9 @@ void ConnectionPool::close_connection(PooledConnection* connection, Protected) { if (metrics_) { metrics_->total_connections.dec(); } - connections_.erase(std::remove(connections_.begin(), connections_.end(), connection), - connections_.end()); + auto& shard_pool = connections_by_shard_[connection->shard_id()]; + shard_pool.erase(std::remove(shard_pool.begin(), shard_pool.end(), connection), + shard_pool.end()); to_flush_.erase(connection); if (close_state_ != CLOSE_STATE_OPEN) { @@ -173,16 +199,16 @@ void ConnectionPool::add_connection(const PooledConnection::Ptr& connection) { if (metrics_) { metrics_->total_connections.inc(); } - connections_.push_back(connection); + const size_t new_connections_shard = connection->shard_id(); + LOG_INFO("add_connection: to host %s to shard %ld", host_->address_string().c_str(), new_connections_shard); + connections_by_shard_[new_connections_shard].push_back(connection); } void ConnectionPool::notify_up_or_down() { - if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) && - connections_.empty()) { + if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) && !has_connections()) { notify_state_ = NOTIFY_STATE_DOWN; listener_->on_pool_down(host_->address()); - } else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) && - !connections_.empty()) { + } else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) && has_connections()) { notify_state_ = NOTIFY_STATE_UP; listener_->on_pool_up(host_->address()); } @@ -223,11 +249,12 @@ void ConnectionPool::internal_close() { // Make copies of connection/connector data structures to prevent iterator // invalidation. - PooledConnection::Vec connections(connections_); - for (PooledConnection::Vec::iterator it = connections.begin(), end = connections.end(); - it != end; ++it) { - (*it)->close(); - } + auto connections_per_shards = connections_by_shard_; + std::for_each(connections_per_shards.begin(), connections_per_shards.end(), [] (PooledConnection::Vec& v) { + for (auto& c : v) { + c->close(); + } + }); DelayedConnector::Vec pending_connections(pending_connections_); for (DelayedConnector::Vec::iterator it = pending_connections.begin(), @@ -244,8 +271,7 @@ void ConnectionPool::internal_close() { void ConnectionPool::maybe_closed() { // Remove the pool once all current connections and pending connections // are terminated. - if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && connections_.empty() && - pending_connections_.empty()) { + if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && !has_connections() && pending_connections_.empty()) { close_state_ = CLOSE_STATE_CLOSED; // Only mark DOWN if it's UP otherwise we might get multiple DOWN events // when connecting the pool. @@ -275,9 +301,18 @@ void ConnectionPool::on_reconnect(DelayedConnector* connector) { } if (connector->is_ok()) { - add_connection( - PooledConnection::Ptr(new PooledConnection(this, connector->release_connection()))); - notify_up_or_down(); + PooledConnection::Ptr pooled_conn {new PooledConnection(this, connector->release_connection())}; + const size_t new_connections_shard = pooled_conn->shard_id(); + if (connections_by_shard_.size() > new_connections_shard + && connections_by_shard_[new_connections_shard].size() < num_connections_per_shard_) { + add_connection(pooled_conn); + notify_up_or_down(); + } else { + LOG_INFO("Reconnection to host %s connected us to shard %ld, reconnecting again", + address().to_string().c_str(), new_connections_shard); + pooled_conn->close(); + schedule_reconnect(schedule.release()); + } } else if (!connector->is_canceled()) { if (connector->is_critical_error()) { LOG_ERROR("Closing established connection pool to host %s because of the following error: %s", diff --git a/src/connection_pool.hpp b/src/connection_pool.hpp index db3e2c184..4dac7466e 100644 --- a/src/connection_pool.hpp +++ b/src/connection_pool.hpp @@ -213,6 +213,9 @@ class ConnectionPool : public RefCounted { private: void notify_up_or_down(); void notify_critical_error(Connector::ConnectionError code, const String& message); + + /** Adds connection to the pool. It's the caller's responsibility + * to keep track of the connections count. */ void add_connection(const PooledConnection::Ptr& connection); void schedule_reconnect(ReconnectionSchedule* schedule = NULL); void internal_close(); @@ -232,7 +235,8 @@ class ConnectionPool : public RefCounted { CloseState close_state_; NotifyState notify_state_; - PooledConnection::Vec connections_; + std::vector connections_by_shard_; /// Index is the shard ID + size_t num_connections_per_shard_; DelayedConnector::Vec pending_connections_; DenseHashSet to_flush_; };