From 93bf6e05d6340f62c46411b6ba2d180ba2c6cd80 Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Mon, 20 Mar 2023 08:14:35 -0700 Subject: [PATCH 01/12] wip --- toad/src/step/block.rs | 1 + toad/src/step/mod.rs | 23 +++++++++++++++++++++++ toad/src/step/set_standard_options.rs | 5 +++++ 3 files changed, 29 insertions(+) create mode 100644 toad/src/step/block.rs diff --git a/toad/src/step/block.rs b/toad/src/step/block.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/toad/src/step/block.rs @@ -0,0 +1 @@ + diff --git a/toad/src/step/mod.rs b/toad/src/step/mod.rs index e96c8582..8dad658d 100644 --- a/toad/src/step/mod.rs +++ b/toad/src/step/mod.rs @@ -75,6 +75,29 @@ pub mod runtime { } } +/// # Block-wise transfer +/// * Client Flow ✓ +/// * Server Flow ✓ +/// +/// ## Internal State +/// Stores all messages sent, removing them when they will +/// not need to be resent +/// +/// ## Behavior +/// For outbound confirmable requests & responses, uses the params in [`Config.msg.con`](crate::config::Con). +/// +/// For outbound non-confirmable requests, uses the params in [`Config.msg.non`](crate::config::Non). +/// +/// Outbound non-confirmable responses and ACKs will never be retried. +/// +/// Note that the bandwidth used for retrying will never significantly exceed +/// [`probing_rate`](crate::config::Config.probing_rate), so retries may be delayed +/// by a small amount to respect this parameter. +/// +/// ## Transformation +/// None +pub mod block; + /// # Buffer & resend messages until they get a sufficient response /// * Client Flow ✓ /// * Server Flow ✓ diff --git a/toad/src/step/set_standard_options.rs b/toad/src/step/set_standard_options.rs index 2bdc04a0..f09e72be 100644 --- a/toad/src/step/set_standard_options.rs +++ b/toad/src/step/set_standard_options.rs @@ -68,6 +68,11 @@ impl Step

for SetStandardOptions write!(bytes, "{}", host).ok(); msg.as_mut().set_host(bytes.as_str()).ok(); msg.as_mut().set_port(port).ok(); + match msg.data().code.kind() { + toad_msg::CodeKind::Request => toad_msg::opt::known::no_repeat::SIZE1, + toad_msg::CodeKind::Response => toad_msg::opt::known::no_repeat::SIZE2, + toad_msg::CodeKind::Empty => (), + } Ok(()) } From 2957378602d1b6a95bae095fa054f719b71a32df Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Fri, 24 Mar 2023 10:36:30 -0700 Subject: [PATCH 02/12] feat: set size1/2 on outbound messages --- toad/src/step/set_standard_options.rs | 59 ++++++++++++++++++++------- 1 file changed, 45 insertions(+), 14 deletions(-) diff --git a/toad/src/step/set_standard_options.rs b/toad/src/step/set_standard_options.rs index f09e72be..f5b9b1e8 100644 --- a/toad/src/step/set_standard_options.rs +++ b/toad/src/step/set_standard_options.rs @@ -68,10 +68,16 @@ impl Step

for SetStandardOptions write!(bytes, "{}", host).ok(); msg.as_mut().set_host(bytes.as_str()).ok(); msg.as_mut().set_port(port).ok(); + + let payload_len = msg.data().payload().as_bytes().len() as u64; match msg.data().code.kind() { - toad_msg::CodeKind::Request => toad_msg::opt::known::no_repeat::SIZE1, - toad_msg::CodeKind::Response => toad_msg::opt::known::no_repeat::SIZE2, - toad_msg::CodeKind::Empty => (), + | toad_msg::CodeKind::Request => { + msg.as_mut().set_size1(payload_len).ok(); + }, + | toad_msg::CodeKind::Response => { + msg.as_mut().set_size2(payload_len).ok(); + }, + | toad_msg::CodeKind::Empty => (), } Ok(()) @@ -80,26 +86,29 @@ impl Step

for SetStandardOptions #[cfg(test)] mod test { + use embedded_time::Instant; use tinyvec::array_vec; use toad_msg::Type; use super::*; + use crate::platform::Snapshot; use crate::step::test::test_step; + use crate::test; - type InnerPollReq = Addrd>; - type InnerPollResp = Addrd>; + type InnerPollReq = Addrd>; + type InnerPollResp = Addrd>; - fn test_message(ty: Type) -> Addrd { + fn test_message(ty: Type) -> Addrd { use toad_msg::*; - Addrd(crate::test::Message { ver: Default::default(), - ty, - id: Id(1), - code: Code::new(1, 1), - token: Token(array_vec!(_ => 1)), - payload: Payload(Default::default()), - opts: Default::default() }, - crate::test::dummy_addr()) + Addrd(test::Message { ver: Default::default(), + ty, + id: Id(1), + code: Code::new(1, 1), + token: Token(array_vec!(_ => 1)), + payload: Payload(Default::default()), + opts: Default::default() }, + test::dummy_addr()) } test_step!( @@ -127,4 +136,26 @@ mod test { (poll_resp(_, _, _, _) should satisfy { |out| assert_eq!(out, Some(Err(nb::Error::WouldBlock))) }) ] ); + + #[test] + fn options() { + crate::step::test::dummy_step!({Step}); + let s = SetStandardOptions::::default(); + let snap = Snapshot { time: Instant::new(0), + config: Default::default(), + recvd_dgram: None }; + + let mut req = test::msg!(CON GET x.x.x.x:80); + req.as_mut().payload = toad_msg::Payload("Yabba dabba doo!!".bytes().collect()); + + let mut resp = test::msg!(CON {2 . 04} x.x.x.x:80); + resp.as_mut().payload = + toad_msg::Payload("wacky tobaccy is the smacky holacky".bytes().collect()); + + s.before_message_sent(&snap, &mut vec![], &mut req).unwrap(); + s.before_message_sent(&snap, &mut vec![], &mut resp) + .unwrap(); + assert_eq!(req.data().size1(), Some(17)); + assert_eq!(resp.data().size2(), Some(35)); + } } From 403ba94cdc9cca122855d91fa5c276e763b4ab74 Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Tue, 28 Mar 2023 11:44:57 -0700 Subject: [PATCH 03/12] feat!: assemble rxd Blocked responses --- Cargo.lock | 8 +- toad/Cargo.toml | 6 +- toad/src/platform.rs | 16 ++ toad/src/step/block.rs | 489 +++++++++++++++++++++++++++++++++++++++ toad/src/step/mod.rs | 47 ++-- toad/src/step/observe.rs | 202 +--------------- 6 files changed, 554 insertions(+), 214 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a62e0cb4..bb82459c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -600,9 +600,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "openssl" -version = "0.10.51" +version = "0.10.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97ea2d98598bf9ada7ea6ee8a30fb74f9156b63bbe495d64ec2b87c269d2dda3" +checksum = "01b8574602df80f7b85fdfc5392fa884a4e3b3f4f35402c070ab34c3d3f78d56" dependencies = [ "bitflags", "cfg-if", @@ -626,9 +626,9 @@ dependencies = [ [[package]] name = "openssl-sys" -version = "0.9.86" +version = "0.9.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "992bac49bdbab4423199c654a5515bd2a6c6a23bf03f2dd3bdb7e5ae6259bc69" +checksum = "8e17f59264b2809d77ae94f0e1ebabc434773f370d6ca667bd223ea10e06cc7e" dependencies = [ "cc", "libc", diff --git a/toad/Cargo.toml b/toad/Cargo.toml index d4455833..379393f6 100644 --- a/toad/Cargo.toml +++ b/toad/Cargo.toml @@ -48,7 +48,9 @@ toad-string = {version = "0.2.0", default_features = false} toad-msg = "0.18.1" toad-macros = "0.2.0" log = "0.4" -tinyvec = { version = "1.5", default_features = false, features = ["rustc_1_55"] } +tinyvec = { version = "1.5", default_features = false, features = [ + "rustc_1_55" +] } no-std-net = "0.6" embedded-time = "0.12" nb = "1" @@ -65,6 +67,6 @@ serde-json-core = { version = "0.5.0", optional = true } simple_logger = "2" lazycell = "1.3.0" paste = "1.0.9" -serde = {version = "1.0", features = ["derive"]} +serde = { version = "1.0", features = ["derive"] } serde-json-core = { version = "0.5.0" } serde_json = { version = "1.0" } diff --git a/toad/src/platform.rs b/toad/src/platform.rs index d5ac51d6..2ebae444 100644 --- a/toad/src/platform.rs +++ b/toad/src/platform.rs @@ -341,6 +341,22 @@ pub enum Effect

Nop, } +impl

Effect

