xref: /freebsd/contrib/llvm-project/llvm/lib/ExecutionEngine/Orc/TargetProcess/SimpleRemoteEPCServer.cpp (revision 59c8e88e72633afbc47a4ace0d2170d00d51f7dc)
1 //===---- SimpleRemoteEPCServer.cpp - EPC over simple abstract channel ----===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
9 #include "llvm/ExecutionEngine/Orc/TargetProcess/SimpleRemoteEPCServer.h"
10 
11 #include "llvm/ExecutionEngine/Orc/Shared/TargetProcessControlTypes.h"
12 #include "llvm/Support/FormatVariadic.h"
13 #include "llvm/Support/Process.h"
14 #include "llvm/TargetParser/Host.h"
15 
16 #include "OrcRTBootstrap.h"
17 
18 #define DEBUG_TYPE "orc"
19 
20 using namespace llvm::orc::shared;
21 
22 namespace llvm {
23 namespace orc {
24 
// Out-of-line definition of the (defaulted) ExecutorBootstrapService
// destructor.
// NOTE(review): presumably defined here so the class has a single home
// translation unit -- confirm against the header.
25 ExecutorBootstrapService::~ExecutorBootstrapService() = default;
26 
// Out-of-line definition of the (defaulted) Dispatcher destructor.
// NOTE(review): presumably defined here so the class has a single home
// translation unit -- confirm against the header.
27 SimpleRemoteEPCServer::Dispatcher::~Dispatcher() = default;
28 
29 #if LLVM_ENABLE_THREADS
30 void SimpleRemoteEPCServer::ThreadDispatcher::dispatch(
31     unique_function<void()> Work) {
32   {
33     std::lock_guard<std::mutex> Lock(DispatchMutex);
34     if (!Running)
35       return;
36     ++Outstanding;
37   }
38 
39   std::thread([this, Work = std::move(Work)]() mutable {
40     Work();
41     std::lock_guard<std::mutex> Lock(DispatchMutex);
42     --Outstanding;
43     OutstandingCV.notify_all();
44   }).detach();
45 }
46 
47 void SimpleRemoteEPCServer::ThreadDispatcher::shutdown() {
48   std::unique_lock<std::mutex> Lock(DispatchMutex);
49   Running = false;
50   OutstandingCV.wait(Lock, [this]() { return Outstanding == 0; });
51 }
52 #endif
53 
54 StringMap<ExecutorAddr> SimpleRemoteEPCServer::defaultBootstrapSymbols() {
55   StringMap<ExecutorAddr> DBS;
56   rt_bootstrap::addTo(DBS);
57   return DBS;
58 }
59 
60 Expected<SimpleRemoteEPCTransportClient::HandleMessageAction>
61 SimpleRemoteEPCServer::handleMessage(SimpleRemoteEPCOpcode OpC, uint64_t SeqNo,
62                                      ExecutorAddr TagAddr,
63                                      SimpleRemoteEPCArgBytesVector ArgBytes) {
64 
65   LLVM_DEBUG({
66     dbgs() << "SimpleRemoteEPCServer::handleMessage: opc = ";
67     switch (OpC) {
68     case SimpleRemoteEPCOpcode::Setup:
69       dbgs() << "Setup";
70       assert(SeqNo == 0 && "Non-zero SeqNo for Setup?");
71       assert(!TagAddr && "Non-zero TagAddr for Setup?");
72       break;
73     case SimpleRemoteEPCOpcode::Hangup:
74       dbgs() << "Hangup";
75       assert(SeqNo == 0 && "Non-zero SeqNo for Hangup?");
76       assert(!TagAddr && "Non-zero TagAddr for Hangup?");
77       break;
78     case SimpleRemoteEPCOpcode::Result:
79       dbgs() << "Result";
80       assert(!TagAddr && "Non-zero TagAddr for Result?");
81       break;
82     case SimpleRemoteEPCOpcode::CallWrapper:
83       dbgs() << "CallWrapper";
84       break;
85     }
86     dbgs() << ", seqno = " << SeqNo << ", tag-addr = " << TagAddr
87            << ", arg-buffer = " << formatv("{0:x}", ArgBytes.size())
88            << " bytes\n";
89   });
90 
91   using UT = std::underlying_type_t<SimpleRemoteEPCOpcode>;
92   if (static_cast<UT>(OpC) > static_cast<UT>(SimpleRemoteEPCOpcode::LastOpC))
93     return make_error<StringError>("Unexpected opcode",
94                                    inconvertibleErrorCode());
95 
96   // TODO: Clean detach message?
97   switch (OpC) {
98   case SimpleRemoteEPCOpcode::Setup:
99     return make_error<StringError>("Unexpected Setup opcode",
100                                    inconvertibleErrorCode());
101   case SimpleRemoteEPCOpcode::Hangup:
102     return SimpleRemoteEPCTransportClient::EndSession;
103   case SimpleRemoteEPCOpcode::Result:
104     if (auto Err = handleResult(SeqNo, TagAddr, std::move(ArgBytes)))
105       return std::move(Err);
106     break;
107   case SimpleRemoteEPCOpcode::CallWrapper:
108     handleCallWrapper(SeqNo, TagAddr, std::move(ArgBytes));
109     break;
110   }
111   return ContinueSession;
112 }
113 
114 Error SimpleRemoteEPCServer::waitForDisconnect() {
115   std::unique_lock<std::mutex> Lock(ServerStateMutex);
116   ShutdownCV.wait(Lock, [this]() { return RunState == ServerShutDown; });
117   return std::move(ShutdownErr);
118 }
119 
120 void SimpleRemoteEPCServer::handleDisconnect(Error Err) {
121   PendingJITDispatchResultsMap TmpPending;
122 
123   {
124     std::lock_guard<std::mutex> Lock(ServerStateMutex);
125     std::swap(TmpPending, PendingJITDispatchResults);
126     RunState = ServerShuttingDown;
127   }
128 
129   // Send out-of-band errors to any waiting threads.
130   for (auto &KV : TmpPending)
131     KV.second->set_value(
132         shared::WrapperFunctionResult::createOutOfBandError("disconnecting"));
133 
134   // Wait for dispatcher to clear.
135   D->shutdown();
136 
137   // Shut down services.
138   while (!Services.empty()) {
139     ShutdownErr =
140       joinErrors(std::move(ShutdownErr), Services.back()->shutdown());
141     Services.pop_back();
142   }
143 
144   std::lock_guard<std::mutex> Lock(ServerStateMutex);
145   ShutdownErr = joinErrors(std::move(ShutdownErr), std::move(Err));
146   RunState = ServerShutDown;
147   ShutdownCV.notify_all();
148 }
149 
150 Error SimpleRemoteEPCServer::sendMessage(SimpleRemoteEPCOpcode OpC,
151                                          uint64_t SeqNo, ExecutorAddr TagAddr,
152                                          ArrayRef<char> ArgBytes) {
153 
154   LLVM_DEBUG({
155     dbgs() << "SimpleRemoteEPCServer::sendMessage: opc = ";
156     switch (OpC) {
157     case SimpleRemoteEPCOpcode::Setup:
158       dbgs() << "Setup";
159       assert(SeqNo == 0 && "Non-zero SeqNo for Setup?");
160       assert(!TagAddr && "Non-zero TagAddr for Setup?");
161       break;
162     case SimpleRemoteEPCOpcode::Hangup:
163       dbgs() << "Hangup";
164       assert(SeqNo == 0 && "Non-zero SeqNo for Hangup?");
165       assert(!TagAddr && "Non-zero TagAddr for Hangup?");
166       break;
167     case SimpleRemoteEPCOpcode::Result:
168       dbgs() << "Result";
169       assert(!TagAddr && "Non-zero TagAddr for Result?");
170       break;
171     case SimpleRemoteEPCOpcode::CallWrapper:
172       dbgs() << "CallWrapper";
173       break;
174     }
175     dbgs() << ", seqno = " << SeqNo << ", tag-addr = " << TagAddr
176            << ", arg-buffer = " << formatv("{0:x}", ArgBytes.size())
177            << " bytes\n";
178   });
179   auto Err = T->sendMessage(OpC, SeqNo, TagAddr, ArgBytes);
180   LLVM_DEBUG({
181     if (Err)
182       dbgs() << "  \\--> SimpleRemoteEPC::sendMessage failed\n";
183   });
184   return Err;
185 }
186 
187 Error SimpleRemoteEPCServer::sendSetupMessage(
188     StringMap<std::vector<char>> BootstrapMap,
189     StringMap<ExecutorAddr> BootstrapSymbols) {
190 
191   using namespace SimpleRemoteEPCDefaultBootstrapSymbolNames;
192 
193   std::vector<char> SetupPacket;
194   SimpleRemoteEPCExecutorInfo EI;
195   EI.TargetTriple = sys::getProcessTriple();
196   if (auto PageSize = sys::Process::getPageSize())
197     EI.PageSize = *PageSize;
198   else
199     return PageSize.takeError();
200   EI.BootstrapMap = std::move(BootstrapMap);
201   EI.BootstrapSymbols = std::move(BootstrapSymbols);
202 
203   assert(!EI.BootstrapSymbols.count(ExecutorSessionObjectName) &&
204          "Dispatch context name should not be set");
205   assert(!EI.BootstrapSymbols.count(DispatchFnName) &&
206          "Dispatch function name should not be set");
207   EI.BootstrapSymbols[ExecutorSessionObjectName] = ExecutorAddr::fromPtr(this);
208   EI.BootstrapSymbols[DispatchFnName] = ExecutorAddr::fromPtr(jitDispatchEntry);
209 
210   using SPSSerialize =
211       shared::SPSArgList<shared::SPSSimpleRemoteEPCExecutorInfo>;
212   auto SetupPacketBytes =
213       shared::WrapperFunctionResult::allocate(SPSSerialize::size(EI));
214   shared::SPSOutputBuffer OB(SetupPacketBytes.data(), SetupPacketBytes.size());
215   if (!SPSSerialize::serialize(OB, EI))
216     return make_error<StringError>("Could not send setup packet",
217                                    inconvertibleErrorCode());
218 
219   return sendMessage(SimpleRemoteEPCOpcode::Setup, 0, ExecutorAddr(),
220                      {SetupPacketBytes.data(), SetupPacketBytes.size()});
221 }
222 
223 Error SimpleRemoteEPCServer::handleResult(
224     uint64_t SeqNo, ExecutorAddr TagAddr,
225     SimpleRemoteEPCArgBytesVector ArgBytes) {
226   std::promise<shared::WrapperFunctionResult> *P = nullptr;
227   {
228     std::lock_guard<std::mutex> Lock(ServerStateMutex);
229     auto I = PendingJITDispatchResults.find(SeqNo);
230     if (I == PendingJITDispatchResults.end())
231       return make_error<StringError>("No call for sequence number " +
232                                          Twine(SeqNo),
233                                      inconvertibleErrorCode());
234     P = I->second;
235     PendingJITDispatchResults.erase(I);
236     releaseSeqNo(SeqNo);
237   }
238   auto R = shared::WrapperFunctionResult::allocate(ArgBytes.size());
239   memcpy(R.data(), ArgBytes.data(), ArgBytes.size());
240   P->set_value(std::move(R));
241   return Error::success();
242 }
243 
244 void SimpleRemoteEPCServer::handleCallWrapper(
245     uint64_t RemoteSeqNo, ExecutorAddr TagAddr,
246     SimpleRemoteEPCArgBytesVector ArgBytes) {
247   D->dispatch([this, RemoteSeqNo, TagAddr, ArgBytes = std::move(ArgBytes)]() {
248     using WrapperFnTy =
249         shared::CWrapperFunctionResult (*)(const char *, size_t);
250     auto *Fn = TagAddr.toPtr<WrapperFnTy>();
251     shared::WrapperFunctionResult ResultBytes(
252         Fn(ArgBytes.data(), ArgBytes.size()));
253     if (auto Err = sendMessage(SimpleRemoteEPCOpcode::Result, RemoteSeqNo,
254                                ExecutorAddr(),
255                                {ResultBytes.data(), ResultBytes.size()}))
256       ReportError(std::move(Err));
257   });
258 }
259 
260 shared::WrapperFunctionResult
261 SimpleRemoteEPCServer::doJITDispatch(const void *FnTag, const char *ArgData,
262                                      size_t ArgSize) {
263   uint64_t SeqNo;
264   std::promise<shared::WrapperFunctionResult> ResultP;
265   auto ResultF = ResultP.get_future();
266   {
267     std::lock_guard<std::mutex> Lock(ServerStateMutex);
268     if (RunState != ServerRunning)
269       return shared::WrapperFunctionResult::createOutOfBandError(
270           "jit_dispatch not available (EPC server shut down)");
271 
272     SeqNo = getNextSeqNo();
273     assert(!PendingJITDispatchResults.count(SeqNo) && "SeqNo already in use");
274     PendingJITDispatchResults[SeqNo] = &ResultP;
275   }
276 
277   if (auto Err = sendMessage(SimpleRemoteEPCOpcode::CallWrapper, SeqNo,
278                              ExecutorAddr::fromPtr(FnTag), {ArgData, ArgSize}))
279     ReportError(std::move(Err));
280 
281   return ResultF.get();
282 }
283 
284 shared::CWrapperFunctionResult
285 SimpleRemoteEPCServer::jitDispatchEntry(void *DispatchCtx, const void *FnTag,
286                                         const char *ArgData, size_t ArgSize) {
287   return reinterpret_cast<SimpleRemoteEPCServer *>(DispatchCtx)
288       ->doJITDispatch(FnTag, ArgData, ArgSize)
289       .release();
290 }
291 
292 } // end namespace orc
293 } // end namespace llvm
294