/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmglue.c
 *
 * Code which implements an OCFS2 specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/crc32.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>

#include <cluster/heartbeat.h>
#include <cluster/nodemanager.h>
#include <cluster/tcp.h>

#include <dlm/dlmapi.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"

#include "alloc.h"
#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"

#include "buffer_head_io.h"

/*
 * A waiter for a particular state of a lockres' l_flags.
 *
 * lockres_add_mask_waiter() queues one of these on
 * lockres->l_mask_waiters; lockres_set_flags() completes it once
 * (l_flags & mw_mask) == mw_goal.
 */
struct ocfs2_mask_waiter {
	struct list_head	mw_item;	/* entry on l_mask_waiters */
	int			mw_status;	/* result returned to the waiter */
	struct completion	mw_complete;	/* fired when the goal is met */
	unsigned long		mw_mask;	/* l_flags bits being watched */
	unsigned long		mw_goal;	/* value the masked bits must have */
};

/* ->get_osb callbacks for lock types whose ->l_priv is not an ocfs2_super */
static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);

/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock()
 *
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};

/* Pairs a requeue flag with the action chosen by a downconvert worker. */
struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};

/* Lock-type callbacks wired into the ocfs2_lock_res_ops tables below. */
static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking);

static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);


/* Dump the meta LVB of __lockres, tagging the output with the call site. */
#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)

/* This aids in debugging situations where a bad LVB might be involved.
*/ 110 static void ocfs2_dump_meta_lvb_info(u64 level, 111 const char *function, 112 unsigned int line, 113 struct ocfs2_lock_res *lockres) 114 { 115 struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; 116 117 mlog(level, "LVB information for %s (called from %s:%u):\n", 118 lockres->l_name, function, line); 119 mlog(level, "version: %u, clusters: %u, generation: 0x%x\n", 120 lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters), 121 be32_to_cpu(lvb->lvb_igeneration)); 122 mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n", 123 (unsigned long long)be64_to_cpu(lvb->lvb_isize), 124 be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid), 125 be16_to_cpu(lvb->lvb_imode)); 126 mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, " 127 "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink), 128 (long long)be64_to_cpu(lvb->lvb_iatime_packed), 129 (long long)be64_to_cpu(lvb->lvb_ictime_packed), 130 (long long)be64_to_cpu(lvb->lvb_imtime_packed), 131 be32_to_cpu(lvb->lvb_iattr)); 132 } 133 134 135 /* 136 * OCFS2 Lock Resource Operations 137 * 138 * These fine tune the behavior of the generic dlmglue locking infrastructure. 139 * 140 * The most basic of lock types can point ->l_priv to their respective 141 * struct ocfs2_super and allow the default actions to manage things. 142 * 143 * Right now, each lock type also needs to implement an init function, 144 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres() 145 * should be called when the lock is no longer needed (i.e., object 146 * destruction time). 147 */ 148 struct ocfs2_lock_res_ops { 149 /* 150 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define 151 * this callback if ->l_priv is not an ocfs2_super pointer 152 */ 153 struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *); 154 155 /* 156 * Optionally called in the downconvert thread after a 157 * successful downconvert. 
The lockres will not be referenced 158 * after this callback is called, so it is safe to free 159 * memory, etc. 160 * 161 * The exact semantics of when this is called are controlled 162 * by ->downconvert_worker() 163 */ 164 void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *); 165 166 /* 167 * Allow a lock type to add checks to determine whether it is 168 * safe to downconvert a lock. Return 0 to re-queue the 169 * downconvert at a later time, nonzero to continue. 170 * 171 * For most locks, the default checks that there are no 172 * incompatible holders are sufficient. 173 * 174 * Called with the lockres spinlock held. 175 */ 176 int (*check_downconvert)(struct ocfs2_lock_res *, int); 177 178 /* 179 * Allows a lock type to populate the lock value block. This 180 * is called on downconvert, and when we drop a lock. 181 * 182 * Locks that want to use this should set LOCK_TYPE_USES_LVB 183 * in the flags field. 184 * 185 * Called with the lockres spinlock held. 186 */ 187 void (*set_lvb)(struct ocfs2_lock_res *); 188 189 /* 190 * Called from the downconvert thread when it is determined 191 * that a lock will be downconverted. This is called without 192 * any locks held so the function can do work that might 193 * schedule (syncing out data, etc). 194 * 195 * This should return any one of the ocfs2_unblock_action 196 * values, depending on what it wants the thread to do. 197 */ 198 int (*downconvert_worker)(struct ocfs2_lock_res *, int); 199 200 /* 201 * LOCK_TYPE_* flags which describe the specific requirements 202 * of a lock type. Descriptions of each individual flag follow. 203 */ 204 int flags; 205 }; 206 207 /* 208 * Some locks want to "refresh" potentially stale data when a 209 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this 210 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the 211 * individual lockres l_flags member from the ast function. 
It is 212 * expected that the locking wrapper will clear the 213 * OCFS2_LOCK_NEEDS_REFRESH flag when done. 214 */ 215 #define LOCK_TYPE_REQUIRES_REFRESH 0x1 216 217 /* 218 * Indicate that a lock type makes use of the lock value block. The 219 * ->set_lvb lock type callback must be defined. 220 */ 221 #define LOCK_TYPE_USES_LVB 0x2 222 223 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { 224 .get_osb = ocfs2_get_inode_osb, 225 .flags = 0, 226 }; 227 228 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = { 229 .get_osb = ocfs2_get_inode_osb, 230 .check_downconvert = ocfs2_check_meta_downconvert, 231 .set_lvb = ocfs2_set_meta_lvb, 232 .downconvert_worker = ocfs2_data_convert_worker, 233 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 234 }; 235 236 static struct ocfs2_lock_res_ops ocfs2_super_lops = { 237 .flags = LOCK_TYPE_REQUIRES_REFRESH, 238 }; 239 240 static struct ocfs2_lock_res_ops ocfs2_rename_lops = { 241 .flags = 0, 242 }; 243 244 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { 245 .get_osb = ocfs2_get_dentry_osb, 246 .post_unlock = ocfs2_dentry_post_unlock, 247 .downconvert_worker = ocfs2_dentry_convert_worker, 248 .flags = 0, 249 }; 250 251 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = { 252 .get_osb = ocfs2_get_inode_osb, 253 .flags = 0, 254 }; 255 256 static struct ocfs2_lock_res_ops ocfs2_flock_lops = { 257 .get_osb = ocfs2_get_file_osb, 258 .flags = 0, 259 }; 260 261 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 262 { 263 return lockres->l_type == OCFS2_LOCK_TYPE_META || 264 lockres->l_type == OCFS2_LOCK_TYPE_RW || 265 lockres->l_type == OCFS2_LOCK_TYPE_OPEN; 266 } 267 268 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) 269 { 270 BUG_ON(!ocfs2_is_inode_lock(lockres)); 271 272 return (struct inode *) lockres->l_priv; 273 } 274 275 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres) 276 { 277 BUG_ON(lockres->l_type != 
OCFS2_LOCK_TYPE_DENTRY); 278 279 return (struct ocfs2_dentry_lock *)lockres->l_priv; 280 } 281 282 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres) 283 { 284 if (lockres->l_ops->get_osb) 285 return lockres->l_ops->get_osb(lockres); 286 287 return (struct ocfs2_super *)lockres->l_priv; 288 } 289 290 static int ocfs2_lock_create(struct ocfs2_super *osb, 291 struct ocfs2_lock_res *lockres, 292 int level, 293 int dlm_flags); 294 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 295 int wanted); 296 static void ocfs2_cluster_unlock(struct ocfs2_super *osb, 297 struct ocfs2_lock_res *lockres, 298 int level); 299 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres); 300 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres); 301 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres); 302 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level); 303 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 304 struct ocfs2_lock_res *lockres); 305 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 306 int convert); 307 #define ocfs2_log_dlm_error(_func, _stat, _lockres) do { \ 308 mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on " \ 309 "resource %s: %s\n", dlm_errname(_stat), _func, \ 310 _lockres->l_name, dlm_errmsg(_stat)); \ 311 } while (0) 312 static int ocfs2_downconvert_thread(void *arg); 313 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 314 struct ocfs2_lock_res *lockres); 315 static int ocfs2_inode_lock_update(struct inode *inode, 316 struct buffer_head **bh); 317 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 318 static inline int ocfs2_highest_compat_lock_level(int level); 319 static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 320 int new_level); 321 static int 
ocfs2_downconvert_lock(struct ocfs2_super *osb, 322 struct ocfs2_lock_res *lockres, 323 int new_level, 324 int lvb); 325 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 326 struct ocfs2_lock_res *lockres); 327 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 328 struct ocfs2_lock_res *lockres); 329 330 331 static void ocfs2_build_lock_name(enum ocfs2_lock_type type, 332 u64 blkno, 333 u32 generation, 334 char *name) 335 { 336 int len; 337 338 mlog_entry_void(); 339 340 BUG_ON(type >= OCFS2_NUM_LOCK_TYPES); 341 342 len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", 343 ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD, 344 (long long)blkno, generation); 345 346 BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1)); 347 348 mlog(0, "built lock resource with name: %s\n", name); 349 350 mlog_exit_void(); 351 } 352 353 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock); 354 355 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res, 356 struct ocfs2_dlm_debug *dlm_debug) 357 { 358 mlog(0, "Add tracking for lockres %s\n", res->l_name); 359 360 spin_lock(&ocfs2_dlm_tracking_lock); 361 list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking); 362 spin_unlock(&ocfs2_dlm_tracking_lock); 363 } 364 365 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) 366 { 367 spin_lock(&ocfs2_dlm_tracking_lock); 368 if (!list_empty(&res->l_debug_list)) 369 list_del_init(&res->l_debug_list); 370 spin_unlock(&ocfs2_dlm_tracking_lock); 371 } 372 373 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, 374 struct ocfs2_lock_res *res, 375 enum ocfs2_lock_type type, 376 struct ocfs2_lock_res_ops *ops, 377 void *priv) 378 { 379 res->l_type = type; 380 res->l_ops = ops; 381 res->l_priv = priv; 382 383 res->l_level = LKM_IVMODE; 384 res->l_requested = LKM_IVMODE; 385 res->l_blocking = LKM_IVMODE; 386 res->l_action = OCFS2_AST_INVALID; 387 res->l_unlock_action = OCFS2_UNLOCK_INVALID; 388 389 res->l_flags = OCFS2_LOCK_INITIALIZED; 390 391 
ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug); 392 } 393 394 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res) 395 { 396 /* This also clears out the lock status block */ 397 memset(res, 0, sizeof(struct ocfs2_lock_res)); 398 spin_lock_init(&res->l_lock); 399 init_waitqueue_head(&res->l_event); 400 INIT_LIST_HEAD(&res->l_blocked_list); 401 INIT_LIST_HEAD(&res->l_mask_waiters); 402 } 403 404 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, 405 enum ocfs2_lock_type type, 406 unsigned int generation, 407 struct inode *inode) 408 { 409 struct ocfs2_lock_res_ops *ops; 410 411 switch(type) { 412 case OCFS2_LOCK_TYPE_RW: 413 ops = &ocfs2_inode_rw_lops; 414 break; 415 case OCFS2_LOCK_TYPE_META: 416 ops = &ocfs2_inode_inode_lops; 417 break; 418 case OCFS2_LOCK_TYPE_OPEN: 419 ops = &ocfs2_inode_open_lops; 420 break; 421 default: 422 mlog_bug_on_msg(1, "type: %d\n", type); 423 ops = NULL; /* thanks, gcc */ 424 break; 425 }; 426 427 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, 428 generation, res->l_name); 429 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); 430 } 431 432 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres) 433 { 434 struct inode *inode = ocfs2_lock_res_inode(lockres); 435 436 return OCFS2_SB(inode->i_sb); 437 } 438 439 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres) 440 { 441 struct ocfs2_file_private *fp = lockres->l_priv; 442 443 return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb); 444 } 445 446 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) 447 { 448 __be64 inode_blkno_be; 449 450 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], 451 sizeof(__be64)); 452 453 return be64_to_cpu(inode_blkno_be); 454 } 455 456 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres) 457 { 458 struct ocfs2_dentry_lock *dl = lockres->l_priv; 459 460 return OCFS2_SB(dl->dl_inode->i_sb); 461 } 462 463 
void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, 464 u64 parent, struct inode *inode) 465 { 466 int len; 467 u64 inode_blkno = OCFS2_I(inode)->ip_blkno; 468 __be64 inode_blkno_be = cpu_to_be64(inode_blkno); 469 struct ocfs2_lock_res *lockres = &dl->dl_lockres; 470 471 ocfs2_lock_res_init_once(lockres); 472 473 /* 474 * Unfortunately, the standard lock naming scheme won't work 475 * here because we have two 16 byte values to use. Instead, 476 * we'll stuff the inode number as a binary value. We still 477 * want error prints to show something without garbling the 478 * display, so drop a null byte in there before the inode 479 * number. A future version of OCFS2 will likely use all 480 * binary lock names. The stringified names have been a 481 * tremendous aid in debugging, but now that the debugfs 482 * interface exists, we can mangle things there if need be. 483 * 484 * NOTE: We also drop the standard "pad" value (the total lock 485 * name size stays the same though - the last part is all 486 * zeros due to the memset in ocfs2_lock_res_init_once() 487 */ 488 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, 489 "%c%016llx", 490 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), 491 (long long)parent); 492 493 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); 494 495 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, 496 sizeof(__be64)); 497 498 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 499 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, 500 dl); 501 } 502 503 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 504 struct ocfs2_super *osb) 505 { 506 /* Superblock lockres doesn't come from a slab so we call init 507 * once on it manually. 
*/ 508 ocfs2_lock_res_init_once(res); 509 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, 510 0, res->l_name); 511 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 512 &ocfs2_super_lops, osb); 513 } 514 515 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, 516 struct ocfs2_super *osb) 517 { 518 /* Rename lockres doesn't come from a slab so we call init 519 * once on it manually. */ 520 ocfs2_lock_res_init_once(res); 521 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); 522 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 523 &ocfs2_rename_lops, osb); 524 } 525 526 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, 527 struct ocfs2_file_private *fp) 528 { 529 struct inode *inode = fp->fp_file->f_mapping->host; 530 struct ocfs2_inode_info *oi = OCFS2_I(inode); 531 532 ocfs2_lock_res_init_once(lockres); 533 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno, 534 inode->i_generation, lockres->l_name); 535 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 536 OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops, 537 fp); 538 lockres->l_flags |= OCFS2_LOCK_NOCACHE; 539 } 540 541 void ocfs2_lock_res_free(struct ocfs2_lock_res *res) 542 { 543 mlog_entry_void(); 544 545 if (!(res->l_flags & OCFS2_LOCK_INITIALIZED)) 546 return; 547 548 ocfs2_remove_lockres_tracking(res); 549 550 mlog_bug_on_msg(!list_empty(&res->l_blocked_list), 551 "Lockres %s is on the blocked list\n", 552 res->l_name); 553 mlog_bug_on_msg(!list_empty(&res->l_mask_waiters), 554 "Lockres %s has mask waiters pending\n", 555 res->l_name); 556 mlog_bug_on_msg(spin_is_locked(&res->l_lock), 557 "Lockres %s is locked\n", 558 res->l_name); 559 mlog_bug_on_msg(res->l_ro_holders, 560 "Lockres %s has %u ro holders\n", 561 res->l_name, res->l_ro_holders); 562 mlog_bug_on_msg(res->l_ex_holders, 563 "Lockres %s has %u ex holders\n", 564 res->l_name, res->l_ex_holders); 565 566 /* Need to clear out the lock status block for the 
dlm */ 567 memset(&res->l_lksb, 0, sizeof(res->l_lksb)); 568 569 res->l_flags = 0UL; 570 mlog_exit_void(); 571 } 572 573 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, 574 int level) 575 { 576 mlog_entry_void(); 577 578 BUG_ON(!lockres); 579 580 switch(level) { 581 case LKM_EXMODE: 582 lockres->l_ex_holders++; 583 break; 584 case LKM_PRMODE: 585 lockres->l_ro_holders++; 586 break; 587 default: 588 BUG(); 589 } 590 591 mlog_exit_void(); 592 } 593 594 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, 595 int level) 596 { 597 mlog_entry_void(); 598 599 BUG_ON(!lockres); 600 601 switch(level) { 602 case LKM_EXMODE: 603 BUG_ON(!lockres->l_ex_holders); 604 lockres->l_ex_holders--; 605 break; 606 case LKM_PRMODE: 607 BUG_ON(!lockres->l_ro_holders); 608 lockres->l_ro_holders--; 609 break; 610 default: 611 BUG(); 612 } 613 mlog_exit_void(); 614 } 615 616 /* WARNING: This function lives in a world where the only three lock 617 * levels are EX, PR, and NL. It *will* have to be adjusted when more 618 * lock types are added. 
 */
