xref: /freebsd/contrib/llvm-project/llvm/lib/Support/Parallel.cpp (revision 47e073941f4e7ca6e9bde3fa65abbfcfed6bfa2b)
1  //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
2  //
3  // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4  // See https://llvm.org/LICENSE.txt for license information.
5  // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6  //
7  //===----------------------------------------------------------------------===//
8  
9  #include "llvm/Support/Parallel.h"
10  #include "llvm/Config/llvm-config.h"
11  #include "llvm/Support/ManagedStatic.h"
12  #include "llvm/Support/Threading.h"
13  
14  #include <atomic>
15  #include <deque>
16  #include <future>
17  #include <thread>
18  #include <vector>
19  
// Definition of the thread-pool strategy object. In this file its
// ThreadsRequested field is read by TaskGroup and parallelFor, and the default
// ThreadPoolExecutor is constructed from it.
llvm::ThreadPoolStrategy llvm::parallel::strategy;
21  
22  namespace llvm {
23  namespace parallel {
24  #if LLVM_ENABLE_THREADS
25  
// Per-thread index. It stays at UINT_MAX until ThreadPoolExecutor::work()
// stores the worker's ID into it.
#ifndef _WIN32
thread_local unsigned threadIndex = UINT_MAX;
#else
// On Windows the variable is file-local, and getThreadIndex() is defined here
// through the GET_THREAD_INDEX_IMPL macro.
static thread_local unsigned threadIndex = UINT_MAX;

unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#endif
33  
34  namespace detail {
35  
36  namespace {
37  
/// Interface for objects that accept closures and execute them
/// asynchronously.
class Executor {
public:
  /// Returns the shared executor instance used by this file.
  static Executor *getDefaultExecutor();

  virtual ~Executor() = default;

  /// Hands \p func over for execution. \p Sequential is passed through to the
  /// implementation, which decides how such tasks are scheduled.
  virtual void add(std::function<void()> func, bool Sequential = false) = 0;

  /// Returns the thread count reported by the implementation.
  virtual size_t getThreadCount() const = 0;
};
47  
48  /// An implementation of an Executor that runs closures on a thread pool
49  ///   in filo order.
50  class ThreadPoolExecutor : public Executor {
51  public:
52    explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
53      ThreadCount = S.compute_thread_count();
54      // Spawn all but one of the threads in another thread as spawning threads
55      // can take a while.
56      Threads.reserve(ThreadCount);
57      Threads.resize(1);
58      std::lock_guard<std::mutex> Lock(Mutex);
59      // Use operator[] before creating the thread to avoid data race in .size()
60      // in “safe libc++” mode.
61      auto &Thread0 = Threads[0];
62      Thread0 = std::thread([this, S] {
63        for (unsigned I = 1; I < ThreadCount; ++I) {
64          Threads.emplace_back([=] { work(S, I); });
65          if (Stop)
66            break;
67        }
68        ThreadsCreated.set_value();
69        work(S, 0);
70      });
71    }
72  
73    void stop() {
74      {
75        std::lock_guard<std::mutex> Lock(Mutex);
76        if (Stop)
77          return;
78        Stop = true;
79      }
80      Cond.notify_all();
81      ThreadsCreated.get_future().wait();
82    }
83  
84    ~ThreadPoolExecutor() override {
85      stop();
86      std::thread::id CurrentThreadId = std::this_thread::get_id();
87      for (std::thread &T : Threads)
88        if (T.get_id() == CurrentThreadId)
89          T.detach();
90        else
91          T.join();
92    }
93  
94    struct Creator {
95      static void *call() { return new ThreadPoolExecutor(strategy); }
96    };
97    struct Deleter {
98      static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
99    };
100  
101    void add(std::function<void()> F, bool Sequential = false) override {
102      {
103        std::lock_guard<std::mutex> Lock(Mutex);
104        if (Sequential)
105          WorkQueueSequential.emplace_front(std::move(F));
106        else
107          WorkQueue.emplace_back(std::move(F));
108      }
109      Cond.notify_one();
110    }
111  
112    size_t getThreadCount() const override { return ThreadCount; }
113  
114  private:
115    bool hasSequentialTasks() const {
116      return !WorkQueueSequential.empty() && !SequentialQueueIsLocked;
117    }
118  
119    bool hasGeneralTasks() const { return !WorkQueue.empty(); }
120  
121    void work(ThreadPoolStrategy S, unsigned ThreadID) {
122      threadIndex = ThreadID;
123      S.apply_thread_strategy(ThreadID);
124      while (true) {
125        std::unique_lock<std::mutex> Lock(Mutex);
126        Cond.wait(Lock, [&] {
127          return Stop || hasGeneralTasks() || hasSequentialTasks();
128        });
129        if (Stop)
130          break;
131        bool Sequential = hasSequentialTasks();
132        if (Sequential)
133          SequentialQueueIsLocked = true;
134        else
135          assert(hasGeneralTasks());
136  
137        auto &Queue = Sequential ? WorkQueueSequential : WorkQueue;
138        auto Task = std::move(Queue.back());
139        Queue.pop_back();
140        Lock.unlock();
141        Task();
142        if (Sequential)
143          SequentialQueueIsLocked = false;
144      }
145    }
146  
147    std::atomic<bool> Stop{false};
148    std::atomic<bool> SequentialQueueIsLocked{false};
149    std::deque<std::function<void()>> WorkQueue;
150    std::deque<std::function<void()>> WorkQueueSequential;
151    std::mutex Mutex;
152    std::condition_variable Cond;
153    std::promise<void> ThreadsCreated;
154    std::vector<std::thread> Threads;
155    unsigned ThreadCount;
156  };
157  
158  Executor *Executor::getDefaultExecutor() {
159    // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
160    // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
161    // stops the thread pool and waits for any worker thread creation to complete
162    // but does not wait for the threads to finish. The wait for worker thread
163    // creation to complete is important as it prevents intermittent crashes on
164    // Windows due to a race condition between thread creation and process exit.
165    //
166    // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
167    // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
168    // destructor ensures it has been stopped and waits for worker threads to
169    // finish. The wait is important as it prevents intermittent crashes on
170    // Windows when the process is doing a full exit.
171    //
172    // The Windows crashes appear to only occur with the MSVC static runtimes and
173    // are more frequent with the debug static runtime.
174    //
175    // This also prevents intermittent deadlocks on exit with the MinGW runtime.
176  
177    static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
178                         ThreadPoolExecutor::Deleter>
179        ManagedExec;
180    static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
181    return Exec.get();
182  }
183  } // namespace
184  } // namespace detail
185  
186  size_t getThreadCount() {
187    return detail::Executor::getDefaultExecutor()->getThreadCount();
188  }
189  #endif
190  
191  // Latch::sync() called by the dtor may cause one thread to block. If is a dead
192  // lock if all threads in the default executor are blocked. To prevent the dead
193  // lock, only allow the root TaskGroup to run tasks parallelly. In the scenario
194  // of nested parallel_for_each(), only the outermost one runs parallelly.
195  TaskGroup::TaskGroup()
196  #if LLVM_ENABLE_THREADS
197      : Parallel((parallel::strategy.ThreadsRequested != 1) &&
198                 (threadIndex == UINT_MAX)) {}
199  #else
200      : Parallel(false) {}
201  #endif
TaskGroup::~TaskGroup() {
  // Closures handed to the executor by spawn() refer back to this object (they
  // call L.dec() when done), so the group must not go away while any of them
  // is still pending. NOTE(review): this relies on Latch::sync() blocking
  // until every L.inc() has been matched by an L.dec() -- confirm against the
  // Latch definition in Parallel.h.
  L.sync();
}
207  
208  void TaskGroup::spawn(std::function<void()> F, bool Sequential) {
209  #if LLVM_ENABLE_THREADS
210    if (Parallel) {
211      L.inc();
212      detail::Executor::getDefaultExecutor()->add(
213          [&, F = std::move(F)] {
214            F();
215            L.dec();
216          },
217          Sequential);
218      return;
219    }
220  #endif
221    F();
222  }
223  
224  } // namespace parallel
225  } // namespace llvm
226  
227  void llvm::parallelFor(size_t Begin, size_t End,
228                         llvm::function_ref<void(size_t)> Fn) {
229  #if LLVM_ENABLE_THREADS
230    if (parallel::strategy.ThreadsRequested != 1) {
231      auto NumItems = End - Begin;
232      // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
233      // overhead on large inputs.
234      auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
235      if (TaskSize == 0)
236        TaskSize = 1;
237  
238      parallel::TaskGroup TG;
239      for (; Begin + TaskSize < End; Begin += TaskSize) {
240        TG.spawn([=, &Fn] {
241          for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
242            Fn(I);
243        });
244      }
245      if (Begin != End) {
246        TG.spawn([=, &Fn] {
247          for (size_t I = Begin; I != End; ++I)
248            Fn(I);
249        });
250      }
251      return;
252    }
253  #endif
254  
255    for (; Begin != End; ++Begin)
256      Fn(Begin);
257  }
258