HIPO4 C++ Library 4.4.1
Columnar I/O library for CLAS12 physics data
Loading...
Searching...
No Matches
threadpool.hpp
Go to the documentation of this file.
1#pragma once
2
#include <fmt/format.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <string>
#include <string_view>
#include <system_error>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
17
/// A ThreadPool implementation.
///
/// Tasks are queued with submit() and executed by a fixed set of worker
/// threads that are started in the constructor.
class ThreadPool {
 public:
  /// Outcome categories of a shutdown() call.
  enum class ShutdownResult {
    Graceful,        ///< All tasks completed within timeout.
    Forced,          ///< Timeout exceeded, threads detached.
    AlreadyStopped,  ///< Pool was already stopped.
  };

  /// Counters describing the work handled by the pool.
  struct Statistics {
    std::atomic<std::size_t> tasks_submitted{0};
    std::atomic<std::size_t> tasks_completed{0};
    std::atomic<std::size_t> tasks_failed{0};
    std::atomic<std::size_t> tasks_pending{0};
    std::chrono::steady_clock::time_point start_time{std::chrono::steady_clock::now()};

    // std::atomic members are neither copyable nor movable, so copying is
    // disabled and the move operations are written by hand.
    Statistics() = default;
    Statistics(const Statistics&) = delete;
    Statistics& operator=(const Statistics&) = delete;
    Statistics(Statistics&& other) noexcept;
    Statistics& operator=(Statistics&& other) noexcept;

    /// Completed tasks per second of elapsed time since start_time.
    [[nodiscard]] auto get_throughput() const noexcept -> double;
    /// Time elapsed since start_time.
    [[nodiscard]] auto get_uptime() const noexcept -> std::chrono::duration<double>;
  };

  /// Constructs a ThreadPool with specified number of worker threads.
  /// @param num_threads        Number of worker threads to start.
  /// @param thread_name_prefix Prefix used when naming the worker threads.
  explicit ThreadPool(std::size_t num_threads = 0, std::string_view thread_name_prefix = "ThreadPool");

  /// Destructor ensures graceful shutdown with reasonable timeout.
  ~ThreadPool() noexcept;

  // Non-copyable, move-only
  ThreadPool(const ThreadPool&) = delete;
  ThreadPool& operator=(const ThreadPool&) = delete;
  ThreadPool(ThreadPool&&) noexcept;
  ThreadPool& operator=(ThreadPool&&) noexcept;

  /// Submits a task for asynchronous execution.
  /// @param callable Callable to run on a worker thread.
  /// @param args     Arguments stored with the task and passed to `callable`.
  /// @return Future holding the result of the call.
  template <typename Callable, typename... Args>
  [[nodiscard]] auto submit(Callable&& callable, Args&&... args) -> std::future<std::invoke_result_t<std::decay_t<Callable>, std::decay_t<Args>...>>;

  /// Initiates graceful shutdown with specified timeout.
  auto shutdown(std::chrono::milliseconds timeout = std::chrono::seconds{5}) noexcept -> void;

  /// Waits for all pending tasks to complete.
  /// @return true if the pool became idle before `timeout` elapsed.
  [[nodiscard]] auto wait_for_idle(std::chrono::milliseconds timeout = std::chrono::milliseconds::max()) const -> bool;

  /// Gets the number of worker threads.
  [[nodiscard]] auto thread_count() const noexcept -> std::size_t { return m_workers.size(); }

  /// Gets the number of pending tasks.
  [[nodiscard]] auto pending_tasks() const noexcept -> std::size_t { return m_stats.tasks_pending.load(std::memory_order_acquire); }

  /// Checks if the pool is running (not stopped).
  [[nodiscard]] auto is_running() const noexcept -> bool { return !m_stop_requested.load(std::memory_order_acquire); }

  /// Gets current statistics snapshot (a reference to the live counters).
  [[nodiscard]] auto get_statistics() const noexcept -> const Statistics& { return m_stats; }

 private:
  auto worker_loop(std::size_t worker_id) noexcept -> void;
  auto set_thread_name(std::string_view name) const noexcept -> void;

  mutable std::mutex m_queue_mutex;                // guards m_task_queue
  std::condition_variable m_task_available;        // signalled on new task / stop request
  mutable std::condition_variable m_all_idle;      // signalled when the last pending task finishes
  std::queue<std::function<void()>> m_task_queue;

