// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "funnel-requestqueue.h"

#include <linux/atomic.h>
#include <linux/compiler.h>
#include <linux/wait.h>

#include "funnel-queue.h"
#include "logger.h"
#include "memory-alloc.h"
#include "thread-utils.h"
/*
 * This queue will attempt to handle requests in reasonably sized batches instead of reacting
 * immediately to each new request. The wait time between batches is dynamically adjusted up or
 * down to try to balance responsiveness against wasted thread run time.
 *
 * If the wait time becomes long enough, the queue will become dormant and must be explicitly
 * awoken when a new request is enqueued. The enqueue operation updates "newest" in the funnel
 * queue via xchg (which is a memory barrier), and later checks "dormant" to decide whether to do a
 * wakeup of the worker thread.
 *
 * When deciding to go to sleep, the worker thread sets "dormant" and then examines "newest" to
 * decide if the funnel queue is idle. In dormant mode, the last examination of "newest" before
 * going to sleep is done inside the wait_event_interruptible() macro, after a point where one or
 * more memory barriers have been issued. (Preparing to sleep uses spin locks.) Even if the funnel
 * queue's "next" field update isn't visible yet to make the entry accessible, its existence will
 * kick the worker thread out of dormant mode and back into timer-based mode.
 *
 * Unbatched requests are used to communicate between different zone threads and will also cause
 * the queue to awaken immediately.
 */

enum {
	NANOSECOND = 1,
	MICROSECOND = 1000 * NANOSECOND,
	MILLISECOND = 1000 * MICROSECOND,
	DEFAULT_WAIT_TIME = 20 * MICROSECOND,
	MINIMUM_WAIT_TIME = DEFAULT_WAIT_TIME / 2,
	MAXIMUM_WAIT_TIME = MILLISECOND,
	MINIMUM_BATCH = 32,
	MAXIMUM_BATCH = 64,
};

struct uds_request_queue {
	/* Wait queue for synchronizing producers and consumer */
	struct wait_queue_head wait_head;
	/* Function to process a request */
	uds_request_queue_processor_fn processor;
	/* Queue of new incoming requests */
	struct funnel_queue *main_queue;
	/* Queue of old requests to retry */
	struct funnel_queue *retry_queue;
	/* The worker thread */
	struct thread *thread;
	/* True if the worker was started */
	bool started;
	/* When true, requests can be enqueued */
	bool running;
	/* A flag set when the worker is waiting without a timeout */
	atomic_t dormant;
};

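/* Check the retry queue before the main queue so that requeued requests are processed first. */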
static inline struct uds_request *poll_queues(struct uds_request_queue *queue)
{
	struct funnel_queue_entry *entry;

	entry = vdo_funnel_queue_poll(queue->retry_queue);
	if (entry != NULL)
		return container_of(entry, struct uds_request, queue_link);

	entry = vdo_funnel_queue_poll(queue->main_queue);
	if (entry != NULL)
		return container_of(entry, struct uds_request, queue_link);

	return NULL;
}

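/* The dormant worker may only stay asleep while both queues appear to be empty. */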
static inline bool are_queues_idle(struct uds_request_queue *queue)
{
	return vdo_is_funnel_queue_idle(queue->retry_queue) &&
	       vdo_is_funnel_queue_idle(queue->main_queue);
}

/*
 * Determine if there is a next request to process, and return it if there is. Also return flags
 * indicating whether the worker thread can sleep (for the use of wait_event() macros) and whether
 * the thread did sleep before returning a new request.
 */
static inline bool dequeue_request(struct uds_request_queue *queue,
				   struct uds_request **request_ptr, bool *waited_ptr)
{
	struct uds_request *request = poll_queues(queue);

	if (request != NULL) {
		*request_ptr = request;
		return true;
	}

	if (!READ_ONCE(queue->running)) {
		/* Wake the worker thread so it can exit. */
		*request_ptr = NULL;
		return true;
	}

	*request_ptr = NULL;
	*waited_ptr = true;
	return false;
}

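/*
 * Wait for the next request to process. A dormant worker sleeps until it is explicitly woken,
 * while an active worker sleeps with a high-resolution timeout so that requests can accumulate
 * into a batch.
 */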
static void wait_for_request(struct uds_request_queue *queue, bool dormant,
			     unsigned long timeout, struct uds_request **request,
			     bool *waited)
{
	if (dormant) {
		wait_event_interruptible(queue->wait_head,
					 (dequeue_request(queue, request, waited) ||
					  !are_queues_idle(queue)));
		return;
	}

	wait_event_interruptible_hrtimeout(queue->wait_head,
					   dequeue_request(queue, request, waited),
					   ns_to_ktime(timeout));
}

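/*
 * The main loop of the worker thread: process requests as they arrive, lengthening the wait time
 * when batches are too small and shortening it when they are too large, and going dormant when
 * the wait time reaches its maximum.
 */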
static void request_queue_worker(void *arg)
{
	struct uds_request_queue *queue = arg;
	struct uds_request *request = NULL;
	unsigned long time_batch = DEFAULT_WAIT_TIME;
	bool dormant = atomic_read(&queue->dormant);
	bool waited = false;
	long current_batch = 0;

	for (;;) {
		wait_for_request(queue, dormant, time_batch, &request, &waited);
		if (likely(request != NULL)) {
			current_batch++;
			queue->processor(request);
		} else if (!READ_ONCE(queue->running)) {
			break;
		}

		if (dormant) {
			/*
			 * The queue has been roused from dormancy. Clear the flag so enqueuers can
			 * stop broadcasting. No fence is needed for this transition.
			 */
			atomic_set(&queue->dormant, false);
			dormant = false;
			time_batch = DEFAULT_WAIT_TIME;
		} else if (waited) {
			/*
			 * We waited for this request to show up. Adjust the wait time to smooth
			 * out the batch size.
			 */
			if (current_batch < MINIMUM_BATCH) {
				/*
				 * If the last batch of requests was too small, increase the wait
				 * time.
				 */
				time_batch += time_batch / 4;
				if (time_batch >= MAXIMUM_WAIT_TIME) {
					atomic_set(&queue->dormant, true);
					dormant = true;
				}
			} else if (current_batch > MAXIMUM_BATCH) {
				/*
				 * If the last batch of requests was too large, decrease the wait
				 * time.
				 */
				time_batch -= time_batch / 4;
				if (time_batch < MINIMUM_WAIT_TIME)
					time_batch = MINIMUM_WAIT_TIME;
			}
			current_batch = 0;
		}
	}

	/*
	 * Ensure that we process any remaining requests that were enqueued before trying to shut
	 * down. The corresponding write barrier is in uds_request_queue_finish().
	 */
	smp_rmb();
	while ((request = poll_queues(queue)) != NULL)
		queue->processor(request);
}

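/* Allocate a request queue and start a worker thread to process its requests. */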
int uds_make_request_queue(const char *queue_name,
			   uds_request_queue_processor_fn processor,
			   struct uds_request_queue **queue_ptr)
{
	int result;
	struct uds_request_queue *queue;

	result = vdo_allocate(1, struct uds_request_queue, __func__, &queue);
	if (result != VDO_SUCCESS)
		return result;

	queue->processor = processor;
	queue->running = true;
	atomic_set(&queue->dormant, false);
	init_waitqueue_head(&queue->wait_head);

	result = vdo_make_funnel_queue(&queue->main_queue);
	if (result != VDO_SUCCESS) {
		uds_request_queue_finish(queue);
		return result;
	}

	result = vdo_make_funnel_queue(&queue->retry_queue);
	if (result != VDO_SUCCESS) {
		uds_request_queue_finish(queue);
		return result;
	}

	result = vdo_create_thread(request_queue_worker, queue, queue_name,
				   &queue->thread);
	if (result != VDO_SUCCESS) {
		uds_request_queue_finish(queue);
		return result;
	}

	queue->started = true;
	*queue_ptr = queue;
	return UDS_SUCCESS;
}

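/* Wake the worker thread only if it is actually sleeping on the wait queue. */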
static inline void wake_up_worker(struct uds_request_queue *queue)
{
	if (wq_has_sleeper(&queue->wait_head))
		wake_up(&queue->wait_head);
}

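/*
 * Add a request to the queue. Requeued requests go to the retry queue so they are handled before
 * new requests; the worker is woken immediately if it is dormant or if the request is unbatched.
 */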
void uds_request_queue_enqueue(struct uds_request_queue *queue,
			       struct uds_request *request)
{
	struct funnel_queue *sub_queue;
	bool unbatched = request->unbatched;

	sub_queue = request->requeued ? queue->retry_queue : queue->main_queue;
	vdo_funnel_queue_put(sub_queue, &request->queue_link);

	/*
	 * We must wake the worker thread when it is dormant. A read fence isn't needed here since
	 * we know the queue operation acts as one.
	 */
	if (atomic_read(&queue->dormant) || unbatched)
		wake_up_worker(queue);
}

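/*
 * Shut down the request queue: stop accepting new requests, wake the worker so it can drain any
 * remaining requests and exit, then release all resources.
 */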
void uds_request_queue_finish(struct uds_request_queue *queue)
{
	if (queue == NULL)
		return;

	/*
	 * This memory barrier ensures that any requests we queued will be seen. The point is that
	 * when dequeue_request() sees the following update to the running flag, it will also be
	 * able to see any change we made to a next field in the funnel queue entry. The
	 * corresponding read barrier is in request_queue_worker().
	 */
	smp_wmb();
	WRITE_ONCE(queue->running, false);

	if (queue->started) {
		wake_up_worker(queue);
		vdo_join_threads(queue->thread);
	}

	vdo_free_funnel_queue(queue->main_queue);
	vdo_free_funnel_queue(queue->retry_queue);
	vdo_free(queue);
}