/*
 * Map the level another node is asking for to the highest level we
 * may keep: EX -> NL, PR -> PR, anything else -> EX.
 */
static inline int ocfs2_highest_compat_lock_level(int level)
{
	int new_level = LKM_EXMODE;

	if (level == LKM_EXMODE)
		new_level = LKM_NLMODE;
	else if (level == LKM_PRMODE)
		new_level = LKM_PRMODE;
	return new_level;
}

/*
 * Replace l_flags with @newflags and complete every queued mask waiter
 * whose goal is now met.  The caller must hold l_lock (asserted).
 */
static void lockres_set_flags(struct ocfs2_lock_res *lockres,
			      unsigned long newflags)
{
	struct ocfs2_mask_waiter *mw, *tmp;

 	assert_spin_locked(&lockres->l_lock);

	lockres->l_flags = newflags;

	/* _safe iteration: satisfied waiters are unlinked as we walk */
	list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			continue;

		list_del_init(&mw->mw_item);
		mw->mw_status = 0;
		complete(&mw->mw_complete);
	}
}
/* Set the bits in @or via lockres_set_flags(), waking satisfied waiters. */
static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}
/* Clear the bits in @clear via lockres_set_flags(), waking satisfied waiters. */
static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}

/*
 * AST handling for a completed downconvert.  The lockres must be BUSY,
 * ATTACHED and BLOCKED with a blocking level above NL.  Adopts the
 * requested level, drops the BLOCKED state if the new level is low
 * enough for the blocking request, and clears BUSY.
 */
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	lockres->l_level = lockres->l_requested;
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = LKM_NLMODE;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

/*
 * AST handling for a completed (up)convert.  The lockres must be BUSY
 * and ATTACHED.  Adopts the requested level and clears BUSY.
 */
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* Convert from RO to EX doesn't really need anything as our
	 * information is already up to date. Convert from NL to
	 * *anything* however should mark ourselves as needing an
	 * update */
	if (lockres->l_level == LKM_NLMODE &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

/*
 * AST handling for a freshly created lock.  The lockres must be BUSY
 * and not yet ATTACHED.  Adopts the requested level, sets ATTACHED and
 * clears BUSY.
 */
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

	/* Only a non-local lock obtained above NL needs refreshing, and
	 * only if its lock type asks for it */
	if (lockres->l_requested > LKM_NLMODE &&
	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

/*
 * Record a blocking request at @level: mark the lockres BLOCKED and
 * raise l_blocking if @level is higher than what is already recorded.
 * The caller must hold l_lock (asserted).
 *
 * Returns 1 if a downconvert has to be scheduled, 0 otherwise.
 */
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking.  this also catches the case where we get
		 * duplicate BASTs */
		if (ocfs2_highest_compat_lock_level(level) <
		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
			needs_downconvert = 1;

		lockres->l_blocking = level;
	}

	mlog_exit(needs_downconvert);
	return needs_downconvert;
}

/*
 * Blocking AST callback handed to dlmlock().
 *
 * @opaque: the struct ocfs2_lock_res registered with the lock
 * @level:  level being requested against us; must be above NL
 *
 * Records the request under l_lock and, if needed, schedules the
 * lockres as blocked.  After dropping the lock it wakes l_event
 * sleepers and the downconvert thread.
 */
static void ocfs2_blocking_ast(void *opaque, int level)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	int needs_downconvert;
	unsigned long flags;

	BUG_ON(level <= LKM_NLMODE);

	mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
	     lockres->l_name, level, lockres->l_level,
	     ocfs2_lock_type_string(lockres->l_type));

	/*
	 * We can skip the bast for locks which don't enable caching -
	 * they'll be dropped at the earliest possible time anyway.
	 */
	if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
		return;

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_wake_downconvert_thread(osb);
}

/*
 * Locking AST callback handed to dlmlock().
 *
 * @opaque: the struct ocfs2_lock_res registered with the lock
 *
 * Dispatches on the pending l_action (attach, convert or downconvert)
 * under l_lock, resets l_action to OCFS2_AST_INVALID and wakes l_event
 * sleepers.  If the lksb status is not DLM_NORMAL the error is logged
 * and the function returns without changing any lockres state.  An
 * unexpected l_action is a BUG.
 */
static void ocfs2_locking_ast(void *opaque)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct dlm_lockstatus *lksb = &lockres->l_lksb;
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);

	if (lksb->status != DLM_NORMAL) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
		     lockres->l_name, lksb->status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		/* LOCAL is cleared once the attach has completed */
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
		     "lockres flags = 0x%lx, unlock action: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}

	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/*
 * Undo the bookkeeping done before a dlm call that then failed: clear
 * BUSY and invalidate the pending action (@convert != 0) or the
 * pending unlock action (@convert == 0), then wake l_event sleepers.
 */
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	if (convert)
		lockres->l_action = OCFS2_AST_INVALID;
	else
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
	mlog_exit_void();
}

/* Note: If we detect another process working on the lock (i.e.,
 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 * to do the right thing in that case.
838 */ 839 static int ocfs2_lock_create(struct ocfs2_super *osb, 840 struct ocfs2_lock_res *lockres, 841 int level, 842 int dlm_flags) 843 { 844 int ret = 0; 845 enum dlm_status status = DLM_NORMAL; 846 unsigned long flags; 847 848 mlog_entry_void(); 849 850 mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level, 851 dlm_flags); 852 853 spin_lock_irqsave(&lockres->l_lock, flags); 854 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 855 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 856 spin_unlock_irqrestore(&lockres->l_lock, flags); 857 goto bail; 858 } 859 860 lockres->l_action = OCFS2_AST_ATTACH; 861 lockres->l_requested = level; 862 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 863 spin_unlock_irqrestore(&lockres->l_lock, flags); 864 865 status = dlmlock(osb->dlm, 866 level, 867 &lockres->l_lksb, 868 dlm_flags, 869 lockres->l_name, 870 OCFS2_LOCK_ID_MAX_LEN - 1, 871 ocfs2_locking_ast, 872 lockres, 873 ocfs2_blocking_ast); 874 if (status != DLM_NORMAL) { 875 ocfs2_log_dlm_error("dlmlock", status, lockres); 876 ret = -EINVAL; 877 ocfs2_recover_from_dlm_error(lockres, 1); 878 } 879 880 mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name); 881 882 bail: 883 mlog_exit(ret); 884 return ret; 885 } 886 887 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, 888 int flag) 889 { 890 unsigned long flags; 891 int ret; 892 893 spin_lock_irqsave(&lockres->l_lock, flags); 894 ret = lockres->l_flags & flag; 895 spin_unlock_irqrestore(&lockres->l_lock, flags); 896 897 return ret; 898 } 899 900 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 901 902 { 903 wait_event(lockres->l_event, 904 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 905 } 906 907 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 908 909 { 910 wait_event(lockres->l_event, 911 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 912 } 913 914 /* predict what lock level we'll be dropping down to on behalf 915 
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted)
{
	/* Only meaningful while a blocking request is recorded */
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}

/* Prepare @mw for use: empty list entry and a fresh completion. */
static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
{
	INIT_LIST_HEAD(&mw->mw_item);
	init_completion(&mw->mw_complete);
}

/*
 * Block (uninterruptibly) until @mw is completed, then return the
 * status stored in it.
 */
static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
	wait_for_completion(&mw->mw_complete);
	/* Re-arm the completion in case we want to wait on it again */
	INIT_COMPLETION(mw->mw_complete);
	return mw->mw_status;
}

/*
 * Queue @mw on @lockres so that it is completed once
 * (l_flags & @mask) == @goal.  @mw must not already be queued and the
 * caller must hold l_lock (asserted).
 */
static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
				    struct ocfs2_mask_waiter *mw,
				    unsigned long mask,
				    unsigned long goal)
{
	BUG_ON(!list_empty(&mw->mw_item));

	assert_spin_locked(&lockres->l_lock);

	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
	mw->mw_mask = mask;
	mw->mw_goal = goal;
}

/* returns 0 if the mw that was removed was already satisfied, -EBUSY
 * if the mask still hadn't reached its goal */