  std::vector<std::thread> m_workers;
  std::atomic<bool> m_stop_requested{false};
  std::string m_thread_name_prefix;
  mutable Statistics m_stats;
};
125
126// Template implementation
127template <typename Callable, typename... Args>
128auto ThreadPool::submit(Callable&& callable, Args&&... args)
129 -> std::future<std::invoke_result_t<std::decay_t<Callable>, std::decay_t<Args>...>> {
130
131 using ReturnType = std::invoke_result_t<std::decay_t<Callable>, std::decay_t<Args>...>;
132
133 auto task = std::make_shared<std::packaged_task<ReturnType()>>(
134 [callable = std::forward<Callable>(callable),
135 args_tuple = std::make_tuple(std::forward<Args>(args)...)]() mutable -> ReturnType {
136 return std::apply(std::move(callable), std::move(args_tuple));
137 });
138
139 auto future = task->get_future();
140
141 std::lock_guard lock{m_queue_mutex};
142 if (m_stop_requested.load(std::memory_order_acquire)) throw std::runtime_error("Cannot submit task to stopped ThreadPool");
143 m_task_queue.emplace([task = std::move(task)]() noexcept {try {(*task)();} catch (...) {} });
144
145 m_stats.tasks_submitted.fetch_add(1, std::memory_order_acq_rel);
146 m_stats.tasks_pending.fetch_add(1, std::memory_order_acq_rel);
147
148 m_task_available.notify_one();
149 return future;
150}
151
// Header-level constants. Declared `inline constexpr` rather than inside an
// anonymous namespace: an anonymous namespace in a header gives every
// translation unit its own distinct objects, and the inline ThreadPool member
// functions that reference them would then differ between translation units.

