xref: /linux/fs/ocfs2/dlmfs/userdlm.c (revision a44e4f3ab16bc808590763a543a93b6fbf3abcc4)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* -*- mode: c; c-basic-offset: 8; -*-
3  * vim: noexpandtab sw=8 ts=8 sts=0:
4  *
5  * userdlm.c
6  *
7  * Code which implements the kernel side of a minimal userspace
8  * interface to our DLM.
9  *
10  * Many of the functions here are pared down versions of dlmglue.c
11  * functions.
12  *
13  * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
14  */
15 
16 #include <linux/signal.h>
17 #include <linux/sched/signal.h>
18 
19 #include <linux/module.h>
20 #include <linux/fs.h>
21 #include <linux/types.h>
22 #include <linux/crc32.h>
23 
24 #include "ocfs2_lockingver.h"
25 #include "stackglue.h"
26 #include "userdlm.h"
27 
28 #define MLOG_MASK_PREFIX ML_DLMFS
29 #include "cluster/masklog.h"
30 
31 
32 static inline struct user_lock_res *user_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
33 {
34 	return container_of(lksb, struct user_lock_res, l_lksb);
35 }
36 
37 static inline int user_check_wait_flag(struct user_lock_res *lockres,
38 				       int flag)
39 {
40 	int ret;
41 
42 	spin_lock(&lockres->l_lock);
43 	ret = lockres->l_flags & flag;
44 	spin_unlock(&lockres->l_lock);
45 
46 	return ret;
47 }
48 
49 static inline void user_wait_on_busy_lock(struct user_lock_res *lockres)
50 
51 {
52 	wait_event(lockres->l_event,
53 		   !user_check_wait_flag(lockres, USER_LOCK_BUSY));
54 }
55 
56 static inline void user_wait_on_blocked_lock(struct user_lock_res *lockres)
57 
58 {
59 	wait_event(lockres->l_event,
60 		   !user_check_wait_flag(lockres, USER_LOCK_BLOCKED));
61 }
62 
63 /* I heart container_of... */
64 static inline struct ocfs2_cluster_connection *
65 cluster_connection_from_user_lockres(struct user_lock_res *lockres)
66 {
67 	struct dlmfs_inode_private *ip;
68 
69 	ip = container_of(lockres,
70 			  struct dlmfs_inode_private,
71 			  ip_lockres);
72 	return ip->ip_conn;
73 }
74 
75 static struct inode *
76 user_dlm_inode_from_user_lockres(struct user_lock_res *lockres)
77 {
78 	struct dlmfs_inode_private *ip;
79 
80 	ip = container_of(lockres,
81 			  struct dlmfs_inode_private,
82 			  ip_lockres);
83 	return &ip->ip_vfs_inode;
84 }
85 
86 static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
87 {
88 	spin_lock(&lockres->l_lock);
89 	lockres->l_flags &= ~USER_LOCK_BUSY;
90 	spin_unlock(&lockres->l_lock);
91 }
92 
/*
 * Log a DLM failure: @_func is the name of the failing call (string),
 * @_stat its integer status and @_lockres the affected user_lock_res.
 * Arguments are parenthesized so that arbitrary expressions may be
 * passed safely.
 */
