diff --git a/libafl/src/events/llmp/mgr.rs b/libafl/src/events/llmp/mgr.rs index 1b76f0813c..63f0f7f5e2 100644 --- a/libafl/src/events/llmp/mgr.rs +++ b/libafl/src/events/llmp/mgr.rs @@ -45,6 +45,9 @@ use crate::{ Error, HasMetadata, }; +/// Default initial capacity of the event buffer - 4KB +const INITIAL_EVENT_BUFFER_SIZE: usize = 1024 * 4; + /// An [`EventManager`] that forwards all events to other attached fuzzers on shared maps or via tcp, /// using low-level message passing, `llmp`. pub struct LlmpEventManager @@ -75,6 +78,7 @@ where should_serialize_cnt: usize, pub(crate) time_ref: Option>, phantom: PhantomData, + event_buffer: Vec, } impl LlmpEventManager<(), NopState, NopShMemProvider> { @@ -165,6 +169,7 @@ impl LlmpEventManagerBuilder { time_ref, phantom: PhantomData, custom_buf_handlers: vec![], + event_buffer: Vec::with_capacity(INITIAL_EVENT_BUFFER_SIZE), }) } @@ -199,6 +204,7 @@ impl LlmpEventManagerBuilder { time_ref, phantom: PhantomData, custom_buf_handlers: vec![], + event_buffer: Vec::with_capacity(INITIAL_EVENT_BUFFER_SIZE), }) } @@ -233,6 +239,7 @@ impl LlmpEventManagerBuilder { time_ref, phantom: PhantomData, custom_buf_handlers: vec![], + event_buffer: Vec::with_capacity(INITIAL_EVENT_BUFFER_SIZE), }) } @@ -265,6 +272,7 @@ impl LlmpEventManagerBuilder { time_ref, phantom: PhantomData, custom_buf_handlers: vec![], + event_buffer: Vec::with_capacity(INITIAL_EVENT_BUFFER_SIZE), }) } } @@ -409,6 +417,7 @@ where + EvaluatorObservers::Input, S> + Evaluator::Input, S>, { + println!("Got event in client: {} from {:?}", event.name(), client_id); if !self.hooks.pre_exec_all(state, client_id, &event)? { return Ok(()); } @@ -512,44 +521,56 @@ where true } } - - #[cfg(feature = "llmp_compression")] fn fire( &mut self, _state: &mut Self::State, event: Event<::Input>, ) -> Result<(), Error> { - let serialized = postcard::to_allocvec(&event)?; + #[cfg(feature = "llmp_compression")] let flags = LLMP_FLAG_INITIALIZED; - match self.compressor.maybe_compress(&serialized) { - Some(comp_buf) => { - self.llmp.send_buf_with_flags( - LLMP_TAG_EVENT_TO_BOTH, - flags | LLMP_FLAG_COMPRESSED, - &comp_buf, - )?; + self.event_buffer.resize(self.event_buffer.capacity(), 0); + + // Serialize the event, reallocating event_buffer if needed + let written_len = match postcard::to_slice(&event, &mut self.event_buffer) { + Ok(written) => written.len(), + Err(postcard::Error::SerializeBufferFull) => { + let serialized = postcard::to_allocvec(&event)?; + self.event_buffer = serialized; + self.event_buffer.len() } - None => { - self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?; + Err(e) => return Err(Error::from(e)), + }; + + #[cfg(feature = "llmp_compression")] + { + match self + .compressor + .maybe_compress(&self.event_buffer[..written_len]) + { + Some(comp_buf) => { + self.llmp.send_buf_with_flags( + LLMP_TAG_EVENT_TO_BOTH, + flags | LLMP_FLAG_COMPRESSED, + &comp_buf, + )?; + } + None => { + self.llmp + .send_buf(LLMP_TAG_EVENT_TO_BOTH, &self.event_buffer[..written_len])?; + } } } - self.last_sent = current_time(); - Ok(()) - } + #[cfg(not(feature = "llmp_compression"))] + { + self.llmp + .send_buf(LLMP_TAG_EVENT_TO_BOTH, &self.event_buffer[..written_len]); + } - #[cfg(not(feature = "llmp_compression"))] - fn fire( - &mut self, - _state: &mut Self::State, - event: Event<::Input>, - ) -> Result<(), Error> { - let serialized = postcard::to_allocvec(&event)?; - self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?; + self.last_sent = current_time(); Ok(()) } - fn serialize_observers(&mut self, observers: &OT) -> Result>, Error> where OT: ObserversTuple + Serialize,