xref: /linux/kernel/locking/qspinlock.c (revision c2aa3089ad7e7fec3ec4a58d8d0904b5e9b392a1)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3  * Queued spinlock
4  *
5  * (C) Copyright 2013-2015 Hewlett-Packard Development Company, L.P.
6  * (C) Copyright 2013-2014,2018 Red Hat, Inc.
7  * (C) Copyright 2015 Intel Corp.
8  * (C) Copyright 2015 Hewlett-Packard Enterprise Development LP
9  *
10  * Authors: Waiman Long <longman@redhat.com>
11  *          Peter Zijlstra <peterz@infradead.org>
12  */
13 
14 #ifndef _GEN_PV_LOCK_SLOWPATH
15 
16 #include <linux/smp.h>
17 #include <linux/bug.h>
18 #include <linux/cpumask.h>
19 #include <linux/percpu.h>
20 #include <linux/hardirq.h>
21 #include <linux/mutex.h>
22 #include <linux/prefetch.h>
23 #include <asm/byteorder.h>
24 #include <asm/qspinlock.h>
25 #include <trace/events/lock.h>
26 
27 /*
28  * Include queued spinlock definitions and statistics code
29  */
30 #include "qspinlock.h"
31 #include "qspinlock_stat.h"
32 
33 /*
34  * The basic principle of a queue-based spinlock can best be understood
35  * by studying a classic queue-based spinlock implementation called the
36  * MCS lock. A copy of the original MCS lock paper ("Algorithms for Scalable
37  * Synchronization on Shared-Memory Multiprocessors by Mellor-Crummey and
38  * Scott") is available at
39  *
40  * https://bugzilla.kernel.org/show_bug.cgi?id=206115
41  *
42  * This queued spinlock implementation is based on the MCS lock, however to
43  * make it fit the 4 bytes we assume spinlock_t to be, and preserve its
44  * existing API, we must modify it somehow.
45  *
46  * In particular; where the traditional MCS lock consists of a tail pointer
47  * (8 bytes) and needs the next pointer (another 8 bytes) of its own node to
48  * unlock the next pending (next->locked), we compress both these: {tail,
49  * next->locked} into a single u32 value.
50  *
51  * A spinlock disables recursion of its own context, and there is a limit
52  * to the contexts that can nest, namely: task, softirq, hardirq, nmi. As there
53  * are at most 4 nesting levels, the level can be encoded by a 2-bit number. Now
54  * we can encode the tail by combining the 2-bit nesting level with the cpu
55  * number. With one byte for the lock value and 3 bytes for the tail, only a
56  * 32-bit word is now needed. Even though we only need 1 bit for the lock,
57  * we extend it to a full byte to achieve better performance for architectures
58  * that support atomic byte write.
59  *
60  * We also change the first spinner to spin on the lock bit instead of its
61  * node; thereby avoiding the need to carry a node from lock to unlock, and
62  * preserving existing lock API. This also makes the unlock code simpler and
63  * faster.
64  *
65  * N.B. The current implementation only supports architectures that allow
66  *      atomic operations on smaller 8-bit and 16-bit data types.
67  *
68  */
69 
70 #include "mcs_spinlock.h"
71 
72 /*
73  * Per-CPU queue node structures; we can never have more than 4 nested
74  * contexts: task, softirq, hardirq, nmi.
75  *
76  * Exactly fits one 64-byte cacheline on a 64-bit architecture.
77  *
78  * PV doubles the storage and uses the second cacheline for PV state.
    *
    * The slowpath uses the count field of node 0 (qnodes[0].mcs.count) as the
    * per-CPU nesting counter: it is incremented when a node is taken and
    * decremented again when the node is released.
79  */
80 static DEFINE_PER_CPU_ALIGNED(struct qnode, qnodes[_Q_MAX_NODES]);
81 
82 /*
83  * Generate the native code for queued_spin_lock_slowpath(); provide NOPs for
84  * all the PV callbacks.
    *
    * __pv_init_node(), __pv_wait_node() and __pv_kick_node() have empty bodies,
    * so the corresponding hook calls in the slowpath do nothing in the native
    * build.
    *
    * __pv_wait_head_or_lock() always returns 0. The slowpath treats a non-zero
    * return as "lock already acquired by the PV code" and jumps to its locked
    * label; returning 0 makes it fall through to its own
    * atomic_cond_read_acquire() wait instead.
85  */
86 
87 static __always_inline void __pv_init_node(struct mcs_spinlock *node) { }
88 static __always_inline void __pv_wait_node(struct mcs_spinlock *node,
89 					   struct mcs_spinlock *prev) { }
90 static __always_inline void __pv_kick_node(struct qspinlock *lock,
91 					   struct mcs_spinlock *node) { }
92 static __always_inline u32  __pv_wait_head_or_lock(struct qspinlock *lock,
93 						   struct mcs_spinlock *node)
94 						   { return 0; }
95 
/*
 * Native pass: pv_enabled() is constant false, so the slowpath's
 * "if (pv_enabled()) goto pv_queue;" shortcut is never taken and the
 * pending-bit path is used.
 */
