xref: /freebsd/contrib/llvm-project/llvm/lib/Support/Parallel.cpp (revision 2ed24e28d1d95c62cc37ca3534d4d33360b4cef2)
1  //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
2  //
3  // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4  // See https://llvm.org/LICENSE.txt for license information.
5  // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6  //
7  //===----------------------------------------------------------------------===//
8  
9  #include "llvm/Support/Parallel.h"
10  #include "llvm/Config/llvm-config.h"
11  #include "llvm/Support/ManagedStatic.h"
12  #include "llvm/Support/Threading.h"
13  
14  #include <atomic>
15  #include <future>
16  #include <thread>
17  #include <vector>
18  
19  llvm::ThreadPoolStrategy llvm::parallel::strategy;
20  
21  namespace llvm {
22  namespace parallel {
23  #if LLVM_ENABLE_THREADS
24  
#ifdef _WIN32
// Index of the current thread within the default executor's pool, or UINT_MAX
// on any thread the pool did not create. It is written by
// ThreadPoolExecutor::work(). It is read by the TaskGroup constructor to
// detect that a group is being created from inside a pool worker.
// NOTE(review): on Windows the variable has internal linkage and is exposed
// through getThreadIndex() instead; presumably GET_THREAD_INDEX_IMPL (from
// llvm/Support/Parallel.h) simply returns threadIndex. Confirm.
static thread_local unsigned threadIndex = UINT_MAX;

unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#else
// Same per-thread pool index as above, with external linkage on non-Windows
// targets. NOTE(review): presumably declared extern in
// llvm/Support/Parallel.h. Confirm.
thread_local unsigned threadIndex = UINT_MAX;
#endif
32  
33  namespace detail {
34  
35  namespace {
36  
/// Abstract interface for an object that accepts closures and runs them
/// asynchronously.
class Executor {
public:
  /// Returns the process-wide executor used by TaskGroup and getThreadCount().
  static Executor *getDefaultExecutor();

  virtual ~Executor() = default;

  /// Schedules \p Task to be run asynchronously.
  virtual void add(std::function<void()> Task) = 0;

  /// Returns the number of threads this executor runs closures on.
  virtual size_t getThreadCount() const = 0;
};
46  
47  /// An implementation of an Executor that runs closures on a thread pool
48  ///   in filo order.
49  class ThreadPoolExecutor : public Executor {
50  public:
51    explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
52      ThreadCount = S.compute_thread_count();
53      // Spawn all but one of the threads in another thread as spawning threads
54      // can take a while.
55      Threads.reserve(ThreadCount);
56      Threads.resize(1);
57      std::lock_guard<std::mutex> Lock(Mutex);
58      // Use operator[] before creating the thread to avoid data race in .size()
59      // in 'safe libc++' mode.
60      auto &Thread0 = Threads[0];
61      Thread0 = std::thread([this, S] {
62        for (unsigned I = 1; I < ThreadCount; ++I) {
63          Threads.emplace_back([=] { work(S, I); });
64          if (Stop)
65            break;
66        }
67        ThreadsCreated.set_value();
68        work(S, 0);
69      });
70    }
71  
72    void stop() {
73      {
74        std::lock_guard<std::mutex> Lock(Mutex);
75        if (Stop)
76          return;
77        Stop = true;
78      }
79      Cond.notify_all();
80      ThreadsCreated.get_future().wait();
81    }
82  
83    ~ThreadPoolExecutor() override {
84      stop();
85      std::thread::id CurrentThreadId = std::this_thread::get_id();
86      for (std::thread &T : Threads)
87        if (T.get_id() == CurrentThreadId)
88          T.detach();
89        else
90          T.join();
91    }
92  
93    struct Creator {
94      static void *call() { return new ThreadPoolExecutor(strategy); }
95    };
96    struct Deleter {
97      static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
98    };
99  
100    void add(std::function<void()> F) override {
101      {
102        std::lock_guard<std::mutex> Lock(Mutex);
103        WorkStack.push_back(std::move(F));
104      }
105      Cond.notify_one();
106    }
107  
108    size_t getThreadCount() const override { return ThreadCount; }
109  
110  private:
111    void work(ThreadPoolStrategy S, unsigned ThreadID) {
112      threadIndex = ThreadID;
113      S.apply_thread_strategy(ThreadID);
114      while (true) {
115        std::unique_lock<std::mutex> Lock(Mutex);
116        Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
117        if (Stop)
118          break;
119        auto Task = std::move(WorkStack.back());
120        WorkStack.pop_back();
121        Lock.unlock();
122        Task();
123      }
124    }
125  
126    std::atomic<bool> Stop{false};
127    std::vector<std::function<void()>> WorkStack;
128    std::mutex Mutex;
129    std::condition_variable Cond;
130    std::promise<void> ThreadsCreated;
131    std::vector<std::thread> Threads;
132    unsigned ThreadCount;
133  };
134  
135  Executor *Executor::getDefaultExecutor() {
136    // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
137    // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
138    // stops the thread pool and waits for any worker thread creation to complete
139    // but does not wait for the threads to finish. The wait for worker thread
140    // creation to complete is important as it prevents intermittent crashes on
141    // Windows due to a race condition between thread creation and process exit.
142    //
143    // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
144    // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
145    // destructor ensures it has been stopped and waits for worker threads to
146    // finish. The wait is important as it prevents intermittent crashes on
147    // Windows when the process is doing a full exit.
148    //
149    // The Windows crashes appear to only occur with the MSVC static runtimes and
150    // are more frequent with the debug static runtime.
151    //
152    // This also prevents intermittent deadlocks on exit with the MinGW runtime.
153  
154    static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
155                         ThreadPoolExecutor::Deleter>
156        ManagedExec;
157    static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
158    return Exec.get();
159  }
160  } // namespace
161  } // namespace detail
162  
163  size_t getThreadCount() {
164    return detail::Executor::getDefaultExecutor()->getThreadCount();
165  }
166  #endif
167  
// Latch::sync(), called by the TaskGroup destructor, may block the calling
// thread. If every thread in the default executor is blocked this way, the
// program deadlocks. To prevent the deadlock, only TaskGroups created outside
// the executor's worker threads run their tasks in parallel. In the scenario of
// nested parallel_for_each(), only the outermost one runs in parallel.
172  TaskGroup::TaskGroup()
173  #if LLVM_ENABLE_THREADS
174      : Parallel((parallel::strategy.ThreadsRequested != 1) &&
175                 (threadIndex == UINT_MAX)) {}
176  #else
177      : Parallel(false) {}
178  #endif
179  TaskGroup::~TaskGroup() {
180    // We must ensure that all the workloads have finished before decrementing the
181    // instances count.
182    L.sync();
183  }
184  
185  void TaskGroup::spawn(std::function<void()> F) {
186  #if LLVM_ENABLE_THREADS
187    if (Parallel) {
188      L.inc();
189      detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] {
190        F();
191        L.dec();
192      });
193      return;
194    }
195  #endif
196    F();
197  }
198  
199  } // namespace parallel
200  } // namespace llvm
201  
202  void llvm::parallelFor(size_t Begin, size_t End,
203                         llvm::function_ref<void(size_t)> Fn) {
204  #if LLVM_ENABLE_THREADS
205    if (parallel::strategy.ThreadsRequested != 1) {
206      auto NumItems = End - Begin;
207      // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
208      // overhead on large inputs.
209      auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
210      if (TaskSize == 0)
211        TaskSize = 1;
212  
213      parallel::TaskGroup TG;
214      for (; Begin + TaskSize < End; Begin += TaskSize) {
215        TG.spawn([=, &Fn] {
216          for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
217            Fn(I);
218        });
219      }
220      if (Begin != End) {
221        TG.spawn([=, &Fn] {
222          for (size_t I = Begin; I != End; ++I)
223            Fn(I);
224        });
225      }
226      return;
227    }
228  #endif
229  
230    for (; Begin != End; ++Begin)
231      Fn(Begin);
232  }
233