/*-
 * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org>
 * Copyright (c) 2008-2009, Lawrence Stewart <lstewart@freebsd.org>
 * Copyright (c) 2009-2010, The FreeBSD Foundation
 * All rights reserved.
 *
 * Portions of this software were developed at the Centre for Advanced
 * Internet Architectures, Swinburne University of Technology, Melbourne,
 * Australia by Lawrence Stewart under sponsorship from the FreeBSD
 * Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice unmodified, this list of conditions, and the following
 *    disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include "opt_mac.h"

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/mount.h>
#include <sys/mutex.h>
#include <sys/namei.h>
#include <sys/proc.h>
#include <sys/vnode.h>
#include <sys/alq.h>
#include <sys/malloc.h>
#include <sys/unistd.h>
#include <sys/fcntl.h>
#include <sys/eventhandler.h>

#include <security/mac/mac_framework.h>

/* Async. Logging Queue */
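/*
 * Illustrative layout of the ring buffer each queue manages (a sketch
 * derived from the field comments below, shown for a buffer that has
 * wrapped, i.e. aq_writehead < aq_writetail):
 *
 *      0        aq_writehead          aq_writetail              aq_buflen
 *      | pending |       free          |    pending    | wrapearly hole |
 *
 * Bytes from aq_writetail to the end of the buffer (minus any aq_wrapearly
 * hole) plus bytes from 0 to aq_writehead are waiting to be flushed by
 * alq_doio().  Once the buffer is flushed completely, both indexes are
 * reset to 0 so subsequent writes stay contiguous for as long as possible.
 */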
struct alq {
        char    *aq_entbuf;             /* Buffer for stored entries */
        int     aq_entmax;              /* Max entries */
        int     aq_entlen;              /* Entry length */
        int     aq_freebytes;           /* Bytes available in buffer */
        int     aq_buflen;              /* Total length of our buffer */
        int     aq_writehead;           /* Location for next write */
        int     aq_writetail;           /* Flush starts at this location */
        int     aq_wrapearly;           /* # bytes left blank at end of buf */
        int     aq_flags;               /* Queue flags */
        int     aq_waiters;             /* Num threads waiting for resources
                                         * NB: Used as a wait channel so must
                                         * not be first field in the alq struct
                                         */
        struct  ale     aq_getpost;     /* ALE for use by get/post */
        struct mtx      aq_mtx;         /* Queue lock */
        struct vnode    *aq_vp;         /* Open vnode handle */
        struct ucred    *aq_cred;       /* Credentials of the opening thread */
        LIST_ENTRY(alq) aq_act;         /* List of active queues */
        LIST_ENTRY(alq) aq_link;        /* List of all queues */
};

#define AQ_WANTED       0x0001          /* Wakeup sleeper when io is done */
#define AQ_ACTIVE       0x0002          /* On the active list */
#define AQ_FLUSHING     0x0004          /* Doing IO */
#define AQ_SHUTDOWN     0x0008          /* Queue no longer valid */
#define AQ_ORDERED      0x0010          /* Queue enforces ordered writes */
#define AQ_LEGACY       0x0020          /* Legacy queue (fixed length writes) */

#define ALQ_LOCK(alq)   mtx_lock_spin(&(alq)->aq_mtx)
#define ALQ_UNLOCK(alq) mtx_unlock_spin(&(alq)->aq_mtx)

#define HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)

static MALLOC_DEFINE(M_ALD, "ALD", "ALD");

/*
 * The ald_mtx protects the ald_queues list and the ald_active list.
 */
static struct mtx ald_mtx;
static LIST_HEAD(, alq) ald_queues;
static LIST_HEAD(, alq) ald_active;
static int ald_shutingdown = 0;
struct thread *ald_thread;
static struct proc *ald_proc;

#define ALD_LOCK()      mtx_lock(&ald_mtx)
#define ALD_UNLOCK()    mtx_unlock(&ald_mtx)

/* Daemon functions */
static int ald_add(struct alq *);
static int ald_rem(struct alq *);
static void ald_startup(void *);
static void ald_daemon(void);
static void ald_shutdown(void *, int);
static void ald_activate(struct alq *);
static void ald_deactivate(struct alq *);

/* Internal queue functions */
static void alq_shutdown(struct alq *);
static void alq_destroy(struct alq *);
static int alq_doio(struct alq *);

/*
 * Add a new queue to the global list.  Fail if we're shutting down.
 */
static int
ald_add(struct alq *alq)
{
        int error;

        error = 0;

        ALD_LOCK();
        if (ald_shutingdown) {
                error = EBUSY;
                goto done;
        }
        LIST_INSERT_HEAD(&ald_queues, alq, aq_link);
done:
        ALD_UNLOCK();
        return (error);
}

/*
 * Remove a queue from the global list unless we're shutting down.  If so,
 * the ald will take care of cleaning up its resources.
 */
static int
ald_rem(struct alq *alq)
{
        int error;

        error = 0;

        ALD_LOCK();
        if (ald_shutingdown) {
                error = EBUSY;
                goto done;
        }
        LIST_REMOVE(alq, aq_link);
done:
        ALD_UNLOCK();
        return (error);
}
/*
 * Put a queue on the active list.  This will schedule it for writing.
 */
static void
ald_activate(struct alq *alq)
{
        LIST_INSERT_HEAD(&ald_active, alq, aq_act);
        wakeup(&ald_active);
}

static void
ald_deactivate(struct alq *alq)
{
        LIST_REMOVE(alq, aq_act);
        alq->aq_flags &= ~AQ_ACTIVE;
}

static void
ald_startup(void *unused)
{
        mtx_init(&ald_mtx, "ALDmtx", NULL, MTX_DEF|MTX_QUIET);
        LIST_INIT(&ald_queues);
        LIST_INIT(&ald_active);
}

static void
ald_daemon(void)
{
        int needwakeup;
        struct alq *alq;

        ald_thread = FIRST_THREAD_IN_PROC(ald_proc);

        EVENTHANDLER_REGISTER(shutdown_pre_sync, ald_shutdown, NULL,
            SHUTDOWN_PRI_FIRST);

        ALD_LOCK();

        for (;;) {
                while ((alq = LIST_FIRST(&ald_active)) == NULL &&
                    !ald_shutingdown)
                        mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);

                /* Don't shutdown until all active ALQs are flushed. */
                if (ald_shutingdown && alq == NULL) {
                        ALD_UNLOCK();
                        break;
                }

                ALQ_LOCK(alq);
                ald_deactivate(alq);
                ALD_UNLOCK();
                needwakeup = alq_doio(alq);
                ALQ_UNLOCK(alq);
                if (needwakeup)
                        wakeup_one(alq);
                ALD_LOCK();
        }

        kproc_exit(0);
}

static void
ald_shutdown(void *arg, int howto)
{
        struct alq *alq;

        ALD_LOCK();

        /* Ensure no new queues can be created. */
        ald_shutingdown = 1;

        /* Shutdown all ALQs prior to terminating the ald_daemon. */
        while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
                LIST_REMOVE(alq, aq_link);
                ALD_UNLOCK();
                alq_shutdown(alq);
                ALD_LOCK();
        }

        /* At this point, all ALQs are flushed and shutdown. */

        /*
         * Wake ald_daemon so that it exits.  It won't be able to do
         * anything until we release the ald_mtx in the mtx_sleep below.
         */
        wakeup(&ald_active);

        /* Wait for ald_daemon to exit. */
        mtx_sleep(ald_proc, &ald_mtx, PWAIT, "aldslp", 0);

        ALD_UNLOCK();
}

static void
alq_shutdown(struct alq *alq)
{
        ALQ_LOCK(alq);

        /* Stop any new writers. */
        alq->aq_flags |= AQ_SHUTDOWN;

        /*
         * If the ALQ isn't active but has unwritten data (possible if
         * the ALQ_NOACTIVATE flag has been used), explicitly activate the
         * ALQ here so that the pending data gets flushed by the ald_daemon.
         */
        if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) {
                alq->aq_flags |= AQ_ACTIVE;
                ALQ_UNLOCK(alq);
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
                ALQ_LOCK(alq);
        }

        /* Drain IO. */
        while (alq->aq_flags & AQ_ACTIVE) {
                alq->aq_flags |= AQ_WANTED;
                msleep_spin(alq, &alq->aq_mtx, "aldclose", 0);
        }
        ALQ_UNLOCK(alq);

        vn_close(alq->aq_vp, FWRITE, alq->aq_cred, curthread);
        crfree(alq->aq_cred);
}

static void
alq_destroy(struct alq *alq)
{
        /* Drain all pending IO. */
        alq_shutdown(alq);

        mtx_destroy(&alq->aq_mtx);
        free(alq->aq_entbuf, M_ALD);
        free(alq, M_ALD);
}
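/*
 * Worked example of the wrapped-buffer case handled by alq_doio() below
 * (illustrative numbers only): with aq_buflen = 16, aq_writetail = 12,
 * aq_writehead = 4 and aq_wrapearly = 0, the flush gathers two iovecs:
 * aiov[0] covers bytes 12-15 (iov_len = 16 - 12 - 0 = 4) and aiov[1]
 * covers bytes 0-3 (iov_len = 4), giving totlen = 8.  Afterwards,
 * aq_writetail = (12 + 8 + 0) % 16 = 4 == aq_writehead, so the queue is
 * empty and both indexes are reset to 0.
 */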
/*
 * Flush all pending data to disk.  This operation will block.
 */
static int
alq_doio(struct alq *alq)
{
        struct thread *td;
        struct mount *mp;
        struct vnode *vp;
        struct uio auio;
        struct iovec aiov[2];
        int totlen;
        int iov;
        int vfslocked;
        int wrapearly;

        KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

        vp = alq->aq_vp;
        td = curthread;
        totlen = 0;
        iov = 1;
        wrapearly = alq->aq_wrapearly;

        bzero(&aiov, sizeof(aiov));
        bzero(&auio, sizeof(auio));

        /* Start the write from the location of our buffer tail pointer. */
        aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail;

        if (alq->aq_writetail < alq->aq_writehead) {
                /* Buffer not wrapped. */
                totlen = aiov[0].iov_len = alq->aq_writehead -
                    alq->aq_writetail;
        } else if (alq->aq_writehead == 0) {
                /* Buffer not wrapped (special case to avoid an empty iov). */
                totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
                    wrapearly;
        } else {
                /*
                 * Buffer wrapped, requires 2 aiov entries:
                 * - first is from writetail to end of buffer
                 * - second is from start of buffer to writehead
                 */
                aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
                    wrapearly;
                iov++;
                aiov[1].iov_base = alq->aq_entbuf;
                aiov[1].iov_len = alq->aq_writehead;
                totlen = aiov[0].iov_len + aiov[1].iov_len;
        }

        alq->aq_flags |= AQ_FLUSHING;
        ALQ_UNLOCK(alq);

        auio.uio_iov = &aiov[0];
        auio.uio_offset = 0;
        auio.uio_segflg = UIO_SYSSPACE;
        auio.uio_rw = UIO_WRITE;
        auio.uio_iovcnt = iov;
        auio.uio_resid = totlen;
        auio.uio_td = td;

        /*
         * Do all of the junk required to write now.
         */
        vfslocked = VFS_LOCK_GIANT(vp->v_mount);
        vn_start_write(vp, &mp, V_WAIT);
        vn_lock(vp, LK_EXCLUSIVE | LK_RETRY);
        /*
         * XXX: VOP_WRITE error checks are ignored.
         */
#ifdef MAC
        if (mac_vnode_check_write(alq->aq_cred, NOCRED, vp) == 0)
#endif
                VOP_WRITE(vp, &auio, IO_UNIT | IO_APPEND, alq->aq_cred);
        VOP_UNLOCK(vp, 0);
        vn_finished_write(mp);
        VFS_UNLOCK_GIANT(vfslocked);

        ALQ_LOCK(alq);
        alq->aq_flags &= ~AQ_FLUSHING;

        /* Adjust writetail as required, taking into account wrapping. */
        alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) %
            alq->aq_buflen;
        alq->aq_freebytes += totlen + wrapearly;

        /*
         * If we just flushed part of the buffer which wrapped, reset the
         * wrapearly indicator.
         */
        if (wrapearly)
                alq->aq_wrapearly = 0;

        /*
         * If we just flushed the buffer completely, reset indexes to 0 to
         * minimise buffer wraps.
         * This is also required to ensure alq_getn() can't wedge itself.
         */
        if (!HAS_PENDING_DATA(alq))
                alq->aq_writehead = alq->aq_writetail = 0;

        KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen),
            ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__));

        if (alq->aq_flags & AQ_WANTED) {
                alq->aq_flags &= ~AQ_WANTED;
                return (1);
        }

        return (0);
}

static struct kproc_desc ald_kp = {
        "ALQ Daemon",
        ald_daemon,
        &ald_proc
};

SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, SI_ORDER_ANY, kproc_start, &ald_kp);
SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL);

/* User visible queue functions */
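/*
 * Typical usage of the functions below (a hedged sketch, not taken from
 * this file; the path, sizes and message are illustrative only):
 *
 *      struct alq *aq;
 *      char msg[] = "hello, alq\n";
 *      int error;
 *
 *      error = alq_open_flags(&aq, "/var/log/mylog", curthread->td_ucred,
 *          0600, 64 * 1024, 0);
 *      if (error == 0) {
 *              alq_writen(aq, msg, sizeof(msg) - 1, ALQ_WAITOK);
 *              alq_flush(aq);
 *              alq_close(aq);
 *      }
 */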
/*
 * Create the queue data structure, allocate the buffer, and open the file.
 */
int
alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred,
    int cmode, int size, int flags)
{
        struct thread *td;
        struct nameidata nd;
        struct alq *alq;
        int oflags;
        int error;
        int vfslocked;

        KASSERT((size > 0), ("%s: size <= 0", __func__));

        *alqp = NULL;
        td = curthread;

        NDINIT(&nd, LOOKUP, NOFOLLOW | MPSAFE, UIO_SYSSPACE, file, td);
        oflags = FWRITE | O_NOFOLLOW | O_CREAT;

        error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL);
        if (error)
                return (error);

        vfslocked = NDHASGIANT(&nd);
        NDFREE(&nd, NDF_ONLY_PNBUF);
        /* We just unlock so we hold a reference. */
        VOP_UNLOCK(nd.ni_vp, 0);
        VFS_UNLOCK_GIANT(vfslocked);

        alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
        alq->aq_vp = nd.ni_vp;
        alq->aq_cred = crhold(cred);

        mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);

        alq->aq_buflen = size;
        alq->aq_entmax = 0;
        alq->aq_entlen = 0;

        alq->aq_freebytes = alq->aq_buflen;
        alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
        alq->aq_writehead = alq->aq_writetail = 0;
        if (flags & ALQ_ORDERED)
                alq->aq_flags |= AQ_ORDERED;

        if ((error = ald_add(alq)) != 0) {
                alq_destroy(alq);
                return (error);
        }

        *alqp = alq;

        return (0);
}

int
alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int count)
{
        int ret;

        KASSERT((count >= 0), ("%s: count < 0", __func__));

        if (count > 0) {
                if ((ret = alq_open_flags(alqp, file, cred, cmode,
                    size * count, 0)) == 0) {
                        (*alqp)->aq_flags |= AQ_LEGACY;
                        (*alqp)->aq_entmax = count;
                        (*alqp)->aq_entlen = size;
                }
        } else
                ret = alq_open_flags(alqp, file, cred, cmode, size, 0);

        return (ret);
}

/*
 * Copy a new entry into the queue.  If the operation would block, either
 * wait or return an error depending on the value of flags.
 */
int
alq_writen(struct alq *alq, void *data, int len, int flags)
{
        int activate, copy, ret;
        void *waitchan;

        KASSERT((len > 0 && len <= alq->aq_buflen),
            ("%s: len <= 0 || len > aq_buflen", __func__));

        activate = ret = 0;
        copy = len;
        waitchan = NULL;

        ALQ_LOCK(alq);

        /*
         * Fail to perform the write and return EWOULDBLOCK if:
         * - The message is larger than our underlying buffer.
         * - The ALQ is being shutdown.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the user can't wait for space.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the alq is inactive due to prior
         *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
         */
        if (len > alq->aq_buflen ||
            alq->aq_flags & AQ_SHUTDOWN ||
            (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
            HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) {
                ALQ_UNLOCK(alq);
                return (EWOULDBLOCK);
        }
        /*
         * If we want ordered writes and there is already at least one thread
         * waiting for resources to become available, sleep until we're woken.
         */
        if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_waiters++;
                msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0);
                alq->aq_waiters--;
        }

        /*
         * (ALQ_WAITOK && aq_freebytes < len) or aq_freebytes >= len, either
         * enter while loop and sleep until we have enough free bytes (former)
         * or skip (latter).  If AQ_ORDERED is set, only 1 thread at a time
         * will be in this loop.  Otherwise, multiple threads may be sleeping
         * here competing for ALQ resources.
         */
        while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_flags |= AQ_WANTED;
                alq->aq_waiters++;
                if (waitchan)
                        wakeup(waitchan);
                msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0);
                alq->aq_waiters--;

                /*
                 * If we're the first thread to wake after an AQ_WANTED wakeup
                 * but there isn't enough free space for us, we're going to
                 * loop and sleep again.  If there are other threads waiting
                 * in this loop, schedule a wakeup so that they can see if the
                 * space they require is available.
                 */
                if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
                    alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED))
                        waitchan = alq;
                else
                        waitchan = NULL;
        }

        /*
         * If there are waiters, we need to signal the waiting threads after
         * we complete our work.  The alq ptr is used as a wait channel for
         * threads requiring resources to be freed up.  In the AQ_ORDERED
         * case, threads are not allowed to concurrently compete for
         * resources in the above while loop, so we use a different wait
         * channel in this case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        /* Bail if we're shutting down. */
        if (alq->aq_flags & AQ_SHUTDOWN) {
                ret = EWOULDBLOCK;
                goto unlock;
        }

        /*
         * If we need to wrap the buffer to accommodate the write,
         * we'll need 2 calls to bcopy.
         */
        if ((alq->aq_buflen - alq->aq_writehead) < len)
                copy = alq->aq_buflen - alq->aq_writehead;

        /* Copy message (or part thereof if wrap required) to the buffer. */
        bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
        alq->aq_writehead += copy;

        if (alq->aq_writehead >= alq->aq_buflen) {
                KASSERT((alq->aq_writehead == alq->aq_buflen),
                    ("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)",
                    __func__, alq->aq_writehead, alq->aq_buflen));
                alq->aq_writehead = 0;
        }

        if (copy != len) {
                /*
                 * Wrap the buffer by copying the remainder of our message
                 * to the start of the buffer and resetting aq_writehead.
                 */
                bcopy(((uint8_t *)data) + copy, alq->aq_entbuf, len - copy);
                alq->aq_writehead = len - copy;
        }

        KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
            ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));

        alq->aq_freebytes -= len;

        if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
                alq->aq_flags |= AQ_ACTIVE;
                activate = 1;
        }

        KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

unlock:
        ALQ_UNLOCK(alq);

        if (activate) {
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
        }

        /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
        if (waitchan != NULL)
                wakeup_one(waitchan);

        return (ret);
}

int
alq_write(struct alq *alq, void *data, int flags)
{
        /* Should only be called in fixed length message (legacy) mode. */
        KASSERT((alq->aq_flags & AQ_LEGACY),
            ("%s: fixed length write on variable length queue", __func__));
        return (alq_writen(alq, data, alq->aq_entlen, flags));
}

/*
 * Retrieve a pointer for the ALQ to write directly into, avoiding bcopy.
 */
struct ale *
alq_getn(struct alq *alq, int len, int flags)
{
        int contigbytes;
        void *waitchan;

        KASSERT((len > 0 && len <= alq->aq_buflen),
            ("%s: len <= 0 || len > alq->aq_buflen", __func__));

        waitchan = NULL;

        ALQ_LOCK(alq);

        /*
         * Determine the number of free contiguous bytes.
         * We ensure elsewhere that if aq_writehead == aq_writetail because
         * the buffer is empty, they will both be set to 0 and therefore
         * aq_freebytes == aq_buflen and is fully contiguous.
         * If they are equal and the buffer is not empty, aq_freebytes will
         * be 0 indicating the buffer is full.
         */
        if (alq->aq_writehead <= alq->aq_writetail)
                contigbytes = alq->aq_freebytes;
        else {
                contigbytes = alq->aq_buflen - alq->aq_writehead;

                if (contigbytes < len) {
                        /*
                         * Insufficient space at end of buffer to handle a
                         * contiguous write.  Wrap early if there's space at
                         * the beginning.  This will leave a hole at the end
                         * of the buffer which we will have to skip over when
                         * flushing the buffer to disk.
                         */
                        if (alq->aq_writetail >= len || flags & ALQ_WAITOK) {
                                /* Keep track of # bytes left blank. */
                                alq->aq_wrapearly = contigbytes;
                                /* Do the wrap and adjust counters. */
                                contigbytes = alq->aq_freebytes =
                                    alq->aq_writetail;
                                alq->aq_writehead = 0;
                        }
                }
        }

        /*
         * Return a NULL ALE if:
         * - The message is larger than our underlying buffer.
         * - The ALQ is being shutdown.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the user can't wait for space.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the alq is inactive due to prior
         *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
         */
        if (len > alq->aq_buflen ||
            alq->aq_flags & AQ_SHUTDOWN ||
            (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
            HAS_PENDING_DATA(alq))) && contigbytes < len)) {
                ALQ_UNLOCK(alq);
                return (NULL);
        }

        /*
         * If we want ordered writes and there is already at least one thread
         * waiting for resources to become available, sleep until we're woken.
         */
        if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_waiters++;
                msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgnord", 0);
                alq->aq_waiters--;
        }

        /*
         * (ALQ_WAITOK && contigbytes < len) or contigbytes >= len, either
         * enter while loop and sleep until we have enough contiguous free
         * bytes (former) or skip (latter).  If AQ_ORDERED is set, only 1
         * thread at a time will be in this loop.  Otherwise, multiple
         * threads may be sleeping here competing for ALQ resources.
         */
        while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_flags |= AQ_WANTED;
                alq->aq_waiters++;
                if (waitchan)
                        wakeup(waitchan);
                msleep_spin(alq, &alq->aq_mtx, "alqgnres", 0);
                alq->aq_waiters--;

                if (alq->aq_writehead <= alq->aq_writetail)
                        contigbytes = alq->aq_freebytes;
                else
                        contigbytes = alq->aq_buflen - alq->aq_writehead;

                /*
                 * If we're the first thread to wake after an AQ_WANTED wakeup
                 * but there isn't enough free space for us, we're going to
                 * loop and sleep again.  If there are other threads waiting
                 * in this loop, schedule a wakeup so that they can see if the
                 * space they require is available.
                 */
                if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
                    contigbytes < len && !(alq->aq_flags & AQ_WANTED))
                        waitchan = alq;
                else
                        waitchan = NULL;
        }

        /*
         * If there are waiters, we need to signal the waiting threads after
         * we complete our work.  The alq ptr is used as a wait channel for
         * threads requiring resources to be freed up.  In the AQ_ORDERED
         * case, threads are not allowed to concurrently compete for
         * resources in the above while loop, so we use a different wait
         * channel in this case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        /* Bail if we're shutting down. */
        if (alq->aq_flags & AQ_SHUTDOWN) {
                ALQ_UNLOCK(alq);
                if (waitchan != NULL)
                        wakeup_one(waitchan);
                return (NULL);
        }

        /*
         * If we are here, we have a contiguous number of bytes >= len
         * available in our buffer starting at aq_writehead.
         */
        alq->aq_getpost.ae_data = alq->aq_entbuf + alq->aq_writehead;
        alq->aq_getpost.ae_bytesused = len;

        return (&alq->aq_getpost);
}

struct ale *
alq_get(struct alq *alq, int flags)
{
        /* Should only be called in fixed length message (legacy) mode. */
        KASSERT((alq->aq_flags & AQ_LEGACY),
            ("%s: fixed length get on variable length queue", __func__));
        return (alq_getn(alq, alq->aq_entlen, flags));
}
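/*
 * Sketch of the zero-copy get/post cycle using the functions above and
 * below (illustrative only; "aq", "src" and "len" are assumed by the
 * example).  Note that alq_getn() returns with the queue lock held, and
 * that alq_post() - the no-flags wrapper for alq_post_flags() declared in
 * sys/alq.h - releases it:
 *
 *      struct ale *ale;
 *
 *      ale = alq_getn(aq, len, ALQ_WAITOK);
 *      if (ale != NULL) {
 *              bcopy(src, ale->ae_data, len);
 *              alq_post(aq, ale);
 *      }
 */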
void
alq_post_flags(struct alq *alq, struct ale *ale, int flags)
{
        int activate;
        void *waitchan;

        activate = 0;

        if (ale->ae_bytesused > 0) {
                if (!(alq->aq_flags & AQ_ACTIVE) &&
                    !(flags & ALQ_NOACTIVATE)) {
                        alq->aq_flags |= AQ_ACTIVE;
                        activate = 1;
                }

                alq->aq_writehead += ale->ae_bytesused;
                alq->aq_freebytes -= ale->ae_bytesused;

                /* Wrap aq_writehead if we filled to the end of the buffer. */
                if (alq->aq_writehead == alq->aq_buflen)
                        alq->aq_writehead = 0;

                KASSERT((alq->aq_writehead >= 0 &&
                    alq->aq_writehead < alq->aq_buflen),
                    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen",
                    __func__));

                KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!",
                    __func__));
        }

        /*
         * If there are waiters, we need to signal the waiting threads after
         * we complete our work.  The alq ptr is used as a wait channel for
         * threads requiring resources to be freed up.  In the AQ_ORDERED
         * case, threads are not allowed to concurrently compete for
         * resources in the alq_getn() while loop, so we use a different
         * wait channel in this case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        ALQ_UNLOCK(alq);

        if (activate) {
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
        }

        /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
        if (waitchan != NULL)
                wakeup_one(waitchan);
}

void
alq_flush(struct alq *alq)
{
        int needwakeup = 0;

        ALD_LOCK();
        ALQ_LOCK(alq);

        /*
         * Pull the lever iff there is data to flush and we're
         * not already in the middle of a flush operation.
         */
        if (HAS_PENDING_DATA(alq) && !(alq->aq_flags & AQ_FLUSHING)) {
                if (alq->aq_flags & AQ_ACTIVE)
                        ald_deactivate(alq);

                ALD_UNLOCK();
                needwakeup = alq_doio(alq);
        } else
                ALD_UNLOCK();

        ALQ_UNLOCK(alq);

        if (needwakeup)
                wakeup_one(alq);
}

/*
 * Flush remaining data, close the file and free all resources.
 */
void
alq_close(struct alq *alq)
{
        /* Only flush and destroy alq if not already shutting down. */
        if (ald_rem(alq) == 0)
                alq_destroy(alq);
}

static int
alq_load_handler(module_t mod, int what, void *arg)
{
        int ret;

        ret = 0;

        switch (what) {
        case MOD_LOAD:
        case MOD_SHUTDOWN:
                break;

        case MOD_QUIESCE:
                ALD_LOCK();
                /* Only allow unload if there are no open queues. */
                if (LIST_FIRST(&ald_queues) == NULL) {
                        ald_shutingdown = 1;
                        ALD_UNLOCK();
                        ald_shutdown(NULL, 0);
                        mtx_destroy(&ald_mtx);
                } else {
                        ALD_UNLOCK();
                        ret = EBUSY;
                }
                break;

        case MOD_UNLOAD:
                /* If MOD_QUIESCE failed we must fail here too. */
                if (ald_shutingdown == 0)
                        ret = EBUSY;
                break;

        default:
                ret = EINVAL;
                break;
        }

        return (ret);
}

static moduledata_t alq_mod = {
        "alq",
        alq_load_handler,
        NULL
};

DECLARE_MODULE(alq, alq_mod, SI_SUB_SMP, SI_ORDER_ANY);
MODULE_VERSION(alq, 1);