1 /*- 2 * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org> 3 * Copyright (c) 2008-2009, Lawrence Stewart <lstewart@freebsd.org> 4 * Copyright (c) 2009-2010, The FreeBSD Foundation 5 * All rights reserved. 6 * 7 * Portions of this software were developed at the Centre for Advanced 8 * Internet Architectures, Swinburne University of Technology, Melbourne, 9 * Australia by Lawrence Stewart under sponsorship from the FreeBSD Foundation. 10 * 11 * Redistribution and use in source and binary forms, with or without 12 * modification, are permitted provided that the following conditions 13 * are met: 14 * 1. Redistributions of source code must retain the above copyright 15 * notice unmodified, this list of conditions, and the following 16 * disclaimer. 17 * 2. Redistributions in binary form must reproduce the above copyright 18 * notice, this list of conditions and the following disclaimer in the 19 * documentation and/or other materials provided with the distribution. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 22 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 23 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 24 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 25 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 26 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 30 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 31 */ 32 33 #include <sys/cdefs.h> 34 __FBSDID("$FreeBSD$"); 35 36 #include "opt_mac.h" 37 38 #include <sys/param.h> 39 #include <sys/systm.h> 40 #include <sys/kernel.h> 41 #include <sys/kthread.h> 42 #include <sys/lock.h> 43 #include <sys/mount.h> 44 #include <sys/mutex.h> 45 #include <sys/namei.h> 46 #include <sys/proc.h> 47 #include <sys/vnode.h> 48 #include <sys/alq.h> 49 #include <sys/malloc.h> 50 #include <sys/unistd.h> 51 #include <sys/fcntl.h> 52 #include <sys/eventhandler.h> 53 54 #include <security/mac/mac_framework.h> 55 56 /* Async. Logging Queue */ 57 struct alq { 58 char *aq_entbuf; /* Buffer for stored entries */ 59 int aq_entmax; /* Max entries */ 60 int aq_entlen; /* Entry length */ 61 int aq_freebytes; /* Bytes available in buffer */ 62 int aq_buflen; /* Total length of our buffer */ 63 int aq_writehead; /* Location for next write */ 64 int aq_writetail; /* Flush starts at this location */ 65 int aq_wrapearly; /* # bytes left blank at end of buf */ 66 int aq_flags; /* Queue flags */ 67 int aq_waiters; /* Num threads waiting for resources 68 * NB: Used as a wait channel so must 69 * not be first field in the alq struct 70 */ 71 struct ale aq_getpost; /* ALE for use by get/post */ 72 struct mtx aq_mtx; /* Queue lock */ 73 struct vnode *aq_vp; /* Open vnode handle */ 74 struct ucred *aq_cred; /* Credentials of the opening thread */ 75 LIST_ENTRY(alq) aq_act; /* List of active queues */ 76 LIST_ENTRY(alq) aq_link; /* List of all queues */ 77 }; 78 79 #define AQ_WANTED 0x0001 /* Wakeup sleeper when io is done */ 80 #define AQ_ACTIVE 0x0002 /* on the active list */ 81 #define AQ_FLUSHING 0x0004 /* doing IO */ 82 #define AQ_SHUTDOWN 0x0008 /* Queue no longer valid */ 83 #define AQ_ORDERED 0x0010 /* Queue enforces ordered writes */ 84 #define AQ_LEGACY 0x0020 /* Legacy queue (fixed length writes) */ 85 86 #define ALQ_LOCK(alq) mtx_lock_spin(&(alq)->aq_mtx) 87 #define ALQ_UNLOCK(alq) mtx_unlock_spin(&(alq)->aq_mtx) 88 89 #define HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen) 90 91 static MALLOC_DEFINE(M_ALD, "ALD", "ALD"); 92 93 /* 94 * The ald_mtx protects the ald_queues list and the ald_active list. 95 */ 96 static struct mtx ald_mtx; 97 static LIST_HEAD(, alq) ald_queues; 98 static LIST_HEAD(, alq) ald_active; 99 static int ald_shutingdown = 0; 100 struct thread *ald_thread; 101 static struct proc *ald_proc; 102 static eventhandler_tag alq_eventhandler_tag = NULL; 103 104 #define ALD_LOCK() mtx_lock(&ald_mtx) 105 #define ALD_UNLOCK() mtx_unlock(&ald_mtx) 106 107 /* Daemon functions */ 108 static int ald_add(struct alq *); 109 static int ald_rem(struct alq *); 110 static void ald_startup(void *); 111 static void ald_daemon(void); 112 static void ald_shutdown(void *, int); 113 static void ald_activate(struct alq *); 114 static void ald_deactivate(struct alq *); 115 116 /* Internal queue functions */ 117 static void alq_shutdown(struct alq *); 118 static void alq_destroy(struct alq *); 119 static int alq_doio(struct alq *); 120 121 122 /* 123 * Add a new queue to the global list. Fail if we're shutting down. 124 */ 125 static int 126 ald_add(struct alq *alq) 127 { 128 int error; 129 130 error = 0; 131 132 ALD_LOCK(); 133 if (ald_shutingdown) { 134 error = EBUSY; 135 goto done; 136 } 137 LIST_INSERT_HEAD(&ald_queues, alq, aq_link); 138 done: 139 ALD_UNLOCK(); 140 return (error); 141 } 142 143 /* 144 * Remove a queue from the global list unless we're shutting down. If so, 145 * the ald will take care of cleaning up it's resources. 146 */ 147 static int 148 ald_rem(struct alq *alq) 149 { 150 int error; 151 152 error = 0; 153 154 ALD_LOCK(); 155 if (ald_shutingdown) { 156 error = EBUSY; 157 goto done; 158 } 159 LIST_REMOVE(alq, aq_link); 160 done: 161 ALD_UNLOCK(); 162 return (error); 163 } 164 165 /* 166 * Put a queue on the active list. This will schedule it for writing. 167 */ 168 static void 169 ald_activate(struct alq *alq) 170 { 171 LIST_INSERT_HEAD(&ald_active, alq, aq_act); 172 wakeup(&ald_active); 173 } 174 175 static void 176 ald_deactivate(struct alq *alq) 177 { 178 LIST_REMOVE(alq, aq_act); 179 alq->aq_flags &= ~AQ_ACTIVE; 180 } 181 182 static void 183 ald_startup(void *unused) 184 { 185 mtx_init(&ald_mtx, "ALDmtx", NULL, MTX_DEF|MTX_QUIET); 186 LIST_INIT(&ald_queues); 187 LIST_INIT(&ald_active); 188 } 189 190 static void 191 ald_daemon(void) 192 { 193 int needwakeup; 194 struct alq *alq; 195 196 ald_thread = FIRST_THREAD_IN_PROC(ald_proc); 197 198 alq_eventhandler_tag = EVENTHANDLER_REGISTER(shutdown_pre_sync, 199 ald_shutdown, NULL, SHUTDOWN_PRI_FIRST); 200 201 ALD_LOCK(); 202 203 for (;;) { 204 while ((alq = LIST_FIRST(&ald_active)) == NULL && 205 !ald_shutingdown) 206 mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0); 207 208 /* Don't shutdown until all active ALQs are flushed. */ 209 if (ald_shutingdown && alq == NULL) { 210 ALD_UNLOCK(); 211 break; 212 } 213 214 ALQ_LOCK(alq); 215 ald_deactivate(alq); 216 ALD_UNLOCK(); 217 needwakeup = alq_doio(alq); 218 ALQ_UNLOCK(alq); 219 if (needwakeup) 220 wakeup_one(alq); 221 ALD_LOCK(); 222 } 223 224 kproc_exit(0); 225 } 226 227 static void 228 ald_shutdown(void *arg, int howto) 229 { 230 struct alq *alq; 231 232 ALD_LOCK(); 233 234 /* Ensure no new queues can be created. */ 235 ald_shutingdown = 1; 236 237 /* Shutdown all ALQs prior to terminating the ald_daemon. */ 238 while ((alq = LIST_FIRST(&ald_queues)) != NULL) { 239 LIST_REMOVE(alq, aq_link); 240 ALD_UNLOCK(); 241 alq_shutdown(alq); 242 ALD_LOCK(); 243 } 244 245 /* At this point, all ALQs are flushed and shutdown. */ 246 247 /* 248 * Wake ald_daemon so that it exits. It won't be able to do 249 * anything until we mtx_sleep because we hold the ald_mtx. 250 */ 251 wakeup(&ald_active); 252 253 /* Wait for ald_daemon to exit. */ 254 mtx_sleep(ald_proc, &ald_mtx, PWAIT, "aldslp", 0); 255 256 ALD_UNLOCK(); 257 } 258 259 static void 260 alq_shutdown(struct alq *alq) 261 { 262 ALQ_LOCK(alq); 263 264 /* Stop any new writers. */ 265 alq->aq_flags |= AQ_SHUTDOWN; 266 267 /* 268 * If the ALQ isn't active but has unwritten data (possible if 269 * the ALQ_NOACTIVATE flag has been used), explicitly activate the 270 * ALQ here so that the pending data gets flushed by the ald_daemon. 271 */ 272 if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) { 273 alq->aq_flags |= AQ_ACTIVE; 274 ALQ_UNLOCK(alq); 275 ALD_LOCK(); 276 ald_activate(alq); 277 ALD_UNLOCK(); 278 ALQ_LOCK(alq); 279 } 280 281 /* Drain IO */ 282 while (alq->aq_flags & AQ_ACTIVE) { 283 alq->aq_flags |= AQ_WANTED; 284 msleep_spin(alq, &alq->aq_mtx, "aldclose", 0); 285 } 286 ALQ_UNLOCK(alq); 287 288 vn_close(alq->aq_vp, FWRITE, alq->aq_cred, 289 curthread); 290 crfree(alq->aq_cred); 291 } 292 293 void 294 alq_destroy(struct alq *alq) 295 { 296 /* Drain all pending IO. */ 297 alq_shutdown(alq); 298 299 mtx_destroy(&alq->aq_mtx); 300 free(alq->aq_entbuf, M_ALD); 301 free(alq, M_ALD); 302 } 303 304 /* 305 * Flush all pending data to disk. This operation will block. 306 */ 307 static int 308 alq_doio(struct alq *alq) 309 { 310 struct thread *td; 311 struct mount *mp; 312 struct vnode *vp; 313 struct uio auio; 314 struct iovec aiov[2]; 315 int totlen; 316 int iov; 317 int wrapearly; 318 319 KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__)); 320 321 vp = alq->aq_vp; 322 td = curthread; 323 totlen = 0; 324 iov = 1; 325 wrapearly = alq->aq_wrapearly; 326 327 bzero(&aiov, sizeof(aiov)); 328 bzero(&auio, sizeof(auio)); 329 330 /* Start the write from the location of our buffer tail pointer. */ 331 aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail; 332 333 if (alq->aq_writetail < alq->aq_writehead) { 334 /* Buffer not wrapped. */ 335 totlen = aiov[0].iov_len = alq->aq_writehead - alq->aq_writetail; 336 } else if (alq->aq_writehead == 0) { 337 /* Buffer not wrapped (special case to avoid an empty iov). */ 338 totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail - 339 wrapearly; 340 } else { 341 /* 342 * Buffer wrapped, requires 2 aiov entries: 343 * - first is from writetail to end of buffer 344 * - second is from start of buffer to writehead 345 */ 346 aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail - 347 wrapearly; 348 iov++; 349 aiov[1].iov_base = alq->aq_entbuf; 350 aiov[1].iov_len = alq->aq_writehead; 351 totlen = aiov[0].iov_len + aiov[1].iov_len; 352 } 353 354 alq->aq_flags |= AQ_FLUSHING; 355 ALQ_UNLOCK(alq); 356 357 auio.uio_iov = &aiov[0]; 358 auio.uio_offset = 0; 359 auio.uio_segflg = UIO_SYSSPACE; 360 auio.uio_rw = UIO_WRITE; 361 auio.uio_iovcnt = iov; 362 auio.uio_resid = totlen; 363 auio.uio_td = td; 364 365 /* 366 * Do all of the junk required to write now. 367 */ 368 vn_start_write(vp, &mp, V_WAIT); 369 vn_lock(vp, LK_EXCLUSIVE | LK_RETRY); 370 /* 371 * XXX: VOP_WRITE error checks are ignored. 372 */ 373 #ifdef MAC 374 if (mac_vnode_check_write(alq->aq_cred, NOCRED, vp) == 0) 375 #endif 376 VOP_WRITE(vp, &auio, IO_UNIT | IO_APPEND, alq->aq_cred); 377 VOP_UNLOCK(vp, 0); 378 vn_finished_write(mp); 379 380 ALQ_LOCK(alq); 381 alq->aq_flags &= ~AQ_FLUSHING; 382 383 /* Adjust writetail as required, taking into account wrapping. */ 384 alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) % 385 alq->aq_buflen; 386 alq->aq_freebytes += totlen + wrapearly; 387 388 /* 389 * If we just flushed part of the buffer which wrapped, reset the 390 * wrapearly indicator. 391 */ 392 if (wrapearly) 393 alq->aq_wrapearly = 0; 394 395 /* 396 * If we just flushed the buffer completely, reset indexes to 0 to 397 * minimise buffer wraps. 398 * This is also required to ensure alq_getn() can't wedge itself. 399 */ 400 if (!HAS_PENDING_DATA(alq)) 401 alq->aq_writehead = alq->aq_writetail = 0; 402 403 KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen), 404 ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__)); 405 406 if (alq->aq_flags & AQ_WANTED) { 407 alq->aq_flags &= ~AQ_WANTED; 408 return (1); 409 } 410 411 return(0); 412 } 413 414 static struct kproc_desc ald_kp = { 415 "ALQ Daemon", 416 ald_daemon, 417 &ald_proc 418 }; 419 420 SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, SI_ORDER_ANY, kproc_start, &ald_kp); 421 SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL); 422 423 424 /* User visible queue functions */ 425 426 /* 427 * Create the queue data structure, allocate the buffer, and open the file. 428 */ 429 430 int 431 alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode, 432 int size, int flags) 433 { 434 struct thread *td; 435 struct nameidata nd; 436 struct alq *alq; 437 int oflags; 438 int error; 439 440 KASSERT((size > 0), ("%s: size <= 0", __func__)); 441 442 *alqp = NULL; 443 td = curthread; 444 445 NDINIT(&nd, LOOKUP, NOFOLLOW, UIO_SYSSPACE, file, td); 446 oflags = FWRITE | O_NOFOLLOW | O_CREAT; 447 448 error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL); 449 if (error) 450 return (error); 451 452 NDFREE(&nd, NDF_ONLY_PNBUF); 453 /* We just unlock so we hold a reference */ 454 VOP_UNLOCK(nd.ni_vp, 0); 455 456 alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO); 457 alq->aq_vp = nd.ni_vp; 458 alq->aq_cred = crhold(cred); 459 460 mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET); 461 462 alq->aq_buflen = size; 463 alq->aq_entmax = 0; 464 alq->aq_entlen = 0; 465 466 alq->aq_freebytes = alq->aq_buflen; 467 alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO); 468 alq->aq_writehead = alq->aq_writetail = 0; 469 if (flags & ALQ_ORDERED) 470 alq->aq_flags |= AQ_ORDERED; 471 472 if ((error = ald_add(alq)) != 0) { 473 alq_destroy(alq); 474 return (error); 475 } 476 477 *alqp = alq; 478 479 return (0); 480 } 481 482 int 483 alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode, 484 int size, int count) 485 { 486 int ret; 487 488 KASSERT((count >= 0), ("%s: count < 0", __func__)); 489 490 if (count > 0) { 491 if ((ret = alq_open_flags(alqp, file, cred, cmode, 492 size*count, 0)) == 0) { 493 (*alqp)->aq_flags |= AQ_LEGACY; 494 (*alqp)->aq_entmax = count; 495 (*alqp)->aq_entlen = size; 496 } 497 } else 498 ret = alq_open_flags(alqp, file, cred, cmode, size, 0); 499 500 return (ret); 501 } 502 503 504 /* 505 * Copy a new entry into the queue. If the operation would block either 506 * wait or return an error depending on the value of waitok. 507 */ 508 int 509 alq_writen(struct alq *alq, void *data, int len, int flags) 510 { 511 int activate, copy, ret; 512 void *waitchan; 513 514 KASSERT((len > 0 && len <= alq->aq_buflen), 515 ("%s: len <= 0 || len > aq_buflen", __func__)); 516 517 activate = ret = 0; 518 copy = len; 519 waitchan = NULL; 520 521 ALQ_LOCK(alq); 522 523 /* 524 * Fail to perform the write and return EWOULDBLOCK if: 525 * - The message is larger than our underlying buffer. 526 * - The ALQ is being shutdown. 527 * - There is insufficient free space in our underlying buffer 528 * to accept the message and the user can't wait for space. 529 * - There is insufficient free space in our underlying buffer 530 * to accept the message and the alq is inactive due to prior 531 * use of the ALQ_NOACTIVATE flag (which would lead to deadlock). 532 */ 533 if (len > alq->aq_buflen || 534 alq->aq_flags & AQ_SHUTDOWN || 535 (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) && 536 HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) { 537 ALQ_UNLOCK(alq); 538 return (EWOULDBLOCK); 539 } 540 541 /* 542 * If we want ordered writes and there is already at least one thread 543 * waiting for resources to become available, sleep until we're woken. 544 */ 545 if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) { 546 KASSERT(!(flags & ALQ_NOWAIT), 547 ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__)); 548 alq->aq_waiters++; 549 msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0); 550 alq->aq_waiters--; 551 } 552 553 /* 554 * (ALQ_WAITOK && aq_freebytes < len) or aq_freebytes >= len, either 555 * enter while loop and sleep until we have enough free bytes (former) 556 * or skip (latter). If AQ_ORDERED is set, only 1 thread at a time will 557 * be in this loop. Otherwise, multiple threads may be sleeping here 558 * competing for ALQ resources. 559 */ 560 while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) { 561 KASSERT(!(flags & ALQ_NOWAIT), 562 ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__)); 563 alq->aq_flags |= AQ_WANTED; 564 alq->aq_waiters++; 565 if (waitchan) 566 wakeup(waitchan); 567 msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0); 568 alq->aq_waiters--; 569 570 /* 571 * If we're the first thread to wake after an AQ_WANTED wakeup 572 * but there isn't enough free space for us, we're going to loop 573 * and sleep again. If there are other threads waiting in this 574 * loop, schedule a wakeup so that they can see if the space 575 * they require is available. 576 */ 577 if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) && 578 alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED)) 579 waitchan = alq; 580 else 581 waitchan = NULL; 582 } 583 584 /* 585 * If there are waiters, we need to signal the waiting threads after we 586 * complete our work. The alq ptr is used as a wait channel for threads 587 * requiring resources to be freed up. In the AQ_ORDERED case, threads 588 * are not allowed to concurrently compete for resources in the above 589 * while loop, so we use a different wait channel in this case. 590 */ 591 if (alq->aq_waiters > 0) { 592 if (alq->aq_flags & AQ_ORDERED) 593 waitchan = &alq->aq_waiters; 594 else 595 waitchan = alq; 596 } else 597 waitchan = NULL; 598 599 /* Bail if we're shutting down. */ 600 if (alq->aq_flags & AQ_SHUTDOWN) { 601 ret = EWOULDBLOCK; 602 goto unlock; 603 } 604 605 /* 606 * If we need to wrap the buffer to accommodate the write, 607 * we'll need 2 calls to bcopy. 608 */ 609 if ((alq->aq_buflen - alq->aq_writehead) < len) 610 copy = alq->aq_buflen - alq->aq_writehead; 611 612 /* Copy message (or part thereof if wrap required) to the buffer. */ 613 bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy); 614 alq->aq_writehead += copy; 615 616 if (alq->aq_writehead >= alq->aq_buflen) { 617 KASSERT((alq->aq_writehead == alq->aq_buflen), 618 ("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)", 619 __func__, 620 alq->aq_writehead, 621 alq->aq_buflen)); 622 alq->aq_writehead = 0; 623 } 624 625 if (copy != len) { 626 /* 627 * Wrap the buffer by copying the remainder of our message 628 * to the start of the buffer and resetting aq_writehead. 629 */ 630 bcopy(((uint8_t *)data)+copy, alq->aq_entbuf, len - copy); 631 alq->aq_writehead = len - copy; 632 } 633 634 KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen), 635 ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__)); 636 637 alq->aq_freebytes -= len; 638 639 if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) { 640 alq->aq_flags |= AQ_ACTIVE; 641 activate = 1; 642 } 643 644 KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__)); 645 646 unlock: 647 ALQ_UNLOCK(alq); 648 649 if (activate) { 650 ALD_LOCK(); 651 ald_activate(alq); 652 ALD_UNLOCK(); 653 } 654 655 /* NB: We rely on wakeup_one waking threads in a FIFO manner. */ 656 if (waitchan != NULL) 657 wakeup_one(waitchan); 658 659 return (ret); 660 } 661 662 int 663 alq_write(struct alq *alq, void *data, int flags) 664 { 665 /* Should only be called in fixed length message (legacy) mode. */ 666 KASSERT((alq->aq_flags & AQ_LEGACY), 667 ("%s: fixed length write on variable length queue", __func__)); 668 return (alq_writen(alq, data, alq->aq_entlen, flags)); 669 } 670 671 /* 672 * Retrieve a pointer for the ALQ to write directly into, avoiding bcopy. 673 */ 674 struct ale * 675 alq_getn(struct alq *alq, int len, int flags) 676 { 677 int contigbytes; 678 void *waitchan; 679 680 KASSERT((len > 0 && len <= alq->aq_buflen), 681 ("%s: len <= 0 || len > alq->aq_buflen", __func__)); 682 683 waitchan = NULL; 684 685 ALQ_LOCK(alq); 686 687 /* 688 * Determine the number of free contiguous bytes. 689 * We ensure elsewhere that if aq_writehead == aq_writetail because 690 * the buffer is empty, they will both be set to 0 and therefore 691 * aq_freebytes == aq_buflen and is fully contiguous. 692 * If they are equal and the buffer is not empty, aq_freebytes will 693 * be 0 indicating the buffer is full. 694 */ 695 if (alq->aq_writehead <= alq->aq_writetail) 696 contigbytes = alq->aq_freebytes; 697 else { 698 contigbytes = alq->aq_buflen - alq->aq_writehead; 699 700 if (contigbytes < len) { 701 /* 702 * Insufficient space at end of buffer to handle a 703 * contiguous write. Wrap early if there's space at 704 * the beginning. This will leave a hole at the end 705 * of the buffer which we will have to skip over when 706 * flushing the buffer to disk. 707 */ 708 if (alq->aq_writetail >= len || flags & ALQ_WAITOK) { 709 /* Keep track of # bytes left blank. */ 710 alq->aq_wrapearly = contigbytes; 711 /* Do the wrap and adjust counters. */ 712 contigbytes = alq->aq_freebytes = 713 alq->aq_writetail; 714 alq->aq_writehead = 0; 715 } 716 } 717 } 718 719 /* 720 * Return a NULL ALE if: 721 * - The message is larger than our underlying buffer. 722 * - The ALQ is being shutdown. 723 * - There is insufficient free space in our underlying buffer 724 * to accept the message and the user can't wait for space. 725 * - There is insufficient free space in our underlying buffer 726 * to accept the message and the alq is inactive due to prior 727 * use of the ALQ_NOACTIVATE flag (which would lead to deadlock). 728 */ 729 if (len > alq->aq_buflen || 730 alq->aq_flags & AQ_SHUTDOWN || 731 (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) && 732 HAS_PENDING_DATA(alq))) && contigbytes < len)) { 733 ALQ_UNLOCK(alq); 734 return (NULL); 735 } 736 737 /* 738 * If we want ordered writes and there is already at least one thread 739 * waiting for resources to become available, sleep until we're woken. 740 */ 741 if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) { 742 KASSERT(!(flags & ALQ_NOWAIT), 743 ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__)); 744 alq->aq_waiters++; 745 msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgnord", 0); 746 alq->aq_waiters--; 747 } 748 749 /* 750 * (ALQ_WAITOK && contigbytes < len) or contigbytes >= len, either enter 751 * while loop and sleep until we have enough contiguous free bytes 752 * (former) or skip (latter). If AQ_ORDERED is set, only 1 thread at a 753 * time will be in this loop. Otherwise, multiple threads may be 754 * sleeping here competing for ALQ resources. 755 */ 756 while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) { 757 KASSERT(!(flags & ALQ_NOWAIT), 758 ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__)); 759 alq->aq_flags |= AQ_WANTED; 760 alq->aq_waiters++; 761 if (waitchan) 762 wakeup(waitchan); 763 msleep_spin(alq, &alq->aq_mtx, "alqgnres", 0); 764 alq->aq_waiters--; 765 766 if (alq->aq_writehead <= alq->aq_writetail) 767 contigbytes = alq->aq_freebytes; 768 else 769 contigbytes = alq->aq_buflen - alq->aq_writehead; 770 771 /* 772 * If we're the first thread to wake after an AQ_WANTED wakeup 773 * but there isn't enough free space for us, we're going to loop 774 * and sleep again. If there are other threads waiting in this 775 * loop, schedule a wakeup so that they can see if the space 776 * they require is available. 777 */ 778 if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) && 779 contigbytes < len && !(alq->aq_flags & AQ_WANTED)) 780 waitchan = alq; 781 else 782 waitchan = NULL; 783 } 784 785 /* 786 * If there are waiters, we need to signal the waiting threads after we 787 * complete our work. The alq ptr is used as a wait channel for threads 788 * requiring resources to be freed up. In the AQ_ORDERED case, threads 789 * are not allowed to concurrently compete for resources in the above 790 * while loop, so we use a different wait channel in this case. 791 */ 792 if (alq->aq_waiters > 0) { 793 if (alq->aq_flags & AQ_ORDERED) 794 waitchan = &alq->aq_waiters; 795 else 796 waitchan = alq; 797 } else 798 waitchan = NULL; 799 800 /* Bail if we're shutting down. */ 801 if (alq->aq_flags & AQ_SHUTDOWN) { 802 ALQ_UNLOCK(alq); 803 if (waitchan != NULL) 804 wakeup_one(waitchan); 805 return (NULL); 806 } 807 808 /* 809 * If we are here, we have a contiguous number of bytes >= len 810 * available in our buffer starting at aq_writehead. 811 */ 812 alq->aq_getpost.ae_data = alq->aq_entbuf + alq->aq_writehead; 813 alq->aq_getpost.ae_bytesused = len; 814 815 return (&alq->aq_getpost); 816 } 817 818 struct ale * 819 alq_get(struct alq *alq, int flags) 820 { 821 /* Should only be called in fixed length message (legacy) mode. */ 822 KASSERT((alq->aq_flags & AQ_LEGACY), 823 ("%s: fixed length get on variable length queue", __func__)); 824 return (alq_getn(alq, alq->aq_entlen, flags)); 825 } 826 827 void 828 alq_post_flags(struct alq *alq, struct ale *ale, int flags) 829 { 830 int activate; 831 void *waitchan; 832 833 activate = 0; 834 835 if (ale->ae_bytesused > 0) { 836 if (!(alq->aq_flags & AQ_ACTIVE) && 837 !(flags & ALQ_NOACTIVATE)) { 838 alq->aq_flags |= AQ_ACTIVE; 839 activate = 1; 840 } 841 842 alq->aq_writehead += ale->ae_bytesused; 843 alq->aq_freebytes -= ale->ae_bytesused; 844 845 /* Wrap aq_writehead if we filled to the end of the buffer. */ 846 if (alq->aq_writehead == alq->aq_buflen) 847 alq->aq_writehead = 0; 848 849 KASSERT((alq->aq_writehead >= 0 && 850 alq->aq_writehead < alq->aq_buflen), 851 ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", 852 __func__)); 853 854 KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__)); 855 } 856 857 /* 858 * If there are waiters, we need to signal the waiting threads after we 859 * complete our work. The alq ptr is used as a wait channel for threads 860 * requiring resources to be freed up. In the AQ_ORDERED case, threads 861 * are not allowed to concurrently compete for resources in the 862 * alq_getn() while loop, so we use a different wait channel in this case. 863 */ 864 if (alq->aq_waiters > 0) { 865 if (alq->aq_flags & AQ_ORDERED) 866 waitchan = &alq->aq_waiters; 867 else 868 waitchan = alq; 869 } else 870 waitchan = NULL; 871 872 ALQ_UNLOCK(alq); 873 874 if (activate) { 875 ALD_LOCK(); 876 ald_activate(alq); 877 ALD_UNLOCK(); 878 } 879 880 /* NB: We rely on wakeup_one waking threads in a FIFO manner. */ 881 if (waitchan != NULL) 882 wakeup_one(waitchan); 883 } 884 885 void 886 alq_flush(struct alq *alq) 887 { 888 int needwakeup = 0; 889 890 ALD_LOCK(); 891 ALQ_LOCK(alq); 892 893 /* 894 * Pull the lever iff there is data to flush and we're 895 * not already in the middle of a flush operation. 896 */ 897 if (HAS_PENDING_DATA(alq) && !(alq->aq_flags & AQ_FLUSHING)) { 898 if (alq->aq_flags & AQ_ACTIVE) 899 ald_deactivate(alq); 900 901 ALD_UNLOCK(); 902 needwakeup = alq_doio(alq); 903 } else 904 ALD_UNLOCK(); 905 906 ALQ_UNLOCK(alq); 907 908 if (needwakeup) 909 wakeup_one(alq); 910 } 911 912 /* 913 * Flush remaining data, close the file and free all resources. 914 */ 915 void 916 alq_close(struct alq *alq) 917 { 918 /* Only flush and destroy alq if not already shutting down. */ 919 if (ald_rem(alq) == 0) 920 alq_destroy(alq); 921 } 922 923 static int 924 alq_load_handler(module_t mod, int what, void *arg) 925 { 926 int ret; 927 928 ret = 0; 929 930 switch (what) { 931 case MOD_LOAD: 932 case MOD_SHUTDOWN: 933 break; 934 935 case MOD_QUIESCE: 936 ALD_LOCK(); 937 /* Only allow unload if there are no open queues. */ 938 if (LIST_FIRST(&ald_queues) == NULL) { 939 ald_shutingdown = 1; 940 ALD_UNLOCK(); 941 EVENTHANDLER_DEREGISTER(shutdown_pre_sync, 942 alq_eventhandler_tag); 943 ald_shutdown(NULL, 0); 944 mtx_destroy(&ald_mtx); 945 } else { 946 ALD_UNLOCK(); 947 ret = EBUSY; 948 } 949 break; 950 951 case MOD_UNLOAD: 952 /* If MOD_QUIESCE failed we must fail here too. */ 953 if (ald_shutingdown == 0) 954 ret = EBUSY; 955 break; 956 957 default: 958 ret = EINVAL; 959 break; 960 } 961 962 return (ret); 963 } 964 965 static moduledata_t alq_mod = 966 { 967 "alq", 968 alq_load_handler, 969 NULL 970 }; 971 972 DECLARE_MODULE(alq, alq_mod, SI_SUB_LAST, SI_ORDER_ANY); 973 MODULE_VERSION(alq, 1); 974