1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmglue.c 5 * 6 * Code which implements an OCFS2 specific interface to our DLM. 7 * 8 * Copyright (C) 2003, 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 */ 25 26 #include <linux/types.h> 27 #include <linux/slab.h> 28 #include <linux/highmem.h> 29 #include <linux/mm.h> 30 #include <linux/kthread.h> 31 #include <linux/pagemap.h> 32 #include <linux/debugfs.h> 33 #include <linux/seq_file.h> 34 #include <linux/time.h> 35 #include <linux/quotaops.h> 36 37 #define MLOG_MASK_PREFIX ML_DLM_GLUE 38 #include <cluster/masklog.h> 39 40 #include "ocfs2.h" 41 #include "ocfs2_lockingver.h" 42 43 #include "alloc.h" 44 #include "dcache.h" 45 #include "dlmglue.h" 46 #include "extent_map.h" 47 #include "file.h" 48 #include "heartbeat.h" 49 #include "inode.h" 50 #include "journal.h" 51 #include "stackglue.h" 52 #include "slot_map.h" 53 #include "super.h" 54 #include "uptodate.h" 55 #include "quota.h" 56 #include "refcounttree.h" 57 58 #include "buffer_head_io.h" 59 60 struct ocfs2_mask_waiter { 61 struct list_head mw_item; 62 int mw_status; 63 struct completion mw_complete; 64 unsigned long mw_mask; 65 unsigned long mw_goal; 66 #ifdef CONFIG_OCFS2_FS_STATS 67 ktime_t 
mw_lock_start; 68 #endif 69 }; 70 71 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres); 72 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres); 73 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres); 74 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres); 75 76 /* 77 * Return value from ->downconvert_worker functions. 78 * 79 * These control the precise actions of ocfs2_unblock_lock() 80 * and ocfs2_process_blocked_lock() 81 * 82 */ 83 enum ocfs2_unblock_action { 84 UNBLOCK_CONTINUE = 0, /* Continue downconvert */ 85 UNBLOCK_CONTINUE_POST = 1, /* Continue downconvert, fire 86 * ->post_unlock callback */ 87 UNBLOCK_STOP_POST = 2, /* Do not downconvert, fire 88 * ->post_unlock() callback. */ 89 }; 90 91 struct ocfs2_unblock_ctl { 92 int requeue; 93 enum ocfs2_unblock_action unblock_action; 94 }; 95 96 /* Lockdep class keys */ 97 struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES]; 98 99 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 100 int new_level); 101 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres); 102 103 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 104 int blocking); 105 106 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 107 int blocking); 108 109 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 110 struct ocfs2_lock_res *lockres); 111 112 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres); 113 114 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres, 115 int new_level); 116 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres, 117 int blocking); 118 119 #define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres) 120 121 /* This aids in debugging situations where a bad LVB might be involved. 
*/ 122 static void ocfs2_dump_meta_lvb_info(u64 level, 123 const char *function, 124 unsigned int line, 125 struct ocfs2_lock_res *lockres) 126 { 127 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 128 129 mlog(level, "LVB information for %s (called from %s:%u):\n", 130 lockres->l_name, function, line); 131 mlog(level, "version: %u, clusters: %u, generation: 0x%x\n", 132 lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters), 133 be32_to_cpu(lvb->lvb_igeneration)); 134 mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n", 135 (unsigned long long)be64_to_cpu(lvb->lvb_isize), 136 be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid), 137 be16_to_cpu(lvb->lvb_imode)); 138 mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, " 139 "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink), 140 (long long)be64_to_cpu(lvb->lvb_iatime_packed), 141 (long long)be64_to_cpu(lvb->lvb_ictime_packed), 142 (long long)be64_to_cpu(lvb->lvb_imtime_packed), 143 be32_to_cpu(lvb->lvb_iattr)); 144 } 145 146 147 /* 148 * OCFS2 Lock Resource Operations 149 * 150 * These fine tune the behavior of the generic dlmglue locking infrastructure. 151 * 152 * The most basic of lock types can point ->l_priv to their respective 153 * struct ocfs2_super and allow the default actions to manage things. 154 * 155 * Right now, each lock type also needs to implement an init function, 156 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres() 157 * should be called when the lock is no longer needed (i.e., object 158 * destruction time). 159 */ 160 struct ocfs2_lock_res_ops { 161 /* 162 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define 163 * this callback if ->l_priv is not an ocfs2_super pointer 164 */ 165 struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *); 166 167 /* 168 * Optionally called in the downconvert thread after a 169 * successful downconvert. 
The lockres will not be referenced 170 * after this callback is called, so it is safe to free 171 * memory, etc. 172 * 173 * The exact semantics of when this is called are controlled 174 * by ->downconvert_worker() 175 */ 176 void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *); 177 178 /* 179 * Allow a lock type to add checks to determine whether it is 180 * safe to downconvert a lock. Return 0 to re-queue the 181 * downconvert at a later time, nonzero to continue. 182 * 183 * For most locks, the default checks that there are no 184 * incompatible holders are sufficient. 185 * 186 * Called with the lockres spinlock held. 187 */ 188 int (*check_downconvert)(struct ocfs2_lock_res *, int); 189 190 /* 191 * Allows a lock type to populate the lock value block. This 192 * is called on downconvert, and when we drop a lock. 193 * 194 * Locks that want to use this should set LOCK_TYPE_USES_LVB 195 * in the flags field. 196 * 197 * Called with the lockres spinlock held. 198 */ 199 void (*set_lvb)(struct ocfs2_lock_res *); 200 201 /* 202 * Called from the downconvert thread when it is determined 203 * that a lock will be downconverted. This is called without 204 * any locks held so the function can do work that might 205 * schedule (syncing out data, etc). 206 * 207 * This should return any one of the ocfs2_unblock_action 208 * values, depending on what it wants the thread to do. 209 */ 210 int (*downconvert_worker)(struct ocfs2_lock_res *, int); 211 212 /* 213 * LOCK_TYPE_* flags which describe the specific requirements 214 * of a lock type. Descriptions of each individual flag follow. 215 */ 216 int flags; 217 }; 218 219 /* 220 * Some locks want to "refresh" potentially stale data when a 221 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this 222 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the 223 * individual lockres l_flags member from the ast function. 
It is 224 * expected that the locking wrapper will clear the 225 * OCFS2_LOCK_NEEDS_REFRESH flag when done. 226 */ 227 #define LOCK_TYPE_REQUIRES_REFRESH 0x1 228 229 /* 230 * Indicate that a lock type makes use of the lock value block. The 231 * ->set_lvb lock type callback must be defined. 232 */ 233 #define LOCK_TYPE_USES_LVB 0x2 234 235 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { 236 .get_osb = ocfs2_get_inode_osb, 237 .flags = 0, 238 }; 239 240 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = { 241 .get_osb = ocfs2_get_inode_osb, 242 .check_downconvert = ocfs2_check_meta_downconvert, 243 .set_lvb = ocfs2_set_meta_lvb, 244 .downconvert_worker = ocfs2_data_convert_worker, 245 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 246 }; 247 248 static struct ocfs2_lock_res_ops ocfs2_super_lops = { 249 .flags = LOCK_TYPE_REQUIRES_REFRESH, 250 }; 251 252 static struct ocfs2_lock_res_ops ocfs2_rename_lops = { 253 .flags = 0, 254 }; 255 256 static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = { 257 .flags = 0, 258 }; 259 260 static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = { 261 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 262 }; 263 264 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { 265 .get_osb = ocfs2_get_dentry_osb, 266 .post_unlock = ocfs2_dentry_post_unlock, 267 .downconvert_worker = ocfs2_dentry_convert_worker, 268 .flags = 0, 269 }; 270 271 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = { 272 .get_osb = ocfs2_get_inode_osb, 273 .flags = 0, 274 }; 275 276 static struct ocfs2_lock_res_ops ocfs2_flock_lops = { 277 .get_osb = ocfs2_get_file_osb, 278 .flags = 0, 279 }; 280 281 static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = { 282 .set_lvb = ocfs2_set_qinfo_lvb, 283 .get_osb = ocfs2_get_qinfo_osb, 284 .flags = LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB, 285 }; 286 287 static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = { 288 .check_downconvert = ocfs2_check_refcount_downconvert, 289 
.downconvert_worker = ocfs2_refcount_convert_worker, 290 .flags = 0, 291 }; 292 293 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 294 { 295 return lockres->l_type == OCFS2_LOCK_TYPE_META || 296 lockres->l_type == OCFS2_LOCK_TYPE_RW || 297 lockres->l_type == OCFS2_LOCK_TYPE_OPEN; 298 } 299 300 static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb) 301 { 302 return container_of(lksb, struct ocfs2_lock_res, l_lksb); 303 } 304 305 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) 306 { 307 BUG_ON(!ocfs2_is_inode_lock(lockres)); 308 309 return (struct inode *) lockres->l_priv; 310 } 311 312 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres) 313 { 314 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY); 315 316 return (struct ocfs2_dentry_lock *)lockres->l_priv; 317 } 318 319 static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres) 320 { 321 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO); 322 323 return (struct ocfs2_mem_dqinfo *)lockres->l_priv; 324 } 325 326 static inline struct ocfs2_refcount_tree * 327 ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res) 328 { 329 return container_of(res, struct ocfs2_refcount_tree, rf_lockres); 330 } 331 332 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres) 333 { 334 if (lockres->l_ops->get_osb) 335 return lockres->l_ops->get_osb(lockres); 336 337 return (struct ocfs2_super *)lockres->l_priv; 338 } 339 340 static int ocfs2_lock_create(struct ocfs2_super *osb, 341 struct ocfs2_lock_res *lockres, 342 int level, 343 u32 dlm_flags); 344 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 345 int wanted); 346 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb, 347 struct ocfs2_lock_res *lockres, 348 int level, unsigned long caller_ip); 349 static inline void ocfs2_cluster_unlock(struct 
ocfs2_super *osb, 350 struct ocfs2_lock_res *lockres, 351 int level) 352 { 353 __ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_); 354 } 355 356 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres); 357 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres); 358 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres); 359 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level); 360 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 361 struct ocfs2_lock_res *lockres); 362 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 363 int convert); 364 #define ocfs2_log_dlm_error(_func, _err, _lockres) do { \ 365 if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY) \ 366 mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \ 367 _err, _func, _lockres->l_name); \ 368 else \ 369 mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n", \ 370 _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name, \ 371 (unsigned int)ocfs2_get_dentry_lock_ino(_lockres)); \ 372 } while (0) 373 static int ocfs2_downconvert_thread(void *arg); 374 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 375 struct ocfs2_lock_res *lockres); 376 static int ocfs2_inode_lock_update(struct inode *inode, 377 struct buffer_head **bh); 378 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 379 static inline int ocfs2_highest_compat_lock_level(int level); 380 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 381 int new_level); 382 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 383 struct ocfs2_lock_res *lockres, 384 int new_level, 385 int lvb, 386 unsigned int generation); 387 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 388 struct ocfs2_lock_res *lockres); 389 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 390 struct 
ocfs2_lock_res *lockres); 391 392 393 static void ocfs2_build_lock_name(enum ocfs2_lock_type type, 394 u64 blkno, 395 u32 generation, 396 char *name) 397 { 398 int len; 399 400 BUG_ON(type >= OCFS2_NUM_LOCK_TYPES); 401 402 len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", 403 ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD, 404 (long long)blkno, generation); 405 406 BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1)); 407 408 mlog(0, "built lock resource with name: %s\n", name); 409 } 410 411 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock); 412 413 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res, 414 struct ocfs2_dlm_debug *dlm_debug) 415 { 416 mlog(0, "Add tracking for lockres %s\n", res->l_name); 417 418 spin_lock(&ocfs2_dlm_tracking_lock); 419 list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking); 420 spin_unlock(&ocfs2_dlm_tracking_lock); 421 } 422 423 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) 424 { 425 spin_lock(&ocfs2_dlm_tracking_lock); 426 if (!list_empty(&res->l_debug_list)) 427 list_del_init(&res->l_debug_list); 428 spin_unlock(&ocfs2_dlm_tracking_lock); 429 } 430 431 #ifdef CONFIG_OCFS2_FS_STATS 432 static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 433 { 434 res->l_lock_refresh = 0; 435 memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats)); 436 memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats)); 437 } 438 439 static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level, 440 struct ocfs2_mask_waiter *mw, int ret) 441 { 442 u32 usec; 443 ktime_t kt; 444 struct ocfs2_lock_stats *stats; 445 446 if (level == LKM_PRMODE) 447 stats = &res->l_lock_prmode; 448 else if (level == LKM_EXMODE) 449 stats = &res->l_lock_exmode; 450 else 451 return; 452 453 kt = ktime_sub(ktime_get(), mw->mw_lock_start); 454 usec = ktime_to_us(kt); 455 456 stats->ls_gets++; 457 stats->ls_total += ktime_to_ns(kt); 458 /* overflow */ 459 if (unlikely(stats->ls_gets == 0)) { 460 
stats->ls_gets++; 461 stats->ls_total = ktime_to_ns(kt); 462 } 463 464 if (stats->ls_max < usec) 465 stats->ls_max = usec; 466 467 if (ret) 468 stats->ls_fail++; 469 } 470 471 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 472 { 473 lockres->l_lock_refresh++; 474 } 475 476 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 477 { 478 mw->mw_lock_start = ktime_get(); 479 } 480 #else 481 static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 482 { 483 } 484 static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, 485 int level, struct ocfs2_mask_waiter *mw, int ret) 486 { 487 } 488 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 489 { 490 } 491 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 492 { 493 } 494 #endif 495 496 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, 497 struct ocfs2_lock_res *res, 498 enum ocfs2_lock_type type, 499 struct ocfs2_lock_res_ops *ops, 500 void *priv) 501 { 502 res->l_type = type; 503 res->l_ops = ops; 504 res->l_priv = priv; 505 506 res->l_level = DLM_LOCK_IV; 507 res->l_requested = DLM_LOCK_IV; 508 res->l_blocking = DLM_LOCK_IV; 509 res->l_action = OCFS2_AST_INVALID; 510 res->l_unlock_action = OCFS2_UNLOCK_INVALID; 511 512 res->l_flags = OCFS2_LOCK_INITIALIZED; 513 514 ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug); 515 516 ocfs2_init_lock_stats(res); 517 #ifdef CONFIG_DEBUG_LOCK_ALLOC 518 if (type != OCFS2_LOCK_TYPE_OPEN) 519 lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type], 520 &lockdep_keys[type], 0); 521 else 522 res->l_lockdep_map.key = NULL; 523 #endif 524 } 525 526 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res) 527 { 528 /* This also clears out the lock status block */ 529 memset(res, 0, sizeof(struct ocfs2_lock_res)); 530 spin_lock_init(&res->l_lock); 531 init_waitqueue_head(&res->l_event); 532 INIT_LIST_HEAD(&res->l_blocked_list); 533 
INIT_LIST_HEAD(&res->l_mask_waiters); 534 } 535 536 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, 537 enum ocfs2_lock_type type, 538 unsigned int generation, 539 struct inode *inode) 540 { 541 struct ocfs2_lock_res_ops *ops; 542 543 switch(type) { 544 case OCFS2_LOCK_TYPE_RW: 545 ops = &ocfs2_inode_rw_lops; 546 break; 547 case OCFS2_LOCK_TYPE_META: 548 ops = &ocfs2_inode_inode_lops; 549 break; 550 case OCFS2_LOCK_TYPE_OPEN: 551 ops = &ocfs2_inode_open_lops; 552 break; 553 default: 554 mlog_bug_on_msg(1, "type: %d\n", type); 555 ops = NULL; /* thanks, gcc */ 556 break; 557 }; 558 559 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, 560 generation, res->l_name); 561 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); 562 } 563 564 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres) 565 { 566 struct inode *inode = ocfs2_lock_res_inode(lockres); 567 568 return OCFS2_SB(inode->i_sb); 569 } 570 571 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres) 572 { 573 struct ocfs2_mem_dqinfo *info = lockres->l_priv; 574 575 return OCFS2_SB(info->dqi_gi.dqi_sb); 576 } 577 578 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres) 579 { 580 struct ocfs2_file_private *fp = lockres->l_priv; 581 582 return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb); 583 } 584 585 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) 586 { 587 __be64 inode_blkno_be; 588 589 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], 590 sizeof(__be64)); 591 592 return be64_to_cpu(inode_blkno_be); 593 } 594 595 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres) 596 { 597 struct ocfs2_dentry_lock *dl = lockres->l_priv; 598 599 return OCFS2_SB(dl->dl_inode->i_sb); 600 } 601 602 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, 603 u64 parent, struct inode *inode) 604 { 605 int len; 606 u64 inode_blkno = 
OCFS2_I(inode)->ip_blkno; 607 __be64 inode_blkno_be = cpu_to_be64(inode_blkno); 608 struct ocfs2_lock_res *lockres = &dl->dl_lockres; 609 610 ocfs2_lock_res_init_once(lockres); 611 612 /* 613 * Unfortunately, the standard lock naming scheme won't work 614 * here because we have two 16 byte values to use. Instead, 615 * we'll stuff the inode number as a binary value. We still 616 * want error prints to show something without garbling the 617 * display, so drop a null byte in there before the inode 618 * number. A future version of OCFS2 will likely use all 619 * binary lock names. The stringified names have been a 620 * tremendous aid in debugging, but now that the debugfs 621 * interface exists, we can mangle things there if need be. 622 * 623 * NOTE: We also drop the standard "pad" value (the total lock 624 * name size stays the same though - the last part is all 625 * zeros due to the memset in ocfs2_lock_res_init_once() 626 */ 627 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, 628 "%c%016llx", 629 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), 630 (long long)parent); 631 632 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); 633 634 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, 635 sizeof(__be64)); 636 637 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 638 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, 639 dl); 640 } 641 642 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 643 struct ocfs2_super *osb) 644 { 645 /* Superblock lockres doesn't come from a slab so we call init 646 * once on it manually. 
*/ 647 ocfs2_lock_res_init_once(res); 648 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, 649 0, res->l_name); 650 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 651 &ocfs2_super_lops, osb); 652 } 653 654 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, 655 struct ocfs2_super *osb) 656 { 657 /* Rename lockres doesn't come from a slab so we call init 658 * once on it manually. */ 659 ocfs2_lock_res_init_once(res); 660 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); 661 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 662 &ocfs2_rename_lops, osb); 663 } 664 665 static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res, 666 struct ocfs2_super *osb) 667 { 668 /* nfs_sync lockres doesn't come from a slab so we call init 669 * once on it manually. */ 670 ocfs2_lock_res_init_once(res); 671 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name); 672 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC, 673 &ocfs2_nfs_sync_lops, osb); 674 } 675 676 static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res, 677 struct ocfs2_super *osb) 678 { 679 ocfs2_lock_res_init_once(res); 680 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name); 681 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN, 682 &ocfs2_orphan_scan_lops, osb); 683 } 684 685 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, 686 struct ocfs2_file_private *fp) 687 { 688 struct inode *inode = fp->fp_file->f_mapping->host; 689 struct ocfs2_inode_info *oi = OCFS2_I(inode); 690 691 ocfs2_lock_res_init_once(lockres); 692 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno, 693 inode->i_generation, lockres->l_name); 694 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 695 OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops, 696 fp); 697 lockres->l_flags |= OCFS2_LOCK_NOCACHE; 698 } 699 700 void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res 
*lockres, 701 struct ocfs2_mem_dqinfo *info) 702 { 703 ocfs2_lock_res_init_once(lockres); 704 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type, 705 0, lockres->l_name); 706 ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres, 707 OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops, 708 info); 709 } 710 711 void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres, 712 struct ocfs2_super *osb, u64 ref_blkno, 713 unsigned int generation) 714 { 715 ocfs2_lock_res_init_once(lockres); 716 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno, 717 generation, lockres->l_name); 718 ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT, 719 &ocfs2_refcount_block_lops, osb); 720 } 721 722 void ocfs2_lock_res_free(struct ocfs2_lock_res *res) 723 { 724 if (!(res->l_flags & OCFS2_LOCK_INITIALIZED)) 725 return; 726 727 ocfs2_remove_lockres_tracking(res); 728 729 mlog_bug_on_msg(!list_empty(&res->l_blocked_list), 730 "Lockres %s is on the blocked list\n", 731 res->l_name); 732 mlog_bug_on_msg(!list_empty(&res->l_mask_waiters), 733 "Lockres %s has mask waiters pending\n", 734 res->l_name); 735 mlog_bug_on_msg(spin_is_locked(&res->l_lock), 736 "Lockres %s is locked\n", 737 res->l_name); 738 mlog_bug_on_msg(res->l_ro_holders, 739 "Lockres %s has %u ro holders\n", 740 res->l_name, res->l_ro_holders); 741 mlog_bug_on_msg(res->l_ex_holders, 742 "Lockres %s has %u ex holders\n", 743 res->l_name, res->l_ex_holders); 744 745 /* Need to clear out the lock status block for the dlm */ 746 memset(&res->l_lksb, 0, sizeof(res->l_lksb)); 747 748 res->l_flags = 0UL; 749 } 750 751 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, 752 int level) 753 { 754 BUG_ON(!lockres); 755 756 switch(level) { 757 case DLM_LOCK_EX: 758 lockres->l_ex_holders++; 759 break; 760 case DLM_LOCK_PR: 761 lockres->l_ro_holders++; 762 break; 763 default: 764 BUG(); 765 } 766 } 767 768 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, 769 
int level) 770 { 771 BUG_ON(!lockres); 772 773 switch(level) { 774 case DLM_LOCK_EX: 775 BUG_ON(!lockres->l_ex_holders); 776 lockres->l_ex_holders--; 777 break; 778 case DLM_LOCK_PR: 779 BUG_ON(!lockres->l_ro_holders); 780 lockres->l_ro_holders--; 781 break; 782 default: 783 BUG(); 784 } 785 } 786 787 /* WARNING: This function lives in a world where the only three lock 788 * levels are EX, PR, and NL. It *will* have to be adjusted when more 789 * lock types are added. */ 790 static inline int ocfs2_highest_compat_lock_level(int level) 791 { 792 int new_level = DLM_LOCK_EX; 793 794 if (level == DLM_LOCK_EX) 795 new_level = DLM_LOCK_NL; 796 else if (level == DLM_LOCK_PR) 797 new_level = DLM_LOCK_PR; 798 return new_level; 799 } 800 801 static void lockres_set_flags(struct ocfs2_lock_res *lockres, 802 unsigned long newflags) 803 { 804 struct ocfs2_mask_waiter *mw, *tmp; 805 806 assert_spin_locked(&lockres->l_lock); 807 808 lockres->l_flags = newflags; 809 810 list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) { 811 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 812 continue; 813 814 list_del_init(&mw->mw_item); 815 mw->mw_status = 0; 816 complete(&mw->mw_complete); 817 } 818 } 819 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or) 820 { 821 lockres_set_flags(lockres, lockres->l_flags | or); 822 } 823 static void lockres_clear_flags(struct ocfs2_lock_res *lockres, 824 unsigned long clear) 825 { 826 lockres_set_flags(lockres, lockres->l_flags & ~clear); 827 } 828 829 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres) 830 { 831 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 832 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 833 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 834 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 835 836 lockres->l_level = lockres->l_requested; 837 if (lockres->l_level <= 838 ocfs2_highest_compat_lock_level(lockres->l_blocking)) { 839 
lockres->l_blocking = DLM_LOCK_NL; 840 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 841 } 842 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 843 } 844 845 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres) 846 { 847 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 848 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 849 850 /* Convert from RO to EX doesn't really need anything as our 851 * information is already up to data. Convert from NL to 852 * *anything* however should mark ourselves as needing an 853 * update */ 854 if (lockres->l_level == DLM_LOCK_NL && 855 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 856 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 857 858 lockres->l_level = lockres->l_requested; 859 860 /* 861 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing 862 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from 863 * downconverting the lock before the upconvert has fully completed. 864 * Do not prevent the dc thread from downconverting if NONBLOCK lock 865 * had already returned. 
866 */ 867 if (!(lockres->l_flags & OCFS2_LOCK_NONBLOCK_FINISHED)) 868 lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 869 else 870 lockres_clear_flags(lockres, OCFS2_LOCK_NONBLOCK_FINISHED); 871 872 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 873 } 874 875 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres) 876 { 877 BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY))); 878 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 879 880 if (lockres->l_requested > DLM_LOCK_NL && 881 !(lockres->l_flags & OCFS2_LOCK_LOCAL) && 882 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 883 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 884 885 lockres->l_level = lockres->l_requested; 886 lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED); 887 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 888 } 889 890 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, 891 int level) 892 { 893 int needs_downconvert = 0; 894 895 assert_spin_locked(&lockres->l_lock); 896 897 if (level > lockres->l_blocking) { 898 /* only schedule a downconvert if we haven't already scheduled 899 * one that goes low enough to satisfy the level we're 900 * blocking. this also catches the case where we get 901 * duplicate BASTs */ 902 if (ocfs2_highest_compat_lock_level(level) < 903 ocfs2_highest_compat_lock_level(lockres->l_blocking)) 904 needs_downconvert = 1; 905 906 lockres->l_blocking = level; 907 } 908 909 mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n", 910 lockres->l_name, level, lockres->l_level, lockres->l_blocking, 911 needs_downconvert); 912 913 if (needs_downconvert) 914 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 915 mlog(0, "needs_downconvert = %d\n", needs_downconvert); 916 return needs_downconvert; 917 } 918 919 /* 920 * OCFS2_LOCK_PENDING and l_pending_gen. 921 * 922 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting 923 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). 
See ocfs2_unblock_lock()
 * for more details on the race.
 *
 * OCFS2_LOCK_PENDING closes the race quite nicely.  However, it introduces
 * a race on itself.  In o2dlm, we can get the ast before ocfs2_dlm_lock()
 * returns.  The ast clears OCFS2_LOCK_BUSY, and must therefore clear
 * OCFS2_LOCK_PENDING at the same time.  When ocfs2_dlm_lock() returns,
 * the caller is going to try to clear PENDING again.  If nothing else is
 * happening, __lockres_clear_pending() sees PENDING is unset and does
 * nothing.
 *
 * But what if another path (e.g. the downconvert thread) has just started
 * a new locking action?  The other path has re-set PENDING.  Our path
 * cannot clear PENDING, because that will re-open the original race
 * window.
 *
 * [Example]
 *
 * ocfs2_meta_lock()
 *  ocfs2_cluster_lock()
 *   set BUSY
 *   set PENDING
 *   drop l_lock
 *   ocfs2_dlm_lock()
 *    ocfs2_locking_ast()             ocfs2_downconvert_thread()
 *     clear PENDING                   ocfs2_unblock_lock()
 *                                      take_l_lock
 *                                      !BUSY
 *                                      ocfs2_prepare_downconvert()
 *                                       set BUSY
 *                                       set PENDING
 *                                      drop l_lock
 *   take l_lock
 *   clear PENDING
 *   drop l_lock
 *                      <window>
 *                                      ocfs2_dlm_lock()
 *
 * So as you can see, we now have a window where l_lock is not held,
 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
 *
 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
 * set by ocfs2_prepare_downconvert().  That wasn't nice.
 *
 * To solve this we introduce l_pending_gen.  A call to
 * lockres_clear_pending() will only do so when it is passed a generation
 * number that matches the lockres.  lockres_set_pending() will return the
 * current generation number.  When ocfs2_cluster_lock() goes to clear
 * PENDING, it passes the generation it got from set_pending().  In our
 * example above, the generation numbers will *not* match.
Thus, 973 * ocfs2_cluster_lock() will not clear the PENDING set by 974 * ocfs2_prepare_downconvert(). 975 */ 976 977 /* Unlocked version for ocfs2_locking_ast() */ 978 static void __lockres_clear_pending(struct ocfs2_lock_res *lockres, 979 unsigned int generation, 980 struct ocfs2_super *osb) 981 { 982 assert_spin_locked(&lockres->l_lock); 983 984 /* 985 * The ast and locking functions can race us here. The winner 986 * will clear pending, the loser will not. 987 */ 988 if (!(lockres->l_flags & OCFS2_LOCK_PENDING) || 989 (lockres->l_pending_gen != generation)) 990 return; 991 992 lockres_clear_flags(lockres, OCFS2_LOCK_PENDING); 993 lockres->l_pending_gen++; 994 995 /* 996 * The downconvert thread may have skipped us because we 997 * were PENDING. Wake it up. 998 */ 999 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 1000 ocfs2_wake_downconvert_thread(osb); 1001 } 1002 1003 /* Locked version for callers of ocfs2_dlm_lock() */ 1004 static void lockres_clear_pending(struct ocfs2_lock_res *lockres, 1005 unsigned int generation, 1006 struct ocfs2_super *osb) 1007 { 1008 unsigned long flags; 1009 1010 spin_lock_irqsave(&lockres->l_lock, flags); 1011 __lockres_clear_pending(lockres, generation, osb); 1012 spin_unlock_irqrestore(&lockres->l_lock, flags); 1013 } 1014 1015 static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres) 1016 { 1017 assert_spin_locked(&lockres->l_lock); 1018 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 1019 1020 lockres_or_flags(lockres, OCFS2_LOCK_PENDING); 1021 1022 return lockres->l_pending_gen; 1023 } 1024 1025 static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level) 1026 { 1027 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1028 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1029 int needs_downconvert; 1030 unsigned long flags; 1031 1032 BUG_ON(level <= DLM_LOCK_NL); 1033 1034 mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, " 1035 "type %s\n", lockres->l_name, level, 
lockres->l_level, 1036 ocfs2_lock_type_string(lockres->l_type)); 1037 1038 /* 1039 * We can skip the bast for locks which don't enable caching - 1040 * they'll be dropped at the earliest possible time anyway. 1041 */ 1042 if (lockres->l_flags & OCFS2_LOCK_NOCACHE) 1043 return; 1044 1045 spin_lock_irqsave(&lockres->l_lock, flags); 1046 needs_downconvert = ocfs2_generic_handle_bast(lockres, level); 1047 if (needs_downconvert) 1048 ocfs2_schedule_blocked_lock(osb, lockres); 1049 spin_unlock_irqrestore(&lockres->l_lock, flags); 1050 1051 wake_up(&lockres->l_event); 1052 1053 ocfs2_wake_downconvert_thread(osb); 1054 } 1055 1056 static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb) 1057 { 1058 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1059 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1060 unsigned long flags; 1061 int status; 1062 1063 spin_lock_irqsave(&lockres->l_lock, flags); 1064 1065 status = ocfs2_dlm_lock_status(&lockres->l_lksb); 1066 1067 if (status == -EAGAIN) { 1068 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1069 goto out; 1070 } 1071 1072 if (status) { 1073 mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n", 1074 lockres->l_name, status); 1075 spin_unlock_irqrestore(&lockres->l_lock, flags); 1076 return; 1077 } 1078 1079 mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, " 1080 "level %d => %d\n", lockres->l_name, lockres->l_action, 1081 lockres->l_unlock_action, lockres->l_level, lockres->l_requested); 1082 1083 switch(lockres->l_action) { 1084 case OCFS2_AST_ATTACH: 1085 ocfs2_generic_handle_attach_action(lockres); 1086 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL); 1087 break; 1088 case OCFS2_AST_CONVERT: 1089 ocfs2_generic_handle_convert_action(lockres); 1090 break; 1091 case OCFS2_AST_DOWNCONVERT: 1092 ocfs2_generic_handle_downconvert_action(lockres); 1093 break; 1094 default: 1095 mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, " 1096 "flags 0x%lx, unlock: %u\n", 1097 
lockres->l_name, lockres->l_action, lockres->l_flags, 1098 lockres->l_unlock_action); 1099 BUG(); 1100 } 1101 out: 1102 /* set it to something invalid so if we get called again we 1103 * can catch it. */ 1104 lockres->l_action = OCFS2_AST_INVALID; 1105 1106 /* Did we try to cancel this lock? Clear that state */ 1107 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) 1108 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1109 1110 /* 1111 * We may have beaten the locking functions here. We certainly 1112 * know that dlm_lock() has been called :-) 1113 * Because we can't have two lock calls in flight at once, we 1114 * can use lockres->l_pending_gen. 1115 */ 1116 __lockres_clear_pending(lockres, lockres->l_pending_gen, osb); 1117 1118 wake_up(&lockres->l_event); 1119 spin_unlock_irqrestore(&lockres->l_lock, flags); 1120 } 1121 1122 static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error) 1123 { 1124 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1125 unsigned long flags; 1126 1127 mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n", 1128 lockres->l_name, lockres->l_unlock_action); 1129 1130 spin_lock_irqsave(&lockres->l_lock, flags); 1131 if (error) { 1132 mlog(ML_ERROR, "Dlm passes error %d for lock %s, " 1133 "unlock_action %d\n", error, lockres->l_name, 1134 lockres->l_unlock_action); 1135 spin_unlock_irqrestore(&lockres->l_lock, flags); 1136 return; 1137 } 1138 1139 switch(lockres->l_unlock_action) { 1140 case OCFS2_UNLOCK_CANCEL_CONVERT: 1141 mlog(0, "Cancel convert success for %s\n", lockres->l_name); 1142 lockres->l_action = OCFS2_AST_INVALID; 1143 /* Downconvert thread may have requeued this lock, we 1144 * need to wake it. 
*/ 1145 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 1146 ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres)); 1147 break; 1148 case OCFS2_UNLOCK_DROP_LOCK: 1149 lockres->l_level = DLM_LOCK_IV; 1150 break; 1151 default: 1152 BUG(); 1153 } 1154 1155 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1156 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1157 wake_up(&lockres->l_event); 1158 spin_unlock_irqrestore(&lockres->l_lock, flags); 1159 } 1160 1161 /* 1162 * This is the filesystem locking protocol. It provides the lock handling 1163 * hooks for the underlying DLM. It has a maximum version number. 1164 * The version number allows interoperability with systems running at 1165 * the same major number and an equal or smaller minor number. 1166 * 1167 * Whenever the filesystem does new things with locks (adds or removes a 1168 * lock, orders them differently, does different things underneath a lock), 1169 * the version must be changed. The protocol is negotiated when joining 1170 * the dlm domain. A node may join the domain if its major version is 1171 * identical to all other nodes and its minor version is greater than 1172 * or equal to all other nodes. When its minor version is greater than 1173 * the other nodes, it will run at the minor version specified by the 1174 * other nodes. 1175 * 1176 * If a locking change is made that will not be compatible with older 1177 * versions, the major number must be increased and the minor version set 1178 * to zero. If a change merely adds a behavior that can be disabled when 1179 * speaking to older versions, the minor version must be increased. If a 1180 * change adds a fully backwards compatible change (eg, LVB changes that 1181 * are just ignored by older versions), the version does not need to be 1182 * updated. 
1183 */ 1184 static struct ocfs2_locking_protocol lproto = { 1185 .lp_max_version = { 1186 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, 1187 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, 1188 }, 1189 .lp_lock_ast = ocfs2_locking_ast, 1190 .lp_blocking_ast = ocfs2_blocking_ast, 1191 .lp_unlock_ast = ocfs2_unlock_ast, 1192 }; 1193 1194 void ocfs2_set_locking_protocol(void) 1195 { 1196 ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version); 1197 } 1198 1199 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 1200 int convert) 1201 { 1202 unsigned long flags; 1203 1204 spin_lock_irqsave(&lockres->l_lock, flags); 1205 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1206 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 1207 if (convert) 1208 lockres->l_action = OCFS2_AST_INVALID; 1209 else 1210 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1211 spin_unlock_irqrestore(&lockres->l_lock, flags); 1212 1213 wake_up(&lockres->l_event); 1214 } 1215 1216 /* Note: If we detect another process working on the lock (i.e., 1217 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller 1218 * to do the right thing in that case. 
1219 */ 1220 static int ocfs2_lock_create(struct ocfs2_super *osb, 1221 struct ocfs2_lock_res *lockres, 1222 int level, 1223 u32 dlm_flags) 1224 { 1225 int ret = 0; 1226 unsigned long flags; 1227 unsigned int gen; 1228 1229 mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level, 1230 dlm_flags); 1231 1232 spin_lock_irqsave(&lockres->l_lock, flags); 1233 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 1234 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 1235 spin_unlock_irqrestore(&lockres->l_lock, flags); 1236 goto bail; 1237 } 1238 1239 lockres->l_action = OCFS2_AST_ATTACH; 1240 lockres->l_requested = level; 1241 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1242 gen = lockres_set_pending(lockres); 1243 spin_unlock_irqrestore(&lockres->l_lock, flags); 1244 1245 ret = ocfs2_dlm_lock(osb->cconn, 1246 level, 1247 &lockres->l_lksb, 1248 dlm_flags, 1249 lockres->l_name, 1250 OCFS2_LOCK_ID_MAX_LEN - 1); 1251 lockres_clear_pending(lockres, gen, osb); 1252 if (ret) { 1253 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1254 ocfs2_recover_from_dlm_error(lockres, 1); 1255 } 1256 1257 mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name); 1258 1259 bail: 1260 return ret; 1261 } 1262 1263 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, 1264 int flag) 1265 { 1266 unsigned long flags; 1267 int ret; 1268 1269 spin_lock_irqsave(&lockres->l_lock, flags); 1270 ret = lockres->l_flags & flag; 1271 spin_unlock_irqrestore(&lockres->l_lock, flags); 1272 1273 return ret; 1274 } 1275 1276 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 1277 1278 { 1279 wait_event(lockres->l_event, 1280 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 1281 } 1282 1283 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 1284 1285 { 1286 wait_event(lockres->l_event, 1287 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 1288 } 1289 1290 /* predict what lock level we'll be dropping down to on behalf 
1291 * of another node, and return true if the currently wanted 1292 * level will be compatible with it. */ 1293 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 1294 int wanted) 1295 { 1296 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 1297 1298 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking); 1299 } 1300 1301 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw) 1302 { 1303 INIT_LIST_HEAD(&mw->mw_item); 1304 init_completion(&mw->mw_complete); 1305 ocfs2_init_start_time(mw); 1306 } 1307 1308 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) 1309 { 1310 wait_for_completion(&mw->mw_complete); 1311 /* Re-arm the completion in case we want to wait on it again */ 1312 reinit_completion(&mw->mw_complete); 1313 return mw->mw_status; 1314 } 1315 1316 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres, 1317 struct ocfs2_mask_waiter *mw, 1318 unsigned long mask, 1319 unsigned long goal) 1320 { 1321 BUG_ON(!list_empty(&mw->mw_item)); 1322 1323 assert_spin_locked(&lockres->l_lock); 1324 1325 list_add_tail(&mw->mw_item, &lockres->l_mask_waiters); 1326 mw->mw_mask = mask; 1327 mw->mw_goal = goal; 1328 } 1329 1330 /* returns 0 if the mw that was removed was already satisfied, -EBUSY 1331 * if the mask still hadn't reached its goal */ 1332 static int __lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1333 struct ocfs2_mask_waiter *mw) 1334 { 1335 int ret = 0; 1336 1337 assert_spin_locked(&lockres->l_lock); 1338 if (!list_empty(&mw->mw_item)) { 1339 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 1340 ret = -EBUSY; 1341 1342 list_del_init(&mw->mw_item); 1343 init_completion(&mw->mw_complete); 1344 } 1345 1346 return ret; 1347 } 1348 1349 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1350 struct ocfs2_mask_waiter *mw) 1351 { 1352 unsigned long flags; 1353 int ret = 0; 1354 1355 spin_lock_irqsave(&lockres->l_lock, flags); 1356 ret = 
__lockres_remove_mask_waiter(lockres, mw); 1357 spin_unlock_irqrestore(&lockres->l_lock, flags); 1358 1359 return ret; 1360 1361 } 1362 1363 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw, 1364 struct ocfs2_lock_res *lockres) 1365 { 1366 int ret; 1367 1368 ret = wait_for_completion_interruptible(&mw->mw_complete); 1369 if (ret) 1370 lockres_remove_mask_waiter(lockres, mw); 1371 else 1372 ret = mw->mw_status; 1373 /* Re-arm the completion in case we want to wait on it again */ 1374 reinit_completion(&mw->mw_complete); 1375 return ret; 1376 } 1377 1378 static int __ocfs2_cluster_lock(struct ocfs2_super *osb, 1379 struct ocfs2_lock_res *lockres, 1380 int level, 1381 u32 lkm_flags, 1382 int arg_flags, 1383 int l_subclass, 1384 unsigned long caller_ip) 1385 { 1386 struct ocfs2_mask_waiter mw; 1387 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); 1388 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 1389 unsigned long flags; 1390 unsigned int gen; 1391 int noqueue_attempted = 0; 1392 int dlm_locked = 0; 1393 1394 ocfs2_init_mask_waiter(&mw); 1395 1396 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 1397 lkm_flags |= DLM_LKF_VALBLK; 1398 1399 again: 1400 wait = 0; 1401 1402 spin_lock_irqsave(&lockres->l_lock, flags); 1403 1404 if (catch_signals && signal_pending(current)) { 1405 ret = -ERESTARTSYS; 1406 goto unlock; 1407 } 1408 1409 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, 1410 "Cluster lock called on freeing lockres %s! flags " 1411 "0x%lx\n", lockres->l_name, lockres->l_flags); 1412 1413 /* We only compare against the currently granted level 1414 * here. If the lock is blocked waiting on a downconvert, 1415 * we'll get caught below. */ 1416 if (lockres->l_flags & OCFS2_LOCK_BUSY && 1417 level > lockres->l_level) { 1418 /* is someone sitting in dlm_lock? If so, wait on 1419 * them. 
*/ 1420 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1421 wait = 1; 1422 goto unlock; 1423 } 1424 1425 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) { 1426 /* 1427 * We've upconverted. If the lock now has a level we can 1428 * work with, we take it. If, however, the lock is not at the 1429 * required level, we go thru the full cycle. One way this could 1430 * happen is if a process requesting an upconvert to PR is 1431 * closely followed by another requesting upconvert to an EX. 1432 * If the process requesting EX lands here, we want it to 1433 * continue attempting to upconvert and let the process 1434 * requesting PR take the lock. 1435 * If multiple processes request upconvert to PR, the first one 1436 * here will take the lock. The others will have to go thru the 1437 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending 1438 * downconvert request. 1439 */ 1440 if (level <= lockres->l_level) 1441 goto update_holders; 1442 } 1443 1444 if (lockres->l_flags & OCFS2_LOCK_BLOCKED && 1445 !ocfs2_may_continue_on_blocked_lock(lockres, level)) { 1446 /* is the lock is currently blocked on behalf of 1447 * another node */ 1448 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0); 1449 wait = 1; 1450 goto unlock; 1451 } 1452 1453 if (level > lockres->l_level) { 1454 if (noqueue_attempted > 0) { 1455 ret = -EAGAIN; 1456 goto unlock; 1457 } 1458 if (lkm_flags & DLM_LKF_NOQUEUE) 1459 noqueue_attempted = 1; 1460 1461 if (lockres->l_action != OCFS2_AST_INVALID) 1462 mlog(ML_ERROR, "lockres %s has action %u pending\n", 1463 lockres->l_name, lockres->l_action); 1464 1465 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1466 lockres->l_action = OCFS2_AST_ATTACH; 1467 lkm_flags &= ~DLM_LKF_CONVERT; 1468 } else { 1469 lockres->l_action = OCFS2_AST_CONVERT; 1470 lkm_flags |= DLM_LKF_CONVERT; 1471 } 1472 1473 lockres->l_requested = level; 1474 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1475 gen = lockres_set_pending(lockres); 1476 
spin_unlock_irqrestore(&lockres->l_lock, flags); 1477 1478 BUG_ON(level == DLM_LOCK_IV); 1479 BUG_ON(level == DLM_LOCK_NL); 1480 1481 mlog(ML_BASTS, "lockres %s, convert from %d to %d\n", 1482 lockres->l_name, lockres->l_level, level); 1483 1484 /* call dlm_lock to upgrade lock now */ 1485 ret = ocfs2_dlm_lock(osb->cconn, 1486 level, 1487 &lockres->l_lksb, 1488 lkm_flags, 1489 lockres->l_name, 1490 OCFS2_LOCK_ID_MAX_LEN - 1); 1491 lockres_clear_pending(lockres, gen, osb); 1492 if (ret) { 1493 if (!(lkm_flags & DLM_LKF_NOQUEUE) || 1494 (ret != -EAGAIN)) { 1495 ocfs2_log_dlm_error("ocfs2_dlm_lock", 1496 ret, lockres); 1497 } 1498 ocfs2_recover_from_dlm_error(lockres, 1); 1499 goto out; 1500 } 1501 dlm_locked = 1; 1502 1503 mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n", 1504 lockres->l_name); 1505 1506 /* At this point we've gone inside the dlm and need to 1507 * complete our work regardless. */ 1508 catch_signals = 0; 1509 1510 /* wait for busy to clear and carry on */ 1511 goto again; 1512 } 1513 1514 update_holders: 1515 /* Ok, if we get here then we're good to go. */ 1516 ocfs2_inc_holders(lockres, level); 1517 1518 ret = 0; 1519 unlock: 1520 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 1521 1522 spin_unlock_irqrestore(&lockres->l_lock, flags); 1523 out: 1524 /* 1525 * This is helping work around a lock inversion between the page lock 1526 * and dlm locks. One path holds the page lock while calling aops 1527 * which block acquiring dlm locks. The voting thread holds dlm 1528 * locks while acquiring page locks while down converting data locks. 1529 * This block is helping an aop path notice the inversion and back 1530 * off to unlock its page lock before trying the dlm lock again. 
1531 */ 1532 if (wait && arg_flags & OCFS2_LOCK_NONBLOCK && 1533 mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) { 1534 wait = 0; 1535 spin_lock_irqsave(&lockres->l_lock, flags); 1536 if (__lockres_remove_mask_waiter(lockres, &mw)) { 1537 if (dlm_locked) 1538 lockres_or_flags(lockres, 1539 OCFS2_LOCK_NONBLOCK_FINISHED); 1540 spin_unlock_irqrestore(&lockres->l_lock, flags); 1541 ret = -EAGAIN; 1542 } else { 1543 spin_unlock_irqrestore(&lockres->l_lock, flags); 1544 goto again; 1545 } 1546 } 1547 if (wait) { 1548 ret = ocfs2_wait_for_mask(&mw); 1549 if (ret == 0) 1550 goto again; 1551 mlog_errno(ret); 1552 } 1553 ocfs2_update_lock_stats(lockres, level, &mw, ret); 1554 1555 #ifdef CONFIG_DEBUG_LOCK_ALLOC 1556 if (!ret && lockres->l_lockdep_map.key != NULL) { 1557 if (level == DLM_LOCK_PR) 1558 rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass, 1559 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE), 1560 caller_ip); 1561 else 1562 rwsem_acquire(&lockres->l_lockdep_map, l_subclass, 1563 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE), 1564 caller_ip); 1565 } 1566 #endif 1567 return ret; 1568 } 1569 1570 static inline int ocfs2_cluster_lock(struct ocfs2_super *osb, 1571 struct ocfs2_lock_res *lockres, 1572 int level, 1573 u32 lkm_flags, 1574 int arg_flags) 1575 { 1576 return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags, 1577 0, _RET_IP_); 1578 } 1579 1580 1581 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb, 1582 struct ocfs2_lock_res *lockres, 1583 int level, 1584 unsigned long caller_ip) 1585 { 1586 unsigned long flags; 1587 1588 spin_lock_irqsave(&lockres->l_lock, flags); 1589 ocfs2_dec_holders(lockres, level); 1590 ocfs2_downconvert_on_unlock(osb, lockres); 1591 spin_unlock_irqrestore(&lockres->l_lock, flags); 1592 #ifdef CONFIG_DEBUG_LOCK_ALLOC 1593 if (lockres->l_lockdep_map.key != NULL) 1594 rwsem_release(&lockres->l_lockdep_map, 1, caller_ip); 1595 #endif 1596 } 1597 1598 static int ocfs2_create_new_lock(struct ocfs2_super *osb, 1599 struct 
ocfs2_lock_res *lockres, 1600 int ex, 1601 int local) 1602 { 1603 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1604 unsigned long flags; 1605 u32 lkm_flags = local ? DLM_LKF_LOCAL : 0; 1606 1607 spin_lock_irqsave(&lockres->l_lock, flags); 1608 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 1609 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL); 1610 spin_unlock_irqrestore(&lockres->l_lock, flags); 1611 1612 return ocfs2_lock_create(osb, lockres, level, lkm_flags); 1613 } 1614 1615 /* Grants us an EX lock on the data and metadata resources, skipping 1616 * the normal cluster directory lookup. Use this ONLY on newly created 1617 * inodes which other nodes can't possibly see, and which haven't been 1618 * hashed in the inode hash yet. This can give us a good performance 1619 * increase as it'll skip the network broadcast normally associated 1620 * with creating a new lock resource. */ 1621 int ocfs2_create_new_inode_locks(struct inode *inode) 1622 { 1623 int ret; 1624 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1625 1626 BUG_ON(!inode); 1627 BUG_ON(!ocfs2_inode_is_new(inode)); 1628 1629 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); 1630 1631 /* NOTE: That we don't increment any of the holder counts, nor 1632 * do we add anything to a journal handle. Since this is 1633 * supposed to be a new inode which the cluster doesn't know 1634 * about yet, there is no need to. As far as the LVB handling 1635 * is concerned, this is basically like acquiring an EX lock 1636 * on a resource which has an invalid one -- we'll set it 1637 * valid when we release the EX. */ 1638 1639 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1); 1640 if (ret) { 1641 mlog_errno(ret); 1642 goto bail; 1643 } 1644 1645 /* 1646 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they 1647 * don't use a generation in their lock names. 
1648 */ 1649 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); 1650 if (ret) { 1651 mlog_errno(ret); 1652 goto bail; 1653 } 1654 1655 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0); 1656 if (ret) { 1657 mlog_errno(ret); 1658 goto bail; 1659 } 1660 1661 bail: 1662 return ret; 1663 } 1664 1665 int ocfs2_rw_lock(struct inode *inode, int write) 1666 { 1667 int status, level; 1668 struct ocfs2_lock_res *lockres; 1669 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1670 1671 BUG_ON(!inode); 1672 1673 mlog(0, "inode %llu take %s RW lock\n", 1674 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1675 write ? "EXMODE" : "PRMODE"); 1676 1677 if (ocfs2_mount_local(osb)) 1678 return 0; 1679 1680 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1681 1682 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1683 1684 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0, 1685 0); 1686 if (status < 0) 1687 mlog_errno(status); 1688 1689 return status; 1690 } 1691 1692 void ocfs2_rw_unlock(struct inode *inode, int write) 1693 { 1694 int level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1695 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1696 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1697 1698 mlog(0, "inode %llu drop %s RW lock\n", 1699 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1700 write ? "EXMODE" : "PRMODE"); 1701 1702 if (!ocfs2_mount_local(osb)) 1703 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 1704 } 1705 1706 /* 1707 * ocfs2_open_lock always get PR mode lock. 
1708 */ 1709 int ocfs2_open_lock(struct inode *inode) 1710 { 1711 int status = 0; 1712 struct ocfs2_lock_res *lockres; 1713 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1714 1715 BUG_ON(!inode); 1716 1717 mlog(0, "inode %llu take PRMODE open lock\n", 1718 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1719 1720 if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb)) 1721 goto out; 1722 1723 lockres = &OCFS2_I(inode)->ip_open_lockres; 1724 1725 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1726 DLM_LOCK_PR, 0, 0); 1727 if (status < 0) 1728 mlog_errno(status); 1729 1730 out: 1731 return status; 1732 } 1733 1734 int ocfs2_try_open_lock(struct inode *inode, int write) 1735 { 1736 int status = 0, level; 1737 struct ocfs2_lock_res *lockres; 1738 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1739 1740 BUG_ON(!inode); 1741 1742 mlog(0, "inode %llu try to take %s open lock\n", 1743 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1744 write ? "EXMODE" : "PRMODE"); 1745 1746 if (ocfs2_is_hard_readonly(osb)) { 1747 if (write) 1748 status = -EROFS; 1749 goto out; 1750 } 1751 1752 if (ocfs2_mount_local(osb)) 1753 goto out; 1754 1755 lockres = &OCFS2_I(inode)->ip_open_lockres; 1756 1757 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1758 1759 /* 1760 * The file system may already holding a PRMODE/EXMODE open lock. 1761 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on 1762 * other nodes and the -EAGAIN will indicate to the caller that 1763 * this inode is still in use. 1764 */ 1765 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1766 level, DLM_LKF_NOQUEUE, 0); 1767 1768 out: 1769 return status; 1770 } 1771 1772 /* 1773 * ocfs2_open_unlock unlock PR and EX mode open locks. 
1774 */ 1775 void ocfs2_open_unlock(struct inode *inode) 1776 { 1777 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres; 1778 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1779 1780 mlog(0, "inode %llu drop open lock\n", 1781 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1782 1783 if (ocfs2_mount_local(osb)) 1784 goto out; 1785 1786 if(lockres->l_ro_holders) 1787 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1788 DLM_LOCK_PR); 1789 if(lockres->l_ex_holders) 1790 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1791 DLM_LOCK_EX); 1792 1793 out: 1794 return; 1795 } 1796 1797 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres, 1798 int level) 1799 { 1800 int ret; 1801 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1802 unsigned long flags; 1803 struct ocfs2_mask_waiter mw; 1804 1805 ocfs2_init_mask_waiter(&mw); 1806 1807 retry_cancel: 1808 spin_lock_irqsave(&lockres->l_lock, flags); 1809 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 1810 ret = ocfs2_prepare_cancel_convert(osb, lockres); 1811 if (ret) { 1812 spin_unlock_irqrestore(&lockres->l_lock, flags); 1813 ret = ocfs2_cancel_convert(osb, lockres); 1814 if (ret < 0) { 1815 mlog_errno(ret); 1816 goto out; 1817 } 1818 goto retry_cancel; 1819 } 1820 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1821 spin_unlock_irqrestore(&lockres->l_lock, flags); 1822 1823 ocfs2_wait_for_mask(&mw); 1824 goto retry_cancel; 1825 } 1826 1827 ret = -ERESTARTSYS; 1828 /* 1829 * We may still have gotten the lock, in which case there's no 1830 * point to restarting the syscall. 1831 */ 1832 if (lockres->l_level == level) 1833 ret = 0; 1834 1835 mlog(0, "Cancel returning %d. 
flags: 0x%lx, level: %d, act: %d\n", ret, 1836 lockres->l_flags, lockres->l_level, lockres->l_action); 1837 1838 spin_unlock_irqrestore(&lockres->l_lock, flags); 1839 1840 out: 1841 return ret; 1842 } 1843 1844 /* 1845 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of 1846 * flock() calls. The locking approach this requires is sufficiently 1847 * different from all other cluster lock types that we implement a 1848 * separate path to the "low-level" dlm calls. In particular: 1849 * 1850 * - No optimization of lock levels is done - we take at exactly 1851 * what's been requested. 1852 * 1853 * - No lock caching is employed. We immediately downconvert to 1854 * no-lock at unlock time. This also means flock locks never go on 1855 * the blocking list). 1856 * 1857 * - Since userspace can trivially deadlock itself with flock, we make 1858 * sure to allow cancellation of a misbehaving applications flock() 1859 * request. 1860 * 1861 * - Access to any flock lockres doesn't require concurrency, so we 1862 * can simplify the code by requiring the caller to guarantee 1863 * serialization of dlmglue flock calls. 1864 */ 1865 int ocfs2_file_lock(struct file *file, int ex, int trylock) 1866 { 1867 int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1868 unsigned int lkm_flags = trylock ? 
DLM_LKF_NOQUEUE : 0; 1869 unsigned long flags; 1870 struct ocfs2_file_private *fp = file->private_data; 1871 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1872 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1873 struct ocfs2_mask_waiter mw; 1874 1875 ocfs2_init_mask_waiter(&mw); 1876 1877 if ((lockres->l_flags & OCFS2_LOCK_BUSY) || 1878 (lockres->l_level > DLM_LOCK_NL)) { 1879 mlog(ML_ERROR, 1880 "File lock \"%s\" has busy or locked state: flags: 0x%lx, " 1881 "level: %u\n", lockres->l_name, lockres->l_flags, 1882 lockres->l_level); 1883 return -EINVAL; 1884 } 1885 1886 spin_lock_irqsave(&lockres->l_lock, flags); 1887 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1888 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1889 spin_unlock_irqrestore(&lockres->l_lock, flags); 1890 1891 /* 1892 * Get the lock at NLMODE to start - that way we 1893 * can cancel the upconvert request if need be. 1894 */ 1895 ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0); 1896 if (ret < 0) { 1897 mlog_errno(ret); 1898 goto out; 1899 } 1900 1901 ret = ocfs2_wait_for_mask(&mw); 1902 if (ret) { 1903 mlog_errno(ret); 1904 goto out; 1905 } 1906 spin_lock_irqsave(&lockres->l_lock, flags); 1907 } 1908 1909 lockres->l_action = OCFS2_AST_CONVERT; 1910 lkm_flags |= DLM_LKF_CONVERT; 1911 lockres->l_requested = level; 1912 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1913 1914 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1915 spin_unlock_irqrestore(&lockres->l_lock, flags); 1916 1917 ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags, 1918 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1); 1919 if (ret) { 1920 if (!trylock || (ret != -EAGAIN)) { 1921 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1922 ret = -EINVAL; 1923 } 1924 1925 ocfs2_recover_from_dlm_error(lockres, 1); 1926 lockres_remove_mask_waiter(lockres, &mw); 1927 goto out; 1928 } 1929 1930 ret = ocfs2_wait_for_mask_interruptible(&mw, lockres); 1931 if (ret == -ERESTARTSYS) 
{ 1932 /* 1933 * Userspace can cause deadlock itself with 1934 * flock(). Current behavior locally is to allow the 1935 * deadlock, but abort the system call if a signal is 1936 * received. We follow this example, otherwise a 1937 * poorly written program could sit in kernel until 1938 * reboot. 1939 * 1940 * Handling this is a bit more complicated for Ocfs2 1941 * though. We can't exit this function with an 1942 * outstanding lock request, so a cancel convert is 1943 * required. We intentionally overwrite 'ret' - if the 1944 * cancel fails and the lock was granted, it's easier 1945 * to just bubble success back up to the user. 1946 */ 1947 ret = ocfs2_flock_handle_signal(lockres, level); 1948 } else if (!ret && (level > lockres->l_level)) { 1949 /* Trylock failed asynchronously */ 1950 BUG_ON(!trylock); 1951 ret = -EAGAIN; 1952 } 1953 1954 out: 1955 1956 mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n", 1957 lockres->l_name, ex, trylock, ret); 1958 return ret; 1959 } 1960 1961 void ocfs2_file_unlock(struct file *file) 1962 { 1963 int ret; 1964 unsigned int gen; 1965 unsigned long flags; 1966 struct ocfs2_file_private *fp = file->private_data; 1967 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1968 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1969 struct ocfs2_mask_waiter mw; 1970 1971 ocfs2_init_mask_waiter(&mw); 1972 1973 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) 1974 return; 1975 1976 if (lockres->l_level == DLM_LOCK_NL) 1977 return; 1978 1979 mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n", 1980 lockres->l_name, lockres->l_flags, lockres->l_level, 1981 lockres->l_action); 1982 1983 spin_lock_irqsave(&lockres->l_lock, flags); 1984 /* 1985 * Fake a blocking ast for the downconvert code. 
1986 */ 1987 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 1988 lockres->l_blocking = DLM_LOCK_EX; 1989 1990 gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL); 1991 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1992 spin_unlock_irqrestore(&lockres->l_lock, flags); 1993 1994 ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen); 1995 if (ret) { 1996 mlog_errno(ret); 1997 return; 1998 } 1999 2000 ret = ocfs2_wait_for_mask(&mw); 2001 if (ret) 2002 mlog_errno(ret); 2003 } 2004 2005 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 2006 struct ocfs2_lock_res *lockres) 2007 { 2008 int kick = 0; 2009 2010 /* If we know that another node is waiting on our lock, kick 2011 * the downconvert thread * pre-emptively when we reach a release 2012 * condition. */ 2013 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 2014 switch(lockres->l_blocking) { 2015 case DLM_LOCK_EX: 2016 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 2017 kick = 1; 2018 break; 2019 case DLM_LOCK_PR: 2020 if (!lockres->l_ex_holders) 2021 kick = 1; 2022 break; 2023 default: 2024 BUG(); 2025 } 2026 } 2027 2028 if (kick) 2029 ocfs2_wake_downconvert_thread(osb); 2030 } 2031 2032 #define OCFS2_SEC_BITS 34 2033 #define OCFS2_SEC_SHIFT (64 - 34) 2034 #define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1) 2035 2036 /* LVB only has room for 64 bits of time here so we pack it for 2037 * now. */ 2038 static u64 ocfs2_pack_timespec(struct timespec *spec) 2039 { 2040 u64 res; 2041 u64 sec = spec->tv_sec; 2042 u32 nsec = spec->tv_nsec; 2043 2044 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK); 2045 2046 return res; 2047 } 2048 2049 /* Call this with the lockres locked. I am reasonably sure we don't 2050 * need ip_lock in this function as anyone who would be changing those 2051 * values is supposed to be blocked in ocfs2_inode_lock right now. 
*/ 2052 static void __ocfs2_stuff_meta_lvb(struct inode *inode) 2053 { 2054 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2055 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2056 struct ocfs2_meta_lvb *lvb; 2057 2058 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2059 2060 /* 2061 * Invalidate the LVB of a deleted inode - this way other 2062 * nodes are forced to go to disk and discover the new inode 2063 * status. 2064 */ 2065 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2066 lvb->lvb_version = 0; 2067 goto out; 2068 } 2069 2070 lvb->lvb_version = OCFS2_LVB_VERSION; 2071 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 2072 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 2073 lvb->lvb_iuid = cpu_to_be32(i_uid_read(inode)); 2074 lvb->lvb_igid = cpu_to_be32(i_gid_read(inode)); 2075 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 2076 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 2077 lvb->lvb_iatime_packed = 2078 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime)); 2079 lvb->lvb_ictime_packed = 2080 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime)); 2081 lvb->lvb_imtime_packed = 2082 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime)); 2083 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 2084 lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features); 2085 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 2086 2087 out: 2088 mlog_meta_lvb(0, lockres); 2089 } 2090 2091 static void ocfs2_unpack_timespec(struct timespec *spec, 2092 u64 packed_time) 2093 { 2094 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 2095 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 2096 } 2097 2098 static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 2099 { 2100 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2101 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2102 struct ocfs2_meta_lvb *lvb; 2103 2104 mlog_meta_lvb(0, lockres); 2105 2106 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2107 2108 /* We're safe here without the lockres lock... 
*/ 2109 spin_lock(&oi->ip_lock); 2110 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 2111 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 2112 2113 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 2114 oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures); 2115 ocfs2_set_inode_flags(inode); 2116 2117 /* fast-symlinks are a special case */ 2118 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 2119 inode->i_blocks = 0; 2120 else 2121 inode->i_blocks = ocfs2_inode_sector_count(inode); 2122 2123 i_uid_write(inode, be32_to_cpu(lvb->lvb_iuid)); 2124 i_gid_write(inode, be32_to_cpu(lvb->lvb_igid)); 2125 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 2126 set_nlink(inode, be16_to_cpu(lvb->lvb_inlink)); 2127 ocfs2_unpack_timespec(&inode->i_atime, 2128 be64_to_cpu(lvb->lvb_iatime_packed)); 2129 ocfs2_unpack_timespec(&inode->i_mtime, 2130 be64_to_cpu(lvb->lvb_imtime_packed)); 2131 ocfs2_unpack_timespec(&inode->i_ctime, 2132 be64_to_cpu(lvb->lvb_ictime_packed)); 2133 spin_unlock(&oi->ip_lock); 2134 } 2135 2136 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 2137 struct ocfs2_lock_res *lockres) 2138 { 2139 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2140 2141 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) 2142 && lvb->lvb_version == OCFS2_LVB_VERSION 2143 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 2144 return 1; 2145 return 0; 2146 } 2147 2148 /* Determine whether a lock resource needs to be refreshed, and 2149 * arbitrate who gets to refresh it. 2150 * 2151 * 0 means no refresh needed. 2152 * 2153 * > 0 means you need to refresh this and you MUST call 2154 * ocfs2_complete_lock_res_refresh afterwards. 
*/ 2155 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres) 2156 { 2157 unsigned long flags; 2158 int status = 0; 2159 2160 refresh_check: 2161 spin_lock_irqsave(&lockres->l_lock, flags); 2162 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 2163 spin_unlock_irqrestore(&lockres->l_lock, flags); 2164 goto bail; 2165 } 2166 2167 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 2168 spin_unlock_irqrestore(&lockres->l_lock, flags); 2169 2170 ocfs2_wait_on_refreshing_lock(lockres); 2171 goto refresh_check; 2172 } 2173 2174 /* Ok, I'll be the one to refresh this lock. */ 2175 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING); 2176 spin_unlock_irqrestore(&lockres->l_lock, flags); 2177 2178 status = 1; 2179 bail: 2180 mlog(0, "status %d\n", status); 2181 return status; 2182 } 2183 2184 /* If status is non zero, I'll mark it as not being in refresh 2185 * anymroe, but i won't clear the needs refresh flag. */ 2186 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres, 2187 int status) 2188 { 2189 unsigned long flags; 2190 2191 spin_lock_irqsave(&lockres->l_lock, flags); 2192 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING); 2193 if (!status) 2194 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 2195 spin_unlock_irqrestore(&lockres->l_lock, flags); 2196 2197 wake_up(&lockres->l_event); 2198 } 2199 2200 /* may or may not return a bh if it went to disk. */ 2201 static int ocfs2_inode_lock_update(struct inode *inode, 2202 struct buffer_head **bh) 2203 { 2204 int status = 0; 2205 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2206 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2207 struct ocfs2_dinode *fe; 2208 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2209 2210 if (ocfs2_mount_local(osb)) 2211 goto bail; 2212 2213 spin_lock(&oi->ip_lock); 2214 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2215 mlog(0, "Orphaned inode %llu was deleted while we " 2216 "were waiting on a lock. 
ip_flags = 0x%x\n", 2217 (unsigned long long)oi->ip_blkno, oi->ip_flags); 2218 spin_unlock(&oi->ip_lock); 2219 status = -ENOENT; 2220 goto bail; 2221 } 2222 spin_unlock(&oi->ip_lock); 2223 2224 if (!ocfs2_should_refresh_lock_res(lockres)) 2225 goto bail; 2226 2227 /* This will discard any caching information we might have had 2228 * for the inode metadata. */ 2229 ocfs2_metadata_cache_purge(INODE_CACHE(inode)); 2230 2231 ocfs2_extent_map_trunc(inode, 0); 2232 2233 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) { 2234 mlog(0, "Trusting LVB on inode %llu\n", 2235 (unsigned long long)oi->ip_blkno); 2236 ocfs2_refresh_inode_from_lvb(inode); 2237 } else { 2238 /* Boo, we have to go to disk. */ 2239 /* read bh, cast, ocfs2_refresh_inode */ 2240 status = ocfs2_read_inode_block(inode, bh); 2241 if (status < 0) { 2242 mlog_errno(status); 2243 goto bail_refresh; 2244 } 2245 fe = (struct ocfs2_dinode *) (*bh)->b_data; 2246 2247 /* This is a good chance to make sure we're not 2248 * locking an invalid object. ocfs2_read_inode_block() 2249 * already checked that the inode block is sane. 2250 * 2251 * We bug on a stale inode here because we checked 2252 * above whether it was wiped from disk. The wiping 2253 * node provides a guarantee that we receive that 2254 * message and can mark the inode before dropping any 2255 * locks associated with it. 
*/ 2256 mlog_bug_on_msg(inode->i_generation != 2257 le32_to_cpu(fe->i_generation), 2258 "Invalid dinode %llu disk generation: %u " 2259 "inode->i_generation: %u\n", 2260 (unsigned long long)oi->ip_blkno, 2261 le32_to_cpu(fe->i_generation), 2262 inode->i_generation); 2263 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 2264 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 2265 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 2266 (unsigned long long)oi->ip_blkno, 2267 (unsigned long long)le64_to_cpu(fe->i_dtime), 2268 le32_to_cpu(fe->i_flags)); 2269 2270 ocfs2_refresh_inode(inode, fe); 2271 ocfs2_track_lock_refresh(lockres); 2272 } 2273 2274 status = 0; 2275 bail_refresh: 2276 ocfs2_complete_lock_res_refresh(lockres, status); 2277 bail: 2278 return status; 2279 } 2280 2281 static int ocfs2_assign_bh(struct inode *inode, 2282 struct buffer_head **ret_bh, 2283 struct buffer_head *passed_bh) 2284 { 2285 int status; 2286 2287 if (passed_bh) { 2288 /* Ok, the update went to disk for us, use the 2289 * returned bh. */ 2290 *ret_bh = passed_bh; 2291 get_bh(*ret_bh); 2292 2293 return 0; 2294 } 2295 2296 status = ocfs2_read_inode_block(inode, ret_bh); 2297 if (status < 0) 2298 mlog_errno(status); 2299 2300 return status; 2301 } 2302 2303 /* 2304 * returns < 0 error if the callback will never be called, otherwise 2305 * the result of the lock will be communicated via the callback. 2306 */ 2307 int ocfs2_inode_lock_full_nested(struct inode *inode, 2308 struct buffer_head **ret_bh, 2309 int ex, 2310 int arg_flags, 2311 int subclass) 2312 { 2313 int status, level, acquired; 2314 u32 dlm_flags; 2315 struct ocfs2_lock_res *lockres = NULL; 2316 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2317 struct buffer_head *local_bh = NULL; 2318 2319 BUG_ON(!inode); 2320 2321 mlog(0, "inode %llu, take %s META lock\n", 2322 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2323 ex ? 
"EXMODE" : "PRMODE"); 2324 2325 status = 0; 2326 acquired = 0; 2327 /* We'll allow faking a readonly metadata lock for 2328 * rodevices. */ 2329 if (ocfs2_is_hard_readonly(osb)) { 2330 if (ex) 2331 status = -EROFS; 2332 goto getbh; 2333 } 2334 2335 if (ocfs2_mount_local(osb)) 2336 goto local; 2337 2338 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2339 ocfs2_wait_for_recovery(osb); 2340 2341 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2342 level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2343 dlm_flags = 0; 2344 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 2345 dlm_flags |= DLM_LKF_NOQUEUE; 2346 2347 status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags, 2348 arg_flags, subclass, _RET_IP_); 2349 if (status < 0) { 2350 if (status != -EAGAIN) 2351 mlog_errno(status); 2352 goto bail; 2353 } 2354 2355 /* Notify the error cleanup path to drop the cluster lock. */ 2356 acquired = 1; 2357 2358 /* We wait twice because a node may have died while we were in 2359 * the lower dlm layers. The second time though, we've 2360 * committed to owning this lock so we don't allow signals to 2361 * abort the operation. */ 2362 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2363 ocfs2_wait_for_recovery(osb); 2364 2365 local: 2366 /* 2367 * We only see this flag if we're being called from 2368 * ocfs2_read_locked_inode(). It means we're locking an inode 2369 * which hasn't been populated yet, so clear the refresh flag 2370 * and let the caller handle it. 2371 */ 2372 if (inode->i_state & I_NEW) { 2373 status = 0; 2374 if (lockres) 2375 ocfs2_complete_lock_res_refresh(lockres, 0); 2376 goto bail; 2377 } 2378 2379 /* This is fun. The caller may want a bh back, or it may 2380 * not. ocfs2_inode_lock_update definitely wants one in, but 2381 * may or may not read one, depending on what's in the 2382 * LVB. The result of all of this is that we've *only* gone to 2383 * disk if we have to, so the complexity is worthwhile. 
*/ 2384 status = ocfs2_inode_lock_update(inode, &local_bh); 2385 if (status < 0) { 2386 if (status != -ENOENT) 2387 mlog_errno(status); 2388 goto bail; 2389 } 2390 getbh: 2391 if (ret_bh) { 2392 status = ocfs2_assign_bh(inode, ret_bh, local_bh); 2393 if (status < 0) { 2394 mlog_errno(status); 2395 goto bail; 2396 } 2397 } 2398 2399 bail: 2400 if (status < 0) { 2401 if (ret_bh && (*ret_bh)) { 2402 brelse(*ret_bh); 2403 *ret_bh = NULL; 2404 } 2405 if (acquired) 2406 ocfs2_inode_unlock(inode, ex); 2407 } 2408 2409 if (local_bh) 2410 brelse(local_bh); 2411 2412 return status; 2413 } 2414 2415 /* 2416 * This is working around a lock inversion between tasks acquiring DLM 2417 * locks while holding a page lock and the downconvert thread which 2418 * blocks dlm lock acquiry while acquiring page locks. 2419 * 2420 * ** These _with_page variantes are only intended to be called from aop 2421 * methods that hold page locks and return a very specific *positive* error 2422 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 2423 * 2424 * The DLM is called such that it returns -EAGAIN if it would have 2425 * blocked waiting for the downconvert thread. In that case we unlock 2426 * our page so the downconvert thread can make progress. Once we've 2427 * done this we have to return AOP_TRUNCATED_PAGE so the aop method 2428 * that called us can bubble that back up into the VFS who will then 2429 * immediately retry the aop call. 2430 * 2431 * We do a blocking lock and immediate unlock before returning, though, so that 2432 * the lock has a great chance of being cached on this node by the time the VFS 2433 * calls back to retry the aop. This has a potential to livelock as nodes 2434 * ping locks back and forth, but that's a risk we're willing to take to avoid 2435 * the lock inversion simply. 
2436 */ 2437 int ocfs2_inode_lock_with_page(struct inode *inode, 2438 struct buffer_head **ret_bh, 2439 int ex, 2440 struct page *page) 2441 { 2442 int ret; 2443 2444 ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK); 2445 if (ret == -EAGAIN) { 2446 unlock_page(page); 2447 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0) 2448 ocfs2_inode_unlock(inode, ex); 2449 ret = AOP_TRUNCATED_PAGE; 2450 } 2451 2452 return ret; 2453 } 2454 2455 int ocfs2_inode_lock_atime(struct inode *inode, 2456 struct vfsmount *vfsmnt, 2457 int *level) 2458 { 2459 int ret; 2460 2461 ret = ocfs2_inode_lock(inode, NULL, 0); 2462 if (ret < 0) { 2463 mlog_errno(ret); 2464 return ret; 2465 } 2466 2467 /* 2468 * If we should update atime, we will get EX lock, 2469 * otherwise we just get PR lock. 2470 */ 2471 if (ocfs2_should_update_atime(inode, vfsmnt)) { 2472 struct buffer_head *bh = NULL; 2473 2474 ocfs2_inode_unlock(inode, 0); 2475 ret = ocfs2_inode_lock(inode, &bh, 1); 2476 if (ret < 0) { 2477 mlog_errno(ret); 2478 return ret; 2479 } 2480 *level = 1; 2481 if (ocfs2_should_update_atime(inode, vfsmnt)) 2482 ocfs2_update_inode_atime(inode, bh); 2483 if (bh) 2484 brelse(bh); 2485 } else 2486 *level = 0; 2487 2488 return ret; 2489 } 2490 2491 void ocfs2_inode_unlock(struct inode *inode, 2492 int ex) 2493 { 2494 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2495 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2496 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2497 2498 mlog(0, "inode %llu drop %s META lock\n", 2499 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2500 ex ? 
"EXMODE" : "PRMODE"); 2501 2502 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) && 2503 !ocfs2_mount_local(osb)) 2504 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 2505 } 2506 2507 int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno) 2508 { 2509 struct ocfs2_lock_res *lockres; 2510 struct ocfs2_orphan_scan_lvb *lvb; 2511 int status = 0; 2512 2513 if (ocfs2_is_hard_readonly(osb)) 2514 return -EROFS; 2515 2516 if (ocfs2_mount_local(osb)) 2517 return 0; 2518 2519 lockres = &osb->osb_orphan_scan.os_lockres; 2520 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2521 if (status < 0) 2522 return status; 2523 2524 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2525 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2526 lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION) 2527 *seqno = be32_to_cpu(lvb->lvb_os_seqno); 2528 else 2529 *seqno = osb->osb_orphan_scan.os_seqno + 1; 2530 2531 return status; 2532 } 2533 2534 void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno) 2535 { 2536 struct ocfs2_lock_res *lockres; 2537 struct ocfs2_orphan_scan_lvb *lvb; 2538 2539 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) { 2540 lockres = &osb->osb_orphan_scan.os_lockres; 2541 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2542 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION; 2543 lvb->lvb_os_seqno = cpu_to_be32(seqno); 2544 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2545 } 2546 } 2547 2548 int ocfs2_super_lock(struct ocfs2_super *osb, 2549 int ex) 2550 { 2551 int status = 0; 2552 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2553 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2554 2555 if (ocfs2_is_hard_readonly(osb)) 2556 return -EROFS; 2557 2558 if (ocfs2_mount_local(osb)) 2559 goto bail; 2560 2561 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 2562 if (status < 0) { 2563 mlog_errno(status); 2564 goto bail; 2565 } 2566 2567 /* The super block lock path is really in the best position to 2568 * know when resources covered by the lock need to be 2569 * refreshed, so we do it here. Of course, making sense of 2570 * everything is up to the caller :) */ 2571 status = ocfs2_should_refresh_lock_res(lockres); 2572 if (status) { 2573 status = ocfs2_refresh_slot_info(osb); 2574 2575 ocfs2_complete_lock_res_refresh(lockres, status); 2576 2577 if (status < 0) { 2578 ocfs2_cluster_unlock(osb, lockres, level); 2579 mlog_errno(status); 2580 } 2581 ocfs2_track_lock_refresh(lockres); 2582 } 2583 bail: 2584 return status; 2585 } 2586 2587 void ocfs2_super_unlock(struct ocfs2_super *osb, 2588 int ex) 2589 { 2590 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2591 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2592 2593 if (!ocfs2_mount_local(osb)) 2594 ocfs2_cluster_unlock(osb, lockres, level); 2595 } 2596 2597 int ocfs2_rename_lock(struct ocfs2_super *osb) 2598 { 2599 int status; 2600 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2601 2602 if (ocfs2_is_hard_readonly(osb)) 2603 return -EROFS; 2604 2605 if (ocfs2_mount_local(osb)) 2606 return 0; 2607 2608 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2609 if (status < 0) 2610 mlog_errno(status); 2611 2612 return status; 2613 } 2614 2615 void ocfs2_rename_unlock(struct ocfs2_super *osb) 2616 { 2617 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2618 2619 if (!ocfs2_mount_local(osb)) 2620 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2621 } 2622 2623 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex) 2624 { 2625 int status; 2626 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2627 2628 if (ocfs2_is_hard_readonly(osb)) 2629 return -EROFS; 2630 2631 if (ocfs2_mount_local(osb)) 2632 return 0; 2633 2634 status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE, 2635 0, 0); 2636 if (status < 0) 2637 mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status); 2638 2639 return status; 2640 } 2641 2642 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex) 2643 { 2644 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2645 2646 if (!ocfs2_mount_local(osb)) 2647 ocfs2_cluster_unlock(osb, lockres, 2648 ex ? LKM_EXMODE : LKM_PRMODE); 2649 } 2650 2651 int ocfs2_dentry_lock(struct dentry *dentry, int ex) 2652 { 2653 int ret; 2654 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2655 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2656 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2657 2658 BUG_ON(!dl); 2659 2660 if (ocfs2_is_hard_readonly(osb)) { 2661 if (ex) 2662 return -EROFS; 2663 return 0; 2664 } 2665 2666 if (ocfs2_mount_local(osb)) 2667 return 0; 2668 2669 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 2670 if (ret < 0) 2671 mlog_errno(ret); 2672 2673 return ret; 2674 } 2675 2676 void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 2677 { 2678 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2679 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2680 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2681 2682 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 2683 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 2684 } 2685 2686 /* Reference counting of the dlm debug structure. We want this because 2687 * open references on the debug inodes can live on after a mount, so 2688 * we can't rely on the ocfs2_super to always exist. 
*/ 2689 static void ocfs2_dlm_debug_free(struct kref *kref) 2690 { 2691 struct ocfs2_dlm_debug *dlm_debug; 2692 2693 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2694 2695 kfree(dlm_debug); 2696 } 2697 2698 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2699 { 2700 if (dlm_debug) 2701 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2702 } 2703 2704 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2705 { 2706 kref_get(&debug->d_refcnt); 2707 } 2708 2709 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2710 { 2711 struct ocfs2_dlm_debug *dlm_debug; 2712 2713 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 2714 if (!dlm_debug) { 2715 mlog_errno(-ENOMEM); 2716 goto out; 2717 } 2718 2719 kref_init(&dlm_debug->d_refcnt); 2720 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 2721 dlm_debug->d_locking_state = NULL; 2722 out: 2723 return dlm_debug; 2724 } 2725 2726 /* Access to this is arbitrated for us via seq_file->sem. */ 2727 struct ocfs2_dlm_seq_priv { 2728 struct ocfs2_dlm_debug *p_dlm_debug; 2729 struct ocfs2_lock_res p_iter_res; 2730 struct ocfs2_lock_res p_tmp_res; 2731 }; 2732 2733 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 2734 struct ocfs2_dlm_seq_priv *priv) 2735 { 2736 struct ocfs2_lock_res *iter, *ret = NULL; 2737 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 2738 2739 assert_spin_locked(&ocfs2_dlm_tracking_lock); 2740 2741 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 2742 /* discover the head of the list */ 2743 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 2744 mlog(0, "End of list found, %p\n", ret); 2745 break; 2746 } 2747 2748 /* We track our "dummy" iteration lockres' by a NULL 2749 * l_ops field. 
*/ 2750 if (iter->l_ops != NULL) { 2751 ret = iter; 2752 break; 2753 } 2754 } 2755 2756 return ret; 2757 } 2758 2759 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 2760 { 2761 struct ocfs2_dlm_seq_priv *priv = m->private; 2762 struct ocfs2_lock_res *iter; 2763 2764 spin_lock(&ocfs2_dlm_tracking_lock); 2765 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 2766 if (iter) { 2767 /* Since lockres' have the lifetime of their container 2768 * (which can be inodes, ocfs2_supers, etc) we want to 2769 * copy this out to a temporary lockres while still 2770 * under the spinlock. Obviously after this we can't 2771 * trust any pointers on the copy returned, but that's 2772 * ok as the information we want isn't typically held 2773 * in them. */ 2774 priv->p_tmp_res = *iter; 2775 iter = &priv->p_tmp_res; 2776 } 2777 spin_unlock(&ocfs2_dlm_tracking_lock); 2778 2779 return iter; 2780 } 2781 2782 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 2783 { 2784 } 2785 2786 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 2787 { 2788 struct ocfs2_dlm_seq_priv *priv = m->private; 2789 struct ocfs2_lock_res *iter = v; 2790 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 2791 2792 spin_lock(&ocfs2_dlm_tracking_lock); 2793 iter = ocfs2_dlm_next_res(iter, priv); 2794 list_del_init(&dummy->l_debug_list); 2795 if (iter) { 2796 list_add(&dummy->l_debug_list, &iter->l_debug_list); 2797 priv->p_tmp_res = *iter; 2798 iter = &priv->p_tmp_res; 2799 } 2800 spin_unlock(&ocfs2_dlm_tracking_lock); 2801 2802 return iter; 2803 } 2804 2805 /* 2806 * Version is used by debugfs.ocfs2 to determine the format being used 2807 * 2808 * New in version 2 2809 * - Lock stats printed 2810 * New in version 3 2811 * - Max time in lock stats is in usecs (instead of nsecs) 2812 */ 2813 #define OCFS2_DLM_DEBUG_STR_VERSION 3 2814 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 2815 { 2816 int i; 2817 char *lvb; 2818 struct ocfs2_lock_res *lockres = v; 
2819 2820 if (!lockres) 2821 return -EINVAL; 2822 2823 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 2824 2825 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 2826 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 2827 lockres->l_name, 2828 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 2829 else 2830 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 2831 2832 seq_printf(m, "%d\t" 2833 "0x%lx\t" 2834 "0x%x\t" 2835 "0x%x\t" 2836 "%u\t" 2837 "%u\t" 2838 "%d\t" 2839 "%d\t", 2840 lockres->l_level, 2841 lockres->l_flags, 2842 lockres->l_action, 2843 lockres->l_unlock_action, 2844 lockres->l_ro_holders, 2845 lockres->l_ex_holders, 2846 lockres->l_requested, 2847 lockres->l_blocking); 2848 2849 /* Dump the raw LVB */ 2850 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2851 for(i = 0; i < DLM_LVB_LEN; i++) 2852 seq_printf(m, "0x%x\t", lvb[i]); 2853 2854 #ifdef CONFIG_OCFS2_FS_STATS 2855 # define lock_num_prmode(_l) ((_l)->l_lock_prmode.ls_gets) 2856 # define lock_num_exmode(_l) ((_l)->l_lock_exmode.ls_gets) 2857 # define lock_num_prmode_failed(_l) ((_l)->l_lock_prmode.ls_fail) 2858 # define lock_num_exmode_failed(_l) ((_l)->l_lock_exmode.ls_fail) 2859 # define lock_total_prmode(_l) ((_l)->l_lock_prmode.ls_total) 2860 # define lock_total_exmode(_l) ((_l)->l_lock_exmode.ls_total) 2861 # define lock_max_prmode(_l) ((_l)->l_lock_prmode.ls_max) 2862 # define lock_max_exmode(_l) ((_l)->l_lock_exmode.ls_max) 2863 # define lock_refresh(_l) ((_l)->l_lock_refresh) 2864 #else 2865 # define lock_num_prmode(_l) (0) 2866 # define lock_num_exmode(_l) (0) 2867 # define lock_num_prmode_failed(_l) (0) 2868 # define lock_num_exmode_failed(_l) (0) 2869 # define lock_total_prmode(_l) (0ULL) 2870 # define lock_total_exmode(_l) (0ULL) 2871 # define lock_max_prmode(_l) (0) 2872 # define lock_max_exmode(_l) (0) 2873 # define lock_refresh(_l) (0) 2874 #endif 2875 /* The following seq_print was added in version 2 of this output */ 2876 seq_printf(m, "%u\t" 2877 "%u\t" 
2878 "%u\t" 2879 "%u\t" 2880 "%llu\t" 2881 "%llu\t" 2882 "%u\t" 2883 "%u\t" 2884 "%u\t", 2885 lock_num_prmode(lockres), 2886 lock_num_exmode(lockres), 2887 lock_num_prmode_failed(lockres), 2888 lock_num_exmode_failed(lockres), 2889 lock_total_prmode(lockres), 2890 lock_total_exmode(lockres), 2891 lock_max_prmode(lockres), 2892 lock_max_exmode(lockres), 2893 lock_refresh(lockres)); 2894 2895 /* End the line */ 2896 seq_printf(m, "\n"); 2897 return 0; 2898 } 2899 2900 static const struct seq_operations ocfs2_dlm_seq_ops = { 2901 .start = ocfs2_dlm_seq_start, 2902 .stop = ocfs2_dlm_seq_stop, 2903 .next = ocfs2_dlm_seq_next, 2904 .show = ocfs2_dlm_seq_show, 2905 }; 2906 2907 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 2908 { 2909 struct seq_file *seq = file->private_data; 2910 struct ocfs2_dlm_seq_priv *priv = seq->private; 2911 struct ocfs2_lock_res *res = &priv->p_iter_res; 2912 2913 ocfs2_remove_lockres_tracking(res); 2914 ocfs2_put_dlm_debug(priv->p_dlm_debug); 2915 return seq_release_private(inode, file); 2916 } 2917 2918 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 2919 { 2920 struct ocfs2_dlm_seq_priv *priv; 2921 struct ocfs2_super *osb; 2922 2923 priv = __seq_open_private(file, &ocfs2_dlm_seq_ops, sizeof(*priv)); 2924 if (!priv) { 2925 mlog_errno(-ENOMEM); 2926 return -ENOMEM; 2927 } 2928 2929 osb = inode->i_private; 2930 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 2931 priv->p_dlm_debug = osb->osb_dlm_debug; 2932 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 2933 2934 ocfs2_add_lockres_tracking(&priv->p_iter_res, 2935 priv->p_dlm_debug); 2936 2937 return 0; 2938 } 2939 2940 static const struct file_operations ocfs2_dlm_debug_fops = { 2941 .open = ocfs2_dlm_debug_open, 2942 .release = ocfs2_dlm_debug_release, 2943 .read = seq_read, 2944 .llseek = seq_lseek, 2945 }; 2946 2947 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 2948 { 2949 int ret = 0; 2950 struct ocfs2_dlm_debug *dlm_debug = 
osb->osb_dlm_debug; 2951 2952 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 2953 S_IFREG|S_IRUSR, 2954 osb->osb_debug_root, 2955 osb, 2956 &ocfs2_dlm_debug_fops); 2957 if (!dlm_debug->d_locking_state) { 2958 ret = -EINVAL; 2959 mlog(ML_ERROR, 2960 "Unable to create locking state debugfs file.\n"); 2961 goto out; 2962 } 2963 2964 ocfs2_get_dlm_debug(dlm_debug); 2965 out: 2966 return ret; 2967 } 2968 2969 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 2970 { 2971 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2972 2973 if (dlm_debug) { 2974 debugfs_remove(dlm_debug->d_locking_state); 2975 ocfs2_put_dlm_debug(dlm_debug); 2976 } 2977 } 2978 2979 int ocfs2_dlm_init(struct ocfs2_super *osb) 2980 { 2981 int status = 0; 2982 struct ocfs2_cluster_connection *conn = NULL; 2983 2984 if (ocfs2_mount_local(osb)) { 2985 osb->node_num = 0; 2986 goto local; 2987 } 2988 2989 status = ocfs2_dlm_init_debug(osb); 2990 if (status < 0) { 2991 mlog_errno(status); 2992 goto bail; 2993 } 2994 2995 /* launch downconvert thread */ 2996 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc"); 2997 if (IS_ERR(osb->dc_task)) { 2998 status = PTR_ERR(osb->dc_task); 2999 osb->dc_task = NULL; 3000 mlog_errno(status); 3001 goto bail; 3002 } 3003 3004 /* for now, uuid == domain */ 3005 status = ocfs2_cluster_connect(osb->osb_cluster_stack, 3006 osb->osb_cluster_name, 3007 strlen(osb->osb_cluster_name), 3008 osb->uuid_str, 3009 strlen(osb->uuid_str), 3010 &lproto, ocfs2_do_node_down, osb, 3011 &conn); 3012 if (status) { 3013 mlog_errno(status); 3014 goto bail; 3015 } 3016 3017 status = ocfs2_cluster_this_node(conn, &osb->node_num); 3018 if (status < 0) { 3019 mlog_errno(status); 3020 mlog(ML_ERROR, 3021 "could not find this host's node number\n"); 3022 ocfs2_cluster_disconnect(conn, 0); 3023 goto bail; 3024 } 3025 3026 local: 3027 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 3028 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, 
osb); 3029 ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb); 3030 ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb); 3031 3032 osb->cconn = conn; 3033 3034 status = 0; 3035 bail: 3036 if (status < 0) { 3037 ocfs2_dlm_shutdown_debug(osb); 3038 if (osb->dc_task) 3039 kthread_stop(osb->dc_task); 3040 } 3041 3042 return status; 3043 } 3044 3045 void ocfs2_dlm_shutdown(struct ocfs2_super *osb, 3046 int hangup_pending) 3047 { 3048 ocfs2_drop_osb_locks(osb); 3049 3050 /* 3051 * Now that we have dropped all locks and ocfs2_dismount_volume() 3052 * has disabled recovery, the DLM won't be talking to us. It's 3053 * safe to tear things down before disconnecting the cluster. 3054 */ 3055 3056 if (osb->dc_task) { 3057 kthread_stop(osb->dc_task); 3058 osb->dc_task = NULL; 3059 } 3060 3061 ocfs2_lock_res_free(&osb->osb_super_lockres); 3062 ocfs2_lock_res_free(&osb->osb_rename_lockres); 3063 ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres); 3064 ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres); 3065 3066 ocfs2_cluster_disconnect(osb->cconn, hangup_pending); 3067 osb->cconn = NULL; 3068 3069 ocfs2_dlm_shutdown_debug(osb); 3070 } 3071 3072 static int ocfs2_drop_lock(struct ocfs2_super *osb, 3073 struct ocfs2_lock_res *lockres) 3074 { 3075 int ret; 3076 unsigned long flags; 3077 u32 lkm_flags = 0; 3078 3079 /* We didn't get anywhere near actually using this lockres. 
*/
	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
		goto out;

	/* Lock types that carry an LVB hand it back to the DLM on unlock */
	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= DLM_LKF_VALBLK;

	spin_lock_irqsave(&lockres->l_lock, flags);

	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
			"lockres %s, flags 0x%lx\n",
			lockres->l_name, lockres->l_flags);

	/* l_lock is dropped around each wait and retaken before rechecking */
	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
		     "%u, unlock_action = %u\n",
		     lockres->l_name, lockres->l_flags, lockres->l_action,
		     lockres->l_unlock_action);

		spin_unlock_irqrestore(&lockres->l_lock, flags);

		/* XXX: Today we just wait on any busy
		 * locks... Perhaps we need to cancel converts in the
		 * future? */
		ocfs2_wait_on_busy_lock(lockres);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}

	/*
	 * Refresh the LVB contents one last time, but only when we hold
	 * the lock at EX and our cached state does not need a refresh.
	 */
	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
		if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
		    lockres->l_level == DLM_LOCK_EX &&
		    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
			lockres->l_ops->set_lvb(lockres);
	}

	if (lockres->l_flags & OCFS2_LOCK_BUSY)
		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
		     lockres->l_name);
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);

	/* Never attached to the DLM: nothing to unlock */
	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto out;
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);

	/* make sure we never get here while waiting for an ast to
	 * fire. */
	BUG_ON(lockres->l_action != OCFS2_AST_INVALID);

	/* is this necessary? */
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog(0, "lock %s\n", lockres->l_name);

	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags);
	if (ret) {
		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
		ocfs2_dlm_dump_lksb(&lockres->l_lksb);
		BUG();
	}
	mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n",
	     lockres->l_name);

	/* BUSY was set above; wait here until the unlock has completed */
	ocfs2_wait_on_busy_lock(lockres);
