1 //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===// 2 // 3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4 // See https://llvm.org/LICENSE.txt for license information. 5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6 // 7 //===----------------------------------------------------------------------===// 8 9 #include "llvm/Support/Parallel.h" 10 #include "llvm/Config/llvm-config.h" 11 #include "llvm/Support/ManagedStatic.h" 12 13 #if LLVM_ENABLE_THREADS 14 15 #include "llvm/Support/Threading.h" 16 17 #include <atomic> 18 #include <future> 19 #include <stack> 20 #include <thread> 21 #include <vector> 22 23 namespace llvm { 24 namespace parallel { 25 namespace detail { 26 27 namespace { 28 29 /// An abstract class that takes closures and runs them asynchronously. 30 class Executor { 31 public: 32 virtual ~Executor() = default; 33 virtual void add(std::function<void()> func) = 0; 34 35 static Executor *getDefaultExecutor(); 36 }; 37 38 /// An implementation of an Executor that runs closures on a thread pool 39 /// in filo order. 40 class ThreadPoolExecutor : public Executor { 41 public: 42 explicit ThreadPoolExecutor(unsigned ThreadCount = hardware_concurrency()) { 43 // Spawn all but one of the threads in another thread as spawning threads 44 // can take a while. 45 Threads.reserve(ThreadCount); 46 Threads.resize(1); 47 std::lock_guard<std::mutex> Lock(Mutex); 48 Threads[0] = std::thread([&, ThreadCount] { 49 for (unsigned i = 1; i < ThreadCount; ++i) { 50 Threads.emplace_back([=] { work(); }); 51 if (Stop) 52 break; 53 } 54 ThreadsCreated.set_value(); 55 work(); 56 }); 57 } 58 59 void stop() { 60 { 61 std::lock_guard<std::mutex> Lock(Mutex); 62 if (Stop) 63 return; 64 Stop = true; 65 } 66 Cond.notify_all(); 67 ThreadsCreated.get_future().wait(); 68 } 69 70 ~ThreadPoolExecutor() override { 71 stop(); 72 std::thread::id CurrentThreadId = std::this_thread::get_id(); 73 for (std::thread &T : Threads) 74 if (T.get_id() == CurrentThreadId) 75 T.detach(); 76 else 77 T.join(); 78 } 79 80 struct Deleter { 81 static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); } 82 }; 83 84 void add(std::function<void()> F) override { 85 { 86 std::lock_guard<std::mutex> Lock(Mutex); 87 WorkStack.push(F); 88 } 89 Cond.notify_one(); 90 } 91 92 private: 93 void work() { 94 while (true) { 95 std::unique_lock<std::mutex> Lock(Mutex); 96 Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); }); 97 if (Stop) 98 break; 99 auto Task = WorkStack.top(); 100 WorkStack.pop(); 101 Lock.unlock(); 102 Task(); 103 } 104 } 105 106 std::atomic<bool> Stop{false}; 107 std::stack<std::function<void()>> WorkStack; 108 std::mutex Mutex; 109 std::condition_variable Cond; 110 std::promise<void> ThreadsCreated; 111 std::vector<std::thread> Threads; 112 }; 113 114 Executor *Executor::getDefaultExecutor() { 115 // The ManagedStatic enables the ThreadPoolExecutor to be stopped via 116 // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This 117 // stops the thread pool and waits for any worker thread creation to complete 118 // but does not wait for the threads to finish. The wait for worker thread 119 // creation to complete is important as it prevents intermittent crashes on 120 // Windows due to a race condition between thread creation and process exit. 121 // 122 // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to 123 // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor 124 // destructor ensures it has been stopped and waits for worker threads to 125 // finish. The wait is important as it prevents intermittent crashes on 126 // Windows when the process is doing a full exit. 127 // 128 // The Windows crashes appear to only occur with the MSVC static runtimes and 129 // are more frequent with the debug static runtime. 130 // 131 // This also prevents intermittent deadlocks on exit with the MinGW runtime. 132 static ManagedStatic<ThreadPoolExecutor, object_creator<ThreadPoolExecutor>, 133 ThreadPoolExecutor::Deleter> 134 ManagedExec; 135 static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec)); 136 return Exec.get(); 137 } 138 } // namespace 139 140 static std::atomic<int> TaskGroupInstances; 141 142 // Latch::sync() called by the dtor may cause one thread to block. If is a dead 143 // lock if all threads in the default executor are blocked. To prevent the dead 144 // lock, only allow the first TaskGroup to run tasks parallelly. In the scenario 145 // of nested parallel_for_each(), only the outermost one runs parallelly. 146 TaskGroup::TaskGroup() : Parallel(TaskGroupInstances++ == 0) {} 147 TaskGroup::~TaskGroup() { --TaskGroupInstances; } 148 149 void TaskGroup::spawn(std::function<void()> F) { 150 if (Parallel) { 151 L.inc(); 152 Executor::getDefaultExecutor()->add([&, F] { 153 F(); 154 L.dec(); 155 }); 156 } else { 157 F(); 158 } 159 } 160 161 } // namespace detail 162 } // namespace parallel 163 } // namespace llvm 164 #endif // LLVM_ENABLE_THREADS 165