-
Notifications
You must be signed in to change notification settings - Fork 69
/
Copy pathasync_event.cpp
94 lines (71 loc) · 2.04 KB
/
async_event.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// Copyright (c) 2024, Lawrence Livermore National Security, LLC.
// See top-level LICENSE file for details.
// Example for using asynchronous timed events across threads
#include <caliper/cali.h>
#include <caliper/cali-manager.h>
#include <caliper/AsyncEvent.h>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
std::queue<cali::TimedAsyncEvent> q;
std::mutex q_mtx;
std::condition_variable cv;
bool done = false;
void consumer_thread_fn()
{
CALI_CXX_MARK_FUNCTION;
while (true) {
cali::TimedAsyncEvent evt;
CALI_MARK_BEGIN("waiting");
{
std::unique_lock<std::mutex> g(q_mtx);
cv.wait(g, [](){ return !q.empty() || done; });
if (!q.empty()) {
evt = q.front();
q.pop();
} else
return;
}
CALI_MARK_END("waiting");
evt.end();
CALI_CXX_MARK_SCOPE("processing");
std::this_thread::sleep_for(std::chrono::microseconds(200));
}
}
int main(int argc, char* argv[])
{
cali::ConfigManager mgr;
mgr.set_default_parameter("aggregate_across_ranks", "false");
if (argc > 1) {
mgr.add(argv[1]);
if (mgr.error()) {
std::cerr << "ConfigManager: " << mgr.error_msg() << std::endl;
return -1;
}
}
mgr.start();
cali_init(); // initialize Caliper before creating sub-thread
std::thread consumer(consumer_thread_fn);
CALI_MARK_BEGIN("main_thread");
int N = 200;
CALI_MARK_BEGIN("producing");
for (int i = 0; i < N; ++i) {
q_mtx.lock();
q.push(cali::TimedAsyncEvent::begin("queue_wait"));
q_mtx.unlock();
cv.notify_one();
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
CALI_MARK_END("producing");
{
std::lock_guard<std::mutex> g(q_mtx);
done = true;
}
cv.notify_all();
CALI_MARK_BEGIN("waiting");
consumer.join();
CALI_MARK_END("waiting");
CALI_MARK_END("main_thread");
mgr.flush();
}