/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmglue.c
 *
 * Code which implements an OCFS2 specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/time.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"
#include "ocfs2_lockingver.h"

#include "alloc.h"
#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "stackglue.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"

#include "buffer_head_io.h"

/*
 * A waiter for a particular state of a lock resource's l_flags.
 *
 * Waiters are queued on the lockres l_mask_waiters list and completed by
 * lockres_set_flags() once (l_flags & mw_mask) == mw_goal.
 */
struct ocfs2_mask_waiter {
	/* Linkage on lockres->l_mask_waiters; empty when not queued. */
	struct list_head	mw_item;
	/* Value returned by ocfs2_wait_for_mask(); set to 0 on completion. */
	int			mw_status;
	/* Completed when the masked flags reach the goal. */
	struct completion	mw_complete;
	/* The l_flags bits this waiter is interested in. */
	unsigned long		mw_mask;
	/* The value the masked bits must equal for the wait to end. */
	unsigned long		mw_goal;
#ifdef CONFIG_OCFS2_FS_STATS
	/* Nanosecond timestamp recorded by ocfs2_init_start_time(). */
	unsigned long long 	mw_lock_start;
#endif
};

/* Per-lock-type ->get_osb callbacks; defined further down in this file. */
static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);

/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock()
 *
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};

/*
 * Pairs a requeue flag with an unblock action.
 *
 * NOTE(review): this structure is not used in the part of the file
 * visible here; presumably it is filled in by ocfs2_unblock_lock() -
 * confirm against that function.
 */
struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};

/* Callbacks referenced by the lock type operation tables below. */
static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking);

static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);


/*
 * Dump the meta LVB of __lockres at log mask __level, tagging the
 * output with the name and line of the calling function.
 */
#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)

/* This aids in debugging situations where a bad LVB might be involved.
*/ 109 static void ocfs2_dump_meta_lvb_info(u64 level, 110 const char *function, 111 unsigned int line, 112 struct ocfs2_lock_res *lockres) 113 { 114 struct ocfs2_meta_lvb *lvb = 115 (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb); 116 117 mlog(level, "LVB information for %s (called from %s:%u):\n", 118 lockres->l_name, function, line); 119 mlog(level, "version: %u, clusters: %u, generation: 0x%x\n", 120 lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters), 121 be32_to_cpu(lvb->lvb_igeneration)); 122 mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n", 123 (unsigned long long)be64_to_cpu(lvb->lvb_isize), 124 be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid), 125 be16_to_cpu(lvb->lvb_imode)); 126 mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, " 127 "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink), 128 (long long)be64_to_cpu(lvb->lvb_iatime_packed), 129 (long long)be64_to_cpu(lvb->lvb_ictime_packed), 130 (long long)be64_to_cpu(lvb->lvb_imtime_packed), 131 be32_to_cpu(lvb->lvb_iattr)); 132 } 133 134 135 /* 136 * OCFS2 Lock Resource Operations 137 * 138 * These fine tune the behavior of the generic dlmglue locking infrastructure. 139 * 140 * The most basic of lock types can point ->l_priv to their respective 141 * struct ocfs2_super and allow the default actions to manage things. 142 * 143 * Right now, each lock type also needs to implement an init function, 144 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres() 145 * should be called when the lock is no longer needed (i.e., object 146 * destruction time). 147 */ 148 struct ocfs2_lock_res_ops { 149 /* 150 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define 151 * this callback if ->l_priv is not an ocfs2_super pointer 152 */ 153 struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *); 154 155 /* 156 * Optionally called in the downconvert thread after a 157 * successful downconvert. 
The lockres will not be referenced 158 * after this callback is called, so it is safe to free 159 * memory, etc. 160 * 161 * The exact semantics of when this is called are controlled 162 * by ->downconvert_worker() 163 */ 164 void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *); 165 166 /* 167 * Allow a lock type to add checks to determine whether it is 168 * safe to downconvert a lock. Return 0 to re-queue the 169 * downconvert at a later time, nonzero to continue. 170 * 171 * For most locks, the default checks that there are no 172 * incompatible holders are sufficient. 173 * 174 * Called with the lockres spinlock held. 175 */ 176 int (*check_downconvert)(struct ocfs2_lock_res *, int); 177 178 /* 179 * Allows a lock type to populate the lock value block. This 180 * is called on downconvert, and when we drop a lock. 181 * 182 * Locks that want to use this should set LOCK_TYPE_USES_LVB 183 * in the flags field. 184 * 185 * Called with the lockres spinlock held. 186 */ 187 void (*set_lvb)(struct ocfs2_lock_res *); 188 189 /* 190 * Called from the downconvert thread when it is determined 191 * that a lock will be downconverted. This is called without 192 * any locks held so the function can do work that might 193 * schedule (syncing out data, etc). 194 * 195 * This should return any one of the ocfs2_unblock_action 196 * values, depending on what it wants the thread to do. 197 */ 198 int (*downconvert_worker)(struct ocfs2_lock_res *, int); 199 200 /* 201 * LOCK_TYPE_* flags which describe the specific requirements 202 * of a lock type. Descriptions of each individual flag follow. 203 */ 204 int flags; 205 }; 206 207 /* 208 * Some locks want to "refresh" potentially stale data when a 209 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this 210 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the 211 * individual lockres l_flags member from the ast function. 
It is 212 * expected that the locking wrapper will clear the 213 * OCFS2_LOCK_NEEDS_REFRESH flag when done. 214 */ 215 #define LOCK_TYPE_REQUIRES_REFRESH 0x1 216 217 /* 218 * Indicate that a lock type makes use of the lock value block. The 219 * ->set_lvb lock type callback must be defined. 220 */ 221 #define LOCK_TYPE_USES_LVB 0x2 222 223 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { 224 .get_osb = ocfs2_get_inode_osb, 225 .flags = 0, 226 }; 227 228 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = { 229 .get_osb = ocfs2_get_inode_osb, 230 .check_downconvert = ocfs2_check_meta_downconvert, 231 .set_lvb = ocfs2_set_meta_lvb, 232 .downconvert_worker = ocfs2_data_convert_worker, 233 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 234 }; 235 236 static struct ocfs2_lock_res_ops ocfs2_super_lops = { 237 .flags = LOCK_TYPE_REQUIRES_REFRESH, 238 }; 239 240 static struct ocfs2_lock_res_ops ocfs2_rename_lops = { 241 .flags = 0, 242 }; 243 244 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { 245 .get_osb = ocfs2_get_dentry_osb, 246 .post_unlock = ocfs2_dentry_post_unlock, 247 .downconvert_worker = ocfs2_dentry_convert_worker, 248 .flags = 0, 249 }; 250 251 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = { 252 .get_osb = ocfs2_get_inode_osb, 253 .flags = 0, 254 }; 255 256 static struct ocfs2_lock_res_ops ocfs2_flock_lops = { 257 .get_osb = ocfs2_get_file_osb, 258 .flags = 0, 259 }; 260 261 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 262 { 263 return lockres->l_type == OCFS2_LOCK_TYPE_META || 264 lockres->l_type == OCFS2_LOCK_TYPE_RW || 265 lockres->l_type == OCFS2_LOCK_TYPE_OPEN; 266 } 267 268 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) 269 { 270 BUG_ON(!ocfs2_is_inode_lock(lockres)); 271 272 return (struct inode *) lockres->l_priv; 273 } 274 275 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres) 276 { 277 BUG_ON(lockres->l_type != 
OCFS2_LOCK_TYPE_DENTRY); 278 279 return (struct ocfs2_dentry_lock *)lockres->l_priv; 280 } 281 282 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres) 283 { 284 if (lockres->l_ops->get_osb) 285 return lockres->l_ops->get_osb(lockres); 286 287 return (struct ocfs2_super *)lockres->l_priv; 288 } 289 290 static int ocfs2_lock_create(struct ocfs2_super *osb, 291 struct ocfs2_lock_res *lockres, 292 int level, 293 u32 dlm_flags); 294 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 295 int wanted); 296 static void ocfs2_cluster_unlock(struct ocfs2_super *osb, 297 struct ocfs2_lock_res *lockres, 298 int level); 299 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres); 300 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres); 301 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres); 302 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level); 303 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 304 struct ocfs2_lock_res *lockres); 305 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 306 int convert); 307 #define ocfs2_log_dlm_error(_func, _err, _lockres) do { \ 308 mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \ 309 _err, _func, _lockres->l_name); \ 310 } while (0) 311 static int ocfs2_downconvert_thread(void *arg); 312 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 313 struct ocfs2_lock_res *lockres); 314 static int ocfs2_inode_lock_update(struct inode *inode, 315 struct buffer_head **bh); 316 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 317 static inline int ocfs2_highest_compat_lock_level(int level); 318 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 319 int new_level); 320 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 321 struct 
ocfs2_lock_res *lockres, 322 int new_level, 323 int lvb, 324 unsigned int generation); 325 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 326 struct ocfs2_lock_res *lockres); 327 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 328 struct ocfs2_lock_res *lockres); 329 330 331 static void ocfs2_build_lock_name(enum ocfs2_lock_type type, 332 u64 blkno, 333 u32 generation, 334 char *name) 335 { 336 int len; 337 338 mlog_entry_void(); 339 340 BUG_ON(type >= OCFS2_NUM_LOCK_TYPES); 341 342 len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", 343 ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD, 344 (long long)blkno, generation); 345 346 BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1)); 347 348 mlog(0, "built lock resource with name: %s\n", name); 349 350 mlog_exit_void(); 351 } 352 353 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock); 354 355 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res, 356 struct ocfs2_dlm_debug *dlm_debug) 357 { 358 mlog(0, "Add tracking for lockres %s\n", res->l_name); 359 360 spin_lock(&ocfs2_dlm_tracking_lock); 361 list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking); 362 spin_unlock(&ocfs2_dlm_tracking_lock); 363 } 364 365 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) 366 { 367 spin_lock(&ocfs2_dlm_tracking_lock); 368 if (!list_empty(&res->l_debug_list)) 369 list_del_init(&res->l_debug_list); 370 spin_unlock(&ocfs2_dlm_tracking_lock); 371 } 372 373 #ifdef CONFIG_OCFS2_FS_STATS 374 static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 375 { 376 res->l_lock_num_prmode = 0; 377 res->l_lock_num_prmode_failed = 0; 378 res->l_lock_total_prmode = 0; 379 res->l_lock_max_prmode = 0; 380 res->l_lock_num_exmode = 0; 381 res->l_lock_num_exmode_failed = 0; 382 res->l_lock_total_exmode = 0; 383 res->l_lock_max_exmode = 0; 384 res->l_lock_refresh = 0; 385 } 386 387 static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level, 388 struct ocfs2_mask_waiter *mw, int 
ret) 389 { 390 unsigned long long *num, *sum; 391 unsigned int *max, *failed; 392 struct timespec ts = current_kernel_time(); 393 unsigned long long time = timespec_to_ns(&ts) - mw->mw_lock_start; 394 395 if (level == LKM_PRMODE) { 396 num = &res->l_lock_num_prmode; 397 sum = &res->l_lock_total_prmode; 398 max = &res->l_lock_max_prmode; 399 failed = &res->l_lock_num_prmode_failed; 400 } else if (level == LKM_EXMODE) { 401 num = &res->l_lock_num_exmode; 402 sum = &res->l_lock_total_exmode; 403 max = &res->l_lock_max_exmode; 404 failed = &res->l_lock_num_exmode_failed; 405 } else 406 return; 407 408 (*num)++; 409 (*sum) += time; 410 if (time > *max) 411 *max = time; 412 if (ret) 413 (*failed)++; 414 } 415 416 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 417 { 418 lockres->l_lock_refresh++; 419 } 420 421 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 422 { 423 struct timespec ts = current_kernel_time(); 424 mw->mw_lock_start = timespec_to_ns(&ts); 425 } 426 #else 427 static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 428 { 429 } 430 static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, 431 int level, struct ocfs2_mask_waiter *mw, int ret) 432 { 433 } 434 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 435 { 436 } 437 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 438 { 439 } 440 #endif 441 442 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, 443 struct ocfs2_lock_res *res, 444 enum ocfs2_lock_type type, 445 struct ocfs2_lock_res_ops *ops, 446 void *priv) 447 { 448 res->l_type = type; 449 res->l_ops = ops; 450 res->l_priv = priv; 451 452 res->l_level = DLM_LOCK_IV; 453 res->l_requested = DLM_LOCK_IV; 454 res->l_blocking = DLM_LOCK_IV; 455 res->l_action = OCFS2_AST_INVALID; 456 res->l_unlock_action = OCFS2_UNLOCK_INVALID; 457 458 res->l_flags = OCFS2_LOCK_INITIALIZED; 459 460 ocfs2_add_lockres_tracking(res, 
osb->osb_dlm_debug); 461 462 ocfs2_init_lock_stats(res); 463 } 464 465 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res) 466 { 467 /* This also clears out the lock status block */ 468 memset(res, 0, sizeof(struct ocfs2_lock_res)); 469 spin_lock_init(&res->l_lock); 470 init_waitqueue_head(&res->l_event); 471 INIT_LIST_HEAD(&res->l_blocked_list); 472 INIT_LIST_HEAD(&res->l_mask_waiters); 473 } 474 475 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, 476 enum ocfs2_lock_type type, 477 unsigned int generation, 478 struct inode *inode) 479 { 480 struct ocfs2_lock_res_ops *ops; 481 482 switch(type) { 483 case OCFS2_LOCK_TYPE_RW: 484 ops = &ocfs2_inode_rw_lops; 485 break; 486 case OCFS2_LOCK_TYPE_META: 487 ops = &ocfs2_inode_inode_lops; 488 break; 489 case OCFS2_LOCK_TYPE_OPEN: 490 ops = &ocfs2_inode_open_lops; 491 break; 492 default: 493 mlog_bug_on_msg(1, "type: %d\n", type); 494 ops = NULL; /* thanks, gcc */ 495 break; 496 }; 497 498 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, 499 generation, res->l_name); 500 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); 501 } 502 503 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres) 504 { 505 struct inode *inode = ocfs2_lock_res_inode(lockres); 506 507 return OCFS2_SB(inode->i_sb); 508 } 509 510 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres) 511 { 512 struct ocfs2_file_private *fp = lockres->l_priv; 513 514 return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb); 515 } 516 517 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) 518 { 519 __be64 inode_blkno_be; 520 521 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], 522 sizeof(__be64)); 523 524 return be64_to_cpu(inode_blkno_be); 525 } 526 527 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres) 528 { 529 struct ocfs2_dentry_lock *dl = lockres->l_priv; 530 531 return OCFS2_SB(dl->dl_inode->i_sb); 532 } 533 
534 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, 535 u64 parent, struct inode *inode) 536 { 537 int len; 538 u64 inode_blkno = OCFS2_I(inode)->ip_blkno; 539 __be64 inode_blkno_be = cpu_to_be64(inode_blkno); 540 struct ocfs2_lock_res *lockres = &dl->dl_lockres; 541 542 ocfs2_lock_res_init_once(lockres); 543 544 /* 545 * Unfortunately, the standard lock naming scheme won't work 546 * here because we have two 16 byte values to use. Instead, 547 * we'll stuff the inode number as a binary value. We still 548 * want error prints to show something without garbling the 549 * display, so drop a null byte in there before the inode 550 * number. A future version of OCFS2 will likely use all 551 * binary lock names. The stringified names have been a 552 * tremendous aid in debugging, but now that the debugfs 553 * interface exists, we can mangle things there if need be. 554 * 555 * NOTE: We also drop the standard "pad" value (the total lock 556 * name size stays the same though - the last part is all 557 * zeros due to the memset in ocfs2_lock_res_init_once() 558 */ 559 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, 560 "%c%016llx", 561 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), 562 (long long)parent); 563 564 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); 565 566 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, 567 sizeof(__be64)); 568 569 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 570 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, 571 dl); 572 } 573 574 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 575 struct ocfs2_super *osb) 576 { 577 /* Superblock lockres doesn't come from a slab so we call init 578 * once on it manually. 
*/ 579 ocfs2_lock_res_init_once(res); 580 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, 581 0, res->l_name); 582 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 583 &ocfs2_super_lops, osb); 584 } 585 586 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, 587 struct ocfs2_super *osb) 588 { 589 /* Rename lockres doesn't come from a slab so we call init 590 * once on it manually. */ 591 ocfs2_lock_res_init_once(res); 592 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); 593 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 594 &ocfs2_rename_lops, osb); 595 } 596 597 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, 598 struct ocfs2_file_private *fp) 599 { 600 struct inode *inode = fp->fp_file->f_mapping->host; 601 struct ocfs2_inode_info *oi = OCFS2_I(inode); 602 603 ocfs2_lock_res_init_once(lockres); 604 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno, 605 inode->i_generation, lockres->l_name); 606 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 607 OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops, 608 fp); 609 lockres->l_flags |= OCFS2_LOCK_NOCACHE; 610 } 611 612 void ocfs2_lock_res_free(struct ocfs2_lock_res *res) 613 { 614 mlog_entry_void(); 615 616 if (!(res->l_flags & OCFS2_LOCK_INITIALIZED)) 617 return; 618 619 ocfs2_remove_lockres_tracking(res); 620 621 mlog_bug_on_msg(!list_empty(&res->l_blocked_list), 622 "Lockres %s is on the blocked list\n", 623 res->l_name); 624 mlog_bug_on_msg(!list_empty(&res->l_mask_waiters), 625 "Lockres %s has mask waiters pending\n", 626 res->l_name); 627 mlog_bug_on_msg(spin_is_locked(&res->l_lock), 628 "Lockres %s is locked\n", 629 res->l_name); 630 mlog_bug_on_msg(res->l_ro_holders, 631 "Lockres %s has %u ro holders\n", 632 res->l_name, res->l_ro_holders); 633 mlog_bug_on_msg(res->l_ex_holders, 634 "Lockres %s has %u ex holders\n", 635 res->l_name, res->l_ex_holders); 636 637 /* Need to clear out the lock status block for the 
dlm */ 638 memset(&res->l_lksb, 0, sizeof(res->l_lksb)); 639 640 res->l_flags = 0UL; 641 mlog_exit_void(); 642 } 643 644 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, 645 int level) 646 { 647 mlog_entry_void(); 648 649 BUG_ON(!lockres); 650 651 switch(level) { 652 case DLM_LOCK_EX: 653 lockres->l_ex_holders++; 654 break; 655 case DLM_LOCK_PR: 656 lockres->l_ro_holders++; 657 break; 658 default: 659 BUG(); 660 } 661 662 mlog_exit_void(); 663 } 664 665 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, 666 int level) 667 { 668 mlog_entry_void(); 669 670 BUG_ON(!lockres); 671 672 switch(level) { 673 case DLM_LOCK_EX: 674 BUG_ON(!lockres->l_ex_holders); 675 lockres->l_ex_holders--; 676 break; 677 case DLM_LOCK_PR: 678 BUG_ON(!lockres->l_ro_holders); 679 lockres->l_ro_holders--; 680 break; 681 default: 682 BUG(); 683 } 684 mlog_exit_void(); 685 } 686 687 /* WARNING: This function lives in a world where the only three lock 688 * levels are EX, PR, and NL. It *will* have to be adjusted when more 689 * lock types are added. 
*/ 690 static inline int ocfs2_highest_compat_lock_level(int level) 691 { 692 int new_level = DLM_LOCK_EX; 693 694 if (level == DLM_LOCK_EX) 695 new_level = DLM_LOCK_NL; 696 else if (level == DLM_LOCK_PR) 697 new_level = DLM_LOCK_PR; 698 return new_level; 699 } 700 701 static void lockres_set_flags(struct ocfs2_lock_res *lockres, 702 unsigned long newflags) 703 { 704 struct ocfs2_mask_waiter *mw, *tmp; 705 706 assert_spin_locked(&lockres->l_lock); 707 708 lockres->l_flags = newflags; 709 710 list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) { 711 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 712 continue; 713 714 list_del_init(&mw->mw_item); 715 mw->mw_status = 0; 716 complete(&mw->mw_complete); 717 } 718 } 719 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or) 720 { 721 lockres_set_flags(lockres, lockres->l_flags | or); 722 } 723 static void lockres_clear_flags(struct ocfs2_lock_res *lockres, 724 unsigned long clear) 725 { 726 lockres_set_flags(lockres, lockres->l_flags & ~clear); 727 } 728 729 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres) 730 { 731 mlog_entry_void(); 732 733 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 734 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 735 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 736 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 737 738 lockres->l_level = lockres->l_requested; 739 if (lockres->l_level <= 740 ocfs2_highest_compat_lock_level(lockres->l_blocking)) { 741 lockres->l_blocking = DLM_LOCK_NL; 742 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 743 } 744 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 745 746 mlog_exit_void(); 747 } 748 749 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres) 750 { 751 mlog_entry_void(); 752 753 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 754 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 755 756 /* Convert from RO to EX doesn't really 
need anything as our 757 * information is already up to data. Convert from NL to 758 * *anything* however should mark ourselves as needing an 759 * update */ 760 if (lockres->l_level == DLM_LOCK_NL && 761 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 762 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 763 764 lockres->l_level = lockres->l_requested; 765 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 766 767 mlog_exit_void(); 768 } 769 770 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres) 771 { 772 mlog_entry_void(); 773 774 BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY))); 775 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 776 777 if (lockres->l_requested > DLM_LOCK_NL && 778 !(lockres->l_flags & OCFS2_LOCK_LOCAL) && 779 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 780 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 781 782 lockres->l_level = lockres->l_requested; 783 lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED); 784 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 785 786 mlog_exit_void(); 787 } 788 789 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, 790 int level) 791 { 792 int needs_downconvert = 0; 793 mlog_entry_void(); 794 795 assert_spin_locked(&lockres->l_lock); 796 797 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 798 799 if (level > lockres->l_blocking) { 800 /* only schedule a downconvert if we haven't already scheduled 801 * one that goes low enough to satisfy the level we're 802 * blocking. this also catches the case where we get 803 * duplicate BASTs */ 804 if (ocfs2_highest_compat_lock_level(level) < 805 ocfs2_highest_compat_lock_level(lockres->l_blocking)) 806 needs_downconvert = 1; 807 808 lockres->l_blocking = level; 809 } 810 811 mlog_exit(needs_downconvert); 812 return needs_downconvert; 813 } 814 815 /* 816 * OCFS2_LOCK_PENDING and l_pending_gen. 817 * 818 * Why does OCFS2_LOCK_PENDING exist? 
To close a race between setting 819 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock() 820 * for more details on the race. 821 * 822 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces 823 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock() 824 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear 825 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns, 826 * the caller is going to try to clear PENDING again. If nothing else is 827 * happening, __lockres_clear_pending() sees PENDING is unset and does 828 * nothing. 829 * 830 * But what if another path (eg downconvert thread) has just started a 831 * new locking action? The other path has re-set PENDING. Our path 832 * cannot clear PENDING, because that will re-open the original race 833 * window. 834 * 835 * [Example] 836 * 837 * ocfs2_meta_lock() 838 * ocfs2_cluster_lock() 839 * set BUSY 840 * set PENDING 841 * drop l_lock 842 * ocfs2_dlm_lock() 843 * ocfs2_locking_ast() ocfs2_downconvert_thread() 844 * clear PENDING ocfs2_unblock_lock() 845 * take_l_lock 846 * !BUSY 847 * ocfs2_prepare_downconvert() 848 * set BUSY 849 * set PENDING 850 * drop l_lock 851 * take l_lock 852 * clear PENDING 853 * drop l_lock 854 * <window> 855 * ocfs2_dlm_lock() 856 * 857 * So as you can see, we now have a window where l_lock is not held, 858 * PENDING is not set, and ocfs2_dlm_lock() has not been called. 859 * 860 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING 861 * set by ocfs2_prepare_downconvert(). That wasn't nice. 862 * 863 * To solve this we introduce l_pending_gen. A call to 864 * lockres_clear_pending() will only do so when it is passed a generation 865 * number that matches the lockres. lockres_set_pending() will return the 866 * current generation number. When ocfs2_cluster_lock() goes to clear 867 * PENDING, it passes the generation it got from set_pending(). 
In our
 * example above, the generation numbers will *not* match. Thus,
 * ocfs2_cluster_lock() will not clear the PENDING set by
 * ocfs2_prepare_downconvert().
 */