static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
				      struct ocfs2_mask_waiter *mw)
{
	unsigned long flags;
	int ret = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	/* An empty entry means lockres_set_flags() already dequeued it */
	if (!list_empty(&mw->mw_item)) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			ret = -EBUSY;

		list_del_init(&mw->mw_item);
		init_completion(&mw->mw_complete);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;

}

/*
 * Interruptible variant of ocfs2_wait_for_mask().  If the wait is
 * interrupted, @mw is taken off @lockres and the error from
 * wait_for_completion_interruptible() is returned; otherwise the
 * status stored in @mw is returned.
 */
static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
					     struct ocfs2_lock_res *lockres)
{
	int ret;

	ret = wait_for_completion_interruptible(&mw->mw_complete);
	if (ret)
		lockres_remove_mask_waiter(lockres, mw);
	else
		ret = mw->mw_status;
	/* Re-arm the completion in case we want to wait on it again */
	INIT_COMPLETION(mw->mw_complete);
	return ret;
}

/*
 * Take a local hold on @lockres at @level, upgrading the cluster lock
 * through the dlm first if the currently granted level is too low.
 *
 * @osb:       super block; supplies the dlm domain and mount options
 * @lockres:   lock resource to lock
 * @level:     level wanted (must not be IV or NL when an upgrade is
 *             needed)
 * @lkm_flags: flags for dlmlock(); LKM_VALBLK is added for lock types
 *             using the LVB, and LKM_CONVERT is set or cleared here
 *             depending on whether the lock is already attached
 * @arg_flags: OCFS2_LOCK_NONBLOCK makes the function back off instead
 *             of sleeping on a busy or blocked lockres
 *
 * Returns 0 with the holder count for @level incremented, or:
 *   -ERESTARTSYS  a signal is pending (unless mounted with
 *                 OCFS2_MOUNT_NOINTR, or the dlm call was already made)
 *   -EAGAIN       LKM_NOQUEUE was given and the dlm returned
 *                 DLM_NOTQUEUED, or a OCFS2_LOCK_NONBLOCK caller would
 *                 have had to wait
 *   -EINVAL       any other dlmlock() failure
 */
static int ocfs2_cluster_lock(struct ocfs2_super *osb,
			      struct ocfs2_lock_res *lockres,
			      int level,
			      int lkm_flags,
			      int arg_flags)
{
	struct ocfs2_mask_waiter mw;
	enum dlm_status status;
	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
	unsigned long flags;

	mlog_entry_void();

	ocfs2_init_mask_waiter(&mw);

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= LKM_VALBLK;

again:
	wait = 0;

	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto out;
	}

	spin_lock_irqsave(&lockres->l_lock, flags);

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags "
			"0x%lx\n", lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		wait = 1;
		goto unlock;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* is the lock currently blocked on behalf of
		 * another node? */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		/* First request creates the lock, later ones convert it */
		if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
			lockres->l_action = OCFS2_AST_ATTACH;
			lkm_flags &= ~LKM_CONVERT;
		} else {
			lockres->l_action = OCFS2_AST_CONVERT;
			lkm_flags |= LKM_CONVERT;
		}

		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == LKM_IVMODE);
		BUG_ON(level == LKM_NLMODE);

		mlog(0, "lock %s, convert from %d to level = %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		status = dlmlock(osb->dlm,
				 level,
				 &lockres->l_lksb,
				 lkm_flags,
				 lockres->l_name,
				 OCFS2_LOCK_ID_MAX_LEN - 1,
				 ocfs2_locking_ast,
				 lockres,
				 ocfs2_blocking_ast);
		if (status != DLM_NORMAL) {
			if ((lkm_flags & LKM_NOQUEUE) &&
			    (status == DLM_NOTQUEUED))
				ret = -EAGAIN;
			else {
				ocfs2_log_dlm_error("dlmlock", status,
						    lockres);
				ret = -EINVAL;
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			goto out;
		}

		mlog(0, "lock %s, successfull return from dlmlock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
out:
	/*
	 * This is helping work around a lock inversion between the page lock
	 * and dlm locks.  One path holds the page lock while calling aops
	 * which block acquiring dlm locks.  The voting thread holds dlm
	 * locks while acquiring page locks while down converting data locks.
	 * This block is helping an aop path notice the inversion and back
	 * off to unlock its page lock before trying the dlm lock again.
	 */
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		/* Nonzero: the flag we would wait on is still set, so
		 * back off; zero: it already cleared, so just retry */
		if (lockres_remove_mask_waiter(lockres, &mw))
			ret = -EAGAIN;
		else
			goto again;
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}

	mlog_exit(ret);
	return ret;
}

/*
 * Drop one local hold at @level and give
 * ocfs2_downconvert_on_unlock() a chance to act, all under l_lock.
 */
static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int level)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	ocfs2_dec_holders(lockres, level);
	ocfs2_downconvert_on_unlock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	mlog_exit_void();
}

