lock.c (0b7877d4eea3f93e3dd941999522bbd8c538cb53) lock.c (4875647a08e35f77274838d97ca8fa44158d50e2)
1/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved.
5**
6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions
8** of the GNU General Public License v.2.

--- 146 unchanged lines hidden (view full) ---

155 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */
156 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */
157 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */
158 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
159};
160
161void dlm_print_lkb(struct dlm_lkb *lkb)
162{
1/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved.
5**
6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions
8** of the GNU General Public License v.2.

--- 146 unchanged lines hidden (view full) ---

155 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */
156 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */
157 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */
158 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
159};
160
161void dlm_print_lkb(struct dlm_lkb *lkb)
162{
163 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
164 " status %d rqmode %d grmode %d wait_type %d\n",
163 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164 "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
165 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
167 lkb->lkb_grmode, lkb->lkb_wait_type);
167 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168 (unsigned long long)lkb->lkb_recover_seq);
168}
169
/* Emit a single KERN_ERR line describing rsb r: its res_nodeid, res_flags,
   res_first_lkid, res_recover_locks_count and res_name.
   NOTE(review): res_name is printed with %s, so this assumes the name is
   NUL-terminated; confirm against the struct dlm_rsb definition. */
170static void dlm_print_rsb(struct dlm_rsb *r)
171{
172 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
173 r->res_nodeid, r->res_flags, r->res_first_lkid,
174 r->res_recover_locks_count, r->res_name);
175}

--- 70 unchanged lines hidden (view full) ---

246
/* Returns nonzero when lkb has a nonzero lkb_nodeid and the DLM_IFL_MSTCPY
   bit is clear in lkb_flags; returns 0 otherwise. */
247static inline int is_process_copy(struct dlm_lkb *lkb)
248{
249 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
250}
251
252static inline int is_master_copy(struct dlm_lkb *lkb)
253{
169}
170
/* Emit a single KERN_ERR line describing rsb r: its res_nodeid, res_flags,
   res_first_lkid, res_recover_locks_count and res_name.
   NOTE(review): res_name is printed with %s, so this assumes the name is
   NUL-terminated; confirm against the struct dlm_rsb definition. */
171static void dlm_print_rsb(struct dlm_rsb *r)
172{
173 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
174 r->res_nodeid, r->res_flags, r->res_first_lkid,
175 r->res_recover_locks_count, r->res_name);
176}

--- 70 unchanged lines hidden (view full) ---

247
/* Returns nonzero when lkb has a nonzero lkb_nodeid and the DLM_IFL_MSTCPY
   bit is clear in lkb_flags; returns 0 otherwise. */
248static inline int is_process_copy(struct dlm_lkb *lkb)
249{
250 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
251}
252
253static inline int is_master_copy(struct dlm_lkb *lkb)
254{
254 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
255 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
256 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
257}
258
259static inline int middle_conversion(struct dlm_lkb *lkb)
260{
261 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
262 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
263 return 1;

--- 210 unchanged lines hidden (view full) ---

474 struct dlm_rsb *r;
475 int error;
476
477 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
478 if (!error) {
479 kref_get(&r->res_ref);
480 goto out;
481 }
255 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
256}
257
258static inline int middle_conversion(struct dlm_lkb *lkb)
259{
260 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
261 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
262 return 1;

--- 210 unchanged lines hidden (view full) ---

473 struct dlm_rsb *r;
474 int error;
475
476 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
477 if (!error) {
478 kref_get(&r->res_ref);
479 goto out;
480 }
481 if (error == -ENOTBLK)
482 goto out;
483
482 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
483 if (error)
484 goto out;
485
486 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
487 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
488 if (error)
489 return error;

--- 91 unchanged lines hidden (view full) ---

581 error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
582 out_unlock:
583 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
584 out:
585 *r_ret = r;
586 return error;
587}
588
484 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
485 if (error)
486 goto out;
487
488 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
489 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
490 if (error)
491 return error;

--- 91 unchanged lines hidden (view full) ---

583 error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
584 out_unlock:
585 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
586 out:
587 *r_ret = r;
588 return error;
589}
590
/* Debug helper: call dlm_dump_rsb() on every rsb in lockspace ls whose
   res_hash equals hash.  All ls_rsbtbl_size buckets are scanned, but only
   the keep tree of each bucket is walked; the toss tree is not examined
   here. */
591static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
592{
593 struct rb_node *n;
594 struct dlm_rsb *r;
595 int i;
596
597 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 /* the bucket spinlock is held across the whole tree walk, including
    each dlm_dump_rsb() call made from inside it */
598 spin_lock(&ls->ls_rsbtbl[i].lock);
599 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
 /* map the rb_node back to the rsb that embeds it as res_hashnode */
600 r = rb_entry(n, struct dlm_rsb, res_hashnode);
601 if (r->res_hash == hash)
602 dlm_dump_rsb(r);
603 }
604 spin_unlock(&ls->ls_rsbtbl[i].lock);
605 }
606}
607
589/* This is only called to add a reference when the code already holds
590 a valid reference to the rsb, so there's no need for locking. */
591
/* Adds one reference to r by calling kref_get() on r->res_ref; the function
   itself takes no lock. */
592static inline void hold_rsb(struct dlm_rsb *r)
593{
594 kref_get(&r->res_ref);
595}
596

--- 462 unchanged lines hidden (view full) ---

1059 /* N.B. type of reply may not always correspond to type of original
1060 msg due to lookup->request optimization, verify others? */
1061
1062 if (lkb->lkb_wait_type) {
1063 lkb->lkb_wait_type = 0;
1064 goto out_del;
1065 }
1066
608/* This is only called to add a reference when the code already holds
609 a valid reference to the rsb, so there's no need for locking. */
610
/* Adds one reference to r by calling kref_get() on r->res_ref; the function
   itself takes no lock. */
611static inline void hold_rsb(struct dlm_rsb *r)
612{
613 kref_get(&r->res_ref);
614}
615

--- 462 unchanged lines hidden (view full) ---

1078 /* N.B. type of reply may not always correspond to type of original
1079 msg due to lookup->request optimization, verify others? */
1080
1081 if (lkb->lkb_wait_type) {
1082 lkb->lkb_wait_type = 0;
1083 goto out_del;
1084 }
1085
1067 log_error(ls, "remwait error %x reply %d flags %x no wait_type",
1068 lkb->lkb_id, mstype, lkb->lkb_flags);
1086 log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1087 lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
1088 mstype, lkb->lkb_flags);
1069 return -1;
1070
1071 out_del:
1072 /* the force-unlock/cancel has completed and we haven't recvd a reply
1073 to the op that was in progress prior to the unlock/cancel; we
1074 give up on any reply to the earlier op. FIXME: not sure when/how
1075 this would happen */
1076

--- 416 unchanged lines hidden (view full) ---

1493 lkb->lkb_grmode = lkb->lkb_rqmode;
1494 if (lkb->lkb_status)
1495 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1496 else
1497 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1498 }
1499
1500 lkb->lkb_rqmode = DLM_LOCK_IV;
1089 return -1;
1090
1091 out_del:
1092 /* the force-unlock/cancel has completed and we haven't recvd a reply
1093 to the op that was in progress prior to the unlock/cancel; we
1094 give up on any reply to the earlier op. FIXME: not sure when/how
1095 this would happen */
1096

--- 416 unchanged lines hidden (view full) ---

1513 lkb->lkb_grmode = lkb->lkb_rqmode;
1514 if (lkb->lkb_status)
1515 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1516 else
1517 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1518 }
1519
1520 lkb->lkb_rqmode = DLM_LOCK_IV;
1521 lkb->lkb_highbast = 0;
1501}
1502
1503static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1504{
1505 set_lvb_lock(r, lkb);
1506 _grant_lock(r, lkb);
1522}
1523
1524static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1525{
1526 set_lvb_lock(r, lkb);
1527 _grant_lock(r, lkb);
1507 lkb->lkb_highbast = 0;
1508}
1509
/* Grant lkb on rsb r using data from message ms: first calls
   set_lvb_lock_pc(r, lkb, ms), then _grant_lock(r, lkb).
   NOTE(review): the _pc suffix presumably means process copy, i.e. the
   variant used when handling a message from the master; confirm against
   the callers and set_lvb_lock_pc(). */
1510static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1511 struct dlm_message *ms)
1512{
1513 set_lvb_lock_pc(r, lkb, ms);
1514 _grant_lock(r, lkb);
1515}

--- 345 unchanged lines hidden (view full) ---

1861 log_prints), we should be able to just call _can_be_granted() and not
1862 bother with the demote/deadlk cases here (and there's no easy way to deal
1863 with a deadlk here, we'd have to generate something like grant_lock with
1864 the deadlk error.) */
1865
1866/* Returns the highest requested mode of all blocked conversions; sets
1867 cw if there's a blocked conversion to DLM_LOCK_CW. */
1868
1528}
1529
/* Grant lkb on rsb r using data from message ms: first calls
   set_lvb_lock_pc(r, lkb, ms), then _grant_lock(r, lkb).
   NOTE(review): the _pc suffix presumably means process copy, i.e. the
   variant used when handling a message from the master; confirm against
   the callers and set_lvb_lock_pc(). */
