diff --git a/tests/09_1-push_many_files_no_ipfs.test.sh b/tests/09_1-push_many_files_no_ipfs.test.sh index 82cc64229..8c199ad5c 100755 --- a/tests/09_1-push_many_files_no_ipfs.test.sh +++ b/tests/09_1-push_many_files_no_ipfs.test.sh @@ -4,6 +4,9 @@ set -o pipefail . ./util.sh set -x +FILES_CNT=500 +BATCH_SIZE=200 + # Test 19_1 pushes many files not to ipfs # It checks that contracts correctly work with big amount of files, especially that they don't exceed their balances # while processing big amount of diffs (that's why it is important to have files not in ipfs) and correctly ask for @@ -12,7 +15,7 @@ set -x # 2. Create dev branch and push a big amount of files to one commit # 3. Clone the repo and check dev branch to have an appropriate number of files -REPO_NAME="repo19_$(date +%s)" +REPO_NAME="repo09_1_$(date +%s)" [ -d $REPO_NAME ] && rm -rf $REPO_NAME [ -d $REPO_NAME"-clone" ] && rm -rf $REPO_NAME"-clone" @@ -33,22 +36,16 @@ git config user.email "foo@bar.com" git config user.name "My name" git branch -m main -echo 1 > 1.txt -git add 1.txt -git commit -m init -git push -u origin main - -git checkout -b dev echo "***** Generating files *****" if [[ "$VERSION" =~ "v4_x" ]]; then FILES_CNT=504 for n in {1..500}; do - echo "$n$n$n" > "$n.txt" + echo "$n$n$n" > "$n.txt" done else - FILES_CNT=304 - for n in {1..300}; do - echo "$n$n$n" > "$n.txt" + for n in $(seq 1 $FILES_CNT); do + num=$(printf "%03d" ${n}) + echo $num > "${num}.txt" done fi @@ -57,7 +54,7 @@ echo $(ls -la | wc -l) echo "***** Pushing file to the repo *****" git add * git commit -m push -git push -u origin dev +GOSH_TRACE=1 GOSH_PUSH_CHUNK=$BATCH_SIZE git push -u origin main &> ../trace_09_1.log echo "***** cloning repo *****" cd .. @@ -68,8 +65,8 @@ git clone gosh://$SYSTEM_CONTRACT_ADDR/$DAO_NAME/$REPO_NAME $REPO_NAME"-clone" echo "***** check repo *****" cd "$REPO_NAME-clone" -git checkout dev -cur_ver=$(ls -la | wc -l) + +cur_ver=$(ls -l | sed 1d | wc -l) if [ "$cur_ver" != "$FILES_CNT" ]; then echo "WRONG NUMBER OF FILES" exit 1 diff --git a/tests/set-vars.sh b/tests/set-vars.sh index 1c859c978..8bc23d1f0 100755 --- a/tests/set-vars.sh +++ b/tests/set-vars.sh @@ -27,6 +27,8 @@ tonos-cli config --url $NETWORK set -x # Should exist VersionConrtroler and SystemContract contracts +export VERSION_CONTROLLER=`cat $GOSH_PATH/VersionController.addr` +echo "export VERSION_CONTROLLER=$VERSION_CONTROLLER" >> env.env export SYSTEM_CONTRACT_ADDR=`cat $GOSH_PATH/SystemContract.addr` echo "export SYSTEM_CONTRACT_ADDR=$SYSTEM_CONTRACT_ADDR" >> env.env diff --git a/tests/util.sh b/tests/util.sh index cdef64200..fde7b5f04 100755 --- a/tests/util.sh +++ b/tests/util.sh @@ -83,7 +83,8 @@ function wait_account_active { contract_addr=$1 is_ok=0 while [ $SECONDS -lt $stop_at ]; do - status=`tonos-cli -j -u $NETWORK account $contract_addr | jq -r '."'"$contract_addr"'".acc_type'` + # status=`tonos-cli -j -u $NETWORK account $contract_addr | jq -r '."'"$contract_addr"'".acc_type'` + status=`tonos-cli -j -u $NETWORK account $contract_addr | jq -r .acc_type` if [ "$status" = "Active" ]; then is_ok=1 echo account is active diff --git a/v6_x/v6.1.0/git-remote-gosh/Cargo.toml b/v6_x/v6.1.0/git-remote-gosh/Cargo.toml index 46eeb8ac0..ccd8e0d44 100644 --- a/v6_x/v6.1.0/git-remote-gosh/Cargo.toml +++ b/v6_x/v6.1.0/git-remote-gosh/Cargo.toml @@ -66,7 +66,7 @@ opentelemetry = {version = "0.18.0", features = ["rt-tokio"]} opentelemetry-otlp = {version = "0.11.0", features = ["grpc-tonic"]} opentelemetry-semantic-conventions = "0.10.0" primitive-types = '0.12.1' -proc-macro2 = "=1.0.52" +# proc-macro2 = "=1.0.52" reqwest-middleware = "0.2.0" reqwest-tracing = "0.4.0" rmp-serde = "1.1.1" @@ -107,14 +107,14 @@ version = '1.21.2' [dependencies.ton_client] git = 'https://github.com/gosh-sh/ever-sdk.git' default-features = false -features = ['std', 'rustls-tls-webpki-roots'] +features = ['std'] package = 'ton_client' -tag = "1.43.1-rustls" +tag = "1.44.3" [dependencies.ton_sdk] git = 'https://github.com/gosh-sh/ever-sdk.git' package = 'ton_sdk' -tag = "1.43.1-rustls" +tag = "1.44.3" [dependencies.zstd] default-features = false diff --git a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/call.rs b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/call.rs index f1cefa6b5..b2557454b 100644 --- a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/call.rs +++ b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/call.rs @@ -5,19 +5,22 @@ pub use crate::abi as gosh_abi; use crate::blockchain::{default_callback, BlockchainService, GoshContract}; use async_trait::async_trait; use std::{ + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, sync::Arc, time::{Duration, Instant}, }; use ton_client::{ - abi::{CallSet, ParamsOfEncodeMessage, ResultOfEncodeMessage, Signer}, + abi::{CallSet, ParamsOfEncodeMessage, ResultOfEncodeMessage, Signer, FunctionHeader}, processing::{ - ParamsOfProcessMessage, ParamsOfSendMessage, ResultOfProcessMessage, ResultOfSendMessage, + MessageSendingParams, ParamsOfProcessMessage, ParamsOfSendMessage, + ResultOfProcessMessage, ResultOfSendMessage, ParamsOfSendMessages, }, }; use tracing::Instrument; #[async_trait] -pub(super) trait BlockchainCall { +pub trait BlockchainCall { async fn call( &self, contract: &C, @@ -36,6 +39,22 @@ pub(super) trait BlockchainCall { ) -> anyhow::Result where C: ContractInfo + Sync; + + async fn send_messages( + &self, + messages: &Vec<(String, Option)>, + wait_until: u32, + ) -> anyhow::Result; + + async fn construct_boc( + &self, + contract: &C, + function_name: &str, + args: Option, + expire: Option, + ) -> anyhow::Result<(String, String)> + where + C: ContractInfo + Sync; } #[async_trait] @@ -119,40 +138,11 @@ impl BlockchainCall for Everscale { args, expected_address, ); - let call_set = match args { - Some(value) => CallSet::some_with_function_and_input(function_name, value), - None => CallSet::some_with_function(function_name), - }; - let signer = match contract.get_keys() { - Some(key_pair) => Signer::Keys { - keys: key_pair.to_owned(), - }, - None => Signer::None, - }; - let ResultOfEncodeMessage { - message, - message_id, - address, - .. - } = ton_client::abi::encode_message( - Arc::clone(self.client()), - ParamsOfEncodeMessage { - abi: contract.get_abi().to_owned(), - address: Some(String::from(contract.get_address().clone())), - call_set, - signer, - deploy_set: None, - processing_try_index: None, - signature_id: None, - }, - ) - .await?; + let (message_id, message) = + self.construct_boc(contract, function_name, args, None).await?; - tracing::trace!( - "sending message ({message_id}) to {}", - contract.get_address() - ); + tracing::trace!("sending message ({message_id}) to {}", contract.get_address()); let ResultOfSendMessage { shard_block_id, sending_endpoints, @@ -197,4 +187,91 @@ impl BlockchainCall for Everscale { }; Ok(call_result) } + + async fn send_messages( + &self, + messages: &Vec<(String, Option)>, + wait_until: u32, + ) -> anyhow::Result { + let messages_num = messages.len(); + let mut hasher = DefaultHasher::new(); + Hash::hash_slice(messages, &mut hasher); + let qkey = format!("{:x}", hasher.finish()); + let messages: Vec = messages + .iter() + .map(|(boc, addr)| MessageSendingParams { + boc: boc.to_string(), + wait_until, + user_data: if let Some(addr) = addr { + Some(serde_json::json!({"expected_address": addr})) + } else { + None + }, + }) + .collect(); + + let params = ParamsOfSendMessages { + messages, + monitor_queue: Some(qkey.to_string()) + }; + ton_client::processing::send_messages(self.client().clone(), params).await?; + tracing::trace!("sent {} messages", messages_num); + Ok(qkey) + } + + #[instrument(level = "info", skip_all)] + async fn construct_boc( + &self, + contract: &C, + function_name: &str, + args: Option, + expire: Option, + ) -> anyhow::Result<(String, String)> + where + C: ContractInfo + Sync, + { + tracing::trace!( + "message BOC constructing: contract.address: {:?}, function: {}, args: {:?}", + contract.get_address().clone(), + function_name, + args, + ); + let call_set = match args { + Some(value) => Some(CallSet { + function_name: function_name.into(), + header: Some(FunctionHeader { + expire, + ..Default::default() + }), + input: Some(value) + }), + None => CallSet::some_with_function(function_name), + }; + let signer = match contract.get_keys() { + Some(key_pair) => Signer::Keys { + keys: key_pair.to_owned(), + }, + None => Signer::None, + }; + + let ResultOfEncodeMessage { + message_id, + message, + .. + } = ton_client::abi::encode_message( + Arc::clone(self.client()), + ParamsOfEncodeMessage { + abi: contract.get_abi().to_owned(), + address: Some(String::from(contract.get_address().clone())), + call_set, + signer, + deploy_set: None, + processing_try_index: None, + signature_id: None, + }, + ) + .await?; + + Ok((message_id, message)) + } } diff --git a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/commit/mod.rs b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/commit/mod.rs index 34478facc..0b9cd1520 100644 --- a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/commit/mod.rs +++ b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/commit/mod.rs @@ -145,8 +145,7 @@ pub async fn get_set_commit_created_at_time( is_internal: true, ..Default::default() }, - ) - .await?; + )?; tracing::trace!("Decoded message `{}`", decoded.name); if decoded.name == "setCommit" { diff --git a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/commit/save.rs b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/commit/save.rs index 290377533..73d40afd1 100644 --- a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/commit/save.rs +++ b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/commit/save.rs @@ -360,8 +360,7 @@ pub async fn find_messages( is_internal: true, ..Default::default() }, - ) - .await; + ); got_new_messages = true; already_processed_messages.insert(message.id.clone(), true); diff --git a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/mod.rs b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/mod.rs index 91ad28b3c..a9b37bc24 100644 --- a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/mod.rs +++ b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/mod.rs @@ -115,7 +115,7 @@ pub struct AddrVersion { } #[derive(Deserialize, Debug)] -struct CallResult { +pub struct CallResult { #[serde(rename = "id")] trx_id: String, status: u8, @@ -126,7 +126,7 @@ struct CallResult { } #[derive(Deserialize, Debug)] -struct SendMessageResult { +pub struct SendMessageResult { shard_block_id: String, message_id: String, sending_endpoints: Vec, @@ -444,8 +444,7 @@ async fn run_static( boc, cache_type: BocCacheType::Pinned { pin: pin }, }, - ) - .await?; + )?; // write lock { let mut refs = PINNED_CONTRACT_BOCREFS.write().await; @@ -767,7 +766,7 @@ pub async fn calculate_boc_hash(context: &EverClient, code: &str) -> anyhow::Res let params = ParamsOfGetBocHash { boc: code.to_owned(), }; - let ResultOfGetBocHash { hash } = get_boc_hash(Arc::clone(context), params).await?; + let ResultOfGetBocHash { hash } = get_boc_hash(Arc::clone(context), params)?; Ok(hash) } @@ -794,7 +793,7 @@ pub async fn calculate_contract_address( }; let ResultOfEncodeInitialData { data } = - encode_initial_data(Arc::clone(context), params).await?; + encode_initial_data(Arc::clone(context), params)?; let params = ParamsOfEncodeStateInit { code: Some(code.to_owned()), @@ -803,7 +802,7 @@ pub async fn calculate_contract_address( }; let ResultOfEncodeStateInit { state_init } = - encode_state_init(Arc::clone(context), params).await?; + encode_state_init(Arc::clone(context), params)?; let hash = calculate_boc_hash(context, &state_init).await?; diff --git a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/service.rs b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/service.rs index e40e433a2..c59cc2dda 100644 --- a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/service.rs +++ b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/service.rs @@ -1,5 +1,6 @@ use super::{ branch::{DeleteBranch, DeployBranch}, + call::BlockchainCall, commit::save::BlockchainCommitPusher, contract::ContractRead, snapshot::save::{DeleteSnapshot, DeployDiff, DeployNewSnapshot}, @@ -53,6 +54,7 @@ pub trait BlockchainService: + Sync + Send // + + BlockchainCall + BlockchainCommitService + BlockchainCommitPusher + BlockchainUserWalletService @@ -234,6 +236,18 @@ pub mod tests { index2: u32, last: bool, ) -> anyhow::Result<()>; + + async fn construct_deploy_diff_message( + &self, + wallet: &UserWallet, + repo_name: String, + branch_name: String, + commit_id: String, + diffs: Diff, + index1: u32, + index2: u32, + last: bool, + ) -> anyhow::Result; } #[async_trait] @@ -340,6 +354,7 @@ pub mod tests { fn root_contract(&self) -> &GoshContract; fn repo_contract(&self) -> &GoshContract; } + #[async_trait] impl BlockchainReadContractState for Everscale { async fn check_contracts_state( diff --git a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/snapshot/load/diffs/iterator.rs b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/snapshot/load/diffs/iterator.rs index a56d3c101..a7ca242f0 100644 --- a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/snapshot/load/diffs/iterator.rs +++ b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/snapshot/load/diffs/iterator.rs @@ -390,8 +390,7 @@ pub async fn load_messages_to( is_internal: true, ..Default::default() }, - ) - .await; + ); if let Err(ref e) = decoding_result { tracing::trace!("decode_message_body error: {:#?}", e); @@ -481,8 +480,7 @@ pub async fn load_constructor( is_internal: true, ..Default::default() }, - ) - .await; + ); if let Err(ref e) = decoding_result { tracing::trace!("decode_message_body error: {:#?}", e); diff --git a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/snapshot/save.rs b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/snapshot/save.rs index d153f00ec..d871f8867 100644 --- a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/snapshot/save.rs +++ b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/snapshot/save.rs @@ -109,6 +109,19 @@ pub trait DeployDiff { index2: u32, last: bool, ) -> anyhow::Result<()>; + + async fn construct_deploy_diff_message( + &self, + wallet: &UserWallet, + repo_name: String, + branch_name: String, + commit_id: String, + diffs: Diff, + index1: u32, + index2: u32, + last: bool, + expire: u32, + ) -> anyhow::Result; } #[async_trait] @@ -125,7 +138,15 @@ impl DeployDiff for Everscale { index2: u32, last: bool, ) -> anyhow::Result<()> { - tracing::trace!("deploy_diff: repo_name={repo_name}, branch_name={branch_name}, commit_id={commit_id}, index1={index1}, index2={index2}, last={last}"); + tracing::trace!( + "deploy_diff: repo_name={}, branch_name={}, commit_id={}, index1={}, index2={}, last={}", + repo_name, + branch_name, + commit_id, + index1, + index2, + last, + ); let diffs = vec![diff]; let args = DeployDiffParams { repo_name, @@ -151,6 +172,54 @@ impl DeployDiff for Everscale { tracing::trace!("deployDiff result: {:?}", result); Ok(()) } + + #[instrument(level = "info", skip_all)] + async fn construct_deploy_diff_message( + &self, + wallet: &UserWallet, + repo_name: String, + branch_name: String, + commit_id: String, + diff: Diff, + index1: u32, + index2: u32, + last: bool, + expire: u32, + ) -> anyhow::Result { + tracing::trace!( + "construct_deploy_diff_message: repo_name={}, branch_name={}, commit_id={}, index1={}, index2={}, last={}", + repo_name, + branch_name, + commit_id, + index1, + index2, + last, + ); + let diffs = vec![diff]; + let args = DeployDiffParams { + repo_name, + branch_name, + commit_id, + diffs, + index1, + index2, + last, + }; + + let wallet_contract = wallet.take_one().await?; + tracing::trace!("Acquired wallet: {}", wallet_contract.get_address()); + let (message_id, boc) = self + .construct_boc( + wallet_contract.deref(), + "deployDiff", + Some(serde_json::to_value(args)?), + Some(expire), + ) + .await?; + drop(wallet_contract); + tracing::trace!("construct_deploy_diff_message done: {message_id}"); + Ok(boc) + } } #[async_trait] @@ -164,6 +233,17 @@ pub trait DeployNewSnapshot { content: String, ipfs: Option, ) -> anyhow::Result<()>; + + async fn construct_deploy_snapshot_message( + &self, + wallet: &UserWallet, + repo_address: BlockchainContractAddress, + commit_id: String, + file_path: String, + content: String, + ipfs: Option, + expire: u32, + ) -> anyhow::Result; } #[async_trait] @@ -204,6 +284,41 @@ impl DeployNewSnapshot for Everscale { } result } + + #[instrument(level = "info", skip_all)] + async fn construct_deploy_snapshot_message( + &self, + wallet: &UserWallet, + repo_address: BlockchainContractAddress, + commit_id: String, + file_path: String, + content: String, + ipfs: Option, + expire: u32, + ) -> anyhow::Result { + tracing::trace!("deploy_new_snapshot: repo_address={repo_address}, commit_id={commit_id}, file_path={file_path}"); + let args = DeploySnapshotParams { + repo_address, + commit_sha: commit_id, + file_path, + content, + ipfs, + is_pin: false, + }; + let wallet_contract = wallet.take_one().await?; + tracing::trace!("Acquired wallet: {}", wallet_contract.get_address()); + let (message_id, boc) = self + .construct_boc( + wallet_contract.deref(), + "deployNewSnapshot", + Some(serde_json::to_value(args)?), + Some(expire), + ) + .await?; + drop(wallet_contract); + tracing::trace!("construct_deploy_snapshot_message done: {message_id}"); + Ok(boc) + } } #[async_trait] diff --git a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/tree/save.rs b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/tree/save.rs index 553a4841a..9986c42a0 100644 --- a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/tree/save.rs +++ b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/tree/save.rs @@ -1,10 +1,8 @@ use crate::blockchain::{ self, call::BlockchainCall, contract::ContractInfo, gosh_abi, user_wallet::UserWallet, - BlockchainContractAddress, BlockchainService, Everscale, GoshBlobBitFlags, GoshContract, Tree, + BlockchainContractAddress, BlockchainService, Everscale, GoshContract, Tree, }; use async_trait::async_trait; -use git_object; -use git_object::tree; use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; diff --git a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/tvm_hash/mod.rs b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/tvm_hash/mod.rs index ebf99f3df..ddfdea2e5 100644 --- a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/tvm_hash/mod.rs +++ b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/tvm_hash/mod.rs @@ -19,7 +19,7 @@ pub async fn tvm_hash(context: &EverClient, data: &[u8]) -> anyhow::Result anyhow::Result anyhow::Result( +pub(super) async fn get_user_wallet_config_max_number_of_mirrors( blockchain: &B, - user_wallet_contract: &C, + dao_address: &BlockchainContractAddress, ) -> anyhow::Result where B: BlockchainService + BlockchainCall, - C: ContractRead + ContractInfo + Sync, { - tracing::trace!("get_user_wallet_config_max_number_of_mirrors: user_wallet_contract={user_wallet_contract:?}"); - let result: GetConfigResult = user_wallet_contract + let dao_contract = GoshContract::new(dao_address, abi::DAO); + tracing::trace!("get_user_wallet_config_max_number_of_mirrors: dao_contract={dao_contract:?}"); + let result: GetConfigResult = dao_contract .read_state(blockchain.client(), "getConfig", None) .await?; - tracing::trace!( - "get_user_wallet_config_max_number_of_mirrors result: {:?}", - result - ); + tracing::trace!("get_user_wallet_config_max_number_of_mirrors result: {:?}", result); let number = result.max_number_of_mirror_wallets.into(); Ok(number) } diff --git a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/user_wallet/mod.rs b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/user_wallet/mod.rs index bc4c0454b..5f2d7e1b4 100644 --- a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/user_wallet/mod.rs +++ b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/user_wallet/mod.rs @@ -46,7 +46,8 @@ impl BlockchainUserWalletService for Everscale { .await?; } if !_USER_WALLET.is_mirrors_ready().await { - let init_mirrors_result = _USER_WALLET.try_init_mirrors(self).await; + let init_mirrors_result = + _USER_WALLET.try_init_mirrors(self, dao_address).await; if let Err(e) = init_mirrors_result { tracing::debug!("init mirrors error: {}", e); } diff --git a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/user_wallet/state.rs b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/user_wallet/state.rs index 83094b429..6206e4ca3 100644 --- a/v6_x/v6.1.0/git-remote-gosh/src/blockchain/user_wallet/state.rs +++ b/v6_x/v6.1.0/git-remote-gosh/src/blockchain/user_wallet/state.rs @@ -163,7 +163,10 @@ impl UserWalletMirrors { Ok(()) } - pub(super) async fn try_init_mirrors(&self, blockchain: &B) -> anyhow::Result<()> + pub(super) async fn try_init_mirrors( + &self, blockchain: &B, + dao_address: &BlockchainContractAddress, + ) -> anyhow::Result<()> where B: BlockchainService + BlockchainCall, { @@ -223,9 +226,11 @@ impl UserWalletMirrors { match max_number_of_wallets { Some(w) => w, None => { - let n = - get_user_wallet_config_max_number_of_mirrors(blockchain, &zero_wallet) - .await?; + let n = get_user_wallet_config_max_number_of_mirrors( + blockchain, + dao_address + ) + .await?; let mut inner_state = self.inner.write().await; let w: TWalletMirrorIndex = (n + 1) as TWalletMirrorIndex; inner_state.max_number_of_wallets = Some(w); diff --git a/v6_x/v6.1.0/git-remote-gosh/src/git_helper/push/mod.rs b/v6_x/v6.1.0/git-remote-gosh/src/git_helper/push/mod.rs index b27023c36..dd787b898 100644 --- a/v6_x/v6.1.0/git-remote-gosh/src/git_helper/push/mod.rs +++ b/v6_x/v6.1.0/git-remote-gosh/src/git_helper/push/mod.rs @@ -1,28 +1,36 @@ -use super::GitHelper; use crate::{ blockchain::{ + get_commit_address, branch::DeleteBranch, contract::{ContractRead, GoshContract}, - get_commit_address, gosh_abi, AddrVersion, BlockchainContractAddress, BlockchainService, - GetNameCommitResult, MAX_ACCOUNTS_ADDRESSES_PER_QUERY, ZERO_SHA, + gosh_abi, + AddrVersion, BlockchainContractAddress, BlockchainService, GetNameCommitResult, + MAX_ACCOUNTS_ADDRESSES_PER_QUERY, ZERO_SHA, + }, + git_helper::{ + push::create_branch::CreateBranchOperation, + GitHelper, }, - git_helper::push::create_branch::CreateBranchOperation, }; use git_hash::{self, ObjectId}; use git_odb::Find; -use std::collections::VecDeque; use std::{ - collections::{HashMap, HashSet}, + collections::{HashMap, HashSet, VecDeque}, str::FromStr, sync::Arc, + time::{SystemTime, Duration, Instant}, vec::Vec, }; -use ton_client::net::ParamsOfQuery; - +use ton_client::{ + net::ParamsOfQuery, + processing::{ + fetch_next_monitor_results, + ParamsOfFetchNextMonitorResults, MessageMonitoringResult, MessageMonitoringStatus, + }, + utils::compress_zstd, +}; use tokio::sync::Semaphore; -use ton_client::utils::compress_zstd; - pub mod create_branch; pub(crate) mod parallel_diffs_upload_support; mod utilities; @@ -34,7 +42,7 @@ use push_tag::push_tag; mod delete_tag; pub(crate) mod parallel_snapshot_upload_support; -use crate::blockchain::{branch_list, get_commit_by_addr, Snapshot, Tree, tree}; +use crate::blockchain::{branch_list, get_commit_by_addr, Snapshot, Tree}; use crate::git_helper::push::parallel_snapshot_upload_support::{ ParallelCommit, ParallelCommitUploadSupport, ParallelSnapshot, ParallelSnapshotUploadSupport, ParallelTreeUploadSupport, @@ -1196,7 +1204,7 @@ where } let files_cnt = parallel_snapshot_uploads.get_expected().len(); - parallel_snapshot_uploads.start_push(self).await?; + parallel_snapshot_uploads.push_snapshots_in_chunks(self).await?; let stored_snapshot_addresses = parallel_snapshot_uploads.get_expected().clone(); let db = self.get_db()?; @@ -1608,8 +1616,8 @@ where } // After we have all commits and trees deployed, start push of diffs - parallel_diffs_upload_support.start_push(self).await?; - parallel_snapshot_uploads.start_push(self).await?; + parallel_diffs_upload_support.push_diffs_in_chunks(self).await?; + parallel_snapshot_uploads.push_snapshots_in_chunks(self).await?; // clear database after all objects were deployed self.delete_db()?; @@ -1873,6 +1881,94 @@ fn get_list_of_commit_objects( Ok(res) } +#[instrument(level = "debug", skip_all)] +pub async fn wait_chunk_until_send( + blockchain: &impl BlockchainService, + chunk: &mut Vec, + queue: String, +) -> anyhow::Result<()> { + #[derive(Debug, Deserialize)] + struct UserData { + expected_address: String, + } + + let start = Instant::now(); + let timeout = Duration::from_secs(300); // todo remove magic num + + let expected_receipts_count = chunk.len(); + let mut receipts_count = 0; + loop { + let sent_messages = fetch_next_monitor_results( + blockchain.client().clone(), + ParamsOfFetchNextMonitorResults { + queue: queue.clone(), + wait_mode: None, + } + ).await?; + tracing::trace!("got {} receipts", sent_messages.results.len()); + + for sent_result in sent_messages.results { + receipts_count += 1; + + let MessageMonitoringResult { + hash, + status, + error, + user_data, + .. + } = sent_result; + + match status { + MessageMonitoringStatus::Finalized => { + let target_addr = match user_data { + Some(payload) => { + let UserData { expected_address } = serde_json::from_value(payload)?; + BlockchainContractAddress::new(expected_address) + }, + None => unreachable!(), + }; + chunk.remove( + chunk.iter().position(|x| x == &target_addr) + .expect("unexpected: element must exists") + ); + tracing::trace!("msg {} ok", hash); + }, + MessageMonitoringStatus::Timeout => { + let reason = match error { + Some(err) => format!("error: {err}"), + None => "message expired".to_owned() + }; + tracing::debug!( + "batched message failed: message id={}, expected address={:?}, reason={}", + hash, + user_data, + reason + ); + }, + MessageMonitoringStatus::Reserved => unreachable!() + } + } + + if receipts_count >= expected_receipts_count { + tracing::debug!("{} receipts processed", expected_receipts_count); + break; + } else if start.elapsed() > timeout { + tracing::debug!("batched messages ({}) failed: time is up", expected_receipts_count - receipts_count); + let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); + tracing::debug!("current timestamp {:?}", now.as_secs()); + break; + } + tracing::trace!( + "{}/{} receipts. falling asleep for 10 sec...", + receipts_count, + expected_receipts_count + ); + tokio::time::sleep(Duration::from_secs(10)).await; // todo remove magic num + } + + Ok(()) +} + #[cfg(test)] mod tests { use super::*; @@ -2071,9 +2167,9 @@ mod tests { } } -pub fn get_redeploy_attempts() -> i32 { +pub fn get_redeploy_attempts() -> u32 { std::env::var(GOSH_DEPLOY_RETRIES) .ok() .and_then(|num| i32::from_str_radix(&num, 10).ok()) - .unwrap_or(MAX_REDEPLOY_ATTEMPTS) + .unwrap_or(MAX_REDEPLOY_ATTEMPTS) as u32 } \ No newline at end of file diff --git a/v6_x/v6.1.0/git-remote-gosh/src/git_helper/push/parallel_diffs_upload_support.rs b/v6_x/v6.1.0/git-remote-gosh/src/git_helper/push/parallel_diffs_upload_support.rs index 0ca54974c..850eca905 100644 --- a/v6_x/v6.1.0/git-remote-gosh/src/git_helper/push/parallel_diffs_upload_support.rs +++ b/v6_x/v6.1.0/git-remote-gosh/src/git_helper/push/parallel_diffs_upload_support.rs @@ -1,18 +1,24 @@ -use crate::blockchain::BlockchainService; -use crate::blockchain::{snapshot::PushDiffCoordinate, BlockchainContractAddress}; -use crate::git_helper::push::push_diff::{diff_address, is_diff_deployed, push_diff}; -use crate::git_helper::GitHelper; - -use crate::blockchain::snapshot::diffs::wait_diffs_ready::wait_diffs_until_ready; -use anyhow::bail; -use std::collections::HashMap; -use std::vec::Vec; +use crate::{ + blockchain::{ + snapshot::{diffs::wait_diffs_ready::wait_diffs_until_ready, PushDiffCoordinate}, + BlockchainContractAddress, BlockchainService + }, + database::GoshDB, + git_helper::{ + push::{ + get_redeploy_attempts, wait_chunk_until_send, + parallel_snapshot_upload_support::get_push_chunk, + push_diff::{diff_address, prepush_diff}, + }, + GitHelper, + }, +}; + +use std::{collections::HashMap, sync::Arc, time::SystemTime, vec::Vec}; use tokio::task::JoinSet; use tracing::Instrument; -use crate::git_helper::push::get_redeploy_attempts; -use crate::git_helper::push::parallel_snapshot_upload_support::get_push_chunk; -const MAX_RETRIES_FOR_DIFFS_TO_APPEAR: i32 = 20; // x 3sec +// const MAX_RETRIES_FOR_DIFFS_TO_APPEAR: i32 = 20; // x 3sec pub struct ParallelDiffsUploadSupport { parallels: HashMap, @@ -21,7 +27,6 @@ pub struct ParallelDiffsUploadSupport { next_parallel_index: u32, last_commit_id: git_hash::ObjectId, expecting_deployed_contacts_addresses: Vec, - pushed_blobs: JoinSet>, } #[derive(Clone, Debug)] @@ -74,90 +79,87 @@ impl ParallelDiffsUploadSupport { next_parallel_index: 0, last_commit_id: *last_commit_id, expecting_deployed_contacts_addresses: vec![], - pushed_blobs: JoinSet::new(), } } - pub fn get_expected(&self) -> &Vec { - &self.expecting_deployed_contacts_addresses - } - pub fn push_expected(&mut self, value: String) { self.expecting_deployed_contacts_addresses.push(value); } - pub async fn start_push( + #[instrument(level = "info", skip_all)] + pub async fn push_diffs_in_chunks( &mut self, context: &mut GitHelper, ) -> anyhow::Result<()> { let chunk_size = get_push_chunk(); let max_attempts = get_redeploy_attempts(); + tracing::trace!("Start push of diffs, chunk_size={chunk_size}, max_attempts={max_attempts}"); - let mut exp: Vec = self.expecting_deployed_contacts_addresses.iter().map(|addr| BlockchainContractAddress::new(addr)).collect(); - let mut attempt = 0; - let mut last_rest_cnt = 0; + let mut exp: Vec = self + .expecting_deployed_contacts_addresses + .iter() + .map(|addr| BlockchainContractAddress::new(addr)) + .collect(); + + let mut attempt = 0u32; loop { if attempt == max_attempts { - anyhow::bail!("Failed to deploy snapshots. Undeployed snapshots: {exp:?}"); + anyhow::bail!("Failed to send deploy diff messages. Undeployed diffs: {exp:?}"); } - let mut rest = vec![]; + + let mut rest: Vec = vec![]; for chunk in exp.chunks(chunk_size) { - for addr in chunk { - self.add_to_push_list(context, &String::from(addr)).await?; - } - self.finish_push().await?; - let mut tmp_rest = wait_diffs_until_ready(&context.blockchain, chunk).await?; - rest.append(&mut tmp_rest); + let blockchain = context.blockchain.clone(); + let dao_address: BlockchainContractAddress = context.dao_addr.clone(); + let remote_network: String = context.remote.network.clone(); + let repo_name: String = context.remote.repo.clone(); + let ipfs_http_endpoint: String = context.config.ipfs_http_endpoint().to_string(); + let last_commit_id = self.last_commit_id.clone(); + let database = context.get_db()?.clone(); + + let mut unsent = push_chunk( + &blockchain, + repo_name, + &dao_address, + &remote_network, + ipfs_http_endpoint, + last_commit_id, + database, + chunk, + ).await?; + rest.append(&mut unsent); } - exp = rest; - if exp.is_empty() { + + if rest.len() == 0 { break; + } else { + exp = rest; } - if exp.len() != last_rest_cnt { - attempt = 0; - } - last_rest_cnt = exp.len(); - attempt += 1; } - Ok(()) - } + attempt = 0; + loop { + if attempt == max_attempts { + anyhow::bail!("Failed to deploy diffs. Undeployed diffs: {exp:?}"); + } - pub async fn add_to_push_list( - &mut self, - context: &mut GitHelper, - diff_address: &String, - ) -> anyhow::Result<()> { - let diff_address = diff_address.to_owned(); - let blockchain = context.blockchain.clone(); - let dao_address: BlockchainContractAddress = context.dao_addr.clone(); - let remote_network: String = context.remote.network.clone(); - let last_commit_id = self.last_commit_id.clone(); - let repo_name: String = context.remote.repo.clone(); - let ipfs_http_endpoint: String = context.config.ipfs_http_endpoint().to_string(); - tracing::trace!("start push of diff: {}", diff_address); - - let database = context.get_db()?.clone(); - - // self.expecting_deployed_contacts_addresses - // .push(diff_address.clone()); - self.pushed_blobs.spawn( - async move { - push_diff( - &blockchain, - &repo_name, - &dao_address, - &remote_network, - &ipfs_http_endpoint, - &last_commit_id, - diff_address, - database, - ) - .await + let mut rest: Vec = vec![]; + for chunk in exp.chunks(chunk_size) { + let mut undeployed = + wait_diffs_until_ready(&context.blockchain, chunk).await?; + + tracing::trace!("undeployed {} diffs. iteration {}", undeployed.len(), attempt + 1); + rest.append(&mut undeployed); } - .instrument(debug_span!("tokio::spawn::push_diff").or_current()), - ); + if rest.len() == 0 { + break; + } else { + exp = rest; + } + + attempt += 1; + } Ok(()) } @@ -184,11 +186,6 @@ impl ParallelDiffsUploadSupport { (¶llel_diff, diff_coordinates, true), diff_contract_address.clone(), )?; - - // self.add_to_push_list(context, diff_contract_address) - // .await?; - // } else { - // self.push_expected(diff_contract_address); } self.push_expected(diff_contract_address); } @@ -196,21 +193,6 @@ impl ParallelDiffsUploadSupport { Ok(()) } - async fn finish_push(&mut self) -> anyhow::Result<()> { - while let Some(finished_task) = self.pushed_blobs.join_next().await { - match finished_task { - Err(e) => { - bail!("diffs join-handler: {}", e); - } - Ok(Err(e)) => { - bail!("diffs inner: {}", e); - } - Ok(Ok(_)) => {} - } - } - Ok(()) - } - #[instrument(level = "info", skip_all)] pub async fn push( &mut self, @@ -248,11 +230,6 @@ impl ParallelDiffsUploadSupport { (¶llel_diff, diff_coordinates, false), diff_contract_address.clone(), )?; - - // self.add_to_push_list(context, diff_contract_address) - // .await?; - // } else { - // self.push_expected(diff_contract_address); } self.push_expected(diff_contract_address); } @@ -282,3 +259,78 @@ impl ParallelDiffsUploadSupport { } } } + +#[instrument(level = "info", skip_all)] +pub async fn push_chunk( + blockchain: &B, + repo_name: String, + dao_address: &BlockchainContractAddress, + remote_network: &str, + ipfs_endpoint: String, + last_commit_id: git_hash::ObjectId, + database: Arc, + chunk: &[BlockchainContractAddress], +) -> anyhow::Result> +where + B: BlockchainService + 'static, +{ + let wallet = blockchain.user_wallet(dao_address, remote_network).await?; + + let mut chunk = chunk.to_vec(); + let mut message_bocs: Vec<(String, Option)> = vec![]; + + let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); + let expire = now.as_secs() as u32 + 120; // todo remove magic num + + let mut bocs: JoinSet)>> = JoinSet::new(); + for addr in &chunk { + let blockchain = blockchain.clone(); + let repo_name = repo_name.clone(); + let wallet = wallet.clone(); + let ipfs_endpoint = ipfs_endpoint.clone(); + let last_commit = last_commit_id.clone(); + let diff_address = addr.clone(); + let database = database.clone(); + + bocs.spawn( + async move { + let boc_pair = prepush_diff( + &blockchain, + &repo_name, + &wallet, + &ipfs_endpoint, + &last_commit_id, + &diff_address, + database, + expire, + ).await; + boc_pair + } + .instrument(info_span!("tokio::spawn::prepush_diff").or_current()) + ); + } + + while let Some(finished_task) = bocs.join_next().await { + match finished_task { + Err(e) => { + anyhow::bail!("prepush objects join-handler: {}", e); + } + Ok(Err(e)) => { + anyhow::bail!("prepush objects inner: {}", e); + } + Ok(Ok(boc_pair)) => message_bocs.push(boc_pair) + } + } + + let wait_until = expire + 5; + tracing::trace!("msg expire={}, wait_until={}", expire, wait_until); + let queue_name = blockchain.send_messages(&message_bocs, wait_until).await?; + + wait_chunk_until_send(blockchain, &mut chunk, queue_name).await?; + + if chunk.len() > 0 { + tracing::trace!("failed to send {} messages", chunk.len()); + } + + Ok(chunk) +} \ No newline at end of file diff --git a/v6_x/v6.1.0/git-remote-gosh/src/git_helper/push/parallel_snapshot_upload_support.rs b/v6_x/v6.1.0/git-remote-gosh/src/git_helper/push/parallel_snapshot_upload_support.rs index 9c80ad1b3..fbb2c7d7e 100644 --- a/v6_x/v6.1.0/git-remote-gosh/src/git_helper/push/parallel_snapshot_upload_support.rs +++ b/v6_x/v6.1.0/git-remote-gosh/src/git_helper/push/parallel_snapshot_upload_support.rs @@ -1,33 +1,31 @@ -use crate::blockchain::snapshot::wait_snapshots_until_ready; +use anyhow::bail; use crate::{ blockchain::{ contract::wait_contracts_deployed::wait_contracts_deployed, - tree::{load::check_if_tree_is_ready}, + snapshot::wait_snapshots_until_ready, + tree::load::{check_if_tree_is_ready, TreeComponent}, user_wallet::WalletError, AddrVersion, BlockchainContractAddress, BlockchainService, }, git_helper::{ push::{ - push_diff::push_initial_snapshot, push_tree::inner_deploy_tree, + get_redeploy_attempts, wait_chunk_until_send, + push_diff::prepush_initial_snapshot, push_tree::inner_deploy_tree, utilities::retry::default_retry_strategy, }, GitHelper, }, }; -use anyhow::bail; +use crate::database::GoshDB; use git_hash::ObjectId; -use std::time::Duration; -use std::{collections::HashMap, sync::Arc, vec::Vec}; -use tokio::time::sleep; -use tokio::{sync::Semaphore, task::JoinSet}; +use std::{collections::HashMap, sync::Arc, time::{Duration, SystemTime}, vec::Vec}; +use tokio::{sync::Semaphore, task::JoinSet, time::sleep}; use tokio_retry::RetryIf; use tracing::Instrument; -use crate::blockchain::tree::load::TreeComponent; -use crate::git_helper::push::get_redeploy_attempts; const WAIT_TREE_READY_MAX_ATTEMPTS: i32 = 4; const GOSH_PUSH_CHUNK: &str = "GOSH_PUSH_CHUNK"; -const DEFAULT_PUSH_CHUNK_SIZE: usize = 3000; +const DEFAULT_PUSH_CHUNK_SIZE: usize = 150; const WAIT_CONTRACT_CHUNK_SIZE: usize = 50; pub fn get_push_chunk() -> usize { @@ -41,7 +39,6 @@ pub fn get_push_chunk() -> usize { pub struct ParallelSnapshotUploadSupport { expecting_deployed_contacts_addresses: Vec, - pushed_blobs: JoinSet>, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -84,7 +81,6 @@ impl ParallelSnapshotUploadSupport { pub fn new() -> Self { Self { expecting_deployed_contacts_addresses: vec![], - pushed_blobs: JoinSet::new(), } } @@ -96,90 +92,76 @@ impl ParallelSnapshotUploadSupport { self.expecting_deployed_contacts_addresses.push(value); } - pub async fn start_push( + #[instrument(level = "info", skip_all)] + pub async fn push_snapshots_in_chunks( &mut self, context: &mut GitHelper, ) -> anyhow::Result<()> { let chunk_size = get_push_chunk(); let max_attempts = get_redeploy_attempts(); + tracing::trace!("Start push of snapshots, chunk_size={chunk_size}, max_attempts={max_attempts}"); - let mut exp: Vec = self.expecting_deployed_contacts_addresses.iter().map(|addr| BlockchainContractAddress::new(addr)).collect(); + let mut exp: Vec = self + .expecting_deployed_contacts_addresses + .iter() + .map(|addr| BlockchainContractAddress::new(addr)) + .collect(); + let mut attempt = 0; - let mut last_rest_cnt = 0; loop { if attempt == max_attempts { anyhow::bail!("Failed to deploy snapshots. Undeployed snapshots: {exp:?}"); } + let mut rest = vec![]; for chunk in exp.chunks(chunk_size) { - for addr in chunk { - self.add_to_push_list(context, &String::from(addr)).await?; - } - self.finish_push().await?; - let mut tmp_rest = wait_snapshots_until_ready(&context.blockchain, chunk).await?; - rest.append(&mut tmp_rest); + let blockchain = context.blockchain.clone(); + let dao_address = context.dao_addr.clone(); + let remote_network = context.remote.network.clone(); + let repo_addr = context.repo_addr.clone(); + let database = context.get_db()?.clone(); + + let mut unsent = push_chunk( + &blockchain, + &repo_addr, + &dao_address, + &remote_network, + database, + chunk, + ).await?; + rest.append(&mut unsent); } - exp = rest; - if exp.is_empty() { + + if rest.len() == 0 { break; + } else { + exp = rest; } - if exp.len() != last_rest_cnt { - attempt = 0; - } - last_rest_cnt = exp.len(); - attempt += 1; } - Ok(()) - } - async fn finish_push(&mut self) -> anyhow::Result<()> { - while let Some(finished_task) = self.pushed_blobs.join_next().await { - match finished_task { - Err(e) => { - bail!("snapshots join-handler: {}", e); - } - Ok(Err(e)) => { - bail!("snapshots inner: {}", e); - } - Ok(Ok(_)) => {} + attempt = 0; + loop { + if attempt == max_attempts { + anyhow::bail!("Failed to deploy diffs. Undeployed diffs: {exp:?}"); } - } - Ok(()) - } - - #[instrument(level = "info", skip_all)] - pub async fn add_to_push_list( - &mut self, - context: &mut GitHelper, - snapshot_address: &String, - ) -> anyhow::Result<()> { - let snapshot_address = snapshot_address.to_owned(); - let blockchain = context.blockchain.clone(); - let dao_address: BlockchainContractAddress = context.dao_addr.clone(); - let remote_network: String = context.remote.network.clone(); - let repo_address = context.repo_addr.clone(); - tracing::trace!("Start push of snapshot: address: {snapshot_address:?}"); - - // self.expecting_deployed_contacts_addresses - // .push(snapshot_address.to_string()); + let mut rest: Vec = vec![]; + for chunk in exp.chunks(chunk_size) { + let mut undeployed = + wait_snapshots_until_ready(&context.blockchain, chunk).await?; - let database = context.get_db()?.clone(); - self.pushed_blobs.spawn( - async move { - push_initial_snapshot( - blockchain, - repo_address, - dao_address, - remote_network, - snapshot_address, - database, - ) - .await + tracing::trace!("undeployed {} diffs. iteration {}", undeployed.len(), attempt + 1); + rest.append(&mut undeployed); } - .instrument(info_span!("tokio::spawn::push_initial_snapshot").or_current()), - ); + if rest.len() == 0 { + break; + } else { + exp = rest; + } + + attempt += 1; + } Ok(()) } } @@ -506,4 +488,73 @@ pub async fn wait_trees_until_ready( } Ok(not_ready_trees) +} + +#[instrument(level = "info", skip_all)] +pub async fn push_chunk( + blockchain: &B, + repo_addr: &BlockchainContractAddress, + dao_address: &BlockchainContractAddress, + remote_network: &str, + database: Arc, + chunk: &[BlockchainContractAddress], +) -> anyhow::Result> +where + B: BlockchainService + 'static, +{ + let wallet = blockchain.user_wallet(dao_address, remote_network).await?; + + let mut chunk = chunk.to_vec(); + let mut message_bocs: Vec<(String, Option)> = vec![]; + + let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); + let expire = now.as_secs() as u32 + 120; // todo remove magic num + + let mut bocs: JoinSet)>> = JoinSet::new(); + for addr in &chunk { + let blockchain = blockchain.clone(); + let repo_addr = repo_addr.clone(); + let wallet = wallet.clone(); + let snapshot_address = addr.clone(); + let database = database.clone(); + + bocs.spawn( + async move { + let boc_pair = prepush_initial_snapshot( + &blockchain, + &repo_addr, + &wallet, + &snapshot_address, + database, + expire, + ).await; + boc_pair + } + .instrument(info_span!("tokio::spawn::prepush_diff").or_current()) + ); + } + + while let Some(finished_task) = bocs.join_next().await { + match finished_task { + Err(e) => { + anyhow::bail!("prepush objects join-handler: {}", e); + } + Ok(Err(e)) => { + anyhow::bail!("prepush objects inner: {}", e); + } + Ok(Ok(boc_pair)) => message_bocs.push(boc_pair) + } + } + + let wait_until = expire + 5; + tracing::trace!("msg expire={}, wait_until={}", expire, wait_until); + let queue_name = blockchain.send_messages(&message_bocs, wait_until).await?; + + wait_chunk_until_send(blockchain, &mut chunk, queue_name).await?; + + if chunk.len() > 0 { + tracing::trace!("failed to send {} messages", chunk.len()); + } + + Ok(chunk) } \ No newline at end of file diff --git a/v6_x/v6.1.0/git-remote-gosh/src/git_helper/push/push_diff.rs b/v6_x/v6.1.0/git-remote-gosh/src/git_helper/push/push_diff.rs index e8c4a4b9d..cd1eb4d78 100644 --- a/v6_x/v6.1.0/git-remote-gosh/src/git_helper/push/push_diff.rs +++ b/v6_x/v6.1.0/git-remote-gosh/src/git_helper/push/push_diff.rs @@ -1,11 +1,5 @@ -use crate::blockchain::user_wallet::{UserWallet, WalletError}; -use crate::ipfs::{build_ipfs, IpfsError}; -use std::sync::Arc; -use std::time::Duration; -use tokio::time::sleep; - -use crate::database::GoshDB; use crate::{ + database::GoshDB, blockchain::{ contract::{ContractRead, GoshContract}, gosh_abi, @@ -13,11 +7,15 @@ use crate::{ save::{Diff, GetDiffAddrResult, GetVersionResult}, PushDiffCoordinate, }, - tvm_hash, BlockchainContractAddress, BlockchainService, EverClient, EMPTY_BLOB_SHA1, - EMPTY_BLOB_SHA256, + tvm_hash, + user_wallet::{UserWallet, WalletError}, + BlockchainContractAddress, BlockchainService, EverClient, + EMPTY_BLOB_SHA1, EMPTY_BLOB_SHA256, }, - ipfs::{service::FileSave, IpfsService}, + ipfs::{build_ipfs, service::FileSave, IpfsError, IpfsService}, }; +use std::{sync::Arc, time::Duration}; +use tokio::time::sleep; use tokio_retry::RetryIf; use ton_client::utils::compress_zstd; @@ -212,6 +210,140 @@ pub async fn inner_push_diff( Ok(()) } +#[instrument(level = "info", skip_all)] +pub async fn prepush_diff( + blockchain: &B, + repo_name: &str, + wallet: &UserWallet, + ipfs_endpoint: &str, + last_commit_id: &git_hash::ObjectId, + diff_address: &BlockchainContractAddress, + database: Arc, + expire: u32, +) -> anyhow::Result<(String, Option)> +where + B: BlockchainService, +{ + let (parallel_diff, diff_coordinate, is_last) = + database.get_diff(&String::from(diff_address))?; + + let commit_id = parallel_diff.commit_id.to_string(); + let branch_name = parallel_diff.branch_name; + let blob_id = parallel_diff.blob_id; + let file_path = parallel_diff.file_path; + let original_snapshot_content = ¶llel_diff.original_snapshot_content; + let diff = ¶llel_diff.diff; + let new_snapshot_content = ¶llel_diff.new_snapshot_content; + let snapshot_addr: BlockchainContractAddress = + BlockchainContractAddress::new(¶llel_diff.snapshot_address); + + tracing::trace!("prepush_diff: snapshot_addr={snapshot_addr}, commit_id={commit_id}, branch_name={branch_name}, blob_id={blob_id}, file_path={file_path}, diff_coordinate={diff_coordinate:?}, last_commit_id={last_commit_id}, is_last={is_last}"); + let diff = compress_zstd(diff, None)?; + tracing::trace!("prepush_diff: compressed to {} size", diff.len()); + + let ipfs_client = build_ipfs(ipfs_endpoint)?; + let is_previous_oversized = is_going_to_ipfs(original_snapshot_content); + let blob_dst = { + let is_going_to_ipfs = is_going_to_ipfs(new_snapshot_content); + if !is_going_to_ipfs { + if is_previous_oversized { + let compressed = compress_zstd(new_snapshot_content, None)?; + BlobDst::SetContent(hex::encode(compressed)) + } else { + BlobDst::Patch(hex::encode(diff)) + } + } else { + tracing::debug!("prepush_diff->save_data_to_ipfs"); + let ipfs = save_data_to_ipfs(&ipfs_client, new_snapshot_content) + .await + .map_err(|e| { + tracing::debug!("save_data_to_ipfs error: {:#?}", e); + e + })?; + BlobDst::Ipfs(ipfs) + } + }; + let content_sha256 = { + if let BlobDst::Ipfs(_) = blob_dst { + format!("0x{}", sha256::digest(&**new_snapshot_content)) + } else { + format!( + "0x{}", + tvm_hash(&blockchain.client(), new_snapshot_content).await? + ) + } + }; + + let sha1 = if &content_sha256 == EMPTY_BLOB_SHA256 { + EMPTY_BLOB_SHA1.to_owned() + } else { + blob_id.to_string() + }; + + let diff = match blob_dst { + BlobDst::Ipfs(ipfs) => { + let patch = if is_previous_oversized { + None + } else { + let compressed = compress_zstd(original_snapshot_content, None)?; + Some(hex::encode(compressed)) + }; + Diff { + snapshot_addr, + snapshot_file_path: file_path, // TODO: change to full path + commit_id, + patch, + ipfs: Some(ipfs), + remove_ipfs: false, + sha1, + sha256: content_sha256, + } + } + BlobDst::Patch(patch) => Diff { + snapshot_addr, + snapshot_file_path: file_path, // TODO: change to full path + commit_id, + patch: Some(patch), + ipfs: None, + remove_ipfs: false, + sha1, + sha256: content_sha256, + }, + BlobDst::SetContent(content) => Diff { + snapshot_addr, + snapshot_file_path: file_path, // TODO: change to full path + commit_id, + patch: Some(content), + ipfs: None, + remove_ipfs: true, + sha1, + sha256: content_sha256, + }, + }; + + if diff.ipfs.is_some() { + tracing::debug!("prepush_diff: {:?}", diff); + } else { + tracing::trace!("prepush_diff: {:?}", diff); + } + + let boc = blockchain + .construct_deploy_diff_message( + wallet, + repo_name.to_owned(), + branch_name.to_string(), + last_commit_id.to_string(), + diff, + diff_coordinate.index_of_parallel_thread, + diff_coordinate.order_of_diff_in_the_parallel_thread, + is_last, + expire, + ) + .await?; + + Ok((boc, Some(diff_address.clone()))) +} + #[instrument(level = "info", skip_all)] pub async fn save_data_to_ipfs( ipfs_client: &IpfsService, @@ -377,3 +509,40 @@ where ) .await } + +#[instrument(level = "info", skip_all)] +pub async fn prepush_initial_snapshot( + blockchain: &B, + repo_addr: &BlockchainContractAddress, + wallet: &UserWallet, + snapshot_address: &BlockchainContractAddress, + database: Arc, + expire: u32, +) -> anyhow::Result<(String, Option)> +where + B: BlockchainService, +{ + let snapshot = database.get_snapshot(&String::from(snapshot_address))?; + + let file_path = snapshot.file_path; + let upgrade = snapshot.upgrade; + let commit_id = snapshot.commit_id; + let content = snapshot.content; + let ipfs = snapshot.ipfs; + + tracing::trace!("push_initial_snapshot: snapshot_address={snapshot_address}, repo_addr={repo_addr}, file_path={file_path}"); + + let boc = blockchain + .construct_deploy_snapshot_message( + &wallet, + repo_addr.clone(), + commit_id.clone(), + file_path.clone(), + content.clone(), + ipfs.clone(), + expire, + ) + .await?; + + Ok((boc, Some(snapshot_address.clone()))) +}