//**************************************************************************************** // // Copyright 2017 Nokia. All rights reserved. // //**************************************************************************************** #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include // added namespace utils { class thread_pool { using job_t = std::function; using mutex_t = std::mutex; using lock_t = std::unique_lock; using thread_t = thread_guard_join; public: thread_pool(); explicit thread_pool(std::size_t num_threads); //explicit thread_pool(const thread_pool&) = delete; thread_pool& operator=(const thread_pool&) = delete; thread_pool(thread_pool&&) = delete; thread_pool& operator=(thread_pool&&) = delete; ~thread_pool(); void shutdown(); template void enqueue(Callable&& f, Args&&... args); template auto enqueue_with_future(Callable&& f, Args&&... args) -> std::future>; void start(); private: void run_thread(); private: std::atomic_bool stop_; // was private const bool one_thread_; mutex_t queue_mutex_; std::condition_variable cv_; std::queue queue_; std::vector workers_; logger_handle logger; //added std::size_t num_of_threads; }; template void thread_pool::enqueue(Callable&& f, Args&&... args) { if (stop_) { return; } { lock_t lock(queue_mutex_); queue_.push( deferred_call(std::forward(f), std::forward(args)...)); } cv_.notify_one(); } template auto thread_pool::enqueue_with_future(Callable&& f, Args&&... args) -> std::future> { using return_t = std::result_of_t; if (stop_) { std::promise promise; std::future res = promise.get_future(); promise.set_exception(std::make_exception_ptr( std::runtime_error{"Can't queue tasks after the pool was stopped"})); return res; } auto job = std::make_shared>( deferred_call(std::forward(f), std::forward(args)...)); auto res = job->get_future(); { lock_t lock(queue_mutex_); queue_.push([job]() { (*job)(); }); } cv_.notify_one(); return res; } } // namespace utils