xref: /linux/drivers/md/dm-vdo/dedupe.c (revision a3a02a52bcfcbcc4a637d4b68bf1bc391c9fad02)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 /**
7  * DOC:
8  *
9  * Hash Locks:
10  *
11  * A hash_lock controls and coordinates writing, index access, and dedupe among groups of data_vios
12  * concurrently writing identical blocks, allowing them to deduplicate not only against advice but
13  * also against each other. This saves on index queries and allows those data_vios to concurrently
14  * deduplicate against a single block instead of being serialized through a PBN read lock. Only one
15  * index query is needed for each hash_lock, instead of one for every data_vio.
16  *
17  * Hash_locks are assigned to hash_zones by computing a modulus on the hash itself. Each hash_zone
18  * has a single dedicated queue and thread for performing all operations on the hash_locks assigned
19  * to that zone. The concurrency guarantees of this single-threaded model allow the code to omit
20  * more fine-grained locking for the hash_lock structures.
21  *
22  * A hash_lock acts like a state machine perhaps more than as a lock. Other than the starting and
23  * ending states INITIALIZING and BYPASSING, every state represents and is held for the duration of
24  * an asynchronous operation. All state transitions are performed on the thread of the hash_zone
25  * containing the lock. An asynchronous operation is almost always performed upon entering a state,
26  * and the callback from that operation triggers exiting the state and entering a new state.
27  *
28  * In all states except DEDUPING, there is a single data_vio, called the lock agent, performing the
29  * asynchronous operations on behalf of the lock. The agent will change during the lifetime of the
30  * lock if the lock is shared by more than one data_vio. data_vios waiting to deduplicate are kept
31  * on a wait queue. Viewed a different way, the agent holds the lock exclusively until the lock
32  * enters the DEDUPING state, at which point it becomes a shared lock that all the waiters (and any
33  * new data_vios that arrive) use to share a PBN lock. In state DEDUPING, there is no agent. When
34  * the last data_vio in the lock calls back in DEDUPING, it becomes the agent and the lock becomes
35  * exclusive again. New data_vios that arrive in the lock will also go on the wait queue.
36  *
37  * The existence of lock waiters is a key factor controlling which state the lock transitions to
38  * next. When the lock is new or has waiters, it will always try to reach DEDUPING, and when it
39  * doesn't, it will try to clean up and exit.
40  *
41  * Deduping requires holding a PBN lock on a block that is known to contain data identical to the
42  * data_vios in the lock, so the lock will send the agent to the duplicate zone to acquire the PBN
43  * lock (LOCKING), to the kernel I/O threads to read and verify the data (VERIFYING), or to write a
44  * new copy of the data to a full data block or a slot in a compressed block (WRITING).
45  *
46  * Cleaning up consists of updating the index when the data location is different from the initial
47  * index query (UPDATING, triggered by stale advice, compression, and rollover), releasing the PBN
48  * lock on the duplicate block (UNLOCKING), and if the agent is the last data_vio referencing the
49  * lock, releasing the hash_lock itself back to the hash zone (BYPASSING).
50  *
51  * The shortest sequence of states is for non-concurrent writes of new data:
52  *   INITIALIZING -> QUERYING -> WRITING -> BYPASSING
53  * This sequence is short because no PBN read lock or index update is needed.
54  *
55  * Non-concurrent, finding valid advice looks like this (endpoints elided):
56  *   -> QUERYING -> LOCKING -> VERIFYING -> DEDUPING -> UNLOCKING ->
57  * Or with stale advice (endpoints elided):
58  *   -> QUERYING -> LOCKING -> VERIFYING -> UNLOCKING -> WRITING -> UPDATING ->
59  *
60  * When there are not enough reference count increments available on a PBN for a data_vio
61  * to deduplicate, a new lock is forked and the excess waiters roll over to the new lock (which
62  * goes directly to WRITING). The new lock takes the place of the old lock in the lock map so new
63  * data_vios will be directed to it. The two locks will proceed independently, but only the new
64  * lock will have the right to update the index (unless it also forks).
65  *
66  * Since rollover happens in a lock instance, once a valid data location has been selected, it will
67  * not change. QUERYING and WRITING are only performed once per lock lifetime. All other
68  * non-endpoint states can be re-entered.
69  *
70  * The function names in this module follow a convention referencing the states and transitions in
71  * the state machine. For example, for the LOCKING state, there are start_locking() and
72  * finish_locking() functions.  start_locking() is invoked by the finish function of the state (or
73  * states) that transition to LOCKING. It performs the actual lock state change and must be invoked
74  * on the hash zone thread.  finish_locking() is called by (or continued via callback from) the
75  * code actually obtaining the lock. It does any bookkeeping or decision-making required and
76  * invokes the appropriate start function of the state being transitioned to after LOCKING.
77  *
78  * ----------------------------------------------------------------------
79  *
80  * Index Queries:
81  *
82  * A query to the UDS index is handled asynchronously by the index's threads. When the query is
83  * complete, a callback supplied with the query will be called from one of those threads. Under
84  * heavy system load, the index may be slower to respond than is desirable for reasonable I/O
85  * throughput. Since deduplication of writes is not necessary for correct operation of a VDO
86  * device, it is acceptable to time out slow index queries and proceed to fulfill a write
87  * request without deduplicating. However, because the uds_request struct itself is supplied by the
88  * caller, we cannot simply reuse a uds_request object which we have chosen to time out. Hence,
89  * each hash_zone maintains a pool of dedupe_contexts which each contain a uds_request along with a
90  * reference to the data_vio on behalf of which they are performing a query.
91  *
92  * When a hash_lock needs to query the index, it attempts to acquire an unused dedupe_context from
93  * its hash_zone's pool. If one is available, that context is prepared, associated with the
94  * hash_lock's agent, added to the list of pending contexts, and then sent to the index. The
95  * context's state will be transitioned from DEDUPE_CONTEXT_IDLE to DEDUPE_CONTEXT_PENDING. If all
96  * goes well, the dedupe callback will be called by the index which will change the context's state
97  * to DEDUPE_CONTEXT_COMPLETE, and the associated data_vio will be enqueued to run back in the hash
98  * zone where the query results will be processed and the context will be put back in the idle
99  * state and returned to the hash_zone's available list.
100  *
101  * The first time an index query is launched from a given hash_zone, a timer is started. When the
102  * timer fires, the hash_zone's completion is enqueued to run in the hash_zone where the zone's
103  * pending list will be searched for any contexts in the pending state which have been running for
104  * too long. Those contexts are transitioned to the DEDUPE_CONTEXT_TIMED_OUT state and moved to the
105  * zone's timed_out list (where they won't be examined again if there is a subsequent time out). The
106  * data_vios associated with timed out contexts are sent to continue processing their write
107  * operation without deduplicating. The timer is also restarted.
108  *
109  * When the dedupe callback is run for a context which is in the timed out state, that context is
110  * moved to the DEDUPE_CONTEXT_TIMED_OUT_COMPLETE state. No other action need be taken as the
111  * associated data_vios have already been dispatched.
112  *
113  * If a hash_lock needs a dedupe context, and the available list is empty, the timed_out list will
114  * be searched for any contexts which are timed out and complete. One of these will be used
115  * immediately, and the rest will be returned to the available list and marked idle.
116  */
117 
118 #include "dedupe.h"
119 
120 #include <linux/atomic.h>
121 #include <linux/jiffies.h>
122 #include <linux/kernel.h>
123 #include <linux/list.h>
124 #include <linux/ratelimit.h>
125 #include <linux/spinlock.h>
126 #include <linux/timer.h>
127 
128 #include "logger.h"
129 #include "memory-alloc.h"
130 #include "numeric.h"
131 #include "permassert.h"
132 #include "string-utils.h"
133 
134 #include "indexer.h"
135 
136 #include "action-manager.h"
137 #include "admin-state.h"
138 #include "completion.h"
139 #include "constants.h"
140 #include "data-vio.h"
141 #include "int-map.h"
142 #include "io-submitter.h"
143 #include "packer.h"
144 #include "physical-zone.h"
145 #include "slab-depot.h"
146 #include "statistics.h"
147 #include "types.h"
148 #include "vdo.h"
149 #include "wait-queue.h"
150 
/*
 * Values for a hash zone's dedupe query timer state.
 * NOTE(review): presumably these are the values change_timer_state() swaps between -- confirm.
 */
#define DEDUPE_QUERY_TIMER_IDLE		0
#define DEDUPE_QUERY_TIMER_RUNNING	1
#define DEDUPE_QUERY_TIMER_FIRED	2
154 
/* The states of a dedupe_context; see "Index Queries" in the DOC comment at the top of this file. */
enum dedupe_context_state {
	/* Not currently being used for a query. */
	DEDUPE_CONTEXT_IDLE = 0,
	/* Sent to the index, which has not yet called back. */
	DEDUPE_CONTEXT_PENDING,
	/* Found to have been pending for too long when the zone's timer fired. */
	DEDUPE_CONTEXT_TIMED_OUT,
	/* The index called back before the context timed out. */
	DEDUPE_CONTEXT_COMPLETE,
	/* The index called back after the context had already timed out. */
	DEDUPE_CONTEXT_TIMED_OUT_COMPLETE,
};
162 
/*
 * The state of the index: closed, opened, or in transition between those two. The same type is
 * used for both the current state and the target state in struct hash_zones.
 */
enum index_state {
	IS_CLOSED = 0,
	IS_CHANGING = 1,
	IS_OPENED = 2,
};
169 
/*
 * Human-readable names for index states. Both the characters and the pointers themselves are
 * const: these names are fixed for the life of the module, so nothing should be able to
 * re-point them, and the compiler is free to place the pointers in read-only data.
 */
static const char * const CLOSED = "closed";
static const char * const CLOSING = "closing";
static const char * const ERROR = "error";
static const char * const OFFLINE = "offline";
static const char * const ONLINE = "online";
static const char * const OPENING = "opening";
static const char * const SUSPENDED = "suspended";
static const char * const UNKNOWN = "unknown";
178 
/* Advice format version 2 uses the kernel space UDS index and is limited to 16 bytes. */
#define UDS_ADVICE_VERSION 2
/* Size of encoded advice: a 64-bit little-endian PBN preceded by a version byte and a state byte. */
#define UDS_ADVICE_SIZE (sizeof(u64) + 1 + 1)
183 
/*
 * The states of the hash_lock state machine; the DOC comment at the top of this file describes
 * the transitions between them.
 */
enum hash_lock_state {
	/* Locks which are unused, or which are still being set up, are in this state. */
	VDO_HASH_LOCK_INITIALIZING = 0,

	/* The states normally visited, in this order, when not deduplicating. */
	VDO_HASH_LOCK_QUERYING,
	VDO_HASH_LOCK_WRITING,
	VDO_HASH_LOCK_UPDATING,

	/* The states normally visited, in this order, when deduplicating. */
	VDO_HASH_LOCK_LOCKING,
	VDO_HASH_LOCK_VERIFYING,
	VDO_HASH_LOCK_DEDUPING,
	VDO_HASH_LOCK_UNLOCKING,

	/*
	 * The terminal state, for locks on their way back to the pool. This must remain the last
	 * enumerator: it is the final state, and its value is also used to count the states.
	 */
	VDO_HASH_LOCK_BYPASSING,
};
205 
206 static const char * const LOCK_STATE_NAMES[] = {
207 	[VDO_HASH_LOCK_BYPASSING] = "BYPASSING",
208 	[VDO_HASH_LOCK_DEDUPING] = "DEDUPING",
209 	[VDO_HASH_LOCK_INITIALIZING] = "INITIALIZING",
210 	[VDO_HASH_LOCK_LOCKING] = "LOCKING",
211 	[VDO_HASH_LOCK_QUERYING] = "QUERYING",
212 	[VDO_HASH_LOCK_UNLOCKING] = "UNLOCKING",
213 	[VDO_HASH_LOCK_UPDATING] = "UPDATING",
214 	[VDO_HASH_LOCK_VERIFYING] = "VERIFYING",
215 	[VDO_HASH_LOCK_WRITING] = "WRITING",
216 };
217 
/*
 * The state shared by a group of data_vios which all have the same record name; see "Hash Locks"
 * in the DOC comment at the top of this file. return_hash_lock_to_pool() zeroes the entire
 * structure before re-initializing its lists and wait queue.
 */
struct hash_lock {
	/* The block hash covered by this lock */
	struct uds_record_name hash;

	/* When the lock is unused, this list entry allows the lock to be pooled */
	struct list_head pool_node;

	/*
	 * A list containing the data VIOs sharing this lock, all having the same record name and
	 * data block contents, linked by their hash_lock_entry fields.
	 */
	struct list_head duplicate_ring;

	/* The number of data_vios sharing this lock instance */
	data_vio_count_t reference_count;

	/* The maximum value of reference_count in the lifetime of this lock */
	data_vio_count_t max_references;

	/* The current state of this lock */
	enum hash_lock_state state;

	/* True if the UDS index should be updated with new advice */
	bool update_advice;

	/* True if the advice has been verified to be a true duplicate */
	bool verified;

	/* True if the lock has already accounted for an initial verification */
	bool verify_counted;

	/* True if this lock is registered in the lock map (cleared on rollover) */
	bool registered;

	/*
	 * If verified is false, this is the location of a possible duplicate. If verified is true,
	 * it is the verified location of a true duplicate.
	 */
	struct zoned_pbn duplicate;

	/* The PBN lock on the block containing the duplicate data */
	struct pbn_lock *duplicate_lock;

	/* The data_vio designated to act on behalf of the lock */
	struct data_vio *agent;

	/*
	 * Other data_vios with data identical to the agent who are currently waiting for the agent
	 * to get the information they all need to deduplicate--either against each other, or
	 * against an existing duplicate on disk.
	 */
	struct vdo_wait_queue waiters;
};
271 
/*
 * The capacity of a pool of hash_locks, set equal to MAXIMUM_VDO_USER_VIOS.
 * NOTE(review): presumably one pool of this size is allocated per hash zone -- confirm at the
 * allocation site.
 */
#define LOCK_POOL_CAPACITY MAXIMUM_VDO_USER_VIOS
273 
/*
 * The collection of hash zones, stored in the trailing flexible array, together with the index
 * session and the index state fields declared here.
 */
struct hash_zones {
	struct action_manager *manager;
	struct uds_parameters parameters;
	struct uds_index_session *index_session;
	struct ratelimit_state ratelimiter;
	/* 64-bit atomic counters; NOTE(review): presumably statistics, judging by the names. */
	atomic64_t timeouts;
	atomic64_t dedupe_context_busy;

	/* This spinlock protects the state fields and the starting of dedupe requests. */
	spinlock_t lock;

	/* The fields in the next block are all protected by the lock */
	struct vdo_completion completion;
	enum index_state index_state;
	enum index_state index_target;
	struct admin_state state;
	bool changing;
	bool create_flag;
	bool dedupe_flag;
	bool error_flag;
	u64 reported_timeouts;

	/* The number of zones */
	zone_count_t zone_count;
	/* The hash zones themselves */
	struct hash_zone zones[];
};
301 
/* These are in milliseconds. Neither is static, so both are visible outside this file. */
unsigned int vdo_dedupe_index_timeout_interval = 5000;
unsigned int vdo_dedupe_index_min_timer_interval = 100;
/*
 * Same two variables, in jiffies for easier consumption. They have no initializer here, so they
 * start out as zero. NOTE(review): presumably derived from the millisecond values elsewhere in
 * this file -- confirm.
 */
static u64 vdo_dedupe_index_timeout_jiffies;
static u64 vdo_dedupe_index_min_timer_jiffies;
308 
309 static inline struct hash_zone *as_hash_zone(struct vdo_completion *completion)
310 {
311 	vdo_assert_completion_type(completion, VDO_HASH_ZONE_COMPLETION);
312 	return container_of(completion, struct hash_zone, completion);
313 }
314 
315 static inline struct hash_zones *as_hash_zones(struct vdo_completion *completion)
316 {
317 	vdo_assert_completion_type(completion, VDO_HASH_ZONES_COMPLETION);
318 	return container_of(completion, struct hash_zones, completion);
319 }
320 
/**
 * assert_in_hash_zone() - Check that the caller is running on the thread of a hash zone.
 * @zone: The hash zone whose thread_id is compared with the current callback thread id.
 * @name: The name of the calling function, included in the assertion message.
 *
 * A mismatch is only reported through VDO_ASSERT_LOG_ONLY(); nothing is returned to the caller.
 */
