/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmglue.c
 *
 * Code which implements an OCFS2 specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/time.h>
#include <linux/quotaops.h>
#include <linux/sched/signal.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"
#include "ocfs2_lockingver.h"

#include "alloc.h"
#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "stackglue.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"
#include "quota.h"
#include "refcounttree.h"
#include "acl.h"

#include "buffer_head_io.h"

struct ocfs2_mask_waiter {
	struct list_head	mw_item;
	int			mw_status;
	struct completion	mw_complete;
	unsigned long		mw_mask;
	unsigned long		mw_goal;
#ifdef CONFIG_OCFS2_FS_STATS
	ktime_t			mw_lock_start;
#endif
};

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);

/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock()
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback */
};

struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};

/* Lockdep class keys */
struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES];

static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking);

static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);

static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
					    int new_level);
static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
					 int blocking);

#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)

/* This aids in debugging situations where a bad LVB might be involved. */
static void ocfs2_dump_meta_lvb_info(u64 level,
				     const char *function,
				     unsigned int line,
				     struct ocfs2_lock_res *lockres)
{
	struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);

	mlog(level, "LVB information for %s (called from %s:%u):\n",
	     lockres->l_name, function, line);
	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
	     be32_to_cpu(lvb->lvb_igeneration));
	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
	     be16_to_cpu(lvb->lvb_imode));
	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
	     "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
	     be32_to_cpu(lvb->lvb_iattr));
}


/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 *
 * The most basic of lock types can point ->l_priv to their respective
 * struct ocfs2_super and allow the default actions to manage things.
 *
 * Right now, each lock type also needs to implement an init function,
 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
 * should be called when the lock is no longer needed (i.e., object
 * destruction time).
 */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
	 * this callback if ->l_priv is not an ocfs2_super pointer
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

	/*
	 * Optionally called in the downconvert thread after a
	 * successful downconvert. The lockres will not be referenced
	 * after this callback is called, so it is safe to free
	 * memory, etc.
	 *
	 * The exact semantics of when this is called are controlled
	 * by ->downconvert_worker()
	 */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Allow a lock type to add checks to determine whether it is
	 * safe to downconvert a lock. Return 0 to re-queue the
	 * downconvert at a later time, nonzero to continue.
	 *
	 * For most locks, the default checks that there are no
	 * incompatible holders are sufficient.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * Allows a lock type to populate the lock value block. This
	 * is called on downconvert, and when we drop a lock.
	 *
	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
	 * in the flags field.
	 *
	 * Called with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *);

	/*
	 * Called from the downconvert thread when it is determined
	 * that a lock will be downconverted. This is called without
	 * any locks held so the function can do work that might
	 * schedule (syncing out data, etc).
	 *
	 * This should return any one of the ocfs2_unblock_action
	 * values, depending on what it wants the thread to do.
	 */
	int (*downconvert_worker)(struct ocfs2_lock_res *, int);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow.
	 */
	int flags;
};

/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB 0x2

static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.check_downconvert = ocfs2_check_meta_downconvert,
	.set_lvb	= ocfs2_set_meta_lvb,
	.downconvert_worker = ocfs2_data_convert_worker,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_trim_fs_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.get_osb	= ocfs2_get_dentry_osb,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.downconvert_worker = ocfs2_dentry_convert_worker,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
	.get_osb	= ocfs2_get_file_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = {
	.set_lvb	= ocfs2_set_qinfo_lvb,
	.get_osb	= ocfs2_get_qinfo_osb,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = {
	.check_downconvert = ocfs2_check_refcount_downconvert,
	.downconvert_worker = ocfs2_refcount_convert_worker,
	.flags		= 0,
};

static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{
	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
		lockres->l_type == OCFS2_LOCK_TYPE_RW ||
		lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
}

static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
	return container_of(lksb, struct ocfs2_lock_res, l_lksb);
}

static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!ocfs2_is_inode_lock(lockres));

	return (struct inode *) lockres->l_priv;
}

static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);

	return (struct ocfs2_dentry_lock *)lockres->l_priv;
}

static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO);

	return (struct ocfs2_mem_dqinfo *)lockres->l_priv;
}

static inline struct ocfs2_refcount_tree *
ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res)
{
	return container_of(res, struct ocfs2_refcount_tree, rf_lockres);
}

static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
{
	if (lockres->l_ops->get_osb)
		return lockres->l_ops->get_osb(lockres);

	return (struct ocfs2_super *)lockres->l_priv;
}

static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     u32 dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted);
static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
				   struct ocfs2_lock_res *lockres,
				   int level, unsigned long caller_ip);
static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres,
					int level)
{
	__ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert);
#define ocfs2_log_dlm_error(_func, _err, _lockres) do {			\
	if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY)		\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \
		     _err, _func, _lockres->l_name);			\
	else								\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n", \
		     _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name, \
		     (unsigned int)ocfs2_get_dentry_lock_ino(_lockres)); \
} while (0)
static int ocfs2_downconvert_thread(void *arg);
static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static int ocfs2_inode_lock_update(struct inode *inode,
				   struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
					      int new_level);
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level,
				  int lvb,
				  unsigned int generation);
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres);


static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
				  u64 blkno,
				  u32 generation,
				  char *name)
{
	int len;

	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);

	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
		       (long long)blkno, generation);

	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));

	mlog(0, "built lock resource with name: %s\n", name);
}
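
/*
 * Illustrative sketch (not part of the original source): with the
 * "%c%s%016llx%08x" format above, a metadata lock on block 5 with
 * generation 0x1a2b3c4d comes out roughly as
 *
 *	M<pad>00000000000000051a2b3c4d
 *
 * i.e. one lock-type character, the OCFS2_LOCK_ID_PAD string, sixteen
 * hex digits of block number and eight hex digits of generation,
 * always filling exactly OCFS2_LOCK_ID_MAX_LEN - 1 characters.
 */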

static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);

static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
				       struct ocfs2_dlm_debug *dlm_debug)
{
	mlog(0, "Add tracking for lockres %s\n", res->l_name);

	spin_lock(&ocfs2_dlm_tracking_lock);
	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
	spin_lock(&ocfs2_dlm_tracking_lock);
	if (!list_empty(&res->l_debug_list))
		list_del_init(&res->l_debug_list);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

#ifdef CONFIG_OCFS2_FS_STATS
static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
	res->l_lock_refresh = 0;
	memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats));
	memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats));
}

static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
				    struct ocfs2_mask_waiter *mw, int ret)
{
	u32 usec;
	ktime_t kt;
	struct ocfs2_lock_stats *stats;

	if (level == LKM_PRMODE)
		stats = &res->l_lock_prmode;
	else if (level == LKM_EXMODE)
		stats = &res->l_lock_exmode;
	else
		return;

	kt = ktime_sub(ktime_get(), mw->mw_lock_start);
	usec = ktime_to_us(kt);

	stats->ls_gets++;
	stats->ls_total += ktime_to_ns(kt);
	/* overflow */
	if (unlikely(stats->ls_gets == 0)) {
		stats->ls_gets++;
		stats->ls_total = ktime_to_ns(kt);
	}

	if (stats->ls_max < usec)
		stats->ls_max = usec;

	if (ret)
		stats->ls_fail++;
}

static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
	lockres->l_lock_refresh++;
}

static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
	mw->mw_lock_start = ktime_get();
}
#else
static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
}
static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
					   int level, struct ocfs2_mask_waiter *mw, int ret)
{
}
static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
}
static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
}
#endif

static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *res,
				       enum ocfs2_lock_type type,
				       struct ocfs2_lock_res_ops *ops,
				       void *priv)
{
	res->l_type = type;
	res->l_ops = ops;
	res->l_priv = priv;

	res->l_level = DLM_LOCK_IV;
	res->l_requested = DLM_LOCK_IV;
	res->l_blocking = DLM_LOCK_IV;
	res->l_action = OCFS2_AST_INVALID;
	res->l_unlock_action = OCFS2_UNLOCK_INVALID;

	res->l_flags = OCFS2_LOCK_INITIALIZED;

	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);

	ocfs2_init_lock_stats(res);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (type != OCFS2_LOCK_TYPE_OPEN)
		lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type],
				 &lockdep_keys[type], 0);
	else
		res->l_lockdep_map.key = NULL;
#endif
}

void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
	/* This also clears out the lock status block */
	memset(res, 0, sizeof(struct ocfs2_lock_res));
	spin_lock_init(&res->l_lock);
	init_waitqueue_head(&res->l_event);
	INIT_LIST_HEAD(&res->l_blocked_list);
	INIT_LIST_HEAD(&res->l_mask_waiters);
	INIT_LIST_HEAD(&res->l_holders);
}

void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
			       enum ocfs2_lock_type type,
			       unsigned int generation,
			       struct inode *inode)
{
	struct ocfs2_lock_res_ops *ops;

	switch(type) {
	case OCFS2_LOCK_TYPE_RW:
		ops = &ocfs2_inode_rw_lops;
		break;
	case OCFS2_LOCK_TYPE_META:
		ops = &ocfs2_inode_inode_lops;
		break;
	case OCFS2_LOCK_TYPE_OPEN:
		ops = &ocfs2_inode_open_lops;
		break;
	default:
		mlog_bug_on_msg(1, "type: %d\n", type);
		ops = NULL; /* thanks, gcc */
		break;
	}

	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
			      generation, res->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
}

static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
{
	struct inode *inode = ocfs2_lock_res_inode(lockres);

	return OCFS2_SB(inode->i_sb);
}

static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_mem_dqinfo *info = lockres->l_priv;

	return OCFS2_SB(info->dqi_gi.dqi_sb);
}

static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_file_private *fp = lockres->l_priv;

	return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
}

static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
{
	__be64 inode_blkno_be;

	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
	       sizeof(__be64));

	return be64_to_cpu(inode_blkno_be);
}

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_dentry_lock *dl = lockres->l_priv;

	return OCFS2_SB(dl->dl_inode->i_sb);
}

void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
				u64 parent, struct inode *inode)
{
	int len;
	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
	struct ocfs2_lock_res *lockres = &dl->dl_lockres;

	ocfs2_lock_res_init_once(lockres);

	/*
	 * Unfortunately, the standard lock naming scheme won't work
	 * here because we have two 16 byte values to use. Instead,
	 * we'll stuff the inode number as a binary value. We still
	 * want error prints to show something without garbling the
	 * display, so drop a null byte in there before the inode
	 * number. A future version of OCFS2 will likely use all
	 * binary lock names. The stringified names have been a
	 * tremendous aid in debugging, but now that the debugfs
	 * interface exists, we can mangle things there if need be.
	 *
	 * NOTE: We also drop the standard "pad" value (the total lock
	 * name size stays the same though - the last part is all
	 * zeros due to the memset in ocfs2_lock_res_init_once())
	 */
	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
		       "%c%016llx",
		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
		       (long long)parent);

	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));

	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
	       sizeof(__be64));

	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
				   dl);
}
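
/*
 * Sketch of the resulting l_name layout (derived from the code above,
 * not part of the original source). From the BUG_ON() it follows that
 * OCFS2_DENTRY_LOCK_INO_START is 1 + 16 + 1 bytes, so:
 *
 *	byte  0       : dentry lock type character
 *	bytes 1 - 16  : parent inode block number, as hex digits
 *	byte  17      : '\0' written by snprintf(), keeping prints sane
 *	bytes 18 - 25 : inode block number, raw big-endian binary
 *	remainder     : zeros from ocfs2_lock_res_init_once()'s memset
 */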

static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
				      struct ocfs2_super *osb)
{
	/* Superblock lockres doesn't come from a slab so we call init
	 * once on it manually. */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
			      0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
				   &ocfs2_super_lops, osb);
}

static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
				       struct ocfs2_super *osb)
{
	/* Rename lockres doesn't come from a slab so we call init
	 * once on it manually. */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
				   &ocfs2_rename_lops, osb);
}

static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
					 struct ocfs2_super *osb)
{
	/* nfs_sync lockres doesn't come from a slab so we call init
	 * once on it manually. */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC,
				   &ocfs2_nfs_sync_lops, osb);
}

void ocfs2_trim_fs_lock_res_init(struct ocfs2_super *osb)
{
	struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;

	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_TRIM_FS, 0, 0, lockres->l_name);
	ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_TRIM_FS,
				   &ocfs2_trim_fs_lops, osb);
}

void ocfs2_trim_fs_lock_res_uninit(struct ocfs2_super *osb)
{
	struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;

	ocfs2_simple_drop_lockres(osb, lockres);
	ocfs2_lock_res_free(lockres);
}

static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
					    struct ocfs2_super *osb)
{
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
				   &ocfs2_orphan_scan_lops, osb);
}

void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
			      struct ocfs2_file_private *fp)
{
	struct inode *inode = fp->fp_file->f_mapping->host;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);

	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
			      inode->i_generation, lockres->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
				   fp);
	lockres->l_flags |= OCFS2_LOCK_NOCACHE;
}

void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres,
			       struct ocfs2_mem_dqinfo *info)
{
	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type,
			      0, lockres->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres,
				   OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops,
				   info);
}

void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres,
				  struct ocfs2_super *osb, u64 ref_blkno,
				  unsigned int generation)
{
	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno,
			      generation, lockres->l_name);
	ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT,
				   &ocfs2_refcount_block_lops, osb);
}

void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
		return;

	ocfs2_remove_lockres_tracking(res);

	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
			"Lockres %s is on the blocked list\n",
			res->l_name);
	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
			"Lockres %s has mask waiters pending\n",
			res->l_name);
	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
			"Lockres %s is locked\n",
			res->l_name);
	mlog_bug_on_msg(res->l_ro_holders,
			"Lockres %s has %u ro holders\n",
			res->l_name, res->l_ro_holders);
	mlog_bug_on_msg(res->l_ex_holders,
			"Lockres %s has %u ex holders\n",
			res->l_name, res->l_ex_holders);

	/* Need to clear out the lock status block for the dlm */
	memset(&res->l_lksb, 0, sizeof(res->l_lksb));

	res->l_flags = 0UL;
}

/*
 * Keep a list of processes who have interest in a lockres.
 * Note: this is now used only to check for recursive cluster locking.
 */
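
/*
 * Illustrative sketch (hypothetical caller, not from the original
 * source) of how the holder helpers below support a recursion check:
 *
 *	if (ocfs2_is_locked_by_me(lockres))
 *		return 0;	// this task already holds the lock
 *	ret = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
 *	if (!ret)
 *		ocfs2_add_holder(lockres, oh);
 *
 * ocfs2_remove_holder() must then be called before the matching
 * cluster unlock.
 */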

static inline void ocfs2_add_holder(struct ocfs2_lock_res *lockres,
				    struct ocfs2_lock_holder *oh)
{
	INIT_LIST_HEAD(&oh->oh_list);
	oh->oh_owner_pid = get_pid(task_pid(current));

	spin_lock(&lockres->l_lock);
	list_add_tail(&oh->oh_list, &lockres->l_holders);
	spin_unlock(&lockres->l_lock);
}

static inline void ocfs2_remove_holder(struct ocfs2_lock_res *lockres,
				       struct ocfs2_lock_holder *oh)
{
	spin_lock(&lockres->l_lock);
	list_del(&oh->oh_list);
	spin_unlock(&lockres->l_lock);

	put_pid(oh->oh_owner_pid);
}

static inline int ocfs2_is_locked_by_me(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_lock_holder *oh;
	struct pid *pid;

	/* look in the list of holders for one with the current task as owner */
	spin_lock(&lockres->l_lock);
	pid = task_pid(current);
	list_for_each_entry(oh, &lockres->l_holders, oh_list) {
		if (oh->oh_owner_pid == pid) {
			spin_unlock(&lockres->l_lock);
			return 1;
		}
	}
	spin_unlock(&lockres->l_lock);

	return 0;
}

static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	BUG_ON(!lockres);

	switch(level) {
	case DLM_LOCK_EX:
		lockres->l_ex_holders++;
		break;
	case DLM_LOCK_PR:
		lockres->l_ro_holders++;
		break;
	default:
		BUG();
	}
}

static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	BUG_ON(!lockres);

	switch(level) {
	case DLM_LOCK_EX:
		BUG_ON(!lockres->l_ex_holders);
		lockres->l_ex_holders--;
		break;
	case DLM_LOCK_PR:
		BUG_ON(!lockres->l_ro_holders);
		lockres->l_ro_holders--;
		break;
	default:
		BUG();
	}
}

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
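
/*
 * For reference (derived from the function body, not part of the
 * original source), the mapping is:
 *
 *	held level	highest level others may hold
 *	DLM_LOCK_EX	DLM_LOCK_NL
 *	DLM_LOCK_PR	DLM_LOCK_PR
 *	DLM_LOCK_NL	DLM_LOCK_EX
 */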

static inline int ocfs2_highest_compat_lock_level(int level)
{
	int new_level = DLM_LOCK_EX;

	if (level == DLM_LOCK_EX)
		new_level = DLM_LOCK_NL;
	else if (level == DLM_LOCK_PR)
		new_level = DLM_LOCK_PR;
	return new_level;
}

static void lockres_set_flags(struct ocfs2_lock_res *lockres,
			      unsigned long newflags)
{
	struct ocfs2_mask_waiter *mw, *tmp;

	assert_spin_locked(&lockres->l_lock);

	lockres->l_flags = newflags;

	list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			continue;

		list_del_init(&mw->mw_item);
		mw->mw_status = 0;
		complete(&mw->mw_complete);
	}
}
static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}
static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);

	lockres->l_level = lockres->l_requested;
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = DLM_LOCK_NL;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* Convert from RO to EX doesn't really need anything as our
	 * information is already up to date. Convert from NL to
	 * *anything* however should mark ourselves as needing an
	 * update */
	if (lockres->l_level == DLM_LOCK_NL &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;

	/*
	 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
	 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from
	 * downconverting the lock before the upconvert has fully completed.
	 * Do not prevent the dc thread from downconverting if NONBLOCK lock
	 * had already returned.
	 */
	if (!(lockres->l_flags & OCFS2_LOCK_NONBLOCK_FINISHED))
		lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
	else
		lockres_clear_flags(lockres, OCFS2_LOCK_NONBLOCK_FINISHED);

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
{
	BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

	if (lockres->l_requested > DLM_LOCK_NL &&
	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;

	assert_spin_locked(&lockres->l_lock);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking. this also catches the case where we get
		 * duplicate BASTs */
		if (ocfs2_highest_compat_lock_level(level) <
		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
			needs_downconvert = 1;

		lockres->l_blocking = level;
	}

	mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n",
	     lockres->l_name, level, lockres->l_level, lockres->l_blocking,
	     needs_downconvert);

	if (needs_downconvert)
		lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
	mlog(0, "needs_downconvert = %d\n", needs_downconvert);
	return needs_downconvert;
}

/*
 * OCFS2_LOCK_PENDING and l_pending_gen.
 *
 * Why does OCFS2_LOCK_PENDING exist? To close a race between setting
 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock(). See ocfs2_unblock_lock()
 * for more details on the race.
 *
 * OCFS2_LOCK_PENDING closes the race quite nicely. However, it introduces
 * a race on itself. In o2dlm, we can get the ast before ocfs2_dlm_lock()
 * returns. The ast clears OCFS2_LOCK_BUSY, and must therefore clear
 * OCFS2_LOCK_PENDING at the same time. When ocfs2_dlm_lock() returns,
 * the caller is going to try to clear PENDING again. If nothing else is
 * happening, __lockres_clear_pending() sees PENDING is unset and does
 * nothing.
 *
 * But what if another path (eg downconvert thread) has just started a
 * new locking action? The other path has re-set PENDING. Our path
 * cannot clear PENDING, because that will re-open the original race
 * window.
 *
 * [Example]
 *
 * ocfs2_meta_lock()
 *  ocfs2_cluster_lock()
 *   set BUSY
 *   set PENDING
 *   drop l_lock
 *   ocfs2_dlm_lock()
 *    ocfs2_locking_ast()               ocfs2_downconvert_thread()
 *     clear PENDING                     ocfs2_unblock_lock()
 *                                        take_l_lock
 *                                        !BUSY
 *                                        ocfs2_prepare_downconvert()
 *                                         set BUSY
 *                                         set PENDING
 *                                        drop l_lock
 *   take l_lock
 *   clear PENDING
 *   drop l_lock
 *                      <window>
 *                                        ocfs2_dlm_lock()
 *
 * So as you can see, we now have a window where l_lock is not held,
 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
 *
 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
 * set by ocfs2_prepare_downconvert(). That wasn't nice.
 *
 * To solve this we introduce l_pending_gen. A call to
 * lockres_clear_pending() will only do so when it is passed a generation
 * number that matches the lockres. lockres_set_pending() will return the
 * current generation number. When ocfs2_cluster_lock() goes to clear
 * PENDING, it passes the generation it got from set_pending(). In our
 * example above, the generation numbers will *not* match. Thus,
 * ocfs2_cluster_lock() will not clear the PENDING set by
 * ocfs2_prepare_downconvert().
 */
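
/*
 * The resulting usage pattern (a sketch distilled from
 * ocfs2_lock_create() below, not additional API):
 *
 *	spin_lock_irqsave(&lockres->l_lock, flags);
 *	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
 *	gen = lockres_set_pending(lockres);
 *	spin_unlock_irqrestore(&lockres->l_lock, flags);
 *
 *	ret = ocfs2_dlm_lock(...);
 *
 *	lockres_clear_pending(lockres, gen, osb);
 */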

/* Unlocked version for ocfs2_locking_ast() */
static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
				    unsigned int generation,
				    struct ocfs2_super *osb)
{
	assert_spin_locked(&lockres->l_lock);

	/*
	 * The ast and locking functions can race us here. The winner
	 * will clear pending, the loser will not.
	 */
	if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
	    (lockres->l_pending_gen != generation))
		return;

	lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
	lockres->l_pending_gen++;

	/*
	 * The downconvert thread may have skipped us because we
	 * were PENDING. Wake it up.
	 */
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
		ocfs2_wake_downconvert_thread(osb);
}

/* Locked version for callers of ocfs2_dlm_lock() */
static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
				  unsigned int generation,
				  struct ocfs2_super *osb)
{
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);
	__lockres_clear_pending(lockres, generation, osb);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));

	lockres_or_flags(lockres, OCFS2_LOCK_PENDING);

	return lockres->l_pending_gen;
}

static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	int needs_downconvert;
	unsigned long flags;

	BUG_ON(level <= DLM_LOCK_NL);

	mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, "
	     "type %s\n", lockres->l_name, level, lockres->l_level,
	     ocfs2_lock_type_string(lockres->l_type));

	/*
	 * We can skip the bast for locks which don't enable caching -
	 * they'll be dropped at the earliest possible time anyway.
	 */
	if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
		return;

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_wake_downconvert_thread(osb);
}

static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	unsigned long flags;
	int status;

	spin_lock_irqsave(&lockres->l_lock, flags);

	status = ocfs2_dlm_lock_status(&lockres->l_lksb);

	if (status == -EAGAIN) {
		lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
		goto out;
	}

	if (status) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
		     lockres->l_name, status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, "
	     "level %d => %d\n", lockres->l_name, lockres->l_action,
	     lockres->l_unlock_action, lockres->l_level, lockres->l_requested);

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, "
		     "flags 0x%lx, unlock: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}
out:
	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	/* Did we try to cancel this lock? Clear that state */
	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;

	/*
	 * We may have beaten the locking functions here. We certainly
	 * know that dlm_lock() has been called :-)
	 * Because we can't have two lock calls in flight at once, we
	 * can use lockres->l_pending_gen.
	 */
	__lockres_clear_pending(lockres, lockres->l_pending_gen, osb);

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
{
	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
	unsigned long flags;

	mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n",
	     lockres->l_name, lockres->l_unlock_action);

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (error) {
		mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
		     "unlock_action %d\n", error, lockres->l_name,
		     lockres->l_unlock_action);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_unlock_action) {
	case OCFS2_UNLOCK_CANCEL_CONVERT:
		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
		lockres->l_action = OCFS2_AST_INVALID;
		/* Downconvert thread may have requeued this lock, we
		 * need to wake it. */
		if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
			ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
		break;
	case OCFS2_UNLOCK_DROP_LOCK:
		lockres->l_level = DLM_LOCK_IV;
		break;
	default:
		BUG();
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/*
 * This is the filesystem locking protocol. It provides the lock handling
 * hooks for the underlying DLM. It has a maximum version number.
 * The version number allows interoperability with systems running at
 * the same major number and an equal or smaller minor number.
 *
 * Whenever the filesystem does new things with locks (adds or removes a
 * lock, orders them differently, does different things underneath a lock),
 * the version must be changed. The protocol is negotiated when joining
 * the dlm domain. A node may join the domain if its major version is
 * identical to all other nodes and its minor version is greater than
 * or equal to all other nodes. When its minor version is greater than
 * the other nodes, it will run at the minor version specified by the
 * other nodes.
 *
 * If a locking change is made that will not be compatible with older
 * versions, the major number must be increased and the minor version set
 * to zero. If a change merely adds a behavior that can be disabled when
 * speaking to older versions, the minor version must be increased. If a
 * change adds a fully backwards compatible change (eg, LVB changes that
 * are just ignored by older versions), the version does not need to be
 * updated.
 */
static struct ocfs2_locking_protocol lproto = {
	.lp_max_version = {
		.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
		.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
	},
	.lp_lock_ast = ocfs2_locking_ast,
	.lp_blocking_ast = ocfs2_blocking_ast,
	.lp_unlock_ast = ocfs2_unlock_ast,
};

void ocfs2_set_locking_protocol(void)
{
	ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
}
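
/*
 * Worked example of the rules above (illustrative numbers only, not
 * from the original source): if every node in a domain is running
 * protocol 1.2, a node whose lp_max_version is 1.4 may join and will
 * speak 1.2, while a node whose lp_max_version is 1.1 must be refused,
 * since it cannot speak the 1.2 protocol the domain already uses.
 */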
1288 */ 1289 static int ocfs2_lock_create(struct ocfs2_super *osb, 1290 struct ocfs2_lock_res *lockres, 1291 int level, 1292 u32 dlm_flags) 1293 { 1294 int ret = 0; 1295 unsigned long flags; 1296 unsigned int gen; 1297 1298 mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level, 1299 dlm_flags); 1300 1301 spin_lock_irqsave(&lockres->l_lock, flags); 1302 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 1303 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 1304 spin_unlock_irqrestore(&lockres->l_lock, flags); 1305 goto bail; 1306 } 1307 1308 lockres->l_action = OCFS2_AST_ATTACH; 1309 lockres->l_requested = level; 1310 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1311 gen = lockres_set_pending(lockres); 1312 spin_unlock_irqrestore(&lockres->l_lock, flags); 1313 1314 ret = ocfs2_dlm_lock(osb->cconn, 1315 level, 1316 &lockres->l_lksb, 1317 dlm_flags, 1318 lockres->l_name, 1319 OCFS2_LOCK_ID_MAX_LEN - 1); 1320 lockres_clear_pending(lockres, gen, osb); 1321 if (ret) { 1322 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1323 ocfs2_recover_from_dlm_error(lockres, 1); 1324 } 1325 1326 mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name); 1327 1328 bail: 1329 return ret; 1330 } 1331 1332 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, 1333 int flag) 1334 { 1335 unsigned long flags; 1336 int ret; 1337 1338 spin_lock_irqsave(&lockres->l_lock, flags); 1339 ret = lockres->l_flags & flag; 1340 spin_unlock_irqrestore(&lockres->l_lock, flags); 1341 1342 return ret; 1343 } 1344 1345 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 1346 1347 { 1348 wait_event(lockres->l_event, 1349 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 1350 } 1351 1352 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 1353 1354 { 1355 wait_event(lockres->l_event, 1356 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 1357 } 1358 1359 /* predict what lock level we'll be dropping down to on behalf 1360 * of another node, and return true if the currently wanted 1361 * level will be compatible with it. 

static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}

static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
{
	INIT_LIST_HEAD(&mw->mw_item);
	init_completion(&mw->mw_complete);
	ocfs2_init_start_time(mw);
}

static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
	wait_for_completion(&mw->mw_complete);
	/* Re-arm the completion in case we want to wait on it again */
	reinit_completion(&mw->mw_complete);
	return mw->mw_status;
}

static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
				    struct ocfs2_mask_waiter *mw,
				    unsigned long mask,
				    unsigned long goal)
{
	BUG_ON(!list_empty(&mw->mw_item));

	assert_spin_locked(&lockres->l_lock);

	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
	mw->mw_mask = mask;
	mw->mw_goal = goal;
}
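
/*
 * Typical mask waiter pattern (a sketch of how __ocfs2_cluster_lock()
 * below uses these helpers; it is not additional API): to sleep until
 * OCFS2_LOCK_BUSY clears,
 *
 *	ocfs2_init_mask_waiter(&mw);
 *	spin_lock_irqsave(&lockres->l_lock, flags);
 *	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
 *	spin_unlock_irqrestore(&lockres->l_lock, flags);
 *	ret = ocfs2_wait_for_mask(&mw);
 *
 * lockres_set_flags() completes the waiter once (l_flags & mask)
 * equals the requested goal.
 */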

/* returns 0 if the mw that was removed was already satisfied, -EBUSY
 * if the mask still hadn't reached its goal */
static int __lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
					struct ocfs2_mask_waiter *mw)
{
	int ret = 0;

	assert_spin_locked(&lockres->l_lock);
	if (!list_empty(&mw->mw_item)) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			ret = -EBUSY;

		list_del_init(&mw->mw_item);
		init_completion(&mw->mw_complete);
	}

	return ret;
}

static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
				      struct ocfs2_mask_waiter *mw)
{
	unsigned long flags;
	int ret = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	ret = __lockres_remove_mask_waiter(lockres, mw);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;
}

static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
					     struct ocfs2_lock_res *lockres)
{
	int ret;

	ret = wait_for_completion_interruptible(&mw->mw_complete);
	if (ret)
		lockres_remove_mask_waiter(lockres, mw);
	else
		ret = mw->mw_status;
	/* Re-arm the completion in case we want to wait on it again */
	reinit_completion(&mw->mw_complete);
	return ret;
}

static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres,
				int level,
				u32 lkm_flags,
				int arg_flags,
				int l_subclass,
				unsigned long caller_ip)
{
	struct ocfs2_mask_waiter mw;
	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
	unsigned long flags;
	unsigned int gen;
	int noqueue_attempted = 0;
	int dlm_locked = 0;
	int kick_dc = 0;

	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) {
		mlog_errno(-EINVAL);
		return -EINVAL;
	}

	ocfs2_init_mask_waiter(&mw);

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= DLM_LKF_VALBLK;

again:
	wait = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);

	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto unlock;
	}

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags "
			"0x%lx\n", lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		wait = 1;
		goto unlock;
	}

	if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) {
		/*
		 * We've upconverted. If the lock now has a level we can
		 * work with, we take it. If, however, the lock is not at the
		 * required level, we go thru the full cycle. One way this could
		 * happen is if a process requesting an upconvert to PR is
		 * closely followed by another requesting upconvert to an EX.
		 * If the process requesting EX lands here, we want it to
		 * continue attempting to upconvert and let the process
		 * requesting PR take the lock.
		 * If multiple processes request upconvert to PR, the first one
		 * here will take the lock. The others will have to go thru the
		 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending
		 * downconvert request.
		 */
		if (level <= lockres->l_level)
			goto update_holders;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* is the lock is currently blocked on behalf of
		 * another node */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		if (noqueue_attempted > 0) {
			ret = -EAGAIN;
			goto unlock;
		}
		if (lkm_flags & DLM_LKF_NOQUEUE)
			noqueue_attempted = 1;

		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
			lockres->l_action = OCFS2_AST_ATTACH;
			lkm_flags &= ~DLM_LKF_CONVERT;
		} else {
			lockres->l_action = OCFS2_AST_CONVERT;
			lkm_flags |= DLM_LKF_CONVERT;
		}

		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		gen = lockres_set_pending(lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == DLM_LOCK_IV);
		BUG_ON(level == DLM_LOCK_NL);

		mlog(ML_BASTS, "lockres %s, convert from %d to %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		ret = ocfs2_dlm_lock(osb->cconn,
				     level,
				     &lockres->l_lksb,
				     lkm_flags,
				     lockres->l_name,
				     OCFS2_LOCK_ID_MAX_LEN - 1);
		lockres_clear_pending(lockres, gen, osb);
		if (ret) {
			if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
			    (ret != -EAGAIN)) {
				ocfs2_log_dlm_error("ocfs2_dlm_lock",
						    ret, lockres);
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			goto out;
		}
		dlm_locked = 1;

		mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

update_holders:
	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);

	/* ocfs2_unblock_lock requeues on seeing OCFS2_LOCK_UPCONVERT_FINISHING */
	kick_dc = (lockres->l_flags & OCFS2_LOCK_BLOCKED);

	spin_unlock_irqrestore(&lockres->l_lock, flags);
	if (kick_dc)
		ocfs2_wake_downconvert_thread(osb);
out:
	/*
	 * This is helping work around a lock inversion between the page lock
	 * and dlm locks. One path holds the page lock while calling aops
	 * which block acquiring dlm locks. The voting thread holds dlm
	 * locks while acquiring page locks while down converting data locks.
	 * This block is helping an aop path notice the inversion and back
	 * off to unlock its page lock before trying the dlm lock again.
	 */
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		spin_lock_irqsave(&lockres->l_lock, flags);
		if (__lockres_remove_mask_waiter(lockres, &mw)) {
			if (dlm_locked)
				lockres_or_flags(lockres,
						 OCFS2_LOCK_NONBLOCK_FINISHED);
			spin_unlock_irqrestore(&lockres->l_lock, flags);
			ret = -EAGAIN;
		} else {
			spin_unlock_irqrestore(&lockres->l_lock, flags);
			goto again;
		}
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}
	ocfs2_update_lock_stats(lockres, level, &mw, ret);

#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (!ret && lockres->l_lockdep_map.key != NULL) {
		if (level == DLM_LOCK_PR)
			rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
					   !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
					   caller_ip);
		else
			rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
				      !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
				      caller_ip);
	}
#endif
	return ret;
}
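
/*
 * Hypothetical caller sketch for the OCFS2_LOCK_NONBLOCK path above
 * (illustrative only; the surrounding names are assumptions, not part
 * of this file): an aop already holding a page lock would do roughly
 *
 *	ret = ocfs2_inode_lock_full(inode, NULL, 0, OCFS2_LOCK_NONBLOCK);
 *	if (ret == -EAGAIN) {
 *		unlock_page(page);
 *		// let the downconvert thread run, then retry
 *	}
 */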

static inline int ocfs2_cluster_lock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres,
				     int level,
				     u32 lkm_flags,
				     int arg_flags)
{
	return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags,
				    0, _RET_IP_);
}


static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
				   struct ocfs2_lock_res *lockres,
				   int level,
				   unsigned long caller_ip)
{
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);
	ocfs2_dec_holders(lockres, level);
	ocfs2_downconvert_on_unlock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (lockres->l_lockdep_map.key != NULL)
		rwsem_release(&lockres->l_lockdep_map, 1, caller_ip);
#endif
}

static int ocfs2_create_new_lock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int ex,
				 int local)
{
	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	unsigned long flags;
	u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
}

/* Grants us an EX lock on the data and metadata resources, skipping
 * the normal cluster directory lookup. Use this ONLY on newly created
 * inodes which other nodes can't possibly see, and which haven't been
 * hashed in the inode hash yet. This can give us a good performance
 * increase as it'll skip the network broadcast normally associated
 * with creating a new lock resource. */
int ocfs2_create_new_inode_locks(struct inode *inode)
{
	int ret;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!ocfs2_inode_is_new(inode));

	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);

	/* NOTE: We don't increment any of the holder counts, nor
	 * do we add anything to a journal handle. Since this is
	 * supposed to be a new inode which the cluster doesn't know
	 * about yet, there is no need to. As far as the LVB handling
	 * is concerned, this is basically like acquiring an EX lock
	 * on a resource which has an invalid one -- we'll set it
	 * valid when we release the EX. */

	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	/*
	 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they
	 * don't use a generation in their lock names.
	 */
	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
	if (ret)
		mlog_errno(ret);

bail:
	return ret;
}

int ocfs2_rw_lock(struct inode *inode, int write)
{
	int status, level;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog(0, "inode %llu take %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (ocfs2_mount_local(osb))
		return 0;

	lockres = &OCFS2_I(inode)->ip_rw_lockres;

	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;

	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
	if (status < 0)
		mlog_errno(status);

	return status;
}

int ocfs2_try_rw_lock(struct inode *inode, int write)
{
	int status, level;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog(0, "inode %llu try to take %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (ocfs2_mount_local(osb))
		return 0;

	lockres = &OCFS2_I(inode)->ip_rw_lockres;

	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;

	status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0);
	return status;
}

void ocfs2_rw_unlock(struct inode *inode, int write)
{
	int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog(0, "inode %llu drop %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (!ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(osb, lockres, level);
}

/*
 * ocfs2_open_lock always gets a PR mode lock.
 */
1803 */ 1804 int ocfs2_open_lock(struct inode *inode) 1805 { 1806 int status = 0; 1807 struct ocfs2_lock_res *lockres; 1808 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1809 1810 mlog(0, "inode %llu take PRMODE open lock\n", 1811 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1812 1813 if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb)) 1814 goto out; 1815 1816 lockres = &OCFS2_I(inode)->ip_open_lockres; 1817 1818 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_PR, 0, 0); 1819 if (status < 0) 1820 mlog_errno(status); 1821 1822 out: 1823 return status; 1824 } 1825 1826 int ocfs2_try_open_lock(struct inode *inode, int write) 1827 { 1828 int status = 0, level; 1829 struct ocfs2_lock_res *lockres; 1830 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1831 1832 mlog(0, "inode %llu try to take %s open lock\n", 1833 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1834 write ? "EXMODE" : "PRMODE"); 1835 1836 if (ocfs2_is_hard_readonly(osb)) { 1837 if (write) 1838 status = -EROFS; 1839 goto out; 1840 } 1841 1842 if (ocfs2_mount_local(osb)) 1843 goto out; 1844 1845 lockres = &OCFS2_I(inode)->ip_open_lockres; 1846 1847 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1848 1849 /* 1850 * The file system may already be holding a PRMODE/EXMODE open lock. 1851 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on 1852 * other nodes and the -EAGAIN will indicate to the caller that 1853 * this inode is still in use. 1854 */ 1855 status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0); 1856 1857 out: 1858 return status; 1859 } 1860 1861 /* 1862 * ocfs2_open_unlock unlocks PR and EX mode open locks. 1863 */ 1864 void ocfs2_open_unlock(struct inode *inode) 1865 { 1866 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres; 1867 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1868 1869 mlog(0, "inode %llu drop open lock\n", 1870 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1871 1872 if (ocfs2_mount_local(osb)) 1873 goto out; 1874 1875 if (lockres->l_ro_holders) 1876 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_PR); 1877 if (lockres->l_ex_holders) 1878 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 1879 1880 out: 1881 return; 1882 } 1883 1884 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres, 1885 int level) 1886 { 1887 int ret; 1888 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1889 unsigned long flags; 1890 struct ocfs2_mask_waiter mw; 1891 1892 ocfs2_init_mask_waiter(&mw); 1893 1894 retry_cancel: 1895 spin_lock_irqsave(&lockres->l_lock, flags); 1896 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 1897 ret = ocfs2_prepare_cancel_convert(osb, lockres); 1898 if (ret) { 1899 spin_unlock_irqrestore(&lockres->l_lock, flags); 1900 ret = ocfs2_cancel_convert(osb, lockres); 1901 if (ret < 0) { 1902 mlog_errno(ret); 1903 goto out; 1904 } 1905 goto retry_cancel; 1906 } 1907 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1908 spin_unlock_irqrestore(&lockres->l_lock, flags); 1909 1910 ocfs2_wait_for_mask(&mw); 1911 goto retry_cancel; 1912 } 1913 1914 ret = -ERESTARTSYS; 1915 /* 1916 * We may still have gotten the lock, in which case there's no 1917 * point in restarting the syscall. 1918 */ 1919 if (lockres->l_level == level) 1920 ret = 0; 1921 1922 mlog(0, "Cancel returning %d.
flags: 0x%lx, level: %d, act: %d\n", ret, 1923 lockres->l_flags, lockres->l_level, lockres->l_action); 1924 1925 spin_unlock_irqrestore(&lockres->l_lock, flags); 1926 1927 out: 1928 return ret; 1929 } 1930 1931 /* 1932 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of 1933 * flock() calls. The locking approach this requires is sufficiently 1934 * different from all other cluster lock types that we implement a 1935 * separate path to the "low-level" dlm calls. In particular: 1936 * 1937 * - No optimization of lock levels is done - we take exactly 1938 * what's been requested. 1939 * 1940 * - No lock caching is employed. We immediately downconvert to 1941 * no-lock at unlock time. This also means flock locks never go on 1942 * the blocking list. 1943 * 1944 * - Since userspace can trivially deadlock itself with flock, we make 1945 * sure to allow cancellation of a misbehaving application's flock() 1946 * request. 1947 * 1948 * - Access to any flock lockres doesn't require concurrency, so we 1949 * can simplify the code by requiring the caller to guarantee 1950 * serialization of dlmglue flock calls. 1951 */ 1952 int ocfs2_file_lock(struct file *file, int ex, int trylock) 1953 { 1954 int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1955 unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0; 1956 unsigned long flags; 1957 struct ocfs2_file_private *fp = file->private_data; 1958 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1959 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1960 struct ocfs2_mask_waiter mw; 1961 1962 ocfs2_init_mask_waiter(&mw); 1963 1964 if ((lockres->l_flags & OCFS2_LOCK_BUSY) || 1965 (lockres->l_level > DLM_LOCK_NL)) { 1966 mlog(ML_ERROR, 1967 "File lock \"%s\" has busy or locked state: flags: 0x%lx, " 1968 "level: %u\n", lockres->l_name, lockres->l_flags, 1969 lockres->l_level); 1970 return -EINVAL; 1971 } 1972 1973 spin_lock_irqsave(&lockres->l_lock, flags); 1974 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1975 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1976 spin_unlock_irqrestore(&lockres->l_lock, flags); 1977 1978 /* 1979 * Get the lock at NLMODE to start - that way we 1980 * can cancel the upconvert request if need be. 1981 */ 1982 ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0); 1983 if (ret < 0) { 1984 mlog_errno(ret); 1985 goto out; 1986 } 1987 1988 ret = ocfs2_wait_for_mask(&mw); 1989 if (ret) { 1990 mlog_errno(ret); 1991 goto out; 1992 } 1993 spin_lock_irqsave(&lockres->l_lock, flags); 1994 } 1995 1996 lockres->l_action = OCFS2_AST_CONVERT; 1997 lkm_flags |= DLM_LKF_CONVERT; 1998 lockres->l_requested = level; 1999 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2000 2001 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 2002 spin_unlock_irqrestore(&lockres->l_lock, flags); 2003 2004 ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags, 2005 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1); 2006 if (ret) { 2007 if (!trylock || (ret != -EAGAIN)) { 2008 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 2009 ret = -EINVAL; 2010 } 2011 2012 ocfs2_recover_from_dlm_error(lockres, 1); 2013 lockres_remove_mask_waiter(lockres, &mw); 2014 goto out; 2015 } 2016 2017 ret = ocfs2_wait_for_mask_interruptible(&mw, lockres); 2018 if (ret == -ERESTARTSYS) { 2019 /* 2020 * Userspace can deadlock itself with 2021 * flock(). Current behavior locally is to allow the 2022 * deadlock, but abort the system call if a signal is 2023 * received.
We follow this example, otherwise a 2024 * poorly written program could sit in the kernel until 2025 * reboot. 2026 * 2027 * Handling this is a bit more complicated for Ocfs2 2028 * though. We can't exit this function with an 2029 * outstanding lock request, so a cancel convert is 2030 * required. We intentionally overwrite 'ret' - if the 2031 * cancel fails and the lock was granted, it's easier 2032 * to just bubble success back up to the user. 2033 */ 2034 ret = ocfs2_flock_handle_signal(lockres, level); 2035 } else if (!ret && (level > lockres->l_level)) { 2036 /* Trylock failed asynchronously */ 2037 BUG_ON(!trylock); 2038 ret = -EAGAIN; 2039 } 2040 2041 out: 2042 2043 mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n", 2044 lockres->l_name, ex, trylock, ret); 2045 return ret; 2046 } 2047 2048 void ocfs2_file_unlock(struct file *file) 2049 { 2050 int ret; 2051 unsigned int gen; 2052 unsigned long flags; 2053 struct ocfs2_file_private *fp = file->private_data; 2054 struct ocfs2_lock_res *lockres = &fp->fp_flock; 2055 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 2056 struct ocfs2_mask_waiter mw; 2057 2058 ocfs2_init_mask_waiter(&mw); 2059 2060 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) 2061 return; 2062 2063 if (lockres->l_level == DLM_LOCK_NL) 2064 return; 2065 2066 mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n", 2067 lockres->l_name, lockres->l_flags, lockres->l_level, 2068 lockres->l_action); 2069 2070 spin_lock_irqsave(&lockres->l_lock, flags); 2071 /* 2072 * Fake a blocking ast for the downconvert code. 2073 */ 2074 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 2075 lockres->l_blocking = DLM_LOCK_EX; 2076 2077 gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL); 2078 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 2079 spin_unlock_irqrestore(&lockres->l_lock, flags); 2080 2081 ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen); 2082 if (ret) { 2083 mlog_errno(ret); 2084 return; 2085 } 2086 2087 ret = ocfs2_wait_for_mask(&mw); 2088 if (ret) 2089 mlog_errno(ret); 2090 } 2091 2092 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 2093 struct ocfs2_lock_res *lockres) 2094 { 2095 int kick = 0; 2096 2097 /* If we know that another node is waiting on our lock, kick 2098 * the downconvert thread pre-emptively when we reach a release 2099 * condition. */ 2100 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 2101 switch (lockres->l_blocking) { 2102 case DLM_LOCK_EX: 2103 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 2104 kick = 1; 2105 break; 2106 case DLM_LOCK_PR: 2107 if (!lockres->l_ex_holders) 2108 kick = 1; 2109 break; 2110 default: 2111 BUG(); 2112 } 2113 } 2114 2115 if (kick) 2116 ocfs2_wake_downconvert_thread(osb); 2117 } 2118 2119 #define OCFS2_SEC_BITS 34 2120 #define OCFS2_SEC_SHIFT (64 - OCFS2_SEC_BITS) 2121 #define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1) 2122 2123 /* LVB only has room for 64 bits of time here so we pack it for 2124 * now. */ 2125 static u64 ocfs2_pack_timespec(struct timespec *spec) 2126 { 2127 u64 res; 2128 u64 sec = spec->tv_sec; 2129 u32 nsec = spec->tv_nsec; 2130 2131 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK); 2132 2133 return res; 2134 } 2135 2136 /* Call this with the lockres locked. I am reasonably sure we don't 2137 * need ip_lock in this function as anyone who would be changing those 2138 * values is supposed to be blocked in ocfs2_inode_lock right now.
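* * For reference, the timestamps stashed in the LVB below are packed by * ocfs2_pack_timespec() above: the top OCFS2_SEC_BITS (34) bits hold the * seconds, the low 30 bits the nanoseconds. An illustrative round trip * (assumes tv_sec fits in 34 bits): * * u64 packed = ((u64)ts.tv_sec << OCFS2_SEC_SHIFT) | * (ts.tv_nsec & OCFS2_NSEC_MASK); * ts.tv_sec = packed >> OCFS2_SEC_SHIFT; * ts.tv_nsec = packed & OCFS2_NSEC_MASK;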
*/ 2139 static void __ocfs2_stuff_meta_lvb(struct inode *inode) 2140 { 2141 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2142 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2143 struct ocfs2_meta_lvb *lvb; 2144 2145 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2146 2147 /* 2148 * Invalidate the LVB of a deleted inode - this way other 2149 * nodes are forced to go to disk and discover the new inode 2150 * status. 2151 */ 2152 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2153 lvb->lvb_version = 0; 2154 goto out; 2155 } 2156 2157 lvb->lvb_version = OCFS2_LVB_VERSION; 2158 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 2159 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 2160 lvb->lvb_iuid = cpu_to_be32(i_uid_read(inode)); 2161 lvb->lvb_igid = cpu_to_be32(i_gid_read(inode)); 2162 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 2163 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 2164 lvb->lvb_iatime_packed = 2165 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime)); 2166 lvb->lvb_ictime_packed = 2167 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime)); 2168 lvb->lvb_imtime_packed = 2169 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime)); 2170 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 2171 lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features); 2172 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 2173 2174 out: 2175 mlog_meta_lvb(0, lockres); 2176 } 2177 2178 static void ocfs2_unpack_timespec(struct timespec *spec, 2179 u64 packed_time) 2180 { 2181 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 2182 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 2183 } 2184 2185 static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 2186 { 2187 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2188 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2189 struct ocfs2_meta_lvb *lvb; 2190 2191 mlog_meta_lvb(0, lockres); 2192 2193 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2194 2195 /* We're safe here without the lockres lock... */ 2196 spin_lock(&oi->ip_lock); 2197 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 2198 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 2199 2200 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 2201 oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures); 2202 ocfs2_set_inode_flags(inode); 2203 2204 /* fast-symlinks are a special case */ 2205 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 2206 inode->i_blocks = 0; 2207 else 2208 inode->i_blocks = ocfs2_inode_sector_count(inode); 2209 2210 i_uid_write(inode, be32_to_cpu(lvb->lvb_iuid)); 2211 i_gid_write(inode, be32_to_cpu(lvb->lvb_igid)); 2212 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 2213 set_nlink(inode, be16_to_cpu(lvb->lvb_inlink)); 2214 ocfs2_unpack_timespec(&inode->i_atime, 2215 be64_to_cpu(lvb->lvb_iatime_packed)); 2216 ocfs2_unpack_timespec(&inode->i_mtime, 2217 be64_to_cpu(lvb->lvb_imtime_packed)); 2218 ocfs2_unpack_timespec(&inode->i_ctime, 2219 be64_to_cpu(lvb->lvb_ictime_packed)); 2220 spin_unlock(&oi->ip_lock); 2221 } 2222 2223 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 2224 struct ocfs2_lock_res *lockres) 2225 { 2226 struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2227 2228 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) 2229 && lvb->lvb_version == OCFS2_LVB_VERSION 2230 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 2231 return 1; 2232 return 0; 2233 } 2234 2235 /* Determine whether a lock resource needs to be refreshed, and 2236 * arbitrate who gets to refresh it. 2237 * 2238 * 0 means no refresh needed. 
2239 * 2240 * > 0 means you need to refresh this and you MUST call 2241 * ocfs2_complete_lock_res_refresh afterwards. */ 2242 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres) 2243 { 2244 unsigned long flags; 2245 int status = 0; 2246 2247 refresh_check: 2248 spin_lock_irqsave(&lockres->l_lock, flags); 2249 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 2250 spin_unlock_irqrestore(&lockres->l_lock, flags); 2251 goto bail; 2252 } 2253 2254 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 2255 spin_unlock_irqrestore(&lockres->l_lock, flags); 2256 2257 ocfs2_wait_on_refreshing_lock(lockres); 2258 goto refresh_check; 2259 } 2260 2261 /* Ok, I'll be the one to refresh this lock. */ 2262 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING); 2263 spin_unlock_irqrestore(&lockres->l_lock, flags); 2264 2265 status = 1; 2266 bail: 2267 mlog(0, "status %d\n", status); 2268 return status; 2269 } 2270 2271 /* If status is nonzero, I'll mark it as not being in refresh 2272 * anymore, but I won't clear the needs refresh flag. */ 2273 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres, 2274 int status) 2275 { 2276 unsigned long flags; 2277 2278 spin_lock_irqsave(&lockres->l_lock, flags); 2279 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING); 2280 if (!status) 2281 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 2282 spin_unlock_irqrestore(&lockres->l_lock, flags); 2283 2284 wake_up(&lockres->l_event); 2285 } 2286 2287 /* may or may not return a bh if it went to disk. */ 2288 static int ocfs2_inode_lock_update(struct inode *inode, 2289 struct buffer_head **bh) 2290 { 2291 int status = 0; 2292 struct ocfs2_inode_info *oi = OCFS2_I(inode); 2293 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 2294 struct ocfs2_dinode *fe; 2295 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2296 2297 if (ocfs2_mount_local(osb)) 2298 goto bail; 2299 2300 spin_lock(&oi->ip_lock); 2301 if (oi->ip_flags & OCFS2_INODE_DELETED) { 2302 mlog(0, "Orphaned inode %llu was deleted while we " 2303 "were waiting on a lock. ip_flags = 0x%x\n", 2304 (unsigned long long)oi->ip_blkno, oi->ip_flags); 2305 spin_unlock(&oi->ip_lock); 2306 status = -ENOENT; 2307 goto bail; 2308 } 2309 spin_unlock(&oi->ip_lock); 2310 2311 if (!ocfs2_should_refresh_lock_res(lockres)) 2312 goto bail; 2313 2314 /* This will discard any caching information we might have had 2315 * for the inode metadata. */ 2316 ocfs2_metadata_cache_purge(INODE_CACHE(inode)); 2317 2318 ocfs2_extent_map_trunc(inode, 0); 2319 2320 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) { 2321 mlog(0, "Trusting LVB on inode %llu\n", 2322 (unsigned long long)oi->ip_blkno); 2323 ocfs2_refresh_inode_from_lvb(inode); 2324 } else { 2325 /* Boo, we have to go to disk. */ 2326 /* read bh, cast, ocfs2_refresh_inode */ 2327 status = ocfs2_read_inode_block(inode, bh); 2328 if (status < 0) { 2329 mlog_errno(status); 2330 goto bail_refresh; 2331 } 2332 fe = (struct ocfs2_dinode *) (*bh)->b_data; 2333 2334 /* This is a good chance to make sure we're not 2335 * locking an invalid object. ocfs2_read_inode_block() 2336 * already checked that the inode block is sane. 2337 * 2338 * We bug on a stale inode here because we checked 2339 * above whether it was wiped from disk. The wiping 2340 * node provides a guarantee that we receive that 2341 * message and can mark the inode before dropping any 2342 * locks associated with it.
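* * (Note the refresh arbitration protocol this function relies on: a * successful ocfs2_should_refresh_lock_res(), i.e. one returning > 0, must * always be paired with ocfs2_complete_lock_res_refresh(). A sketch of the * pattern, with the actual refresh work elided behind a hypothetical helper: * * status = ocfs2_should_refresh_lock_res(lockres); * if (status) { * status = do_the_refresh(); * ocfs2_complete_lock_res_refresh(lockres, status); * } * * ocfs2_super_lock() below follows the same pattern.)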
*/ 2343 mlog_bug_on_msg(inode->i_generation != 2344 le32_to_cpu(fe->i_generation), 2345 "Invalid dinode %llu disk generation: %u " 2346 "inode->i_generation: %u\n", 2347 (unsigned long long)oi->ip_blkno, 2348 le32_to_cpu(fe->i_generation), 2349 inode->i_generation); 2350 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 2351 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 2352 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 2353 (unsigned long long)oi->ip_blkno, 2354 (unsigned long long)le64_to_cpu(fe->i_dtime), 2355 le32_to_cpu(fe->i_flags)); 2356 2357 ocfs2_refresh_inode(inode, fe); 2358 ocfs2_track_lock_refresh(lockres); 2359 } 2360 2361 status = 0; 2362 bail_refresh: 2363 ocfs2_complete_lock_res_refresh(lockres, status); 2364 bail: 2365 return status; 2366 } 2367 2368 static int ocfs2_assign_bh(struct inode *inode, 2369 struct buffer_head **ret_bh, 2370 struct buffer_head *passed_bh) 2371 { 2372 int status; 2373 2374 if (passed_bh) { 2375 /* Ok, the update went to disk for us, use the 2376 * returned bh. */ 2377 *ret_bh = passed_bh; 2378 get_bh(*ret_bh); 2379 2380 return 0; 2381 } 2382 2383 status = ocfs2_read_inode_block(inode, ret_bh); 2384 if (status < 0) 2385 mlog_errno(status); 2386 2387 return status; 2388 } 2389 2390 /* 2391 * returns < 0 error if the callback will never be called, otherwise 2392 * the result of the lock will be communicated via the callback. 2393 */ 2394 int ocfs2_inode_lock_full_nested(struct inode *inode, 2395 struct buffer_head **ret_bh, 2396 int ex, 2397 int arg_flags, 2398 int subclass) 2399 { 2400 int status, level, acquired; 2401 u32 dlm_flags; 2402 struct ocfs2_lock_res *lockres = NULL; 2403 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2404 struct buffer_head *local_bh = NULL; 2405 2406 mlog(0, "inode %llu, take %s META lock\n", 2407 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2408 ex ? "EXMODE" : "PRMODE"); 2409 2410 status = 0; 2411 acquired = 0; 2412 /* We'll allow faking a readonly metadata lock for 2413 * rodevices. */ 2414 if (ocfs2_is_hard_readonly(osb)) { 2415 if (ex) 2416 status = -EROFS; 2417 goto getbh; 2418 } 2419 2420 if ((arg_flags & OCFS2_META_LOCK_GETBH) || 2421 ocfs2_mount_local(osb)) 2422 goto update; 2423 2424 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2425 ocfs2_wait_for_recovery(osb); 2426 2427 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2428 level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2429 dlm_flags = 0; 2430 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 2431 dlm_flags |= DLM_LKF_NOQUEUE; 2432 2433 status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags, 2434 arg_flags, subclass, _RET_IP_); 2435 if (status < 0) { 2436 if (status != -EAGAIN) 2437 mlog_errno(status); 2438 goto bail; 2439 } 2440 2441 /* Notify the error cleanup path to drop the cluster lock. */ 2442 acquired = 1; 2443 2444 /* We wait twice because a node may have died while we were in 2445 * the lower dlm layers. The second time though, we've 2446 * committed to owning this lock so we don't allow signals to 2447 * abort the operation. */ 2448 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 2449 ocfs2_wait_for_recovery(osb); 2450 2451 update: 2452 /* 2453 * We only see this flag if we're being called from 2454 * ocfs2_read_locked_inode(). It means we're locking an inode 2455 * which hasn't been populated yet, so clear the refresh flag 2456 * and let the caller handle it. 2457 */ 2458 if (inode->i_state & I_NEW) { 2459 status = 0; 2460 if (lockres) 2461 ocfs2_complete_lock_res_refresh(lockres, 0); 2462 goto bail; 2463 } 2464 2465 /* This is fun. 
The caller may want a bh back, or it may 2466 * not. ocfs2_inode_lock_update definitely wants one passed in, but 2467 * may or may not read one, depending on what's in the 2468 * LVB. The result of all of this is that we've *only* gone to 2469 * disk if we have to, so the complexity is worthwhile. */ 2470 status = ocfs2_inode_lock_update(inode, &local_bh); 2471 if (status < 0) { 2472 if (status != -ENOENT) 2473 mlog_errno(status); 2474 goto bail; 2475 } 2476 getbh: 2477 if (ret_bh) { 2478 status = ocfs2_assign_bh(inode, ret_bh, local_bh); 2479 if (status < 0) { 2480 mlog_errno(status); 2481 goto bail; 2482 } 2483 } 2484 2485 bail: 2486 if (status < 0) { 2487 if (ret_bh && (*ret_bh)) { 2488 brelse(*ret_bh); 2489 *ret_bh = NULL; 2490 } 2491 if (acquired) 2492 ocfs2_inode_unlock(inode, ex); 2493 } 2494 2495 if (local_bh) 2496 brelse(local_bh); 2497 2498 return status; 2499 } 2500 2501 /* 2502 * This is working around a lock inversion between tasks acquiring DLM 2503 * locks while holding a page lock and the downconvert thread which 2504 * blocks dlm lock acquisition while acquiring page locks. 2505 * 2506 * ** These _with_page variants are only intended to be called from aop 2507 * methods that hold page locks and return a very specific *positive* error 2508 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 2509 * 2510 * The DLM is called such that it returns -EAGAIN if it would have 2511 * blocked waiting for the downconvert thread. In that case we unlock 2512 * our page so the downconvert thread can make progress. Once we've 2513 * done this we have to return AOP_TRUNCATED_PAGE so the aop method 2514 * that called us can bubble that back up into the VFS, which will then 2515 * immediately retry the aop call. 2516 */ 2517 int ocfs2_inode_lock_with_page(struct inode *inode, 2518 struct buffer_head **ret_bh, 2519 int ex, 2520 struct page *page) 2521 { 2522 int ret; 2523 2524 ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK); 2525 if (ret == -EAGAIN) { 2526 unlock_page(page); 2527 /* 2528 * If we can't get the inode lock immediately, we should not 2529 * return directly here, since that leads to a softlockup 2530 * problem. Instead we take a blocking lock and immediately 2531 * unlock it before returning; this avoids wasting CPU on lots 2532 * of retries and improves fairness in acquiring the lock. 2533 */ 2534 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0) 2535 ocfs2_inode_unlock(inode, ex); 2536 ret = AOP_TRUNCATED_PAGE; 2537 } 2538 2539 return ret; 2540 } 2541 2542 int ocfs2_inode_lock_atime(struct inode *inode, 2543 struct vfsmount *vfsmnt, 2544 int *level, int wait) 2545 { 2546 int ret; 2547 2548 if (wait) 2549 ret = ocfs2_inode_lock(inode, NULL, 0); 2550 else 2551 ret = ocfs2_try_inode_lock(inode, NULL, 0); 2552 2553 if (ret < 0) { 2554 if (ret != -EAGAIN) 2555 mlog_errno(ret); 2556 return ret; 2557 } 2558 2559 /* 2560 * If we should update atime, we will get an EX lock, 2561 * otherwise we just get a PR lock.
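* A hypothetical caller pairs this with ocfs2_inode_unlock() using the * returned level, e.g.: * * ret = ocfs2_inode_lock_atime(inode, vfsmnt, &level, 1); * if (ret < 0) * return ret; * (... do the read ...) * ocfs2_inode_unlock(inode, level);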
2562 */ 2563 if (ocfs2_should_update_atime(inode, vfsmnt)) { 2564 struct buffer_head *bh = NULL; 2565 2566 ocfs2_inode_unlock(inode, 0); 2567 if (wait) 2568 ret = ocfs2_inode_lock(inode, &bh, 1); 2569 else 2570 ret = ocfs2_try_inode_lock(inode, &bh, 1); 2571 2572 if (ret < 0) { 2573 if (ret != -EAGAIN) 2574 mlog_errno(ret); 2575 return ret; 2576 } 2577 *level = 1; 2578 if (ocfs2_should_update_atime(inode, vfsmnt)) 2579 ocfs2_update_inode_atime(inode, bh); 2580 if (bh) 2581 brelse(bh); 2582 } else 2583 *level = 0; 2584 2585 return ret; 2586 } 2587 2588 void ocfs2_inode_unlock(struct inode *inode, 2589 int ex) 2590 { 2591 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2592 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2593 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2594 2595 mlog(0, "inode %llu drop %s META lock\n", 2596 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2597 ex ? "EXMODE" : "PRMODE"); 2598 2599 if (!ocfs2_is_hard_readonly(osb) && 2600 !ocfs2_mount_local(osb)) 2601 ocfs2_cluster_unlock(osb, lockres, level); 2602 } 2603 2604 /* 2605 * These _tracker variants are introduced to deal with the recursive cluster 2606 * locking issue. The idea is to keep track of a lock holder on the stack of 2607 * the current process. If there's a lock holder on the stack, we know the 2608 * task context is already protected by cluster locking. Currently, they're 2609 * used in some VFS entry routines. 2610 * 2611 * return < 0 on error, return == 0 if there's no lock holder on the stack 2612 * before this call, return == 1 if this call is recursive locking. 2613 */ 2614 int ocfs2_inode_lock_tracker(struct inode *inode, 2615 struct buffer_head **ret_bh, 2616 int ex, 2617 struct ocfs2_lock_holder *oh) 2618 { 2619 int status; 2620 int arg_flags = 0, has_locked; 2621 struct ocfs2_lock_res *lockres; 2622 2623 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2624 has_locked = ocfs2_is_locked_by_me(lockres); 2625 /* Just get buffer head if the cluster lock has been taken */ 2626 if (has_locked) 2627 arg_flags = OCFS2_META_LOCK_GETBH; 2628 2629 if (likely(!has_locked || ret_bh)) { 2630 status = ocfs2_inode_lock_full(inode, ret_bh, ex, arg_flags); 2631 if (status < 0) { 2632 if (status != -ENOENT) 2633 mlog_errno(status); 2634 return status; 2635 } 2636 } 2637 if (!has_locked) 2638 ocfs2_add_holder(lockres, oh); 2639 2640 return has_locked; 2641 } 2642 2643 void ocfs2_inode_unlock_tracker(struct inode *inode, 2644 int ex, 2645 struct ocfs2_lock_holder *oh, 2646 int had_lock) 2647 { 2648 struct ocfs2_lock_res *lockres; 2649 2650 lockres = &OCFS2_I(inode)->ip_inode_lockres; 2651 /* had_lock means that the current process has already taken the 2652 * cluster lock. If had_lock is 1, we have nothing to do here, and 2653 * it will get unlocked where we got the lock.
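* * A sketch of the intended pairing (illustrative; 'oh' lives on the * caller's stack): * * struct ocfs2_lock_holder oh; * int had_lock; * * had_lock = ocfs2_inode_lock_tracker(inode, &bh, ex, &oh); * if (had_lock < 0) * return had_lock; * (... cluster-locked work ...) * ocfs2_inode_unlock_tracker(inode, ex, &oh, had_lock);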
2654 */ 2655 if (!had_lock) { 2656 ocfs2_remove_holder(lockres, oh); 2657 ocfs2_inode_unlock(inode, ex); 2658 } 2659 } 2660 2661 int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno) 2662 { 2663 struct ocfs2_lock_res *lockres; 2664 struct ocfs2_orphan_scan_lvb *lvb; 2665 int status = 0; 2666 2667 if (ocfs2_is_hard_readonly(osb)) 2668 return -EROFS; 2669 2670 if (ocfs2_mount_local(osb)) 2671 return 0; 2672 2673 lockres = &osb->osb_orphan_scan.os_lockres; 2674 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2675 if (status < 0) 2676 return status; 2677 2678 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2679 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2680 lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION) 2681 *seqno = be32_to_cpu(lvb->lvb_os_seqno); 2682 else 2683 *seqno = osb->osb_orphan_scan.os_seqno + 1; 2684 2685 return status; 2686 } 2687 2688 void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno) 2689 { 2690 struct ocfs2_lock_res *lockres; 2691 struct ocfs2_orphan_scan_lvb *lvb; 2692 2693 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) { 2694 lockres = &osb->osb_orphan_scan.os_lockres; 2695 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2696 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION; 2697 lvb->lvb_os_seqno = cpu_to_be32(seqno); 2698 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2699 } 2700 } 2701 2702 int ocfs2_super_lock(struct ocfs2_super *osb, 2703 int ex) 2704 { 2705 int status = 0; 2706 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2707 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2708 2709 if (ocfs2_is_hard_readonly(osb)) 2710 return -EROFS; 2711 2712 if (ocfs2_mount_local(osb)) 2713 goto bail; 2714 2715 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 2716 if (status < 0) { 2717 mlog_errno(status); 2718 goto bail; 2719 } 2720 2721 /* The super block lock path is really in the best position to 2722 * know when resources covered by the lock need to be 2723 * refreshed, so we do it here. Of course, making sense of 2724 * everything is up to the caller :) */ 2725 status = ocfs2_should_refresh_lock_res(lockres); 2726 if (status) { 2727 status = ocfs2_refresh_slot_info(osb); 2728 2729 ocfs2_complete_lock_res_refresh(lockres, status); 2730 2731 if (status < 0) { 2732 ocfs2_cluster_unlock(osb, lockres, level); 2733 mlog_errno(status); 2734 } 2735 ocfs2_track_lock_refresh(lockres); 2736 } 2737 bail: 2738 return status; 2739 } 2740 2741 void ocfs2_super_unlock(struct ocfs2_super *osb, 2742 int ex) 2743 { 2744 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2745 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2746 2747 if (!ocfs2_mount_local(osb)) 2748 ocfs2_cluster_unlock(osb, lockres, level); 2749 } 2750 2751 int ocfs2_rename_lock(struct ocfs2_super *osb) 2752 { 2753 int status; 2754 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2755 2756 if (ocfs2_is_hard_readonly(osb)) 2757 return -EROFS; 2758 2759 if (ocfs2_mount_local(osb)) 2760 return 0; 2761 2762 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2763 if (status < 0) 2764 mlog_errno(status); 2765 2766 return status; 2767 } 2768 2769 void ocfs2_rename_unlock(struct ocfs2_super *osb) 2770 { 2771 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2772 2773 if (!ocfs2_mount_local(osb)) 2774 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2775 } 2776 2777 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex) 2778 { 2779 int status; 2780 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2781 2782 if (ocfs2_is_hard_readonly(osb)) 2783 return -EROFS; 2784 2785 if (ocfs2_mount_local(osb)) 2786 return 0; 2787 2788 status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE, 2789 0, 0); 2790 if (status < 0) 2791 mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status); 2792 2793 return status; 2794 } 2795 2796 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex) 2797 { 2798 struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres; 2799 2800 if (!ocfs2_mount_local(osb)) 2801 ocfs2_cluster_unlock(osb, lockres, 2802 ex ? LKM_EXMODE : LKM_PRMODE); 2803 } 2804 2805 int ocfs2_trim_fs_lock(struct ocfs2_super *osb, 2806 struct ocfs2_trim_fs_info *info, int trylock) 2807 { 2808 int status; 2809 struct ocfs2_trim_fs_lvb *lvb; 2810 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 2811 2812 if (info) 2813 info->tf_valid = 0; 2814 2815 if (ocfs2_is_hard_readonly(osb)) 2816 return -EROFS; 2817 2818 if (ocfs2_mount_local(osb)) 2819 return 0; 2820 2821 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 2822 trylock ? 
DLM_LKF_NOQUEUE : 0, 0); 2823 if (status < 0) { 2824 if (status != -EAGAIN) 2825 mlog_errno(status); 2826 return status; 2827 } 2828 2829 if (info) { 2830 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2831 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && 2832 lvb->lvb_version == OCFS2_TRIMFS_LVB_VERSION) { 2833 info->tf_valid = 1; 2834 info->tf_success = lvb->lvb_success; 2835 info->tf_nodenum = be32_to_cpu(lvb->lvb_nodenum); 2836 info->tf_start = be64_to_cpu(lvb->lvb_start); 2837 info->tf_len = be64_to_cpu(lvb->lvb_len); 2838 info->tf_minlen = be64_to_cpu(lvb->lvb_minlen); 2839 info->tf_trimlen = be64_to_cpu(lvb->lvb_trimlen); 2840 } 2841 } 2842 2843 return status; 2844 } 2845 2846 void ocfs2_trim_fs_unlock(struct ocfs2_super *osb, 2847 struct ocfs2_trim_fs_info *info) 2848 { 2849 struct ocfs2_trim_fs_lvb *lvb; 2850 struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres; 2851 2852 if (ocfs2_mount_local(osb)) 2853 return; 2854 2855 if (info) { 2856 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2857 lvb->lvb_version = OCFS2_TRIMFS_LVB_VERSION; 2858 lvb->lvb_success = info->tf_success; 2859 lvb->lvb_nodenum = cpu_to_be32(info->tf_nodenum); 2860 lvb->lvb_start = cpu_to_be64(info->tf_start); 2861 lvb->lvb_len = cpu_to_be64(info->tf_len); 2862 lvb->lvb_minlen = cpu_to_be64(info->tf_minlen); 2863 lvb->lvb_trimlen = cpu_to_be64(info->tf_trimlen); 2864 } 2865 2866 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2867 } 2868 2869 int ocfs2_dentry_lock(struct dentry *dentry, int ex) 2870 { 2871 int ret; 2872 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2873 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2874 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2875 2876 BUG_ON(!dl); 2877 2878 if (ocfs2_is_hard_readonly(osb)) { 2879 if (ex) 2880 return -EROFS; 2881 return 0; 2882 } 2883 2884 if (ocfs2_mount_local(osb)) 2885 return 0; 2886 2887 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 2888 if (ret < 0) 2889 mlog_errno(ret); 2890 2891 return ret; 2892 } 2893 2894 void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 2895 { 2896 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2897 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2898 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2899 2900 if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) 2901 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 2902 } 2903 2904 /* Reference counting of the dlm debug structure. We want this because 2905 * open references on the debug inodes can live on after a dismount, so 2906 * we can't rely on the ocfs2_super to always exist. */ 2907 static void ocfs2_dlm_debug_free(struct kref *kref) 2908 { 2909 struct ocfs2_dlm_debug *dlm_debug; 2910 2911 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2912 2913 kfree(dlm_debug); 2914 } 2915 2916 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2917 { 2918 if (dlm_debug) 2919 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2920 } 2921 2922 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2923 { 2924 kref_get(&debug->d_refcnt); 2925 } 2926 2927 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2928 { 2929 struct ocfs2_dlm_debug *dlm_debug; 2930 2931 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 2932 if (!dlm_debug) { 2933 mlog_errno(-ENOMEM); 2934 goto out; 2935 } 2936 2937 kref_init(&dlm_debug->d_refcnt); 2938 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 2939 dlm_debug->d_locking_state = NULL; 2940 out: 2941 return dlm_debug; 2942 } 2943 2944 /* Access to this is arbitrated for us via seq_file->sem.
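* The resulting seq_file is normally consumed through debugfs, typically as * /sys/kernel/debug/ocfs2/<uuid>/locking_state (the exact path depends on * where debugfs is mounted); debugfs.ocfs2 parses it using the versioned * record format documented above ocfs2_dlm_seq_show() below.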
*/ 2945 struct ocfs2_dlm_seq_priv { 2946 struct ocfs2_dlm_debug *p_dlm_debug; 2947 struct ocfs2_lock_res p_iter_res; 2948 struct ocfs2_lock_res p_tmp_res; 2949 }; 2950 2951 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 2952 struct ocfs2_dlm_seq_priv *priv) 2953 { 2954 struct ocfs2_lock_res *iter, *ret = NULL; 2955 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 2956 2957 assert_spin_locked(&ocfs2_dlm_tracking_lock); 2958 2959 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 2960 /* discover the head of the list */ 2961 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 2962 mlog(0, "End of list found, %p\n", ret); 2963 break; 2964 } 2965 2966 /* We track our "dummy" iteration lockres' by a NULL 2967 * l_ops field. */ 2968 if (iter->l_ops != NULL) { 2969 ret = iter; 2970 break; 2971 } 2972 } 2973 2974 return ret; 2975 } 2976 2977 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 2978 { 2979 struct ocfs2_dlm_seq_priv *priv = m->private; 2980 struct ocfs2_lock_res *iter; 2981 2982 spin_lock(&ocfs2_dlm_tracking_lock); 2983 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 2984 if (iter) { 2985 /* Since lockres' have the lifetime of their container 2986 * (which can be inodes, ocfs2_supers, etc) we want to 2987 * copy this out to a temporary lockres while still 2988 * under the spinlock. Obviously after this we can't 2989 * trust any pointers on the copy returned, but that's 2990 * ok as the information we want isn't typically held 2991 * in them. */ 2992 priv->p_tmp_res = *iter; 2993 iter = &priv->p_tmp_res; 2994 } 2995 spin_unlock(&ocfs2_dlm_tracking_lock); 2996 2997 return iter; 2998 } 2999 3000 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 3001 { 3002 } 3003 3004 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 3005 { 3006 struct ocfs2_dlm_seq_priv *priv = m->private; 3007 struct ocfs2_lock_res *iter = v; 3008 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 3009 3010 spin_lock(&ocfs2_dlm_tracking_lock); 3011 iter = ocfs2_dlm_next_res(iter, priv); 3012 list_del_init(&dummy->l_debug_list); 3013 if (iter) { 3014 list_add(&dummy->l_debug_list, &iter->l_debug_list); 3015 priv->p_tmp_res = *iter; 3016 iter = &priv->p_tmp_res; 3017 } 3018 spin_unlock(&ocfs2_dlm_tracking_lock); 3019 3020 return iter; 3021 } 3022 3023 /* 3024 * Version is used by debugfs.ocfs2 to determine the format being used 3025 * 3026 * New in version 2 3027 * - Lock stats printed 3028 * New in version 3 3029 * - Max time in lock stats is in usecs (instead of nsecs) 3030 */ 3031 #define OCFS2_DLM_DEBUG_STR_VERSION 3 3032 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 3033 { 3034 int i; 3035 char *lvb; 3036 struct ocfs2_lock_res *lockres = v; 3037 3038 if (!lockres) 3039 return -EINVAL; 3040 3041 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 3042 3043 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 3044 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 3045 lockres->l_name, 3046 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 3047 else 3048 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 3049 3050 seq_printf(m, "%d\t" 3051 "0x%lx\t" 3052 "0x%x\t" 3053 "0x%x\t" 3054 "%u\t" 3055 "%u\t" 3056 "%d\t" 3057 "%d\t", 3058 lockres->l_level, 3059 lockres->l_flags, 3060 lockres->l_action, 3061 lockres->l_unlock_action, 3062 lockres->l_ro_holders, 3063 lockres->l_ex_holders, 3064 lockres->l_requested, 3065 lockres->l_blocking); 3066 3067 /* Dump the raw LVB */ 3068 lvb = 
ocfs2_dlm_lvb(&lockres->l_lksb); 3069 for(i = 0; i < DLM_LVB_LEN; i++) 3070 seq_printf(m, "0x%x\t", lvb[i]); 3071 3072 #ifdef CONFIG_OCFS2_FS_STATS 3073 # define lock_num_prmode(_l) ((_l)->l_lock_prmode.ls_gets) 3074 # define lock_num_exmode(_l) ((_l)->l_lock_exmode.ls_gets) 3075 # define lock_num_prmode_failed(_l) ((_l)->l_lock_prmode.ls_fail) 3076 # define lock_num_exmode_failed(_l) ((_l)->l_lock_exmode.ls_fail) 3077 # define lock_total_prmode(_l) ((_l)->l_lock_prmode.ls_total) 3078 # define lock_total_exmode(_l) ((_l)->l_lock_exmode.ls_total) 3079 # define lock_max_prmode(_l) ((_l)->l_lock_prmode.ls_max) 3080 # define lock_max_exmode(_l) ((_l)->l_lock_exmode.ls_max) 3081 # define lock_refresh(_l) ((_l)->l_lock_refresh) 3082 #else 3083 # define lock_num_prmode(_l) (0) 3084 # define lock_num_exmode(_l) (0) 3085 # define lock_num_prmode_failed(_l) (0) 3086 # define lock_num_exmode_failed(_l) (0) 3087 # define lock_total_prmode(_l) (0ULL) 3088 # define lock_total_exmode(_l) (0ULL) 3089 # define lock_max_prmode(_l) (0) 3090 # define lock_max_exmode(_l) (0) 3091 # define lock_refresh(_l) (0) 3092 #endif 3093 /* The following seq_print was added in version 2 of this output */ 3094 seq_printf(m, "%u\t" 3095 "%u\t" 3096 "%u\t" 3097 "%u\t" 3098 "%llu\t" 3099 "%llu\t" 3100 "%u\t" 3101 "%u\t" 3102 "%u\t", 3103 lock_num_prmode(lockres), 3104 lock_num_exmode(lockres), 3105 lock_num_prmode_failed(lockres), 3106 lock_num_exmode_failed(lockres), 3107 lock_total_prmode(lockres), 3108 lock_total_exmode(lockres), 3109 lock_max_prmode(lockres), 3110 lock_max_exmode(lockres), 3111 lock_refresh(lockres)); 3112 3113 /* End the line */ 3114 seq_printf(m, "\n"); 3115 return 0; 3116 } 3117 3118 static const struct seq_operations ocfs2_dlm_seq_ops = { 3119 .start = ocfs2_dlm_seq_start, 3120 .stop = ocfs2_dlm_seq_stop, 3121 .next = ocfs2_dlm_seq_next, 3122 .show = ocfs2_dlm_seq_show, 3123 }; 3124 3125 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 3126 { 3127 struct seq_file *seq = file->private_data; 3128 struct ocfs2_dlm_seq_priv *priv = seq->private; 3129 struct ocfs2_lock_res *res = &priv->p_iter_res; 3130 3131 ocfs2_remove_lockres_tracking(res); 3132 ocfs2_put_dlm_debug(priv->p_dlm_debug); 3133 return seq_release_private(inode, file); 3134 } 3135 3136 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 3137 { 3138 struct ocfs2_dlm_seq_priv *priv; 3139 struct ocfs2_super *osb; 3140 3141 priv = __seq_open_private(file, &ocfs2_dlm_seq_ops, sizeof(*priv)); 3142 if (!priv) { 3143 mlog_errno(-ENOMEM); 3144 return -ENOMEM; 3145 } 3146 3147 osb = inode->i_private; 3148 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 3149 priv->p_dlm_debug = osb->osb_dlm_debug; 3150 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 3151 3152 ocfs2_add_lockres_tracking(&priv->p_iter_res, 3153 priv->p_dlm_debug); 3154 3155 return 0; 3156 } 3157 3158 static const struct file_operations ocfs2_dlm_debug_fops = { 3159 .open = ocfs2_dlm_debug_open, 3160 .release = ocfs2_dlm_debug_release, 3161 .read = seq_read, 3162 .llseek = seq_lseek, 3163 }; 3164 3165 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 3166 { 3167 int ret = 0; 3168 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 3169 3170 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 3171 S_IFREG|S_IRUSR, 3172 osb->osb_debug_root, 3173 osb, 3174 &ocfs2_dlm_debug_fops); 3175 if (!dlm_debug->d_locking_state) { 3176 ret = -EINVAL; 3177 mlog(ML_ERROR, 3178 "Unable to create locking state debugfs file.\n"); 3179 goto out; 
3180 } 3181 3182 ocfs2_get_dlm_debug(dlm_debug); 3183 out: 3184 return ret; 3185 } 3186 3187 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 3188 { 3189 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 3190 3191 if (dlm_debug) { 3192 debugfs_remove(dlm_debug->d_locking_state); 3193 ocfs2_put_dlm_debug(dlm_debug); 3194 } 3195 } 3196 3197 int ocfs2_dlm_init(struct ocfs2_super *osb) 3198 { 3199 int status = 0; 3200 struct ocfs2_cluster_connection *conn = NULL; 3201 3202 if (ocfs2_mount_local(osb)) { 3203 osb->node_num = 0; 3204 goto local; 3205 } 3206 3207 status = ocfs2_dlm_init_debug(osb); 3208 if (status < 0) { 3209 mlog_errno(status); 3210 goto bail; 3211 } 3212 3213 /* launch downconvert thread */ 3214 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc-%s", 3215 osb->uuid_str); 3216 if (IS_ERR(osb->dc_task)) { 3217 status = PTR_ERR(osb->dc_task); 3218 osb->dc_task = NULL; 3219 mlog_errno(status); 3220 goto bail; 3221 } 3222 3223 /* for now, uuid == domain */ 3224 status = ocfs2_cluster_connect(osb->osb_cluster_stack, 3225 osb->osb_cluster_name, 3226 strlen(osb->osb_cluster_name), 3227 osb->uuid_str, 3228 strlen(osb->uuid_str), 3229 &lproto, ocfs2_do_node_down, osb, 3230 &conn); 3231 if (status) { 3232 mlog_errno(status); 3233 goto bail; 3234 } 3235 3236 status = ocfs2_cluster_this_node(conn, &osb->node_num); 3237 if (status < 0) { 3238 mlog_errno(status); 3239 mlog(ML_ERROR, 3240 "could not find this host's node number\n"); 3241 ocfs2_cluster_disconnect(conn, 0); 3242 goto bail; 3243 } 3244 3245 local: 3246 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 3247 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 3248 ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb); 3249 ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb); 3250 3251 osb->cconn = conn; 3252 bail: 3253 if (status < 0) { 3254 ocfs2_dlm_shutdown_debug(osb); 3255 if (osb->dc_task) 3256 kthread_stop(osb->dc_task); 3257 } 3258 3259 return status; 3260 } 3261 3262 void ocfs2_dlm_shutdown(struct ocfs2_super *osb, 3263 int hangup_pending) 3264 { 3265 ocfs2_drop_osb_locks(osb); 3266 3267 /* 3268 * Now that we have dropped all locks and ocfs2_dismount_volume() 3269 * has disabled recovery, the DLM won't be talking to us. It's 3270 * safe to tear things down before disconnecting the cluster. 3271 */ 3272 3273 if (osb->dc_task) { 3274 kthread_stop(osb->dc_task); 3275 osb->dc_task = NULL; 3276 } 3277 3278 ocfs2_lock_res_free(&osb->osb_super_lockres); 3279 ocfs2_lock_res_free(&osb->osb_rename_lockres); 3280 ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres); 3281 ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres); 3282 3283 ocfs2_cluster_disconnect(osb->cconn, hangup_pending); 3284 osb->cconn = NULL; 3285 3286 ocfs2_dlm_shutdown_debug(osb); 3287 } 3288 3289 static int ocfs2_drop_lock(struct ocfs2_super *osb, 3290 struct ocfs2_lock_res *lockres) 3291 { 3292 int ret; 3293 unsigned long flags; 3294 u32 lkm_flags = 0; 3295 3296 /* We didn't get anywhere near actually using this lockres. 
*/ 3297 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 3298 goto out; 3299 3300 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 3301 lkm_flags |= DLM_LKF_VALBLK; 3302 3303 spin_lock_irqsave(&lockres->l_lock, flags); 3304 3305 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 3306 "lockres %s, flags 0x%lx\n", 3307 lockres->l_name, lockres->l_flags); 3308 3309 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 3310 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 3311 "%u, unlock_action = %u\n", 3312 lockres->l_name, lockres->l_flags, lockres->l_action, 3313 lockres->l_unlock_action); 3314 3315 spin_unlock_irqrestore(&lockres->l_lock, flags); 3316 3317 /* XXX: Today we just wait on any busy 3318 * locks... Perhaps we need to cancel converts in the 3319 * future? */ 3320 ocfs2_wait_on_busy_lock(lockres); 3321 3322 spin_lock_irqsave(&lockres->l_lock, flags); 3323 } 3324 3325 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3326 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 3327 lockres->l_level == DLM_LOCK_EX && 3328 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3329 lockres->l_ops->set_lvb(lockres); 3330 } 3331 3332 if (lockres->l_flags & OCFS2_LOCK_BUSY) 3333 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 3334 lockres->l_name); 3335 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 3336 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 3337 3338 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 3339 spin_unlock_irqrestore(&lockres->l_lock, flags); 3340 goto out; 3341 } 3342 3343 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 3344 3345 /* make sure we never get here while waiting for an ast to 3346 * fire. */ 3347 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 3348 3349 /* is this necessary? */ 3350 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3351 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 3352 spin_unlock_irqrestore(&lockres->l_lock, flags); 3353 3354 mlog(0, "lock %s\n", lockres->l_name); 3355 3356 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags); 3357 if (ret) { 3358 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3359 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 3360 ocfs2_dlm_dump_lksb(&lockres->l_lksb); 3361 BUG(); 3362 } 3363 mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n", 3364 lockres->l_name); 3365 3366 ocfs2_wait_on_busy_lock(lockres); 3367 out: 3368 return 0; 3369 } 3370 3371 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3372 struct ocfs2_lock_res *lockres); 3373 3374 /* Mark the lockres as being dropped. It will no longer be 3375 * queued if blocking, but we still may have to wait on it 3376 * being dequeued from the downconvert thread before we can consider 3377 * it safe to drop. 3378 * 3379 * You can *not* attempt to call cluster_lock on this lockres anymore. */ 3380 void ocfs2_mark_lockres_freeing(struct ocfs2_super *osb, 3381 struct ocfs2_lock_res *lockres) 3382 { 3383 int status; 3384 struct ocfs2_mask_waiter mw; 3385 unsigned long flags, flags2; 3386 3387 ocfs2_init_mask_waiter(&mw); 3388 3389 spin_lock_irqsave(&lockres->l_lock, flags); 3390 lockres->l_flags |= OCFS2_LOCK_FREEING; 3391 if (lockres->l_flags & OCFS2_LOCK_QUEUED && current == osb->dc_task) { 3392 /* 3393 * We know the downconvert is queued but not in progress 3394 * because we are the downconvert thread and processing 3395 * a different lock. So we can just remove the lock from the 3396 * queue.
This is not only an optimization but also a way 3397 * to avoid the following deadlock: 3398 * ocfs2_dentry_post_unlock() 3399 * ocfs2_dentry_lock_put() 3400 * ocfs2_drop_dentry_lock() 3401 * iput() 3402 * ocfs2_evict_inode() 3403 * ocfs2_clear_inode() 3404 * ocfs2_mark_lockres_freeing() 3405 * ... blocks waiting for OCFS2_LOCK_QUEUED 3406 * since we are the downconvert thread which 3407 * should clear the flag. 3408 */ 3409 spin_unlock_irqrestore(&lockres->l_lock, flags); 3410 spin_lock_irqsave(&osb->dc_task_lock, flags2); 3411 list_del_init(&lockres->l_blocked_list); 3412 osb->blocked_lock_count--; 3413 spin_unlock_irqrestore(&osb->dc_task_lock, flags2); 3414 /* 3415 * Warn if we recurse into another post_unlock call. Strictly 3416 * speaking it isn't a problem but we need to be careful if 3417 * that happens (stack overflow, deadlocks, ...) so warn if 3418 * ocfs2 grows a path for which this can happen. 3419 */ 3420 WARN_ON_ONCE(lockres->l_ops->post_unlock); 3421 /* Since the lock is freeing we don't do much in the fn below */ 3422 ocfs2_process_blocked_lock(osb, lockres); 3423 return; 3424 } 3425 while (lockres->l_flags & OCFS2_LOCK_QUEUED) { 3426 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); 3427 spin_unlock_irqrestore(&lockres->l_lock, flags); 3428 3429 mlog(0, "Waiting on lockres %s\n", lockres->l_name); 3430 3431 status = ocfs2_wait_for_mask(&mw); 3432 if (status) 3433 mlog_errno(status); 3434 3435 spin_lock_irqsave(&lockres->l_lock, flags); 3436 } 3437 spin_unlock_irqrestore(&lockres->l_lock, flags); 3438 } 3439 3440 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, 3441 struct ocfs2_lock_res *lockres) 3442 { 3443 int ret; 3444 3445 ocfs2_mark_lockres_freeing(osb, lockres); 3446 ret = ocfs2_drop_lock(osb, lockres); 3447 if (ret) 3448 mlog_errno(ret); 3449 } 3450 3451 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 3452 { 3453 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 3454 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 3455 ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres); 3456 ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres); 3457 } 3458 3459 int ocfs2_drop_inode_locks(struct inode *inode) 3460 { 3461 int status, err; 3462 3463 /* No need to call ocfs2_mark_lockres_freeing here - 3464 * ocfs2_clear_inode has done it for us. 
*/ 3465 3466 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3467 &OCFS2_I(inode)->ip_open_lockres); 3468 if (err < 0) 3469 mlog_errno(err); 3470 3471 status = err; 3472 3473 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3474 &OCFS2_I(inode)->ip_inode_lockres); 3475 if (err < 0) 3476 mlog_errno(err); 3477 if (err < 0 && !status) 3478 status = err; 3479 3480 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 3481 &OCFS2_I(inode)->ip_rw_lockres); 3482 if (err < 0) 3483 mlog_errno(err); 3484 if (err < 0 && !status) 3485 status = err; 3486 3487 return status; 3488 } 3489 3490 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 3491 int new_level) 3492 { 3493 assert_spin_locked(&lockres->l_lock); 3494 3495 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 3496 3497 if (lockres->l_level <= new_level) { 3498 mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, " 3499 "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, " 3500 "block %d, pgen %d\n", lockres->l_name, lockres->l_level, 3501 new_level, list_empty(&lockres->l_blocked_list), 3502 list_empty(&lockres->l_mask_waiters), lockres->l_type, 3503 lockres->l_flags, lockres->l_ro_holders, 3504 lockres->l_ex_holders, lockres->l_action, 3505 lockres->l_unlock_action, lockres->l_requested, 3506 lockres->l_blocking, lockres->l_pending_gen); 3507 BUG(); 3508 } 3509 3510 mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n", 3511 lockres->l_name, lockres->l_level, new_level, lockres->l_blocking); 3512 3513 lockres->l_action = OCFS2_AST_DOWNCONVERT; 3514 lockres->l_requested = new_level; 3515 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 3516 return lockres_set_pending(lockres); 3517 } 3518 3519 static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 3520 struct ocfs2_lock_res *lockres, 3521 int new_level, 3522 int lvb, 3523 unsigned int generation) 3524 { 3525 int ret; 3526 u32 dlm_flags = DLM_LKF_CONVERT; 3527 3528 mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name, 3529 lockres->l_level, new_level); 3530 3531 /* 3532 * On DLM_LKF_VALBLK, fsdlm behaves differently with o2cb. It always 3533 * expects DLM_LKF_VALBLK being set if the LKB has LVB, so that 3534 * we can recover correctly from node failure. Otherwise, we may get 3535 * invalid LVB in LKB, but without DLM_SBF_VALNOTVALID being set. 3536 */ 3537 if (!ocfs2_is_o2cb_active() && 3538 lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 3539 lvb = 1; 3540 3541 if (lvb) 3542 dlm_flags |= DLM_LKF_VALBLK; 3543 3544 ret = ocfs2_dlm_lock(osb->cconn, 3545 new_level, 3546 &lockres->l_lksb, 3547 dlm_flags, 3548 lockres->l_name, 3549 OCFS2_LOCK_ID_MAX_LEN - 1); 3550 lockres_clear_pending(lockres, generation, osb); 3551 if (ret) { 3552 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 3553 ocfs2_recover_from_dlm_error(lockres, 1); 3554 goto bail; 3555 } 3556 3557 ret = 0; 3558 bail: 3559 return ret; 3560 } 3561 3562 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */ 3563 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 3564 struct ocfs2_lock_res *lockres) 3565 { 3566 assert_spin_locked(&lockres->l_lock); 3567 3568 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 3569 /* If we're already trying to cancel a lock conversion 3570 * then just drop the spinlock and allow the caller to 3571 * requeue this lock. */ 3572 mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name); 3573 return 0; 3574 } 3575 3576 /* were we in a convert when we got the bast fire? 
*/ 3577 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT && 3578 lockres->l_action != OCFS2_AST_DOWNCONVERT); 3579 /* set things up for the unlockast to know to just 3580 * clear out the ast_action and unset busy, etc. */ 3581 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT; 3582 3583 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY), 3584 "lock %s, invalid flags: 0x%lx\n", 3585 lockres->l_name, lockres->l_flags); 3586 3587 mlog(ML_BASTS, "lockres %s\n", lockres->l_name); 3588 3589 return 1; 3590 } 3591 3592 static int ocfs2_cancel_convert(struct ocfs2_super *osb, 3593 struct ocfs2_lock_res *lockres) 3594 { 3595 int ret; 3596 3597 ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, 3598 DLM_LKF_CANCEL); 3599 if (ret) { 3600 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 3601 ocfs2_recover_from_dlm_error(lockres, 0); 3602 } 3603 3604 mlog(ML_BASTS, "lockres %s\n", lockres->l_name); 3605 3606 return ret; 3607 } 3608 3609 static int ocfs2_unblock_lock(struct ocfs2_super *osb, 3610 struct ocfs2_lock_res *lockres, 3611 struct ocfs2_unblock_ctl *ctl) 3612 { 3613 unsigned long flags; 3614 int blocking; 3615 int new_level; 3616 int level; 3617 int ret = 0; 3618 int set_lvb = 0; 3619 unsigned int gen; 3620 3621 spin_lock_irqsave(&lockres->l_lock, flags); 3622 3623 recheck: 3624 /* 3625 * Is it still blocking? If not, we have no more work to do. 3626 */ 3627 if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) { 3628 BUG_ON(lockres->l_blocking != DLM_LOCK_NL); 3629 spin_unlock_irqrestore(&lockres->l_lock, flags); 3630 ret = 0; 3631 goto leave; 3632 } 3633 3634 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 3635 /* XXX 3636 * This is a *big* race. The OCFS2_LOCK_PENDING flag 3637 * exists entirely for one reason - another thread has set 3638 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock(). 3639 * 3640 * If we do ocfs2_cancel_convert() before the other thread 3641 * calls dlm_lock(), our cancel will do nothing. We will 3642 * get no ast, and we will have no way of knowing the 3643 * cancel failed. Meanwhile, the other thread will call 3644 * into dlm_lock() and wait...forever. 3645 * 3646 * Why forever? Because another node has asked for the 3647 * lock first; that's why we're here in unblock_lock(). 3648 * 3649 * The solution is OCFS2_LOCK_PENDING. When PENDING is 3650 * set, we just requeue the unblock. Only when the other 3651 * thread has called dlm_lock() and cleared PENDING will 3652 * we then cancel their request. 3653 * 3654 * All callers of dlm_lock() must set OCFS2_LOCK_PENDING 3655 * at the same time they set OCFS2_LOCK_BUSY. They must 3656 * clear OCFS2_LOCK_PENDING after dlm_lock() returns. 3657 */ 3658 if (lockres->l_flags & OCFS2_LOCK_PENDING) { 3659 mlog(ML_BASTS, "lockres %s, ReQ: Pending\n", 3660 lockres->l_name); 3661 goto leave_requeue; 3662 } 3663 3664 ctl->requeue = 1; 3665 ret = ocfs2_prepare_cancel_convert(osb, lockres); 3666 spin_unlock_irqrestore(&lockres->l_lock, flags); 3667 if (ret) { 3668 ret = ocfs2_cancel_convert(osb, lockres); 3669 if (ret < 0) 3670 mlog_errno(ret); 3671 } 3672 goto leave; 3673 } 3674 3675 /* 3676 * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is 3677 * set when the ast is received for an upconvert just before the 3678 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast 3679 * on the heels of the ast, we want to delay the downconvert just 3680 * enough to allow the up requestor to do its task.
3609 static int ocfs2_unblock_lock(struct ocfs2_super *osb,
3610			       struct ocfs2_lock_res *lockres,
3611			       struct ocfs2_unblock_ctl *ctl)
3612 {
3613	unsigned long flags;
3614	int blocking;
3615	int new_level;
3616	int level;
3617	int ret = 0;
3618	int set_lvb = 0;
3619	unsigned int gen;
3620
3621	spin_lock_irqsave(&lockres->l_lock, flags);
3622
3623 recheck:
3624	/*
3625	 * Is it still blocking? If not, we have no more work to do.
3626	 */
3627	if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
3628		BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
3629		spin_unlock_irqrestore(&lockres->l_lock, flags);
3630		ret = 0;
3631		goto leave;
3632	}
3633
3634	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
3635		/* XXX
3636		 * This is a *big* race.  The OCFS2_LOCK_PENDING flag
3637		 * exists entirely for one reason - another thread has set
3638		 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
3639		 *
3640		 * If we do ocfs2_cancel_convert() before the other thread
3641		 * calls dlm_lock(), our cancel will do nothing.  We will
3642		 * get no ast, and we will have no way of knowing the
3643		 * cancel failed.  Meanwhile, the other thread will call
3644		 * into dlm_lock() and wait...forever.
3645		 *
3646		 * Why forever?  Because another node has asked for the
3647		 * lock first; that's why we're here in unblock_lock().
3648		 *
3649		 * The solution is OCFS2_LOCK_PENDING.  When PENDING is
3650		 * set, we just requeue the unblock.  Only when the other
3651		 * thread has called dlm_lock() and cleared PENDING will
3652		 * we then cancel their request.
3653		 *
3654		 * All callers of dlm_lock() must set OCFS2_LOCK_PENDING
3655		 * at the same time they set OCFS2_LOCK_BUSY.  They must
3656		 * clear OCFS2_LOCK_PENDING after dlm_lock() returns.
3657		 */
3658		if (lockres->l_flags & OCFS2_LOCK_PENDING) {
3659			mlog(ML_BASTS, "lockres %s, ReQ: Pending\n",
3660			     lockres->l_name);
3661			goto leave_requeue;
3662		}
3663
3664		ctl->requeue = 1;
3665		ret = ocfs2_prepare_cancel_convert(osb, lockres);
3666		spin_unlock_irqrestore(&lockres->l_lock, flags);
3667		if (ret) {
3668			ret = ocfs2_cancel_convert(osb, lockres);
3669			if (ret < 0)
3670				mlog_errno(ret);
3671		}
3672		goto leave;
3673	}
3674
3675	/*
3676	 * This prevents livelocks. The OCFS2_LOCK_UPCONVERT_FINISHING flag
3677	 * is set when the ast is received for an upconvert just before the
3678	 * OCFS2_LOCK_BUSY flag is cleared. If the fs then receives a bast
3679	 * on the heels of that ast, we want to delay the downconvert just
3680	 * long enough for the upconvert requestor to do its task. Because
3681	 * this lock is in the blocked queue, it will be downconverted
3682	 * as soon as the requestor is done with it.
3683	 */
3684	if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING)
3685		goto leave_requeue;
3686
3687	/*
3688	 * How can we block and yet be at NL?  We were trying to upconvert
3689	 * from NL and got canceled.  The code comes back here, and now
3690	 * we notice and clear BLOCKING.
3691	 */
3692	if (lockres->l_level == DLM_LOCK_NL) {
3693		BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders);
3694		mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name);
3695		lockres->l_blocking = DLM_LOCK_NL;
3696		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
3697		spin_unlock_irqrestore(&lockres->l_lock, flags);
3698		goto leave;
3699	}
3700
3701	/* if we're blocking an exclusive and we have *any* holders,
3702	 * then requeue. */
3703	if ((lockres->l_blocking == DLM_LOCK_EX)
3704	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
3705		mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n",
3706		     lockres->l_name, lockres->l_ex_holders,
3707		     lockres->l_ro_holders);
3708		goto leave_requeue;
3709	}
3710
3711	/* If it's a PR we're blocking, then only
3712	 * requeue if we've got any EX holders */
3713	if (lockres->l_blocking == DLM_LOCK_PR &&
3714	    lockres->l_ex_holders) {
3715		mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n",
3716		     lockres->l_name, lockres->l_ex_holders);
3717		goto leave_requeue;
3718	}
3719
3720	/*
3721	 * Can we get a lock in this state if the holder counts are
3722	 * zero? The metadata unblock code used to check this.
3723	 */
3724	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
3725	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) {
3726		mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n",
3727		     lockres->l_name);
3728		goto leave_requeue;
3729	}
3730
3731	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
3732
3733	if (lockres->l_ops->check_downconvert
3734	    && !lockres->l_ops->check_downconvert(lockres, new_level)) {
3735		mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n",
3736		     lockres->l_name);
3737		goto leave_requeue;
3738	}
3739
3740	/* If we get here, then we know that there are no more
3741	 * incompatible holders (and anyone asking for an incompatible
3742	 * lock is blocked). We can now downconvert the lock */
3743	if (!lockres->l_ops->downconvert_worker)
3744		goto downconvert;
3745
3746	/* Some lockres types want to do a bit of work before
3747	 * downconverting a lock. Allow that here. The worker function
3748	 * may sleep, so we save off a copy of what we're blocking as
3749	 * it may change while we're not holding the spin lock. */
3750	blocking = lockres->l_blocking;
3751	level = lockres->l_level;
3752	spin_unlock_irqrestore(&lockres->l_lock, flags);
3753
3754	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
3755
3756	if (ctl->unblock_action == UNBLOCK_STOP_POST) {
3757		mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n",
3758		     lockres->l_name);
3759		goto leave;
3760	}
3761
3762	spin_lock_irqsave(&lockres->l_lock, flags);
3763	if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) {
3764		/* If this changed underneath us, then we can't drop
3765		 * it just yet. */
3766		mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, "
3767		     "Recheck\n", lockres->l_name, blocking,
3768		     lockres->l_blocking, level, lockres->l_level);
3769		goto recheck;
3770	}
3771
3772 downconvert:
3773	ctl->requeue = 0;
3774
3775	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3776		if (lockres->l_level == DLM_LOCK_EX)
3777			set_lvb = 1;
3778
3779		/*
3780		 * We only set the lvb if the lock has been fully
3781		 * refreshed - otherwise we risk writing stale data.
3782		 * If it hasn't been refreshed, there's no need to clear
3783		 * out the lvb here as its value is still valid.
3784		 */
3785		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3786			lockres->l_ops->set_lvb(lockres);
3787	}
3788
3789	gen = ocfs2_prepare_downconvert(lockres, new_level);
3790	spin_unlock_irqrestore(&lockres->l_lock, flags);
3791	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
3792				     gen);
3793
3794 leave:
3795	if (ret)
3796		mlog_errno(ret);
3797	return ret;
3798
3799 leave_requeue:
3800	spin_unlock_irqrestore(&lockres->l_lock, flags);
3801	ctl->requeue = 1;
3802
3803	return 0;
3804 }
3805
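/*
 * Illustrative sketch only (not built; the example_* name is
 * hypothetical): the minimal shape of a ->downconvert_worker()
 * implementation as consumed by ocfs2_unblock_lock() above. Workers run
 * without the lockres spinlock held and may sleep; the
 * ocfs2_unblock_action they return decides whether the downconvert
 * proceeds and whether ->post_unlock() fires afterwards.
 */
#if 0
static int example_convert_worker(struct ocfs2_lock_res *lockres,
				  int blocking)
{
	/*
	 * 'blocking' is the level the other node wants, so a PR request
	 * may need less flushing than an EX one. Push back whatever this
	 * lock protects before letting the level drop.
	 */
	if (blocking == DLM_LOCK_EX) {
		/* write back and invalidate cached state here */
	}

	return UNBLOCK_CONTINUE;
}
#endif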
3806 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3807				      int blocking)
3808 {
3809	struct inode *inode;
3810	struct address_space *mapping;
3811	struct ocfs2_inode_info *oi;
3812
3813	inode = ocfs2_lock_res_inode(lockres);
3814	mapping = inode->i_mapping;
3815
3816	if (S_ISDIR(inode->i_mode)) {
3817		oi = OCFS2_I(inode);
3818		oi->ip_dir_lock_gen++;
3819		mlog(0, "generation: %u\n", oi->ip_dir_lock_gen);
3820		goto out;
3821	}
3822
3823	if (!S_ISREG(inode->i_mode))
3824		goto out;
3825
3826	/*
3827	 * We need this before the filemap_fdatawrite() so that it can
3828	 * transfer the dirty bit from the PTE to the
3829	 * page. Unfortunately this means that even for EX->PR
3830	 * downconverts, we'll lose our mappings and have to build
3831	 * them up again.
3832	 */
3833	unmap_mapping_range(mapping, 0, 0, 0);
3834
3835	if (filemap_fdatawrite(mapping)) {
3836		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
3837		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
3838	}
3839	sync_mapping_buffers(mapping);
3840	if (blocking == DLM_LOCK_EX) {
3841		truncate_inode_pages(mapping, 0);
3842	} else {
3843		/* We only need to wait on the I/O if we're not also
3844		 * truncating pages because truncate_inode_pages waits
3845		 * for us above. We don't truncate pages if we're
3846		 * blocking anything < EXMODE because we want to keep
3847		 * them around in that case. */
3848		filemap_fdatawait(mapping);
3849	}
3850
3851	forget_all_cached_acls(inode);
3852
3853 out:
3854	return UNBLOCK_CONTINUE;
3855 }
3856
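/*
 * Illustrative sketch only (not built; the example_* name is
 * hypothetical): the flush ordering used by the data worker above,
 * isolated. The unmap must come first so dirty PTE state reaches the
 * page cache before writeback starts, and the wait (or the truncate,
 * which waits internally) must come last so no I/O is still in flight
 * once the lock level drops.
 */
#if 0
static int example_flush_for_downconvert(struct address_space *mapping,
					 int blocking)
{
	int ret;

	unmap_mapping_range(mapping, 0, 0, 0);	/* PTE dirty -> page dirty */
	ret = filemap_fdatawrite(mapping);	/* start writeback */

	if (blocking == DLM_LOCK_EX)
		truncate_inode_pages(mapping, 0);	/* drop and wait */
	else
		filemap_fdatawait(mapping);		/* just wait */

	return ret;
}
#endif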
3857 static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci,
3858				  struct ocfs2_lock_res *lockres,
3859				  int new_level)
3860 {
3861	int checkpointed = ocfs2_ci_fully_checkpointed(ci);
3862
3863	BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
3864	BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
3865
3866	if (checkpointed)
3867		return 1;
3868
3869	ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci)));
3870	return 0;
3871 }
3872
3873 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
3874					 int new_level)
3875 {
3876	struct inode *inode = ocfs2_lock_res_inode(lockres);
3877
3878	return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level);
3879 }
3880
3881 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
3882 {
3883	struct inode *inode = ocfs2_lock_res_inode(lockres);
3884
3885	__ocfs2_stuff_meta_lvb(inode);
3886 }
3887
3888 /*
3889  * Does the final reference drop on our dentry lock. Right now this
3890  * happens in the downconvert thread, but we could choose to simplify the
3891  * dlmglue API and push these off to the ocfs2_wq in the future.
3892  */
3893 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
3894				      struct ocfs2_lock_res *lockres)
3895 {
3896	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3897	ocfs2_dentry_lock_put(osb, dl);
3898 }
3899
3900 /*
3901  * d_delete() matching dentries before the lock downconvert.
3902  *
3903  * At this point, any process waiting to destroy the
3904  * dentry_lock due to last ref count is stopped by the
3905  * OCFS2_LOCK_QUEUED flag.
3906  *
3907  * We have two potential problems:
3908  *
3909  * 1) If we do the last reference drop on our dentry_lock (via dput)
3910  *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
3911  *    the downconvert to finish. Instead we take an elevated
3912  *    reference and push the drop until after we've completed our
3913  *    unblock processing.
3914  *
3915  * 2) There might be another process with a final reference,
3916  *    waiting on us to finish processing. If this is the case, we
3917  *    detect it and exit out - there are no dentries left anyway.
3918  */
3919 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
3920					int blocking)
3921 {
3922	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3923	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
3924	struct dentry *dentry;
3925	unsigned long flags;
3926	int extra_ref = 0;
3927
3928	/*
3929	 * This node is blocking another node from getting a read
3930	 * lock. This happens when we've renamed within a
3931	 * directory. We've forced the other nodes to d_delete(), but
3932	 * we never actually dropped our lock because it's still
3933	 * valid. The downconvert code will retain a PR for this node,
3934	 * so there's no further work to do.
3935	 */
3936	if (blocking == DLM_LOCK_PR)
3937		return UNBLOCK_CONTINUE;
3938
3939	/*
3940	 * Mark this inode as potentially orphaned. The code in
3941	 * ocfs2_delete_inode() will figure out whether it actually
3942	 * needs to be freed or not.
3943	 */
3944	spin_lock(&oi->ip_lock);
3945	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
3946	spin_unlock(&oi->ip_lock);
3947
3948	/*
3949	 * Yuck. We need to make sure, however, that the check of
3950	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
3951	 * respect to a reference decrement or the setting of that
3952	 * flag.
3953	 */
3954	spin_lock_irqsave(&lockres->l_lock, flags);
3955	spin_lock(&dentry_attach_lock);
3956	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
3957	    && dl->dl_count) {
3958		dl->dl_count++;
3959		extra_ref = 1;
3960	}
3961	spin_unlock(&dentry_attach_lock);
3962	spin_unlock_irqrestore(&lockres->l_lock, flags);
3963
3964	mlog(0, "extra_ref = %d\n", extra_ref);
3965
3966	/*
3967	 * We have a process waiting on us in ocfs2_dentry_iput(),
3968	 * which means we can't have any more outstanding
3969	 * aliases. There's no need to do any more work.
3970	 */
3971	if (!extra_ref)
3972		return UNBLOCK_CONTINUE;
3973
3974	spin_lock(&dentry_attach_lock);
3975	while (1) {
3976		dentry = ocfs2_find_local_alias(dl->dl_inode,
3977						dl->dl_parent_blkno, 1);
3978		if (!dentry)
3979			break;
3980		spin_unlock(&dentry_attach_lock);
3981
3982		if (S_ISDIR(dl->dl_inode->i_mode))
3983			shrink_dcache_parent(dentry);
3984
3985		mlog(0, "d_delete(%pd);\n", dentry);
3986
3987		/*
3988		 * The following dcache calls may do an
3989		 * iput(). Normally we don't want that from the
3990		 * downconverting thread, but in this case it's ok
3991		 * because the requesting node already has an
3992		 * exclusive lock on the inode, so it can't be queued
3993		 * for a downconvert.
3994		 */
3995		d_delete(dentry);
3996		dput(dentry);
3997
3998		spin_lock(&dentry_attach_lock);
3999	}
4000	spin_unlock(&dentry_attach_lock);
4001
4002	/*
4003	 * If we are the last holder of this dentry lock, there is no
4004	 * reason to downconvert so skip straight to the unlock.
4005	 */
4006	if (dl->dl_count == 1)
4007		return UNBLOCK_STOP_POST;
4008
4009	return UNBLOCK_CONTINUE_POST;
4010 }
4011
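/*
 * Illustrative sketch only (not built; the example_* name is
 * hypothetical): the "take an extra reference unless the lock is being
 * freed" idiom from the dentry worker above, isolated as a helper. Both
 * locks are held so the flag check and the increment are atomic against
 * a concurrent ocfs2_dentry_lock_put() or the setting of
 * OCFS2_LOCK_FREEING.
 */
#if 0
static int example_grab_dentry_lock_ref(struct ocfs2_lock_res *lockres,
					struct ocfs2_dentry_lock *dl)
{
	unsigned long flags;
	int got_ref = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	spin_lock(&dentry_attach_lock);
	if (!(lockres->l_flags & OCFS2_LOCK_FREEING) && dl->dl_count) {
		dl->dl_count++;
		got_ref = 1;
	}
	spin_unlock(&dentry_attach_lock);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return got_ref;
}
#endif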
4012 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
4013					     int new_level)
4014 {
4015	struct ocfs2_refcount_tree *tree =
4016				ocfs2_lock_res_refcount_tree(lockres);
4017
4018	return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level);
4019 }
4020
4021 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
4022					  int blocking)
4023 {
4024	struct ocfs2_refcount_tree *tree =
4025				ocfs2_lock_res_refcount_tree(lockres);
4026
4027	ocfs2_metadata_cache_purge(&tree->rf_ci);
4028
4029	return UNBLOCK_CONTINUE;
4030 }
4031
4032 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres)
4033 {
4034	struct ocfs2_qinfo_lvb *lvb;
4035	struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres);
4036	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
4037					    oinfo->dqi_gi.dqi_type);
4038
4039	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
4040	lvb->lvb_version = OCFS2_QINFO_LVB_VERSION;
4041	lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace);
4042	lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace);
4043	lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms);
4044	lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks);
4045	lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk);
4046	lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry);
4047 }
4048
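/*
 * Illustrative sketch only (not built; the example_* name is
 * hypothetical): the decode half of the big-endian LVB record built by
 * ocfs2_set_qinfo_lvb() above. Because every field is fixed-width
 * big-endian, a record written on one node reads back identically on
 * any other. ocfs2_refresh_qinfo() below does this for real, guarded by
 * ocfs2_dlm_lvb_valid() and a check of lvb_version.
 */
#if 0
static void example_read_qinfo_lvb(struct ocfs2_lock_res *lockres,
				   struct ocfs2_mem_dqinfo *oinfo)
{
	struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);

	oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms);
	oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks);
	oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk);
	oinfo->dqi_gi.dqi_free_entry = be32_to_cpu(lvb->lvb_free_entry);
}
#endif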
4049 void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex)
4050 {
4051	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
4052	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
4053	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
4054
4055	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
4056		ocfs2_cluster_unlock(osb, lockres, level);
4057 }
4058
4059 static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo)
4060 {
4061	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
4062					    oinfo->dqi_gi.dqi_type);
4063	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
4064	struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
4065	struct buffer_head *bh = NULL;
4066	struct ocfs2_global_disk_dqinfo *gdinfo;
4067	int status = 0;
4068
4069	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
4070	    lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) {
4071		info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace);
4072		info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace);
4073		oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms);
4074		oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks);
4075		oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk);
4076		oinfo->dqi_gi.dqi_free_entry =
4077					be32_to_cpu(lvb->lvb_free_entry);
4078	} else {
4079		status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode,
4080						     oinfo->dqi_giblk, &bh);
4081		if (status) {
4082			mlog_errno(status);
4083			goto bail;
4084		}
4085		gdinfo = (struct ocfs2_global_disk_dqinfo *)
4086					(bh->b_data + OCFS2_GLOBAL_INFO_OFF);
4087		info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace);
4088		info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace);
4089		oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms);
4090		oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks);
4091		oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk);
4092		oinfo->dqi_gi.dqi_free_entry =
4093					le32_to_cpu(gdinfo->dqi_free_entry);
4094		brelse(bh);
4095		ocfs2_track_lock_refresh(lockres);
4096	}
4097
4098 bail:
4099	return status;
4100 }
4101
4102 /* Lock quota info. This function expects at least a shared lock on the
4103  * quota file so that we can safely refresh the quota info from disk. */
4104 int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex)
4105 {
4106	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
4107	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
4108	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
4109	int status = 0;
4110
4111	/* On RO devices, locking really isn't needed... */
4112	if (ocfs2_is_hard_readonly(osb)) {
4113		if (ex)
4114			status = -EROFS;
4115		goto bail;
4116	}
4117	if (ocfs2_mount_local(osb))
4118		goto bail;
4119
4120	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
4121	if (status < 0) {
4122		mlog_errno(status);
4123		goto bail;
4124	}
4125	if (!ocfs2_should_refresh_lock_res(lockres))
4126		goto bail;
4127	/* OK, we have the lock but we need to refresh the quota info */
4128	status = ocfs2_refresh_qinfo(oinfo);
4129	if (status)
4130		ocfs2_qinfo_unlock(oinfo, ex);
4131	ocfs2_complete_lock_res_refresh(lockres, status);
4132 bail:
4133	return status;
4134 }
4135
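/*
 * Illustrative sketch only (not built; the example_* name is
 * hypothetical): the caller-side pattern for the qinfo lock above.
 * ocfs2_qinfo_lock() returns with the cluster lock held (and the
 * in-memory info refreshed, if needed); the matching unlock must pass
 * the same 'ex' value.
 */
#if 0
static int example_with_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo)
{
	int status;

	status = ocfs2_qinfo_lock(oinfo, 0);	/* shared is enough to read */
	if (status < 0)
		return status;

	/* ... read the dqi fields under the lock ... */

	ocfs2_qinfo_unlock(oinfo, 0);
	return 0;
}
#endif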
4136 int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex)
4137 {
4138	int status;
4139	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
4140	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
4141	struct ocfs2_super *osb = lockres->l_priv;
4142
4143
4144	if (ocfs2_is_hard_readonly(osb))
4145		return -EROFS;
4146
4147	if (ocfs2_mount_local(osb))
4148		return 0;
4149
4150	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
4151	if (status < 0)
4152		mlog_errno(status);
4153
4154	return status;
4155 }
4156
4157 void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
4158 {
4159	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
4160	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
4161	struct ocfs2_super *osb = lockres->l_priv;
4162
4163	if (!ocfs2_mount_local(osb))
4164		ocfs2_cluster_unlock(osb, lockres, level);
4165 }
4166
4167 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
4168					struct ocfs2_lock_res *lockres)
4169 {
4170	int status;
4171	struct ocfs2_unblock_ctl ctl = {0, 0,};
4172	unsigned long flags;
4173
4174	/* Our reference to the lockres in this function can be
4175	 * considered valid until we remove the OCFS2_LOCK_QUEUED
4176	 * flag. */
4177
4178	BUG_ON(!lockres);
4179	BUG_ON(!lockres->l_ops);
4180
4181	mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name);
4182
4183	/* Detect whether a lock has been marked as going away while
4184	 * the downconvert thread was processing other things. A lock can
4185	 * still be marked with OCFS2_LOCK_FREEING after this check,
4186	 * but short-circuiting here still saves us some
4187	 * work. */
4188	spin_lock_irqsave(&lockres->l_lock, flags);
4189	if (lockres->l_flags & OCFS2_LOCK_FREEING)
4190		goto unqueue;
4191	spin_unlock_irqrestore(&lockres->l_lock, flags);
4192
4193	status = ocfs2_unblock_lock(osb, lockres, &ctl);
4194	if (status < 0)
4195		mlog_errno(status);
4196
4197	spin_lock_irqsave(&lockres->l_lock, flags);
4198 unqueue:
4199	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
4200		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
4201	} else
4202		ocfs2_schedule_blocked_lock(osb, lockres);
4203
4204	mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name,
4205	     ctl.requeue ? "yes" : "no");
4206	spin_unlock_irqrestore(&lockres->l_lock, flags);
4207
4208	if (ctl.unblock_action != UNBLOCK_CONTINUE
4209	    && lockres->l_ops->post_unlock)
4210		lockres->l_ops->post_unlock(osb, lockres);
4211 }
4212
4213 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
4214					 struct ocfs2_lock_res *lockres)
4215 {
4216	unsigned long flags;
4217
4218	assert_spin_locked(&lockres->l_lock);
4219
4220	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
4221		/* Do not schedule a lock for downconvert when it's on
4222		 * the way to destruction - any nodes wanting access
4223		 * to the resource will get it soon. */
4224		mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n",
4225		     lockres->l_name, lockres->l_flags);
4226		return;
4227	}
4228
4229	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
4230
4231	spin_lock_irqsave(&osb->dc_task_lock, flags);
4232	if (list_empty(&lockres->l_blocked_list)) {
4233		list_add_tail(&lockres->l_blocked_list,
4234			      &osb->blocked_lock_list);
4235		osb->blocked_lock_count++;
4236	}
4237	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4238 }
4239
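/*
 * Illustrative sketch only (not built; the example_* name is
 * hypothetical): how a blocked lock normally reaches the downconvert
 * thread. A blocking AST handler queues the lockres under its spinlock
 * via ocfs2_schedule_blocked_lock() above and then wakes the thread;
 * ocfs2_process_blocked_lock() runs later in thread context.
 */
#if 0
static void example_queue_blocked_lock(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *lockres)
{
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);
	ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ocfs2_wake_downconvert_thread(osb);
}
#endif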
4240 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
4241 {
4242	unsigned long processed;
4243	unsigned long flags;
4244	struct ocfs2_lock_res *lockres;
4245
4246	spin_lock_irqsave(&osb->dc_task_lock, flags);
4247	/* grab this early so we know to try again if a state change and
4248	 * wake happen part-way through our work */
4249	osb->dc_work_sequence = osb->dc_wake_sequence;
4250
4251	processed = osb->blocked_lock_count;
4252	/*
4253	 * blocked lock processing in this loop might call iput which can
4254	 * remove items off osb->blocked_lock_list. Downconvert up to
4255	 * 'processed' number of locks, but stop short if we had some
4256	 * removed in ocfs2_mark_lockres_freeing when downconverting.
4257	 */
4258	while (processed && !list_empty(&osb->blocked_lock_list)) {
4259		lockres = list_entry(osb->blocked_lock_list.next,
4260				     struct ocfs2_lock_res, l_blocked_list);
4261		list_del_init(&lockres->l_blocked_list);
4262		osb->blocked_lock_count--;
4263		spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4264
4265		BUG_ON(!processed);
4266		processed--;
4267
4268		ocfs2_process_blocked_lock(osb, lockres);
4269
4270		spin_lock_irqsave(&osb->dc_task_lock, flags);
4271	}
4272	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4273 }
4274
4275 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
4276 {
4277	int empty = 0;
4278	unsigned long flags;
4279
4280	spin_lock_irqsave(&osb->dc_task_lock, flags);
4281	if (list_empty(&osb->blocked_lock_list))
4282		empty = 1;
4283
4284	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4285	return empty;
4286 }
4287
4288 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
4289 {
4290	int should_wake = 0;
4291	unsigned long flags;
4292
4293	spin_lock_irqsave(&osb->dc_task_lock, flags);
4294	if (osb->dc_work_sequence != osb->dc_wake_sequence)
4295		should_wake = 1;
4296	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4297
4298	return should_wake;
4299 }
4300
4301 static int ocfs2_downconvert_thread(void *arg)
4302 {
4303	int status = 0;
4304	struct ocfs2_super *osb = arg;
4305
4306	/* only quit once we've been asked to stop and there is no more
4307	 * work available */
4308	while (!(kthread_should_stop() &&
4309		ocfs2_downconvert_thread_lists_empty(osb))) {
4310
4311		wait_event_interruptible(osb->dc_event,
4312					 ocfs2_downconvert_thread_should_wake(osb) ||
4313					 kthread_should_stop());
4314
4315		mlog(0, "downconvert_thread: awoken\n");
4316
4317		ocfs2_downconvert_thread_do_work(osb);
4318	}
4319
4320	osb->dc_task = NULL;
4321	return status;
4322 }
4323
4324 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
4325 {
4326	unsigned long flags;
4327
4328	spin_lock_irqsave(&osb->dc_task_lock, flags);
4329	/* make sure the downconvert thread gets a look at whatever changes
4330	 * the caller may have made to the lock state */
4331	osb->dc_wake_sequence++;
4332	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4333	wake_up(&osb->dc_event);
4334 }
4335
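/*
 * A condensed, illustrative restatement (not built; the example_* name
 * is hypothetical) of the thread loop above. The dc_wake_sequence/
 * dc_work_sequence pair is a lost-wakeup guard: the worker snapshots the
 * wake counter before scanning, so any ocfs2_wake_downconvert_thread()
 * call that lands mid-scan leaves the counters unequal,
 * ocfs2_downconvert_thread_should_wake() reports true, and the thread
 * runs another pass instead of sleeping through the update.
 */
#if 0
static void example_downconvert_loop(struct ocfs2_super *osb)
{
	while (!kthread_should_stop()) {
		wait_event_interruptible(osb->dc_event,
				ocfs2_downconvert_thread_should_wake(osb) ||
				kthread_should_stop());
		/* re-snapshots dc_wake_sequence before scanning */
		ocfs2_downconvert_thread_do_work(osb);
	}
}
#endif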