out:
	return 0;
}

static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *lockres);

/* Mark the lockres as being dropped. It will no longer be
 * queued if blocking, but we still may have to wait on it
 * being dequeued from the downconvert thread before we can consider
 * it safe to drop.
 *
 * You can *not* attempt to call cluster_lock on this lockres anymore. */
void ocfs2_mark_lockres_freeing(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_mask_waiter mw;
	unsigned long flags, flags2;

	ocfs2_init_mask_waiter(&mw);

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres->l_flags |= OCFS2_LOCK_FREEING;
	if (lockres->l_flags & OCFS2_LOCK_QUEUED && current == osb->dc_task) {
		/*
		 * We know the downconvert is queued but not in progress
		 * because we are the downconvert thread and processing
		 * different lock. So we can just remove the lock from the
		 * queue. This is not only an optimization but also a way
		 * to avoid the following deadlock:
		 *   ocfs2_dentry_post_unlock()
		 *     ocfs2_dentry_lock_put()
		 *       ocfs2_drop_dentry_lock()
		 *         iput()
		 *           ocfs2_evict_inode()
		 *             ocfs2_clear_inode()
		 *               ocfs2_mark_lockres_freeing()
		 *                 ... blocks waiting for OCFS2_LOCK_QUEUED
		 *                 since we are the downconvert thread which
		 *                 should clear the flag.
		 */
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		/* The blocked list and its count are guarded by dc_task_lock */
		spin_lock_irqsave(&osb->dc_task_lock, flags2);
		list_del_init(&lockres->l_blocked_list);
		osb->blocked_lock_count--;
		spin_unlock_irqrestore(&osb->dc_task_lock, flags2);
		/*
		 * Warn if we recurse into another post_unlock call.  Strictly
		 * speaking it isn't a problem but we need to be careful if
		 * that happens (stack overflow, deadlocks, ...) so warn if
		 * ocfs2 grows a path for which this can happen.
		 */
		WARN_ON_ONCE(lockres->l_ops->post_unlock);
		/* Since the lock is freeing we don't do much in the fn below */
		ocfs2_process_blocked_lock(osb, lockres);
		return;
	}
	/* Any other caller sleeps until OCFS2_LOCK_QUEUED has been cleared */
	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		mlog(0, "Waiting on lockres %s\n", lockres->l_name);

		status = ocfs2_wait_for_mask(&mw);
		if (status)
			mlog_errno(status);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/*
 * Mark @lockres as freeing and then drop its DLM lock, logging any
 * error reported by ocfs2_drop_lock().
 */
void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
			       struct ocfs2_lock_res *lockres)
{
	int ret;

