.. _list_rcu_doc:

Using RCU to Protect Read-Mostly Linked Lists
=============================================

One of the most common uses of RCU is protecting read-mostly linked lists
(``struct list_head`` in list.h).  One big advantage of this approach is
that all of the required memory ordering is provided by the list macros.
This document describes several list-based RCU use cases.

When iterating a list within an RCU read-side critical section, that is,
between rcu_read_lock() and rcu_read_unlock(), writers may concurrently
modify the list.  The reader is guaranteed to see all of the elements
that were added to the list before it invoked rcu_read_lock() and that
are still on the list when it invokes rcu_read_unlock().  Elements that
are added to, or removed from, the list during the traversal may or may
not be seen.  If the writer calls list_replace_rcu(), the reader may see
either the old element or the new element; it will not see both, nor
will it see neither.


Example 1: Read-mostly list: Deferred Destruction
-------------------------------------------------

A widely used use case for RCU lists in the kernel is lockless iteration over
all processes in the system. ``task_struct::tasks`` represents the list node that
links all the processes. The list can be traversed in parallel to any list
additions or removals.

The traversal of the list is done using ``for_each_process()``, which is defined
by the following two macros::

	#define next_task(p) \
		list_entry_rcu((p)->tasks.next, struct task_struct, tasks)

	#define for_each_process(p) \
		for (p = &init_task ; (p = next_task(p)) != &init_task ; )

The code traversing the list of all processes typically looks like::

	rcu_read_lock();
	for_each_process(p) {
		/* Do something with p */
	}
	rcu_read_unlock();

The simplified and heavily inlined code for removing a process from a
task list is::

	void release_task(struct task_struct *p)
	{
		write_lock(&tasklist_lock);
		list_del_rcu(&p->tasks);
		write_unlock(&tasklist_lock);
		call_rcu(&p->rcu, delayed_put_task_struct);
	}

When a process exits, ``release_task()`` calls ``list_del_rcu(&p->tasks)``
via __exit_signal() and __unhash_process() under ``tasklist_lock``
writer lock protection.  The list_del_rcu() invocation removes
the task from the list of all tasks. The ``tasklist_lock``
prevents concurrent list additions/removals from corrupting the
list. Readers using ``for_each_process()`` are not protected with the
``tasklist_lock``. To prevent readers from noticing changes in the list
pointers, the ``task_struct`` object is freed only after one or more
grace periods elapse, with the help of call_rcu(), which is invoked via
put_task_struct_rcu_user(). This deferring of destruction ensures that
any readers traversing the list will see valid ``p->tasks.next`` pointers
and deletion/freeing can happen in parallel with traversal of the list.
This pattern is also called an **existence lock**, since RCU refrains
from invoking the delayed_put_task_struct() callback function until
all existing readers finish, which guarantees that the ``task_struct``
object in question will remain in existence until after the completion
of all RCU readers that might possibly have a reference to that object.
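
The deferred-destruction idea can be illustrated with a small single-threaded
userspace sketch.  It is a toy model, not how the kernel implements RCU: all of
the ``toy_*`` names are hypothetical, and it waits for *every* reader to leave
rather than only for the readers that existed when the callback was queued.
It shows only that the callback queued by the removal path does not run while
a reader may still hold a reference.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Toy stand-in for struct rcu_head: one queued callback (hypothetical). */
struct toy_rcu_head {
	struct toy_rcu_head *next;
	void (*func)(struct toy_rcu_head *head);
};

static int toy_readers;				/* readers inside a critical section */
static struct toy_rcu_head *toy_pending;	/* callbacks waiting for readers */
static int toy_invoked;				/* callbacks run so far */

/* Run all queued callbacks; legal only when no reader is active. */
static void toy_run_callbacks(void)
{
	assert(toy_readers == 0);
	while (toy_pending) {
		struct toy_rcu_head *head = toy_pending;

		toy_pending = head->next;	/* unlink before func() frees it */
		head->func(head);
		toy_invoked++;
	}
}

static void toy_read_lock(void)
{
	toy_readers++;
}

static void toy_read_unlock(void)
{
	if (--toy_readers == 0)
		toy_run_callbacks();
}

/* Queue func(head); it runs once no reader can still reference the object. */
static void toy_call_rcu(struct toy_rcu_head *head,
			 void (*func)(struct toy_rcu_head *head))
{
	head->func = func;
	head->next = toy_pending;
	toy_pending = head;
	if (toy_readers == 0)
		toy_run_callbacks();
}

struct toy_task {
	int pid;
	struct toy_rcu_head rcu;
};

/* Callback: recover the enclosing toy_task and free it. */
static void toy_free_task(struct toy_rcu_head *head)
{
	char *base = (char *)head - offsetof(struct toy_task, rcu);

	free(base);
}
```

A reader that calls ``toy_read_lock()`` before the writer calls
``toy_call_rcu(&t->rcu, toy_free_task)`` can keep dereferencing ``t`` until it
calls ``toy_read_unlock()``; only then is the object freed.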


Example 2: Read-Side Action Taken Outside of Lock: No In-Place Updates
----------------------------------------------------------------------

Some reader-writer locking use cases compute a value while holding
the read-side lock, but continue to use that value after that lock is
released.  These use cases are often good candidates for conversion
to RCU.  One prominent example involves network packet routing.
Because the packet-routing data tracks the state of equipment outside
of the computer, it will at times contain stale data.  Therefore, once
the route has been computed, there is no need to hold the routing table
static during transmission of the packet.  After all, you can hold the
routing table static all you want, but that won't keep the external
Internet from changing, and it is the state of the external Internet
that really matters.  In addition, routing entries are typically added
or deleted, rather than being modified in place.  This is a rare example
of the finite speed of light and the non-zero size of atoms actually
helping to make synchronization lighter weight.

A straightforward example of this type of RCU use case may be found in
the system-call auditing support.  For example, a reader-writer locked
implementation of ``audit_filter_task()`` might be as follows::

	static enum audit_state audit_filter_task(struct task_struct *tsk, char **key)
	{
		struct audit_entry *e;
		enum audit_state   state;

		read_lock(&auditsc_lock);
		/* Note: audit_filter_mutex held by caller. */
		list_for_each_entry(e, &audit_tsklist, list) {
			if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
				if (state == AUDIT_STATE_RECORD)
					*key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
				read_unlock(&auditsc_lock);
				return state;
			}
		}
		read_unlock(&auditsc_lock);
		return AUDIT_BUILD_CONTEXT;
	}

Here the list is searched under the lock, but the lock is dropped before
the corresponding value is returned.  By the time that this value is acted
on, the list may well have been modified.  This makes sense, since if
you are turning auditing off, it is OK to audit a few extra system calls.

This means that RCU can be easily applied to the read side, as follows::

	static enum audit_state audit_filter_task(struct task_struct *tsk, char **key)
	{
		struct audit_entry *e;
		enum audit_state   state;

		rcu_read_lock();
		/* Note: audit_filter_mutex held by caller. */
		list_for_each_entry_rcu(e, &audit_tsklist, list) {
			if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
				if (state == AUDIT_STATE_RECORD)
					*key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
				rcu_read_unlock();
				return state;
			}
		}
		rcu_read_unlock();
		return AUDIT_BUILD_CONTEXT;
	}

The read_lock() and read_unlock() calls have become rcu_read_lock()
and rcu_read_unlock(), respectively, and the list_for_each_entry()
has become list_for_each_entry_rcu().  The **_rcu()** list-traversal
primitives add READ_ONCE() and diagnostic checks for incorrect use
outside of an RCU read-side critical section.
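
The ordering that the **_rcu()** primitives provide can be approximated in
userspace with C11 atomics.  The following is a rough analogue only: the
``demo_*`` names are hypothetical, the list is singly linked, and the reader
uses acquire loads as a conservative stand-in for the kernel's READ_ONCE()
plus dependency ordering.  The writer fully initializes a node and only then
publishes it with a release store, so a reader that sees the pointer also sees
the node's contents.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

struct demo_node {
	int value;
	_Atomic(struct demo_node *) next;
};

/* List head; NULL means the list is empty. */
static _Atomic(struct demo_node *) demo_head;

/* Writer: initialize the node first, then publish it at the list head. */
static void demo_publish(struct demo_node *n, int value)
{
	struct demo_node *first;

	n->value = value;
	first = atomic_load_explicit(&demo_head, memory_order_relaxed);
	atomic_store_explicit(&n->next, first, memory_order_relaxed);
	/* Release store: orders the initialization above before publication. */
	atomic_store_explicit(&demo_head, n, memory_order_release);
}

/* Reader: walk the list without taking any lock and sum the values. */
static int demo_sum(void)
{
	struct demo_node *n;
	int sum = 0;

	for (n = atomic_load_explicit(&demo_head, memory_order_acquire);
	     n != NULL;
	     n = atomic_load_explicit(&n->next, memory_order_acquire))
		sum += n->value;
	return sum;
}
```

The sketch assumes a single writer at a time, which mirrors the way the audit
code relies on ``audit_filter_mutex`` to serialize updaters.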

The changes to the update side are also straightforward. A reader-writer lock
might be used as follows for deletion and insertion in these simplified
versions of audit_del_rule() and audit_add_rule()::

	static inline int audit_del_rule(struct audit_rule *rule,
					 struct list_head *list)
	{
		struct audit_entry *e;

		write_lock(&auditsc_lock);
		list_for_each_entry(e, list, list) {
			if (!audit_compare_rule(rule, &e->rule)) {
				list_del(&e->list);
				write_unlock(&auditsc_lock);
				return 0;
			}
		}
		write_unlock(&auditsc_lock);
		return -EFAULT;		/* No matching rule */
	}

	static inline int audit_add_rule(struct audit_entry *entry,
					 struct list_head *list)
	{
		write_lock(&auditsc_lock);
		if (entry->rule.flags & AUDIT_PREPEND) {
			entry->rule.flags &= ~AUDIT_PREPEND;
			list_add(&entry->list, list);
		} else {
			list_add_tail(&entry->list, list);
		}
		write_unlock(&auditsc_lock);
		return 0;
	}

Following are the RCU equivalents for these two functions::

	static inline int audit_del_rule(struct audit_rule *rule,
					 struct list_head *list)
	{
		struct audit_entry *e;

		/* No need to use the _rcu iterator here, since this is the only
		 * deletion routine. */
		list_for_each_entry(e, list, list) {
			if (!audit_compare_rule(rule, &e->rule)) {
				list_del_rcu(&e->list);
				call_rcu(&e->rcu, audit_free_rule);
				return 0;
			}
		}
		return -EFAULT;		/* No matching rule */
	}

	static inline int audit_add_rule(struct audit_entry *entry,
					 struct list_head *list)
	{
		if (entry->rule.flags & AUDIT_PREPEND) {
			entry->rule.flags &= ~AUDIT_PREPEND;
			list_add_rcu(&entry->list, list);
		} else {
			list_add_tail_rcu(&entry->list, list);
		}
		return 0;
	}

Normally, the write_lock() and write_unlock() would be replaced by a
spin_lock() and a spin_unlock(). But in this case, all callers hold
``audit_filter_mutex``, so no additional locking is required. The
auditsc_lock can therefore be eliminated, since use of RCU eliminates the
need for writers to exclude readers.

The list_del(), list_add(), and list_add_tail() primitives have been
replaced by list_del_rcu(), list_add_rcu(), and list_add_tail_rcu().
The **_rcu()** list-manipulation primitives add memory barriers that are
needed on weakly ordered CPUs.  The list_del_rcu() primitive omits the
pointer poisoning debug-assist code that would otherwise cause concurrent
readers to fail spectacularly.
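
Why the forward pointer must survive deletion can be seen in a small
single-threaded sketch.  The ``demo_*`` names are hypothetical, and the code
ignores memory ordering entirely; it only models the pointer manipulation.
The deletion helper unlinks the node and invalidates its backward pointer
(with NULL here, where the kernel uses a poison value), but leaves ``next``
intact so that a reader currently positioned on the removed node can still
step forward onto the live list.

```c
#include <assert.h>
#include <stddef.h>

/* Minimal circular doubly linked list, modeled on struct list_head. */
struct demo_list {
	struct demo_list *next, *prev;
};

static void demo_list_init(struct demo_list *head)
{
	head->next = head;
	head->prev = head;
}

static void demo_list_add_tail(struct demo_list *n, struct demo_list *head)
{
	n->prev = head->prev;
	n->next = head;
	head->prev->next = n;
	head->prev = n;
}

/* Unlink n, but keep n->next usable for a reader still standing on n. */
static void demo_list_del_keep_next(struct demo_list *n)
{
	n->next->prev = n->prev;
	n->prev->next = n->next;
	n->prev = NULL;		/* only the backward pointer is invalidated */
}

/* Count the entries reachable from head by following next pointers. */
static int demo_list_count(const struct demo_list *head)
{
	const struct demo_list *pos;
	int count = 0;

	for (pos = head->next; pos != head; pos = pos->next)
		count++;
	return count;
}
```

Had the helper also overwritten ``n->next``, a reader that had already
reached ``n`` would follow an invalid pointer on its next step.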

So, when readers can tolerate stale data and when entries are either added or
deleted, without in-place modification, it is very easy to use RCU!


Example 3: Handling In-Place Updates
------------------------------------

The system-call auditing code does not update auditing rules in place.  However,
if it did, the reader-writer-locked code to do so might look as follows
(assuming that only ``action`` and ``field_count`` are updated; otherwise, the
added fields would need to be filled in)::

	static inline int audit_upd_rule(struct audit_rule *rule,
					 struct list_head *list,
					 __u32 newaction,
					 __u32 newfield_count)
	{
		struct audit_entry *e;
		struct audit_entry *ne;

		write_lock(&auditsc_lock);
		/* Note: audit_filter_mutex held by caller. */
		list_for_each_entry(e, list, list) {
			if (!audit_compare_rule(rule, &e->rule)) {
				e->rule.action = newaction;
				e->rule.field_count = newfield_count;
				write_unlock(&auditsc_lock);
				return 0;
			}
		}
		write_unlock(&auditsc_lock);
		return -EFAULT;		/* No matching rule */
	}

The RCU version creates a copy, updates the copy, then replaces the old
entry with the newly updated entry.  This sequence of actions, allowing
concurrent reads while making a copy to perform an update, is what gives
RCU (*read-copy update*) its name.

The RCU version of audit_upd_rule() is as follows::

	static inline int audit_upd_rule(struct audit_rule *rule,
					 struct list_head *list,
					 __u32 newaction,
					 __u32 newfield_count)
	{
		struct audit_entry *e;
		struct audit_entry *ne;

		list_for_each_entry(e, list, list) {
			if (!audit_compare_rule(rule, &e->rule)) {
				ne = kmalloc(sizeof(*ne), GFP_ATOMIC);
				if (ne == NULL)
					return -ENOMEM;
				audit_copy_rule(&ne->rule, &e->rule);
				ne->rule.action = newaction;
				ne->rule.field_count = newfield_count;
				list_replace_rcu(&e->list, &ne->list);
				call_rcu(&e->rcu, audit_free_rule);
				return 0;
			}
		}
		return -EFAULT;		/* No matching rule */
	}

Again, this assumes that the caller holds ``audit_filter_mutex``.  Normally, the
writer lock would become a spinlock in this sort of code.

The update_lsm_rule() function does something very similar, for those who
would prefer to look at real Linux-kernel code.

Another use of this pattern can be found in the Open vSwitch (openvswitch)
driver's *connection tracking table* code in ``ct_limit_set()``.  The table
holds connection tracking entries and has a limit on the maximum number of
entries.  There is one such table per zone and hence one *limit* per zone.
The zones are mapped to their limits through a hashtable using an RCU-managed
hlist for the hash chains. When a new limit is set, a new limit object is
allocated and ``ct_limit_set()`` is called to replace the old limit object
with the new one using hlist_replace_rcu().  The old limit object is then
freed after a grace period using kfree_rcu().
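
The copy-update-replace sequence can be sketched in userspace with a single
atomically published pointer.  This is an illustration under stated
assumptions, not the kernel's code: the ``demo_*`` names are hypothetical,
there is one updater at a time, and reclamation of the old version is left to
the caller, who must wait until no reader can still be using it (the role
played by call_rcu() or kfree_rcu() in the kernel).

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdlib.h>

struct demo_rule {
	int action;
	int field_count;
};

/* The currently published version of the rule. */
static _Atomic(struct demo_rule *) demo_current;

/* Publish an initial, fully initialized rule. */
static void demo_install(struct demo_rule *rule)
{
	atomic_store_explicit(&demo_current, rule, memory_order_release);
}

/* Reader: obtain whichever version is current; never sees a half-update. */
static struct demo_rule *demo_read(void)
{
	return atomic_load_explicit(&demo_current, memory_order_acquire);
}

/*
 * Updater: copy the current rule, modify the copy, then publish the copy.
 * Returns the old version, which the caller may free only after all
 * readers that might hold it are done, or NULL on allocation failure.
 */
static struct demo_rule *demo_update(int newaction, int newfield_count)
{
	struct demo_rule *old, *ne;

	old = atomic_load_explicit(&demo_current, memory_order_relaxed);
	assert(old != NULL);
	ne = malloc(sizeof(*ne));
	if (ne == NULL)
		return NULL;
	*ne = *old;				/* read-copy ... */
	ne->action = newaction;			/* ... update the copy ... */
	ne->field_count = newfield_count;
	atomic_store_explicit(&demo_current, ne, memory_order_release);
	return old;				/* ... and retire the original */
}
```

A reader that fetched the old pointer before the update continues to see a
consistent old ``action`` and ``field_count`` pair, while readers arriving
afterwards see the new pair; no reader observes a mixture of the two.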


Example 4: Eliminating Stale Data
---------------------------------

The auditing example above tolerates stale data, as do most algorithms
that are tracking external state.  After all, there is a delay between
the time the external state changes and the time Linux becomes aware
of the change, so, as noted earlier, a small quantity of additional
RCU-induced staleness is generally not a problem.

However, there are many examples where stale data cannot be tolerated.
One example in the Linux kernel is System V IPC (see the shm_lock()
function in ipc/shm.c).  This code checks a *deleted* flag under a
per-entry spinlock, and, if the *deleted* flag is set, pretends that the
entry does not exist.  For this to be helpful, the search function must
return holding the per-entry spinlock, as shm_lock() does in fact do.

.. _quick_quiz:

Quick Quiz:
	For the deleted-flag technique to be helpful, why is it necessary
	to hold the per-entry lock while returning from the search function?

:ref:`Answer to Quick Quiz <quick_quiz_answer>`

If the system-call audit module were to ever need to reject stale data, one way
to accomplish this would be to add a ``deleted`` flag and a ``lock`` spinlock to the
``audit_entry`` structure, and modify audit_filter_task() as follows::

	static enum audit_state audit_filter_task(struct task_struct *tsk, char **key)
	{
		struct audit_entry *e;
		enum audit_state   state;

		rcu_read_lock();
		list_for_each_entry_rcu(e, &audit_tsklist, list) {
			if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
				spin_lock(&e->lock);
				if (e->deleted) {
					spin_unlock(&e->lock);
					rcu_read_unlock();
					return AUDIT_BUILD_CONTEXT;
				}
				rcu_read_unlock();
				if (state == AUDIT_STATE_RECORD)
					*key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
				return state;
			}
		}
		rcu_read_unlock();
		return AUDIT_BUILD_CONTEXT;
	}

The ``audit_del_rule()`` function would need to set the ``deleted`` flag under the
spinlock as follows::

	static inline int audit_del_rule(struct audit_rule *rule,
					 struct list_head *list)
	{
		struct audit_entry *e;

		/* No need to use the _rcu iterator here, since this
		 * is the only deletion routine. */
		list_for_each_entry(e, list, list) {
			if (!audit_compare_rule(rule, &e->rule)) {
				spin_lock(&e->lock);
				list_del_rcu(&e->list);
				e->deleted = 1;
				spin_unlock(&e->lock);
				call_rcu(&e->rcu, audit_free_rule);
				return 0;
			}
		}
		return -EFAULT;		/* No matching rule */
	}

This too assumes that the caller holds ``audit_filter_mutex``.

Note that this example assumes that entries are only added and deleted.
Additional mechanism is required to deal correctly with the update-in-place
performed by audit_upd_rule().  For one thing, audit_upd_rule() would
need to hold the locks of both the old ``audit_entry`` and its replacement
while executing the list_replace_rcu().
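
The locking discipline behind the *deleted* flag can be sketched in userspace
as follows.  This is a simplified illustration only: the ``demo_*`` names are
hypothetical, a C11 ``atomic_flag`` spin loop stands in for the kernel
spinlock, and the RCU read-side protection and list unlinking are omitted.
The point it shows is that the search function checks the flag under the
per-entry lock and returns with that lock still held, so the entry cannot be
marked deleted while the caller is using it.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct demo_entry {
	atomic_flag lock;		/* per-entry lock */
	bool deleted;			/* protected by lock */
	int key;
	struct demo_entry *next;
};

static void demo_entry_init(struct demo_entry *e, int key,
			    struct demo_entry *next)
{
	atomic_flag_clear(&e->lock);
	e->deleted = false;
	e->key = key;
	e->next = next;
}

static void demo_lock(struct demo_entry *e)
{
	while (atomic_flag_test_and_set_explicit(&e->lock,
						 memory_order_acquire))
		;	/* spin until the lock is free */
}

static void demo_unlock(struct demo_entry *e)
{
	atomic_flag_clear_explicit(&e->lock, memory_order_release);
}

/*
 * Find the first entry matching key.  On success the entry is returned
 * with its lock HELD; the caller must call demo_unlock() when finished.
 * An entry already marked deleted is treated as if it did not exist.
 */
static struct demo_entry *demo_find_locked(struct demo_entry *head, int key)
{
	struct demo_entry *e;

	for (e = head; e != NULL; e = e->next) {
		if (e->key != key)
			continue;
		demo_lock(e);
		if (e->deleted) {
			demo_unlock(e);
			return NULL;
		}
		return e;
	}
	return NULL;
}

/* Deletion path: set the flag under the same per-entry lock. */
static void demo_mark_deleted(struct demo_entry *e)
{
	demo_lock(e);
	e->deleted = true;
	demo_unlock(e);
}
```

Because ``demo_mark_deleted()`` must acquire the same lock, it cannot complete
until the caller of ``demo_find_locked()`` has released the entry.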


Example 5: Skipping Stale Objects
---------------------------------

For some use cases, reader performance can be improved by skipping
stale objects during read-side list traversal, where stale objects
are those that will be removed and destroyed after one or more grace
periods. One such example can be found in the timerfd subsystem. When a
``CLOCK_REALTIME`` clock is reprogrammed (for example due to setting
of the system time) then all programmed ``timerfds`` that depend on
this clock get triggered and processes waiting on them are awakened in
advance of their scheduled expiry. To facilitate this, all such timers
are added to an RCU-managed ``cancel_list`` when they are set up in
``timerfd_setup_cancel()``::

	static void timerfd_setup_cancel(struct timerfd_ctx *ctx, int flags)
	{
		spin_lock(&ctx->cancel_lock);
		if ((ctx->clockid == CLOCK_REALTIME ||
		     ctx->clockid == CLOCK_REALTIME_ALARM) &&
		    (flags & TFD_TIMER_ABSTIME) && (flags & TFD_TIMER_CANCEL_ON_SET)) {
			if (!ctx->might_cancel) {
				ctx->might_cancel = true;
				spin_lock(&cancel_lock);
				list_add_rcu(&ctx->clist, &cancel_list);
				spin_unlock(&cancel_lock);
			}
		} else {
			__timerfd_remove_cancel(ctx);
		}
		spin_unlock(&ctx->cancel_lock);
	}

When a timerfd is freed (fd is closed), the ``might_cancel`` flag of
the timerfd object is cleared, and the object is removed from the
``cancel_list`` and destroyed, as shown in this simplified and inlined
version of timerfd_release()::

	int timerfd_release(struct inode *inode, struct file *file)
	{
		struct timerfd_ctx *ctx = file->private_data;

		spin_lock(&ctx->cancel_lock);
		if (ctx->might_cancel) {
			ctx->might_cancel = false;
			spin_lock(&cancel_lock);
			list_del_rcu(&ctx->clist);
			spin_unlock(&cancel_lock);
		}
		spin_unlock(&ctx->cancel_lock);

		if (isalarm(ctx))
			alarm_cancel(&ctx->t.alarm);
		else
			hrtimer_cancel(&ctx->t.tmr);
		kfree_rcu(ctx, rcu);
		return 0;
	}

If the ``CLOCK_REALTIME`` clock is set, for example by a time server, the
hrtimer framework calls ``timerfd_clock_was_set()`` which walks the
``cancel_list`` and wakes up processes waiting on the timerfd. While iterating
the ``cancel_list``, the ``might_cancel`` flag is consulted to skip stale
objects::

	void timerfd_clock_was_set(void)
	{
		ktime_t moffs = ktime_mono_to_real(0);
		struct timerfd_ctx *ctx;
		unsigned long flags;

		rcu_read_lock();
		list_for_each_entry_rcu(ctx, &cancel_list, clist) {
			if (!ctx->might_cancel)
				continue;
			spin_lock_irqsave(&ctx->wqh.lock, flags);
			if (ctx->moffs != moffs) {
				ctx->moffs = KTIME_MAX;
				ctx->ticks++;
				wake_up_locked_poll(&ctx->wqh, EPOLLIN);
			}
			spin_unlock_irqrestore(&ctx->wqh.lock, flags);
		}
		rcu_read_unlock();
	}

The key point is that because RCU-protected traversal of the
``cancel_list`` happens concurrently with object addition and removal,
sometimes the traversal can access an object that has been removed from
the list. In this example, a flag is used to skip such objects.
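
The flag-based skip can be reduced to a few lines of userspace C.  The sketch
below is illustrative only: the ``demo_*`` names are hypothetical, the list is
a plain singly linked chain, and no RCU or wakeup machinery is modeled.
Retiring a timer clears its flag without unlinking it, which stands in for the
window in which a concurrent traversal can still reach an object that has
already been removed from the list.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct demo_timer {
	atomic_bool might_cancel;	/* cleared when the timer is retired */
	int ticks;			/* number of simulated wakeups */
	struct demo_timer *next;
};

static void demo_timer_init(struct demo_timer *t, struct demo_timer *next)
{
	atomic_init(&t->might_cancel, true);
	t->ticks = 0;
	t->next = next;
}

/* Mark the timer stale; a traversal may still reach it afterwards. */
static void demo_timer_retire(struct demo_timer *t)
{
	atomic_store(&t->might_cancel, false);
}

/* Walk all timers, skipping stale ones; returns how many were notified. */
static int demo_clock_was_set(struct demo_timer *head)
{
	struct demo_timer *t;
	int notified = 0;

	for (t = head; t != NULL; t = t->next) {
		if (!atomic_load(&t->might_cancel))
			continue;	/* stale object: do no work on it */
		t->ticks++;
		notified++;
	}
	return notified;
}
```

Skipping is an optimization rather than a safety requirement here: the stale
object is still valid memory until its grace period ends, so the traversal
merely avoids doing pointless work on it.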


Summary
-------

Read-mostly list-based data structures that can tolerate stale data are
the most amenable to use of RCU.  The simplest case is where entries are
either added or deleted from the data structure (or atomically modified
in place), but non-atomic in-place modifications can be handled by making
a copy, updating the copy, then replacing the original with the copy.
If stale data cannot be tolerated, then a *deleted* flag may be used
in conjunction with a per-entry spinlock in order to allow the search
function to reject newly deleted data.

.. _quick_quiz_answer:

Answer to Quick Quiz:
	For the deleted-flag technique to be helpful, why is it necessary
	to hold the per-entry lock while returning from the search function?

	If the search function drops the per-entry lock before returning,
	then the caller will be processing stale data in any case.  If it
	is really OK to be processing stale data, then you don't need a
	*deleted* flag.  If processing stale data really is a problem,
	then you need to hold the per-entry lock across all of the code
	that uses the value that was returned.

:ref:`Back to Quick Quiz <quick_quiz>`