static inline void assert_in_hash_zone(struct hash_zone *zone, const char *name)
{
	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == zone->thread_id),
			    "%s called on hash zone thread", name);
}
326 
327 static inline bool change_context_state(struct dedupe_context *context, int old, int new)
328 {
329 	return (atomic_cmpxchg(&context->state, old, new) == old);
330 }
331 
332 static inline bool change_timer_state(struct hash_zone *zone, int old, int new)
333 {
334 	return (atomic_cmpxchg(&zone->timer_state, old, new) == old);
335 }
336 
337 /**
338  * return_hash_lock_to_pool() - (Re)initialize a hash lock and return it to its pool.
339  * @zone: The zone from which the lock was borrowed.
340  * @lock: The lock that is no longer in use.
341  */
342 static void return_hash_lock_to_pool(struct hash_zone *zone, struct hash_lock *lock)
343 {
344 	memset(lock, 0, sizeof(*lock));
345 	INIT_LIST_HEAD(&lock->pool_node);
346 	INIT_LIST_HEAD(&lock->duplicate_ring);
347 	vdo_waitq_init(&lock->waiters);
348 	list_add_tail(&lock->pool_node, &zone->lock_pool);
349 }
350 
351 /**
352  * vdo_get_duplicate_lock() - Get the PBN lock on the duplicate data location for a data_vio from
353  *                            the hash_lock the data_vio holds (if there is one).
354  * @data_vio: The data_vio to query.
355  *
356  * Return: The PBN lock on the data_vio's duplicate location.
357  */
358 struct pbn_lock *vdo_get_duplicate_lock(struct data_vio *data_vio)
359 {
360 	if (data_vio->hash_lock == NULL)
361 		return NULL;
362 
363 	return data_vio->hash_lock->duplicate_lock;
364 }
365 
366 /**
367  * hash_lock_key() - Return hash_lock's record name as a hash code.
368  * @lock: The hash lock.
369  *
370  * Return: The key to use for the int map.
371  */
372 static inline u64 hash_lock_key(struct hash_lock *lock)
373 {
374 	return get_unaligned_le64(&lock->hash.name);
375 }
376 
377 /**
378  * get_hash_lock_state_name() - Get the string representation of a hash lock state.
379  * @state: The hash lock state.
380  *
381  * Return: The short string representing the state
382  */
383 static const char *get_hash_lock_state_name(enum hash_lock_state state)
384 {
385 	/* Catch if a state has been added without updating the name array. */
386 	BUILD_BUG_ON((VDO_HASH_LOCK_BYPASSING + 1) != ARRAY_SIZE(LOCK_STATE_NAMES));
387 	return (state < ARRAY_SIZE(LOCK_STATE_NAMES)) ? LOCK_STATE_NAMES[state] : "INVALID";
388 }
389 
/**
 * assert_hash_lock_agent() - Assert that a data_vio is the agent of its hash lock, and that this
 *                            is being called in the hash zone.
 * @data_vio: The data_vio expected to be the lock agent. Its hash_lock pointer is dereferenced,
 *            so it must currently hold a hash lock.
 * @where: A string describing the function making the assertion.
 */
static void assert_hash_lock_agent(struct data_vio *data_vio, const char *where)
{
	/* Not safe to access the agent field except from the hash zone. */
	assert_data_vio_in_hash_zone(data_vio);
	VDO_ASSERT_LOG_ONLY(data_vio == data_vio->hash_lock->agent,
			    "%s must be for the hash lock agent", where);
}
403 
404 /**
405  * set_duplicate_lock() - Set the duplicate lock held by a hash lock. May only be called in the
406  *                        physical zone of the PBN lock.
407  * @hash_lock: The hash lock to update.
408  * @pbn_lock: The PBN read lock to use as the duplicate lock.
409  */
410 static void set_duplicate_lock(struct hash_lock *hash_lock, struct pbn_lock *pbn_lock)
411 {
412 	VDO_ASSERT_LOG_ONLY((hash_lock->duplicate_lock == NULL),
413 			    "hash lock must not already hold a duplicate lock");
414 	pbn_lock->holder_count += 1;
415 	hash_lock->duplicate_lock = pbn_lock;
416 }
417 
418 /**
419  * dequeue_lock_waiter() - Remove the first data_vio from the lock's waitq and return it.
420  * @lock: The lock containing the wait queue.
421  *
422  * Return: The first (oldest) waiter in the queue, or NULL if the queue is empty.
423  */
424 static inline struct data_vio *dequeue_lock_waiter(struct hash_lock *lock)
425 {
426 	return vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters));
427 }
428 
429 /**
430  * set_hash_lock() - Set, change, or clear the hash lock a data_vio is using.
431  * @data_vio: The data_vio to update.
432  * @new_lock: The hash lock the data_vio is joining.
433  *
434  * Updates the hash lock (or locks) to reflect the change in membership.
435  */
436 static void set_hash_lock(struct data_vio *data_vio, struct hash_lock *new_lock)
437 {
438 	struct hash_lock *old_lock = data_vio->hash_lock;
439 
440 	if (old_lock != NULL) {
441 		VDO_ASSERT_LOG_ONLY(data_vio->hash_zone != NULL,
442 				    "must have a hash zone when holding a hash lock");
443 		VDO_ASSERT_LOG_ONLY(!list_empty(&data_vio->hash_lock_entry),
444 				    "must be on a hash lock ring when holding a hash lock");
445 		VDO_ASSERT_LOG_ONLY(old_lock->reference_count > 0,
446 				    "hash lock reference must be counted");
447 
448 		if ((old_lock->state != VDO_HASH_LOCK_BYPASSING) &&
449 		    (old_lock->state != VDO_HASH_LOCK_UNLOCKING)) {
450 			/*
451 			 * If the reference count goes to zero in a non-terminal state, we're most
452 			 * likely leaking this lock.
453 			 */
454 			VDO_ASSERT_LOG_ONLY(old_lock->reference_count > 1,
455 					    "hash locks should only become unreferenced in a terminal state, not state %s",
456 					    get_hash_lock_state_name(old_lock->state));
457 		}
458 
459 		list_del_init(&data_vio->hash_lock_entry);
460 		old_lock->reference_count -= 1;
461 
462 		data_vio->hash_lock = NULL;
463 	}
464 
465 	if (new_lock != NULL) {
466 		/*
467 		 * Keep all data_vios sharing the lock on a ring since they can complete in any
468 		 * order and we'll always need a pointer to one to compare data.
469 		 */
470 		list_move_tail(&data_vio->hash_lock_entry, &new_lock->duplicate_ring);
471 		new_lock->reference_count += 1;
472 		if (new_lock->max_references < new_lock->reference_count)
473 			new_lock->max_references = new_lock->reference_count;
474 
475 		data_vio->hash_lock = new_lock;
476 	}
477 }
478 
/*
 * There are loops in the state diagram, so some forward decl's are needed. For example,
 * finish_unlocking() calls start_writing() and start_locking(), finish_updating() calls
 * start_deduping(), and vdo_clean_failed_hash_lock() launches unlock_duplicate_pbn(), all ahead
 * of the definitions of those functions.
 */
static void start_deduping(struct hash_lock *lock, struct data_vio *agent,
			   bool agent_is_done);
static void start_locking(struct hash_lock *lock, struct data_vio *agent);
static void start_writing(struct hash_lock *lock, struct data_vio *agent);
static void unlock_duplicate_pbn(struct vdo_completion *completion);
static void transfer_allocation_lock(struct data_vio *data_vio);
486 
487 /**
488  * exit_hash_lock() - Bottleneck for data_vios that have written or deduplicated and that are no
489  *                    longer needed to be an agent for the hash lock.
490  * @data_vio: The data_vio to complete and send to be cleaned up.
491  */
492 static void exit_hash_lock(struct data_vio *data_vio)
493 {
494 	/* Release the hash lock now, saving a thread transition in cleanup. */
495 	vdo_release_hash_lock(data_vio);
496 
497 	/* Complete the data_vio and start the clean-up path to release any locks it still holds. */
498 	data_vio->vio.completion.callback = complete_data_vio;
499 
500 	continue_data_vio(data_vio);
501 }
502 
503 /**
504  * set_duplicate_location() - Set the location of the duplicate block for data_vio, updating the
505  *                            is_duplicate and duplicate fields from a zoned_pbn.
506  * @data_vio: The data_vio to modify.
507  * @source: The location of the duplicate.
508  */
509 static void set_duplicate_location(struct data_vio *data_vio,
510 				   const struct zoned_pbn source)
511 {
512 	data_vio->is_duplicate = (source.pbn != VDO_ZERO_BLOCK);
513 	data_vio->duplicate = source;
514 }
515 
516 /**
517  * retire_lock_agent() - Retire the active lock agent, replacing it with the first lock waiter, and
518  *                       make the retired agent exit the hash lock.
519  * @lock: The hash lock to update.
520  *
521  * Return: The new lock agent (which will be NULL if there was no waiter)
522  */
523 static struct data_vio *retire_lock_agent(struct hash_lock *lock)
524 {
525 	struct data_vio *old_agent = lock->agent;
526 	struct data_vio *new_agent = dequeue_lock_waiter(lock);
527 
528 	lock->agent = new_agent;
529 	exit_hash_lock(old_agent);
530 	if (new_agent != NULL)
531 		set_duplicate_location(new_agent, lock->duplicate);
532 	return new_agent;
533 }
534 
535 /**
536  * wait_on_hash_lock() - Add a data_vio to the lock's queue of waiters.
537  * @lock: The hash lock on which to wait.
538  * @data_vio: The data_vio to add to the queue.
539  */
540 static void wait_on_hash_lock(struct hash_lock *lock, struct data_vio *data_vio)
541 {
542 	vdo_waitq_enqueue_waiter(&lock->waiters, &data_vio->waiter);
543 
544 	/*
545 	 * Make sure the agent doesn't block indefinitely in the packer since it now has at least
546 	 * one other data_vio waiting on it.
547 	 */
548 	if ((lock->state != VDO_HASH_LOCK_WRITING) || !cancel_data_vio_compression(lock->agent))
549 		return;
550 
551 	/*
552 	 * Even though we're waiting, we also have to send ourselves as a one-way message to the
553 	 * packer to ensure the agent continues executing. This is safe because
554 	 * cancel_vio_compression() guarantees the agent won't continue executing until this
555 	 * message arrives in the packer, and because the wait queue link isn't used for sending
556 	 * the message.
557 	 */
558 	data_vio->compression.lock_holder = lock->agent;
559 	launch_data_vio_packer_callback(data_vio, vdo_remove_lock_holder_from_packer);
560 }
561 
562 /**
563  * abort_waiter() - waiter_callback_fn function that shunts waiters to write their blocks without
564  *                  optimization.
565  * @waiter: The data_vio's waiter link.
566  * @context: Not used.
567  */
568 static void abort_waiter(struct vdo_waiter *waiter, void *context __always_unused)
569 {
570 	write_data_vio(vdo_waiter_as_data_vio(waiter));
571 }
572 
/**
 * start_bypassing() - Stop using the hash lock.
 * @lock: The hash lock.
 * @agent: The data_vio acting as the agent for the lock.
 *
 * Stops using the hash lock. This is the final transition for hash locks which did not get an
 * error.
 */
static void start_bypassing(struct hash_lock *lock, struct data_vio *agent)
{
	/*
	 * The state is changed before the agent exits. NOTE(review): presumably this is so that
	 * dropping the last reference passes the terminal-state check in set_hash_lock() --
	 * confirm against vdo_release_hash_lock().
	 */
	lock->state = VDO_HASH_LOCK_BYPASSING;
	exit_hash_lock(agent);
}
586 
587 void vdo_clean_failed_hash_lock(struct data_vio *data_vio)
588 {
589 	struct hash_lock *lock = data_vio->hash_lock;
590 
591 	if (lock->state == VDO_HASH_LOCK_BYPASSING) {
592 		exit_hash_lock(data_vio);
593 		return;
594 	}
595 
596 	if (lock->agent == NULL) {
597 		lock->agent = data_vio;
598 	} else if (data_vio != lock->agent) {
599 		exit_hash_lock(data_vio);
600 		return;
601 	}
602 
603 	lock->state = VDO_HASH_LOCK_BYPASSING;
604 
605 	/* Ensure we don't attempt to update advice when cleaning up. */
606 	lock->update_advice = false;
607 
608 	vdo_waitq_notify_all_waiters(&lock->waiters, abort_waiter, NULL);
609 
610 	if (lock->duplicate_lock != NULL) {
611 		/* The agent must reference the duplicate zone to launch it. */
612 		data_vio->duplicate = lock->duplicate;
613 		launch_data_vio_duplicate_zone_callback(data_vio, unlock_duplicate_pbn);
614 		return;
615 	}
616 
617 	lock->agent = NULL;
618 	data_vio->is_duplicate = false;
619 	exit_hash_lock(data_vio);
620 }
621 
622 /**
623  * finish_unlocking() - Handle the result of the agent for the lock releasing a read lock on
624  *                      duplicate candidate.
625  * @completion: The completion of the data_vio acting as the lock's agent.
626  *
627  * This continuation is registered in unlock_duplicate_pbn().
628  */
629 static void finish_unlocking(struct vdo_completion *completion)
630 {
631 	struct data_vio *agent = as_data_vio(completion);
632 	struct hash_lock *lock = agent->hash_lock;
633 
634 	assert_hash_lock_agent(agent, __func__);
635 
636 	VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
637 			    "must have released the duplicate lock for the hash lock");
638 
639 	if (!lock->verified) {
640 		/*
641 		 * UNLOCKING -> WRITING transition: The lock we released was on an unverified
642 		 * block, so it must have been a lock on advice we were verifying, not on a
643 		 * location that was used for deduplication. Go write (or compress) the block to
644 		 * get a location to dedupe against.
645 		 */
646 		start_writing(lock, agent);
647 		return;
648 	}
649 
650 	/*
651 	 * With the lock released, the verified duplicate block may already have changed and will
652 	 * need to be re-verified if a waiter arrived.
653 	 */
654 	lock->verified = false;
655 
656 	if (vdo_waitq_has_waiters(&lock->waiters)) {
657 		/*
658 		 * UNLOCKING -> LOCKING transition: A new data_vio entered the hash lock while the
659 		 * agent was releasing the PBN lock. The current agent exits and the waiter has to
660 		 * re-lock and re-verify the duplicate location.
661 		 *
662 		 * TODO: If we used the current agent to re-acquire the PBN lock we wouldn't need
663 		 * to re-verify.
664 		 */
665 		agent = retire_lock_agent(lock);
666 		start_locking(lock, agent);
667 		return;
668 	}
669 
670 	/*
671 	 * UNLOCKING -> BYPASSING transition: The agent is done with the lock and no other
672 	 * data_vios reference it, so remove it from the lock map and return it to the pool.
673 	 */
674 	start_bypassing(lock, agent);
675 }
676 
677 /**
678  * unlock_duplicate_pbn() - Release a read lock on the PBN of the block that may or may not have
679  *                          contained duplicate data.
680  * @completion: The completion of the data_vio acting as the lock's agent.
681  *
682  * This continuation is launched by start_unlocking(), and calls back to finish_unlocking() on the
683  * hash zone thread.
684  */
685 static void unlock_duplicate_pbn(struct vdo_completion *completion)
686 {
687 	struct data_vio *agent = as_data_vio(completion);
688 	struct hash_lock *lock = agent->hash_lock;
689 
690 	assert_data_vio_in_duplicate_zone(agent);
691 	VDO_ASSERT_LOG_ONLY(lock->duplicate_lock != NULL,
692 			    "must have a duplicate lock to release");
693 
694 	vdo_release_physical_zone_pbn_lock(agent->duplicate.zone, agent->duplicate.pbn,
695 					   vdo_forget(lock->duplicate_lock));
696 	if (lock->state == VDO_HASH_LOCK_BYPASSING) {
697 		complete_data_vio(completion);
698 		return;
699 	}
700 
701 	launch_data_vio_hash_zone_callback(agent, finish_unlocking);
702 }
703 
/**
 * start_unlocking() - Release a read lock on the PBN of the block that may or may not have
 *                     contained duplicate data.
 * @lock: The hash lock.
 * @agent: The data_vio currently acting as the agent for the lock.
 *
 * Puts the lock in the UNLOCKING state, then launches the agent to the duplicate zone, where
 * unlock_duplicate_pbn() does the actual release.
 */
