Skip to content

Commit

Permalink
ResponseInner::Complete handled by StreamOrFut
Browse files Browse the repository at this point in the history
  • Loading branch information
oscartbeaumont committed Jul 19, 2023
1 parent aa997b1 commit 9d14735
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 63 deletions.
25 changes: 4 additions & 21 deletions src/internal/exec/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl<'a, TCtx: Clone + Send + 'static> SubscriptionManager<TCtx>
{
type Set<'m> = &'m mut SubscriptionSet where Self: 'm;

fn queue(&mut self, _id: u32, stream: OwnedStream<TCtx>) {
fn queue(&mut self, stream: OwnedStream<TCtx>) {
match &mut self.queued {
Some(queued) => {
queued.push(stream);
Expand Down Expand Up @@ -337,18 +337,8 @@ impl<
PollResult::QueueSend
}
StreamYield::Finished(f) => {
if let Some(stream) = f.take(conn.streams.as_mut()) {
if let StreamOrFut::OwnedStream { stream } = stream {
this.batch.as_mut().insert(exec::Response {
id: stream.id,
inner: ResponseInner::Complete,
});
}

PollResult::QueueSend
} else {
PollResult::Progressed
}
f.take(conn.streams.as_mut());
PollResult::Progressed
}
},
// If no streams, fall asleep until a new subscription is queued
Expand All @@ -369,14 +359,7 @@ impl<

// TODO: This can be improved by: https://github.com/jonhoo/streamunordered/pull/5
for (token, _) in conn.steam_to_sub_id.drain() {
if let Some(stream) = conn.streams.as_mut().take(token) {
if let StreamOrFut::OwnedStream { stream } = stream {
this.batch.as_mut().insert(exec::Response {
id: stream.id,
inner: ResponseInner::Complete,
});
}
}
conn.streams.as_mut().remove(token);
}
conn.steam_to_sub_id.drain().for_each(drop);
conn.map.drain().for_each(drop);
Expand Down
10 changes: 5 additions & 5 deletions src/internal/exec/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ mod private {
Self: 'm;

/// TODO
fn queue(&mut self, id: u32, stream: OwnedStream<TCtx>);
fn queue(&mut self, stream: OwnedStream<TCtx>);

/// TODO
fn subscriptions(&mut self) -> Self::Set<'_>;
Expand All @@ -51,7 +51,7 @@ mod private {
impl<TCtx> SubscriptionManager<TCtx> for NoOpSubscriptionManager {
type Set<'a> = &'a mut SubscriptionSet;

fn queue(&mut self, _id: u32, _task: OwnedStream<TCtx>) {
fn queue(&mut self, _task: OwnedStream<TCtx>) {
// Empty enum is unconstructable so this panics will never be hit.
unreachable!();
}
Expand Down Expand Up @@ -105,7 +105,7 @@ mod private {
///
// WARNING: The result of this function will not contain all requests.
// Your expected to use the `queue` fn to push them onto the runtime and handle them when completed
pub(crate) fn execute_batch<'a, M>(
pub fn execute_batch<'a, M>(
&'a self,
ctx: &TCtx,
reqs: Vec<Request>,
Expand Down Expand Up @@ -225,7 +225,7 @@ mod private {
subscriptions.insert(id);
drop(subscriptions);

subscription_manager.queue(id, s);
subscription_manager.queue(s);

ExecutorResult::None
}
Expand All @@ -239,7 +239,7 @@ mod private {

pub struct ExecRequestFut {
stream: Pin<Box<dyn Stream<Item = Result<Value, ExecError>> + Send>>,
pub(crate) id: u32,
pub id: u32,
}

impl ExecRequestFut {
Expand Down
6 changes: 4 additions & 2 deletions src/internal/exec/owned_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ mod private {
BuiltRouter, ExecError,
};

// TODO: This should be private or handle the "complete" message. Right now `StreamOrFut` handles it and can easily be overlooked by downstream impl.

pin_project! {
#[project = OwnedStreamProj]
/// TODO
pub struct OwnedStream<TCtx> {
arc: Arc<BuiltRouter<TCtx>>,
#[pin]
pub(crate) reference: Pin<Box<dyn Stream<Item = Result<Value, ExecError>> + Send>>,
pub(crate) id: u32,
pub id: u32,
}
}

Expand Down Expand Up @@ -210,7 +212,7 @@ impl<TCtx: Send + 'static> TrustMeBro<TCtx> {
subscriptions.insert(id);
drop(subscriptions);

subscription_manager.queue(s.id, s);
subscription_manager.queue(s);

ExecutorResult::None
}
Expand Down
88 changes: 54 additions & 34 deletions src/internal/exec/stream_or_fut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,63 @@ use crate::internal::{exec, PinnedOption, PinnedOptionProj};

use super::{ExecRequestFut, OwnedStream};

pin_project! {
/// TODO
#[project = StreamOrFutProj]
pub(crate) enum StreamOrFut<TCtx> {
OwnedStream {
#[pin]
stream: OwnedStream<TCtx>
},
ExecRequestFut {
#[pin]
fut: ExecRequestFut,
},
Done,
mod private {
use super::*;

pin_project! {
/// TODO
#[project = StreamOrFutProj]
pub enum StreamOrFut<TCtx> {
OwnedStream {
#[pin]
stream: OwnedStream<TCtx>
},
ExecRequestFut {
#[pin]
fut: ExecRequestFut,
},
Done,
}
}
}

impl<TCtx: 'static> Stream for StreamOrFut<TCtx> {
type Item = exec::Response;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.as_mut().project() {
StreamOrFutProj::OwnedStream { stream } => {
let s = stream.project();

Poll::Ready(ready!(s.reference.poll_next(cx)).map(|r| exec::Response {
id: *s.id,
inner: match r {
Ok(v) => exec::ResponseInner::Value(v),
Err(err) => exec::ResponseInner::Error(err.into()),
},
}))
impl<TCtx: 'static> Stream for StreamOrFut<TCtx> {
type Item = exec::Response;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.as_mut().project() {
StreamOrFutProj::OwnedStream { stream } => {
let s = stream.project();

Poll::Ready(Some(match ready!(s.reference.poll_next(cx)) {
Some(r) => exec::Response {
id: *s.id,
inner: match r {
Ok(v) => exec::ResponseInner::Value(v),
Err(err) => exec::ResponseInner::Error(err.into()),
},
},
None => {
let id = *s.id;
self.set(StreamOrFut::Done);
exec::Response {
id,
inner: exec::ResponseInner::Complete,
}
}
}))
}
StreamOrFutProj::ExecRequestFut { fut } => fut.poll(cx).map(|v| {
self.set(StreamOrFut::Done);
Some(v)
}),
StreamOrFutProj::Done { .. } => Poll::Ready(None),
}
StreamOrFutProj::ExecRequestFut { fut } => fut.poll(cx).map(|v| {
self.set(StreamOrFut::Done);
Some(v)
}),
StreamOrFutProj::Done { .. } => Poll::Ready(None),
}
}
}

#[cfg(feature = "unstable")]
pub use private::StreamOrFut;

#[cfg(not(feature = "unstable"))]
pub(crate) use private::StreamOrFut;
2 changes: 1 addition & 1 deletion src/internal/exec/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ mod private {
/// TODO
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) enum IncomingMessage {
pub enum IncomingMessage {
Msg(Result<Value, serde_json::Error>),
Close,
Skip,
Expand Down

0 comments on commit 9d14735

Please sign in to comment.