Update client.py #296

Closed
wants to merge 4 commits into from
112 changes: 67 additions & 45 deletions scripts/data/client.py
@@ -1,47 +1,58 @@
#!/usr/bin/env python3

import argparse
import json
import logging
import os
import queue
import random
import re
import signal
import subprocess
import threading
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from logging.handlers import TimedRotatingFileHandler
from pathlib import Path

from generate_data import generate_data
from format_args import format_args

logger = logging.getLogger(__name__)

# Constants
MAX_WEIGHT_LIMIT = 8000 # Total weight limit for all jobs
THREAD_POOL_SIZE = os.cpu_count() # Number of threads for processing
QUEUE_MAX_SIZE = THREAD_POOL_SIZE * 2 # Maximum size of the job queue

BASE_DIR = Path(".client_cache")

# Shared state variables
current_weight = 0
weight_lock = threading.Condition()
job_queue = queue.Queue(maxsize=QUEUE_MAX_SIZE)

# Create an event for graceful shutdown
shutdown_event = threading.Event()


def handle_sigterm(signum, frame):
logger.info("Received SIGTERM signal. Shutting down gracefully...")
shutdown_event.set() # Set the event to signal threads to exit


# Register the SIGTERM handler
signal.signal(signal.SIGTERM, handle_sigterm)


# Function to calculate weight of a block
def calculate_batch_weight(block_data, mode):
if mode == "light":
return len(block_data["blocks"])
    return sum(
        len(tx["inputs"]) + len(tx["outputs"])
        for block in block_data["blocks"]
        for tx in block["data"]["transactions"]
    )


@dataclass
@@ -70,15 +81,13 @@ def job_generator(start, blocks, step, mode, strategy):
for height in height_range:
try:
batch_file = BASE_DIR / f"{mode}_{height}_{step}.json"

batch_data = generate_data(
mode=mode, initial_height=height, num_blocks=step, fast=True
)

Path(batch_file).write_text(json.dumps(batch_data, indent=2))

batch_weight = calculate_batch_weight(batch_data, mode)
yield Job(height, step, mode, batch_weight, batch_file), batch_weight

except Exception as e:
logger.error(f"Error while generating data for: {height}:\n{e}")

@@ -113,6 +122,7 @@ def process_batch(job):
or "panicked" in result.stdout
):
error = result.stdout or result.stderr

if result.returncode == -9:
match = re.search(r"gas_spent=(\d+)", result.stdout)
gas_info = (
@@ -122,16 +132,19 @@
)
error = f"Return code -9, killed by OOM?{gas_info}"
message = error

else:
error_match = re.search(r"error='([^']*)'", error)
message = error_match.group(1) if error_match else ""

logger.error(f"{job} error: {message}")
logger.debug(f"Full error while processing: {job}:\n{error}")

else:
match = re.search(r"gas_spent=(\d+)", result.stdout)
gas_info = f"gas spent: {int(match.group(1))}" if match else "no gas info found"
logger.info(f"{job} done, {gas_info}")

if not match:
logger.warning(f"{job}: not gas info found")

@@ -142,76 +155,80 @@ def job_producer(job_gen):

try:
for job, weight in job_gen:
            # Check for shutdown
if shutdown_event.is_set():
logger.info("Shutdown event detected in producer. Exiting...")
break

with weight_lock:
logger.debug(
f"Adding job: {job}, current total weight: {current_weight}..."
)

                # Wait until there is enough weight capacity to add the new block
                while (
                    (current_weight + weight > MAX_WEIGHT_LIMIT and current_weight != 0)
                    or job_queue.full()
                ):
if shutdown_event.is_set():
logger.info("Shutdown event detected while waiting. Exiting...")
return

logger.debug("Producer is waiting for weight to be released.")
                    weight_lock.wait()  # Wait for the condition to be met

if (current_weight + weight > MAX_WEIGHT_LIMIT) and current_weight == 0:
logger.warning(f"{job} over the weight limit: {MAX_WEIGHT_LIMIT}")

# Add the job to the queue and update the weight
job_queue.put((job, weight))
current_weight += weight
logger.debug(
f"Produced job: {job}, current total weight: {current_weight}"
)

# Notify consumers that a new job is available
weight_lock.notify_all()

finally:
logger.debug("Producer is exiting...")

for _ in range(THREAD_POOL_SIZE):
job_queue.put(None)

with weight_lock:
weight_lock.notify_all()

logger.debug("Consumers notified")


# Consumer function: Processes blocks from the queue
def job_consumer(process_job):
global current_weight

while True:
if shutdown_event.is_set():
logger.info("Shutdown event detected in consumer. Exiting...")
break

try:
logger.debug(
f"Consumer is waiting for a job. Queue length: {job_queue.qsize()}"
)
# Get a job from the queue

work_to_do = job_queue.get(block=True)

if work_to_do is None:
logger.debug("No more work to do, consumer is exiting.")
job_queue.task_done()
break

(job, weight) = work_to_do

# Process the block
try:
logger.debug(f"Executing job: {job}...")
process_job(job)

except Exception as e:
logger.error(f"Error while processing job: {job}:\n{e}")

            with weight_lock:
                current_weight -= weight

                weight_lock.notify_all()

# Mark job as done
job_queue.task_done()

except Exception as e:
@@ -220,7 +237,6 @@ def job_consumer(process_job):


def main(start, blocks, step, mode, strategy):

logger.info(
"Starting client, initial height: %d, blocks: %d, step: %d, mode: %s, strategy: %s",
start,
@@ -229,6 +245,7 @@ def main(start, blocks, step, mode, strategy):
mode,
strategy,
)

logger.info(
"Max weight limit: %d, Thread pool size: %d, Queue max size: %d",
MAX_WEIGHT_LIMIT,
@@ -260,14 +277,11 @@ def main(start, blocks, step, mode, strategy):


if __name__ == "__main__":

parser = argparse.ArgumentParser(description="Run client script")

parser.add_argument("--start", type=int, required=True, help="Start block height")
    parser.add_argument(
        "--blocks", type=int, default=1, help="Number of blocks to process"
    )
parser.add_argument(
"--step", type=int, default=1, help="Step size for block processing"
@@ -292,34 +306,42 @@ def main(start, blocks, step, mode, strategy):

MAX_WEIGHT_LIMIT = args.maxweight

# file_handler = logging.FileHandler("client.log")
file_handler = TimedRotatingFileHandler(
filename="client.log",
when="midnight",
interval=1,
backupCount=14,
encoding="utf8",
)

file_handler.setLevel(logging.INFO)

file_handler.setFormatter(
logging.Formatter("%(asctime)s - %(name)-10.10s - %(levelname)s - %(message)s")
)

console_handler = logging.StreamHandler()

console_handler.setLevel(logging.DEBUG)

console_handler.setFormatter(
logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
)

root_logger = logging.getLogger()

root_logger.addHandler(console_handler)

root_logger.addHandler(file_handler)

if args.verbose:
root_logger.setLevel(logging.DEBUG)

else:
root_logger.setLevel(logging.INFO)

logging.getLogger("urllib3").setLevel(logging.WARNING)

logging.getLogger("generate_data").setLevel(logging.WARNING)

main(args.start, args.blocks, args.step, args.mode, args.strategy)
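
Reviewer note: the part of client.py this PR touches most is the weight-gated producer/consumer loop, where job_producer waits on a threading.Condition until the total in-flight weight fits under MAX_WEIGHT_LIMIT and the bounded queue has room, and job_consumer subtracts the finished job's weight and notifies the producer. The sketch below is a minimal, self-contained illustration of that pattern with made-up job names and a tiny weight budget; it is not code from the PR.

# Minimal sketch of the weight-gated producer/consumer pattern used by client.py.
# Names and the weight budget are illustrative only (client.py defaults to 8000).
import queue
import threading

MAX_WEIGHT_LIMIT = 10                # illustrative weight budget
job_queue = queue.Queue(maxsize=4)   # bounded queue, as in client.py
weight_lock = threading.Condition()  # guards current_weight and signals freed capacity
current_weight = 0


def producer(jobs):
    global current_weight
    for job, weight in jobs:
        with weight_lock:
            # Wait until the job fits under the budget and the queue has room.
            # A job heavier than the whole budget is still admitted once nothing is in flight.
            while (
                current_weight + weight > MAX_WEIGHT_LIMIT and current_weight != 0
            ) or job_queue.full():
                weight_lock.wait()
            job_queue.put((job, weight))
            current_weight += weight
            weight_lock.notify_all()
    job_queue.put(None)  # sentinel: tell the consumer there is no more work


def consumer():
    global current_weight
    while True:
        item = job_queue.get()
        if item is None:
            job_queue.task_done()
            break
        job, weight = item
        print(f"processing {job} (weight {weight})")
        with weight_lock:
            current_weight -= weight
            weight_lock.notify_all()  # release budget so the producer can enqueue more
        job_queue.task_done()


if __name__ == "__main__":
    t = threading.Thread(target=consumer)
    t.start()
    producer([("job-a", 4), ("job-b", 7), ("job-c", 3)])
    t.join()

In client.py only the producer ever waits on the Condition (the consumers block on queue.get), so the notify_all calls in the consumers exist to wake the producer when capacity is freed, and the None sentinels pushed in the producer's finally block are what let each consumer thread exit cleanly.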