xref: /titanic_50/usr/src/uts/common/os/rwlock.c (revision 6aa4fc89ec1cf2cdf7d7c3b9ec059802ac9abe65)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 /*
27  * Copyright (c) 2013, Joyent, Inc.  All rights reserved.
28  */
29 
30 #include <sys/param.h>
31 #include <sys/thread.h>
32 #include <sys/cmn_err.h>
33 #include <sys/debug.h>
34 #include <sys/cpuvar.h>
35 #include <sys/sobject.h>
36 #include <sys/turnstile.h>
37 #include <sys/rwlock.h>
38 #include <sys/rwlock_impl.h>
39 #include <sys/atomic.h>
40 #include <sys/lockstat.h>
41 
42 /*
43  * Big Theory Statement for readers/writer locking primitives.
44  *
45  * An rwlock provides exclusive access to a single thread ("writer") or
46  * concurrent access to multiple threads ("readers").  See rwlock(9F)
47  * for a full description of the interfaces and programming model.
48  * The rest of this comment describes the implementation.
49  *
50  * An rwlock is a single word with the following structure:
51  *
52  *	---------------------------------------------------------------------
53  *	| OWNER (writer) or HOLD COUNT (readers)   | WRLOCK | WRWANT | WAIT |
54  *	---------------------------------------------------------------------
55  *			63 / 31 .. 3			2	1	0
56  *
57  * The waiters bit (0) indicates whether any threads are blocked waiting
58  * for the lock.  The write-wanted bit (1) indicates whether any threads
59  * are blocked waiting for write access.  The write-locked bit (2) indicates
60  * whether the lock is held by a writer, which determines whether the upper
61  * bits (3..31 in ILP32, 3..63 in LP64) should be interpreted as the owner
62  * (thread pointer) or the hold count (number of readers).
63  *
64  * In the absence of any contention, a writer gets the lock by setting
65  * this word to (curthread | RW_WRITE_LOCKED); a reader gets the lock
66  * by incrementing the hold count (i.e. adding 8, aka RW_READ_LOCK).
67  *
68  * A writer will fail to acquire the lock if any other thread owns it.
69  * A reader will fail if the lock is either owned (in the RW_READER and
70  * RW_READER_STARVEWRITER cases) or wanted by a writer (in the RW_READER
71  * case). rw_tryenter() returns 0 in these cases; rw_enter() blocks until
72  * the lock becomes available.
73  *
74  * When a thread blocks it acquires the rwlock's hashed turnstile lock and
75  * attempts to set RW_HAS_WAITERS (and RW_WRITE_WANTED in the writer case)
76  * atomically *only if the lock still appears busy*.  A thread must never
77  * accidentally block for an available lock since there would be no owner
78  * to awaken it.  casip() provides the required atomicity.  Once casip()
79  * succeeds, the decision to block becomes final and irreversible.  The
80  * thread will not become runnable again until it has been granted ownership
81  * of the lock via direct handoff from a former owner as described below.
82  *
83  * In the absence of any waiters, rw_exit() just clears the lock (if it
84  * is write-locked) or decrements the hold count (if it is read-locked).
85  * Note that even if waiters are present, decrementing the hold count
86  * to a non-zero value requires no special action since the lock is still
87  * held by at least one other thread.
88  *
89  * On the "final exit" (transition to unheld state) of a lock with waiters,
90  * rw_exit_wakeup() grabs the turnstile lock and transfers ownership directly
91  * to the next writer or set of readers.  There are several advantages to this
92  * approach: (1) it closes all windows for priority inversion (when a new
93  * writer has grabbed the lock but has not yet inherited from blocked readers);
94  * (2) it prevents starvation of equal-priority threads by granting the lock
95  * in FIFO order; (3) it eliminates the need for a write-wanted count -- a
96  * single bit suffices because the lock remains held until all waiting
97  * writers are gone; (4) when we awaken N readers we can perform a single
98  * "atomic_add(&x, N)" to set the total hold count rather than having all N
99  * threads fight for the cache to perform an "atomic_add(&x, 1)" upon wakeup.
100  *
101  * The most interesting policy decision in rw_exit_wakeup() is which thread
102  * to wake.  Starvation is always possible with priority-based scheduling,
103  * but any sane wakeup policy should at least satisfy these requirements:
104  *
105  * (1) The highest-priority thread in the system should not starve.
106  * (2) The highest-priority writer should not starve.
107  * (3) No writer should starve due to lower-priority threads.
108  * (4) No reader should starve due to lower-priority writers.
109  * (5) If all threads have equal priority, none of them should starve.
110  *
111  * We used to employ a writers-always-win policy, which doesn't even
112  * satisfy (1): a steady stream of low-priority writers can starve out
113  * a real-time reader!  This is clearly a broken policy -- it violates
114  * (1), (4), and (5) -- but it's how rwlocks always used to behave.
115  *
116  * A round-robin policy (exiting readers grant the lock to blocked writers
117  * and vice versa) satisfies all but (3): a single high-priority writer
118  * and many low-priority readers can starve out medium-priority writers.
119  *
120  * A strict priority policy (grant the lock to the highest priority blocked
121  * thread) satisfies everything but (2): a steady stream of high-priority
122  * readers can permanently starve the highest-priority writer.
123  *
124  * The reason we care about (2) is that it's important to process writers
125  * reasonably quickly -- even if they're low priority -- because their very
126  * presence causes all readers to take the slow (blocking) path through this
127  * code.  There is also a general sense that writers deserve some degree of
128  * deference because they're updating the data upon which all readers act.
129  * Presumably this data should not be allowed to become arbitrarily stale
130  * due to writer starvation.  Finally, it seems reasonable to level the
131  * playing field a bit to compensate for the fact that it's so much harder
132  * for a writer to get in when there are already many readers present.
133  *
134  * A hybrid of round-robin and strict priority can be made to satisfy
135  * all five criteria.  In this "writer priority policy" exiting readers
136  * always grant the lock to waiting writers, but exiting writers only
137  * grant the lock to readers of the same or higher priority than the
138  * highest-priority blocked writer.  Thus requirement (2) is satisfied,
139  * necessarily, by a willful act of priority inversion: an exiting reader
140  * will grant the lock to a blocked writer even if there are blocked
141  * readers of higher priority.  The situation is mitigated by the fact
142  * that writers always inherit priority from blocked readers, and the
143  * writer will awaken those readers as soon as it exits the lock.
144  *
145  * Finally, note that this hybrid scheme -- and indeed, any scheme that
146  * satisfies requirement (2) -- has an important consequence:  if a lock is
147  * held as reader and a writer subsequently becomes blocked, any further
148  * readers must be blocked to avoid writer starvation.  This implementation
149  * detail has ramifications for the semantics of rwlocks, as it prohibits
150  * recursively acquiring an rwlock as reader: any writer that wishes to
151  * acquire the lock after the first but before the second acquisition as
152  * reader will block the second acquisition -- resulting in deadlock.  This
153  * itself is not necessarily prohibitive, as it is often straightforward to
154  * prevent a single thread from recursively acquiring an rwlock as reader.
155  * However, a more subtle situation arises when both a traditional mutex and
156  * a reader lock are acquired by two different threads in opposite order.
157  * (That is, one thread first acquires the mutex and then the rwlock as
158  * reader; the other acquires the rwlock as reader and then the mutex.) As
159  * with the single threaded case, this is fine absent a blocked writer: the
160  * thread that acquires the mutex before acquiring the rwlock as reader will
161  * be able to successfully acquire the rwlock -- even as/if the other thread
162  * has the rwlock as reader and is blocked on the held mutex.  However, if
163  * an unrelated writer (that is, a third thread) becomes blocked on the
164  * rwlock after the first thread acquires the rwlock as reader but before
165  * it's able to acquire the mutex, the second thread -- with the mutex held
166  * -- will not be able to acquire the rwlock as reader due to the waiting
167  * writer, deadlocking the three threads.  Unlike the single-threaded
168  * (recursive) rwlock acquisition case, this case can be quite a bit
169  * thornier to fix, especially as there is nothing inherently wrong in the
170  * locking strategy: the deadlock is really induced by requirement (2), not
171  * the consumers of the rwlock.  To permit such consumers, we allow rwlock
172  * acquirers to explicitly opt out of requirement (2) by specifying
173  * RW_READER_STARVEWRITER when acquiring the rwlock.  This (obviously) means
174  * that inifinite readers can starve writers, but it also allows for
175  * multiple readers in the presence of other synchronization primitives
176  * without regard for lock-ordering.  And while certainly odd (and perhaps
177  * unwise), RW_READER_STARVEWRITER can be safely used alongside RW_READER on
178  * the same lock -- RW_READER_STARVEWRITER describes only the act of lock
179  * acquisition with respect to waiting writers, not the lock itself.
180  *
181  * rw_downgrade() follows the same wakeup policy as an exiting writer.
182  *
183  * rw_tryupgrade() has the same failure mode as rw_tryenter() for a
184  * write lock.  Both honor the WRITE_WANTED bit by specification.
185  *
186  * The following rules apply to manipulation of rwlock internal state:
187  *
188  * (1) The rwlock is only modified via the atomic primitives casip()
189  *     and atomic_add_ip().
190  *
191  * (2) The waiters bit and write-wanted bit are only modified under
192  *     turnstile_lookup().  This ensures that the turnstile is consistent
193  *     with the rwlock.
194  *
195  * (3) Waiters receive the lock by direct handoff from the previous
196  *     owner.  Therefore, waiters *always* wake up holding the lock.
197  */
198 
199 /*
200  * The sobj_ops vector exports a set of functions needed when a thread
201  * is asleep on a synchronization object of a given type.
202  */
203 static sobj_ops_t rw_sobj_ops = {
204 	SOBJ_RWLOCK, rw_owner, turnstile_stay_asleep, turnstile_change_pri
205 };
206 
207 /*
208  * If the system panics on an rwlock, save the address of the offending
209  * rwlock in panic_rwlock_addr, and save the contents in panic_rwlock.
210  */
211 static rwlock_impl_t panic_rwlock;
212 static rwlock_impl_t *panic_rwlock_addr;
213 
214 static void
215 rw_panic(char *msg, rwlock_impl_t *lp)
216 {
217 	if (panicstr)
218 		return;
219 
220 	if (atomic_cas_ptr(&panic_rwlock_addr, NULL, lp) == NULL)
221 		panic_rwlock = *lp;
222 
223 	panic("%s, lp=%p wwwh=%lx thread=%p",
224 	    msg, (void *)lp, panic_rwlock.rw_wwwh, (void *)curthread);
225 }
226 
227 /* ARGSUSED */
228 void
229 rw_init(krwlock_t *rwlp, char *name, krw_type_t type, void *arg)
230 {
231 	((rwlock_impl_t *)rwlp)->rw_wwwh = 0;
232 }
233 
234 void
235 rw_destroy(krwlock_t *rwlp)
236 {
237 	rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;
238 
239 	if (lp->rw_wwwh != 0) {
240 		if ((lp->rw_wwwh & RW_DOUBLE_LOCK) == RW_DOUBLE_LOCK)
241 			rw_panic("rw_destroy: lock already destroyed", lp);
242 		else
243 			rw_panic("rw_destroy: lock still active", lp);
244 	}
245 
246 	lp->rw_wwwh = RW_DOUBLE_LOCK;
247 }
248 
249 /*
250  * Verify that an rwlock is held correctly.
251  */
252 static int
253 rw_locked(rwlock_impl_t *lp, krw_t rw)
254 {
255 	uintptr_t old = lp->rw_wwwh;
256 
257 	if (rw == RW_READER || rw == RW_READER_STARVEWRITER)
258 		return ((old & RW_LOCKED) && !(old & RW_WRITE_LOCKED));
259 
260 	if (rw == RW_WRITER)
261 		return ((old & RW_OWNER) == (uintptr_t)curthread);
262 
263 	return (0);
264 }
265 
266 uint_t (*rw_lock_backoff)(uint_t) = NULL;
267 void (*rw_lock_delay)(uint_t) = NULL;
268 
269 /*
270  * Full-service implementation of rw_enter() to handle all the hard cases.
271  * Called from the assembly version if anything complicated is going on.
272  * The only semantic difference between calling rw_enter() and calling
273  * rw_enter_sleep() directly is that we assume the caller has already done
274  * a THREAD_KPRI_REQUEST() in the RW_READER cases.
275  */
276 void
277 rw_enter_sleep(rwlock_impl_t *lp, krw_t rw)
278 {
279 	uintptr_t old, new, lock_value, lock_busy, lock_wait;
280 	hrtime_t sleep_time;
281 	turnstile_t *ts;
282 	uint_t  backoff = 0;
283 	int loop_count = 0;
284 
285 	if (rw == RW_READER) {
286 		lock_value = RW_READ_LOCK;
287 		lock_busy = RW_WRITE_CLAIMED;
288 		lock_wait = RW_HAS_WAITERS;
289 	} else if (rw == RW_READER_STARVEWRITER) {
290 		lock_value = RW_READ_LOCK;
291 		lock_busy = RW_WRITE_LOCKED;
292 		lock_wait = RW_HAS_WAITERS;
293 	} else {
294 		lock_value = RW_WRITE_LOCK(curthread);
295 		lock_busy = (uintptr_t)RW_LOCKED;
296 		lock_wait = RW_HAS_WAITERS | RW_WRITE_WANTED;
297 	}
298 
299 	for (;;) {
300 		if (((old = lp->rw_wwwh) & lock_busy) == 0) {
301 			if (casip(&lp->rw_wwwh, old, old + lock_value) != old) {
302 				if (rw_lock_delay != NULL) {
303 					backoff = rw_lock_backoff(backoff);
304 					rw_lock_delay(backoff);
305 					if (++loop_count == ncpus_online) {
306 						backoff = 0;
307 						loop_count = 0;
308 					}
309 				}
310 				continue;
311 			}
312 			break;
313 		}
314 
315 		if (panicstr)
316 			return;
317 
318 		if ((old & RW_DOUBLE_LOCK) == RW_DOUBLE_LOCK) {
319 			rw_panic("rw_enter: bad rwlock", lp);
320 			return;
321 		}
322 
323 		if ((old & RW_OWNER) == (uintptr_t)curthread) {
324 			rw_panic("recursive rw_enter", lp);
325 			return;
326 		}
327 
328 		ts = turnstile_lookup(lp);
329 
330 		do {
331 			if (((old = lp->rw_wwwh) & lock_busy) == 0)
332 				break;
333 			new = old | lock_wait;
334 		} while (old != new && casip(&lp->rw_wwwh, old, new) != old);
335 
336 		if ((old & lock_busy) == 0) {
337 			/*
338 			 * The lock appears free now; try the dance again
339 			 */
340 			turnstile_exit(lp);
341 			continue;
342 		}
343 
344 		/*
345 		 * We really are going to block.  Bump the stats, and drop
346 		 * kpri if we're a reader.
347 		 */
348 		ASSERT(lp->rw_wwwh & lock_wait);
349 		ASSERT(lp->rw_wwwh & RW_LOCKED);
350 
351 		sleep_time = -gethrtime();
352 		if (rw != RW_WRITER) {
353 			THREAD_KPRI_RELEASE();
354 			CPU_STATS_ADDQ(CPU, sys, rw_rdfails, 1);
355 			(void) turnstile_block(ts, TS_READER_Q, lp,
356 			    &rw_sobj_ops, NULL, NULL);
357 		} else {
358 			CPU_STATS_ADDQ(CPU, sys, rw_wrfails, 1);
359 			(void) turnstile_block(ts, TS_WRITER_Q, lp,
360 			    &rw_sobj_ops, NULL, NULL);
361 		}
362 		sleep_time += gethrtime();
363 
364 		LOCKSTAT_RECORD4(LS_RW_ENTER_BLOCK, lp, sleep_time, rw,
365 		    (old & RW_WRITE_LOCKED) ? 1 : 0,
366 		    old >> RW_HOLD_COUNT_SHIFT);
367 
368 		/*
369 		 * We wake up holding the lock (and having kpri if we're
370 		 * a reader) via direct handoff from the previous owner.
371 		 */
372 		break;
373 	}
374 
375 	ASSERT(rw_locked(lp, rw));
376 
377 	membar_enter();
378 
379 	LOCKSTAT_RECORD(LS_RW_ENTER_ACQUIRE, lp, rw);
380 }
381 
382 /*
383  * Return the number of readers to wake, or zero if we should wake a writer.
384  * Called only by exiting/downgrading writers (readers don't wake readers).
385  */
386 static int
387 rw_readers_to_wake(turnstile_t *ts)
388 {
389 	kthread_t *next_writer = ts->ts_sleepq[TS_WRITER_Q].sq_first;
390 	kthread_t *next_reader = ts->ts_sleepq[TS_READER_Q].sq_first;
391 	pri_t wpri = (next_writer != NULL) ? DISP_PRIO(next_writer) : -1;
392 	int count = 0;
393 
394 	while (next_reader != NULL) {
395 		if (DISP_PRIO(next_reader) < wpri)
396 			break;
397 		next_reader->t_kpri_req++;
398 		next_reader = next_reader->t_link;
399 		count++;
400 	}
401 	return (count);
402 }
403 
404 /*
405  * Full-service implementation of rw_exit() to handle all the hard cases.
406  * Called from the assembly version if anything complicated is going on.
407  * There is no semantic difference between calling rw_exit() and calling
408  * rw_exit_wakeup() directly.
409  */
410 void
411 rw_exit_wakeup(rwlock_impl_t *lp)
412 {
413 	turnstile_t *ts;
414 	uintptr_t old, new, lock_value;
415 	kthread_t *next_writer;
416 	int nreaders;
417 	uint_t  backoff = 0;
418 	int loop_count = 0;
419 
420 	membar_exit();
421 
422 	old = lp->rw_wwwh;
423 	if (old & RW_WRITE_LOCKED) {
424 		if ((old & RW_OWNER) != (uintptr_t)curthread) {
425 			rw_panic("rw_exit: not owner", lp);
426 			lp->rw_wwwh = 0;
427 			return;
428 		}
429 		lock_value = RW_WRITE_LOCK(curthread);
430 	} else {
431 		if ((old & RW_LOCKED) == 0) {
432 			rw_panic("rw_exit: lock not held", lp);
433 			return;
434 		}
435 		lock_value = RW_READ_LOCK;
436 	}
437 
438 	for (;;) {
439 		/*
440 		 * If this is *not* the final exit of a lock with waiters,
441 		 * just drop the lock -- there's nothing tricky going on.
442 		 */
443 		old = lp->rw_wwwh;
444 		new = old - lock_value;
445 		if ((new & (RW_LOCKED | RW_HAS_WAITERS)) != RW_HAS_WAITERS) {
446 			if (casip(&lp->rw_wwwh, old, new) != old) {
447 				if (rw_lock_delay != NULL) {
448 					backoff = rw_lock_backoff(backoff);
449 					rw_lock_delay(backoff);
450 					if (++loop_count == ncpus_online) {
451 						backoff = 0;
452 						loop_count = 0;
453 					}
454 				}
455 				continue;
456 			}
457 			break;
458 		}
459 
460 		/*
461 		 * This appears to be the final exit of a lock with waiters.
462 		 * If we do not have the lock as writer (that is, if this is
463 		 * the last exit of a reader with waiting writers), we will
464 		 * grab the lock as writer to prevent additional readers.
465 		 * (This is required because a reader that is acquiring the
466 		 * lock via RW_READER_STARVEWRITER will not observe the
467 		 * RW_WRITE_WANTED bit -- and we could therefore be racing
468 		 * with such readers here.)
469 		 */
470 		if (!(old & RW_WRITE_LOCKED)) {
471 			new = RW_WRITE_LOCK(curthread) |
472 			    RW_HAS_WAITERS | RW_WRITE_WANTED;
473 
474 			if (casip(&lp->rw_wwwh, old, new) != old)
475 				continue;
476 		}
477 
478 		/*
479 		 * Perform the final exit of a lock that has waiters.
480 		 */
481 		ts = turnstile_lookup(lp);
482 
483 		next_writer = ts->ts_sleepq[TS_WRITER_Q].sq_first;
484 
485 		if ((old & RW_WRITE_LOCKED) &&
486 		    (nreaders = rw_readers_to_wake(ts)) > 0) {
487 			/*
488 			 * Don't drop the lock -- just set the hold count
489 			 * such that we grant the lock to all readers at once.
490 			 */
491 			new = nreaders * RW_READ_LOCK;
492 			if (ts->ts_waiters > nreaders)
493 				new |= RW_HAS_WAITERS;
494 			if (next_writer)
495 				new |= RW_WRITE_WANTED;
496 			lp->rw_wwwh = new;
497 			membar_enter();
498 			turnstile_wakeup(ts, TS_READER_Q, nreaders, NULL);
499 		} else {
500 			/*
501 			 * Don't drop the lock -- just transfer ownership
502 			 * directly to next_writer.  Note that there must
503 			 * be at least one waiting writer, because we get
504 			 * here only if (A) the lock is read-locked or
505 			 * (B) there are no waiting readers.  In case (A),
506 			 * since the lock is read-locked there would be no
507 			 * reason for other readers to have blocked unless
508 			 * the RW_WRITE_WANTED bit was set.  In case (B),
509 			 * since there are waiters but no waiting readers,
510 			 * they must all be waiting writers.
511 			 */
512 			ASSERT(lp->rw_wwwh & RW_WRITE_WANTED);
513 			new = RW_WRITE_LOCK(next_writer);
514 			if (ts->ts_waiters > 1)
515 				new |= RW_HAS_WAITERS;
516 			if (next_writer->t_link)
517 				new |= RW_WRITE_WANTED;
518 			lp->rw_wwwh = new;
519 			membar_enter();
520 			turnstile_wakeup(ts, TS_WRITER_Q, 1, next_writer);
521 		}
522 		break;
523 	}
524 
525 	if (lock_value == RW_READ_LOCK) {
526 		THREAD_KPRI_RELEASE();
527 		LOCKSTAT_RECORD(LS_RW_EXIT_RELEASE, lp, RW_READER);
528 	} else {
529 		LOCKSTAT_RECORD(LS_RW_EXIT_RELEASE, lp, RW_WRITER);
530 	}
531 }
532 
533 int
534 rw_tryenter(krwlock_t *rwlp, krw_t rw)
535 {
536 	rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;
537 	uintptr_t old;
538 
539 	if (rw != RW_WRITER) {
540 		uint_t backoff = 0;
541 		int loop_count = 0;
542 		THREAD_KPRI_REQUEST();
543 		for (;;) {
544 			if ((old = lp->rw_wwwh) & (rw == RW_READER ?
545 			    RW_WRITE_CLAIMED : RW_WRITE_LOCKED)) {
546 				THREAD_KPRI_RELEASE();
547 				return (0);
548 			}
549 			if (casip(&lp->rw_wwwh, old, old + RW_READ_LOCK) == old)
550 				break;
551 			if (rw_lock_delay != NULL) {
552 				backoff = rw_lock_backoff(backoff);
553 				rw_lock_delay(backoff);
554 				if (++loop_count == ncpus_online) {
555 					backoff = 0;
556 					loop_count = 0;
557 				}
558 			}
559 		}
560 		LOCKSTAT_RECORD(LS_RW_TRYENTER_ACQUIRE, lp, rw);
561 	} else {
562 		if (casip(&lp->rw_wwwh, 0, RW_WRITE_LOCK(curthread)) != 0)
563 			return (0);
564 		LOCKSTAT_RECORD(LS_RW_TRYENTER_ACQUIRE, lp, rw);
565 	}
566 	ASSERT(rw_locked(lp, rw));
567 	membar_enter();
568 	return (1);
569 }
570 
571 void
572 rw_downgrade(krwlock_t *rwlp)
573 {
574 	rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;
575 
576 	THREAD_KPRI_REQUEST();
577 	membar_exit();
578 
579 	if ((lp->rw_wwwh & RW_OWNER) != (uintptr_t)curthread) {
580 		rw_panic("rw_downgrade: not owner", lp);
581 		return;
582 	}
583 
584 	if (atomic_add_ip_nv(&lp->rw_wwwh,
585 	    RW_READ_LOCK - RW_WRITE_LOCK(curthread)) & RW_HAS_WAITERS) {
586 		turnstile_t *ts = turnstile_lookup(lp);
587 		int nreaders = rw_readers_to_wake(ts);
588 		if (nreaders > 0) {
589 			uintptr_t delta = nreaders * RW_READ_LOCK;
590 			if (ts->ts_waiters == nreaders)
591 				delta -= RW_HAS_WAITERS;
592 			atomic_add_ip(&lp->rw_wwwh, delta);
593 		}
594 		turnstile_wakeup(ts, TS_READER_Q, nreaders, NULL);
595 	}
596 	ASSERT(rw_locked(lp, RW_READER));
597 	LOCKSTAT_RECORD0(LS_RW_DOWNGRADE_DOWNGRADE, lp);
598 }
599 
600 int
601 rw_tryupgrade(krwlock_t *rwlp)
602 {
603 	rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;
604 	uintptr_t old, new;
605 
606 	ASSERT(rw_locked(lp, RW_READER));
607 
608 	do {
609 		if (((old = lp->rw_wwwh) & ~RW_HAS_WAITERS) != RW_READ_LOCK)
610 			return (0);
611 		new = old + RW_WRITE_LOCK(curthread) - RW_READ_LOCK;
612 	} while (casip(&lp->rw_wwwh, old, new) != old);
613 
614 	membar_enter();
615 	THREAD_KPRI_RELEASE();
616 	LOCKSTAT_RECORD0(LS_RW_TRYUPGRADE_UPGRADE, lp);
617 	ASSERT(rw_locked(lp, RW_WRITER));
618 	return (1);
619 }
620 
621 int
622 rw_read_held(krwlock_t *rwlp)
623 {
624 	uintptr_t tmp;
625 
626 	return (_RW_READ_HELD(rwlp, tmp));
627 }
628 
629 int
630 rw_write_held(krwlock_t *rwlp)
631 {
632 	return (_RW_WRITE_HELD(rwlp));
633 }
634 
635 int
636 rw_lock_held(krwlock_t *rwlp)
637 {
638 	return (_RW_LOCK_HELD(rwlp));
639 }
640 
641 /*
642  * Like rw_read_held(), but ASSERTs that the lock is currently held
643  */
644 int
645 rw_read_locked(krwlock_t *rwlp)
646 {
647 	uintptr_t old = ((rwlock_impl_t *)rwlp)->rw_wwwh;
648 
649 	ASSERT(old & RW_LOCKED);
650 	return ((old & RW_LOCKED) && !(old & RW_WRITE_LOCKED));
651 }
652 
653 /*
654  * Returns non-zero if the lock is either held or desired by a writer
655  */
656 int
657 rw_iswriter(krwlock_t *rwlp)
658 {
659 	return (_RW_ISWRITER(rwlp));
660 }
661 
662 kthread_t *
663 rw_owner(krwlock_t *rwlp)
664 {
665 	uintptr_t old = ((rwlock_impl_t *)rwlp)->rw_wwwh;
666 
667 	return ((old & RW_WRITE_LOCKED) ? (kthread_t *)(old & RW_OWNER) : NULL);
668 }
669