From ff91495e93d8c023f86ba49f08fe71b6d6a90f5b Mon Sep 17 00:00:00 2001
From: Henning Schulz
Date: Mon, 11 Nov 2019 16:21:44 +0100
Subject: [PATCH] CONT-281: Updating group IDs of sessions fails for large
 numbers of sessions.

---
Reviewer note: update_group_ids now rejects chunk sizes below 1 and slices the
session IDs on demand. The blob hashes on the "index" lines have not been
recomputed for the revised added lines.

 clustinator/receiver.py               |  1 +
 clustinator/session_matrix_creator.py | 13 +++++++++++--
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/clustinator/receiver.py b/clustinator/receiver.py
index 5bbf989..8950d41 100644
--- a/clustinator/receiver.py
+++ b/clustinator/receiver.py
@@ -26,6 +26,7 @@ def __init__(self, rabbitmq_host, rabbitmq_port, elastic_host, timeout):
         connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_host, port=rabbitmq_port,
                                                                        heartbeat=timeout))
         channel = connection.channel()
+        channel.basic_qos(prefetch_count=1)  # receive at most 1 unacked message at a time
         self.channel = channel
 
         clustering_queue = 'continuity.clustinator.task.clustinator.cluster'
diff --git a/clustinator/session_matrix_creator.py b/clustinator/session_matrix_creator.py
index 2292549..a43f2cc 100644
--- a/clustinator/session_matrix_creator.py
+++ b/clustinator/session_matrix_creator.py
@@ -31,11 +31,20 @@ def create(self):
 
         return matrix
 
-    def update_group_ids(self, sessions_per_cluster):
+    def update_group_ids(self, sessions_per_cluster, chunk_size=2000):
         """
         Updates the group IDs of the sessions as specified in the passed dict.
+        The session IDs of each group are sent in chunks, as a single request for a large number of sessions fails.
         :param sessions_per_cluster: Dict of session IDs per group (cluster) ID.
+        :param chunk_size: The maximum number of sessions to update with one request. Must be at least 1.
+            Defaults to 2000.
+        :raises ValueError: If chunk_size is less than 1.
         """
 
+        if chunk_size < 1:
+            raise ValueError('chunk_size must be at least 1, but is %r' % (chunk_size,))
+
         for beh_id, session_ids in sessions_per_cluster.items():
-            self._elastic.set_group_ids(beh_id, session_ids)
+            # Slice on demand so that only one chunk is copied at a time.
+            for start in range(0, len(session_ids), chunk_size):
+                self._elastic.set_group_ids(beh_id, session_ids[start:start + chunk_size])