Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Parallel Loop with Generic Reduction #12195

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

matekelemen
Copy link
Contributor

@matekelemen matekelemen commented Mar 16, 2024

Changes

Add a parallel for loop with thread local storage and an extra functor that performs reduction on each storage in a single thread after it finished its chunk of the loop.

This allows defining reductions on the fly, allowing for the implementation of more complex logic without overpopulating reduction_utilities.

Example

An example for usage is building a set "in parallel". Each thread computes part of the set, which then gets unified into the final one during reduction. The following example collects every round number between 0 and 99:

// --- Core Includes ---
#include "utilities/parallel_utilities.h" // block_for_each

// --- STL Includes ---
#include <unordered_set> // unordered_set
#include <numeric> // iota
#include <vector> // vector
#include <iostream> // cout


int main(){
    using Container = std::vector<int>;

    // Construct a range of integers between 0 and 99
    Container container(1e2);
    std::iota(container.begin(), container.end(), 0);

    // Define the thread-local storage
    using TLS = std::unordered_set<Container::value_type>;
    TLS round_numbers;
    
    // Perform the loop and reduction
    Kratos::block_for_each(
        container,
        round_numbers,
        [](Container::value_type Value, TLS& rTls) -> void {
            if (!(Value % 10)) rTls.insert(Value);
        },
        [&round_numbers](TLS& rTls) mutable -> void {
            for (auto item : rTls) round_numbers.insert(item);
        }
    );

    // Print the unified results
    for (auto item : round_numbers) {
        std::cout << item << "\n";
    }
}

Possible output:

90
20
40
70
10
80
30
50
0
60

@matekelemen matekelemen added Kratos Core C++ Parallel-SMP Shared memory parallelism with OpenMP or C++ Threads labels Mar 16, 2024
@matekelemen matekelemen requested a review from philbucher March 16, 2024 22:59
@matekelemen matekelemen self-assigned this Mar 16, 2024
@matekelemen matekelemen requested a review from a team as a code owner March 16, 2024 22:59
@matekelemen matekelemen requested a review from sunethwarna March 16, 2024 23:28
Copy link
Member

@philbucher philbucher left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how I feel abt this TBH

You replace the reductor by an extra lambda function if I understand right?

You could easily do the example you mention with the existing interface

Do you have a usecase where the existing one doesn't work?

@matekelemen
Copy link
Contributor Author

matekelemen commented Mar 17, 2024

You replace the reductor by an extra lambda function if I understand right?

more or less yes. The key is that the lambda function is an instance the user creates at the location of the loop, while existing reductions are instantiated in for_each, and thus cannot refer to local state.

You could easily do the example you mention with the existing interface

Do you have a usecase where the existing one doesn't work?

The current interface is general enough to allow doing mostly anything, but it's ill-suited for a couple of applications.

One gripe I have with it is that you need to define a class for every kind of reduction. If your reduction is highly specific to your problem, you'd basically have to define a class that gets used only once.

However, the major issue is what I mentioned earlier: reductions cannot store local state (reference local variables), nor do they support thread local storage. So the only way of getting local state into the reduction is via the return value of the parallel function. This is extremely hackish and I would immediately block any PR that tried doing this.

For example, take the inverse of the problem in the example: you have an existing set of integers between 0 and 99, and you want to remove all round numbers that appear in the range you're looping over. With the current interface, you'd have to capture the existing set in the functor and forward the pointer in its return value ...

// --- Core Includes ---
#include "utilities/parallel_utilities.h" // block_for_each

// --- STL Includes ---
#include <unordered_set> // unordered_set
#include <numeric> // iota
#include <vector> // vector
#include <iostream> // cout
#include <optional> // optional

using Container = std::vector<int>;

using IntSet = std::unordered_set<Container::value_type>;

namespace Kratos { // required by KRATOS_CRITICAL_SECTION

struct RemoveRoundsReduction
{
    using value_type = std::pair<
                            Container::value_type, // <== value of the item in the container we're looping over
                            IntSet*                // <== local state
                       >;

    using return_type = void;


    /// @details We're performing the reduction on an external object, so there's nothing to return.
    /// @note Btw it's extremely confusing that "GetValue" returns "return_type" instead of "value_type".
    return_type GetValue() const {}

    /// @brief Accumulate items to remove from the global set.
    void LocalReduce(value_type Value) {
        const auto item = Value.first;
        if (!(item % 10)) this->local_set.insert(item);

        // Store the global state in the reduction
        IntSet* p_global_set = Value.second;
        this->p_maybe_global_set.emplace(p_global_set);
    }

    /// @brief Remove items in the local set from the global one.
    /// @note The global reducer (this instance) is unused because we're performing
    ///       the reduction on an external object, and the data used for that reduction
    ///       is stored in the local reductions.
    void ThreadSafeReduce(const RemoveRoundsReduction& rLocalReduction) {
        if (rLocalReduction.p_maybe_global_set.has_value()) {
            KRATOS_CRITICAL_SECTION
            for (auto item : rLocalReduction.local_set) rLocalReduction.p_maybe_global_set.value()->erase(item);
        }
    }

    /// @brief Accumulate round numbers in this set during the local loop.
    IntSet local_set;

    /// @details Pointer to the global set, protected by an optional in case
    ///          the current thread was assigned an empty chunk to work on.
    std::optional<IntSet*> p_maybe_global_set;
}; // struct RemoveRoundsReduction

} // namespace Kratos


int main(){
    Container container(1e2);
    std::iota(container.begin(), container.end(), 0);

    IntSet not_round_numbers(container.begin(), container.end());

    Kratos::block_for_each<Kratos::RemoveRoundsReduction>(
        container,
        [&not_round_numbers](Container::value_type Value) -> std::pair<Container::value_type,IntSet*> {
            return std::make_pair(Value, &not_round_numbers);
        }
    );

    for (auto item : not_round_numbers) std::cout << item << " ";
    std::cout << "\n";
}

Possible output:

99 98 97 96 95 94 93 92 91 89 88 87 86 85 84 83 82 81 79 78 77 76 75 74 73 72 71 69 68 67 66 65 64 63 62 61 59 28 27 26 25 24 23 22 21 19 18 17 16 15 14 13 1 2 3 4 5 6 7 8 9 11 12 29 31 32 33 34 35 36 37 38 39 41 42 43 44 45 46 47 48 49 51 52 53 54 55 56 57 58

In comparison, the reduction with a lambda would look like so:

// --- Core Includes ---
#include "utilities/parallel_utilities.h" // block_for_each

// --- STL Includes ---
#include <unordered_set> // unordered_set
#include <numeric> // iota
#include <vector> // vector
#include <iostream> // cout


int main(){
    using Container = std::vector<int>;

    Container container(1e2);
    std::iota(container.begin(), container.end(), 0);

    using TLS = std::unordered_set<Container::value_type>;
    TLS not_round_numbers(container.begin(), container.end());

    Kratos::block_for_each(
        container,
        TLS(),
        [](Container::value_type Value, TLS& rTls) -> void {
            if (!(Value % 10)) rTls.insert(Value);
        },
        [&not_round_numbers](TLS& rTls) mutable -> void {
            for (auto item : rTls) not_round_numbers.erase(item);
        }
    );

    for (auto item : not_round_numbers) {
        std::cout << item << "\n";
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
C++ Kratos Core Parallel-SMP Shared memory parallelism with OpenMP or C++ Threads
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants