diff --git a/src/client.rs b/src/client.rs index 6f317266..deee1e46 100644 --- a/src/client.rs +++ b/src/client.rs @@ -759,47 +759,78 @@ pub async fn work( let client = Arc::new(client); - let futures = if client.is_http2() { - (0..n_connections) + if client.is_http2() { + let futures = (0..n_connections) .map(|_| { let report_tx = report_tx.clone(); let counter = counter.clone(); let client = client.clone(); tokio::spawn(async move { - match setup_http2(&client).await { - Ok((connection_time, client_state)) => { - let futures = (0..n_http2_parallel) - .map(move |_| { - let report_tx = report_tx.clone(); - let counter = counter.clone(); - let client = client.clone(); - - let mut client_state = client_state.clone(); - tokio::spawn(async move { - while counter.fetch_add(1, Ordering::Relaxed) < n_tasks { - let mut res = - client.work_http2(&mut client_state).await; - let is_cancel = is_too_many_open_files(&res); - set_connection_time(&mut res, connection_time); - report_tx.send_async(res).await.unwrap(); - if is_cancel { - break; + loop { + match setup_http2(&client).await { + Ok((connection_time, client_state)) => { + let futures = (0..n_http2_parallel) + .map(|_| { + let report_tx = report_tx.clone(); + let counter = counter.clone(); + let client = client.clone(); + + let mut client_state = client_state.clone(); + tokio::spawn(async move { + while counter.fetch_add(1, Ordering::Relaxed) < n_tasks + { + let mut res = + client.work_http2(&mut client_state).await; + let is_cancel = is_too_many_open_files(&res); + let is_reconnect = is_hyper_error(&res); + set_connection_time(&mut res, connection_time); + report_tx.send_async(res).await.unwrap(); + if is_cancel || is_reconnect { + return is_cancel; + } } - } + + true + }) }) - }) - .collect::>(); - for f in futures { - let _ = f.await; + .collect::>(); + + let mut connection_gone = false; + for f in futures { + match f.await { + Ok(true) => { + // All works done + connection_gone = true; + } + Err(_) => { + // Unexpected + connection_gone = true; + } + _ => {} + } + } + + if connection_gone { + return; + } + } + Err(err) => { + if counter.fetch_add(1, Ordering::Relaxed) < n_tasks { + report_tx.send_async(Err(err)).await.unwrap(); + } else { + return; + } } } - Err(err) => report_tx.send_async(Err(err)).await.unwrap(), } }) }) - .collect::>() + .collect::>(); + for f in futures { + let _ = f.await; + } } else { - (0..n_connections) + let futures = (0..n_connections) .map(|_| { let report_tx = report_tx.clone(); let counter = counter.clone(); @@ -816,12 +847,11 @@ pub async fn work( } }) }) - .collect::>() + .collect::>(); + for f in futures { + let _ = f.await; + } }; - - for f in futures { - let _ = f.await; - } } /// n tasks by m workers limit to qps works in a second @@ -874,46 +904,74 @@ pub async fn work_with_qps( let client = Arc::new(client); - let futures = if client.is_http2() { - (0..n_connections) + if client.is_http2() { + let futures = (0..n_connections) .map(|_| { let report_tx = report_tx.clone(); let rx = rx.clone(); let client = client.clone(); tokio::spawn(async move { - match setup_http2(&client).await { - Ok((connection_time, client_state)) => { - let futures = (0..n_http2_parallel) - .map(|_| { - let report_tx = report_tx.clone(); - let rx = rx.clone(); - let client = client.clone(); - let mut client_state = client_state.clone(); - tokio::spawn(async move { - while let Ok(()) = rx.recv_async().await { - let mut res = - client.work_http2(&mut client_state).await; - let is_cancel = is_too_many_open_files(&res); - set_connection_time(&mut res, connection_time); - report_tx.send_async(res).await.unwrap(); - if is_cancel { - break; + loop { + match setup_http2(&client).await { + Ok((connection_time, client_state)) => { + let futures = (0..n_http2_parallel) + .map(|_| { + let report_tx = report_tx.clone(); + let rx = rx.clone(); + let client = client.clone(); + let mut client_state = client_state.clone(); + tokio::spawn(async move { + while let Ok(()) = rx.recv_async().await { + let mut res = + client.work_http2(&mut client_state).await; + let is_cancel = is_too_many_open_files(&res); + let is_reconnect = is_hyper_error(&res); + set_connection_time(&mut res, connection_time); + report_tx.send_async(res).await.unwrap(); + if is_cancel || is_reconnect { + return is_cancel; + } } - } + true + }) }) - }) - .collect::>(); - for f in futures { - let _ = f.await; + .collect::>(); + let mut connection_gone = false; + for f in futures { + match f.await { + Ok(true) => { + // All works done + connection_gone = true; + } + Err(_) => { + // Unexpected + connection_gone = true; + } + _ => {} + } + } + if connection_gone { + return; + } + } + Err(err) => { + // Consume a task + if let Ok(()) = rx.recv_async().await { + report_tx.send_async(Err(err)).await.unwrap(); + } else { + return; + } } } - Err(err) => report_tx.send_async(Err(err)).await.unwrap(), } }) }) - .collect::>() + .collect::>(); + for f in futures { + let _ = f.await; + } } else { - (0..n_connections) + let futures = (0..n_connections) .map(|_| { let report_tx = report_tx.clone(); let rx = rx.clone(); @@ -930,12 +988,11 @@ pub async fn work_with_qps( } }) }) - .collect::>() + .collect::>(); + for f in futures { + let _ = f.await; + } }; - - for f in futures { - let _ = f.await; - } } /// n tasks by m workers limit to qps works in a second with latency correction @@ -990,47 +1047,75 @@ pub async fn work_with_qps_latency_correction( let client = Arc::new(client); - let futures = if client.is_http2() { - (0..n_connections) + if client.is_http2() { + let futures = (0..n_connections) .map(|_| { let report_tx = report_tx.clone(); let rx = rx.clone(); let client = client.clone(); tokio::spawn(async move { - match setup_http2(&client).await { - Ok((connection_time, client_state)) => { - let futures = (0..n_http2_parallel) - .map(|_| { - let report_tx = report_tx.clone(); - let rx = rx.clone(); - let client = client.clone(); - let mut client_state = client_state.clone(); - tokio::spawn(async move { - while let Ok(start) = rx.recv_async().await { - let mut res = - client.work_http2(&mut client_state).await; - let is_cancel = is_too_many_open_files(&res); - set_connection_time(&mut res, connection_time); - set_start_latency_correction(&mut res, start); - report_tx.send_async(res).await.unwrap(); - if is_cancel { - break; + loop { + match setup_http2(&client).await { + Ok((connection_time, client_state)) => { + let futures = (0..n_http2_parallel) + .map(|_| { + let report_tx = report_tx.clone(); + let rx = rx.clone(); + let client = client.clone(); + let mut client_state = client_state.clone(); + tokio::spawn(async move { + while let Ok(start) = rx.recv_async().await { + let mut res = + client.work_http2(&mut client_state).await; + let is_cancel = is_too_many_open_files(&res); + let is_reconnect = is_hyper_error(&res); + set_connection_time(&mut res, connection_time); + set_start_latency_correction(&mut res, start); + report_tx.send_async(res).await.unwrap(); + if is_cancel || is_reconnect { + return is_cancel; + } } - } + true + }) }) - }) - .collect::>(); - for f in futures { - let _ = f.await; + .collect::>(); + let mut connection_gone = false; + for f in futures { + match f.await { + Ok(true) => { + // All works done + connection_gone = true; + } + Err(_) => { + // Unexpected + connection_gone = true; + } + _ => {} + } + } + if connection_gone { + return; + } + } + Err(err) => { + // Consume a task + if rx.recv_async().await.is_ok() { + report_tx.send_async(Err(err)).await.unwrap(); + } else { + return; + } } } - Err(err) => report_tx.send_async(Err(err)).await.unwrap(), } }) }) - .collect::>() + .collect::>(); + for f in futures { + let _ = f.await; + } } else { - (0..n_connections) + let futures = (0..n_connections) .map(|_| { let client = client.clone(); let mut client_state = ClientStateHttp1::default(); @@ -1048,12 +1133,11 @@ pub async fn work_with_qps_latency_correction( } }) }) - .collect::>() + .collect::>(); + for f in futures { + let _ = f.await; + } }; - - for f in futures { - let _ = f.await; - } } /// Run until dead_line by n workers @@ -1087,10 +1171,10 @@ pub async fn work_until( let mut res = client.work_http2(&mut client_state).await; let is_cancel = is_too_many_open_files(&res); - let is_hyper_error = is_hyper_error(&res); + let is_reconnect = is_hyper_error(&res); set_connection_time(&mut res, connection_time); report_tx.send_async(res).await.unwrap(); - if is_cancel || is_hyper_error { + if is_cancel || is_reconnect { break is_cancel; } } @@ -1207,34 +1291,58 @@ pub async fn work_until_with_qps( let report_tx = report_tx.clone(); let rx = rx.clone(); tokio::spawn(async move { - match setup_http2(&client).await { - Ok((connection_time, client_state)) => { - let futures = (0..n_http2_parallel) - .map(|_| { - let client = client.clone(); - let report_tx = report_tx.clone(); - let rx = rx.clone(); - let mut client_state = client_state.clone(); - tokio::spawn(async move { - while let Ok(()) = rx.recv_async().await { - let mut res = - client.work_http2(&mut client_state).await; - let is_cancel = is_too_many_open_files(&res); - set_connection_time(&mut res, connection_time); - report_tx.send_async(res).await.unwrap(); - if is_cancel { - break; + loop { + match setup_http2(&client).await { + Ok((connection_time, client_state)) => { + let futures = (0..n_http2_parallel) + .map(|_| { + let client = client.clone(); + let report_tx = report_tx.clone(); + let rx = rx.clone(); + let mut client_state = client_state.clone(); + tokio::spawn(async move { + while let Ok(()) = rx.recv_async().await { + let mut res = + client.work_http2(&mut client_state).await; + let is_cancel = is_too_many_open_files(&res); + let is_reconnect = is_hyper_error(&res); + set_connection_time(&mut res, connection_time); + report_tx.send_async(res).await.unwrap(); + if is_cancel || is_reconnect { + return is_cancel; + } } - } + true + }) }) - }) - .collect::>(); - tokio::time::sleep_until(dead_line.into()).await; - for f in futures { - f.abort(); + .collect::>(); + let mut connection_gone = false; + for f in futures { + match f.await { + Ok(true) => { + // All works done + connection_gone = true; + } + Err(_) => { + // Unexpected + connection_gone = true; + } + _ => {} + } + } + if connection_gone { + return; + } + } + Err(err) => { + // Consume a task + if rx.recv_async().await.is_ok() { + report_tx.send_async(Err(err)).await.unwrap(); + } else { + return; + } } } - Err(err) => report_tx.send_async(Err(err)).await.unwrap(), } }) }) @@ -1328,36 +1436,59 @@ pub async fn work_until_with_qps_latency_correction( let report_tx = report_tx.clone(); let rx = rx.clone(); tokio::spawn(async move { - match setup_http2(&client).await { - Ok((connection_time, client_state)) => { - let futures = (0..n_http2_parallel) - .map(|_| { - let client = client.clone(); - let report_tx = report_tx.clone(); - let rx = rx.clone(); - let mut client_state = client_state.clone(); - tokio::spawn(async move { - while let Ok(start) = rx.recv_async().await { - let mut res = - client.work_http2(&mut client_state).await; - set_start_latency_correction(&mut res, start); - set_connection_time(&mut res, connection_time); - let is_cancel = is_too_many_open_files(&res); - report_tx.send_async(res).await.unwrap(); - if is_cancel { - break; + loop { + match setup_http2(&client).await { + Ok((connection_time, client_state)) => { + let futures = (0..n_http2_parallel) + .map(|_| { + let client = client.clone(); + let report_tx = report_tx.clone(); + let rx = rx.clone(); + let mut client_state = client_state.clone(); + tokio::spawn(async move { + while let Ok(start) = rx.recv_async().await { + let mut res = + client.work_http2(&mut client_state).await; + set_start_latency_correction(&mut res, start); + set_connection_time(&mut res, connection_time); + let is_cancel = is_too_many_open_files(&res); + let is_reconnect = is_hyper_error(&res); + report_tx.send_async(res).await.unwrap(); + if is_cancel || is_reconnect { + return is_cancel; + } } - } + true + }) }) - }) - .collect::>(); - tokio::time::sleep_until(dead_line.into()).await; - for f in futures { - f.abort(); + .collect::>(); + let mut connection_gone = false; + for f in futures { + match f.await { + Ok(true) => { + // All works done + connection_gone = true; + } + Err(_) => { + // Unexpected + connection_gone = true; + } + _ => {} + } + } + if connection_gone { + return; + } } - } - Err(err) => report_tx.send_async(Err(err)).await.unwrap(), + Err(err) => { + if rx.recv_async().await.is_ok() { + report_tx.send_async(Err(err)).await.unwrap(); + } else { + return; + } + } + } } }) })