xref: /freebsd/sys/cam/ctl/ctl_backend_block.c (revision b1d046441de9053152c7cf03d6b60d9882687e1b)
1 /*-
2  * Copyright (c) 2003 Silicon Graphics International Corp.
3  * Copyright (c) 2009-2011 Spectra Logic Corporation
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions, and the following disclaimer,
11  *    without modification.
12  * 2. Redistributions in binary form must reproduce at minimum a disclaimer
13  *    substantially similar to the "NO WARRANTY" disclaimer below
14  *    ("Disclaimer") and any redistribution must be conditioned upon
15  *    including a substantially similar Disclaimer requirement for further
16  *    binary redistribution.
17  *
18  * NO WARRANTY
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTIBILITY AND FITNESS FOR
22  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23  * HOLDERS OR CONTRIBUTORS BE LIABLE FOR SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
25  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
26  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
27  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
28  * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGES.
30  *
31  * $Id: //depot/users/kenm/FreeBSD-test2/sys/cam/ctl/ctl_backend_block.c#5 $
32  */
33 /*
34  * CAM Target Layer driver backend for block devices.
35  *
36  * Author: Ken Merry <ken@FreeBSD.org>
37  */
38 #include <sys/cdefs.h>
39 __FBSDID("$FreeBSD$");
40 
41 #include <opt_kdtrace.h>
42 
43 #include <sys/param.h>
44 #include <sys/systm.h>
45 #include <sys/kernel.h>
46 #include <sys/types.h>
47 #include <sys/kthread.h>
48 #include <sys/bio.h>
49 #include <sys/fcntl.h>
50 #include <sys/lock.h>
51 #include <sys/mutex.h>
52 #include <sys/condvar.h>
53 #include <sys/malloc.h>
54 #include <sys/conf.h>
55 #include <sys/ioccom.h>
56 #include <sys/queue.h>
57 #include <sys/sbuf.h>
58 #include <sys/endian.h>
59 #include <sys/uio.h>
60 #include <sys/buf.h>
61 #include <sys/taskqueue.h>
62 #include <sys/vnode.h>
63 #include <sys/namei.h>
64 #include <sys/mount.h>
65 #include <sys/disk.h>
67 #include <sys/filedesc.h>
68 #include <sys/proc.h>
69 #include <sys/pcpu.h>
70 #include <sys/module.h>
71 #include <sys/sdt.h>
72 #include <sys/devicestat.h>
73 #include <sys/sysctl.h>
74 
75 #include <geom/geom.h>
76 
77 #include <cam/cam.h>
78 #include <cam/scsi/scsi_all.h>
79 #include <cam/scsi/scsi_da.h>
80 #include <cam/ctl/ctl_io.h>
81 #include <cam/ctl/ctl.h>
82 #include <cam/ctl/ctl_backend.h>
83 #include <cam/ctl/ctl_frontend_internal.h>
84 #include <cam/ctl/ctl_ioctl.h>
85 #include <cam/ctl/ctl_scsi_all.h>
86 #include <cam/ctl/ctl_error.h>
87 
88 /*
89  * The idea here is that we'll allocate enough S/G space to hold a 16MB
90  * I/O.  If we get an I/O larger than that, we'll reject it.
91  */
92 #define	CTLBLK_MAX_IO_SIZE	(16 * 1024 * 1024)
93 #define	CTLBLK_MAX_SEGS		((CTLBLK_MAX_IO_SIZE / MAXPHYS) + 1)
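/*
 * With a MAXPHYS of 128KB (an assumption; MAXPHYS is configurable per
 * platform), that works out to 16MB / 128KB + 1 = 129 S/G segments per
 * beio.
 */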
94 
95 #ifdef CTLBLK_DEBUG
96 #define DPRINTF(fmt, args...) \
97     printf("cbb(%s:%d): " fmt, __FUNCTION__, __LINE__, ##args)
98 #else
99 #define DPRINTF(fmt, args...) do {} while(0)
100 #endif
101 
102 SDT_PROVIDER_DEFINE(cbb);
103 
104 typedef enum {
105 	CTL_BE_BLOCK_LUN_UNCONFIGURED	= 0x01,
106 	CTL_BE_BLOCK_LUN_CONFIG_ERR	= 0x02,
107 	CTL_BE_BLOCK_LUN_WAITING	= 0x04,
108 	CTL_BE_BLOCK_LUN_MULTI_THREAD	= 0x08
109 } ctl_be_block_lun_flags;
110 
111 typedef enum {
112 	CTL_BE_BLOCK_NONE,
113 	CTL_BE_BLOCK_DEV,
114 	CTL_BE_BLOCK_FILE
115 } ctl_be_block_type;
116 
117 struct ctl_be_block_devdata {
118 	struct cdev *cdev;
119 	struct cdevsw *csw;
120 	int dev_ref;
121 };
122 
123 struct ctl_be_block_filedata {
124 	struct ucred *cred;
125 };
126 
127 union ctl_be_block_bedata {
128 	struct ctl_be_block_devdata dev;
129 	struct ctl_be_block_filedata file;
130 };
131 
132 struct ctl_be_block_io;
133 struct ctl_be_block_lun;
134 
135 typedef void (*cbb_dispatch_t)(struct ctl_be_block_lun *be_lun,
136 			       struct ctl_be_block_io *beio);
137 
138 /*
139  * Backend LUN structure.  There is a 1:1 mapping between a block device
140  * and a backend block LUN, and between a backend block LUN and a CTL LUN.
141  */
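/*
 * Each LUN has three work queues, all drained by ctl_be_block_worker() on
 * the LUN's taskqueue: input_queue holds new SCSI I/O queued by
 * ctl_be_block_submit(), config_write_queue holds configuration writes
 * such as SYNCHRONIZE CACHE, and datamove_queue holds writes whose data
 * has already been transferred from the initiator and which are ready for
 * backend I/O.
 */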
142 struct ctl_be_block_lun {
143 	struct ctl_block_disk *disk;
144 	char lunname[32];
145 	char *dev_path;
146 	ctl_be_block_type dev_type;
147 	struct vnode *vn;
148 	union ctl_be_block_bedata backend;
149 	cbb_dispatch_t dispatch;
150 	cbb_dispatch_t lun_flush;
151 	struct mtx lock;
152 	uma_zone_t lun_zone;
153 	uint64_t size_blocks;
154 	uint64_t size_bytes;
155 	uint32_t blocksize;
156 	int blocksize_shift;
157 	struct ctl_be_block_softc *softc;
158 	struct devstat *disk_stats;
159 	ctl_be_block_lun_flags flags;
160 	STAILQ_ENTRY(ctl_be_block_lun) links;
161 	struct ctl_be_lun ctl_be_lun;
162 	struct taskqueue *io_taskqueue;
163 	struct task io_task;
164 	int num_threads;
165 	STAILQ_HEAD(, ctl_io_hdr) input_queue;
166 	STAILQ_HEAD(, ctl_io_hdr) config_write_queue;
167 	STAILQ_HEAD(, ctl_io_hdr) datamove_queue;
168 };
169 
170 /*
171  * Overall softc structure for the block backend module.
172  */
173 struct ctl_be_block_softc {
174 	STAILQ_HEAD(, ctl_be_block_io)   beio_free_queue;
175 	struct mtx			 lock;
176 	int				 prealloc_beio;
177 	int				 num_disks;
178 	STAILQ_HEAD(, ctl_block_disk)	 disk_list;
179 	int				 num_luns;
180 	STAILQ_HEAD(, ctl_be_block_lun)	 lun_list;
181 };
182 
183 static struct ctl_be_block_softc backend_block_softc;
184 
185 /*
186  * Per-I/O information.
187  */
188 struct ctl_be_block_io {
189 	union ctl_io			*io;
190 	struct ctl_sg_entry		sg_segs[CTLBLK_MAX_SEGS];
191 	struct iovec			xiovecs[CTLBLK_MAX_SEGS];
192 	int				bio_cmd;
193 	int				bio_flags;
194 	int				num_segs;
195 	int				num_bios_sent;
196 	int				num_bios_done;
197 	int				send_complete;
198 	int				num_errors;
199 	struct bintime			ds_t0;
200 	devstat_tag_type		ds_tag_type;
201 	devstat_trans_flags		ds_trans_type;
202 	uint64_t			io_len;
203 	uint64_t			io_offset;
204 	struct ctl_be_block_softc	*softc;
205 	struct ctl_be_block_lun		*lun;
206 	STAILQ_ENTRY(ctl_be_block_io)	links;
207 };
208 
209 static int cbb_num_threads = 14;
210 TUNABLE_INT("kern.cam.ctl.block.num_threads", &cbb_num_threads);
211 SYSCTL_NODE(_kern_cam_ctl, OID_AUTO, block, CTLFLAG_RD, 0,
212 	    "CAM Target Layer Block Backend");
213 SYSCTL_INT(_kern_cam_ctl_block, OID_AUTO, num_threads, CTLFLAG_RW,
214            &cbb_num_threads, 0, "Number of threads per backing file");
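/*
 * Example (assuming the standard loader.conf/sysctl mechanisms):
 *
 *   kern.cam.ctl.block.num_threads="8"            in /boot/loader.conf, or
 *   # sysctl kern.cam.ctl.block.num_threads=8     at runtime
 */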
215 
216 static struct ctl_be_block_io *ctl_alloc_beio(struct ctl_be_block_softc *softc);
217 static void ctl_free_beio(struct ctl_be_block_io *beio);
218 static int ctl_grow_beio(struct ctl_be_block_softc *softc, int count);
219 #if 0
220 static void ctl_shrink_beio(struct ctl_be_block_softc *softc);
221 #endif
222 static void ctl_complete_beio(struct ctl_be_block_io *beio);
223 static int ctl_be_block_move_done(union ctl_io *io);
224 static void ctl_be_block_biodone(struct bio *bio);
225 static void ctl_be_block_flush_file(struct ctl_be_block_lun *be_lun,
226 				    struct ctl_be_block_io *beio);
227 static void ctl_be_block_dispatch_file(struct ctl_be_block_lun *be_lun,
228 				       struct ctl_be_block_io *beio);
229 static void ctl_be_block_flush_dev(struct ctl_be_block_lun *be_lun,
230 				   struct ctl_be_block_io *beio);
231 static void ctl_be_block_dispatch_dev(struct ctl_be_block_lun *be_lun,
232 				      struct ctl_be_block_io *beio);
233 static void ctl_be_block_cw_dispatch(struct ctl_be_block_lun *be_lun,
234 				    union ctl_io *io);
235 static void ctl_be_block_dispatch(struct ctl_be_block_lun *be_lun,
236 				  union ctl_io *io);
237 static void ctl_be_block_worker(void *context, int pending);
238 static int ctl_be_block_submit(union ctl_io *io);
239 static int ctl_be_block_ioctl(struct cdev *dev, u_long cmd, caddr_t addr,
240 				   int flag, struct thread *td);
241 static int ctl_be_block_open_file(struct ctl_be_block_lun *be_lun,
242 				  struct ctl_lun_req *req);
243 static int ctl_be_block_open_dev(struct ctl_be_block_lun *be_lun,
244 				 struct ctl_lun_req *req);
245 static int ctl_be_block_close(struct ctl_be_block_lun *be_lun);
246 static int ctl_be_block_open(struct ctl_be_block_softc *softc,
247 			     struct ctl_be_block_lun *be_lun,
248 			     struct ctl_lun_req *req);
249 static int ctl_be_block_create(struct ctl_be_block_softc *softc,
250 			       struct ctl_lun_req *req);
251 static int ctl_be_block_rm(struct ctl_be_block_softc *softc,
252 			   struct ctl_lun_req *req);
253 static void ctl_be_block_lun_shutdown(void *be_lun);
254 static void ctl_be_block_lun_config_status(void *be_lun,
255 					   ctl_lun_config_status status);
256 static int ctl_be_block_config_write(union ctl_io *io);
257 static int ctl_be_block_config_read(union ctl_io *io);
258 static int ctl_be_block_lun_info(void *be_lun, struct sbuf *sb);
259 int ctl_be_block_init(void);
260 
261 static struct ctl_backend_driver ctl_be_block_driver =
262 {
263 	.name = "block",
264 	.flags = CTL_BE_FLAG_HAS_CONFIG,
265 	.init = ctl_be_block_init,
266 	.data_submit = ctl_be_block_submit,
267 	.data_move_done = ctl_be_block_move_done,
268 	.config_read = ctl_be_block_config_read,
269 	.config_write = ctl_be_block_config_write,
270 	.ioctl = ctl_be_block_ioctl,
271 	.lun_info = ctl_be_block_lun_info
272 };
273 
274 MALLOC_DEFINE(M_CTLBLK, "ctlblk", "Memory used for CTL block backend");
275 CTL_BACKEND_DECLARE(cbb, ctl_be_block_driver);
276 
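/*
 * Allocate a beio from the softc's free pool.  If the pool is empty, grow
 * it with a blocking allocation and retry until an entry is available.
 */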
277 static struct ctl_be_block_io *
278 ctl_alloc_beio(struct ctl_be_block_softc *softc)
279 {
280 	struct ctl_be_block_io *beio;
281 	int count;
282 
283 	mtx_lock(&softc->lock);
284 
285 	beio = STAILQ_FIRST(&softc->beio_free_queue);
286 	if (beio != NULL) {
287 		STAILQ_REMOVE(&softc->beio_free_queue, beio,
288 			      ctl_be_block_io, links);
289 	}
290 	mtx_unlock(&softc->lock);
291 
292 	if (beio != NULL) {
293 		bzero(beio, sizeof(*beio));
294 		beio->softc = softc;
295 		return (beio);
296 	}
297 
298 	for (;;) {
299 
300 		count = ctl_grow_beio(softc, /*count*/ 10);
301 
302 		/*
303 		 * This shouldn't be possible, since ctl_grow_beio() uses a
304 		 * blocking malloc.
305 		 */
306 		if (count == 0)
307 			return (NULL);
308 
309 		/*
310 		 * Since we have to drop the lock when we're allocating beio
311 		 * structures, it's possible someone else came along and
312 		 * grabbed the beio structures we've just allocated.
313 		 */
314 		mtx_lock(&softc->lock);
315 		beio = STAILQ_FIRST(&softc->beio_free_queue);
316 		if (beio != NULL) {
317 			STAILQ_REMOVE(&softc->beio_free_queue, beio,
318 				      ctl_be_block_io, links);
319 		}
320 		mtx_unlock(&softc->lock);
321 
322 		if (beio != NULL) {
323 			bzero(beio, sizeof(*beio));
324 			beio->softc = softc;
325 			break;
326 		}
327 	}
328 	return (beio);
329 }
330 
331 static void
332 ctl_free_beio(struct ctl_be_block_io *beio)
333 {
334 	struct ctl_be_block_softc *softc;
335 	int duplicate_free;
336 	int i;
337 
338 	softc = beio->softc;
339 	duplicate_free = 0;
340 
341 	for (i = 0; i < beio->num_segs; i++) {
342 		if (beio->sg_segs[i].addr == NULL)
343 			duplicate_free++;
344 
345 		uma_zfree(beio->lun->lun_zone, beio->sg_segs[i].addr);
346 		beio->sg_segs[i].addr = NULL;
347 	}
348 
349 	if (duplicate_free > 0) {
350 		printf("%s: %d duplicate frees out of %d segments\n", __func__,
351 		       duplicate_free, beio->num_segs);
352 	}
353 	mtx_lock(&softc->lock);
354 	STAILQ_INSERT_TAIL(&softc->beio_free_queue, beio, links);
355 	mtx_unlock(&softc->lock);
356 }
357 
358 static int
359 ctl_grow_beio(struct ctl_be_block_softc *softc, int count)
360 {
361 	int i;
362 
363 	for (i = 0; i < count; i++) {
364 		struct ctl_be_block_io *beio;
365 
366 		beio = (struct ctl_be_block_io *)malloc(sizeof(*beio),
367 							   M_CTLBLK,
368 							   M_WAITOK | M_ZERO);
369 		if (beio == NULL)
370 			break;
371 
372 		bzero(beio, sizeof(*beio));
373 		beio->softc = softc;
374 		mtx_lock(&softc->lock);
375 		STAILQ_INSERT_TAIL(&softc->beio_free_queue, beio, links);
376 		mtx_unlock(&softc->lock);
377 	}
378 
379 	return (i);
380 }
381 
382 #if 0
383 static void
384 ctl_shrink_beio(struct ctl_be_block_softc *softc)
385 {
386 	struct ctl_be_block_io *beio, *beio_tmp;
387 
388 	mtx_lock(&softc->lock);
389 	STAILQ_FOREACH_SAFE(beio, &softc->beio_free_queue, links, beio_tmp) {
390 		STAILQ_REMOVE(&softc->beio_free_queue, beio,
391 			      ctl_be_block_io, links);
392 		free(beio, M_CTLBLK);
393 	}
394 	mtx_unlock(&softc->lock);
395 }
396 #endif
397 
398 static void
399 ctl_complete_beio(struct ctl_be_block_io *beio)
400 {
401 	union ctl_io *io;
402 	int io_len;
403 
404 	io = beio->io;
405 
406 	if ((io->io_hdr.status & CTL_STATUS_MASK) == CTL_SUCCESS)
407 		io_len = beio->io_len;
408 	else
409 		io_len = 0;
410 
411 	devstat_end_transaction(beio->lun->disk_stats,
412 				/*bytes*/ io_len,
413 				beio->ds_tag_type,
414 				beio->ds_trans_type,
415 				/*now*/ NULL,
416 				/*then*/&beio->ds_t0);
417 
418 	ctl_free_beio(beio);
419 	ctl_done(io);
420 }
421 
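/*
 * Called by CTL when a data transfer to or from the initiator finishes.
 * Reads (and writes that failed or were aborted) are completed here;
 * successful writes are queued to the datamove queue so the backend I/O
 * can run in the worker thread, where blocking is allowed.
 */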
422 static int
423 ctl_be_block_move_done(union ctl_io *io)
424 {
425 	struct ctl_be_block_io *beio;
426 	struct ctl_be_block_lun *be_lun;
427 #ifdef CTL_TIME_IO
428 	struct bintime cur_bt;
429 #endif
430 
431 	beio = (struct ctl_be_block_io *)
432 		io->io_hdr.ctl_private[CTL_PRIV_BACKEND].ptr;
433 
434 	be_lun = beio->lun;
435 
436 	DPRINTF("entered\n");
437 
438 #ifdef CTL_TIME_IO
439 	getbintime(&cur_bt);
440 	bintime_sub(&cur_bt, &io->io_hdr.dma_start_bt);
441 	bintime_add(&io->io_hdr.dma_bt, &cur_bt);
442 	io->io_hdr.num_dmas++;
443 #endif
444 
445 	/*
446 	 * We set status at this point for read commands, and write
447 	 * commands with errors.
448 	 */
449 	if ((beio->bio_cmd == BIO_READ)
450 	 && (io->io_hdr.port_status == 0)
451 	 && ((io->io_hdr.flags & CTL_FLAG_ABORT) == 0)
452 	 && ((io->io_hdr.status & CTL_STATUS_MASK) == CTL_STATUS_NONE))
453 		ctl_set_success(&io->scsiio);
454 	else if ((io->io_hdr.port_status != 0)
455 	      && ((io->io_hdr.flags & CTL_FLAG_ABORT) == 0)
456 	      && ((io->io_hdr.status & CTL_STATUS_MASK) == CTL_STATUS_NONE)) {
457 		/*
458 		 * For hardware error sense keys, the sense key
459 		 * specific value is defined to be a retry count,
460 		 * but we use it to pass back an internal FETD
461 		 * error code.  XXX KDM  Hopefully the FETD is only
462 		 * using 16 bits for an error code, since that's
463 		 * all the space we have in the sks field.
464 		 */
465 		ctl_set_internal_failure(&io->scsiio,
466 					 /*sks_valid*/ 1,
467 					 /*retry_count*/
468 					 io->io_hdr.port_status);
469 	}
470 
471 	/*
472 	 * If this is a read, or a write with errors, it is done.
473 	 */
474 	if ((beio->bio_cmd == BIO_READ)
475 	 || ((io->io_hdr.flags & CTL_FLAG_ABORT) != 0)
476 	 || ((io->io_hdr.status & CTL_STATUS_MASK) != CTL_STATUS_NONE)) {
477 		ctl_complete_beio(beio);
478 		return (0);
479 	}
480 
481 	/*
482 	 * At this point, we have a write and the DMA completed
483 	 * successfully.  We now have to queue it to the task queue to
484 	 * execute the backend I/O.  That is because we do blocking
485 	 * memory allocations, and in the file backing case, blocking I/O.
486 	 * This move done routine is generally called in the SIM's
487 	 * interrupt context, and therefore we cannot block.
488 	 */
489 	mtx_lock(&be_lun->lock);
490 	/*
491 	 * XXX KDM make sure that links is okay to use at this point.
492 	 * Otherwise, we either need to add another field to ctl_io_hdr,
493 	 * or deal with resource allocation here.
494 	 */
495 	STAILQ_INSERT_TAIL(&be_lun->datamove_queue, &io->io_hdr, links);
496 	mtx_unlock(&be_lun->lock);
497 
498 	taskqueue_enqueue(be_lun->io_taskqueue, &be_lun->io_task);
499 
500 	return (0);
501 }
502 
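/*
 * bio completion handler for the device backend.  The last bio of a beio
 * to complete either finishes the command (writes, flushes, and any error
 * case) or kicks off the data transfer back to the initiator (reads).
 */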
503 static void
504 ctl_be_block_biodone(struct bio *bio)
505 {
506 	struct ctl_be_block_io *beio;
507 	struct ctl_be_block_lun *be_lun;
508 	union ctl_io *io;
509 
510 	beio = bio->bio_caller1;
511 	be_lun = beio->lun;
512 	io = beio->io;
513 
514 	DPRINTF("entered\n");
515 
516 	mtx_lock(&be_lun->lock);
517 	if (bio->bio_error != 0)
518 		beio->num_errors++;
519 
520 	beio->num_bios_done++;
521 
522 	/*
523 	 * XXX KDM will this cause WITNESS to complain?  Holding a lock
524 	 * during the free might cause it to complain.
525 	 */
526 	g_destroy_bio(bio);
527 
528 	/*
529 	 * If the send complete bit isn't set, or we aren't the last I/O to
530 	 * complete, then we're done.
531 	 */
532 	if ((beio->send_complete == 0)
533 	 || (beio->num_bios_done < beio->num_bios_sent)) {
534 		mtx_unlock(&be_lun->lock);
535 		return;
536 	}
537 
538 	/*
539 	 * At this point, we've verified that we are the last I/O to
540 	 * complete, so it's safe to drop the lock.
541 	 */
542 	mtx_unlock(&be_lun->lock);
543 
544 	/*
545 	 * If there are any errors from the backing device, we fail the
546 	 * entire I/O with a medium error.
547 	 */
548 	if (beio->num_errors > 0) {
549 		if (beio->bio_cmd == BIO_FLUSH) {
550 			/* XXX KDM is there a better error here? */
551 			ctl_set_internal_failure(&io->scsiio,
552 						 /*sks_valid*/ 1,
553 						 /*retry_count*/ 0xbad2);
554 		} else
555 			ctl_set_medium_error(&io->scsiio);
556 		ctl_complete_beio(beio);
557 		return;
558 	}
559 
560 	/*
561 	 * If this is a write or a flush, we're all done.
562 	 * If this is a read, we can now send the data to the user.
563 	 */
564 	if ((beio->bio_cmd == BIO_WRITE)
565 	 || (beio->bio_cmd == BIO_FLUSH)) {
566 		ctl_set_success(&io->scsiio);
567 		ctl_complete_beio(beio);
568 	} else {
569 		io->scsiio.be_move_done = ctl_be_block_move_done;
570 		io->scsiio.kern_data_ptr = (uint8_t *)beio->sg_segs;
571 		io->scsiio.kern_data_len = beio->io_len;
572 		io->scsiio.kern_total_len = beio->io_len;
573 		io->scsiio.kern_rel_offset = 0;
574 		io->scsiio.kern_data_resid = 0;
575 		io->scsiio.kern_sg_entries = beio->num_segs;
576 		io->io_hdr.flags |= CTL_FLAG_ALLOCATED | CTL_FLAG_KDPTR_SGLIST;
577 #ifdef CTL_TIME_IO
578         	getbintime(&io->io_hdr.dma_start_bt);
579 #endif
580 		ctl_datamove(io);
581 	}
582 }
583 
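/*
 * Implement SYNCHRONIZE CACHE for a file-backed LUN by syncing the
 * backing vnode with VOP_FSYNC(MNT_WAIT) under the appropriate vnode and
 * write-suspension locks.
 */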
584 static void
585 ctl_be_block_flush_file(struct ctl_be_block_lun *be_lun,
586 			struct ctl_be_block_io *beio)
587 {
588 	union ctl_io *io;
589 	struct mount *mountpoint;
590 	int vfs_is_locked, error, lock_flags;
591 
592 	DPRINTF("entered\n");
593 
594 	io = beio->io;
595 
596 	vfs_is_locked = VFS_LOCK_GIANT(be_lun->vn->v_mount);
597 
598        	(void) vn_start_write(be_lun->vn, &mountpoint, V_WAIT);
599 
600 	if (MNT_SHARED_WRITES(mountpoint)
601 	 || ((mountpoint == NULL)
602 	  && MNT_SHARED_WRITES(be_lun->vn->v_mount)))
603 		lock_flags = LK_SHARED;
604 	else
605 		lock_flags = LK_EXCLUSIVE;
606 
607 	vn_lock(be_lun->vn, lock_flags | LK_RETRY);
608 
609 	binuptime(&beio->ds_t0);
610 	devstat_start_transaction(beio->lun->disk_stats, &beio->ds_t0);
611 
612 	error = VOP_FSYNC(be_lun->vn, MNT_WAIT, curthread);
613 	VOP_UNLOCK(be_lun->vn, 0);
614 
615 	vn_finished_write(mountpoint);
616 
617 	VFS_UNLOCK_GIANT(vfs_is_locked);
618 
619 	if (error == 0)
620 		ctl_set_success(&io->scsiio);
621 	else {
622 		/* XXX KDM is there a better error here? */
623 		ctl_set_internal_failure(&io->scsiio,
624 					 /*sks_valid*/ 1,
625 					 /*retry_count*/ 0xbad1);
626 	}
627 
628 	ctl_complete_beio(beio);
629 }
630 
631 SDT_PROBE_DEFINE1(cbb, kernel, read, file_start, file_start, "uint64_t");
632 SDT_PROBE_DEFINE1(cbb, kernel, write, file_start, file_start, "uint64_t");
633 SDT_PROBE_DEFINE1(cbb, kernel, read, file_done, file_done, "uint64_t");
634 SDT_PROBE_DEFINE1(cbb, kernel, write, file_done, file_done, "uint64_t");
635 
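/*
 * Read/write dispatch for a file-backed LUN.  The beio's S/G segments are
 * wrapped in a kernel-space uio and handed to VOP_READ()/VOP_WRITE() on
 * the backing vnode; BIO_ORDERED requests add IO_DIRECT/IO_SYNC to
 * approximate barrier semantics, as described below.
 */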
636 static void
637 ctl_be_block_dispatch_file(struct ctl_be_block_lun *be_lun,
638 			   struct ctl_be_block_io *beio)
639 {
640 	struct ctl_be_block_filedata *file_data;
641 	union ctl_io *io;
642 	struct uio xuio;
643 	struct iovec *xiovec;
644 	int vfs_is_locked, flags;
645 	int error, i;
646 
647 	DPRINTF("entered\n");
648 
649 	file_data = &be_lun->backend.file;
650 	io = beio->io;
651 	flags = beio->bio_flags;
652 
653 	if (beio->bio_cmd == BIO_READ) {
654 		SDT_PROBE(cbb, kernel, read, file_start, 0, 0, 0, 0, 0);
655 	} else {
656 		SDT_PROBE(cbb, kernel, write, file_start, 0, 0, 0, 0, 0);
657 	}
658 
659 	bzero(&xuio, sizeof(xuio));
660 	if (beio->bio_cmd == BIO_READ)
661 		xuio.uio_rw = UIO_READ;
662 	else
663 		xuio.uio_rw = UIO_WRITE;
664 
665 	xuio.uio_offset = beio->io_offset;
666 	xuio.uio_resid = beio->io_len;
667 	xuio.uio_segflg = UIO_SYSSPACE;
668 	xuio.uio_iov = beio->xiovecs;
669 	xuio.uio_iovcnt = beio->num_segs;
670 	xuio.uio_td = curthread;
671 
672 	for (i = 0, xiovec = xuio.uio_iov; i < xuio.uio_iovcnt; i++, xiovec++) {
673 		xiovec->iov_base = beio->sg_segs[i].addr;
674 		xiovec->iov_len = beio->sg_segs[i].len;
675 	}
676 
677 	vfs_is_locked = VFS_LOCK_GIANT(be_lun->vn->v_mount);
678 	if (beio->bio_cmd == BIO_READ) {
679 		vn_lock(be_lun->vn, LK_SHARED | LK_RETRY);
680 
681 		binuptime(&beio->ds_t0);
682 		devstat_start_transaction(beio->lun->disk_stats, &beio->ds_t0);
683 
684 		/*
685 		 * UFS pays attention to IO_DIRECT for reads.  If the
686 		 * DIRECTIO option is configured into the kernel, it calls
687 		 * ffs_rawread().  But that only works for single-segment
688 		 * uios with user space addresses.  In our case, with a
689 		 * kernel uio, it still reads into the buffer cache, but it
690 		 * will just try to release the buffer from the cache later
691 		 * on in ffs_read().
692 		 *
693 		 * ZFS does not pay attention to IO_DIRECT for reads.
694 		 *
695 		 * UFS does not pay attention to IO_SYNC for reads.
696 		 *
697 		 * ZFS pays attention to IO_SYNC (which translates into the
698 		 * Solaris define FRSYNC for zfs_read()) for reads.  It
699 		 * attempts to sync the file before reading.
700 		 *
701 		 * So, to attempt to provide some barrier semantics in the
702 		 * BIO_ORDERED case, set both IO_DIRECT and IO_SYNC.
703 		 */
704 		error = VOP_READ(be_lun->vn, &xuio, (flags & BIO_ORDERED) ?
705 				 (IO_DIRECT|IO_SYNC) : 0, file_data->cred);
706 
707 		VOP_UNLOCK(be_lun->vn, 0);
708 	} else {
709 		struct mount *mountpoint;
710 		int lock_flags;
711 
712 		(void)vn_start_write(be_lun->vn, &mountpoint, V_WAIT);
713 
714 		if (MNT_SHARED_WRITES(mountpoint)
715 		 || ((mountpoint == NULL)
716 		  && MNT_SHARED_WRITES(be_lun->vn->v_mount)))
717 			lock_flags = LK_SHARED;
718 		else
719 			lock_flags = LK_EXCLUSIVE;
720 
721 		vn_lock(be_lun->vn, lock_flags | LK_RETRY);
722 
723 		binuptime(&beio->ds_t0);
724 		devstat_start_transaction(beio->lun->disk_stats, &beio->ds_t0);
725 
726 		/*
727 		 * UFS pays attention to IO_DIRECT for writes.  The write
728 		 * is done asynchronously.  (Normally the write would just
729 		 * get put into the cache.)
730 		 *
731 		 * UFS pays attention to IO_SYNC for writes.  It will
732 		 * attempt to write the buffer out synchronously if that
733 		 * flag is set.
734 		 *
735 		 * ZFS does not pay attention to IO_DIRECT for writes.
736 		 *
737 		 * ZFS pays attention to IO_SYNC (a.k.a. FSYNC or FRSYNC)
738 		 * for writes.  It will flush the transaction from the
739 		 * cache before returning.
740 		 *
741 		 * So if we've got the BIO_ORDERED flag set, we want
742 		 * IO_SYNC in either the UFS or ZFS case.
743 		 */
744 		error = VOP_WRITE(be_lun->vn, &xuio, (flags & BIO_ORDERED) ?
745 				  IO_SYNC : 0, file_data->cred);
746 		VOP_UNLOCK(be_lun->vn, 0);
747 
748 		vn_finished_write(mountpoint);
749         }
750         VFS_UNLOCK_GIANT(vfs_is_locked);
751 
752 	/*
753 	 * If we got an error, set the sense data to "MEDIUM ERROR" and
754 	 * return the I/O to the user.
755 	 */
756 	if (error != 0) {
757 		char path_str[32];
758 
759 		ctl_scsi_path_string(io, path_str, sizeof(path_str));
760 		/*
761 		 * XXX KDM ZFS returns ENOSPC when the underlying
762 		 * filesystem fills up.  What kind of SCSI error should we
763 		 * return for that?
764 		 */
765 		printf("%s%s command returned errno %d\n", path_str,
766 		       (beio->bio_cmd == BIO_READ) ? "READ" : "WRITE", error);
767 		ctl_set_medium_error(&io->scsiio);
768 		ctl_complete_beio(beio);
769 		return;
770 	}
771 
772 	/*
773 	 * If this is a write, we're all done.
774 	 * If this is a read, we can now send the data to the user.
775 	 */
776 	if (beio->bio_cmd == BIO_WRITE) {
777 		ctl_set_success(&io->scsiio);
778 		SDT_PROBE(cbb, kernel, write, file_done, 0, 0, 0, 0, 0);
779 		ctl_complete_beio(beio);
780 	} else {
781 		SDT_PROBE(cbb, kernel, read, file_done, 0, 0, 0, 0, 0);
782 		io->scsiio.be_move_done = ctl_be_block_move_done;
783 		io->scsiio.kern_data_ptr = (uint8_t *)beio->sg_segs;
784 		io->scsiio.kern_data_len = beio->io_len;
785 		io->scsiio.kern_total_len = beio->io_len;
786 		io->scsiio.kern_rel_offset = 0;
787 		io->scsiio.kern_data_resid = 0;
788 		io->scsiio.kern_sg_entries = beio->num_segs;
789 		io->io_hdr.flags |= CTL_FLAG_ALLOCATED | CTL_FLAG_KDPTR_SGLIST;
790 #ifdef CTL_TIME_IO
791         	getbintime(&io->io_hdr.dma_start_bt);
792 #endif
793 		ctl_datamove(io);
794 	}
795 }
796 
797 static void
798 ctl_be_block_flush_dev(struct ctl_be_block_lun *be_lun,
799 		       struct ctl_be_block_io *beio)
800 {
801 	struct bio *bio;
802 	union ctl_io *io;
803 	struct ctl_be_block_devdata *dev_data;
804 
805 	dev_data = &be_lun->backend.dev;
806 	io = beio->io;
807 
808 	DPRINTF("entered\n");
809 
810 	/* This can't fail, it's a blocking allocation. */
811 	bio = g_alloc_bio();
812 
813 	bio->bio_cmd	    = BIO_FLUSH;
814 	bio->bio_flags	   |= BIO_ORDERED;
815 	bio->bio_dev	    = dev_data->cdev;
816 	bio->bio_offset	    = 0;
817 	bio->bio_data	    = NULL;
818 	bio->bio_done	    = ctl_be_block_biodone;
819 	bio->bio_caller1    = beio;
820 	bio->bio_pblkno	    = 0;
821 
822 	/*
823 	 * We don't need to acquire the LUN lock here, because we are only
824 	 * sending one bio, and so there is no other context to synchronize
825 	 * with.
826 	 */
827 	beio->num_bios_sent = 1;
828 	beio->send_complete = 1;
829 
830 	binuptime(&beio->ds_t0);
831 	devstat_start_transaction(be_lun->disk_stats, &beio->ds_t0);
832 
833 	(*dev_data->csw->d_strategy)(bio);
834 }
835 
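/*
 * Read/write dispatch for a device backend.  Each S/G segment is carved
 * into bios no larger than the device's si_iosize_max and pushed down via
 * d_strategy(); ctl_be_block_biodone() collects the completions.  As an
 * example, a 16MB I/O built from 128KB segments (assuming MAXPHYS of
 * 128KB and si_iosize_max >= MAXPHYS) fans out into 128 bios.
 */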
836 static void
837 ctl_be_block_dispatch_dev(struct ctl_be_block_lun *be_lun,
838 			  struct ctl_be_block_io *beio)
839 {
840 	int i;
841 	struct bio *bio;
842 	struct ctl_be_block_devdata *dev_data;
843 	off_t cur_offset;
844 	int max_iosize;
845 
846 	DPRINTF("entered\n");
847 
848 	dev_data = &be_lun->backend.dev;
849 
850 	/*
851 	 * We have to limit our I/O size to the maximum supported by the
852 	 * backend device.  Hopefully it is MAXPHYS.  If the driver doesn't
853 	 * set it properly, use DFLTPHYS.
854 	 */
855 	max_iosize = dev_data->cdev->si_iosize_max;
856 	if (max_iosize < PAGE_SIZE)
857 		max_iosize = DFLTPHYS;
858 
859 	cur_offset = beio->io_offset;
860 
861 	/*
862 	 * XXX KDM need to accurately reflect the number of I/Os outstanding
863 	 * to a device.
864 	 */
865 	binuptime(&beio->ds_t0);
866 	devstat_start_transaction(be_lun->disk_stats, &beio->ds_t0);
867 
868 	for (i = 0; i < beio->num_segs; i++) {
869 		size_t cur_size;
870 		uint8_t *cur_ptr;
871 
872 		cur_size = beio->sg_segs[i].len;
873 		cur_ptr = beio->sg_segs[i].addr;
874 
875 		while (cur_size > 0) {
876 			/* This can't fail, it's a blocking allocation. */
877 			bio = g_alloc_bio();
878 
879 			KASSERT(bio != NULL, ("g_alloc_bio() failed!\n"));
880 
881 			bio->bio_cmd = beio->bio_cmd;
882 			bio->bio_flags |= beio->bio_flags;
883 			bio->bio_dev = dev_data->cdev;
884 			bio->bio_caller1 = beio;
885 			bio->bio_length = min(cur_size, max_iosize);
886 			bio->bio_offset = cur_offset;
887 			bio->bio_data = cur_ptr;
888 			bio->bio_done = ctl_be_block_biodone;
889 			bio->bio_pblkno = cur_offset / be_lun->blocksize;
890 
891 			cur_offset += bio->bio_length;
892 			cur_ptr += bio->bio_length;
893 			cur_size -= bio->bio_length;
894 
895 			/*
896 			 * Make sure we set the complete bit just before we
897 			 * issue the last bio so we don't wind up with a
898 			 * race.
899 			 *
900 			 * Use the LUN mutex here instead of a combination
901 			 * of atomic variables for simplicity.
902 			 *
903 			 * XXX KDM we could have a per-IO lock, but that
904 			 * would cause additional per-IO setup and teardown
905 			 * overhead.  Hopefully there won't be too much
906 			 * contention on the LUN lock.
907 			 */
908 			mtx_lock(&be_lun->lock);
909 
910 			beio->num_bios_sent++;
911 
912 			if ((i == beio->num_segs - 1)
913 			 && (cur_size == 0))
914 				beio->send_complete = 1;
915 
916 			mtx_unlock(&be_lun->lock);
917 
918 			(*dev_data->csw->d_strategy)(bio);
919 		}
920 	}
921 }
922 
923 static void
924 ctl_be_block_cw_dispatch(struct ctl_be_block_lun *be_lun,
925 			 union ctl_io *io)
926 {
927 	struct ctl_be_block_io *beio;
928 	struct ctl_be_block_softc *softc;
929 
930 	DPRINTF("entered\n");
931 
932 	softc = be_lun->softc;
933 	beio = ctl_alloc_beio(softc);
934 	if (beio == NULL) {
935 		/*
936 		 * This should not happen.  ctl_alloc_beio() will call
937 		 * ctl_grow_beio() with a blocking malloc as needed.
938 		 * A malloc with M_WAITOK should not fail.
939 		 */
940 		ctl_set_busy(&io->scsiio);
941 		ctl_done(io);
942 		return;
943 	}
944 
945 	beio->io = io;
946 	beio->softc = softc;
947 	beio->lun = be_lun;
948 	io->io_hdr.ctl_private[CTL_PRIV_BACKEND].ptr = beio;
949 
950 	switch (io->scsiio.cdb[0]) {
951 	case SYNCHRONIZE_CACHE:
952 	case SYNCHRONIZE_CACHE_16:
953 		beio->ds_trans_type = DEVSTAT_NO_DATA;
954 		beio->ds_tag_type = DEVSTAT_TAG_ORDERED;
955 		beio->io_len = 0;
956 		be_lun->lun_flush(be_lun, beio);
957 		break;
958 	default:
959 		panic("Unhandled CDB type %#x", io->scsiio.cdb[0]);
960 		break;
961 	}
962 }
963 
964 SDT_PROBE_DEFINE1(cbb, kernel, read, start, start, "uint64_t");
965 SDT_PROBE_DEFINE1(cbb, kernel, write, start, start, "uint64_t");
966 SDT_PROBE_DEFINE1(cbb, kernel, read, alloc_done, alloc_done, "uint64_t");
967 SDT_PROBE_DEFINE1(cbb, kernel, write, alloc_done, alloc_done, "uint64_t");
968 
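/*
 * Main read/write dispatch routine.  It validates the I/O size, allocates
 * a beio and an S/G list of MAXPHYS-sized segments, and then either calls
 * the backend dispatch routine directly (reads) or starts ctl_datamove()
 * to pull the data from the initiator first (writes); the write picks up
 * again in ctl_be_block_move_done().
 */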
969 static void
970 ctl_be_block_dispatch(struct ctl_be_block_lun *be_lun,
971 			   union ctl_io *io)
972 {
973 	struct ctl_be_block_io *beio;
974 	struct ctl_be_block_softc *softc;
975 	struct ctl_lba_len lbalen;
976 	uint64_t len_left, io_size_bytes;
977 	int i;
978 
979 	softc = be_lun->softc;
980 
981 	DPRINTF("entered\n");
982 
983 	if ((io->io_hdr.flags & CTL_FLAG_DATA_MASK) == CTL_FLAG_DATA_IN) {
984 		SDT_PROBE(cbb, kernel, read, start, 0, 0, 0, 0, 0);
985 	} else {
986 		SDT_PROBE(cbb, kernel, write, start, 0, 0, 0, 0, 0);
987 	}
988 
989 	memcpy(&lbalen, io->io_hdr.ctl_private[CTL_PRIV_LBA_LEN].bytes,
990 	       sizeof(lbalen));
991 
992 	io_size_bytes = lbalen.len * be_lun->blocksize;
993 
994 	/*
995 	 * XXX KDM this is temporary, until we implement chaining of beio
996 	 * structures and multiple datamove calls to move all the data in
997 	 * or out.
998 	 */
999 	if (io_size_bytes > CTLBLK_MAX_IO_SIZE) {
1000 		printf("%s: IO length %ju > max io size %u\n", __func__,
1001 		       (uintmax_t)io_size_bytes, CTLBLK_MAX_IO_SIZE);
1002 		ctl_set_invalid_field(&io->scsiio,
1003 				      /*sks_valid*/ 0,
1004 				      /*command*/ 1,
1005 				      /*field*/ 0,
1006 				      /*bit_valid*/ 0,
1007 				      /*bit*/ 0);
1008 		ctl_done(io);
1009 		return;
1010 	}
1011 
1012 	beio = ctl_alloc_beio(softc);
1013 	if (beio == NULL) {
1014 		/*
1015 		 * This should not happen.  ctl_alloc_beio() will call
1016 		 * ctl_grow_beio() with a blocking malloc as needed.
1017 		 * A malloc with M_WAITOK should not fail.
1018 		 */
1019 		ctl_set_busy(&io->scsiio);
1020 		ctl_done(io);
1021 		return;
1022 	}
1023 
1024 	beio->io = io;
1025 	beio->softc = softc;
1026 	beio->lun = be_lun;
1027 	io->io_hdr.ctl_private[CTL_PRIV_BACKEND].ptr = beio;
1028 
1029 	/*
1030 	 * If the I/O came down with an ordered or head of queue tag, set
1031 	 * the BIO_ORDERED attribute.  For head of queue tags, that's
1032 	 * pretty much the best we can do.
1033 	 *
1034 	 * XXX KDM we don't have a great way to easily know about the FUA
1035 	 * bit right now (it is decoded in ctl_read_write(), but we don't
1036 	 * pass that knowledge to the backend), and in any case we would
1037 	 * need to determine how to handle it.
1038 	 */
1039 	if ((io->scsiio.tag_type == CTL_TAG_ORDERED)
1040 	 || (io->scsiio.tag_type == CTL_TAG_HEAD_OF_QUEUE))
1041 		beio->bio_flags = BIO_ORDERED;
1042 
1043 	switch (io->scsiio.tag_type) {
1044 	case CTL_TAG_ORDERED:
1045 		beio->ds_tag_type = DEVSTAT_TAG_ORDERED;
1046 		break;
1047 	case CTL_TAG_HEAD_OF_QUEUE:
1048 		beio->ds_tag_type = DEVSTAT_TAG_HEAD;
1049 		break;
1050 	case CTL_TAG_UNTAGGED:
1051 	case CTL_TAG_SIMPLE:
1052 	case CTL_TAG_ACA:
1053 	default:
1054 		beio->ds_tag_type = DEVSTAT_TAG_SIMPLE;
1055 		break;
1056 	}
1057 
1058 	/*
1059 	 * This path handles read and write only.  The config write path
1060 	 * handles flush operations.
1061 	 */
1062 	if ((io->io_hdr.flags & CTL_FLAG_DATA_MASK) == CTL_FLAG_DATA_IN) {
1063 		beio->bio_cmd = BIO_READ;
1064 		beio->ds_trans_type = DEVSTAT_READ;
1065 	} else {
1066 		beio->bio_cmd = BIO_WRITE;
1067 		beio->ds_trans_type = DEVSTAT_WRITE;
1068 	}
1069 
1070 	beio->io_len = lbalen.len * be_lun->blocksize;
1071 	beio->io_offset = lbalen.lba * be_lun->blocksize;
1072 
1073 	DPRINTF("%s at LBA %jx len %u\n",
1074 	       (beio->bio_cmd == BIO_READ) ? "READ" : "WRITE",
1075 	       (uintmax_t)lbalen.lba, lbalen.len);
1076 
1077 	for (i = 0, len_left = io_size_bytes; i < CTLBLK_MAX_SEGS &&
1078 	     len_left > 0; i++) {
1079 
1080 		/*
1081 		 * Setup the S/G entry for this chunk.
1082 		 */
1083 		beio->sg_segs[i].len = min(MAXPHYS, len_left);
1084 		beio->sg_segs[i].addr = uma_zalloc(be_lun->lun_zone, M_WAITOK);
1085 		/*
1086 		 * uma_zalloc() can in theory return NULL even with M_WAITOK
1087 		 * if it can't pull more memory into the zone.
1088 		 */
1089 		if (beio->sg_segs[i].addr == NULL) {
1090 			ctl_set_busy(&io->scsiio);
1091 			ctl_complete_beio(beio);
1092 			return;
1093 		}
1094 
1095 		DPRINTF("segment %d addr %p len %zd\n", i,
1096 			beio->sg_segs[i].addr, beio->sg_segs[i].len);
1097 
1098 		beio->num_segs++;
1099 		len_left -= beio->sg_segs[i].len;
1100 	}
1101 
1102 	/*
1103 	 * For the read case, we need to read the data into our buffers and
1104 	 * then we can send it back to the user.  For the write case, we
1105 	 * need to get the data from the user first.
1106 	 */
1107 	if (beio->bio_cmd == BIO_READ) {
1108 		SDT_PROBE(cbb, kernel, read, alloc_done, 0, 0, 0, 0, 0);
1109 		be_lun->dispatch(be_lun, beio);
1110 	} else {
1111 		SDT_PROBE(cbb, kernel, write, alloc_done, 0, 0, 0, 0, 0);
1112 		io->scsiio.be_move_done = ctl_be_block_move_done;
1113 		io->scsiio.kern_data_ptr = (uint8_t *)beio->sg_segs;
1114 		io->scsiio.kern_data_len = beio->io_len;
1115 		io->scsiio.kern_total_len = beio->io_len;
1116 		io->scsiio.kern_rel_offset = 0;
1117 		io->scsiio.kern_data_resid = 0;
1118 		io->scsiio.kern_sg_entries = beio->num_segs;
1119 		io->io_hdr.flags |= CTL_FLAG_ALLOCATED | CTL_FLAG_KDPTR_SGLIST;
1120 #ifdef CTL_TIME_IO
1121         	getbintime(&io->io_hdr.dma_start_bt);
1122 #endif
1123 		ctl_datamove(io);
1124 	}
1125 }
1126 
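/*
 * Taskqueue worker for a LUN.  The queues are serviced in priority order:
 * completed datamoves first, then config writes, then new I/O from the
 * input queue; the task returns (and the thread sleeps) once all three
 * are empty.
 */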
1127 static void
1128 ctl_be_block_worker(void *context, int pending)
1129 {
1130 	struct ctl_be_block_lun *be_lun;
1131 	struct ctl_be_block_softc *softc;
1132 	union ctl_io *io;
1133 
1134 	be_lun = (struct ctl_be_block_lun *)context;
1135 	softc = be_lun->softc;
1136 
1137 	DPRINTF("entered\n");
1138 
1139 	mtx_lock(&be_lun->lock);
1140 	for (;;) {
1141 		io = (union ctl_io *)STAILQ_FIRST(&be_lun->datamove_queue);
1142 		if (io != NULL) {
1143 			struct ctl_be_block_io *beio;
1144 
1145 			DPRINTF("datamove queue\n");
1146 
1147 			STAILQ_REMOVE(&be_lun->datamove_queue, &io->io_hdr,
1148 				      ctl_io_hdr, links);
1149 
1150 			mtx_unlock(&be_lun->lock);
1151 
1152 			beio = (struct ctl_be_block_io *)
1153 			    io->io_hdr.ctl_private[CTL_PRIV_BACKEND].ptr;
1154 
1155 			be_lun->dispatch(be_lun, beio);
1156 
1157 			mtx_lock(&be_lun->lock);
1158 			continue;
1159 		}
1160 		io = (union ctl_io *)STAILQ_FIRST(&be_lun->config_write_queue);
1161 		if (io != NULL) {
1162 
1163 			DPRINTF("config write queue\n");
1164 
1165 			STAILQ_REMOVE(&be_lun->config_write_queue, &io->io_hdr,
1166 				      ctl_io_hdr, links);
1167 
1168 			mtx_unlock(&be_lun->lock);
1169 
1170 			ctl_be_block_cw_dispatch(be_lun, io);
1171 
1172 			mtx_lock(&be_lun->lock);
1173 			continue;
1174 		}
1175 		io = (union ctl_io *)STAILQ_FIRST(&be_lun->input_queue);
1176 		if (io != NULL) {
1177 			DPRINTF("input queue\n");
1178 
1179 			STAILQ_REMOVE(&be_lun->input_queue, &io->io_hdr,
1180 				      ctl_io_hdr, links);
1181 			mtx_unlock(&be_lun->lock);
1182 
1183 			/*
1184 			 * We must drop the lock, since this routine and
1185 			 * its children may sleep.
1186 			 */
1187 			ctl_be_block_dispatch(be_lun, io);
1188 
1189 			mtx_lock(&be_lun->lock);
1190 			continue;
1191 		}
1192 
1193 		/*
1194 		 * If we get here, there is no work left in the queues, so
1195 		 * just break out and let the task queue go to sleep.
1196 		 */
1197 		break;
1198 	}
1199 	mtx_unlock(&be_lun->lock);
1200 }
1201 
1202 /*
1203  * Entry point from CTL to the backend for I/O.  We queue everything to a
1204  * work thread, so this just puts the I/O on a queue and wakes up the
1205  * thread.
1206  */
1207 static int
1208 ctl_be_block_submit(union ctl_io *io)
1209 {
1210 	struct ctl_be_block_lun *be_lun;
1211 	struct ctl_be_lun *ctl_be_lun;
1212 	int retval;
1213 
1214 	DPRINTF("entered\n");
1215 
1216 	retval = CTL_RETVAL_COMPLETE;
1217 
1218 	ctl_be_lun = (struct ctl_be_lun *)io->io_hdr.ctl_private[
1219 		CTL_PRIV_BACKEND_LUN].ptr;
1220 	be_lun = (struct ctl_be_block_lun *)ctl_be_lun->be_lun;
1221 
1222 	/*
1223 	 * Make sure we only get SCSI I/O.
1224 	 */
1225 	KASSERT(io->io_hdr.io_type == CTL_IO_SCSI, ("Non-SCSI I/O (type "
1226 		"%#x) encountered", io->io_hdr.io_type));
1227 
1228 	mtx_lock(&be_lun->lock);
1229 	/*
1230 	 * XXX KDM make sure that links is okay to use at this point.
1231 	 * Otherwise, we either need to add another field to ctl_io_hdr,
1232 	 * or deal with resource allocation here.
1233 	 */
1234 	STAILQ_INSERT_TAIL(&be_lun->input_queue, &io->io_hdr, links);
1235 	mtx_unlock(&be_lun->lock);
1236 
1237 	taskqueue_enqueue(be_lun->io_taskqueue, &be_lun->io_task);
1238 
1239 	return (retval);
1240 }
1241 
1242 static int
1243 ctl_be_block_ioctl(struct cdev *dev, u_long cmd, caddr_t addr,
1244 			int flag, struct thread *td)
1245 {
1246 	struct ctl_be_block_softc *softc;
1247 	int error;
1248 
1249 	softc = &backend_block_softc;
1250 
1251 	error = 0;
1252 
1253 	switch (cmd) {
1254 	case CTL_LUN_REQ: {
1255 		struct ctl_lun_req *lun_req;
1256 
1257 		lun_req = (struct ctl_lun_req *)addr;
1258 
1259 		switch (lun_req->reqtype) {
1260 		case CTL_LUNREQ_CREATE:
1261 			error = ctl_be_block_create(softc, lun_req);
1262 			break;
1263 		case CTL_LUNREQ_RM:
1264 			error = ctl_be_block_rm(softc, lun_req);
1265 			break;
1266 		default:
1267 			lun_req->status = CTL_LUN_ERROR;
1268 			snprintf(lun_req->error_str, sizeof(lun_req->error_str),
1269 				 "%s: invalid LUN request type %d", __func__,
1270 				 lun_req->reqtype);
1271 			break;
1272 		}
1273 		break;
1274 	}
1275 	default:
1276 		error = ENOTTY;
1277 		break;
1278 	}
1279 
1280 	return (error);
1281 }
1282 
1283 static int
1284 ctl_be_block_open_file(struct ctl_be_block_lun *be_lun, struct ctl_lun_req *req)
1285 {
1286 	struct ctl_be_block_filedata *file_data;
1287 	struct ctl_lun_create_params *params;
1288 	struct vattr		      vattr;
1289 	int			      error;
1290 
1291 	error = 0;
1292 	file_data = &be_lun->backend.file;
1293 	params = &req->reqdata.create;
1294 
1295 	be_lun->dev_type = CTL_BE_BLOCK_FILE;
1296 	be_lun->dispatch = ctl_be_block_dispatch_file;
1297 	be_lun->lun_flush = ctl_be_block_flush_file;
1298 
1299 	error = VOP_GETATTR(be_lun->vn, &vattr, curthread->td_ucred);
1300 	if (error != 0) {
1301 		snprintf(req->error_str, sizeof(req->error_str),
1302 			 "error calling VOP_GETATTR() for file %s",
1303 			 be_lun->dev_path);
1304 		return (error);
1305 	}
1306 
1307 	/*
1308 	 * Verify that we have the ability to upgrade to exclusive
1309 	 * access on this file so we can trap errors at open instead
1310 	 * of reporting them during first access.
1311 	 */
1312 	if (VOP_ISLOCKED(be_lun->vn) != LK_EXCLUSIVE) {
1313 		vn_lock(be_lun->vn, LK_UPGRADE | LK_RETRY);
1314 		if (be_lun->vn->v_iflag & VI_DOOMED) {
1315 			error = EBADF;
1316 			snprintf(req->error_str, sizeof(req->error_str),
1317 				 "error locking file %s", be_lun->dev_path);
1318 			return (error);
1319 		}
1320 	}
1321 
1322 
1323 	file_data->cred = crhold(curthread->td_ucred);
1324 	be_lun->size_bytes = vattr.va_size;
1325 	/*
1326 	 * We set the multi thread flag for file operations because all
1327 	 * filesystems (in theory) are capable of allowing multiple readers
1328 	 * of a file at once.  So we want to get the maximum possible
1329 	 * concurrency.
1330 	 */
1331 	be_lun->flags |= CTL_BE_BLOCK_LUN_MULTI_THREAD;
1332 
1333 	/*
1334 	 * XXX KDM vattr.va_blocksize may be larger than 512 bytes here.
1335 	 * With ZFS, it is 131072 bytes.  Block sizes that large don't work
1336 	 * with disklabel and UFS on FreeBSD at least.  Large block sizes
1337 	 * may not work with other OSes as well.  So just export a sector
1338 	 * size of 512 bytes, which should work with any OS or
1339 	 * application.  Since our backing is a file, any block size will
1340 	 * work fine for the backing store.
1341 	 */
1342 #if 0
1343 	be_lun->blocksize = vattr.va_blocksize;
1344 #endif
1345 	if (params->blocksize_bytes != 0)
1346 		be_lun->blocksize = params->blocksize_bytes;
1347 	else
1348 		be_lun->blocksize = 512;
1349 
1350 	/*
1351 	 * Sanity check.  The media size has to be at least one
1352 	 * sector long.
1353 	 */
1354 	if (be_lun->size_bytes < be_lun->blocksize) {
1355 		error = EINVAL;
1356 		snprintf(req->error_str, sizeof(req->error_str),
1357 			 "file %s size %ju < block size %u", be_lun->dev_path,
1358 			 (uintmax_t)be_lun->size_bytes, be_lun->blocksize);
1359 	}
1360 	return (error);
1361 }
1362 
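/*
 * Set up a device-backed LUN: take a reference on the device's cdevsw,
 * then use the DIOCGSECTORSIZE and DIOCGMEDIASIZE ioctls to discover the
 * sector size and media size.  A user-requested blocksize larger than the
 * device's is accepted only if it is an even multiple of it.
 */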
1363 static int
1364 ctl_be_block_open_dev(struct ctl_be_block_lun *be_lun, struct ctl_lun_req *req)
1365 {
1366 	struct ctl_lun_create_params *params;
1367 	struct vattr		      vattr;
1368 	struct cdev		     *dev;
1369 	struct cdevsw		     *devsw;
1370 	int			      error;
1371 
1372 	params = &req->reqdata.create;
1373 
1374 	be_lun->dev_type = CTL_BE_BLOCK_DEV;
1375 	be_lun->dispatch = ctl_be_block_dispatch_dev;
1376 	be_lun->lun_flush = ctl_be_block_flush_dev;
1377 	be_lun->backend.dev.cdev = be_lun->vn->v_rdev;
1378 	be_lun->backend.dev.csw = dev_refthread(be_lun->backend.dev.cdev,
1379 					     &be_lun->backend.dev.dev_ref);
1380 	if (be_lun->backend.dev.csw == NULL)
1381 		panic("Unable to retrieve device switch");
1382 
1383 	error = VOP_GETATTR(be_lun->vn, &vattr, NOCRED);
1384 	if (error) {
1385 		snprintf(req->error_str, sizeof(req->error_str),
1386 			 "%s: error getting vnode attributes for device %s",
1387 			 __func__, be_lun->dev_path);
1388 		return (error);
1389 	}
1390 
1391 	dev = be_lun->vn->v_rdev;
1392 	devsw = dev->si_devsw;
1393 	if (!devsw->d_ioctl) {
1394 		snprintf(req->error_str, sizeof(req->error_str),
1395 			 "%s: no d_ioctl for device %s!", __func__,
1396 			 be_lun->dev_path);
1397 		return (ENODEV);
1398 	}
1399 
1400 	error = devsw->d_ioctl(dev, DIOCGSECTORSIZE,
1401 			       (caddr_t)&be_lun->blocksize, FREAD,
1402 			       curthread);
1403 	if (error) {
1404 		snprintf(req->error_str, sizeof(req->error_str),
1405 			 "%s: error %d returned for DIOCGSECTORSIZE ioctl "
1406 			 "on %s!", __func__, error, be_lun->dev_path);
1407 		return (error);
1408 	}
1409 
1410 	/*
1411 	 * If the user has asked for a blocksize that is greater than the
1412 	 * backing device's blocksize, we can do it only if the blocksize
1413 	 * the user is asking for is an even multiple of the underlying
1414 	 * device's blocksize.
1415 	 */
1416 	if ((params->blocksize_bytes != 0)
1417 	 && (params->blocksize_bytes > be_lun->blocksize)) {
1418 		uint32_t bs_multiple, tmp_blocksize;
1419 
1420 		bs_multiple = params->blocksize_bytes / be_lun->blocksize;
1421 
1422 		tmp_blocksize = bs_multiple * be_lun->blocksize;
1423 
1424 		if (tmp_blocksize == params->blocksize_bytes) {
1425 			be_lun->blocksize = params->blocksize_bytes;
1426 		} else {
1427 			snprintf(req->error_str, sizeof(req->error_str),
1428 				 "%s: requested blocksize %u is not an even "
1429 				 "multiple of backing device blocksize %u",
1430 				 __func__, params->blocksize_bytes,
1431 				 be_lun->blocksize);
1432 			return (EINVAL);
1433 
1434 		}
1435 	} else if ((params->blocksize_bytes != 0)
1436 		&& (params->blocksize_bytes != be_lun->blocksize)) {
1437 		snprintf(req->error_str, sizeof(req->error_str),
1438 			 "%s: requested blocksize %u < backing device "
1439 			 "blocksize %u", __func__, params->blocksize_bytes,
1440 			 be_lun->blocksize);
1441 		return (EINVAL);
1442 	}
1443 
1444 	error = devsw->d_ioctl(dev, DIOCGMEDIASIZE,
1445 			       (caddr_t)&be_lun->size_bytes, FREAD,
1446 			       curthread);
1447 	if (error) {
1448 		snprintf(req->error_str, sizeof(req->error_str),
1449 			 "%s: error %d returned for DIOCGMEDIASIZE ioctl "
1450 			 "on %s!", __func__, error, be_lun->dev_path);
1451 		return (error);
1452 	}
1453 
1454 	return (0);
1455 
1456 }
1457 
1458 
1459 static int
1460 ctl_be_block_close(struct ctl_be_block_lun *be_lun)
1461 {
1462 	DROP_GIANT();
1463 	if (be_lun->vn) {
1464 		int flags = FREAD | FWRITE;
1465 		int vfs_is_locked = 0;
1466 
1467 		switch (be_lun->dev_type) {
1468 		case CTL_BE_BLOCK_DEV:
1469 			if (be_lun->backend.dev.csw) {
1470 				dev_relthread(be_lun->backend.dev.cdev,
1471 					      be_lun->backend.dev.dev_ref);
1472 				be_lun->backend.dev.csw  = NULL;
1473 				be_lun->backend.dev.cdev = NULL;
1474 			}
1475 			break;
1476 		case CTL_BE_BLOCK_FILE:
1477 			vfs_is_locked = VFS_LOCK_GIANT(be_lun->vn->v_mount);
1478 			break;
1479 		case CTL_BE_BLOCK_NONE:
1480 		default:
1481 			panic("Unexpected backend type.");
1482 			break;
1483 		}
1484 
1485 		(void)vn_close(be_lun->vn, flags, NOCRED, curthread);
1486 		be_lun->vn = NULL;
1487 
1488 		switch (be_lun->dev_type) {
1489 		case CTL_BE_BLOCK_DEV:
1490 			break;
1491 		case CTL_BE_BLOCK_FILE:
1492 			VFS_UNLOCK_GIANT(vfs_is_locked);
1493 			if (be_lun->backend.file.cred != NULL) {
1494 				crfree(be_lun->backend.file.cred);
1495 				be_lun->backend.file.cred = NULL;
1496 			}
1497 			break;
1498 		case CTL_BE_BLOCK_NONE:
1499 		default:
1500 			panic("Unexpected backend type.");
1501 			break;
1502 		}
1503 	}
1504 	PICKUP_GIANT();
1505 
1506 	return (0);
1507 }
1508 
1509 static int
1510 ctl_be_block_open(struct ctl_be_block_softc *softc,
1511 		       struct ctl_be_block_lun *be_lun, struct ctl_lun_req *req)
1512 {
1513 	struct nameidata nd;
1514 	int		 flags;
1515 	int		 error;
1516 	int		 vfs_is_locked;
1517 
1518 	/*
1519 	 * XXX KDM allow a read-only option?
1520 	 */
1521 	flags = FREAD | FWRITE;
1522 	error = 0;
1523 
1524 	if (rootvnode == NULL) {
1525 		snprintf(req->error_str, sizeof(req->error_str),
1526 			 "%s: Root filesystem is not mounted", __func__);
1527 		return (1);
1528 	}
1529 
1530 	if (!curthread->td_proc->p_fd->fd_cdir) {
1531 		curthread->td_proc->p_fd->fd_cdir = rootvnode;
1532 		VREF(rootvnode);
1533 	}
1534 	if (!curthread->td_proc->p_fd->fd_rdir) {
1535 		curthread->td_proc->p_fd->fd_rdir = rootvnode;
1536 		VREF(rootvnode);
1537 	}
1538 	if (!curthread->td_proc->p_fd->fd_jdir) {
1539 		curthread->td_proc->p_fd->fd_jdir = rootvnode;
1540 		VREF(rootvnode);
1541 	}
1542 
1543  again:
1544 	NDINIT(&nd, LOOKUP, FOLLOW, UIO_SYSSPACE, be_lun->dev_path, curthread);
1545 	error = vn_open(&nd, &flags, 0, NULL);
1546 	if (error) {
1547 		/*
1548 		 * This is the only reasonable guess we can make as far as
1549 		 * path if the user doesn't give us a fully qualified path.
1550 		 * If they want to specify a file, they need to specify the
1551 		 * full path.
1552 		 */
1553 		if (be_lun->dev_path[0] != '/') {
1554 			const char *dev_path = "/dev/";
1555 			char *dev_name;
1556 
1557 			/* Try adding device path at beginning of name */
1558 			dev_name = malloc(strlen(be_lun->dev_path)
1559 					+ strlen(dev_path) + 1,
1560 					  M_CTLBLK, M_WAITOK);
1561 			if (dev_name) {
1562 				sprintf(dev_name, "%s%s", dev_path,
1563 					be_lun->dev_path);
1564 				free(be_lun->dev_path, M_CTLBLK);
1565 				be_lun->dev_path = dev_name;
1566 				goto again;
1567 			}
1568 		}
1569 		snprintf(req->error_str, sizeof(req->error_str),
1570 			 "%s: error opening %s", __func__, be_lun->dev_path);
1571 		return (error);
1572 	}
1573 
1574 	vfs_is_locked = NDHASGIANT(&nd);
1575 
1576 	NDFREE(&nd, NDF_ONLY_PNBUF);
1577 
1578 	be_lun->vn = nd.ni_vp;
1579 
1580 	/* We only support disks and files. */
1581 	if (vn_isdisk(be_lun->vn, &error)) {
1582 		error = ctl_be_block_open_dev(be_lun, req);
1583 	} else if (be_lun->vn->v_type == VREG) {
1584 		error = ctl_be_block_open_file(be_lun, req);
1585 	} else {
1586 		error = EINVAL;
1587 		snprintf(req->error_str, sizeof(req->error_str),
1588 			 "%s is not a disk or file", be_lun->dev_path);
1589 	}
1590 	VOP_UNLOCK(be_lun->vn, 0);
1591 	VFS_UNLOCK_GIANT(vfs_is_locked);
1592 
1593 	if (error != 0) {
1594 		ctl_be_block_close(be_lun);
1595 		return (error);
1596 	}
1597 
1598 	be_lun->blocksize_shift = fls(be_lun->blocksize) - 1;
1599 	be_lun->size_blocks = be_lun->size_bytes >> be_lun->blocksize_shift;
1600 
1601 	return (0);
1602 
1603 }
1604 
1605 static int
1606 ctl_be_block_mem_ctor(void *mem, int size, void *arg, int flags)
1607 {
1608 	return (0);
1609 }
1610 
1611 static void
1612 ctl_be_block_mem_dtor(void *mem, int size, void *arg)
1613 {
1614 	bzero(mem, size);
1615 }
1616 
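/*
 * Create a new LUN in response to a CTL_LUNREQ_CREATE request.  The "file"
 * backend argument names the backing file or device, and the optional
 * "num_threads" argument overrides the default worker thread count.  From
 * userland this typically arrives via ctladm(8), e.g. something along the
 * lines of "ctladm create -b block -o file=/dev/md0 -o num_threads=8"
 * (illustrative; see ctladm(8) for the exact syntax).
 */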
1617 static int
1618 ctl_be_block_create(struct ctl_be_block_softc *softc, struct ctl_lun_req *req)
1619 {
1620 	struct ctl_be_block_lun *be_lun;
1621 	struct ctl_lun_create_params *params;
1622 	struct ctl_be_arg *file_arg;
1623 	char tmpstr[32];
1624 	int retval, num_threads;
1625 	int i;
1626 
1627 	params = &req->reqdata.create;
1628 	retval = 0;
1629 
1630 	num_threads = cbb_num_threads;
1631 
1632 	file_arg = NULL;
1633 
1634 	be_lun = malloc(sizeof(*be_lun), M_CTLBLK, M_ZERO | M_WAITOK);
1635 
1636 	if (be_lun == NULL) {
1637 		snprintf(req->error_str, sizeof(req->error_str),
1638 			 "%s: error allocating %zd bytes", __func__,
1639 			 sizeof(*be_lun));
1640 		goto bailout_error;
1641 	}
1642 
1643 	be_lun->softc = softc;
1644 	STAILQ_INIT(&be_lun->input_queue);
1645 	STAILQ_INIT(&be_lun->config_write_queue);
1646 	STAILQ_INIT(&be_lun->datamove_queue);
1647 	sprintf(be_lun->lunname, "cblk%d", softc->num_luns);
1648 	mtx_init(&be_lun->lock, be_lun->lunname, NULL, MTX_DEF);
1649 
1650 	be_lun->lun_zone = uma_zcreate(be_lun->lunname, MAXPHYS,
1651 	    ctl_be_block_mem_ctor, ctl_be_block_mem_dtor, NULL, NULL,
1652 	    /*align*/ 0, /*flags*/0);
1653 
1654 	if (be_lun->lun_zone == NULL) {
1655 		snprintf(req->error_str, sizeof(req->error_str),
1656 			 "%s: error allocating UMA zone", __func__);
1657 		goto bailout_error;
1658 	}
1659 
1660 	if (params->flags & CTL_LUN_FLAG_DEV_TYPE)
1661 		be_lun->ctl_be_lun.lun_type = params->device_type;
1662 	else
1663 		be_lun->ctl_be_lun.lun_type = T_DIRECT;
1664 
1665 	if (be_lun->ctl_be_lun.lun_type == T_DIRECT) {
1666 		for (i = 0; i < req->num_be_args; i++) {
1667 			if (strcmp(req->kern_be_args[i].name, "file") == 0) {
1668 				file_arg = &req->kern_be_args[i];
1669 				break;
1670 			}
1671 		}
1672 
1673 		if (file_arg == NULL) {
1674 			snprintf(req->error_str, sizeof(req->error_str),
1675 				 "%s: no file argument specified", __func__);
1676 			goto bailout_error;
1677 		}
1678 
1679 		be_lun->dev_path = malloc(file_arg->vallen, M_CTLBLK,
1680 					  M_WAITOK | M_ZERO);
1681 		if (be_lun->dev_path == NULL) {
1682 			snprintf(req->error_str, sizeof(req->error_str),
1683 				 "%s: error allocating %d bytes", __func__,
1684 				 file_arg->vallen);
1685 			goto bailout_error;
1686 		}
1687 
1688 		strlcpy(be_lun->dev_path, (char *)file_arg->value,
1689 			file_arg->vallen);
1690 
1691 		retval = ctl_be_block_open(softc, be_lun, req);
1692 		if (retval != 0) {
1693 			retval = 0;
1694 			goto bailout_error;
1695 		}
1696 
1697 		/*
1698 		 * Tell the user the size of the file/device.
1699 		 */
1700 		params->lun_size_bytes = be_lun->size_bytes;
1701 
1702 		/*
1703 		 * The maximum LBA is the size - 1.
1704 		 */
1705 		be_lun->ctl_be_lun.maxlba = be_lun->size_blocks - 1;
1706 	} else {
1707 		/*
1708 		 * For processor devices, we don't have any size.
1709 		 */
1710 		be_lun->blocksize = 0;
1711 		be_lun->size_blocks = 0;
1712 		be_lun->size_bytes = 0;
1713 		be_lun->ctl_be_lun.maxlba = 0;
1714 		params->lun_size_bytes = 0;
1715 
1716 		/*
1717 		 * Default to just 1 thread for processor devices.
1718 		 */
1719 		num_threads = 1;
1720 	}
1721 
1722 	/*
1723 	 * XXX This searching loop might be refactored to be combined with
1724 	 * the loop above.
1725 	 */
1726 	for (i = 0; i < req->num_be_args; i++) {
1727 		if (strcmp(req->kern_be_args[i].name, "num_threads") == 0) {
1728 			struct ctl_be_arg *thread_arg;
1729 			char num_thread_str[16];
1730 			int tmp_num_threads;
1731 
1732 
1733 			thread_arg = &req->kern_be_args[i];
1734 
1735 			strlcpy(num_thread_str, (char *)thread_arg->value,
1736 				min(thread_arg->vallen,
1737 				sizeof(num_thread_str)));
1738 
1739 			tmp_num_threads = strtol(num_thread_str, NULL, 0);
1740 
1741 			/*
1742 			 * We don't let the user specify less than one
1743 			 * thread, but hope he's clueful enough not to
1744 			 * specify 1000 threads.
1745 			 */
1746 			if (tmp_num_threads < 1) {
1747 				snprintf(req->error_str, sizeof(req->error_str),
1748 					 "%s: invalid number of threads %s",
1749 				         __func__, num_thread_str);
1750 				goto bailout_error;
1751 			}
1752 
1753 			num_threads = tmp_num_threads;
1754 		}
1755 	}
1756 
1757 	be_lun->flags = CTL_BE_BLOCK_LUN_UNCONFIGURED;
1758 	be_lun->ctl_be_lun.flags = CTL_LUN_FLAG_PRIMARY;
1759 	be_lun->ctl_be_lun.be_lun = be_lun;
1760 	be_lun->ctl_be_lun.blocksize = be_lun->blocksize;
1761 	/* Tell the user the blocksize we ended up using */
1762 	params->blocksize_bytes = be_lun->blocksize;
1763 	if (params->flags & CTL_LUN_FLAG_ID_REQ) {
1764 		be_lun->ctl_be_lun.req_lun_id = params->req_lun_id;
1765 		be_lun->ctl_be_lun.flags |= CTL_LUN_FLAG_ID_REQ;
1766 	} else
1767 		be_lun->ctl_be_lun.req_lun_id = 0;
1768 
1769 	be_lun->ctl_be_lun.lun_shutdown = ctl_be_block_lun_shutdown;
1770 	be_lun->ctl_be_lun.lun_config_status =
1771 		ctl_be_block_lun_config_status;
1772 	be_lun->ctl_be_lun.be = &ctl_be_block_driver;
1773 
1774 	if ((params->flags & CTL_LUN_FLAG_SERIAL_NUM) == 0) {
1775 		snprintf(tmpstr, sizeof(tmpstr), "MYSERIAL%4d",
1776 			 softc->num_luns);
1777 		strncpy((char *)be_lun->ctl_be_lun.serial_num, tmpstr,
1778 			ctl_min(sizeof(be_lun->ctl_be_lun.serial_num),
1779 			sizeof(tmpstr)));
1780 
1781 		/* Tell the user what we used for a serial number */
1782 		strncpy((char *)params->serial_num, tmpstr,
1783 			ctl_min(sizeof(params->serial_num), sizeof(tmpstr)));
1784 	} else {
1785 		strncpy((char *)be_lun->ctl_be_lun.serial_num,
1786 			params->serial_num,
1787 			ctl_min(sizeof(be_lun->ctl_be_lun.serial_num),
1788 			sizeof(params->serial_num)));
1789 	}
1790 	if ((params->flags & CTL_LUN_FLAG_DEVID) == 0) {
1791 		snprintf(tmpstr, sizeof(tmpstr), "MYDEVID%4d", softc->num_luns);
1792 		strncpy((char *)be_lun->ctl_be_lun.device_id, tmpstr,
1793 			ctl_min(sizeof(be_lun->ctl_be_lun.device_id),
1794 			sizeof(tmpstr)));
1795 
1796 		/* Tell the user what we used for a device ID */
1797 		strncpy((char *)params->device_id, tmpstr,
1798 			ctl_min(sizeof(params->device_id), sizeof(tmpstr)));
1799 	} else {
1800 		strncpy((char *)be_lun->ctl_be_lun.device_id,
1801 			params->device_id,
1802 			ctl_min(sizeof(be_lun->ctl_be_lun.device_id),
1803 				sizeof(params->device_id)));
1804 	}
1805 
1806 	TASK_INIT(&be_lun->io_task, /*priority*/0, ctl_be_block_worker, be_lun);
1807 
1808 	be_lun->io_taskqueue = taskqueue_create(be_lun->lunname, M_WAITOK,
1809 	    taskqueue_thread_enqueue, /*context*/&be_lun->io_taskqueue);
1810 
1811 	if (be_lun->io_taskqueue == NULL) {
1812 		snprintf(req->error_str, sizeof(req->error_str),
1813 			 "%s: Unable to create taskqueue", __func__);
1814 		goto bailout_error;
1815 	}
1816 
1817 	/*
1818 	 * Note that we start the same number of threads by default for
1819 	 * both the file case and the block device case.  For the file
1820 	 * case, we need multiple threads to allow concurrency, because the
1821 	 * vnode interface is designed to be a blocking interface.  For the
1822 	 * block device case, ZFS zvols at least will block the caller's
1823 	 * context in many instances, and so we need multiple threads to
1824 	 * overcome that problem.  Other block devices don't need as many
1825 	 * threads, but they shouldn't cause too many problems.
1826 	 *
1827 	 * If the user wants to just have a single thread for a block
1828 	 * device, he can specify that when the LUN is created, or change
1829 	 * the tunable/sysctl to alter the default number of threads.
1830 	 */
1831 	retval = taskqueue_start_threads(&be_lun->io_taskqueue,
1832 					 /*num threads*/num_threads,
1833 					 /*priority*/PWAIT,
1834 					 /*thread name*/
1835 					 "%s taskq", be_lun->lunname);
1836 
1837 	if (retval != 0)
1838 		goto bailout_error;
1839 
1840 	be_lun->num_threads = num_threads;
1841 
1842 	mtx_lock(&softc->lock);
1843 	softc->num_luns++;
1844 	STAILQ_INSERT_TAIL(&softc->lun_list, be_lun, links);
1845 
1846 	mtx_unlock(&softc->lock);
1847 
1848 	retval = ctl_add_lun(&be_lun->ctl_be_lun);
1849 	if (retval != 0) {
1850 		mtx_lock(&softc->lock);
1851 		STAILQ_REMOVE(&softc->lun_list, be_lun, ctl_be_block_lun,
1852 			      links);
1853 		softc->num_luns--;
1854 		mtx_unlock(&softc->lock);
1855 		snprintf(req->error_str, sizeof(req->error_str),
1856 			 "%s: ctl_add_lun() returned error %d, see dmesg for "
1857 			"details", __func__, retval);
1858 		retval = 0;
1859 		goto bailout_error;
1860 	}
1861 
1862 	mtx_lock(&softc->lock);
1863 
1864 	/*
1865 	 * Tell the config_status routine that we're waiting so it won't
1866 	 * clean up the LUN in the event of an error.
1867 	 */
1868 	be_lun->flags |= CTL_BE_BLOCK_LUN_WAITING;
1869 
1870 	while (be_lun->flags & CTL_BE_BLOCK_LUN_UNCONFIGURED) {
1871 		retval = msleep(be_lun, &softc->lock, PCATCH, "ctlblk", 0);
1872 		if (retval == EINTR)
1873 			break;
1874 	}
1875 	be_lun->flags &= ~CTL_BE_BLOCK_LUN_WAITING;
1876 
1877 	if (be_lun->flags & CTL_BE_BLOCK_LUN_CONFIG_ERR) {
1878 		snprintf(req->error_str, sizeof(req->error_str),
1879 			 "%s: LUN configuration error, see dmesg for details",
1880 			 __func__);
1881 		STAILQ_REMOVE(&softc->lun_list, be_lun, ctl_be_block_lun,
1882 			      links);
1883 		softc->num_luns--;
1884 		mtx_unlock(&softc->lock);
1885 		goto bailout_error;
1886 	} else {
1887 		params->req_lun_id = be_lun->ctl_be_lun.lun_id;
1888 	}
1889 
1890 	mtx_unlock(&softc->lock);
1891 
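	/*
	 * Register a devstat(9) entry so per-LUN I/O statistics are
	 * collected for this backend device.
	 */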
1892 	be_lun->disk_stats = devstat_new_entry("cbb", params->req_lun_id,
1893 					       be_lun->blocksize,
1894 					       DEVSTAT_ALL_SUPPORTED,
1895 					       be_lun->ctl_be_lun.lun_type
1896 					       | DEVSTAT_TYPE_IF_OTHER,
1897 					       DEVSTAT_PRIORITY_OTHER);
1898 
1900 	req->status = CTL_LUN_OK;
1901 
1902 	return (retval);
1903 
1904 bailout_error:
1905 	req->status = CTL_LUN_ERROR;
1906 
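	/*
	 * XXX The io_taskqueue created above is not released on this error
	 * path, so failures after taskqueue_create() will leak it.
	 */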
1907 	ctl_be_block_close(be_lun);
1908 
1909 	free(be_lun->dev_path, M_CTLBLK);
1910 	free(be_lun, M_CTLBLK);
1911 
1912 	return (retval);
1913 }
1914 
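/*
 * Handle a LUN removal request from userland.  The LUN is looked up by
 * LUN ID and disabled and invalidated in CTL; once the shutdown callback
 * has marked it unconfigured, its taskqueue, backing store, devstat entry
 * and S/G zone are torn down before the backend structure is freed.
 */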
1915 static int
1916 ctl_be_block_rm(struct ctl_be_block_softc *softc, struct ctl_lun_req *req)
1917 {
1918 	struct ctl_lun_rm_params *params;
1919 	struct ctl_be_block_lun *be_lun;
1920 	int retval;
1921 
1922 	params = &req->reqdata.rm;
1923 
1924 	mtx_lock(&softc->lock);
1925 
1926 	be_lun = NULL;
1927 
1928 	STAILQ_FOREACH(be_lun, &softc->lun_list, links) {
1929 		if (be_lun->ctl_be_lun.lun_id == params->lun_id)
1930 			break;
1931 	}
1932 	mtx_unlock(&softc->lock);
1933 
1934 	if (be_lun == NULL) {
1935 		snprintf(req->error_str, sizeof(req->error_str),
1936 			 "%s: LUN %u is not managed by the block backend",
1937 			 __func__, params->lun_id);
1938 		goto bailout_error;
1939 	}
1940 
1941 	retval = ctl_disable_lun(&be_lun->ctl_be_lun);
1942 
1943 	if (retval != 0) {
1944 		snprintf(req->error_str, sizeof(req->error_str),
1945 			 "%s: error %d returned from ctl_disable_lun() for "
1946 			 "LUN %u", __func__, retval, params->lun_id);
1947 		goto bailout_error;
1948 
1949 	}
1950 
1951 	retval = ctl_invalidate_lun(&be_lun->ctl_be_lun);
1952 	if (retval != 0) {
1953 		snprintf(req->error_str, sizeof(req->error_str),
1954 			 "%s: error %d returned from ctl_invalidate_lun() for "
1955 			 "LUN %d", __func__, retval, params->lun_id);
1956 			 "LUN %u", __func__, retval, params->lun_id);
1957 	}
1958 
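	/*
	 * Wait for the shutdown callback to mark the LUN unconfigured.
	 * The WAITING flag tells the callbacks that someone is sleeping
	 * on this transition.
	 */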
1959 	mtx_lock(&softc->lock);
1960 
1961 	be_lun->flags |= CTL_BE_BLOCK_LUN_WAITING;
1962 
1963 	while ((be_lun->flags & CTL_BE_BLOCK_LUN_UNCONFIGURED) == 0) {
1964 		retval = msleep(be_lun, &softc->lock, PCATCH, "ctlblk", 0);
1965 		if (retval == EINTR)
1966 			break;
1967 	}
1968 
1969 	be_lun->flags &= ~CTL_BE_BLOCK_LUN_WAITING;
1970 
1971 	if ((be_lun->flags & CTL_BE_BLOCK_LUN_UNCONFIGURED) == 0) {
1972 		snprintf(req->error_str, sizeof(req->error_str),
1973 			 "%s: interrupted waiting for LUN to be freed",
1974 			 __func__);
1975 		mtx_unlock(&softc->lock);
1976 		goto bailout_error;
1977 	}
1978 
1979 	STAILQ_REMOVE(&softc->lun_list, be_lun, ctl_be_block_lun, links);
1980 
1981 	softc->num_luns--;
1982 	mtx_unlock(&softc->lock);
1983 
1984 	taskqueue_drain(be_lun->io_taskqueue, &be_lun->io_task);
1985 
1986 	taskqueue_free(be_lun->io_taskqueue);
1987 
1988 	ctl_be_block_close(be_lun);
1989 
1990 	if (be_lun->disk_stats != NULL)
1991 		devstat_remove_entry(be_lun->disk_stats);
1992 
1993 	uma_zdestroy(be_lun->lun_zone);
1994 
1995 	free(be_lun->dev_path, M_CTLBLK);
1996 
1997 	free(be_lun, M_CTLBLK);
1998 
1999 	req->status = CTL_LUN_OK;
2000 
2001 	return (0);
2002 
2003 bailout_error:
2004 
2005 	req->status = CTL_LUN_ERROR;
2006 
2007 	return (0);
2008 }
2009 
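/*
 * Shutdown callback registered with CTL.  Mark the backend LUN as
 * unconfigured and wake up any thread waiting for that transition.
 */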
2010 static void
2011 ctl_be_block_lun_shutdown(void *be_lun)
2012 {
2013 	struct ctl_be_block_lun *lun;
2014 	struct ctl_be_block_softc *softc;
2015 
2016 	lun = (struct ctl_be_block_lun *)be_lun;
2017 
2018 	softc = lun->softc;
2019 
2020 	mtx_lock(&softc->lock);
2021 	lun->flags |= CTL_BE_BLOCK_LUN_UNCONFIGURED;
2022 	if (lun->flags & CTL_BE_BLOCK_LUN_WAITING)
2023 		wakeup(lun);
2024 	mtx_unlock(&softc->lock);
2025 
2026 }
2027 
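/*
 * Configuration status callback registered with CTL.  On success, clear
 * the unconfigured flag, wake any waiter and enable the LUN; on failure,
 * record the configuration error so the waiting creator can report it.
 */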
2028 static void
2029 ctl_be_block_lun_config_status(void *be_lun, ctl_lun_config_status status)
2030 {
2031 	struct ctl_be_block_lun *lun;
2032 	struct ctl_be_block_softc *softc;
2033 
2034 	lun = (struct ctl_be_block_lun *)be_lun;
2035 	softc = lun->softc;
2036 
2037 	if (status == CTL_LUN_CONFIG_OK) {
2038 		mtx_lock(&softc->lock);
2039 		lun->flags &= ~CTL_BE_BLOCK_LUN_UNCONFIGURED;
2040 		if (lun->flags & CTL_BE_BLOCK_LUN_WAITING)
2041 			wakeup(lun);
2042 		mtx_unlock(&softc->lock);
2043 
2044 		/*
2045 		 * We successfully added the LUN, attempt to enable it.
2046 		 */
2047 		if (ctl_enable_lun(&lun->ctl_be_lun) != 0) {
2048 			printf("%s: ctl_enable_lun() failed!\n", __func__);
2049 			if (ctl_invalidate_lun(&lun->ctl_be_lun) != 0) {
2050 				printf("%s: ctl_invalidate_lun() failed!\n",
2051 				       __func__);
2052 			}
2053 		}
2054 
2055 		return;
2056 	}
2057 
2059 	mtx_lock(&softc->lock);
2060 	lun->flags &= ~CTL_BE_BLOCK_LUN_UNCONFIGURED;
2061 	lun->flags |= CTL_BE_BLOCK_LUN_CONFIG_ERR;
2062 	wakeup(lun);
2063 	mtx_unlock(&softc->lock);
2064 }
2065 
2066 
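/*
 * Handle configuration writes (non-data CDBs that change LUN state).
 * SYNCHRONIZE CACHE (10 and 16) is queued to the LUN's worker taskqueue;
 * START STOP UNIT is handled inline since it only changes CTL state.
 */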
2067 static int
2068 ctl_be_block_config_write(union ctl_io *io)
2069 {
2070 	struct ctl_be_block_lun *be_lun;
2071 	struct ctl_be_lun *ctl_be_lun;
2072 	int retval;
2073 
2074 	retval = 0;
2075 
2076 	DPRINTF("entered\n");
2077 
2078 	ctl_be_lun = (struct ctl_be_lun *)io->io_hdr.ctl_private[
2079 		CTL_PRIV_BACKEND_LUN].ptr;
2080 	be_lun = (struct ctl_be_block_lun *)ctl_be_lun->be_lun;
2081 
2082 	switch (io->scsiio.cdb[0]) {
2083 	case SYNCHRONIZE_CACHE:
2084 	case SYNCHRONIZE_CACHE_16:
2085 		/*
2086 		 * The upper level CTL code will filter out any CDBs with
2087 		 * the immediate bit set and return the proper error.
2088 		 *
2089 		 * We don't really need to worry about what LBA range the
2090 		 * user asked to be synced out.  When they issue a sync
2091 		 * cache command, we'll sync out the whole thing.
2092 		 */
2093 		mtx_lock(&be_lun->lock);
2094 		STAILQ_INSERT_TAIL(&be_lun->config_write_queue, &io->io_hdr,
2095 				   links);
2096 		mtx_unlock(&be_lun->lock);
2097 		taskqueue_enqueue(be_lun->io_taskqueue, &be_lun->io_task);
2098 		break;
2099 	case START_STOP_UNIT: {
2100 		struct scsi_start_stop_unit *cdb;
2101 
2102 		cdb = (struct scsi_start_stop_unit *)io->scsiio.cdb;
2103 
2104 		if (cdb->how & SSS_START)
2105 			retval = ctl_start_lun(ctl_be_lun);
2106 		else {
2107 			retval = ctl_stop_lun(ctl_be_lun);
2108 			/*
2109 			 * XXX KDM Copan-specific offline behavior.
2110 			 * Figure out a reasonable way to port this?
2111 			 */
2112 #ifdef NEEDTOPORT
2113 			if ((retval == 0)
2114 			 && (cdb->byte2 & SSS_ONOFFLINE))
2115 				retval = ctl_lun_offline(ctl_be_lun);
2116 #endif
2117 		}
2118 
2119 		/*
2120 		 * In general, the above routines should not fail.  They
2121 		 * just set state for the LUN.  So we've got something
2122 		 * pretty wrong here if we can't start or stop the LUN.
2123 		 */
2124 		if (retval != 0) {
2125 			ctl_set_internal_failure(&io->scsiio,
2126 						 /*sks_valid*/ 1,
2127 						 /*retry_count*/ 0xf051);
2128 			retval = CTL_RETVAL_COMPLETE;
2129 		} else {
2130 			ctl_set_success(&io->scsiio);
2131 		}
2132 		ctl_config_write_done(io);
2133 		break;
2134 	}
2135 	default:
2136 		ctl_set_invalid_opcode(&io->scsiio);
2137 		ctl_config_write_done(io);
2138 		retval = CTL_RETVAL_COMPLETE;
2139 		break;
2140 	}
2141 
2142 	return (retval);
2143 
2144 }
2145 
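/*
 * No configuration reads are supported by this backend yet.
 */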
2146 static int
2147 ctl_be_block_config_read(union ctl_io *io)
2148 {
2149 	return (0);
2150 }
2151 
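/*
 * Emit backend-specific LUN information (thread count and the backing
 * file or device path) as XML elements appended to the supplied sbuf.
 */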
2152 static int
2153 ctl_be_block_lun_info(void *be_lun, struct sbuf *sb)
2154 {
2155 	struct ctl_be_block_lun *lun;
2156 	int retval;
2157 
2158 	lun = (struct ctl_be_block_lun *)be_lun;
2159 	retval = 0;
2160 
2161 	retval = sbuf_printf(sb, "<num_threads>");
2162 
2163 	if (retval != 0)
2164 		goto bailout;
2165 
2166 	retval = sbuf_printf(sb, "%d", lun->num_threads);
2167 
2168 	if (retval != 0)
2169 		goto bailout;
2170 
2171 	retval = sbuf_printf(sb, "</num_threads>");
2172 
2173 	/*
2174 	 * For processor devices, we don't have a path variable.
2175 	 */
2176 	if ((retval != 0)
2177 	 || (lun->dev_path == NULL))
2178 		goto bailout;
2179 
2180 	retval = sbuf_printf(sb, "<file>");
2181 
2182 	if (retval != 0)
2183 		goto bailout;
2184 
2185 	retval = ctl_sbuf_printf_esc(sb, lun->dev_path);
2186 
2187 	if (retval != 0)
2188 		goto bailout;
2189 
2190 	retval = sbuf_printf(sb, "</file>\n");
2191 
2192 bailout:
2193 
2194 	return (retval);
2195 }
2196 
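/*
 * Module initialization: set up the softc lock and queues and
 * pre-allocate an initial pool of backend I/O descriptors.
 */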
2197 int
2198 ctl_be_block_init(void)
2199 {
2200 	struct ctl_be_block_softc *softc;
2201 	int retval;
2202 
2203 	softc = &backend_block_softc;
2204 	retval = 0;
2205 
2206 	mtx_init(&softc->lock, "ctlblk", NULL, MTX_DEF);
2207 	STAILQ_INIT(&softc->beio_free_queue);
2208 	STAILQ_INIT(&softc->disk_list);
2209 	STAILQ_INIT(&softc->lun_list);
2210 	ctl_grow_beio(softc, 200);
2211 
2212 	return (retval);
2213 }
2214