xref: /linux/fs/dlm/user.c (revision ed683b9bb91fc274383e222ba5873a9ee9033462)
1  // SPDX-License-Identifier: GPL-2.0-only
2  /*
3   * Copyright (C) 2006-2010 Red Hat, Inc.  All rights reserved.
4   */
5  
6  #include <linux/miscdevice.h>
7  #include <linux/init.h>
8  #include <linux/wait.h>
9  #include <linux/file.h>
10  #include <linux/fs.h>
11  #include <linux/poll.h>
12  #include <linux/signal.h>
13  #include <linux/spinlock.h>
14  #include <linux/dlm.h>
15  #include <linux/dlm_device.h>
16  #include <linux/slab.h>
17  #include <linux/sched/signal.h>
18  
19  #include <trace/events/dlm.h>
20  
21  #include "dlm_internal.h"
22  #include "lockspace.h"
23  #include "lock.h"
24  #include "lvb_table.h"
25  #include "user.h"
26  #include "ast.h"
27  #include "config.h"
28  #include "memory.h"
29  
/* Prefix used when building per-lockspace misc device names
   (see dlm_device_register()). */
static const char name_prefix[] = "dlm";
/* Forward declaration; the initializer appears after the handlers it
   references. */
static const struct file_operations device_fops;
/* Incremented on each open of the monitor device, decremented on close. */
static atomic_t dlm_monitor_opened;
/* Starts at 1 and is cleared the first time the monitor device is opened. */
static int dlm_monitor_unused = 1;
34  
35  #ifdef CONFIG_COMPAT
36  
/* Lock parameters in the layout used by 32-bit callers.  The fields that
   are user pointers in struct dlm_lock_params are carried here as __u32
   and widened by compat_input(). */
struct dlm_lock_params32 {
	__u8 mode;
	__u8 namelen;
	__u16 unused;
	__u32 flags;
	__u32 lkid;
	__u32 parent;
	__u64 xid;
	__u64 timeout;
	__u32 castparam;	/* 32-bit user pointer */
	__u32 castaddr;		/* 32-bit user pointer */
	__u32 bastparam;	/* 32-bit user pointer */
	__u32 bastaddr;		/* 32-bit user pointer */
	__u32 lksb;		/* 32-bit user pointer */
	char lvb[DLM_USER_LVB_LEN];
	char name[];		/* variable-length trailing name */
};
54  
/* Write request in the layout used by 32-bit callers.  Only the lock
   parameters have a dedicated 32-bit variant; the lockspace and purge
   parameters reuse the native structures. */
struct dlm_write_request32 {
	__u32 version[3];
	__u8 cmd;
	__u8 is64bit;
	__u8 unused[2];

	union  {
		struct dlm_lock_params32 lock;
		struct dlm_lspace_params lspace;
		struct dlm_purge_params purge;
	} i;
};
67  
/* Lock status block as returned to 32-bit callers; the lvb pointer is
   carried as a __u32 (filled in by compat_output()). */
struct dlm_lksb32 {
	__u32 sb_status;
	__u32 sb_lkid;
	__u8 sb_flags;
	__u32 sb_lvbptr;
};
74  
/* Read result in the layout used by 32-bit callers; populated from a
   native struct dlm_lock_result by compat_output(). */
