1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Copyright (c) 2010 Red Hat, Inc. All Rights Reserved. 4 */ 5 6 #include "xfs_platform.h" 7 #include "xfs_fs.h" 8 #include "xfs_format.h" 9 #include "xfs_log_format.h" 10 #include "xfs_shared.h" 11 #include "xfs_trans_resv.h" 12 #include "xfs_mount.h" 13 #include "xfs_extent_busy.h" 14 #include "xfs_trans.h" 15 #include "xfs_trans_priv.h" 16 #include "xfs_log.h" 17 #include "xfs_log_priv.h" 18 #include "xfs_trace.h" 19 #include "xfs_discard.h" 20 21 /* 22 * Allocate a new ticket. Failing to get a new ticket makes it really hard to 23 * recover, so we don't allow failure here. Also, we allocate in a context that 24 * we don't want to be issuing transactions from, so we need to tell the 25 * allocation code this as well. 26 * 27 * We don't reserve any space for the ticket - we are going to steal whatever 28 * space we require from transactions as they commit. To ensure we reserve all 29 * the space required, we need to set the current reservation of the ticket to 30 * zero so that we know to steal the initial transaction overhead from the 31 * first transaction commit. 32 */ 33 static struct xlog_ticket * 34 xlog_cil_ticket_alloc( 35 struct xlog *log) 36 { 37 struct xlog_ticket *tic; 38 39 tic = xlog_ticket_alloc(log, 0, 1, 0); 40 41 /* 42 * set the current reservation to zero so we know to steal the basic 43 * transaction overhead reservation from the first transaction commit. 44 */ 45 tic->t_curr_res = 0; 46 tic->t_iclog_hdrs = 0; 47 return tic; 48 } 49 50 static inline void 51 xlog_cil_set_iclog_hdr_count(struct xfs_cil *cil) 52 { 53 struct xlog *log = cil->xc_log; 54 55 atomic_set(&cil->xc_iclog_hdrs, 56 (XLOG_CIL_BLOCKING_SPACE_LIMIT(log) / 57 (log->l_iclog_size - log->l_iclog_hsize))); 58 } 59 60 /* 61 * Check if the current log item was first committed in this sequence. 62 * We can't rely on just the log item being in the CIL, we have to check 63 * the recorded commit sequence number. 
64 * 65 * Note: for this to be used in a non-racy manner, it has to be called with 66 * CIL flushing locked out. As a result, it should only be used during the 67 * transaction commit process when deciding what to format into the item. 68 */ 69 static bool 70 xlog_item_in_current_chkpt( 71 struct xfs_cil *cil, 72 struct xfs_log_item *lip) 73 { 74 if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) 75 return false; 76 77 /* 78 * li_seq is written on the first commit of a log item to record the 79 * first checkpoint it is written to. Hence if it is different to the 80 * current sequence, we're in a new checkpoint. 81 */ 82 return lip->li_seq == READ_ONCE(cil->xc_current_sequence); 83 } 84 85 bool 86 xfs_log_item_in_current_chkpt( 87 struct xfs_log_item *lip) 88 { 89 return xlog_item_in_current_chkpt(lip->li_log->l_cilp, lip); 90 } 91 92 /* 93 * Unavoidable forward declaration - xlog_cil_push_work() calls 94 * xlog_cil_ctx_alloc() itself. 95 */ 96 static void xlog_cil_push_work(struct work_struct *work); 97 98 static struct xfs_cil_ctx * 99 xlog_cil_ctx_alloc(void) 100 { 101 struct xfs_cil_ctx *ctx; 102 103 ctx = kzalloc(sizeof(*ctx), GFP_KERNEL | __GFP_NOFAIL); 104 INIT_LIST_HEAD(&ctx->committing); 105 INIT_LIST_HEAD(&ctx->busy_extents.extent_list); 106 INIT_LIST_HEAD(&ctx->log_items); 107 INIT_LIST_HEAD(&ctx->lv_chain); 108 INIT_WORK(&ctx->push_work, xlog_cil_push_work); 109 return ctx; 110 } 111 112 /* 113 * Aggregate the CIL per cpu structures into global counts, lists, etc and 114 * clear the percpu state ready for the next context to use. This is called 115 * from the push code with the context lock held exclusively, hence nothing else 116 * will be accessing or modifying the per-cpu counters. 
117 */ 118 static void 119 xlog_cil_push_pcp_aggregate( 120 struct xfs_cil *cil, 121 struct xfs_cil_ctx *ctx) 122 { 123 struct xlog_cil_pcp *cilpcp; 124 int cpu; 125 126 for_each_cpu(cpu, &ctx->cil_pcpmask) { 127 cilpcp = per_cpu_ptr(cil->xc_pcp, cpu); 128 129 ctx->ticket->t_curr_res += cilpcp->space_reserved; 130 cilpcp->space_reserved = 0; 131 132 if (!list_empty(&cilpcp->busy_extents)) { 133 list_splice_init(&cilpcp->busy_extents, 134 &ctx->busy_extents.extent_list); 135 } 136 if (!list_empty(&cilpcp->log_items)) 137 list_splice_init(&cilpcp->log_items, &ctx->log_items); 138 139 /* 140 * We're in the middle of switching cil contexts. Reset the 141 * counter we use to detect when the current context is nearing 142 * full. 143 */ 144 cilpcp->space_used = 0; 145 } 146 } 147 148 /* 149 * Aggregate the CIL per-cpu space used counters into the global atomic value. 150 * This is called when the per-cpu counter aggregation will first pass the soft 151 * limit threshold so we can switch to atomic counter aggregation for accurate 152 * detection of hard limit traversal. 153 */ 154 static void 155 xlog_cil_insert_pcp_aggregate( 156 struct xfs_cil *cil, 157 struct xfs_cil_ctx *ctx) 158 { 159 int cpu; 160 int count = 0; 161 162 /* Trigger atomic updates then aggregate only for the first caller */ 163 if (!test_and_clear_bit(XLOG_CIL_PCP_SPACE, &cil->xc_flags)) 164 return; 165 166 /* 167 * We can race with other cpus setting cil_pcpmask. However, we've 168 * atomically cleared PCP_SPACE which forces other threads to add to 169 * the global space used count. cil_pcpmask is a superset of cilpcp 170 * structures that could have a nonzero space_used. 
171 */ 172 for_each_cpu(cpu, &ctx->cil_pcpmask) { 173 struct xlog_cil_pcp *cilpcp = per_cpu_ptr(cil->xc_pcp, cpu); 174 175 count += xchg(&cilpcp->space_used, 0); 176 } 177 atomic_add(count, &ctx->space_used); 178 } 179 180 static void 181 xlog_cil_ctx_switch( 182 struct xfs_cil *cil, 183 struct xfs_cil_ctx *ctx) 184 { 185 xlog_cil_set_iclog_hdr_count(cil); 186 set_bit(XLOG_CIL_EMPTY, &cil->xc_flags); 187 set_bit(XLOG_CIL_PCP_SPACE, &cil->xc_flags); 188 ctx->sequence = ++cil->xc_current_sequence; 189 ctx->cil = cil; 190 cil->xc_ctx = ctx; 191 } 192 193 /* 194 * After the first stage of log recovery is done, we know where the head and 195 * tail of the log are. We need this log initialisation done before we can 196 * initialise the first CIL checkpoint context. 197 * 198 * Here we allocate a log ticket to track space usage during a CIL push. This 199 * ticket is passed to xlog_write() directly so that we don't slowly leak log 200 * space by failing to account for space used by log headers and additional 201 * region headers for split regions. 202 */ 203 void 204 xlog_cil_init_post_recovery( 205 struct xlog *log) 206 { 207 log->l_cilp->xc_ctx->ticket = xlog_cil_ticket_alloc(log); 208 log->l_cilp->xc_ctx->sequence = 1; 209 xlog_cil_set_iclog_hdr_count(log->l_cilp); 210 } 211 212 static inline int 213 xlog_cil_iovec_space( 214 uint niovecs) 215 { 216 return round_up((sizeof(struct xfs_log_vec) + 217 niovecs * sizeof(struct xfs_log_iovec)), 218 sizeof(uint64_t)); 219 } 220 221 /* 222 * Allocate or pin log vector buffers for CIL insertion. 223 * 224 * The CIL currently uses disposable buffers for copying a snapshot of the 225 * modified items into the log during a push. 
 * The biggest problem with this is the requirement to allocate the disposable
 * buffer during the commit if:
 *	a) it does not exist; or
 *	b) it is too small
 *
 * If we do this allocation within xlog_cil_insert_format_items(), it is done
 * under the xc_ctx_lock, which means that a CIL push cannot occur during
 * the memory allocation. This means that we have a potential deadlock situation
 * under low memory conditions when we have lots of dirty metadata pinned in
 * the CIL and we need a CIL commit to occur to free memory.
 *
 * To avoid this, we need to move the memory allocation outside the
 * xc_ctx_lock, but because the log vector buffers are disposable, that opens
 * up a TOCTOU race condition w.r.t. the CIL committing and removing the log
 * vector buffers between the check and the formatting of the item into the
 * log vector buffer within the xc_ctx_lock.
 *
 * Because the log vector buffer needs to be unchanged during the CIL push
 * process, we cannot share the buffer between the transaction commit (which
 * modifies the buffer) and the CIL push context that is writing the changes
 * into the log. This means skipping preallocation of buffer space is
 * unreliable, but we most definitely do not want to be allocating and freeing
 * buffers unnecessarily during commits when overwrites can be done safely.
 *
 * The simplest solution to this problem is to allocate a shadow buffer when a
 * log item is committed for the second time, and then to only use this buffer
 * if necessary. The buffer can remain attached to the log item until such time
 * it is needed, and this is the buffer that is reallocated to match the size of
 * the incoming modification. Then during the formatting of the item we can swap
 * the active buffer with the new one if we can't reuse the existing buffer.
We 255 * don't free the old buffer as it may be reused on the next modification if 256 * it's size is right, otherwise we'll free and reallocate it at that point. 257 * 258 * This function builds a vector for the changes in each log item in the 259 * transaction. It then works out the length of the buffer needed for each log 260 * item, allocates them and attaches the vector to the log item in preparation 261 * for the formatting step which occurs under the xc_ctx_lock. 262 * 263 * While this means the memory footprint goes up, it avoids the repeated 264 * alloc/free pattern that repeated modifications of an item would otherwise 265 * cause, and hence minimises the CPU overhead of such behaviour. 266 */ 267 static void 268 xlog_cil_alloc_shadow_bufs( 269 struct xlog *log, 270 struct xfs_trans *tp) 271 { 272 struct xfs_log_item *lip; 273 274 list_for_each_entry(lip, &tp->t_items, li_trans) { 275 struct xfs_log_vec *lv; 276 int niovecs = 0; 277 int nbytes = 0; 278 int alloc_size; 279 bool ordered = false; 280 281 /* Skip items which aren't dirty in this transaction. */ 282 if (!test_bit(XFS_LI_DIRTY, &lip->li_flags)) 283 continue; 284 285 /* get number of vecs and size of data to be stored */ 286 lip->li_ops->iop_size(lip, &niovecs, &nbytes); 287 288 /* 289 * Ordered items need to be tracked but we do not wish to write 290 * them. We need a logvec to track the object, but we do not 291 * need an iovec or buffer to be allocated for copying data. 292 */ 293 if (niovecs == XFS_LOG_VEC_ORDERED) { 294 ordered = true; 295 niovecs = 0; 296 nbytes = 0; 297 } 298 299 /* 300 * We 64-bit align the length of each iovec so that the start of 301 * the next one is naturally aligned. We'll need to account for 302 * that slack space here. 303 * 304 * We also add the xlog_op_header to each region when 305 * formatting, but that's not accounted to the size of the item 306 * at this point. Hence we'll need an addition number of bytes 307 * for each vector to hold an opheader. 
308 * 309 * Then round nbytes up to 64-bit alignment so that the initial 310 * buffer alignment is easy to calculate and verify. 311 */ 312 nbytes = xlog_item_space(niovecs, nbytes); 313 314 /* 315 * The data buffer needs to start 64-bit aligned, so round up 316 * that space to ensure we can align it appropriately and not 317 * overrun the buffer. 318 */ 319 alloc_size = nbytes + xlog_cil_iovec_space(niovecs); 320 321 /* 322 * if we have no shadow buffer, or it is too small, we need to 323 * reallocate it. 324 */ 325 if (!lip->li_lv_shadow || 326 alloc_size > lip->li_lv_shadow->lv_alloc_size) { 327 /* 328 * We free and allocate here as a realloc would copy 329 * unnecessary data. We don't use kvzalloc() for the 330 * same reason - we don't need to zero the data area in 331 * the buffer, only the log vector header and the iovec 332 * storage. 333 */ 334 kvfree(lip->li_lv_shadow); 335 lv = xlog_kvmalloc(alloc_size); 336 337 memset(lv, 0, xlog_cil_iovec_space(niovecs)); 338 339 INIT_LIST_HEAD(&lv->lv_list); 340 lv->lv_item = lip; 341 lv->lv_alloc_size = alloc_size; 342 if (ordered) 343 lv->lv_buf_used = XFS_LOG_VEC_ORDERED; 344 else 345 lv->lv_iovecp = (struct xfs_log_iovec *)&lv[1]; 346 lip->li_lv_shadow = lv; 347 } else { 348 /* same or smaller, optimise common overwrite case */ 349 lv = lip->li_lv_shadow; 350 if (ordered) 351 lv->lv_buf_used = XFS_LOG_VEC_ORDERED; 352 else 353 lv->lv_buf_used = 0; 354 lv->lv_bytes = 0; 355 } 356 357 /* Ensure the lv is set up according to ->iop_size */ 358 lv->lv_niovecs = niovecs; 359 360 /* The allocated data region lies beyond the iovec region */ 361 lv->lv_buf = (char *)lv + xlog_cil_iovec_space(niovecs); 362 } 363 364 } 365 366 /* 367 * Prepare the log item for insertion into the CIL. Calculate the difference in 368 * log space it will consume, and if it is a new item pin it as well. 
369 */ 370 STATIC void 371 xfs_cil_prepare_item( 372 struct xlog *log, 373 struct xfs_log_item *lip, 374 struct xfs_log_vec *lv, 375 int *diff_len) 376 { 377 /* Account for the new LV being passed in */ 378 if (lv->lv_buf_used != XFS_LOG_VEC_ORDERED) 379 *diff_len += lv->lv_bytes; 380 381 /* 382 * If there is no old LV, this is the first time we've seen the item in 383 * this CIL context and so we need to pin it. If we are replacing the 384 * old lv, then remove the space it accounts for and make it the shadow 385 * buffer for later freeing. In both cases we are now switching to the 386 * shadow buffer, so update the pointer to it appropriately. 387 */ 388 if (!lip->li_lv) { 389 if (lv->lv_item->li_ops->iop_pin) 390 lv->lv_item->li_ops->iop_pin(lv->lv_item); 391 lv->lv_item->li_lv_shadow = NULL; 392 } else if (lip->li_lv != lv) { 393 ASSERT(lv->lv_buf_used != XFS_LOG_VEC_ORDERED); 394 395 *diff_len -= lip->li_lv->lv_bytes; 396 lv->lv_item->li_lv_shadow = lip->li_lv; 397 } 398 399 /* attach new log vector to log item */ 400 lv->lv_item->li_lv = lv; 401 402 /* 403 * If this is the first time the item is being committed to the 404 * CIL, store the sequence number on the log item so we can 405 * tell in future commits whether this is the first checkpoint 406 * the item is being committed into. 407 */ 408 if (!lv->lv_item->li_seq) 409 lv->lv_item->li_seq = log->l_cilp->xc_ctx->sequence; 410 } 411 412 struct xlog_format_buf { 413 struct xfs_log_vec *lv; 414 unsigned int idx; 415 }; 416 417 /* 418 * We need to make sure the buffer pointer returned is naturally aligned for the 419 * biggest basic data type we put into it. We have already accounted for this 420 * padding when sizing the buffer. 421 * 422 * However, this padding does not get written into the log, and hence we have to 423 * track the space used by the log vectors separately to prevent log space hangs 424 * due to inaccurate accounting (i.e. a leak) of the used log space through the 425 * CIL context ticket. 
426 * 427 * We also add space for the xlog_op_header that describes this region in the 428 * log. This prepends the data region we return to the caller to copy their data 429 * into, so do all the static initialisation of the ophdr now. Because the ophdr 430 * is not 8 byte aligned, we have to be careful to ensure that we align the 431 * start of the buffer such that the region we return to the call is 8 byte 432 * aligned and packed against the tail of the ophdr. 433 */ 434 void * 435 xlog_format_start( 436 struct xlog_format_buf *lfb, 437 uint16_t type) 438 { 439 struct xfs_log_vec *lv = lfb->lv; 440 struct xfs_log_iovec *vec = &lv->lv_iovecp[lfb->idx]; 441 struct xlog_op_header *oph; 442 uint32_t len; 443 void *buf; 444 445 ASSERT(lfb->idx < lv->lv_niovecs); 446 447 len = lv->lv_buf_used + sizeof(struct xlog_op_header); 448 if (!IS_ALIGNED(len, sizeof(uint64_t))) { 449 lv->lv_buf_used = round_up(len, sizeof(uint64_t)) - 450 sizeof(struct xlog_op_header); 451 } 452 453 vec->i_type = type; 454 vec->i_addr = lv->lv_buf + lv->lv_buf_used; 455 456 oph = vec->i_addr; 457 oph->oh_clientid = XFS_TRANSACTION; 458 oph->oh_res2 = 0; 459 oph->oh_flags = 0; 460 461 buf = vec->i_addr + sizeof(struct xlog_op_header); 462 ASSERT(IS_ALIGNED((unsigned long)buf, sizeof(uint64_t))); 463 return buf; 464 } 465 466 void 467 xlog_format_commit( 468 struct xlog_format_buf *lfb, 469 unsigned int data_len) 470 { 471 struct xfs_log_vec *lv = lfb->lv; 472 struct xfs_log_iovec *vec = &lv->lv_iovecp[lfb->idx]; 473 struct xlog_op_header *oph = vec->i_addr; 474 int len; 475 476 /* 477 * Always round up the length to the correct alignment so callers don't 478 * need to know anything about this log vec layout requirement. This 479 * means we have to zero the area the data to be written does not cover. 480 * This is complicated by fact the payload region is offset into the 481 * logvec region by the opheader that tracks the payload. 
482 */ 483 len = xlog_calc_iovec_len(data_len); 484 if (len - data_len != 0) { 485 char *buf = vec->i_addr + sizeof(struct xlog_op_header); 486 487 memset(buf + data_len, 0, len - data_len); 488 } 489 490 /* 491 * The opheader tracks aligned payload length, whilst the logvec tracks 492 * the overall region length. 493 */ 494 oph->oh_len = cpu_to_be32(len); 495 496 len += sizeof(struct xlog_op_header); 497 lv->lv_buf_used += len; 498 lv->lv_bytes += len; 499 vec->i_len = len; 500 501 /* Catch buffer overruns */ 502 ASSERT((void *)lv->lv_buf + lv->lv_bytes <= 503 (void *)lv + lv->lv_alloc_size); 504 505 lfb->idx++; 506 } 507 508 /* 509 * Format log item into a flat buffers 510 * 511 * For delayed logging, we need to hold a formatted buffer containing all the 512 * changes on the log item. This enables us to relog the item in memory and 513 * write it out asynchronously without needing to relock the object that was 514 * modified at the time it gets written into the iclog. 515 * 516 * This function takes the prepared log vectors attached to each log item, and 517 * formats the changes into the log vector buffer. The buffer it uses is 518 * dependent on the current state of the vector in the CIL - the shadow lv is 519 * guaranteed to be large enough for the current modification, but we will only 520 * use that if we can't reuse the existing lv. If we can't reuse the existing 521 * lv, then simple swap it out for the shadow lv. We don't free it - that is 522 * done lazily either by th enext modification or the freeing of the log item. 523 * 524 * We don't set up region headers during this process; we simply copy the 525 * regions into the flat buffer. We can do this because we still have to do a 526 * formatting step to write the regions into the iclog buffer. Writing the 527 * ophdrs during the iclog write means that we can support splitting large 528 * regions across iclog boundares without needing a change in the format of the 529 * item/region encapsulation. 
 *
 * Hence what we need to do now is rewrite the vector array to point
 * to the copied region inside the buffer we just allocated. This allows us to
 * format the regions into the iclog as though they are being formatted
 * directly out of the objects themselves.
 *
 * @diff_len accumulates the net change in formatted bytes across all dirty
 * items in @tp; it is updated here and in xfs_cil_prepare_item().
 */
