-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Performance analysis of cascade operations #660
Comments
For the environment configuration, I'm using a modified version of the agents index event generator utility ( The groups' generation is set to a max of 500 groups, from which each agent will have assigned 128 different groups # Generate 500 unique group names
unique_groups = [f'group{i}' for i in range(500)]
random.shuffle(unique_groups) And the OS distribution is calculated based on the number of events to generate, in this case we will use 50k events def generate_random_data(number):
data = []
num_windows = int(0.5 * number)
num_macos = int(0.15 * number)
num_linux = number - num_windows - num_macos
... Some other modifications were made to meet these requirements; I'm sharing the complete script below.

Event generator script:

#!/bin/python3
import datetime
import json
import logging
import random
import requests
import urllib3
# Constants and Configuration
LOG_FILE = 'generate_data.log'  # all progress/errors go here, not to stdout
GENERATED_DATA_FILE = 'generatedData.json'  # NDJSON output: one document per line
DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"  # timestamp format used for agent.last_login
# Default values
INDEX_NAME = "wazuh-agents"
USERNAME = "admin"
PASSWORD = "admin"
IP = "127.0.0.1"
PORT = "9200"
# Configure logging
logging.basicConfig(filename=LOG_FILE, level=logging.INFO)
# Suppress warnings
# (injection posts with verify=False, presumably against a self-signed-cert
# indexer — this silences the per-request InsecureRequestWarning noise)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Generate 500 unique group names
unique_groups = [f'group{i}' for i in range(500)]
random.shuffle(unique_groups)
def generate_random_date():
    """Return a random timestamp within the last 10 days, formatted per DATE_FORMAT."""
    start_date = datetime.datetime.now()
    # BUG FIX: the original wrote timedelta(days(10)), which raises
    # NameError ('days' is not a callable) — the keyword days=10 was intended.
    end_date = start_date - datetime.timedelta(days=10)
    # random.random() is in [0, 1), so this interpolates between now
    # (start_date) and 10 days ago (end_date).
    random_date = start_date + (end_date - start_date) * random.random()
    return random_date.strftime(DATE_FORMAT)
def generate_random_groups():
    """Draw 128 distinct group names from the shuffled global pool."""
    return random.sample(population=unique_groups, k=128)
def generate_random_agent(agent_type):
    """Build one synthetic agent document whose host runs `agent_type`."""
    doc = {}
    doc['id'] = f'agent{random.randint(0, 99999)}'
    doc['name'] = f'Agent{random.randint(0, 99999)}'
    doc['type'] = agent_type
    doc['version'] = f'v{random.randint(0, 9)}-stable'
    doc['status'] = random.choice(['active', 'inactive'])
    doc['last_login'] = generate_random_date()
    doc['groups'] = generate_random_groups()
    doc['key'] = f'key{random.randint(0, 99999)}'
    doc['host'] = generate_random_host(agent_type)
    return doc
