xref: /linux/drivers/block/rnbd/rnbd-clt.c (revision c88fb897c1fb5a590dc6353ac4b01c8f46a347b3)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3  * RDMA Network Block Driver
4  *
5  * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
6  * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
7  * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
8  */
9 
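/* Prefix every log message with the module name and the source line number. */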
10 #undef pr_fmt
11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
12 
13 #include <linux/module.h>
14 #include <linux/blkdev.h>
15 #include <linux/hdreg.h>
16 #include <linux/scatterlist.h>
17 #include <linux/idr.h>
18 
19 #include "rnbd-clt.h"
20 
21 MODULE_DESCRIPTION("RDMA Network Block Device Client");
22 MODULE_LICENSE("GPL");
23 
24 static int rnbd_client_major;
25 static DEFINE_IDA(index_ida);
26 static DEFINE_MUTEX(ida_lock);
27 static DEFINE_MUTEX(sess_lock);
28 static LIST_HEAD(sess_list);
29 
30 /*
31  * Maximum number of partitions an instance can have.
32  * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself)
33  */
34 #define RNBD_PART_BITS		6
35 
36 static inline bool rnbd_clt_get_sess(struct rnbd_clt_session *sess)
37 {
38 	return refcount_inc_not_zero(&sess->refcount);
39 }
40 
41 static void free_sess(struct rnbd_clt_session *sess);
42 
43 static void rnbd_clt_put_sess(struct rnbd_clt_session *sess)
44 {
45 	might_sleep();
46 
47 	if (refcount_dec_and_test(&sess->refcount))
48 		free_sess(sess);
49 }
50 
51 static void rnbd_clt_put_dev(struct rnbd_clt_dev *dev)
52 {
53 	might_sleep();
54 
55 	if (!refcount_dec_and_test(&dev->refcount))
56 		return;
57 
58 	mutex_lock(&ida_lock);
59 	ida_simple_remove(&index_ida, dev->clt_device_id);
60 	mutex_unlock(&ida_lock);
61 	kfree(dev->hw_queues);
62 	kfree(dev->pathname);
63 	rnbd_clt_put_sess(dev->sess);
64 	mutex_destroy(&dev->lock);
65 	kfree(dev);
66 }
67 
68 static inline bool rnbd_clt_get_dev(struct rnbd_clt_dev *dev)
69 {
70 	return refcount_inc_not_zero(&dev->refcount);
71 }
72 
73 static int rnbd_clt_set_dev_attr(struct rnbd_clt_dev *dev,
74 				 const struct rnbd_msg_open_rsp *rsp)
75 {
76 	struct rnbd_clt_session *sess = dev->sess;
77 
78 	if (!rsp->logical_block_size)
79 		return -EINVAL;
80 
81 	dev->device_id		    = le32_to_cpu(rsp->device_id);
82 	dev->nsectors		    = le64_to_cpu(rsp->nsectors);
83 	dev->logical_block_size	    = le16_to_cpu(rsp->logical_block_size);
84 	dev->physical_block_size    = le16_to_cpu(rsp->physical_block_size);
85 	dev->max_write_same_sectors = le32_to_cpu(rsp->max_write_same_sectors);
86 	dev->max_discard_sectors    = le32_to_cpu(rsp->max_discard_sectors);
87 	dev->discard_granularity    = le32_to_cpu(rsp->discard_granularity);
88 	dev->discard_alignment	    = le32_to_cpu(rsp->discard_alignment);
89 	dev->secure_discard	    = le16_to_cpu(rsp->secure_discard);
90 	dev->rotational		    = rsp->rotational;
91 	dev->wc 		    = !!(rsp->cache_policy & RNBD_WRITEBACK);
92 	dev->fua		    = !!(rsp->cache_policy & RNBD_FUA);
93 
94 	dev->max_hw_sectors = sess->max_io_size / SECTOR_SIZE;
95 	dev->max_segments = BMAX_SEGMENTS;
96 
97 	return 0;
98 }
99 
100 static int rnbd_clt_change_capacity(struct rnbd_clt_dev *dev,
101 				    size_t new_nsectors)
102 {
103 	rnbd_clt_info(dev, "Device size changed from %zu to %zu sectors\n",
104 		       dev->nsectors, new_nsectors);
105 	dev->nsectors = new_nsectors;
106 	set_capacity_and_notify(dev->gd, dev->nsectors);
107 	return 0;
108 }
109 
110 static int process_msg_open_rsp(struct rnbd_clt_dev *dev,
111 				struct rnbd_msg_open_rsp *rsp)
112 {
113 	int err = 0;
114 
115 	mutex_lock(&dev->lock);
116 	if (dev->dev_state == DEV_STATE_UNMAPPED) {
117 		rnbd_clt_info(dev,
118 			       "Ignoring Open-Response message from server for unmapped device\n");
119 		err = -ENOENT;
120 		goto out;
121 	}
122 	if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) {
123 		u64 nsectors = le64_to_cpu(rsp->nsectors);
124 
125 		/*
126 		 * If the device was remapped and the size changed in the
127 		 * meantime we need to revalidate it
128 		 */
129 		if (dev->nsectors != nsectors)
130 			rnbd_clt_change_capacity(dev, nsectors);
131 		rnbd_clt_info(dev, "Device online, device remapped successfully\n");
132 	}
133 	err = rnbd_clt_set_dev_attr(dev, rsp);
134 	if (err)
135 		goto out;
136 	dev->dev_state = DEV_STATE_MAPPED;
137 
138 out:
139 	mutex_unlock(&dev->lock);
140 
141 	return err;
142 }
143 
144 int rnbd_clt_resize_disk(struct rnbd_clt_dev *dev, size_t newsize)
145 {
146 	int ret = 0;
147 
148 	mutex_lock(&dev->lock);
149 	if (dev->dev_state != DEV_STATE_MAPPED) {
150 		pr_err("Failed to set new size of the device, device is not opened\n");
151 		ret = -ENOENT;
152 		goto out;
153 	}
154 	ret = rnbd_clt_change_capacity(dev, newsize);
155 
156 out:
157 	mutex_unlock(&dev->lock);
158 
159 	return ret;
160 }
161 
162 static inline void rnbd_clt_dev_requeue(struct rnbd_queue *q)
163 {
164 	if (WARN_ON(!q->hctx))
165 		return;
166 
167 	/* We can come here from interrupt, thus async=true */
168 	blk_mq_run_hw_queue(q->hctx, true);
169 }
170 
171 enum {
172 	RNBD_DELAY_IFBUSY = -1,
173 };
174 
175 /**
176  * rnbd_get_cpu_qlist() - finds a list with HW queues to be rerun
177  * @sess:	Session to find a queue for
178  * @cpu:	Cpu to start the search from
179  *
180  * Description:
181  *     Each CPU has a list of HW queues which need to be rerun.  If a list
182  *     is not empty, it is marked with a bit.  This function finds the first
183  *     set bit in the bitmap and returns the corresponding CPU list.
184  */
185 static struct rnbd_cpu_qlist *
186 rnbd_get_cpu_qlist(struct rnbd_clt_session *sess, int cpu)
187 {
188 	int bit;
189 
190 	/* Search from cpu to nr_cpu_ids */
191 	bit = find_next_bit(sess->cpu_queues_bm, nr_cpu_ids, cpu);
192 	if (bit < nr_cpu_ids) {
193 		return per_cpu_ptr(sess->cpu_queues, bit);
194 	} else if (cpu != 0) {
195 		/* Search from 0 to cpu */
196 		bit = find_next_bit(sess->cpu_queues_bm, cpu, 0);
197 		if (bit < cpu)
198 			return per_cpu_ptr(sess->cpu_queues, bit);
199 	}
200 
201 	return NULL;
202 }
203 
204 static inline int nxt_cpu(int cpu)
205 {
206 	return (cpu + 1) % nr_cpu_ids;
207 }
208 
209 /**
210  * rnbd_rerun_if_needed() - rerun next queue marked as stopped
211  * @sess:	Session to rerun a queue on
212  *
213  * Description:
214  *     Each CPU has its own list of HW queues which should be rerun.
215  *     The function finds such a list, takes the list lock, picks the
216  *     first HW queue off the list and requeues it.
217  *
218  * Return:
219  *     True if the queue was requeued, false otherwise.
220  *
221  * Context:
222  *     Does not matter.
223  */
224 static bool rnbd_rerun_if_needed(struct rnbd_clt_session *sess)
225 {
226 	struct rnbd_queue *q = NULL;
227 	struct rnbd_cpu_qlist *cpu_q;
228 	unsigned long flags;
229 	int *cpup;
230 
231 	/*
232 	 * To keep fairness and not let other queues starve, we always try
233 	 * to wake up someone else in a round-robin manner.  That of course
234 	 * increases latency, but every queue always gets a chance to be run.
235 	 */
236 	cpup = get_cpu_ptr(sess->cpu_rr);
237 	for (cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(*cpup)); cpu_q;
238 	     cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(cpu_q->cpu))) {
239 		if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags))
240 			continue;
241 		if (unlikely(!test_bit(cpu_q->cpu, sess->cpu_queues_bm)))
242 			goto unlock;
243 		q = list_first_entry_or_null(&cpu_q->requeue_list,
244 					     typeof(*q), requeue_list);
245 		if (WARN_ON(!q))
246 			goto clear_bit;
247 		list_del_init(&q->requeue_list);
248 		clear_bit_unlock(0, &q->in_list);
249 
250 		if (list_empty(&cpu_q->requeue_list)) {
251 			/* Clear bit if nothing is left */
252 clear_bit:
253 			clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
254 		}
255 unlock:
256 		spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
257 
258 		if (q)
259 			break;
260 	}
261 
262 	/*
263 	 * Save the CPU that is going to be requeued in the per-cpu var.  Just
264 	 * incrementing it doesn't work, because rnbd_get_cpu_qlist() would
265 	 * always return the first CPU with something on the queue list when
266 	 * the value stored in the var is greater than the last CPU with
267 	 * something on the list.
268 	 */
269 	if (cpu_q)
270 		*cpup = cpu_q->cpu;
271 	put_cpu_var(sess->cpu_rr);
272 
273 	if (q)
274 		rnbd_clt_dev_requeue(q);
275 
276 	return q;
277 }
278 
279 /**
280  * rnbd_rerun_all_if_idle() - rerun all queues left in the list if
281  *				 session is idling (there are no requests
282  *				 in-flight).
283  * @sess:	Session to rerun the queues on
284  *
285  * Description:
286  *     This function tries to rerun all stopped queues if there are no
287  *     requests in-flight anymore.  It addresses an obvious problem: the
288  *     number of tags can be smaller than the number of queues (hctxs) which
289  *     are stopped and put to sleep.  If the last permit, which has just been
290  *     put, does not wake up all remaining queues (hctxs), IO requests hang forever.
291  *
292  *     That can happen when all permits, say N, have been exhausted from one
293  *     CPU, and we have many block devices per session, say M.  Each block
294  *     device has its own queue (hctx) for each CPU, so eventually we can put
295  *     M x nr_cpu_ids queues (hctxs) to sleep.  If the number of permits
296  *     N < M x nr_cpu_ids, we will finally get an IO hang.
297  *
298  *     To avoid this hang, the last caller of rnbd_put_permit() (the one who
299  *     observes sess->busy == 0) must wake up all remaining queues.
300  *
301  * Context:
302  *     Does not matter.
303  */
304 static void rnbd_rerun_all_if_idle(struct rnbd_clt_session *sess)
305 {
306 	bool requeued;
307 
308 	do {
309 		requeued = rnbd_rerun_if_needed(sess);
310 	} while (atomic_read(&sess->busy) == 0 && requeued);
311 }
312 
313 static struct rtrs_permit *rnbd_get_permit(struct rnbd_clt_session *sess,
314 					     enum rtrs_clt_con_type con_type,
315 					     int wait)
316 {
317 	struct rtrs_permit *permit;
318 
319 	permit = rtrs_clt_get_permit(sess->rtrs, con_type,
320 				      wait ? RTRS_PERMIT_WAIT :
321 				      RTRS_PERMIT_NOWAIT);
322 	if (likely(permit))
323 		/* We have a subtle rare case here, when all permits can be
324 		 * consumed before the busy counter is incremented.  This is
325 		 * safe, because the loser will get NULL as a permit, observe
326 		 * a zero busy counter and immediately restart the queue itself.
327 		 */
328 		atomic_inc(&sess->busy);
329 
330 	return permit;
331 }
332 
333 static void rnbd_put_permit(struct rnbd_clt_session *sess,
334 			     struct rtrs_permit *permit)
335 {
336 	rtrs_clt_put_permit(sess->rtrs, permit);
337 	atomic_dec(&sess->busy);
338 	/* Paired with rnbd_clt_dev_add_to_requeue().  Decrement first
339 	 * and then check queue bits.
340 	 */
341 	smp_mb__after_atomic();
342 	rnbd_rerun_all_if_idle(sess);
343 }
344 
345 static struct rnbd_iu *rnbd_get_iu(struct rnbd_clt_session *sess,
346 				     enum rtrs_clt_con_type con_type,
347 				     int wait)
348 {
349 	struct rnbd_iu *iu;
350 	struct rtrs_permit *permit;
351 
352 	iu = kzalloc(sizeof(*iu), GFP_KERNEL);
353 	if (!iu)
354 		return NULL;
356 
357 	permit = rnbd_get_permit(sess, con_type,
358 				  wait ? RTRS_PERMIT_WAIT :
359 				  RTRS_PERMIT_NOWAIT);
360 	if (unlikely(!permit)) {
361 		kfree(iu);
362 		return NULL;
363 	}
364 
365 	iu->permit = permit;
366 	/*
367 	 * The 1st reference is dropped after the "user" message has been sent,
368 	 * the 2nd reference is dropped when the confirmation with the response
369 	 * is returned.
370 	 * The 1st and 2nd can happen in any order, so the rnbd_iu is released
371 	 * (the rtrs_permit is returned to rtrs) only after both of them
372 	 * have finished.
373 	 */
374 	atomic_set(&iu->refcount, 2);
375 	init_waitqueue_head(&iu->comp.wait);
376 	iu->comp.errno = INT_MAX;
377 
378 	if (sg_alloc_table(&iu->sgt, 1, GFP_KERNEL)) {
379 		rnbd_put_permit(sess, permit);
380 		kfree(iu);
381 		return NULL;
382 	}
383 
384 	return iu;
385 }
386 
387 static void rnbd_put_iu(struct rnbd_clt_session *sess, struct rnbd_iu *iu)
388 {
389 	if (atomic_dec_and_test(&iu->refcount)) {
390 		sg_free_table(&iu->sgt);
391 		rnbd_put_permit(sess, iu->permit);
392 		kfree(iu);
393 	}
394 }
395 
396 static void rnbd_softirq_done_fn(struct request *rq)
397 {
398 	struct rnbd_clt_dev *dev	= rq->rq_disk->private_data;
399 	struct rnbd_clt_session *sess	= dev->sess;
400 	struct rnbd_iu *iu;
401 
402 	iu = blk_mq_rq_to_pdu(rq);
403 	sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT);
404 	rnbd_put_permit(sess, iu->permit);
405 	blk_mq_end_request(rq, errno_to_blk_status(iu->errno));
406 }
407 
408 static void msg_io_conf(void *priv, int errno)
409 {
410 	struct rnbd_iu *iu = priv;
411 	struct rnbd_clt_dev *dev = iu->dev;
412 	struct request *rq = iu->rq;
413 	int rw = rq_data_dir(rq);
414 
415 	iu->errno = errno;
416 
417 	blk_mq_complete_request(rq);
418 
419 	if (errno)
420 		rnbd_clt_info_rl(dev, "%s I/O failed with err: %d\n",
421 				 rw == READ ? "read" : "write", errno);
422 }
423 
424 static void wake_up_iu_comp(struct rnbd_iu *iu, int errno)
425 {
426 	iu->comp.errno = errno;
427 	wake_up(&iu->comp.wait);
428 }
429 
430 static void msg_conf(void *priv, int errno)
431 {
432 	struct rnbd_iu *iu = priv;
433 
434 	iu->errno = errno;
435 	schedule_work(&iu->work);
436 }
437 
438 enum wait_type {
439 	NO_WAIT = 0,
440 	WAIT    = 1
441 };
442 
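/*
 * Send a "user" (non-IO) message over the RTRS session.  The confirmation
 * callback msg_conf() schedules @conf as a work item; for WAIT callers we
 * then block until wake_up_iu_comp() stores the server result in
 * iu->comp.errno.
 */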
443 static int send_usr_msg(struct rtrs_clt *rtrs, int dir,
444 			struct rnbd_iu *iu, struct kvec *vec,
445 			size_t len, struct scatterlist *sg, unsigned int sg_len,
446 			void (*conf)(struct work_struct *work),
447 			int *errno, enum wait_type wait)
448 {
449 	int err;
450 	struct rtrs_clt_req_ops req_ops;
451 
452 	INIT_WORK(&iu->work, conf);
453 	req_ops = (struct rtrs_clt_req_ops) {
454 		.priv = iu,
455 		.conf_fn = msg_conf,
456 	};
457 	err = rtrs_clt_request(dir, &req_ops, rtrs, iu->permit,
458 				vec, 1, len, sg, sg_len);
459 	if (!err && wait) {
460 		wait_event(iu->comp.wait, iu->comp.errno != INT_MAX);
461 		*errno = iu->comp.errno;
462 	} else {
463 		*errno = 0;
464 	}
465 
466 	return err;
467 }
468 
469 static void msg_close_conf(struct work_struct *work)
470 {
471 	struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
472 	struct rnbd_clt_dev *dev = iu->dev;
473 
474 	wake_up_iu_comp(iu, iu->errno);
475 	rnbd_put_iu(dev->sess, iu);
476 	rnbd_clt_put_dev(dev);
477 }
478 
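/*
 * Ask the server to close @device_id.  An extra device reference is taken
 * here and dropped in msg_close_conf() once the confirmation arrives (or
 * immediately on a send error).
 */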
479 static int send_msg_close(struct rnbd_clt_dev *dev, u32 device_id, bool wait)
480 {
481 	struct rnbd_clt_session *sess = dev->sess;
482 	struct rnbd_msg_close msg;
483 	struct rnbd_iu *iu;
484 	struct kvec vec = {
485 		.iov_base = &msg,
486 		.iov_len  = sizeof(msg)
487 	};
488 	int err, errno;
489 
490 	iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
491 	if (!iu)
492 		return -ENOMEM;
493 
494 	iu->buf = NULL;
495 	iu->dev = dev;
496 
497 	msg.hdr.type	= cpu_to_le16(RNBD_MSG_CLOSE);
498 	msg.device_id	= cpu_to_le32(device_id);
499 
500 	WARN_ON(!rnbd_clt_get_dev(dev));
501 	err = send_usr_msg(sess->rtrs, WRITE, iu, &vec, 0, NULL, 0,
502 			   msg_close_conf, &errno, wait);
503 	if (err) {
504 		rnbd_clt_put_dev(dev);
505 		rnbd_put_iu(sess, iu);
506 	} else {
507 		err = errno;
508 	}
509 
510 	rnbd_put_iu(sess, iu);
511 	return err;
512 }
513 
514 static void msg_open_conf(struct work_struct *work)
515 {
516 	struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
517 	struct rnbd_msg_open_rsp *rsp = iu->buf;
518 	struct rnbd_clt_dev *dev = iu->dev;
519 	int errno = iu->errno;
520 
521 	if (errno) {
522 		rnbd_clt_err(dev,
523 			      "Opening failed, server responded: %d\n",
524 			      errno);
525 	} else {
526 		errno = process_msg_open_rsp(dev, rsp);
527 		if (errno) {
528 			u32 device_id = le32_to_cpu(rsp->device_id);
529 			/*
530 			 * If the server thinks it's fine but we fail to process
531 			 * the response, be nice and send a close to the server.
532 			 */
533 			(void)send_msg_close(dev, device_id, NO_WAIT);
534 		}
535 	}
536 	kfree(rsp);
537 	wake_up_iu_comp(iu, errno);
538 	rnbd_put_iu(dev->sess, iu);
539 	rnbd_clt_put_dev(dev);
540 }
541 
542 static void msg_sess_info_conf(struct work_struct *work)
543 {
544 	struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
545 	struct rnbd_msg_sess_info_rsp *rsp = iu->buf;
546 	struct rnbd_clt_session *sess = iu->sess;
547 
548 	if (!iu->errno)
549 		sess->ver = min_t(u8, rsp->ver, RNBD_PROTO_VER_MAJOR);
550 
551 	kfree(rsp);
552 	wake_up_iu_comp(iu, iu->errno);
553 	rnbd_put_iu(sess, iu);
554 	rnbd_clt_put_sess(sess);
555 }
556 
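/*
 * Ask the server to open dev->pathname.  The RNBD_MSG_OPEN_RSP is received
 * into @rsp via the sg entry and processed in msg_open_conf(), which fills
 * the device attributes through process_msg_open_rsp().
 */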
557 static int send_msg_open(struct rnbd_clt_dev *dev, bool wait)
558 {
559 	struct rnbd_clt_session *sess = dev->sess;
560 	struct rnbd_msg_open_rsp *rsp;
561 	struct rnbd_msg_open msg;
562 	struct rnbd_iu *iu;
563 	struct kvec vec = {
564 		.iov_base = &msg,
565 		.iov_len  = sizeof(msg)
566 	};
567 	int err, errno;
568 
569 	rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
570 	if (!rsp)
571 		return -ENOMEM;
572 
573 	iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
574 	if (!iu) {
575 		kfree(rsp);
576 		return -ENOMEM;
577 	}
578 
579 	iu->buf = rsp;
580 	iu->dev = dev;
581 
582 	sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp));
583 
584 	msg.hdr.type	= cpu_to_le16(RNBD_MSG_OPEN);
585 	msg.access_mode	= dev->access_mode;
586 	strlcpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name));
587 
588 	WARN_ON(!rnbd_clt_get_dev(dev));
589 	err = send_usr_msg(sess->rtrs, READ, iu,
590 			   &vec, sizeof(*rsp), iu->sgt.sgl, 1,
591 			   msg_open_conf, &errno, wait);
592 	if (err) {
593 		rnbd_clt_put_dev(dev);
594 		rnbd_put_iu(sess, iu);
595 		kfree(rsp);
596 	} else {
597 		err = errno;
598 	}
599 
600 	rnbd_put_iu(sess, iu);
601 	return err;
602 }
603 
604 static int send_msg_sess_info(struct rnbd_clt_session *sess, bool wait)
605 {
606 	struct rnbd_msg_sess_info_rsp *rsp;
607 	struct rnbd_msg_sess_info msg;
608 	struct rnbd_iu *iu;
609 	struct kvec vec = {
610 		.iov_base = &msg,
611 		.iov_len  = sizeof(msg)
612 	};
613 	int err, errno;
614 
615 	rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
616 	if (!rsp)
617 		return -ENOMEM;
618 
619 	iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
620 	if (!iu) {
621 		kfree(rsp);
622 		return -ENOMEM;
623 	}
624 
625 	iu->buf = rsp;
626 	iu->sess = sess;
627 	sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp));
628 
629 	msg.hdr.type = cpu_to_le16(RNBD_MSG_SESS_INFO);
630 	msg.ver      = RNBD_PROTO_VER_MAJOR;
631 
632 	if (!rnbd_clt_get_sess(sess)) {
633 		/*
634 		 * That can happen only in one case: RTRS has re-established
635 		 * the connection and link_ev() is called, but the session is
636 		 * almost dead, the last reference on the session has been put
637 		 * and the caller is waiting for RTRS to close everything.
638 		 */
639 		err = -ENODEV;
640 		goto put_iu;
641 	}
642 	err = send_usr_msg(sess->rtrs, READ, iu,
643 			   &vec, sizeof(*rsp), iu->sgt.sgl, 1,
644 			   msg_sess_info_conf, &errno, wait);
645 	if (err) {
646 		rnbd_clt_put_sess(sess);
647 put_iu:
648 		rnbd_put_iu(sess, iu);
649 		kfree(rsp);
650 	} else {
651 		err = errno;
652 	}
653 	rnbd_put_iu(sess, iu);
654 	return err;
655 }
656 
657 static void set_dev_states_to_disconnected(struct rnbd_clt_session *sess)
658 {
659 	struct rnbd_clt_dev *dev;
660 
661 	mutex_lock(&sess->lock);
662 	list_for_each_entry(dev, &sess->devs_list, list) {
663 		rnbd_clt_err(dev, "Device disconnected.\n");
664 
665 		mutex_lock(&dev->lock);
666 		if (dev->dev_state == DEV_STATE_MAPPED)
667 			dev->dev_state = DEV_STATE_MAPPED_DISCONNECTED;
668 		mutex_unlock(&dev->lock);
669 	}
670 	mutex_unlock(&sess->lock);
671 }
672 
673 static void remap_devs(struct rnbd_clt_session *sess)
674 {
675 	struct rnbd_clt_dev *dev;
676 	struct rtrs_attrs attrs;
677 	int err;
678 
679 	/*
680 	 * Careful here: we are called directly from the RTRS link event,
681 	 * thus we can't send any RTRS request and wait for the response,
682 	 * or RTRS will not be able to complete the request with failure
683 	 * if something goes wrong (failing outstanding requests happens
684 	 * exactly from the context where we are blocking now).
685 	 *
686 	 * So to avoid deadlocks, each usr message sent from here must
687 	 * be asynchronous.
688 	 */
689 
690 	err = send_msg_sess_info(sess, NO_WAIT);
691 	if (err) {
692 		pr_err("send_msg_sess_info(\"%s\"): %d\n", sess->sessname, err);
693 		return;
694 	}
695 
696 	rtrs_clt_query(sess->rtrs, &attrs);
697 	mutex_lock(&sess->lock);
698 	sess->max_io_size = attrs.max_io_size;
699 
700 	list_for_each_entry(dev, &sess->devs_list, list) {
701 		bool skip;
702 
703 		mutex_lock(&dev->lock);
704 		skip = (dev->dev_state == DEV_STATE_INIT);
705 		mutex_unlock(&dev->lock);
706 		if (skip)
707 			/*
708 			 * When the device is establishing a connection for the
709 			 * first time, do not remap; it will be closed soon.
710 			 */
711 			continue;
712 
713 		rnbd_clt_info(dev, "session reconnected, remapping device\n");
714 		err = send_msg_open(dev, NO_WAIT);
715 		if (err) {
716 			rnbd_clt_err(dev, "send_msg_open(): %d\n", err);
717 			break;
718 		}
719 	}
720 	mutex_unlock(&sess->lock);
721 }
722 
723 static void rnbd_clt_link_ev(void *priv, enum rtrs_clt_link_ev ev)
724 {
725 	struct rnbd_clt_session *sess = priv;
726 
727 	switch (ev) {
728 	case RTRS_CLT_LINK_EV_DISCONNECTED:
729 		set_dev_states_to_disconnected(sess);
730 		break;
731 	case RTRS_CLT_LINK_EV_RECONNECTED:
732 		remap_devs(sess);
733 		break;
734 	default:
735 		pr_err("Unknown session event received (%d), session: %s\n",
736 		       ev, sess->sessname);
737 	}
738 }
739 
740 static void rnbd_init_cpu_qlists(struct rnbd_cpu_qlist __percpu *cpu_queues)
741 {
742 	unsigned int cpu;
743 	struct rnbd_cpu_qlist *cpu_q;
744 
745 	for_each_possible_cpu(cpu) {
746 		cpu_q = per_cpu_ptr(cpu_queues, cpu);
747 
748 		cpu_q->cpu = cpu;
749 		INIT_LIST_HEAD(&cpu_q->requeue_list);
750 		spin_lock_init(&cpu_q->requeue_lock);
751 	}
752 }
753 
754 static void destroy_mq_tags(struct rnbd_clt_session *sess)
755 {
756 	if (sess->tag_set.tags)
757 		blk_mq_free_tag_set(&sess->tag_set);
758 }
759 
760 static inline void wake_up_rtrs_waiters(struct rnbd_clt_session *sess)
761 {
762 	sess->rtrs_ready = true;
763 	wake_up_all(&sess->rtrs_waitq);
764 }
765 
766 static void close_rtrs(struct rnbd_clt_session *sess)
767 {
768 	might_sleep();
769 
770 	if (!IS_ERR_OR_NULL(sess->rtrs)) {
771 		rtrs_clt_close(sess->rtrs);
772 		sess->rtrs = NULL;
773 		wake_up_rtrs_waiters(sess);
774 	}
775 }
776 
777 static void free_sess(struct rnbd_clt_session *sess)
778 {
779 	WARN_ON(!list_empty(&sess->devs_list));
780 
781 	might_sleep();
782 
783 	close_rtrs(sess);
784 	destroy_mq_tags(sess);
785 	if (!list_empty(&sess->list)) {
786 		mutex_lock(&sess_lock);
787 		list_del(&sess->list);
788 		mutex_unlock(&sess_lock);
789 	}
790 	free_percpu(sess->cpu_queues);
791 	free_percpu(sess->cpu_rr);
792 	mutex_destroy(&sess->lock);
793 	kfree(sess);
794 }
795 
796 static struct rnbd_clt_session *alloc_sess(const char *sessname)
797 {
798 	struct rnbd_clt_session *sess;
799 	int err, cpu;
800 
801 	sess = kzalloc_node(sizeof(*sess), GFP_KERNEL, NUMA_NO_NODE);
802 	if (!sess)
803 		return ERR_PTR(-ENOMEM);
804 	strlcpy(sess->sessname, sessname, sizeof(sess->sessname));
805 	atomic_set(&sess->busy, 0);
806 	mutex_init(&sess->lock);
807 	INIT_LIST_HEAD(&sess->devs_list);
808 	INIT_LIST_HEAD(&sess->list);
809 	bitmap_zero(sess->cpu_queues_bm, NR_CPUS);
810 	init_waitqueue_head(&sess->rtrs_waitq);
811 	refcount_set(&sess->refcount, 1);
812 
813 	sess->cpu_queues = alloc_percpu(struct rnbd_cpu_qlist);
814 	if (!sess->cpu_queues) {
815 		err = -ENOMEM;
816 		goto err;
817 	}
818 	rnbd_init_cpu_qlists(sess->cpu_queues);
819 
820 	/*
821 	 * This is a simple percpu variable which stores CPU indices, updated
822 	 * on each access.  We need it for the sake of fairness, to wake up
823 	 * queues in a round-robin manner.
824 	 */
825 	sess->cpu_rr = alloc_percpu(int);
826 	if (!sess->cpu_rr) {
827 		err = -ENOMEM;
828 		goto err;
829 	}
830 	for_each_possible_cpu(cpu)
831 		*per_cpu_ptr(sess->cpu_rr, cpu) = cpu;
832 
833 	return sess;
834 
835 err:
836 	free_sess(sess);
837 
838 	return ERR_PTR(err);
839 }
840 
841 static int wait_for_rtrs_connection(struct rnbd_clt_session *sess)
842 {
843 	wait_event(sess->rtrs_waitq, sess->rtrs_ready);
844 	if (IS_ERR_OR_NULL(sess->rtrs))
845 		return -ECONNRESET;
846 
847 	return 0;
848 }
849 
850 static void wait_for_rtrs_disconnection(struct rnbd_clt_session *sess)
851 	__releases(&sess_lock)
852 	__acquires(&sess_lock)
853 {
854 	DEFINE_WAIT(wait);
855 
856 	prepare_to_wait(&sess->rtrs_waitq, &wait, TASK_UNINTERRUPTIBLE);
857 	if (IS_ERR_OR_NULL(sess->rtrs)) {
858 		finish_wait(&sess->rtrs_waitq, &wait);
859 		return;
860 	}
861 	mutex_unlock(&sess_lock);
862 	/* The caller loops, see __find_and_get_sess().
863 	 * We can't leave the mutex locked and call schedule(): we would
864 	 * deadlock with a caller of free_sess(), which has just put the last
865 	 * reference and is about to take sess_lock in order to delete
866 	 * the session from the list.
867 	 */
868 	schedule();
869 	mutex_lock(&sess_lock);
870 }
871 
872 static struct rnbd_clt_session *__find_and_get_sess(const char *sessname)
873 	__releases(&sess_lock)
874 	__acquires(&sess_lock)
875 {
876 	struct rnbd_clt_session *sess, *sn;
877 	int err;
878 
879 again:
880 	list_for_each_entry_safe(sess, sn, &sess_list, list) {
881 		if (strcmp(sessname, sess->sessname))
882 			continue;
883 
884 		if (sess->rtrs_ready && IS_ERR_OR_NULL(sess->rtrs))
885 			/*
886 			 * No RTRS connection, session is dying.
887 			 */
888 			continue;
889 
890 		if (rnbd_clt_get_sess(sess)) {
891 			/*
892 			 * Alive session is found, wait for RTRS connection.
893 			 */
894 			mutex_unlock(&sess_lock);
895 			err = wait_for_rtrs_connection(sess);
896 			if (err)
897 				rnbd_clt_put_sess(sess);
898 			mutex_lock(&sess_lock);
899 
900 			if (err)
901 				/* Session is dying, repeat the loop */
902 				goto again;
903 
904 			return sess;
905 		}
906 		/*
907 		 * Ref is 0, session is dying, wait for RTRS disconnect
908 		 * in order to avoid session name clashes.
909 		 */
910 		wait_for_rtrs_disconnection(sess);
911 		/*
912 		 * RTRS is disconnected and soon session will be freed,
913 		 * so repeat a loop.
914 		 */
915 		goto again;
916 	}
917 
918 	return NULL;
919 }
920 
921 static struct
922 rnbd_clt_session *find_or_create_sess(const char *sessname, bool *first)
923 {
924 	struct rnbd_clt_session *sess = NULL;
925 
926 	mutex_lock(&sess_lock);
927 	sess = __find_and_get_sess(sessname);
928 	if (!sess) {
929 		sess = alloc_sess(sessname);
930 		if (IS_ERR(sess)) {
931 			mutex_unlock(&sess_lock);
932 			return sess;
933 		}
934 		list_add(&sess->list, &sess_list);
935 		*first = true;
936 	} else
937 		*first = false;
938 	mutex_unlock(&sess_lock);
939 
940 	return sess;
941 }
942 
943 static int rnbd_client_open(struct block_device *block_device, fmode_t mode)
944 {
945 	struct rnbd_clt_dev *dev = block_device->bd_disk->private_data;
946 
947 	if (dev->read_only && (mode & FMODE_WRITE))
948 		return -EPERM;
949 
950 	if (dev->dev_state == DEV_STATE_UNMAPPED ||
951 	    !rnbd_clt_get_dev(dev))
952 		return -EIO;
953 
954 	return 0;
955 }
956 
957 static void rnbd_client_release(struct gendisk *gen, fmode_t mode)
958 {
959 	struct rnbd_clt_dev *dev = gen->private_data;
960 
961 	rnbd_clt_put_dev(dev);
962 }
963 
964 static int rnbd_client_getgeo(struct block_device *block_device,
965 			      struct hd_geometry *geo)
966 {
967 	u64 size;
968 	struct rnbd_clt_dev *dev;
969 
970 	dev = block_device->bd_disk->private_data;
971 	size = dev->size * (dev->logical_block_size / SECTOR_SIZE);
972 	geo->cylinders	= size >> 6;	/* size/64 */
973 	geo->heads	= 4;
974 	geo->sectors	= 16;
975 	geo->start	= 0;
976 
977 	return 0;
978 }
979 
980 static const struct block_device_operations rnbd_client_ops = {
981 	.owner		= THIS_MODULE,
982 	.open		= rnbd_client_open,
983 	.release	= rnbd_client_release,
984 	.getgeo		= rnbd_client_getgeo
985 };
986 
987 /* The amount of data that belongs to an I/O and the amount of data that
988  * should be read or written to the disk (bi_size) can differ.
989  *
990  * E.g. When WRITE_SAME is used, only a small amount of data is
991  * transferred that is then written repeatedly over a lot of sectors.
992  *
993  * Get the size of data to be transferred via RTRS by summing up the size
994  * of the scatter-gather list entries.
995  */
996 static size_t rnbd_clt_get_sg_size(struct scatterlist *sglist, u32 len)
997 {
998 	struct scatterlist *sg;
999 	size_t tsize = 0;
1000 	int i;
1001 
1002 	for_each_sg(sglist, sg, len, i)
1003 		tsize += sg->length;
1004 	return tsize;
1005 }
1006 
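/*
 * Map the block request to an sg list (discards are sent without payload),
 * build an RNBD_MSG_IO header and hand both to rtrs_clt_request().  The
 * completion is delivered asynchronously to msg_io_conf().
 */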
1007 static int rnbd_client_xfer_request(struct rnbd_clt_dev *dev,
1008 				     struct request *rq,
1009 				     struct rnbd_iu *iu)
1010 {
1011 	struct rtrs_clt *rtrs = dev->sess->rtrs;
1012 	struct rtrs_permit *permit = iu->permit;
1013 	struct rnbd_msg_io msg;
1014 	struct rtrs_clt_req_ops req_ops;
1015 	unsigned int sg_cnt = 0;
1016 	struct kvec vec;
1017 	size_t size;
1018 	int err;
1019 
1020 	iu->rq		= rq;
1021 	iu->dev		= dev;
1022 	msg.sector	= cpu_to_le64(blk_rq_pos(rq));
1023 	msg.bi_size	= cpu_to_le32(blk_rq_bytes(rq));
1024 	msg.rw		= cpu_to_le32(rq_to_rnbd_flags(rq));
1025 	msg.prio	= cpu_to_le16(req_get_ioprio(rq));
1026 
1027 	/*
1028 	 * We only support discards with a single segment for now.
1029 	 * See queue limits.
1030 	 */
1031 	if (req_op(rq) != REQ_OP_DISCARD)
1032 		sg_cnt = blk_rq_map_sg(dev->queue, rq, iu->sgt.sgl);
1033 
1034 	if (sg_cnt == 0)
1035 		sg_mark_end(&iu->sgt.sgl[0]);
1036 
1037 	msg.hdr.type	= cpu_to_le16(RNBD_MSG_IO);
1038 	msg.device_id	= cpu_to_le32(dev->device_id);
1039 
1040 	vec = (struct kvec) {
1041 		.iov_base = &msg,
1042 		.iov_len  = sizeof(msg)
1043 	};
1044 	size = rnbd_clt_get_sg_size(iu->sgt.sgl, sg_cnt);
1045 	req_ops = (struct rtrs_clt_req_ops) {
1046 		.priv = iu,
1047 		.conf_fn = msg_io_conf,
1048 	};
1049 	err = rtrs_clt_request(rq_data_dir(rq), &req_ops, rtrs, permit,
1050 			       &vec, 1, size, iu->sgt.sgl, sg_cnt);
1051 	if (unlikely(err)) {
1052 		rnbd_clt_err_rl(dev, "RTRS failed to transfer IO, err: %d\n",
1053 				 err);
1054 		return err;
1055 	}
1056 
1057 	return 0;
1058 }
1059 
1060 /**
1061  * rnbd_clt_dev_add_to_requeue() - add device to requeue if session is busy
1062  * @dev:	Device to be checked
1063  * @q:		Queue to be added to the requeue list if required
1064  *
1065  * Description:
1066  *     If the session is busy, someone will requeue us when resources are
1067  *     freed.  If the session is not doing anything, the device is not added
1068  *     to the list and %false is returned.
1069  */
1070 static bool rnbd_clt_dev_add_to_requeue(struct rnbd_clt_dev *dev,
1071 						struct rnbd_queue *q)
1072 {
1073 	struct rnbd_clt_session *sess = dev->sess;
1074 	struct rnbd_cpu_qlist *cpu_q;
1075 	unsigned long flags;
1076 	bool added = true;
1077 	bool need_set;
1078 
1079 	cpu_q = get_cpu_ptr(sess->cpu_queues);
1080 	spin_lock_irqsave(&cpu_q->requeue_lock, flags);
1081 
1082 	if (likely(!test_and_set_bit_lock(0, &q->in_list))) {
1083 		if (WARN_ON(!list_empty(&q->requeue_list)))
1084 			goto unlock;
1085 
1086 		need_set = !test_bit(cpu_q->cpu, sess->cpu_queues_bm);
1087 		if (need_set) {
1088 			set_bit(cpu_q->cpu, sess->cpu_queues_bm);
1089 			/* Paired with rnbd_put_permit(). Set a bit first
1090 			 * and then observe the busy counter.
1091 			 */
1092 			smp_mb__before_atomic();
1093 		}
1094 		if (likely(atomic_read(&sess->busy))) {
1095 			list_add_tail(&q->requeue_list, &cpu_q->requeue_list);
1096 		} else {
1097 			/* Very unlikely, but possible: the busy counter was
1098 			 * observed as zero.  Drop all bits and return false
1099 			 * so the caller restarts the queue itself.
1100 			 */
1101 			if (need_set)
1102 				clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
1103 			clear_bit_unlock(0, &q->in_list);
1104 			added = false;
1105 		}
1106 	}
1107 unlock:
1108 	spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
1109 	put_cpu_ptr(sess->cpu_queues);
1110 
1111 	return added;
1112 }
1113 
1114 static void rnbd_clt_dev_kick_mq_queue(struct rnbd_clt_dev *dev,
1115 					struct blk_mq_hw_ctx *hctx,
1116 					int delay)
1117 {
1118 	struct rnbd_queue *q = hctx->driver_data;
1119 
1120 	if (delay != RNBD_DELAY_IFBUSY)
1121 		blk_mq_delay_run_hw_queue(hctx, delay);
1122 	else if (unlikely(!rnbd_clt_dev_add_to_requeue(dev, q)))
1123 		/*
1124 		 * If session is not busy we have to restart
1125 		 * the queue ourselves.
1126 		 */
1127 		blk_mq_delay_run_hw_queue(hctx, 10/*ms*/);
1128 }
1129 
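/*
 * blk-mq .queue_rq: grab an RTRS permit without sleeping, set up a chained
 * sg table and send the IO.  If no permit or memory is available, return
 * BLK_STS_RESOURCE and let rnbd_clt_dev_kick_mq_queue() rerun the hctx
 * later (either via the requeue list or after a short delay).
 */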
1130 static blk_status_t rnbd_queue_rq(struct blk_mq_hw_ctx *hctx,
1131 				   const struct blk_mq_queue_data *bd)
1132 {
1133 	struct request *rq = bd->rq;
1134 	struct rnbd_clt_dev *dev = rq->rq_disk->private_data;
1135 	struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq);
1136 	int err;
1137 	blk_status_t ret = BLK_STS_IOERR;
1138 
1139 	if (unlikely(dev->dev_state != DEV_STATE_MAPPED))
1140 		return BLK_STS_IOERR;
1141 
1142 	iu->permit = rnbd_get_permit(dev->sess, RTRS_IO_CON,
1143 				      RTRS_PERMIT_NOWAIT);
1144 	if (unlikely(!iu->permit)) {
1145 		rnbd_clt_dev_kick_mq_queue(dev, hctx, RNBD_DELAY_IFBUSY);
1146 		return BLK_STS_RESOURCE;
1147 	}
1148 
1149 	iu->sgt.sgl = iu->first_sgl;
1150 	err = sg_alloc_table_chained(&iu->sgt,
1151 				     /* Even if the request has no segments,
1152 				      * the sglist must have at least one entry */
1153 				     blk_rq_nr_phys_segments(rq) ? : 1,
1154 				     iu->sgt.sgl,
1155 				     RNBD_INLINE_SG_CNT);
1156 	if (err) {
1157 		rnbd_clt_err_rl(dev, "sg_alloc_table_chained ret=%d\n", err);
1158 		rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/);
1159 		rnbd_put_permit(dev->sess, iu->permit);
1160 		return BLK_STS_RESOURCE;
1161 	}
1162 
1163 	blk_mq_start_request(rq);
1164 	err = rnbd_client_xfer_request(dev, rq, iu);
1165 	if (likely(err == 0))
1166 		return BLK_STS_OK;
1167 	if (unlikely(err == -EAGAIN || err == -ENOMEM)) {
1168 		rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/);
1169 		ret = BLK_STS_RESOURCE;
1170 	}
1171 	sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT);
1172 	rnbd_put_permit(dev->sess, iu->permit);
1173 	return ret;
1174 }
1175 
1176 static struct blk_mq_ops rnbd_mq_ops = {
1177 	.queue_rq	= rnbd_queue_rq,
1178 	.complete	= rnbd_softirq_done_fn,
1179 };
1180 
1181 static int setup_mq_tags(struct rnbd_clt_session *sess)
1182 {
1183 	struct blk_mq_tag_set *tag_set = &sess->tag_set;
1184 
1185 	memset(tag_set, 0, sizeof(*tag_set));
1186 	tag_set->ops		= &rnbd_mq_ops;
1187 	tag_set->queue_depth	= sess->queue_depth;
1188 	tag_set->numa_node		= NUMA_NO_NODE;
1189 	tag_set->flags		= BLK_MQ_F_SHOULD_MERGE |
1190 				  BLK_MQ_F_TAG_QUEUE_SHARED;
1191 	tag_set->cmd_size	= sizeof(struct rnbd_iu) + RNBD_RDMA_SGL_SIZE;
1192 	tag_set->nr_hw_queues	= num_online_cpus();
1193 
1194 	return blk_mq_alloc_tag_set(tag_set);
1195 }
1196 
1197 static struct rnbd_clt_session *
1198 find_and_get_or_create_sess(const char *sessname,
1199 			    const struct rtrs_addr *paths,
1200 			    size_t path_cnt, u16 port_nr)
1201 {
1202 	struct rnbd_clt_session *sess;
1203 	struct rtrs_attrs attrs;
1204 	int err;
1205 	bool first;
1206 	struct rtrs_clt_ops rtrs_ops;
1207 
1208 	sess = find_or_create_sess(sessname, &first);
1209 	if (sess == ERR_PTR(-ENOMEM))
1210 		return ERR_PTR(-ENOMEM);
1211 	else if (!first)
1212 		return sess;
1213 
1214 	if (!path_cnt) {
1215 		pr_err("Session %s not found, and path parameter not given\n", sessname);
1216 		err = -ENXIO;
1217 		goto put_sess;
1218 	}
1219 
1220 	rtrs_ops = (struct rtrs_clt_ops) {
1221 		.priv = sess,
1222 		.link_ev = rnbd_clt_link_ev,
1223 	};
1224 	/*
1225 	 * Nothing was found, establish rtrs connection and proceed further.
1226 	 */
1227 	sess->rtrs = rtrs_clt_open(&rtrs_ops, sessname,
1228 				   paths, path_cnt, port_nr,
1229 				   0, /* Do not use pdu of rtrs */
1230 				   RECONNECT_DELAY, BMAX_SEGMENTS,
1231 				   BLK_MAX_SEGMENT_SIZE,
1232 				   MAX_RECONNECTS);
1233 	if (IS_ERR(sess->rtrs)) {
1234 		err = PTR_ERR(sess->rtrs);
1235 		goto wake_up_and_put;
1236 	}
1237 	rtrs_clt_query(sess->rtrs, &attrs);
1238 	sess->max_io_size = attrs.max_io_size;
1239 	sess->queue_depth = attrs.queue_depth;
1240 
1241 	err = setup_mq_tags(sess);
1242 	if (err)
1243 		goto close_rtrs;
1244 
1245 	err = send_msg_sess_info(sess, WAIT);
1246 	if (err)
1247 		goto close_rtrs;
1248 
1249 	wake_up_rtrs_waiters(sess);
1250 
1251 	return sess;
1252 
1253 close_rtrs:
1254 	close_rtrs(sess);
1255 put_sess:
1256 	rnbd_clt_put_sess(sess);
1257 
1258 	return ERR_PTR(err);
1259 
1260 wake_up_and_put:
1261 	wake_up_rtrs_waiters(sess);
1262 	goto put_sess;
1263 }
1264 
1265 static inline void rnbd_init_hw_queue(struct rnbd_clt_dev *dev,
1266 				       struct rnbd_queue *q,
1267 				       struct blk_mq_hw_ctx *hctx)
1268 {
1269 	INIT_LIST_HEAD(&q->requeue_list);
1270 	q->dev  = dev;
1271 	q->hctx = hctx;
1272 }
1273 
1274 static void rnbd_init_mq_hw_queues(struct rnbd_clt_dev *dev)
1275 {
1276 	int i;
1277 	struct blk_mq_hw_ctx *hctx;
1278 	struct rnbd_queue *q;
1279 
1280 	queue_for_each_hw_ctx(dev->queue, hctx, i) {
1281 		q = &dev->hw_queues[i];
1282 		rnbd_init_hw_queue(dev, q, hctx);
1283 		hctx->driver_data = q;
1284 	}
1285 }
1286 
1287 static int setup_mq_dev(struct rnbd_clt_dev *dev)
1288 {
1289 	dev->queue = blk_mq_init_queue(&dev->sess->tag_set);
1290 	if (IS_ERR(dev->queue)) {
1291 		rnbd_clt_err(dev, "Initializing multiqueue queue failed, err: %ld\n",
1292 			      PTR_ERR(dev->queue));
1293 		return PTR_ERR(dev->queue);
1294 	}
1295 	rnbd_init_mq_hw_queues(dev);
1296 	return 0;
1297 }
1298 
1299 static void setup_request_queue(struct rnbd_clt_dev *dev)
1300 {
1301 	blk_queue_logical_block_size(dev->queue, dev->logical_block_size);
1302 	blk_queue_physical_block_size(dev->queue, dev->physical_block_size);
1303 	blk_queue_max_hw_sectors(dev->queue, dev->max_hw_sectors);
1304 	blk_queue_max_write_same_sectors(dev->queue,
1305 					 dev->max_write_same_sectors);
1306 
1307 	/*
1308 	 * We don't support discards to "discontiguous" segments
1309 	 * in one request.
1310 	 */
1311 	blk_queue_max_discard_segments(dev->queue, 1);
1312 
1313 	blk_queue_max_discard_sectors(dev->queue, dev->max_discard_sectors);
1314 	dev->queue->limits.discard_granularity	= dev->discard_granularity;
1315 	dev->queue->limits.discard_alignment	= dev->discard_alignment;
1316 	if (dev->max_discard_sectors)
1317 		blk_queue_flag_set(QUEUE_FLAG_DISCARD, dev->queue);
1318 	if (dev->secure_discard)
1319 		blk_queue_flag_set(QUEUE_FLAG_SECERASE, dev->queue);
1320 
1321 	blk_queue_flag_set(QUEUE_FLAG_SAME_COMP, dev->queue);
1322 	blk_queue_flag_set(QUEUE_FLAG_SAME_FORCE, dev->queue);
1323 	blk_queue_max_segments(dev->queue, dev->max_segments);
1324 	blk_queue_io_opt(dev->queue, dev->sess->max_io_size);
1325 	blk_queue_virt_boundary(dev->queue, SZ_4K - 1);
1326 	blk_queue_write_cache(dev->queue, dev->wc, dev->fua);
1327 	dev->queue->queuedata = dev;
1328 }
1329 
1330 static void rnbd_clt_setup_gen_disk(struct rnbd_clt_dev *dev, int idx)
1331 {
1332 	dev->gd->major		= rnbd_client_major;
1333 	dev->gd->first_minor	= idx << RNBD_PART_BITS;
1334 	dev->gd->fops		= &rnbd_client_ops;
1335 	dev->gd->queue		= dev->queue;
1336 	dev->gd->private_data	= dev;
1337 	snprintf(dev->gd->disk_name, sizeof(dev->gd->disk_name), "rnbd%d",
1338 		 idx);
1339 	pr_debug("disk_name=%s, capacity=%zu\n",
1340 		 dev->gd->disk_name,
1341 		 dev->nsectors * (dev->logical_block_size / SECTOR_SIZE)
1342 		 );
1343 
1344 	set_capacity(dev->gd, dev->nsectors);
1345 
1346 	if (dev->access_mode == RNBD_ACCESS_RO) {
1347 		dev->read_only = true;
1348 		set_disk_ro(dev->gd, true);
1349 	} else {
1350 		dev->read_only = false;
1351 	}
1352 
1353 	if (!dev->rotational)
1354 		blk_queue_flag_set(QUEUE_FLAG_NONROT, dev->queue);
1355 }
1356 
1357 static int rnbd_client_setup_device(struct rnbd_clt_session *sess,
1358 				     struct rnbd_clt_dev *dev, int idx)
1359 {
1360 	int err;
1361 
1362 	dev->size = dev->nsectors * dev->logical_block_size;
1363 
1364 	err = setup_mq_dev(dev);
1365 	if (err)
1366 		return err;
1367 
1368 	setup_request_queue(dev);
1369 
1370 	dev->gd = alloc_disk_node(1 << RNBD_PART_BITS, NUMA_NO_NODE);
1371 	if (!dev->gd) {
1372 		blk_cleanup_queue(dev->queue);
1373 		return -ENOMEM;
1374 	}
1375 
1376 	rnbd_clt_setup_gen_disk(dev, idx);
1377 
1378 	return 0;
1379 }
1380 
1381 static struct rnbd_clt_dev *init_dev(struct rnbd_clt_session *sess,
1382 				      enum rnbd_access_mode access_mode,
1383 				      const char *pathname)
1384 {
1385 	struct rnbd_clt_dev *dev;
1386 	int ret;
1387 
1388 	dev = kzalloc_node(sizeof(*dev), GFP_KERNEL, NUMA_NO_NODE);
1389 	if (!dev)
1390 		return ERR_PTR(-ENOMEM);
1391 
1392 	dev->hw_queues = kcalloc(nr_cpu_ids, sizeof(*dev->hw_queues),
1393 				 GFP_KERNEL);
1394 	if (!dev->hw_queues) {
1395 		ret = -ENOMEM;
1396 		goto out_alloc;
1397 	}
1398 
1399 	mutex_lock(&ida_lock);
1400 	ret = ida_simple_get(&index_ida, 0, 1 << (MINORBITS - RNBD_PART_BITS),
1401 			     GFP_KERNEL);
1402 	mutex_unlock(&ida_lock);
1403 	if (ret < 0) {
1404 		pr_err("Failed to initialize device '%s' from session %s, allocating ida failed, err: %d\n",
1405 		       pathname, sess->sessname, ret);
1406 		goto out_queues;
1407 	}
1408 
1409 	dev->pathname = kstrdup(pathname, GFP_KERNEL);
1410 	if (!dev->pathname) {
1411 		ret = -ENOMEM;
1412 		goto out_queues;
1413 	}
1414 
1415 	dev->clt_device_id	= ret;
1416 	dev->sess		= sess;
1417 	dev->access_mode	= access_mode;
1418 	mutex_init(&dev->lock);
1419 	refcount_set(&dev->refcount, 1);
1420 	dev->dev_state = DEV_STATE_INIT;
1421 
1422 	/*
1423 	 * Here we are called from the sysfs entry, thus clt-sysfs is
1424 	 * responsible for making sure the session does not disappear.
1425 	 */
1426 	WARN_ON(!rnbd_clt_get_sess(sess));
1427 
1428 	return dev;
1429 
1430 out_queues:
1431 	kfree(dev->hw_queues);
1432 out_alloc:
1433 	kfree(dev);
1434 	return ERR_PTR(ret);
1435 }
1436 
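/* Caller must hold sess_lock. */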
1437 static bool __exists_dev(const char *pathname, const char *sessname)
1438 {
1439 	struct rnbd_clt_session *sess;
1440 	struct rnbd_clt_dev *dev;
1441 	bool found = false;
1442 
1443 	list_for_each_entry(sess, &sess_list, list) {
1444 		if (sessname && strncmp(sess->sessname, sessname,
1445 					sizeof(sess->sessname)))
1446 			continue;
1447 		mutex_lock(&sess->lock);
1448 		list_for_each_entry(dev, &sess->devs_list, list) {
1449 			if (strlen(dev->pathname) == strlen(pathname) &&
1450 			    !strcmp(dev->pathname, pathname)) {
1451 				found = true;
1452 				break;
1453 			}
1454 		}
1455 		mutex_unlock(&sess->lock);
1456 		if (found)
1457 			break;
1458 	}
1459 
1460 	return found;
1461 }
1462 
1463 static bool exists_devpath(const char *pathname, const char *sessname)
1464 {
1465 	bool found;
1466 
1467 	mutex_lock(&sess_lock);
1468 	found = __exists_dev(pathname, sessname);
1469 	mutex_unlock(&sess_lock);
1470 
1471 	return found;
1472 }
1473 
1474 static bool insert_dev_if_not_exists_devpath(const char *pathname,
1475 					     struct rnbd_clt_session *sess,
1476 					     struct rnbd_clt_dev *dev)
1477 {
1478 	bool found;
1479 
1480 	mutex_lock(&sess_lock);
1481 	found = __exists_dev(pathname, sess->sessname);
1482 	if (!found) {
1483 		mutex_lock(&sess->lock);
1484 		list_add_tail(&dev->list, &sess->devs_list);
1485 		mutex_unlock(&sess->lock);
1486 	}
1487 	mutex_unlock(&sess_lock);
1488 
1489 	return found;
1490 }
1491 
1492 static void delete_dev(struct rnbd_clt_dev *dev)
1493 {
1494 	struct rnbd_clt_session *sess = dev->sess;
1495 
1496 	mutex_lock(&sess->lock);
1497 	list_del(&dev->list);
1498 	mutex_unlock(&sess->lock);
1499 }
1500 
1501 struct rnbd_clt_dev *rnbd_clt_map_device(const char *sessname,
1502 					   struct rtrs_addr *paths,
1503 					   size_t path_cnt, u16 port_nr,
1504 					   const char *pathname,
1505 					   enum rnbd_access_mode access_mode)
1506 {
1507 	struct rnbd_clt_session *sess;
1508 	struct rnbd_clt_dev *dev;
1509 	int ret;
1510 
1511 	if (unlikely(exists_devpath(pathname, sessname)))
1512 		return ERR_PTR(-EEXIST);
1513 
1514 	sess = find_and_get_or_create_sess(sessname, paths, path_cnt, port_nr);
1515 	if (IS_ERR(sess))
1516 		return ERR_CAST(sess);
1517 
1518 	dev = init_dev(sess, access_mode, pathname);
1519 	if (IS_ERR(dev)) {
1520 		pr_err("map_device: failed to map device '%s' from session %s, can't initialize device, err: %ld\n",
1521 		       pathname, sess->sessname, PTR_ERR(dev));
1522 		ret = PTR_ERR(dev);
1523 		goto put_sess;
1524 	}
1525 	if (insert_dev_if_not_exists_devpath(pathname, sess, dev)) {
1526 		ret = -EEXIST;
1527 		goto put_dev;
1528 	}
1529 	ret = send_msg_open(dev, WAIT);
1530 	if (ret) {
1531 		rnbd_clt_err(dev,
1532 			      "map_device: failed, can't open remote device, err: %d\n",
1533 			      ret);
1534 		goto del_dev;
1535 	}
1536 	mutex_lock(&dev->lock);
1537 	pr_debug("Opened remote device: session=%s, path='%s'\n",
1538 		 sess->sessname, pathname);
1539 	ret = rnbd_client_setup_device(sess, dev, dev->clt_device_id);
1540 	if (ret) {
1541 		rnbd_clt_err(dev,
1542 			      "map_device: Failed to configure device, err: %d\n",
1543 			      ret);
1544 		mutex_unlock(&dev->lock);
1545 		goto send_close;
1546 	}
1547 
1548 	rnbd_clt_info(dev,
1549 		       "map_device: Device mapped as %s (nsectors: %zu, logical_block_size: %d, physical_block_size: %d, max_write_same_sectors: %d, max_discard_sectors: %d, discard_granularity: %d, discard_alignment: %d, secure_discard: %d, max_segments: %d, max_hw_sectors: %d, rotational: %d, wc: %d, fua: %d)\n",
1550 		       dev->gd->disk_name, dev->nsectors,
1551 		       dev->logical_block_size, dev->physical_block_size,
1552 		       dev->max_write_same_sectors, dev->max_discard_sectors,
1553 		       dev->discard_granularity, dev->discard_alignment,
1554 		       dev->secure_discard, dev->max_segments,
1555 		       dev->max_hw_sectors, dev->rotational, dev->wc, dev->fua);
1556 
1557 	mutex_unlock(&dev->lock);
1558 
1559 	add_disk(dev->gd);
1560 	rnbd_clt_put_sess(sess);
1561 
1562 	return dev;
1563 
1564 send_close:
1565 	send_msg_close(dev, dev->device_id, WAIT);
1566 del_dev:
1567 	delete_dev(dev);
1568 put_dev:
1569 	rnbd_clt_put_dev(dev);
1570 put_sess:
1571 	rnbd_clt_put_sess(sess);
1572 
1573 	return ERR_PTR(ret);
1574 }
1575 
1576 static void destroy_gen_disk(struct rnbd_clt_dev *dev)
1577 {
1578 	del_gendisk(dev->gd);
1579 	blk_cleanup_queue(dev->queue);
1580 	put_disk(dev->gd);
1581 }
1582 
1583 static void destroy_sysfs(struct rnbd_clt_dev *dev,
1584 			  const struct attribute *sysfs_self)
1585 {
1586 	rnbd_clt_remove_dev_symlink(dev);
1587 	if (dev->kobj.state_initialized) {
1588 		if (sysfs_self)
1589 			/* To avoid a deadlock, remove the sysfs entry itself first */
1590 			sysfs_remove_file_self(&dev->kobj, sysfs_self);
1591 		kobject_del(&dev->kobj);
1592 		kobject_put(&dev->kobj);
1593 	}
1594 }
1595 
1596 int rnbd_clt_unmap_device(struct rnbd_clt_dev *dev, bool force,
1597 			   const struct attribute *sysfs_self)
1598 {
1599 	struct rnbd_clt_session *sess = dev->sess;
1600 	int refcount, ret = 0;
1601 	bool was_mapped;
1602 
1603 	mutex_lock(&dev->lock);
1604 	if (dev->dev_state == DEV_STATE_UNMAPPED) {
1605 		rnbd_clt_info(dev, "Device is already being unmapped\n");
1606 		ret = -EALREADY;
1607 		goto err;
1608 	}
1609 	refcount = refcount_read(&dev->refcount);
1610 	if (!force && refcount > 1) {
1611 		rnbd_clt_err(dev,
1612 			      "Closing device failed, device is in use, (%d device users)\n",
1613 			      refcount - 1);
1614 		ret = -EBUSY;
1615 		goto err;
1616 	}
1617 	was_mapped = (dev->dev_state == DEV_STATE_MAPPED);
1618 	dev->dev_state = DEV_STATE_UNMAPPED;
1619 	mutex_unlock(&dev->lock);
1620 
1621 	delete_dev(dev);
1622 	destroy_sysfs(dev, sysfs_self);
1623 	destroy_gen_disk(dev);
1624 	if (was_mapped && sess->rtrs)
1625 		send_msg_close(dev, dev->device_id, WAIT);
1626 
1627 	rnbd_clt_info(dev, "Device is unmapped\n");
1628 
1629 	/* Likely last reference put */
1630 	rnbd_clt_put_dev(dev);
1631 
1632 	/*
1633 	 * Here the device and the session may already have vanished!
1634 	 */
1635 
1636 	return 0;
1637 err:
1638 	mutex_unlock(&dev->lock);
1639 
1640 	return ret;
1641 }
1642 
1643 int rnbd_clt_remap_device(struct rnbd_clt_dev *dev)
1644 {
1645 	int err;
1646 
1647 	mutex_lock(&dev->lock);
1648 	if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED)
1649 		err = 0;
1650 	else if (dev->dev_state == DEV_STATE_UNMAPPED)
1651 		err = -ENODEV;
1652 	else if (dev->dev_state == DEV_STATE_MAPPED)
1653 		err = -EALREADY;
1654 	else
1655 		err = -EBUSY;
1656 	mutex_unlock(&dev->lock);
1657 	if (!err) {
1658 		rnbd_clt_info(dev, "Remapping device.\n");
1659 		err = send_msg_open(dev, WAIT);
1660 		if (err)
1661 			rnbd_clt_err(dev, "remap_device: %d\n", err);
1662 	}
1663 
1664 	return err;
1665 }
1666 
1667 static void unmap_device_work(struct work_struct *work)
1668 {
1669 	struct rnbd_clt_dev *dev;
1670 
1671 	dev = container_of(work, typeof(*dev), unmap_on_rmmod_work);
1672 	rnbd_clt_unmap_device(dev, true, NULL);
1673 }
1674 
1675 static void rnbd_destroy_sessions(void)
1676 {
1677 	struct rnbd_clt_session *sess, *sn;
1678 	struct rnbd_clt_dev *dev, *tn;
1679 
1680 	/* First, forbid access through the sysfs interface */
1681 	rnbd_clt_destroy_default_group();
1682 	rnbd_clt_destroy_sysfs_files();
1683 
1684 	/*
1685 	 * At this point there is no concurrent access to the sessions list
1686 	 * and the devices list:
1687 	 *   1. New sessions or devices can't be created - session sysfs files
1688 	 *      are removed.
1689 	 *   2. Devices or sessions can't be removed - the module reference is
1690 	 *      taken into account in the unmap device sysfs callback.
1691 	 *   3. No IO requests are in flight - each file open of block_dev
1692 	 *      increases the module reference in get_disk().
1693 	 *
1694 	 * But there can still be user requests in flight, sent by the
1695 	 * asynchronous send_msg_*() functions, thus before unmapping devices
1696 	 * the RTRS session must be explicitly closed.
1697 	 */
1698 
1699 	list_for_each_entry_safe(sess, sn, &sess_list, list) {
1700 		if (!rnbd_clt_get_sess(sess))
1701 			continue;
1702 		close_rtrs(sess);
1703 		list_for_each_entry_safe(dev, tn, &sess->devs_list, list) {
1704 			/*
1705 			 * Here unmap happens in parallel for only one reason:
1706 			 * blk_cleanup_queue() takes around half a second, so
1707 			 * with a huge number of devices the whole module unload
1708 			 * procedure would take minutes.
1709 			 */
1710 			INIT_WORK(&dev->unmap_on_rmmod_work, unmap_device_work);
1711 			queue_work(system_long_wq, &dev->unmap_on_rmmod_work);
1712 		}
1713 		rnbd_clt_put_sess(sess);
1714 	}
1715 	/* Wait for all scheduled unmap works */
1716 	flush_workqueue(system_long_wq);
1717 	WARN_ON(!list_empty(&sess_list));
1718 }
1719 
1720 static int __init rnbd_client_init(void)
1721 {
1722 	int err = 0;
1723 
1724 	BUILD_BUG_ON(sizeof(struct rnbd_msg_hdr) != 4);
1725 	BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info) != 36);
1726 	BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info_rsp) != 36);
1727 	BUILD_BUG_ON(sizeof(struct rnbd_msg_open) != 264);
1728 	BUILD_BUG_ON(sizeof(struct rnbd_msg_close) != 8);
1729 	BUILD_BUG_ON(sizeof(struct rnbd_msg_open_rsp) != 56);
1730 	rnbd_client_major = register_blkdev(rnbd_client_major, "rnbd");
1731 	if (rnbd_client_major <= 0) {
1732 		pr_err("Failed to load module, block device registration failed\n");
1733 		return -EBUSY;
1734 	}
1735 
1736 	err = rnbd_clt_create_sysfs_files();
1737 	if (err) {
1738 		pr_err("Failed to load module, creating sysfs device files failed, err: %d\n",
1739 		       err);
1740 		unregister_blkdev(rnbd_client_major, "rnbd");
1741 	}
1742 
1743 	return err;
1744 }
1745 
1746 static void __exit rnbd_client_exit(void)
1747 {
1748 	rnbd_destroy_sessions();
1749 	unregister_blkdev(rnbd_client_major, "rnbd");
1750 	ida_destroy(&index_ida);
1751 }
1752 
1753 module_init(rnbd_client_init);
1754 module_exit(rnbd_client_exit);
1755