static void
xlog_cil_insert_format_items(
	struct xlog		*log,
	struct xfs_trans	*tp,
	int			*diff_len)
{
	struct xfs_log_item	*lip;

	/* Bail out if we didn't find a log item. */
	if (list_empty(&tp->t_items)) {
		ASSERT(0);
		return;
	}

	list_for_each_entry(lip, &tp->t_items, li_trans) {
		struct xfs_log_vec *lv = lip->li_lv;
		struct xfs_log_vec *shadow = lip->li_lv_shadow;
		/* Empty initialiser: formatting starts at iovec index 0. */
		struct xlog_format_buf lfb = { };

		/* Skip items which aren't dirty in this transaction. */
		if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
			continue;

		/*
		 * The formatting size information is already attached to
		 * the shadow lv on the log item.
		 */
		if (shadow->lv_buf_used == XFS_LOG_VEC_ORDERED) {
			/* Ordered items are tracked but never formatted. */
			if (!lv) {
				lv = shadow;
				lv->lv_item = lip;
			}
			ASSERT(shadow->lv_alloc_size == lv->lv_alloc_size);
			xfs_cil_prepare_item(log, lip, lv, diff_len);
			continue;
		}

		/* Skip items that do not have any vectors for writing */
		if (!shadow->lv_niovecs)
			continue;

		/* compare to existing item size */
		if (lv && shadow->lv_alloc_size <= lv->lv_alloc_size) {
			/* same or smaller, optimise common overwrite case */

			/*
			 * set the item up as though it is a new insertion so
			 * that the space reservation accounting is correct.
			 */
			*diff_len -= lv->lv_bytes;

			/* Ensure the lv is set up according to ->iop_size */
			lv->lv_niovecs = shadow->lv_niovecs;

			/* reset the lv buffer information for new formatting */
			lv->lv_buf_used = 0;
			lv->lv_bytes = 0;
			lv->lv_buf = (char *)lv +
					xlog_cil_iovec_space(lv->lv_niovecs);
		} else {
			/* switch to shadow buffer! */
			lv = shadow;
			lv->lv_item = lip;
		}

		/* Format into the chosen lv, then account for and attach it. */
		lfb.lv = lv;
		ASSERT(IS_ALIGNED((unsigned long)lv->lv_buf, sizeof(uint64_t)));
		lip->li_ops->iop_format(lip, &lfb);
		xfs_cil_prepare_item(log, lip, lv, diff_len);
	}
}