1530static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1531 struct dlm_message *ms)
1532{
1533 set_lvb_lock_pc(r, lkb, ms);
1534 _grant_lock(r, lkb);
1535}

--- 345 unchanged lines hidden (view full) ---

1881 log_prints), we should be able to just call _can_be_granted() and not
1882 bother with the demote/deadlk cases here (and there's no easy way to deal
1883 with a deadlk here, we'd have to generate something like grant_lock with
1884 the deadlk error.) */
1885
1886/* Returns the highest requested mode of all blocked conversions; sets
1887 cw if there's a blocked conversion to DLM_LOCK_CW. */
1888
1869static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1889static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
1890 unsigned int *count)
1870{
1871 struct dlm_lkb *lkb, *s;
1872 int hi, demoted, quit, grant_restart, demote_restart;
1873 int deadlk;
1874
1875 quit = 0;
1876 restart:
1877 grant_restart = 0;
1878 demote_restart = 0;
1879 hi = DLM_LOCK_IV;
1880
1881 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1882 demoted = is_demoted(lkb);
1883 deadlk = 0;
1884
1885 if (can_be_granted(r, lkb, 0, &deadlk)) {
1886 grant_lock_pending(r, lkb);
1887 grant_restart = 1;
1891{
1892 struct dlm_lkb *lkb, *s;
1893 int hi, demoted, quit, grant_restart, demote_restart;
1894 int deadlk;
1895
1896 quit = 0;
1897 restart:
1898 grant_restart = 0;
1899 demote_restart = 0;
1900 hi = DLM_LOCK_IV;
1901
1902 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1903 demoted = is_demoted(lkb);
1904 deadlk = 0;
1905
1906 if (can_be_granted(r, lkb, 0, &deadlk)) {
1907 grant_lock_pending(r, lkb);
1908 grant_restart = 1;
1909 if (count)
1910 (*count)++;
1888 continue;
1889 }
1890
1891 if (!demoted && is_demoted(lkb)) {
1892 log_print("WARN: pending demoted %x node %d %s",
1893 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1894 demote_restart = 1;
1895 continue;

--- 17 unchanged lines hidden (view full) ---

1913 if (demote_restart && !quit) {
1914 quit = 1;
1915 goto restart;
1916 }
1917
1918 return max_t(int, high, hi);
1919}
1920
1911 continue;
1912 }
1913
1914 if (!demoted && is_demoted(lkb)) {
1915 log_print("WARN: pending demoted %x node %d %s",
1916 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1917 demote_restart = 1;
1918 continue;

--- 17 unchanged lines hidden (view full) ---

1936 if (demote_restart && !quit) {
1937 quit = 1;
1938 goto restart;
1939 }
1940
1941 return max_t(int, high, hi);
1942}
1943
1921static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1944static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
1945 unsigned int *count)
1922{
1923 struct dlm_lkb *lkb, *s;
1924
1925 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1946{
1947 struct dlm_lkb *lkb, *s;
1948
1949 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1926 if (can_be_granted(r, lkb, 0, NULL))
1950 if (can_be_granted(r, lkb, 0, NULL)) {
1927 grant_lock_pending(r, lkb);
1951 grant_lock_pending(r, lkb);
1928 else {
1952 if (count)
1953 (*count)++;
1954 } else {
1929 high = max_t(int, lkb->lkb_rqmode, high);
1930 if (lkb->lkb_rqmode == DLM_LOCK_CW)
1931 *cw = 1;
1932 }
1933 }
1934
1935 return high;
1936}

--- 12 unchanged lines hidden (view full) ---

1949 }
1950
1951 if (gr->lkb_highbast < high &&
1952 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1953 return 1;
1954 return 0;
1955}
1956
1955 high = max_t(int, lkb->lkb_rqmode, high);
1956 if (lkb->lkb_rqmode == DLM_LOCK_CW)
1957 *cw = 1;
1958 }
1959 }
1960
1961 return high;
1962}

--- 12 unchanged lines hidden (view full) ---

1975 }
1976
1977 if (gr->lkb_highbast < high &&
1978 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1979 return 1;
1980 return 0;
1981}
1982
1957static void grant_pending_locks(struct dlm_rsb *r)
1983static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
1958{
1959 struct dlm_lkb *lkb, *s;
1960 int high = DLM_LOCK_IV;
1961 int cw = 0;
1962
1984{
1985 struct dlm_lkb *lkb, *s;
1986 int high = DLM_LOCK_IV;
1987 int cw = 0;
1988
1963 DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1989 if (!is_master(r)) {
1990 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
1991 dlm_dump_rsb(r);
1992 return;
1993 }
1964
1994
1965 high = grant_pending_convert(r, high, &cw);
1966 high = grant_pending_wait(r, high, &cw);
1995 high = grant_pending_convert(r, high, &cw, count);
1996 high = grant_pending_wait(r, high, &cw, count);
1967
1968 if (high == DLM_LOCK_IV)
1969 return;
1970
1971 /*
1972 * If there are locks left on the wait/convert queue then send blocking
1973 * ASTs to granted locks based on the largest requested mode (high)
1974 * found above.

--- 519 unchanged lines hidden (view full) ---

2494
2495 /* is_demoted() means the can_be_granted() above set the grmode
2496 to NL, and left us on the granted queue. This auto-demotion
2497 (due to CONVDEADLK) might mean other locks, and/or this lock, are
2498 now grantable. We have to try to grant other converting locks
2499 before we try again to grant this one. */
2500
2501 if (is_demoted(lkb)) {
1997
1998 if (high == DLM_LOCK_IV)
1999 return;
2000
2001 /*
2002 * If there are locks left on the wait/convert queue then send blocking
2003 * ASTs to granted locks based on the largest requested mode (high)
2004 * found above.

--- 519 unchanged lines hidden (view full) ---

2524
2525 /* is_demoted() means the can_be_granted() above set the grmode
2526 to NL, and left us on the granted queue. This auto-demotion
2527 (due to CONVDEADLK) might mean other locks, and/or this lock, are
2528 now grantable. We have to try to grant other converting locks
2529 before we try again to grant this one. */
2530
2531 if (is_demoted(lkb)) {
2502 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2532 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
2503 if (_can_be_granted(r, lkb, 1)) {
2504 grant_lock(r, lkb);
2505 queue_cast(r, lkb, 0);
2506 goto out;
2507 }
2508 /* else fall through and move to convert queue */
2509 }
2510

--- 11 unchanged lines hidden (view full) ---

2522 return error;
2523}
2524
2525static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2526 int error)
2527{
2528 switch (error) {
2529 case 0:
2533 if (_can_be_granted(r, lkb, 1)) {
2534 grant_lock(r, lkb);
2535 queue_cast(r, lkb, 0);
2536 goto out;
2537 }
2538 /* else fall through and move to convert queue */
2539 }
2540

--- 11 unchanged lines hidden (view full) ---

2552 return error;
2553}
2554
2555static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2556 int error)
2557{
2558 switch (error) {
2559 case 0:
2530 grant_pending_locks(r);
2560 grant_pending_locks(r, NULL);
2531 /* grant_pending_locks also sends basts */
2532 break;
2533 case -EAGAIN:
2534 if (force_blocking_asts(lkb))
2535 send_blocking_asts_all(r, lkb);
2536 break;
2537 case -EINPROGRESS:
2538 send_blocking_asts(r, lkb);

--- 6 unchanged lines hidden (view full) ---

2545 remove_lock(r, lkb);
2546 queue_cast(r, lkb, -DLM_EUNLOCK);
2547 return -DLM_EUNLOCK;
2548}
2549
2550static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2551 int error)
2552{
2561 /* grant_pending_locks also sends basts */
2562 break;
2563 case -EAGAIN:
2564 if (force_blocking_asts(lkb))
2565 send_blocking_asts_all(r, lkb);
2566 break;
2567 case -EINPROGRESS:
2568 send_blocking_asts(r, lkb);

--- 6 unchanged lines hidden (view full) ---

2575 remove_lock(r, lkb);
2576 queue_cast(r, lkb, -DLM_EUNLOCK);
2577 return -DLM_EUNLOCK;
2578}
2579
2580static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2581 int error)
2582{
2553 grant_pending_locks(r);
2583 grant_pending_locks(r, NULL);
2554}
2555
2556/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2557
2558static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2559{
2560 int error;
2561

--- 4 unchanged lines hidden (view full) ---

2566 }
2567 return 0;
2568}
2569
2570static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2571 int error)
2572{
2573 if (error)
2584}
2585
2586/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2587
2588static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2589{
2590 int error;
2591

--- 4 unchanged lines hidden (view full) ---

2596 }
2597 return 0;
2598}
2599
2600static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2601 int error)
2602{
2603 if (error)
2574 grant_pending_locks(r);
2604 grant_pending_locks(r, NULL);
2575}
2576
2577/*
2578 * Four stage 3 varieties:
2579 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2580 */
2581
2582/* add a new lkb to a possibly new rsb, called by requesting process */