96 #define pv_enabled()		false
97 
/*
 * Map the generic pv_* hook names called by the slowpath onto the no-op
 * __pv_* stubs. The PV pass at the end of this file #undef's these names
 * before re-including the file.
 */
98 #define pv_init_node		__pv_init_node
99 #define pv_wait_node		__pv_wait_node
100 #define pv_kick_node		__pv_kick_node
101 #define pv_wait_head_or_lock	__pv_wait_head_or_lock
102 
/*
 * With CONFIG_PARAVIRT_SPINLOCKS the function generated by this (native) pass
 * is emitted under the name native_queued_spin_lock_slowpath; the PV pass
 * later redefines the name to __pv_queued_spin_lock_slowpath.
 */
103 #ifdef CONFIG_PARAVIRT_SPINLOCKS
104 #define queued_spin_lock_slowpath	native_queued_spin_lock_slowpath
105 #endif
106 
107 #endif /* _GEN_PV_LOCK_SLOWPATH */
108 
109 /**
110  * queued_spin_lock_slowpath - acquire the queued spinlock
111  * @lock: Pointer to queued spinlock structure
112  * @val: Current value of the queued spinlock 32-bit word
113  *
114  * (queue tail, pending bit, lock value)
115  *
116  *              fast     :    slow                                  :    unlock
117  *                       :                                          :
118  * uncontended  (0,0,0) -:--> (0,0,1) ------------------------------:--> (*,*,0)
119  *                       :       | ^--------.------.             /  :
120  *                       :       v           \      \            |  :
121  * pending               :    (0,1,1) +--> (0,1,0)   \           |  :
122  *                       :       | ^--'              |           |  :
123  *                       :       v                   |           |  :
124  * uncontended           :    (n,x,y) +--> (n,0,0) --'           |  :
125  *   queue               :       | ^--'                          |  :
126  *                       :       v                               |  :
127  * contended             :    (*,x,y) +--> (*,0,0) ---> (*,0,1) -'  :
128  *   queue               :         ^--'                             :
     *
     * Code layout:
     *
     *  1. Pending path (skipped when pv_enabled() is true): wait a bounded
     *     number of spins for a pending->locked hand-over, then try to
     *     become the pending waiter, spin on the locked byte and take the
     *     lock with clear_pending_set_locked().
     *  2. Queue path (labels queue/pv_queue): take a per-CPU MCS node,
     *     publish it as the new tail, wait to reach the head of the queue,
     *     wait for the locked and pending bits to clear, claim the lock and
     *     pass the MCS lock on to the next queued node, if there is one.
     *
     * The two early returns (virt_spin_lock() and the pending path) happen
     * before the per-CPU node count is incremented; every later exit goes
     * through the release label, which decrements it again.
129  */
130 void __lockfunc queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
131 {
132 	struct mcs_spinlock *prev, *next, *node;
133 	u32 old, tail;
134 	int idx;
135 
	/*
	 * Build-time check that CONFIG_NR_CPUS is below 1 << _Q_TAIL_CPU_BITS;
	 * presumably so that encode_tail() can represent every CPU number.
	 */
136 	BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));
137 
	/* The PV variant never uses the pending path; queue right away. */
