/*-
 * Copyright (c) 2006 John Baldwin <jhb@FreeBSD.org>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the name of the author nor the names of any co-contributors
 *    may be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

/*
 * Machine independent bits of reader/writer lock implementation.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include "opt_ddb.h"
#include "opt_no_adaptive_rwlocks.h"

#include <sys/param.h>
#include <sys/ktr.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/rwlock.h>
#include <sys/systm.h>
#include <sys/turnstile.h>

#include <machine/cpu.h>

CTASSERT((RW_RECURSE & LO_CLASSFLAGS) == RW_RECURSE);

#if defined(SMP) && !defined(NO_ADAPTIVE_RWLOCKS)
#define	ADAPTIVE_RWLOCKS
#endif

#ifdef DDB
#include <ddb/ddb.h>

static void	db_show_rwlock(struct lock_object *lock);
#endif
static void	assert_rw(struct lock_object *lock, int what);
static void	lock_rw(struct lock_object *lock, int how);
static int	unlock_rw(struct lock_object *lock);

struct lock_class lock_class_rw = {
	.lc_name = "rw",
	.lc_flags = LC_SLEEPLOCK | LC_RECURSABLE | LC_UPGRADABLE,
	.lc_assert = assert_rw,
#ifdef DDB
	.lc_ddb_show = db_show_rwlock,
#endif
	.lc_lock = lock_rw,
	.lc_unlock = unlock_rw,
};

/*
 * Return a pointer to the owning thread if the lock is write-locked or
 * NULL if the lock is unlocked or read-locked.
 */
#define	rw_wowner(rw)							\
	((rw)->rw_lock & RW_LOCK_READ ? NULL :				\
	    (struct thread *)RW_OWNER((rw)->rw_lock))

/*
 * Returns true if a write owner is recursed.  Write ownership is not
 * assured here and should be checked by the caller beforehand.
 */
#define	rw_recursed(rw)		((rw)->rw_recurse != 0)

/*
 * Return true if curthread holds the lock.
 */
#define	rw_wlocked(rw)		(rw_wowner((rw)) == curthread)

/*
 * Return a pointer to the owning thread for this lock who should receive
 * any priority lent by threads that block on this lock.  Currently this
 * is identical to rw_wowner().
 */
#define	rw_owner(rw)		rw_wowner(rw)
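
/*
 * Editor's illustration (a sketch, not part of the original file): the
 * single rw_lock word encodes the whole lock state, which is what the
 * macros above decode.  For example:
 *
 *	RW_UNLOCKED			unlocked; encoded as a read lock
 *					with zero readers and no waiters
 *	RW_READERS_LOCK(3)		read-locked by three readers
 *	(uintptr_t)td			write-locked by thread td
 *	(uintptr_t)td |
 *	    RW_LOCK_READ_WAITERS	write-locked with readers blocked
 *
 * The authoritative bit definitions live in <sys/rwlock.h>; the macros
 * here only rely on RW_LOCK_READ and RW_OWNER() to decode the word.
 */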

#ifndef INVARIANTS
#define	_rw_assert(rw, what, file, line)
#endif

void
assert_rw(struct lock_object *lock, int what)
{

	rw_assert((struct rwlock *)lock, what);
}

void
lock_rw(struct lock_object *lock, int how)
{
	struct rwlock *rw;

	rw = (struct rwlock *)lock;
	if (how)
		rw_wlock(rw);
	else
		rw_rlock(rw);
}

int
unlock_rw(struct lock_object *lock)
{
	struct rwlock *rw;

	rw = (struct rwlock *)lock;
	rw_assert(rw, RA_LOCKED | LA_NOTRECURSED);
	if (rw->rw_lock & RW_LOCK_READ) {
		rw_runlock(rw);
		return (0);
	} else {
		rw_wunlock(rw);
		return (1);
	}
}
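
/*
 * Editor's note (illustrative sketch, not part of the original file):
 * lock_rw() and unlock_rw() are not called directly by consumers; they
 * back the generic lock_class interface so that code which must drop
 * and later reacquire an arbitrary sleepable lock can do so without
 * knowing its type, roughly:
 *
 *	struct lock_class *lc = LOCK_CLASS(lock);
 *	int how;
 *
 *	how = lc->lc_unlock(lock);	// remembers read (0) vs. write (1)
 *	...
 *	lc->lc_lock(lock, how);		// reacquires in the same mode
 *
 * LOCK_CLASS() here is assumed to come from <sys/lock.h>.
 */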

void
rw_init_flags(struct rwlock *rw, const char *name, int opts)
{
	int flags;

	MPASS((opts & ~(RW_DUPOK | RW_NOPROFILE | RW_NOWITNESS | RW_QUIET |
	    RW_RECURSE)) == 0);

	flags = LO_UPGRADABLE | LO_RECURSABLE;
	if (opts & RW_DUPOK)
		flags |= LO_DUPOK;
	if (opts & RW_NOPROFILE)
		flags |= LO_NOPROFILE;
	if (!(opts & RW_NOWITNESS))
		flags |= LO_WITNESS;
	if (opts & RW_QUIET)
		flags |= LO_QUIET;
	flags |= opts & RW_RECURSE;

	rw->rw_lock = RW_UNLOCKED;
	rw->rw_recurse = 0;
	lock_init(&rw->lock_object, &lock_class_rw, name, NULL, flags);
}
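
/*
 * Editor's note (illustrative usage sketch, not part of the original
 * file; the "foo" names are hypothetical):
 *
 *	static struct rwlock foo_lock;
 *
 *	rw_init(&foo_lock, "foo data");
 *
 *	rw_rlock(&foo_lock);		// many readers may hold this at once
 *	// ... read foo's shared state ...
 *	rw_runlock(&foo_lock);
 *
 *	rw_wlock(&foo_lock);		// single exclusive writer
 *	// ... modify foo's shared state ...
 *	rw_wunlock(&foo_lock);
 *
 *	rw_destroy(&foo_lock);
 *
 * rw_init() is the common wrapper; rw_init_flags() additionally accepts
 * the RW_* option flags validated above.
 */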

void
rw_destroy(struct rwlock *rw)
{

	KASSERT(rw->rw_lock == RW_UNLOCKED, ("rw lock not unlocked"));
	KASSERT(rw->rw_recurse == 0, ("rw lock still recursed"));
	rw->rw_lock = RW_DESTROYED;
	lock_destroy(&rw->lock_object);
}

void
rw_sysinit(void *arg)
{
	struct rw_args *args = arg;

	rw_init(args->ra_rw, args->ra_desc);
}

int
rw_wowned(struct rwlock *rw)
{

	return (rw_wowner(rw) == curthread);
}

void
_rw_wlock(struct rwlock *rw, const char *file, int line)
{

	MPASS(curthread != NULL);
	KASSERT(rw->rw_lock != RW_DESTROYED,
	    ("rw_wlock() of destroyed rwlock @ %s:%d", file, line));
	WITNESS_CHECKORDER(&rw->lock_object, LOP_NEWORDER | LOP_EXCLUSIVE, file,
	    line);
	__rw_wlock(rw, curthread, file, line);
	LOCK_LOG_LOCK("WLOCK", &rw->lock_object, 0, rw->rw_recurse, file, line);
	WITNESS_LOCK(&rw->lock_object, LOP_EXCLUSIVE, file, line);
	curthread->td_locks++;
}
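
/*
 * Editor's note (paraphrased sketch, not from this file): __rw_wlock()
 * used above is the inline fast path from <sys/rwlock.h>.  Roughly, it
 * attempts a single atomic compare-and-set of rw_lock from RW_UNLOCKED
 * to the caller's thread id and only falls back to the contested path
 * when that fails:
 *
 *	if (!_rw_write_lock(rw, tid))	// cmpset RW_UNLOCKED -> tid
 *		_rw_wlock_hard(rw, tid, file, line);
 *
 * __rw_wunlock() below is the symmetric release path, falling back to
 * _rw_wunlock_hard() when the fast cmpset back to RW_UNLOCKED fails.
 */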

void
_rw_wunlock(struct rwlock *rw, const char *file, int line)
{

	MPASS(curthread != NULL);
	KASSERT(rw->rw_lock != RW_DESTROYED,
	    ("rw_wunlock() of destroyed rwlock @ %s:%d", file, line));
	_rw_assert(rw, RA_WLOCKED, file, line);
	curthread->td_locks--;
	WITNESS_UNLOCK(&rw->lock_object, LOP_EXCLUSIVE, file, line);
	LOCK_LOG_LOCK("WUNLOCK", &rw->lock_object, 0, rw->rw_recurse, file,
	    line);
	if (!rw_recursed(rw))
		lock_profile_release_lock(&rw->lock_object);
	__rw_wunlock(rw, curthread, file, line);
}

void
_rw_rlock(struct rwlock *rw, const char *file, int line)
{
	struct turnstile *ts;
#ifdef ADAPTIVE_RWLOCKS
	volatile struct thread *owner;
#endif
#ifdef LOCK_PROFILING_SHARED
	uint64_t waittime = 0;
	int contested = 0;
#endif
	uintptr_t x;

	KASSERT(rw->rw_lock != RW_DESTROYED,
	    ("rw_rlock() of destroyed rwlock @ %s:%d", file, line));
	KASSERT(rw_wowner(rw) != curthread,
	    ("%s (%s): wlock already held @ %s:%d", __func__,
	    rw->lock_object.lo_name, file, line));
	WITNESS_CHECKORDER(&rw->lock_object, LOP_NEWORDER, file, line);

	/*
	 * Note that we don't make any attempt to try to block read
	 * locks once a writer has blocked on the lock.  The reason is
	 * that we currently allow for read locks to recurse and we
	 * don't keep track of all the holders of read locks.  Thus, if
	 * we were to block readers once a writer blocked and a reader
	 * tried to recurse on their reader lock after a writer had
	 * blocked we would end up in a deadlock since the reader would
	 * be blocked on the writer, and the writer would be blocked
	 * waiting for the reader to release its original read lock.
	 */
	for (;;) {
		/*
		 * Handle the easy case.  If no other thread has a write
		 * lock, then try to bump up the count of read locks.  Note
		 * that we have to preserve the current state of the
		 * RW_LOCK_WRITE_WAITERS flag.  If we fail to acquire a
		 * read lock, then rw_lock must have changed, so restart
		 * the loop.  Note that this handles the case of a
		 * completely unlocked rwlock since such a lock is encoded
		 * as a read lock with no waiters.
		 */
		x = rw->rw_lock;
		if (x & RW_LOCK_READ) {

			/*
			 * The RW_LOCK_READ_WAITERS flag should only be set
			 * if another thread currently holds a write lock,
			 * and in that case RW_LOCK_READ should be clear.
			 */
			MPASS((x & RW_LOCK_READ_WAITERS) == 0);
			if (atomic_cmpset_acq_ptr(&rw->rw_lock, x,
			    x + RW_ONE_READER)) {
#ifdef LOCK_PROFILING_SHARED
				if (RW_READERS(x) == 0)
					lock_profile_obtain_lock_success(
					    &rw->lock_object, contested,
					    waittime, file, line);
#endif
				if (LOCK_LOG_TEST(&rw->lock_object, 0))
					CTR4(KTR_LOCK,
					    "%s: %p succeed %p -> %p", __func__,
					    rw, (void *)x,
					    (void *)(x + RW_ONE_READER));
				break;
			}
			cpu_spinwait();
			continue;
		}

#ifdef ADAPTIVE_RWLOCKS
		/*
		 * If the owner is running on another CPU, spin until
		 * the owner stops running or the state of the lock
		 * changes.
		 */
		owner = (struct thread *)RW_OWNER(x);
		if (TD_IS_RUNNING(owner)) {
			if (LOCK_LOG_TEST(&rw->lock_object, 0))
				CTR3(KTR_LOCK, "%s: spinning on %p held by %p",
				    __func__, rw, owner);
#ifdef LOCK_PROFILING_SHARED
			lock_profile_obtain_lock_failed(&rw->lock_object,
			    &contested, &waittime);
#endif
			while ((struct thread*)RW_OWNER(rw->rw_lock) == owner &&
			    TD_IS_RUNNING(owner))
				cpu_spinwait();
			continue;
		}
#endif

		/*
		 * Okay, now it's the hard case.  Some other thread already
		 * has a write lock, so acquire the turnstile lock so we can
		 * begin the process of blocking.
		 */
		ts = turnstile_trywait(&rw->lock_object);

		/*
		 * The lock might have been released while we spun, so
		 * recheck its state and restart the loop if there is no
		 * longer a write lock.
		 */
		x = rw->rw_lock;
		if (x & RW_LOCK_READ) {
			turnstile_cancel(ts);
			cpu_spinwait();
			continue;
		}

#ifdef ADAPTIVE_RWLOCKS
		/*
		 * If the current owner of the lock is executing on another
		 * CPU, quit the hard path and try to spin.
		 */
		owner = (struct thread *)RW_OWNER(x);
		if (TD_IS_RUNNING(owner)) {
			turnstile_cancel(ts);
			cpu_spinwait();
			continue;
		}
#endif

		/*
		 * Ok, it's still a write lock.  If the RW_LOCK_READ_WAITERS
		 * flag is already set, then we can go ahead and block.  If
		 * it is not set then try to set it.  If we fail to set it
		 * drop the turnstile lock and restart the loop.
		 */
		if (!(x & RW_LOCK_READ_WAITERS)) {
			if (!atomic_cmpset_ptr(&rw->rw_lock, x,
			    x | RW_LOCK_READ_WAITERS)) {
				turnstile_cancel(ts);
				cpu_spinwait();
				continue;
			}
			if (LOCK_LOG_TEST(&rw->lock_object, 0))
				CTR2(KTR_LOCK, "%s: %p set read waiters flag",
				    __func__, rw);
		}

		/*
		 * We were unable to acquire the lock and the read waiters
		 * flag is set, so we must block on the turnstile.
		 */
		if (LOCK_LOG_TEST(&rw->lock_object, 0))
			CTR2(KTR_LOCK, "%s: %p blocking on turnstile", __func__,
			    rw);
#ifdef LOCK_PROFILING_SHARED
		lock_profile_obtain_lock_failed(&rw->lock_object, &contested,
		    &waittime);
#endif
		turnstile_wait(ts, rw_owner(rw), TS_SHARED_QUEUE);
		if (LOCK_LOG_TEST(&rw->lock_object, 0))
			CTR2(KTR_LOCK, "%s: %p resuming from turnstile",
			    __func__, rw);
	}

	/*
	 * TODO: acquire "owner of record" here.  Here be turnstile dragons
	 * however.  turnstiles don't like owners changing between calls to
	 * turnstile_wait() currently.
	 */

	LOCK_LOG_LOCK("RLOCK", &rw->lock_object, 0, 0, file, line);
	WITNESS_LOCK(&rw->lock_object, 0, file, line);
	curthread->td_locks++;
}

void
_rw_runlock(struct rwlock *rw, const char *file, int line)
{
	struct turnstile *ts;
	uintptr_t x;

	KASSERT(rw->rw_lock != RW_DESTROYED,
	    ("rw_runlock() of destroyed rwlock @ %s:%d", file, line));
	_rw_assert(rw, RA_RLOCKED, file, line);
	curthread->td_locks--;
	WITNESS_UNLOCK(&rw->lock_object, 0, file, line);
	LOCK_LOG_LOCK("RUNLOCK", &rw->lock_object, 0, 0, file, line);

	/* TODO: drop "owner of record" here. */

	for (;;) {
		/*
		 * See if there is more than one read lock held.  If so,
		 * just drop one and return.
		 */
		x = rw->rw_lock;
		if (RW_READERS(x) > 1) {
			if (atomic_cmpset_ptr(&rw->rw_lock, x,
			    x - RW_ONE_READER)) {
				if (LOCK_LOG_TEST(&rw->lock_object, 0))
					CTR4(KTR_LOCK,
					    "%s: %p succeeded %p -> %p",
					    __func__, rw, (void *)x,
					    (void *)(x - RW_ONE_READER));
				break;
			}
			continue;
		}

		/*
		 * We should never have read waiters while at least one
		 * thread holds a read lock.  (See note above)
		 */
		KASSERT(!(x & RW_LOCK_READ_WAITERS),
		    ("%s: waiting readers", __func__));
#ifdef LOCK_PROFILING_SHARED
		lock_profile_release_lock(&rw->lock_object);
#endif

		/*
		 * If there aren't any waiters for a write lock, then try
		 * to drop it quickly.
		 */
		if (!(x & RW_LOCK_WRITE_WAITERS)) {

			/*
			 * There shouldn't be any flags set and we should
			 * be the only read lock.  If we fail to release
			 * the single read lock, then another thread might
			 * have just acquired a read lock, so go back up
			 * to the multiple read locks case.
			 */
			MPASS(x == RW_READERS_LOCK(1));
			if (atomic_cmpset_ptr(&rw->rw_lock, RW_READERS_LOCK(1),
			    RW_UNLOCKED)) {
				if (LOCK_LOG_TEST(&rw->lock_object, 0))
					CTR2(KTR_LOCK, "%s: %p last succeeded",
					    __func__, rw);
				break;
			}
			continue;
		}

		/*
		 * There should just be one reader with one or more
		 * writers waiting.
		 */
		MPASS(x == (RW_READERS_LOCK(1) | RW_LOCK_WRITE_WAITERS));

		/*
		 * Ok, we know we have a waiting writer and we think we
		 * are the last reader, so grab the turnstile lock.
		 */
		turnstile_chain_lock(&rw->lock_object);

		/*
		 * Try to drop our lock, leaving the lock in an unlocked
		 * state.
		 *
		 * If you wanted to do explicit lock handoff you'd have to
		 * do it here.  You'd also want to use turnstile_signal()
		 * and you'd have to handle the race where a higher
		 * priority thread blocks on the write lock before the
		 * thread you wakeup actually runs and have the new thread
		 * "steal" the lock.  For now it's a lot simpler to just
		 * wakeup all of the waiters.
		 *
		 * As above, if we fail, then another thread might have
		 * acquired a read lock, so drop the turnstile lock and
		 * restart.
		 */
		if (!atomic_cmpset_ptr(&rw->rw_lock,
		    RW_READERS_LOCK(1) | RW_LOCK_WRITE_WAITERS, RW_UNLOCKED)) {
			turnstile_chain_unlock(&rw->lock_object);
			continue;
		}
		if (LOCK_LOG_TEST(&rw->lock_object, 0))
			CTR2(KTR_LOCK, "%s: %p last succeeded with waiters",
			    __func__, rw);

		/*
		 * Ok.  The lock is released and all that's left is to
		 * wake up the waiters.  Note that the lock might not be
		 * free anymore, but in that case the writers will just
		 * block again if they run before the new lock holder(s)
		 * release the lock.
		 */
		ts = turnstile_lookup(&rw->lock_object);
		MPASS(ts != NULL);
		turnstile_broadcast(ts, TS_EXCLUSIVE_QUEUE);
		turnstile_unpend(ts, TS_SHARED_LOCK);
		turnstile_chain_unlock(&rw->lock_object);
		break;
	}
}

/*
 * This function is called when we are unable to obtain a write lock on the
 * first try.  This means that at least one other thread holds either a
 * read or write lock.
 */
void
_rw_wlock_hard(struct rwlock *rw, uintptr_t tid, const char *file, int line)
{
	struct turnstile *ts;
#ifdef ADAPTIVE_RWLOCKS
	volatile struct thread *owner;
#endif
	uint64_t waittime = 0;
	uintptr_t v;
	int contested = 0;

	if (rw_wlocked(rw)) {
		KASSERT(rw->lock_object.lo_flags & RW_RECURSE,
		    ("%s: recursing but non-recursive rw %s @ %s:%d\n",
		    __func__, rw->lock_object.lo_name, file, line));
		rw->rw_recurse++;
		atomic_set_ptr(&rw->rw_lock, RW_LOCK_RECURSED);
		if (LOCK_LOG_TEST(&rw->lock_object, 0))
			CTR2(KTR_LOCK, "%s: %p recursing", __func__, rw);
		return;
	}

	if (LOCK_LOG_TEST(&rw->lock_object, 0))
		CTR5(KTR_LOCK, "%s: %s contested (lock=%p) at %s:%d", __func__,
		    rw->lock_object.lo_name, (void *)rw->rw_lock, file, line);

	while (!_rw_write_lock(rw, tid)) {
#ifdef ADAPTIVE_RWLOCKS
		/*
		 * If the lock is write locked and the owner is
		 * running on another CPU, spin until the owner stops
		 * running or the state of the lock changes.
		 */
		v = rw->rw_lock;
		owner = (struct thread *)RW_OWNER(v);
		if (!(v & RW_LOCK_READ) && TD_IS_RUNNING(owner)) {
			if (LOCK_LOG_TEST(&rw->lock_object, 0))
				CTR3(KTR_LOCK, "%s: spinning on %p held by %p",
				    __func__, rw, owner);
			lock_profile_obtain_lock_failed(&rw->lock_object,
			    &contested, &waittime);
			while ((struct thread*)RW_OWNER(rw->rw_lock) == owner &&
			    TD_IS_RUNNING(owner))
				cpu_spinwait();
			continue;
		}
#endif

		ts = turnstile_trywait(&rw->lock_object);
		v = rw->rw_lock;

		/*
		 * If the lock was released while spinning on the
		 * turnstile chain lock, try again.
		 */
		if (v == RW_UNLOCKED) {
			turnstile_cancel(ts);
			cpu_spinwait();
			continue;
		}

#ifdef ADAPTIVE_RWLOCKS
		/*
		 * If the current owner of the lock is executing on another
		 * CPU, quit the hard path and try to spin.
		 */
		if (!(v & RW_LOCK_READ)) {
			owner = (struct thread *)RW_OWNER(v);
			if (TD_IS_RUNNING(owner)) {
				turnstile_cancel(ts);
				cpu_spinwait();
				continue;
			}
		}
#endif

		/*
		 * If the lock was released by a writer with both readers
		 * and writers waiting and a reader hasn't woken up and
		 * acquired the lock yet, rw_lock will be set to the
		 * value RW_UNLOCKED | RW_LOCK_WRITE_WAITERS.  If we see
		 * that value, try to acquire it once.  Note that we have
		 * to preserve the RW_LOCK_WRITE_WAITERS flag as there are
		 * other writers waiting still.  If we fail, restart the
		 * loop.
		 */
		if (v == (RW_UNLOCKED | RW_LOCK_WRITE_WAITERS)) {
			if (atomic_cmpset_acq_ptr(&rw->rw_lock,
			    RW_UNLOCKED | RW_LOCK_WRITE_WAITERS,
			    tid | RW_LOCK_WRITE_WAITERS)) {
				turnstile_claim(ts);
				CTR2(KTR_LOCK, "%s: %p claimed by new writer",
				    __func__, rw);
				break;
			}
			turnstile_cancel(ts);
			cpu_spinwait();
			continue;
		}

		/*
		 * If the RW_LOCK_WRITE_WAITERS flag isn't set, then try to
		 * set it.  If we fail to set it, then loop back and try
		 * again.
		 */
		if (!(v & RW_LOCK_WRITE_WAITERS)) {
			if (!atomic_cmpset_ptr(&rw->rw_lock, v,
			    v | RW_LOCK_WRITE_WAITERS)) {
				turnstile_cancel(ts);
				cpu_spinwait();
				continue;
			}
			if (LOCK_LOG_TEST(&rw->lock_object, 0))
				CTR2(KTR_LOCK, "%s: %p set write waiters flag",
				    __func__, rw);
		}

		/*
		 * We were unable to acquire the lock and the write waiters
		 * flag is set, so we must block on the turnstile.
		 */
		if (LOCK_LOG_TEST(&rw->lock_object, 0))
			CTR2(KTR_LOCK, "%s: %p blocking on turnstile", __func__,
			    rw);
		lock_profile_obtain_lock_failed(&rw->lock_object, &contested,
		    &waittime);
		turnstile_wait(ts, rw_owner(rw), TS_EXCLUSIVE_QUEUE);
		if (LOCK_LOG_TEST(&rw->lock_object, 0))
			CTR2(KTR_LOCK, "%s: %p resuming from turnstile",
			    __func__, rw);
	}
	lock_profile_obtain_lock_success(&rw->lock_object, contested, waittime,
	    file, line);
}

/*
 * This function is called if the first try at releasing a write lock failed.
 * This means that one of the 2 waiter bits must be set indicating that at
 * least one thread is waiting on this lock.
 */
void
_rw_wunlock_hard(struct rwlock *rw, uintptr_t tid, const char *file, int line)
{
	struct turnstile *ts;
	uintptr_t v;
	int queue;

	if (rw_wlocked(rw) && rw_recursed(rw)) {
		if ((--rw->rw_recurse) == 0)
			atomic_clear_ptr(&rw->rw_lock, RW_LOCK_RECURSED);
		if (LOCK_LOG_TEST(&rw->lock_object, 0))
			CTR2(KTR_LOCK, "%s: %p unrecursing", __func__, rw);
		return;
	}

	KASSERT(rw->rw_lock & (RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS),
	    ("%s: neither of the waiter flags are set", __func__));

	if (LOCK_LOG_TEST(&rw->lock_object, 0))
		CTR2(KTR_LOCK, "%s: %p contested", __func__, rw);

	turnstile_chain_lock(&rw->lock_object);
	ts = turnstile_lookup(&rw->lock_object);

	MPASS(ts != NULL);

	/*
	 * Use the same algo as sx locks for now.  Prefer waking up shared
	 * waiters if we have any over writers.  This is probably not ideal.
	 *
	 * 'v' is the value we are going to write back to rw_lock.  If we
	 * have waiters on both queues, we need to preserve the state of
	 * the waiter flag for the queue we don't wake up.  For now this is
	 * hardcoded for the algorithm mentioned above.
	 *
	 * In the case of both readers and writers waiting we wakeup the
	 * readers but leave the RW_LOCK_WRITE_WAITERS flag set.  If a
	 * new writer comes in before a reader it will claim the lock up
	 * above.  There is probably a potential priority inversion in
	 * there that could be worked around either by waking both queues
	 * of waiters or doing some complicated lock handoff gymnastics.
	 */
	v = RW_UNLOCKED;
	if (rw->rw_lock & RW_LOCK_READ_WAITERS) {
		queue = TS_SHARED_QUEUE;
		v |= (rw->rw_lock & RW_LOCK_WRITE_WAITERS);
	} else
		queue = TS_EXCLUSIVE_QUEUE;

	/* Wake up all waiters for the specific queue. */
	if (LOCK_LOG_TEST(&rw->lock_object, 0))
		CTR3(KTR_LOCK, "%s: %p waking up %s waiters", __func__, rw,
		    queue == TS_SHARED_QUEUE ? "read" : "write");
	turnstile_broadcast(ts, queue);
	atomic_store_rel_ptr(&rw->rw_lock, v);
	turnstile_unpend(ts, TS_EXCLUSIVE_LOCK);
	turnstile_chain_unlock(&rw->lock_object);
}

/*
 * Attempt to do a non-blocking upgrade from a read lock to a write
 * lock.  This will only succeed if this thread holds a single read
 * lock.  Returns true if the upgrade succeeded and false otherwise.
 */
int
_rw_try_upgrade(struct rwlock *rw, const char *file, int line)
{
	uintptr_t v, tid;
	struct turnstile *ts;
	int success;

	KASSERT(rw->rw_lock != RW_DESTROYED,
	    ("rw_try_upgrade() of destroyed rwlock @ %s:%d", file, line));
	_rw_assert(rw, RA_RLOCKED, file, line);

	/*
	 * Attempt to switch from one reader to a writer.  If there
	 * are any write waiters, then we will have to lock the
	 * turnstile first to prevent races with another writer
	 * calling turnstile_wait() before we have claimed this
	 * turnstile.  So, do the simple case of no waiters first.
	 */
	tid = (uintptr_t)curthread;
	if (!(rw->rw_lock & RW_LOCK_WRITE_WAITERS)) {
		success = atomic_cmpset_ptr(&rw->rw_lock, RW_READERS_LOCK(1),
		    tid);
		goto out;
	}

	/*
	 * Ok, we think we have write waiters, so lock the
	 * turnstile.
	 */
	ts = turnstile_trywait(&rw->lock_object);

	/*
	 * Try to switch from one reader to a writer again.  This time
	 * we honor the current state of the RW_LOCK_WRITE_WAITERS
	 * flag.  If we obtain the lock with the flag set, then claim
	 * ownership of the turnstile.
	 */
	v = rw->rw_lock & RW_LOCK_WRITE_WAITERS;
	success = atomic_cmpset_ptr(&rw->rw_lock, RW_READERS_LOCK(1) | v,
	    tid | v);
	if (success && v)
		turnstile_claim(ts);
	else
		turnstile_cancel(ts);
out:
	LOCK_LOG_TRY("WUPGRADE", &rw->lock_object, 0, success, file, line);
	if (success)
		WITNESS_UPGRADE(&rw->lock_object, LOP_EXCLUSIVE | LOP_TRYLOCK,
		    file, line);
	return (success);
}
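
/*
 * Editor's note (illustrative usage sketch, not part of the original
 * file; the "foo" lock is hypothetical):
 *
 *	rw_rlock(&foo_lock);
 *	// ... discover that the data must be modified ...
 *	if (!rw_try_upgrade(&foo_lock)) {
 *		// The upgrade can fail; drop the read lock, take the
 *		// write lock, and re-validate any state examined above.
 *		rw_runlock(&foo_lock);
 *		rw_wlock(&foo_lock);
 *	}
 *	// ... modify ...
 *	rw_wunlock(&foo_lock);
 */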

/*
 * Downgrade a write lock into a single read lock.
 */
void
_rw_downgrade(struct rwlock *rw, const char *file, int line)
{
	struct turnstile *ts;
	uintptr_t tid, v;

	KASSERT(rw->rw_lock != RW_DESTROYED,
	    ("rw_downgrade() of destroyed rwlock @ %s:%d", file, line));
	_rw_assert(rw, RA_WLOCKED | RA_NOTRECURSED, file, line);
#ifndef INVARIANTS
	if (rw_recursed(rw))
		panic("downgrade of a recursed lock");
#endif

	WITNESS_DOWNGRADE(&rw->lock_object, 0, file, line);

	/*
	 * Convert from a writer to a single reader.  First we handle
	 * the easy case with no waiters.  If there are any waiters, we
	 * lock the turnstile, "disown" the lock, and awaken any read
	 * waiters.
	 */
	tid = (uintptr_t)curthread;
	if (atomic_cmpset_rel_ptr(&rw->rw_lock, tid, RW_READERS_LOCK(1)))
		goto out;

	/*
	 * Ok, we think we have waiters, so lock the turnstile so we can
	 * read the waiter flags without any races.
	 */
	turnstile_chain_lock(&rw->lock_object);
	v = rw->rw_lock;
	MPASS(v & (RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS));

	/*
	 * Downgrade from a write lock while preserving
	 * RW_LOCK_WRITE_WAITERS and give up ownership of the
	 * turnstile.  If there are any read waiters, wake them up.
	 */
	ts = turnstile_lookup(&rw->lock_object);
	MPASS(ts != NULL);
	if (v & RW_LOCK_READ_WAITERS)
		turnstile_broadcast(ts, TS_SHARED_QUEUE);
	atomic_store_rel_ptr(&rw->rw_lock, RW_READERS_LOCK(1) |
	    (v & RW_LOCK_WRITE_WAITERS));
	if (v & RW_LOCK_READ_WAITERS)
		turnstile_unpend(ts, TS_EXCLUSIVE_LOCK);
	else if (ts)
		turnstile_disown(ts);
	turnstile_chain_unlock(&rw->lock_object);
out:
	LOCK_LOG_LOCK("WDOWNGRADE", &rw->lock_object, 0, 0, file, line);
}
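
/*
 * Editor's note (illustrative usage sketch, not part of the original
 * file; the "foo" lock is hypothetical):
 *
 *	rw_wlock(&foo_lock);
 *	// ... build or update the shared state exclusively ...
 *	rw_downgrade(&foo_lock);	// atomically become a reader
 *	// ... keep reading the state without blocking other readers ...
 *	rw_runlock(&foo_lock);
 */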

#ifdef INVARIANT_SUPPORT
#ifndef INVARIANTS
#undef _rw_assert
#endif

/*
 * In the non-WITNESS case, rw_assert() can only detect that at least
 * *some* thread owns an rlock, but it cannot guarantee that *this*
 * thread owns an rlock.
 */
void
_rw_assert(struct rwlock *rw, int what, const char *file, int line)
{

	if (panicstr != NULL)
		return;
	switch (what) {
	case RA_LOCKED:
	case RA_LOCKED | RA_RECURSED:
	case RA_LOCKED | RA_NOTRECURSED:
	case RA_RLOCKED:
#ifdef WITNESS
		witness_assert(&rw->lock_object, what, file, line);
#else
		/*
		 * If some other thread has a write lock or we have one
		 * and are asserting a read lock, fail.  Also, if no one
		 * has a lock at all, fail.
		 */
		if (rw->rw_lock == RW_UNLOCKED ||
		    (!(rw->rw_lock & RW_LOCK_READ) && (what == RA_RLOCKED ||
		    rw_wowner(rw) != curthread)))
			panic("Lock %s not %slocked @ %s:%d\n",
			    rw->lock_object.lo_name, (what == RA_RLOCKED) ?
			    "read " : "", file, line);

		if (!(rw->rw_lock & RW_LOCK_READ)) {
			if (rw_recursed(rw)) {
				if (what & RA_NOTRECURSED)
					panic("Lock %s recursed @ %s:%d\n",
					    rw->lock_object.lo_name, file,
					    line);
			} else if (what & RA_RECURSED)
				panic("Lock %s not recursed @ %s:%d\n",
				    rw->lock_object.lo_name, file, line);
		}
#endif
		break;
	case RA_WLOCKED:
	case RA_WLOCKED | RA_RECURSED:
	case RA_WLOCKED | RA_NOTRECURSED:
		if (rw_wowner(rw) != curthread)
			panic("Lock %s not exclusively locked @ %s:%d\n",
			    rw->lock_object.lo_name, file, line);
		if (rw_recursed(rw)) {
			if (what & RA_NOTRECURSED)
				panic("Lock %s recursed @ %s:%d\n",
				    rw->lock_object.lo_name, file, line);
		} else if (what & RA_RECURSED)
			panic("Lock %s not recursed @ %s:%d\n",
			    rw->lock_object.lo_name, file, line);
		break;
	case RA_UNLOCKED:
#ifdef WITNESS
		witness_assert(&rw->lock_object, what, file, line);
#else
		/*
		 * If we hold a write lock fail.  We can't reliably check
		 * to see if we hold a read lock or not.
		 */
		if (rw_wowner(rw) == curthread)
			panic("Lock %s exclusively locked @ %s:%d\n",
			    rw->lock_object.lo_name, file, line);
#endif
		break;
	default:
		panic("Unknown rw lock assertion: %d @ %s:%d", what, file,
		    line);
	}
}
#endif /* INVARIANT_SUPPORT */
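
/*
 * Editor's note (illustrative sketch, not part of the original file):
 * code that requires its caller to hold a (hypothetical) foo_lock would
 * typically document that with an assertion, e.g.:
 *
 *	rw_assert(&foo_lock, RA_WLOCKED);	// write lock required
 *	rw_assert(&foo_lock, RA_LOCKED);	// read or write lock is fine
 */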

#ifdef DDB
void
db_show_rwlock(struct lock_object *lock)
{
	struct rwlock *rw;
	struct thread *td;

	rw = (struct rwlock *)lock;

	db_printf(" state: ");
	if (rw->rw_lock == RW_UNLOCKED)
		db_printf("UNLOCKED\n");
	else if (rw->rw_lock == RW_DESTROYED) {
		db_printf("DESTROYED\n");
		return;
	} else if (rw->rw_lock & RW_LOCK_READ)
		db_printf("RLOCK: %ju locks\n",
		    (uintmax_t)(RW_READERS(rw->rw_lock)));
	else {
		td = rw_wowner(rw);
		db_printf("WLOCK: %p (tid %d, pid %d, \"%s\")\n", td,
		    td->td_tid, td->td_proc->p_pid, td->td_name);
		if (rw_recursed(rw))
			db_printf(" recursed: %u\n", rw->rw_recurse);
	}
	db_printf(" waiters: ");
	switch (rw->rw_lock & (RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS)) {
	case RW_LOCK_READ_WAITERS:
		db_printf("readers\n");
		break;
	case RW_LOCK_WRITE_WAITERS:
		db_printf("writers\n");
		break;
	case RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS:
		db_printf("readers and writers\n");
		break;
	default:
		db_printf("none\n");
		break;
	}
}

#endif
958