1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmglue.c 5 * 6 * Code which implements an OCFS2 specific interface to our DLM. 7 * 8 * Copyright (C) 2003, 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 */ 25 26 #include <linux/types.h> 27 #include <linux/slab.h> 28 #include <linux/highmem.h> 29 #include <linux/mm.h> 30 #include <linux/kthread.h> 31 #include <linux/pagemap.h> 32 #include <linux/debugfs.h> 33 #include <linux/seq_file.h> 34 #include <linux/time.h> 35 #include <linux/quotaops.h> 36 37 #define MLOG_MASK_PREFIX ML_DLM_GLUE 38 #include <cluster/masklog.h> 39 40 #include "ocfs2.h" 41 #include "ocfs2_lockingver.h" 42 43 #include "alloc.h" 44 #include "dcache.h" 45 #include "dlmglue.h" 46 #include "extent_map.h" 47 #include "file.h" 48 #include "heartbeat.h" 49 #include "inode.h" 50 #include "journal.h" 51 #include "stackglue.h" 52 #include "slot_map.h" 53 #include "super.h" 54 #include "uptodate.h" 55 #include "quota.h" 56 #include "refcounttree.h" 57 58 #include "buffer_head_io.h" 59 60 struct ocfs2_mask_waiter { 61 struct list_head mw_item; 62 int mw_status; 63 struct completion mw_complete; 64 unsigned long mw_mask; 65 unsigned long mw_goal; 66 #ifdef CONFIG_OCFS2_FS_STATS 67 ktime_t mw_lock_start; 68 #endif 69 }; 70 71 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres); 72 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres); 73 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres); 74 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres); 75 76 /* 77 * Return value from ->downconvert_worker functions. 78 * 79 * These control the precise actions of ocfs2_unblock_lock() 80 * and ocfs2_process_blocked_lock() 81 * 82 */ 83 enum ocfs2_unblock_action { 84 UNBLOCK_CONTINUE = 0, /* Continue downconvert */ 85 UNBLOCK_CONTINUE_POST = 1, /* Continue downconvert, fire 86 * ->post_unlock callback */ 87 UNBLOCK_STOP_POST = 2, /* Do not downconvert, fire 88 * ->post_unlock() callback. */ 89 }; 90 91 struct ocfs2_unblock_ctl { 92 int requeue; 93 enum ocfs2_unblock_action unblock_action; 94 }; 95 96 /* Lockdep class keys */ 97 struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES]; 98 99 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 100 int new_level); 101 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres); 102 103 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 104 int blocking); 105 106 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 107 int blocking); 108 109 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 110 struct ocfs2_lock_res *lockres); 111 112 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres); 113 114 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres, 115 int new_level); 116 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres, 117 int blocking); 118 119 #define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres) 120 121 /* This aids in debugging situations where a bad LVB might be involved. */ 122 static void ocfs2_dump_meta_lvb_info(u64 level, 123 const char *function, 124 unsigned int line, 125 struct ocfs2_lock_res *lockres) 126 { 127 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 128 129 mlog(level, "LVB information for %s (called from %s:%u):\n", 130 lockres->l_name, function, line); 131 mlog(level, "version: %u, clusters: %u, generation: 0x%x\n", 132 lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters), 133 be32_to_cpu(lvb->lvb_igeneration)); 134 mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n", 135 (unsigned long long)be64_to_cpu(lvb->lvb_isize), 136 be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid), 137 be16_to_cpu(lvb->lvb_imode)); 138 mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, " 139 "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink), 140 (long long)be64_to_cpu(lvb->lvb_iatime_packed), 141 (long long)be64_to_cpu(lvb->lvb_ictime_packed), 142 (long long)be64_to_cpu(lvb->lvb_imtime_packed), 143 be32_to_cpu(lvb->lvb_iattr)); 144 } 145 146 147 /* 148 * OCFS2 Lock Resource Operations 149 * 150 * These fine tune the behavior of the generic dlmglue locking infrastructure. 151 * 152 * The most basic of lock types can point ->l_priv to their respective 153 * struct ocfs2_super and allow the default actions to manage things. 154 * 155 * Right now, each lock type also needs to implement an init function, 156 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres() 157 * should be called when the lock is no longer needed (i.e., object 158 * destruction time). 159 */ 160 struct ocfs2_lock_res_ops { 161 /* 162 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define 163 * this callback if ->l_priv is not an ocfs2_super pointer 164 */ 165 struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *); 166 167 /* 168 * Optionally called in the downconvert thread after a 169 * successful downconvert. The lockres will not be referenced 170 * after this callback is called, so it is safe to free 171 * memory, etc. 172 * 173 * The exact semantics of when this is called are controlled 174 * by ->downconvert_worker() 175 */ 176 void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *); 177 178 /* 179 * Allow a lock type to add checks to determine whether it is 180 * safe to downconvert a lock. Return 0 to re-queue the 181 * downconvert at a later time, nonzero to continue. 182 * 183 * For most locks, the default checks that there are no 184 * incompatible holders are sufficient. 185 * 186 * Called with the lockres spinlock held. 187 */ 188 int (*check_downconvert)(struct ocfs2_lock_res *, int); 189 190 /* 191 * Allows a lock type to populate the lock value block. This 192 * is called on downconvert, and when we drop a lock. 193 * 194 * Locks that want to use this should set LOCK_TYPE_USES_LVB 195 * in the flags field. 196 * 197 * Called with the lockres spinlock held. 198 */ 199 void (*set_lvb)(struct ocfs2_lock_res *); 200 201 /* 202 * Called from the downconvert thread when it is determined 203 * that a lock will be downconverted. This is called without 204 * any locks held so the function can do work that might 205 * schedule (syncing out data, etc). 206 * 207 * This should return any one of the ocfs2_unblock_action 208 * values, depending on what it wants the thread to do. 209 */ 210 int (*downconvert_worker)(struct ocfs2_lock_res *, int); 211 212 /* 213 * LOCK_TYPE_* flags which describe the specific requirements 214 * of a lock type. Descriptions of each individual flag follow. 215 */ 216 int flags; 217 }; 218 219 /* 220 * Some locks want to "refresh" potentially stale data when a 221 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this 222 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the 223 * individual lockres l_flags member from the ast function. It is 224 * expected that the locking wrapper will clear the 225 * OCFS2_LOCK_NEEDS_REFRESH flag when done. 226 */ 227 #define LOCK_TYPE_REQUIRES_REFRESH 0x1 228 229 /* 230 * Indicate that a lock type makes use of the lock value block. The 231 * ->set_lvb lock type callback must be defined. 232 */ 233 #define LOCK_TYPE_USES_LVB 0x2 234 235 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { 236 .get_osb = ocfs2_get_inode_osb, 237 .flags = 0, 238 }; 239 240 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = { 241 .get_osb = ocfs2_get_inode_osb, 242 .check_downconvert = ocfs2_check_meta_downconvert, 243 .set_lvb = ocfs2_set_meta_lvb, 244 .downconvert_worker = ocfs2_data_convert_worker, 245 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 246 }; 247 248 static struct ocfs2_lock_res_ops ocfs2_super_lops = { 249 .flags = LOCK_TYPE_REQUIRES_REFRESH, 250 }; 251 252 static struct ocfs2_lock_res_ops ocfs2_rename_lops = { 253 .flags = 0, 254 }; 255 256 static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = { 257 .flags = 0, 258 }; 259 260 static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = { 261 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 262 }; 263 264 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { 265 .get_osb = ocfs2_get_dentry_osb, 266 .post_unlock = ocfs2_dentry_post_unlock, 267 .downconvert_worker = ocfs2_dentry_convert_worker, 268 .flags = 0, 269 }; 270 271 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = { 272 .get_osb = ocfs2_get_inode_osb, 273 .flags = 0, 274 }; 275 276 static struct ocfs2_lock_res_ops ocfs2_flock_lops = { 277 .get_osb = ocfs2_get_file_osb, 278 .flags = 0, 279 }; 280 281 static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = { 282 .set_lvb = ocfs2_set_qinfo_lvb, 283 .get_osb = ocfs2_get_qinfo_osb, 284 .flags = LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB, 285 }; 286 287 static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = { 288 .check_downconvert = ocfs2_check_refcount_downconvert, 289 .downconvert_worker = ocfs2_refcount_convert_worker, 290 .flags = 0, 291 }; 292 293 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 294 { 295 return lockres->l_type == OCFS2_LOCK_TYPE_META || 296 lockres->l_type == OCFS2_LOCK_TYPE_RW || 297 lockres->l_type == OCFS2_LOCK_TYPE_OPEN; 298 } 299 300 static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb) 301 { 302 return container_of(lksb, struct ocfs2_lock_res, l_lksb); 303 } 304 305 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) 306 { 307 BUG_ON(!ocfs2_is_inode_lock(lockres)); 308 309 return (struct inode *) lockres->l_priv; 310 } 311 312 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres) 313 { 314 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY); 315 316 return (struct ocfs2_dentry_lock *)lockres->l_priv; 317 } 318 319 static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres) 320 { 321 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO); 322 323 return (struct ocfs2_mem_dqinfo *)lockres->l_priv; 324 } 325 326 static inline struct ocfs2_refcount_tree * 327 ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res) 328 { 329 return container_of(res, struct ocfs2_refcount_tree, rf_lockres); 330 } 331 332 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres) 333 { 334 if (lockres->l_ops->get_osb) 335 return lockres->l_ops->get_osb(lockres); 336 337 return (struct ocfs2_super *)lockres->l_priv; 338 } 339 340 static int ocfs2_lock_create(struct ocfs2_super *osb, 341 struct ocfs2_lock_res *lockres, 342 int level, 343 u32 dlm_flags); 344 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 345 int wanted); 346 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb, 347 struct ocfs2_lock_res *lockres, 348 int level, unsigned long caller_ip); 349 static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb, 350 struct ocfs2_lock_res *lockres, 351 int level) 352 { 353 __ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_); 354 } 355 356 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres); 357 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres); 358 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres); 359 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level); 360 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 361 struct ocfs2_lock_res *lockres); 362 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 363 int convert); 364 #define ocfs2_log_dlm_error(_func, _err, _lockres) do { \ 365 if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY) \ 366 mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \ 367 _err, _func, _lockres->l_name); \ 368 else \ 369 mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n", \ 370 _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name, \ 371 (unsigned int)ocfs2_get_dentry_lock_ino(_lockres)); \ 372 } while (0) 373 static int ocfs2_downconvert_thread(void *arg); 374 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 375 struct ocfs2_lock_res *lockres); 376 static int ocfs2_inode_lock_update(struct inode *inode, 377 struct buffer_head **bh); 378 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 379 static inline int ocfs2_highest_compat_lock_level(int level); 380 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 381 int new_level); 382 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 383 struct ocfs2_lock_res *lockres, 384 int new_level, 385 int lvb, 386 unsigned int generation); 387 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 388 struct ocfs2_lock_res *lockres); 389 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 390 struct ocfs2_lock_res *lockres); 391 392 393 static void ocfs2_build_lock_name(enum ocfs2_lock_type type, 394 u64 blkno, 395 u32 generation, 396 char *name) 397 { 398 int len; 399 400 BUG_ON(type >= OCFS2_NUM_LOCK_TYPES); 401 402 len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", 403 ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD, 404 (long long)blkno, generation); 405 406 BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1)); 407 408 mlog(0, "built lock resource with name: %s\n", name); 409 } 410 411 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock); 412 413 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res, 414 struct ocfs2_dlm_debug *dlm_debug) 415 { 416 mlog(0, "Add tracking for lockres %s\n", res->l_name); 417 418 spin_lock(&ocfs2_dlm_tracking_lock); 419 list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking); 420 spin_unlock(&ocfs2_dlm_tracking_lock); 421 } 422 423 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) 424 { 425 spin_lock(&ocfs2_dlm_tracking_lock); 426 if (!list_empty(&res->l_debug_list)) 427 list_del_init(&res->l_debug_list); 428 spin_unlock(&ocfs2_dlm_tracking_lock); 429 } 430 431 #ifdef CONFIG_OCFS2_FS_STATS 432 static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 433 { 434 res->l_lock_refresh = 0; 435 memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats)); 436 memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats)); 437 } 438 439 static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level, 440 struct ocfs2_mask_waiter *mw, int ret) 441 { 442 u32 usec; 443 ktime_t kt; 444 struct ocfs2_lock_stats *stats; 445 446 if (level == LKM_PRMODE) 447 stats = &res->l_lock_prmode; 448 else if (level == LKM_EXMODE) 449 stats = &res->l_lock_exmode; 450 else 451 return; 452 453 kt = ktime_sub(ktime_get(), mw->mw_lock_start); 454 usec = ktime_to_us(kt); 455 456 stats->ls_gets++; 457 stats->ls_total += ktime_to_ns(kt); 458 /* overflow */ 459 if (unlikely(stats->ls_gets == 0)) { 460 stats->ls_gets++; 461 stats->ls_total = ktime_to_ns(kt); 462 } 463 464 if (stats->ls_max < usec) 465 stats->ls_max = usec; 466 467 if (ret) 468 stats->ls_fail++; 469 } 470 471 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 472 { 473 lockres->l_lock_refresh++; 474 } 475 476 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 477 { 478 mw->mw_lock_start = ktime_get(); 479 } 480 #else 481 static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 482 { 483 } 484 static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, 485 int level, struct ocfs2_mask_waiter *mw, int ret) 486 { 487 } 488 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 489 { 490 } 491 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 492 { 493 } 494 #endif 495 496 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, 497 struct ocfs2_lock_res *res, 498 enum ocfs2_lock_type type, 499 struct ocfs2_lock_res_ops *ops, 500 void *priv) 501 { 502 res->l_type = type; 503 res->l_ops = ops; 504 res->l_priv = priv; 505 506 res->l_level = DLM_LOCK_IV; 507 res->l_requested = DLM_LOCK_IV; 508 res->l_blocking = DLM_LOCK_IV; 509 res->l_action = OCFS2_AST_INVALID; 510 res->l_unlock_action = OCFS2_UNLOCK_INVALID; 511 512 res->l_flags = OCFS2_LOCK_INITIALIZED; 513 514 ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug); 515 516 ocfs2_init_lock_stats(res); 517 #ifdef CONFIG_DEBUG_LOCK_ALLOC 518 if (type != OCFS2_LOCK_TYPE_OPEN) 519 lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type], 520 &lockdep_keys[type], 0); 521 else 522 res->l_lockdep_map.key = NULL; 523 #endif 524 } 525 526 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res) 527 { 528 /* This also clears out the lock status block */ 529 memset(res, 0, sizeof(struct ocfs2_lock_res)); 530 spin_lock_init(&res->l_lock); 531 init_waitqueue_head(&res->l_event); 532 INIT_LIST_HEAD(&res->l_blocked_list); 533 INIT_LIST_HEAD(&res->l_mask_waiters); 534 } 535 536 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, 537 enum ocfs2_lock_type type, 538 unsigned int generation, 539 struct inode *inode) 540 { 541 struct ocfs2_lock_res_ops *ops; 542 543 switch(type) { 544 case OCFS2_LOCK_TYPE_RW: 545 ops = &ocfs2_inode_rw_lops; 546 break; 547 case OCFS2_LOCK_TYPE_META: 548 ops = &ocfs2_inode_inode_lops; 549 break; 550 case OCFS2_LOCK_TYPE_OPEN: 551 ops = &ocfs2_inode_open_lops; 552 break; 553 default: 554 mlog_bug_on_msg(1, "type: %d\n", type); 555 ops = NULL; /* thanks, gcc */ 556 break; 557 }; 558 559 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, 560 generation, res->l_name); 561 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); 562 } 563 564 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres) 565 { 566 struct inode *inode = ocfs2_lock_res_inode(lockres); 567 568 return OCFS2_SB(inode->i_sb); 569 } 570 571 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres) 572 { 573 struct ocfs2_mem_dqinfo *info = lockres->l_priv; 574 575 return OCFS2_SB(info->dqi_gi.dqi_sb); 576 } 577 578 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres) 579 { 580 struct ocfs2_file_private *fp = lockres->l_priv; 581 582 return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb); 583 } 584 585 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) 586 { 587 __be64 inode_blkno_be; 588 589 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], 590 sizeof(__be64)); 591 592 return be64_to_cpu(inode_blkno_be); 593 } 594 595 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres) 596 { 597 struct ocfs2_dentry_lock *dl = lockres->l_priv; 598 599 return OCFS2_SB(dl->dl_inode->i_sb); 600 } 601 602 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, 603 u64 parent, struct inode *inode) 604 { 605 int len; 606 u64 inode_blkno = OCFS2_I(inode)->ip_blkno; 607 __be64 inode_blkno_be = cpu_to_be64(inode_blkno); 608 struct ocfs2_lock_res *lockres = &dl->dl_lockres; 609 610 ocfs2_lock_res_init_once(lockres); 611 612 /* 613 * Unfortunately, the standard lock naming scheme won't work 614 * here because we have two 16 byte values to use. Instead, 615 * we'll stuff the inode number as a binary value. We still 616 * want error prints to show something without garbling the 617 * display, so drop a null byte in there before the inode 618 * number. A future version of OCFS2 will likely use all 619 * binary lock names. The stringified names have been a 620 * tremendous aid in debugging, but now that the debugfs 621 * interface exists, we can mangle things there if need be. 622 * 623 * NOTE: We also drop the standard "pad" value (the total lock 624 * name size stays the same though - the last part is all 625 * zeros due to the memset in ocfs2_lock_res_init_once() 626 */ 627 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, 628 "%c%016llx", 629 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), 630 (long long)parent); 631 632 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); 633 634 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, 635 sizeof(__be64)); 636 637 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 638 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, 639 dl); 640 } 641 642 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 643 struct ocfs2_super *osb) 644 { 645 /* Superblock lockres doesn't come from a slab so we call init 646 * once on it manually. */ 647 ocfs2_lock_res_init_once(res); 648 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, 649 0, res->l_name); 650 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 651 &ocfs2_super_lops, osb); 652 } 653 654 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, 655 struct ocfs2_super *osb) 656 { 657 /* Rename lockres doesn't come from a slab so we call init 658 * once on it manually. */ 659 ocfs2_lock_res_init_once(res); 660 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); 661 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 662 &ocfs2_rename_lops, osb); 663 } 664 665 static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res, 666 struct ocfs2_super *osb) 667 { 668 /* nfs_sync lockres doesn't come from a slab so we call init 669 * once on it manually. */ 670 ocfs2_lock_res_init_once(res); 671 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name); 672 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC, 673 &ocfs2_nfs_sync_lops, osb); 674 } 675 676 static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res, 677 struct ocfs2_super *osb) 678 { 679 ocfs2_lock_res_init_once(res); 680 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name); 681 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN, 682 &ocfs2_orphan_scan_lops, osb); 683 } 684 685 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, 686 struct ocfs2_file_private *fp) 687 { 688 struct inode *inode = fp->fp_file->f_mapping->host; 689 struct ocfs2_inode_info *oi = OCFS2_I(inode); 690 691 ocfs2_lock_res_init_once(lockres); 692 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno, 693 inode->i_generation, lockres->l_name); 694 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 695 OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops, 696 fp); 697 lockres->l_flags |= OCFS2_LOCK_NOCACHE; 698 } 699 700 void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres, 701 struct ocfs2_mem_dqinfo *info) 702 { 703 ocfs2_lock_res_init_once(lockres); 704 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type, 705 0, lockres->l_name); 706 ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres, 707 OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops, 708 info); 709 } 710 711 void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres, 712 struct ocfs2_super *osb, u64 ref_blkno, 713 unsigned int generation) 714 { 715 ocfs2_lock_res_init_once(lockres); 716 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno, 717 generation, lockres->l_name); 718 ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT, 719 &ocfs2_refcount_block_lops, osb); 720 } 721 722 void ocfs2_lock_res_free(struct ocfs2_lock_res *res) 723 { 724 if (!(res->l_flags & OCFS2_LOCK_INITIALIZED)) 725 return; 726 727 ocfs2_remove_lockres_tracking(res); 728 729 mlog_bug_on_msg(!list_empty(&res->l_blocked_list), 730 "Lockres %s is on the blocked list\n", 731 res->l_name); 732 mlog_bug_on_msg(!list_empty(&res->l_mask_waiters), 733 "Lockres %s has mask waiters pending\n", 734 res->l_name); 735 mlog_bug_on_msg(spin_is_locked(&res->l_lock), 736 "Lockres %s is locked\n", 737 res->l_name); 738 mlog_bug_on_msg(res->l_ro_holders, 739 "Lockres %s has %u ro holders\n", 740 res->l_name, res->l_ro_holders); 741 mlog_bug_on_msg(res->l_ex_holders, 742 "Lockres %s has %u ex holders\n", 743 res->l_name, res->l_ex_holders); 744 745 /* Need to clear out the lock status block for the dlm */ 746 memset(&res->l_lksb, 0, sizeof(res->l_lksb)); 747 748 res->l_flags = 0UL; 749 } 750 751 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, 752 int level) 753 { 754 BUG_ON(!lockres); 755 756 switch(level) { 757 case DLM_LOCK_EX: 758 lockres->l_ex_holders++; 759 break; 760 case DLM_LOCK_PR: 761 lockres->l_ro_holders++; 762 break; 763 default: 764 BUG(); 765 } 766 } 767 768 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, 769 int level) 770 { 771 BUG_ON(!lockres); 772 773 switch(level) { 774 case DLM_LOCK_EX: 775 BUG_ON(!lockres->l_ex_holders); 776 lockres->l_ex_holders--; 777 break; 778 case DLM_LOCK_PR: 779 BUG_ON(!lockres->l_ro_holders); 780 lockres->l_ro_holders--; 781 break; 782 default: 783 BUG(); 784 } 785 } 786 787 /* WARNING: This function lives in a world where the only three lock 788 * levels are EX, PR, and NL. It *will* have to be adjusted when more 789 * lock types are added. */ 790 static inline int ocfs2_highest_compat_lock_level(int level) 791 { 792 int new_level = DLM_LOCK_EX; 793 794 if (level == DLM_LOCK_EX) 795 new_level = DLM_LOCK_NL; 796 else if (level == DLM_LOCK_PR) 797 new_level = DLM_LOCK_PR; 798 return new_level; 799 } 800 801 static void lockres_set_flags(struct ocfs2_lock_res *lockres, 802 unsigned long newflags) 803 { 804 struct ocfs2_mask_waiter *mw, *tmp; 805 806 assert_spin_locked(&lockres->l_lock); 807 808 lockres->l_flags = newflags; 809 810 list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) { 811 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 812 continue; 813 814 list_del_init(&mw->mw_item); 815 mw->mw_status = 0; 816 complete(&mw->mw_complete); 817 } 818 } 819 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or) 820 { 821 lockres_set_flags(lockres, lockres->l_flags | or); 822 } 823 static void lockres_clear_flags(struct ocfs2_lock_res *lockres, 824 unsigned long clear) 825 { 826 lockres_set_flags(lockres, lockres->l_flags & ~clear); 827 } 828 829 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres) 830 { 831 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 832 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 833 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 834 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 835 836 lockres->l_level = lockres->l_requested; 837 if (lockres->l_level <= 838 ocfs2_highest_compat_lock_level(lockres->l_blocking)) { 839 lockres->l_blocking = DLM_LOCK_NL; 840 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 841 } 842 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 843 } 844 845 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres) 846 { 847 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 848 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 849 850 /* Convert from RO to EX doesn't really need anything as our 851 * information is already up to data. Convert from NL to 852 * *anything* however should mark ourselves as needing an 853 * update */ 854 if (lockres->l_level == DLM_LOCK_NL && 855 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 856 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 857 858 lockres->l_level = lockres->l_requested; 859 860 /* 861 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing 862 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from 863 * downconverting the lock before the upconvert has fully completed. 864 * Do not prevent the dc thread from downconverting if NONBLOCK lock 865 * had already returned. 866 */ 867 if (!(lockres->l_flags & OCFS2_LOCK_NONBLOCK_FINISHED)) 868 lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 869 else 870 lockres_clear_flags(lockres, OCFS2_LOCK_NONBLOCK_FINISHED); 871 872 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 873 } 874 875 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres) 876 { 877 BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY))); 878 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 879 880 if (lockres->l_requested > DLM_LOCK_NL && 881 !(lockres->l_flags & OCFS2_LOCK_LOCAL) && 882 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 883 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 884 885 lockres->l_level = lockres->l_requested; 886 lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED); 887 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 888 } 889 890 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, 891 int level) 892 { 893 int needs_downconvert = 0; 894 895 assert_spin_locked(&lockres->l_lock); 896 897 if (level > lockres->l_blocking) { 898 /* only schedule a downconvert if we haven't already scheduled 899 * one that goes low enough to satisfy the level we're 900 * blocking. this also catches the case where we get 901 * duplicate BASTs */ 902 if (ocfs2_highest_compat_lock_level(level) < 903 ocfs2_highest_compat_lock_level(lockres->l_blocking)) 904 needs_downconvert = 1; 905 906 lockres->l_blocking = level; 907 } 908 909 mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n", 910 lockres->l_name, level, lockres->l_level, lockres->l_blocking, 911 needs_downconvert); 912 913 if (needs_downconvert) 914 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 915 mlog(0, "needs_downconvert = %d\n", needs_downconvert); 916 return needs_downconvert; 917 } 918 919 /* 920 * OCFS2_LOCK_PENDING and l_pending_gen. 921 * 922 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting 923 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock() 924 * for more details on the race. 925 * 926 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces 927 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock() 928 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear 929 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns, 930 * the caller is going to try to clear PENDING again. If nothing else is 931 * happening, __lockres_clear_pending() sees PENDING is unset and does 932 * nothing. 933 * 934 * But what if another path (eg downconvert thread) has just started a 935 * new locking action? The other path has re-set PENDING. Our path 936 * cannot clear PENDING, because that will re-open the original race 937 * window. 938 * 939 * [Example] 940 * 941 * ocfs2_meta_lock() 942 * ocfs2_cluster_lock() 943 * set BUSY 944 * set PENDING 945 * drop l_lock 946 * ocfs2_dlm_lock() 947 * ocfs2_locking_ast() ocfs2_downconvert_thread() 948 * clear PENDING ocfs2_unblock_lock() 949 * take_l_lock 950 * !BUSY 951 * ocfs2_prepare_downconvert() 952 * set BUSY 953 * set PENDING 954 * drop l_lock 955 * take l_lock 956 * clear PENDING 957 * drop l_lock 958 * <window> 959 * ocfs2_dlm_lock() 960 * 961 * So as you can see, we now have a window where l_lock is not held, 962 * PENDING is not set, and ocfs2_dlm_lock() has not been called. 963 * 964 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING 965 * set by ocfs2_prepare_downconvert(). That wasn't nice. 966 * 967 * To solve this we introduce l_pending_gen. A call to 968 * lockres_clear_pending() will only do so when it is passed a generation 969 * number that matches the lockres. lockres_set_pending() will return the 970 * current generation number. When ocfs2_cluster_lock() goes to clear 971 * PENDING, it passes the generation it got from set_pending(). In our 972 * example above, the generation numbers will *not* match. Thus, 973 * ocfs2_cluster_lock() will not clear the PENDING set by 974 * ocfs2_prepare_downconvert(). 975 */ 976 977 /* Unlocked version for ocfs2_locking_ast() */ 978 static void __lockres_clear_pending(struct ocfs2_lock_res *lockres, 979 unsigned int generation, 980 struct ocfs2_super *osb) 981 { 982 assert_spin_locked(&lockres->l_lock); 983 984 /* 985 * The ast and locking functions can race us here. The winner 986 * will clear pending, the loser will not. 987 */ 988 if (!(lockres->l_flags & OCFS2_LOCK_PENDING) || 989 (lockres->l_pending_gen != generation)) 990 return; 991 992 lockres_clear_flags(lockres, OCFS2_LOCK_PENDING); 993 lockres->l_pending_gen++; 994 995 /* 996 * The downconvert thread may have skipped us because we 997 * were PENDING. Wake it up. 998 */ 999 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 1000 ocfs2_wake_downconvert_thread(osb); 1001 } 1002 1003 /* Locked version for callers of ocfs2_dlm_lock() */ 1004 static void lockres_clear_pending(struct ocfs2_lock_res *lockres, 1005 unsigned int generation, 1006 struct ocfs2_super *osb) 1007 { 1008 unsigned long flags; 1009 1010 spin_lock_irqsave(&lockres->l_lock, flags); 1011 __lockres_clear_pending(lockres, generation, osb); 1012 spin_unlock_irqrestore(&lockres->l_lock, flags); 1013 } 1014 1015 static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres) 1016 { 1017 assert_spin_locked(&lockres->l_lock); 1018 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 1019 1020 lockres_or_flags(lockres, OCFS2_LOCK_PENDING); 1021 1022 return lockres->l_pending_gen; 1023 } 1024 1025 static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level) 1026 { 1027 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1028 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1029 int needs_downconvert; 1030 unsigned long flags; 1031 1032 BUG_ON(level <= DLM_LOCK_NL); 1033 1034 mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, " 1035 "type %s\n", lockres->l_name, level, lockres->l_level, 1036 ocfs2_lock_type_string(lockres->l_type)); 1037 1038 /* 1039 * We can skip the bast for locks which don't enable caching - 1040 * they'll be dropped at the earliest possible time anyway. 1041 */ 1042 if (lockres->l_flags & OCFS2_LOCK_NOCACHE) 1043 return; 1044 1045 spin_lock_irqsave(&lockres->l_lock, flags); 1046 needs_downconvert = ocfs2_generic_handle_bast(lockres, level); 1047 if (needs_downconvert) 1048 ocfs2_schedule_blocked_lock(osb, lockres); 1049 spin_unlock_irqrestore(&lockres->l_lock, flags); 1050 1051 wake_up(&lockres->l_event); 1052 1053 ocfs2_wake_downconvert_thread(osb); 1054 } 1055 1056 static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb) 1057 { 1058 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1059 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1060 unsigned long flags; 1061 int status; 1062 1063 spin_lock_irqsave(&lockres->l_lock, flags); 1064 1065 status = ocfs2_dlm_lock_status(&lockres->l_lksb); 1066 1067 if (status == -EAGAIN) { 1068 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1069 goto out; 1070 } 1071 1072 if (status) { 1073 mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n", 1074 lockres->l_name, status); 1075 spin_unlock_irqrestore(&lockres->l_lock, flags); 1076 return; 1077 } 1078 1079 mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, " 1080 "level %d => %d\n", lockres->l_name, lockres->l_action, 1081 lockres->l_unlock_action, lockres->l_level, lockres->l_requested); 1082 1083 switch(lockres->l_action) { 1084 case OCFS2_AST_ATTACH: 1085 ocfs2_generic_handle_attach_action(lockres); 1086 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL); 1087 break; 1088 case OCFS2_AST_CONVERT: 1089 ocfs2_generic_handle_convert_action(lockres); 1090 break; 1091 case OCFS2_AST_DOWNCONVERT: 1092 ocfs2_generic_handle_downconvert_action(lockres); 1093 break; 1094 default: 1095 mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, " 1096 "flags 0x%lx, unlock: %u\n", 1097 lockres->l_name, lockres->l_action, lockres->l_flags, 1098 lockres->l_unlock_action); 1099 BUG(); 1100 } 1101 out: 1102 /* set it to something invalid so if we get called again we 1103 * can catch it. */ 1104 lockres->l_action = OCFS2_AST_INVALID; 1105 1106 /* Did we try to cancel this lock? Clear that state */ 1107 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) 1108 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1109 1110 /* 1111 * We may have beaten the locking functions here. We certainly 1112 * know that dlm_lock() has been called :-) 1113 * Because we can't have two lock calls in flight at once, we 1114 * can use lockres->l_pending_gen. 1115 */ 1116 __lockres_clear_pending(lockres, lockres->l_pending_gen, osb); 1117 1118 wake_up(&lockres->l_event); 1119 spin_unlock_irqrestore(&lockres->l_lock, flags); 1120 } 1121 1122 static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error) 1123 { 1124 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1125 unsigned long flags; 1126 1127 mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n", 1128 lockres->l_name, lockres->l_unlock_action); 1129 1130 spin_lock_irqsave(&lockres->l_lock, flags); 1131 if (error) { 1132 mlog(ML_ERROR, "Dlm passes error %d for lock %s, " 1133 "unlock_action %d\n", error, lockres->l_name, 1134 lockres->l_unlock_action); 1135 spin_unlock_irqrestore(&lockres->l_lock, flags); 1136 return; 1137 } 1138 1139 switch(lockres->l_unlock_action) { 1140 case OCFS2_UNLOCK_CANCEL_CONVERT: 1141 mlog(0, "Cancel convert success for %s\n", lockres->l_name); 1142 lockres->l_action = OCFS2_AST_INVALID; 1143 /* Downconvert thread may have requeued this lock, we 1144 * need to wake it. */ 1145 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 1146 ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres)); 1147 break; 1148 case OCFS2_UNLOCK_DROP_LOCK: 1149 lockres->l_level = DLM_LOCK_IV; 1150 break; 1151 default: 1152 BUG(); 1153 } 1154 1155 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1156 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1157 wake_up(&lockres->l_event); 1158 spin_unlock_irqrestore(&lockres->l_lock, flags); 1159 } 1160 1161 /* 1162 * This is the filesystem locking protocol. It provides the lock handling 1163 * hooks for the underlying DLM. It has a maximum version number. 1164 * The version number allows interoperability with systems running at 1165 * the same major number and an equal or smaller minor number. 1166 * 1167 * Whenever the filesystem does new things with locks (adds or removes a 1168 * lock, orders them differently, does different things underneath a lock), 1169 * the version must be changed. The protocol is negotiated when joining 1170 * the dlm domain. A node may join the domain if its major version is 1171 * identical to all other nodes and its minor version is greater than 1172 * or equal to all other nodes. When its minor version is greater than 1173 * the other nodes, it will run at the minor version specified by the 1174 * other nodes. 1175 * 1176 * If a locking change is made that will not be compatible with older 1177 * versions, the major number must be increased and the minor version set 1178 * to zero. If a change merely adds a behavior that can be disabled when 1179 * speaking to older versions, the minor version must be increased. If a 1180 * change adds a fully backwards compatible change (eg, LVB changes that 1181 * are just ignored by older versions), the version does not need to be 1182 * updated. 1183 */ 1184 static struct ocfs2_locking_protocol lproto = { 1185 .lp_max_version = { 1186 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, 1187 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, 1188 }, 1189 .lp_lock_ast = ocfs2_locking_ast, 1190 .lp_blocking_ast = ocfs2_blocking_ast, 1191 .lp_unlock_ast = ocfs2_unlock_ast, 1192 }; 1193 1194 void ocfs2_set_locking_protocol(void) 1195 { 1196 ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version); 1197 } 1198 1199 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 1200 int convert) 1201 { 1202 unsigned long flags; 1203 1204 spin_lock_irqsave(&lockres->l_lock, flags); 1205 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1206 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 1207 if (convert) 1208 lockres->l_action = OCFS2_AST_INVALID; 1209 else 1210 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1211 spin_unlock_irqrestore(&lockres->l_lock, flags); 1212 1213 wake_up(&lockres->l_event); 1214 } 1215 1216 /* Note: If we detect another process working on the lock (i.e., 1217 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller 1218 * to do the right thing in that case. 1219 */ 1220 static int ocfs2_lock_create(struct ocfs2_super *osb, 1221 struct ocfs2_lock_res *lockres, 1222 int level, 1223 u32 dlm_flags) 1224 { 1225 int ret = 0; 1226 unsigned long flags; 1227 unsigned int gen; 1228 1229 mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level, 1230 dlm_flags); 1231 1232 spin_lock_irqsave(&lockres->l_lock, flags); 1233 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 1234 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 1235 spin_unlock_irqrestore(&lockres->l_lock, flags); 1236 goto bail; 1237 } 1238 1239 lockres->l_action = OCFS2_AST_ATTACH; 1240 lockres->l_requested = level; 1241 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1242 gen = lockres_set_pending(lockres); 1243 spin_unlock_irqrestore(&lockres->l_lock, flags); 1244 1245 ret = ocfs2_dlm_lock(osb->cconn, 1246 level, 1247 &lockres->l_lksb, 1248 dlm_flags, 1249 lockres->l_name, 1250 OCFS2_LOCK_ID_MAX_LEN - 1); 1251 lockres_clear_pending(lockres, gen, osb); 1252 if (ret) { 1253 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1254 ocfs2_recover_from_dlm_error(lockres, 1); 1255 } 1256 1257 mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name); 1258 1259 bail: 1260 return ret; 1261 } 1262 1263 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, 1264 int flag) 1265 { 1266 unsigned long flags; 1267 int ret; 1268 1269 spin_lock_irqsave(&lockres->l_lock, flags); 1270 ret = lockres->l_flags & flag; 1271 spin_unlock_irqrestore(&lockres->l_lock, flags); 1272 1273 return ret; 1274 } 1275 1276 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 1277 1278 { 1279 wait_event(lockres->l_event, 1280 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 1281 } 1282 1283 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 1284 1285 { 1286 wait_event(lockres->l_event, 1287 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 1288 } 1289 1290 /* predict what lock level we'll be dropping down to on behalf 1291 * of another node, and return true if the currently wanted 1292 * level will be compatible with it. */ 1293 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 1294 int wanted) 1295 { 1296 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 1297 1298 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking); 1299 } 1300 1301 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw) 1302 { 1303 INIT_LIST_HEAD(&mw->mw_item); 1304 init_completion(&mw->mw_complete); 1305 ocfs2_init_start_time(mw); 1306 } 1307 1308 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) 1309 { 1310 wait_for_completion(&mw->mw_complete); 1311 /* Re-arm the completion in case we want to wait on it again */ 1312 reinit_completion(&mw->mw_complete); 1313 return mw->mw_status; 1314 } 1315 1316 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres, 1317 struct ocfs2_mask_waiter *mw, 1318 unsigned long mask, 1319 unsigned long goal) 1320 { 1321 BUG_ON(!list_empty(&mw->mw_item)); 1322 1323 assert_spin_locked(&lockres->l_lock); 1324 1325 list_add_tail(&mw->mw_item, &lockres->l_mask_waiters); 1326 mw->mw_mask = mask; 1327 mw->mw_goal = goal; 1328 } 1329 1330 /* returns 0 if the mw that was removed was already satisfied, -EBUSY 1331 * if the mask still hadn't reached its goal */ 1332 static int __lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1333 struct ocfs2_mask_waiter *mw) 1334 { 1335 int ret = 0; 1336 1337 assert_spin_locked(&lockres->l_lock); 1338 if (!list_empty(&mw->mw_item)) { 1339 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 1340 ret = -EBUSY; 1341 1342 list_del_init(&mw->mw_item); 1343 init_completion(&mw->mw_complete); 1344 } 1345 1346 return ret; 1347 } 1348 1349 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1350 struct ocfs2_mask_waiter *mw) 1351 { 1352 unsigned long flags; 1353 int ret = 0; 1354 1355 spin_lock_irqsave(&lockres->l_lock, flags); 1356 ret = __lockres_remove_mask_waiter(lockres, mw); 1357 spin_unlock_irqrestore(&lockres->l_lock, flags); 1358 1359 return ret; 1360 1361 } 1362 1363 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw, 1364 struct ocfs2_lock_res *lockres) 1365 { 1366 int ret; 1367 1368 ret = wait_for_completion_interruptible(&mw->mw_complete); 1369 if (ret) 1370 lockres_remove_mask_waiter(lockres, mw); 1371 else 1372 ret = mw->mw_status; 1373 /* Re-arm the completion in case we want to wait on it again */ 1374 reinit_completion(&mw->mw_complete); 1375 return ret; 1376 } 1377 1378 static int __ocfs2_cluster_lock(struct ocfs2_super *osb, 1379 struct ocfs2_lock_res *lockres, 1380 int level, 1381 u32 lkm_flags, 1382 int arg_flags, 1383 int l_subclass, 1384 unsigned long caller_ip) 1385 { 1386 struct ocfs2_mask_waiter mw; 1387 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); 1388 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 1389 unsigned long flags; 1390 unsigned int gen; 1391 int noqueue_attempted = 0; 1392 int dlm_locked = 0; 1393 1394 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) { 1395 mlog_errno(-EINVAL); 1396 return -EINVAL; 1397 } 1398 1399 ocfs2_init_mask_waiter(&mw); 1400 1401 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 1402 lkm_flags |= DLM_LKF_VALBLK; 1403 1404 again: 1405 wait = 0; 1406 1407 spin_lock_irqsave(&lockres->l_lock, flags); 1408 1409 if (catch_signals && signal_pending(current)) { 1410 ret = -ERESTARTSYS; 1411 goto unlock; 1412 } 1413 1414 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, 1415 "Cluster lock called on freeing lockres %s! flags " 1416 "0x%lx\n", lockres->l_name, lockres->l_flags); 1417 1418 /* We only compare against the currently granted level 1419 * here. If the lock is blocked waiting on a downconvert, 1420 * we'll get caught below. */ 1421 if (lockres->l_flags & OCFS2_LOCK_BUSY && 1422 level > lockres->l_level) { 1423 /* is someone sitting in dlm_lock? If so, wait on 1424 * them. */ 1425 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1426 wait = 1; 1427 goto unlock; 1428 } 1429 1430 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) { 1431 /* 1432 * We've upconverted. If the lock now has a level we can 1433 * work with, we take it. If, however, the lock is not at the 1434 * required level, we go thru the full cycle. One way this could 1435 * happen is if a process requesting an upconvert to PR is 1436 * closely followed by another requesting upconvert to an EX. 1437 * If the process requesting EX lands here, we want it to 1438 * continue attempting to upconvert and let the process 1439 * requesting PR take the lock. 1440 * If multiple processes request upconvert to PR, the first one 1441 * here will take the lock. The others will have to go thru the 1442 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending 1443 * downconvert request. 1444 */ 1445 if (level <= lockres->l_level) 1446 goto update_holders; 1447 } 1448 1449 if (lockres->l_flags & OCFS2_LOCK_BLOCKED && 1450 !ocfs2_may_continue_on_blocked_lock(lockres, level)) { 1451 /* is the lock is currently blocked on behalf of 1452 * another node */ 1453 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0); 1454 wait = 1; 1455 goto unlock; 1456 } 1457 1458 if (level > lockres->l_level) { 1459 if (noqueue_attempted > 0) { 1460 ret = -EAGAIN; 1461 goto unlock; 1462 } 1463 if (lkm_flags & DLM_LKF_NOQUEUE) 1464 noqueue_attempted = 1; 1465 1466 if (lockres->l_action != OCFS2_AST_INVALID) 1467 mlog(ML_ERROR, "lockres %s has action %u pending\n", 1468 lockres->l_name, lockres->l_action); 1469 1470 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1471 lockres->l_action = OCFS2_AST_ATTACH; 1472 lkm_flags &= ~DLM_LKF_CONVERT; 1473 } else { 1474 lockres->l_action = OCFS2_AST_CONVERT; 1475 lkm_flags |= DLM_LKF_CONVERT; 1476 } 1477 1478 lockres->l_requested = level; 1479 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1480 gen = lockres_set_pending(lockres); 1481 spin_unlock_irqrestore(&lockres->l_lock, flags); 1482 1483 BUG_ON(level == DLM_LOCK_IV); 1484 BUG_ON(level == DLM_LOCK_NL); 1485 1486 mlog(ML_BASTS, "lockres %s, convert from %d to %d\n", 1487 lockres->l_name, lockres->l_level, level); 1488 1489 /* call dlm_lock to upgrade lock now */ 1490 ret = ocfs2_dlm_lock(osb->cconn, 1491 level, 1492 &lockres->l_lksb, 1493 lkm_flags, 1494 lockres->l_name, 1495 OCFS2_LOCK_ID_MAX_LEN - 1); 1496 lockres_clear_pending(lockres, gen, osb); 1497 if (ret) { 1498 if (!(lkm_flags & DLM_LKF_NOQUEUE) || 1499 (ret != -EAGAIN)) { 1500 ocfs2_log_dlm_error("ocfs2_dlm_lock", 1501 ret, lockres); 1502 } 1503 ocfs2_recover_from_dlm_error(lockres, 1); 1504 goto out; 1505 } 1506 dlm_locked = 1; 1507 1508 mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n", 1509 lockres->l_name); 1510 1511 /* At this point we've gone inside the dlm and need to 1512 * complete our work regardless. */ 1513 catch_signals = 0; 1514 1515 /* wait for busy to clear and carry on */ 1516 goto again; 1517 } 1518 1519 update_holders: 1520 /* Ok, if we get here then we're good to go. */ 1521 ocfs2_inc_holders(lockres, level); 1522 1523 ret = 0; 1524 unlock: 1525 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 1526 1527 spin_unlock_irqrestore(&lockres->l_lock, flags); 1528 out: 1529 /* 1530 * This is helping work around a lock inversion between the page lock 1531 * and dlm locks. One path holds the page lock while calling aops 1532 * which block acquiring dlm locks. The voting thread holds dlm 1533 * locks while acquiring page locks while down converting data locks. 1534 * This block is helping an aop path notice the inversion and back 1535 * off to unlock its page lock before trying the dlm lock again. 1536 */ 1537 if (wait && arg_flags & OCFS2_LOCK_NONBLOCK && 1538 mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) { 1539 wait = 0; 1540 spin_lock_irqsave(&lockres->l_lock, flags); 1541 if (__lockres_remove_mask_waiter(lockres, &mw)) { 1542 if (dlm_locked) 1543 lockres_or_flags(lockres, 1544 OCFS2_LOCK_NONBLOCK_FINISHED); 1545 spin_unlock_irqrestore(&lockres->l_lock, flags); 1546 ret = -EAGAIN; 1547 } else { 1548 spin_unlock_irqrestore(&lockres->l_lock, flags); 1549 goto again; 1550 } 1551 } 1552 if (wait) { 1553 ret = ocfs2_wait_for_mask(&mw); 1554 if (ret == 0) 1555 goto again; 1556 mlog_errno(ret); 1557 } 1558 ocfs2_update_lock_stats(lockres, level, &mw, ret); 1559 1560 #ifdef CONFIG_DEBUG_LOCK_ALLOC 1561 if (!ret && lockres->l_lockdep_map.key != NULL) { 1562 if (level == DLM_LOCK_PR) 1563 rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass, 1564 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE), 1565 caller_ip); 1566 else 1567 rwsem_acquire(&lockres->l_lockdep_map, l_subclass, 1568 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE), 1569 caller_ip); 1570 } 1571 #endif 1572 return ret; 1573 } 1574 1575 static inline int ocfs2_cluster_lock(struct ocfs2_super *osb, 1576 struct ocfs2_lock_res *lockres, 1577 int level, 1578 u32 lkm_flags, 1579 int arg_flags) 1580 { 1581 return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags, 1582 0, _RET_IP_); 1583 } 1584 1585 1586 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb, 1587 struct ocfs2_lock_res *lockres, 1588 int level, 1589 unsigned long caller_ip) 1590 { 1591 unsigned long flags; 1592 1593 spin_lock_irqsave(&lockres->l_lock, flags); 1594 ocfs2_dec_holders(lockres, level); 1595 ocfs2_downconvert_on_unlock(osb, lockres); 1596 spin_unlock_irqrestore(&lockres->l_lock, flags); 1597 #ifdef CONFIG_DEBUG_LOCK_ALLOC 1598 if (lockres->l_lockdep_map.key != NULL) 1599 rwsem_release(&lockres->l_lockdep_map, 1, caller_ip); 1600 #endif 1601 } 1602 1603 static int ocfs2_create_new_lock(struct ocfs2_super *osb, 1604 struct ocfs2_lock_res *lockres, 1605 int ex, 1606 int local) 1607 { 1608 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1609 unsigned long flags; 1610 u32 lkm_flags = local ? DLM_LKF_LOCAL : 0; 1611 1612 spin_lock_irqsave(&lockres->l_lock, flags); 1613 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 1614 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL); 1615 spin_unlock_irqrestore(&lockres->l_lock, flags); 1616 1617 return ocfs2_lock_create(osb, lockres, level, lkm_flags); 1618 } 1619 1620 /* Grants us an EX lock on the data and metadata resources, skipping 1621 * the normal cluster directory lookup. Use this ONLY on newly created 1622 * inodes which other nodes can't possibly see, and which haven't been 1623 * hashed in the inode hash yet. This can give us a good performance 1624 * increase as it'll skip the network broadcast normally associated 1625 * with creating a new lock resource. */ 1626 int ocfs2_create_new_inode_locks(struct inode *inode) 1627 { 1628 int ret; 1629 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1630 1631 BUG_ON(!inode); 1632 BUG_ON(!ocfs2_inode_is_new(inode)); 1633 1634 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); 1635 1636 /* NOTE: That we don't increment any of the holder counts, nor 1637 * do we add anything to a journal handle. Since this is 1638 * supposed to be a new inode which the cluster doesn't know 1639 * about yet, there is no need to. As far as the LVB handling 1640 * is concerned, this is basically like acquiring an EX lock 1641 * on a resource which has an invalid one -- we'll set it 1642 * valid when we release the EX. */ 1643 1644 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1); 1645 if (ret) { 1646 mlog_errno(ret); 1647 goto bail; 1648 } 1649 1650 /* 1651 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they 1652 * don't use a generation in their lock names. 1653 */ 1654 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); 1655 if (ret) { 1656 mlog_errno(ret); 1657 goto bail; 1658 } 1659 1660 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0); 1661 if (ret) { 1662 mlog_errno(ret); 1663 goto bail; 1664 } 1665 1666 bail: 1667 return ret; 1668 } 1669 1670 int ocfs2_rw_lock(struct inode *inode, int write) 1671 { 1672 int status, level; 1673 struct ocfs2_lock_res *lockres; 1674 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1675 1676 BUG_ON(!inode); 1677 1678 mlog(0, "inode %llu take %s RW lock\n", 1679 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1680 write ? "EXMODE" : "PRMODE"); 1681 1682 if (ocfs2_mount_local(osb)) 1683 return 0; 1684 1685 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1686 1687 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1688 1689 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0, 1690 0); 1691 if (status < 0) 1692 mlog_errno(status); 1693 1694 return status; 1695 } 1696 1697 void ocfs2_rw_unlock(struct inode *inode, int write) 1698 { 1699 int level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1700 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1701 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1702 1703 mlog(0, "inode %llu drop %s RW lock\n", 1704 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1705 write ? "EXMODE" : "PRMODE"); 1706 1707 if (!ocfs2_mount_local(osb)) 1708 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 1709 } 1710 1711 /* 1712 * ocfs2_open_lock always get PR mode lock. 1713 */ 1714 int ocfs2_open_lock(struct inode *inode) 1715 { 1716 int status = 0; 1717 struct ocfs2_lock_res *lockres; 1718 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1719 1720 BUG_ON(!inode); 1721 1722 mlog(0, "inode %llu take PRMODE open lock\n", 1723 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1724 1725 if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb)) 1726 goto out; 1727 1728 lockres = &OCFS2_I(inode)->ip_open_lockres; 1729 1730 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1731 DLM_LOCK_PR, 0, 0); 1732 if (status < 0) 1733 mlog_errno(status); 1734 1735 out: 1736 return status; 1737 } 1738 1739 int ocfs2_try_open_lock(struct inode *inode, int write) 1740 { 1741 int status = 0, level; 1742 struct ocfs2_lock_res *lockres; 1743 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1744 1745 BUG_ON(!inode); 1746 1747 mlog(0, "inode %llu try to take %s open lock\n", 1748 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1749 write ? "EXMODE" : "PRMODE"); 1750 1751 if (ocfs2_is_hard_readonly(osb)) { 1752 if (write) 1753 status = -EROFS; 1754 goto out; 1755 } 1756 1757 if (ocfs2_mount_local(osb)) 1758 goto out; 1759 1760 lockres = &OCFS2_I(inode)->ip_open_lockres; 1761 1762 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1763 1764 /* 1765 * The file system may already holding a PRMODE/EXMODE open lock. 1766 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on 1767 * other nodes and the -EAGAIN will indicate to the caller that 1768 * this inode is still in use. 1769 */ 1770 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1771 level, DLM_LKF_NOQUEUE, 0); 1772 1773 out: 1774 return status; 1775 } 1776 1777 /* 1778 * ocfs2_open_unlock unlock PR and EX mode open locks. 1779 */ 1780 void ocfs2_open_unlock(struct inode *inode) 1781 { 1782 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres; 1783 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1784 1785 mlog(0, "inode %llu drop open lock\n", 1786 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1787 1788 if (ocfs2_mount_local(osb)) 1789 goto out; 1790 1791 if(lockres->l_ro_holders) 1792 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1793 DLM_LOCK_PR); 1794 if(lockres->l_ex_holders) 1795 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1796 DLM_LOCK_EX); 1797 1798 out: 1799 return; 1800 } 1801 1802 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres, 1803 int level) 1804 { 1805 int ret; 1806 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1807 unsigned long flags; 1808 struct ocfs2_mask_waiter mw; 1809 1810 ocfs2_init_mask_waiter(&mw); 1811 1812 retry_cancel: 1813 spin_lock_irqsave(&lockres->l_lock, flags); 1814 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 1815 ret = ocfs2_prepare_cancel_convert(osb, lockres); 1816 if (ret) { 1817 spin_unlock_irqrestore(&lockres->l_lock, flags); 1818 ret = ocfs2_cancel_convert(osb, lockres); 1819 if (ret < 0) { 1820 mlog_errno(ret); 1821 goto out; 1822 } 1823 goto retry_cancel; 1824 } 1825 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1826 spin_unlock_irqrestore(&lockres->l_lock, flags); 1827 1828 ocfs2_wait_for_mask(&mw); 1829 goto retry_cancel; 1830 } 1831 1832 ret = -ERESTARTSYS; 1833 /* 1834 * We may still have gotten the lock, in which case there's no 1835 * point to restarting the syscall. 1836 */ 1837 if (lockres->l_level == level) 1838 ret = 0; 1839 1840 mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret, 1841 lockres->l_flags, lockres->l_level, lockres->l_action); 1842 1843 spin_unlock_irqrestore(&lockres->l_lock, flags); 1844 1845 out: 1846 return ret; 1847 } 1848 1849 /* 1850 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of 1851 * flock() calls. The locking approach this requires is sufficiently 1852 * different from all other cluster lock types that we implement a 1853 * separate path to the "low-level" dlm calls. In particular: 1854 * 1855 * - No optimization of lock levels is done - we take at exactly 1856 * what's been requested. 1857 * 1858 * - No lock caching is employed. We immediately downconvert to 1859 * no-lock at unlock time. This also means flock locks never go on 1860 * the blocking list). 1861 * 1862 * - Since userspace can trivially deadlock itself with flock, we make 1863 * sure to allow cancellation of a misbehaving applications flock() 1864 * request. 1865 * 1866 * - Access to any flock lockres doesn't require concurrency, so we 1867 * can simplify the code by requiring the caller to guarantee 1868 * serialization of dlmglue flock calls. 1869 */ 1870 int ocfs2_file_lock(struct file *file, int ex, int trylock) 1871 { 1872 int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1873 unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0; 1874 unsigned long flags; 1875 struct ocfs2_file_private *fp = file->private_data; 1876 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1877 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1878 struct ocfs2_mask_waiter mw; 1879 1880 ocfs2_init_mask_waiter(&mw); 1881 1882 if ((lockres->l_flags & OCFS2_LOCK_BUSY) || 1883 (lockres->l_level > DLM_LOCK_NL)) { 1884 mlog(ML_ERROR, 1885 "File lock \"%s\" has busy or locked state: flags: 0x%lx, " 1886 "level: %u\n", lockres->l_name, lockres->l_flags, 1887 lockres->l_level); 1888 return -EINVAL; 1889 } 1890 1891 spin_lock_irqsave(&lockres->l_lock, flags); 1892 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1893 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1894 spin_unlock_irqrestore(&lockres->l_lock, flags); 1895 1896 /* 1897 * Get the lock at NLMODE to start - that way we 1898 * can cancel the upconvert request if need be. 1899 */ 1900 ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0); 1901 if (ret < 0) { 1902 mlog_errno(ret); 1903 goto out; 1904 } 1905 1906 ret = ocfs2_wait_for_mask(&mw); 1907 if (ret) { 1908 mlog_errno(ret); 1909 goto out; 1910 } 1911 spin_lock_irqsave(&lockres->l_lock, flags); 1912 } 1913 1914 lockres->l_action = OCFS2_AST_CONVERT; 1915 lkm_flags |= DLM_LKF_CONVERT; 1916 lockres->l_requested = level; 1917 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1918 1919 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1920 spin_unlock_irqrestore(&lockres->l_lock, flags); 1921 1922 ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags, 1923 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1); 1924 if (ret) { 1925 if (!trylock || (ret != -EAGAIN)) { 1926 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1927 ret = -EINVAL; 1928 } 1929 1930 ocfs2_recover_from_dlm_error(lockres, 1); 1931 lockres_remove_mask_waiter(lockres, &mw); 1932 goto out; 1933 } 1934 1935 ret = ocfs2_wait_for_mask_interruptible(&mw, lockres); 1936 if (ret == -ERESTARTSYS) { 1937 /* 1938 * Userspace can cause deadlock itself with 1939 * flock(). Current behavior locally is to allow the 1940 * deadlock, but abort the system call if a signal is 1941 * received. We follow this example, otherwise a 1942 * poorly written program could sit in kernel until 1943 * reboot. 1944 * 1945 * Handling this is a bit more complicated for Ocfs2 1946 * though. We can't exit this function with an 1947 * outstanding lock request, so a cancel convert is 1948 * required. We intentionally overwrite 'ret' - if the 1949 * cancel fails and the lock was granted, it's easier 1950 * to just bubble success back up to the user. 1951 */ 1952 ret = ocfs2_flock_handle_signal(lockres, level); 1953 } else if (!ret && (level > lockres->l_level)) { 1954 /* Trylock failed asynchronously */ 1955 BUG_ON(!trylock); 1956 ret = -EAGAIN; 1957 } 1958 1959 out: 1960 1961 mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n", 1962 lockres->l_name, ex, trylock, ret); 1963 return ret; 1964 } 1965 1966 void ocfs2_file_unlock(struct file *file) 1967 { 1968 int ret; 1969 unsigned int gen; 1970 unsigned long flags; 1971 struct ocfs2_file_private *fp = file->private_data; 1972 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1973 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1974 struct ocfs2_mask_waiter mw; 1975 1976 ocfs2_init_mask_waiter(&mw); 1977 1978 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) 1979 return; 1980 1981 if (lockres->l_level == DLM_LOCK_NL) 1982 return; 1983 1984 mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n", 1985 lockres->l_name, lockres->l_flags, lockres->l_level, 1986 lockres->l_action); 1987 1988 spin_lock_irqsave(&lockres->l_lock, flags); 1989 /* 1990 * Fake a blocking ast for the downconvert code. 1991 */ 1992 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 1993 lockres->l_blocking = DLM_LOCK_EX; 1994 1995 gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL); 1996 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1997 spin_unlock_irqrestore(&lockres->l_lock, flags); 1998 1999 ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen); 2000 if (ret) { 2001 mlog_errno(ret); 2002 return; 2003 } 2004 2005 ret = ocfs2_wait_for_mask(&mw); 2006 if (ret) 2007 mlog_errno(ret); 2008 } 2009 2010 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 2011 struct ocfs2_lock_res *lockres) 2012 { 2013 int kick = 0; 2014 2015 /* If we know that another node is waiting on our lock, kick 2016 * the downconvert thread * pre-emptively when we reach a release 2017 * condition. */ 2018 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 2019 switch(lockres->l_blocking) { 2020 case DLM_LOCK_EX: 2021 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 2022 kick = 1; 2023 break; 2024 case DLM_LOCK_PR: 2025 if (!lockres->l_ex_holders) 2026 kick = 1; 2027 break; 2028 default: 2029 BUG(); 2030 } 2031 } 2032 2033 if (kick) 2034 ocfs2_wake_downconvert_thread(osb); 2035 } 2036 2037 #define OCFS2_SEC_BITS 34 2038 #define OCFS2_SEC_SHIFT (64 - 34) 2039 #define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1) 2040 2041 /* LVB only has room for 64 bits of time here so we pack it for 2042 * now. */ 2043 static u64 ocfs2_pack_timespec(struct timespec *spec) 2044 { 2045 u64 res; 2046 u64 sec = spec->tv_sec; 2047 u32 nsec = spec->tv_nsec; 2048 2049 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK); 2050 2051 return res; 2052 } 2053 2054 /* Call this with the lockres locked. I am reasonably sure we don't 2055 * need ip_lock in this function as anyone who would be changing those 2056 * values is supposed to be blocked in ocfs2_inode_lock right now. */ 2057 static void __ocfs2_stuff_meta_lvb(struct inode *inode) 2058 { 2059 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2060 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2061 struct ocfs2_meta_lvb *lvb; 2062 2063 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2064 2065 /* 2066 * Invalidate the LVB of a deleted inode - this way other 2067 * nodes are forced to go to disk and discover the new inode 2068 * status. 2069 */ 2070 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2071 lvb->lvb_version = 0; 2072 goto out; 2073 } 2074 2075 lvb->lvb_version = OCFS2_LVB_VERSION; 2076 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 2077 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 2078 lvb->lvb_iuid = cpu_to_be32(i_uid_read(inode)); 2079 lvb->lvb_igid = cpu_to_be32(i_gid_read(inode)); 2080 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 2081 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 2082 lvb->lvb_iatime_packed = 2083 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime)); 2084 lvb->lvb_ictime_packed = 2085 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime)); 2086 lvb->lvb_imtime_packed = 2087 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime)); 2088 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 2089 lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features); 2090 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 2091 2092 out: 2093 mlog_meta_lvb(0, lockres); 2094 } 2095 2096 static void ocfs2_unpack_timespec(struct timespec *spec, 2097 u64 packed_time) 2098 { 2099 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 2100 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 2101 } 2102 2103 static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 2104 { 2105 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2106 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2107 struct ocfs2_meta_lvb *lvb; 2108 2109 mlog_meta_lvb(0, lockres); 2110 2111 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2112 2113 /* We're safe here without the lockres lock... */ 2114 spin_lock(&oi->ip_lock); 2115 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 2116 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 2117 2118 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 2119 oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures); 2120 ocfs2_set_inode_flags(inode); 2121 2122 /* fast-symlinks are a special case */ 2123 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 2124 inode->i_blocks = 0; 2125 else 2126 inode->i_blocks = ocfs2_inode_sector_count(inode); 2127 2128 i_uid_write(inode, be32_to_cpu(lvb->lvb_iuid)); 2129 i_gid_write(inode, be32_to_cpu(lvb->lvb_igid)); 2130 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 2131 set_nlink(inode, be16_to_cpu(lvb->lvb_inlink)); 2132 ocfs2_unpack_timespec(&inode->i_atime, 2133 be64_to_cpu(lvb->lvb_iatime_packed)); 2134 ocfs2_unpack_timespec(&inode->i_mtime, 2135 be64_to_cpu(lvb->lvb_imtime_packed)); 2136 ocfs2_unpack_timespec(&inode->i_ctime, 2137 be64_to_cpu(lvb->lvb_ictime_packed)); 2138 spin_unlock(&oi->ip_lock); 2139 } 2140 2141 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 2142 struct ocfs2_lock_res *lockres) 2143 { 2144 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2145 2146 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) 2147 && lvb->lvb_version == OCFS2_LVB_VERSION 2148 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 2149 return 1; 2150 return 0; 2151 } 2152 2153 /* Determine whether a lock resource needs to be refreshed, and 2154 * arbitrate who gets to refresh it. 2155 * 2156 * 0 means no refresh needed. 2157 * 2158 * > 0 means you need to refresh this and you MUST call 2159 * ocfs2_complete_lock_res_refresh afterwards. */ 2160 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres) 2161 { 2162 unsigned long flags; 2163 int status = 0; 2164 2165 refresh_check: 2166 spin_lock_irqsave(&lockres->l_lock, flags); 2167 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 2168 spin_unlock_irqrestore(&lockres->l_lock, flags); 2169 goto bail; 2170 } 2171 2172 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 2173 spin_unlock_irqrestore(&lockres->l_lock, flags); 2174 2175 ocfs2_wait_on_refreshing_lock(lockres); 2176 goto refresh_check; 2177 } 2178 2179 /* Ok, I'll be the one to refresh this lock. */ 2180 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING); 2181 spin_unlock_irqrestore(&lockres->l_lock, flags); 2182 2183 status = 1; 2184 bail: 2185 mlog(0, "status %d\n", status); 2186 return status; 2187 } 2188 2189 /* If status is non zero, I'll mark it as not being in refresh 2190 * anymroe, but i won't clear the needs refresh flag. */ 2191 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres, 2192 int status) 2193 { 2194 unsigned long flags; 2195 2196 spin_lock_irqsave(&lockres->l_lock, flags); 2197 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING); 2198 if (!status) 2199 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 2200 spin_unlock_irqrestore(&lockres->l_lock, flags); 2201 2202 wake_up(&lockres->l_event); 2203 } 2204 2205 /* may or may not return a bh if it went to disk. */ 2206 static int ocfs2_inode_lock_update(struct inode *inode, 2207 struct buffer_head **bh) 2208 { 2209 int status = 0; 2210 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2211 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2212 struct ocfs2_dinode *fe; 2213 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2214 2215 if (ocfs2_mount_local(osb)) 2216 goto bail; 2217 2218 spin_lock(&oi->ip_lock); 2219 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2220 mlog(0, "Orphaned inode %llu was deleted while we " 2221 "were waiting on a lock. ip_flags = 0x%x\n", 2222 (unsigned long long)oi->ip_blkno, oi->ip_flags); 2223 spin_unlock(&oi->ip_lock); 2224 status = -ENOENT; 2225 goto bail; 2226 } 2227 spin_unlock(&oi->ip_lock); 2228 2229 if (!ocfs2_should_refresh_lock_res(lockres)) 2230 goto bail; 2231 2232 /* This will discard any caching information we might have had 2233 * for the inode metadata. */ 2234 ocfs2_metadata_cache_purge(INODE_CACHE(inode)); 2235 2236 ocfs2_extent_map_trunc(inode, 0); 2237 2238 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) { 2239 mlog(0, "Trusting LVB on inode %llu\n", 2240 (unsigned long long)oi->ip_blkno); 2241 ocfs2_refresh_inode_from_lvb(inode); 2242 } else { 2243 /* Boo, we have to go to disk. */ 2244 /* read bh, cast, ocfs2_refresh_inode */ 2245 status = ocfs2_read_inode_block(inode, bh); 2246 if (status < 0) { 2247 mlog_errno(status); 2248 goto bail_refresh; 2249 } 2250 fe = (struct ocfs2_dinode *) (*bh)->b_data; 2251 2252 /* This is a good chance to make sure we're not 2253 * locking an invalid object. ocfs2_read_inode_block() 2254 * already checked that the inode block is sane. 2255 * 2256 * We bug on a stale inode here because we checked 2257 * above whether it was wiped from disk. The wiping 2258 * node provides a guarantee that we receive that 2259 * message and can mark the inode before dropping any 2260 * locks associated with it. */ 2261 mlog_bug_on_msg(inode->i_generation != 2262 le32_to_cpu(fe->i_generation), 2263 "Invalid dinode %llu disk generation: %u " 2264 "inode->i_generation: %u\n", 2265 (unsigned long long)oi->ip_blkno, 2266 le32_to_cpu(fe->i_generation), 2267 inode->i_generation); 2268 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 2269 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 2270 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 2271 (unsigned long long)oi->ip_blkno, 2272 (unsigned long long)le64_to_cpu(fe->i_dtime), 2273 le32_to_cpu(fe->i_flags)); 2274 2275 ocfs2_refresh_inode(inode, fe); 2276 ocfs2_track_lock_refresh(lockres); 2277 } 2278 2279 status = 0; 2280 bail_refresh: 2281 ocfs2_complete_lock_res_refresh(lockres, status); 2282 bail: 2283 return status; 2284 } 2285 2286 static int ocfs2_assign_bh(struct inode *inode, 2287 struct buffer_head **ret_bh, 2288 struct buffer_head *passed_bh) 2289 { 2290 int status; 2291 2292 if (passed_bh) { 2293 /* Ok, the update went to disk for us, use the 2294 * returned bh. */ 2295 *ret_bh = passed_bh; 2296 get_bh(*ret_bh); 2297 2298 return 0; 2299 } 2300 2301 status = ocfs2_read_inode_block(inode, ret_bh); 2302 if (status < 0) 2303 mlog_errno(status); 2304 2305 return status; 2306 } 2307 2308 /* 2309 * returns < 0 error if the callback will never be called, otherwise 2310 * the result of the lock will be communicated via the callback. 2311 */ 2312 int ocfs2_inode_lock_full_nested(struct inode *inode, 2313 struct buffer_head **ret_bh, 2314 int ex, 2315 int arg_flags, 2316 int subclass) 2317 { 2318 int status, level, acquired; 2319 u32 dlm_flags; 2320 struct ocfs2_lock_res *lockres = NULL; 2321 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2322 struct buffer_head *local_bh = NULL; 2323 2324 BUG_ON(!inode); 2325 2326 mlog(0, "inode %llu, take %s META lock\n", 2327 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2328 ex ? "EXMODE" : "PRMODE"); 2329 2330 status = 0; 2331 acquired = 0; 2332 /* We'll allow faking a readonly metadata lock for 2333 * rodevices. */ 2334 if (ocfs2_is_hard_readonly(osb)) { 2335 if (ex) 2336 status = -EROFS; 2337 goto getbh; 2338 } 2339 2340 if (ocfs2_mount_local(osb)) 2341 goto local; 2342 2343 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2344 ocfs2_wait_for_recovery(osb); 2345 2346 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2347 level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2348 dlm_flags = 0; 2349 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 2350 dlm_flags |= DLM_LKF_NOQUEUE; 2351 2352 status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags, 2353 arg_flags, subclass, _RET_IP_); 2354 if (status < 0) { 2355 if (status != -EAGAIN) 2356 mlog_errno(status); 2357 goto bail; 2358 } 2359 2360 /* Notify the error cleanup path to drop the cluster lock. */ 2361 acquired = 1; 2362 2363 /* We wait twice because a node may have died while we were in 2364 * the lower dlm layers. The second time though, we've 2365 * committed to owning this lock so we don't allow signals to 2366 * abort the operation. */ 2367 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2368 ocfs2_wait_for_recovery(osb); 2369 2370 local: 2371 /* 2372 * We only see this flag if we're being called from 2373 * ocfs2_read_locked_inode(). It means we're locking an inode 2374 * which hasn't been populated yet, so clear the refresh flag 2375 * and let the caller handle it. 2376 */ 2377 if (inode->i_state & I_NEW) { 2378 status = 0; 2379 if (lockres) 2380 ocfs2_complete_lock_res_refresh(lockres, 0); 2381 goto bail; 2382 } 2383 2384 /* This is fun. The caller may want a bh back, or it may 2385 * not. ocfs2_inode_lock_update definitely wants one in, but 2386 * may or may not read one, depending on what's in the 2387 * LVB. The result of all of this is that we've *only* gone to 2388 * disk if we have to, so the complexity is worthwhile. */ 2389 status = ocfs2_inode_lock_update(inode, &local_bh); 2390 if (status < 0) { 2391 if (status != -ENOENT) 2392 mlog_errno(status); 2393 goto bail; 2394 } 2395 getbh: 2396 if (ret_bh) { 2397 status = ocfs2_assign_bh(inode, ret_bh, local_bh); 2398 if (status < 0) { 2399 mlog_errno(status); 2400 goto bail; 2401 } 2402 } 2403 2404 bail: 2405 if (status < 0) { 2406 if (ret_bh && (*ret_bh)) { 2407 brelse(*ret_bh); 2408 *ret_bh = NULL; 2409 } 2410 if (acquired) 2411 ocfs2_inode_unlock(inode, ex); 2412 } 2413 2414 if (local_bh) 2415 brelse(local_bh); 2416 2417 return status; 2418 } 2419 2420 /* 2421 * This is working around a lock inversion between tasks acquiring DLM 2422 * locks while holding a page lock and the downconvert thread which 2423 * blocks dlm lock acquiry while acquiring page locks. 2424 * 2425 * ** These _with_page variantes are only intended to be called from aop 2426 * methods that hold page locks and return a very specific *positive* error 2427 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 2428 * 2429 * The DLM is called such that it returns -EAGAIN if it would have 2430 * blocked waiting for the downconvert thread. In that case we unlock 2431 * our page so the downconvert thread can make progress. Once we've 2432 * done this we have to return AOP_TRUNCATED_PAGE so the aop method 2433 * that called us can bubble that back up into the VFS who will then 2434 * immediately retry the aop call. 2435 * 2436 * We do a blocking lock and immediate unlock before returning, though, so that 2437 * the lock has a great chance of being cached on this node by the time the VFS 2438 * calls back to retry the aop. This has a potential to livelock as nodes 2439 * ping locks back and forth, but that's a risk we're willing to take to avoid 2440 * the lock inversion simply. 2441 */ 2442 int ocfs2_inode_lock_with_page(struct inode *inode, 2443 struct buffer_head **ret_bh, 2444 int ex, 2445 struct page *page) 2446 { 2447 int ret; 2448 2449 ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK); 2450 if (ret == -EAGAIN) { 2451 unlock_page(page); 2452 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0) 2453 ocfs2_inode_unlock(inode, ex); 2454 ret = AOP_TRUNCATED_PAGE; 2455 } 2456 2457 return ret; 2458 } 2459 2460 int ocfs2_inode_lock_atime(struct inode *inode, 2461 struct vfsmount *vfsmnt, 2462 int *level) 2463 { 2464 int ret; 2465 2466 ret = ocfs2_inode_lock(inode, NULL, 0); 2467 if (ret < 0) { 2468 mlog_errno(ret); 2469 return ret; 2470 } 2471 2472 /* 2473 * If we should update atime, we will get EX lock, 2474 * otherwise we just get PR lock. 2475 */ 2476 if (ocfs2_should_update_atime(inode, vfsmnt)) { 2477 struct buffer_head *bh = NULL; 2478 2479 ocfs2_inode_unlock(inode, 0); 2480 ret = ocfs2_inode_lock(inode, &bh, 1); 2481 if (ret < 0) { 2482 mlog_errno(ret); 2483 return ret; 2484 } 2485 *level = 1; 2486 if (ocfs2_should_update_atime(inode, vfsmnt)) 2487 ocfs2_update_inode_atime(inode, bh); 2488 if (bh) 2489 brelse(bh); 2490 } else 2491 *level = 0; 2492 2493 return ret; 2494 } 2495 2496 void ocfs2_inode_unlock(struct inode *inode, 2497 int ex) 2498 { 2499 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2500 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2501 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2502 2503 mlog(0, "inode %llu drop %s META lock\n", 2504 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2505 ex ? "EXMODE" : "PRMODE"); 2506 2507 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) && 2508 !ocfs2_mount_local(osb)) 2509 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 2510 } 2511 2512 int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno) 2513 { 2514 struct ocfs2_lock_res *lockres; 2515 struct ocfs2_orphan_scan_lvb *lvb; 2516 int status = 0; 2517 2518 if (ocfs2_is_hard_readonly(osb)) 2519 return -EROFS; 2520 2521 if (ocfs2_mount_local(osb)) 2522 return 0; 2523 2524 lockres = &osb->osb_orphan_scan.os_lockres; 2525 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2526 if (status < 0) 2527 return status; 2528 2529 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2530 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2531 lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION) 2532 *seqno = be32_to_cpu(lvb->lvb_os_seqno); 2533 else 2534 *seqno = osb->osb_orphan_scan.os_seqno + 1; 2535 2536 return status; 2537 } 2538 2539 void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno) 2540 { 2541 struct ocfs2_lock_res *lockres; 2542 struct ocfs2_orphan_scan_lvb *lvb; 2543 2544 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) { 2545 lockres = &osb->osb_orphan_scan.os_lockres; 2546 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2547 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION; 2548 lvb->lvb_os_seqno = cpu_to_be32(seqno); 2549 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2550 } 2551 } 2552 2553 int ocfs2_super_lock(struct ocfs2_super *osb, 2554 int ex) 2555 { 2556 int status = 0; 2557 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2558 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2559 2560 if (ocfs2_is_hard_readonly(osb)) 2561 return -EROFS; 2562 2563 if (ocfs2_mount_local(osb)) 2564 goto bail; 2565 2566 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 2567 if (status < 0) { 2568 mlog_errno(status); 2569 goto bail; 2570 } 2571 2572 /* The super block lock path is really in the best position to 2573 * know when resources covered by the lock need to be 2574 * refreshed, so we do it here. Of course, making sense of 2575 * everything is up to the caller :) */ 2576 status = ocfs2_should_refresh_lock_res(lockres); 2577 if (status) { 2578 status = ocfs2_refresh_slot_info(osb); 2579 2580 ocfs2_complete_lock_res_refresh(lockres, status); 2581 2582 if (status < 0) { 2583 ocfs2_cluster_unlock(osb, lockres, level); 2584 mlog_errno(status); 2585 } 2586 ocfs2_track_lock_refresh(lockres); 2587 } 2588 bail: 2589 return status; 2590 } 2591 2592 void ocfs2_super_unlock(struct ocfs2_super *osb, 2593 int ex) 2594 { 2595 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2596 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2597 2598 if (!ocfs2_mount_local(osb)) 2599 ocfs2_cluster_unlock(osb, lockres, level); 2600 } 2601 2602 int ocfs2_rename_lock(struct ocfs2_super *osb) 2603 { 2604 int status; 2605 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2606 2607 if (ocfs2_is_hard_readonly(osb)) 2608 return -EROFS; 2609 2610 if (ocfs2_mount_local(osb)) 2611 return 0; 2612 2613 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2614 if (status < 0) 2615 mlog_errno(status); 2616 2617 return status; 2618 } 2619 2620 void ocfs2_rename_unlock(struct ocfs2_super *osb) 2621 { 2622 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2623 2624 if (!ocfs2_mount_local(osb)) 2625 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2626 } 2627 2628 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex) 2629 { 2630 int status; 2631 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2632 2633 if (ocfs2_is_hard_readonly(osb)) 2634 return -EROFS; 2635 2636 if (ocfs2_mount_local(osb)) 2637 return 0; 2638 2639 status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE, 2640 0, 0); 2641 if (status < 0) 2642 mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status); 2643 2644 return status; 2645 } 2646 2647 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex) 2648 { 2649 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2650 2651 if (!ocfs2_mount_local(osb)) 2652 ocfs2_cluster_unlock(osb, lockres, 2653 ex ? LKM_EXMODE : LKM_PRMODE); 2654 } 2655 2656 int ocfs2_dentry_lock(struct dentry *dentry, int ex) 2657 { 2658 int ret; 2659 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2660 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2661 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2662 2663 BUG_ON(!dl); 2664 2665 if (ocfs2_is_hard_readonly(osb)) { 2666 if (ex) 2667 return -EROFS; 2668 return 0; 2669 } 2670 2671 if (ocfs2_mount_local(osb)) 2672 return 0; 2673 2674 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 2675 if (ret < 0) 2676 mlog_errno(ret); 2677 2678 return ret; 2679 } 2680 2681 void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 2682 { 2683 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2684 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2685 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2686 2687 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 2688 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 2689 } 2690 2691 /* Reference counting of the dlm debug structure. We want this because 2692 * open references on the debug inodes can live on after a mount, so 2693 * we can't rely on the ocfs2_super to always exist. */ 2694 static void ocfs2_dlm_debug_free(struct kref *kref) 2695 { 2696 struct ocfs2_dlm_debug *dlm_debug; 2697 2698 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2699 2700 kfree(dlm_debug); 2701 } 2702 2703 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2704 { 2705 if (dlm_debug) 2706 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2707 } 2708 2709 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2710 { 2711 kref_get(&debug->d_refcnt); 2712 } 2713 2714 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2715 { 2716 struct ocfs2_dlm_debug *dlm_debug; 2717 2718 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 2719 if (!dlm_debug) { 2720 mlog_errno(-ENOMEM); 2721 goto out; 2722 } 2723 2724 kref_init(&dlm_debug->d_refcnt); 2725 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 2726 dlm_debug->d_locking_state = NULL; 2727 out: 2728 return dlm_debug; 2729 } 2730 2731 /* Access to this is arbitrated for us via seq_file->sem. */ 2732 struct ocfs2_dlm_seq_priv { 2733 struct ocfs2_dlm_debug *p_dlm_debug; 2734 struct ocfs2_lock_res p_iter_res; 2735 struct ocfs2_lock_res p_tmp_res; 2736 }; 2737 2738 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 2739 struct ocfs2_dlm_seq_priv *priv) 2740 { 2741 struct ocfs2_lock_res *iter, *ret = NULL; 2742 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 2743 2744 assert_spin_locked(&ocfs2_dlm_tracking_lock); 2745 2746 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 2747 /* discover the head of the list */ 2748 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 2749 mlog(0, "End of list found, %p\n", ret); 2750 break; 2751 } 2752 2753 /* We track our "dummy" iteration lockres' by a NULL 2754 * l_ops field. */ 2755 if (iter->l_ops != NULL) { 2756 ret = iter; 2757 break; 2758 } 2759 } 2760 2761 return ret; 2762 } 2763 2764 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 2765 { 2766 struct ocfs2_dlm_seq_priv *priv = m->private; 2767 struct ocfs2_lock_res *iter; 2768 2769 spin_lock(&ocfs2_dlm_tracking_lock); 2770 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 2771 if (iter) { 2772 /* Since lockres' have the lifetime of their container 2773 * (which can be inodes, ocfs2_supers, etc) we want to 2774 * copy this out to a temporary lockres while still 2775 * under the spinlock. Obviously after this we can't 2776 * trust any pointers on the copy returned, but that's 2777 * ok as the information we want isn't typically held 2778 * in them. */ 2779 priv->p_tmp_res = *iter; 2780 iter = &priv->p_tmp_res; 2781 } 2782 spin_unlock(&ocfs2_dlm_tracking_lock); 2783 2784 return iter; 2785 } 2786 2787 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 2788 { 2789 } 2790 2791 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 2792 { 2793 struct ocfs2_dlm_seq_priv *priv = m->private; 2794 struct ocfs2_lock_res *iter = v; 2795 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 2796 2797 spin_lock(&ocfs2_dlm_tracking_lock); 2798 iter = ocfs2_dlm_next_res(iter, priv); 2799 list_del_init(&dummy->l_debug_list); 2800 if (iter) { 2801 list_add(&dummy->l_debug_list, &iter->l_debug_list); 2802 priv->p_tmp_res = *iter; 2803 iter = &priv->p_tmp_res; 2804 } 2805 spin_unlock(&ocfs2_dlm_tracking_lock); 2806 2807 return iter; 2808 } 2809 2810 /* 2811 * Version is used by debugfs.ocfs2 to determine the format being used 2812 * 2813 * New in version 2 2814 * - Lock stats printed 2815 * New in version 3 2816 * - Max time in lock stats is in usecs (instead of nsecs) 2817 */ 2818 #define OCFS2_DLM_DEBUG_STR_VERSION 3 2819 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 2820 { 2821 int i; 2822 char *lvb; 2823 struct ocfs2_lock_res *lockres = v; 2824 2825 if (!lockres) 2826 return -EINVAL; 2827 2828 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 2829 2830 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 2831 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 2832 lockres->l_name, 2833 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 2834 else 2835 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 2836 2837 seq_printf(m, "%d\t" 2838 "0x%lx\t" 2839 "0x%x\t" 2840 "0x%x\t" 2841 "%u\t" 2842 "%u\t" 2843 "%d\t" 2844 "%d\t", 2845 lockres->l_level, 2846 lockres->l_flags, 2847 lockres->l_action, 2848 lockres->l_unlock_action, 2849 lockres->l_ro_holders, 2850 lockres->l_ex_holders, 2851 lockres->l_requested, 2852 lockres->l_blocking); 2853 2854 /* Dump the raw LVB */ 2855 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2856 for(i = 0; i < DLM_LVB_LEN; i++) 2857 seq_printf(m, "0x%x\t", lvb[i]); 2858 2859 #ifdef CONFIG_OCFS2_FS_STATS 2860 # define lock_num_prmode(_l) ((_l)->l_lock_prmode.ls_gets) 2861 # define lock_num_exmode(_l) ((_l)->l_lock_exmode.ls_gets) 2862 # define lock_num_prmode_failed(_l) ((_l)->l_lock_prmode.ls_fail) 2863 # define lock_num_exmode_failed(_l) ((_l)->l_lock_exmode.ls_fail) 2864 # define lock_total_prmode(_l) ((_l)->l_lock_prmode.ls_total) 2865 # define lock_total_exmode(_l) ((_l)->l_lock_exmode.ls_total) 2866 # define lock_max_prmode(_l) ((_l)->l_lock_prmode.ls_max) 2867 # define lock_max_exmode(_l) ((_l)->l_lock_exmode.ls_max) 2868 # define lock_refresh(_l) ((_l)->l_lock_refresh) 2869 #else 2870 # define lock_num_prmode(_l) (0) 2871 # define lock_num_exmode(_l) (0) 2872 # define lock_num_prmode_failed(_l) (0) 2873 # define lock_num_exmode_failed(_l) (0) 2874 # define lock_total_prmode(_l) (0ULL) 2875 # define lock_total_exmode(_l) (0ULL) 2876 # define lock_max_prmode(_l) (0) 2877 # define lock_max_exmode(_l) (0) 2878 # define lock_refresh(_l) (0) 2879 #endif 2880 /* The following seq_print was added in version 2 of this output */ 2881 seq_printf(m, "%u\t" 2882 "%u\t" 2883 "%u\t" 2884 "%u\t" 2885 "%llu\t" 2886 "%llu\t" 2887 "%u\t" 2888 "%u\t" 2889 "%u\t", 2890 lock_num_prmode(lockres), 2891 lock_num_exmode(lockres), 2892 lock_num_prmode_failed(lockres), 2893 lock_num_exmode_failed(lockres), 2894 lock_total_prmode(lockres), 2895 lock_total_exmode(lockres), 2896 lock_max_prmode(lockres), 2897 lock_max_exmode(lockres), 2898 lock_refresh(lockres)); 2899 2900 /* End the line */ 2901 seq_printf(m, "\n"); 2902 return 0; 2903 } 2904 2905 static const struct seq_operations ocfs2_dlm_seq_ops = { 2906 .start = ocfs2_dlm_seq_start, 2907 .stop = ocfs2_dlm_seq_stop, 2908 .next = ocfs2_dlm_seq_next, 2909 .show = ocfs2_dlm_seq_show, 2910 }; 2911 2912 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 2913 { 2914 struct seq_file *seq = file->private_data; 2915 struct ocfs2_dlm_seq_priv *priv = seq->private; 2916 struct ocfs2_lock_res *res = &priv->p_iter_res; 2917 2918 ocfs2_remove_lockres_tracking(res); 2919 ocfs2_put_dlm_debug(priv->p_dlm_debug); 2920 return seq_release_private(inode, file); 2921 } 2922 2923 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 2924 { 2925 struct ocfs2_dlm_seq_priv *priv; 2926 struct ocfs2_super *osb; 2927 2928 priv = __seq_open_private(file, &ocfs2_dlm_seq_ops, sizeof(*priv)); 2929 if (!priv) { 2930 mlog_errno(-ENOMEM); 2931 return -ENOMEM; 2932 } 2933 2934 osb = inode->i_private; 2935 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 2936 priv->p_dlm_debug = osb->osb_dlm_debug; 2937 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 2938 2939 ocfs2_add_lockres_tracking(&priv->p_iter_res, 2940 priv->p_dlm_debug); 2941 2942 return 0; 2943 } 2944 2945 static const struct file_operations ocfs2_dlm_debug_fops = { 2946 .open = ocfs2_dlm_debug_open, 2947 .release = ocfs2_dlm_debug_release, 2948 .read = seq_read, 2949 .llseek = seq_lseek, 2950 }; 2951 2952 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 2953 { 2954 int ret = 0; 2955 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2956 2957 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 2958 S_IFREG|S_IRUSR, 2959 osb->osb_debug_root, 2960 osb, 2961 &ocfs2_dlm_debug_fops); 2962 if (!dlm_debug->d_locking_state) { 2963 ret = -EINVAL; 2964 mlog(ML_ERROR, 2965 "Unable to create locking state debugfs file.\n"); 2966 goto out; 2967 } 2968 2969 ocfs2_get_dlm_debug(dlm_debug); 2970 out: 2971 return ret; 2972 } 2973 2974 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 2975 { 2976 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2977 2978 if (dlm_debug) { 2979 debugfs_remove(dlm_debug->d_locking_state); 2980 ocfs2_put_dlm_debug(dlm_debug); 2981 } 2982 } 2983 2984 int ocfs2_dlm_init(struct ocfs2_super *osb) 2985 { 2986 int status = 0; 2987 struct ocfs2_cluster_connection *conn = NULL; 2988 2989 if (ocfs2_mount_local(osb)) { 2990 osb->node_num = 0; 2991 goto local; 2992 } 2993 2994 status = ocfs2_dlm_init_debug(osb); 2995 if (status < 0) { 2996 mlog_errno(status); 2997 goto bail; 2998 } 2999 3000 /* launch downconvert thread */ 3001 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc"); 3002 if (IS_ERR(osb->dc_task)) { 3003 status = PTR_ERR(osb->dc_task); 3004 osb->dc_task = NULL; 3005 mlog_errno(status); 3006 goto bail; 3007 } 3008 3009 /* for now, uuid == domain */ 3010 status = ocfs2_cluster_connect(osb->osb_cluster_stack, 3011 osb->osb_cluster_name, 3012 strlen(osb->osb_cluster_name), 3013 osb->uuid_str, 3014 strlen(osb->uuid_str), 3015 &lproto, ocfs2_do_node_down, osb, 3016 &conn); 3017 if (status) { 3018 mlog_errno(status); 3019 goto bail; 3020 } 3021 3022 status = ocfs2_cluster_this_node(conn, &osb->node_num); 3023 if (status < 0) { 3024 mlog_errno(status); 3025 mlog(ML_ERROR, 3026 "could not find this host's node number\n"); 3027 ocfs2_cluster_disconnect(conn, 0); 3028 goto bail; 3029 } 3030 3031 local: 3032 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 3033 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 3034 ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb); 3035 ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb); 3036 3037 osb->cconn = conn; 3038 bail: 3039 if (status < 0) { 3040 ocfs2_dlm_shutdown_debug(osb); 3041 if (osb->dc_task) 3042 kthread_stop(osb->dc_task); 3043 } 3044 3045 return status; 3046 } 3047 3048 void ocfs2_dlm_shutdown(struct ocfs2_super *osb, 3049 int hangup_pending) 3050 { 3051 ocfs2_drop_osb_locks(osb); 3052 3053 /* 3054 * Now that we have dropped all locks and ocfs2_dismount_volume() 3055 * has disabled recovery, the DLM won't be talking to us. It's 3056 * safe to tear things down before disconnecting the cluster. 3057 */ 3058 3059 if (osb->dc_task) { 3060 kthread_stop(osb->dc_task); 3061 osb->dc_task = NULL; 3062 } 3063 3064 ocfs2_lock_res_free(&osb->osb_super_lockres); 3065 ocfs2_lock_res_free(&osb->osb_rename_lockres); 3066 ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres); 3067 ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres); 3068 3069 ocfs2_cluster_disconnect(osb->cconn, hangup_pending); 3070 osb->cconn = NULL; 3071 3072 ocfs2_dlm_shutdown_debug(osb); 3073 } 3074 3075 static int ocfs2_drop_lock(struct ocfs2_super *osb, 3076 struct ocfs2_lock_res *lockres) 3077 { 3078 int ret; 3079 unsigned long flags; 3080 u32 lkm_flags = 0; 3081 3082 /* We didn't get anywhere near actually using this lockres. */ 3083 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 3084 goto out; 3085 3086 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 3087 lkm_flags |= DLM_LKF_VALBLK; 3088 3089 spin_lock_irqsave(&lockres->l_lock, flags); 3090 3091 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 3092 "lockres %s, flags 0x%lx\n", 3093 lockres->l_name, lockres->l_flags); 3094 3095 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 3096 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 3097 "%u, unlock_action = %u\n", 3098 lockres->l_name, lockres->l_flags, lockres->l_action, 3099 lockres->l_unlock_action); 3100 3101 spin_unlock_irqrestore(&lockres->l_lock, flags); 3102 3103 /* XXX: Today we just wait on any busy 3104 * locks... Perhaps we need to cancel converts in the 3105 * future? */ 3106 ocfs2_wait_on_busy_lock(lockres); 3107 3108 spin_lock_irqsave(&lockres->l_lock, flags); 3109 } 3110 3111 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3112 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 3113 lockres->l_level == DLM_LOCK_EX && 3114 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3115 lockres->l_ops->set_lvb(lockres); 3116 } 3117 3118 if (lockres->l_flags & OCFS2_LOCK_BUSY) 3119 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 3120 lockres->l_name); 3121 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 3122 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 3123 3124 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 3125 spin_unlock_irqrestore(&lockres->l_lock, flags); 3126 goto out; 3127 } 3128 3129 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 3130 3131 /* make sure we never get here while waiting for an ast to 3132 * fire. */ 3133 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 3134 3135 /* is this necessary? */ 3136 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3137 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 3138 spin_unlock_irqrestore(&lockres->l_lock, flags); 3139 3140 mlog(0, "lock %s\n", lockres->l_name); 3141 3142 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags); 3143 if (ret) { 3144 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3145 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 3146 ocfs2_dlm_dump_lksb(&lockres->l_lksb); 3147 BUG(); 3148 } 3149 mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n", 3150 lockres->l_name); 3151 3152 ocfs2_wait_on_busy_lock(lockres); 3153 out: 3154 return 0; 3155 } 3156 3157 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3158 struct ocfs2_lock_res *lockres); 3159 3160 /* Mark the lockres as being dropped. It will no longer be 3161 * queued if blocking, but we still may have to wait on it 3162 * being dequeued from the downconvert thread before we can consider 3163 * it safe to drop. 3164 * 3165 * You can *not* attempt to call cluster_lock on this lockres anymore. */ 3166 void ocfs2_mark_lockres_freeing(struct ocfs2_super *osb, 3167 struct ocfs2_lock_res *lockres) 3168 { 3169 int status; 3170 struct ocfs2_mask_waiter mw; 3171 unsigned long flags, flags2; 3172 3173 ocfs2_init_mask_waiter(&mw); 3174 3175 spin_lock_irqsave(&lockres->l_lock, flags); 3176 lockres->l_flags |= OCFS2_LOCK_FREEING; 3177 if (lockres->l_flags & OCFS2_LOCK_QUEUED && current == osb->dc_task) { 3178 /* 3179 * We know the downconvert is queued but not in progress 3180 * because we are the downconvert thread and processing 3181 * different lock. So we can just remove the lock from the 3182 * queue. This is not only an optimization but also a way 3183 * to avoid the following deadlock: 3184 * ocfs2_dentry_post_unlock() 3185 * ocfs2_dentry_lock_put() 3186 * ocfs2_drop_dentry_lock() 3187 * iput() 3188 * ocfs2_evict_inode() 3189 * ocfs2_clear_inode() 3190 * ocfs2_mark_lockres_freeing() 3191 * ... blocks waiting for OCFS2_LOCK_QUEUED 3192 * since we are the downconvert thread which 3193 * should clear the flag. 3194 */ 3195 spin_unlock_irqrestore(&lockres->l_lock, flags); 3196 spin_lock_irqsave(&osb->dc_task_lock, flags2); 3197 list_del_init(&lockres->l_blocked_list); 3198 osb->blocked_lock_count--; 3199 spin_unlock_irqrestore(&osb->dc_task_lock, flags2); 3200 /* 3201 * Warn if we recurse into another post_unlock call. Strictly 3202 * speaking it isn't a problem but we need to be careful if 3203 * that happens (stack overflow, deadlocks, ...) so warn if 3204 * ocfs2 grows a path for which this can happen. 3205 */ 3206 WARN_ON_ONCE(lockres->l_ops->post_unlock); 3207 /* Since the lock is freeing we don't do much in the fn below */ 3208 ocfs2_process_blocked_lock(osb, lockres); 3209 return; 3210 } 3211 while (lockres->l_flags & OCFS2_LOCK_QUEUED) { 3212 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); 3213 spin_unlock_irqrestore(&lockres->l_lock, flags); 3214 3215 mlog(0, "Waiting on lockres %s\n", lockres->l_name); 3216 3217 status = ocfs2_wait_for_mask(&mw); 3218 if (status) 3219 mlog_errno(status); 3220 3221 spin_lock_irqsave(&lockres->l_lock, flags); 3222 } 3223 spin_unlock_irqrestore(&lockres->l_lock, flags); 3224 } 3225 3226 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, 3227 struct ocfs2_lock_res *lockres) 3228 { 3229 int ret; 3230 3231 ocfs2_mark_lockres_freeing(osb, lockres); 3232 ret = ocfs2_drop_lock(osb, lockres); 3233 if (ret) 3234 mlog_errno(ret); 3235 } 3236 3237 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 3238 { 3239 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 3240 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 3241 ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres); 3242 ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres); 3243 } 3244 3245 int ocfs2_drop_inode_locks(struct inode *inode) 3246 { 3247 int status, err; 3248 3249 /* No need to call ocfs2_mark_lockres_freeing here - 3250 * ocfs2_clear_inode has done it for us. */ 3251 3252 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3253 &OCFS2_I(inode)->ip_open_lockres); 3254 if (err < 0) 3255 mlog_errno(err); 3256 3257 status = err; 3258 3259 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3260 &OCFS2_I(inode)->ip_inode_lockres); 3261 if (err < 0) 3262 mlog_errno(err); 3263 if (err < 0 && !status) 3264 status = err; 3265 3266 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3267 &OCFS2_I(inode)->ip_rw_lockres); 3268 if (err < 0) 3269 mlog_errno(err); 3270 if (err < 0 && !status) 3271 status = err; 3272 3273 return status; 3274 } 3275 3276 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 3277 int new_level) 3278 { 3279 assert_spin_locked(&lockres->l_lock); 3280 3281 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 3282 3283 if (lockres->l_level <= new_level) { 3284 mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, " 3285 "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, " 3286 "block %d, pgen %d\n", lockres->l_name, lockres->l_level, 3287 new_level, list_empty(&lockres->l_blocked_list), 3288 list_empty(&lockres->l_mask_waiters), lockres->l_type, 3289 lockres->l_flags, lockres->l_ro_holders, 3290 lockres->l_ex_holders, lockres->l_action, 3291 lockres->l_unlock_action, lockres->l_requested, 3292 lockres->l_blocking, lockres->l_pending_gen); 3293 BUG(); 3294 } 3295 3296 mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n", 3297 lockres->l_name, lockres->l_level, new_level, lockres->l_blocking); 3298 3299 lockres->l_action = OCFS2_AST_DOWNCONVERT; 3300 lockres->l_requested = new_level; 3301 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3302 return lockres_set_pending(lockres); 3303 } 3304 3305 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 3306 struct ocfs2_lock_res *lockres, 3307 int new_level, 3308 int lvb, 3309 unsigned int generation) 3310 { 3311 int ret; 3312 u32 dlm_flags = DLM_LKF_CONVERT; 3313 3314 mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name, 3315 lockres->l_level, new_level); 3316 3317 if (lvb) 3318 dlm_flags |= DLM_LKF_VALBLK; 3319 3320 ret = ocfs2_dlm_lock(osb->cconn, 3321 new_level, 3322 &lockres->l_lksb, 3323 dlm_flags, 3324 lockres->l_name, 3325 OCFS2_LOCK_ID_MAX_LEN - 1); 3326 lockres_clear_pending(lockres, generation, osb); 3327 if (ret) { 3328 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 3329 ocfs2_recover_from_dlm_error(lockres, 1); 3330 goto bail; 3331 } 3332 3333 ret = 0; 3334 bail: 3335 return ret; 3336 } 3337 3338 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */ 3339 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 3340 struct ocfs2_lock_res *lockres) 3341 { 3342 assert_spin_locked(&lockres->l_lock); 3343 3344 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 3345 /* If we're already trying to cancel a lock conversion 3346 * then just drop the spinlock and allow the caller to 3347 * requeue this lock. */ 3348 mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name); 3349 return 0; 3350 } 3351 3352 /* were we in a convert when we got the bast fire? */ 3353 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT && 3354 lockres->l_action != OCFS2_AST_DOWNCONVERT); 3355 /* set things up for the unlockast to know to just 3356 * clear out the ast_action and unset busy, etc. */ 3357 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT; 3358 3359 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY), 3360 "lock %s, invalid flags: 0x%lx\n", 3361 lockres->l_name, lockres->l_flags); 3362 3363 mlog(ML_BASTS, "lockres %s\n", lockres->l_name); 3364 3365 return 1; 3366 } 3367 3368 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 3369 struct ocfs2_lock_res *lockres) 3370 { 3371 int ret; 3372 3373 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, 3374 DLM_LKF_CANCEL); 3375 if (ret) { 3376 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3377 ocfs2_recover_from_dlm_error(lockres, 0); 3378 } 3379 3380 mlog(ML_BASTS, "lockres %s\n", lockres->l_name); 3381 3382 return ret; 3383 } 3384 3385 static int ocfs2_unblock_lock(struct ocfs2_super *osb, 3386 struct ocfs2_lock_res *lockres, 3387 struct ocfs2_unblock_ctl *ctl) 3388 { 3389 unsigned long flags; 3390 int blocking; 3391 int new_level; 3392 int level; 3393 int ret = 0; 3394 int set_lvb = 0; 3395 unsigned int gen; 3396 3397 spin_lock_irqsave(&lockres->l_lock, flags); 3398 3399 recheck: 3400 /* 3401 * Is it still blocking? If not, we have no more work to do. 3402 */ 3403 if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) { 3404 BUG_ON(lockres->l_blocking != DLM_LOCK_NL); 3405 spin_unlock_irqrestore(&lockres->l_lock, flags); 3406 ret = 0; 3407 goto leave; 3408 } 3409 3410 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 3411 /* XXX 3412 * This is a *big* race. The OCFS2_LOCK_PENDING flag 3413 * exists entirely for one reason - another thread has set 3414 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock(). 3415 * 3416 * If we do ocfs2_cancel_convert() before the other thread 3417 * calls dlm_lock(), our cancel will do nothing. We will 3418 * get no ast, and we will have no way of knowing the 3419 * cancel failed. Meanwhile, the other thread will call 3420 * into dlm_lock() and wait...forever. 3421 * 3422 * Why forever? Because another node has asked for the 3423 * lock first; that's why we're here in unblock_lock(). 3424 * 3425 * The solution is OCFS2_LOCK_PENDING. When PENDING is 3426 * set, we just requeue the unblock. Only when the other 3427 * thread has called dlm_lock() and cleared PENDING will 3428 * we then cancel their request. 3429 * 3430 * All callers of dlm_lock() must set OCFS2_DLM_PENDING 3431 * at the same time they set OCFS2_DLM_BUSY. They must 3432 * clear OCFS2_DLM_PENDING after dlm_lock() returns. 3433 */ 3434 if (lockres->l_flags & OCFS2_LOCK_PENDING) { 3435 mlog(ML_BASTS, "lockres %s, ReQ: Pending\n", 3436 lockres->l_name); 3437 goto leave_requeue; 3438 } 3439 3440 ctl->requeue = 1; 3441 ret = ocfs2_prepare_cancel_convert(osb, lockres); 3442 spin_unlock_irqrestore(&lockres->l_lock, flags); 3443 if (ret) { 3444 ret = ocfs2_cancel_convert(osb, lockres); 3445 if (ret < 0) 3446 mlog_errno(ret); 3447 } 3448 goto leave; 3449 } 3450 3451 /* 3452 * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is 3453 * set when the ast is received for an upconvert just before the 3454 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast 3455 * on the heels of the ast, we want to delay the downconvert just 3456 * enough to allow the up requestor to do its task. Because this 3457 * lock is in the blocked queue, the lock will be downconverted 3458 * as soon as the requestor is done with the lock. 3459 */ 3460 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) 3461 goto leave_requeue; 3462 3463 /* 3464 * How can we block and yet be at NL? We were trying to upconvert 3465 * from NL and got canceled. The code comes back here, and now 3466 * we notice and clear BLOCKING. 3467 */ 3468 if (lockres->l_level == DLM_LOCK_NL) { 3469 BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders); 3470 mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name); 3471 lockres->l_blocking = DLM_LOCK_NL; 3472 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 3473 spin_unlock_irqrestore(&lockres->l_lock, flags); 3474 goto leave; 3475 } 3476 3477 /* if we're blocking an exclusive and we have *any* holders, 3478 * then requeue. */ 3479 if ((lockres->l_blocking == DLM_LOCK_EX) 3480 && (lockres->l_ex_holders || lockres->l_ro_holders)) { 3481 mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n", 3482 lockres->l_name, lockres->l_ex_holders, 3483 lockres->l_ro_holders); 3484 goto leave_requeue; 3485 } 3486 3487 /* If it's a PR we're blocking, then only 3488 * requeue if we've got any EX holders */ 3489 if (lockres->l_blocking == DLM_LOCK_PR && 3490 lockres->l_ex_holders) { 3491 mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n", 3492 lockres->l_name, lockres->l_ex_holders); 3493 goto leave_requeue; 3494 } 3495 3496 /* 3497 * Can we get a lock in this state if the holder counts are 3498 * zero? The meta data unblock code used to check this. 3499 */ 3500 if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 3501 && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) { 3502 mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n", 3503 lockres->l_name); 3504 goto leave_requeue; 3505 } 3506 3507 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 3508 3509 if (lockres->l_ops->check_downconvert 3510 && !lockres->l_ops->check_downconvert(lockres, new_level)) { 3511 mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n", 3512 lockres->l_name); 3513 goto leave_requeue; 3514 } 3515 3516 /* If we get here, then we know that there are no more 3517 * incompatible holders (and anyone asking for an incompatible 3518 * lock is blocked). We can now downconvert the lock */ 3519 if (!lockres->l_ops->downconvert_worker) 3520 goto downconvert; 3521 3522 /* Some lockres types want to do a bit of work before 3523 * downconverting a lock. Allow that here. The worker function 3524 * may sleep, so we save off a copy of what we're blocking as 3525 * it may change while we're not holding the spin lock. */ 3526 blocking = lockres->l_blocking; 3527 level = lockres->l_level; 3528 spin_unlock_irqrestore(&lockres->l_lock, flags); 3529 3530 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); 3531 3532 if (ctl->unblock_action == UNBLOCK_STOP_POST) { 3533 mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n", 3534 lockres->l_name); 3535 goto leave; 3536 } 3537 3538 spin_lock_irqsave(&lockres->l_lock, flags); 3539 if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) { 3540 /* If this changed underneath us, then we can't drop 3541 * it just yet. */ 3542 mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, " 3543 "Recheck\n", lockres->l_name, blocking, 3544 lockres->l_blocking, level, lockres->l_level); 3545 goto recheck; 3546 } 3547 3548 downconvert: 3549 ctl->requeue = 0; 3550 3551 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3552 if (lockres->l_level == DLM_LOCK_EX) 3553 set_lvb = 1; 3554 3555 /* 3556 * We only set the lvb if the lock has been fully 3557 * refreshed - otherwise we risk setting stale 3558 * data. Otherwise, there's no need to actually clear 3559 * out the lvb here as it's value is still valid. 3560 */ 3561 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3562 lockres->l_ops->set_lvb(lockres); 3563 } 3564 3565 gen = ocfs2_prepare_downconvert(lockres, new_level); 3566 spin_unlock_irqrestore(&lockres->l_lock, flags); 3567 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb, 3568 gen); 3569 3570 leave: 3571 if (ret) 3572 mlog_errno(ret); 3573 return ret; 3574 3575 leave_requeue: 3576 spin_unlock_irqrestore(&lockres->l_lock, flags); 3577 ctl->requeue = 1; 3578 3579 return 0; 3580 } 3581 3582 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 3583 int blocking) 3584 { 3585 struct inode *inode; 3586 struct address_space *mapping; 3587 struct ocfs2_inode_info *oi; 3588 3589 inode = ocfs2_lock_res_inode(lockres); 3590 mapping = inode->i_mapping; 3591 3592 if (S_ISDIR(inode->i_mode)) { 3593 oi = OCFS2_I(inode); 3594 oi->ip_dir_lock_gen++; 3595 mlog(0, "generation: %u\n", oi->ip_dir_lock_gen); 3596 goto out; 3597 } 3598 3599 if (!S_ISREG(inode->i_mode)) 3600 goto out; 3601 3602 /* 3603 * We need this before the filemap_fdatawrite() so that it can 3604 * transfer the dirty bit from the PTE to the 3605 * page. Unfortunately this means that even for EX->PR 3606 * downconverts, we'll lose our mappings and have to build 3607 * them up again. 3608 */ 3609 unmap_mapping_range(mapping, 0, 0, 0); 3610 3611 if (filemap_fdatawrite(mapping)) { 3612 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!", 3613 (unsigned long long)OCFS2_I(inode)->ip_blkno); 3614 } 3615 sync_mapping_buffers(mapping); 3616 if (blocking == DLM_LOCK_EX) { 3617 truncate_inode_pages(mapping, 0); 3618 } else { 3619 /* We only need to wait on the I/O if we're not also 3620 * truncating pages because truncate_inode_pages waits 3621 * for us above. We don't truncate pages if we're 3622 * blocking anything < EXMODE because we want to keep 3623 * them around in that case. */ 3624 filemap_fdatawait(mapping); 3625 } 3626 3627 out: 3628 return UNBLOCK_CONTINUE; 3629 } 3630 3631 static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci, 3632 struct ocfs2_lock_res *lockres, 3633 int new_level) 3634 { 3635 int checkpointed = ocfs2_ci_fully_checkpointed(ci); 3636 3637 BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR); 3638 BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed); 3639 3640 if (checkpointed) 3641 return 1; 3642 3643 ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci))); 3644 return 0; 3645 } 3646 3647 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 3648 int new_level) 3649 { 3650 struct inode *inode = ocfs2_lock_res_inode(lockres); 3651 3652 return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level); 3653 } 3654 3655 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres) 3656 { 3657 struct inode *inode = ocfs2_lock_res_inode(lockres); 3658 3659 __ocfs2_stuff_meta_lvb(inode); 3660 } 3661 3662 /* 3663 * Does the final reference drop on our dentry lock. Right now this 3664 * happens in the downconvert thread, but we could choose to simplify the 3665 * dlmglue API and push these off to the ocfs2_wq in the future. 3666 */ 3667 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 3668 struct ocfs2_lock_res *lockres) 3669 { 3670 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3671 ocfs2_dentry_lock_put(osb, dl); 3672 } 3673 3674 /* 3675 * d_delete() matching dentries before the lock downconvert. 3676 * 3677 * At this point, any process waiting to destroy the 3678 * dentry_lock due to last ref count is stopped by the 3679 * OCFS2_LOCK_QUEUED flag. 3680 * 3681 * We have two potential problems 3682 * 3683 * 1) If we do the last reference drop on our dentry_lock (via dput) 3684 * we'll wind up in ocfs2_release_dentry_lock(), waiting on 3685 * the downconvert to finish. Instead we take an elevated 3686 * reference and push the drop until after we've completed our 3687 * unblock processing. 3688 * 3689 * 2) There might be another process with a final reference, 3690 * waiting on us to finish processing. If this is the case, we 3691 * detect it and exit out - there's no more dentries anyway. 3692 */ 3693 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 3694 int blocking) 3695 { 3696 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3697 struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode); 3698 struct dentry *dentry; 3699 unsigned long flags; 3700 int extra_ref = 0; 3701 3702 /* 3703 * This node is blocking another node from getting a read 3704 * lock. This happens when we've renamed within a 3705 * directory. We've forced the other nodes to d_delete(), but 3706 * we never actually dropped our lock because it's still 3707 * valid. The downconvert code will retain a PR for this node, 3708 * so there's no further work to do. 3709 */ 3710 if (blocking == DLM_LOCK_PR) 3711 return UNBLOCK_CONTINUE; 3712 3713 /* 3714 * Mark this inode as potentially orphaned. The code in 3715 * ocfs2_delete_inode() will figure out whether it actually 3716 * needs to be freed or not. 3717 */ 3718 spin_lock(&oi->ip_lock); 3719 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; 3720 spin_unlock(&oi->ip_lock); 3721 3722 /* 3723 * Yuck. We need to make sure however that the check of 3724 * OCFS2_LOCK_FREEING and the extra reference are atomic with 3725 * respect to a reference decrement or the setting of that 3726 * flag. 3727 */ 3728 spin_lock_irqsave(&lockres->l_lock, flags); 3729 spin_lock(&dentry_attach_lock); 3730 if (!(lockres->l_flags & OCFS2_LOCK_FREEING) 3731 && dl->dl_count) { 3732 dl->dl_count++; 3733 extra_ref = 1; 3734 } 3735 spin_unlock(&dentry_attach_lock); 3736 spin_unlock_irqrestore(&lockres->l_lock, flags); 3737 3738 mlog(0, "extra_ref = %d\n", extra_ref); 3739 3740 /* 3741 * We have a process waiting on us in ocfs2_dentry_iput(), 3742 * which means we can't have any more outstanding 3743 * aliases. There's no need to do any more work. 3744 */ 3745 if (!extra_ref) 3746 return UNBLOCK_CONTINUE; 3747 3748 spin_lock(&dentry_attach_lock); 3749 while (1) { 3750 dentry = ocfs2_find_local_alias(dl->dl_inode, 3751 dl->dl_parent_blkno, 1); 3752 if (!dentry) 3753 break; 3754 spin_unlock(&dentry_attach_lock); 3755 3756 if (S_ISDIR(dl->dl_inode->i_mode)) 3757 shrink_dcache_parent(dentry); 3758 3759 mlog(0, "d_delete(%pd);\n", dentry); 3760 3761 /* 3762 * The following dcache calls may do an 3763 * iput(). Normally we don't want that from the 3764 * downconverting thread, but in this case it's ok 3765 * because the requesting node already has an 3766 * exclusive lock on the inode, so it can't be queued 3767 * for a downconvert. 3768 */ 3769 d_delete(dentry); 3770 dput(dentry); 3771 3772 spin_lock(&dentry_attach_lock); 3773 } 3774 spin_unlock(&dentry_attach_lock); 3775 3776 /* 3777 * If we are the last holder of this dentry lock, there is no 3778 * reason to downconvert so skip straight to the unlock. 3779 */ 3780 if (dl->dl_count == 1) 3781 return UNBLOCK_STOP_POST; 3782 3783 return UNBLOCK_CONTINUE_POST; 3784 } 3785 3786 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres, 3787 int new_level) 3788 { 3789 struct ocfs2_refcount_tree *tree = 3790 ocfs2_lock_res_refcount_tree(lockres); 3791 3792 return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level); 3793 } 3794 3795 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres, 3796 int blocking) 3797 { 3798 struct ocfs2_refcount_tree *tree = 3799 ocfs2_lock_res_refcount_tree(lockres); 3800 3801 ocfs2_metadata_cache_purge(&tree->rf_ci); 3802 3803 return UNBLOCK_CONTINUE; 3804 } 3805 3806 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres) 3807 { 3808 struct ocfs2_qinfo_lvb *lvb; 3809 struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres); 3810 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 3811 oinfo->dqi_gi.dqi_type); 3812 3813 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 3814 lvb->lvb_version = OCFS2_QINFO_LVB_VERSION; 3815 lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace); 3816 lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace); 3817 lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms); 3818 lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks); 3819 lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk); 3820 lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry); 3821 } 3822 3823 void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex) 3824 { 3825 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 3826 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 3827 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 3828 3829 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 3830 ocfs2_cluster_unlock(osb, lockres, level); 3831 } 3832 3833 static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo) 3834 { 3835 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 3836 oinfo->dqi_gi.dqi_type); 3837 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 3838 struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 3839 struct buffer_head *bh = NULL; 3840 struct ocfs2_global_disk_dqinfo *gdinfo; 3841 int status = 0; 3842 3843 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 3844 lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) { 3845 info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace); 3846 info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace); 3847 oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms); 3848 oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks); 3849 oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk); 3850 oinfo->dqi_gi.dqi_free_entry = 3851 be32_to_cpu(lvb->lvb_free_entry); 3852 } else { 3853 status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode, 3854 oinfo->dqi_giblk, &bh); 3855 if (status) { 3856 mlog_errno(status); 3857 goto bail; 3858 } 3859 gdinfo = (struct ocfs2_global_disk_dqinfo *) 3860 (bh->b_data + OCFS2_GLOBAL_INFO_OFF); 3861 info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace); 3862 info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace); 3863 oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms); 3864 oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks); 3865 oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk); 3866 oinfo->dqi_gi.dqi_free_entry = 3867 le32_to_cpu(gdinfo->dqi_free_entry); 3868 brelse(bh); 3869 ocfs2_track_lock_refresh(lockres); 3870 } 3871 3872 bail: 3873 return status; 3874 } 3875 3876 /* Lock quota info, this function expects at least shared lock on the quota file 3877 * so that we can safely refresh quota info from disk. */ 3878 int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex) 3879 { 3880 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 3881 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 3882 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 3883 int status = 0; 3884 3885 /* On RO devices, locking really isn't needed... */ 3886 if (ocfs2_is_hard_readonly(osb)) { 3887 if (ex) 3888 status = -EROFS; 3889 goto bail; 3890 } 3891 if (ocfs2_mount_local(osb)) 3892 goto bail; 3893 3894 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 3895 if (status < 0) { 3896 mlog_errno(status); 3897 goto bail; 3898 } 3899 if (!ocfs2_should_refresh_lock_res(lockres)) 3900 goto bail; 3901 /* OK, we have the lock but we need to refresh the quota info */ 3902 status = ocfs2_refresh_qinfo(oinfo); 3903 if (status) 3904 ocfs2_qinfo_unlock(oinfo, ex); 3905 ocfs2_complete_lock_res_refresh(lockres, status); 3906 bail: 3907 return status; 3908 } 3909 3910 int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex) 3911 { 3912 int status; 3913 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 3914 struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres; 3915 struct ocfs2_super *osb = lockres->l_priv; 3916 3917 3918 if (ocfs2_is_hard_readonly(osb)) 3919 return -EROFS; 3920 3921 if (ocfs2_mount_local(osb)) 3922 return 0; 3923 3924 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 3925 if (status < 0) 3926 mlog_errno(status); 3927 3928 return status; 3929 } 3930 3931 void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex) 3932 { 3933 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 3934 struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres; 3935 struct ocfs2_super *osb = lockres->l_priv; 3936 3937 if (!ocfs2_mount_local(osb)) 3938 ocfs2_cluster_unlock(osb, lockres, level); 3939 } 3940 3941 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3942 struct ocfs2_lock_res *lockres) 3943 { 3944 int status; 3945 struct ocfs2_unblock_ctl ctl = {0, 0,}; 3946 unsigned long flags; 3947 3948 /* Our reference to the lockres in this function can be 3949 * considered valid until we remove the OCFS2_LOCK_QUEUED 3950 * flag. */ 3951 3952 BUG_ON(!lockres); 3953 BUG_ON(!lockres->l_ops); 3954 3955 mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name); 3956 3957 /* Detect whether a lock has been marked as going away while 3958 * the downconvert thread was processing other things. A lock can 3959 * still be marked with OCFS2_LOCK_FREEING after this check, 3960 * but short circuiting here will still save us some 3961 * performance. */ 3962 spin_lock_irqsave(&lockres->l_lock, flags); 3963 if (lockres->l_flags & OCFS2_LOCK_FREEING) 3964 goto unqueue; 3965 spin_unlock_irqrestore(&lockres->l_lock, flags); 3966 3967 status = ocfs2_unblock_lock(osb, lockres, &ctl); 3968 if (status < 0) 3969 mlog_errno(status); 3970 3971 spin_lock_irqsave(&lockres->l_lock, flags); 3972 unqueue: 3973 if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) { 3974 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED); 3975 } else 3976 ocfs2_schedule_blocked_lock(osb, lockres); 3977 3978 mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name, 3979 ctl.requeue ? "yes" : "no"); 3980 spin_unlock_irqrestore(&lockres->l_lock, flags); 3981 3982 if (ctl.unblock_action != UNBLOCK_CONTINUE 3983 && lockres->l_ops->post_unlock) 3984 lockres->l_ops->post_unlock(osb, lockres); 3985 } 3986 3987 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 3988 struct ocfs2_lock_res *lockres) 3989 { 3990 unsigned long flags; 3991 3992 assert_spin_locked(&lockres->l_lock); 3993 3994 if (lockres->l_flags & OCFS2_LOCK_FREEING) { 3995 /* Do not schedule a lock for downconvert when it's on 3996 * the way to destruction - any nodes wanting access 3997 * to the resource will get it soon. */ 3998 mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n", 3999 lockres->l_name, lockres->l_flags); 4000 return; 4001 } 4002 4003 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); 4004 4005 spin_lock_irqsave(&osb->dc_task_lock, flags); 4006 if (list_empty(&lockres->l_blocked_list)) { 4007 list_add_tail(&lockres->l_blocked_list, 4008 &osb->blocked_lock_list); 4009 osb->blocked_lock_count++; 4010 } 4011 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4012 } 4013 4014 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb) 4015 { 4016 unsigned long processed; 4017 unsigned long flags; 4018 struct ocfs2_lock_res *lockres; 4019 4020 spin_lock_irqsave(&osb->dc_task_lock, flags); 4021 /* grab this early so we know to try again if a state change and 4022 * wake happens part-way through our work */ 4023 osb->dc_work_sequence = osb->dc_wake_sequence; 4024 4025 processed = osb->blocked_lock_count; 4026 /* 4027 * blocked lock processing in this loop might call iput which can 4028 * remove items off osb->blocked_lock_list. Downconvert up to 4029 * 'processed' number of locks, but stop short if we had some 4030 * removed in ocfs2_mark_lockres_freeing when downconverting. 4031 */ 4032 while (processed && !list_empty(&osb->blocked_lock_list)) { 4033 lockres = list_entry(osb->blocked_lock_list.next, 4034 struct ocfs2_lock_res, l_blocked_list); 4035 list_del_init(&lockres->l_blocked_list); 4036 osb->blocked_lock_count--; 4037 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4038 4039 BUG_ON(!processed); 4040 processed--; 4041 4042 ocfs2_process_blocked_lock(osb, lockres); 4043 4044 spin_lock_irqsave(&osb->dc_task_lock, flags); 4045 } 4046 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4047 } 4048 4049 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb) 4050 { 4051 int empty = 0; 4052 unsigned long flags; 4053 4054 spin_lock_irqsave(&osb->dc_task_lock, flags); 4055 if (list_empty(&osb->blocked_lock_list)) 4056 empty = 1; 4057 4058 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4059 return empty; 4060 } 4061 4062 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb) 4063 { 4064 int should_wake = 0; 4065 unsigned long flags; 4066 4067 spin_lock_irqsave(&osb->dc_task_lock, flags); 4068 if (osb->dc_work_sequence != osb->dc_wake_sequence) 4069 should_wake = 1; 4070 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4071 4072 return should_wake; 4073 } 4074 4075 static int ocfs2_downconvert_thread(void *arg) 4076 { 4077 int status = 0; 4078 struct ocfs2_super *osb = arg; 4079 4080 /* only quit once we've been asked to stop and there is no more 4081 * work available */ 4082 while (!(kthread_should_stop() && 4083 ocfs2_downconvert_thread_lists_empty(osb))) { 4084 4085 wait_event_interruptible(osb->dc_event, 4086 ocfs2_downconvert_thread_should_wake(osb) || 4087 kthread_should_stop()); 4088 4089 mlog(0, "downconvert_thread: awoken\n"); 4090 4091 ocfs2_downconvert_thread_do_work(osb); 4092 } 4093 4094 osb->dc_task = NULL; 4095 return status; 4096 } 4097 4098 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb) 4099 { 4100 unsigned long flags; 4101 4102 spin_lock_irqsave(&osb->dc_task_lock, flags); 4103 /* make sure the voting thread gets a swipe at whatever changes 4104 * the caller may have made to the voting state */ 4105 osb->dc_wake_sequence++; 4106 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4107 wake_up(&osb->dc_event); 4108 } 4109