Skip to content

Commit

Permalink
refactor: judicious locks, move access patterns to Block step methods
Browse files Browse the repository at this point in the history
  • Loading branch information
cakekindel committed Mar 30, 2023
1 parent 684020e commit 8d6a34b
Showing 1 changed file with 171 additions and 120 deletions.
291 changes: 171 additions & 120 deletions toad/src/step/block.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use core::marker::PhantomData;

use naan::prelude::F1Once;
use no_std_net::SocketAddr;
use toad_array::{AppendCopy, Array};
use toad_map::Map;
Expand All @@ -9,6 +10,7 @@ use toad_stem::Stem;

use super::{Step, _try};
use crate::net::Addrd;
use crate::platform::toad_msg::Message;
use crate::platform::{self, Effect, PlatformTypes, Snapshot};
use crate::req::Req;
use crate::resp::code::{CONTINUE, REQUEST_ENTITY_INCOMPLETE};
Expand Down Expand Up @@ -36,14 +38,14 @@ struct BlockState<P, Pcs>
where P: PlatformTypes
{
pub(self) biggest: Option<u32>,
pub(self) original: Option<platform::toad_msg::Message<P>>,
pub(self) original: Option<Message<P>>,
pub(self) pcs: Pcs,
}

impl<P, Pcs> BlockState<P, Pcs> where P: PlatformTypes
{
pub(self) fn assembled(&self) -> Payload<P::MessagePayload>
where Pcs: Map<u32, Piece<platform::toad_msg::Message<P>>>
where Pcs: Map<u32, Piece<Message<P>>>
{
const PANIC_MSG: &'static str = r#"BlockState.assembled() assumes:
- BlockState.biggest is Some(_)
Expand Down Expand Up @@ -143,10 +145,108 @@ impl<P, S, BS, Pcs> Default for Block<P, S, BS, Pcs>
}
}

impl<P, S, BS, Pcs> Block<P, S, BS, Pcs> where P: PlatformTypes
{
fn find_rx_request_block_state_ix(&self, token: Token) -> Option<usize>
where Pcs: Map<u32, Piece<Message<P>>>,
BS: Array<Item = BlockState<P, Pcs>>
{
self.block_states.map_ref(|bs| {
bs.iter()
.enumerate()
.find(|(_, bs)| match bs.pcs.get(&0) {
| Some(Piece::Have(m)) => m.token == token,
| _ => false,
})
.map(|(ix, _)| ix)
})
}

fn find_rx_response_block_state_ix(&self, token: Token, cache_key: u64) -> Option<usize>
where Pcs: Map<u32, Piece<Message<P>>>,
BS: Array<Item = BlockState<P, Pcs>>
{
self.block_states.map_ref(|block_states| {
block_states.iter()
.enumerate()
.find(|(_, bs)| {
let block0_and_response_is_for_originating_request =
bs.biggest.is_none()
&& bs.original.as_ref().map(|msg| msg.token) == Some(token);

let block_n_and_response_matches_previous_block =
bs.pcs
.get(&0)
.and_then(|p| p.get_msg().map(|m| m.cache_key()))
== Some(cache_key);

block0_and_response_is_for_originating_request
|| block_n_and_response_matches_previous_block
})
.map(|(ix, _)| ix)
})
}

fn block_state_mut<F, R>(&self, ix: usize, f: F) -> R
where F: for<'a> F1Once<&'a mut BlockState<P, Pcs>, Ret = R>,
Pcs: Map<u32, Piece<Message<P>>>,
BS: Array<Item = BlockState<P, Pcs>>
{
let mut f = Some(f);
self.block_states
.map_mut(|bs| Option::take(&mut f).unwrap().call1(&mut bs[ix]))
}

fn block_state<F, R>(&self, ix: usize, f: F) -> R
where F: for<'a> F1Once<&'a BlockState<P, Pcs>, Ret = R>,
Pcs: Map<u32, Piece<Message<P>>>,
BS: Array<Item = BlockState<P, Pcs>>
{
let mut f = Some(f);
self.block_states
.map_ref(|bs| Option::take(&mut f).unwrap().call1(&bs[ix]))
}

fn push(&self, s: BlockState<P, Pcs>)
where Pcs: Map<u32, Piece<Message<P>>>,
BS: Array<Item = BlockState<P, Pcs>>
{
let mut s = Some(s);
self.block_states
.map_mut(|bs| bs.push(Option::take(&mut s).unwrap()));
}

fn remove(&self, ix: usize)
where Pcs: Map<u32, Piece<Message<P>>>,
BS: Array<Item = BlockState<P, Pcs>>
{
self.block_states.map_mut(|bs| bs.remove(ix));
}

fn clone_original(&self, ix: usize) -> Message<P>
where Pcs: Map<u32, Piece<Message<P>>>,
BS: Array<Item = BlockState<P, Pcs>>
{
self.block_state(ix, |s: &BlockState<_, _>| {
let orig: &Message<P> = s.original.as_ref().unwrap();

let mut new = Message::<P>::new(Type::Con, orig.code, Id(0), orig.token);

orig.opts.iter().for_each(|(n, vs)| {
if n.include_in_cache_key() {
new.opts.insert(*n, vs.clone()).ok();
}
});

new
})
}
}