138 	if (pv_enabled())
139 		goto pv_queue;
140 
	/*
	 * NOTE(review): virt_spin_lock() is defined outside this file; a true
	 * return is treated as "nothing left to do here" - presumably the lock
	 * was acquired by other means. Confirm against its definition.
	 */
141 	if (virt_spin_lock(lock))
142 		return;
143 
144 	/*
145 	 * Wait for in-progress pending->locked hand-overs with a bounded
146 	 * number of spins so that we guarantee forward progress.
147 	 *
148 	 * 0,1,0 -> 0,0,1
	 *
	 * cnt is the spin budget: the wait condition also becomes true once
	 * cnt has counted down to zero, even if the lock word still reads
	 * _Q_PENDING_VAL. In that case the contention check below queues.
149 	 */
150 	if (val == _Q_PENDING_VAL) {
151 		int cnt = _Q_PENDING_LOOPS;
152 		val = atomic_cond_read_relaxed(&lock->val,
153 					       (VAL != _Q_PENDING_VAL) || !cnt--);
154 	}
155 
156 	/*
157 	 * If we observe any contention; queue.
	 *
	 * "Contention" here is any bit outside the locked byte, i.e. a
	 * pending bit or a non-empty tail.
158 	 */
159 	if (val & ~_Q_LOCKED_MASK)
160 		goto queue;
161 
162 	/*
163 	 * trylock || pending
164 	 *
165 	 * 0,0,* -> 0,1,* -> 0,0,1 pending, trylock
166 	 */
167 	val = queued_fetch_set_pending_acquire(lock);
168 
169 	/*
170 	 * If we observe contention, there is a concurrent locker.
171 	 *
172 	 * Undo and queue; our setting of PENDING might have made the
173 	 * n,0,0 -> 0,0,0 transition fail and it will now be waiting
174 	 * on @next to become !NULL.
175 	 */
176 	if (unlikely(val & ~_Q_LOCKED_MASK)) {
177 
178 		/* Undo PENDING if we set it. */
179 		if (!(val & _Q_PENDING_MASK))
180 			clear_pending(lock);
181 
182 		goto queue;
183 	}
184 
185 	/*
186 	 * We're pending, wait for the owner to go away.
187 	 *
188 	 * 0,1,1 -> *,1,0
189 	 *
190 	 * this wait loop must be a load-acquire such that we match the
191 	 * store-release that clears the locked bit and create lock
192 	 * sequentiality; this is because not all
193 	 * clear_pending_set_locked() implementations imply full
194 	 * barriers.
195 	 */
196 	if (val & _Q_LOCKED_MASK)
197 		smp_cond_load_acquire(&lock->locked, !VAL);
198 
199 	/*
200 	 * take ownership and clear the pending bit.
201 	 *
202 	 * 0,1,0 -> 0,0,1
203 	 */
204 	clear_pending_set_locked(lock);
205 	lockevent_inc(lock_pending);
206 	return;
207 
208 	/*
209 	 * End of pending bit optimistic spinning and beginning of MCS
210 	 * queuing.
211 	 */
212 queue:
213 	lockevent_inc(lock_slowpath);
214 pv_queue:
	/*
	 * The count kept in this CPU's node 0 is the number of nodes in use
	 * on this CPU. Its old value is the index of the node for this
	 * context and, together with the CPU number, forms our tail code.
	 */
215 	node = this_cpu_ptr(&qnodes[0].mcs);
216 	idx = node->count++;
217 	tail = encode_tail(smp_processor_id(), idx);
218 
	/* Paired with trace_contention_end() at the release label. */
