1 //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===// 2 // 3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4 // See https://llvm.org/LICENSE.txt for license information. 5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6 // 7 //===----------------------------------------------------------------------===// 8 9 #include "llvm/Support/Parallel.h" 10 #include "llvm/Config/llvm-config.h" 11 #include "llvm/Support/ManagedStatic.h" 12 #include "llvm/Support/Threading.h" 13 14 #include <atomic> 15 #include <future> 16 #include <stack> 17 #include <thread> 18 #include <vector> 19 20 llvm::ThreadPoolStrategy llvm::parallel::strategy; 21 22 namespace llvm { 23 namespace parallel { 24 #if LLVM_ENABLE_THREADS 25 26 #ifdef _WIN32 27 static thread_local unsigned threadIndex; 28 29 unsigned getThreadIndex() { return threadIndex; } 30 #else 31 thread_local unsigned threadIndex; 32 #endif 33 34 namespace detail { 35 36 namespace { 37 38 /// An abstract class that takes closures and runs them asynchronously. 39 class Executor { 40 public: 41 virtual ~Executor() = default; 42 virtual void add(std::function<void()> func) = 0; 43 44 static Executor *getDefaultExecutor(); 45 }; 46 47 /// An implementation of an Executor that runs closures on a thread pool 48 /// in filo order. 49 class ThreadPoolExecutor : public Executor { 50 public: 51 explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) { 52 unsigned ThreadCount = S.compute_thread_count(); 53 // Spawn all but one of the threads in another thread as spawning threads 54 // can take a while. 55 Threads.reserve(ThreadCount); 56 Threads.resize(1); 57 std::lock_guard<std::mutex> Lock(Mutex); 58 Threads[0] = std::thread([this, ThreadCount, S] { 59 for (unsigned I = 1; I < ThreadCount; ++I) { 60 Threads.emplace_back([=] { work(S, I); }); 61 if (Stop) 62 break; 63 } 64 ThreadsCreated.set_value(); 65 work(S, 0); 66 }); 67 } 68 69 void stop() { 70 { 71 std::lock_guard<std::mutex> Lock(Mutex); 72 if (Stop) 73 return; 74 Stop = true; 75 } 76 Cond.notify_all(); 77 ThreadsCreated.get_future().wait(); 78 } 79 80 ~ThreadPoolExecutor() override { 81 stop(); 82 std::thread::id CurrentThreadId = std::this_thread::get_id(); 83 for (std::thread &T : Threads) 84 if (T.get_id() == CurrentThreadId) 85 T.detach(); 86 else 87 T.join(); 88 } 89 90 struct Creator { 91 static void *call() { return new ThreadPoolExecutor(strategy); } 92 }; 93 struct Deleter { 94 static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); } 95 }; 96 97 void add(std::function<void()> F) override { 98 { 99 std::lock_guard<std::mutex> Lock(Mutex); 100 WorkStack.push(std::move(F)); 101 } 102 Cond.notify_one(); 103 } 104 105 private: 106 void work(ThreadPoolStrategy S, unsigned ThreadID) { 107 threadIndex = ThreadID; 108 S.apply_thread_strategy(ThreadID); 109 while (true) { 110 std::unique_lock<std::mutex> Lock(Mutex); 111 Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); }); 112 if (Stop) 113 break; 114 auto Task = std::move(WorkStack.top()); 115 WorkStack.pop(); 116 Lock.unlock(); 117 Task(); 118 } 119 } 120 121 std::atomic<bool> Stop{false}; 122 std::stack<std::function<void()>> WorkStack; 123 std::mutex Mutex; 124 std::condition_variable Cond; 125 std::promise<void> ThreadsCreated; 126 std::vector<std::thread> Threads; 127 }; 128 129 Executor *Executor::getDefaultExecutor() { 130 // The ManagedStatic enables the ThreadPoolExecutor to be stopped via 131 // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This 132 // stops the thread pool and waits for any worker thread creation to complete 133 // but does not wait for the threads to finish. The wait for worker thread 134 // creation to complete is important as it prevents intermittent crashes on 135 // Windows due to a race condition between thread creation and process exit. 136 // 137 // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to 138 // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor 139 // destructor ensures it has been stopped and waits for worker threads to 140 // finish. The wait is important as it prevents intermittent crashes on 141 // Windows when the process is doing a full exit. 142 // 143 // The Windows crashes appear to only occur with the MSVC static runtimes and 144 // are more frequent with the debug static runtime. 145 // 146 // This also prevents intermittent deadlocks on exit with the MinGW runtime. 147 148 static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator, 149 ThreadPoolExecutor::Deleter> 150 ManagedExec; 151 static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec)); 152 return Exec.get(); 153 } 154 } // namespace 155 } // namespace detail 156 #endif 157 158 static std::atomic<int> TaskGroupInstances; 159 160 // Latch::sync() called by the dtor may cause one thread to block. If is a dead 161 // lock if all threads in the default executor are blocked. To prevent the dead 162 // lock, only allow the first TaskGroup to run tasks parallelly. In the scenario 163 // of nested parallel_for_each(), only the outermost one runs parallelly. 164 TaskGroup::TaskGroup() : Parallel(TaskGroupInstances++ == 0) {} 165 TaskGroup::~TaskGroup() { 166 // We must ensure that all the workloads have finished before decrementing the 167 // instances count. 168 L.sync(); 169 --TaskGroupInstances; 170 } 171 172 void TaskGroup::spawn(std::function<void()> F) { 173 #if LLVM_ENABLE_THREADS 174 if (Parallel) { 175 L.inc(); 176 detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] { 177 F(); 178 L.dec(); 179 }); 180 return; 181 } 182 #endif 183 F(); 184 } 185 186 void TaskGroup::execute(std::function<void()> F) { 187 if (parallel::strategy.ThreadsRequested == 1) 188 F(); 189 else 190 spawn(F); 191 } 192 } // namespace parallel 193 } // namespace llvm 194 195 void llvm::parallelFor(size_t Begin, size_t End, 196 llvm::function_ref<void(size_t)> Fn) { 197 // If we have zero or one items, then do not incur the overhead of spinning up 198 // a task group. They are surprisingly expensive, and because they do not 199 // support nested parallelism, a single entry task group can block parallel 200 // execution underneath them. 201 #if LLVM_ENABLE_THREADS 202 auto NumItems = End - Begin; 203 if (NumItems > 1 && parallel::strategy.ThreadsRequested != 1) { 204 // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling 205 // overhead on large inputs. 206 auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup; 207 if (TaskSize == 0) 208 TaskSize = 1; 209 210 parallel::TaskGroup TG; 211 for (; Begin + TaskSize < End; Begin += TaskSize) { 212 TG.spawn([=, &Fn] { 213 for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I) 214 Fn(I); 215 }); 216 } 217 if (Begin != End) { 218 TG.spawn([=, &Fn] { 219 for (size_t I = Begin; I != End; ++I) 220 Fn(I); 221 }); 222 } 223 return; 224 } 225 #endif 226 227 for (; Begin != End; ++Begin) 228 Fn(Begin); 229 } 230