1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3 * Copyright 2023 Red Hat
4 */
5
6 /**
7 * DOC:
8 *
9 * Hash Locks:
10 *
11 * A hash_lock controls and coordinates writing, index access, and dedupe among groups of data_vios
12 * concurrently writing identical blocks, allowing them to deduplicate not only against advice but
13 * also against each other. This saves on index queries and allows those data_vios to concurrently
14 * deduplicate against a single block instead of being serialized through a PBN read lock. Only one
15 * index query is needed for each hash_lock, instead of one for every data_vio.
16 *
17 * Hash_locks are assigned to hash_zones by computing a modulus on the hash itself. Each hash_zone
18 * has a single dedicated queue and thread for performing all operations on the hash_locks assigned
19 * to that zone. The concurrency guarantees of this single-threaded model allow the code to omit
20 * more fine-grained locking for the hash_lock structures.
21 *
22 * A hash_lock acts more like a state machine than a lock. Other than the starting and
23 * ending states INITIALIZING and BYPASSING, every state represents and is held for the duration of
24 * an asynchronous operation. All state transitions are performed on the thread of the hash_zone
25 * containing the lock. An asynchronous operation is almost always performed upon entering a state,
26 * and the callback from that operation triggers exiting the state and entering a new state.
27 *
28 * In all states except DEDUPING, there is a single data_vio, called the lock agent, performing the
29 * asynchronous operations on behalf of the lock. The agent will change during the lifetime of the
30 * lock if the lock is shared by more than one data_vio. data_vios waiting to deduplicate are kept
31 * on a wait queue. Viewed a different way, the agent holds the lock exclusively until the lock
32 * enters the DEDUPING state, at which point it becomes a shared lock that all the waiters (and any
33 * new data_vios that arrive) use to share a PBN lock. In state DEDUPING, there is no agent. When
34 * the last data_vio in the lock calls back in DEDUPING, it becomes the agent and the lock becomes
35 * exclusive again. New data_vios that arrive in the lock will also go on the wait queue.
36 *
37 * The existence of lock waiters is a key factor controlling which state the lock transitions to
38 * next. When the lock is new or has waiters, it will always try to reach DEDUPING; otherwise, it
39 * will try to clean up and exit.
40 *
41 * Deduping requires holding a PBN lock on a block that is known to contain data identical to the
42 * data_vios in the lock, so the lock will send the agent to the duplicate zone to acquire the PBN
43 * lock (LOCKING), to the kernel I/O threads to read and verify the data (VERIFYING), or to write a
44 * new copy of the data to a full data block or a slot in a compressed block (WRITING).
45 *
46 * Cleaning up consists of updating the index when the data location is different from the initial
47 * index query (UPDATING, triggered by stale advice, compression, and rollover), releasing the PBN
48 * lock on the duplicate block (UNLOCKING), and if the agent is the last data_vio referencing the
49 * lock, releasing the hash_lock itself back to the hash zone (BYPASSING).
50 *
51 * The shortest sequence of states is for non-concurrent writes of new data:
52 * INITIALIZING -> QUERYING -> WRITING -> BYPASSING
53 * This sequence is short because no PBN read lock or index update is needed.
54 *
55 * A non-concurrent write that finds valid advice looks like this (endpoints elided):
56 * -> QUERYING -> LOCKING -> VERIFYING -> DEDUPING -> UNLOCKING ->
57 * Or with stale advice (endpoints elided):
58 * -> QUERYING -> LOCKING -> VERIFYING -> UNLOCKING -> WRITING -> UPDATING ->
59 *
60 * When there are not enough reference count increments available on a PBN for a data_vio
61 * to deduplicate, a new lock is forked and the excess waiters roll over to the new lock (which
62 * goes directly to WRITING). The new lock takes the place of the old lock in the lock map so new
63 * data_vios will be directed to it. The two locks will proceed independently, but only the new
64 * lock will have the right to update the index (unless it also forks).
65 *
66 * Since rollover happens in a lock instance, once a valid data location has been selected, it will
67 * not change. QUERYING and WRITING are only performed once per lock lifetime. All other
68 * non-endpoint states can be re-entered.
69 *
70 * The function names in this module follow a convention referencing the states and transitions in
71 * the state machine. For example, for the LOCKING state, there are start_locking() and
72 * finish_locking() functions. start_locking() is invoked by the finish function of the state (or
73 * states) that transition to LOCKING. It performs the actual lock state change and must be invoked
74 * on the hash zone thread. finish_locking() is called by (or continued via callback from) the
75 * code actually obtaining the lock. It does any bookkeeping or decision-making required and
76 * invokes the appropriate start function of the state being transitioned to after LOCKING.
77 *
78 * ----------------------------------------------------------------------
79 *
80 * Index Queries:
81 *
82 * A query to the UDS index is handled asynchronously by the index's threads. When the query is
83 * complete, a callback supplied with the query will be called from one of those threads. Under
84 * heavy system load, the index may be slower to respond than is desirable for reasonable I/O
85 * throughput. Since deduplication of writes is not necessary for correct operation of a VDO
86 * device, it is acceptable to time out slow index queries and proceed to fulfill a write
87 * request without deduplicating. However, because the uds_request struct itself is supplied by the
88 * caller, we cannot simply reuse a uds_request object which we have chosen to time out. Hence,
89 * each hash_zone maintains a pool of dedupe_contexts which each contain a uds_request along with a
90 * reference to the data_vio on behalf of which they are performing a query.
91 *
92 * When a hash_lock needs to query the index, it attempts to acquire an unused dedupe_context from
93 * its hash_zone's pool. If one is available, that context is prepared, associated with the
94 * hash_lock's agent, added to the list of pending contexts, and then sent to the index. The
95 * context's state will be transitioned from DEDUPE_CONTEXT_IDLE to DEDUPE_CONTEXT_PENDING. If all
96 * goes well, the dedupe callback will be called by the index which will change the context's state
97 * to DEDUPE_CONTEXT_COMPLETE, and the associated data_vio will be enqueued to run back in the hash
98 * zone where the query results will be processed and the context will be put back in the idle
99 * state and returned to the hash_zone's available list.
100 *
101 * The first time an index query is launched from a given hash_zone, a timer is started. When the
102 * timer fires, the hash_zone's completion is enqueued to run in the hash_zone where the zone's
103 * pending list will be searched for any contexts in the pending state which have been running for
104 * too long. Those contexts are transitioned to the DEDUPE_CONTEXT_TIMED_OUT state and moved to the
105 * zone's timed_out list, where they won't be examined again if there is a subsequent timeout. The
106 * data_vios associated with timed out contexts are sent to continue processing their write
107 * operation without deduplicating. The timer is also restarted.
108 *
109 * When the dedupe callback is run for a context which is in the timed out state, that context is
110 * moved to the DEDUPE_CONTEXT_TIMED_OUT_COMPLETE state. No other action need be taken as the
111 * associated data_vios have already been dispatched.
112 *
113 * If a hash_lock needs a dedupe context, and the available list is empty, the timed_out list will
114 * be searched for any contexts which are timed out and complete. One of these will be used
115 * immediately, and the rest will be returned to the available list and marked idle.
116 */
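/*
 * Illustrative sketch (not part of the driver, not compiled): one plausible shape for the
 * "modulus on the hash" zone assignment described above. The helper name
 * example_select_hash_zone() is hypothetical; the driver's own zone-selection code is
 * authoritative and may reduce the record name differently.
 */
#if 0
static zone_count_t example_select_hash_zone(const struct uds_record_name *name,
					     zone_count_t zone_count)
{
	/* Treat the first 8 bytes of the record name as a little-endian integer... */
	u64 hash_prefix = get_unaligned_le64(&name->name);

	/* ...and reduce it modulo the number of zones to pick a hash_zone index. */
	return (zone_count_t) (hash_prefix % zone_count);
}
#endif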
117
118 #include "dedupe.h"
119
120 #include <linux/atomic.h>
121 #include <linux/jiffies.h>
122 #include <linux/kernel.h>
123 #include <linux/list.h>
124 #include <linux/ratelimit.h>
125 #include <linux/spinlock.h>
126 #include <linux/timer.h>
127
128 #include "logger.h"
129 #include "memory-alloc.h"
130 #include "numeric.h"
131 #include "permassert.h"
132 #include "string-utils.h"
133
134 #include "indexer.h"
135
136 #include "action-manager.h"
137 #include "admin-state.h"
138 #include "completion.h"
139 #include "constants.h"
140 #include "data-vio.h"
141 #include "int-map.h"
142 #include "io-submitter.h"
143 #include "packer.h"
144 #include "physical-zone.h"
145 #include "slab-depot.h"
146 #include "statistics.h"
147 #include "types.h"
148 #include "vdo.h"
149 #include "wait-queue.h"
150
151 #define DEDUPE_QUERY_TIMER_IDLE 0
152 #define DEDUPE_QUERY_TIMER_RUNNING 1
153 #define DEDUPE_QUERY_TIMER_FIRED 2
154
155 enum dedupe_context_state {
156 DEDUPE_CONTEXT_IDLE,
157 DEDUPE_CONTEXT_PENDING,
158 DEDUPE_CONTEXT_TIMED_OUT,
159 DEDUPE_CONTEXT_COMPLETE,
160 DEDUPE_CONTEXT_TIMED_OUT_COMPLETE,
161 };
162
163 /* Possible index states: closed, opened, or transitioning between those two. */
164 enum index_state {
165 IS_CLOSED,
166 IS_CHANGING,
167 IS_OPENED,
168 };
169
170 static const char *CLOSED = "closed";
171 static const char *CLOSING = "closing";
172 static const char *ERROR = "error";
173 static const char *OFFLINE = "offline";
174 static const char *ONLINE = "online";
175 static const char *OPENING = "opening";
176 static const char *SUSPENDED = "suspended";
177 static const char *UNKNOWN = "unknown";
178
179 /* Version 2 uses the kernel space UDS index and is limited to 16 bytes */
180 #define UDS_ADVICE_VERSION 2
181 /* version byte + state byte + 64-bit little-endian PBN */
182 #define UDS_ADVICE_SIZE (1 + 1 + sizeof(u64))
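/*
 * Illustrative sketch (not compiled): how a payload within the 16-byte limit could be
 * packed according to the layout documented above: a version byte, a mapping state byte,
 * and a 64-bit little-endian PBN. The helper name example_encode_advice() and its
 * parameters are assumptions for illustration; the driver's own encoding and decoding
 * helpers are authoritative.
 */
#if 0
static void example_encode_advice(u8 buffer[UDS_ADVICE_SIZE], u8 mapping_state, u64 pbn)
{
	buffer[0] = UDS_ADVICE_VERSION;		/* version byte */
	buffer[1] = mapping_state;		/* state byte */
	put_unaligned_le64(pbn, &buffer[2]);	/* 64-bit little-endian PBN */
}
#endif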
183
184 enum hash_lock_state {
185 /* State for locks that are not in use or are being initialized. */
186 VDO_HASH_LOCK_INITIALIZING,
187
188 /* This is the sequence of states typically used on the non-dedupe path. */
189 VDO_HASH_LOCK_QUERYING,
190 VDO_HASH_LOCK_WRITING,
191 VDO_HASH_LOCK_UPDATING,
192
193 /* The remaining states are typically used on the dedupe path in this order. */
194 VDO_HASH_LOCK_LOCKING,
195 VDO_HASH_LOCK_VERIFYING,
196 VDO_HASH_LOCK_DEDUPING,
197 VDO_HASH_LOCK_UNLOCKING,
198
199 /*
200 * Terminal state for locks returning to the pool. Must be last both because it's the final
201 * state, and also because it's used to count the states.
202 */
203 VDO_HASH_LOCK_BYPASSING,
204 };
205
206 static const char * const LOCK_STATE_NAMES[] = {
207 [VDO_HASH_LOCK_BYPASSING] = "BYPASSING",
208 [VDO_HASH_LOCK_DEDUPING] = "DEDUPING",
209 [VDO_HASH_LOCK_INITIALIZING] = "INITIALIZING",
210 [VDO_HASH_LOCK_LOCKING] = "LOCKING",
211 [VDO_HASH_LOCK_QUERYING] = "QUERYING",
212 [VDO_HASH_LOCK_UNLOCKING] = "UNLOCKING",
213 [VDO_HASH_LOCK_UPDATING] = "UPDATING",
214 [VDO_HASH_LOCK_VERIFYING] = "VERIFYING",
215 [VDO_HASH_LOCK_WRITING] = "WRITING",
216 };
217
218 struct hash_lock {
219 /* The block hash covered by this lock */
220 struct uds_record_name hash;
221
222 /* When the lock is unused, this list entry allows the lock to be pooled */
223 struct list_head pool_node;
224
225 /*
226 * A list containing the data VIOs sharing this lock, all having the same record name and
227 * data block contents, linked by their hash_lock_entry fields.
228 */
229 struct list_head duplicate_vios;
230
231 /* The number of data_vios sharing this lock instance */
232 data_vio_count_t reference_count;
233
234 /* The maximum value of reference_count in the lifetime of this lock */
235 data_vio_count_t max_references;
236
237 /* The current state of this lock */
238 enum hash_lock_state state;
239
240 /* True if the UDS index should be updated with new advice */
241 bool update_advice;
242
243 /* True if the advice has been verified to be a true duplicate */
244 bool verified;
245
246 /* True if the lock has already accounted for an initial verification */
247 bool verify_counted;
248
249 /* True if this lock is registered in the lock map (cleared on rollover) */
250 bool registered;
251
252 /*
253 * If verified is false, this is the location of a possible duplicate. If verified is true,
254 * it is the verified location of a true duplicate.
255 */
256 struct zoned_pbn duplicate;
257
258 /* The PBN lock on the block containing the duplicate data */
259 struct pbn_lock *duplicate_lock;
260
261 /* The data_vio designated to act on behalf of the lock */
262 struct data_vio *agent;
263
264 /*
265 * Other data_vios with data identical to the agent's, which are currently waiting for the agent
266 * to get the information they all need to deduplicate--either against each other, or
267 * against an existing duplicate on disk.
268 */
269 struct vdo_wait_queue waiters;
270 };
271
272 #define LOCK_POOL_CAPACITY MAXIMUM_VDO_USER_VIOS
273
274 struct hash_zones {
275 struct action_manager *manager;
276 struct uds_parameters parameters;
277 struct uds_index_session *index_session;
278 struct ratelimit_state ratelimiter;
279 atomic64_t timeouts;
280 atomic64_t dedupe_context_busy;
281
282 /* This spinlock protects the state fields and the starting of dedupe requests. */
283 spinlock_t lock;
284
285 /* The fields in the next block are all protected by the lock */
286 struct vdo_completion completion;
287 enum index_state index_state;
288 enum index_state index_target;
289 struct admin_state state;
290 bool changing;
291 bool create_flag;
292 bool dedupe_flag;
293 bool error_flag;
294 u64 reported_timeouts;
295
296 /* The number of zones */
297 zone_count_t zone_count;
298 /* The hash zones themselves */
299 struct hash_zone zones[];
300 };
301
302 /* These are in milliseconds. */
303 unsigned int vdo_dedupe_index_timeout_interval = 5000;
304 unsigned int vdo_dedupe_index_min_timer_interval = 100;
305 /* Same two variables, in jiffies for easier consumption. */
306 static u64 vdo_dedupe_index_timeout_jiffies;
307 static u64 vdo_dedupe_index_min_timer_jiffies;
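/*
 * Illustrative sketch (not compiled): the jiffies values above would be derived from the
 * millisecond settings with the kernel's standard conversion helper, roughly as below.
 * The function name example_refresh_timeout_jiffies() is hypothetical; the driver's own
 * configuration path is authoritative.
 */
#if 0
static void example_refresh_timeout_jiffies(void)
{
	/* Convert the module-level millisecond intervals to the kernel's timer granularity. */
	vdo_dedupe_index_timeout_jiffies =
		msecs_to_jiffies(vdo_dedupe_index_timeout_interval);
	vdo_dedupe_index_min_timer_jiffies =
		msecs_to_jiffies(vdo_dedupe_index_min_timer_interval);
}
#endif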
308
309 static inline struct hash_zone *as_hash_zone(struct vdo_completion *completion)
310 {
311 vdo_assert_completion_type(completion, VDO_HASH_ZONE_COMPLETION);
312 return container_of(completion, struct hash_zone, completion);
313 }
314
315 static inline struct hash_zones *as_hash_zones(struct vdo_completion *completion)
316 {
317 vdo_assert_completion_type(completion, VDO_HASH_ZONES_COMPLETION);
318 return container_of(completion, struct hash_zones, completion);
319 }
320
321 static inline void assert_in_hash_zone(struct hash_zone *zone, const char *name)
322 {
323 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == zone->thread_id),
324 "%s called on hash zone thread", name);
325 }
326
327 static inline bool change_context_state(struct dedupe_context *context, int old, int new)
328 {
329 return (atomic_cmpxchg(&context->state, old, new) == old);
330 }
331
332 static inline bool change_timer_state(struct hash_zone *zone, int old, int new)
333 {
334 return (atomic_cmpxchg(&zone->timer_state, old, new) == old);
335 }
336
337 /**
338 * return_hash_lock_to_pool() - (Re)initialize a hash lock and return it to its pool.
339 * @zone: The zone from which the lock was borrowed.
340 * @lock: The lock that is no longer in use.
341 */
342 static void return_hash_lock_to_pool(struct hash_zone *zone, struct hash_lock *lock)
343 {
344 memset(lock, 0, sizeof(*lock));
345 INIT_LIST_HEAD(&lock->pool_node);
346 INIT_LIST_HEAD(&lock->duplicate_vios);
347 vdo_waitq_init(&lock->waiters);
348 list_add_tail(&lock->pool_node, &zone->lock_pool);
349 }
350
351 /**
352 * vdo_get_duplicate_lock() - Get the PBN lock on the duplicate data location for a data_vio from
353 * the hash_lock the data_vio holds (if there is one).
354 * @data_vio: The data_vio to query.
355 *
356 * Return: The PBN lock on the data_vio's duplicate location.
357 */
358 struct pbn_lock *vdo_get_duplicate_lock(struct data_vio *data_vio)
359 {
360 if (data_vio->hash_lock == NULL)
361 return NULL;
362
363 return data_vio->hash_lock->duplicate_lock;
364 }
365
366 /**
367 * hash_lock_key() - Return hash_lock's record name as a hash code.
368 * @lock: The hash lock.
369 *
370 * Return: The key to use for the int map.
371 */
372 static inline u64 hash_lock_key(struct hash_lock *lock)
373 {
374 return get_unaligned_le64(&lock->hash.name);
375 }
376
377 /**
378 * get_hash_lock_state_name() - Get the string representation of a hash lock state.
379 * @state: The hash lock state.
380 *
381 * Return: The short string representing the state
382 */
383 static const char *get_hash_lock_state_name(enum hash_lock_state state)
384 {
385 /* Catch if a state has been added without updating the name array. */
386 BUILD_BUG_ON((VDO_HASH_LOCK_BYPASSING + 1) != ARRAY_SIZE(LOCK_STATE_NAMES));
387 return (state < ARRAY_SIZE(LOCK_STATE_NAMES)) ? LOCK_STATE_NAMES[state] : "INVALID";
388 }
389
390 /**
391 * assert_hash_lock_agent() - Assert that a data_vio is the agent of its hash lock, and that this
392 * is being called in the hash zone.
393 * @data_vio: The data_vio expected to be the lock agent.
394 * @where: A string describing the function making the assertion.
395 */
396 static void assert_hash_lock_agent(struct data_vio *data_vio, const char *where)
397 {
398 /* Not safe to access the agent field except from the hash zone. */
399 assert_data_vio_in_hash_zone(data_vio);
400 VDO_ASSERT_LOG_ONLY(data_vio == data_vio->hash_lock->agent,
401 "%s must be for the hash lock agent", where);
402 }
403
404 /**
405 * set_duplicate_lock() - Set the duplicate lock held by a hash lock. May only be called in the
406 * physical zone of the PBN lock.
407 * @hash_lock: The hash lock to update.
408 * @pbn_lock: The PBN read lock to use as the duplicate lock.
409 */
410 static void set_duplicate_lock(struct hash_lock *hash_lock, struct pbn_lock *pbn_lock)
411 {
412 VDO_ASSERT_LOG_ONLY((hash_lock->duplicate_lock == NULL),
413 "hash lock must not already hold a duplicate lock");
414 pbn_lock->holder_count += 1;
415 hash_lock->duplicate_lock = pbn_lock;
416 }
417
418 /**
419 * dequeue_lock_waiter() - Remove the first data_vio from the lock's waitq and return it.
420 * @lock: The lock containing the wait queue.
421 *
422 * Return: The first (oldest) waiter in the queue, or NULL if the queue is empty.
423 */
424 static inline struct data_vio *dequeue_lock_waiter(struct hash_lock *lock)
425 {
426 return vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters));
427 }
428
429 /**
430 * set_hash_lock() - Set, change, or clear the hash lock a data_vio is using.
431 * @data_vio: The data_vio to update.
432 * @new_lock: The hash lock the data_vio is joining.
433 *
434 * Updates the hash lock (or locks) to reflect the change in membership.
435 */
436 static void set_hash_lock(struct data_vio *data_vio, struct hash_lock *new_lock)
437 {
438 struct hash_lock *old_lock = data_vio->hash_lock;
439
440 if (old_lock != NULL) {
441 VDO_ASSERT_LOG_ONLY(data_vio->hash_zone != NULL,
442 "must have a hash zone when holding a hash lock");
443 VDO_ASSERT_LOG_ONLY(!list_empty(&data_vio->hash_lock_entry),
444 "must be on a hash lock list when holding a hash lock");
445 VDO_ASSERT_LOG_ONLY(old_lock->reference_count > 0,
446 "hash lock reference must be counted");
447
448 if ((old_lock->state != VDO_HASH_LOCK_BYPASSING) &&
449 (old_lock->state != VDO_HASH_LOCK_UNLOCKING)) {
450 /*
451 * If the reference count goes to zero in a non-terminal state, we're most
452 * likely leaking this lock.
453 */
454 VDO_ASSERT_LOG_ONLY(old_lock->reference_count > 1,
455 "hash locks should only become unreferenced in a terminal state, not state %s",
456 get_hash_lock_state_name(old_lock->state));
457 }
458
459 list_del_init(&data_vio->hash_lock_entry);
460 old_lock->reference_count -= 1;
461
462 data_vio->hash_lock = NULL;
463 }
464
465 if (new_lock != NULL) {
466 /*
467 * Keep all data_vios sharing the lock on a list since they can complete in any
468 * order and we'll always need a pointer to one to compare data.
469 */
470 list_move_tail(&data_vio->hash_lock_entry, &new_lock->duplicate_vios);
471 new_lock->reference_count += 1;
472 if (new_lock->max_references < new_lock->reference_count)
473 new_lock->max_references = new_lock->reference_count;
474
475 data_vio->hash_lock = new_lock;
476 }
477 }
478
479 /* There are loops in the state diagram, so some forward decl's are needed. */
480 static void start_deduping(struct hash_lock *lock, struct data_vio *agent,
481 bool agent_is_done);
482 static void start_locking(struct hash_lock *lock, struct data_vio *agent);
483 static void start_writing(struct hash_lock *lock, struct data_vio *agent);
484 static void unlock_duplicate_pbn(struct vdo_completion *completion);
485 static void transfer_allocation_lock(struct data_vio *data_vio);
486
487 /**
488 * exit_hash_lock() - Bottleneck for data_vios that have written or deduplicated and that are no
489 * longer needed as the agent for the hash lock.
490 * @data_vio: The data_vio to complete and send to be cleaned up.
491 */
492 static void exit_hash_lock(struct data_vio *data_vio)
493 {
494 /* Release the hash lock now, saving a thread transition in cleanup. */
495 vdo_release_hash_lock(data_vio);
496
497 /* Complete the data_vio and start the clean-up path to release any locks it still holds. */
498 data_vio->vio.completion.callback = complete_data_vio;
499
500 continue_data_vio(data_vio);
501 }
502
503 /**
504 * set_duplicate_location() - Set the location of the duplicate block for data_vio, updating the
505 * is_duplicate and duplicate fields from a zoned_pbn.
506 * @data_vio: The data_vio to modify.
507 * @source: The location of the duplicate.
508 */
509 static void set_duplicate_location(struct data_vio *data_vio,
510 const struct zoned_pbn source)
511 {
512 data_vio->is_duplicate = (source.pbn != VDO_ZERO_BLOCK);
513 data_vio->duplicate = source;
514 }
515
516 /**
517 * retire_lock_agent() - Retire the active lock agent, replacing it with the first lock waiter, and
518 * make the retired agent exit the hash lock.
519 * @lock: The hash lock to update.
520 *
521 * Return: The new lock agent (which will be NULL if there was no waiter)
522 */
523 static struct data_vio *retire_lock_agent(struct hash_lock *lock)
524 {
525 struct data_vio *old_agent = lock->agent;
526 struct data_vio *new_agent = dequeue_lock_waiter(lock);
527
528 lock->agent = new_agent;
529 exit_hash_lock(old_agent);
530 if (new_agent != NULL)
531 set_duplicate_location(new_agent, lock->duplicate);
532 return new_agent;
533 }
534
535 /**
536 * wait_on_hash_lock() - Add a data_vio to the lock's queue of waiters.
537 * @lock: The hash lock on which to wait.
538 * @data_vio: The data_vio to add to the queue.
539 */
540 static void wait_on_hash_lock(struct hash_lock *lock, struct data_vio *data_vio)
541 {
542 vdo_waitq_enqueue_waiter(&lock->waiters, &data_vio->waiter);
543
544 /*
545 * Make sure the agent doesn't block indefinitely in the packer since it now has at least
546 * one other data_vio waiting on it.
547 */
548 if ((lock->state != VDO_HASH_LOCK_WRITING) || !cancel_data_vio_compression(lock->agent))
549 return;
550
551 /*
552 * Even though we're waiting, we also have to send ourselves as a one-way message to the
553 * packer to ensure the agent continues executing. This is safe because
554 * cancel_data_vio_compression() guarantees the agent won't continue executing until this
555 * message arrives in the packer, and because the wait queue link isn't used for sending
556 * the message.
557 */
558 data_vio->compression.lock_holder = lock->agent;
559 launch_data_vio_packer_callback(data_vio, vdo_remove_lock_holder_from_packer);
560 }
561
562 /**
563 * abort_waiter() - waiter_callback_fn function that shunts waiters to write their blocks without
564 * optimization.
565 * @waiter: The data_vio's waiter link.
566 * @context: Not used.
567 */
568 static void abort_waiter(struct vdo_waiter *waiter, void __always_unused *context)
569 {
570 write_data_vio(vdo_waiter_as_data_vio(waiter));
571 }
572
573 /**
574 * start_bypassing() - Stop using the hash lock.
575 * @lock: The hash lock.
576 * @agent: The data_vio acting as the agent for the lock.
577 *
578 * Stops using the hash lock. This is the final transition for hash locks which did not get an
579 * error.
580 */
581 static void start_bypassing(struct hash_lock *lock, struct data_vio *agent)
582 {
583 lock->state = VDO_HASH_LOCK_BYPASSING;
584 exit_hash_lock(agent);
585 }
586
587 void vdo_clean_failed_hash_lock(struct data_vio *data_vio)
588 {
589 struct hash_lock *lock = data_vio->hash_lock;
590
591 if (lock->state == VDO_HASH_LOCK_BYPASSING) {
592 exit_hash_lock(data_vio);
593 return;
594 }
595
596 if (lock->agent == NULL) {
597 lock->agent = data_vio;
598 } else if (data_vio != lock->agent) {
599 exit_hash_lock(data_vio);
600 return;
601 }
602
603 lock->state = VDO_HASH_LOCK_BYPASSING;
604
605 /* Ensure we don't attempt to update advice when cleaning up. */
606 lock->update_advice = false;
607
608 vdo_waitq_notify_all_waiters(&lock->waiters, abort_waiter, NULL);
609
610 if (lock->duplicate_lock != NULL) {
611 /* The agent must reference the duplicate zone to launch it. */
612 data_vio->duplicate = lock->duplicate;
613 launch_data_vio_duplicate_zone_callback(data_vio, unlock_duplicate_pbn);
614 return;
615 }
616
617 lock->agent = NULL;
618 data_vio->is_duplicate = false;
619 exit_hash_lock(data_vio);
620 }
621
622 /**
623 * finish_unlocking() - Handle the result of the agent for the lock releasing a read lock on
624 * the duplicate candidate.
625 * @completion: The completion of the data_vio acting as the lock's agent.
626 *
627 * This continuation is registered in unlock_duplicate_pbn().
628 */
629 static void finish_unlocking(struct vdo_completion *completion)
630 {
631 struct data_vio *agent = as_data_vio(completion);
632 struct hash_lock *lock = agent->hash_lock;
633
634 assert_hash_lock_agent(agent, __func__);
635
636 VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
637 "must have released the duplicate lock for the hash lock");
638
639 if (!lock->verified) {
640 /*
641 * UNLOCKING -> WRITING transition: The lock we released was on an unverified
642 * block, so it must have been a lock on advice we were verifying, not on a
643 * location that was used for deduplication. Go write (or compress) the block to
644 * get a location to dedupe against.
645 */
646 start_writing(lock, agent);
647 return;
648 }
649
650 /*
651 * With the lock released, the verified duplicate block may already have changed and will
652 * need to be re-verified if a waiter arrived.
653 */
654 lock->verified = false;
655
656 if (vdo_waitq_has_waiters(&lock->waiters)) {
657 /*
658 * UNLOCKING -> LOCKING transition: A new data_vio entered the hash lock while the
659 * agent was releasing the PBN lock. The current agent exits and the waiter has to
660 * re-lock and re-verify the duplicate location.
661 *
662 * TODO: If we used the current agent to re-acquire the PBN lock we wouldn't need
663 * to re-verify.
664 */
665 agent = retire_lock_agent(lock);
666 start_locking(lock, agent);
667 return;
668 }
669
670 /*
671 * UNLOCKING -> BYPASSING transition: The agent is done with the lock and no other
672 * data_vios reference it, so remove it from the lock map and return it to the pool.
673 */
674 start_bypassing(lock, agent);
675 }
676
677 /**
678 * unlock_duplicate_pbn() - Release a read lock on the PBN of the block that may or may not have
679 * contained duplicate data.
680 * @completion: The completion of the data_vio acting as the lock's agent.
681 *
682 * This continuation is launched by start_unlocking(), and calls back to finish_unlocking() on the
683 * hash zone thread.
684 */
685 static void unlock_duplicate_pbn(struct vdo_completion *completion)
686 {
687 struct data_vio *agent = as_data_vio(completion);
688 struct hash_lock *lock = agent->hash_lock;
689
690 assert_data_vio_in_duplicate_zone(agent);
691 VDO_ASSERT_LOG_ONLY(lock->duplicate_lock != NULL,
692 "must have a duplicate lock to release");
693
694 vdo_release_physical_zone_pbn_lock(agent->duplicate.zone, agent->duplicate.pbn,
695 vdo_forget(lock->duplicate_lock));
696 if (lock->state == VDO_HASH_LOCK_BYPASSING) {
697 complete_data_vio(completion);
698 return;
699 }
700
701 launch_data_vio_hash_zone_callback(agent, finish_unlocking);
702 }
703
704 /**
705 * start_unlocking() - Release a read lock on the PBN of the block that may or may not have
706 * contained duplicate data.
707 * @lock: The hash lock.
708 * @agent: The data_vio currently acting as the agent for the lock.
709 */
710 static void start_unlocking(struct hash_lock *lock, struct data_vio *agent)
711 {
712 lock->state = VDO_HASH_LOCK_UNLOCKING;
713 launch_data_vio_duplicate_zone_callback(agent, unlock_duplicate_pbn);
714 }
715
716 static void release_context(struct dedupe_context *context)
717 {
718 struct hash_zone *zone = context->zone;
719
720 WRITE_ONCE(zone->active, zone->active - 1);
721 list_move(&context->list_entry, &zone->available);
722 }
723
724 static void process_update_result(struct data_vio *agent)
725 {
726 struct dedupe_context *context = agent->dedupe_context;
727
728 if ((context == NULL) ||
729 !change_context_state(context, DEDUPE_CONTEXT_COMPLETE, DEDUPE_CONTEXT_IDLE))
730 return;
731
732 agent->dedupe_context = NULL;
733 release_context(context);
734 }
735
736 /**
737 * finish_updating() - Process the result of a UDS update performed by the agent for the lock.
738 * @completion: The completion of the data_vio that performed the update
739 *
740 * This continuation is registered in start_updating().
741 */
742 static void finish_updating(struct vdo_completion *completion)
743 {
744 struct data_vio *agent = as_data_vio(completion);
745 struct hash_lock *lock = agent->hash_lock;
746
747 assert_hash_lock_agent(agent, __func__);
748
749 process_update_result(agent);
750
751 /*
752 * UDS was updated successfully, so don't update again unless the duplicate location
753 * changes due to rollover.
754 */
755 lock->update_advice = false;
756
757 if (vdo_waitq_has_waiters(&lock->waiters)) {
758 /*
759 * UPDATING -> DEDUPING transition: A new data_vio arrived during the UDS update.
760 * Send it on the verified dedupe path. The agent is done with the lock, but the
761 * lock may still need to use it to clean up after rollover.
762 */
763 start_deduping(lock, agent, true);
764 return;
765 }
766
767 if (lock->duplicate_lock != NULL) {
768 /*
769 * UPDATING -> UNLOCKING transition: No one is waiting to dedupe, but we hold a
770 * duplicate PBN lock, so go release it.
771 */
772 start_unlocking(lock, agent);
773 return;
774 }
775
776 /*
777 * UPDATING -> BYPASSING transition: No one is waiting to dedupe and there's no lock to
778 * release.
779 */
780 start_bypassing(lock, agent);
781 }
782
783 static void query_index(struct data_vio *data_vio, enum uds_request_type operation);
784
785 /**
786 * start_updating() - Continue deduplication with the last step, updating UDS with the location of
787 * the duplicate that should be returned as advice in the future.
788 * @lock: The hash lock.
789 * @agent: The data_vio currently acting as the agent for the lock.
790 */
791 static void start_updating(struct hash_lock *lock, struct data_vio *agent)
792 {
793 lock->state = VDO_HASH_LOCK_UPDATING;
794
795 VDO_ASSERT_LOG_ONLY(lock->verified, "new advice should have been verified");
796 VDO_ASSERT_LOG_ONLY(lock->update_advice, "should only update advice if needed");
797
798 agent->last_async_operation = VIO_ASYNC_OP_UPDATE_DEDUPE_INDEX;
799 set_data_vio_hash_zone_callback(agent, finish_updating);
800 query_index(agent, UDS_UPDATE);
801 }
802
803 /**
804 * finish_deduping() - Handle a data_vio that has finished deduplicating against the block locked
805 * by the hash lock.
806 * @lock: The hash lock.
807 * @data_vio: The lock holder that has finished deduplicating.
808 *
809 * If there are other data_vios still sharing the lock, this will just release the data_vio's share
810 * of the lock and finish processing the data_vio. If this is the last data_vio holding the lock,
811 * this makes the data_vio the lock agent and uses it to advance the state of the lock so it can
812 * eventually be released.
813 */
814 static void finish_deduping(struct hash_lock *lock, struct data_vio *data_vio)
815 {
816 struct data_vio *agent = data_vio;
817
818 VDO_ASSERT_LOG_ONLY(lock->agent == NULL, "shouldn't have an agent in DEDUPING");
819 VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&lock->waiters),
820 "shouldn't have any lock waiters in DEDUPING");
821
822 /* Just release the lock reference if other data_vios are still deduping. */
823 if (lock->reference_count > 1) {
824 exit_hash_lock(data_vio);
825 return;
826 }
827
828 /* The hash lock must have an agent for all other lock states. */
829 lock->agent = agent;
830 if (lock->update_advice) {
831 /*
832 * DEDUPING -> UPDATING transition: The location of the duplicate block changed
833 * since the initial UDS query because of compression, rollover, or because the
834 * query agent didn't have an allocation. The UDS update was delayed in case there
835 * was another change in location, but with only this data_vio using the hash lock,
836 * it's time to update the advice.
837 */
838 start_updating(lock, agent);
839 } else {
840 /*
841 * DEDUPING -> UNLOCKING transition: Release the PBN read lock on the duplicate
842 * location so the hash lock itself can be released (contingent on no new data_vios
843 * arriving in the lock before the agent returns).
844 */
845 start_unlocking(lock, agent);
846 }
847 }
848
849 /**
850 * acquire_lock() - Get the lock for a record name.
851 * @zone: The zone responsible for the hash.
852 * @hash: The hash to lock.
853 * @replace_lock: If non-NULL, the lock already registered for the hash which should be replaced by
854 * the new lock.
855 * @lock_ptr: A pointer to receive the hash lock.
856 *
857 * Gets the lock for the hash (record name) of the data in a data_vio, or if one does not exist (or
858 * if we are explicitly rolling over), initialize a new lock for the hash and register it in the
859 * zone. This must only be called in the correct thread for the zone.
860 *
861 * Return: VDO_SUCCESS or an error code.
862 */
863 static int __must_check acquire_lock(struct hash_zone *zone,
864 const struct uds_record_name *hash,
865 struct hash_lock *replace_lock,
866 struct hash_lock **lock_ptr)
867 {
868 struct hash_lock *lock, *new_lock;
869 int result;
870
871 /*
872 * Borrow and prepare a lock from the pool so we don't have to do two int_map accesses
873 * in the common case of no lock contention.
874 */
875 result = VDO_ASSERT(!list_empty(&zone->lock_pool),
876 "never need to wait for a free hash lock");
877 if (result != VDO_SUCCESS)
878 return result;
879
880 new_lock = list_entry(zone->lock_pool.prev, struct hash_lock, pool_node);
881 list_del_init(&new_lock->pool_node);
882
883 /*
884 * Fill in the hash of the new lock so we can map it, since we have to use the hash as the
885 * map key.
886 */
887 new_lock->hash = *hash;
888
889 result = vdo_int_map_put(zone->hash_lock_map, hash_lock_key(new_lock),
890 new_lock, (replace_lock != NULL), (void **) &lock);
891 if (result != VDO_SUCCESS) {
892 return_hash_lock_to_pool(zone, vdo_forget(new_lock));
893 return result;
894 }
895
896 if (replace_lock != NULL) {
897 /* On mismatch put the old lock back and return a severe error */
898 VDO_ASSERT_LOG_ONLY(lock == replace_lock,
899 "old lock must have been in the lock map");
900 /* TODO: Check earlier and bail out? */
901 VDO_ASSERT_LOG_ONLY(replace_lock->registered,
902 "old lock must have been marked registered");
903 replace_lock->registered = false;
904 }
905
906 if (lock == replace_lock) {
907 lock = new_lock;
908 lock->registered = true;
909 } else {
910 /* There's already a lock for the hash, so we don't need the borrowed lock. */
911 return_hash_lock_to_pool(zone, vdo_forget(new_lock));
912 }
913
914 *lock_ptr = lock;
915 return VDO_SUCCESS;
916 }
917
918 /**
919 * enter_forked_lock() - Bind the data_vio to a new hash lock.
920 * @waiter: The data_vio's waiter link.
921 * @context: The new hash lock.
922 *
923 * Implements waiter_callback_fn. Binds the data_vio that was waiting to a new hash lock and waits
924 * on that lock.
925 */
926 static void enter_forked_lock(struct vdo_waiter *waiter, void *context)
927 {
928 struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
929 struct hash_lock *new_lock = context;
930
931 set_hash_lock(data_vio, new_lock);
932 wait_on_hash_lock(new_lock, data_vio);
933 }
934
935 /**
936 * fork_hash_lock() - Fork a hash lock because it has run out of increments on the duplicate PBN.
937 * @old_lock: The hash lock to fork.
938 * @new_agent: The data_vio that will be the agent for the new lock.
939 *
940 * Transfers the new agent and any lock waiters to a new hash lock instance which takes the place
941 * of the old lock in the lock map. The old lock remains active, but will not update advice.
942 */
943 static void fork_hash_lock(struct hash_lock *old_lock, struct data_vio *new_agent)
944 {
945 struct hash_lock *new_lock;
946 int result;
947
948 result = acquire_lock(new_agent->hash_zone, &new_agent->record_name, old_lock,
949 &new_lock);
950 if (result != VDO_SUCCESS) {
951 continue_data_vio_with_error(new_agent, result);
952 return;
953 }
954
955 /*
956 * Only one of the two locks should update UDS. The old lock is out of references, so it
957 * would be poor dedupe advice in the short term.
958 */
959 old_lock->update_advice = false;
960 new_lock->update_advice = true;
961
962 set_hash_lock(new_agent, new_lock);
963 new_lock->agent = new_agent;
964
965 vdo_waitq_notify_all_waiters(&old_lock->waiters, enter_forked_lock, new_lock);
966
967 new_agent->is_duplicate = false;
968 start_writing(new_lock, new_agent);
969 }
970
971 /**
972 * launch_dedupe() - Reserve a reference count increment for a data_vio and launch it on the dedupe
973 * path.
974 * @lock: The hash lock.
975 * @data_vio: The data_vio to deduplicate using the hash lock.
976 * @has_claim: True if the data_vio already has claimed an increment from the duplicate lock.
977 *
978 * If no increments are available, this will roll over to a new hash lock and launch the data_vio
979 * as the writing agent for that lock.
980 */
981 static void launch_dedupe(struct hash_lock *lock, struct data_vio *data_vio,
982 bool has_claim)
983 {
984 if (!has_claim && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
985 /* Out of increments, so must roll over to a new lock. */
986 fork_hash_lock(lock, data_vio);
987 return;
988 }
989
990 /* Deduplicate against the lock's verified location. */
991 set_duplicate_location(data_vio, lock->duplicate);
992 data_vio->new_mapped = data_vio->duplicate;
993 update_metadata_for_data_vio_write(data_vio, lock->duplicate_lock);
994 }
995
996 /**
997 * start_deduping() - Enter the hash lock state where data_vios deduplicate in parallel against a
998 * true copy of their data on disk.
999 * @lock: The hash lock.
1000 * @agent: The data_vio acting as the agent for the lock.
1001 * @agent_is_done: True only if the agent has already written or deduplicated against its data.
1002 *
1003 * If the agent itself needs to deduplicate, an increment for it must already have been claimed
1004 * from the duplicate lock, ensuring the hash lock will still have a data_vio holding it.
1005 */
1006 static void start_deduping(struct hash_lock *lock, struct data_vio *agent,
1007 bool agent_is_done)
1008 {
1009 lock->state = VDO_HASH_LOCK_DEDUPING;
1010
1011 /*
1012 * We don't take the downgraded allocation lock from the agent unless we actually need to
1013 * deduplicate against it.
1014 */
1015 if (lock->duplicate_lock == NULL) {
1016 VDO_ASSERT_LOG_ONLY(!vdo_is_state_compressed(agent->new_mapped.state),
1017 "compression must have shared a lock");
1018 VDO_ASSERT_LOG_ONLY(agent_is_done,
1019 "agent must have written the new duplicate");
1020 transfer_allocation_lock(agent);
1021 }
1022
1023 VDO_ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(lock->duplicate_lock),
1024 "duplicate_lock must be a PBN read lock");
1025
1026 /*
1027 * This state is not like any of the other states. There is no designated agent--the agent
1028 * transitioning to this state and all the waiters will be launched to deduplicate in
1029 * parallel.
1030 */
1031 lock->agent = NULL;
1032
1033 /*
1034 * Launch the agent (if not already deduplicated) and as many lock waiters as we have
1035 * available increments for on the dedupe path. If we run out of increments, rollover will
1036 * be triggered and the remaining waiters will be transferred to the new lock.
1037 */
1038 if (!agent_is_done) {
1039 launch_dedupe(lock, agent, true);
1040 agent = NULL;
1041 }
1042 while (vdo_waitq_has_waiters(&lock->waiters))
1043 launch_dedupe(lock, dequeue_lock_waiter(lock), false);
1044
1045 if (agent_is_done) {
1046 /*
1047 * In the degenerate case where all the waiters rolled over to a new lock, this
1048 * will continue to use the old agent to clean up this lock, and otherwise it just
1049 * lets the agent exit the lock.
1050 */
1051 finish_deduping(lock, agent);
1052 }
1053 }
1054
1055 /**
1056 * increment_stat() - Increment a statistic counter in a non-atomic yet thread-safe manner.
1057 * @stat: The statistic field to increment.
1058 */
1059 static inline void increment_stat(u64 *stat)
1060 {
1061 /*
1062 * Must only be mutated on the hash zone thread. Prevents any compiler shenanigans from
1063 * affecting other threads reading stats.
1064 */
1065 WRITE_ONCE(*stat, *stat + 1);
1066 }
1067
1068 /**
1069 * finish_verifying() - Handle the result of the agent for the lock comparing its data to the
1070 * duplicate candidate.
1071 * @completion: The completion of the data_vio used to verify dedupe
1072 *
1073 * This continuation is registered in start_verifying().
1074 */
1075 static void finish_verifying(struct vdo_completion *completion)
1076 {
1077 struct data_vio *agent = as_data_vio(completion);
1078 struct hash_lock *lock = agent->hash_lock;
1079
1080 assert_hash_lock_agent(agent, __func__);
1081
1082 lock->verified = agent->is_duplicate;
1083
1084 /*
1085 * Only count the result of the initial verification of the advice as valid or stale, and
1086 * not any re-verifications due to PBN lock releases.
1087 */
1088 if (!lock->verify_counted) {
1089 lock->verify_counted = true;
1090 if (lock->verified)
1091 increment_stat(&agent->hash_zone->statistics.dedupe_advice_valid);
1092 else
1093 increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale);
1094 }
1095
1096 /*
1097 * Even if the block is a verified duplicate, we can't start to deduplicate unless we can
1098 * claim a reference count increment for the agent.
1099 */
1100 if (lock->verified && !vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
1101 agent->is_duplicate = false;
1102 lock->verified = false;
1103 }
1104
1105 if (lock->verified) {
1106 /*
1107 * VERIFYING -> DEDUPING transition: The advice is for a true duplicate, so start
1108 * deduplicating against it, if references are available.
1109 */
1110 start_deduping(lock, agent, false);
1111 } else {
1112 /*
1113 * VERIFYING -> UNLOCKING transition: Either the verify failed or we'd try to
1114 * dedupe and roll over immediately, which would fail because it would leave the
1115 * lock without an agent to release the PBN lock. In both cases, the data will have
1116 * to be written or compressed, but first the advice PBN must be unlocked by the
1117 * VERIFYING agent.
1118 */
1119 lock->update_advice = true;
1120 start_unlocking(lock, agent);
1121 }
1122 }
1123
1124 static bool blocks_equal(char *block1, char *block2)
1125 {
1126 int i;
1127
1128 for (i = 0; i < VDO_BLOCK_SIZE; i += sizeof(u64)) {
1129 if (*((u64 *) &block1[i]) != *((u64 *) &block2[i]))
1130 return false;
1131 }
1132
1133 return true;
1134 }
1135
1136 static void verify_callback(struct vdo_completion *completion)
1137 {
1138 struct data_vio *agent = as_data_vio(completion);
1139
1140 agent->is_duplicate = blocks_equal(agent->vio.data, agent->scratch_block);
1141 launch_data_vio_hash_zone_callback(agent, finish_verifying);
1142 }
1143
1144 static void uncompress_and_verify(struct vdo_completion *completion)
1145 {
1146 struct data_vio *agent = as_data_vio(completion);
1147 int result;
1148
1149 result = uncompress_data_vio(agent, agent->duplicate.state,
1150 agent->scratch_block);
1151 if (result == VDO_SUCCESS) {
1152 verify_callback(completion);
1153 return;
1154 }
1155
1156 agent->is_duplicate = false;
1157 launch_data_vio_hash_zone_callback(agent, finish_verifying);
1158 }
1159
1160 static void verify_endio(struct bio *bio)
1161 {
1162 struct data_vio *agent = vio_as_data_vio(bio->bi_private);
1163 int result = blk_status_to_errno(bio->bi_status);
1164
1165 vdo_count_completed_bios(bio);
1166 if (result != VDO_SUCCESS) {
1167 agent->is_duplicate = false;
1168 launch_data_vio_hash_zone_callback(agent, finish_verifying);
1169 return;
1170 }
1171
1172 if (vdo_is_state_compressed(agent->duplicate.state)) {
1173 launch_data_vio_cpu_callback(agent, uncompress_and_verify,
1174 CPU_Q_COMPRESS_BLOCK_PRIORITY);
1175 return;
1176 }
1177
1178 launch_data_vio_cpu_callback(agent, verify_callback,
1179 CPU_Q_COMPLETE_READ_PRIORITY);
1180 }
1181
1182 /**
1183 * start_verifying() - Begin the data verification phase.
1184 * @lock: The hash lock (must be LOCKING).
1185 * @agent: The data_vio to use to read and compare candidate data.
1186 *
1187 * Continue the deduplication path for a hash lock by using the agent to read (and possibly
1188 * decompress) the data at the candidate duplicate location, comparing it to the data in the agent
1189 * to verify that the candidate is identical to all the data_vios sharing the hash. If so, it can
1190 * be deduplicated against; otherwise, a data_vio allocation will have to be written to and used for
1191 * dedupe.
1192 */
1193 static void start_verifying(struct hash_lock *lock, struct data_vio *agent)
1194 {
1195 int result;
1196 struct vio *vio = &agent->vio;
1197 char *buffer = (vdo_is_state_compressed(agent->duplicate.state) ?
1198 (char *) agent->compression.block :
1199 agent->scratch_block);
1200
1201 lock->state = VDO_HASH_LOCK_VERIFYING;
1202 VDO_ASSERT_LOG_ONLY(!lock->verified, "hash lock only verifies advice once");
1203
1204 agent->last_async_operation = VIO_ASYNC_OP_VERIFY_DUPLICATION;
1205 result = vio_reset_bio(vio, buffer, verify_endio, REQ_OP_READ,
1206 agent->duplicate.pbn);
1207 if (result != VDO_SUCCESS) {
1208 set_data_vio_hash_zone_callback(agent, finish_verifying);
1209 continue_data_vio_with_error(agent, result);
1210 return;
1211 }
1212
1213 set_data_vio_bio_zone_callback(agent, vdo_submit_vio);
1214 vdo_launch_completion_with_priority(&vio->completion, BIO_Q_VERIFY_PRIORITY);
1215 }
1216
1217 /**
1218 * finish_locking() - Handle the result of the agent for the lock attempting to obtain a PBN read
1219 * lock on the candidate duplicate block.
1220 * @completion: The completion of the data_vio that attempted to get the read lock.
1221 *
1222 * This continuation is registered in lock_duplicate_pbn().
1223 */
1224 static void finish_locking(struct vdo_completion *completion)
1225 {
1226 struct data_vio *agent = as_data_vio(completion);
1227 struct hash_lock *lock = agent->hash_lock;
1228
1229 assert_hash_lock_agent(agent, __func__);
1230
1231 if (!agent->is_duplicate) {
1232 VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
1233 "must not hold duplicate_lock if not flagged as a duplicate");
1234 /*
1235 * LOCKING -> WRITING transition: The advice block is being modified or has no
1236 * available references, so try to write or compress the data, remembering to
1237 * update UDS later with the new advice.
1238 */
1239 increment_stat(&agent->hash_zone->statistics.dedupe_advice_stale);
1240 lock->update_advice = true;
1241 start_writing(lock, agent);
1242 return;
1243 }
1244
1245 VDO_ASSERT_LOG_ONLY(lock->duplicate_lock != NULL,
1246 "must hold duplicate_lock if flagged as a duplicate");
1247
1248 if (!lock->verified) {
1249 /*
1250 * LOCKING -> VERIFYING transition: Continue on the unverified dedupe path, reading
1251 * the candidate duplicate and comparing it to the agent's data to decide whether
1252 * it is a true duplicate or stale advice.
1253 */
1254 start_verifying(lock, agent);
1255 return;
1256 }
1257
1258 if (!vdo_claim_pbn_lock_increment(lock->duplicate_lock)) {
1259 /*
1260 * LOCKING -> UNLOCKING transition: The verified block was re-locked, but has no
1261 * available increments left. Must first release the useless PBN read lock before
1262 * rolling over to a new copy of the block.
1263 */
1264 agent->is_duplicate = false;
1265 lock->verified = false;
1266 lock->update_advice = true;
1267 start_unlocking(lock, agent);
1268 return;
1269 }
1270
1271 /*
1272 * LOCKING -> DEDUPING transition: Continue on the verified dedupe path, deduplicating
1273 * against a location that was previously verified or written to.
1274 */
1275 start_deduping(lock, agent, false);
1276 }
1277
1278 static bool acquire_provisional_reference(struct data_vio *agent, struct pbn_lock *lock,
1279 struct slab_depot *depot)
1280 {
1281 /* Ensure that the newly-locked block is referenced. */
1282 struct vdo_slab *slab = vdo_get_slab(depot, agent->duplicate.pbn);
1283 int result = vdo_acquire_provisional_reference(slab, agent->duplicate.pbn, lock);
1284
1285 if (result == VDO_SUCCESS)
1286 return true;
1287
1288 vdo_log_warning_strerror(result,
1289 "Error acquiring provisional reference for dedupe candidate; aborting dedupe");
1290 agent->is_duplicate = false;
1291 vdo_release_physical_zone_pbn_lock(agent->duplicate.zone,
1292 agent->duplicate.pbn, lock);
1293 continue_data_vio_with_error(agent, result);
1294 return false;
1295 }
1296
1297 /**
1298 * lock_duplicate_pbn() - Acquire a read lock on the PBN of the block containing candidate
1299 * duplicate data (compressed or uncompressed).
1300 * @completion: The completion of the data_vio attempting to acquire the physical block lock on
1301 * behalf of its hash lock.
1302 *
1303 * If the PBN is already locked for writing, the lock attempt is abandoned and is_duplicate will be
1304 * cleared before calling back. This continuation is launched from start_locking(), and calls back
1305 * to finish_locking() on the hash zone thread.
1306 */
1307 static void lock_duplicate_pbn(struct vdo_completion *completion)
1308 {
1309 unsigned int increment_limit;
1310 struct pbn_lock *lock;
1311 int result;
1312
1313 struct data_vio *agent = as_data_vio(completion);
1314 struct slab_depot *depot = vdo_from_data_vio(agent)->depot;
1315 struct physical_zone *zone = agent->duplicate.zone;
1316
1317 assert_data_vio_in_duplicate_zone(agent);
1318
1319 set_data_vio_hash_zone_callback(agent, finish_locking);
1320
1321 /*
1322 * While in the zone that owns it, find out how many additional references can be made to
1323 * the block if it turns out to truly be a duplicate.
1324 */
1325 increment_limit = vdo_get_increment_limit(depot, agent->duplicate.pbn);
1326 if (increment_limit == 0) {
1327 /*
1328 * We could deduplicate against it later if a reference happened to be released
1329 * during verification, but it's probably better to bail out now.
1330 */
1331 agent->is_duplicate = false;
1332 continue_data_vio(agent);
1333 return;
1334 }
1335
1336 result = vdo_attempt_physical_zone_pbn_lock(zone, agent->duplicate.pbn,
1337 VIO_READ_LOCK, &lock);
1338 if (result != VDO_SUCCESS) {
1339 continue_data_vio_with_error(agent, result);
1340 return;
1341 }
1342
1343 if (!vdo_is_pbn_read_lock(lock)) {
1344 /*
1345 * There are three cases of write locks: uncompressed data block writes, compressed
1346 * (packed) block writes, and block map page writes. In all three cases, we give up
1347 * on trying to verify the advice and don't bother to try to deduplicate against the
1348 * data in the write lock holder.
1349 *
1350 * 1) We don't ever want to try to deduplicate against a block map page.
1351 *
1352 * 2a) It's very unlikely we'd deduplicate against an entire packed block, both
1353 * because the chance of matching it is so small, and because we don't record advice for it,
1354 * but for the uncompressed representation of all the fragments it contains. The
1355 * only way we'd be getting lock contention is if we've written the same
1356 * representation coincidentally before, had it become unreferenced, and it just
1357 * happened to be packed together from compressed writes when we go to verify the
1358 * lucky advice. Giving up is a minuscule loss of potential dedupe.
1359 *
1360 * 2b) If the advice is for a slot of a compressed block, it's about to get
1361 * smashed, and the write smashing it cannot contain our data--it would have to be
1362 * writing on behalf of our hash lock, but that's impossible since we're the lock
1363 * agent.
1364 *
1365 * 3a) If the lock is held by a data_vio with different data, the advice is already
1366 * stale or is about to become stale.
1367 *
1368 * 3b) If the lock is held by a data_vio that matches us, we may as well either
1369 * write it ourselves (or reference the copy we already wrote) instead of
1370 * potentially having many duplicates wait for the lock holder to write, journal,
1371 * hash, and finally arrive in the hash lock. We lose a chance to avoid a UDS
1372 * update in the very rare case of advice for a free block that just happened to be
1373 * allocated to a data_vio with the same hash. There's also a chance to save on a
1374 * block write, at the cost of a block verify. Saving on a full block compare in
1375 * all stale advice cases almost certainly outweighs saving a UDS update and
1376 * trading a write for a read in a lucky case where advice would have been saved
1377 * from becoming stale.
1378 */
1379 agent->is_duplicate = false;
1380 continue_data_vio(agent);
1381 return;
1382 }
1383
1384 if (lock->holder_count == 0) {
1385 if (!acquire_provisional_reference(agent, lock, depot))
1386 return;
1387
1388 /*
1389 * The increment limit we grabbed earlier is still valid. The lock now holds the
1390 * rights to acquire all those references. Those rights will be claimed by hash
1391 * locks sharing this read lock.
1392 */
1393 lock->increment_limit = increment_limit;
1394 }
1395
1396 /*
1397 * We've successfully acquired a read lock on behalf of the hash lock, so mark it as such.
1398 */
1399 set_duplicate_lock(agent->hash_lock, lock);
1400
1401 /*
1402 * TODO: Optimization: We could directly launch the block verify, then switch to a hash
1403 * thread.
1404 */
1405 continue_data_vio(agent);
1406 }
1407
1408 /**
1409 * start_locking() - Continue deduplication for a hash lock that has obtained valid advice of a
1410 * potential duplicate through its agent.
1411 * @lock: The hash lock (currently must be QUERYING).
1412 * @agent: The data_vio bearing the dedupe advice.
1413 */
1414 static void start_locking(struct hash_lock *lock, struct data_vio *agent)
1415 {
1416 VDO_ASSERT_LOG_ONLY(lock->duplicate_lock == NULL,
1417 "must not acquire a duplicate lock when already holding it");
1418
1419 lock->state = VDO_HASH_LOCK_LOCKING;
1420
1421 /*
1422 * TODO: Optimization: If we arrange to continue on the duplicate zone thread when
1423 * accepting the advice, and don't explicitly change lock states (or use an agent-local
1424 * state, or an atomic), we can avoid a thread transition here.
1425 */
1426 agent->last_async_operation = VIO_ASYNC_OP_LOCK_DUPLICATE_PBN;
1427 launch_data_vio_duplicate_zone_callback(agent, lock_duplicate_pbn);
1428 }
1429
1430 /**
1431 * finish_writing() - Re-entry point for the lock agent after it has finished writing or
1432 * compressing its copy of the data block.
1433 * @lock: The hash lock, which must be in state WRITING.
1434 * @agent: The data_vio that wrote its data for the lock.
1435 *
1436 * The agent will never need to dedupe against anything, so it's done with the lock, but the lock
1437 * may not be finished with it, as a UDS update might still be needed.
1438 *
1439 * If there are other lock holders, the agent will hand the job to one of them and exit, leaving
1440 * the lock to deduplicate against the just-written block. If there are no other lock holders, the
1441 * agent either exits (and later tears down the hash lock), or it remains the agent and updates
1442 * UDS.
1443 */
1444 static void finish_writing(struct hash_lock *lock, struct data_vio *agent)
1445 {
1446 /*
1447 * Dedupe against the data block or compressed block slot the agent wrote. Since we know
1448 * the write succeeded, there's no need to verify it.
1449 */
1450 lock->duplicate = agent->new_mapped;
1451 lock->verified = true;
1452
1453 if (vdo_is_state_compressed(lock->duplicate.state) && lock->registered) {
1454 /*
1455 * Compression means the location we gave in the UDS query is not the location
1456 * we're using to deduplicate.
1457 */
1458 lock->update_advice = true;
1459 }
1460
1461 /* If there are any waiters, we need to start deduping them. */
1462 if (vdo_waitq_has_waiters(&lock->waiters)) {
1463 /*
1464 * WRITING -> DEDUPING transition: an asynchronously-written block failed to
1465 * compress, so the PBN lock on the written copy was already transferred. The agent
1466 * is done with the lock, but the lock may still need to use it to clean up after
1467 * rollover.
1468 */
1469 start_deduping(lock, agent, true);
1470 return;
1471 }
1472
1473 /*
1474 * There are no waiters and the agent has successfully written, so take a step towards
1475 * being able to release the hash lock (or just release it).
1476 */
1477 if (lock->update_advice) {
1478 /*
1479 * WRITING -> UPDATING transition: There's no waiter and a UDS update is needed, so
1480 * retain the WRITING agent and use it to launch the update. This happens on
1481 * compression, rollover, or when the QUERYING agent had no allocation.
1482 */
1483 start_updating(lock, agent);
1484 } else if (lock->duplicate_lock != NULL) {
1485 /*
1486 * WRITING -> UNLOCKING transition: There's no waiter and no update needed, but the
1487 * compressed write gave us a shared duplicate lock that we must release.
1488 */
1489 set_duplicate_location(agent, lock->duplicate);
1490 start_unlocking(lock, agent);
1491 } else {
1492 /*
1493 * WRITING -> BYPASSING transition: There's no waiter, no update needed, and no
1494 * duplicate lock held, so both the agent and lock have no more work to do. The
1495 * agent will release its allocation lock in cleanup.
1496 */
1497 start_bypassing(lock, agent);
1498 }
1499 }
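
/*
 * For readability, a compact sketch of the decision ladder in finish_writing() above; this
 * is just a restatement of the code, not additional behavior:
 *
 *   waiters queued?           -> start_deduping()   (WRITING -> DEDUPING)
 *   else update_advice set?   -> start_updating()   (WRITING -> UPDATING)
 *   else duplicate_lock held? -> start_unlocking()  (WRITING -> UNLOCKING)
 *   else                      -> start_bypassing()  (WRITING -> BYPASSING)
 */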
1500
1501 /**
1502 * select_writing_agent() - Search through the lock waiters for a data_vio that has an allocation.
1503 * @lock: The hash lock to modify.
1504 *
1505 * If an allocation is found, swap agents, put the old agent at the head of the wait queue, then
1506 * return the new agent. Otherwise, just return the current agent.
1507 */
1508 static struct data_vio *select_writing_agent(struct hash_lock *lock)
1509 {
1510 struct vdo_wait_queue temp_queue;
1511 struct data_vio *data_vio;
1512
1513 vdo_waitq_init(&temp_queue);
1514
1515 /*
1516 * Move waiters to the temp queue one-by-one until we find an allocation. Not ideal to
1517 * search, but it only happens when nearly out of space.
1518 */
1519 while (((data_vio = dequeue_lock_waiter(lock)) != NULL) &&
1520 !data_vio_has_allocation(data_vio)) {
1521 /* Use the lower-level enqueue since we're just moving waiters around. */
1522 vdo_waitq_enqueue_waiter(&temp_queue, &data_vio->waiter);
1523 }
1524
1525 if (data_vio != NULL) {
1526 /*
1527 * Move the rest of the waiters over to the temp queue, preserving the order they
1528 * arrived at the lock.
1529 */
1530 vdo_waitq_transfer_all_waiters(&lock->waiters, &temp_queue);
1531
1532 /*
1533 * The current agent is being replaced and will have to wait to dedupe; make it the
1534 * first waiter since it was the first to reach the lock.
1535 */
1536 vdo_waitq_enqueue_waiter(&lock->waiters, &lock->agent->waiter);
1537 lock->agent = data_vio;
1538 } else {
1539 /* No one has an allocation, so keep the current agent. */
1540 data_vio = lock->agent;
1541 }
1542
1543 /* Swap all the waiters back onto the lock's queue. */
1544 vdo_waitq_transfer_all_waiters(&temp_queue, &lock->waiters);
1545 return data_vio;
1546 }
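
/*
 * A worked example of the reshuffle in select_writing_agent(), with hypothetical data_vios:
 * given agent X (no allocation) and waiters [A, B, C, D] where only C has an allocation, the
 * result is agent = C and waiters = [X, A, B, D]. The displaced agent X becomes the first
 * waiter, and the arrival order of the remaining waiters is preserved.
 */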
1547
1548 /**
1549 * start_writing() - Begin the non-duplicate write path.
1550 * @lock: The hash lock (currently must be QUERYING).
1551 * @agent: The data_vio currently acting as the agent for the lock.
1552 *
1553 * Begins the non-duplicate write path for a hash lock that had no advice, selecting a data_vio
1554 * with an allocation as a new agent, if necessary, then resuming the agent on the data_vio write
1555 * path.
1556 */
1557 static void start_writing(struct hash_lock *lock, struct data_vio *agent)
1558 {
1559 lock->state = VDO_HASH_LOCK_WRITING;
1560
1561 /*
1562 * The agent might not have received an allocation and so can't be used for writing, but
1563 * it's entirely possible that one of the waiters did.
1564 */
1565 if (!data_vio_has_allocation(agent)) {
1566 agent = select_writing_agent(lock);
1567 /* If none of the waiters had an allocation, the writes all have to fail. */
1568 if (!data_vio_has_allocation(agent)) {
1569 /*
1570 * TODO: Should we keep a variant of BYPASSING that causes new arrivals to
1571 * fail immediately if they don't have an allocation? It might be possible
1572 * that on some path there would be non-waiters still referencing the lock,
1573 * so it would remain in the map as everything is currently spelled, even
1574 * if the agent and all waiters release.
1575 */
1576 continue_data_vio_with_error(agent, VDO_NO_SPACE);
1577 return;
1578 }
1579 }
1580
1581 /*
1582 * If the agent compresses, it might wait indefinitely in the packer, which would be bad if
1583 * there are any other data_vios waiting.
1584 */
1585 if (vdo_waitq_has_waiters(&lock->waiters))
1586 cancel_data_vio_compression(agent);
1587
1588 /*
1589 * Send the agent to the compress/pack/write path in vioWrite. If it succeeds, it will
1590 * return to the hash lock via vdo_continue_hash_lock() and call finish_writing().
1591 */
1592 launch_compress_data_vio(agent);
1593 }
1594
1595 /*
1596 * Decode VDO duplicate advice from the old_metadata field of a UDS request.
1597 * Returns true if valid advice was found and decoded.
1598 */
1599 static bool decode_uds_advice(struct dedupe_context *context)
1600 {
1601 const struct uds_request *request = &context->request;
1602 struct data_vio *data_vio = context->requestor;
1603 size_t offset = 0;
1604 const struct uds_record_data *encoding = &request->old_metadata;
1605 struct vdo *vdo = vdo_from_data_vio(data_vio);
1606 struct zoned_pbn *advice = &data_vio->duplicate;
1607 u8 version;
1608 int result;
1609
1610 if ((request->status != UDS_SUCCESS) || !request->found)
1611 return false;
1612
1613 version = encoding->data[offset++];
1614 if (version != UDS_ADVICE_VERSION) {
1615 vdo_log_error("invalid UDS advice version code %u", version);
1616 return false;
1617 }
1618
1619 advice->state = encoding->data[offset++];
1620 advice->pbn = get_unaligned_le64(&encoding->data[offset]);
1621 offset += sizeof(u64);
1622 BUG_ON(offset != UDS_ADVICE_SIZE);
1623
1624 /* Don't use advice that's clearly meaningless. */
1625 if ((advice->state == VDO_MAPPING_STATE_UNMAPPED) || (advice->pbn == VDO_ZERO_BLOCK)) {
1626 vdo_log_debug("Invalid advice from deduplication server: pbn %llu, state %u. Giving up on deduplication of logical block %llu",
1627 (unsigned long long) advice->pbn, advice->state,
1628 (unsigned long long) data_vio->logical.lbn);
1629 atomic64_inc(&vdo->stats.invalid_advice_pbn_count);
1630 return false;
1631 }
1632
1633 result = vdo_get_physical_zone(vdo, advice->pbn, &advice->zone);
1634 if ((result != VDO_SUCCESS) || (advice->zone == NULL)) {
1635 vdo_log_debug("Invalid physical block number from deduplication server: %llu, giving up on deduplication of logical block %llu",
1636 (unsigned long long) advice->pbn,
1637 (unsigned long long) data_vio->logical.lbn);
1638 atomic64_inc(&vdo->stats.invalid_advice_pbn_count);
1639 return false;
1640 }
1641
1642 return true;
1643 }
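
/*
 * Layout of the advice record decoded above (and encoded by prepare_uds_request() below),
 * as implied by the offsets used in this file:
 *
 *   byte 0      UDS_ADVICE_VERSION
 *   byte 1      the zoned_pbn mapping state
 *   bytes 2-9   the zoned_pbn physical block number, little-endian u64
 *
 * for a total of UDS_ADVICE_SIZE (1 + 1 + sizeof(u64)) bytes.
 */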
1644
1645 static void process_query_result(struct data_vio *agent)
1646 {
1647 struct dedupe_context *context = agent->dedupe_context;
1648
1649 if (context == NULL)
1650 return;
1651
1652 if (change_context_state(context, DEDUPE_CONTEXT_COMPLETE, DEDUPE_CONTEXT_IDLE)) {
1653 agent->is_duplicate = decode_uds_advice(context);
1654 agent->dedupe_context = NULL;
1655 release_context(context);
1656 }
1657 }
1658
1659 /**
1660 * finish_querying() - Process the result of a UDS query performed by the agent for the lock.
1661 * @completion: The completion of the data_vio that performed the query.
1662 *
1663 * This continuation is registered in start_querying().
1664 */
1665 static void finish_querying(struct vdo_completion *completion)
1666 {
1667 struct data_vio *agent = as_data_vio(completion);
1668 struct hash_lock *lock = agent->hash_lock;
1669
1670 assert_hash_lock_agent(agent, __func__);
1671
1672 process_query_result(agent);
1673
1674 if (agent->is_duplicate) {
1675 lock->duplicate = agent->duplicate;
1676 /*
1677 * QUERYING -> LOCKING transition: Valid advice was obtained from UDS. Use the
1678 * QUERYING agent to start the hash lock on the unverified dedupe path, verifying
1679 * that the advice can be used.
1680 */
1681 start_locking(lock, agent);
1682 } else {
1683 /*
1684 * The agent will be used as the duplicate if it has an allocation; if it does, that
1685 * location was already posted to UDS, so no update will be needed.
1686 */
1687 lock->update_advice = !data_vio_has_allocation(agent);
1688 /*
1689 * QUERYING -> WRITING transition: There was no advice or the advice wasn't valid,
1690 * so try to write or compress the data.
1691 */
1692 start_writing(lock, agent);
1693 }
1694 }
1695
1696 /**
1697 * start_querying() - Start deduplication for a hash lock.
1698 * @lock: The initialized hash lock.
1699 * @data_vio: The data_vio that has just obtained the new lock.
1700 *
1701 * Starts deduplication for a hash lock that has finished initializing by making the data_vio that
1702 * requested it the agent, entering the QUERYING state, and using the agent to perform the UDS
1703 * query on behalf of the lock.
1704 */
1705 static void start_querying(struct hash_lock *lock, struct data_vio *data_vio)
1706 {
1707 lock->agent = data_vio;
1708 lock->state = VDO_HASH_LOCK_QUERYING;
1709 data_vio->last_async_operation = VIO_ASYNC_OP_CHECK_FOR_DUPLICATION;
1710 set_data_vio_hash_zone_callback(data_vio, finish_querying);
1711 query_index(data_vio,
1712 (data_vio_has_allocation(data_vio) ? UDS_POST : UDS_QUERY));
1713 }
1714
1715 /**
1716 * report_bogus_lock_state() - Complain that a data_vio has entered a hash_lock that is in an
1717 * unimplemented or unusable state and continue the data_vio with an
1718 * error.
1719 * @lock: The hash lock.
1720 * @data_vio: The data_vio attempting to enter the lock.
1721 */
1722 static void report_bogus_lock_state(struct hash_lock *lock, struct data_vio *data_vio)
1723 {
1724 VDO_ASSERT_LOG_ONLY(false, "hash lock must not be in unimplemented state %s",
1725 get_hash_lock_state_name(lock->state));
1726 continue_data_vio_with_error(data_vio, VDO_LOCK_ERROR);
1727 }
1728
1729 /**
1730 * vdo_continue_hash_lock() - Continue the processing state after writing, compressing, or
1731 * deduplicating.
1732 * @completion: The data_vio completion to continue processing in its hash lock.
1733 *
1734 * Asynchronously continue processing a data_vio in its hash lock after it has finished writing,
1735 * compressing, or deduplicating, so it can share the result with any data_vios waiting in the hash
1736 * lock, or update the UDS index, or simply release its share of the lock.
1737 *
1738 * Context: This must only be called in the correct thread for the hash zone.
1739 */
1740 void vdo_continue_hash_lock(struct vdo_completion *completion)
1741 {
1742 struct data_vio *data_vio = as_data_vio(completion);
1743 struct hash_lock *lock = data_vio->hash_lock;
1744
1745 switch (lock->state) {
1746 case VDO_HASH_LOCK_WRITING:
1747 VDO_ASSERT_LOG_ONLY(data_vio == lock->agent,
1748 "only the lock agent may continue the lock");
1749 finish_writing(lock, data_vio);
1750 break;
1751
1752 case VDO_HASH_LOCK_DEDUPING:
1753 finish_deduping(lock, data_vio);
1754 break;
1755
1756 case VDO_HASH_LOCK_BYPASSING:
1757 /* This data_vio has finished the write path and the lock doesn't need it. */
1758 exit_hash_lock(data_vio);
1759 break;
1760
1761 case VDO_HASH_LOCK_INITIALIZING:
1762 case VDO_HASH_LOCK_QUERYING:
1763 case VDO_HASH_LOCK_UPDATING:
1764 case VDO_HASH_LOCK_LOCKING:
1765 case VDO_HASH_LOCK_VERIFYING:
1766 case VDO_HASH_LOCK_UNLOCKING:
1767 /* A lock in this state should never be re-entered. */
1768 report_bogus_lock_state(lock, data_vio);
1769 break;
1770
1771 default:
1772 report_bogus_lock_state(lock, data_vio);
1773 }
1774 }
1775
1776 /**
1777 * is_hash_collision() - Check to see if a hash collision has occurred.
1778 * @lock: The lock to check.
1779 * @candidate: The data_vio seeking to share the lock.
1780 *
1781 * Check whether the data in the data_vios sharing a lock differs from the data in a data_vio
1782 * seeking to share the lock, which should only be possible in the extremely unlikely case of a hash
1783 * collision.
1784 *
1785 * Return: true if the given data_vio must not share the lock because it doesn't have the same data
1786 * as the lock holders.
1787 */
1788 static bool is_hash_collision(struct hash_lock *lock, struct data_vio *candidate)
1789 {
1790 struct data_vio *lock_holder;
1791 struct hash_zone *zone;
1792 bool collides;
1793
1794 if (list_empty(&lock->duplicate_vios))
1795 return false;
1796
1797 lock_holder = list_first_entry(&lock->duplicate_vios, struct data_vio,
1798 hash_lock_entry);
1799 zone = candidate->hash_zone;
1800 collides = !blocks_equal(lock_holder->vio.data, candidate->vio.data);
1801 if (collides)
1802 increment_stat(&zone->statistics.concurrent_hash_collisions);
1803 else
1804 increment_stat(&zone->statistics.concurrent_data_matches);
1805
1806 return collides;
1807 }
1808
1809 static inline int assert_hash_lock_preconditions(const struct data_vio *data_vio)
1810 {
1811 int result;
1812
1813 /* FIXME: BUG_ON() and/or enter read-only mode? */
1814 result = VDO_ASSERT(data_vio->hash_lock == NULL,
1815 "must not already hold a hash lock");
1816 if (result != VDO_SUCCESS)
1817 return result;
1818
1819 result = VDO_ASSERT(list_empty(&data_vio->hash_lock_entry),
1820 "must not already be a member of a hash lock list");
1821 if (result != VDO_SUCCESS)
1822 return result;
1823
1824 return VDO_ASSERT(data_vio->recovery_sequence_number == 0,
1825 "must not hold a recovery lock when getting a hash lock");
1826 }
1827
1828 /**
1829 * vdo_acquire_hash_lock() - Acquire or share a lock on a record name.
1830 * @completion: The data_vio completion acquiring a lock on its record name.
1831 *
1832 * Acquire or share a lock on the hash (record name) of the data in a data_vio, updating the
1833 * data_vio to reference the lock. This must only be called in the correct thread for the zone. In
1834 * the unlikely case of a hash collision, this function will succeed, but the data_vio will not get
1835 * a lock reference.
1836 */
1837 void vdo_acquire_hash_lock(struct vdo_completion *completion)
1838 {
1839 struct data_vio *data_vio = as_data_vio(completion);
1840 struct hash_lock *lock;
1841 int result;
1842
1843 assert_data_vio_in_hash_zone(data_vio);
1844
1845 result = assert_hash_lock_preconditions(data_vio);
1846 if (result != VDO_SUCCESS) {
1847 continue_data_vio_with_error(data_vio, result);
1848 return;
1849 }
1850
1851 result = acquire_lock(data_vio->hash_zone, &data_vio->record_name, NULL, &lock);
1852 if (result != VDO_SUCCESS) {
1853 continue_data_vio_with_error(data_vio, result);
1854 return;
1855 }
1856
1857 if (is_hash_collision(lock, data_vio)) {
1858 /*
1859 * Hash collisions are extremely unlikely, but the bogus dedupe would be a data
1860 * corruption. Bypass optimization entirely. We can't compress a data_vio without
1861 * a hash_lock as the compressed write depends on the hash_lock to manage the
1862 * references for the compressed block.
1863 */
1864 write_data_vio(data_vio);
1865 return;
1866 }
1867
1868 set_hash_lock(data_vio, lock);
1869 switch (lock->state) {
1870 case VDO_HASH_LOCK_INITIALIZING:
1871 start_querying(lock, data_vio);
1872 return;
1873
1874 case VDO_HASH_LOCK_QUERYING:
1875 case VDO_HASH_LOCK_WRITING:
1876 case VDO_HASH_LOCK_UPDATING:
1877 case VDO_HASH_LOCK_LOCKING:
1878 case VDO_HASH_LOCK_VERIFYING:
1879 case VDO_HASH_LOCK_UNLOCKING:
1880 /* The lock is busy, and can't be shared yet. */
1881 wait_on_hash_lock(lock, data_vio);
1882 return;
1883
1884 case VDO_HASH_LOCK_BYPASSING:
1885 /* We can't use this lock, so bypass optimization entirely. */
1886 vdo_release_hash_lock(data_vio);
1887 write_data_vio(data_vio);
1888 return;
1889
1890 case VDO_HASH_LOCK_DEDUPING:
1891 launch_dedupe(lock, data_vio, false);
1892 return;
1893
1894 default:
1895 /* A lock in this state should not be acquired by new VIOs. */
1896 report_bogus_lock_state(lock, data_vio);
1897 }
1898 }
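
/*
 * An informal sketch of how a data_vio moves through this interface, using only the entry
 * points defined in this file. The real call sites are in the data_vio write path, so treat
 * this as an illustration rather than literal calling code:
 *
 *   vdo_acquire_hash_lock(completion);   - on the hash zone thread; may query the index,
 *                                          wait on a busy lock, or start deduping
 *   ... write, compress, or dedupe ...
 *   vdo_continue_hash_lock(completion);  - re-enter the lock's state machine with the result
 *   vdo_release_hash_lock(data_vio);     - drop this data_vio's share when it is finished
 */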
1899
1900 /**
1901 * vdo_release_hash_lock() - Release a data_vio's share of a hash lock, if held, and null out the
1902 * data_vio's reference to it.
1903 * @data_vio: The data_vio releasing its hash lock.
1904 *
1905 * If the data_vio is the only one holding the lock, this also releases any resources or locks used
1906 * by the hash lock (such as a PBN read lock on a block containing data with the same hash) and
1907 * returns the lock to the hash zone's lock pool.
1908 *
1909 * Context: This must only be called in the correct thread for the hash zone.
1910 */
1911 void vdo_release_hash_lock(struct data_vio *data_vio)
1912 {
1913 u64 lock_key;
1914 struct hash_lock *lock = data_vio->hash_lock;
1915 struct hash_zone *zone = data_vio->hash_zone;
1916
1917 if (lock == NULL)
1918 return;
1919
1920 set_hash_lock(data_vio, NULL);
1921
1922 if (lock->reference_count > 0) {
1923 /* The lock is still in use by other data_vios. */
1924 return;
1925 }
1926
1927 lock_key = hash_lock_key(lock);
1928 if (lock->registered) {
1929 struct hash_lock *removed;
1930
1931 removed = vdo_int_map_remove(zone->hash_lock_map, lock_key);
1932 VDO_ASSERT_LOG_ONLY(lock == removed,
1933 "hash lock being released must have been mapped");
1934 } else {
1935 VDO_ASSERT_LOG_ONLY(lock != vdo_int_map_get(zone->hash_lock_map, lock_key),
1936 "unregistered hash lock must not be in the lock map");
1937 }
1938
1939 VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&lock->waiters),
1940 "hash lock returned to zone must have no waiters");
1941 VDO_ASSERT_LOG_ONLY((lock->duplicate_lock == NULL),
1942 "hash lock returned to zone must not reference a PBN lock");
1943 VDO_ASSERT_LOG_ONLY((lock->state == VDO_HASH_LOCK_BYPASSING),
1944 "returned hash lock must not be in use with state %s",
1945 get_hash_lock_state_name(lock->state));
1946 VDO_ASSERT_LOG_ONLY(list_empty(&lock->pool_node),
1947 "hash lock returned to zone must not be in a pool list");
1948 VDO_ASSERT_LOG_ONLY(list_empty(&lock->duplicate_vios),
1949 "hash lock returned to zone must not reference DataVIOs");
1950
1951 return_hash_lock_to_pool(zone, lock);
1952 }
1953
1954 /**
1955 * transfer_allocation_lock() - Transfer a data_vio's downgraded allocation PBN lock to the
1956 * data_vio's hash lock, converting it to a duplicate PBN lock.
1957 * @data_vio: The data_vio holding the allocation lock to transfer.
1958 */
1959 static void transfer_allocation_lock(struct data_vio *data_vio)
1960 {
1961 struct allocation *allocation = &data_vio->allocation;
1962 struct hash_lock *hash_lock = data_vio->hash_lock;
1963
1964 VDO_ASSERT_LOG_ONLY(data_vio->new_mapped.pbn == allocation->pbn,
1965 "transferred lock must be for the block written");
1966
1967 allocation->pbn = VDO_ZERO_BLOCK;
1968
1969 VDO_ASSERT_LOG_ONLY(vdo_is_pbn_read_lock(allocation->lock),
1970 "must have downgraded the allocation lock before transfer");
1971
1972 hash_lock->duplicate = data_vio->new_mapped;
1973 data_vio->duplicate = data_vio->new_mapped;
1974
1975 /*
1976 * Since the lock is being transferred, the holder count doesn't change (and isn't even
1977 * safe to examine on this thread).
1978 */
1979 hash_lock->duplicate_lock = vdo_forget(allocation->lock);
1980 }
1981
1982 /**
1983 * vdo_share_compressed_write_lock() - Make a data_vio's hash lock a shared holder of the PBN lock
1984 * on the compressed block to which its data was just written.
1985 * @data_vio: The data_vio which was just compressed.
1986 * @pbn_lock: The PBN lock on the compressed block.
1987 *
1988 * If the lock is still a write lock (as it will be for the first share), it will be converted to a
1989 * read lock. This also reserves a reference count increment for the data_vio.
1990 */
1991 void vdo_share_compressed_write_lock(struct data_vio *data_vio,
1992 struct pbn_lock *pbn_lock)
1993 {
1994 bool claimed;
1995
1996 VDO_ASSERT_LOG_ONLY(vdo_get_duplicate_lock(data_vio) == NULL,
1997 "a duplicate PBN lock should not exist when writing");
1998 VDO_ASSERT_LOG_ONLY(vdo_is_state_compressed(data_vio->new_mapped.state),
1999 "lock transfer must be for a compressed write");
2000 assert_data_vio_in_new_mapped_zone(data_vio);
2001
2002 /* First sharer downgrades the lock. */
2003 if (!vdo_is_pbn_read_lock(pbn_lock))
2004 vdo_downgrade_pbn_write_lock(pbn_lock, true);
2005
2006 /*
2007 * Get a share of the PBN lock, ensuring it cannot be released until after this data_vio
2008 * has had a chance to journal a reference.
2009 */
2010 data_vio->duplicate = data_vio->new_mapped;
2011 data_vio->hash_lock->duplicate = data_vio->new_mapped;
2012 set_duplicate_lock(data_vio->hash_lock, pbn_lock);
2013
2014 /*
2015 * Claim a reference for this data_vio. Necessary since another hash_lock might start
2016 * deduplicating against it before our incRef.
2017 */
2018 claimed = vdo_claim_pbn_lock_increment(pbn_lock);
2019 VDO_ASSERT_LOG_ONLY(claimed, "impossible to fail to claim an initial increment");
2020 }
2021
2022 static void start_uds_queue(void *ptr)
2023 {
2024 /*
2025 * Allow the UDS dedupe worker thread to do memory allocations. It will only do allocations
2026 * during the UDS calls that open or close an index, but those allocations can safely sleep
2027 * while reserving a large amount of memory. We could use an allocations_allowed boolean
2028 * (like the base threads do), but it would be an unnecessary embellishment.
2029 */
2030 struct vdo_thread *thread = vdo_get_work_queue_owner(vdo_get_current_work_queue());
2031
2032 vdo_register_allocating_thread(&thread->allocating_thread, NULL);
2033 }
2034
2035 static void finish_uds_queue(void *ptr __always_unused)
2036 {
2037 vdo_unregister_allocating_thread();
2038 }
2039
2040 static void close_index(struct hash_zones *zones)
2041 __must_hold(&zones->lock)
2042 {
2043 int result;
2044
2045 /*
2046 * Change the index state so that get_index_statistics() will not try to use the index
2047 * session we are closing.
2048 */
2049 zones->index_state = IS_CHANGING;
2050 /* Close the index session, while not holding the lock. */
2051 spin_unlock(&zones->lock);
2052 result = uds_close_index(zones->index_session);
2053
2054 if (result != UDS_SUCCESS)
2055 vdo_log_error_strerror(result, "Error closing index");
2056 spin_lock(&zones->lock);
2057 zones->index_state = IS_CLOSED;
2058 zones->error_flag |= result != UDS_SUCCESS;
2059 /* ASSERTION: We leave in IS_CLOSED state. */
2060 }
2061
2062 static void open_index(struct hash_zones *zones)
2063 __must_hold(&zones->lock)
2064 {
2065 /* ASSERTION: We enter in IS_CLOSED state. */
2066 int result;
2067 bool create_flag = zones->create_flag;
2068
2069 zones->create_flag = false;
2070 /*
2071 * Change the index state so that it will be reported to the outside world as
2072 * "opening".
2073 */
2074 zones->index_state = IS_CHANGING;
2075 zones->error_flag = false;
2076
2077 /* Open the index session, while not holding the lock */
2078 spin_unlock(&zones->lock);
2079 result = uds_open_index(create_flag ? UDS_CREATE : UDS_LOAD,
2080 &zones->parameters, zones->index_session);
2081 if (result != UDS_SUCCESS)
2082 vdo_log_error_strerror(result, "Error opening index");
2083
2084 spin_lock(&zones->lock);
2085 if (!create_flag) {
2086 switch (result) {
2087 case -ENOENT:
2088 /*
2089 * Either there is no index, or there is no way we can recover the index.
2090 * We will be called again and try to create a new index.
2091 */
2092 zones->index_state = IS_CLOSED;
2093 zones->create_flag = true;
2094 return;
2095 default:
2096 break;
2097 }
2098 }
2099 if (result == UDS_SUCCESS) {
2100 zones->index_state = IS_OPENED;
2101 } else {
2102 zones->index_state = IS_CLOSED;
2103 zones->index_target = IS_CLOSED;
2104 zones->error_flag = true;
2105 spin_unlock(&zones->lock);
2106 vdo_log_info("Setting UDS index target state to error");
2107 spin_lock(&zones->lock);
2108 }
2109 /*
2110 * ASSERTION: On success, we leave in IS_OPENED state.
2111 * ASSERTION: On failure, we leave in IS_CLOSED state.
2112 */
2113 }
2114
2115 static void change_dedupe_state(struct vdo_completion *completion)
2116 {
2117 struct hash_zones *zones = as_hash_zones(completion);
2118
2119 spin_lock(&zones->lock);
2120
2121 /* Loop until the index is in the target state and the create flag is clear. */
2122 while (vdo_is_state_normal(&zones->state) &&
2123 ((zones->index_state != zones->index_target) || zones->create_flag)) {
2124 if (zones->index_state == IS_OPENED)
2125 close_index(zones);
2126 else
2127 open_index(zones);
2128 }
2129
2130 zones->changing = false;
2131 spin_unlock(&zones->lock);
2132 }
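
/*
 * A rough picture of the index state machine that change_dedupe_state() drives, using the
 * transitions made by open_index() and close_index() above:
 *
 *   IS_CLOSED --open_index()--> IS_CHANGING --success--> IS_OPENED
 *                                    \--failure--> IS_CLOSED (error_flag set)
 *   IS_OPENED --close_index()--> IS_CHANGING ----------> IS_CLOSED
 *
 * The loop runs until index_state matches index_target and create_flag is clear, or until
 * the zones leave the normal admin state.
 */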
2133
2134 static void start_expiration_timer(struct dedupe_context *context)
2135 {
2136 u64 start_time = context->submission_jiffies;
2137 u64 end_time;
2138
2139 if (!change_timer_state(context->zone, DEDUPE_QUERY_TIMER_IDLE,
2140 DEDUPE_QUERY_TIMER_RUNNING))
2141 return;
2142
2143 end_time = max(start_time + vdo_dedupe_index_timeout_jiffies,
2144 jiffies + vdo_dedupe_index_min_timer_jiffies);
2145 mod_timer(&context->zone->timer, end_time);
2146 }
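
/*
 * The max() above keeps the timer from being armed sooner than the minimum interval. As a
 * worked example with purely illustrative values (a 5000ms timeout, a 100ms minimum timer
 * interval, and HZ = 1000 so one jiffy is 1ms): if the oldest pending query was submitted
 * 4950ms ago, start_time + timeout is only 50ms away, but jiffies + minimum is 100ms away,
 * so the timer fires 100ms from now instead of almost immediately.
 */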
2147
2148 /**
2149 * report_dedupe_timeouts() - Record and eventually report that some dedupe requests reached their
2150 * expiration time without getting answers, so we timed them out.
2151 * @zones: The hash zones.
2152 * @timeouts: The number of newly timed out requests.
2153 */
2154 static void report_dedupe_timeouts(struct hash_zones *zones, unsigned int timeouts)
2155 {
2156 atomic64_add(timeouts, &zones->timeouts);
2157 spin_lock(&zones->lock);
2158 if (__ratelimit(&zones->ratelimiter)) {
2159 u64 unreported = atomic64_read(&zones->timeouts);
2160
2161 unreported -= zones->reported_timeouts;
2162 vdo_log_debug("UDS index timeout on %llu requests",
2163 (unsigned long long) unreported);
2164 zones->reported_timeouts += unreported;
2165 }
2166 spin_unlock(&zones->lock);
2167 }
2168
2169 static int initialize_index(struct vdo *vdo, struct hash_zones *zones)
2170 {
2171 int result;
2172 off_t uds_offset;
2173 struct volume_geometry geometry = vdo->geometry;
2174 static const struct vdo_work_queue_type uds_queue_type = {
2175 .start = start_uds_queue,
2176 .finish = finish_uds_queue,
2177 .max_priority = UDS_Q_MAX_PRIORITY,
2178 .default_priority = UDS_Q_PRIORITY,
2179 };
2180
2181 vdo_set_dedupe_index_timeout_interval(vdo_dedupe_index_timeout_interval);
2182 vdo_set_dedupe_index_min_timer_interval(vdo_dedupe_index_min_timer_interval);
2183 spin_lock_init(&zones->lock);
2184
2185 /*
2186 * Since we will save up the timeouts that would have been reported but were ratelimited,
2187 * we don't need to report ratelimiting.
2188 */
2189 ratelimit_default_init(&zones->ratelimiter);
2190 ratelimit_set_flags(&zones->ratelimiter, RATELIMIT_MSG_ON_RELEASE);
2191 uds_offset = ((vdo_get_index_region_start(geometry) -
2192 geometry.bio_offset) * VDO_BLOCK_SIZE);
2193 zones->parameters = (struct uds_parameters) {
2194 .bdev = vdo->device_config->owned_device->bdev,
2195 .offset = uds_offset,
2196 .size = (vdo_get_index_region_size(geometry) * VDO_BLOCK_SIZE),
2197 .memory_size = geometry.index_config.mem,
2198 .sparse = geometry.index_config.sparse,
2199 .nonce = (u64) geometry.nonce,
2200 };
2201
2202 result = uds_create_index_session(&zones->index_session);
2203 if (result != UDS_SUCCESS)
2204 return result;
2205
2206 result = vdo_make_thread(vdo, vdo->thread_config.dedupe_thread, &uds_queue_type,
2207 1, NULL);
2208 if (result != VDO_SUCCESS) {
2209 uds_destroy_index_session(vdo_forget(zones->index_session));
2210 vdo_log_error("UDS index queue initialization failed (%d)", result);
2211 return result;
2212 }
2213
2214 vdo_initialize_completion(&zones->completion, vdo, VDO_HASH_ZONES_COMPLETION);
2215 vdo_set_completion_callback(&zones->completion, change_dedupe_state,
2216 vdo->thread_config.dedupe_thread);
2217 return VDO_SUCCESS;
2218 }
2219
2220 /**
2221 * finish_index_operation() - This is the UDS callback for index queries.
2222 * @request: The uds request which has just completed.
2223 */
2224 static void finish_index_operation(struct uds_request *request)
2225 {
2226 struct dedupe_context *context = container_of(request, struct dedupe_context,
2227 request);
2228
2229 if (change_context_state(context, DEDUPE_CONTEXT_PENDING,
2230 DEDUPE_CONTEXT_COMPLETE)) {
2231 /*
2232 * This query has not timed out, so send its data_vio back to its hash zone to
2233 * process the results.
2234 */
2235 continue_data_vio(context->requestor);
2236 return;
2237 }
2238
2239 /*
2240 * This query has timed out, so try to mark it complete and hence eligible for reuse. Its
2241 * data_vio has already moved on.
2242 */
2243 if (!change_context_state(context, DEDUPE_CONTEXT_TIMED_OUT,
2244 DEDUPE_CONTEXT_TIMED_OUT_COMPLETE)) {
2245 VDO_ASSERT_LOG_ONLY(false, "uds request was timed out (state %d)",
2246 atomic_read(&context->state));
2247 }
2248
2249 vdo_funnel_queue_put(context->zone->timed_out_complete, &context->queue_entry);
2250 }
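
/*
 * Sketch of the dedupe_context lifecycle, pieced together from the transitions in
 * query_index(), finish_index_operation(), timeout_index_operations_callback(),
 * process_query_result(), and check_for_drain_complete() (DEDUPE_CONTEXT_ prefixes omitted):
 *
 *   IDLE -> PENDING                   query launched
 *   PENDING -> COMPLETE -> IDLE       the index answered in time; the requestor consumed it
 *   PENDING -> TIMED_OUT              the timer fired first; the requestor moved on
 *   TIMED_OUT -> TIMED_OUT_COMPLETE   the index finally answered; context queued for reuse
 *   TIMED_OUT_COMPLETE -> PENDING     recycled by acquire_context() for a new query
 *   TIMED_OUT_COMPLETE -> IDLE        recycled while draining the zone
 */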
2251
2252 /**
2253 * check_for_drain_complete() - Check whether this zone has drained.
2254 * @zone: The zone to check.
2255 */
2256 static void check_for_drain_complete(struct hash_zone *zone)
2257 {
2258 data_vio_count_t recycled = 0;
2259
2260 if (!vdo_is_state_draining(&zone->state))
2261 return;
2262
2263 if ((atomic_read(&zone->timer_state) == DEDUPE_QUERY_TIMER_IDLE) ||
2264 change_timer_state(zone, DEDUPE_QUERY_TIMER_RUNNING,
2265 DEDUPE_QUERY_TIMER_IDLE)) {
2266 timer_delete_sync(&zone->timer);
2267 } else {
2268 /*
2269 * There is an in-flight time-out, which must be processed before we can continue.
2270 */
2271 return;
2272 }
2273
2274 for (;;) {
2275 struct dedupe_context *context;
2276 struct funnel_queue_entry *entry;
2277
2278 entry = vdo_funnel_queue_poll(zone->timed_out_complete);
2279 if (entry == NULL)
2280 break;
2281
2282 context = container_of(entry, struct dedupe_context, queue_entry);
2283 atomic_set(&context->state, DEDUPE_CONTEXT_IDLE);
2284 list_add(&context->list_entry, &zone->available);
2285 recycled++;
2286 }
2287
2288 if (recycled > 0)
2289 WRITE_ONCE(zone->active, zone->active - recycled);
2290 VDO_ASSERT_LOG_ONLY(READ_ONCE(zone->active) == 0, "all contexts inactive");
2291 vdo_finish_draining(&zone->state);
2292 }
2293
2294 static void timeout_index_operations_callback(struct vdo_completion *completion)
2295 {
2296 struct dedupe_context *context, *tmp;
2297 struct hash_zone *zone = as_hash_zone(completion);
2298 u64 timeout_jiffies = msecs_to_jiffies(vdo_dedupe_index_timeout_interval);
2299 unsigned long cutoff = jiffies - timeout_jiffies;
2300 unsigned int timed_out = 0;
2301
2302 atomic_set(&zone->timer_state, DEDUPE_QUERY_TIMER_IDLE);
2303 list_for_each_entry_safe(context, tmp, &zone->pending, list_entry) {
2304 if (cutoff <= context->submission_jiffies) {
2305 /*
2306 * We have reached the oldest query which has not timed out yet, so restart
2307 * the timer.
2308 */
2309 start_expiration_timer(context);
2310 break;
2311 }
2312
2313 if (!change_context_state(context, DEDUPE_CONTEXT_PENDING,
2314 DEDUPE_CONTEXT_TIMED_OUT)) {
2315 /*
2316 * This context completed between the time the timeout fired and now. We
2317 * can treat it as a successful query; its requestor is already enqueued
2318 * to process it.
2319 */
2320 continue;
2321 }
2322
2323 /*
2324 * Remove this context from the pending list so we won't look at it again on a
2325 * subsequent timeout. Once the index completes it, it will be reused. Meanwhile,
2326 * send its requestor on its way.
2327 */
2328 list_del_init(&context->list_entry);
2329 context->requestor->dedupe_context = NULL;
2330 continue_data_vio(context->requestor);
2331 timed_out++;
2332 }
2333
2334 if (timed_out > 0)
2335 report_dedupe_timeouts(completion->vdo->hash_zones, timed_out);
2336
2337 check_for_drain_complete(zone);
2338 }
2339
2340 static void timeout_index_operations(struct timer_list *t)
2341 {
2342 struct hash_zone *zone = timer_container_of(zone, t, timer);
2343
2344 if (change_timer_state(zone, DEDUPE_QUERY_TIMER_RUNNING,
2345 DEDUPE_QUERY_TIMER_FIRED))
2346 vdo_launch_completion(&zone->completion);
2347 }
2348
2349 static int __must_check initialize_zone(struct vdo *vdo, struct hash_zones *zones,
2350 zone_count_t zone_number)
2351 {
2352 int result;
2353 data_vio_count_t i;
2354 struct hash_zone *zone = &zones->zones[zone_number];
2355
2356 result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->hash_lock_map);
2357 if (result != VDO_SUCCESS)
2358 return result;
2359
2360 vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
2361 zone->zone_number = zone_number;
2362 zone->thread_id = vdo->thread_config.hash_zone_threads[zone_number];
2363 vdo_initialize_completion(&zone->completion, vdo, VDO_HASH_ZONE_COMPLETION);
2364 vdo_set_completion_callback(&zone->completion, timeout_index_operations_callback,
2365 zone->thread_id);
2366 INIT_LIST_HEAD(&zone->lock_pool);
2367 result = vdo_allocate(LOCK_POOL_CAPACITY, struct hash_lock, "hash_lock array",
2368 &zone->lock_array);
2369 if (result != VDO_SUCCESS)
2370 return result;
2371
2372 for (i = 0; i < LOCK_POOL_CAPACITY; i++)
2373 return_hash_lock_to_pool(zone, &zone->lock_array[i]);
2374
2375 INIT_LIST_HEAD(&zone->available);
2376 INIT_LIST_HEAD(&zone->pending);
2377 result = vdo_make_funnel_queue(&zone->timed_out_complete);
2378 if (result != VDO_SUCCESS)
2379 return result;
2380
2381 timer_setup(&zone->timer, timeout_index_operations, 0);
2382
2383 for (i = 0; i < MAXIMUM_VDO_USER_VIOS; i++) {
2384 struct dedupe_context *context = &zone->contexts[i];
2385
2386 context->zone = zone;
2387 context->request.callback = finish_index_operation;
2388 context->request.session = zones->index_session;
2389 list_add(&context->list_entry, &zone->available);
2390 }
2391
2392 return vdo_make_default_thread(vdo, zone->thread_id);
2393 }
2394
2395 /** get_thread_id_for_zone() - Implements vdo_zone_thread_getter_fn. */
2396 static thread_id_t get_thread_id_for_zone(void *context, zone_count_t zone_number)
2397 {
2398 struct hash_zones *zones = context;
2399
2400 return zones->zones[zone_number].thread_id;
2401 }
2402
2403 /**
2404 * vdo_make_hash_zones() - Create the hash zones.
2405 *
2406 * @vdo: The vdo to which the zones will belong.
2407 * @zones_ptr: A pointer to hold the zones.
2408 *
2409 * Return: VDO_SUCCESS or an error code.
2410 */
2411 int vdo_make_hash_zones(struct vdo *vdo, struct hash_zones **zones_ptr)
2412 {
2413 int result;
2414 struct hash_zones *zones;
2415 zone_count_t z;
2416 zone_count_t zone_count = vdo->thread_config.hash_zone_count;
2417
2418 if (zone_count == 0)
2419 return VDO_SUCCESS;
2420
2421 result = vdo_allocate_extended(struct hash_zones, zone_count, struct hash_zone,
2422 __func__, &zones);
2423 if (result != VDO_SUCCESS)
2424 return result;
2425
2426 result = initialize_index(vdo, zones);
2427 if (result != VDO_SUCCESS) {
2428 vdo_free(zones);
2429 return result;
2430 }
2431
2432 vdo_set_admin_state_code(&zones->state, VDO_ADMIN_STATE_NEW);
2433
2434 zones->zone_count = zone_count;
2435 for (z = 0; z < zone_count; z++) {
2436 result = initialize_zone(vdo, zones, z);
2437 if (result != VDO_SUCCESS) {
2438 vdo_free_hash_zones(zones);
2439 return result;
2440 }
2441 }
2442
2443 result = vdo_make_action_manager(zones->zone_count, get_thread_id_for_zone,
2444 vdo->thread_config.admin_thread, zones, NULL,
2445 vdo, &zones->manager);
2446 if (result != VDO_SUCCESS) {
2447 vdo_free_hash_zones(zones);
2448 return result;
2449 }
2450
2451 *zones_ptr = zones;
2452 return VDO_SUCCESS;
2453 }
2454
2455 void vdo_finish_dedupe_index(struct hash_zones *zones)
2456 {
2457 if (zones == NULL)
2458 return;
2459
2460 uds_destroy_index_session(vdo_forget(zones->index_session));
2461 }
2462
2463 /**
2464 * vdo_free_hash_zones() - Free the hash zones.
2465 * @zones: The zones to free.
2466 */
2467 void vdo_free_hash_zones(struct hash_zones *zones)
2468 {
2469 zone_count_t i;
2470
2471 if (zones == NULL)
2472 return;
2473
2474 vdo_free(vdo_forget(zones->manager));
2475
2476 for (i = 0; i < zones->zone_count; i++) {
2477 struct hash_zone *zone = &zones->zones[i];
2478
2479 vdo_free_funnel_queue(vdo_forget(zone->timed_out_complete));
2480 vdo_int_map_free(vdo_forget(zone->hash_lock_map));
2481 vdo_free(vdo_forget(zone->lock_array));
2482 }
2483
2484 if (zones->index_session != NULL)
2485 vdo_finish_dedupe_index(zones);
2486
2487 ratelimit_state_exit(&zones->ratelimiter);
2488 vdo_free(zones);
2489 }
2490
2491 static void initiate_suspend_index(struct admin_state *state)
2492 {
2493 struct hash_zones *zones = container_of(state, struct hash_zones, state);
2494 enum index_state index_state;
2495
2496 spin_lock(&zones->lock);
2497 index_state = zones->index_state;
2498 spin_unlock(&zones->lock);
2499
2500 if (index_state != IS_CLOSED) {
2501 bool save = vdo_is_state_saving(&zones->state);
2502 int result;
2503
2504 result = uds_suspend_index_session(zones->index_session, save);
2505 if (result != UDS_SUCCESS)
2506 vdo_log_error_strerror(result, "Error suspending dedupe index");
2507 }
2508
2509 vdo_finish_draining(state);
2510 }
2511
2512 /**
2513 * suspend_index() - Suspend the UDS index prior to draining hash zones.
2514 * @context: Not used.
2515 * @completion: The completion for the suspend operation.
2516 *
2517 * Implements vdo_action_preamble_fn
2518 */
2519 static void suspend_index(void *context, struct vdo_completion *completion)
2520 {
2521 struct hash_zones *zones = context;
2522
2523 vdo_start_draining(&zones->state,
2524 vdo_get_current_manager_operation(zones->manager), completion,
2525 initiate_suspend_index);
2526 }
2527
2528 /** Implements vdo_admin_initiator_fn. */
2529 static void initiate_drain(struct admin_state *state)
2530 {
2531 check_for_drain_complete(container_of(state, struct hash_zone, state));
2532 }
2533
2534 /** Implements vdo_zone_action_fn. */
2535 static void drain_hash_zone(void *context, zone_count_t zone_number,
2536 struct vdo_completion *parent)
2537 {
2538 struct hash_zones *zones = context;
2539
2540 vdo_start_draining(&zones->zones[zone_number].state,
2541 vdo_get_current_manager_operation(zones->manager), parent,
2542 initiate_drain);
2543 }
2544
2545 /** vdo_drain_hash_zones() - Drain all hash zones. */
2546 void vdo_drain_hash_zones(struct hash_zones *zones, struct vdo_completion *parent)
2547 {
2548 vdo_schedule_operation(zones->manager, parent->vdo->suspend_type, suspend_index,
2549 drain_hash_zone, NULL, parent);
2550 }
2551
2552 static void launch_dedupe_state_change(struct hash_zones *zones)
2553 __must_hold(&zones->lock)
2554 {
2555 /* ASSERTION: We enter with the lock held. */
2556 if (zones->changing || !vdo_is_state_normal(&zones->state))
2557 /* Either a change is already in progress, or changes are not allowed. */
2558 return;
2559
2560 if (zones->create_flag || (zones->index_state != zones->index_target)) {
2561 zones->changing = true;
2562 vdo_launch_completion(&zones->completion);
2563 return;
2564 }
2565
2566 /* ASSERTION: We exit with the lock held. */
2567 }
2568
2569 /**
2570 * resume_index() - Resume the UDS index prior to resuming hash zones.
2571 * @context: Not used.
2572 * @parent: The completion for the resume operation.
2573 *
2574 * Implements vdo_action_preamble_fn
2575 */
2576 static void resume_index(void *context, struct vdo_completion *parent)
2577 {
2578 struct hash_zones *zones = context;
2579 struct device_config *config = parent->vdo->device_config;
2580 int result;
2581
2582 zones->parameters.bdev = config->owned_device->bdev;
2583 result = uds_resume_index_session(zones->index_session, zones->parameters.bdev);
2584 if (result != UDS_SUCCESS)
2585 vdo_log_error_strerror(result, "Error resuming dedupe index");
2586
2587 spin_lock(&zones->lock);
2588 vdo_resume_if_quiescent(&zones->state);
2589
2590 if (config->deduplication) {
2591 zones->index_target = IS_OPENED;
2592 WRITE_ONCE(zones->dedupe_flag, true);
2593 } else {
2594 zones->index_target = IS_CLOSED;
2595 }
2596
2597 launch_dedupe_state_change(zones);
2598 spin_unlock(&zones->lock);
2599
2600 vdo_finish_completion(parent);
2601 }
2602
2603 /** Implements vdo_zone_action_fn. */
2604 static void resume_hash_zone(void *context, zone_count_t zone_number,
2605 struct vdo_completion *parent)
2606 {
2607 struct hash_zone *zone = &(((struct hash_zones *) context)->zones[zone_number]);
2608
2609 vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state));
2610 }
2611
2612 /**
2613 * vdo_resume_hash_zones() - Resume a set of hash zones.
2614 * @zones: The hash zones to resume.
2615 * @parent: The object to notify when the zones have resumed.
2616 */
2617 void vdo_resume_hash_zones(struct hash_zones *zones, struct vdo_completion *parent)
2618 {
2619 if (vdo_is_read_only(parent->vdo)) {
2620 vdo_launch_completion(parent);
2621 return;
2622 }
2623
2624 vdo_schedule_operation(zones->manager, VDO_ADMIN_STATE_RESUMING, resume_index,
2625 resume_hash_zone, NULL, parent);
2626 }
2627
2628 /**
2629 * get_hash_zone_statistics() - Add the statistics for this hash zone to the tally for all zones.
2630 * @zone: The hash zone to query.
2631 * @tally: The tally.
2632 */
2633 static void get_hash_zone_statistics(const struct hash_zone *zone,
2634 struct hash_lock_statistics *tally)
2635 {
2636 const struct hash_lock_statistics *stats = &zone->statistics;
2637
2638 tally->dedupe_advice_valid += READ_ONCE(stats->dedupe_advice_valid);
2639 tally->dedupe_advice_stale += READ_ONCE(stats->dedupe_advice_stale);
2640 tally->concurrent_data_matches += READ_ONCE(stats->concurrent_data_matches);
2641 tally->concurrent_hash_collisions += READ_ONCE(stats->concurrent_hash_collisions);
2642 tally->curr_dedupe_queries += READ_ONCE(zone->active);
2643 }
2644
2645 static void get_index_statistics(struct hash_zones *zones,
2646 struct index_statistics *stats)
2647 {
2648 enum index_state state;
2649 struct uds_index_stats index_stats;
2650 int result;
2651
2652 spin_lock(&zones->lock);
2653 state = zones->index_state;
2654 spin_unlock(&zones->lock);
2655
2656 if (state != IS_OPENED)
2657 return;
2658
2659 result = uds_get_index_session_stats(zones->index_session, &index_stats);
2660 if (result != UDS_SUCCESS) {
2661 vdo_log_error_strerror(result, "Error reading index stats");
2662 return;
2663 }
2664
2665 stats->entries_indexed = index_stats.entries_indexed;
2666 stats->posts_found = index_stats.posts_found;
2667 stats->posts_not_found = index_stats.posts_not_found;
2668 stats->queries_found = index_stats.queries_found;
2669 stats->queries_not_found = index_stats.queries_not_found;
2670 stats->updates_found = index_stats.updates_found;
2671 stats->updates_not_found = index_stats.updates_not_found;
2672 stats->entries_discarded = index_stats.entries_discarded;
2673 }
2674
2675 /**
2676 * vdo_get_dedupe_statistics() - Tally the statistics from all the hash zones and the UDS index.
2677 * @zones: The hash zones to query.
2678 * @stats: A structure to store the statistics.
2679 *
2680 * Return: The sum of the hash lock statistics from all hash zones plus the statistics from the UDS
2681 * index
2682 */
2683 void vdo_get_dedupe_statistics(struct hash_zones *zones, struct vdo_statistics *stats)
2684
2685 {
2686 zone_count_t zone;
2687
2688 for (zone = 0; zone < zones->zone_count; zone++)
2689 get_hash_zone_statistics(&zones->zones[zone], &stats->hash_lock);
2690
2691 get_index_statistics(zones, &stats->index);
2692
2693 /*
2694 * zones->timeouts gives the number of timeouts, and dedupe_context_busy gives the number
2695 * of queries not made because of earlier timeouts.
2696 */
2697 stats->dedupe_advice_timeouts =
2698 (atomic64_read(&zones->timeouts) + atomic64_read(&zones->dedupe_context_busy));
2699 }
2700
2701 /**
2702 * vdo_select_hash_zone() - Select the hash zone responsible for locking a given record name.
2703 * @zones: The hash_zones from which to select.
2704 * @name: The record name.
2705 *
2706 * Return: The hash zone responsible for the record name.
2707 */
2708 struct hash_zone *vdo_select_hash_zone(struct hash_zones *zones,
2709 const struct uds_record_name *name)
2710 {
2711 /*
2712 * Use a fragment of the record name as a hash code. Eight bits of hash should suffice
2713 * since the number of hash zones is small.
2714 * TODO: Verify that the first byte is independent enough.
2715 */
2716 u32 hash = name->name[0];
2717
2718 /*
2719 * Scale the 8-bit hash fragment to a zone index by treating it as a binary fraction and
2720 * multiplying that by the zone count. If the hash is uniformly distributed over [0 ..
2721 * 2^8-1], then (hash * count / 2^8) should be uniformly distributed over [0 .. count-1].
2722 * The multiply and shift is much faster than a divide (modulus) on X86 CPUs.
2723 */
2724 hash = (hash * zones->zone_count) >> 8;
2725 return &zones->zones[hash];
2726 }
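
/*
 * For example, with purely illustrative numbers: if zone_count is 3, a record name whose
 * first byte is 0x00 maps to (0 * 3) >> 8 = zone 0, 0x80 maps to (128 * 3) >> 8 = zone 1,
 * and 0xff maps to (255 * 3) >> 8 = zone 2, splitting the 8-bit range roughly evenly.
 */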
2727
2728 /**
2729 * dump_hash_lock() - Dump a compact description of hash_lock to the log if the lock is not on the
2730 * free list.
2731 * @lock: The hash lock to dump.
2732 */
2733 static void dump_hash_lock(const struct hash_lock *lock)
2734 {
2735 const char *state;
2736
2737 if (!list_empty(&lock->pool_node)) {
2738 /* This lock is on the free list. */
2739 return;
2740 }
2741
2742 /*
2743 * Necessarily cryptic since we can log a lot of these. The first three chars of the state
2744 * name are unambiguous. 'U' indicates a lock not registered in the map.
2745 */
2746 state = get_hash_lock_state_name(lock->state);
2747 vdo_log_info(" hl %px: %3.3s %c%llu/%u rc=%u wc=%zu agt=%px",
2748 lock, state, (lock->registered ? 'D' : 'U'),
2749 (unsigned long long) lock->duplicate.pbn,
2750 lock->duplicate.state, lock->reference_count,
2751 vdo_waitq_num_waiters(&lock->waiters), lock->agent);
2752 }
2753
2754 static const char *index_state_to_string(struct hash_zones *zones,
2755 enum index_state state)
2756 {
2757 if (!vdo_is_state_normal(&zones->state))
2758 return SUSPENDED;
2759
2760 switch (state) {
2761 case IS_CLOSED:
2762 return zones->error_flag ? ERROR : CLOSED;
2763 case IS_CHANGING:
2764 return zones->index_target == IS_OPENED ? OPENING : CLOSING;
2765 case IS_OPENED:
2766 return READ_ONCE(zones->dedupe_flag) ? ONLINE : OFFLINE;
2767 default:
2768 return UNKNOWN;
2769 }
2770 }
2771
2772 /**
2773 * dump_hash_zone() - Dump information about a hash zone to the log for debugging.
2774 * @zone: The zone to dump.
2775 */
2776 static void dump_hash_zone(const struct hash_zone *zone)
2777 {
2778 data_vio_count_t i;
2779
2780 if (zone->hash_lock_map == NULL) {
2781 vdo_log_info("struct hash_zone %u: NULL map", zone->zone_number);
2782 return;
2783 }
2784
2785 vdo_log_info("struct hash_zone %u: mapSize=%zu",
2786 zone->zone_number, vdo_int_map_size(zone->hash_lock_map));
2787 for (i = 0; i < LOCK_POOL_CAPACITY; i++)
2788 dump_hash_lock(&zone->lock_array[i]);
2789 }
2790
2791 /**
2792 * vdo_dump_hash_zones() - Dump information about the hash zones to the log for debugging.
2793 * @zones: The zones to dump.
2794 */
2795 void vdo_dump_hash_zones(struct hash_zones *zones)
2796 {
2797 const char *state, *target;
2798 zone_count_t zone;
2799
2800 spin_lock(&zones->lock);
2801 state = index_state_to_string(zones, zones->index_state);
2802 target = (zones->changing ? index_state_to_string(zones, zones->index_target) : NULL);
2803 spin_unlock(&zones->lock);
2804
2805 vdo_log_info("UDS index: state: %s", state);
2806 if (target != NULL)
2807 vdo_log_info("UDS index: changing to state: %s", target);
2808
2809 for (zone = 0; zone < zones->zone_count; zone++)
2810 dump_hash_zone(&zones->zones[zone]);
2811 }
2812
2813 void vdo_set_dedupe_index_timeout_interval(unsigned int value)
2814 {
2815 u64 alb_jiffies;
2816
2817 /* Arbitrary maximum value is two minutes */
2818 if (value > 120000)
2819 value = 120000;
2820 /* Arbitrary minimum value is 2 jiffies */
2821 alb_jiffies = msecs_to_jiffies(value);
2822
2823 if (alb_jiffies < 2) {
2824 alb_jiffies = 2;
2825 value = jiffies_to_msecs(alb_jiffies);
2826 }
2827 vdo_dedupe_index_timeout_interval = value;
2828 vdo_dedupe_index_timeout_jiffies = alb_jiffies;
2829 }
2830
2831 void vdo_set_dedupe_index_min_timer_interval(unsigned int value)
2832 {
2833 u64 min_jiffies;
2834
2835 /* Arbitrary maximum value is one second */
2836 if (value > 1000)
2837 value = 1000;
2838
2839 /* Arbitrary minimum value is 2 jiffies */
2840 min_jiffies = msecs_to_jiffies(value);
2841
2842 if (min_jiffies < 2) {
2843 min_jiffies = 2;
2844 value = jiffies_to_msecs(min_jiffies);
2845 }
2846
2847 vdo_dedupe_index_min_timer_interval = value;
2848 vdo_dedupe_index_min_timer_jiffies = min_jiffies;
2849 }
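
/*
 * A quick example of the clamping done by the two setters above, assuming HZ = 250 so one
 * jiffy is 4ms: passing 0 gives msecs_to_jiffies(0) = 0, which is raised to the 2-jiffy
 * floor, and the stored millisecond value is rewritten to jiffies_to_msecs(2) = 8 so the
 * reported interval matches what the timer will actually do.
 */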
2850
2851 /**
2852 * acquire_context() - Acquire a dedupe context from a hash_zone if any are available.
2853 * @zone: The hash zone.
2854 *
2855 * Return: A dedupe_context or NULL if none are available.
2856 */
2857 static struct dedupe_context * __must_check acquire_context(struct hash_zone *zone)
2858 {
2859 struct dedupe_context *context;
2860 struct funnel_queue_entry *entry;
2861
2862 assert_in_hash_zone(zone, __func__);
2863
2864 if (!list_empty(&zone->available)) {
2865 WRITE_ONCE(zone->active, zone->active + 1);
2866 context = list_first_entry(&zone->available, struct dedupe_context,
2867 list_entry);
2868 list_del_init(&context->list_entry);
2869 return context;
2870 }
2871
2872 entry = vdo_funnel_queue_poll(zone->timed_out_complete);
2873 return ((entry == NULL) ?
2874 NULL : container_of(entry, struct dedupe_context, queue_entry));
2875 }
2876
2877 static void prepare_uds_request(struct uds_request *request, struct data_vio *data_vio,
2878 enum uds_request_type operation)
2879 {
2880 request->record_name = data_vio->record_name;
2881 request->type = operation;
2882 if ((operation == UDS_POST) || (operation == UDS_UPDATE)) {
2883 size_t offset = 0;
2884 struct uds_record_data *encoding = &request->new_metadata;
2885
2886 encoding->data[offset++] = UDS_ADVICE_VERSION;
2887 encoding->data[offset++] = data_vio->new_mapped.state;
2888 put_unaligned_le64(data_vio->new_mapped.pbn, &encoding->data[offset]);
2889 offset += sizeof(u64);
2890 BUG_ON(offset != UDS_ADVICE_SIZE);
2891 }
2892 }
2893
2894 /*
2895 * The index operation will inquire about data_vio.record_name, providing (if the operation is
2896 * appropriate) advice from the data_vio's new_mapped fields. The advice found in the index (or
2897 * NULL if none) will be returned via receive_data_vio_dedupe_advice(). dedupe_context.status is
2898 * set to the return status code of any asynchronous index processing.
2899 */
2900 static void query_index(struct data_vio *data_vio, enum uds_request_type operation)
2901 {
2902 int result;
2903 struct dedupe_context *context;
2904 struct vdo *vdo = vdo_from_data_vio(data_vio);
2905 struct hash_zone *zone = data_vio->hash_zone;
2906
2907 assert_data_vio_in_hash_zone(data_vio);
2908
2909 if (!READ_ONCE(vdo->hash_zones->dedupe_flag)) {
2910 continue_data_vio(data_vio);
2911 return;
2912 }
2913
2914 context = acquire_context(zone);
2915 if (context == NULL) {
2916 atomic64_inc(&vdo->hash_zones->dedupe_context_busy);
2917 continue_data_vio(data_vio);
2918 return;
2919 }
2920
2921 data_vio->dedupe_context = context;
2922 context->requestor = data_vio;
2923 context->submission_jiffies = jiffies;
2924 prepare_uds_request(&context->request, data_vio, operation);
2925 atomic_set(&context->state, DEDUPE_CONTEXT_PENDING);
2926 list_add_tail(&context->list_entry, &zone->pending);
2927 start_expiration_timer(context);
2928 result = uds_launch_request(&context->request);
2929 if (result != UDS_SUCCESS) {
2930 context->request.status = result;
2931 finish_index_operation(&context->request);
2932 }
2933 }
2934
2935 static void set_target_state(struct hash_zones *zones, enum index_state target,
2936 bool change_dedupe, bool dedupe, bool set_create)
2937 {
2938 const char *old_state, *new_state;
2939
2940 spin_lock(&zones->lock);
2941 old_state = index_state_to_string(zones, zones->index_target);
2942 if (change_dedupe)
2943 WRITE_ONCE(zones->dedupe_flag, dedupe);
2944
2945 if (set_create)
2946 zones->create_flag = true;
2947
2948 zones->index_target = target;
2949 launch_dedupe_state_change(zones);
2950 new_state = index_state_to_string(zones, zones->index_target);
2951 spin_unlock(&zones->lock);
2952
2953 if (old_state != new_state)
2954 vdo_log_info("Setting UDS index target state to %s", new_state);
2955 }
2956
2957 const char *vdo_get_dedupe_index_state_name(struct hash_zones *zones)
2958 {
2959 const char *state;
2960
2961 spin_lock(&zones->lock);
2962 state = index_state_to_string(zones, zones->index_state);
2963 spin_unlock(&zones->lock);
2964
2965 return state;
2966 }
2967
2968 /* Handle a dmsetup message relevant to the index. */
2969 int vdo_message_dedupe_index(struct hash_zones *zones, const char *name)
2970 {
2971 if (strcasecmp(name, "index-close") == 0) {
2972 set_target_state(zones, IS_CLOSED, false, false, false);
2973 return 0;
2974 } else if (strcasecmp(name, "index-create") == 0) {
2975 set_target_state(zones, IS_OPENED, false, false, true);
2976 return 0;
2977 } else if (strcasecmp(name, "index-disable") == 0) {
2978 set_target_state(zones, IS_OPENED, true, false, false);
2979 return 0;
2980 } else if (strcasecmp(name, "index-enable") == 0) {
2981 set_target_state(zones, IS_OPENED, true, true, false);
2982 return 0;
2983 }
2984
2985 return -EINVAL;
2986 }
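
/*
 * These messages are sent via dmsetup; for example, with "vdo0" standing in for whatever
 * the device-mapper target is actually named:
 *
 *   dmsetup message vdo0 0 index-disable   # keep the index open but stop deduplicating
 *   dmsetup message vdo0 0 index-enable    # resume deduplicating against the index
 *   dmsetup message vdo0 0 index-create    # create a new index without loading the old one
 *   dmsetup message vdo0 0 index-close     # close the index
 *
 * The device name here is hypothetical; use the actual mapped device's name.
 */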
2987
2988 void vdo_set_dedupe_state_normal(struct hash_zones *zones)
2989 {
2990 vdo_set_admin_state_code(&zones->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
2991 }
2992
2993 /* If create_flag, create a new index without first attempting to load an existing index. */
2994 void vdo_start_dedupe_index(struct hash_zones *zones, bool create_flag)
2995 {
2996 set_target_state(zones, IS_OPENED, true, true, create_flag);
2997 }
2998