219 	trace_contention_begin(lock, LCB_F_SPIN);
220 
221 	/*
222 	 * 4 nodes are allocated based on the assumption that there will
223 	 * not be nested NMIs taking spinlocks. That may not be true in
224 	 * some architectures even though the chance of needing more than
225 	 * 4 nodes will still be extremely unlikely. When that happens,
226 	 * we fall back to spinning on the lock directly without using
227 	 * any MCS node. This is not the most elegant solution, but is
228 	 * simple enough.
	 *
	 * The count was still incremented above, so this path also exits
	 * through release to undo it.
229 	 */
230 	if (unlikely(idx >= _Q_MAX_NODES)) {
231 		lockevent_inc(lock_no_node);
232 		while (!queued_spin_trylock(lock))
233 			cpu_relax();
234 		goto release;
235 	}
236 
	/* Switch from node 0 to the node selected by idx. */
237 	node = grab_mcs_node(node, idx);
238 
239 	/*
240 	 * Keep counts of non-zero index values:
241 	 */
242 	lockevent_cond_inc(lock_use_node2 + idx - 1, idx);
243 
244 	/*
245 	 * Ensure that we increment the head node->count before initialising
246 	 * the actual node. If the compiler is kind enough to reorder these
247 	 * stores, then an IRQ could overwrite our assignments.
248 	 */
249 	barrier();
250 
251 	node->locked = 0;
252 	node->next = NULL;
253 	pv_init_node(node);
254 
255 	/*
256 	 * We touched a (possibly) cold cacheline in the per-cpu queue node;
257 	 * attempt the trylock once more in the hope someone let go while we
258 	 * weren't watching.
259 	 */
260 	if (queued_spin_trylock(lock))
261 		goto release;
262 
263 	/*
264 	 * Ensure that the initialisation of @node is complete before we
265 	 * publish the updated tail via xchg_tail() and potentially link
266 	 * @node into the waitqueue via WRITE_ONCE(prev->next, node) below.
267 	 */
268 	smp_wmb();
269 
270 	/*
271 	 * Publish the updated tail.
272 	 * We have already touched the queueing cacheline; don't bother with
273 	 * pending stuff.
274 	 *
275 	 * p,*,* -> n,*,*
276 	 */
277 	old = xchg_tail(lock, tail);
	/* No successor observed yet; may be filled in after the MCS wait. */