	ocfs2_mark_lockres_freeing(osb, lockres);
	ret = ocfs2_drop_lock(osb, lockres);
	if (ret)
		mlog_errno(ret);
}

/* Drop the four osb-wide locks: super, rename, nfs_sync and orphan scan. */
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
{
	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
	ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
	ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
}

/*
 * Drop the open, inode and rw locks of @inode.  All three drops are
 * attempted; the first negative error seen is returned, 0 otherwise.
 */
int ocfs2_drop_inode_locks(struct inode *inode)
{
	int status, err;

	/* No need to call
ocfs2_mark_lockres_freeing here -
	 * ocfs2_clear_inode has done it for us. */

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_open_lockres);
	if (err < 0)
		mlog_errno(err);

	status = err;

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_inode_lockres);
	if (err < 0)
		mlog_errno(err);
	/* Keep the first failure; later ones are only logged */
	if (err < 0 && !status)
		status = err;

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_rw_lockres);
	if (err < 0)
		mlog_errno(err);
	if (err < 0 && !status)
		status = err;

	return status;
}

/*
 * Mark @lockres busy for a downconvert to @new_level.  The caller must
 * hold lockres->l_lock (asserted).  It is a BUG if nothing is being
 * blocked or if @new_level is not strictly below the current level.
 *
 * Returns the value of lockres_set_pending(); the caller hands it to
 * ocfs2_downconvert_lock(), which passes it to lockres_clear_pending().
 */
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
					      int new_level)
{
	assert_spin_locked(&lockres->l_lock);

	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);

	if (lockres->l_level <= new_level) {
		/* Dump the full lockres state before giving up */
		mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, "
		     "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, "
		     "block %d, pgen %d\n", lockres->l_name, lockres->l_level,
		     new_level, list_empty(&lockres->l_blocked_list),
		     list_empty(&lockres->l_mask_waiters), lockres->l_type,
		     lockres->l_flags, lockres->l_ro_holders,
		     lockres->l_ex_holders, lockres->l_action,
		     lockres->l_unlock_action, lockres->l_requested,
		     lockres->l_blocking, lockres->l_pending_gen);
		BUG();
	}

	mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n",
	     lockres->l_name, lockres->l_level, new_level, lockres->l_blocking);

	lockres->l_action = OCFS2_AST_DOWNCONVERT;
	lockres->l_requested = new_level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
	return lockres_set_pending(lockres);
}

