/*-
 * SPDX-License-Identifier: BSD-2-Clause
 *
 * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org>
 * Copyright (c) 2008-2009, Lawrence Stewart <lstewart@freebsd.org>
 * Copyright (c) 2009-2010, The FreeBSD Foundation
 * All rights reserved.
 *
 * Portions of this software were developed at the Centre for Advanced
 * Internet Architectures, Swinburne University of Technology, Melbourne,
 * Australia by Lawrence Stewart under sponsorship from the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice unmodified, this list of conditions, and the following
 *    disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/cdefs.h>
#include "opt_mac.h"

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/mount.h>
#include <sys/mutex.h>
#include <sys/namei.h>
#include <sys/proc.h>
#include <sys/vnode.h>
#include <sys/alq.h>
#include <sys/malloc.h>
#include <sys/unistd.h>
#include <sys/fcntl.h>
#include <sys/eventhandler.h>

#include <security/mac/mac_framework.h>

/* Async. Logging Queue */
struct alq {
        char    *aq_entbuf;             /* Buffer for stored entries */
        int     aq_entmax;              /* Max entries */
        int     aq_entlen;              /* Entry length */
        int     aq_freebytes;           /* Bytes available in buffer */
        int     aq_buflen;              /* Total length of our buffer */
        int     aq_writehead;           /* Location for next write */
        int     aq_writetail;           /* Flush starts at this location */
        int     aq_wrapearly;           /* # bytes left blank at end of buf */
        int     aq_flags;               /* Queue flags */
        int     aq_waiters;             /* Num threads waiting for resources
                                         * NB: Used as a wait channel so must
                                         * not be first field in the alq struct
                                         */
        struct  ale     aq_getpost;     /* ALE for use by get/post */
        struct mtx      aq_mtx;         /* Queue lock */
        struct vnode    *aq_vp;         /* Open vnode handle */
        struct ucred    *aq_cred;       /* Credentials of the opening thread */
        LIST_ENTRY(alq) aq_act;         /* List of active queues */
        LIST_ENTRY(alq) aq_link;        /* List of all queues */
};

#define AQ_WANTED       0x0001          /* Wakeup sleeper when io is done */
#define AQ_ACTIVE       0x0002          /* on the active list */
#define AQ_FLUSHING     0x0004          /* doing IO */
#define AQ_SHUTDOWN     0x0008          /* Queue no longer valid */
#define AQ_ORDERED      0x0010          /* Queue enforces ordered writes */
#define AQ_LEGACY       0x0020          /* Legacy queue (fixed length writes) */

#define ALQ_LOCK(alq)   mtx_lock_spin(&(alq)->aq_mtx)
#define ALQ_UNLOCK(alq) mtx_unlock_spin(&(alq)->aq_mtx)

#define HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)

static MALLOC_DEFINE(M_ALD, "ALD", "ALD");

/*
 * The ald_mtx protects the ald_queues list and the ald_active list.
 */
static struct mtx ald_mtx;
static LIST_HEAD(, alq) ald_queues;
static LIST_HEAD(, alq) ald_active;
static int ald_shutingdown = 0;
struct thread *ald_thread;
static struct proc *ald_proc;
static eventhandler_tag alq_eventhandler_tag = NULL;

#define ALD_LOCK()      mtx_lock(&ald_mtx)
#define ALD_UNLOCK()    mtx_unlock(&ald_mtx)

/* Daemon functions */
static int ald_add(struct alq *);
static int ald_rem(struct alq *);
static void ald_startup(void *);
static void ald_daemon(void);
static void ald_shutdown(void *, int);
static void ald_activate(struct alq *);
static void ald_deactivate(struct alq *);

/* Internal queue functions */
static void alq_shutdown(struct alq *);
static void alq_destroy(struct alq *);
static int alq_doio(struct alq *);

/*
 * Add a new queue to the global list.  Fail if we're shutting down.
 */
static int
ald_add(struct alq *alq)
{
        int error;

        error = 0;

        ALD_LOCK();
        if (ald_shutingdown) {
                error = EBUSY;
                goto done;
        }
        LIST_INSERT_HEAD(&ald_queues, alq, aq_link);
done:
        ALD_UNLOCK();
        return (error);
}

/*
 * Remove a queue from the global list unless we're shutting down.  If so,
 * the ald will take care of cleaning up its resources.
 */
static int
ald_rem(struct alq *alq)
{
        int error;

        error = 0;

        ALD_LOCK();
        if (ald_shutingdown) {
                error = EBUSY;
                goto done;
        }
        LIST_REMOVE(alq, aq_link);
done:
        ALD_UNLOCK();
        return (error);
}
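/*
 * Locking note (a summary of the code below, not a normative rule): the
 * global ald_mtx is a default (sleep) mutex while each queue's aq_mtx is a
 * spin mutex, so ald_mtx is always acquired before aq_mtx (see ald_daemon()
 * and alq_flush()).  Paths that hold aq_mtx and need to activate a queue
 * drop aq_mtx before calling ald_activate() (see alq_shutdown() and
 * alq_writen()).
 */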
/*
 * Put a queue on the active list.  This will schedule it for writing.
 */
static void
ald_activate(struct alq *alq)
{
        LIST_INSERT_HEAD(&ald_active, alq, aq_act);
        wakeup(&ald_active);
}

static void
ald_deactivate(struct alq *alq)
{
        LIST_REMOVE(alq, aq_act);
        alq->aq_flags &= ~AQ_ACTIVE;
}

static void
ald_startup(void *unused)
{
        mtx_init(&ald_mtx, "ALDmtx", NULL, MTX_DEF|MTX_QUIET);
        LIST_INIT(&ald_queues);
        LIST_INIT(&ald_active);
}

static void
ald_daemon(void)
{
        int needwakeup;
        struct alq *alq;

        ald_thread = FIRST_THREAD_IN_PROC(ald_proc);

        alq_eventhandler_tag = EVENTHANDLER_REGISTER(shutdown_pre_sync,
            ald_shutdown, NULL, SHUTDOWN_PRI_FIRST);

        ALD_LOCK();

        for (;;) {
                while ((alq = LIST_FIRST(&ald_active)) == NULL &&
                    !ald_shutingdown)
                        mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);

                /* Don't shutdown until all active ALQs are flushed. */
                if (ald_shutingdown && alq == NULL) {
                        ALD_UNLOCK();
                        break;
                }

                ALQ_LOCK(alq);
                ald_deactivate(alq);
                ALD_UNLOCK();
                needwakeup = alq_doio(alq);
                ALQ_UNLOCK(alq);
                if (needwakeup)
                        wakeup_one(alq);
                ALD_LOCK();
        }

        kproc_exit(0);
}

static void
ald_shutdown(void *arg, int howto)
{
        struct alq *alq;

        ALD_LOCK();

        /* Ensure no new queues can be created. */
        ald_shutingdown = 1;

        /* Shutdown all ALQs prior to terminating the ald_daemon. */
        while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
                LIST_REMOVE(alq, aq_link);
                ALD_UNLOCK();
                alq_shutdown(alq);
                ALD_LOCK();
        }

        /* At this point, all ALQs are flushed and shutdown. */

        /*
         * Wake ald_daemon so that it exits.  It won't be able to do
         * anything until we mtx_sleep because we hold the ald_mtx.
         */
        wakeup(&ald_active);

        /* Wait for ald_daemon to exit. */
        mtx_sleep(ald_proc, &ald_mtx, PWAIT, "aldslp", 0);

        ALD_UNLOCK();
}

static void
alq_shutdown(struct alq *alq)
{
        ALQ_LOCK(alq);

        /* Stop any new writers. */
        alq->aq_flags |= AQ_SHUTDOWN;

        /*
         * If the ALQ isn't active but has unwritten data (possible if
         * the ALQ_NOACTIVATE flag has been used), explicitly activate the
         * ALQ here so that the pending data gets flushed by the ald_daemon.
         */
        if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) {
                alq->aq_flags |= AQ_ACTIVE;
                ALQ_UNLOCK(alq);
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
                ALQ_LOCK(alq);
        }

        /* Drain IO */
        while (alq->aq_flags & AQ_ACTIVE) {
                alq->aq_flags |= AQ_WANTED;
                msleep_spin(alq, &alq->aq_mtx, "aldclose", 0);
        }
        ALQ_UNLOCK(alq);

        vn_close(alq->aq_vp, FWRITE, alq->aq_cred,
            curthread);
        crfree(alq->aq_cred);
}

void
alq_destroy(struct alq *alq)
{
        /* Drain all pending IO. */
        alq_shutdown(alq);

        mtx_destroy(&alq->aq_mtx);
        free(alq->aq_entbuf, M_ALD);
        free(alq, M_ALD);
}
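/*
 * The flush below walks the circular buffer from aq_writetail towards
 * aq_writehead.  For example (illustrative only), with a 16 byte buffer,
 * aq_writetail = 11, aq_writehead = 3 and aq_wrapearly = 2, the pending
 * data occupies bytes 11-13 and 0-2, so alq_doio() issues a single write
 * with two iovecs of 3 bytes each and skips the 2 byte hole left at the
 * end of the buffer by an early wrap in alq_getn().
 */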
/*
 * Flush all pending data to disk.  This operation will block.
 */
static int
alq_doio(struct alq *alq)
{
        struct thread *td;
        struct mount *mp;
        struct vnode *vp;
        struct uio auio;
        struct iovec aiov[2];
        int totlen;
        int iov;
        int wrapearly;

        KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

        vp = alq->aq_vp;
        td = curthread;
        totlen = 0;
        iov = 1;
        wrapearly = alq->aq_wrapearly;

        bzero(&aiov, sizeof(aiov));
        bzero(&auio, sizeof(auio));

        /* Start the write from the location of our buffer tail pointer. */
        aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail;

        if (alq->aq_writetail < alq->aq_writehead) {
                /* Buffer not wrapped. */
                totlen = aiov[0].iov_len = alq->aq_writehead - alq->aq_writetail;
        } else if (alq->aq_writehead == 0) {
                /* Buffer not wrapped (special case to avoid an empty iov). */
                totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
                    wrapearly;
        } else {
                /*
                 * Buffer wrapped, requires 2 aiov entries:
                 * - first is from writetail to end of buffer
                 * - second is from start of buffer to writehead
                 */
                aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
                    wrapearly;
                iov++;
                aiov[1].iov_base = alq->aq_entbuf;
                aiov[1].iov_len = alq->aq_writehead;
                totlen = aiov[0].iov_len + aiov[1].iov_len;
        }

        alq->aq_flags |= AQ_FLUSHING;
        ALQ_UNLOCK(alq);

        auio.uio_iov = &aiov[0];
        auio.uio_offset = 0;
        auio.uio_segflg = UIO_SYSSPACE;
        auio.uio_rw = UIO_WRITE;
        auio.uio_iovcnt = iov;
        auio.uio_resid = totlen;
        auio.uio_td = td;

        /*
         * Do all of the junk required to write now.
         */
        vn_start_write(vp, &mp, V_WAIT);
        vn_lock(vp, LK_EXCLUSIVE | LK_RETRY);
        /*
         * XXX: VOP_WRITE error checks are ignored.
         */
#ifdef MAC
        if (mac_vnode_check_write(alq->aq_cred, NOCRED, vp) == 0)
#endif
                VOP_WRITE(vp, &auio, IO_UNIT | IO_APPEND, alq->aq_cred);
        VOP_UNLOCK(vp);
        vn_finished_write(mp);

        ALQ_LOCK(alq);
        alq->aq_flags &= ~AQ_FLUSHING;

        /* Adjust writetail as required, taking into account wrapping. */
        alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) %
            alq->aq_buflen;
        alq->aq_freebytes += totlen + wrapearly;

        /*
         * If we just flushed part of the buffer which wrapped, reset the
         * wrapearly indicator.
         */
        if (wrapearly)
                alq->aq_wrapearly = 0;

        /*
         * If we just flushed the buffer completely, reset indexes to 0 to
         * minimise buffer wraps.
         * This is also required to ensure alq_getn() can't wedge itself.
         */
        if (!HAS_PENDING_DATA(alq))
                alq->aq_writehead = alq->aq_writetail = 0;

        KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen),
            ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__));

        if (alq->aq_flags & AQ_WANTED) {
                alq->aq_flags &= ~AQ_WANTED;
                return (1);
        }

        return (0);
}

static struct kproc_desc ald_kp = {
        "ALQ Daemon",
        ald_daemon,
        &ald_proc
};

SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, SI_ORDER_ANY, kproc_start, &ald_kp);
SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL);

/* User visible queue functions */
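/*
 * A minimal usage sketch of the copying interface (the path, size, mode and
 * message below are illustrative only):
 *
 *      struct alq *alq;
 *      int error;
 *
 *      error = alq_open_flags(&alq, "/tmp/example.alq",
 *          curthread->td_ucred, 0600, 16 * 1024, 0);
 *      if (error == 0) {
 *              error = alq_writen(alq, "hello\n", 6, ALQ_WAITOK);
 *              alq_flush(alq);
 *              alq_close(alq);
 *      }
 */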
/*
 * Create the queue data structure, allocate the buffer, and open the file.
 */
int
alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int flags)
{
        struct nameidata nd;
        struct alq *alq;
        int oflags;
        int error;

        KASSERT((size > 0), ("%s: size <= 0", __func__));

        *alqp = NULL;

        NDINIT(&nd, LOOKUP, NOFOLLOW, UIO_SYSSPACE, file);
        oflags = FWRITE | O_NOFOLLOW | O_CREAT;

        error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL);
        if (error)
                return (error);

        NDFREE_PNBUF(&nd);
        /* We just unlock so we hold a reference */
        VOP_UNLOCK(nd.ni_vp);

        alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
        alq->aq_vp = nd.ni_vp;
        alq->aq_cred = crhold(cred);

        mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);

        alq->aq_buflen = size;
        alq->aq_entmax = 0;
        alq->aq_entlen = 0;

        alq->aq_freebytes = alq->aq_buflen;
        alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
        alq->aq_writehead = alq->aq_writetail = 0;
        if (flags & ALQ_ORDERED)
                alq->aq_flags |= AQ_ORDERED;

        if ((error = ald_add(alq)) != 0) {
                alq_destroy(alq);
                return (error);
        }

        *alqp = alq;

        return (0);
}

int
alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int count)
{
        int ret;

        KASSERT((count >= 0), ("%s: count < 0", __func__));

        if (count > 0) {
                if ((ret = alq_open_flags(alqp, file, cred, cmode,
                    size*count, 0)) == 0) {
                        (*alqp)->aq_flags |= AQ_LEGACY;
                        (*alqp)->aq_entmax = count;
                        (*alqp)->aq_entlen = size;
                }
        } else
                ret = alq_open_flags(alqp, file, cred, cmode, size, 0);

        return (ret);
}

/*
 * Copy a new entry into the queue.  If the operation would block, either
 * wait or return an error depending on the value of flags.
 */
int
alq_writen(struct alq *alq, void *data, int len, int flags)
{
        int activate, copy, ret;
        void *waitchan;

        KASSERT((len > 0 && len <= alq->aq_buflen),
            ("%s: len <= 0 || len > aq_buflen", __func__));

        activate = ret = 0;
        copy = len;
        waitchan = NULL;

        ALQ_LOCK(alq);

        /*
         * Fail to perform the write and return EWOULDBLOCK if:
         * - The message is larger than our underlying buffer.
         * - The ALQ is being shutdown.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the user can't wait for space.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the alq is inactive due to prior
         *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
         */
        if (len > alq->aq_buflen ||
            alq->aq_flags & AQ_SHUTDOWN ||
            (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
            HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) {
                ALQ_UNLOCK(alq);
                return (EWOULDBLOCK);
        }

        /*
         * If we want ordered writes and there is already at least one thread
         * waiting for resources to become available, sleep until we're woken.
         */
        if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_waiters++;
                msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0);
                alq->aq_waiters--;
        }

        /*
         * (ALQ_WAITOK && aq_freebytes < len) or aq_freebytes >= len, either
         * enter while loop and sleep until we have enough free bytes (former)
         * or skip (latter).  If AQ_ORDERED is set, only 1 thread at a time
         * will be in this loop.  Otherwise, multiple threads may be sleeping
         * here competing for ALQ resources.
         */
        while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_flags |= AQ_WANTED;
                alq->aq_waiters++;
                if (waitchan)
                        wakeup(waitchan);
                msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0);
                alq->aq_waiters--;

                /*
                 * If we're the first thread to wake after an AQ_WANTED wakeup
                 * but there isn't enough free space for us, we're going to
                 * loop and sleep again.  If there are other threads waiting
                 * in this loop, schedule a wakeup so that they can see if the
                 * space they require is available.
                 */
                if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
                    alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED))
                        waitchan = alq;
                else
                        waitchan = NULL;
        }

        /*
         * If there are waiters, we need to signal the waiting threads after
         * we complete our work.  The alq ptr is used as a wait channel for
         * threads requiring resources to be freed up.  In the AQ_ORDERED
         * case, threads are not allowed to concurrently compete for resources
         * in the above while loop, so we use a different wait channel in this
         * case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        /* Bail if we're shutting down. */
        if (alq->aq_flags & AQ_SHUTDOWN) {
                ret = EWOULDBLOCK;
                goto unlock;
        }

        /*
         * If we need to wrap the buffer to accommodate the write,
         * we'll need 2 calls to bcopy.
         */
        if ((alq->aq_buflen - alq->aq_writehead) < len)
                copy = alq->aq_buflen - alq->aq_writehead;

        /* Copy message (or part thereof if wrap required) to the buffer. */
        bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
        alq->aq_writehead += copy;

        if (alq->aq_writehead >= alq->aq_buflen) {
                KASSERT((alq->aq_writehead == alq->aq_buflen),
                    ("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)",
                    __func__,
                    alq->aq_writehead,
                    alq->aq_buflen));
                alq->aq_writehead = 0;
        }

        if (copy != len) {
                /*
                 * Wrap the buffer by copying the remainder of our message
                 * to the start of the buffer and resetting aq_writehead.
                 */
                bcopy(((uint8_t *)data)+copy, alq->aq_entbuf, len - copy);
                alq->aq_writehead = len - copy;
        }

        KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
            ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));

        alq->aq_freebytes -= len;

        if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
                alq->aq_flags |= AQ_ACTIVE;
                activate = 1;
        }

        KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

unlock:
        ALQ_UNLOCK(alq);

        if (activate) {
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
        }

        /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
        if (waitchan != NULL)
                wakeup_one(waitchan);

        return (ret);
}

int
alq_write(struct alq *alq, void *data, int flags)
{
        /* Should only be called in fixed length message (legacy) mode. */
        KASSERT((alq->aq_flags & AQ_LEGACY),
            ("%s: fixed length write on variable length queue", __func__));
        return (alq_writen(alq, data, alq->aq_entlen, flags));
}

/*
 * Retrieve a pointer for the ALQ to write directly into, avoiding bcopy.
 */
struct ale *
alq_getn(struct alq *alq, int len, int flags)
{
        int contigbytes;
        void *waitchan;

        KASSERT((len > 0 && len <= alq->aq_buflen),
            ("%s: len <= 0 || len > alq->aq_buflen", __func__));

        waitchan = NULL;

        ALQ_LOCK(alq);

        /*
         * Determine the number of free contiguous bytes.
         * We ensure elsewhere that if aq_writehead == aq_writetail because
         * the buffer is empty, they will both be set to 0 and therefore
         * aq_freebytes == aq_buflen and is fully contiguous.
         * If they are equal and the buffer is not empty, aq_freebytes will
         * be 0 indicating the buffer is full.
         */
        if (alq->aq_writehead <= alq->aq_writetail)
                contigbytes = alq->aq_freebytes;
        else {
                contigbytes = alq->aq_buflen - alq->aq_writehead;

                if (contigbytes < len) {
                        /*
                         * Insufficient space at end of buffer to handle a
                         * contiguous write.  Wrap early if there's space at
                         * the beginning.  This will leave a hole at the end
                         * of the buffer which we will have to skip over when
                         * flushing the buffer to disk.
                         */
                        if (alq->aq_writetail >= len || flags & ALQ_WAITOK) {
                                /* Keep track of # bytes left blank. */
                                alq->aq_wrapearly = contigbytes;
                                /* Do the wrap and adjust counters. */
                                contigbytes = alq->aq_freebytes =
                                    alq->aq_writetail;
                                alq->aq_writehead = 0;
                        }
                }
        }

        /*
         * Return a NULL ALE if:
         * - The message is larger than our underlying buffer.
         * - The ALQ is being shutdown.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the user can't wait for space.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the alq is inactive due to prior
         *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
         */
        if (len > alq->aq_buflen ||
            alq->aq_flags & AQ_SHUTDOWN ||
            (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
            HAS_PENDING_DATA(alq))) && contigbytes < len)) {
                ALQ_UNLOCK(alq);
                return (NULL);
        }

        /*
         * If we want ordered writes and there is already at least one thread
         * waiting for resources to become available, sleep until we're woken.
         */
        if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_waiters++;
                msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgnord", 0);
                alq->aq_waiters--;
        }

        /*
         * (ALQ_WAITOK && contigbytes < len) or contigbytes >= len, either
         * enter while loop and sleep until we have enough contiguous free
         * bytes (former) or skip (latter).  If AQ_ORDERED is set, only 1
         * thread at a time will be in this loop.  Otherwise, multiple threads
         * may be sleeping here competing for ALQ resources.
         */
        while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_flags |= AQ_WANTED;
                alq->aq_waiters++;
                if (waitchan)
                        wakeup(waitchan);
                msleep_spin(alq, &alq->aq_mtx, "alqgnres", 0);
                alq->aq_waiters--;

                if (alq->aq_writehead <= alq->aq_writetail)
                        contigbytes = alq->aq_freebytes;
                else
                        contigbytes = alq->aq_buflen - alq->aq_writehead;

                /*
                 * If we're the first thread to wake after an AQ_WANTED wakeup
                 * but there isn't enough free space for us, we're going to
                 * loop and sleep again.  If there are other threads waiting
                 * in this loop, schedule a wakeup so that they can see if the
                 * space they require is available.
                 */
                if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
                    contigbytes < len && !(alq->aq_flags & AQ_WANTED))
                        waitchan = alq;
                else
                        waitchan = NULL;
        }

        /*
         * If there are waiters, we need to signal the waiting threads after
         * we complete our work.  The alq ptr is used as a wait channel for
         * threads requiring resources to be freed up.  In the AQ_ORDERED
         * case, threads are not allowed to concurrently compete for resources
         * in the above while loop, so we use a different wait channel in this
         * case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        /* Bail if we're shutting down. */
        if (alq->aq_flags & AQ_SHUTDOWN) {
                ALQ_UNLOCK(alq);
                if (waitchan != NULL)
                        wakeup_one(waitchan);
                return (NULL);
        }

        /*
         * If we are here, we have a contiguous number of bytes >= len
         * available in our buffer starting at aq_writehead.
         */
        alq->aq_getpost.ae_data = alq->aq_entbuf + alq->aq_writehead;
        alq->aq_getpost.ae_bytesused = len;

        return (&alq->aq_getpost);
}

struct ale *
alq_get(struct alq *alq, int flags)
{
        /* Should only be called in fixed length message (legacy) mode. */
        KASSERT((alq->aq_flags & AQ_LEGACY),
            ("%s: fixed length get on variable length queue", __func__));
        return (alq_getn(alq, alq->aq_entlen, flags));
}

void
alq_post_flags(struct alq *alq, struct ale *ale, int flags)
{
        int activate;
        void *waitchan;

        activate = 0;

        if (ale->ae_bytesused > 0) {
                if (!(alq->aq_flags & AQ_ACTIVE) &&
                    !(flags & ALQ_NOACTIVATE)) {
                        alq->aq_flags |= AQ_ACTIVE;
                        activate = 1;
                }

                alq->aq_writehead += ale->ae_bytesused;
                alq->aq_freebytes -= ale->ae_bytesused;

                /* Wrap aq_writehead if we filled to the end of the buffer. */
                if (alq->aq_writehead == alq->aq_buflen)
                        alq->aq_writehead = 0;

                KASSERT((alq->aq_writehead >= 0 &&
                    alq->aq_writehead < alq->aq_buflen),
                    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen",
                    __func__));

                KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
        }

        /*
         * If there are waiters, we need to signal the waiting threads after
         * we complete our work.  The alq ptr is used as a wait channel for
         * threads requiring resources to be freed up.  In the AQ_ORDERED
         * case, threads are not allowed to concurrently compete for resources
         * in the alq_getn() while loop, so we use a different wait channel in
         * this case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        ALQ_UNLOCK(alq);

        if (activate) {
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
        }

        /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
        if (waitchan != NULL)
                wakeup_one(waitchan);
}

void
alq_flush(struct alq *alq)
{
        int needwakeup = 0;

        ALD_LOCK();
        ALQ_LOCK(alq);

        /*
         * Pull the lever iff there is data to flush and we're
         * not already in the middle of a flush operation.
         */
        if (HAS_PENDING_DATA(alq) && !(alq->aq_flags & AQ_FLUSHING)) {
                if (alq->aq_flags & AQ_ACTIVE)
                        ald_deactivate(alq);

                ALD_UNLOCK();
                needwakeup = alq_doio(alq);
        } else
                ALD_UNLOCK();

        ALQ_UNLOCK(alq);

        if (needwakeup)
                wakeup_one(alq);
}

/*
 * Flush remaining data, close the file and free all resources.
 */
void
alq_close(struct alq *alq)
{
        /* Only flush and destroy alq if not already shutting down. */
        if (ald_rem(alq) == 0)
                alq_destroy(alq);
}

static int
alq_load_handler(module_t mod, int what, void *arg)
{
        int ret;

        ret = 0;

        switch (what) {
        case MOD_LOAD:
        case MOD_SHUTDOWN:
                break;

        case MOD_QUIESCE:
                ALD_LOCK();
                /* Only allow unload if there are no open queues. */
                if (LIST_FIRST(&ald_queues) == NULL) {
                        ald_shutingdown = 1;
                        ALD_UNLOCK();
                        EVENTHANDLER_DEREGISTER(shutdown_pre_sync,
                            alq_eventhandler_tag);
                        ald_shutdown(NULL, 0);
                        mtx_destroy(&ald_mtx);
                } else {
                        ALD_UNLOCK();
                        ret = EBUSY;
                }
                break;

        case MOD_UNLOAD:
                /* If MOD_QUIESCE failed we must fail here too. */
                if (ald_shutingdown == 0)
                        ret = EBUSY;
                break;

        default:
                ret = EINVAL;
                break;
        }

        return (ret);
}

static moduledata_t alq_mod =
{
        "alq",
        alq_load_handler,
        NULL
};

DECLARE_MODULE(alq, alq_mod, SI_SUB_LAST, SI_ORDER_ANY);
MODULE_VERSION(alq, 1);
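/*
 * The alq_getn()/alq_post_flags() pair above provides a bcopy-free
 * alternative to alq_writen(): alq_getn() returns with the queue spin lock
 * held and the caller fills ale->ae_data before handing the queue back via
 * alq_post_flags(), which drops the lock.  The caller therefore must not
 * sleep between the two calls.  A minimal sketch ("data" and "len" are
 * illustrative placeholders):
 *
 *      struct ale *ale;
 *
 *      if ((ale = alq_getn(alq, len, ALQ_WAITOK)) != NULL) {
 *              memcpy(ale->ae_data, data, len);
 *              alq_post_flags(alq, ale, 0);
 *      }
 */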