Skip to content

Commit

Permalink
small rewrite of the scan_archive step, to make use of a Queue to dis…
Browse files Browse the repository at this point in the history
…tribute over the workers
  • Loading branch information
ThomasLecocq committed Jul 30, 2024
1 parent 9a67c55 commit 49641ce
Showing 1 changed file with 30 additions and 70 deletions.
100 changes: 30 additions & 70 deletions msnoise/s01scan_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,61 +410,27 @@ def get_data_structure(config_data_st):
return None


def spawn_processes(pool, nproc, dir_list, scan_func, scan_args):
"""
Spawn nproc processes that will execute scan_func(scan_args).
Return a list of multiprocessing.pool.AsyncResult objects that
represent the list of children to wait for.
"""
# Get the ceiling of (len(dir_list) / nproc) (w/o importing math.ceil)
def worker(queue, scan_func, scan_args):
while True:
folder_slice = queue.get()
if folder_slice is None: # Sentinel value to signal the worker to exit
break
print(scan_args)
scan_func([folder_slice], *scan_args)

n = int((len(dir_list) + nproc - 1) / nproc)

# Shuffling the folders to (try to) mix small and large folders:
random.shuffle(dir_list)
def spawn_processes(queue, nproc, scan_func, scan_args):
children = []
for i in range(nproc):
folder_slice = dir_list[n*i:n*(i+1)]
logger.debug('Spawning a child to process {} of the {} directories.'
.format(len(folder_slice), len(dir_list)))
children.append(pool.apply_async(scan_func,
[folder_slice] + list(scan_args)))
for _ in range(nproc):
p = multiprocessing.Process(target=worker, args=(queue, scan_func, scan_args))
p.start()
children.append(p)
return children


def await_children(pool, children):
"""
Wait for children to return.
Watch multiprocessing.pool.AsyncResult object in 'children' until all
children are finished. If any of the children raise an exception or
exits anormally, terminate the pool. Re-raise any exception raised by a
child.
"""
try:
while children:
# Wait a few seconds for each child to return but don't block
# so we catch any child error early and terminate all children.
# Note: once python 2 support in msnoise will be dropped,
# consider replacing this loop by error_callback arguments to
# apply_async (unavailable in python 2).
for result in children:
try:
rc = result.get(timeout=1)
# The child has terminated without raising any exception:
# remove it from the list of children to wait for
children.remove(result)
except multiprocessing.TimeoutError:
# AsyncResult.get() reached its wait timeout: do nothing
pass
except Exception:
# AsyncResult.get() re-raised an exception raised in the child
logger.debug('A child process has raised an exception:'
' terminating and re-raising it.')
pool.terminate()
pool.join()
raise # this re-raises the last active exception
def await_children(children):
for child in children:
child.join()


def scan_archive(folder_globs, nproc, mintime, startdate, enddate,
Expand Down Expand Up @@ -497,27 +463,21 @@ def scan_archive(folder_globs, nproc, mintime, startdate, enddate,
scan_folders(dir_list, mintime, startdate, enddate, goal_sampling_rate,
archive_format)
else:
# In multiprocessing mode, we split the folders into nproc lists of
# similar size and have them processed by as many child processes.
# (This reduces the number of process spawning, as it is a slow
# operation.)

# Note: consider using a context manager instead of try/except
# once python 2 support will be dropped in msnoise:
# with multiprocessing.Pool(processes=nproc) as pool:
pool = multiprocessing.Pool(processes=nproc)

# MSNoise 2 trying to revamp in Queue ... pfffffffff:
# the_queue = multiprocessing.Queue()
# pool = multiprocessing.Pool(nproc, scan_folders, (the_queue,))

# Launch nproc children working on a mostly equal number of folders
children = spawn_processes(pool, nproc, dir_list, scan_folders,
(mintime, startdate, enddate,
goal_sampling_rate, archive_format,
logger.level))
# Wait for children to finish, or terminate them if one crashes
await_children(pool, children)
scan_func = scan_folders
scan_args = (mintime, startdate, enddate, goal_sampling_rate, archive_format, logger.level)
# Shuffle directories and put them in a queue
random.shuffle(dir_list)
queue = multiprocessing.Queue()
for folder in dir_list:
queue.put(folder)

# Add a sentinel value for each worker
for _ in range(nproc):
queue.put(None)
print(dir_list)

children = spawn_processes(queue, nproc, scan_func, scan_args)
await_children(children)


def parse_crondays(crondays):
Expand Down

0 comments on commit 49641ce

Please sign in to comment.