/*
 * Send the DLM convert request for a downconvert to @new_level.
 * A non-zero @lvb adds DLM_LKF_VALBLK to the request.  @generation is
 * the value returned by ocfs2_prepare_downconvert() and is used to
 * clear the pending state once ocfs2_dlm_lock() has returned.
 *
 * Returns 0 on success or the error from ocfs2_dlm_lock().
 */
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level,
				  int lvb,
				  unsigned int generation)
{
	int ret;
	u32 dlm_flags = DLM_LKF_CONVERT;

	mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name,
	     lockres->l_level, new_level);

	if (lvb)
		dlm_flags |= DLM_LKF_VALBLK;

	ret = ocfs2_dlm_lock(osb->cconn,
			     new_level,
			     &lockres->l_lksb,
			     dlm_flags,
			     lockres->l_name,
			     OCFS2_LOCK_ID_MAX_LEN - 1);
	/* Pending is cleared whether or not the request was accepted */
	lockres_clear_pending(lockres, generation, osb);
	if (ret) {
		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
		ocfs2_recover_from_dlm_error(lockres, 1);
		goto bail;
	}

	ret = 0;
bail:
	return ret;
}

/* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);

	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		/* If we're already trying to cancel a lock conversion
		 * then just drop the spinlock and allow the caller to
		 * requeue this lock. */
		mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name);
		return 0;
	}

	/* were we in a convert when we got the bast fire? */
	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
	/* set things up for the unlockast to know to just
	 * clear out the ast_action and unset busy, etc.
*/
	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;

	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
			"lock %s, invalid flags: 0x%lx\n",
			lockres->l_name, lockres->l_flags);

	mlog(ML_BASTS, "lockres %s\n", lockres->l_name);

	return 1;
}

/*
 * Ask the DLM to cancel the conversion in flight on @lockres.
 * Returns the result of ocfs2_dlm_unlock(); on failure the error is
 * logged and ocfs2_recover_from_dlm_error() is run.
 */
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres)
{
	int ret;

	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
			       DLM_LKF_CANCEL);
	if (ret) {
		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
		ocfs2_recover_from_dlm_error(lockres, 0);
	}

	mlog(ML_BASTS, "lockres %s\n", lockres->l_name);

	return ret;
}

/*
 * Try to downconvert a lockres that is blocking another request.
 *
 * Takes lockres->l_lock itself.  Depending on the lockres state this
 * either does nothing (no longer blocked), cancels a conversion that is
 * in flight, asks to be requeued (holders, pending/refreshing state or a
 * failed ->check_downconvert), or runs the optional ->downconvert_worker
 * and then issues the downconvert.
 *
 * ctl->requeue is set to 1 when the lockres should be put back on the
 * blocked list and to 0 once the downconvert is issued.
 * ctl->unblock_action receives the ->downconvert_worker result when a
 * worker is run.
 *
 * Returns 0, or the error from ocfs2_cancel_convert() or
 * ocfs2_downconvert_lock().
 */
static int ocfs2_unblock_lock(struct ocfs2_super *osb,
			      struct ocfs2_lock_res *lockres,
			      struct ocfs2_unblock_ctl *ctl)
{
	unsigned long flags;
	int blocking;
	int new_level;
	int level;
	int ret = 0;
	int set_lvb = 0;
	unsigned int gen;

	spin_lock_irqsave(&lockres->l_lock, flags);

recheck:
	/*
	 * Is it still blocking? If not, we have no more work to do.
	 */
	if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
		BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		ret = 0;
		goto leave;
	}

	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		/* XXX
		 * This is a *big* race.  The OCFS2_LOCK_PENDING flag
		 * exists entirely for one reason - another thread has set
		 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
		 *
		 * If we do ocfs2_cancel_convert() before the other thread
		 * calls dlm_lock(), our cancel will do nothing.  We will
		 * get no ast, and we will have no way of knowing the
		 * cancel failed.  Meanwhile, the other thread will call
		 * into dlm_lock() and wait...forever.
		 *
		 * Why forever?  Because another node has asked for the
		 * lock first; that's why we're here in unblock_lock().
		 *
		 * The solution is OCFS2_LOCK_PENDING.  When PENDING is
		 * set, we just requeue the unblock.  Only when the other
		 * thread has called dlm_lock() and cleared PENDING will
		 * we then cancel their request.
		 *
		 * All callers of dlm_lock() must set OCFS2_LOCK_PENDING
		 * at the same time they set OCFS2_LOCK_BUSY.  They must
		 * clear OCFS2_LOCK_PENDING after dlm_lock() returns.
		 */
		if (lockres->l_flags & OCFS2_LOCK_PENDING) {
			mlog(ML_BASTS, "lockres %s, ReQ: Pending\n",
			     lockres->l_name);
			goto leave_requeue;
		}

		ctl->requeue = 1;
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		/* Non-zero means we are the ones who must issue the cancel */
		if (ret) {
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0)
				mlog_errno(ret);
		}
		goto leave;
	}

	/*
	 * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is
	 * set when the ast is received for an upconvert just before the
	 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast
	 * on the heels of the ast, we want to delay the downconvert just
	 * enough to allow the up requestor to do its task. Because this
	 * lock is in the blocked queue, the lock will be downconverted
	 * as soon as the requestor is done with the lock.
	 */
	if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING)
		goto leave_requeue;

	/*
	 * How can we block and yet be at NL?  We were trying to upconvert
	 * from NL and got canceled.  The code comes back here, and now
	 * we notice and clear BLOCKING.
	 */
	if (lockres->l_level == DLM_LOCK_NL) {
		BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders);
		mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name);
		lockres->l_blocking = DLM_LOCK_NL;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto leave;
	}

	/* if we're blocking an exclusive and we have *any* holders,
	 * then requeue. */
	if ((lockres->l_blocking == DLM_LOCK_EX)
	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
		mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n",
		     lockres->l_name, lockres->l_ex_holders,
		     lockres->l_ro_holders);
		goto leave_requeue;
	}

	/* If it's a PR we're blocking, then only
	 * requeue if we've got any EX holders */
	if (lockres->l_blocking == DLM_LOCK_PR &&
	    lockres->l_ex_holders) {
		mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n",
		     lockres->l_name, lockres->l_ex_holders);
		goto leave_requeue;
	}

	/*
	 * Can we get a lock in this state if the holder counts are
	 * zero? The meta data unblock code used to check this.
	 */
	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) {
		mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n",
		     lockres->l_name);
		goto leave_requeue;
	}

	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

	if (lockres->l_ops->check_downconvert
	    && !lockres->l_ops->check_downconvert(lockres, new_level)) {
		mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n",
		     lockres->l_name);
		goto leave_requeue;
	}

	/* If we get here, then we know that there are no more
	 * incompatible holders (and anyone asking for an incompatible
	 * lock is blocked). We can now downconvert the lock */
	if (!lockres->l_ops->downconvert_worker)
		goto downconvert;

	/* Some lockres types want to do a bit of work before
	 * downconverting a lock. Allow that here. The worker function
	 * may sleep, so we save off a copy of what we're blocking as
	 * it may change while we're not holding the spin lock. */
	blocking = lockres->l_blocking;
	level = lockres->l_level;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);

	if (ctl->unblock_action == UNBLOCK_STOP_POST) {
		mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n",
		     lockres->l_name);
		goto leave;
	}

	spin_lock_irqsave(&lockres->l_lock, flags);
	if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) {
		/* If this changed underneath us, then we can't drop
		 * it just yet. */
		mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, "
		     "Recheck\n", lockres->l_name, blocking,
		     lockres->l_blocking, level, lockres->l_level);
		goto recheck;
	}

downconvert:
	ctl->requeue = 0;

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
		if (lockres->l_level == DLM_LOCK_EX)
			set_lvb = 1;

		/*
		 * We only set the lvb if the lock has been fully
		 * refreshed - otherwise we risk setting stale
		 * data. Otherwise, there's no need to actually clear
		 * out the lvb here as its value is still valid.
		 */
		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
			lockres->l_ops->set_lvb(lockres);
	}

	/* Mark busy/pending under l_lock, then call the DLM without it */
	gen = ocfs2_prepare_downconvert(lockres, new_level);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
				     gen);

leave:
	if (ret)
		mlog_errno(ret);
	return ret;

leave_requeue:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ctl->requeue = 1;

	return 0;
}

/*
 * Pre-downconvert work for a lockres attached to an inode.
 *
 * Directories only get their ip_dir_lock_gen bumped.  Regular files
 * have their mappings unmapped and dirty pages written out; the page
 * cache is truncated when the blocked request is EX, otherwise the
 * writeback is waited on.  Other inode types are left untouched.
 *
 * Always returns UNBLOCK_CONTINUE.
 */
static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking)
{
	struct inode *inode;
	struct address_space *mapping;
	struct ocfs2_inode_info *oi;

	inode = ocfs2_lock_res_inode(lockres);
	mapping = inode->i_mapping;

	if (S_ISDIR(inode->i_mode)) {
		oi = OCFS2_I(inode);
		oi->ip_dir_lock_gen++;
		mlog(0, "generation: %u\n", oi->ip_dir_lock_gen);
		goto out;
	}

	if (!S_ISREG(inode->i_mode))
		goto out;

	/*
	 * We need this before the filemap_fdatawrite() so that it can
	 * transfer the dirty bit from the PTE to the
	 * page. Unfortunately this means that even for EX->PR
	 * downconverts, we'll lose our mappings and have to build
	 * them up again.
	 */
	unmap_mapping_range(mapping, 0, 0, 0);

	/* A writeback failure is only logged; the downconvert still proceeds */
	if (filemap_fdatawrite(mapping)) {
		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
	}
	sync_mapping_buffers(mapping);
	if (blocking == DLM_LOCK_EX) {
		truncate_inode_pages(mapping, 0);
	} else {
		/* We only need to wait on the I/O if we're not also
		 * truncating pages because truncate_inode_pages waits
		 * for us above. We don't truncate pages if we're
		 * blocking anything < EXMODE because we want to keep
		 * them around in that case.
*/ 3621 filemap_fdatawait(mapping); 3622 } 3623 3624 out: 3625 return UNBLOCK_CONTINUE; 3626 } 3627 3628 static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci, 3629 struct ocfs2_lock_res *lockres, 3630 int new_level) 3631 { 3632 int checkpointed = ocfs2_ci_fully_checkpointed(ci); 3633 3634 BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR); 3635 BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed); 3636 3637 if (checkpointed) 3638 return 1; 3639 3640 ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci))); 3641 return 0; 3642 } 3643 3644 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 3645 int new_level) 3646 { 3647 struct inode *inode = ocfs2_lock_res_inode(lockres); 3648 3649 return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level); 3650 } 3651 3652 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres) 3653 { 3654 struct inode *inode = ocfs2_lock_res_inode(lockres); 3655 3656 __ocfs2_stuff_meta_lvb(inode); 3657 } 3658 3659 /* 3660 * Does the final reference drop on our dentry lock. Right now this 3661 * happens in the downconvert thread, but we could choose to simplify the 3662 * dlmglue API and push these off to the ocfs2_wq in the future. 3663 */ 3664 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 3665 struct ocfs2_lock_res *lockres) 3666 { 3667 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3668 ocfs2_dentry_lock_put(osb, dl); 3669 } 3670 3671 /* 3672 * d_delete() matching dentries before the lock downconvert. 3673 * 3674 * At this point, any process waiting to destroy the 3675 * dentry_lock due to last ref count is stopped by the 3676 * OCFS2_LOCK_QUEUED flag. 3677 * 3678 * We have two potential problems 3679 * 3680 * 1) If we do the last reference drop on our dentry_lock (via dput) 3681 * we'll wind up in ocfs2_release_dentry_lock(), waiting on 3682 * the downconvert to finish. 
Instead we take an elevated
 *    reference and push the drop until after we've completed our
 *    unblock processing.
 *
 * 2) There might be another process with a final reference,
 *    waiting on us to finish processing. If this is the case, we
 *    detect it and exit out - there's no more dentries anyway.
 */
static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking)
{
	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
	struct dentry *dentry;
	unsigned long flags;
	int extra_ref = 0;

	/*
	 * This node is blocking another node from getting a read
	 * lock. This happens when we've renamed within a
	 * directory. We've forced the other nodes to d_delete(), but
	 * we never actually dropped our lock because it's still
	 * valid. The downconvert code will retain a PR for this node,
	 * so there's no further work to do.
	 */
	if (blocking == DLM_LOCK_PR)
		return UNBLOCK_CONTINUE;

	/*
	 * Mark this inode as potentially orphaned. The code in
	 * ocfs2_delete_inode() will figure out whether it actually
	 * needs to be freed or not.
	 */
	spin_lock(&oi->ip_lock);
	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
	spin_unlock(&oi->ip_lock);

	/*
	 * Yuck. We need to make sure however that the check of
	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
	 * respect to a reference decrement or the setting of that
	 * flag.
	 */
	spin_lock_irqsave(&lockres->l_lock, flags);
	spin_lock(&dentry_attach_lock);
	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
	    && dl->dl_count) {
		dl->dl_count++;
		extra_ref = 1;
	}
	spin_unlock(&dentry_attach_lock);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog(0, "extra_ref = %d\n", extra_ref);

	/*
	 * We have a process waiting on us in ocfs2_dentry_iput(),
	 * which means we can't have any more outstanding
	 * aliases. There's no need to do any more work.
	 */
	if (!extra_ref)
		return UNBLOCK_CONTINUE;

	/*
	 * Walk every local alias.  dentry_attach_lock is held for the
	 * lookup only and dropped around the dcache calls.
	 */
	spin_lock(&dentry_attach_lock);
	while (1) {
		dentry = ocfs2_find_local_alias(dl->dl_inode,
						dl->dl_parent_blkno, 1);
		if (!dentry)
			break;
		spin_unlock(&dentry_attach_lock);

		if (S_ISDIR(dl->dl_inode->i_mode))
			shrink_dcache_parent(dentry);

		mlog(0, "d_delete(%pd);\n", dentry);

		/*
		 * The following dcache calls may do an
		 * iput(). Normally we don't want that from the
		 * downconverting thread, but in this case it's ok
		 * because the requesting node already has an
		 * exclusive lock on the inode, so it can't be queued
		 * for a downconvert.
		 */
		d_delete(dentry);
		dput(dentry);

		spin_lock(&dentry_attach_lock);
	}
	spin_unlock(&dentry_attach_lock);

	/*
	 * If we are the last holder of this dentry lock, there is no
	 * reason to downconvert so skip straight to the unlock.
	 */
	if (dl->dl_count == 1)
		return UNBLOCK_STOP_POST;

	/* Our extra reference is still held, so ask for a post_unlock call */
	return UNBLOCK_CONTINUE_POST;
}

/* Checkpoint check for a refcount tree lockres; uses the tree's cache. */
static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
					    int new_level)
{
	struct ocfs2_refcount_tree *tree =
				ocfs2_lock_res_refcount_tree(lockres);

	return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level);
}

/*
 * Pre-downconvert work for a refcount tree lockres: purge the tree's
 * metadata cache.  @blocking is unused.  Always returns UNBLOCK_CONTINUE.
 */
static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
					 int blocking)
{
	struct ocfs2_refcount_tree *tree =
				ocfs2_lock_res_refcount_tree(lockres);

	ocfs2_metadata_cache_purge(&tree->rf_ci);

	return UNBLOCK_CONTINUE;
}

/*
 * Fill the quota-info LVB from the in-memory quota info.  Apart from
 * the version field, all values are stored big-endian.
 */
static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_qinfo_lvb *lvb;
	struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres);
	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
					    oinfo->dqi_gi.dqi_type);

	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
	lvb->lvb_version = OCFS2_QINFO_LVB_VERSION;
	lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace);
	lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace);
	lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms);
	lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks);
	lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk);
	lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry);
}

/*
 * Release the global quota info lock taken at EX (@ex non-zero) or PR.
 * Nothing is done on a hard read-only or locally mounted volume.
 */
void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex)
{
	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
	int level = ex ?
DLM_LOCK_EX : DLM_LOCK_PR; 3825 3826 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 3827 ocfs2_cluster_unlock(osb, lockres, level); 3828 } 3829 3830 static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo) 3831 { 3832 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 3833 oinfo->dqi_gi.dqi_type); 3834 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 3835 struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 3836 struct buffer_head *bh = NULL; 3837 struct ocfs2_global_disk_dqinfo *gdinfo; 3838 int status = 0; 3839 3840 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 3841 lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) { 3842 info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace); 3843 info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace); 3844 oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms); 3845 oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks); 3846 oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk); 3847 oinfo->dqi_gi.dqi_free_entry = 3848 be32_to_cpu(lvb->lvb_free_entry); 3849 } else { 3850 status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode, 3851 oinfo->dqi_giblk, &bh); 3852 if (status) { 3853 mlog_errno(status); 3854 goto bail; 3855 } 3856 gdinfo = (struct ocfs2_global_disk_dqinfo *) 3857 (bh->b_data + OCFS2_GLOBAL_INFO_OFF); 3858 info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace); 3859 info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace); 3860 oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms); 3861 oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks); 3862 oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk); 3863 oinfo->dqi_gi.dqi_free_entry = 3864 le32_to_cpu(gdinfo->dqi_free_entry); 3865 brelse(bh); 3866 ocfs2_track_lock_refresh(lockres); 3867 } 3868 3869 bail: 3870 return status; 3871 } 3872 3873 /* Lock quota info, this function expects at least shared lock on the quota file 3874 * so that we can safely refresh quota info from disk. 
*/ 3875 int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex) 3876 { 3877 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 3878 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 3879 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 3880 int status = 0; 3881 3882 /* On RO devices, locking really isn't needed... */ 3883 if (ocfs2_is_hard_readonly(osb)) { 3884 if (ex) 3885 status = -EROFS; 3886 goto bail; 3887 } 3888 if (ocfs2_mount_local(osb)) 3889 goto bail; 3890 3891 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 3892 if (status < 0) { 3893 mlog_errno(status); 3894 goto bail; 3895 } 3896 if (!ocfs2_should_refresh_lock_res(lockres)) 3897 goto bail; 3898 /* OK, we have the lock but we need to refresh the quota info */ 3899 status = ocfs2_refresh_qinfo(oinfo); 3900 if (status) 3901 ocfs2_qinfo_unlock(oinfo, ex); 3902 ocfs2_complete_lock_res_refresh(lockres, status); 3903 bail: 3904 return status; 3905 } 3906 3907 int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex) 3908 { 3909 int status; 3910 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 3911 struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres; 3912 struct ocfs2_super *osb = lockres->l_priv; 3913 3914 3915 if (ocfs2_is_hard_readonly(osb)) 3916 return -EROFS; 3917 3918 if (ocfs2_mount_local(osb)) 3919 return 0; 3920 3921 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 3922 if (status < 0) 3923 mlog_errno(status); 3924 3925 return status; 3926 } 3927 3928 void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex) 3929 { 3930 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 3931 struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres; 3932 struct ocfs2_super *osb = lockres->l_priv; 3933 3934 if (!ocfs2_mount_local(osb)) 3935 ocfs2_cluster_unlock(osb, lockres, level); 3936 } 3937 3938 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3939 struct ocfs2_lock_res *lockres) 3940 { 3941 int status; 3942 struct ocfs2_unblock_ctl ctl = {0, 0,}; 3943 unsigned long flags; 3944 3945 /* Our reference to the lockres in this function can be 3946 * considered valid until we remove the OCFS2_LOCK_QUEUED 3947 * flag. */ 3948 3949 BUG_ON(!lockres); 3950 BUG_ON(!lockres->l_ops); 3951 3952 mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name); 3953 3954 /* Detect whether a lock has been marked as going away while 3955 * the downconvert thread was processing other things. A lock can 3956 * still be marked with OCFS2_LOCK_FREEING after this check, 3957 * but short circuiting here will still save us some 3958 * performance. */ 3959 spin_lock_irqsave(&lockres->l_lock, flags); 3960 if (lockres->l_flags & OCFS2_LOCK_FREEING) 3961 goto unqueue; 3962 spin_unlock_irqrestore(&lockres->l_lock, flags); 3963 3964 status = ocfs2_unblock_lock(osb, lockres, &ctl); 3965 if (status < 0) 3966 mlog_errno(status); 3967 3968 spin_lock_irqsave(&lockres->l_lock, flags); 3969 unqueue: 3970 if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) { 3971 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED); 3972 } else 3973 ocfs2_schedule_blocked_lock(osb, lockres); 3974 3975 mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name, 3976 ctl.requeue ? 
"yes" : "no"); 3977 spin_unlock_irqrestore(&lockres->l_lock, flags); 3978 3979 if (ctl.unblock_action != UNBLOCK_CONTINUE 3980 && lockres->l_ops->post_unlock) 3981 lockres->l_ops->post_unlock(osb, lockres); 3982 } 3983 3984 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 3985 struct ocfs2_lock_res *lockres) 3986 { 3987 unsigned long flags; 3988 3989 assert_spin_locked(&lockres->l_lock); 3990 3991 if (lockres->l_flags & OCFS2_LOCK_FREEING) { 3992 /* Do not schedule a lock for downconvert when it's on 3993 * the way to destruction - any nodes wanting access 3994 * to the resource will get it soon. */ 3995 mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n", 3996 lockres->l_name, lockres->l_flags); 3997 return; 3998 } 3999 4000 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); 4001 4002 spin_lock_irqsave(&osb->dc_task_lock, flags); 4003 if (list_empty(&lockres->l_blocked_list)) { 4004 list_add_tail(&lockres->l_blocked_list, 4005 &osb->blocked_lock_list); 4006 osb->blocked_lock_count++; 4007 } 4008 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4009 } 4010 4011 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb) 4012 { 4013 unsigned long processed; 4014 unsigned long flags; 4015 struct ocfs2_lock_res *lockres; 4016 4017 spin_lock_irqsave(&osb->dc_task_lock, flags); 4018 /* grab this early so we know to try again if a state change and 4019 * wake happens part-way through our work */ 4020 osb->dc_work_sequence = osb->dc_wake_sequence; 4021 4022 processed = osb->blocked_lock_count; 4023 while (processed) { 4024 BUG_ON(list_empty(&osb->blocked_lock_list)); 4025 4026 lockres = list_entry(osb->blocked_lock_list.next, 4027 struct ocfs2_lock_res, l_blocked_list); 4028 list_del_init(&lockres->l_blocked_list); 4029 osb->blocked_lock_count--; 4030 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4031 4032 BUG_ON(!processed); 4033 processed--; 4034 4035 ocfs2_process_blocked_lock(osb, lockres); 4036 4037 
spin_lock_irqsave(&osb->dc_task_lock, flags); 4038 } 4039 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4040 } 4041 4042 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb) 4043 { 4044 int empty = 0; 4045 unsigned long flags; 4046 4047 spin_lock_irqsave(&osb->dc_task_lock, flags); 4048 if (list_empty(&osb->blocked_lock_list)) 4049 empty = 1; 4050 4051 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4052 return empty; 4053 } 4054 4055 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb) 4056 { 4057 int should_wake = 0; 4058 unsigned long flags; 4059 4060 spin_lock_irqsave(&osb->dc_task_lock, flags); 4061 if (osb->dc_work_sequence != osb->dc_wake_sequence) 4062 should_wake = 1; 4063 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4064 4065 return should_wake; 4066 } 4067 4068 static int ocfs2_downconvert_thread(void *arg) 4069 { 4070 int status = 0; 4071 struct ocfs2_super *osb = arg; 4072 4073 /* only quit once we've been asked to stop and there is no more 4074 * work available */ 4075 while (!(kthread_should_stop() && 4076 ocfs2_downconvert_thread_lists_empty(osb))) { 4077 4078 wait_event_interruptible(osb->dc_event, 4079 ocfs2_downconvert_thread_should_wake(osb) || 4080 kthread_should_stop()); 4081 4082 mlog(0, "downconvert_thread: awoken\n"); 4083 4084 ocfs2_downconvert_thread_do_work(osb); 4085 } 4086 4087 osb->dc_task = NULL; 4088 return status; 4089 } 4090 4091 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb) 4092 { 4093 unsigned long flags; 4094 4095 spin_lock_irqsave(&osb->dc_task_lock, flags); 4096 /* make sure the voting thread gets a swipe at whatever changes 4097 * the caller may have made to the voting state */ 4098 osb->dc_wake_sequence++; 4099 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4100 wake_up(&osb->dc_event); 4101 } 4102