// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2010 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <trace/events/dlm.h>

#include "dlm_internal.h"
#include "lvb_table.h"
#include "memory.h"
#include "lock.h"
#include "user.h"
#include "ast.h"

/*
 * Workqueue handler that delivers one queued lock callback: either a
 * blocking callback (bast) or a completion callback (cast).  The
 * dlm_callback holds private copies of everything it needs, so no lkb
 * or rsb locks are taken here.  The callback is freed after delivery.
 */
static void dlm_callback_work(struct work_struct *work)
{
	struct dlm_callback *cb = container_of(work, struct dlm_callback, work);

	if (cb->flags & DLM_CB_BAST) {
		trace_dlm_bast(cb->ls_id, cb->lkb_id, cb->mode, cb->res_name,
			       cb->res_length);
		cb->bastfn(cb->astparam, cb->mode);
	} else if (cb->flags & DLM_CB_CAST) {
		trace_dlm_ast(cb->ls_id, cb->lkb_id, cb->sb_status,
			      cb->sb_flags, cb->res_name, cb->res_length);
		/* publish status/flags in the caller-visible lksb before
		 * invoking the completion ast
		 */
		cb->lkb_lksb->sb_status = cb->sb_status;
		cb->lkb_lksb->sb_flags = cb->sb_flags;
		cb->astfn(cb->astparam);
	}

	dlm_free_cb(cb);
}

/*
 * Build a callback entry for an ast/bast event on @lkb, suppressing
 * redundant basts along the way.  Per-lkb "last callback" bookkeeping
 * is updated here, so callers are expected to serialize calls for a
 * given lkb (NOTE(review): no lock is taken in this function itself —
 * confirm against callers).
 *
 * Returns:
 *   DLM_ENQUEUE_CALLBACK_NEED_SCHED - *cb was allocated and filled in;
 *       the caller must arrange for it to be delivered.
 *   DLM_ENQUEUE_CALLBACK_SUCCESS - the event was deliberately skipped
 *       (redundant bast); nothing to deliver, *cb untouched.
 *   DLM_ENQUEUE_CALLBACK_FAILURE - callback allocation failed.
 */
int dlm_queue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
			   int status, uint32_t sbflags,
			   struct dlm_callback **cb)
{
	struct dlm_rsb *rsb = lkb->lkb_resource;
	int rv = DLM_ENQUEUE_CALLBACK_SUCCESS;
	struct dlm_ls *ls = rsb->res_ls;
	int copy_lvb = 0;
	int prev_mode;

	if (flags & DLM_CB_BAST) {
		/* if cb is a bast, it should be skipped if the blocking mode is
		 * compatible with the last granted mode
		 */
		if (lkb->lkb_last_cast_cb_mode != -1) {
			if (dlm_modes_compat(mode, lkb->lkb_last_cast_cb_mode)) {
				log_debug(ls, "skip %x bast mode %d for cast mode %d",
					  lkb->lkb_id, mode,
					  lkb->lkb_last_cast_cb_mode);
				goto out;
			}
		}

		/*
		 * Suppress some redundant basts here, do more on removal.
		 * Don't even add a bast if the callback just before it
		 * is a bast for the same mode or a more restrictive mode.
		 * (the additional > PR check is needed for PR/CW inversion)
		 */
		if (lkb->lkb_last_cb_mode != -1 &&
		    lkb->lkb_last_cb_flags & DLM_CB_BAST) {
			prev_mode = lkb->lkb_last_cb_mode;

			if ((prev_mode == mode) ||
			    (prev_mode > mode && prev_mode > DLM_LOCK_PR)) {
				log_debug(ls, "skip %x add bast mode %d for bast mode %d",
					  lkb->lkb_id, mode, prev_mode);
				goto out;
			}
		}

		lkb->lkb_last_bast_time = ktime_get();
		lkb->lkb_last_bast_cb_mode = mode;
	} else if (flags & DLM_CB_CAST) {
		if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
			/* user locks: decide whether the LVB must be copied
			 * out for this previous-mode/new-mode transition
			 * (lvb_table is indexed with a +1 offset so that a
			 * "no previous cast" value of -1 maps to row 0)
			 */
			prev_mode = lkb->lkb_last_cast_cb_mode;

			if (!status && lkb->lkb_lksb->sb_lvbptr &&
			    dlm_lvb_operations[prev_mode + 1][mode + 1])
				copy_lvb = 1;
		}

		lkb->lkb_last_cast_cb_mode = mode;
		lkb->lkb_last_cast_time = ktime_get();
	}

	/* remember the most recent callback of either kind for the
	 * bast-suppression checks above
	 */
	lkb->lkb_last_cb_mode = mode;
	lkb->lkb_last_cb_flags = flags;

	*cb = dlm_allocate_cb();
	if (!*cb) {
		rv = DLM_ENQUEUE_CALLBACK_FAILURE;
		goto out;
	}

	/* for tracing */
	(*cb)->lkb_id = lkb->lkb_id;
	(*cb)->ls_id = ls->ls_global_id;
	memcpy((*cb)->res_name, rsb->res_name, rsb->res_length);
	(*cb)->res_length = rsb->res_length;

	(*cb)->flags = flags;
	(*cb)->mode = mode;
	(*cb)->sb_status = status;
	(*cb)->sb_flags = (sbflags & 0x000000FF);
	(*cb)->copy_lvb = copy_lvb;
	(*cb)->lkb_lksb = lkb->lkb_lksb;

	rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED;

