xref: /linux/Documentation/RCU/listRCU.rst (revision 3f58ff6b53c11773b1bd564082fae37d48e0cc40)
1.. _list_rcu_doc:
2
3Using RCU to Protect Read-Mostly Linked Lists
4=============================================
5
6One of the most common uses of RCU is protecting read-mostly linked lists
7(``struct list_head`` in list.h).  One big advantage of this approach is
8that all of the required memory ordering is provided by the list macros.
9This document describes several list-based RCU use cases.
10
11
12Example 1: Read-mostly list: Deferred Destruction
13-------------------------------------------------
14
15A widely used usecase for RCU lists in the kernel is lockless iteration over
16all processes in the system. ``task_struct::tasks`` represents the list node that
17links all the processes. The list can be traversed in parallel to any list
18additions or removals.
19
20The traversal of the list is done using ``for_each_process()`` which is defined
21by the 2 macros::
22
23	#define next_task(p) \
24		list_entry_rcu((p)->tasks.next, struct task_struct, tasks)
25
26	#define for_each_process(p) \
27		for (p = &init_task ; (p = next_task(p)) != &init_task ; )
28
29The code traversing the list of all processes typically looks like::
30
31	rcu_read_lock();
32	for_each_process(p) {
33		/* Do something with p */
34	}
35	rcu_read_unlock();
36
37The simplified and heavily inlined code for removing a process from a
38task list is::
39
40	void release_task(struct task_struct *p)
41	{
42		write_lock(&tasklist_lock);
43		list_del_rcu(&p->tasks);
44		write_unlock(&tasklist_lock);
45		call_rcu(&p->rcu, delayed_put_task_struct);
46	}
47
48When a process exits, ``release_task()`` calls ``list_del_rcu(&p->tasks)``
49via __exit_signal() and __unhash_process() under ``tasklist_lock``
50writer lock protection.  The list_del_rcu() invocation removes
51the task from the list of all tasks. The ``tasklist_lock``
52prevents concurrent list additions/removals from corrupting the
53list. Readers using ``for_each_process()`` are not protected with the
54``tasklist_lock``. To prevent readers from noticing changes in the list
55pointers, the ``task_struct`` object is freed only after one or more
56grace periods elapse, with the help of call_rcu(), which is invoked via
57put_task_struct_rcu_user(). This deferring of destruction ensures that
58any readers traversing the list will see valid ``p->tasks.next`` pointers
59and deletion/freeing can happen in parallel with traversal of the list.
60This pattern is also called an **existence lock**, since RCU refrains
61from invoking the delayed_put_task_struct() callback function until
62all existing readers finish, which guarantees that the ``task_struct``
63object in question will remain in existence until after the completion
64of all RCU readers that might possibly have a reference to that object.
65
66
67Example 2: Read-Side Action Taken Outside of Lock: No In-Place Updates
68----------------------------------------------------------------------
69
70Some reader-writer locking use cases compute a value while holding
71the read-side lock, but continue to use that value after that lock is
72released.  These use cases are often good candidates for conversion
73to RCU.  One prominent example involves network packet routing.
74Because the packet-routing data tracks the state of equipment outside
75of the computer, it will at times contain stale data.  Therefore, once
76the route has been computed, there is no need to hold the routing table
77static during transmission of the packet.  After all, you can hold the
78routing table static all you want, but that won't keep the external
79Internet from changing, and it is the state of the external Internet
80that really matters.  In addition, routing entries are typically added
81or deleted, rather than being modified in place.  This is a rare example
82of the finite speed of light and the non-zero size of atoms actually
83helping make synchronization be lighter weight.
84
85A straightforward example of this type of RCU use case may be found in
86the system-call auditing support.  For example, a reader-writer locked
87implementation of ``audit_filter_task()`` might be as follows::
88
89	static enum audit_state audit_filter_task(struct task_struct *tsk, char **key)
90	{
91		struct audit_entry *e;
92		enum audit_state   state;
93
94		read_lock(&auditsc_lock);
95		/* Note: audit_filter_mutex held by caller. */
96		list_for_each_entry(e, &audit_tsklist, list) {
97			if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
98				if (state == AUDIT_STATE_RECORD)
99					*key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
100				read_unlock(&auditsc_lock);
101				return state;
102			}
103		}
104		read_unlock(&auditsc_lock);
105		return AUDIT_BUILD_CONTEXT;
106	}
107
108Here the list is searched under the lock, but the lock is dropped before
109the corresponding value is returned.  By the time that this value is acted
110on, the list may well have been modified.  This makes sense, since if
111you are turning auditing off, it is OK to audit a few extra system calls.
112
113This means that RCU can be easily applied to the read side, as follows::
114
115	static enum audit_state audit_filter_task(struct task_struct *tsk, char **key)
116	{
117		struct audit_entry *e;
118		enum audit_state   state;
119
120		rcu_read_lock();
121		/* Note: audit_filter_mutex held by caller. */
122		list_for_each_entry_rcu(e, &audit_tsklist, list) {
123			if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
124				if (state == AUDIT_STATE_RECORD)
125					*key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
126				rcu_read_unlock();
127				return state;
128			}
129		}
130		rcu_read_unlock();
131		return AUDIT_BUILD_CONTEXT;
132	}
133
134The read_lock() and read_unlock() calls have become rcu_read_lock()
135and rcu_read_unlock(), respectively, and the list_for_each_entry()
136has become list_for_each_entry_rcu().  The **_rcu()** list-traversal
137primitives add READ_ONCE() and diagnostic checks for incorrect use
138outside of an RCU read-side critical section.
139
140The changes to the update side are also straightforward. A reader-writer lock
141might be used as follows for deletion and insertion in these simplified
142versions of audit_del_rule() and audit_add_rule()::
143
144	static inline int audit_del_rule(struct audit_rule *rule,
145					 struct list_head *list)
146	{
147		struct audit_entry *e;
148
149		write_lock(&auditsc_lock);
150		list_for_each_entry(e, list, list) {
151			if (!audit_compare_rule(rule, &e->rule)) {
152				list_del(&e->list);
153				write_unlock(&auditsc_lock);
154				return 0;
155			}
156		}
157		write_unlock(&auditsc_lock);
158		return -EFAULT;		/* No matching rule */
159	}
160
161	static inline int audit_add_rule(struct audit_entry *entry,
162					 struct list_head *list)
163	{
164		write_lock(&auditsc_lock);
165		if (entry->rule.flags & AUDIT_PREPEND) {
166			entry->rule.flags &= ~AUDIT_PREPEND;
167			list_add(&entry->list, list);
168		} else {
169			list_add_tail(&entry->list, list);
170		}
171		write_unlock(&auditsc_lock);
172		return 0;
173	}
174
175Following are the RCU equivalents for these two functions::
176
177	static inline int audit_del_rule(struct audit_rule *rule,
178					 struct list_head *list)
179	{
180		struct audit_entry *e;
181
182		/* No need to use the _rcu iterator here, since this is the only
183		 * deletion routine. */
184		list_for_each_entry(e, list, list) {
185			if (!audit_compare_rule(rule, &e->rule)) {
186				list_del_rcu(&e->list);
187				call_rcu(&e->rcu, audit_free_rule);
188				return 0;
189			}
190		}
191		return -EFAULT;		/* No matching rule */
192	}
193
194	static inline int audit_add_rule(struct audit_entry *entry,
195					 struct list_head *list)
196	{
197		if (entry->rule.flags & AUDIT_PREPEND) {
198			entry->rule.flags &= ~AUDIT_PREPEND;
199			list_add_rcu(&entry->list, list);
200		} else {
201			list_add_tail_rcu(&entry->list, list);
202		}
203		return 0;
204	}
205
206Normally, the write_lock() and write_unlock() would be replaced by a
207spin_lock() and a spin_unlock(). But in this case, all callers hold
208``audit_filter_mutex``, so no additional locking is required. The
209auditsc_lock can therefore be eliminated, since use of RCU eliminates the
210need for writers to exclude readers.
211
212The list_del(), list_add(), and list_add_tail() primitives have been
213replaced by list_del_rcu(), list_add_rcu(), and list_add_tail_rcu().
214The **_rcu()** list-manipulation primitives add memory barriers that are
215needed on weakly ordered CPUs.  The list_del_rcu() primitive omits the
216pointer poisoning debug-assist code that would otherwise cause concurrent
217readers to fail spectacularly.
218
219So, when readers can tolerate stale data and when entries are either added or
220deleted, without in-place modification, it is very easy to use RCU!
221
222
223Example 3: Handling In-Place Updates
224------------------------------------
225
226The system-call auditing code does not update auditing rules in place.  However,
227if it did, the reader-writer-locked code to do so might look as follows
228(assuming only ``field_count`` is updated, otherwise, the added fields would
229need to be filled in)::
230
231	static inline int audit_upd_rule(struct audit_rule *rule,
232					 struct list_head *list,
233					 __u32 newaction,
234					 __u32 newfield_count)
235	{
236		struct audit_entry *e;
237		struct audit_entry *ne;
238
239		write_lock(&auditsc_lock);
240		/* Note: audit_filter_mutex held by caller. */
241		list_for_each_entry(e, list, list) {
242			if (!audit_compare_rule(rule, &e->rule)) {
243				e->rule.action = newaction;
244				e->rule.field_count = newfield_count;
245				write_unlock(&auditsc_lock);
246				return 0;
247			}
248		}
249		write_unlock(&auditsc_lock);
250		return -EFAULT;		/* No matching rule */
251	}
252
253The RCU version creates a copy, updates the copy, then replaces the old
254entry with the newly updated entry.  This sequence of actions, allowing
255concurrent reads while making a copy to perform an update, is what gives
256RCU (*read-copy update*) its name.
257
258The RCU version of audit_upd_rule() is as follows::
259
260	static inline int audit_upd_rule(struct audit_rule *rule,
261					 struct list_head *list,
262					 __u32 newaction,
263					 __u32 newfield_count)
264	{
265		struct audit_entry *e;
266		struct audit_entry *ne;
267
268		list_for_each_entry(e, list, list) {
269			if (!audit_compare_rule(rule, &e->rule)) {
270				ne = kmalloc(sizeof(*entry), GFP_ATOMIC);
271				if (ne == NULL)
272					return -ENOMEM;
273				audit_copy_rule(&ne->rule, &e->rule);
274				ne->rule.action = newaction;
275				ne->rule.field_count = newfield_count;
276				list_replace_rcu(&e->list, &ne->list);
277				call_rcu(&e->rcu, audit_free_rule);
278				return 0;
279			}
280		}
281		return -EFAULT;		/* No matching rule */
282	}
283
284Again, this assumes that the caller holds ``audit_filter_mutex``.  Normally, the
285writer lock would become a spinlock in this sort of code.
286
287The update_lsm_rule() does something very similar, for those who would
288prefer to look at real Linux-kernel code.
289
290Another use of this pattern can be found in the openswitch driver's *connection
291tracking table* code in ``ct_limit_set()``.  The table holds connection tracking
292entries and has a limit on the maximum entries.  There is one such table
293per-zone and hence one *limit* per zone.  The zones are mapped to their limits
294through a hashtable using an RCU-managed hlist for the hash chains. When a new
295limit is set, a new limit object is allocated and ``ct_limit_set()`` is called
296to replace the old limit object with the new one using list_replace_rcu().
297The old limit object is then freed after a grace period using kfree_rcu().
298
299
300Example 4: Eliminating Stale Data
301---------------------------------
302
303The auditing example above tolerates stale data, as do most algorithms
304that are tracking external state.  After all, given there is a delay
305from the time the external state changes before Linux becomes aware
306of the change, and so as noted earlier, a small quantity of additional
307RCU-induced staleness is generally not a problem.
308
309However, there are many examples where stale data cannot be tolerated.
310One example in the Linux kernel is the System V IPC (see the shm_lock()
311function in ipc/shm.c).  This code checks a *deleted* flag under a
312per-entry spinlock, and, if the *deleted* flag is set, pretends that the
313entry does not exist.  For this to be helpful, the search function must
314return holding the per-entry spinlock, as shm_lock() does in fact do.
315
316.. _quick_quiz:
317
318Quick Quiz:
319	For the deleted-flag technique to be helpful, why is it necessary
320	to hold the per-entry lock while returning from the search function?
321
322:ref:`Answer to Quick Quiz <quick_quiz_answer>`
323
324If the system-call audit module were to ever need to reject stale data, one way
325to accomplish this would be to add a ``deleted`` flag and a ``lock`` spinlock to the
326``audit_entry`` structure, and modify audit_filter_task() as follows::
327
328	static enum audit_state audit_filter_task(struct task_struct *tsk)
329	{
330		struct audit_entry *e;
331		enum audit_state   state;
332
333		rcu_read_lock();
334		list_for_each_entry_rcu(e, &audit_tsklist, list) {
335			if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
336				spin_lock(&e->lock);
337				if (e->deleted) {
338					spin_unlock(&e->lock);
339					rcu_read_unlock();
340					return AUDIT_BUILD_CONTEXT;
341				}
342				rcu_read_unlock();
343				if (state == AUDIT_STATE_RECORD)
344					*key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
345				return state;
346			}
347		}
348		rcu_read_unlock();
349		return AUDIT_BUILD_CONTEXT;
350	}
351
352The ``audit_del_rule()`` function would need to set the ``deleted`` flag under the
353spinlock as follows::
354
355	static inline int audit_del_rule(struct audit_rule *rule,
356					 struct list_head *list)
357	{
358		struct audit_entry *e;
359
360		/* No need to use the _rcu iterator here, since this
361		 * is the only deletion routine. */
362		list_for_each_entry(e, list, list) {
363			if (!audit_compare_rule(rule, &e->rule)) {
364				spin_lock(&e->lock);
365				list_del_rcu(&e->list);
366				e->deleted = 1;
367				spin_unlock(&e->lock);
368				call_rcu(&e->rcu, audit_free_rule);
369				return 0;
370			}
371		}
372		return -EFAULT;		/* No matching rule */
373	}
374
375This too assumes that the caller holds ``audit_filter_mutex``.
376
377Note that this example assumes that entries are only added and deleted.
378Additional mechanism is required to deal correctly with the update-in-place
379performed by audit_upd_rule().  For one thing, audit_upd_rule() would
380need to hold the locks of both the old ``audit_entry`` and its replacement
381while executing the list_replace_rcu().
382
383
384Example 5: Skipping Stale Objects
385---------------------------------
386
387For some use cases, reader performance can be improved by skipping
388stale objects during read-side list traversal, where stale objects
389are those that will be removed and destroyed after one or more grace
390periods. One such example can be found in the timerfd subsystem. When a
391``CLOCK_REALTIME`` clock is reprogrammed (for example due to setting
392of the system time) then all programmed ``timerfds`` that depend on
393this clock get triggered and processes waiting on them are awakened in
394advance of their scheduled expiry. To facilitate this, all such timers
395are added to an RCU-managed ``cancel_list`` when they are setup in
396``timerfd_setup_cancel()``::
397
398	static void timerfd_setup_cancel(struct timerfd_ctx *ctx, int flags)
399	{
400		spin_lock(&ctx->cancel_lock);
401		if ((ctx->clockid == CLOCK_REALTIME ||
402		     ctx->clockid == CLOCK_REALTIME_ALARM) &&
403		    (flags & TFD_TIMER_ABSTIME) && (flags & TFD_TIMER_CANCEL_ON_SET)) {
404			if (!ctx->might_cancel) {
405				ctx->might_cancel = true;
406				spin_lock(&cancel_lock);
407				list_add_rcu(&ctx->clist, &cancel_list);
408				spin_unlock(&cancel_lock);
409			}
410		} else {
411			__timerfd_remove_cancel(ctx);
412		}
413		spin_unlock(&ctx->cancel_lock);
414	}
415
416When a timerfd is freed (fd is closed), then the ``might_cancel``
417flag of the timerfd object is cleared, the object removed from the
418``cancel_list`` and destroyed, as shown in this simplified and inlined
419version of timerfd_release()::
420
421	int timerfd_release(struct inode *inode, struct file *file)
422	{
423		struct timerfd_ctx *ctx = file->private_data;
424
425		spin_lock(&ctx->cancel_lock);
426		if (ctx->might_cancel) {
427			ctx->might_cancel = false;
428			spin_lock(&cancel_lock);
429			list_del_rcu(&ctx->clist);
430			spin_unlock(&cancel_lock);
431		}
432		spin_unlock(&ctx->cancel_lock);
433
434		if (isalarm(ctx))
435			alarm_cancel(&ctx->t.alarm);
436		else
437			hrtimer_cancel(&ctx->t.tmr);
438		kfree_rcu(ctx, rcu);
439		return 0;
440	}
441
442If the ``CLOCK_REALTIME`` clock is set, for example by a time server, the
443hrtimer framework calls ``timerfd_clock_was_set()`` which walks the
444``cancel_list`` and wakes up processes waiting on the timerfd. While iterating
445the ``cancel_list``, the ``might_cancel`` flag is consulted to skip stale
446objects::
447
448	void timerfd_clock_was_set(void)
449	{
450		ktime_t moffs = ktime_mono_to_real(0);
451		struct timerfd_ctx *ctx;
452		unsigned long flags;
453
454		rcu_read_lock();
455		list_for_each_entry_rcu(ctx, &cancel_list, clist) {
456			if (!ctx->might_cancel)
457				continue;
458			spin_lock_irqsave(&ctx->wqh.lock, flags);
459			if (ctx->moffs != moffs) {
460				ctx->moffs = KTIME_MAX;
461				ctx->ticks++;
462				wake_up_locked_poll(&ctx->wqh, EPOLLIN);
463			}
464			spin_unlock_irqrestore(&ctx->wqh.lock, flags);
465		}
466		rcu_read_unlock();
467	}
468
469The key point is that because RCU-protected traversal of the
470``cancel_list`` happens concurrently with object addition and removal,
471sometimes the traversal can access an object that has been removed from
472the list. In this example, a flag is used to skip such objects.
473
474
475Summary
476-------
477
478Read-mostly list-based data structures that can tolerate stale data are
479the most amenable to use of RCU.  The simplest case is where entries are
480either added or deleted from the data structure (or atomically modified
481in place), but non-atomic in-place modifications can be handled by making
482a copy, updating the copy, then replacing the original with the copy.
483If stale data cannot be tolerated, then a *deleted* flag may be used
484in conjunction with a per-entry spinlock in order to allow the search
485function to reject newly deleted data.
486
487.. _quick_quiz_answer:
488
489Answer to Quick Quiz:
490	For the deleted-flag technique to be helpful, why is it necessary
491	to hold the per-entry lock while returning from the search function?
492
493	If the search function drops the per-entry lock before returning,
494	then the caller will be processing stale data in any case.  If it
495	is really OK to be processing stale data, then you don't need a
496	*deleted* flag.  If processing stale data really is a problem,
497	then you need to hold the per-entry lock across all of the code
498	that uses the value that was returned.
499
500:ref:`Back to Quick Quiz <quick_quiz>`
501