NOTE(review): the line breaks, indentation and blank context lines below were
reconstructed from a copy of this patch that had been collapsed onto a single
line; verify the context against the target files before applying. The
post-image hashes on the "index" lines predate the edits to the added lines.

diff --git a/clustinator/receiver.py b/clustinator/receiver.py
index 5bbf989..8950d41 100644
--- a/clustinator/receiver.py
+++ b/clustinator/receiver.py
@@ -26,6 +26,7 @@ def __init__(self, rabbitmq_host, rabbitmq_port, elastic_host, timeout):
 
         connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_host, port=rabbitmq_port, heartbeat=timeout))
         channel = connection.channel()
+        channel.basic_qos(prefetch_count=1)  # receive at most 1 unacked message at a time
         self.channel = channel
 
         clustering_queue = 'continuity.clustinator.task.clustinator.cluster'
diff --git a/clustinator/session_matrix_creator.py b/clustinator/session_matrix_creator.py
index 2292549..a43f2cc 100644
--- a/clustinator/session_matrix_creator.py
+++ b/clustinator/session_matrix_creator.py
@@ -31,11 +31,18 @@ def create(self):
 
         return matrix
 
-    def update_group_ids(self, sessions_per_cluster):
+    def update_group_ids(self, sessions_per_cluster, chunk_size=2000):
         """
         Updates the group IDs of the sessions as specified in the passed dict.
         :param sessions_per_cluster: Dict of session IDs per group (cluster) ID.
+        :param chunk_size: Maximum number of sessions to update with one request. Must be at least 1. Defaults to 2000.
+        :raises ValueError: If chunk_size is smaller than 1.
         """
 
+        if chunk_size < 1:
+            raise ValueError('chunk_size must be at least 1, got {!r}'.format(chunk_size))
+
         for beh_id, session_ids in sessions_per_cluster.items():
-            self._elastic.set_group_ids(beh_id, session_ids)
+            # Send the IDs slice by slice so that no single request carries more than chunk_size sessions.
+            for start in range(0, len(session_ids), chunk_size):
+                self._elastic.set_group_ids(beh_id, session_ids[start:start + chunk_size])