#define user_log_dlm_error(_func, _stat, _lockres) do {			\
	mlog(ML_ERROR, "Dlm error %d while calling %s on "		\
		"resource %.*s\n", (_stat), (_func),			\
		(_lockres)->l_namelen, (_lockres)->l_name);		\
} while (0)
98 
99 /* WARNING: This function lives in a world where the only three lock
100  * levels are EX, PR, and NL. It *will* have to be adjusted when more
101  * lock types are added. */
102 static inline int user_highest_compat_lock_level(int level)
103 {
104 	int new_level = DLM_LOCK_EX;
105 
106 	if (level == DLM_LOCK_EX)
107 		new_level = DLM_LOCK_NL;
108 	else if (level == DLM_LOCK_PR)
109 		new_level = DLM_LOCK_PR;
110 	return new_level;
111 }
112 
113 static void user_ast(struct ocfs2_dlm_lksb *lksb)
114 {
115 	struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
116 	int status;
117 
118 	mlog(ML_BASTS, "AST fired for lockres %.*s, level %d => %d\n",
119 	     lockres->l_namelen, lockres->l_name, lockres->l_level,
120 	     lockres->l_requested);
121 
122 	spin_lock(&lockres->l_lock);
123 
124 	status = ocfs2_dlm_lock_status(&lockres->l_lksb);
125 	if (status) {
126 		mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n",
127 		     status, lockres->l_namelen, lockres->l_name);
128 		spin_unlock(&lockres->l_lock);
129 		return;
130 	}
131 
132 	mlog_bug_on_msg(lockres->l_requested == DLM_LOCK_IV,
133 			"Lockres %.*s, requested ivmode. flags 0x%x\n",
134 			lockres->l_namelen, lockres->l_name, lockres->l_flags);
135 
136 	/* we're downconverting. */
137 	if (lockres->l_requested < lockres->l_level) {
138 		if (lockres->l_requested <=
139 		    user_highest_compat_lock_level(lockres->l_blocking)) {
140 			lockres->l_blocking = DLM_LOCK_NL;
141 			lockres->l_flags &= ~USER_LOCK_BLOCKED;
142 		}
143 	}
144 
145 	lockres->l_level = lockres->l_requested;
146 	lockres->l_requested = DLM_LOCK_IV;
147 	lockres->l_flags |= USER_LOCK_ATTACHED;
148 	lockres->l_flags &= ~USER_LOCK_BUSY;
149 
150 	spin_unlock(&lockres->l_lock);
151 
152 	wake_up(&lockres->l_event);
153 }
154 
155 static inline void user_dlm_grab_inode_ref(struct user_lock_res *lockres)
156 {
157 	struct inode *inode;
158 	inode = user_dlm_inode_from_user_lockres(lockres);
159 	if (!igrab(inode))
160 		BUG();
161 }
162 
163 static void user_dlm_unblock_lock(struct work_struct *work);
164 
165 static void __user_dlm_queue_lockres(struct user_lock_res *lockres)
166 {
167 	if (!(lockres->l_flags & USER_LOCK_QUEUED)) {
168 		user_dlm_grab_inode_ref(lockres);
169 
170 		INIT_WORK(&lockres->l_work, user_dlm_unblock_lock);
171 
172 		queue_work(user_dlm_worker, &lockres->l_work);
173 		lockres->l_flags |= USER_LOCK_QUEUED;
174 	}
175 }
176 
177 static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres)
178 {
179 	int queue = 0;
180 
181 	if (!(lockres->l_flags & USER_LOCK_BLOCKED))
182 		return;
183 
184 	switch (lockres->l_blocking) {
185 	case DLM_LOCK_EX:
186 		if (!lockres->l_ex_holders && !lockres->l_ro_holders)
187 			queue = 1;
188 		break;
189 	case DLM_LOCK_PR:
190 		if (!lockres->l_ex_holders)
191 			queue = 1;
192 		break;
193 	default:
194 		BUG();
195 	}
196 
197 	if (queue)
198 		__user_dlm_queue_lockres(lockres);
199 }
200 
201 static void user_bast(struct ocfs2_dlm_lksb *lksb, int level)
202 {
203 	struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
204 
205 	mlog(ML_BASTS, "BAST fired for lockres %.*s, blocking %d, level %d\n",
206 	     lockres->l_namelen, lockres->l_name, level, lockres->l_level);
207 
208 	spin_lock(&lockres->l_lock);
209 	lockres->l_flags |= USER_LOCK_BLOCKED;
210 	if (level > lockres->l_blocking)
211 		lockres->l_blocking = level;
212 
213 	__user_dlm_queue_lockres(lockres);
214 	spin_unlock(&lockres->l_lock);
215 
216 	wake_up(&lockres->l_event);
217 }
218 
219 static void user_unlock_ast(struct ocfs2_dlm_lksb *lksb, int status)
220 {
221 	struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
222 
223 	mlog(ML_BASTS, "UNLOCK AST fired for lockres %.*s, flags 0x%x\n",
224 	     lockres->l_namelen, lockres->l_name, lockres->l_flags);
225 
226 	if (status)
227 		mlog(ML_ERROR, "dlm returns status %d\n", status);
228 
229 	spin_lock(&lockres->l_lock);
230 	/* The teardown flag gets set early during the unlock process,
231 	 * so test the cancel flag to make sure that this ast isn't
232 	 * for a concurrent cancel. */
233 	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN
234 	    && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) {
235 		lockres->l_level = DLM_LOCK_IV;
236 	} else if (status == DLM_CANCELGRANT) {
237 		/* We tried to cancel a convert request, but it was
238 		 * already granted. Don't clear the busy flag - the
239 		 * ast should've done this already. */
240 		BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
241 		lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
242 		goto out_noclear;
243 	} else {
244 		BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
245 		/* Cancel succeeded, we want to re-queue */
246 		lockres->l_requested = DLM_LOCK_IV; /* cancel an
247 						    * upconvert
248 						    * request. */
249 		lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
250 		/* we want the unblock thread to look at it again
251 		 * now. */
252 		if (lockres->l_flags & USER_LOCK_BLOCKED)
253 			__user_dlm_queue_lockres(lockres);
254 	}
255 
256 	lockres->l_flags &= ~USER_LOCK_BUSY;
257 out_noclear:
258 	spin_unlock(&lockres->l_lock);
259 
260 	wake_up(&lockres->l_event);
261 }
262 
263 /*
264  * This is the userdlmfs locking protocol version.
265  *
266  * See fs/ocfs2/dlmglue.c for more details on locking versions.
267  */
268 static struct ocfs2_locking_protocol user_dlm_lproto = {
269 	.lp_max_version = {
270 		.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
271 		.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
272 	},
273 	.lp_lock_ast		= user_ast,
274 	.lp_blocking_ast	= user_bast,
275 	.lp_unlock_ast		= user_unlock_ast,
276 };
277 
/* Release one reference on the inode that embeds this lockres. */
static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres)
{
	iput(user_dlm_inode_from_user_lockres(lockres));
}
284 
285 static void user_dlm_unblock_lock(struct work_struct *work)
286 {
287 	int new_level, status;
288 	struct user_lock_res *lockres =
289 		container_of(work, struct user_lock_res, l_work);
290 	struct ocfs2_cluster_connection *conn =
291 		cluster_connection_from_user_lockres(lockres);
292 
293 	mlog(0, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);
294 
295 	spin_lock(&lockres->l_lock);
296 
297 	mlog_bug_on_msg(!(lockres->l_flags & USER_LOCK_QUEUED),
298 			"Lockres %.*s, flags 0x%x\n",
299 			lockres->l_namelen, lockres->l_name, lockres->l_flags);
300 
301 	/* notice that we don't clear USER_LOCK_BLOCKED here. If it's
302 	 * set, we want user_ast clear it. */
303 	lockres->l_flags &= ~USER_LOCK_QUEUED;
304 
305 	/* It's valid to get here and no longer be blocked - if we get
306 	 * several basts in a row, we might be queued by the first
307 	 * one, the unblock thread might run and clear the queued
308 	 * flag, and finally we might get another bast which re-queues
309 	 * us before our ast for the downconvert is called. */
310 	if (!(lockres->l_flags & USER_LOCK_BLOCKED)) {
311 		mlog(ML_BASTS, "lockres %.*s USER_LOCK_BLOCKED\n",
312 		     lockres->l_namelen, lockres->l_name);
313 		spin_unlock(&lockres->l_lock);
314 		goto drop_ref;
315 	}
316 
317 	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
318 		mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_TEARDOWN\n",
319 		     lockres->l_namelen, lockres->l_name);
320 		spin_unlock(&lockres->l_lock);
321 		goto drop_ref;
322 	}
323 
324 	if (lockres->l_flags & USER_LOCK_BUSY) {
325 		if (lockres->l_flags & USER_LOCK_IN_CANCEL) {
326 			mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_CANCEL\n",
327 			     lockres->l_namelen, lockres->l_name);
328 			spin_unlock(&lockres->l_lock);
329 			goto drop_ref;
330 		}
331 
332 		lockres->l_flags |= USER_LOCK_IN_CANCEL;
333 		spin_unlock(&lockres->l_lock);
334 
335 		status = ocfs2_dlm_unlock(conn, &lockres->l_lksb,
336 					  DLM_LKF_CANCEL);
337 		if (status)
338 			user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
339 		goto drop_ref;
340 	}
341 
342 	/* If there are still incompat holders, we can exit safely
343 	 * without worrying about re-queueing this lock as that will
344 	 * happen on the last call to user_cluster_unlock. */
345 	if ((lockres->l_blocking == DLM_LOCK_EX)
346 	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
347 		spin_unlock(&lockres->l_lock);
348 		mlog(ML_BASTS, "lockres %.*s, EX/PR Holders %u,%u\n",
349 		     lockres->l_namelen, lockres->l_name,
350 		     lockres->l_ex_holders, lockres->l_ro_holders);
351 		goto drop_ref;
352 	}
353 
354 	if ((lockres->l_blocking == DLM_LOCK_PR)
355 	    && lockres->l_ex_holders) {
356 		spin_unlock(&lockres->l_lock);
357 		mlog(ML_BASTS, "lockres %.*s, EX Holders %u\n",
358 		     lockres->l_namelen, lockres->l_name,
359 		     lockres->l_ex_holders);
360 		goto drop_ref;
361 	}
362 
363 	/* yay, we can downconvert now. */
364 	new_level = user_highest_compat_lock_level(lockres->l_blocking);
365 	lockres->l_requested = new_level;
366 	lockres->l_flags |= USER_LOCK_BUSY;
367 	mlog(ML_BASTS, "lockres %.*s, downconvert %d => %d\n",
368 	     lockres->l_namelen, lockres->l_name, lockres->l_level, new_level);
369 	spin_unlock(&lockres->l_lock);
370 
371 	/* need lock downconvert request now... */
372 	status = ocfs2_dlm_lock(conn, new_level, &lockres->l_lksb,
373 				DLM_LKF_CONVERT|DLM_LKF_VALBLK,
374 				lockres->l_name,
375 				lockres->l_namelen);
376 	if (status) {
377 		user_log_dlm_error("ocfs2_dlm_lock", status, lockres);
378 		user_recover_from_dlm_error(lockres);
379 	}
380 
381 drop_ref:
382 	user_dlm_drop_inode_ref(lockres);
383 }
384 
385 static inline void user_dlm_inc_holders(struct user_lock_res *lockres,
386 					int level)
387 {
388 	switch(level) {
389 	case DLM_LOCK_EX:
390 		lockres->l_ex_holders++;
391 		break;
392 	case DLM_LOCK_PR:
393 		lockres->l_ro_holders++;
394 		break;
395 	default:
396 		BUG();
397 	}
398 }
399 
400 /* predict what lock level we'll be dropping down to on behalf
401  * of another node, and return true if the currently wanted
402  * level will be compatible with it. */
403 static inline int
404 user_may_continue_on_blocked_lock(struct user_lock_res *lockres,
405 				  int wanted)
406 {
407 	BUG_ON(!(lockres->l_flags & USER_LOCK_BLOCKED));
408 
409 	return wanted <= user_highest_compat_lock_level(lockres->l_blocking);
410 }
411 
412 int user_dlm_cluster_lock(struct user_lock_res *lockres,
413 			  int level,
414 			  int lkm_flags)
415 {
416 	int status, local_flags;
417 	struct ocfs2_cluster_connection *conn =
418 		cluster_connection_from_user_lockres(lockres);
419 
420 	if (level != DLM_LOCK_EX &&
421 	    level != DLM_LOCK_PR) {
422 		mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
423 		     lockres->l_namelen, lockres->l_name);
424 		status = -EINVAL;
425 		goto bail;
426 	}
427 
428 	mlog(ML_BASTS, "lockres %.*s, level %d, flags = 0x%x\n",
429 	     lockres->l_namelen, lockres->l_name, level, lkm_flags);
430 
431 again:
432 	if (signal_pending(current)) {
433 		status = -ERESTARTSYS;
434 		goto bail;
435 	}
436 
437 	spin_lock(&lockres->l_lock);
438 
439 	/* We only compare against the currently granted level
440 	 * here. If the lock is blocked waiting on a downconvert,
441 	 * we'll get caught below. */
442 	if ((lockres->l_flags & USER_LOCK_BUSY) &&
443 	    (level > lockres->l_level)) {
444 		/* is someone sitting in dlm_lock? If so, wait on
445 		 * them. */
446 		spin_unlock(&lockres->l_lock);
447 
448 		user_wait_on_busy_lock(lockres);
449 		goto again;
450 	}
451 
452 	if ((lockres->l_flags & USER_LOCK_BLOCKED) &&
453 	    (!user_may_continue_on_blocked_lock(lockres, level))) {
454 		/* is the lock is currently blocked on behalf of
455 		 * another node */
456 		spin_unlock(&lockres->l_lock);
457 
458 		user_wait_on_blocked_lock(lockres);
459 		goto again;
460 	}
461 
462 	if (level > lockres->l_level) {
463 		local_flags = lkm_flags | DLM_LKF_VALBLK;
464 		if (lockres->l_level != DLM_LOCK_IV)
465 			local_flags |= DLM_LKF_CONVERT;
466 
467 		lockres->l_requested = level;
468 		lockres->l_flags |= USER_LOCK_BUSY;
469 		spin_unlock(&lockres->l_lock);
470 
471 		BUG_ON(level == DLM_LOCK_IV);
472 		BUG_ON(level == DLM_LOCK_NL);
473 
474 		/* call dlm_lock to upgrade lock now */
475 		status = ocfs2_dlm_lock(conn, level, &lockres->l_lksb,
476 					local_flags, lockres->l_name,
477 					lockres->l_namelen);
478 		if (status) {
479 			if ((lkm_flags & DLM_LKF_NOQUEUE) &&
480 			    (status != -EAGAIN))
481 				user_log_dlm_error("ocfs2_dlm_lock",
482 						   status, lockres);
483 			user_recover_from_dlm_error(lockres);
484 			goto bail;
485 		}
486 
487 		user_wait_on_busy_lock(lockres);
488 		goto again;
489 	}
490 
491 	user_dlm_inc_holders(lockres, level);
492 	spin_unlock(&lockres->l_lock);
493 
494 	status = 0;
495 bail:
496 	return status;
497 }
498 
499 static inline void user_dlm_dec_holders(struct user_lock_res *lockres,
500 					int level)
501 {
502 	switch(level) {
503 	case DLM_LOCK_EX:
504 		BUG_ON(!lockres->l_ex_holders);
505 		lockres->l_ex_holders--;
506 		break;
507 	case DLM_LOCK_PR:
508 		BUG_ON(!lockres->l_ro_holders);
509 		lockres->l_ro_holders--;
510 		break;
511 	default:
512 		BUG();
513 	}
514 }
515 
516 void user_dlm_cluster_unlock(struct user_lock_res *lockres,
517 			     int level)
518 {
519 	if (level != DLM_LOCK_EX &&
520 	    level != DLM_LOCK_PR) {
521 		mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
522 		     lockres->l_namelen, lockres->l_name);
523 		return;
524 	}
525 
526 	spin_lock(&lockres->l_lock);
527 	user_dlm_dec_holders(lockres, level);
528 	__user_dlm_cond_queue_lockres(lockres);
529 	spin_unlock(&lockres->l_lock);
530 }
531 
532 void user_dlm_write_lvb(struct inode *inode,
533 			const char *val,
534 			unsigned int len)
535 {
536 	struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
537 	char *lvb;
538 
539 	BUG_ON(len > DLM_LVB_LEN);
540 
541 	spin_lock(&lockres->l_lock);
542 
543 	BUG_ON(lockres->l_level < DLM_LOCK_EX);
544 	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
545 	memcpy(lvb, val, len);
546 
547 	spin_unlock(&lockres->l_lock);
548 }
549 
550 ssize_t user_dlm_read_lvb(struct inode *inode,
551 			  char *val,
552 			  unsigned int len)
553 {
554 	struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
555 	char *lvb;
556 	ssize_t ret = len;
557 
558 	BUG_ON(len > DLM_LVB_LEN);
559 
560 	spin_lock(&lockres->l_lock);
561 
562 	BUG_ON(lockres->l_level < DLM_LOCK_PR);
563 	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)) {
564 		lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
565 		memcpy(val, lvb, len);
566 	} else
567 		ret = 0;
568 
569 	spin_unlock(&lockres->l_lock);
570 	return ret;
571 }
572 
573 void user_dlm_lock_res_init(struct user_lock_res *lockres,
574 			    struct dentry *dentry)
575 {
576 	memset(lockres, 0, sizeof(*lockres));
577 
578 	spin_lock_init(&lockres->l_lock);
579 	init_waitqueue_head(&lockres->l_event);
580 	lockres->l_level = DLM_LOCK_IV;
581 	lockres->l_requested = DLM_LOCK_IV;
582 	lockres->l_blocking = DLM_LOCK_IV;
583 
584 	/* should have been checked before getting here. */
585 	BUG_ON(dentry->d_name.len >= USER_DLM_LOCK_ID_MAX_LEN);
586 
587 	memcpy(lockres->l_name,
588 	       dentry->d_name.name,
589 	       dentry->d_name.len);
590 	lockres->l_namelen = dentry->d_name.len;
591 }
592 
593 int user_dlm_destroy_lock(struct user_lock_res *lockres)
594 {
595 	int status = -EBUSY;
596 	struct ocfs2_cluster_connection *conn =
597 		cluster_connection_from_user_lockres(lockres);
598 
599 	mlog(ML_BASTS, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);
600 
601 	spin_lock(&lockres->l_lock);
602 	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
603 		spin_unlock(&lockres->l_lock);
604 		return 0;
605 	}
606 
607 	lockres->l_flags |= USER_LOCK_IN_TEARDOWN;
608 
609 	while (lockres->l_flags & USER_LOCK_BUSY) {
610 		spin_unlock(&lockres->l_lock);
611 
612 		user_wait_on_busy_lock(lockres);
613 
614 		spin_lock(&lockres->l_lock);
615 	}
616 
617 	if (lockres->l_ro_holders || lockres->l_ex_holders) {
618 		spin_unlock(&lockres->l_lock);
619 		goto bail;
620 	}
621 
622 	status = 0;
623 	if (!(lockres->l_flags & USER_LOCK_ATTACHED)) {
624 		spin_unlock(&lockres->l_lock);
625 		goto bail;
626 	}
627 
628 	lockres->l_flags &= ~USER_LOCK_ATTACHED;
629 	lockres->l_flags |= USER_LOCK_BUSY;
630 	spin_unlock(&lockres->l_lock);
631 
632 	status = ocfs2_dlm_unlock(conn, &lockres->l_lksb, DLM_LKF_VALBLK);
633 	if (status) {
634 		user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
635 		goto bail;
636 	}
637 
638 	user_wait_on_busy_lock(lockres);
639 
640 	status = 0;
641 bail:
642 	return status;
643 }
644 
/*
 * Recovery callback handed to ocfs2_cluster_connect_agnostic().
 * Recovery events are deliberately ignored here.
 */
static void user_dlm_recovery_handler_noop(int node_num,
					   void *recovery_data)
{
}
651 
652 void user_dlm_set_locking_protocol(void)
653 {
654 	ocfs2_stack_glue_set_max_proto_version(&user_dlm_lproto.lp_max_version);
655 }
656 
657 struct ocfs2_cluster_connection *user_dlm_register(const struct qstr *name)
658 {
659 	int rc;
660 	struct ocfs2_cluster_connection *conn;
661 
662 	rc = ocfs2_cluster_connect_agnostic(name->name, name->len,
663 					    &user_dlm_lproto,
664 					    user_dlm_recovery_handler_noop,
665 					    NULL, &conn);
666 	if (rc)
667 		mlog_errno(rc);
668 
669 	return rc ? ERR_PTR(rc) : conn;
670 }
671 
/*
 * Disconnect a connection obtained from user_dlm_register(); the
 * second argument of ocfs2_cluster_disconnect() is passed as 0.
 */
void user_dlm_unregister(struct ocfs2_cluster_connection *conn)
{
	ocfs2_cluster_disconnect(conn, 0);
}
676