//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/Parallel.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ManagedStatic.h"
#include "llvm/Support/Threading.h"

#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <stack>
#include <thread>
#include <vector>

llvm::ThreadPoolStrategy llvm::parallel::strategy;

#if LLVM_ENABLE_THREADS

namespace llvm {
namespace parallel {
namespace detail {

namespace {

/// An abstract class that takes closures and runs them asynchronously.
class Executor {
public:
  virtual ~Executor() = default;
  virtual void add(std::function<void()> func) = 0;

  static Executor *getDefaultExecutor();
};

/// An implementation of an Executor that runs closures on a thread pool
/// in FILO (first in, last out) order.
class ThreadPoolExecutor : public Executor {
public:
  explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
    unsigned ThreadCount = S.compute_thread_count();
    // Spawn all but one of the threads in another thread as spawning threads
    // can take a while.
    Threads.reserve(ThreadCount);
    Threads.resize(1);
    std::lock_guard<std::mutex> Lock(Mutex);
    Threads[0] = std::thread([this, ThreadCount, S] {
      for (unsigned I = 1; I < ThreadCount; ++I) {
        Threads.emplace_back([=] { work(S, I); });
        if (Stop)
          break;
      }
      ThreadsCreated.set_value();
      work(S, 0);
    });
  }

  void stop() {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Stop)
        return;
      Stop = true;
    }
    Cond.notify_all();
    ThreadsCreated.get_future().wait();
  }

  ~ThreadPoolExecutor() override {
    stop();
    // A thread cannot join itself, so detach any pool thread that happens to
    // be running this destructor and join the rest.
    std::thread::id CurrentThreadId = std::this_thread::get_id();
    for (std::thread &T : Threads)
      if (T.get_id() == CurrentThreadId)
        T.detach();
      else
        T.join();
  }

  struct Creator {
    static void *call() { return new ThreadPoolExecutor(strategy); }
  };
  struct Deleter {
    static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
  };

  void add(std::function<void()> F) override {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      WorkStack.push(F);
    }
    Cond.notify_one();
  }

private:
  void work(ThreadPoolStrategy S, unsigned ThreadID) {
    S.apply_thread_strategy(ThreadID);
    while (true) {
      std::unique_lock<std::mutex> Lock(Mutex);
      Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
      if (Stop)
        break;
      auto Task = WorkStack.top();
      WorkStack.pop();
      Lock.unlock();
      Task();
    }
  }

  std::atomic<bool> Stop{false};
  std::stack<std::function<void()>> WorkStack;
  std::mutex Mutex;
  std::condition_variable Cond;
  std::promise<void> ThreadsCreated;
  std::vector<std::thread> Threads;
};
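
// A minimal usage sketch of the executor above (illustrative only; the class
// lives in an anonymous namespace and is only reached through
// getDefaultExecutor(), so nothing below is part of a public API):
//
//   ThreadPoolExecutor Exec(hardware_concurrency(2)); // two worker threads
//   Exec.add([] { /* pushed onto WorkStack, run by an idle worker */ });
//   Exec.add([] { /* pushed last, so typically picked up first (FILO) */ });
//   // Exec's destructor calls stop() and joins the worker threads.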

Executor *Executor::getDefaultExecutor() {
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown(), which allows a "clean" fast exit, e.g. via _exit(). This
  // stops the thread pool and waits for any worker thread creation to complete
  // but does not wait for the threads to finish. The wait for worker thread
  // creation to complete is important as it prevents intermittent crashes on
  // Windows due to a race condition between thread creation and process exit.
  //
  // The ThreadPoolExecutor will only be destroyed when the static unique_ptr
  // to it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
  // destructor ensures it has been stopped and waits for worker threads to
  // finish. The wait is important as it prevents intermittent crashes on
  // Windows when the process is doing a full exit.
  //
  // The Windows crashes appear to only occur with the MSVC static runtimes and
  // are more frequent with the debug static runtime.
  //
  // This also prevents intermittent deadlocks on exit with the MinGW runtime.

  static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
                       ThreadPoolExecutor::Deleter>
      ManagedExec;
  static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  return Exec.get();
}
} // namespace

static std::atomic<int> TaskGroupInstances;

// Latch::sync() called by the dtor may cause one thread to block. It is a
// deadlock if all threads in the default executor are blocked. To prevent the
// deadlock, only allow the first TaskGroup to run tasks in parallel. In the
// scenario of nested parallel_for_each(), only the outermost one runs in
// parallel.
TaskGroup::TaskGroup() : Parallel(TaskGroupInstances++ == 0) {}
TaskGroup::~TaskGroup() {
  // We must ensure that all of the workloads have finished before decrementing
  // the instance count.
  L.sync();
  --TaskGroupInstances;
}

void TaskGroup::spawn(std::function<void()> F) {
  if (Parallel) {
    L.inc();
    Executor::getDefaultExecutor()->add([&, F] {
      F();
      L.dec();
    });
  } else {
    F();
  }
}

} // namespace detail
} // namespace parallel
} // namespace llvm
#endif // LLVM_ENABLE_THREADS

void llvm::parallelForEachN(size_t Begin, size_t End,
                            llvm::function_ref<void(size_t)> Fn) {
  // If we have zero or one items, then do not incur the overhead of spinning
  // up a task group. They are surprisingly expensive, and because they do not
  // support nested parallelism, a single entry task group can block parallel
  // execution underneath them.
#if LLVM_ENABLE_THREADS
  auto NumItems = End - Begin;
  if (NumItems > 1 && parallel::strategy.ThreadsRequested != 1) {
    // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
    // overhead on large inputs.
    auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
    if (TaskSize == 0)
      TaskSize = 1;

    parallel::detail::TaskGroup TG;
    for (; Begin + TaskSize < End; Begin += TaskSize) {
      TG.spawn([=, &Fn] {
        for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
          Fn(I);
      });
    }
    // Run the final (possibly partial) chunk on the calling thread.
    for (; Begin != End; ++Begin)
      Fn(Begin);
    return;
  }
#endif

  for (; Begin != End; ++Begin)
    Fn(Begin);
}
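
// A minimal usage sketch of parallelForEachN (illustrative only, not part of
// this file): every index in [0, N) is passed to the callback, in parallel
// when LLVM_ENABLE_THREADS is on and more than one thread is requested,
// serially otherwise. Writing to distinct elements is safe because each index
// is visited exactly once.
//
//   std::vector<int> Squares(1024);
//   llvm::parallelForEachN(0, Squares.size(),
//                          [&](size_t I) { Squares[I] = int(I * I); });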