From cc80fc48427195796d4bd8cbbdb7efcbcffca28e Mon Sep 17 00:00:00 2001 From: Jintu Das Date: Mon, 16 Sep 2024 13:55:10 +0530 Subject: [PATCH 01/10] feat: add payload size limit to SnOpts type and update Sn function to use the provided limit --- shardus_net/src/lib.rs | 14 +++++------ shardus_net/src/shardus_net_listener.rs | 32 +++++++++++++++---------- src/index.ts | 11 ++++++++- src/types.ts | 1 + test/setup_sender.ts | 3 +++ test/test_bombardment.ts | 2 ++ test/test_headers.ts | 1 + test/test_lru.ts | 1 + test/test_server.js | 1 + 9 files changed, 45 insertions(+), 21 deletions(-) diff --git a/shardus_net/src/lib.rs b/shardus_net/src/lib.rs index 3b01f5f..118ed57 100644 --- a/shardus_net/src/lib.rs +++ b/shardus_net/src/lib.rs @@ -40,8 +40,8 @@ use crate::shardus_net_sender::Connection; const ENABLE_COMPRESSION: bool = false; const HEADER_SIZE_LIMIT_IN_BYTES: usize = 2 * 1024; // 2KB const PAYLOAD_SIZE_LIMIT_IN_BYTES: usize = 2 * 1024 * 1024; // 2MB -const SIGNATURE_SIZE_LIMIT_IN_BYTES: usize = 96; -const OWNER_SIZE_LIMIT_IN_BYTES: usize = 32; +const SIGNATURE_SIZE_LIMIT_IN_BYTES: usize = 96; // 96 bytes +const OWNER_SIZE_LIMIT_IN_BYTES: usize = 32; // 32 bytes fn create_shardus_net(mut cx: FunctionContext) -> JsResult { let cx = &mut cx; @@ -52,13 +52,13 @@ fn create_shardus_net(mut cx: FunctionContext) -> JsResult { let use_lru = cx.argument::(2)?.value(cx); let lru_size = cx.argument::(3)?.value(cx); let hash_key = cx.argument::(4)?.value(cx); + let hex_signing_sk = cx.argument::(5)?.value(cx); + let payload_size_limit = cx.argument::(6)?.value(cx) as usize; shardus_crypto::initialize_shardus_crypto_instance(&hash_key); - let hex_signing_sk = cx.argument::(5)?.value(cx); let key_pair = shardus_crypto::get_shardus_crypto_instance().get_key_pair_using_sk(&crypto::HexStringOrBuffer::Hex(hex_signing_sk)); - - let shardus_net_listener = create_shardus_net_listener(cx, port, host)?; + let shardus_net_listener = create_shardus_net_listener(cx, port, host, payload_size_limit)?; let shardus_net_sender = create_shardus_net_sender(use_lru, NonZeroUsize::new(lru_size as usize).unwrap(), key_pair); let (stats, stats_incrementers) = Stats::new(); let shardus_net_listener = cx.boxed(shardus_net_listener); @@ -419,11 +419,11 @@ fn evict_socket(mut cx: FunctionContext) -> JsResult { } } -fn create_shardus_net_listener(cx: &mut FunctionContext, port: f64, host: String) -> Result, Throw> { +fn create_shardus_net_listener(cx: &mut FunctionContext, port: f64, host: String, payload_size_limit: usize) -> Result, Throw> { // @TODO: Verify that a javascript number properly converts here without loss. let address = (host, port as u16); - let shardus_net = ShardusNetListener::new(address); + let shardus_net = ShardusNetListener::new(address, payload_size_limit); match shardus_net { Ok(net) => Ok(Arc::new(net)), diff --git a/shardus_net/src/shardus_net_listener.rs b/shardus_net/src/shardus_net_listener.rs index 527842d..dac40ac 100644 --- a/shardus_net/src/shardus_net_listener.rs +++ b/shardus_net/src/shardus_net_listener.rs @@ -1,7 +1,7 @@ use crate::header::header_types::RequestMetadata; use crate::header_factory::header_deserialize_factory; use crate::message::Message; -use crate::{shardus_crypto, HEADER_SIZE_LIMIT_IN_BYTES, PAYLOAD_SIZE_LIMIT_IN_BYTES}; +use crate::{shardus_crypto, HEADER_SIZE_LIMIT_IN_BYTES}; use super::runtime::RUNTIME; @@ -18,6 +18,7 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; pub struct ShardusNetListener { address: SocketAddr, + payload_size_limit: usize, } #[derive(Error, Debug)] @@ -34,31 +35,31 @@ pub enum ListenerError { type ListenerResult = Result; impl ShardusNetListener { - pub fn new(address: A) -> Result { + pub fn new(address: A, payload_size_limit: usize) -> Result { let mut addresses = address.to_socket_addrs().map_err(|_| ())?; let address = addresses.next().ok_or(())?; - Ok(Self { address }) + Ok(Self { address, payload_size_limit }) } pub fn listen(&self) -> UnboundedReceiver<(String, SocketAddr, Option)> { - Self::spawn_listener(self.address) + Self::spawn_listener(self.address, self.payload_size_limit) } - fn spawn_listener(address: SocketAddr) -> UnboundedReceiver<(String, SocketAddr, Option)> { + fn spawn_listener(address: SocketAddr, payload_size_limit: usize) -> UnboundedReceiver<(String, SocketAddr, Option)> { let (tx, rx) = unbounded_channel(); - RUNTIME.spawn(Self::bind_to_socket(address, tx)); + RUNTIME.spawn(Self::bind_to_socket(address, tx, payload_size_limit)); rx } - async fn bind_to_socket(address: SocketAddr, tx: UnboundedSender<(String, SocketAddr, Option)>) { + async fn bind_to_socket(address: SocketAddr, tx: UnboundedSender<(String, SocketAddr, Option)>, payload_size_limit: usize) { loop { let listener = TcpListener::bind(address).await; match listener { Ok(listener) => { let tx = tx.clone(); - match Self::accept_connections(listener, tx).await { + match Self::accept_connections(listener, tx, payload_size_limit).await { Ok(_) => unreachable!(), Err(err) => { error!("Failed to accept connection to {} due to {}", address, err) @@ -72,13 +73,13 @@ impl ShardusNetListener { } } - async fn accept_connections(listener: TcpListener, received_msg_tx: UnboundedSender<(String, SocketAddr, Option)>) -> std::io::Result<()> { + async fn accept_connections(listener: TcpListener, received_msg_tx: UnboundedSender<(String, SocketAddr, Option)>, payload_size_limit: usize) -> std::io::Result<()> { loop { let (socket, remote_addr) = listener.accept().await?; let received_msg_tx = received_msg_tx.clone(); RUNTIME.spawn(async move { - let result = Self::receive(socket, remote_addr, received_msg_tx).await; + let result = Self::receive(socket, remote_addr, received_msg_tx, payload_size_limit).await; match result { Ok(_) => info!("Connection safely completed and shutdown with {}", remote_addr), Err(err) => { @@ -89,11 +90,16 @@ impl ShardusNetListener { } } - async fn receive(socket_stream: TcpStream, remote_addr: SocketAddr, received_msg_tx: UnboundedSender<(String, SocketAddr, Option)>) -> ListenerResult<()> { + async fn receive( + socket_stream: TcpStream, + remote_addr: SocketAddr, + received_msg_tx: UnboundedSender<(String, SocketAddr, Option)>, + payload_size_limit: usize, + ) -> ListenerResult<()> { let mut socket_stream: TcpStream = socket_stream; while let Ok(msg_len) = socket_stream.read_u32().await { - if (msg_len as usize) > PAYLOAD_SIZE_LIMIT_IN_BYTES { - error!("Message length exceeds the limit of 2MB"); + if (msg_len as usize) > payload_size_limit { + error!("Message length exceeds the limit of {} bytes", payload_size_limit); continue; } diff --git a/src/index.ts b/src/index.ts index f8a1752..14b2cb2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -49,12 +49,21 @@ export const Sn = (opts: SnOpts) => { const LRU_SIZE = (opts.senderOpts && opts.senderOpts.lruSize) || 1028 const HASH_KEY = opts.crypto.hashKey const SIGNING_SECRET_KEY_HEX = opts.crypto.signingSecretKeyHex + const PAYLOAD_SIZE_LIMIT = opts.payloadSizeLimitInBytes || 2 * 1024 * 1024 // 2MB const HEADER_OPTS = opts.headerOpts || { sendHeaderVersion: 0, } - const _net = net.Sn(PORT, ADDRESS, USE_LRU_CACHE, LRU_SIZE, HASH_KEY, SIGNING_SECRET_KEY_HEX) + const _net = net.Sn( + PORT, + ADDRESS, + USE_LRU_CACHE, + LRU_SIZE, + HASH_KEY, + SIGNING_SECRET_KEY_HEX, + PAYLOAD_SIZE_LIMIT + ) net.setLoggingEnabled(false) diff --git a/src/types.ts b/src/types.ts index e5ce3fb..4c4c54e 100644 --- a/src/types.ts +++ b/src/types.ts @@ -60,6 +60,7 @@ export type SnOpts = { hashKey: string signingSecretKeyHex: string } + payloadSizeLimitInBytes?: number } /** diff --git a/test/setup_sender.ts b/test/setup_sender.ts index 04dfa4a..f8c79e2 100644 --- a/test/setup_sender.ts +++ b/test/setup_sender.ts @@ -2,6 +2,7 @@ import { Sn } from '../.' const RESPONSE_DELAY_MILLIS = 500 const USE_LRU_CACHE = true +const PAYLOAD_SIZE_LIMIT = 2 * 1024 * 1024 const setupLruSender = () => { const port = 49152 @@ -17,6 +18,7 @@ const setupLruSender = () => { 'c3774b92cc8850fb4026b073081290b82cab3c0f66cac250b4d710ee9aaf83ed8088b37f6f458104515ae18c2a05bde890199322f62ab5114d20c77bde5e6c9d', hashKey: '69fa4195670576c0160d660c3be36556ff8d504725be8a59b5a96509e0c994bc', }, + payloadSizeLimitInBytes: PAYLOAD_SIZE_LIMIT, }) } else { return Sn({ @@ -26,6 +28,7 @@ const setupLruSender = () => { 'c3774b92cc8850fb4026b073081290b82cab3c0f66cac250b4d710ee9aaf83ed8088b37f6f458104515ae18c2a05bde890199322f62ab5114d20c77bde5e6c9d', hashKey: '69fa4195670576c0160d660c3be36556ff8d504725be8a59b5a96509e0c994bc', }, + payloadSizeLimitInBytes: PAYLOAD_SIZE_LIMIT, }) } } diff --git a/test/test_bombardment.ts b/test/test_bombardment.ts index 0d9a876..00b50b3 100644 --- a/test/test_bombardment.ts +++ b/test/test_bombardment.ts @@ -10,6 +10,7 @@ const TARGET_SOCKET_PORT = 49152 // Internal port of the validator to be bombard const MESSAGE_JSON = { route: 'bombardment-test', payload: 'Hello, world!' } // Message to be sent to the validator const RAMP_UP_STRATEGY: 'linear' | 'none' = 'none' // Ramp up strategy to be used for the bombardment const RAMP_UP_EVERY_X_BOMBS = 10 // Number of bombs to be sent before ramping up the number of socket clients +const PAYLOAD_SIZE_LIMIT = 2 * 1024 * 1024 // Payload size limit in bytes // Test variables @@ -33,6 +34,7 @@ function setupSocketClients() { 'c3774b92cc8850fb4026b073081290b82cab3c0f66cac250b4d710ee9aaf83ed8088b37f6f458104515ae18c2a05bde890199322f62ab5114d20c77bde5e6c9d', hashKey: '69fa4195670576c0160d660c3be36556ff8d504725be8a59b5a96509e0c994bc', }, + payloadSizeLimitInBytes: PAYLOAD_SIZE_LIMIT, }) ) } diff --git a/test/test_headers.ts b/test/test_headers.ts index ab74eaf..128b47f 100644 --- a/test/test_headers.ts +++ b/test/test_headers.ts @@ -11,6 +11,7 @@ const setupSender = (port: number, senderOpts: any, headerOpts: any) => { 'c3774b92cc8850fb4026b073081290b82cab3c0f66cac250b4d710ee9aaf83ed8088b37f6f458104515ae18c2a05bde890199322f62ab5114d20c77bde5e6c9d', hashKey: '69fa4195670576c0160d660c3be36556ff8d504725be8a59b5a96509e0c994bc', }, + payloadSizeLimitInBytes: 2 * 1024 * 1024, }) } diff --git a/test/test_lru.ts b/test/test_lru.ts index 46f0ed5..857dc13 100644 --- a/test/test_lru.ts +++ b/test/test_lru.ts @@ -18,6 +18,7 @@ const setupLruSender = (port: number, lruSize: number) => { headerOpts: { sendHeaderVersion: 1, }, + payloadSizeLimitInBytes: 3 * 1024 * 1024, }) } diff --git a/test/test_server.js b/test/test_server.js index f6cf2c7..4a39064 100644 --- a/test/test_server.js +++ b/test/test_server.js @@ -7,6 +7,7 @@ const port = 5001 const sn = Sn({ address, port, + payloadSizeLimitInBytes: 2 * 1024 * 1024, }) const RESPONSE_DELAY_MILLIS = 1000 From 9d454a444f298bba8d2d38accc5e7f69ba31fccf Mon Sep 17 00:00:00 2001 From: Jintu Das Date: Fri, 29 Nov 2024 18:23:43 +0530 Subject: [PATCH 02/10] Add additional network param as shardus config --- shardus_net/src/header/header_v1.rs | 41 ++++++++++++++------- shardus_net/src/header_factory.rs | 5 ++- shardus_net/src/lib.rs | 28 ++++++++++---- shardus_net/src/message.rs | 35 ++++++++++++------ shardus_net/src/shardus_net_listener.rs | 49 ++++++++++--------------- src/index.ts | 8 +++- src/types.ts | 41 +++++++++++++++++---- test/test_multi_send.ts | 42 +++++++++++++++++---- 8 files changed, 169 insertions(+), 80 deletions(-) diff --git a/shardus_net/src/header/header_v1.rs b/shardus_net/src/header/header_v1.rs index a29b7f8..a24d6d2 100644 --- a/shardus_net/src/header/header_v1.rs +++ b/shardus_net/src/header/header_v1.rs @@ -6,7 +6,8 @@ extern crate serde_json; use crate::compression::Compression; use serde::Deserialize; -use crate::{check_variable_size, HEADER_SIZE_LIMIT_IN_BYTES}; +use crate::check_variable_size; +use crate::NetConfig; #[derive(Deserialize)] pub struct HeaderV1 { @@ -59,7 +60,7 @@ impl HeaderV1 { } // Deserialize a Vec cursor into a HeaderV1 struct - pub fn deserialize(cursor: &mut Cursor>) -> Option { + pub fn deserialize(cursor: &mut Cursor>, net_config: &NetConfig) -> Option { // Deserialize uuid let mut uuid_bytes = [0u8; 16]; cursor.read_exact(&mut uuid_bytes).ok()?; @@ -83,7 +84,7 @@ impl HeaderV1 { let mut tracker_id_len_bytes = [0u8; 4]; cursor.read_exact(&mut tracker_id_len_bytes).ok()?; let tracker_id_len = u32::from_le_bytes(tracker_id_len_bytes); - check_variable_size(tracker_id_len, HEADER_SIZE_LIMIT_IN_BYTES); + check_variable_size(tracker_id_len, net_config.header_size_limit); let mut tracker_id_bytes = vec![0u8; tracker_id_len as usize]; cursor.read_exact(&mut tracker_id_bytes).ok()?; let tracker_id = String::from_utf8(tracker_id_bytes).ok()?; @@ -92,7 +93,7 @@ impl HeaderV1 { let mut verification_data_len_bytes = [0u8; 4]; cursor.read_exact(&mut verification_data_len_bytes).ok()?; let verification_data_len = u32::from_le_bytes(verification_data_len_bytes); - check_variable_size(verification_data_len, HEADER_SIZE_LIMIT_IN_BYTES); + check_variable_size(verification_data_len, net_config.header_size_limit); let mut verification_data_bytes = vec![0u8; verification_data_len as usize]; cursor.read_exact(&mut verification_data_bytes).ok()?; let verification_data = String::from_utf8(verification_data_bytes).ok()?; @@ -146,10 +147,15 @@ mod tests { verification_data: "verification_data_1".to_string(), compression: Compression::None, }; - + let net_config = NetConfig { + header_size_limit: 2 * 1024, + signature_size_limit: 96, + owner_size_limit: 32, + payload_size_limit: 2 * 1024 * 1024, + }; let serialized = header.serialize(); let mut cursor = Cursor::new(serialized); - let deserialized = HeaderV1::deserialize(&mut cursor).unwrap(); + let deserialized = HeaderV1::deserialize(&mut cursor, &net_config).unwrap(); assert_eq!(header.uuid, deserialized.uuid); assert_eq!(header.message_length, deserialized.message_length); @@ -194,23 +200,30 @@ mod tests { #[test] #[should_panic(expected = "variable_len exceeds the limit")] fn test_check_variable_size_panic() { - use crate::HEADER_SIZE_LIMIT_IN_BYTES; - + let net_config = NetConfig { + header_size_limit: 2 * 1024, + signature_size_limit: 96, + owner_size_limit: 32, + payload_size_limit: 2 * 1024 * 1024, + }; // Define a variable length that exceeds the limit - let oversized_length = HEADER_SIZE_LIMIT_IN_BYTES as u32 + 1; + let oversized_length = net_config.header_size_limit as u32 + 1; // Call the function, expecting it to panic - check_variable_size(oversized_length, HEADER_SIZE_LIMIT_IN_BYTES); + check_variable_size(oversized_length, net_config.header_size_limit); } #[test] fn test_check_variable_size_no_panic() { - use crate::HEADER_SIZE_LIMIT_IN_BYTES; - + let net_config = NetConfig { + header_size_limit: 2 * 1024, + signature_size_limit: 96, + owner_size_limit: 32, + payload_size_limit: 2 * 1024 * 1024, + }; // Define a variable length within the limit : 2048 (0x800) let valid_length = 0x799; - // Call the function, ensuring it does not panic - check_variable_size(valid_length, HEADER_SIZE_LIMIT_IN_BYTES); + check_variable_size(valid_length, net_config.header_size_limit); } } diff --git a/shardus_net/src/header_factory.rs b/shardus_net/src/header_factory.rs index 5cc091a..3db95ff 100644 --- a/shardus_net/src/header_factory.rs +++ b/shardus_net/src/header_factory.rs @@ -2,6 +2,7 @@ use std::io::Cursor; use crate::header::header_types::Header; use crate::header::header_v1::HeaderV1; +use crate::NetConfig; pub fn wrap_serialized_message(mut serialized_message: Vec) -> Vec { let mut buffer = Vec::new(); @@ -10,10 +11,10 @@ pub fn wrap_serialized_message(mut serialized_message: Vec) -> Vec { buffer } -pub fn header_deserialize_factory(version: u8, serialized_header_cursor: &mut Cursor>) -> Option
{ +pub fn header_deserialize_factory(version: u8, serialized_header_cursor: &mut Cursor>, net_config: &NetConfig) -> Option
{ match version { 1 => { - let deserialized = HeaderV1::deserialize(serialized_header_cursor)?; + let deserialized = HeaderV1::deserialize(serialized_header_cursor, &net_config)?; Some(Header::V1(deserialized)) } _ => None, diff --git a/shardus_net/src/lib.rs b/shardus_net/src/lib.rs index 118ed57..6521f8e 100644 --- a/shardus_net/src/lib.rs +++ b/shardus_net/src/lib.rs @@ -38,10 +38,14 @@ use tokio::sync::Mutex; use crate::shardus_net_sender::Connection; const ENABLE_COMPRESSION: bool = false; -const HEADER_SIZE_LIMIT_IN_BYTES: usize = 2 * 1024; // 2KB -const PAYLOAD_SIZE_LIMIT_IN_BYTES: usize = 2 * 1024 * 1024; // 2MB -const SIGNATURE_SIZE_LIMIT_IN_BYTES: usize = 96; // 96 bytes -const OWNER_SIZE_LIMIT_IN_BYTES: usize = 32; // 32 bytes + +#[derive(Clone)] +pub struct NetConfig { + pub header_size_limit: usize, + pub signature_size_limit: usize, + pub owner_size_limit: usize, + pub payload_size_limit: usize, +} fn create_shardus_net(mut cx: FunctionContext) -> JsResult { let cx = &mut cx; @@ -54,11 +58,21 @@ fn create_shardus_net(mut cx: FunctionContext) -> JsResult { let hash_key = cx.argument::(4)?.value(cx); let hex_signing_sk = cx.argument::(5)?.value(cx); let payload_size_limit = cx.argument::(6)?.value(cx) as usize; + let header_size_limit = cx.argument::(7)?.value(cx) as usize; + let signature_size_limit = cx.argument::(8)?.value(cx) as usize; + let owner_size_limit = cx.argument::(9)?.value(cx) as usize; + + let net_config = NetConfig { + header_size_limit, + signature_size_limit, + owner_size_limit, + payload_size_limit, + }; shardus_crypto::initialize_shardus_crypto_instance(&hash_key); let key_pair = shardus_crypto::get_shardus_crypto_instance().get_key_pair_using_sk(&crypto::HexStringOrBuffer::Hex(hex_signing_sk)); - let shardus_net_listener = create_shardus_net_listener(cx, port, host, payload_size_limit)?; + let shardus_net_listener = create_shardus_net_listener(cx, port, host, net_config)?; let shardus_net_sender = create_shardus_net_sender(use_lru, NonZeroUsize::new(lru_size as usize).unwrap(), key_pair); let (stats, stats_incrementers) = Stats::new(); let shardus_net_listener = cx.boxed(shardus_net_listener); @@ -419,11 +433,11 @@ fn evict_socket(mut cx: FunctionContext) -> JsResult { } } -fn create_shardus_net_listener(cx: &mut FunctionContext, port: f64, host: String, payload_size_limit: usize) -> Result, Throw> { +fn create_shardus_net_listener(cx: &mut FunctionContext, port: f64, host: String, net_config: NetConfig) -> Result, Throw> { // @TODO: Verify that a javascript number properly converts here without loss. let address = (host, port as u16); - let shardus_net = ShardusNetListener::new(address, payload_size_limit); + let shardus_net = ShardusNetListener::new(address, net_config); match shardus_net { Ok(net) => Ok(Arc::new(net)), diff --git a/shardus_net/src/message.rs b/shardus_net/src/message.rs index b9f6118..39db853 100644 --- a/shardus_net/src/message.rs +++ b/shardus_net/src/message.rs @@ -1,6 +1,7 @@ use std::io::{Cursor, Read, Write}; -use crate::{check_variable_size, HEADER_SIZE_LIMIT_IN_BYTES, OWNER_SIZE_LIMIT_IN_BYTES, PAYLOAD_SIZE_LIMIT_IN_BYTES, SIGNATURE_SIZE_LIMIT_IN_BYTES}; +use crate::check_variable_size; +use crate::NetConfig; use crypto::Format::Buffer; use crypto::{KeyPair, ShardusCrypto}; @@ -84,7 +85,7 @@ impl Message { buffer } - pub fn deserialize(cursor: &mut Cursor>) -> Option { + pub fn deserialize(cursor: &mut Cursor>, net_config: NetConfig) -> Option { // Deserialize header_version let mut header_version_bytes = [0u8; 1]; cursor.read_exact(&mut header_version_bytes).ok()?; @@ -94,7 +95,7 @@ impl Message { let mut header_len_bytes = [0u8; 4]; cursor.read_exact(&mut header_len_bytes).ok()?; let header_len = u32::from_le_bytes(header_len_bytes); - check_variable_size(header_len, HEADER_SIZE_LIMIT_IN_BYTES); + check_variable_size(header_len, net_config.header_size_limit); let mut header_bytes = vec![0u8; header_len as usize]; cursor.read_exact(&mut header_bytes).ok()?; let header = header_bytes; @@ -103,13 +104,14 @@ impl Message { let mut data_len_bytes = [0u8; 4]; cursor.read_exact(&mut data_len_bytes).ok()?; let data_len = u32::from_le_bytes(data_len_bytes); - check_variable_size(data_len, PAYLOAD_SIZE_LIMIT_IN_BYTES); + let data_len_limit = net_config.payload_size_limit - header_len as usize; // Since Payload size = header + data + check_variable_size(data_len, data_len_limit); let mut data_bytes = vec![0u8; data_len as usize]; cursor.read_exact(&mut data_bytes).ok()?; let data = data_bytes; // Deserialize sign - let sign = Sign::deserialize(cursor)?; + let sign = Sign::deserialize(cursor, net_config)?; Some(Message::new(header_version, header, data, sign)) } @@ -138,12 +140,12 @@ impl Sign { buffer } - pub fn deserialize(cursor: &mut Cursor>) -> Option { + pub fn deserialize(cursor: &mut Cursor>, net_config: NetConfig) -> Option { // Deserialize owner let mut owner_len_bytes = [0u8; 4]; cursor.read_exact(&mut owner_len_bytes).ok()?; let owner_len = u32::from_le_bytes(owner_len_bytes); - check_variable_size(owner_len, OWNER_SIZE_LIMIT_IN_BYTES); + check_variable_size(owner_len, net_config.owner_size_limit); let mut owner_bytes = vec![0u8; owner_len as usize]; cursor.read_exact(&mut owner_bytes).ok()?; let owner = owner_bytes; @@ -152,7 +154,7 @@ impl Sign { let mut signature_len_bytes = [0u8; 4]; cursor.read_exact(&mut signature_len_bytes).ok()?; let signature_len = u32::from_le_bytes(signature_len_bytes); - check_variable_size(signature_len, SIGNATURE_SIZE_LIMIT_IN_BYTES); + check_variable_size(signature_len, net_config.signature_size_limit); let mut signature_bytes = vec![0u8; signature_len as usize]; cursor.read_exact(&mut signature_bytes).ok()?; let signature = signature_bytes; @@ -190,10 +192,15 @@ mod tests { owner: vec![0x12, 0x34, 0x56, 0x78], sig: vec![0x9a, 0xbc, 0xde, 0xf0], }; - + let net_config = NetConfig { + header_size_limit: 2 * 1024, + signature_size_limit: 96, + owner_size_limit: 32, + payload_size_limit: 2 * 1024 * 1024, + }; let serialized = sign.serialize(); let mut cursor = Cursor::new(serialized); - let deserialized = Sign::deserialize(&mut cursor).unwrap(); + let deserialized = Sign::deserialize(&mut cursor, &net_config).unwrap(); assert_eq!(sign.owner, deserialized.owner); assert_eq!(sign.sig, deserialized.sig); @@ -213,9 +220,15 @@ mod tests { sign, }; + let net_config = NetConfig { + header_size_limit: 2 * 1024, + signature_size_limit: 96, + owner_size_limit: 32, + payload_size_limit: 2 * 1024 * 1024, + }; let serialized = message.serialize(); let mut cursor = Cursor::new(serialized); - let deserialized = Message::deserialize(&mut cursor).unwrap(); + let deserialized = Message::deserialize(&mut cursor, &net_config).unwrap(); assert_eq!(message.header_version, deserialized.header_version); assert_eq!(message.header, deserialized.header); diff --git a/shardus_net/src/shardus_net_listener.rs b/shardus_net/src/shardus_net_listener.rs index dac40ac..d08d8dc 100644 --- a/shardus_net/src/shardus_net_listener.rs +++ b/shardus_net/src/shardus_net_listener.rs @@ -1,10 +1,9 @@ +use super::runtime::RUNTIME; use crate::header::header_types::RequestMetadata; use crate::header_factory::header_deserialize_factory; use crate::message::Message; -use crate::{shardus_crypto, HEADER_SIZE_LIMIT_IN_BYTES}; - -use super::runtime::RUNTIME; - +use crate::shardus_crypto; +use crate::NetConfig; use log::{error, info}; use std::io::Cursor; use std::net::{SocketAddr, ToSocketAddrs}; @@ -18,7 +17,7 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; pub struct ShardusNetListener { address: SocketAddr, - payload_size_limit: usize, + net_config: NetConfig, } #[derive(Error, Debug)] @@ -35,31 +34,31 @@ pub enum ListenerError { type ListenerResult = Result; impl ShardusNetListener { - pub fn new(address: A, payload_size_limit: usize) -> Result { + pub fn new(address: A, net_config: NetConfig) -> Result { let mut addresses = address.to_socket_addrs().map_err(|_| ())?; let address = addresses.next().ok_or(())?; - Ok(Self { address, payload_size_limit }) + Ok(Self { address, net_config }) } pub fn listen(&self) -> UnboundedReceiver<(String, SocketAddr, Option)> { - Self::spawn_listener(self.address, self.payload_size_limit) + Self::spawn_listener(self.address, self.net_config.clone()) } - fn spawn_listener(address: SocketAddr, payload_size_limit: usize) -> UnboundedReceiver<(String, SocketAddr, Option)> { + fn spawn_listener(address: SocketAddr, net_config: NetConfig) -> UnboundedReceiver<(String, SocketAddr, Option)> { let (tx, rx) = unbounded_channel(); - RUNTIME.spawn(Self::bind_to_socket(address, tx, payload_size_limit)); + RUNTIME.spawn(Self::bind_to_socket(address, tx, net_config)); rx } - async fn bind_to_socket(address: SocketAddr, tx: UnboundedSender<(String, SocketAddr, Option)>, payload_size_limit: usize) { + async fn bind_to_socket(address: SocketAddr, tx: UnboundedSender<(String, SocketAddr, Option)>, net_config: NetConfig) { loop { let listener = TcpListener::bind(address).await; match listener { Ok(listener) => { let tx = tx.clone(); - match Self::accept_connections(listener, tx, payload_size_limit).await { + match Self::accept_connections(listener, tx, net_config.clone()).await { Ok(_) => unreachable!(), Err(err) => { error!("Failed to accept connection to {} due to {}", address, err) @@ -73,13 +72,14 @@ impl ShardusNetListener { } } - async fn accept_connections(listener: TcpListener, received_msg_tx: UnboundedSender<(String, SocketAddr, Option)>, payload_size_limit: usize) -> std::io::Result<()> { + async fn accept_connections(listener: TcpListener, received_msg_tx: UnboundedSender<(String, SocketAddr, Option)>, net_config: NetConfig) -> std::io::Result<()> { loop { let (socket, remote_addr) = listener.accept().await?; let received_msg_tx = received_msg_tx.clone(); + let net_config = net_config.clone(); RUNTIME.spawn(async move { - let result = Self::receive(socket, remote_addr, received_msg_tx, payload_size_limit).await; + let result = Self::receive(socket, remote_addr, received_msg_tx, &net_config).await; match result { Ok(_) => info!("Connection safely completed and shutdown with {}", remote_addr), Err(err) => { @@ -90,16 +90,11 @@ impl ShardusNetListener { } } - async fn receive( - socket_stream: TcpStream, - remote_addr: SocketAddr, - received_msg_tx: UnboundedSender<(String, SocketAddr, Option)>, - payload_size_limit: usize, - ) -> ListenerResult<()> { + async fn receive(socket_stream: TcpStream, remote_addr: SocketAddr, received_msg_tx: UnboundedSender<(String, SocketAddr, Option)>, net_config: &NetConfig) -> ListenerResult<()> { let mut socket_stream: TcpStream = socket_stream; while let Ok(msg_len) = socket_stream.read_u32().await { - if (msg_len as usize) > payload_size_limit { - error!("Message length exceeds the limit of {} bytes", payload_size_limit); + if (msg_len as usize) > net_config.payload_size_limit { + error!("Message length exceeds the limit of {} bytes", net_config.payload_size_limit); continue; } @@ -121,13 +116,7 @@ impl ShardusNetListener { let msg_bytes = &buffer[1..]; let mut cursor = Cursor::new(msg_bytes.to_vec()); - let message = Message::deserialize(&mut cursor).expect("Failed to deserialize message"); - - if message.header.len() > HEADER_SIZE_LIMIT_IN_BYTES { - error!("Header exceeds the limit of {} bytes", HEADER_SIZE_LIMIT_IN_BYTES); - continue; - } - + let message = Message::deserialize(&mut cursor, net_config.clone()).expect("Failed to deserialize message"); if !message.verify(shardus_crypto::get_shardus_crypto_instance()) { error!("Failed to verify message signature"); continue; @@ -135,7 +124,7 @@ impl ShardusNetListener { info!("Message verified!"); let header_cursor = &mut Cursor::new(message.header); - let header = header_deserialize_factory(message.header_version, header_cursor).expect("Failed to deserialize header"); + let header = header_deserialize_factory(message.header_version, header_cursor, &net_config).expect("Failed to deserialize header"); let data = message.data; diff --git a/src/index.ts b/src/index.ts index 14b2cb2..f9b47b0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -50,6 +50,9 @@ export const Sn = (opts: SnOpts) => { const HASH_KEY = opts.crypto.hashKey const SIGNING_SECRET_KEY_HEX = opts.crypto.signingSecretKeyHex const PAYLOAD_SIZE_LIMIT = opts.payloadSizeLimitInBytes || 2 * 1024 * 1024 // 2MB + const HEADER_SIZE_LIMIT = opts.headerSizeLimitInBytes || 2 * 1024 // 2KB + const SIGNATURE_SIZE_LIMIT = opts.signatureSizeLimitInBytes || 96 // 96 bytes + const OWNER_SIZE_LIMIT = opts.ownerSizeLimitInBytes || 32 // 32 bytes const HEADER_OPTS = opts.headerOpts || { sendHeaderVersion: 0, @@ -62,7 +65,10 @@ export const Sn = (opts: SnOpts) => { LRU_SIZE, HASH_KEY, SIGNING_SECRET_KEY_HEX, - PAYLOAD_SIZE_LIMIT + PAYLOAD_SIZE_LIMIT, + HEADER_SIZE_LIMIT, + SIGNATURE_SIZE_LIMIT, + OWNER_SIZE_LIMIT ) net.setLoggingEnabled(false) diff --git a/src/types.ts b/src/types.ts index 4c4c54e..4319d3c 100644 --- a/src/types.ts +++ b/src/types.ts @@ -61,6 +61,9 @@ export type SnOpts = { signingSecretKeyHex: string } payloadSizeLimitInBytes?: number + headerSizeLimitInBytes?: number + signatureSizeLimitInBytes?: number + ownerSizeLimitInBytes?: number } /** @@ -71,13 +74,37 @@ export type SnOpts = { export const validateSnOpts = (opts: SnOpts): void => { if (!opts) throw new Error('snq: must supply options') - if (!opts.port || typeof opts.port !== 'number') throw new Error('snq: must supply port') - - if (!opts.crypto.hashKey || typeof opts.crypto.hashKey !== 'string') - throw new Error('snq: must supply hashKey') - - if (opts.senderOpts && opts.senderOpts.useLruCache && !opts.senderOpts.lruSize) - throw new Error('snq: must supply lruSize when using lruCache') + const validations = [ + { condition: !opts.port || typeof opts.port !== 'number', message: 'snq: must supply port' }, + { + condition: !opts.crypto.hashKey || typeof opts.crypto.hashKey !== 'string', + message: 'snq: must supply hashKey', + }, + { + condition: opts.senderOpts?.useLruCache && !opts.senderOpts.lruSize, + message: 'snq: must supply lruSize when using lruCache', + }, + { + condition: opts.payloadSizeLimitInBytes && typeof opts.payloadSizeLimitInBytes !== 'number', + message: 'snq: payloadSizeLimitInBytes must be a number', + }, + { + condition: opts.headerSizeLimitInBytes && typeof opts.headerSizeLimitInBytes !== 'number', + message: 'snq: headerSizeLimitInBytes must be a number', + }, + { + condition: opts.signatureSizeLimitInBytes && typeof opts.signatureSizeLimitInBytes !== 'number', + message: 'snq: signatureSizeLimitInBytes must be a number', + }, + { + condition: opts.ownerSizeLimitInBytes && typeof opts.ownerSizeLimitInBytes !== 'number', + message: 'snq: ownerSizeLimitInBytes must be a number', + }, + ] + + for (const { condition, message } of validations) { + if (condition) throw new Error(message) + } } export interface RemoteSender { diff --git a/test/test_multi_send.ts b/test/test_multi_send.ts index cc383f5..79c77f6 100644 --- a/test/test_multi_send.ts +++ b/test/test_multi_send.ts @@ -1,8 +1,16 @@ import { Command } from 'commander' import { Sn } from '../.' -import { AppHeader, Sign } from '../build/src/types' -const setupLruSender = (port: number, lruSize: number) => { +const setupLruSender = ( + port: number, + lruSize: number, + limits: { + payloadSize?: number + headerSize?: number + signatureSize?: number + ownerSize?: number + } +) => { return Sn({ port, address: '127.0.0.1', @@ -18,6 +26,10 @@ const setupLruSender = (port: number, lruSize: number) => { headerOpts: { sendHeaderVersion: 1, }, + payloadSizeLimitInBytes: limits.payloadSize || 2 * 1024 * 1024, // Default 2MB + headerSizeLimitInBytes: limits.headerSize || 2 * 1024, // Default 2KB + signatureSizeLimitInBytes: limits.signatureSize || 96, // Default 96 bytes + ownerSizeLimitInBytes: limits.ownerSize || 32, // Default 32 bytes }) } @@ -26,10 +38,15 @@ const main = async () => { create a cli with the following options: -p, --port Port to listen on -c, --cache Size of the LRU cache + --payload-size Payload size limit in bytes + --header-size Header size limit in bytes + --signature-size Signature size limit in bytes + --owner-size Owner size limit in bytes the cli should create a sender with the following options: - lruSize: - port: + - limits: { payloadSize, headerSize, signatureSize, ownerSize } on running the cli a listener should be started and sending of message with input from terminal should be allowed */ @@ -37,8 +54,8 @@ const main = async () => { /* Commands to use for multi_send_with_header - ts-node test/test_multi_send.ts -p 44000 -c 2 - path/to/test_multi_send.ts -p -c + ts-node test/test_multi_send.ts -p 44000 -c 2 --payload-size 2097152 --header-size 2048 --signature-size 96 --owner-size 32 + path/to/test_multi_send.ts -p -c --payload-size --header-size --signature-size --owner-size data 3 ping @@ -49,14 +66,24 @@ const main = async () => { const program = new Command() program.requiredOption('-p, --port ', 'Port to listen on') program.option('-c, --cache ', 'Size of the LRU cache', '2') + program.option('--payload-size ', 'Payload size limit in bytes', '2097152') // Default 2MB + program.option('--header-size ', 'Header size limit in bytes', '2048') // Default 2KB + program.option('--signature-size ', 'Signature size limit in bytes', '96') // Default 96 bytes + program.option('--owner-size ', 'Owner size limit in bytes', '32') // Default 32 bytes program.parse(process.argv) const port = program.port.toString() const cacheSize = program.cache.toString() + const limits = { + payloadSize: +program.payloadSize, + headerSize: +program.headerSize, + signatureSize: +program.signatureSize, + ownerSize: +program.ownerSize, + } console.log(`Starting listener on port ${port} with cache size ${cacheSize}`) - - const sn = setupLruSender(+port, +cacheSize) + console.log(`Limits: ${JSON.stringify(limits, null, 2)}`) + const sn = setupLruSender(+port, +cacheSize, limits) const input = process.stdin input.addListener('data', async (data: Buffer) => { @@ -86,7 +113,7 @@ const main = async () => { }, 1000 ) - console.log('Message sent', message) + console.log('Message sent: ', message) } else if (inputs.length === 2) { sn.evictSocket(+inputs[1], '127.0.0.1') console.log('Cache cleared') @@ -100,7 +127,6 @@ const main = async () => { if (data && data.message === 'ping') { console.log('Received ping from:', data.fromPort) console.log('Ping header:', JSON.stringify(header, null, 2)) - // await sleep(10000) return respond( { message: 'pong', fromPort: +port }, { From f8e386b338c0728272de37c8ada1c1edcb9965e2 Mon Sep 17 00:00:00 2001 From: Jintu Das Date: Fri, 29 Nov 2024 18:40:49 +0530 Subject: [PATCH 03/10] Make sender id constant --- shardus_net/src/header/header_v1.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/shardus_net/src/header/header_v1.rs b/shardus_net/src/header/header_v1.rs index a24d6d2..610d442 100644 --- a/shardus_net/src/header/header_v1.rs +++ b/shardus_net/src/header/header_v1.rs @@ -24,6 +24,7 @@ pub struct HeaderV1 { pub compression: Compression, } +const SENDER_ID_SIZE: usize = 64; impl HeaderV1 { // Serialize the struct into a Vec pub fn serialize(&self) -> Vec { @@ -75,7 +76,7 @@ impl HeaderV1 { let mut sender_id_len_bytes = [0u8; 4]; cursor.read_exact(&mut sender_id_len_bytes).ok()?; let sender_id_len = u32::from_le_bytes(sender_id_len_bytes); - check_variable_size(sender_id_len, 64); + check_variable_size(sender_id_len, SENDER_ID_SIZE); let mut sender_id_bytes = vec![0u8; sender_id_len as usize]; cursor.read_exact(&mut sender_id_bytes).ok()?; let sender_id = String::from_utf8(sender_id_bytes).ok()?; From 0d6bf78a0f3b10306a5cd61f55b4e09d32edfb1d Mon Sep 17 00:00:00 2001 From: Jintu Das Date: Fri, 29 Nov 2024 19:17:24 +0530 Subject: [PATCH 04/10] Make sign params constant --- shardus_net/src/lib.rs | 8 ++------ shardus_net/src/message.rs | 16 ++++++---------- src/index.ts | 6 +----- src/types.ts | 10 ---------- test/test_multi_send.ts | 14 +++----------- 5 files changed, 12 insertions(+), 42 deletions(-) diff --git a/shardus_net/src/lib.rs b/shardus_net/src/lib.rs index 6521f8e..18eab6e 100644 --- a/shardus_net/src/lib.rs +++ b/shardus_net/src/lib.rs @@ -42,10 +42,10 @@ const ENABLE_COMPRESSION: bool = false; #[derive(Clone)] pub struct NetConfig { pub header_size_limit: usize, - pub signature_size_limit: usize, - pub owner_size_limit: usize, pub payload_size_limit: usize, } +const SIGNATURE_SIZE_LIMIT_IN_BYTES: usize = 96; +const OWNER_SIZE_LIMIT_IN_BYTES: usize = 32; fn create_shardus_net(mut cx: FunctionContext) -> JsResult { let cx = &mut cx; @@ -59,13 +59,9 @@ fn create_shardus_net(mut cx: FunctionContext) -> JsResult { let hex_signing_sk = cx.argument::(5)?.value(cx); let payload_size_limit = cx.argument::(6)?.value(cx) as usize; let header_size_limit = cx.argument::(7)?.value(cx) as usize; - let signature_size_limit = cx.argument::(8)?.value(cx) as usize; - let owner_size_limit = cx.argument::(9)?.value(cx) as usize; let net_config = NetConfig { header_size_limit, - signature_size_limit, - owner_size_limit, payload_size_limit, }; diff --git a/shardus_net/src/message.rs b/shardus_net/src/message.rs index 39db853..16275cd 100644 --- a/shardus_net/src/message.rs +++ b/shardus_net/src/message.rs @@ -1,7 +1,7 @@ use std::io::{Cursor, Read, Write}; -use crate::check_variable_size; use crate::NetConfig; +use crate::{check_variable_size, OWNER_SIZE_LIMIT_IN_BYTES, SIGNATURE_SIZE_LIMIT_IN_BYTES}; use crypto::Format::Buffer; use crypto::{KeyPair, ShardusCrypto}; @@ -111,7 +111,7 @@ impl Message { let data = data_bytes; // Deserialize sign - let sign = Sign::deserialize(cursor, net_config)?; + let sign = Sign::deserialize(cursor)?; Some(Message::new(header_version, header, data, sign)) } @@ -140,12 +140,12 @@ impl Sign { buffer } - pub fn deserialize(cursor: &mut Cursor>, net_config: NetConfig) -> Option { + pub fn deserialize(cursor: &mut Cursor>) -> Option { // Deserialize owner let mut owner_len_bytes = [0u8; 4]; cursor.read_exact(&mut owner_len_bytes).ok()?; let owner_len = u32::from_le_bytes(owner_len_bytes); - check_variable_size(owner_len, net_config.owner_size_limit); + check_variable_size(owner_len, OWNER_SIZE_LIMIT_IN_BYTES); let mut owner_bytes = vec![0u8; owner_len as usize]; cursor.read_exact(&mut owner_bytes).ok()?; let owner = owner_bytes; @@ -154,7 +154,7 @@ impl Sign { let mut signature_len_bytes = [0u8; 4]; cursor.read_exact(&mut signature_len_bytes).ok()?; let signature_len = u32::from_le_bytes(signature_len_bytes); - check_variable_size(signature_len, net_config.signature_size_limit); + check_variable_size(signature_len, SIGNATURE_SIZE_LIMIT_IN_BYTES); let mut signature_bytes = vec![0u8; signature_len as usize]; cursor.read_exact(&mut signature_bytes).ok()?; let signature = signature_bytes; @@ -194,13 +194,11 @@ mod tests { }; let net_config = NetConfig { header_size_limit: 2 * 1024, - signature_size_limit: 96, - owner_size_limit: 32, payload_size_limit: 2 * 1024 * 1024, }; let serialized = sign.serialize(); let mut cursor = Cursor::new(serialized); - let deserialized = Sign::deserialize(&mut cursor, &net_config).unwrap(); + let deserialized = Sign::deserialize(&mut cursor).unwrap(); assert_eq!(sign.owner, deserialized.owner); assert_eq!(sign.sig, deserialized.sig); @@ -222,8 +220,6 @@ mod tests { let net_config = NetConfig { header_size_limit: 2 * 1024, - signature_size_limit: 96, - owner_size_limit: 32, payload_size_limit: 2 * 1024 * 1024, }; let serialized = message.serialize(); diff --git a/src/index.ts b/src/index.ts index f9b47b0..1b086cb 100644 --- a/src/index.ts +++ b/src/index.ts @@ -51,8 +51,6 @@ export const Sn = (opts: SnOpts) => { const SIGNING_SECRET_KEY_HEX = opts.crypto.signingSecretKeyHex const PAYLOAD_SIZE_LIMIT = opts.payloadSizeLimitInBytes || 2 * 1024 * 1024 // 2MB const HEADER_SIZE_LIMIT = opts.headerSizeLimitInBytes || 2 * 1024 // 2KB - const SIGNATURE_SIZE_LIMIT = opts.signatureSizeLimitInBytes || 96 // 96 bytes - const OWNER_SIZE_LIMIT = opts.ownerSizeLimitInBytes || 32 // 32 bytes const HEADER_OPTS = opts.headerOpts || { sendHeaderVersion: 0, @@ -66,9 +64,7 @@ export const Sn = (opts: SnOpts) => { HASH_KEY, SIGNING_SECRET_KEY_HEX, PAYLOAD_SIZE_LIMIT, - HEADER_SIZE_LIMIT, - SIGNATURE_SIZE_LIMIT, - OWNER_SIZE_LIMIT + HEADER_SIZE_LIMIT ) net.setLoggingEnabled(false) diff --git a/src/types.ts b/src/types.ts index 4319d3c..b3aa218 100644 --- a/src/types.ts +++ b/src/types.ts @@ -62,8 +62,6 @@ export type SnOpts = { } payloadSizeLimitInBytes?: number headerSizeLimitInBytes?: number - signatureSizeLimitInBytes?: number - ownerSizeLimitInBytes?: number } /** @@ -92,14 +90,6 @@ export const validateSnOpts = (opts: SnOpts): void => { condition: opts.headerSizeLimitInBytes && typeof opts.headerSizeLimitInBytes !== 'number', message: 'snq: headerSizeLimitInBytes must be a number', }, - { - condition: opts.signatureSizeLimitInBytes && typeof opts.signatureSizeLimitInBytes !== 'number', - message: 'snq: signatureSizeLimitInBytes must be a number', - }, - { - condition: opts.ownerSizeLimitInBytes && typeof opts.ownerSizeLimitInBytes !== 'number', - message: 'snq: ownerSizeLimitInBytes must be a number', - }, ] for (const { condition, message } of validations) { diff --git a/test/test_multi_send.ts b/test/test_multi_send.ts index 79c77f6..0e06c92 100644 --- a/test/test_multi_send.ts +++ b/test/test_multi_send.ts @@ -28,8 +28,6 @@ const setupLruSender = ( }, payloadSizeLimitInBytes: limits.payloadSize || 2 * 1024 * 1024, // Default 2MB headerSizeLimitInBytes: limits.headerSize || 2 * 1024, // Default 2KB - signatureSizeLimitInBytes: limits.signatureSize || 96, // Default 96 bytes - ownerSizeLimitInBytes: limits.ownerSize || 32, // Default 32 bytes }) } @@ -40,13 +38,11 @@ const main = async () => { -c, --cache Size of the LRU cache --payload-size Payload size limit in bytes --header-size Header size limit in bytes - --signature-size Signature size limit in bytes - --owner-size Owner size limit in bytes the cli should create a sender with the following options: - lruSize: - port: - - limits: { payloadSize, headerSize, signatureSize, ownerSize } + - limits: { payloadSize, headerSize} on running the cli a listener should be started and sending of message with input from terminal should be allowed */ @@ -54,8 +50,8 @@ const main = async () => { /* Commands to use for multi_send_with_header - ts-node test/test_multi_send.ts -p 44000 -c 2 --payload-size 2097152 --header-size 2048 --signature-size 96 --owner-size 32 - path/to/test_multi_send.ts -p -c --payload-size --header-size --signature-size --owner-size + ts-node test/test_multi_send.ts -p 44000 -c 2 --payload-size 2097152 --header-size 2048 + path/to/test_multi_send.ts -p -c --payload-size --header-size data 3 ping @@ -68,8 +64,6 @@ const main = async () => { program.option('-c, --cache ', 'Size of the LRU cache', '2') program.option('--payload-size ', 'Payload size limit in bytes', '2097152') // Default 2MB program.option('--header-size ', 'Header size limit in bytes', '2048') // Default 2KB - program.option('--signature-size ', 'Signature size limit in bytes', '96') // Default 96 bytes - program.option('--owner-size ', 'Owner size limit in bytes', '32') // Default 32 bytes program.parse(process.argv) const port = program.port.toString() @@ -77,8 +71,6 @@ const main = async () => { const limits = { payloadSize: +program.payloadSize, headerSize: +program.headerSize, - signatureSize: +program.signatureSize, - ownerSize: +program.ownerSize, } console.log(`Starting listener on port ${port} with cache size ${cacheSize}`) From 9b9b9ff2138dec3b1cf766acd91e9b655da3b20e Mon Sep 17 00:00:00 2001 From: Jintu Das Date: Fri, 29 Nov 2024 19:22:47 +0530 Subject: [PATCH 05/10] Update tests --- shardus_net/src/header/header_v1.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/shardus_net/src/header/header_v1.rs b/shardus_net/src/header/header_v1.rs index 610d442..6107ecd 100644 --- a/shardus_net/src/header/header_v1.rs +++ b/shardus_net/src/header/header_v1.rs @@ -150,8 +150,6 @@ mod tests { }; let net_config = NetConfig { header_size_limit: 2 * 1024, - signature_size_limit: 96, - owner_size_limit: 32, payload_size_limit: 2 * 1024 * 1024, }; let serialized = header.serialize(); @@ -203,8 +201,6 @@ mod tests { fn test_check_variable_size_panic() { let net_config = NetConfig { header_size_limit: 2 * 1024, - signature_size_limit: 96, - owner_size_limit: 32, payload_size_limit: 2 * 1024 * 1024, }; // Define a variable length that exceeds the limit @@ -218,8 +214,6 @@ mod tests { fn test_check_variable_size_no_panic() { let net_config = NetConfig { header_size_limit: 2 * 1024, - signature_size_limit: 96, - owner_size_limit: 32, payload_size_limit: 2 * 1024 * 1024, }; // Define a variable length within the limit : 2048 (0x800) From 29893c0a7e235b2bba6cd2e640fa3d5171cc2c2c Mon Sep 17 00:00:00 2001 From: Jintu Das Date: Fri, 29 Nov 2024 19:27:16 +0530 Subject: [PATCH 06/10] Update test files --- test/setup_sender.ts | 2 -- test/test_bombardment.ts | 2 -- test/test_headers.ts | 1 - test/test_lru.ts | 1 - test/test_server.js | 1 - 5 files changed, 7 deletions(-) diff --git a/test/setup_sender.ts b/test/setup_sender.ts index f8c79e2..5193576 100644 --- a/test/setup_sender.ts +++ b/test/setup_sender.ts @@ -18,7 +18,6 @@ const setupLruSender = () => { 'c3774b92cc8850fb4026b073081290b82cab3c0f66cac250b4d710ee9aaf83ed8088b37f6f458104515ae18c2a05bde890199322f62ab5114d20c77bde5e6c9d', hashKey: '69fa4195670576c0160d660c3be36556ff8d504725be8a59b5a96509e0c994bc', }, - payloadSizeLimitInBytes: PAYLOAD_SIZE_LIMIT, }) } else { return Sn({ @@ -28,7 +27,6 @@ const setupLruSender = () => { 'c3774b92cc8850fb4026b073081290b82cab3c0f66cac250b4d710ee9aaf83ed8088b37f6f458104515ae18c2a05bde890199322f62ab5114d20c77bde5e6c9d', hashKey: '69fa4195670576c0160d660c3be36556ff8d504725be8a59b5a96509e0c994bc', }, - payloadSizeLimitInBytes: PAYLOAD_SIZE_LIMIT, }) } } diff --git a/test/test_bombardment.ts b/test/test_bombardment.ts index 00b50b3..0d9a876 100644 --- a/test/test_bombardment.ts +++ b/test/test_bombardment.ts @@ -10,7 +10,6 @@ const TARGET_SOCKET_PORT = 49152 // Internal port of the validator to be bombard const MESSAGE_JSON = { route: 'bombardment-test', payload: 'Hello, world!' } // Message to be sent to the validator const RAMP_UP_STRATEGY: 'linear' | 'none' = 'none' // Ramp up strategy to be used for the bombardment const RAMP_UP_EVERY_X_BOMBS = 10 // Number of bombs to be sent before ramping up the number of socket clients -const PAYLOAD_SIZE_LIMIT = 2 * 1024 * 1024 // Payload size limit in bytes // Test variables @@ -34,7 +33,6 @@ function setupSocketClients() { 'c3774b92cc8850fb4026b073081290b82cab3c0f66cac250b4d710ee9aaf83ed8088b37f6f458104515ae18c2a05bde890199322f62ab5114d20c77bde5e6c9d', hashKey: '69fa4195670576c0160d660c3be36556ff8d504725be8a59b5a96509e0c994bc', }, - payloadSizeLimitInBytes: PAYLOAD_SIZE_LIMIT, }) ) } diff --git a/test/test_headers.ts b/test/test_headers.ts index 128b47f..ab74eaf 100644 --- a/test/test_headers.ts +++ b/test/test_headers.ts @@ -11,7 +11,6 @@ const setupSender = (port: number, senderOpts: any, headerOpts: any) => { 'c3774b92cc8850fb4026b073081290b82cab3c0f66cac250b4d710ee9aaf83ed8088b37f6f458104515ae18c2a05bde890199322f62ab5114d20c77bde5e6c9d', hashKey: '69fa4195670576c0160d660c3be36556ff8d504725be8a59b5a96509e0c994bc', }, - payloadSizeLimitInBytes: 2 * 1024 * 1024, }) } diff --git a/test/test_lru.ts b/test/test_lru.ts index 857dc13..46f0ed5 100644 --- a/test/test_lru.ts +++ b/test/test_lru.ts @@ -18,7 +18,6 @@ const setupLruSender = (port: number, lruSize: number) => { headerOpts: { sendHeaderVersion: 1, }, - payloadSizeLimitInBytes: 3 * 1024 * 1024, }) } diff --git a/test/test_server.js b/test/test_server.js index 4a39064..f6cf2c7 100644 --- a/test/test_server.js +++ b/test/test_server.js @@ -7,7 +7,6 @@ const port = 5001 const sn = Sn({ address, port, - payloadSizeLimitInBytes: 2 * 1024 * 1024, }) const RESPONSE_DELAY_MILLIS = 1000 From 5895335f708fcebf55ad25365120d65aeb174d28 Mon Sep 17 00:00:00 2001 From: Jintu Das Date: Fri, 29 Nov 2024 19:28:20 +0530 Subject: [PATCH 07/10] Fix test_multi_send bug --- test/setup_sender.ts | 1 - test/test_multi_send.ts | 2 -- 2 files changed, 3 deletions(-) diff --git a/test/setup_sender.ts b/test/setup_sender.ts index 5193576..04dfa4a 100644 --- a/test/setup_sender.ts +++ b/test/setup_sender.ts @@ -2,7 +2,6 @@ import { Sn } from '../.' const RESPONSE_DELAY_MILLIS = 500 const USE_LRU_CACHE = true -const PAYLOAD_SIZE_LIMIT = 2 * 1024 * 1024 const setupLruSender = () => { const port = 49152 diff --git a/test/test_multi_send.ts b/test/test_multi_send.ts index 0e06c92..a1607bc 100644 --- a/test/test_multi_send.ts +++ b/test/test_multi_send.ts @@ -7,8 +7,6 @@ const setupLruSender = ( limits: { payloadSize?: number headerSize?: number - signatureSize?: number - ownerSize?: number } ) => { return Sn({ From bdb15c9f486cf97753ad66c5aa8e8bbb1041180a Mon Sep 17 00:00:00 2001 From: Jintu Das Date: Mon, 2 Dec 2024 12:48:43 +0530 Subject: [PATCH 08/10] Move payload and header sizes under same config --- src/index.ts | 4 ++-- src/types.ts | 14 ++++++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/index.ts b/src/index.ts index 1b086cb..fa79a9d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -49,8 +49,8 @@ export const Sn = (opts: SnOpts) => { const LRU_SIZE = (opts.senderOpts && opts.senderOpts.lruSize) || 1028 const HASH_KEY = opts.crypto.hashKey const SIGNING_SECRET_KEY_HEX = opts.crypto.signingSecretKeyHex - const PAYLOAD_SIZE_LIMIT = opts.payloadSizeLimitInBytes || 2 * 1024 * 1024 // 2MB - const HEADER_SIZE_LIMIT = opts.headerSizeLimitInBytes || 2 * 1024 // 2KB + const PAYLOAD_SIZE_LIMIT = opts.payloadOpts?.payloadSizeLimitInBytes || 2 * 1024 * 1024 // 2MB + const HEADER_SIZE_LIMIT = opts.payloadOpts?.headerSizeLimitInBytes || 2 * 1024 // 2KB const HEADER_OPTS = opts.headerOpts || { sendHeaderVersion: 0, diff --git a/src/types.ts b/src/types.ts index b3aa218..78448f5 100644 --- a/src/types.ts +++ b/src/types.ts @@ -60,8 +60,10 @@ export type SnOpts = { hashKey: string signingSecretKeyHex: string } - payloadSizeLimitInBytes?: number - headerSizeLimitInBytes?: number + payloadOpts?: { + payloadSizeLimitInBytes?: number + headerSizeLimitInBytes?: number + } } /** @@ -83,11 +85,15 @@ export const validateSnOpts = (opts: SnOpts): void => { message: 'snq: must supply lruSize when using lruCache', }, { - condition: opts.payloadSizeLimitInBytes && typeof opts.payloadSizeLimitInBytes !== 'number', + condition: + opts.payloadOpts?.payloadSizeLimitInBytes && + typeof opts.payloadOpts.payloadSizeLimitInBytes !== 'number', message: 'snq: payloadSizeLimitInBytes must be a number', }, { - condition: opts.headerSizeLimitInBytes && typeof opts.headerSizeLimitInBytes !== 'number', + condition: + opts.payloadOpts?.headerSizeLimitInBytes && + typeof opts.payloadOpts.headerSizeLimitInBytes !== 'number', message: 'snq: headerSizeLimitInBytes must be a number', }, ] From 9a63262dc1ea8ffdd2fa460e773e5cabd69ad613 Mon Sep 17 00:00:00 2001 From: Jintu Das Date: Mon, 2 Dec 2024 12:57:35 +0530 Subject: [PATCH 09/10] Update test file --- test/test_multi_send.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/test_multi_send.ts b/test/test_multi_send.ts index a1607bc..f115b96 100644 --- a/test/test_multi_send.ts +++ b/test/test_multi_send.ts @@ -24,8 +24,10 @@ const setupLruSender = ( headerOpts: { sendHeaderVersion: 1, }, - payloadSizeLimitInBytes: limits.payloadSize || 2 * 1024 * 1024, // Default 2MB - headerSizeLimitInBytes: limits.headerSize || 2 * 1024, // Default 2KB + payloadOpts: { + payloadSizeLimitInBytes: limits.payloadSize || 2 * 1024 * 1024, // Default 2MB + headerSizeLimitInBytes: limits.headerSize || 2 * 1024, // Default 2KB + }, }) } From d0c3978774834bf691e961d2c059f9a3f2995519 Mon Sep 17 00:00:00 2001 From: Jintu Das Date: Mon, 2 Dec 2024 14:53:09 +0530 Subject: [PATCH 10/10] Remove unnecessary clone for NetConfig --- shardus_net/src/header/header_v1.rs | 4 ++-- shardus_net/src/header_factory.rs | 4 ++-- shardus_net/src/lib.rs | 2 +- shardus_net/src/message.rs | 2 +- shardus_net/src/shardus_net_listener.rs | 10 +++++----- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/shardus_net/src/header/header_v1.rs b/shardus_net/src/header/header_v1.rs index 6107ecd..00aa0f8 100644 --- a/shardus_net/src/header/header_v1.rs +++ b/shardus_net/src/header/header_v1.rs @@ -61,7 +61,7 @@ impl HeaderV1 { } // Deserialize a Vec cursor into a HeaderV1 struct - pub fn deserialize(cursor: &mut Cursor>, net_config: &NetConfig) -> Option { + pub fn deserialize(cursor: &mut Cursor>, net_config: NetConfig) -> Option { // Deserialize uuid let mut uuid_bytes = [0u8; 16]; cursor.read_exact(&mut uuid_bytes).ok()?; @@ -154,7 +154,7 @@ mod tests { }; let serialized = header.serialize(); let mut cursor = Cursor::new(serialized); - let deserialized = HeaderV1::deserialize(&mut cursor, &net_config).unwrap(); + let deserialized = HeaderV1::deserialize(&mut cursor, net_config).unwrap(); assert_eq!(header.uuid, deserialized.uuid); assert_eq!(header.message_length, deserialized.message_length); diff --git a/shardus_net/src/header_factory.rs b/shardus_net/src/header_factory.rs index 3db95ff..0e2bf63 100644 --- a/shardus_net/src/header_factory.rs +++ b/shardus_net/src/header_factory.rs @@ -11,10 +11,10 @@ pub fn wrap_serialized_message(mut serialized_message: Vec) -> Vec { buffer } -pub fn header_deserialize_factory(version: u8, serialized_header_cursor: &mut Cursor>, net_config: &NetConfig) -> Option
{ +pub fn header_deserialize_factory(version: u8, serialized_header_cursor: &mut Cursor>, net_config: NetConfig) -> Option
{ match version { 1 => { - let deserialized = HeaderV1::deserialize(serialized_header_cursor, &net_config)?; + let deserialized = HeaderV1::deserialize(serialized_header_cursor, net_config)?; Some(Header::V1(deserialized)) } _ => None, diff --git a/shardus_net/src/lib.rs b/shardus_net/src/lib.rs index 18eab6e..c8e2cd4 100644 --- a/shardus_net/src/lib.rs +++ b/shardus_net/src/lib.rs @@ -39,7 +39,7 @@ use crate::shardus_net_sender::Connection; const ENABLE_COMPRESSION: bool = false; -#[derive(Clone)] +#[derive(Copy, Clone)] pub struct NetConfig { pub header_size_limit: usize, pub payload_size_limit: usize, diff --git a/shardus_net/src/message.rs b/shardus_net/src/message.rs index 16275cd..e9b9fe9 100644 --- a/shardus_net/src/message.rs +++ b/shardus_net/src/message.rs @@ -224,7 +224,7 @@ mod tests { }; let serialized = message.serialize(); let mut cursor = Cursor::new(serialized); - let deserialized = Message::deserialize(&mut cursor, &net_config).unwrap(); + let deserialized = Message::deserialize(&mut cursor, net_config).unwrap(); assert_eq!(message.header_version, deserialized.header_version); assert_eq!(message.header, deserialized.header); diff --git a/shardus_net/src/shardus_net_listener.rs b/shardus_net/src/shardus_net_listener.rs index d08d8dc..f015413 100644 --- a/shardus_net/src/shardus_net_listener.rs +++ b/shardus_net/src/shardus_net_listener.rs @@ -42,7 +42,7 @@ impl ShardusNetListener { } pub fn listen(&self) -> UnboundedReceiver<(String, SocketAddr, Option)> { - Self::spawn_listener(self.address, self.net_config.clone()) + Self::spawn_listener(self.address, self.net_config) } fn spawn_listener(address: SocketAddr, net_config: NetConfig) -> UnboundedReceiver<(String, SocketAddr, Option)> { @@ -79,7 +79,7 @@ impl ShardusNetListener { let net_config = net_config.clone(); RUNTIME.spawn(async move { - let result = Self::receive(socket, remote_addr, received_msg_tx, &net_config).await; + let result = Self::receive(socket, remote_addr, received_msg_tx, net_config).await; match result { Ok(_) => info!("Connection safely completed and shutdown with {}", remote_addr), Err(err) => { @@ -90,7 +90,7 @@ impl ShardusNetListener { } } - async fn receive(socket_stream: TcpStream, remote_addr: SocketAddr, received_msg_tx: UnboundedSender<(String, SocketAddr, Option)>, net_config: &NetConfig) -> ListenerResult<()> { + async fn receive(socket_stream: TcpStream, remote_addr: SocketAddr, received_msg_tx: UnboundedSender<(String, SocketAddr, Option)>, net_config: NetConfig) -> ListenerResult<()> { let mut socket_stream: TcpStream = socket_stream; while let Ok(msg_len) = socket_stream.read_u32().await { if (msg_len as usize) > net_config.payload_size_limit { @@ -116,7 +116,7 @@ impl ShardusNetListener { let msg_bytes = &buffer[1..]; let mut cursor = Cursor::new(msg_bytes.to_vec()); - let message = Message::deserialize(&mut cursor, net_config.clone()).expect("Failed to deserialize message"); + let message = Message::deserialize(&mut cursor, net_config).expect("Failed to deserialize message"); if !message.verify(shardus_crypto::get_shardus_crypto_instance()) { error!("Failed to verify message signature"); continue; @@ -124,7 +124,7 @@ impl ShardusNetListener { info!("Message verified!"); let header_cursor = &mut Cursor::new(message.header); - let header = header_deserialize_factory(message.header_version, header_cursor, &net_config).expect("Failed to deserialize header"); + let header = header_deserialize_factory(message.header_version, header_cursor, net_config).expect("Failed to deserialize header"); let data = message.data;