HIPO  4.3.0
High Performance Output data format for experimental physics
threadpool.hpp
Go to the documentation of this file.
1 
12 #pragma once
13 
14 #include <fmt/format.h>
15 #include <atomic>
16 #include <chrono>
17 #include <condition_variable>
18 #include <functional>
19 #include <future>
20 #include <memory>
21 #include <mutex>
22 #include <queue>
23 #include <string>
24 #include <string_view>
25 #include <thread>
26 #include <type_traits>
27 #include <vector>
28 
/// @brief A ThreadPool implementation for managing asynchronous task execution.
///
/// Owns a fixed set of worker threads that drain a shared FIFO task queue.
/// Non-copyable; move operations are declared (see their definitions for caveats).
class ThreadPool {
 public:
  /// @brief Result status of thread pool shutdown.
  enum class ShutdownResult {
    Graceful,       ///< All tasks completed within timeout.
    Forced,         ///< Timeout exceeded, threads detached.
    AlreadyStopped, ///< Pool was already stopped.
  };

  /// @brief Runtime statistics for thread pool performance monitoring.
  ///
  /// Counters are individually atomic; reading several of them does not yield
  /// a consistent snapshot.
  struct Statistics {
    std::atomic<std::size_t> tasks_submitted{0};  // Total tasks submitted to the pool.
    std::atomic<std::size_t> tasks_completed{0};  // Total tasks completed successfully.
    std::atomic<std::size_t> tasks_failed{0};     // Total tasks that failed or threw exceptions.
    std::atomic<std::size_t> tasks_pending{0};    // Current number of pending (not yet executed) tasks.
    std::chrono::steady_clock::time_point start_time{std::chrono::steady_clock::now()};  // Pool creation timestamp.

    /// Default constructor.
    Statistics() = default;
    /// Non-copyable (atomic members cannot be copied).
    Statistics(const Statistics&) = delete;
    /// Non-copyable (atomic members cannot be copied).
    Statistics& operator=(const Statistics&) = delete;
    /// Movable: snapshots and resets the source's counters.
    Statistics(Statistics&& other) noexcept;
    Statistics& operator=(Statistics&& other) noexcept;

    /// @brief Calculates tasks completed per second since construction.
    [[nodiscard]] auto get_throughput() const noexcept -> double;
    /// @brief Calculates time elapsed since pool creation.
    [[nodiscard]] auto get_uptime() const noexcept -> std::chrono::duration<double>;
  };

  /// @brief Constructs a ThreadPool with the specified number of worker threads.
  /// @param num_threads Number of worker threads to spawn (see the constructor
  ///        definition for how 0 is handled).
  /// @param thread_name_prefix Prefix used when naming worker threads.
  explicit ThreadPool(std::size_t num_threads = 0, std::string_view thread_name_prefix = "ThreadPool");

  /// @brief Destructor ensures graceful shutdown with a reasonable timeout.
  ~ThreadPool() noexcept;

  // Non-copyable, move-only
  ThreadPool(const ThreadPool&) = delete;
  ThreadPool& operator=(const ThreadPool&) = delete;
  ThreadPool(ThreadPool&&) noexcept;
  ThreadPool& operator=(ThreadPool&&) noexcept;

  /// @brief Submits a task for asynchronous execution.
  /// @return A future holding the task's result (or its exception).
  /// @throws std::runtime_error if the pool has been stopped.
  template <typename Callable, typename... Args>
  [[nodiscard]] auto submit(Callable&& callable, Args&&... args) -> std::future<std::invoke_result_t<std::decay_t<Callable>, std::decay_t<Args>...>>;

  /// @brief Initiates graceful shutdown with the specified timeout.
  auto shutdown(std::chrono::milliseconds timeout = std::chrono::seconds{5}) noexcept -> void;

  /// @brief Waits for all pending tasks to complete.
  /// @return true if the pool became idle before the timeout elapsed.
  [[nodiscard]] auto wait_for_idle(std::chrono::milliseconds timeout = std::chrono::milliseconds::max()) const -> bool;

  /// @brief Gets the number of worker threads.
  [[nodiscard]] auto thread_count() const noexcept -> std::size_t { return m_workers.size(); }

  /// @brief Gets the number of pending tasks.
  [[nodiscard]] auto pending_tasks() const noexcept -> std::size_t { return m_stats.tasks_pending.load(std::memory_order_acquire); }

