Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle pending queries #1172

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
50 changes: 23 additions & 27 deletions modules/threat_intelligence/threat_intelligence.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
# SPDX-FileCopyrightText: 2021 Sebastian Garcia <[email protected]>
# SPDX-License-Identifier: GPL-2.0-only
import asyncio
import ipaddress
import multiprocessing
import os
import json
import threading
import time
import asyncio
from uuid import uuid4
import validators
from typing import (
Expand Down Expand Up @@ -68,10 +67,8 @@ def init(self):
self.get_all_blacklisted_ip_ranges()
self.urlhaus = URLhaus(self.db)
self.spamhaus = Spamhaus(self.db)
self.pending_queries = multiprocessing.Queue()
self.pending_circllu_calls_thread = threading.Thread(
target=self.handle_pending_queries, daemon=True
)
self.pending_queries = asyncio.Queue()
asyncio.create_task(self.handle_pending_queries())
self.circllu = Circllu(self.db, self.pending_queries)

def get_all_blacklisted_ip_ranges(self):
Expand Down Expand Up @@ -1744,13 +1741,12 @@ def update_local_file(self, filename):
self.db.set_ti_feed_info(filename, malicious_file_info)
return True

async def handle_pending_queries(self):
    """Process the pending Circl.lu queries stored in the queue.

    Runs as a long-lived asyncio coroutine. On each pass it drains up
    to 10 queued flow-info items without blocking, looks them up
    concurrently via `is_malicious_hash`, and — when the queue is
    empty — sleeps for 2 seconds before polling again.

    Side Effects:
        - Calls `is_malicious_hash` for each flow-info item taken
          from `self.pending_queries`.
    """
    max_queries = 10
    while True:
        tasks = []
        # Drain at most max_queries items without blocking; keep the
        # try body minimal so only get_nowait() is guarded.
        while len(tasks) < max_queries:
            try:
                flow_info = self.pending_queries.get_nowait()
            except asyncio.QueueEmpty:
                break
            tasks.append(
                asyncio.create_task(self.is_malicious_hash(flow_info))
            )

        if tasks:
            # return_exceptions=True prevents a single failed lookup
            # from propagating out of this infinite loop and silently
            # killing the background consumer for good.
            await asyncio.gather(*tasks, return_exceptions=True)
        else:
            # Queue is empty: back off briefly before polling again.
            await asyncio.sleep(2)

def should_lookup(self, ip: str, protocol: str, ip_state: str) -> bool:
"""Return whether slips should lookup the given ip or not."""
Expand Down
2 changes: 1 addition & 1 deletion tests/test_slips_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def test_get_sha256_hash():
# a file that we know doesn't change
assert (
utils.get_sha256_hash("modules/template/__init__.py")
== "2d12747a3369505a4d3b722a0422f8ffc8af5514355cdb0eb18178ea7071b8d0"
== "683de4e72614dd4947e5f3b5889e12fa15bf6d5b4c5978683bad78f3c6ad5695"
)


Expand Down
Loading