diff --git a/operators/cluster_embeddings/__init__.py b/operators/cluster_embeddings/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/operators/cluster_embeddings/cluster_embeddings.py b/operators/cluster_embeddings/cluster_embeddings.py
new file mode 100644
index 0000000..16d7aec
--- /dev/null
+++ b/operators/cluster_embeddings/cluster_embeddings.py
@@ -0,0 +1,112 @@
+"""
+Operator to cluster embeddings using the KMeans, Affinity Propagation, and Agglomerative clustering algorithms.
+"""
+
+def initialize(param):
+    """
+    Initializes the operator.
+
+    Args:
+        param (dict): Parameters for initialization
+    """
+    global KMeans_clustering, Agglomerative_clustering, AffinityPropagation_clustering
+    global gen_data
+
+    # Imports
+    from sklearn.cluster import KMeans, AffinityPropagation, AgglomerativeClustering
+    import numpy as np
+
+    # Constants
+    RANDOM_STATE = 50
+
+    def gen_data(payloads, labels):
+        """
+        Generates formatted output data.
+
+        Args:
+            payloads (list): List of payloads
+            labels (np.ndarray): An array of cluster labels
+
+        Returns:
+            dict: A dictionary mapping cluster labels to the corresponding array of payloads
+        """
+        out = {}
+        for label, payload in zip(labels, payloads):
+            key = f'cluster_{label}'
+            if key not in out:
+                out[key] = []
+            out[key].append(payload)
+        return out
+
+    def KMeans_clustering(matrix, n_clusters):
+        """
+        Clusters given embeddings using the KMeans clustering algorithm.
+
+        Args:
+            matrix (list[list]): list of embeddings
+            n_clusters (int): number of clusters
+
+        Returns:
+            numpy.ndarray: An array of cluster labels for each embedding
+        """
+        return KMeans(n_clusters=n_clusters, random_state=RANDOM_STATE).fit_predict(np.array(matrix))
+
+    def Agglomerative_clustering(matrix, n_clusters):
+        """
+        Clusters given embeddings using the Agglomerative clustering algorithm.
+
+        Args:
+            matrix (list[list]): list of embeddings
+            n_clusters (int): number of clusters
+
+        Returns:
+            numpy.ndarray: An array of cluster labels for each embedding
+        """
+        return AgglomerativeClustering(n_clusters=n_clusters).fit_predict(np.array(matrix))
+
+    def AffinityPropagation_clustering(matrix):
+        """
+        Clusters given embeddings using the Affinity Propagation algorithm (used if the number of clusters is unknown).
+
+        Args:
+            matrix (list[list]): list of embeddings
+
+        Returns:
+            numpy.ndarray: An array of cluster labels for each embedding
+        """
+        return AffinityPropagation(random_state=RANDOM_STATE).fit_predict(np.array(matrix))
+
+def run(input_data, n_clusters=None, modality='audio'):
+    """
+    Runs the operator.
+
+    Args:
+        input_data (list[dict]): List of data, with each dictionary containing `embedding` and `payload` properties
+        n_clusters (int, optional): Number of clusters. Defaults to None
+        modality (str, optional): Source modality of the embeddings. Defaults to 'audio'
+
+    Returns:
+        dict: A dictionary mapping cluster labels to the corresponding array of payloads
+
+    Raises:
+        ValueError: Modality should be either `audio` or `video`
+        KeyError: Each data point in input must have `embedding` and `payload` properties
+    """
+    # Parse data:
+    try:
+        matrix, payloads = zip(*[(data['embedding'], data['payload']) for data in input_data])
+    except KeyError as e:
+        raise KeyError(f"Invalid data. Each data point in input must have `embedding` and `payload` properties. Missing key: {e}.")
+
+    # Delegate to the appropriate clustering algorithm for the given params:
+    if n_clusters:
+        n_clusters = int(n_clusters)  # cast it to int
+        if modality == 'audio':
+            labels = KMeans_clustering(matrix=matrix, n_clusters=n_clusters)
+        elif modality == 'video':
+            labels = Agglomerative_clustering(matrix=matrix, n_clusters=n_clusters)
+        else:
+            raise ValueError("Invalid modality. Modality should be either `audio` or `video`.")
+    else:
+        labels = AffinityPropagation_clustering(matrix=matrix)
+    return gen_data(payloads=payloads, labels=labels)  # format output
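
The operator follows Feluda's initialize/run convention: `initialize` binds the clustering helpers into module globals, and `run` then dispatches on the arguments, with KMeans for `audio`, Agglomerative for `video`, and Affinity Propagation when `n_clusters` is not given. A minimal usage sketch, mirroring the test fixtures below (the two-cluster toy data is illustrative only):

    from operators.cluster_embeddings import cluster_embeddings

    cluster_embeddings.initialize(param={})
    data = [
        {"payload": "A", "embedding": [0, 1]},
        {"payload": "B", "embedding": [1, 0]},
        {"payload": "C", "embedding": [100, 101]},
        {"payload": "D", "embedding": [101, 100]},
    ]
    # Known cluster count: KMeans (audio) or Agglomerative (video)
    print(cluster_embeddings.run(input_data=data, n_clusters=2, modality="audio"))
    # Unknown cluster count: Affinity Propagation
    print(cluster_embeddings.run(input_data=data, n_clusters=None))
    # Each call returns something like {"cluster_0": ["A", "B"], "cluster_1": ["C", "D"]}
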
Missing key: {e}.") + + # Delegate appropriate clustering algorithm for the given params: + if n_clusters: + n_clusters = int(n_clusters) # cast it to int + if modality == 'audio': + labels = KMeans_clustering(matrix=matrix, n_clusters=n_clusters) + elif modality == 'video': + labels = Agglomerative_clustering(matrix=matrix, n_clusters=n_clusters) + else: + raise ValueError("Invalid modality. Modality should be either `audio` or `video`.") + else: + labels = AffinityPropagation_clustering(matrix=matrix) + return gen_data(payloads=payloads, labels=labels) # format output diff --git a/operators/cluster_embeddings/pyproject.toml b/operators/cluster_embeddings/pyproject.toml new file mode 100644 index 0000000..fa3acbb --- /dev/null +++ b/operators/cluster_embeddings/pyproject.toml @@ -0,0 +1,20 @@ +[project] +name = "feluda-cluster-embeddings" +version = "0.0.0" +requires-python = ">=3.10" +dependencies = ["scikit-learn>=1.5.1", "numpy>=2.2.1"] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.semantic_release] +version_variable = ["pyproject.toml:project.version"] + +[tool.semantic_release.branches.main] +match = "main" +prerelease = false +tag_format = "{name}-{version}" + +[tool.hatch.build.targets.wheel] +packages = ["."] diff --git a/operators/cluster_embeddings/test_cluster_embeddings.py b/operators/cluster_embeddings/test_cluster_embeddings.py new file mode 100644 index 0000000..dd4591a --- /dev/null +++ b/operators/cluster_embeddings/test_cluster_embeddings.py @@ -0,0 +1,44 @@ +import unittest +from operators.cluster_embeddings import cluster_embeddings + +# Test constants +MOCK_DATA = [ + {"payload": "A", "embedding": [0, 1]}, + {"payload": "B", "embedding": [1, 0]}, + {"payload": "C", "embedding": [100, 101]}, + {"payload": "D", "embedding": [101, 100]} +] +EXPECTED_CLUSTERS = [["A", "B"], ["C", "D"]] + +class Test(unittest.TestCase): + @classmethod + def setUpClass(cls): + # initialize operator + param = {} + cluster_embeddings.initialize(param) + + @classmethod + def tearDownClass(cls): + # delete config files + pass + + def test_kmeans_clustering(self): + result = cluster_embeddings.run(input_data=MOCK_DATA, n_clusters=2, modality="audio") + self.assertIn("cluster_0", result) + self.assertIn("cluster_1", result) + self.assertEqual(len(result), 2) + self.assertCountEqual([result["cluster_0"], result["cluster_1"]], EXPECTED_CLUSTERS) + + def test_agglomerative_clustering(self): + result = cluster_embeddings.run(input_data=MOCK_DATA, n_clusters=2, modality="video") + self.assertIn("cluster_0", result) + self.assertIn("cluster_1", result) + self.assertEqual(len(result), 2) + self.assertCountEqual([result["cluster_0"], result["cluster_1"]], EXPECTED_CLUSTERS) + + def test_affinity_propagation(self): + result = cluster_embeddings.run(input_data=MOCK_DATA, n_clusters=None, modality="audio") + self.assertIn("cluster_0", result) + self.assertIn("cluster_1", result) + self.assertEqual(len(result), 2) + self.assertCountEqual([result["cluster_0"], result["cluster_1"]], EXPECTED_CLUSTERS) diff --git a/operators/dimension_reduction/__init__.py b/operators/dimension_reduction/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/operators/dimension_reduction/dimension_reduction.py b/operators/dimension_reduction/dimension_reduction.py new file mode 100644 index 0000000..1ee8500 --- /dev/null +++ b/operators/dimension_reduction/dimension_reduction.py @@ -0,0 +1,179 @@ +"""Operator to perform dimensionality reduction given the embedddings.""" + 
diff --git a/operators/dimension_reduction/__init__.py b/operators/dimension_reduction/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/operators/dimension_reduction/dimension_reduction.py b/operators/dimension_reduction/dimension_reduction.py
new file mode 100644
index 0000000..1ee8500
--- /dev/null
+++ b/operators/dimension_reduction/dimension_reduction.py
@@ -0,0 +1,179 @@
+"""Operator to perform dimensionality reduction given the embeddings."""
+
+from abc import ABC, abstractmethod
+from sklearn.manifold import TSNE
+import numpy as np
+
+
+class DimensionReduction(ABC):
+    """Abstract base class for dimension reduction techniques."""
+
+    @abstractmethod
+    def initialize(self, params):
+        pass
+
+    @abstractmethod
+    def run(self, embeddings):
+        pass
+
+
+class TSNEReduction(DimensionReduction):
+    """t-SNE implementation of the DimensionReduction abstract class."""
+
+    def initialize(self, params):
+        """
+        Initialize the t-SNE model with parameters.
+
+        Args:
+            params (dict): A dictionary containing t-SNE parameters such as:
+                - n_components (int): Number of dimensions to reduce to. Default is 2.
+                - perplexity (float): Perplexity parameter for t-SNE. Default is 30.
+                - learning_rate (float): Learning rate for t-SNE. Default is 150.
+                - max_iter (int): Number of iterations for optimization. Default is 1000.
+                - random_state (int): Seed for random number generation. Default is 42.
+                - method (str): Algorithm to use for gradient calculation. Default is 'barnes_hut'.
+
+        Raises:
+            ValueError: If the t-SNE model fails to initialize.
+        """
+        try:
+            self.model = TSNE(
+                n_components=params.get('n_components', 2),
+                perplexity=params.get('perplexity', 30),
+                learning_rate=params.get('learning_rate', 150),
+                max_iter=params.get('max_iter', 1000),
+                random_state=params.get('random_state', 42),
+                method=params.get('method', 'barnes_hut')
+            )
+            print("t-SNE model successfully initialized")
+        except Exception as e:
+            raise ValueError(f"Failed to initialize t-SNE model: {e}")
+
+    def run(self, embeddings_array):
+        """
+        Apply the t-SNE model to reduce the dimensionality of embeddings.
+
+        Args:
+            embeddings_array (numpy.ndarray): An array of embeddings to be reduced.
+
+        Returns:
+            numpy.ndarray: The reduced embeddings as a 2D array.
+
+        Raises:
+            ValueError: If the embeddings input is not a 2D array.
+            RuntimeError: If the t-SNE reduction fails.
+        """
+        try:
+            if embeddings_array.ndim != 2:
+                raise ValueError("Embeddings should be a 2D array.")
+            return self.model.fit_transform(embeddings_array)
+        except Exception as e:
+            raise RuntimeError(f"t-SNE reduction failed: {e}")
+
+
+class DimensionReductionFactory:
+    """Factory class for creating dimension reduction models."""
+
+    @staticmethod
+    def get_reduction_model(model_type):
+        """
+        Factory method to create a dimension reduction model based on type.
+
+        Args:
+            model_type (str): String indicating the type of model (e.g., 'tsne').
+
+        Returns:
+            DimensionReduction: An instance of the corresponding dimension reduction model.
+
+        Raises:
+            ValueError: If the specified model type is unsupported.
+        """
+        if model_type.lower() == 'tsne':
+            return TSNEReduction()
+        else:
+            raise ValueError(f"Unsupported model type: {model_type}")
+
+
+def gen_data(payloads, reduced_embeddings):
+    """
+    Generates the formatted output.
+
+    Args:
+        payloads (list): List of payloads.
+        reduced_embeddings (numpy.ndarray): An array of reduced embeddings.
+
+    Returns:
+        list: A list of dictionaries containing the payload and corresponding embedding.
+    """
+    out = []
+
+    for payload, reduced_embedding in zip(payloads, reduced_embeddings):
+        tmp_dict = {}
+        tmp_dict['payload'] = payload
+        tmp_dict['reduced_embedding'] = reduced_embedding.tolist()
+        out.append(tmp_dict)
+    return out
+
+
+def initialize(params):
+    """
+    Initialize the dimension reduction model with the provided type and parameters.
+
+    Args:
+        params (dict): Dictionary of parameters for the model initialization.
+
+    """
+    global reduction_model
+    reduction_model = DimensionReductionFactory.get_reduction_model(params.get('model_type', 'tsne'))
+    reduction_model.initialize(params)
+
+
+def run(input_data):
+    """
+    Reduce the dimensionality of the provided embeddings using the initialized model.
+
+    Args:
+        input_data (list): A list of dictionaries containing the payload and embeddings to be reduced.
+            Example:
+
+            [
+                {
+                    "payload": "123",
+                    "embedding": [1, 2, 3]
+                },
+                {
+                    "payload": "124",
+                    "embedding": [1, 0, 1]
+                }
+            ]
+
+    Returns:
+        list: The reduced embeddings and the corresponding payloads as a list of dictionaries.
+            Example:
+
+            [
+                {
+                    "payload": "123",
+                    "reduced_embedding": [1, 2]
+                },
+                {
+                    "payload": "124",
+                    "reduced_embedding": [1, 0]
+                }
+            ]
+
+    Raises:
+        ValueError: If the embeddings input is not a non-empty list.
+        KeyError: If the input data is invalid.
+    """
+    if not isinstance(input_data, list) or len(input_data) == 0:
+        raise ValueError("Input should be a non-empty list.")
+
+    try:
+        embeddings, payloads = zip(*[(data['embedding'], data['payload']) for data in input_data])
+    except KeyError as e:
+        raise KeyError(f"Invalid data. Each data point in input must have `embedding` and `payload` properties. Missing key: {e}.")
+
+    reduced_embeddings = reduction_model.run(np.array(embeddings))
+
+    return gen_data(payloads, reduced_embeddings)
\ No newline at end of file
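
The factory keeps the operator open to additional techniques: a new backend only needs to subclass `DimensionReduction` and claim a `model_type` string in `get_reduction_model`. Driving the operator end to end looks like this (a sketch with illustrative parameter values; t-SNE needs more samples than its perplexity, so the toy batch of 30 points uses a perplexity of 5):

    from operators.dimension_reduction.dimension_reduction import initialize, run

    initialize({'model_type': 'tsne', 'n_components': 2, 'perplexity': 5})
    points = [
        {'payload': str(i), 'embedding': [float(i), float(i % 7), 1.0]}
        for i in range(30)
    ]
    reduced = run(points)
    # reduced -> [{'payload': '0', 'reduced_embedding': [x, y]}, ...]
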
+ + """ + global reduction_model + reduction_model = DimensionReductionFactory.get_reduction_model(params.get('model_type', 'tsne')) + reduction_model.initialize(params) + + +def run(input_data): + """ + Reduce the dimensionality of the provided embeddings using the initialized model. + + Args: + input_data (list): A list of dictionaries containing payload and embeddings to be reduced. + Example: + + [ + { + "payload": "123", + "embedding": [1, 2, 3] + }, + { + "payload": "124", + "embedding": [1, 0, 1] + } + ] + + Returns: + list: The reduced embeddings and the corresponding payload as a list of dictionaries. + Example: + + [ + { + "payload":"123", + "reduced_embedding": [1, 2] + }, + { + "payload": "124", + "reduced_embedding": [1, 0] + } + ] + + Raises: + ValueError: If the embeddings input is not a non-empty list. + KeyError: If the input data is invalid. + """ + if not isinstance(input_data, list) or len(input_data) == 0: + raise ValueError("Input should be a non-empty list.") + + try: + embeddings, payloads = zip(*[(data['embedding'], data['payload']) for data in input_data]) + except KeyError as e: + raise KeyError(f"Invalid data. Each data point in input must have `embedding` and `payload` properties. Missing key: {e}.") + + reduced_embeddings = reduction_model.run(np.array(embeddings)) + + return gen_data(payloads, reduced_embeddings) \ No newline at end of file diff --git a/operators/dimension_reduction/pyproject.toml b/operators/dimension_reduction/pyproject.toml new file mode 100644 index 0000000..9f1add5 --- /dev/null +++ b/operators/dimension_reduction/pyproject.toml @@ -0,0 +1,20 @@ +[project] +name = "feluda-dimension-reduction" +version = "0.0.0" +requires-python = ">=3.10" +dependencies = ["scikit-learn>=1.5.1", "numpy>=2.2.1"] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.semantic_release] +version_variable = ["pyproject.toml:project.version"] + +[tool.semantic_release.branches.main] +match = "main" +prerelease = false +tag_format = "{name}-{version}" + +[tool.hatch.build.targets.wheel] +packages = ["."] diff --git a/operators/dimension_reduction/test_dimension_reduction.py b/operators/dimension_reduction/test_dimension_reduction.py new file mode 100644 index 0000000..65ca4d9 --- /dev/null +++ b/operators/dimension_reduction/test_dimension_reduction.py @@ -0,0 +1,50 @@ +import unittest +import numpy as np +from operators.dimension_reduction.dimension_reduction import initialize, run + + +class TestDimensionReductionOperator(unittest.TestCase): + @classmethod + def setUpClass(cls): + # Initialize operator + cls.initial_params = { + 'model_type': 'tsne', + 'n_components': 2, + 'perplexity': 30, + 'learning_rate': 200, + 'max_iter': 250, + 'random_state': 42, + 'method': 'barnes_hut' + } + initialize(cls.initial_params) + + @classmethod + def tearDownClass(cls): + # Clean up if necessary + pass + + def test_tsne_reduction(self): + # Create sample embeddings + sample_embeddings = np.random.rand(100, 50) # 100 samples, 50 dimensions + + input_data = [{'payload': str(i), 'embedding': embedding} for i, embedding in enumerate(sample_embeddings)] + + # Perform reduction + reduced_data = run(input_data) + reduced_embeddings = np.array([d['reduced_embedding'] for d in reduced_data]) + + # Check output shape + self.assertEqual(reduced_embeddings.shape, (100, 2)) # Should reduce to 2D + + def test_invalid_input(self): + # Test with empty list + with self.assertRaises(ValueError): + run([]) + + # Test with non-list input + with 
diff --git a/operators/image_vec_rep_resnet/pyproject.toml b/operators/image_vec_rep_resnet/pyproject.toml
index c30e409..638c3cb 100644
--- a/operators/image_vec_rep_resnet/pyproject.toml
+++ b/operators/image_vec_rep_resnet/pyproject.toml
@@ -5,22 +5,18 @@ requires-python = ">=3.10"
 dependencies = [
     "torch>=2.5.1",
     "torchvision>=0.20.1",
-    "numpy>=2.1.3",
-    "pillow>=11.0.0",
-    "memray>=1.14.0",
+    "numpy>=2.2.1",
+    "pillow>=11.1.0",
+    "memray>=1.15.0",
     "pyinstrument>=5.0.0",
 ]
 
 [build-system]
-requires = [
-    "hatchling",
-]
+requires = ["hatchling"]
 build-backend = "hatchling.build"
 
 [tool.semantic_release]
-version_variable = [
-    "pyproject.toml:project.version",
-]
+version_variable = ["pyproject.toml:project.version"]
 
 [tool.semantic_release.branches.main]
 match = "main"
@@ -28,6 +24,4 @@ prerelease = false
 tag_format = "{name}-{version}"
 
 [tool.hatch.build.targets.wheel]
-packages = [
-    ".",
-]
+packages = ["."]
diff --git a/operators/vid_vec_rep_clip/__init__.py b/operators/vid_vec_rep_clip/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/operators/vid_vec_rep_clip/pyproject.toml b/operators/vid_vec_rep_clip/pyproject.toml
new file mode 100644
index 0000000..793c37c
--- /dev/null
+++ b/operators/vid_vec_rep_clip/pyproject.toml
@@ -0,0 +1,25 @@
+[project]
+name = "feluda-vid-vec-rep-clip"
+version = "0.0.0"
+requires-python = ">=3.10"
+dependencies = [
+    "torch>=2.5.1",
+    "torchvision>=0.20.1",
+    "transformers>=4.44.0",
+    "pillow>=11.1.0",
+]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.semantic_release]
+version_variable = ["pyproject.toml:project.version"]
+
+[tool.semantic_release.branches.main]
+match = "main"
+prerelease = false
+tag_format = "{name}-{version}"
+
+[tool.hatch.build.targets.wheel]
+packages = ["."]
diff --git a/operators/vid_vec_rep_clip/test_vid_vec_rep_clip.py b/operators/vid_vec_rep_clip/test_vid_vec_rep_clip.py
new file mode 100644
index 0000000..dac1b29
--- /dev/null
+++ b/operators/vid_vec_rep_clip/test_vid_vec_rep_clip.py
@@ -0,0 +1,33 @@
+import unittest
+from unittest.case import skip
+from operators.vid_vec_rep_clip import vid_vec_rep_clip
+from feluda.models.media_factory import VideoFactory
+
+class Test(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        # initialize operator
+        param = {}
+        vid_vec_rep_clip.initialize(param)
+
+    @classmethod
+    def tearDownClass(cls):
+        # delete config files
+        pass
+
+    @skip("requires a sample video on disk")
+    def test_sample_video_from_disk(self):
+        video_path = VideoFactory.make_from_file_on_disk(
+            r"core/operators/sample_data/sample-cat-video.mp4"
+        )
+        result = vid_vec_rep_clip.run(video_path)
+        for vec in result:
+            self.assertEqual(len(vec.get("vid_vec")), 512)
+
+    # @skip
+    def test_sample_video_from_url(self):
+        video_url = "https://tattle-media.s3.amazonaws.com/test-data/tattle-search/cat_vid_2mb.mp4"
+        video_path = VideoFactory.make_from_url(video_url)
+        result = vid_vec_rep_clip.run(video_path)
+        for vec in result:
+            self.assertEqual(len(vec.get("vid_vec")), 512)
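
The implementation below shells out to the `ffmpeg` CLI for I-frame extraction and downloads the CLIP weights from the Hugging Face hub on first use, so both must be available at runtime. A pre-flight check along these lines can fail fast before any video is processed (`check_runtime_deps` is a hypothetical helper, not part of the operator):

    import shutil

    def check_runtime_deps():
        # ffmpeg must be on PATH for VideoAnalyzer.extract_frames
        if shutil.which("ffmpeg") is None:
            raise EnvironmentError("ffmpeg not found on PATH")
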
+""" + +def initialize(param): + """ + Initializes the operator. + + Args: + param (dict): Parameters for initialization + """ + print("Installing packages for vid_vec_rep_clip") + global os + global VideoAnalyzer, gendata + + # Imports + import os + import subprocess + import tempfile + import torch + from PIL import Image + from transformers import AutoProcessor, CLIPModel + + # Load the model and processor + processor = AutoProcessor.from_pretrained("openai/clip-vit-base-patch32") + model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32") + + # Set the device + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + model.to(device) + + def gendata(vid_analyzer): + """ + Yields video vector representations from a `VideoAnalyzer` prototype. + + Args: + vid_analyzer (VideoAnalyzer): `VideoAnalyzer` instance + + Yields: + dict: A dictionary containing: + - `vid_vec` (list): Vector representation + - `is_avg` (bool): A flag indicating whether the vector is the average vector or a I-frame vector + """ + # average vector + yield { + "vid_vec": vid_analyzer.get_mean_feature().tolist(), + "is_avg": True, + } + # I-frame vectors + for keyframe in vid_analyzer.feature_matrix: + yield { + "vid_vec": keyframe.tolist(), + "is_avg": False, + } + + class VideoAnalyzer: + """ + A class for video feature extraction. + """ + def __init__(self, fname): + """ + Constructor for the `VideoAnalyzer` class. + + Args: + fname (str): Path to the video file + """ + self.model = model + self.device = device + self.frame_images = [] + self.feature_matrix = [] + self.analyze(fname) + + def get_mean_feature(self): + """ + Returns: + torch.Tensor: Mean feature vector + """ + return torch.mean(self.feature_matrix, dim=0) + + def analyze(self, fname): + """ + Analyzes the video file and extracts features. + + Args: + fname (str): Path to the video file + + Raises: + FileNotFoundError: If the file is not found + """ + # check if file exists + if not os.path.exists(fname): + raise FileNotFoundError(f"File not found: {fname}") + + # Extract I-frames and features + self.frame_images = self.extract_frames(fname) + self.feature_matrix = self.extract_features(self.frame_images) + + def extract_frames(self, fname): + """ + Extracts I-frames from the video file using `ffmpeg`. + + Args: + fname (str): Path to the video file + + Returns: + list: List of PIL Images + """ + with tempfile.TemporaryDirectory() as temp_dir: + # Command to extract I-frames using ffmpeg's command line tool + cmd=f""" + ffmpeg -i "{fname}" -vf "select=eq(pict_type\,I)" -vsync vfr "{temp_dir}/frame_%05d.jpg" + """ + with subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as process: + process.wait() + frames = [] + for filename in os.listdir(temp_dir): + if filename.endswith((".jpg")): + image_path = os.path.join(temp_dir, filename) + with Image.open(image_path) as img: + frames.append(img.copy()) + return frames + + def extract_features(self, images): + """ + Extracts features from a list of images using pre-trained CLIP-ViT-B-32. + + Args: + images (list): List of PIL Images + + Returns: + torch.Tensor: Feature matrix of shape (batch, 512) + """ + inputs = processor(images=images, return_tensors="pt", padding=True, truncation=True) + inputs = {k: v.to(self.device) for k, v in inputs.items()} # move to device + with torch.no_grad(): + features = self.model.get_image_features(**inputs) + return features + +def run(file): + """ + Runs the operator. 
diff --git a/pyproject.toml b/pyproject.toml
index 74eb364..8c53ac9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ version = "0.9.0"
 readme = "README.md"
 requires-python = ">=3.10"
 dependencies = [
-    "boto3>=1.35.63",
+    "boto3>=1.35.91",
     "dacite>=1.8.1",
     "pydub>=0.25.1",
     "pyyaml>=6.0.2",
@@ -14,6 +14,10 @@ dependencies = [
     "toml>=0.10.2",
 ]
 
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
 [project.optional-dependencies]
 dev = [
     "python-semantic-release>=9.14.0",
@@ -25,15 +29,15 @@ dev = [
     "pyproject-pre-commit[ruff]>=0.3.6",
 ]
 
-[build-system]
-requires = [
-    "hatchling",
-]
-build-backend = "hatchling.build"
-
 [tool.semantic_release]
-version_variable = [
-    "pyproject.toml:project.version",
+version_variable = ["pyproject.toml:project.version"]
+
+[tool.uv.workspace]
+members = [
+    "operators/image_vec_rep_resnet",
+    "operators/vid_vec_rep_clip",
+    "operators/cluster_embeddings",
+    "operators/dimension_reduction",
 ]
 
 [tool.semantic_release.branches.main]
@@ -41,12 +45,5 @@ match = "main"
 prerelease = false
 tag_format = "{name}-{version}"
 
-[tool.uv.workspace]
-members = [
-    "operators/image_vec_rep_resnet",
-]
-
 [tool.hatch.build.targets.wheel]
-packages = [
-    "feluda",
-]
+packages = ["feluda"]
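
Finally, the root `pyproject.toml` registers each new operator as a uv workspace member alongside `image_vec_rep_resnet`, so a single `uv sync` from the repository root should resolve and install all operator packages together, while the relocated `[build-system]` table keeps hatchling as the build backend for the `feluda` package itself.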