diff --git a/Cargo.lock b/Cargo.lock index a62e0cb4..352cc12c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -600,9 +600,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "openssl" -version = "0.10.51" +version = "0.10.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97ea2d98598bf9ada7ea6ee8a30fb74f9156b63bbe495d64ec2b87c269d2dda3" +checksum = "01b8574602df80f7b85fdfc5392fa884a4e3b3f4f35402c070ab34c3d3f78d56" dependencies = [ "bitflags", "cfg-if", @@ -626,9 +626,9 @@ dependencies = [ [[package]] name = "openssl-sys" -version = "0.9.86" +version = "0.9.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "992bac49bdbab4423199c654a5515bd2a6c6a23bf03f2dd3bdb7e5ae6259bc69" +checksum = "8e17f59264b2809d77ae94f0e1ebabc434773f370d6ca667bd223ea10e06cc7e" dependencies = [ "cc", "libc", @@ -1001,12 +1001,12 @@ dependencies = [ "serde_json", "simple_logger", "tinyvec", - "toad-array 0.2.3", + "toad-array 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-hash 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-len 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "toad-macros 0.2.0", "toad-map 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "toad-msg 0.18.1 (registry+https://github.com/rust-lang/crates.io-index)", + "toad-msg 0.19.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-stem 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-string 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-writable 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1035,7 +1035,7 @@ dependencies = [ "toad-len 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "toad-macros 0.2.0", "toad-map 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "toad-msg 0.18.1 (registry+https://github.com/rust-lang/crates.io-index)", + "toad-msg 0.18.1", "toad-stem 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-string 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-writable 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1126,7 +1126,7 @@ dependencies = [ "toad 0.19.1 (registry+https://github.com/rust-lang/crates.io-index)", "toad-array 0.5.0", "toad-len 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", - "toad-msg 0.18.1 (registry+https://github.com/rust-lang/crates.io-index)", + "toad-msg 0.18.1", "toad-stem 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1187,6 +1187,22 @@ dependencies = [ [[package]] name = "toad-msg" version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d78ad790094eefd8146fab35178e23b224285fe9b275132a31872c0f74b1a8ff" +dependencies = [ + "blake2", + "tinyvec", + "toad-array 0.2.3", + "toad-cursor 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "toad-hash 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "toad-len 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "toad-macros 0.2.0", + "toad-map 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "toad-msg" +version = "0.19.0" dependencies = [ "arrayvec", "blake2", @@ -1205,13 +1221,13 @@ dependencies = [ [[package]] name = "toad-msg" -version = "0.18.1" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d78ad790094eefd8146fab35178e23b224285fe9b275132a31872c0f74b1a8ff" +checksum = "6c3b34b3dfbea374373990c1a2324a073251ca76c9896f1675756b9cb26bb39f" dependencies = [ "blake2", "tinyvec", - "toad-array 0.2.3", + "toad-array 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-cursor 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-hash 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "toad-len 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/toad/Cargo.toml b/toad/Cargo.toml index d4455833..b8eb81dd 100644 --- a/toad/Cargo.toml +++ b/toad/Cargo.toml @@ -38,17 +38,19 @@ test = [] docs = [] [dependencies] -toad-array = {version = "0.2.3", default_features = false} +toad-array = {version = "0.8.0", default_features = false} toad-map = {version = "0.2.3", default_features = false} toad-len = {version = "0.1.3", default_features = false} toad-hash = {version = "0.3.0", default_features = false} toad-writable = {version = "0.1.1", default_features = false} toad-stem = {version = "0.1.0", default_features = false} toad-string = {version = "0.2.0", default_features = false} -toad-msg = "0.18.1" +toad-msg = "0.19.0" toad-macros = "0.2.0" log = "0.4" -tinyvec = { version = "1.5", default_features = false, features = ["rustc_1_55"] } +tinyvec = { version = "1.5", default_features = false, features = [ + "rustc_1_55" +] } no-std-net = "0.6" embedded-time = "0.12" nb = "1" @@ -65,6 +67,6 @@ serde-json-core = { version = "0.5.0", optional = true } simple_logger = "2" lazycell = "1.3.0" paste = "1.0.9" -serde = {version = "1.0", features = ["derive"]} +serde = { version = "1.0", features = ["derive"] } serde-json-core = { version = "0.5.0" } serde_json = { version = "1.0" } diff --git a/toad/src/config.rs b/toad/src/config.rs index 71b095f7..27ff7cf1 100644 --- a/toad/src/config.rs +++ b/toad/src/config.rs @@ -1,5 +1,3 @@ -#![allow(dead_code)] - use embedded_time::duration::Milliseconds; use crate::retry::{Attempts, Strategy}; diff --git a/toad/src/lib.rs b/toad/src/lib.rs index 1a14b3fb..1ba008b3 100644 --- a/toad/src/lib.rs +++ b/toad/src/lib.rs @@ -155,16 +155,21 @@ pub mod multicast { } macro_rules! code { - (rfc7252($section:literal) $name:ident = $c:literal.$d:literal) => { + (rfc7252($section:literal) $name:ident = $c:literal*$d:literal) => { #[doc = toad_macros::rfc_7252_doc!($section)] #[allow(clippy::zero_prefixed_literal)] pub const $name: toad_msg::Code = toad_msg::Code::new($c, $d); }; - (rfc7252($section:literal) $name:ident = $newtype:tt($c:literal.$d:literal)) => { + (rfc7252($section:literal) $name:ident = $newtype:tt($c:literal*$d:literal)) => { #[doc = toad_macros::rfc_7252_doc!($section)] #[allow(clippy::zero_prefixed_literal)] pub const $name: $newtype = $newtype(toad_msg::Code::new($c, $d)); }; + (#[doc = $docstr:expr] $name:ident = $c:literal*$d:literal) => { + #[doc = $docstr] + #[allow(clippy::zero_prefixed_literal)] + pub const $name: toad_msg::Code = toad_msg::Code::new($c, $d); + }; } pub(crate) use code; diff --git a/toad/src/platform.rs b/toad/src/platform.rs index d5ac51d6..e3b71e35 100644 --- a/toad/src/platform.rs +++ b/toad/src/platform.rs @@ -6,7 +6,7 @@ use naan::prelude::MonadOnce; use no_std_net::SocketAddr; #[cfg(feature = "alloc")] use std_alloc::vec::Vec; -use toad_array::{AppendCopy, Array}; +use toad_array::{AppendCopy, Array, Indexed}; use crate::config::Config; use crate::net::{Addrd, Socket}; @@ -232,11 +232,11 @@ pub trait Platform | Ok(()) => nb::block!(self.exec_1(&eff)).map_err(|e| { let mut effs: ::Effects = Default::default(); - effs.push(eff); + effs.append(eff); (effs, e) }), | Err((mut effs, e)) => { - effs.push(eff); + effs.append(eff); Err((effs, e)) }, }) @@ -341,6 +341,22 @@ pub enum Effect

Nop, } +impl

Effect

where P: PlatformTypes +{ + /// Is this [`Effect::Send`]? + pub fn is_send(&self) -> bool { + self.get_send().is_some() + } + + /// If this is [`Effect::Send`], yields a reference to the message + pub fn get_send(&self) -> Option<&Addrd>> { + match self { + | Self::Send(r) => Some(r), + | _ => None, + } + } +} + impl

Default for Effect

where P: PlatformTypes { fn default() -> Self { diff --git a/toad/src/req/method.rs b/toad/src/req/method.rs index 46b11e8c..ff3bfe79 100644 --- a/toad/src/req/method.rs +++ b/toad/src/req/method.rs @@ -45,9 +45,9 @@ impl core::fmt::Display for Method { } impl Method { - code!(rfc7252("4.1") EMPTY = Method(0 . 00)); - code!(rfc7252("5.8.1") GET = Method(0 . 01)); - code!(rfc7252("5.8.2") POST = Method(0 . 02)); - code!(rfc7252("5.8.3") PUT = Method(0 . 03)); - code!(rfc7252("5.8.4") DELETE = Method(0 . 04)); + code!(rfc7252("4.1") EMPTY = Method(0*00)); + code!(rfc7252("5.8.1") GET = Method(0*01)); + code!(rfc7252("5.8.2") POST = Method(0*02)); + code!(rfc7252("5.8.3") PUT = Method(0*03)); + code!(rfc7252("5.8.4") DELETE = Method(0*04)); } diff --git a/toad/src/resp/code.rs b/toad/src/resp/code.rs index 7c893467..ddf9f2c7 100644 --- a/toad/src/resp/code.rs +++ b/toad/src/resp/code.rs @@ -3,28 +3,50 @@ pub use toad_msg::Code; use crate::code; // 2.xx -code!(rfc7252("5.9.1.1") CREATED = 2 . 01); -code!(rfc7252("5.9.1.2") DELETED = 2 . 02); -code!(rfc7252("5.9.1.3") VALID = 2 . 03); -code!(rfc7252("5.9.1.4") CHANGED = 2 . 04); -code!(rfc7252("5.9.1.5") CONTENT = 2 . 05); +code!(rfc7252("5.9.1.1") CREATED = 2*01); +code!(rfc7252("5.9.1.2") DELETED = 2*02); +code!(rfc7252("5.9.1.3") VALID = 2*03); +code!(rfc7252("5.9.1.4") CHANGED = 2*04); +code!(rfc7252("5.9.1.5") CONTENT = 2*05); +code!( + #[doc = concat!( + "## [2.31 Continue](https://www.rfc-editor.org/rfc/rfc7959#section-2.9.1)\n", + "This success status code indicates that the transfer of this\n", + "block of the request body was successful and that the server\n", + "encourages sending further blocks, but that a final outcome of the\n", + "whole block-wise request cannot yet be determined. No payload is\n", + "returned with this response code.", + )] + CONTINUE = 2 * 31 +); // 4.xx -code!(rfc7252("5.9.2.1") BAD_REQUEST = 4 . 00); -code!(rfc7252("5.9.2.2") UNAUTHORIZED = 4 . 01); -code!(rfc7252("5.9.2.3") BAD_OPTION = 4 . 02); -code!(rfc7252("5.9.2.4") FORBIDDEN = 4 . 03); -code!(rfc7252("5.9.2.5") NOT_FOUND = 4 . 04); -code!(rfc7252("5.9.2.6") METHOD_NOT_ALLOWED = 4 . 05); -code!(rfc7252("5.9.2.7") NOT_ACCEPTABLE = 4 . 06); -code!(rfc7252("5.9.2.8") PRECONDITION_FAILED = 4 . 12); -code!(rfc7252("5.9.2.9") REQUEST_ENTITY_TOO_LARGE = 4 . 13); -code!(rfc7252("5.9.2.10") UNSUPPORTED_CONTENT_FORMAT = 4 . 15); +code!(rfc7252("5.9.2.1") BAD_REQUEST = 4*00); +code!(rfc7252("5.9.2.2") UNAUTHORIZED = 4*01); +code!(rfc7252("5.9.2.3") BAD_OPTION = 4*02); +code!(rfc7252("5.9.2.4") FORBIDDEN = 4*03); +code!(rfc7252("5.9.2.5") NOT_FOUND = 4*04); +code!(rfc7252("5.9.2.6") METHOD_NOT_ALLOWED = 4*05); +code!(rfc7252("5.9.2.7") NOT_ACCEPTABLE = 4*06); +code!( + #[doc = concat!( + "## [4.08 Request Entity Incomplete](https://www.rfc-editor.org/rfc/rfc7959#section-2.9.2)\n", + "This client error status code indicates that the server has not\n", + "received the blocks of the request body that it needs to proceed.\n", + "The client has not sent all blocks, not sent them in the order\n", + "required by the server, or has sent them long enough ago that the\n", + "server has already discarded them.", + )] + REQUEST_ENTITY_INCOMPLETE = 4 * 08 +); +code!(rfc7252("5.9.2.8") PRECONDITION_FAILED = 4*12); +code!(rfc7252("5.9.2.9") REQUEST_ENTITY_TOO_LARGE = 4*13); +code!(rfc7252("5.9.2.10") UNSUPPORTED_CONTENT_FORMAT = 4*15); // 5.xx -code!(rfc7252("5.9.3.1") INTERNAL_SERVER_ERROR = 5 . 00); -code!(rfc7252("5.9.3.2") NOT_IMPLEMENTED = 5 . 01); -code!(rfc7252("5.9.3.3") BAD_GATEWAY = 5 . 02); -code!(rfc7252("5.9.3.4") SERVICE_UNAVAILABLE = 5 . 03); -code!(rfc7252("5.9.3.5") GATEWAY_TIMEOUT = 5 . 04); -code!(rfc7252("5.9.3.6") PROXYING_NOT_SUPPORTED = 5 . 05); +code!(rfc7252("5.9.3.1") INTERNAL_SERVER_ERROR = 5*00); +code!(rfc7252("5.9.3.2") NOT_IMPLEMENTED = 5*01); +code!(rfc7252("5.9.3.3") BAD_GATEWAY = 5*02); +code!(rfc7252("5.9.3.4") SERVICE_UNAVAILABLE = 5*03); +code!(rfc7252("5.9.3.5") GATEWAY_TIMEOUT = 5*04); +code!(rfc7252("5.9.3.6") PROXYING_NOT_SUPPORTED = 5*05); diff --git a/toad/src/resp/mod.rs b/toad/src/resp/mod.rs index d846c212..4aa0be4a 100644 --- a/toad/src/resp/mod.rs +++ b/toad/src/resp/mod.rs @@ -129,13 +129,13 @@ impl Resp

{ /// Create a response ACKnowledging an incoming request. /// - /// An ack response must be used when you receive - /// a CON request. + /// Received CON requests will be continually retried until + /// ACKed, making it very important that we acknowledge them + /// quickly on receipt. /// - /// You may choose to include the response payload in an ACK, - /// but keep in mind that you might receive duplicate - /// If you do need to ensure they receive your response, - /// you + /// Servers may choose to include a response to the request + /// along with the ACK (entailing a response [`Code`] and [`Payload`]), + /// as long as care is taken to not delay between receipt and ACK. pub fn ack(req: &Req

) -> Self { let msg = Message { ty: Type::Ack, id: req.msg().id, diff --git a/toad/src/step/ack.rs b/toad/src/step/ack.rs index 50591462..84608171 100644 --- a/toad/src/step/ack.rs +++ b/toad/src/step/ack.rs @@ -1,4 +1,4 @@ -use toad_array::Array; +use toad_array::{Array, Indexed}; use toad_msg::{CodeKind, Type}; use super::{exec_inner_step, Step, StepOutput}; @@ -50,7 +50,7 @@ impl, PollResp = InnerPollResp

>, P: if req.data().as_ref().ty == Type::Con && req.data().as_ref().code.kind() == CodeKind::Request => { - effects.push(Effect::Send(Addrd(Resp::ack(req.as_ref().data()).into(), req.addr()))); + effects.append(Effect::Send(Addrd(Resp::ack(req.as_ref().data()).into(), req.addr()))); Some(Ok(req)) }, | Some(req) => Some(Ok(req)), diff --git a/toad/src/step/block.rs b/toad/src/step/block.rs new file mode 100644 index 00000000..5e7cd076 --- /dev/null +++ b/toad/src/step/block.rs @@ -0,0 +1,997 @@ +use core::marker::PhantomData; + +use embedded_time::duration::Milliseconds; +use embedded_time::Instant; +use no_std_net::SocketAddr; +use toad_array::{AppendCopy, Array, Indexed}; +use toad_map::Map; +use toad_msg::no_repeat::{BLOCK1, BLOCK2, SIZE2}; +use toad_msg::{CodeKind, Id, MessageOptions, Payload, Token, Type}; +use toad_stem::Stem; + +use super::{log, Step, _try}; +use crate::net::Addrd; +use crate::platform::toad_msg::Message; +use crate::platform::{self, Effect, PlatformTypes, Snapshot}; +use crate::req::Req; +use crate::resp::code::{CONTINUE, REQUEST_ENTITY_INCOMPLETE}; +use crate::resp::Resp; + +/// A potential role for a blocked message (request / response) +/// +/// Part of composite map keys identifying [`Conversation`]s. +#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)] +pub enum Role { + #[allow(missing_docs)] + Request, + #[allow(missing_docs)] + Response, +} + +/// Whether a blocked message is outbound or inbound +/// +/// Part of composite map keys identifying [`Conversation`]s. +#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)] +pub enum Direction { + #[allow(missing_docs)] + Outbound, + #[allow(missing_docs)] + Inbound, +} + +/// A single piece of a blocked message +#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)] +pub enum Piece { + /// For [`Direction::Inbound`], indicates that we have received this piece, + /// and have the data `M` associated with a piece. + /// + /// For [`Direction::Outbound`], indicates that we have not yet communicated + /// this piece, and are waiting for the right time to transition to a different state. + Data(M), + /// For [`Direction::Inbound`], indicates that we have requested this piece, + /// and expect that it will be transitioned to [`Piece::Have`] at some point in + /// the future. + /// + /// For [`Direction::Outbound`], this state is impossible. + Requested, + /// For [`Direction::Inbound`], indicates that we have not yet received or asked for this piece. + /// + /// For [`Direction::Outbound`], indicates that we previously had this piece and have + /// sent it to the remote endpoint. + Absent, +} + +impl Piece { + fn get_msg(&self) -> Option<&M> { + match self { + | Piece::Data(m) => Some(m), + | _ => None, + } + } +} + +/// The state for a given Blocked conversation between ourselves and a remote endpoint. +#[derive(Debug)] +pub struct Conversation + where P: PlatformTypes +{ + pub(self) biggest_number_seen: Option, + pub(self) original: Option>, + pub(self) pcs: Pieces, + pub(self) expires_at: Instant, +} + +impl Conversation + where P: PlatformTypes, + Pieces: Default +{ + /// Create a new [`Conversation`] tracking a Blocked response to a sent request (param `msg`) + pub fn new(expires_at: Instant, original: Option>) -> Self { + Self { original, + biggest_number_seen: None, + pcs: Default::default(), + expires_at } + } + + pub(self) fn assembled(&self) -> Payload + where Pieces: Map>> + { + const PANIC_MSG: &'static str = r#"BlockState.assembled() assumes: +- BlockState.biggest is Some(_) +- BlockState contains at least one Piece +- BlockState.have_all() has been invoked and confirmed to be true"#; + + let mut p = P::MessagePayload::default(); + for i in 0..=self.biggest_number_seen.expect(PANIC_MSG) { + p.append_copy(&self.pcs + .get(&i) + .expect(PANIC_MSG) + .get_msg() + .expect(PANIC_MSG) + .payload + .0); + } + + Payload(p) + } + + /// find a missing piece that should be requested + pub(self) fn get_missing(&self) -> Option + where Pieces: Map>, + T: PartialEq + { + self.pcs + .iter() + .find_map(|(n, p)| if p == &Piece::Absent { Some(n) } else { None }) + .copied() + } + + /// are no pieces waiting or missing? + pub(self) fn have_all(&self) -> bool + where Pieces: Map>, + T: PartialEq + { + self.pcs + .iter() + .all(|(_, p)| p != &Piece::Absent && p != &Piece::Requested) + } + + /// if `n > self.biggest`, update `self.biggest` to `n` + /// and insert `Piece::Missing` for all pieces between `biggest` and `n` + pub(self) fn touch(&mut self, now: Instant, n: u32) + where Pieces: Map> + { + let missing_nums = match self.biggest_number_seen { + | Some(m) if m + 1 < n => (m + 1..n).into_iter(), + | None if n > 0 => (0..n).into_iter(), + | _ => (0..0).into_iter(), + }; + + missing_nums.for_each(|n| { + self.pcs.insert(n, Piece::Absent).ok(); + }); + + let n_is_bigger = self.biggest_number_seen.map(|m| m < n).unwrap_or(true); + if n_is_bigger { + self.biggest_number_seen = Some(n); + } + } + + /// Mark piece `n` as having been requested + pub(self) fn waiting(&mut self, now: Instant, n: u32) + where Pieces: Map> + { + let e = self.pcs.get_mut(&n); + + match e { + | Some(Piece::Absent) | Some(Piece::Requested) | None => { + self.touch(now, n); + self.pcs.insert(n, Piece::Requested).ok(); + }, + | _ => (), + } + } + + /// Store piece `T` corresponding to piece number `n` + pub(self) fn have(&mut self, now: Instant, n: u32, m: T) + where Pieces: Map> + { + self.touch(now, n); + self.pcs.insert(n, Piece::Data(m)).ok(); + } +} + +/// TODO +#[derive(Debug)] +pub struct Block { + inner: S, + endpoints: Stem, + __p: PhantomData<(P, Conversations, Pieces)>, +} + +impl Default + for Block + where S: Default, + Endpoints: Default +{ + fn default() -> Self { + Block { inner: S::default(), + endpoints: Stem::new(Endpoints::default()), + __p: PhantomData } + } +} + +impl Block + where P: PlatformTypes, + Endpoints: Default + Map, + Conversations: + core::fmt::Debug + Default + Map<(Token, Role, Direction), Conversation>, + Pieces: core::fmt::Debug + Default + Map>> +{ + fn prune_conversations(&self, + effects: &mut P::Effects, + now: Instant, + cs: &mut Conversations) { + let len_before = cs.len(); + let mut remove_next = || { + cs.iter() + .filter(|(_, b)| now >= b.expires_at) + .map(|(k, _)| *k) + .next() + .map(|k| cs.remove(&k)) + .map(|_| ()) + }; + + while let Some(()) = remove_next() {} + + let len_after = cs.len(); + if len_before - len_after > 0 { + log!(Block::prune, effects, log::Level::Debug, "Removed {} expired entries. For outbound messages, a prior step SHOULD but MAY NOT retry sending them", len_before - len_after); + } + } + + fn prune_endpoints(&self) { + self.endpoints.map_mut(|es| { + let mut remove_next = || { + es.iter() + .filter(|(_, cs)| cs.is_empty()) + .map(|(k, _)| *k) + .next() + .map(|k| es.remove(&k)) + .map(|_| ()) + }; + + while let Some(()) = remove_next() {} + }); + } + + fn prune(&self, effects: &mut P::Effects, now: Instant) { + // TODO: log + self.endpoints.map_mut(|es| { + es.iter_mut() + .for_each(|(_, c)| self.prune_conversations(effects, now, c)) + }); + self.prune_endpoints(); + } + + fn get_endpoint(&self, addr: SocketAddr, mut f: F) -> Option + where F: FnMut(&mut Conversations) -> R + { + self.endpoints.map_mut(|es| es.get_mut(&addr).map(&mut f)) + } + + fn get_or_create_endpoint(&self, addr: SocketAddr, mut f: F) -> R + where F: FnMut(&mut Conversations) -> R + { + self.endpoints.map_mut(|es| { + if !es.has(&addr) { + es.insert(addr, Conversations::default()).unwrap(); + } + + es.get_mut(&addr).map(&mut f).unwrap() + }) + } + + fn insert(&self, + snap: &Snapshot

, + original: Option<&Message

>, + (addr, token, role, dir): (SocketAddr, Token, Role, Direction)) { + let exp = snap.time + Milliseconds(snap.config.exchange_lifetime_millis()); + self.get_or_create_endpoint(addr, |convs| { + convs.insert((token, role, dir), + Conversation::new(exp, original.cloned())) + .unwrap(); + }); + } + + fn has(&self, (addr, token, role, dir): (SocketAddr, Token, Role, Direction)) -> bool { + self.get_endpoint(addr, |convs| convs.has(&(token, role, dir))) + .unwrap_or(false) + } + + fn map_mut(&self, + (addr, token, role, dir): (SocketAddr, Token, Role, Direction), + mut f: F) + -> Option + where F: FnMut(&mut Conversation) -> R + { + self.get_or_create_endpoint(addr, |conv| conv.get_mut(&(token, role, dir)).map(&mut f)) + } + + fn map(&self, + (addr, token, role, dir): (SocketAddr, Token, Role, Direction), + f: F) + -> Option + where F: FnOnce(&Conversation) -> R + { + let mut f = Some(f); + self.get_or_create_endpoint(addr, |conv| { + conv.get(&(token, role, dir)) + .map(Option::take(&mut f).unwrap()) + }) + } + + fn remove_if_present(&self, (addr, token, role, dir): (SocketAddr, Token, Role, Direction)) { + self.get_endpoint(addr, |convs| { + convs.remove(&(token, role, dir)); + }); + } +} + +impl Step

+ for Block + where P: PlatformTypes, + S: Step>, PollResp = Addrd>>, + Endpoints: core::fmt::Debug + Map, + Conversations: core::fmt::Debug + Map<(Token, Role, Direction), Conversation>, + Pieces: core::fmt::Debug + Map>> +{ + type PollReq = Addrd>; + type PollResp = Addrd>; + type Error = S::Error; + type Inner = S; + + fn inner(&self) -> &Self::Inner { + &self.inner + } + + fn poll_req(&self, + snap: &crate::platform::Snapshot

, + effects: &mut P::Effects) + -> super::StepOutput { + self.prune(effects, snap.time); + let mut req = _try!(Option; self.inner().poll_req(snap, effects)); + + macro_rules! respond { + ($code:expr) => {{ + let rep_ty = match req.data().msg().ty { + | Type::Con => Type::Ack, + | _ => Type::Non, + }; + + let rep = Message::

::new(rep_ty, $code, Id(0), req.data().msg().token); + effects.append(Effect::Send(Addrd(rep, req.addr()))); + }}; + } + + let k = (req.addr(), req.data().msg().token, Role::Request, Direction::Inbound); + let has_prev_pieces = self.has(k); + + match req.data().msg().block2() { + | None => { + if has_prev_pieces { + self.map(k, |s: &Conversation<_, _>| { + log!(Block::poll_req, + effects, + log::Level::Warn, + "Expected message {:?} to continue block sequence {:?}", + req.data().msg().token, + s); + }); + self.remove_if_present(k); + respond!(REQUEST_ENTITY_INCOMPLETE); + } + + Some(Ok(req)) + }, + | Some(block) => { + if !has_prev_pieces && block.num() == 0 && block.more() { + self.insert(snap, None, k); + self.map_mut(k, |conv| { + conv.have(snap.time, 0, req.clone().map(|r| r.into()).unwrap()) + }); + respond!(CONTINUE); + Some(Err(nb::Error::WouldBlock)) + } else if !has_prev_pieces && block.num() == 0 && !block.more() { + Some(Ok(req)) + } else if !has_prev_pieces && block.num() > 0 { + respond!(REQUEST_ENTITY_INCOMPLETE); + Some(Err(nb::Error::WouldBlock)) + } else if block.num() + > self.map(k, |conv| conv.biggest_number_seen.map(|n| n + 1)) + .flatten() + .unwrap_or(0) + { + self.remove_if_present(k); + respond!(REQUEST_ENTITY_INCOMPLETE); + Some(Err(nb::Error::WouldBlock)) + } else if block.more() { + self.map_mut(k, |conv| { + conv.have(snap.time, + block.num(), + req.clone().map(|r| r.into()).unwrap()) + }); + respond!(CONTINUE); + Some(Err(nb::Error::WouldBlock)) + } else { + self.map_mut(k, |conv| { + conv.have(snap.time, + block.num(), + req.clone().map(|r| r.into()).unwrap()) + }); + let p = self.map(k, Conversation::assembled).unwrap(); + req.as_mut().msg_mut().payload = p; + self.remove_if_present(k); + Some(Ok(req)) + } + }, + } + } + + fn poll_resp(&self, + snap: &Snapshot

, + effects: &mut P::Effects, + token: Token, + addr: SocketAddr) + -> super::StepOutput { + self.prune(effects, snap.time); + + let mut rep: Addrd> = + _try!(Option; self.inner().poll_resp(snap, effects, token, addr)); + + let k = (rep.addr(), rep.data().msg().token, Role::Response, Direction::Inbound); + let has_prev_pieces = self.has(k); + + macro_rules! request_piece { + ($num:expr) => {{ + let mut new = self.map(k, |conv| conv.original.clone().unwrap()).unwrap(); + + new.set_block1(0, $num, false).ok(); + new.remove(BLOCK2); + new.remove(SIZE2); + + effects.append(Effect::Send(Addrd(new, rep.addr()))); + self.map_mut(k, |conv| conv.waiting(snap.time, $num)); + }}; + } + + match rep.data().msg().block1() { + | None => { + self.remove_if_present(k); + Some(Ok(rep)) + }, + | Some(block) => { + if !has_prev_pieces { + log!(Block::poll_resp, effects, log::Level::Warn, "Response received for token {:?} but we've never seen a request using that token. Ignoring this response despite it having {:?}", rep.data().msg().token, block); + Some(Ok(rep)) + } else { + self.map_mut(k, |conv| { + conv.have(snap.time, + block.num(), + rep.clone().map(|r| r.into()).unwrap()) + }); + if block.more() { + request_piece!(block.num() + 1); + } + + if let Some(missing) = self.map(k, Conversation::get_missing).unwrap() { + request_piece!(missing); + } + + if self.map(k, Conversation::have_all).unwrap() { + rep.as_mut().msg_mut().payload = self.map(k, Conversation::assembled).unwrap(); + rep.as_mut().msg_mut().remove(BLOCK1); + self.remove_if_present(k); + Some(Ok(rep)) + } else { + Some(Err(nb::Error::WouldBlock)) + } + } + }, + } + } + + fn before_message_sent(&self, + snap: &platform::Snapshot

, + effs: &mut P::Effects, + msg: &mut Addrd>) + -> Result<(), Self::Error> { + self.prune(effs, snap.time); + self.inner.before_message_sent(snap, effs, msg)?; + + let block_size: usize = 1024; + + let original_payload = &msg.data().payload().0; + + // TODO: block if 1024 is too big and we got REQUEST_ENTITY_TOO_LARGE + if msg.data().block1().is_none() && original_payload.len() > block_size { + let k = (msg.addr(), msg.data().token, Role::Request, Direction::Outbound); + self.insert(snap, Some(msg.data()), k); + self.map_mut(k, |conv| { + let len = original_payload.len() as f32; + let block_count = (len / block_size as f32).ceil() as u32; + for n in 0..block_count { + let mut msg_block = msg.clone(); + msg_block.as_mut() + .set_block1(1024, n, n == block_count - 1) + .ok(); + let mut p = P::MessagePayload::default(); + p.append_copy(&original_payload[(n as usize) * 1024..(((n as usize) + 1) * 1024)]); + msg_block.as_mut().payload = Payload(p); + conv.have(snap.time, n, msg_block.unwrap()); + } + }) + .unwrap(); + } + Ok(()) + } + + fn on_message_sent(&self, + snap: &platform::Snapshot

, + effs: &mut P::Effects, + msg: &Addrd>) + -> Result<(), Self::Error> { + self.inner.on_message_sent(snap, effs, msg)?; + if msg.data().code.kind() == CodeKind::Request { + self.insert(snap, + Some(msg.data()), + (msg.addr(), msg.data().token, Role::Response, Direction::Inbound)); + } else if msg.data().code.kind() == CodeKind::Response { + // TODO: block outbound responses + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std_alloc::collections::BTreeMap; + use tinyvec::array_vec; + use toad_msg::{Code, ContentFormat, Id, MessageOptions, Type}; + + use super::*; + use crate::net::Addrd; + use crate::resp::code::CONTENT; + use crate::test; + + type Pieces = BTreeMap>; + type Conversations = + BTreeMap<(Token, Role, Direction), super::Conversation>; + type Block = + super::Block, Conversations, Pieces>; + + #[test] + fn block_state_correctly_identifies_missing_pieces() { + let mut e = + Conversation::>> { biggest_number_seen: None, + original: None, + pcs: BTreeMap::new(), + expires_at: Instant::new(1000) }; + e.have(Instant::new(0), 0, ()); + assert_eq!(e.get_missing(), None); + + e.have(Instant::new(0), 1, ()); + assert_eq!(e.get_missing(), None); + + e.waiting(Instant::new(0), 3); + e.waiting(Instant::new(0), 2); + e.waiting(Instant::new(0), 5); + + assert_eq!(e.get_missing(), Some(4)); + e.waiting(Instant::new(0), 4); + + assert_eq!(e.get_missing(), None); + } + + #[test] + fn when_inner_errors_block_should_error() { + type S = test::MockStep<(), Addrd, Addrd, ()>; + let b = Block::::default(); + + b.inner() + .set_poll_req(|_, _, _| Some(Err(nb::Error::Other(())))) + .set_poll_resp(|_, _, _, _, _| Some(Err(nb::Error::Other(())))); + + assert_eq!(b.poll_req(&test::snapshot(), &mut vec![]), + Some(Err(nb::Error::Other(())))); + assert_eq!(b.poll_resp(&test::snapshot(), + &mut vec![], + Token(Default::default()), + test::x.x.x.x(80)), + Some(Err(nb::Error::Other(())))); + } + + #[test] + fn when_recv_response_with_no_block1_this_should_do_nothing() { + type S = test::MockStep<(), Addrd, Addrd, ()>; + let b = Block::::default(); + + b.inner().set_poll_resp(|_, _, _, _, _| { + let msg = toad_msg::alloc::Message::new(Type::Con, + Code::GET, + Id(0), + Token(Default::default())); + Some(Ok(Addrd(Resp::from(msg), test::x.x.x.x(80)))) + }); + + let mut effects = vec![]; + assert!(matches!(b.poll_resp(&test::snapshot(), + &mut effects, + Token(Default::default()), + test::x.x.x.x(80)), + Some(Ok(Addrd(_, _))))); + assert!(effects.is_empty()); + } + + #[test] + fn when_recv_response_with_block1_this_should_ask_for_other_blocks() { + struct TestState { + gave_pieces: Vec, + req: Option>, + last_request_at: std::time::Instant, + } + + #[allow(dead_code)] + struct Addrs { + server: SocketAddr, + client: SocketAddr, + } + + let addrs = Addrs { server: test::x.x.x.x(80), + client: test::x.x.x.x(10) }; + + type S = test::MockStep, Addrd, ()>; + let b = Block::::default(); + + let mut orig_req = test::Message::new(Type::Con, Code::GET, Id(1), Token(array_vec! {1})); + orig_req.set_accept(ContentFormat::Text).ok(); + orig_req.set_path("lipsum").ok(); + + let cache_key = orig_req.cache_key(); + + let mut effects: Vec = vec![]; + + b.on_message_sent(&test::snapshot(), + &mut effects, + &Addrd(orig_req, addrs.server)) + .unwrap(); + + let payload = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do"; + let payload_blocks = || { + payload.bytes().fold(vec![vec![]], |mut b, a| { + let last = b.last_mut().unwrap(); + if last.len() < 16 { + last.push(a); + } else { + b.push(vec![a]); + } + b + }) + }; + + b.inner() + .init(TestState { gave_pieces: vec![], + req: None, + last_request_at: std::time::Instant::now() }) + .set_poll_resp(move |mock, _, _, _, _| { + let blocksize: u16 = 16; + let blocks = payload_blocks(); + + let mut resp = + Resp::from(test::Message::new(Type::Non, Code::new(2, 05), Id(1), Token(array_vec! {1}))); + resp.msg_mut().set_size1(payload.len() as _).ok(); + resp.msg_mut().set_path("lipsum").ok(); + + let request = mock.state.map_ref(|s| s.as_ref().unwrap().req.clone()); + + let requested_piece = request.as_ref() + .and_then(|req| req.data().msg().block1()) + .map(|b| b.num()); + let already_gave_pieces = mock.state + .map_ref(|s| s.as_ref().unwrap().gave_pieces.clone()); + let last_request_at = mock.state + .map_ref(|s| s.as_ref().unwrap().last_request_at.clone()); + let elapsed = std::time::Instant::now().duration_since(last_request_at); + + match requested_piece { + | None if already_gave_pieces.is_empty() => { + resp.set_payload(blocks[0].iter().copied()); + resp.msg_mut().set_block1(blocksize, 0, true).ok(); + mock.state + .map_mut(|s| s.as_mut().unwrap().last_request_at = std::time::Instant::now()); + }, + | None if request.is_none() && elapsed > std::time::Duration::from_secs(1) => { + panic!("timeout") + }, + | None => panic!("Block1 not set on request when client already got a Block1 response"), + | Some(_) if request.map(|r| r.data().msg().cache_key()) != Some(cache_key) => { + panic!("cache_key mismatch!") + }, + | Some(n) + if already_gave_pieces.iter() + .any(|p| Some(*p) == requested_piece) => + { + panic!("block {n} already given") + }, + | Some(n) if n > 3 => panic!("block {n} out of range"), + | Some(n) => { + resp.set_payload(blocks[n as usize].iter().copied()); + resp.msg_mut().set_block1(blocksize, n, n < 3).ok(); + mock.state.map_mut(|s| { + let s = s.as_mut().unwrap(); + s.gave_pieces.push(n); + s.last_request_at = std::time::Instant::now(); + }); + }, + } + + Some(Ok(Addrd(resp, addrs.server))) + }); + + let rep = loop { + let mut reqs = effects.drain(..) + .filter(|e| e.is_send()) + .collect::>(); + match reqs.len() { + | 0 => (), + | 1 => { + let mut req = reqs.remove(0) + .get_send() + .cloned() + .map(|addrd| addrd.map(Req::from)); + b.inner() + .state + .map_mut(|s| s.as_mut().unwrap().req = Option::take(&mut req)); + }, + | _ => panic!("too many outbound messages queued ({:?})", + reqs.iter() + .cloned() + .map(|r| r.get_send() + .as_ref() + .unwrap() + .data() + .block1() + .unwrap() + .num()) + .collect::>()), + } + + match b.poll_resp(&test::snapshot(), + &mut effects, + Token(array_vec! {1}), + addrs.server) + { + | Some(Err(nb::Error::WouldBlock)) => continue, + | Some(Err(nb::Error::Other(e))) => panic!("{e:?}"), + | Some(Ok(rep)) => break rep, + | None => panic!("got None"), + } + }; + + assert_eq!(rep.data().payload().copied().collect::>(), + payload.bytes().collect::>()); + } + + #[test] + fn when_recv_request_with_block2_and_dont_hear_another_for_a_long_time_this_should_prune_state( + ) { + #[derive(Clone, Copy)] + #[allow(dead_code)] + struct Addrs { + server: SocketAddr, + client: SocketAddr, + } + + let addrs = Addrs { server: test::x.x.x.x(80), + client: test::x.x.x.x(10) }; + let addrs: &'static Addrs = unsafe { core::mem::transmute(&addrs) }; + + type S = test::MockStep<(), Addrd, Addrd, ()>; + let b = Block::::default(); + + b.inner().set_poll_req(|_, snap, _| { + if snap.time == Instant::new(0) { + let mut req = + test::Message::new(Type::Non, Code::GET, Id(0), Token(Default::default())); + req.set_block2(128, 0, true).ok(); + Some(Ok(Addrd(Req::from(req), addrs.client))) + } else { + None + } + }); + + let t_0 = test::snapshot(); + let mut t_1 = test::snapshot(); + t_1.time = Instant::new(0) + Milliseconds(t_1.config.exchange_lifetime_millis() - 1); + + let mut t_2 = test::snapshot(); + t_2.time = Instant::new(0) + Milliseconds(t_2.config.exchange_lifetime_millis() + 1); + + assert!(matches!(b.poll_req(&t_0, &mut vec![]).unwrap().unwrap_err(), + nb::Error::WouldBlock)); + assert_eq!(b.get_endpoint(addrs.client, |convs| convs.len()).unwrap(), + 1); + + assert!(matches!(b.poll_req(&t_1, &mut vec![]), None)); + assert_eq!(b.get_endpoint(addrs.client, |convs| convs.len()).unwrap(), + 1); + + assert!(matches!(b.poll_req(&t_2, &mut vec![]), None)); + assert_eq!(b.get_endpoint(addrs.client, |convs| convs.len()), None); + } + + #[test] + fn when_recv_response_with_block2_and_dont_hear_back_for_a_long_time_this_should_prune_state() { + #[derive(Clone, Copy)] + #[allow(dead_code)] + struct Addrs { + server: SocketAddr, + client: SocketAddr, + } + + let addrs = Addrs { server: test::x.x.x.x(80), + client: test::x.x.x.x(10) }; + let addrs: &'static Addrs = unsafe { core::mem::transmute(&addrs) }; + + type S = test::MockStep<(), Addrd, Addrd, ()>; + let b = Block::::default(); + + b.inner().set_poll_resp(|_, snap, _, _, _| { + if snap.time == Instant::new(0) { + let mut rep = + test::Message::new(Type::Con, CONTENT, Id(0), Token(Default::default())); + rep.set_block1(128, 0, true).ok(); + Some(Ok(Addrd(Resp::from(rep), addrs.server))) + } else { + None + } + }); + + let mut effects = vec![]; + let t_0 = test::snapshot(); + let mut t_n1 = test::snapshot(); + t_n1.time = Instant::new(0) + Milliseconds(t_n1.config.exchange_lifetime_millis() - 1); + + let mut t_n2 = test::snapshot(); + t_n2.time = Instant::new(0) + Milliseconds(t_n2.config.exchange_lifetime_millis() + 1); + + let req = test::Message::new(Type::Non, Code::GET, Id(0), Token(Default::default())); + let req = Addrd(req, addrs.server); + + b.on_message_sent(&t_0, &mut effects, &req).unwrap(); + + let rep_0 = b.poll_resp(&t_0, &mut effects, Token(Default::default()), addrs.server) + .unwrap() + .unwrap_err(); + + assert!(matches!(rep_0, nb::Error::WouldBlock)); + + assert_eq!(b.get_endpoint(addrs.server, |convs| convs.len()).unwrap(), + 1); + + b.poll_resp(&t_n1, &mut effects, Token(Default::default()), addrs.server) + .ok_or(()) + .unwrap_err(); + + assert_eq!(b.get_endpoint(addrs.server, |convs| convs.len()).unwrap(), + 1); + + b.poll_resp(&t_n2, &mut effects, Token(Default::default()), addrs.server) + .ok_or(()) + .unwrap_err(); + + assert_eq!(b.get_endpoint(addrs.server, |convs| convs.len()), None); + } + + #[test] + fn when_recv_request_without_block2_this_should_do_nothing() { + #[derive(Clone, Copy)] + #[allow(dead_code)] + struct Addrs { + server: SocketAddr, + client: SocketAddr, + } + + let addrs = Addrs { server: test::x.x.x.x(80), + client: test::x.x.x.x(10) }; + let addrs: &'static Addrs = unsafe { core::mem::transmute(&addrs) }; + + type S = test::MockStep<(), Addrd, Addrd, ()>; + let b = Block::::default(); + + b.inner().set_poll_req(|_, _, _| { + let req = + test::Message::new(Type::Con, Code::POST, Id(0), Token(Default::default())); + Some(Ok(Addrd(Req::from(req), addrs.client))) + }); + + let mut effects = vec![]; + b.poll_req(&test::snapshot(), &mut effects) + .unwrap() + .unwrap(); + + assert!(effects.is_empty()); + } + + #[test] + fn when_recv_request_with_block2_and_recognized_number_this_should_respond_2_31() { + struct TestState { + next_block: u32, + } + + #[derive(Clone, Copy)] + #[allow(dead_code)] + struct Addrs { + server: SocketAddr, + client: SocketAddr, + } + + let addrs = Addrs { server: test::x.x.x.x(80), + client: test::x.x.x.x(10) }; + let addrs: &'static Addrs = unsafe { core::mem::transmute(&addrs) }; + + type S = test::MockStep, Addrd, ()>; + let b = Block::::default(); + + b.inner() + .init(TestState { next_block: 0 }) + .set_poll_req(|mock, _, _| { + let mut req = test::Message::new(Type::Con, Code::POST, Id(0), Token(Default::default())); + let num = mock.state.map_ref(|s| s.as_ref().unwrap().next_block); + req.set_block2(128, num, num < 2).ok(); + req.set_payload(Payload(core::iter::repeat(0u8).take(128).collect())); + + mock.state.map_mut(|s| s.as_mut().unwrap().next_block += 1); + Some(Ok(Addrd(Req::from(req), addrs.client))) + }); + + let mut effects = vec![]; + + // get block 0 + assert_eq!(b.poll_req(&test::snapshot(), &mut effects), + Some(Err(nb::Error::WouldBlock))); + + let resp = effects[0].get_send().unwrap(); + assert_eq!(resp.data().code, Code::new(2, 31)); + effects.clear(); + + // get block 1 + assert_eq!(b.poll_req(&test::snapshot(), &mut effects), + Some(Err(nb::Error::WouldBlock))); + + let resp = effects[0].get_send().unwrap(); + assert_eq!(resp.data().code, Code::new(2, 31)); + effects.clear(); + + // get block 2 + let assembled = b.poll_req(&test::snapshot(), &mut effects); + assert!(matches!(assembled, Some(Ok(_)))); + assert_eq!(assembled.unwrap().unwrap().data().payload().len(), 128 * 3); + assert!(effects.is_empty()); + } + + #[test] + fn when_recv_request_with_block2_and_unrecognized_number_this_should_respond_4_08() { + #[derive(Clone, Copy)] + #[allow(dead_code)] + struct Addrs { + server: SocketAddr, + client: SocketAddr, + } + + let addrs = Addrs { server: test::x.x.x.x(80), + client: test::x.x.x.x(10) }; + let addrs: &'static Addrs = unsafe { core::mem::transmute(&addrs) }; + + type S = test::MockStep<(), Addrd, Addrd, ()>; + let b = Block::::default(); + + b.inner().set_poll_req(|_, _, _| { + let mut req = + test::Message::new(Type::Con, Code::POST, Id(0), Token(Default::default())); + req.set_block2(128, 1, true).ok(); + req.set_payload(Payload(core::iter::repeat(0u8).take(128).collect())); + Some(Ok(Addrd(Req::from(req), addrs.client))) + }); + + let mut effects = vec![]; + assert_eq!(b.poll_req(&test::snapshot(), &mut effects), + Some(Err(nb::Error::WouldBlock))); + + assert!(!effects.is_empty()); + + let resp = effects[0].get_send().unwrap(); + assert_eq!(resp.data().code, Code::new(4, 08)); + } +} diff --git a/toad/src/step/buffer_responses.rs b/toad/src/step/buffer_responses.rs index 563d6a8a..e9d1a8f8 100644 --- a/toad/src/step/buffer_responses.rs +++ b/toad/src/step/buffer_responses.rs @@ -1,7 +1,7 @@ use core::fmt::Write; use no_std_net::SocketAddr; -use toad_array::Array; +use toad_array::{Array, Indexed}; use toad_len::Len; use toad_map::Map; use toad_msg::{Token, Type}; @@ -131,7 +131,7 @@ impl = observe::Observe>, Array>>, - observe::SubHash_TypePathQueryAccept

>; + toad_msg::DefaultCacheKey>; /// Parse -> ProvisionIds -> ProvisionTokens -> Ack -> Retry -> HandleAcks -> BufferResponses -> Observe #[rustfmt::skip] @@ -75,6 +75,48 @@ pub mod runtime { } } +/// # Block-wise transfer +/// * Client Flow ✓ +/// * Server Flow ✓ +/// +/// ## Internal State +/// * Stores inbound chunked messages for reassembly on final piece +/// +/// ## Behavior +/// ### Inbound Response +/// * response had `Block1 {num: 0, size: , more: true}`? +/// * incr num by 1 and send a duplicate of the original request (stripped of any Block1 options) +/// * response had `Block1 {num: , size: , more: false}`? +/// * yield assembled response to client +/// +/// ### Inbound Request +/// _NOTE: this covers entire request-response flow; requestors have to ask for each block from the server._ +/// * request has `Block1 {num: 0, size: , more: true}`? +/// * store first block +/// * request has `Block1 {num: , size: , more: true}`? +/// * if we have not seen `num - 1`, respond `4.08 REQUEST ENTITY INCOMPLETE` +/// * if we have seen `num - 1`, respond `2.31 CONTINUE Block1 {num: , size: , more: true}` +/// * request has `Block1 {num: , size: , more: false}`? +/// * same checks as `more: true` +/// * let server see assembled request +/// * request has `Block2 {num: 0, size: , more: false}`? +/// * response should be blocked into this size, else `1024` +/// * request has `Block2 {num: , size: , more: false}`? +/// * send requested block +/// +/// ### Outbound Request +/// * outbound requests are automatically broken into blocks, default block size 1024 +/// * when server responds with `2.31 CONTINUE Block1 {num: 0, size: , more: true}`, break the remainder of the payload into block size `` and restart the process for each block +/// * when server responds with `4.08 REQUEST ENTITY INCOMPLETE` restart from block 0 +/// * when server responds with `4.13 REQUEST ENTITY TOO LARGE Block1 {num: 0, size: , more: false}`, restart from block 0 with size `` +/// * when server responds with any other status code and `Block1 {num: , size: , more: }`, treat this response as the final response to the original request +/// * if `more` was false and we have more blocks to send, store the response and pretend we didn't receive it until all blocks sent +/// * if at any time we receive `4.08 REQUEST ENTITY INCOMPLETE` discard the stored response +/// +/// ## Transformation +/// None +pub mod block; + /// # Buffer & resend messages until they get a sufficient response /// * Client Flow ✓ /// * Server Flow ✓ @@ -128,8 +170,8 @@ pub mod retry; /// /// ### Then /// this step will issue 2 requests to your server: -/// - Request 1 `GET coap://server/temperature` -/// - Request 2 `GET coap://server/temperature?above=23deg` +/// * Request 1 `GET coap://server/temperature` +/// * Request 2 `GET coap://server/temperature?above=23deg` /// /// The response to request 1 will be sent to clients A, B, and C. The response to request 2 will be sent to client D. pub mod observe; @@ -288,11 +330,11 @@ macro_rules! exec_inner_step { #[macro_export] macro_rules! log { ($at:path, $effs:expr, $lvl:expr, $($arg:tt)*) => {{ - use toad_array::Array; + use toad_array::Indexed; type S = $crate::todo::String::<1000>; let msg = S::fmt(format_args!($($arg)*)); let msg = S::fmt(format_args!("[{}] {}", stringify!($at), msg.as_str())); - $effs.push($crate::platform::Effect::Log($lvl, msg)); + $effs.append($crate::platform::Effect::Log($lvl, msg)); }}; } diff --git a/toad/src/step/observe.rs b/toad/src/step/observe.rs index 94704786..02b04d7e 100644 --- a/toad/src/step/observe.rs +++ b/toad/src/step/observe.rs @@ -3,12 +3,11 @@ use core::hash::{Hash, Hasher}; use core::marker::PhantomData; use no_std_net::SocketAddr; -use toad_array::Array; -use toad_hash::Blake2Hasher; +use toad_array::{Array, Indexed}; use toad_msg::opt::known::observe::Action::{Deregister, Register}; use toad_msg::opt::known::repeat::QUERY; use toad_msg::repeat::PATH; -use toad_msg::{CodeKind, Id, MessageOptions, Token}; +use toad_msg::{CacheKey, CodeKind, DefaultCacheKey, Id, MessageOptions, Token}; use toad_stem::Stem; use super::{log, Step}; @@ -30,126 +29,6 @@ pub mod opt { pub const WAS_CREATED_BY_OBSERVE: OptNumber = OptNumber(65000); } -/// Default hasher used for [`SubscriptionHash`] -/// -/// Hashes: -/// - [Message Type](toad_msg::Message.ty) -/// - [Uri-Path](toad_msg::opt::known::no_repeat::HOST) -/// - [Uri-Query](toad_msg::opt::known::no_repeat::HOST) -/// - [Accept](toad_msg::opt::known::no_repeat::ACCEPT) -#[derive(Debug, Clone)] -#[allow(non_camel_case_types)] -pub struct SubHash_TypePathQueryAccept

(Blake2Hasher, PhantomData

); - -impl

Default for SubHash_TypePathQueryAccept

{ - fn default() -> Self { - Self(Blake2Hasher::new(), PhantomData) - } -} - -impl

SubHash_TypePathQueryAccept

{ - /// Create a new `DefaultSubscriptionHasher` - pub fn new() -> Self { - Self::default() - } -} - -impl

SubscriptionHash

for SubHash_TypePathQueryAccept

where P: PlatformTypes -{ - type Hasher = Blake2Hasher; - - fn hasher(&mut self) -> &mut Self::Hasher { - &mut self.0 - } - - fn subscription_hash(&mut self, sub: &Addrd>) { - let msg = sub.data().msg(); - - msg.ty.hash(&mut self.0); - msg.get(QUERY).into_iter().for_each(|v| { - v.hash(&mut self.0); - }); - msg.accept().hash(&mut self.0); - msg.get(PATH).into_iter().for_each(|v| { - v.hash(&mut self.0); - }); - } -} - -/// Extends [`core::hash::Hash`] with "subscription similarity" -/// used to determine whether similar subscriptions may be grouped together. -/// -/// A default implementation is provided by [`SubHash_TypePathQueryAccept`]. -/// -/// ## Why? -/// When your server [`notify`](super::Step::notify)s the toad runtime -/// that there is a new version of a resource available, all -/// subscriptions matching the path passed to `notify` will be -/// re-sent as new requests to your server. -/// -/// Similar requests (determined by this trait) will be grouped together -/// so that your server only sees 1 request, and the response -/// will be fanned back out to the subscribers. -/// -/// For a more concrete example, see the [module documentation](self). -pub trait SubscriptionHash

- where Self: Sized + Debug, - P: PlatformTypes -{ - /// Type used to generate hashes - type Hasher: Hasher; - - #[allow(missing_docs)] - fn hasher(&mut self) -> &mut Self::Hasher; - - /// Mutate the hasher instance with a subscription - /// - /// To obtain the [`u64`] hash, use [`Hasher::finish`] on [`sub_hash.hasher()`](SubscriptionHash::hasher) - /// - /// ``` - /// use core::hash::Hasher; - /// - /// use toad::net::{ipv4_socketaddr, Addrd}; - /// use toad::platform::toad_msg::Message; - /// use toad::req::Req; - /// use toad::step::observe::{SubHash_TypePathQueryAccept, SubscriptionHash}; - /// use toad_msg::Type::Con; - /// use toad_msg::{Code, Id, Token}; - /// - /// type Std = toad::std::PlatformTypes; - /// - /// let msg_a = Message::::new(Con, Code::GET, Id(1), Token(Default::default())); - /// let req_a = Addrd(Req::::from(msg_a), - /// ipv4_socketaddr([127, 0, 0, 1], 1234)); - /// let mut ha = SubHash_TypePathQueryAccept::new(); - /// ha.subscription_hash(&req_a); - /// - /// let msg_b = Message::::new(Con, Code::GET, Id(2), Token(Default::default())); - /// let req_b = Addrd(Req::::from(msg_b), - /// ipv4_socketaddr([127, 0, 0, 1], 2345)); - /// let mut hb = SubHash_TypePathQueryAccept::new(); - /// hb.subscription_hash(&req_a); - /// - /// assert_eq!(ha.hasher().finish(), hb.hasher().finish()); - /// ``` - fn subscription_hash(&mut self, sub: &Addrd>); -} - -impl SubscriptionHash

for &mut T - where P: PlatformTypes, - T: SubscriptionHash

-{ - type Hasher = T::Hasher; - - fn hasher(&mut self) -> &mut Self::Hasher { - >::hasher(self) - } - - fn subscription_hash(&mut self, sub: &Addrd>) { - >::subscription_hash(self, sub) - } -} - /// An Observe subscription pub struct Sub

where P: PlatformTypes @@ -228,17 +107,17 @@ impl Default for Observe impl Observe { fn hash<'a, P>(sub: &'a Sub

) -> (&'a Sub

, u64) where P: PlatformTypes, - Hasher: SubscriptionHash

+ Default + Hasher: CacheKey + Default { (sub, Self::hash_req(sub.req())) } fn hash_req<'a, P>(sub: &'a Addrd>) -> u64 where P: PlatformTypes, - Hasher: SubscriptionHash

+ Default + Hasher: CacheKey + Default { let mut h = Hasher::default(); - h.subscription_hash(sub); + h.cache_key(sub.data().msg()); h.hasher().finish() } @@ -285,7 +164,7 @@ impl Observe { -> impl 'a + Iterator> where Subs: Array>, P: PlatformTypes, - Hasher: SubscriptionHash

+ Default + Hasher: CacheKey + Default { subs.iter() .filter(move |s| match Self::get(subs, addr, t).map(Self::hash) { @@ -380,7 +259,7 @@ impl Observe { req.data().msg().token); let mut sub = Some(Sub::new(req.clone())); self.subs - .map_mut(move |s| s.push(Option::take(&mut sub).expect("closure only invoked once"))); + .map_mut(move |s| s.append(Option::take(&mut sub).expect("closure only invoked once"))); }, | Some(Deregister) => { log!(Observe::handle_incoming_request, @@ -414,7 +293,7 @@ impl Observe { where P: PlatformTypes, Subs: Array>, RequestQueue: Array>>, - Hasher: SubscriptionHash

+ Default + Hasher: CacheKey + Default { Self::subs_matching_path(subs, path).for_each(|sub| { // TODO: handle option capacity @@ -439,7 +318,7 @@ impl Step

for Observe S: Step>, PollResp = Addrd>>, B: Default + Array>, RQ: Default + Array>>, - H: SubscriptionHash

+ Default + H: CacheKey + Default { type PollReq = Addrd>; type PollResp = Addrd>; @@ -535,7 +414,7 @@ impl Step

for Observe "=> {:?} {:?}", sub.addr(), msg.data().token); - effs.push(Effect::Send(msg.with_addr(sub.addr()))); + effs.append(Effect::Send(msg.with_addr(sub.addr()))); }) }); } else { @@ -576,10 +455,7 @@ mod tests { type Snapshot = crate::platform::Snapshot; type Message = toad_msg::Message; type Sub = super::Sub; - type Observe = super::Observe, - Vec>>, - SubHash_TypePathQueryAccept>; + type Observe = super::Observe, Vec>>, DefaultCacheKey>; type PollReq = Addrd>; type PollResp = Addrd>; @@ -733,66 +609,4 @@ mod tests { (poll_req(_, _) should satisfy { |req| assert!(req.is_none()) }) ] ); - - #[test] - pub fn sub_hash() { - fn req(stuff: F) -> u64 - where F: FnOnce(&mut Message) - { - let mut req = Message::new(Type::Con, Code::GET, Id(1), Token(Default::default())); - stuff(&mut req); - let sub = Sub::new(Addrd(Req::from(req), test::x.x.x.x(0))); - - let mut h = SubHash_TypePathQueryAccept::new(); - h.subscription_hash(sub.req()); - h.hasher().finish() - } - - assert_ne!(req(|r| { - r.set_path("a/b/c").ok(); - }), - req(|_| {})); - assert_eq!(req(|r| { - r.set_path("a/b/c").ok(); - }), - req(|r| { - r.set_path("a/b/c").ok(); - })); - assert_ne!(req(|r| { - r.set_path("a/b/c").ok(); - r.add_query("filter[temp](less_than)=123").ok(); - }), - req(|r| { - r.set_path("a/b/c").ok(); - })); - assert_eq!(req(|r| { - r.set_path("a/b/c").ok(); - r.add_query("filter[temp](less_than)=123").ok(); - }), - req(|r| { - r.set_path("a/b/c").ok(); - r.add_query("filter[temp](less_than)=123").ok(); - r.set_content_format(ContentFormat::Json).ok(); - })); - assert_ne!(req(|r| { - r.set_path("a/b/c").ok(); - r.add_query("filter[temp](less_than)=123").ok(); - r.set_accept(ContentFormat::Json).ok(); - }), - req(|r| { - r.set_path("a/b/c").ok(); - r.add_query("filter[temp](less_than)=123").ok(); - r.set_accept(ContentFormat::Text).ok(); - })); - assert_eq!(req(|r| { - r.set_path("a/b/c").ok(); - r.add_query("filter[temp](less_than)=123").ok(); - r.set_accept(ContentFormat::Json).ok(); - }), - req(|r| { - r.set_path("a/b/c").ok(); - r.add_query("filter[temp](less_than)=123").ok(); - r.set_accept(ContentFormat::Json).ok(); - })); - } } diff --git a/toad/src/step/provision_ids.rs b/toad/src/step/provision_ids.rs index b9d9f9dc..99b691fe 100644 --- a/toad/src/step/provision_ids.rs +++ b/toad/src/step/provision_ids.rs @@ -5,7 +5,7 @@ use embedded_time::duration::Milliseconds; use embedded_time::Instant; use no_std_net::SocketAddr; use tinyvec::ArrayVec; -use toad_array::Array; +use toad_array::{Array, Indexed}; use toad_len::Len; use toad_map::{InsertError, Map}; use toad_msg::Id; @@ -291,7 +291,7 @@ impl ProvisionIds log::Level::Trace, "Saw new {:?}", id); - ids.push(Stamped(IdWithDefault(id), now)); + ids.append(Stamped(IdWithDefault(id), now)); }, } } diff --git a/toad/src/step/retry.rs b/toad/src/step/retry.rs index 0fcc4fec..37be6402 100644 --- a/toad/src/step/retry.rs +++ b/toad/src/step/retry.rs @@ -1,6 +1,6 @@ use embedded_time::duration::Milliseconds; use embedded_time::Instant; -use toad_array::Array; +use toad_array::{Array, Indexed}; use toad_msg::{CodeKind, Token, Type}; use toad_stem::Stem; use toad_string::{format, String}; @@ -71,7 +71,7 @@ pub trait Buf

dbg.msg_short, dbg.msg_should_be, dbg.since_last_attempt); - effects.push(Effect::Send(msg.clone())); + effects.append(Effect::Send(msg.clone())); }, | _ => log!(retry::Buf::attempt_all, effects, @@ -222,10 +222,10 @@ pub trait Buf

let timer = RetryTimer::new(now, config.msg.con.unacked_retry_strategy, config.msg.con.max_attempts); - self.push((State::ConPreAck { timer, - post_ack_strategy: config.msg.con.acked_retry_strategy, - post_ack_max_attempts: config.msg.con.max_attempts }, - msg.clone())); + self.append((State::ConPreAck { timer, + post_ack_strategy: config.msg.con.acked_retry_strategy, + post_ack_max_attempts: config.msg.con.max_attempts }, + msg.clone())); log!(retry::Buf::store_retryables, effects, @@ -244,7 +244,7 @@ pub trait Buf

let timer = RetryTimer::new(now, config.msg.non.retry_strategy, config.msg.non.max_attempts); - self.push((State::Just(timer), msg.clone())); + self.append((State::Just(timer), msg.clone())); Ok(()) }, diff --git a/toad/src/step/set_standard_options.rs b/toad/src/step/set_standard_options.rs index 2bdc04a0..f5b9b1e8 100644 --- a/toad/src/step/set_standard_options.rs +++ b/toad/src/step/set_standard_options.rs @@ -69,32 +69,46 @@ impl Step

for SetStandardOptions msg.as_mut().set_host(bytes.as_str()).ok(); msg.as_mut().set_port(port).ok(); + let payload_len = msg.data().payload().as_bytes().len() as u64; + match msg.data().code.kind() { + | toad_msg::CodeKind::Request => { + msg.as_mut().set_size1(payload_len).ok(); + }, + | toad_msg::CodeKind::Response => { + msg.as_mut().set_size2(payload_len).ok(); + }, + | toad_msg::CodeKind::Empty => (), + } + Ok(()) } } #[cfg(test)] mod test { + use embedded_time::Instant; use tinyvec::array_vec; use toad_msg::Type; use super::*; + use crate::platform::Snapshot; use crate::step::test::test_step; + use crate::test; - type InnerPollReq = Addrd>; - type InnerPollResp = Addrd>; + type InnerPollReq = Addrd>; + type InnerPollResp = Addrd>; - fn test_message(ty: Type) -> Addrd { + fn test_message(ty: Type) -> Addrd { use toad_msg::*; - Addrd(crate::test::Message { ver: Default::default(), - ty, - id: Id(1), - code: Code::new(1, 1), - token: Token(array_vec!(_ => 1)), - payload: Payload(Default::default()), - opts: Default::default() }, - crate::test::dummy_addr()) + Addrd(test::Message { ver: Default::default(), + ty, + id: Id(1), + code: Code::new(1, 1), + token: Token(array_vec!(_ => 1)), + payload: Payload(Default::default()), + opts: Default::default() }, + test::dummy_addr()) } test_step!( @@ -122,4 +136,26 @@ mod test { (poll_resp(_, _, _, _) should satisfy { |out| assert_eq!(out, Some(Err(nb::Error::WouldBlock))) }) ] ); + + #[test] + fn options() { + crate::step::test::dummy_step!({Step}); + let s = SetStandardOptions::::default(); + let snap = Snapshot { time: Instant::new(0), + config: Default::default(), + recvd_dgram: None }; + + let mut req = test::msg!(CON GET x.x.x.x:80); + req.as_mut().payload = toad_msg::Payload("Yabba dabba doo!!".bytes().collect()); + + let mut resp = test::msg!(CON {2 . 04} x.x.x.x:80); + resp.as_mut().payload = + toad_msg::Payload("wacky tobaccy is the smacky holacky".bytes().collect()); + + s.before_message_sent(&snap, &mut vec![], &mut req).unwrap(); + s.before_message_sent(&snap, &mut vec![], &mut resp) + .unwrap(); + assert_eq!(req.data().size1(), Some(17)); + assert_eq!(resp.data().size2(), Some(35)); + } }