static int ocfs2_create_new_lock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int ex,
				 int local)
{
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	unsigned long flags;
	int lkm_flags = local ?
LKM_LOCAL : 0; 1157 1158 spin_lock_irqsave(&lockres->l_lock, flags); 1159 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 1160 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL); 1161 spin_unlock_irqrestore(&lockres->l_lock, flags); 1162 1163 return ocfs2_lock_create(osb, lockres, level, lkm_flags); 1164 } 1165 1166 /* Grants us an EX lock on the data and metadata resources, skipping 1167 * the normal cluster directory lookup. Use this ONLY on newly created 1168 * inodes which other nodes can't possibly see, and which haven't been 1169 * hashed in the inode hash yet. This can give us a good performance 1170 * increase as it'll skip the network broadcast normally associated 1171 * with creating a new lock resource. */ 1172 int ocfs2_create_new_inode_locks(struct inode *inode) 1173 { 1174 int ret; 1175 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1176 1177 BUG_ON(!inode); 1178 BUG_ON(!ocfs2_inode_is_new(inode)); 1179 1180 mlog_entry_void(); 1181 1182 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); 1183 1184 /* NOTE: That we don't increment any of the holder counts, nor 1185 * do we add anything to a journal handle. Since this is 1186 * supposed to be a new inode which the cluster doesn't know 1187 * about yet, there is no need to. As far as the LVB handling 1188 * is concerned, this is basically like acquiring an EX lock 1189 * on a resource which has an invalid one -- we'll set it 1190 * valid when we release the EX. */ 1191 1192 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1); 1193 if (ret) { 1194 mlog_errno(ret); 1195 goto bail; 1196 } 1197 1198 /* 1199 * We don't want to use LKM_LOCAL on a meta data lock as they 1200 * don't use a generation in their lock names. 
1201 */ 1202 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); 1203 if (ret) { 1204 mlog_errno(ret); 1205 goto bail; 1206 } 1207 1208 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0); 1209 if (ret) { 1210 mlog_errno(ret); 1211 goto bail; 1212 } 1213 1214 bail: 1215 mlog_exit(ret); 1216 return ret; 1217 } 1218 1219 int ocfs2_rw_lock(struct inode *inode, int write) 1220 { 1221 int status, level; 1222 struct ocfs2_lock_res *lockres; 1223 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1224 1225 BUG_ON(!inode); 1226 1227 mlog_entry_void(); 1228 1229 mlog(0, "inode %llu take %s RW lock\n", 1230 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1231 write ? "EXMODE" : "PRMODE"); 1232 1233 if (ocfs2_mount_local(osb)) 1234 return 0; 1235 1236 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1237 1238 level = write ? LKM_EXMODE : LKM_PRMODE; 1239 1240 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0, 1241 0); 1242 if (status < 0) 1243 mlog_errno(status); 1244 1245 mlog_exit(status); 1246 return status; 1247 } 1248 1249 void ocfs2_rw_unlock(struct inode *inode, int write) 1250 { 1251 int level = write ? LKM_EXMODE : LKM_PRMODE; 1252 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1253 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1254 1255 mlog_entry_void(); 1256 1257 mlog(0, "inode %llu drop %s RW lock\n", 1258 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1259 write ? "EXMODE" : "PRMODE"); 1260 1261 if (!ocfs2_mount_local(osb)) 1262 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 1263 1264 mlog_exit_void(); 1265 } 1266 1267 /* 1268 * ocfs2_open_lock always get PR mode lock. 
1269 */ 1270 int ocfs2_open_lock(struct inode *inode) 1271 { 1272 int status = 0; 1273 struct ocfs2_lock_res *lockres; 1274 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1275 1276 BUG_ON(!inode); 1277 1278 mlog_entry_void(); 1279 1280 mlog(0, "inode %llu take PRMODE open lock\n", 1281 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1282 1283 if (ocfs2_mount_local(osb)) 1284 goto out; 1285 1286 lockres = &OCFS2_I(inode)->ip_open_lockres; 1287 1288 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1289 LKM_PRMODE, 0, 0); 1290 if (status < 0) 1291 mlog_errno(status); 1292 1293 out: 1294 mlog_exit(status); 1295 return status; 1296 } 1297 1298 int ocfs2_try_open_lock(struct inode *inode, int write) 1299 { 1300 int status = 0, level; 1301 struct ocfs2_lock_res *lockres; 1302 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1303 1304 BUG_ON(!inode); 1305 1306 mlog_entry_void(); 1307 1308 mlog(0, "inode %llu try to take %s open lock\n", 1309 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1310 write ? "EXMODE" : "PRMODE"); 1311 1312 if (ocfs2_mount_local(osb)) 1313 goto out; 1314 1315 lockres = &OCFS2_I(inode)->ip_open_lockres; 1316 1317 level = write ? LKM_EXMODE : LKM_PRMODE; 1318 1319 /* 1320 * The file system may already holding a PRMODE/EXMODE open lock. 1321 * Since we pass LKM_NOQUEUE, the request won't block waiting on 1322 * other nodes and the -EAGAIN will indicate to the caller that 1323 * this inode is still in use. 1324 */ 1325 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1326 level, LKM_NOQUEUE, 0); 1327 1328 out: 1329 mlog_exit(status); 1330 return status; 1331 } 1332 1333 /* 1334 * ocfs2_open_unlock unlock PR and EX mode open locks. 
1335 */ 1336 void ocfs2_open_unlock(struct inode *inode) 1337 { 1338 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres; 1339 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1340 1341 mlog_entry_void(); 1342 1343 mlog(0, "inode %llu drop open lock\n", 1344 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1345 1346 if (ocfs2_mount_local(osb)) 1347 goto out; 1348 1349 if(lockres->l_ro_holders) 1350 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1351 LKM_PRMODE); 1352 if(lockres->l_ex_holders) 1353 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1354 LKM_EXMODE); 1355 1356 out: 1357 mlog_exit_void(); 1358 } 1359 1360 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres, 1361 int level) 1362 { 1363 int ret; 1364 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1365 unsigned long flags; 1366 struct ocfs2_mask_waiter mw; 1367 1368 ocfs2_init_mask_waiter(&mw); 1369 1370 retry_cancel: 1371 spin_lock_irqsave(&lockres->l_lock, flags); 1372 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 1373 ret = ocfs2_prepare_cancel_convert(osb, lockres); 1374 if (ret) { 1375 spin_unlock_irqrestore(&lockres->l_lock, flags); 1376 ret = ocfs2_cancel_convert(osb, lockres); 1377 if (ret < 0) { 1378 mlog_errno(ret); 1379 goto out; 1380 } 1381 goto retry_cancel; 1382 } 1383 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1384 spin_unlock_irqrestore(&lockres->l_lock, flags); 1385 1386 ocfs2_wait_for_mask(&mw); 1387 goto retry_cancel; 1388 } 1389 1390 ret = -ERESTARTSYS; 1391 /* 1392 * We may still have gotten the lock, in which case there's no 1393 * point to restarting the syscall. 1394 */ 1395 if (lockres->l_level == level) 1396 ret = 0; 1397 1398 mlog(0, "Cancel returning %d. 
flags: 0x%lx, level: %d, act: %d\n", ret, 1399 lockres->l_flags, lockres->l_level, lockres->l_action); 1400 1401 spin_unlock_irqrestore(&lockres->l_lock, flags); 1402 1403 out: 1404 return ret; 1405 } 1406 1407 /* 1408 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of 1409 * flock() calls. The locking approach this requires is sufficiently 1410 * different from all other cluster lock types that we implement a 1411 * seperate path to the "low-level" dlm calls. In particular: 1412 * 1413 * - No optimization of lock levels is done - we take at exactly 1414 * what's been requested. 1415 * 1416 * - No lock caching is employed. We immediately downconvert to 1417 * no-lock at unlock time. This also means flock locks never go on 1418 * the blocking list). 1419 * 1420 * - Since userspace can trivially deadlock itself with flock, we make 1421 * sure to allow cancellation of a misbehaving applications flock() 1422 * request. 1423 * 1424 * - Access to any flock lockres doesn't require concurrency, so we 1425 * can simplify the code by requiring the caller to guarantee 1426 * serialization of dlmglue flock calls. 1427 */ 1428 int ocfs2_file_lock(struct file *file, int ex, int trylock) 1429 { 1430 int ret, level = ex ? LKM_EXMODE : LKM_PRMODE; 1431 unsigned int lkm_flags = trylock ? 
LKM_NOQUEUE : 0; 1432 unsigned long flags; 1433 struct ocfs2_file_private *fp = file->private_data; 1434 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1435 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1436 struct ocfs2_mask_waiter mw; 1437 1438 ocfs2_init_mask_waiter(&mw); 1439 1440 if ((lockres->l_flags & OCFS2_LOCK_BUSY) || 1441 (lockres->l_level > LKM_NLMODE)) { 1442 mlog(ML_ERROR, 1443 "File lock \"%s\" has busy or locked state: flags: 0x%lx, " 1444 "level: %u\n", lockres->l_name, lockres->l_flags, 1445 lockres->l_level); 1446 return -EINVAL; 1447 } 1448 1449 spin_lock_irqsave(&lockres->l_lock, flags); 1450 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1451 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1452 spin_unlock_irqrestore(&lockres->l_lock, flags); 1453 1454 /* 1455 * Get the lock at NLMODE to start - that way we 1456 * can cancel the upconvert request if need be. 1457 */ 1458 ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0); 1459 if (ret < 0) { 1460 mlog_errno(ret); 1461 goto out; 1462 } 1463 1464 ret = ocfs2_wait_for_mask(&mw); 1465 if (ret) { 1466 mlog_errno(ret); 1467 goto out; 1468 } 1469 spin_lock_irqsave(&lockres->l_lock, flags); 1470 } 1471 1472 lockres->l_action = OCFS2_AST_CONVERT; 1473 lkm_flags |= LKM_CONVERT; 1474 lockres->l_requested = level; 1475 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1476 1477 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1478 spin_unlock_irqrestore(&lockres->l_lock, flags); 1479 1480 ret = dlmlock(osb->dlm, level, &lockres->l_lksb, lkm_flags, 1481 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1, 1482 ocfs2_locking_ast, lockres, ocfs2_blocking_ast); 1483 if (ret != DLM_NORMAL) { 1484 if (trylock && ret == DLM_NOTQUEUED) 1485 ret = -EAGAIN; 1486 else { 1487 ocfs2_log_dlm_error("dlmlock", ret, lockres); 1488 ret = -EINVAL; 1489 } 1490 1491 ocfs2_recover_from_dlm_error(lockres, 1); 1492 lockres_remove_mask_waiter(lockres, &mw); 1493 goto out; 1494 } 1495 1496 ret = 
ocfs2_wait_for_mask_interruptible(&mw, lockres); 1497 if (ret == -ERESTARTSYS) { 1498 /* 1499 * Userspace can cause deadlock itself with 1500 * flock(). Current behavior locally is to allow the 1501 * deadlock, but abort the system call if a signal is 1502 * received. We follow this example, otherwise a 1503 * poorly written program could sit in kernel until 1504 * reboot. 1505 * 1506 * Handling this is a bit more complicated for Ocfs2 1507 * though. We can't exit this function with an 1508 * outstanding lock request, so a cancel convert is 1509 * required. We intentionally overwrite 'ret' - if the 1510 * cancel fails and the lock was granted, it's easier 1511 * to just bubble sucess back up to the user. 1512 */ 1513 ret = ocfs2_flock_handle_signal(lockres, level); 1514 } 1515 1516 out: 1517 1518 mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n", 1519 lockres->l_name, ex, trylock, ret); 1520 return ret; 1521 } 1522 1523 void ocfs2_file_unlock(struct file *file) 1524 { 1525 int ret; 1526 unsigned long flags; 1527 struct ocfs2_file_private *fp = file->private_data; 1528 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1529 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1530 struct ocfs2_mask_waiter mw; 1531 1532 ocfs2_init_mask_waiter(&mw); 1533 1534 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) 1535 return; 1536 1537 if (lockres->l_level == LKM_NLMODE) 1538 return; 1539 1540 mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n", 1541 lockres->l_name, lockres->l_flags, lockres->l_level, 1542 lockres->l_action); 1543 1544 spin_lock_irqsave(&lockres->l_lock, flags); 1545 /* 1546 * Fake a blocking ast for the downconvert code. 
1547 */ 1548 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 1549 lockres->l_blocking = LKM_EXMODE; 1550 1551 ocfs2_prepare_downconvert(lockres, LKM_NLMODE); 1552 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1553 spin_unlock_irqrestore(&lockres->l_lock, flags); 1554 1555 ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0); 1556 if (ret) { 1557 mlog_errno(ret); 1558 return; 1559 } 1560 1561 ret = ocfs2_wait_for_mask(&mw); 1562 if (ret) 1563 mlog_errno(ret); 1564 } 1565 1566 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 1567 struct ocfs2_lock_res *lockres) 1568 { 1569 int kick = 0; 1570 1571 mlog_entry_void(); 1572 1573 /* If we know that another node is waiting on our lock, kick 1574 * the downconvert thread * pre-emptively when we reach a release 1575 * condition. */ 1576 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 1577 switch(lockres->l_blocking) { 1578 case LKM_EXMODE: 1579 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 1580 kick = 1; 1581 break; 1582 case LKM_PRMODE: 1583 if (!lockres->l_ex_holders) 1584 kick = 1; 1585 break; 1586 default: 1587 BUG(); 1588 } 1589 } 1590 1591 if (kick) 1592 ocfs2_wake_downconvert_thread(osb); 1593 1594 mlog_exit_void(); 1595 } 1596 1597 #define OCFS2_SEC_BITS 34 1598 #define OCFS2_SEC_SHIFT (64 - 34) 1599 #define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1) 1600 1601 /* LVB only has room for 64 bits of time here so we pack it for 1602 * now. */ 1603 static u64 ocfs2_pack_timespec(struct timespec *spec) 1604 { 1605 u64 res; 1606 u64 sec = spec->tv_sec; 1607 u32 nsec = spec->tv_nsec; 1608 1609 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK); 1610 1611 return res; 1612 } 1613 1614 /* Call this with the lockres locked. I am reasonably sure we don't 1615 * need ip_lock in this function as anyone who would be changing those 1616 * values is supposed to be blocked in ocfs2_inode_lock right now. 
*/ 1617 static void __ocfs2_stuff_meta_lvb(struct inode *inode) 1618 { 1619 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1620 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 1621 struct ocfs2_meta_lvb *lvb; 1622 1623 mlog_entry_void(); 1624 1625 lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; 1626 1627 /* 1628 * Invalidate the LVB of a deleted inode - this way other 1629 * nodes are forced to go to disk and discover the new inode 1630 * status. 1631 */ 1632 if (oi->ip_flags & OCFS2_INODE_DELETED) { 1633 lvb->lvb_version = 0; 1634 goto out; 1635 } 1636 1637 lvb->lvb_version = OCFS2_LVB_VERSION; 1638 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 1639 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 1640 lvb->lvb_iuid = cpu_to_be32(inode->i_uid); 1641 lvb->lvb_igid = cpu_to_be32(inode->i_gid); 1642 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 1643 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 1644 lvb->lvb_iatime_packed = 1645 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime)); 1646 lvb->lvb_ictime_packed = 1647 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime)); 1648 lvb->lvb_imtime_packed = 1649 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime)); 1650 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 1651 lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features); 1652 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 1653 1654 out: 1655 mlog_meta_lvb(0, lockres); 1656 1657 mlog_exit_void(); 1658 } 1659 1660 static void ocfs2_unpack_timespec(struct timespec *spec, 1661 u64 packed_time) 1662 { 1663 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 1664 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 1665 } 1666 1667 static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 1668 { 1669 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1670 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 1671 struct ocfs2_meta_lvb *lvb; 1672 1673 mlog_entry_void(); 1674 1675 mlog_meta_lvb(0, lockres); 1676 1677 lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; 
1678 1679 /* We're safe here without the lockres lock... */ 1680 spin_lock(&oi->ip_lock); 1681 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 1682 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 1683 1684 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 1685 oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures); 1686 ocfs2_set_inode_flags(inode); 1687 1688 /* fast-symlinks are a special case */ 1689 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 1690 inode->i_blocks = 0; 1691 else 1692 inode->i_blocks = ocfs2_inode_sector_count(inode); 1693 1694 inode->i_uid = be32_to_cpu(lvb->lvb_iuid); 1695 inode->i_gid = be32_to_cpu(lvb->lvb_igid); 1696 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 1697 inode->i_nlink = be16_to_cpu(lvb->lvb_inlink); 1698 ocfs2_unpack_timespec(&inode->i_atime, 1699 be64_to_cpu(lvb->lvb_iatime_packed)); 1700 ocfs2_unpack_timespec(&inode->i_mtime, 1701 be64_to_cpu(lvb->lvb_imtime_packed)); 1702 ocfs2_unpack_timespec(&inode->i_ctime, 1703 be64_to_cpu(lvb->lvb_ictime_packed)); 1704 spin_unlock(&oi->ip_lock); 1705 1706 mlog_exit_void(); 1707 } 1708 1709 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 1710 struct ocfs2_lock_res *lockres) 1711 { 1712 struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; 1713 1714 if (lvb->lvb_version == OCFS2_LVB_VERSION 1715 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 1716 return 1; 1717 return 0; 1718 } 1719 1720 /* Determine whether a lock resource needs to be refreshed, and 1721 * arbitrate who gets to refresh it. 1722 * 1723 * 0 means no refresh needed. 1724 * 1725 * > 0 means you need to refresh this and you MUST call 1726 * ocfs2_complete_lock_res_refresh afterwards. 
*/ 1727 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres) 1728 { 1729 unsigned long flags; 1730 int status = 0; 1731 1732 mlog_entry_void(); 1733 1734 refresh_check: 1735 spin_lock_irqsave(&lockres->l_lock, flags); 1736 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 1737 spin_unlock_irqrestore(&lockres->l_lock, flags); 1738 goto bail; 1739 } 1740 1741 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 1742 spin_unlock_irqrestore(&lockres->l_lock, flags); 1743 1744 ocfs2_wait_on_refreshing_lock(lockres); 1745 goto refresh_check; 1746 } 1747 1748 /* Ok, I'll be the one to refresh this lock. */ 1749 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING); 1750 spin_unlock_irqrestore(&lockres->l_lock, flags); 1751 1752 status = 1; 1753 bail: 1754 mlog_exit(status); 1755 return status; 1756 } 1757 1758 /* If status is non zero, I'll mark it as not being in refresh 1759 * anymroe, but i won't clear the needs refresh flag. */ 1760 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres, 1761 int status) 1762 { 1763 unsigned long flags; 1764 mlog_entry_void(); 1765 1766 spin_lock_irqsave(&lockres->l_lock, flags); 1767 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING); 1768 if (!status) 1769 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 1770 spin_unlock_irqrestore(&lockres->l_lock, flags); 1771 1772 wake_up(&lockres->l_event); 1773 1774 mlog_exit_void(); 1775 } 1776 1777 /* may or may not return a bh if it went to disk. 
 */
/*
 * Bring the in-memory inode up to date after its metadata lock has
 * been taken.
 *
 * Returns 0 when nothing had to be done or the refresh succeeded,
 * -ENOENT when the inode is flagged OCFS2_INODE_DELETED, -EIO when the
 * block read from disk is not a valid dinode, or the error from
 * ocfs2_read_block(). *bh is only written when the disk path is taken.
 */
static int ocfs2_inode_lock_update(struct inode *inode,
				  struct buffer_head **bh)
{
	int status = 0;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
	struct ocfs2_dinode *fe;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	/* Local mounts have no cluster state to refresh from. */
	if (ocfs2_mount_local(osb))
		goto bail;

	spin_lock(&oi->ip_lock);
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		mlog(0, "Orphaned inode %llu was deleted while we "
		     "were waiting on a lock. ip_flags = 0x%x\n",
		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
		spin_unlock(&oi->ip_lock);
		status = -ENOENT;
		goto bail;
	}
	spin_unlock(&oi->ip_lock);

