xref: /freebsd/sys/kern/kern_rwlock.c (revision 9336e0699bda8a301cd2bfa37106b6ec5e32012e)
1 /*-
2  * Copyright (c) 2006 John Baldwin <jhb@FreeBSD.org>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of the author nor the names of any co-contributors
14  *    may be used to endorse or promote products derived from this software
15  *    without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
21  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27  * SUCH DAMAGE.
28  */
29 
30 /*
31  * Machine independent bits of reader/writer lock implementation.
32  */
33 
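/*
 * Consumers reach this code through the rw(9) KPI.  A minimal,
 * illustrative sketch of typical use (the "example_lock" name is
 * hypothetical and not part of this file):
 *
 *	struct rwlock example_lock;
 *
 *	rw_init(&example_lock, "example");
 *	rw_rlock(&example_lock);
 *	... read shared data ...
 *	rw_runlock(&example_lock);
 *	rw_wlock(&example_lock);
 *	... modify shared data ...
 *	rw_wunlock(&example_lock);
 *	rw_destroy(&example_lock);
 */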
34 #include <sys/cdefs.h>
35 __FBSDID("$FreeBSD$");
36 
37 #include "opt_ddb.h"
38 #include "opt_no_adaptive_rwlocks.h"
39 
40 #include <sys/param.h>
41 #include <sys/ktr.h>
42 #include <sys/lock.h>
43 #include <sys/mutex.h>
44 #include <sys/proc.h>
45 #include <sys/rwlock.h>
46 #include <sys/systm.h>
47 #include <sys/turnstile.h>
48 
49 #include <machine/cpu.h>
50 
51 CTASSERT((RW_RECURSE & LO_CLASSFLAGS) == RW_RECURSE);
52 
53 #if defined(SMP) && !defined(NO_ADAPTIVE_RWLOCKS)
54 #define	ADAPTIVE_RWLOCKS
55 #endif
56 
57 #ifdef DDB
58 #include <ddb/ddb.h>
59 
60 static void	db_show_rwlock(struct lock_object *lock);
61 #endif
62 static void	assert_rw(struct lock_object *lock, int what);
63 static void	lock_rw(struct lock_object *lock, int how);
64 static int	unlock_rw(struct lock_object *lock);
65 
66 struct lock_class lock_class_rw = {
67 	.lc_name = "rw",
68 	.lc_flags = LC_SLEEPLOCK | LC_RECURSABLE | LC_UPGRADABLE,
69 	.lc_assert = assert_rw,
70 #ifdef DDB
71 	.lc_ddb_show = db_show_rwlock,
72 #endif
73 	.lc_lock = lock_rw,
74 	.lc_unlock = unlock_rw,
75 };
76 
77 /*
78  * Return a pointer to the owning thread if the lock is write-locked or
79  * NULL if the lock is unlocked or read-locked.
80  */
81 #define	rw_wowner(rw)							\
82 	((rw)->rw_lock & RW_LOCK_READ ? NULL :				\
83 	    (struct thread *)RW_OWNER((rw)->rw_lock))
84 
85 /*
86  * Returns true if the write owner is recursed.  Write ownership is not
87  * assured here and should be checked by the caller beforehand.
88  */
89 #define	rw_recursed(rw)		((rw)->rw_recurse != 0)
90 
91 /*
92  * Return true if curthread holds the lock.
93  */
94 #define	rw_wlocked(rw)		(rw_wowner((rw)) == curthread)
95 
96 /*
97  * Return a pointer to the thread that owns this lock and should receive
98  * any priority lent by threads that block on this lock.  Currently this
99  * is identical to rw_wowner().
100  */
101 #define	rw_owner(rw)		rw_wowner(rw)
102 
103 #ifndef INVARIANTS
104 #define	_rw_assert(rw, what, file, line)
105 #endif
106 
107 void
108 assert_rw(struct lock_object *lock, int what)
109 {
110 
111 	rw_assert((struct rwlock *)lock, what);
112 }
113 
114 void
115 lock_rw(struct lock_object *lock, int how)
116 {
117 	struct rwlock *rw;
118 
119 	rw = (struct rwlock *)lock;
120 	if (how)
121 		rw_wlock(rw);
122 	else
123 		rw_rlock(rw);
124 }
125 
126 int
127 unlock_rw(struct lock_object *lock)
128 {
129 	struct rwlock *rw;
130 
131 	rw = (struct rwlock *)lock;
132 	rw_assert(rw, RA_LOCKED | LA_NOTRECURSED);
133 	if (rw->rw_lock & RW_LOCK_READ) {
134 		rw_runlock(rw);
135 		return (0);
136 	} else {
137 		rw_wunlock(rw);
138 		return (1);
139 	}
140 }
141 
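/*
 * Initialize a reader/writer lock.  Translate the caller's RW_* option
 * flags into the matching LO_* flags for the embedded lock object and
 * start the lock out unlocked and unrecursed.
 */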
142 void
143 rw_init_flags(struct rwlock *rw, const char *name, int opts)
144 {
145 	int flags;
146 
147 	MPASS((opts & ~(RW_DUPOK | RW_NOPROFILE | RW_NOWITNESS | RW_QUIET |
148 	    RW_RECURSE)) == 0);
149 
150 	flags = LO_UPGRADABLE | LO_RECURSABLE;
151 	if (opts & RW_DUPOK)
152 		flags |= LO_DUPOK;
153 	if (opts & RW_NOPROFILE)
154 		flags |= LO_NOPROFILE;
155 	if (!(opts & RW_NOWITNESS))
156 		flags |= LO_WITNESS;
157 	if (opts & RW_QUIET)
158 		flags |= LO_QUIET;
159 	flags |= opts & RW_RECURSE;
160 
161 	rw->rw_lock = RW_UNLOCKED;
162 	rw->rw_recurse = 0;
163 	lock_init(&rw->lock_object, &lock_class_rw, name, NULL, flags);
164 }
165 
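/*
 * Destroy a lock that is no longer in use.  The lock must be unlocked
 * and not recursed; it is marked RW_DESTROYED so that any later use
 * trips the KASSERTs in the lock routines.
 */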
166 void
167 rw_destroy(struct rwlock *rw)
168 {
169 
170 	KASSERT(rw->rw_lock == RW_UNLOCKED, ("rw lock not unlocked"));
171 	KASSERT(rw->rw_recurse == 0, ("rw lock still recursed"));
172 	rw->rw_lock = RW_DESTROYED;
173 	lock_destroy(&rw->lock_object);
174 }
175 
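/*
 * SYSINIT handler used to initialize a statically declared lock from a
 * struct rw_args descriptor.
 */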
176 void
177 rw_sysinit(void *arg)
178 {
179 	struct rw_args *args = arg;
180 
181 	rw_init(args->ra_rw, args->ra_desc);
182 }
183 
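/*
 * Report whether the current thread holds the write lock.
 */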
184 int
185 rw_wowned(struct rwlock *rw)
186 {
187 
188 	return (rw_wowner(rw) == curthread);
189 }
190 
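/*
 * Acquire an exclusive (write) lock, performing the WITNESS and KTR
 * bookkeeping around the fast path in __rw_wlock().  Contention is
 * handled in _rw_wlock_hard() below.
 */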
191 void
192 _rw_wlock(struct rwlock *rw, const char *file, int line)
193 {
194 
195 	MPASS(curthread != NULL);
196 	KASSERT(rw->rw_lock != RW_DESTROYED,
197 	    ("rw_wlock() of destroyed rwlock @ %s:%d", file, line));
198 	WITNESS_CHECKORDER(&rw->lock_object, LOP_NEWORDER | LOP_EXCLUSIVE, file,
199 	    line);
200 	__rw_wlock(rw, curthread, file, line);
201 	LOCK_LOG_LOCK("WLOCK", &rw->lock_object, 0, rw->rw_recurse, file, line);
202 	WITNESS_LOCK(&rw->lock_object, LOP_EXCLUSIVE, file, line);
203 	curthread->td_locks++;
204 }
205 
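/*
 * Release a write lock, reversing the bookkeeping done in _rw_wlock().
 * The slow path for waking up waiters is in _rw_wunlock_hard() below.
 */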
206 void
207 _rw_wunlock(struct rwlock *rw, const char *file, int line)
208 {
209 
210 	MPASS(curthread != NULL);
211 	KASSERT(rw->rw_lock != RW_DESTROYED,
212 	    ("rw_wunlock() of destroyed rwlock @ %s:%d", file, line));
213 	_rw_assert(rw, RA_WLOCKED, file, line);
214 	curthread->td_locks--;
215 	WITNESS_UNLOCK(&rw->lock_object, LOP_EXCLUSIVE, file, line);
216 	LOCK_LOG_LOCK("WUNLOCK", &rw->lock_object, 0, rw->rw_recurse, file,
217 	    line);
218 	if (!rw_recursed(rw))
219 		lock_profile_release_lock(&rw->lock_object);
220 	__rw_wunlock(rw, curthread, file, line);
221 }
222 
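/*
 * Acquire a shared (read) lock.  Multiple readers may hold the lock at
 * once; a reader only spins or blocks while a writer owns the lock.
 */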
223 void
224 _rw_rlock(struct rwlock *rw, const char *file, int line)
225 {
226 	struct turnstile *ts;
227 #ifdef ADAPTIVE_RWLOCKS
228 	volatile struct thread *owner;
229 #endif
230 	uint64_t waittime = 0;
231 	int contested = 0;
232 	uintptr_t x;
233 
234 	KASSERT(rw->rw_lock != RW_DESTROYED,
235 	    ("rw_rlock() of destroyed rwlock @ %s:%d", file, line));
236 	KASSERT(rw_wowner(rw) != curthread,
237 	    ("%s (%s): wlock already held @ %s:%d", __func__,
238 	    rw->lock_object.lo_name, file, line));
239 	WITNESS_CHECKORDER(&rw->lock_object, LOP_NEWORDER, file, line);
240 
241 	/*
242 	 * Note that we don't make any attempt to try to block read
243 	 * locks once a writer has blocked on the lock.  The reason is
244 	 * that we currently allow for read locks to recurse and we
245 	 * don't keep track of all the holders of read locks.  Thus, if
246 	 * we were to block readers once a writer blocked and a reader
247 	 * tried to recurse on its read lock after a writer had
248 	 * blocked, we would end up in a deadlock since the reader would
249 	 * be blocked on the writer, and the writer would be blocked
250 	 * waiting for the reader to release its original read lock.
251 	 */
252 	for (;;) {
253 		/*
254 		 * Handle the easy case.  If no other thread has a write
255 		 * lock, then try to bump up the count of read locks.  Note
256 		 * that we have to preserve the current state of the
257 		 * RW_LOCK_WRITE_WAITERS flag.  If we fail to acquire a
258 		 * read lock, then rw_lock must have changed, so restart
259 		 * the loop.  Note that this handles the case of a
260 		 * completely unlocked rwlock since such a lock is encoded
261 		 * as a read lock with no waiters.
262 		 */
263 		x = rw->rw_lock;
264 		if (x & RW_LOCK_READ) {
265 
266 			/*
267 			 * The RW_LOCK_READ_WAITERS flag should only be set
268 			 * if another thread currently holds a write lock,
269 			 * and in that case RW_LOCK_READ should be clear.
270 			 */
271 			MPASS((x & RW_LOCK_READ_WAITERS) == 0);
272 			if (atomic_cmpset_acq_ptr(&rw->rw_lock, x,
273 			    x + RW_ONE_READER)) {
274 				if (LOCK_LOG_TEST(&rw->lock_object, 0))
275 					CTR4(KTR_LOCK,
276 					    "%s: %p succeed %p -> %p", __func__,
277 					    rw, (void *)x,
278 					    (void *)(x + RW_ONE_READER));
279 				break;
280 			}
281 			cpu_spinwait();
282 			continue;
283 		}
284 		lock_profile_obtain_lock_failed(&rw->lock_object,
285 		    &contested, &waittime);
286 
287 #ifdef ADAPTIVE_RWLOCKS
288 		/*
289 		 * If the owner is running on another CPU, spin until
290 		 * the owner stops running or the state of the lock
291 		 * changes.
292 		 */
293 		owner = (struct thread *)RW_OWNER(x);
294 		if (TD_IS_RUNNING(owner)) {
295 			if (LOCK_LOG_TEST(&rw->lock_object, 0))
296 				CTR3(KTR_LOCK, "%s: spinning on %p held by %p",
297 				    __func__, rw, owner);
298 			while ((struct thread*)RW_OWNER(rw->rw_lock) == owner &&
299 			    TD_IS_RUNNING(owner))
300 				cpu_spinwait();
301 			continue;
302 		}
303 #endif
304 
305 		/*
306 		 * Okay, now it's the hard case.  Some other thread already
307 		 * has a write lock, so acquire the turnstile lock so we can
308 		 * begin the process of blocking.
309 		 */
310 		ts = turnstile_trywait(&rw->lock_object);
311 
312 		/*
313 		 * The lock might have been released while we spun, so
314 		 * recheck its state and restart the loop if there is no
315 		 * longer a write lock.
316 		 */
317 		x = rw->rw_lock;
318 		if (x & RW_LOCK_READ) {
319 			turnstile_cancel(ts);
320 			cpu_spinwait();
321 			continue;
322 		}
323 
324 #ifdef ADAPTIVE_RWLOCKS
325 		/*
326 		 * If the current owner of the lock is executing on another
327 		 * CPU, quit the hard path and try to spin.
328 		 */
329 		owner = (struct thread *)RW_OWNER(x);
330 		if (TD_IS_RUNNING(owner)) {
331 			turnstile_cancel(ts);
332 			cpu_spinwait();
333 			continue;
334 		}
335 #endif
336 
337 		/*
338 		 * Ok, it's still a write lock.  If the RW_LOCK_READ_WAITERS
339 		 * flag is already set, then we can go ahead and block.  If
340 		 * it is not set, then try to set it.  If we fail to set it,
341 		 * drop the turnstile lock and restart the loop.
342 		 */
343 		if (!(x & RW_LOCK_READ_WAITERS)) {
344 			if (!atomic_cmpset_ptr(&rw->rw_lock, x,
345 			    x | RW_LOCK_READ_WAITERS)) {
346 				turnstile_cancel(ts);
347 				cpu_spinwait();
348 				continue;
349 			}
350 			if (LOCK_LOG_TEST(&rw->lock_object, 0))
351 				CTR2(KTR_LOCK, "%s: %p set read waiters flag",
352 				    __func__, rw);
353 		}
354 
355 		/*
356 		 * We were unable to acquire the lock and the read waiters
357 		 * flag is set, so we must block on the turnstile.
358 		 */
359 		if (LOCK_LOG_TEST(&rw->lock_object, 0))
360 			CTR2(KTR_LOCK, "%s: %p blocking on turnstile", __func__,
361 			    rw);
362 		turnstile_wait(ts, rw_owner(rw), TS_SHARED_QUEUE);
363 		if (LOCK_LOG_TEST(&rw->lock_object, 0))
364 			CTR2(KTR_LOCK, "%s: %p resuming from turnstile",
365 			    __func__, rw);
366 	}
367 
368 	/*
369 	 * TODO: acquire "owner of record" here.  Here be turnstile dragons
370 	 * however.  turnstiles don't like owners changing between calls to
371 	 * turnstile_wait() currently.
372 	 */
373 	lock_profile_obtain_lock_success(&rw->lock_object, contested,
374 	    waittime, file, line);
375 	LOCK_LOG_LOCK("RLOCK", &rw->lock_object, 0, 0, file, line);
376 	WITNESS_LOCK(&rw->lock_object, 0, file, line);
377 	curthread->td_locks++;
378 }
379 
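/*
 * Release a read lock.  The thread that drops the last read lock is
 * responsible for waking up any waiting writers.
 */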
380 void
381 _rw_runlock(struct rwlock *rw, const char *file, int line)
382 {
383 	struct turnstile *ts;
384 	uintptr_t x;
385 
386 	KASSERT(rw->rw_lock != RW_DESTROYED,
387 	    ("rw_runlock() of destroyed rwlock @ %s:%d", file, line));
388 	_rw_assert(rw, RA_RLOCKED, file, line);
389 	curthread->td_locks--;
390 	WITNESS_UNLOCK(&rw->lock_object, 0, file, line);
391 	LOCK_LOG_LOCK("RUNLOCK", &rw->lock_object, 0, 0, file, line);
392 
393 	/* TODO: drop "owner of record" here. */
394 
395 	for (;;) {
396 		/*
397 		 * See if there is more than one read lock held.  If so,
398 		 * just drop one and return.
399 		 */
400 		x = rw->rw_lock;
401 		if (RW_READERS(x) > 1) {
402 			if (atomic_cmpset_ptr(&rw->rw_lock, x,
403 			    x - RW_ONE_READER)) {
404 				if (LOCK_LOG_TEST(&rw->lock_object, 0))
405 					CTR4(KTR_LOCK,
406 					    "%s: %p succeeded %p -> %p",
407 					    __func__, rw, (void *)x,
408 					    (void *)(x - RW_ONE_READER));
409 				break;
410 			}
411 			continue;
412 		}
413 
414 
415 		/*
416 		 * We should never have read waiters while at least one
417 		 * thread holds a read lock.  (See note above)
418 		 */
419 		KASSERT(!(x & RW_LOCK_READ_WAITERS),
420 		    ("%s: waiting readers", __func__));
421 
422 		/*
423 		 * If there aren't any waiters for a write lock, then try
424 		 * to drop it quickly.
425 		 */
426 		if (!(x & RW_LOCK_WRITE_WAITERS)) {
427 
428 			/*
429 			 * There shouldn't be any flags set and we should
430 			 * be the only read lock.  If we fail to release
431 			 * hold the only read lock.  If we fail to release
432 			 * have just acquired a read lock, so go back up
433 			 * to the multiple read locks case.
434 			 */
435 			MPASS(x == RW_READERS_LOCK(1));
436 			if (atomic_cmpset_ptr(&rw->rw_lock, RW_READERS_LOCK(1),
437 			    RW_UNLOCKED)) {
438 				if (LOCK_LOG_TEST(&rw->lock_object, 0))
439 					CTR2(KTR_LOCK, "%s: %p last succeeded",
440 					    __func__, rw);
441 				break;
442 			}
443 			continue;
444 		}
445 
446 		/*
447 		 * There should just be one reader with one or more
448 		 * writers waiting.
449 		 */
450 		MPASS(x == (RW_READERS_LOCK(1) | RW_LOCK_WRITE_WAITERS));
451 
452 		/*
453 		 * Ok, we know we have a waiting writer and we think we
454 		 * are the last reader, so grab the turnstile lock.
455 		 */
456 		turnstile_chain_lock(&rw->lock_object);
457 
458 		/*
459 		 * Try to drop our lock, leaving the lock in an unlocked
460 		 * state.
461 		 *
462 		 * If you wanted to do explicit lock handoff you'd have to
463 		 * do it here.  You'd also want to use turnstile_signal()
464 		 * and you'd have to handle the race where a higher
465 		 * priority thread blocks on the write lock before the
466 		 * thread you wake up actually runs, letting the new thread
467 		 * "steal" the lock.  For now it's a lot simpler to just
468 		 * wake up all of the waiters.
469 		 *
470 		 * As above, if we fail, then another thread might have
471 		 * acquired a read lock, so drop the turnstile lock and
472 		 * restart.
473 		 */
474 		if (!atomic_cmpset_ptr(&rw->rw_lock,
475 		    RW_READERS_LOCK(1) | RW_LOCK_WRITE_WAITERS, RW_UNLOCKED)) {
476 			turnstile_chain_unlock(&rw->lock_object);
477 			continue;
478 		}
479 		if (LOCK_LOG_TEST(&rw->lock_object, 0))
480 			CTR2(KTR_LOCK, "%s: %p last succeeded with waiters",
481 			    __func__, rw);
482 
483 		/*
484 		 * Ok.  The lock is released and all that's left is to
485 		 * wake up the waiters.  Note that the lock might not be
486 		 * free anymore, but in that case the writers will just
487 		 * block again if they run before the new lock holder(s)
488 		 * release the lock.
489 		 */
490 		ts = turnstile_lookup(&rw->lock_object);
491 		MPASS(ts != NULL);
492 		turnstile_broadcast(ts, TS_EXCLUSIVE_QUEUE);
493 		turnstile_unpend(ts, TS_SHARED_LOCK);
494 		turnstile_chain_unlock(&rw->lock_object);
495 		break;
496 	}
497 	lock_profile_release_lock(&rw->lock_object);
498 }
499 
500 /*
501  * This function is called when we are unable to obtain a write lock on the
502  * first try.  This means that at least one other thread holds either a
503  * read or write lock.
504  */
505 void
506 _rw_wlock_hard(struct rwlock *rw, uintptr_t tid, const char *file, int line)
507 {
508 	struct turnstile *ts;
509 #ifdef ADAPTIVE_RWLOCKS
510 	volatile struct thread *owner;
511 #endif
512 	uint64_t waittime = 0;
513 	uintptr_t v;
514 	int contested = 0;
515 
516 	if (rw_wlocked(rw)) {
517 		KASSERT(rw->lock_object.lo_flags & RW_RECURSE,
518 		    ("%s: recursing but non-recursive rw %s @ %s:%d\n",
519 		    __func__, rw->lock_object.lo_name, file, line));
520 		rw->rw_recurse++;
521 		atomic_set_ptr(&rw->rw_lock, RW_LOCK_RECURSED);
522 		if (LOCK_LOG_TEST(&rw->lock_object, 0))
523 			CTR2(KTR_LOCK, "%s: %p recursing", __func__, rw);
524 		return;
525 	}
526 
527 	if (LOCK_LOG_TEST(&rw->lock_object, 0))
528 		CTR5(KTR_LOCK, "%s: %s contested (lock=%p) at %s:%d", __func__,
529 		    rw->lock_object.lo_name, (void *)rw->rw_lock, file, line);
530 
531 	while (!_rw_write_lock(rw, tid)) {
532 		lock_profile_obtain_lock_failed(&rw->lock_object,
533 		    &contested, &waittime);
534 #ifdef ADAPTIVE_RWLOCKS
535 		/*
536 		 * If the lock is write locked and the owner is
537 		 * running on another CPU, spin until the owner stops
538 		 * running or the state of the lock changes.
539 		 */
540 		v = rw->rw_lock;
541 		owner = (struct thread *)RW_OWNER(v);
542 		if (!(v & RW_LOCK_READ) && TD_IS_RUNNING(owner)) {
543 			if (LOCK_LOG_TEST(&rw->lock_object, 0))
544 				CTR3(KTR_LOCK, "%s: spinning on %p held by %p",
545 				    __func__, rw, owner);
546 			while ((struct thread*)RW_OWNER(rw->rw_lock) == owner &&
547 			    TD_IS_RUNNING(owner))
548 				cpu_spinwait();
549 			continue;
550 		}
551 #endif
552 
553 		ts = turnstile_trywait(&rw->lock_object);
554 		v = rw->rw_lock;
555 
556 		/*
557 		 * If the lock was released while spinning on the
558 		 * turnstile chain lock, try again.
559 		 */
560 		if (v == RW_UNLOCKED) {
561 			turnstile_cancel(ts);
562 			cpu_spinwait();
563 			continue;
564 		}
565 
566 #ifdef ADAPTIVE_RWLOCKS
567 		/*
568 		 * If the current owner of the lock is executing on another
569 		 * CPU, quit the hard path and try to spin.
570 		 */
571 		if (!(v & RW_LOCK_READ)) {
572 			owner = (struct thread *)RW_OWNER(v);
573 			if (TD_IS_RUNNING(owner)) {
574 				turnstile_cancel(ts);
575 				cpu_spinwait();
576 				continue;
577 			}
578 		}
579 #endif
580 
581 		/*
582 		 * If the lock was released by a writer with both readers
583 		 * and writers waiting and a reader hasn't woken up and
584 		 * acquired the lock yet, rw_lock will be set to the
585 		 * value RW_UNLOCKED | RW_LOCK_WRITE_WAITERS.  If we see
586 		 * that value, try to acquire it once.  Note that we have
587 		 * to preserve the RW_LOCK_WRITE_WAITERS flag as there are
588 		 * other writers waiting still.  If we fail, restart the
589 		 * loop.
590 		 */
591 		if (v == (RW_UNLOCKED | RW_LOCK_WRITE_WAITERS)) {
592 			if (atomic_cmpset_acq_ptr(&rw->rw_lock,
593 			    RW_UNLOCKED | RW_LOCK_WRITE_WAITERS,
594 			    tid | RW_LOCK_WRITE_WAITERS)) {
595 				turnstile_claim(ts);
596 				CTR2(KTR_LOCK, "%s: %p claimed by new writer",
597 				    __func__, rw);
598 				break;
599 			}
600 			turnstile_cancel(ts);
601 			cpu_spinwait();
602 			continue;
603 		}
604 
605 		/*
606 		 * If the RW_LOCK_WRITE_WAITERS flag isn't set, then try to
607 		 * set it.  If we fail to set it, then loop back and try
608 		 * again.
609 		 */
610 		if (!(v & RW_LOCK_WRITE_WAITERS)) {
611 			if (!atomic_cmpset_ptr(&rw->rw_lock, v,
612 			    v | RW_LOCK_WRITE_WAITERS)) {
613 				turnstile_cancel(ts);
614 				cpu_spinwait();
615 				continue;
616 			}
617 			if (LOCK_LOG_TEST(&rw->lock_object, 0))
618 				CTR2(KTR_LOCK, "%s: %p set write waiters flag",
619 				    __func__, rw);
620 		}
621 
622 		/*
623 		 * We were unable to acquire the lock and the write waiters
624 		 * flag is set, so we must block on the turnstile.
625 		 */
626 		if (LOCK_LOG_TEST(&rw->lock_object, 0))
627 			CTR2(KTR_LOCK, "%s: %p blocking on turnstile", __func__,
628 			    rw);
629 		turnstile_wait(ts, rw_owner(rw), TS_EXCLUSIVE_QUEUE);
630 		if (LOCK_LOG_TEST(&rw->lock_object, 0))
631 			CTR2(KTR_LOCK, "%s: %p resuming from turnstile",
632 			    __func__, rw);
633 	}
634 	lock_profile_obtain_lock_success(&rw->lock_object, contested, waittime,
635 	    file, line);
636 }
637 
638 /*
639  * This function is called if the first try at releasing a write lock failed.
640  * This means that one of the two waiter bits must be set, indicating that
641  * at least one thread is waiting on this lock.
642  */
643 void
644 _rw_wunlock_hard(struct rwlock *rw, uintptr_t tid, const char *file, int line)
645 {
646 	struct turnstile *ts;
647 	uintptr_t v;
648 	int queue;
649 
650 	if (rw_wlocked(rw) && rw_recursed(rw)) {
651 		if ((--rw->rw_recurse) == 0)
652 			atomic_clear_ptr(&rw->rw_lock, RW_LOCK_RECURSED);
653 		if (LOCK_LOG_TEST(&rw->lock_object, 0))
654 			CTR2(KTR_LOCK, "%s: %p unrecursing", __func__, rw);
655 		return;
656 	}
657 
658 	KASSERT(rw->rw_lock & (RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS),
659 	    ("%s: neither of the waiter flags are set", __func__));
660 
661 	if (LOCK_LOG_TEST(&rw->lock_object, 0))
662 		CTR2(KTR_LOCK, "%s: %p contested", __func__, rw);
663 
664 	turnstile_chain_lock(&rw->lock_object);
665 	ts = turnstile_lookup(&rw->lock_object);
666 
667 	MPASS(ts != NULL);
668 
669 	/*
670 	 * Use the same algorithm as sx locks for now.  Prefer waking up shared
671 	 * waiters, if we have any, over writers.  This is probably not ideal.
672 	 *
673 	 * 'v' is the value we are going to write back to rw_lock.  If we
674 	 * have waiters on both queues, we need to preserve the state of
675 	 * the waiter flag for the queue we don't wake up.  For now this is
676 	 * hardcoded for the algorithm mentioned above.
677 	 *
678 	 * In the case of both readers and writers waiting we wakeup the
679 	 * readers but leave the RW_LOCK_WRITE_WAITERS flag set.  If a
680 	 * new writer comes in before a reader it will claim the lock up
681 	 * above.  There is probably a potential priority inversion in
682 	 * there that could be worked around either by waking both queues
683 	 * of waiters or doing some complicated lock handoff gymnastics.
684 	 */
685 	v = RW_UNLOCKED;
686 	if (rw->rw_lock & RW_LOCK_READ_WAITERS) {
687 		queue = TS_SHARED_QUEUE;
688 		v |= (rw->rw_lock & RW_LOCK_WRITE_WAITERS);
689 	} else
690 		queue = TS_EXCLUSIVE_QUEUE;
691 
692 	/* Wake up all waiters for the specific queue. */
693 	if (LOCK_LOG_TEST(&rw->lock_object, 0))
694 		CTR3(KTR_LOCK, "%s: %p waking up %s waiters", __func__, rw,
695 		    queue == TS_SHARED_QUEUE ? "read" : "write");
696 	turnstile_broadcast(ts, queue);
697 	atomic_store_rel_ptr(&rw->rw_lock, v);
698 	turnstile_unpend(ts, TS_EXCLUSIVE_LOCK);
699 	turnstile_chain_unlock(&rw->lock_object);
700 }
701 
702 /*
703  * Attempt to do a non-blocking upgrade from a read lock to a write
704  * lock.  This will only succeed if this thread holds a single read
705  * lock.  Returns true if the upgrade succeeded and false otherwise.
706  */
707 int
708 _rw_try_upgrade(struct rwlock *rw, const char *file, int line)
709 {
710 	uintptr_t v, tid;
711 	struct turnstile *ts;
712 	int success;
713 
714 	KASSERT(rw->rw_lock != RW_DESTROYED,
715 	    ("rw_try_upgrade() of destroyed rwlock @ %s:%d", file, line));
716 	_rw_assert(rw, RA_RLOCKED, file, line);
717 
718 	/*
719 	 * Attempt to switch from one reader to a writer.  If there
720 	 * are any write waiters, then we will have to lock the
721 	 * turnstile first to prevent races with another writer
722 	 * calling turnstile_wait() before we have claimed this
723 	 * turnstile.  So, do the simple case of no waiters first.
724 	 */
725 	tid = (uintptr_t)curthread;
726 	if (!(rw->rw_lock & RW_LOCK_WRITE_WAITERS)) {
727 		success = atomic_cmpset_ptr(&rw->rw_lock, RW_READERS_LOCK(1),
728 		    tid);
729 		goto out;
730 	}
731 
732 	/*
733 	 * Ok, we think we have write waiters, so lock the
734 	 * turnstile.
735 	 */
736 	ts = turnstile_trywait(&rw->lock_object);
737 
738 	/*
739 	 * Try to switch from one reader to a writer again.  This time
740 	 * we honor the current state of the RW_LOCK_WRITE_WAITERS
741 	 * flag.  If we obtain the lock with the flag set, then claim
742 	 * ownership of the turnstile.
743 	 */
744 	v = rw->rw_lock & RW_LOCK_WRITE_WAITERS;
745 	success = atomic_cmpset_ptr(&rw->rw_lock, RW_READERS_LOCK(1) | v,
746 	    tid | v);
747 	if (success && v)
748 		turnstile_claim(ts);
749 	else
750 		turnstile_cancel(ts);
751 out:
752 	LOCK_LOG_TRY("WUPGRADE", &rw->lock_object, 0, success, file, line);
753 	if (success)
754 		WITNESS_UPGRADE(&rw->lock_object, LOP_EXCLUSIVE | LOP_TRYLOCK,
755 		    file, line);
756 	return (success);
757 }
758 
759 /*
760  * Downgrade a write lock into a single read lock.
761  */
762 void
763 _rw_downgrade(struct rwlock *rw, const char *file, int line)
764 {
765 	struct turnstile *ts;
766 	uintptr_t tid, v;
767 
768 	KASSERT(rw->rw_lock != RW_DESTROYED,
769 	    ("rw_downgrade() of destroyed rwlock @ %s:%d", file, line));
770 	_rw_assert(rw, RA_WLOCKED | RA_NOTRECURSED, file, line);
771 #ifndef INVARIANTS
772 	if (rw_recursed(rw))
773 		panic("downgrade of a recursed lock");
774 #endif
775 
776 	WITNESS_DOWNGRADE(&rw->lock_object, 0, file, line);
777 
778 	/*
779 	 * Convert from a writer to a single reader.  First we handle
780 	 * the easy case with no waiters.  If there are any waiters, we
781 	 * lock the turnstile, "disown" the lock, and awaken any read
782 	 * waiters.
783 	 */
784 	tid = (uintptr_t)curthread;
785 	if (atomic_cmpset_rel_ptr(&rw->rw_lock, tid, RW_READERS_LOCK(1)))
786 		goto out;
787 
788 	/*
789 	 * Ok, we think we have waiters, so lock the turnstile so we can
790 	 * read the waiter flags without any races.
791 	 */
792 	turnstile_chain_lock(&rw->lock_object);
793 	v = rw->rw_lock;
794 	MPASS(v & (RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS));
795 
796 	/*
797 	 * Downgrade from a write lock while preserving
798 	 * RW_LOCK_WRITE_WAITERS and give up ownership of the
799 	 * turnstile.  If there are any read waiters, wake them up.
800 	 */
801 	ts = turnstile_lookup(&rw->lock_object);
802 	MPASS(ts != NULL);
803 	if (v & RW_LOCK_READ_WAITERS)
804 		turnstile_broadcast(ts, TS_SHARED_QUEUE);
805 	atomic_store_rel_ptr(&rw->rw_lock, RW_READERS_LOCK(1) |
806 	    (v & RW_LOCK_WRITE_WAITERS));
807 	if (v & RW_LOCK_READ_WAITERS)
808 		turnstile_unpend(ts, TS_EXCLUSIVE_LOCK);
809 	else if (ts)
810 		turnstile_disown(ts);
811 	turnstile_chain_unlock(&rw->lock_object);
812 out:
813 	LOCK_LOG_LOCK("WDOWNGRADE", &rw->lock_object, 0, 0, file, line);
814 }
815 
816 #ifdef INVARIANT_SUPPORT
817 #ifndef INVARIANTS
818 #undef _rw_assert
819 #endif
820 
821 /*
822  * In the non-WITNESS case, rw_assert() can only detect that at least
823  * *some* thread owns an rlock, but it cannot guarantee that *this*
824  * thread owns an rlock.
825  */
826 void
827 _rw_assert(struct rwlock *rw, int what, const char *file, int line)
828 {
829 
830 	if (panicstr != NULL)
831 		return;
832 	switch (what) {
833 	case RA_LOCKED:
834 	case RA_LOCKED | RA_RECURSED:
835 	case RA_LOCKED | RA_NOTRECURSED:
836 	case RA_RLOCKED:
837 #ifdef WITNESS
838 		witness_assert(&rw->lock_object, what, file, line);
839 #else
840 		/*
841 		 * If some other thread has a write lock or we have one
842 		 * and are asserting a read lock, fail.  Also, if no one
843 		 * has a lock at all, fail.
844 		 */
845 		if (rw->rw_lock == RW_UNLOCKED ||
846 		    (!(rw->rw_lock & RW_LOCK_READ) && (what == RA_RLOCKED ||
847 		    rw_wowner(rw) != curthread)))
848 			panic("Lock %s not %slocked @ %s:%d\n",
849 			    rw->lock_object.lo_name, (what == RA_RLOCKED) ?
850 			    "read " : "", file, line);
851 
852 		if (!(rw->rw_lock & RW_LOCK_READ)) {
853 			if (rw_recursed(rw)) {
854 				if (what & RA_NOTRECURSED)
855 					panic("Lock %s recursed @ %s:%d\n",
856 					    rw->lock_object.lo_name, file,
857 					    line);
858 			} else if (what & RA_RECURSED)
859 				panic("Lock %s not recursed @ %s:%d\n",
860 				    rw->lock_object.lo_name, file, line);
861 		}
862 #endif
863 		break;
864 	case RA_WLOCKED:
865 	case RA_WLOCKED | RA_RECURSED:
866 	case RA_WLOCKED | RA_NOTRECURSED:
867 		if (rw_wowner(rw) != curthread)
868 			panic("Lock %s not exclusively locked @ %s:%d\n",
869 			    rw->lock_object.lo_name, file, line);
870 		if (rw_recursed(rw)) {
871 			if (what & RA_NOTRECURSED)
872 				panic("Lock %s recursed @ %s:%d\n",
873 				    rw->lock_object.lo_name, file, line);
874 		} else if (what & RA_RECURSED)
875 			panic("Lock %s not recursed @ %s:%d\n",
876 			    rw->lock_object.lo_name, file, line);
877 		break;
878 	case RA_UNLOCKED:
879 #ifdef WITNESS
880 		witness_assert(&rw->lock_object, what, file, line);
881 #else
882 		/*
883 		 * If we hold a write lock, fail.  We can't reliably check
884 		 * to see if we hold a read lock or not.
885 		 */
886 		if (rw_wowner(rw) == curthread)
887 			panic("Lock %s exclusively locked @ %s:%d\n",
888 			    rw->lock_object.lo_name, file, line);
889 #endif
890 		break;
891 	default:
892 		panic("Unknown rw lock assertion: %d @ %s:%d", what, file,
893 		    line);
894 	}
895 }
896 #endif /* INVARIANT_SUPPORT */
897 
898 #ifdef DDB
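/*
 * DDB: print the state of an rwlock (owner or reader count, recursion,
 * and which classes of waiters are queued).
 */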
899 void
900 db_show_rwlock(struct lock_object *lock)
901 {
902 	struct rwlock *rw;
903 	struct thread *td;
904 
905 	rw = (struct rwlock *)lock;
906 
907 	db_printf(" state: ");
908 	if (rw->rw_lock == RW_UNLOCKED)
909 		db_printf("UNLOCKED\n");
910 	else if (rw->rw_lock == RW_DESTROYED) {
911 		db_printf("DESTROYED\n");
912 		return;
913 	} else if (rw->rw_lock & RW_LOCK_READ)
914 		db_printf("RLOCK: %ju locks\n",
915 		    (uintmax_t)(RW_READERS(rw->rw_lock)));
916 	else {
917 		td = rw_wowner(rw);
918 		db_printf("WLOCK: %p (tid %d, pid %d, \"%s\")\n", td,
919 		    td->td_tid, td->td_proc->p_pid, td->td_name);
920 		if (rw_recursed(rw))
921 			db_printf(" recursed: %u\n", rw->rw_recurse);
922 	}
923 	db_printf(" waiters: ");
924 	switch (rw->rw_lock & (RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS)) {
925 	case RW_LOCK_READ_WAITERS:
926 		db_printf("readers\n");
927 		break;
928 	case RW_LOCK_WRITE_WAITERS:
929 		db_printf("writers\n");
930 		break;
931 	case RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS:
932 		db_printf("readers and writers\n");
933 		break;
934 	default:
935 		db_printf("none\n");
936 		break;
937 	}
938 }
939 
940 #endif
941