Facebook
From Sharp Sloth, 3 Years ago, written in Plain Text.
Embed
Download Paste or View Raw
Hits: 164
  1. //****************************************************************************************
  2. //
  3. // Copyright 2017 Nokia. All rights reserved.
  4. //
  5. //****************************************************************************************
  6. #pragma once
  7.  
  8. #include <utils/concurrency/thread_guard.hpp>
  9. #include <utils/functional/deferred_call.hpp>
  10.  
  11. #include <atomic>
  12. #include <condition_variable>
  13. #include <functional>
  14. #include <future>
  15. #include <memory>
  16. #include <mutex>
  17. #include <queue>
  18. #include <stdexcept>
  19. #include <thread>
  20. #include <vector>
  21. #include <logging/logging.hpp> // added
  22.  
  23. namespace utils {
  24.  
  25. class thread_pool
  26. {
  27.     using job_t = std::function<void()>;
  28.     using mutex_t = std::mutex;
  29.     using lock_t = std::unique_lock<std::mutex>;
  30.     using thread_t = thread_guard_join;
  31.  
  32. public:
  33.     thread_pool();
  34.     explicit thread_pool(std::size_t num_threads); //explicit
  35.  
  36.     thread_pool(const thread_pool&) = delete;
  37.     thread_pool& operator=(const thread_pool&) = delete;
  38.  
  39.     thread_pool(thread_pool&&) = delete;
  40.     thread_pool& operator=(thread_pool&&) = delete;
  41.  
  42.     ~thread_pool();
  43.  
  44.     void shutdown();
  45.  
  46.     template <typename Callable, typename... Args>
  47.     void enqueue(Callable&& f, Args&&... args);
  48.  
  49.     template <typename Callable, typename... Args>
  50.     auto enqueue_with_future(Callable&& f, Args&&... args)
  51.         -> std::future<std::result_of_t<Callable(Args...)>>;
  52.  
  53.  
  54.  
  55.     void start();
  56.  
  57.  
  58. private:
  59.     void run_thread();
  60.  
  61. private:
  62.     std::atomic_bool stop_; // was private
  63.     const bool one_thread_;
  64.     mutex_t queue_mutex_;
  65.     std::condition_variable cv_;
  66.     std::queue<job_t> queue_;
  67.     std::vector<thread_t> workers_;
  68.     logger_handle logger; //added
  69.     std::size_t num_of_threads;
  70.  
  71. };
  72.  
  73. template <typename Callable, typename... Args>
  74. void thread_pool::enqueue(Callable&& f, Args&&... args)
  75. {
  76.     if (stop_)
  77.     {
  78.         return;
  79.     }
  80.  
  81.     {
  82.         lock_t lock(queue_mutex_);
  83.  
  84.         queue_.push(
  85.             deferred_call(std::forward<Callable>(f), std::forward<Args>(args)...));
  86.     }
  87.     cv_.notify_one();
  88. }
  89.  
  90. template <typename Callable, typename... Args>
  91. auto thread_pool::enqueue_with_future(Callable&& f, Args&&... args)
  92.     -> std::future<std::result_of_t<Callable(Args...)>>
  93. {
  94.     using return_t = std::result_of_t<Callable(Args...)>;
  95.  
  96.     if (stop_)
  97.     {
  98.         std::promise<return_t> promise;
  99.         std::future<return_t> res = promise.get_future();
  100.         promise.set_exception(std::make_exception_ptr(
  101.             std::runtime_error{"Can't queue tasks after the pool was stopped"}));
  102.         return res;
  103.     }
  104.  
  105.     auto job = std::make_shared<std::packaged_task<return_t()>>(
  106.         deferred_call(std::forward<Callable>(f), std::forward<Args>(args)...));
  107.  
  108.     auto res = job->get_future();
  109.  
  110.     {
  111.         lock_t lock(queue_mutex_);
  112.  
  113.         queue_.push([job]() { (*job)(); });
  114.     }
  115.     cv_.notify_one();
  116.  
  117.     return res;
  118. }
  119.  
  120. } // namespace utils
  121.