/*-
 * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
 *
 * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org>
 * Copyright (c) 2008-2009, Lawrence Stewart <lstewart@freebsd.org>
 * Copyright (c) 2009-2010, The FreeBSD Foundation
 * All rights reserved.
 *
 * Portions of this software were developed at the Centre for Advanced
 * Internet Architectures, Swinburne University of Technology, Melbourne,
 * Australia by Lawrence Stewart under sponsorship from the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice unmodified, this list of conditions, and the following
 *    disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include "opt_mac.h"

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/mount.h>
#include <sys/mutex.h>
#include <sys/namei.h>
#include <sys/proc.h>
#include <sys/vnode.h>
#include <sys/alq.h>
#include <sys/malloc.h>
#include <sys/unistd.h>
#include <sys/fcntl.h>
#include <sys/eventhandler.h>

#include <security/mac/mac_framework.h>

/* Async. Logging Queue */
struct alq {
	char	*aq_entbuf;		/* Buffer for stored entries */
	int	aq_entmax;		/* Max entries */
	int	aq_entlen;		/* Entry length */
	int	aq_freebytes;		/* Bytes available in buffer */
	int	aq_buflen;		/* Total length of our buffer */
	int	aq_writehead;		/* Location for next write */
	int	aq_writetail;		/* Flush starts at this location */
	int	aq_wrapearly;		/* # bytes left blank at end of buf */
	int	aq_flags;		/* Queue flags */
	int	aq_waiters;		/* Num threads waiting for resources
					 * NB: Used as a wait channel so must
					 * not be first field in the alq struct
					 */
	struct	ale	aq_getpost;	/* ALE for use by get/post */
	struct	mtx	aq_mtx;		/* Queue lock */
	struct	vnode	*aq_vp;		/* Open vnode handle */
	struct	ucred	*aq_cred;	/* Credentials of the opening thread */
	LIST_ENTRY(alq)	aq_act;		/* List of active queues */
	LIST_ENTRY(alq)	aq_link;	/* List of all queues */
};

#define	AQ_WANTED	0x0001		/* Wakeup sleeper when io is done */
#define	AQ_ACTIVE	0x0002		/* On the active list */
#define	AQ_FLUSHING	0x0004		/* Doing IO */
#define	AQ_SHUTDOWN	0x0008		/* Queue no longer valid */
#define	AQ_ORDERED	0x0010		/* Queue enforces ordered writes */
#define	AQ_LEGACY	0x0020		/* Legacy queue (fixed length writes) */

#define	ALQ_LOCK(alq)	mtx_lock_spin(&(alq)->aq_mtx)
#define	ALQ_UNLOCK(alq)	mtx_unlock_spin(&(alq)->aq_mtx)

#define	HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)

static MALLOC_DEFINE(M_ALD, "ALD", "ALD");

/*
 * The ald_mtx protects the ald_queues list and the ald_active list.
 */
static struct mtx ald_mtx;
static LIST_HEAD(, alq) ald_queues;
static LIST_HEAD(, alq) ald_active;
static int ald_shutingdown = 0;
struct thread *ald_thread;
static struct proc *ald_proc;
static eventhandler_tag alq_eventhandler_tag = NULL;

#define	ALD_LOCK()	mtx_lock(&ald_mtx)
#define	ALD_UNLOCK()	mtx_unlock(&ald_mtx)

/* Daemon functions */
static int ald_add(struct alq *);
static int ald_rem(struct alq *);
static void ald_startup(void *);
static void ald_daemon(void);
static void ald_shutdown(void *, int);
static void ald_activate(struct alq *);
static void ald_deactivate(struct alq *);

/* Internal queue functions */
static void alq_shutdown(struct alq *);
static void alq_destroy(struct alq *);
static int alq_doio(struct alq *);

/*
 * Add a new queue to the global list.  Fail if we're shutting down.
 */
static int
ald_add(struct alq *alq)
{
	int error;

	error = 0;

	ALD_LOCK();
	if (ald_shutingdown) {
		error = EBUSY;
		goto done;
	}
	LIST_INSERT_HEAD(&ald_queues, alq, aq_link);
done:
	ALD_UNLOCK();
	return (error);
}

/*
 * Remove a queue from the global list unless we're shutting down.  If so,
 * the ald will take care of cleaning up its resources.
 */
static int
ald_rem(struct alq *alq)
{
	int error;

	error = 0;

	ALD_LOCK();
	if (ald_shutingdown) {
		error = EBUSY;
		goto done;
	}
	LIST_REMOVE(alq, aq_link);
done:
	ALD_UNLOCK();
	return (error);
}

/*
 * Put a queue on the active list.  This will schedule it for writing.
 */
static void
ald_activate(struct alq *alq)
{
	LIST_INSERT_HEAD(&ald_active, alq, aq_act);
	wakeup(&ald_active);
}

static void
ald_deactivate(struct alq *alq)
{
	LIST_REMOVE(alq, aq_act);
	alq->aq_flags &= ~AQ_ACTIVE;
}

static void
ald_startup(void *unused)
{
	mtx_init(&ald_mtx, "ALDmtx", NULL, MTX_DEF|MTX_QUIET);
	LIST_INIT(&ald_queues);
	LIST_INIT(&ald_active);
}

static void
ald_daemon(void)
{
	int needwakeup;
	struct alq *alq;

	ald_thread = FIRST_THREAD_IN_PROC(ald_proc);

	alq_eventhandler_tag = EVENTHANDLER_REGISTER(shutdown_pre_sync,
	    ald_shutdown, NULL, SHUTDOWN_PRI_FIRST);

	ALD_LOCK();

	for (;;) {
		while ((alq = LIST_FIRST(&ald_active)) == NULL &&
		    !ald_shutingdown)
			mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);

		/* Don't shutdown until all active ALQs are flushed. */
		if (ald_shutingdown && alq == NULL) {
			ALD_UNLOCK();
			break;
		}

		ALQ_LOCK(alq);
		ald_deactivate(alq);
		ALD_UNLOCK();
		needwakeup = alq_doio(alq);
		ALQ_UNLOCK(alq);
		if (needwakeup)
			wakeup_one(alq);
		ALD_LOCK();
	}

	kproc_exit(0);
}

static void
ald_shutdown(void *arg, int howto)
{
	struct alq *alq;

	ALD_LOCK();

	/* Ensure no new queues can be created. */
	ald_shutingdown = 1;

	/* Shutdown all ALQs prior to terminating the ald_daemon. */
	while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
		LIST_REMOVE(alq, aq_link);
		ALD_UNLOCK();
		alq_shutdown(alq);
		ALD_LOCK();
	}

	/* At this point, all ALQs are flushed and shutdown. */

	/*
	 * Wake ald_daemon so that it exits.  It won't be able to do
	 * anything until we mtx_sleep because we hold the ald_mtx.
	 */
	wakeup(&ald_active);

	/* Wait for ald_daemon to exit. */
	mtx_sleep(ald_proc, &ald_mtx, PWAIT, "aldslp", 0);

	ALD_UNLOCK();
}

static void
alq_shutdown(struct alq *alq)
{
	ALQ_LOCK(alq);

	/* Stop any new writers. */
	alq->aq_flags |= AQ_SHUTDOWN;

	/*
	 * If the ALQ isn't active but has unwritten data (possible if
	 * the ALQ_NOACTIVATE flag has been used), explicitly activate the
	 * ALQ here so that the pending data gets flushed by the ald_daemon.
	 */
	if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) {
		alq->aq_flags |= AQ_ACTIVE;
		ALQ_UNLOCK(alq);
		ALD_LOCK();
		ald_activate(alq);
		ALD_UNLOCK();
		ALQ_LOCK(alq);
	}

	/* Drain IO */
	while (alq->aq_flags & AQ_ACTIVE) {
		alq->aq_flags |= AQ_WANTED;
		msleep_spin(alq, &alq->aq_mtx, "aldclose", 0);
	}
	ALQ_UNLOCK(alq);

	vn_close(alq->aq_vp, FWRITE, alq->aq_cred,
	    curthread);
	crfree(alq->aq_cred);
}

void
alq_destroy(struct alq *alq)
{
	/* Drain all pending IO. */
	alq_shutdown(alq);

	mtx_destroy(&alq->aq_mtx);
	free(alq->aq_entbuf, M_ALD);
	free(alq, M_ALD);
}
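
/*
 * Worked example: suppose the ring buffer has wrapped, with
 * aq_buflen = 100, aq_writetail = 70, aq_writehead = 20 and
 * aq_wrapearly = 5.  alq_doio() below then builds a single uio with two
 * iovecs:
 *
 *	aiov[0] = { aq_entbuf + 70, 100 - 70 - 5 = 25 }	(tail up to the gap)
 *	aiov[1] = { aq_entbuf, 20 }			(buffer start to head)
 *
 * After the write, aq_writetail advances by totlen + wrapearly modulo
 * aq_buflen (here to 20, which equals aq_writehead, so both indexes are
 * reset to 0) and the wrapearly gap is reclaimed as free space.
 */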

/*
 * Flush all pending data to disk.  This operation will block.
 */
static int
alq_doio(struct alq *alq)
{
	struct thread *td;
	struct mount *mp;
	struct vnode *vp;
	struct uio auio;
	struct iovec aiov[2];
	int totlen;
	int iov;
	int wrapearly;

	KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

	vp = alq->aq_vp;
	td = curthread;
	totlen = 0;
	iov = 1;
	wrapearly = alq->aq_wrapearly;

	bzero(&aiov, sizeof(aiov));
	bzero(&auio, sizeof(auio));

	/* Start the write from the location of our buffer tail pointer. */
	aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail;

	if (alq->aq_writetail < alq->aq_writehead) {
		/* Buffer not wrapped. */
		totlen = aiov[0].iov_len = alq->aq_writehead - alq->aq_writetail;
	} else if (alq->aq_writehead == 0) {
		/* Buffer not wrapped (special case to avoid an empty iov). */
		totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
		    wrapearly;
	} else {
		/*
		 * Buffer wrapped, requires 2 aiov entries:
		 * - first is from writetail to end of buffer
		 * - second is from start of buffer to writehead
		 */
		aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
		    wrapearly;
		iov++;
		aiov[1].iov_base = alq->aq_entbuf;
		aiov[1].iov_len = alq->aq_writehead;
		totlen = aiov[0].iov_len + aiov[1].iov_len;
	}

	alq->aq_flags |= AQ_FLUSHING;
	ALQ_UNLOCK(alq);

	auio.uio_iov = &aiov[0];
	auio.uio_offset = 0;
	auio.uio_segflg = UIO_SYSSPACE;
	auio.uio_rw = UIO_WRITE;
	auio.uio_iovcnt = iov;
	auio.uio_resid = totlen;
	auio.uio_td = td;

	/*
	 * Do all of the junk required to write now.
	 */
	vn_start_write(vp, &mp, V_WAIT);
	vn_lock(vp, LK_EXCLUSIVE | LK_RETRY);
	/*
	 * XXX: VOP_WRITE error checks are ignored.
	 */
#ifdef MAC
	if (mac_vnode_check_write(alq->aq_cred, NOCRED, vp) == 0)
#endif
		VOP_WRITE(vp, &auio, IO_UNIT | IO_APPEND, alq->aq_cred);
	VOP_UNLOCK(vp, 0);
	vn_finished_write(mp);

	ALQ_LOCK(alq);
	alq->aq_flags &= ~AQ_FLUSHING;

	/* Adjust writetail as required, taking into account wrapping. */
	alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) %
	    alq->aq_buflen;
	alq->aq_freebytes += totlen + wrapearly;

	/*
	 * If we just flushed part of the buffer which wrapped, reset the
	 * wrapearly indicator.
	 */
	if (wrapearly)
		alq->aq_wrapearly = 0;

	/*
	 * If we just flushed the buffer completely, reset indexes to 0 to
	 * minimise buffer wraps.
	 * This is also required to ensure alq_getn() can't wedge itself.
	 */
	if (!HAS_PENDING_DATA(alq))
		alq->aq_writehead = alq->aq_writetail = 0;

	KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen),
	    ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__));

	if (alq->aq_flags & AQ_WANTED) {
		alq->aq_flags &= ~AQ_WANTED;
		return (1);
	}

	return (0);
}

static struct kproc_desc ald_kp = {
	"ALQ Daemon",
	ald_daemon,
	&ald_proc
};

SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, SI_ORDER_ANY, kproc_start, &ald_kp);
SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL);

/* User visible queue functions */
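
/*
 * Example usage sketch: a hypothetical consumer logging fixed-size
 * "struct foo_record" entries to a hypothetical "/var/log/foo.log",
 * holding a valid ucred reference in "cred", might look like:
 *
 *	struct alq *q;
 *	struct foo_record rec;
 *	int error;
 *
 *	error = alq_open(&q, "/var/log/foo.log", cred, 0600,
 *	    sizeof(rec), 100);
 *	if (error == 0) {
 *		alq_write(q, &rec, ALQ_WAITOK);
 *		alq_flush(q);
 *		alq_close(q);
 *	}
 *
 * alq_open() sizes the buffer for 100 fixed-length entries (legacy mode),
 * alq_flush() forces pending data to disk, and alq_close() flushes any
 * remaining data before closing the file and freeing the queue.
 */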

/*
 * Create the queue data structure, allocate the buffer, and open the file.
 */

int
alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int flags)
{
	struct thread *td;
	struct nameidata nd;
	struct alq *alq;
	int oflags;
	int error;

	KASSERT((size > 0), ("%s: size <= 0", __func__));

	*alqp = NULL;
	td = curthread;

	NDINIT(&nd, LOOKUP, NOFOLLOW, UIO_SYSSPACE, file, td);
	oflags = FWRITE | O_NOFOLLOW | O_CREAT;

	error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL);
	if (error)
		return (error);

	NDFREE(&nd, NDF_ONLY_PNBUF);
	/* We just unlock so we hold a reference. */
	VOP_UNLOCK(nd.ni_vp, 0);

	alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
	alq->aq_vp = nd.ni_vp;
	alq->aq_cred = crhold(cred);

	mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);

	alq->aq_buflen = size;
	alq->aq_entmax = 0;
	alq->aq_entlen = 0;

	alq->aq_freebytes = alq->aq_buflen;
	alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
	alq->aq_writehead = alq->aq_writetail = 0;
	if (flags & ALQ_ORDERED)
		alq->aq_flags |= AQ_ORDERED;

	if ((error = ald_add(alq)) != 0) {
		alq_destroy(alq);
		return (error);
	}

	*alqp = alq;

	return (0);
}

int
alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int count)
{
	int ret;

	KASSERT((count >= 0), ("%s: count < 0", __func__));

	if (count > 0) {
		if ((ret = alq_open_flags(alqp, file, cred, cmode,
		    size*count, 0)) == 0) {
			(*alqp)->aq_flags |= AQ_LEGACY;
			(*alqp)->aq_entmax = count;
			(*alqp)->aq_entlen = size;
		}
	} else
		ret = alq_open_flags(alqp, file, cred, cmode, size, 0);

	return (ret);
}

/*
 * Copy a new entry into the queue.  If the operation would block, either
 * wait or return an error, depending on whether the ALQ_WAITOK or
 * ALQ_NOWAIT flag was passed.
 */
int
alq_writen(struct alq *alq, void *data, int len, int flags)
{
	int activate, copy, ret;
	void *waitchan;

	KASSERT((len > 0 && len <= alq->aq_buflen),
	    ("%s: len <= 0 || len > aq_buflen", __func__));

	activate = ret = 0;
	copy = len;
	waitchan = NULL;

	ALQ_LOCK(alq);

	/*
	 * Fail to perform the write and return EWOULDBLOCK if:
	 * - The message is larger than our underlying buffer.
	 * - The ALQ is being shutdown.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the user can't wait for space.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the alq is inactive due to prior
	 *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
	 */
	if (len > alq->aq_buflen ||
	    alq->aq_flags & AQ_SHUTDOWN ||
	    (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
	    HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) {
		ALQ_UNLOCK(alq);
		return (EWOULDBLOCK);
	}

	/*
	 * If we want ordered writes and there is already at least one thread
	 * waiting for resources to become available, sleep until we're woken.
	 */
	if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_waiters++;
		msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0);
		alq->aq_waiters--;
	}

	/*
	 * Either we have ALQ_WAITOK and aq_freebytes < len, in which case we
	 * enter the while loop and sleep until enough free bytes are
	 * available, or aq_freebytes >= len and we skip the loop entirely.
	 * If AQ_ORDERED is set, only 1 thread at a time will be in this loop.
	 * Otherwise, multiple threads may be sleeping here competing for ALQ
	 * resources.
	 */
	while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_flags |= AQ_WANTED;
		alq->aq_waiters++;
		if (waitchan)
			wakeup(waitchan);
		msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0);
		alq->aq_waiters--;

		/*
		 * If we're the first thread to wake after an AQ_WANTED wakeup
		 * but there isn't enough free space for us, we're going to loop
		 * and sleep again. If there are other threads waiting in this
		 * loop, schedule a wakeup so that they can see if the space
		 * they require is available.
		 */
		if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
		    alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED))
			waitchan = alq;
		else
			waitchan = NULL;
	}

	/*
	 * If there are waiters, we need to signal the waiting threads after we
	 * complete our work. The alq ptr is used as a wait channel for threads
	 * requiring resources to be freed up. In the AQ_ORDERED case, threads
	 * are not allowed to concurrently compete for resources in the above
	 * while loop, so we use a different wait channel in this case.
	 */
	if (alq->aq_waiters > 0) {
		if (alq->aq_flags & AQ_ORDERED)
			waitchan = &alq->aq_waiters;
		else
			waitchan = alq;
	} else
		waitchan = NULL;

	/* Bail if we're shutting down. */
	if (alq->aq_flags & AQ_SHUTDOWN) {
		ret = EWOULDBLOCK;
		goto unlock;
	}

	/*
	 * If we need to wrap the buffer to accommodate the write,
	 * we'll need 2 calls to bcopy.
	 */
	if ((alq->aq_buflen - alq->aq_writehead) < len)
		copy = alq->aq_buflen - alq->aq_writehead;

	/* Copy message (or part thereof if wrap required) to the buffer. */
	bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
	alq->aq_writehead += copy;

	if (alq->aq_writehead >= alq->aq_buflen) {
		KASSERT((alq->aq_writehead == alq->aq_buflen),
		    ("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)",
		    __func__,
		    alq->aq_writehead,
		    alq->aq_buflen));
		alq->aq_writehead = 0;
	}

	if (copy != len) {
		/*
		 * Wrap the buffer by copying the remainder of our message
		 * to the start of the buffer and resetting aq_writehead.
		 */
		bcopy(((uint8_t *)data)+copy, alq->aq_entbuf, len - copy);
		alq->aq_writehead = len - copy;
	}

	KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
	    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));

	alq->aq_freebytes -= len;

	if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
		alq->aq_flags |= AQ_ACTIVE;
		activate = 1;
	}

	KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

unlock:
	ALQ_UNLOCK(alq);

	if (activate) {
		ALD_LOCK();
		ald_activate(alq);
		ALD_UNLOCK();
	}

	/* NB: We rely on wakeup_one waking threads in a FIFO manner. */
	if (waitchan != NULL)
		wakeup_one(waitchan);

	return (ret);
}

int
alq_write(struct alq *alq, void *data, int flags)
{
	/* Should only be called in fixed length message (legacy) mode. */
	KASSERT((alq->aq_flags & AQ_LEGACY),
	    ("%s: fixed length write on variable length queue", __func__));
	return (alq_writen(alq, data, alq->aq_entlen, flags));
}

/*
 * Retrieve a pointer for the caller to write directly into the queue's
 * buffer, avoiding an extra bcopy.
 */
struct ale *
alq_getn(struct alq *alq, int len, int flags)
{
	int contigbytes;
	void *waitchan;

	KASSERT((len > 0 && len <= alq->aq_buflen),
	    ("%s: len <= 0 || len > alq->aq_buflen", __func__));

	waitchan = NULL;

	ALQ_LOCK(alq);

	/*
	 * Determine the number of free contiguous bytes.
	 * We ensure elsewhere that if aq_writehead == aq_writetail because
	 * the buffer is empty, they will both be set to 0 and therefore
	 * aq_freebytes == aq_buflen and is fully contiguous.
	 * If they are equal and the buffer is not empty, aq_freebytes will
	 * be 0 indicating the buffer is full.
	 */
	if (alq->aq_writehead <= alq->aq_writetail)
		contigbytes = alq->aq_freebytes;
	else {
		contigbytes = alq->aq_buflen - alq->aq_writehead;

		if (contigbytes < len) {
			/*
			 * Insufficient space at end of buffer to handle a
			 * contiguous write. Wrap early if there's space at
			 * the beginning. This will leave a hole at the end
			 * of the buffer which we will have to skip over when
			 * flushing the buffer to disk.
			 */
			if (alq->aq_writetail >= len || flags & ALQ_WAITOK) {
				/* Keep track of # bytes left blank. */
				alq->aq_wrapearly = contigbytes;
				/* Do the wrap and adjust counters. */
				contigbytes = alq->aq_freebytes =
				    alq->aq_writetail;
				alq->aq_writehead = 0;
			}
		}
	}

	/*
	 * Return a NULL ALE if:
	 * - The message is larger than our underlying buffer.
	 * - The ALQ is being shutdown.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the user can't wait for space.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the alq is inactive due to prior
	 *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
	 */
	if (len > alq->aq_buflen ||
	    alq->aq_flags & AQ_SHUTDOWN ||
	    (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
	    HAS_PENDING_DATA(alq))) && contigbytes < len)) {
		ALQ_UNLOCK(alq);
		return (NULL);
	}

	/*
	 * If we want ordered writes and there is already at least one thread
	 * waiting for resources to become available, sleep until we're woken.
	 */
	if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_waiters++;
		msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgnord", 0);
		alq->aq_waiters--;
	}

	/*
	 * Either we have ALQ_WAITOK and contigbytes < len, in which case we
	 * enter the while loop and sleep until enough contiguous free bytes
	 * are available, or contigbytes >= len and we skip the loop entirely.
	 * If AQ_ORDERED is set, only 1 thread at a time will be in this loop.
	 * Otherwise, multiple threads may be sleeping here competing for ALQ
	 * resources.
	 */
	while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_flags |= AQ_WANTED;
		alq->aq_waiters++;
		if (waitchan)
			wakeup(waitchan);
		msleep_spin(alq, &alq->aq_mtx, "alqgnres", 0);
		alq->aq_waiters--;

		if (alq->aq_writehead <= alq->aq_writetail)
			contigbytes = alq->aq_freebytes;
		else
			contigbytes = alq->aq_buflen - alq->aq_writehead;

		/*
		 * If we're the first thread to wake after an AQ_WANTED wakeup
		 * but there isn't enough free space for us, we're going to loop
		 * and sleep again. If there are other threads waiting in this
		 * loop, schedule a wakeup so that they can see if the space
		 * they require is available.
		 */
		if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
		    contigbytes < len && !(alq->aq_flags & AQ_WANTED))
			waitchan = alq;
		else
			waitchan = NULL;
	}

	/*
	 * If there are waiters, we need to signal the waiting threads after we
	 * complete our work. The alq ptr is used as a wait channel for threads
	 * requiring resources to be freed up. In the AQ_ORDERED case, threads
	 * are not allowed to concurrently compete for resources in the above
	 * while loop, so we use a different wait channel in this case.
	 */
	if (alq->aq_waiters > 0) {
		if (alq->aq_flags & AQ_ORDERED)
			waitchan = &alq->aq_waiters;
		else
			waitchan = alq;
	} else
		waitchan = NULL;

	/* Bail if we're shutting down. */
	if (alq->aq_flags & AQ_SHUTDOWN) {
		ALQ_UNLOCK(alq);
		if (waitchan != NULL)
			wakeup_one(waitchan);
		return (NULL);
	}

	/*
	 * If we are here, we have a contiguous number of bytes >= len
	 * available in our buffer starting at aq_writehead.
	 */
	alq->aq_getpost.ae_data = alq->aq_entbuf + alq->aq_writehead;
	alq->aq_getpost.ae_bytesused = len;

	return (&alq->aq_getpost);
}

struct ale *
alq_get(struct alq *alq, int flags)
{
	/* Should only be called in fixed length message (legacy) mode. */
	KASSERT((alq->aq_flags & AQ_LEGACY),
	    ("%s: fixed length get on variable length queue", __func__));
	return (alq_getn(alq, alq->aq_entlen, flags));
}
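
/*
 * Usage sketch for the get/post interface: the pair lets a caller format a
 * record directly in the queue buffer, avoiding the extra bcopy done by
 * alq_writen().  Note that alq_getn() returns with the queue's spin lock
 * held and alq_post_flags() releases it, so the caller must not sleep in
 * between.  A hypothetical caller with a record "rec" of "reclen" bytes
 * might do:
 *
 *	struct ale *ale;
 *
 *	ale = alq_getn(q, reclen, ALQ_WAITOK);
 *	if (ale != NULL) {
 *		memcpy(ale->ae_data, rec, reclen);
 *		ale->ae_bytesused = reclen;
 *		alq_post_flags(q, ale, 0);
 *	}
 */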

void
alq_post_flags(struct alq *alq, struct ale *ale, int flags)
{
	int activate;
	void *waitchan;

	activate = 0;

	if (ale->ae_bytesused > 0) {
		if (!(alq->aq_flags & AQ_ACTIVE) &&
		    !(flags & ALQ_NOACTIVATE)) {
			alq->aq_flags |= AQ_ACTIVE;
			activate = 1;
		}

		alq->aq_writehead += ale->ae_bytesused;
		alq->aq_freebytes -= ale->ae_bytesused;

		/* Wrap aq_writehead if we filled to the end of the buffer. */
		if (alq->aq_writehead == alq->aq_buflen)
			alq->aq_writehead = 0;

		KASSERT((alq->aq_writehead >= 0 &&
		    alq->aq_writehead < alq->aq_buflen),
		    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen",
		    __func__));

		KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
	}

	/*
	 * If there are waiters, we need to signal the waiting threads after we
	 * complete our work. The alq ptr is used as a wait channel for threads
	 * requiring resources to be freed up. In the AQ_ORDERED case, threads
	 * are not allowed to concurrently compete for resources in the
	 * alq_getn() while loop, so we use a different wait channel in this case.
	 */
	if (alq->aq_waiters > 0) {
		if (alq->aq_flags & AQ_ORDERED)
			waitchan = &alq->aq_waiters;
		else
			waitchan = alq;
	} else
		waitchan = NULL;

	ALQ_UNLOCK(alq);

	if (activate) {
		ALD_LOCK();
		ald_activate(alq);
		ALD_UNLOCK();
	}

	/* NB: We rely on wakeup_one waking threads in a FIFO manner. */
	if (waitchan != NULL)
		wakeup_one(waitchan);
}

void
alq_flush(struct alq *alq)
{
	int needwakeup = 0;

	ALD_LOCK();
	ALQ_LOCK(alq);

	/*
	 * Pull the lever iff there is data to flush and we're
	 * not already in the middle of a flush operation.
	 */
	if (HAS_PENDING_DATA(alq) && !(alq->aq_flags & AQ_FLUSHING)) {
		if (alq->aq_flags & AQ_ACTIVE)
			ald_deactivate(alq);

		ALD_UNLOCK();
		needwakeup = alq_doio(alq);
	} else
		ALD_UNLOCK();

	ALQ_UNLOCK(alq);

	if (needwakeup)
		wakeup_one(alq);
}

/*
 * Flush remaining data, close the file and free all resources.
 */
void
alq_close(struct alq *alq)
{
	/* Only flush and destroy alq if not already shutting down. */
	if (ald_rem(alq) == 0)
		alq_destroy(alq);
}

static int
alq_load_handler(module_t mod, int what, void *arg)
{
	int ret;

	ret = 0;

	switch (what) {
	case MOD_LOAD:
	case MOD_SHUTDOWN:
		break;

	case MOD_QUIESCE:
		ALD_LOCK();
		/* Only allow unload if there are no open queues. */
		if (LIST_FIRST(&ald_queues) == NULL) {
			ald_shutingdown = 1;
			ALD_UNLOCK();
			EVENTHANDLER_DEREGISTER(shutdown_pre_sync,
			    alq_eventhandler_tag);
			ald_shutdown(NULL, 0);
			mtx_destroy(&ald_mtx);
		} else {
			ALD_UNLOCK();
			ret = EBUSY;
		}
		break;

	case MOD_UNLOAD:
		/* If MOD_QUIESCE failed we must fail here too. */
		if (ald_shutingdown == 0)
			ret = EBUSY;
		break;

	default:
		ret = EINVAL;
		break;
	}

	return (ret);
}

static moduledata_t alq_mod =
{
	"alq",
	alq_load_handler,
	NULL
};

DECLARE_MODULE(alq, alq_mod, SI_SUB_LAST, SI_ORDER_ANY);
MODULE_VERSION(alq, 1);