Skip to content

Commit

Permalink
protocols/gossipsub: Add IWANT and memcache misses metrics (#2518)
Browse files Browse the repository at this point in the history
Co-authored-by: Max Inden <[email protected]>
  • Loading branch information
AgeManning and mxinden authored Feb 16, 2022
1 parent 80bbb62 commit 9f1114d
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,9 @@ where
"Message not in cache. Ignoring forwarding. Message Id: {}",
msg_id
);
if let Some(metrics) = self.metrics.as_mut() {
metrics.memcache_miss();
}
return Ok(false);
}
};
Expand Down Expand Up @@ -1214,8 +1217,7 @@ where

trace!("Handling IHAVE for peer: {:?}", peer_id);

// use a hashmap to avoid duplicates efficiently
let mut iwant_ids = HashMap::new();
let mut iwant_ids = HashSet::new();

for (topic, ids) in ihave_msgs {
// only process the message if we are subscribed
Expand All @@ -1236,7 +1238,12 @@ where
.unwrap_or(true)
{
// have not seen this message and are not currently requesting it
iwant_ids.insert(id, topic.clone());
if iwant_ids.insert(id) {
// Register the IWANT metric
if let Some(metrics) = self.metrics.as_mut() {
metrics.register_iwant(&topic);
}
}
}
}
}
Expand All @@ -1258,37 +1265,37 @@ where
);

// Ask in random order
let mut iwant_ids_vec: Vec<_> = iwant_ids.keys().collect();
let mut iwant_ids_vec: Vec<_> = iwant_ids.into_iter().collect();
let mut rng = thread_rng();
iwant_ids_vec.partial_shuffle(&mut rng, iask as usize);

iwant_ids_vec.truncate(iask as usize);
*iasked += iask;

let mut message_ids = Vec::new();
for message_id in iwant_ids_vec {
for message_id in &iwant_ids_vec {
// Add all messages to the pending list
self.pending_iwant_msgs.insert(message_id.clone());
message_ids.push(message_id.clone());
}

if let Some((_, _, _, gossip_promises)) = &mut self.peer_score {
gossip_promises.add_promise(
*peer_id,
&message_ids,
&iwant_ids_vec,
Instant::now() + self.config.iwant_followup_time(),
);
}
trace!(
"IHAVE: Asking for the following messages from {}: {:?}",
peer_id,
message_ids
iwant_ids_vec
);

Self::control_pool_add(
&mut self.control_pool,
*peer_id,
GossipsubControlAction::IWant { message_ids },
GossipsubControlAction::IWant {
message_ids: iwant_ids_vec,
},
);
}
trace!("Completed IHAVE handling for peer: {:?}", peer_id);
Expand Down

0 comments on commit 9f1114d

Please sign in to comment.