Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement engine="thread" in ChunkRecordingExecutor #3526

Open
wants to merge 27 commits into
base: main
Choose a base branch
from

Conversation

samuelgarcia
Copy link
Member

@samuelgarcia samuelgarcia commented Nov 8, 2024

@h-mayorquin @zm711 @alejoe91
I wanted to do this since a long time : implement thread for ChunkRecordingExecutor.

Now we can do :

# this use threads
job_kwargs = dict(chunk_duration="1s", pool_engine="thread", n_jobs=8, progress_bar=True)
# this use processes
job_kwargs = dict(chunk_duration="1s", pool_engine="process", n_jobs=8, progress_bar=True)

Maybe this will help a lot windows user and will avoid use "spawn" for multiprocessing (which is too slow at startup).
My bet is that thread will have IO (read data from disk) and computation withou the GIL without computing lib, so it should good enought for parralell computing. Lets see.

Here a very first as a proof of concept. (Without any real test).

TODO:

  • add engine thread
  • add worker_index mechanism in job_tools.py (this will be usefull for multi GPU)
  • rename max_threads_per_process to max_threads_per_worker. See note.
  • discuss renaming n_jobs to n_worker or num_worker or max_worker (Alessio will be unhappy)
  • make a get_best_job_kwargs() that will be platform dependant.

When using engine thread we will need to disambigu the concept of worker that can be thread and max_threads_per_worker which related to the hidden thread use by computing lib like numpy/scipy/blas/lapcak/sklearn.

EDIT:

  • on linux thread are a bit slower than process with fork but not so much.

Important notes:
many ProcessPoolExecutor are hard coded in several places (principal_components, split, merge...)
we should also do this but this will be a separated PR. because theses parallelisation are not on the recording axis.
Note that tricky stuff between loops, thread and process is initialzing variable to a private dict per worker. This is done in ChunkRecordingExecutor but this should be propagated for other use cases that do not need recording slices.

@alejoe91 alejoe91 added core Changes to core module concurrency Related to parallel processing labels Nov 11, 2024
@samuelgarcia
Copy link
Member Author

After a long fight this is now working.
The threadpool was making the progress_bar randomly freezing.
I found a simple trick but it took long time.

@samuelgarcia
Copy link
Member Author

@zm711 : if you have time could you make some test using windows ?
I will also do it.

@zm711
Copy link
Collaborator

zm711 commented Nov 20, 2024

@zm711 : if you have time could you make some test using windows ?

Yep. Will do once tests pass :)

@samuelgarcia samuelgarcia marked this pull request as ready for review November 20, 2024 13:35
@samuelgarcia
Copy link
Member Author

@alejoe91 @zm711 @chrishalcrow @h-mayorquin : ready for review.

Also we should discuss changing n_jobs to something else.
job is ambiguous worker is better.
This semantic come from joblib but I think the smantic is unclear.
A "job" is more something push to slurm.
In my mind job=task but NOT the the numer of people doing the task queue (they are workers). No ?

@alejoe91
Copy link
Member

failing tests are really weird. The arrays that are not equal look exactly the same from the values

Copy link
Collaborator

@zm711 zm711 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

few tweaks as well.

I haven't look deeply but since Windows and Macs have a different mp context at baseline do we think that could be contributing or that this is just floating point issues?

src/spikeinterface/core/job_tools.py Outdated Show resolved Hide resolved
src/spikeinterface/core/job_tools.py Outdated Show resolved Hide resolved
src/spikeinterface/core/job_tools.py Outdated Show resolved Hide resolved
@zm711
Copy link
Collaborator

zm711 commented Nov 20, 2024

script in case people want to know

import spikeinterface.full as si
sorting, rec = si.generate_ground_truth_recording()
analyzer = si.create_sorting_analyzer(rec, sorting)
analyzer.compute(['random_spikes', 'waveforms'])
%timeit analyzer.compute(['principal_components']) 
# or
%timeit analyzer.compute(['principal_components'], n_jobs=2)

Okay with default settings on this branch using the generate_ground_truth_recording

n_jobs=1
67 ms ± 4.35 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
n_jobs=2
47.9 s ± 971 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# explicitly setting pool_engine=='thread'
48.4 s ± 1.23 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

compared to going back to main

n_jobs=1
78.3 ms ± 27.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
n_jobs=2
48.6 s ± 890 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

