xref: /freebsd/sys/contrib/ck/include/ck_ec.h (revision 5ca8e32633c4ffbbcd6762e5888b6a4ba0708c6c)
1 /*
2  * Copyright 2018 Paul Khuong, Google LLC.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  */
26 
27 /*
28  * Overview
29  * ========
30  *
31  * ck_ec implements 32- and 64- bit event counts. Event counts let us
32  * easily integrate OS-level blocking (e.g., futexes) in lock-free
33  * protocols. Waiters block conditionally, if the event count's value
34  * is still equal to some old value.
35  *
36  * Event counts come in four variants: 32 and 64 bit (with one bit
37  * stolen for internal signaling, so 31 and 63 bit counters), and
38  * single or multiple producers (wakers). Waiters are always multiple
39  * consumers. The 32 bit variants are smaller, and more efficient,
40  * especially in single producer mode. The 64 bit variants are larger,
41  * but practically invulnerable to ABA.
42  *
43  * The 32 bit variant is always available. The 64 bit variant is only
44  * available if CK supports 64-bit atomic operations. Currently,
45  * specialization for single producer is only implemented for x86 and
46  * x86-64, on compilers that support GCC extended inline assembly;
47  * other platforms fall back to the multiple producer code path.
48  *
49  * A typical usage pattern is:
50  *
51  *  1. On the producer side:
52  *
53  *    - Make changes to some shared data structure, without involving
54  *	the event count at all.
55  *    - After each change, call ck_ec_inc on the event count. The call
56  *	acts as a write-write barrier, and wakes up any consumer blocked
57  *	on the event count (waiting for new changes).
58  *
59  *  2. On the consumer side:
60  *
61  *    - Snapshot ck_ec_value of the event count. The call acts as a
62  *	read barrier.
63  *    - Read and process the shared data structure.
64  *    - Wait for new changes by calling ck_ec_wait with the snapshot value.
65  *
66  * Some data structures may opt for tighter integration with their
67  * event count. For example, an SPMC ring buffer or disruptor might
68  * use the event count's value as the write pointer. If the buffer is
69  * regularly full, it might also make sense to store the read pointer
70  * in an MP event count.
71  *
72  * This event count implementation supports tighter integration in two
73  * ways.
74  *
75  * Producers may opt to increment by an arbitrary value (less than
76  * INT32_MAX / INT64_MAX), in order to encode, e.g., byte
77  * offsets. Larger increment values make wraparound more likely, so
78  * the increments should still be relatively small.
79  *
80  * Consumers may pass a predicate to ck_ec_wait_pred. This predicate
81  * can make `ck_ec_wait_pred` return early, before the event count's
82  * value changes, and can override the deadline passed to futex_wait.
83  * This lets consumer block on one eventcount, while optimistically
84  * looking at other waking conditions.
85  *
86  * API Reference
87  * =============
88  *
89  * When compiled as C11 or later, this header defines type-generic
90  * macros for ck_ec32 and ck_ec64; the reference describes this
91  * type-generic API.
92  *
93  * ck_ec needs additional OS primitives to determine the current time,
94  * to wait on an address, and to wake all threads waiting on a given
95  * address. These are defined with fields in a struct ck_ec_ops.  Each
96  * ck_ec_ops may additionally define the number of spin loop
97  * iterations in the slow path, as well as the initial wait time in
98  * the internal exponential backoff, the exponential scale factor, and
99  * the right shift count (< 32).
100  *
101  * The ops, in addition to the single/multiple producer flag, are
102  * encapsulated in a struct ck_ec_mode, passed to most ck_ec
103  * operations.
104  *
105  * ec is a struct ck_ec32 *, or a struct ck_ec64 *.
106  *
107  * value is an uint32_t for ck_ec32, and an uint64_t for ck_ec64. It
108  * never exceeds INT32_MAX and INT64_MAX respectively.
109  *
110  * mode is a struct ck_ec_mode *.
111  *
112  * deadline is either NULL, or a `const struct timespec *` that will
113  * be treated as an absolute deadline.
114  *
115  * `void ck_ec_init(ec, value)`: initializes the event count to value.
116  *
117  * `value ck_ec_value(ec)`: returns the current value of the event
118  *  counter.  This read acts as a read (acquire) barrier.
119  *
120  * `bool ck_ec_has_waiters(ec)`: returns whether some thread has
121  *  marked the event count as requiring an OS wakeup.
122  *
123  * `void ck_ec_inc(ec, mode)`: increments the value of the event
124  *  counter by one. This writes acts as a write barrier. Wakes up
125  *  any waiting thread.
126  *
127  * `value ck_ec_add(ec, mode, value)`: increments the event counter by
128  *  `value`, and returns the event counter's previous value. This
129  *  write acts as a write barrier. Wakes up any waiting thread.
130  *
131  * `int ck_ec_deadline(struct timespec *new_deadline,
132  *		       mode,
133  *		       const struct timespec *timeout)`:
134  *  computes a deadline `timeout` away from the current time. If
135  *  timeout is NULL, computes a deadline in the infinite future. The
136  *  resulting deadline is written to `new_deadline`. Returns 0 on
137  *  success, and -1 if ops->gettime failed (without touching errno).
138  *
139  * `int ck_ec_wait(ec, mode, value, deadline)`: waits until the event
140  *  counter's value differs from `value`, or, if `deadline` is
141  *  provided and non-NULL, until the current time is after that
142  *  deadline. Use a deadline with tv_sec = 0 for a non-blocking
143  *  execution. Returns 0 if the event counter has changed, and -1 on
144  *  timeout. This function acts as a read (acquire) barrier.
145  *
146  * `int ck_ec_wait_pred(ec, mode, value, pred, data, deadline)`: waits
147  * until the event counter's value differs from `value`, or until
148  * `pred` returns non-zero, or, if `deadline` is provided and
149  * non-NULL, until the current time is after that deadline. Use a
150  * deadline with tv_sec = 0 for a non-blocking execution. Returns 0 if
151  * the event counter has changed, `pred`'s return value if non-zero,
152  * and -1 on timeout. This function acts as a read (acquire) barrier.
153  *
154  * `pred` is always called as `pred(data, iteration_deadline, now)`,
155  * where `iteration_deadline` is a timespec of the deadline for this
156  * exponential backoff iteration, and `now` is the current time. If
157  * `pred` returns a non-zero value, that value is immediately returned
158  * to the waiter. Otherwise, `pred` is free to modify
159  * `iteration_deadline` (moving it further in the future is a bad
160  * idea).
161  *
162  * Implementation notes
163  * ====================
164  *
165  * The multiple producer implementation is a regular locked event
166  * count, with a single flag bit to denote the need to wake up waiting
167  * threads.
168  *
169  * The single producer specialization is heavily tied to
170  * [x86-TSO](https://www.cl.cam.ac.uk/~pes20/weakmemory/cacm.pdf), and
171  * to non-atomic read-modify-write instructions (e.g., `inc mem`);
172  * these non-atomic RMW let us write to the same memory locations with
173  * atomic and non-atomic instructions, without suffering from process
174  * scheduling stalls.
175  *
176  * The reason we can mix atomic and non-atomic writes to the `counter`
177  * word is that every non-atomic write obviates the need for the
178  * atomically flipped flag bit: we only use non-atomic writes to
179  * update the event count, and the atomic flag only informs the
180  * producer that we would like a futex_wake, because of the update.
181  * We only require the non-atomic RMW counter update to prevent
182  * preemption from introducing arbitrarily long worst case delays.
183  *
184  * Correctness does not rely on the usual ordering argument: in the
185  * absence of fences, there is no strict ordering between atomic and
186  * non-atomic writes. The key is instead x86-TSO's guarantee that a
187  * read is satisfied from the most recent buffered write in the local
188  * store queue if there is one, or from memory if there is no write to
189  * that address in the store queue.
190  *
191  * x86-TSO's constraint on reads suffices to guarantee that the
192  * producer will never forget about a counter update. If the last
193  * update is still queued, the new update will be based on the queued
194  * value. Otherwise, the new update will be based on the value in
195  * memory, which may or may not have had its flag flipped. In either
196  * case, the value of the counter (modulo flag) is correct.
197  *
198  * When the producer forwards the counter's value from its store
199  * queue, the new update might not preserve a flag flip. Any waiter
200  * thus has to check from time to time to determine if it wasn't
201  * woken up because the flag bit was silently cleared.
202  *
203  * In reality, the store queue in x86-TSO stands for in-flight
204  * instructions in the chip's out-of-order backend. In the vast
205  * majority of cases, instructions will only remain in flight for a
206  * few hundred or thousand of cycles. That's why ck_ec_wait spins on
207  * the `counter` word for ~100 iterations after flipping its flag bit:
208  * if the counter hasn't changed after that many iterations, it is
209  * very likely that the producer's next counter update will observe
210  * the flag flip.
211  *
212  * That's still not a hard guarantee of correctness. Conservatively,
213  * we can expect that no instruction will remain in flight for more
214  * than 1 second... if only because some interrupt will have forced
215  * the chip to store its architectural state in memory, at which point
216  * an instruction is either fully retired or rolled back. Interrupts,
217  * particularly the pre-emption timer, are why single-producer updates
218  * must happen in a single non-atomic read-modify-write instruction.
219  * Having a single instruction as the critical section means we only
220  * have to consider the worst-case execution time for that
221  * instruction. That's easier than doing the same for a pair of
222  * instructions, which an unlucky pre-emption could delay for
223  * arbitrarily long.
224  *
225  * Thus, after a short spin loop, ck_ec_wait enters an exponential
226  * backoff loop, where each "sleep" is instead a futex_wait.  The
227  * backoff is only necessary to handle rare cases where the flag flip
228  * was overwritten after the spin loop. Eventually, more than one
229  * second will have elapsed since the flag flip, and the sleep timeout
230  * becomes infinite: since the flag bit has been set for much longer
231  * than the time for which an instruction may remain in flight, the
232  * flag will definitely be observed at the next counter update.
233  *
234  * The 64 bit ck_ec_wait pulls another trick: futexes only handle 32
235  * bit ints, so we must treat the 64 bit counter's low 32 bits as an
236  * int in futex_wait. That's a bit dodgy, but fine in practice, given
237  * that the OS's futex code will always read whatever value is
238  * currently in memory: even if the producer thread were to wait on
239  * its own event count, the syscall and ring transition would empty
240  * the store queue (the out-of-order execution backend).
241  *
242  * Finally, what happens when the producer is migrated to another core
243  * or otherwise pre-empted? Migration must already incur a barrier, so
244  * that thread always sees its own writes, so that's safe. As for
245  * pre-emption, that requires storing the architectural state, which
246  * means every instruction must either be executed fully or not at
247  * all when pre-emption happens.
248  */
249 
250 #ifndef CK_EC_H
251 #define CK_EC_H
252 #include <ck_cc.h>
253 #include <ck_pr.h>
254 #include <ck_stdbool.h>
255 #include <ck_stdint.h>
256 #include <ck_stddef.h>
257 #include <sys/time.h>
258 
259 /*
260  * If we have ck_pr_faa_64 (and, presumably, ck_pr_load_64), we
261  * support 63 bit counters.
262  */
263 #ifdef CK_F_PR_FAA_64
264 #define CK_F_EC64
265 #endif /* CK_F_PR_FAA_64 */
266 
267 /*
268  * GCC inline assembly lets us exploit non-atomic read-modify-write
269  * instructions on x86/x86_64 for a fast single-producer mode.
270  *
271  * If we CK_F_EC_SP is not defined, CK_EC always uses the slower
272  * multiple producer code.
273  */
274 #if defined(__GNUC__) && (defined(__i386__) || defined(__x86_64__))
275 #define CK_F_EC_SP
276 #endif /* GNUC && (__i386__ || __x86_64__) */
277 
278 struct ck_ec_ops;
279 
280 struct ck_ec_wait_state {
281 	struct timespec start;	/* Time when we entered ck_ec_wait. */
282 	struct timespec now;  /* Time now. */
283 	const struct ck_ec_ops *ops;
284 	void *data;  /* Opaque pointer for the predicate's internal state. */
285 
286 };
287 
288 /*
289  * ck_ec_ops define system-specific functions to get the current time,
290  * atomically wait on an address if it still has some expected value,
291  * and to wake all threads waiting on an address.
292  *
293  * Each platform is expected to have few (one) opaque pointer to a
294  * const ops struct, and reuse it for all ck_ec_mode structs.
295  */
296 struct ck_ec_ops {
297 	/* Populates out with the current time. Returns non-zero on failure. */
298 	int (*gettime)(const struct ck_ec_ops *, struct timespec *out);
299 
300 	/*
301 	 * Waits on address if its value is still `expected`.  If
302 	 * deadline is non-NULL, stops waiting once that deadline is
303 	 * reached. May return early for any reason.
304 	 */
305 	void (*wait32)(const struct ck_ec_wait_state *, const uint32_t *,
306 		       uint32_t expected, const struct timespec *deadline);
307 
308 	/*
309 	 * Same as wait32, but for a 64 bit counter. Only used if
310 	 * CK_F_EC64 is defined.
311 	 *
312 	 * If underlying blocking primitive only supports 32 bit
313 	 * control words, it should be safe to block on the least
314 	 * significant half of the 64 bit address.
315 	 */
316 	void (*wait64)(const struct ck_ec_wait_state *, const uint64_t *,
317 		       uint64_t expected, const struct timespec *deadline);
318 
319 	/* Wakes up all threads waiting on address. */
320 	void (*wake32)(const struct ck_ec_ops *, const uint32_t *address);
321 
322 	/*
323 	 * Same as wake32, but for a 64 bit counter. Only used if
324 	 * CK_F_EC64 is defined.
325 	 *
326 	 * When wait64 truncates the control word at address to `only`
327 	 * consider its least significant half, wake64 should perform
328 	 * any necessary fixup (e.g., on big endian platforms).
329 	 */
330 	void (*wake64)(const struct ck_ec_ops *, const uint64_t *address);
331 
332 	/*
333 	 * Number of iterations for the initial busy wait. 0 defaults
334 	 * to 100 (not ABI stable).
335 	 */
336 	uint32_t busy_loop_iter;
337 
338 	/*
339 	 * Delay in nanoseconds for the first iteration of the
340 	 * exponential backoff. 0 defaults to 2 ms (not ABI stable).
341 	 */
342 	uint32_t initial_wait_ns;
343 
344 	/*
345 	 * Scale factor for the exponential backoff. 0 defaults to 8x
346 	 * (not ABI stable).
347 	 */
348 	uint32_t wait_scale_factor;
349 
350 	/*
351 	 * Right shift count for the exponential backoff. The update
352 	 * after each iteration is
353 	 *     wait_ns = (wait_ns * wait_scale_factor) >> wait_shift_count,
354 	 * until one second has elapsed. After that, the deadline goes
355 	 * to infinity.
356 	 */
357 	uint32_t wait_shift_count;
358 };
359 
360 /*
361  * ck_ec_mode wraps the ops table, and informs the fast path whether
362  * it should attempt to specialize for single producer mode.
363  *
364  * mode structs are expected to be exposed by value, e.g.,
365  *
366  *    extern const struct ck_ec_ops system_ec_ops;
367  *
368  *    static const struct ck_ec_mode ec_sp = {
369  *	  .ops = &system_ec_ops,
370  *	  .single_producer = true
371  *    };
372  *
373  *    static const struct ck_ec_mode ec_mp = {
374  *	  .ops = &system_ec_ops,
375  *	  .single_producer = false
376  *    };
377  *
378  * ck_ec_mode structs are only passed to inline functions defined in
379  * this header, and never escape to their slow paths, so they should
380  * not result in any object file size increase.
381  */
382 struct ck_ec_mode {
383 	const struct ck_ec_ops *ops;
384 	/*
385 	 * If single_producer is true, the event count has a unique
386 	 * incrementer. The implementation will specialize ck_ec_inc
387 	 * and ck_ec_add if possible (if CK_F_EC_SP is defined).
388 	 */
389 	bool single_producer;
390 };
391 
392 struct ck_ec32 {
393 	/* Flag is "sign" bit, value in bits 0:30. */
394 	uint32_t counter;
395 };
396 
397 typedef struct ck_ec32 ck_ec32_t;
398 
399 #ifdef CK_F_EC64
400 struct ck_ec64 {
401 	/*
402 	 * Flag is bottom bit, value in bits 1:63. Eventcount only
403 	 * works on x86-64 (i.e., little endian), so the futex int
404 	 * lies in the first 4 (bottom) bytes.
405 	 */
406 	uint64_t counter;
407 };
408 
409 typedef struct ck_ec64 ck_ec64_t;
410 #endif /* CK_F_EC64 */
411 
412 #define CK_EC_INITIALIZER { .counter = 0 }
413 
414 /*
415  * Initializes the event count to `value`. The value must not
416  * exceed INT32_MAX.
417  */
418 static void ck_ec32_init(struct ck_ec32 *ec, uint32_t value);
419 
420 #ifndef CK_F_EC64
421 #define ck_ec_init ck_ec32_init
422 #else
423 /*
424  * Initializes the event count to `value`. The value must not
425  * exceed INT64_MAX.
426  */
427 static void ck_ec64_init(struct ck_ec64 *ec, uint64_t value);
428 
429 #if __STDC_VERSION__ >= 201112L
430 #define ck_ec_init(EC, VALUE)				\
431 	(_Generic(*(EC),				\
432 		  struct ck_ec32 : ck_ec32_init,	\
433 		  struct ck_ec64 : ck_ec64_init)((EC), (VALUE)))
434 #endif /* __STDC_VERSION__ */
435 #endif /* CK_F_EC64 */
436 
437 /*
438  * Returns the counter value in the event count. The value is at most
439  * INT32_MAX.
440  */
441 static uint32_t ck_ec32_value(const struct ck_ec32* ec);
442 
443 #ifndef CK_F_EC64
444 #define ck_ec_value ck_ec32_value
445 #else
446 /*
447  * Returns the counter value in the event count. The value is at most
448  * INT64_MAX.
449  */
450 static uint64_t ck_ec64_value(const struct ck_ec64* ec);
451 
452 #if __STDC_VERSION__ >= 201112L
453 #define ck_ec_value(EC)					\
454 	(_Generic(*(EC),				\
455 		  struct ck_ec32 : ck_ec32_value,	\
456 		struct ck_ec64 : ck_ec64_value)((EC)))
457 #endif /* __STDC_VERSION__ */
458 #endif /* CK_F_EC64 */
459 
460 /*
461  * Returns whether there may be slow pathed waiters that need an
462  * explicit OS wakeup for this event count.
463  */
464 static bool ck_ec32_has_waiters(const struct ck_ec32 *ec);
465 
466 #ifndef CK_F_EC64
467 #define ck_ec_has_waiters ck_ec32_has_waiters
468 #else
469 static bool ck_ec64_has_waiters(const struct ck_ec64 *ec);
470 
471 #if __STDC_VERSION__ >= 201112L
472 #define ck_ec_has_waiters(EC)				      \
473 	(_Generic(*(EC),				      \
474 		  struct ck_ec32 : ck_ec32_has_waiters,	      \
475 		  struct ck_ec64 : ck_ec64_has_waiters)((EC)))
476 #endif /* __STDC_VERSION__ */
477 #endif /* CK_F_EC64 */
478 
479 /*
480  * Increments the counter value in the event count by one, and wakes
481  * up any waiter.
482  */
483 static void ck_ec32_inc(struct ck_ec32 *ec, const struct ck_ec_mode *mode);
484 
485 #ifndef CK_F_EC64
486 #define ck_ec_inc ck_ec32_inc
487 #else
488 static void ck_ec64_inc(struct ck_ec64 *ec, const struct ck_ec_mode *mode);
489 
490 #if __STDC_VERSION__ >= 201112L
491 #define ck_ec_inc(EC, MODE)					\
492 	(_Generic(*(EC),					\
493 		  struct ck_ec32 : ck_ec32_inc,			\
494 		  struct ck_ec64 : ck_ec64_inc)((EC), (MODE)))
495 #endif /* __STDC_VERSION__ */
496 #endif /* CK_F_EC64 */
497 
498 /*
499  * Increments the counter value in the event count by delta, wakes
500  * up any waiter, and returns the previous counter value.
501  */
502 static uint32_t ck_ec32_add(struct ck_ec32 *ec,
503 			    const struct ck_ec_mode *mode,
504 			    uint32_t delta);
505 
506 #ifndef CK_F_EC64
507 #define ck_ec_add ck_ec32_add
508 #else
509 static uint64_t ck_ec64_add(struct ck_ec64 *ec,
510 			    const struct ck_ec_mode *mode,
511 			    uint64_t delta);
512 
513 #if __STDC_VERSION__ >= 201112L
514 #define ck_ec_add(EC, MODE, DELTA)					\
515 	(_Generic(*(EC),						\
516 		  struct ck_ec32 : ck_ec32_add,				\
517 		  struct ck_ec64 : ck_ec64_add)((EC), (MODE), (DELTA)))
518 #endif /* __STDC_VERSION__ */
519 #endif /* CK_F_EC64 */
520 
521 /*
522  * Populates `new_deadline` with a deadline `timeout` in the future.
523  * Returns 0 on success, and -1 if clock_gettime failed, in which
524  * case errno is left as is.
525  */
526 static int ck_ec_deadline(struct timespec *new_deadline,
527 			  const struct ck_ec_mode *mode,
528 			  const struct timespec *timeout);
529 
530 /*
531  * Waits until the counter value in the event count differs from
532  * old_value, or, if deadline is non-NULL, until CLOCK_MONOTONIC is
533  * past the deadline.
534  *
535  * Returns 0 on success, and -1 on timeout.
536  */
537 static int ck_ec32_wait(struct ck_ec32 *ec,
538 			const struct ck_ec_mode *mode,
539 			uint32_t old_value,
540 			const struct timespec *deadline);
541 
542 #ifndef CK_F_EC64
543 #define ck_ec_wait ck_ec32_wait
544 #else
545 static int ck_ec64_wait(struct ck_ec64 *ec,
546 			const struct ck_ec_mode *mode,
547 			uint64_t old_value,
548 			const struct timespec *deadline);
549 
550 #if __STDC_VERSION__ >= 201112L
551 #define ck_ec_wait(EC, MODE, OLD_VALUE, DEADLINE)			\
552 	(_Generic(*(EC),						\
553 		  struct ck_ec32 : ck_ec32_wait,			\
554 		  struct ck_ec64 : ck_ec64_wait)((EC), (MODE),		\
555 						 (OLD_VALUE), (DEADLINE)))
556 
557 #endif /* __STDC_VERSION__ */
558 #endif /* CK_F_EC64 */
559 
560 /*
561  * Waits until the counter value in the event count differs from
562  * old_value, pred returns non-zero, or, if deadline is non-NULL,
563  * until CLOCK_MONOTONIC is past the deadline.
564  *
565  * Returns 0 on success, -1 on timeout, and the return value of pred
566  * if it returns non-zero.
567  *
568  * A NULL pred represents a function that always returns 0.
569  */
570 static int ck_ec32_wait_pred(struct ck_ec32 *ec,
571 			     const struct ck_ec_mode *mode,
572 			     uint32_t old_value,
573 			     int (*pred)(const struct ck_ec_wait_state *,
574 					 struct timespec *deadline),
575 			     void *data,
576 			     const struct timespec *deadline);
577 
578 #ifndef CK_F_EC64
579 #define ck_ec_wait_pred ck_ec32_wait_pred
580 #else
581 static int ck_ec64_wait_pred(struct ck_ec64 *ec,
582 			     const struct ck_ec_mode *mode,
583 			     uint64_t old_value,
584 			     int (*pred)(const struct ck_ec_wait_state *,
585 					 struct timespec *deadline),
586 			     void *data,
587 			     const struct timespec *deadline);
588 
589 #if __STDC_VERSION__ >= 201112L
590 #define ck_ec_wait_pred(EC, MODE, OLD_VALUE, PRED, DATA, DEADLINE)	\
591 	(_Generic(*(EC),						\
592 		  struct ck_ec32 : ck_ec32_wait_pred,			\
593 		  struct ck_ec64 : ck_ec64_wait_pred)			\
594 	 ((EC), (MODE), (OLD_VALUE), (PRED), (DATA), (DEADLINE)))
595 #endif /* __STDC_VERSION__ */
596 #endif /* CK_F_EC64 */
597 
598 /*
599  * Inline implementation details. 32 bit first, then 64 bit
600  * conditionally.
601  */
602 CK_CC_FORCE_INLINE void ck_ec32_init(struct ck_ec32 *ec, uint32_t value)
603 {
604 	ec->counter = value & ~(1UL << 31);
605 	return;
606 }
607 
608 CK_CC_FORCE_INLINE uint32_t ck_ec32_value(const struct ck_ec32 *ec)
609 {
610 	uint32_t ret = ck_pr_load_32(&ec->counter) & ~(1UL << 31);
611 
612 	ck_pr_fence_acquire();
613 	return ret;
614 }
615 
616 CK_CC_FORCE_INLINE bool ck_ec32_has_waiters(const struct ck_ec32 *ec)
617 {
618 	return ck_pr_load_32(&ec->counter) & (1UL << 31);
619 }
620 
621 /* Slow path for ck_ec{32,64}_{inc,add} */
622 void ck_ec32_wake(struct ck_ec32 *ec, const struct ck_ec_ops *ops);
623 
624 CK_CC_FORCE_INLINE void ck_ec32_inc(struct ck_ec32 *ec,
625 				    const struct ck_ec_mode *mode)
626 {
627 #if !defined(CK_F_EC_SP)
628 	/* Nothing to specialize if we don't have EC_SP. */
629 	ck_ec32_add(ec, mode, 1);
630 	return;
631 #else
632 	char flagged;
633 
634 #if __GNUC__ >= 6
635 	/*
636 	 * We don't want to wake if the sign bit is 0. We do want to
637 	 * wake if the sign bit just flipped from 1 to 0. We don't
638 	 * care what happens when our increment caused the sign bit to
639 	 * flip from 0 to 1 (that's once per 2^31 increment).
640 	 *
641 	 * This leaves us with four cases:
642 	 *
643 	 *  old sign bit | new sign bit | SF | OF | ZF
644 	 *  -------------------------------------------
645 	 *	       0 |	      0 |  0 |	0 | ?
646 	 *	       0 |	      1 |  1 |	0 | ?
647 	 *	       1 |	      1 |  1 |	0 | ?
648 	 *	       1 |	      0 |  0 |	0 | 1
649 	 *
650 	 * In the first case, we don't want to hit ck_ec32_wake. In
651 	 * the last two cases, we do want to call ck_ec32_wake. In the
652 	 * second case, we don't care, so we arbitrarily choose to
653 	 * call ck_ec32_wake.
654 	 *
655 	 * The "le" condition checks if SF != OF, or ZF == 1, which
656 	 * meets our requirements.
657 	 */
658 #define CK_EC32_INC_ASM(PREFIX)					\
659 	__asm__ volatile(PREFIX " incl %0"		    \
660 			 : "+m"(ec->counter), "=@ccle"(flagged)	 \
661 			 :: "cc", "memory")
662 #else
663 #define CK_EC32_INC_ASM(PREFIX)						\
664 	__asm__ volatile(PREFIX " incl %0; setle %1"			\
665 			 : "+m"(ec->counter), "=r"(flagged)		\
666 			 :: "cc", "memory")
667 #endif /* __GNUC__ */
668 
669 	if (mode->single_producer == true) {
670 		ck_pr_fence_store();
671 		CK_EC32_INC_ASM("");
672 	} else {
673 		ck_pr_fence_store_atomic();
674 		CK_EC32_INC_ASM("lock");
675 	}
676 #undef CK_EC32_INC_ASM
677 
678 	if (CK_CC_UNLIKELY(flagged)) {
679 		ck_ec32_wake(ec, mode->ops);
680 	}
681 
682 	return;
683 #endif /* CK_F_EC_SP */
684 }
685 
686 CK_CC_FORCE_INLINE uint32_t ck_ec32_add_epilogue(struct ck_ec32 *ec,
687 						 const struct ck_ec_mode *mode,
688 						 uint32_t old)
689 {
690 	const uint32_t flag_mask = 1U << 31;
691 	uint32_t ret;
692 
693 	ret = old & ~flag_mask;
694 	/* These two only differ if the flag bit is set. */
695 	if (CK_CC_UNLIKELY(old != ret)) {
696 		ck_ec32_wake(ec, mode->ops);
697 	}
698 
699 	return ret;
700 }
701 
702 static CK_CC_INLINE uint32_t ck_ec32_add_mp(struct ck_ec32 *ec,
703 					    const struct ck_ec_mode *mode,
704 					    uint32_t delta)
705 {
706 	uint32_t old;
707 
708 	ck_pr_fence_store_atomic();
709 	old = ck_pr_faa_32(&ec->counter, delta);
710 	return ck_ec32_add_epilogue(ec, mode, old);
711 }
712 
713 #ifdef CK_F_EC_SP
714 static CK_CC_INLINE uint32_t ck_ec32_add_sp(struct ck_ec32 *ec,
715 					    const struct ck_ec_mode *mode,
716 					    uint32_t delta)
717 {
718 	uint32_t old;
719 
720 	/*
721 	 * Correctness of this racy write depends on actually
722 	 * having an update to write. Exit here if the update
723 	 * is a no-op.
724 	 */
725 	if (CK_CC_UNLIKELY(delta == 0)) {
726 		return ck_ec32_value(ec);
727 	}
728 
729 	ck_pr_fence_store();
730 	old = delta;
731 	__asm__ volatile("xaddl %1, %0"
732 			 : "+m"(ec->counter), "+r"(old)
733 			 :: "cc", "memory");
734 	return ck_ec32_add_epilogue(ec, mode, old);
735 }
736 #endif /* CK_F_EC_SP */
737 
738 CK_CC_FORCE_INLINE uint32_t ck_ec32_add(struct ck_ec32 *ec,
739 					const struct ck_ec_mode *mode,
740 					uint32_t delta)
741 {
742 #ifdef CK_F_EC_SP
743 	if (mode->single_producer == true) {
744 		return ck_ec32_add_sp(ec, mode, delta);
745 	}
746 #endif
747 
748 	return ck_ec32_add_mp(ec, mode, delta);
749 }
750 
751 int ck_ec_deadline_impl(struct timespec *new_deadline,
752 			const struct ck_ec_ops *ops,
753 			const struct timespec *timeout);
754 
755 CK_CC_FORCE_INLINE int ck_ec_deadline(struct timespec *new_deadline,
756 				      const struct ck_ec_mode *mode,
757 				      const struct timespec *timeout)
758 {
759 	return ck_ec_deadline_impl(new_deadline, mode->ops, timeout);
760 }
761 
762 
763 int ck_ec32_wait_slow(struct ck_ec32 *ec,
764 		      const struct ck_ec_ops *ops,
765 		      uint32_t old_value,
766 		      const struct timespec *deadline);
767 
768 CK_CC_FORCE_INLINE int ck_ec32_wait(struct ck_ec32 *ec,
769 				    const struct ck_ec_mode *mode,
770 				    uint32_t old_value,
771 				    const struct timespec *deadline)
772 {
773 	if (ck_ec32_value(ec) != old_value) {
774 		return 0;
775 	}
776 
777 	return ck_ec32_wait_slow(ec, mode->ops, old_value, deadline);
778 }
779 
780 int ck_ec32_wait_pred_slow(struct ck_ec32 *ec,
781 			   const struct ck_ec_ops *ops,
782 			   uint32_t old_value,
783 			   int (*pred)(const struct ck_ec_wait_state *state,
784 				       struct timespec *deadline),
785 			   void *data,
786 			   const struct timespec *deadline);
787 
788 CK_CC_FORCE_INLINE int
789 ck_ec32_wait_pred(struct ck_ec32 *ec,
790 		  const struct ck_ec_mode *mode,
791 		  uint32_t old_value,
792 		  int (*pred)(const struct ck_ec_wait_state *state,
793 			      struct timespec *deadline),
794 		  void *data,
795 		  const struct timespec *deadline)
796 {
797 	if (ck_ec32_value(ec) != old_value) {
798 		return 0;
799 	}
800 
801 	return ck_ec32_wait_pred_slow(ec, mode->ops, old_value,
802 				      pred, data, deadline);
803 }
804 
805 #ifdef CK_F_EC64
806 CK_CC_FORCE_INLINE void ck_ec64_init(struct ck_ec64 *ec, uint64_t value)
807 {
808 	ec->counter = value << 1;
809 	return;
810 }
811 
812 CK_CC_FORCE_INLINE uint64_t ck_ec64_value(const struct ck_ec64 *ec)
813 {
814 	uint64_t ret = ck_pr_load_64(&ec->counter) >> 1;
815 
816 	ck_pr_fence_acquire();
817 	return ret;
818 }
819 
820 CK_CC_FORCE_INLINE bool ck_ec64_has_waiters(const struct ck_ec64 *ec)
821 {
822 	return ck_pr_load_64(&ec->counter) & 1;
823 }
824 
825 void ck_ec64_wake(struct ck_ec64 *ec, const struct ck_ec_ops *ops);
826 
827 CK_CC_FORCE_INLINE void ck_ec64_inc(struct ck_ec64 *ec,
828 				    const struct ck_ec_mode *mode)
829 {
830 	/* We always xadd, so there's no special optimization here. */
831 	(void)ck_ec64_add(ec, mode, 1);
832 	return;
833 }
834 
835 CK_CC_FORCE_INLINE uint64_t ck_ec_add64_epilogue(struct ck_ec64 *ec,
836 					       const struct ck_ec_mode *mode,
837 					       uint64_t old)
838 {
839 	uint64_t ret = old >> 1;
840 
841 	if (CK_CC_UNLIKELY(old & 1)) {
842 		ck_ec64_wake(ec, mode->ops);
843 	}
844 
845 	return ret;
846 }
847 
848 static CK_CC_INLINE uint64_t ck_ec64_add_mp(struct ck_ec64 *ec,
849 					    const struct ck_ec_mode *mode,
850 					    uint64_t delta)
851 {
852 	uint64_t inc = 2 * delta;  /* The low bit is the flag bit. */
853 
854 	ck_pr_fence_store_atomic();
855 	return ck_ec_add64_epilogue(ec, mode, ck_pr_faa_64(&ec->counter, inc));
856 }
857 
858 #ifdef CK_F_EC_SP
859 /* Single-producer specialisation. */
860 static CK_CC_INLINE uint64_t ck_ec64_add_sp(struct ck_ec64 *ec,
861 					    const struct ck_ec_mode *mode,
862 					    uint64_t delta)
863 {
864 	uint64_t old;
865 
866 	/*
867 	 * Correctness of this racy write depends on actually
868 	 * having an update to write. Exit here if the update
869 	 * is a no-op.
870 	 */
871 	if (CK_CC_UNLIKELY(delta == 0)) {
872 		return ck_ec64_value(ec);
873 	}
874 
875 	ck_pr_fence_store();
876 	old = 2 * delta;  /* The low bit is the flag bit. */
877 	__asm__ volatile("xaddq %1, %0"
878 			 : "+m"(ec->counter), "+r"(old)
879 			 :: "cc", "memory");
880 	return ck_ec_add64_epilogue(ec, mode, old);
881 }
882 #endif /* CK_F_EC_SP */
883 
884 /*
885  * Dispatch on mode->single_producer in this FORCE_INLINE function:
886  * the end result is always small, but not all compilers have enough
887  * foresight to inline and get the reduction.
888  */
889 CK_CC_FORCE_INLINE uint64_t ck_ec64_add(struct ck_ec64 *ec,
890 					const struct ck_ec_mode *mode,
891 					uint64_t delta)
892 {
893 #ifdef CK_F_EC_SP
894 	if (mode->single_producer == true) {
895 		return ck_ec64_add_sp(ec, mode, delta);
896 	}
897 #endif
898 
899 	return ck_ec64_add_mp(ec, mode, delta);
900 }
901 
902 int ck_ec64_wait_slow(struct ck_ec64 *ec,
903 		      const struct ck_ec_ops *ops,
904 		      uint64_t old_value,
905 		      const struct timespec *deadline);
906 
907 CK_CC_FORCE_INLINE int ck_ec64_wait(struct ck_ec64 *ec,
908 				    const struct ck_ec_mode *mode,
909 				    uint64_t old_value,
910 				    const struct timespec *deadline)
911 {
912 	if (ck_ec64_value(ec) != old_value) {
913 		return 0;
914 	}
915 
916 	return ck_ec64_wait_slow(ec, mode->ops, old_value, deadline);
917 }
918 
919 int ck_ec64_wait_pred_slow(struct ck_ec64 *ec,
920 			   const struct ck_ec_ops *ops,
921 			   uint64_t old_value,
922 			   int (*pred)(const struct ck_ec_wait_state *state,
923 				       struct timespec *deadline),
924 			   void *data,
925 			   const struct timespec *deadline);
926 
927 
928 CK_CC_FORCE_INLINE int
929 ck_ec64_wait_pred(struct ck_ec64 *ec,
930 		  const struct ck_ec_mode *mode,
931 		  uint64_t old_value,
932 		  int (*pred)(const struct ck_ec_wait_state *state,
933 			      struct timespec *deadline),
934 		  void *data,
935 		  const struct timespec *deadline)
936 {
937 	if (ck_ec64_value(ec) != old_value) {
938 		return 0;
939 	}
940 
941 	return ck_ec64_wait_pred_slow(ec, mode->ops, old_value,
942 				      pred, data, deadline);
943 }
944 #endif /* CK_F_EC64 */
945 #endif /* !CK_EC_H */
946