Skip to content

Commit

Permalink
[#25652] Fix TSAN issue with accessing the PgClientSessionContext::cl…
Browse files Browse the repository at this point in the history
…ient field

Summary:
The `PgClientSessionContext::client` field is initialized once, in one thread. Other threads only read this field (and only after initialization has fully completed).
Relying on the fact that the field's value never changes, the reader threads read the field non-atomically.
However, the field's initialization is performed in a multi-threaded environment; for this purpose the `compare_exchange_strong` function is used.

But after initialization has completed, `compare_exchange_strong` is still executed, and it may write the same value back into memory. As a result, it is not safe to read the value non-atomically.

The simplest solution (which is implemented in the current diff) is to move the `client` field out of the `PgClientSessionContext` structure and store it individually in each `PgClientSession` object as `client_`.
Jira: DB-14902

Test Plan: Jenkins

Reviewers: sergei, hsunder, bkolagani, esheng

Reviewed By: sergei

Subscribers: ybase, yql

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D41372
  • Loading branch information
d-uspenskiy committed Jan 23, 2025
1 parent 06cd407 commit 9bc1dd9
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 38 deletions.
14 changes: 1 addition & 13 deletions src/yb/tserver/pg_client_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <unordered_map>
#include <unordered_set>

#include <boost/atomic.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/mem_fun.hpp>
#include <boost/multi_index_container.hpp>
Expand Down Expand Up @@ -427,7 +426,6 @@ class PgClientServiceImpl::Impl {
}),
advisory_locks_table_(client_future_),
session_context_{
.client = nullptr,
.xcluster_context = xcluster_context,
.advisory_locks_table = advisory_locks_table_,
.pg_node_level_mutation_counter = pg_node_level_mutation_counter,
Expand Down Expand Up @@ -482,7 +480,7 @@ class PgClientServiceImpl::Impl {
auto session_info = SessionInfo::Make(
txns_assignment_mutexes_[session_id % txns_assignment_mutexes_.size()],
FLAGS_pg_client_session_expiration_ms * 1ms, transaction_builder_,
SessionContext(), session_id, messenger_.scheduler());
client(), session_context_, session_id, messenger_.scheduler());
resp->set_session_id(session_id);
if (FLAGS_pg_client_use_shared_memory) {
resp->set_instance_id(instance_id_);
Expand Down Expand Up @@ -2047,16 +2045,6 @@ class PgClientServiceImpl::Impl {
std::chrono::seconds(FLAGS_check_pg_object_id_allocators_interval_secs));
}

// Returns session_context_, first making sure its `client` pointer refers to the
// object returned by client().
//
// The pointer is published with an atomic compare-and-swap from nullptr, so the
// first caller installs it and later callers only verify it.
// NOTE(review): PgClientSession::client() reads `context_.client` without atomics,
// while this CAS is executed on every call — confirm this is TSAN-clean.
const PgClientSessionContext& SessionContext() {
auto* client_ptr = &client();
// The CAS succeeds only while the field still holds nullptr; on failure
// `expected` is overwritten with the value currently stored in the field.
client::YBClient* expected = nullptr;
[[maybe_unused]] const auto exchanged =
boost::atomic_ref{session_context_.client}.compare_exchange_strong(
expected, client_ptr);
// Either this call installed the pointer, or the same pointer was already stored.
DCHECK(exchanged || expected == client_ptr);
return session_context_;
}

const TabletServerIf& tablet_server_;
std::shared_future<client::YBClient*> client_future_;
const scoped_refptr<ClockBase> clock_;
Expand Down
43 changes: 22 additions & 21 deletions src/yb/tserver/pg_client_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -896,9 +896,10 @@ void PgClientSession::ReadPointHistory::Clear() {

PgClientSession::PgClientSession(
TransactionBuilder&& transaction_builder, SharedThisSource shared_this_source,
std::reference_wrapper<const PgClientSessionContext> context, uint64_t id,
rpc::Scheduler& scheduler)
: context_(context),
client::YBClient& client, std::reference_wrapper<const PgClientSessionContext> context,
uint64_t id, rpc::Scheduler& scheduler)
: client_(client),
context_(context),
shared_this_(std::shared_ptr<PgClientSession>(std::move(shared_this_source), this)),
id_(id),
transaction_builder_(std::move(transaction_builder)),
Expand All @@ -920,7 +921,7 @@ Status PgClientSession::CreateTable(

const auto* metadata = VERIFY_RESULT(GetDdlTransactionMetadata(
req.use_transaction(), context->GetClientDeadline()));
RETURN_NOT_OK(helper.Exec(&client(), metadata, context->GetClientDeadline()));
RETURN_NOT_OK(helper.Exec(&client_, metadata, context->GetClientDeadline()));
VLOG_WITH_PREFIX(1) << __func__ << ": " << req.table_name();
const auto& indexed_table_id = helper.indexed_table_id();
if (indexed_table_id.IsValid()) {
Expand All @@ -945,7 +946,7 @@ Status PgClientSession::CreateDatabase(
.tgt_owner = req.target_owner().c_str(),
};
}
return client().CreateNamespace(
return client_.CreateNamespace(
req.database_name(), YQL_DATABASE_PGSQL, "" /* creator_role_name */,
GetPgsqlNamespaceId(req.database_oid()),
req.source_database_oid() != kPgInvalidOid ? GetPgsqlNamespaceId(req.source_database_oid())
Expand All @@ -957,7 +958,7 @@ Status PgClientSession::CreateDatabase(

Status PgClientSession::DropDatabase(
const PgDropDatabaseRequestPB& req, PgDropDatabaseResponsePB* resp, rpc::RpcContext* context) {
return client().DeleteNamespace(
return client_.DeleteNamespace(
req.database_name(),
YQL_DATABASE_PGSQL,
GetPgsqlNamespaceId(req.database_oid()),
Expand All @@ -974,7 +975,7 @@ Status PgClientSession::DropTable(
// transaction has been determined to be a success.
if (req.index()) {
client::YBTableName indexed_table;
RETURN_NOT_OK(client().DeleteIndexTable(
RETURN_NOT_OK(client_.DeleteIndexTable(
yb_table_id, &indexed_table, !YsqlDdlRollbackEnabled() /* wait */,
metadata, context->GetClientDeadline()));
indexed_table.SetIntoTableIdentifierPB(resp->mutable_indexed_table());
Expand All @@ -983,7 +984,7 @@ Status PgClientSession::DropTable(
return Status::OK();
}

RETURN_NOT_OK(client().DeleteTable(yb_table_id, !YsqlDdlRollbackEnabled(), metadata,
RETURN_NOT_OK(client_.DeleteTable(yb_table_id, !YsqlDdlRollbackEnabled(), metadata,
context->GetClientDeadline()));
table_cache().Invalidate(yb_table_id);
return Status::OK();
Expand All @@ -992,7 +993,7 @@ Status PgClientSession::DropTable(
Status PgClientSession::AlterDatabase(
const PgAlterDatabaseRequestPB& req, PgAlterDatabaseResponsePB* resp,
rpc::RpcContext* context) {
const auto alterer = client().NewNamespaceAlterer(
const auto alterer = client_.NewNamespaceAlterer(
req.database_name(), GetPgsqlNamespaceId(req.database_oid()));
alterer->SetDatabaseType(YQL_DATABASE_PGSQL);
alterer->RenameTo(req.new_name());
Expand All @@ -1002,7 +1003,7 @@ Status PgClientSession::AlterDatabase(
Status PgClientSession::AlterTable(
const PgAlterTableRequestPB& req, PgAlterTableResponsePB* resp, rpc::RpcContext* context) {
const auto table_id = PgObjectId::GetYbTableIdFromPB(req.table_id());
const auto alterer = client().NewTableAlterer(table_id);
const auto alterer = client_.NewTableAlterer(table_id);
const auto txn = VERIFY_RESULT(GetDdlTransactionMetadata(
req.use_transaction(), context->GetClientDeadline()));
if (txn) {
Expand Down Expand Up @@ -1060,7 +1061,7 @@ Status PgClientSession::AlterTable(
Status PgClientSession::TruncateTable(
const PgTruncateTableRequestPB& req, PgTruncateTableResponsePB* resp,
rpc::RpcContext* context) {
return client().TruncateTable(PgObjectId::GetYbTableIdFromPB(req.table_id()));
return client_.TruncateTable(PgObjectId::GetYbTableIdFromPB(req.table_id()));
}

Status PgClientSession::CreateReplicationSlot(
Expand Down Expand Up @@ -1105,7 +1106,7 @@ Status PgClientSession::CreateReplicationSlot(
}

uint64_t consistent_snapshot_time;
auto stream_result = VERIFY_RESULT(client().CreateCDCSDKStreamForNamespace(
auto stream_result = VERIFY_RESULT(client_.CreateCDCSDKStreamForNamespace(
GetPgsqlNamespaceId(req.database_oid()), options,
/* populate_namespace_id_as_table_id */ false,
ReplicationSlotName(req.replication_slot_name()),
Expand All @@ -1122,15 +1123,15 @@ Status PgClientSession::CreateReplicationSlot(
Status PgClientSession::DropReplicationSlot(
const PgDropReplicationSlotRequestPB& req, PgDropReplicationSlotResponsePB* resp,
rpc::RpcContext* context) {
return client().DeleteCDCStream(ReplicationSlotName(req.replication_slot_name()));
return client_.DeleteCDCStream(ReplicationSlotName(req.replication_slot_name()));
}

Status PgClientSession::WaitForBackendsCatalogVersion(
const PgWaitForBackendsCatalogVersionRequestPB& req,
PgWaitForBackendsCatalogVersionResponsePB* resp,
rpc::RpcContext* context) {
// TODO(jason): send deadline to client.
const int num_lagging_backends = VERIFY_RESULT(client().WaitForYsqlBackendsCatalogVersion(
const int num_lagging_backends = VERIFY_RESULT(client_.WaitForYsqlBackendsCatalogVersion(
req.database_oid(), req.catalog_version(), context->GetClientDeadline(),
req.requestor_pg_backend_pid()));
resp->set_num_lagging_backends(num_lagging_backends);
Expand All @@ -1140,7 +1141,7 @@ Status PgClientSession::WaitForBackendsCatalogVersion(
Status PgClientSession::BackfillIndex(
const PgBackfillIndexRequestPB& req, PgBackfillIndexResponsePB* resp,
rpc::RpcContext* context) {
return client().BackfillIndex(
return client_.BackfillIndex(
PgObjectId::GetYbTableIdFromPB(req.table_id()), /* wait= */ true,
context->GetClientDeadline());
}
Expand All @@ -1152,7 +1153,7 @@ Status PgClientSession::CreateTablegroup(
auto tablespace_id = PgObjectId::FromPB(req.tablespace_id());
const auto* metadata = VERIFY_RESULT(GetDdlTransactionMetadata(
true /* use_transaction */, context->GetClientDeadline()));
auto s = client().CreateTablegroup(
auto s = client_.CreateTablegroup(
req.database_name(),
GetPgsqlNamespaceId(id.database_oid),
id.GetYbTablegroupId(),
Expand All @@ -1178,7 +1179,7 @@ Status PgClientSession::DropTablegroup(
const auto* metadata = VERIFY_RESULT(GetDdlTransactionMetadata(
true /* use_transaction */, context->GetClientDeadline()));
const auto status =
client().DeleteTablegroup(GetPgsqlTablegroupId(id.database_oid, id.object_oid), metadata);
client_.DeleteTablegroup(GetPgsqlTablegroupId(id.database_oid, id.object_oid), metadata);
if (status.IsNotFound()) {
return Status::OK();
}
Expand Down Expand Up @@ -1389,7 +1390,7 @@ Status PgClientSession::DdlAtomicityFinishTransaction(
// If we failed to report the status of this DDL transaction, we can just log and ignore it,
// as the poller in the YB-Master will figure out the status of this transaction using the
// transaction status tablet and PG catalog.
ERROR_NOT_OK(client().ReportYsqlDdlTxnStatus(*metadata, *commit),
ERROR_NOT_OK(client_.ReportYsqlDdlTxnStatus(*metadata, *commit),
Format("Sending ReportYsqlDdlTxnStatus call of $0 failed", *commit));
}

Expand All @@ -1404,7 +1405,7 @@ Status PgClientSession::DdlAtomicityFinishTransaction(
// (commit.has_value() is false), the purpose is to use the side effect of
// WaitForDdlVerificationToFinish to trigger the start of a background task to
// complete the DDL transaction at the DocDB side.
ERROR_NOT_OK(client().WaitForDdlVerificationToFinish(*metadata),
ERROR_NOT_OK(client_.WaitForDdlVerificationToFinish(*metadata),
"WaitForDdlVerificationToFinish call failed");
}
}
Expand Down Expand Up @@ -1911,7 +1912,7 @@ Status PgClientSession::InsertSequenceTuple(
PgObjectId table_oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid);
auto result = table_cache().Get(table_oid.GetYbTableId());
if (!result.ok()) {
RETURN_NOT_OK(CreateSequencesDataTable(&client(), context->GetClientDeadline()));
RETURN_NOT_OK(CreateSequencesDataTable(&client_, context->GetClientDeadline()));
// Try one more time.
result = table_cache().Get(table_oid.GetYbTableId());
}
Expand Down Expand Up @@ -2328,7 +2329,7 @@ client::YBSessionPtr& PgClientSession::EnsureSession(
PgClientSessionKind kind, CoarseTimePoint deadline, std::optional<uint64_t> read_time) {
auto& session = Session(kind);
if (!session) {
session = CreateSession(&client(), deadline, clock());
session = CreateSession(&client_, deadline, clock());
} else {
session->SetDeadline(deadline);
}
Expand Down
7 changes: 3 additions & 4 deletions src/yb/tserver/pg_client_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ YB_DEFINE_ENUM(PgClientSessionKind, (kPlain)(kDdl)(kCatalog)(kSequence)(kPgSessi
YB_STRONGLY_TYPED_BOOL(IsDDL);

struct PgClientSessionContext {
client::YBClient* client;
const TserverXClusterContextIf* xcluster_context;
YsqlAdvisoryLocksTable& advisory_locks_table;
PgMutationCounter* pg_node_level_mutation_counter;
Expand Down Expand Up @@ -171,8 +170,8 @@ class PgClientSession final {

PgClientSession(
TransactionBuilder&& transaction_builder, SharedThisSource shared_this_source,
std::reference_wrapper<const PgClientSessionContext> context, uint64_t id,
rpc::Scheduler& scheduler);
client::YBClient& client, std::reference_wrapper<const PgClientSessionContext> context,
uint64_t id, rpc::Scheduler& scheduler);

uint64_t id() const { return id_; }

Expand Down Expand Up @@ -317,7 +316,6 @@ class PgClientSession final {

void ScheduleBigSharedMemExpirationCheck(std::chrono::steady_clock::duration delay);

auto& client() { return *context_.client; }
const auto* xcluster_context() const { return context_.xcluster_context; }
auto& advisory_locks_table() { return context_.advisory_locks_table; }
auto* pg_node_level_mutation_counter() { return context_.pg_node_level_mutation_counter; }
Expand Down Expand Up @@ -348,6 +346,7 @@ class PgClientSession final {
std::unordered_map<uint64_t, ConsistentReadPoint::Momento> read_points_;
};

client::YBClient& client_;
const PgClientSessionContext& context_;
const std::weak_ptr<PgClientSession> shared_this_;
const uint64_t id_;
Expand Down

0 comments on commit 9bc1dd9

Please sign in to comment.