/*
 * Unlocked version for ocfs2_locking_ast(): the caller already holds
 * l_lock.
 *
 * Clears OCFS2_LOCK_PENDING only if it is set and @generation matches
 * l_pending_gen, then advances l_pending_gen. If the lockres is BLOCKED
 * afterwards, the downconvert thread of @osb is woken.
 */
static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
				    unsigned int generation,
				    struct ocfs2_super *osb)
{
	assert_spin_locked(&lockres->l_lock);

	/*
	 * The ast and locking functions can race us here. The winner
	 * will clear pending, the loser will not.
	 */
	if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
	    (lockres->l_pending_gen != generation))
		return;

	lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
	lockres->l_pending_gen++;

	/*
	 * The downconvert thread may have skipped us because we
	 * were PENDING. Wake it up.
	 */
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
		ocfs2_wake_downconvert_thread(osb);
}

/*
 * Locked version for callers of ocfs2_dlm_lock(): takes l_lock (with
 * interrupts saved) around __lockres_clear_pending().
 */
static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
				  unsigned int generation,
				  struct ocfs2_super *osb)
{
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);
	__lockres_clear_pending(lockres, generation, osb);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/*
 * Mark a BUSY lockres PENDING and return the current pending
 * generation, which the caller later hands to lockres_clear_pending().
 * The caller must hold l_lock; BUGs if the lockres is not BUSY.
 */
static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));

	lockres_or_flags(lockres, OCFS2_LOCK_PENDING);

	return lockres->l_pending_gen;
}


/*
 * Blocking AST callback: a request at @level is blocked by the lock we
 * hold on the lockres passed as @opaque.
 *
 * Locks flagged OCFS2_LOCK_NOCACHE ignore the notification. Otherwise
 * the blocking level is recorded under l_lock, the lockres is queued
 * for downconvert if needed, and both the l_event waiters and the
 * downconvert thread are woken.
 */
static void ocfs2_blocking_ast(void *opaque, int level)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	int needs_downconvert;
	unsigned long flags;

	BUG_ON(level <= DLM_LOCK_NL);

	mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
	     lockres->l_name, level, lockres->l_level,
	     ocfs2_lock_type_string(lockres->l_type));

	/*
	 * We can skip the bast for locks which don't enable caching -
	 * they'll be dropped at the earliest possible time anyway.
	 */
	if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
		return;

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_wake_downconvert_thread(osb);
}

/*
 * AST callback: the lock request on the lockres passed as @opaque has
 * completed. Everything runs under l_lock.
 *
 * - lksb status -EAGAIN: only BUSY is cleared before the common exit.
 * - any other nonzero status: the error is logged and the function
 *   returns at once, leaving the lockres state and waiters untouched.
 * - status 0: the handler matching l_action is run; an unexpected
 *   action is a BUG.
 *
 * The common exit invalidates l_action, forgets a pending cancel
 * convert, clears PENDING for the current generation and wakes l_event.
 */
static void ocfs2_locking_ast(void *opaque)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	unsigned long flags;
	int status;

	spin_lock_irqsave(&lockres->l_lock, flags);

	status = ocfs2_dlm_lock_status(&lockres->l_lksb);

	if (status == -EAGAIN) {
		lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
		goto out;
	}

	if (status) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
		     lockres->l_name, status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		/* Once attached, the lock is no longer considered local. */
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
		     "lockres flags = 0x%lx, unlock action: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}