static void start_unlocking(struct hash_lock *lock, struct data_vio *agent)
{
	lock->state = VDO_HASH_LOCK_UNLOCKING;
	launch_data_vio_duplicate_zone_callback(agent, unlock_duplicate_pbn);
}
715 
716 static void release_context(struct dedupe_context *context)
717 {
718 	struct hash_zone *zone = context->zone;
719 
720 	WRITE_ONCE(zone->active, zone->active - 1);
721 	list_move(&context->list_entry, &zone->available);
722 }
723 
724 static void process_update_result(struct data_vio *agent)
725 {
726 	struct dedupe_context *context = agent->dedupe_context;
727 
728 	if ((context == NULL) ||
729 	    !change_context_state(context, DEDUPE_CONTEXT_COMPLETE, DEDUPE_CONTEXT_IDLE))
730 		return;
731 
732 	release_context(context);
733 }
734 
735 /**
736  * finish_updating() - Process the result of a UDS update performed by the agent for the lock.
737  * @completion: The completion of the data_vio that performed the update
738  *
739  * This continuation is registered in start_querying().
740  */
741 static void finish_updating(struct vdo_completion *completion)
742 {
743 	struct data_vio *agent = as_data_vio(completion);
744 	struct hash_lock *lock = agent->hash_lock;
745 
746 	assert_hash_lock_agent(agent, __func__);
747 
748 	process_update_result(agent);
749 
750 	/*
751 	 * UDS was updated successfully, so don't update again unless the duplicate location
752 	 * changes due to rollover.
753 	 */
754 	lock->update_advice = false;
755 
756 	if (vdo_waitq_has_waiters(&lock->waiters)) {
757 		/*
758 		 * UPDATING -> DEDUPING transition: A new data_vio arrived during the UDS update.
759 		 * Send it on the verified dedupe path. The agent is done with the lock, but the
760 		 * lock may still need to use it to clean up after rollover.
761 		 */
762 		start_deduping(lock, agent, true);
763 		return;
764 	}
765 
766 	if (lock->duplicate_lock != NULL) {
767 		/*
768 		 * UPDATING -> UNLOCKING transition: No one is waiting to dedupe, but we hold a
769 		 * duplicate PBN lock, so go release it.
770 		 */
771 		start_unlocking(lock, agent);
772 		return;
773 	}
774 
775 	/*
776 	 * UPDATING -> BYPASSING transition: No one is waiting to dedupe and there's no lock to
777 	 * release.
778 	 */
779 	start_bypassing(lock, agent);
780 }
781 
782 static void query_index(struct data_vio *data_vio, enum uds_request_type operation);
783 
784 /**
785  * start_updating() - Continue deduplication with the last step, updating UDS with the location of
786  *                    the duplicate that should be returned as advice in the future.
787  * @lock: The hash lock.
788  * @agent: The data_vio currently acting as the agent for the lock.
789  */
790 static void start_updating(struct hash_lock *lock, struct data_vio *agent)
791 {
792 	lock->state = VDO_HASH_LOCK_UPDATING;
793 
794 	VDO_ASSERT_LOG_ONLY(lock->verified, "new advice should have been verified");
795 	VDO_ASSERT_LOG_ONLY(lock->update_advice, "should only update advice if needed");
796 
797 	agent->last_async_operation = VIO_ASYNC_OP_UPDATE_DEDUPE_INDEX;
798 	set_data_vio_hash_zone_callback(agent, finish_updating);
799 	query_index(agent, UDS_UPDATE);
800 }
801 
802 /**
803  * finish_deduping() - Handle a data_vio that has finished deduplicating against the block locked
804  *                     by the hash lock.
805  * @lock: The hash lock.
806  * @data_vio: The lock holder that has finished deduplicating.
807  *
808  * If there are other data_vios still sharing the lock, this will just release the data_vio's share
809  * of the lock and finish processing the data_vio. If this is the last data_vio holding the lock,
810  * this makes the data_vio the lock agent and uses it to advance the state of the lock so it can
811  * eventually be released.
812  */
813 static void finish_deduping(struct hash_lock *lock, struct data_vio *data_vio)
814 {
815 	struct data_vio *agent = data_vio;
816 
817 	VDO_ASSERT_LOG_ONLY(lock->agent == NULL, "shouldn't have an agent in DEDUPING");
818 	VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&lock->waiters),
819 			    "shouldn't have any lock waiters in DEDUPING");
820 
821 	/* Just release the lock reference if other data_vios are still deduping. */
822 	if (lock->reference_count > 1) {
823 		exit_hash_lock(data_vio);
824 		return;
825 	}
826 
827 	/* The hash lock must have an agent for all other lock states. */
828 	lock->agent = agent;
829 	if (lock->update_advice) {
830 		/*
831 		 * DEDUPING -> UPDATING transition: The location of the duplicate block changed
832 		 * since the initial UDS query because of compression, rollover, or because the
833 		 * query agent didn't have an allocation. The UDS update was delayed in case there
834 		 * was another change in location, but with only this data_vio using the hash lock,
835 		 * it's time to update the advice.
836 		 */
837 		start_updating(lock, agent);
838 	} else {
839 		/*
840 		 * DEDUPING -> UNLOCKING transition: Release the PBN read lock on the duplicate
841 		 * location so the hash lock itself can be released (contingent on no new data_vios
842 		 * arriving in the lock before the agent returns).
843 		 */
844 		start_unlocking(lock, agent);
845 	}
846 }
847 
848 /**
849  * acquire_lock() - Get the lock for a record name.
850  * @zone: The zone responsible for the hash.
851  * @hash: The hash to lock.
852  * @replace_lock: If non-NULL, the lock already registered for the hash which should be replaced by
853  *                the new lock.
854  * @lock_ptr: A pointer to receive the hash lock.
855  *
856  * Gets the lock for the hash (record name) of the data in a data_vio, or if one does not exist (or
857  * if we are explicitly rolling over), initialize a new lock for the hash and register it in the
858  * zone. This must only be called in the correct thread for the zone.
859  *
860  * Return: VDO_SUCCESS or an error code.
861  */
862 static int __must_check acquire_lock(struct hash_zone *zone,
863 				     const struct uds_record_name *hash,
864 				     struct hash_lock *replace_lock,
865 				     struct hash_lock **lock_ptr)
866 {
867 	struct hash_lock *lock, *new_lock;
868 	int result;
869 
870 	/*
871 	 * Borrow and prepare a lock from the pool so we don't have to do two int_map accesses
872 	 * in the common case of no lock contention.
873 	 */
874 	result = VDO_ASSERT(!list_empty(&zone->lock_pool),
875 			    "never need to wait for a free hash lock");
876 	if (result != VDO_SUCCESS)
877 		return result;
878 
879 	new_lock = list_entry(zone->lock_pool.prev, struct hash_lock, pool_node);
880 	list_del_init(&new_lock->pool_node);
881 
882 	/*
883 	 * Fill in the hash of the new lock so we can map it, since we have to use the hash as the
884 	 * map key.
885 	 */
886 	new_lock->hash = *hash;
887 
888 	result = vdo_int_map_put(zone->hash_lock_map, hash_lock_key(new_lock),
889 				 new_lock, (replace_lock != NULL), (void **) &lock);
890 	if (result != VDO_SUCCESS) {
891 		return_hash_lock_to_pool(zone, vdo_forget(new_lock));
892 		return result;
893 	}
894 
895 	if (replace_lock != NULL) {
896 		/* On mismatch put the old lock back and return a severe error */
897 		VDO_ASSERT_LOG_ONLY(lock == replace_lock,
898 				    "old lock must have been in the lock map");
899 		/* TODO: Check earlier and bail out? */
900 		VDO_ASSERT_LOG_ONLY(replace_lock->registered,
901 				    "old lock must have been marked registered");
902 		replace_lock->registered = false;
903 	}
904 
905 	if (lock == replace_lock) {
906 		lock = new_lock;
907 		lock->registered = true;
908 	} else {
909 		/* There's already a lock for the hash, so we don't need the borrowed lock. */
910 		return_hash_lock_to_pool(zone, vdo_forget(new_lock));
911 	}
912 
913 	*lock_ptr = lock;
914 	return VDO_SUCCESS;
915 }
916 
/**
 * enter_forked_lock() - Move a waiting data_vio over to a new hash lock.
 * @waiter: The waiter of the data_vio that was waiting on the old lock.
 * @context: The new hash lock, passed as an untyped pointer.
 *
 * Implements waiter_callback_fn. The data_vio is bound to the new hash lock and then made to
 * wait on it.
 */