/*
 * The use of lockless waitqueue_active() requires that the caller has
 * serialised itself against the wakeup call in xlog_cil_push_work(). That
 * can be done by either holding the push lock or the context lock.
 *
 * Returns true if there are waiters on xc_push_wait or if @space_used has
 * reached XLOG_CIL_BLOCKING_SPACE_LIMIT().
 */
static inline bool
xlog_cil_over_hard_limit(
	struct xlog	*log,
	int32_t		space_used)
{
	if (waitqueue_active(&log->l_cilp->xc_push_wait))
		return true;
	if (space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log))
		return true;
	return false;
}

/*
 * Insert the log items into the CIL and calculate the difference in space
 * consumed by the item. Add the space to the checkpoint ticket and calculate
 * if the change requires additional log metadata. If it does, take that space
 * as well. Remove the amount of space we added to the checkpoint ticket from
 * the current transaction ticket so that the accounting works out correctly.
 */
static void
xlog_cil_insert_items(
	struct xlog		*log,
	struct xfs_trans	*tp,
	uint32_t		released_space)
{
	struct xfs_cil		*cil = log->l_cilp;
	struct xfs_cil_ctx	*ctx = cil->xc_ctx;
	struct xfs_log_item	*lip;
	int			len = 0;
	/*
	 * NOTE(review): iovhdr_res is never modified after initialisation in
	 * this function, so the overrun warning below always reports 0 for it.
	 */
	int			iovhdr_res = 0, split_res = 0, ctx_res = 0;
	int			space_used;
	int			order;
	unsigned int		cpu_nr;
	struct xlog_cil_pcp	*cilpcp;

	ASSERT(tp);

	/*
	 * We can do this safely because the context can't checkpoint until we
	 * are done so it doesn't matter exactly how we update the CIL.
	 */
	xlog_cil_insert_format_items(log, tp, &len);

	/*
	 * Subtract the space released by intent cancelation from the space we
	 * consumed so that we remove it from the CIL space and add it back to
	 * the current transaction reservation context.
	 */
	len -= released_space;

	/*
	 * Grab the per-cpu pointer for the CIL before we start any accounting.
	 * That ensures that we are running with pre-emption disabled and so we
	 * can't be scheduled away between split sample/update operations that
	 * are done without outside locking to serialise them.
	 */
	cpu_nr = get_cpu();
	cilpcp = this_cpu_ptr(cil->xc_pcp);

	/* Tell the future push that there was work added by this CPU. */
	if (!cpumask_test_cpu(cpu_nr, &ctx->cil_pcpmask))
		cpumask_test_and_set_cpu(cpu_nr, &ctx->cil_pcpmask);

	/*
	 * We need to take the CIL checkpoint unit reservation on the first
	 * commit into the CIL. Test the XLOG_CIL_EMPTY bit first so we don't
	 * unnecessarily do an atomic op in the fast path here. We can clear the
	 * XLOG_CIL_EMPTY bit as we are under the xc_ctx_lock here and that
	 * needs to be held exclusively to reset the XLOG_CIL_EMPTY bit.
	 */
	if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags) &&
	    test_and_clear_bit(XLOG_CIL_EMPTY, &cil->xc_flags))
		ctx_res = ctx->ticket->t_unit_res;

	/*
	 * Check if we need to steal iclog headers. atomic_read() is not a
	 * locked atomic operation, so we can check the value before we do any
	 * real atomic ops in the fast path. If we've already taken the CIL unit
	 * reservation from this commit, we've already got one iclog header
	 * space reserved so we have to account for that otherwise we risk
	 * overrunning the reservation on this ticket.
	 *
	 * If the CIL is already at the hard limit, we might need more header
	 * space than originally reserved. So steal more header space from every
	 * commit that occurs once we are over the hard limit to ensure the CIL
	 * push won't run out of reservation space.
	 *
	 * This can steal more than we need, but that's OK.
	 *
	 * The cil->xc_ctx_lock provides the serialisation necessary for safely
	 * calling xlog_cil_over_hard_limit() in this context.
	 */
	space_used = atomic_read(&ctx->space_used) + cilpcp->space_used + len;
	if (atomic_read(&cil->xc_iclog_hdrs) > 0 ||
	    xlog_cil_over_hard_limit(log, space_used)) {
		split_res = log->l_iclog_hsize +
					sizeof(struct xlog_op_header);
		if (ctx_res)
			ctx_res += split_res * (tp->t_ticket->t_iclog_hdrs - 1);
		else
			ctx_res = split_res * tp->t_ticket->t_iclog_hdrs;
		atomic_sub(tp->t_ticket->t_iclog_hdrs, &cil->xc_iclog_hdrs);
	}
	cilpcp->space_reserved += ctx_res;

	/*
	 * Accurately account when over the soft limit, otherwise fold the
	 * percpu count into the global count if over the per-cpu threshold.
	 */
	if (!test_bit(XLOG_CIL_PCP_SPACE, &cil->xc_flags)) {
		atomic_add(len, &ctx->space_used);
	} else if (cilpcp->space_used + len >
			(XLOG_CIL_SPACE_LIMIT(log) / num_online_cpus())) {
		space_used = atomic_add_return(cilpcp->space_used + len,
						&ctx->space_used);
		cilpcp->space_used = 0;

		/*
		 * If we just transitioned over the soft limit, we need to
		 * transition to the global atomic counter.
		 */
		if (space_used >= XLOG_CIL_SPACE_LIMIT(log))
			xlog_cil_insert_pcp_aggregate(cil, ctx);
	} else {
		cilpcp->space_used += len;
	}
	/* attach the transaction to the CIL if it has any busy extents */
	if (!list_empty(&tp->t_busy))
		list_splice_init(&tp->t_busy, &cilpcp->busy_extents);

	/*
	 * Now update the order of everything modified in the transaction
	 * and insert items into the CIL if they aren't already there.
	 * We do this here so we only need to take the CIL lock once during
	 * the transaction commit.
	 */
	order = atomic_inc_return(&ctx->order_id);
	list_for_each_entry(lip, &tp->t_items, li_trans) {
		/* Skip items which aren't dirty in this transaction. */
		if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
			continue;

		lip->li_order_id = order;
		/* Items already on a CIL list only get their order updated. */
		if (!list_empty(&lip->li_cil))
			continue;
		list_add_tail(&lip->li_cil, &cilpcp->log_items);
	}
	put_cpu();

	/*
	 * If we've overrun the reservation, dump the tx details before we move
	 * the log items. Shutdown is imminent...
	 */
	tp->t_ticket->t_curr_res -= ctx_res + len;
	if (WARN_ON(tp->t_ticket->t_curr_res < 0)) {
		xfs_warn(log->l_mp, "Transaction log reservation overrun:");
		xfs_warn(log->l_mp,
			 " log items: %d bytes (iov hdrs: %d bytes)",
			 len, iovhdr_res);
		xfs_warn(log->l_mp, " split region headers: %d bytes",
			 split_res);
		xfs_warn(log->l_mp, " ctx ticket: %d bytes", ctx_res);
		xlog_print_trans(tp);
		xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR);
	}
}

/*
 * Bulk-update the AIL position of @nr_items entries of @log_items to
 * @commit_lsn using @cur, then call ->iop_unpin(lip, 0) on each item that
 * provides an unpin method. The AIL lock is taken here and released by
 * xfs_trans_ail_update_bulk().
 */
static inline void
xlog_cil_ail_insert_batch(
	struct xfs_ail		*ailp,
	struct xfs_ail_cursor	*cur,
	struct xfs_log_item	**log_items,
	int			nr_items,
	xfs_lsn_t		commit_lsn)
{
	int	i;

	spin_lock(&ailp->ail_lock);
	/* xfs_trans_ail_update_bulk drops ailp->ail_lock */
	xfs_trans_ail_update_bulk(ailp, cur, log_items, nr_items, commit_lsn);

	for (i = 0; i < nr_items; i++) {
		struct xfs_log_item *lip = log_items[i];

		if (lip->li_ops->iop_unpin)
			lip->li_ops->iop_unpin(lip, 0);
	}
}