--- 784 unchanged lines hidden (view full) ---

3367 if (error)
3368 log_error(lkb->lkb_resource->res_ls,
3369 "ignore invalid message %d from %d %x %x %x %d",
3370 ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3371 lkb->lkb_flags, lkb->lkb_nodeid);
3372 return error;
3373}
3374
2605}
2606
2607/*
2608 * Four stage 3 varieties:
2609 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2610 */
2611
2612/* add a new lkb to a possibly new rsb, called by requesting process */

--- 784 unchanged lines hidden (view full) ---

3397 if (error)
3398 log_error(lkb->lkb_resource->res_ls,
3399 "ignore invalid message %d from %d %x %x %x %d",
3400 ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3401 lkb->lkb_flags, lkb->lkb_nodeid);
3402 return error;
3403}
3404
3375static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3405static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3376{
3377 struct dlm_lkb *lkb;
3378 struct dlm_rsb *r;
3379 int error, namelen;
3380
3381 error = create_lkb(ls, &lkb);
3382 if (error)
3383 goto fail;

--- 23 unchanged lines hidden (view full) ---

3407
3408 unlock_rsb(r);
3409 put_rsb(r);
3410
3411 if (error == -EINPROGRESS)
3412 error = 0;
3413 if (error)
3414 dlm_put_lkb(lkb);
3406{
3407 struct dlm_lkb *lkb;
3408 struct dlm_rsb *r;
3409 int error, namelen;
3410
3411 error = create_lkb(ls, &lkb);
3412 if (error)
3413 goto fail;

--- 23 unchanged lines hidden (view full) ---

3437
3438 unlock_rsb(r);
3439 put_rsb(r);
3440
3441 if (error == -EINPROGRESS)
3442 error = 0;
3443 if (error)
3444 dlm_put_lkb(lkb);
3415 return;
3445 return 0;
3416
3417 fail:
3418 setup_stub_lkb(ls, ms);
3419 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3446
3447 fail:
3448 setup_stub_lkb(ls, ms);
3449 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3450 return error;
3420}
3421
3451}
3452
3422static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3453static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3423{
3424 struct dlm_lkb *lkb;
3425 struct dlm_rsb *r;
3426 int error, reply = 1;
3427
3428 error = find_lkb(ls, ms->m_remid, &lkb);
3429 if (error)
3430 goto fail;
3431
3454{
3455 struct dlm_lkb *lkb;
3456 struct dlm_rsb *r;
3457 int error, reply = 1;
3458
3459 error = find_lkb(ls, ms->m_remid, &lkb);
3460 if (error)
3461 goto fail;
3462
3463 if (lkb->lkb_remid != ms->m_lkid) {
3464 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
3465 "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
3466 (unsigned long long)lkb->lkb_recover_seq,
3467 ms->m_header.h_nodeid, ms->m_lkid);
3468 error = -ENOENT;
3469 goto fail;
3470 }
3471
3432 r = lkb->lkb_resource;
3433
3434 hold_rsb(r);
3435 lock_rsb(r);
3436
3437 error = validate_message(lkb, ms);
3438 if (error)
3439 goto out;

--- 11 unchanged lines hidden (view full) ---

3451 error = do_convert(r, lkb);
3452 if (reply)
3453 send_convert_reply(r, lkb, error);
3454 do_convert_effects(r, lkb, error);
3455 out:
3456 unlock_rsb(r);
3457 put_rsb(r);
3458 dlm_put_lkb(lkb);
3472 r = lkb->lkb_resource;
3473
3474 hold_rsb(r);
3475 lock_rsb(r);
3476
3477 error = validate_message(lkb, ms);
3478 if (error)
3479 goto out;

--- 11 unchanged lines hidden (view full) ---

3491 error = do_convert(r, lkb);
3492 if (reply)
3493 send_convert_reply(r, lkb, error);
3494 do_convert_effects(r, lkb, error);
3495 out:
3496 unlock_rsb(r);
3497 put_rsb(r);
3498 dlm_put_lkb(lkb);
3459 return;
3499 return 0;
3460
3461 fail:
3462 setup_stub_lkb(ls, ms);
3463 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3500
3501 fail:
3502 setup_stub_lkb(ls, ms);
3503 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3504 return error;
3464}
3465
3505}
3506
3466static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3507static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3467{
3468 struct dlm_lkb *lkb;
3469 struct dlm_rsb *r;
3470 int error;
3471
3472 error = find_lkb(ls, ms->m_remid, &lkb);
3473 if (error)
3474 goto fail;
3475
3508{
3509 struct dlm_lkb *lkb;
3510 struct dlm_rsb *r;
3511 int error;
3512
3513 error = find_lkb(ls, ms->m_remid, &lkb);
3514 if (error)
3515 goto fail;
3516
3517 if (lkb->lkb_remid != ms->m_lkid) {
3518 log_error(ls, "receive_unlock %x remid %x remote %d %x",
3519 lkb->lkb_id, lkb->lkb_remid,
3520 ms->m_header.h_nodeid, ms->m_lkid);
3521 error = -ENOENT;
3522 goto fail;
3523 }
3524
3476 r = lkb->lkb_resource;
3477
3478 hold_rsb(r);
3479 lock_rsb(r);
3480
3481 error = validate_message(lkb, ms);
3482 if (error)
3483 goto out;

--- 8 unchanged lines hidden (view full) ---

3492
3493 error = do_unlock(r, lkb);
3494 send_unlock_reply(r, lkb, error);
3495 do_unlock_effects(r, lkb, error);
3496 out:
3497 unlock_rsb(r);
3498 put_rsb(r);
3499 dlm_put_lkb(lkb);
3525 r = lkb->lkb_resource;
3526
3527 hold_rsb(r);
3528 lock_rsb(r);
3529
3530 error = validate_message(lkb, ms);
3531 if (error)
3532 goto out;

--- 8 unchanged lines hidden (view full) ---

3541
3542 error = do_unlock(r, lkb);
3543 send_unlock_reply(r, lkb, error);
3544 do_unlock_effects(r, lkb, error);
3545 out:
3546 unlock_rsb(r);
3547 put_rsb(r);
3548 dlm_put_lkb(lkb);
3500 return;
3549 return 0;
3501
3502 fail:
3503 setup_stub_lkb(ls, ms);
3504 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3550
3551 fail:
3552 setup_stub_lkb(ls, ms);
3553 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3554 return error;
3505}
3506
3555}
3556
3507static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3557static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3508{
3509 struct dlm_lkb *lkb;
3510 struct dlm_rsb *r;
3511 int error;
3512
3513 error = find_lkb(ls, ms->m_remid, &lkb);
3514 if (error)
3515 goto fail;

--- 11 unchanged lines hidden (view full) ---

3527
3528 error = do_cancel(r, lkb);
3529 send_cancel_reply(r, lkb, error);
3530 do_cancel_effects(r, lkb, error);
3531 out:
3532 unlock_rsb(r);
3533 put_rsb(r);
3534 dlm_put_lkb(lkb);
3558{
3559 struct dlm_lkb *lkb;
3560 struct dlm_rsb *r;
3561 int error;
3562
3563 error = find_lkb(ls, ms->m_remid, &lkb);
3564 if (error)
3565 goto fail;

--- 11 unchanged lines hidden (view full) ---

3577
3578 error = do_cancel(r, lkb);
3579 send_cancel_reply(r, lkb, error);
3580 do_cancel_effects(r, lkb, error);
3581 out:
3582 unlock_rsb(r);
3583 put_rsb(r);
3584 dlm_put_lkb(lkb);
3535 return;
3585 return 0;
3536
3537 fail:
3538 setup_stub_lkb(ls, ms);
3539 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3586
3587 fail:
3588 setup_stub_lkb(ls, ms);
3589 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3590 return error;
3540}
3541
3591}
3592
3542static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3593static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3543{
3544 struct dlm_lkb *lkb;
3545 struct dlm_rsb *r;
3546 int error;
3547
3548 error = find_lkb(ls, ms->m_remid, &lkb);
3594{
3595 struct dlm_lkb *lkb;
3596 struct dlm_rsb *r;
3597 int error;
3598
3599 error = find_lkb(ls, ms->m_remid, &lkb);
3549 if (error) {
3550 log_debug(ls, "receive_grant from %d no lkb %x",
3551 ms->m_header.h_nodeid, ms->m_remid);
3552 return;
3553 }
3600 if (error)
3601 return error;
3554
3555 r = lkb->lkb_resource;
3556
3557 hold_rsb(r);
3558 lock_rsb(r);
3559
3560 error = validate_message(lkb, ms);
3561 if (error)
3562 goto out;
3563
3564 receive_flags_reply(lkb, ms);
3565 if (is_altmode(lkb))
3566 munge_altmode(lkb, ms);
3567 grant_lock_pc(r, lkb, ms);
3568 queue_cast(r, lkb, 0);
3569 out:
3570 unlock_rsb(r);
3571 put_rsb(r);
3572 dlm_put_lkb(lkb);
3602
3603 r = lkb->lkb_resource;
3604
3605 hold_rsb(r);
3606 lock_rsb(r);
3607
3608 error = validate_message(lkb, ms);
3609 if (error)
3610 goto out;
3611
3612 receive_flags_reply(lkb, ms);
3613 if (is_altmode(lkb))
3614 munge_altmode(lkb, ms);
3615 grant_lock_pc(r, lkb, ms);
3616 queue_cast(r, lkb, 0);
3617 out:
3618 unlock_rsb(r);
3619 put_rsb(r);
3620 dlm_put_lkb(lkb);
3621 return 0;
3573}
3574
3622}
3623
3575static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3624static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3576{
3577 struct dlm_lkb *lkb;
3578 struct dlm_rsb *r;
3579 int error;
3580
3581 error = find_lkb(ls, ms->m_remid, &lkb);
3625{
3626 struct dlm_lkb *lkb;
3627 struct dlm_rsb *r;
3628 int error;
3629
3630 error = find_lkb(ls, ms->m_remid, &lkb);
3582 if (error) {
3583 log_debug(ls, "receive_bast from %d no lkb %x",
3584 ms->m_header.h_nodeid, ms->m_remid);
3585 return;
3586 }
3631 if (error)
3632 return error;
3587
3588 r = lkb->lkb_resource;
3589
3590 hold_rsb(r);
3591 lock_rsb(r);
3592
3593 error = validate_message(lkb, ms);
3594 if (error)
3595 goto out;
3596
3597 queue_bast(r, lkb, ms->m_bastmode);
3633
3634 r = lkb->lkb_resource;
3635
3636 hold_rsb(r);
3637 lock_rsb(r);
3638
3639 error = validate_message(lkb, ms);
3640 if (error)
3641 goto out;
3642
3643 queue_bast(r, lkb, ms->m_bastmode);
3644 lkb->lkb_highbast = ms->m_bastmode;
3598 out:
3599 unlock_rsb(r);
3600 put_rsb(r);
3601 dlm_put_lkb(lkb);
3645 out:
3646 unlock_rsb(r);
3647 put_rsb(r);
3648 dlm_put_lkb(lkb);
3649 return 0;
3602}
3603
3604static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3605{
3606 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3607
3608 from_nodeid = ms->m_header.h_nodeid;
3609 our_nodeid = dlm_our_nodeid();

--- 38 unchanged lines hidden (view full) ---

3648 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3649}
3650
/* Handler for a purge message: forwards the m_nodeid and m_pid fields of ms
   to do_purge() for lockspace ls.  What do_purge() removes is not visible
   from here. */
3651static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3652{
3653 do_purge(ls, ms->m_nodeid, ms->m_pid);
3654}
3655
3650}
3651
3652static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3653{
3654 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3655
3656 from_nodeid = ms->m_header.h_nodeid;
3657 our_nodeid = dlm_our_nodeid();

--- 38 unchanged lines hidden (view full) ---

3696 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3697}
3698
/* Handler for a purge message: forwards the m_nodeid and m_pid fields of ms
   to do_purge() for lockspace ls.  What do_purge() removes is not visible
   from here. */