// Largest worker count accepted by the ThreadPool constructor.
inline constexpr std::size_t MAX_REASONABLE_THREADS = 100;
// Timeout passed to shutdown() by the destructor and by move assignment.
inline constexpr auto DEFAULT_SHUTDOWN_TIMEOUT = std::chrono::seconds{5};
156
157inline auto ThreadPool::Statistics::get_throughput() const noexcept -> double {
158 const auto elapsed = std::chrono::steady_clock::now() - start_time;
159 const auto seconds = std::chrono::duration<double>(elapsed).count();
160 return seconds > 0.0 ? static_cast<double>(tasks_completed.load(std::memory_order_acquire)) / seconds : 0.0;
161}
162
163inline auto ThreadPool::Statistics::get_uptime() const noexcept -> std::chrono::duration<double> {
164 return std::chrono::steady_clock::now() - start_time;
165}
166
168 : tasks_submitted{other.tasks_submitted.load(std::memory_order_acquire)},
169 tasks_completed{other.tasks_completed.load(std::memory_order_acquire)},
170 tasks_failed{other.tasks_failed.load(std::memory_order_acquire)},
171 tasks_pending{other.tasks_pending.load(std::memory_order_acquire)},
172 start_time{other.start_time} {
173 // Reset other's atomic values
174 other.tasks_submitted.store(0, std::memory_order_release);
175 other.tasks_completed.store(0, std::memory_order_release);
176 other.tasks_failed.store(0, std::memory_order_release);
177 other.tasks_pending.store(0, std::memory_order_release);
178}
179
180// Statistics move assignment operator
182 if (this != &other) {
183 tasks_submitted.store(other.tasks_submitted.load(std::memory_order_acquire), std::memory_order_release);
184 tasks_completed.store(other.tasks_completed.load(std::memory_order_acquire), std::memory_order_release);
185 tasks_failed.store(other.tasks_failed.load(std::memory_order_acquire), std::memory_order_release);
186 tasks_pending.store(other.tasks_pending.load(std::memory_order_acquire), std::memory_order_release);
187 start_time = other.start_time;
188
189 // Reset other's atomic values
190 other.tasks_submitted.store(0, std::memory_order_release);
191 other.tasks_completed.store(0, std::memory_order_release);
192 other.tasks_failed.store(0, std::memory_order_release);
193 other.tasks_pending.store(0, std::memory_order_release);
194 }
195 return *this;
196}
197
198inline ThreadPool::ThreadPool(std::size_t num_threads, std::string_view thread_name_prefix)
199 : m_thread_name_prefix{thread_name_prefix} {
200
201 if (num_threads == 0) throw std::invalid_argument("ThreadPool: num_threads must be greater than 0");
202 if (num_threads > MAX_REASONABLE_THREADS) throw std::invalid_argument("ThreadPool: Requested thread count exceeds reasonable limit");
203
204 m_workers.reserve(num_threads);
205
206 try {
207 for (std::size_t i = 0; i < num_threads; ++i) {
208 m_workers.emplace_back(&ThreadPool::worker_loop, this, i);
209 }
210 } catch (const std::system_error& e) {
211 m_stop_requested.store(true, std::memory_order_release);
212 m_task_available.notify_all();
213
214 for (auto& worker : m_workers) {
215 if (worker.joinable()) worker.join();
216 }
217 throw;
218 }
219}
220
221inline ThreadPool::~ThreadPool() noexcept {
222 if (is_running()) shutdown(DEFAULT_SHUTDOWN_TIMEOUT);
223}
224
225inline ThreadPool::ThreadPool(ThreadPool&& other) noexcept
226 : m_queue_mutex{}, // Cannot move mutex, create new
227 m_task_available{}, // Cannot move condition_variable, create new
228 m_all_idle{}, // Cannot move condition_variable, create new
229 m_task_queue{std::move(other.m_task_queue)},
230 m_workers{std::move(other.m_workers)},
231 m_stop_requested{other.m_stop_requested.load(std::memory_order_acquire)},
232 m_thread_name_prefix{std::move(other.m_thread_name_prefix)},
233 m_stats{std::move(other.m_stats)} {
234
235 other.m_stop_requested.store(true, std::memory_order_release);
236}
237
238inline ThreadPool& ThreadPool::operator=(ThreadPool&& other) noexcept {
239 if (this != &other) {
240 if (is_running()) shutdown(DEFAULT_SHUTDOWN_TIMEOUT);
241
242 // Move data (note: mutexes and condition_variables cannot be moved)
243 std::lock_guard lock{m_queue_mutex};
244 std::lock_guard other_lock{other.m_queue_mutex};
245
246 m_task_queue = std::move(other.m_task_queue);
247 m_workers = std::move(other.m_workers);
248 m_stop_requested.store(other.m_stop_requested.load(std::memory_order_acquire), std::memory_order_release);
249 m_thread_name_prefix = std::move(other.m_thread_name_prefix);
250 m_stats = std::move(other.m_stats);
251
252 // Mark other as moved-from
253 other.m_stop_requested.store(true, std::memory_order_release);
254 }
255 return *this;
256}
257
258inline auto ThreadPool::shutdown(std::chrono::milliseconds timeout) noexcept -> void {
259 const auto deadline = std::chrono::steady_clock::now() + timeout;
260
261 bool expected = false;
262 if (!m_stop_requested.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) fmt::print("Pool was already stopped\n");
263
264 // Notify all workers to stop
265 m_task_available.notify_all();
266
267 bool all_joined = true;
268 for (auto& worker : m_workers) {
269 if (worker.joinable()) {
270 if (std::chrono::steady_clock::now() < deadline) {
271 try {
272 worker.join();
273 } catch (const std::system_error&) {
274 worker.detach();
275 all_joined = false;
276 }
277 } else {
278 worker.detach();
279 all_joined = false;
280 }
281 }
282 }
283
284 m_workers.clear();
285
286 // Clear remaining tasks
287 std::lock_guard lock{m_queue_mutex};
288 std::queue<std::function<void()>> empty;
289 m_task_queue.swap(empty);
290 m_stats.tasks_pending.store(0, std::memory_order_release);
291
292 ShutdownResult shutdown_result = all_joined ? ShutdownResult::Graceful : ShutdownResult::Forced;
293
294 switch (shutdown_result) {
296 fmt::print("Pool shutdown gracefully\n");
297 break;
299 fmt::print("Pool shutdown was forced (timeout)\n");
300 break;
302 fmt::print("Pool was already stopped\n");
303 break;
304 }
305}
306
307inline auto ThreadPool::wait_for_idle(std::chrono::milliseconds timeout) const -> bool {
308 std::unique_lock lock{m_queue_mutex};
309 return m_all_idle.wait_for(lock, timeout, [this] {
310 return m_task_queue.empty() &&
311 m_stats.tasks_pending.load(std::memory_order_acquire) == 0;
312 });
313}
314
315inline auto ThreadPool::worker_loop(std::size_t worker_id) noexcept -> void {
316 set_thread_name(m_thread_name_prefix + "_" + std::to_string(worker_id));
317
318 while (!m_stop_requested.load(std::memory_order_acquire)) {
319 std::function<void()> task;
320 { // This scope is VERY IMPORTANT to limit lock duration, don't remove it!
321 std::unique_lock lock{m_queue_mutex};
322
323 // Wait for task or stop signal
324 m_task_available.wait(lock, [this] {
325 return m_stop_requested.load(std::memory_order_acquire) || !m_task_queue.empty();
326 });
327
328 // Check if we should exit
329 if (m_stop_requested.load(std::memory_order_acquire) && m_task_queue.empty()) {
330 break;
331 }
332
333 // Get task if available
334 if (!m_task_queue.empty()) {
335 task = std::move(m_task_queue.front());
336 m_task_queue.pop();
337 }
338 /*
339 The braces { } create a new scope. In C++, when a variable goes out of scope, its destructor is called automatically (RAII).
340 By wrapping the std::unique_lock in braces, the lock's destructor runs at the closing }, which releases the mutex before the task executes.
341 Without the braces, the lock stayed alive until the end of the entire while loop, keeping the mutex locked during task execution and blocking all other threads.
342 It's the difference between "lock → get task → unlock → execute" (parallel) versus "lock → get task → execute → unlock" (serial).
343 */
344 } // ← Lock released here!
345
346 // Execute task outside of lock
347 if (task) {
348 try {
349 task();
350 m_stats.tasks_completed.fetch_add(1, std::memory_order_acq_rel);
351 } catch (const std::exception& e) {
352 m_stats.tasks_failed.fetch_add(1, std::memory_order_acq_rel);
353 } catch (...) {
354 m_stats.tasks_failed.fetch_add(1, std::memory_order_acq_rel);
355 }
356
357 // Update pending count and notify if idle
358 const auto remaining = m_stats.tasks_pending.fetch_sub(1, std::memory_order_acq_rel);
359 if (remaining == 1) {
360 std::lock_guard lock{m_queue_mutex};
361 if (m_task_queue.empty()) m_all_idle.notify_all();
362 }
363 }
364 }
365}
366
367inline auto ThreadPool::set_thread_name(std::string_view name) const noexcept -> void {
368 try {
369#if defined(__linux__)
370 // Linux limits thread names to 15 characters + null terminator
371 constexpr std::size_t MAX_NAME_LENGTH = 15;
372 std::string truncated_name{name.substr(0, MAX_NAME_LENGTH)};
373 pthread_setname_np(pthread_self(), truncated_name.c_str());
374
375#elif defined(_WIN32)
376 // Windows thread naming via SetThreadDescription (Windows 10 version 1607+)
377 std::wstring wide_name;
378 wide_name.reserve(name.size());
379 std::transform(name.begin(), name.end(), std::back_inserter(wide_name),
380 [](char c) { return static_cast<wchar_t>(c); });
381
382 SetThreadDescription(GetCurrentThread(), wide_name.c_str());
383
384#elif defined(__APPLE__)
385 // macOS allows longer thread names
386 std::string name_str{name};
387 pthread_setname_np(name_str.c_str());
388#endif
389 } catch (...) {
390 }
391}
A ThreadPool implementation.
Definition threadpool.hpp:22
ShutdownResult
Definition threadpool.hpp:24
@ AlreadyStopped
Pool was already stopped.
@ Graceful
All tasks completed within timeout.
@ Forced
Timeout exceeded, threads detached.
auto get_statistics() const noexcept -> const Statistics &
Gets current statistics snapshot.
Definition threadpool.hpp:109
auto wait_for_idle(std::chrono::milliseconds timeout=std::chrono::milliseconds::max()) const -> bool
Waits for all pending tasks to complete.
Definition threadpool.hpp:307
auto pending_tasks() const noexcept -> std::size_t
Gets the number of pending tasks.
Definition threadpool.hpp:99
auto shutdown(std::chrono::milliseconds timeout=std::chrono::seconds{5}) noexcept -> void
Initiates graceful shutdown with specified timeout.
Definition threadpool.hpp:258
auto submit(Callable &&callable, Args &&... args) -> std::future< std::invoke_result_t< std::decay_t< Callable >, std::decay_t< Args >... > >
Submits a task for asynchronous execution.
Definition threadpool.hpp:128
auto thread_count() const noexcept -> std::size_t
Gets the number of worker threads.
Definition threadpool.hpp:94
auto is_running() const noexcept -> bool
Checks if the pool is running (not stopped).
Definition threadpool.hpp:104
ThreadPool(std::size_t num_threads=0, std::string_view thread_name_prefix="ThreadPool")
Constructs a ThreadPool with specified number of worker threads.
Definition threadpool.hpp:198
~ThreadPool() noexcept
Destructor ensures graceful shutdown with reasonable timeout.
Definition threadpool.hpp:221
ThreadPool & operator=(const ThreadPool &)=delete
ThreadPool::Statistics
Definition threadpool.hpp:30
std::atomic< std::size_t > tasks_pending
Definition threadpool.hpp:34
Statistics(const Statistics &)=delete
auto get_throughput() const noexcept -> double
Definition threadpool.hpp:157
std::atomic< std::size_t > tasks_failed
Definition threadpool.hpp:33
std::atomic< std::size_t > tasks_submitted
Definition threadpool.hpp:31
Statistics & operator=(const Statistics &)=delete
auto get_uptime() const noexcept -> std::chrono::duration< double >
Definition threadpool.hpp:163
std::chrono::steady_clock::time_point start_time
Definition threadpool.hpp:35
std::atomic< std::size_t > tasks_completed
Definition threadpool.hpp:32