/*
 * Take the checkpoint's log vector chain of items and insert the attached log
 * items into the AIL. This uses bulk insertion techniques to minimise AIL lock
 * traffic.
 *
 * The AIL tracks log items via the start record LSN of the checkpoint,
 * not the commit record LSN. This is because we can pipeline multiple
 * checkpoints, and so the start record of checkpoint N+1 can be
 * written before the commit record of checkpoint N. i.e:
 *
 *   start N			commit N
 *	+-------------+------------+----------------+
 *		  start N+1			commit N+1
 *
 * The tail of the log cannot be moved to the LSN of commit N when all
 * the items of that checkpoint are written back, because then the
 * start record for N+1 is no longer in the active portion of the log
 * and recovery will fail/corrupt the filesystem.
 *
 * Hence when all the log items in checkpoint N are written back, the
 * tail of the log must now only move as far forwards as the start LSN
 * of checkpoint N+1.
 *
 * If we are called with the aborted flag set, it is because a log write during
 * a CIL checkpoint commit has failed. In this case, all the items in the
 * checkpoint have already gone through iop_committed and iop_committing, which
 * means that checkpoint commit abort handling is treated exactly the same as an
 * iclog write error even though we haven't started any IO yet. Hence in this
 * case all we need to do is iop_committed processing, followed by an
 * iop_unpin(aborted) call.
 *
 * The AIL cursor is used to optimise the insert process. If commit_lsn is not
 * at the end of the AIL, the insert cursor avoids the need to walk the AIL to
 * find the insertion point on every xfs_log_item_batch_insert() call. This
 * saves a lot of needless list walking and is a net win, even though it
 * slightly increases that amount of AIL lock traffic to set it up and tear it
 * down.
 */
static void
xlog_cil_ail_insert(
	struct xfs_cil_ctx	*ctx,
	bool			aborted)
{
#define LOG_ITEM_BATCH_SIZE	32
	struct xfs_ail		*ailp = ctx->cil->xc_log->l_ailp;
	struct xfs_log_item	*log_items[LOG_ITEM_BATCH_SIZE];
	struct xfs_log_vec	*lv;
	struct xfs_ail_cursor	cur;
	xfs_lsn_t		old_head;
	int			i = 0;	/* number of items batched so far */

	/*
	 * Update the AIL head LSN with the commit record LSN of this
	 * checkpoint. As iclogs are always completed in order, this should
	 * always be the same (as iclogs can contain multiple commit records) or
	 * higher LSN than the current head. We do this before insertion of the
	 * items so that log space checks during insertion will reflect the
	 * space that this checkpoint has already consumed.  We call
	 * xfs_ail_update_finish() so that tail space and space-based wakeups
	 * will be recalculated appropriately.
	 */
	ASSERT(XFS_LSN_CMP(ctx->commit_lsn, ailp->ail_head_lsn) >= 0 ||
			aborted);
	spin_lock(&ailp->ail_lock);
	xfs_trans_ail_cursor_last(ailp, &cur, ctx->start_lsn);
	old_head = ailp->ail_head_lsn;
	ailp->ail_head_lsn = ctx->commit_lsn;
	/* xfs_ail_update_finish() drops the ail_lock */
	xfs_ail_update_finish(ailp, NULLCOMMITLSN);

	/*
	 * We move the AIL head forwards to account for the space used in the
	 * log before we remove that space from the grant heads. This prevents a
	 * transient condition where reservation space appears to become
	 * available on return, only for it to disappear again immediately as
	 * the AIL head update accounts in the log tail space.
	 */
	smp_wmb();	/* paired with smp_rmb in xlog_grant_space_left */
	xlog_grant_return_space(ailp->ail_log, old_head, ailp->ail_head_lsn);

	/* unpin all the log items */
	list_for_each_entry(lv, &ctx->lv_chain, lv_list) {
		struct xfs_log_item	*lip = lv->lv_item;
		xfs_lsn_t		item_lsn;

		if (aborted) {
			trace_xlog_ail_insert_abort(lip);
			set_bit(XFS_LI_ABORTED, &lip->li_flags);
		}

		/* Items flagged release-when-committed never reach the AIL. */
		if (lip->li_ops->flags & XFS_ITEM_RELEASE_WHEN_COMMITTED) {
			lip->li_ops->iop_release(lip);
			continue;
		}

		/* Default to the checkpoint start LSN without ->iop_committed. */
		if (lip->li_ops->iop_committed)
			item_lsn = lip->li_ops->iop_committed(lip,
					ctx->start_lsn);
		else
			item_lsn = ctx->start_lsn;

		/* item_lsn of -1 means the item needs no further processing */
		if (XFS_LSN_CMP(item_lsn, (xfs_lsn_t)-1) == 0)
			continue;

		/*
		 * if we are aborting the operation, no point in inserting the
		 * object into the AIL as we are in a shutdown situation.
		 */
		if (aborted) {
			ASSERT(xlog_is_shutdown(ailp->ail_log));
			if (lip->li_ops->iop_unpin)
				lip->li_ops->iop_unpin(lip, 1);
			continue;
		}

		if (item_lsn != ctx->start_lsn) {

			/*
			 * Not a bulk update option due to unusual item_lsn.
			 * Push into AIL immediately, rechecking the lsn once
			 * we have the ail lock. Then unpin the item. This does
			 * not affect the AIL cursor the bulk insert path is
			 * using.
			 */
			spin_lock(&ailp->ail_lock);
			/*
			 * NOTE(review): only the else branch unlocks here, so
			 * xfs_trans_ail_update() is presumably expected to
			 * drop ail_lock itself - confirm against its
			 * definition.
			 */
			if (XFS_LSN_CMP(item_lsn, lip->li_lsn) > 0)
				xfs_trans_ail_update(ailp, lip, item_lsn);
			else
				spin_unlock(&ailp->ail_lock);
			if (lip->li_ops->iop_unpin)
				lip->li_ops->iop_unpin(lip, 0);
			continue;
		}

		/* Item is a candidate for bulk AIL insert. */
		log_items[i++] = lv->lv_item;
		if (i >= LOG_ITEM_BATCH_SIZE) {
			xlog_cil_ail_insert_batch(ailp, &cur, log_items,
					LOG_ITEM_BATCH_SIZE, ctx->start_lsn);
			i = 0;
		}
	}

	/* make sure we insert the remainder! */
	if (i)
		xlog_cil_ail_insert_batch(ailp, &cur, log_items, i,
				ctx->start_lsn);

	spin_lock(&ailp->ail_lock);
	xfs_trans_ail_cursor_done(&cur);
	spin_unlock(&ailp->ail_lock);
}

/*
 * Unlink and kvfree() every log vector on @lv_chain, leaving the chain empty.
 */
static void
xlog_cil_free_logvec(
	struct list_head	*lv_chain)
{
	struct xfs_log_vec	*lv;

	while (!list_empty(lv_chain)) {
		lv = list_first_entry(lv_chain, struct xfs_log_vec, lv_list);
		list_del_init(&lv->lv_list);
		kvfree(lv);
	}
}

/*
 * Mark all items committed and clear busy extents. We free the log vector
 * chains in a separate pass so that we unpin the log items as quickly as
 * possible.
 *
 * The context is freed here unless busy extents remain on its list after
 * xfs_extent_busy_clear(), in which case it is passed to
 * xfs_discard_extents() as the owner of the busy extent list.
 */
static void
xlog_cil_committed(
	struct xfs_cil_ctx	*ctx)
{
	struct xfs_mount	*mp = ctx->cil->xc_log->l_mp;
	bool			abort = xlog_is_shutdown(ctx->cil->xc_log);

	/*
	 * If the I/O failed, we're aborting the commit and already shutdown.
	 * Wake any commit waiters before aborting the log items so we don't
	 * block async log pushers on callbacks. Async log pushers explicitly do
	 * not wait on log force completion because they may be holding locks
	 * required to unpin items.
	 */
	if (abort) {
		spin_lock(&ctx->cil->xc_push_lock);
		wake_up_all(&ctx->cil->xc_start_wait);
		wake_up_all(&ctx->cil->xc_commit_wait);
		spin_unlock(&ctx->cil->xc_push_lock);
	}

	xlog_cil_ail_insert(ctx, abort);

	xfs_extent_busy_sort(&ctx->busy_extents.extent_list);
	xfs_extent_busy_clear(&ctx->busy_extents.extent_list,
			      xfs_has_discard(mp) && !abort);

	/* Take the context off the committing list under the push lock. */
	spin_lock(&ctx->cil->xc_push_lock);
	list_del(&ctx->committing);
	spin_unlock(&ctx->cil->xc_push_lock);

	xlog_cil_free_logvec(&ctx->lv_chain);

	if (!list_empty(&ctx->busy_extents.extent_list)) {
		/*
		 * NOTE(review): ctx is not freed on this path; presumably the
		 * discard code releases it via busy_extents.owner - confirm.
		 */
		ctx->busy_extents.owner = ctx;
		xfs_discard_extents(mp, &ctx->busy_extents);
		return;
	}

	kfree(ctx);
}

/*
 * Run xlog_cil_committed() on every context linked onto @list through
 * iclog_entry, removing each one from the list first.
 */
void
xlog_cil_process_committed(
	struct list_head	*list)
{
	struct xfs_cil_ctx	*ctx;

	while ((ctx = list_first_entry_or_null(list,
			struct xfs_cil_ctx, iclog_entry))) {
		list_del(&ctx->iclog_entry);
		xlog_cil_committed(ctx);
	}
}

