14 #include <fmt/format.h>
17 #include <condition_variable>
24 #include <string_view>
26 #include <type_traits>
57 std::chrono::steady_clock::time_point
start_time{std::chrono::steady_clock::now()};
75 [[nodiscard]] auto
get_uptime() const noexcept -> std::chrono::duration<
double>;
85 explicit
ThreadPool(std::
size_t num_threads = 0, std::string_view thread_name_prefix = "
ThreadPool");
/// @brief Submits a task for asynchronous execution.
/// @param callable Invocable to run on a worker thread.
/// @param args Arguments forwarded to the callable.
/// @return std::future holding the callable's eventual result.
template <typename Callable, typename... Args>
[[nodiscard]] auto submit(Callable&& callable, Args&&... args)
    -> std::future<std::invoke_result_t<std::decay_t<Callable>, std::decay_t<Args>...>>;
/// @brief Initiates graceful shutdown, waiting up to `timeout` for the
///        worker threads to finish and join.
auto shutdown(std::chrono::milliseconds timeout = std::chrono::seconds{5}) noexcept -> void;
119 [[nodiscard]]
auto wait_for_idle(std::chrono::milliseconds timeout = std::chrono::milliseconds::max()) const ->
bool;
124 [[nodiscard]] auto
thread_count() const noexcept -> std::
size_t {
return m_workers.size(); }
129 [[nodiscard]]
auto pending_tasks() const noexcept -> std::
size_t {
return m_stats.tasks_pending.load(std::memory_order_acquire); }
134 [[nodiscard]]
auto is_running() const noexcept ->
bool {
return !m_stop_requested.load(std::memory_order_acquire); }
144 auto worker_loop(std::size_t worker_id) noexcept -> void;
147 auto set_thread_name(std::string_view name)
const noexcept -> void;
149 mutable std::mutex m_queue_mutex;
150 std::condition_variable m_task_available;
151 mutable std::condition_variable m_all_idle;
152 std::queue<std::function<void()>> m_task_queue;
154 std::vector<std::thread> m_workers;
155 std::atomic<bool> m_stop_requested{
false};
156 std::string m_thread_name_prefix;
// NOTE(review): extraction-garbled fragment of ThreadPool::submit — the
// signature line (original line 162) and the trailing `return future;` /
// closing braces are missing from this view; the stray leading numbers are
// paste artifacts, not code.
161 template <
typename Callable,
typename... Args>
163 -> std::future<std::invoke_result_t<std::decay_t<Callable>, std::decay_t<Args>...>> {
// Result type of the callable after decay, used for the packaged_task.
165 using ReturnType = std::invoke_result_t<std::decay_t<Callable>, std::decay_t<Args>...>;
// Wrap callable + args in a shared packaged_task so both the returned
// future and the queued closure can reference the same task object.
167 auto task = std::make_shared<std::packaged_task<ReturnType()>>(
168 [callable = std::forward<Callable>(callable),
169 args_tuple = std::make_tuple(std::forward<Args>(args)...)]()
mutable -> ReturnType {
170 return std::apply(std::move(callable), std::move(args_tuple));
// Future is obtained before queuing so the caller can await the result.
173 auto future = task->get_future();
// Queue mutation happens under the lock; submissions after stop are rejected.
175 std::lock_guard lock{m_queue_mutex};
176 if (m_stop_requested.load(std::memory_order_acquire))
throw std::runtime_error(
"Cannot submit task to stopped ThreadPool");
// Queued wrapper swallows exceptions — presumably because packaged_task
// already stores them in the future; confirm against full source.
177 m_task_queue.emplace([task = std::move(task)]() noexcept {
try {(*task)();}
catch (...) {} });
// Bump the pending counter before waking a worker.
180 m_stats.
tasks_pending.fetch_add(1, std::memory_order_acq_rel);
182 m_task_available.notify_one();
/// Upper bound on the worker-thread count accepted by the ThreadPool
/// constructor (it throws std::invalid_argument above this).
/// `inline` so the header-scope constants share one entity across TUs
/// instead of producing a per-TU copy.
inline constexpr std::size_t MAX_REASONABLE_THREADS = 100;

/// Timeout used when shutdown is triggered implicitly (e.g. by
/// move-assignment over a running pool).
inline constexpr auto DEFAULT_SHUTDOWN_TIMEOUT = std::chrono::seconds{5};
// NOTE(review): body fragment of get_throughput() — tasks completed per
// second since start_time; the signature line is outside this view.
192 const auto elapsed = std::chrono::steady_clock::now() - start_time;
193 const auto seconds = std::chrono::duration<double>(elapsed).count();
// Guard against division by zero immediately after construction.
194 return seconds > 0.0 ?
static_cast<double>(tasks_completed.load(std::memory_order_acquire)) / seconds : 0.0;
// NOTE(review): the line below is the body of get_uptime() (duration since
// creation); its signature is also outside this view.
198 return std::chrono::steady_clock::now() - start_time;
// NOTE(review): member-init list of the Statistics move constructor
// (signature line outside this view). Atomics are not movable, so each
// counter is loaded from `other` with acquire ordering.
202 : tasks_submitted{other.
tasks_submitted.load(std::memory_order_acquire)},
203 tasks_completed{other.tasks_completed.load(std::memory_order_acquire)},
204 tasks_failed{other.tasks_failed.load(std::memory_order_acquire)},
205 tasks_pending{other.tasks_pending.load(std::memory_order_acquire)},
206 start_time{other.start_time} {
// Zero the source so the moved-from object reads as empty.
// NOTE(review): the load/store pairs are not atomic as a whole — updates
// racing with the move can be lost; confirm moves only happen while the
// source pool is quiescent.
208 other.tasks_submitted.store(0, std::memory_order_release);
209 other.tasks_completed.store(0, std::memory_order_release);
210 other.tasks_failed.store(0, std::memory_order_release);
211 other.tasks_pending.store(0, std::memory_order_release);
// NOTE(review): body fragment of the Statistics move-assignment operator
// (signature and closing lines outside this view).
216 if (
this != &other) {
// Copy every counter from the source with acquire/release ordering...
217 tasks_submitted.store(other.tasks_submitted.load(std::memory_order_acquire), std::memory_order_release);
218 tasks_completed.store(other.tasks_completed.load(std::memory_order_acquire), std::memory_order_release);
219 tasks_failed.store(other.tasks_failed.load(std::memory_order_acquire), std::memory_order_release);
220 tasks_pending.store(other.tasks_pending.load(std::memory_order_acquire), std::memory_order_release);
// ...then zero the source (same non-atomic-as-a-whole caveat as the
// move constructor: concurrent updates during the move can be lost).
224 other.tasks_submitted.store(0, std::memory_order_release);
225 other.tasks_completed.store(0, std::memory_order_release);
226 other.tasks_failed.store(0, std::memory_order_release);
227 other.tasks_pending.store(0, std::memory_order_release);
// NOTE(review): fragment of the ThreadPool constructor definition — the
// signature line and several interior lines (including the `try` that
// pairs with the catch below) are outside this view.
233 : m_thread_name_prefix{thread_name_prefix} {
// Validate the requested thread count before spawning anything.
235 if (num_threads == 0)
throw std::invalid_argument(
"ThreadPool: num_threads must be greater than 0");
236 if (num_threads > MAX_REASONABLE_THREADS)
throw std::invalid_argument(
"ThreadPool: Requested thread count exceeds reasonable limit");
// Reserve up front so worker creation does not reallocate mid-loop.
238 m_workers.reserve(num_threads);
241 for (std::size_t i = 0; i < num_threads; ++i) {
242 m_workers.emplace_back(&ThreadPool::worker_loop,
this, i);
244 }
// On thread-creation failure: request stop, wake any already-started
// workers, and join them before (presumably) rethrowing — confirm
// against the full source.
catch (
const std::system_error& e) {
245 m_stop_requested.store(
true, std::memory_order_release);
246 m_task_available.notify_all();
248 for (
auto& worker : m_workers) {
249 if (worker.joinable()) worker.join();
// NOTE(review): member-init list fragment of the ThreadPool move
// constructor (signature outside this view; mutexes/condvars are
// default-constructed, not moved).
// NOTE(review): worker threads captured the source object's `this` in
// worker_loop — after the std::thread handles are moved here they still
// operate on the moved-from pool; verify move semantics against the
// full source.
263 m_task_queue{std::move(other.m_task_queue)},
264 m_workers{std::move(other.m_workers)},
265 m_stop_requested{other.m_stop_requested.load(std::memory_order_acquire)},
266 m_thread_name_prefix{std::move(other.m_thread_name_prefix)},
267 m_stats{std::move(other.m_stats)} {
// Mark the source stopped so it rejects further submissions.
269 other.m_stop_requested.store(
true, std::memory_order_release);
// NOTE(review): body fragment of the ThreadPool move-assignment operator
// (signature and closing lines outside this view).
273 if (
this != &other) {
// Stop our own workers before taking over the source's state.
274 if (is_running()) shutdown(DEFAULT_SHUTDOWN_TIMEOUT);
// NOTE(review): two lock_guards taken in sequence — if two pools are
// ever move-assigned into each other concurrently this can deadlock; a
// single std::scoped_lock{m_queue_mutex, other.m_queue_mutex} would be
// deadlock-free. Verify whether concurrent moves are possible.
277 std::lock_guard lock{m_queue_mutex};
278 std::lock_guard other_lock{other.m_queue_mutex};
280 m_task_queue = std::move(other.m_task_queue);
281 m_workers = std::move(other.m_workers);
282 m_stop_requested.store(other.m_stop_requested.load(std::memory_order_acquire), std::memory_order_release);
283 m_thread_name_prefix = std::move(other.m_thread_name_prefix);
284 m_stats = std::move(other.m_stats);
// Mark the source stopped so it rejects further submissions.
287 other.m_stop_requested.store(
true, std::memory_order_release);
// NOTE(review): fragment of ThreadPool::shutdown — several interior lines
// (the join call, the timeout/detach branch, the switch case labels and
// all closing braces) are outside this view.
293 const auto deadline = std::chrono::steady_clock::now() + timeout;
// Atomically flip the stop flag; if it was already set, just report it.
295 bool expected =
false;
296 if (!m_stop_requested.compare_exchange_strong(expected,
true, std::memory_order_acq_rel)) fmt::print(
"Pool was already stopped\n");
// Wake every worker so each observes the stop flag.
299 m_task_available.notify_all();
301 bool all_joined =
true;
302 for (
auto& worker : m_workers) {
303 if (worker.joinable()) {
304 if (std::chrono::steady_clock::now() < deadline) {
307 }
catch (
const std::system_error&) {
// Drain any tasks that never ran and reset the pending counter.
321 std::lock_guard lock{m_queue_mutex};
322 std::queue<std::function<void()>> empty;
323 m_task_queue.swap(empty);
324 m_stats.tasks_pending.store(0, std::memory_order_release);
326 ShutdownResult shutdown_result = all_joined ? ShutdownResult::Graceful : ShutdownResult::Forced;
// Report the outcome (case labels are outside this view).
328 switch (shutdown_result) {
330 fmt::print(
"Pool shutdown gracefully\n");
333 fmt::print(
"Pool shutdown was forced (timeout)\n");
336 fmt::print(
"Pool was already stopped\n");
// NOTE(review): body fragment of ThreadPool::wait_for_idle — the signature
// and the closing of the wait_for call are outside this view.
342 std::unique_lock lock{m_queue_mutex};
// Idle means: the queue is empty AND no tasks are counted as pending
// (a task stays "pending" while a worker is still executing it).
343 return m_all_idle.wait_for(lock, timeout, [
this] {
344 return m_task_queue.empty() &&
345 m_stats.tasks_pending.load(std::memory_order_acquire) == 0;
// NOTE(review): fragment of ThreadPool::worker_loop — several interior
// lines (close of the wait predicate, the early-return on stop, the queue
// pop, the lock release and the try block that actually invokes `task`)
// are outside this view; the stray leading numbers are paste artifacts.
349 inline auto ThreadPool::worker_loop(std::size_t worker_id) noexcept ->
void {
// Name the thread "<prefix>_<id>" so it is identifiable in debuggers.
350 set_thread_name(m_thread_name_prefix +
"_" + std::to_string(worker_id));
352 while (!m_stop_requested.load(std::memory_order_acquire)) {
353 std::function<void()> task;
355 std::unique_lock lock{m_queue_mutex};
// Sleep until there is work or shutdown is requested.
358 m_task_available.wait(lock, [
this] {
359 return m_stop_requested.load(std::memory_order_acquire) || !m_task_queue.empty();
// Exit once stopped and the queue has been fully drained.
363 if (m_stop_requested.load(std::memory_order_acquire) && m_task_queue.empty()) {
368 if (!m_task_queue.empty()) {
369 task = std::move(m_task_queue.front());
// Bookkeeping after the (not-visible) task invocation.
384 m_stats.tasks_completed.fetch_add(1, std::memory_order_acq_rel);
385 }
catch (
const std::exception& e) {
386 m_stats.tasks_failed.fetch_add(1, std::memory_order_acq_rel);
// Presumably a catch (...) branch precedes this line — confirm against
// the full source.
388 m_stats.tasks_failed.fetch_add(1, std::memory_order_acq_rel);
// When this was the last pending task and the queue is empty, wake any
// wait_for_idle() waiters.
392 const auto remaining = m_stats.tasks_pending.fetch_sub(1, std::memory_order_acq_rel);
393 if (remaining == 1) {
394 std::lock_guard lock{m_queue_mutex};
395 if (m_task_queue.empty()) m_all_idle.notify_all();
// NOTE(review): fragment of ThreadPool::set_thread_name — the trailing
// #else/#endif and closing brace are outside this view.
401 inline auto ThreadPool::set_thread_name(std::string_view name)
const noexcept ->
void {
403 #if defined(__linux__)
// Linux caps thread names at 15 chars + NUL; truncate to fit.
405 constexpr std::size_t MAX_NAME_LENGTH = 15;
406 std::string truncated_name{name.substr(0, MAX_NAME_LENGTH)};
407 pthread_setname_np(pthread_self(), truncated_name.c_str());
409 #elif defined(_WIN32)
// SetThreadDescription takes UTF-16; widen char-by-char — assumes the
// name is ASCII (TODO confirm; non-ASCII bytes would be mis-widened).
411 std::wstring wide_name;
412 wide_name.reserve(name.size());
413 std::transform(name.begin(), name.end(), std::back_inserter(wide_name),
414 [](
char c) { return static_cast<wchar_t>(c); });
416 SetThreadDescription(GetCurrentThread(), wide_name.c_str());
418 #elif defined(__APPLE__)
// macOS variant can only name the calling thread (no thread argument).
420 std::string name_str{name};
421 pthread_setname_np(name_str.c_str());
A ThreadPool implementation for managing asynchronous task execution.
ShutdownResult
Result status of thread pool shutdown.
- AlreadyStopped: Pool was already stopped.
- Graceful: All tasks completed within timeout.
- Forced: Timeout exceeded, threads detached.
auto get_statistics() const noexcept -> const Statistics &
Gets current statistics snapshot.
auto pending_tasks() const noexcept -> std::size_t
Gets the number of pending tasks.
auto thread_count() const noexcept -> std::size_t
Gets the number of worker threads.
ThreadPool & operator=(const ThreadPool &)=delete
auto is_running() const noexcept -> bool
Checks if the pool is running (not stopped).
auto wait_for_idle(std::chrono::milliseconds timeout=std::chrono::milliseconds::max()) const -> bool
Waits for all pending tasks to complete.
auto shutdown(std::chrono::milliseconds timeout=std::chrono::seconds{5}) noexcept -> void
Initiates graceful shutdown with specified timeout.
auto get_throughput() const noexcept -> double
Calculates tasks completed per second.
auto submit(Callable &&callable, Args &&... args) -> std::future< std::invoke_result_t< std::decay_t< Callable >, std::decay_t< Args >... >>
Submits a task for asynchronous execution.
ThreadPool(std::size_t num_threads=0, std::string_view thread_name_prefix="ThreadPool")
Constructs a ThreadPool with specified number of worker threads.
~ThreadPool() noexcept
Destructor ensures graceful shutdown with reasonable timeout.
auto get_uptime() const noexcept -> std::chrono::duration< double >
Calculates time elapsed since pool creation.
Runtime statistics for thread pool performance monitoring.
std::atomic< std::size_t > tasks_pending
Current number of pending (not yet executed) tasks.
Statistics(const Statistics &)=delete
Non-copyable (atomic members cannot be copied).
std::atomic< std::size_t > tasks_failed
Total tasks that failed or threw exceptions.
std::atomic< std::size_t > tasks_submitted
Total tasks submitted to the pool.
Statistics & operator=(const Statistics &)=delete
Non-copyable (atomic members cannot be copied).
Statistics()=default
Default constructor.
std::chrono::steady_clock::time_point start_time
Pool creation timestamp.
std::atomic< std::size_t > tasks_completed
Total tasks completed successfully.