1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmglue.c 5 * 6 * Code which implements an OCFS2 specific interface to our DLM. 7 * 8 * Copyright (C) 2003, 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 */ 25 26 #include <linux/types.h> 27 #include <linux/slab.h> 28 #include <linux/highmem.h> 29 #include <linux/mm.h> 30 #include <linux/kthread.h> 31 #include <linux/pagemap.h> 32 #include <linux/debugfs.h> 33 #include <linux/seq_file.h> 34 #include <linux/time.h> 35 #include <linux/quotaops.h> 36 #include <linux/sched/signal.h> 37 38 #define MLOG_MASK_PREFIX ML_DLM_GLUE 39 #include <cluster/masklog.h> 40 41 #include "ocfs2.h" 42 #include "ocfs2_lockingver.h" 43 44 #include "alloc.h" 45 #include "dcache.h" 46 #include "dlmglue.h" 47 #include "extent_map.h" 48 #include "file.h" 49 #include "heartbeat.h" 50 #include "inode.h" 51 #include "journal.h" 52 #include "stackglue.h" 53 #include "slot_map.h" 54 #include "super.h" 55 #include "uptodate.h" 56 #include "quota.h" 57 #include "refcounttree.h" 58 #include "acl.h" 59 60 #include "buffer_head_io.h" 61 62 struct ocfs2_mask_waiter { 63 struct list_head mw_item; 64 int mw_status; 65 struct completion mw_complete; 66 unsigned long mw_mask; 67 unsigned long mw_goal; 68 #ifdef CONFIG_OCFS2_FS_STATS 69 ktime_t mw_lock_start; 70 #endif 71 }; 72 73 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres); 74 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres); 75 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres); 76 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres); 77 78 /* 79 * Return value from ->downconvert_worker functions. 80 * 81 * These control the precise actions of ocfs2_unblock_lock() 82 * and ocfs2_process_blocked_lock() 83 * 84 */ 85 enum ocfs2_unblock_action { 86 UNBLOCK_CONTINUE = 0, /* Continue downconvert */ 87 UNBLOCK_CONTINUE_POST = 1, /* Continue downconvert, fire 88 * ->post_unlock callback */ 89 UNBLOCK_STOP_POST = 2, /* Do not downconvert, fire 90 * ->post_unlock() callback. */ 91 }; 92 93 struct ocfs2_unblock_ctl { 94 int requeue; 95 enum ocfs2_unblock_action unblock_action; 96 }; 97 98 /* Lockdep class keys */ 99 struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES]; 100 101 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 102 int new_level); 103 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres); 104 105 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 106 int blocking); 107 108 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 109 int blocking); 110 111 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 112 struct ocfs2_lock_res *lockres); 113 114 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres); 115 116 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres, 117 int new_level); 118 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres, 119 int blocking); 120 121 #define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres) 122 123 /* This aids in debugging situations where a bad LVB might be involved. */ 124 static void ocfs2_dump_meta_lvb_info(u64 level, 125 const char *function, 126 unsigned int line, 127 struct ocfs2_lock_res *lockres) 128 { 129 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 130 131 mlog(level, "LVB information for %s (called from %s:%u):\n", 132 lockres->l_name, function, line); 133 mlog(level, "version: %u, clusters: %u, generation: 0x%x\n", 134 lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters), 135 be32_to_cpu(lvb->lvb_igeneration)); 136 mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n", 137 (unsigned long long)be64_to_cpu(lvb->lvb_isize), 138 be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid), 139 be16_to_cpu(lvb->lvb_imode)); 140 mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, " 141 "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink), 142 (long long)be64_to_cpu(lvb->lvb_iatime_packed), 143 (long long)be64_to_cpu(lvb->lvb_ictime_packed), 144 (long long)be64_to_cpu(lvb->lvb_imtime_packed), 145 be32_to_cpu(lvb->lvb_iattr)); 146 } 147 148 149 /* 150 * OCFS2 Lock Resource Operations 151 * 152 * These fine tune the behavior of the generic dlmglue locking infrastructure. 153 * 154 * The most basic of lock types can point ->l_priv to their respective 155 * struct ocfs2_super and allow the default actions to manage things. 156 * 157 * Right now, each lock type also needs to implement an init function, 158 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres() 159 * should be called when the lock is no longer needed (i.e., object 160 * destruction time). 161 */ 162 struct ocfs2_lock_res_ops { 163 /* 164 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define 165 * this callback if ->l_priv is not an ocfs2_super pointer 166 */ 167 struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *); 168 169 /* 170 * Optionally called in the downconvert thread after a 171 * successful downconvert. The lockres will not be referenced 172 * after this callback is called, so it is safe to free 173 * memory, etc. 174 * 175 * The exact semantics of when this is called are controlled 176 * by ->downconvert_worker() 177 */ 178 void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *); 179 180 /* 181 * Allow a lock type to add checks to determine whether it is 182 * safe to downconvert a lock. Return 0 to re-queue the 183 * downconvert at a later time, nonzero to continue. 184 * 185 * For most locks, the default checks that there are no 186 * incompatible holders are sufficient. 187 * 188 * Called with the lockres spinlock held. 189 */ 190 int (*check_downconvert)(struct ocfs2_lock_res *, int); 191 192 /* 193 * Allows a lock type to populate the lock value block. This 194 * is called on downconvert, and when we drop a lock. 195 * 196 * Locks that want to use this should set LOCK_TYPE_USES_LVB 197 * in the flags field. 198 * 199 * Called with the lockres spinlock held. 200 */ 201 void (*set_lvb)(struct ocfs2_lock_res *); 202 203 /* 204 * Called from the downconvert thread when it is determined 205 * that a lock will be downconverted. This is called without 206 * any locks held so the function can do work that might 207 * schedule (syncing out data, etc). 208 * 209 * This should return any one of the ocfs2_unblock_action 210 * values, depending on what it wants the thread to do. 211 */ 212 int (*downconvert_worker)(struct ocfs2_lock_res *, int); 213 214 /* 215 * LOCK_TYPE_* flags which describe the specific requirements 216 * of a lock type. Descriptions of each individual flag follow. 217 */ 218 int flags; 219 }; 220 221 /* 222 * Some locks want to "refresh" potentially stale data when a 223 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this 224 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the 225 * individual lockres l_flags member from the ast function. It is 226 * expected that the locking wrapper will clear the 227 * OCFS2_LOCK_NEEDS_REFRESH flag when done. 228 */ 229 #define LOCK_TYPE_REQUIRES_REFRESH 0x1 230 231 /* 232 * Indicate that a lock type makes use of the lock value block. The 233 * ->set_lvb lock type callback must be defined. 234 */ 235 #define LOCK_TYPE_USES_LVB 0x2 236 237 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { 238 .get_osb = ocfs2_get_inode_osb, 239 .flags = 0, 240 }; 241 242 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = { 243 .get_osb = ocfs2_get_inode_osb, 244 .check_downconvert = ocfs2_check_meta_downconvert, 245 .set_lvb = ocfs2_set_meta_lvb, 246 .downconvert_worker = ocfs2_data_convert_worker, 247 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 248 }; 249 250 static struct ocfs2_lock_res_ops ocfs2_super_lops = { 251 .flags = LOCK_TYPE_REQUIRES_REFRESH, 252 }; 253 254 static struct ocfs2_lock_res_ops ocfs2_rename_lops = { 255 .flags = 0, 256 }; 257 258 static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = { 259 .flags = 0, 260 }; 261 262 static struct ocfs2_lock_res_ops ocfs2_trim_fs_lops = { 263 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 264 }; 265 266 static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = { 267 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 268 }; 269 270 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { 271 .get_osb = ocfs2_get_dentry_osb, 272 .post_unlock = ocfs2_dentry_post_unlock, 273 .downconvert_worker = ocfs2_dentry_convert_worker, 274 .flags = 0, 275 }; 276 277 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = { 278 .get_osb = ocfs2_get_inode_osb, 279 .flags = 0, 280 }; 281 282 static struct ocfs2_lock_res_ops ocfs2_flock_lops = { 283 .get_osb = ocfs2_get_file_osb, 284 .flags = 0, 285 }; 286 287 static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = { 288 .set_lvb = ocfs2_set_qinfo_lvb, 289 .get_osb = ocfs2_get_qinfo_osb, 290 .flags = LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB, 291 }; 292 293 static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = { 294 .check_downconvert = ocfs2_check_refcount_downconvert, 295 .downconvert_worker = ocfs2_refcount_convert_worker, 296 .flags = 0, 297 }; 298 299 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 300 { 301 return lockres->l_type == OCFS2_LOCK_TYPE_META || 302 lockres->l_type == OCFS2_LOCK_TYPE_RW || 303 lockres->l_type == OCFS2_LOCK_TYPE_OPEN; 304 } 305 306 static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb) 307 { 308 return container_of(lksb, struct ocfs2_lock_res, l_lksb); 309 } 310 311 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) 312 { 313 BUG_ON(!ocfs2_is_inode_lock(lockres)); 314 315 return (struct inode *) lockres->l_priv; 316 } 317 318 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres) 319 { 320 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY); 321 322 return (struct ocfs2_dentry_lock *)lockres->l_priv; 323 } 324 325 static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres) 326 { 327 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO); 328 329 return (struct ocfs2_mem_dqinfo *)lockres->l_priv; 330 } 331 332 static inline struct ocfs2_refcount_tree * 333 ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res) 334 { 335 return container_of(res, struct ocfs2_refcount_tree, rf_lockres); 336 } 337 338 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres) 339 { 340 if (lockres->l_ops->get_osb) 341 return lockres->l_ops->get_osb(lockres); 342 343 return (struct ocfs2_super *)lockres->l_priv; 344 } 345 346 static int ocfs2_lock_create(struct ocfs2_super *osb, 347 struct ocfs2_lock_res *lockres, 348 int level, 349 u32 dlm_flags); 350 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 351 int wanted); 352 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb, 353 struct ocfs2_lock_res *lockres, 354 int level, unsigned long caller_ip); 355 static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb, 356 struct ocfs2_lock_res *lockres, 357 int level) 358 { 359 __ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_); 360 } 361 362 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres); 363 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres); 364 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres); 365 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level); 366 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 367 struct ocfs2_lock_res *lockres); 368 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 369 int convert); 370 #define ocfs2_log_dlm_error(_func, _err, _lockres) do { \ 371 if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY) \ 372 mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \ 373 _err, _func, _lockres->l_name); \ 374 else \ 375 mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n", \ 376 _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name, \ 377 (unsigned int)ocfs2_get_dentry_lock_ino(_lockres)); \ 378 } while (0) 379 static int ocfs2_downconvert_thread(void *arg); 380 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 381 struct ocfs2_lock_res *lockres); 382 static int ocfs2_inode_lock_update(struct inode *inode, 383 struct buffer_head **bh); 384 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 385 static inline int ocfs2_highest_compat_lock_level(int level); 386 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 387 int new_level); 388 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 389 struct ocfs2_lock_res *lockres, 390 int new_level, 391 int lvb, 392 unsigned int generation); 393 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 394 struct ocfs2_lock_res *lockres); 395 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 396 struct ocfs2_lock_res *lockres); 397 398 399 static void ocfs2_build_lock_name(enum ocfs2_lock_type type, 400 u64 blkno, 401 u32 generation, 402 char *name) 403 { 404 int len; 405 406 BUG_ON(type >= OCFS2_NUM_LOCK_TYPES); 407 408 len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", 409 ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD, 410 (long long)blkno, generation); 411 412 BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1)); 413 414 mlog(0, "built lock resource with name: %s\n", name); 415 } 416 417 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock); 418 419 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res, 420 struct ocfs2_dlm_debug *dlm_debug) 421 { 422 mlog(0, "Add tracking for lockres %s\n", res->l_name); 423 424 spin_lock(&ocfs2_dlm_tracking_lock); 425 list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking); 426 spin_unlock(&ocfs2_dlm_tracking_lock); 427 } 428 429 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) 430 { 431 spin_lock(&ocfs2_dlm_tracking_lock); 432 if (!list_empty(&res->l_debug_list)) 433 list_del_init(&res->l_debug_list); 434 spin_unlock(&ocfs2_dlm_tracking_lock); 435 } 436 437 #ifdef CONFIG_OCFS2_FS_STATS 438 static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 439 { 440 res->l_lock_refresh = 0; 441 memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats)); 442 memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats)); 443 } 444 445 static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level, 446 struct ocfs2_mask_waiter *mw, int ret) 447 { 448 u32 usec; 449 ktime_t kt; 450 struct ocfs2_lock_stats *stats; 451 452 if (level == LKM_PRMODE) 453 stats = &res->l_lock_prmode; 454 else if (level == LKM_EXMODE) 455 stats = &res->l_lock_exmode; 456 else 457 return; 458 459 kt = ktime_sub(ktime_get(), mw->mw_lock_start); 460 usec = ktime_to_us(kt); 461 462 stats->ls_gets++; 463 stats->ls_total += ktime_to_ns(kt); 464 /* overflow */ 465 if (unlikely(stats->ls_gets == 0)) { 466 stats->ls_gets++; 467 stats->ls_total = ktime_to_ns(kt); 468 } 469 470 if (stats->ls_max < usec) 471 stats->ls_max = usec; 472 473 if (ret) 474 stats->ls_fail++; 475 } 476 477 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 478 { 479 lockres->l_lock_refresh++; 480 } 481 482 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 483 { 484 mw->mw_lock_start = ktime_get(); 485 } 486 #else 487 static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) 488 { 489 } 490 static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, 491 int level, struct ocfs2_mask_waiter *mw, int ret) 492 { 493 } 494 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) 495 { 496 } 497 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) 498 { 499 } 500 #endif 501 502 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, 503 struct ocfs2_lock_res *res, 504 enum ocfs2_lock_type type, 505 struct ocfs2_lock_res_ops *ops, 506 void *priv) 507 { 508 res->l_type = type; 509 res->l_ops = ops; 510 res->l_priv = priv; 511 512 res->l_level = DLM_LOCK_IV; 513 res->l_requested = DLM_LOCK_IV; 514 res->l_blocking = DLM_LOCK_IV; 515 res->l_action = OCFS2_AST_INVALID; 516 res->l_unlock_action = OCFS2_UNLOCK_INVALID; 517 518 res->l_flags = OCFS2_LOCK_INITIALIZED; 519 520 ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug); 521 522 ocfs2_init_lock_stats(res); 523 #ifdef CONFIG_DEBUG_LOCK_ALLOC 524 if (type != OCFS2_LOCK_TYPE_OPEN) 525 lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type], 526 &lockdep_keys[type], 0); 527 else 528 res->l_lockdep_map.key = NULL; 529 #endif 530 } 531 532 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res) 533 { 534 /* This also clears out the lock status block */ 535 memset(res, 0, sizeof(struct ocfs2_lock_res)); 536 spin_lock_init(&res->l_lock); 537 init_waitqueue_head(&res->l_event); 538 INIT_LIST_HEAD(&res->l_blocked_list); 539 INIT_LIST_HEAD(&res->l_mask_waiters); 540 INIT_LIST_HEAD(&res->l_holders); 541 } 542 543 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, 544 enum ocfs2_lock_type type, 545 unsigned int generation, 546 struct inode *inode) 547 { 548 struct ocfs2_lock_res_ops *ops; 549 550 switch(type) { 551 case OCFS2_LOCK_TYPE_RW: 552 ops = &ocfs2_inode_rw_lops; 553 break; 554 case OCFS2_LOCK_TYPE_META: 555 ops = &ocfs2_inode_inode_lops; 556 break; 557 case OCFS2_LOCK_TYPE_OPEN: 558 ops = &ocfs2_inode_open_lops; 559 break; 560 default: 561 mlog_bug_on_msg(1, "type: %d\n", type); 562 ops = NULL; /* thanks, gcc */ 563 break; 564 }; 565 566 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, 567 generation, res->l_name); 568 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); 569 } 570 571 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres) 572 { 573 struct inode *inode = ocfs2_lock_res_inode(lockres); 574 575 return OCFS2_SB(inode->i_sb); 576 } 577 578 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres) 579 { 580 struct ocfs2_mem_dqinfo *info = lockres->l_priv; 581 582 return OCFS2_SB(info->dqi_gi.dqi_sb); 583 } 584 585 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres) 586 { 587 struct ocfs2_file_private *fp = lockres->l_priv; 588 589 return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb); 590 } 591 592 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) 593 { 594 __be64 inode_blkno_be; 595 596 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], 597 sizeof(__be64)); 598 599 return be64_to_cpu(inode_blkno_be); 600 } 601 602 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres) 603 { 604 struct ocfs2_dentry_lock *dl = lockres->l_priv; 605 606 return OCFS2_SB(dl->dl_inode->i_sb); 607 } 608 609 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, 610 u64 parent, struct inode *inode) 611 { 612 int len; 613 u64 inode_blkno = OCFS2_I(inode)->ip_blkno; 614 __be64 inode_blkno_be = cpu_to_be64(inode_blkno); 615 struct ocfs2_lock_res *lockres = &dl->dl_lockres; 616 617 ocfs2_lock_res_init_once(lockres); 618 619 /* 620 * Unfortunately, the standard lock naming scheme won't work 621 * here because we have two 16 byte values to use. Instead, 622 * we'll stuff the inode number as a binary value. We still 623 * want error prints to show something without garbling the 624 * display, so drop a null byte in there before the inode 625 * number. A future version of OCFS2 will likely use all 626 * binary lock names. The stringified names have been a 627 * tremendous aid in debugging, but now that the debugfs 628 * interface exists, we can mangle things there if need be. 629 * 630 * NOTE: We also drop the standard "pad" value (the total lock 631 * name size stays the same though - the last part is all 632 * zeros due to the memset in ocfs2_lock_res_init_once() 633 */ 634 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, 635 "%c%016llx", 636 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), 637 (long long)parent); 638 639 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); 640 641 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, 642 sizeof(__be64)); 643 644 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 645 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, 646 dl); 647 } 648 649 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 650 struct ocfs2_super *osb) 651 { 652 /* Superblock lockres doesn't come from a slab so we call init 653 * once on it manually. */ 654 ocfs2_lock_res_init_once(res); 655 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, 656 0, res->l_name); 657 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 658 &ocfs2_super_lops, osb); 659 } 660 661 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, 662 struct ocfs2_super *osb) 663 { 664 /* Rename lockres doesn't come from a slab so we call init 665 * once on it manually. */ 666 ocfs2_lock_res_init_once(res); 667 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); 668 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 669 &ocfs2_rename_lops, osb); 670 } 671 672 static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res, 673 struct ocfs2_super *osb) 674 { 675 /* nfs_sync lockres doesn't come from a slab so we call init 676 * once on it manually. */ 677 ocfs2_lock_res_init_once(res); 678 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name); 679 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC, 680 &ocfs2_nfs_sync_lops, osb); 681 } 682 683 void ocfs2_trim_fs_lock_res_init(struct ocfs2_super *osb) 684 { 685 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 686 687 ocfs2_lock_res_init_once(lockres); 688 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_TRIM_FS, 0, 0, lockres->l_name); 689 ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_TRIM_FS, 690 &ocfs2_trim_fs_lops, osb); 691 } 692 693 void ocfs2_trim_fs_lock_res_uninit(struct ocfs2_super *osb) 694 { 695 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 696 697 ocfs2_simple_drop_lockres(osb, lockres); 698 ocfs2_lock_res_free(lockres); 699 } 700 701 static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res, 702 struct ocfs2_super *osb) 703 { 704 ocfs2_lock_res_init_once(res); 705 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name); 706 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN, 707 &ocfs2_orphan_scan_lops, osb); 708 } 709 710 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, 711 struct ocfs2_file_private *fp) 712 { 713 struct inode *inode = fp->fp_file->f_mapping->host; 714 struct ocfs2_inode_info *oi = OCFS2_I(inode); 715 716 ocfs2_lock_res_init_once(lockres); 717 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno, 718 inode->i_generation, lockres->l_name); 719 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 720 OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops, 721 fp); 722 lockres->l_flags |= OCFS2_LOCK_NOCACHE; 723 } 724 725 void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres, 726 struct ocfs2_mem_dqinfo *info) 727 { 728 ocfs2_lock_res_init_once(lockres); 729 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type, 730 0, lockres->l_name); 731 ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres, 732 OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops, 733 info); 734 } 735 736 void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres, 737 struct ocfs2_super *osb, u64 ref_blkno, 738 unsigned int generation) 739 { 740 ocfs2_lock_res_init_once(lockres); 741 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno, 742 generation, lockres->l_name); 743 ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT, 744 &ocfs2_refcount_block_lops, osb); 745 } 746 747 void ocfs2_lock_res_free(struct ocfs2_lock_res *res) 748 { 749 if (!(res->l_flags & OCFS2_LOCK_INITIALIZED)) 750 return; 751 752 ocfs2_remove_lockres_tracking(res); 753 754 mlog_bug_on_msg(!list_empty(&res->l_blocked_list), 755 "Lockres %s is on the blocked list\n", 756 res->l_name); 757 mlog_bug_on_msg(!list_empty(&res->l_mask_waiters), 758 "Lockres %s has mask waiters pending\n", 759 res->l_name); 760 mlog_bug_on_msg(spin_is_locked(&res->l_lock), 761 "Lockres %s is locked\n", 762 res->l_name); 763 mlog_bug_on_msg(res->l_ro_holders, 764 "Lockres %s has %u ro holders\n", 765 res->l_name, res->l_ro_holders); 766 mlog_bug_on_msg(res->l_ex_holders, 767 "Lockres %s has %u ex holders\n", 768 res->l_name, res->l_ex_holders); 769 770 /* Need to clear out the lock status block for the dlm */ 771 memset(&res->l_lksb, 0, sizeof(res->l_lksb)); 772 773 res->l_flags = 0UL; 774 } 775 776 /* 777 * Keep a list of processes who have interest in a lockres. 778 * Note: this is now only uesed for check recursive cluster locking. 779 */ 780 static inline void ocfs2_add_holder(struct ocfs2_lock_res *lockres, 781 struct ocfs2_lock_holder *oh) 782 { 783 INIT_LIST_HEAD(&oh->oh_list); 784 oh->oh_owner_pid = get_pid(task_pid(current)); 785 786 spin_lock(&lockres->l_lock); 787 list_add_tail(&oh->oh_list, &lockres->l_holders); 788 spin_unlock(&lockres->l_lock); 789 } 790 791 static struct ocfs2_lock_holder * 792 ocfs2_pid_holder(struct ocfs2_lock_res *lockres, 793 struct pid *pid) 794 { 795 struct ocfs2_lock_holder *oh; 796 797 spin_lock(&lockres->l_lock); 798 list_for_each_entry(oh, &lockres->l_holders, oh_list) { 799 if (oh->oh_owner_pid == pid) { 800 spin_unlock(&lockres->l_lock); 801 return oh; 802 } 803 } 804 spin_unlock(&lockres->l_lock); 805 return NULL; 806 } 807 808 static inline void ocfs2_remove_holder(struct ocfs2_lock_res *lockres, 809 struct ocfs2_lock_holder *oh) 810 { 811 spin_lock(&lockres->l_lock); 812 list_del(&oh->oh_list); 813 spin_unlock(&lockres->l_lock); 814 815 put_pid(oh->oh_owner_pid); 816 } 817 818 819 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, 820 int level) 821 { 822 BUG_ON(!lockres); 823 824 switch(level) { 825 case DLM_LOCK_EX: 826 lockres->l_ex_holders++; 827 break; 828 case DLM_LOCK_PR: 829 lockres->l_ro_holders++; 830 break; 831 default: 832 BUG(); 833 } 834 } 835 836 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, 837 int level) 838 { 839 BUG_ON(!lockres); 840 841 switch(level) { 842 case DLM_LOCK_EX: 843 BUG_ON(!lockres->l_ex_holders); 844 lockres->l_ex_holders--; 845 break; 846 case DLM_LOCK_PR: 847 BUG_ON(!lockres->l_ro_holders); 848 lockres->l_ro_holders--; 849 break; 850 default: 851 BUG(); 852 } 853 } 854 855 /* WARNING: This function lives in a world where the only three lock 856 * levels are EX, PR, and NL. It *will* have to be adjusted when more 857 * lock types are added. */ 858 static inline int ocfs2_highest_compat_lock_level(int level) 859 { 860 int new_level = DLM_LOCK_EX; 861 862 if (level == DLM_LOCK_EX) 863 new_level = DLM_LOCK_NL; 864 else if (level == DLM_LOCK_PR) 865 new_level = DLM_LOCK_PR; 866 return new_level; 867 } 868 869 static void lockres_set_flags(struct ocfs2_lock_res *lockres, 870 unsigned long newflags) 871 { 872 struct ocfs2_mask_waiter *mw, *tmp; 873 874 assert_spin_locked(&lockres->l_lock); 875 876 lockres->l_flags = newflags; 877 878 list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) { 879 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 880 continue; 881 882 list_del_init(&mw->mw_item); 883 mw->mw_status = 0; 884 complete(&mw->mw_complete); 885 } 886 } 887 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or) 888 { 889 lockres_set_flags(lockres, lockres->l_flags | or); 890 } 891 static void lockres_clear_flags(struct ocfs2_lock_res *lockres, 892 unsigned long clear) 893 { 894 lockres_set_flags(lockres, lockres->l_flags & ~clear); 895 } 896 897 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres) 898 { 899 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 900 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 901 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 902 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 903 904 lockres->l_level = lockres->l_requested; 905 if (lockres->l_level <= 906 ocfs2_highest_compat_lock_level(lockres->l_blocking)) { 907 lockres->l_blocking = DLM_LOCK_NL; 908 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 909 } 910 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 911 } 912 913 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres) 914 { 915 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 916 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 917 918 /* Convert from RO to EX doesn't really need anything as our 919 * information is already up to data. Convert from NL to 920 * *anything* however should mark ourselves as needing an 921 * update */ 922 if (lockres->l_level == DLM_LOCK_NL && 923 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 924 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 925 926 lockres->l_level = lockres->l_requested; 927 928 /* 929 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing 930 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from 931 * downconverting the lock before the upconvert has fully completed. 932 * Do not prevent the dc thread from downconverting if NONBLOCK lock 933 * had already returned. 934 */ 935 if (!(lockres->l_flags & OCFS2_LOCK_NONBLOCK_FINISHED)) 936 lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 937 else 938 lockres_clear_flags(lockres, OCFS2_LOCK_NONBLOCK_FINISHED); 939 940 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 941 } 942 943 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres) 944 { 945 BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY))); 946 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 947 948 if (lockres->l_requested > DLM_LOCK_NL && 949 !(lockres->l_flags & OCFS2_LOCK_LOCAL) && 950 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 951 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 952 953 lockres->l_level = lockres->l_requested; 954 lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED); 955 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 956 } 957 958 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, 959 int level) 960 { 961 int needs_downconvert = 0; 962 963 assert_spin_locked(&lockres->l_lock); 964 965 if (level > lockres->l_blocking) { 966 /* only schedule a downconvert if we haven't already scheduled 967 * one that goes low enough to satisfy the level we're 968 * blocking. this also catches the case where we get 969 * duplicate BASTs */ 970 if (ocfs2_highest_compat_lock_level(level) < 971 ocfs2_highest_compat_lock_level(lockres->l_blocking)) 972 needs_downconvert = 1; 973 974 lockres->l_blocking = level; 975 } 976 977 mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n", 978 lockres->l_name, level, lockres->l_level, lockres->l_blocking, 979 needs_downconvert); 980 981 if (needs_downconvert) 982 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 983 mlog(0, "needs_downconvert = %d\n", needs_downconvert); 984 return needs_downconvert; 985 } 986 987 /* 988 * OCFS2_LOCK_PENDING and l_pending_gen. 989 * 990 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting 991 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock() 992 * for more details on the race. 993 * 994 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces 995 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock() 996 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear 997 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns, 998 * the caller is going to try to clear PENDING again. If nothing else is 999 * happening, __lockres_clear_pending() sees PENDING is unset and does 1000 * nothing. 1001 * 1002 * But what if another path (eg downconvert thread) has just started a 1003 * new locking action? The other path has re-set PENDING. Our path 1004 * cannot clear PENDING, because that will re-open the original race 1005 * window. 1006 * 1007 * [Example] 1008 * 1009 * ocfs2_meta_lock() 1010 * ocfs2_cluster_lock() 1011 * set BUSY 1012 * set PENDING 1013 * drop l_lock 1014 * ocfs2_dlm_lock() 1015 * ocfs2_locking_ast() ocfs2_downconvert_thread() 1016 * clear PENDING ocfs2_unblock_lock() 1017 * take_l_lock 1018 * !BUSY 1019 * ocfs2_prepare_downconvert() 1020 * set BUSY 1021 * set PENDING 1022 * drop l_lock 1023 * take l_lock 1024 * clear PENDING 1025 * drop l_lock 1026 * <window> 1027 * ocfs2_dlm_lock() 1028 * 1029 * So as you can see, we now have a window where l_lock is not held, 1030 * PENDING is not set, and ocfs2_dlm_lock() has not been called. 1031 * 1032 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING 1033 * set by ocfs2_prepare_downconvert(). That wasn't nice. 1034 * 1035 * To solve this we introduce l_pending_gen. A call to 1036 * lockres_clear_pending() will only do so when it is passed a generation 1037 * number that matches the lockres. lockres_set_pending() will return the 1038 * current generation number. When ocfs2_cluster_lock() goes to clear 1039 * PENDING, it passes the generation it got from set_pending(). In our 1040 * example above, the generation numbers will *not* match. Thus, 1041 * ocfs2_cluster_lock() will not clear the PENDING set by 1042 * ocfs2_prepare_downconvert(). 1043 */ 1044 1045 /* Unlocked version for ocfs2_locking_ast() */ 1046 static void __lockres_clear_pending(struct ocfs2_lock_res *lockres, 1047 unsigned int generation, 1048 struct ocfs2_super *osb) 1049 { 1050 assert_spin_locked(&lockres->l_lock); 1051 1052 /* 1053 * The ast and locking functions can race us here. The winner 1054 * will clear pending, the loser will not. 1055 */ 1056 if (!(lockres->l_flags & OCFS2_LOCK_PENDING) || 1057 (lockres->l_pending_gen != generation)) 1058 return; 1059 1060 lockres_clear_flags(lockres, OCFS2_LOCK_PENDING); 1061 lockres->l_pending_gen++; 1062 1063 /* 1064 * The downconvert thread may have skipped us because we 1065 * were PENDING. Wake it up. 1066 */ 1067 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 1068 ocfs2_wake_downconvert_thread(osb); 1069 } 1070 1071 /* Locked version for callers of ocfs2_dlm_lock() */ 1072 static void lockres_clear_pending(struct ocfs2_lock_res *lockres, 1073 unsigned int generation, 1074 struct ocfs2_super *osb) 1075 { 1076 unsigned long flags; 1077 1078 spin_lock_irqsave(&lockres->l_lock, flags); 1079 __lockres_clear_pending(lockres, generation, osb); 1080 spin_unlock_irqrestore(&lockres->l_lock, flags); 1081 } 1082 1083 static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres) 1084 { 1085 assert_spin_locked(&lockres->l_lock); 1086 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 1087 1088 lockres_or_flags(lockres, OCFS2_LOCK_PENDING); 1089 1090 return lockres->l_pending_gen; 1091 } 1092 1093 static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level) 1094 { 1095 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1096 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1097 int needs_downconvert; 1098 unsigned long flags; 1099 1100 BUG_ON(level <= DLM_LOCK_NL); 1101 1102 mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, " 1103 "type %s\n", lockres->l_name, level, lockres->l_level, 1104 ocfs2_lock_type_string(lockres->l_type)); 1105 1106 /* 1107 * We can skip the bast for locks which don't enable caching - 1108 * they'll be dropped at the earliest possible time anyway. 1109 */ 1110 if (lockres->l_flags & OCFS2_LOCK_NOCACHE) 1111 return; 1112 1113 spin_lock_irqsave(&lockres->l_lock, flags); 1114 needs_downconvert = ocfs2_generic_handle_bast(lockres, level); 1115 if (needs_downconvert) 1116 ocfs2_schedule_blocked_lock(osb, lockres); 1117 spin_unlock_irqrestore(&lockres->l_lock, flags); 1118 1119 wake_up(&lockres->l_event); 1120 1121 ocfs2_wake_downconvert_thread(osb); 1122 } 1123 1124 static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb) 1125 { 1126 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1127 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1128 unsigned long flags; 1129 int status; 1130 1131 spin_lock_irqsave(&lockres->l_lock, flags); 1132 1133 status = ocfs2_dlm_lock_status(&lockres->l_lksb); 1134 1135 if (status == -EAGAIN) { 1136 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1137 goto out; 1138 } 1139 1140 if (status) { 1141 mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n", 1142 lockres->l_name, status); 1143 spin_unlock_irqrestore(&lockres->l_lock, flags); 1144 return; 1145 } 1146 1147 mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, " 1148 "level %d => %d\n", lockres->l_name, lockres->l_action, 1149 lockres->l_unlock_action, lockres->l_level, lockres->l_requested); 1150 1151 switch(lockres->l_action) { 1152 case OCFS2_AST_ATTACH: 1153 ocfs2_generic_handle_attach_action(lockres); 1154 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL); 1155 break; 1156 case OCFS2_AST_CONVERT: 1157 ocfs2_generic_handle_convert_action(lockres); 1158 break; 1159 case OCFS2_AST_DOWNCONVERT: 1160 ocfs2_generic_handle_downconvert_action(lockres); 1161 break; 1162 default: 1163 mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, " 1164 "flags 0x%lx, unlock: %u\n", 1165 lockres->l_name, lockres->l_action, lockres->l_flags, 1166 lockres->l_unlock_action); 1167 BUG(); 1168 } 1169 out: 1170 /* set it to something invalid so if we get called again we 1171 * can catch it. */ 1172 lockres->l_action = OCFS2_AST_INVALID; 1173 1174 /* Did we try to cancel this lock? Clear that state */ 1175 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) 1176 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1177 1178 /* 1179 * We may have beaten the locking functions here. We certainly 1180 * know that dlm_lock() has been called :-) 1181 * Because we can't have two lock calls in flight at once, we 1182 * can use lockres->l_pending_gen. 1183 */ 1184 __lockres_clear_pending(lockres, lockres->l_pending_gen, osb); 1185 1186 wake_up(&lockres->l_event); 1187 spin_unlock_irqrestore(&lockres->l_lock, flags); 1188 } 1189 1190 static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error) 1191 { 1192 struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); 1193 unsigned long flags; 1194 1195 mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n", 1196 lockres->l_name, lockres->l_unlock_action); 1197 1198 spin_lock_irqsave(&lockres->l_lock, flags); 1199 if (error) { 1200 mlog(ML_ERROR, "Dlm passes error %d for lock %s, " 1201 "unlock_action %d\n", error, lockres->l_name, 1202 lockres->l_unlock_action); 1203 spin_unlock_irqrestore(&lockres->l_lock, flags); 1204 return; 1205 } 1206 1207 switch(lockres->l_unlock_action) { 1208 case OCFS2_UNLOCK_CANCEL_CONVERT: 1209 mlog(0, "Cancel convert success for %s\n", lockres->l_name); 1210 lockres->l_action = OCFS2_AST_INVALID; 1211 /* Downconvert thread may have requeued this lock, we 1212 * need to wake it. */ 1213 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 1214 ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres)); 1215 break; 1216 case OCFS2_UNLOCK_DROP_LOCK: 1217 lockres->l_level = DLM_LOCK_IV; 1218 break; 1219 default: 1220 BUG(); 1221 } 1222 1223 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1224 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1225 wake_up(&lockres->l_event); 1226 spin_unlock_irqrestore(&lockres->l_lock, flags); 1227 } 1228 1229 /* 1230 * This is the filesystem locking protocol. It provides the lock handling 1231 * hooks for the underlying DLM. It has a maximum version number. 1232 * The version number allows interoperability with systems running at 1233 * the same major number and an equal or smaller minor number. 1234 * 1235 * Whenever the filesystem does new things with locks (adds or removes a 1236 * lock, orders them differently, does different things underneath a lock), 1237 * the version must be changed. The protocol is negotiated when joining 1238 * the dlm domain. A node may join the domain if its major version is 1239 * identical to all other nodes and its minor version is greater than 1240 * or equal to all other nodes. When its minor version is greater than 1241 * the other nodes, it will run at the minor version specified by the 1242 * other nodes. 1243 * 1244 * If a locking change is made that will not be compatible with older 1245 * versions, the major number must be increased and the minor version set 1246 * to zero. If a change merely adds a behavior that can be disabled when 1247 * speaking to older versions, the minor version must be increased. If a 1248 * change adds a fully backwards compatible change (eg, LVB changes that 1249 * are just ignored by older versions), the version does not need to be 1250 * updated. 1251 */ 1252 static struct ocfs2_locking_protocol lproto = { 1253 .lp_max_version = { 1254 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, 1255 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, 1256 }, 1257 .lp_lock_ast = ocfs2_locking_ast, 1258 .lp_blocking_ast = ocfs2_blocking_ast, 1259 .lp_unlock_ast = ocfs2_unlock_ast, 1260 }; 1261 1262 void ocfs2_set_locking_protocol(void) 1263 { 1264 ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version); 1265 } 1266 1267 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 1268 int convert) 1269 { 1270 unsigned long flags; 1271 1272 spin_lock_irqsave(&lockres->l_lock, flags); 1273 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 1274 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 1275 if (convert) 1276 lockres->l_action = OCFS2_AST_INVALID; 1277 else 1278 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 1279 spin_unlock_irqrestore(&lockres->l_lock, flags); 1280 1281 wake_up(&lockres->l_event); 1282 } 1283 1284 /* Note: If we detect another process working on the lock (i.e., 1285 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller 1286 * to do the right thing in that case. 1287 */ 1288 static int ocfs2_lock_create(struct ocfs2_super *osb, 1289 struct ocfs2_lock_res *lockres, 1290 int level, 1291 u32 dlm_flags) 1292 { 1293 int ret = 0; 1294 unsigned long flags; 1295 unsigned int gen; 1296 1297 mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level, 1298 dlm_flags); 1299 1300 spin_lock_irqsave(&lockres->l_lock, flags); 1301 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 1302 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 1303 spin_unlock_irqrestore(&lockres->l_lock, flags); 1304 goto bail; 1305 } 1306 1307 lockres->l_action = OCFS2_AST_ATTACH; 1308 lockres->l_requested = level; 1309 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1310 gen = lockres_set_pending(lockres); 1311 spin_unlock_irqrestore(&lockres->l_lock, flags); 1312 1313 ret = ocfs2_dlm_lock(osb->cconn, 1314 level, 1315 &lockres->l_lksb, 1316 dlm_flags, 1317 lockres->l_name, 1318 OCFS2_LOCK_ID_MAX_LEN - 1); 1319 lockres_clear_pending(lockres, gen, osb); 1320 if (ret) { 1321 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1322 ocfs2_recover_from_dlm_error(lockres, 1); 1323 } 1324 1325 mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name); 1326 1327 bail: 1328 return ret; 1329 } 1330 1331 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, 1332 int flag) 1333 { 1334 unsigned long flags; 1335 int ret; 1336 1337 spin_lock_irqsave(&lockres->l_lock, flags); 1338 ret = lockres->l_flags & flag; 1339 spin_unlock_irqrestore(&lockres->l_lock, flags); 1340 1341 return ret; 1342 } 1343 1344 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 1345 1346 { 1347 wait_event(lockres->l_event, 1348 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 1349 } 1350 1351 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 1352 1353 { 1354 wait_event(lockres->l_event, 1355 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 1356 } 1357 1358 /* predict what lock level we'll be dropping down to on behalf 1359 * of another node, and return true if the currently wanted 1360 * level will be compatible with it. */ 1361 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 1362 int wanted) 1363 { 1364 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 1365 1366 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking); 1367 } 1368 1369 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw) 1370 { 1371 INIT_LIST_HEAD(&mw->mw_item); 1372 init_completion(&mw->mw_complete); 1373 ocfs2_init_start_time(mw); 1374 } 1375 1376 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) 1377 { 1378 wait_for_completion(&mw->mw_complete); 1379 /* Re-arm the completion in case we want to wait on it again */ 1380 reinit_completion(&mw->mw_complete); 1381 return mw->mw_status; 1382 } 1383 1384 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres, 1385 struct ocfs2_mask_waiter *mw, 1386 unsigned long mask, 1387 unsigned long goal) 1388 { 1389 BUG_ON(!list_empty(&mw->mw_item)); 1390 1391 assert_spin_locked(&lockres->l_lock); 1392 1393 list_add_tail(&mw->mw_item, &lockres->l_mask_waiters); 1394 mw->mw_mask = mask; 1395 mw->mw_goal = goal; 1396 } 1397 1398 /* returns 0 if the mw that was removed was already satisfied, -EBUSY 1399 * if the mask still hadn't reached its goal */ 1400 static int __lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1401 struct ocfs2_mask_waiter *mw) 1402 { 1403 int ret = 0; 1404 1405 assert_spin_locked(&lockres->l_lock); 1406 if (!list_empty(&mw->mw_item)) { 1407 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 1408 ret = -EBUSY; 1409 1410 list_del_init(&mw->mw_item); 1411 init_completion(&mw->mw_complete); 1412 } 1413 1414 return ret; 1415 } 1416 1417 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 1418 struct ocfs2_mask_waiter *mw) 1419 { 1420 unsigned long flags; 1421 int ret = 0; 1422 1423 spin_lock_irqsave(&lockres->l_lock, flags); 1424 ret = __lockres_remove_mask_waiter(lockres, mw); 1425 spin_unlock_irqrestore(&lockres->l_lock, flags); 1426 1427 return ret; 1428 1429 } 1430 1431 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw, 1432 struct ocfs2_lock_res *lockres) 1433 { 1434 int ret; 1435 1436 ret = wait_for_completion_interruptible(&mw->mw_complete); 1437 if (ret) 1438 lockres_remove_mask_waiter(lockres, mw); 1439 else 1440 ret = mw->mw_status; 1441 /* Re-arm the completion in case we want to wait on it again */ 1442 reinit_completion(&mw->mw_complete); 1443 return ret; 1444 } 1445 1446 static int __ocfs2_cluster_lock(struct ocfs2_super *osb, 1447 struct ocfs2_lock_res *lockres, 1448 int level, 1449 u32 lkm_flags, 1450 int arg_flags, 1451 int l_subclass, 1452 unsigned long caller_ip) 1453 { 1454 struct ocfs2_mask_waiter mw; 1455 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); 1456 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 1457 unsigned long flags; 1458 unsigned int gen; 1459 int noqueue_attempted = 0; 1460 int dlm_locked = 0; 1461 int kick_dc = 0; 1462 1463 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) { 1464 mlog_errno(-EINVAL); 1465 return -EINVAL; 1466 } 1467 1468 ocfs2_init_mask_waiter(&mw); 1469 1470 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 1471 lkm_flags |= DLM_LKF_VALBLK; 1472 1473 again: 1474 wait = 0; 1475 1476 spin_lock_irqsave(&lockres->l_lock, flags); 1477 1478 if (catch_signals && signal_pending(current)) { 1479 ret = -ERESTARTSYS; 1480 goto unlock; 1481 } 1482 1483 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, 1484 "Cluster lock called on freeing lockres %s! flags " 1485 "0x%lx\n", lockres->l_name, lockres->l_flags); 1486 1487 /* We only compare against the currently granted level 1488 * here. If the lock is blocked waiting on a downconvert, 1489 * we'll get caught below. */ 1490 if (lockres->l_flags & OCFS2_LOCK_BUSY && 1491 level > lockres->l_level) { 1492 /* is someone sitting in dlm_lock? If so, wait on 1493 * them. */ 1494 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1495 wait = 1; 1496 goto unlock; 1497 } 1498 1499 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) { 1500 /* 1501 * We've upconverted. If the lock now has a level we can 1502 * work with, we take it. If, however, the lock is not at the 1503 * required level, we go thru the full cycle. One way this could 1504 * happen is if a process requesting an upconvert to PR is 1505 * closely followed by another requesting upconvert to an EX. 1506 * If the process requesting EX lands here, we want it to 1507 * continue attempting to upconvert and let the process 1508 * requesting PR take the lock. 1509 * If multiple processes request upconvert to PR, the first one 1510 * here will take the lock. The others will have to go thru the 1511 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending 1512 * downconvert request. 1513 */ 1514 if (level <= lockres->l_level) 1515 goto update_holders; 1516 } 1517 1518 if (lockres->l_flags & OCFS2_LOCK_BLOCKED && 1519 !ocfs2_may_continue_on_blocked_lock(lockres, level)) { 1520 /* is the lock is currently blocked on behalf of 1521 * another node */ 1522 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0); 1523 wait = 1; 1524 goto unlock; 1525 } 1526 1527 if (level > lockres->l_level) { 1528 if (noqueue_attempted > 0) { 1529 ret = -EAGAIN; 1530 goto unlock; 1531 } 1532 if (lkm_flags & DLM_LKF_NOQUEUE) 1533 noqueue_attempted = 1; 1534 1535 if (lockres->l_action != OCFS2_AST_INVALID) 1536 mlog(ML_ERROR, "lockres %s has action %u pending\n", 1537 lockres->l_name, lockres->l_action); 1538 1539 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1540 lockres->l_action = OCFS2_AST_ATTACH; 1541 lkm_flags &= ~DLM_LKF_CONVERT; 1542 } else { 1543 lockres->l_action = OCFS2_AST_CONVERT; 1544 lkm_flags |= DLM_LKF_CONVERT; 1545 } 1546 1547 lockres->l_requested = level; 1548 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1549 gen = lockres_set_pending(lockres); 1550 spin_unlock_irqrestore(&lockres->l_lock, flags); 1551 1552 BUG_ON(level == DLM_LOCK_IV); 1553 BUG_ON(level == DLM_LOCK_NL); 1554 1555 mlog(ML_BASTS, "lockres %s, convert from %d to %d\n", 1556 lockres->l_name, lockres->l_level, level); 1557 1558 /* call dlm_lock to upgrade lock now */ 1559 ret = ocfs2_dlm_lock(osb->cconn, 1560 level, 1561 &lockres->l_lksb, 1562 lkm_flags, 1563 lockres->l_name, 1564 OCFS2_LOCK_ID_MAX_LEN - 1); 1565 lockres_clear_pending(lockres, gen, osb); 1566 if (ret) { 1567 if (!(lkm_flags & DLM_LKF_NOQUEUE) || 1568 (ret != -EAGAIN)) { 1569 ocfs2_log_dlm_error("ocfs2_dlm_lock", 1570 ret, lockres); 1571 } 1572 ocfs2_recover_from_dlm_error(lockres, 1); 1573 goto out; 1574 } 1575 dlm_locked = 1; 1576 1577 mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n", 1578 lockres->l_name); 1579 1580 /* At this point we've gone inside the dlm and need to 1581 * complete our work regardless. */ 1582 catch_signals = 0; 1583 1584 /* wait for busy to clear and carry on */ 1585 goto again; 1586 } 1587 1588 update_holders: 1589 /* Ok, if we get here then we're good to go. */ 1590 ocfs2_inc_holders(lockres, level); 1591 1592 ret = 0; 1593 unlock: 1594 lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); 1595 1596 /* ocfs2_unblock_lock reques on seeing OCFS2_LOCK_UPCONVERT_FINISHING */ 1597 kick_dc = (lockres->l_flags & OCFS2_LOCK_BLOCKED); 1598 1599 spin_unlock_irqrestore(&lockres->l_lock, flags); 1600 if (kick_dc) 1601 ocfs2_wake_downconvert_thread(osb); 1602 out: 1603 /* 1604 * This is helping work around a lock inversion between the page lock 1605 * and dlm locks. One path holds the page lock while calling aops 1606 * which block acquiring dlm locks. The voting thread holds dlm 1607 * locks while acquiring page locks while down converting data locks. 1608 * This block is helping an aop path notice the inversion and back 1609 * off to unlock its page lock before trying the dlm lock again. 1610 */ 1611 if (wait && arg_flags & OCFS2_LOCK_NONBLOCK && 1612 mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) { 1613 wait = 0; 1614 spin_lock_irqsave(&lockres->l_lock, flags); 1615 if (__lockres_remove_mask_waiter(lockres, &mw)) { 1616 if (dlm_locked) 1617 lockres_or_flags(lockres, 1618 OCFS2_LOCK_NONBLOCK_FINISHED); 1619 spin_unlock_irqrestore(&lockres->l_lock, flags); 1620 ret = -EAGAIN; 1621 } else { 1622 spin_unlock_irqrestore(&lockres->l_lock, flags); 1623 goto again; 1624 } 1625 } 1626 if (wait) { 1627 ret = ocfs2_wait_for_mask(&mw); 1628 if (ret == 0) 1629 goto again; 1630 mlog_errno(ret); 1631 } 1632 ocfs2_update_lock_stats(lockres, level, &mw, ret); 1633 1634 #ifdef CONFIG_DEBUG_LOCK_ALLOC 1635 if (!ret && lockres->l_lockdep_map.key != NULL) { 1636 if (level == DLM_LOCK_PR) 1637 rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass, 1638 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE), 1639 caller_ip); 1640 else 1641 rwsem_acquire(&lockres->l_lockdep_map, l_subclass, 1642 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE), 1643 caller_ip); 1644 } 1645 #endif 1646 return ret; 1647 } 1648 1649 static inline int ocfs2_cluster_lock(struct ocfs2_super *osb, 1650 struct ocfs2_lock_res *lockres, 1651 int level, 1652 u32 lkm_flags, 1653 int arg_flags) 1654 { 1655 return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags, 1656 0, _RET_IP_); 1657 } 1658 1659 1660 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb, 1661 struct ocfs2_lock_res *lockres, 1662 int level, 1663 unsigned long caller_ip) 1664 { 1665 unsigned long flags; 1666 1667 spin_lock_irqsave(&lockres->l_lock, flags); 1668 ocfs2_dec_holders(lockres, level); 1669 ocfs2_downconvert_on_unlock(osb, lockres); 1670 spin_unlock_irqrestore(&lockres->l_lock, flags); 1671 #ifdef CONFIG_DEBUG_LOCK_ALLOC 1672 if (lockres->l_lockdep_map.key != NULL) 1673 rwsem_release(&lockres->l_lockdep_map, 1, caller_ip); 1674 #endif 1675 } 1676 1677 static int ocfs2_create_new_lock(struct ocfs2_super *osb, 1678 struct ocfs2_lock_res *lockres, 1679 int ex, 1680 int local) 1681 { 1682 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1683 unsigned long flags; 1684 u32 lkm_flags = local ? DLM_LKF_LOCAL : 0; 1685 1686 spin_lock_irqsave(&lockres->l_lock, flags); 1687 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 1688 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL); 1689 spin_unlock_irqrestore(&lockres->l_lock, flags); 1690 1691 return ocfs2_lock_create(osb, lockres, level, lkm_flags); 1692 } 1693 1694 /* Grants us an EX lock on the data and metadata resources, skipping 1695 * the normal cluster directory lookup. Use this ONLY on newly created 1696 * inodes which other nodes can't possibly see, and which haven't been 1697 * hashed in the inode hash yet. This can give us a good performance 1698 * increase as it'll skip the network broadcast normally associated 1699 * with creating a new lock resource. */ 1700 int ocfs2_create_new_inode_locks(struct inode *inode) 1701 { 1702 int ret; 1703 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1704 1705 BUG_ON(!ocfs2_inode_is_new(inode)); 1706 1707 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); 1708 1709 /* NOTE: That we don't increment any of the holder counts, nor 1710 * do we add anything to a journal handle. Since this is 1711 * supposed to be a new inode which the cluster doesn't know 1712 * about yet, there is no need to. As far as the LVB handling 1713 * is concerned, this is basically like acquiring an EX lock 1714 * on a resource which has an invalid one -- we'll set it 1715 * valid when we release the EX. */ 1716 1717 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1); 1718 if (ret) { 1719 mlog_errno(ret); 1720 goto bail; 1721 } 1722 1723 /* 1724 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they 1725 * don't use a generation in their lock names. 1726 */ 1727 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); 1728 if (ret) { 1729 mlog_errno(ret); 1730 goto bail; 1731 } 1732 1733 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0); 1734 if (ret) 1735 mlog_errno(ret); 1736 1737 bail: 1738 return ret; 1739 } 1740 1741 int ocfs2_rw_lock(struct inode *inode, int write) 1742 { 1743 int status, level; 1744 struct ocfs2_lock_res *lockres; 1745 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1746 1747 mlog(0, "inode %llu take %s RW lock\n", 1748 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1749 write ? "EXMODE" : "PRMODE"); 1750 1751 if (ocfs2_mount_local(osb)) 1752 return 0; 1753 1754 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1755 1756 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1757 1758 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 1759 if (status < 0) 1760 mlog_errno(status); 1761 1762 return status; 1763 } 1764 1765 int ocfs2_try_rw_lock(struct inode *inode, int write) 1766 { 1767 int status, level; 1768 struct ocfs2_lock_res *lockres; 1769 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1770 1771 mlog(0, "inode %llu try to take %s RW lock\n", 1772 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1773 write ? "EXMODE" : "PRMODE"); 1774 1775 if (ocfs2_mount_local(osb)) 1776 return 0; 1777 1778 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1779 1780 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1781 1782 status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0); 1783 return status; 1784 } 1785 1786 void ocfs2_rw_unlock(struct inode *inode, int write) 1787 { 1788 int level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1789 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1790 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1791 1792 mlog(0, "inode %llu drop %s RW lock\n", 1793 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1794 write ? "EXMODE" : "PRMODE"); 1795 1796 if (!ocfs2_mount_local(osb)) 1797 ocfs2_cluster_unlock(osb, lockres, level); 1798 } 1799 1800 /* 1801 * ocfs2_open_lock always get PR mode lock. 1802 */ 1803 int ocfs2_open_lock(struct inode *inode) 1804 { 1805 int status = 0; 1806 struct ocfs2_lock_res *lockres; 1807 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1808 1809 mlog(0, "inode %llu take PRMODE open lock\n", 1810 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1811 1812 if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb)) 1813 goto out; 1814 1815 lockres = &OCFS2_I(inode)->ip_open_lockres; 1816 1817 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_PR, 0, 0); 1818 if (status < 0) 1819 mlog_errno(status); 1820 1821 out: 1822 return status; 1823 } 1824 1825 int ocfs2_try_open_lock(struct inode *inode, int write) 1826 { 1827 int status = 0, level; 1828 struct ocfs2_lock_res *lockres; 1829 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1830 1831 mlog(0, "inode %llu try to take %s open lock\n", 1832 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1833 write ? "EXMODE" : "PRMODE"); 1834 1835 if (ocfs2_is_hard_readonly(osb)) { 1836 if (write) 1837 status = -EROFS; 1838 goto out; 1839 } 1840 1841 if (ocfs2_mount_local(osb)) 1842 goto out; 1843 1844 lockres = &OCFS2_I(inode)->ip_open_lockres; 1845 1846 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1847 1848 /* 1849 * The file system may already holding a PRMODE/EXMODE open lock. 1850 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on 1851 * other nodes and the -EAGAIN will indicate to the caller that 1852 * this inode is still in use. 1853 */ 1854 status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0); 1855 1856 out: 1857 return status; 1858 } 1859 1860 /* 1861 * ocfs2_open_unlock unlock PR and EX mode open locks. 1862 */ 1863 void ocfs2_open_unlock(struct inode *inode) 1864 { 1865 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres; 1866 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1867 1868 mlog(0, "inode %llu drop open lock\n", 1869 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1870 1871 if (ocfs2_mount_local(osb)) 1872 goto out; 1873 1874 if(lockres->l_ro_holders) 1875 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_PR); 1876 if(lockres->l_ex_holders) 1877 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 1878 1879 out: 1880 return; 1881 } 1882 1883 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres, 1884 int level) 1885 { 1886 int ret; 1887 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1888 unsigned long flags; 1889 struct ocfs2_mask_waiter mw; 1890 1891 ocfs2_init_mask_waiter(&mw); 1892 1893 retry_cancel: 1894 spin_lock_irqsave(&lockres->l_lock, flags); 1895 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 1896 ret = ocfs2_prepare_cancel_convert(osb, lockres); 1897 if (ret) { 1898 spin_unlock_irqrestore(&lockres->l_lock, flags); 1899 ret = ocfs2_cancel_convert(osb, lockres); 1900 if (ret < 0) { 1901 mlog_errno(ret); 1902 goto out; 1903 } 1904 goto retry_cancel; 1905 } 1906 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1907 spin_unlock_irqrestore(&lockres->l_lock, flags); 1908 1909 ocfs2_wait_for_mask(&mw); 1910 goto retry_cancel; 1911 } 1912 1913 ret = -ERESTARTSYS; 1914 /* 1915 * We may still have gotten the lock, in which case there's no 1916 * point to restarting the syscall. 1917 */ 1918 if (lockres->l_level == level) 1919 ret = 0; 1920 1921 mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret, 1922 lockres->l_flags, lockres->l_level, lockres->l_action); 1923 1924 spin_unlock_irqrestore(&lockres->l_lock, flags); 1925 1926 out: 1927 return ret; 1928 } 1929 1930 /* 1931 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of 1932 * flock() calls. The locking approach this requires is sufficiently 1933 * different from all other cluster lock types that we implement a 1934 * separate path to the "low-level" dlm calls. In particular: 1935 * 1936 * - No optimization of lock levels is done - we take at exactly 1937 * what's been requested. 1938 * 1939 * - No lock caching is employed. We immediately downconvert to 1940 * no-lock at unlock time. This also means flock locks never go on 1941 * the blocking list). 1942 * 1943 * - Since userspace can trivially deadlock itself with flock, we make 1944 * sure to allow cancellation of a misbehaving applications flock() 1945 * request. 1946 * 1947 * - Access to any flock lockres doesn't require concurrency, so we 1948 * can simplify the code by requiring the caller to guarantee 1949 * serialization of dlmglue flock calls. 1950 */ 1951 int ocfs2_file_lock(struct file *file, int ex, int trylock) 1952 { 1953 int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1954 unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0; 1955 unsigned long flags; 1956 struct ocfs2_file_private *fp = file->private_data; 1957 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1958 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1959 struct ocfs2_mask_waiter mw; 1960 1961 ocfs2_init_mask_waiter(&mw); 1962 1963 if ((lockres->l_flags & OCFS2_LOCK_BUSY) || 1964 (lockres->l_level > DLM_LOCK_NL)) { 1965 mlog(ML_ERROR, 1966 "File lock \"%s\" has busy or locked state: flags: 0x%lx, " 1967 "level: %u\n", lockres->l_name, lockres->l_flags, 1968 lockres->l_level); 1969 return -EINVAL; 1970 } 1971 1972 spin_lock_irqsave(&lockres->l_lock, flags); 1973 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1974 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1975 spin_unlock_irqrestore(&lockres->l_lock, flags); 1976 1977 /* 1978 * Get the lock at NLMODE to start - that way we 1979 * can cancel the upconvert request if need be. 1980 */ 1981 ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0); 1982 if (ret < 0) { 1983 mlog_errno(ret); 1984 goto out; 1985 } 1986 1987 ret = ocfs2_wait_for_mask(&mw); 1988 if (ret) { 1989 mlog_errno(ret); 1990 goto out; 1991 } 1992 spin_lock_irqsave(&lockres->l_lock, flags); 1993 } 1994 1995 lockres->l_action = OCFS2_AST_CONVERT; 1996 lkm_flags |= DLM_LKF_CONVERT; 1997 lockres->l_requested = level; 1998 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1999 2000 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 2001 spin_unlock_irqrestore(&lockres->l_lock, flags); 2002 2003 ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags, 2004 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1); 2005 if (ret) { 2006 if (!trylock || (ret != -EAGAIN)) { 2007 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 2008 ret = -EINVAL; 2009 } 2010 2011 ocfs2_recover_from_dlm_error(lockres, 1); 2012 lockres_remove_mask_waiter(lockres, &mw); 2013 goto out; 2014 } 2015 2016 ret = ocfs2_wait_for_mask_interruptible(&mw, lockres); 2017 if (ret == -ERESTARTSYS) { 2018 /* 2019 * Userspace can cause deadlock itself with 2020 * flock(). Current behavior locally is to allow the 2021 * deadlock, but abort the system call if a signal is 2022 * received. We follow this example, otherwise a 2023 * poorly written program could sit in kernel until 2024 * reboot. 2025 * 2026 * Handling this is a bit more complicated for Ocfs2 2027 * though. We can't exit this function with an 2028 * outstanding lock request, so a cancel convert is 2029 * required. We intentionally overwrite 'ret' - if the 2030 * cancel fails and the lock was granted, it's easier 2031 * to just bubble success back up to the user. 2032 */ 2033 ret = ocfs2_flock_handle_signal(lockres, level); 2034 } else if (!ret && (level > lockres->l_level)) { 2035 /* Trylock failed asynchronously */ 2036 BUG_ON(!trylock); 2037 ret = -EAGAIN; 2038 } 2039 2040 out: 2041 2042 mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n", 2043 lockres->l_name, ex, trylock, ret); 2044 return ret; 2045 } 2046 2047 void ocfs2_file_unlock(struct file *file) 2048 { 2049 int ret; 2050 unsigned int gen; 2051 unsigned long flags; 2052 struct ocfs2_file_private *fp = file->private_data; 2053 struct ocfs2_lock_res *lockres = &fp->fp_flock; 2054 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 2055 struct ocfs2_mask_waiter mw; 2056 2057 ocfs2_init_mask_waiter(&mw); 2058 2059 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) 2060 return; 2061 2062 if (lockres->l_level == DLM_LOCK_NL) 2063 return; 2064 2065 mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n", 2066 lockres->l_name, lockres->l_flags, lockres->l_level, 2067 lockres->l_action); 2068 2069 spin_lock_irqsave(&lockres->l_lock, flags); 2070 /* 2071 * Fake a blocking ast for the downconvert code. 2072 */ 2073 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 2074 lockres->l_blocking = DLM_LOCK_EX; 2075 2076 gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL); 2077 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 2078 spin_unlock_irqrestore(&lockres->l_lock, flags); 2079 2080 ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen); 2081 if (ret) { 2082 mlog_errno(ret); 2083 return; 2084 } 2085 2086 ret = ocfs2_wait_for_mask(&mw); 2087 if (ret) 2088 mlog_errno(ret); 2089 } 2090 2091 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 2092 struct ocfs2_lock_res *lockres) 2093 { 2094 int kick = 0; 2095 2096 /* If we know that another node is waiting on our lock, kick 2097 * the downconvert thread * pre-emptively when we reach a release 2098 * condition. */ 2099 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 2100 switch(lockres->l_blocking) { 2101 case DLM_LOCK_EX: 2102 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 2103 kick = 1; 2104 break; 2105 case DLM_LOCK_PR: 2106 if (!lockres->l_ex_holders) 2107 kick = 1; 2108 break; 2109 default: 2110 BUG(); 2111 } 2112 } 2113 2114 if (kick) 2115 ocfs2_wake_downconvert_thread(osb); 2116 } 2117 2118 #define OCFS2_SEC_BITS 34 2119 #define OCFS2_SEC_SHIFT (64 - 34) 2120 #define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1) 2121 2122 /* LVB only has room for 64 bits of time here so we pack it for 2123 * now. */ 2124 static u64 ocfs2_pack_timespec(struct timespec *spec) 2125 { 2126 u64 res; 2127 u64 sec = spec->tv_sec; 2128 u32 nsec = spec->tv_nsec; 2129 2130 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK); 2131 2132 return res; 2133 } 2134 2135 /* Call this with the lockres locked. I am reasonably sure we don't 2136 * need ip_lock in this function as anyone who would be changing those 2137 * values is supposed to be blocked in ocfs2_inode_lock right now. */ 2138 static void __ocfs2_stuff_meta_lvb(struct inode *inode) 2139 { 2140 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2141 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2142 struct ocfs2_meta_lvb *lvb; 2143 2144 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2145 2146 /* 2147 * Invalidate the LVB of a deleted inode - this way other 2148 * nodes are forced to go to disk and discover the new inode 2149 * status. 2150 */ 2151 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2152 lvb->lvb_version = 0; 2153 goto out; 2154 } 2155 2156 lvb->lvb_version = OCFS2_LVB_VERSION; 2157 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 2158 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 2159 lvb->lvb_iuid = cpu_to_be32(i_uid_read(inode)); 2160 lvb->lvb_igid = cpu_to_be32(i_gid_read(inode)); 2161 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 2162 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 2163 lvb->lvb_iatime_packed = 2164 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime)); 2165 lvb->lvb_ictime_packed = 2166 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime)); 2167 lvb->lvb_imtime_packed = 2168 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime)); 2169 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 2170 lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features); 2171 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 2172 2173 out: 2174 mlog_meta_lvb(0, lockres); 2175 } 2176 2177 static void ocfs2_unpack_timespec(struct timespec *spec, 2178 u64 packed_time) 2179 { 2180 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 2181 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 2182 } 2183 2184 static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 2185 { 2186 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2187 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2188 struct ocfs2_meta_lvb *lvb; 2189 2190 mlog_meta_lvb(0, lockres); 2191 2192 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2193 2194 /* We're safe here without the lockres lock... */ 2195 spin_lock(&oi->ip_lock); 2196 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 2197 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 2198 2199 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 2200 oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures); 2201 ocfs2_set_inode_flags(inode); 2202 2203 /* fast-symlinks are a special case */ 2204 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 2205 inode->i_blocks = 0; 2206 else 2207 inode->i_blocks = ocfs2_inode_sector_count(inode); 2208 2209 i_uid_write(inode, be32_to_cpu(lvb->lvb_iuid)); 2210 i_gid_write(inode, be32_to_cpu(lvb->lvb_igid)); 2211 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 2212 set_nlink(inode, be16_to_cpu(lvb->lvb_inlink)); 2213 ocfs2_unpack_timespec(&inode->i_atime, 2214 be64_to_cpu(lvb->lvb_iatime_packed)); 2215 ocfs2_unpack_timespec(&inode->i_mtime, 2216 be64_to_cpu(lvb->lvb_imtime_packed)); 2217 ocfs2_unpack_timespec(&inode->i_ctime, 2218 be64_to_cpu(lvb->lvb_ictime_packed)); 2219 spin_unlock(&oi->ip_lock); 2220 } 2221 2222 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 2223 struct ocfs2_lock_res *lockres) 2224 { 2225 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2226 2227 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) 2228 && lvb->lvb_version == OCFS2_LVB_VERSION 2229 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 2230 return 1; 2231 return 0; 2232 } 2233 2234 /* Determine whether a lock resource needs to be refreshed, and 2235 * arbitrate who gets to refresh it. 2236 * 2237 * 0 means no refresh needed. 2238 * 2239 * > 0 means you need to refresh this and you MUST call 2240 * ocfs2_complete_lock_res_refresh afterwards. */ 2241 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres) 2242 { 2243 unsigned long flags; 2244 int status = 0; 2245 2246 refresh_check: 2247 spin_lock_irqsave(&lockres->l_lock, flags); 2248 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 2249 spin_unlock_irqrestore(&lockres->l_lock, flags); 2250 goto bail; 2251 } 2252 2253 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 2254 spin_unlock_irqrestore(&lockres->l_lock, flags); 2255 2256 ocfs2_wait_on_refreshing_lock(lockres); 2257 goto refresh_check; 2258 } 2259 2260 /* Ok, I'll be the one to refresh this lock. */ 2261 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING); 2262 spin_unlock_irqrestore(&lockres->l_lock, flags); 2263 2264 status = 1; 2265 bail: 2266 mlog(0, "status %d\n", status); 2267 return status; 2268 } 2269 2270 /* If status is non zero, I'll mark it as not being in refresh 2271 * anymroe, but i won't clear the needs refresh flag. */ 2272 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres, 2273 int status) 2274 { 2275 unsigned long flags; 2276 2277 spin_lock_irqsave(&lockres->l_lock, flags); 2278 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING); 2279 if (!status) 2280 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 2281 spin_unlock_irqrestore(&lockres->l_lock, flags); 2282 2283 wake_up(&lockres->l_event); 2284 } 2285 2286 /* may or may not return a bh if it went to disk. */ 2287 static int ocfs2_inode_lock_update(struct inode *inode, 2288 struct buffer_head **bh) 2289 { 2290 int status = 0; 2291 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2292 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2293 struct ocfs2_dinode *fe; 2294 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2295 2296 if (ocfs2_mount_local(osb)) 2297 goto bail; 2298 2299 spin_lock(&oi->ip_lock); 2300 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2301 mlog(0, "Orphaned inode %llu was deleted while we " 2302 "were waiting on a lock. ip_flags = 0x%x\n", 2303 (unsigned long long)oi->ip_blkno, oi->ip_flags); 2304 spin_unlock(&oi->ip_lock); 2305 status = -ENOENT; 2306 goto bail; 2307 } 2308 spin_unlock(&oi->ip_lock); 2309 2310 if (!ocfs2_should_refresh_lock_res(lockres)) 2311 goto bail; 2312 2313 /* This will discard any caching information we might have had 2314 * for the inode metadata. */ 2315 ocfs2_metadata_cache_purge(INODE_CACHE(inode)); 2316 2317 ocfs2_extent_map_trunc(inode, 0); 2318 2319 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) { 2320 mlog(0, "Trusting LVB on inode %llu\n", 2321 (unsigned long long)oi->ip_blkno); 2322 ocfs2_refresh_inode_from_lvb(inode); 2323 } else { 2324 /* Boo, we have to go to disk. */ 2325 /* read bh, cast, ocfs2_refresh_inode */ 2326 status = ocfs2_read_inode_block(inode, bh); 2327 if (status < 0) { 2328 mlog_errno(status); 2329 goto bail_refresh; 2330 } 2331 fe = (struct ocfs2_dinode *) (*bh)->b_data; 2332 2333 /* This is a good chance to make sure we're not 2334 * locking an invalid object. ocfs2_read_inode_block() 2335 * already checked that the inode block is sane. 2336 * 2337 * We bug on a stale inode here because we checked 2338 * above whether it was wiped from disk. The wiping 2339 * node provides a guarantee that we receive that 2340 * message and can mark the inode before dropping any 2341 * locks associated with it. */ 2342 mlog_bug_on_msg(inode->i_generation != 2343 le32_to_cpu(fe->i_generation), 2344 "Invalid dinode %llu disk generation: %u " 2345 "inode->i_generation: %u\n", 2346 (unsigned long long)oi->ip_blkno, 2347 le32_to_cpu(fe->i_generation), 2348 inode->i_generation); 2349 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 2350 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 2351 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 2352 (unsigned long long)oi->ip_blkno, 2353 (unsigned long long)le64_to_cpu(fe->i_dtime), 2354 le32_to_cpu(fe->i_flags)); 2355 2356 ocfs2_refresh_inode(inode, fe); 2357 ocfs2_track_lock_refresh(lockres); 2358 } 2359 2360 status = 0; 2361 bail_refresh: 2362 ocfs2_complete_lock_res_refresh(lockres, status); 2363 bail: 2364 return status; 2365 } 2366 2367 static int ocfs2_assign_bh(struct inode *inode, 2368 struct buffer_head **ret_bh, 2369 struct buffer_head *passed_bh) 2370 { 2371 int status; 2372 2373 if (passed_bh) { 2374 /* Ok, the update went to disk for us, use the 2375 * returned bh. */ 2376 *ret_bh = passed_bh; 2377 get_bh(*ret_bh); 2378 2379 return 0; 2380 } 2381 2382 status = ocfs2_read_inode_block(inode, ret_bh); 2383 if (status < 0) 2384 mlog_errno(status); 2385 2386 return status; 2387 } 2388 2389 /* 2390 * returns < 0 error if the callback will never be called, otherwise 2391 * the result of the lock will be communicated via the callback. 2392 */ 2393 int ocfs2_inode_lock_full_nested(struct inode *inode, 2394 struct buffer_head **ret_bh, 2395 int ex, 2396 int arg_flags, 2397 int subclass) 2398 { 2399 int status, level, acquired; 2400 u32 dlm_flags; 2401 struct ocfs2_lock_res *lockres = NULL; 2402 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2403 struct buffer_head *local_bh = NULL; 2404 2405 mlog(0, "inode %llu, take %s META lock\n", 2406 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2407 ex ? "EXMODE" : "PRMODE"); 2408 2409 status = 0; 2410 acquired = 0; 2411 /* We'll allow faking a readonly metadata lock for 2412 * rodevices. */ 2413 if (ocfs2_is_hard_readonly(osb)) { 2414 if (ex) 2415 status = -EROFS; 2416 goto getbh; 2417 } 2418 2419 if ((arg_flags & OCFS2_META_LOCK_GETBH) || 2420 ocfs2_mount_local(osb)) 2421 goto update; 2422 2423 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2424 ocfs2_wait_for_recovery(osb); 2425 2426 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2427 level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2428 dlm_flags = 0; 2429 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 2430 dlm_flags |= DLM_LKF_NOQUEUE; 2431 2432 status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags, 2433 arg_flags, subclass, _RET_IP_); 2434 if (status < 0) { 2435 if (status != -EAGAIN) 2436 mlog_errno(status); 2437 goto bail; 2438 } 2439 2440 /* Notify the error cleanup path to drop the cluster lock. */ 2441 acquired = 1; 2442 2443 /* We wait twice because a node may have died while we were in 2444 * the lower dlm layers. The second time though, we've 2445 * committed to owning this lock so we don't allow signals to 2446 * abort the operation. */ 2447 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2448 ocfs2_wait_for_recovery(osb); 2449 2450 update: 2451 /* 2452 * We only see this flag if we're being called from 2453 * ocfs2_read_locked_inode(). It means we're locking an inode 2454 * which hasn't been populated yet, so clear the refresh flag 2455 * and let the caller handle it. 2456 */ 2457 if (inode->i_state & I_NEW) { 2458 status = 0; 2459 if (lockres) 2460 ocfs2_complete_lock_res_refresh(lockres, 0); 2461 goto bail; 2462 } 2463 2464 /* This is fun. The caller may want a bh back, or it may 2465 * not. ocfs2_inode_lock_update definitely wants one in, but 2466 * may or may not read one, depending on what's in the 2467 * LVB. The result of all of this is that we've *only* gone to 2468 * disk if we have to, so the complexity is worthwhile. */ 2469 status = ocfs2_inode_lock_update(inode, &local_bh); 2470 if (status < 0) { 2471 if (status != -ENOENT) 2472 mlog_errno(status); 2473 goto bail; 2474 } 2475 getbh: 2476 if (ret_bh) { 2477 status = ocfs2_assign_bh(inode, ret_bh, local_bh); 2478 if (status < 0) { 2479 mlog_errno(status); 2480 goto bail; 2481 } 2482 } 2483 2484 bail: 2485 if (status < 0) { 2486 if (ret_bh && (*ret_bh)) { 2487 brelse(*ret_bh); 2488 *ret_bh = NULL; 2489 } 2490 if (acquired) 2491 ocfs2_inode_unlock(inode, ex); 2492 } 2493 2494 if (local_bh) 2495 brelse(local_bh); 2496 2497 return status; 2498 } 2499 2500 /* 2501 * This is working around a lock inversion between tasks acquiring DLM 2502 * locks while holding a page lock and the downconvert thread which 2503 * blocks dlm lock acquiry while acquiring page locks. 2504 * 2505 * ** These _with_page variantes are only intended to be called from aop 2506 * methods that hold page locks and return a very specific *positive* error 2507 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 2508 * 2509 * The DLM is called such that it returns -EAGAIN if it would have 2510 * blocked waiting for the downconvert thread. In that case we unlock 2511 * our page so the downconvert thread can make progress. Once we've 2512 * done this we have to return AOP_TRUNCATED_PAGE so the aop method 2513 * that called us can bubble that back up into the VFS who will then 2514 * immediately retry the aop call. 2515 */ 2516 int ocfs2_inode_lock_with_page(struct inode *inode, 2517 struct buffer_head **ret_bh, 2518 int ex, 2519 struct page *page) 2520 { 2521 int ret; 2522 2523 ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK); 2524 if (ret == -EAGAIN) { 2525 unlock_page(page); 2526 /* 2527 * If we can't get inode lock immediately, we should not return 2528 * directly here, since this will lead to a softlockup problem. 2529 * The method is to get a blocking lock and immediately unlock 2530 * before returning, this can avoid CPU resource waste due to 2531 * lots of retries, and benefits fairness in getting lock. 2532 */ 2533 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0) 2534 ocfs2_inode_unlock(inode, ex); 2535 ret = AOP_TRUNCATED_PAGE; 2536 } 2537 2538 return ret; 2539 } 2540 2541 int ocfs2_inode_lock_atime(struct inode *inode, 2542 struct vfsmount *vfsmnt, 2543 int *level, int wait) 2544 { 2545 int ret; 2546 2547 if (wait) 2548 ret = ocfs2_inode_lock(inode, NULL, 0); 2549 else 2550 ret = ocfs2_try_inode_lock(inode, NULL, 0); 2551 2552 if (ret < 0) { 2553 if (ret != -EAGAIN) 2554 mlog_errno(ret); 2555 return ret; 2556 } 2557 2558 /* 2559 * If we should update atime, we will get EX lock, 2560 * otherwise we just get PR lock. 2561 */ 2562 if (ocfs2_should_update_atime(inode, vfsmnt)) { 2563 struct buffer_head *bh = NULL; 2564 2565 ocfs2_inode_unlock(inode, 0); 2566 if (wait) 2567 ret = ocfs2_inode_lock(inode, &bh, 1); 2568 else 2569 ret = ocfs2_try_inode_lock(inode, &bh, 1); 2570 2571 if (ret < 0) { 2572 if (ret != -EAGAIN) 2573 mlog_errno(ret); 2574 return ret; 2575 } 2576 *level = 1; 2577 if (ocfs2_should_update_atime(inode, vfsmnt)) 2578 ocfs2_update_inode_atime(inode, bh); 2579 if (bh) 2580 brelse(bh); 2581 } else 2582 *level = 0; 2583 2584 return ret; 2585 } 2586 2587 void ocfs2_inode_unlock(struct inode *inode, 2588 int ex) 2589 { 2590 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2591 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2592 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2593 2594 mlog(0, "inode %llu drop %s META lock\n", 2595 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2596 ex ? "EXMODE" : "PRMODE"); 2597 2598 if (!ocfs2_is_hard_readonly(osb) && 2599 !ocfs2_mount_local(osb)) 2600 ocfs2_cluster_unlock(osb, lockres, level); 2601 } 2602 2603 /* 2604 * This _tracker variantes are introduced to deal with the recursive cluster 2605 * locking issue. The idea is to keep track of a lock holder on the stack of 2606 * the current process. If there's a lock holder on the stack, we know the 2607 * task context is already protected by cluster locking. Currently, they're 2608 * used in some VFS entry routines. 2609 * 2610 * return < 0 on error, return == 0 if there's no lock holder on the stack 2611 * before this call, return == 1 if this call would be a recursive locking. 2612 * return == -1 if this lock attempt will cause an upgrade which is forbidden. 2613 * 2614 * When taking lock levels into account,we face some different situations. 2615 * 2616 * 1. no lock is held 2617 * In this case, just lock the inode as requested and return 0 2618 * 2619 * 2. We are holding a lock 2620 * For this situation, things diverges into several cases 2621 * 2622 * wanted holding what to do 2623 * ex ex see 2.1 below 2624 * ex pr see 2.2 below 2625 * pr ex see 2.1 below 2626 * pr pr see 2.1 below 2627 * 2628 * 2.1 lock level that is been held is compatible 2629 * with the wanted level, so no lock action will be tacken. 2630 * 2631 * 2.2 Otherwise, an upgrade is needed, but it is forbidden. 2632 * 2633 * Reason why upgrade within a process is forbidden is that 2634 * lock upgrade may cause dead lock. The following illustrates 2635 * how it happens. 2636 * 2637 * thread on node1 thread on node2 2638 * ocfs2_inode_lock_tracker(ex=0) 2639 * 2640 * <====== ocfs2_inode_lock_tracker(ex=1) 2641 * 2642 * ocfs2_inode_lock_tracker(ex=1) 2643 */ 2644 int ocfs2_inode_lock_tracker(struct inode *inode, 2645 struct buffer_head **ret_bh, 2646 int ex, 2647 struct ocfs2_lock_holder *oh) 2648 { 2649 int status = 0; 2650 struct ocfs2_lock_res *lockres; 2651 struct ocfs2_lock_holder *tmp_oh; 2652 struct pid *pid = task_pid(current); 2653 2654 2655 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2656 tmp_oh = ocfs2_pid_holder(lockres, pid); 2657 2658 if (!tmp_oh) { 2659 /* 2660 * This corresponds to the case 1. 2661 * We haven't got any lock before. 2662 */ 2663 status = ocfs2_inode_lock_full(inode, ret_bh, ex, 0); 2664 if (status < 0) { 2665 if (status != -ENOENT) 2666 mlog_errno(status); 2667 return status; 2668 } 2669 2670 oh->oh_ex = ex; 2671 ocfs2_add_holder(lockres, oh); 2672 return 0; 2673 } 2674 2675 if (unlikely(ex && !tmp_oh->oh_ex)) { 2676 /* 2677 * case 2.2 upgrade may cause dead lock, forbid it. 2678 */ 2679 mlog(ML_ERROR, "Recursive locking is not permitted to " 2680 "upgrade to EX level from PR level.\n"); 2681 dump_stack(); 2682 return -EINVAL; 2683 } 2684 2685 /* 2686 * case 2.1 OCFS2_META_LOCK_GETBH flag make ocfs2_inode_lock_full. 2687 * ignore the lock level and just update it. 2688 */ 2689 if (ret_bh) { 2690 status = ocfs2_inode_lock_full(inode, ret_bh, ex, 2691 OCFS2_META_LOCK_GETBH); 2692 if (status < 0) { 2693 if (status != -ENOENT) 2694 mlog_errno(status); 2695 return status; 2696 } 2697 } 2698 return tmp_oh ? 1 : 0; 2699 } 2700 2701 void ocfs2_inode_unlock_tracker(struct inode *inode, 2702 int ex, 2703 struct ocfs2_lock_holder *oh, 2704 int had_lock) 2705 { 2706 struct ocfs2_lock_res *lockres; 2707 2708 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2709 /* had_lock means that the currect process already takes the cluster 2710 * lock previously. 2711 * If had_lock is 1, we have nothing to do here. 2712 * If had_lock is 0, we will release the lock. 2713 */ 2714 if (!had_lock) { 2715 ocfs2_inode_unlock(inode, oh->oh_ex); 2716 ocfs2_remove_holder(lockres, oh); 2717 } 2718 } 2719 2720 int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno) 2721 { 2722 struct ocfs2_lock_res *lockres; 2723 struct ocfs2_orphan_scan_lvb *lvb; 2724 int status = 0; 2725 2726 if (ocfs2_is_hard_readonly(osb)) 2727 return -EROFS; 2728 2729 if (ocfs2_mount_local(osb)) 2730 return 0; 2731 2732 lockres = &osb->osb_orphan_scan.os_lockres; 2733 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2734 if (status < 0) 2735 return status; 2736 2737 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2738 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2739 lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION) 2740 *seqno = be32_to_cpu(lvb->lvb_os_seqno); 2741 else 2742 *seqno = osb->osb_orphan_scan.os_seqno + 1; 2743 2744 return status; 2745 } 2746 2747 void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno) 2748 { 2749 struct ocfs2_lock_res *lockres; 2750 struct ocfs2_orphan_scan_lvb *lvb; 2751 2752 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) { 2753 lockres = &osb->osb_orphan_scan.os_lockres; 2754 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2755 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION; 2756 lvb->lvb_os_seqno = cpu_to_be32(seqno); 2757 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2758 } 2759 } 2760 2761 int ocfs2_super_lock(struct ocfs2_super *osb, 2762 int ex) 2763 { 2764 int status = 0; 2765 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2766 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2767 2768 if (ocfs2_is_hard_readonly(osb)) 2769 return -EROFS; 2770 2771 if (ocfs2_mount_local(osb)) 2772 goto bail; 2773 2774 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 2775 if (status < 0) { 2776 mlog_errno(status); 2777 goto bail; 2778 } 2779 2780 /* The super block lock path is really in the best position to 2781 * know when resources covered by the lock need to be 2782 * refreshed, so we do it here. Of course, making sense of 2783 * everything is up to the caller :) */ 2784 status = ocfs2_should_refresh_lock_res(lockres); 2785 if (status) { 2786 status = ocfs2_refresh_slot_info(osb); 2787 2788 ocfs2_complete_lock_res_refresh(lockres, status); 2789 2790 if (status < 0) { 2791 ocfs2_cluster_unlock(osb, lockres, level); 2792 mlog_errno(status); 2793 } 2794 ocfs2_track_lock_refresh(lockres); 2795 } 2796 bail: 2797 return status; 2798 } 2799 2800 void ocfs2_super_unlock(struct ocfs2_super *osb, 2801 int ex) 2802 { 2803 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2804 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2805 2806 if (!ocfs2_mount_local(osb)) 2807 ocfs2_cluster_unlock(osb, lockres, level); 2808 } 2809 2810 int ocfs2_rename_lock(struct ocfs2_super *osb) 2811 { 2812 int status; 2813 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2814 2815 if (ocfs2_is_hard_readonly(osb)) 2816 return -EROFS; 2817 2818 if (ocfs2_mount_local(osb)) 2819 return 0; 2820 2821 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2822 if (status < 0) 2823 mlog_errno(status); 2824 2825 return status; 2826 } 2827 2828 void ocfs2_rename_unlock(struct ocfs2_super *osb) 2829 { 2830 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2831 2832 if (!ocfs2_mount_local(osb)) 2833 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2834 } 2835 2836 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex) 2837 { 2838 int status; 2839 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2840 2841 if (ocfs2_is_hard_readonly(osb)) 2842 return -EROFS; 2843 2844 if (ocfs2_mount_local(osb)) 2845 return 0; 2846 2847 status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE, 2848 0, 0); 2849 if (status < 0) 2850 mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status); 2851 2852 return status; 2853 } 2854 2855 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex) 2856 { 2857 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2858 2859 if (!ocfs2_mount_local(osb)) 2860 ocfs2_cluster_unlock(osb, lockres, 2861 ex ? LKM_EXMODE : LKM_PRMODE); 2862 } 2863 2864 int ocfs2_trim_fs_lock(struct ocfs2_super *osb, 2865 struct ocfs2_trim_fs_info *info, int trylock) 2866 { 2867 int status; 2868 struct ocfs2_trim_fs_lvb *lvb; 2869 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 2870 2871 if (info) 2872 info->tf_valid = 0; 2873 2874 if (ocfs2_is_hard_readonly(osb)) 2875 return -EROFS; 2876 2877 if (ocfs2_mount_local(osb)) 2878 return 0; 2879 2880 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 2881 trylock ? DLM_LKF_NOQUEUE : 0, 0); 2882 if (status < 0) { 2883 if (status != -EAGAIN) 2884 mlog_errno(status); 2885 return status; 2886 } 2887 2888 if (info) { 2889 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2890 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2891 lvb->lvb_version == OCFS2_TRIMFS_LVB_VERSION) { 2892 info->tf_valid = 1; 2893 info->tf_success = lvb->lvb_success; 2894 info->tf_nodenum = be32_to_cpu(lvb->lvb_nodenum); 2895 info->tf_start = be64_to_cpu(lvb->lvb_start); 2896 info->tf_len = be64_to_cpu(lvb->lvb_len); 2897 info->tf_minlen = be64_to_cpu(lvb->lvb_minlen); 2898 info->tf_trimlen = be64_to_cpu(lvb->lvb_trimlen); 2899 } 2900 } 2901 2902 return status; 2903 } 2904 2905 void ocfs2_trim_fs_unlock(struct ocfs2_super *osb, 2906 struct ocfs2_trim_fs_info *info) 2907 { 2908 struct ocfs2_trim_fs_lvb *lvb; 2909 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 2910 2911 if (ocfs2_mount_local(osb)) 2912 return; 2913 2914 if (info) { 2915 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2916 lvb->lvb_version = OCFS2_TRIMFS_LVB_VERSION; 2917 lvb->lvb_success = info->tf_success; 2918 lvb->lvb_nodenum = cpu_to_be32(info->tf_nodenum); 2919 lvb->lvb_start = cpu_to_be64(info->tf_start); 2920 lvb->lvb_len = cpu_to_be64(info->tf_len); 2921 lvb->lvb_minlen = cpu_to_be64(info->tf_minlen); 2922 lvb->lvb_trimlen = cpu_to_be64(info->tf_trimlen); 2923 } 2924 2925 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2926 } 2927 2928 int ocfs2_dentry_lock(struct dentry *dentry, int ex) 2929 { 2930 int ret; 2931 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2932 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2933 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2934 2935 BUG_ON(!dl); 2936 2937 if (ocfs2_is_hard_readonly(osb)) { 2938 if (ex) 2939 return -EROFS; 2940 return 0; 2941 } 2942 2943 if (ocfs2_mount_local(osb)) 2944 return 0; 2945 2946 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 2947 if (ret < 0) 2948 mlog_errno(ret); 2949 2950 return ret; 2951 } 2952 2953 void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 2954 { 2955 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2956 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2957 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2958 2959 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 2960 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 2961 } 2962 2963 /* Reference counting of the dlm debug structure. We want this because 2964 * open references on the debug inodes can live on after a mount, so 2965 * we can't rely on the ocfs2_super to always exist. */ 2966 static void ocfs2_dlm_debug_free(struct kref *kref) 2967 { 2968 struct ocfs2_dlm_debug *dlm_debug; 2969 2970 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2971 2972 kfree(dlm_debug); 2973 } 2974 2975 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2976 { 2977 if (dlm_debug) 2978 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2979 } 2980 2981 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2982 { 2983 kref_get(&debug->d_refcnt); 2984 } 2985 2986 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2987 { 2988 struct ocfs2_dlm_debug *dlm_debug; 2989 2990 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 2991 if (!dlm_debug) { 2992 mlog_errno(-ENOMEM); 2993 goto out; 2994 } 2995 2996 kref_init(&dlm_debug->d_refcnt); 2997 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 2998 dlm_debug->d_locking_state = NULL; 2999 out: 3000 return dlm_debug; 3001 } 3002 3003 /* Access to this is arbitrated for us via seq_file->sem. */ 3004 struct ocfs2_dlm_seq_priv { 3005 struct ocfs2_dlm_debug *p_dlm_debug; 3006 struct ocfs2_lock_res p_iter_res; 3007 struct ocfs2_lock_res p_tmp_res; 3008 }; 3009 3010 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 3011 struct ocfs2_dlm_seq_priv *priv) 3012 { 3013 struct ocfs2_lock_res *iter, *ret = NULL; 3014 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 3015 3016 assert_spin_locked(&ocfs2_dlm_tracking_lock); 3017 3018 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 3019 /* discover the head of the list */ 3020 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 3021 mlog(0, "End of list found, %p\n", ret); 3022 break; 3023 } 3024 3025 /* We track our "dummy" iteration lockres' by a NULL 3026 * l_ops field. */ 3027 if (iter->l_ops != NULL) { 3028 ret = iter; 3029 break; 3030 } 3031 } 3032 3033 return ret; 3034 } 3035 3036 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 3037 { 3038 struct ocfs2_dlm_seq_priv *priv = m->private; 3039 struct ocfs2_lock_res *iter; 3040 3041 spin_lock(&ocfs2_dlm_tracking_lock); 3042 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 3043 if (iter) { 3044 /* Since lockres' have the lifetime of their container 3045 * (which can be inodes, ocfs2_supers, etc) we want to 3046 * copy this out to a temporary lockres while still 3047 * under the spinlock. Obviously after this we can't 3048 * trust any pointers on the copy returned, but that's 3049 * ok as the information we want isn't typically held 3050 * in them. */ 3051 priv->p_tmp_res = *iter; 3052 iter = &priv->p_tmp_res; 3053 } 3054 spin_unlock(&ocfs2_dlm_tracking_lock); 3055 3056 return iter; 3057 } 3058 3059 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 3060 { 3061 } 3062 3063 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 3064 { 3065 struct ocfs2_dlm_seq_priv *priv = m->private; 3066 struct ocfs2_lock_res *iter = v; 3067 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 3068 3069 spin_lock(&ocfs2_dlm_tracking_lock); 3070 iter = ocfs2_dlm_next_res(iter, priv); 3071 list_del_init(&dummy->l_debug_list); 3072 if (iter) { 3073 list_add(&dummy->l_debug_list, &iter->l_debug_list); 3074 priv->p_tmp_res = *iter; 3075 iter = &priv->p_tmp_res; 3076 } 3077 spin_unlock(&ocfs2_dlm_tracking_lock); 3078 3079 return iter; 3080 } 3081 3082 /* 3083 * Version is used by debugfs.ocfs2 to determine the format being used 3084 * 3085 * New in version 2 3086 * - Lock stats printed 3087 * New in version 3 3088 * - Max time in lock stats is in usecs (instead of nsecs) 3089 */ 3090 #define OCFS2_DLM_DEBUG_STR_VERSION 3 3091 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 3092 { 3093 int i; 3094 char *lvb; 3095 struct ocfs2_lock_res *lockres = v; 3096 3097 if (!lockres) 3098 return -EINVAL; 3099 3100 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 3101 3102 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 3103 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 3104 lockres->l_name, 3105 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 3106 else 3107 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 3108 3109 seq_printf(m, "%d\t" 3110 "0x%lx\t" 3111 "0x%x\t" 3112 "0x%x\t" 3113 "%u\t" 3114 "%u\t" 3115 "%d\t" 3116 "%d\t", 3117 lockres->l_level, 3118 lockres->l_flags, 3119 lockres->l_action, 3120 lockres->l_unlock_action, 3121 lockres->l_ro_holders, 3122 lockres->l_ex_holders, 3123 lockres->l_requested, 3124 lockres->l_blocking); 3125 3126 /* Dump the raw LVB */ 3127 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 3128 for(i = 0; i < DLM_LVB_LEN; i++) 3129 seq_printf(m, "0x%x\t", lvb[i]); 3130 3131 #ifdef CONFIG_OCFS2_FS_STATS 3132 # define lock_num_prmode(_l) ((_l)->l_lock_prmode.ls_gets) 3133 # define lock_num_exmode(_l) ((_l)->l_lock_exmode.ls_gets) 3134 # define lock_num_prmode_failed(_l) ((_l)->l_lock_prmode.ls_fail) 3135 # define lock_num_exmode_failed(_l) ((_l)->l_lock_exmode.ls_fail) 3136 # define lock_total_prmode(_l) ((_l)->l_lock_prmode.ls_total) 3137 # define lock_total_exmode(_l) ((_l)->l_lock_exmode.ls_total) 3138 # define lock_max_prmode(_l) ((_l)->l_lock_prmode.ls_max) 3139 # define lock_max_exmode(_l) ((_l)->l_lock_exmode.ls_max) 3140 # define lock_refresh(_l) ((_l)->l_lock_refresh) 3141 #else 3142 # define lock_num_prmode(_l) (0) 3143 # define lock_num_exmode(_l) (0) 3144 # define lock_num_prmode_failed(_l) (0) 3145 # define lock_num_exmode_failed(_l) (0) 3146 # define lock_total_prmode(_l) (0ULL) 3147 # define lock_total_exmode(_l) (0ULL) 3148 # define lock_max_prmode(_l) (0) 3149 # define lock_max_exmode(_l) (0) 3150 # define lock_refresh(_l) (0) 3151 #endif 3152 /* The following seq_print was added in version 2 of this output */ 3153 seq_printf(m, "%u\t" 3154 "%u\t" 3155 "%u\t" 3156 "%u\t" 3157 "%llu\t" 3158 "%llu\t" 3159 "%u\t" 3160 "%u\t" 3161 "%u\t", 3162 lock_num_prmode(lockres), 3163 lock_num_exmode(lockres), 3164 lock_num_prmode_failed(lockres), 3165 lock_num_exmode_failed(lockres), 3166 lock_total_prmode(lockres), 3167 lock_total_exmode(lockres), 3168 lock_max_prmode(lockres), 3169 lock_max_exmode(lockres), 3170 lock_refresh(lockres)); 3171 3172 /* End the line */ 3173 seq_printf(m, "\n"); 3174 return 0; 3175 } 3176 3177 static const struct seq_operations ocfs2_dlm_seq_ops = { 3178 .start = ocfs2_dlm_seq_start, 3179 .stop = ocfs2_dlm_seq_stop, 3180 .next = ocfs2_dlm_seq_next, 3181 .show = ocfs2_dlm_seq_show, 3182 }; 3183 3184 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 3185 { 3186 struct seq_file *seq = file->private_data; 3187 struct ocfs2_dlm_seq_priv *priv = seq->private; 3188 struct ocfs2_lock_res *res = &priv->p_iter_res; 3189 3190 ocfs2_remove_lockres_tracking(res); 3191 ocfs2_put_dlm_debug(priv->p_dlm_debug); 3192 return seq_release_private(inode, file); 3193 } 3194 3195 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 3196 { 3197 struct ocfs2_dlm_seq_priv *priv; 3198 struct ocfs2_super *osb; 3199 3200 priv = __seq_open_private(file, &ocfs2_dlm_seq_ops, sizeof(*priv)); 3201 if (!priv) { 3202 mlog_errno(-ENOMEM); 3203 return -ENOMEM; 3204 } 3205 3206 osb = inode->i_private; 3207 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 3208 priv->p_dlm_debug = osb->osb_dlm_debug; 3209 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 3210 3211 ocfs2_add_lockres_tracking(&priv->p_iter_res, 3212 priv->p_dlm_debug); 3213 3214 return 0; 3215 } 3216 3217 static const struct file_operations ocfs2_dlm_debug_fops = { 3218 .open = ocfs2_dlm_debug_open, 3219 .release = ocfs2_dlm_debug_release, 3220 .read = seq_read, 3221 .llseek = seq_lseek, 3222 }; 3223 3224 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 3225 { 3226 int ret = 0; 3227 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 3228 3229 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 3230 S_IFREG|S_IRUSR, 3231 osb->osb_debug_root, 3232 osb, 3233 &ocfs2_dlm_debug_fops); 3234 if (!dlm_debug->d_locking_state) { 3235 ret = -EINVAL; 3236 mlog(ML_ERROR, 3237 "Unable to create locking state debugfs file.\n"); 3238 goto out; 3239 } 3240 3241 ocfs2_get_dlm_debug(dlm_debug); 3242 out: 3243 return ret; 3244 } 3245 3246 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 3247 { 3248 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 3249 3250 if (dlm_debug) { 3251 debugfs_remove(dlm_debug->d_locking_state); 3252 ocfs2_put_dlm_debug(dlm_debug); 3253 } 3254 } 3255 3256 int ocfs2_dlm_init(struct ocfs2_super *osb) 3257 { 3258 int status = 0; 3259 struct ocfs2_cluster_connection *conn = NULL; 3260 3261 if (ocfs2_mount_local(osb)) { 3262 osb->node_num = 0; 3263 goto local; 3264 } 3265 3266 status = ocfs2_dlm_init_debug(osb); 3267 if (status < 0) { 3268 mlog_errno(status); 3269 goto bail; 3270 } 3271 3272 /* launch downconvert thread */ 3273 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc-%s", 3274 osb->uuid_str); 3275 if (IS_ERR(osb->dc_task)) { 3276 status = PTR_ERR(osb->dc_task); 3277 osb->dc_task = NULL; 3278 mlog_errno(status); 3279 goto bail; 3280 } 3281 3282 /* for now, uuid == domain */ 3283 status = ocfs2_cluster_connect(osb->osb_cluster_stack, 3284 osb->osb_cluster_name, 3285 strlen(osb->osb_cluster_name), 3286 osb->uuid_str, 3287 strlen(osb->uuid_str), 3288 &lproto, ocfs2_do_node_down, osb, 3289 &conn); 3290 if (status) { 3291 mlog_errno(status); 3292 goto bail; 3293 } 3294 3295 status = ocfs2_cluster_this_node(conn, &osb->node_num); 3296 if (status < 0) { 3297 mlog_errno(status); 3298 mlog(ML_ERROR, 3299 "could not find this host's node number\n"); 3300 ocfs2_cluster_disconnect(conn, 0); 3301 goto bail; 3302 } 3303 3304 local: 3305 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 3306 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 3307 ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb); 3308 ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb); 3309 3310 osb->cconn = conn; 3311 bail: 3312 if (status < 0) { 3313 ocfs2_dlm_shutdown_debug(osb); 3314 if (osb->dc_task) 3315 kthread_stop(osb->dc_task); 3316 } 3317 3318 return status; 3319 } 3320 3321 void ocfs2_dlm_shutdown(struct ocfs2_super *osb, 3322 int hangup_pending) 3323 { 3324 ocfs2_drop_osb_locks(osb); 3325 3326 /* 3327 * Now that we have dropped all locks and ocfs2_dismount_volume() 3328 * has disabled recovery, the DLM won't be talking to us. It's 3329 * safe to tear things down before disconnecting the cluster. 3330 */ 3331 3332 if (osb->dc_task) { 3333 kthread_stop(osb->dc_task); 3334 osb->dc_task = NULL; 3335 } 3336 3337 ocfs2_lock_res_free(&osb->osb_super_lockres); 3338 ocfs2_lock_res_free(&osb->osb_rename_lockres); 3339 ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres); 3340 ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres); 3341 3342 ocfs2_cluster_disconnect(osb->cconn, hangup_pending); 3343 osb->cconn = NULL; 3344 3345 ocfs2_dlm_shutdown_debug(osb); 3346 } 3347 3348 static int ocfs2_drop_lock(struct ocfs2_super *osb, 3349 struct ocfs2_lock_res *lockres) 3350 { 3351 int ret; 3352 unsigned long flags; 3353 u32 lkm_flags = 0; 3354 3355 /* We didn't get anywhere near actually using this lockres. */ 3356 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 3357 goto out; 3358 3359 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 3360 lkm_flags |= DLM_LKF_VALBLK; 3361 3362 spin_lock_irqsave(&lockres->l_lock, flags); 3363 3364 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 3365 "lockres %s, flags 0x%lx\n", 3366 lockres->l_name, lockres->l_flags); 3367 3368 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 3369 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 3370 "%u, unlock_action = %u\n", 3371 lockres->l_name, lockres->l_flags, lockres->l_action, 3372 lockres->l_unlock_action); 3373 3374 spin_unlock_irqrestore(&lockres->l_lock, flags); 3375 3376 /* XXX: Today we just wait on any busy 3377 * locks... Perhaps we need to cancel converts in the 3378 * future? */ 3379 ocfs2_wait_on_busy_lock(lockres); 3380 3381 spin_lock_irqsave(&lockres->l_lock, flags); 3382 } 3383 3384 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3385 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 3386 lockres->l_level == DLM_LOCK_EX && 3387 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3388 lockres->l_ops->set_lvb(lockres); 3389 } 3390 3391 if (lockres->l_flags & OCFS2_LOCK_BUSY) 3392 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 3393 lockres->l_name); 3394 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 3395 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 3396 3397 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 3398 spin_unlock_irqrestore(&lockres->l_lock, flags); 3399 goto out; 3400 } 3401 3402 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 3403 3404 /* make sure we never get here while waiting for an ast to 3405 * fire. */ 3406 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 3407 3408 /* is this necessary? */ 3409 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3410 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 3411 spin_unlock_irqrestore(&lockres->l_lock, flags); 3412 3413 mlog(0, "lock %s\n", lockres->l_name); 3414 3415 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags); 3416 if (ret) { 3417 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3418 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 3419 ocfs2_dlm_dump_lksb(&lockres->l_lksb); 3420 BUG(); 3421 } 3422 mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n", 3423 lockres->l_name); 3424 3425 ocfs2_wait_on_busy_lock(lockres); 3426 out: 3427 return 0; 3428 } 3429 3430 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3431 struct ocfs2_lock_res *lockres); 3432 3433 /* Mark the lockres as being dropped. It will no longer be 3434 * queued if blocking, but we still may have to wait on it 3435 * being dequeued from the downconvert thread before we can consider 3436 * it safe to drop. 3437 * 3438 * You can *not* attempt to call cluster_lock on this lockres anymore. */ 3439 void ocfs2_mark_lockres_freeing(struct ocfs2_super *osb, 3440 struct ocfs2_lock_res *lockres) 3441 { 3442 int status; 3443 struct ocfs2_mask_waiter mw; 3444 unsigned long flags, flags2; 3445 3446 ocfs2_init_mask_waiter(&mw); 3447 3448 spin_lock_irqsave(&lockres->l_lock, flags); 3449 lockres->l_flags |= OCFS2_LOCK_FREEING; 3450 if (lockres->l_flags & OCFS2_LOCK_QUEUED && current == osb->dc_task) { 3451 /* 3452 * We know the downconvert is queued but not in progress 3453 * because we are the downconvert thread and processing 3454 * different lock. So we can just remove the lock from the 3455 * queue. This is not only an optimization but also a way 3456 * to avoid the following deadlock: 3457 * ocfs2_dentry_post_unlock() 3458 * ocfs2_dentry_lock_put() 3459 * ocfs2_drop_dentry_lock() 3460 * iput() 3461 * ocfs2_evict_inode() 3462 * ocfs2_clear_inode() 3463 * ocfs2_mark_lockres_freeing() 3464 * ... blocks waiting for OCFS2_LOCK_QUEUED 3465 * since we are the downconvert thread which 3466 * should clear the flag. 3467 */ 3468 spin_unlock_irqrestore(&lockres->l_lock, flags); 3469 spin_lock_irqsave(&osb->dc_task_lock, flags2); 3470 list_del_init(&lockres->l_blocked_list); 3471 osb->blocked_lock_count--; 3472 spin_unlock_irqrestore(&osb->dc_task_lock, flags2); 3473 /* 3474 * Warn if we recurse into another post_unlock call. Strictly 3475 * speaking it isn't a problem but we need to be careful if 3476 * that happens (stack overflow, deadlocks, ...) so warn if 3477 * ocfs2 grows a path for which this can happen. 3478 */ 3479 WARN_ON_ONCE(lockres->l_ops->post_unlock); 3480 /* Since the lock is freeing we don't do much in the fn below */ 3481 ocfs2_process_blocked_lock(osb, lockres); 3482 return; 3483 } 3484 while (lockres->l_flags & OCFS2_LOCK_QUEUED) { 3485 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); 3486 spin_unlock_irqrestore(&lockres->l_lock, flags); 3487 3488 mlog(0, "Waiting on lockres %s\n", lockres->l_name); 3489 3490 status = ocfs2_wait_for_mask(&mw); 3491 if (status) 3492 mlog_errno(status); 3493 3494 spin_lock_irqsave(&lockres->l_lock, flags); 3495 } 3496 spin_unlock_irqrestore(&lockres->l_lock, flags); 3497 } 3498 3499 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, 3500 struct ocfs2_lock_res *lockres) 3501 { 3502 int ret; 3503 3504 ocfs2_mark_lockres_freeing(osb, lockres); 3505 ret = ocfs2_drop_lock(osb, lockres); 3506 if (ret) 3507 mlog_errno(ret); 3508 } 3509 3510 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 3511 { 3512 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 3513 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 3514 ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres); 3515 ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres); 3516 } 3517 3518 int ocfs2_drop_inode_locks(struct inode *inode) 3519 { 3520 int status, err; 3521 3522 /* No need to call ocfs2_mark_lockres_freeing here - 3523 * ocfs2_clear_inode has done it for us. */ 3524 3525 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3526 &OCFS2_I(inode)->ip_open_lockres); 3527 if (err < 0) 3528 mlog_errno(err); 3529 3530 status = err; 3531 3532 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3533 &OCFS2_I(inode)->ip_inode_lockres); 3534 if (err < 0) 3535 mlog_errno(err); 3536 if (err < 0 && !status) 3537 status = err; 3538 3539 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3540 &OCFS2_I(inode)->ip_rw_lockres); 3541 if (err < 0) 3542 mlog_errno(err); 3543 if (err < 0 && !status) 3544 status = err; 3545 3546 return status; 3547 } 3548 3549 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 3550 int new_level) 3551 { 3552 assert_spin_locked(&lockres->l_lock); 3553 3554 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 3555 3556 if (lockres->l_level <= new_level) { 3557 mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, " 3558 "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, " 3559 "block %d, pgen %d\n", lockres->l_name, lockres->l_level, 3560 new_level, list_empty(&lockres->l_blocked_list), 3561 list_empty(&lockres->l_mask_waiters), lockres->l_type, 3562 lockres->l_flags, lockres->l_ro_holders, 3563 lockres->l_ex_holders, lockres->l_action, 3564 lockres->l_unlock_action, lockres->l_requested, 3565 lockres->l_blocking, lockres->l_pending_gen); 3566 BUG(); 3567 } 3568 3569 mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n", 3570 lockres->l_name, lockres->l_level, new_level, lockres->l_blocking); 3571 3572 lockres->l_action = OCFS2_AST_DOWNCONVERT; 3573 lockres->l_requested = new_level; 3574 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3575 return lockres_set_pending(lockres); 3576 } 3577 3578 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 3579 struct ocfs2_lock_res *lockres, 3580 int new_level, 3581 int lvb, 3582 unsigned int generation) 3583 { 3584 int ret; 3585 u32 dlm_flags = DLM_LKF_CONVERT; 3586 3587 mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name, 3588 lockres->l_level, new_level); 3589 3590 /* 3591 * On DLM_LKF_VALBLK, fsdlm behaves differently with o2cb. It always 3592 * expects DLM_LKF_VALBLK being set if the LKB has LVB, so that 3593 * we can recover correctly from node failure. Otherwise, we may get 3594 * invalid LVB in LKB, but without DLM_SBF_VALNOTVALID being set. 3595 */ 3596 if (!ocfs2_is_o2cb_active() && 3597 lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 3598 lvb = 1; 3599 3600 if (lvb) 3601 dlm_flags |= DLM_LKF_VALBLK; 3602 3603 ret = ocfs2_dlm_lock(osb->cconn, 3604 new_level, 3605 &lockres->l_lksb, 3606 dlm_flags, 3607 lockres->l_name, 3608 OCFS2_LOCK_ID_MAX_LEN - 1); 3609 lockres_clear_pending(lockres, generation, osb); 3610 if (ret) { 3611 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 3612 ocfs2_recover_from_dlm_error(lockres, 1); 3613 goto bail; 3614 } 3615 3616 ret = 0; 3617 bail: 3618 return ret; 3619 } 3620 3621 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */ 3622 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 3623 struct ocfs2_lock_res *lockres) 3624 { 3625 assert_spin_locked(&lockres->l_lock); 3626 3627 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 3628 /* If we're already trying to cancel a lock conversion 3629 * then just drop the spinlock and allow the caller to 3630 * requeue this lock. */ 3631 mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name); 3632 return 0; 3633 } 3634 3635 /* were we in a convert when we got the bast fire? */ 3636 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT && 3637 lockres->l_action != OCFS2_AST_DOWNCONVERT); 3638 /* set things up for the unlockast to know to just 3639 * clear out the ast_action and unset busy, etc. */ 3640 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT; 3641 3642 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY), 3643 "lock %s, invalid flags: 0x%lx\n", 3644 lockres->l_name, lockres->l_flags); 3645 3646 mlog(ML_BASTS, "lockres %s\n", lockres->l_name); 3647 3648 return 1; 3649 } 3650 3651 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 3652 struct ocfs2_lock_res *lockres) 3653 { 3654 int ret; 3655 3656 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, 3657 DLM_LKF_CANCEL); 3658 if (ret) { 3659 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3660 ocfs2_recover_from_dlm_error(lockres, 0); 3661 } 3662 3663 mlog(ML_BASTS, "lockres %s\n", lockres->l_name); 3664 3665 return ret; 3666 } 3667 3668 static int ocfs2_unblock_lock(struct ocfs2_super *osb, 3669 struct ocfs2_lock_res *lockres, 3670 struct ocfs2_unblock_ctl *ctl) 3671 { 3672 unsigned long flags; 3673 int blocking; 3674 int new_level; 3675 int level; 3676 int ret = 0; 3677 int set_lvb = 0; 3678 unsigned int gen; 3679 3680 spin_lock_irqsave(&lockres->l_lock, flags); 3681 3682 recheck: 3683 /* 3684 * Is it still blocking? If not, we have no more work to do. 3685 */ 3686 if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) { 3687 BUG_ON(lockres->l_blocking != DLM_LOCK_NL); 3688 spin_unlock_irqrestore(&lockres->l_lock, flags); 3689 ret = 0; 3690 goto leave; 3691 } 3692 3693 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 3694 /* XXX 3695 * This is a *big* race. The OCFS2_LOCK_PENDING flag 3696 * exists entirely for one reason - another thread has set 3697 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock(). 3698 * 3699 * If we do ocfs2_cancel_convert() before the other thread 3700 * calls dlm_lock(), our cancel will do nothing. We will 3701 * get no ast, and we will have no way of knowing the 3702 * cancel failed. Meanwhile, the other thread will call 3703 * into dlm_lock() and wait...forever. 3704 * 3705 * Why forever? Because another node has asked for the 3706 * lock first; that's why we're here in unblock_lock(). 3707 * 3708 * The solution is OCFS2_LOCK_PENDING. When PENDING is 3709 * set, we just requeue the unblock. Only when the other 3710 * thread has called dlm_lock() and cleared PENDING will 3711 * we then cancel their request. 3712 * 3713 * All callers of dlm_lock() must set OCFS2_DLM_PENDING 3714 * at the same time they set OCFS2_DLM_BUSY. They must 3715 * clear OCFS2_DLM_PENDING after dlm_lock() returns. 3716 */ 3717 if (lockres->l_flags & OCFS2_LOCK_PENDING) { 3718 mlog(ML_BASTS, "lockres %s, ReQ: Pending\n", 3719 lockres->l_name); 3720 goto leave_requeue; 3721 } 3722 3723 ctl->requeue = 1; 3724 ret = ocfs2_prepare_cancel_convert(osb, lockres); 3725 spin_unlock_irqrestore(&lockres->l_lock, flags); 3726 if (ret) { 3727 ret = ocfs2_cancel_convert(osb, lockres); 3728 if (ret < 0) 3729 mlog_errno(ret); 3730 } 3731 goto leave; 3732 } 3733 3734 /* 3735 * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is 3736 * set when the ast is received for an upconvert just before the 3737 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast 3738 * on the heels of the ast, we want to delay the downconvert just 3739 * enough to allow the up requestor to do its task. Because this 3740 * lock is in the blocked queue, the lock will be downconverted 3741 * as soon as the requestor is done with the lock. 3742 */ 3743 if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) 3744 goto leave_requeue; 3745 3746 /* 3747 * How can we block and yet be at NL? We were trying to upconvert 3748 * from NL and got canceled. The code comes back here, and now 3749 * we notice and clear BLOCKING. 3750 */ 3751 if (lockres->l_level == DLM_LOCK_NL) { 3752 BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders); 3753 mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name); 3754 lockres->l_blocking = DLM_LOCK_NL; 3755 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 3756 spin_unlock_irqrestore(&lockres->l_lock, flags); 3757 goto leave; 3758 } 3759 3760 /* if we're blocking an exclusive and we have *any* holders, 3761 * then requeue. */ 3762 if ((lockres->l_blocking == DLM_LOCK_EX) 3763 && (lockres->l_ex_holders || lockres->l_ro_holders)) { 3764 mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n", 3765 lockres->l_name, lockres->l_ex_holders, 3766 lockres->l_ro_holders); 3767 goto leave_requeue; 3768 } 3769 3770 /* If it's a PR we're blocking, then only 3771 * requeue if we've got any EX holders */ 3772 if (lockres->l_blocking == DLM_LOCK_PR && 3773 lockres->l_ex_holders) { 3774 mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n", 3775 lockres->l_name, lockres->l_ex_holders); 3776 goto leave_requeue; 3777 } 3778 3779 /* 3780 * Can we get a lock in this state if the holder counts are 3781 * zero? The meta data unblock code used to check this. 3782 */ 3783 if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 3784 && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) { 3785 mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n", 3786 lockres->l_name); 3787 goto leave_requeue; 3788 } 3789 3790 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 3791 3792 if (lockres->l_ops->check_downconvert 3793 && !lockres->l_ops->check_downconvert(lockres, new_level)) { 3794 mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n", 3795 lockres->l_name); 3796 goto leave_requeue; 3797 } 3798 3799 /* If we get here, then we know that there are no more 3800 * incompatible holders (and anyone asking for an incompatible 3801 * lock is blocked). We can now downconvert the lock */ 3802 if (!lockres->l_ops->downconvert_worker) 3803 goto downconvert; 3804 3805 /* Some lockres types want to do a bit of work before 3806 * downconverting a lock. Allow that here. The worker function 3807 * may sleep, so we save off a copy of what we're blocking as 3808 * it may change while we're not holding the spin lock. */ 3809 blocking = lockres->l_blocking; 3810 level = lockres->l_level; 3811 spin_unlock_irqrestore(&lockres->l_lock, flags); 3812 3813 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); 3814 3815 if (ctl->unblock_action == UNBLOCK_STOP_POST) { 3816 mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n", 3817 lockres->l_name); 3818 goto leave; 3819 } 3820 3821 spin_lock_irqsave(&lockres->l_lock, flags); 3822 if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) { 3823 /* If this changed underneath us, then we can't drop 3824 * it just yet. */ 3825 mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, " 3826 "Recheck\n", lockres->l_name, blocking, 3827 lockres->l_blocking, level, lockres->l_level); 3828 goto recheck; 3829 } 3830 3831 downconvert: 3832 ctl->requeue = 0; 3833 3834 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3835 if (lockres->l_level == DLM_LOCK_EX) 3836 set_lvb = 1; 3837 3838 /* 3839 * We only set the lvb if the lock has been fully 3840 * refreshed - otherwise we risk setting stale 3841 * data. Otherwise, there's no need to actually clear 3842 * out the lvb here as it's value is still valid. 3843 */ 3844 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3845 lockres->l_ops->set_lvb(lockres); 3846 } 3847 3848 gen = ocfs2_prepare_downconvert(lockres, new_level); 3849 spin_unlock_irqrestore(&lockres->l_lock, flags); 3850 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb, 3851 gen); 3852 3853 leave: 3854 if (ret) 3855 mlog_errno(ret); 3856 return ret; 3857 3858 leave_requeue: 3859 spin_unlock_irqrestore(&lockres->l_lock, flags); 3860 ctl->requeue = 1; 3861 3862 return 0; 3863 } 3864 3865 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 3866 int blocking) 3867 { 3868 struct inode *inode; 3869 struct address_space *mapping; 3870 struct ocfs2_inode_info *oi; 3871 3872 inode = ocfs2_lock_res_inode(lockres); 3873 mapping = inode->i_mapping; 3874 3875 if (S_ISDIR(inode->i_mode)) { 3876 oi = OCFS2_I(inode); 3877 oi->ip_dir_lock_gen++; 3878 mlog(0, "generation: %u\n", oi->ip_dir_lock_gen); 3879 goto out; 3880 } 3881 3882 if (!S_ISREG(inode->i_mode)) 3883 goto out; 3884 3885 /* 3886 * We need this before the filemap_fdatawrite() so that it can 3887 * transfer the dirty bit from the PTE to the 3888 * page. Unfortunately this means that even for EX->PR 3889 * downconverts, we'll lose our mappings and have to build 3890 * them up again. 3891 */ 3892 unmap_mapping_range(mapping, 0, 0, 0); 3893 3894 if (filemap_fdatawrite(mapping)) { 3895 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!", 3896 (unsigned long long)OCFS2_I(inode)->ip_blkno); 3897 } 3898 sync_mapping_buffers(mapping); 3899 if (blocking == DLM_LOCK_EX) { 3900 truncate_inode_pages(mapping, 0); 3901 } else { 3902 /* We only need to wait on the I/O if we're not also 3903 * truncating pages because truncate_inode_pages waits 3904 * for us above. We don't truncate pages if we're 3905 * blocking anything < EXMODE because we want to keep 3906 * them around in that case. */ 3907 filemap_fdatawait(mapping); 3908 } 3909 3910 forget_all_cached_acls(inode); 3911 3912 out: 3913 return UNBLOCK_CONTINUE; 3914 } 3915 3916 static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci, 3917 struct ocfs2_lock_res *lockres, 3918 int new_level) 3919 { 3920 int checkpointed = ocfs2_ci_fully_checkpointed(ci); 3921 3922 BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR); 3923 BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed); 3924 3925 if (checkpointed) 3926 return 1; 3927 3928 ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci))); 3929 return 0; 3930 } 3931 3932 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 3933 int new_level) 3934 { 3935 struct inode *inode = ocfs2_lock_res_inode(lockres); 3936 3937 return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level); 3938 } 3939 3940 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres) 3941 { 3942 struct inode *inode = ocfs2_lock_res_inode(lockres); 3943 3944 __ocfs2_stuff_meta_lvb(inode); 3945 } 3946 3947 /* 3948 * Does the final reference drop on our dentry lock. Right now this 3949 * happens in the downconvert thread, but we could choose to simplify the 3950 * dlmglue API and push these off to the ocfs2_wq in the future. 3951 */ 3952 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 3953 struct ocfs2_lock_res *lockres) 3954 { 3955 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3956 ocfs2_dentry_lock_put(osb, dl); 3957 } 3958 3959 /* 3960 * d_delete() matching dentries before the lock downconvert. 3961 * 3962 * At this point, any process waiting to destroy the 3963 * dentry_lock due to last ref count is stopped by the 3964 * OCFS2_LOCK_QUEUED flag. 3965 * 3966 * We have two potential problems 3967 * 3968 * 1) If we do the last reference drop on our dentry_lock (via dput) 3969 * we'll wind up in ocfs2_release_dentry_lock(), waiting on 3970 * the downconvert to finish. Instead we take an elevated 3971 * reference and push the drop until after we've completed our 3972 * unblock processing. 3973 * 3974 * 2) There might be another process with a final reference, 3975 * waiting on us to finish processing. If this is the case, we 3976 * detect it and exit out - there's no more dentries anyway. 3977 */ 3978 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 3979 int blocking) 3980 { 3981 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3982 struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode); 3983 struct dentry *dentry; 3984 unsigned long flags; 3985 int extra_ref = 0; 3986 3987 /* 3988 * This node is blocking another node from getting a read 3989 * lock. This happens when we've renamed within a 3990 * directory. We've forced the other nodes to d_delete(), but 3991 * we never actually dropped our lock because it's still 3992 * valid. The downconvert code will retain a PR for this node, 3993 * so there's no further work to do. 3994 */ 3995 if (blocking == DLM_LOCK_PR) 3996 return UNBLOCK_CONTINUE; 3997 3998 /* 3999 * Mark this inode as potentially orphaned. The code in 4000 * ocfs2_delete_inode() will figure out whether it actually 4001 * needs to be freed or not. 4002 */ 4003 spin_lock(&oi->ip_lock); 4004 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; 4005 spin_unlock(&oi->ip_lock); 4006 4007 /* 4008 * Yuck. We need to make sure however that the check of 4009 * OCFS2_LOCK_FREEING and the extra reference are atomic with 4010 * respect to a reference decrement or the setting of that 4011 * flag. 4012 */ 4013 spin_lock_irqsave(&lockres->l_lock, flags); 4014 spin_lock(&dentry_attach_lock); 4015 if (!(lockres->l_flags & OCFS2_LOCK_FREEING) 4016 && dl->dl_count) { 4017 dl->dl_count++; 4018 extra_ref = 1; 4019 } 4020 spin_unlock(&dentry_attach_lock); 4021 spin_unlock_irqrestore(&lockres->l_lock, flags); 4022 4023 mlog(0, "extra_ref = %d\n", extra_ref); 4024 4025 /* 4026 * We have a process waiting on us in ocfs2_dentry_iput(), 4027 * which means we can't have any more outstanding 4028 * aliases. There's no need to do any more work. 4029 */ 4030 if (!extra_ref) 4031 return UNBLOCK_CONTINUE; 4032 4033 spin_lock(&dentry_attach_lock); 4034 while (1) { 4035 dentry = ocfs2_find_local_alias(dl->dl_inode, 4036 dl->dl_parent_blkno, 1); 4037 if (!dentry) 4038 break; 4039 spin_unlock(&dentry_attach_lock); 4040 4041 if (S_ISDIR(dl->dl_inode->i_mode)) 4042 shrink_dcache_parent(dentry); 4043 4044 mlog(0, "d_delete(%pd);\n", dentry); 4045 4046 /* 4047 * The following dcache calls may do an 4048 * iput(). Normally we don't want that from the 4049 * downconverting thread, but in this case it's ok 4050 * because the requesting node already has an 4051 * exclusive lock on the inode, so it can't be queued 4052 * for a downconvert. 4053 */ 4054 d_delete(dentry); 4055 dput(dentry); 4056 4057 spin_lock(&dentry_attach_lock); 4058 } 4059 spin_unlock(&dentry_attach_lock); 4060 4061 /* 4062 * If we are the last holder of this dentry lock, there is no 4063 * reason to downconvert so skip straight to the unlock. 4064 */ 4065 if (dl->dl_count == 1) 4066 return UNBLOCK_STOP_POST; 4067 4068 return UNBLOCK_CONTINUE_POST; 4069 } 4070 4071 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres, 4072 int new_level) 4073 { 4074 struct ocfs2_refcount_tree *tree = 4075 ocfs2_lock_res_refcount_tree(lockres); 4076 4077 return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level); 4078 } 4079 4080 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres, 4081 int blocking) 4082 { 4083 struct ocfs2_refcount_tree *tree = 4084 ocfs2_lock_res_refcount_tree(lockres); 4085 4086 ocfs2_metadata_cache_purge(&tree->rf_ci); 4087 4088 return UNBLOCK_CONTINUE; 4089 } 4090 4091 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres) 4092 { 4093 struct ocfs2_qinfo_lvb *lvb; 4094 struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres); 4095 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 4096 oinfo->dqi_gi.dqi_type); 4097 4098 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 4099 lvb->lvb_version = OCFS2_QINFO_LVB_VERSION; 4100 lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace); 4101 lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace); 4102 lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms); 4103 lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks); 4104 lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk); 4105 lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry); 4106 } 4107 4108 void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex) 4109 { 4110 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 4111 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 4112 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 4113 4114 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 4115 ocfs2_cluster_unlock(osb, lockres, level); 4116 } 4117 4118 static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo) 4119 { 4120 struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, 4121 oinfo->dqi_gi.dqi_type); 4122 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 4123 struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 4124 struct buffer_head *bh = NULL; 4125 struct ocfs2_global_disk_dqinfo *gdinfo; 4126 int status = 0; 4127 4128 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 4129 lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) { 4130 info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace); 4131 info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace); 4132 oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms); 4133 oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks); 4134 oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk); 4135 oinfo->dqi_gi.dqi_free_entry = 4136 be32_to_cpu(lvb->lvb_free_entry); 4137 } else { 4138 status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode, 4139 oinfo->dqi_giblk, &bh); 4140 if (status) { 4141 mlog_errno(status); 4142 goto bail; 4143 } 4144 gdinfo = (struct ocfs2_global_disk_dqinfo *) 4145 (bh->b_data + OCFS2_GLOBAL_INFO_OFF); 4146 info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace); 4147 info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace); 4148 oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms); 4149 oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks); 4150 oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk); 4151 oinfo->dqi_gi.dqi_free_entry = 4152 le32_to_cpu(gdinfo->dqi_free_entry); 4153 brelse(bh); 4154 ocfs2_track_lock_refresh(lockres); 4155 } 4156 4157 bail: 4158 return status; 4159 } 4160 4161 /* Lock quota info, this function expects at least shared lock on the quota file 4162 * so that we can safely refresh quota info from disk. */ 4163 int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex) 4164 { 4165 struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock; 4166 struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); 4167 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 4168 int status = 0; 4169 4170 /* On RO devices, locking really isn't needed... */ 4171 if (ocfs2_is_hard_readonly(osb)) { 4172 if (ex) 4173 status = -EROFS; 4174 goto bail; 4175 } 4176 if (ocfs2_mount_local(osb)) 4177 goto bail; 4178 4179 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 4180 if (status < 0) { 4181 mlog_errno(status); 4182 goto bail; 4183 } 4184 if (!ocfs2_should_refresh_lock_res(lockres)) 4185 goto bail; 4186 /* OK, we have the lock but we need to refresh the quota info */ 4187 status = ocfs2_refresh_qinfo(oinfo); 4188 if (status) 4189 ocfs2_qinfo_unlock(oinfo, ex); 4190 ocfs2_complete_lock_res_refresh(lockres, status); 4191 bail: 4192 return status; 4193 } 4194 4195 int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex) 4196 { 4197 int status; 4198 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 4199 struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres; 4200 struct ocfs2_super *osb = lockres->l_priv; 4201 4202 4203 if (ocfs2_is_hard_readonly(osb)) 4204 return -EROFS; 4205 4206 if (ocfs2_mount_local(osb)) 4207 return 0; 4208 4209 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 4210 if (status < 0) 4211 mlog_errno(status); 4212 4213 return status; 4214 } 4215 4216 void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex) 4217 { 4218 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 4219 struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres; 4220 struct ocfs2_super *osb = lockres->l_priv; 4221 4222 if (!ocfs2_mount_local(osb)) 4223 ocfs2_cluster_unlock(osb, lockres, level); 4224 } 4225 4226 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 4227 struct ocfs2_lock_res *lockres) 4228 { 4229 int status; 4230 struct ocfs2_unblock_ctl ctl = {0, 0,}; 4231 unsigned long flags; 4232 4233 /* Our reference to the lockres in this function can be 4234 * considered valid until we remove the OCFS2_LOCK_QUEUED 4235 * flag. */ 4236 4237 BUG_ON(!lockres); 4238 BUG_ON(!lockres->l_ops); 4239 4240 mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name); 4241 4242 /* Detect whether a lock has been marked as going away while 4243 * the downconvert thread was processing other things. A lock can 4244 * still be marked with OCFS2_LOCK_FREEING after this check, 4245 * but short circuiting here will still save us some 4246 * performance. */ 4247 spin_lock_irqsave(&lockres->l_lock, flags); 4248 if (lockres->l_flags & OCFS2_LOCK_FREEING) 4249 goto unqueue; 4250 spin_unlock_irqrestore(&lockres->l_lock, flags); 4251 4252 status = ocfs2_unblock_lock(osb, lockres, &ctl); 4253 if (status < 0) 4254 mlog_errno(status); 4255 4256 spin_lock_irqsave(&lockres->l_lock, flags); 4257 unqueue: 4258 if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) { 4259 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED); 4260 } else 4261 ocfs2_schedule_blocked_lock(osb, lockres); 4262 4263 mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name, 4264 ctl.requeue ? "yes" : "no"); 4265 spin_unlock_irqrestore(&lockres->l_lock, flags); 4266 4267 if (ctl.unblock_action != UNBLOCK_CONTINUE 4268 && lockres->l_ops->post_unlock) 4269 lockres->l_ops->post_unlock(osb, lockres); 4270 } 4271 4272 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 4273 struct ocfs2_lock_res *lockres) 4274 { 4275 unsigned long flags; 4276 4277 assert_spin_locked(&lockres->l_lock); 4278 4279 if (lockres->l_flags & OCFS2_LOCK_FREEING) { 4280 /* Do not schedule a lock for downconvert when it's on 4281 * the way to destruction - any nodes wanting access 4282 * to the resource will get it soon. */ 4283 mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n", 4284 lockres->l_name, lockres->l_flags); 4285 return; 4286 } 4287 4288 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); 4289 4290 spin_lock_irqsave(&osb->dc_task_lock, flags); 4291 if (list_empty(&lockres->l_blocked_list)) { 4292 list_add_tail(&lockres->l_blocked_list, 4293 &osb->blocked_lock_list); 4294 osb->blocked_lock_count++; 4295 } 4296 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4297 } 4298 4299 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb) 4300 { 4301 unsigned long processed; 4302 unsigned long flags; 4303 struct ocfs2_lock_res *lockres; 4304 4305 spin_lock_irqsave(&osb->dc_task_lock, flags); 4306 /* grab this early so we know to try again if a state change and 4307 * wake happens part-way through our work */ 4308 osb->dc_work_sequence = osb->dc_wake_sequence; 4309 4310 processed = osb->blocked_lock_count; 4311 /* 4312 * blocked lock processing in this loop might call iput which can 4313 * remove items off osb->blocked_lock_list. Downconvert up to 4314 * 'processed' number of locks, but stop short if we had some 4315 * removed in ocfs2_mark_lockres_freeing when downconverting. 4316 */ 4317 while (processed && !list_empty(&osb->blocked_lock_list)) { 4318 lockres = list_entry(osb->blocked_lock_list.next, 4319 struct ocfs2_lock_res, l_blocked_list); 4320 list_del_init(&lockres->l_blocked_list); 4321 osb->blocked_lock_count--; 4322 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4323 4324 BUG_ON(!processed); 4325 processed--; 4326 4327 ocfs2_process_blocked_lock(osb, lockres); 4328 4329 spin_lock_irqsave(&osb->dc_task_lock, flags); 4330 } 4331 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4332 } 4333 4334 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb) 4335 { 4336 int empty = 0; 4337 unsigned long flags; 4338 4339 spin_lock_irqsave(&osb->dc_task_lock, flags); 4340 if (list_empty(&osb->blocked_lock_list)) 4341 empty = 1; 4342 4343 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4344 return empty; 4345 } 4346 4347 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb) 4348 { 4349 int should_wake = 0; 4350 unsigned long flags; 4351 4352 spin_lock_irqsave(&osb->dc_task_lock, flags); 4353 if (osb->dc_work_sequence != osb->dc_wake_sequence) 4354 should_wake = 1; 4355 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4356 4357 return should_wake; 4358 } 4359 4360 static int ocfs2_downconvert_thread(void *arg) 4361 { 4362 int status = 0; 4363 struct ocfs2_super *osb = arg; 4364 4365 /* only quit once we've been asked to stop and there is no more 4366 * work available */ 4367 while (!(kthread_should_stop() && 4368 ocfs2_downconvert_thread_lists_empty(osb))) { 4369 4370 wait_event_interruptible(osb->dc_event, 4371 ocfs2_downconvert_thread_should_wake(osb) || 4372 kthread_should_stop()); 4373 4374 mlog(0, "downconvert_thread: awoken\n"); 4375 4376 ocfs2_downconvert_thread_do_work(osb); 4377 } 4378 4379 osb->dc_task = NULL; 4380 return status; 4381 } 4382 4383 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb) 4384 { 4385 unsigned long flags; 4386 4387 spin_lock_irqsave(&osb->dc_task_lock, flags); 4388 /* make sure the voting thread gets a swipe at whatever changes 4389 * the caller may have made to the voting state */ 4390 osb->dc_wake_sequence++; 4391 spin_unlock_irqrestore(&osb->dc_task_lock, flags); 4392 wake_up(&osb->dc_event); 4393 } 4394