	/* Zero means no refresh is needed. Non-zero makes us the
	 * refresher, which obliges us to reach bail_refresh below. */
	if (!ocfs2_should_refresh_lock_res(lockres))
		goto bail;

	/* This will discard any caching information we might have had
	 * for the inode metadata. */
	ocfs2_metadata_cache_purge(inode);

	ocfs2_extent_map_trunc(inode, 0);

	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
		mlog(0, "Trusting LVB on inode %llu\n",
		     (unsigned long long)oi->ip_blkno);
		ocfs2_refresh_inode_from_lvb(inode);
	} else {
		/* Boo, we have to go to disk. */
		/* read bh, cast, ocfs2_refresh_inode */
		status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
					  bh, OCFS2_BH_CACHED, inode);
		if (status < 0) {
			mlog_errno(status);
			goto bail_refresh;
		}
		fe = (struct ocfs2_dinode *) (*bh)->b_data;

		/* This is a good chance to make sure we're not
		 * locking an invalid object.
		 *
		 * We bug on a stale inode here because we checked
		 * above whether it was wiped from disk. The wiping
		 * node provides a guarantee that we receive that
		 * message and can mark the inode before dropping any
		 * locks associated with it. */
		if (!OCFS2_IS_VALID_DINODE(fe)) {
			OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
			status = -EIO;
			goto bail_refresh;
		}
		mlog_bug_on_msg(inode->i_generation !=
				le32_to_cpu(fe->i_generation),
				"Invalid dinode %llu disk generation: %u "
				"inode->i_generation: %u\n",
				(unsigned long long)oi->ip_blkno,
				le32_to_cpu(fe->i_generation),
				inode->i_generation);
		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
				(unsigned long long)oi->ip_blkno,
				(unsigned long long)le64_to_cpu(fe->i_dtime),
				le32_to_cpu(fe->i_flags));

		ocfs2_refresh_inode(inode, fe);
	}

	status = 0;
bail_refresh:
	/* A non-zero status leaves the needs-refresh flag set. */
	ocfs2_complete_lock_res_refresh(lockres, status);
bail:
	mlog_exit(status);
	return status;
}

/*
 * Give the caller a referenced inode buffer in *ret_bh: passed_bh with
 * an extra reference when it is non-NULL, otherwise a fresh
 * ocfs2_read_block() of the inode block. Returns 0 or the read error.
 */
static int ocfs2_assign_bh(struct inode *inode,
			   struct buffer_head **ret_bh,
			   struct buffer_head *passed_bh)
{
	int status;

	if (passed_bh) {
		/* Ok, the update went to disk for us, use the
		 * returned bh. */
		*ret_bh = passed_bh;
		get_bh(*ret_bh);

		return 0;
	}

	status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
				  OCFS2_I(inode)->ip_blkno,
				  ret_bh,
				  OCFS2_BH_CACHED,
				  inode);
	if (status < 0)
		mlog_errno(status);

	return status;
}

/*
 * returns < 0 error if the callback will never be called, otherwise
 * the result of the lock will be communicated via the callback.
1894 */ 1895 int ocfs2_inode_lock_full(struct inode *inode, 1896 struct buffer_head **ret_bh, 1897 int ex, 1898 int arg_flags) 1899 { 1900 int status, level, dlm_flags, acquired; 1901 struct ocfs2_lock_res *lockres = NULL; 1902 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1903 struct buffer_head *local_bh = NULL; 1904 1905 BUG_ON(!inode); 1906 1907 mlog_entry_void(); 1908 1909 mlog(0, "inode %llu, take %s META lock\n", 1910 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1911 ex ? "EXMODE" : "PRMODE"); 1912 1913 status = 0; 1914 acquired = 0; 1915 /* We'll allow faking a readonly metadata lock for 1916 * rodevices. */ 1917 if (ocfs2_is_hard_readonly(osb)) { 1918 if (ex) 1919 status = -EROFS; 1920 goto bail; 1921 } 1922 1923 if (ocfs2_mount_local(osb)) 1924 goto local; 1925 1926 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 1927 wait_event(osb->recovery_event, 1928 ocfs2_node_map_is_empty(osb, &osb->recovery_map)); 1929 1930 lockres = &OCFS2_I(inode)->ip_inode_lockres; 1931 level = ex ? LKM_EXMODE : LKM_PRMODE; 1932 dlm_flags = 0; 1933 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 1934 dlm_flags |= LKM_NOQUEUE; 1935 1936 status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags); 1937 if (status < 0) { 1938 if (status != -EAGAIN && status != -EIOCBRETRY) 1939 mlog_errno(status); 1940 goto bail; 1941 } 1942 1943 /* Notify the error cleanup path to drop the cluster lock. */ 1944 acquired = 1; 1945 1946 /* We wait twice because a node may have died while we were in 1947 * the lower dlm layers. The second time though, we've 1948 * committed to owning this lock so we don't allow signals to 1949 * abort the operation. */ 1950 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 1951 wait_event(osb->recovery_event, 1952 ocfs2_node_map_is_empty(osb, &osb->recovery_map)); 1953 1954 local: 1955 /* 1956 * We only see this flag if we're being called from 1957 * ocfs2_read_locked_inode(). 
It means we're locking an inode 1958 * which hasn't been populated yet, so clear the refresh flag 1959 * and let the caller handle it. 1960 */ 1961 if (inode->i_state & I_NEW) { 1962 status = 0; 1963 if (lockres) 1964 ocfs2_complete_lock_res_refresh(lockres, 0); 1965 goto bail; 1966 } 1967 1968 /* This is fun. The caller may want a bh back, or it may 1969 * not. ocfs2_inode_lock_update definitely wants one in, but 1970 * may or may not read one, depending on what's in the 1971 * LVB. The result of all of this is that we've *only* gone to 1972 * disk if we have to, so the complexity is worthwhile. */ 1973 status = ocfs2_inode_lock_update(inode, &local_bh); 1974 if (status < 0) { 1975 if (status != -ENOENT) 1976 mlog_errno(status); 1977 goto bail; 1978 } 1979 1980 if (ret_bh) { 1981 status = ocfs2_assign_bh(inode, ret_bh, local_bh); 1982 if (status < 0) { 1983 mlog_errno(status); 1984 goto bail; 1985 } 1986 } 1987 1988 bail: 1989 if (status < 0) { 1990 if (ret_bh && (*ret_bh)) { 1991 brelse(*ret_bh); 1992 *ret_bh = NULL; 1993 } 1994 if (acquired) 1995 ocfs2_inode_unlock(inode, ex); 1996 } 1997 1998 if (local_bh) 1999 brelse(local_bh); 2000 2001 mlog_exit(status); 2002 return status; 2003 } 2004 2005 /* 2006 * This is working around a lock inversion between tasks acquiring DLM 2007 * locks while holding a page lock and the downconvert thread which 2008 * blocks dlm lock acquiry while acquiring page locks. 2009 * 2010 * ** These _with_page variantes are only intended to be called from aop 2011 * methods that hold page locks and return a very specific *positive* error 2012 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 2013 * 2014 * The DLM is called such that it returns -EAGAIN if it would have 2015 * blocked waiting for the downconvert thread. In that case we unlock 2016 * our page so the downconvert thread can make progress. 
Once we've
 * done this we have to return AOP_TRUNCATED_PAGE so the aop method
 * that called us can bubble that back up into the VFS who will then
 * immediately retry the aop call.
 *
 * We do a blocking lock and immediate unlock before returning, though, so that
 * the lock has a great chance of being cached on this node by the time the VFS
 * calls back to retry the aop. This has a potential to livelock as nodes
 * ping locks back and forth, but that's a risk we're willing to take to avoid
 * the lock inversion simply.
 */
int ocfs2_inode_lock_with_page(struct inode *inode,
			      struct buffer_head **ret_bh,
			      int ex,
			      struct page *page)
{
	int ret;

	/* First try without blocking so we never sleep holding the page. */
	ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
	if (ret == -EAGAIN) {
		/* Would have blocked: release the page, then take and
		 * immediately drop the lock the blocking way.  The result
		 * of the blocking attempt is deliberately not returned;
		 * the caller always sees AOP_TRUNCATED_PAGE here. */
		unlock_page(page);
		if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
			ocfs2_inode_unlock(inode, ex);
		ret = AOP_TRUNCATED_PAGE;
	}

	return ret;
}

/*
 * Take the inode cluster lock at the level needed for a possible atime
 * update.
 *
 * The lock is first taken at PR (ex == 0).  If
 * ocfs2_should_update_atime() says an update is due, the PR lock is
 * dropped and the lock is re-taken at EX; the check is repeated under
 * EX before ocfs2_update_inode_atime() is called.
 *
 * On success, *level is set to the level still held on return (1 for
 * EX, 0 for PR) and the function's return value is >= 0.  On failure a
 * negative errno is returned, no lock is held, and *level is left
 * untouched.
 */
int ocfs2_inode_lock_atime(struct inode *inode,
			  struct vfsmount *vfsmnt,
			  int *level)
{
	int ret;

	mlog_entry_void();
	ret = ocfs2_inode_lock(inode, NULL, 0);
	if (ret < 0) {
		mlog_errno(ret);
		return ret;
	}

	/*
	 * If we should update atime, we will get EX lock,
	 * otherwise we just get PR lock.
	 */
	if (ocfs2_should_update_atime(inode, vfsmnt)) {
		struct buffer_head *bh = NULL;

		/* Drop PR and re-acquire at EX. */
		ocfs2_inode_unlock(inode, 0);
		ret = ocfs2_inode_lock(inode, &bh, 1);
		if (ret < 0) {
			mlog_errno(ret);
			return ret;
		}
		*level = 1;
		/* Re-check: the lock was released in between, so the
		 * answer may have changed. */
		if (ocfs2_should_update_atime(inode, vfsmnt))
			ocfs2_update_inode_atime(inode, bh);
		if (bh)
			brelse(bh);
	} else
		*level = 0;

	mlog_exit(ret);
	return ret;
}

/*
 * Drop the inode (META) cluster lock previously taken at EX (ex != 0)
 * or PR (ex == 0).
 *
 * Nothing is unlocked on a hard read-only filesystem or on a local
 * (non-clustered) mount.
 */
void ocfs2_inode_unlock(struct inode *inode,
		       int ex)
{
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	mlog(0, "inode %llu drop %s META lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     ex ? "EXMODE" : "PRMODE");

	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
	    !ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);

	mlog_exit_void();
}

/*
 * Take the superblock cluster lock at EX (ex != 0) or PR (ex == 0).
 *
 * Returns -EROFS on a hard read-only filesystem and 0 without touching
 * the DLM on a local mount.  Otherwise returns the result of the
 * cluster lock and, when ocfs2_should_refresh_lock_res() asks for it,
 * of re-reading the slot map block.
 *
 * NOTE(review): when the slot map re-read fails, the negative status
 * is returned after ocfs2_complete_lock_res_refresh() but the cluster
 * lock taken above is not dropped here - confirm callers handle that.
 */
int ocfs2_super_lock(struct ocfs2_super *osb,
		     int ex)
{
	int status = 0;
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
	struct buffer_head *bh;
	struct ocfs2_slot_info *si = osb->slot_info;

	mlog_entry_void();

	if (ocfs2_is_hard_readonly(osb))
		return -EROFS;

	if (ocfs2_mount_local(osb))
		goto bail;

	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	/* The super block lock path is really in the best position to
	 * know when resources covered by the lock need to be
	 * refreshed, so we do it here. Of course, making sense of
	 * everything is up to the caller :) */
	status = ocfs2_should_refresh_lock_res(lockres);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}
	if (status) {
		/* Re-read the slot map block into the buffer the slot
		 * info already owns, then rebuild the in-memory view. */
		bh = si->si_bh;
		status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
					  si->si_inode);
		if (status == 0)
			ocfs2_update_slot_info(si);

		/* Always complete the refresh, passing on the outcome. */
		ocfs2_complete_lock_res_refresh(lockres, status);

		if (status < 0)
			mlog_errno(status);
	}
bail:
	mlog_exit(status);
	return status;
}

/*
 * Drop the superblock cluster lock taken at EX (ex != 0) or PR
 * (ex == 0).  No-op on a local mount.
 */
