From 523de23809e378e33c377757ad7f08185ce04d5b Mon Sep 17 00:00:00 2001 From: Your Name Date: Sat, 14 Sep 2024 13:31:20 -0400 Subject: [PATCH] [CLEANUP] --- pyproject.toml | 15 +- requirements.txt | 17 +- swarms_torch/ant_colony_swarm.py | 4 +- swarms_torch/drone_swarm.py | 376 ---------------------------- swarms_torch/firefly.py | 3 +- swarms_torch/mas_model.py | 200 +++++++++++++++ swarms_torch/queen_bee.py | 3 +- swarms_torch/spiral_optimization.py | 3 +- swarms_torch/structs/basic_nn.py | 0 test.py | 341 +++++++++++++++++++++++++ 10 files changed, 567 insertions(+), 395 deletions(-) delete mode 100644 swarms_torch/drone_swarm.py create mode 100644 swarms_torch/mas_model.py delete mode 100644 swarms_torch/structs/basic_nn.py create mode 100644 test.py diff --git a/pyproject.toml b/pyproject.toml index 91f8c3e..7956a36 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,12 +4,12 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms-torch" -version = "0.2.1" +version = "0.2.2" description = "swarms-torch - Pytorch" license = "MIT" authors = ["Kye Gomez "] homepage = "https://github.com/kyegomez/swarms-pytorch" -documentation = "" # Add this if you have documentation. +documentation = "https://github.com/kyegomez/swarms-pytorch" # Add this if you have documentation. readme = "README.md" # Assuming you have a README.md repository = "https://github.com/kyegomez/swarms-pytorch" keywords = ["artificial intelligence", "deep learning", "optimizers", "Prompt Engineering"] @@ -29,10 +29,13 @@ packages = [ [tool.poetry.dependencies] python = "^3.6" -torch = "2.1.2" -einops = "0.7.0" -zetascale = "1.4.4" -pytest = "7.4.2" +torch = "*" +einops = "*" +zetascale = "*" +pytest = "*" +torchvision = "*" +loguru = "*" +einx = "*" diff --git a/requirements.txt b/requirements.txt index 32271e9..ef388af 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,9 @@ -torch==2.1.2 -einops==0.7.0 -pandas==2.2.1 -zetascale==1.4.4 -pytest==7.4.2 -mkdocs -mkdocs-material -mkdocs-glightbox +python +torch +einops +zetascale +pytest +torchvision +loguru +einx + diff --git a/swarms_torch/ant_colony_swarm.py b/swarms_torch/ant_colony_swarm.py index 65c7eb6..487e650 100644 --- a/swarms_torch/ant_colony_swarm.py +++ b/swarms_torch/ant_colony_swarm.py @@ -1,7 +1,7 @@ import torch +from torch import nn - -class AntColonyOptimization: +class AntColonyOptimization(nn.Module): """ Ant Colony Optimization Overview: https://en.wikipedia.org/wiki/Ant_colony_optimization_algorithms diff --git a/swarms_torch/drone_swarm.py b/swarms_torch/drone_swarm.py deleted file mode 100644 index 85768d4..0000000 --- a/swarms_torch/drone_swarm.py +++ /dev/null @@ -1,376 +0,0 @@ -import torch -from torch import nn, Tensor -from dataclasses import dataclass -from zeta.nn import FeedForward -from typing import Any -import torch.nn.functional as F - - -OBST_COLOR_3 = (0.0, 0.5, 0.0) -OBST_COLOR_4 = (0.0, 0.5, 0.0, 1.0) - - -QUADS_OBS_REPR = { - "xyz_vxyz_R_omega": 18, - "xyz_vxyz_R_omega_floor": 19, - "xyz_vxyz_R_omega_wall": 24, -} - -QUADS_NEIGHBOR_OBS_TYPE = { - "none": 0, - "pos_vel": 6, -} - -QUADS_OBSTACLE_OBS_TYPE = { - "none": 0, - "octomap": 9, -} - - -@dataclass -class OneHeadAttention(nn.Module): - """ - OneHeadAttention module performs self-attention operation on input tensors. - - Args: - dim (int): The dimension of the input tensors. - - Attributes: - w_qs (nn.Linear): Linear layer for queries transformation. - w_ks (nn.Linear): Linear layer for keys transformation. - w_vs (nn.Linear): Linear layer for values transformation. - fc (nn.Linear): Linear layer for final transformation. - ln (nn.LayerNorm): Layer normalization for output. - - Methods: - forward(q, k, v): Performs forward pass of the self-attention operation. - - """ - - dim: int - - def __post_init_(self): - self.w_qs = nn.Linear(self.dim, self.dim, bias=False) - self.w_ks = nn.Linear(self.dim, self.dim, bias=False) - self.w_vs = nn.Linear(self.dim, self.dim, bias=False) - - self.fc = nn.Linear(self.dim, self.dim, bias=False) - self.ln = nn.LayerNorm(self.dim, eps=1e-6) - - def forward(self, q, k, v): - """ - Performs forward pass of the self-attention operation. - - Args: - q (torch.Tensor): The query tensor. - k (torch.Tensor): The key tensor. - v (torch.Tensor): The value tensor. - - Returns: - q (torch.Tensor): The output tensor after self-attention operation. - attn (torch.Tensor): The attention weights. - - """ - residual = q - - # Pre attn ops - q = self.w_qs(q) - k = self.w_ks(k) - v = self.w_vs(v) - - # Compute attention weights using queries and keys - attn = torch.matmul(q / (self.dim**-0.5), k.tranpose(-1, -2)) - attn = F.softmax(attn, dim=-1) - q = torch.matmul(attn, v) - q = self.fc(q) - q += residual - q = self.ln(q) - return q, attn - - -def estimate_neuron_score(act): - reduce_axes = list(range(act.dim() - 1)) - score = torch.mean(torch.abs(act), dim=reduce_axes) - return score - - -@dataclass -class SwarmNeighborhoodEncoder(nn.Module): - """ - A class representing the encoder for swarm neighborhood observations. - - Args: - self_obs_dim (int): The dimension of the self-observation. - neighbor_obs_dim (int): The dimension of the neighbor observations. - neighbor_hidden_size (int): The hidden size of the neighbor encoder. - num_use_neighbor_obs (int): The number of neighbor observations to use. - """ - - self_obs_dim: int - neighbor_obs_dim: int - neighbor_hidden_size: int - num_use_neighbor_obs: int - - -@dataclass -class SwarmNeighborhoodEncoderDeepsets(SwarmNeighborhoodEncoder): - neighbor_obs_dim: int - neighbor_hidden_size: int - self_obs_dim: int - num_use_neighbor_obs: int - mult: int = 4 - args: dict = None - - def __post_init__(self): - self.ffn = FeedForward( - self.neighbor_obs_dim, - self.neighbor_hidden_size, - self.mult, - self.args, - ) - - def forward( - self, - self_obs: Tensor, - obs: Tensor, - all_neighbor_obs_size: int, - batch: int, - ) -> Tensor: - """ - Forward pass of the SwarmNeighborhoodEncoder. - - Args: - self_obs (Tensor): Self observation tensor. - obs (Tensor): Observation tensor. - all_neighbor_obs_size (int): Size of all neighbor observations. - batch (int): Batch size. - - Returns: - Tensor: Mean embedding tensor. - """ - obs_neighbors = obs[ - :, self.self_obs_dim : self.self_obs_dim + all_neighbor_obs_size - ] - obs_neighbors = obs_neighbors.reshape(-1, self.neighbor_obs_dim) - neighbor_embeds = self.embedding_mlp(obs_neighbors) - neighbor_embeds = neighbor_embeds.reshape( - batch, -1, self.neighbor_hidden_size - ) - mean_embed = torch.mean(neighbor_embeds, dim=1) - return mean_embed - - -@dataclass -class SwarmNeighborhoodEncoderAttention(SwarmNeighborhoodEncoder): - """ - A class that represents a swarm neighborhood encoder with attention mechanism. - - Args: - neighbor_obs_dim (int): The dimension of the neighbor observations. - neighbor_hidden_size (int): The hidden size of the neighbor encoder. - self_obs_dim (int): The dimension of the self observations. - num_use_neighbor_obs (int): The number of neighbor observations to use. - mult (int, optional): The multiplier for the hidden size in the MLPs. Defaults to 4. - args (dict, optional): Additional arguments for the MLPs. Defaults to None. - """ - - neighbor_obs_dim: int - neighbor_hidden_size: int - self_obs_dim: int - num_use_neighbor_obs: int - mult: int = 4 - args: dict = None - - def __post_init__(self): - self.embedding_mlp = FeedForward( - self.self_obs_dim + self.neighbor_obs_dim, - self.neighbor_hidden_size, - self.mult, - self.args, - ) - - self.neighbor_value_mlp = FeedForward( - self.neighbor_hidden_size, - self.neighbor_hidden_size, - self.mult, - self.args, - ) - - # Outputs scalar score alpha_i for each neighbor - self.attention_mlp = FeedForward( - self.neighbor_hidden_size * 2, - self.neighbor_hidden_size, - self.mult, - self.args, - ) - - def forward( - self, - self_obs: Tensor, - obs: Tensor, - all_neighbor_obs_size: int, - batch_size: int, - ) -> Tensor: - obs_neighbors = obs[ - :, self.self_obs_dim : self.self_obs_dim + all_neighbor_obs_size - ] - obs_neighbors = obs_neighbors.reshape(-1, self.neighbor_obs_dim) - - # Concat self observation with neighbor observation - self_obs_repeat = self_obs.repeat(self.num_use_neighbor_obs, 1) - mlp_input = torch.cat((self_obs_repeat, obs_neighbors), dim=1) - neighbor_embeddings = self.embedding_mlp(mlp_input) - neighbor_values = self.neighbor_value_mlp(neighbor_embeddings) - neighbor_embeddings_mean_input = neighbor_embeddings.reshape( - batch_size, -1, self.neighbor_hidden_size - ) - neighbor_embeddings_mean = torch.mean( - neighbor_embeddings_mean_input, dim=1 - ) - neighbor_embeddings_mean_repeat = neighbor_embeddings_mean.repeat( - self.num_use_neighbor_obs, 1 - ) - attention_mlp_input = torch.cat( - (neighbor_embeddings, neighbor_embeddings_mean_repeat), dim=1 - ) - attention_weights = self.attention_mlp(attention_mlp_input).view( - batch_size, -1 - ) - attention_weights_softmax = torch.nn.functional.softmax( - attention_weights, dim=1 - ) - attention_weights_softmax = attention_weights_softmax.view(-1, 1) - - final_neighbor_embedding = attention_weights_softmax * neighbor_values - final_neighbor_embedding = final_neighbor_embedding.view( - batch_size, -1, self.neighbor_hidden_size - ) - final_neighbor_embedding = torch.sum(final_neighbor_embedding, dim=1) - - return final_neighbor_embedding - - -@dataclass -class SwarmNeighborEncoderMLP(SwarmNeighborhoodEncoder): - """ - A class representing a multi-layer perceptron (MLP) encoder for swarm neighbor observations. - - Args: - neighbor_obs_dim (int): The dimension of each neighbor observation. - neighbor_hidden_size (int): The size of the hidden layer in the MLP. - self_obs_dim (int): The dimension of the self observation. - num_use_neighbor_obs (int): The number of neighbor observations to use. - mult (int, optional): The multiplier for the hidden layer size. Defaults to 4. - args (dict, optional): Additional arguments for the MLP. Defaults to None. - """ - - neighbor_obs_dim: int - neighbor_hidden_size: int - self_obs_dim: int - num_use_neighbor_obs: int - mult: int = 4 - args: dict = None - - def __post_init__(self): - """ - Initialize the MLP encoder. - - This method creates an MLP with the specified dimensions and parameters. - """ - self.neighbor_mlp = FeedForward( - self.neighbor_obs_dim * self.num_use_neighbor_obs, - self.neighbor_hidden_size, - self.mult, - self.args, - ) - - def forward( - self, - self_obs: Tensor, - obs: Tensor, - all_neighbor_obs_size: int, - batch_size: int, - ) -> Tensor: - """ - Perform a forward pass through the MLP encoder. - - Args: - self_obs (Tensor): The self observation tensor. - obs (Tensor): The observation tensor. - all_neighbor_obs_size (int): The size of all neighbor observations. - batch_size (int): The size of the batch. - - Returns: - Tensor: The final neighborhood embedding tensor. - """ - obs_neighbors = obs[ - :, self.self_obs_dim : self.self_obs_dim + all_neighbor_obs_size - ] - final_neighborhood_embedding = self.neighbor_mlp(obs_neighbors) - return final_neighborhood_embedding - - -@dataclass -class SwarmMultiHeadAttentionEncoder(nn.Module): - dim: int - - -@dataclass -class QuadSingleHeadAttentionEncoderSim2Real(SwarmMultiHeadAttentionEncoder): - obs_space: int - quads_obs_repr: Any - neighbor_hidden_size: int - quads_neighbor_hidden_size: int - use_obstacles: Any - quads_use_obstacles: Any - quads_neighbor_visible_num: int - num_use_neighbor_obs: int - quads_num_agents: int - quads_neighbor_obs_type: Any - rnn_size: int - - def __post_init__(self): - if self.quads_obs_repr in QUADS_OBS_REPR: - self.self_obs_dim = QUADS_OBS_REPR[self.quads_obs_repr] - else: - raise NotImplementedError( - f"Unknown observation representation {self.quads_obs_repr}" - ) - - self.neighborbor_hidden_size = self.quads_neighbor_hidden_size - self.use_obstacles = self.quads_use_obstacles - - if self.quads_neighbor_visible_num == 1: - self.num_use_neighbor_obs = self.quads_num_agents - 1 - else: - self.num_use_neighbor_obs = self.quads_neighbor_visible_num - - self.neighbor_obs_dim = QUADS_NEIGHBOR_OBS_TYPE[ - self.quads_neighbor_obs_type - ] - self.all_neighbor_obs_dim = ( - self.neighbor_obs_dim * self.num_use_neighbor_obs - ) - - self.self_embed_layer = nn.Sequential( - nn.Linear(self.self_obs_dim, self.rnn_size), - nn.ReLU(), - ) - self.neighbor_embed_layer = nn.Sequential( - nn.Linear(self.all_neighbor_obs_dim, self.rnn_size), - nn.ReLU(), - ) - self.obstacle_obs_dim = QUADS_OBSTACLE_OBS_TYPE[ - self.quads_obstacle_obs_type - ] - self.obstacle_embed_layer = nn.Sequential( - nn.Linear(self.obstacle_obs_dim, self.rnn_size), - nn.ReLU(), - ) - self.attn = OneHeadAttention(self.rnn_size) - self.encoder_output_size = self.rnn_size - - self.ffn = FeedForward( - 3 * self.rnn_size, - self.encoder_output_size, - ) diff --git a/swarms_torch/firefly.py b/swarms_torch/firefly.py index b5cc063..71a9172 100644 --- a/swarms_torch/firefly.py +++ b/swarms_torch/firefly.py @@ -4,9 +4,10 @@ import torch from loguru import logger from torch import Tensor +from torch import nn -class FireflyOptimizer: +class FireflyOptimizer(nn.Module): def __init__( self, cost_function: Callable[[Tensor], Tensor], diff --git a/swarms_torch/mas_model.py b/swarms_torch/mas_model.py new file mode 100644 index 0000000..dfdbfc9 --- /dev/null +++ b/swarms_torch/mas_model.py @@ -0,0 +1,200 @@ +from typing import List, Dict, Any +import torch +import torch.nn as nn +import torch.optim as optim +from loguru import logger + +# Set up logger +logger.add("masi_log.log", rotation="500 MB") + +# Define device +device = torch.device("cuda" if torch.cuda.is_available() else "cpu") +logger.info(f"Using device: {device}") + + +# Agent Base Class +class Agent(nn.Module): + def __init__(self): + super(Agent, self).__init__() + + def forward(self, x: torch.Tensor) -> torch.Tensor: + raise NotImplementedError + + # def backward(self, loss: torch.Tensor) -> None: + # loss.backward() + + def update_parameters( + self, shared_gradients: Dict[str, torch.Tensor] + ) -> None: + with torch.no_grad(): + for name, param in self.named_parameters(): + if param.grad is not None: + param.grad = shared_gradients[name] + self.optimizer.step() + self.optimizer.zero_grad() + + +# MLP Agent +class MLPAgent(Agent): + def __init__(self, input_size: int, hidden_size: int, output_size: int): + super(MLPAgent, self).__init__() + self.model = nn.Sequential( + nn.Flatten(), # Add this line to flatten the input + nn.Linear(input_size, hidden_size), + nn.ReLU(), + nn.Linear(hidden_size, output_size), + ) + self.to(device) + self.optimizer = optim.Adam(self.parameters(), lr=0.001) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + logger.debug(f"MLPAgent input shape: {x.shape}") + output = self.model(x) + logger.debug(f"MLPAgent output shape: {output.shape}") + return output + + +# CNN Agent +class CNNAgent(Agent): + def __init__(self, input_channels: int, num_classes: int): + super(CNNAgent, self).__init__() + self.model = nn.Sequential( + nn.Conv2d(input_channels, 16, kernel_size=3, padding=1), + nn.ReLU(), + nn.Flatten(), + nn.Linear(16 * 28 * 28, num_classes), + ) + self.to(device) + self.optimizer = optim.Adam(self.parameters(), lr=0.001) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + logger.debug(f"CNNAgent input shape: {x.shape}") + output = self.model(x) + logger.debug(f"CNNAgent output shape: {output.shape}") + return output + + +# LSTM Agent +class LSTMAgent(Agent): + def __init__(self, input_size: int, hidden_size: int, output_size: int): + super(LSTMAgent, self).__init__() + self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True) + self.fc = nn.Linear(hidden_size, output_size) + self.to(device) + self.optimizer = optim.Adam(self.parameters(), lr=0.001) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + logger.debug(f"LSTMAgent input shape: {x.shape}") + # Reshape input: (batch, channels, height, width) -> (batch, height, width * channels) + x = x.view(x.size(0), x.size(2), -1) + lstm_out, _ = self.lstm(x) + output = self.fc(lstm_out[:, -1, :]) + logger.debug(f"LSTMAgent output shape: {output.shape}") + return output + + +# Transformer Agent +class TransformerAgent(Agent): + def __init__( + self, input_size: int, num_heads: int, num_layers: int, output_size: int + ): + super(TransformerAgent, self).__init__() + self.embedding = nn.Linear(input_size, 128) + encoder_layer = nn.TransformerEncoderLayer(d_model=128, nhead=num_heads) + self.transformer_encoder = nn.TransformerEncoder( + encoder_layer, num_layers=num_layers + ) + self.fc = nn.Linear(128, output_size) + self.to(device) + self.optimizer = optim.Adam(self.parameters(), lr=0.001) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + logger.debug(f"TransformerAgent input shape: {x.shape}") + # Reshape input: (batch, channels, height, width) -> (batch, height, width * channels) + x = x.view(x.size(0), x.size(2), -1) + x = self.embedding(x) + x = x.permute(1, 0, 2) # (sequence_length, batch_size, embedding_dim) + transformer_out = self.transformer_encoder(x) + transformer_out = transformer_out.permute( + 1, 0, 2 + ) # Back to (batch_size, sequence_length, embedding_dim) + output = self.fc(transformer_out[:, -1, :]) + logger.debug(f"TransformerAgent output shape: {output.shape}") + return output + + +# Multi-Architecture Swarm Intelligence (MASI) class +class MultiArchitectureSwarm(nn.Module): + def __init__( + self, + num_mlp_agents: int, + num_cnn_agents: int, + num_lstm_agents: int, + num_transformer_agents: int, + input_sizes: Dict[str, Any], + output_size: int, + ): + super(MultiArchitectureSwarm, self).__init__() + + self.agents: List[Agent] = [] + + # Initialize MLP Agents + for _ in range(num_mlp_agents): + agent = MLPAgent( + input_size=input_sizes["mlp"]["input_size"], + hidden_size=input_sizes["mlp"]["hidden_size"], + output_size=output_size, + ) + self.agents.append(agent) + + # Initialize CNN Agents + for _ in range(num_cnn_agents): + agent = CNNAgent( + input_channels=input_sizes["cnn"]["input_channels"], + num_classes=output_size, + ) + self.agents.append(agent) + + # Initialize LSTM Agents + for _ in range(num_lstm_agents): + agent = LSTMAgent( + input_size=input_sizes["lstm"]["input_size"], + hidden_size=input_sizes["lstm"]["hidden_size"], + output_size=output_size, + ) + self.agents.append(agent) + + # Initialize Transformer Agents + for _ in range(num_transformer_agents): + agent = TransformerAgent( + input_size=input_sizes["transformer"]["input_size"], + num_heads=input_sizes["transformer"]["num_heads"], + num_layers=input_sizes["transformer"]["num_layers"], + output_size=output_size, + ) + self.agents.append(agent) + + logger.info(f"Initialized {len(self.agents)} agents.") + + def forward(self, x: torch.Tensor) -> torch.Tensor: + agent_outputs = [] + + for agent in self.agents: + agent_output = agent(x) + agent_outputs.append(agent_output) + + # Aggregate outputs (Simple averaging for now) + global_output = self.aggregate_agent_outputs(agent_outputs) + + return global_output + + def aggregate_agent_outputs( + self, agent_outputs: List[torch.Tensor] + ) -> torch.Tensor: + # Stack outputs and calculate mean + logger.debug(f"Aggregating outputs from {len(agent_outputs)} agents.") + stacked_outputs = torch.stack(agent_outputs) + logger.debug(f"Stacked outputs shape: {stacked_outputs.shape}") + global_output = torch.mean(stacked_outputs, dim=0) + logger.debug(f"Global output shape: {global_output.shape}") + return global_output diff --git a/swarms_torch/queen_bee.py b/swarms_torch/queen_bee.py index b0c8719..4671c67 100644 --- a/swarms_torch/queen_bee.py +++ b/swarms_torch/queen_bee.py @@ -1,7 +1,8 @@ import torch +from torch import nn -class QueenBeeGa: +class QueenBeeGa(nn.Module): """ Queen Bee evolution for genetic algos diff --git a/swarms_torch/spiral_optimization.py b/swarms_torch/spiral_optimization.py index 82db8cb..f07cf2a 100644 --- a/swarms_torch/spiral_optimization.py +++ b/swarms_torch/spiral_optimization.py @@ -1,7 +1,8 @@ import torch +from torch import nn -class SPO: +class SPO(nn.Module): """ Spiral Optimization (SPO) Algorithm in PyTorch. diff --git a/swarms_torch/structs/basic_nn.py b/swarms_torch/structs/basic_nn.py deleted file mode 100644 index e69de29..0000000 diff --git a/test.py b/test.py new file mode 100644 index 0000000..e7a9947 --- /dev/null +++ b/test.py @@ -0,0 +1,341 @@ +from typing import List, Dict, Any +import torch +import torch.nn as nn +import torch.optim as optim +from torch.utils.data import DataLoader +from loguru import logger + +# Set up logger +logger.add("masi_log.log", rotation="500 MB") + +# Define device +device = torch.device("cuda" if torch.cuda.is_available() else "cpu") +logger.info(f"Using device: {device}") + + +# Agent Base Class +class Agent(nn.Module): + def __init__(self): + super(Agent, self).__init__() + + def forward(self, x: torch.Tensor) -> torch.Tensor: + raise NotImplementedError + + # def backward(self, loss: torch.Tensor) -> None: + # loss.backward() + + def update_parameters( + self, shared_gradients: Dict[str, torch.Tensor] + ) -> None: + with torch.no_grad(): + for name, param in self.named_parameters(): + if param.grad is not None: + param.grad = shared_gradients[name] + self.optimizer.step() + self.optimizer.zero_grad() + + +# MLP Agent +class MLPAgent(Agent): + def __init__(self, input_size: int, hidden_size: int, output_size: int): + super(MLPAgent, self).__init__() + self.model = nn.Sequential( + nn.Flatten(), # Add this line to flatten the input + nn.Linear(input_size, hidden_size), + nn.ReLU(), + nn.Linear(hidden_size, output_size), + ) + self.to(device) + self.optimizer = optim.Adam(self.parameters(), lr=0.001) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + logger.debug(f"MLPAgent input shape: {x.shape}") + output = self.model(x) + logger.debug(f"MLPAgent output shape: {output.shape}") + return output + + +# CNN Agent +class CNNAgent(Agent): + def __init__(self, input_channels: int, num_classes: int): + super(CNNAgent, self).__init__() + self.model = nn.Sequential( + nn.Conv2d(input_channels, 16, kernel_size=3, padding=1), + nn.ReLU(), + nn.Flatten(), + nn.Linear(16 * 28 * 28, num_classes), + ) + self.to(device) + self.optimizer = optim.Adam(self.parameters(), lr=0.001) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + logger.debug(f"CNNAgent input shape: {x.shape}") + output = self.model(x) + logger.debug(f"CNNAgent output shape: {output.shape}") + return output + + +# LSTM Agent +class LSTMAgent(Agent): + def __init__(self, input_size: int, hidden_size: int, output_size: int): + super(LSTMAgent, self).__init__() + self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True) + self.fc = nn.Linear(hidden_size, output_size) + self.to(device) + self.optimizer = optim.Adam(self.parameters(), lr=0.001) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + logger.debug(f"LSTMAgent input shape: {x.shape}") + # Reshape input: (batch, channels, height, width) -> (batch, height, width * channels) + x = x.view(x.size(0), x.size(2), -1) + lstm_out, _ = self.lstm(x) + output = self.fc(lstm_out[:, -1, :]) + logger.debug(f"LSTMAgent output shape: {output.shape}") + return output + + +# Transformer Agent +class TransformerAgent(Agent): + def __init__( + self, input_size: int, num_heads: int, num_layers: int, output_size: int + ): + super(TransformerAgent, self).__init__() + self.embedding = nn.Linear(input_size, 128) + encoder_layer = nn.TransformerEncoderLayer(d_model=128, nhead=num_heads) + self.transformer_encoder = nn.TransformerEncoder( + encoder_layer, num_layers=num_layers + ) + self.fc = nn.Linear(128, output_size) + self.to(device) + self.optimizer = optim.Adam(self.parameters(), lr=0.001) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + logger.debug(f"TransformerAgent input shape: {x.shape}") + # Reshape input: (batch, channels, height, width) -> (batch, height, width * channels) + x = x.view(x.size(0), x.size(2), -1) + x = self.embedding(x) + x = x.permute(1, 0, 2) # (sequence_length, batch_size, embedding_dim) + transformer_out = self.transformer_encoder(x) + transformer_out = transformer_out.permute( + 1, 0, 2 + ) # Back to (batch_size, sequence_length, embedding_dim) + output = self.fc(transformer_out[:, -1, :]) + logger.debug(f"TransformerAgent output shape: {output.shape}") + return output + + +# Initialize Agents +def initialize_agents( + num_mlp_agents: int, + num_cnn_agents: int, + num_lstm_agents: int, + num_transformer_agents: int, + input_sizes: Dict[str, Any], + output_size: int, +) -> List[Agent]: + agents: List[Agent] = [] + + # MLP Agents + for _ in range(num_mlp_agents): + agent = MLPAgent( + input_size=input_sizes["mlp"]["input_size"], + hidden_size=input_sizes["mlp"]["hidden_size"], + output_size=output_size, + ) + agents.append(agent) + + # CNN Agents + for _ in range(num_cnn_agents): + agent = CNNAgent( + input_channels=input_sizes["cnn"]["input_channels"], + num_classes=output_size, + ) + agents.append(agent) + + # LSTM Agents + for _ in range(num_lstm_agents): + agent = LSTMAgent( + input_size=input_sizes["lstm"]["input_size"], + hidden_size=input_sizes["lstm"]["hidden_size"], + output_size=output_size, + ) + agents.append(agent) + + # Transformer Agents + for _ in range(num_transformer_agents): + agent = TransformerAgent( + input_size=input_sizes["transformer"]["input_size"], + num_heads=input_sizes["transformer"]["num_heads"], + num_layers=input_sizes["transformer"]["num_layers"], + output_size=output_size, + ) + agents.append(agent) + + logger.info(f"Initialized {len(agents)} agents.") + return agents + + +# Aggregate Outputs +def aggregate_agent_outputs(agent_outputs: List[torch.Tensor]) -> torch.Tensor: + # Simple average of outputs + logger.debug(f"Aggregating outputs from {len(agent_outputs)} agents.") + stacked_outputs = torch.stack(agent_outputs) + logger.debug(f"Stacked outputs shape: {stacked_outputs.shape}") + global_output = torch.mean(stacked_outputs, dim=0) + logger.debug(f"Global output shape: {global_output.shape}") + return global_output + + +# Compute Loss +def compute_loss( + global_output: torch.Tensor, targets: torch.Tensor +) -> torch.Tensor: + criterion = nn.CrossEntropyLoss() + loss = criterion(global_output, targets) + logger.debug(f"Computed loss: {loss.item()}") + return loss + + +# Compute Agent-specific Loss (Optional) +def compute_agent_loss(agent: Agent, loss: torch.Tensor) -> torch.Tensor: + # For simplicity, all agents share the same loss + return loss + + +# Aggregate Gradients +def aggregate_gradients(agents: List[Agent]) -> Dict[str, torch.Tensor]: + # Average gradients across all agents + shared_gradients: Dict[str, torch.Tensor] = {} + num_agents = len(agents) + for name, param in agents[0].named_parameters(): + if param.grad is not None: + shared_gradients[name] = param.grad.clone() / num_agents + for other_agent in agents[1:]: + shared_gradients[name] += ( + other_agent._parameters[name].grad.clone() / num_agents + ) + logger.debug("Aggregated gradients.") + return shared_gradients + + +# Evaluate Performance +def evaluate_swarm_performance( + agents: List[Agent], validation_loader: DataLoader +) -> None: + correct = 0 + total = 0 + with torch.no_grad(): + for data in validation_loader: + inputs, labels = data + inputs, labels = inputs.to(device), labels.to(device) + agent_outputs = [] + for agent in agents: + output = agent(inputs) + agent_outputs.append(output) + global_output = aggregate_agent_outputs(agent_outputs) + _, predicted = torch.max(global_output.data, 1) + total += labels.size(0) + correct += (predicted == labels).sum().item() + accuracy = 100 * correct / total + logger.info(f"Validation Accuracy: {accuracy:.2f}%") + + +# Main Training Loop +def train_swarm( + agents: List[Agent], + train_loader: DataLoader, + validation_loader: DataLoader, + num_epochs: int, + evaluation_interval: int, +) -> None: + for epoch in range(num_epochs): + for i, data in enumerate(train_loader, 0): + inputs, targets = data + inputs, targets = inputs.to(device), targets.to(device) + + agent_outputs = [] + total_loss = 0 + + # Each agent processes the data + for agent in agents: + agent.optimizer.zero_grad() + agent_output = agent(inputs) + agent_outputs.append(agent_output) + + # Compute individual agent loss + agent_loss = compute_loss(agent_output, targets) + total_loss += agent_loss.item() + + # Backward pass and update for each agent + agent_loss.backward() + agent.optimizer.step() + + # Aggregate outputs (for logging purposes) + global_output = aggregate_agent_outputs(agent_outputs) + + # Log the average loss + avg_loss = total_loss / len(agents) + logger.debug(f"Batch [{i}] Average loss: {avg_loss:.4f}") + + # Evaluate performance + if (epoch + 1) % evaluation_interval == 0: + logger.info(f"Epoch [{epoch + 1}/{num_epochs}]") + evaluate_swarm_performance(agents, validation_loader) + + +# Example Usage +if __name__ == "__main__": + # Hyperparameters + num_mlp_agents = 2 + num_cnn_agents = 2 + num_lstm_agents = 2 + num_transformer_agents = 2 + num_epochs = 10 + evaluation_interval = 1 + batch_size = 64 + output_size = 10 # For example, number of classes in classification + + # Input sizes for different agents + input_sizes = { + "mlp": {"input_size": 784, "hidden_size": 128}, # Example for MNIST + "cnn": {"input_channels": 1}, + "lstm": { + "input_size": 28, + "hidden_size": 128, + }, # Sequence length for MNIST rows + "transformer": {"input_size": 28, "num_heads": 4, "num_layers": 2}, + } + + # Initialize agents + agents = initialize_agents( + num_mlp_agents, + num_cnn_agents, + num_lstm_agents, + num_transformer_agents, + input_sizes, + output_size, + ) + + # Load and preprocess data + from torchvision import datasets, transforms + + transform = transforms.Compose([transforms.ToTensor()]) + + train_dataset = datasets.MNIST( + root="./data", train=True, download=True, transform=transform + ) + validation_dataset = datasets.MNIST( + root="./data", train=False, download=True, transform=transform + ) + + train_loader = DataLoader( + train_dataset, batch_size=batch_size, shuffle=True + ) + validation_loader = DataLoader( + validation_dataset, batch_size=batch_size, shuffle=False + ) + + # Train swarm + train_swarm( + agents, train_loader, validation_loader, num_epochs, evaluation_interval + )