From 7a06bf10a34fa90a94a11c0cb97df351c9c57d09 Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Mon, 29 Jul 2024 10:54:41 +0800 Subject: [PATCH 1/5] Refactor code and update last_ckp_ts, since delta checkpoint is also checkpoint Signed-off-by: Jin Hai --- src/storage/txn/txn_manager.cpp | 22 ++++++++++++---------- src/storage/txn/txn_manager.cppm | 4 ++-- src/storage/wal/wal_manager.cpp | 1 + 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/src/storage/txn/txn_manager.cpp b/src/storage/txn/txn_manager.cpp index 3b943eec30..f74e05fe26 100644 --- a/src/storage/txn/txn_manager.cpp +++ b/src/storage/txn/txn_manager.cpp @@ -58,35 +58,35 @@ Txn *TxnManager::BeginTxn(UniquePtr txn_text, bool ckp_txn) { u64 new_txn_id = ++catalog_->next_txn_id_; // Record the start ts of the txn - TxnTimeStamp ts = ++start_ts_; + TxnTimeStamp begin_ts = ++start_ts_; if (ckp_txn) { if (ckp_begin_ts_ != UNCOMMIT_TS) { // not set ckp_begin_ts_ may not truncate the wal file. - LOG_WARN(fmt::format("Another checkpoint txn is started in {}, new checkpoint {} will do nothing", ckp_begin_ts_, ts)); + LOG_WARN(fmt::format("Another checkpoint txn is started in {}, new checkpoint {} will do nothing", ckp_begin_ts_, begin_ts)); } else { - LOG_DEBUG(fmt::format("Checkpoint txn is started in {}", ts)); - ckp_begin_ts_ = ts; + LOG_DEBUG(fmt::format("Checkpoint txn is started in {}", begin_ts)); + ckp_begin_ts_ = begin_ts; } } // Create txn instance - auto new_txn = SharedPtr(new Txn(this, buffer_mgr_, catalog_, bg_task_processor_, new_txn_id, ts, std::move(txn_text))); + auto new_txn = SharedPtr(new Txn(this, buffer_mgr_, catalog_, bg_task_processor_, new_txn_id, begin_ts, std::move(txn_text))); // Storage txn in txn manager txn_map_[new_txn_id] = new_txn; beginned_txns_.emplace_back(new_txn); - // LOG_INFO(fmt::format("Txn: {} is Begin. begin ts: {}", new_txn_id, ts)); + // LOG_INFO(fmt::format("Txn: {} is Begin. begin ts: {}", new_txn_id, begin_ts)); return new_txn.get(); } -Txn *TxnManager::GetTxn(TransactionID txn_id) { +Txn *TxnManager::GetTxn(TransactionID txn_id) const { std::lock_guard guard(locker_); Txn *res = txn_map_.at(txn_id).get(); return res; } -TxnState TxnManager::GetTxnState(TransactionID txn_id) { +TxnState TxnManager::GetTxnState(TransactionID txn_id) const { std::lock_guard guard(locker_); auto iter = txn_map_.find(txn_id); if (iter == txn_map_.end()) { @@ -111,18 +111,20 @@ bool TxnManager::CheckIfCommitting(TransactionID txn_id, TxnTimeStamp begin_ts) } TxnTimeStamp TxnManager::GetCommitTimeStampR(Txn *txn) { + txn->SetTxnRead(); + std::lock_guard guard(locker_); TxnTimeStamp commit_ts = ++start_ts_; - txn->SetTxnRead(); return commit_ts; } TxnTimeStamp TxnManager::GetCommitTimeStampW(Txn *txn) { + txn->SetTxnWrite(); + std::lock_guard guard(locker_); TxnTimeStamp commit_ts = ++start_ts_; wait_conflict_ck_.emplace(commit_ts, nullptr); finishing_txns_.emplace(txn); - txn->SetTxnWrite(); return commit_ts; } diff --git a/src/storage/txn/txn_manager.cppm b/src/storage/txn/txn_manager.cppm index f5bcd7ef03..c1d4e19b00 100644 --- a/src/storage/txn/txn_manager.cppm +++ b/src/storage/txn/txn_manager.cppm @@ -49,9 +49,9 @@ public: Txn *BeginTxn(UniquePtr txn_text, bool ckp_txn = false); - Txn *GetTxn(TransactionID txn_id); + Txn *GetTxn(TransactionID txn_id) const; - TxnState GetTxnState(TransactionID txn_id); + TxnState GetTxnState(TransactionID txn_id) const; bool CheckIfCommitting(TransactionID txn_id, TxnTimeStamp begin_ts); diff --git a/src/storage/wal/wal_manager.cpp b/src/storage/wal/wal_manager.cpp index 785fc68aed..8d344bac0c 100644 --- a/src/storage/wal/wal_manager.cpp +++ b/src/storage/wal/wal_manager.cpp @@ -323,6 +323,7 @@ void WalManager::CheckpointInner(bool is_full_checkpoint, Txn *txn) { } else { auto new_max_commit_ts = txn->DeltaCheckpoint(); if (new_max_commit_ts == 0) { + last_ckp_ts_ = max_commit_ts; // TODO: Need to refactor return; } max_commit_ts = new_max_commit_ts; From b12f2468a51b78bfbace2caa5744c37ea9be1dfd Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Mon, 29 Jul 2024 11:52:03 +0800 Subject: [PATCH 2/5] Update rollback process Signed-off-by: Jin Hai --- src/storage/txn/txn.cpp | 11 +++++++++-- src/storage/txn/txn_manager.cpp | 33 ++++++++++++++++---------------- src/storage/txn/txn_manager.cppm | 4 +++- 3 files changed, 29 insertions(+), 19 deletions(-) diff --git a/src/storage/txn/txn.cpp b/src/storage/txn/txn.cpp index 9d478bc91c..cd599b69cb 100644 --- a/src/storage/txn/txn.cpp +++ b/src/storage/txn/txn.cpp @@ -428,8 +428,11 @@ WalEntry *Txn::GetWALEntry() const { return wal_entry_.get(); } TxnTimeStamp Txn::Commit() { if (wal_entry_->cmds_.empty() && txn_store_.ReadOnly()) { + // Read only transaction + this->SetTxnRead(); + // Don't need to write empty WalEntry (read-only transactions). - TxnTimeStamp commit_ts = txn_mgr_->GetCommitTimeStampR(this); + TxnTimeStamp commit_ts = txn_mgr_->GetNextTimestamp(); this->SetTxnCommitting(commit_ts); this->SetTxnCommitted(commit_ts); return commit_ts; @@ -509,17 +512,21 @@ void Txn::Rollback() { auto state = txn_context_.GetTxnState(); TxnTimeStamp abort_ts = 0; if (state == TxnState::kStarted) { - abort_ts = txn_mgr_->GetCommitTimeStampR(this); + // Rollback TXN before commit + abort_ts = txn_mgr_->GetNextTimestamp(); } else if (state == TxnState::kCommitting) { abort_ts = txn_context_.GetCommitTS(); } else { String error_message = fmt::format("Transaction {} state is {}.", txn_id_, ToString(state)); UnrecoverableError(error_message); } + txn_context_.SetTxnRollbacking(abort_ts); txn_store_.Rollback(txn_id_, abort_ts); + txn_context_.SetTxnRollbacked(); + LOG_TRACE(fmt::format("Txn: {} is dropped.", txn_id_)); } diff --git a/src/storage/txn/txn_manager.cpp b/src/storage/txn/txn_manager.cpp index f74e05fe26..316998adec 100644 --- a/src/storage/txn/txn_manager.cpp +++ b/src/storage/txn/txn_manager.cpp @@ -110,14 +110,6 @@ bool TxnManager::CheckIfCommitting(TransactionID txn_id, TxnTimeStamp begin_ts) return txn->CommitTS() < begin_ts; } -TxnTimeStamp TxnManager::GetCommitTimeStampR(Txn *txn) { - txn->SetTxnRead(); - - std::lock_guard guard(locker_); - TxnTimeStamp commit_ts = ++start_ts_; - return commit_ts; -} - TxnTimeStamp TxnManager::GetCommitTimeStampW(Txn *txn) { txn->SetTxnWrite(); @@ -303,25 +295,34 @@ TxnTimeStamp TxnManager::GetCleanupScanTS() { // A Txn can be deleted when there is no uncommitted txn whose begin is less than the commit ts of the txn // So maintain the least uncommitted begin ts void TxnManager::FinishTxn(Txn *txn) { + auto txn_state = txn->GetTxnState(); + auto txn_type = txn->GetTxnType(); + std::lock_guard guard(locker_); + if (txn_state == TxnState::kRollbacked) { + // Rollback by TXN self + txn_map_.erase(txn->TxnID()); + return; + } - if (txn->GetTxnType() == TxnType::kInvalid) { - String error_message = "Txn type is invalid"; - UnrecoverableError(error_message); - } else if (txn->GetTxnType() == TxnType::kRead) { + if (txn_type == TxnType::kRead) { + // Read only TXN txn_map_.erase(txn->TxnID()); return; + } else if (txn_type == TxnType::kInvalid) { + String error_message = "Txn type is invalid"; + UnrecoverableError(error_message); } TxnTimeStamp finished_ts = ++start_ts_; - auto state = txn->GetTxnState(); - if (state == TxnState::kCommitting) { + + if (txn_state == TxnState::kCommitting) { txn->SetTxnCommitted(finished_ts); finished_txns_.emplace_back(txn); - } else if (state == TxnState::kRollbacking) { + } else if (txn_state == TxnState::kRollbacking) { txn->SetTxnRollbacked(); } else { - String error_message = fmt::format("Invalid transaction status: {}", ToString(state)); + String error_message = fmt::format("Invalid transaction status: {}", ToString(txn_state)); UnrecoverableError(error_message); } SizeT remove_n = finishing_txns_.erase(txn); diff --git a/src/storage/txn/txn_manager.cppm b/src/storage/txn/txn_manager.cppm index c1d4e19b00..7b41aded8d 100644 --- a/src/storage/txn/txn_manager.cppm +++ b/src/storage/txn/txn_manager.cppm @@ -65,7 +65,9 @@ public: BGTaskProcessor *bg_task_processor() const { return bg_task_processor_; } - TxnTimeStamp GetCommitTimeStampR(Txn *txn); + TxnTimeStamp GetNextTimestamp() { + return ++ start_ts_; + } TxnTimeStamp GetCommitTimeStampW(Txn *txn); From eb181de4f6b46f190d02ae947c063a9ff652417b Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Mon, 29 Jul 2024 12:20:24 +0800 Subject: [PATCH 3/5] Refactor txn manager to store the begin ts of txn Signed-off-by: Jin Hai --- src/storage/txn/txn_manager.cpp | 32 ++++++++++++++++---------------- src/storage/txn/txn_manager.cppm | 2 +- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/storage/txn/txn_manager.cpp b/src/storage/txn/txn_manager.cpp index 316998adec..7b5a99b3dc 100644 --- a/src/storage/txn/txn_manager.cpp +++ b/src/storage/txn/txn_manager.cpp @@ -74,7 +74,8 @@ Txn *TxnManager::BeginTxn(UniquePtr txn_text, bool ckp_txn) { // Storage txn in txn manager txn_map_[new_txn_id] = new_txn; - beginned_txns_.emplace_back(new_txn); + begin_txn_by_ts_.emplace(begin_ts, new_txn_id); +// beginned_txns_.emplace_back(new_txn); // LOG_INFO(fmt::format("Txn: {} is Begin. begin ts: {}", new_txn_id, begin_ts)); return new_txn.get(); @@ -275,13 +276,8 @@ TxnTimeStamp TxnManager::GetNewTimeStamp() { return start_ts_++; } TxnTimeStamp TxnManager::GetCleanupScanTS() { std::lock_guard guard(locker_); TxnTimeStamp first_uncommitted_begin_ts = start_ts_; - while (!beginned_txns_.empty()) { - auto first_txn = beginned_txns_.front().lock(); - if (first_txn.get() != nullptr) { - first_uncommitted_begin_ts = first_txn->BeginTS(); - break; - } - beginned_txns_.pop_front(); + if (!begin_txn_by_ts_.empty()) { + first_uncommitted_begin_ts = begin_txn_by_ts_.begin()->first; } TxnTimeStamp checkpointed_ts = wal_mgr_->GetCheckpointedTS(); TxnTimeStamp res = std::min(first_uncommitted_begin_ts, checkpointed_ts); @@ -302,12 +298,14 @@ void TxnManager::FinishTxn(Txn *txn) { if (txn_state == TxnState::kRollbacked) { // Rollback by TXN self txn_map_.erase(txn->TxnID()); + begin_txn_by_ts_.erase(txn->BeginTS()); return; } if (txn_type == TxnType::kRead) { // Read only TXN txn_map_.erase(txn->TxnID()); + begin_txn_by_ts_.erase(txn->BeginTS()); return; } else if (txn_type == TxnType::kInvalid) { String error_message = "Txn type is invalid"; @@ -331,17 +329,19 @@ void TxnManager::FinishTxn(Txn *txn) { } TxnTimeStamp least_uncommitted_begin_ts = finished_ts; - while (!beginned_txns_.empty()) { - auto first_txn = beginned_txns_.front().lock(); - if (first_txn.get() == nullptr) { - beginned_txns_.pop_front(); - continue; + while (!begin_txn_by_ts_.empty()) { + auto first_txn_ts = begin_txn_by_ts_.begin()->first; + auto first_txn_id = begin_txn_by_ts_.begin()->second; + auto txn_iter = txn_map_.find(first_txn_id); + if(txn_iter == txn_map_.end()) { + UnrecoverableError(fmt::format("Txn: {} not found in txn_map", first_txn_id)); } - auto status = first_txn->GetTxnState(); + Txn* first_txn_ptr = txn_iter->second.get(); + auto status = first_txn_ptr->GetTxnState(); if (status == TxnState::kCommitted || status == TxnState::kRollbacked) { - beginned_txns_.pop_front(); + begin_txn_by_ts_.erase(first_txn_ts); } else { - least_uncommitted_begin_ts = first_txn->BeginTS(); + least_uncommitted_begin_ts = first_txn_ptr->BeginTS(); break; } } diff --git a/src/storage/txn/txn_manager.cppm b/src/storage/txn/txn_manager.cppm index 7b41aded8d..1aa69602ff 100644 --- a/src/storage/txn/txn_manager.cppm +++ b/src/storage/txn/txn_manager.cppm @@ -125,7 +125,7 @@ private: HashMap> txn_map_{}; WalManager *wal_mgr_; - Deque> beginned_txns_; // sorted by begin ts + Map begin_txn_by_ts_; HashSet finishing_txns_; // the txns in committing stage, can use flat_map Deque finished_txns_; // the txns that committed_ts From f6cb4b95a54d6ca0627792af8a3e83a7220cf144 Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Mon, 29 Jul 2024 16:01:57 +0800 Subject: [PATCH 4/5] Fix UT Signed-off-by: Jin Hai --- src/storage/txn/txn_manager.cpp | 101 ++++++++++++++++++++------------ 1 file changed, 62 insertions(+), 39 deletions(-) diff --git a/src/storage/txn/txn_manager.cpp b/src/storage/txn/txn_manager.cpp index 7b5a99b3dc..5972606d5f 100644 --- a/src/storage/txn/txn_manager.cpp +++ b/src/storage/txn/txn_manager.cpp @@ -294,41 +294,51 @@ void TxnManager::FinishTxn(Txn *txn) { auto txn_state = txn->GetTxnState(); auto txn_type = txn->GetTxnType(); + TxnTimeStamp finished_ts = ++start_ts_; + std::lock_guard guard(locker_); if (txn_state == TxnState::kRollbacked) { // Rollback by TXN self txn_map_.erase(txn->TxnID()); begin_txn_by_ts_.erase(txn->BeginTS()); - return; - } - - if (txn_type == TxnType::kRead) { - // Read only TXN - txn_map_.erase(txn->TxnID()); - begin_txn_by_ts_.erase(txn->BeginTS()); - return; - } else if (txn_type == TxnType::kInvalid) { - String error_message = "Txn type is invalid"; - UnrecoverableError(error_message); - } - TxnTimeStamp finished_ts = ++start_ts_; + // If rollback happened before commit phase, finishing_txns won't involve the txn. + // If rollback happened at the commit phase which only caused by txn conflict, the finishing_txns will involve the txn. + finishing_txns_.erase(txn); - if (txn_state == TxnState::kCommitting) { - txn->SetTxnCommitted(finished_ts); - finished_txns_.emplace_back(txn); - } else if (txn_state == TxnState::kRollbacking) { - txn->SetTxnRollbacked(); } else { - String error_message = fmt::format("Invalid transaction status: {}", ToString(txn_state)); - UnrecoverableError(error_message); - } - SizeT remove_n = finishing_txns_.erase(txn); - if (remove_n == 0) { - UnrecoverableError("Txn not found in finishing_txns_"); + // Committing status and Write TXN + switch(txn_type) { + case TxnType::kRead: { + // Read only TXN + txn_map_.erase(txn->TxnID()); + begin_txn_by_ts_.erase(txn->BeginTS()); + return; + } + case TxnType::kInvalid: { + String error_message = "Txn type is invalid"; + UnrecoverableError(error_message); + } + case TxnType::kWrite: { + break; + } + } + + if (txn_state == TxnState::kCommitting) { + txn->SetTxnCommitted(finished_ts); + finished_txns_.emplace_back(txn); + } else { + String error_message = fmt::format("Invalid transaction status: {}", ToString(txn_state)); + UnrecoverableError(error_message); + } + + SizeT remove_n = finishing_txns_.erase(txn); + if (remove_n == 0) { + UnrecoverableError("Txn not found in finishing_txns_"); + } } - TxnTimeStamp least_uncommitted_begin_ts = finished_ts; + TxnTimeStamp oldest_uncommitted_begin_ts = finished_ts; while (!begin_txn_by_ts_.empty()) { auto first_txn_ts = begin_txn_by_ts_.begin()->first; auto first_txn_id = begin_txn_by_ts_.begin()->second; @@ -338,27 +348,40 @@ void TxnManager::FinishTxn(Txn *txn) { } Txn* first_txn_ptr = txn_iter->second.get(); auto status = first_txn_ptr->GetTxnState(); - if (status == TxnState::kCommitted || status == TxnState::kRollbacked) { + + if(status == TxnState::kCommitted) { begin_txn_by_ts_.erase(first_txn_ts); - } else { - least_uncommitted_begin_ts = first_txn_ptr->BeginTS(); - break; + continue; + } + + if(status == TxnState::kRollbacked) { + UnrecoverableError(fmt::format("Rollback Txn: {}, should already be removed before.", first_txn_id)); + } + + if(status == TxnState::kInvalid) { + UnrecoverableError(fmt::format("Txn: {} has invalid txn state.", first_txn_id)); } + + oldest_uncommitted_begin_ts = first_txn_ptr->BeginTS(); + break; } while (!finished_txns_.empty()) { auto *finished_txn = finished_txns_.front(); - if (finished_txn->CommittedTS() > least_uncommitted_begin_ts) { - break; - } auto finished_txn_id = finished_txn->TxnID(); - finished_txns_.pop_front(); - - // LOG_INFO(fmt::format("Txn: {} is erased", finished_txn_id)); - SizeT remove_n = txn_map_.erase(finished_txn_id); - if (remove_n == 0) { - String error_message = fmt::format("Txn: {} not found in txn map", finished_txn_id); - UnrecoverableError(error_message); + if(finished_txn->CommittedTS() <= oldest_uncommitted_begin_ts) { + // The finished txn is committed before the oldest commit or rollback TXN ts + finished_txns_.pop_front(); + + // LOG_INFO(fmt::format("Txn: {} is erased", finished_txn_id)); + SizeT remove_n = txn_map_.erase(finished_txn_id); + if (remove_n == 0) { + String error_message = fmt::format("Txn: {} not found in txn map", finished_txn_id); + UnrecoverableError(error_message); + } + } else { + LOG_TRACE(fmt::format("{} finish TS: {}, later than {}", finished_txn_id, finished_txn->CommittedTS(), oldest_uncommitted_begin_ts)); + break; } } } From b28f1b720700a9e85d7c77f5490e9e24bb1c3187 Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Mon, 29 Jul 2024 17:10:57 +0800 Subject: [PATCH 5/5] Fix UT Signed-off-by: Jin Hai --- src/storage/txn/txn_manager.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/storage/txn/txn_manager.cpp b/src/storage/txn/txn_manager.cpp index 5972606d5f..af17507acf 100644 --- a/src/storage/txn/txn_manager.cpp +++ b/src/storage/txn/txn_manager.cpp @@ -299,8 +299,8 @@ void TxnManager::FinishTxn(Txn *txn) { std::lock_guard guard(locker_); if (txn_state == TxnState::kRollbacked) { // Rollback by TXN self - txn_map_.erase(txn->TxnID()); begin_txn_by_ts_.erase(txn->BeginTS()); + txn_map_.erase(txn->TxnID()); // If rollback happened before commit phase, finishing_txns won't involve the txn. // If rollback happened at the commit phase which only caused by txn conflict, the finishing_txns will involve the txn. @@ -311,8 +311,8 @@ void TxnManager::FinishTxn(Txn *txn) { switch(txn_type) { case TxnType::kRead: { // Read only TXN - txn_map_.erase(txn->TxnID()); begin_txn_by_ts_.erase(txn->BeginTS()); + txn_map_.erase(txn->TxnID()); return; } case TxnType::kInvalid: {