/*
 *	An async IO implementation for Linux
 *	Written by Benjamin LaHaise <bcrl@kvack.org>
 *
 *	Implements an efficient asynchronous io interface.
 *
 *	Copyright 2000, 2001, 2002 Red Hat, Inc.  All Rights Reserved.
 *
 *	See ../COPYING for licensing terms.
 */
#define pr_fmt(fmt) "%s: " fmt, __func__

#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/time.h>
#include <linux/aio_abi.h>
#include <linux/export.h>
#include <linux/syscalls.h>
#include <linux/backing-dev.h>
#include <linux/uio.h>

#include <linux/sched.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/mman.h>
#include <linux/mmu_context.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/timer.h>
#include <linux/aio.h>
#include <linux/highmem.h>
#include <linux/workqueue.h>
#include <linux/security.h>
#include <linux/eventfd.h>
#include <linux/blkdev.h>
#include <linux/compat.h>
#include <linux/migrate.h>
#include <linux/ramfs.h>
#include <linux/percpu-refcount.h>
#include <linux/mount.h>

#include <asm/kmap_types.h>
#include <asm/uaccess.h>

#include "internal.h"

#define AIO_RING_MAGIC			0xa10a10a1
#define AIO_RING_COMPAT_FEATURES	1
#define AIO_RING_INCOMPAT_FEATURES	0
struct aio_ring {
	unsigned	id;	/* kernel internal index number */
	unsigned	nr;	/* number of io_events */
	unsigned	head;	/* Written to by userland or under ring_lock
				 * mutex by aio_read_events_ring(). */
	unsigned	tail;

	unsigned	magic;
	unsigned	compat_features;
	unsigned	incompat_features;
	unsigned	header_length;	/* size of aio_ring */


	struct io_event		io_events[0];
}; /* 128 bytes + ring size */

#define AIO_RING_PAGES	8

struct kioctx_table {
	struct rcu_head	rcu;
	unsigned	nr;
	struct kioctx	*table[];
};

struct kioctx_cpu {
	unsigned	reqs_available;
};

struct kioctx {
	struct percpu_ref	users;
	atomic_t		dead;

	struct percpu_ref	reqs;

	unsigned long		user_id;

	struct __percpu kioctx_cpu *cpu;

	/*
	 * For percpu reqs_available, number of slots we move to/from global
	 * counter at a time:
	 */
	unsigned		req_batch;
	/*
	 * This is what userspace passed to io_setup(), it's not used for
	 * anything but counting against the global max_reqs quota.
	 *
	 * The real limit is nr_events - 1, which will be larger (see
	 * aio_setup_ring())
	 */
	unsigned		max_reqs;

	/* Size of ringbuffer, in units of struct io_event */
	unsigned		nr_events;

	unsigned long		mmap_base;
	unsigned long		mmap_size;

	struct page		**ring_pages;
	long			nr_pages;

	struct work_struct	free_work;

	/*
	 * signals when all in-flight requests are done
	 */
	struct completion	*requests_done;

	struct {
		/*
		 * This counts the number of available slots in the ringbuffer,
		 * so we avoid overflowing it: it's decremented (if positive)
		 * when allocating a kiocb and incremented when the resulting
		 * io_event is pulled off the ringbuffer.
		 *
		 * We batch accesses to it with a percpu version.
		 */
		atomic_t	reqs_available;
	} ____cacheline_aligned_in_smp;
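
	/*
	 * The groups below are each ____cacheline_aligned_in_smp so that the
	 * cancellation state (ctx_lock/active_reqs), the event readers
	 * (ring_lock/wait) and the event writers (tail/completion_lock) sit
	 * on separate cachelines and the completion path does not bounce
	 * cachelines with submitters and readers.
	 */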

	struct {
		spinlock_t	ctx_lock;
		struct list_head active_reqs;	/* used for cancellation */
	} ____cacheline_aligned_in_smp;

	struct {
		struct mutex	ring_lock;
		wait_queue_head_t wait;
	} ____cacheline_aligned_in_smp;

	struct {
		unsigned	tail;
		unsigned	completed_events;
		spinlock_t	completion_lock;
	} ____cacheline_aligned_in_smp;

	struct page		*internal_pages[AIO_RING_PAGES];
	struct file		*aio_ring_file;

	unsigned		id;
};

/*
 * We use ki_cancel == KIOCB_CANCELLED to indicate that a kiocb has been either
 * cancelled or completed (this makes a certain amount of sense because
 * successful cancellation - io_cancel() - does deliver the completion to
 * userspace).
 *
 * And since most things don't implement kiocb cancellation and we'd really like
 * kiocb completion to be lockless when possible, we use ki_cancel to
 * synchronize cancellation and completion - we only set it to KIOCB_CANCELLED
 * with xchg() or cmpxchg(), see batch_complete_aio() and kiocb_cancel().
 */
#define KIOCB_CANCELLED		((void *) (~0ULL))

struct aio_kiocb {
	struct kiocb		common;

	struct kioctx		*ki_ctx;
	kiocb_cancel_fn		*ki_cancel;

	struct iocb __user	*ki_user_iocb;	/* user's aiocb */
	__u64			ki_user_data;	/* user's data for completion */

	struct list_head	ki_list;	/* the aio core uses this
						 * for cancellation */

	/*
	 * If the aio_resfd field of the userspace iocb is not zero,
	 * this is the underlying eventfd context to deliver events to.
	 */
	struct eventfd_ctx	*ki_eventfd;
};

/*------ sysctl variables----*/
static DEFINE_SPINLOCK(aio_nr_lock);
unsigned long aio_nr;		/* current system wide number of aio requests */
unsigned long aio_max_nr = 0x10000; /* system wide maximum number of aio requests */
/*----end sysctl variables---*/

static struct kmem_cache	*kiocb_cachep;
static struct kmem_cache	*kioctx_cachep;

static struct vfsmount *aio_mnt;

static const struct file_operations aio_ring_fops;
static const struct address_space_operations aio_ctx_aops;

static struct file *aio_private_file(struct kioctx *ctx, loff_t nr_pages)
{
	struct qstr this = QSTR_INIT("[aio]", 5);
	struct file *file;
	struct path path;
	struct inode *inode = alloc_anon_inode(aio_mnt->mnt_sb);
	if (IS_ERR(inode))
		return ERR_CAST(inode);

	inode->i_mapping->a_ops = &aio_ctx_aops;
	inode->i_mapping->private_data = ctx;
	inode->i_size = PAGE_SIZE * nr_pages;

	path.dentry = d_alloc_pseudo(aio_mnt->mnt_sb, &this);
	if (!path.dentry) {
		iput(inode);
		return ERR_PTR(-ENOMEM);
	}
	path.mnt = mntget(aio_mnt);

	d_instantiate(path.dentry, inode);
	file = alloc_file(&path, FMODE_READ | FMODE_WRITE, &aio_ring_fops);
	if (IS_ERR(file)) {
		path_put(&path);
		return file;
	}

	file->f_flags = O_RDWR;
	return file;
}

static struct dentry *aio_mount(struct file_system_type *fs_type,
				int flags, const char *dev_name, void *data)
{
	static const struct dentry_operations ops = {
		.d_dname	= simple_dname,
	};
	return mount_pseudo(fs_type, "aio:", NULL, &ops, AIO_RING_MAGIC);
}
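
/*
 * The "aio" pseudo filesystem (mounted in aio_setup() below) exists only to
 * supply anonymous inodes for the ring-buffer files created by
 * aio_private_file(); thanks to the "[aio]" dentry name, the ring mapping
 * shows up as something like "/[aio]" in /proc/<pid>/maps.
 */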

/* aio_setup
 *	Creates the slab caches used by the aio routines, panic on
 *	failure as this is done early during the boot sequence.
 */
static int __init aio_setup(void)
{
	static struct file_system_type aio_fs = {
		.name		= "aio",
		.mount		= aio_mount,
		.kill_sb	= kill_anon_super,
	};
	aio_mnt = kern_mount(&aio_fs);
	if (IS_ERR(aio_mnt))
		panic("Failed to create aio fs mount.");

	kiocb_cachep = KMEM_CACHE(aio_kiocb, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
	kioctx_cachep = KMEM_CACHE(kioctx, SLAB_HWCACHE_ALIGN|SLAB_PANIC);

	pr_debug("sizeof(struct page) = %zu\n", sizeof(struct page));

	return 0;
}
__initcall(aio_setup);

static void put_aio_ring_file(struct kioctx *ctx)
{
	struct file *aio_ring_file = ctx->aio_ring_file;
	if (aio_ring_file) {
		truncate_setsize(aio_ring_file->f_inode, 0);

		/* Prevent further access to the kioctx from migratepages */
		spin_lock(&aio_ring_file->f_inode->i_mapping->private_lock);
		aio_ring_file->f_inode->i_mapping->private_data = NULL;
		ctx->aio_ring_file = NULL;
		spin_unlock(&aio_ring_file->f_inode->i_mapping->private_lock);

		fput(aio_ring_file);
	}
}

static void aio_free_ring(struct kioctx *ctx)
{
	int i;

	/* Disconnect the kioctx from the ring file.  This prevents future
	 * accesses to the kioctx from page migration.
	 */
	put_aio_ring_file(ctx);

	for (i = 0; i < ctx->nr_pages; i++) {
		struct page *page;
		pr_debug("pid(%d) [%d] page->count=%d\n", current->pid, i,
				page_count(ctx->ring_pages[i]));
		page = ctx->ring_pages[i];
		if (!page)
			continue;
		ctx->ring_pages[i] = NULL;
		put_page(page);
	}

	if (ctx->ring_pages && ctx->ring_pages != ctx->internal_pages) {
		kfree(ctx->ring_pages);
		ctx->ring_pages = NULL;
	}
}

static int aio_ring_mmap(struct file *file, struct vm_area_struct *vma)
{
	vma->vm_flags |= VM_DONTEXPAND;
	vma->vm_ops = &generic_file_vm_ops;
	return 0;
}

static int aio_ring_remap(struct file *file, struct vm_area_struct *vma)
{
	struct mm_struct *mm = vma->vm_mm;
	struct kioctx_table *table;
	int i, res = -EINVAL;

	spin_lock(&mm->ioctx_lock);
	rcu_read_lock();
	table = rcu_dereference(mm->ioctx_table);
	for (i = 0; i < table->nr; i++) {
		struct kioctx *ctx;

		ctx = table->table[i];
		if (ctx && ctx->aio_ring_file == file) {
			if (!atomic_read(&ctx->dead)) {
				ctx->user_id = ctx->mmap_base = vma->vm_start;
				res = 0;
			}
			break;
		}
	}

	rcu_read_unlock();
	spin_unlock(&mm->ioctx_lock);
	return res;
}

static const struct file_operations aio_ring_fops = {
	.mmap = aio_ring_mmap,
	.mremap = aio_ring_remap,
};
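
/*
 * The aio ring pages are ordinary pages of the per-context "[aio]" file,
 * mapped into the task's address space, so the mm may want to migrate them
 * (compaction, memory hotplug and friends).  The migratepage handler below
 * keeps ctx->ring_pages[] and the user-visible mapping consistent with such
 * a move while completions may still be running.
 */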

#if IS_ENABLED(CONFIG_MIGRATION)
static int aio_migratepage(struct address_space *mapping, struct page *new,
			struct page *old, enum migrate_mode mode)
{
	struct kioctx *ctx;
	unsigned long flags;
	pgoff_t idx;
	int rc;

	rc = 0;

	/* mapping->private_lock here protects against the kioctx teardown. */
	spin_lock(&mapping->private_lock);
	ctx = mapping->private_data;
	if (!ctx) {
		rc = -EINVAL;
		goto out;
	}

	/* Take the ring_lock mutex.  This prevents aio_read_events() from
	 * writing to the ring's head, and prevents page migration from
	 * mucking with a partially initialized kioctx.
	 */
	if (!mutex_trylock(&ctx->ring_lock)) {
		rc = -EAGAIN;
		goto out;
	}

	idx = old->index;
	if (idx < (pgoff_t)ctx->nr_pages) {
		/* Make sure the old page hasn't already been changed */
		if (ctx->ring_pages[idx] != old)
			rc = -EAGAIN;
	} else
		rc = -EINVAL;

	if (rc != 0)
		goto out_unlock;

	/* Writeback must be complete */
	BUG_ON(PageWriteback(old));
	get_page(new);

	rc = migrate_page_move_mapping(mapping, new, old, NULL, mode, 1);
	if (rc != MIGRATEPAGE_SUCCESS) {
		put_page(new);
		goto out_unlock;
	}

	/* Take completion_lock to prevent other writes to the ring buffer
	 * while the old page is copied to the new.  This prevents new
	 * events from being lost.
	 */
	spin_lock_irqsave(&ctx->completion_lock, flags);
	migrate_page_copy(new, old);
	BUG_ON(ctx->ring_pages[idx] != old);
	ctx->ring_pages[idx] = new;
	spin_unlock_irqrestore(&ctx->completion_lock, flags);

	/* The old page is no longer accessible. */
	put_page(old);

out_unlock:
	mutex_unlock(&ctx->ring_lock);
out:
	spin_unlock(&mapping->private_lock);
	return rc;
}
#endif

static const struct address_space_operations aio_ctx_aops = {
	.set_page_dirty = __set_page_dirty_no_writeback,
#if IS_ENABLED(CONFIG_MIGRATION)
	.migratepage	= aio_migratepage,
#endif
};

static int aio_setup_ring(struct kioctx *ctx)
{
	struct aio_ring *ring;
	unsigned nr_events = ctx->max_reqs;
	struct mm_struct *mm = current->mm;
	unsigned long size, unused;
	int nr_pages;
	int i;
	struct file *file;

	/* Compensate for the ring buffer's head/tail overlap entry */
	nr_events += 2;	/* 1 is required, 2 for good luck */

	size = sizeof(struct aio_ring);
	size += sizeof(struct io_event) * nr_events;

	nr_pages = PFN_UP(size);
	if (nr_pages < 0)
		return -EINVAL;

	file = aio_private_file(ctx, nr_pages);
	if (IS_ERR(file)) {
		ctx->aio_ring_file = NULL;
		return -ENOMEM;
	}

	ctx->aio_ring_file = file;
	nr_events = (PAGE_SIZE * nr_pages - sizeof(struct aio_ring))
			/ sizeof(struct io_event);

	ctx->ring_pages = ctx->internal_pages;
	if (nr_pages > AIO_RING_PAGES) {
		ctx->ring_pages = kcalloc(nr_pages, sizeof(struct page *),
					  GFP_KERNEL);
		if (!ctx->ring_pages) {
			put_aio_ring_file(ctx);
			return -ENOMEM;
		}
	}

	for (i = 0; i < nr_pages; i++) {
		struct page *page;
		page = find_or_create_page(file->f_inode->i_mapping,
					   i, GFP_HIGHUSER | __GFP_ZERO);
		if (!page)
			break;
		pr_debug("pid(%d) page[%d]->count=%d\n",
			 current->pid, i, page_count(page));
		SetPageUptodate(page);
		unlock_page(page);

		ctx->ring_pages[i] = page;
	}
	ctx->nr_pages = i;

	if (unlikely(i != nr_pages)) {
		aio_free_ring(ctx);
		return -ENOMEM;
	}

	ctx->mmap_size = nr_pages * PAGE_SIZE;
	pr_debug("attempting mmap of %lu bytes\n", ctx->mmap_size);

	down_write(&mm->mmap_sem);
	ctx->mmap_base = do_mmap_pgoff(ctx->aio_ring_file, 0, ctx->mmap_size,
				       PROT_READ | PROT_WRITE,
				       MAP_SHARED, 0, &unused);
	up_write(&mm->mmap_sem);
	if (IS_ERR((void *)ctx->mmap_base)) {
		ctx->mmap_size = 0;
		aio_free_ring(ctx);
		return -ENOMEM;
	}

	pr_debug("mmap address: 0x%08lx\n", ctx->mmap_base);

	ctx->user_id = ctx->mmap_base;
	ctx->nr_events = nr_events; /* trusted copy */

	ring = kmap_atomic(ctx->ring_pages[0]);
	ring->nr = nr_events;	/* user copy */
	ring->id = ~0U;
	ring->head = ring->tail = 0;
	ring->magic = AIO_RING_MAGIC;
	ring->compat_features = AIO_RING_COMPAT_FEATURES;
	ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
	ring->header_length = sizeof(struct aio_ring);
	kunmap_atomic(ring);
	flush_dcache_page(ctx->ring_pages[0]);

	return 0;
}

#define AIO_EVENTS_PER_PAGE	(PAGE_SIZE / sizeof(struct io_event))
#define AIO_EVENTS_FIRST_PAGE	((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event))
#define AIO_EVENTS_OFFSET	(AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE)
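
/*
 * A worked example of what these macros describe, assuming 4 KiB pages and
 * the 32-byte struct io_event from <linux/aio_abi.h>: every ring page holds
 * AIO_EVENTS_PER_PAGE = 128 events, except that the first page loses
 * sizeof(struct aio_ring) bytes to the header and so holds only
 * AIO_EVENTS_FIRST_PAGE events.  AIO_EVENTS_OFFSET is the difference; adding
 * it to an event index before dividing by AIO_EVENTS_PER_PAGE gives the
 * (page, offset) pair used by aio_complete() and aio_read_events_ring().
 */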

void kiocb_set_cancel_fn(struct kiocb *iocb, kiocb_cancel_fn *cancel)
{
	struct aio_kiocb *req = container_of(iocb, struct aio_kiocb, common);
	struct kioctx *ctx = req->ki_ctx;
	unsigned long flags;

	spin_lock_irqsave(&ctx->ctx_lock, flags);

	if (!req->ki_list.next)
		list_add(&req->ki_list, &ctx->active_reqs);

	req->ki_cancel = cancel;

	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
}
EXPORT_SYMBOL(kiocb_set_cancel_fn);

static int kiocb_cancel(struct aio_kiocb *kiocb)
{
	kiocb_cancel_fn *old, *cancel;

	/*
	 * Don't want to set kiocb->ki_cancel = KIOCB_CANCELLED unless it
	 * actually has a cancel function, hence the cmpxchg()
	 */

	cancel = ACCESS_ONCE(kiocb->ki_cancel);
	do {
		if (!cancel || cancel == KIOCB_CANCELLED)
			return -EINVAL;

		old = cancel;
		cancel = cmpxchg(&kiocb->ki_cancel, old, KIOCB_CANCELLED);
	} while (cancel != old);

	return cancel(&kiocb->common);
}

static void free_ioctx(struct work_struct *work)
{
	struct kioctx *ctx = container_of(work, struct kioctx, free_work);

	pr_debug("freeing %p\n", ctx);

	aio_free_ring(ctx);
	free_percpu(ctx->cpu);
	percpu_ref_exit(&ctx->reqs);
	percpu_ref_exit(&ctx->users);
	kmem_cache_free(kioctx_cachep, ctx);
}

static void free_ioctx_reqs(struct percpu_ref *ref)
{
	struct kioctx *ctx = container_of(ref, struct kioctx, reqs);

	/* At this point we know that there are no in-flight requests */
	if (ctx->requests_done)
		complete(ctx->requests_done);

	INIT_WORK(&ctx->free_work, free_ioctx);
	schedule_work(&ctx->free_work);
}
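
/*
 * Teardown ordering: kill_ioctx()/exit_aio() kill ctx->users, which runs
 * free_ioctx_users() below to cancel anything still on ->active_reqs and then
 * kill ctx->reqs; when the last in-flight request drops its ->reqs reference,
 * free_ioctx_reqs() signals ->requests_done and schedules free_ioctx() on a
 * workqueue, since that final put may happen from interrupt context in
 * aio_complete(), where the actual freeing cannot be done.
 */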

/*
 * When this function runs, the kioctx has been removed from the "hash table"
 * and ctx->users has dropped to 0, so we know no more kiocbs can be submitted -
 * now it's safe to cancel any that need to be.
 */
static void free_ioctx_users(struct percpu_ref *ref)
{
	struct kioctx *ctx = container_of(ref, struct kioctx, users);
	struct aio_kiocb *req;

	spin_lock_irq(&ctx->ctx_lock);

	while (!list_empty(&ctx->active_reqs)) {
		req = list_first_entry(&ctx->active_reqs,
				       struct aio_kiocb, ki_list);

		list_del_init(&req->ki_list);
		kiocb_cancel(req);
	}

	spin_unlock_irq(&ctx->ctx_lock);

	percpu_ref_kill(&ctx->reqs);
	percpu_ref_put(&ctx->reqs);
}

static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm)
{
	unsigned i, new_nr;
	struct kioctx_table *table, *old;
	struct aio_ring *ring;

	spin_lock(&mm->ioctx_lock);
	table = rcu_dereference_raw(mm->ioctx_table);

	while (1) {
		if (table)
			for (i = 0; i < table->nr; i++)
				if (!table->table[i]) {
					ctx->id = i;
					table->table[i] = ctx;
					spin_unlock(&mm->ioctx_lock);

					/* While kioctx setup is in progress,
					 * we are protected from page migration
					 * changing ring_pages by ->ring_lock.
					 */
					ring = kmap_atomic(ctx->ring_pages[0]);
					ring->id = ctx->id;
					kunmap_atomic(ring);
					return 0;
				}

		new_nr = (table ? table->nr : 1) * 4;
		spin_unlock(&mm->ioctx_lock);

		table = kzalloc(sizeof(*table) + sizeof(struct kioctx *) *
				new_nr, GFP_KERNEL);
		if (!table)
			return -ENOMEM;

		table->nr = new_nr;

		spin_lock(&mm->ioctx_lock);
		old = rcu_dereference_raw(mm->ioctx_table);

		if (!old) {
			rcu_assign_pointer(mm->ioctx_table, table);
		} else if (table->nr > old->nr) {
			memcpy(table->table, old->table,
			       old->nr * sizeof(struct kioctx *));

			rcu_assign_pointer(mm->ioctx_table, table);
			kfree_rcu(old, rcu);
		} else {
			kfree(table);
			table = old;
		}
	}
}
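
/*
 * The per-mm kioctx table above starts at four slots and is quadrupled (under
 * mm->ioctx_lock, with RCU protecting readers) each time it fills up.  The
 * slot index is published to userspace via ring->id, which lets
 * lookup_ioctx() index table->table[] directly instead of scanning it.
 */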

static void aio_nr_sub(unsigned nr)
{
	spin_lock(&aio_nr_lock);
	if (WARN_ON(aio_nr - nr > aio_nr))
		aio_nr = 0;
	else
		aio_nr -= nr;
	spin_unlock(&aio_nr_lock);
}

/* ioctx_alloc
 *	Allocates and initializes an ioctx.  Returns an ERR_PTR if it failed.
 */
static struct kioctx *ioctx_alloc(unsigned nr_events)
{
	struct mm_struct *mm = current->mm;
	struct kioctx *ctx;
	int err = -ENOMEM;

	/*
	 * We keep track of the number of available ringbuffer slots, to prevent
	 * overflow (reqs_available), and we also use percpu counters for this.
	 *
	 * So since up to half the slots might be on other CPUs' percpu counters
	 * and unavailable, double nr_events so userspace sees what they
	 * expected: additionally, we move req_batch slots to/from percpu
	 * counters at a time, so make sure that isn't 0:
	 */
	nr_events = max(nr_events, num_possible_cpus() * 4);
	nr_events *= 2;

	/* Prevent overflows */
	if (nr_events > (0x10000000U / sizeof(struct io_event))) {
		pr_debug("ENOMEM: nr_events too high\n");
		return ERR_PTR(-EINVAL);
	}

	if (!nr_events || (unsigned long)nr_events > (aio_max_nr * 2UL))
		return ERR_PTR(-EAGAIN);

	ctx = kmem_cache_zalloc(kioctx_cachep, GFP_KERNEL);
	if (!ctx)
		return ERR_PTR(-ENOMEM);

	ctx->max_reqs = nr_events;

	spin_lock_init(&ctx->ctx_lock);
	spin_lock_init(&ctx->completion_lock);
	mutex_init(&ctx->ring_lock);
	/* Protect against page migration throughout kioctx setup by keeping
	 * the ring_lock mutex held until setup is complete. */
	mutex_lock(&ctx->ring_lock);
	init_waitqueue_head(&ctx->wait);

	INIT_LIST_HEAD(&ctx->active_reqs);

	if (percpu_ref_init(&ctx->users, free_ioctx_users, 0, GFP_KERNEL))
		goto err;

	if (percpu_ref_init(&ctx->reqs, free_ioctx_reqs, 0, GFP_KERNEL))
		goto err;

	ctx->cpu = alloc_percpu(struct kioctx_cpu);
	if (!ctx->cpu)
		goto err;

	err = aio_setup_ring(ctx);
	if (err < 0)
		goto err;

	atomic_set(&ctx->reqs_available, ctx->nr_events - 1);
	ctx->req_batch = (ctx->nr_events - 1) / (num_possible_cpus() * 4);
	if (ctx->req_batch < 1)
		ctx->req_batch = 1;

	/* limit the number of system wide aios */
	spin_lock(&aio_nr_lock);
	if (aio_nr + nr_events > (aio_max_nr * 2UL) ||
	    aio_nr + nr_events < aio_nr) {
		spin_unlock(&aio_nr_lock);
		err = -EAGAIN;
		goto err_ctx;
	}
	aio_nr += ctx->max_reqs;
	spin_unlock(&aio_nr_lock);

	percpu_ref_get(&ctx->users);	/* io_setup() will drop this ref */
	percpu_ref_get(&ctx->reqs);	/* free_ioctx_users() will drop this */

	err = ioctx_add_table(ctx, mm);
	if (err)
		goto err_cleanup;

	/* Release the ring_lock mutex now that all setup is complete. */
	mutex_unlock(&ctx->ring_lock);

	pr_debug("allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
		 ctx, ctx->user_id, mm, ctx->nr_events);
	return ctx;

err_cleanup:
	aio_nr_sub(ctx->max_reqs);
err_ctx:
	atomic_set(&ctx->dead, 1);
	if (ctx->mmap_size)
		vm_munmap(ctx->mmap_base, ctx->mmap_size);
	aio_free_ring(ctx);
err:
	mutex_unlock(&ctx->ring_lock);
	free_percpu(ctx->cpu);
	percpu_ref_exit(&ctx->reqs);
	percpu_ref_exit(&ctx->users);
	kmem_cache_free(kioctx_cachep, ctx);
	pr_debug("error allocating ioctx %d\n", err);
	return ERR_PTR(err);
}

/* kill_ioctx
 *	Cancels all outstanding aio requests on an aio context.  Used
 *	when the processes owning a context have all exited to encourage
 *	the rapid destruction of the kioctx.
 */
static int kill_ioctx(struct mm_struct *mm, struct kioctx *ctx,
		      struct completion *requests_done)
{
	struct kioctx_table *table;

	spin_lock(&mm->ioctx_lock);
	if (atomic_xchg(&ctx->dead, 1)) {
		spin_unlock(&mm->ioctx_lock);
		return -EINVAL;
	}

	table = rcu_dereference_raw(mm->ioctx_table);
	WARN_ON(ctx != table->table[ctx->id]);
	table->table[ctx->id] = NULL;
	spin_unlock(&mm->ioctx_lock);

	/* percpu_ref_kill() will do the necessary call_rcu() */
	wake_up_all(&ctx->wait);

	/*
	 * It'd be more correct to do this in free_ioctx(), after all
	 * the outstanding kiocbs have finished - but by then io_destroy
	 * has already returned, so io_setup() could potentially return
	 * -EAGAIN with no ioctxs actually in use (as far as userspace
	 * could tell).
	 */
	aio_nr_sub(ctx->max_reqs);

	if (ctx->mmap_size)
		vm_munmap(ctx->mmap_base, ctx->mmap_size);

	ctx->requests_done = requests_done;
	percpu_ref_kill(&ctx->users);
	return 0;
}

/*
 * exit_aio: called when the last user of mm goes away.  At this point, there is
 * no way for any new requests to be submitted or any of the io_* syscalls to be
 * called on the context.
 *
 * There may be outstanding kiocbs, but free_ioctx() will explicitly wait on
 * them.
 */
void exit_aio(struct mm_struct *mm)
{
	struct kioctx_table *table = rcu_dereference_raw(mm->ioctx_table);
	int i;

	if (!table)
		return;

	for (i = 0; i < table->nr; ++i) {
		struct kioctx *ctx = table->table[i];
		struct completion requests_done =
			COMPLETION_INITIALIZER_ONSTACK(requests_done);

		if (!ctx)
			continue;
		/*
		 * We don't need to bother with munmap() here - exit_mmap(mm)
		 * is coming and it'll unmap everything.  And we simply can't,
		 * this is not necessarily our ->mm.
		 * Since kill_ioctx() uses non-zero ->mmap_size as indicator
		 * that it needs to unmap the area, just set it to 0.
		 */
		ctx->mmap_size = 0;
		kill_ioctx(mm, ctx, &requests_done);

		/* Wait until all IO for the context is done. */
		wait_for_completion(&requests_done);
	}

	RCU_INIT_POINTER(mm->ioctx_table, NULL);
	kfree(table);
}

static void put_reqs_available(struct kioctx *ctx, unsigned nr)
{
	struct kioctx_cpu *kcpu;
	unsigned long flags;

	local_irq_save(flags);
	kcpu = this_cpu_ptr(ctx->cpu);
	kcpu->reqs_available += nr;

	while (kcpu->reqs_available >= ctx->req_batch * 2) {
		kcpu->reqs_available -= ctx->req_batch;
		atomic_add(ctx->req_batch, &ctx->reqs_available);
	}

	local_irq_restore(flags);
}

static bool get_reqs_available(struct kioctx *ctx)
{
	struct kioctx_cpu *kcpu;
	bool ret = false;
	unsigned long flags;

	local_irq_save(flags);
	kcpu = this_cpu_ptr(ctx->cpu);
	if (!kcpu->reqs_available) {
		int old, avail = atomic_read(&ctx->reqs_available);

		do {
			if (avail < ctx->req_batch)
				goto out;

			old = avail;
			avail = atomic_cmpxchg(&ctx->reqs_available,
					       avail, avail - ctx->req_batch);
		} while (avail != old);

		kcpu->reqs_available += ctx->req_batch;
	}

	ret = true;
	kcpu->reqs_available--;
out:
	local_irq_restore(flags);
	return ret;
}
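
/*
 * A worked example of the batching above, assuming 4 possible CPUs and a ring
 * with nr_events = 1025, giving req_batch = 1024 / 16 = 64:
 * get_reqs_available() normally only decrements the local
 * kcpu->reqs_available, and pulls another batch of 64 from the global atomic
 * only when the local count hits zero.  put_reqs_available() gives whole
 * batches back once a CPU has accumulated two batches' worth, so the shared
 * counter is touched roughly once per 64 requests rather than once per
 * request.
 */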

/* refill_reqs_available
 *	Updates the reqs_available reference counts used for tracking the
 *	number of free slots in the completion ring.  This can be called
 *	from aio_complete() (to optimistically update reqs_available) or
 *	from aio_get_req() (when we have run out of events).  It must be
 *	called holding ctx->completion_lock.
 */
static void refill_reqs_available(struct kioctx *ctx, unsigned head,
				  unsigned tail)
{
	unsigned events_in_ring, completed;

	/* Clamp head since userland can write to it. */
	head %= ctx->nr_events;
	if (head <= tail)
		events_in_ring = tail - head;
	else
		events_in_ring = ctx->nr_events - (head - tail);

	completed = ctx->completed_events;
	if (events_in_ring < completed)
		completed -= events_in_ring;
	else
		completed = 0;

	if (!completed)
		return;

	ctx->completed_events -= completed;
	put_reqs_available(ctx, completed);
}

/* user_refill_reqs_available
 *	Called to refill reqs_available when aio_get_req() encounters an
 *	out-of-space condition in the completion ring.
 */
static void user_refill_reqs_available(struct kioctx *ctx)
{
	spin_lock_irq(&ctx->completion_lock);
	if (ctx->completed_events) {
		struct aio_ring *ring;
		unsigned head;

		/* Access of ring->head may race with aio_read_events_ring()
		 * here, but that's okay: whether we read the old or the new
		 * version, either will be valid.  The important part is that
		 * head cannot pass tail since we prevent aio_complete() from
		 * updating tail by holding ctx->completion_lock.  Even if
		 * head is invalid, the check against ctx->completed_events
		 * below will make sure we do the safe/right thing.
		 */
		ring = kmap_atomic(ctx->ring_pages[0]);
		head = ring->head;
		kunmap_atomic(ring);

		refill_reqs_available(ctx, head, ctx->tail);
	}

	spin_unlock_irq(&ctx->completion_lock);
}

/* aio_get_req
 *	Allocate a slot for an aio request.
 *	Returns NULL if no requests are free.
 */
static inline struct aio_kiocb *aio_get_req(struct kioctx *ctx)
{
	struct aio_kiocb *req;

	if (!get_reqs_available(ctx)) {
		user_refill_reqs_available(ctx);
		if (!get_reqs_available(ctx))
			return NULL;
	}

	req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
	if (unlikely(!req))
		goto out_put;

	percpu_ref_get(&ctx->reqs);

	req->ki_ctx = ctx;
	return req;
out_put:
	put_reqs_available(ctx, 1);
	return NULL;
}

static void kiocb_free(struct aio_kiocb *req)
{
	if (req->common.ki_filp)
		fput(req->common.ki_filp);
	if (req->ki_eventfd != NULL)
		eventfd_ctx_put(req->ki_eventfd);
	kmem_cache_free(kiocb_cachep, req);
}
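
/*
 * The ctx_id userspace passes to the io_* syscalls is the address of the
 * mapped ring (ctx->user_id), and the id read back from ring->id below comes
 * from a page userspace can scribble on.  That id is therefore only a hint
 * into the kioctx table; the lookup is validated by comparing ctx->user_id
 * against the ctx_id that was passed in before taking a reference.
 */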

static struct kioctx *lookup_ioctx(unsigned long ctx_id)
{
	struct aio_ring __user *ring = (void __user *)ctx_id;
	struct mm_struct *mm = current->mm;
	struct kioctx *ctx, *ret = NULL;
	struct kioctx_table *table;
	unsigned id;

	if (get_user(id, &ring->id))
		return NULL;

	rcu_read_lock();
	table = rcu_dereference(mm->ioctx_table);

	if (!table || id >= table->nr)
		goto out;

	ctx = table->table[id];
	if (ctx && ctx->user_id == ctx_id) {
		percpu_ref_get(&ctx->users);
		ret = ctx;
	}
out:
	rcu_read_unlock();
	return ret;
}

/* aio_complete
 *	Called when the io request on the given iocb is complete.
 */
static void aio_complete(struct kiocb *kiocb, long res, long res2)
{
	struct aio_kiocb *iocb = container_of(kiocb, struct aio_kiocb, common);
	struct kioctx	*ctx = iocb->ki_ctx;
	struct aio_ring	*ring;
	struct io_event	*ev_page, *event;
	unsigned tail, pos, head;
	unsigned long	flags;

	/*
	 * Special case handling for sync iocbs:
	 *  - events go directly into the iocb for fast handling
	 *  - the sync task with the iocb in its stack holds the single iocb
	 *    ref, no other paths have a way to get another ref
	 *  - the sync task helpfully left a reference to itself in the iocb
	 */
	BUG_ON(is_sync_kiocb(kiocb));

	if (iocb->ki_list.next) {
		unsigned long flags;

		spin_lock_irqsave(&ctx->ctx_lock, flags);
		list_del(&iocb->ki_list);
		spin_unlock_irqrestore(&ctx->ctx_lock, flags);
	}

	/*
	 * Add a completion event to the ring buffer. Must be done holding
	 * ctx->completion_lock to prevent other code from messing with the tail
	 * pointer since we might be called from irq context.
	 */
	spin_lock_irqsave(&ctx->completion_lock, flags);

	tail = ctx->tail;
	pos = tail + AIO_EVENTS_OFFSET;

	if (++tail >= ctx->nr_events)
		tail = 0;

	ev_page = kmap_atomic(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
	event = ev_page + pos % AIO_EVENTS_PER_PAGE;

	event->obj = (u64)(unsigned long)iocb->ki_user_iocb;
	event->data = iocb->ki_user_data;
	event->res = res;
	event->res2 = res2;

	kunmap_atomic(ev_page);
	flush_dcache_page(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);

	pr_debug("%p[%u]: %p: %p %Lx %lx %lx\n",
		 ctx, tail, iocb, iocb->ki_user_iocb, iocb->ki_user_data,
		 res, res2);

	/* after flagging the request as done, we
	 * must never even look at it again
	 */
	smp_wmb();	/* make event visible before updating tail */

	ctx->tail = tail;

	ring = kmap_atomic(ctx->ring_pages[0]);
	head = ring->head;
	ring->tail = tail;
	kunmap_atomic(ring);
	flush_dcache_page(ctx->ring_pages[0]);

	ctx->completed_events++;
	if (ctx->completed_events > 1)
		refill_reqs_available(ctx, head, tail);
	spin_unlock_irqrestore(&ctx->completion_lock, flags);

	pr_debug("added to ring %p at [%u]\n", iocb, tail);

	/*
	 * Check if the user asked us to deliver the result through an
	 * eventfd. The eventfd_signal() function is safe to be called
	 * from IRQ context.
	 */
	if (iocb->ki_eventfd != NULL)
		eventfd_signal(iocb->ki_eventfd, 1);

	/* everything turned out well, dispose of the aiocb. */
	kiocb_free(iocb);

	/*
	 * We have to order our ring_info tail store above and test
	 * of the wait list below outside the wait lock.  This is
	 * like in wake_up_bit() where clearing a bit has to be
	 * ordered with the unlocked test.
	 */
	smp_mb();

	if (waitqueue_active(&ctx->wait))
		wake_up(&ctx->wait);

	percpu_ref_put(&ctx->reqs);
}
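
/*
 * Events are consumed either by aio_read_events_ring() below or directly by
 * userspace walking the mapped ring and advancing ring->head itself, which is
 * why head is always treated as untrusted and clamped with "% ctx->nr_events"
 * before use, while ctx->tail in kernel memory remains the authoritative
 * producer index.
 */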

/* aio_read_events_ring
 *	Pull an event off of the ioctx's event ring.  Returns the number of
 *	events fetched
 */
static long aio_read_events_ring(struct kioctx *ctx,
				 struct io_event __user *event, long nr)
{
	struct aio_ring *ring;
	unsigned head, tail, pos;
	long ret = 0;
	int copy_ret;

	/*
	 * The mutex can block and wake us up and that will cause
	 * wait_event_interruptible_hrtimeout() to schedule without sleeping
	 * and repeat. This should be rare enough that it doesn't cause
	 * performance issues. See the comment in read_events() for more detail.
	 */
	sched_annotate_sleep();
	mutex_lock(&ctx->ring_lock);

	/* Access to ->ring_pages here is protected by ctx->ring_lock. */
	ring = kmap_atomic(ctx->ring_pages[0]);
	head = ring->head;
	tail = ring->tail;
	kunmap_atomic(ring);

	/*
	 * Ensure that once we've read the current tail pointer, that
	 * we also see the events that were stored up to the tail.
	 */
	smp_rmb();

	pr_debug("h%u t%u m%u\n", head, tail, ctx->nr_events);

	if (head == tail)
		goto out;

	head %= ctx->nr_events;
	tail %= ctx->nr_events;

	while (ret < nr) {
		long avail;
		struct io_event *ev;
		struct page *page;

		avail = (head <= tail ?  tail : ctx->nr_events) - head;
		if (head == tail)
			break;

		avail = min(avail, nr - ret);
		avail = min_t(long, avail, AIO_EVENTS_PER_PAGE -
			    ((head + AIO_EVENTS_OFFSET) % AIO_EVENTS_PER_PAGE));

		pos = head + AIO_EVENTS_OFFSET;
		page = ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE];
		pos %= AIO_EVENTS_PER_PAGE;

		ev = kmap(page);
		copy_ret = copy_to_user(event + ret, ev + pos,
					sizeof(*ev) * avail);
		kunmap(page);

		if (unlikely(copy_ret)) {
			ret = -EFAULT;
			goto out;
		}

		ret += avail;
		head += avail;
		head %= ctx->nr_events;
	}

	ring = kmap_atomic(ctx->ring_pages[0]);
	ring->head = head;
	kunmap_atomic(ring);
	flush_dcache_page(ctx->ring_pages[0]);

	pr_debug("%li  h%u t%u\n", ret, head, tail);
out:
	mutex_unlock(&ctx->ring_lock);

	return ret;
}

static bool aio_read_events(struct kioctx *ctx, long min_nr, long nr,
			    struct io_event __user *event, long *i)
{
	long ret = aio_read_events_ring(ctx, event + *i, nr - *i);

	if (ret > 0)
		*i += ret;

	if (unlikely(atomic_read(&ctx->dead)))
		ret = -EINVAL;

	if (!*i)
		*i = ret;

	return ret < 0 || *i >= min_nr;
}

static long read_events(struct kioctx *ctx, long min_nr, long nr,
			struct io_event __user *event,
			struct timespec __user *timeout)
{
	ktime_t until = { .tv64 = KTIME_MAX };
	long ret = 0;

	if (timeout) {
		struct timespec	ts;

		if (unlikely(copy_from_user(&ts, timeout, sizeof(ts))))
			return -EFAULT;

		until = timespec_to_ktime(ts);
	}

	/*
	 * Note that aio_read_events() is being called as the conditional - i.e.
	 * we're calling it after prepare_to_wait() has set task state to
	 * TASK_INTERRUPTIBLE.
	 *
	 * But aio_read_events() can block, and if it blocks it's going to flip
	 * the task state back to TASK_RUNNING.
	 *
	 * This should be ok, provided it doesn't flip the state back to
	 * TASK_RUNNING and return 0 too much - that causes us to spin. That
	 * will only happen if the mutex_lock() call blocks, and we then find
	 * the ringbuffer empty. So in practice we should be ok, but it's
	 * something to be aware of when touching this code.
	 */
	if (until.tv64 == 0)
		aio_read_events(ctx, min_nr, nr, event, &ret);
	else
		wait_event_interruptible_hrtimeout(ctx->wait,
				aio_read_events(ctx, min_nr, nr, event, &ret),
				until);

	if (!ret && signal_pending(current))
		ret = -EINTR;

	return ret;
}
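
/*
 * For reference, the syscalls below are normally driven from userspace
 * roughly like this (a minimal sketch using raw syscall(2) wrappers instead
 * of libaio, with error handling omitted and fd an already-open descriptor):
 *
 *	aio_context_t ctx = 0;
 *	struct iocb cb = { 0 }, *cbs[1] = { &cb };
 *	struct io_event ev;
 *	char buf[4096];
 *
 *	syscall(__NR_io_setup, 128, &ctx);
 *
 *	cb.aio_lio_opcode = IOCB_CMD_PREAD;
 *	cb.aio_fildes     = fd;
 *	cb.aio_buf        = (__u64)(unsigned long)buf;
 *	cb.aio_nbytes     = sizeof(buf);
 *	cb.aio_offset     = 0;
 *
 *	syscall(__NR_io_submit, ctx, 1, cbs);
 *	syscall(__NR_io_getevents, ctx, 1, 1, &ev, NULL);
 *	syscall(__NR_io_destroy, ctx);
 *
 * On success ev.res holds the number of bytes read and ev.obj points back at
 * the submitted iocb.
 */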

/* sys_io_setup:
 *	Create an aio_context capable of receiving at least nr_events.
 *	ctxp must not point to an aio_context that already exists, and
 *	must be initialized to 0 prior to the call.  On successful
 *	creation of the aio_context, *ctxp is filled in with the resulting
 *	handle.  May fail with -EINVAL if *ctxp is not initialized, or
 *	if the specified nr_events exceeds internal limits.  May fail
 *	with -EAGAIN if the specified nr_events exceeds the user's limit
 *	of available events.  May fail with -ENOMEM if insufficient kernel
 *	resources are available.  May fail with -EFAULT if an invalid
 *	pointer is passed for ctxp.  Will fail with -ENOSYS if not
 *	implemented.
 */
SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
{
	struct kioctx *ioctx = NULL;
	unsigned long ctx;
	long ret;

	ret = get_user(ctx, ctxp);
	if (unlikely(ret))
		goto out;

	ret = -EINVAL;
	if (unlikely(ctx || nr_events == 0)) {
		pr_debug("EINVAL: ctx %lu nr_events %u\n",
		         ctx, nr_events);
		goto out;
	}

	ioctx = ioctx_alloc(nr_events);
	ret = PTR_ERR(ioctx);
	if (!IS_ERR(ioctx)) {
		ret = put_user(ioctx->user_id, ctxp);
		if (ret)
			kill_ioctx(current->mm, ioctx, NULL);
		percpu_ref_put(&ioctx->users);
	}

out:
	return ret;
}

/* sys_io_destroy:
 *	Destroy the aio_context specified.  May cancel any outstanding
 *	AIOs and block on completion.  Will fail with -ENOSYS if not
 *	implemented.  May fail with -EINVAL if the context pointed to
 *	is invalid.
 */
SYSCALL_DEFINE1(io_destroy, aio_context_t, ctx)
{
	struct kioctx *ioctx = lookup_ioctx(ctx);
	if (likely(NULL != ioctx)) {
		struct completion requests_done =
			COMPLETION_INITIALIZER_ONSTACK(requests_done);
		int ret;

		/* Pass requests_done to kill_ioctx() where it can be set
		 * in a thread-safe way. If we try to set it here then we
		 * have a race condition if two io_destroy() calls run
		 * simultaneously.
		 */
		ret = kill_ioctx(current->mm, ioctx, &requests_done);
		percpu_ref_put(&ioctx->users);

		/* Wait until all IO for the context is done. Otherwise the
		 * kernel keeps using user-space buffers even though the
		 * user thinks the context has been destroyed.
		 */
		if (!ret)
			wait_for_completion(&requests_done);

		return ret;
	}
	pr_debug("EINVAL: invalid context id\n");
	return -EINVAL;
}

typedef ssize_t (rw_iter_op)(struct kiocb *, struct iov_iter *);

static int aio_setup_vectored_rw(int rw, char __user *buf, size_t len,
				 struct iovec **iovec,
				 bool compat,
				 struct iov_iter *iter)
{
#ifdef CONFIG_COMPAT
	if (compat)
		return compat_import_iovec(rw,
				(struct compat_iovec __user *)buf,
				len, UIO_FASTIOV, iovec, iter);
#endif
	return import_iovec(rw, (struct iovec __user *)buf,
				len, UIO_FASTIOV, iovec, iter);
}
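
/*
 * aio_run_iocb() below dispatches on the iocb opcode: IOCB_CMD_PREAD/PWRITE
 * describe a single user buffer and IOCB_CMD_PREADV/PWRITEV a user iovec, but
 * both end up calling the file's ->read_iter/->write_iter through an
 * iov_iter, while IOCB_CMD_FSYNC/FDSYNC map to the optional ->aio_fsync hook.
 */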

/*
 * aio_run_iocb:
 *	Performs the initial checks and io submission.
 */
static ssize_t aio_run_iocb(struct kiocb *req, unsigned opcode,
			    char __user *buf, size_t len, bool compat)
{
	struct file *file = req->ki_filp;
	ssize_t ret;
	int rw;
	fmode_t mode;
	rw_iter_op *iter_op;
	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
	struct iov_iter iter;

	switch (opcode) {
	case IOCB_CMD_PREAD:
	case IOCB_CMD_PREADV:
		mode	= FMODE_READ;
		rw	= READ;
		iter_op	= file->f_op->read_iter;
		goto rw_common;

	case IOCB_CMD_PWRITE:
	case IOCB_CMD_PWRITEV:
		mode	= FMODE_WRITE;
		rw	= WRITE;
		iter_op	= file->f_op->write_iter;
		goto rw_common;
rw_common:
		if (unlikely(!(file->f_mode & mode)))
			return -EBADF;

		if (!iter_op)
			return -EINVAL;

		if (opcode == IOCB_CMD_PREADV || opcode == IOCB_CMD_PWRITEV)
			ret = aio_setup_vectored_rw(rw, buf, len,
						&iovec, compat, &iter);
		else {
			ret = import_single_range(rw, buf, len, iovec, &iter);
			iovec = NULL;
		}
		if (!ret)
			ret = rw_verify_area(rw, file, &req->ki_pos,
					     iov_iter_count(&iter));
		if (ret < 0) {
			kfree(iovec);
			return ret;
		}

		len = ret;

		if (rw == WRITE)
			file_start_write(file);

		ret = iter_op(req, &iter);

		if (rw == WRITE)
			file_end_write(file);
		kfree(iovec);
		break;

	case IOCB_CMD_FDSYNC:
		if (!file->f_op->aio_fsync)
			return -EINVAL;

		ret = file->f_op->aio_fsync(req, 1);
		break;

	case IOCB_CMD_FSYNC:
		if (!file->f_op->aio_fsync)
			return -EINVAL;

		ret = file->f_op->aio_fsync(req, 0);
		break;

	default:
		pr_debug("EINVAL: no operation provided\n");
		return -EINVAL;
	}

	if (ret != -EIOCBQUEUED) {
		/*
		 * There's no easy way to restart the syscall since other AIO's
		 * may already be running. Just fail this IO with EINTR.
		 */
		if (unlikely(ret == -ERESTARTSYS || ret == -ERESTARTNOINTR ||
			     ret == -ERESTARTNOHAND ||
			     ret == -ERESTART_RESTARTBLOCK))
			ret = -EINTR;
		aio_complete(req, ret, 0);
	}

	return 0;
}
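
/*
 * io_submit_one() also wires up the optional eventfd-based completion
 * notification: if userspace sets IOCB_FLAG_RESFD in iocb->aio_flags and puts
 * an eventfd(2) descriptor in iocb->aio_resfd, completion of that iocb
 * additionally signals the eventfd (see aio_complete()), which is what allows
 * aio completions to be multiplexed into a poll/epoll loop watching the
 * eventfd.
 */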

static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
			 struct iocb *iocb, bool compat)
{
	struct aio_kiocb *req;
	ssize_t ret;

	/* enforce forwards compatibility on users */
	if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2)) {
		pr_debug("EINVAL: reserve field set\n");
		return -EINVAL;
	}

	/* prevent overflows */
	if (unlikely(
	    (iocb->aio_buf != (unsigned long)iocb->aio_buf) ||
	    (iocb->aio_nbytes != (size_t)iocb->aio_nbytes) ||
	    ((ssize_t)iocb->aio_nbytes < 0)
	   )) {
		pr_debug("EINVAL: overflow check\n");
		return -EINVAL;
	}

	req = aio_get_req(ctx);
	if (unlikely(!req))
		return -EAGAIN;

	req->common.ki_filp = fget(iocb->aio_fildes);
	if (unlikely(!req->common.ki_filp)) {
		ret = -EBADF;
		goto out_put_req;
	}
	req->common.ki_pos = iocb->aio_offset;
	req->common.ki_complete = aio_complete;
	req->common.ki_flags = 0;

	if (iocb->aio_flags & IOCB_FLAG_RESFD) {
		/*
		 * If the IOCB_FLAG_RESFD flag of aio_flags is set, get an
		 * instance of the file* now. The file descriptor must be
		 * an eventfd() fd, and will be signaled for each completed
		 * event using the eventfd_signal() function.
		 */
		req->ki_eventfd = eventfd_ctx_fdget((int) iocb->aio_resfd);
		if (IS_ERR(req->ki_eventfd)) {
			ret = PTR_ERR(req->ki_eventfd);
			req->ki_eventfd = NULL;
			goto out_put_req;
		}

		req->common.ki_flags |= IOCB_EVENTFD;
	}

	ret = put_user(KIOCB_KEY, &user_iocb->aio_key);
	if (unlikely(ret)) {
		pr_debug("EFAULT: aio_key\n");
		goto out_put_req;
	}

	req->ki_user_iocb = user_iocb;
	req->ki_user_data = iocb->aio_data;

	ret = aio_run_iocb(&req->common, iocb->aio_lio_opcode,
			   (char __user *)(unsigned long)iocb->aio_buf,
			   iocb->aio_nbytes,
			   compat);
	if (ret)
		goto out_put_req;

	return 0;
out_put_req:
	put_reqs_available(ctx, 1);
	percpu_ref_put(&ctx->reqs);
	kiocb_free(req);
	return ret;
}

long do_io_submit(aio_context_t ctx_id, long nr,
		  struct iocb __user *__user *iocbpp, bool compat)
{
	struct kioctx *ctx;
	long ret = 0;
	int i = 0;
	struct blk_plug plug;

	if (unlikely(nr < 0))
		return -EINVAL;

	if (unlikely(nr > LONG_MAX/sizeof(*iocbpp)))
		nr = LONG_MAX/sizeof(*iocbpp);

	if (unlikely(!access_ok(VERIFY_READ, iocbpp, (nr*sizeof(*iocbpp)))))
		return -EFAULT;

	ctx = lookup_ioctx(ctx_id);
	if (unlikely(!ctx)) {
		pr_debug("EINVAL: invalid context id\n");
		return -EINVAL;
	}

	blk_start_plug(&plug);

	/*
	 * AKPM: should this return a partial result if some of the IOs were
	 * successfully submitted?
	 */
	for (i = 0; i < nr; i++) {
		struct iocb __user *user_iocb;
		struct iocb tmp;

		if (unlikely(__get_user(user_iocb, iocbpp + i))) {
			ret = -EFAULT;
			break;
		}

		if (unlikely(copy_from_user(&tmp, user_iocb, sizeof(tmp)))) {
			ret = -EFAULT;
			break;
		}

		ret = io_submit_one(ctx, user_iocb, &tmp, compat);
		if (ret)
			break;
	}
	blk_finish_plug(&plug);

	percpu_ref_put(&ctx->users);
	return i ? i : ret;
}

/* sys_io_submit:
 *	Queue the nr iocbs pointed to by iocbpp for processing.  Returns
 *	the number of iocbs queued.  May return -EINVAL if the aio_context
 *	specified by ctx_id is invalid, if nr is < 0, if the iocb at
 *	*iocbpp[0] is not properly initialized, or if the operation specified
 *	is invalid for the file descriptor in the iocb.  May fail with
 *	-EFAULT if any of the data structures point to invalid data.  May
 *	fail with -EBADF if the file descriptor specified in the first
 *	iocb is invalid.  May fail with -EAGAIN if insufficient resources
 *	are available to queue any iocbs.  Will return 0 if nr is 0.  Will
 *	fail with -ENOSYS if not implemented.
 */
SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
		struct iocb __user * __user *, iocbpp)
{
	return do_io_submit(ctx_id, nr, iocbpp, 0);
}

/* lookup_kiocb
 *	Finds a given iocb for cancellation.
 */
static struct aio_kiocb *
lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb, u32 key)
{
	struct aio_kiocb *kiocb;

	assert_spin_locked(&ctx->ctx_lock);

	if (key != KIOCB_KEY)
		return NULL;

	/* TODO: use a hash or array, this sucks. */
	list_for_each_entry(kiocb, &ctx->active_reqs, ki_list) {
		if (kiocb->ki_user_iocb == iocb)
			return kiocb;
	}
	return NULL;
}

/* sys_io_cancel:
 *	Attempts to cancel an iocb previously passed to io_submit.  If
 *	the operation is successfully cancelled, the resulting event is
 *	copied into the memory pointed to by result without being placed
 *	into the completion queue and 0 is returned.  May fail with
 *	-EFAULT if any of the data structures pointed to are invalid.
 *	May fail with -EINVAL if aio_context specified by ctx_id is
 *	invalid.  May fail with -EAGAIN if the iocb specified was not
 *	cancelled.  Will fail with -ENOSYS if not implemented.
 */
SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
		struct io_event __user *, result)
{
	struct kioctx *ctx;
	struct aio_kiocb *kiocb;
	u32 key;
	int ret;

	ret = get_user(key, &iocb->aio_key);
	if (unlikely(ret))
		return -EFAULT;

	ctx = lookup_ioctx(ctx_id);
	if (unlikely(!ctx))
		return -EINVAL;

	spin_lock_irq(&ctx->ctx_lock);

	kiocb = lookup_kiocb(ctx, iocb, key);
	if (kiocb)
		ret = kiocb_cancel(kiocb);
	else
		ret = -EINVAL;

	spin_unlock_irq(&ctx->ctx_lock);

	if (!ret) {
		/*
		 * The result argument is no longer used - the io_event is
		 * always delivered via the ring buffer. -EINPROGRESS indicates
		 * cancellation is in progress:
		 */
		ret = -EINPROGRESS;
	}

	percpu_ref_put(&ctx->users);

	return ret;
}

/* io_getevents:
 *	Attempts to read at least min_nr events and up to nr events from
 *	the completion queue for the aio_context specified by ctx_id. If
 *	it succeeds, the number of read events is returned. May fail with
 *	-EINVAL if ctx_id is invalid, if min_nr is out of range, if nr is
 *	out of range, if timeout is out of range.  May fail with -EFAULT
 *	if any of the memory specified is invalid.  May return 0 or
 *	< min_nr if the timeout specified by timeout has elapsed
 *	before sufficient events are available, where timeout == NULL
 *	specifies an infinite timeout. Note that the timeout pointed to by
 *	timeout is relative.  Will fail with -ENOSYS if not implemented.
 */
SYSCALL_DEFINE5(io_getevents, aio_context_t, ctx_id,
		long, min_nr,
		long, nr,
		struct io_event __user *, events,
		struct timespec __user *, timeout)
{
	struct kioctx *ioctx = lookup_ioctx(ctx_id);
	long ret = -EINVAL;

	if (likely(ioctx)) {
		if (likely(min_nr <= nr && min_nr >= 0))
			ret = read_events(ioctx, min_nr, nr, events, timeout);
		percpu_ref_put(&ioctx->users);
	}
	return ret;
}