1 //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===// 2 // 3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4 // See https://llvm.org/LICENSE.txt for license information. 5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6 // 7 //===----------------------------------------------------------------------===// 8 9 #include "llvm/Support/Parallel.h" 10 #include "llvm/Config/llvm-config.h" 11 12 #if LLVM_ENABLE_THREADS 13 14 #include "llvm/Support/Threading.h" 15 16 #include <atomic> 17 #include <stack> 18 #include <thread> 19 20 namespace llvm { 21 namespace parallel { 22 namespace detail { 23 24 namespace { 25 26 /// An abstract class that takes closures and runs them asynchronously. 27 class Executor { 28 public: 29 virtual ~Executor() = default; 30 virtual void add(std::function<void()> func) = 0; 31 32 static Executor *getDefaultExecutor(); 33 }; 34 35 #if defined(_MSC_VER) 36 /// An Executor that runs tasks via ConcRT. 37 class ConcRTExecutor : public Executor { 38 struct Taskish { 39 Taskish(std::function<void()> Task) : Task(Task) {} 40 41 std::function<void()> Task; 42 43 static void run(void *P) { 44 Taskish *Self = static_cast<Taskish *>(P); 45 Self->Task(); 46 concurrency::Free(Self); 47 } 48 }; 49 50 public: 51 virtual void add(std::function<void()> F) { 52 Concurrency::CurrentScheduler::ScheduleTask( 53 Taskish::run, new (concurrency::Alloc(sizeof(Taskish))) Taskish(F)); 54 } 55 }; 56 57 Executor *Executor::getDefaultExecutor() { 58 static ConcRTExecutor exec; 59 return &exec; 60 } 61 62 #else 63 /// An implementation of an Executor that runs closures on a thread pool 64 /// in filo order. 65 class ThreadPoolExecutor : public Executor { 66 public: 67 explicit ThreadPoolExecutor(unsigned ThreadCount = hardware_concurrency()) 68 : Done(ThreadCount) { 69 // Spawn all but one of the threads in another thread as spawning threads 70 // can take a while. 71 std::thread([&, ThreadCount] { 72 for (size_t i = 1; i < ThreadCount; ++i) { 73 std::thread([=] { work(); }).detach(); 74 } 75 work(); 76 }).detach(); 77 } 78 79 ~ThreadPoolExecutor() override { 80 std::unique_lock<std::mutex> Lock(Mutex); 81 Stop = true; 82 Lock.unlock(); 83 Cond.notify_all(); 84 // Wait for ~Latch. 85 } 86 87 void add(std::function<void()> F) override { 88 std::unique_lock<std::mutex> Lock(Mutex); 89 WorkStack.push(F); 90 Lock.unlock(); 91 Cond.notify_one(); 92 } 93 94 private: 95 void work() { 96 while (true) { 97 std::unique_lock<std::mutex> Lock(Mutex); 98 Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); }); 99 if (Stop) 100 break; 101 auto Task = WorkStack.top(); 102 WorkStack.pop(); 103 Lock.unlock(); 104 Task(); 105 } 106 Done.dec(); 107 } 108 109 std::atomic<bool> Stop{false}; 110 std::stack<std::function<void()>> WorkStack; 111 std::mutex Mutex; 112 std::condition_variable Cond; 113 parallel::detail::Latch Done; 114 }; 115 116 Executor *Executor::getDefaultExecutor() { 117 static ThreadPoolExecutor exec; 118 return &exec; 119 } 120 #endif 121 } 122 123 static std::atomic<int> TaskGroupInstances; 124 125 // Latch::sync() called by the dtor may cause one thread to block. If is a dead 126 // lock if all threads in the default executor are blocked. To prevent the dead 127 // lock, only allow the first TaskGroup to run tasks parallelly. In the scenario 128 // of nested parallel_for_each(), only the outermost one runs parallelly. 129 TaskGroup::TaskGroup() : Parallel(TaskGroupInstances++ == 0) {} 130 TaskGroup::~TaskGroup() { --TaskGroupInstances; } 131 132 void TaskGroup::spawn(std::function<void()> F) { 133 if (Parallel) { 134 L.inc(); 135 Executor::getDefaultExecutor()->add([&, F] { 136 F(); 137 L.dec(); 138 }); 139 } else { 140 F(); 141 } 142 } 143 144 } // namespace detail 145 } // namespace parallel 146 } // namespace llvm 147 #endif // LLVM_ENABLE_THREADS 148