Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor txn commit process. #1549

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/storage/txn/txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,11 @@ WalEntry *Txn::GetWALEntry() const { return wal_entry_.get(); }

TxnTimeStamp Txn::Commit() {
if (wal_entry_->cmds_.empty() && txn_store_.ReadOnly()) {
// Read only transaction
this->SetTxnRead();

// Don't need to write empty WalEntry (read-only transactions).
TxnTimeStamp commit_ts = txn_mgr_->GetCommitTimeStampR(this);
TxnTimeStamp commit_ts = txn_mgr_->GetNextTimestamp();
this->SetTxnCommitting(commit_ts);
this->SetTxnCommitted(commit_ts);
return commit_ts;
Expand Down Expand Up @@ -509,17 +512,21 @@ void Txn::Rollback() {
auto state = txn_context_.GetTxnState();
TxnTimeStamp abort_ts = 0;
if (state == TxnState::kStarted) {
abort_ts = txn_mgr_->GetCommitTimeStampR(this);
// Rollback TXN before commit
abort_ts = txn_mgr_->GetNextTimestamp();
} else if (state == TxnState::kCommitting) {
abort_ts = txn_context_.GetCommitTS();
} else {
String error_message = fmt::format("Transaction {} state is {}.", txn_id_, ToString(state));
UnrecoverableError(error_message);
}

txn_context_.SetTxnRollbacking(abort_ts);

txn_store_.Rollback(txn_id_, abort_ts);

txn_context_.SetTxnRollbacked();

LOG_TRACE(fmt::format("Txn: {} is dropped.", txn_id_));
}

Expand Down
156 changes: 91 additions & 65 deletions src/storage/txn/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,35 +58,36 @@ Txn *TxnManager::BeginTxn(UniquePtr<String> txn_text, bool ckp_txn) {
u64 new_txn_id = ++catalog_->next_txn_id_;

// Record the start ts of the txn
TxnTimeStamp ts = ++start_ts_;
TxnTimeStamp begin_ts = ++start_ts_;
if (ckp_txn) {
if (ckp_begin_ts_ != UNCOMMIT_TS) {
// not set ckp_begin_ts_ may not truncate the wal file.
LOG_WARN(fmt::format("Another checkpoint txn is started in {}, new checkpoint {} will do nothing", ckp_begin_ts_, ts));
LOG_WARN(fmt::format("Another checkpoint txn is started in {}, new checkpoint {} will do nothing", ckp_begin_ts_, begin_ts));
} else {
LOG_DEBUG(fmt::format("Checkpoint txn is started in {}", ts));
ckp_begin_ts_ = ts;
LOG_DEBUG(fmt::format("Checkpoint txn is started in {}", begin_ts));
ckp_begin_ts_ = begin_ts;
}
}

// Create txn instance
auto new_txn = SharedPtr<Txn>(new Txn(this, buffer_mgr_, catalog_, bg_task_processor_, new_txn_id, ts, std::move(txn_text)));
auto new_txn = SharedPtr<Txn>(new Txn(this, buffer_mgr_, catalog_, bg_task_processor_, new_txn_id, begin_ts, std::move(txn_text)));

// Storage txn in txn manager
txn_map_[new_txn_id] = new_txn;
beginned_txns_.emplace_back(new_txn);
begin_txn_by_ts_.emplace(begin_ts, new_txn_id);
// beginned_txns_.emplace_back(new_txn);

// LOG_INFO(fmt::format("Txn: {} is Begin. begin ts: {}", new_txn_id, ts));
// LOG_INFO(fmt::format("Txn: {} is Begin. begin ts: {}", new_txn_id, begin_ts));
return new_txn.get();
}

Txn *TxnManager::GetTxn(TransactionID txn_id) {
Txn *TxnManager::GetTxn(TransactionID txn_id) const {
std::lock_guard guard(locker_);
Txn *res = txn_map_.at(txn_id).get();
return res;
}

TxnState TxnManager::GetTxnState(TransactionID txn_id) {
TxnState TxnManager::GetTxnState(TransactionID txn_id) const {
std::lock_guard guard(locker_);
auto iter = txn_map_.find(txn_id);
if (iter == txn_map_.end()) {
Expand All @@ -110,19 +111,13 @@ bool TxnManager::CheckIfCommitting(TransactionID txn_id, TxnTimeStamp begin_ts)
return txn->CommitTS() < begin_ts;
}

TxnTimeStamp TxnManager::GetCommitTimeStampR(Txn *txn) {
std::lock_guard guard(locker_);
TxnTimeStamp commit_ts = ++start_ts_;
txn->SetTxnRead();
return commit_ts;
}

TxnTimeStamp TxnManager::GetCommitTimeStampW(Txn *txn) {
txn->SetTxnWrite();

std::lock_guard guard(locker_);
TxnTimeStamp commit_ts = ++start_ts_;
wait_conflict_ck_.emplace(commit_ts, nullptr);
finishing_txns_.emplace(txn);
txn->SetTxnWrite();
return commit_ts;
}

Expand Down Expand Up @@ -281,13 +276,8 @@ TxnTimeStamp TxnManager::GetNewTimeStamp() { return start_ts_++; }
TxnTimeStamp TxnManager::GetCleanupScanTS() {
std::lock_guard guard(locker_);
TxnTimeStamp first_uncommitted_begin_ts = start_ts_;
while (!beginned_txns_.empty()) {
auto first_txn = beginned_txns_.front().lock();
if (first_txn.get() != nullptr) {
first_uncommitted_begin_ts = first_txn->BeginTS();
break;
}
beginned_txns_.pop_front();
if (!begin_txn_by_ts_.empty()) {
first_uncommitted_begin_ts = begin_txn_by_ts_.begin()->first;
}
TxnTimeStamp checkpointed_ts = wal_mgr_->GetCheckpointedTS();
TxnTimeStamp res = std::min(first_uncommitted_begin_ts, checkpointed_ts);
Expand All @@ -301,61 +291,97 @@ TxnTimeStamp TxnManager::GetCleanupScanTS() {
// A Txn can be deleted when there is no uncommitted txn whose begin is less than the commit ts of the txn
// So maintain the least uncommitted begin ts
void TxnManager::FinishTxn(Txn *txn) {
std::lock_guard guard(locker_);
auto txn_state = txn->GetTxnState();
auto txn_type = txn->GetTxnType();

if (txn->GetTxnType() == TxnType::kInvalid) {
String error_message = "Txn type is invalid";
UnrecoverableError(error_message);
} else if (txn->GetTxnType() == TxnType::kRead) {
TxnTimeStamp finished_ts = ++start_ts_;

std::lock_guard guard(locker_);
if (txn_state == TxnState::kRollbacked) {
// Rollback by TXN self
begin_txn_by_ts_.erase(txn->BeginTS());
txn_map_.erase(txn->TxnID());
return;
}

TxnTimeStamp finished_ts = ++start_ts_;
auto state = txn->GetTxnState();
if (state == TxnState::kCommitting) {
txn->SetTxnCommitted(finished_ts);
finished_txns_.emplace_back(txn);
} else if (state == TxnState::kRollbacking) {
txn->SetTxnRollbacked();
// If rollback happened before commit phase, finishing_txns won't involve the txn.
// If rollback happened at the commit phase which only caused by txn conflict, the finishing_txns will involve the txn.
finishing_txns_.erase(txn);

} else {
String error_message = fmt::format("Invalid transaction status: {}", ToString(state));
UnrecoverableError(error_message);
}
SizeT remove_n = finishing_txns_.erase(txn);
if (remove_n == 0) {
UnrecoverableError("Txn not found in finishing_txns_");
// Committing status and Write TXN
switch(txn_type) {
case TxnType::kRead: {
// Read only TXN
begin_txn_by_ts_.erase(txn->BeginTS());
txn_map_.erase(txn->TxnID());
return;
}
case TxnType::kInvalid: {
String error_message = "Txn type is invalid";
UnrecoverableError(error_message);
}
case TxnType::kWrite: {
break;
}
}

if (txn_state == TxnState::kCommitting) {
txn->SetTxnCommitted(finished_ts);
finished_txns_.emplace_back(txn);
} else {
String error_message = fmt::format("Invalid transaction status: {}", ToString(txn_state));
UnrecoverableError(error_message);
}

SizeT remove_n = finishing_txns_.erase(txn);
if (remove_n == 0) {
UnrecoverableError("Txn not found in finishing_txns_");
}
}

TxnTimeStamp least_uncommitted_begin_ts = finished_ts;
while (!beginned_txns_.empty()) {
auto first_txn = beginned_txns_.front().lock();
if (first_txn.get() == nullptr) {
beginned_txns_.pop_front();
TxnTimeStamp oldest_uncommitted_begin_ts = finished_ts;
while (!begin_txn_by_ts_.empty()) {
auto first_txn_ts = begin_txn_by_ts_.begin()->first;
auto first_txn_id = begin_txn_by_ts_.begin()->second;
auto txn_iter = txn_map_.find(first_txn_id);
if(txn_iter == txn_map_.end()) {
UnrecoverableError(fmt::format("Txn: {} not found in txn_map", first_txn_id));
}
Txn* first_txn_ptr = txn_iter->second.get();
auto status = first_txn_ptr->GetTxnState();

if(status == TxnState::kCommitted) {
begin_txn_by_ts_.erase(first_txn_ts);
continue;
}
auto status = first_txn->GetTxnState();
if (status == TxnState::kCommitted || status == TxnState::kRollbacked) {
beginned_txns_.pop_front();
} else {
least_uncommitted_begin_ts = first_txn->BeginTS();
break;

if(status == TxnState::kRollbacked) {
UnrecoverableError(fmt::format("Rollback Txn: {}, should already be removed before.", first_txn_id));
}

if(status == TxnState::kInvalid) {
UnrecoverableError(fmt::format("Txn: {} has invalid txn state.", first_txn_id));
}

oldest_uncommitted_begin_ts = first_txn_ptr->BeginTS();
break;
}

while (!finished_txns_.empty()) {
auto *finished_txn = finished_txns_.front();
if (finished_txn->CommittedTS() > least_uncommitted_begin_ts) {
break;
}
auto finished_txn_id = finished_txn->TxnID();
finished_txns_.pop_front();

// LOG_INFO(fmt::format("Txn: {} is erased", finished_txn_id));
SizeT remove_n = txn_map_.erase(finished_txn_id);
if (remove_n == 0) {
String error_message = fmt::format("Txn: {} not found in txn map", finished_txn_id);
UnrecoverableError(error_message);
if(finished_txn->CommittedTS() <= oldest_uncommitted_begin_ts) {
// The finished txn is committed before the oldest commit or rollback TXN ts
finished_txns_.pop_front();

// LOG_INFO(fmt::format("Txn: {} is erased", finished_txn_id));
SizeT remove_n = txn_map_.erase(finished_txn_id);
if (remove_n == 0) {
String error_message = fmt::format("Txn: {} not found in txn map", finished_txn_id);
UnrecoverableError(error_message);
}
} else {
LOG_TRACE(fmt::format("{} finish TS: {}, later than {}", finished_txn_id, finished_txn->CommittedTS(), oldest_uncommitted_begin_ts));
break;
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions src/storage/txn/txn_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ public:

Txn *BeginTxn(UniquePtr<String> txn_text, bool ckp_txn = false);

Txn *GetTxn(TransactionID txn_id);
Txn *GetTxn(TransactionID txn_id) const;

TxnState GetTxnState(TransactionID txn_id);
TxnState GetTxnState(TransactionID txn_id) const;

bool CheckIfCommitting(TransactionID txn_id, TxnTimeStamp begin_ts);

Expand All @@ -65,7 +65,9 @@ public:

BGTaskProcessor *bg_task_processor() const { return bg_task_processor_; }

TxnTimeStamp GetCommitTimeStampR(Txn *txn);
TxnTimeStamp GetNextTimestamp() {
return ++ start_ts_;
}

TxnTimeStamp GetCommitTimeStampW(Txn *txn);

Expand Down Expand Up @@ -123,7 +125,7 @@ private:
HashMap<TransactionID, SharedPtr<Txn>> txn_map_{};
WalManager *wal_mgr_;

Deque<WeakPtr<Txn>> beginned_txns_; // sorted by begin ts
Map<TxnTimeStamp, TransactionID> begin_txn_by_ts_;
HashSet<Txn *> finishing_txns_; // the txns in committing stage, can use flat_map
Deque<Txn *> finished_txns_; // the txns that committed_ts

Expand Down
1 change: 1 addition & 0 deletions src/storage/wal/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ void WalManager::CheckpointInner(bool is_full_checkpoint, Txn *txn) {
} else {
auto new_max_commit_ts = txn->DeltaCheckpoint();
if (new_max_commit_ts == 0) {
last_ckp_ts_ = max_commit_ts; // TODO: Need to refactor
return;
}
max_commit_ts = new_max_commit_ts;
Expand Down
Loading