From fb6efe4e70412e1214946cc00f8ac4c2894880d3 Mon Sep 17 00:00:00 2001
From: olegkubrakov
Date: Mon, 4 Nov 2024 16:21:55 -0800
Subject: [PATCH] Update Channel Monitor without broadcasting transactions

Allow a ChannelMonitor to be updated without executing any commands
outside of the ChannelMonitor itself. This lets MonitorUpdatingPersister
read and update a ChannelMonitor without a broadcaster and fee
estimator, and broadcast the resulting claims later, once the node is
ready for it. (A usage sketch follows the diff.)
---
 lightning/src/chain/channelmonitor.rs | 378 ++++++++++++++++++++++----
 1 file changed, 326 insertions(+), 52 deletions(-)

diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs
index 2d76c09f1bb..ffc721e43fe 100644
--- a/lightning/src/chain/channelmonitor.rs
+++ b/lightning/src/chain/channelmonitor.rs
@@ -1548,7 +1548,42 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
 	{
 		let mut inner = self.inner.lock().unwrap();
 		let logger = WithChannelMonitor::from_impl(logger, &*inner, None);
-		inner.update_monitor(updates, broadcaster, fee_estimator, &logger)
+		let res = inner.update_monitor_opt_broadcast(updates, Some((broadcaster, fee_estimator)), &logger);
+		match res {
+			Ok(results) => {
+				if !results.1 {
+					return Err(())
+				}
+				for res in results.0 {
+					match res {
+						Err(_) => return Err(()),
+						Ok(claims) => assert!(claims.is_empty()),
+					}
+				}
+				Ok(())
+			},
+			Err(()) => Err(()),
+		}
 	}
+
+	/// Updates a ChannelMonitor on the basis of some new information provided by the Channel
+	/// itself. Does not broadcast any on-chain (L1) transactions; instead, any claim
+	/// transactions generated are returned so the caller can broadcast them at its convenience.
+	///
+	/// Panics if the given update is not the next update by update_id.
+	pub fn update_monitor_without_broadcast<B: Deref, F: Deref, L: Deref>(
+		&self,
+		updates: &ChannelMonitorUpdate,
+		logger: &L,
+	) -> Result<(Vec<Result<Vec<PackageTemplate>, ()>>, bool), ()>
+	where
+		B::Target: BroadcasterInterface,
+		F::Target: FeeEstimator,
+		L::Target: Logger,
+	{
+		let mut inner = self.inner.lock().unwrap();
+		let logger = WithChannelMonitor::from_impl(logger, &*inner, None);
+		inner.update_monitor_opt_broadcast::<B, F, L>(updates, None, &logger)
+	}

 	/// Gets the update_id from the latest ChannelMonitorUpdate which was applied to this
@@ -3046,6 +3081,94 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
 			}
 		}
 	}
+
+	/// Provides a payment_hash->payment_preimage mapping. Will be automatically pruned when all
+	/// commitment_tx_infos which contain the payment hash have been revoked.
+	///
+	/// Note that this is often called multiple times for the same payment and must be idempotent.
+	fn provide_payment_preimage_without_broadcasting<L: Deref>(
+		&mut self, payment_hash: &PaymentHash, payment_preimage: &PaymentPreimage,
+		payment_info: &Option<PaymentClaimDetails>, logger: &WithChannelMonitor<L>,
+	) -> Vec<PackageTemplate>
+	where L::Target: Logger,
+	{
+		let mut claimable_htlcs = vec![];
+		self.payment_preimages.entry(payment_hash.clone())
+			.and_modify(|(_, payment_infos)| {
+				if let Some(payment_info) = payment_info {
+					if !payment_infos.contains(&payment_info) {
+						payment_infos.push(payment_info.clone());
+					}
+				}
+			})
+			.or_insert_with(|| {
+				(payment_preimage.clone(), payment_info.clone().into_iter().collect())
+			});
+
+		let confirmed_spend_txid = self.funding_spend_confirmed.or_else(|| {
+			self.onchain_events_awaiting_threshold_conf.iter().find_map(|event| match event.event {
+				OnchainEvent::FundingSpendConfirmation { .. } => Some(event.txid),
+				_ => None,
+			})
+		});
+		let confirmed_spend_txid = if let Some(txid) = confirmed_spend_txid {
+			txid
+		} else {
+			return claimable_htlcs;
+		};
+
+		if let Some(txid) = self.current_counterparty_commitment_txid {
+			if txid == confirmed_spend_txid {
+				if let Some(commitment_number) = self.counterparty_commitment_txn_on_chain.get(&txid) {
+					let (htlc_claim_reqs, _) = self.get_counterparty_output_claim_info(*commitment_number, txid, None, self.counterparty_claimable_outpoints.get(&txid));
+					claimable_htlcs.extend(htlc_claim_reqs);
+				} else {
+					debug_assert!(false);
+					log_error!(logger, "Detected counterparty commitment tx on-chain without tracking commitment number");
+				}
+				return claimable_htlcs;
+			}
+		}
+		if let Some(txid) = self.prev_counterparty_commitment_txid {
+			if txid == confirmed_spend_txid {
+				if let Some(commitment_number) = self.counterparty_commitment_txn_on_chain.get(&txid) {
+					let (htlc_claim_reqs, _) = self.get_counterparty_output_claim_info(*commitment_number, txid, None, self.counterparty_claimable_outpoints.get(&txid));
+					claimable_htlcs.extend(htlc_claim_reqs);
+				} else {
+					debug_assert!(false);
+					log_error!(logger, "Detected counterparty commitment tx on-chain without tracking commitment number");
+				}
+				return claimable_htlcs;
+			}
+		}
+
+		// Then, if a holder commitment transaction has been seen on-chain, generate claim
+		// requests for the HTLC outputs of the confirmed holder commitment transaction.
+		// Note that we can't just use `self.holder_tx_signed`, because that only covers the case
+		// where *we* sign a holder commitment transaction, not when e.g. a watchtower broadcasts
+		// one of our holder commitment transactions.
+		if self.broadcasted_holder_revokable_script.is_some() {
+			let holder_commitment_tx = if self.current_holder_commitment_tx.txid == confirmed_spend_txid {
+				Some(&self.current_holder_commitment_tx)
+			} else if let Some(prev_holder_commitment_tx) = &self.prev_holder_signed_commitment_tx {
+				if prev_holder_commitment_tx.txid == confirmed_spend_txid {
+					Some(prev_holder_commitment_tx)
+				} else {
+					None
+				}
+			} else {
+				None
+			};
+			if let Some(holder_commitment_tx) = holder_commitment_tx {
+				// Assume that the broadcasted commitment transaction confirmed in the current best
+				// block. Even if not, it's a reasonable metric for the bump criteria on the HTLC
+				// transactions.
+				let (claim_reqs, _) = self.get_broadcasted_holder_claims(&holder_commitment_tx, self.best_block.height);
+				claimable_htlcs.extend(claim_reqs);
+			}
+		}
+		claimable_htlcs
+	}

 	fn generate_claimable_outpoints_and_watch_outputs(&mut self, reason: ClosureReason) -> (Vec<PackageTemplate>, Vec<TransactionOutputs>) {
 		let funding_outp = HolderFundingOutput::build(
@@ -3109,32 +3232,46 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
 		);
 	}

-	fn update_monitor<B: Deref, F: Deref, L: Deref>(
-		&mut self, updates: &ChannelMonitorUpdate, broadcaster: &B, fee_estimator: &F, logger: &WithChannelMonitor<L>
-	) -> Result<(), ()>
+	/// Applies the given monitor updates, optionally broadcasting claim transactions.
+	///
+	/// If `should_broadcast_claims` is provided, any claim transactions generated while applying
+	/// the updates are broadcast immediately. Otherwise they are returned to the caller, to be
+	/// broadcast when the node is ready.
+	///
+	/// # Arguments
+	/// * `updates` - The [`ChannelMonitorUpdate`] containing the updates to apply
+	/// * `should_broadcast_claims` - Optional tuple of (broadcaster, fee_estimator) used to
+	///   broadcast claim transactions. If None, claim transactions will be generated but not
+	///   broadcast.
+	/// * `logger` - Logger to use for logging update information
+	///
+	/// # Returns
+	/// Returns a tuple containing:
+	/// * A vector of results, one per update step, where each result is either:
+	///   - Ok(Vec<PackageTemplate>) - The claim transactions that should be broadcast for this
+	///     update
+	///   - Err(()) - If the update failed to apply
+	/// * A boolean indicating whether further updates can be applied:
+	///   - true if the updates were successful and more can be applied
+	///   - false if an error occurred and no more updates should be accepted
+	fn update_monitor_opt_broadcast<B: Deref, F: Deref, L: Deref>(
+		&mut self, updates: &ChannelMonitorUpdate, should_broadcast_claims: Option<(&B, &F)>, logger: &WithChannelMonitor<L>
+	) -> Result<(Vec<Result<Vec<PackageTemplate>, ()>>, bool), ()>
 	where B::Target: BroadcasterInterface,
 		F::Target: FeeEstimator,
 		L::Target: Logger,
 	{
-		if self.latest_update_id == CLOSED_CHANNEL_UPDATE_ID && updates.update_id == CLOSED_CHANNEL_UPDATE_ID {
-			log_info!(logger, "Applying post-force-closed update to monitor {} with {} change(s).",
-				log_funding_info!(self), updates.updates.len());
-		} else if updates.update_id == CLOSED_CHANNEL_UPDATE_ID {
-			log_info!(logger, "Applying force close update to monitor {} with {} change(s).",
-				log_funding_info!(self), updates.updates.len());
-		} else {
-			log_info!(logger, "Applying update to monitor {}, bringing update_id from {} to {} with {} change(s).",
-				log_funding_info!(self), self.latest_update_id, updates.update_id, updates.updates.len());
-		}
+		self.log_update_monitor(updates, logger);

-		if updates.counterparty_node_id.is_some() {
-			if self.counterparty_node_id.is_none() {
-				self.counterparty_node_id = updates.counterparty_node_id;
-			} else {
-				debug_assert_eq!(self.counterparty_node_id, updates.counterparty_node_id);
-			}
-		}
+		// Populate `counterparty_node_id` for legacy monitors.
+		match (updates.counterparty_node_id, self.counterparty_node_id) {
+			(Some(_), None) => self.counterparty_node_id = updates.counterparty_node_id,
+			(Some(counterparty_node_id_from_updates), Some(current_counterparty_node_id)) =>
+				debug_assert_eq!(counterparty_node_id_from_updates, current_counterparty_node_id),
+			_ => {},
+		}

+		// Check if `updates` can be applied.
 		// ChannelMonitor updates may be applied after force close if we receive a preimage for a
 		// broadcasted commitment transaction HTLC output that we'd like to claim on-chain. If this
 		// is the case, we no longer have guaranteed access to the monitor's update ID, so we use a
@@ -3159,26 +3296,47 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
 		} else if self.latest_update_id + 1 != updates.update_id {
 			panic!("Attempted to apply ChannelMonitorUpdates out of order, check the update_id before passing an update to update_monitor!");
 		}
-		let mut ret = Ok(());
-		let bounded_fee_estimator = LowerBoundedFeeEstimator::new(&**fee_estimator);
+
+		// Process each update in the ChannelMonitorUpdate, executing the appropriate
+		// handler based on the update type.
+		// For each update:
+		// - Executes the corresponding handler function
+		//   (provide_latest_holder_commitment_tx, provide_payment_preimage, etc.)
+		// - Returns a Result<Vec<PackageTemplate>, ()> containing any claim transactions that
+		//   should be broadcast
+		// - Collects the results into the update_results vector to track success/failure of
+		//   each step
+		let mut update_results = Vec::new();
 		for update in updates.updates.iter() {
-			match update {
+			let update_result: Result<Vec<PackageTemplate>, ()> = match update {
 				ChannelMonitorUpdateStep::LatestHolderCommitmentTXInfo { commitment_tx, htlc_outputs, claimed_htlcs, nondust_htlc_sources } => {
 					log_trace!(logger, "Updating ChannelMonitor with latest holder commitment transaction info");
 					if self.lockdown_from_offchain { panic!(); }
 					if let Err(e) = self.provide_latest_holder_commitment_tx(commitment_tx.clone(), htlc_outputs.clone(), &claimed_htlcs, nondust_htlc_sources.clone()) {
 						log_error!(logger, "Providing latest holder commitment transaction failed/was refused:");
 						log_error!(logger, " {}", e);
-						ret = Err(());
+						Err(())
+					} else {
+						Ok(vec![])
 					}
 				}
 				ChannelMonitorUpdateStep::LatestCounterpartyCommitmentTXInfo { commitment_txid, htlc_outputs, commitment_number, their_per_commitment_point, .. } => {
 					log_trace!(logger, "Updating ChannelMonitor with latest counterparty commitment transaction info");
-					self.provide_latest_counterparty_commitment_tx(*commitment_txid, htlc_outputs.clone(), *commitment_number, *their_per_commitment_point, logger)
+					self.provide_latest_counterparty_commitment_tx(*commitment_txid, htlc_outputs.clone(), *commitment_number, *their_per_commitment_point, logger);
+					Ok(vec![])
 				},
 				ChannelMonitorUpdateStep::PaymentPreimage { payment_preimage, payment_info } => {
 					log_trace!(logger, "Updating ChannelMonitor with payment preimage");
-					self.provide_payment_preimage(&PaymentHash(Sha256::hash(&payment_preimage.0[..]).to_byte_array()), &payment_preimage, payment_info, broadcaster, &bounded_fee_estimator, logger)
+					let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0[..]).to_byte_array());
+					match should_broadcast_claims {
+						Some((broadcaster, fee_estimator)) => {
+							let bounded_fee_estimator = LowerBoundedFeeEstimator::new(&**fee_estimator);
+							self.provide_payment_preimage(&payment_hash, &payment_preimage, payment_info, broadcaster, &bounded_fee_estimator, logger);
+							Ok(vec![])
+						},
+						None => {
+							Ok(self.provide_payment_preimage_without_broadcasting(&payment_hash, &payment_preimage, payment_info, logger))
+						}
+					}
 				},
 				ChannelMonitorUpdateStep::CommitmentSecret { idx, secret } => {
 					log_trace!(logger, "Updating ChannelMonitor with commitment secret");
@@ -3186,64 +3344,100 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitorImpl<Signer> {
 						debug_assert!(false, "Latest counterparty commitment secret was invalid");
 						log_error!(logger, "Providing latest counterparty commitment secret failed/was refused:");
 						log_error!(logger, " {}", e);
-						ret = Err(());
+						Err(())
+					} else {
+						Ok(vec![])
 					}
 				},
 				ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast } => {
 					log_trace!(logger, "Updating ChannelMonitor: channel force closed, should broadcast: {}", should_broadcast);
 					self.lockdown_from_offchain = true;
-					if *should_broadcast {
+					let claimable_outpoints = if *should_broadcast {
 						// There's no need to broadcast our commitment transaction if we've seen one
 						// confirmed (even with 1 confirmation) as it'll be rejected as
 						// duplicate/conflicting.
-						let detected_funding_spend = self.funding_spend_confirmed.is_some() ||
-							self.onchain_events_awaiting_threshold_conf.iter().find(|event| match event.event {
-								OnchainEvent::FundingSpendConfirmation { .. } => true,
-								_ => false,
-							}).is_some();
-						if detected_funding_spend {
+						if self.detected_funding_spend() {
 							log_trace!(logger, "Avoiding commitment broadcast, already detected confirmed spend onchain");
+							// Note: `continue` skips recording a per-update result for this step.
 							continue;
 						}
-						self.queue_latest_holder_commitment_txn_for_broadcast(broadcaster, &bounded_fee_estimator, logger);
-					} else if !self.holder_tx_signed {
-						log_error!(logger, "WARNING: You have a potentially-unsafe holder commitment transaction available to broadcast");
-						log_error!(logger, " in channel monitor for channel {}!", &self.channel_id());
-						log_error!(logger, " Read the docs for ChannelMonitor::broadcast_latest_holder_commitment_txn to take manual action!");
+
+						match should_broadcast_claims {
+							Some((broadcaster, fee_estimator)) => {
+								let bounded_fee_estimator = LowerBoundedFeeEstimator::new(&**fee_estimator);
+								self.queue_latest_holder_commitment_txn_for_broadcast(broadcaster, &bounded_fee_estimator, logger);
+								vec![]
+							},
+							None => self.generate_claimable_outpoints_and_watch_outputs(ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) }).0,
+						}
 					} else {
-						// If we generated a MonitorEvent::HolderForceClosed, the ChannelManager
-						// will still give us a ChannelForceClosed event with !should_broadcast, but we
-						// shouldn't print the scary warning above.
-						log_info!(logger, "Channel off-chain state closed after we broadcasted our latest commitment transaction.");
-					}
+						if !self.holder_tx_signed {
+							log_error!(logger, "WARNING: You have a potentially-unsafe holder commitment transaction available to broadcast");
+							log_error!(logger, " in channel monitor for channel {}!", &self.channel_id());
+							log_error!(logger, " Read the docs for ChannelMonitor::broadcast_latest_holder_commitment_txn to take manual action!");
+						} else {
+							// If we generated a MonitorEvent::HolderForceClosed, the ChannelManager
+							// will still give us a ChannelForceClosed event with !should_broadcast, but we
+							// shouldn't print the scary warning above.
+							log_info!(logger, "Channel off-chain state closed after we broadcasted our latest commitment transaction.");
+						}
+						vec![]
+					};
+					Ok(claimable_outpoints)
 				},
 				ChannelMonitorUpdateStep::ShutdownScript { scriptpubkey } => {
 					log_trace!(logger, "Updating ChannelMonitor with shutdown script");
 					if let Some(shutdown_script) = self.shutdown_script.replace(scriptpubkey.clone()) {
 						panic!("Attempted to replace shutdown script {} with {}", shutdown_script, scriptpubkey);
 					}
+					Ok(vec![])
 				},
-			}
+			};
+			update_results.push(update_result);
 		}

 		#[cfg(debug_assertions)] {
 			self.counterparty_commitment_txs_from_update(updates);
 		}

+		// Used to signal to the caller that an error happened and no further updates can be
+		// applied. Usually this means that the channel state is corrupted and the channel
+		// should be force closed.
+		let mut next_updates_allowed = !update_results.iter().any(|r| r.is_err());
+
 		// If the updates succeeded and we were in an already closed channel state, then there's no
 		// need to refuse any updates we expect to receive after seeing a confirmed commitment.
-		if ret.is_ok() && updates.update_id == CLOSED_CHANNEL_UPDATE_ID && self.latest_update_id == updates.update_id {
-			return Ok(());
+		if next_updates_allowed && updates.update_id == CLOSED_CHANNEL_UPDATE_ID && self.latest_update_id == updates.update_id {
+			return Ok((update_results, next_updates_allowed));
 		}

 		self.latest_update_id = updates.update_id;

 		// Refuse updates after we've detected a spend onchain, but only if we haven't processed a
 		// force closed monitor update yet.
-		if ret.is_ok() && self.funding_spend_seen && self.latest_update_id != CLOSED_CHANNEL_UPDATE_ID {
+		if next_updates_allowed && self.funding_spend_seen && self.latest_update_id != CLOSED_CHANNEL_UPDATE_ID {
 			log_error!(logger, "Refusing Channel Monitor Update as counterparty attempted to update commitment after funding was spent");
-			Err(())
-		} else { ret }
+			next_updates_allowed = false;
+		}
+		Ok((update_results, next_updates_allowed))
+	}
+
+	fn log_update_monitor<L: Deref>(
+		&self, updates: &ChannelMonitorUpdate, logger: &WithChannelMonitor<L>,
+	) where L::Target: Logger {
+		if self.latest_update_id == CLOSED_CHANNEL_UPDATE_ID && updates.update_id == CLOSED_CHANNEL_UPDATE_ID {
+			log_info!(logger, "Applying post-force-closed update to monitor {} with {} change(s).",
+				log_funding_info!(self), updates.updates.len());
+		} else if updates.update_id == CLOSED_CHANNEL_UPDATE_ID {
+			log_info!(logger, "Applying force close update to monitor {} with {} change(s).",
+				log_funding_info!(self), updates.updates.len());
+		} else {
+			log_info!(logger, "Applying update to monitor {}, bringing update_id from {} to {} with {} change(s).",
+				log_funding_info!(self), self.latest_update_id, updates.update_id, updates.updates.len());
+		}
+	}
+
+	fn detected_funding_spend(&self) -> bool {
+		let funding_spend_on_chain = self.onchain_events_awaiting_threshold_conf.iter()
+			.any(|event| matches!(event.event, OnchainEvent::FundingSpendConfirmation { .. }));
+		self.funding_spend_confirmed.is_some() || funding_spend_on_chain
 	}

 	fn get_latest_update_id(&self) -> u64 {
@@ -5076,7 +5270,7 @@ mod tests {
 	use crate::ln::channel_keys::{DelayedPaymentBasepoint, DelayedPaymentKey, HtlcBasepoint, RevocationBasepoint, RevocationKey};
 	use crate::ln::chan_utils::{self,HTLCOutputInCommitment, ChannelPublicKeys, ChannelTransactionParameters, HolderCommitmentTransaction, CounterpartyChannelTransactionParameters};
 	use crate::ln::channelmanager::{PaymentSendFailure, PaymentId, RecipientOnionFields};
-	use crate::ln::functional_test_utils::*;
+	use crate::ln::functional_test_utils::{self, *};
 	use crate::ln::script::ShutdownScript;
 	use crate::util::errors::APIError;
 	use crate::util::test_utils::{TestLogger, TestBroadcaster, TestFeeEstimator};
@@ -5175,10 +5369,90 @@ mod tests {
 		check_spends!(htlc_txn[0], broadcast_tx);
 		check_spends!(htlc_txn[1], broadcast_tx);
 	}
+
+	fn do_test_funding_spend_refuses_updates_without_broadcast(use_local_txn: bool) {
+		// Previously, monitor updates were allowed freely even after a funding-spend transaction
+		// confirmed. This would allow a race condition where we could receive a payment (including
+		// the counterparty revoking their broadcasted state!) and accept it without recourse as
+		// long as the ChannelMonitor receives the block first, the full commitment update dance
+		// occurs after the block is connected, and before the ChannelManager receives the block.
+		// Obviously this is an incredibly contrived race given the counterparty would be risking
+		// their full channel balance for it, but it's worth fixing nonetheless as it makes the
+		// potential ChannelMonitor states simpler to reason about.
+		//
+		// This test checks said behavior, as well as ensuring a ChannelMonitorUpdate with multiple
+		// updates is handled correctly in such conditions.
+		let chanmon_cfgs = create_chanmon_cfgs(3);
+		let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+		let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+		let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+		let channel = create_announced_chan_between_nodes(&nodes, 0, 1);
+		create_announced_chan_between_nodes(&nodes, 1, 2);
+
+		// Rebalance somewhat
+		send_payment(&nodes[0], &[&nodes[1]], 10_000_000);
+
+		// First route two payments for testing at the end
+		let payment_preimage_1 = route_payment(&nodes[0], &[&nodes[1], &nodes[2]], 1_000_000).0;
+		let payment_preimage_2 = route_payment(&nodes[0], &[&nodes[1], &nodes[2]], 1_000_000).0;
+
+		let local_txn = get_local_commitment_txn!(nodes[1], channel.2);
+		assert_eq!(local_txn.len(), 1);
+		let remote_txn = get_local_commitment_txn!(nodes[0], channel.2);
+		assert_eq!(remote_txn.len(), 3); // Commitment and two HTLC-Timeouts
+		check_spends!(remote_txn[1], remote_txn[0]);
+		check_spends!(remote_txn[2], remote_txn[0]);
+		let broadcast_tx = if use_local_txn { &local_txn[0] } else { &remote_txn[0] };
+
+		// Connect a commitment transaction, but only to the ChainMonitor/ChannelMonitor. The
+		// channel is now closed, but the ChannelManager doesn't know that yet.
+		let new_header = create_dummy_header(nodes[0].best_block_info().0, 0);
+		let conf_height = nodes[0].best_block_info().1 + 1;
+		nodes[1].chain_monitor.chain_monitor.transactions_confirmed(&new_header,
+			&[(0, broadcast_tx)], conf_height);
+
+		let (_, pre_update_monitor) = <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
+			&mut io::Cursor::new(&get_monitor!(nodes[1], channel.2).encode()),
+			(&nodes[1].keys_manager.backing, &nodes[1].keys_manager.backing)).unwrap();
+
+		// If the ChannelManager tries to update the channel, however, the ChainMonitor will pass
+		// the update through to the ChannelMonitor which will refuse it (as the channel is closed).
+		let (route, payment_hash, _, payment_secret) = get_route_and_payment_hash!(nodes[1], nodes[0], 100_000);
+		unwrap_send_err!(nodes[1].node.send_payment_with_route(route, payment_hash,
+				RecipientOnionFields::secret_only(payment_secret), PaymentId(payment_hash.0)
+			), false, APIError::MonitorUpdateInProgress, {});
+		check_added_monitors!(nodes[1], 1);
+
+		// Build a new ChannelMonitorUpdate which contains both the failing commitment tx update
+		// and provides the payment preimages for the two pending HTLCs. The first update generates
+		// an error, but the point of this test is to ensure the later updates are still applied.
+		let monitor_updates = nodes[1].chain_monitor.monitor_updates.lock().unwrap();
+		let mut replay_update = monitor_updates.get(&channel.2).unwrap().iter().rev().next().unwrap().clone();
+		assert_eq!(replay_update.updates.len(), 1);
+		if let ChannelMonitorUpdateStep::LatestCounterpartyCommitmentTXInfo { .. } = replay_update.updates[0] {
+		} else { panic!(); }
+		replay_update.updates.push(ChannelMonitorUpdateStep::PaymentPreimage {
+			payment_preimage: payment_preimage_1, payment_info: None,
+		});
+		replay_update.updates.push(ChannelMonitorUpdateStep::PaymentPreimage {
+			payment_preimage: payment_preimage_2, payment_info: None,
+		});
+
+		let res = pre_update_monitor.update_monitor_without_broadcast::<&TestBroadcaster, &TestFeeEstimator, _>(
+			&replay_update, &nodes[1].logger).unwrap();
+		// The monitor already saw the funding spend on-chain, so it refuses any further updates.
+		assert!(!res.1);
+
+		// Even though the commitment tx update was refused, we should still have generated an
+		// HTLC claim for each of the two pending HTLCs.
+		let generated_claims = res.0;
+		assert!(generated_claims.iter().fold(0, |len, r| len + r.as_ref().unwrap_or(&vec![]).len()) >= 2);
+	}
+
 	#[test]
 	fn test_funding_spend_refuses_updates() {
 		do_test_funding_spend_refuses_updates(true);
 		do_test_funding_spend_refuses_updates(false);
+		do_test_funding_spend_refuses_updates_without_broadcast(true);
+		do_test_funding_spend_refuses_updates_without_broadcast(false);
 	}

 	#[test]
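
Usage sketch (reviewer note, not part of the patch): the fragment below shows how a caller such as MonitorUpdatingPersister might apply an update with the new API before a broadcaster or fee estimator is available, collecting the generated claims to act on later. It is modeled on the test above; `monitor`, `update`, and `logger` are assumed to already be in scope (for example, a monitor deserialized as in the test), and the turbofish types are only needed because no broadcaster is passed.

	// Apply the update without broadcasting; claims are returned instead of being
	// handed to a broadcaster.
	let (step_results, more_updates_allowed) = monitor
		.update_monitor_without_broadcast::<&TestBroadcaster, &TestFeeEstimator, _>(
			&update, &logger)
		.expect("update_id must be the next update in sequence");

	// Nothing has been broadcast yet: gather the claims each update step produced so
	// they can be handed to on-chain handling once the node is ready.
	let mut pending_claims = Vec::new();
	for step in step_results {
		match step {
			Ok(claims) => pending_claims.extend(claims),
			// A failed step does not abort the whole update; later steps (e.g. a
			// PaymentPreimage) may still have produced claims we must not lose.
			Err(()) => { /* surface the per-step failure to the caller */ },
		}
	}

	if !more_updates_allowed {
		// The monitor refused further updates (e.g. the funding output was already
		// spent on-chain); treat the channel as closed from here on.
	}

The per-step vector is what makes the deferred-broadcast mode useful: a refused commitment update no longer discards the claims generated by preimage steps in the same ChannelMonitorUpdate.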
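For reference, the compatibility contract the reworked `update_monitor` wrapper enforces can be read in isolation as follows. This is only a sketch of the same logic as the first hunk: `results` stands in for the tuple returned by `update_monitor_opt_broadcast`, and the early returns assume a surrounding function returning `Result<(), ()>`.

	let (step_results, next_updates_allowed) = results;
	// If the monitor refuses further updates, the whole update fails, as before.
	if !next_updates_allowed {
		return Err(());
	}
	for step in step_results {
		// Any per-step error fails the whole update, matching the old Result<(), ()> API.
		let claims = step.map_err(|_| ())?;
		// When a broadcaster/fee estimator pair was supplied, each step broadcasts its
		// own claims internally, so no deferred claims may be returned here.
		assert!(claims.is_empty());
	}
	Ok(())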