lock.c (0b7877d4eea3f93e3dd941999522bbd8c538cb53) | lock.c (4875647a08e35f77274838d97ca8fa44158d50e2) |
---|---|
1/****************************************************************************** 2******************************************************************************* 3** 4** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved. 5** 6** This copyrighted material is made available to anyone wishing to use, 7** modify, copy, or redistribute it subject to the terms and conditions 8** of the GNU General Public License v.2. --- 146 unchanged lines hidden (view full) --- 155 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */ 156 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */ 157 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */ 158 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */ 159}; 160 161void dlm_print_lkb(struct dlm_lkb *lkb) 162{ | 1/****************************************************************************** 2******************************************************************************* 3** 4** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved. 5** 6** This copyrighted material is made available to anyone wishing to use, 7** modify, copy, or redistribute it subject to the terms and conditions 8** of the GNU General Public License v.2. --- 146 unchanged lines hidden (view full) --- 155 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */ 156 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */ 157 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */ 158 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */ 159}; 160 161void dlm_print_lkb(struct dlm_lkb *lkb) 162{ |
163 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n" 164 " status %d rqmode %d grmode %d wait_type %d\n", | 163 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x " 164 "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n", |
165 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags, 166 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode, | 165 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags, 166 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode, |
167 lkb->lkb_grmode, lkb->lkb_wait_type); | 167 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid, 168 (unsigned long long)lkb->lkb_recover_seq); |
168} 169 170static void dlm_print_rsb(struct dlm_rsb *r) 171{ 172 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n", 173 r->res_nodeid, r->res_flags, r->res_first_lkid, 174 r->res_recover_locks_count, r->res_name); 175} --- 70 unchanged lines hidden (view full) --- 246 247static inline int is_process_copy(struct dlm_lkb *lkb) 248{ 249 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY)); 250} 251 252static inline int is_master_copy(struct dlm_lkb *lkb) 253{ | 169} 170 171static void dlm_print_rsb(struct dlm_rsb *r) 172{ 173 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n", 174 r->res_nodeid, r->res_flags, r->res_first_lkid, 175 r->res_recover_locks_count, r->res_name); 176} --- 70 unchanged lines hidden (view full) --- 247 248static inline int is_process_copy(struct dlm_lkb *lkb) 249{ 250 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY)); 251} 252 253static inline int is_master_copy(struct dlm_lkb *lkb) 254{ |
254 if (lkb->lkb_flags & DLM_IFL_MSTCPY) 255 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb);); | |
256 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0; 257} 258 259static inline int middle_conversion(struct dlm_lkb *lkb) 260{ 261 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) || 262 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW)) 263 return 1; --- 210 unchanged lines hidden (view full) --- 474 struct dlm_rsb *r; 475 int error; 476 477 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r); 478 if (!error) { 479 kref_get(&r->res_ref); 480 goto out; 481 } | 255 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0; 256} 257 258static inline int middle_conversion(struct dlm_lkb *lkb) 259{ 260 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) || 261 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW)) 262 return 1; --- 210 unchanged lines hidden (view full) --- 473 struct dlm_rsb *r; 474 int error; 475 476 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r); 477 if (!error) { 478 kref_get(&r->res_ref); 479 goto out; 480 } |
481 if (error == -ENOTBLK) 482 goto out; 483 |
|
482 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r); 483 if (error) 484 goto out; 485 486 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 487 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 488 if (error) 489 return error; --- 91 unchanged lines hidden (view full) --- 581 error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep); 582 out_unlock: 583 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 584 out: 585 *r_ret = r; 586 return error; 587} 588 | 484 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r); 485 if (error) 486 goto out; 487 488 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 489 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 490 if (error) 491 return error; --- 91 unchanged lines hidden (view full) --- 583 error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep); 584 out_unlock: 585 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 586 out: 587 *r_ret = r; 588 return error; 589} 590 |
591static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) 592{ 593 struct rb_node *n; 594 struct dlm_rsb *r; 595 int i; 596 597 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 598 spin_lock(&ls->ls_rsbtbl[i].lock); 599 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) { 600 r = rb_entry(n, struct dlm_rsb, res_hashnode); 601 if (r->res_hash == hash) 602 dlm_dump_rsb(r); 603 } 604 spin_unlock(&ls->ls_rsbtbl[i].lock); 605 } 606} 607 |
|
589/* This is only called to add a reference when the code already holds 590 a valid reference to the rsb, so there's no need for locking. */ 591 592static inline void hold_rsb(struct dlm_rsb *r) 593{ 594 kref_get(&r->res_ref); 595} 596 --- 462 unchanged lines hidden (view full) --- 1059 /* N.B. type of reply may not always correspond to type of original 1060 msg due to lookup->request optimization, verify others? */ 1061 1062 if (lkb->lkb_wait_type) { 1063 lkb->lkb_wait_type = 0; 1064 goto out_del; 1065 } 1066 | 608/* This is only called to add a reference when the code already holds 609 a valid reference to the rsb, so there's no need for locking. */ 610 611static inline void hold_rsb(struct dlm_rsb *r) 612{ 613 kref_get(&r->res_ref); 614} 615 --- 462 unchanged lines hidden (view full) --- 1078 /* N.B. type of reply may not always correspond to type of original 1079 msg due to lookup->request optimization, verify others? */ 1080 1081 if (lkb->lkb_wait_type) { 1082 lkb->lkb_wait_type = 0; 1083 goto out_del; 1084 } 1085 |
1067 log_error(ls, "remwait error %x reply %d flags %x no wait_type", 1068 lkb->lkb_id, mstype, lkb->lkb_flags); | 1086 log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait", 1087 lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid, 1088 mstype, lkb->lkb_flags); |
1069 return -1; 1070 1071 out_del: 1072 /* the force-unlock/cancel has completed and we haven't recvd a reply 1073 to the op that was in progress prior to the unlock/cancel; we 1074 give up on any reply to the earlier op. FIXME: not sure when/how 1075 this would happen */ 1076 --- 416 unchanged lines hidden (view full) --- 1493 lkb->lkb_grmode = lkb->lkb_rqmode; 1494 if (lkb->lkb_status) 1495 move_lkb(r, lkb, DLM_LKSTS_GRANTED); 1496 else 1497 add_lkb(r, lkb, DLM_LKSTS_GRANTED); 1498 } 1499 1500 lkb->lkb_rqmode = DLM_LOCK_IV; | 1089 return -1; 1090 1091 out_del: 1092 /* the force-unlock/cancel has completed and we haven't recvd a reply 1093 to the op that was in progress prior to the unlock/cancel; we 1094 give up on any reply to the earlier op. FIXME: not sure when/how 1095 this would happen */ 1096 --- 416 unchanged lines hidden (view full) --- 1513 lkb->lkb_grmode = lkb->lkb_rqmode; 1514 if (lkb->lkb_status) 1515 move_lkb(r, lkb, DLM_LKSTS_GRANTED); 1516 else 1517 add_lkb(r, lkb, DLM_LKSTS_GRANTED); 1518 } 1519 1520 lkb->lkb_rqmode = DLM_LOCK_IV; |
1521 lkb->lkb_highbast = 0; |
|
1501} 1502 1503static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1504{ 1505 set_lvb_lock(r, lkb); 1506 _grant_lock(r, lkb); | 1522} 1523 1524static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1525{ 1526 set_lvb_lock(r, lkb); 1527 _grant_lock(r, lkb); |
1507 lkb->lkb_highbast = 0; | |
1508} 1509 1510static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, 1511 struct dlm_message *ms) 1512{ 1513 set_lvb_lock_pc(r, lkb, ms); 1514 _grant_lock(r, lkb); 1515} --- 345 unchanged lines hidden (view full) --- 1861 log_prints), we should be able to just call _can_be_granted() and not 1862 bother with the demote/deadlk cases here (and there's no easy way to deal 1863 with a deadlk here, we'd have to generate something like grant_lock with 1864 the deadlk error.) */ 1865 1866/* Returns the highest requested mode of all blocked conversions; sets 1867 cw if there's a blocked conversion to DLM_LOCK_CW. */ 1868 | 1528} 1529 1530static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, 1531 struct dlm_message *ms) 1532{ 1533 set_lvb_lock_pc(r, lkb, ms); 1534 _grant_lock(r, lkb); 1535} --- 345 unchanged lines hidden (view full) --- 1881 log_prints), we should be able to just call _can_be_granted() and not 1882 bother with the demote/deadlk cases here (and there's no easy way to deal 1883 with a deadlk here, we'd have to generate something like grant_lock with 1884 the deadlk error.) */ 1885 1886/* Returns the highest requested mode of all blocked conversions; sets 1887 cw if there's a blocked conversion to DLM_LOCK_CW. */ 1888 |
1869static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw) | 1889static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw, 1890 unsigned int *count) |
1870{ 1871 struct dlm_lkb *lkb, *s; 1872 int hi, demoted, quit, grant_restart, demote_restart; 1873 int deadlk; 1874 1875 quit = 0; 1876 restart: 1877 grant_restart = 0; 1878 demote_restart = 0; 1879 hi = DLM_LOCK_IV; 1880 1881 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) { 1882 demoted = is_demoted(lkb); 1883 deadlk = 0; 1884 1885 if (can_be_granted(r, lkb, 0, &deadlk)) { 1886 grant_lock_pending(r, lkb); 1887 grant_restart = 1; | 1891{ 1892 struct dlm_lkb *lkb, *s; 1893 int hi, demoted, quit, grant_restart, demote_restart; 1894 int deadlk; 1895 1896 quit = 0; 1897 restart: 1898 grant_restart = 0; 1899 demote_restart = 0; 1900 hi = DLM_LOCK_IV; 1901 1902 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) { 1903 demoted = is_demoted(lkb); 1904 deadlk = 0; 1905 1906 if (can_be_granted(r, lkb, 0, &deadlk)) { 1907 grant_lock_pending(r, lkb); 1908 grant_restart = 1; |
1909 if (count) 1910 (*count)++; |
|
1888 continue; 1889 } 1890 1891 if (!demoted && is_demoted(lkb)) { 1892 log_print("WARN: pending demoted %x node %d %s", 1893 lkb->lkb_id, lkb->lkb_nodeid, r->res_name); 1894 demote_restart = 1; 1895 continue; --- 17 unchanged lines hidden (view full) --- 1913 if (demote_restart && !quit) { 1914 quit = 1; 1915 goto restart; 1916 } 1917 1918 return max_t(int, high, hi); 1919} 1920 | 1911 continue; 1912 } 1913 1914 if (!demoted && is_demoted(lkb)) { 1915 log_print("WARN: pending demoted %x node %d %s", 1916 lkb->lkb_id, lkb->lkb_nodeid, r->res_name); 1917 demote_restart = 1; 1918 continue; --- 17 unchanged lines hidden (view full) --- 1936 if (demote_restart && !quit) { 1937 quit = 1; 1938 goto restart; 1939 } 1940 1941 return max_t(int, high, hi); 1942} 1943 |
1921static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw) | 1944static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw, 1945 unsigned int *count) |
1922{ 1923 struct dlm_lkb *lkb, *s; 1924 1925 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { | 1946{ 1947 struct dlm_lkb *lkb, *s; 1948 1949 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { |
1926 if (can_be_granted(r, lkb, 0, NULL)) | 1950 if (can_be_granted(r, lkb, 0, NULL)) { |
1927 grant_lock_pending(r, lkb); | 1951 grant_lock_pending(r, lkb); |
1928 else { | 1952 if (count) 1953 (*count)++; 1954 } else { |
1929 high = max_t(int, lkb->lkb_rqmode, high); 1930 if (lkb->lkb_rqmode == DLM_LOCK_CW) 1931 *cw = 1; 1932 } 1933 } 1934 1935 return high; 1936} --- 12 unchanged lines hidden (view full) --- 1949 } 1950 1951 if (gr->lkb_highbast < high && 1952 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1]) 1953 return 1; 1954 return 0; 1955} 1956 | 1955 high = max_t(int, lkb->lkb_rqmode, high); 1956 if (lkb->lkb_rqmode == DLM_LOCK_CW) 1957 *cw = 1; 1958 } 1959 } 1960 1961 return high; 1962} --- 12 unchanged lines hidden (view full) --- 1975 } 1976 1977 if (gr->lkb_highbast < high && 1978 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1]) 1979 return 1; 1980 return 0; 1981} 1982 |
1957static void grant_pending_locks(struct dlm_rsb *r) | 1983static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count) |
1958{ 1959 struct dlm_lkb *lkb, *s; 1960 int high = DLM_LOCK_IV; 1961 int cw = 0; 1962 | 1984{ 1985 struct dlm_lkb *lkb, *s; 1986 int high = DLM_LOCK_IV; 1987 int cw = 0; 1988 |
1963 DLM_ASSERT(is_master(r), dlm_dump_rsb(r);); | 1989 if (!is_master(r)) { 1990 log_print("grant_pending_locks r nodeid %d", r->res_nodeid); 1991 dlm_dump_rsb(r); 1992 return; 1993 } |
1964 | 1994 |
1965 high = grant_pending_convert(r, high, &cw); 1966 high = grant_pending_wait(r, high, &cw); | 1995 high = grant_pending_convert(r, high, &cw, count); 1996 high = grant_pending_wait(r, high, &cw, count); |
1967 1968 if (high == DLM_LOCK_IV) 1969 return; 1970 1971 /* 1972 * If there are locks left on the wait/convert queue then send blocking 1973 * ASTs to granted locks based on the largest requested mode (high) 1974 * found above. --- 519 unchanged lines hidden (view full) --- 2494 2495 /* is_demoted() means the can_be_granted() above set the grmode 2496 to NL, and left us on the granted queue. This auto-demotion 2497 (due to CONVDEADLK) might mean other locks, and/or this lock, are 2498 now grantable. We have to try to grant other converting locks 2499 before we try again to grant this one. */ 2500 2501 if (is_demoted(lkb)) { | 1997 1998 if (high == DLM_LOCK_IV) 1999 return; 2000 2001 /* 2002 * If there are locks left on the wait/convert queue then send blocking 2003 * ASTs to granted locks based on the largest requested mode (high) 2004 * found above. --- 519 unchanged lines hidden (view full) --- 2524 2525 /* is_demoted() means the can_be_granted() above set the grmode 2526 to NL, and left us on the granted queue. This auto-demotion 2527 (due to CONVDEADLK) might mean other locks, and/or this lock, are 2528 now grantable. We have to try to grant other converting locks 2529 before we try again to grant this one. */ 2530 2531 if (is_demoted(lkb)) { |
2502 grant_pending_convert(r, DLM_LOCK_IV, NULL); | 2532 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL); |
2503 if (_can_be_granted(r, lkb, 1)) { 2504 grant_lock(r, lkb); 2505 queue_cast(r, lkb, 0); 2506 goto out; 2507 } 2508 /* else fall through and move to convert queue */ 2509 } 2510 --- 11 unchanged lines hidden (view full) --- 2522 return error; 2523} 2524 2525static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 2526 int error) 2527{ 2528 switch (error) { 2529 case 0: | 2533 if (_can_be_granted(r, lkb, 1)) { 2534 grant_lock(r, lkb); 2535 queue_cast(r, lkb, 0); 2536 goto out; 2537 } 2538 /* else fall through and move to convert queue */ 2539 } 2540 --- 11 unchanged lines hidden (view full) --- 2552 return error; 2553} 2554 2555static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 2556 int error) 2557{ 2558 switch (error) { 2559 case 0: |
2530 grant_pending_locks(r); | 2560 grant_pending_locks(r, NULL); |
2531 /* grant_pending_locks also sends basts */ 2532 break; 2533 case -EAGAIN: 2534 if (force_blocking_asts(lkb)) 2535 send_blocking_asts_all(r, lkb); 2536 break; 2537 case -EINPROGRESS: 2538 send_blocking_asts(r, lkb); --- 6 unchanged lines hidden (view full) --- 2545 remove_lock(r, lkb); 2546 queue_cast(r, lkb, -DLM_EUNLOCK); 2547 return -DLM_EUNLOCK; 2548} 2549 2550static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 2551 int error) 2552{ | 2561 /* grant_pending_locks also sends basts */ 2562 break; 2563 case -EAGAIN: 2564 if (force_blocking_asts(lkb)) 2565 send_blocking_asts_all(r, lkb); 2566 break; 2567 case -EINPROGRESS: 2568 send_blocking_asts(r, lkb); --- 6 unchanged lines hidden (view full) --- 2575 remove_lock(r, lkb); 2576 queue_cast(r, lkb, -DLM_EUNLOCK); 2577 return -DLM_EUNLOCK; 2578} 2579 2580static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 2581 int error) 2582{ |
2553 grant_pending_locks(r); | 2583 grant_pending_locks(r, NULL); |
2554} 2555 2556/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ 2557 2558static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 2559{ 2560 int error; 2561 --- 4 unchanged lines hidden (view full) --- 2566 } 2567 return 0; 2568} 2569 2570static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 2571 int error) 2572{ 2573 if (error) | 2584} 2585 2586/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ 2587 2588static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 2589{ 2590 int error; 2591 --- 4 unchanged lines hidden (view full) --- 2596 } 2597 return 0; 2598} 2599 2600static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 2601 int error) 2602{ 2603 if (error) |
2574 grant_pending_locks(r); | 2604 grant_pending_locks(r, NULL); |
2575} 2576 2577/* 2578 * Four stage 3 varieties: 2579 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock() 2580 */ 2581 2582/* add a new lkb to a possibly new rsb, called by requesting process */ --- 784 unchanged lines hidden (view full) --- 3367 if (error) 3368 log_error(lkb->lkb_resource->res_ls, 3369 "ignore invalid message %d from %d %x %x %x %d", 3370 ms->m_type, from, lkb->lkb_id, lkb->lkb_remid, 3371 lkb->lkb_flags, lkb->lkb_nodeid); 3372 return error; 3373} 3374 | 2605} 2606 2607/* 2608 * Four stage 3 varieties: 2609 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock() 2610 */ 2611 2612/* add a new lkb to a possibly new rsb, called by requesting process */ --- 784 unchanged lines hidden (view full) --- 3397 if (error) 3398 log_error(lkb->lkb_resource->res_ls, 3399 "ignore invalid message %d from %d %x %x %x %d", 3400 ms->m_type, from, lkb->lkb_id, lkb->lkb_remid, 3401 lkb->lkb_flags, lkb->lkb_nodeid); 3402 return error; 3403} 3404 |
3375static void receive_request(struct dlm_ls *ls, struct dlm_message *ms) | 3405static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) |
3376{ 3377 struct dlm_lkb *lkb; 3378 struct dlm_rsb *r; 3379 int error, namelen; 3380 3381 error = create_lkb(ls, &lkb); 3382 if (error) 3383 goto fail; --- 23 unchanged lines hidden (view full) --- 3407 3408 unlock_rsb(r); 3409 put_rsb(r); 3410 3411 if (error == -EINPROGRESS) 3412 error = 0; 3413 if (error) 3414 dlm_put_lkb(lkb); | 3406{ 3407 struct dlm_lkb *lkb; 3408 struct dlm_rsb *r; 3409 int error, namelen; 3410 3411 error = create_lkb(ls, &lkb); 3412 if (error) 3413 goto fail; --- 23 unchanged lines hidden (view full) --- 3437 3438 unlock_rsb(r); 3439 put_rsb(r); 3440 3441 if (error == -EINPROGRESS) 3442 error = 0; 3443 if (error) 3444 dlm_put_lkb(lkb); |
3415 return; | 3445 return 0; |
3416 3417 fail: 3418 setup_stub_lkb(ls, ms); 3419 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); | 3446 3447 fail: 3448 setup_stub_lkb(ls, ms); 3449 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); |
3450 return error; |
|
3420} 3421 | 3451} 3452 |
3422static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms) | 3453static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms) |
3423{ 3424 struct dlm_lkb *lkb; 3425 struct dlm_rsb *r; 3426 int error, reply = 1; 3427 3428 error = find_lkb(ls, ms->m_remid, &lkb); 3429 if (error) 3430 goto fail; 3431 | 3454{ 3455 struct dlm_lkb *lkb; 3456 struct dlm_rsb *r; 3457 int error, reply = 1; 3458 3459 error = find_lkb(ls, ms->m_remid, &lkb); 3460 if (error) 3461 goto fail; 3462 |
3463 if (lkb->lkb_remid != ms->m_lkid) { 3464 log_error(ls, "receive_convert %x remid %x recover_seq %llu " 3465 "remote %d %x", lkb->lkb_id, lkb->lkb_remid, 3466 (unsigned long long)lkb->lkb_recover_seq, 3467 ms->m_header.h_nodeid, ms->m_lkid); 3468 error = -ENOENT; 3469 goto fail; 3470 } 3471 |
|
3432 r = lkb->lkb_resource; 3433 3434 hold_rsb(r); 3435 lock_rsb(r); 3436 3437 error = validate_message(lkb, ms); 3438 if (error) 3439 goto out; --- 11 unchanged lines hidden (view full) --- 3451 error = do_convert(r, lkb); 3452 if (reply) 3453 send_convert_reply(r, lkb, error); 3454 do_convert_effects(r, lkb, error); 3455 out: 3456 unlock_rsb(r); 3457 put_rsb(r); 3458 dlm_put_lkb(lkb); | 3472 r = lkb->lkb_resource; 3473 3474 hold_rsb(r); 3475 lock_rsb(r); 3476 3477 error = validate_message(lkb, ms); 3478 if (error) 3479 goto out; --- 11 unchanged lines hidden (view full) --- 3491 error = do_convert(r, lkb); 3492 if (reply) 3493 send_convert_reply(r, lkb, error); 3494 do_convert_effects(r, lkb, error); 3495 out: 3496 unlock_rsb(r); 3497 put_rsb(r); 3498 dlm_put_lkb(lkb); |
3459 return; | 3499 return 0; |
3460 3461 fail: 3462 setup_stub_lkb(ls, ms); 3463 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); | 3500 3501 fail: 3502 setup_stub_lkb(ls, ms); 3503 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); |
3504 return error; |
|
3464} 3465 | 3505} 3506 |
3466static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) | 3507static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) |
3467{ 3468 struct dlm_lkb *lkb; 3469 struct dlm_rsb *r; 3470 int error; 3471 3472 error = find_lkb(ls, ms->m_remid, &lkb); 3473 if (error) 3474 goto fail; 3475 | 3508{ 3509 struct dlm_lkb *lkb; 3510 struct dlm_rsb *r; 3511 int error; 3512 3513 error = find_lkb(ls, ms->m_remid, &lkb); 3514 if (error) 3515 goto fail; 3516 |
3517 if (lkb->lkb_remid != ms->m_lkid) { 3518 log_error(ls, "receive_unlock %x remid %x remote %d %x", 3519 lkb->lkb_id, lkb->lkb_remid, 3520 ms->m_header.h_nodeid, ms->m_lkid); 3521 error = -ENOENT; 3522 goto fail; 3523 } 3524 |
|
3476 r = lkb->lkb_resource; 3477 3478 hold_rsb(r); 3479 lock_rsb(r); 3480 3481 error = validate_message(lkb, ms); 3482 if (error) 3483 goto out; --- 8 unchanged lines hidden (view full) --- 3492 3493 error = do_unlock(r, lkb); 3494 send_unlock_reply(r, lkb, error); 3495 do_unlock_effects(r, lkb, error); 3496 out: 3497 unlock_rsb(r); 3498 put_rsb(r); 3499 dlm_put_lkb(lkb); | 3525 r = lkb->lkb_resource; 3526 3527 hold_rsb(r); 3528 lock_rsb(r); 3529 3530 error = validate_message(lkb, ms); 3531 if (error) 3532 goto out; --- 8 unchanged lines hidden (view full) --- 3541 3542 error = do_unlock(r, lkb); 3543 send_unlock_reply(r, lkb, error); 3544 do_unlock_effects(r, lkb, error); 3545 out: 3546 unlock_rsb(r); 3547 put_rsb(r); 3548 dlm_put_lkb(lkb); |
3500 return; | 3549 return 0; |
3501 3502 fail: 3503 setup_stub_lkb(ls, ms); 3504 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); | 3550 3551 fail: 3552 setup_stub_lkb(ls, ms); 3553 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); |
3554 return error; |
|
3505} 3506 | 3555} 3556 |
3507static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) | 3557static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) |
3508{ 3509 struct dlm_lkb *lkb; 3510 struct dlm_rsb *r; 3511 int error; 3512 3513 error = find_lkb(ls, ms->m_remid, &lkb); 3514 if (error) 3515 goto fail; --- 11 unchanged lines hidden (view full) --- 3527 3528 error = do_cancel(r, lkb); 3529 send_cancel_reply(r, lkb, error); 3530 do_cancel_effects(r, lkb, error); 3531 out: 3532 unlock_rsb(r); 3533 put_rsb(r); 3534 dlm_put_lkb(lkb); | 3558{ 3559 struct dlm_lkb *lkb; 3560 struct dlm_rsb *r; 3561 int error; 3562 3563 error = find_lkb(ls, ms->m_remid, &lkb); 3564 if (error) 3565 goto fail; --- 11 unchanged lines hidden (view full) --- 3577 3578 error = do_cancel(r, lkb); 3579 send_cancel_reply(r, lkb, error); 3580 do_cancel_effects(r, lkb, error); 3581 out: 3582 unlock_rsb(r); 3583 put_rsb(r); 3584 dlm_put_lkb(lkb); |
3535 return; | 3585 return 0; |
3536 3537 fail: 3538 setup_stub_lkb(ls, ms); 3539 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); | 3586 3587 fail: 3588 setup_stub_lkb(ls, ms); 3589 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); |
3590 return error; |
|
3540} 3541 | 3591} 3592 |
3542static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms) | 3593static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms) |
3543{ 3544 struct dlm_lkb *lkb; 3545 struct dlm_rsb *r; 3546 int error; 3547 3548 error = find_lkb(ls, ms->m_remid, &lkb); | 3594{ 3595 struct dlm_lkb *lkb; 3596 struct dlm_rsb *r; 3597 int error; 3598 3599 error = find_lkb(ls, ms->m_remid, &lkb); |
3549 if (error) { 3550 log_debug(ls, "receive_grant from %d no lkb %x", 3551 ms->m_header.h_nodeid, ms->m_remid); 3552 return; 3553 } | 3600 if (error) 3601 return error; |
3554 3555 r = lkb->lkb_resource; 3556 3557 hold_rsb(r); 3558 lock_rsb(r); 3559 3560 error = validate_message(lkb, ms); 3561 if (error) 3562 goto out; 3563 3564 receive_flags_reply(lkb, ms); 3565 if (is_altmode(lkb)) 3566 munge_altmode(lkb, ms); 3567 grant_lock_pc(r, lkb, ms); 3568 queue_cast(r, lkb, 0); 3569 out: 3570 unlock_rsb(r); 3571 put_rsb(r); 3572 dlm_put_lkb(lkb); | 3602 3603 r = lkb->lkb_resource; 3604 3605 hold_rsb(r); 3606 lock_rsb(r); 3607 3608 error = validate_message(lkb, ms); 3609 if (error) 3610 goto out; 3611 3612 receive_flags_reply(lkb, ms); 3613 if (is_altmode(lkb)) 3614 munge_altmode(lkb, ms); 3615 grant_lock_pc(r, lkb, ms); 3616 queue_cast(r, lkb, 0); 3617 out: 3618 unlock_rsb(r); 3619 put_rsb(r); 3620 dlm_put_lkb(lkb); |
3621 return 0; |
|
3573} 3574 | 3622} 3623 |
3575static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms) | 3624static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) |
3576{ 3577 struct dlm_lkb *lkb; 3578 struct dlm_rsb *r; 3579 int error; 3580 3581 error = find_lkb(ls, ms->m_remid, &lkb); | 3625{ 3626 struct dlm_lkb *lkb; 3627 struct dlm_rsb *r; 3628 int error; 3629 3630 error = find_lkb(ls, ms->m_remid, &lkb); |
3582 if (error) { 3583 log_debug(ls, "receive_bast from %d no lkb %x", 3584 ms->m_header.h_nodeid, ms->m_remid); 3585 return; 3586 } | 3631 if (error) 3632 return error; |
3587 3588 r = lkb->lkb_resource; 3589 3590 hold_rsb(r); 3591 lock_rsb(r); 3592 3593 error = validate_message(lkb, ms); 3594 if (error) 3595 goto out; 3596 3597 queue_bast(r, lkb, ms->m_bastmode); | 3633 3634 r = lkb->lkb_resource; 3635 3636 hold_rsb(r); 3637 lock_rsb(r); 3638 3639 error = validate_message(lkb, ms); 3640 if (error) 3641 goto out; 3642 3643 queue_bast(r, lkb, ms->m_bastmode); |
3644 lkb->lkb_highbast = ms->m_bastmode; |
|
3598 out: 3599 unlock_rsb(r); 3600 put_rsb(r); 3601 dlm_put_lkb(lkb); | 3645 out: 3646 unlock_rsb(r); 3647 put_rsb(r); 3648 dlm_put_lkb(lkb); |
3649 return 0; |
|
3602} 3603 3604static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) 3605{ 3606 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid; 3607 3608 from_nodeid = ms->m_header.h_nodeid; 3609 our_nodeid = dlm_our_nodeid(); --- 38 unchanged lines hidden (view full) --- 3648 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len); 3649} 3650 3651static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) 3652{ 3653 do_purge(ls, ms->m_nodeid, ms->m_pid); 3654} 3655 | 3650} 3651 3652static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) 3653{ 3654 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid; 3655 3656 from_nodeid = ms->m_header.h_nodeid; 3657 our_nodeid = dlm_our_nodeid(); --- 38 unchanged lines hidden (view full) --- 3696 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len); 3697} 3698 3699static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) 3700{ 3701 do_purge(ls, ms->m_nodeid, ms->m_pid); 3702} 3703 |
3656static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) | 3704static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) |
3657{ 3658 struct dlm_lkb *lkb; 3659 struct dlm_rsb *r; 3660 int error, mstype, result; 3661 3662 error = find_lkb(ls, ms->m_remid, &lkb); | 3705{ 3706 struct dlm_lkb *lkb; 3707 struct dlm_rsb *r; 3708 int error, mstype, result; 3709 3710 error = find_lkb(ls, ms->m_remid, &lkb); |
3663 if (error) { 3664 log_debug(ls, "receive_request_reply from %d no lkb %x", 3665 ms->m_header.h_nodeid, ms->m_remid); 3666 return; 3667 } | 3711 if (error) 3712 return error; |
3668 3669 r = lkb->lkb_resource; 3670 hold_rsb(r); 3671 lock_rsb(r); 3672 3673 error = validate_message(lkb, ms); 3674 if (error) 3675 goto out; 3676 3677 mstype = lkb->lkb_wait_type; 3678 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); | 3713 3714 r = lkb->lkb_resource; 3715 hold_rsb(r); 3716 lock_rsb(r); 3717 3718 error = validate_message(lkb, ms); 3719 if (error) 3720 goto out; 3721 3722 mstype = lkb->lkb_wait_type; 3723 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); |
3679 if (error) | 3724 if (error) { 3725 log_error(ls, "receive_request_reply %x remote %d %x result %d", 3726 lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid, 3727 ms->m_result); 3728 dlm_dump_rsb(r); |
3680 goto out; | 3729 goto out; |
3730 } |
|
3681 3682 /* Optimization: the dir node was also the master, so it took our 3683 lookup as a request and sent request reply instead of lookup reply */ 3684 if (mstype == DLM_MSG_LOOKUP) { 3685 r->res_nodeid = ms->m_header.h_nodeid; 3686 lkb->lkb_nodeid = r->res_nodeid; 3687 } 3688 --- 61 unchanged lines hidden (view full) --- 3750 } else { 3751 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 3752 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 3753 } 3754 out: 3755 unlock_rsb(r); 3756 put_rsb(r); 3757 dlm_put_lkb(lkb); | 3731 3732 /* Optimization: the dir node was also the master, so it took our 3733 lookup as a request and sent request reply instead of lookup reply */ 3734 if (mstype == DLM_MSG_LOOKUP) { 3735 r->res_nodeid = ms->m_header.h_nodeid; 3736 lkb->lkb_nodeid = r->res_nodeid; 3737 } 3738 --- 61 unchanged lines hidden (view full) --- 3800 } else { 3801 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 3802 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 3803 } 3804 out: 3805 unlock_rsb(r); 3806 put_rsb(r); 3807 dlm_put_lkb(lkb); |
3808 return 0; |
|
3758} 3759 3760static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 3761 struct dlm_message *ms) 3762{ 3763 /* this is the value returned from do_convert() on the master */ 3764 switch (ms->m_result) { 3765 case -EAGAIN: --- 22 unchanged lines hidden (view full) --- 3788 receive_flags_reply(lkb, ms); 3789 if (is_demoted(lkb)) 3790 munge_demoted(lkb); 3791 grant_lock_pc(r, lkb, ms); 3792 queue_cast(r, lkb, 0); 3793 break; 3794 3795 default: | 3809} 3810 3811static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 3812 struct dlm_message *ms) 3813{ 3814 /* this is the value returned from do_convert() on the master */ 3815 switch (ms->m_result) { 3816 case -EAGAIN: --- 22 unchanged lines hidden (view full) --- 3839 receive_flags_reply(lkb, ms); 3840 if (is_demoted(lkb)) 3841 munge_demoted(lkb); 3842 grant_lock_pc(r, lkb, ms); 3843 queue_cast(r, lkb, 0); 3844 break; 3845 3846 default: |
3796 log_error(r->res_ls, "receive_convert_reply %x error %d", 3797 lkb->lkb_id, ms->m_result); | 3847 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d", 3848 lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid, 3849 ms->m_result); 3850 dlm_print_rsb(r); 3851 dlm_print_lkb(lkb); |
3798 } 3799} 3800 3801static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3802{ 3803 struct dlm_rsb *r = lkb->lkb_resource; 3804 int error; 3805 --- 10 unchanged lines hidden (view full) --- 3816 goto out; 3817 3818 __receive_convert_reply(r, lkb, ms); 3819 out: 3820 unlock_rsb(r); 3821 put_rsb(r); 3822} 3823 | 3852 } 3853} 3854 3855static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3856{ 3857 struct dlm_rsb *r = lkb->lkb_resource; 3858 int error; 3859 --- 10 unchanged lines hidden (view full) --- 3870 goto out; 3871 3872 __receive_convert_reply(r, lkb, ms); 3873 out: 3874 unlock_rsb(r); 3875 put_rsb(r); 3876} 3877 |
3824static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) | 3878static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) |
3825{ 3826 struct dlm_lkb *lkb; 3827 int error; 3828 3829 error = find_lkb(ls, ms->m_remid, &lkb); | 3879{ 3880 struct dlm_lkb *lkb; 3881 int error; 3882 3883 error = find_lkb(ls, ms->m_remid, &lkb); |
3830 if (error) { 3831 log_debug(ls, "receive_convert_reply from %d no lkb %x", 3832 ms->m_header.h_nodeid, ms->m_remid); 3833 return; 3834 } | 3884 if (error) 3885 return error; |
3835 3836 _receive_convert_reply(lkb, ms); 3837 dlm_put_lkb(lkb); | 3886 3887 _receive_convert_reply(lkb, ms); 3888 dlm_put_lkb(lkb); |
3889 return 0; |
|
3838} 3839 3840static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3841{ 3842 struct dlm_rsb *r = lkb->lkb_resource; 3843 int error; 3844 3845 hold_rsb(r); --- 22 unchanged lines hidden (view full) --- 3868 log_error(r->res_ls, "receive_unlock_reply %x error %d", 3869 lkb->lkb_id, ms->m_result); 3870 } 3871 out: 3872 unlock_rsb(r); 3873 put_rsb(r); 3874} 3875 | 3890} 3891 3892static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3893{ 3894 struct dlm_rsb *r = lkb->lkb_resource; 3895 int error; 3896 3897 hold_rsb(r); --- 22 unchanged lines hidden (view full) --- 3920 log_error(r->res_ls, "receive_unlock_reply %x error %d", 3921 lkb->lkb_id, ms->m_result); 3922 } 3923 out: 3924 unlock_rsb(r); 3925 put_rsb(r); 3926} 3927 |
3876static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) | 3928static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) |
3877{ 3878 struct dlm_lkb *lkb; 3879 int error; 3880 3881 error = find_lkb(ls, ms->m_remid, &lkb); | 3929{ 3930 struct dlm_lkb *lkb; 3931 int error; 3932 3933 error = find_lkb(ls, ms->m_remid, &lkb); |
3882 if (error) { 3883 log_debug(ls, "receive_unlock_reply from %d no lkb %x", 3884 ms->m_header.h_nodeid, ms->m_remid); 3885 return; 3886 } | 3934 if (error) 3935 return error; |
3887 3888 _receive_unlock_reply(lkb, ms); 3889 dlm_put_lkb(lkb); | 3936 3937 _receive_unlock_reply(lkb, ms); 3938 dlm_put_lkb(lkb); |
3939 return 0; |
|
3890} 3891 3892static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3893{ 3894 struct dlm_rsb *r = lkb->lkb_resource; 3895 int error; 3896 3897 hold_rsb(r); --- 22 unchanged lines hidden (view full) --- 3920 log_error(r->res_ls, "receive_cancel_reply %x error %d", 3921 lkb->lkb_id, ms->m_result); 3922 } 3923 out: 3924 unlock_rsb(r); 3925 put_rsb(r); 3926} 3927 | 3940} 3941 3942static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3943{ 3944 struct dlm_rsb *r = lkb->lkb_resource; 3945 int error; 3946 3947 hold_rsb(r); --- 22 unchanged lines hidden (view full) --- 3970 log_error(r->res_ls, "receive_cancel_reply %x error %d", 3971 lkb->lkb_id, ms->m_result); 3972 } 3973 out: 3974 unlock_rsb(r); 3975 put_rsb(r); 3976} 3977 |
3928static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) | 3978static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) |
3929{ 3930 struct dlm_lkb *lkb; 3931 int error; 3932 3933 error = find_lkb(ls, ms->m_remid, &lkb); | 3979{ 3980 struct dlm_lkb *lkb; 3981 int error; 3982 3983 error = find_lkb(ls, ms->m_remid, &lkb); |
3934 if (error) { 3935 log_debug(ls, "receive_cancel_reply from %d no lkb %x", 3936 ms->m_header.h_nodeid, ms->m_remid); 3937 return; 3938 } | 3984 if (error) 3985 return error; |
3939 3940 _receive_cancel_reply(lkb, ms); 3941 dlm_put_lkb(lkb); | 3986 3987 _receive_cancel_reply(lkb, ms); 3988 dlm_put_lkb(lkb); |
3989 return 0; |
|
3942} 3943 3944static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) 3945{ 3946 struct dlm_lkb *lkb; 3947 struct dlm_rsb *r; 3948 int error, ret_nodeid; 3949 3950 error = find_lkb(ls, ms->m_lkid, &lkb); 3951 if (error) { | 3990} 3991 3992static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) 3993{ 3994 struct dlm_lkb *lkb; 3995 struct dlm_rsb *r; 3996 int error, ret_nodeid; 3997 3998 error = find_lkb(ls, ms->m_lkid, &lkb); 3999 if (error) { |
3952 log_error(ls, "receive_lookup_reply no lkb"); | 4000 log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid); |
3953 return; 3954 } 3955 3956 /* ms->m_result is the value returned by dlm_dir_lookup on dir node 3957 FIXME: will a non-zero error ever be returned? */ 3958 3959 r = lkb->lkb_resource; 3960 hold_rsb(r); --- 27 unchanged lines hidden (view full) --- 3988 if (!ret_nodeid) 3989 process_lookup_list(r); 3990 out: 3991 unlock_rsb(r); 3992 put_rsb(r); 3993 dlm_put_lkb(lkb); 3994} 3995 | 4001 return; 4002 } 4003 4004 /* ms->m_result is the value returned by dlm_dir_lookup on dir node 4005 FIXME: will a non-zero error ever be returned? */ 4006 4007 r = lkb->lkb_resource; 4008 hold_rsb(r); --- 27 unchanged lines hidden (view full) --- 4036 if (!ret_nodeid) 4037 process_lookup_list(r); 4038 out: 4039 unlock_rsb(r); 4040 put_rsb(r); 4041 dlm_put_lkb(lkb); 4042} 4043 |
3996static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms) | 4044static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, 4045 uint32_t saved_seq) |
3997{ | 4046{ |
4047 int error = 0, noent = 0; 4048 |
|
3998 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) { 3999 log_debug(ls, "ignore non-member message %d from %d %x %x %d", 4000 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid, 4001 ms->m_remid, ms->m_result); 4002 return; 4003 } 4004 4005 switch (ms->m_type) { 4006 4007 /* messages sent to a master node */ 4008 4009 case DLM_MSG_REQUEST: | 4049 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) { 4050 log_debug(ls, "ignore non-member message %d from %d %x %x %d", 4051 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid, 4052 ms->m_remid, ms->m_result); 4053 return; 4054 } 4055 4056 switch (ms->m_type) { 4057 4058 /* messages sent to a master node */ 4059 4060 case DLM_MSG_REQUEST: |
4010 receive_request(ls, ms); | 4061 error = receive_request(ls, ms); |
4011 break; 4012 4013 case DLM_MSG_CONVERT: | 4062 break; 4063 4064 case DLM_MSG_CONVERT: |
4014 receive_convert(ls, ms); | 4065 error = receive_convert(ls, ms); |
4015 break; 4016 4017 case DLM_MSG_UNLOCK: | 4066 break; 4067 4068 case DLM_MSG_UNLOCK: |
4018 receive_unlock(ls, ms); | 4069 error = receive_unlock(ls, ms); |
4019 break; 4020 4021 case DLM_MSG_CANCEL: | 4070 break; 4071 4072 case DLM_MSG_CANCEL: |
4022 receive_cancel(ls, ms); | 4073 noent = 1; 4074 error = receive_cancel(ls, ms); |
4023 break; 4024 4025 /* messages sent from a master node (replies to above) */ 4026 4027 case DLM_MSG_REQUEST_REPLY: | 4075 break; 4076 4077 /* messages sent from a master node (replies to above) */ 4078 4079 case DLM_MSG_REQUEST_REPLY: |
4028 receive_request_reply(ls, ms); | 4080 error = receive_request_reply(ls, ms); |
4029 break; 4030 4031 case DLM_MSG_CONVERT_REPLY: | 4081 break; 4082 4083 case DLM_MSG_CONVERT_REPLY: |
4032 receive_convert_reply(ls, ms); | 4084 error = receive_convert_reply(ls, ms); |
4033 break; 4034 4035 case DLM_MSG_UNLOCK_REPLY: | 4085 break; 4086 4087 case DLM_MSG_UNLOCK_REPLY: |
4036 receive_unlock_reply(ls, ms); | 4088 error = receive_unlock_reply(ls, ms); |
4037 break; 4038 4039 case DLM_MSG_CANCEL_REPLY: | 4089 break; 4090 4091 case DLM_MSG_CANCEL_REPLY: |
4040 receive_cancel_reply(ls, ms); | 4092 error = receive_cancel_reply(ls, ms); |
4041 break; 4042 4043 /* messages sent from a master node (only two types of async msg) */ 4044 4045 case DLM_MSG_GRANT: | 4093 break; 4094 4095 /* messages sent from a master node (only two types of async msg) */ 4096 4097 case DLM_MSG_GRANT: |
4046 receive_grant(ls, ms); | 4098 noent = 1; 4099 error = receive_grant(ls, ms); |
4047 break; 4048 4049 case DLM_MSG_BAST: | 4100 break; 4101 4102 case DLM_MSG_BAST: |
4050 receive_bast(ls, ms); | 4103 noent = 1; 4104 error = receive_bast(ls, ms); |
4051 break; 4052 4053 /* messages sent to a dir node */ 4054 4055 case DLM_MSG_LOOKUP: 4056 receive_lookup(ls, ms); 4057 break; 4058 --- 11 unchanged lines hidden (view full) --- 4070 4071 case DLM_MSG_PURGE: 4072 receive_purge(ls, ms); 4073 break; 4074 4075 default: 4076 log_error(ls, "unknown message type %d", ms->m_type); 4077 } | 4105 break; 4106 4107 /* messages sent to a dir node */ 4108 4109 case DLM_MSG_LOOKUP: 4110 receive_lookup(ls, ms); 4111 break; 4112 --- 11 unchanged lines hidden (view full) --- 4124 4125 case DLM_MSG_PURGE: 4126 receive_purge(ls, ms); 4127 break; 4128 4129 default: 4130 log_error(ls, "unknown message type %d", ms->m_type); 4131 } |
4132 4133 /* 4134 * When checking for ENOENT, we're checking the result of 4135 * find_lkb(m_remid): 4136 * 4137 * The lock id referenced in the message wasn't found. This may 4138 * happen in normal usage for the async messages and cancel, so 4139 * only use log_debug for them. 4140 * 4141 * Some errors are expected and normal. 4142 */ 4143 4144 if (error == -ENOENT && noent) { 4145 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u", 4146 ms->m_type, ms->m_remid, ms->m_header.h_nodeid, 4147 ms->m_lkid, saved_seq); 4148 } else if (error == -ENOENT) { 4149 log_error(ls, "receive %d no %x remote %d %x saved_seq %u", 4150 ms->m_type, ms->m_remid, ms->m_header.h_nodeid, 4151 ms->m_lkid, saved_seq); 4152 4153 if (ms->m_type == DLM_MSG_CONVERT) 4154 dlm_dump_rsb_hash(ls, ms->m_hash); 4155 } 4156 4157 if (error == -EINVAL) { 4158 log_error(ls, "receive %d inval from %d lkid %x remid %x " 4159 "saved_seq %u", 4160 ms->m_type, ms->m_header.h_nodeid, 4161 ms->m_lkid, ms->m_remid, saved_seq); 4162 } |
|
4078} 4079 4080/* If the lockspace is in recovery mode (locking stopped), then normal 4081 messages are saved on the requestqueue for processing after recovery is 4082 done. When not in recovery mode, we wait for dlm_recoverd to drain saved 4083 messages off the requestqueue before we process new ones. This occurs right 4084 after recovery completes when we transition from saving all messages on 4085 requestqueue, to processing all the saved messages, to processing new 4086 messages as they arrive. */ 4087 4088static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, 4089 int nodeid) 4090{ 4091 if (dlm_locking_stopped(ls)) { 4092 dlm_add_requestqueue(ls, nodeid, ms); 4093 } else { 4094 dlm_wait_requestqueue(ls); | 4163} 4164 4165/* If the lockspace is in recovery mode (locking stopped), then normal 4166 messages are saved on the requestqueue for processing after recovery is 4167 done. When not in recovery mode, we wait for dlm_recoverd to drain saved 4168 messages off the requestqueue before we process new ones. This occurs right 4169 after recovery completes when we transition from saving all messages on 4170 requestqueue, to processing all the saved messages, to processing new 4171 messages as they arrive. */ 4172 4173static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, 4174 int nodeid) 4175{ 4176 if (dlm_locking_stopped(ls)) { 4177 dlm_add_requestqueue(ls, nodeid, ms); 4178 } else { 4179 dlm_wait_requestqueue(ls); |
4095 _receive_message(ls, ms); | 4180 _receive_message(ls, ms, 0); |
4096 } 4097} 4098 4099/* This is called by dlm_recoverd to process messages that were saved on 4100 the requestqueue. */ 4101 | 4181 } 4182} 4183 4184/* This is called by dlm_recoverd to process messages that were saved on 4185 the requestqueue. */ 4186 |
4102void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms) | 4187void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, 4188 uint32_t saved_seq) |
4103{ | 4189{ |
4104 _receive_message(ls, ms); | 4190 _receive_message(ls, ms, saved_seq); |
4105} 4106 4107/* This is called by the midcomms layer when something is received for 4108 the lockspace. It could be either a MSG (normal message sent as part of 4109 standard locking activity) or an RCOM (recovery message sent as part of 4110 lockspace recovery). */ 4111 4112void dlm_receive_buffer(union dlm_packet *p, int nodeid) --- 19 unchanged lines hidden (view full) --- 4132 if (hd->h_nodeid != nodeid) { 4133 log_print("invalid h_nodeid %d from %d lockspace %x", 4134 hd->h_nodeid, nodeid, hd->h_lockspace); 4135 return; 4136 } 4137 4138 ls = dlm_find_lockspace_global(hd->h_lockspace); 4139 if (!ls) { | 4191} 4192 4193/* This is called by the midcomms layer when something is received for 4194 the lockspace. It could be either a MSG (normal message sent as part of 4195 standard locking activity) or an RCOM (recovery message sent as part of 4196 lockspace recovery). */ 4197 4198void dlm_receive_buffer(union dlm_packet *p, int nodeid) --- 19 unchanged lines hidden (view full) --- 4218 if (hd->h_nodeid != nodeid) { 4219 log_print("invalid h_nodeid %d from %d lockspace %x", 4220 hd->h_nodeid, nodeid, hd->h_lockspace); 4221 return; 4222 } 4223 4224 ls = dlm_find_lockspace_global(hd->h_lockspace); 4225 if (!ls) { |
4140 if (dlm_config.ci_log_debug) 4141 log_print("invalid lockspace %x from %d cmd %d type %d", 4142 hd->h_lockspace, nodeid, hd->h_cmd, type); | 4226 if (dlm_config.ci_log_debug) { 4227 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace " 4228 "%u from %d cmd %d type %d\n", 4229 hd->h_lockspace, nodeid, hd->h_cmd, type); 4230 } |
4143 4144 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) 4145 dlm_send_ls_not_ready(nodeid, &p->rcom); 4146 return; 4147 } 4148 4149 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to 4150 be inactive (in this ls) before transitioning to recovery mode */ --- 31 unchanged lines hidden (view full) --- 4182 4183 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down 4184 conversions are async; there's no reply from the remote master */ 4185} 4186 4187/* A waiting lkb needs recovery if the master node has failed, or 4188 the master node is changing (only when no directory is used) */ 4189 | 4231 4232 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) 4233 dlm_send_ls_not_ready(nodeid, &p->rcom); 4234 return; 4235 } 4236 4237 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to 4238 be inactive (in this ls) before transitioning to recovery mode */ --- 31 unchanged lines hidden (view full) --- 4270 4271 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down 4272 conversions are async; there's no reply from the remote master */ 4273} 4274 4275/* A waiting lkb needs recovery if the master node has failed, or 4276 the master node is changing (only when no directory is used) */ 4277 |
4190static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb) | 4278static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb, 4279 int dir_nodeid) |
4191{ | 4280{ |
4192 if (dlm_is_removed(ls, lkb->lkb_nodeid)) | 4281 if (dlm_no_directory(ls)) |
4193 return 1; 4194 | 4282 return 1; 4283 |
4195 if (!dlm_no_directory(ls)) 4196 return 0; 4197 4198 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid) | 4284 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid)) |
4199 return 1; 4200 4201 return 0; 4202} 4203 4204/* Recovery for locks that are waiting for replies from nodes that are now 4205 gone. We can just complete unlocks and cancels by faking a reply from the 4206 dead node. Requests and up-conversions we flag to be resent after 4207 recovery. Down-conversions can just be completed with a fake reply like 4208 unlocks. Conversions between PR and CW need special attention. */ 4209 4210void dlm_recover_waiters_pre(struct dlm_ls *ls) 4211{ 4212 struct dlm_lkb *lkb, *safe; 4213 struct dlm_message *ms_stub; 4214 int wait_type, stub_unlock_result, stub_cancel_result; | 4285 return 1; 4286 4287 return 0; 4288} 4289 4290/* Recovery for locks that are waiting for replies from nodes that are now 4291 gone. We can just complete unlocks and cancels by faking a reply from the 4292 dead node. Requests and up-conversions we flag to be resent after 4293 recovery. Down-conversions can just be completed with a fake reply like 4294 unlocks. Conversions between PR and CW need special attention. */ 4295 4296void dlm_recover_waiters_pre(struct dlm_ls *ls) 4297{ 4298 struct dlm_lkb *lkb, *safe; 4299 struct dlm_message *ms_stub; 4300 int wait_type, stub_unlock_result, stub_cancel_result; |
4301 int dir_nodeid; |
|
4215 4216 ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL); 4217 if (!ms_stub) { 4218 log_error(ls, "dlm_recover_waiters_pre no mem"); 4219 return; 4220 } 4221 4222 mutex_lock(&ls->ls_waiters_mutex); 4223 4224 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) { 4225 | 4302 4303 ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL); 4304 if (!ms_stub) { 4305 log_error(ls, "dlm_recover_waiters_pre no mem"); 4306 return; 4307 } 4308 4309 mutex_lock(&ls->ls_waiters_mutex); 4310 4311 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) { 4312 |
4313 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource); 4314 |
|
4226 /* exclude debug messages about unlocks because there can be so 4227 many and they aren't very interesting */ 4228 4229 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) { | 4315 /* exclude debug messages about unlocks because there can be so 4316 many and they aren't very interesting */ 4317 4318 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) { |
4230 log_debug(ls, "recover_waiter %x nodeid %d " 4231 "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid, 4232 lkb->lkb_wait_type, lkb->lkb_wait_nodeid); | 4319 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d " 4320 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d", 4321 lkb->lkb_id, 4322 lkb->lkb_remid, 4323 lkb->lkb_wait_type, 4324 lkb->lkb_resource->res_nodeid, 4325 lkb->lkb_nodeid, 4326 lkb->lkb_wait_nodeid, 4327 dir_nodeid); |
4233 } 4234 4235 /* all outstanding lookups, regardless of destination will be 4236 resent after recovery is done */ 4237 4238 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) { 4239 lkb->lkb_flags |= DLM_IFL_RESEND; 4240 continue; 4241 } 4242 | 4328 } 4329 4330 /* all outstanding lookups, regardless of destination will be 4331 resent after recovery is done */ 4332 4333 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) { 4334 lkb->lkb_flags |= DLM_IFL_RESEND; 4335 continue; 4336 } 4337 |
4243 if (!waiter_needs_recovery(ls, lkb)) | 4338 if (!waiter_needs_recovery(ls, lkb, dir_nodeid)) |
4244 continue; 4245 4246 wait_type = lkb->lkb_wait_type; 4247 stub_unlock_result = -DLM_EUNLOCK; 4248 stub_cancel_result = -DLM_ECANCEL; 4249 4250 /* Main reply may have been received leaving a zero wait_type, 4251 but a reply for the overlapping op may not have been --- 116 unchanged lines hidden (view full) --- 4368 hold_rsb(r); 4369 lock_rsb(r); 4370 4371 mstype = lkb->lkb_wait_type; 4372 oc = is_overlap_cancel(lkb); 4373 ou = is_overlap_unlock(lkb); 4374 err = 0; 4375 | 4339 continue; 4340 4341 wait_type = lkb->lkb_wait_type; 4342 stub_unlock_result = -DLM_EUNLOCK; 4343 stub_cancel_result = -DLM_ECANCEL; 4344 4345 /* Main reply may have been received leaving a zero wait_type, 4346 but a reply for the overlapping op may not have been --- 116 unchanged lines hidden (view full) --- 4463 hold_rsb(r); 4464 lock_rsb(r); 4465 4466 mstype = lkb->lkb_wait_type; 4467 oc = is_overlap_cancel(lkb); 4468 ou = is_overlap_unlock(lkb); 4469 err = 0; 4470 |
4376 log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d", 4377 lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid); | 4471 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d " 4472 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d " 4473 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype, 4474 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid, 4475 dlm_dir_nodeid(r), oc, ou); |
4378 4379 /* At this point we assume that we won't get a reply to any 4380 previous op or overlap op on this lock. First, do a big 4381 remove_from_waiters() for all previous ops. */ 4382 4383 lkb->lkb_flags &= ~DLM_IFL_RESEND; 4384 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 4385 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; --- 35 unchanged lines hidden (view full) --- 4421 case DLM_MSG_CONVERT: 4422 _convert_lock(r, lkb); 4423 break; 4424 default: 4425 err = 1; 4426 } 4427 } 4428 | 4476 4477 /* At this point we assume that we won't get a reply to any 4478 previous op or overlap op on this lock. First, do a big 4479 remove_from_waiters() for all previous ops. */ 4480 4481 lkb->lkb_flags &= ~DLM_IFL_RESEND; 4482 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 4483 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; --- 35 unchanged lines hidden (view full) --- 4519 case DLM_MSG_CONVERT: 4520 _convert_lock(r, lkb); 4521 break; 4522 default: 4523 err = 1; 4524 } 4525 } 4526 |
4429 if (err) 4430 log_error(ls, "recover_waiters_post %x %d %x %d %d", 4431 lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou); | 4527 if (err) { 4528 log_error(ls, "waiter %x msg %d r_nodeid %d " 4529 "dir_nodeid %d overlap %d %d", 4530 lkb->lkb_id, mstype, r->res_nodeid, 4531 dlm_dir_nodeid(r), oc, ou); 4532 } |
4432 unlock_rsb(r); 4433 put_rsb(r); 4434 dlm_put_lkb(lkb); 4435 } 4436 4437 return error; 4438} 4439 | 4533 unlock_rsb(r); 4534 put_rsb(r); 4535 dlm_put_lkb(lkb); 4536 } 4537 4538 return error; 4539} 4540 |
4440static void purge_queue(struct dlm_rsb *r, struct list_head *queue, 4441 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb)) | 4541static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r, 4542 struct list_head *list) |
4442{ | 4543{ |
4443 struct dlm_ls *ls = r->res_ls; | |
4444 struct dlm_lkb *lkb, *safe; 4445 | 4544 struct dlm_lkb *lkb, *safe; 4545 |
4446 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) { 4447 if (test(ls, lkb)) { 4448 rsb_set_flag(r, RSB_LOCKS_PURGED); 4449 del_lkb(r, lkb); 4450 /* this put should free the lkb */ 4451 if (!dlm_put_lkb(lkb)) 4452 log_error(ls, "purged lkb not released"); 4453 } | 4546 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) { 4547 if (!is_master_copy(lkb)) 4548 continue; 4549 4550 /* don't purge lkbs we've added in recover_master_copy for 4551 the current recovery seq */ 4552 4553 if (lkb->lkb_recover_seq == ls->ls_recover_seq) 4554 continue; 4555 4556 del_lkb(r, lkb); 4557 4558 /* this put should free the lkb */ 4559 if (!dlm_put_lkb(lkb)) 4560 log_error(ls, "purged mstcpy lkb not released"); |
4454 } 4455} 4456 | 4561 } 4562} 4563 |
4457static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb) | 4564void dlm_purge_mstcpy_locks(struct dlm_rsb *r) |
4458{ | 4565{ |
4459 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid)); 4460} | 4566 struct dlm_ls *ls = r->res_ls; |
4461 | 4567 |
4462static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb) 4463{ 4464 return is_master_copy(lkb); | 4568 purge_mstcpy_list(ls, r, &r->res_grantqueue); 4569 purge_mstcpy_list(ls, r, &r->res_convertqueue); 4570 purge_mstcpy_list(ls, r, &r->res_waitqueue); |
4465} 4466 | 4571} 4572 |
4467static void purge_dead_locks(struct dlm_rsb *r) | 4573static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r, 4574 struct list_head *list, 4575 int nodeid_gone, unsigned int *count) |
4468{ | 4576{ |
4469 purge_queue(r, &r->res_grantqueue, &purge_dead_test); 4470 purge_queue(r, &r->res_convertqueue, &purge_dead_test); 4471 purge_queue(r, &r->res_waitqueue, &purge_dead_test); 4472} | 4577 struct dlm_lkb *lkb, *safe; |
4473 | 4578 |
4474void dlm_purge_mstcpy_locks(struct dlm_rsb *r) 4475{ 4476 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test); 4477 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test); 4478 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test); | 4579 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) { 4580 if (!is_master_copy(lkb)) 4581 continue; 4582 4583 if ((lkb->lkb_nodeid == nodeid_gone) || 4584 dlm_is_removed(ls, lkb->lkb_nodeid)) { 4585 4586 del_lkb(r, lkb); 4587 4588 /* this put should free the lkb */ 4589 if (!dlm_put_lkb(lkb)) 4590 log_error(ls, "purged dead lkb not released"); 4591 4592 rsb_set_flag(r, RSB_RECOVER_GRANT); 4593 4594 (*count)++; 4595 } 4596 } |
4479} 4480 4481/* Get rid of locks held by nodes that are gone. */ 4482 | 4597} 4598 4599/* Get rid of locks held by nodes that are gone. */ 4600 |
4483int dlm_purge_locks(struct dlm_ls *ls) | 4601void dlm_recover_purge(struct dlm_ls *ls) |
4484{ 4485 struct dlm_rsb *r; | 4602{ 4603 struct dlm_rsb *r; |
4604 struct dlm_member *memb; 4605 int nodes_count = 0; 4606 int nodeid_gone = 0; 4607 unsigned int lkb_count = 0; |
|
4486 | 4608 |
4487 log_debug(ls, "dlm_purge_locks"); | 4609 /* cache one removed nodeid to optimize the common 4610 case of a single node removed */ |
4488 | 4611 |
4612 list_for_each_entry(memb, &ls->ls_nodes_gone, list) { 4613 nodes_count++; 4614 nodeid_gone = memb->nodeid; 4615 } 4616 4617 if (!nodes_count) 4618 return; 4619 |
|
4489 down_write(&ls->ls_root_sem); 4490 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 4491 hold_rsb(r); 4492 lock_rsb(r); | 4620 down_write(&ls->ls_root_sem); 4621 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 4622 hold_rsb(r); 4623 lock_rsb(r); |
4493 if (is_master(r)) 4494 purge_dead_locks(r); | 4624 if (is_master(r)) { 4625 purge_dead_list(ls, r, &r->res_grantqueue, 4626 nodeid_gone, &lkb_count); 4627 purge_dead_list(ls, r, &r->res_convertqueue, 4628 nodeid_gone, &lkb_count); 4629 purge_dead_list(ls, r, &r->res_waitqueue, 4630 nodeid_gone, &lkb_count); 4631 } |
4495 unlock_rsb(r); 4496 unhold_rsb(r); | 4632 unlock_rsb(r); 4633 unhold_rsb(r); |
4497 4498 schedule(); | 4634 cond_resched(); |
4499 } 4500 up_write(&ls->ls_root_sem); 4501 | 4635 } 4636 up_write(&ls->ls_root_sem); 4637 |
4502 return 0; | 4638 if (lkb_count) 4639 log_debug(ls, "dlm_recover_purge %u locks for %u nodes", 4640 lkb_count, nodes_count); |
4503} 4504 | 4641} 4642 |
4505static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket) | 4643static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket) |
4506{ 4507 struct rb_node *n; | 4644{ 4645 struct rb_node *n; |
4508 struct dlm_rsb *r, *r_ret = NULL; | 4646 struct dlm_rsb *r; |
4509 4510 spin_lock(&ls->ls_rsbtbl[bucket].lock); 4511 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) { 4512 r = rb_entry(n, struct dlm_rsb, res_hashnode); | 4647 4648 spin_lock(&ls->ls_rsbtbl[bucket].lock); 4649 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) { 4650 r = rb_entry(n, struct dlm_rsb, res_hashnode); |
4513 if (!rsb_flag(r, RSB_LOCKS_PURGED)) | 4651 4652 if (!rsb_flag(r, RSB_RECOVER_GRANT)) |
4514 continue; | 4653 continue; |
4654 rsb_clear_flag(r, RSB_RECOVER_GRANT); 4655 if (!is_master(r)) 4656 continue; |
|
4515 hold_rsb(r); | 4657 hold_rsb(r); |
4516 rsb_clear_flag(r, RSB_LOCKS_PURGED); 4517 r_ret = r; 4518 break; | 4658 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 4659 return r; |
4519 } 4520 spin_unlock(&ls->ls_rsbtbl[bucket].lock); | 4660 } 4661 spin_unlock(&ls->ls_rsbtbl[bucket].lock); |
4521 return r_ret; | 4662 return NULL; |
4522} 4523 | 4663} 4664 |
4524void dlm_grant_after_purge(struct dlm_ls *ls) | 4665/* 4666 * Attempt to grant locks on resources that we are the master of. 4667 * Locks may have become grantable during recovery because locks 4668 * from departed nodes have been purged (or not rebuilt), allowing 4669 * previously blocked locks to now be granted. The subset of rsb's 4670 * we are interested in are those with lkb's on either the convert or 4671 * waiting queues. 4672 * 4673 * Simplest would be to go through each master rsb and check for non-empty 4674 * convert or waiting queues, and attempt to grant on those rsbs. 4675 * Checking the queues requires lock_rsb, though, for which we'd need 4676 * to release the rsbtbl lock. This would make iterating through all 4677 * rsb's very inefficient. So, we rely on earlier recovery routines 4678 * to set RECOVER_GRANT on any rsb's that we should attempt to grant 4679 * locks for. 4680 */ 4681 4682void dlm_recover_grant(struct dlm_ls *ls) |
4525{ 4526 struct dlm_rsb *r; 4527 int bucket = 0; | 4683{ 4684 struct dlm_rsb *r; 4685 int bucket = 0; |
4686 unsigned int count = 0; 4687 unsigned int rsb_count = 0; 4688 unsigned int lkb_count = 0; |
|
4528 4529 while (1) { | 4689 4690 while (1) { |
4530 r = find_purged_rsb(ls, bucket); | 4691 r = find_grant_rsb(ls, bucket); |
4531 if (!r) { 4532 if (bucket == ls->ls_rsbtbl_size - 1) 4533 break; 4534 bucket++; 4535 continue; 4536 } | 4692 if (!r) { 4693 if (bucket == ls->ls_rsbtbl_size - 1) 4694 break; 4695 bucket++; 4696 continue; 4697 } |
4698 rsb_count++; 4699 count = 0; |
|
4537 lock_rsb(r); | 4700 lock_rsb(r); |
4538 if (is_master(r)) { 4539 grant_pending_locks(r); 4540 confirm_master(r, 0); 4541 } | 4701 grant_pending_locks(r, &count); 4702 lkb_count += count; 4703 confirm_master(r, 0); |
4542 unlock_rsb(r); 4543 put_rsb(r); | 4704 unlock_rsb(r); 4705 put_rsb(r); |
4544 schedule(); | 4706 cond_resched(); |
4545 } | 4707 } |
4708 4709 if (lkb_count) 4710 log_debug(ls, "dlm_recover_grant %u locks on %u resources", 4711 lkb_count, rsb_count); |
|
4546} 4547 4548static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid, 4549 uint32_t remid) 4550{ 4551 struct dlm_lkb *lkb; 4552 4553 list_for_each_entry(lkb, head, lkb_statequeue) { --- 72 unchanged lines hidden (view full) --- 4626 back the rcom_lock struct we got but with the remid field filled in. */ 4627 4628/* needs at least dlm_rcom + rcom_lock */ 4629int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) 4630{ 4631 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 4632 struct dlm_rsb *r; 4633 struct dlm_lkb *lkb; | 4712} 4713 4714static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid, 4715 uint32_t remid) 4716{ 4717 struct dlm_lkb *lkb; 4718 4719 list_for_each_entry(lkb, head, lkb_statequeue) { --- 72 unchanged lines hidden (view full) --- 4792 back the rcom_lock struct we got but with the remid field filled in. */ 4793 4794/* needs at least dlm_rcom + rcom_lock */ 4795int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) 4796{ 4797 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 4798 struct dlm_rsb *r; 4799 struct dlm_lkb *lkb; |
4800 uint32_t remid = 0; |
|
4634 int error; 4635 4636 if (rl->rl_parent_lkid) { 4637 error = -EOPNOTSUPP; 4638 goto out; 4639 } 4640 | 4801 int error; 4802 4803 if (rl->rl_parent_lkid) { 4804 error = -EOPNOTSUPP; 4805 goto out; 4806 } 4807 |
4641 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 4642 R_MASTER, &r); | 4808 remid = le32_to_cpu(rl->rl_lkid); 4809 4810 /* In general we expect the rsb returned to be R_MASTER, but we don't 4811 have to require it. Recovery of masters on one node can overlap 4812 recovery of locks on another node, so one node can send us MSTCPY 4813 locks before we've made ourselves master of this rsb. We can still 4814 add new MSTCPY locks that we receive here without any harm; when 4815 we make ourselves master, dlm_recover_masters() won't touch the 4816 MSTCPY locks we've received early. */ 4817 4818 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r); |
4643 if (error) 4644 goto out; 4645 | 4819 if (error) 4820 goto out; 4821 |
4822 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) { 4823 log_error(ls, "dlm_recover_master_copy remote %d %x not dir", 4824 rc->rc_header.h_nodeid, remid); 4825 error = -EBADR; 4826 put_rsb(r); 4827 goto out; 4828 } 4829 |
|
4646 lock_rsb(r); 4647 | 4830 lock_rsb(r); 4831 |
4648 lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid)); | 4832 lkb = search_remid(r, rc->rc_header.h_nodeid, remid); |
4649 if (lkb) { 4650 error = -EEXIST; 4651 goto out_remid; 4652 } 4653 4654 error = create_lkb(ls, &lkb); 4655 if (error) 4656 goto out_unlock; 4657 4658 error = receive_rcom_lock_args(ls, lkb, r, rc); 4659 if (error) { 4660 __put_lkb(ls, lkb); 4661 goto out_unlock; 4662 } 4663 4664 attach_lkb(r, lkb); 4665 add_lkb(r, lkb, rl->rl_status); 4666 error = 0; | 4833 if (lkb) { 4834 error = -EEXIST; 4835 goto out_remid; 4836 } 4837 4838 error = create_lkb(ls, &lkb); 4839 if (error) 4840 goto out_unlock; 4841 4842 error = receive_rcom_lock_args(ls, lkb, r, rc); 4843 if (error) { 4844 __put_lkb(ls, lkb); 4845 goto out_unlock; 4846 } 4847 4848 attach_lkb(r, lkb); 4849 add_lkb(r, lkb, rl->rl_status); 4850 error = 0; |
4851 ls->ls_recover_locks_in++; |
|
4667 | 4852 |
4853 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue)) 4854 rsb_set_flag(r, RSB_RECOVER_GRANT); 4855 |
|
4668 out_remid: 4669 /* this is the new value returned to the lock holder for 4670 saving in its process-copy lkb */ 4671 rl->rl_remid = cpu_to_le32(lkb->lkb_id); 4672 | 4856 out_remid: 4857 /* this is the new value returned to the lock holder for 4858 saving in its process-copy lkb */ 4859 rl->rl_remid = cpu_to_le32(lkb->lkb_id); 4860 |
4861 lkb->lkb_recover_seq = ls->ls_recover_seq; 4862 |
|
4673 out_unlock: 4674 unlock_rsb(r); 4675 put_rsb(r); 4676 out: | 4863 out_unlock: 4864 unlock_rsb(r); 4865 put_rsb(r); 4866 out: |
4677 if (error) 4678 log_debug(ls, "recover_master_copy %d %x", error, 4679 le32_to_cpu(rl->rl_lkid)); | 4867 if (error && error != -EEXIST) 4868 log_debug(ls, "dlm_recover_master_copy remote %d %x error %d", 4869 rc->rc_header.h_nodeid, remid, error); |
4680 rl->rl_result = cpu_to_le32(error); 4681 return error; 4682} 4683 4684/* needs at least dlm_rcom + rcom_lock */ 4685int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) 4686{ 4687 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 4688 struct dlm_rsb *r; 4689 struct dlm_lkb *lkb; | 4870 rl->rl_result = cpu_to_le32(error); 4871 return error; 4872} 4873 4874/* needs at least dlm_rcom + rcom_lock */ 4875int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) 4876{ 4877 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 4878 struct dlm_rsb *r; 4879 struct dlm_lkb *lkb; |
4690 int error; | 4880 uint32_t lkid, remid; 4881 int error, result; |
4691 | 4882 |
4692 error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb); | 4883 lkid = le32_to_cpu(rl->rl_lkid); 4884 remid = le32_to_cpu(rl->rl_remid); 4885 result = le32_to_cpu(rl->rl_result); 4886 4887 error = find_lkb(ls, lkid, &lkb); |
4693 if (error) { | 4888 if (error) { |
4694 log_error(ls, "recover_process_copy no lkid %x", 4695 le32_to_cpu(rl->rl_lkid)); | 4889 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d", 4890 lkid, rc->rc_header.h_nodeid, remid, result); |
4696 return error; 4697 } 4698 | 4891 return error; 4892 } 4893 |
4699 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 4700 4701 error = le32_to_cpu(rl->rl_result); 4702 | |
4703 r = lkb->lkb_resource; 4704 hold_rsb(r); 4705 lock_rsb(r); 4706 | 4894 r = lkb->lkb_resource; 4895 hold_rsb(r); 4896 lock_rsb(r); 4897 |
4707 switch (error) { | 4898 if (!is_process_copy(lkb)) { 4899 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d", 4900 lkid, rc->rc_header.h_nodeid, remid, result); 4901 dlm_dump_rsb(r); 4902 unlock_rsb(r); 4903 put_rsb(r); 4904 dlm_put_lkb(lkb); 4905 return -EINVAL; 4906 } 4907 4908 switch (result) { |
4708 case -EBADR: 4709 /* There's a chance the new master received our lock before 4710 dlm_recover_master_reply(), this wouldn't happen if we did 4711 a barrier between recover_masters and recover_locks. */ | 4909 case -EBADR: 4910 /* There's a chance the new master received our lock before 4911 dlm_recover_master_reply(), this wouldn't happen if we did 4912 a barrier between recover_masters and recover_locks. */ |
4712 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id, 4713 (unsigned long)r, r->res_name); | 4913 4914 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d", 4915 lkid, rc->rc_header.h_nodeid, remid, result); 4916 |
4714 dlm_send_rcom_lock(r, lkb); 4715 goto out; 4716 case -EEXIST: | 4917 dlm_send_rcom_lock(r, lkb); 4918 goto out; 4919 case -EEXIST: |
4717 log_debug(ls, "master copy exists %x", lkb->lkb_id); 4718 /* fall through */ | |
4719 case 0: | 4920 case 0: |
4720 lkb->lkb_remid = le32_to_cpu(rl->rl_remid); | 4921 lkb->lkb_remid = remid; |
4721 break; 4722 default: | 4922 break; 4923 default: |
4723 log_error(ls, "dlm_recover_process_copy unknown error %d %x", 4724 error, lkb->lkb_id); | 4924 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk", 4925 lkid, rc->rc_header.h_nodeid, remid, result); |
4725 } 4726 4727 /* an ack for dlm_recover_locks() which waits for replies from 4728 all the locks it sends to new masters */ 4729 dlm_recovered_lock(r); 4730 out: 4731 unlock_rsb(r); 4732 put_rsb(r); --- 465 unchanged lines hidden --- | 4926 } 4927 4928 /* an ack for dlm_recover_locks() which waits for replies from 4929 all the locks it sends to new masters */ 4930 dlm_recovered_lock(r); 4931 out: 4932 unlock_rsb(r); 4933 put_rsb(r); --- 465 unchanged lines hidden --- |