def generate_random_host(agent_type):
    """Build the ECS-style `host` sub-document for an agent of `agent_type`.

    `agent_type` must be one of 'linux', 'windows', 'macos' — it selects the
    OS family pool and is copied into os.platform / os.type verbatim.
    """
    # OS family candidates per agent type; one is picked at random.
    families = {
        'linux': ['debian', 'ubuntu', 'centos', 'redhat'],
        'windows': ['windows'],
        'macos': ['macos', 'ios']
    }
    family = random.choice(families[agent_type])
    # Version string like "12.3"; reused for both os.full and os.version.
    version = f'{random.randint(0, 99)}.{random.randint(0, 99)}'
    host = {
        'architecture': random.choice(['x86_64', 'arm64']),
        'boot': {
            'id': f'boot{random.randint(0, 9999)}'
        },
        'cpu': {
            'usage': random.uniform(0, 100)
        },
        'disk': {
            'read': {
                'bytes': random.randint(0, 1000000)
            },
            'write': {
                'bytes': random.randint(0, 1000000)
            }
        },
        'domain': f'domain{random.randint(0, 999)}',
        # NOTE: geo fields are drawn independently, so city/continent/country
        # combinations need not be geographically consistent — fine for load
        # testing, not for semantic queries.
        'geo': {
            'city_name': random.choice(['San Francisco', 'New York', 'Berlin', 'Tokyo']),
            'continent_code': random.choice(['NA', 'EU', 'AS']),
            'continent_name': random.choice(['North America', 'Europe', 'Asia']),
            'country_iso_code': random.choice(['US', 'DE', 'JP']),
            'country_name': random.choice(['United States', 'Germany', 'Japan']),
            'location': {
                'lat': round(random.uniform(-90.0, 90.0), 6),
                'lon': round(random.uniform(-180.0, 180.0), 6)
            },
            'name': f'geo{random.randint(0, 999)}',
            'postal_code': f'{random.randint(10000, 99999)}',
            'region_iso_code': f'region{random.randint(0, 999)}',
            'region_name': f'Region {random.randint(0, 999)}',
            'timezone': random.choice(['PST', 'EST', 'CET', 'JST'])
        },
        'hostname': f'host{random.randint(0, 9999)}',
        'id': f'hostid{random.randint(0, 9999)}',
        # Random dotted-quad; octets are 1-255 (0 never appears).
        'ip': f'{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}',
        # Six random octets rendered as lowercase zero-padded hex.
        'mac': f'{random.randint(0, 255):02x}:{random.randint(0, 255):02x}:{random.randint(0, 255):02x}:{random.randint(0, 255):02x}:{random.randint(0, 255):02x}:{random.randint(0, 255):02x}',
        'name': f'hostname{random.randint(0, 9999)}',
        'network': {
            'egress': {
                'bytes': random.randint(0, 1000000),
                'packets': random.randint(0, 1000000)
            },
            'ingress': {
                'bytes': random.randint(0, 1000000),
                'packets': random.randint(0, 1000000)
            }
        },
        'os': {
            'family': family,
            'full': f'{family} {version}',
            'kernel': f'kernel{random.randint(0, 999)}',
            'name': family,
            'platform': agent_type,
            'type': agent_type,
            'version': version
        },
        'pid_ns_ino': f'{random.randint(1000000, 9999999)}',
        'risk': {
            'calculated_level': random.choice(['low', 'medium', 'high']),
            'calculated_score': random.uniform(0, 100),
            'calculated_score_norm': random.uniform(0, 1),
            'static_level': random.choice(['low', 'medium', 'high']),
            'static_score': random.uniform(0, 100),
            'static_score_norm': random.uniform(0, 1)
        },
        'uptime': random.randint(0, 1000000)
    }
    return host
def generate_random_data(number):
    """Generate `number` agent events: 50% windows, 15% macos, the rest linux."""
    windows_count = int(0.5 * number)
    macos_count = int(0.15 * number)
    # linux absorbs the rounding remainder so the total is exactly `number`.
    linux_count = number - windows_count - macos_count
    events = []
    for os_type, count in (('windows', windows_count),
                           ('macos', macos_count),
                           ('linux', linux_count)):
        for _ in range(count):
            events.append({'agent': generate_random_agent(os_type)})
    return events
def inject_events(ip, port, index, username, password, data):
    """POST each event document to https://{ip}:{port}/{index}/_doc.

    Stops at the first non-201 response. All outcomes are logged, never
    raised, so the caller's flow is unaffected by indexer errors.
    """
    url = f'https://{ip}:{port}/{index}/_doc'
    headers = {'Content-Type': 'application/json'}
    try:
        # Context manager closes the session (the original leaked it).
        with requests.Session() as session:
            session.auth = (username, password)
            session.verify = False  # indexer typically runs a self-signed cert
            for event_data in data:
                response = session.post(url, json=event_data, headers=headers)
                if response.status_code != 201:
                    logging.error(f'Error: {response.status_code}')
                    logging.error(response.text)
                    # BUG FIX: the original `break` still fell through to the
                    # "completed successfully" log line; return instead.
                    return
        logging.info('Data injection completed successfully.')
    except Exception as e:
        logging.error(f'Error: {str(e)}')
def main():
    """Drive the generator: ask for a count, write NDJSON, optionally inject."""
    raw_count = input("How many events do you want to generate? ")
    try:
        number = int(raw_count)
    except ValueError:
        logging.error("Invalid input. Please enter a valid number.")
        return
    logging.info(f"Generating {number} events...")
    data = generate_random_data(number)
    # Append one JSON document per line (NDJSON) to the output file.
    with open(GENERATED_DATA_FILE, 'a') as outfile:
        for event_data in data:
            json.dump(event_data, outfile)
            outfile.write('\n')
    logging.info('Data generation completed.')
    answer = input("Do you want to inject the generated data into your indexer? (y/n) ").strip().lower()
    if answer != 'y':
        return
    # Empty answers fall back to the module-level defaults.
    ip = input(f"Enter the IP of your Indexer (default: '{IP}'): ") or IP
    port = input(f"Enter the port of your Indexer (default: '{PORT}'): ") or PORT
    index = input(f"Enter the index name (default: '{INDEX_NAME}'): ") or INDEX_NAME
    username = input(f"Username (default: '{USERNAME}'): ") or USERNAME
    password = input(f"Password (default: '{PASSWORD}'): ") or PASSWORD
    inject_events(ip, port, index, username, password, data)