3699static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3700{
3701 do_purge(ls, ms->m_nodeid, ms->m_pid);
3702}
3703
3656static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3704static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3657{
3658 struct dlm_lkb *lkb;
3659 struct dlm_rsb *r;
3660 int error, mstype, result;
3661
3662 error = find_lkb(ls, ms->m_remid, &lkb);
3705{
3706 struct dlm_lkb *lkb;
3707 struct dlm_rsb *r;
3708 int error, mstype, result;
3709
3710 error = find_lkb(ls, ms->m_remid, &lkb);
3663 if (error) {
3664 log_debug(ls, "receive_request_reply from %d no lkb %x",
3665 ms->m_header.h_nodeid, ms->m_remid);
3666 return;
3667 }
3711 if (error)
3712 return error;
3668
3669 r = lkb->lkb_resource;
3670 hold_rsb(r);
3671 lock_rsb(r);
3672
3673 error = validate_message(lkb, ms);
3674 if (error)
3675 goto out;
3676
3677 mstype = lkb->lkb_wait_type;
3678 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3713
3714 r = lkb->lkb_resource;
3715 hold_rsb(r);
3716 lock_rsb(r);
3717
3718 error = validate_message(lkb, ms);
3719 if (error)
3720 goto out;
3721
3722 mstype = lkb->lkb_wait_type;
3723 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3679 if (error)
3724 if (error) {
3725 log_error(ls, "receive_request_reply %x remote %d %x result %d",
3726 lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
3727 ms->m_result);
3728 dlm_dump_rsb(r);
3680 goto out;
3729 goto out;
3730 }
3681
3682 /* Optimization: the dir node was also the master, so it took our
3683 lookup as a request and sent request reply instead of lookup reply */
3684 if (mstype == DLM_MSG_LOOKUP) {
3685 r->res_nodeid = ms->m_header.h_nodeid;
3686 lkb->lkb_nodeid = r->res_nodeid;
3687 }
3688

--- 61 unchanged lines hidden (view full) ---

3750 } else {
3751 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3752 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3753 }
3754 out:
3755 unlock_rsb(r);
3756 put_rsb(r);
3757 dlm_put_lkb(lkb);
3731
3732 /* Optimization: the dir node was also the master, so it took our
3733 lookup as a request and sent request reply instead of lookup reply */
3734 if (mstype == DLM_MSG_LOOKUP) {
3735 r->res_nodeid = ms->m_header.h_nodeid;
3736 lkb->lkb_nodeid = r->res_nodeid;
3737 }
3738

--- 61 unchanged lines hidden (view full) ---

3800 } else {
3801 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3802 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3803 }
3804 out:
3805 unlock_rsb(r);
3806 put_rsb(r);
3807 dlm_put_lkb(lkb);
3808 return 0;
3758}
3759
3760static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3761 struct dlm_message *ms)
3762{
3763 /* this is the value returned from do_convert() on the master */
3764 switch (ms->m_result) {
3765 case -EAGAIN:

--- 22 unchanged lines hidden (view full) ---

3788 receive_flags_reply(lkb, ms);
3789 if (is_demoted(lkb))
3790 munge_demoted(lkb);
3791 grant_lock_pc(r, lkb, ms);
3792 queue_cast(r, lkb, 0);
3793 break;
3794
3795 default:
3809}
3810
3811static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3812 struct dlm_message *ms)
3813{
3814 /* this is the value returned from do_convert() on the master */
3815 switch (ms->m_result) {
3816 case -EAGAIN:

--- 22 unchanged lines hidden (view full) ---

3839 receive_flags_reply(lkb, ms);
3840 if (is_demoted(lkb))
3841 munge_demoted(lkb);
3842 grant_lock_pc(r, lkb, ms);
3843 queue_cast(r, lkb, 0);
3844 break;
3845
3846 default:
3796 log_error(r->res_ls, "receive_convert_reply %x error %d",
3797 lkb->lkb_id, ms->m_result);
3847 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
3848 lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
3849 ms->m_result);
3850 dlm_print_rsb(r);
3851 dlm_print_lkb(lkb);
3798 }
3799}
3800
3801static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3802{
3803 struct dlm_rsb *r = lkb->lkb_resource;
3804 int error;
3805

--- 10 unchanged lines hidden (view full) ---

3816 goto out;
3817
3818 __receive_convert_reply(r, lkb, ms);
3819 out:
3820 unlock_rsb(r);
3821 put_rsb(r);
3822}
3823
3852 }
3853}
3854
3855static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3856{
3857 struct dlm_rsb *r = lkb->lkb_resource;
3858 int error;
3859

--- 10 unchanged lines hidden (view full) ---

3870 goto out;
3871
3872 __receive_convert_reply(r, lkb, ms);
3873 out:
3874 unlock_rsb(r);
3875 put_rsb(r);
3876}
3877
3824static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3878static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3825{
3826 struct dlm_lkb *lkb;
3827 int error;
3828
3829 error = find_lkb(ls, ms->m_remid, &lkb);
3879{
3880 struct dlm_lkb *lkb;
3881 int error;
3882
3883 error = find_lkb(ls, ms->m_remid, &lkb);
3830 if (error) {
3831 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3832 ms->m_header.h_nodeid, ms->m_remid);
3833 return;
3834 }
3884 if (error)
3885 return error;
3835
3836 _receive_convert_reply(lkb, ms);
3837 dlm_put_lkb(lkb);
3886
3887 _receive_convert_reply(lkb, ms);
3888 dlm_put_lkb(lkb);
3889 return 0;
3838}
3839
3840static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3841{
3842 struct dlm_rsb *r = lkb->lkb_resource;
3843 int error;
3844
3845 hold_rsb(r);

--- 22 unchanged lines hidden (view full) ---

3868 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3869 lkb->lkb_id, ms->m_result);
3870 }
3871 out:
3872 unlock_rsb(r);
3873 put_rsb(r);
3874}
3875
3890}
3891
3892static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3893{
3894 struct dlm_rsb *r = lkb->lkb_resource;
3895 int error;
3896
3897 hold_rsb(r);

--- 22 unchanged lines hidden (view full) ---

3920 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3921 lkb->lkb_id, ms->m_result);
3922 }
3923 out:
3924 unlock_rsb(r);
3925 put_rsb(r);
3926}
3927
3876static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3928static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3877{
3878 struct dlm_lkb *lkb;
3879 int error;
3880
3881 error = find_lkb(ls, ms->m_remid, &lkb);
3929{
3930 struct dlm_lkb *lkb;
3931 int error;
3932
3933 error = find_lkb(ls, ms->m_remid, &lkb);
3882 if (error) {
3883 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3884 ms->m_header.h_nodeid, ms->m_remid);
3885 return;
3886 }
3934 if (error)
3935 return error;
3887
3888 _receive_unlock_reply(lkb, ms);
3889 dlm_put_lkb(lkb);
3936
3937 _receive_unlock_reply(lkb, ms);
3938 dlm_put_lkb(lkb);
3939 return 0;
3890}
3891
3892static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3893{
3894 struct dlm_rsb *r = lkb->lkb_resource;
3895 int error;
3896
3897 hold_rsb(r);

--- 22 unchanged lines hidden (view full) ---

3920 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3921 lkb->lkb_id, ms->m_result);
3922 }
3923 out:
3924 unlock_rsb(r);
3925 put_rsb(r);
3926}
3927
3940}
3941
3942static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3943{
3944 struct dlm_rsb *r = lkb->lkb_resource;
3945 int error;
3946
3947 hold_rsb(r);

--- 22 unchanged lines hidden (view full) ---

3970 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3971 lkb->lkb_id, ms->m_result);
3972 }
3973 out:
3974 unlock_rsb(r);
3975 put_rsb(r);
3976}
3977
3928static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3978static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3929{
3930 struct dlm_lkb *lkb;
3931 int error;
3932
3933 error = find_lkb(ls, ms->m_remid, &lkb);
3979{
3980 struct dlm_lkb *lkb;
3981 int error;
3982
3983 error = find_lkb(ls, ms->m_remid, &lkb);
3934 if (error) {
3935 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3936 ms->m_header.h_nodeid, ms->m_remid);
3937 return;
3938 }
3984 if (error)
3985 return error;
3939
3940 _receive_cancel_reply(lkb, ms);
3941 dlm_put_lkb(lkb);
3986
3987 _receive_cancel_reply(lkb, ms);
3988 dlm_put_lkb(lkb);
3989 return 0;
3942}
3943
3944static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3945{
3946 struct dlm_lkb *lkb;
3947 struct dlm_rsb *r;
3948 int error, ret_nodeid;
3949
3950 error = find_lkb(ls, ms->m_lkid, &lkb);
3951 if (error) {
3990}
3991
3992static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3993{
3994 struct dlm_lkb *lkb;
3995 struct dlm_rsb *r;
3996 int error, ret_nodeid;
3997
3998 error = find_lkb(ls, ms->m_lkid, &lkb);
3999 if (error) {
3952 log_error(ls, "receive_lookup_reply no lkb");
4000 log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
3953 return;
3954 }
3955
3956 /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3957 FIXME: will a non-zero error ever be returned? */
3958
3959 r = lkb->lkb_resource;
3960 hold_rsb(r);

--- 27 unchanged lines hidden (view full) ---

3988 if (!ret_nodeid)
3989 process_lookup_list(r);
3990 out:
3991 unlock_rsb(r);
3992 put_rsb(r);
3993 dlm_put_lkb(lkb);
3994}
3995
4001 return;
4002 }
4003
4004 /* ms->m_result is the value returned by dlm_dir_lookup on dir node
4005 FIXME: will a non-zero error ever be returned? */
4006
4007 r = lkb->lkb_resource;
4008 hold_rsb(r);