  /// @brief Checks if the pool is running (not stopped).
  [[nodiscard]] auto is_running() const noexcept -> bool { return !m_stop_requested.load(std::memory_order_acquire); }

  /// @brief Gets a reference to the live statistics object (not a snapshot).
  [[nodiscard]] auto get_statistics() const noexcept -> const Statistics& { return m_stats; }

 private:
  /// Main loop executed by each worker thread.
  auto worker_loop(std::size_t worker_id) noexcept -> void;
  /// Best-effort, platform-specific naming of the current thread.
  auto set_thread_name(std::string_view name) const noexcept -> void;

  mutable std::mutex m_queue_mutex;            // Guards m_task_queue.
  std::condition_variable m_task_available;    // Signals queued work or stop request.
  mutable std::condition_variable m_all_idle;  // Signals queue drained with nothing pending.
  std::queue<std::function<void()>> m_task_queue;

  std::vector<std::thread> m_workers;
  std::atomic<bool> m_stop_requested{false};
  std::string m_thread_name_prefix;
  mutable Statistics m_stats;
};
159 
// Template implementation

/// @brief Submits a task for asynchronous execution.
/// @param callable The function/functor/lambda to execute.
/// @param args Arguments forwarded (by decay-copy) into the task.
/// @return std::future for the task's result; exceptions thrown by the task
///         are delivered through this future.
/// @throws std::runtime_error if the pool has already been stopped.
template <typename Callable, typename... Args>
auto ThreadPool::submit(Callable&& callable, Args&&... args)
 -> std::future<std::invoke_result_t<std::decay_t<Callable>, std::decay_t<Args>...>> {

 using ReturnType = std::invoke_result_t<std::decay_t<Callable>, std::decay_t<Args>...>;

 // Bundle the callable and its arguments into a packaged_task. A shared_ptr is
 // required because std::function (the queue's element type) must be copyable,
 // while packaged_task is move-only.
 auto task = std::make_shared<std::packaged_task<ReturnType()>>(
 [callable = std::forward<Callable>(callable),
 args_tuple = std::make_tuple(std::forward<Args>(args)...)]() mutable -> ReturnType {
 return std::apply(std::move(callable), std::move(args_tuple));
 });

 auto future = task->get_future();

 std::lock_guard lock{m_queue_mutex};
 if (m_stop_requested.load(std::memory_order_acquire)) throw std::runtime_error("Cannot submit task to stopped ThreadPool");
 // packaged_task::operator() captures exceptions from the callable into the
 // future; the catch here only guards against std::future_error edge cases.
 // NOTE(review): because the packaged_task absorbs the task's exceptions,
 // the worker's tasks_failed counter is never incremented for tasks submitted
 // here — failures surface only through the returned future. Confirm intended.
 m_task_queue.emplace([task = std::move(task)]() noexcept {try {(*task)();} catch (...) {} });

 m_stats.tasks_submitted.fetch_add(1, std::memory_order_acq_rel);
 m_stats.tasks_pending.fetch_add(1, std::memory_order_acq_rel);

 // Wake exactly one worker; the task count changed by one.
 m_task_available.notify_one();
 return future;
}
185 
// Header-safe tuning constants.
// The original used an anonymous namespace, which in a header gives every
// translation unit its own internal-linkage copy — an ODR hazard and a
// Core Guidelines violation. `inline constexpr` (C++17) yields a single
// shared definition instead.
inline constexpr std::size_t MAX_REASONABLE_THREADS = 100;                 ///< Upper bound on requested worker threads.
inline constexpr auto DEFAULT_SHUTDOWN_TIMEOUT = std::chrono::seconds{5};  ///< Timeout used by the destructor.
190 
191 inline auto ThreadPool::Statistics::get_throughput() const noexcept -> double {
192  const auto elapsed = std::chrono::steady_clock::now() - start_time;
193  const auto seconds = std::chrono::duration<double>(elapsed).count();
194  return seconds > 0.0 ? static_cast<double>(tasks_completed.load(std::memory_order_acquire)) / seconds : 0.0;
195 }
196 
197 inline auto ThreadPool::Statistics::get_uptime() const noexcept -> std::chrono::duration<double> {
198  return std::chrono::steady_clock::now() - start_time;
199 }
200 
202  : tasks_submitted{other.tasks_submitted.load(std::memory_order_acquire)},
203  tasks_completed{other.tasks_completed.load(std::memory_order_acquire)},
204  tasks_failed{other.tasks_failed.load(std::memory_order_acquire)},
205  tasks_pending{other.tasks_pending.load(std::memory_order_acquire)},
206  start_time{other.start_time} {
207  // Reset other's atomic values
208  other.tasks_submitted.store(0, std::memory_order_release);
209  other.tasks_completed.store(0, std::memory_order_release);
210  other.tasks_failed.store(0, std::memory_order_release);
211  other.tasks_pending.store(0, std::memory_order_release);
212 }
213 
214 // Statistics move assignment operator
216  if (this != &other) {
217  tasks_submitted.store(other.tasks_submitted.load(std::memory_order_acquire), std::memory_order_release);
218  tasks_completed.store(other.tasks_completed.load(std::memory_order_acquire), std::memory_order_release);
219  tasks_failed.store(other.tasks_failed.load(std::memory_order_acquire), std::memory_order_release);
220  tasks_pending.store(other.tasks_pending.load(std::memory_order_acquire), std::memory_order_release);
221  start_time = other.start_time;
222 
223  // Reset other's atomic values
224  other.tasks_submitted.store(0, std::memory_order_release);
225  other.tasks_completed.store(0, std::memory_order_release);
226  other.tasks_failed.store(0, std::memory_order_release);
227  other.tasks_pending.store(0, std::memory_order_release);
228  }
229  return *this;
230 }
231 
232 inline ThreadPool::ThreadPool(std::size_t num_threads, std::string_view thread_name_prefix)
233  : m_thread_name_prefix{thread_name_prefix} {
234 
235  if (num_threads == 0) throw std::invalid_argument("ThreadPool: num_threads must be greater than 0");
236  if (num_threads > MAX_REASONABLE_THREADS) throw std::invalid_argument("ThreadPool: Requested thread count exceeds reasonable limit");
237 
238  m_workers.reserve(num_threads);
239 
240  try {
241  for (std::size_t i = 0; i < num_threads; ++i) {
242  m_workers.emplace_back(&ThreadPool::worker_loop, this, i);
243  }
244  } catch (const std::system_error& e) {
245  m_stop_requested.store(true, std::memory_order_release);
246  m_task_available.notify_all();
247 
248  for (auto& worker : m_workers) {
249  if (worker.joinable()) worker.join();
250  }
251  throw;
252  }
253 }
254 
255 inline ThreadPool::~ThreadPool() noexcept {
256  if (is_running()) shutdown(DEFAULT_SHUTDOWN_TIMEOUT);
257 }
258 
/// @brief Move constructor.
/// Synchronization primitives are not movable, so fresh ones are constructed;
/// the queue, workers, flag, prefix, and stats are transferred from `other`.
/// NOTE(review): the moved worker threads were started with `&other` as their
/// `this` pointer (see the constructor), so after the move they still loop on
/// other's mutex/condition_variable/queue, not on this object's. Moving a pool
/// that has live workers therefore looks unsafe — confirm pools are only moved
/// before work is submitted or after shutdown.
inline ThreadPool::ThreadPool(ThreadPool&& other) noexcept
 : m_queue_mutex{}, // Cannot move mutex, create new
 m_task_available{}, // Cannot move condition_variable, create new
 m_all_idle{}, // Cannot move condition_variable, create new
 m_task_queue{std::move(other.m_task_queue)},
 m_workers{std::move(other.m_workers)},
 m_stop_requested{other.m_stop_requested.load(std::memory_order_acquire)},
 m_thread_name_prefix{std::move(other.m_thread_name_prefix)},
 m_stats{std::move(other.m_stats)} {

 // Mark other as stopped so its destructor/shutdown become no-ops.
 other.m_stop_requested.store(true, std::memory_order_release);
}
271 
272 inline ThreadPool& ThreadPool::operator=(ThreadPool&& other) noexcept {
273  if (this != &other) {
274  if (is_running()) shutdown(DEFAULT_SHUTDOWN_TIMEOUT);
275 
276  // Move data (note: mutexes and condition_variables cannot be moved)
277  std::lock_guard lock{m_queue_mutex};
278  std::lock_guard other_lock{other.m_queue_mutex};
279 
280  m_task_queue = std::move(other.m_task_queue);
281  m_workers = std::move(other.m_workers);
282  m_stop_requested.store(other.m_stop_requested.load(std::memory_order_acquire), std::memory_order_release);
283  m_thread_name_prefix = std::move(other.m_thread_name_prefix);
284  m_stats = std::move(other.m_stats);
285 
286  // Mark other as moved-from
287  other.m_stop_requested.store(true, std::memory_order_release);
288  }
289  return *this;
290 }
291 
292 inline auto ThreadPool::shutdown(std::chrono::milliseconds timeout) noexcept -> void {
293  const auto deadline = std::chrono::steady_clock::now() + timeout;
294 
295  bool expected = false;
296  if (!m_stop_requested.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) fmt::print("Pool was already stopped\n");
297 
298  // Notify all workers to stop
299  m_task_available.notify_all();
300 
301  bool all_joined = true;
302  for (auto& worker : m_workers) {
303  if (worker.joinable()) {
304  if (std::chrono::steady_clock::now() < deadline) {
305  try {
306  worker.join();
307  } catch (const std::system_error&) {
308  worker.detach();
309  all_joined = false;
310  }
311  } else {
312  worker.detach();
313  all_joined = false;
314  }
315  }
316  }
317 
318  m_workers.clear();
319 
320  // Clear remaining tasks
321  std::lock_guard lock{m_queue_mutex};
322  std::queue<std::function<void()>> empty;
323  m_task_queue.swap(empty);
324  m_stats.tasks_pending.store(0, std::memory_order_release);
325 
326  ShutdownResult shutdown_result = all_joined ? ShutdownResult::Graceful : ShutdownResult::Forced;
327 
328  switch (shutdown_result) {
330  fmt::print("Pool shutdown gracefully\n");
331  break;
333  fmt::print("Pool shutdown was forced (timeout)\n");
334  break;
336  fmt::print("Pool was already stopped\n");
337  break;
338  }
339 }
340 
341 inline auto ThreadPool::wait_for_idle(std::chrono::milliseconds timeout) const -> bool {
342  std::unique_lock lock{m_queue_mutex};
343  return m_all_idle.wait_for(lock, timeout, [this] {
344  return m_task_queue.empty() &&
345  m_stats.tasks_pending.load(std::memory_order_acquire) == 0;
346  });
347 }
348 
/// @brief Main loop executed by each worker thread.
/// @param worker_id Index of this worker; used only to build the thread name.
/// Blocks on the condition variable until work arrives or stop is requested;
/// drains remaining queued tasks even after a stop request.
inline auto ThreadPool::worker_loop(std::size_t worker_id) noexcept -> void {
 set_thread_name(m_thread_name_prefix + "_" + std::to_string(worker_id));

 while (!m_stop_requested.load(std::memory_order_acquire)) {
 std::function<void()> task;
 { // This scope is VERY IMPORTANT to limit lock duration, don't remove it!
 std::unique_lock lock{m_queue_mutex};

 // Wait for task or stop signal
 m_task_available.wait(lock, [this] {
 return m_stop_requested.load(std::memory_order_acquire) || !m_task_queue.empty();
 });

 // Exit only when stopping AND the queue is fully drained.
 if (m_stop_requested.load(std::memory_order_acquire) && m_task_queue.empty()) {
 break;
 }

 // Get task if available
 if (!m_task_queue.empty()) {
 task = std::move(m_task_queue.front());
 m_task_queue.pop();
 }
 // The unique_lock's destructor runs at the closing brace (RAII), releasing
 // the mutex BEFORE the task executes. Without this scope the mutex would be
 // held across task execution, serializing all workers:
 // "lock -> get task -> unlock -> execute" (parallel) vs
 // "lock -> get task -> execute -> unlock" (serial).
 } // <- Lock released here!

 // Execute task outside of lock
 if (task) {
 try {
 task();
 m_stats.tasks_completed.fetch_add(1, std::memory_order_acq_rel);
 } catch (const std::exception& e) {
 // NOTE(review): tasks queued via submit() wrap exceptions inside a
 // packaged_task, so these catch blocks appear unreachable for them;
 // they only matter for task wrappers that rethrow. Confirm intended.
 m_stats.tasks_failed.fetch_add(1, std::memory_order_acq_rel);
 } catch (...) {
 m_stats.tasks_failed.fetch_add(1, std::memory_order_acq_rel);
 }

 // fetch_sub returns the PREVIOUS value: previous == 1 means the count just
 // reached zero, so this worker may be the one that made the pool idle.
 const auto remaining = m_stats.tasks_pending.fetch_sub(1, std::memory_order_acq_rel);
 if (remaining == 1) {
 std::lock_guard lock{m_queue_mutex};
 if (m_task_queue.empty()) m_all_idle.notify_all();
 }
 }
 }
}
400 
401 inline auto ThreadPool::set_thread_name(std::string_view name) const noexcept -> void {
402  try {
403 #if defined(__linux__)
404  // Linux limits thread names to 15 characters + null terminator
405  constexpr std::size_t MAX_NAME_LENGTH = 15;
406  std::string truncated_name{name.substr(0, MAX_NAME_LENGTH)};
407  pthread_setname_np(pthread_self(), truncated_name.c_str());
408 
409 #elif defined(_WIN32)
410  // Windows thread naming via SetThreadDescription (Windows 10 version 1607+)
411  std::wstring wide_name;
412  wide_name.reserve(name.size());
413  std::transform(name.begin(), name.end(), std::back_inserter(wide_name),
414  [](char c) { return static_cast<wchar_t>(c); });
415 
416  SetThreadDescription(GetCurrentThread(), wide_name.c_str());
417 
418 #elif defined(__APPLE__)
419  // macOS allows longer thread names
420  std::string name_str{name};
421  pthread_setname_np(name_str.c_str());
422 #endif
423  } catch (...) {
424  }
425 }
426 /// @} // end of concurrency group
A ThreadPool implementation for managing asynchronous task execution.
Definition: threadpool.hpp:35
ShutdownResult
Result status of thread pool shutdown.
Definition: threadpool.hpp:40
@ AlreadyStopped
Pool was already stopped.
@ Graceful
All tasks completed within timeout.
@ Forced
Timeout exceeded, threads detached.
auto get_statistics() const noexcept -> const Statistics &
Gets current statistics snapshot.
Definition: threadpool.hpp:139
auto pending_tasks() const noexcept -> std::size_t
Gets the number of pending tasks.
Definition: threadpool.hpp:129
auto thread_count() const noexcept -> std::size_t
Gets the number of worker threads.
Definition: threadpool.hpp:124
ThreadPool & operator=(const ThreadPool &)=delete
auto is_running() const noexcept -> bool
Checks if the pool is running (not stopped).
Definition: threadpool.hpp:134
auto wait_for_idle(std::chrono::milliseconds timeout=std::chrono::milliseconds::max()) const -> bool
Waits for all pending tasks to complete.
Definition: threadpool.hpp:341
auto shutdown(std::chrono::milliseconds timeout=std::chrono::seconds{5}) noexcept -> void
Initiates graceful shutdown with specified timeout.
Definition: threadpool.hpp:292
auto get_throughput() const noexcept -> double
Calculates tasks completed per second.
Definition: threadpool.hpp:191
auto submit(Callable &&callable, Args &&... args) -> std::future< std::invoke_result_t< std::decay_t< Callable >, std::decay_t< Args >... >>
Submits a task for asynchronous execution.
Definition: threadpool.hpp:162
ThreadPool(std::size_t num_threads=0, std::string_view thread_name_prefix="ThreadPool")
Constructs a ThreadPool with specified number of worker threads.
Definition: threadpool.hpp:232
~ThreadPool() noexcept
Destructor ensures graceful shutdown with reasonable timeout.
Definition: threadpool.hpp:255
auto get_uptime() const noexcept -> std::chrono::duration< double >
Calculates time elapsed since pool creation.
Definition: threadpool.hpp:197
Runtime statistics for thread pool performance monitoring.
Definition: threadpool.hpp:52
std::atomic< std::size_t > tasks_pending
Current number of pending (not yet executed) tasks.
Definition: threadpool.hpp:56
Statistics(const Statistics &)=delete
Non-copyable (atomic members cannot be copied).
std::atomic< std::size_t > tasks_failed
Total tasks that failed or threw exceptions.
Definition: threadpool.hpp:55
std::atomic< std::size_t > tasks_submitted
Total tasks submitted to the pool.
Definition: threadpool.hpp:53
Statistics & operator=(const Statistics &)=delete
Non-copyable (atomic members cannot be copied).
Statistics()=default
Default constructor.
std::chrono::steady_clock::time_point start_time
Pool creation timestamp.
Definition: threadpool.hpp:57
std::atomic< std::size_t > tasks_completed
Total tasks completed successfully.
Definition: threadpool.hpp:54