- //****************************************************************************************
- //
- // Copyright 2017 Nokia. All rights reserved.
- //
- //****************************************************************************************
- #pragma once
- #include <utils/concurrency/thread_guard.hpp>
- #include <utils/functional/deferred_call.hpp>
- #include <atomic>
- #include <condition_variable>
- #include <functional>
- #include <future>
- #include <memory>
- #include <mutex>
- #include <queue>
- #include <stdexcept>
- #include <thread>
- #include <utility>
- #include <vector>
- #include <logging/logging.hpp> // added
- namespace utils {
- // Pool that queues type-erased jobs for worker threads.
- //
- // Jobs are stored as std::function<void()> in a FIFO queue guarded by
- // queue_mutex_; every successful enqueue wakes one waiter on cv_. The
- // constructors, destructor, shutdown(), start() and run_thread() are defined
- // outside this header, so their exact behaviour is not documented here.
- //
- // The pool is neither copyable nor movable.
- class thread_pool
- {
-     using job_t = std::function<void()>;
-     using mutex_t = std::mutex;
-     using lock_t = std::unique_lock<mutex_t>;
-     using thread_t = thread_guard_join;
- public:
-     thread_pool();
-     // NOTE(review): num_threads is presumably the number of workers to run;
-     // confirm against the implementation file.
-     explicit thread_pool(std::size_t num_threads);
-     thread_pool(const thread_pool&) = delete;
-     thread_pool& operator=(const thread_pool&) = delete;
-     thread_pool(thread_pool&&) = delete;
-     thread_pool& operator=(thread_pool&&) = delete;
-     ~thread_pool();
-     // NOTE(review): presumably sets stop_ and winds the workers down; the
-     // definition is not visible in this header.
-     void shutdown();
-     // Fire-and-forget submission of f(args...). The job is silently dropped
-     // when stop_ is already set.
-     template <typename Callable, typename... Args>
-     void enqueue(Callable&& f, Args&&... args);
-     // Submission that hands back a std::future for the call's result. When
-     // stop_ is already set, the returned future holds a std::runtime_error
-     // instead of a result.
-     template <typename Callable, typename... Args>
-     auto enqueue_with_future(Callable&& f, Args&&... args)
-         -> std::future<std::result_of_t<Callable(Args...)>>;
-     // NOTE(review): presumably launches the worker threads; confirm in the
-     // implementation file.
-     void start();
- private:
-     // NOTE(review): presumably the worker loop that drains queue_; defined
-     // elsewhere.
-     void run_thread();
-     // Member order below is significant (it is the initialization order);
-     // do not reorder without updating the constructors.
-     //
-     // Checked by enqueue()/enqueue_with_future() to reject new work.
-     // Brace-initialized because a default-constructed std::atomic is left
-     // uninitialized before C++20; a constructor mem-initializer, if present,
-     // still takes precedence over this default.
-     std::atomic_bool stop_{false};
-     // NOTE(review): meaning not visible in this header; confirm where it is
-     // initialized.
-     const bool one_thread_;
-     // Guards queue_.
-     mutex_t queue_mutex_;
-     // Notified once for every job pushed onto queue_.
-     std::condition_variable cv_;
-     // Pending jobs, oldest first.
-     std::queue<job_t> queue_;
-     // Worker thread handles (thread_guard_join is declared in
-     // utils/concurrency/thread_guard.hpp).
-     std::vector<thread_t> workers_;
-     logger_handle logger;
-     // NOTE(review): presumably the configured worker count. Zero-initialized
-     // so it is never read indeterminate if a constructor leaves it unset.
-     std::size_t num_of_threads{0};
- };
- // Queues f(args...) for execution without reporting a result.
- //
- // f and args are forwarded to deferred_call, whose result is stored as a
- // job_t. If the pool has already been stopped, the call returns immediately
- // and the job is silently dropped; use enqueue_with_future when the caller
- // needs to learn about a rejection.
- //
- // f:    callable to run.
- // args: arguments forwarded together with f to deferred_call.
- template <typename Callable, typename... Args>
- void thread_pool::enqueue(Callable&& f, Args&&... args)
- {
-     if (stop_)
-     {
-         return;
-     }
-     // Build the job before taking the lock: bundling the call and converting
-     // it to std::function may copy or move user objects and may allocate.
-     // None of that needs to hold up other threads contending for the queue.
-     // enqueue_with_future prepares its task outside the lock the same way.
-     job_t job =
-         deferred_call(std::forward<Callable>(f), std::forward<Args>(args)...);
-     {
-         lock_t lock(queue_mutex_);
-         queue_.push(std::move(job));
-     }
-     // Signal after the lock is released so a woken thread does not
-     // immediately block on queue_mutex_.
-     cv_.notify_one();
- }
- // Queues f(args...) and returns a future for its outcome.
- //
- // If the pool has already been stopped, nothing is queued and the returned
- // future holds a std::runtime_error. Otherwise the call is wrapped in a
- // std::packaged_task, pushed onto the queue under queue_mutex_, and one
- // waiter on cv_ is woken.
- //
- // f:       callable to run.
- // args:    arguments forwarded together with f to deferred_call.
- // returns: future tied to the queued task (or to the rejection error).
- template <typename Callable, typename... Args>
- auto thread_pool::enqueue_with_future(Callable&& f, Args&&... args)
-     -> std::future<std::result_of_t<Callable(Args...)>>
- {
-     using result_t = std::result_of_t<Callable(Args...)>;
-     using task_t = std::packaged_task<result_t()>;
-     if (stop_)
-     {
-         // Hand back a future that is already completed with the error.
-         std::promise<result_t> rejected;
-         rejected.set_exception(std::make_exception_ptr(
-             std::runtime_error{"Can't queue tasks after the pool was stopped"}));
-         return rejected.get_future();
-     }
-     // packaged_task is move-only while job_t (std::function) needs a copyable
-     // callable, hence the shared_ptr captured by the queued lambda.
-     auto task = std::make_shared<task_t>(
-         deferred_call(std::forward<Callable>(f), std::forward<Args>(args)...));
-     std::future<result_t> pending = task->get_future();
-     {
-         lock_t guard(queue_mutex_);
-         queue_.push([task]() { (*task)(); });
-     }
-     cv_.notify_one();
-     return pending;
- }
- } // namespace utils