--- 27 unchanged lines hidden (view full) ---

4036 if (!ret_nodeid)
4037 process_lookup_list(r);
4038 out:
4039 unlock_rsb(r);
4040 put_rsb(r);
4041 dlm_put_lkb(lkb);
4042}
4043
3996static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
4044static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4045 uint32_t saved_seq)
3997{
4046{
4047 int error = 0, noent = 0;
4048
3998 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3999 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
4000 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4001 ms->m_remid, ms->m_result);
4002 return;
4003 }
4004
4005 switch (ms->m_type) {
4006
4007 /* messages sent to a master node */
4008
4009 case DLM_MSG_REQUEST:
4049 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4050 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
4051 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4052 ms->m_remid, ms->m_result);
4053 return;
4054 }
4055
4056 switch (ms->m_type) {
4057
4058 /* messages sent to a master node */
4059
4060 case DLM_MSG_REQUEST:
4010 receive_request(ls, ms);
4061 error = receive_request(ls, ms);
4011 break;
4012
4013 case DLM_MSG_CONVERT:
4062 break;
4063
4064 case DLM_MSG_CONVERT:
4014 receive_convert(ls, ms);
4065 error = receive_convert(ls, ms);
4015 break;
4016
4017 case DLM_MSG_UNLOCK:
4066 break;
4067
4068 case DLM_MSG_UNLOCK:
4018 receive_unlock(ls, ms);
4069 error = receive_unlock(ls, ms);
4019 break;
4020
4021 case DLM_MSG_CANCEL:
4070 break;
4071
4072 case DLM_MSG_CANCEL:
4022 receive_cancel(ls, ms);
4073 noent = 1;
4074 error = receive_cancel(ls, ms);
4023 break;
4024
4025 /* messages sent from a master node (replies to above) */
4026
4027 case DLM_MSG_REQUEST_REPLY:
4075 break;
4076
4077 /* messages sent from a master node (replies to above) */
4078
4079 case DLM_MSG_REQUEST_REPLY:
4028 receive_request_reply(ls, ms);
4080 error = receive_request_reply(ls, ms);
4029 break;
4030
4031 case DLM_MSG_CONVERT_REPLY:
4081 break;
4082
4083 case DLM_MSG_CONVERT_REPLY:
4032 receive_convert_reply(ls, ms);
4084 error = receive_convert_reply(ls, ms);
4033 break;
4034
4035 case DLM_MSG_UNLOCK_REPLY:
4085 break;
4086
4087 case DLM_MSG_UNLOCK_REPLY:
4036 receive_unlock_reply(ls, ms);
4088 error = receive_unlock_reply(ls, ms);
4037 break;
4038
4039 case DLM_MSG_CANCEL_REPLY:
4089 break;
4090
4091 case DLM_MSG_CANCEL_REPLY:
4040 receive_cancel_reply(ls, ms);
4092 error = receive_cancel_reply(ls, ms);
4041 break;
4042
4043 /* messages sent from a master node (only two types of async msg) */
4044
4045 case DLM_MSG_GRANT:
4093 break;
4094
4095 /* messages sent from a master node (only two types of async msg) */
4096
4097 case DLM_MSG_GRANT:
4046 receive_grant(ls, ms);
4098 noent = 1;
4099 error = receive_grant(ls, ms);
4047 break;
4048
4049 case DLM_MSG_BAST:
4100 break;
4101
4102 case DLM_MSG_BAST:
4050 receive_bast(ls, ms);
4103 noent = 1;
4104 error = receive_bast(ls, ms);
4051 break;
4052
4053 /* messages sent to a dir node */
4054
4055 case DLM_MSG_LOOKUP:
4056 receive_lookup(ls, ms);
4057 break;
4058

--- 11 unchanged lines hidden (view full) ---

4070
4071 case DLM_MSG_PURGE:
4072 receive_purge(ls, ms);
4073 break;
4074
4075 default:
4076 log_error(ls, "unknown message type %d", ms->m_type);
4077 }
4105 break;
4106
4107 /* messages sent to a dir node */
4108
4109 case DLM_MSG_LOOKUP:
4110 receive_lookup(ls, ms);
4111 break;
4112

--- 11 unchanged lines hidden (view full) ---

