xref: /freebsd/contrib/llvm-project/llvm/include/llvm/Support/ThreadPool.h (revision 700637cbb5e582861067a11aaca4d053546871d2)
//===-- llvm/Support/ThreadPool.h - A ThreadPool implementation -*- C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// This file defines a crude C++11 based thread pool.
//
//===----------------------------------------------------------------------===//
12 
13 #ifndef LLVM_SUPPORT_THREADPOOL_H
14 #define LLVM_SUPPORT_THREADPOOL_H
15 
16 #include "llvm/ADT/DenseMap.h"
17 #include "llvm/Config/llvm-config.h"
18 #include "llvm/Support/Compiler.h"
19 #include "llvm/Support/RWMutex.h"
20 #include "llvm/Support/Threading.h"
21 #include "llvm/Support/thread.h"
22 
23 #include <future>
24 
25 #include <condition_variable>
26 #include <deque>
27 #include <functional>
28 #include <memory>
29 #include <mutex>
30 #include <utility>
31 
32 namespace llvm {
33 
34 class ThreadPoolTaskGroup;
35 
36 /// This defines the abstract base interface for a ThreadPool allowing
37 /// asynchronous parallel execution on a defined number of threads.
38 ///
39 /// It is possible to reuse one thread pool for different groups of tasks
40 /// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using
41 /// the same queue, but it is possible to wait only for a specific group of
42 /// tasks to finish.
43 ///
44 /// It is also possible for worker threads to submit new tasks and wait for
45 /// them. Note that this may result in a deadlock in cases such as when a task
46 /// (directly or indirectly) tries to wait for its own completion, or when all
47 /// available threads are used up by tasks waiting for a task that has no thread
48 /// left to run on (this includes waiting on the returned future). It should be
49 /// generally safe to wait() for a group as long as groups do not form a cycle.
50 class LLVM_ABI ThreadPoolInterface {
51   /// The actual method to enqueue a task to be defined by the concrete
52   /// implementation.
53   virtual void asyncEnqueue(std::function<void()> Task,
54                             ThreadPoolTaskGroup *Group) = 0;
55 
56 public:
57   /// Destroying the pool will drain the pending tasks and wait. The current
58   /// thread may participate in the execution of the pending tasks.
59   virtual ~ThreadPoolInterface();
60 
61   /// Blocking wait for all the threads to complete and the queue to be empty.
62   /// It is an error to try to add new tasks while blocking on this call.
63   /// Calling wait() from a task would deadlock waiting for itself.
64   virtual void wait() = 0;
65 
66   /// Blocking wait for only all the threads in the given group to complete.
67   /// It is possible to wait even inside a task, but waiting (directly or
68   /// indirectly) on itself will deadlock. If called from a task running on a
69   /// worker thread, the call may process pending tasks while waiting in order
70   /// not to waste the thread.
71   virtual void wait(ThreadPoolTaskGroup &Group) = 0;
72 
73   /// Returns the maximum number of worker this pool can eventually grow to.
74   virtual unsigned getMaxConcurrency() const = 0;
75 
76   /// Asynchronous submission of a task to the pool. The returned future can be
77   /// used to wait for the task to finish and is *non-blocking* on destruction.
78   template <typename Function, typename... Args>
async(Function && F,Args &&...ArgList)79   auto async(Function &&F, Args &&...ArgList) {
80     auto Task =
81         std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
82     return async(std::move(Task));
83   }
84 
85   /// Overload, task will be in the given task group.
86   template <typename Function, typename... Args>
async(ThreadPoolTaskGroup & Group,Function && F,Args &&...ArgList)87   auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList) {
88     auto Task =
89         std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
90     return async(Group, std::move(Task));
91   }
92 
93   /// Asynchronous submission of a task to the pool. The returned future can be
94   /// used to wait for the task to finish and is *non-blocking* on destruction.
95   template <typename Func>
96   auto async(Func &&F) -> std::shared_future<decltype(F())> {
97     return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
98                      nullptr);
99   }
100 
101   template <typename Func>
102   auto async(ThreadPoolTaskGroup &Group, Func &&F)
103       -> std::shared_future<decltype(F())> {
104     return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
105                      &Group);
106   }
107 
108 private:
109   /// Asynchronous submission of a task to the pool. The returned future can be
110   /// used to wait for the task to finish and is *non-blocking* on destruction.
111   template <typename ResTy>
asyncImpl(std::function<ResTy ()> Task,ThreadPoolTaskGroup * Group)112   std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task,
113                                       ThreadPoolTaskGroup *Group) {
114     auto Future = std::async(std::launch::deferred, std::move(Task)).share();
115     asyncEnqueue([Future]() { Future.wait(); }, Group);
116     return Future;
117   }
118 };
119 
120 #if LLVM_ENABLE_THREADS
121 /// A ThreadPool implementation using std::threads.
122 ///
123 /// The pool keeps a vector of threads alive, waiting on a condition variable
124 /// for some work to become available.
125 class LLVM_ABI StdThreadPool : public ThreadPoolInterface {
126 public:
127   /// Construct a pool using the hardware strategy \p S for mapping hardware
128   /// execution resources (threads, cores, CPUs)
129   /// Defaults to using the maximum execution resources in the system, but
130   /// accounting for the affinity mask.
131   StdThreadPool(ThreadPoolStrategy S = hardware_concurrency());
132 
133   /// Blocking destructor: the pool will wait for all the threads to complete.
134   ~StdThreadPool() override;
135 
136   /// Blocking wait for all the threads to complete and the queue to be empty.
137   /// It is an error to try to add new tasks while blocking on this call.
138   /// Calling wait() from a task would deadlock waiting for itself.
139   void wait() override;
140 
141   /// Blocking wait for only all the threads in the given group to complete.
142   /// It is possible to wait even inside a task, but waiting (directly or
143   /// indirectly) on itself will deadlock. If called from a task running on a
144   /// worker thread, the call may process pending tasks while waiting in order
145   /// not to waste the thread.
146   void wait(ThreadPoolTaskGroup &Group) override;
147 
148   /// Returns the maximum number of worker threads in the pool, not the current
149   /// number of threads!
getMaxConcurrency()150   unsigned getMaxConcurrency() const override { return MaxThreadCount; }
151 
152   // TODO: Remove, misleading legacy name warning!
153   LLVM_DEPRECATED("Use getMaxConcurrency instead", "getMaxConcurrency")
getThreadCount()154   unsigned getThreadCount() const { return MaxThreadCount; }
155 
156   /// Returns true if the current thread is a worker thread of this thread pool.
157   bool isWorkerThread() const;
158 
159 private:
160   /// Returns true if all tasks in the given group have finished (nullptr means
161   /// all tasks regardless of their group). QueueLock must be locked.
162   bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const;
163 
164   /// Asynchronous submission of a task to the pool. The returned future can be
165   /// used to wait for the task to finish and is *non-blocking* on destruction.
asyncEnqueue(std::function<void ()> Task,ThreadPoolTaskGroup * Group)166   void asyncEnqueue(std::function<void()> Task,
167                     ThreadPoolTaskGroup *Group) override {
168     int requestedThreads;
169     {
170       // Lock the queue and push the new task
171       std::unique_lock<std::mutex> LockGuard(QueueLock);
172 
173       // Don't allow enqueueing after disabling the pool
174       assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
175       Tasks.emplace_back(std::make_pair(std::move(Task), Group));
176       requestedThreads = ActiveThreads + Tasks.size();
177     }
178     QueueCondition.notify_one();
179     grow(requestedThreads);
180   }
181 
182   /// Grow to ensure that we have at least `requested` Threads, but do not go
183   /// over MaxThreadCount.
184   void grow(int requested);
185 
186   void processTasks(ThreadPoolTaskGroup *WaitingForGroup);
187 
188   /// Threads in flight
189   std::vector<llvm::thread> Threads;
190   /// Lock protecting access to the Threads vector.
191   mutable llvm::sys::RWMutex ThreadsLock;
192 
193   /// Tasks waiting for execution in the pool.
194   std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;
195 
196   /// Locking and signaling for accessing the Tasks queue.
197   std::mutex QueueLock;
198   std::condition_variable QueueCondition;
199 
200   /// Signaling for job completion (all tasks or all tasks in a group).
201   std::condition_variable CompletionCondition;
202 
203   /// Keep track of the number of thread actually busy
204   unsigned ActiveThreads = 0;
205   /// Number of threads active for tasks in the given group (only non-zero).
206   DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups;
207 
208   /// Signal for the destruction of the pool, asking thread to exit.
209   bool EnableFlag = true;
210 
211   const ThreadPoolStrategy Strategy;
212 
213   /// Maximum number of threads to potentially grow this pool to.
214   const unsigned MaxThreadCount;
215 };
216 #endif // LLVM_ENABLE_THREADS
217 
218 /// A non-threaded implementation.
219 class LLVM_ABI SingleThreadExecutor : public ThreadPoolInterface {
220 public:
221   /// Construct a non-threaded pool, ignoring using the hardware strategy.
222   SingleThreadExecutor(ThreadPoolStrategy ignored = {});
223 
224   /// Blocking destructor: the pool will first execute the pending tasks.
225   ~SingleThreadExecutor() override;
226 
227   /// Blocking wait for all the tasks to execute first
228   void wait() override;
229 
230   /// Blocking wait for only all the tasks in the given group to complete.
231   void wait(ThreadPoolTaskGroup &Group) override;
232 
233   /// Returns always 1: there is no concurrency.
getMaxConcurrency()234   unsigned getMaxConcurrency() const override { return 1; }
235 
236   // TODO: Remove, misleading legacy name warning!
237   LLVM_DEPRECATED("Use getMaxConcurrency instead", "getMaxConcurrency")
getThreadCount()238   unsigned getThreadCount() const { return 1; }
239 
240   /// Returns true if the current thread is a worker thread of this thread pool.
241   bool isWorkerThread() const;
242 
243 private:
244   /// Asynchronous submission of a task to the pool. The returned future can be
245   /// used to wait for the task to finish and is *non-blocking* on destruction.
asyncEnqueue(std::function<void ()> Task,ThreadPoolTaskGroup * Group)246   void asyncEnqueue(std::function<void()> Task,
247                     ThreadPoolTaskGroup *Group) override {
248     Tasks.emplace_back(std::make_pair(std::move(Task), Group));
249   }
250 
251   /// Tasks waiting for execution in the pool.
252   std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;
253 };
254 
/// The default pool implementation: multi-threaded (StdThreadPool) when LLVM
/// is built with threading support, otherwise the single-threaded fallback.
#if LLVM_ENABLE_THREADS
using DefaultThreadPool = StdThreadPool;
#else
using DefaultThreadPool = SingleThreadExecutor;
#endif
260 
261 /// A group of tasks to be run on a thread pool. Thread pool tasks in different
262 /// groups can run on the same threadpool but can be waited for separately.
263 /// It is even possible for tasks of one group to submit and wait for tasks
264 /// of another group, as long as this does not form a loop.
265 class ThreadPoolTaskGroup {
266 public:
267   /// The ThreadPool argument is the thread pool to forward calls to.
ThreadPoolTaskGroup(ThreadPoolInterface & Pool)268   ThreadPoolTaskGroup(ThreadPoolInterface &Pool) : Pool(Pool) {}
269 
270   /// Blocking destructor: will wait for all the tasks in the group to complete
271   /// by calling ThreadPool::wait().
~ThreadPoolTaskGroup()272   ~ThreadPoolTaskGroup() { wait(); }
273 
274   /// Calls ThreadPool::async() for this group.
275   template <typename Function, typename... Args>
async(Function && F,Args &&...ArgList)276   inline auto async(Function &&F, Args &&...ArgList) {
277     return Pool.async(*this, std::forward<Function>(F),
278                       std::forward<Args>(ArgList)...);
279   }
280 
281   /// Calls ThreadPool::wait() for this group.
wait()282   void wait() { Pool.wait(*this); }
283 
284 private:
285   ThreadPoolInterface &Pool;
286 };
287 
288 } // namespace llvm
289 
290 #endif // LLVM_SUPPORT_THREADPOOL_H
291