1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * Copyright 2023 Red Hat 4 */ 5 6 #include "funnel-requestqueue.h" 7 8 #include <linux/atomic.h> 9 #include <linux/compiler.h> 10 #include <linux/wait.h> 11 12 #include "funnel-queue.h" 13 #include "logger.h" 14 #include "memory-alloc.h" 15 #include "thread-utils.h" 16 17 /* 18 * This queue will attempt to handle requests in reasonably sized batches instead of reacting 19 * immediately to each new request. The wait time between batches is dynamically adjusted up or 20 * down to try to balance responsiveness against wasted thread run time. 21 * 22 * If the wait time becomes long enough, the queue will become dormant and must be explicitly 23 * awoken when a new request is enqueued. The enqueue operation updates "newest" in the funnel 24 * queue via xchg (which is a memory barrier), and later checks "dormant" to decide whether to do a 25 * wakeup of the worker thread. 26 * 27 * When deciding to go to sleep, the worker thread sets "dormant" and then examines "newest" to 28 * decide if the funnel queue is idle. In dormant mode, the last examination of "newest" before 29 * going to sleep is done inside the wait_event_interruptible() macro, after a point where one or 30 * more memory barriers have been issued. (Preparing to sleep uses spin locks.) Even if the funnel 31 * queue's "next" field update isn't visible yet to make the entry accessible, its existence will 32 * kick the worker thread out of dormant mode and back into timer-based mode. 33 * 34 * Unbatched requests are used to communicate between different zone threads and will also cause 35 * the queue to awaken immediately. 36 */ 37 38 enum { 39 NANOSECOND = 1, 40 MICROSECOND = 1000 * NANOSECOND, 41 MILLISECOND = 1000 * MICROSECOND, 42 DEFAULT_WAIT_TIME = 20 * MICROSECOND, 43 MINIMUM_WAIT_TIME = DEFAULT_WAIT_TIME / 2, 44 MAXIMUM_WAIT_TIME = MILLISECOND, 45 MINIMUM_BATCH = 32, 46 MAXIMUM_BATCH = 64, 47 }; 48 49 struct uds_request_queue { 50 /* Wait queue for synchronizing producers and consumer */ 51 struct wait_queue_head wait_head; 52 /* Function to process a request */ 53 uds_request_queue_processor_fn processor; 54 /* Queue of new incoming requests */ 55 struct funnel_queue *main_queue; 56 /* Queue of old requests to retry */ 57 struct funnel_queue *retry_queue; 58 /* The thread id of the worker thread */ 59 struct thread *thread; 60 /* True if the worker was started */ 61 bool started; 62 /* When true, requests can be enqueued */ 63 bool running; 64 /* A flag set when the worker is waiting without a timeout */ 65 atomic_t dormant; 66 }; 67 68 static inline struct uds_request *poll_queues(struct uds_request_queue *queue) 69 { 70 struct funnel_queue_entry *entry; 71 72 entry = vdo_funnel_queue_poll(queue->retry_queue); 73 if (entry != NULL) 74 return container_of(entry, struct uds_request, queue_link); 75 76 entry = vdo_funnel_queue_poll(queue->main_queue); 77 if (entry != NULL) 78 return container_of(entry, struct uds_request, queue_link); 79 80 return NULL; 81 } 82 83 static inline bool are_queues_idle(struct uds_request_queue *queue) 84 { 85 return vdo_is_funnel_queue_idle(queue->retry_queue) && 86 vdo_is_funnel_queue_idle(queue->main_queue); 87 } 88 89 /* 90 * Determine if there is a next request to process, and return it if there is. Also return flags 91 * indicating whether the worker thread can sleep (for the use of wait_event() macros) and whether 92 * the thread did sleep before returning a new request. 93 */ 94 static inline bool dequeue_request(struct uds_request_queue *queue, 95 struct uds_request **request_ptr, bool *waited_ptr) 96 { 97 struct uds_request *request = poll_queues(queue); 98 99 if (request != NULL) { 100 *request_ptr = request; 101 return true; 102 } 103 104 if (!READ_ONCE(queue->running)) { 105 /* Wake the worker thread so it can exit. */ 106 *request_ptr = NULL; 107 return true; 108 } 109 110 *request_ptr = NULL; 111 *waited_ptr = true; 112 return false; 113 } 114 115 static void wait_for_request(struct uds_request_queue *queue, bool dormant, 116 unsigned long timeout, struct uds_request **request, 117 bool *waited) 118 { 119 if (dormant) { 120 wait_event_interruptible(queue->wait_head, 121 (dequeue_request(queue, request, waited) || 122 !are_queues_idle(queue))); 123 return; 124 } 125 126 wait_event_interruptible_hrtimeout(queue->wait_head, 127 dequeue_request(queue, request, waited), 128 ns_to_ktime(timeout)); 129 } 130 131 static void request_queue_worker(void *arg) 132 { 133 struct uds_request_queue *queue = arg; 134 struct uds_request *request = NULL; 135 unsigned long time_batch = DEFAULT_WAIT_TIME; 136 bool dormant = atomic_read(&queue->dormant); 137 bool waited = false; 138 long current_batch = 0; 139 140 for (;;) { 141 wait_for_request(queue, dormant, time_batch, &request, &waited); 142 if (likely(request != NULL)) { 143 current_batch++; 144 queue->processor(request); 145 } else if (!READ_ONCE(queue->running)) { 146 break; 147 } 148 149 if (dormant) { 150 /* 151 * The queue has been roused from dormancy. Clear the flag so enqueuers can 152 * stop broadcasting. No fence is needed for this transition. 153 */ 154 atomic_set(&queue->dormant, false); 155 dormant = false; 156 time_batch = DEFAULT_WAIT_TIME; 157 } else if (waited) { 158 /* 159 * We waited for this request to show up. Adjust the wait time to smooth 160 * out the batch size. 161 */ 162 if (current_batch < MINIMUM_BATCH) { 163 /* 164 * If the last batch of requests was too small, increase the wait 165 * time. 166 */ 167 time_batch += time_batch / 4; 168 if (time_batch >= MAXIMUM_WAIT_TIME) { 169 atomic_set(&queue->dormant, true); 170 dormant = true; 171 } 172 } else if (current_batch > MAXIMUM_BATCH) { 173 /* 174 * If the last batch of requests was too large, decrease the wait 175 * time. 176 */ 177 time_batch -= time_batch / 4; 178 if (time_batch < MINIMUM_WAIT_TIME) 179 time_batch = MINIMUM_WAIT_TIME; 180 } 181 current_batch = 0; 182 } 183 } 184 185 /* 186 * Ensure that we process any remaining requests that were enqueued before trying to shut 187 * down. The corresponding write barrier is in uds_request_queue_finish(). 188 */ 189 smp_rmb(); 190 while ((request = poll_queues(queue)) != NULL) 191 queue->processor(request); 192 } 193 194 int uds_make_request_queue(const char *queue_name, 195 uds_request_queue_processor_fn processor, 196 struct uds_request_queue **queue_ptr) 197 { 198 int result; 199 struct uds_request_queue *queue; 200 201 result = vdo_allocate(1, struct uds_request_queue, __func__, &queue); 202 if (result != VDO_SUCCESS) 203 return result; 204 205 queue->processor = processor; 206 queue->running = true; 207 atomic_set(&queue->dormant, false); 208 init_waitqueue_head(&queue->wait_head); 209 210 result = vdo_make_funnel_queue(&queue->main_queue); 211 if (result != VDO_SUCCESS) { 212 uds_request_queue_finish(queue); 213 return result; 214 } 215 216 result = vdo_make_funnel_queue(&queue->retry_queue); 217 if (result != VDO_SUCCESS) { 218 uds_request_queue_finish(queue); 219 return result; 220 } 221 222 result = vdo_create_thread(request_queue_worker, queue, queue_name, 223 &queue->thread); 224 if (result != VDO_SUCCESS) { 225 uds_request_queue_finish(queue); 226 return result; 227 } 228 229 queue->started = true; 230 *queue_ptr = queue; 231 return UDS_SUCCESS; 232 } 233 234 static inline void wake_up_worker(struct uds_request_queue *queue) 235 { 236 if (wq_has_sleeper(&queue->wait_head)) 237 wake_up(&queue->wait_head); 238 } 239 240 void uds_request_queue_enqueue(struct uds_request_queue *queue, 241 struct uds_request *request) 242 { 243 struct funnel_queue *sub_queue; 244 bool unbatched = request->unbatched; 245 246 sub_queue = request->requeued ? queue->retry_queue : queue->main_queue; 247 vdo_funnel_queue_put(sub_queue, &request->queue_link); 248 249 /* 250 * We must wake the worker thread when it is dormant. A read fence isn't needed here since 251 * we know the queue operation acts as one. 252 */ 253 if (atomic_read(&queue->dormant) || unbatched) 254 wake_up_worker(queue); 255 } 256 257 void uds_request_queue_finish(struct uds_request_queue *queue) 258 { 259 if (queue == NULL) 260 return; 261 262 /* 263 * This memory barrier ensures that any requests we queued will be seen. The point is that 264 * when dequeue_request() sees the following update to the running flag, it will also be 265 * able to see any change we made to a next field in the funnel queue entry. The 266 * corresponding read barrier is in request_queue_worker(). 267 */ 268 smp_wmb(); 269 WRITE_ONCE(queue->running, false); 270 271 if (queue->started) { 272 wake_up_worker(queue); 273 vdo_join_threads(queue->thread); 274 } 275 276 vdo_free_funnel_queue(queue->main_queue); 277 vdo_free_funnel_queue(queue->retry_queue); 278 vdo_free(queue); 279 } 280