struct dlm_lock_result32 {
	__u32 version[3];
	__u32 length;
	__u32 user_astaddr;	/* 32-bit user pointer */
	__u32 user_astparam;	/* 32-bit user pointer */
	__u32 user_lksb;	/* 32-bit user pointer */
	struct dlm_lksb32 lksb;
	__u8 bast_mode;
	__u8 unused[3];
	/* Offsets may be zero if no data is present */
	__u32 lvb_offset;
};
87  
88  static void compat_input(struct dlm_write_request *kb,
89  			 struct dlm_write_request32 *kb32,
90  			 int namelen)
91  {
92  	kb->version[0] = kb32->version[0];
93  	kb->version[1] = kb32->version[1];
94  	kb->version[2] = kb32->version[2];
95  
96  	kb->cmd = kb32->cmd;
97  	kb->is64bit = kb32->is64bit;
98  	if (kb->cmd == DLM_USER_CREATE_LOCKSPACE ||
99  	    kb->cmd == DLM_USER_REMOVE_LOCKSPACE) {
100  		kb->i.lspace.flags = kb32->i.lspace.flags;
101  		kb->i.lspace.minor = kb32->i.lspace.minor;
102  		memcpy(kb->i.lspace.name, kb32->i.lspace.name, namelen);
103  	} else if (kb->cmd == DLM_USER_PURGE) {
104  		kb->i.purge.nodeid = kb32->i.purge.nodeid;
105  		kb->i.purge.pid = kb32->i.purge.pid;
106  	} else {
107  		kb->i.lock.mode = kb32->i.lock.mode;
108  		kb->i.lock.namelen = kb32->i.lock.namelen;
109  		kb->i.lock.flags = kb32->i.lock.flags;
110  		kb->i.lock.lkid = kb32->i.lock.lkid;
111  		kb->i.lock.parent = kb32->i.lock.parent;
112  		kb->i.lock.xid = kb32->i.lock.xid;
113  		kb->i.lock.timeout = kb32->i.lock.timeout;
114  		kb->i.lock.castparam = (__user void *)(long)kb32->i.lock.castparam;
115  		kb->i.lock.castaddr = (__user void *)(long)kb32->i.lock.castaddr;
116  		kb->i.lock.bastparam = (__user void *)(long)kb32->i.lock.bastparam;
117  		kb->i.lock.bastaddr = (__user void *)(long)kb32->i.lock.bastaddr;
118  		kb->i.lock.lksb = (__user void *)(long)kb32->i.lock.lksb;
119  		memcpy(kb->i.lock.lvb, kb32->i.lock.lvb, DLM_USER_LVB_LEN);
120  		memcpy(kb->i.lock.name, kb32->i.lock.name, namelen);
121  	}
122  }
123  
124  static void compat_output(struct dlm_lock_result *res,
125  			  struct dlm_lock_result32 *res32)
126  {
127  	memset(res32, 0, sizeof(*res32));
128  
129  	res32->version[0] = res->version[0];
130  	res32->version[1] = res->version[1];
131  	res32->version[2] = res->version[2];
132  
133  	res32->user_astaddr = (__u32)(__force long)res->user_astaddr;
134  	res32->user_astparam = (__u32)(__force long)res->user_astparam;
135  	res32->user_lksb = (__u32)(__force long)res->user_lksb;
136  	res32->bast_mode = res->bast_mode;
137  
138  	res32->lvb_offset = res->lvb_offset;
139  	res32->length = res->length;
140  
141  	res32->lksb.sb_status = res->lksb.sb_status;
142  	res32->lksb.sb_flags = res->lksb.sb_flags;
143  	res32->lksb.sb_lkid = res->lksb.sb_lkid;
144  	res32->lksb.sb_lvbptr = (__u32)(long)res->lksb.sb_lvbptr;
145  }
146  #endif
147  
148  /* should held proc->asts_spin lock */
149  void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb)
150  {
151  	struct dlm_callback *cb, *safe;
152  
153  	list_for_each_entry_safe(cb, safe, &lkb->lkb_callbacks, list) {
154  		list_del(&cb->list);
155  		kref_put(&cb->ref, dlm_release_callback);
156  	}
157  
158  	clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
159  
160  	/* invalidate */
161  	dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL);
162  	dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL);
163  	lkb->lkb_last_bast_mode = -1;
164  }
165  
166  /* Figure out if this lock is at the end of its life and no longer
167     available for the application to use.  The lkb still exists until
168     the final ast is read.  A lock becomes EOL in three situations:
169       1. a noqueue request fails with EAGAIN
170       2. an unlock completes with EUNLOCK
171       3. a cancel of a waiting request completes with ECANCEL/EDEADLK
172     An EOL lock needs to be removed from the process's list of locks.
173     And we can't allow any new operation on an EOL lock.  This is
174     not related to the lifetime of the lkb struct which is managed
175     entirely by refcount. */
176  
177  static int lkb_is_endoflife(int mode, int status)
178  {
179  	switch (status) {
180  	case -DLM_EUNLOCK:
181  		return 1;
182  	case -DLM_ECANCEL:
183  	case -ETIMEDOUT:
184  	case -EDEADLK:
185  	case -EAGAIN:
186  		if (mode == DLM_LOCK_IV)
187  			return 1;
188  		break;
189  	}
190  	return 0;
191  }
192  
193  /* we could possibly check if the cancel of an orphan has resulted in the lkb
194     being removed and then remove that lkb from the orphans list and free it */
195  
196  void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
197  		      int status, uint32_t sbflags)
198  {
199  	struct dlm_ls *ls;
200  	struct dlm_user_args *ua;
201  	struct dlm_user_proc *proc;
202  	int rv;
203  
204  	if (test_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags) ||
205  	    test_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags))
206  		return;
207  
208  	ls = lkb->lkb_resource->res_ls;
209  	spin_lock(&ls->ls_clear_proc_locks);
210  
211  	/* If ORPHAN/DEAD flag is set, it means the process is dead so an ast
212  	   can't be delivered.  For ORPHAN's, dlm_clear_proc_locks() freed
213  	   lkb->ua so we can't try to use it.  This second check is necessary
214  	   for cases where a completion ast is received for an operation that
215  	   began before clear_proc_locks did its cancel/unlock. */
216  
217  	if (test_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags) ||
218  	    test_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags))
219  		goto out;
220  
221  	DLM_ASSERT(lkb->lkb_ua, dlm_print_lkb(lkb););
222  	ua = lkb->lkb_ua;
223  	proc = ua->proc;
224  
225  	if ((flags & DLM_CB_BAST) && ua->bastaddr == NULL)
226  		goto out;
227  
228  	if ((flags & DLM_CB_CAST) && lkb_is_endoflife(mode, status))
229  		set_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags);
230  
231  	spin_lock(&proc->asts_spin);
232  
233  	rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
234  	switch (rv) {
235  	case DLM_ENQUEUE_CALLBACK_FAILURE:
236  		spin_unlock(&proc->asts_spin);
237  		WARN_ON_ONCE(1);
238  		goto out;
239  	case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
240  		kref_get(&lkb->lkb_ref);
241  		list_add_tail(&lkb->lkb_cb_list, &proc->asts);
242  		wake_up_interruptible(&proc->wait);
243  		break;
244  	case DLM_ENQUEUE_CALLBACK_SUCCESS:
245  		break;
246  	default:
247  		WARN_ON_ONCE(1);
248  		break;
249  	}
250  	spin_unlock(&proc->asts_spin);
251  
252  	if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
253  		/* N.B. spin_lock locks_spin, not asts_spin */
254  		spin_lock(&proc->locks_spin);
255  		if (!list_empty(&lkb->lkb_ownqueue)) {
256  			list_del_init(&lkb->lkb_ownqueue);
257  			dlm_put_lkb(lkb);
258  		}
259  		spin_unlock(&proc->locks_spin);
260  	}
261   out:
262  	spin_unlock(&ls->ls_clear_proc_locks);
263  }
264  
265  static int device_user_lock(struct dlm_user_proc *proc,
266  			    struct dlm_lock_params *params)
267  {
268  	struct dlm_ls *ls;
269  	struct dlm_user_args *ua;
270  	uint32_t lkid;
271  	int error = -ENOMEM;
272  
273  	ls = dlm_find_lockspace_local(proc->lockspace);
274  	if (!ls)
275  		return -ENOENT;
276  
277  	if (!params->castaddr || !params->lksb) {
278  		error = -EINVAL;
279  		goto out;
280  	}
281  
282  	ua = kzalloc(sizeof(struct dlm_user_args), GFP_NOFS);
283  	if (!ua)
284  		goto out;
285  	ua->proc = proc;
286  	ua->user_lksb = params->lksb;
287  	ua->castparam = params->castparam;
288  	ua->castaddr = params->castaddr;
289  	ua->bastparam = params->bastparam;
290  	ua->bastaddr = params->bastaddr;
291  	ua->xid = params->xid;
292  
293  	if (params->flags & DLM_LKF_CONVERT) {
294  		error = dlm_user_convert(ls, ua,
295  					 params->mode, params->flags,
296  					 params->lkid, params->lvb);
297  	} else if (params->flags & DLM_LKF_ORPHAN) {
298  		error = dlm_user_adopt_orphan(ls, ua,
299  					 params->mode, params->flags,
300  					 params->name, params->namelen,
301  					 &lkid);
302  		if (!error)
303  			error = lkid;
304  	} else {
305  		error = dlm_user_request(ls, ua,
306  					 params->mode, params->flags,
307  					 params->name, params->namelen);
308  		if (!error)
309  			error = ua->lksb.sb_lkid;
310  	}
311   out:
312  	dlm_put_lockspace(ls);
313  	return error;
314  }
315  
316  static int device_user_unlock(struct dlm_user_proc *proc,
317  			      struct dlm_lock_params *params)
318  {
319  	struct dlm_ls *ls;
320  	struct dlm_user_args *ua;
321  	int error = -ENOMEM;
322  
323  	ls = dlm_find_lockspace_local(proc->lockspace);
324  	if (!ls)
325  		return -ENOENT;
326  
327  	ua = kzalloc(sizeof(struct dlm_user_args), GFP_NOFS);
328  	if (!ua)
329  		goto out;
330  	ua->proc = proc;
331  	ua->user_lksb = params->lksb;
332  	ua->castparam = params->castparam;
333  	ua->castaddr = params->castaddr;
334  
335  	if (params->flags & DLM_LKF_CANCEL)
336  		error = dlm_user_cancel(ls, ua, params->flags, params->lkid);
337  	else
338  		error = dlm_user_unlock(ls, ua, params->flags, params->lkid,
339  					params->lvb);
340   out:
341  	dlm_put_lockspace(ls);
342  	return error;
343  }
344  
345  static int device_user_deadlock(struct dlm_user_proc *proc,
346  				struct dlm_lock_params *params)
347  {
348  	struct dlm_ls *ls;
349  	int error;
350  
351  	ls = dlm_find_lockspace_local(proc->lockspace);
352  	if (!ls)
353  		return -ENOENT;
354  
355  	error = dlm_user_deadlock(ls, params->flags, params->lkid);
356  
357  	dlm_put_lockspace(ls);
358  	return error;
359  }
360  
361  static int dlm_device_register(struct dlm_ls *ls, char *name)
362  {
363  	int error, len;
364  
365  	/* The device is already registered.  This happens when the
366  	   lockspace is created multiple times from userspace. */
367  	if (ls->ls_device.name)
368  		return 0;
369  
370  	error = -ENOMEM;
371  	len = strlen(name) + strlen(name_prefix) + 2;
372  	ls->ls_device.name = kzalloc(len, GFP_NOFS);
373  	if (!ls->ls_device.name)
374  		goto fail;
375  
376  	snprintf((char *)ls->ls_device.name, len, "%s_%s", name_prefix,
377  		 name);
378  	ls->ls_device.fops = &device_fops;
379  	ls->ls_device.minor = MISC_DYNAMIC_MINOR;
380  
381  	error = misc_register(&ls->ls_device);
382  	if (error) {
383  		kfree(ls->ls_device.name);
384  		/* this has to be set to NULL
385  		 * to avoid a double-free in dlm_device_deregister
386  		 */
387  		ls->ls_device.name = NULL;
388  	}
389  fail:
390  	return error;
391  }
392  
393  int dlm_device_deregister(struct dlm_ls *ls)
394  {
395  	/* The device is not registered.  This happens when the lockspace
396  	   was never used from userspace, or when device_create_lockspace()
397  	   calls dlm_release_lockspace() after the register fails. */
398  	if (!ls->ls_device.name)
399  		return 0;
400  
401  	misc_deregister(&ls->ls_device);
402  	kfree(ls->ls_device.name);
403  	return 0;
404  }
405  
406  static int device_user_purge(struct dlm_user_proc *proc,
407  			     struct dlm_purge_params *params)
408  {
409  	struct dlm_ls *ls;
410  	int error;
411  
412  	ls = dlm_find_lockspace_local(proc->lockspace);
413  	if (!ls)
414  		return -ENOENT;
415  
416  	error = dlm_user_purge(ls, proc, params->nodeid, params->pid);
417  
418  	dlm_put_lockspace(ls);
419  	return error;
420  }
421  
422  static int device_create_lockspace(struct dlm_lspace_params *params)
423  {
424  	dlm_lockspace_t *lockspace;
425  	struct dlm_ls *ls;
426  	int error;
427  
428  	if (!capable(CAP_SYS_ADMIN))
429  		return -EPERM;
430  
431  	error = dlm_new_user_lockspace(params->name, dlm_config.ci_cluster_name,
432  				       params->flags, DLM_USER_LVB_LEN, NULL,
433  				       NULL, NULL, &lockspace);
434  	if (error)
435  		return error;
436  
437  	ls = dlm_find_lockspace_local(lockspace);
438  	if (!ls)
439  		return -ENOENT;
440  
441  	error = dlm_device_register(ls, params->name);
442  	dlm_put_lockspace(ls);
443  
444  	if (error)
445  		dlm_release_lockspace(lockspace, 0);
446  	else
447  		error = ls->ls_device.minor;
448  
449  	return error;
450  }
451  
452  static int device_remove_lockspace(struct dlm_lspace_params *params)
453  {
454  	dlm_lockspace_t *lockspace;
455  	struct dlm_ls *ls;
456  	int error, force = 0;
457  
458  	if (!capable(CAP_SYS_ADMIN))
459  		return -EPERM;
460  
461  	ls = dlm_find_lockspace_device(params->minor);
462  	if (!ls)
463  		return -ENOENT;
464  
465  	if (params->flags & DLM_USER_LSFLG_FORCEFREE)
466  		force = 2;
467  
468  	lockspace = ls->ls_local_handle;
469  	dlm_put_lockspace(ls);
470  
471  	/* The final dlm_release_lockspace waits for references to go to
472  	   zero, so all processes will need to close their device for the
473  	   ls before the release will proceed.  release also calls the
474  	   device_deregister above.  Converting a positive return value
475  	   from release to zero means that userspace won't know when its
476  	   release was the final one, but it shouldn't need to know. */
477  
478  	error = dlm_release_lockspace(lockspace, force);
479  	if (error > 0)
480  		error = 0;
481  	return error;
482  }
483  
484  /* Check the user's version matches ours */
485  static int check_version(struct dlm_write_request *req)
486  {
487  	if (req->version[0] != DLM_DEVICE_VERSION_MAJOR ||
488  	    (req->version[0] == DLM_DEVICE_VERSION_MAJOR &&
489  	     req->version[1] > DLM_DEVICE_VERSION_MINOR)) {
490  
491  		printk(KERN_DEBUG "dlm: process %s (%d) version mismatch "
492  		       "user (%d.%d.%d) kernel (%d.%d.%d)\n",
493  		       current->comm,
494  		       task_pid_nr(current),
495  		       req->version[0],
496  		       req->version[1],
497  		       req->version[2],
498  		       DLM_DEVICE_VERSION_MAJOR,
499  		       DLM_DEVICE_VERSION_MINOR,
500  		       DLM_DEVICE_VERSION_PATCH);
501  		return -EINVAL;
502  	}
503  	return 0;
504  }
505  
506  /*
507   * device_write
508   *
509   *   device_user_lock
510   *     dlm_user_request -> request_lock
511   *     dlm_user_convert -> convert_lock
512   *
513   *   device_user_unlock
514   *     dlm_user_unlock -> unlock_lock
515   *     dlm_user_cancel -> cancel_lock
516   *
517   *   device_create_lockspace
518   *     dlm_new_lockspace
519   *
520   *   device_remove_lockspace
521   *     dlm_release_lockspace
522   */
523  
524  /* a write to a lockspace device is a lock or unlock request, a write
525     to the control device is to create/remove a lockspace */
526  
527  static ssize_t device_write(struct file *file, const char __user *buf,
528  			    size_t count, loff_t *ppos)
529  {
530  	struct dlm_user_proc *proc = file->private_data;
531  	struct dlm_write_request *kbuf;
532  	int error;
533  
534  #ifdef CONFIG_COMPAT
535  	if (count < sizeof(struct dlm_write_request32))
536  #else
537  	if (count < sizeof(struct dlm_write_request))
538  #endif
539  		return -EINVAL;
540  
541  	/*
542  	 * can't compare against COMPAT/dlm_write_request32 because
543  	 * we don't yet know if is64bit is zero
544  	 */
545  	if (count > sizeof(struct dlm_write_request) + DLM_RESNAME_MAXLEN)
546  		return -EINVAL;
547  
548  	kbuf = memdup_user_nul(buf, count);
549  	if (IS_ERR(kbuf))
550  		return PTR_ERR(kbuf);
551  
552  	if (check_version(kbuf)) {
553  		error = -EBADE;
554  		goto out_free;
555  	}
556  
557  #ifdef CONFIG_COMPAT
558  	if (!kbuf->is64bit) {
559  		struct dlm_write_request32 *k32buf;
560  		int namelen = 0;
561  
562  		if (count > sizeof(struct dlm_write_request32))
563  			namelen = count - sizeof(struct dlm_write_request32);
564  
565  		k32buf = (struct dlm_write_request32 *)kbuf;
566  
567  		/* add 1 after namelen so that the name string is terminated */
568  		kbuf = kzalloc(sizeof(struct dlm_write_request) + namelen + 1,
569  			       GFP_NOFS);
570  		if (!kbuf) {
571  			kfree(k32buf);
572  			return -ENOMEM;
573  		}
574  
575  		if (proc)
576  			set_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags);
577  
578  		compat_input(kbuf, k32buf, namelen);
579  		kfree(k32buf);
580  	}
581  #endif
582  
583  	/* do we really need this? can a write happen after a close? */
584  	if ((kbuf->cmd == DLM_USER_LOCK || kbuf->cmd == DLM_USER_UNLOCK) &&
585  	    (proc && test_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags))) {
586  		error = -EINVAL;
587  		goto out_free;
588  	}
589  
590  	error = -EINVAL;
591  
592  	switch (kbuf->cmd)
593  	{
594  	case DLM_USER_LOCK:
595  		if (!proc) {
596  			log_print("no locking on control device");
597  			goto out_free;
598  		}
599  		error = device_user_lock(proc, &kbuf->i.lock);
600  		break;
601  
602  	case DLM_USER_UNLOCK:
603  		if (!proc) {
604  			log_print("no locking on control device");
605  			goto out_free;
606  		}
607  		error = device_user_unlock(proc, &kbuf->i.lock);
608  		break;
609  
610  	case DLM_USER_DEADLOCK:
611  		if (!proc) {
612  			log_print("no locking on control device");
613  			goto out_free;
614  		}
615  		error = device_user_deadlock(proc, &kbuf->i.lock);
616  		break;
617  
618  	case DLM_USER_CREATE_LOCKSPACE:
619  		if (proc) {
620  			log_print("create/remove only on control device");
621  			goto out_free;
622  		}
623  		error = device_create_lockspace(&kbuf->i.lspace);
624  		break;
625  
626  	case DLM_USER_REMOVE_LOCKSPACE:
627  		if (proc) {
628  			log_print("create/remove only on control device");
629  			goto out_free;
630  		}
631  		error = device_remove_lockspace(&kbuf->i.lspace);
632  		break;
633  
634  	case DLM_USER_PURGE:
635  		if (!proc) {
636  			log_print("no locking on control device");
637  			goto out_free;
638  		}
639  		error = device_user_purge(proc, &kbuf->i.purge);
640  		break;
641  
642  	default:
643  		log_print("Unknown command passed to DLM device : %d\n",
644  			  kbuf->cmd);
645  	}
646  
647   out_free:
648  	kfree(kbuf);
649  	return error;
650  }
651  
652  /* Every process that opens the lockspace device has its own "proc" structure
653     hanging off the open file that's used to keep track of locks owned by the
654     process and asts that need to be delivered to the process. */
655  
656  static int device_open(struct inode *inode, struct file *file)
657  {
658  	struct dlm_user_proc *proc;
659  	struct dlm_ls *ls;
660  
661  	ls = dlm_find_lockspace_device(iminor(inode));
662  	if (!ls)
663  		return -ENOENT;
664  
665  	proc = kzalloc(sizeof(struct dlm_user_proc), GFP_NOFS);
666  	if (!proc) {
667  		dlm_put_lockspace(ls);
668  		return -ENOMEM;
669  	}
670  
671  	proc->lockspace = ls->ls_local_handle;
672  	INIT_LIST_HEAD(&proc->asts);
673  	INIT_LIST_HEAD(&proc->locks);
674  	INIT_LIST_HEAD(&proc->unlocking);
675  	spin_lock_init(&proc->asts_spin);
676  	spin_lock_init(&proc->locks_spin);
677  	init_waitqueue_head(&proc->wait);
678  	file->private_data = proc;
679  
680  	return 0;
681  }
682  
683  static int device_close(struct inode *inode, struct file *file)
684  {
685  	struct dlm_user_proc *proc = file->private_data;
686  	struct dlm_ls *ls;
687  
688  	ls = dlm_find_lockspace_local(proc->lockspace);
689  	if (!ls)
690  		return -ENOENT;
691  
692  	set_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags);
693  
694  	dlm_clear_proc_locks(ls, proc);
695  
696  	/* at this point no more lkb's should exist for this lockspace,
697  	   so there's no chance of dlm_user_add_ast() being called and
698  	   looking for lkb->ua->proc */
699  
700  	kfree(proc);
701  	file->private_data = NULL;
702  
703  	dlm_put_lockspace(ls);
704  	dlm_put_lockspace(ls);  /* for the find in device_open() */
705  
706  	/* FIXME: AUTOFREE: if this ls is no longer used do
707  	   device_remove_lockspace() */
708  
709  	return 0;
710  }
711  
712  static int copy_result_to_user(struct dlm_user_args *ua, int compat,
713  			       uint32_t flags, int mode, int copy_lvb,
714  			       char __user *buf, size_t count)
715  {
716  #ifdef CONFIG_COMPAT
717  	struct dlm_lock_result32 result32;
718  #endif
719  	struct dlm_lock_result result;
720  	void *resultptr;
721  	int error=0;
722  	int len;
723  	int struct_len;
724  
725  	memset(&result, 0, sizeof(struct dlm_lock_result));
726  	result.version[0] = DLM_DEVICE_VERSION_MAJOR;
727  	result.version[1] = DLM_DEVICE_VERSION_MINOR;
728  	result.version[2] = DLM_DEVICE_VERSION_PATCH;
729  	memcpy(&result.lksb, &ua->lksb, offsetof(struct dlm_lksb, sb_lvbptr));
730  	result.user_lksb = ua->user_lksb;
731  
732  	/* FIXME: dlm1 provides for the user's bastparam/addr to not be updated
733  	   in a conversion unless the conversion is successful.  See code
734  	   in dlm_user_convert() for updating ua from ua_tmp.  OpenVMS, though,
735  	   notes that a new blocking AST address and parameter are set even if
736  	   the conversion fails, so maybe we should just do that. */
737  
738  	if (flags & DLM_CB_BAST) {
739  		result.user_astaddr = ua->bastaddr;
740  		result.user_astparam = ua->bastparam;
741  		result.bast_mode = mode;
742  	} else {
743  		result.user_astaddr = ua->castaddr;
744  		result.user_astparam = ua->castparam;
745  	}
746  
747  #ifdef CONFIG_COMPAT
748  	if (compat)
749  		len = sizeof(struct dlm_lock_result32);
750  	else
751  #endif
752  		len = sizeof(struct dlm_lock_result);
753  	struct_len = len;
754  
755  	/* copy lvb to userspace if there is one, it's been updated, and
756  	   the user buffer has space for it */
757  
758  	if (copy_lvb && ua->lksb.sb_lvbptr && count >= len + DLM_USER_LVB_LEN) {
759  		if (copy_to_user(buf+len, ua->lksb.sb_lvbptr,
760  				 DLM_USER_LVB_LEN)) {
761  			error = -EFAULT;
762  			goto out;
763  		}
764  
765  		result.lvb_offset = len;
766  		len += DLM_USER_LVB_LEN;
767  	}
768  
769  	result.length = len;
770  	resultptr = &result;
771  #ifdef CONFIG_COMPAT
772  	if (compat) {
773  		compat_output(&result, &result32);
774  		resultptr = &result32;
775  	}
776  #endif
777  
778  	if (copy_to_user(buf, resultptr, struct_len))
779  		error = -EFAULT;
780  	else
781  		error = len;
782   out:
783  	return error;
784  }
785  
786  static int copy_version_to_user(char __user *buf, size_t count)
787  {
788  	struct dlm_device_version ver;
789  
790  	memset(&ver, 0, sizeof(struct dlm_device_version));
791  	ver.version[0] = DLM_DEVICE_VERSION_MAJOR;
792  	ver.version[1] = DLM_DEVICE_VERSION_MINOR;
793  	ver.version[2] = DLM_DEVICE_VERSION_PATCH;
794  
795  	if (copy_to_user(buf, &ver, sizeof(struct dlm_device_version)))
796  		return -EFAULT;
797  	return sizeof(struct dlm_device_version);
798  }
799  
800  /* a read returns a single ast described in a struct dlm_lock_result */
801  
802  static ssize_t device_read(struct file *file, char __user *buf, size_t count,
803  			   loff_t *ppos)
804  {
805  	struct dlm_user_proc *proc = file->private_data;
806  	struct dlm_lkb *lkb;
807  	DECLARE_WAITQUEUE(wait, current);
808  	struct dlm_callback *cb;
809  	int rv, copy_lvb = 0;
810  	int old_mode, new_mode;
811  
812  	if (count == sizeof(struct dlm_device_version)) {
813  		rv = copy_version_to_user(buf, count);
814  		return rv;
815  	}
816  
817  	if (!proc) {
818  		log_print("non-version read from control device %zu", count);
819  		return -EINVAL;
820  	}
821  
822  #ifdef CONFIG_COMPAT
823  	if (count < sizeof(struct dlm_lock_result32))
824  #else
825  	if (count < sizeof(struct dlm_lock_result))
826  #endif
827  		return -EINVAL;
828  
829   try_another:
830  
831  	/* do we really need this? can a read happen after a close? */
832  	if (test_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags))
833  		return -EINVAL;
834  
835  	spin_lock(&proc->asts_spin);
836  	if (list_empty(&proc->asts)) {
837  		if (file->f_flags & O_NONBLOCK) {
838  			spin_unlock(&proc->asts_spin);
839  			return -EAGAIN;
840  		}
841  
842  		add_wait_queue(&proc->wait, &wait);
843  
844  	repeat:
845  		set_current_state(TASK_INTERRUPTIBLE);
846  		if (list_empty(&proc->asts) && !signal_pending(current)) {
847  			spin_unlock(&proc->asts_spin);
848  			schedule();
849  			spin_lock(&proc->asts_spin);
850  			goto repeat;
851  		}
852  		set_current_state(TASK_RUNNING);
853  		remove_wait_queue(&proc->wait, &wait);
854  
855  		if (signal_pending(current)) {
856  			spin_unlock(&proc->asts_spin);
857  			return -ERESTARTSYS;
858  		}
859  	}
860  
861  	/* if we empty lkb_callbacks, we don't want to unlock the spinlock
862  	   without removing lkb_cb_list; so empty lkb_cb_list is always
863  	   consistent with empty lkb_callbacks */
864  
865  	lkb = list_first_entry(&proc->asts, struct dlm_lkb, lkb_cb_list);
866  
867  	/* rem_lkb_callback sets a new lkb_last_cast */
868  	old_mode = lkb->lkb_last_cast->mode;
869  
870  	rv = dlm_dequeue_lkb_callback(lkb, &cb);
871  	switch (rv) {
872  	case DLM_DEQUEUE_CALLBACK_EMPTY:
873  		/* this shouldn't happen; lkb should have been removed from
874  		 * list when last item was dequeued
875  		 */
876  		log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id);
877  		list_del_init(&lkb->lkb_cb_list);
878  		spin_unlock(&proc->asts_spin);
879  		/* removes ref for proc->asts, may cause lkb to be freed */
880  		dlm_put_lkb(lkb);
881  		WARN_ON_ONCE(1);
882  		goto try_another;
883  	case DLM_DEQUEUE_CALLBACK_LAST:
884  		list_del_init(&lkb->lkb_cb_list);
885  		clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
886  		break;
887  	case DLM_DEQUEUE_CALLBACK_SUCCESS:
888  		break;
889  	default:
890  		WARN_ON_ONCE(1);
891  		break;
892  	}
893  	spin_unlock(&proc->asts_spin);
894  
895  	if (cb->flags & DLM_CB_BAST) {
896  		trace_dlm_bast(lkb->lkb_resource->res_ls, lkb, cb->mode);
897  	} else if (cb->flags & DLM_CB_CAST) {
898  		new_mode = cb->mode;
899  
900  		if (!cb->sb_status && lkb->lkb_lksb->sb_lvbptr &&
901  		    dlm_lvb_operations[old_mode + 1][new_mode + 1])
902  			copy_lvb = 1;
903  
904  		lkb->lkb_lksb->sb_status = cb->sb_status;
905  		lkb->lkb_lksb->sb_flags = cb->sb_flags;
906  		trace_dlm_ast(lkb->lkb_resource->res_ls, lkb);
907  	}
908  
909  	rv = copy_result_to_user(lkb->lkb_ua,
910  				 test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags),
911  				 cb->flags, cb->mode, copy_lvb, buf, count);
912  
913  	kref_put(&cb->ref, dlm_release_callback);
914  
915  	/* removes ref for proc->asts, may cause lkb to be freed */
916  	if (rv == DLM_DEQUEUE_CALLBACK_LAST)
917  		dlm_put_lkb(lkb);
918  
919  	return rv;
920  }
921  
922  static __poll_t device_poll(struct file *file, poll_table *wait)
923  {
924  	struct dlm_user_proc *proc = file->private_data;
925  
926  	poll_wait(file, &proc->wait, wait);
927  
928  	spin_lock(&proc->asts_spin);
929  	if (!list_empty(&proc->asts)) {
930  		spin_unlock(&proc->asts_spin);
931  		return EPOLLIN | EPOLLRDNORM;
932  	}
933  	spin_unlock(&proc->asts_spin);
934  	return 0;
935  }
936  
937  int dlm_user_daemon_available(void)
938  {
939  	/* dlm_controld hasn't started (or, has started, but not
940  	   properly populated configfs) */
941  
942  	if (!dlm_our_nodeid())
943  		return 0;
944  
945  	/* This is to deal with versions of dlm_controld that don't
946  	   know about the monitor device.  We assume that if the
947  	   dlm_controld was started (above), but the monitor device
948  	   was never opened, that it's an old version.  dlm_controld
949  	   should open the monitor device before populating configfs. */
950  
951  	if (dlm_monitor_unused)
952  		return 1;
953  
954  	return atomic_read(&dlm_monitor_opened) ? 1 : 0;
955  }
956  
957  static int ctl_device_open(struct inode *inode, struct file *file)
958  {
959  	file->private_data = NULL;
960  	return 0;
961  }
962  
/* Release handler for the control device; there is nothing to tear down. */
static int ctl_device_close(struct inode *inode, struct file *file)
{
	return 0;
}
967  
968  static int monitor_device_open(struct inode *inode, struct file *file)
969  {
970  	atomic_inc(&dlm_monitor_opened);
971  	dlm_monitor_unused = 0;
972  	return 0;
973  }
974  
975  static int monitor_device_close(struct inode *inode, struct file *file)
976  {
977  	if (atomic_dec_and_test(&dlm_monitor_opened))
978  		dlm_stop_lockspaces();
979  	return 0;
980  }
981  
982  static const struct file_operations device_fops = {
983  	.open    = device_open,
984  	.release = device_close,
985  	.read    = device_read,
986  	.write   = device_write,
987  	.poll    = device_poll,
988  	.owner   = THIS_MODULE,
989  	.llseek  = noop_llseek,
990  };
991  
992  static const struct file_operations ctl_device_fops = {
993  	.open    = ctl_device_open,
994  	.release = ctl_device_close,
995  	.read    = device_read,
996  	.write   = device_write,
997  	.owner   = THIS_MODULE,
998  	.llseek  = noop_llseek,
999  };
1000  
1001  static struct miscdevice ctl_device = {
1002  	.name  = "dlm-control",
1003  	.fops  = &ctl_device_fops,
1004  	.minor = MISC_DYNAMIC_MINOR,
1005  };
1006  
1007  static const struct file_operations monitor_device_fops = {
1008  	.open    = monitor_device_open,
1009  	.release = monitor_device_close,
1010  	.owner   = THIS_MODULE,
1011  	.llseek  = noop_llseek,
1012  };
1013  
1014  static struct miscdevice monitor_device = {
1015  	.name  = "dlm-monitor",
1016  	.fops  = &monitor_device_fops,
1017  	.minor = MISC_DYNAMIC_MINOR,
1018  };
1019  
1020  int __init dlm_user_init(void)
1021  {
1022  	int error;
1023  
1024  	atomic_set(&dlm_monitor_opened, 0);
1025  
1026  	error = misc_register(&ctl_device);
1027  	if (error) {
1028  		log_print("misc_register failed for control device");
1029  		goto out;
1030  	}
1031  
1032  	error = misc_register(&monitor_device);
1033  	if (error) {
1034  		log_print("misc_register failed for monitor device");
1035  		misc_deregister(&ctl_device);
1036  	}
1037   out:
1038  	return error;
1039  }
1040  
1041  void dlm_user_exit(void)
1042  {
1043  	misc_deregister(&ctl_device);
1044  	misc_deregister(&monitor_device);
1045  }
1046  
1047