6#include <condition_variable>
/// Monotonic timestamp captured when this member is initialized.
/// Serves as the origin for get_uptime() and for the elapsed-time
/// denominator of the throughput calculation.
35 std::chrono::steady_clock::time_point
start_time{std::chrono::steady_clock::now()};
/// @brief Time elapsed since start_time.
/// @return steady_clock::now() minus start_time, expressed as a
///         floating-point number of seconds.
45 [[nodiscard]] auto
get_uptime() const noexcept -> std::chrono::duration<
double>;
/// @brief Constructs a pool and starts its worker threads.
/// @param num_threads        Number of workers to spawn, one worker_loop each.
/// @param thread_name_prefix Prefix for worker names; each worker is named
///                           "<prefix>_<index>".
/// @throws std::invalid_argument if num_threads is 0 or exceeds
///         MAX_REASONABLE_THREADS.
/// NOTE(review): the default num_threads = 0 is rejected by the definition's
/// own zero check, so a default-constructed pool always throws. Confirm
/// whether 0 was meant to select an automatic thread count.
/// NOTE(review): on std::system_error during thread creation the handler
/// requests stop and joins the already-started workers; whether the error is
/// rethrown afterwards is not visible here.
55 explicit
ThreadPool(std::
size_t num_threads = 0, std::string_view thread_name_prefix = "
ThreadPool");
/// @brief Submits a task for asynchronous execution.
///
/// The callable and its arguments are captured by decayed copy/move into a
/// std::packaged_task, which is pushed onto the task queue under
/// m_queue_mutex; one waiting worker is then notified.
///
/// @tparam Callable Invocable type; invoked as an rvalue of its decayed type.
/// @tparam Args     Argument types; stored decayed in a tuple and passed as
///                  rvalues.
/// @param callable  Function object to run on a worker thread.
/// @param args      Arguments forwarded into the stored tuple.
/// @return A std::future for the callable's result. An exception thrown by
///         the task is delivered through the future (packaged_task
///         semantics); the queue wrapper additionally swallows anything
///         escaping the packaged_task call itself.
/// @throws std::runtime_error if a stop has already been requested.
75 template <typename Callable, typename... Args>
76 [[nodiscard]] auto
submit(Callable&& callable, Args&&... args) -> std::future<std::invoke_result_t<std::decay_t<Callable>, std::decay_t<Args>...>>;
/// @brief Requests stop and winds the pool down within a time budget.
///
/// Atomically sets m_stop_requested (printing a message if it was already
/// set), wakes every worker via m_task_available, then walks the joinable
/// workers while the deadline (now + timeout) has not passed. Afterwards the
/// remaining queue is discarded, tasks_pending is reset to 0 and the outcome
/// is printed.
///
/// @param timeout Overall budget for the shutdown; defaults to 5 seconds.
/// NOTE(review): what happens to workers still running after the deadline is
/// not visible in this view; the generated docs say they are detached.
/// NOTE(review): declared noexcept, yet the body constructs a std::lock_guard
/// and calls fmt::print; if either throws, std::terminate is invoked. Confirm
/// that is acceptable.
82 auto
shutdown(std::chrono::milliseconds timeout = std::chrono::seconds{5})
noexcept ->
void;
/// @brief Blocks until the pool has no queued or in-flight tasks.
///
/// Waits on m_all_idle (holding m_queue_mutex) until the task queue is empty
/// and tasks_pending reads 0, or until the timeout elapses.
///
/// @param timeout Maximum time to wait.
/// @return true if the idle condition held when the wait ended, false if the
///         timeout expired first.
/// NOTE(review): the default is milliseconds::max(), which is passed straight
/// to condition_variable::wait_for. Implementations add the duration to a
/// clock reading, which may overflow for max(). Verify on the target
/// standard library, or use the untimed wait() for "no timeout".
89 [[nodiscard]]
auto wait_for_idle(std::chrono::milliseconds timeout = std::chrono::milliseconds::max()) const ->
bool;
94 [[nodiscard]] auto
thread_count() const noexcept -> std::
size_t {
return m_workers.size(); }
99 [[nodiscard]]
auto pending_tasks() const noexcept -> std::
size_t {
return m_stats.tasks_pending.load(std::memory_order_acquire); }
104 [[nodiscard]]
auto is_running() const noexcept ->
bool {
return !m_stop_requested.load(std::memory_order_acquire); }
/// @brief Entry point executed by each worker thread.
///
/// Names the thread "<m_thread_name_prefix>_<worker_id>", then loops while no
/// stop is requested: waits on m_task_available for a task or a stop request,
/// pops the front task, runs it and updates the completed/failed/pending
/// counters. Notifies m_all_idle when the pending count drops to zero and the
/// queue is empty.
///
/// @param worker_id Index of this worker, used only for the thread name.
112 auto worker_loop(std::size_t worker_id)
noexcept -> void;
/// @brief Assigns an OS-level name to the calling thread where supported.
///
/// Linux: truncates to 15 characters and calls pthread_setname_np on
/// pthread_self(). Apple: calls the single-argument pthread_setname_np.
/// A further branch widens the characters and calls SetThreadDescription.
/// Return values of the platform calls are ignored.
///
/// @param name Desired thread name.
113 auto set_thread_name(std::string_view name)
const noexcept -> void;
// Guards m_task_queue and is the mutex paired with both condition variables.
// mutable so the const wait_for_idle() can lock it.
115 mutable std::mutex m_queue_mutex;
// Notified when a task is enqueued (notify_one) or a stop is requested
// (notify_all); workers block on it in worker_loop().
116 std::condition_variable m_task_available;
// Notified by a worker when the pending count reaches zero with an empty
// queue; waited on by the const wait_for_idle(), hence mutable.
117 mutable std::condition_variable m_all_idle;
// FIFO of type-erased tasks awaiting execution.
118 std::queue<std::function<void()>> m_task_queue;
// Worker threads, each running worker_loop().
120 std::vector<std::thread> m_workers;
// Set once shutdown begins; checked by submit(), is_running() and the workers.
121 std::atomic<bool> m_stop_requested{
false};
// Prefix used to build each worker's thread name ("<prefix>_<index>").
122 std::string m_thread_name_prefix;
127template <
typename Callable,
typename... Args>
129 -> std::future<std::invoke_result_t<std::decay_t<Callable>, std::decay_t<Args>...>> {
131 using ReturnType = std::invoke_result_t<std::decay_t<Callable>, std::decay_t<Args>...>;
133 auto task = std::make_shared<std::packaged_task<ReturnType()>>(
134 [callable = std::forward<Callable>(callable),
135 args_tuple = std::make_tuple(std::forward<Args>(args)...)]()
mutable -> ReturnType {
136 return std::apply(std::move(callable), std::move(args_tuple));
139 auto future = task->get_future();
141 std::lock_guard lock{m_queue_mutex};
142 if (m_stop_requested.load(std::memory_order_acquire))
throw std::runtime_error(
"Cannot submit task to stopped ThreadPool");
143 m_task_queue.emplace([task = std::move(task)]()
noexcept {
try {(*task)();}
catch (...) {} });
146 m_stats.
tasks_pending.fetch_add(1, std::memory_order_acq_rel);
148 m_task_available.notify_one();
/// Upper bound on the worker count accepted by the ThreadPool constructor;
/// larger requests are rejected with std::invalid_argument.
///
/// Declared `inline` so that every translation unit including this header
/// refers to the same entity. A plain namespace-scope `constexpr` has internal
/// linkage, which makes odr-uses from functions defined in the header
/// ill-formed across translation units.
inline constexpr std::size_t MAX_REASONABLE_THREADS = 100;

/// Timeout passed to shutdown() when the pool has to stop implicitly
/// (for example before being overwritten by move assignment).
inline constexpr auto DEFAULT_SHUTDOWN_TIMEOUT = std::chrono::seconds{5};
158 const auto elapsed = std::chrono::steady_clock::now() - start_time;
159 const auto seconds = std::chrono::duration<double>(elapsed).count();
160 return seconds > 0.0 ?
static_cast<double>(tasks_completed.load(std::memory_order_acquire)) / seconds : 0.0;
164 return std::chrono::steady_clock::now() - start_time;
168 : tasks_submitted{other.tasks_submitted.load(std::memory_order_acquire)},
169 tasks_completed{other.tasks_completed.load(std::memory_order_acquire)},
170 tasks_failed{other.tasks_failed.load(std::memory_order_acquire)},
171 tasks_pending{other.tasks_pending.load(std::memory_order_acquire)},
172 start_time{other.start_time} {
174 other.tasks_submitted.store(0, std::memory_order_release);
175 other.tasks_completed.store(0, std::memory_order_release);
176 other.tasks_failed.store(0, std::memory_order_release);
177 other.tasks_pending.store(0, std::memory_order_release);
182 if (
this != &other) {
183 tasks_submitted.store(other.tasks_submitted.load(std::memory_order_acquire), std::memory_order_release);
184 tasks_completed.store(other.tasks_completed.load(std::memory_order_acquire), std::memory_order_release);
185 tasks_failed.store(other.tasks_failed.load(std::memory_order_acquire), std::memory_order_release);
186 tasks_pending.store(other.tasks_pending.load(std::memory_order_acquire), std::memory_order_release);
190 other.tasks_submitted.store(0, std::memory_order_release);
191 other.tasks_completed.store(0, std::memory_order_release);
192 other.tasks_failed.store(0, std::memory_order_release);
193 other.tasks_pending.store(0, std::memory_order_release);
199 : m_thread_name_prefix{thread_name_prefix} {
201 if (num_threads == 0)
throw std::invalid_argument(
"ThreadPool: num_threads must be greater than 0");
202 if (num_threads > MAX_REASONABLE_THREADS)
throw std::invalid_argument(
"ThreadPool: Requested thread count exceeds reasonable limit");
204 m_workers.reserve(num_threads);
207 for (std::size_t i = 0; i < num_threads; ++i) {
208 m_workers.emplace_back(&ThreadPool::worker_loop,
this, i);
210 }
catch (
const std::system_error& e) {
211 m_stop_requested.store(
true, std::memory_order_release);
212 m_task_available.notify_all();
214 for (
auto& worker : m_workers) {
215 if (worker.joinable()) worker.join();
229 m_task_queue{std::move(other.m_task_queue)},
230 m_workers{std::move(other.m_workers)},
231 m_stop_requested{other.m_stop_requested.load(std::memory_order_acquire)},
232 m_thread_name_prefix{std::move(other.m_thread_name_prefix)},
233 m_stats{std::move(other.m_stats)} {
235 other.m_stop_requested.store(
true, std::memory_order_release);
239 if (
this != &other) {
240 if (is_running()) shutdown(DEFAULT_SHUTDOWN_TIMEOUT);
243 std::lock_guard lock{m_queue_mutex};
244 std::lock_guard other_lock{other.m_queue_mutex};
246 m_task_queue = std::move(other.m_task_queue);
247 m_workers = std::move(other.m_workers);
248 m_stop_requested.store(other.m_stop_requested.load(std::memory_order_acquire), std::memory_order_release);
249 m_thread_name_prefix = std::move(other.m_thread_name_prefix);
250 m_stats = std::move(other.m_stats);
253 other.m_stop_requested.store(
true, std::memory_order_release);
259 const auto deadline = std::chrono::steady_clock::now() + timeout;
261 bool expected =
false;
262 if (!m_stop_requested.compare_exchange_strong(expected,
true, std::memory_order_acq_rel)) fmt::print(
"Pool was already stopped\n");
265 m_task_available.notify_all();
267 bool all_joined =
true;
268 for (
auto& worker : m_workers) {
269 if (worker.joinable()) {
270 if (std::chrono::steady_clock::now() < deadline) {
273 }
catch (
const std::system_error&) {
287 std::lock_guard lock{m_queue_mutex};
288 std::queue<std::function<void()>> empty;
289 m_task_queue.swap(empty);
290 m_stats.tasks_pending.store(0, std::memory_order_release);
292 ShutdownResult shutdown_result = all_joined ? ShutdownResult::Graceful : ShutdownResult::Forced;
294 switch (shutdown_result) {
296 fmt::print(
"Pool shutdown gracefully\n");
299 fmt::print(
"Pool shutdown was forced (timeout)\n");
302 fmt::print(
"Pool was already stopped\n");
308 std::unique_lock lock{m_queue_mutex};
309 return m_all_idle.wait_for(lock, timeout, [
this] {
310 return m_task_queue.empty() &&
311 m_stats.tasks_pending.load(std::memory_order_acquire) == 0;
315inline auto ThreadPool::worker_loop(std::size_t worker_id)
noexcept ->
void {
316 set_thread_name(m_thread_name_prefix +
"_" + std::to_string(worker_id));
318 while (!m_stop_requested.load(std::memory_order_acquire)) {
319 std::function<void()> task;
321 std::unique_lock lock{m_queue_mutex};
324 m_task_available.wait(lock, [
this] {
325 return m_stop_requested.load(std::memory_order_acquire) || !m_task_queue.empty();
329 if (m_stop_requested.load(std::memory_order_acquire) && m_task_queue.empty()) {
334 if (!m_task_queue.empty()) {
335 task = std::move(m_task_queue.front());
350 m_stats.tasks_completed.fetch_add(1, std::memory_order_acq_rel);
351 }
catch (
const std::exception& e) {
352 m_stats.tasks_failed.fetch_add(1, std::memory_order_acq_rel);
354 m_stats.tasks_failed.fetch_add(1, std::memory_order_acq_rel);
358 const auto remaining = m_stats.tasks_pending.fetch_sub(1, std::memory_order_acq_rel);
359 if (remaining == 1) {
360 std::lock_guard lock{m_queue_mutex};
361 if (m_task_queue.empty()) m_all_idle.notify_all();
367inline auto ThreadPool::set_thread_name(std::string_view name)
const noexcept ->
void {
369#if defined(__linux__)
371 constexpr std::size_t MAX_NAME_LENGTH = 15;
372 std::string truncated_name{name.substr(0, MAX_NAME_LENGTH)};
373 pthread_setname_np(pthread_self(), truncated_name.c_str());
377 std::wstring wide_name;
378 wide_name.reserve(name.size());
379 std::transform(name.begin(), name.end(), std::back_inserter(wide_name),
380 [](
char c) { return static_cast<wchar_t>(c); });
382 SetThreadDescription(GetCurrentThread(), wide_name.c_str());
384#elif defined(__APPLE__)
386 std::string name_str{name};
387 pthread_setname_np(name_str.c_str());
A ThreadPool implementation.
Definition threadpool.hpp:22
ShutdownResult
Definition threadpool.hpp:24
@ AlreadyStopped
Pool was already stopped.
@ Graceful
All tasks completed within timeout.
@ Forced
Timeout exceeded, threads detached.
auto get_statistics() const noexcept -> const Statistics &
Gets current statistics snapshot.
Definition threadpool.hpp:109
auto wait_for_idle(std::chrono::milliseconds timeout=std::chrono::milliseconds::max()) const -> bool
Waits for all pending tasks to complete.
Definition threadpool.hpp:307
auto pending_tasks() const noexcept -> std::size_t
Gets the number of pending tasks.
Definition threadpool.hpp:99
auto shutdown(std::chrono::milliseconds timeout=std::chrono::seconds{5}) noexcept -> void
Initiates graceful shutdown with specified timeout.
Definition threadpool.hpp:258
auto submit(Callable &&callable, Args &&... args) -> std::future< std::invoke_result_t< std::decay_t< Callable >, std::decay_t< Args >... > >
Submits a task for asynchronous execution.
Definition threadpool.hpp:128
auto thread_count() const noexcept -> std::size_t
Gets the number of worker threads.
Definition threadpool.hpp:94
auto is_running() const noexcept -> bool
Checks if the pool is running (not stopped).
Definition threadpool.hpp:104
ThreadPool(std::size_t num_threads=0, std::string_view thread_name_prefix="ThreadPool")
Constructs a ThreadPool with specified number of worker threads.
Definition threadpool.hpp:198
~ThreadPool() noexcept
Destructor ensures graceful shutdown with reasonable timeout.
Definition threadpool.hpp:221
ThreadPool & operator=(const ThreadPool &)=delete
Definition threadpool.hpp:30
std::atomic< std::size_t > tasks_pending
Definition threadpool.hpp:34
Statistics(const Statistics &)=delete
auto get_throughput() const noexcept -> double
Definition threadpool.hpp:157
std::atomic< std::size_t > tasks_failed
Definition threadpool.hpp:33
std::atomic< std::size_t > tasks_submitted
Definition threadpool.hpp:31
Statistics & operator=(const Statistics &)=delete
auto get_uptime() const noexcept -> std::chrono::duration< double >
Definition threadpool.hpp:163
std::chrono::steady_clock::time_point start_time
Definition threadpool.hpp:35
std::atomic< std::size_t > tasks_completed
Definition threadpool.hpp:32