xref: /illumos-gate/usr/src/uts/common/os/rwlock.c (revision 69a119caa6570c7077699161b7c28b6ee9f8b0f4)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #include <sys/param.h>
27 #include <sys/thread.h>
28 #include <sys/cmn_err.h>
29 #include <sys/debug.h>
30 #include <sys/cpuvar.h>
31 #include <sys/sobject.h>
32 #include <sys/turnstile.h>
33 #include <sys/rwlock.h>
34 #include <sys/rwlock_impl.h>
35 #include <sys/atomic.h>
36 #include <sys/lockstat.h>
37 
38 /*
39  * Big Theory Statement for readers/writer locking primitives.
40  *
41  * An rwlock provides exclusive access to a single thread ("writer") or
42  * concurrent access to multiple threads ("readers").  See rwlock(9F)
43  * for a full description of the interfaces and programming model.
44  * The rest of this comment describes the implementation.
45  *
46  * An rwlock is a single word with the following structure:
47  *
48  *	---------------------------------------------------------------------
49  *	| OWNER (writer) or HOLD COUNT (readers)   | WRLOCK | WRWANT | WAIT |
50  *	---------------------------------------------------------------------
51  *			63 / 31 .. 3			2	1	0
52  *
53  * The waiters bit (0) indicates whether any threads are blocked waiting
54  * for the lock.  The write-wanted bit (1) indicates whether any threads
55  * are blocked waiting for write access.  The write-locked bit (2) indicates
56  * whether the lock is held by a writer, which determines whether the upper
57  * bits (3..31 in ILP32, 3..63 in LP64) should be interpreted as the owner
58  * (thread pointer) or the hold count (number of readers).
59  *
60  * In the absence of any contention, a writer gets the lock by setting
61  * this word to (curthread | RW_WRITE_LOCKED); a reader gets the lock
62  * by incrementing the hold count (i.e. adding 8, aka RW_READ_LOCK).
63  *
64  * A writer will fail to acquire the lock if any other thread owns it.
65  * A reader will fail if the lock is either owned or wanted by a writer.
66  * rw_tryenter() returns 0 in these cases; rw_enter() blocks until the
67  * lock becomes available.
68  *
69  * When a thread blocks it acquires the rwlock's hashed turnstile lock and
70  * attempts to set RW_HAS_WAITERS (and RW_WRITE_WANTED in the writer case)
71  * atomically *only if the lock still appears busy*.  A thread must never
72  * accidentally block for an available lock since there would be no owner
73  * to awaken it.  casip() provides the required atomicity.  Once casip()
74  * succeeds, the decision to block becomes final and irreversible.  The
75  * thread will not become runnable again until it has been granted ownership
76  * of the lock via direct handoff from a former owner as described below.
77  *
78  * In the absence of any waiters, rw_exit() just clears the lock (if it
79  * is write-locked) or decrements the hold count (if it is read-locked).
80  * Note that even if waiters are present, decrementing the hold count
81  * to a non-zero value requires no special action since the lock is still
82  * held by at least one other thread.
83  *
84  * On the "final exit" (transition to unheld state) of a lock with waiters,
85  * rw_exit_wakeup() grabs the turnstile lock and transfers ownership directly
86  * to the next writer or set of readers.  There are several advantages to this
87  * approach: (1) it closes all windows for priority inversion (when a new
88  * writer has grabbed the lock but has not yet inherited from blocked readers);
89  * (2) it prevents starvation of equal-priority threads by granting the lock
90  * in FIFO order; (3) it eliminates the need for a write-wanted count -- a
91  * single bit suffices because the lock remains held until all waiting
92  * writers are gone; (4) when we awaken N readers we can perform a single
93  * "atomic_add(&x, N)" to set the total hold count rather than having all N
94  * threads fight for the cache to perform an "atomic_add(&x, 1)" upon wakeup.
95  *
96  * The most interesting policy decision in rw_exit_wakeup() is which thread
97  * to wake.  Starvation is always possible with priority-based scheduling,
98  * but any sane wakeup policy should at least satisfy these requirements:
99  *
100  * (1) The highest-priority thread in the system should not starve.
101  * (2) The highest-priority writer should not starve.
102  * (3) No writer should starve due to lower-priority threads.
103  * (4) No reader should starve due to lower-priority writers.
104  * (5) If all threads have equal priority, none of them should starve.
105  *
106  * We used to employ a writers-always-win policy, which doesn't even
107  * satisfy (1): a steady stream of low-priority writers can starve out
108  * a real-time reader!  This is clearly a broken policy -- it violates
109  * (1), (4), and (5) -- but it's how rwlocks always used to behave.
110  *
111  * A round-robin policy (exiting readers grant the lock to blocked writers
112  * and vice versa) satisfies all but (3): a single high-priority writer
113  * and many low-priority readers can starve out medium-priority writers.
114  *
115  * A strict priority policy (grant the lock to the highest priority blocked
116  * thread) satisfies everything but (2): a steady stream of high-priority
117  * readers can permanently starve the highest-priority writer.
118  *
119  * The reason we care about (2) is that it's important to process writers
120  * reasonably quickly -- even if they're low priority -- because their very
121  * presence causes all readers to take the slow (blocking) path through this
122  * code.  There is also a general sense that writers deserve some degree of
123  * deference because they're updating the data upon which all readers act.
124  * Presumably this data should not be allowed to become arbitrarily stale
125  * due to writer starvation.  Finally, it seems reasonable to level the
126  * playing field a bit to compensate for the fact that it's so much harder
127  * for a writer to get in when there are already many readers present.
128  *
129  * A hybrid of round-robin and strict priority can be made to satisfy
130  * all five criteria.  In this "writer priority policy" exiting readers
131  * always grant the lock to waiting writers, but exiting writers only
132  * grant the lock to readers of the same or higher priority than the
133  * highest-priority blocked writer.  Thus requirement (2) is satisfied,
134  * necessarily, by a willful act of priority inversion: an exiting reader
135  * will grant the lock to a blocked writer even if there are blocked
136  * readers of higher priority.  The situation is mitigated by the fact
137  * that writers always inherit priority from blocked readers, and the
138  * writer will awaken those readers as soon as it exits the lock.
139  *
140  * rw_downgrade() follows the same wakeup policy as an exiting writer.
141  *
142  * rw_tryupgrade() has the same failure mode as rw_tryenter() for a
143  * write lock.  Both honor the WRITE_WANTED bit by specification.
144  *
145  * The following rules apply to manipulation of rwlock internal state:
146  *
147  * (1) The rwlock is only modified via the atomic primitives casip()
148  *     and atomic_add_ip().
149  *
150  * (2) The waiters bit and write-wanted bit are only modified under
151  *     turnstile_lookup().  This ensures that the turnstile is consistent
152  *     with the rwlock.
153  *
154  * (3) Waiters receive the lock by direct handoff from the previous
155  *     owner.  Therefore, waiters *always* wake up holding the lock.
156  */
157 
158 /*
159  * The sobj_ops vector exports a set of functions needed when a thread
160  * is asleep on a synchronization object of a given type.
161  */
162 static sobj_ops_t rw_sobj_ops = {
163 	SOBJ_RWLOCK, rw_owner, turnstile_stay_asleep, turnstile_change_pri
164 };
165 
166 /*
167  * If the system panics on an rwlock, save the address of the offending
168  * rwlock in panic_rwlock_addr, and save the contents in panic_rwlock.
169  */
170 static rwlock_impl_t panic_rwlock;
171 static rwlock_impl_t *panic_rwlock_addr;
172 
173 static void
174 rw_panic(char *msg, rwlock_impl_t *lp)
175 {
176 	if (panicstr)
177 		return;
178 
179 	if (casptr(&panic_rwlock_addr, NULL, lp) == NULL)
180 		panic_rwlock = *lp;
181 
182 	panic("%s, lp=%p wwwh=%lx thread=%p",
183 	    msg, (void *)lp, panic_rwlock.rw_wwwh, (void *)curthread);
184 }
185 
186 /* ARGSUSED */
187 void
188 rw_init(krwlock_t *rwlp, char *name, krw_type_t type, void *arg)
189 {
190 	((rwlock_impl_t *)rwlp)->rw_wwwh = 0;
191 }
192 
193 void
194 rw_destroy(krwlock_t *rwlp)
195 {
196 	rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;
197 
198 	if (lp->rw_wwwh != 0) {
199 		if ((lp->rw_wwwh & RW_DOUBLE_LOCK) == RW_DOUBLE_LOCK)
200 			rw_panic("rw_destroy: lock already destroyed", lp);
201 		else
202 			rw_panic("rw_destroy: lock still active", lp);
203 	}
204 
205 	lp->rw_wwwh = RW_DOUBLE_LOCK;
206 }
207 
208 /*
209  * Verify that an rwlock is held correctly.
210  */
211 static int
212 rw_locked(rwlock_impl_t *lp, krw_t rw)
213 {
214 	uintptr_t old = lp->rw_wwwh;
215 
216 	if (rw == RW_READER)
217 		return ((old & RW_LOCKED) && !(old & RW_WRITE_LOCKED));
218 
219 	if (rw == RW_WRITER)
220 		return ((old & RW_OWNER) == (uintptr_t)curthread);
221 
222 	return (0);
223 }
224 
225 uint_t (*rw_lock_backoff)(uint_t) = NULL;
226 void (*rw_lock_delay)(uint_t) = NULL;
227 
228 /*
229  * Full-service implementation of rw_enter() to handle all the hard cases.
230  * Called from the assembly version if anything complicated is going on.
231  * The only semantic difference between calling rw_enter() and calling
232  * rw_enter_sleep() directly is that we assume the caller has already done
233  * a THREAD_KPRI_REQUEST() in the RW_READER case.
234  */
235 void
236 rw_enter_sleep(rwlock_impl_t *lp, krw_t rw)
237 {
238 	uintptr_t old, new, lock_value, lock_busy, lock_wait;
239 	hrtime_t sleep_time;
240 	turnstile_t *ts;
241 	uint_t  backoff = 0;
242 	int loop_count = 0;
243 
244 	if (rw == RW_READER) {
245 		lock_value = RW_READ_LOCK;
246 		lock_busy = RW_WRITE_CLAIMED;
247 		lock_wait = RW_HAS_WAITERS;
248 	} else {
249 		lock_value = RW_WRITE_LOCK(curthread);
250 		lock_busy = (uintptr_t)RW_LOCKED;
251 		lock_wait = RW_HAS_WAITERS | RW_WRITE_WANTED;
252 	}
253 
254 	for (;;) {
255 		if (((old = lp->rw_wwwh) & lock_busy) == 0) {
256 			if (casip(&lp->rw_wwwh, old, old + lock_value) != old) {
257 				if (rw_lock_delay != NULL) {
258 					backoff = rw_lock_backoff(backoff);
259 					rw_lock_delay(backoff);
260 					if (++loop_count == ncpus_online) {
261 						backoff = 0;
262 						loop_count = 0;
263 					}
264 				}
265 				continue;
266 			}
267 			break;
268 		}
269 
270 		if (panicstr)
271 			return;
272 
273 		if ((old & RW_DOUBLE_LOCK) == RW_DOUBLE_LOCK) {
274 			rw_panic("rw_enter: bad rwlock", lp);
275 			return;
276 		}
277 
278 		if ((old & RW_OWNER) == (uintptr_t)curthread) {
279 			rw_panic("recursive rw_enter", lp);
280 			return;
281 		}
282 
283 		ts = turnstile_lookup(lp);
284 
285 		do {
286 			if (((old = lp->rw_wwwh) & lock_busy) == 0)
287 				break;
288 			new = old | lock_wait;
289 		} while (old != new && casip(&lp->rw_wwwh, old, new) != old);
290 
291 		if ((old & lock_busy) == 0) {
292 			/*
293 			 * The lock appears free now; try the dance again
294 			 */
295 			turnstile_exit(lp);
296 			continue;
297 		}
298 
299 		/*
300 		 * We really are going to block.  Bump the stats, and drop
301 		 * kpri if we're a reader.
302 		 */
303 		ASSERT(lp->rw_wwwh & lock_wait);
304 		ASSERT(lp->rw_wwwh & RW_LOCKED);
305 
306 		sleep_time = -gethrtime();
307 		if (rw == RW_READER) {
308 			THREAD_KPRI_RELEASE();
309 			CPU_STATS_ADDQ(CPU, sys, rw_rdfails, 1);
310 			(void) turnstile_block(ts, TS_READER_Q, lp,
311 			    &rw_sobj_ops, NULL, NULL);
312 		} else {
313 			CPU_STATS_ADDQ(CPU, sys, rw_wrfails, 1);
314 			(void) turnstile_block(ts, TS_WRITER_Q, lp,
315 			    &rw_sobj_ops, NULL, NULL);
316 		}
317 		sleep_time += gethrtime();
318 
319 		LOCKSTAT_RECORD4(LS_RW_ENTER_BLOCK, lp, sleep_time, rw,
320 		    (old & RW_WRITE_LOCKED) ? 1 : 0,
321 		    old >> RW_HOLD_COUNT_SHIFT);
322 
323 		/*
324 		 * We wake up holding the lock (and having kpri if we're
325 		 * a reader) via direct handoff from the previous owner.
326 		 */
327 		break;
328 	}
329 
330 	ASSERT(rw_locked(lp, rw));
331 
332 	membar_enter();
333 
334 	LOCKSTAT_RECORD(LS_RW_ENTER_ACQUIRE, lp, rw);
335 }
336 
337 /*
338  * Return the number of readers to wake, or zero if we should wake a writer.
339  * Called only by exiting/downgrading writers (readers don't wake readers).
340  */
341 static int
342 rw_readers_to_wake(turnstile_t *ts)
343 {
344 	kthread_t *next_writer = ts->ts_sleepq[TS_WRITER_Q].sq_first;
345 	kthread_t *next_reader = ts->ts_sleepq[TS_READER_Q].sq_first;
346 	pri_t wpri = (next_writer != NULL) ? DISP_PRIO(next_writer) : -1;
347 	int count = 0;
348 
349 	while (next_reader != NULL) {
350 		if (DISP_PRIO(next_reader) < wpri)
351 			break;
352 		next_reader->t_kpri_req++;
353 		next_reader = next_reader->t_link;
354 		count++;
355 	}
356 	return (count);
357 }
358 
359 /*
360  * Full-service implementation of rw_exit() to handle all the hard cases.
361  * Called from the assembly version if anything complicated is going on.
362  * There is no semantic difference between calling rw_exit() and calling
363  * rw_exit_wakeup() directly.
364  */
365 void
366 rw_exit_wakeup(rwlock_impl_t *lp)
367 {
368 	turnstile_t *ts;
369 	uintptr_t old, new, lock_value;
370 	kthread_t *next_writer;
371 	int nreaders;
372 	uint_t  backoff = 0;
373 	int loop_count = 0;
374 
375 	membar_exit();
376 
377 	old = lp->rw_wwwh;
378 	if (old & RW_WRITE_LOCKED) {
379 		if ((old & RW_OWNER) != (uintptr_t)curthread) {
380 			rw_panic("rw_exit: not owner", lp);
381 			lp->rw_wwwh = 0;
382 			return;
383 		}
384 		lock_value = RW_WRITE_LOCK(curthread);
385 	} else {
386 		if ((old & RW_LOCKED) == 0) {
387 			rw_panic("rw_exit: lock not held", lp);
388 			return;
389 		}
390 		lock_value = RW_READ_LOCK;
391 	}
392 
393 	for (;;) {
394 		/*
395 		 * If this is *not* the final exit of a lock with waiters,
396 		 * just drop the lock -- there's nothing tricky going on.
397 		 */
398 		old = lp->rw_wwwh;
399 		new = old - lock_value;
400 		if ((new & (RW_LOCKED | RW_HAS_WAITERS)) != RW_HAS_WAITERS) {
401 			if (casip(&lp->rw_wwwh, old, new) != old) {
402 				if (rw_lock_delay != NULL) {
403 					backoff = rw_lock_backoff(backoff);
404 					rw_lock_delay(backoff);
405 					if (++loop_count == ncpus_online) {
406 						backoff = 0;
407 						loop_count = 0;
408 					}
409 				}
410 				continue;
411 			}
412 			break;
413 		}
414 
415 		/*
416 		 * Perform the final exit of a lock that has waiters.
417 		 */
418 		ts = turnstile_lookup(lp);
419 
420 		next_writer = ts->ts_sleepq[TS_WRITER_Q].sq_first;
421 
422 		if ((old & RW_WRITE_LOCKED) &&
423 		    (nreaders = rw_readers_to_wake(ts)) > 0) {
424 			/*
425 			 * Don't drop the lock -- just set the hold count
426 			 * such that we grant the lock to all readers at once.
427 			 */
428 			new = nreaders * RW_READ_LOCK;
429 			if (ts->ts_waiters > nreaders)
430 				new |= RW_HAS_WAITERS;
431 			if (next_writer)
432 				new |= RW_WRITE_WANTED;
433 			lp->rw_wwwh = new;
434 			membar_enter();
435 			turnstile_wakeup(ts, TS_READER_Q, nreaders, NULL);
436 		} else {
437 			/*
438 			 * Don't drop the lock -- just transfer ownership
439 			 * directly to next_writer.  Note that there must
440 			 * be at least one waiting writer, because we get
441 			 * here only if (A) the lock is read-locked or
442 			 * (B) there are no waiting readers.  In case (A),
443 			 * since the lock is read-locked there would be no
444 			 * reason for other readers to have blocked unless
445 			 * the RW_WRITE_WANTED bit was set.  In case (B),
446 			 * since there are waiters but no waiting readers,
447 			 * they must all be waiting writers.
448 			 */
449 			ASSERT(lp->rw_wwwh & RW_WRITE_WANTED);
450 			new = RW_WRITE_LOCK(next_writer);
451 			if (ts->ts_waiters > 1)
452 				new |= RW_HAS_WAITERS;
453 			if (next_writer->t_link)
454 				new |= RW_WRITE_WANTED;
455 			lp->rw_wwwh = new;
456 			membar_enter();
457 			turnstile_wakeup(ts, TS_WRITER_Q, 1, next_writer);
458 		}
459 		break;
460 	}
461 
462 	if (lock_value == RW_READ_LOCK) {
463 		THREAD_KPRI_RELEASE();
464 		LOCKSTAT_RECORD(LS_RW_EXIT_RELEASE, lp, RW_READER);
465 	} else {
466 		LOCKSTAT_RECORD(LS_RW_EXIT_RELEASE, lp, RW_WRITER);
467 	}
468 }
469 
470 int
471 rw_tryenter(krwlock_t *rwlp, krw_t rw)
472 {
473 	rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;
474 	uintptr_t old;
475 
476 	if (rw == RW_READER) {
477 		uint_t backoff = 0;
478 		int loop_count = 0;
479 		THREAD_KPRI_REQUEST();
480 		for (;;) {
481 			if ((old = lp->rw_wwwh) & RW_WRITE_CLAIMED) {
482 				THREAD_KPRI_RELEASE();
483 				return (0);
484 			}
485 			if (casip(&lp->rw_wwwh, old, old + RW_READ_LOCK) == old)
486 				break;
487 			if (rw_lock_delay != NULL) {
488 				backoff = rw_lock_backoff(backoff);
489 				rw_lock_delay(backoff);
490 				if (++loop_count == ncpus_online) {
491 					backoff = 0;
492 					loop_count = 0;
493 				}
494 			}
495 		}
496 		LOCKSTAT_RECORD(LS_RW_TRYENTER_ACQUIRE, lp, rw);
497 	} else {
498 		if (casip(&lp->rw_wwwh, 0, RW_WRITE_LOCK(curthread)) != 0)
499 			return (0);
500 		LOCKSTAT_RECORD(LS_RW_TRYENTER_ACQUIRE, lp, rw);
501 	}
502 	ASSERT(rw_locked(lp, rw));
503 	membar_enter();
504 	return (1);
505 }
506 
507 void
508 rw_downgrade(krwlock_t *rwlp)
509 {
510 	rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;
511 
512 	THREAD_KPRI_REQUEST();
513 	membar_exit();
514 
515 	if ((lp->rw_wwwh & RW_OWNER) != (uintptr_t)curthread) {
516 		rw_panic("rw_downgrade: not owner", lp);
517 		return;
518 	}
519 
520 	if (atomic_add_ip_nv(&lp->rw_wwwh,
521 	    RW_READ_LOCK - RW_WRITE_LOCK(curthread)) & RW_HAS_WAITERS) {
522 		turnstile_t *ts = turnstile_lookup(lp);
523 		int nreaders = rw_readers_to_wake(ts);
524 		if (nreaders > 0) {
525 			uintptr_t delta = nreaders * RW_READ_LOCK;
526 			if (ts->ts_waiters == nreaders)
527 				delta -= RW_HAS_WAITERS;
528 			atomic_add_ip(&lp->rw_wwwh, delta);
529 		}
530 		turnstile_wakeup(ts, TS_READER_Q, nreaders, NULL);
531 	}
532 	ASSERT(rw_locked(lp, RW_READER));
533 	LOCKSTAT_RECORD0(LS_RW_DOWNGRADE_DOWNGRADE, lp);
534 }
535 
536 int
537 rw_tryupgrade(krwlock_t *rwlp)
538 {
539 	rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;
540 	uintptr_t old, new;
541 
542 	ASSERT(rw_locked(lp, RW_READER));
543 
544 	do {
545 		if (((old = lp->rw_wwwh) & ~RW_HAS_WAITERS) != RW_READ_LOCK)
546 			return (0);
547 		new = old + RW_WRITE_LOCK(curthread) - RW_READ_LOCK;
548 	} while (casip(&lp->rw_wwwh, old, new) != old);
549 
550 	membar_enter();
551 	THREAD_KPRI_RELEASE();
552 	LOCKSTAT_RECORD0(LS_RW_TRYUPGRADE_UPGRADE, lp);
553 	ASSERT(rw_locked(lp, RW_WRITER));
554 	return (1);
555 }
556 
557 int
558 rw_read_held(krwlock_t *rwlp)
559 {
560 	uintptr_t tmp;
561 
562 	return (_RW_READ_HELD(rwlp, tmp));
563 }
564 
565 int
566 rw_write_held(krwlock_t *rwlp)
567 {
568 	return (_RW_WRITE_HELD(rwlp));
569 }
570 
571 int
572 rw_lock_held(krwlock_t *rwlp)
573 {
574 	return (_RW_LOCK_HELD(rwlp));
575 }
576 
577 /*
578  * Like rw_read_held(), but ASSERTs that the lock is currently held
579  */
580 int
581 rw_read_locked(krwlock_t *rwlp)
582 {
583 	uintptr_t old = ((rwlock_impl_t *)rwlp)->rw_wwwh;
584 
585 	ASSERT(old & RW_LOCKED);
586 	return ((old & RW_LOCKED) && !(old & RW_WRITE_LOCKED));
587 }
588 
589 /*
590  * Returns non-zero if the lock is either held or desired by a writer
591  */
592 int
593 rw_iswriter(krwlock_t *rwlp)
594 {
595 	return (_RW_ISWRITER(rwlp));
596 }
597 
598 kthread_t *
599 rw_owner(krwlock_t *rwlp)
600 {
601 	uintptr_t old = ((rwlock_impl_t *)rwlp)->rw_wwwh;
602 
603 	return ((old & RW_WRITE_LOCKED) ? (kthread_t *)(old & RW_OWNER) : NULL);
604 }
605