xref: /illumos-gate/usr/src/uts/common/os/rwlock.c (revision 99e6869ff7df020d73a2a959aafdf73d3e7b31ca)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 #include <sys/param.h>
29 #include <sys/thread.h>
30 #include <sys/cmn_err.h>
31 #include <sys/debug.h>
32 #include <sys/cpuvar.h>
33 #include <sys/sobject.h>
34 #include <sys/turnstile.h>
35 #include <sys/rwlock.h>
36 #include <sys/rwlock_impl.h>
37 #include <sys/atomic.h>
38 #include <sys/lockstat.h>
39 
40 /*
41  * Big Theory Statement for readers/writer locking primitives.
42  *
43  * An rwlock provides exclusive access to a single thread ("writer") or
44  * concurrent access to multiple threads ("readers").  See rwlock(9F)
45  * for a full description of the interfaces and programming model.
46  * The rest of this comment describes the implementation.
47  *
48  * An rwlock is a single word with the following structure:
49  *
50  *	---------------------------------------------------------------------
51  *	| OWNER (writer) or HOLD COUNT (readers)   | WRLOCK | WRWANT | WAIT |
52  *	---------------------------------------------------------------------
53  *			63 / 31 .. 3			2	1	0
54  *
55  * The waiters bit (0) indicates whether any threads are blocked waiting
56  * for the lock.  The write-wanted bit (1) indicates whether any threads
57  * are blocked waiting for write access.  The write-locked bit (2) indicates
58  * whether the lock is held by a writer, which determines whether the upper
59  * bits (3..31 in ILP32, 3..63 in LP64) should be interpreted as the owner
60  * (thread pointer) or the hold count (number of readers).
61  *
62  * In the absence of any contention, a writer gets the lock by setting
63  * this word to (curthread | RW_WRITE_LOCKED); a reader gets the lock
64  * by incrementing the hold count (i.e. adding 8, aka RW_READ_LOCK).
65  *
66  * A writer will fail to acquire the lock if any other thread owns it.
67  * A reader will fail if the lock is either owned or wanted by a writer.
68  * rw_tryenter() returns 0 in these cases; rw_enter() blocks until the
69  * lock becomes available.
70  *
71  * When a thread blocks it acquires the rwlock's hashed turnstile lock and
72  * attempts to set RW_HAS_WAITERS (and RW_WRITE_WANTED in the writer case)
73  * atomically *only if the lock still appears busy*.  A thread must never
74  * accidentally block for an available lock since there would be no owner
75  * to awaken it.  casip() provides the required atomicity.  Once casip()
76  * succeeds, the decision to block becomes final and irreversible.  The
77  * thread will not become runnable again until it has been granted ownership
78  * of the lock via direct handoff from a former owner as described below.
79  *
80  * In the absence of any waiters, rw_exit() just clears the lock (if it
81  * is write-locked) or decrements the hold count (if it is read-locked).
82  * Note that even if waiters are present, decrementing the hold count
83  * to a non-zero value requires no special action since the lock is still
84  * held by at least one other thread.
85  *
86  * On the "final exit" (transition to unheld state) of a lock with waiters,
87  * rw_exit_wakeup() grabs the turnstile lock and transfers ownership directly
88  * to the next writer or set of readers.  There are several advantages to this
89  * approach: (1) it closes all windows for priority inversion (when a new
90  * writer has grabbed the lock but has not yet inherited from blocked readers);
91  * (2) it prevents starvation of equal-priority threads by granting the lock
92  * in FIFO order; (3) it eliminates the need for a write-wanted count -- a
93  * single bit suffices because the lock remains held until all waiting
94  * writers are gone; (4) when we awaken N readers we can perform a single
95  * "atomic_add(&x, N)" to set the total hold count rather than having all N
96  * threads fight for the cache to perform an "atomic_add(&x, 1)" upon wakeup.
97  *
98  * The most interesting policy decision in rw_exit_wakeup() is which thread
99  * to wake.  Starvation is always possible with priority-based scheduling,
100  * but any sane wakeup policy should at least satisfy these requirements:
101  *
102  * (1) The highest-priority thread in the system should not starve.
103  * (2) The highest-priority writer should not starve.
104  * (3) No writer should starve due to lower-priority threads.
105  * (4) No reader should starve due to lower-priority writers.
106  * (5) If all threads have equal priority, none of them should starve.
107  *
108  * We used to employ a writers-always-win policy, which doesn't even
109  * satisfy (1): a steady stream of low-priority writers can starve out
110  * a real-time reader!  This is clearly a broken policy -- it violates
111  * (1), (4), and (5) -- but it's how rwlocks always used to behave.
112  *
113  * A round-robin policy (exiting readers grant the lock to blocked writers
114  * and vice versa) satisfies all but (3): a single high-priority writer
115  * and many low-priority readers can starve out medium-priority writers.
116  *
117  * A strict priority policy (grant the lock to the highest priority blocked
118  * thread) satisfies everything but (2): a steady stream of high-priority
119  * readers can permanently starve the highest-priority writer.
120  *
121  * The reason we care about (2) is that it's important to process writers
122  * reasonably quickly -- even if they're low priority -- because their very
123  * presence causes all readers to take the slow (blocking) path through this
124  * code.  There is also a general sense that writers deserve some degree of
125  * deference because they're updating the data upon which all readers act.
126  * Presumably this data should not be allowed to become arbitrarily stale
127  * due to writer starvation.  Finally, it seems reasonable to level the
128  * playing field a bit to compensate for the fact that it's so much harder
129  * for a writer to get in when there are already many readers present.
130  *
131  * A hybrid of round-robin and strict priority can be made to satisfy
132  * all five criteria.  In this "writer priority policy" exiting readers
133  * always grant the lock to waiting writers, but exiting writers only
134  * grant the lock to readers of the same or higher priority than the
135  * highest-priority blocked writer.  Thus requirement (2) is satisfied,
136  * necessarily, by a willful act of priority inversion: an exiting reader
137  * will grant the lock to a blocked writer even if there are blocked
138  * readers of higher priority.  The situation is mitigated by the fact
139  * that writers always inherit priority from blocked readers, and the
140  * writer will awaken those readers as soon as it exits the lock.
141  *
142  * rw_downgrade() follows the same wakeup policy as an exiting writer.
143  *
144  * rw_tryupgrade() has the same failure mode as rw_tryenter() for a
145  * write lock.  Both honor the WRITE_WANTED bit by specification.
146  *
147  * The following rules apply to manipulation of rwlock internal state:
148  *
149  * (1) The rwlock is only modified via the atomic primitives casip()
150  *     and atomic_add_ip().
151  *
152  * (2) The waiters bit and write-wanted bit are only modified under
153  *     turnstile_lookup().  This ensures that the turnstile is consistent
154  *     with the rwlock.
155  *
156  * (3) Waiters receive the lock by direct handoff from the previous
157  *     owner.  Therefore, waiters *always* wake up holding the lock.
158  */
159 
160 /*
161  * The sobj_ops vector exports a set of functions needed when a thread
162  * is asleep on a synchronization object of a given type.
163  */
164 static sobj_ops_t rw_sobj_ops = {
165 	SOBJ_RWLOCK, rw_owner, turnstile_stay_asleep, turnstile_change_pri
166 };
167 
168 /*
169  * If the system panics on an rwlock, save the address of the offending
170  * rwlock in panic_rwlock_addr, and save the contents in panic_rwlock.
171  */
172 static rwlock_impl_t panic_rwlock;
173 static rwlock_impl_t *panic_rwlock_addr;
174 
175 static void
176 rw_panic(char *msg, rwlock_impl_t *lp)
177 {
178 	if (panicstr)
179 		return;
180 
181 	if (casptr(&panic_rwlock_addr, NULL, lp) == NULL)
182 		panic_rwlock = *lp;
183 
184 	panic("%s, lp=%p wwwh=%lx thread=%p",
185 	    msg, lp, panic_rwlock.rw_wwwh, curthread);
186 }
187 
188 /* ARGSUSED */
189 void
190 rw_init(krwlock_t *rwlp, char *name, krw_type_t type, void *arg)
191 {
192 	((rwlock_impl_t *)rwlp)->rw_wwwh = 0;
193 }
194 
195 void
196 rw_destroy(krwlock_t *rwlp)
197 {
198 	rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;
199 
200 	if (lp->rw_wwwh != 0) {
201 		if ((lp->rw_wwwh & RW_DOUBLE_LOCK) == RW_DOUBLE_LOCK)
202 			rw_panic("rw_destroy: lock already destroyed", lp);
203 		else
204 			rw_panic("rw_destroy: lock still active", lp);
205 	}
206 
207 	lp->rw_wwwh = RW_DOUBLE_LOCK;
208 }
209 
210 /*
211  * Verify that an rwlock is held correctly.
212  */
213 static int
214 rw_locked(rwlock_impl_t *lp, krw_t rw)
215 {
216 	uintptr_t old = lp->rw_wwwh;
217 
218 	if (rw == RW_READER)
219 		return ((old & RW_LOCKED) && !(old & RW_WRITE_LOCKED));
220 
221 	if (rw == RW_WRITER)
222 		return ((old & RW_OWNER) == (uintptr_t)curthread);
223 
224 	return (0);
225 }
226 
227 uint_t (*rw_lock_backoff)(uint_t) = NULL;
228 void (*rw_lock_delay)(uint_t) = NULL;
229 
230 /*
231  * Full-service implementation of rw_enter() to handle all the hard cases.
232  * Called from the assembly version if anything complicated is going on.
233  * The only semantic difference between calling rw_enter() and calling
234  * rw_enter_sleep() directly is that we assume the caller has already done
235  * a THREAD_KPRI_REQUEST() in the RW_READER case.
236  */
237 void
238 rw_enter_sleep(rwlock_impl_t *lp, krw_t rw)
239 {
240 	uintptr_t old, new, lock_value, lock_busy, lock_wait;
241 	hrtime_t sleep_time;
242 	turnstile_t *ts;
243 	uint_t  backoff = 0;
244 	int loop_count = 0;
245 
246 	if (rw == RW_READER) {
247 		lock_value = RW_READ_LOCK;
248 		lock_busy = RW_WRITE_CLAIMED;
249 		lock_wait = RW_HAS_WAITERS;
250 	} else {
251 		lock_value = RW_WRITE_LOCK(curthread);
252 		lock_busy = (uintptr_t)RW_LOCKED;
253 		lock_wait = RW_HAS_WAITERS | RW_WRITE_WANTED;
254 	}
255 
256 	for (;;) {
257 		if (((old = lp->rw_wwwh) & lock_busy) == 0) {
258 			if (casip(&lp->rw_wwwh, old, old + lock_value) != old) {
259 				if (rw_lock_delay != NULL) {
260 					backoff = rw_lock_backoff(backoff);
261 					rw_lock_delay(backoff);
262 					if (++loop_count == ncpus_online) {
263 						backoff = 0;
264 						loop_count = 0;
265 					}
266 				}
267 				continue;
268 			}
269 			break;
270 		}
271 
272 		if (panicstr)
273 			return;
274 
275 		if ((old & RW_DOUBLE_LOCK) == RW_DOUBLE_LOCK) {
276 			rw_panic("rw_enter: bad rwlock", lp);
277 			return;
278 		}
279 
280 		if ((old & RW_OWNER) == (uintptr_t)curthread) {
281 			rw_panic("recursive rw_enter", lp);
282 			return;
283 		}
284 
285 		ts = turnstile_lookup(lp);
286 
287 		do {
288 			if (((old = lp->rw_wwwh) & lock_busy) == 0)
289 				break;
290 			new = old | lock_wait;
291 		} while (old != new && casip(&lp->rw_wwwh, old, new) != old);
292 
293 		if ((old & lock_busy) == 0) {
294 			/*
295 			 * The lock appears free now; try the dance again
296 			 */
297 			turnstile_exit(lp);
298 			continue;
299 		}
300 
301 		/*
302 		 * We really are going to block.  Bump the stats, and drop
303 		 * kpri if we're a reader.
304 		 */
305 		ASSERT(lp->rw_wwwh & lock_wait);
306 		ASSERT(lp->rw_wwwh & RW_LOCKED);
307 
308 		sleep_time = -gethrtime();
309 		if (rw == RW_READER) {
310 			THREAD_KPRI_RELEASE();
311 			CPU_STATS_ADDQ(CPU, sys, rw_rdfails, 1);
312 			(void) turnstile_block(ts, TS_READER_Q, lp,
313 			    &rw_sobj_ops, NULL, NULL);
314 		} else {
315 			CPU_STATS_ADDQ(CPU, sys, rw_wrfails, 1);
316 			(void) turnstile_block(ts, TS_WRITER_Q, lp,
317 			    &rw_sobj_ops, NULL, NULL);
318 		}
319 		sleep_time += gethrtime();
320 
321 		LOCKSTAT_RECORD4(LS_RW_ENTER_BLOCK, lp, sleep_time, rw,
322 		    (old & RW_WRITE_LOCKED) ? 1 : 0,
323 		    old >> RW_HOLD_COUNT_SHIFT);
324 
325 		/*
326 		 * We wake up holding the lock (and having kpri if we're
327 		 * a reader) via direct handoff from the previous owner.
328 		 */
329 		break;
330 	}
331 
332 	ASSERT(rw_locked(lp, rw));
333 
334 	membar_enter();
335 
336 	LOCKSTAT_RECORD(LS_RW_ENTER_ACQUIRE, lp, rw);
337 }
338 
339 /*
340  * Return the number of readers to wake, or zero if we should wake a writer.
341  * Called only by exiting/downgrading writers (readers don't wake readers).
342  */
343 static int
344 rw_readers_to_wake(turnstile_t *ts)
345 {
346 	kthread_t *next_writer = ts->ts_sleepq[TS_WRITER_Q].sq_first;
347 	kthread_t *next_reader = ts->ts_sleepq[TS_READER_Q].sq_first;
348 	pri_t wpri = (next_writer != NULL) ? DISP_PRIO(next_writer) : -1;
349 	int count = 0;
350 
351 	while (next_reader != NULL) {
352 		if (DISP_PRIO(next_reader) < wpri)
353 			break;
354 		next_reader->t_kpri_req++;
355 		next_reader = next_reader->t_link;
356 		count++;
357 	}
358 	return (count);
359 }
360 
361 /*
362  * Full-service implementation of rw_exit() to handle all the hard cases.
363  * Called from the assembly version if anything complicated is going on.
364  * There is no semantic difference between calling rw_exit() and calling
365  * rw_exit_wakeup() directly.
366  */
367 void
368 rw_exit_wakeup(rwlock_impl_t *lp)
369 {
370 	turnstile_t *ts;
371 	uintptr_t old, new, lock_value;
372 	kthread_t *next_writer;
373 	int nreaders;
374 	uint_t  backoff = 0;
375 	int loop_count = 0;
376 
377 	membar_exit();
378 
379 	old = lp->rw_wwwh;
380 	if (old & RW_WRITE_LOCKED) {
381 		if ((old & RW_OWNER) != (uintptr_t)curthread) {
382 			rw_panic("rw_exit: not owner", lp);
383 			lp->rw_wwwh = 0;
384 			return;
385 		}
386 		lock_value = RW_WRITE_LOCK(curthread);
387 	} else {
388 		if ((old & RW_LOCKED) == 0) {
389 			rw_panic("rw_exit: lock not held", lp);
390 			return;
391 		}
392 		lock_value = RW_READ_LOCK;
393 	}
394 
395 	for (;;) {
396 		/*
397 		 * If this is *not* the final exit of a lock with waiters,
398 		 * just drop the lock -- there's nothing tricky going on.
399 		 */
400 		old = lp->rw_wwwh;
401 		new = old - lock_value;
402 		if ((new & (RW_LOCKED | RW_HAS_WAITERS)) != RW_HAS_WAITERS) {
403 			if (casip(&lp->rw_wwwh, old, new) != old) {
404 				if (rw_lock_delay != NULL) {
405 					backoff = rw_lock_backoff(backoff);
406 					rw_lock_delay(backoff);
407 					if (++loop_count == ncpus_online) {
408 						backoff = 0;
409 						loop_count = 0;
410 					}
411 				}
412 				continue;
413 			}
414 			break;
415 		}
416 
417 		/*
418 		 * Perform the final exit of a lock that has waiters.
419 		 */
420 		ts = turnstile_lookup(lp);
421 
422 		next_writer = ts->ts_sleepq[TS_WRITER_Q].sq_first;
423 
424 		if ((old & RW_WRITE_LOCKED) &&
425 		    (nreaders = rw_readers_to_wake(ts)) > 0) {
426 			/*
427 			 * Don't drop the lock -- just set the hold count
428 			 * such that we grant the lock to all readers at once.
429 			 */
430 			new = nreaders * RW_READ_LOCK;
431 			if (ts->ts_waiters > nreaders)
432 				new |= RW_HAS_WAITERS;
433 			if (next_writer)
434 				new |= RW_WRITE_WANTED;
435 			lp->rw_wwwh = new;
436 			membar_enter();
437 			turnstile_wakeup(ts, TS_READER_Q, nreaders, NULL);
438 		} else {
439 			/*
440 			 * Don't drop the lock -- just transfer ownership
441 			 * directly to next_writer.  Note that there must
442 			 * be at least one waiting writer, because we get
443 			 * here only if (A) the lock is read-locked or
444 			 * (B) there are no waiting readers.  In case (A),
445 			 * since the lock is read-locked there would be no
446 			 * reason for other readers to have blocked unless
447 			 * the RW_WRITE_WANTED bit was set.  In case (B),
448 			 * since there are waiters but no waiting readers,
449 			 * they must all be waiting writers.
450 			 */
451 			ASSERT(lp->rw_wwwh & RW_WRITE_WANTED);
452 			new = RW_WRITE_LOCK(next_writer);
453 			if (ts->ts_waiters > 1)
454 				new |= RW_HAS_WAITERS;
455 			if (next_writer->t_link)
456 				new |= RW_WRITE_WANTED;
457 			lp->rw_wwwh = new;
458 			membar_enter();
459 			turnstile_wakeup(ts, TS_WRITER_Q, 1, next_writer);
460 		}
461 		break;
462 	}
463 
464 	if (lock_value == RW_READ_LOCK) {
465 		THREAD_KPRI_RELEASE();
466 		LOCKSTAT_RECORD(LS_RW_EXIT_RELEASE, lp, RW_READER);
467 	} else {
468 		LOCKSTAT_RECORD(LS_RW_EXIT_RELEASE, lp, RW_WRITER);
469 	}
470 }
471 
472 int
473 rw_tryenter(krwlock_t *rwlp, krw_t rw)
474 {
475 	rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;
476 	uintptr_t old;
477 
478 	if (rw == RW_READER) {
479 		uint_t backoff = 0;
480 		int loop_count = 0;
481 		THREAD_KPRI_REQUEST();
482 		for (;;) {
483 			if ((old = lp->rw_wwwh) & RW_WRITE_CLAIMED) {
484 				THREAD_KPRI_RELEASE();
485 				return (0);
486 			}
487 			if (casip(&lp->rw_wwwh, old, old + RW_READ_LOCK) == old)
488 				break;
489 			if (rw_lock_delay != NULL) {
490 				backoff = rw_lock_backoff(backoff);
491 				rw_lock_delay(backoff);
492 				if (++loop_count == ncpus_online) {
493 					backoff = 0;
494 					loop_count = 0;
495 				}
496 			}
497 		}
498 		LOCKSTAT_RECORD(LS_RW_TRYENTER_ACQUIRE, lp, rw);
499 	} else {
500 		if (casip(&lp->rw_wwwh, 0, RW_WRITE_LOCK(curthread)) != 0)
501 			return (0);
502 		LOCKSTAT_RECORD(LS_RW_TRYENTER_ACQUIRE, lp, rw);
503 	}
504 	ASSERT(rw_locked(lp, rw));
505 	membar_enter();
506 	return (1);
507 }
508 
509 void
510 rw_downgrade(krwlock_t *rwlp)
511 {
512 	rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;
513 
514 	THREAD_KPRI_REQUEST();
515 	membar_exit();
516 
517 	if ((lp->rw_wwwh & RW_OWNER) != (uintptr_t)curthread) {
518 		rw_panic("rw_downgrade: not owner", lp);
519 		return;
520 	}
521 
522 	if (atomic_add_ip_nv(&lp->rw_wwwh,
523 	    RW_READ_LOCK - RW_WRITE_LOCK(curthread)) & RW_HAS_WAITERS) {
524 		turnstile_t *ts = turnstile_lookup(lp);
525 		int nreaders = rw_readers_to_wake(ts);
526 		if (nreaders > 0) {
527 			uintptr_t delta = nreaders * RW_READ_LOCK;
528 			if (ts->ts_waiters == nreaders)
529 				delta -= RW_HAS_WAITERS;
530 			atomic_add_ip(&lp->rw_wwwh, delta);
531 		}
532 		turnstile_wakeup(ts, TS_READER_Q, nreaders, NULL);
533 	}
534 	ASSERT(rw_locked(lp, RW_READER));
535 	LOCKSTAT_RECORD0(LS_RW_DOWNGRADE_DOWNGRADE, lp);
536 }
537 
538 int
539 rw_tryupgrade(krwlock_t *rwlp)
540 {
541 	rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;
542 	uintptr_t old, new;
543 
544 	ASSERT(rw_locked(lp, RW_READER));
545 
546 	do {
547 		if (((old = lp->rw_wwwh) & ~RW_HAS_WAITERS) != RW_READ_LOCK)
548 			return (0);
549 		new = old + RW_WRITE_LOCK(curthread) - RW_READ_LOCK;
550 	} while (casip(&lp->rw_wwwh, old, new) != old);
551 
552 	membar_enter();
553 	THREAD_KPRI_RELEASE();
554 	LOCKSTAT_RECORD0(LS_RW_TRYUPGRADE_UPGRADE, lp);
555 	ASSERT(rw_locked(lp, RW_WRITER));
556 	return (1);
557 }
558 
559 int
560 rw_read_held(krwlock_t *rwlp)
561 {
562 	uintptr_t tmp;
563 
564 	return (_RW_READ_HELD(rwlp, tmp));
565 }
566 
567 int
568 rw_write_held(krwlock_t *rwlp)
569 {
570 	return (_RW_WRITE_HELD(rwlp));
571 }
572 
573 int
574 rw_lock_held(krwlock_t *rwlp)
575 {
576 	return (_RW_LOCK_HELD(rwlp));
577 }
578 
579 /*
580  * Like rw_read_held(), but ASSERTs that the lock is currently held
581  */
582 int
583 rw_read_locked(krwlock_t *rwlp)
584 {
585 	uintptr_t old = ((rwlock_impl_t *)rwlp)->rw_wwwh;
586 
587 	ASSERT(old & RW_LOCKED);
588 	return ((old & RW_LOCKED) && !(old & RW_WRITE_LOCKED));
589 }
590 
591 /*
592  * Returns non-zero if the lock is either held or desired by a writer
593  */
594 int
595 rw_iswriter(krwlock_t *rwlp)
596 {
597 	return (_RW_ISWRITER(rwlp));
598 }
599 
600 kthread_t *
601 rw_owner(krwlock_t *rwlp)
602 {
603 	uintptr_t old = ((rwlock_impl_t *)rwlp)->rw_wwwh;
604 
605 	return ((old & RW_WRITE_LOCKED) ? (kthread_t *)(old & RW_OWNER) : NULL);
606 }
607