out:
	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	/* Did we try to cancel this lock? Clear that state */
	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;

	/*
	 * We may have beaten the locking functions here. We certainly
	 * know that dlm_lock() has been called :-)
	 * Because we can't have two lock calls in flight at once, we
	 * can use lockres->l_pending_gen.
	 */
	__lockres_clear_pending(lockres, lockres->l_pending_gen, osb);

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/*
 * Undo the bookkeeping done before a DLM call that failed.
 *
 * Under l_lock, clears BUSY and invalidates l_action when @convert is
 * nonzero, or l_unlock_action otherwise. Waiters on l_event are woken
 * after the lock is released.
 */
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	if (convert)
		lockres->l_action = OCFS2_AST_INVALID;
	else
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
	mlog_exit_void();
}

/* Note: If we detect another process working on the lock (i.e.,
 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 * to do the right thing in that case.
1036 */ 1037 static int ocfs2_lock_create(struct ocfs2_super *osb, 1038 struct ocfs2_lock_res *lockres, 1039 int level, 1040 u32 dlm_flags) 1041 { 1042 int ret = 0; 1043 unsigned long flags; 1044 unsigned int gen; 1045 1046 mlog_entry_void(); 1047 1048 mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level, 1049 dlm_flags); 1050 1051 spin_lock_irqsave(&lockres->l_lock, flags); 1052 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 1053 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 1054 spin_unlock_irqrestore(&lockres->l_lock, flags); 1055 goto bail; 1056 } 1057 1058 lockres->l_action = OCFS2_AST_ATTACH; 1059 lockres->l_requested = level; 1060 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1061 gen = lockres_set_pending(lockres); 1062 spin_unlock_irqrestore(&lockres->l_lock, flags); 1063 1064 ret = ocfs2_dlm_lock(osb->cconn, 1065 level, 1066 &lockres->l_lksb, 1067 dlm_flags, 1068 lockres->l_name, 1069 OCFS2_LOCK_ID_MAX_LEN - 1, 1070 lockres); 1071 lockres_clear_pending(lockres, gen, osb); 1072 if (ret) { 1073 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1074 ocfs2_recover_from_dlm_error(lockres, 1); 1075 } 1076 1077 mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name); 1078 1079 bail: 1080 mlog_exit(ret); 1081 return ret; 1082 } 1083 1084 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, 1085 int flag) 1086 { 1087 unsigned long flags; 1088 int ret; 1089 1090 spin_lock_irqsave(&lockres->l_lock, flags); 1091 ret = lockres->l_flags & flag; 1092 spin_unlock_irqrestore(&lockres->l_lock, flags); 1093 1094 return ret; 1095 } 1096 1097 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 1098 1099 { 1100 wait_event(lockres->l_event, 1101 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 1102 } 1103 1104 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 1105 1106 { 1107 wait_event(lockres->l_event, 1108 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 1109 } 1110 1111 
/* predict what lock level we'll be dropping down to on behalf 1112 * of another node, and return true if the currently wanted 1113 * level will be compatible with it. */ 1114 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 1115 int wanted) 1116 { 1117 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 1118 1119 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking); 1120 } 1121 1122 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw) 1123 { 1124 INIT_LIST_HEAD(&mw->mw_item); 1125 init_completion(&mw->mw_complete); 1126 ocfs2_init_start_time(mw); 1127 } 1128 1129 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) 1130 { 1131 wait_for_completion(&mw->mw_complete); 1132 /* Re-arm the completion in case we want to wait on it again */ 1133 INIT_COMPLETION(mw->mw_complete); 1134 return mw->mw_status; 1135 } 1136 1137 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres, 1138 struct ocfs2_mask_waiter *mw, 1139 unsigned long mask, 1140 unsigned long goal) 1141 { 1142 BUG_ON(!list_empty(&mw->mw_item)); 1143 1144 assert_spin_locked(&lockres->l_lock); 1145 1146 list_add_tail(&mw->mw_item, &lockres->l_mask_waiters); 1147 mw->mw_mask = mask; 1148 mw->mw_goal = goal; 1149 } 1150 1151 /* returns 0 if the mw that was removed was already satisfied, -EBUSY 1152 * if the mask still hadn't reached its goal */ 1153 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1154 struct ocfs2_mask_waiter *mw) 1155 { 1156 unsigned long flags; 1157 int ret = 0; 1158 1159 spin_lock_irqsave(&lockres->l_lock, flags); 1160 if (!list_empty(&mw->mw_item)) { 1161 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 1162 ret = -EBUSY; 1163 1164 list_del_init(&mw->mw_item); 1165 init_completion(&mw->mw_complete); 1166 } 1167 spin_unlock_irqrestore(&lockres->l_lock, flags); 1168 1169 return ret; 1170 1171 } 1172 1173 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw, 1174 
struct ocfs2_lock_res *lockres) 1175 { 1176 int ret; 1177 1178 ret = wait_for_completion_interruptible(&mw->mw_complete); 1179 if (ret) 1180 lockres_remove_mask_waiter(lockres, mw); 1181 else 1182 ret = mw->mw_status; 1183 /* Re-arm the completion in case we want to wait on it again */ 1184 INIT_COMPLETION(mw->mw_complete); 1185 return ret; 1186 } 1187 1188 static int ocfs2_cluster_lock(struct ocfs2_super *osb, 1189 struct ocfs2_lock_res *lockres, 1190 int level, 1191 u32 lkm_flags, 1192 int arg_flags) 1193 { 1194 struct ocfs2_mask_waiter mw; 1195 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); 1196 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 1197 unsigned long flags; 1198 unsigned int gen; 1199 int noqueue_attempted = 0; 1200 1201 mlog_entry_void(); 1202 1203 ocfs2_init_mask_waiter(&mw); 1204 1205 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 1206 lkm_flags |= DLM_LKF_VALBLK; 1207 1208 again: 1209 wait = 0; 1210 1211 if (catch_signals && signal_pending(current)) { 1212 ret = -ERESTARTSYS; 1213 goto out; 1214 } 1215 1216 spin_lock_irqsave(&lockres->l_lock, flags); 1217 1218 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, 1219 "Cluster lock called on freeing lockres %s! flags " 1220 "0x%lx\n", lockres->l_name, lockres->l_flags); 1221 1222 /* We only compare against the currently granted level 1223 * here. If the lock is blocked waiting on a downconvert, 1224 * we'll get caught below. */ 1225 if (lockres->l_flags & OCFS2_LOCK_BUSY && 1226 level > lockres->l_level) { 1227 /* is someone sitting in dlm_lock? If so, wait on 1228 * them. 
*/ 1229 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1230 wait = 1; 1231 goto unlock; 1232 } 1233 1234 if (lockres->l_flags & OCFS2_LOCK_BLOCKED && 1235 !ocfs2_may_continue_on_blocked_lock(lockres, level)) { 1236 /* is the lock is currently blocked on behalf of 1237 * another node */ 1238 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0); 1239 wait = 1; 1240 goto unlock; 1241 } 1242 1243 if (level > lockres->l_level) { 1244 if (noqueue_attempted > 0) { 1245 ret = -EAGAIN; 1246 goto unlock; 1247 } 1248 if (lkm_flags & DLM_LKF_NOQUEUE) 1249 noqueue_attempted = 1; 1250 1251 if (lockres->l_action != OCFS2_AST_INVALID) 1252 mlog(ML_ERROR, "lockres %s has action %u pending\n", 1253 lockres->l_name, lockres->l_action); 1254 1255 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1256 lockres->l_action = OCFS2_AST_ATTACH; 1257 lkm_flags &= ~DLM_LKF_CONVERT; 1258 } else { 1259 lockres->l_action = OCFS2_AST_CONVERT; 1260 lkm_flags |= DLM_LKF_CONVERT; 1261 } 1262 1263 lockres->l_requested = level; 1264 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1265 gen = lockres_set_pending(lockres); 1266 spin_unlock_irqrestore(&lockres->l_lock, flags); 1267 1268 BUG_ON(level == DLM_LOCK_IV); 1269 BUG_ON(level == DLM_LOCK_NL); 1270 1271 mlog(0, "lock %s, convert from %d to level = %d\n", 1272 lockres->l_name, lockres->l_level, level); 1273 1274 /* call dlm_lock to upgrade lock now */ 1275 ret = ocfs2_dlm_lock(osb->cconn, 1276 level, 1277 &lockres->l_lksb, 1278 lkm_flags, 1279 lockres->l_name, 1280 OCFS2_LOCK_ID_MAX_LEN - 1, 1281 lockres); 1282 lockres_clear_pending(lockres, gen, osb); 1283 if (ret) { 1284 if (!(lkm_flags & DLM_LKF_NOQUEUE) || 1285 (ret != -EAGAIN)) { 1286 ocfs2_log_dlm_error("ocfs2_dlm_lock", 1287 ret, lockres); 1288 } 1289 ocfs2_recover_from_dlm_error(lockres, 1); 1290 goto out; 1291 } 1292 1293 mlog(0, "lock %s, successfull return from ocfs2_dlm_lock\n", 1294 lockres->l_name); 1295 1296 /* At this point we've gone inside the dlm and need to 1297 * 
complete our work regardless. */ 1298 catch_signals = 0; 1299 1300 /* wait for busy to clear and carry on */ 1301 goto again; 1302 } 1303 1304 /* Ok, if we get here then we're good to go. */ 1305 ocfs2_inc_holders(lockres, level); 1306 1307 ret = 0; 1308 unlock: 1309 spin_unlock_irqrestore(&lockres->l_lock, flags); 1310 out: 1311 /* 1312 * This is helping work around a lock inversion between the page lock 1313 * and dlm locks. One path holds the page lock while calling aops 1314 * which block acquiring dlm locks. The voting thread holds dlm 1315 * locks while acquiring page locks while down converting data locks. 1316 * This block is helping an aop path notice the inversion and back 1317 * off to unlock its page lock before trying the dlm lock again. 1318 */ 1319 if (wait && arg_flags & OCFS2_LOCK_NONBLOCK && 1320 mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) { 1321 wait = 0; 1322 if (lockres_remove_mask_waiter(lockres, &mw)) 1323 ret = -EAGAIN; 1324 else 1325 goto again; 1326 } 1327 if (wait) { 1328 ret = ocfs2_wait_for_mask(&mw); 1329 if (ret == 0) 1330 goto again; 1331 mlog_errno(ret); 1332 } 1333 ocfs2_update_lock_stats(lockres, level, &mw, ret); 1334 1335 mlog_exit(ret); 1336 return ret; 1337 } 1338 1339 static void ocfs2_cluster_unlock(struct ocfs2_super *osb, 1340 struct ocfs2_lock_res *lockres, 1341 int level) 1342 { 1343 unsigned long flags; 1344 1345 mlog_entry_void(); 1346 spin_lock_irqsave(&lockres->l_lock, flags); 1347 ocfs2_dec_holders(lockres, level); 1348 ocfs2_downconvert_on_unlock(osb, lockres); 1349 spin_unlock_irqrestore(&lockres->l_lock, flags); 1350 mlog_exit_void(); 1351 } 1352 1353 static int ocfs2_create_new_lock(struct ocfs2_super *osb, 1354 struct ocfs2_lock_res *lockres, 1355 int ex, 1356 int local) 1357 { 1358 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1359 unsigned long flags; 1360 u32 lkm_flags = local ? 
DLM_LKF_LOCAL : 0; 1361 1362 spin_lock_irqsave(&lockres->l_lock, flags); 1363 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 1364 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL); 1365 spin_unlock_irqrestore(&lockres->l_lock, flags); 1366 1367 return ocfs2_lock_create(osb, lockres, level, lkm_flags); 1368 } 1369 1370 /* Grants us an EX lock on the data and metadata resources, skipping 1371 * the normal cluster directory lookup. Use this ONLY on newly created 1372 * inodes which other nodes can't possibly see, and which haven't been 1373 * hashed in the inode hash yet. This can give us a good performance 1374 * increase as it'll skip the network broadcast normally associated 1375 * with creating a new lock resource. */ 1376 int ocfs2_create_new_inode_locks(struct inode *inode) 1377 { 1378 int ret; 1379 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1380 1381 BUG_ON(!inode); 1382 BUG_ON(!ocfs2_inode_is_new(inode)); 1383 1384 mlog_entry_void(); 1385 1386 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); 1387 1388 /* NOTE: That we don't increment any of the holder counts, nor 1389 * do we add anything to a journal handle. Since this is 1390 * supposed to be a new inode which the cluster doesn't know 1391 * about yet, there is no need to. As far as the LVB handling 1392 * is concerned, this is basically like acquiring an EX lock 1393 * on a resource which has an invalid one -- we'll set it 1394 * valid when we release the EX. */ 1395 1396 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1); 1397 if (ret) { 1398 mlog_errno(ret); 1399 goto bail; 1400 } 1401 1402 /* 1403 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they 1404 * don't use a generation in their lock names. 
1405 */ 1406 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); 1407 if (ret) { 1408 mlog_errno(ret); 1409 goto bail; 1410 } 1411 1412 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0); 1413 if (ret) { 1414 mlog_errno(ret); 1415 goto bail; 1416 } 1417 1418 bail: 1419 mlog_exit(ret); 1420 return ret; 1421 } 1422 1423 int ocfs2_rw_lock(struct inode *inode, int write) 1424 { 1425 int status, level; 1426 struct ocfs2_lock_res *lockres; 1427 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1428 1429 BUG_ON(!inode); 1430 1431 mlog_entry_void(); 1432 1433 mlog(0, "inode %llu take %s RW lock\n", 1434 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1435 write ? "EXMODE" : "PRMODE"); 1436 1437 if (ocfs2_mount_local(osb)) 1438 return 0; 1439 1440 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1441 1442 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1443 1444 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0, 1445 0); 1446 if (status < 0) 1447 mlog_errno(status); 1448 1449 mlog_exit(status); 1450 return status; 1451 } 1452 1453 void ocfs2_rw_unlock(struct inode *inode, int write) 1454 { 1455 int level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1456 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1457 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1458 1459 mlog_entry_void(); 1460 1461 mlog(0, "inode %llu drop %s RW lock\n", 1462 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1463 write ? "EXMODE" : "PRMODE"); 1464 1465 if (!ocfs2_mount_local(osb)) 1466 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 1467 1468 mlog_exit_void(); 1469 } 1470 1471 /* 1472 * ocfs2_open_lock always get PR mode lock. 
1473 */ 1474 int ocfs2_open_lock(struct inode *inode) 1475 { 1476 int status = 0; 1477 struct ocfs2_lock_res *lockres; 1478 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1479 1480 BUG_ON(!inode); 1481 1482 mlog_entry_void(); 1483 1484 mlog(0, "inode %llu take PRMODE open lock\n", 1485 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1486 1487 if (ocfs2_mount_local(osb)) 1488 goto out; 1489 1490 lockres = &OCFS2_I(inode)->ip_open_lockres; 1491 1492 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1493 DLM_LOCK_PR, 0, 0); 1494 if (status < 0) 1495 mlog_errno(status); 1496 1497 out: 1498 mlog_exit(status); 1499 return status; 1500 } 1501 1502 int ocfs2_try_open_lock(struct inode *inode, int write) 1503 { 1504 int status = 0, level; 1505 struct ocfs2_lock_res *lockres; 1506 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1507 1508 BUG_ON(!inode); 1509 1510 mlog_entry_void(); 1511 1512 mlog(0, "inode %llu try to take %s open lock\n", 1513 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1514 write ? "EXMODE" : "PRMODE"); 1515 1516 if (ocfs2_mount_local(osb)) 1517 goto out; 1518 1519 lockres = &OCFS2_I(inode)->ip_open_lockres; 1520 1521 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1522 1523 /* 1524 * The file system may already holding a PRMODE/EXMODE open lock. 1525 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on 1526 * other nodes and the -EAGAIN will indicate to the caller that 1527 * this inode is still in use. 1528 */ 1529 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1530 level, DLM_LKF_NOQUEUE, 0); 1531 1532 out: 1533 mlog_exit(status); 1534 return status; 1535 } 1536 1537 /* 1538 * ocfs2_open_unlock unlock PR and EX mode open locks. 
1539 */ 1540 void ocfs2_open_unlock(struct inode *inode) 1541 { 1542 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres; 1543 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1544 1545 mlog_entry_void(); 1546 1547 mlog(0, "inode %llu drop open lock\n", 1548 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1549 1550 if (ocfs2_mount_local(osb)) 1551 goto out; 1552 1553 if(lockres->l_ro_holders) 1554 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1555 DLM_LOCK_PR); 1556 if(lockres->l_ex_holders) 1557 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1558 DLM_LOCK_EX); 1559 1560 out: 1561 mlog_exit_void(); 1562 } 1563 1564 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres, 1565 int level) 1566 { 1567 int ret; 1568 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1569 unsigned long flags; 1570 struct ocfs2_mask_waiter mw; 1571 1572 ocfs2_init_mask_waiter(&mw); 1573 1574 retry_cancel: 1575 spin_lock_irqsave(&lockres->l_lock, flags); 1576 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 1577 ret = ocfs2_prepare_cancel_convert(osb, lockres); 1578 if (ret) { 1579 spin_unlock_irqrestore(&lockres->l_lock, flags); 1580 ret = ocfs2_cancel_convert(osb, lockres); 1581 if (ret < 0) { 1582 mlog_errno(ret); 1583 goto out; 1584 } 1585 goto retry_cancel; 1586 } 1587 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1588 spin_unlock_irqrestore(&lockres->l_lock, flags); 1589 1590 ocfs2_wait_for_mask(&mw); 1591 goto retry_cancel; 1592 } 1593 1594 ret = -ERESTARTSYS; 1595 /* 1596 * We may still have gotten the lock, in which case there's no 1597 * point to restarting the syscall. 1598 */ 1599 if (lockres->l_level == level) 1600 ret = 0; 1601 1602 mlog(0, "Cancel returning %d. 
flags: 0x%lx, level: %d, act: %d\n", ret, 1603 lockres->l_flags, lockres->l_level, lockres->l_action); 1604 1605 spin_unlock_irqrestore(&lockres->l_lock, flags); 1606 1607 out: 1608 return ret; 1609 } 1610 1611 /* 1612 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of 1613 * flock() calls. The locking approach this requires is sufficiently 1614 * different from all other cluster lock types that we implement a 1615 * seperate path to the "low-level" dlm calls. In particular: 1616 * 1617 * - No optimization of lock levels is done - we take at exactly 1618 * what's been requested. 1619 * 1620 * - No lock caching is employed. We immediately downconvert to 1621 * no-lock at unlock time. This also means flock locks never go on 1622 * the blocking list). 1623 * 1624 * - Since userspace can trivially deadlock itself with flock, we make 1625 * sure to allow cancellation of a misbehaving applications flock() 1626 * request. 1627 * 1628 * - Access to any flock lockres doesn't require concurrency, so we 1629 * can simplify the code by requiring the caller to guarantee 1630 * serialization of dlmglue flock calls. 1631 */ 1632 int ocfs2_file_lock(struct file *file, int ex, int trylock) 1633 { 1634 int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1635 unsigned int lkm_flags = trylock ? 
DLM_LKF_NOQUEUE : 0; 1636 unsigned long flags; 1637 struct ocfs2_file_private *fp = file->private_data; 1638 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1639 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1640 struct ocfs2_mask_waiter mw; 1641 1642 ocfs2_init_mask_waiter(&mw); 1643 1644 if ((lockres->l_flags & OCFS2_LOCK_BUSY) || 1645 (lockres->l_level > DLM_LOCK_NL)) { 1646 mlog(ML_ERROR, 1647 "File lock \"%s\" has busy or locked state: flags: 0x%lx, " 1648 "level: %u\n", lockres->l_name, lockres->l_flags, 1649 lockres->l_level); 1650 return -EINVAL; 1651 } 1652 1653 spin_lock_irqsave(&lockres->l_lock, flags); 1654 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1655 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1656 spin_unlock_irqrestore(&lockres->l_lock, flags); 1657 1658 /* 1659 * Get the lock at NLMODE to start - that way we 1660 * can cancel the upconvert request if need be. 1661 */ 1662 ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0); 1663 if (ret < 0) { 1664 mlog_errno(ret); 1665 goto out; 1666 } 1667 1668 ret = ocfs2_wait_for_mask(&mw); 1669 if (ret) { 1670 mlog_errno(ret); 1671 goto out; 1672 } 1673 spin_lock_irqsave(&lockres->l_lock, flags); 1674 } 1675 1676 lockres->l_action = OCFS2_AST_CONVERT; 1677 lkm_flags |= DLM_LKF_CONVERT; 1678 lockres->l_requested = level; 1679 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1680 1681 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1682 spin_unlock_irqrestore(&lockres->l_lock, flags); 1683 1684 ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags, 1685 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1, 1686 lockres); 1687 if (ret) { 1688 if (!trylock || (ret != -EAGAIN)) { 1689 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1690 ret = -EINVAL; 1691 } 1692 1693 ocfs2_recover_from_dlm_error(lockres, 1); 1694 lockres_remove_mask_waiter(lockres, &mw); 1695 goto out; 1696 } 1697 1698 ret = ocfs2_wait_for_mask_interruptible(&mw, lockres); 1699 if (ret == 
-ERESTARTSYS) { 1700 /* 1701 * Userspace can cause deadlock itself with 1702 * flock(). Current behavior locally is to allow the 1703 * deadlock, but abort the system call if a signal is 1704 * received. We follow this example, otherwise a 1705 * poorly written program could sit in kernel until 1706 * reboot. 1707 * 1708 * Handling this is a bit more complicated for Ocfs2 1709 * though. We can't exit this function with an 1710 * outstanding lock request, so a cancel convert is 1711 * required. We intentionally overwrite 'ret' - if the 1712 * cancel fails and the lock was granted, it's easier 1713 * to just bubble sucess back up to the user. 1714 */ 1715 ret = ocfs2_flock_handle_signal(lockres, level); 1716 } else if (!ret && (level > lockres->l_level)) { 1717 /* Trylock failed asynchronously */ 1718 BUG_ON(!trylock); 1719 ret = -EAGAIN; 1720 } 1721 1722 out: 1723 1724 mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n", 1725 lockres->l_name, ex, trylock, ret); 1726 return ret; 1727 } 1728 1729 void ocfs2_file_unlock(struct file *file) 1730 { 1731 int ret; 1732 unsigned int gen; 1733 unsigned long flags; 1734 struct ocfs2_file_private *fp = file->private_data; 1735 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1736 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1737 struct ocfs2_mask_waiter mw; 1738 1739 ocfs2_init_mask_waiter(&mw); 1740 1741 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) 1742 return; 1743 1744 if (lockres->l_level == DLM_LOCK_NL) 1745 return; 1746 1747 mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n", 1748 lockres->l_name, lockres->l_flags, lockres->l_level, 1749 lockres->l_action); 1750 1751 spin_lock_irqsave(&lockres->l_lock, flags); 1752 /* 1753 * Fake a blocking ast for the downconvert code. 
1754 */ 1755 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 1756 lockres->l_blocking = DLM_LOCK_EX; 1757 1758 gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL); 1759 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1760 spin_unlock_irqrestore(&lockres->l_lock, flags); 1761 1762 ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen); 1763 if (ret) { 1764 mlog_errno(ret); 1765 return; 1766 } 1767 1768 ret = ocfs2_wait_for_mask(&mw); 1769 if (ret) 1770 mlog_errno(ret); 1771 } 1772 1773 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 1774 struct ocfs2_lock_res *lockres) 1775 { 1776 int kick = 0; 1777 1778 mlog_entry_void(); 1779 1780 /* If we know that another node is waiting on our lock, kick 1781 * the downconvert thread * pre-emptively when we reach a release 1782 * condition. */ 1783 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 1784 switch(lockres->l_blocking) { 1785 case DLM_LOCK_EX: 1786 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 1787 kick = 1; 1788 break; 1789 case DLM_LOCK_PR: 1790 if (!lockres->l_ex_holders) 1791 kick = 1; 1792 break; 1793 default: 1794 BUG(); 1795 } 1796 } 1797 1798 if (kick) 1799 ocfs2_wake_downconvert_thread(osb); 1800 1801 mlog_exit_void(); 1802 } 1803 1804 #define OCFS2_SEC_BITS 34 1805 #define OCFS2_SEC_SHIFT (64 - 34) 1806 #define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1) 1807 1808 /* LVB only has room for 64 bits of time here so we pack it for 1809 * now. */ 1810 static u64 ocfs2_pack_timespec(struct timespec *spec) 1811 { 1812 u64 res; 1813 u64 sec = spec->tv_sec; 1814 u32 nsec = spec->tv_nsec; 1815 1816 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK); 1817 1818 return res; 1819 } 1820 1821 /* Call this with the lockres locked. I am reasonably sure we don't 1822 * need ip_lock in this function as anyone who would be changing those 1823 * values is supposed to be blocked in ocfs2_inode_lock right now. 
*/ 1824 static void __ocfs2_stuff_meta_lvb(struct inode *inode) 1825 { 1826 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1827 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 1828 struct ocfs2_meta_lvb *lvb; 1829 1830 mlog_entry_void(); 1831 1832 lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb); 1833 1834 /* 1835 * Invalidate the LVB of a deleted inode - this way other 1836 * nodes are forced to go to disk and discover the new inode 1837 * status. 1838 */ 1839 if (oi->ip_flags & OCFS2_INODE_DELETED) { 1840 lvb->lvb_version = 0; 1841 goto out; 1842 } 1843 1844 lvb->lvb_version = OCFS2_LVB_VERSION; 1845 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 1846 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 1847 lvb->lvb_iuid = cpu_to_be32(inode->i_uid); 1848 lvb->lvb_igid = cpu_to_be32(inode->i_gid); 1849 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 1850 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 1851 lvb->lvb_iatime_packed = 1852 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime)); 1853 lvb->lvb_ictime_packed = 1854 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime)); 1855 lvb->lvb_imtime_packed = 1856 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime)); 1857 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 1858 lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features); 1859 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 1860 1861 out: 1862 mlog_meta_lvb(0, lockres); 1863 1864 mlog_exit_void(); 1865 } 1866 1867 static void ocfs2_unpack_timespec(struct timespec *spec, 1868 u64 packed_time) 1869 { 1870 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 1871 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 1872 } 1873 1874 static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 1875 { 1876 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1877 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 1878 struct ocfs2_meta_lvb *lvb; 1879 1880 mlog_entry_void(); 1881 1882 mlog_meta_lvb(0, lockres); 1883 1884 lvb = (struct ocfs2_meta_lvb 
*)ocfs2_dlm_lvb(&lockres->l_lksb); 1885 1886 /* We're safe here without the lockres lock... */ 1887 spin_lock(&oi->ip_lock); 1888 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 1889 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 1890 1891 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 1892 oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures); 1893 ocfs2_set_inode_flags(inode); 1894 1895 /* fast-symlinks are a special case */ 1896 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 1897 inode->i_blocks = 0; 1898 else 1899 inode->i_blocks = ocfs2_inode_sector_count(inode); 1900 1901 inode->i_uid = be32_to_cpu(lvb->lvb_iuid); 1902 inode->i_gid = be32_to_cpu(lvb->lvb_igid); 1903 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 1904 inode->i_nlink = be16_to_cpu(lvb->lvb_inlink); 1905 ocfs2_unpack_timespec(&inode->i_atime, 1906 be64_to_cpu(lvb->lvb_iatime_packed)); 1907 ocfs2_unpack_timespec(&inode->i_mtime, 1908 be64_to_cpu(lvb->lvb_imtime_packed)); 1909 ocfs2_unpack_timespec(&inode->i_ctime, 1910 be64_to_cpu(lvb->lvb_ictime_packed)); 1911 spin_unlock(&oi->ip_lock); 1912 1913 mlog_exit_void(); 1914 } 1915 1916 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 1917 struct ocfs2_lock_res *lockres) 1918 { 1919 struct ocfs2_meta_lvb *lvb = 1920 (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb); 1921 1922 if (lvb->lvb_version == OCFS2_LVB_VERSION 1923 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 1924 return 1; 1925 return 0; 1926 } 1927 1928 /* Determine whether a lock resource needs to be refreshed, and 1929 * arbitrate who gets to refresh it. 1930 * 1931 * 0 means no refresh needed. 1932 * 1933 * > 0 means you need to refresh this and you MUST call 1934 * ocfs2_complete_lock_res_refresh afterwards. 
*/ 1935 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres) 1936 { 1937 unsigned long flags; 1938 int status = 0; 1939 1940 mlog_entry_void(); 1941 1942 refresh_check: 1943 spin_lock_irqsave(&lockres->l_lock, flags); 1944 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 1945 spin_unlock_irqrestore(&lockres->l_lock, flags); 1946 goto bail; 1947 } 1948 1949 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 1950 spin_unlock_irqrestore(&lockres->l_lock, flags); 1951 1952 ocfs2_wait_on_refreshing_lock(lockres); 1953 goto refresh_check; 1954 } 1955 1956 /* Ok, I'll be the one to refresh this lock. */ 1957 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING); 1958 spin_unlock_irqrestore(&lockres->l_lock, flags); 1959 1960 status = 1; 1961 bail: 1962 mlog_exit(status); 1963 return status; 1964 } 1965 1966 /* If status is non zero, I'll mark it as not being in refresh 1967 * anymroe, but i won't clear the needs refresh flag. */ 1968 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres, 1969 int status) 1970 { 1971 unsigned long flags; 1972 mlog_entry_void(); 1973 1974 spin_lock_irqsave(&lockres->l_lock, flags); 1975 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING); 1976 if (!status) 1977 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 1978 spin_unlock_irqrestore(&lockres->l_lock, flags); 1979 1980 wake_up(&lockres->l_event); 1981 1982 mlog_exit_void(); 1983 } 1984 1985 /* may or may not return a bh if it went to disk. 
*/
/*
 * Bring the in-memory inode up to date after its metadata lock was
 * acquired.
 *
 * Returns 0 when nothing had to be done (local mount, or no refresh
 * needed) or the refresh succeeded, -ENOENT when the inode is flagged
 * OCFS2_INODE_DELETED, -EIO when the on-disk dinode is invalid, or the
 * error from ocfs2_read_block().  *bh is only written on the path that
 * reads the dinode from disk.
 */
static int ocfs2_inode_lock_update(struct inode *inode,
				  struct buffer_head **bh)
{
	int status = 0;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
	struct ocfs2_dinode *fe;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	if (ocfs2_mount_local(osb))
		goto bail;

	spin_lock(&oi->ip_lock);
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		mlog(0, "Orphaned inode %llu was deleted while we "
		     "were waiting on a lock. ip_flags = 0x%x\n",
		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
		spin_unlock(&oi->ip_lock);
		status = -ENOENT;
		goto bail;
	}
	spin_unlock(&oi->ip_lock);

	/* A non-zero answer makes us the refresher; from here on every
	 * exit has to go through bail_refresh. */
	if (!ocfs2_should_refresh_lock_res(lockres))
		goto bail;

	/* This will discard any caching information we might have had
	 * for the inode metadata. */
	ocfs2_metadata_cache_purge(inode);

	ocfs2_extent_map_trunc(inode, 0);

	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
		mlog(0, "Trusting LVB on inode %llu\n",
		     (unsigned long long)oi->ip_blkno);
		ocfs2_refresh_inode_from_lvb(inode);
	} else {
		/* Boo, we have to go to disk. */
		/* read bh, cast, ocfs2_refresh_inode */
		status = ocfs2_read_block(inode, oi->ip_blkno, bh);
		if (status < 0) {
			mlog_errno(status);
			goto bail_refresh;
		}
		fe = (struct ocfs2_dinode *) (*bh)->b_data;

		/* This is a good chance to make sure we're not
		 * locking an invalid object.
		 *
		 * We bug on a stale inode here because we checked
		 * above whether it was wiped from disk. The wiping
		 * node provides a guarantee that we receive that
		 * message and can mark the inode before dropping any
		 * locks associated with it. */
		if (!OCFS2_IS_VALID_DINODE(fe)) {
			OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
			status = -EIO;
			goto bail_refresh;
		}
		mlog_bug_on_msg(inode->i_generation !=
				le32_to_cpu(fe->i_generation),
				"Invalid dinode %llu disk generation: %u "
				"inode->i_generation: %u\n",
				(unsigned long long)oi->ip_blkno,
				le32_to_cpu(fe->i_generation),
				inode->i_generation);
		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
				(unsigned long long)oi->ip_blkno,
				(unsigned long long)le64_to_cpu(fe->i_dtime),
				le32_to_cpu(fe->i_flags));

		ocfs2_refresh_inode(inode, fe);
		ocfs2_track_lock_refresh(lockres);
	}

	status = 0;