out:
	return rv;
}

/*
 * Queue an ast/bast callback for @lkb.  User-space locks are handed
 * off to dlm_user_add_ast(); kernel locks get a dlm_callback that is
 * either queued on the callback workqueue or, while the lockspace is
 * suspended for recovery (LSFL_CB_DELAY set), parked on ls_cb_delay
 * to be flushed by dlm_callback_resume().
 */
void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
		uint32_t sbflags)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	struct dlm_callback *cb;
	int rv;

	if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
		dlm_user_add_ast(lkb, flags, mode, status, sbflags);
		return;
	}

	rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags,
				    &cb);
	switch (rv) {
	case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
		cb->astfn = lkb->lkb_astfn;
		cb->bastfn = lkb->lkb_bastfn;
		cb->astparam = lkb->lkb_astparam;
		INIT_WORK(&cb->work, dlm_callback_work);

		/* ls_cb_lock serializes the LSFL_CB_DELAY check against
		 * suspend/resume so a callback cannot slip past a drain
		 */
		spin_lock_bh(&ls->ls_cb_lock);
		if (test_bit(LSFL_CB_DELAY, &ls->ls_flags))
			list_add(&cb->list, &ls->ls_cb_delay);
		else
			queue_work(ls->ls_callback_wq, &cb->work);
		spin_unlock_bh(&ls->ls_cb_lock);
		break;
	case DLM_ENQUEUE_CALLBACK_SUCCESS:
		/* redundant bast was skipped; nothing to schedule */
		break;
	case DLM_ENQUEUE_CALLBACK_FAILURE:
		fallthrough;
	default:
		/* allocation failure (or unknown rv): the callback is
		 * dropped; warn once rather than crash
		 */
		WARN_ON_ONCE(1);
		break;
	}
}

/*
 * Create the per-lockspace callback workqueue.  Ordered so callbacks
 * for a lockspace are delivered one at a time, in queueing order.
 * Returns 0 on success or -ENOMEM.
 */
int dlm_callback_start(struct dlm_ls *ls)
{
	ls->ls_callback_wq = alloc_ordered_workqueue("dlm_callback",
						     WQ_HIGHPRI | WQ_MEM_RECLAIM);
	if (!ls->ls_callback_wq) {
		log_print("can't start dlm_callback workqueue");
		return -ENOMEM;
	}
	return 0;
}

/* Tear down the callback workqueue (flushes any remaining work). */
void dlm_callback_stop(struct dlm_ls *ls)
{
	if (ls->ls_callback_wq)
		destroy_workqueue(ls->ls_callback_wq);
}

/*
 * Stop delivering callbacks for the duration of recovery: set
 * LSFL_CB_DELAY so new callbacks are parked on ls_cb_delay, then
 * flush the workqueue so in-flight callbacks finish before return.
 */
void dlm_callback_suspend(struct dlm_ls *ls)
{
	if (ls->ls_callback_wq) {
		spin_lock_bh(&ls->ls_cb_lock);
		set_bit(LSFL_CB_DELAY, &ls->ls_flags);
		spin_unlock_bh(&ls->ls_cb_lock);

		flush_workqueue(ls->ls_callback_wq);
	}
}

/* max callbacks requeued per lock/reschedule cycle in resume */
#define MAX_CB_QUEUE 25

/*
 * Re-enable callback delivery after recovery: drain ls_cb_delay onto
 * the workqueue in batches of MAX_CB_QUEUE, dropping the spinlock and
 * rescheduling between batches so a long backlog does not hog the CPU.
 * LSFL_CB_DELAY is cleared only once the delay list is empty, under
 * ls_cb_lock, so no concurrent dlm_add_cb() can miss the drain.
 */
void dlm_callback_resume(struct dlm_ls *ls)
{
	struct dlm_callback *cb, *safe;
	int count = 0, sum = 0;
	bool empty;

	if (!ls->ls_callback_wq)
		return;

more:
	spin_lock_bh(&ls->ls_cb_lock);
	list_for_each_entry_safe(cb, safe, &ls->ls_cb_delay, list) {
		list_del(&cb->list);
		queue_work(ls->ls_callback_wq, &cb->work);
		count++;
		if (count == MAX_CB_QUEUE)
			break;
	}
	empty = list_empty(&ls->ls_cb_delay);
	if (empty)
		clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
	spin_unlock_bh(&ls->ls_cb_lock);

	sum += count;
	if (!empty) {
		count = 0;
		cond_resched();
		goto more;
	}

	if (sum)
		log_rinfo(ls, "%s %d", __func__, sum);
}