GCC Code Coverage Report


Directory: ./
Coverage: low: ≥ 0% medium: ≥ 75.0% high: ≥ 90.0%
Coverage Exec / Excl / Total
Lines: 89.0% 65 / 0 / 73
Functions: 87.5% 14 / 0 / 16
Branches: 74.2% 23 / 0 / 31

libs/capy/src/ex/thread_pool.cpp
Line Branch Exec Source
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/boostorg/capy
9 //
10
11 #include <boost/capy/ex/thread_pool.hpp>
12 #include <boost/capy/detail/intrusive.hpp>
13 #include <boost/capy/detail/thread_name.hpp>
14 #include <condition_variable>
15 #include <cstdio>
16 #include <mutex>
17 #include <stop_token>
18 #include <thread>
19 #include <vector>
20
21 /*
22 Thread pool implementation using a shared work queue.
23
24 Work items are coroutine handles wrapped in intrusive list nodes, stored
25 in a single queue protected by a mutex. Worker threads wait on a
26 condition_variable_any that integrates with std::stop_token for clean
27 shutdown.
28
29 Threads are started lazily on first post() via std::call_once to avoid
30 spawning threads for pools that are constructed but never used. Each
31 thread is named with a configurable prefix plus index for debugger
32 visibility.
33
34 Shutdown sequence: stop() requests all threads to stop via their stop
35 tokens, then the destructor joins threads and destroys any remaining
36 queued work without executing it.
37 */
38
39 namespace boost {
40 namespace capy {
41
42 //------------------------------------------------------------------------------
43
44 class thread_pool::impl
45 {
46 struct work : detail::intrusive_queue<work>::node
47 {
48 coro h_;
49
50 122 explicit work(coro h) noexcept
51 122 : h_(h)
52 {
53 122 }
54
55 122 void run()
56 {
57 122 auto h = h_;
58
1/2
✓ Branch 0 taken 122 times.
✗ Branch 1 not taken.
122 delete this;
59
1/1
✓ Branch 1 taken 122 times.
122 h.resume();
60 122 }
61
62 void destroy()
63 {
64 delete this;
65 }
66 };
67
68 std::mutex mutex_;
69 std::condition_variable_any cv_;
70 detail::intrusive_queue<work> q_;
71 std::vector<std::jthread> threads_;
72 std::size_t num_threads_;
73 char thread_name_prefix_[13]{}; // 12 chars max + null terminator
74 std::once_flag start_flag_;
75
76 public:
77 56 ~impl()
78 {
79 56 stop();
80 56 threads_.clear();
81
82
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 56 times.
56 while(auto* w = q_.pop())
83 w->destroy();
84 56 }
85
86 56 impl(std::size_t num_threads, std::string_view thread_name_prefix)
87 56 : num_threads_(num_threads)
88 {
89
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 55 times.
56 if(num_threads_ == 0)
90 1 num_threads_ = std::thread::hardware_concurrency();
91
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 56 times.
56 if(num_threads_ == 0)
92 num_threads_ = 1;
93
94 // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
95
1/1
✓ Branch 1 taken 56 times.
56 auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
96 56 thread_name_prefix_[n] = '\0';
97 56 }
98
99 void
100 122 post(coro h)
101 {
102 122 ensure_started();
103 122 auto* w = new work(h);
104 {
105
1/1
✓ Branch 1 taken 122 times.
122 std::lock_guard<std::mutex> lock(mutex_);
106 122 q_.push(w);
107 122 }
108 122 cv_.notify_one();
109 122 }
110
111 void
112 56 stop() noexcept
113 {
114
2/2
✓ Branch 5 taken 34 times.
✓ Branch 6 taken 56 times.
90 for (auto& t : threads_)
115 34 t.request_stop();
116 56 cv_.notify_all();
117 56 }
118
119 private:
120 void
121 122 ensure_started()
122 {
123
1/1
✓ Branch 1 taken 122 times.
122 std::call_once(start_flag_, [this]{
124 22 threads_.reserve(num_threads_);
125
2/2
✓ Branch 0 taken 34 times.
✓ Branch 1 taken 22 times.
56 for(std::size_t i = 0; i < num_threads_; ++i)
126 34 threads_.emplace_back(
127
2/2
✓ Branch 2 taken 34 times.
✓ Branch 7 taken 34 times.
68 [this, i](std::stop_token st){ run(st, i); });
128 22 });
129 122 }
130
131 void
132 34 run(std::stop_token st, std::size_t index)
133 {
134 // Build name; set_current_thread_name truncates to platform limits.
135 char name[16];
136 34 std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
137 34 detail::set_current_thread_name(name);
138
139 for(;;)
140 {
141 156 work* w = nullptr;
142 {
143
1/1
✓ Branch 1 taken 156 times.
156 std::unique_lock<std::mutex> lock(mutex_);
144
3/3
✓ Branch 2 taken 156 times.
✓ Branch 5 taken 34 times.
✓ Branch 6 taken 122 times.
353 if(!cv_.wait(lock, st, [this]{ return !q_.empty(); }))
145 68 return;
146 122 w = q_.pop();
147 156 }
148
1/1
✓ Branch 1 taken 122 times.
122 w->run();
149 122 }
150 }
151 };
152
153 //------------------------------------------------------------------------------
154
155 56 thread_pool::
156 ~thread_pool()
157 {
158 56 shutdown();
159 56 destroy();
160
1/2
✓ Branch 0 taken 56 times.
✗ Branch 1 not taken.
56 delete impl_;
161 56 }
162
163 56 thread_pool::
164 56 thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
165
2/4
✓ Branch 2 taken 56 times.
✓ Branch 5 taken 56 times.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
56 : impl_(new impl(num_threads, thread_name_prefix))
166 {
167 56 }
168
169 void
170 thread_pool::
171 stop() noexcept
172 {
173 impl_->stop();
174 }
175
176 //------------------------------------------------------------------------------
177
178 void
179 122 thread_pool::executor_type::
180 post(coro h) const
181 {
182 122 pool_->impl_->post(h);
183 122 }
184
185 } // capy
186 } // boost
187