static void enter_forked_lock(struct vdo_waiter *waiter, void *context)
{
	struct hash_lock *forked_lock = context;
	struct data_vio *waiting_vio = vdo_waiter_as_data_vio(waiter);

	set_hash_lock(waiting_vio, forked_lock);
	wait_on_hash_lock(forked_lock, waiting_vio);
}
931 
932 /**
933  * fork_hash_lock() - Fork a hash lock because it has run out of increments on the duplicate PBN.
934  * @old_lock: The hash lock to fork.
935  * @new_agent: The data_vio that will be the agent for the new lock.
936  *
937  * Transfers the new agent and any lock waiters to a new hash lock instance which takes the place
938  * of the old lock in the lock map. The old lock remains active, but will not update advice.
939  */
940 static void fork_hash_lock(struct hash_lock *old_lock, struct data_vio *new_agent)
941 {
942 	struct hash_lock *new_lock;
943 	int result;
944 
945 	result = acquire_lock(new_agent->hash_zone, &new_agent->record_name, old_lock,
946 			      &new_lock);
947 	if (result != VDO_SUCCESS) {
948 		continue_data_vio_with_error(new_agent, result);
949 		return;
950 	}
951 
952 	/*
953 	 * Only one of the two locks should update UDS. The old lock is out of references, so it
954 	 * would be poor dedupe advice in the short term.
955 	 */
956 	old_lock->update_advice = false;
957 	new_lock->update_advice = true;
958 
959 	set_hash_lock(new_agent, new_lock);
960 	new_lock->agent = new_agent;
961 
962 	vdo_waitq_notify_all_waiters(&old_lock->waiters, enter_forked_lock, new_lock);
963 
964 	new_agent->is_duplicate = false;
965 	start_writing(new_lock, new_agent);
966 }
967 
968 /**
969  * launch_dedupe() - Reserve a reference count increment for a data_vio and launch it on the dedupe
970  *                   path.
971  * @lock: The hash lock.
972  * @data_vio: The data_vio to deduplicate using the hash lock.
973  * @has_claim: true if the data_vio already has claimed an increment from the duplicate lock.
974  *
975  * If no increments are available, this will roll over to a new hash lock and launch the data_vio
976  * as the writing agent for that lock.
977  */
978 static void launch_dedupe(struct hash_lock *lock, struct data_vio *data_vio,
979 			  bool has_claim)
980 {
981 	if (!has_claim && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
982 		/* Out of increments, so must roll over to a new lock. */
983 		fork_hash_lock(lock, data_vio);
984 		return;
985 	}
986 
987 	/* Deduplicate against the lock's verified location. */
988 	set_duplicate_location(data_vio, lock->duplicate);
989 	data_vio->new_mapped = data_vio->duplicate;
990 	update_metadata_for_data_vio_write(data_vio, lock->duplicate_lock);
991 }
992 
993 /**
994  * start_deduping() - Enter the hash lock state where data_vios deduplicate in parallel against a
995  *                    true copy of their data on disk.
996  * @lock: The hash lock.
997  * @agent: The data_vio acting as the agent for the lock.
998  * @agent_is_done: true only if the agent has already written or deduplicated against its data.
999  *
1000  * If the agent itself needs to deduplicate, an increment for it must already have been claimed
1001  * from the duplicate lock, ensuring the hash lock will still have a data_vio holding it.
1002  */
1003 static void start_deduping(struct hash_lock *lock, struct data_vio *agent,
1004 			   bool agent_is_done)
1005 {
1006 	lock->state = VDO_HASH_LOCK_DEDUPING;
1007 
1008 	/*
1009 	 * We don't take the downgraded allocation lock from the agent unless we actually need to
1010 	 * deduplicate against it.
1011 	 */
1012 	if (lock->duplicate_lock == NULL) {
1013 		VDO_ASSERT_LOG_ONLY(!vdo_is_state_compressed(agent->new_mapped.state),
1014 				    "compression must have shared a lock");
1015 		VDO_ASSERT_LOG_ONLY(agent_is_done,
1016 				    "agent must have written the new duplicate");
1017 		transfer_allocation_lock(agent);
1018 	}
1019 
1020 	VDO_ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(lock->duplicate_lock),
1021 			    "duplicate_lock must be a PBN read lock");
1022 
1023 	/*
1024 	 * This state is not like any of the other states. There is no designated agent--the agent
1025 	 * transitioning to this state and all the waiters will be launched to deduplicate in
1026 	 * parallel.
1027 	 */
1028 	lock->agent = NULL;
1029 
1030 	/*
1031 	 * Launch the agent (if not already deduplicated) and as many lock waiters as we have
1032 	 * available increments for on the dedupe path. If we run out of increments, rollover will
1033 	 * be triggered and the remaining waiters will be transferred to the new lock.
1034 	 */
1035 	if (!agent_is_done) {
1036 		launch_dedupe(lock, agent, true);
1037 		agent = NULL;
1038 	}
1039 	while (vdo_waitq_has_waiters(&lock->waiters))
1040 		launch_dedupe(lock, dequeue_lock_waiter(lock), false);
1041 
1042 	if (agent_is_done) {
1043 		/*
1044 		 * In the degenerate case where all the waiters rolled over to a new lock, this
1045 		 * will continue to use the old agent to clean up this lock, and otherwise it just
1046 		 * lets the agent exit the lock.
1047 		 */
1048 		finish_deduping(lock, agent);
1049 	}
1050 }
1051 
1052 /**
1053  * increment_stat() - Increment a statistic counter in a non-atomic yet thread-safe manner.
1054  * @stat: The statistic field to increment.
1055  */
1056 static inline void increment_stat(u64 *stat)
1057 {
1058 	/*
1059 	 * Must only be mutated on the hash zone thread. Prevents any compiler shenanigans from
1060 	 * affecting other threads reading stats.
1061 	 */
1062 	WRITE_ONCE(*stat, *stat + 1);
1063 }
1064 
1065 /**
1066  * finish_verifying() - Handle the result of the agent for the lock comparing its data to the
1067  *                      duplicate candidate.
1068  * @completion: The completion of the data_vio used to verify dedupe
1069  *
1070  * This continuation is registered in start_verifying().
1071  */
1072 static void finish_verifying(struct vdo_completion *completion)
1073 {
1074 	struct data_vio *agent = as_data_vio(completion);
1075 	struct hash_lock *lock = agent->hash_lock;
1076 
1077 	assert_hash_lock_agent(agent, __func__);
1078 
1079 	lock->verified = agent->is_duplicate;
1080 
1081 	/*
1082 	 * Only count the result of the initial verification of the advice as valid or stale, and
1083 	 * not any re-verifications due to PBN lock releases.
1084 	 */
1085 	if (!lock->verify_counted) {
1086 		lock->verify_counted = true;
1087 		if (lock->verified)
1088 			increment_stat(&agent->hash_zone->statistics.dedupe_advice_valid);
1089 		else
1090 			increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale);
1091 	}
1092 
1093 	/*
1094 	 * Even if the block is a verified duplicate, we can't start to deduplicate unless we can
1095 	 * claim a reference count increment for the agent.
1096 	 */
1097 	if (lock->verified && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
1098 		agent->is_duplicate = false;
1099 		lock->verified = false;
1100 	}
1101 
1102 	if (lock->verified) {
1103 		/*
1104 		 * VERIFYING -> DEDUPING transition: The advice is for a true duplicate, so start
1105 		 * deduplicating against it, if references are available.
1106 		 */
1107 		start_deduping(lock, agent, false);
1108 	} else {
1109 		/*
1110 		 * VERIFYING -> UNLOCKING transition: Either the verify failed or we'd try to
1111 		 * dedupe and roll over immediately, which would fail because it would leave the
1112 		 * lock without an agent to release the PBN lock. In both cases, the data will have
1113 		 * to be written or compressed, but first the advice PBN must be unlocked by the
1114 		 * VERIFYING agent.
1115 		 */
1116 		lock->update_advice = true;
1117 		start_unlocking(lock, agent);
1118 	}
1119 }
1120 
1121 static bool blocks_equal(char *block1, char *block2)
1122 {
1123 	int i;
1124 
1125 	for (i = 0; i < VDO_BLOCK_SIZE; i += sizeof(u64)) {
1126 		if (*((u64 *) &block1[i]) != *((u64 *) &block2[i]))
1127 			return false;
1128 	}
1129 
1130 	return true;
1131 }
1132 
1133 static void verify_callback(struct vdo_completion *completion)
1134 {
1135 	struct data_vio *agent = as_data_vio(completion);
1136 
1137 	agent->is_duplicate = blocks_equal(agent->vio.data, agent->scratch_block);
1138 	launch_data_vio_hash_zone_callback(agent, finish_verifying);
1139 }
1140 
1141 static void uncompress_and_verify(struct vdo_completion *completion)
1142 {
1143 	struct data_vio *agent = as_data_vio(completion);
1144 	int result;
1145 
1146 	result = uncompress_data_vio(agent, agent->duplicate.state,
1147 				     agent->scratch_block);
1148 	if (result == VDO_SUCCESS) {
1149 		verify_callback(completion);
1150 		return;
1151 	}
1152 
1153 	agent->is_duplicate = false;
1154 	launch_data_vio_hash_zone_callback(agent, finish_verifying);
1155 }
1156 
1157 static void verify_endio(struct bio *bio)
1158 {
1159 	struct data_vio *agent = vio_as_data_vio(bio->bi_private);
1160 	int result = blk_status_to_errno(bio->bi_status);
1161 
1162 	vdo_count_completed_bios(bio);
1163 	if (result != VDO_SUCCESS) {
1164 		agent->is_duplicate = false;
1165 		launch_data_vio_hash_zone_callback(agent, finish_verifying);
1166 		return;
1167 	}
1168 
1169 	if (vdo_is_state_compressed(agent->duplicate.state)) {
1170 		launch_data_vio_cpu_callback(agent, uncompress_and_verify,
1171 					     CPU_Q_COMPRESS_BLOCK_PRIORITY);
1172 		return;
1173 	}
1174 
1175 	launch_data_vio_cpu_callback(agent, verify_callback,
1176 				     CPU_Q_COMPLETE_READ_PRIORITY);
1177 }
1178 
1179 /**
1180  * start_verifying() - Begin the data verification phase.
1181  * @lock: The hash lock (must be LOCKING).
1182  * @agent: The data_vio to use to read and compare candidate data.
1183  *
1184  * Continue the deduplication path for a hash lock by using the agent to read (and possibly
1185  * decompress) the data at the candidate duplicate location, comparing it to the data in the agent
1186  * to verify that the candidate is identical to all the data_vios sharing the hash. If so, it can
1187  * be deduplicated against, otherwise a data_vio allocation will have to be written to and used for
1188  * dedupe.
1189  */
1190 static void start_verifying(struct hash_lock *lock, struct data_vio *agent)
1191 {
1192 	int result;
1193 	struct vio *vio = &agent->vio;
1194 	char *buffer = (vdo_is_state_compressed(agent->duplicate.state) ?
1195 			(char *) agent->compression.block :
1196 			agent->scratch_block);
1197 
1198 	lock->state = VDO_HASH_LOCK_VERIFYING;
1199 	VDO_ASSERT_LOG_ONLY(!lock->verified, "hash lock only verifies advice once");
1200 
1201 	agent->last_async_operation = VIO_ASYNC_OP_VERIFY_DUPLICATION;
1202 	result = vio_reset_bio(vio, buffer, verify_endio, REQ_OP_READ,
1203 			       agent->duplicate.pbn);
1204 	if (result != VDO_SUCCESS) {
1205 		set_data_vio_hash_zone_callback(agent, finish_verifying);
1206 		continue_data_vio_with_error(agent, result);
1207 		return;
1208 	}
1209 
1210 	set_data_vio_bio_zone_callback(agent, vdo_submit_vio);
1211 	vdo_launch_completion_with_priority(&vio->completion, BIO_Q_VERIFY_PRIORITY);
1212 }
1213 
1214 /**
1215  * finish_locking() - Handle the result of the agent for the lock attempting to obtain a PBN read
1216  *                    lock on the candidate duplicate block.
1217  * @completion: The completion of the data_vio that attempted to get the read lock.
1218  *
1219  * This continuation is registered in lock_duplicate_pbn().
1220  */
1221 static void finish_locking(struct vdo_completion *completion)
1222 {
1223 	struct data_vio *agent = as_data_vio(completion);
1224 	struct hash_lock *lock = agent->hash_lock;
1225 
1226 	assert_hash_lock_agent(agent, __func__);
1227 
1228 	if (!agent->is_duplicate) {
1229 		VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
1230 				    "must not hold duplicate_lock if not flagged as a duplicate");
1231 		/*
1232 		 * LOCKING -> WRITING transition: The advice block is being modified or has no
1233 		 * available references, so try to write or compress the data, remembering to
1234 		 * update UDS later with the new advice.
1235 		 */
1236 		increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale);
1237 		lock->update_advice = true;
1238 		start_writing(lock, agent);
1239 		return;
1240 	}
1241 
1242 	VDO_ASSERT_LOG_ONLY(lock->duplicate_lock != NULL,
1243 			    "must hold duplicate_lock if flagged as a duplicate");
1244 
1245 	if (!lock->verified) {
1246 		/*
1247 		 * LOCKING -> VERIFYING transition: Continue on the unverified dedupe path, reading
1248 		 * the candidate duplicate and comparing it to the agent's data to decide whether
1249 		 * it is a true duplicate or stale advice.
1250 		 */
1251 		start_verifying(lock, agent);
1252 		return;
1253 	}
1254 
1255 	if (!vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
1256 		/*
1257 		 * LOCKING -> UNLOCKING transition: The verified block was re-locked, but has no
1258 		 * available increments left. Must first release the useless PBN read lock before
1259 		 * rolling over to a new copy of the block.
1260 		 */
1261 		agent->is_duplicate = false;
1262 		lock->verified = false;
1263 		lock->update_advice = true;
1264 		start_unlocking(lock, agent);
1265 		return;
1266 	}
1267 
1268 	/*
1269 	 * LOCKING -> DEDUPING transition: Continue on the verified dedupe path, deduplicating
1270 	 * against a location that was previously verified or written to.
1271 	 */
1272 	start_deduping(lock, agent, false);
1273 }
1274 
1275 static bool acquire_provisional_reference(struct data_vio *agent, struct pbn_lock *lock,
1276 					  struct slab_depot *depot)
1277 {
1278 	/* Ensure that the newly-locked block is referenced. */
1279 	struct vdo_slab *slab = vdo_get_slab(depot, agent->duplicate.pbn);
1280 	int result = vdo_acquire_provisional_reference(slab, agent->duplicate.pbn, lock);
1281 
1282 	if (result == VDO_SUCCESS)
1283 		return true;
1284 
1285 	vdo_log_warning_strerror(result,
1286 				 "Error acquiring provisional reference for dedupe candidate; aborting dedupe");
1287 	agent->is_duplicate = false;
1288 	vdo_release_physical_zone_pbn_lock(agent->duplicate.zone,
1289 					   agent->duplicate.pbn, lock);
1290 	continue_data_vio_with_error(agent, result);
1291 	return false;
1292 }
1293 
1294 /**
1295  * lock_duplicate_pbn() - Acquire a read lock on the PBN of the block containing candidate
1296  *                        duplicate data (compressed or uncompressed).
1297  * @completion: The completion of the data_vio attempting to acquire the physical block lock on
1298  *              behalf of its hash lock.
1299  *
1300  * If the PBN is already locked for writing, the lock attempt is abandoned and is_duplicate will be
1301  * cleared before calling back. This continuation is launched from start_locking(), and calls back
1302  * to finish_locking() on the hash zone thread.
1303  */
1304 static void lock_duplicate_pbn(struct vdo_completion *completion)
1305 {
1306 	unsigned int increment_limit;
1307 	struct pbn_lock *lock;
1308 	int result;
1309 
1310 	struct data_vio *agent = as_data_vio(completion);
1311 	struct slab_depot *depot = vdo_from_data_vio(agent)->depot;
1312 	struct physical_zone *zone = agent->duplicate.zone;
1313 
1314 	assert_data_vio_in_duplicate_zone(agent);
1315 
1316 	set_data_vio_hash_zone_callback(agent, finish_locking);
1317 
1318 	/*
1319 	 * While in the zone that owns it, find out how many additional references can be made to
1320 	 * the block if it turns out to truly be a duplicate.
1321 	 */
1322 	increment_limit = vdo_get_increment_limit(depot, agent->duplicate.pbn);
1323 	if (increment_limit == 0) {
1324 		/*
1325 		 * We could deduplicate against it later if a reference happened to be released
1326 		 * during verification, but it's probably better to bail out now.
1327 		 */
1328 		agent->is_duplicate = false;
1329 		continue_data_vio(agent);
1330 		return;
1331 	}
1332 
1333 	result = vdo_attempt_physical_zone_pbn_lock(zone, agent->duplicate.pbn,
1334 						    VIO_READ_LOCK, &lock);
1335 	if (result != VDO_SUCCESS) {
1336 		continue_data_vio_with_error(agent, result);
1337 		return;
1338 	}
1339 
1340 	if (!vdo_is_pbn_read_lock(lock)) {
1341 		/*
1342 		 * There are three cases of write locks: uncompressed data block writes, compressed
1343 		 * (packed) block writes, and block map page writes. In all three cases, we give up
1344 		 * on trying to verify the advice and don't bother to try deduplicate against the
1345 		 * data in the write lock holder.
1346 		 *
1347 		 * 1) We don't ever want to try to deduplicate against a block map page.
1348 		 *
1349 		 * 2a) It's very unlikely we'd deduplicate against an entire packed block, both
1350 		 * because of the chance of matching it, and because we don't record advice for it,
1351 		 * but for the uncompressed representation of all the fragments it contains. The
1352 		 * only way we'd be getting lock contention is if we've written the same
1353 		 * representation coincidentally before, had it become unreferenced, and it just
1354 		 * happened to be packed together from compressed writes when we go to verify the
1355 		 * lucky advice. Giving up is a minuscule loss of potential dedupe.
1356 		 *
1357 		 * 2b) If the advice is for a slot of a compressed block, it's about to get
1358 		 * smashed, and the write smashing it cannot contain our data--it would have to be
1359 		 * writing on behalf of our hash lock, but that's impossible since we're the lock
1360 		 * agent.
1361 		 *
1362 		 * 3a) If the lock is held by a data_vio with different data, the advice is already
1363 		 * stale or is about to become stale.
1364 		 *
1365 		 * 3b) If the lock is held by a data_vio that matches us, we may as well either
1366 		 * write it ourselves (or reference the copy we already wrote) instead of
1367 		 * potentially having many duplicates wait for the lock holder to write, journal,
1368 		 * hash, and finally arrive in the hash lock. We lose a chance to avoid a UDS
1369 		 * update in the very rare case of advice for a free block that just happened to be
1370 		 * allocated to a data_vio with the same hash. There's also a chance to save on a
1371 		 * block write, at the cost of a block verify. Saving on a full block compare in
1372 		 * all stale advice cases almost certainly outweighs saving a UDS update and
1373 		 * trading a write for a read in a lucky case where advice would have been saved
1374 		 * from becoming stale.
1375 		 */
1376 		agent->is_duplicate = false;
1377 		continue_data_vio(agent);
1378 		return;
1379 	}
1380 
1381 	if (lock->holder_count == 0) {
1382 		if (!acquire_provisional_reference(agent, lock, depot))
1383 			return;
1384 
1385 		/*
1386 		 * The increment limit we grabbed earlier is still valid. The lock now holds the
1387 		 * rights to acquire all those references. Those rights will be claimed by hash
1388 		 * locks sharing this read lock.
1389 		 */
1390 		lock->increment_limit = increment_limit;
1391 	}
1392 
1393 	/*
1394 	 * We've successfully acquired a read lock on behalf of the hash lock, so mark it as such.
1395 	 */
1396 	set_duplicate_lock(agent->hash_lock, lock);
1397 
1398 	/*
1399 	 * TODO: Optimization: We could directly launch the block verify, then switch to a hash
1400 	 * thread.
1401 	 */
1402 	continue_data_vio(agent);
1403 }
1404 
1405 /**
1406  * start_locking() - Continue deduplication for a hash lock that has obtained valid advice of a
1407  *                   potential duplicate through its agent.
1408  * @lock: The hash lock (currently must be QUERYING).
1409  * @agent: The data_vio bearing the dedupe advice.
1410  */
1411 static void start_locking(struct hash_lock *lock, struct data_vio *agent)
1412 {
1413 	VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
1414 			    "must not acquire a duplicate lock when already holding it");
1415 
1416 	lock->state = VDO_HASH_LOCK_LOCKING;
1417 
1418 	/*
1419 	 * TODO: Optimization: If we arrange to continue on the duplicate zone thread when
1420 	 * accepting the advice, and don't explicitly change lock states (or use an agent-local
1421 	 * state, or an atomic), we can avoid a thread transition here.
1422 	 */
1423 	agent->last_async_operation = VIO_ASYNC_OP_LOCK_DUPLICATE_PBN;
1424 	launch_data_vio_duplicate_zone_callback(agent, lock_duplicate_pbn);
1425 }
1426 
1427 /**
1428  * finish_writing() - Re-entry point for the lock agent after it has finished writing or
1429  *                    compressing its copy of the data block.
1430  * @lock: The hash lock, which must be in state WRITING.
1431  * @agent: The data_vio that wrote its data for the lock.
1432  *
1433  * The agent will never need to dedupe against anything, so it's done with the lock, but the lock
1434  * may not be finished with it, as a UDS update might still be needed.
1435  *
1436  * If there are other lock holders, the agent will hand the job to one of them and exit, leaving
1437  * the lock to deduplicate against the just-written block. If there are no other lock holders, the
1438  * agent either exits (and later tears down the hash lock), or it remains the agent and updates
1439  * UDS.
1440  */
1441 static void finish_writing(struct hash_lock *lock, struct data_vio *agent)
1442 {
1443 	/*
1444 	 * Dedupe against the data block or compressed block slot the agent wrote. Since we know
1445 	 * the write succeeded, there's no need to verify it.
1446 	 */
1447 	lock->duplicate = agent->new_mapped;
1448 	lock->verified = true;
1449 
1450 	if (vdo_is_state_compressed(lock->duplicate.state) && lock->registered) {
1451 		/*
1452 		 * Compression means the location we gave in the UDS query is not the location
1453 		 * we're using to deduplicate.
1454 		 */
1455 		lock->update_advice = true;
1456 	}
1457 
1458 	/* If there are any waiters, we need to start deduping them. */
1459 	if (vdo_waitq_has_waiters(&lock->waiters)) {
1460 		/*
1461 		 * WRITING -> DEDUPING transition: an asynchronously-written block failed to
1462 		 * compress, so the PBN lock on the written copy was already transferred. The agent
1463 		 * is done with the lock, but the lock may still need to use it to clean up after
1464 		 * rollover.
1465 		 */
1466 		start_deduping(lock, agent, true);
1467 		return;
1468 	}
1469 
1470 	/*
1471 	 * There are no waiters and the agent has successfully written, so take a step towards
1472 	 * being able to release the hash lock (or just release it).
1473 	 */
1474 	if (lock->update_advice) {
1475 		/*
1476 		 * WRITING -> UPDATING transition: There's no waiter and a UDS update is needed, so
1477 		 * retain the WRITING agent and use it to launch the update. The happens on
1478 		 * compression, rollover, or the QUERYING agent not having an allocation.
1479 		 */
1480 		start_updating(lock, agent);
1481 	} else if (lock->duplicate_lock != NULL) {
1482 		/*
1483 		 * WRITING -> UNLOCKING transition: There's no waiter and no update needed, but the
1484 		 * compressed write gave us a shared duplicate lock that we must release.
1485 		 */
1486 		set_duplicate_location(agent, lock->duplicate);
1487 		start_unlocking(lock, agent);
1488 	} else {
1489 		/*
1490 		 * WRITING -> BYPASSING transition: There's no waiter, no update needed, and no
1491 		 * duplicate lock held, so both the agent and lock have no more work to do. The
1492 		 * agent will release its allocation lock in cleanup.
1493 		 */
1494 		start_bypassing(lock, agent);
1495 	}
1496 }
1497 
1498 /**
1499  * select_writing_agent() - Search through the lock waiters for a data_vio that has an allocation.
1500  * @lock: The hash lock to modify.
1501  *
1502  * If an allocation is found, swap agents, put the old agent at the head of the wait queue, then
1503  * return the new agent. Otherwise, just return the current agent.
1504  */
1505 static struct data_vio *select_writing_agent(struct hash_lock *lock)
1506 {
1507 	struct vdo_wait_queue temp_queue;
1508 	struct data_vio *data_vio;
1509 
1510 	vdo_waitq_init(&temp_queue);
1511 
1512 	/*
1513 	 * Move waiters to the temp queue one-by-one until we find an allocation. Not ideal to
1514 	 * search, but it only happens when nearly out of space.
1515 	 */
1516 	while (((data_vio = dequeue_lock_waiter(lock)) != NULL) &&
1517 	       !data_vio_has_allocation(data_vio)) {
1518 		/* Use the lower-level enqueue since we're just moving waiters around. */
1519 		vdo_waitq_enqueue_waiter(&temp_queue, &data_vio->waiter);
1520 	}
1521 
1522 	if (data_vio != NULL) {
1523 		/*
1524 		 * Move the rest of the waiters over to the temp queue, preserving the order they
1525 		 * arrived at the lock.
1526 		 */
1527 		vdo_waitq_transfer_all_waiters(&lock->waiters, &temp_queue);
1528 
1529 		/*
1530 		 * The current agent is being replaced and will have to wait to dedupe; make it the
1531 		 * first waiter since it was the first to reach the lock.
1532 		 */
1533 		vdo_waitq_enqueue_waiter(&lock->waiters, &lock->agent->waiter);
1534 		lock->agent = data_vio;
1535 	} else {
1536 		/* No one has an allocation, so keep the current agent. */
1537 		data_vio = lock->agent;
1538 	}
1539 
1540 	/* Swap all the waiters back onto the lock's queue. */
1541 	vdo_waitq_transfer_all_waiters(&temp_queue, &lock->waiters);
1542 	return data_vio;
1543 }
1544 
1545 /**
1546  * start_writing() - Begin the non-duplicate write path.
1547  * @lock: The hash lock (currently must be QUERYING).
1548  * @agent: The data_vio currently acting as the agent for the lock.
1549  *
1550  * Begins the non-duplicate write path for a hash lock that had no advice, selecting a data_vio
1551  * with an allocation as a new agent, if necessary, then resuming the agent on the data_vio write
1552  * path.
1553  */
1554 static void start_writing(struct hash_lock *lock, struct data_vio *agent)
1555 {
1556 	lock->state = VDO_HASH_LOCK_WRITING;
1557 
1558 	/*
1559 	 * The agent might not have received an allocation and so can't be used for writing, but
1560 	 * it's entirely possible that one of the waiters did.
1561 	 */
1562 	if (!data_vio_has_allocation(agent)) {
1563 		agent = select_writing_agent(lock);
1564 		/* If none of the waiters had an allocation, the writes all have to fail. */
1565 		if (!data_vio_has_allocation(agent)) {
1566 			/*
1567 			 * TODO: Should we keep a variant of BYPASSING that causes new arrivals to
1568 			 * fail immediately if they don't have an allocation? It might be possible
1569 			 * that on some path there would be non-waiters still referencing the lock,
1570 			 * so it would remain in the map as everything is currently spelled, even
1571 			 * if the agent and all waiters release.
1572 			 */
1573 			continue_data_vio_with_error(agent, VDO_NO_SPACE);
1574 			return;
1575 		}
1576 	}
1577 
1578 	/*
1579 	 * If the agent compresses, it might wait indefinitely in the packer, which would be bad if
1580 	 * there are any other data_vios waiting.
1581 	 */
1582 	if (vdo_waitq_has_waiters(&lock->waiters))
1583 		cancel_data_vio_compression(agent);
1584 
1585 	/*
1586 	 * Send the agent to the compress/pack/write path in vioWrite. If it succeeds, it will
1587 	 * return to the hash lock via vdo_continue_hash_lock() and call finish_writing().
1588 	 */
1589 	launch_compress_data_vio(agent);
1590 }
1591 
1592 /*
1593  * Decode VDO duplicate advice from the old_metadata field of a UDS request.
1594  * Returns true if valid advice was found and decoded
1595  */
1596 static bool decode_uds_advice(struct dedupe_context *context)
1597 {
1598 	const struct uds_request *request = &context->request;
1599 	struct data_vio *data_vio = context->requestor;
1600 	size_t offset = 0;
1601 	const struct uds_record_data *encoding = &request->old_metadata;
1602 	struct vdo *vdo = vdo_from_data_vio(data_vio);
1603 	struct zoned_pbn *advice = &data_vio->duplicate;
1604 	u8 version;
1605 	int result;
1606 
1607 	if ((request->status != UDS_SUCCESS) || !request->found)
1608 		return false;
1609 
1610 	version = encoding->data[offset++];
1611 	if (version != UDS_ADVICE_VERSION) {
1612 		vdo_log_error("invalid UDS advice version code %u", version);
1613 		return false;
1614 	}
1615 
1616 	advice->state = encoding->data[offset++];
1617 	advice->pbn = get_unaligned_le64(&encoding->data[offset]);
1618 	offset += sizeof(u64);
1619 	BUG_ON(offset != UDS_ADVICE_SIZE);
1620 
1621 	/* Don't use advice that's clearly meaningless. */
1622 	if ((advice->state == VDO_MAPPING_STATE_UNMAPPED) || (advice->pbn == VDO_ZERO_BLOCK)) {
1623 		vdo_log_debug("Invalid advice from deduplication server: pbn %llu, state %u. Giving up on deduplication of logical block %llu",
1624 			      (unsigned long long) advice->pbn, advice->state,
1625 			      (unsigned long long) data_vio->logical.lbn);
1626 		atomic64_inc(&vdo->stats.invalid_advice_pbn_count);
1627 		return false;
1628 	}
1629 
1630 	result = vdo_get_physical_zone(vdo, advice->pbn, &advice->zone);
1631 	if ((result != VDO_SUCCESS) || (advice->zone == NULL)) {
1632 		vdo_log_debug("Invalid physical block number from deduplication server: %llu, giving up on deduplication of logical block %llu",
1633 			      (unsigned long long) advice->pbn,
1634 			      (unsigned long long) data_vio->logical.lbn);
1635 		atomic64_inc(&vdo->stats.invalid_advice_pbn_count);
1636 		return false;
1637 	}
1638 
1639 	return true;
1640 }
1641 
1642 static void process_query_result(struct data_vio *agent)
1643 {
1644 	struct dedupe_context *context = agent->dedupe_context;
1645 
1646 	if (context == NULL)
1647 		return;
1648 
1649 	if (change_context_state(context, DEDUPE_CONTEXT_COMPLETE, DEDUPE_CONTEXT_IDLE)) {
1650 		agent->is_duplicate = decode_uds_advice(context);
1651 		release_context(context);
1652 	}
1653 }
1654 
1655 /**
1656  * finish_querying() - Process the result of a UDS query performed by the agent for the lock.
1657  * @completion: The completion of the data_vio that performed the query.
1658  *
1659  * This continuation is registered in start_querying().
1660  */
1661 static void finish_querying(struct vdo_completion *completion)
1662 {
1663 	struct data_vio *agent = as_data_vio(completion);
1664 	struct hash_lock *lock = agent->hash_lock;
1665 
1666 	assert_hash_lock_agent(agent, __func__);
1667 
1668 	process_query_result(agent);
1669 
1670 	if (agent->is_duplicate) {
1671 		lock->duplicate = agent->duplicate;
1672 		/*
1673 		 * QUERYING -> LOCKING transition: Valid advice was obtained from UDS. Use the
1674 		 * QUERYING agent to start the hash lock on the unverified dedupe path, verifying
1675 		 * that the advice can be used.
1676 		 */
1677 		start_locking(lock, agent);
1678 	} else {
1679 		/*
1680 		 * The agent will be used as the duplicate if has an allocation; if it does, that
1681 		 * location was posted to UDS, so no update will be needed.
1682 		 */
1683 		lock->update_advice = !data_vio_has_allocation(agent);
1684 		/*
1685 		 * QUERYING -> WRITING transition: There was no advice or the advice wasn't valid,
1686 		 * so try to write or compress the data.
1687 		 */
1688 		start_writing(lock, agent);
1689 	}
1690 }
1691 
1692 /**
1693  * start_querying() - Start deduplication for a hash lock.
1694  * @lock: The initialized hash lock.
1695  * @data_vio: The data_vio that has just obtained the new lock.
1696  *
1697  * Starts deduplication for a hash lock that has finished initializing by making the data_vio that
1698  * requested it the agent, entering the QUERYING state, and using the agent to perform the UDS
1699  * query on behalf of the lock.
1700  */
1701 static void start_querying(struct hash_lock *lock, struct data_vio *data_vio)
1702 {
1703 	lock->agent = data_vio;
1704 	lock->state = VDO_HASH_LOCK_QUERYING;
1705 	data_vio->last_async_operation = VIO_ASYNC_OP_CHECK_FOR_DUPLICATION;
1706 	set_data_vio_hash_zone_callback(data_vio, finish_querying);
1707 	query_index(data_vio,
1708 		    (data_vio_has_allocation(data_vio) ? UDS_POST : UDS_QUERY));
1709 }
1710 
1711 /**
1712  * report_bogus_lock_state() - Complain that a data_vio has entered a hash_lock that is in an
1713  *                             unimplemented or unusable state and continue the data_vio with an
1714  *                             error.
1715  * @lock: The hash lock.
1716  * @data_vio: The data_vio attempting to enter the lock.
1717  */
1718 static void report_bogus_lock_state(struct hash_lock *lock, struct data_vio *data_vio)
1719 {
1720 	VDO_ASSERT_LOG_ONLY(false, "hash lock must not be in unimplemented state %s",
1721 			    get_hash_lock_state_name(lock->state));
1722 	continue_data_vio_with_error(data_vio, VDO_LOCK_ERROR);
1723 }
1724 
1725 /**
1726  * vdo_continue_hash_lock() - Continue the processing state after writing, compressing, or
1727  *                            deduplicating.
1728  * @data_vio: The data_vio to continue processing in its hash lock.
1729  *
1730  * Asynchronously continue processing a data_vio in its hash lock after it has finished writing,
1731  * compressing, or deduplicating, so it can share the result with any data_vios waiting in the hash
1732  * lock, or update the UDS index, or simply release its share of the lock.
1733  *
1734  * Context: This must only be called in the correct thread for the hash zone.
1735  */
1736 void vdo_continue_hash_lock(struct vdo_completion *completion)
1737 {
1738 	struct data_vio *data_vio = as_data_vio(completion);
1739 	struct hash_lock *lock = data_vio->hash_lock;
1740 
1741 	switch (lock->state) {
1742 	case VDO_HASH_LOCK_WRITING:
1743 		VDO_ASSERT_LOG_ONLY(data_vio == lock->agent,
1744 				    "only the lock agent may continue the lock");
1745 		finish_writing(lock, data_vio);
1746 		break;
1747 
1748 	case VDO_HASH_LOCK_DEDUPING:
1749 		finish_deduping(lock, data_vio);
1750 		break;
1751 
1752 	case VDO_HASH_LOCK_BYPASSING:
1753 		/* This data_vio has finished the write path and the lock doesn't need it. */
1754 		exit_hash_lock(data_vio);
1755 		break;
1756 
1757 	case VDO_HASH_LOCK_INITIALIZING:
1758 	case VDO_HASH_LOCK_QUERYING:
1759 	case VDO_HASH_LOCK_UPDATING:
1760 	case VDO_HASH_LOCK_LOCKING:
1761 	case VDO_HASH_LOCK_VERIFYING:
1762 	case VDO_HASH_LOCK_UNLOCKING:
1763 		/* A lock in this state should never be re-entered. */
1764 		report_bogus_lock_state(lock, data_vio);
1765 		break;
1766 
1767 	default:
1768 		report_bogus_lock_state(lock, data_vio);
1769 	}
1770 }
1771 
1772 /**
1773  * is_hash_collision() - Check to see if a hash collision has occurred.
1774  * @lock: The lock to check.
1775  * @candidate: The data_vio seeking to share the lock.
1776  *
1777  * Check whether the data in data_vios sharing a lock is different than in a data_vio seeking to
1778  * share the lock, which should only be possible in the extremely unlikely case of a hash
1779  * collision.
1780  *
1781  * Return: true if the given data_vio must not share the lock because it doesn't have the same data
1782  *         as the lock holders.
1783  */
1784 static bool is_hash_collision(struct hash_lock *lock, struct data_vio *candidate)
1785 {
1786 	struct data_vio *lock_holder;
1787 	struct hash_zone *zone;
1788 	bool collides;
1789 
1790 	if (list_empty(&lock->duplicate_ring))
1791 		return false;
1792 
1793 	lock_holder = list_first_entry(&lock->duplicate_ring, struct data_vio,
1794 				       hash_lock_entry);
1795 	zone = candidate->hash_zone;
1796 	collides = !blocks_equal(lock_holder->vio.data, candidate->vio.data);
1797 	if (collides)
1798 		increment_stat(&zone->statistics.concurrent_hash_collisions);
1799 	else
1800 		increment_stat(&zone->statistics.concurrent_data_matches);
1801 
1802 	return collides;
1803 }
1804 
1805 static inline int assert_hash_lock_preconditions(const struct data_vio *data_vio)
1806 {
1807 	int result;
1808 
1809 	/* FIXME: BUG_ON() and/or enter read-only mode? */
1810 	result = VDO_ASSERT(data_vio->hash_lock == NULL,
1811 			    "must not already hold a hash lock");
1812 	if (result != VDO_SUCCESS)
1813 		return result;
1814 
1815 	result = VDO_ASSERT(list_empty(&data_vio->hash_lock_entry),
1816 			    "must not already be a member of a hash lock ring");
1817 	if (result != VDO_SUCCESS)
1818 		return result;
1819 
1820 	return VDO_ASSERT(data_vio->recovery_sequence_number == 0,
1821 			  "must not hold a recovery lock when getting a hash lock");
1822 }
1823 
1824 /**
1825  * vdo_acquire_hash_lock() - Acquire or share a lock on a record name.
1826  * @data_vio: The data_vio acquiring a lock on its record name.
1827  *
1828  * Acquire or share a lock on the hash (record name) of the data in a data_vio, updating the
1829  * data_vio to reference the lock. This must only be called in the correct thread for the zone. In
1830  * the unlikely case of a hash collision, this function will succeed, but the data_vio will not get
1831  * a lock reference.
1832  */
1833 void vdo_acquire_hash_lock(struct vdo_completion *completion)
1834 {
1835 	struct data_vio *data_vio = as_data_vio(completion);
1836 	struct hash_lock *lock;
1837 	int result;
1838 
1839 	assert_data_vio_in_hash_zone(data_vio);
1840 
1841 	result = assert_hash_lock_preconditions(data_vio);
1842 	if (result != VDO_SUCCESS) {
1843 		continue_data_vio_with_error(data_vio, result);
1844 		return;
1845 	}
1846 
1847 	result = acquire_lock(data_vio->hash_zone, &data_vio->record_name, NULL, &lock);
1848 	if (result != VDO_SUCCESS) {
1849 		continue_data_vio_with_error(data_vio, result);
1850 		return;
1851 	}
1852 
1853 	if (is_hash_collision(lock, data_vio)) {
1854 		/*
1855 		 * Hash collisions are extremely unlikely, but the bogus dedupe would be a data
1856 		 * corruption. Bypass optimization entirely. We can't compress a data_vio without
1857 		 * a hash_lock as the compressed write depends on the hash_lock to manage the
1858 		 * references for the compressed block.
1859 		 */
1860 		write_data_vio(data_vio);
1861 		return;
1862 	}
1863 
1864 	set_hash_lock(data_vio, lock);
1865 	switch (lock->state) {
1866 	case VDO_HASH_LOCK_INITIALIZING:
1867 		start_querying(lock, data_vio);
1868 		return;
1869 
1870 	case VDO_HASH_LOCK_QUERYING:
1871 	case VDO_HASH_LOCK_WRITING:
1872 	case VDO_HASH_LOCK_UPDATING:
1873 	case VDO_HASH_LOCK_LOCKING:
1874 	case VDO_HASH_LOCK_VERIFYING:
1875 	case VDO_HASH_LOCK_UNLOCKING:
1876 		/* The lock is busy, and can't be shared yet. */
1877 		wait_on_hash_lock(lock, data_vio);
1878 		return;
1879 
1880 	case VDO_HASH_LOCK_BYPASSING:
1881 		/* We can't use this lock, so bypass optimization entirely. */
1882 		vdo_release_hash_lock(data_vio);
1883 		write_data_vio(data_vio);
1884 		return;
1885 
1886 	case VDO_HASH_LOCK_DEDUPING:
1887 		launch_dedupe(lock, data_vio, false);
1888 		return;
1889 
1890 	default:
1891 		/* A lock in this state should not be acquired by new VIOs. */
1892 		report_bogus_lock_state(lock, data_vio);
1893 	}
1894 }
1895 
1896 /**
1897  * vdo_release_hash_lock() - Release a data_vio's share of a hash lock, if held, and null out the
1898  *                           data_vio's reference to it.
1899  * @data_vio: The data_vio releasing its hash lock.
1900  *
1901  * If the data_vio is the only one holding the lock, this also releases any resources or locks used
1902  * by the hash lock (such as a PBN read lock on a block containing data with the same hash) and
1903  * returns the lock to the hash zone's lock pool.
1904  *
1905  * Context: This must only be called in the correct thread for the hash zone.
1906  */
1907 void vdo_release_hash_lock(struct data_vio *data_vio)
1908 {
1909 	u64 lock_key;
1910 	struct hash_lock *lock = data_vio->hash_lock;
1911 	struct hash_zone *zone = data_vio->hash_zone;
1912 
1913 	if (lock == NULL)
1914 		return;
1915 
1916 	set_hash_lock(data_vio, NULL);
1917 
1918 	if (lock->reference_count > 0) {
1919 		/* The lock is still in use by other data_vios. */
1920 		return;
1921 	}
1922 
1923 	lock_key = hash_lock_key(lock);
1924 	if (lock->registered) {
1925 		struct hash_lock *removed;
1926 
1927 		removed = vdo_int_map_remove(zone->hash_lock_map, lock_key);
1928 		VDO_ASSERT_LOG_ONLY(lock == removed,
1929 				    "hash lock being released must have been mapped");
1930 	} else {
1931 		VDO_ASSERT_LOG_ONLY(lock != vdo_int_map_get(zone->hash_lock_map, lock_key),
1932 				    "unregistered hash lock must not be in the lock map");
1933 	}
1934 
1935 	VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&lock->waiters),
1936 			    "hash lock returned to zone must have no waiters");
1937 	VDO_ASSERT_LOG_ONLY((lock->duplicate_lock == NULL),
1938 			    "hash lock returned to zone must not reference a PBN lock");
1939 	VDO_ASSERT_LOG_ONLY((lock->state == VDO_HASH_LOCK_BYPASSING),
1940 			    "returned hash lock must not be in use with state %s",
1941 			    get_hash_lock_state_name(lock->state));
1942 	VDO_ASSERT_LOG_ONLY(list_empty(&lock->pool_node),
1943 			    "hash lock returned to zone must not be in a pool ring");
1944 	VDO_ASSERT_LOG_ONLY(list_empty(&lock->duplicate_ring),
1945 			    "hash lock returned to zone must not reference DataVIOs");
1946 
1947 	return_hash_lock_to_pool(zone, lock);
1948 }
1949 
1950 /**
1951  * transfer_allocation_lock() - Transfer a data_vio's downgraded allocation PBN lock to the
1952  *                              data_vio's hash lock, converting it to a duplicate PBN lock.
1953  * @data_vio: The data_vio holding the allocation lock to transfer.
1954  */
1955 static void transfer_allocation_lock(struct data_vio *data_vio)
1956 {
1957 	struct allocation *allocation = &data_vio->allocation;
1958 	struct hash_lock *hash_lock = data_vio->hash_lock;
1959 
1960 	VDO_ASSERT_LOG_ONLY(data_vio->new_mapped.pbn == allocation->pbn,
1961 			    "transferred lock must be for the block written");
1962 
1963 	allocation->pbn = VDO_ZERO_BLOCK;
1964 
1965 	VDO_ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(allocation->lock),
1966 			    "must have downgraded the allocation lock before transfer");
1967 
1968 	hash_lock->duplicate = data_vio->new_mapped;
1969 	data_vio->duplicate = data_vio->new_mapped;
1970 
1971 	/*
1972 	 * Since the lock is being transferred, the holder count doesn't change (and isn't even
1973 	 * safe to examine on this thread).
1974 	 */
1975 	hash_lock->duplicate_lock = vdo_forget(allocation->lock);
1976 }
1977 
1978 /**
1979  * vdo_share_compressed_write_lock() - Make a data_vio's hash lock a shared holder of the PBN lock
1980  *                                     on the compressed block to which its data was just written.
1981  * @data_vio: The data_vio which was just compressed.
1982  * @pbn_lock: The PBN lock on the compressed block.
1983  *
1984  * If the lock is still a write lock (as it will be for the first share), it will be converted to a
1985  * read lock. This also reserves a reference count increment for the data_vio.
1986  */
1987 void vdo_share_compressed_write_lock(struct data_vio *data_vio,
1988 				     struct pbn_lock *pbn_lock)
1989 {
1990 	bool claimed;
1991 
1992 	VDO_ASSERT_LOG_ONLY(vdo_get_duplicate_lock(data_vio) == NULL,
1993 			    "a duplicate PBN lock should not exist when writing");
1994 	VDO_ASSERT_LOG_ONLY(vdo_is_state_compressed(data_vio->new_mapped.state),
1995 			    "lock transfer must be for a compressed write");
1996 	assert_data_vio_in_new_mapped_zone(data_vio);
1997 
1998 	/* First sharer downgrades the lock. */
1999 	if (!vdo_is_pbn_read_lock(pbn_lock))
2000 		vdo_downgrade_pbn_write_lock(pbn_lock, true);
2001 
2002 	/*
2003 	 * Get a share of the PBN lock, ensuring it cannot be released until after this data_vio
2004 	 * has had a chance to journal a reference.
2005 	 */
2006 	data_vio->duplicate = data_vio->new_mapped;
2007 	data_vio->hash_lock->duplicate = data_vio->new_mapped;
2008 	set_duplicate_lock(data_vio->hash_lock, pbn_lock);
2009 
2010 	/*
2011 	 * Claim a reference for this data_vio. Necessary since another hash_lock might start
2012 	 * deduplicating against it before our incRef.
2013 	 */
2014 	claimed = vdo_claim_pbn_lock_increment(pbn_lock);
2015 	VDO_ASSERT_LOG_ONLY(claimed, "impossible to fail to claim an initial increment");
2016 }
2017 
2018 static void start_uds_queue(void *ptr)
2019 {
2020 	/*
2021 	 * Allow the UDS dedupe worker thread to do memory allocations. It will only do allocations
2022 	 * during the UDS calls that open or close an index, but those allocations can safely sleep
2023 	 * while reserving a large amount of memory. We could use an allocations_allowed boolean
2024 	 * (like the base threads do), but it would be an unnecessary embellishment.
2025 	 */
2026 	struct vdo_thread *thread = vdo_get_work_queue_owner(vdo_get_current_work_queue());
2027 
2028 	vdo_register_allocating_thread(&thread->allocating_thread, NULL);
2029 }
2030 
2031 static void finish_uds_queue(void *ptr __always_unused)
2032 {
2033 	vdo_unregister_allocating_thread();
2034 }
2035 
2036 static void close_index(struct hash_zones *zones)
2037 	__must_hold(&zones->lock)
2038 {
2039 	int result;
2040 
2041 	/*
2042 	 * Change the index state so that get_index_statistics() will not try to use the index
2043 	 * session we are closing.
2044 	 */
2045 	zones->index_state = IS_CHANGING;
2046 	/* Close the index session, while not holding the lock. */
2047 	spin_unlock(&zones->lock);
2048 	result = uds_close_index(zones->index_session);
2049 
2050 	if (result != UDS_SUCCESS)
2051 		vdo_log_error_strerror(result, "Error closing index");
2052 	spin_lock(&zones->lock);
2053 	zones->index_state = IS_CLOSED;
2054 	zones->error_flag |= result != UDS_SUCCESS;
2055 	/* ASSERTION: We leave in IS_CLOSED state. */
2056 }
2057 
2058 static void open_index(struct hash_zones *zones)
2059 	__must_hold(&zones->lock)
2060 {
2061 	/* ASSERTION: We enter in IS_CLOSED state. */
2062 	int result;
2063 	bool create_flag = zones->create_flag;
2064 
2065 	zones->create_flag = false;
2066 	/*
2067 	 * Change the index state so that the it will be reported to the outside world as
2068 	 * "opening".
2069 	 */
2070 	zones->index_state = IS_CHANGING;
2071 	zones->error_flag = false;
2072 
2073 	/* Open the index session, while not holding the lock */
2074 	spin_unlock(&zones->lock);
2075 	result = uds_open_index(create_flag ? UDS_CREATE : UDS_LOAD,
2076 				&zones->parameters, zones->index_session);
2077 	if (result != UDS_SUCCESS)
2078 		vdo_log_error_strerror(result, "Error opening index");
2079 
2080 	spin_lock(&zones->lock);
2081 	if (!create_flag) {
2082 		switch (result) {
2083 		case -ENOENT:
2084 			/*
2085 			 * Either there is no index, or there is no way we can recover the index.
2086 			 * We will be called again and try to create a new index.
2087 			 */
2088 			zones->index_state = IS_CLOSED;
2089 			zones->create_flag = true;
2090 			return;
2091 		default:
2092 			break;
2093 		}
2094 	}
2095 	if (result == UDS_SUCCESS) {
2096 		zones->index_state = IS_OPENED;
2097 	} else {
2098 		zones->index_state = IS_CLOSED;
2099 		zones->index_target = IS_CLOSED;
2100 		zones->error_flag = true;
2101 		spin_unlock(&zones->lock);
2102 		vdo_log_info("Setting UDS index target state to error");
2103 		spin_lock(&zones->lock);
2104 	}
2105 	/*
2106 	 * ASSERTION: On success, we leave in IS_OPENED state.
2107 	 * ASSERTION: On failure, we leave in IS_CLOSED state.
2108 	 */
2109 }
2110 
2111 static void change_dedupe_state(struct vdo_completion *completion)
2112 {
2113 	struct hash_zones *zones = as_hash_zones(completion);
2114 
2115 	spin_lock(&zones->lock);
2116 
2117 	/* Loop until the index is in the target state and the create flag is clear. */
2118 	while (vdo_is_state_normal(&zones->state) &&
2119 	       ((zones->index_state != zones->index_target) || zones->create_flag)) {
2120 		if (zones->index_state == IS_OPENED)
2121 			close_index(zones);
2122 		else
2123 			open_index(zones);
2124 	}
2125 
2126 	zones->changing = false;
2127 	spin_unlock(&zones->lock);
2128 }
2129 
2130 static void start_expiration_timer(struct dedupe_context *context)
2131 {
2132 	u64 start_time = context->submission_jiffies;
2133 	u64 end_time;
2134 
2135 	if (!change_timer_state(context->zone, DEDUPE_QUERY_TIMER_IDLE,
2136 				DEDUPE_QUERY_TIMER_RUNNING))
2137 		return;
2138 
2139 	end_time = max(start_time + vdo_dedupe_index_timeout_jiffies,
2140 		       jiffies + vdo_dedupe_index_min_timer_jiffies);
2141 	mod_timer(&context->zone->timer, end_time);
2142 }
2143 
2144 /**
2145  * report_dedupe_timeouts() - Record and eventually report that some dedupe requests reached their
2146  *                            expiration time without getting answers, so we timed them out.
2147  * @zones: the hash zones.
2148  * @timeouts: the number of newly timed out requests.
2149  */
2150 static void report_dedupe_timeouts(struct hash_zones *zones, unsigned int timeouts)
2151 {
2152 	atomic64_add(timeouts, &zones->timeouts);
2153 	spin_lock(&zones->lock);
2154 	if (__ratelimit(&zones->ratelimiter)) {
2155 		u64 unreported = atomic64_read(&zones->timeouts);
2156 
2157 		unreported -= zones->reported_timeouts;
2158 		vdo_log_debug("UDS index timeout on %llu requests",
2159 			      (unsigned long long) unreported);
2160 		zones->reported_timeouts += unreported;
2161 	}
2162 	spin_unlock(&zones->lock);
2163 }
2164 
2165 static int initialize_index(struct vdo *vdo, struct hash_zones *zones)
2166 {
2167 	int result;
2168 	off_t uds_offset;
2169 	struct volume_geometry geometry = vdo->geometry;
2170 	static const struct vdo_work_queue_type uds_queue_type = {
2171 		.start = start_uds_queue,
2172 		.finish = finish_uds_queue,
2173 		.max_priority = UDS_Q_MAX_PRIORITY,
2174 		.default_priority = UDS_Q_PRIORITY,
2175 	};
2176 
2177 	vdo_set_dedupe_index_timeout_interval(vdo_dedupe_index_timeout_interval);
2178 	vdo_set_dedupe_index_min_timer_interval(vdo_dedupe_index_min_timer_interval);
2179 
2180 	/*
2181 	 * Since we will save up the timeouts that would have been reported but were ratelimited,
2182 	 * we don't need to report ratelimiting.
2183 	 */
2184 	ratelimit_default_init(&zones->ratelimiter);
2185 	ratelimit_set_flags(&zones->ratelimiter, RATELIMIT_MSG_ON_RELEASE);
2186 	uds_offset = ((vdo_get_index_region_start(geometry) -
2187 		       geometry.bio_offset) * VDO_BLOCK_SIZE);
2188 	zones->parameters = (struct uds_parameters) {
2189 		.bdev = vdo->device_config->owned_device->bdev,
2190 		.offset = uds_offset,
2191 		.size = (vdo_get_index_region_size(geometry) * VDO_BLOCK_SIZE),
2192 		.memory_size = geometry.index_config.mem,
2193 		.sparse = geometry.index_config.sparse,
2194 		.nonce = (u64) geometry.nonce,
2195 	};
2196 
2197 	result = uds_create_index_session(&zones->index_session);
2198 	if (result != UDS_SUCCESS)
2199 		return result;
2200 
2201 	result = vdo_make_thread(vdo, vdo->thread_config.dedupe_thread, &uds_queue_type,
2202 				 1, NULL);
2203 	if (result != VDO_SUCCESS) {
2204 		uds_destroy_index_session(vdo_forget(zones->index_session));
2205 		vdo_log_error("UDS index queue initialization failed (%d)", result);
2206 		return result;
2207 	}
2208 
2209 	vdo_initialize_completion(&zones->completion, vdo, VDO_HASH_ZONES_COMPLETION);
2210 	vdo_set_completion_callback(&zones->completion, change_dedupe_state,
2211 				    vdo->thread_config.dedupe_thread);
2212 	return VDO_SUCCESS;
2213 }
2214 
2215 /**
2216  * finish_index_operation() - This is the UDS callback for index queries.
2217  * @request: The uds request which has just completed.
2218  */
2219 static void finish_index_operation(struct uds_request *request)
2220 {
2221 	struct dedupe_context *context = container_of(request, struct dedupe_context,
2222 						      request);
2223 
2224 	if (change_context_state(context, DEDUPE_CONTEXT_PENDING,
2225 				 DEDUPE_CONTEXT_COMPLETE)) {
2226 		/*
2227 		 * This query has not timed out, so send its data_vio back to its hash zone to
2228 		 * process the results.
2229 		 */
2230 		continue_data_vio(context->requestor);
2231 		return;
2232 	}
2233 
2234 	/*
2235 	 * This query has timed out, so try to mark it complete and hence eligible for reuse. Its
2236 	 * data_vio has already moved on.
2237 	 */
2238 	if (!change_context_state(context, DEDUPE_CONTEXT_TIMED_OUT,
2239 				  DEDUPE_CONTEXT_TIMED_OUT_COMPLETE)) {
2240 		VDO_ASSERT_LOG_ONLY(false, "uds request was timed out (state %d)",
2241 				    atomic_read(&context->state));
2242 	}
2243 
2244 	vdo_funnel_queue_put(context->zone->timed_out_complete, &context->queue_entry);
2245 }
2246 
2247 /**
2248  * check_for_drain_complete() - Check whether this zone has drained.
2249  * @zone: The zone to check.
2250  */
2251 static void check_for_drain_complete(struct hash_zone *zone)
2252 {
2253 	data_vio_count_t recycled = 0;
2254 
2255 	if (!vdo_is_state_draining(&zone->state))
2256 		return;
2257 
2258 	if ((atomic_read(&zone->timer_state) == DEDUPE_QUERY_TIMER_IDLE) ||
2259 	    change_timer_state(zone, DEDUPE_QUERY_TIMER_RUNNING,
2260 			       DEDUPE_QUERY_TIMER_IDLE)) {
2261 		del_timer_sync(&zone->timer);
2262 	} else {
2263 		/*
2264 		 * There is an in flight time-out, which must get processed before we can continue.
2265 		 */
2266 		return;
2267 	}
2268 
2269 	for (;;) {
2270 		struct dedupe_context *context;
2271 		struct funnel_queue_entry *entry;
2272 
2273 		entry = vdo_funnel_queue_poll(zone->timed_out_complete);
2274 		if (entry == NULL)
2275 			break;
2276 
2277 		context = container_of(entry, struct dedupe_context, queue_entry);
2278 		atomic_set(&context->state, DEDUPE_CONTEXT_IDLE);
2279 		list_add(&context->list_entry, &zone->available);
2280 		recycled++;
2281 	}
2282 
2283 	if (recycled > 0)
2284 		WRITE_ONCE(zone->active, zone->active - recycled);
2285 	VDO_ASSERT_LOG_ONLY(READ_ONCE(zone->active) == 0, "all contexts inactive");
2286 	vdo_finish_draining(&zone->state);
2287 }
2288 
2289 static void timeout_index_operations_callback(struct vdo_completion *completion)
2290 {
2291 	struct dedupe_context *context, *tmp;
2292 	struct hash_zone *zone = as_hash_zone(completion);
2293 	u64 timeout_jiffies = msecs_to_jiffies(vdo_dedupe_index_timeout_interval);
2294 	unsigned long cutoff = jiffies - timeout_jiffies;
2295 	unsigned int timed_out = 0;
2296 
2297 	atomic_set(&zone->timer_state, DEDUPE_QUERY_TIMER_IDLE);
2298 	list_for_each_entry_safe(context, tmp, &zone->pending, list_entry) {
2299 		if (cutoff <= context->submission_jiffies) {
2300 			/*
2301 			 * We have reached the oldest query which has not timed out yet, so restart
2302 			 * the timer.
2303 			 */
2304 			start_expiration_timer(context);
2305 			break;
2306 		}
2307 
2308 		if (!change_context_state(context, DEDUPE_CONTEXT_PENDING,
2309 					  DEDUPE_CONTEXT_TIMED_OUT)) {
2310 			/*
2311 			 * This context completed between the time the timeout fired, and now. We
2312 			 * can treat it as a successful query, its requestor is already enqueued
2313 			 * to process it.
2314 			 */
2315 			continue;
2316 		}
2317 
2318 		/*
2319 		 * Remove this context from the pending list so we won't look at it again on a
2320 		 * subsequent timeout. Once the index completes it, it will be reused. Meanwhile,
2321 		 * send its requestor on its way.
2322 		 */
2323 		list_del_init(&context->list_entry);
2324 		continue_data_vio(context->requestor);
2325 		timed_out++;
2326 	}
2327 
2328 	if (timed_out > 0)
2329 		report_dedupe_timeouts(completion->vdo->hash_zones, timed_out);
2330 
2331 	check_for_drain_complete(zone);
2332 }
2333 
2334 static void timeout_index_operations(struct timer_list *t)
2335 {
2336 	struct hash_zone *zone = from_timer(zone, t, timer);
2337 
2338 	if (change_timer_state(zone, DEDUPE_QUERY_TIMER_RUNNING,
2339 			       DEDUPE_QUERY_TIMER_FIRED))
2340 		vdo_launch_completion(&zone->completion);
2341 }
2342 
2343 static int __must_check initialize_zone(struct vdo *vdo, struct hash_zones *zones,
2344 					zone_count_t zone_number)
2345 {
2346 	int result;
2347 	data_vio_count_t i;
2348 	struct hash_zone *zone = &zones->zones[zone_number];
2349 
2350 	result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->hash_lock_map);
2351 	if (result != VDO_SUCCESS)
2352 		return result;
2353 
2354 	vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
2355 	zone->zone_number = zone_number;
2356 	zone->thread_id = vdo->thread_config.hash_zone_threads[zone_number];
2357 	vdo_initialize_completion(&zone->completion, vdo, VDO_HASH_ZONE_COMPLETION);
2358 	vdo_set_completion_callback(&zone->completion, timeout_index_operations_callback,
2359 				    zone->thread_id);
2360 	INIT_LIST_HEAD(&zone->lock_pool);
2361 	result = vdo_allocate(LOCK_POOL_CAPACITY, struct hash_lock, "hash_lock array",
2362 			      &zone->lock_array);
2363 	if (result != VDO_SUCCESS)
2364 		return result;
2365 
2366 	for (i = 0; i < LOCK_POOL_CAPACITY; i++)
2367 		return_hash_lock_to_pool(zone, &zone->lock_array[i]);
2368 
2369 	INIT_LIST_HEAD(&zone->available);
2370 	INIT_LIST_HEAD(&zone->pending);
2371 	result = vdo_make_funnel_queue(&zone->timed_out_complete);
2372 	if (result != VDO_SUCCESS)
2373 		return result;
2374 
2375 	timer_setup(&zone->timer, timeout_index_operations, 0);
2376 
2377 	for (i = 0; i < MAXIMUM_VDO_USER_VIOS; i++) {
2378 		struct dedupe_context *context = &zone->contexts[i];
2379 
2380 		context->zone = zone;
2381 		context->request.callback = finish_index_operation;
2382 		context->request.session = zones->index_session;
2383 		list_add(&context->list_entry, &zone->available);
2384 	}
2385 
2386 	return vdo_make_default_thread(vdo, zone->thread_id);
2387 }
2388 
2389 /** get_thread_id_for_zone() - Implements vdo_zone_thread_getter_fn. */
2390 static thread_id_t get_thread_id_for_zone(void *context, zone_count_t zone_number)
2391 {
2392 	struct hash_zones *zones = context;
2393 
2394 	return zones->zones[zone_number].thread_id;
2395 }
2396 
2397 /**
2398  * vdo_make_hash_zones() - Create the hash zones.
2399  *
2400  * @vdo: The vdo to which the zone will belong.
2401  * @zones_ptr: A pointer to hold the zones.
2402  *
2403  * Return: VDO_SUCCESS or an error code.
2404  */
2405 int vdo_make_hash_zones(struct vdo *vdo, struct hash_zones **zones_ptr)
2406 {
2407 	int result;
2408 	struct hash_zones *zones;
2409 	zone_count_t z;
2410 	zone_count_t zone_count = vdo->thread_config.hash_zone_count;
2411 
2412 	if (zone_count == 0)
2413 		return VDO_SUCCESS;
2414 
2415 	result = vdo_allocate_extended(struct hash_zones, zone_count, struct hash_zone,
2416 				       __func__, &zones);
2417 	if (result != VDO_SUCCESS)
2418 		return result;
2419 
2420 	result = initialize_index(vdo, zones);
2421 	if (result != VDO_SUCCESS) {
2422 		vdo_free(zones);
2423 		return result;
2424 	}
2425 
2426 	vdo_set_admin_state_code(&zones->state, VDO_ADMIN_STATE_NEW);
2427 
2428 	zones->zone_count = zone_count;
2429 	for (z = 0; z < zone_count; z++) {
2430 		result = initialize_zone(vdo, zones, z);
2431 		if (result != VDO_SUCCESS) {
2432 			vdo_free_hash_zones(zones);
2433 			return result;
2434 		}
2435 	}
2436 
2437 	result = vdo_make_action_manager(zones->zone_count, get_thread_id_for_zone,
2438 					 vdo->thread_config.admin_thread, zones, NULL,
2439 					 vdo, &zones->manager);
2440 	if (result != VDO_SUCCESS) {
2441 		vdo_free_hash_zones(zones);
2442 		return result;
2443 	}
2444 
2445 	*zones_ptr = zones;
2446 	return VDO_SUCCESS;
2447 }
2448 
2449 void vdo_finish_dedupe_index(struct hash_zones *zones)
2450 {
2451 	if (zones == NULL)
2452 		return;
2453 
2454 	uds_destroy_index_session(vdo_forget(zones->index_session));
2455 }
2456 
2457 /**
2458  * vdo_free_hash_zones() - Free the hash zones.
2459  * @zones: The zone to free.
2460  */
2461 void vdo_free_hash_zones(struct hash_zones *zones)
2462 {
2463 	zone_count_t i;
2464 
2465 	if (zones == NULL)
2466 		return;
2467 
2468 	vdo_free(vdo_forget(zones->manager));
2469 
2470 	for (i = 0; i < zones->zone_count; i++) {
2471 		struct hash_zone *zone = &zones->zones[i];
2472 
2473 		vdo_free_funnel_queue(vdo_forget(zone->timed_out_complete));
2474 		vdo_int_map_free(vdo_forget(zone->hash_lock_map));
2475 		vdo_free(vdo_forget(zone->lock_array));
2476 	}
2477 
2478 	if (zones->index_session != NULL)
2479 		vdo_finish_dedupe_index(zones);
2480 
2481 	ratelimit_state_exit(&zones->ratelimiter);
2482 	vdo_free(zones);
2483 }
2484 
2485 static void initiate_suspend_index(struct admin_state *state)
2486 {
2487 	struct hash_zones *zones = container_of(state, struct hash_zones, state);
2488 	enum index_state index_state;
2489 
2490 	spin_lock(&zones->lock);
2491 	index_state = zones->index_state;
2492 	spin_unlock(&zones->lock);
2493 
2494 	if (index_state != IS_CLOSED) {
2495 		bool save = vdo_is_state_saving(&zones->state);
2496 		int result;
2497 
2498 		result = uds_suspend_index_session(zones->index_session, save);
2499 		if (result != UDS_SUCCESS)
2500 			vdo_log_error_strerror(result, "Error suspending dedupe index");
2501 	}
2502 
2503 	vdo_finish_draining(state);
2504 }
2505 
2506 /**
2507  * suspend_index() - Suspend the UDS index prior to draining hash zones.
2508  *
2509  * Implements vdo_action_preamble_fn
2510  */
2511 static void suspend_index(void *context, struct vdo_completion *completion)
2512 {
2513 	struct hash_zones *zones = context;
2514 
2515 	vdo_start_draining(&zones->state,
2516 			   vdo_get_current_manager_operation(zones->manager), completion,
2517 			   initiate_suspend_index);
2518 }
2519 
2520 /**
2521  * initiate_drain() - Initiate a drain.
2522  *
2523  * Implements vdo_admin_initiator_fn.
2524  */
2525 static void initiate_drain(struct admin_state *state)
2526 {
2527 	check_for_drain_complete(container_of(state, struct hash_zone, state));
2528 }
2529 
2530 /**
2531  * drain_hash_zone() - Drain a hash zone.
2532  *
2533  * Implements vdo_zone_action_fn.
2534  */
2535 static void drain_hash_zone(void *context, zone_count_t zone_number,
2536 			    struct vdo_completion *parent)
2537 {
2538 	struct hash_zones *zones = context;
2539 
2540 	vdo_start_draining(&zones->zones[zone_number].state,
2541 			   vdo_get_current_manager_operation(zones->manager), parent,
2542 			   initiate_drain);
2543 }
2544 
2545 /** vdo_drain_hash_zones() - Drain all hash zones. */
2546 void vdo_drain_hash_zones(struct hash_zones *zones, struct vdo_completion *parent)
2547 {
2548 	vdo_schedule_operation(zones->manager, parent->vdo->suspend_type, suspend_index,
2549 			       drain_hash_zone, NULL, parent);
2550 }
2551 
2552 static void launch_dedupe_state_change(struct hash_zones *zones)
2553 	__must_hold(&zones->lock)
2554 {
2555 	/* ASSERTION: We enter with the lock held. */
2556 	if (zones->changing || !vdo_is_state_normal(&zones->state))
2557 		/* Either a change is already in progress, or changes are not allowed. */
2558 		return;
2559 
2560 	if (zones->create_flag || (zones->index_state != zones->index_target)) {
2561 		zones->changing = true;
2562 		vdo_launch_completion(&zones->completion);
2563 		return;
2564 	}
2565 
2566 	/* ASSERTION: We exit with the lock held. */
2567 }
2568 
2569 /**
2570  * resume_index() - Resume the UDS index prior to resuming hash zones.
2571  *
2572  * Implements vdo_action_preamble_fn
2573  */
2574 static void resume_index(void *context, struct vdo_completion *parent)
2575 {
2576 	struct hash_zones *zones = context;
2577 	struct device_config *config = parent->vdo->device_config;
2578 	int result;
2579 
2580 	zones->parameters.bdev = config->owned_device->bdev;
2581 	result = uds_resume_index_session(zones->index_session, zones->parameters.bdev);
2582 	if (result != UDS_SUCCESS)
2583 		vdo_log_error_strerror(result, "Error resuming dedupe index");
2584 
2585 	spin_lock(&zones->lock);
2586 	vdo_resume_if_quiescent(&zones->state);
2587 
2588 	if (config->deduplication) {
2589 		zones->index_target = IS_OPENED;
2590 		WRITE_ONCE(zones->dedupe_flag, true);
2591 	} else {
2592 		zones->index_target = IS_CLOSED;
2593 	}
2594 
2595 	launch_dedupe_state_change(zones);
2596 	spin_unlock(&zones->lock);
2597 
2598 	vdo_finish_completion(parent);
2599 }
2600 
2601 /**
2602  * resume_hash_zone() - Resume a hash zone.
2603  *
2604  * Implements vdo_zone_action_fn.
2605  */
2606 static void resume_hash_zone(void *context, zone_count_t zone_number,
2607 			     struct vdo_completion *parent)
2608 {
2609 	struct hash_zone *zone = &(((struct hash_zones *) context)->zones[zone_number]);
2610 
2611 	vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state));
2612 }
2613 
2614 /**
2615  * vdo_resume_hash_zones() - Resume a set of hash zones.
2616  * @zones: The hash zones to resume.
2617  * @parent: The object to notify when the zones have resumed.
2618  */
2619 void vdo_resume_hash_zones(struct hash_zones *zones, struct vdo_completion *parent)
2620 {
2621 	if (vdo_is_read_only(parent->vdo)) {
2622 		vdo_launch_completion(parent);
2623 		return;
2624 	}
2625 
2626 	vdo_schedule_operation(zones->manager, VDO_ADMIN_STATE_RESUMING, resume_index,
2627 			       resume_hash_zone, NULL, parent);
2628 }
2629 
2630 /**
2631  * get_hash_zone_statistics() - Add the statistics for this hash zone to the tally for all zones.
2632  * @zone: The hash zone to query.
2633  * @tally: The tally
2634  */
2635 static void get_hash_zone_statistics(const struct hash_zone *zone,
2636 				     struct hash_lock_statistics *tally)
2637 {
2638 	const struct hash_lock_statistics *stats = &zone->statistics;
2639 
2640 	tally->dedupe_advice_valid += READ_ONCE(stats->dedupe_advice_valid);
2641 	tally->dedupe_advice_stale += READ_ONCE(stats->dedupe_advice_stale);
2642 	tally->concurrent_data_matches += READ_ONCE(stats->concurrent_data_matches);
2643 	tally->concurrent_hash_collisions += READ_ONCE(stats->concurrent_hash_collisions);
2644 	tally->curr_dedupe_queries += READ_ONCE(zone->active);
2645 }
2646 
2647 static void get_index_statistics(struct hash_zones *zones,
2648 				 struct index_statistics *stats)
2649 {
2650 	enum index_state state;
2651 	struct uds_index_stats index_stats;
2652 	int result;
2653 
2654 	spin_lock(&zones->lock);
2655 	state = zones->index_state;
2656 	spin_unlock(&zones->lock);
2657 
2658 	if (state != IS_OPENED)
2659 		return;
2660 
2661 	result = uds_get_index_session_stats(zones->index_session, &index_stats);
2662 	if (result != UDS_SUCCESS) {
2663 		vdo_log_error_strerror(result, "Error reading index stats");
2664 		return;
2665 	}
2666 
2667 	stats->entries_indexed = index_stats.entries_indexed;
2668 	stats->posts_found = index_stats.posts_found;
2669 	stats->posts_not_found = index_stats.posts_not_found;
2670 	stats->queries_found = index_stats.queries_found;
2671 	stats->queries_not_found = index_stats.queries_not_found;
2672 	stats->updates_found = index_stats.updates_found;
2673 	stats->updates_not_found = index_stats.updates_not_found;
2674 	stats->entries_discarded = index_stats.entries_discarded;
2675 }
2676 
2677 /**
2678  * vdo_get_dedupe_statistics() - Tally the statistics from all the hash zones and the UDS index.
2679  * @hash_zones: The hash zones to query
2680  *
2681  * Return: The sum of the hash lock statistics from all hash zones plus the statistics from the UDS
2682  *         index
2683  */
2684 void vdo_get_dedupe_statistics(struct hash_zones *zones, struct vdo_statistics *stats)
2685 
2686 {
2687 	zone_count_t zone;
2688 
2689 	for (zone = 0; zone < zones->zone_count; zone++)
2690 		get_hash_zone_statistics(&zones->zones[zone], &stats->hash_lock);
2691 
2692 	get_index_statistics(zones, &stats->index);
2693 
2694 	/*
2695 	 * zones->timeouts gives the number of timeouts, and dedupe_context_busy gives the number
2696 	 * of queries not made because of earlier timeouts.
2697 	 */
2698 	stats->dedupe_advice_timeouts =
2699 		(atomic64_read(&zones->timeouts) + atomic64_read(&zones->dedupe_context_busy));
2700 }
2701 
2702 /**
2703  * vdo_select_hash_zone() - Select the hash zone responsible for locking a given record name.
2704  * @zones: The hash_zones from which to select.
2705  * @name: The record name.
2706  *
2707  * Return: The hash zone responsible for the record name.
2708  */
2709 struct hash_zone *vdo_select_hash_zone(struct hash_zones *zones,
2710 				       const struct uds_record_name *name)
2711 {
2712 	/*
2713 	 * Use a fragment of the record name as a hash code. Eight bits of hash should suffice
2714 	 * since the number of hash zones is small.
2715 	 * TODO: Verify that the first byte is independent enough.
2716 	 */
2717 	u32 hash = name->name[0];
2718 
2719 	/*
2720 	 * Scale the 8-bit hash fragment to a zone index by treating it as a binary fraction and
2721 	 * multiplying that by the zone count. If the hash is uniformly distributed over [0 ..
2722 	 * 2^8-1], then (hash * count / 2^8) should be uniformly distributed over [0 .. count-1].
2723 	 * The multiply and shift is much faster than a divide (modulus) on X86 CPUs.
2724 	 */
2725 	hash = (hash * zones->zone_count) >> 8;
2726 	return &zones->zones[hash];
2727 }
2728 
2729 /**
2730  * dump_hash_lock() - Dump a compact description of hash_lock to the log if the lock is not on the
2731  *                    free list.
2732  * @lock: The hash lock to dump.
2733  */
2734 static void dump_hash_lock(const struct hash_lock *lock)
2735 {
2736 	const char *state;
2737 
2738 	if (!list_empty(&lock->pool_node)) {
2739 		/* This lock is on the free list. */
2740 		return;
2741 	}
2742 
2743 	/*
2744 	 * Necessarily cryptic since we can log a lot of these. First three chars of state is
2745 	 * unambiguous. 'U' indicates a lock not registered in the map.
2746 	 */
2747 	state = get_hash_lock_state_name(lock->state);
2748 	vdo_log_info("  hl %px: %3.3s %c%llu/%u rc=%u wc=%zu agt=%px",
2749 		     lock, state, (lock->registered ? 'D' : 'U'),
2750 		     (unsigned long long) lock->duplicate.pbn,
2751 		     lock->duplicate.state, lock->reference_count,
2752 		     vdo_waitq_num_waiters(&lock->waiters), lock->agent);
2753 }
2754 
2755 static const char *index_state_to_string(struct hash_zones *zones,
2756 					 enum index_state state)
2757 {
2758 	if (!vdo_is_state_normal(&zones->state))
2759 		return SUSPENDED;
2760 
2761 	switch (state) {
2762 	case IS_CLOSED:
2763 		return zones->error_flag ? ERROR : CLOSED;
2764 	case IS_CHANGING:
2765 		return zones->index_target == IS_OPENED ? OPENING : CLOSING;
2766 	case IS_OPENED:
2767 		return READ_ONCE(zones->dedupe_flag) ? ONLINE : OFFLINE;
2768 	default:
2769 		return UNKNOWN;
2770 	}
2771 }
2772 
2773 /**
2774  * dump_hash_zone() - Dump information about a hash zone to the log for debugging.
2775  * @zone: The zone to dump.
2776  */
2777 static void dump_hash_zone(const struct hash_zone *zone)
2778 {
2779 	data_vio_count_t i;
2780 
2781 	if (zone->hash_lock_map == NULL) {
2782 		vdo_log_info("struct hash_zone %u: NULL map", zone->zone_number);
2783 		return;
2784 	}
2785 
2786 	vdo_log_info("struct hash_zone %u: mapSize=%zu",
2787 		     zone->zone_number, vdo_int_map_size(zone->hash_lock_map));
2788 	for (i = 0; i < LOCK_POOL_CAPACITY; i++)
2789 		dump_hash_lock(&zone->lock_array[i]);
2790 }
2791 
2792 /**
2793  * vdo_dump_hash_zones() - Dump information about the hash zones to the log for debugging.
2794  * @zones: The zones to dump.
2795  */
2796 void vdo_dump_hash_zones(struct hash_zones *zones)
2797 {
2798 	const char *state, *target;
2799 	zone_count_t zone;
2800 
2801 	spin_lock(&zones->lock);
2802 	state = index_state_to_string(zones, zones->index_state);
2803 	target = (zones->changing ? index_state_to_string(zones, zones->index_target) : NULL);
2804 	spin_unlock(&zones->lock);
2805 
2806 	vdo_log_info("UDS index: state: %s", state);
2807 	if (target != NULL)
2808 		vdo_log_info("UDS index: changing to state: %s", target);
2809 
2810 	for (zone = 0; zone < zones->zone_count; zone++)
2811 		dump_hash_zone(&zones->zones[zone]);
2812 }
2813 
2814 void vdo_set_dedupe_index_timeout_interval(unsigned int value)
2815 {
2816 	u64 alb_jiffies;
2817 
2818 	/* Arbitrary maximum value is two minutes */
2819 	if (value > 120000)
2820 		value = 120000;
2821 	/* Arbitrary minimum value is 2 jiffies */
2822 	alb_jiffies = msecs_to_jiffies(value);
2823 
2824 	if (alb_jiffies < 2) {
2825 		alb_jiffies = 2;
2826 		value = jiffies_to_msecs(alb_jiffies);
2827 	}
2828 	vdo_dedupe_index_timeout_interval = value;
2829 	vdo_dedupe_index_timeout_jiffies = alb_jiffies;
2830 }
2831 
2832 void vdo_set_dedupe_index_min_timer_interval(unsigned int value)
2833 {
2834 	u64 min_jiffies;
2835 
2836 	/* Arbitrary maximum value is one second */
2837 	if (value > 1000)
2838 		value = 1000;
2839 
2840 	/* Arbitrary minimum value is 2 jiffies */
2841 	min_jiffies = msecs_to_jiffies(value);
2842 
2843 	if (min_jiffies < 2) {
2844 		min_jiffies = 2;
2845 		value = jiffies_to_msecs(min_jiffies);
2846 	}
2847 
2848 	vdo_dedupe_index_min_timer_interval = value;
2849 	vdo_dedupe_index_min_timer_jiffies = min_jiffies;
2850 }
2851 
2852 /**
2853  * acquire_context() - Acquire a dedupe context from a hash_zone if any are available.
2854  * @zone: the hash zone
2855  *
2856  * Return: A dedupe_context or NULL if none are available
2857  */
2858 static struct dedupe_context * __must_check acquire_context(struct hash_zone *zone)
2859 {
2860 	struct dedupe_context *context;
2861 	struct funnel_queue_entry *entry;
2862 
2863 	assert_in_hash_zone(zone, __func__);
2864 
2865 	if (!list_empty(&zone->available)) {
2866 		WRITE_ONCE(zone->active, zone->active + 1);
2867 		context = list_first_entry(&zone->available, struct dedupe_context,
2868 					   list_entry);
2869 		list_del_init(&context->list_entry);
2870 		return context;
2871 	}
2872 
2873 	entry = vdo_funnel_queue_poll(zone->timed_out_complete);
2874 	return ((entry == NULL) ?
2875 		NULL : container_of(entry, struct dedupe_context, queue_entry));
2876 }
2877 
2878 static void prepare_uds_request(struct uds_request *request, struct data_vio *data_vio,
2879 				enum uds_request_type operation)
2880 {
2881 	request->record_name = data_vio->record_name;
2882 	request->type = operation;
2883 	if ((operation == UDS_POST) || (operation == UDS_UPDATE)) {
2884 		size_t offset = 0;
2885 		struct uds_record_data *encoding = &request->new_metadata;
2886 
2887 		encoding->data[offset++] = UDS_ADVICE_VERSION;
2888 		encoding->data[offset++] = data_vio->new_mapped.state;
2889 		put_unaligned_le64(data_vio->new_mapped.pbn, &encoding->data[offset]);
2890 		offset += sizeof(u64);
2891 		BUG_ON(offset != UDS_ADVICE_SIZE);
2892 	}
2893 }
2894 
2895 /*
2896  * The index operation will inquire about data_vio.record_name, providing (if the operation is
2897  * appropriate) advice from the data_vio's new_mapped fields. The advice found in the index (or
2898  * NULL if none) will be returned via receive_data_vio_dedupe_advice(). dedupe_context.status is
2899  * set to the return status code of any asynchronous index processing.
2900  */
2901 static void query_index(struct data_vio *data_vio, enum uds_request_type operation)
2902 {
2903 	int result;
2904 	struct dedupe_context *context;
2905 	struct vdo *vdo = vdo_from_data_vio(data_vio);
2906 	struct hash_zone *zone = data_vio->hash_zone;
2907 
2908 	assert_data_vio_in_hash_zone(data_vio);
2909 
2910 	if (!READ_ONCE(vdo->hash_zones->dedupe_flag)) {
2911 		continue_data_vio(data_vio);
2912 		return;
2913 	}
2914 
2915 	context = acquire_context(zone);
2916 	if (context == NULL) {
2917 		atomic64_inc(&vdo->hash_zones->dedupe_context_busy);
2918 		continue_data_vio(data_vio);
2919 		return;
2920 	}
2921 
2922 	data_vio->dedupe_context = context;
2923 	context->requestor = data_vio;
2924 	context->submission_jiffies = jiffies;
2925 	prepare_uds_request(&context->request, data_vio, operation);
2926 	atomic_set(&context->state, DEDUPE_CONTEXT_PENDING);
2927 	list_add_tail(&context->list_entry, &zone->pending);
2928 	start_expiration_timer(context);
2929 	result = uds_launch_request(&context->request);
2930 	if (result != UDS_SUCCESS) {
2931 		context->request.status = result;
2932 		finish_index_operation(&context->request);
2933 	}
2934 }
2935 
2936 static void set_target_state(struct hash_zones *zones, enum index_state target,
2937 			     bool change_dedupe, bool dedupe, bool set_create)
2938 {
2939 	const char *old_state, *new_state;
2940 
2941 	spin_lock(&zones->lock);
2942 	old_state = index_state_to_string(zones, zones->index_target);
2943 	if (change_dedupe)
2944 		WRITE_ONCE(zones->dedupe_flag, dedupe);
2945 
2946 	if (set_create)
2947 		zones->create_flag = true;
2948 
2949 	zones->index_target = target;
2950 	launch_dedupe_state_change(zones);
2951 	new_state = index_state_to_string(zones, zones->index_target);
2952 	spin_unlock(&zones->lock);
2953 
2954 	if (old_state != new_state)
2955 		vdo_log_info("Setting UDS index target state to %s", new_state);
2956 }
2957 
2958 const char *vdo_get_dedupe_index_state_name(struct hash_zones *zones)
2959 {
2960 	const char *state;
2961 
2962 	spin_lock(&zones->lock);
2963 	state = index_state_to_string(zones, zones->index_state);
2964 	spin_unlock(&zones->lock);
2965 
2966 	return state;
2967 }
2968 
2969 /* Handle a dmsetup message relevant to the index. */
2970 int vdo_message_dedupe_index(struct hash_zones *zones, const char *name)
2971 {
2972 	if (strcasecmp(name, "index-close") == 0) {
2973 		set_target_state(zones, IS_CLOSED, false, false, false);
2974 		return 0;
2975 	} else if (strcasecmp(name, "index-create") == 0) {
2976 		set_target_state(zones, IS_OPENED, false, false, true);
2977 		return 0;
2978 	} else if (strcasecmp(name, "index-disable") == 0) {
2979 		set_target_state(zones, IS_OPENED, true, false, false);
2980 		return 0;
2981 	} else if (strcasecmp(name, "index-enable") == 0) {
2982 		set_target_state(zones, IS_OPENED, true, true, false);
2983 		return 0;
2984 	}
2985 
2986 	return -EINVAL;
2987 }
2988 
2989 void vdo_set_dedupe_state_normal(struct hash_zones *zones)
2990 {
2991 	vdo_set_admin_state_code(&zones->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
2992 }
2993 
2994 /* If create_flag, create a new index without first attempting to load an existing index. */
2995 void vdo_start_dedupe_index(struct hash_zones *zones, bool create_flag)
2996 {
2997 	set_target_state(zones, IS_OPENED, true, true, create_flag);
2998 }
2999