Skip to content

Commit

Permalink
Remove connection state management
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Oct 10, 2024
1 parent 4d4238f commit fa73512
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ nonstd::expected<::couchbase::collection, CouchbaseErrorType> CouchbaseClient::g
if (auto error_type = establishConnection()) {
return nonstd::make_unexpected(*error_type);
}
return cluster_.bucket(collection.bucket_name).scope(collection.scope_name).collection(collection.collection_name);
return cluster_->bucket(collection.bucket_name).scope(collection.scope_name).collection(collection.collection_name);
}

nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> CouchbaseClient::upsert(const CouchbaseCollection& collection,
Expand All @@ -47,7 +47,6 @@ nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> CouchbaseClient::ups

auto [upsert_err, upsert_resp] = collection_result->upsert<::couchbase::codec::raw_binary_transcoder>(document_id, buffer, options).get();
if (upsert_err.ec()) {
setConnectionError();
// ambiguous_timeout should not be retried as we do not know if the insert was successful or not
if (getErrorType(upsert_err.ec()) == CouchbaseErrorType::TEMPORARY && upsert_err.ec().value() != static_cast<int>(::couchbase::errc::common::ambiguous_timeout)) {
logger_->log_error("Failed to upsert document '{}' to collection '{}.{}.{}' due to temporary issue, error code: '{}', message: '{}'",
Expand All @@ -71,44 +70,24 @@ nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> CouchbaseClient::ups
}
}

void CouchbaseClient::setConnectionError() {
std::lock_guard<std::mutex> lock(state_mutex_);
state_ = State::UNKNOWN;
}

void CouchbaseClient::close() {
std::lock_guard<std::mutex> lock(state_mutex_);
if (state_ == State::CONNECTED || state_ == State::UNKNOWN) {
cluster_.close().wait();
state_ = State::DISCONNECTED;
if (cluster_) {
cluster_->close().wait();
}
}

std::optional<CouchbaseErrorType> CouchbaseClient::establishConnection() {
std::lock_guard<std::mutex> lock(state_mutex_);
if (state_ == State::CONNECTED) {
if (cluster_) {
return std::nullopt;
}

if (state_ == State::UNKNOWN) {
auto [err, upsert_resp] = cluster_.ping().get();
if (err.ec()) {
close();
state_ = State::DISCONNECTED;
} else {
state_ = State::CONNECTED;
return std::nullopt;
}
}

auto options = ::couchbase::cluster_options(username_, password_);
auto [connect_err, cluster] = ::couchbase::cluster::connect(connection_string_, options).get();
if (connect_err.ec()) {
logger_->log_error("Failed to connect to Couchbase cluster with error code: '{}' and message: '{}'", connect_err.ec(), connect_err.message());
return getErrorType(connect_err.ec());
}
cluster_ = std::move(cluster);
state_ = State::CONNECTED;
return std::nullopt;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,6 @@ enum class CouchbaseErrorType {

class CouchbaseClient {
public:
enum class State {
DISCONNECTED,
CONNECTED,
UNKNOWN,
};

CouchbaseClient(std::string connection_string, std::string username, std::string password, const std::shared_ptr<core::logging::Logger>& logger)
: connection_string_(std::move(connection_string)), username_(std::move(username)), password_(std::move(password)), logger_(logger) {
}
Expand All @@ -84,14 +78,11 @@ class CouchbaseClient {

static CouchbaseErrorType getErrorType(const std::error_code& error_code);
nonstd::expected<::couchbase::collection, CouchbaseErrorType> getCollection(const CouchbaseCollection& collection);
void setConnectionError();

std::mutex state_mutex_;
State state_ = State::DISCONNECTED;
std::string connection_string_;
std::string username_;
std::string password_;
::couchbase::cluster cluster_;
std::optional<::couchbase::cluster> cluster_;
std::shared_ptr<core::logging::Logger> logger_;
};

Expand Down

0 comments on commit fa73512

Please sign in to comment.