if __name__ == "__main__":
main()

For the generation of the documents we can use the following script.

Script to use for the events generation:

import requests
# Configuration
CLUSTER_URL = "http://localhost:9200"  # indexer endpoint (plain HTTP here)
AGENTS_URL = f"{CLUSTER_URL}/wazuh-agents/_search"  # search API over agent docs
COMMANDS_URL = f"{CLUSTER_URL}/_plugins/_command_manager/commands"  # command-manager plugin endpoint
USERNAME = "admin"
PASSWORD = "admin"
def get_agents_ids():
    """Return the document IDs of agents in the wazuh-agents index.

    Returns an empty list on any request/HTTP error (error is printed).
    """
    try:
        # BUG FIX: without an explicit `size`, _search returns only the first
        # 10 hits, so at most 10 agents would ever receive commands. 10000 is
        # the default index.max_result_window cap.
        response = requests.get(AGENTS_URL, params={'size': 10000}, auth=(USERNAME, PASSWORD))
        response.raise_for_status()
        agents_data = response.json()
        agents_ids = [hit['_id'] for hit in agents_data['hits']['hits']]
        return agents_ids
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        return []
def send_commands(agent_ids, num_commands):
    """POST `num_commands` restart commands per agent in one bulk payload.

    All commands go to the command-manager endpoint in a single request;
    request errors are printed, not raised.
    """
    commands = [
        {
            "source": "Engine",
            "user": "user53",
            "target": {
                "id": agent_id,
                "type": "agent"
            },
            "action": {
                "name": "restart",
                "args": {
                    "arg1": "/path/to/executable/arg6"
                },
                "version": "v4"
            },
            "timeout": 30
        }
        for agent_id in agent_ids
        for _ in range(num_commands)
    ]
    payload = {"commands": commands}
    headers = {'Content-Type': 'application/json'}
    try:
        response = requests.post(COMMANDS_URL, json=payload, headers=headers, auth=(USERNAME, PASSWORD))
        response.raise_for_status()
        # BUG FIX: user-facing typo "Succesfully" -> "Successfully".
        print(f"Successfully sent {len(commands)} commands.")
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
def main():
    """Prompt for a per-agent command count and dispatch the commands."""
    try:
        num_commands = int(input("Enter the number of commands to generate for each agent: "))
    except ValueError:
        print("Invalid input. Please enter a valid number.")
        return
    agents_ids = get_agents_ids()
    # Guard clause: nothing to do when the index has no agents.
    if not agents_ids:
        print("No agents found on wazuh-agents index.")
        return
    print(f"Total agents retrieved: {len(agents_ids)}")
    send_commands(agents_ids, num_commands)
if __name__ == "__main__":
main()
Description
As a preliminary step towards migrating Wazuh's RBAC from the Server to the Indexer, we need to be aware about the performance of the Indexer on cascade operations involving the change of agents' groups.
A single agent can generate hundreds to thousands of events that end up on indexes. These documents (events) are each tied to a single agent, comprising a many-to-one relationship: many documents per agent, while any given document belongs to exactly one agent. In order to depict this relationship in the indices, every document contains the
agent.id
as a primary key that allows these entities to be correlated. Every document also carries the field agent.groups, denormalizing the agent's group membership into each document.
The main drawback of this design is that when any agent changes its groups, all the data belonging to that agent until that moment needs to be updated with the new groups of the agent.
To better understand the problem, let's imagine an environment with 50K agents, 10K documents per agent and day, over 30 days.
Over a month, such an environment would accumulate 15 billion documents (50K agents × 10K documents/day × 30 days). On a hypothetical, but possible, update of every agent's groups, the Indexer would need to perform 15 billion update operations as a result.
Environment details
Plan
The text was updated successfully, but these errors were encountered: