Skip to content

Commit

Permalink
Place a buffer over each sink of a demux to avoid serial message sending
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Apr 12, 2023
1 parent 837f410 commit 66a1798
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions hydroflow_cli_integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
use bytes::{Bytes, BytesMut};
use serde::{Deserialize, Serialize};

use futures::{ready, stream, Sink, Stream};
use futures::{ready, stream, Sink, Stream, SinkExt, sink::Buffer};

use async_recursion::async_recursion;
use async_trait::async_trait;
Expand Down Expand Up @@ -395,9 +395,12 @@ impl ConnectedSink for ConnectedBidi {
}
}

pub struct ConnectedDemux<T: ConnectedSink> {
pub type BufferedDrain<S, I> = DemuxDrain<I, Buffer<S, I>>;

pub struct ConnectedDemux<T: ConnectedSink>
where <T as ConnectedSink>::Input: Sync {
pub keys: Vec<u32>,
sink: Option<DemuxDrain<T::Input, T::Sink>>,
sink: Option<BufferedDrain<T::Sink, T::Input>>,
}

#[pin_project]
Expand Down Expand Up @@ -460,7 +463,7 @@ where
for (id, pipe) in demux {
connected_demux.insert(
id,
Box::pin(T::from_defn(ServerOrBound::Server(pipe)).await.into_sink()),
Box::pin(T::from_defn(ServerOrBound::Server(pipe)).await.into_sink().buffer(1024)),
);
}

Expand All @@ -481,7 +484,7 @@ where
for (id, bound) in demux {
connected_demux.insert(
id,
Box::pin(T::from_defn(ServerOrBound::Bound(bound)).await.into_sink()),
Box::pin(T::from_defn(ServerOrBound::Bound(bound)).await.into_sink().buffer(1024)),
);
}

Expand All @@ -505,7 +508,7 @@ where
<T as ConnectedSink>::Input: 'static + Sync,
{
type Input = (u32, T::Input);
type Sink = DemuxDrain<T::Input, T::Sink>;
type Sink = BufferedDrain<T::Sink, T::Input>;

fn into_sink(mut self) -> Self::Sink {
self.sink.take().unwrap()
Expand Down

0 comments on commit 66a1798

Please sign in to comment.