xref: /freebsd/sys/kern/kern_rwlock.c (revision 30d239bc4c510432e65a84fa1c14ed67a3ab1c92)
1 /*-
2  * Copyright (c) 2006 John Baldwin <jhb@FreeBSD.org>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of the author nor the names of any co-contributors
14  *    may be used to endorse or promote products derived from this software
15  *    without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
21  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27  * SUCH DAMAGE.
28  */
29 
30 /*
31  * Machine independent bits of reader/writer lock implementation.
32  */
33 
34 #include <sys/cdefs.h>
35 __FBSDID("$FreeBSD$");
36 
37 #include "opt_ddb.h"
38 #include "opt_no_adaptive_rwlocks.h"
39 
40 #include <sys/param.h>
41 #include <sys/ktr.h>
42 #include <sys/lock.h>
43 #include <sys/mutex.h>
44 #include <sys/proc.h>
45 #include <sys/rwlock.h>
46 #include <sys/systm.h>
47 #include <sys/turnstile.h>
48 
49 #include <machine/cpu.h>
50 
51 CTASSERT((RW_RECURSE & LO_CLASSFLAGS) == RW_RECURSE);
52 
53 #if defined(SMP) && !defined(NO_ADAPTIVE_RWLOCKS)
54 #define	ADAPTIVE_RWLOCKS
55 #endif
56 
57 #ifdef DDB
58 #include <ddb/ddb.h>
59 
60 static void	db_show_rwlock(struct lock_object *lock);
61 #endif
62 static void	lock_rw(struct lock_object *lock, int how);
63 static int	unlock_rw(struct lock_object *lock);
64 
65 struct lock_class lock_class_rw = {
66 	.lc_name = "rw",
67 	.lc_flags = LC_SLEEPLOCK | LC_RECURSABLE | LC_UPGRADABLE,
68 #ifdef DDB
69 	.lc_ddb_show = db_show_rwlock,
70 #endif
71 	.lc_lock = lock_rw,
72 	.lc_unlock = unlock_rw,
73 };
74 
75 /*
76  * Return a pointer to the owning thread if the lock is write-locked or
77  * NULL if the lock is unlocked or read-locked.
78  */
79 #define	rw_wowner(rw)							\
80 	((rw)->rw_lock & RW_LOCK_READ ? NULL :				\
81 	    (struct thread *)RW_OWNER((rw)->rw_lock))
82 
83 /*
84  * Returns whether the write owner is recursed.  Write ownership is not
85  * assured here and should be checked beforehand.
86  */
87 #define	rw_recursed(rw)		((rw)->rw_recurse != 0)
88 
89 /*
90  * Return true if curthread holds the lock.
91  */
92 #define	rw_wlocked(rw)		(rw_wowner((rw)) == curthread)
93 
94 /*
95  * Return a pointer to the owning thread for this lock who should receive
96  * any priority lent by threads that block on this lock.  Currently this
97  * is identical to rw_wowner().
98  */
99 #define	rw_owner(rw)		rw_wowner(rw)
100 
101 #ifndef INVARIANTS
102 #define	_rw_assert(rw, what, file, line)
103 #endif
104 
105 void
106 lock_rw(struct lock_object *lock, int how)
107 {
108 	struct rwlock *rw;
109 
110 	rw = (struct rwlock *)lock;
111 	if (how)
112 		rw_wlock(rw);
113 	else
114 		rw_rlock(rw);
115 }
116 
117 int
118 unlock_rw(struct lock_object *lock)
119 {
120 	struct rwlock *rw;
121 
122 	rw = (struct rwlock *)lock;
123 	rw_assert(rw, RA_LOCKED | LA_NOTRECURSED);
124 	if (rw->rw_lock & RW_LOCK_READ) {
125 		rw_runlock(rw);
126 		return (0);
127 	} else {
128 		rw_wunlock(rw);
129 		return (1);
130 	}
131 }
132 
133 void
134 rw_init_flags(struct rwlock *rw, const char *name, int opts)
135 {
136 	int flags;
137 
138 	MPASS((opts & ~(RW_DUPOK | RW_NOPROFILE | RW_NOWITNESS | RW_QUIET |
139 	    RW_RECURSE)) == 0);
140 
141 	flags = LO_UPGRADABLE | LO_RECURSABLE;
142 	if (opts & RW_DUPOK)
143 		flags |= LO_DUPOK;
144 	if (opts & RW_NOPROFILE)
145 		flags |= LO_NOPROFILE;
146 	if (!(opts & RW_NOWITNESS))
147 		flags |= LO_WITNESS;
148 	if (opts & RW_QUIET)
149 		flags |= LO_QUIET;
150 	flags |= opts & RW_RECURSE;
151 
152 	rw->rw_lock = RW_UNLOCKED;
153 	rw->rw_recurse = 0;
154 	lock_init(&rw->lock_object, &lock_class_rw, name, NULL, flags);
155 }
156 
157 void
158 rw_destroy(struct rwlock *rw)
159 {
160 
161 	KASSERT(rw->rw_lock == RW_UNLOCKED, ("rw lock not unlocked"));
162 	KASSERT(rw->rw_recurse == 0, ("rw lock still recursed"));
163 	rw->rw_lock = RW_DESTROYED;
164 	lock_destroy(&rw->lock_object);
165 }
166 
167 void
168 rw_sysinit(void *arg)
169 {
170 	struct rw_args *args = arg;
171 
172 	rw_init(args->ra_rw, args->ra_desc);
173 }
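
/*
 * A minimal usage sketch for the initialization interface above (the
 * names "foo_lock" and "foo" are placeholders, not part of this file):
 *
 *	static struct rwlock foo_lock;
 *
 *	rw_init(&foo_lock, "foo");
 *	...
 *	rw_destroy(&foo_lock);
 *
 * Locks that must be ready before their subsystem initializes can
 * instead be set up via the RW_SYSINIT() wrapper, which hands a
 * struct rw_args to rw_sysinit() above.  Passing RW_RECURSE to
 * rw_init_flags() allows the write lock to be acquired recursively.
 */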
174 
175 int
176 rw_wowned(struct rwlock *rw)
177 {
178 
179 	return (rw_wowner(rw) == curthread);
180 }
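
/*
 * A small usage sketch for rw_wowned(): callers use it either in
 * assertions or to decide whether the lock must be taken on the way
 * in ("foo_lock" and "locked" are placeholder names):
 *
 *	locked = rw_wowned(&foo_lock);
 *	if (!locked)
 *		rw_wlock(&foo_lock);
 *	...
 *	if (!locked)
 *		rw_wunlock(&foo_lock);
 */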
181 
182 void
183 _rw_wlock(struct rwlock *rw, const char *file, int line)
184 {
185 
186 	MPASS(curthread != NULL);
187 	KASSERT(rw->rw_lock != RW_DESTROYED,
188 	    ("rw_wlock() of destroyed rwlock @ %s:%d", file, line));
189 	KASSERT(rw_wowner(rw) != curthread,
190 	    ("%s (%s): wlock already held @ %s:%d", __func__,
191 	    rw->lock_object.lo_name, file, line));
192 	WITNESS_CHECKORDER(&rw->lock_object, LOP_NEWORDER | LOP_EXCLUSIVE, file,
193 	    line);
194 	__rw_wlock(rw, curthread, file, line);
195 	LOCK_LOG_LOCK("WLOCK", &rw->lock_object, 0, rw->rw_recurse, file, line);
196 	WITNESS_LOCK(&rw->lock_object, LOP_EXCLUSIVE, file, line);
197 	curthread->td_locks++;
198 }
199 
200 void
201 _rw_wunlock(struct rwlock *rw, const char *file, int line)
202 {
203 
204 	MPASS(curthread != NULL);
205 	KASSERT(rw->rw_lock != RW_DESTROYED,
206 	    ("rw_wunlock() of destroyed rwlock @ %s:%d", file, line));
207 	_rw_assert(rw, RA_WLOCKED, file, line);
208 	curthread->td_locks--;
209 	WITNESS_UNLOCK(&rw->lock_object, LOP_EXCLUSIVE, file, line);
210 	LOCK_LOG_LOCK("WUNLOCK", &rw->lock_object, 0, rw->rw_recurse, file,
211 	    line);
212 	if (!rw_recursed(rw))
213 		lock_profile_release_lock(&rw->lock_object);
214 	__rw_wunlock(rw, curthread, file, line);
215 }
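
/*
 * A minimal sketch of the consumer-visible write path: callers use the
 * rw_wlock() and rw_wunlock() macros from <sys/rwlock.h>, which pass
 * the file and line down to the functions above ("foo_lock" and
 * "foo_count" are placeholder names):
 *
 *	rw_wlock(&foo_lock);
 *	foo_count++;
 *	rw_wunlock(&foo_lock);
 */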
216 
217 void
218 _rw_rlock(struct rwlock *rw, const char *file, int line)
219 {
220 	struct turnstile *ts;
221 #ifdef ADAPTIVE_RWLOCKS
222 	volatile struct thread *owner;
223 #endif
224 #ifdef LOCK_PROFILING_SHARED
225 	uint64_t waittime = 0;
226 	int contested = 0;
227 #endif
228 	uintptr_t x;
229 
230 	KASSERT(rw->rw_lock != RW_DESTROYED,
231 	    ("rw_rlock() of destroyed rwlock @ %s:%d", file, line));
232 	KASSERT(rw_wowner(rw) != curthread,
233 	    ("%s (%s): wlock already held @ %s:%d", __func__,
234 	    rw->lock_object.lo_name, file, line));
235 	WITNESS_CHECKORDER(&rw->lock_object, LOP_NEWORDER, file, line);
236 
237 	/*
238 	 * Note that we don't make any attempt to try to block read
239 	 * locks once a writer has blocked on the lock.  The reason is
240 	 * that we currently allow for read locks to recurse and we
241 	 * don't keep track of all the holders of read locks.  Thus, if
242 	 * we were to block readers once a writer blocked and a reader
243 	 * tried to recurse on their reader lock after a writer had
244 	 * blocked we would end up in a deadlock since the reader would
245 	 * be blocked on the writer, and the writer would be blocked
246 	 * waiting for the reader to release its original read lock.
247 	 */
248 	for (;;) {
249 		/*
250 		 * Handle the easy case.  If no other thread has a write
251 		 * lock, then try to bump up the count of read locks.  Note
252 		 * that we have to preserve the current state of the
253 		 * RW_LOCK_WRITE_WAITERS flag.  If we fail to acquire a
254 		 * read lock, then rw_lock must have changed, so restart
255 		 * the loop.  Note that this handles the case of a
256 		 * completely unlocked rwlock since such a lock is encoded
257 		 * as a read lock with no waiters.
258 		 */
259 		x = rw->rw_lock;
260 		if (x & RW_LOCK_READ) {
261 
262 			/*
263 			 * The RW_LOCK_READ_WAITERS flag should only be set
264 			 * if another thread currently holds a write lock,
265 			 * and in that case RW_LOCK_READ should be clear.
266 			 */
267 			MPASS((x & RW_LOCK_READ_WAITERS) == 0);
268 			if (atomic_cmpset_acq_ptr(&rw->rw_lock, x,
269 			    x + RW_ONE_READER)) {
270 #ifdef LOCK_PROFILING_SHARED
271 				if (RW_READERS(x) == 0)
272 					lock_profile_obtain_lock_success(
273 					    &rw->lock_object, contested,
274 					    waittime, file, line);
275 #endif
276 				if (LOCK_LOG_TEST(&rw->lock_object, 0))
277 					CTR4(KTR_LOCK,
278 					    "%s: %p succeeded %p -> %p", __func__,
279 					    rw, (void *)x,
280 					    (void *)(x + RW_ONE_READER));
281 				break;
282 			}
283 			cpu_spinwait();
284 			continue;
285 		}
286 
287 		/*
288 		 * Okay, now it's the hard case.  Some other thread already
289 		 * has a write lock, so acquire the turnstile lock so we can
290 		 * begin the process of blocking.
291 		 */
292 		ts = turnstile_trywait(&rw->lock_object);
293 
294 		/*
295 		 * The lock might have been released while we spun, so
296 		 * recheck its state and restart the loop if there is no
297 		 * longer a write lock.
298 		 */
299 		x = rw->rw_lock;
300 		if (x & RW_LOCK_READ) {
301 			turnstile_cancel(ts);
302 			cpu_spinwait();
303 			continue;
304 		}
305 
306 		/*
307 		 * Ok, it's still a write lock.  If the RW_LOCK_READ_WAITERS
308 		 * flag is already set, then we can go ahead and block.  If
309 		 * it is not set then try to set it.  If we fail to set it
310 		 * drop the turnstile lock and restart the loop.
311 		 */
312 		if (!(x & RW_LOCK_READ_WAITERS)) {
313 			if (!atomic_cmpset_ptr(&rw->rw_lock, x,
314 			    x | RW_LOCK_READ_WAITERS)) {
315 				turnstile_cancel(ts);
316 				cpu_spinwait();
317 				continue;
318 			}
319 			if (LOCK_LOG_TEST(&rw->lock_object, 0))
320 				CTR2(KTR_LOCK, "%s: %p set read waiters flag",
321 				    __func__, rw);
322 		}
323 
324 #ifdef ADAPTIVE_RWLOCKS
325 		/*
326 		 * If the owner is running on another CPU, spin until
327 		 * the owner stops running or the state of the lock
328 		 * changes.
329 		 */
330 		owner = (struct thread *)RW_OWNER(x);
331 		if (TD_IS_RUNNING(owner)) {
332 			turnstile_cancel(ts);
333 			if (LOCK_LOG_TEST(&rw->lock_object, 0))
334 				CTR3(KTR_LOCK, "%s: spinning on %p held by %p",
335 				    __func__, rw, owner);
336 #ifdef LOCK_PROFILING_SHARED
337 			lock_profile_obtain_lock_failed(&rw->lock_object,
338 			    &contested, &waittime);
339 #endif
340 			while ((struct thread *)RW_OWNER(rw->rw_lock) ==
341 			    owner && TD_IS_RUNNING(owner))
342 				cpu_spinwait();
343 			continue;
344 		}
345 #endif
346 
347 		/*
348 		 * We were unable to acquire the lock and the read waiters
349 		 * flag is set, so we must block on the turnstile.
350 		 */
351 		if (LOCK_LOG_TEST(&rw->lock_object, 0))
352 			CTR2(KTR_LOCK, "%s: %p blocking on turnstile", __func__,
353 			    rw);
354 #ifdef LOCK_PROFILING_SHARED
355 		lock_profile_obtain_lock_failed(&rw->lock_object, &contested,
356 		    &waittime);
357 #endif
358 		turnstile_wait(ts, rw_owner(rw), TS_SHARED_QUEUE);
359 		if (LOCK_LOG_TEST(&rw->lock_object, 0))
360 			CTR2(KTR_LOCK, "%s: %p resuming from turnstile",
361 			    __func__, rw);
362 	}
363 
364 	/*
365 	 * TODO: acquire "owner of record" here.  Here be turnstile dragons
366 	 * however.  turnstiles don't like owners changing between calls to
367 	 * turnstile_wait() currently.
368 	 */
369 
370 	LOCK_LOG_LOCK("RLOCK", &rw->lock_object, 0, 0, file, line);
371 	WITNESS_LOCK(&rw->lock_object, 0, file, line);
372 	curthread->td_locks++;
373 }
374 
375 void
376 _rw_runlock(struct rwlock *rw, const char *file, int line)
377 {
378 	struct turnstile *ts;
379 	uintptr_t x;
380 
381 	KASSERT(rw->rw_lock != RW_DESTROYED,
382 	    ("rw_runlock() of destroyed rwlock @ %s:%d", file, line));
383 	_rw_assert(rw, RA_RLOCKED, file, line);
384 	curthread->td_locks--;
385 	WITNESS_UNLOCK(&rw->lock_object, 0, file, line);
386 	LOCK_LOG_LOCK("RUNLOCK", &rw->lock_object, 0, 0, file, line);
387 
388 	/* TODO: drop "owner of record" here. */
389 
390 	for (;;) {
391 		/*
392 		 * See if there is more than one read lock held.  If so,
393 		 * just drop one and return.
394 		 */
395 		x = rw->rw_lock;
396 		if (RW_READERS(x) > 1) {
397 			if (atomic_cmpset_ptr(&rw->rw_lock, x,
398 			    x - RW_ONE_READER)) {
399 				if (LOCK_LOG_TEST(&rw->lock_object, 0))
400 					CTR4(KTR_LOCK,
401 					    "%s: %p succeeded %p -> %p",
402 					    __func__, rw, (void *)x,
403 					    (void *)(x - RW_ONE_READER));
404 				break;
405 			}
406 			continue;
407 		}
408 
409 
410 		/*
411 		 * We should never have read waiters while at least one
412 		 * thread holds a read lock.  (See note above)
413 		 */
414 		KASSERT(!(x & RW_LOCK_READ_WAITERS),
415 		    ("%s: waiting readers", __func__));
416 #ifdef LOCK_PROFILING_SHARED
417 		lock_profile_release_lock(&rw->lock_object);
418 #endif
419 
420 		/*
421 		 * If there aren't any waiters for a write lock, then try
422 		 * to drop it quickly.
423 		 */
424 		if (!(x & RW_LOCK_WRITE_WAITERS)) {
425 
426 			/*
427 			 * There shouldn't be any flags set and we should
428 			 * be the only read lock.  If we fail to release
429 			 * the single read lock, then another thread might
430 			 * have just acquired a read lock, so go back up
431 			 * to the multiple read locks case.
432 			 */
433 			MPASS(x == RW_READERS_LOCK(1));
434 			if (atomic_cmpset_ptr(&rw->rw_lock, RW_READERS_LOCK(1),
435 			    RW_UNLOCKED)) {
436 				if (LOCK_LOG_TEST(&rw->lock_object, 0))
437 					CTR2(KTR_LOCK, "%s: %p last succeeded",
438 					    __func__, rw);
439 				break;
440 			}
441 			continue;
442 		}
443 
444 		/*
445 		 * There should just be one reader with one or more
446 		 * writers waiting.
447 		 */
448 		MPASS(x == (RW_READERS_LOCK(1) | RW_LOCK_WRITE_WAITERS));
449 
450 		/*
451 		 * Ok, we know we have a waiting writer and we think we
452 		 * are the last reader, so grab the turnstile lock.
453 		 */
454 		turnstile_chain_lock(&rw->lock_object);
455 
456 		/*
457 		 * Try to drop our lock leaving the lock in an unlocked
458 		 * state.
459 		 *
460 		 * If you wanted to do explicit lock handoff you'd have to
461 		 * do it here.  You'd also want to use turnstile_signal()
462 		 * and you'd have to handle the race where a higher
463 		 * priority thread blocks on the write lock before the
464 		 * thread you wakeup actually runs and have the new thread
465 		 * "steal" the lock.  For now it's a lot simpler to just
466 		 * wakeup all of the waiters.
467 		 *
468 		 * As above, if we fail, then another thread might have
469 		 * acquired a read lock, so drop the turnstile lock and
470 		 * restart.
471 		 */
472 		if (!atomic_cmpset_ptr(&rw->rw_lock,
473 		    RW_READERS_LOCK(1) | RW_LOCK_WRITE_WAITERS, RW_UNLOCKED)) {
474 			turnstile_chain_unlock(&rw->lock_object);
475 			continue;
476 		}
477 		if (LOCK_LOG_TEST(&rw->lock_object, 0))
478 			CTR2(KTR_LOCK, "%s: %p last succeeded with waiters",
479 			    __func__, rw);
480 
481 		/*
482 		 * Ok.  The lock is released and all that's left is to
483 		 * wake up the waiters.  Note that the lock might not be
484 		 * free anymore, but in that case the writers will just
485 		 * block again if they run before the new lock holder(s)
486 		 * release the lock.
487 		 */
488 		ts = turnstile_lookup(&rw->lock_object);
489 		MPASS(ts != NULL);
490 		turnstile_broadcast(ts, TS_EXCLUSIVE_QUEUE);
491 		turnstile_unpend(ts, TS_SHARED_LOCK);
492 		turnstile_chain_unlock(&rw->lock_object);
493 		break;
494 	}
495 }
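
/*
 * A minimal sketch of the shared (read) path: any number of threads
 * may hold the read lock at once, so the section must only read the
 * protected data ("foo_lock", "foo_count" and "v" are placeholders):
 *
 *	rw_rlock(&foo_lock);
 *	v = foo_count;
 *	rw_runlock(&foo_lock);
 */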
496 
497 /*
498  * This function is called when we are unable to obtain a write lock on the
499  * first try.  This means that at least one other thread holds either a
500  * read or write lock.
501  */
502 void
503 _rw_wlock_hard(struct rwlock *rw, uintptr_t tid, const char *file, int line)
504 {
505 	struct turnstile *ts;
506 #ifdef ADAPTIVE_RWLOCKS
507 	volatile struct thread *owner;
508 #endif
509 	uint64_t waittime = 0;
510 	uintptr_t v;
511 	int contested = 0;
512 
513 	if (rw_wlocked(rw)) {
514 		KASSERT(rw->lock_object.lo_flags & RW_RECURSE,
515 		    ("%s: recursing but non-recursive rw %s @ %s:%d\n",
516 		    __func__, rw->lock_object.lo_name, file, line));
517 		rw->rw_recurse++;
518 		atomic_set_ptr(&rw->rw_lock, RW_LOCK_RECURSED);
519 		if (LOCK_LOG_TEST(&rw->lock_object, 0))
520 			CTR2(KTR_LOCK, "%s: %p recursing", __func__, rw);
521 		return;
522 	}
523 
524 	if (LOCK_LOG_TEST(&rw->lock_object, 0))
525 		CTR5(KTR_LOCK, "%s: %s contested (lock=%p) at %s:%d", __func__,
526 		    rw->lock_object.lo_name, (void *)rw->rw_lock, file, line);
527 
528 	while (!_rw_write_lock(rw, tid)) {
529 		ts = turnstile_trywait(&rw->lock_object);
530 		v = rw->rw_lock;
531 
532 		/*
533 		 * If the lock was released while spinning on the
534 		 * turnstile chain lock, try again.
535 		 */
536 		if (v == RW_UNLOCKED) {
537 			turnstile_cancel(ts);
538 			cpu_spinwait();
539 			continue;
540 		}
541 
542 		/*
543 		 * If the lock was released by a writer with both readers
544 		 * and writers waiting and a reader hasn't woken up and
545 		 * acquired the lock yet, rw_lock will be set to the
546 		 * value RW_UNLOCKED | RW_LOCK_WRITE_WAITERS.  If we see
547 		 * that value, try to acquire it once.  Note that we have
548 		 * to preserve the RW_LOCK_WRITE_WAITERS flag as there are
549 		 * other writers waiting still.  If we fail, restart the
550 		 * loop.
551 		 */
552 		if (v == (RW_UNLOCKED | RW_LOCK_WRITE_WAITERS)) {
553 			if (atomic_cmpset_acq_ptr(&rw->rw_lock,
554 			    RW_UNLOCKED | RW_LOCK_WRITE_WAITERS,
555 			    tid | RW_LOCK_WRITE_WAITERS)) {
556 				turnstile_claim(ts);
557 				CTR2(KTR_LOCK, "%s: %p claimed by new writer",
558 				    __func__, rw);
559 				break;
560 			}
561 			turnstile_cancel(ts);
562 			cpu_spinwait();
563 			continue;
564 		}
565 
566 		/*
567 		 * If the RW_LOCK_WRITE_WAITERS flag isn't set, then try to
568 		 * set it.  If we fail to set it, then loop back and try
569 		 * again.
570 		 */
571 		if (!(v & RW_LOCK_WRITE_WAITERS)) {
572 			if (!atomic_cmpset_ptr(&rw->rw_lock, v,
573 			    v | RW_LOCK_WRITE_WAITERS)) {
574 				turnstile_cancel(ts);
575 				cpu_spinwait();
576 				continue;
577 			}
578 			if (LOCK_LOG_TEST(&rw->lock_object, 0))
579 				CTR2(KTR_LOCK, "%s: %p set write waiters flag",
580 				    __func__, rw);
581 		}
582 
583 #ifdef ADAPTIVE_RWLOCKS
584 		/*
585 		 * If the lock is write locked and the owner is
586 		 * running on another CPU, spin until the owner stops
587 		 * running or the state of the lock changes.
588 		 */
589 		owner = (struct thread *)RW_OWNER(v);
590 		if (!(v & RW_LOCK_READ) && TD_IS_RUNNING(owner)) {
591 			turnstile_cancel(ts);
592 			if (LOCK_LOG_TEST(&rw->lock_object, 0))
593 				CTR3(KTR_LOCK, "%s: spinning on %p held by %p",
594 				    __func__, rw, owner);
595 			lock_profile_obtain_lock_failed(&rw->lock_object,
596 			    &contested, &waittime);
597 			while ((struct thread *)RW_OWNER(rw->rw_lock) ==
598 			    owner && TD_IS_RUNNING(owner))
599 				cpu_spinwait();
600 			continue;
601 		}
602 #endif
603 
604 		/*
605 		 * We were unable to acquire the lock and the write waiters
606 		 * flag is set, so we must block on the turnstile.
607 		 */
608 		if (LOCK_LOG_TEST(&rw->lock_object, 0))
609 			CTR2(KTR_LOCK, "%s: %p blocking on turnstile", __func__,
610 			    rw);
611 		lock_profile_obtain_lock_failed(&rw->lock_object, &contested,
612 		    &waittime);
613 		turnstile_wait(ts, rw_owner(rw), TS_EXCLUSIVE_QUEUE);
614 		if (LOCK_LOG_TEST(&rw->lock_object, 0))
615 			CTR2(KTR_LOCK, "%s: %p resuming from turnstile",
616 			    __func__, rw);
617 	}
618 	lock_profile_obtain_lock_success(&rw->lock_object, contested, waittime,
619 	    file, line);
620 }
621 
622 /*
623  * This function is called if the first try at releasing a write lock failed.
624  * This means that one of the 2 waiter bits must be set indicating that at
625  * least one thread is waiting on this lock.
626  */
627 void
628 _rw_wunlock_hard(struct rwlock *rw, uintptr_t tid, const char *file, int line)
629 {
630 	struct turnstile *ts;
631 	uintptr_t v;
632 	int queue;
633 
634 	if (rw_wlocked(rw) && rw_recursed(rw)) {
635 		if ((--rw->rw_recurse) == 0)
636 			atomic_clear_ptr(&rw->rw_lock, RW_LOCK_RECURSED);
637 		if (LOCK_LOG_TEST(&rw->lock_object, 0))
638 			CTR2(KTR_LOCK, "%s: %p unrecursing", __func__, rw);
639 		return;
640 	}
641 
642 	KASSERT(rw->rw_lock & (RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS),
643 	    ("%s: neither of the waiter flags is set", __func__));
644 
645 	if (LOCK_LOG_TEST(&rw->lock_object, 0))
646 		CTR2(KTR_LOCK, "%s: %p contested", __func__, rw);
647 
648 	turnstile_chain_lock(&rw->lock_object);
649 	ts = turnstile_lookup(&rw->lock_object);
650 
651 #ifdef ADAPTIVE_RWLOCKS
652 	/*
653 	 * There might not be a turnstile for this lock if all of
654 	 * the waiters are adaptively spinning.  In that case, just
655 	 * reset the lock to the unlocked state and return.
656 	 */
657 	if (ts == NULL) {
658 		atomic_store_rel_ptr(&rw->rw_lock, RW_UNLOCKED);
659 		if (LOCK_LOG_TEST(&rw->lock_object, 0))
660 			CTR2(KTR_LOCK, "%s: %p no sleepers", __func__, rw);
661 		turnstile_chain_unlock(&rw->lock_object);
662 		return;
663 	}
664 #else
665 	MPASS(ts != NULL);
666 #endif
667 
668 	/*
669 	 * Use the same algorithm as sx locks for now.  Prefer waking up shared
670 	 * waiters, if we have any, over writers.  This is probably not ideal.
671 	 *
672 	 * 'v' is the value we are going to write back to rw_lock.  If we
673 	 * have waiters on both queues, we need to preserve the state of
674 	 * the waiter flag for the queue we don't wake up.  For now this is
675 	 * hardcoded for the algorithm mentioned above.
676 	 *
677 	 * In the case of both readers and writers waiting we wakeup the
678 	 * readers but leave the RW_LOCK_WRITE_WAITERS flag set.  If a
679 	 * new writer comes in before a reader it will claim the lock up
680 	 * above.  There is probably a potential priority inversion in
681 	 * there that could be worked around either by waking both queues
682 	 * of waiters or doing some complicated lock handoff gymnastics.
683 	 *
684 	 * Note that in the ADAPTIVE_RWLOCKS case, if both flags are
685 	 * set, there might not be any actual writers on the turnstile
686 	 * as they might all be spinning.  In that case, we don't want
687 	 * to preserve the RW_LOCK_WRITE_WAITERS flag as the turnstile
688 	 * is going to go away once we wakeup all the readers.
689 	 */
690 	v = RW_UNLOCKED;
691 	if (rw->rw_lock & RW_LOCK_READ_WAITERS) {
692 		queue = TS_SHARED_QUEUE;
693 #ifdef ADAPTIVE_RWLOCKS
694 		if (rw->rw_lock & RW_LOCK_WRITE_WAITERS &&
695 		    !turnstile_empty(ts, TS_EXCLUSIVE_QUEUE))
696 			v |= RW_LOCK_WRITE_WAITERS;
697 #else
698 		v |= (rw->rw_lock & RW_LOCK_WRITE_WAITERS);
699 #endif
700 	} else
701 		queue = TS_EXCLUSIVE_QUEUE;
702 
703 #ifdef ADAPTIVE_RWLOCKS
704 	/*
705 	 * We have to make sure that we actually have waiters to
706 	 * wakeup.  If they are all spinning, then we just need to
707 	 * disown the turnstile and return.
708 	 */
709 	if (turnstile_empty(ts, queue)) {
710 		if (LOCK_LOG_TEST(&rw->lock_object, 0))
711 			CTR2(KTR_LOCK, "%s: %p no sleepers 2", __func__, rw);
712 		atomic_store_rel_ptr(&rw->rw_lock, v);
713 		turnstile_disown(ts);
714 		turnstile_chain_unlock(&rw->lock_object);
715 		return;
716 	}
717 #endif
718 
719 	/* Wake up all waiters for the specific queue. */
720 	if (LOCK_LOG_TEST(&rw->lock_object, 0))
721 		CTR3(KTR_LOCK, "%s: %p waking up %s waiters", __func__, rw,
722 		    queue == TS_SHARED_QUEUE ? "read" : "write");
723 	turnstile_broadcast(ts, queue);
724 	atomic_store_rel_ptr(&rw->rw_lock, v);
725 	turnstile_unpend(ts, TS_EXCLUSIVE_LOCK);
726 	turnstile_chain_unlock(&rw->lock_object);
727 }
728 
729 /*
730  * Attempt to do a non-blocking upgrade from a read lock to a write
731  * lock.  This will only succeed if this thread holds a single read
732  * lock.  Returns true if the upgrade succeeded and false otherwise.
733  */
734 int
735 _rw_try_upgrade(struct rwlock *rw, const char *file, int line)
736 {
737 	uintptr_t v, tid;
738 	struct turnstile *ts;
739 	int success;
740 
741 	KASSERT(rw->rw_lock != RW_DESTROYED,
742 	    ("rw_try_upgrade() of destroyed rwlock @ %s:%d", file, line));
743 	_rw_assert(rw, RA_RLOCKED, file, line);
744 
745 	/*
746 	 * Attempt to switch from one reader to a writer.  If there
747 	 * are any write waiters, then we will have to lock the
748 	 * turnstile first to prevent races with another writer
749 	 * calling turnstile_wait() before we have claimed this
750 	 * turnstile.  So, do the simple case of no waiters first.
751 	 */
752 	tid = (uintptr_t)curthread;
753 	if (!(rw->rw_lock & RW_LOCK_WRITE_WAITERS)) {
754 		success = atomic_cmpset_ptr(&rw->rw_lock, RW_READERS_LOCK(1),
755 		    tid);
756 		goto out;
757 	}
758 
759 	/*
760 	 * Ok, we think we have write waiters, so lock the
761 	 * turnstile.
762 	 */
763 	ts = turnstile_trywait(&rw->lock_object);
764 
765 	/*
766 	 * Try to switch from one reader to a writer again.  This time
767 	 * we honor the current state of the RW_LOCK_WRITE_WAITERS
768 	 * flag.  If we obtain the lock with the flag set, then claim
769 	 * ownership of the turnstile.  In the ADAPTIVE_RWLOCKS case
770 	 * it is possible for there to not be an associated turnstile
771 	 * even though there are waiters if all of the waiters are
772 	 * spinning.
773 	 */
774 	v = rw->rw_lock & RW_LOCK_WRITE_WAITERS;
775 	success = atomic_cmpset_ptr(&rw->rw_lock, RW_READERS_LOCK(1) | v,
776 	    tid | v);
777 #ifdef ADAPTIVE_RWLOCKS
778 	if (success && v && turnstile_lookup(&rw->lock_object) != NULL)
779 #else
780 	if (success && v)
781 #endif
782 		turnstile_claim(ts);
783 	else
784 		turnstile_cancel(ts);
785 out:
786 	LOCK_LOG_TRY("WUPGRADE", &rw->lock_object, 0, success, file, line);
787 	if (success)
788 		WITNESS_UPGRADE(&rw->lock_object, LOP_EXCLUSIVE | LOP_TRYLOCK,
789 		    file, line);
790 	return (success);
791 }
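
/*
 * A sketch of the usual caller pattern around rw_try_upgrade(): since
 * the upgrade can fail, callers fall back to dropping the read lock
 * and taking the write lock, revalidating anything learned under the
 * read lock because other threads may have run in the window
 * ("foo_lock" is a placeholder name):
 *
 *	rw_rlock(&foo_lock);
 *	if (!rw_try_upgrade(&foo_lock)) {
 *		rw_runlock(&foo_lock);
 *		rw_wlock(&foo_lock);
 *		(revalidate state read under the read lock here)
 *	}
 *	(modify the protected data)
 *	rw_wunlock(&foo_lock);
 */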
792 
793 /*
794  * Downgrade a write lock into a single read lock.
795  */
796 void
797 _rw_downgrade(struct rwlock *rw, const char *file, int line)
798 {
799 	struct turnstile *ts;
800 	uintptr_t tid, v;
801 
802 	KASSERT(rw->rw_lock != RW_DESTROYED,
803 	    ("rw_downgrade() of destroyed rwlock @ %s:%d", file, line));
804 	_rw_assert(rw, RA_WLOCKED | RA_NOTRECURSED, file, line);
805 #ifndef INVARIANTS
806 	if (rw_recursed(rw))
807 		panic("downgrade of a recursed lock");
808 #endif
809 
810 	WITNESS_DOWNGRADE(&rw->lock_object, 0, file, line);
811 
812 	/*
813 	 * Convert from a writer to a single reader.  First we handle
814 	 * the easy case with no waiters.  If there are any waiters, we
815 	 * lock the turnstile, "disown" the lock, and awaken any read
816 	 * waiters.
817 	 */
818 	tid = (uintptr_t)curthread;
819 	if (atomic_cmpset_rel_ptr(&rw->rw_lock, tid, RW_READERS_LOCK(1)))
820 		goto out;
821 
822 	/*
823 	 * Ok, we think we have waiters, so lock the turnstile so we can
824 	 * read the waiter flags without any races.
825 	 */
826 	turnstile_chain_lock(&rw->lock_object);
827 	v = rw->rw_lock;
828 	MPASS(v & (RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS));
829 
830 	/*
831 	 * Downgrade from a write lock while preserving
832 	 * RW_LOCK_WRITE_WAITERS and give up ownership of the
833 	 * turnstile.  If there are any read waiters, wake them up.
834 	 *
835 	 * For ADAPTIVE_RWLOCKS, we have to allow for the fact that
836 	 * all of the read waiters might be spinning.  In that case,
837 	 * act as if RW_LOCK_READ_WAITERS is not set.  Also, only
838 	 * preserve the RW_LOCK_WRITE_WAITERS flag if at least one
839 	 * writer is blocked on the turnstile.
840 	 */
841 	ts = turnstile_lookup(&rw->lock_object);
842 #ifdef ADAPTIVE_RWLOCKS
843 	if (ts == NULL)
844 		v &= ~(RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS);
845 	else if (v & RW_LOCK_READ_WAITERS &&
846 	    turnstile_empty(ts, TS_SHARED_QUEUE))
847 		v &= ~RW_LOCK_READ_WAITERS;
848 	else if (v & RW_LOCK_WRITE_WAITERS &&
849 	    turnstile_empty(ts, TS_EXCLUSIVE_QUEUE))
850 		v &= ~RW_LOCK_WRITE_WAITERS;
851 #else
852 	MPASS(ts != NULL);
853 #endif
854 	if (v & RW_LOCK_READ_WAITERS)
855 		turnstile_broadcast(ts, TS_SHARED_QUEUE);
856 	atomic_store_rel_ptr(&rw->rw_lock, RW_READERS_LOCK(1) |
857 	    (v & RW_LOCK_WRITE_WAITERS));
858 	if (v & RW_LOCK_READ_WAITERS)
859 		turnstile_unpend(ts, TS_EXCLUSIVE_LOCK);
860 	else if (ts)
861 		turnstile_disown(ts);
862 	turnstile_chain_unlock(&rw->lock_object);
863 out:
864 	LOCK_LOG_LOCK("WDOWNGRADE", &rw->lock_object, 0, 0, file, line);
865 }
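
/*
 * A sketch of a typical rw_downgrade() caller: a writer that has
 * finished its updates but still needs a consistent view of the data
 * downgrades instead of dropping and reacquiring the lock, so no other
 * writer can slip in between ("foo_lock", "foo_count" and "v" are
 * placeholder names):
 *
 *	rw_wlock(&foo_lock);
 *	foo_count++;
 *	rw_downgrade(&foo_lock);
 *	v = foo_count;
 *	rw_runlock(&foo_lock);
 */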
866 
867 #ifdef INVARIANT_SUPPORT
868 #ifndef INVARIANTS
869 #undef _rw_assert
870 #endif
871 
872 /*
873  * In the non-WITNESS case, rw_assert() can only detect that at least
874  * *some* thread owns an rlock, but it cannot guarantee that *this*
875  * thread owns an rlock.
876  */
877 void
878 _rw_assert(struct rwlock *rw, int what, const char *file, int line)
879 {
880 
881 	if (panicstr != NULL)
882 		return;
883 	switch (what) {
884 	case RA_LOCKED:
885 	case RA_LOCKED | RA_RECURSED:
886 	case RA_LOCKED | RA_NOTRECURSED:
887 	case RA_RLOCKED:
888 #ifdef WITNESS
889 		witness_assert(&rw->lock_object, what, file, line);
890 #else
891 		/*
892 		 * If some other thread has a write lock or we have one
893 		 * and are asserting a read lock, fail.  Also, if no one
894 		 * has a lock at all, fail.
895 		 */
896 		if (rw->rw_lock == RW_UNLOCKED ||
897 		    (!(rw->rw_lock & RW_LOCK_READ) && (what == RA_RLOCKED ||
898 		    rw_wowner(rw) != curthread)))
899 			panic("Lock %s not %slocked @ %s:%d\n",
900 			    rw->lock_object.lo_name, (what == RA_RLOCKED) ?
901 			    "read " : "", file, line);
902 
903 		if (!(rw->rw_lock & RW_LOCK_READ)) {
904 			if (rw_recursed(rw)) {
905 				if (what & RA_NOTRECURSED)
906 					panic("Lock %s recursed @ %s:%d\n",
907 					    rw->lock_object.lo_name, file,
908 					    line);
909 			} else if (what & RA_RECURSED)
910 				panic("Lock %s not recursed @ %s:%d\n",
911 				    rw->lock_object.lo_name, file, line);
912 		}
913 #endif
914 		break;
915 	case RA_WLOCKED:
916 	case RA_WLOCKED | RA_RECURSED:
917 	case RA_WLOCKED | RA_NOTRECURSED:
918 		if (rw_wowner(rw) != curthread)
919 			panic("Lock %s not exclusively locked @ %s:%d\n",
920 			    rw->lock_object.lo_name, file, line);
921 		if (rw_recursed(rw)) {
922 			if (what & RA_NOTRECURSED)
923 				panic("Lock %s recursed @ %s:%d\n",
924 				    rw->lock_object.lo_name, file, line);
925 		} else if (what & RA_RECURSED)
926 			panic("Lock %s not recursed @ %s:%d\n",
927 			    rw->lock_object.lo_name, file, line);
928 		break;
929 	case RA_UNLOCKED:
930 #ifdef WITNESS
931 		witness_assert(&rw->lock_object, what, file, line);
932 #else
933 		/*
934 		 * If we hold a write lock fail.  We can't reliably check
935 		 * to see if we hold a read lock or not.
936 		 */
937 		if (rw_wowner(rw) == curthread)
938 			panic("Lock %s exclusively locked @ %s:%d\n",
939 			    rw->lock_object.lo_name, file, line);
940 #endif
941 		break;
942 	default:
943 		panic("Unknown rw lock assertion: %d @ %s:%d", what, file,
944 		    line);
945 	}
946 }
947 #endif /* INVARIANT_SUPPORT */
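
/*
 * A small sketch of how rw_assert() is used by consumers: functions
 * that require their caller to hold the lock in a particular mode
 * assert it on entry ("struct foo", "f_lock" and "f_count" are
 * placeholder names):
 *
 *	static void
 *	foo_bump(struct foo *f)
 *	{
 *
 *		rw_assert(&f->f_lock, RA_WLOCKED);
 *		f->f_count++;
 *	}
 *
 * The checks are compiled in only when the consumer is built with
 * INVARIANTS and this file is built with INVARIANT_SUPPORT.
 */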
948 
949 #ifdef DDB
950 void
951 db_show_rwlock(struct lock_object *lock)
952 {
953 	struct rwlock *rw;
954 	struct thread *td;
955 
956 	rw = (struct rwlock *)lock;
957 
958 	db_printf(" state: ");
959 	if (rw->rw_lock == RW_UNLOCKED)
960 		db_printf("UNLOCKED\n");
961 	else if (rw->rw_lock == RW_DESTROYED) {
962 		db_printf("DESTROYED\n");
963 		return;
964 	} else if (rw->rw_lock & RW_LOCK_READ)
965 		db_printf("RLOCK: %ju locks\n",
966 		    (uintmax_t)(RW_READERS(rw->rw_lock)));
967 	else {
968 		td = rw_wowner(rw);
969 		db_printf("WLOCK: %p (tid %d, pid %d, \"%s\")\n", td,
970 		    td->td_tid, td->td_proc->p_pid, td->td_proc->p_comm);
971 		if (rw_recursed(rw))
972 			db_printf(" recursed: %u\n", rw->rw_recurse);
973 	}
974 	db_printf(" waiters: ");
975 	switch (rw->rw_lock & (RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS)) {
976 	case RW_LOCK_READ_WAITERS:
977 		db_printf("readers\n");
978 		break;
979 	case RW_LOCK_WRITE_WAITERS:
980 		db_printf("writers\n");
981 		break;
982 	case RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS:
983 		db_printf("readers and writers\n");
984 		break;
985 	default:
986 		db_printf("none\n");
987 		break;
988 	}
989 }
990 
991 #endif
992