void ocfs2_super_unlock(struct ocfs2_super *osb,
			int ex)
{
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;

	if (!ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(osb, lockres, level);
}

/*
 * Take the rename cluster lock, always at EX.
 *
 * Returns -EROFS on a hard read-only filesystem, 0 on a local mount,
 * otherwise the status of ocfs2_cluster_lock() (negative values are
 * logged).
 */
int ocfs2_rename_lock(struct ocfs2_super *osb)
{
	int status;
	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;

	if (ocfs2_is_hard_readonly(osb))
		return -EROFS;

	if (ocfs2_mount_local(osb))
		return 0;

	status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
	if (status < 0)
		mlog_errno(status);

	return status;
}

/* Drop the EX rename cluster lock.  No-op on a local mount. */
void ocfs2_rename_unlock(struct ocfs2_super *osb)
{
	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;

	if (!ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
}

/*
 * Take the cluster lock attached to a dentry (dentry->d_fsdata) at EX
 * (ex != 0) or PR (ex == 0).
 *
 * BUGs if the dentry has no ocfs2_dentry_lock attached.  Returns
 * -EROFS on a hard read-only filesystem, 0 on a local mount, otherwise
 * the status of ocfs2_cluster_lock() (negative values are logged).
 */
int ocfs2_dentry_lock(struct dentry *dentry, int ex)
{
	int ret;
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);

	BUG_ON(!dl);

	if (ocfs2_is_hard_readonly(osb))
		return -EROFS;

	if (ocfs2_mount_local(osb))
		return 0;

	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
	if (ret < 0)
		mlog_errno(ret);

	return ret;
}

/*
 * Drop the dentry cluster lock taken at EX (ex != 0) or PR (ex == 0).
 * No-op on a local mount.
 */
void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
{
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);

	if (!ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
}

/* Reference counting of the dlm debug structure. We want this because
 * open references on the debug inodes can live on after a mount, so
 * we can't rely on the ocfs2_super to always exist.
*/ 2223 static void ocfs2_dlm_debug_free(struct kref *kref) 2224 { 2225 struct ocfs2_dlm_debug *dlm_debug; 2226 2227 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2228 2229 kfree(dlm_debug); 2230 } 2231 2232 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2233 { 2234 if (dlm_debug) 2235 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2236 } 2237 2238 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2239 { 2240 kref_get(&debug->d_refcnt); 2241 } 2242 2243 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2244 { 2245 struct ocfs2_dlm_debug *dlm_debug; 2246 2247 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 2248 if (!dlm_debug) { 2249 mlog_errno(-ENOMEM); 2250 goto out; 2251 } 2252 2253 kref_init(&dlm_debug->d_refcnt); 2254 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 2255 dlm_debug->d_locking_state = NULL; 2256 out: 2257 return dlm_debug; 2258 } 2259 2260 /* Access to this is arbitrated for us via seq_file->sem. */ 2261 struct ocfs2_dlm_seq_priv { 2262 struct ocfs2_dlm_debug *p_dlm_debug; 2263 struct ocfs2_lock_res p_iter_res; 2264 struct ocfs2_lock_res p_tmp_res; 2265 }; 2266 2267 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 2268 struct ocfs2_dlm_seq_priv *priv) 2269 { 2270 struct ocfs2_lock_res *iter, *ret = NULL; 2271 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 2272 2273 assert_spin_locked(&ocfs2_dlm_tracking_lock); 2274 2275 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 2276 /* discover the head of the list */ 2277 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 2278 mlog(0, "End of list found, %p\n", ret); 2279 break; 2280 } 2281 2282 /* We track our "dummy" iteration lockres' by a NULL 2283 * l_ops field. 
*/ 2284 if (iter->l_ops != NULL) { 2285 ret = iter; 2286 break; 2287 } 2288 } 2289 2290 return ret; 2291 } 2292 2293 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 2294 { 2295 struct ocfs2_dlm_seq_priv *priv = m->private; 2296 struct ocfs2_lock_res *iter; 2297 2298 spin_lock(&ocfs2_dlm_tracking_lock); 2299 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 2300 if (iter) { 2301 /* Since lockres' have the lifetime of their container 2302 * (which can be inodes, ocfs2_supers, etc) we want to 2303 * copy this out to a temporary lockres while still 2304 * under the spinlock. Obviously after this we can't 2305 * trust any pointers on the copy returned, but that's 2306 * ok as the information we want isn't typically held 2307 * in them. */ 2308 priv->p_tmp_res = *iter; 2309 iter = &priv->p_tmp_res; 2310 } 2311 spin_unlock(&ocfs2_dlm_tracking_lock); 2312 2313 return iter; 2314 } 2315 2316 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 2317 { 2318 } 2319 2320 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 2321 { 2322 struct ocfs2_dlm_seq_priv *priv = m->private; 2323 struct ocfs2_lock_res *iter = v; 2324 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 2325 2326 spin_lock(&ocfs2_dlm_tracking_lock); 2327 iter = ocfs2_dlm_next_res(iter, priv); 2328 list_del_init(&dummy->l_debug_list); 2329 if (iter) { 2330 list_add(&dummy->l_debug_list, &iter->l_debug_list); 2331 priv->p_tmp_res = *iter; 2332 iter = &priv->p_tmp_res; 2333 } 2334 spin_unlock(&ocfs2_dlm_tracking_lock); 2335 2336 return iter; 2337 } 2338 2339 /* So that debugfs.ocfs2 can determine which format is being used */ 2340 #define OCFS2_DLM_DEBUG_STR_VERSION 1 2341 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 2342 { 2343 int i; 2344 char *lvb; 2345 struct ocfs2_lock_res *lockres = v; 2346 2347 if (!lockres) 2348 return -EINVAL; 2349 2350 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 2351 2352 if (lockres->l_type == 
OCFS2_LOCK_TYPE_DENTRY) 2353 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 2354 lockres->l_name, 2355 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 2356 else 2357 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 2358 2359 seq_printf(m, "%d\t" 2360 "0x%lx\t" 2361 "0x%x\t" 2362 "0x%x\t" 2363 "%u\t" 2364 "%u\t" 2365 "%d\t" 2366 "%d\t", 2367 lockres->l_level, 2368 lockres->l_flags, 2369 lockres->l_action, 2370 lockres->l_unlock_action, 2371 lockres->l_ro_holders, 2372 lockres->l_ex_holders, 2373 lockres->l_requested, 2374 lockres->l_blocking); 2375 2376 /* Dump the raw LVB */ 2377 lvb = lockres->l_lksb.lvb; 2378 for(i = 0; i < DLM_LVB_LEN; i++) 2379 seq_printf(m, "0x%x\t", lvb[i]); 2380 2381 /* End the line */ 2382 seq_printf(m, "\n"); 2383 return 0; 2384 } 2385 2386 static struct seq_operations ocfs2_dlm_seq_ops = { 2387 .start = ocfs2_dlm_seq_start, 2388 .stop = ocfs2_dlm_seq_stop, 2389 .next = ocfs2_dlm_seq_next, 2390 .show = ocfs2_dlm_seq_show, 2391 }; 2392 2393 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 2394 { 2395 struct seq_file *seq = (struct seq_file *) file->private_data; 2396 struct ocfs2_dlm_seq_priv *priv = seq->private; 2397 struct ocfs2_lock_res *res = &priv->p_iter_res; 2398 2399 ocfs2_remove_lockres_tracking(res); 2400 ocfs2_put_dlm_debug(priv->p_dlm_debug); 2401 return seq_release_private(inode, file); 2402 } 2403 2404 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 2405 { 2406 int ret; 2407 struct ocfs2_dlm_seq_priv *priv; 2408 struct seq_file *seq; 2409 struct ocfs2_super *osb; 2410 2411 priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL); 2412 if (!priv) { 2413 ret = -ENOMEM; 2414 mlog_errno(ret); 2415 goto out; 2416 } 2417 osb = inode->i_private; 2418 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 2419 priv->p_dlm_debug = osb->osb_dlm_debug; 2420 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 2421 2422 ret = seq_open(file, &ocfs2_dlm_seq_ops); 2423 if 
(ret) { 2424 kfree(priv); 2425 mlog_errno(ret); 2426 goto out; 2427 } 2428 2429 seq = (struct seq_file *) file->private_data; 2430 seq->private = priv; 2431 2432 ocfs2_add_lockres_tracking(&priv->p_iter_res, 2433 priv->p_dlm_debug); 2434 2435 out: 2436 return ret; 2437 } 2438 2439 static const struct file_operations ocfs2_dlm_debug_fops = { 2440 .open = ocfs2_dlm_debug_open, 2441 .release = ocfs2_dlm_debug_release, 2442 .read = seq_read, 2443 .llseek = seq_lseek, 2444 }; 2445 2446 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 2447 { 2448 int ret = 0; 2449 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2450 2451 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 2452 S_IFREG|S_IRUSR, 2453 osb->osb_debug_root, 2454 osb, 2455 &ocfs2_dlm_debug_fops); 2456 if (!dlm_debug->d_locking_state) { 2457 ret = -EINVAL; 2458 mlog(ML_ERROR, 2459 "Unable to create locking state debugfs file.\n"); 2460 goto out; 2461 } 2462 2463 ocfs2_get_dlm_debug(dlm_debug); 2464 out: 2465 return ret; 2466 } 2467 2468 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 2469 { 2470 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2471 2472 if (dlm_debug) { 2473 debugfs_remove(dlm_debug->d_locking_state); 2474 ocfs2_put_dlm_debug(dlm_debug); 2475 } 2476 } 2477 2478 int ocfs2_dlm_init(struct ocfs2_super *osb) 2479 { 2480 int status = 0; 2481 u32 dlm_key; 2482 struct dlm_ctxt *dlm = NULL; 2483 2484 mlog_entry_void(); 2485 2486 if (ocfs2_mount_local(osb)) 2487 goto local; 2488 2489 status = ocfs2_dlm_init_debug(osb); 2490 if (status < 0) { 2491 mlog_errno(status); 2492 goto bail; 2493 } 2494 2495 /* launch downconvert thread */ 2496 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc"); 2497 if (IS_ERR(osb->dc_task)) { 2498 status = PTR_ERR(osb->dc_task); 2499 osb->dc_task = NULL; 2500 mlog_errno(status); 2501 goto bail; 2502 } 2503 2504 /* used by the dlm code to make message headers unique, each 2505 * node in this domain must agree 
on this. */ 2506 dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str)); 2507 2508 /* for now, uuid == domain */ 2509 dlm = dlm_register_domain(osb->uuid_str, dlm_key); 2510 if (IS_ERR(dlm)) { 2511 status = PTR_ERR(dlm); 2512 mlog_errno(status); 2513 goto bail; 2514 } 2515 2516 dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb); 2517 2518 local: 2519 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 2520 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 2521 2522 osb->dlm = dlm; 2523 2524 status = 0; 2525 bail: 2526 if (status < 0) { 2527 ocfs2_dlm_shutdown_debug(osb); 2528 if (osb->dc_task) 2529 kthread_stop(osb->dc_task); 2530 } 2531 2532 mlog_exit(status); 2533 return status; 2534 } 2535 2536 void ocfs2_dlm_shutdown(struct ocfs2_super *osb) 2537 { 2538 mlog_entry_void(); 2539 2540 dlm_unregister_eviction_cb(&osb->osb_eviction_cb); 2541 2542 ocfs2_drop_osb_locks(osb); 2543 2544 if (osb->dc_task) { 2545 kthread_stop(osb->dc_task); 2546 osb->dc_task = NULL; 2547 } 2548 2549 ocfs2_lock_res_free(&osb->osb_super_lockres); 2550 ocfs2_lock_res_free(&osb->osb_rename_lockres); 2551 2552 dlm_unregister_domain(osb->dlm); 2553 osb->dlm = NULL; 2554 2555 ocfs2_dlm_shutdown_debug(osb); 2556 2557 mlog_exit_void(); 2558 } 2559 2560 static void ocfs2_unlock_ast(void *opaque, enum dlm_status status) 2561 { 2562 struct ocfs2_lock_res *lockres = opaque; 2563 unsigned long flags; 2564 2565 mlog_entry_void(); 2566 2567 mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name, 2568 lockres->l_unlock_action); 2569 2570 spin_lock_irqsave(&lockres->l_lock, flags); 2571 /* We tried to cancel a convert request, but it was already 2572 * granted. All we want to do here is clear our unlock 2573 * state. 
The wake_up call done at the bottom is redundant 2574 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't 2575 * hurt anything anyway */ 2576 if (status == DLM_CANCELGRANT && 2577 lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 2578 mlog(0, "Got cancelgrant for %s\n", lockres->l_name); 2579 2580 /* We don't clear the busy flag in this case as it 2581 * should have been cleared by the ast which the dlm 2582 * has called. */ 2583 goto complete_unlock; 2584 } 2585 2586 if (status != DLM_NORMAL) { 2587 mlog(ML_ERROR, "Dlm passes status %d for lock %s, " 2588 "unlock_action %d\n", status, lockres->l_name, 2589 lockres->l_unlock_action); 2590 spin_unlock_irqrestore(&lockres->l_lock, flags); 2591 return; 2592 } 2593 2594 switch(lockres->l_unlock_action) { 2595 case OCFS2_UNLOCK_CANCEL_CONVERT: 2596 mlog(0, "Cancel convert success for %s\n", lockres->l_name); 2597 lockres->l_action = OCFS2_AST_INVALID; 2598 break; 2599 case OCFS2_UNLOCK_DROP_LOCK: 2600 lockres->l_level = LKM_IVMODE; 2601 break; 2602 default: 2603 BUG(); 2604 } 2605 2606 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 2607 complete_unlock: 2608 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 2609 spin_unlock_irqrestore(&lockres->l_lock, flags); 2610 2611 wake_up(&lockres->l_event); 2612 2613 mlog_exit_void(); 2614 } 2615 2616 static int ocfs2_drop_lock(struct ocfs2_super *osb, 2617 struct ocfs2_lock_res *lockres) 2618 { 2619 enum dlm_status status; 2620 unsigned long flags; 2621 int lkm_flags = 0; 2622 2623 /* We didn't get anywhere near actually using this lockres. 
*/ 2624 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 2625 goto out; 2626 2627 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 2628 lkm_flags |= LKM_VALBLK; 2629 2630 spin_lock_irqsave(&lockres->l_lock, flags); 2631 2632 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 2633 "lockres %s, flags 0x%lx\n", 2634 lockres->l_name, lockres->l_flags); 2635 2636 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 2637 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 2638 "%u, unlock_action = %u\n", 2639 lockres->l_name, lockres->l_flags, lockres->l_action, 2640 lockres->l_unlock_action); 2641 2642 spin_unlock_irqrestore(&lockres->l_lock, flags); 2643 2644 /* XXX: Today we just wait on any busy 2645 * locks... Perhaps we need to cancel converts in the 2646 * future? */ 2647 ocfs2_wait_on_busy_lock(lockres); 2648 2649 spin_lock_irqsave(&lockres->l_lock, flags); 2650 } 2651 2652 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 2653 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 2654 lockres->l_level == LKM_EXMODE && 2655 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 2656 lockres->l_ops->set_lvb(lockres); 2657 } 2658 2659 if (lockres->l_flags & OCFS2_LOCK_BUSY) 2660 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 2661 lockres->l_name); 2662 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 2663 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 2664 2665 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 2666 spin_unlock_irqrestore(&lockres->l_lock, flags); 2667 goto out; 2668 } 2669 2670 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 2671 2672 /* make sure we never get here while waiting for an ast to 2673 * fire. */ 2674 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 2675 2676 /* is this necessary? 
*/ 2677 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2678 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 2679 spin_unlock_irqrestore(&lockres->l_lock, flags); 2680 2681 mlog(0, "lock %s\n", lockres->l_name); 2682 2683 status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags, 2684 ocfs2_unlock_ast, lockres); 2685 if (status != DLM_NORMAL) { 2686 ocfs2_log_dlm_error("dlmunlock", status, lockres); 2687 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 2688 dlm_print_one_lock(lockres->l_lksb.lockid); 2689 BUG(); 2690 } 2691 mlog(0, "lock %s, successfull return from dlmunlock\n", 2692 lockres->l_name); 2693 2694 ocfs2_wait_on_busy_lock(lockres); 2695 out: 2696 mlog_exit(0); 2697 return 0; 2698 } 2699 2700 /* Mark the lockres as being dropped. It will no longer be 2701 * queued if blocking, but we still may have to wait on it 2702 * being dequeued from the downconvert thread before we can consider 2703 * it safe to drop. 2704 * 2705 * You can *not* attempt to call cluster_lock on this lockres anymore. 
*/
void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_mask_waiter mw;
	unsigned long flags;

	ocfs2_init_mask_waiter(&mw);

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres->l_flags |= OCFS2_LOCK_FREEING;
	/* Sleep (with the spinlock dropped) until OCFS2_LOCK_QUEUED is
	 * clear; re-check under the lock after every wakeup. */
	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		mlog(0, "Waiting on lockres %s\n", lockres->l_name);

		status = ocfs2_wait_for_mask(&mw);
		if (status)
			mlog_errno(status);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/*
 * Mark @lockres as freeing and then drop its DLM lock.  A non-zero
 * result from ocfs2_drop_lock() is logged but not returned.
 */