bail_refresh:
	/* A non-zero status leaves NEEDS_REFRESH set on the lockres. */
	ocfs2_complete_lock_res_refresh(lockres, status);
bail:
	mlog_exit(status);
	return status;
}

/*
 * Hand the caller a referenced buffer head for the inode's block.
 *
 * If @passed_bh is set it is reused (with an extra reference taken via
 * get_bh()); otherwise the block is read with ocfs2_read_block().
 * Returns 0 or the read error.
 */
static int ocfs2_assign_bh(struct inode *inode,
			   struct buffer_head **ret_bh,
			   struct buffer_head *passed_bh)
{
	int status;

	if (passed_bh) {
		/* Ok, the update went to disk for us, use the
		 * returned bh. */
		*ret_bh = passed_bh;
		get_bh(*ret_bh);

		return 0;
	}

	status = ocfs2_read_block(inode, OCFS2_I(inode)->ip_blkno, ret_bh);
	if (status < 0)
		mlog_errno(status);

	return status;
}

/*
 * returns < 0 error if the callback will never be called, otherwise
 * the result of the lock will be communicated via the callback.
 *
 * NOTE(review): the function that follows takes no callback and returns
 * its status directly -- this description looks stale; confirm.
2098 */ 2099 int ocfs2_inode_lock_full(struct inode *inode, 2100 struct buffer_head **ret_bh, 2101 int ex, 2102 int arg_flags) 2103 { 2104 int status, level, acquired; 2105 u32 dlm_flags; 2106 struct ocfs2_lock_res *lockres = NULL; 2107 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2108 struct buffer_head *local_bh = NULL; 2109 2110 BUG_ON(!inode); 2111 2112 mlog_entry_void(); 2113 2114 mlog(0, "inode %llu, take %s META lock\n", 2115 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2116 ex ? "EXMODE" : "PRMODE"); 2117 2118 status = 0; 2119 acquired = 0; 2120 /* We'll allow faking a readonly metadata lock for 2121 * rodevices. */ 2122 if (ocfs2_is_hard_readonly(osb)) { 2123 if (ex) 2124 status = -EROFS; 2125 goto bail; 2126 } 2127 2128 if (ocfs2_mount_local(osb)) 2129 goto local; 2130 2131 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2132 ocfs2_wait_for_recovery(osb); 2133 2134 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2135 level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2136 dlm_flags = 0; 2137 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 2138 dlm_flags |= DLM_LKF_NOQUEUE; 2139 2140 status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags); 2141 if (status < 0) { 2142 if (status != -EAGAIN && status != -EIOCBRETRY) 2143 mlog_errno(status); 2144 goto bail; 2145 } 2146 2147 /* Notify the error cleanup path to drop the cluster lock. */ 2148 acquired = 1; 2149 2150 /* We wait twice because a node may have died while we were in 2151 * the lower dlm layers. The second time though, we've 2152 * committed to owning this lock so we don't allow signals to 2153 * abort the operation. */ 2154 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2155 ocfs2_wait_for_recovery(osb); 2156 2157 local: 2158 /* 2159 * We only see this flag if we're being called from 2160 * ocfs2_read_locked_inode(). It means we're locking an inode 2161 * which hasn't been populated yet, so clear the refresh flag 2162 * and let the caller handle it. 
2163 */ 2164 if (inode->i_state & I_NEW) { 2165 status = 0; 2166 if (lockres) 2167 ocfs2_complete_lock_res_refresh(lockres, 0); 2168 goto bail; 2169 } 2170 2171 /* This is fun. The caller may want a bh back, or it may 2172 * not. ocfs2_inode_lock_update definitely wants one in, but 2173 * may or may not read one, depending on what's in the 2174 * LVB. The result of all of this is that we've *only* gone to 2175 * disk if we have to, so the complexity is worthwhile. */ 2176 status = ocfs2_inode_lock_update(inode, &local_bh); 2177 if (status < 0) { 2178 if (status != -ENOENT) 2179 mlog_errno(status); 2180 goto bail; 2181 } 2182 2183 if (ret_bh) { 2184 status = ocfs2_assign_bh(inode, ret_bh, local_bh); 2185 if (status < 0) { 2186 mlog_errno(status); 2187 goto bail; 2188 } 2189 } 2190 2191 bail: 2192 if (status < 0) { 2193 if (ret_bh && (*ret_bh)) { 2194 brelse(*ret_bh); 2195 *ret_bh = NULL; 2196 } 2197 if (acquired) 2198 ocfs2_inode_unlock(inode, ex); 2199 } 2200 2201 if (local_bh) 2202 brelse(local_bh); 2203 2204 mlog_exit(status); 2205 return status; 2206 } 2207 2208 /* 2209 * This is working around a lock inversion between tasks acquiring DLM 2210 * locks while holding a page lock and the downconvert thread which 2211 * blocks dlm lock acquiry while acquiring page locks. 2212 * 2213 * ** These _with_page variantes are only intended to be called from aop 2214 * methods that hold page locks and return a very specific *positive* error 2215 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 2216 * 2217 * The DLM is called such that it returns -EAGAIN if it would have 2218 * blocked waiting for the downconvert thread. In that case we unlock 2219 * our page so the downconvert thread can make progress. Once we've 2220 * done this we have to return AOP_TRUNCATED_PAGE so the aop method 2221 * that called us can bubble that back up into the VFS who will then 2222 * immediately retry the aop call. 
2223 * 2224 * We do a blocking lock and immediate unlock before returning, though, so that 2225 * the lock has a great chance of being cached on this node by the time the VFS 2226 * calls back to retry the aop. This has a potential to livelock as nodes 2227 * ping locks back and forth, but that's a risk we're willing to take to avoid 2228 * the lock inversion simply. 2229 */ 2230 int ocfs2_inode_lock_with_page(struct inode *inode, 2231 struct buffer_head **ret_bh, 2232 int ex, 2233 struct page *page) 2234 { 2235 int ret; 2236 2237 ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK); 2238 if (ret == -EAGAIN) { 2239 unlock_page(page); 2240 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0) 2241 ocfs2_inode_unlock(inode, ex); 2242 ret = AOP_TRUNCATED_PAGE; 2243 } 2244 2245 return ret; 2246 } 2247 2248 int ocfs2_inode_lock_atime(struct inode *inode, 2249 struct vfsmount *vfsmnt, 2250 int *level) 2251 { 2252 int ret; 2253 2254 mlog_entry_void(); 2255 ret = ocfs2_inode_lock(inode, NULL, 0); 2256 if (ret < 0) { 2257 mlog_errno(ret); 2258 return ret; 2259 } 2260 2261 /* 2262 * If we should update atime, we will get EX lock, 2263 * otherwise we just get PR lock. 2264 */ 2265 if (ocfs2_should_update_atime(inode, vfsmnt)) { 2266 struct buffer_head *bh = NULL; 2267 2268 ocfs2_inode_unlock(inode, 0); 2269 ret = ocfs2_inode_lock(inode, &bh, 1); 2270 if (ret < 0) { 2271 mlog_errno(ret); 2272 return ret; 2273 } 2274 *level = 1; 2275 if (ocfs2_should_update_atime(inode, vfsmnt)) 2276 ocfs2_update_inode_atime(inode, bh); 2277 if (bh) 2278 brelse(bh); 2279 } else 2280 *level = 0; 2281 2282 mlog_exit(ret); 2283 return ret; 2284 } 2285 2286 void ocfs2_inode_unlock(struct inode *inode, 2287 int ex) 2288 { 2289 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2290 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2291 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2292 2293 mlog_entry_void(); 2294 2295 mlog(0, "inode %llu drop %s META lock\n", 2296 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2297 ex ? "EXMODE" : "PRMODE"); 2298 2299 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) && 2300 !ocfs2_mount_local(osb)) 2301 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 2302 2303 mlog_exit_void(); 2304 } 2305 2306 int ocfs2_super_lock(struct ocfs2_super *osb, 2307 int ex) 2308 { 2309 int status = 0; 2310 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2311 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2312 2313 mlog_entry_void(); 2314 2315 if (ocfs2_is_hard_readonly(osb)) 2316 return -EROFS; 2317 2318 if (ocfs2_mount_local(osb)) 2319 goto bail; 2320 2321 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 2322 if (status < 0) { 2323 mlog_errno(status); 2324 goto bail; 2325 } 2326 2327 /* The super block lock path is really in the best position to 2328 * know when resources covered by the lock need to be 2329 * refreshed, so we do it here. Of course, making sense of 2330 * everything is up to the caller :) */ 2331 status = ocfs2_should_refresh_lock_res(lockres); 2332 if (status < 0) { 2333 mlog_errno(status); 2334 goto bail; 2335 } 2336 if (status) { 2337 status = ocfs2_refresh_slot_info(osb); 2338 2339 ocfs2_complete_lock_res_refresh(lockres, status); 2340 2341 if (status < 0) 2342 mlog_errno(status); 2343 ocfs2_track_lock_refresh(lockres); 2344 } 2345 bail: 2346 mlog_exit(status); 2347 return status; 2348 } 2349 2350 void ocfs2_super_unlock(struct ocfs2_super *osb, 2351 int ex) 2352 { 2353 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2354 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2355 2356 if (!ocfs2_mount_local(osb)) 2357 ocfs2_cluster_unlock(osb, lockres, level); 2358 } 2359 2360 int ocfs2_rename_lock(struct ocfs2_super *osb) 2361 { 2362 int status; 2363 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2364 2365 if (ocfs2_is_hard_readonly(osb)) 2366 return -EROFS; 2367 2368 if (ocfs2_mount_local(osb)) 2369 return 0; 2370 2371 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2372 if (status < 0) 2373 mlog_errno(status); 2374 2375 return status; 2376 } 2377 2378 void ocfs2_rename_unlock(struct ocfs2_super *osb) 2379 { 2380 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2381 2382 if (!ocfs2_mount_local(osb)) 2383 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2384 } 2385 2386 int ocfs2_dentry_lock(struct dentry *dentry, int ex) 2387 { 2388 int ret; 2389 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2390 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2391 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2392 2393 BUG_ON(!dl); 2394 2395 if (ocfs2_is_hard_readonly(osb)) 2396 return -EROFS; 2397 2398 if (ocfs2_mount_local(osb)) 2399 return 0; 2400 2401 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 2402 if (ret < 0) 2403 mlog_errno(ret); 2404 2405 return ret; 2406 } 2407 2408 void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 2409 { 2410 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2411 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2412 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2413 2414 if (!ocfs2_mount_local(osb)) 2415 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 2416 } 2417 2418 /* Reference counting of the dlm debug structure. We want this because 2419 * open references on the debug inodes can live on after a mount, so 2420 * we can't rely on the ocfs2_super to always exist. 
*/ 2421 static void ocfs2_dlm_debug_free(struct kref *kref) 2422 { 2423 struct ocfs2_dlm_debug *dlm_debug; 2424 2425 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2426 2427 kfree(dlm_debug); 2428 } 2429 2430 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2431 { 2432 if (dlm_debug) 2433 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2434 } 2435 2436 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2437 { 2438 kref_get(&debug->d_refcnt); 2439 } 2440 2441 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2442 { 2443 struct ocfs2_dlm_debug *dlm_debug; 2444 2445 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 2446 if (!dlm_debug) { 2447 mlog_errno(-ENOMEM); 2448 goto out; 2449 } 2450 2451 kref_init(&dlm_debug->d_refcnt); 2452 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 2453 dlm_debug->d_locking_state = NULL; 2454 out: 2455 return dlm_debug; 2456 } 2457 2458 /* Access to this is arbitrated for us via seq_file->sem. */ 2459 struct ocfs2_dlm_seq_priv { 2460 struct ocfs2_dlm_debug *p_dlm_debug; 2461 struct ocfs2_lock_res p_iter_res; 2462 struct ocfs2_lock_res p_tmp_res; 2463 }; 2464 2465 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 2466 struct ocfs2_dlm_seq_priv *priv) 2467 { 2468 struct ocfs2_lock_res *iter, *ret = NULL; 2469 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 2470 2471 assert_spin_locked(&ocfs2_dlm_tracking_lock); 2472 2473 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 2474 /* discover the head of the list */ 2475 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 2476 mlog(0, "End of list found, %p\n", ret); 2477 break; 2478 } 2479 2480 /* We track our "dummy" iteration lockres' by a NULL 2481 * l_ops field. 
*/ 2482 if (iter->l_ops != NULL) { 2483 ret = iter; 2484 break; 2485 } 2486 } 2487 2488 return ret; 2489 } 2490 2491 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 2492 { 2493 struct ocfs2_dlm_seq_priv *priv = m->private; 2494 struct ocfs2_lock_res *iter; 2495 2496 spin_lock(&ocfs2_dlm_tracking_lock); 2497 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 2498 if (iter) { 2499 /* Since lockres' have the lifetime of their container 2500 * (which can be inodes, ocfs2_supers, etc) we want to 2501 * copy this out to a temporary lockres while still 2502 * under the spinlock. Obviously after this we can't 2503 * trust any pointers on the copy returned, but that's 2504 * ok as the information we want isn't typically held 2505 * in them. */ 2506 priv->p_tmp_res = *iter; 2507 iter = &priv->p_tmp_res; 2508 } 2509 spin_unlock(&ocfs2_dlm_tracking_lock); 2510 2511 return iter; 2512 } 2513 2514 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 2515 { 2516 } 2517 2518 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 2519 { 2520 struct ocfs2_dlm_seq_priv *priv = m->private; 2521 struct ocfs2_lock_res *iter = v; 2522 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 2523 2524 spin_lock(&ocfs2_dlm_tracking_lock); 2525 iter = ocfs2_dlm_next_res(iter, priv); 2526 list_del_init(&dummy->l_debug_list); 2527 if (iter) { 2528 list_add(&dummy->l_debug_list, &iter->l_debug_list); 2529 priv->p_tmp_res = *iter; 2530 iter = &priv->p_tmp_res; 2531 } 2532 spin_unlock(&ocfs2_dlm_tracking_lock); 2533 2534 return iter; 2535 } 2536 2537 /* So that debugfs.ocfs2 can determine which format is being used */ 2538 #define OCFS2_DLM_DEBUG_STR_VERSION 2 2539 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 2540 { 2541 int i; 2542 char *lvb; 2543 struct ocfs2_lock_res *lockres = v; 2544 2545 if (!lockres) 2546 return -EINVAL; 2547 2548 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 2549 2550 if (lockres->l_type == 
OCFS2_LOCK_TYPE_DENTRY) 2551 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 2552 lockres->l_name, 2553 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 2554 else 2555 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 2556 2557 seq_printf(m, "%d\t" 2558 "0x%lx\t" 2559 "0x%x\t" 2560 "0x%x\t" 2561 "%u\t" 2562 "%u\t" 2563 "%d\t" 2564 "%d\t", 2565 lockres->l_level, 2566 lockres->l_flags, 2567 lockres->l_action, 2568 lockres->l_unlock_action, 2569 lockres->l_ro_holders, 2570 lockres->l_ex_holders, 2571 lockres->l_requested, 2572 lockres->l_blocking); 2573 2574 /* Dump the raw LVB */ 2575 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2576 for(i = 0; i < DLM_LVB_LEN; i++) 2577 seq_printf(m, "0x%x\t", lvb[i]); 2578 2579 #ifdef CONFIG_OCFS2_FS_STATS 2580 # define lock_num_prmode(_l) (_l)->l_lock_num_prmode 2581 # define lock_num_exmode(_l) (_l)->l_lock_num_exmode 2582 # define lock_num_prmode_failed(_l) (_l)->l_lock_num_prmode_failed 2583 # define lock_num_exmode_failed(_l) (_l)->l_lock_num_exmode_failed 2584 # define lock_total_prmode(_l) (_l)->l_lock_total_prmode 2585 # define lock_total_exmode(_l) (_l)->l_lock_total_exmode 2586 # define lock_max_prmode(_l) (_l)->l_lock_max_prmode 2587 # define lock_max_exmode(_l) (_l)->l_lock_max_exmode 2588 # define lock_refresh(_l) (_l)->l_lock_refresh 2589 #else 2590 # define lock_num_prmode(_l) (0ULL) 2591 # define lock_num_exmode(_l) (0ULL) 2592 # define lock_num_prmode_failed(_l) (0) 2593 # define lock_num_exmode_failed(_l) (0) 2594 # define lock_total_prmode(_l) (0ULL) 2595 # define lock_total_exmode(_l) (0ULL) 2596 # define lock_max_prmode(_l) (0) 2597 # define lock_max_exmode(_l) (0) 2598 # define lock_refresh(_l) (0) 2599 #endif 2600 /* The following seq_print was added in version 2 of this output */ 2601 seq_printf(m, "%llu\t" 2602 "%llu\t" 2603 "%u\t" 2604 "%u\t" 2605 "%llu\t" 2606 "%llu\t" 2607 "%u\t" 2608 "%u\t" 2609 "%u\t", 2610 lock_num_prmode(lockres), 2611 lock_num_exmode(lockres), 2612 
lock_num_prmode_failed(lockres), 2613 lock_num_exmode_failed(lockres), 2614 lock_total_prmode(lockres), 2615 lock_total_exmode(lockres), 2616 lock_max_prmode(lockres), 2617 lock_max_exmode(lockres), 2618 lock_refresh(lockres)); 2619 2620 /* End the line */ 2621 seq_printf(m, "\n"); 2622 return 0; 2623 } 2624 2625 static const struct seq_operations ocfs2_dlm_seq_ops = { 2626 .start = ocfs2_dlm_seq_start, 2627 .stop = ocfs2_dlm_seq_stop, 2628 .next = ocfs2_dlm_seq_next, 2629 .show = ocfs2_dlm_seq_show, 2630 }; 2631 2632 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 2633 { 2634 struct seq_file *seq = (struct seq_file *) file->private_data; 2635 struct ocfs2_dlm_seq_priv *priv = seq->private; 2636 struct ocfs2_lock_res *res = &priv->p_iter_res; 2637 2638 ocfs2_remove_lockres_tracking(res); 2639 ocfs2_put_dlm_debug(priv->p_dlm_debug); 2640 return seq_release_private(inode, file); 2641 } 2642 2643 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 2644 { 2645 int ret; 2646 struct ocfs2_dlm_seq_priv *priv; 2647 struct seq_file *seq; 2648 struct ocfs2_super *osb; 2649 2650 priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL); 2651 if (!priv) { 2652 ret = -ENOMEM; 2653 mlog_errno(ret); 2654 goto out; 2655 } 2656 osb = inode->i_private; 2657 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 2658 priv->p_dlm_debug = osb->osb_dlm_debug; 2659 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 2660 2661 ret = seq_open(file, &ocfs2_dlm_seq_ops); 2662 if (ret) { 2663 kfree(priv); 2664 mlog_errno(ret); 2665 goto out; 2666 } 2667 2668 seq = (struct seq_file *) file->private_data; 2669 seq->private = priv; 2670 2671 ocfs2_add_lockres_tracking(&priv->p_iter_res, 2672 priv->p_dlm_debug); 2673 2674 out: 2675 return ret; 2676 } 2677 2678 static const struct file_operations ocfs2_dlm_debug_fops = { 2679 .open = ocfs2_dlm_debug_open, 2680 .release = ocfs2_dlm_debug_release, 2681 .read = seq_read, 2682 .llseek = seq_lseek, 2683 }; 2684 2685 
static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 2686 { 2687 int ret = 0; 2688 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2689 2690 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 2691 S_IFREG|S_IRUSR, 2692 osb->osb_debug_root, 2693 osb, 2694 &ocfs2_dlm_debug_fops); 2695 if (!dlm_debug->d_locking_state) { 2696 ret = -EINVAL; 2697 mlog(ML_ERROR, 2698 "Unable to create locking state debugfs file.\n"); 2699 goto out; 2700 } 2701 2702 ocfs2_get_dlm_debug(dlm_debug); 2703 out: 2704 return ret; 2705 } 2706 2707 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 2708 { 2709 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2710 2711 if (dlm_debug) { 2712 debugfs_remove(dlm_debug->d_locking_state); 2713 ocfs2_put_dlm_debug(dlm_debug); 2714 } 2715 } 2716 2717 int ocfs2_dlm_init(struct ocfs2_super *osb) 2718 { 2719 int status = 0; 2720 struct ocfs2_cluster_connection *conn = NULL; 2721 2722 mlog_entry_void(); 2723 2724 if (ocfs2_mount_local(osb)) { 2725 osb->node_num = 0; 2726 goto local; 2727 } 2728 2729 status = ocfs2_dlm_init_debug(osb); 2730 if (status < 0) { 2731 mlog_errno(status); 2732 goto bail; 2733 } 2734 2735 /* launch downconvert thread */ 2736 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc"); 2737 if (IS_ERR(osb->dc_task)) { 2738 status = PTR_ERR(osb->dc_task); 2739 osb->dc_task = NULL; 2740 mlog_errno(status); 2741 goto bail; 2742 } 2743 2744 /* for now, uuid == domain */ 2745 status = ocfs2_cluster_connect(osb->osb_cluster_stack, 2746 osb->uuid_str, 2747 strlen(osb->uuid_str), 2748 ocfs2_do_node_down, osb, 2749 &conn); 2750 if (status) { 2751 mlog_errno(status); 2752 goto bail; 2753 } 2754 2755 status = ocfs2_cluster_this_node(&osb->node_num); 2756 if (status < 0) { 2757 mlog_errno(status); 2758 mlog(ML_ERROR, 2759 "could not find this host's node number\n"); 2760 ocfs2_cluster_disconnect(conn, 0); 2761 goto bail; 2762 } 2763 2764 local: 2765 
ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 2766 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 2767 2768 osb->cconn = conn; 2769 2770 status = 0; 2771 bail: 2772 if (status < 0) { 2773 ocfs2_dlm_shutdown_debug(osb); 2774 if (osb->dc_task) 2775 kthread_stop(osb->dc_task); 2776 } 2777 2778 mlog_exit(status); 2779 return status; 2780 } 2781 2782 void ocfs2_dlm_shutdown(struct ocfs2_super *osb, 2783 int hangup_pending) 2784 { 2785 mlog_entry_void(); 2786 2787 ocfs2_drop_osb_locks(osb); 2788 2789 /* 2790 * Now that we have dropped all locks and ocfs2_dismount_volume() 2791 * has disabled recovery, the DLM won't be talking to us. It's 2792 * safe to tear things down before disconnecting the cluster. 2793 */ 2794 2795 if (osb->dc_task) { 2796 kthread_stop(osb->dc_task); 2797 osb->dc_task = NULL; 2798 } 2799 2800 ocfs2_lock_res_free(&osb->osb_super_lockres); 2801 ocfs2_lock_res_free(&osb->osb_rename_lockres); 2802 2803 ocfs2_cluster_disconnect(osb->cconn, hangup_pending); 2804 osb->cconn = NULL; 2805 2806 ocfs2_dlm_shutdown_debug(osb); 2807 2808 mlog_exit_void(); 2809 } 2810 2811 static void ocfs2_unlock_ast(void *opaque, int error) 2812 { 2813 struct ocfs2_lock_res *lockres = opaque; 2814 unsigned long flags; 2815 2816 mlog_entry_void(); 2817 2818 mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name, 2819 lockres->l_unlock_action); 2820 2821 spin_lock_irqsave(&lockres->l_lock, flags); 2822 if (error) { 2823 mlog(ML_ERROR, "Dlm passes error %d for lock %s, " 2824 "unlock_action %d\n", error, lockres->l_name, 2825 lockres->l_unlock_action); 2826 spin_unlock_irqrestore(&lockres->l_lock, flags); 2827 return; 2828 } 2829 2830 switch(lockres->l_unlock_action) { 2831 case OCFS2_UNLOCK_CANCEL_CONVERT: 2832 mlog(0, "Cancel convert success for %s\n", lockres->l_name); 2833 lockres->l_action = OCFS2_AST_INVALID; 2834 break; 2835 case OCFS2_UNLOCK_DROP_LOCK: 2836 lockres->l_level = DLM_LOCK_IV; 2837 break; 2838 default: 2839 BUG(); 2840 } 
2841 2842 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 2843 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 2844 spin_unlock_irqrestore(&lockres->l_lock, flags); 2845 2846 wake_up(&lockres->l_event); 2847 2848 mlog_exit_void(); 2849 } 2850 2851 static int ocfs2_drop_lock(struct ocfs2_super *osb, 2852 struct ocfs2_lock_res *lockres) 2853 { 2854 int ret; 2855 unsigned long flags; 2856 u32 lkm_flags = 0; 2857 2858 /* We didn't get anywhere near actually using this lockres. */ 2859 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 2860 goto out; 2861 2862 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 2863 lkm_flags |= DLM_LKF_VALBLK; 2864 2865 spin_lock_irqsave(&lockres->l_lock, flags); 2866 2867 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 2868 "lockres %s, flags 0x%lx\n", 2869 lockres->l_name, lockres->l_flags); 2870 2871 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 2872 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 2873 "%u, unlock_action = %u\n", 2874 lockres->l_name, lockres->l_flags, lockres->l_action, 2875 lockres->l_unlock_action); 2876 2877 spin_unlock_irqrestore(&lockres->l_lock, flags); 2878 2879 /* XXX: Today we just wait on any busy 2880 * locks... Perhaps we need to cancel converts in the 2881 * future? 
*/ 2882 ocfs2_wait_on_busy_lock(lockres); 2883 2884 spin_lock_irqsave(&lockres->l_lock, flags); 2885 } 2886 2887 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 2888 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 2889 lockres->l_level == DLM_LOCK_EX && 2890 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 2891 lockres->l_ops->set_lvb(lockres); 2892 } 2893 2894 if (lockres->l_flags & OCFS2_LOCK_BUSY) 2895 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 2896 lockres->l_name); 2897 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 2898 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 2899 2900 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 2901 spin_unlock_irqrestore(&lockres->l_lock, flags); 2902 goto out; 2903 } 2904 2905 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 2906 2907 /* make sure we never get here while waiting for an ast to 2908 * fire. */ 2909 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 2910 2911 /* is this necessary? */ 2912 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2913 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 2914 spin_unlock_irqrestore(&lockres->l_lock, flags); 2915 2916 mlog(0, "lock %s\n", lockres->l_name); 2917 2918 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags, 2919 lockres); 2920 if (ret) { 2921 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 2922 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 2923 ocfs2_dlm_dump_lksb(&lockres->l_lksb); 2924 BUG(); 2925 } 2926 mlog(0, "lock %s, successfull return from ocfs2_dlm_unlock\n", 2927 lockres->l_name); 2928 2929 ocfs2_wait_on_busy_lock(lockres); 2930 out: 2931 mlog_exit(0); 2932 return 0; 2933 } 2934 2935 /* Mark the lockres as being dropped. It will no longer be 2936 * queued if blocking, but we still may have to wait on it 2937 * being dequeued from the downconvert thread before we can consider 2938 * it safe to drop. 2939 * 2940 * You can *not* attempt to call cluster_lock on this lockres anymore. 
*/ 2941 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres) 2942 { 2943 int status; 2944 struct ocfs2_mask_waiter mw; 2945 unsigned long flags; 2946 2947 ocfs2_init_mask_waiter(&mw); 2948 2949 spin_lock_irqsave(&lockres->l_lock, flags); 2950 lockres->l_flags |= OCFS2_LOCK_FREEING; 2951 while (lockres->l_flags & OCFS2_LOCK_QUEUED) { 2952 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); 2953 spin_unlock_irqrestore(&lockres->l_lock, flags); 2954 2955 mlog(0, "Waiting on lockres %s\n", lockres->l_name); 2956 2957 status = ocfs2_wait_for_mask(&mw); 2958 if (status) 2959 mlog_errno(status); 2960 2961 spin_lock_irqsave(&lockres->l_lock, flags); 2962 } 2963 spin_unlock_irqrestore(&lockres->l_lock, flags); 2964 } 2965 2966 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, 2967 struct ocfs2_lock_res *lockres) 2968 { 2969 int ret; 2970 2971 ocfs2_mark_lockres_freeing(lockres); 2972 ret = ocfs2_drop_lock(osb, lockres); 2973 if (ret) 2974 mlog_errno(ret); 2975 } 2976 2977 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 2978 { 2979 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 2980 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 2981 } 2982 2983 int ocfs2_drop_inode_locks(struct inode *inode) 2984 { 2985 int status, err; 2986 2987 mlog_entry_void(); 2988 2989 /* No need to call ocfs2_mark_lockres_freeing here - 2990 * ocfs2_clear_inode has done it for us. 
*/ 2991 2992 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 2993 &OCFS2_I(inode)->ip_open_lockres); 2994 if (err < 0) 2995 mlog_errno(err); 2996 2997 status = err; 2998 2999 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3000 &OCFS2_I(inode)->ip_inode_lockres); 3001 if (err < 0) 3002 mlog_errno(err); 3003 if (err < 0 && !status) 3004 status = err; 3005 3006 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3007 &OCFS2_I(inode)->ip_rw_lockres); 3008 if (err < 0) 3009 mlog_errno(err); 3010 if (err < 0 && !status) 3011 status = err; 3012 3013 mlog_exit(status); 3014 return status; 3015 } 3016 3017 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 3018 int new_level) 3019 { 3020 assert_spin_locked(&lockres->l_lock); 3021 3022 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 3023 3024 if (lockres->l_level <= new_level) { 3025 mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n", 3026 lockres->l_level, new_level); 3027 BUG(); 3028 } 3029 3030 mlog(0, "lock %s, new_level = %d, l_blocking = %d\n", 3031 lockres->l_name, new_level, lockres->l_blocking); 3032 3033 lockres->l_action = OCFS2_AST_DOWNCONVERT; 3034 lockres->l_requested = new_level; 3035 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3036 return lockres_set_pending(lockres); 3037 } 3038 3039 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 3040 struct ocfs2_lock_res *lockres, 3041 int new_level, 3042 int lvb, 3043 unsigned int generation) 3044 { 3045 int ret; 3046 u32 dlm_flags = DLM_LKF_CONVERT; 3047 3048 mlog_entry_void(); 3049 3050 if (lvb) 3051 dlm_flags |= DLM_LKF_VALBLK; 3052 3053 ret = ocfs2_dlm_lock(osb->cconn, 3054 new_level, 3055 &lockres->l_lksb, 3056 dlm_flags, 3057 lockres->l_name, 3058 OCFS2_LOCK_ID_MAX_LEN - 1, 3059 lockres); 3060 lockres_clear_pending(lockres, generation, osb); 3061 if (ret) { 3062 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 3063 ocfs2_recover_from_dlm_error(lockres, 1); 3064 goto bail; 3065 } 3066 3067 ret = 0; 3068 bail: 3069 mlog_exit(ret); 
3070 return ret; 3071 } 3072 3073 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */ 3074 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 3075 struct ocfs2_lock_res *lockres) 3076 { 3077 assert_spin_locked(&lockres->l_lock); 3078 3079 mlog_entry_void(); 3080 mlog(0, "lock %s\n", lockres->l_name); 3081 3082 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 3083 /* If we're already trying to cancel a lock conversion 3084 * then just drop the spinlock and allow the caller to 3085 * requeue this lock. */ 3086 3087 mlog(0, "Lockres %s, skip convert\n", lockres->l_name); 3088 return 0; 3089 } 3090 3091 /* were we in a convert when we got the bast fire? */ 3092 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT && 3093 lockres->l_action != OCFS2_AST_DOWNCONVERT); 3094 /* set things up for the unlockast to know to just 3095 * clear out the ast_action and unset busy, etc. */ 3096 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT; 3097 3098 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY), 3099 "lock %s, invalid flags: 0x%lx\n", 3100 lockres->l_name, lockres->l_flags); 3101 3102 return 1; 3103 } 3104 3105 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 3106 struct ocfs2_lock_res *lockres) 3107 { 3108 int ret; 3109 3110 mlog_entry_void(); 3111 mlog(0, "lock %s\n", lockres->l_name); 3112 3113 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, 3114 DLM_LKF_CANCEL, lockres); 3115 if (ret) { 3116 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3117 ocfs2_recover_from_dlm_error(lockres, 0); 3118 } 3119 3120 mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name); 3121 3122 mlog_exit(ret); 3123 return ret; 3124 } 3125 3126 static int ocfs2_unblock_lock(struct ocfs2_super *osb, 3127 struct ocfs2_lock_res *lockres, 3128 struct ocfs2_unblock_ctl *ctl) 3129 { 3130 unsigned long flags; 3131 int blocking; 3132 int new_level; 3133 int ret = 0; 3134 int set_lvb = 0; 3135 unsigned int gen; 3136 3137 
mlog_entry_void(); 3138 3139 spin_lock_irqsave(&lockres->l_lock, flags); 3140 3141 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 3142 3143 recheck: 3144 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 3145 /* XXX 3146 * This is a *big* race. The OCFS2_LOCK_PENDING flag 3147 * exists entirely for one reason - another thread has set 3148 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock(). 3149 * 3150 * If we do ocfs2_cancel_convert() before the other thread 3151 * calls dlm_lock(), our cancel will do nothing. We will 3152 * get no ast, and we will have no way of knowing the 3153 * cancel failed. Meanwhile, the other thread will call 3154 * into dlm_lock() and wait...forever. 3155 * 3156 * Why forever? Because another node has asked for the 3157 * lock first; that's why we're here in unblock_lock(). 3158 * 3159 * The solution is OCFS2_LOCK_PENDING. When PENDING is 3160 * set, we just requeue the unblock. Only when the other 3161 * thread has called dlm_lock() and cleared PENDING will 3162 * we then cancel their request. 3163 * 3164 * All callers of dlm_lock() must set OCFS2_DLM_PENDING 3165 * at the same time they set OCFS2_DLM_BUSY. They must 3166 * clear OCFS2_DLM_PENDING after dlm_lock() returns. 3167 */ 3168 if (lockres->l_flags & OCFS2_LOCK_PENDING) 3169 goto leave_requeue; 3170 3171 ctl->requeue = 1; 3172 ret = ocfs2_prepare_cancel_convert(osb, lockres); 3173 spin_unlock_irqrestore(&lockres->l_lock, flags); 3174 if (ret) { 3175 ret = ocfs2_cancel_convert(osb, lockres); 3176 if (ret < 0) 3177 mlog_errno(ret); 3178 } 3179 goto leave; 3180 } 3181 3182 /* if we're blocking an exclusive and we have *any* holders, 3183 * then requeue. 
*/ 3184 if ((lockres->l_blocking == DLM_LOCK_EX) 3185 && (lockres->l_ex_holders || lockres->l_ro_holders)) 3186 goto leave_requeue; 3187 3188 /* If it's a PR we're blocking, then only 3189 * requeue if we've got any EX holders */ 3190 if (lockres->l_blocking == DLM_LOCK_PR && 3191 lockres->l_ex_holders) 3192 goto leave_requeue; 3193 3194 /* 3195 * Can we get a lock in this state if the holder counts are 3196 * zero? The meta data unblock code used to check this. 3197 */ 3198 if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 3199 && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) 3200 goto leave_requeue; 3201 3202 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 3203 3204 if (lockres->l_ops->check_downconvert 3205 && !lockres->l_ops->check_downconvert(lockres, new_level)) 3206 goto leave_requeue; 3207 3208 /* If we get here, then we know that there are no more 3209 * incompatible holders (and anyone asking for an incompatible 3210 * lock is blocked). We can now downconvert the lock */ 3211 if (!lockres->l_ops->downconvert_worker) 3212 goto downconvert; 3213 3214 /* Some lockres types want to do a bit of work before 3215 * downconverting a lock. Allow that here. The worker function 3216 * may sleep, so we save off a copy of what we're blocking as 3217 * it may change while we're not holding the spin lock. */ 3218 blocking = lockres->l_blocking; 3219 spin_unlock_irqrestore(&lockres->l_lock, flags); 3220 3221 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); 3222 3223 if (ctl->unblock_action == UNBLOCK_STOP_POST) 3224 goto leave; 3225 3226 spin_lock_irqsave(&lockres->l_lock, flags); 3227 if (blocking != lockres->l_blocking) { 3228 /* If this changed underneath us, then we can't drop 3229 * it just yet. 
*/ 3230 goto recheck; 3231 } 3232 3233 downconvert: 3234 ctl->requeue = 0; 3235 3236 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3237 if (lockres->l_level == DLM_LOCK_EX) 3238 set_lvb = 1; 3239 3240 /* 3241 * We only set the lvb if the lock has been fully 3242 * refreshed - otherwise we risk setting stale 3243 * data. Otherwise, there's no need to actually clear 3244 * out the lvb here as it's value is still valid. 3245 */ 3246 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3247 lockres->l_ops->set_lvb(lockres); 3248 } 3249 3250 gen = ocfs2_prepare_downconvert(lockres, new_level); 3251 spin_unlock_irqrestore(&lockres->l_lock, flags); 3252 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb, 3253 gen); 3254 3255 leave: 3256 mlog_exit(ret); 3257 return ret; 3258 3259 leave_requeue: 3260 spin_unlock_irqrestore(&lockres->l_lock, flags); 3261 ctl->requeue = 1; 3262 3263 mlog_exit(0); 3264 return 0; 3265 } 3266 3267 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 3268 int blocking) 3269 { 3270 struct inode *inode; 3271 struct address_space *mapping; 3272 3273 inode = ocfs2_lock_res_inode(lockres); 3274 mapping = inode->i_mapping; 3275 3276 if (!S_ISREG(inode->i_mode)) 3277 goto out; 3278 3279 /* 3280 * We need this before the filemap_fdatawrite() so that it can 3281 * transfer the dirty bit from the PTE to the 3282 * page. Unfortunately this means that even for EX->PR 3283 * downconverts, we'll lose our mappings and have to build 3284 * them up again. 
3285 */ 3286 unmap_mapping_range(mapping, 0, 0, 0); 3287 3288 if (filemap_fdatawrite(mapping)) { 3289 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!", 3290 (unsigned long long)OCFS2_I(inode)->ip_blkno); 3291 } 3292 sync_mapping_buffers(mapping); 3293 if (blocking == DLM_LOCK_EX) { 3294 truncate_inode_pages(mapping, 0); 3295 } else { 3296 /* We only need to wait on the I/O if we're not also 3297 * truncating pages because truncate_inode_pages waits 3298 * for us above. We don't truncate pages if we're 3299 * blocking anything < EXMODE because we want to keep 3300 * them around in that case. */ 3301 filemap_fdatawait(mapping); 3302 } 3303 3304 out: 3305 return UNBLOCK_CONTINUE; 3306 } 3307 3308 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 3309 int new_level) 3310 { 3311 struct inode *inode = ocfs2_lock_res_inode(lockres); 3312 int checkpointed = ocfs2_inode_fully_checkpointed(inode); 3313 3314 BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR); 3315 BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed); 3316 3317 if (checkpointed) 3318 return 1; 3319 3320 ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb)); 3321 return 0; 3322 } 3323 3324 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres) 3325 { 3326 struct inode *inode = ocfs2_lock_res_inode(lockres); 3327 3328 __ocfs2_stuff_meta_lvb(inode); 3329 } 3330 3331 /* 3332 * Does the final reference drop on our dentry lock. Right now this 3333 * happens in the downconvert thread, but we could choose to simplify the 3334 * dlmglue API and push these off to the ocfs2_wq in the future. 3335 */ 3336 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 3337 struct ocfs2_lock_res *lockres) 3338 { 3339 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3340 ocfs2_dentry_lock_put(osb, dl); 3341 } 3342 3343 /* 3344 * d_delete() matching dentries before the lock downconvert. 
3345 * 3346 * At this point, any process waiting to destroy the 3347 * dentry_lock due to last ref count is stopped by the 3348 * OCFS2_LOCK_QUEUED flag. 3349 * 3350 * We have two potential problems 3351 * 3352 * 1) If we do the last reference drop on our dentry_lock (via dput) 3353 * we'll wind up in ocfs2_release_dentry_lock(), waiting on 3354 * the downconvert to finish. Instead we take an elevated 3355 * reference and push the drop until after we've completed our 3356 * unblock processing. 3357 * 3358 * 2) There might be another process with a final reference, 3359 * waiting on us to finish processing. If this is the case, we 3360 * detect it and exit out - there's no more dentries anyway. 3361 */ 3362 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 3363 int blocking) 3364 { 3365 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3366 struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode); 3367 struct dentry *dentry; 3368 unsigned long flags; 3369 int extra_ref = 0; 3370 3371 /* 3372 * This node is blocking another node from getting a read 3373 * lock. This happens when we've renamed within a 3374 * directory. We've forced the other nodes to d_delete(), but 3375 * we never actually dropped our lock because it's still 3376 * valid. The downconvert code will retain a PR for this node, 3377 * so there's no further work to do. 3378 */ 3379 if (blocking == DLM_LOCK_PR) 3380 return UNBLOCK_CONTINUE; 3381 3382 /* 3383 * Mark this inode as potentially orphaned. The code in 3384 * ocfs2_delete_inode() will figure out whether it actually 3385 * needs to be freed or not. 3386 */ 3387 spin_lock(&oi->ip_lock); 3388 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; 3389 spin_unlock(&oi->ip_lock); 3390 3391 /* 3392 * Yuck. We need to make sure however that the check of 3393 * OCFS2_LOCK_FREEING and the extra reference are atomic with 3394 * respect to a reference decrement or the setting of that 3395 * flag. 
3396 */ 3397 spin_lock_irqsave(&lockres->l_lock, flags); 3398 spin_lock(&dentry_attach_lock); 3399 if (!(lockres->l_flags & OCFS2_LOCK_FREEING) 3400 && dl->dl_count) { 3401 dl->dl_count++; 3402 extra_ref = 1; 3403 } 3404 spin_unlock(&dentry_attach_lock); 3405 spin_unlock_irqrestore(&lockres->l_lock, flags); 3406 3407 mlog(0, "extra_ref = %d\n", extra_ref); 3408 3409 /* 3410 * We have a process waiting on us in ocfs2_dentry_iput(), 3411 * which means we can't have any more outstanding 3412 * aliases. There's no need to do any more work. 3413 */ 3414 if (!extra_ref) 3415 return UNBLOCK_CONTINUE; 3416 3417 spin_lock(&dentry_attach_lock); 3418 while (1) { 3419 dentry = ocfs2_find_local_alias(dl->dl_inode, 3420 dl->dl_parent_blkno, 1); 3421 if (!dentry) 3422 break; 3423 spin_unlock(&dentry_attach_lock); 3424 3425 mlog(0, "d_delete(%.*s);\n", dentry->d_name.len, 3426 dentry->d_name.name); 3427 3428 /* 3429 * The following dcache calls may do an 3430 * iput(). Normally we don't want that from the 3431 * downconverting thread, but in this case it's ok 3432 * because the requesting node already has an 3433 * exclusive lock on the inode, so it can't be queued 3434 * for a downconvert. 3435 */ 3436 d_delete(dentry); 3437 dput(dentry); 3438 3439 spin_lock(&dentry_attach_lock); 3440 } 3441 spin_unlock(&dentry_attach_lock); 3442 3443 /* 3444 * If we are the last holder of this dentry lock, there is no 3445 * reason to downconvert so skip straight to the unlock. 3446 */ 3447 if (dl->dl_count == 1) 3448 return UNBLOCK_STOP_POST; 3449 3450 return UNBLOCK_CONTINUE_POST; 3451 } 3452 3453 /* 3454 * This is the filesystem locking protocol. It provides the lock handling 3455 * hooks for the underlying DLM. It has a maximum version number. 3456 * The version number allows interoperability with systems running at 3457 * the same major number and an equal or smaller minor number. 
3458 * 3459 * Whenever the filesystem does new things with locks (adds or removes a 3460 * lock, orders them differently, does different things underneath a lock), 3461 * the version must be changed. The protocol is negotiated when joining 3462 * the dlm domain. A node may join the domain if its major version is 3463 * identical to all other nodes and its minor version is greater than 3464 * or equal to all other nodes. When its minor version is greater than 3465 * the other nodes, it will run at the minor version specified by the 3466 * other nodes. 3467 * 3468 * If a locking change is made that will not be compatible with older 3469 * versions, the major number must be increased and the minor version set 3470 * to zero. If a change merely adds a behavior that can be disabled when 3471 * speaking to older versions, the minor version must be increased. If a 3472 * change adds a fully backwards compatible change (eg, LVB changes that 3473 * are just ignored by older versions), the version does not need to be 3474 * updated. 3475 */ 3476 static struct ocfs2_locking_protocol lproto = { 3477 .lp_max_version = { 3478 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, 3479 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, 3480 }, 3481 .lp_lock_ast = ocfs2_locking_ast, 3482 .lp_blocking_ast = ocfs2_blocking_ast, 3483 .lp_unlock_ast = ocfs2_unlock_ast, 3484 }; 3485 3486 void ocfs2_set_locking_protocol(void) 3487 { 3488 ocfs2_stack_glue_set_locking_protocol(&lproto); 3489 } 3490 3491 3492 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3493 struct ocfs2_lock_res *lockres) 3494 { 3495 int status; 3496 struct ocfs2_unblock_ctl ctl = {0, 0,}; 3497 unsigned long flags; 3498 3499 /* Our reference to the lockres in this function can be 3500 * considered valid until we remove the OCFS2_LOCK_QUEUED 3501 * flag. 
*/ 3502 3503 mlog_entry_void(); 3504 3505 BUG_ON(!lockres); 3506 BUG_ON(!lockres->l_ops); 3507 3508 mlog(0, "lockres %s blocked.\n", lockres->l_name); 3509 3510 /* Detect whether a lock has been marked as going away while 3511 * the downconvert thread was processing other things. A lock can 3512 * still be marked with OCFS2_LOCK_FREEING after this check, 3513 * but short circuiting here will still save us some 3514 * performance. */ 3515 spin_lock_irqsave(&lockres->l_lock, flags); 3516 if (lockres->l_flags & OCFS2_LOCK_FREEING) 3517 goto unqueue; 3518 spin_unlock_irqrestore(&lockres->l_lock, flags); 3519 3520 status = ocfs2_unblock_lock(osb, lockres, &ctl); 3521 if (status < 0) 3522 mlog_errno(status); 3523 3524 spin_lock_irqsave(&lockres->l_lock, flags); 3525 unqueue: 3526 if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) { 3527 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED); 3528 } else 3529 ocfs2_schedule_blocked_lock(osb, lockres); 3530 3531 mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name, 3532 ctl.requeue ? "yes" : "no"); 3533 spin_unlock_irqrestore(&lockres->l_lock, flags); 3534 3535 if (ctl.unblock_action != UNBLOCK_CONTINUE 3536 && lockres->l_ops->post_unlock) 3537 lockres->l_ops->post_unlock(osb, lockres); 3538 3539 mlog_exit_void(); 3540 } 3541 3542 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 3543 struct ocfs2_lock_res *lockres) 3544 { 3545 mlog_entry_void(); 3546 3547 assert_spin_locked(&lockres->l_lock); 3548 3549 if (lockres->l_flags & OCFS2_LOCK_FREEING) { 3550 /* Do not schedule a lock for downconvert when it's on 3551 * the way to destruction - any nodes wanting access 3552 * to the resource will get it soon. 
*/ 3553 mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n", 3554 lockres->l_name, lockres->l_flags); 3555 return; 3556 } 3557 3558 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); 3559 3560 spin_lock(&osb->dc_task_lock); 3561 if (list_empty(&lockres->l_blocked_list)) { 3562 list_add_tail(&lockres->l_blocked_list, 3563 &osb->blocked_lock_list); 3564 osb->blocked_lock_count++; 3565 } 3566 spin_unlock(&osb->dc_task_lock); 3567 3568 mlog_exit_void(); 3569 } 3570 3571 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb) 3572 { 3573 unsigned long processed; 3574 struct ocfs2_lock_res *lockres; 3575 3576 mlog_entry_void(); 3577 3578 spin_lock(&osb->dc_task_lock); 3579 /* grab this early so we know to try again if a state change and 3580 * wake happens part-way through our work */ 3581 osb->dc_work_sequence = osb->dc_wake_sequence; 3582 3583 processed = osb->blocked_lock_count; 3584 while (processed) { 3585 BUG_ON(list_empty(&osb->blocked_lock_list)); 3586 3587 lockres = list_entry(osb->blocked_lock_list.next, 3588 struct ocfs2_lock_res, l_blocked_list); 3589 list_del_init(&lockres->l_blocked_list); 3590 osb->blocked_lock_count--; 3591 spin_unlock(&osb->dc_task_lock); 3592 3593 BUG_ON(!processed); 3594 processed--; 3595 3596 ocfs2_process_blocked_lock(osb, lockres); 3597 3598 spin_lock(&osb->dc_task_lock); 3599 } 3600 spin_unlock(&osb->dc_task_lock); 3601 3602 mlog_exit_void(); 3603 } 3604 3605 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb) 3606 { 3607 int empty = 0; 3608 3609 spin_lock(&osb->dc_task_lock); 3610 if (list_empty(&osb->blocked_lock_list)) 3611 empty = 1; 3612 3613 spin_unlock(&osb->dc_task_lock); 3614 return empty; 3615 } 3616 3617 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb) 3618 { 3619 int should_wake = 0; 3620 3621 spin_lock(&osb->dc_task_lock); 3622 if (osb->dc_work_sequence != osb->dc_wake_sequence) 3623 should_wake = 1; 3624 spin_unlock(&osb->dc_task_lock); 3625 3626 return 
should_wake; 3627 } 3628 3629 static int ocfs2_downconvert_thread(void *arg) 3630 { 3631 int status = 0; 3632 struct ocfs2_super *osb = arg; 3633 3634 /* only quit once we've been asked to stop and there is no more 3635 * work available */ 3636 while (!(kthread_should_stop() && 3637 ocfs2_downconvert_thread_lists_empty(osb))) { 3638 3639 wait_event_interruptible(osb->dc_event, 3640 ocfs2_downconvert_thread_should_wake(osb) || 3641 kthread_should_stop()); 3642 3643 mlog(0, "downconvert_thread: awoken\n"); 3644 3645 ocfs2_downconvert_thread_do_work(osb); 3646 } 3647 3648 osb->dc_task = NULL; 3649 return status; 3650 } 3651 3652 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb) 3653 { 3654 spin_lock(&osb->dc_task_lock); 3655 /* make sure the voting thread gets a swipe at whatever changes 3656 * the caller may have made to the voting state */ 3657 osb->dc_wake_sequence++; 3658 spin_unlock(&osb->dc_task_lock); 3659 wake_up(&osb->dc_event); 3660 } 3661