/*
 * Record the LSN of the iclog we were just granted space to start writing into.
 * If the context doesn't have a start_lsn recorded, then this iclog will
 * contain the start record for the checkpoint. Otherwise this write contains
 * the commit record for the checkpoint.
1034 */ 1035 void 1036 xlog_cil_set_ctx_write_state( 1037 struct xfs_cil_ctx *ctx, 1038 struct xlog_in_core *iclog) 1039 { 1040 struct xfs_cil *cil = ctx->cil; 1041 xfs_lsn_t lsn = be64_to_cpu(iclog->ic_header->h_lsn); 1042 1043 ASSERT(!ctx->commit_lsn); 1044 if (!ctx->start_lsn) { 1045 spin_lock(&cil->xc_push_lock); 1046 /* 1047 * The LSN we need to pass to the log items on transaction 1048 * commit is the LSN reported by the first log vector write, not 1049 * the commit lsn. If we use the commit record lsn then we can 1050 * move the grant write head beyond the tail LSN and overwrite 1051 * it. 1052 */ 1053 ctx->start_lsn = lsn; 1054 wake_up_all(&cil->xc_start_wait); 1055 spin_unlock(&cil->xc_push_lock); 1056 1057 /* 1058 * Make sure the metadata we are about to overwrite in the log 1059 * has been flushed to stable storage before this iclog is 1060 * issued. 1061 */ 1062 spin_lock(&cil->xc_log->l_icloglock); 1063 iclog->ic_flags |= XLOG_ICL_NEED_FLUSH; 1064 spin_unlock(&cil->xc_log->l_icloglock); 1065 return; 1066 } 1067 1068 /* 1069 * Take a reference to the iclog for the context so that we still hold 1070 * it when xlog_write is done and has released it. This means the 1071 * context controls when the iclog is released for IO. 1072 */ 1073 atomic_inc(&iclog->ic_refcnt); 1074 1075 /* 1076 * xlog_state_get_iclog_space() guarantees there is enough space in the 1077 * iclog for an entire commit record, so we can attach the context 1078 * callbacks now. This needs to be done before we make the commit_lsn 1079 * visible to waiters so that checkpoints with commit records in the 1080 * same iclog order their IO completion callbacks in the same order that 1081 * the commit records appear in the iclog. 
1082 */ 1083 spin_lock(&cil->xc_log->l_icloglock); 1084 list_add_tail(&ctx->iclog_entry, &iclog->ic_callbacks); 1085 spin_unlock(&cil->xc_log->l_icloglock); 1086 1087 /* 1088 * Now we can record the commit LSN and wake anyone waiting for this 1089 * sequence to have the ordered commit record assigned to a physical 1090 * location in the log. 1091 */ 1092 spin_lock(&cil->xc_push_lock); 1093 ctx->commit_iclog = iclog; 1094 ctx->commit_lsn = lsn; 1095 wake_up_all(&cil->xc_commit_wait); 1096 spin_unlock(&cil->xc_push_lock); 1097 } 1098 1099 1100 /* 1101 * Ensure that the order of log writes follows checkpoint sequence order. This 1102 * relies on the context LSN being zero until the log write has guaranteed the 1103 * LSN that the log write will start at via xlog_state_get_iclog_space(). 1104 */ 1105 enum _record_type { 1106 _START_RECORD, 1107 _COMMIT_RECORD, 1108 }; 1109 1110 static int 1111 xlog_cil_order_write( 1112 struct xfs_cil *cil, 1113 xfs_csn_t sequence, 1114 enum _record_type record) 1115 { 1116 struct xfs_cil_ctx *ctx; 1117 1118 restart: 1119 spin_lock(&cil->xc_push_lock); 1120 list_for_each_entry(ctx, &cil->xc_committing, committing) { 1121 /* 1122 * Avoid getting stuck in this loop because we were woken by the 1123 * shutdown, but then went back to sleep once already in the 1124 * shutdown state. 1125 */ 1126 if (xlog_is_shutdown(cil->xc_log)) { 1127 spin_unlock(&cil->xc_push_lock); 1128 return -EIO; 1129 } 1130 1131 /* 1132 * Higher sequences will wait for this one so skip them. 1133 * Don't wait for our own sequence, either. 1134 */ 1135 if (ctx->sequence >= sequence) 1136 continue; 1137 1138 /* Wait until the LSN for the record has been recorded. 
*/ 1139 switch (record) { 1140 case _START_RECORD: 1141 if (!ctx->start_lsn) { 1142 xlog_wait(&cil->xc_start_wait, &cil->xc_push_lock); 1143 goto restart; 1144 } 1145 break; 1146 case _COMMIT_RECORD: 1147 if (!ctx->commit_lsn) { 1148 xlog_wait(&cil->xc_commit_wait, &cil->xc_push_lock); 1149 goto restart; 1150 } 1151 break; 1152 } 1153 } 1154 spin_unlock(&cil->xc_push_lock); 1155 return 0; 1156 } 1157 1158 /* 1159 * Write out the log vector change now attached to the CIL context. This will 1160 * write a start record that needs to be strictly ordered in ascending CIL 1161 * sequence order so that log recovery will always use in-order start LSNs when 1162 * replaying checkpoints. 1163 */ 1164 static int 1165 xlog_cil_write_chain( 1166 struct xfs_cil_ctx *ctx, 1167 uint32_t chain_len) 1168 { 1169 struct xlog *log = ctx->cil->xc_log; 1170 int error; 1171 1172 error = xlog_cil_order_write(ctx->cil, ctx->sequence, _START_RECORD); 1173 if (error) 1174 return error; 1175 return xlog_write(log, ctx, &ctx->lv_chain, ctx->ticket, chain_len); 1176 } 1177 1178 /* 1179 * Write out the commit record of a checkpoint transaction to close off a 1180 * running log write. These commit records are strictly ordered in ascending CIL 1181 * sequence order so that log recovery will always replay the checkpoints in the 1182 * correct order. 
1183 */ 1184 static int 1185 xlog_cil_write_commit_record( 1186 struct xfs_cil_ctx *ctx) 1187 { 1188 struct xlog *log = ctx->cil->xc_log; 1189 struct xlog_op_header ophdr = { 1190 .oh_clientid = XFS_TRANSACTION, 1191 .oh_tid = cpu_to_be32(ctx->ticket->t_tid), 1192 .oh_flags = XLOG_COMMIT_TRANS, 1193 }; 1194 struct xfs_log_iovec reg = { 1195 .i_addr = &ophdr, 1196 .i_len = sizeof(struct xlog_op_header), 1197 .i_type = XLOG_REG_TYPE_COMMIT, 1198 }; 1199 int error; 1200 1201 if (xlog_is_shutdown(log)) 1202 return -EIO; 1203 1204 error = xlog_cil_order_write(ctx->cil, ctx->sequence, _COMMIT_RECORD); 1205 if (error) 1206 return error; 1207 error = xlog_write_one_vec(log, ctx, ®, ctx->ticket); 1208 if (error) 1209 xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR); 1210 return error; 1211 } 1212 1213 struct xlog_cil_trans_hdr { 1214 struct xlog_op_header oph[2]; 1215 struct xfs_trans_header thdr; 1216 struct xfs_log_iovec lhdr[2]; 1217 }; 1218 1219 /* 1220 * Build a checkpoint transaction header to begin the journal transaction. We 1221 * need to account for the space used by the transaction header here as it is 1222 * not accounted for in xlog_write(). 1223 * 1224 * This is the only place we write a transaction header, so we also build the 1225 * log opheaders that indicate the start of a log transaction and wrap the 1226 * transaction header. We keep the start record in it's own log vector rather 1227 * than compacting them into a single region as this ends up making the logic 1228 * in xlog_write() for handling empty opheaders for start, commit and unmount 1229 * records much simpler. 
1230 */ 1231 static void 1232 xlog_cil_build_trans_hdr( 1233 struct xfs_cil_ctx *ctx, 1234 struct xlog_cil_trans_hdr *hdr, 1235 struct xfs_log_vec *lvhdr, 1236 int num_iovecs) 1237 { 1238 struct xlog_ticket *tic = ctx->ticket; 1239 __be32 tid = cpu_to_be32(tic->t_tid); 1240 1241 memset(hdr, 0, sizeof(*hdr)); 1242 1243 /* Log start record */ 1244 hdr->oph[0].oh_tid = tid; 1245 hdr->oph[0].oh_clientid = XFS_TRANSACTION; 1246 hdr->oph[0].oh_flags = XLOG_START_TRANS; 1247 1248 /* log iovec region pointer */ 1249 hdr->lhdr[0].i_addr = &hdr->oph[0]; 1250 hdr->lhdr[0].i_len = sizeof(struct xlog_op_header); 1251 hdr->lhdr[0].i_type = XLOG_REG_TYPE_LRHEADER; 1252 1253 /* log opheader */ 1254 hdr->oph[1].oh_tid = tid; 1255 hdr->oph[1].oh_clientid = XFS_TRANSACTION; 1256 hdr->oph[1].oh_len = cpu_to_be32(sizeof(struct xfs_trans_header)); 1257 1258 /* transaction header in host byte order format */ 1259 hdr->thdr.th_magic = XFS_TRANS_HEADER_MAGIC; 1260 hdr->thdr.th_type = XFS_TRANS_CHECKPOINT; 1261 hdr->thdr.th_tid = tic->t_tid; 1262 hdr->thdr.th_num_items = num_iovecs; 1263 1264 /* log iovec region pointer */ 1265 hdr->lhdr[1].i_addr = &hdr->oph[1]; 1266 hdr->lhdr[1].i_len = sizeof(struct xlog_op_header) + 1267 sizeof(struct xfs_trans_header); 1268 hdr->lhdr[1].i_type = XLOG_REG_TYPE_TRANSHDR; 1269 1270 lvhdr->lv_niovecs = 2; 1271 lvhdr->lv_iovecp = &hdr->lhdr[0]; 1272 lvhdr->lv_bytes = hdr->lhdr[0].i_len + hdr->lhdr[1].i_len; 1273 1274 tic->t_curr_res -= lvhdr->lv_bytes; 1275 } 1276 1277 /* 1278 * CIL item reordering compare function. We want to order in ascending ID order, 1279 * but we want to leave items with the same ID in the order they were added to 1280 * the list. This is important for operations like reflink where we log 4 order 1281 * dependent intents in a single transaction when we overwrite an existing 1282 * shared extent with a new shared extent. i.e. BUI(unmap), CUI(drop), 1283 * CUI (inc), BUI(remap)... 
1284 */ 1285 static int 1286 xlog_cil_order_cmp( 1287 void *priv, 1288 const struct list_head *a, 1289 const struct list_head *b) 1290 { 1291 struct xfs_log_vec *l1 = container_of(a, struct xfs_log_vec, lv_list); 1292 struct xfs_log_vec *l2 = container_of(b, struct xfs_log_vec, lv_list); 1293 1294 return l1->lv_order_id > l2->lv_order_id; 1295 } 1296 1297 /* 1298 * Pull all the log vectors off the items in the CIL, and remove the items from 1299 * the CIL. We don't need the CIL lock here because it's only needed on the 1300 * transaction commit side which is currently locked out by the flush lock. 1301 * 1302 * If a log item is marked with a whiteout, we do not need to write it to the 1303 * journal and so we just move them to the whiteout list for the caller to 1304 * dispose of appropriately. 1305 */ 1306 static void 1307 xlog_cil_build_lv_chain( 1308 struct xfs_cil_ctx *ctx, 1309 struct list_head *whiteouts, 1310 uint32_t *num_iovecs, 1311 uint32_t *num_bytes) 1312 { 1313 while (!list_empty(&ctx->log_items)) { 1314 struct xfs_log_item *item; 1315 struct xfs_log_vec *lv; 1316 1317 item = list_first_entry(&ctx->log_items, 1318 struct xfs_log_item, li_cil); 1319 1320 if (test_bit(XFS_LI_WHITEOUT, &item->li_flags)) { 1321 list_move(&item->li_cil, whiteouts); 1322 trace_xfs_cil_whiteout_skip(item); 1323 continue; 1324 } 1325 1326 lv = item->li_lv; 1327 lv->lv_order_id = item->li_order_id; 1328 1329 /* we don't write ordered log vectors */ 1330 if (lv->lv_buf_used != XFS_LOG_VEC_ORDERED) 1331 *num_bytes += lv->lv_bytes; 1332 *num_iovecs += lv->lv_niovecs; 1333 list_add_tail(&lv->lv_list, &ctx->lv_chain); 1334 1335 list_del_init(&item->li_cil); 1336 item->li_order_id = 0; 1337 item->li_lv = NULL; 1338 } 1339 } 1340 1341 static void 1342 xlog_cil_cleanup_whiteouts( 1343 struct list_head *whiteouts) 1344 { 1345 while (!list_empty(whiteouts)) { 1346 struct xfs_log_item *item = list_first_entry(whiteouts, 1347 struct xfs_log_item, li_cil); 1348 
list_del_init(&item->li_cil); 1349 trace_xfs_cil_whiteout_unpin(item); 1350 item->li_ops->iop_unpin(item, 1); 1351 } 1352 } 1353 1354 /* 1355 * Push the Committed Item List to the log. 1356 * 1357 * If the current sequence is the same as xc_push_seq we need to do a flush. If 1358 * xc_push_seq is less than the current sequence, then it has already been 1359 * flushed and we don't need to do anything - the caller will wait for it to 1360 * complete if necessary. 1361 * 1362 * xc_push_seq is checked unlocked against the sequence number for a match. 1363 * Hence we can allow log forces to run racily and not issue pushes for the 1364 * same sequence twice. If we get a race between multiple pushes for the same 1365 * sequence they will block on the first one and then abort, hence avoiding 1366 * needless pushes. 1367 * 1368 * This runs from a workqueue so it does not inherent any specific memory 1369 * allocation context. However, we do not want to block on memory reclaim 1370 * recursing back into the filesystem because this push may have been triggered 1371 * by memory reclaim itself. Hence we really need to run under full GFP_NOFS 1372 * contraints here. 
*/
static void
xlog_cil_push_work(
	struct work_struct	*work)
{
	/* enter NOFS allocation scope; restored on every exit path below */
	unsigned int		nofs_flags = memalloc_nofs_save();
	struct xfs_cil_ctx	*ctx =
		container_of(work, struct xfs_cil_ctx, push_work);
	struct xfs_cil		*cil = ctx->cil;
	struct xlog		*log = cil->xc_log;
	struct xfs_cil_ctx	*new_ctx;
	/*
	 * NOTE(review): these two counters are int, but their addresses are
	 * passed to xlog_cil_build_lv_chain(), which takes uint32_t pointers.
	 */
	int			num_iovecs = 0;
	int			num_bytes = 0;
	int			error = 0;
	struct xlog_cil_trans_hdr thdr;
	struct xfs_log_vec	lvhdr = {};
	xfs_csn_t		push_seq;
	bool			push_commit_stable;
	LIST_HEAD		(whiteouts);
	struct xlog_ticket	*ticket;

