xref: /linux/Documentation/RCU/listRCU.rst (revision 02680c23d7b3febe45ea3d4f9818c2b2dc89020a)
1.. _list_rcu_doc:
2
3Using RCU to Protect Read-Mostly Linked Lists
4=============================================
5
6One of the best applications of RCU is to protect read-mostly linked lists
7(``struct list_head`` in list.h).  One big advantage of this approach
8is that all of the required memory barriers are included for you in
9the list macros.  This document describes several applications of RCU,
10with the best fits first.
11
12
13Example 1: Read-mostly list: Deferred Destruction
14-------------------------------------------------
15
16A widely used usecase for RCU lists in the kernel is lockless iteration over
17all processes in the system. ``task_struct::tasks`` represents the list node that
18links all the processes. The list can be traversed in parallel to any list
19additions or removals.
20
21The traversal of the list is done using ``for_each_process()`` which is defined
22by the 2 macros::
23
24	#define next_task(p) \
25		list_entry_rcu((p)->tasks.next, struct task_struct, tasks)
26
27	#define for_each_process(p) \
28		for (p = &init_task ; (p = next_task(p)) != &init_task ; )
29
30The code traversing the list of all processes typically looks like::
31
32	rcu_read_lock();
33	for_each_process(p) {
34		/* Do something with p */
35	}
36	rcu_read_unlock();
37
38The simplified code for removing a process from a task list is::
39
40	void release_task(struct task_struct *p)
41	{
42		write_lock(&tasklist_lock);
43		list_del_rcu(&p->tasks);
44		write_unlock(&tasklist_lock);
45		call_rcu(&p->rcu, delayed_put_task_struct);
46	}
47
48When a process exits, ``release_task()`` calls ``list_del_rcu(&p->tasks)`` under
49``tasklist_lock`` writer lock protection, to remove the task from the list of
50all tasks. The ``tasklist_lock`` prevents concurrent list additions/removals
51from corrupting the list. Readers using ``for_each_process()`` are not protected
52with the ``tasklist_lock``. To prevent readers from noticing changes in the list
53pointers, the ``task_struct`` object is freed only after one or more grace
54periods elapse (with the help of call_rcu()). This deferring of destruction
55ensures that any readers traversing the list will see valid ``p->tasks.next``
56pointers and deletion/freeing can happen in parallel with traversal of the list.
57This pattern is also called an **existence lock**, since RCU pins the object in
58memory until all existing readers finish.
59
60
61Example 2: Read-Side Action Taken Outside of Lock: No In-Place Updates
62----------------------------------------------------------------------
63
64The best applications are cases where, if reader-writer locking were
65used, the read-side lock would be dropped before taking any action
66based on the results of the search.  The most celebrated example is
67the routing table.  Because the routing table is tracking the state of
68equipment outside of the computer, it will at times contain stale data.
69Therefore, once the route has been computed, there is no need to hold
70the routing table static during transmission of the packet.  After all,
71you can hold the routing table static all you want, but that won't keep
72the external Internet from changing, and it is the state of the external
73Internet that really matters.  In addition, routing entries are typically
74added or deleted, rather than being modified in place.
75
76A straightforward example of this use of RCU may be found in the
77system-call auditing support.  For example, a reader-writer locked
78implementation of ``audit_filter_task()`` might be as follows::
79
80	static enum audit_state audit_filter_task(struct task_struct *tsk)
81	{
82		struct audit_entry *e;
83		enum audit_state   state;
84
85		read_lock(&auditsc_lock);
86		/* Note: audit_filter_mutex held by caller. */
87		list_for_each_entry(e, &audit_tsklist, list) {
88			if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
89				read_unlock(&auditsc_lock);
90				return state;
91			}
92		}
93		read_unlock(&auditsc_lock);
94		return AUDIT_BUILD_CONTEXT;
95	}
96
97Here the list is searched under the lock, but the lock is dropped before
98the corresponding value is returned.  By the time that this value is acted
99on, the list may well have been modified.  This makes sense, since if
100you are turning auditing off, it is OK to audit a few extra system calls.
101
102This means that RCU can be easily applied to the read side, as follows::
103
104	static enum audit_state audit_filter_task(struct task_struct *tsk)
105	{
106		struct audit_entry *e;
107		enum audit_state   state;
108
109		rcu_read_lock();
110		/* Note: audit_filter_mutex held by caller. */
111		list_for_each_entry_rcu(e, &audit_tsklist, list) {
112			if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
113				rcu_read_unlock();
114				return state;
115			}
116		}
117		rcu_read_unlock();
118		return AUDIT_BUILD_CONTEXT;
119	}
120
121The ``read_lock()`` and ``read_unlock()`` calls have become rcu_read_lock()
122and rcu_read_unlock(), respectively, and the list_for_each_entry() has
123become list_for_each_entry_rcu().  The **_rcu()** list-traversal primitives
124insert the read-side memory barriers that are required on DEC Alpha CPUs.
125
126The changes to the update side are also straightforward. A reader-writer lock
127might be used as follows for deletion and insertion::
128
129	static inline int audit_del_rule(struct audit_rule *rule,
130					 struct list_head *list)
131	{
132		struct audit_entry *e;
133
134		write_lock(&auditsc_lock);
135		list_for_each_entry(e, list, list) {
136			if (!audit_compare_rule(rule, &e->rule)) {
137				list_del(&e->list);
138				write_unlock(&auditsc_lock);
139				return 0;
140			}
141		}
142		write_unlock(&auditsc_lock);
143		return -EFAULT;		/* No matching rule */
144	}
145
146	static inline int audit_add_rule(struct audit_entry *entry,
147					 struct list_head *list)
148	{
149		write_lock(&auditsc_lock);
150		if (entry->rule.flags & AUDIT_PREPEND) {
151			entry->rule.flags &= ~AUDIT_PREPEND;
152			list_add(&entry->list, list);
153		} else {
154			list_add_tail(&entry->list, list);
155		}
156		write_unlock(&auditsc_lock);
157		return 0;
158	}
159
160Following are the RCU equivalents for these two functions::
161
162	static inline int audit_del_rule(struct audit_rule *rule,
163					 struct list_head *list)
164	{
165		struct audit_entry *e;
166
167		/* No need to use the _rcu iterator here, since this is the only
168		 * deletion routine. */
169		list_for_each_entry(e, list, list) {
170			if (!audit_compare_rule(rule, &e->rule)) {
171				list_del_rcu(&e->list);
172				call_rcu(&e->rcu, audit_free_rule);
173				return 0;
174			}
175		}
176		return -EFAULT;		/* No matching rule */
177	}
178
179	static inline int audit_add_rule(struct audit_entry *entry,
180					 struct list_head *list)
181	{
182		if (entry->rule.flags & AUDIT_PREPEND) {
183			entry->rule.flags &= ~AUDIT_PREPEND;
184			list_add_rcu(&entry->list, list);
185		} else {
186			list_add_tail_rcu(&entry->list, list);
187		}
188		return 0;
189	}
190
191Normally, the ``write_lock()`` and ``write_unlock()`` would be replaced by a
192spin_lock() and a spin_unlock(). But in this case, all callers hold
193``audit_filter_mutex``, so no additional locking is required. The
194``auditsc_lock`` can therefore be eliminated, since use of RCU eliminates the
195need for writers to exclude readers.
196
197The list_del(), list_add(), and list_add_tail() primitives have been
198replaced by list_del_rcu(), list_add_rcu(), and list_add_tail_rcu().
199The **_rcu()** list-manipulation primitives add memory barriers that are needed on
200weakly ordered CPUs (most of them!).  The list_del_rcu() primitive omits the
201pointer poisoning debug-assist code that would otherwise cause concurrent
202readers to fail spectacularly.
203
204So, when readers can tolerate stale data and when entries are either added or
205deleted, without in-place modification, it is very easy to use RCU!
206
207
208Example 3: Handling In-Place Updates
209------------------------------------
210
211The system-call auditing code does not update auditing rules in place.  However,
212if it did, the reader-writer-locked code to do so might look as follows
213(assuming only ``field_count`` is updated, otherwise, the added fields would
214need to be filled in)::
215
216	static inline int audit_upd_rule(struct audit_rule *rule,
217					 struct list_head *list,
218					 __u32 newaction,
219					 __u32 newfield_count)
220	{
221		struct audit_entry *e;
222		struct audit_entry *ne;
223
224		write_lock(&auditsc_lock);
225		/* Note: audit_filter_mutex held by caller. */
226		list_for_each_entry(e, list, list) {
227			if (!audit_compare_rule(rule, &e->rule)) {
228				e->rule.action = newaction;
229				e->rule.field_count = newfield_count;
230				write_unlock(&auditsc_lock);
231				return 0;
232			}
233		}
234		write_unlock(&auditsc_lock);
235		return -EFAULT;		/* No matching rule */
236	}
237
238The RCU version creates a copy, updates the copy, then replaces the old
239entry with the newly updated entry.  This sequence of actions, allowing
240concurrent reads while making a copy to perform an update, is what gives
241RCU (*read-copy update*) its name.  The RCU code is as follows::
242
243	static inline int audit_upd_rule(struct audit_rule *rule,
244					 struct list_head *list,
245					 __u32 newaction,
246					 __u32 newfield_count)
247	{
248		struct audit_entry *e;
249		struct audit_entry *ne;
250
251		list_for_each_entry(e, list, list) {
252			if (!audit_compare_rule(rule, &e->rule)) {
253				ne = kmalloc(sizeof(*entry), GFP_ATOMIC);
254				if (ne == NULL)
255					return -ENOMEM;
256				audit_copy_rule(&ne->rule, &e->rule);
257				ne->rule.action = newaction;
258				ne->rule.field_count = newfield_count;
259				list_replace_rcu(&e->list, &ne->list);
260				call_rcu(&e->rcu, audit_free_rule);
261				return 0;
262			}
263		}
264		return -EFAULT;		/* No matching rule */
265	}
266
267Again, this assumes that the caller holds ``audit_filter_mutex``.  Normally, the
268writer lock would become a spinlock in this sort of code.
269
270Another use of this pattern can be found in the openswitch driver's *connection
271tracking table* code in ``ct_limit_set()``.  The table holds connection tracking
272entries and has a limit on the maximum entries.  There is one such table
273per-zone and hence one *limit* per zone.  The zones are mapped to their limits
274through a hashtable using an RCU-managed hlist for the hash chains. When a new
275limit is set, a new limit object is allocated and ``ct_limit_set()`` is called
276to replace the old limit object with the new one using list_replace_rcu().
277The old limit object is then freed after a grace period using kfree_rcu().
278
279
280Example 4: Eliminating Stale Data
281---------------------------------
282
283The auditing example above tolerates stale data, as do most algorithms
284that are tracking external state.  Because there is a delay from the
285time the external state changes before Linux becomes aware of the change,
286additional RCU-induced staleness is generally not a problem.
287
288However, there are many examples where stale data cannot be tolerated.
289One example in the Linux kernel is the System V IPC (see the shm_lock()
290function in ipc/shm.c).  This code checks a *deleted* flag under a
291per-entry spinlock, and, if the *deleted* flag is set, pretends that the
292entry does not exist.  For this to be helpful, the search function must
293return holding the per-entry spinlock, as shm_lock() does in fact do.
294
295.. _quick_quiz:
296
297Quick Quiz:
298	For the deleted-flag technique to be helpful, why is it necessary
299	to hold the per-entry lock while returning from the search function?
300
301:ref:`Answer to Quick Quiz <quick_quiz_answer>`
302
303If the system-call audit module were to ever need to reject stale data, one way
304to accomplish this would be to add a ``deleted`` flag and a ``lock`` spinlock to the
305audit_entry structure, and modify ``audit_filter_task()`` as follows::
306
307	static enum audit_state audit_filter_task(struct task_struct *tsk)
308	{
309		struct audit_entry *e;
310		enum audit_state   state;
311
312		rcu_read_lock();
313		list_for_each_entry_rcu(e, &audit_tsklist, list) {
314			if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
315				spin_lock(&e->lock);
316				if (e->deleted) {
317					spin_unlock(&e->lock);
318					rcu_read_unlock();
319					return AUDIT_BUILD_CONTEXT;
320				}
321				rcu_read_unlock();
322				return state;
323			}
324		}
325		rcu_read_unlock();
326		return AUDIT_BUILD_CONTEXT;
327	}
328
329Note that this example assumes that entries are only added and deleted.
330Additional mechanism is required to deal correctly with the update-in-place
331performed by ``audit_upd_rule()``.  For one thing, ``audit_upd_rule()`` would
332need additional memory barriers to ensure that the list_add_rcu() was really
333executed before the list_del_rcu().
334
335The ``audit_del_rule()`` function would need to set the ``deleted`` flag under the
336spinlock as follows::
337
338	static inline int audit_del_rule(struct audit_rule *rule,
339					 struct list_head *list)
340	{
341		struct audit_entry *e;
342
343		/* No need to use the _rcu iterator here, since this
344		 * is the only deletion routine. */
345		list_for_each_entry(e, list, list) {
346			if (!audit_compare_rule(rule, &e->rule)) {
347				spin_lock(&e->lock);
348				list_del_rcu(&e->list);
349				e->deleted = 1;
350				spin_unlock(&e->lock);
351				call_rcu(&e->rcu, audit_free_rule);
352				return 0;
353			}
354		}
355		return -EFAULT;		/* No matching rule */
356	}
357
358This too assumes that the caller holds ``audit_filter_mutex``.
359
360
361Example 5: Skipping Stale Objects
362---------------------------------
363
364For some usecases, reader performance can be improved by skipping stale objects
365during read-side list traversal if the object in concern is pending destruction
366after one or more grace periods. One such example can be found in the timerfd
367subsystem. When a ``CLOCK_REALTIME`` clock is reprogrammed - for example due to
368setting of the system time, then all programmed timerfds that depend on this
369clock get triggered and processes waiting on them to expire are woken up in
370advance of their scheduled expiry. To facilitate this, all such timers are added
371to an RCU-managed ``cancel_list`` when they are setup in
372``timerfd_setup_cancel()``::
373
374	static void timerfd_setup_cancel(struct timerfd_ctx *ctx, int flags)
375	{
376		spin_lock(&ctx->cancel_lock);
377		if ((ctx->clockid == CLOCK_REALTIME &&
378		    (flags & TFD_TIMER_ABSTIME) && (flags & TFD_TIMER_CANCEL_ON_SET)) {
379			if (!ctx->might_cancel) {
380				ctx->might_cancel = true;
381				spin_lock(&cancel_lock);
382				list_add_rcu(&ctx->clist, &cancel_list);
383				spin_unlock(&cancel_lock);
384			}
385		}
386		spin_unlock(&ctx->cancel_lock);
387	}
388
389When a timerfd is freed (fd is closed), then the ``might_cancel`` flag of the
390timerfd object is cleared, the object removed from the ``cancel_list`` and
391destroyed::
392
393	int timerfd_release(struct inode *inode, struct file *file)
394	{
395		struct timerfd_ctx *ctx = file->private_data;
396
397		spin_lock(&ctx->cancel_lock);
398		if (ctx->might_cancel) {
399			ctx->might_cancel = false;
400			spin_lock(&cancel_lock);
401			list_del_rcu(&ctx->clist);
402			spin_unlock(&cancel_lock);
403		}
404		spin_unlock(&ctx->cancel_lock);
405
406		hrtimer_cancel(&ctx->t.tmr);
407		kfree_rcu(ctx, rcu);
408		return 0;
409	}
410
411If the ``CLOCK_REALTIME`` clock is set, for example by a time server, the
412hrtimer framework calls ``timerfd_clock_was_set()`` which walks the
413``cancel_list`` and wakes up processes waiting on the timerfd. While iterating
414the ``cancel_list``, the ``might_cancel`` flag is consulted to skip stale
415objects::
416
417	void timerfd_clock_was_set(void)
418	{
419		struct timerfd_ctx *ctx;
420		unsigned long flags;
421
422		rcu_read_lock();
423		list_for_each_entry_rcu(ctx, &cancel_list, clist) {
424			if (!ctx->might_cancel)
425				continue;
426			spin_lock_irqsave(&ctx->wqh.lock, flags);
427			if (ctx->moffs != ktime_mono_to_real(0)) {
428				ctx->moffs = KTIME_MAX;
429				ctx->ticks++;
430				wake_up_locked_poll(&ctx->wqh, EPOLLIN);
431			}
432			spin_unlock_irqrestore(&ctx->wqh.lock, flags);
433		}
434		rcu_read_unlock();
435	}
436
437The key point here is, because RCU-traversal of the ``cancel_list`` happens
438while objects are being added and removed to the list, sometimes the traversal
439can step on an object that has been removed from the list. In this example, it
440is seen that it is better to skip such objects using a flag.
441
442
443Summary
444-------
445
446Read-mostly list-based data structures that can tolerate stale data are
447the most amenable to use of RCU.  The simplest case is where entries are
448either added or deleted from the data structure (or atomically modified
449in place), but non-atomic in-place modifications can be handled by making
450a copy, updating the copy, then replacing the original with the copy.
451If stale data cannot be tolerated, then a *deleted* flag may be used
452in conjunction with a per-entry spinlock in order to allow the search
453function to reject newly deleted data.
454
455.. _quick_quiz_answer:
456
457Answer to Quick Quiz:
458	For the deleted-flag technique to be helpful, why is it necessary
459	to hold the per-entry lock while returning from the search function?
460
461	If the search function drops the per-entry lock before returning,
462	then the caller will be processing stale data in any case.  If it
463	is really OK to be processing stale data, then you don't need a
464	*deleted* flag.  If processing stale data really is a problem,
465	then you need to hold the per-entry lock across all of the code
466	that uses the value that was returned.
467
468:ref:`Back to Quick Quiz <quick_quiz>`
469