4124
4125 case DLM_MSG_PURGE:
4126 receive_purge(ls, ms);
4127 break;
4128
4129 default:
4130 log_error(ls, "unknown message type %d", ms->m_type);
4131 }
4132
4133 /*
4134 * When checking for ENOENT, we're checking the result of
4135 * find_lkb(m_remid):
4136 *
4137 * The lock id referenced in the message wasn't found. This may
4138 * happen in normal usage for the async messages and cancel, so
4139 * only use log_debug for them.
4140 *
4141 * Some errors are expected and normal.
4142 */
4143
4144 if (error == -ENOENT && noent) {
4145 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4146 ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4147 ms->m_lkid, saved_seq);
4148 } else if (error == -ENOENT) {
4149 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4150 ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4151 ms->m_lkid, saved_seq);
4152
4153 if (ms->m_type == DLM_MSG_CONVERT)
4154 dlm_dump_rsb_hash(ls, ms->m_hash);
4155 }
4156
4157 if (error == -EINVAL) {
4158 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4159 "saved_seq %u",
4160 ms->m_type, ms->m_header.h_nodeid,
4161 ms->m_lkid, ms->m_remid, saved_seq);
4162 }
4078}
4079
4080/* If the lockspace is in recovery mode (locking stopped), then normal
4081 messages are saved on the requestqueue for processing after recovery is
4082 done. When not in recovery mode, we wait for dlm_recoverd to drain saved
4083 messages off the requestqueue before we process new ones. This occurs right
4084 after recovery completes when we transition from saving all messages on
4085 requestqueue, to processing all the saved messages, to processing new
4086 messages as they arrive. */
4087
4088static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4089 int nodeid)
4090{
4091 if (dlm_locking_stopped(ls)) {
4092 dlm_add_requestqueue(ls, nodeid, ms);
4093 } else {
4094 dlm_wait_requestqueue(ls);
4163}
4164
4165/* If the lockspace is in recovery mode (locking stopped), then normal
4166 messages are saved on the requestqueue for processing after recovery is
4167 done. When not in recovery mode, we wait for dlm_recoverd to drain saved
4168 messages off the requestqueue before we process new ones. This occurs right
4169 after recovery completes when we transition from saving all messages on
4170 requestqueue, to processing all the saved messages, to processing new
4171 messages as they arrive. */
4172
4173static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4174 int nodeid)
4175{
4176 if (dlm_locking_stopped(ls)) {
4177 dlm_add_requestqueue(ls, nodeid, ms);
4178 } else {
4179 dlm_wait_requestqueue(ls);
4095 _receive_message(ls, ms);
4180 _receive_message(ls, ms, 0);
4096 }
4097}
4098
4099/* This is called by dlm_recoverd to process messages that were saved on
4100 the requestqueue. */
4101
4181 }
4182}
4183
4184/* This is called by dlm_recoverd to process messages that were saved on
4185 the requestqueue. */
4186
4102void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
4187void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
4188 uint32_t saved_seq)
4103{
4189{
4104 _receive_message(ls, ms);
4190 _receive_message(ls, ms, saved_seq);
4105}
4106
4107/* This is called by the midcomms layer when something is received for
4108 the lockspace. It could be either a MSG (normal message sent as part of
4109 standard locking activity) or an RCOM (recovery message sent as part of
4110 lockspace recovery). */
4111
4112void dlm_receive_buffer(union dlm_packet *p, int nodeid)

--- 19 unchanged lines hidden (view full) ---

4132 if (hd->h_nodeid != nodeid) {
4133 log_print("invalid h_nodeid %d from %d lockspace %x",
4134 hd->h_nodeid, nodeid, hd->h_lockspace);
4135 return;
4136 }
4137
4138 ls = dlm_find_lockspace_global(hd->h_lockspace);
4139 if (!ls) {
4191}
4192
4193/* This is called by the midcomms layer when something is received for
4194 the lockspace. It could be either a MSG (normal message sent as part of
4195 standard locking activity) or an RCOM (recovery message sent as part of
4196 lockspace recovery). */
4197
4198void dlm_receive_buffer(union dlm_packet *p, int nodeid)

--- 19 unchanged lines hidden (view full) ---

4218 if (hd->h_nodeid != nodeid) {
4219 log_print("invalid h_nodeid %d from %d lockspace %x",
4220 hd->h_nodeid, nodeid, hd->h_lockspace);
4221 return;
4222 }
4223
4224 ls = dlm_find_lockspace_global(hd->h_lockspace);
4225 if (!ls) {
4140 if (dlm_config.ci_log_debug)
4141 log_print("invalid lockspace %x from %d cmd %d type %d",
4142 hd->h_lockspace, nodeid, hd->h_cmd, type);
4226 if (dlm_config.ci_log_debug) {
4227 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4228 "%u from %d cmd %d type %d\n",
4229 hd->h_lockspace, nodeid, hd->h_cmd, type);
4230 }
4143
4144 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4145 dlm_send_ls_not_ready(nodeid, &p->rcom);
4146 return;
4147 }
4148
4149 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4150 be inactive (in this ls) before transitioning to recovery mode */

--- 31 unchanged lines hidden (view full) ---

4182
4183 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4184 conversions are async; there's no reply from the remote master */
4185}
4186
4187/* A waiting lkb needs recovery if the master node has failed, or
4188 the master node is changing (only when no directory is used) */
4189
4231
4232 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4233 dlm_send_ls_not_ready(nodeid, &p->rcom);
4234 return;
4235 }
4236
4237 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4238 be inactive (in this ls) before transitioning to recovery mode */

--- 31 unchanged lines hidden (view full) ---

4270
4271 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4272 conversions are async; there's no reply from the remote master */
4273}
4274
4275/* A waiting lkb needs recovery if the master node has failed, or
4276 the master node is changing (only when no directory is used) */
4277
4190static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4278static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
4279 int dir_nodeid)
4191{
4280{
4192 if (dlm_is_removed(ls, lkb->lkb_nodeid))
4281 if (dlm_no_directory(ls))
4193 return 1;
4194
4282 return 1;
4283
4195 if (!dlm_no_directory(ls))
4196 return 0;
4197
4198 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4284 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
4199 return 1;
4200
4201 return 0;
4202}
4203
4204/* Recovery for locks that are waiting for replies from nodes that are now
4205 gone. We can just complete unlocks and cancels by faking a reply from the
4206 dead node. Requests and up-conversions we flag to be resent after
4207 recovery. Down-conversions can just be completed with a fake reply like
4208 unlocks. Conversions between PR and CW need special attention. */
4209
4210void dlm_recover_waiters_pre(struct dlm_ls *ls)
4211{
4212 struct dlm_lkb *lkb, *safe;
4213 struct dlm_message *ms_stub;
4214 int wait_type, stub_unlock_result, stub_cancel_result;
4285 return 1;
4286
4287 return 0;
4288}
4289
4290/* Recovery for locks that are waiting for replies from nodes that are now
4291 gone. We can just complete unlocks and cancels by faking a reply from the
4292 dead node. Requests and up-conversions we flag to be resent after
4293 recovery. Down-conversions can just be completed with a fake reply like
4294 unlocks. Conversions between PR and CW need special attention. */
4295
4296void dlm_recover_waiters_pre(struct dlm_ls *ls)
4297{
4298 struct dlm_lkb *lkb, *safe;
4299 struct dlm_message *ms_stub;
4300 int wait_type, stub_unlock_result, stub_cancel_result;
4301 int dir_nodeid;
4215
4216 ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
4217 if (!ms_stub) {
4218 log_error(ls, "dlm_recover_waiters_pre no mem");
4219 return;
4220 }
4221
4222 mutex_lock(&ls->ls_waiters_mutex);
4223
4224 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4225
4302
4303 ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
4304 if (!ms_stub) {
4305 log_error(ls, "dlm_recover_waiters_pre no mem");
4306 return;
4307 }
4308
4309 mutex_lock(&ls->ls_waiters_mutex);
4310
4311 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4312
4313 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
4314
4226 /* exclude debug messages about unlocks because there can be so
4227 many and they aren't very interesting */
4228
4229 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4315 /* exclude debug messages about unlocks because there can be so
4316 many and they aren't very interesting */
4317
4318 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4230 log_debug(ls, "recover_waiter %x nodeid %d "
4231 "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
4232 lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
4319 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
4320 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
4321 lkb->lkb_id,
4322 lkb->lkb_remid,
4323 lkb->lkb_wait_type,
4324 lkb->lkb_resource->res_nodeid,
4325 lkb->lkb_nodeid,
4326 lkb->lkb_wait_nodeid,
4327 dir_nodeid);
4233 }
4234
4235 /* all outstanding lookups, regardless of destination will be
4236 resent after recovery is done */
4237
4238 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4239 lkb->lkb_flags |= DLM_IFL_RESEND;
4240 continue;
4241 }
4242
4328 }
4329
4330 /* all outstanding lookups, regardless of destination will be
4331 resent after recovery is done */
4332
4333 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4334 lkb->lkb_flags |= DLM_IFL_RESEND;
4335 continue;
4336 }
4337
4243 if (!waiter_needs_recovery(ls, lkb))
4338 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
4244 continue;
4245
4246 wait_type = lkb->lkb_wait_type;
4247 stub_unlock_result = -DLM_EUNLOCK;
4248 stub_cancel_result = -DLM_ECANCEL;
4249
4250 /* Main reply may have been received leaving a zero wait_type,
4251 but a reply for the overlapping op may not have been

--- 116 unchanged lines hidden (view full) ---

4368 hold_rsb(r);
4369 lock_rsb(r);
4370
4371 mstype = lkb->lkb_wait_type;
4372 oc = is_overlap_cancel(lkb);
4373 ou = is_overlap_unlock(lkb);
4374 err = 0;
4375
4339 continue;
4340
4341 wait_type = lkb->lkb_wait_type;
4342 stub_unlock_result = -DLM_EUNLOCK;
4343 stub_cancel_result = -DLM_ECANCEL;
4344
4345 /* Main reply may have been received leaving a zero wait_type,
4346 but a reply for the overlapping op may not have been

--- 116 unchanged lines hidden (view full) ---

4463 hold_rsb(r);
4464 lock_rsb(r);
4465
4466 mstype = lkb->lkb_wait_type;
4467 oc = is_overlap_cancel(lkb);
4468 ou = is_overlap_unlock(lkb);
4469 err = 0;
4470
4376 log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
4377 lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
4471 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
4472 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
4473 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
4474 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
4475 dlm_dir_nodeid(r), oc, ou);
4378
4379 /* At this point we assume that we won't get a reply to any
4380 previous op or overlap op on this lock. First, do a big
4381 remove_from_waiters() for all previous ops. */
4382
4383 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4384 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4385 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;