	/* allocate the replacement context before taking any CIL locks */
	new_ctx = xlog_cil_ctx_alloc();
	new_ctx->ticket = xlog_cil_ticket_alloc(log);

	down_write(&cil->xc_ctx_lock);

	spin_lock(&cil->xc_push_lock);
	push_seq = cil->xc_push_seq;
	ASSERT(push_seq <= ctx->sequence);
	/* consume the stable-commit request; it applies to this push only */
	push_commit_stable = cil->xc_push_commit_stable;
	cil->xc_push_commit_stable = false;

	/*
	 * As we are about to switch to a new, empty CIL context, we no longer
	 * need to throttle tasks on CIL space overruns. Wake any waiters that
	 * the hard push throttle may have caught so they can start committing
	 * to the new context. The cil->xc_push_lock provides the serialisation
	 * necessary for safely using the lockless waitqueue_active() check in
	 * this context.
	 */
	if (waitqueue_active(&cil->xc_push_wait))
		wake_up_all(&cil->xc_push_wait);

	xlog_cil_push_pcp_aggregate(cil, ctx);

	/*
	 * Check if we've anything to push. If there is nothing, then we don't
	 * move on to a new sequence number and so we have to be able to push
	 * this sequence again later.
	 */
	if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) {
		cil->xc_push_seq = 0;
		spin_unlock(&cil->xc_push_lock);
		goto out_skip;
	}


	/* check for a previously pushed sequence */
	if (push_seq < ctx->sequence) {
		spin_unlock(&cil->xc_push_lock);
		goto out_skip;
	}

	/*
	 * We are now going to push this context, so add it to the committing
	 * list before we do anything else. This ensures that anyone waiting on
	 * this push can easily detect the difference between a "push in
	 * progress" and "CIL is empty, nothing to do".
	 *
	 * IOWs, a wait loop can now check for:
	 *	the current sequence not being found on the committing list;
	 *	an empty CIL; and
	 *	an unchanged sequence number
	 * to detect a push that had nothing to do and therefore does not need
	 * waiting on. If the CIL is not empty, we get put on the committing
	 * list before emptying the CIL and bumping the sequence number. Hence
	 * an empty CIL and an unchanged sequence number means we jumped out
	 * above after doing nothing.
	 *
	 * Hence the waiter will either find the commit sequence on the
	 * committing list or the sequence number will be unchanged and the CIL
	 * still dirty. In that latter case, the push has not yet started, and
	 * so the waiter will have to continue trying to check the CIL
	 * committing list until it is found. In extreme cases of delay, the
	 * sequence may fully commit between the attempts the wait makes to wait
	 * on the commit sequence.
	 */
	list_add(&ctx->committing, &cil->xc_committing);
	spin_unlock(&cil->xc_push_lock);

	xlog_cil_build_lv_chain(ctx, &whiteouts, &num_iovecs, &num_bytes);

	/*
	 * Switch the contexts so we can drop the context lock and move out
	 * of a shared context. We can't just go straight to the commit record,
	 * though - we need to synchronise with previous and future commits so
	 * that the commit records are correctly ordered in the log to ensure
	 * that we process items during log IO completion in the correct order.
	 *
	 * For example, if we get an EFI in one checkpoint and the EFD in the
	 * next (e.g. due to log forces), we do not want the checkpoint with
	 * the EFD to be committed before the checkpoint with the EFI.  Hence
	 * we must strictly order the commit records of the checkpoints so
	 * that: a) the checkpoint callbacks are attached to the iclogs in the
	 * correct order; and b) the checkpoints are replayed in correct order
	 * in log recovery.
	 *
	 * Hence we need to add this context to the committing context list so
	 * that higher sequences will wait for us to write out a commit record
	 * before they do.
	 *
	 * xfs_log_force_seq requires us to mirror the new sequence into the cil
	 * structure atomically with the addition of this sequence to the
	 * committing list. This also ensures that we can do unlocked checks
	 * against the current sequence in log forces without risking
	 * dereferencing a freed context pointer.
	 */
	spin_lock(&cil->xc_push_lock);
	xlog_cil_ctx_switch(cil, new_ctx);
	spin_unlock(&cil->xc_push_lock);
	up_write(&cil->xc_ctx_lock);

	/*
	 * Sort the log vector chain before we add the transaction headers.
	 * This ensures we always have the transaction headers at the start
	 * of the chain.
	 */
	list_sort(NULL, &ctx->lv_chain, xlog_cil_order_cmp);

	/*
	 * Build a checkpoint transaction header and write it to the log to
	 * begin the transaction. We need to account for the space used by the
	 * transaction header here as it is not accounted for in xlog_write().
	 * Add the lvhdr to the head of the lv chain we pass to xlog_write() so
	 * it gets written into the iclog first.
	 */
	xlog_cil_build_trans_hdr(ctx, &thdr, &lvhdr, num_iovecs);
	num_bytes += lvhdr.lv_bytes;
	list_add(&lvhdr.lv_list, &ctx->lv_chain);

	/*
	 * Take the lvhdr back off the lv_chain immediately after calling
	 * xlog_cil_write_chain() as it should not be passed to log IO
	 * completion.
	 */
	error = xlog_cil_write_chain(ctx, num_bytes);
	list_del(&lvhdr.lv_list);
	if (error)
		goto out_abort_free_ticket;

	error = xlog_cil_write_commit_record(ctx);
	if (error)
		goto out_abort_free_ticket;

	/*
	 * Grab the ticket from the ctx so we can ungrant it after releasing the
	 * commit_iclog. The ctx may be freed by the time we return from
	 * releasing the commit_iclog (i.e. checkpoint has been completed and
	 * callback run) so we can't reference the ctx after the call to
	 * xlog_state_release_iclog().
	 */
	ticket = ctx->ticket;

	/*
	 * If the checkpoint spans multiple iclogs, wait for all previous iclogs
	 * to complete before we submit the commit_iclog. We can't use state
	 * checks for this - ACTIVE can be either a past completed iclog or a
	 * future iclog being filled, while WANT_SYNC through SYNC_DONE can be a
	 * past or future iclog awaiting IO or ordered IO completion to be run.
	 * In the latter case, if it's a future iclog and we wait on it, then we
	 * will hang because it won't get processed through to ic_force_wait
	 * wakeup until this commit_iclog is written to disk.  Hence we use the
	 * iclog header lsn and compare it to the commit lsn to determine if we
	 * need to wait on iclogs or not.
	 */
	spin_lock(&log->l_icloglock);
	if (ctx->start_lsn != ctx->commit_lsn) {
		xfs_lsn_t	plsn = be64_to_cpu(
			ctx->commit_iclog->ic_prev->ic_header->h_lsn);

		if (plsn && XFS_LSN_CMP(plsn, ctx->commit_lsn) < 0) {
			/*
			 * Waiting on ic_force_wait orders the completion of
			 * iclogs older than ic_prev. Hence we only need to wait
			 * on the most recent older iclog here.
			 *
			 * The icloglock is retaken straight after the wait.
			 * NOTE(review): this implies xlog_wait_on_iclog()
			 * returns with the lock dropped - confirm.
			 */
			xlog_wait_on_iclog(ctx->commit_iclog->ic_prev);
			spin_lock(&log->l_icloglock);
		}

		/*
		 * We need to issue a pre-flush so that the ordering for this
		 * checkpoint is correctly preserved down to stable storage.
		 */
		ctx->commit_iclog->ic_flags |= XLOG_ICL_NEED_FLUSH;
	}

	/*
	 * The commit iclog must be written to stable storage to guarantee
	 * journal IO vs metadata writeback IO is correctly ordered on stable
	 * storage.
	 *
	 * If the push caller needs the commit to be immediately stable and the
	 * commit_iclog is not yet marked as XLOG_STATE_WANT_SYNC to indicate it
	 * will be written when released, switch its state to WANT_SYNC right
	 * now.
	 */
	ctx->commit_iclog->ic_flags |= XLOG_ICL_NEED_FUA;
	if (push_commit_stable &&
	    ctx->commit_iclog->ic_state == XLOG_STATE_ACTIVE)
		xlog_state_switch_iclogs(log, ctx->commit_iclog, 0);
	/* ticket was already captured above; this reload is redundant */
	ticket = ctx->ticket;
	xlog_state_release_iclog(log, ctx->commit_iclog, ticket);

	/* Not safe to reference ctx now! */

	spin_unlock(&log->l_icloglock);
	xlog_cil_cleanup_whiteouts(&whiteouts);
	xfs_log_ticket_ungrant(log, ticket);
	memalloc_nofs_restore(nofs_flags);
	return;