where P: PlatformTypes +{ + /// Is this [`Effect::Send`]? + pub fn is_send(&self) -> bool { + self.get_send().is_some() + } + + /// If this is [`Effect::Send`], yields a reference to the message + pub fn get_send(&self) -> Option<&Addrd>> { + match self { + | Self::Send(r) => Some(r), + | _ => None, + } + } +} + impl

Default for Effect

where P: PlatformTypes { fn default() -> Self { diff --git a/toad/src/step/block.rs b/toad/src/step/block.rs index 8b137891..b8650977 100644 --- a/toad/src/step/block.rs +++ b/toad/src/step/block.rs @@ -1 +1,490 @@ +use core::marker::PhantomData; +use no_std_net::SocketAddr; +use toad_array::{AppendCopy, Array}; +use toad_map::Map; +use toad_msg::no_repeat::{BLOCK1, BLOCK2, SIZE2}; +use toad_msg::{CodeKind, Id, MessageOptions, Payload, Token, Type}; +use toad_stem::Stem; + +use super::{Step, _try}; +use crate::net::Addrd; +use crate::platform::{self, Effect, PlatformTypes, Snapshot}; +use crate::req::Req; +use crate::resp::Resp; + +#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)] +enum Piece { + Have(M), + Waiting, + Missing, +} + +impl Piece { + fn get_msg(&self) -> Option<&M> { + match self { + | Piece::Have(m) => Some(m), + | _ => None, + } + } +} + +#[derive(Debug)] +struct BlockState + where P: PlatformTypes +{ + pub(self) biggest: Option, + pub(self) original: Option>, + pub(self) pcs: Pcs, +} + +impl BlockState where P: PlatformTypes +{ + pub(self) fn assembled(&self) -> Payload + where Pcs: Map>> + { + const PANIC_MSG: &'static str = r#"BlockState.assembled() assumes: +- BlockState.biggest is Some(_) +- BlockState contains at least one Piece +- BlockState.have_all() has been invoked and confirmed to be true"#; + + let mut p = P::MessagePayload::default(); + for i in 0..=self.biggest.expect(PANIC_MSG) { + p.append_copy(&self.pcs + .get(&i) + .expect(PANIC_MSG) + .get_msg() + .expect(PANIC_MSG) + .payload + .0); + } + + Payload(p) + } + + pub(self) fn get_missing(&self) -> Option + where Pcs: Map>, + T: PartialEq + { + self.pcs + .iter() + .find_map(|(n, p)| if p == &Piece::Missing { Some(n) } else { None }) + .copied() + } + + pub(self) fn have_all(&self) -> bool + where Pcs: Map>, + T: PartialEq + { + self.pcs + .iter() + .all(|(_, p)| p != &Piece::Missing && p != &Piece::Waiting) + } + + pub(self) fn touch(&mut self, n: u32) + where Pcs: Map> + { + let missing_nums = match self.biggest { + | Some(m) if m + 1 < n => (m + 1..n).into_iter(), + | None if n > 0 => (0..n).into_iter(), + | _ => (0..0).into_iter(), + }; + + missing_nums.for_each(|n| { + self.pcs.insert(n, Piece::Missing).ok(); + }); + + let n_is_bigger = self.biggest.map(|m| m < n).unwrap_or(true); + if n_is_bigger { + self.biggest = Some(n); + } + } + + pub(self) fn waiting(&mut self, n: u32) + where Pcs: Map> + { + let e = self.pcs.get_mut(&n); + + match e { + | Some(Piece::Missing) | Some(Piece::Waiting) | None => { + self.touch(n); + self.pcs.insert(n, Piece::Waiting).ok(); + }, + | _ => (), + } + } + + pub(self) fn have(&mut self, n: u32, m: T) + where Pcs: Map> + { + self.touch(n); + self.pcs.insert(n, Piece::Have(m)).ok(); + } +} + +/// TODO +#[derive(Debug)] +pub struct Block { + inner: S, + block_states: Stem, + __p: PhantomData<(P, Pcs)>, +} + +impl Default for Block + where S: Default, + BS: Default +{ + fn default() -> Self { + Block { inner: S::default(), + block_states: Stem::new(BS::default()), + __p: PhantomData } + } +} + +impl Step

for Block + where P: PlatformTypes, + S: Step>, PollResp = Addrd>>, + Pcs: Map>>, + BS: Array> +{ + type PollReq = Addrd>; + type PollResp = Addrd>; + type Error = S::Error; + type Inner = S; + + fn inner(&self) -> &Self::Inner { + &self.inner + } + + fn poll_req(&self, + snap: &crate::platform::Snapshot

, + effects: &mut P::Effects) + -> super::StepOutput { + _try!(Option; self.inner().poll_req(snap, effects)); + None + } + + fn poll_resp(&self, + snap: &Snapshot

, + effects: &mut P::Effects, + token: Token, + addr: SocketAddr) + -> super::StepOutput { + let rep: Addrd> = + _try!(Option; self.inner().poll_resp(snap, effects, token, addr)); + + let block_state_ix = self.block_states.map_ref(|block_states| { + block_states.iter() + .enumerate() + .find(|(_, bs)| { + let block0_and_response_is_for_originating_request = + bs.biggest.is_none() + && bs.original.as_ref().map(|msg| msg.token) + == Some(rep.data().token()); + + let block_n_and_response_matches_previous_block = + bs.pcs + .get(&0) + .and_then(|p| p.get_msg().map(|m| m.cache_key())) + == Some(rep.data().msg().cache_key()); + + block0_and_response_is_for_originating_request + || block_n_and_response_matches_previous_block + }) + .map(|(ix, _)| ix) + }); + + match rep.data().msg().block1() { + | None => { + // Response didn't have Block1; we can drop the block state + if let Some(ix) = block_state_ix { + self.block_states.map_mut(|es| es.remove(ix)); + } + Some(Ok(rep)) + }, + | Some(block) => { + let mut rep = Some(rep); + self.block_states.map_mut(|block_states| { + let mut rep = Option::take(&mut rep).unwrap(); + + match block_state_ix { + | None => { + // Got a Block1 message but we don't have any conception of it; yield the response as-is from the inner step. + Some(Ok(rep)) + }, + | Some(ix) => { + let blocks = block_states.get_mut(ix).unwrap(); + + macro_rules! request_num { + ($num:expr) => {{ + let orig = blocks.original.as_ref().unwrap(); + + let mut new = platform::toad_msg::Message::

::new(Type::Con, + orig.code, + Id(0), + orig.token); + orig.opts.iter().for_each(|(n, vs)| { + if n.include_in_cache_key() { + new.opts.insert(*n, vs.clone()).ok(); + } + }); + new.set_block1(0, $num, false).ok(); + new.remove(BLOCK2); + new.remove(SIZE2); + + effects.push(Effect::Send(rep.as_ref().map(|_| new))); + blocks.waiting($num); + }}; + } + + blocks.have(block.num(), rep.data().msg().clone()); + + if block.more() { + request_num!(block.num() + 1); + } + + if let Some(missing) = blocks.get_missing() { + request_num!(missing); + } + + if blocks.have_all() { + rep.as_mut().msg_mut().payload = blocks.assembled(); + rep.as_mut().msg_mut().remove(BLOCK1); + block_states.remove(ix); + Some(Ok(rep)) + } else { + Some(Err(nb::Error::WouldBlock)) + } + }, + } + }) + }, + } + } + + fn on_message_sent(&self, + snap: &platform::Snapshot

, + msg: &Addrd>) + -> Result<(), Self::Error> { + self.inner.on_message_sent(snap, msg)?; + if msg.data().code.kind() == CodeKind::Request { + self.block_states.map_mut(|block_states| { + block_states.push(BlockState { biggest: None, + original: Some(msg.data().clone()), + pcs: Default::default() }) + }); + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use core::time::Duration; + use std::time::Instant; + + use std_alloc::collections::BTreeMap; + use tinyvec::array_vec; + use toad_msg::{Code, ContentFormat, Id, MessageOptions, Type}; + + use super::*; + use crate::net::Addrd; + use crate::test; + + #[test] + fn ent_correctly_identifies_missing_pieces() { + let mut e = BlockState::>> { biggest: None, + original: None, + pcs: BTreeMap::new() }; + e.have(0, ()); + assert_eq!(e.get_missing(), None); + + e.have(1, ()); + assert_eq!(e.get_missing(), None); + + e.waiting(3); + e.waiting(2); + e.waiting(5); + + assert_eq!(e.get_missing(), Some(4)); + e.waiting(4); + + assert_eq!(e.get_missing(), None); + } + + #[test] + fn when_inner_errors_block_should_error() { + type S = test::MockStep<(), Addrd, Addrd, ()>; + let b = Block::, BTreeMap<_, _>>::default(); + + b.inner() + .set_poll_req(Box::new(|_, _, _| Some(Err(nb::Error::Other(()))))) + .set_poll_resp(Box::new(|_, _, _, _, _| Some(Err(nb::Error::Other(()))))); + + assert_eq!(b.poll_req(&test::snapshot(), &mut vec![]), + Some(Err(nb::Error::Other(())))); + assert_eq!(b.poll_resp(&test::snapshot(), + &mut vec![], + Token(Default::default()), + test::x.x.x.x(80)), + Some(Err(nb::Error::Other(())))); + } + + #[test] + fn when_recv_response_with_no_block1_this_should_do_nothing() { + type S = test::MockStep<(), Addrd, Addrd, ()>; + let b = Block::, BTreeMap<_, _>>::default(); + + b.inner().set_poll_resp(Box::new(|_, _, _, _, _| { + let msg = toad_msg::alloc::Message::new(Type::Con, + Code::GET, + Id(0), + Token(Default::default())); + Some(Ok(Addrd(Resp::from(msg), test::x.x.x.x(80)))) + })); + + let mut effects = vec![]; + assert!(matches!(b.poll_resp(&test::snapshot(), + &mut effects, + Token(Default::default()), + test::x.x.x.x(80)), + Some(Ok(Addrd(_, _))))); + assert!(effects.is_empty()); + } + + #[test] + fn when_recv_response_with_block1_this_should_ask_for_other_blocks() { + struct TestState { + gave_pieces: Vec, + req: Option>, + last_request_at: Instant, + } + + type S = test::MockStep, Addrd, ()>; + let b = Block::, BTreeMap<_, _>>::default(); + + let mut orig_req = test::Message::new(Type::Con, Code::GET, Id(1), Token(array_vec! {1})); + orig_req.set_accept(ContentFormat::Text).ok(); + orig_req.set_path("lipsum").ok(); + + let cache_key = orig_req.cache_key(); + + b.on_message_sent(&test::snapshot(), &Addrd(orig_req, test::x.x.x.x(80))) + .unwrap(); + + let mut effects: Vec = vec![]; + + let payload = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do"; + let payload_blocks = || { + payload.bytes().fold(vec![vec![]], |mut b, a| { + let last = b.last_mut().unwrap(); + if last.len() < 16 { + last.push(a); + } else { + b.push(vec![a]); + } + b + }) + }; + + b.inner() + .init(TestState { gave_pieces: vec![], + req: None, + last_request_at: Instant::now() }) + .set_poll_resp(Box::new(move |mock, _, _, _, _| { + let blocksize: u16 = 16; + let blocks = payload_blocks(); + + let mut resp = + Resp::from(test::Message::new(Type::Non, Code::new(2, 05), Id(1), Token(array_vec! {1}))); + resp.msg_mut().set_size1(payload.len() as _).ok(); + resp.msg_mut().set_path("lipsum").ok(); + + let request = mock.state.map_ref(|s| s.as_ref().unwrap().req.clone()); + + let requested_piece = request.as_ref() + .and_then(|req| req.data().msg().block1()) + .map(|b| b.num()); + let already_gave_pieces = mock.state + .map_ref(|s| s.as_ref().unwrap().gave_pieces.clone()); + let last_request_at = mock.state + .map_ref(|s| s.as_ref().unwrap().last_request_at.clone()); + let elapsed = Instant::now().duration_since(last_request_at); + + match requested_piece { + | None if already_gave_pieces.is_empty() => { + resp.set_payload(blocks[0].iter().copied()); + resp.msg_mut().set_block1(blocksize, 0, true).ok(); + mock.state + .map_mut(|s| s.as_mut().unwrap().last_request_at = Instant::now()); + }, + | None if request.is_none() && elapsed > Duration::from_secs(1) => panic!("timeout"), + | None => panic!("Block1 not set on request when client already got a Block1 response"), + | Some(_) if request.map(|r| r.data().msg().cache_key()) != Some(cache_key) => { + panic!("cache_key mismatch!") + }, + | Some(n) + if already_gave_pieces.iter() + .any(|p| Some(*p) == requested_piece) => + { + panic!("block {n} already given") + }, + | Some(n) if n > 3 => panic!("block {n} out of range"), + | Some(n) => { + resp.set_payload(blocks[n as usize].iter().copied()); + resp.msg_mut().set_block1(blocksize, n, n < 3).ok(); + mock.state.map_mut(|s| { + let s = s.as_mut().unwrap(); + s.gave_pieces.push(n); + s.last_request_at = Instant::now(); + }); + }, + } + + Some(Ok(Addrd(resp, test::x.x.x.x(80)))) + })); + + let rep = loop { + let mut reqs = effects.drain(..) + .filter(|e| e.is_send()) + .collect::>(); + match reqs.len() { + | 0 => (), + | 1 => { + let mut req = reqs.remove(0) + .get_send() + .cloned() + .map(|addrd| addrd.map(Req::from)); + b.inner() + .state + .map_mut(|s| s.as_mut().unwrap().req = Option::take(&mut req)); + }, + | _ => panic!("too many outbound messages queued ({:?})", + reqs.iter() + .cloned() + .map(|r| r.get_send() + .as_ref() + .unwrap() + .data() + .block1() + .unwrap() + .num()) + .collect::>()), + } + + match b.poll_resp(&test::snapshot(), + &mut effects, + Token(array_vec! {1}), + test::x.x.x.x(40)) + { + | Some(Err(nb::Error::WouldBlock)) => continue, + | Some(Err(nb::Error::Other(e))) => panic!("{e:?}"), + | Some(Ok(rep)) => break rep, + | None => panic!("got None"), + } + }; + + assert_eq!(rep.data().payload().copied().collect::>(), + payload.bytes().collect::>()); + } +} diff --git a/toad/src/step/mod.rs b/toad/src/step/mod.rs index 8dad658d..ec8b99ef 100644 --- a/toad/src/step/mod.rs +++ b/toad/src/step/mod.rs @@ -6,7 +6,7 @@ use crate::platform::{self, PlatformTypes}; /// Standard set of Steps pub mod runtime { - use ::toad_msg::Token; + use ::toad_msg::{DefaultCacheKey, Token}; use naan::prelude::{HKT1, HKT2}; use no_std_net::SocketAddr; @@ -80,19 +80,38 @@ pub mod runtime { /// * Server Flow ✓ /// /// ## Internal State -/// Stores all messages sent, removing them when they will -/// not need to be resent +/// * Stores inbound chunked messages for reassembly on final piece /// /// ## Behavior -/// For outbound confirmable requests & responses, uses the params in [`Config.msg.con`](crate::config::Con). -/// -/// For outbound non-confirmable requests, uses the params in [`Config.msg.non`](crate::config::Non). -/// -/// Outbound non-confirmable responses and ACKs will never be retried. -/// -/// Note that the bandwidth used for retrying will never significantly exceed -/// [`probing_rate`](crate::config::Config.probing_rate), so retries may be delayed -/// by a small amount to respect this parameter. +/// ### Inbound Response +/// * response had `Block1 {num: 0, size: , more: true}`? +/// * incr num by 1 and send a duplicate of the original request (stripped of any Block1 options) +/// * response had `Block1 {num: , size: , more: false}`? +/// * yield assembled response to client +/// +/// ### Inbound Request +/// _NOTE: this covers entire request-response flow; requestors have to ask for each block from the server._ +/// * request has `Block1 {num: 0, size: , more: true}`? +/// * store first block +/// * request has `Block1 {num: , size: , more: true}`? +/// * if we have not seen `num - 1`, respond `4.08 REQUEST ENTITY INCOMPLETE` +/// * if we have seen `num - 1`, respond `2.31 CONTINUE Block1 {num: , size: , more: true}` +/// * request has `Block1 {num: , size: , more: false}`? +/// * same checks as `more: true` +/// * let server see assembled request +/// * request has `Block2 {num: 0, size: , more: false}`? +/// * response should be blocked into this size, else `1024` +/// * request has `Block2 {num: , size: , more: false}`? +/// * send requested block +/// +/// ### Outbound Request +/// * outbound requests are automatically broken into blocks, default block size 1024 +/// * when server responds with `2.31 CONTINUE Block1 {num: 0, size: , more: true}`, break the remainder of the payload into block size `` and restart the process for each block +/// * when server responds with `4.08 REQUEST ENTITY INCOMPLETE` restart from block 0 +/// * when server responds with `4.13 REQUEST ENTITY TOO LARGE Block1 {num: 0, size: , more: false}`, restart from block 0 with size `` +/// * when server responds with any other status code and `Block1 {num: , size: , more: }`, treat this response as the final response to the original request +/// * if `more` was false and we have more blocks to send, store the response and pretend we didn't receive it until all blocks sent +/// * if at any time we receive `4.08 REQUEST ENTITY INCOMPLETE` discard the stored response /// /// ## Transformation /// None @@ -151,8 +170,8 @@ pub mod retry; /// /// ### Then /// this step will issue 2 requests to your server: -/// - Request 1 `GET coap://server/temperature` -/// - Request 2 `GET coap://server/temperature?above=23deg` +/// * Request 1 `GET coap://server/temperature` +/// * Request 2 `GET coap://server/temperature?above=23deg` /// /// The response to request 1 will be sent to clients A, B, and C. The response to request 2 will be sent to client D. pub mod observe; diff --git a/toad/src/step/observe.rs b/toad/src/step/observe.rs index 94704786..1e7f89cb 100644 --- a/toad/src/step/observe.rs +++ b/toad/src/step/observe.rs @@ -4,11 +4,10 @@ use core::marker::PhantomData; use no_std_net::SocketAddr; use toad_array::Array; -use toad_hash::Blake2Hasher; use toad_msg::opt::known::observe::Action::{Deregister, Register}; use toad_msg::opt::known::repeat::QUERY; use toad_msg::repeat::PATH; -use toad_msg::{CodeKind, Id, MessageOptions, Token}; +use toad_msg::{CacheKey, CodeKind, DefaultCacheKey, Id, MessageOptions, Token}; use toad_stem::Stem; use super::{log, Step}; @@ -30,126 +29,6 @@ pub mod opt { pub const WAS_CREATED_BY_OBSERVE: OptNumber = OptNumber(65000); } -/// Default hasher used for [`SubscriptionHash`] -/// -/// Hashes: -/// - [Message Type](toad_msg::Message.ty) -/// - [Uri-Path](toad_msg::opt::known::no_repeat::HOST) -/// - [Uri-Query](toad_msg::opt::known::no_repeat::HOST) -/// - [Accept](toad_msg::opt::known::no_repeat::ACCEPT) -#[derive(Debug, Clone)] -#[allow(non_camel_case_types)] -pub struct SubHash_TypePathQueryAccept

(Blake2Hasher, PhantomData

); - -impl

Default for SubHash_TypePathQueryAccept

{ - fn default() -> Self { - Self(Blake2Hasher::new(), PhantomData) - } -} - -impl

SubHash_TypePathQueryAccept

{ - /// Create a new `DefaultSubscriptionHasher` - pub fn new() -> Self { - Self::default() - } -} - -impl

SubscriptionHash

for SubHash_TypePathQueryAccept

where P: PlatformTypes -{ - type Hasher = Blake2Hasher; - - fn hasher(&mut self) -> &mut Self::Hasher { - &mut self.0 - } - - fn subscription_hash(&mut self, sub: &Addrd>) { - let msg = sub.data().msg(); - - msg.ty.hash(&mut self.0); - msg.get(QUERY).into_iter().for_each(|v| { - v.hash(&mut self.0); - }); - msg.accept().hash(&mut self.0); - msg.get(PATH).into_iter().for_each(|v| { - v.hash(&mut self.0); - }); - } -} - -/// Extends [`core::hash::Hash`] with "subscription similarity" -/// used to determine whether similar subscriptions may be grouped together. -/// -/// A default implementation is provided by [`SubHash_TypePathQueryAccept`]. -/// -/// ## Why? -/// When your server [`notify`](super::Step::notify)s the toad runtime -/// that there is a new version of a resource available, all -/// subscriptions matching the path passed to `notify` will be -/// re-sent as new requests to your server. -/// -/// Similar requests (determined by this trait) will be grouped together -/// so that your server only sees 1 request, and the response -/// will be fanned back out to the subscribers. -/// -/// For a more concrete example, see the [module documentation](self). -pub trait SubscriptionHash

- where Self: Sized + Debug, - P: PlatformTypes -{ - /// Type used to generate hashes - type Hasher: Hasher; - - #[allow(missing_docs)] - fn hasher(&mut self) -> &mut Self::Hasher; - - /// Mutate the hasher instance with a subscription - /// - /// To obtain the [`u64`] hash, use [`Hasher::finish`] on [`sub_hash.hasher()`](SubscriptionHash::hasher) - /// - /// ``` - /// use core::hash::Hasher; - /// - /// use toad::net::{ipv4_socketaddr, Addrd}; - /// use toad::platform::toad_msg::Message; - /// use toad::req::Req; - /// use toad::step::observe::{SubHash_TypePathQueryAccept, SubscriptionHash}; - /// use toad_msg::Type::Con; - /// use toad_msg::{Code, Id, Token}; - /// - /// type Std = toad::std::PlatformTypes; - /// - /// let msg_a = Message::::new(Con, Code::GET, Id(1), Token(Default::default())); - /// let req_a = Addrd(Req::::from(msg_a), - /// ipv4_socketaddr([127, 0, 0, 1], 1234)); - /// let mut ha = SubHash_TypePathQueryAccept::new(); - /// ha.subscription_hash(&req_a); - /// - /// let msg_b = Message::::new(Con, Code::GET, Id(2), Token(Default::default())); - /// let req_b = Addrd(Req::::from(msg_b), - /// ipv4_socketaddr([127, 0, 0, 1], 2345)); - /// let mut hb = SubHash_TypePathQueryAccept::new(); - /// hb.subscription_hash(&req_a); - /// - /// assert_eq!(ha.hasher().finish(), hb.hasher().finish()); - /// ``` - fn subscription_hash(&mut self, sub: &Addrd>); -} - -impl SubscriptionHash

for &mut T - where P: PlatformTypes, - T: SubscriptionHash

-{ - type Hasher = T::Hasher; - - fn hasher(&mut self) -> &mut Self::Hasher { - >::hasher(self) - } - - fn subscription_hash(&mut self, sub: &Addrd>) { - >::subscription_hash(self, sub) - } -} - /// An Observe subscription pub struct Sub

where P: PlatformTypes @@ -228,17 +107,17 @@ impl Default for Observe impl Observe { fn hash<'a, P>(sub: &'a Sub

) -> (&'a Sub

, u64) where P: PlatformTypes, - Hasher: SubscriptionHash

+ Default + Hasher: CacheKey + Default { (sub, Self::hash_req(sub.req())) } fn hash_req<'a, P>(sub: &'a Addrd>) -> u64 where P: PlatformTypes, - Hasher: SubscriptionHash

+ Default + Hasher: CacheKey + Default { let mut h = Hasher::default(); - h.subscription_hash(sub); + h.cache_key(sub.data().msg()); h.hasher().finish() } @@ -285,7 +164,7 @@ impl Observe { -> impl 'a + Iterator> where Subs: Array>, P: PlatformTypes, - Hasher: SubscriptionHash

+ Default + Hasher: CacheKey + Default { subs.iter() .filter(move |s| match Self::get(subs, addr, t).map(Self::hash) { @@ -414,7 +293,7 @@ impl Observe { where P: PlatformTypes, Subs: Array>, RequestQueue: Array>>, - Hasher: SubscriptionHash

+ Default + Hasher: CacheKey + Default { Self::subs_matching_path(subs, path).for_each(|sub| { // TODO: handle option capacity @@ -439,7 +318,7 @@ impl Step

for Observe S: Step>, PollResp = Addrd>>, B: Default + Array>, RQ: Default + Array>>, - H: SubscriptionHash

+ Default + H: CacheKey + Default { type PollReq = Addrd>; type PollResp = Addrd>; @@ -576,10 +455,7 @@ mod tests { type Snapshot = crate::platform::Snapshot; type Message = toad_msg::Message; type Sub = super::Sub; - type Observe = super::Observe, - Vec>>, - SubHash_TypePathQueryAccept>; + type Observe = super::Observe, Vec>>, DefaultCacheKey>; type PollReq = Addrd>; type PollResp = Addrd>; @@ -733,66 +609,4 @@ mod tests { (poll_req(_, _) should satisfy { |req| assert!(req.is_none()) }) ] ); - - #[test] - pub fn sub_hash() { - fn req(stuff: F) -> u64 - where F: FnOnce(&mut Message) - { - let mut req = Message::new(Type::Con, Code::GET, Id(1), Token(Default::default())); - stuff(&mut req); - let sub = Sub::new(Addrd(Req::from(req), test::x.x.x.x(0))); - - let mut h = SubHash_TypePathQueryAccept::new(); - h.subscription_hash(sub.req()); - h.hasher().finish() - } - - assert_ne!(req(|r| { - r.set_path("a/b/c").ok(); - }), - req(|_| {})); - assert_eq!(req(|r| { - r.set_path("a/b/c").ok(); - }), - req(|r| { - r.set_path("a/b/c").ok(); - })); - assert_ne!(req(|r| { - r.set_path("a/b/c").ok(); - r.add_query("filter[temp](less_than)=123").ok(); - }), - req(|r| { - r.set_path("a/b/c").ok(); - })); - assert_eq!(req(|r| { - r.set_path("a/b/c").ok(); - r.add_query("filter[temp](less_than)=123").ok(); - }), - req(|r| { - r.set_path("a/b/c").ok(); - r.add_query("filter[temp](less_than)=123").ok(); - r.set_content_format(ContentFormat::Json).ok(); - })); - assert_ne!(req(|r| { - r.set_path("a/b/c").ok(); - r.add_query("filter[temp](less_than)=123").ok(); - r.set_accept(ContentFormat::Json).ok(); - }), - req(|r| { - r.set_path("a/b/c").ok(); - r.add_query("filter[temp](less_than)=123").ok(); - r.set_accept(ContentFormat::Text).ok(); - })); - assert_eq!(req(|r| { - r.set_path("a/b/c").ok(); - r.add_query("filter[temp](less_than)=123").ok(); - r.set_accept(ContentFormat::Json).ok(); - }), - req(|r| { - r.set_path("a/b/c").ok(); - r.add_query("filter[temp](less_than)=123").ok(); - r.set_accept(ContentFormat::Json).ok(); - })); - } } From 5c0db01f6af4410f180850c019c4fffc37c20657 Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Wed, 29 Mar 2023 15:08:22 -0700 Subject: [PATCH 04/12] feat!: assemble rxd Blocked requests --- toad/src/lib.rs | 9 +- toad/src/req/method.rs | 10 +- toad/src/resp/code.rs | 64 ++++++++---- toad/src/resp/mod.rs | 12 +-- toad/src/step/block.rs | 227 ++++++++++++++++++++++++++++++++++++++++- 5 files changed, 283 insertions(+), 39 deletions(-) diff --git a/toad/src/lib.rs b/toad/src/lib.rs index 1a14b3fb..1ba008b3 100644 --- a/toad/src/lib.rs +++ b/toad/src/lib.rs @@ -155,16 +155,21 @@ pub mod multicast { } macro_rules! code { - (rfc7252($section:literal) $name:ident = $c:literal.$d:literal) => { + (rfc7252($section:literal) $name:ident = $c:literal*$d:literal) => { #[doc = toad_macros::rfc_7252_doc!($section)] #[allow(clippy::zero_prefixed_literal)] pub const $name: toad_msg::Code = toad_msg::Code::new($c, $d); }; - (rfc7252($section:literal) $name:ident = $newtype:tt($c:literal.$d:literal)) => { + (rfc7252($section:literal) $name:ident = $newtype:tt($c:literal*$d:literal)) => { #[doc = toad_macros::rfc_7252_doc!($section)] #[allow(clippy::zero_prefixed_literal)] pub const $name: $newtype = $newtype(toad_msg::Code::new($c, $d)); }; + (#[doc = $docstr:expr] $name:ident = $c:literal*$d:literal) => { + #[doc = $docstr] + #[allow(clippy::zero_prefixed_literal)] + pub const $name: toad_msg::Code = toad_msg::Code::new($c, $d); + }; } pub(crate) use code; diff --git a/toad/src/req/method.rs b/toad/src/req/method.rs index 46b11e8c..ff3bfe79 100644 --- a/toad/src/req/method.rs +++ b/toad/src/req/method.rs @@ -45,9 +45,9 @@ impl core::fmt::Display for Method { } impl Method { - code!(rfc7252("4.1") EMPTY = Method(0 . 00)); - code!(rfc7252("5.8.1") GET = Method(0 . 01)); - code!(rfc7252("5.8.2") POST = Method(0 . 02)); - code!(rfc7252("5.8.3") PUT = Method(0 . 03)); - code!(rfc7252("5.8.4") DELETE = Method(0 . 04)); + code!(rfc7252("4.1") EMPTY = Method(0*00)); + code!(rfc7252("5.8.1") GET = Method(0*01)); + code!(rfc7252("5.8.2") POST = Method(0*02)); + code!(rfc7252("5.8.3") PUT = Method(0*03)); + code!(rfc7252("5.8.4") DELETE = Method(0*04)); } diff --git a/toad/src/resp/code.rs b/toad/src/resp/code.rs index 7c893467..ddf9f2c7 100644 --- a/toad/src/resp/code.rs +++ b/toad/src/resp/code.rs @@ -3,28 +3,50 @@ pub use toad_msg::Code; use crate::code; // 2.xx -code!(rfc7252("5.9.1.1") CREATED = 2 . 01); -code!(rfc7252("5.9.1.2") DELETED = 2 . 02); -code!(rfc7252("5.9.1.3") VALID = 2 . 03); -code!(rfc7252("5.9.1.4") CHANGED = 2 . 04); -code!(rfc7252("5.9.1.5") CONTENT = 2 . 05); +code!(rfc7252("5.9.1.1") CREATED = 2*01); +code!(rfc7252("5.9.1.2") DELETED = 2*02); +code!(rfc7252("5.9.1.3") VALID = 2*03); +code!(rfc7252("5.9.1.4") CHANGED = 2*04); +code!(rfc7252("5.9.1.5") CONTENT = 2*05); +code!( + #[doc = concat!( + "## [2.31 Continue](https://www.rfc-editor.org/rfc/rfc7959#section-2.9.1)\n", + "This success status code indicates that the transfer of this\n", + "block of the request body was successful and that the server\n", + "encourages sending further blocks, but that a final outcome of the\n", + "whole block-wise request cannot yet be determined. No payload is\n", + "returned with this response code.", + )] + CONTINUE = 2 * 31 +); // 4.xx -code!(rfc7252("5.9.2.1") BAD_REQUEST = 4 . 00); -code!(rfc7252("5.9.2.2") UNAUTHORIZED = 4 . 01); -code!(rfc7252("5.9.2.3") BAD_OPTION = 4 . 02); -code!(rfc7252("5.9.2.4") FORBIDDEN = 4 . 03); -code!(rfc7252("5.9.2.5") NOT_FOUND = 4 . 04); -code!(rfc7252("5.9.2.6") METHOD_NOT_ALLOWED = 4 . 05); -code!(rfc7252("5.9.2.7") NOT_ACCEPTABLE = 4 . 06); -code!(rfc7252("5.9.2.8") PRECONDITION_FAILED = 4 . 12); -code!(rfc7252("5.9.2.9") REQUEST_ENTITY_TOO_LARGE = 4 . 13); -code!(rfc7252("5.9.2.10") UNSUPPORTED_CONTENT_FORMAT = 4 . 15); +code!(rfc7252("5.9.2.1") BAD_REQUEST = 4*00); +code!(rfc7252("5.9.2.2") UNAUTHORIZED = 4*01); +code!(rfc7252("5.9.2.3") BAD_OPTION = 4*02); +code!(rfc7252("5.9.2.4") FORBIDDEN = 4*03); +code!(rfc7252("5.9.2.5") NOT_FOUND = 4*04); +code!(rfc7252("5.9.2.6") METHOD_NOT_ALLOWED = 4*05); +code!(rfc7252("5.9.2.7") NOT_ACCEPTABLE = 4*06); +code!( + #[doc = concat!( + "## [4.08 Request Entity Incomplete](https://www.rfc-editor.org/rfc/rfc7959#section-2.9.2)\n", + "This client error status code indicates that the server has not\n", + "received the blocks of the request body that it needs to proceed.\n", + "The client has not sent all blocks, not sent them in the order\n", + "required by the server, or has sent them long enough ago that the\n", + "server has already discarded them.", + )] + REQUEST_ENTITY_INCOMPLETE = 4 * 08 +); +code!(rfc7252("5.9.2.8") PRECONDITION_FAILED = 4*12); +code!(rfc7252("5.9.2.9") REQUEST_ENTITY_TOO_LARGE = 4*13); +code!(rfc7252("5.9.2.10") UNSUPPORTED_CONTENT_FORMAT = 4*15); // 5.xx -code!(rfc7252("5.9.3.1") INTERNAL_SERVER_ERROR = 5 . 00); -code!(rfc7252("5.9.3.2") NOT_IMPLEMENTED = 5 . 01); -code!(rfc7252("5.9.3.3") BAD_GATEWAY = 5 . 02); -code!(rfc7252("5.9.3.4") SERVICE_UNAVAILABLE = 5 . 03); -code!(rfc7252("5.9.3.5") GATEWAY_TIMEOUT = 5 . 04); -code!(rfc7252("5.9.3.6") PROXYING_NOT_SUPPORTED = 5 . 05); +code!(rfc7252("5.9.3.1") INTERNAL_SERVER_ERROR = 5*00); +code!(rfc7252("5.9.3.2") NOT_IMPLEMENTED = 5*01); +code!(rfc7252("5.9.3.3") BAD_GATEWAY = 5*02); +code!(rfc7252("5.9.3.4") SERVICE_UNAVAILABLE = 5*03); +code!(rfc7252("5.9.3.5") GATEWAY_TIMEOUT = 5*04); +code!(rfc7252("5.9.3.6") PROXYING_NOT_SUPPORTED = 5*05); diff --git a/toad/src/resp/mod.rs b/toad/src/resp/mod.rs index d846c212..4aa0be4a 100644 --- a/toad/src/resp/mod.rs +++ b/toad/src/resp/mod.rs @@ -129,13 +129,13 @@ impl Resp

{ /// Create a response ACKnowledging an incoming request. /// - /// An ack response must be used when you receive - /// a CON request. + /// Received CON requests will be continually retried until + /// ACKed, making it very important that we acknowledge them + /// quickly on receipt. /// - /// You may choose to include the response payload in an ACK, - /// but keep in mind that you might receive duplicate - /// If you do need to ensure they receive your response, - /// you + /// Servers may choose to include a response to the request + /// along with the ACK (entailing a response [`Code`] and [`Payload`]), + /// as long as care is taken to not delay between receipt and ACK. pub fn ack(req: &Req

) -> Self { let msg = Message { ty: Type::Ack, id: req.msg().id, diff --git a/toad/src/step/block.rs b/toad/src/step/block.rs index b8650977..e8dc1c70 100644 --- a/toad/src/step/block.rs +++ b/toad/src/step/block.rs @@ -11,7 +11,9 @@ use super::{Step, _try}; use crate::net::Addrd; use crate::platform::{self, Effect, PlatformTypes, Snapshot}; use crate::req::Req; +use crate::resp::code::{CONTINUE, REQUEST_ENTITY_INCOMPLETE}; use crate::resp::Resp; +use crate::server::ap::state::Complete; #[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)] enum Piece { @@ -160,8 +162,92 @@ impl Step

for Block snap: &crate::platform::Snapshot

, effects: &mut P::Effects) -> super::StepOutput { - _try!(Option; self.inner().poll_req(snap, effects)); - None + let mut req = _try!(Option; self.inner().poll_req(snap, effects)); + + match req.data().msg().block2() { + | None => Some(Ok(req)), + | Some(block) => { + let block_state_ix = self.block_states.map_ref(|block_states| { + block_states.iter() + .enumerate() + .find(|(_, bs)| match bs.pcs.get(&0) { + | Some(Piece::Have(m)) => { + m.token == req.data().msg().token + }, + | _ => false, + }) + .map(|(ix, _)| ix) + }); + + macro_rules! respond { + ($code:expr) => {{ + let rep_ty = if req.data().msg().ty == Type::Con { + Type::Ack + } else { + Type::Non + }; + + let rep = + platform::toad_msg::Message::

::new(rep_ty, $code, Id(0), req.data().msg().token); + effects.push(Effect::Send(Addrd(rep, req.addr()))); + }}; + } + + match block_state_ix { + | None if block.num() > 0 => { + respond!(REQUEST_ENTITY_INCOMPLETE); + + Some(Err(nb::Error::WouldBlock)) + }, + | Some(ix) + if self.block_states.map_ref(|block_states| { + block_states[ix].biggest.map(|n| n + 1).unwrap_or(0) < block.num() + }) => + { + self.block_states + .map_mut(|block_states| block_states.remove(ix)); + respond!(REQUEST_ENTITY_INCOMPLETE); + + Some(Err(nb::Error::WouldBlock)) + }, + | None if block.more() => { + let mut block_state = BlockState { biggest: Some(0), + original: None, + pcs: Default::default() }; + block_state.have(0, req.data().msg().clone()); + + let mut block_state = Some(block_state); + self.block_states + .map_mut(|block_states| block_states.push(Option::take(&mut block_state).unwrap())); + respond!(CONTINUE); + + Some(Err(nb::Error::WouldBlock)) + }, + | None => { + // this is block 0 and there are no more blocks, + // simply yield the request + Some(Ok(req)) + }, + | Some(ix) => { + self.block_states.map_mut(|block_states| { + block_states[ix].have(block.num(), req.data().msg().clone()) + }); + + if block.more() { + respond!(CONTINUE); + Some(Err(nb::Error::WouldBlock)) + } else { + let p = self.block_states + .map_ref(|block_states| block_states[ix].assembled()); + req.as_mut().msg_mut().payload = p; + self.block_states + .map_mut(|block_states| block_states.remove(ix)); + Some(Ok(req)) + } + }, + } + }, + } } fn poll_resp(&self, @@ -360,6 +446,14 @@ mod tests { last_request_at: Instant, } + struct Addrs { + server: SocketAddr, + client: SocketAddr, + } + + let addrs = Addrs { server: test::x.x.x.x(80), + client: test::x.x.x.x(10) }; + type S = test::MockStep, Addrd, ()>; let b = Block::, BTreeMap<_, _>>::default(); @@ -369,7 +463,7 @@ mod tests { let cache_key = orig_req.cache_key(); - b.on_message_sent(&test::snapshot(), &Addrd(orig_req, test::x.x.x.x(80))) + b.on_message_sent(&test::snapshot(), &Addrd(orig_req, addrs.client)) .unwrap(); let mut effects: Vec = vec![]; @@ -441,7 +535,7 @@ mod tests { }, } - Some(Ok(Addrd(resp, test::x.x.x.x(80)))) + Some(Ok(Addrd(resp, addrs.server))) })); let rep = loop { @@ -475,7 +569,7 @@ mod tests { match b.poll_resp(&test::snapshot(), &mut effects, Token(array_vec! {1}), - test::x.x.x.x(40)) + addrs.server) { | Some(Err(nb::Error::WouldBlock)) => continue, | Some(Err(nb::Error::Other(e))) => panic!("{e:?}"), @@ -487,4 +581,127 @@ mod tests { assert_eq!(rep.data().payload().copied().collect::>(), payload.bytes().collect::>()); } + + #[test] + fn when_recv_request_without_block2_this_should_do_nothing() { + #[derive(Clone, Copy)] + struct Addrs { + server: SocketAddr, + client: SocketAddr, + } + + let addrs = Addrs { server: test::x.x.x.x(80), + client: test::x.x.x.x(10) }; + let addrs: &'static Addrs = unsafe { core::mem::transmute(&addrs) }; + + type S = test::MockStep<(), Addrd, Addrd, ()>; + let b = Block::, BTreeMap<_, _>>::default(); + + b.inner().set_poll_req(Box::new(|_, _, _| { + let req = test::Message::new(Type::Con, + Code::POST, + Id(0), + Token(Default::default())); + Some(Ok(Addrd(Req::from(req), addrs.client))) + })); + + let mut effects = vec![]; + b.poll_req(&test::snapshot(), &mut effects) + .unwrap() + .unwrap(); + + assert!(effects.is_empty()); + } + + #[test] + fn when_recv_request_with_block2_and_recognized_number_this_should_respond_2_31() { + struct TestState { + next_block: u32, + } + + #[derive(Clone, Copy)] + struct Addrs { + server: SocketAddr, + client: SocketAddr, + } + + let addrs = Addrs { server: test::x.x.x.x(80), + client: test::x.x.x.x(10) }; + let addrs: &'static Addrs = unsafe { core::mem::transmute(&addrs) }; + + type S = test::MockStep, Addrd, ()>; + let b = Block::, BTreeMap<_, _>>::default(); + + b.inner() + .init(TestState { next_block: 0 }) + .set_poll_req(Box::new(|mock, _, _| { + let mut req = + test::Message::new(Type::Con, Code::POST, Id(0), Token(Default::default())); + let num = mock.state.map_ref(|s| s.as_ref().unwrap().next_block); + req.set_block2(128, num, num < 2).ok(); + req.set_payload(Payload(core::iter::repeat(0u8).take(128).collect())); + + mock.state.map_mut(|s| s.as_mut().unwrap().next_block += 1); + Some(Ok(Addrd(Req::from(req), addrs.client))) + })); + + let mut effects = vec![]; + + // get block 0 + assert_eq!(b.poll_req(&test::snapshot(), &mut effects), + Some(Err(nb::Error::WouldBlock))); + + let resp = effects[0].get_send().unwrap(); + assert_eq!(resp.data().code, Code::new(2, 31)); + effects.clear(); + + // get block 1 + assert_eq!(b.poll_req(&test::snapshot(), &mut effects), + Some(Err(nb::Error::WouldBlock))); + + let resp = effects[0].get_send().unwrap(); + assert_eq!(resp.data().code, Code::new(2, 31)); + effects.clear(); + + // get block 2 + let assembled = b.poll_req(&test::snapshot(), &mut effects); + assert!(matches!(assembled, Some(Ok(_)))); + assert_eq!(assembled.unwrap().unwrap().data().payload().len(), 128 * 3); + assert!(effects.is_empty()); + } + + #[test] + fn when_recv_request_with_block2_and_unrecognized_number_this_should_respond_4_08() { + #[derive(Clone, Copy)] + struct Addrs { + server: SocketAddr, + client: SocketAddr, + } + + let addrs = Addrs { server: test::x.x.x.x(80), + client: test::x.x.x.x(10) }; + let addrs: &'static Addrs = unsafe { core::mem::transmute(&addrs) }; + + type S = test::MockStep<(), Addrd, Addrd, ()>; + let b = Block::, BTreeMap<_, _>>::default(); + + b.inner().set_poll_req(Box::new(|_, _, _| { + let mut req = test::Message::new(Type::Con, + Code::POST, + Id(0), + Token(Default::default())); + req.set_block2(128, 1, true).ok(); + req.set_payload(Payload(core::iter::repeat(0u8).take(128).collect())); + Some(Ok(Addrd(Req::from(req), addrs.client))) + })); + + let mut effects = vec![]; + assert_eq!(b.poll_req(&test::snapshot(), &mut effects), + Some(Err(nb::Error::WouldBlock))); + + assert!(!effects.is_empty()); + + let resp = effects[0].get_send().unwrap(); + assert_eq!(resp.data().code, Code::new(4, 08)); + } } From 02b8916d3deb27b42e05718ee413cd7e9a367a50 Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Thu, 30 Mar 2023 11:26:07 -0700 Subject: [PATCH 05/12] refactor: judicious locks, move access patterns to Block step methods --- toad/src/step/block.rs | 295 ++++++++++++++++++++++++----------------- 1 file changed, 175 insertions(+), 120 deletions(-) diff --git a/toad/src/step/block.rs b/toad/src/step/block.rs index e8dc1c70..071151d2 100644 --- a/toad/src/step/block.rs +++ b/toad/src/step/block.rs @@ -1,5 +1,6 @@ use core::marker::PhantomData; +use naan::prelude::F1Once; use no_std_net::SocketAddr; use toad_array::{AppendCopy, Array}; use toad_map::Map; @@ -9,6 +10,7 @@ use toad_stem::Stem; use super::{Step, _try}; use crate::net::Addrd; +use crate::platform::toad_msg::Message; use crate::platform::{self, Effect, PlatformTypes, Snapshot}; use crate::req::Req; use crate::resp::code::{CONTINUE, REQUEST_ENTITY_INCOMPLETE}; @@ -36,14 +38,14 @@ struct BlockState where P: PlatformTypes { pub(self) biggest: Option, - pub(self) original: Option>, + pub(self) original: Option>, pub(self) pcs: Pcs, } impl BlockState where P: PlatformTypes { pub(self) fn assembled(&self) -> Payload - where Pcs: Map>> + where Pcs: Map>> { const PANIC_MSG: &'static str = r#"BlockState.assembled() assumes: - BlockState.biggest is Some(_) @@ -122,6 +124,10 @@ impl BlockState where P: PlatformTypes self.touch(n); self.pcs.insert(n, Piece::Have(m)).ok(); } + + pub(self) fn biggest(&self) -> Option { + self.biggest + } } /// TODO @@ -143,10 +149,108 @@ impl Default for Block } } +impl Block where P: PlatformTypes +{ + fn find_rx_request_block_state_ix(&self, token: Token) -> Option + where Pcs: Map>>, + BS: Array> + { + self.block_states.map_ref(|bs| { + bs.iter() + .enumerate() + .find(|(_, bs)| match bs.pcs.get(&0) { + | Some(Piece::Have(m)) => m.token == token, + | _ => false, + }) + .map(|(ix, _)| ix) + }) + } + + fn find_rx_response_block_state_ix(&self, token: Token, cache_key: u64) -> Option + where Pcs: Map>>, + BS: Array> + { + self.block_states.map_ref(|block_states| { + block_states.iter() + .enumerate() + .find(|(_, bs)| { + let block0_and_response_is_for_originating_request = + bs.biggest.is_none() + && bs.original.as_ref().map(|msg| msg.token) == Some(token); + + let block_n_and_response_matches_previous_block = + bs.pcs + .get(&0) + .and_then(|p| p.get_msg().map(|m| m.cache_key())) + == Some(cache_key); + + block0_and_response_is_for_originating_request + || block_n_and_response_matches_previous_block + }) + .map(|(ix, _)| ix) + }) + } + + fn block_state_mut(&self, ix: usize, f: F) -> R + where F: for<'a> F1Once<&'a mut BlockState, Ret = R>, + Pcs: Map>>, + BS: Array> + { + let mut f = Some(f); + self.block_states + .map_mut(|bs| Option::take(&mut f).unwrap().call1(&mut bs[ix])) + } + + fn block_state(&self, ix: usize, f: F) -> R + where F: for<'a> F1Once<&'a BlockState, Ret = R>, + Pcs: Map>>, + BS: Array> + { + let mut f = Some(f); + self.block_states + .map_ref(|bs| Option::take(&mut f).unwrap().call1(&bs[ix])) + } + + fn push(&self, s: BlockState) + where Pcs: Map>>, + BS: Array> + { + let mut s = Some(s); + self.block_states + .map_mut(|bs| bs.push(Option::take(&mut s).unwrap())); + } + + fn remove(&self, ix: usize) + where Pcs: Map>>, + BS: Array> + { + self.block_states.map_mut(|bs| bs.remove(ix)); + } + + fn clone_original(&self, ix: usize) -> Message

+ where Pcs: Map>>, + BS: Array> + { + self.block_state(ix, |s: &BlockState<_, _>| { + let orig: &Message

= s.original.as_ref().unwrap(); + + let mut new = Message::

::new(Type::Con, orig.code, Id(0), orig.token); + + orig.opts.iter().for_each(|(n, vs)| { + if n.include_in_cache_key() { + new.opts.insert(*n, vs.clone()).ok(); + } + }); + + new + }) + } +} + impl Step

for Block where P: PlatformTypes, S: Step>, PollResp = Addrd>>, - Pcs: Map>>, + Pcs: Map>>, BS: Array> { type PollReq = Addrd>; @@ -167,17 +271,7 @@ impl Step

for Block match req.data().msg().block2() { | None => Some(Ok(req)), | Some(block) => { - let block_state_ix = self.block_states.map_ref(|block_states| { - block_states.iter() - .enumerate() - .find(|(_, bs)| match bs.pcs.get(&0) { - | Some(Piece::Have(m)) => { - m.token == req.data().msg().token - }, - | _ => false, - }) - .map(|(ix, _)| ix) - }); + let block_state_ix = self.find_rx_request_block_state_ix(req.data().msg().token); macro_rules! respond { ($code:expr) => {{ @@ -187,8 +281,7 @@ impl Step

for Block Type::Non }; - let rep = - platform::toad_msg::Message::

::new(rep_ty, $code, Id(0), req.data().msg().token); + let rep = Message::

::new(rep_ty, $code, Id(0), req.data().msg().token); effects.push(Effect::Send(Addrd(rep, req.addr()))); }}; } @@ -199,26 +292,13 @@ impl Step

for Block Some(Err(nb::Error::WouldBlock)) }, - | Some(ix) - if self.block_states.map_ref(|block_states| { - block_states[ix].biggest.map(|n| n + 1).unwrap_or(0) < block.num() - }) => - { - self.block_states - .map_mut(|block_states| block_states.remove(ix)); - respond!(REQUEST_ENTITY_INCOMPLETE); - - Some(Err(nb::Error::WouldBlock)) - }, | None if block.more() => { let mut block_state = BlockState { biggest: Some(0), original: None, pcs: Default::default() }; block_state.have(0, req.data().msg().clone()); - let mut block_state = Some(block_state); - self.block_states - .map_mut(|block_states| block_states.push(Option::take(&mut block_state).unwrap())); + self.push(block_state); respond!(CONTINUE); Some(Err(nb::Error::WouldBlock)) @@ -228,20 +308,29 @@ impl Step

for Block // simply yield the request Some(Ok(req)) }, + | Some(ix) + if self.block_state(ix, BlockState::biggest) + .map(|n| n + 1) + .unwrap_or(0) + < block.num() => + { + self.remove(ix); + respond!(REQUEST_ENTITY_INCOMPLETE); + + Some(Err(nb::Error::WouldBlock)) + }, | Some(ix) => { - self.block_states.map_mut(|block_states| { - block_states[ix].have(block.num(), req.data().msg().clone()) - }); + self.block_state_mut(ix, |s: &mut BlockState<_, _>| { + s.have(block.num(), req.data().msg().clone()) + }); if block.more() { respond!(CONTINUE); Some(Err(nb::Error::WouldBlock)) } else { - let p = self.block_states - .map_ref(|block_states| block_states[ix].assembled()); + let p = self.block_state(ix, BlockState::assembled); req.as_mut().msg_mut().payload = p; - self.block_states - .map_mut(|block_states| block_states.remove(ix)); + self.remove(ix); Some(Ok(req)) } }, @@ -256,109 +345,75 @@ impl Step

for Block token: Token, addr: SocketAddr) -> super::StepOutput { - let rep: Addrd> = + let mut rep: Addrd> = _try!(Option; self.inner().poll_resp(snap, effects, token, addr)); - let block_state_ix = self.block_states.map_ref(|block_states| { - block_states.iter() - .enumerate() - .find(|(_, bs)| { - let block0_and_response_is_for_originating_request = - bs.biggest.is_none() - && bs.original.as_ref().map(|msg| msg.token) - == Some(rep.data().token()); - - let block_n_and_response_matches_previous_block = - bs.pcs - .get(&0) - .and_then(|p| p.get_msg().map(|m| m.cache_key())) - == Some(rep.data().msg().cache_key()); - - block0_and_response_is_for_originating_request - || block_n_and_response_matches_previous_block - }) - .map(|(ix, _)| ix) - }); + let block_state_ix = + self.find_rx_response_block_state_ix(rep.data().msg().token, rep.data().msg().cache_key()); match rep.data().msg().block1() { | None => { // Response didn't have Block1; we can drop the block state if let Some(ix) = block_state_ix { - self.block_states.map_mut(|es| es.remove(ix)); + self.remove(ix); } Some(Ok(rep)) }, | Some(block) => { - let mut rep = Some(rep); - self.block_states.map_mut(|block_states| { - let mut rep = Option::take(&mut rep).unwrap(); - - match block_state_ix { - | None => { - // Got a Block1 message but we don't have any conception of it; yield the response as-is from the inner step. - Some(Ok(rep)) - }, - | Some(ix) => { - let blocks = block_states.get_mut(ix).unwrap(); - - macro_rules! request_num { - ($num:expr) => {{ - let orig = blocks.original.as_ref().unwrap(); - - let mut new = platform::toad_msg::Message::

::new(Type::Con, - orig.code, - Id(0), - orig.token); - orig.opts.iter().for_each(|(n, vs)| { - if n.include_in_cache_key() { - new.opts.insert(*n, vs.clone()).ok(); - } - }); - new.set_block1(0, $num, false).ok(); - new.remove(BLOCK2); - new.remove(SIZE2); - - effects.push(Effect::Send(rep.as_ref().map(|_| new))); - blocks.waiting($num); - }}; - } - - blocks.have(block.num(), rep.data().msg().clone()); - - if block.more() { - request_num!(block.num() + 1); - } - - if let Some(missing) = blocks.get_missing() { - request_num!(missing); - } - - if blocks.have_all() { - rep.as_mut().msg_mut().payload = blocks.assembled(); - rep.as_mut().msg_mut().remove(BLOCK1); - block_states.remove(ix); - Some(Ok(rep)) - } else { - Some(Err(nb::Error::WouldBlock)) - } - }, - } - }) + match block_state_ix { + | None => { + // Got a Block1 message but we don't have any conception of it; yield the response as-is from the inner step. + Some(Ok(rep)) + }, + | Some(ix) => { + macro_rules! request_num { + ($num:expr) => {{ + let mut new = self.clone_original(ix); + + new.set_block1(0, $num, false).ok(); + new.remove(BLOCK2); + new.remove(SIZE2); + + effects.push(Effect::Send(rep.as_ref().map(|_| new))); + self.block_state_mut(ix, |s: &mut BlockState<_, _>| s.waiting($num)); + }}; + } + + self.block_state_mut(ix, |s: &mut BlockState<_, _>| { + s.have(block.num(), rep.data().msg().clone()) + }); + + if block.more() { + request_num!(block.num() + 1); + } + + if let Some(missing) = self.block_state(ix, BlockState::get_missing) { + request_num!(missing); + } + + if self.block_state(ix, BlockState::have_all) { + rep.as_mut().msg_mut().payload = self.block_state(ix, BlockState::assembled); + rep.as_mut().msg_mut().remove(BLOCK1); + self.remove(ix); + Some(Ok(rep)) + } else { + Some(Err(nb::Error::WouldBlock)) + } + }, + } }, } } fn on_message_sent(&self, snap: &platform::Snapshot

, - msg: &Addrd>) + msg: &Addrd>) -> Result<(), Self::Error> { self.inner.on_message_sent(snap, msg)?; if msg.data().code.kind() == CodeKind::Request { - self.block_states.map_mut(|block_states| { - block_states.push(BlockState { biggest: None, - original: Some(msg.data().clone()), - pcs: Default::default() }) - }); + self.push(BlockState { biggest: None, + original: Some(msg.data().clone()), + pcs: Default::default() }); } Ok(()) } From 8ba40fc75ffd06e30a4408b0611937c64ef2e3fc Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Wed, 26 Apr 2023 10:13:33 -0500 Subject: [PATCH 06/12] fix: tests --- toad/src/step/block.rs | 78 ++++++++++++++++++++---------------------- toad/src/step/mod.rs | 2 +- 2 files changed, 39 insertions(+), 41 deletions(-) diff --git a/toad/src/step/block.rs b/toad/src/step/block.rs index 071151d2..a44be679 100644 --- a/toad/src/step/block.rs +++ b/toad/src/step/block.rs @@ -407,9 +407,10 @@ impl Step

for Block fn on_message_sent(&self, snap: &platform::Snapshot

, + effs: &mut P::Effects, msg: &Addrd>) -> Result<(), Self::Error> { - self.inner.on_message_sent(snap, msg)?; + self.inner.on_message_sent(snap, effs, msg)?; if msg.data().code.kind() == CodeKind::Request { self.push(BlockState { biggest: None, original: Some(msg.data().clone()), @@ -459,8 +460,8 @@ mod tests { let b = Block::, BTreeMap<_, _>>::default(); b.inner() - .set_poll_req(Box::new(|_, _, _| Some(Err(nb::Error::Other(()))))) - .set_poll_resp(Box::new(|_, _, _, _, _| Some(Err(nb::Error::Other(()))))); + .set_poll_req(|_, _, _| Some(Err(nb::Error::Other(())))) + .set_poll_resp(|_, _, _, _, _| Some(Err(nb::Error::Other(())))); assert_eq!(b.poll_req(&test::snapshot(), &mut vec![]), Some(Err(nb::Error::Other(())))); @@ -476,13 +477,13 @@ mod tests { type S = test::MockStep<(), Addrd, Addrd, ()>; let b = Block::, BTreeMap<_, _>>::default(); - b.inner().set_poll_resp(Box::new(|_, _, _, _, _| { - let msg = toad_msg::alloc::Message::new(Type::Con, - Code::GET, - Id(0), - Token(Default::default())); - Some(Ok(Addrd(Resp::from(msg), test::x.x.x.x(80)))) - })); + b.inner().set_poll_resp(|_, _, _, _, _| { + let msg = toad_msg::alloc::Message::new(Type::Con, + Code::GET, + Id(0), + Token(Default::default())); + Some(Ok(Addrd(Resp::from(msg), test::x.x.x.x(80)))) + }); let mut effects = vec![]; assert!(matches!(b.poll_resp(&test::snapshot(), @@ -518,11 +519,13 @@ mod tests { let cache_key = orig_req.cache_key(); - b.on_message_sent(&test::snapshot(), &Addrd(orig_req, addrs.client)) - .unwrap(); - let mut effects: Vec = vec![]; + b.on_message_sent(&test::snapshot(), + &mut effects, + &Addrd(orig_req, addrs.client)) + .unwrap(); + let payload = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do"; let payload_blocks = || { payload.bytes().fold(vec![vec![]], |mut b, a| { @@ -540,7 +543,7 @@ mod tests { .init(TestState { gave_pieces: vec![], req: None, last_request_at: Instant::now() }) - .set_poll_resp(Box::new(move |mock, _, _, _, _| { + .set_poll_resp(move |mock, _, _, _, _| { let blocksize: u16 = 16; let blocks = payload_blocks(); @@ -591,7 +594,7 @@ mod tests { } Some(Ok(Addrd(resp, addrs.server))) - })); + }); let rep = loop { let mut reqs = effects.drain(..) @@ -652,13 +655,11 @@ mod tests { type S = test::MockStep<(), Addrd, Addrd, ()>; let b = Block::, BTreeMap<_, _>>::default(); - b.inner().set_poll_req(Box::new(|_, _, _| { - let req = test::Message::new(Type::Con, - Code::POST, - Id(0), - Token(Default::default())); - Some(Ok(Addrd(Req::from(req), addrs.client))) - })); + b.inner().set_poll_req(|_, _, _| { + let req = + test::Message::new(Type::Con, Code::POST, Id(0), Token(Default::default())); + Some(Ok(Addrd(Req::from(req), addrs.client))) + }); let mut effects = vec![]; b.poll_req(&test::snapshot(), &mut effects) @@ -689,16 +690,15 @@ mod tests { b.inner() .init(TestState { next_block: 0 }) - .set_poll_req(Box::new(|mock, _, _| { - let mut req = - test::Message::new(Type::Con, Code::POST, Id(0), Token(Default::default())); - let num = mock.state.map_ref(|s| s.as_ref().unwrap().next_block); - req.set_block2(128, num, num < 2).ok(); - req.set_payload(Payload(core::iter::repeat(0u8).take(128).collect())); + .set_poll_req(|mock, _, _| { + let mut req = test::Message::new(Type::Con, Code::POST, Id(0), Token(Default::default())); + let num = mock.state.map_ref(|s| s.as_ref().unwrap().next_block); + req.set_block2(128, num, num < 2).ok(); + req.set_payload(Payload(core::iter::repeat(0u8).take(128).collect())); - mock.state.map_mut(|s| s.as_mut().unwrap().next_block += 1); - Some(Ok(Addrd(Req::from(req), addrs.client))) - })); + mock.state.map_mut(|s| s.as_mut().unwrap().next_block += 1); + Some(Ok(Addrd(Req::from(req), addrs.client))) + }); let mut effects = vec![]; @@ -740,15 +740,13 @@ mod tests { type S = test::MockStep<(), Addrd, Addrd, ()>; let b = Block::, BTreeMap<_, _>>::default(); - b.inner().set_poll_req(Box::new(|_, _, _| { - let mut req = test::Message::new(Type::Con, - Code::POST, - Id(0), - Token(Default::default())); - req.set_block2(128, 1, true).ok(); - req.set_payload(Payload(core::iter::repeat(0u8).take(128).collect())); - Some(Ok(Addrd(Req::from(req), addrs.client))) - })); + b.inner().set_poll_req(|_, _, _| { + let mut req = + test::Message::new(Type::Con, Code::POST, Id(0), Token(Default::default())); + req.set_block2(128, 1, true).ok(); + req.set_payload(Payload(core::iter::repeat(0u8).take(128).collect())); + Some(Ok(Addrd(Req::from(req), addrs.client))) + }); let mut effects = vec![]; assert_eq!(b.poll_req(&test::snapshot(), &mut effects), diff --git a/toad/src/step/mod.rs b/toad/src/step/mod.rs index ec8b99ef..4e5a71b4 100644 --- a/toad/src/step/mod.rs +++ b/toad/src/step/mod.rs @@ -48,7 +48,7 @@ pub mod runtime { pub type Observe = observe::Observe>, Array>>, - observe::SubHash_TypePathQueryAccept

>; + toad_msg::DefaultCacheKey>; /// Parse -> ProvisionIds -> ProvisionTokens -> Ack -> Retry -> HandleAcks -> BufferResponses -> Observe #[rustfmt::skip] From 5a0c7c72cd24d92c5b4790401ac80dd3d2caffdb Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Sun, 7 May 2023 20:45:25 -0500 Subject: [PATCH 07/12] fix: remove block state entries after exchange_lifetime --- toad/src/config.rs | 2 - toad/src/step/block.rs | 300 ++++++++++++++++++++++++++++++----------- 2 files changed, 219 insertions(+), 83 deletions(-) diff --git a/toad/src/config.rs b/toad/src/config.rs index 71b095f7..27ff7cf1 100644 --- a/toad/src/config.rs +++ b/toad/src/config.rs @@ -1,5 +1,3 @@ -#![allow(dead_code)] - use embedded_time::duration::Milliseconds; use crate::retry::{Attempts, Strategy}; diff --git a/toad/src/step/block.rs b/toad/src/step/block.rs index a44be679..df63c3ba 100644 --- a/toad/src/step/block.rs +++ b/toad/src/step/block.rs @@ -1,5 +1,7 @@ use core::marker::PhantomData; +use embedded_time::duration::Milliseconds; +use embedded_time::Instant; use naan::prelude::F1Once; use no_std_net::SocketAddr; use toad_array::{AppendCopy, Array}; @@ -8,14 +10,13 @@ use toad_msg::no_repeat::{BLOCK1, BLOCK2, SIZE2}; use toad_msg::{CodeKind, Id, MessageOptions, Payload, Token, Type}; use toad_stem::Stem; -use super::{Step, _try}; +use super::{log, Step, _try}; use crate::net::Addrd; use crate::platform::toad_msg::Message; use crate::platform::{self, Effect, PlatformTypes, Snapshot}; use crate::req::Req; use crate::resp::code::{CONTINUE, REQUEST_ENTITY_INCOMPLETE}; use crate::resp::Resp; -use crate::server::ap::state::Complete; #[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)] enum Piece { @@ -38,14 +39,15 @@ struct BlockState where P: PlatformTypes { pub(self) biggest: Option, - pub(self) original: Option>, + pub(self) original: Option>>, pub(self) pcs: Pcs, + pub(self) expires_at: Instant, } impl BlockState where P: PlatformTypes { pub(self) fn assembled(&self) -> Payload - where Pcs: Map>> + where Pcs: Map>>> { const PANIC_MSG: &'static str = r#"BlockState.assembled() assumes: - BlockState.biggest is Some(_) @@ -59,6 +61,7 @@ impl BlockState where P: PlatformTypes .expect(PANIC_MSG) .get_msg() .expect(PANIC_MSG) + .data() .payload .0); } @@ -66,6 +69,7 @@ impl BlockState where P: PlatformTypes Payload(p) } + /// find a missing piece that should be requested pub(self) fn get_missing(&self) -> Option where Pcs: Map>, T: PartialEq @@ -76,6 +80,7 @@ impl BlockState where P: PlatformTypes .copied() } + /// are no pieces waiting or missing? pub(self) fn have_all(&self) -> bool where Pcs: Map>, T: PartialEq @@ -85,7 +90,9 @@ impl BlockState where P: PlatformTypes .all(|(_, p)| p != &Piece::Missing && p != &Piece::Waiting) } - pub(self) fn touch(&mut self, n: u32) + /// if `n > self.biggest`, update `self.biggest` to `n` + /// and insert `Piece::Missing` for all pieces between `biggest` and `n` + pub(self) fn touch(&mut self, now: Instant, n: u32) where Pcs: Map> { let missing_nums = match self.biggest { @@ -104,24 +111,26 @@ impl BlockState where P: PlatformTypes } } - pub(self) fn waiting(&mut self, n: u32) + /// Mark piece `n` as having been requested + pub(self) fn waiting(&mut self, now: Instant, n: u32) where Pcs: Map> { let e = self.pcs.get_mut(&n); match e { | Some(Piece::Missing) | Some(Piece::Waiting) | None => { - self.touch(n); + self.touch(now, n); self.pcs.insert(n, Piece::Waiting).ok(); }, | _ => (), } } - pub(self) fn have(&mut self, n: u32, m: T) + /// Store piece `T` corresponding to piece number `n` + pub(self) fn have(&mut self, now: Instant, n: u32, m: T) where Pcs: Map> { - self.touch(n); + self.touch(now, n); self.pcs.insert(n, Piece::Have(m)).ok(); } @@ -151,23 +160,55 @@ impl Default for Block impl Block where P: PlatformTypes { - fn find_rx_request_block_state_ix(&self, token: Token) -> Option - where Pcs: Map>>, + fn prune(&self, effects: &mut P::Effects, now: Instant) + where Pcs: Map>>>, + BS: Array> + { + let len_before = self.block_states.map_ref(|bss| bss.len()); + fn go<_P, _S, _BS, _Pcs>(now: Instant<_P::Clock>, b: &Block<_P, _S, _BS, _Pcs>) + where _P: PlatformTypes, + _Pcs: Map>>>, + _BS: Array> + { + if let Some(ix) = b.block_states.map_ref(|bss| { + bss.iter().enumerate().find_map(|(ix, b)| { + if now >= b.expires_at { + Some(ix) + } else { + None + } + }) + }) + { + b.block_states.map_mut(|bss| bss.remove(ix)); + go(now, b); + } + } + + go(now, self); + let len_after = self.block_states.map_ref(|bss| bss.len()); + if len_before - len_after > 0 { + log!(Block::prune, effects, log::Level::Debug, "Removed {} expired entries. For outbound messages, a prior step SHOULD but MAY NOT retry sending them", len_before - len_after); + } + } + + fn find_rx_request_block_state_ix(&self, token: Addrd) -> Option + where Pcs: Map>>>, BS: Array> { self.block_states.map_ref(|bs| { bs.iter() .enumerate() .find(|(_, bs)| match bs.pcs.get(&0) { - | Some(Piece::Have(m)) => m.token == token, + | Some(Piece::Have(m)) => m.as_ref().map(|m| m.token) == token, | _ => false, }) .map(|(ix, _)| ix) }) } - fn find_rx_response_block_state_ix(&self, token: Token, cache_key: u64) -> Option - where Pcs: Map>>, + fn find_rx_response_block_state_ix(&self, token: Addrd, cache_key: u64) -> Option + where Pcs: Map>>>, BS: Array> { self.block_states.map_ref(|block_states| { @@ -176,12 +217,15 @@ impl Block where P: PlatformTypes .find(|(_, bs)| { let block0_and_response_is_for_originating_request = bs.biggest.is_none() - && bs.original.as_ref().map(|msg| msg.token) == Some(token); + && bs.original + .as_ref() + .map(|msg| msg.as_ref().map(|m| m.token)) + == Some(token); let block_n_and_response_matches_previous_block = bs.pcs .get(&0) - .and_then(|p| p.get_msg().map(|m| m.cache_key())) + .and_then(|p| p.get_msg().map(|m| m.data().cache_key())) == Some(cache_key); block0_and_response_is_for_originating_request @@ -193,7 +237,7 @@ impl Block where P: PlatformTypes fn block_state_mut(&self, ix: usize, f: F) -> R where F: for<'a> F1Once<&'a mut BlockState, Ret = R>, - Pcs: Map>>, + Pcs: Map>>>, BS: Array> { let mut f = Some(f); @@ -203,7 +247,7 @@ impl Block where P: PlatformTypes fn block_state(&self, ix: usize, f: F) -> R where F: for<'a> F1Once<&'a BlockState, Ret = R>, - Pcs: Map>>, + Pcs: Map>>>, BS: Array> { let mut f = Some(f); @@ -212,7 +256,7 @@ impl Block where P: PlatformTypes } fn push(&self, s: BlockState) - where Pcs: Map>>, + where Pcs: Map>>>, BS: Array> { let mut s = Some(s); @@ -221,28 +265,31 @@ impl Block where P: PlatformTypes } fn remove(&self, ix: usize) - where Pcs: Map>>, + where Pcs: Map>>>, BS: Array> { self.block_states.map_mut(|bs| bs.remove(ix)); } - fn clone_original(&self, ix: usize) -> Message

- where Pcs: Map>>, + fn clone_original(&self, ix: usize) -> Option>> + where Pcs: Map>>>, BS: Array> { self.block_state(ix, |s: &BlockState<_, _>| { - let orig: &Message

= s.original.as_ref().unwrap(); - - let mut new = Message::

::new(Type::Con, orig.code, Id(0), orig.token); - - orig.opts.iter().for_each(|(n, vs)| { - if n.include_in_cache_key() { - new.opts.insert(*n, vs.clone()).ok(); - } - }); - - new + s.original.as_ref().map(|orig| { + let addr = orig.addr(); + let orig: &Message

= orig.data(); + let mut new = + Message::

::new(Type::Con, orig.code, Id(0), orig.token); + + orig.opts.iter().for_each(|(n, vs)| { + if n.include_in_cache_key() { + new.opts.insert(*n, vs.clone()).ok(); + } + }); + + Addrd(new, addr) + }) }) } } @@ -250,7 +297,7 @@ impl Block where P: PlatformTypes impl Step

for Block where P: PlatformTypes, S: Step>, PollResp = Addrd>>, - Pcs: Map>>, + Pcs: Map>>> + core::fmt::Debug, BS: Array> { type PollReq = Addrd>; @@ -266,26 +313,42 @@ impl Step

for Block snap: &crate::platform::Snapshot

, effects: &mut P::Effects) -> super::StepOutput { + self.prune(effects, snap.time); let mut req = _try!(Option; self.inner().poll_req(snap, effects)); - match req.data().msg().block2() { - | None => Some(Ok(req)), - | Some(block) => { - let block_state_ix = self.find_rx_request_block_state_ix(req.data().msg().token); + macro_rules! respond { + ($code:expr) => {{ + let rep_ty = if req.data().msg().ty == Type::Con { + Type::Ack + } else { + Type::Non + }; + + let rep = Message::

::new(rep_ty, $code, Id(0), req.data().msg().token); + effects.push(Effect::Send(Addrd(rep, req.addr()))); + }}; + } - macro_rules! respond { - ($code:expr) => {{ - let rep_ty = if req.data().msg().ty == Type::Con { - Type::Ack - } else { - Type::Non - }; + let block_state_ix = self.find_rx_request_block_state_ix(req.as_ref().map(|r| r.msg().token)); - let rep = Message::

::new(rep_ty, $code, Id(0), req.data().msg().token); - effects.push(Effect::Send(Addrd(rep, req.addr()))); - }}; + match req.data().msg().block2() { + | None => { + if let Some(ix) = block_state_ix { + self.block_state(ix, |s: &BlockState<_, _>| { + log!(Block::poll_req, + effects, + log::Level::Warn, + "Expected message {:?} to continue block sequence {:?}", + req.data().msg().token, + s); + }); + self.remove(ix); + respond!(REQUEST_ENTITY_INCOMPLETE); } + Some(Ok(req)) + }, + | Some(block) => { match block_state_ix { | None if block.num() > 0 => { respond!(REQUEST_ENTITY_INCOMPLETE); @@ -293,10 +356,14 @@ impl Step

for Block Some(Err(nb::Error::WouldBlock)) }, | None if block.more() => { - let mut block_state = BlockState { biggest: Some(0), - original: None, - pcs: Default::default() }; - block_state.have(0, req.data().msg().clone()); + let mut block_state = + BlockState { biggest: Some(0), + original: None, + pcs: Default::default(), + expires_at: snap.time + + Milliseconds(snap.config.exchange_lifetime_millis()) }; + + block_state.have(snap.time, 0, req.clone().map(|r| r.into())); self.push(block_state); respond!(CONTINUE); @@ -321,7 +388,7 @@ impl Step

for Block }, | Some(ix) => { self.block_state_mut(ix, |s: &mut BlockState<_, _>| { - s.have(block.num(), req.data().msg().clone()) + s.have(snap.time, block.num(), req.clone().map(|r| r.into())) }); if block.more() { @@ -345,11 +412,12 @@ impl Step

for Block token: Token, addr: SocketAddr) -> super::StepOutput { + self.prune(effects, snap.time); let mut rep: Addrd> = _try!(Option; self.inner().poll_resp(snap, effects, token, addr)); - let block_state_ix = - self.find_rx_response_block_state_ix(rep.data().msg().token, rep.data().msg().cache_key()); + let block_state_ix = self.find_rx_response_block_state_ix(rep.as_ref().map(|r| r.msg().token), + rep.data().msg().cache_key()); match rep.data().msg().block1() { | None => { @@ -368,19 +436,19 @@ impl Step

for Block | Some(ix) => { macro_rules! request_num { ($num:expr) => {{ - let mut new = self.clone_original(ix); + let mut new = self.clone_original(ix).unwrap(); - new.set_block1(0, $num, false).ok(); - new.remove(BLOCK2); - new.remove(SIZE2); + new.as_mut().set_block1(0, $num, false).ok(); + new.as_mut().remove(BLOCK2); + new.as_mut().remove(SIZE2); - effects.push(Effect::Send(rep.as_ref().map(|_| new))); - self.block_state_mut(ix, |s: &mut BlockState<_, _>| s.waiting($num)); + effects.push(Effect::Send(new)); + self.block_state_mut(ix, |s: &mut BlockState<_, _>| s.waiting(snap.time, $num)); }}; } self.block_state_mut(ix, |s: &mut BlockState<_, _>| { - s.have(block.num(), rep.data().msg().clone()) + s.have(snap.time, block.num(), rep.clone().map(|r| r.into())) }); if block.more() { @@ -410,11 +478,14 @@ impl Step

for Block effs: &mut P::Effects, msg: &Addrd>) -> Result<(), Self::Error> { + self.prune(effs, snap.time); self.inner.on_message_sent(snap, effs, msg)?; if msg.data().code.kind() == CodeKind::Request { self.push(BlockState { biggest: None, - original: Some(msg.data().clone()), - pcs: Default::default() }); + original: Some(msg.clone()), + pcs: Default::default(), + expires_at: snap.time + + Milliseconds(snap.config.exchange_lifetime_millis()) }); } Ok(()) } @@ -422,34 +493,34 @@ impl Step

for Block #[cfg(test)] mod tests { - use core::time::Duration; - use std::time::Instant; - use std_alloc::collections::BTreeMap; use tinyvec::array_vec; use toad_msg::{Code, ContentFormat, Id, MessageOptions, Type}; use super::*; use crate::net::Addrd; + use crate::resp::code::CONTENT; use crate::test; #[test] fn ent_correctly_identifies_missing_pieces() { let mut e = BlockState::>> { biggest: None, original: None, - pcs: BTreeMap::new() }; - e.have(0, ()); + pcs: BTreeMap::new(), + expires_at: + Instant::new(1000) }; + e.have(Instant::new(0), 0, ()); assert_eq!(e.get_missing(), None); - e.have(1, ()); + e.have(Instant::new(0), 1, ()); assert_eq!(e.get_missing(), None); - e.waiting(3); - e.waiting(2); - e.waiting(5); + e.waiting(Instant::new(0), 3); + e.waiting(Instant::new(0), 2); + e.waiting(Instant::new(0), 5); assert_eq!(e.get_missing(), Some(4)); - e.waiting(4); + e.waiting(Instant::new(0), 4); assert_eq!(e.get_missing(), None); } @@ -499,9 +570,10 @@ mod tests { struct TestState { gave_pieces: Vec, req: Option>, - last_request_at: Instant, + last_request_at: std::time::Instant, } + #[allow(dead_code)] struct Addrs { server: SocketAddr, client: SocketAddr, @@ -523,7 +595,7 @@ mod tests { b.on_message_sent(&test::snapshot(), &mut effects, - &Addrd(orig_req, addrs.client)) + &Addrd(orig_req, addrs.server)) .unwrap(); let payload = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do"; @@ -542,7 +614,7 @@ mod tests { b.inner() .init(TestState { gave_pieces: vec![], req: None, - last_request_at: Instant::now() }) + last_request_at: std::time::Instant::now() }) .set_poll_resp(move |mock, _, _, _, _| { let blocksize: u16 = 16; let blocks = payload_blocks(); @@ -561,16 +633,18 @@ mod tests { .map_ref(|s| s.as_ref().unwrap().gave_pieces.clone()); let last_request_at = mock.state .map_ref(|s| s.as_ref().unwrap().last_request_at.clone()); - let elapsed = Instant::now().duration_since(last_request_at); + let elapsed = std::time::Instant::now().duration_since(last_request_at); match requested_piece { | None if already_gave_pieces.is_empty() => { resp.set_payload(blocks[0].iter().copied()); resp.msg_mut().set_block1(blocksize, 0, true).ok(); mock.state - .map_mut(|s| s.as_mut().unwrap().last_request_at = Instant::now()); + .map_mut(|s| s.as_mut().unwrap().last_request_at = std::time::Instant::now()); + }, + | None if request.is_none() && elapsed > std::time::Duration::from_secs(1) => { + panic!("timeout") }, - | None if request.is_none() && elapsed > Duration::from_secs(1) => panic!("timeout"), | None => panic!("Block1 not set on request when client already got a Block1 response"), | Some(_) if request.map(|r| r.data().msg().cache_key()) != Some(cache_key) => { panic!("cache_key mismatch!") @@ -588,7 +662,7 @@ mod tests { mock.state.map_mut(|s| { let s = s.as_mut().unwrap(); s.gave_pieces.push(n); - s.last_request_at = Instant::now(); + s.last_request_at = std::time::Instant::now(); }); }, } @@ -640,9 +714,71 @@ mod tests { payload.bytes().collect::>()); } + #[test] + fn when_recv_response_with_block2_and_dont_hear_back_for_a_long_time_this_should_prune_state() { + #[derive(Clone, Copy)] + #[allow(dead_code)] + struct Addrs { + server: SocketAddr, + client: SocketAddr, + } + + let addrs = Addrs { server: test::x.x.x.x(80), + client: test::x.x.x.x(10) }; + let addrs: &'static Addrs = unsafe { core::mem::transmute(&addrs) }; + + type S = test::MockStep<(), Addrd, Addrd, ()>; + let b = Block::, BTreeMap<_, _>>::default(); + + b.inner().set_poll_resp(|_, snap, _, _, _| { + if snap.time == Instant::new(0) { + let mut rep = + test::Message::new(Type::Con, CONTENT, Id(0), Token(Default::default())); + rep.set_block1(128, 0, true).ok(); + Some(Ok(Addrd(Resp::from(rep), addrs.server))) + } else { + None + } + }); + + let mut effects = vec![]; + let t_0 = test::snapshot(); + let mut t_n1 = test::snapshot(); + t_n1.time = Instant::new(0) + Milliseconds(t_n1.config.exchange_lifetime_millis() - 1); + + let mut t_n2 = test::snapshot(); + t_n2.time = Instant::new(0) + Milliseconds(t_n2.config.exchange_lifetime_millis() + 1); + + let req = test::Message::new(Type::Non, Code::GET, Id(0), Token(Default::default())); + let req = Addrd(req, addrs.server); + + b.on_message_sent(&t_0, &mut effects, &req).unwrap(); + + let rep_0 = b.poll_resp(&t_0, &mut effects, Token(Default::default()), addrs.server) + .unwrap() + .unwrap_err(); + + assert!(matches!(rep_0, nb::Error::WouldBlock)); + + assert_eq!(b.block_states.map_ref(|bss| bss.len()), 1); + + b.poll_resp(&t_n1, &mut effects, Token(Default::default()), addrs.server) + .ok_or(()) + .unwrap_err(); + + assert_eq!(b.block_states.map_ref(|bss| bss.len()), 1); + + b.poll_resp(&t_n2, &mut effects, Token(Default::default()), addrs.server) + .ok_or(()) + .unwrap_err(); + + assert_eq!(b.block_states.map_ref(|bss| bss.len()), 0); + } + #[test] fn when_recv_request_without_block2_this_should_do_nothing() { #[derive(Clone, Copy)] + #[allow(dead_code)] struct Addrs { server: SocketAddr, client: SocketAddr, @@ -676,6 +812,7 @@ mod tests { } #[derive(Clone, Copy)] + #[allow(dead_code)] struct Addrs { server: SocketAddr, client: SocketAddr, @@ -728,6 +865,7 @@ mod tests { #[test] fn when_recv_request_with_block2_and_unrecognized_number_this_should_respond_4_08() { #[derive(Clone, Copy)] + #[allow(dead_code)] struct Addrs { server: SocketAddr, client: SocketAddr, From 9ce8a89773afe543942670227d8d1e9fde4d930c Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Mon, 8 May 2023 00:12:24 -0500 Subject: [PATCH 08/12] test: test that rxd requests are pruned --- toad/src/step/block.rs | 44 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/toad/src/step/block.rs b/toad/src/step/block.rs index df63c3ba..ab849e88 100644 --- a/toad/src/step/block.rs +++ b/toad/src/step/block.rs @@ -714,6 +714,50 @@ mod tests { payload.bytes().collect::>()); } + #[test] + fn when_recv_request_with_block2_and_dont_hear_another_for_a_long_time_this_should_prune_state() { + #[derive(Clone, Copy)] + #[allow(dead_code)] + struct Addrs { + server: SocketAddr, + client: SocketAddr, + } + + let addrs = Addrs { server: test::x.x.x.x(80), + client: test::x.x.x.x(10) }; + let addrs: &'static Addrs = unsafe { core::mem::transmute(&addrs) }; + + type S = test::MockStep<(), Addrd, Addrd, ()>; + let b = Block::, BTreeMap<_, _>>::default(); + + b.inner().set_poll_req(|_, snap, _| { + if snap.time == Instant::new(0) { + let mut req = + test::Message::new(Type::Non, Code::GET, Id(0), Token(Default::default())); + req.set_block2(128, 0, true).ok(); + Some(Ok(Addrd(Req::from(req), addrs.client))) + } else { + None + } + }); + + let t_0 = test::snapshot(); + let mut t_1 = test::snapshot(); + t_1.time = Instant::new(0) + Milliseconds(t_1.config.exchange_lifetime_millis() - 1); + + let mut t_2 = test::snapshot(); + t_2.time = Instant::new(0) + Milliseconds(t_2.config.exchange_lifetime_millis() + 1); + + assert!(matches!(b.poll_req(&t_0, &mut vec![]).unwrap().unwrap_err(), nb::Error::WouldBlock)); + assert_eq!(b.block_states.map_ref(|bss| bss.len()), 1); + + assert!(matches!(b.poll_req(&t_1, &mut vec![]), None)); + assert_eq!(b.block_states.map_ref(|bss| bss.len()), 1); + + assert!(matches!(b.poll_req(&t_2, &mut vec![]), None)); + assert_eq!(b.block_states.map_ref(|bss| bss.len()), 0); + } + #[test] fn when_recv_response_with_block2_and_dont_hear_back_for_a_long_time_this_should_prune_state() { #[derive(Clone, Copy)] From ec537585c802fc70c03858471fde7dd5cdccca17 Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Mon, 8 May 2023 13:55:50 -0500 Subject: [PATCH 09/12] refactor: rework type design to support outbound messages --- toad/src/step/block.rs | 597 ++++++++++++++++++++++------------------- 1 file changed, 315 insertions(+), 282 deletions(-) diff --git a/toad/src/step/block.rs b/toad/src/step/block.rs index ab849e88..a85f2e63 100644 --- a/toad/src/step/block.rs +++ b/toad/src/step/block.rs @@ -2,7 +2,6 @@ use core::marker::PhantomData; use embedded_time::duration::Milliseconds; use embedded_time::Instant; -use naan::prelude::F1Once; use no_std_net::SocketAddr; use toad_array::{AppendCopy, Array}; use toad_map::Map; @@ -18,36 +17,92 @@ use crate::req::Req; use crate::resp::code::{CONTINUE, REQUEST_ENTITY_INCOMPLETE}; use crate::resp::Resp; +/// A potential role for a blocked message (request / response) +/// +/// Part of composite map keys identifying [`Conversation`]s. +#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)] +pub enum Role { + #[allow(missing_docs)] + Request, + #[allow(missing_docs)] + Response, +} + +/// Whether a blocked message is outbound or inbound +/// +/// Part of composite map keys identifying [`Conversation`]s. +#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)] +pub enum Direction { + #[allow(missing_docs)] + Outbound, + #[allow(missing_docs)] + Inbound, +} + +/// A single piece of a blocked message #[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)] -enum Piece { - Have(M), - Waiting, - Missing, +pub enum Piece { + /// For [`Direction::Inbound`], indicates that we have received this piece, + /// and have the data `M` associated with a piece. + /// + /// For [`Direction::Outbound`], indicates that we have not yet communicated + /// this piece, and are waiting for the right time to transition to a different state. + Data(M), + /// For [`Direction::Inbound`], indicates that we have requested this piece, + /// and expect that it will be transitioned to [`Piece::Have`] at some point in + /// the future. + /// + /// For [`Direction::Outbound`], this state is impossible. + Requested, + /// For [`Direction::Inbound`], indicates that we have not yet received or asked for this piece. + /// + /// For [`Direction::Outbound`], indicates that we previously had this piece and have + /// sent it to the remote endpoint. + Absent, } impl Piece { fn get_msg(&self) -> Option<&M> { match self { - | Piece::Have(m) => Some(m), + | Piece::Data(m) => Some(m), | _ => None, } } } +/// The state for a given Blocked conversation between ourselves and a remote endpoint. #[derive(Debug)] -struct BlockState +pub struct Conversation where P: PlatformTypes { - pub(self) biggest: Option, - pub(self) original: Option>>, - pub(self) pcs: Pcs, + pub(self) biggest_number_seen: Option, + pub(self) original: Option>, + pub(self) pcs: Pieces, pub(self) expires_at: Instant, } -impl BlockState where P: PlatformTypes +impl Conversation + where P: PlatformTypes, + Pieces: Default { + /// Create a new [`Conversation`] tracking a Blocked response to a sent request (param `msg`) + pub fn expect_response(expires_at: Instant, msg: Message

) -> Self { + Self { original: Some(msg), + biggest_number_seen: None, + pcs: Default::default(), + expires_at } + } + + /// Create a new [`Conversation`] tracking a received Blocked request + pub fn request(expires_at: Instant) -> Self { + Self { original: None, + biggest_number_seen: None, + pcs: Default::default(), + expires_at } + } + pub(self) fn assembled(&self) -> Payload - where Pcs: Map>>> + where Pieces: Map>> { const PANIC_MSG: &'static str = r#"BlockState.assembled() assumes: - BlockState.biggest is Some(_) @@ -55,13 +110,12 @@ impl BlockState where P: PlatformTypes - BlockState.have_all() has been invoked and confirmed to be true"#; let mut p = P::MessagePayload::default(); - for i in 0..=self.biggest.expect(PANIC_MSG) { + for i in 0..=self.biggest_number_seen.expect(PANIC_MSG) { p.append_copy(&self.pcs .get(&i) .expect(PANIC_MSG) .get_msg() .expect(PANIC_MSG) - .data() .payload .0); } @@ -71,56 +125,56 @@ impl BlockState where P: PlatformTypes /// find a missing piece that should be requested pub(self) fn get_missing(&self) -> Option - where Pcs: Map>, + where Pieces: Map>, T: PartialEq { self.pcs .iter() - .find_map(|(n, p)| if p == &Piece::Missing { Some(n) } else { None }) + .find_map(|(n, p)| if p == &Piece::Absent { Some(n) } else { None }) .copied() } /// are no pieces waiting or missing? pub(self) fn have_all(&self) -> bool - where Pcs: Map>, + where Pieces: Map>, T: PartialEq { self.pcs .iter() - .all(|(_, p)| p != &Piece::Missing && p != &Piece::Waiting) + .all(|(_, p)| p != &Piece::Absent && p != &Piece::Requested) } /// if `n > self.biggest`, update `self.biggest` to `n` /// and insert `Piece::Missing` for all pieces between `biggest` and `n` pub(self) fn touch(&mut self, now: Instant, n: u32) - where Pcs: Map> + where Pieces: Map> { - let missing_nums = match self.biggest { + let missing_nums = match self.biggest_number_seen { | Some(m) if m + 1 < n => (m + 1..n).into_iter(), | None if n > 0 => (0..n).into_iter(), | _ => (0..0).into_iter(), }; missing_nums.for_each(|n| { - self.pcs.insert(n, Piece::Missing).ok(); + self.pcs.insert(n, Piece::Absent).ok(); }); - let n_is_bigger = self.biggest.map(|m| m < n).unwrap_or(true); + let n_is_bigger = self.biggest_number_seen.map(|m| m < n).unwrap_or(true); if n_is_bigger { - self.biggest = Some(n); + self.biggest_number_seen = Some(n); } } /// Mark piece `n` as having been requested pub(self) fn waiting(&mut self, now: Instant, n: u32) - where Pcs: Map> + where Pieces: Map> { let e = self.pcs.get_mut(&n); match e { - | Some(Piece::Missing) | Some(Piece::Waiting) | None => { + | Some(Piece::Absent) | Some(Piece::Requested) | None => { self.touch(now, n); - self.pcs.insert(n, Piece::Waiting).ok(); + self.pcs.insert(n, Piece::Requested).ok(); }, | _ => (), } @@ -128,177 +182,164 @@ impl BlockState where P: PlatformTypes /// Store piece `T` corresponding to piece number `n` pub(self) fn have(&mut self, now: Instant, n: u32, m: T) - where Pcs: Map> + where Pieces: Map> { self.touch(now, n); - self.pcs.insert(n, Piece::Have(m)).ok(); - } - - pub(self) fn biggest(&self) -> Option { - self.biggest + self.pcs.insert(n, Piece::Data(m)).ok(); } } /// TODO #[derive(Debug)] -pub struct Block { +pub struct Block { inner: S, - block_states: Stem, - __p: PhantomData<(P, Pcs)>, + endpoints: Stem, + __p: PhantomData<(P, Conversations, Pieces)>, } -impl Default for Block +impl Default + for Block where S: Default, - BS: Default + Endpoints: Default { fn default() -> Self { Block { inner: S::default(), - block_states: Stem::new(BS::default()), + endpoints: Stem::new(Endpoints::default()), __p: PhantomData } } } -impl Block where P: PlatformTypes +impl Block + where P: PlatformTypes, + Endpoints: Default + Map, + Conversations: + core::fmt::Debug + Default + Map<(Token, Role, Direction), Conversation>, + Pieces: core::fmt::Debug + Default + Map>> { - fn prune(&self, effects: &mut P::Effects, now: Instant) - where Pcs: Map>>>, - BS: Array> - { - let len_before = self.block_states.map_ref(|bss| bss.len()); - fn go<_P, _S, _BS, _Pcs>(now: Instant<_P::Clock>, b: &Block<_P, _S, _BS, _Pcs>) - where _P: PlatformTypes, - _Pcs: Map>>>, - _BS: Array> - { - if let Some(ix) = b.block_states.map_ref(|bss| { - bss.iter().enumerate().find_map(|(ix, b)| { - if now >= b.expires_at { - Some(ix) - } else { - None - } - }) - }) - { - b.block_states.map_mut(|bss| bss.remove(ix)); - go(now, b); - } - } + fn prune_conversations(&self, + effects: &mut P::Effects, + now: Instant, + cs: &mut Conversations) { + let len_before = cs.len(); + let mut remove_next = || { + cs.iter() + .filter(|(_, b)| now >= b.expires_at) + .map(|(k, _)| *k) + .next() + .map(|k| cs.remove(&k)) + .map(|_| ()) + }; + + while let Some(()) = remove_next() {} - go(now, self); - let len_after = self.block_states.map_ref(|bss| bss.len()); + let len_after = cs.len(); if len_before - len_after > 0 { log!(Block::prune, effects, log::Level::Debug, "Removed {} expired entries. For outbound messages, a prior step SHOULD but MAY NOT retry sending them", len_before - len_after); } } - fn find_rx_request_block_state_ix(&self, token: Addrd) -> Option - where Pcs: Map>>>, - BS: Array> - { - self.block_states.map_ref(|bs| { - bs.iter() - .enumerate() - .find(|(_, bs)| match bs.pcs.get(&0) { - | Some(Piece::Have(m)) => m.as_ref().map(|m| m.token) == token, - | _ => false, - }) - .map(|(ix, _)| ix) - }) + fn prune_endpoints(&self) { + self.endpoints.map_mut(|es| { + let mut remove_next = || { + es.iter() + .filter(|(_, cs)| cs.is_empty()) + .map(|(k, _)| *k) + .next() + .map(|k| es.remove(&k)) + .map(|_| ()) + }; + + while let Some(()) = remove_next() {} + }); } - fn find_rx_response_block_state_ix(&self, token: Addrd, cache_key: u64) -> Option - where Pcs: Map>>>, - BS: Array> - { - self.block_states.map_ref(|block_states| { - block_states.iter() - .enumerate() - .find(|(_, bs)| { - let block0_and_response_is_for_originating_request = - bs.biggest.is_none() - && bs.original - .as_ref() - .map(|msg| msg.as_ref().map(|m| m.token)) - == Some(token); - - let block_n_and_response_matches_previous_block = - bs.pcs - .get(&0) - .and_then(|p| p.get_msg().map(|m| m.data().cache_key())) - == Some(cache_key); - - block0_and_response_is_for_originating_request - || block_n_and_response_matches_previous_block - }) - .map(|(ix, _)| ix) - }) + fn prune(&self, effects: &mut P::Effects, now: Instant) { + // TODO: log + self.endpoints.map_mut(|es| { + es.iter_mut() + .for_each(|(_, c)| self.prune_conversations(effects, now, c)) + }); + self.prune_endpoints(); } - fn block_state_mut(&self, ix: usize, f: F) -> R - where F: for<'a> F1Once<&'a mut BlockState, Ret = R>, - Pcs: Map>>>, - BS: Array> + fn get_endpoint(&self, addr: SocketAddr, mut f: F) -> Option + where F: FnMut(&mut Conversations) -> R { - let mut f = Some(f); - self.block_states - .map_mut(|bs| Option::take(&mut f).unwrap().call1(&mut bs[ix])) + self.endpoints.map_mut(|es| es.get_mut(&addr).map(&mut f)) } - fn block_state(&self, ix: usize, f: F) -> R - where F: for<'a> F1Once<&'a BlockState, Ret = R>, - Pcs: Map>>>, - BS: Array> + fn get_or_create_endpoint(&self, addr: SocketAddr, mut f: F) -> R + where F: FnMut(&mut Conversations) -> R { - let mut f = Some(f); - self.block_states - .map_ref(|bs| Option::take(&mut f).unwrap().call1(&bs[ix])) + self.endpoints.map_mut(|es| { + if !es.has(&addr) { + es.insert(addr, Conversations::default()).unwrap(); + } + + es.get_mut(&addr).map(&mut f).unwrap() + }) } - fn push(&self, s: BlockState) - where Pcs: Map>>>, - BS: Array> - { - let mut s = Some(s); - self.block_states - .map_mut(|bs| bs.push(Option::take(&mut s).unwrap())); + fn expect_response(&self, snap: &Snapshot

, addr: SocketAddr, req: &Message

) { + let exp = snap.time + Milliseconds(snap.config.exchange_lifetime_millis()); + self.get_or_create_endpoint(addr, |convs| { + convs.insert((req.token, Role::Response, Direction::Inbound), + Conversation::expect_response(exp, req.clone())) + .unwrap(); + }); + } + + fn insert_request(&self, + snap: &Snapshot

, + (addr, token, role, dir): (SocketAddr, Token, Role, Direction)) { + let exp = snap.time + Milliseconds(snap.config.exchange_lifetime_millis()); + self.get_or_create_endpoint(addr, |convs| { + convs.insert((token, role, dir), Conversation::request(exp)) + .unwrap(); + }); } - fn remove(&self, ix: usize) - where Pcs: Map>>>, - BS: Array> + fn has(&self, (addr, token, role, dir): (SocketAddr, Token, Role, Direction)) -> bool { + self.get_endpoint(addr, |convs| convs.has(&(token, role, dir))) + .unwrap_or(false) + } + + fn map_mut(&self, + (addr, token, role, dir): (SocketAddr, Token, Role, Direction), + mut f: F) + -> Option + where F: FnMut(&mut Conversation) -> R { - self.block_states.map_mut(|bs| bs.remove(ix)); + self.get_or_create_endpoint(addr, |conv| conv.get_mut(&(token, role, dir)).map(&mut f)) } - fn clone_original(&self, ix: usize) -> Option>> - where Pcs: Map>>>, - BS: Array> + fn map(&self, + (addr, token, role, dir): (SocketAddr, Token, Role, Direction), + f: F) + -> Option + where F: FnOnce(&Conversation) -> R { - self.block_state(ix, |s: &BlockState<_, _>| { - s.original.as_ref().map(|orig| { - let addr = orig.addr(); - let orig: &Message

= orig.data(); - let mut new = - Message::

::new(Type::Con, orig.code, Id(0), orig.token); - - orig.opts.iter().for_each(|(n, vs)| { - if n.include_in_cache_key() { - new.opts.insert(*n, vs.clone()).ok(); - } - }); - - Addrd(new, addr) - }) + let mut f = Some(f); + self.get_or_create_endpoint(addr, |conv| { + conv.get(&(token, role, dir)) + .map(Option::take(&mut f).unwrap()) }) } + + fn remove_if_present(&self, (addr, token, role, dir): (SocketAddr, Token, Role, Direction)) { + self.get_endpoint(addr, |convs| { + convs.remove(&(token, role, dir)); + }); + } } -impl Step

for Block +impl Step

+ for Block where P: PlatformTypes, S: Step>, PollResp = Addrd>>, - Pcs: Map>>> + core::fmt::Debug, - BS: Array> + Endpoints: core::fmt::Debug + Map, + Conversations: core::fmt::Debug + Map<(Token, Role, Direction), Conversation>, + Pieces: core::fmt::Debug + Map>> { type PollReq = Addrd>; type PollResp = Addrd>; @@ -318,10 +359,9 @@ impl Step

for Block macro_rules! respond { ($code:expr) => {{ - let rep_ty = if req.data().msg().ty == Type::Con { - Type::Ack - } else { - Type::Non + let rep_ty = match req.data().msg().ty { + | Type::Con => Type::Ack, + | _ => Type::Non, }; let rep = Message::

::new(rep_ty, $code, Id(0), req.data().msg().token); @@ -329,12 +369,13 @@ impl Step

for Block }}; } - let block_state_ix = self.find_rx_request_block_state_ix(req.as_ref().map(|r| r.msg().token)); + let k = (req.addr(), req.data().msg().token, Role::Request, Direction::Inbound); + let has_prev_pieces = self.has(k); match req.data().msg().block2() { | None => { - if let Some(ix) = block_state_ix { - self.block_state(ix, |s: &BlockState<_, _>| { + if has_prev_pieces { + self.map(k, |s: &Conversation<_, _>| { log!(Block::poll_req, effects, log::Level::Warn, @@ -342,65 +383,51 @@ impl Step

for Block req.data().msg().token, s); }); - self.remove(ix); + self.remove_if_present(k); respond!(REQUEST_ENTITY_INCOMPLETE); } Some(Ok(req)) }, | Some(block) => { - match block_state_ix { - | None if block.num() > 0 => { - respond!(REQUEST_ENTITY_INCOMPLETE); - - Some(Err(nb::Error::WouldBlock)) - }, - | None if block.more() => { - let mut block_state = - BlockState { biggest: Some(0), - original: None, - pcs: Default::default(), - expires_at: snap.time - + Milliseconds(snap.config.exchange_lifetime_millis()) }; - - block_state.have(snap.time, 0, req.clone().map(|r| r.into())); - - self.push(block_state); - respond!(CONTINUE); - - Some(Err(nb::Error::WouldBlock)) - }, - | None => { - // this is block 0 and there are no more blocks, - // simply yield the request - Some(Ok(req)) - }, - | Some(ix) - if self.block_state(ix, BlockState::biggest) - .map(|n| n + 1) - .unwrap_or(0) - < block.num() => - { - self.remove(ix); - respond!(REQUEST_ENTITY_INCOMPLETE); - - Some(Err(nb::Error::WouldBlock)) - }, - | Some(ix) => { - self.block_state_mut(ix, |s: &mut BlockState<_, _>| { - s.have(snap.time, block.num(), req.clone().map(|r| r.into())) - }); - - if block.more() { - respond!(CONTINUE); - Some(Err(nb::Error::WouldBlock)) - } else { - let p = self.block_state(ix, BlockState::assembled); - req.as_mut().msg_mut().payload = p; - self.remove(ix); - Some(Ok(req)) - } - }, + if !has_prev_pieces && block.num() == 0 && block.more() { + self.insert_request(snap, k); + self.map_mut(k, |conv| { + conv.have(snap.time, 0, req.clone().map(|r| r.into()).unwrap()) + }); + respond!(CONTINUE); + Some(Err(nb::Error::WouldBlock)) + } else if !has_prev_pieces && block.num() == 0 && !block.more() { + Some(Ok(req)) + } else if !has_prev_pieces && block.num() > 0 { + respond!(REQUEST_ENTITY_INCOMPLETE); + Some(Err(nb::Error::WouldBlock)) + } else if block.num() + > self.map(k, |conv| conv.biggest_number_seen.map(|n| n + 1)) + .flatten() + .unwrap_or(0) + { + self.remove_if_present(k); + respond!(REQUEST_ENTITY_INCOMPLETE); + Some(Err(nb::Error::WouldBlock)) + } else if block.more() { + self.map_mut(k, |conv| { + conv.have(snap.time, + block.num(), + req.clone().map(|r| r.into()).unwrap()) + }); + respond!(CONTINUE); + Some(Err(nb::Error::WouldBlock)) + } else { + self.map_mut(k, |conv| { + conv.have(snap.time, + block.num(), + req.clone().map(|r| r.into()).unwrap()) + }); + let p = self.map(k, Conversation::assembled).unwrap(); + req.as_mut().msg_mut().payload = p; + self.remove_if_present(k); + Some(Ok(req)) } }, } @@ -413,61 +440,57 @@ impl Step

for Block addr: SocketAddr) -> super::StepOutput { self.prune(effects, snap.time); + let mut rep: Addrd> = _try!(Option; self.inner().poll_resp(snap, effects, token, addr)); - let block_state_ix = self.find_rx_response_block_state_ix(rep.as_ref().map(|r| r.msg().token), - rep.data().msg().cache_key()); + let k = (rep.addr(), rep.data().msg().token, Role::Response, Direction::Inbound); + let has_prev_pieces = self.has(k); + + macro_rules! request_piece { + ($num:expr) => {{ + let mut new = self.map(k, |conv| conv.original.clone().unwrap()).unwrap(); + + new.set_block1(0, $num, false).ok(); + new.remove(BLOCK2); + new.remove(SIZE2); + + effects.push(Effect::Send(Addrd(new, rep.addr()))); + self.map_mut(k, |conv| conv.waiting(snap.time, $num)); + }}; + } match rep.data().msg().block1() { | None => { - // Response didn't have Block1; we can drop the block state - if let Some(ix) = block_state_ix { - self.remove(ix); - } + self.remove_if_present(k); Some(Ok(rep)) }, | Some(block) => { - match block_state_ix { - | None => { - // Got a Block1 message but we don't have any conception of it; yield the response as-is from the inner step. + if !has_prev_pieces { + // TODO: warn + Some(Ok(rep)) + } else { + self.map_mut(k, |conv| { + conv.have(snap.time, + block.num(), + rep.clone().map(|r| r.into()).unwrap()) + }); + if block.more() { + request_piece!(block.num() + 1); + } + + if let Some(missing) = self.map(k, Conversation::get_missing).unwrap() { + request_piece!(missing); + } + + if self.map(k, Conversation::have_all).unwrap() { + rep.as_mut().msg_mut().payload = self.map(k, Conversation::assembled).unwrap(); + rep.as_mut().msg_mut().remove(BLOCK1); + self.remove_if_present(k); Some(Ok(rep)) - }, - | Some(ix) => { - macro_rules! request_num { - ($num:expr) => {{ - let mut new = self.clone_original(ix).unwrap(); - - new.as_mut().set_block1(0, $num, false).ok(); - new.as_mut().remove(BLOCK2); - new.as_mut().remove(SIZE2); - - effects.push(Effect::Send(new)); - self.block_state_mut(ix, |s: &mut BlockState<_, _>| s.waiting(snap.time, $num)); - }}; - } - - self.block_state_mut(ix, |s: &mut BlockState<_, _>| { - s.have(snap.time, block.num(), rep.clone().map(|r| r.into())) - }); - - if block.more() { - request_num!(block.num() + 1); - } - - if let Some(missing) = self.block_state(ix, BlockState::get_missing) { - request_num!(missing); - } - - if self.block_state(ix, BlockState::have_all) { - rep.as_mut().msg_mut().payload = self.block_state(ix, BlockState::assembled); - rep.as_mut().msg_mut().remove(BLOCK1); - self.remove(ix); - Some(Ok(rep)) - } else { - Some(Err(nb::Error::WouldBlock)) - } - }, + } else { + Some(Err(nb::Error::WouldBlock)) + } } }, } @@ -481,12 +504,10 @@ impl Step

for Block self.prune(effs, snap.time); self.inner.on_message_sent(snap, effs, msg)?; if msg.data().code.kind() == CodeKind::Request { - self.push(BlockState { biggest: None, - original: Some(msg.clone()), - pcs: Default::default(), - expires_at: snap.time - + Milliseconds(snap.config.exchange_lifetime_millis()) }); + self.expect_response(snap, msg.addr(), msg.data()); + } else if msg.data().code.kind() == CodeKind::Response { } + Ok(()) } } @@ -502,13 +523,19 @@ mod tests { use crate::resp::code::CONTENT; use crate::test; + type Pieces = BTreeMap>; + type Conversations = + BTreeMap<(Token, Role, Direction), super::Conversation>; + type Block = + super::Block, Conversations, Pieces>; + #[test] - fn ent_correctly_identifies_missing_pieces() { - let mut e = BlockState::>> { biggest: None, - original: None, - pcs: BTreeMap::new(), - expires_at: - Instant::new(1000) }; + fn block_state_correctly_identifies_missing_pieces() { + let mut e = + Conversation::>> { biggest_number_seen: None, + original: None, + pcs: BTreeMap::new(), + expires_at: Instant::new(1000) }; e.have(Instant::new(0), 0, ()); assert_eq!(e.get_missing(), None); @@ -528,7 +555,7 @@ mod tests { #[test] fn when_inner_errors_block_should_error() { type S = test::MockStep<(), Addrd, Addrd, ()>; - let b = Block::, BTreeMap<_, _>>::default(); + let b = Block::::default(); b.inner() .set_poll_req(|_, _, _| Some(Err(nb::Error::Other(())))) @@ -546,7 +573,7 @@ mod tests { #[test] fn when_recv_response_with_no_block1_this_should_do_nothing() { type S = test::MockStep<(), Addrd, Addrd, ()>; - let b = Block::, BTreeMap<_, _>>::default(); + let b = Block::::default(); b.inner().set_poll_resp(|_, _, _, _, _| { let msg = toad_msg::alloc::Message::new(Type::Con, @@ -583,7 +610,7 @@ mod tests { client: test::x.x.x.x(10) }; type S = test::MockStep, Addrd, ()>; - let b = Block::, BTreeMap<_, _>>::default(); + let b = Block::::default(); let mut orig_req = test::Message::new(Type::Con, Code::GET, Id(1), Token(array_vec! {1})); orig_req.set_accept(ContentFormat::Text).ok(); @@ -715,7 +742,8 @@ mod tests { } #[test] - fn when_recv_request_with_block2_and_dont_hear_another_for_a_long_time_this_should_prune_state() { + fn when_recv_request_with_block2_and_dont_hear_another_for_a_long_time_this_should_prune_state( + ) { #[derive(Clone, Copy)] #[allow(dead_code)] struct Addrs { @@ -728,7 +756,7 @@ mod tests { let addrs: &'static Addrs = unsafe { core::mem::transmute(&addrs) }; type S = test::MockStep<(), Addrd, Addrd, ()>; - let b = Block::, BTreeMap<_, _>>::default(); + let b = Block::::default(); b.inner().set_poll_req(|_, snap, _| { if snap.time == Instant::new(0) { @@ -748,14 +776,17 @@ mod tests { let mut t_2 = test::snapshot(); t_2.time = Instant::new(0) + Milliseconds(t_2.config.exchange_lifetime_millis() + 1); - assert!(matches!(b.poll_req(&t_0, &mut vec![]).unwrap().unwrap_err(), nb::Error::WouldBlock)); - assert_eq!(b.block_states.map_ref(|bss| bss.len()), 1); + assert!(matches!(b.poll_req(&t_0, &mut vec![]).unwrap().unwrap_err(), + nb::Error::WouldBlock)); + assert_eq!(b.get_endpoint(addrs.client, |convs| convs.len()).unwrap(), + 1); assert!(matches!(b.poll_req(&t_1, &mut vec![]), None)); - assert_eq!(b.block_states.map_ref(|bss| bss.len()), 1); + assert_eq!(b.get_endpoint(addrs.client, |convs| convs.len()).unwrap(), + 1); assert!(matches!(b.poll_req(&t_2, &mut vec![]), None)); - assert_eq!(b.block_states.map_ref(|bss| bss.len()), 0); + assert_eq!(b.get_endpoint(addrs.client, |convs| convs.len()), None); } #[test] @@ -772,7 +803,7 @@ mod tests { let addrs: &'static Addrs = unsafe { core::mem::transmute(&addrs) }; type S = test::MockStep<(), Addrd, Addrd, ()>; - let b = Block::, BTreeMap<_, _>>::default(); + let b = Block::::default(); b.inner().set_poll_resp(|_, snap, _, _, _| { if snap.time == Instant::new(0) { @@ -804,19 +835,21 @@ mod tests { assert!(matches!(rep_0, nb::Error::WouldBlock)); - assert_eq!(b.block_states.map_ref(|bss| bss.len()), 1); + assert_eq!(b.get_endpoint(addrs.server, |convs| convs.len()).unwrap(), + 1); b.poll_resp(&t_n1, &mut effects, Token(Default::default()), addrs.server) .ok_or(()) .unwrap_err(); - assert_eq!(b.block_states.map_ref(|bss| bss.len()), 1); + assert_eq!(b.get_endpoint(addrs.server, |convs| convs.len()).unwrap(), + 1); b.poll_resp(&t_n2, &mut effects, Token(Default::default()), addrs.server) .ok_or(()) .unwrap_err(); - assert_eq!(b.block_states.map_ref(|bss| bss.len()), 0); + assert_eq!(b.get_endpoint(addrs.server, |convs| convs.len()), None); } #[test] @@ -833,7 +866,7 @@ mod tests { let addrs: &'static Addrs = unsafe { core::mem::transmute(&addrs) }; type S = test::MockStep<(), Addrd, Addrd, ()>; - let b = Block::, BTreeMap<_, _>>::default(); + let b = Block::::default(); b.inner().set_poll_req(|_, _, _| { let req = @@ -867,7 +900,7 @@ mod tests { let addrs: &'static Addrs = unsafe { core::mem::transmute(&addrs) }; type S = test::MockStep, Addrd, ()>; - let b = Block::, BTreeMap<_, _>>::default(); + let b = Block::::default(); b.inner() .init(TestState { next_block: 0 }) @@ -920,7 +953,7 @@ mod tests { let addrs: &'static Addrs = unsafe { core::mem::transmute(&addrs) }; type S = test::MockStep<(), Addrd, Addrd, ()>; - let b = Block::, BTreeMap<_, _>>::default(); + let b = Block::::default(); b.inner().set_poll_req(|_, _, _| { let mut req = From 1d11d083b94b5b8111eee65c24c564c431d9192d Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Mon, 8 May 2023 19:11:31 -0500 Subject: [PATCH 10/12] wip --- toad/src/step/block.rs | 66 ++++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/toad/src/step/block.rs b/toad/src/step/block.rs index a85f2e63..a6a0c455 100644 --- a/toad/src/step/block.rs +++ b/toad/src/step/block.rs @@ -86,16 +86,8 @@ impl Conversation Pieces: Default { /// Create a new [`Conversation`] tracking a Blocked response to a sent request (param `msg`) - pub fn expect_response(expires_at: Instant, msg: Message

) -> Self { - Self { original: Some(msg), - biggest_number_seen: None, - pcs: Default::default(), - expires_at } - } - - /// Create a new [`Conversation`] tracking a received Blocked request - pub fn request(expires_at: Instant) -> Self { - Self { original: None, + pub fn new(expires_at: Instant, original: Option>) -> Self { + Self { original, biggest_number_seen: None, pcs: Default::default(), expires_at } @@ -280,21 +272,13 @@ impl Block, addr: SocketAddr, req: &Message

) { - let exp = snap.time + Milliseconds(snap.config.exchange_lifetime_millis()); - self.get_or_create_endpoint(addr, |convs| { - convs.insert((req.token, Role::Response, Direction::Inbound), - Conversation::expect_response(exp, req.clone())) - .unwrap(); - }); - } - - fn insert_request(&self, + fn insert(&self, snap: &Snapshot

, + original: Option<&Message

>, (addr, token, role, dir): (SocketAddr, Token, Role, Direction)) { let exp = snap.time + Milliseconds(snap.config.exchange_lifetime_millis()); self.get_or_create_endpoint(addr, |convs| { - convs.insert((token, role, dir), Conversation::request(exp)) + convs.insert((token, role, dir), Conversation::new(exp, original.cloned())) .unwrap(); }); } @@ -391,7 +375,7 @@ impl Step

}, | Some(block) => { if !has_prev_pieces && block.num() == 0 && block.more() { - self.insert_request(snap, k); + self.insert(snap, None, k); self.map_mut(k, |conv| { conv.have(snap.time, 0, req.clone().map(|r| r.into()).unwrap()) }); @@ -467,7 +451,7 @@ impl Step

}, | Some(block) => { if !has_prev_pieces { - // TODO: warn + log!(Block::poll_resp, effects, log::Level::Warn, "Response received for token {:?} but we've never seen a request using that token. Ignoring this response despite it having {:?}", rep.data().msg().token, block); Some(Ok(rep)) } else { self.map_mut(k, |conv| { @@ -496,16 +480,48 @@ impl Step

} } + fn before_message_sent(&self, + snap: &platform::Snapshot

, + effs: &mut P::Effects, + msg: &mut Addrd>) + -> Result<(), Self::Error> { + self.prune(effs, snap.time); + self.inner.before_message_sent(snap, effs, msg)?; + + let block_size: usize = 1024; + + let original_payload = msg.data().payload().0; + + // TODO: block if 1024 is too big and we got REQUEST_ENTITY_TOO_LARGE + if msg.data().block1().is_none() && original_payload.len() > block_size { + let k = (msg.addr(), msg.data().token, Role::Request, Direction::Outbound); + self.insert(snap, Some(msg.data()), k); + self.map_mut(k, |conv| { + let len = original_payload.len() as f32; + let block_count = (len / block_size as f32).ceil() as u32; + for n in 0..block_count { + let mut msg_block = msg.clone(); + msg_block.as_mut().set_block1(1024, n, n == block_count - 1).ok(); + let mut p = P::MessagePayload::default(); + p.append_copy(original_payload[n * 1024..((n + 1) * 1024)]); + msg_block.as_mut().payload = Payload(p); + conv.have(snap.time, n, msg_block.unwrap()); + } + }).unwrap(); + } + Ok(()) + } + fn on_message_sent(&self, snap: &platform::Snapshot

, effs: &mut P::Effects, msg: &Addrd>) -> Result<(), Self::Error> { - self.prune(effs, snap.time); self.inner.on_message_sent(snap, effs, msg)?; if msg.data().code.kind() == CodeKind::Request { - self.expect_response(snap, msg.addr(), msg.data()); + self.insert(snap, Some(msg.data()), (msg.addr(), msg.data().token, Role::Response, Direction::Inbound)); } else if msg.data().code.kind() == CodeKind::Response { + // TODO: block outbound responses } Ok(()) From 52db1a2ef940d7a428d3892caf81f9e2b413a323 Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Thu, 11 May 2023 14:30:21 -0500 Subject: [PATCH 11/12] feat: bump array --- Cargo.lock | 26 +++++++++++++------------- toad/src/step/block.rs | 4 ++-- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bb82459c..1b71fde8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1006,7 +1006,7 @@ dependencies = [ "toad-len 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "toad-macros 0.2.0", "toad-map 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "toad-msg 0.18.1 (registry+https://github.com/rust-lang/crates.io-index)", + "toad-msg 0.18.1", "toad-stem 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-string 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-writable 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1035,7 +1035,7 @@ dependencies = [ "toad-len 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "toad-macros 0.2.0", "toad-map 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "toad-msg 0.18.1 (registry+https://github.com/rust-lang/crates.io-index)", + "toad-msg 0.18.1", "toad-stem 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-string 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-writable 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1126,7 +1126,7 @@ dependencies = [ "toad 0.19.1 (registry+https://github.com/rust-lang/crates.io-index)", "toad-array 0.5.0", "toad-len 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", - "toad-msg 0.18.1 (registry+https://github.com/rust-lang/crates.io-index)", + "toad-msg 0.18.1", "toad-stem 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1187,15 +1187,12 @@ dependencies = [ [[package]] name = "toad-msg" version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d78ad790094eefd8146fab35178e23b224285fe9b275132a31872c0f74b1a8ff" dependencies = [ - "arrayvec", "blake2", - "coap-lite", - "criterion", - "heapless", - "itertools", "tinyvec", - "toad-array 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "toad-array 0.2.3", "toad-cursor 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-hash 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-len 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1205,13 +1202,16 @@ dependencies = [ [[package]] name = "toad-msg" -version = "0.18.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d78ad790094eefd8146fab35178e23b224285fe9b275132a31872c0f74b1a8ff" +version = "0.19.0" dependencies = [ + "arrayvec", "blake2", + "coap-lite", + "criterion", + "heapless", + "itertools", "tinyvec", - "toad-array 0.2.3", + "toad-array 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-cursor 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-hash 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-len 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/toad/src/step/block.rs b/toad/src/step/block.rs index a6a0c455..e02eb66a 100644 --- a/toad/src/step/block.rs +++ b/toad/src/step/block.rs @@ -490,7 +490,7 @@ impl Step

let block_size: usize = 1024; - let original_payload = msg.data().payload().0; + let original_payload = &msg.data().payload().0; // TODO: block if 1024 is too big and we got REQUEST_ENTITY_TOO_LARGE if msg.data().block1().is_none() && original_payload.len() > block_size { @@ -503,7 +503,7 @@ impl Step

let mut msg_block = msg.clone(); msg_block.as_mut().set_block1(1024, n, n == block_count - 1).ok(); let mut p = P::MessagePayload::default(); - p.append_copy(original_payload[n * 1024..((n + 1) * 1024)]); + p.append_copy(&original_payload[(n as usize) * 1024..(((n as usize) + 1) * 1024)]); msg_block.as_mut().payload = Payload(p); conv.have(snap.time, n, msg_block.unwrap()); } From de7b8629161345a43fd18c47d438c83e0b0862fb Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Fri, 12 May 2023 19:39:39 -0500 Subject: [PATCH 12/12] fix: bump toad-array --- Cargo.lock | 20 ++++++++++-- toad/Cargo.toml | 4 +-- toad/src/platform.rs | 6 ++-- toad/src/step/ack.rs | 4 +-- toad/src/step/block.rs | 54 +++++++++++++++++-------------- toad/src/step/buffer_responses.rs | 4 +-- toad/src/step/handle_acks.rs | 2 +- toad/src/step/mod.rs | 4 +-- toad/src/step/observe.rs | 6 ++-- toad/src/step/provision_ids.rs | 4 +-- toad/src/step/retry.rs | 14 ++++---- 11 files changed, 72 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1b71fde8..352cc12c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1001,12 +1001,12 @@ dependencies = [ "serde_json", "simple_logger", "tinyvec", - "toad-array 0.2.3", + "toad-array 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-hash 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-len 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "toad-macros 0.2.0", "toad-map 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "toad-msg 0.18.1", + "toad-msg 0.19.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-stem 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-string 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-writable 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1219,6 +1219,22 @@ dependencies = [ "toad-map 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "toad-msg" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c3b34b3dfbea374373990c1a2324a073251ca76c9896f1675756b9cb26bb39f" +dependencies = [ + "blake2", + "tinyvec", + "toad-array 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "toad-cursor 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "toad-hash 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "toad-len 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "toad-macros 0.2.0", + "toad-map 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "toad-stem" version = "0.1.0" diff --git a/toad/Cargo.toml b/toad/Cargo.toml index 379393f6..b8eb81dd 100644 --- a/toad/Cargo.toml +++ b/toad/Cargo.toml @@ -38,14 +38,14 @@ test = [] docs = [] [dependencies] -toad-array = {version = "0.2.3", default_features = false} +toad-array = {version = "0.8.0", default_features = false} toad-map = {version = "0.2.3", default_features = false} toad-len = {version = "0.1.3", default_features = false} toad-hash = {version = "0.3.0", default_features = false} toad-writable = {version = "0.1.1", default_features = false} toad-stem = {version = "0.1.0", default_features = false} toad-string = {version = "0.2.0", default_features = false} -toad-msg = "0.18.1" +toad-msg = "0.19.0" toad-macros = "0.2.0" log = "0.4" tinyvec = { version = "1.5", default_features = false, features = [ diff --git a/toad/src/platform.rs b/toad/src/platform.rs index 2ebae444..e3b71e35 100644 --- a/toad/src/platform.rs +++ b/toad/src/platform.rs @@ -6,7 +6,7 @@ use naan::prelude::MonadOnce; use no_std_net::SocketAddr; #[cfg(feature = "alloc")] use std_alloc::vec::Vec; -use toad_array::{AppendCopy, Array}; +use toad_array::{AppendCopy, Array, Indexed}; use crate::config::Config; use crate::net::{Addrd, Socket}; @@ -232,11 +232,11 @@ pub trait Platform | Ok(()) => nb::block!(self.exec_1(&eff)).map_err(|e| { let mut effs: ::Effects = Default::default(); - effs.push(eff); + effs.append(eff); (effs, e) }), | Err((mut effs, e)) => { - effs.push(eff); + effs.append(eff); Err((effs, e)) }, }) diff --git a/toad/src/step/ack.rs b/toad/src/step/ack.rs index 50591462..84608171 100644 --- a/toad/src/step/ack.rs +++ b/toad/src/step/ack.rs @@ -1,4 +1,4 @@ -use toad_array::Array; +use toad_array::{Array, Indexed}; use toad_msg::{CodeKind, Type}; use super::{exec_inner_step, Step, StepOutput}; @@ -50,7 +50,7 @@ impl, PollResp = InnerPollResp

>, P: if req.data().as_ref().ty == Type::Con && req.data().as_ref().code.kind() == CodeKind::Request => { - effects.push(Effect::Send(Addrd(Resp::ack(req.as_ref().data()).into(), req.addr()))); + effects.append(Effect::Send(Addrd(Resp::ack(req.as_ref().data()).into(), req.addr()))); Some(Ok(req)) }, | Some(req) => Some(Ok(req)), diff --git a/toad/src/step/block.rs b/toad/src/step/block.rs index e02eb66a..5e7cd076 100644 --- a/toad/src/step/block.rs +++ b/toad/src/step/block.rs @@ -3,7 +3,7 @@ use core::marker::PhantomData; use embedded_time::duration::Milliseconds; use embedded_time::Instant; use no_std_net::SocketAddr; -use toad_array::{AppendCopy, Array}; +use toad_array::{AppendCopy, Array, Indexed}; use toad_map::Map; use toad_msg::no_repeat::{BLOCK1, BLOCK2, SIZE2}; use toad_msg::{CodeKind, Id, MessageOptions, Payload, Token, Type}; @@ -273,12 +273,13 @@ impl Block, - original: Option<&Message

>, - (addr, token, role, dir): (SocketAddr, Token, Role, Direction)) { + snap: &Snapshot

, + original: Option<&Message

>, + (addr, token, role, dir): (SocketAddr, Token, Role, Direction)) { let exp = snap.time + Milliseconds(snap.config.exchange_lifetime_millis()); self.get_or_create_endpoint(addr, |convs| { - convs.insert((token, role, dir), Conversation::new(exp, original.cloned())) + convs.insert((token, role, dir), + Conversation::new(exp, original.cloned())) .unwrap(); }); } @@ -349,7 +350,7 @@ impl Step

}; let rep = Message::

::new(rep_ty, $code, Id(0), req.data().msg().token); - effects.push(Effect::Send(Addrd(rep, req.addr()))); + effects.append(Effect::Send(Addrd(rep, req.addr()))); }}; } @@ -439,7 +440,7 @@ impl Step

new.remove(BLOCK2); new.remove(SIZE2); - effects.push(Effect::Send(Addrd(new, rep.addr()))); + effects.append(Effect::Send(Addrd(new, rep.addr()))); self.map_mut(k, |conv| conv.waiting(snap.time, $num)); }}; } @@ -481,10 +482,10 @@ impl Step

} fn before_message_sent(&self, - snap: &platform::Snapshot

, - effs: &mut P::Effects, - msg: &mut Addrd>) - -> Result<(), Self::Error> { + snap: &platform::Snapshot

, + effs: &mut P::Effects, + msg: &mut Addrd>) + -> Result<(), Self::Error> { self.prune(effs, snap.time); self.inner.before_message_sent(snap, effs, msg)?; @@ -494,20 +495,23 @@ impl Step

// TODO: block if 1024 is too big and we got REQUEST_ENTITY_TOO_LARGE if msg.data().block1().is_none() && original_payload.len() > block_size { - let k = (msg.addr(), msg.data().token, Role::Request, Direction::Outbound); + let k = (msg.addr(), msg.data().token, Role::Request, Direction::Outbound); self.insert(snap, Some(msg.data()), k); self.map_mut(k, |conv| { - let len = original_payload.len() as f32; - let block_count = (len / block_size as f32).ceil() as u32; - for n in 0..block_count { - let mut msg_block = msg.clone(); - msg_block.as_mut().set_block1(1024, n, n == block_count - 1).ok(); - let mut p = P::MessagePayload::default(); - p.append_copy(&original_payload[(n as usize) * 1024..(((n as usize) + 1) * 1024)]); - msg_block.as_mut().payload = Payload(p); - conv.have(snap.time, n, msg_block.unwrap()); - } - }).unwrap(); + let len = original_payload.len() as f32; + let block_count = (len / block_size as f32).ceil() as u32; + for n in 0..block_count { + let mut msg_block = msg.clone(); + msg_block.as_mut() + .set_block1(1024, n, n == block_count - 1) + .ok(); + let mut p = P::MessagePayload::default(); + p.append_copy(&original_payload[(n as usize) * 1024..(((n as usize) + 1) * 1024)]); + msg_block.as_mut().payload = Payload(p); + conv.have(snap.time, n, msg_block.unwrap()); + } + }) + .unwrap(); } Ok(()) } @@ -519,7 +523,9 @@ impl Step

-> Result<(), Self::Error> { self.inner.on_message_sent(snap, effs, msg)?; if msg.data().code.kind() == CodeKind::Request { - self.insert(snap, Some(msg.data()), (msg.addr(), msg.data().token, Role::Response, Direction::Inbound)); + self.insert(snap, + Some(msg.data()), + (msg.addr(), msg.data().token, Role::Response, Direction::Inbound)); } else if msg.data().code.kind() == CodeKind::Response { // TODO: block outbound responses } diff --git a/toad/src/step/buffer_responses.rs b/toad/src/step/buffer_responses.rs index 563d6a8a..e9d1a8f8 100644 --- a/toad/src/step/buffer_responses.rs +++ b/toad/src/step/buffer_responses.rs @@ -1,7 +1,7 @@ use core::fmt::Write; use no_std_net::SocketAddr; -use toad_array::Array; +use toad_array::{Array, Indexed}; use toad_len::Len; use toad_map::Map; use toad_msg::{Token, Type}; @@ -131,7 +131,7 @@ impl {{ - use toad_array::Array; + use toad_array::Indexed; type S = $crate::todo::String::<1000>; let msg = S::fmt(format_args!($($arg)*)); let msg = S::fmt(format_args!("[{}] {}", stringify!($at), msg.as_str())); - $effs.push($crate::platform::Effect::Log($lvl, msg)); + $effs.append($crate::platform::Effect::Log($lvl, msg)); }}; } diff --git a/toad/src/step/observe.rs b/toad/src/step/observe.rs index 1e7f89cb..02b04d7e 100644 --- a/toad/src/step/observe.rs +++ b/toad/src/step/observe.rs @@ -3,7 +3,7 @@ use core::hash::{Hash, Hasher}; use core::marker::PhantomData; use no_std_net::SocketAddr; -use toad_array::Array; +use toad_array::{Array, Indexed}; use toad_msg::opt::known::observe::Action::{Deregister, Register}; use toad_msg::opt::known::repeat::QUERY; use toad_msg::repeat::PATH; @@ -259,7 +259,7 @@ impl Observe { req.data().msg().token); let mut sub = Some(Sub::new(req.clone())); self.subs - .map_mut(move |s| s.push(Option::take(&mut sub).expect("closure only invoked once"))); + .map_mut(move |s| s.append(Option::take(&mut sub).expect("closure only invoked once"))); }, | Some(Deregister) => { log!(Observe::handle_incoming_request, @@ -414,7 +414,7 @@ impl Step

for Observe "=> {:?} {:?}", sub.addr(), msg.data().token); - effs.push(Effect::Send(msg.with_addr(sub.addr()))); + effs.append(Effect::Send(msg.with_addr(sub.addr()))); }) }); } else { diff --git a/toad/src/step/provision_ids.rs b/toad/src/step/provision_ids.rs index b9d9f9dc..99b691fe 100644 --- a/toad/src/step/provision_ids.rs +++ b/toad/src/step/provision_ids.rs @@ -5,7 +5,7 @@ use embedded_time::duration::Milliseconds; use embedded_time::Instant; use no_std_net::SocketAddr; use tinyvec::ArrayVec; -use toad_array::Array; +use toad_array::{Array, Indexed}; use toad_len::Len; use toad_map::{InsertError, Map}; use toad_msg::Id; @@ -291,7 +291,7 @@ impl ProvisionIds log::Level::Trace, "Saw new {:?}", id); - ids.push(Stamped(IdWithDefault(id), now)); + ids.append(Stamped(IdWithDefault(id), now)); }, } } diff --git a/toad/src/step/retry.rs b/toad/src/step/retry.rs index 0fcc4fec..37be6402 100644 --- a/toad/src/step/retry.rs +++ b/toad/src/step/retry.rs @@ -1,6 +1,6 @@ use embedded_time::duration::Milliseconds; use embedded_time::Instant; -use toad_array::Array; +use toad_array::{Array, Indexed}; use toad_msg::{CodeKind, Token, Type}; use toad_stem::Stem; use toad_string::{format, String}; @@ -71,7 +71,7 @@ pub trait Buf

dbg.msg_short, dbg.msg_should_be, dbg.since_last_attempt); - effects.push(Effect::Send(msg.clone())); + effects.append(Effect::Send(msg.clone())); }, | _ => log!(retry::Buf::attempt_all, effects, @@ -222,10 +222,10 @@ pub trait Buf

let timer = RetryTimer::new(now, config.msg.con.unacked_retry_strategy, config.msg.con.max_attempts); - self.push((State::ConPreAck { timer, - post_ack_strategy: config.msg.con.acked_retry_strategy, - post_ack_max_attempts: config.msg.con.max_attempts }, - msg.clone())); + self.append((State::ConPreAck { timer, + post_ack_strategy: config.msg.con.acked_retry_strategy, + post_ack_max_attempts: config.msg.con.max_attempts }, + msg.clone())); log!(retry::Buf::store_retryables, effects, @@ -244,7 +244,7 @@ pub trait Buf

let timer = RetryTimer::new(now, config.msg.non.retry_strategy, config.msg.non.max_attempts); - self.push((State::Just(timer), msg.clone())); + self.append((State::Just(timer), msg.clone())); Ok(()) },