1 /*- 2 * SPDX-License-Identifier: BSD-2-Clause-FreeBSD 3 * 4 * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org> 5 * Copyright (c) 2008-2009, Lawrence Stewart <lstewart@freebsd.org> 6 * Copyright (c) 2009-2010, The FreeBSD Foundation 7 * All rights reserved. 8 * 9 * Portions of this software were developed at the Centre for Advanced 10 * Internet Architectures, Swinburne University of Technology, Melbourne, 11 * Australia by Lawrence Stewart under sponsorship from the FreeBSD Foundation. 12 * 13 * Redistribution and use in source and binary forms, with or without 14 * modification, are permitted provided that the following conditions 15 * are met: 16 * 1. Redistributions of source code must retain the above copyright 17 * notice unmodified, this list of conditions, and the following 18 * disclaimer. 19 * 2. Redistributions in binary form must reproduce the above copyright 20 * notice, this list of conditions and the following disclaimer in the 21 * documentation and/or other materials provided with the distribution. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 24 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 25 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 26 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 27 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 28 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 29 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 30 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 31 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 32 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 33 */ 34 35 #include <sys/cdefs.h> 36 __FBSDID("$FreeBSD$"); 37 38 #include "opt_mac.h" 39 40 #include <sys/param.h> 41 #include <sys/systm.h> 42 #include <sys/kernel.h> 43 #include <sys/kthread.h> 44 #include <sys/lock.h> 45 #include <sys/mount.h> 46 #include <sys/mutex.h> 47 #include <sys/namei.h> 48 #include <sys/proc.h> 49 #include <sys/vnode.h> 50 #include <sys/alq.h> 51 #include <sys/malloc.h> 52 #include <sys/unistd.h> 53 #include <sys/fcntl.h> 54 #include <sys/eventhandler.h> 55 56 #include <security/mac/mac_framework.h> 57 58 /* Async. Logging Queue */ 59 struct alq { 60 char *aq_entbuf; /* Buffer for stored entries */ 61 int aq_entmax; /* Max entries */ 62 int aq_entlen; /* Entry length */ 63 int aq_freebytes; /* Bytes available in buffer */ 64 int aq_buflen; /* Total length of our buffer */ 65 int aq_writehead; /* Location for next write */ 66 int aq_writetail; /* Flush starts at this location */ 67 int aq_wrapearly; /* # bytes left blank at end of buf */ 68 int aq_flags; /* Queue flags */ 69 int aq_waiters; /* Num threads waiting for resources 70 * NB: Used as a wait channel so must 71 * not be first field in the alq struct 72 */ 73 struct ale aq_getpost; /* ALE for use by get/post */ 74 struct mtx aq_mtx; /* Queue lock */ 75 struct vnode *aq_vp; /* Open vnode handle */ 76 struct ucred *aq_cred; /* Credentials of the opening thread */ 77 LIST_ENTRY(alq) aq_act; /* List of active queues */ 78 LIST_ENTRY(alq) aq_link; /* List of all queues */ 79 }; 80 81 #define AQ_WANTED 0x0001 /* Wakeup sleeper when io is done */ 82 #define AQ_ACTIVE 0x0002 /* on the active list */ 83 #define AQ_FLUSHING 0x0004 /* doing IO */ 84 #define AQ_SHUTDOWN 0x0008 /* Queue no longer valid */ 85 #define AQ_ORDERED 0x0010 /* Queue enforces ordered writes */ 86 #define AQ_LEGACY 0x0020 /* Legacy queue (fixed length writes) */ 87 88 #define ALQ_LOCK(alq) mtx_lock_spin(&(alq)->aq_mtx) 89 #define ALQ_UNLOCK(alq) mtx_unlock_spin(&(alq)->aq_mtx) 90 91 #define HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen) 92 93 static MALLOC_DEFINE(M_ALD, "ALD", "ALD"); 94 95 /* 96 * The ald_mtx protects the ald_queues list and the ald_active list. 97 */ 98 static struct mtx ald_mtx; 99 static LIST_HEAD(, alq) ald_queues; 100 static LIST_HEAD(, alq) ald_active; 101 static int ald_shutingdown = 0; 102 struct thread *ald_thread; 103 static struct proc *ald_proc; 104 static eventhandler_tag alq_eventhandler_tag = NULL; 105 106 #define ALD_LOCK() mtx_lock(&ald_mtx) 107 #define ALD_UNLOCK() mtx_unlock(&ald_mtx) 108 109 /* Daemon functions */ 110 static int ald_add(struct alq *); 111 static int ald_rem(struct alq *); 112 static void ald_startup(void *); 113 static void ald_daemon(void); 114 static void ald_shutdown(void *, int); 115 static void ald_activate(struct alq *); 116 static void ald_deactivate(struct alq *); 117 118 /* Internal queue functions */ 119 static void alq_shutdown(struct alq *); 120 static void alq_destroy(struct alq *); 121 static int alq_doio(struct alq *); 122 123 /* 124 * Add a new queue to the global list. Fail if we're shutting down. 125 */ 126 static int 127 ald_add(struct alq *alq) 128 { 129 int error; 130 131 error = 0; 132 133 ALD_LOCK(); 134 if (ald_shutingdown) { 135 error = EBUSY; 136 goto done; 137 } 138 LIST_INSERT_HEAD(&ald_queues, alq, aq_link); 139 done: 140 ALD_UNLOCK(); 141 return (error); 142 } 143 144 /* 145 * Remove a queue from the global list unless we're shutting down. If so, 146 * the ald will take care of cleaning up it's resources. 147 */ 148 static int 149 ald_rem(struct alq *alq) 150 { 151 int error; 152 153 error = 0; 154 155 ALD_LOCK(); 156 if (ald_shutingdown) { 157 error = EBUSY; 158 goto done; 159 } 160 LIST_REMOVE(alq, aq_link); 161 done: 162 ALD_UNLOCK(); 163 return (error); 164 } 165 166 /* 167 * Put a queue on the active list. This will schedule it for writing. 168 */ 169 static void 170 ald_activate(struct alq *alq) 171 { 172 LIST_INSERT_HEAD(&ald_active, alq, aq_act); 173 wakeup(&ald_active); 174 } 175 176 static void 177 ald_deactivate(struct alq *alq) 178 { 179 LIST_REMOVE(alq, aq_act); 180 alq->aq_flags &= ~AQ_ACTIVE; 181 } 182 183 static void 184 ald_startup(void *unused) 185 { 186 mtx_init(&ald_mtx, "ALDmtx", NULL, MTX_DEF|MTX_QUIET); 187 LIST_INIT(&ald_queues); 188 LIST_INIT(&ald_active); 189 } 190 191 static void 192 ald_daemon(void) 193 { 194 int needwakeup; 195 struct alq *alq; 196 197 ald_thread = FIRST_THREAD_IN_PROC(ald_proc); 198 199 alq_eventhandler_tag = EVENTHANDLER_REGISTER(shutdown_pre_sync, 200 ald_shutdown, NULL, SHUTDOWN_PRI_FIRST); 201 202 ALD_LOCK(); 203 204 for (;;) { 205 while ((alq = LIST_FIRST(&ald_active)) == NULL && 206 !ald_shutingdown) 207 mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0); 208 209 /* Don't shutdown until all active ALQs are flushed. */ 210 if (ald_shutingdown && alq == NULL) { 211 ALD_UNLOCK(); 212 break; 213 } 214 215 ALQ_LOCK(alq); 216 ald_deactivate(alq); 217 ALD_UNLOCK(); 218 needwakeup = alq_doio(alq); 219 ALQ_UNLOCK(alq); 220 if (needwakeup) 221 wakeup_one(alq); 222 ALD_LOCK(); 223 } 224 225 kproc_exit(0); 226 } 227 228 static void 229 ald_shutdown(void *arg, int howto) 230 { 231 struct alq *alq; 232 233 ALD_LOCK(); 234 235 /* Ensure no new queues can be created. */ 236 ald_shutingdown = 1; 237 238 /* Shutdown all ALQs prior to terminating the ald_daemon. */ 239 while ((alq = LIST_FIRST(&ald_queues)) != NULL) { 240 LIST_REMOVE(alq, aq_link); 241 ALD_UNLOCK(); 242 alq_shutdown(alq); 243 ALD_LOCK(); 244 } 245 246 /* At this point, all ALQs are flushed and shutdown. */ 247 248 /* 249 * Wake ald_daemon so that it exits. It won't be able to do 250 * anything until we mtx_sleep because we hold the ald_mtx. 251 */ 252 wakeup(&ald_active); 253 254 /* Wait for ald_daemon to exit. */ 255 mtx_sleep(ald_proc, &ald_mtx, PWAIT, "aldslp", 0); 256 257 ALD_UNLOCK(); 258 } 259 260 static void 261 alq_shutdown(struct alq *alq) 262 { 263 ALQ_LOCK(alq); 264 265 /* Stop any new writers. */ 266 alq->aq_flags |= AQ_SHUTDOWN; 267 268 /* 269 * If the ALQ isn't active but has unwritten data (possible if 270 * the ALQ_NOACTIVATE flag has been used), explicitly activate the 271 * ALQ here so that the pending data gets flushed by the ald_daemon. 272 */ 273 if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) { 274 alq->aq_flags |= AQ_ACTIVE; 275 ALQ_UNLOCK(alq); 276 ALD_LOCK(); 277 ald_activate(alq); 278 ALD_UNLOCK(); 279 ALQ_LOCK(alq); 280 } 281 282 /* Drain IO */ 283 while (alq->aq_flags & AQ_ACTIVE) { 284 alq->aq_flags |= AQ_WANTED; 285 msleep_spin(alq, &alq->aq_mtx, "aldclose", 0); 286 } 287 ALQ_UNLOCK(alq); 288 289 vn_close(alq->aq_vp, FWRITE, alq->aq_cred, 290 curthread); 291 crfree(alq->aq_cred); 292 } 293 294 void 295 alq_destroy(struct alq *alq) 296 { 297 /* Drain all pending IO. */ 298 alq_shutdown(alq); 299 300 mtx_destroy(&alq->aq_mtx); 301 free(alq->aq_entbuf, M_ALD); 302 free(alq, M_ALD); 303 } 304 305 /* 306 * Flush all pending data to disk. This operation will block. 307 */ 308 static int 309 alq_doio(struct alq *alq) 310 { 311 struct thread *td; 312 struct mount *mp; 313 struct vnode *vp; 314 struct uio auio; 315 struct iovec aiov[2]; 316 int totlen; 317 int iov; 318 int wrapearly; 319 320 KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__)); 321 322 vp = alq->aq_vp; 323 td = curthread; 324 totlen = 0; 325 iov = 1; 326 wrapearly = alq->aq_wrapearly; 327 328 bzero(&aiov, sizeof(aiov)); 329 bzero(&auio, sizeof(auio)); 330 331 /* Start the write from the location of our buffer tail pointer. */ 332 aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail; 333 334 if (alq->aq_writetail < alq->aq_writehead) { 335 /* Buffer not wrapped. */ 336 totlen = aiov[0].iov_len = alq->aq_writehead - alq->aq_writetail; 337 } else if (alq->aq_writehead == 0) { 338 /* Buffer not wrapped (special case to avoid an empty iov). */ 339 totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail - 340 wrapearly; 341 } else { 342 /* 343 * Buffer wrapped, requires 2 aiov entries: 344 * - first is from writetail to end of buffer 345 * - second is from start of buffer to writehead 346 */ 347 aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail - 348 wrapearly; 349 iov++; 350 aiov[1].iov_base = alq->aq_entbuf; 351 aiov[1].iov_len = alq->aq_writehead; 352 totlen = aiov[0].iov_len + aiov[1].iov_len; 353 } 354 355 alq->aq_flags |= AQ_FLUSHING; 356 ALQ_UNLOCK(alq); 357 358 auio.uio_iov = &aiov[0]; 359 auio.uio_offset = 0; 360 auio.uio_segflg = UIO_SYSSPACE; 361 auio.uio_rw = UIO_WRITE; 362 auio.uio_iovcnt = iov; 363 auio.uio_resid = totlen; 364 auio.uio_td = td; 365 366 /* 367 * Do all of the junk required to write now. 368 */ 369 vn_start_write(vp, &mp, V_WAIT); 370 vn_lock(vp, LK_EXCLUSIVE | LK_RETRY); 371 /* 372 * XXX: VOP_WRITE error checks are ignored. 373 */ 374 #ifdef MAC 375 if (mac_vnode_check_write(alq->aq_cred, NOCRED, vp) == 0) 376 #endif 377 VOP_WRITE(vp, &auio, IO_UNIT | IO_APPEND, alq->aq_cred); 378 VOP_UNLOCK(vp); 379 vn_finished_write(mp); 380 381 ALQ_LOCK(alq); 382 alq->aq_flags &= ~AQ_FLUSHING; 383 384 /* Adjust writetail as required, taking into account wrapping. */ 385 alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) % 386 alq->aq_buflen; 387 alq->aq_freebytes += totlen + wrapearly; 388 389 /* 390 * If we just flushed part of the buffer which wrapped, reset the 391 * wrapearly indicator. 392 */ 393 if (wrapearly) 394 alq->aq_wrapearly = 0; 395 396 /* 397 * If we just flushed the buffer completely, reset indexes to 0 to 398 * minimise buffer wraps. 399 * This is also required to ensure alq_getn() can't wedge itself. 400 */ 401 if (!HAS_PENDING_DATA(alq)) 402 alq->aq_writehead = alq->aq_writetail = 0; 403 404 KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen), 405 ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__)); 406 407 if (alq->aq_flags & AQ_WANTED) { 408 alq->aq_flags &= ~AQ_WANTED; 409 return (1); 410 } 411 412 return(0); 413 } 414 415 static struct kproc_desc ald_kp = { 416 "ALQ Daemon", 417 ald_daemon, 418 &ald_proc 419 }; 420 421 SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, SI_ORDER_ANY, kproc_start, &ald_kp); 422 SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL); 423 424 /* User visible queue functions */ 425 426 /* 427 * Create the queue data structure, allocate the buffer, and open the file. 428 */ 429 430 int 431 alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode, 432 int size, int flags) 433 { 434 struct thread *td __unused; 435 struct nameidata nd; 436 struct alq *alq; 437 int oflags; 438 int error; 439 440 KASSERT((size > 0), ("%s: size <= 0", __func__)); 441 442 *alqp = NULL; 443 td = curthread; 444 445 NDINIT(&nd, LOOKUP, NOFOLLOW, UIO_SYSSPACE, file, td); 446 oflags = FWRITE | O_NOFOLLOW | O_CREAT; 447 448 error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL); 449 if (error) 450 return (error); 451 452 NDFREE(&nd, NDF_ONLY_PNBUF); 453 /* We just unlock so we hold a reference */ 454 VOP_UNLOCK(nd.ni_vp); 455 456 alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO); 457 alq->aq_vp = nd.ni_vp; 458 alq->aq_cred = crhold(cred); 459 460 mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET); 461 462 alq->aq_buflen = size; 463 alq->aq_entmax = 0; 464 alq->aq_entlen = 0; 465 466 alq->aq_freebytes = alq->aq_buflen; 467 alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO); 468 alq->aq_writehead = alq->aq_writetail = 0; 469 if (flags & ALQ_ORDERED) 470 alq->aq_flags |= AQ_ORDERED; 471 472 if ((error = ald_add(alq)) != 0) { 473 alq_destroy(alq); 474 return (error); 475 } 476 477 *alqp = alq; 478 479 return (0); 480 } 481 482 int 483 alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode, 484 int size, int count) 485 { 486 int ret; 487 488 KASSERT((count >= 0), ("%s: count < 0", __func__)); 489 490 if (count > 0) { 491 if ((ret = alq_open_flags(alqp, file, cred, cmode, 492 size*count, 0)) == 0) { 493 (*alqp)->aq_flags |= AQ_LEGACY; 494 (*alqp)->aq_entmax = count; 495 (*alqp)->aq_entlen = size; 496 } 497 } else 498 ret = alq_open_flags(alqp, file, cred, cmode, size, 0); 499 500 return (ret); 501 } 502 503 /* 504 * Copy a new entry into the queue. If the operation would block either 505 * wait or return an error depending on the value of waitok. 506 */ 507 int 508 alq_writen(struct alq *alq, void *data, int len, int flags) 509 { 510 int activate, copy, ret; 511 void *waitchan; 512 513 KASSERT((len > 0 && len <= alq->aq_buflen), 514 ("%s: len <= 0 || len > aq_buflen", __func__)); 515 516 activate = ret = 0; 517 copy = len; 518 waitchan = NULL; 519 520 ALQ_LOCK(alq); 521 522 /* 523 * Fail to perform the write and return EWOULDBLOCK if: 524 * - The message is larger than our underlying buffer. 525 * - The ALQ is being shutdown. 526 * - There is insufficient free space in our underlying buffer 527 * to accept the message and the user can't wait for space. 528 * - There is insufficient free space in our underlying buffer 529 * to accept the message and the alq is inactive due to prior 530 * use of the ALQ_NOACTIVATE flag (which would lead to deadlock). 531 */ 532 if (len > alq->aq_buflen || 533 alq->aq_flags & AQ_SHUTDOWN || 534 (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) && 535 HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) { 536 ALQ_UNLOCK(alq); 537 return (EWOULDBLOCK); 538 } 539 540 /* 541 * If we want ordered writes and there is already at least one thread 542 * waiting for resources to become available, sleep until we're woken. 543 */ 544 if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) { 545 KASSERT(!(flags & ALQ_NOWAIT), 546 ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__)); 547 alq->aq_waiters++; 548 msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0); 549 alq->aq_waiters--; 550 } 551 552 /* 553 * (ALQ_WAITOK && aq_freebytes < len) or aq_freebytes >= len, either 554 * enter while loop and sleep until we have enough free bytes (former) 555 * or skip (latter). If AQ_ORDERED is set, only 1 thread at a time will 556 * be in this loop. Otherwise, multiple threads may be sleeping here 557 * competing for ALQ resources. 558 */ 559 while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) { 560 KASSERT(!(flags & ALQ_NOWAIT), 561 ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__)); 562 alq->aq_flags |= AQ_WANTED; 563 alq->aq_waiters++; 564 if (waitchan) 565 wakeup(waitchan); 566 msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0); 567 alq->aq_waiters--; 568 569 /* 570 * If we're the first thread to wake after an AQ_WANTED wakeup 571 * but there isn't enough free space for us, we're going to loop 572 * and sleep again. If there are other threads waiting in this 573 * loop, schedule a wakeup so that they can see if the space 574 * they require is available. 575 */ 576 if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) && 577 alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED)) 578 waitchan = alq; 579 else 580 waitchan = NULL; 581 } 582 583 /* 584 * If there are waiters, we need to signal the waiting threads after we 585 * complete our work. The alq ptr is used as a wait channel for threads 586 * requiring resources to be freed up. In the AQ_ORDERED case, threads 587 * are not allowed to concurrently compete for resources in the above 588 * while loop, so we use a different wait channel in this case. 589 */ 590 if (alq->aq_waiters > 0) { 591 if (alq->aq_flags & AQ_ORDERED) 592 waitchan = &alq->aq_waiters; 593 else 594 waitchan = alq; 595 } else 596 waitchan = NULL; 597 598 /* Bail if we're shutting down. */ 599 if (alq->aq_flags & AQ_SHUTDOWN) { 600 ret = EWOULDBLOCK; 601 goto unlock; 602 } 603 604 /* 605 * If we need to wrap the buffer to accommodate the write, 606 * we'll need 2 calls to bcopy. 607 */ 608 if ((alq->aq_buflen - alq->aq_writehead) < len) 609 copy = alq->aq_buflen - alq->aq_writehead; 610 611 /* Copy message (or part thereof if wrap required) to the buffer. */ 612 bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy); 613 alq->aq_writehead += copy; 614 615 if (alq->aq_writehead >= alq->aq_buflen) { 616 KASSERT((alq->aq_writehead == alq->aq_buflen), 617 ("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)", 618 __func__, 619 alq->aq_writehead, 620 alq->aq_buflen)); 621 alq->aq_writehead = 0; 622 } 623 624 if (copy != len) { 625 /* 626 * Wrap the buffer by copying the remainder of our message 627 * to the start of the buffer and resetting aq_writehead. 628 */ 629 bcopy(((uint8_t *)data)+copy, alq->aq_entbuf, len - copy); 630 alq->aq_writehead = len - copy; 631 } 632 633 KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen), 634 ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__)); 635 636 alq->aq_freebytes -= len; 637 638 if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) { 639 alq->aq_flags |= AQ_ACTIVE; 640 activate = 1; 641 } 642 643 KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__)); 644 645 unlock: 646 ALQ_UNLOCK(alq); 647 648 if (activate) { 649 ALD_LOCK(); 650 ald_activate(alq); 651 ALD_UNLOCK(); 652 } 653 654 /* NB: We rely on wakeup_one waking threads in a FIFO manner. */ 655 if (waitchan != NULL) 656 wakeup_one(waitchan); 657 658 return (ret); 659 } 660 661 int 662 alq_write(struct alq *alq, void *data, int flags) 663 { 664 /* Should only be called in fixed length message (legacy) mode. */ 665 KASSERT((alq->aq_flags & AQ_LEGACY), 666 ("%s: fixed length write on variable length queue", __func__)); 667 return (alq_writen(alq, data, alq->aq_entlen, flags)); 668 } 669 670 /* 671 * Retrieve a pointer for the ALQ to write directly into, avoiding bcopy. 672 */ 673 struct ale * 674 alq_getn(struct alq *alq, int len, int flags) 675 { 676 int contigbytes; 677 void *waitchan; 678 679 KASSERT((len > 0 && len <= alq->aq_buflen), 680 ("%s: len <= 0 || len > alq->aq_buflen", __func__)); 681 682 waitchan = NULL; 683 684 ALQ_LOCK(alq); 685 686 /* 687 * Determine the number of free contiguous bytes. 688 * We ensure elsewhere that if aq_writehead == aq_writetail because 689 * the buffer is empty, they will both be set to 0 and therefore 690 * aq_freebytes == aq_buflen and is fully contiguous. 691 * If they are equal and the buffer is not empty, aq_freebytes will 692 * be 0 indicating the buffer is full. 693 */ 694 if (alq->aq_writehead <= alq->aq_writetail) 695 contigbytes = alq->aq_freebytes; 696 else { 697 contigbytes = alq->aq_buflen - alq->aq_writehead; 698 699 if (contigbytes < len) { 700 /* 701 * Insufficient space at end of buffer to handle a 702 * contiguous write. Wrap early if there's space at 703 * the beginning. This will leave a hole at the end 704 * of the buffer which we will have to skip over when 705 * flushing the buffer to disk. 706 */ 707 if (alq->aq_writetail >= len || flags & ALQ_WAITOK) { 708 /* Keep track of # bytes left blank. */ 709 alq->aq_wrapearly = contigbytes; 710 /* Do the wrap and adjust counters. */ 711 contigbytes = alq->aq_freebytes = 712 alq->aq_writetail; 713 alq->aq_writehead = 0; 714 } 715 } 716 } 717 718 /* 719 * Return a NULL ALE if: 720 * - The message is larger than our underlying buffer. 721 * - The ALQ is being shutdown. 722 * - There is insufficient free space in our underlying buffer 723 * to accept the message and the user can't wait for space. 724 * - There is insufficient free space in our underlying buffer 725 * to accept the message and the alq is inactive due to prior 726 * use of the ALQ_NOACTIVATE flag (which would lead to deadlock). 727 */ 728 if (len > alq->aq_buflen || 729 alq->aq_flags & AQ_SHUTDOWN || 730 (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) && 731 HAS_PENDING_DATA(alq))) && contigbytes < len)) { 732 ALQ_UNLOCK(alq); 733 return (NULL); 734 } 735 736 /* 737 * If we want ordered writes and there is already at least one thread 738 * waiting for resources to become available, sleep until we're woken. 739 */ 740 if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) { 741 KASSERT(!(flags & ALQ_NOWAIT), 742 ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__)); 743 alq->aq_waiters++; 744 msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgnord", 0); 745 alq->aq_waiters--; 746 } 747 748 /* 749 * (ALQ_WAITOK && contigbytes < len) or contigbytes >= len, either enter 750 * while loop and sleep until we have enough contiguous free bytes 751 * (former) or skip (latter). If AQ_ORDERED is set, only 1 thread at a 752 * time will be in this loop. Otherwise, multiple threads may be 753 * sleeping here competing for ALQ resources. 754 */ 755 while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) { 756 KASSERT(!(flags & ALQ_NOWAIT), 757 ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__)); 758 alq->aq_flags |= AQ_WANTED; 759 alq->aq_waiters++; 760 if (waitchan) 761 wakeup(waitchan); 762 msleep_spin(alq, &alq->aq_mtx, "alqgnres", 0); 763 alq->aq_waiters--; 764 765 if (alq->aq_writehead <= alq->aq_writetail) 766 contigbytes = alq->aq_freebytes; 767 else 768 contigbytes = alq->aq_buflen - alq->aq_writehead; 769 770 /* 771 * If we're the first thread to wake after an AQ_WANTED wakeup 772 * but there isn't enough free space for us, we're going to loop 773 * and sleep again. If there are other threads waiting in this 774 * loop, schedule a wakeup so that they can see if the space 775 * they require is available. 776 */ 777 if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) && 778 contigbytes < len && !(alq->aq_flags & AQ_WANTED)) 779 waitchan = alq; 780 else 781 waitchan = NULL; 782 } 783 784 /* 785 * If there are waiters, we need to signal the waiting threads after we 786 * complete our work. The alq ptr is used as a wait channel for threads 787 * requiring resources to be freed up. In the AQ_ORDERED case, threads 788 * are not allowed to concurrently compete for resources in the above 789 * while loop, so we use a different wait channel in this case. 790 */ 791 if (alq->aq_waiters > 0) { 792 if (alq->aq_flags & AQ_ORDERED) 793 waitchan = &alq->aq_waiters; 794 else 795 waitchan = alq; 796 } else 797 waitchan = NULL; 798 799 /* Bail if we're shutting down. */ 800 if (alq->aq_flags & AQ_SHUTDOWN) { 801 ALQ_UNLOCK(alq); 802 if (waitchan != NULL) 803 wakeup_one(waitchan); 804 return (NULL); 805 } 806 807 /* 808 * If we are here, we have a contiguous number of bytes >= len 809 * available in our buffer starting at aq_writehead. 810 */ 811 alq->aq_getpost.ae_data = alq->aq_entbuf + alq->aq_writehead; 812 alq->aq_getpost.ae_bytesused = len; 813 814 return (&alq->aq_getpost); 815 } 816 817 struct ale * 818 alq_get(struct alq *alq, int flags) 819 { 820 /* Should only be called in fixed length message (legacy) mode. */ 821 KASSERT((alq->aq_flags & AQ_LEGACY), 822 ("%s: fixed length get on variable length queue", __func__)); 823 return (alq_getn(alq, alq->aq_entlen, flags)); 824 } 825 826 void 827 alq_post_flags(struct alq *alq, struct ale *ale, int flags) 828 { 829 int activate; 830 void *waitchan; 831 832 activate = 0; 833 834 if (ale->ae_bytesused > 0) { 835 if (!(alq->aq_flags & AQ_ACTIVE) && 836 !(flags & ALQ_NOACTIVATE)) { 837 alq->aq_flags |= AQ_ACTIVE; 838 activate = 1; 839 } 840 841 alq->aq_writehead += ale->ae_bytesused; 842 alq->aq_freebytes -= ale->ae_bytesused; 843 844 /* Wrap aq_writehead if we filled to the end of the buffer. */ 845 if (alq->aq_writehead == alq->aq_buflen) 846 alq->aq_writehead = 0; 847 848 KASSERT((alq->aq_writehead >= 0 && 849 alq->aq_writehead < alq->aq_buflen), 850 ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", 851 __func__)); 852 853 KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__)); 854 } 855 856 /* 857 * If there are waiters, we need to signal the waiting threads after we 858 * complete our work. The alq ptr is used as a wait channel for threads 859 * requiring resources to be freed up. In the AQ_ORDERED case, threads 860 * are not allowed to concurrently compete for resources in the 861 * alq_getn() while loop, so we use a different wait channel in this case. 862 */ 863 if (alq->aq_waiters > 0) { 864 if (alq->aq_flags & AQ_ORDERED) 865 waitchan = &alq->aq_waiters; 866 else 867 waitchan = alq; 868 } else 869 waitchan = NULL; 870 871 ALQ_UNLOCK(alq); 872 873 if (activate) { 874 ALD_LOCK(); 875 ald_activate(alq); 876 ALD_UNLOCK(); 877 } 878 879 /* NB: We rely on wakeup_one waking threads in a FIFO manner. */ 880 if (waitchan != NULL) 881 wakeup_one(waitchan); 882 } 883 884 void 885 alq_flush(struct alq *alq) 886 { 887 int needwakeup = 0; 888 889 ALD_LOCK(); 890 ALQ_LOCK(alq); 891 892 /* 893 * Pull the lever iff there is data to flush and we're 894 * not already in the middle of a flush operation. 895 */ 896 if (HAS_PENDING_DATA(alq) && !(alq->aq_flags & AQ_FLUSHING)) { 897 if (alq->aq_flags & AQ_ACTIVE) 898 ald_deactivate(alq); 899 900 ALD_UNLOCK(); 901 needwakeup = alq_doio(alq); 902 } else 903 ALD_UNLOCK(); 904 905 ALQ_UNLOCK(alq); 906 907 if (needwakeup) 908 wakeup_one(alq); 909 } 910 911 /* 912 * Flush remaining data, close the file and free all resources. 913 */ 914 void 915 alq_close(struct alq *alq) 916 { 917 /* Only flush and destroy alq if not already shutting down. */ 918 if (ald_rem(alq) == 0) 919 alq_destroy(alq); 920 } 921 922 static int 923 alq_load_handler(module_t mod, int what, void *arg) 924 { 925 int ret; 926 927 ret = 0; 928 929 switch (what) { 930 case MOD_LOAD: 931 case MOD_SHUTDOWN: 932 break; 933 934 case MOD_QUIESCE: 935 ALD_LOCK(); 936 /* Only allow unload if there are no open queues. */ 937 if (LIST_FIRST(&ald_queues) == NULL) { 938 ald_shutingdown = 1; 939 ALD_UNLOCK(); 940 EVENTHANDLER_DEREGISTER(shutdown_pre_sync, 941 alq_eventhandler_tag); 942 ald_shutdown(NULL, 0); 943 mtx_destroy(&ald_mtx); 944 } else { 945 ALD_UNLOCK(); 946 ret = EBUSY; 947 } 948 break; 949 950 case MOD_UNLOAD: 951 /* If MOD_QUIESCE failed we must fail here too. */ 952 if (ald_shutingdown == 0) 953 ret = EBUSY; 954 break; 955 956 default: 957 ret = EINVAL; 958 break; 959 } 960 961 return (ret); 962 } 963 964 static moduledata_t alq_mod = 965 { 966 "alq", 967 alq_load_handler, 968 NULL 969 }; 970 971 DECLARE_MODULE(alq, alq_mod, SI_SUB_LAST, SI_ORDER_ANY); 972 MODULE_VERSION(alq, 1); 973