Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(perf): support iperf-style intermittent results #4382

Merged
merged 32 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d57ae7b
Revert "refactor(perf): use libp2p-request-response"
mxinden Aug 23, 2023
f45070c
Implement reporting via genawaiter
mxinden Aug 23, 2023
3f49d8e
Replace genawaiter with plain channel
mxinden Aug 23, 2023
a5df071
Basic printing
mxinden Aug 23, 2023
9788b53
intermittent -> intermediate
mxinden Aug 24, 2023
066d44c
Max out quic connection and stream data
mxinden Aug 25, 2023
9a1865e
Set send_window
mxinden Aug 25, 2023
14db268
Log config
mxinden Aug 25, 2023
d85382c
Revert logging
mxinden Aug 25, 2023
0f99dd4
Be reasonable
mxinden Aug 25, 2023
b901b6b
Merge branch 'master' of https://github.com/libp2p/rust-libp2p into i…
mxinden Oct 19, 2023
257c101
Simplify binary
mxinden Oct 19, 2023
f897297
Reduce tokio features
mxinden Oct 19, 2023
1ac5543
Revert changes to dockerfile
mxinden Oct 19, 2023
ce4384d
Remove quic window bumps
mxinden Oct 19, 2023
b82b684
Revert quic transport send_window hack
mxinden Oct 19, 2023
2126bf6
Handle client error
mxinden Oct 19, 2023
b3ab270
Simplify client protocol
mxinden Oct 19, 2023
61c83be
Remove outdated todo
mxinden Oct 19, 2023
fd9443f
Address minor review comments
mxinden Oct 22, 2023
4facef9
Use SwarmBuilder
mxinden Oct 23, 2023
9048af2
Use loop
mxinden Oct 23, 2023
a33842b
Use futures-bounded in server handler
mxinden Oct 23, 2023
5d4a7c4
Make RunError::NotConnected an empty struct
mxinden Oct 23, 2023
b8a7066
Refactor import
mxinden Oct 24, 2023
9111752
fmt
mxinden Oct 25, 2023
d124a46
fix(.github): allow perf to depend on meta crate
mxinden Oct 25, 2023
f082fbe
Merge branch 'master' of https://github.com/libp2p/rust-libp2p into i…
mxinden Oct 25, 2023
affdfce
Bump version and add changelog entry
mxinden Oct 25, 2023
0218fcf
Bump workspace dependency version
mxinden Oct 25, 2023
9090ede
Merge branch 'master' into iperf-style
mergify[bot] Oct 25, 2023
f817667
Remove keep alive handling
mxinden Oct 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions protocols/perf/src/bin/perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use libp2p_core::{
Transport as _,
};
use libp2p_identity::PeerId;
use libp2p_perf::server::Event;
use libp2p_perf::{Finished, Progressed, Run, RunParams, RunUpdate};
use libp2p_perf::{client, server};
use libp2p_perf::{Final, Intermediate, Run, RunParams, RunUpdate};
use libp2p_swarm::{Config, NetworkBehaviour, Swarm, SwarmEvent};
use log::{error, info};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -135,7 +135,7 @@ async fn server(server_address: SocketAddr) -> Result<()> {
info!("Established connection to {:?} via {:?}", peer_id, endpoint);
}
SwarmEvent::ConnectionClosed { .. } => {}
SwarmEvent::Behaviour(Event { .. }) => {
SwarmEvent::Behaviour(server::Event { .. }) => {
info!("Finished run",)
}
e => panic!("{e:?}"),
Expand Down Expand Up @@ -237,7 +237,7 @@ async fn swarm<B: NetworkBehaviour + Default>() -> Result<Swarm<B>> {
}

async fn connect(
swarm: &mut Swarm<libp2p_perf::client::Behaviour>,
swarm: &mut Swarm<client::Behaviour>,
server_address: Multiaddr,
) -> Result<PeerId> {
let start = Instant::now();
Expand All @@ -260,21 +260,21 @@ async fn connect(
}

async fn perf(
swarm: &mut Swarm<libp2p_perf::client::Behaviour>,
swarm: &mut Swarm<client::Behaviour>,
server_peer_id: PeerId,
params: RunParams,
) -> Result<Run> {
swarm.behaviour_mut().perf(server_peer_id, params)?;

let duration = loop {
match swarm.next().await.unwrap() {
SwarmEvent::Behaviour(libp2p_perf::client::Event {
SwarmEvent::Behaviour(client::Event {
id: _,
result: Ok(RunUpdate::Progressed(progressed)),
result: Ok(RunUpdate::Intermediate(progressed)),
}) => {
info!("{progressed}");

let Progressed {
let Intermediate {
duration,
sent,
received,
Expand All @@ -291,9 +291,9 @@ async fn perf(
.unwrap()
);
}
SwarmEvent::Behaviour(libp2p_perf::client::Event {
SwarmEvent::Behaviour(client::Event {
id: _,
result: Ok(RunUpdate::Finished(Finished { duration })),
result: Ok(RunUpdate::Final(Final { duration })),
}) => break duration,
e => panic!("{e:?}"),
};
Expand Down
17 changes: 6 additions & 11 deletions protocols/perf/src/client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use libp2p_swarm::{
};
use void::Void;

use super::{RunError, RunId};
use crate::client::{RunError, RunId};
use crate::{RunParams, RunUpdate};

#[derive(Debug)]
Expand Down Expand Up @@ -66,8 +66,6 @@ pub struct Handler {
requested_streams: VecDeque<Command>,

outbound: SelectAll<BoxStream<'static, (RunId, Result<crate::RunUpdate, std::io::Error>)>>,
mxinden marked this conversation as resolved.
Show resolved Hide resolved

keep_alive: KeepAlive,
}

impl Handler {
Expand All @@ -76,7 +74,6 @@ impl Handler {
queued_events: Default::default(),
requested_streams: Default::default(),
outbound: Default::default(),
keep_alive: KeepAlive::Yes,
}
}
}
Expand Down Expand Up @@ -157,7 +154,11 @@ impl ConnectionHandler for Handler {
}

fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
if self.outbound.is_empty() {
KeepAlive::No
} else {
KeepAlive::Yes
}
}

fn poll(
Expand All @@ -182,12 +183,6 @@ impl ConnectionHandler for Handler {
}));
}

if self.outbound.is_empty() {
self.keep_alive = KeepAlive::No
} else {
self.keep_alive = KeepAlive::Yes
}

Poll::Pending
}
}
12 changes: 6 additions & 6 deletions protocols/perf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,20 @@ pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/perf/1.0.0");

#[derive(Debug, Clone, Copy)]
pub enum RunUpdate {
Progressed(Progressed),
Finished(Finished),
Intermediate(Intermediate),
Final(Final),
}

#[derive(Debug, Clone, Copy)]
pub struct Progressed {
pub struct Intermediate {
pub duration: Duration,
pub sent: usize,
pub received: usize,
}

impl Display for Progressed {
impl Display for Intermediate {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Progressed {
let Intermediate {
duration,
sent,
received,
Expand All @@ -69,7 +69,7 @@ impl Display for Progressed {
}

#[derive(Debug, Clone, Copy)]
pub struct Finished {
pub struct Final {
pub duration: RunDuration,
}

Expand Down
18 changes: 9 additions & 9 deletions protocols/perf/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use futures::{
AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, SinkExt, Stream, StreamExt,
};

use crate::{Finished, Progressed, Run, RunDuration, RunParams, RunUpdate};
use crate::{Final, Intermediate, Run, RunDuration, RunParams, RunUpdate};

const BUF: [u8; 1024] = [0; 1024];
const REPORT_INTERVAL: Duration = Duration::from_secs(1);
Expand All @@ -43,18 +43,18 @@ pub(crate) fn send_receive<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
let inner = send_receive_inner(params, stream, sender).fuse();

futures::stream::select(
receiver.map(|progressed| Ok(RunUpdate::Progressed(progressed))),
receiver.map(|progressed| Ok(RunUpdate::Intermediate(progressed))),
inner
.map(|finished| finished.map(RunUpdate::Finished))
.map(|finished| finished.map(RunUpdate::Final))
.into_stream(),
)
}

async fn send_receive_inner<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
params: RunParams,
mut stream: S,
mut progress: futures::channel::mpsc::Sender<crate::Progressed>,
) -> Result<Finished, std::io::Error> {
mut progress: futures::channel::mpsc::Sender<crate::Intermediate>,
mxinden marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<Final, std::io::Error> {
let mut delay = Delay::new(REPORT_INTERVAL);

let RunParams {
Expand All @@ -81,7 +81,7 @@ async fn send_receive_inner<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
Either::Left((_, _)) => {
delay.reset(REPORT_INTERVAL);
progress
.send(Progressed {
.send(Intermediate {
duration: intermittant_start.elapsed(),
sent: sent - intermittent_sent,
received: 0,
Expand All @@ -101,7 +101,7 @@ async fn send_receive_inner<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
Either::Left((_, _)) => {
delay.reset(REPORT_INTERVAL);
progress
.send(Progressed {
.send(Intermediate {
duration: intermittant_start.elapsed(),
sent: sent - intermittent_sent,
received: 0,
Expand All @@ -127,7 +127,7 @@ async fn send_receive_inner<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
Either::Left((_, _)) => {
delay.reset(REPORT_INTERVAL);
progress
.send(Progressed {
.send(Intermediate {
duration: intermittant_start.elapsed(),
sent: sent - intermittent_sent,
received: received - intermittend_received,
Expand All @@ -145,7 +145,7 @@ async fn send_receive_inner<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(

let read_done = Instant::now();

Ok(Finished {
Ok(Final {
duration: RunDuration {
upload: write_done.duration_since(write_start),
download: read_done.duration_since(write_done),
Expand Down
14 changes: 5 additions & 9 deletions protocols/perf/src/server/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,12 @@ pub struct Event {

pub struct Handler {
inbound: FuturesUnordered<BoxFuture<'static, Result<Run, std::io::Error>>>,
mxinden marked this conversation as resolved.
Show resolved Hide resolved
keep_alive: KeepAlive,
}

impl Handler {
pub fn new() -> Self {
Self {
inbound: Default::default(),
keep_alive: KeepAlive::Yes,
}
}
}
Expand Down Expand Up @@ -110,7 +108,11 @@ impl ConnectionHandler for Handler {
}

fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
if self.inbound.is_empty() {
KeepAlive::No
} else {
KeepAlive::Yes
}
}

fn poll(
Expand All @@ -135,12 +137,6 @@ impl ConnectionHandler for Handler {
}
}

if self.inbound.is_empty() {
self.keep_alive = KeepAlive::No
} else {
self.keep_alive = KeepAlive::Yes
}

Poll::Pending
}
}
Loading