Skip to content

Commit

Permalink
CONT-281: Updating group IDs of sessions fails for large numbers of s…
Browse files Browse the repository at this point in the history
…essions.
  • Loading branch information
Henning-Schulz committed Nov 11, 2019
1 parent deedda8 commit ff91495
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
1 change: 1 addition & 0 deletions clustinator/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def __init__(self, rabbitmq_host, rabbitmq_port, elastic_host, timeout):

connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_host, port=rabbitmq_port, heartbeat=timeout))
channel = connection.channel()
channel.basic_qos(prefetch_count = 1) # receive at most 1 unacked message at a time
self.channel = channel

clustering_queue = 'continuity.clustinator.task.clustinator.cluster'
Expand Down
11 changes: 9 additions & 2 deletions clustinator/session_matrix_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,18 @@ def create(self):

return matrix

def update_group_ids(self, sessions_per_cluster):
def update_group_ids(self, sessions_per_cluster, chunk_size = 2000):
"""
Updates the group IDs of the sessions as specified in the passed dict.
:param sessions_per_cluster: Dict of session IDs per group (cluster) ID.
:param chunk_size: The number of sessions to update with one request. Defaults to 2000.
"""

for beh_id, session_ids in sessions_per_cluster.items():
self._elastic.set_group_ids(beh_id, session_ids)

for chunk in [
session_ids[i * chunk_size:(i + 1) * chunk_size]
for i in range((len(session_ids) + chunk_size - 1) // chunk_size )
]:

self._elastic.set_group_ids(beh_id, chunk)

0 comments on commit ff91495

Please sign in to comment.