out_skip:
	/* nothing was pushed: discard the unused replacement context */
	up_write(&cil->xc_ctx_lock);
	xfs_log_ticket_put(new_ctx->ticket);
	kfree(new_ctx);
	memalloc_nofs_restore(nofs_flags);
	return;

out_abort_free_ticket:
	ASSERT(xlog_is_shutdown(log));
	xlog_cil_cleanup_whiteouts(&whiteouts);
	/*
	 * No commit iclog was recorded for the context, so there is no iclog
	 * to release and checkpoint completion is run directly from here.
	 */
	if (!ctx->commit_iclog) {
		xfs_log_ticket_ungrant(log, ctx->ticket);
		xlog_cil_committed(ctx);
		memalloc_nofs_restore(nofs_flags);
		return;
	}
	spin_lock(&log->l_icloglock);
	ticket = ctx->ticket;
	xlog_state_release_iclog(log, ctx->commit_iclog, ticket);
	/* Not safe to reference ctx now! */
	spin_unlock(&log->l_icloglock);
	xfs_log_ticket_ungrant(log, ticket);
	memalloc_nofs_restore(nofs_flags);
}

/*
 * We need to push CIL every so often so we don't cache more than we can fit in
 * the log. The limit really is that a checkpoint can't be more than half the
 * log (the current checkpoint is not allowed to overwrite the previous
 * checkpoint), but commit latency and memory usage limit this to a smaller
 * size.
 *
 * Every return path of this function drops the read hold on cil->xc_ctx_lock;
 * the function itself never takes that lock.
 */
static void
xlog_cil_push_background(
	struct xlog	*log)
{
	struct xfs_cil	*cil = log->l_cilp;
	/* sampled once; all the limit checks below use this snapshot */
	int		space_used = atomic_read(&cil->xc_ctx->space_used);

	/*
	 * The cil won't be empty because we are called while holding the
	 * context lock so whatever we added to the CIL will still be there.
	 */
	ASSERT(!test_bit(XLOG_CIL_EMPTY, &cil->xc_flags));

	/*
	 * We are done if:
	 * - we haven't used up all the space available yet; or
	 * - we've already queued up a push; and
	 * - we're not over the hard limit; and
	 * - nothing has been over the hard limit.
	 *
	 * If so, we don't need to take the push lock as there's nothing to do.
	 */
	if (space_used < XLOG_CIL_SPACE_LIMIT(log) ||
	    (cil->xc_push_seq == cil->xc_current_sequence &&
	     space_used < XLOG_CIL_BLOCKING_SPACE_LIMIT(log) &&
	     !waitqueue_active(&cil->xc_push_wait))) {
		up_read(&cil->xc_ctx_lock);
		return;
	}

	/* queue a push of the current sequence unless one is already queued */
	spin_lock(&cil->xc_push_lock);
	if (cil->xc_push_seq < cil->xc_current_sequence) {
		cil->xc_push_seq = cil->xc_current_sequence;
		queue_work(cil->xc_push_wq, &cil->xc_ctx->push_work);
	}

	/*
	 * Drop the context lock now, we can't hold that if we need to sleep
	 * because we are over the blocking threshold. The push_lock is still
	 * held, so blocking threshold sleep/wakeup is still correctly
	 * serialised here.
	 */
	up_read(&cil->xc_ctx_lock);

	/*
	 * If we are well over the space limit, throttle the work that is being
	 * done until the push work on this context has begun. Enforce the hard
	 * throttle on all transaction commits once it has been activated, even
	 * if the committing transactions have resulted in the space usage
	 * dipping back down under the hard limit.
	 *
	 * The cil->xc_push_lock provides the serialisation necessary for safely
	 * calling xlog_cil_over_hard_limit() in this context.
	 */
	if (xlog_cil_over_hard_limit(log, space_used)) {
		trace_xfs_log_cil_wait(log, cil->xc_ctx->ticket);
		ASSERT(space_used < log->l_logsize);
		/*
		 * No unlock follows the wait. NOTE(review): this implies
		 * xlog_wait() releases xc_push_lock itself - confirm.
		 */
		xlog_wait(&cil->xc_push_wait, &cil->xc_push_lock);
		return;
	}

	spin_unlock(&cil->xc_push_lock);

}