void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
			       struct ocfs2_lock_res *lockres)
{
	int ret;

	ocfs2_mark_lockres_freeing(lockres);
	ret = ocfs2_drop_lock(osb, lockres);
	if (ret)
		mlog_errno(ret);
}

/* Drop the two per-superblock locks: super and rename. */
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
{
	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
}

/*
 * Drop the open, inode and rw cluster locks of @inode, in that order.
 *
 * All three drops are attempted even if an earlier one fails.  Returns
 * the first negative error seen, or the (non-negative) result of
 * dropping the open lock otherwise.
 */
int ocfs2_drop_inode_locks(struct inode *inode)
{
	int status, err;

	mlog_entry_void();

	/* No need to call ocfs2_mark_lockres_freeing here -
	 * ocfs2_clear_inode has done it for us. */

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_open_lockres);
	if (err < 0)
		mlog_errno(err);

	status = err;

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_inode_lockres);
	if (err < 0)
		mlog_errno(err);
	/* Keep the first failure only. */
	if (err < 0 && !status)
		status = err;

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_rw_lockres);
	if (err < 0)
		mlog_errno(err);
	if (err < 0 && !status)
		status = err;

	mlog_exit(status);
	return status;
}

/*
 * Record on @lockres that a downconvert to @new_level is about to be
 * issued: sets l_action, l_requested and OCFS2_LOCK_BUSY.
 *
 * Caller must hold lockres->l_lock.  BUGs if nothing above NL is being
 * blocked or if @new_level is not strictly below the current level.
 */
static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
				      int new_level)
{
	assert_spin_locked(&lockres->l_lock);

	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	if (lockres->l_level <= new_level) {
		mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
		     lockres->l_level, new_level);
		BUG();
	}

	mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
	     lockres->l_name, new_level, lockres->l_blocking);

	lockres->l_action = OCFS2_AST_DOWNCONVERT;
	lockres->l_requested = new_level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
}

/*
 * Ask the DLM to convert @lockres down to @new_level, passing the LVB
 * along when @lvb is non-zero.
 *
 * Returns 0 if the request was accepted.  On a dlmlock() failure the
 * error is logged, ocfs2_recover_from_dlm_error() is run on the
 * lockres and -EINVAL is returned.
 */
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level,
				  int lvb)
{
	int ret, dlm_flags = LKM_CONVERT;
	enum dlm_status status;

	mlog_entry_void();

	if (lvb)
		dlm_flags |= LKM_VALBLK;

	status = dlmlock(osb->dlm,
			 new_level,
			 &lockres->l_lksb,
			 dlm_flags,
			 lockres->l_name,
			 OCFS2_LOCK_ID_MAX_LEN - 1,
			 ocfs2_locking_ast,
			 lockres,
			 ocfs2_blocking_ast);
	if (status != DLM_NORMAL) {
		ocfs2_log_dlm_error("dlmlock", status, lockres);
		ret = -EINVAL;
		ocfs2_recover_from_dlm_error(lockres, 1);
		goto bail;
	}

	ret = 0;
bail:
	mlog_exit(ret);
	return ret;
}

/* returns 1
when the caller should unlock and call dlmunlock */ 2839 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 2840 struct ocfs2_lock_res *lockres) 2841 { 2842 assert_spin_locked(&lockres->l_lock); 2843 2844 mlog_entry_void(); 2845 mlog(0, "lock %s\n", lockres->l_name); 2846 2847 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 2848 /* If we're already trying to cancel a lock conversion 2849 * then just drop the spinlock and allow the caller to 2850 * requeue this lock. */ 2851 2852 mlog(0, "Lockres %s, skip convert\n", lockres->l_name); 2853 return 0; 2854 } 2855 2856 /* were we in a convert when we got the bast fire? */ 2857 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT && 2858 lockres->l_action != OCFS2_AST_DOWNCONVERT); 2859 /* set things up for the unlockast to know to just 2860 * clear out the ast_action and unset busy, etc. */ 2861 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT; 2862 2863 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY), 2864 "lock %s, invalid flags: 0x%lx\n", 2865 lockres->l_name, lockres->l_flags); 2866 2867 return 1; 2868 } 2869 2870 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 2871 struct ocfs2_lock_res *lockres) 2872 { 2873 int ret; 2874 enum dlm_status status; 2875 2876 mlog_entry_void(); 2877 mlog(0, "lock %s\n", lockres->l_name); 2878 2879 ret = 0; 2880 status = dlmunlock(osb->dlm, 2881 &lockres->l_lksb, 2882 LKM_CANCEL, 2883 ocfs2_unlock_ast, 2884 lockres); 2885 if (status != DLM_NORMAL) { 2886 ocfs2_log_dlm_error("dlmunlock", status, lockres); 2887 ret = -EINVAL; 2888 ocfs2_recover_from_dlm_error(lockres, 0); 2889 } 2890 2891 mlog(0, "lock %s return from dlmunlock\n", lockres->l_name); 2892 2893 mlog_exit(ret); 2894 return ret; 2895 } 2896 2897 static int ocfs2_unblock_lock(struct ocfs2_super *osb, 2898 struct ocfs2_lock_res *lockres, 2899 struct ocfs2_unblock_ctl *ctl) 2900 { 2901 unsigned long flags; 2902 int blocking; 2903 int new_level; 2904 int ret = 0; 2905 int set_lvb = 0; 
2906 2907 mlog_entry_void(); 2908 2909 spin_lock_irqsave(&lockres->l_lock, flags); 2910 2911 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 2912 2913 recheck: 2914 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 2915 ctl->requeue = 1; 2916 ret = ocfs2_prepare_cancel_convert(osb, lockres); 2917 spin_unlock_irqrestore(&lockres->l_lock, flags); 2918 if (ret) { 2919 ret = ocfs2_cancel_convert(osb, lockres); 2920 if (ret < 0) 2921 mlog_errno(ret); 2922 } 2923 goto leave; 2924 } 2925 2926 /* if we're blocking an exclusive and we have *any* holders, 2927 * then requeue. */ 2928 if ((lockres->l_blocking == LKM_EXMODE) 2929 && (lockres->l_ex_holders || lockres->l_ro_holders)) 2930 goto leave_requeue; 2931 2932 /* If it's a PR we're blocking, then only 2933 * requeue if we've got any EX holders */ 2934 if (lockres->l_blocking == LKM_PRMODE && 2935 lockres->l_ex_holders) 2936 goto leave_requeue; 2937 2938 /* 2939 * Can we get a lock in this state if the holder counts are 2940 * zero? The meta data unblock code used to check this. 2941 */ 2942 if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 2943 && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) 2944 goto leave_requeue; 2945 2946 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 2947 2948 if (lockres->l_ops->check_downconvert 2949 && !lockres->l_ops->check_downconvert(lockres, new_level)) 2950 goto leave_requeue; 2951 2952 /* If we get here, then we know that there are no more 2953 * incompatible holders (and anyone asking for an incompatible 2954 * lock is blocked). We can now downconvert the lock */ 2955 if (!lockres->l_ops->downconvert_worker) 2956 goto downconvert; 2957 2958 /* Some lockres types want to do a bit of work before 2959 * downconverting a lock. Allow that here. The worker function 2960 * may sleep, so we save off a copy of what we're blocking as 2961 * it may change while we're not holding the spin lock. 
*/ 2962 blocking = lockres->l_blocking; 2963 spin_unlock_irqrestore(&lockres->l_lock, flags); 2964 2965 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); 2966 2967 if (ctl->unblock_action == UNBLOCK_STOP_POST) 2968 goto leave; 2969 2970 spin_lock_irqsave(&lockres->l_lock, flags); 2971 if (blocking != lockres->l_blocking) { 2972 /* If this changed underneath us, then we can't drop 2973 * it just yet. */ 2974 goto recheck; 2975 } 2976 2977 downconvert: 2978 ctl->requeue = 0; 2979 2980 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 2981 if (lockres->l_level == LKM_EXMODE) 2982 set_lvb = 1; 2983 2984 /* 2985 * We only set the lvb if the lock has been fully 2986 * refreshed - otherwise we risk setting stale 2987 * data. Otherwise, there's no need to actually clear 2988 * out the lvb here as it's value is still valid. 2989 */ 2990 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 2991 lockres->l_ops->set_lvb(lockres); 2992 } 2993 2994 ocfs2_prepare_downconvert(lockres, new_level); 2995 spin_unlock_irqrestore(&lockres->l_lock, flags); 2996 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb); 2997 leave: 2998 mlog_exit(ret); 2999 return ret; 3000 3001 leave_requeue: 3002 spin_unlock_irqrestore(&lockres->l_lock, flags); 3003 ctl->requeue = 1; 3004 3005 mlog_exit(0); 3006 return 0; 3007 } 3008 3009 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 3010 int blocking) 3011 { 3012 struct inode *inode; 3013 struct address_space *mapping; 3014 3015 inode = ocfs2_lock_res_inode(lockres); 3016 mapping = inode->i_mapping; 3017 3018 if (S_ISREG(inode->i_mode)) 3019 goto out; 3020 3021 /* 3022 * We need this before the filemap_fdatawrite() so that it can 3023 * transfer the dirty bit from the PTE to the 3024 * page. Unfortunately this means that even for EX->PR 3025 * downconverts, we'll lose our mappings and have to build 3026 * them up again. 
3027 */ 3028 unmap_mapping_range(mapping, 0, 0, 0); 3029 3030 if (filemap_fdatawrite(mapping)) { 3031 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!", 3032 (unsigned long long)OCFS2_I(inode)->ip_blkno); 3033 } 3034 sync_mapping_buffers(mapping); 3035 if (blocking == LKM_EXMODE) { 3036 truncate_inode_pages(mapping, 0); 3037 } else { 3038 /* We only need to wait on the I/O if we're not also 3039 * truncating pages because truncate_inode_pages waits 3040 * for us above. We don't truncate pages if we're 3041 * blocking anything < EXMODE because we want to keep 3042 * them around in that case. */ 3043 filemap_fdatawait(mapping); 3044 } 3045 3046 out: 3047 return UNBLOCK_CONTINUE; 3048 } 3049 3050 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 3051 int new_level) 3052 { 3053 struct inode *inode = ocfs2_lock_res_inode(lockres); 3054 int checkpointed = ocfs2_inode_fully_checkpointed(inode); 3055 3056 BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE); 3057 BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed); 3058 3059 if (checkpointed) 3060 return 1; 3061 3062 ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb)); 3063 return 0; 3064 } 3065 3066 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres) 3067 { 3068 struct inode *inode = ocfs2_lock_res_inode(lockres); 3069 3070 __ocfs2_stuff_meta_lvb(inode); 3071 } 3072 3073 /* 3074 * Does the final reference drop on our dentry lock. Right now this 3075 * happens in the downconvert thread, but we could choose to simplify the 3076 * dlmglue API and push these off to the ocfs2_wq in the future. 3077 */ 3078 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 3079 struct ocfs2_lock_res *lockres) 3080 { 3081 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3082 ocfs2_dentry_lock_put(osb, dl); 3083 } 3084 3085 /* 3086 * d_delete() matching dentries before the lock downconvert. 
*
 * At this point, any process waiting to destroy the
 * dentry_lock due to last ref count is stopped by the
 * OCFS2_LOCK_QUEUED flag.
 *
 * We have two potential problems
 *
 * 1) If we do the last reference drop on our dentry_lock (via dput)
 *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
 *    the downconvert to finish. Instead we take an elevated
 *    reference and push the drop until after we've completed our
 *    unblock processing.
 *
 * 2) There might be another process with a final reference,
 *    waiting on us to finish processing. If this is the case, we
 *    detect it and exit out - there's no more dentries anyway.
 *
 * Returns UNBLOCK_CONTINUE when no extra reference was taken (PR is
 * being blocked, or the lock is already going away),
 * UNBLOCK_STOP_POST when our extra reference is the only one left, and
 * UNBLOCK_CONTINUE_POST otherwise; the two _POST values make the
 * caller run ->post_unlock to drop the extra reference.
 */
