1 //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===// 2 // 3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4 // See https://llvm.org/LICENSE.txt for license information. 5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6 // 7 //===----------------------------------------------------------------------===// 8 9 #include "llvm/Support/Parallel.h" 10 #include "llvm/Config/llvm-config.h" 11 #include "llvm/Support/ManagedStatic.h" 12 #include "llvm/Support/Threading.h" 13 14 #include <atomic> 15 #include <deque> 16 #include <future> 17 #include <thread> 18 #include <vector> 19 20 llvm::ThreadPoolStrategy llvm::parallel::strategy; 21 22 namespace llvm { 23 namespace parallel { 24 #if LLVM_ENABLE_THREADS 25 26 #ifdef _WIN32 27 static thread_local unsigned threadIndex = UINT_MAX; 28 29 unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; } 30 #else 31 thread_local unsigned threadIndex = UINT_MAX; 32 #endif 33 34 namespace detail { 35 36 namespace { 37 38 /// An abstract class that takes closures and runs them asynchronously. 39 class Executor { 40 public: 41 virtual ~Executor() = default; 42 virtual void add(std::function<void()> func, bool Sequential = false) = 0; 43 virtual size_t getThreadCount() const = 0; 44 45 static Executor *getDefaultExecutor(); 46 }; 47 48 /// An implementation of an Executor that runs closures on a thread pool 49 /// in filo order. 50 class ThreadPoolExecutor : public Executor { 51 public: 52 explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) { 53 ThreadCount = S.compute_thread_count(); 54 // Spawn all but one of the threads in another thread as spawning threads 55 // can take a while. 56 Threads.reserve(ThreadCount); 57 Threads.resize(1); 58 std::lock_guard<std::mutex> Lock(Mutex); 59 // Use operator[] before creating the thread to avoid data race in .size() 60 // in “safe libc++” mode. 61 auto &Thread0 = Threads[0]; 62 Thread0 = std::thread([this, S] { 63 for (unsigned I = 1; I < ThreadCount; ++I) { 64 Threads.emplace_back([=] { work(S, I); }); 65 if (Stop) 66 break; 67 } 68 ThreadsCreated.set_value(); 69 work(S, 0); 70 }); 71 } 72 73 void stop() { 74 { 75 std::lock_guard<std::mutex> Lock(Mutex); 76 if (Stop) 77 return; 78 Stop = true; 79 } 80 Cond.notify_all(); 81 ThreadsCreated.get_future().wait(); 82 } 83 84 ~ThreadPoolExecutor() override { 85 stop(); 86 std::thread::id CurrentThreadId = std::this_thread::get_id(); 87 for (std::thread &T : Threads) 88 if (T.get_id() == CurrentThreadId) 89 T.detach(); 90 else 91 T.join(); 92 } 93 94 struct Creator { 95 static void *call() { return new ThreadPoolExecutor(strategy); } 96 }; 97 struct Deleter { 98 static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); } 99 }; 100 101 void add(std::function<void()> F, bool Sequential = false) override { 102 { 103 std::lock_guard<std::mutex> Lock(Mutex); 104 if (Sequential) 105 WorkQueueSequential.emplace_front(std::move(F)); 106 else 107 WorkQueue.emplace_back(std::move(F)); 108 } 109 Cond.notify_one(); 110 } 111 112 size_t getThreadCount() const override { return ThreadCount; } 113 114 private: 115 bool hasSequentialTasks() const { 116 return !WorkQueueSequential.empty() && !SequentialQueueIsLocked; 117 } 118 119 bool hasGeneralTasks() const { return !WorkQueue.empty(); } 120 121 void work(ThreadPoolStrategy S, unsigned ThreadID) { 122 threadIndex = ThreadID; 123 S.apply_thread_strategy(ThreadID); 124 while (true) { 125 std::unique_lock<std::mutex> Lock(Mutex); 126 Cond.wait(Lock, [&] { 127 return Stop || hasGeneralTasks() || hasSequentialTasks(); 128 }); 129 if (Stop) 130 break; 131 bool Sequential = hasSequentialTasks(); 132 if (Sequential) 133 SequentialQueueIsLocked = true; 134 else 135 assert(hasGeneralTasks()); 136 137 auto &Queue = Sequential ? WorkQueueSequential : WorkQueue; 138 auto Task = std::move(Queue.back()); 139 Queue.pop_back(); 140 Lock.unlock(); 141 Task(); 142 if (Sequential) 143 SequentialQueueIsLocked = false; 144 } 145 } 146 147 std::atomic<bool> Stop{false}; 148 std::atomic<bool> SequentialQueueIsLocked{false}; 149 std::deque<std::function<void()>> WorkQueue; 150 std::deque<std::function<void()>> WorkQueueSequential; 151 std::mutex Mutex; 152 std::condition_variable Cond; 153 std::promise<void> ThreadsCreated; 154 std::vector<std::thread> Threads; 155 unsigned ThreadCount; 156 }; 157 158 Executor *Executor::getDefaultExecutor() { 159 // The ManagedStatic enables the ThreadPoolExecutor to be stopped via 160 // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This 161 // stops the thread pool and waits for any worker thread creation to complete 162 // but does not wait for the threads to finish. The wait for worker thread 163 // creation to complete is important as it prevents intermittent crashes on 164 // Windows due to a race condition between thread creation and process exit. 165 // 166 // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to 167 // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor 168 // destructor ensures it has been stopped and waits for worker threads to 169 // finish. The wait is important as it prevents intermittent crashes on 170 // Windows when the process is doing a full exit. 171 // 172 // The Windows crashes appear to only occur with the MSVC static runtimes and 173 // are more frequent with the debug static runtime. 174 // 175 // This also prevents intermittent deadlocks on exit with the MinGW runtime. 176 177 static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator, 178 ThreadPoolExecutor::Deleter> 179 ManagedExec; 180 static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec)); 181 return Exec.get(); 182 } 183 } // namespace 184 } // namespace detail 185 186 size_t getThreadCount() { 187 return detail::Executor::getDefaultExecutor()->getThreadCount(); 188 } 189 #endif 190 191 // Latch::sync() called by the dtor may cause one thread to block. If is a dead 192 // lock if all threads in the default executor are blocked. To prevent the dead 193 // lock, only allow the root TaskGroup to run tasks parallelly. In the scenario 194 // of nested parallel_for_each(), only the outermost one runs parallelly. 195 TaskGroup::TaskGroup() 196 #if LLVM_ENABLE_THREADS 197 : Parallel((parallel::strategy.ThreadsRequested != 1) && 198 (threadIndex == UINT_MAX)) {} 199 #else 200 : Parallel(false) {} 201 #endif 202 TaskGroup::~TaskGroup() { 203 // We must ensure that all the workloads have finished before decrementing the 204 // instances count. 205 L.sync(); 206 } 207 208 void TaskGroup::spawn(std::function<void()> F, bool Sequential) { 209 #if LLVM_ENABLE_THREADS 210 if (Parallel) { 211 L.inc(); 212 detail::Executor::getDefaultExecutor()->add( 213 [&, F = std::move(F)] { 214 F(); 215 L.dec(); 216 }, 217 Sequential); 218 return; 219 } 220 #endif 221 F(); 222 } 223 224 } // namespace parallel 225 } // namespace llvm 226 227 void llvm::parallelFor(size_t Begin, size_t End, 228 llvm::function_ref<void(size_t)> Fn) { 229 #if LLVM_ENABLE_THREADS 230 if (parallel::strategy.ThreadsRequested != 1) { 231 auto NumItems = End - Begin; 232 // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling 233 // overhead on large inputs. 234 auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup; 235 if (TaskSize == 0) 236 TaskSize = 1; 237 238 parallel::TaskGroup TG; 239 for (; Begin + TaskSize < End; Begin += TaskSize) { 240 TG.spawn([=, &Fn] { 241 for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I) 242 Fn(I); 243 }); 244 } 245 if (Begin != End) { 246 TG.spawn([=, &Fn] { 247 for (size_t I = Begin; I != End; ++I) 248 Fn(I); 249 }); 250 } 251 return; 252 } 253 #endif 254 255 for (; Begin != End; ++Begin) 256 Fn(Begin); 257 } 258