/*
 * xlog_cil_push_now() is used to trigger an immediate CIL push to the sequence
 * number that is passed. When it returns, the work will be queued for
 * @push_seq, but it won't be completed.
1696 * 1697 * If the caller is performing a synchronous force, we will flush the workqueue 1698 * to get previously queued work moving to minimise the wait time they will 1699 * undergo waiting for all outstanding pushes to complete. The caller is 1700 * expected to do the required waiting for push_seq to complete. 1701 * 1702 * If the caller is performing an async push, we need to ensure that the 1703 * checkpoint is fully flushed out of the iclogs when we finish the push. If we 1704 * don't do this, then the commit record may remain sitting in memory in an 1705 * ACTIVE iclog. This then requires another full log force to push to disk, 1706 * which defeats the purpose of having an async, non-blocking CIL force 1707 * mechanism. Hence in this case we need to pass a flag to the push work to 1708 * indicate it needs to flush the commit record itself. 1709 */ 1710 static void 1711 xlog_cil_push_now( 1712 struct xlog *log, 1713 xfs_lsn_t push_seq, 1714 bool async) 1715 { 1716 struct xfs_cil *cil = log->l_cilp; 1717 1718 if (!cil) 1719 return; 1720 1721 ASSERT(push_seq && push_seq <= cil->xc_current_sequence); 1722 1723 /* start on any pending background push to minimise wait time on it */ 1724 if (!async) 1725 flush_workqueue(cil->xc_push_wq); 1726 1727 spin_lock(&cil->xc_push_lock); 1728 1729 /* 1730 * If this is an async flush request, we always need to set the 1731 * xc_push_commit_stable flag even if something else has already queued 1732 * a push. The flush caller is asking for the CIL to be on stable 1733 * storage when the next push completes, so regardless of who has queued 1734 * the push, the flush requires stable semantics from it. 1735 */ 1736 cil->xc_push_commit_stable = async; 1737 1738 /* 1739 * If the CIL is empty or we've already pushed the sequence then 1740 * there's no more work that we need to do. 
1741 */ 1742 if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags) || 1743 push_seq <= cil->xc_push_seq) { 1744 spin_unlock(&cil->xc_push_lock); 1745 return; 1746 } 1747 1748 cil->xc_push_seq = push_seq; 1749 queue_work(cil->xc_push_wq, &cil->xc_ctx->push_work); 1750 spin_unlock(&cil->xc_push_lock); 1751 } 1752 1753 bool 1754 xlog_cil_empty( 1755 struct xlog *log) 1756 { 1757 struct xfs_cil *cil = log->l_cilp; 1758 bool empty = false; 1759 1760 spin_lock(&cil->xc_push_lock); 1761 if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) 1762 empty = true; 1763 spin_unlock(&cil->xc_push_lock); 1764 return empty; 1765 } 1766 1767 /* 1768 * If there are intent done items in this transaction and the related intent was 1769 * committed in the current (same) CIL checkpoint, we don't need to write either 1770 * the intent or intent done item to the journal as the change will be 1771 * journalled atomically within this checkpoint. As we cannot remove items from 1772 * the CIL here, mark the related intent with a whiteout so that the CIL push 1773 * can remove it rather than writing it to the journal. Then remove the intent 1774 * done item from the current transaction and release it so it doesn't get put 1775 * into the CIL at all. 
1776 */ 1777 static uint32_t 1778 xlog_cil_process_intents( 1779 struct xfs_cil *cil, 1780 struct xfs_trans *tp) 1781 { 1782 struct xfs_log_item *lip, *ilip, *next; 1783 uint32_t len = 0; 1784 1785 list_for_each_entry_safe(lip, next, &tp->t_items, li_trans) { 1786 if (!(lip->li_ops->flags & XFS_ITEM_INTENT_DONE)) 1787 continue; 1788 1789 ilip = lip->li_ops->iop_intent(lip); 1790 if (!ilip || !xlog_item_in_current_chkpt(cil, ilip)) 1791 continue; 1792 set_bit(XFS_LI_WHITEOUT, &ilip->li_flags); 1793 trace_xfs_cil_whiteout_mark(ilip); 1794 len += ilip->li_lv->lv_bytes; 1795 kvfree(ilip->li_lv); 1796 ilip->li_lv = NULL; 1797 1798 xfs_trans_del_item(lip); 1799 lip->li_ops->iop_release(lip); 1800 } 1801 return len; 1802 } 1803 1804 /* 1805 * Commit a transaction with the given vector to the Committed Item List. 1806 * 1807 * To do this, we need to format the item, pin it in memory if required and 1808 * account for the space used by the transaction. Once we have done that we 1809 * need to release the unused reservation for the transaction, attach the 1810 * transaction to the checkpoint context so we carry the busy extents through 1811 * to checkpoint completion, and then unlock all the items in the transaction. 1812 * 1813 * Called with the context lock already held in read mode to lock out 1814 * background commit, returns without it held once background commits are 1815 * allowed again. 1816 */ 1817 void 1818 xlog_cil_commit( 1819 struct xlog *log, 1820 struct xfs_trans *tp, 1821 xfs_csn_t *commit_seq, 1822 bool regrant) 1823 { 1824 struct xfs_cil *cil = log->l_cilp; 1825 struct xfs_log_item *lip, *next; 1826 uint32_t released_space = 0; 1827 1828 /* 1829 * Do all necessary memory allocation before we lock the CIL. 1830 * This ensures the allocation does not deadlock with a CIL 1831 * push in memory reclaim (e.g. from kswapd). 
1832 */ 1833 xlog_cil_alloc_shadow_bufs(log, tp); 1834 1835 /* lock out background commit */ 1836 down_read(&cil->xc_ctx_lock); 1837 1838 if (tp->t_flags & XFS_TRANS_HAS_INTENT_DONE) 1839 released_space = xlog_cil_process_intents(cil, tp); 1840 1841 xlog_cil_insert_items(log, tp, released_space); 1842 1843 if (regrant && !xlog_is_shutdown(log)) 1844 xfs_log_ticket_regrant(log, tp->t_ticket); 1845 else 1846 xfs_log_ticket_ungrant(log, tp->t_ticket); 1847 tp->t_ticket = NULL; 1848 xfs_trans_unreserve_and_mod_sb(tp); 1849 1850 /* 1851 * Once all the items of the transaction have been copied to the CIL, 1852 * the items can be unlocked and possibly freed. 1853 * 1854 * This needs to be done before we drop the CIL context lock because we 1855 * have to update state in the log items and unlock them before they go 1856 * to disk. If we don't, then the CIL checkpoint can race with us and 1857 * we can run checkpoint completion before we've updated and unlocked 1858 * the log items. This affects (at least) processing of stale buffers, 1859 * inodes and EFIs. 1860 */ 1861 trace_xfs_trans_commit_items(tp, _RET_IP_); 1862 list_for_each_entry_safe(lip, next, &tp->t_items, li_trans) { 1863 xfs_trans_del_item(lip); 1864 if (lip->li_ops->iop_committing) 1865 lip->li_ops->iop_committing(lip, cil->xc_ctx->sequence); 1866 } 1867 if (commit_seq) 1868 *commit_seq = cil->xc_ctx->sequence; 1869 1870 /* xlog_cil_push_background() releases cil->xc_ctx_lock */ 1871 xlog_cil_push_background(log); 1872 } 1873 1874 /* 1875 * Flush the CIL to stable storage but don't wait for it to complete. This 1876 * requires the CIL push to ensure the commit record for the push hits the disk, 1877 * but otherwise is no different to a push done from a log force. 
1878 */ 1879 void 1880 xlog_cil_flush( 1881 struct xlog *log) 1882 { 1883 xfs_csn_t seq = log->l_cilp->xc_current_sequence; 1884 1885 trace_xfs_log_force(log->l_mp, seq, _RET_IP_); 1886 xlog_cil_push_now(log, seq, true); 1887 1888 /* 1889 * If the CIL is empty, make sure that any previous checkpoint that may 1890 * still be in an active iclog is pushed to stable storage. 1891 */ 1892 if (test_bit(XLOG_CIL_EMPTY, &log->l_cilp->xc_flags)) 1893 xfs_log_force(log->l_mp, 0); 1894 } 1895 1896 /* 1897 * Conditionally push the CIL based on the sequence passed in. 1898 * 1899 * We only need to push if we haven't already pushed the sequence number given. 1900 * Hence the only time we will trigger a push here is if the push sequence is 1901 * the same as the current context. 1902 * 1903 * We return the current commit lsn to allow the callers to determine if a 1904 * iclog flush is necessary following this call. 1905 */ 1906 xfs_lsn_t 1907 xlog_cil_force_seq( 1908 struct xlog *log, 1909 xfs_csn_t sequence) 1910 { 1911 struct xfs_cil *cil = log->l_cilp; 1912 struct xfs_cil_ctx *ctx; 1913 xfs_lsn_t commit_lsn = NULLCOMMITLSN; 1914 1915 ASSERT(sequence <= cil->xc_current_sequence); 1916 1917 if (!sequence) 1918 sequence = cil->xc_current_sequence; 1919 trace_xfs_log_force(log->l_mp, sequence, _RET_IP_); 1920 1921 /* 1922 * check to see if we need to force out the current context. 1923 * xlog_cil_push() handles racing pushes for the same sequence, 1924 * so no need to deal with it here. 1925 */ 1926 restart: 1927 xlog_cil_push_now(log, sequence, false); 1928 1929 /* 1930 * See if we can find a previous sequence still committing. 1931 * We need to wait for all previous sequence commits to complete 1932 * before allowing the force of push_seq to go ahead. Hence block 1933 * on commits for those as well. 
1934 */ 1935 spin_lock(&cil->xc_push_lock); 1936 list_for_each_entry(ctx, &cil->xc_committing, committing) { 1937 /* 1938 * Avoid getting stuck in this loop because we were woken by the 1939 * shutdown, but then went back to sleep once already in the 1940 * shutdown state. 1941 */ 1942 if (xlog_is_shutdown(log)) 1943 goto out_shutdown; 1944 if (ctx->sequence > sequence) 1945 continue; 1946 if (!ctx->commit_lsn) { 1947 /* 1948 * It is still being pushed! Wait for the push to 1949 * complete, then start again from the beginning. 1950 */ 1951 XFS_STATS_INC(log->l_mp, xs_log_force_sleep); 1952 xlog_wait(&cil->xc_commit_wait, &cil->xc_push_lock); 1953 goto restart; 1954 } 1955 if (ctx->sequence != sequence) 1956 continue; 1957 /* found it! */ 1958 commit_lsn = ctx->commit_lsn; 1959 } 1960 1961 /* 1962 * The call to xlog_cil_push_now() executes the push in the background. 1963 * Hence by the time we have got here it our sequence may not have been 1964 * pushed yet. This is true if the current sequence still matches the 1965 * push sequence after the above wait loop and the CIL still contains 1966 * dirty objects. This is guaranteed by the push code first adding the 1967 * context to the committing list before emptying the CIL. 1968 * 1969 * Hence if we don't find the context in the committing list and the 1970 * current sequence number is unchanged then the CIL contents are 1971 * significant. If the CIL is empty, if means there was nothing to push 1972 * and that means there is nothing to wait for. If the CIL is not empty, 1973 * it means we haven't yet started the push, because if it had started 1974 * we would have found the context on the committing list. 1975 */ 1976 if (sequence == cil->xc_current_sequence && 1977 !test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) { 1978 spin_unlock(&cil->xc_push_lock); 1979 goto restart; 1980 } 1981 1982 spin_unlock(&cil->xc_push_lock); 1983 return commit_lsn; 1984 1985 /* 1986 * We detected a shutdown in progress. 
We need to trigger the log force 1987 * to pass through it's iclog state machine error handling, even though 1988 * we are already in a shutdown state. Hence we can't return 1989 * NULLCOMMITLSN here as that has special meaning to log forces (i.e. 1990 * LSN is already stable), so we return a zero LSN instead. 1991 */ 1992 out_shutdown: 1993 spin_unlock(&cil->xc_push_lock); 1994 return 0; 1995 } 1996 1997 /* 1998 * Perform initial CIL structure initialisation. 1999 */ 2000 int 2001 xlog_cil_init( 2002 struct xlog *log) 2003 { 2004 struct xfs_cil *cil; 2005 struct xfs_cil_ctx *ctx; 2006 struct xlog_cil_pcp *cilpcp; 2007 int cpu; 2008 2009 cil = kzalloc(sizeof(*cil), GFP_KERNEL | __GFP_RETRY_MAYFAIL); 2010 if (!cil) 2011 return -ENOMEM; 2012 /* 2013 * Limit the CIL pipeline depth to 4 concurrent works to bound the 2014 * concurrency the log spinlocks will be exposed to. 2015 */ 2016 cil->xc_push_wq = alloc_workqueue("xfs-cil/%s", 2017 XFS_WQFLAGS(WQ_FREEZABLE | WQ_MEM_RECLAIM | WQ_UNBOUND), 2018 4, log->l_mp->m_super->s_id); 2019 if (!cil->xc_push_wq) 2020 goto out_destroy_cil; 2021 2022 cil->xc_log = log; 2023 cil->xc_pcp = alloc_percpu(struct xlog_cil_pcp); 2024 if (!cil->xc_pcp) 2025 goto out_destroy_wq; 2026 2027 for_each_possible_cpu(cpu) { 2028 cilpcp = per_cpu_ptr(cil->xc_pcp, cpu); 2029 INIT_LIST_HEAD(&cilpcp->busy_extents); 2030 INIT_LIST_HEAD(&cilpcp->log_items); 2031 } 2032 2033 INIT_LIST_HEAD(&cil->xc_committing); 2034 spin_lock_init(&cil->xc_push_lock); 2035 init_waitqueue_head(&cil->xc_push_wait); 2036 init_rwsem(&cil->xc_ctx_lock); 2037 init_waitqueue_head(&cil->xc_start_wait); 2038 init_waitqueue_head(&cil->xc_commit_wait); 2039 log->l_cilp = cil; 2040 2041 ctx = xlog_cil_ctx_alloc(); 2042 xlog_cil_ctx_switch(cil, ctx); 2043 return 0; 2044 2045 out_destroy_wq: 2046 destroy_workqueue(cil->xc_push_wq); 2047 out_destroy_cil: 2048 kfree(cil); 2049 return -ENOMEM; 2050 } 2051 2052 void 2053 xlog_cil_destroy( 2054 struct xlog *log) 2055 { 2056 struct 
xfs_cil *cil = log->l_cilp; 2057 2058 if (cil->xc_ctx) { 2059 if (cil->xc_ctx->ticket) 2060 xfs_log_ticket_put(cil->xc_ctx->ticket); 2061 kfree(cil->xc_ctx); 2062 } 2063 2064 ASSERT(test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)); 2065 free_percpu(cil->xc_pcp); 2066 destroy_workqueue(cil->xc_push_wq); 2067 kfree(cil); 2068 } 2069 2070