impl<P, S, BS, Pcs> Step<P> for Block<P, S, BS, Pcs>
where P: PlatformTypes,
S: Step<P, PollReq = Addrd<Req<P>>, PollResp = Addrd<Resp<P>>>,
Pcs: Map<u32, Piece<platform::toad_msg::Message<P>>>,
Pcs: Map<u32, Piece<Message<P>>>,
BS: Array<Item = BlockState<P, Pcs>>
{
type PollReq = Addrd<Req<P>>;
Expand All @@ -167,17 +267,7 @@ impl<P, S, BS, Pcs> Step<P> for Block<P, S, BS, Pcs>
match req.data().msg().block2() {
| None => Some(Ok(req)),
| Some(block) => {
let block_state_ix = self.block_states.map_ref(|block_states| {
block_states.iter()
.enumerate()
.find(|(_, bs)| match bs.pcs.get(&0) {
| Some(Piece::Have(m)) => {
m.token == req.data().msg().token
},
| _ => false,
})
.map(|(ix, _)| ix)
});
let block_state_ix = self.find_rx_request_block_state_ix(req.data().msg().token);

macro_rules! respond {
($code:expr) => {{
Expand All @@ -187,8 +277,7 @@ impl<P, S, BS, Pcs> Step<P> for Block<P, S, BS, Pcs>
Type::Non
};

let rep =
platform::toad_msg::Message::<P>::new(rep_ty, $code, Id(0), req.data().msg().token);
let rep = Message::<P>::new(rep_ty, $code, Id(0), req.data().msg().token);
effects.push(Effect::Send(Addrd(rep, req.addr())));
}};
}
Expand All @@ -199,26 +288,13 @@ impl<P, S, BS, Pcs> Step<P> for Block<P, S, BS, Pcs>

Some(Err(nb::Error::WouldBlock))
},
| Some(ix)
if self.block_states.map_ref(|block_states| {
block_states[ix].biggest.map(|n| n + 1).unwrap_or(0) < block.num()
}) =>
{
self.block_states
.map_mut(|block_states| block_states.remove(ix));
respond!(REQUEST_ENTITY_INCOMPLETE);

Some(Err(nb::Error::WouldBlock))
},
| None if block.more() => {
let mut block_state = BlockState { biggest: Some(0),
original: None,
pcs: Default::default() };
block_state.have(0, req.data().msg().clone());

let mut block_state = Some(block_state);
self.block_states
.map_mut(|block_states| block_states.push(Option::take(&mut block_state).unwrap()));
self.push(block_state);
respond!(CONTINUE);

Some(Err(nb::Error::WouldBlock))
Expand All @@ -228,20 +304,29 @@ impl<P, S, BS, Pcs> Step<P> for Block<P, S, BS, Pcs>
// simply yield the request
Some(Ok(req))
},
| Some(ix)
if self.block_state(ix, |s: &BlockState<_, _>| s.biggest)
.map(|n| n + 1)
.unwrap_or(0)
< block.num() =>
{
self.remove(ix);
respond!(REQUEST_ENTITY_INCOMPLETE);

Some(Err(nb::Error::WouldBlock))
},
| Some(ix) => {
self.block_states.map_mut(|block_states| {
block_states[ix].have(block.num(), req.data().msg().clone())
});
self.block_state_mut(ix, |s: &mut BlockState<_, _>| {
s.have(block.num(), req.data().msg().clone())
});

if block.more() {
respond!(CONTINUE);
Some(Err(nb::Error::WouldBlock))
} else {
let p = self.block_states
.map_ref(|block_states| block_states[ix].assembled());
let p = self.block_state(ix, |s: &BlockState<_, _>| s.assembled());
req.as_mut().msg_mut().payload = p;
self.block_states
.map_mut(|block_states| block_states.remove(ix));
self.remove(ix);
Some(Ok(req))
}
},
Expand All @@ -256,109 +341,75 @@ impl<P, S, BS, Pcs> Step<P> for Block<P, S, BS, Pcs>
token: Token,
addr: SocketAddr)
-> super::StepOutput<Self::PollResp, Self::Error> {
let rep: Addrd<Resp<P>> =
let mut rep: Addrd<Resp<P>> =
_try!(Option<nb::Result>; self.inner().poll_resp(snap, effects, token, addr));

let block_state_ix = self.block_states.map_ref(|block_states| {
block_states.iter()
.enumerate()
.find(|(_, bs)| {
let block0_and_response_is_for_originating_request =
bs.biggest.is_none()
&& bs.original.as_ref().map(|msg| msg.token)
== Some(rep.data().token());

let block_n_and_response_matches_previous_block =
bs.pcs
.get(&0)
.and_then(|p| p.get_msg().map(|m| m.cache_key()))
== Some(rep.data().msg().cache_key());

block0_and_response_is_for_originating_request
|| block_n_and_response_matches_previous_block
})
.map(|(ix, _)| ix)
});
let block_state_ix =
self.find_rx_response_block_state_ix(rep.data().msg().token, rep.data().msg().cache_key());

match rep.data().msg().block1() {
| None => {
// Response didn't have Block1; we can drop the block state
if let Some(ix) = block_state_ix {
self.block_states.map_mut(|es| es.remove(ix));
self.remove(ix);
}
Some(Ok(rep))
},
| Some(block) => {
let mut rep = Some(rep);
self.block_states.map_mut(|block_states| {
let mut rep = Option::take(&mut rep).unwrap();

match block_state_ix {
| None => {
// Got a Block1 message but we don't have any conception of it; yield the response as-is from the inner step.
Some(Ok(rep))
},
| Some(ix) => {
let blocks = block_states.get_mut(ix).unwrap();

macro_rules! request_num {
($num:expr) => {{
let orig = blocks.original.as_ref().unwrap();

let mut new = platform::toad_msg::Message::<P>::new(Type::Con,
orig.code,
Id(0),
orig.token);
orig.opts.iter().for_each(|(n, vs)| {
if n.include_in_cache_key() {
new.opts.insert(*n, vs.clone()).ok();
}
});
new.set_block1(0, $num, false).ok();
new.remove(BLOCK2);
new.remove(SIZE2);

effects.push(Effect::Send(rep.as_ref().map(|_| new)));
blocks.waiting($num);
}};
}

blocks.have(block.num(), rep.data().msg().clone());

if block.more() {
request_num!(block.num() + 1);
}

if let Some(missing) = blocks.get_missing() {
request_num!(missing);
}

if blocks.have_all() {
rep.as_mut().msg_mut().payload = blocks.assembled();
rep.as_mut().msg_mut().remove(BLOCK1);
block_states.remove(ix);
Some(Ok(rep))
} else {
Some(Err(nb::Error::WouldBlock))
}
},
}
})
match block_state_ix {
| None => {
// Got a Block1 message but we don't have any conception of it; yield the response as-is from the inner step.
Some(Ok(rep))
},
| Some(ix) => {
macro_rules! request_num {
($num:expr) => {{
let mut new = self.clone_original(ix);

new.set_block1(0, $num, false).ok();
new.remove(BLOCK2);
new.remove(SIZE2);

effects.push(Effect::Send(rep.as_ref().map(|_| new)));
self.block_state_mut(ix, |s: &mut BlockState<_, _>| s.waiting($num));
}};
}

self.block_state_mut(ix, |s: &mut BlockState<_, _>| {
s.have(block.num(), rep.data().msg().clone())
});

if block.more() {
request_num!(block.num() + 1);
}

if let Some(missing) = self.block_state(ix, BlockState::get_missing) {
request_num!(missing);
}

if self.block_state(ix, BlockState::have_all) {
rep.as_mut().msg_mut().payload = self.block_state(ix, BlockState::assembled);
rep.as_mut().msg_mut().remove(BLOCK1);
self.remove(ix);
Some(Ok(rep))
} else {
Some(Err(nb::Error::WouldBlock))
}
},
}
},
}
}

fn on_message_sent(&self,
snap: &platform::Snapshot<P>,
msg: &Addrd<platform::Message<P>>)
msg: &Addrd<Message<P>>)
-> Result<(), Self::Error> {
self.inner.on_message_sent(snap, msg)?;
if msg.data().code.kind() == CodeKind::Request {
self.block_states.map_mut(|block_states| {
block_states.push(BlockState { biggest: None,
original: Some(msg.data().clone()),
pcs: Default::default() })
});
self.push(BlockState { biggest: None,
original: Some(msg.data().clone()),
pcs: Default::default() });
}
Ok(())
}
Expand Down

0 comments on commit 8d6a34b

Please sign in to comment.