Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

moving operators #479

Merged
merged 4 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
112 changes: 112 additions & 0 deletions operators/cluster_embeddings/cluster_embeddings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""
Operator to cluster embeddings using KMeans, Affinity Propagation, and Agglomerative clustering algorithms
"""

def initialize(param):
    """
    Initializes the operator by binding the clustering helpers as module globals.

    Args:
        param (dict): Parameters for initialization (currently unused)
    """
    global KMeans_clustering, Agglomerative_clustering, AffinityPropagation_clustering
    global gen_data

    # Imports are deferred so the operator module stays cheap to load
    from sklearn.cluster import KMeans, AffinityPropagation, AgglomerativeClustering
    import numpy as np

    # Fixed seed so clustering results are reproducible across runs
    RANDOM_STATE = 50

    def KMeans_clustering(matrix, n_clusters):
        """
        Clusters given embeddings using the KMeans clustering algorithm.

        Args:
            matrix (list[list]): list of embeddings
            n_clusters (int): number of clusters

        Returns:
            numpy.ndarray: An array of cluster labels for each embedding
        """
        model = KMeans(n_clusters=n_clusters, random_state=RANDOM_STATE)
        return model.fit_predict(np.array(matrix))

    def Agglomerative_clustering(matrix, n_clusters):
        """
        Clusters given embeddings using the Agglomerative clustering algorithm.

        Args:
            matrix (list[list]): list of embeddings
            n_clusters (int): number of clusters

        Returns:
            numpy.ndarray: An array of cluster labels for each embedding
        """
        model = AgglomerativeClustering(n_clusters=n_clusters)
        return model.fit_predict(np.array(matrix))

    def AffinityPropagation_clustering(matrix):
        """
        Clusters given embeddings using the Affinity Propagation algorithm
        (used when the number of clusters is unknown).

        Args:
            matrix (list[list]): list of embeddings

        Returns:
            numpy.ndarray: An array of cluster labels for each embedding
        """
        model = AffinityPropagation(random_state=RANDOM_STATE)
        return model.fit_predict(np.array(matrix))

    def gen_data(payloads, labels):
        """
        Generates formatted output data.

        Args:
            payloads (list): List of payloads
            labels (np.ndarray): An array of cluster labels

        Returns:
            dict: A dictionary mapping cluster labels to corresponding array of payloads
        """
        clusters = {}
        for label, payload in zip(labels, payloads):
            clusters.setdefault(f'cluster_{label}', []).append(payload)
        return clusters

def run(input_data, n_clusters=None, modality='audio'):
    """
    Runs the operator.

    Args:
        input_data (list[dict]): List of data with each dictionary containing `embedding` and `payload` properties
        n_clusters (int, optional): Number of clusters. Defaults to None
        modality (str, optional): Source modality of embeddings. Defaults to 'audio'

    Returns:
        dict: A dictionary mapping cluster labels to corresponding array of payloads

    Raises:
        ValueError: Input should be a non-empty list, and modality should be either `audio` or `video`
        KeyError: Each data point in input must have `embedding` and `payload` properties
    """
    # Validate up front: an empty list would otherwise fail inside zip(*...)
    # with a confusing "not enough values to unpack" error. Mirrors the
    # validation in dimension_reduction.run for consistency.
    if not isinstance(input_data, list) or len(input_data) == 0:
        raise ValueError("Input should be a non-empty list.")

    # Parse data:
    try:
        matrix, payloads = zip(*[(data['embedding'], data['payload']) for data in input_data])
    except KeyError as e:
        raise KeyError(f"Invalid data. Each data point in input must have `embedding` and `payload` properties. Missing key: {e}.")

    # Delegate appropriate clustering algorithm for the given params:
    if n_clusters:
        n_clusters = int(n_clusters)  # cast it to int
        if modality == 'audio':
            labels = KMeans_clustering(matrix=matrix, n_clusters=n_clusters)
        elif modality == 'video':
            labels = Agglomerative_clustering(matrix=matrix, n_clusters=n_clusters)
        else:
            raise ValueError("Invalid modality. Modality should be either `audio` or `video`.")
    else:
        # Number of clusters unknown: let Affinity Propagation infer it
        labels = AffinityPropagation_clustering(matrix=matrix)
    return gen_data(payloads=payloads, labels=labels)  # format output
20 changes: 20 additions & 0 deletions operators/cluster_embeddings/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[project]
name = "feluda-cluster-embeddings"
version = "0.0.0"
requires-python = ">=3.10"
dependencies = ["scikit-learn>=1.5.1", "numpy>=2.2.1"]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.semantic_release]
version_variable = ["pyproject.toml:project.version"]

[tool.semantic_release.branches.main]
match = "main"
prerelease = false
tag_format = "{name}-{version}"

[tool.hatch.build.targets.wheel]
packages = ["."]
44 changes: 44 additions & 0 deletions operators/cluster_embeddings/test_cluster_embeddings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import unittest
from operators.cluster_embeddings import cluster_embeddings

# Test constants
MOCK_DATA = [
{"payload": "A", "embedding": [0, 1]},
{"payload": "B", "embedding": [1, 0]},
{"payload": "C", "embedding": [100, 101]},
{"payload": "D", "embedding": [101, 100]}
]
EXPECTED_CLUSTERS = [["A", "B"], ["C", "D"]]

class Test(unittest.TestCase):
    """Unit tests for the cluster_embeddings operator."""

    @classmethod
    def setUpClass(cls):
        # initialize operator
        cluster_embeddings.initialize({})

    @classmethod
    def tearDownClass(cls):
        # delete config files
        pass

    def _assert_expected_clusters(self, result):
        # Shared checks: exactly two clusters whose members match EXPECTED_CLUSTERS
        self.assertIn("cluster_0", result)
        self.assertIn("cluster_1", result)
        self.assertEqual(len(result), 2)
        self.assertCountEqual([result["cluster_0"], result["cluster_1"]], EXPECTED_CLUSTERS)

    def test_kmeans_clustering(self):
        result = cluster_embeddings.run(input_data=MOCK_DATA, n_clusters=2, modality="audio")
        self._assert_expected_clusters(result)

    def test_agglomerative_clustering(self):
        result = cluster_embeddings.run(input_data=MOCK_DATA, n_clusters=2, modality="video")
        self._assert_expected_clusters(result)

    def test_affinity_propagation(self):
        result = cluster_embeddings.run(input_data=MOCK_DATA, n_clusters=None, modality="audio")
        self._assert_expected_clusters(result)
Empty file.
179 changes: 179 additions & 0 deletions operators/dimension_reduction/dimension_reduction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
"""Operator to perform dimensionality reduction given the embeddings."""

from abc import ABC, abstractmethod
from sklearn.manifold import TSNE
import numpy as np


class DimensionReduction(ABC):
    """Abstract base class for dimension reduction techniques."""

    @abstractmethod
    def initialize(self, params):
        """Configure the reduction model from a parameter dict."""

    @abstractmethod
    def run(self, embeddings):
        """Reduce the given embeddings and return the result."""


class TSNEReduction(DimensionReduction):
    """t-SNE implementation of the DimensionReduction abstract class."""

    def initialize(self, params):
        """
        Initialize the t-SNE model with parameters.

        Args:
            params (dict): A dictionary containing t-SNE parameters such as:
                - n_components (int): Number of dimensions to reduce to. Default is 2.
                - perplexity (float): Perplexity parameter for t-SNE. Default is 30.
                - learning_rate (float): Learning rate for t-SNE. Default is 150.
                - max_iter (int): Number of iterations for optimization. Default is 1000.
                - random_state (int): Seed for random number generation. Default is 42.
                - method (str): Algorithm to use for gradient calculation. Default is barnes_hut

        Raises:
            ValueError: If the t-SNE model fails to initialize.
        """
        try:
            self.model = TSNE(
                n_components=params.get('n_components', 2),
                perplexity=params.get('perplexity', 30),
                learning_rate=params.get('learning_rate', 150),
                max_iter=params.get('max_iter', 1000),
                random_state=params.get('random_state', 42),
                method=params.get('method', 'barnes_hut')
            )
            print("t-SNE model successfully initialized")
        except Exception as e:
            raise ValueError(f"Failed to initialize t-SNE model: {e}")

    def run(self, embeddings_array):
        """
        Apply the t-SNE model to reduce the dimensionality of embeddings.

        Args:
            embeddings_array (numpy.ndarray): A 2D array of embeddings to be reduced.

        Returns:
            numpy.ndarray: The reduced embeddings as a 2D array.

        Raises:
            ValueError: If the embeddings input is not a 2D array.
            RuntimeError: If the t-SNE reduction fails.
        """
        # Validate outside the try block: previously the ValueError raised
        # here was caught by the broad `except Exception` below and
        # re-wrapped as RuntimeError, contradicting the documented contract.
        if embeddings_array.ndim != 2:
            raise ValueError("Embeddings should be a 2D array.")
        try:
            return self.model.fit_transform(embeddings_array)
        except Exception as e:
            raise RuntimeError(f"t-SNE reduction failed: {e}")


class DimensionReductionFactory:
    """Factory class for creating dimension reduction models."""

    @staticmethod
    def get_reduction_model(model_type):
        """
        Factory method to create a dimension reduction model based on type.

        Args:
            model_type (str): String indicating the type of model (e.g., 'tsne').

        Returns:
            DimensionReduction: An instance of the corresponding dimension reduction model.

        Raises:
            ValueError: If the specified model type is unsupported.
        """
        # Guard clause: reject anything that is not a known model type.
        if model_type.lower() != 'tsne':
            raise ValueError(f"Unsupported model type: {model_type}")
        return TSNEReduction()


def gen_data(payloads, reduced_embeddings):
    """
    Generates the formatted output.

    Args:
        payloads (list): List of payloads.
        reduced_embeddings (numpy.ndarray): An array of reduced embeddings.

    Returns:
        list: A list of dictionaries containing the payload and corresponding embedding.
    """
    # Pair each payload with its reduced embedding, converting the
    # numpy row to a plain Python list for JSON-friendly output.
    return [
        {'payload': payload, 'reduced_embedding': embedding.tolist()}
        for payload, embedding in zip(payloads, reduced_embeddings)
    ]


def initialize(params):
    """
    Initialize the dimension reduction model with provided type and parameters.

    Args:
        params (dict): Dictionary of parameters for the model initialization;
            the `model_type` key selects the implementation (defaults to 'tsne').
    """
    global reduction_model
    model_type = params.get('model_type', 'tsne')
    reduction_model = DimensionReductionFactory.get_reduction_model(model_type)
    reduction_model.initialize(params)


def run(input_data):
    """
    Reduce the dimensionality of the provided embeddings using the initialized model.

    Args:
        input_data (list): A list of dictionaries containing payload and embeddings to be reduced.
            Example:

            [
                {
                    "payload": "123",
                    "embedding": [1, 2, 3]
                },
                {
                    "payload": "124",
                    "embedding": [1, 0, 1]
                }
            ]

    Returns:
        list: The reduced embeddings and the corresponding payload as a list of dictionaries.
            Example:

            [
                {
                    "payload":"123",
                    "reduced_embedding": [1, 2]
                },
                {
                    "payload": "124",
                    "reduced_embedding": [1, 0]
                }
            ]

    Raises:
        ValueError: If the embeddings input is not a non-empty list.
        KeyError: If the input data is invalid.
    """
    # Reject anything that is not a non-empty list before touching the model.
    if not isinstance(input_data, list) or not input_data:
        raise ValueError("Input should be a non-empty list.")

    try:
        pairs = [(data['embedding'], data['payload']) for data in input_data]
    except KeyError as e:
        raise KeyError(f"Invalid data. Each data point in input must have `embedding` and `payload` properties. Missing key: {e}.")

    embeddings, payloads = zip(*pairs)
    reduced_embeddings = reduction_model.run(np.array(embeddings))
    return gen_data(payloads, reduced_embeddings)
20 changes: 20 additions & 0 deletions operators/dimension_reduction/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[project]
name = "feluda-dimension-reduction"
version = "0.0.0"
requires-python = ">=3.10"
dependencies = ["scikit-learn>=1.5.1", "numpy>=2.2.1"]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.semantic_release]
version_variable = ["pyproject.toml:project.version"]

[tool.semantic_release.branches.main]
match = "main"
prerelease = false
tag_format = "{name}-{version}"

[tool.hatch.build.targets.wheel]
packages = ["."]
Loading
Loading