278 	next = NULL;
279 
280 	/*
281 	 * if there was a previous node; link it and wait until reaching the
282 	 * head of the waitqueue.
283 	 */
284 	if (old & _Q_TAIL_MASK) {
285 		prev = decode_tail(old, qnodes);
286 
287 		/* Link @node into the waitqueue. */
288 		WRITE_ONCE(prev->next, node);
289 
290 		pv_wait_node(node, prev);
291 		arch_mcs_spin_lock_contended(&node->locked);
292 
293 		/*
294 		 * While waiting for the MCS lock, the next pointer may have
295 		 * been set by another lock waiter. We optimistically load
296 		 * the next pointer & prefetch the cacheline for writing
297 		 * to reduce latency in the upcoming MCS unlock operation.
298 		 */
299 		next = READ_ONCE(node->next);
300 		if (next)
301 			prefetchw(next);
302 	}
303 
304 	/*
305 	 * we're at the head of the waitqueue, wait for the owner & pending to
306 	 * go away.
307 	 *
308 	 * *,x,y -> *,0,0
309 	 *
310 	 * this wait loop must use a load-acquire such that we match the
311 	 * store-release that clears the locked bit and create lock
312 	 * sequentiality; this is because the set_locked() function below
313 	 * does not imply a full barrier.
314 	 *
315 	 * The PV pv_wait_head_or_lock function, if active, will acquire
316 	 * the lock and return a non-zero value. So we have to skip the
317 	 * atomic_cond_read_acquire() call. As the next PV queue head hasn't
318 	 * been designated yet, there is no way for the locked value to become
319 	 * _Q_SLOW_VAL. So both the set_locked() and the
320 	 * atomic_cmpxchg_relaxed() calls will be safe.
321 	 *
322 	 * If PV isn't active, 0 will be returned instead.
323 	 *
324 	 */
325 	if ((val = pv_wait_head_or_lock(lock, node)))
326 		goto locked;
327 
328 	val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK));
329 
330 locked:
331 	/*
332 	 * claim the lock:
333 	 *
334 	 * n,0,0 -> 0,0,1 : lock, uncontended
335 	 * *,*,0 -> *,*,1 : lock, contended
336 	 *
337 	 * If the queue head is the only one in the queue (lock value == tail)
338 	 * and nobody is pending, clear the tail code and grab the lock.
339 	 * Otherwise, we only need to grab the lock.
340 	 */
341 
342 	/*
343 	 * In the PV case we might already have _Q_LOCKED_VAL set, because
344 	 * of lock stealing; therefore we must also allow:
345 	 *
346 	 * n,0,1 -> 0,0,1
347 	 *
348 	 * Note: at this point: (val & _Q_PENDING_MASK) == 0, because of the
349 	 *       above wait condition, therefore any concurrent setting of
350 	 *       PENDING will make the uncontended transition fail.
351 	 */
352 	if ((val & _Q_TAIL_MASK) == tail) {
353 		if (atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))
354 			goto release; /* No contention */
355 	}
356 
357 	/*
358 	 * Either somebody is queued behind us or _Q_PENDING_VAL got set
359 	 * which will then detect the remaining tail and queue behind us
360 	 * ensuring we'll see a @next.
361 	 */
362 	set_locked(lock);
363 
364 	/*
365 	 * contended path; wait for next if not observed yet, release.
	 *
	 * The hand-off through next->locked pairs with the
	 * arch_mcs_spin_lock_contended(&node->locked) wait above, as
	 * executed by the successor on its own node.
366 	 */
367 	if (!next)
368 		next = smp_cond_load_relaxed(&node->next, (VAL));
369 
370 	arch_mcs_spin_unlock_contended(&next->locked);
371 	pv_kick_node(lock, next);
372 
373 release:
374 	trace_contention_end(lock, 0);
375 
376 	/*
377 	 * release the node
	 *
	 * This undoes the node->count++ done at pv_queue; every path that
	 * reaches this label has gone through that increment.
378 	 */
379 	__this_cpu_dec(qnodes[0].mcs.count);
380 }
381 EXPORT_SYMBOL(queued_spin_lock_slowpath);
382 
383 /*
384  * Generate the paravirt version of the lock slowpath,
385  * __pv_queued_spin_lock_slowpath().
     *
     * This is done by including this very file a second time with
     * _GEN_PV_LOCK_SLOWPATH defined: the guard at the top of the file then
     * skips the includes, the per-CPU nodes and the native no-op stubs, and
     * only the slowpath function body is compiled again, this time with
     * pv_enabled() true and under the name __pv_queued_spin_lock_slowpath.
     * The !defined(_GEN_PV_LOCK_SLOWPATH) test below keeps the nested
     * inclusion from recursing.
     *
     * The native pv_* hook macros are removed here and not redefined in this
     * file; presumably qspinlock_paravirt.h supplies the real ones.
     */
386 #if !defined(_GEN_PV_LOCK_SLOWPATH) && defined(CONFIG_PARAVIRT_SPINLOCKS)
387 #define _GEN_PV_LOCK_SLOWPATH
388 
389 #undef  pv_enabled
390 #define pv_enabled()	true
391 
392 #undef pv_init_node
393 #undef pv_wait_node
394 #undef pv_kick_node
395 #undef pv_wait_head_or_lock
396 
397 #undef  queued_spin_lock_slowpath
398 #define queued_spin_lock_slowpath	__pv_queued_spin_lock_slowpath
399 
400 #include "qspinlock_paravirt.h"
401 #include "qspinlock.c"
402 
/*
 * Set to true when "nopvspin" is given as an early parameter. The flag is
 * only written in this file; its readers are elsewhere.
 */
403 bool nopvspin;
/*
 * parse_nopvspin - handler for the "nopvspin" early parameter
 * @arg: parameter value; not used, the presence of the parameter is enough
 *
 * Sets nopvspin and always returns 0.
 */
404 static __init int parse_nopvspin(char *arg)
405 {
406 	nopvspin = true;
407 	return 0;
408 }
409 early_param("nopvspin", parse_nopvspin);
410 #endif
411