static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking)
{
	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
	struct dentry *dentry;
	unsigned long flags;
	int extra_ref = 0;

	/*
	 * This node is blocking another node from getting a read
	 * lock. This happens when we've renamed within a
	 * directory. We've forced the other nodes to d_delete(), but
	 * we never actually dropped our lock because it's still
	 * valid. The downconvert code will retain a PR for this node,
	 * so there's no further work to do.
	 */
	if (blocking == LKM_PRMODE)
		return UNBLOCK_CONTINUE;

	/*
	 * Mark this inode as potentially orphaned. The code in
	 * ocfs2_delete_inode() will figure out whether it actually
	 * needs to be freed or not.
	 */
	spin_lock(&oi->ip_lock);
	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
	spin_unlock(&oi->ip_lock);

	/*
	 * Yuck. We need to make sure however that the check of
	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
	 * respect to a reference decrement or the setting of that
	 * flag.
	 */
	spin_lock_irqsave(&lockres->l_lock, flags);
	spin_lock(&dentry_attach_lock);
	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
	    && dl->dl_count) {
		dl->dl_count++;
		extra_ref = 1;
	}
	spin_unlock(&dentry_attach_lock);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog(0, "extra_ref = %d\n", extra_ref);

	/*
	 * We have a process waiting on us in ocfs2_dentry_iput(),
	 * which means we can't have any more outstanding
	 * aliases. There's no need to do any more work.
	 */
	if (!extra_ref)
		return UNBLOCK_CONTINUE;

	/* Unhash every local alias; dentry_attach_lock is dropped
	 * around the dcache calls and re-taken for the next lookup. */
	spin_lock(&dentry_attach_lock);
	while (1) {
		dentry = ocfs2_find_local_alias(dl->dl_inode,
						dl->dl_parent_blkno, 1);
		if (!dentry)
			break;
		spin_unlock(&dentry_attach_lock);

		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
		     dentry->d_name.name);

		/*
		 * The following dcache calls may do an
		 * iput(). Normally we don't want that from the
		 * downconverting thread, but in this case it's ok
		 * because the requesting node already has an
		 * exclusive lock on the inode, so it can't be queued
		 * for a downconvert.
		 */
		d_delete(dentry);
		dput(dentry);

		spin_lock(&dentry_attach_lock);
	}
	spin_unlock(&dentry_attach_lock);

	/*
	 * If we are the last holder of this dentry lock, there is no
	 * reason to downconvert so skip straight to the unlock.
	 */
	if (dl->dl_count == 1)
		return UNBLOCK_STOP_POST;

	return UNBLOCK_CONTINUE_POST;
}

/*
 * Process one lockres taken off the blocked list by the downconvert
 * thread.
 *
 * Unless the lockres is already marked OCFS2_LOCK_FREEING, runs
 * ocfs2_unblock_lock() on it.  Afterwards the lockres is either
 * requeued (unblock asked for it and the lock is not being freed) or
 * has OCFS2_LOCK_QUEUED cleared.  Finally the type's ->post_unlock
 * hook is run if the downconvert worker returned one of the _POST
 * actions.
 */
void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_unblock_ctl ctl = {0, 0,};
	unsigned long flags;

	/* Our reference to the lockres in this function can be
	 * considered valid until we remove the OCFS2_LOCK_QUEUED
	 * flag. */

	mlog_entry_void();

	BUG_ON(!lockres);
	BUG_ON(!lockres->l_ops);

	mlog(0, "lockres %s blocked.\n", lockres->l_name);

	/* Detect whether a lock has been marked as going away while
	 * the downconvert thread was processing other things. A lock can
	 * still be marked with OCFS2_LOCK_FREEING after this check,
	 * but short circuiting here will still save us some
	 * performance. */
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (lockres->l_flags & OCFS2_LOCK_FREEING)
		goto unqueue;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = ocfs2_unblock_lock(osb, lockres, &ctl);
	if (status < 0)
		mlog_errno(status);

	spin_lock_irqsave(&lockres->l_lock, flags);
unqueue:
	/* l_lock is held here on both paths into the label. */
	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
	} else
		ocfs2_schedule_blocked_lock(osb, lockres);

	mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
	     ctl.requeue ? "yes" : "no");
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	/* NOTE(review): this dereferences lockres after QUEUED may have
	 * been cleared above - presumably safe because the _POST
	 * actions imply the worker took an extra reference; confirm. */
	if (ctl.unblock_action != UNBLOCK_CONTINUE
	    && lockres->l_ops->post_unlock)
		lockres->l_ops->post_unlock(osb, lockres);

	mlog_exit_void();
}

/*
 * Queue @lockres for the downconvert thread: sets OCFS2_LOCK_QUEUED
 * and appends it to osb->blocked_lock_list if it is not already on a
 * list.  A lockres marked OCFS2_LOCK_FREEING is not queued.
 *
 * Caller must hold lockres->l_lock; this does not wake the thread.
 */
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
		/* Do not schedule a lock for downconvert when it's on
		 * the way to destruction - any nodes wanting access
		 * to the resource will get it soon. */
		mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
		     lockres->l_name, lockres->l_flags);
		return;
	}

	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);

	spin_lock(&osb->dc_task_lock);
	if (list_empty(&lockres->l_blocked_list)) {
		list_add_tail(&lockres->l_blocked_list,
			      &osb->blocked_lock_list);
		osb->blocked_lock_count++;
	}
	spin_unlock(&osb->dc_task_lock);

	mlog_exit_void();
}

/*
 * One pass of the downconvert thread: process as many entries as were
 * on the blocked list when the pass started.  Entries requeued during
 * the pass go to the tail and are left for the next pass.
 */
static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
{
	unsigned long processed;
	struct ocfs2_lock_res *lockres;

	mlog_entry_void();

	spin_lock(&osb->dc_task_lock);
	/* grab this early so we know to try again if a state change and
	 * wake happens part-way through our work  */
	osb->dc_work_sequence = osb->dc_wake_sequence;

	processed = osb->blocked_lock_count;
	while (processed) {
		BUG_ON(list_empty(&osb->blocked_lock_list));

		lockres = list_entry(osb->blocked_lock_list.next,
				     struct ocfs2_lock_res, l_blocked_list);
		list_del_init(&lockres->l_blocked_list);
		osb->blocked_lock_count--;
		/* Process without dc_task_lock held. */
		spin_unlock(&osb->dc_task_lock);

		BUG_ON(!processed);
		processed--;

		ocfs2_process_blocked_lock(osb, lockres);

		spin_lock(&osb->dc_task_lock);
	}
	spin_unlock(&osb->dc_task_lock);

	mlog_exit_void();
}

/* Returns 1 if the blocked lock list is empty, 0 otherwise. */
static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
{
	int empty = 0;

	spin_lock(&osb->dc_task_lock);
	if (list_empty(&osb->blocked_lock_list))
		empty = 1;

	spin_unlock(&osb->dc_task_lock);
	return empty;
}

/*
 * Returns 1 if a wakeup was requested since the thread last started a
 * work pass (wake and work sequence numbers differ), 0 otherwise.
 */
static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
{
	int should_wake = 0;

	spin_lock(&osb->dc_task_lock);
	if (osb->dc_work_sequence != osb->dc_wake_sequence)
		should_wake = 1;
	spin_unlock(&osb->dc_task_lock);

	return should_wake;
}

/*
 * Main loop of the "ocfs2dc" kernel thread; @arg is the ocfs2_super.
 *
 * Sleeps until woken via ocfs2_wake_downconvert_thread() or asked to
 * stop, then runs a work pass.  Clears osb->dc_task on exit and
 * returns 0.
 */
int ocfs2_downconvert_thread(void *arg)
{
	int status = 0;
	struct ocfs2_super *osb = arg;

	/* only quit once we've been asked to stop and there is no more
	 * work available */
	while (!(kthread_should_stop() &&
		 ocfs2_downconvert_thread_lists_empty(osb))) {

		wait_event_interruptible(osb->dc_event,
					 ocfs2_downconvert_thread_should_wake(osb) ||
					 kthread_should_stop());

		mlog(0, "downconvert_thread: awoken\n");

		ocfs2_downconvert_thread_do_work(osb);
	}

	osb->dc_task = NULL;
	return status;
}

/*
 * Request a work pass from the downconvert thread: bump the wake
 * sequence under dc_task_lock and wake anyone sleeping on dc_event.
 */
void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
{
	spin_lock(&osb->dc_task_lock);
	/* make sure the downconvert thread gets a swipe at whatever
	 * changes the caller may have made before waking it */
	osb->dc_wake_sequence++;
	spin_unlock(&osb->dc_task_lock);
	wake_up(&osb->dc_event);
}