I'm thinking this might be deeper than our code. Let me know if you want a different type of test Sam. For real data with n_jobs we often have PCA take 1-2 hours. So I'm not currently testing with real data. Do we want me to play around with n_job number and threads?

@samuelgarcia
Copy link
Member Author

Hi Zach and Alessio. Thanks fo testing. pca is not the best parralelisation we have.
for me detect_peaks or save() or better illustration. But this is ok.

Yes, this failing test are super strange because it is a floating point issue which is not the same with multiprocessing or thread. This hard to figure.

As a side note : I modified the waveforms_tools that estimate templates with both thread and process and this do not give the same results. I need to set decimals=4 for almost equal which is very easy to pass. I did not except this. Maybe we have somthing deep with parralisation that can change the result.
Lets discuss this with a call.

@zm711
Copy link
Collaborator

zm711 commented Nov 21, 2024

I can test save at some point this week on Windows.

@samuelgarcia
Copy link
Member Author

test are passing now

@zm711
Copy link
Collaborator

zm711 commented Nov 22, 2024

I forgot.... easy testing of save is not really possible on Windows due to the inability to overwrite files still open in the process so I can't use timeit. I will need to try to write a script. I can try detect_peaks though.

@zm711
Copy link
Collaborator

zm711 commented Nov 22, 2024

okay detect_peaks

on main

n_jobs=1
43.6 ms ± 335 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
n_jobs=2
3.86 s ± 361 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

with this pr

n_jobs=2
164 ms ± 3.14 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
n_jobs=2, and explicitly setting pool_engine='thread'
162 ms ± 515 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

So an improvement but still worse than not using multiprocessing.

@zm711
Copy link
Collaborator

zm711 commented Nov 22, 2024

Now trying a longer simulated recording to see if the more realistic length if we see a big difference.

object is

GroundTruthRecording: 4 channels - 25.0kHz - 1 segments - 250,000,000 samples
                      10,000.00s (2.78 hours) - float32 dtype - 3.73 GiB
n_jobs=1
43.6 s ± 329 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
n_jobs=4, pool_engine='thread'
1min 21s ± 821 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

@samuelgarcia
Copy link
Member Author

merci Zach.

How to document this is a big topic. The parralelism was quite empirical until now. For big linux station, it faster with many channel and many core but do not scale we reach a plateau quite soon in speed.
We should have a benchmark script somewhere which is able to generate some figures from time to time to be injected in the doc.

@yger
Copy link
Collaborator

yger commented Nov 27, 2024

@samuelgarcia . i'm testing it and i have a bug

SorterClass._run_from_folder(sorter_output_folder, sorter_params, verbose)

