Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cakekindel committed May 11, 2023
1 parent ec53758 commit 1d11d08
Showing 1 changed file with 41 additions and 25 deletions.
66 changes: 41 additions & 25 deletions toad/src/step/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,8 @@ impl<P, Pieces> Conversation<P, Pieces>
Pieces: Default
{
/// Create a new [`Conversation`] tracking a Blocked response to a sent request (param `msg`)
pub fn expect_response(expires_at: Instant<P::Clock>, msg: Message<P>) -> Self {
Self { original: Some(msg),
biggest_number_seen: None,
pcs: Default::default(),
expires_at }
}

/// Create a new [`Conversation`] tracking a received Blocked request
pub fn request(expires_at: Instant<P::Clock>) -> Self {
Self { original: None,
pub fn new(expires_at: Instant<P::Clock>, original: Option<Message<P>>) -> Self {
Self { original,
biggest_number_seen: None,
pcs: Default::default(),
expires_at }
Expand Down Expand Up @@ -280,21 +272,13 @@ impl<P, S, Endpoints, Conversations, Pieces> Block<P, S, Endpoints, Conversation
})
}

fn expect_response(&self, snap: &Snapshot<P>, addr: SocketAddr, req: &Message<P>) {
let exp = snap.time + Milliseconds(snap.config.exchange_lifetime_millis());
self.get_or_create_endpoint(addr, |convs| {
convs.insert((req.token, Role::Response, Direction::Inbound),
Conversation::expect_response(exp, req.clone()))
.unwrap();
});
}

fn insert_request(&self,
fn insert(&self,
snap: &Snapshot<P>,
original: Option<&Message<P>>,
(addr, token, role, dir): (SocketAddr, Token, Role, Direction)) {
let exp = snap.time + Milliseconds(snap.config.exchange_lifetime_millis());
self.get_or_create_endpoint(addr, |convs| {
convs.insert((token, role, dir), Conversation::request(exp))
convs.insert((token, role, dir), Conversation::new(exp, original.cloned()))
.unwrap();
});
}
Expand Down Expand Up @@ -391,7 +375,7 @@ impl<P, S, Endpoints, Conversations, Pieces> Step<P>
},
| Some(block) => {
if !has_prev_pieces && block.num() == 0 && block.more() {
self.insert_request(snap, k);
self.insert(snap, None, k);
self.map_mut(k, |conv| {
conv.have(snap.time, 0, req.clone().map(|r| r.into()).unwrap())
});
Expand Down Expand Up @@ -467,7 +451,7 @@ impl<P, S, Endpoints, Conversations, Pieces> Step<P>
},
| Some(block) => {
if !has_prev_pieces {
// TODO: warn
log!(Block::poll_resp, effects, log::Level::Warn, "Response received for token {:?} but we've never seen a request using that token. Ignoring this response despite it having {:?}", rep.data().msg().token, block);
Some(Ok(rep))
} else {
self.map_mut(k, |conv| {
Expand Down Expand Up @@ -496,16 +480,48 @@ impl<P, S, Endpoints, Conversations, Pieces> Step<P>
}
}

fn before_message_sent(&self,
snap: &platform::Snapshot<P>,
effs: &mut P::Effects,
msg: &mut Addrd<Message<P>>)
-> Result<(), Self::Error> {
self.prune(effs, snap.time);
self.inner.before_message_sent(snap, effs, msg)?;

let block_size: usize = 1024;

let original_payload = msg.data().payload().0;

// TODO: block if 1024 is too big and we got REQUEST_ENTITY_TOO_LARGE
if msg.data().block1().is_none() && original_payload.len() > block_size {
let k = (msg.addr(), msg.data().token, Role::Request, Direction::Outbound);
self.insert(snap, Some(msg.data()), k);
self.map_mut(k, |conv| {
let len = original_payload.len() as f32;
let block_count = (len / block_size as f32).ceil() as u32;
for n in 0..block_count {
let mut msg_block = msg.clone();
msg_block.as_mut().set_block1(1024, n, n == block_count - 1).ok();
let mut p = P::MessagePayload::default();
p.append_copy(original_payload[n * 1024..((n + 1) * 1024)]);
msg_block.as_mut().payload = Payload(p);
conv.have(snap.time, n, msg_block.unwrap());
}
}).unwrap();
}
Ok(())
}

fn on_message_sent(&self,
snap: &platform::Snapshot<P>,
effs: &mut P::Effects,
msg: &Addrd<Message<P>>)
-> Result<(), Self::Error> {
self.prune(effs, snap.time);
self.inner.on_message_sent(snap, effs, msg)?;
if msg.data().code.kind() == CodeKind::Request {
self.expect_response(snap, msg.addr(), msg.data());
self.insert(snap, Some(msg.data()), (msg.addr(), msg.data().token, Role::Response, Direction::Inbound));
} else if msg.data().code.kind() == CodeKind::Response {
// TODO: block outbound responses
}

Ok(())
Expand Down

0 comments on commit 1d11d08

Please sign in to comment.