--- 35 unchanged lines hidden (view full) ---

4421 case DLM_MSG_CONVERT:
4422 _convert_lock(r, lkb);
4423 break;
4424 default:
4425 err = 1;
4426 }
4427 }
4428
4476
4477 /* At this point we assume that we won't get a reply to any
4478 previous op or overlap op on this lock. First, do a big
4479 remove_from_waiters() for all previous ops. */
4480
4481 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4482 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4483 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;

--- 35 unchanged lines hidden (view full) ---

4519 case DLM_MSG_CONVERT:
4520 _convert_lock(r, lkb);
4521 break;
4522 default:
4523 err = 1;
4524 }
4525 }
4526
4429 if (err)
4430 log_error(ls, "recover_waiters_post %x %d %x %d %d",
4431 lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4527 if (err) {
4528 log_error(ls, "waiter %x msg %d r_nodeid %d "
4529 "dir_nodeid %d overlap %d %d",
4530 lkb->lkb_id, mstype, r->res_nodeid,
4531 dlm_dir_nodeid(r), oc, ou);
4532 }
4432 unlock_rsb(r);
4433 put_rsb(r);
4434 dlm_put_lkb(lkb);
4435 }
4436
4437 return error;
4438}
4439
4533 unlock_rsb(r);
4534 put_rsb(r);
4535 dlm_put_lkb(lkb);
4536 }
4537
4538 return error;
4539}
4540
4440static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4441 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4541static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
4542 struct list_head *list)
4442{
4543{
4443 struct dlm_ls *ls = r->res_ls;
4444 struct dlm_lkb *lkb, *safe;
4445
4544 struct dlm_lkb *lkb, *safe;
4545
4446 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4447 if (test(ls, lkb)) {
4448 rsb_set_flag(r, RSB_LOCKS_PURGED);
4449 del_lkb(r, lkb);
4450 /* this put should free the lkb */
4451 if (!dlm_put_lkb(lkb))
4452 log_error(ls, "purged lkb not released");
4453 }
4546 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
4547 if (!is_master_copy(lkb))
4548 continue;
4549
4550 /* don't purge lkbs we've added in recover_master_copy for
4551 the current recovery seq */
4552
4553 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
4554 continue;
4555
4556 del_lkb(r, lkb);
4557
4558 /* this put should free the lkb */
4559 if (!dlm_put_lkb(lkb))
4560 log_error(ls, "purged mstcpy lkb not released");
4454 }
4455}
4456
4561 }
4562}
4563
4457static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4564void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4458{
4565{
4459 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4460}
4566 struct dlm_ls *ls = r->res_ls;
4461
4567
4462static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4463{
4464 return is_master_copy(lkb);
4568 purge_mstcpy_list(ls, r, &r->res_grantqueue);
4569 purge_mstcpy_list(ls, r, &r->res_convertqueue);
4570 purge_mstcpy_list(ls, r, &r->res_waitqueue);
4465}
4466
4571}
4572
4467static void purge_dead_locks(struct dlm_rsb *r)
4573static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
4574 struct list_head *list,
4575 int nodeid_gone, unsigned int *count)
4468{
4576{
4469 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4470 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4471 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4472}
4577 struct dlm_lkb *lkb, *safe;
4473
4578
4474void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4475{
4476 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4477 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4478 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4579 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
4580 if (!is_master_copy(lkb))
4581 continue;
4582
4583 if ((lkb->lkb_nodeid == nodeid_gone) ||
4584 dlm_is_removed(ls, lkb->lkb_nodeid)) {
4585
4586 del_lkb(r, lkb);
4587
4588 /* this put should free the lkb */
4589 if (!dlm_put_lkb(lkb))
4590 log_error(ls, "purged dead lkb not released");
4591
4592 rsb_set_flag(r, RSB_RECOVER_GRANT);
4593
4594 (*count)++;
4595 }
4596 }
4479}
4480
4481/* Get rid of locks held by nodes that are gone. */
4482
4597}
4598
4599/* Get rid of locks held by nodes that are gone. */
4600
4483int dlm_purge_locks(struct dlm_ls *ls)
4601void dlm_recover_purge(struct dlm_ls *ls)
4484{
4485 struct dlm_rsb *r;
4602{
4603 struct dlm_rsb *r;
4604 struct dlm_member *memb;
4605 int nodes_count = 0;
4606 int nodeid_gone = 0;
4607 unsigned int lkb_count = 0;
4486
4608
4487 log_debug(ls, "dlm_purge_locks");
4609 /* cache one removed nodeid to optimize the common
4610 case of a single node removed */
4488
4611
4612 list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
4613 nodes_count++;
4614 nodeid_gone = memb->nodeid;
4615 }
4616
4617 if (!nodes_count)
4618 return;
4619
4489 down_write(&ls->ls_root_sem);
4490 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4491 hold_rsb(r);
4492 lock_rsb(r);
4620 down_write(&ls->ls_root_sem);
4621 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4622 hold_rsb(r);
4623 lock_rsb(r);
4493 if (is_master(r))
4494 purge_dead_locks(r);
4624 if (is_master(r)) {
4625 purge_dead_list(ls, r, &r->res_grantqueue,
4626 nodeid_gone, &lkb_count);
4627 purge_dead_list(ls, r, &r->res_convertqueue,
4628 nodeid_gone, &lkb_count);
4629 purge_dead_list(ls, r, &r->res_waitqueue,
4630 nodeid_gone, &lkb_count);
4631 }
4495 unlock_rsb(r);
4496 unhold_rsb(r);
4632 unlock_rsb(r);
4633 unhold_rsb(r);
4497
4498 schedule();
4634 cond_resched();
4499 }
4500 up_write(&ls->ls_root_sem);
4501
4635 }
4636 up_write(&ls->ls_root_sem);
4637
4502 return 0;
4638 if (lkb_count)
4639 log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
4640 lkb_count, nodes_count);
4503}
4504
4641}
4642
4505static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4643static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
4506{
4507 struct rb_node *n;
4644{
4645 struct rb_node *n;
4508 struct dlm_rsb *r, *r_ret = NULL;
4646 struct dlm_rsb *r;
4509
4510 spin_lock(&ls->ls_rsbtbl[bucket].lock);
4511 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
4512 r = rb_entry(n, struct dlm_rsb, res_hashnode);
4647
4648 spin_lock(&ls->ls_rsbtbl[bucket].lock);
4649 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
4650 r = rb_entry(n, struct dlm_rsb, res_hashnode);
4513 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4651
4652 if (!rsb_flag(r, RSB_RECOVER_GRANT))
4514 continue;
4653 continue;
4654 rsb_clear_flag(r, RSB_RECOVER_GRANT);
4655 if (!is_master(r))
4656 continue;
4515 hold_rsb(r);
4657 hold_rsb(r);
4516 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4517 r_ret = r;
4518 break;
4658 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4659 return r;
4519 }
4520 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4660 }
4661 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4521 return r_ret;
4662 return NULL;
4522}
4523
4663}
4664
4524void dlm_grant_after_purge(struct dlm_ls *ls)
4665/*
4666 * Attempt to grant locks on resources that we are the master of.
4667 * Locks may have become grantable during recovery because locks
4668 * from departed nodes have been purged (or not rebuilt), allowing
4669 * previously blocked locks to now be granted. The subset of rsb's
4670 * we are interested in are those with lkb's on either the convert or
4671 * waiting queues.
4672 *
4673 * Simplest would be to go through each master rsb and check for non-empty
4674 * convert or waiting queues, and attempt to grant on those rsbs.
4675 * Checking the queues requires lock_rsb, though, for which we'd need
4676 * to release the rsbtbl lock. This would make iterating through all
4677 * rsb's very inefficient. So, we rely on earlier recovery routines
4678 * to set RECOVER_GRANT on any rsb's that we should attempt to grant
4679 * locks for.
4680 */
4681
4682void dlm_recover_grant(struct dlm_ls *ls)
4525{
4526 struct dlm_rsb *r;
4527 int bucket = 0;
4683{
4684 struct dlm_rsb *r;
4685 int bucket = 0;
4686 unsigned int count = 0;
4687 unsigned int rsb_count = 0;
4688 unsigned int lkb_count = 0;
4528
4529 while (1) {
4689
4690 while (1) {
4530 r = find_purged_rsb(ls, bucket);
4691 r = find_grant_rsb(ls, bucket);
4531 if (!r) {
4532 if (bucket == ls->ls_rsbtbl_size - 1)
4533 break;
4534 bucket++;
4535 continue;
4536 }
4692 if (!r) {
4693 if (bucket == ls->ls_rsbtbl_size - 1)
4694 break;
4695 bucket++;
4696 continue;
4697 }
4698 rsb_count++;
4699 count = 0;
4537 lock_rsb(r);
4700 lock_rsb(r);
4538 if (is_master(r)) {
4539 grant_pending_locks(r);
4540 confirm_master(r, 0);
4541 }
4701 grant_pending_locks(r, &count);
4702 lkb_count += count;
4703 confirm_master(r, 0);
4542 unlock_rsb(r);
4543 put_rsb(r);
4704 unlock_rsb(r);
4705 put_rsb(r);
4544 schedule();
4706 cond_resched();
4545 }
4707 }
4708
4709 if (lkb_count)
4710 log_debug(ls, "dlm_recover_grant %u locks on %u resources",
4711 lkb_count, rsb_count);
4546}
4547
4548static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4549 uint32_t remid)
4550{
4551 struct dlm_lkb *lkb;
4552
4553 list_for_each_entry(lkb, head, lkb_statequeue) {

--- 72 unchanged lines hidden (view full) ---

4626 back the rcom_lock struct we got but with the remid field filled in. */
4627
4628/* needs at least dlm_rcom + rcom_lock */
4629int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4630{
4631 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4632 struct dlm_rsb *r;
4633 struct dlm_lkb *lkb;
4712}
4713
4714static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4715 uint32_t remid)
4716{
4717 struct dlm_lkb *lkb;
4718
4719 list_for_each_entry(lkb, head, lkb_statequeue) {

--- 72 unchanged lines hidden (view full) ---

4792 back the rcom_lock struct we got but with the remid field filled in. */
4793
4794/* needs at least dlm_rcom + rcom_lock */
4795int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4796{
4797 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4798 struct dlm_rsb *r;
4799 struct dlm_lkb *lkb;
4800 uint32_t remid = 0;
4634 int error;
4635
4636 if (rl->rl_parent_lkid) {
4637 error = -EOPNOTSUPP;
4638 goto out;
4639 }
4640
4801 int error;
4802
4803 if (rl->rl_parent_lkid) {
4804 error = -EOPNOTSUPP;
4805 goto out;
4806 }
4807
4641 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4642 R_MASTER, &r);
4808 remid = le32_to_cpu(rl->rl_lkid);
4809
4810 /* In general we expect the rsb returned to be R_MASTER, but we don't
4811 have to require it. Recovery of masters on one node can overlap
4812 recovery of locks on another node, so one node can send us MSTCPY
4813 locks before we've made ourselves master of this rsb. We can still
4814 add new MSTCPY locks that we receive here without any harm; when
4815 we make ourselves master, dlm_recover_masters() won't touch the
4816 MSTCPY locks we've received early. */
4817
4818 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r);
4643 if (error)
4644 goto out;
4645
4819 if (error)
4820 goto out;
4821
4822 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
4823 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
4824 rc->rc_header.h_nodeid, remid);
4825 error = -EBADR;
4826 put_rsb(r);
4827 goto out;
4828 }
4829
4646 lock_rsb(r);
4647
4830 lock_rsb(r);
4831
4648 lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4832 lkb = search_remid(r, rc->rc_header.h_nodeid, remid);
4649 if (lkb) {
4650 error = -EEXIST;
4651 goto out_remid;
4652 }
4653
4654 error = create_lkb(ls, &lkb);
4655 if (error)
4656 goto out_unlock;
4657
4658 error = receive_rcom_lock_args(ls, lkb, r, rc);
4659 if (error) {
4660 __put_lkb(ls, lkb);
4661 goto out_unlock;
4662 }
4663
4664 attach_lkb(r, lkb);
4665 add_lkb(r, lkb, rl->rl_status);
4666 error = 0;
4833 if (lkb) {
4834 error = -EEXIST;
4835 goto out_remid;
4836 }
4837
4838 error = create_lkb(ls, &lkb);
4839 if (error)
4840 goto out_unlock;
4841
4842 error = receive_rcom_lock_args(ls, lkb, r, rc);
4843 if (error) {
4844 __put_lkb(ls, lkb);
4845 goto out_unlock;
4846 }
4847
4848 attach_lkb(r, lkb);
4849 add_lkb(r, lkb, rl->rl_status);
4850 error = 0;
4851 ls->ls_recover_locks_in++;
4667
4852
4853 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
4854 rsb_set_flag(r, RSB_RECOVER_GRANT);
4855
4668 out_remid:
4669 /* this is the new value returned to the lock holder for
4670 saving in its process-copy lkb */
4671 rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4672
4856 out_remid:
4857 /* this is the new value returned to the lock holder for
4858 saving in its process-copy lkb */
4859 rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4860
4861 lkb->lkb_recover_seq = ls->ls_recover_seq;
4862
4673 out_unlock:
4674 unlock_rsb(r);
4675 put_rsb(r);
4676 out:
4863 out_unlock:
4864 unlock_rsb(r);
4865 put_rsb(r);
4866 out:
4677 if (error)
4678 log_debug(ls, "recover_master_copy %d %x", error,
4679 le32_to_cpu(rl->rl_lkid));
4867 if (error && error != -EEXIST)
4868 log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
4869 rc->rc_header.h_nodeid, remid, error);
4680 rl->rl_result = cpu_to_le32(error);
4681 return error;
4682}
4683
4684/* needs at least dlm_rcom + rcom_lock */
4685int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4686{
4687 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4688 struct dlm_rsb *r;
4689 struct dlm_lkb *lkb;
4870 rl->rl_result = cpu_to_le32(error);
4871 return error;
4872}
4873
4874/* needs at least dlm_rcom + rcom_lock */
4875int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4876{
4877 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4878 struct dlm_rsb *r;
4879 struct dlm_lkb *lkb;
4690 int error;
4880 uint32_t lkid, remid;
4881 int error, result;
4691
4882
4692 error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4883 lkid = le32_to_cpu(rl->rl_lkid);
4884 remid = le32_to_cpu(rl->rl_remid);
4885 result = le32_to_cpu(rl->rl_result);
4886
4887 error = find_lkb(ls, lkid, &lkb);
4693 if (error) {
4888 if (error) {
4694 log_error(ls, "recover_process_copy no lkid %x",
4695 le32_to_cpu(rl->rl_lkid));
4889 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
4890 lkid, rc->rc_header.h_nodeid, remid, result);
4696 return error;
4697 }
4698
4891 return error;
4892 }
4893
4699 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4700
4701 error = le32_to_cpu(rl->rl_result);
4702
4703 r = lkb->lkb_resource;
4704 hold_rsb(r);
4705 lock_rsb(r);
4706
4894 r = lkb->lkb_resource;
4895 hold_rsb(r);
4896 lock_rsb(r);
4897
4707 switch (error) {
4898 if (!is_process_copy(lkb)) {
4899 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
4900 lkid, rc->rc_header.h_nodeid, remid, result);
4901 dlm_dump_rsb(r);
4902 unlock_rsb(r);
4903 put_rsb(r);
4904 dlm_put_lkb(lkb);
4905 return -EINVAL;
4906 }
4907
4908 switch (result) {
4708 case -EBADR:
4709 /* There's a chance the new master received our lock before
4710 dlm_recover_master_reply(), this wouldn't happen if we did
4711 a barrier between recover_masters and recover_locks. */
4909 case -EBADR:
4910 /* There's a chance the new master received our lock before
4911 dlm_recover_master_reply(), this wouldn't happen if we did
4912 a barrier between recover_masters and recover_locks. */
4712 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4713 (unsigned long)r, r->res_name);
4913
4914 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
4915 lkid, rc->rc_header.h_nodeid, remid, result);
4916
4714 dlm_send_rcom_lock(r, lkb);
4715 goto out;
4716 case -EEXIST:
4917 dlm_send_rcom_lock(r, lkb);
4918 goto out;
4919 case -EEXIST:
4717 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4718 /* fall through */
4719 case 0:
4920 case 0:
4720 lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4921 lkb->lkb_remid = remid;
4721 break;
4722 default:
4922 break;
4923 default:
4723 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4724 error, lkb->lkb_id);
4924 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
4925 lkid, rc->rc_header.h_nodeid, remid, result);
4725 }
4726
4727 /* an ack for dlm_recover_locks() which waits for replies from
4728 all the locks it sends to new masters */
4729 dlm_recovered_lock(r);
4730 out:
4731 unlock_rsb(r);
4732 put_rsb(r);

--- 465 unchanged lines hidden ---
4926 }
4927
4928 /* an ack for dlm_recover_locks() which waits for replies from
4929 all the locks it sends to new masters */
4930 dlm_recovered_lock(r);
4931 out:
4932 unlock_rsb(r);
4933 put_rsb(r);

--- 465 unchanged lines hidden ---