File "/media/cure/Secondary/pierre/softwares/spikeinterface/src/spikeinterface/sorters/internal/spyking_circus2.py", line 190, in _run_from_folder
prototype = get_prototype(
^^^^^^^^^^^^^^
File "/media/cure/Secondary/pierre/softwares/spikeinterface/src/spikeinterface/sorters/internal/spyking_circus2.py", line 428, in get_prototype
res = detect_peaks(
^^^^^^^^^^^^^
File "/media/cure/Secondary/pierre/softwares/spikeinterface/src/spikeinterface/sortingcomponents/peak_detection.py", line 133, in detect_peaks
outs = run_node_pipeline(
^^^^^^^^^^^^^^^^^^
File "/media/cure/Secondary/pierre/softwares/spikeinterface/src/spikeinterface/core/node_pipeline.py", line 586, in run_node_pipeline
processor.run(recording_slices=recording_slices)
File "/media/cure/Secondary/pierre/softwares/spikeinterface/src/spikeinterface/core/job_tools.py", line 538, in run
recording_slices2 = [(thread_local_data, ) + args for args in recording_slices]
~~~~~~~~~~~~~~~~~~~~~~^~~~~~
TypeError: unsupported operand type(s) for +: '_thread._local' and 'int'

@yger
Copy link
Collaborator

yger commented Nov 27, 2024

It works only if you modify L537 in job_tools.py by

recording_slices2 = [(thread_local_data, ) + tuple(args) for args in recording_slices]

(need to add the tuple() )

@chrishalcrow
Copy link
Collaborator

Hello, just some quick benchmarking on a M4 Mac, doing detect_peaks on a 96 channel, 20 minute recording (just once per set-up)

THIS PR:
n_jobs = 1 : 22s
n_jobs = 4, "thread" : 26s
n_jobs = 4, "spawn" : 12s

CURRENT MAIN:
n_jobs = 1 : 26s
n_jobs = 4 : 11s

So the "thread" option doesn't seem to be working as expected for this system.

@yger
Copy link
Collaborator

yger commented Nov 28, 2024

Did some benchmarks on linux, and while it is slower than process, this is working and we do get a speedup. However, @samuelgarcia, I think you should also extend the possibilities to the get_poolexecutor() in job_tools.py. Some functions in components are relying on this, and maybe we should propagate the thread option there also

@samuelgarcia
Copy link
Member Author

I did more tests on windows on a quite old machine i5-4460 3.2Ghz (4 cores)

Running this simple example

    from spikeinterface.generation import generate_drifting_recording
    from spikeinterface.sortingcomponents.peak_detection import detect_peaks
    from spikeinterface import get_noise_levels
    import time

    all_job_kwargs = [
        dict(pool_engine="process", n_jobs=2, mp_context="spawn", max_threads_per_worker=2),
        dict(pool_engine="process", n_jobs=4, mp_context="spawn", max_threads_per_worker=1),
        dict(pool_engine="thread", n_jobs=4, mp_context=None, max_threads_per_worker=1),
        dict(pool_engine="thread", n_jobs=2, mp_context=None, max_threads_per_worker=2),
        dict(n_jobs=1),
    ]

    

    rec, _, sorting = generate_drifting_recording(
        num_units=50,
        duration=120.0,
        sampling_frequency=30000.0,
        probe_name="Neuropixel-128",

    )
    # print(rec)

    noise_levels = get_noise_levels(rec, return_scaled=False)
    for job_kwargs in all_job_kwargs:
        print()
        print(job_kwargs)
        t0 = time.perf_counter()
        peaks = detect_peaks(rec, method="locally_exclusive", noise_levels=noise_levels, **job_kwargs)
        t1 = time.perf_counter()
        print("time included the spawn:", t1-t0)

Give this

{'pool_engine': 'process', 'n_jobs': 2, 'mp_context': 'spawn', 'max_threads_per_worker': 2}
detect peaks using locally_exclusive: 100%|██████████████████████████████| 120/120 [00:26<00:00,  4.53it/s] 
time included the spawn: 28.41631959999995

{'pool_engine': 'process', 'n_jobs': 4, 'mp_context': 'spawn', 'max_threads_per_worker': 1}
detect peaks using locally_exclusive: 100%|██████████████████████████████| 120/120 [00:17<00:00,  6.92it/s] 
time included the spawn: 21.655267000000094

{'pool_engine': 'thread', 'n_jobs': 4, 'mp_context': None, 'max_threads_per_worker': 1}
detect peaks using locally_exclusive: 100%|██████████████████████████████| 120/120 [00:18<00:00,  6.38it/s] 
time included the spawn: 18.809355100000175

{'pool_engine': 'thread', 'n_jobs': 2, 'mp_context': None, 'max_threads_per_worker': 2}
detect peaks using locally_exclusive: 100%|██████████████████████████████| 120/120 [00:25<00:00,  4.62it/s] 
time included the spawn: 25.975980100000015

{'n_jobs': 1}
detect peaks using locally_exclusive: 100%|██████████████████████████████| 120/120 [00:41<00:00,  2.87it/s] 
time included the spawn: 41.7998663000003

So for me 4 conclusions:

  • a clear benefit of thread over process because the total time included the spawn is bigger than the progress_bar time
  • several worker (n_jobs) is faster that one
  • not so much difference in it/s for "thread" vs "process" in same situation.
  • it do not linearly scales

Note: n_jobs=1 do not limit max_threads_per_worker underlying libs are threading under the wood.

@chrishalcrow @zm711 could you rerun the same code on your machine because we clearly do not have the results.

If someone have a stronger windows machine I would be happy to change n_jobs=4 to something bigger.

@zm711
Copy link
Collaborator

zm711 commented Jan 7, 2025

I'm processing something now, but I can run your exact script on mine with n=8 to see what happens in your condition. For me I found an improvement with increasing channels. So I think you and I agree for Windows.

I found it was worse with low channel counts, but better with high channel counts. See this comment where I tested higher channel count :)

So that means we need to run the test on mac. I could also do that from mine (which is M1pro) but Chris has a newer he could try with.

@samuelgarcia
Copy link
Member Author

On a bigger machine with 40cores with on Linux.

with the same script but n = 10 or 20 or 40 with conter balance the max_threads_per_worker.
(to have max_threads_per_worker*n_jobs=40 always).

I have this.

{'pool_engine': 'process', 'n_jobs': 10, 'mp_context': 'fork', 'max_threads_per_worker': 4}
detect peaks using locally_exclusive: 100%|█████████████████| 120/120 [00:07<00:00, 16.31it/s]
time included the spawn: 7.659971084445715

{'pool_engine': 'process', 'n_jobs': 20, 'mp_context': 'fork', 'max_threads_per_worker': 2}
detect peaks using locally_exclusive: 100%|█████████████████| 120/120 [00:04<00:00, 25.03it/s]
time included the spawn: 5.780991319566965

{'pool_engine': 'process', 'n_jobs': 40, 'mp_context': 'fork', 'max_threads_per_worker': 1}
detect peaks using locally_exclusive: 100%|█████████████████| 120/120 [00:05<00:00, 23.18it/s]
time included the spawn: 7.160590501502156

{'pool_engine': 'process', 'n_jobs': 10, 'mp_context': 'spawn', 'max_threads_per_worker': 4}
detect peaks using locally_exclusive: 100%|█████████████████| 120/120 [00:07<00:00, 16.89it/s]
time included the spawn: 7.443390963599086

{'pool_engine': 'process', 'n_jobs': 20, 'mp_context': 'spawn', 'max_threads_per_worker': 2}
detect peaks using locally_exclusive: 100%|█████████████████| 120/120 [00:04<00:00, 24.57it/s]
time included the spawn: 5.915252136066556

{'pool_engine': 'process', 'n_jobs': 40, 'mp_context': 'spawn', 'max_threads_per_worker': 1}
detect peaks using locally_exclusive: 100%|█████████████████| 120/120 [00:05<00:00, 22.33it/s]
time included the spawn: 7.943920761346817

{'pool_engine': 'thread', 'n_jobs': 10, 'mp_context': None, 'max_threads_per_worker': 4}
detect peaks using locally_exclusive: 100%|█████████████████| 120/120 [00:09<00:00, 12.03it/s]
time included the spawn: 9.9882453661412

{'pool_engine': 'thread', 'n_jobs': 20, 'mp_context': None, 'max_threads_per_worker': 2}
detect peaks using locally_exclusive: 100%|█████████████████| 120/120 [00:12<00:00,  9.34it/s]
time included the spawn: 12.85539048165083

{'pool_engine': 'thread', 'n_jobs': 40, 'mp_context': None, 'max_threads_per_worker': 1}
detect peaks using locally_exclusive: 100%|█████████████████| 120/120 [00:07<00:00, 16.17it/s]
time included the spawn: 7.429490776732564

{'n_jobs': 1}
detect peaks using locally_exclusive: 100%|█████████████████| 120/120 [00:47<00:00,  2.54it/s]
time included the spawn: 47.23868325911462

Here the trend is:

  • process+fork is the winner but not enormous diff
  • the more n_jobs is not the best for process approach
  • the more n_jobs is maybe the best for thread approach

Note : the spawn time will be negligable for long recording (here in this bench duration are very very short).

@chrishalcrow
Copy link
Collaborator

chrishalcrow commented Jan 8, 2025

EDIT: hadn't pulled. So re-ran after pulling.

Results from running your script:

{'pool_engine': 'process', 'n_jobs': 2, 'mp_context': 'spawn', 'max_threads_per_worker': 2}
detect peaks using locally_exclusive: 100%
 120/120 [00:08<00:00, 15.01it/s]
time included the spawn: 9.578207833008491

{'pool_engine': 'process', 'n_jobs': 4, 'mp_context': 'spawn', 'max_threads_per_worker': 1}
detect peaks using locally_exclusive: 100%
 120/120 [00:05<00:00, 24.13it/s]
time included the spawn: 7.022294959009741

{'pool_engine': 'thread', 'n_jobs': 4, 'mp_context': None, 'max_threads_per_worker': 1}
detect peaks using locally_exclusive: 100%
 120/120 [00:05<00:00, 23.68it/s]
time included the spawn: 5.4412608339916915

{'pool_engine': 'thread', 'n_jobs': 2, 'mp_context': None, 'max_threads_per_worker': 2}
detect peaks using locally_exclusive: 100%
 120/120 [00:08<00:00, 14.85it/s]
time included the spawn: 8.143652290993487

{'n_jobs': 1}
detect peaks using locally_exclusive: 100%
 120/120 [00:14<00:00,  8.09it/s]
time included the spawn: 14.862276790998294

Did it with the duration x10'd too:

{'pool_engine': 'process', 'n_jobs': 2, 'mp_context': 'spawn', 'max_threads_per_worker': 2}
detect peaks using locally_exclusive: 100%
 1200/1200 [01:19<00:00, 15.41it/s]
time included the spawn: 79.93183075000707

{'pool_engine': 'process', 'n_jobs': 4, 'mp_context': 'spawn', 'max_threads_per_worker': 1}
detect peaks using locally_exclusive: 100%
 1200/1200 [00:51<00:00, 23.48it/s]
time included the spawn: 53.32360237499233

{'pool_engine': 'thread', 'n_jobs': 4, 'mp_context': None, 'max_threads_per_worker': 1}
detect peaks using locally_exclusive: 100%
 1200/1200 [00:52<00:00, 23.10it/s]
time included the spawn: 52.550351167010376

{'pool_engine': 'thread', 'n_jobs': 2, 'mp_context': None, 'max_threads_per_worker': 2}
detect peaks using locally_exclusive: 100%
 1200/1200 [01:20<00:00, 14.90it/s]
time included the spawn: 80.46884250000585

{'n_jobs': 1}
detect peaks using locally_exclusive: 100%
 1200/1200 [02:28<00:00,  8.06it/s]
time included the spawn: 148.97508970800845

@chrishalcrow
Copy link
Collaborator

When I do the same process on real data, threads does much worse. Here are the results for a 5 minute slice (same script as before, but with rec = read_openephys('...')

{'pool_engine': 'process', 'n_jobs': 2, 'mp_context': 'spawn', 'max_threads_per_worker': 2}
detect peaks using locally_exclusive: 100%
 300/300 [00:17<00:00, 29.88it/s]
time included the spawn: 19.257165417002398

{'pool_engine': 'process', 'n_jobs': 4, 'mp_context': 'spawn', 'max_threads_per_worker': 1}
detect peaks using locally_exclusive: 100%
 300/300 [00:10<00:00, 18.40it/s]
time included the spawn: 14.335240166998119

{'pool_engine': 'thread', 'n_jobs': 4, 'mp_context': None, 'max_threads_per_worker': 1}
detect peaks using locally_exclusive: 100%
 300/300 [00:24<00:00, 26.61it/s]
time included the spawn: 24.705210832995363

{'pool_engine': 'thread', 'n_jobs': 2, 'mp_context': None, 'max_threads_per_worker': 2}
detect peaks using locally_exclusive: 100%
 300/300 [00:27<00:00, 28.44it/s]
time included the spawn: 27.068868458009092

{'n_jobs': 1}
detect peaks using locally_exclusive: 100%
 300/300 [00:30<00:00, 25.30it/s]
time included the spawn: 30.779495833005058

Any ideas why this would be different than the generated recording?

@samuelgarcia
Copy link
Member Author

Thanks Chris and Zach.
I did not excepted a bad result wth threading when reading form disk, noramlly this is a good use case of thread (blocking IOs).

Indeed my benchmark is not realistic finally.
Generating traces is cpu intensive : I will rerun after a rec.save().

else: # windows and mac
# on windows and macos the fork is forbidden and process+spwan is super slow at startup
# so let's go to threads
pool_engine = "thread"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change mac default to "process"

@samuelgarcia
Copy link
Member Author

@alejoe91 : ready to merge I think

@h-mayorquin
Copy link
Collaborator

h-mayorquin commented Jan 22, 2025

Was there a tested/profiled case where this was useful at the end?

@samuelgarcia
Copy link
Member Author

It is commented in the test. in test_job_tools.py.
You can uncomment and play around.

@samuelgarcia
Copy link
Member Author

The main use case is to avoid the spawn on windows and also for to use thread in a HPC situation because the fork consumes too much memory and slurm is killing my task...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
concurrency Related to parallel processing core Changes to core module
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants