xref: /linux/drivers/block/drbd/drbd_main.c (revision c145211d1f9e2ef19e7b4c2b943f68366daa97af)
1 /*
2    drbd.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12 
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17 
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22 
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 
27  */
28 
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/smp_lock.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48 
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52 
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56 
57 #include "drbd_vli.h"
58 
59 struct after_state_chg_work {
60 	struct drbd_work w;
61 	union drbd_state os;
62 	union drbd_state ns;
63 	enum chg_state_flags flags;
64 	struct completion *done;
65 };
66 
67 int drbdd_init(struct drbd_thread *);
68 int drbd_worker(struct drbd_thread *);
69 int drbd_asender(struct drbd_thread *);
70 
71 int drbd_init(void);
72 static int drbd_open(struct block_device *bdev, fmode_t mode);
73 static int drbd_release(struct gendisk *gd, fmode_t mode);
74 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
75 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
76 			   union drbd_state ns, enum chg_state_flags flags);
77 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
78 static void md_sync_timer_fn(unsigned long data);
79 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
80 
81 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
82 	      "Lars Ellenberg <lars@linbit.com>");
83 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
84 MODULE_VERSION(REL_VERSION);
85 MODULE_LICENSE("GPL");
86 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
87 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
88 
89 #include <linux/moduleparam.h>
90 /* allow_open_on_secondary */
91 MODULE_PARM_DESC(allow_oos, "DON'T USE!");
92 /* thanks to these macros, when compiled into the kernel (not as a module),
93  * this becomes the boot parameter drbd.minor_count */
94 module_param(minor_count, uint, 0444);
95 module_param(disable_sendpage, bool, 0644);
96 module_param(allow_oos, bool, 0);
97 module_param(cn_idx, uint, 0444);
98 module_param(proc_details, int, 0644);
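/* Illustrative usage (not part of this file): loaded as a module,
 *	modprobe drbd minor_count=128 proc_details=1
 * and, per the comment above, when built in the same knob becomes the
 * boot parameter drbd.minor_count=128. Parameters declared with mode
 * 0444 remain readable at runtime via /sys/module/drbd/parameters/. */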
99 
100 #ifdef CONFIG_DRBD_FAULT_INJECTION
101 int enable_faults;
102 int fault_rate;
103 static int fault_count;
104 int fault_devs;
105 /* bitmap of enabled faults */
106 module_param(enable_faults, int, 0664);
107 /* fault rate % value - applies to all enabled faults */
108 module_param(fault_rate, int, 0664);
109 /* count of faults inserted */
110 module_param(fault_count, int, 0664);
111 /* bitmap of devices to insert faults on */
112 module_param(fault_devs, int, 0644);
113 #endif
114 
115 /* the module parameters themselves, defined here */
116 unsigned int minor_count = 32;
117 int disable_sendpage;
118 int allow_oos;
119 unsigned int cn_idx = CN_IDX_DRBD;
120 int proc_details;       /* Detail level in /proc/drbd */
121 
122 /* Module parameter for setting the user mode helper program
123  * to run. Default is /sbin/drbdadm */
124 char usermode_helper[80] = "/sbin/drbdadm";
125 
126 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
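/* Example (illustrative): since the parameter is writable (mode 0644),
 * the helper may be swapped at runtime without reloading the module:
 *	echo -n /usr/local/sbin/drbdadm \
 *		> /sys/module/drbd/parameters/usermode_helper
 * (the path above is hypothetical; any drbdadm-compatible binary works) */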
127 
128 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
129  * as member "struct gendisk *vdisk;"
130  */
131 struct drbd_conf **minor_table;
132 
133 struct kmem_cache *drbd_request_cache;
134 struct kmem_cache *drbd_ee_cache;	/* epoch entries */
135 struct kmem_cache *drbd_bm_ext_cache;	/* bitmap extents */
136 struct kmem_cache *drbd_al_ext_cache;	/* activity log extents */
137 mempool_t *drbd_request_mempool;
138 mempool_t *drbd_ee_mempool;
139 
140 /* I do not use a standard mempool, because:
141    1) I want to hand out the pre-allocated objects first.
142    2) I want to be able to interrupt sleeping allocation with a signal.
143    Note: This is a singly linked list; the next pointer is the private
144 	 member of struct page.
145  */
146 struct page *drbd_pp_pool;
147 spinlock_t   drbd_pp_lock;
148 int          drbd_pp_vacant;
149 wait_queue_head_t drbd_pp_wait;
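/* A minimal sketch (illustrative only, not the driver's actual helpers)
 * of how pages chain through the private member, as described above:
 *
 *	static void example_pp_push(struct page *page)
 *	{
 *		set_page_private(page, (unsigned long)drbd_pp_pool);
 *		drbd_pp_pool = page;
 *		drbd_pp_vacant++;
 *	}
 *
 *	static struct page *example_pp_pop(void)
 *	{
 *		struct page *page = drbd_pp_pool;
 *		if (page) {
 *			drbd_pp_pool = (struct page *)page_private(page);
 *			drbd_pp_vacant--;
 *		}
 *		return page;
 *	}
 *
 * The real code additionally takes drbd_pp_lock around these steps and
 * sleeps interruptibly on drbd_pp_wait when the pool runs empty. */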
150 
151 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
152 
153 static const struct block_device_operations drbd_ops = {
154 	.owner =   THIS_MODULE,
155 	.open =    drbd_open,
156 	.release = drbd_release,
157 };
158 
159 #define ARRY_SIZE(A) (sizeof(A)/sizeof(A[0]))
160 
161 #ifdef __CHECKER__
162 /* When checking with sparse, and this is an inline function, sparse will
163    give tons of false positives. When this is a real function, sparse works.
164  */
165 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
166 {
167 	int io_allowed;
168 
169 	atomic_inc(&mdev->local_cnt);
170 	io_allowed = (mdev->state.disk >= mins);
171 	if (!io_allowed) {
172 		if (atomic_dec_and_test(&mdev->local_cnt))
173 			wake_up(&mdev->misc_wait);
174 	}
175 	return io_allowed;
176 }
177 
178 #endif
179 
180 /**
181  * DOC: The transfer log
182  *
183  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
184  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
185  * of the list. There is always at least one &struct drbd_tl_epoch object.
186  *
187  * Each &struct drbd_tl_epoch has a circular double linked list of requests
188  * attached.
189  */
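/* A sketch (illustrative only) of walking that structure: epochs via their
 * singly linked next pointers, each epoch's requests via the circular list
 * anchored at &b->requests; callers must hold mdev->req_lock:
 *
 *	struct drbd_tl_epoch *b;
 *	struct drbd_request *req;
 *
 *	for (b = mdev->oldest_tle; b; b = b->next)
 *		list_for_each_entry(req, &b->requests, tl_requests)
 *			handle(req);	(handle() is hypothetical)
 *
 * tl_clear() below is essentially this walk plus cleanup. */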
190 static int tl_init(struct drbd_conf *mdev)
191 {
192 	struct drbd_tl_epoch *b;
193 
194 	/* during device minor initialization, we may well use GFP_KERNEL */
195 	b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
196 	if (!b)
197 		return 0;
198 	INIT_LIST_HEAD(&b->requests);
199 	INIT_LIST_HEAD(&b->w.list);
200 	b->next = NULL;
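	/* presumably arbitrary nonzero seed; barrier number 0 is
	 * special-cased (see _tl_add_barrier() below) */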
201 	b->br_number = 4711;
202 	b->n_req = 0;
203 	b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
204 
205 	mdev->oldest_tle = b;
206 	mdev->newest_tle = b;
207 	INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
208 
209 	mdev->tl_hash = NULL;
210 	mdev->tl_hash_s = 0;
211 
212 	return 1;
213 }
214 
215 static void tl_cleanup(struct drbd_conf *mdev)
216 {
217 	D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
218 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
219 	kfree(mdev->oldest_tle);
220 	mdev->oldest_tle = NULL;
221 	kfree(mdev->unused_spare_tle);
222 	mdev->unused_spare_tle = NULL;
223 	kfree(mdev->tl_hash);
224 	mdev->tl_hash = NULL;
225 	mdev->tl_hash_s = 0;
226 }
227 
228 /**
229  * _tl_add_barrier() - Adds a barrier to the transfer log
230  * @mdev:	DRBD device.
231  * @new:	Barrier to be added before the current head of the TL.
232  *
233  * The caller must hold the req_lock.
234  */
235 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
236 {
237 	struct drbd_tl_epoch *newest_before;
238 
239 	INIT_LIST_HEAD(&new->requests);
240 	INIT_LIST_HEAD(&new->w.list);
241 	new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
242 	new->next = NULL;
243 	new->n_req = 0;
244 
245 	newest_before = mdev->newest_tle;
246 	/* never send a barrier number == 0, because that is special-cased
247 	 * when using TCQ for our write ordering code */
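	/* GCC's binary "?:": (x ?: y) yields x if x is nonzero, else y,
	 * so a wrapped-around 0 becomes 1 */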
248 	new->br_number = (newest_before->br_number+1) ?: 1;
249 	if (mdev->newest_tle != new) {
250 		mdev->newest_tle->next = new;
251 		mdev->newest_tle = new;
252 	}
253 }
254 
255 /**
256  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
257  * @mdev:	DRBD device.
258  * @barrier_nr:	Expected identifier of the DRBD write barrier packet.
259  * @set_size:	Expected number of requests before that barrier.
260  *
261  * In case the passed barrier_nr or set_size does not match the oldest
262  * &struct drbd_tl_epoch object, this function will cause a termination
263  * of the connection.
264  */
265 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
266 		       unsigned int set_size)
267 {
268 	struct drbd_tl_epoch *b, *nob; /* next old barrier */
269 	struct list_head *le, *tle;
270 	struct drbd_request *r;
271 
272 	spin_lock_irq(&mdev->req_lock);
273 
274 	b = mdev->oldest_tle;
275 
276 	/* first some paranoia code */
277 	if (b == NULL) {
278 		dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
279 			barrier_nr);
280 		goto bail;
281 	}
282 	if (b->br_number != barrier_nr) {
283 		dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
284 			barrier_nr, b->br_number);
285 		goto bail;
286 	}
287 	if (b->n_req != set_size) {
288 		dev_err(DEV, "BAD! BarrierAck #%u received with n_req=%u, expected n_req=%u!\n",
289 			barrier_nr, set_size, b->n_req);
290 		goto bail;
291 	}
292 
293 	/* Clean up list of requests processed during current epoch */
294 	list_for_each_safe(le, tle, &b->requests) {
295 		r = list_entry(le, struct drbd_request, tl_requests);
296 		_req_mod(r, barrier_acked);
297 	}
298 	/* There could be requests on the list waiting for completion
299 	   of the write to the local disk. To avoid corruption of the
300 	   slab's data structures we have to remove the list's head.
301 
302 	   Also there could have been a barrier ack out of sequence, overtaking
303 	   the write acks - which would be a bug and violate write ordering.
304 	   To not deadlock in case we lose the connection while such requests are
305 	   still pending, we need some way to find them for
306 	   _req_mod(connection_lost_while_pending).
307 
308 	   These have been list_move'd to the out_of_sequence_requests list in
309 	   _req_mod(, barrier_acked) above.
310 	   */
311 	list_del_init(&b->requests);
312 
313 	nob = b->next;
314 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
315 		_tl_add_barrier(mdev, b);
316 		if (nob)
317 			mdev->oldest_tle = nob;
318 		/* if nob == NULL, b was the only barrier and becomes the new
319 		   barrier. Therefore mdev->oldest_tle already points to b */
320 	} else {
321 		D_ASSERT(nob != NULL);
322 		mdev->oldest_tle = nob;
323 		kfree(b);
324 	}
325 
326 	spin_unlock_irq(&mdev->req_lock);
327 	dec_ap_pending(mdev);
328 
329 	return;
330 
331 bail:
332 	spin_unlock_irq(&mdev->req_lock);
333 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
334 }
335 
336 
337 /**
338  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
339  * @mdev:	DRBD device.
340  *
341  * This is called after the connection to the peer was lost. The storage covered
342  * by the requests on the transfer log gets marked as out of sync. Called from the
343  * receiver thread and the worker thread.
344  */
345 void tl_clear(struct drbd_conf *mdev)
346 {
347 	struct drbd_tl_epoch *b, *tmp;
348 	struct list_head *le, *tle;
349 	struct drbd_request *r;
350 	int new_initial_bnr = net_random();
351 
352 	spin_lock_irq(&mdev->req_lock);
353 
354 	b = mdev->oldest_tle;
355 	while (b) {
356 		list_for_each_safe(le, tle, &b->requests) {
357 			r = list_entry(le, struct drbd_request, tl_requests);
358 			/* It would be nice to complete outside of spinlock.
359 			 * But this is easier for now. */
360 			_req_mod(r, connection_lost_while_pending);
361 		}
362 		tmp = b->next;
363 
364 		/* there could still be requests on that ring list,
365 		 * in case local io is still pending */
366 		list_del(&b->requests);
367 
368 		/* dec_ap_pending corresponding to queue_barrier.
369 		 * the newest barrier may not have been queued yet,
370 		 * in which case w.cb is still NULL. */
371 		if (b->w.cb != NULL)
372 			dec_ap_pending(mdev);
373 
374 		if (b == mdev->newest_tle) {
375 			/* recycle, but reinit! */
376 			D_ASSERT(tmp == NULL);
377 			INIT_LIST_HEAD(&b->requests);
378 			INIT_LIST_HEAD(&b->w.list);
379 			b->w.cb = NULL;
380 			b->br_number = new_initial_bnr;
381 			b->n_req = 0;
382 
383 			mdev->oldest_tle = b;
384 			break;
385 		}
386 		kfree(b);
387 		b = tmp;
388 	}
389 
390 	/* we expect this list to be empty. */
391 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
392 
393 	/* but just in case, clean it up anyway! */
394 	list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
395 		r = list_entry(le, struct drbd_request, tl_requests);
396 		/* It would be nice to complete outside of spinlock.
397 		 * But this is easier for now. */
398 		_req_mod(r, connection_lost_while_pending);
399 	}
400 
401 	/* ensure bit indicating barrier is required is clear */
402 	clear_bit(CREATE_BARRIER, &mdev->flags);
403 
404 	spin_unlock_irq(&mdev->req_lock);
405 }
406 
407 /**
408  * cl_wide_st_chg() - TRUE if the state change is a cluster-wide one
409  * @mdev:	DRBD device.
410  * @os:		old (current) state.
411  * @ns:		new (wanted) state.
412  */
413 static int cl_wide_st_chg(struct drbd_conf *mdev,
414 			  union drbd_state os, union drbd_state ns)
415 {
416 	return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
417 		 ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
418 		  (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
419 		  (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
420 		  (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
421 		(os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
422 		(os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
423 }
424 
425 int drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
426 		      union drbd_state mask, union drbd_state val)
427 {
428 	unsigned long flags;
429 	union drbd_state os, ns;
430 	int rv;
431 
432 	spin_lock_irqsave(&mdev->req_lock, flags);
433 	os = mdev->state;
434 	ns.i = (os.i & ~mask.i) | val.i;
435 	rv = _drbd_set_state(mdev, ns, f, NULL);
436 	ns = mdev->state;
437 	spin_unlock_irqrestore(&mdev->req_lock, flags);
438 
439 	return rv;
440 }
441 
442 /**
443  * drbd_force_state() - Impose a change which happens outside our control on our state
444  * @mdev:	DRBD device.
445  * @mask:	mask of state bits to change.
446  * @val:	value of new state bits.
447  */
448 void drbd_force_state(struct drbd_conf *mdev,
449 	union drbd_state mask, union drbd_state val)
450 {
451 	drbd_change_state(mdev, CS_HARD, mask, val);
452 }
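/* Illustrative example of the mask/val encoding (the NS() helpers live in
 * drbd_int.h): a call such as
 *
 *	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
 *
 * builds a mask with only the conn field's bits set and a val carrying the
 * new connection state; drbd_change_state() then folds them together with
 *
 *	ns.i = (os.i & ~mask.i) | val.i;
 *
 * leaving all other state fields untouched. */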
453 
454 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns);
455 static int is_valid_state_transition(struct drbd_conf *,
456 				     union drbd_state, union drbd_state);
457 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
458 				       union drbd_state ns, int *warn_sync_abort);
459 int drbd_send_state_req(struct drbd_conf *,
460 			union drbd_state, union drbd_state);
461 
462 static enum drbd_state_ret_codes _req_st_cond(struct drbd_conf *mdev,
463 				    union drbd_state mask, union drbd_state val)
464 {
465 	union drbd_state os, ns;
466 	unsigned long flags;
467 	int rv;
468 
469 	if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
470 		return SS_CW_SUCCESS;
471 
472 	if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
473 		return SS_CW_FAILED_BY_PEER;
474 
475 	rv = 0;
476 	spin_lock_irqsave(&mdev->req_lock, flags);
477 	os = mdev->state;
478 	ns.i = (os.i & ~mask.i) | val.i;
479 	ns = sanitize_state(mdev, os, ns, NULL);
480 
481 	if (!cl_wide_st_chg(mdev, os, ns))
482 		rv = SS_CW_NO_NEED;
483 	if (!rv) {
484 		rv = is_valid_state(mdev, ns);
485 		if (rv == SS_SUCCESS) {
486 			rv = is_valid_state_transition(mdev, ns, os);
487 			if (rv == SS_SUCCESS)
488 				rv = 0; /* cont waiting, otherwise fail. */
489 		}
490 	}
491 	spin_unlock_irqrestore(&mdev->req_lock, flags);
492 
493 	return rv;
494 }
495 
496 /**
497  * drbd_req_state() - Perform a possibly cluster-wide state change
498  * @mdev:	DRBD device.
499  * @mask:	mask of state bits to change.
500  * @val:	value of new state bits.
501  * @f:		flags
502  *
503  * Should not be called directly, use drbd_request_state() or
504  * _drbd_request_state().
505  */
506 static int drbd_req_state(struct drbd_conf *mdev,
507 			  union drbd_state mask, union drbd_state val,
508 			  enum chg_state_flags f)
509 {
510 	struct completion done;
511 	unsigned long flags;
512 	union drbd_state os, ns;
513 	int rv;
514 
515 	init_completion(&done);
516 
517 	if (f & CS_SERIALIZE)
518 		mutex_lock(&mdev->state_mutex);
519 
520 	spin_lock_irqsave(&mdev->req_lock, flags);
521 	os = mdev->state;
522 	ns.i = (os.i & ~mask.i) | val.i;
523 	ns = sanitize_state(mdev, os, ns, NULL);
524 
525 	if (cl_wide_st_chg(mdev, os, ns)) {
526 		rv = is_valid_state(mdev, ns);
527 		if (rv == SS_SUCCESS)
528 			rv = is_valid_state_transition(mdev, ns, os);
529 		spin_unlock_irqrestore(&mdev->req_lock, flags);
530 
531 		if (rv < SS_SUCCESS) {
532 			if (f & CS_VERBOSE)
533 				print_st_err(mdev, os, ns, rv);
534 			goto abort;
535 		}
536 
537 		drbd_state_lock(mdev);
538 		if (!drbd_send_state_req(mdev, mask, val)) {
539 			drbd_state_unlock(mdev);
540 			rv = SS_CW_FAILED_BY_PEER;
541 			if (f & CS_VERBOSE)
542 				print_st_err(mdev, os, ns, rv);
543 			goto abort;
544 		}
545 
546 		wait_event(mdev->state_wait,
547 			(rv = _req_st_cond(mdev, mask, val)));
548 
549 		if (rv < SS_SUCCESS) {
550 			drbd_state_unlock(mdev);
551 			if (f & CS_VERBOSE)
552 				print_st_err(mdev, os, ns, rv);
553 			goto abort;
554 		}
555 		spin_lock_irqsave(&mdev->req_lock, flags);
556 		os = mdev->state;
557 		ns.i = (os.i & ~mask.i) | val.i;
558 		rv = _drbd_set_state(mdev, ns, f, &done);
559 		drbd_state_unlock(mdev);
560 	} else {
561 		rv = _drbd_set_state(mdev, ns, f, &done);
562 	}
563 
564 	spin_unlock_irqrestore(&mdev->req_lock, flags);
565 
566 	if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
567 		D_ASSERT(current != mdev->worker.task);
568 		wait_for_completion(&done);
569 	}
570 
571 abort:
572 	if (f & CS_SERIALIZE)
573 		mutex_unlock(&mdev->state_mutex);
574 
575 	return rv;
576 }
577 
578 /**
579  * _drbd_request_state() - Request a state change (with flags)
580  * @mdev:	DRBD device.
581  * @mask:	mask of state bits to change.
582  * @val:	value of new state bits.
583  * @f:		flags
584  *
585  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
586  * flag, or when logging of failed state change requests is not desired.
587  */
588 int _drbd_request_state(struct drbd_conf *mdev,	union drbd_state mask,
589 			union drbd_state val,	enum chg_state_flags f)
590 {
591 	int rv;
592 
593 	wait_event(mdev->state_wait,
594 		   (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
595 
596 	return rv;
597 }
598 
599 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
600 {
601 	dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
602 	    name,
603 	    drbd_conn_str(ns.conn),
604 	    drbd_role_str(ns.role),
605 	    drbd_role_str(ns.peer),
606 	    drbd_disk_str(ns.disk),
607 	    drbd_disk_str(ns.pdsk),
608 	    ns.susp ? 's' : 'r',
609 	    ns.aftr_isp ? 'a' : '-',
610 	    ns.peer_isp ? 'p' : '-',
611 	    ns.user_isp ? 'u' : '-'
612 	    );
613 }
614 
615 void print_st_err(struct drbd_conf *mdev,
616 	union drbd_state os, union drbd_state ns, int err)
617 {
618 	if (err == SS_IN_TRANSIENT_STATE)
619 		return;
620 	dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
621 	print_st(mdev, " state", os);
622 	print_st(mdev, "wanted", ns);
623 }
624 
625 
626 #define drbd_peer_str drbd_role_str
627 #define drbd_pdsk_str drbd_disk_str
628 
629 #define drbd_susp_str(A)     ((A) ? "1" : "0")
630 #define drbd_aftr_isp_str(A) ((A) ? "1" : "0")
631 #define drbd_peer_isp_str(A) ((A) ? "1" : "0")
632 #define drbd_user_isp_str(A) ((A) ? "1" : "0")
633 
634 #define PSC(A) \
635 	({ if (ns.A != os.A) { \
636 		pbp += sprintf(pbp, #A "( %s -> %s ) ", \
637 			      drbd_##A##_str(os.A), \
638 			      drbd_##A##_str(ns.A)); \
639 	} })
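/* PSC ("print state change") appends one clause per changed field, so a
 * typical promotion might log, e.g.:
 *
 *	role( Secondary -> Primary ) pdsk( Outdated -> UpToDate )
 *
 * (illustrative output; the exact strings come from drbd_strings.c) */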
640 
641 /**
642  * is_valid_state() - Returns an SS_ error code if ns is not valid
643  * @mdev:	DRBD device.
644  * @ns:		State to consider.
645  */
646 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
647 {
648 	/* See drbd_state_sw_errors in drbd_strings.c */
649 
650 	enum drbd_fencing_p fp;
651 	int rv = SS_SUCCESS;
652 
653 	fp = FP_DONT_CARE;
654 	if (get_ldev(mdev)) {
655 		fp = mdev->ldev->dc.fencing;
656 		put_ldev(mdev);
657 	}
658 
659 	if (get_net_conf(mdev)) {
660 		if (!mdev->net_conf->two_primaries &&
661 		    ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
662 			rv = SS_TWO_PRIMARIES;
663 		put_net_conf(mdev);
664 	}
665 
666 	if (rv <= 0)
667 		/* already found a reason to abort */;
668 	else if (ns.role == R_SECONDARY && mdev->open_cnt)
669 		rv = SS_DEVICE_IN_USE;
670 
671 	else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
672 		rv = SS_NO_UP_TO_DATE_DISK;
673 
674 	else if (fp >= FP_RESOURCE &&
675 		 ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
676 		rv = SS_PRIMARY_NOP;
677 
678 	else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
679 		rv = SS_NO_UP_TO_DATE_DISK;
680 
681 	else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
682 		rv = SS_NO_LOCAL_DISK;
683 
684 	else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
685 		rv = SS_NO_REMOTE_DISK;
686 
687 	else if ((ns.conn == C_CONNECTED ||
688 		  ns.conn == C_WF_BITMAP_S ||
689 		  ns.conn == C_SYNC_SOURCE ||
690 		  ns.conn == C_PAUSED_SYNC_S) &&
691 		  ns.disk == D_OUTDATED)
692 		rv = SS_CONNECTED_OUTDATES;
693 
694 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
695 		 (mdev->sync_conf.verify_alg[0] == 0))
696 		rv = SS_NO_VERIFY_ALG;
697 
698 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
699 		  mdev->agreed_pro_version < 88)
700 		rv = SS_NOT_SUPPORTED;
701 
702 	return rv;
703 }
704 
705 /**
706  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
707  * @mdev:	DRBD device.
708  * @ns:		new state.
709  * @os:		old state.
710  */
711 static int is_valid_state_transition(struct drbd_conf *mdev,
712 				     union drbd_state ns, union drbd_state os)
713 {
714 	int rv = SS_SUCCESS;
715 
716 	if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
717 	    os.conn > C_CONNECTED)
718 		rv = SS_RESYNC_RUNNING;
719 
720 	if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
721 		rv = SS_ALREADY_STANDALONE;
722 
723 	if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
724 		rv = SS_IS_DISKLESS;
725 
726 	if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
727 		rv = SS_NO_NET_CONFIG;
728 
729 	if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
730 		rv = SS_LOWER_THAN_OUTDATED;
731 
732 	if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
733 		rv = SS_IN_TRANSIENT_STATE;
734 
735 	if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
736 		rv = SS_IN_TRANSIENT_STATE;
737 
738 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
739 		rv = SS_NEED_CONNECTION;
740 
741 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
742 	    ns.conn != os.conn && os.conn > C_CONNECTED)
743 		rv = SS_RESYNC_RUNNING;
744 
745 	if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
746 	    os.conn < C_CONNECTED)
747 		rv = SS_NEED_CONNECTION;
748 
749 	return rv;
750 }
751 
752 /**
753  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
754  * @mdev:	DRBD device.
755  * @os:		old state.
756  * @ns:		new state.
757  * @warn_sync_abort:	Set to 1 if the transition implies aborting a running resync.
758  *
759  * When we lose the connection, we have to set the state of the peer's disk (pdsk)
760  * to D_UNKNOWN. This rule and many more along those lines are in this function.
761  */
762 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
763 				       union drbd_state ns, int *warn_sync_abort)
764 {
765 	enum drbd_fencing_p fp;
766 
767 	fp = FP_DONT_CARE;
768 	if (get_ldev(mdev)) {
769 		fp = mdev->ldev->dc.fencing;
770 		put_ldev(mdev);
771 	}
772 
773 	/* Don't allow network errors to put an unconfigured network part into a failure state */
774 	if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
775 	    os.conn <= C_DISCONNECTING)
776 		ns.conn = os.conn;
777 
778 	/* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow */
779 	if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
780 	    ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING)
781 		ns.conn = os.conn;
782 
783 	/* After C_DISCONNECTING only C_STANDALONE may follow */
784 	if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
785 		ns.conn = os.conn;
786 
787 	if (ns.conn < C_CONNECTED) {
788 		ns.peer_isp = 0;
789 		ns.peer = R_UNKNOWN;
790 		if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
791 			ns.pdsk = D_UNKNOWN;
792 	}
793 
794 	/* Clear the aftr_isp when becoming unconfigured */
795 	if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
796 		ns.aftr_isp = 0;
797 
798 	if (ns.conn <= C_DISCONNECTING && ns.disk == D_DISKLESS)
799 		ns.pdsk = D_UNKNOWN;
800 
801 	/* Abort resync if a disk fails/detaches */
802 	if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
803 	    (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
804 		if (warn_sync_abort)
805 			*warn_sync_abort = 1;
806 		ns.conn = C_CONNECTED;
807 	}
808 
809 	if (ns.conn >= C_CONNECTED &&
810 	    ((ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED) ||
811 	     (ns.disk == D_NEGOTIATING && ns.conn == C_WF_BITMAP_T))) {
812 		switch (ns.conn) {
813 		case C_WF_BITMAP_T:
814 		case C_PAUSED_SYNC_T:
815 			ns.disk = D_OUTDATED;
816 			break;
817 		case C_CONNECTED:
818 		case C_WF_BITMAP_S:
819 		case C_SYNC_SOURCE:
820 		case C_PAUSED_SYNC_S:
821 			ns.disk = D_UP_TO_DATE;
822 			break;
823 		case C_SYNC_TARGET:
824 			ns.disk = D_INCONSISTENT;
825 			dev_warn(DEV, "Implicitly set disk state Inconsistent!\n");
826 			break;
827 		}
828 		if (os.disk == D_OUTDATED && ns.disk == D_UP_TO_DATE)
829 			dev_warn(DEV, "Implicitly set disk from Outdated to UpToDate\n");
830 	}
831 
832 	if (ns.conn >= C_CONNECTED &&
833 	    (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)) {
834 		switch (ns.conn) {
835 		case C_CONNECTED:
836 		case C_WF_BITMAP_T:
837 		case C_PAUSED_SYNC_T:
838 		case C_SYNC_TARGET:
839 			ns.pdsk = D_UP_TO_DATE;
840 			break;
841 		case C_WF_BITMAP_S:
842 		case C_PAUSED_SYNC_S:
843 			ns.pdsk = D_OUTDATED;
844 			break;
845 		case C_SYNC_SOURCE:
846 			ns.pdsk = D_INCONSISTENT;
847 			dev_warn(DEV, "Implicitly set pdsk Inconsistent!\n");
848 			break;
849 		}
850 		if (os.pdsk == D_OUTDATED && ns.pdsk == D_UP_TO_DATE)
851 			dev_warn(DEV, "Implicitly set pdsk from Outdated to UpToDate\n");
852 	}
853 
854 	/* Connection breaks down before we finished "Negotiating" */
855 	if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
856 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
857 		if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
858 			ns.disk = mdev->new_state_tmp.disk;
859 			ns.pdsk = mdev->new_state_tmp.pdsk;
860 		} else {
861 			dev_alert(DEV, "Connection lost while negotiating, no data!\n");
862 			ns.disk = D_DISKLESS;
863 			ns.pdsk = D_UNKNOWN;
864 		}
865 		put_ldev(mdev);
866 	}
867 
868 	if (fp == FP_STONITH &&
869 	    (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
870 	    !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
871 		ns.susp = 1;
872 
873 	if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
874 		if (ns.conn == C_SYNC_SOURCE)
875 			ns.conn = C_PAUSED_SYNC_S;
876 		if (ns.conn == C_SYNC_TARGET)
877 			ns.conn = C_PAUSED_SYNC_T;
878 	} else {
879 		if (ns.conn == C_PAUSED_SYNC_S)
880 			ns.conn = C_SYNC_SOURCE;
881 		if (ns.conn == C_PAUSED_SYNC_T)
882 			ns.conn = C_SYNC_TARGET;
883 	}
884 
885 	return ns;
886 }
887 
888 /* helper for __drbd_set_state */
889 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
890 {
891 	if (cs == C_VERIFY_T) {
892 		/* starting online verify from an arbitrary position
893 		 * does not fit well into the existing protocol.
894 		 * on C_VERIFY_T, we initialize ov_left and friends
895 		 * implicitly in receive_DataRequest once the
896 		 * first P_OV_REQUEST is received */
897 		mdev->ov_start_sector = ~(sector_t)0;
898 	} else {
899 		unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
900 		if (bit >= mdev->rs_total)
901 			mdev->ov_start_sector =
902 				BM_BIT_TO_SECT(mdev->rs_total - 1);
903 		mdev->ov_position = mdev->ov_start_sector;
904 	}
905 }
906 
907 /**
908  * __drbd_set_state() - Set a new DRBD state
909  * @mdev:	DRBD device.
910  * @ns:		new state.
911  * @flags:	Flags
912  * @done:	Optional completion that will be completed after after_state_ch() has finished
913  *
914  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
915  */
916 int __drbd_set_state(struct drbd_conf *mdev,
917 		    union drbd_state ns, enum chg_state_flags flags,
918 		    struct completion *done)
919 {
920 	union drbd_state os;
921 	int rv = SS_SUCCESS;
922 	int warn_sync_abort = 0;
923 	struct after_state_chg_work *ascw;
924 
925 	os = mdev->state;
926 
927 	ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
928 
929 	if (ns.i == os.i)
930 		return SS_NOTHING_TO_DO;
931 
932 	if (!(flags & CS_HARD)) {
933 		/* pre-state-change checks; only look at ns */
934 		/* See drbd_state_sw_errors in drbd_strings.c */
935 
936 		rv = is_valid_state(mdev, ns);
937 		if (rv < SS_SUCCESS) {
938 			/* If the old state was illegal as well, then let
939 			   this happen...*/
940 
941 			if (is_valid_state(mdev, os) == rv) {
942 				dev_err(DEV, "Considering state change from bad state. "
943 				    "Error would be: '%s'\n",
944 				    drbd_set_st_err_str(rv));
945 				print_st(mdev, "old", os);
946 				print_st(mdev, "new", ns);
947 				rv = is_valid_state_transition(mdev, ns, os);
948 			}
949 		} else
950 			rv = is_valid_state_transition(mdev, ns, os);
951 	}
952 
953 	if (rv < SS_SUCCESS) {
954 		if (flags & CS_VERBOSE)
955 			print_st_err(mdev, os, ns, rv);
956 		return rv;
957 	}
958 
959 	if (warn_sync_abort)
960 		dev_warn(DEV, "Resync aborted.\n");
961 
962 	{
963 		char *pbp, pb[300];
964 		pbp = pb;
965 		*pbp = 0;
966 		PSC(role);
967 		PSC(peer);
968 		PSC(conn);
969 		PSC(disk);
970 		PSC(pdsk);
971 		PSC(susp);
972 		PSC(aftr_isp);
973 		PSC(peer_isp);
974 		PSC(user_isp);
975 		dev_info(DEV, "%s\n", pb);
976 	}
977 
978 	/* solve the race between becoming unconfigured,
979 	 * worker doing the cleanup, and
980 	 * admin reconfiguring us:
981 	 * on (re)configure, first set CONFIG_PENDING,
982 	 * then wait for a potentially exiting worker,
983 	 * start the worker, and schedule one no_op.
984 	 * then proceed with configuration.
985 	 */
986 	if (ns.disk == D_DISKLESS &&
987 	    ns.conn == C_STANDALONE &&
988 	    ns.role == R_SECONDARY &&
989 	    !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
990 		set_bit(DEVICE_DYING, &mdev->flags);
991 
992 	mdev->state.i = ns.i;
993 	wake_up(&mdev->misc_wait);
994 	wake_up(&mdev->state_wait);
995 
996 	/*   post-state-change actions   */
997 	if (os.conn >= C_SYNC_SOURCE   && ns.conn <= C_CONNECTED) {
998 		set_bit(STOP_SYNC_TIMER, &mdev->flags);
999 		mod_timer(&mdev->resync_timer, jiffies);
1000 	}
1001 
1002 	/* aborted verify run. log the last position */
1003 	if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1004 	    ns.conn < C_CONNECTED) {
1005 		mdev->ov_start_sector =
1006 			BM_BIT_TO_SECT(mdev->rs_total - mdev->ov_left);
1007 		dev_info(DEV, "Online Verify reached sector %llu\n",
1008 			(unsigned long long)mdev->ov_start_sector);
1009 	}
1010 
1011 	if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1012 	    (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1013 		dev_info(DEV, "Syncer continues.\n");
1014 		mdev->rs_paused += (long)jiffies-(long)mdev->rs_mark_time;
1015 		if (ns.conn == C_SYNC_TARGET) {
1016 			if (!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))
1017 				mod_timer(&mdev->resync_timer, jiffies);
1018 			/* This if (!test_bit) is only needed for the case
1019 			   that a device that has ceased to use its timer,
1020 			   i.e. is already in drbd_resync_finished(), gets
1021 			   paused and resumed. */
1022 		}
1023 	}
1024 
1025 	if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1026 	    (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1027 		dev_info(DEV, "Resync suspended\n");
1028 		mdev->rs_mark_time = jiffies;
1029 		if (ns.conn == C_PAUSED_SYNC_T)
1030 			set_bit(STOP_SYNC_TIMER, &mdev->flags);
1031 	}
1032 
1033 	if (os.conn == C_CONNECTED &&
1034 	    (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1035 		mdev->ov_position = 0;
1036 		mdev->rs_total =
1037 		mdev->rs_mark_left = drbd_bm_bits(mdev);
1038 		if (mdev->agreed_pro_version >= 90)
1039 			set_ov_position(mdev, ns.conn);
1040 		else
1041 			mdev->ov_start_sector = 0;
1042 		mdev->ov_left = mdev->rs_total
1043 			      - BM_SECT_TO_BIT(mdev->ov_position);
1044 		mdev->rs_start     =
1045 		mdev->rs_mark_time = jiffies;
1046 		mdev->ov_last_oos_size = 0;
1047 		mdev->ov_last_oos_start = 0;
1048 
1049 		if (ns.conn == C_VERIFY_S) {
1050 			dev_info(DEV, "Starting Online Verify from sector %llu\n",
1051 					(unsigned long long)mdev->ov_position);
1052 			mod_timer(&mdev->resync_timer, jiffies);
1053 		}
1054 	}
1055 
1056 	if (get_ldev(mdev)) {
1057 		u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1058 						 MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1059 						 MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1060 
1061 		if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1062 			mdf |= MDF_CRASHED_PRIMARY;
1063 		if (mdev->state.role == R_PRIMARY ||
1064 		    (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1065 			mdf |= MDF_PRIMARY_IND;
1066 		if (mdev->state.conn > C_WF_REPORT_PARAMS)
1067 			mdf |= MDF_CONNECTED_IND;
1068 		if (mdev->state.disk > D_INCONSISTENT)
1069 			mdf |= MDF_CONSISTENT;
1070 		if (mdev->state.disk > D_OUTDATED)
1071 			mdf |= MDF_WAS_UP_TO_DATE;
1072 		if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1073 			mdf |= MDF_PEER_OUT_DATED;
1074 		if (mdf != mdev->ldev->md.flags) {
1075 			mdev->ldev->md.flags = mdf;
1076 			drbd_md_mark_dirty(mdev);
1077 		}
1078 		if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1079 			drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1080 		put_ldev(mdev);
1081 	}
1082 
1083 	/* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider resyncing */
1084 	if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1085 	    os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1086 		set_bit(CONSIDER_RESYNC, &mdev->flags);
1087 
1088 	/* Receiver should clean up itself */
1089 	if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1090 		drbd_thread_stop_nowait(&mdev->receiver);
1091 
1092 	/* Now the receiver finished cleaning up itself, it should die */
1093 	if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1094 		drbd_thread_stop_nowait(&mdev->receiver);
1095 
1096 	/* Upon network failure, we need to restart the receiver. */
1097 	if (os.conn > C_TEAR_DOWN &&
1098 	    ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1099 		drbd_thread_restart_nowait(&mdev->receiver);
1100 
1101 	ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1102 	if (ascw) {
1103 		ascw->os = os;
1104 		ascw->ns = ns;
1105 		ascw->flags = flags;
1106 		ascw->w.cb = w_after_state_ch;
1107 		ascw->done = done;
1108 		drbd_queue_work(&mdev->data.work, &ascw->w);
1109 	} else {
1110 		dev_warn(DEV, "Could not kmalloc an ascw\n");
1111 	}
1112 
1113 	return rv;
1114 }
1115 
1116 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1117 {
1118 	struct after_state_chg_work *ascw =
1119 		container_of(w, struct after_state_chg_work, w);
1120 	after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1121 	if (ascw->flags & CS_WAIT_COMPLETE) {
1122 		D_ASSERT(ascw->done != NULL);
1123 		complete(ascw->done);
1124 	}
1125 	kfree(ascw);
1126 
1127 	return 1;
1128 }
1129 
1130 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1131 {
1132 	if (rv) {
1133 		dev_err(DEV, "Writing the bitmap failed not starting resync.\n");
1134 		_drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1135 		return;
1136 	}
1137 
1138 	switch (mdev->state.conn) {
1139 	case C_STARTING_SYNC_T:
1140 		_drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1141 		break;
1142 	case C_STARTING_SYNC_S:
1143 		drbd_start_resync(mdev, C_SYNC_SOURCE);
1144 		break;
1145 	}
1146 }
1147 
1148 /**
1149  * after_state_ch() - Perform after state change actions that may sleep
1150  * @mdev:	DRBD device.
1151  * @os:		old state.
1152  * @ns:		new state.
1153  * @flags:	Flags
1154  */
1155 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1156 			   union drbd_state ns, enum chg_state_flags flags)
1157 {
1158 	enum drbd_fencing_p fp;
1159 
1160 	if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1161 		clear_bit(CRASHED_PRIMARY, &mdev->flags);
1162 		if (mdev->p_uuid)
1163 			mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1164 	}
1165 
1166 	fp = FP_DONT_CARE;
1167 	if (get_ldev(mdev)) {
1168 		fp = mdev->ldev->dc.fencing;
1169 		put_ldev(mdev);
1170 	}
1171 
1172 	/* Inform userspace about the change... */
1173 	drbd_bcast_state(mdev, ns);
1174 
1175 	if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1176 	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1177 		drbd_khelper(mdev, "pri-on-incon-degr");
1178 
1179 	/* Here we have the actions that are performed after a
1180 	   state change. This function might sleep */
1181 
1182 	if (fp == FP_STONITH && ns.susp) {
1183 		/* case1: The outdate peer handler is successful:
1184 		 * case2: The connection was established again: */
1185 		if ((os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) ||
1186 		    (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)) {
1187 			tl_clear(mdev);
1188 			spin_lock_irq(&mdev->req_lock);
1189 			_drbd_set_state(_NS(mdev, susp, 0), CS_VERBOSE, NULL);
1190 			spin_unlock_irq(&mdev->req_lock);
1191 		}
1192 	}
1193 	/* Do not change the order of the if above and the two below... */
1194 	if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1195 		drbd_send_uuids(mdev);
1196 		drbd_send_state(mdev);
1197 	}
1198 	if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S)
1199 		drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL, "send_bitmap (WFBitMapS)");
1200 
1201 	/* Lost contact to peer's copy of the data */
1202 	if ((os.pdsk >= D_INCONSISTENT &&
1203 	     os.pdsk != D_UNKNOWN &&
1204 	     os.pdsk != D_OUTDATED)
1205 	&&  (ns.pdsk < D_INCONSISTENT ||
1206 	     ns.pdsk == D_UNKNOWN ||
1207 	     ns.pdsk == D_OUTDATED)) {
1208 		kfree(mdev->p_uuid);
1209 		mdev->p_uuid = NULL;
1210 		if (get_ldev(mdev)) {
1211 			if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1212 			    mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1213 				drbd_uuid_new_current(mdev);
1214 				drbd_send_uuids(mdev);
1215 			}
1216 			put_ldev(mdev);
1217 		}
1218 	}
1219 
1220 	if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1221 		if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0)
1222 			drbd_uuid_new_current(mdev);
1223 
1224 		/* D_DISKLESS Peer becomes secondary */
1225 		if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1226 			drbd_al_to_on_disk_bm(mdev);
1227 		put_ldev(mdev);
1228 	}
1229 
1230 	/* Last part of the attaching process ... */
1231 	if (ns.conn >= C_CONNECTED &&
1232 	    os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1233 		kfree(mdev->p_uuid); /* We expect to receive up-to-date UUIDs soon. */
1234 		mdev->p_uuid = NULL; /* ...to not use the old ones in the mean time */
1235 		drbd_send_sizes(mdev, 0);  /* to start sync... */
1236 		drbd_send_uuids(mdev);
1237 		drbd_send_state(mdev);
1238 	}
1239 
1240 	/* We want to pause/continue resync, tell peer. */
1241 	if (ns.conn >= C_CONNECTED &&
1242 	     ((os.aftr_isp != ns.aftr_isp) ||
1243 	      (os.user_isp != ns.user_isp)))
1244 		drbd_send_state(mdev);
1245 
1246 	/* In case one of the isp bits got set, suspend other devices. */
1247 	if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1248 	    (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1249 		suspend_other_sg(mdev);
1250 
1251 	/* Make sure the peer gets informed about possible state
1252 	   changes (ISP bits) while we were in WFReportParams. */
1253 	if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1254 		drbd_send_state(mdev);
1255 
1256 	/* We are in the progress to start a full sync... */
1257 	if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1258 	    (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1259 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &abw_start_sync, "set_n_write from StartingSync");
1260 
1261 	/* We are invalidating ourselves... */
1262 	if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1263 	    os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1264 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
1265 
1266 	if (os.disk > D_FAILED && ns.disk == D_FAILED) {
1267 		enum drbd_io_error_p eh;
1268 
1269 		eh = EP_PASS_ON;
1270 		if (get_ldev_if_state(mdev, D_FAILED)) {
1271 			eh = mdev->ldev->dc.on_io_error;
1272 			put_ldev(mdev);
1273 		}
1274 
1275 		drbd_rs_cancel_all(mdev);
1276 		/* since get_ldev() only works as long as disk>=D_INCONSISTENT,
1277 		   and it is D_DISKLESS here, local_cnt can only go down, it
1278 		   cannot increase... It will reach zero */
1279 		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1280 		mdev->rs_total = 0;
1281 		mdev->rs_failed = 0;
1282 		atomic_set(&mdev->rs_pending_cnt, 0);
1283 
1284 		spin_lock_irq(&mdev->req_lock);
1285 		_drbd_set_state(_NS(mdev, disk, D_DISKLESS), CS_HARD, NULL);
1286 		spin_unlock_irq(&mdev->req_lock);
1287 
1288 		if (eh == EP_CALL_HELPER)
1289 			drbd_khelper(mdev, "local-io-error");
1290 	}
1291 
1292 	if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {
1293 
1294 		if (os.disk == D_FAILED) /* && ns.disk == D_DISKLESS */ {
1295 			if (drbd_send_state(mdev))
1296 				dev_warn(DEV, "Notified peer that my disk is broken.\n");
1297 			else
1298 				dev_err(DEV, "Sending state in drbd_io_error() failed\n");
1299 		}
1300 
1301 		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1302 		lc_destroy(mdev->resync);
1303 		mdev->resync = NULL;
1304 		lc_destroy(mdev->act_log);
1305 		mdev->act_log = NULL;
1306 		__no_warn(local,
1307 			drbd_free_bc(mdev->ldev);
1308 			mdev->ldev = NULL;);
1309 
1310 		if (mdev->md_io_tmpp)
1311 			__free_page(mdev->md_io_tmpp);
1312 	}
1313 
1314 	/* Disks got bigger while they were detached */
1315 	if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1316 	    test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1317 		if (ns.conn == C_CONNECTED)
1318 			resync_after_online_grow(mdev);
1319 	}
1320 
1321 	/* A resync finished or aborted, wake paused devices... */
1322 	if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1323 	    (os.peer_isp && !ns.peer_isp) ||
1324 	    (os.user_isp && !ns.user_isp))
1325 		resume_next_sg(mdev);
1326 
1327 	/* Upon network connection, we need to start the receiver */
1328 	if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1329 		drbd_thread_start(&mdev->receiver);
1330 
1331 	/* Terminate worker thread if we are unconfigured - it will be
1332 	   restarted as needed... */
1333 	if (ns.disk == D_DISKLESS &&
1334 	    ns.conn == C_STANDALONE &&
1335 	    ns.role == R_SECONDARY) {
1336 		if (os.aftr_isp != ns.aftr_isp)
1337 			resume_next_sg(mdev);
1338 		/* set in __drbd_set_state, unless CONFIG_PENDING was set */
1339 		if (test_bit(DEVICE_DYING, &mdev->flags))
1340 			drbd_thread_stop_nowait(&mdev->worker);
1341 	}
1342 
1343 	drbd_md_sync(mdev);
1344 }
1345 
1346 
1347 static int drbd_thread_setup(void *arg)
1348 {
1349 	struct drbd_thread *thi = (struct drbd_thread *) arg;
1350 	struct drbd_conf *mdev = thi->mdev;
1351 	unsigned long flags;
1352 	int retval;
1353 
1354 restart:
1355 	retval = thi->function(thi);
1356 
1357 	spin_lock_irqsave(&thi->t_lock, flags);
1358 
1359 	/* if the receiver has been "Exiting", the last thing it did
1360 	 * was set the conn state to "StandAlone",
1361 	 * if a re-connect request now comes in, the conn state goes to C_UNCONNECTED,
1362 	 * and receiver thread will be "started".
1363 	 * drbd_thread_start needs to set "Restarting" in that case.
1364 	 * t_state check and assignment needs to be within the same spinlock,
1365 	 * so either thread_start sees Exiting, and can remap to Restarting,
1366 	 * or thread_start sees None, and can proceed as normal.
1367 	 */
1368 
1369 	if (thi->t_state == Restarting) {
1370 		dev_info(DEV, "Restarting %s\n", current->comm);
1371 		thi->t_state = Running;
1372 		spin_unlock_irqrestore(&thi->t_lock, flags);
1373 		goto restart;
1374 	}
1375 
1376 	thi->task = NULL;
1377 	thi->t_state = None;
1378 	smp_mb();
1379 	complete(&thi->stop);
1380 	spin_unlock_irqrestore(&thi->t_lock, flags);
1381 
1382 	dev_info(DEV, "Terminating %s\n", current->comm);
1383 
1384 	/* Release mod reference taken when thread was started */
1385 	module_put(THIS_MODULE);
1386 	return retval;
1387 }
1388 
1389 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1390 		      int (*func) (struct drbd_thread *))
1391 {
1392 	spin_lock_init(&thi->t_lock);
1393 	thi->task    = NULL;
1394 	thi->t_state = None;
1395 	thi->function = func;
1396 	thi->mdev = mdev;
1397 }
1398 
1399 int drbd_thread_start(struct drbd_thread *thi)
1400 {
1401 	struct drbd_conf *mdev = thi->mdev;
1402 	struct task_struct *nt;
1403 	unsigned long flags;
1404 
1405 	const char *me =
1406 		thi == &mdev->receiver ? "receiver" :
1407 		thi == &mdev->asender  ? "asender"  :
1408 		thi == &mdev->worker   ? "worker"   : "NONSENSE";
1409 
1410 	/* is used from state engine doing drbd_thread_stop_nowait,
1411 	 * while holding the req lock irqsave */
1412 	spin_lock_irqsave(&thi->t_lock, flags);
1413 
1414 	switch (thi->t_state) {
1415 	case None:
1416 		dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1417 				me, current->comm, current->pid);
1418 
1419 		/* Get ref on module for thread - this is released when thread exits */
1420 		if (!try_module_get(THIS_MODULE)) {
1421 			dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1422 			spin_unlock_irqrestore(&thi->t_lock, flags);
1423 			return FALSE;
1424 		}
1425 
1426 		init_completion(&thi->stop);
1427 		D_ASSERT(thi->task == NULL);
1428 		thi->reset_cpu_mask = 1;
1429 		thi->t_state = Running;
1430 		spin_unlock_irqrestore(&thi->t_lock, flags);
1431 		flush_signals(current); /* otherwise we may get -ERESTARTNOINTR */
1432 
1433 		nt = kthread_create(drbd_thread_setup, (void *) thi,
1434 				    "drbd%d_%s", mdev_to_minor(mdev), me);
1435 
1436 		if (IS_ERR(nt)) {
1437 			dev_err(DEV, "Couldn't start thread\n");
1438 
1439 			module_put(THIS_MODULE);
1440 			return FALSE;
1441 		}
1442 		spin_lock_irqsave(&thi->t_lock, flags);
1443 		thi->task = nt;
1444 		thi->t_state = Running;
1445 		spin_unlock_irqrestore(&thi->t_lock, flags);
1446 		wake_up_process(nt);
1447 		break;
1448 	case Exiting:
1449 		thi->t_state = Restarting;
1450 		dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1451 				me, current->comm, current->pid);
1452 		/* fall through */
1453 	case Running:
1454 	case Restarting:
1455 	default:
1456 		spin_unlock_irqrestore(&thi->t_lock, flags);
1457 		break;
1458 	}
1459 
1460 	return TRUE;
1461 }
1462 
1463 
1464 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1465 {
1466 	unsigned long flags;
1467 
1468 	enum drbd_thread_state ns = restart ? Restarting : Exiting;
1469 
1470 	/* may be called from state engine, holding the req lock irqsave */
1471 	spin_lock_irqsave(&thi->t_lock, flags);
1472 
1473 	if (thi->t_state == None) {
1474 		spin_unlock_irqrestore(&thi->t_lock, flags);
1475 		if (restart)
1476 			drbd_thread_start(thi);
1477 		return;
1478 	}
1479 
1480 	if (thi->t_state != ns) {
1481 		if (thi->task == NULL) {
1482 			spin_unlock_irqrestore(&thi->t_lock, flags);
1483 			return;
1484 		}
1485 
1486 		thi->t_state = ns;
1487 		smp_mb();
1488 		init_completion(&thi->stop);
1489 		if (thi->task != current)
1490 			force_sig(DRBD_SIGKILL, thi->task);
1491 
1492 	}
1493 
1494 	spin_unlock_irqrestore(&thi->t_lock, flags);
1495 
1496 	if (wait)
1497 		wait_for_completion(&thi->stop);
1498 }
1499 
1500 #ifdef CONFIG_SMP
1501 /**
1502  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1503  * @mdev:	DRBD device.
1504  *
1505  * Forces all threads of a device onto the same CPU. This is beneficial for
1506  * DRBD's performance. May be overridden by the user's configuration.
1507  */
1508 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1509 {
1510 	int ord, cpu;
1511 
1512 	/* user override. */
1513 	if (cpumask_weight(mdev->cpu_mask))
1514 		return;
1515 
1516 	ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1517 	for_each_online_cpu(cpu) {
1518 		if (ord-- == 0) {
1519 			cpumask_set_cpu(cpu, mdev->cpu_mask);
1520 			return;
1521 		}
1522 	}
1523 	/* should not be reached */
1524 	cpumask_setall(mdev->cpu_mask);
1525 }
1526 
1527 /**
1528  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1529  * @mdev:	DRBD device.
1530  *
1531  * Call this in the "main loop" of _all_ threads; no mutex is needed because
1532  * current won't die prematurely.
1533  */
1534 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1535 {
1536 	struct task_struct *p = current;
1537 	struct drbd_thread *thi =
1538 		p == mdev->asender.task  ? &mdev->asender  :
1539 		p == mdev->receiver.task ? &mdev->receiver :
1540 		p == mdev->worker.task   ? &mdev->worker   :
1541 		NULL;
1542 	ERR_IF(thi == NULL)
1543 		return;
1544 	if (!thi->reset_cpu_mask)
1545 		return;
1546 	thi->reset_cpu_mask = 0;
1547 	set_cpus_allowed_ptr(p, mdev->cpu_mask);
1548 }
1549 #endif
1550 
1551 /* the appropriate socket mutex must be held already */
1552 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1553 			  enum drbd_packets cmd, struct p_header *h,
1554 			  size_t size, unsigned msg_flags)
1555 {
1556 	int sent, ok;
1557 
1558 	ERR_IF(!h) return FALSE;
1559 	ERR_IF(!size) return FALSE;
1560 
1561 	h->magic   = BE_DRBD_MAGIC;
1562 	h->command = cpu_to_be16(cmd);
1563 	h->length  = cpu_to_be16(size-sizeof(struct p_header));
1564 
1565 	sent = drbd_send(mdev, sock, h, size, msg_flags);
1566 
1567 	ok = (sent == size);
1568 	if (!ok)
1569 		dev_err(DEV, "short sent %s size=%d sent=%d\n",
1570 		    cmdname(cmd), (int)size, sent);
1571 	return ok;
1572 }
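/* For reference, the resulting on-the-wire layout of struct p_header, as a
 * sketch (field widths inferred from the conversions above):
 *
 *	offset	size	field
 *	0	4	magic   (BE_DRBD_MAGIC, already big-endian)
 *	4	2	command (big-endian)
 *	6	2	length  (big-endian, payload size excluding this header)
 *
 * so a command with an empty payload goes out as exactly 8 bytes. */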
1573 
1574 /* don't pass the socket. we may only look at it
1575  * when we hold the appropriate socket mutex.
1576  */
1577 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1578 		  enum drbd_packets cmd, struct p_header *h, size_t size)
1579 {
1580 	int ok = 0;
1581 	struct socket *sock;
1582 
1583 	if (use_data_socket) {
1584 		mutex_lock(&mdev->data.mutex);
1585 		sock = mdev->data.socket;
1586 	} else {
1587 		mutex_lock(&mdev->meta.mutex);
1588 		sock = mdev->meta.socket;
1589 	}
1590 
1591 	/* drbd_disconnect() could have called drbd_free_sock()
1592 	 * while we were waiting on the mutex... */
1593 	if (likely(sock != NULL))
1594 		ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1595 
1596 	if (use_data_socket)
1597 		mutex_unlock(&mdev->data.mutex);
1598 	else
1599 		mutex_unlock(&mdev->meta.mutex);
1600 	return ok;
1601 }
1602 
1603 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1604 		   size_t size)
1605 {
1606 	struct p_header h;
1607 	int ok;
1608 
1609 	h.magic   = BE_DRBD_MAGIC;
1610 	h.command = cpu_to_be16(cmd);
1611 	h.length  = cpu_to_be16(size);
1612 
1613 	if (!drbd_get_data_sock(mdev))
1614 		return 0;
1615 
1616 	ok = (sizeof(h) ==
1617 		drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1618 	ok = ok && (size ==
1619 		drbd_send(mdev, mdev->data.socket, data, size, 0));
1620 
1621 	drbd_put_data_sock(mdev);
1622 
1623 	return ok;
1624 }
1625 
1626 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1627 {
1628 	struct p_rs_param_89 *p;
1629 	struct socket *sock;
1630 	int size, rv;
1631 	const int apv = mdev->agreed_pro_version;
1632 
1633 	size = apv <= 87 ? sizeof(struct p_rs_param)
1634 		: apv == 88 ? sizeof(struct p_rs_param)
1635 			+ strlen(mdev->sync_conf.verify_alg) + 1
1636 		: /* 89 */    sizeof(struct p_rs_param_89);
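	/* e.g. with apv 88 the packet is the fixed-size p_rs_param followed
	 * by the NUL-terminated verify_alg name (illustrative reading of the
	 * sizes computed above) */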
1637 
1638 	/* used from admin command context and receiver/worker context.
1639 	 * to avoid kmalloc, grab the socket right here,
1640 	 * then use the pre-allocated sbuf there */
1641 	mutex_lock(&mdev->data.mutex);
1642 	sock = mdev->data.socket;
1643 
1644 	if (likely(sock != NULL)) {
1645 		enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1646 
1647 		p = &mdev->data.sbuf.rs_param_89;
1648 
1649 		/* initialize verify_alg and csums_alg */
1650 		memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1651 
1652 		p->rate = cpu_to_be32(sc->rate);
1653 
1654 		if (apv >= 88)
1655 			strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1656 		if (apv >= 89)
1657 			strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1658 
1659 		rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1660 	} else
1661 		rv = 0; /* not ok */
1662 
1663 	mutex_unlock(&mdev->data.mutex);
1664 
1665 	return rv;
1666 }
1667 
1668 int drbd_send_protocol(struct drbd_conf *mdev)
1669 {
1670 	struct p_protocol *p;
1671 	int size, cf, rv;
1672 
1673 	size = sizeof(struct p_protocol);
1674 
1675 	if (mdev->agreed_pro_version >= 87)
1676 		size += strlen(mdev->net_conf->integrity_alg) + 1;
1677 
1678 	/* we must not recurse into our own queue,
1679 	 * as that is blocked during handshake */
1680 	p = kmalloc(size, GFP_NOIO);
1681 	if (p == NULL)
1682 		return 0;
1683 
1684 	p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
1685 	p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
1686 	p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
1687 	p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
1688 	p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
1689 
1690 	cf = 0;
1691 	if (mdev->net_conf->want_lose)
1692 		cf |= CF_WANT_LOSE;
1693 	if (mdev->net_conf->dry_run) {
1694 		if (mdev->agreed_pro_version >= 92)
1695 			cf |= CF_DRY_RUN;
1696 		else {
1697 			dev_err(DEV, "--dry-run is not supported by peer");
1698 			kfree(p);
1699 			return 0;
1700 		}
1701 	}
1702 	p->conn_flags    = cpu_to_be32(cf);
1703 
1704 	if (mdev->agreed_pro_version >= 87)
1705 		strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
1706 
1707 	rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1708 			   (struct p_header *)p, size);
1709 	kfree(p);
1710 	return rv;
1711 }
1712 
1713 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1714 {
1715 	struct p_uuids p;
1716 	int i;
1717 
1718 	if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1719 		return 1;
1720 
1721 	for (i = UI_CURRENT; i < UI_SIZE; i++)
1722 		p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1723 
1724 	mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1725 	p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1726 	uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
1727 	uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1728 	uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1729 	p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1730 
1731 	put_ldev(mdev);
1732 
1733 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
1734 			     (struct p_header *)&p, sizeof(p));
1735 }
1736 
1737 int drbd_send_uuids(struct drbd_conf *mdev)
1738 {
1739 	return _drbd_send_uuids(mdev, 0);
1740 }
1741 
1742 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1743 {
1744 	return _drbd_send_uuids(mdev, 8);
1745 }
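
/* uuid_flags bit assignments, as set in _drbd_send_uuids() above:
 *   1 - net_conf->want_lose is set
 *   2 - the CRASHED_PRIMARY flag is set
 *   4 - new_state_tmp.disk == D_INCONSISTENT
 *   8 - skip the initial sync (passed in by the wrapper above)
 * The flags travel to the peer in p.uuid[UI_FLAGS], big endian. */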
1746 
1747 
1748 int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val)
1749 {
1750 	struct p_rs_uuid p;
1751 
1752 	p.uuid = cpu_to_be64(val);
1753 
1754 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
1755 			     (struct p_header *)&p, sizeof(p));
1756 }
1757 
1758 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply)
1759 {
1760 	struct p_sizes p;
1761 	sector_t d_size, u_size;
1762 	int q_order_type;
1763 	int ok;
1764 
1765 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1766 		D_ASSERT(mdev->ldev->backing_bdev);
1767 		d_size = drbd_get_max_capacity(mdev->ldev);
1768 		u_size = mdev->ldev->dc.disk_size;
1769 		q_order_type = drbd_queue_order_type(mdev);
1771 		put_ldev(mdev);
1772 	} else {
1773 		d_size = 0;
1774 		u_size = 0;
1775 		q_order_type = QUEUE_ORDERED_NONE;
1776 	}
1777 
1778 	p.d_size = cpu_to_be64(d_size);
1779 	p.u_size = cpu_to_be64(u_size);
1780 	p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1781 	p.max_segment_size = cpu_to_be32(queue_max_segment_size(mdev->rq_queue));
1782 	p.queue_order_type = cpu_to_be32(q_order_type);
1783 
1784 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
1785 			   (struct p_header *)&p, sizeof(p));
1786 	return ok;
1787 }
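
/* When trigger_reply is set, c_size is reported as 0 instead of our current
 * capacity; judging by the parameter name, this presumably prompts the peer
 * to answer with its own P_SIZES rather than apply ours. */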
1788 
1789 /**
1790  * drbd_send_state() - Sends the drbd state to the peer
1791  * @mdev:	DRBD device.
1792  */
1793 int drbd_send_state(struct drbd_conf *mdev)
1794 {
1795 	struct socket *sock;
1796 	struct p_state p;
1797 	int ok = 0;
1798 
1799 	/* Grab state lock so we won't send state if we're in the middle
1800 	 * of a cluster wide state change on another thread */
1801 	drbd_state_lock(mdev);
1802 
1803 	mutex_lock(&mdev->data.mutex);
1804 
1805 	p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1806 	sock = mdev->data.socket;
1807 
1808 	if (likely(sock != NULL)) {
1809 		ok = _drbd_send_cmd(mdev, sock, P_STATE,
1810 				    (struct p_header *)&p, sizeof(p), 0);
1811 	}
1812 
1813 	mutex_unlock(&mdev->data.mutex);
1814 
1815 	drbd_state_unlock(mdev);
1816 	return ok;
1817 }
1818 
1819 int drbd_send_state_req(struct drbd_conf *mdev,
1820 	union drbd_state mask, union drbd_state val)
1821 {
1822 	struct p_req_state p;
1823 
1824 	p.mask    = cpu_to_be32(mask.i);
1825 	p.val     = cpu_to_be32(val.i);
1826 
1827 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
1828 			     (struct p_header *)&p, sizeof(p));
1829 }
1830 
1831 int drbd_send_sr_reply(struct drbd_conf *mdev, int retcode)
1832 {
1833 	struct p_req_state_reply p;
1834 
1835 	p.retcode    = cpu_to_be32(retcode);
1836 
1837 	return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
1838 			     (struct p_header *)&p, sizeof(p));
1839 }
1840 
1841 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1842 	struct p_compressed_bm *p,
1843 	struct bm_xfer_ctx *c)
1844 {
1845 	struct bitstream bs;
1846 	unsigned long plain_bits;
1847 	unsigned long tmp;
1848 	unsigned long rl;
1849 	unsigned len;
1850 	unsigned toggle;
1851 	int bits;
1852 
1853 	/* may we use this feature? */
1854 	if ((mdev->sync_conf.use_rle == 0) ||
1855 		(mdev->agreed_pro_version < 90))
1856 			return 0;
1857 
1858 	if (c->bit_offset >= c->bm_bits)
1859 		return 0; /* nothing to do. */
1860 
1861 	/* use at most this many bytes */
1862 	bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1863 	memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1864 	/* plain bits covered in this code string */
1865 	plain_bits = 0;
1866 
1867 	/* p->encoding & 0x80 stores whether the first run length is set.
1868 	 * bit offset is implicit.
1869 	 * start with toggle == 2 to be able to tell the first iteration */
1870 	toggle = 2;
1871 
1872 	/* see how many plain bits we can stuff into one packet
1873 	 * using RLE and VLI. */
1874 	do {
1875 		tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1876 				    : _drbd_bm_find_next(mdev, c->bit_offset);
1877 		if (tmp == -1UL)
1878 			tmp = c->bm_bits;
1879 		rl = tmp - c->bit_offset;
1880 
1881 		if (toggle == 2) { /* first iteration */
1882 			if (rl == 0) {
1883 				/* the first checked bit was set,
1884 				 * store start value, */
1885 				DCBP_set_start(p, 1);
1886 				/* but skip encoding of zero run length */
1887 				toggle = !toggle;
1888 				continue;
1889 			}
1890 			DCBP_set_start(p, 0);
1891 		}
1892 
1893 		/* paranoia: catch zero runlength.
1894 		 * can only happen if bitmap is modified while we scan it. */
1895 		if (rl == 0) {
1896 			dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1897 			    "t:%u bo:%lu\n", toggle, c->bit_offset);
1898 			return -1;
1899 		}
1900 
1901 		bits = vli_encode_bits(&bs, rl);
1902 		if (bits == -ENOBUFS) /* buffer full */
1903 			break;
1904 		if (bits <= 0) {
1905 			dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1906 			return 0;
1907 		}
1908 
1909 		toggle = !toggle;
1910 		plain_bits += rl;
1911 		c->bit_offset = tmp;
1912 	} while (c->bit_offset < c->bm_bits);
1913 
1914 	len = bs.cur.b - p->code + !!bs.cur.bit;
1915 
1916 	if (plain_bits < (len << 3)) {
1917 		/* incompressible with this method.
1918 		 * we need to rewind both word and bit position. */
1919 		c->bit_offset -= plain_bits;
1920 		bm_xfer_ctx_bit_to_word_offset(c);
1921 		c->bit_offset = c->word_offset * BITS_PER_LONG;
1922 		return 0;
1923 	}
1924 
1925 	/* RLE + VLI was able to compress it just fine.
1926 	 * update c->word_offset. */
1927 	bm_xfer_ctx_bit_to_word_offset(c);
1928 
1929 	/* store pad_bits */
1930 	DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1931 
1932 	return len;
1933 }
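
/* Illustrative example (made up, not from a real transfer): for a bitmap
 * prefix of 5 clear bits, 8 set bits, 3 clear bits, the loop above emits
 * the run lengths 5, 8, 3 as VLI codes, with the start flag 0 because the
 * first examined bit was clear.  Had the bitmap started with a set bit,
 * the then-zero first run length would be skipped and the start flag set
 * to 1 instead. */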
1934 
1935 enum { OK, FAILED, DONE }
1936 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
1937 	struct p_header *h, struct bm_xfer_ctx *c)
1938 {
1939 	struct p_compressed_bm *p = (void*)h;
1940 	unsigned long num_words;
1941 	int len;
1942 	int ok;
1943 
1944 	len = fill_bitmap_rle_bits(mdev, p, c);
1945 
1946 	if (len < 0)
1947 		return FAILED;
1948 
1949 	if (len) {
1950 		DCBP_set_code(p, RLE_VLI_Bits);
1951 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
1952 			sizeof(*p) + len, 0);
1953 
1954 		c->packets[0]++;
1955 		c->bytes[0] += sizeof(*p) + len;
1956 
1957 		if (c->bit_offset >= c->bm_bits)
1958 			len = 0; /* DONE */
1959 	} else {
1960 		/* was not compressible.
1961 		 * send a buffer full of plain text bits instead. */
1962 		num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
1963 		len = num_words * sizeof(long);
1964 		if (len)
1965 			drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
1966 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
1967 				   h, sizeof(struct p_header) + len, 0);
1968 		c->word_offset += num_words;
1969 		c->bit_offset = c->word_offset * BITS_PER_LONG;
1970 
1971 		c->packets[1]++;
1972 		c->bytes[1] += sizeof(struct p_header) + len;
1973 
1974 		if (c->bit_offset > c->bm_bits)
1975 			c->bit_offset = c->bm_bits;
1976 	}
1977 	ok = ok ? ((len == 0) ? DONE : OK) : FAILED;
1978 
1979 	if (ok == DONE)
1980 		INFO_bm_xfer_stats(mdev, "send", c);
1981 	return ok;
1982 }
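
/* c->packets[0]/bytes[0] above count the compressed packets, [1] the plain
 * ones; INFO_bm_xfer_stats() uses these counters to report how well the
 * RLE+VLI encoding did for this transfer. */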
1983 
1984 /* See the comment at receive_bitmap() */
1985 int _drbd_send_bitmap(struct drbd_conf *mdev)
1986 {
1987 	struct bm_xfer_ctx c;
1988 	struct p_header *p;
1989 	int ret;
1990 
1991 	ERR_IF(!mdev->bitmap) return FALSE;
1992 
1993 	/* maybe we should use some per thread scratch page,
1994 	 * and allocate that during initial device creation? */
1995 	p = (struct p_header *) __get_free_page(GFP_NOIO);
1996 	if (!p) {
1997 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
1998 		return FALSE;
1999 	}
2000 
2001 	if (get_ldev(mdev)) {
2002 		if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2003 			dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2004 			drbd_bm_set_all(mdev);
2005 			if (drbd_bm_write(mdev)) {
2006 			/* write_bm failed! Leave the full sync flag set in the meta data,
2007 			 * but otherwise proceed as normal - we need to tell the other
2008 			 * side that a full resync is required! */
2009 				dev_err(DEV, "Failed to write bitmap to disk!\n");
2010 			} else {
2011 				drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2012 				drbd_md_sync(mdev);
2013 			}
2014 		}
2015 		put_ldev(mdev);
2016 	}
2017 
2018 	c = (struct bm_xfer_ctx) {
2019 		.bm_bits = drbd_bm_bits(mdev),
2020 		.bm_words = drbd_bm_words(mdev),
2021 	};
2022 
2023 	do {
2024 		ret = send_bitmap_rle_or_plain(mdev, p, &c);
2025 	} while (ret == OK);
2026 
2027 	free_page((unsigned long) p);
2028 	return (ret == DONE);
2029 }
2030 
2031 int drbd_send_bitmap(struct drbd_conf *mdev)
2032 {
2033 	int err;
2034 
2035 	if (!drbd_get_data_sock(mdev))
2036 		return -1;
2037 	err = !_drbd_send_bitmap(mdev);
2038 	drbd_put_data_sock(mdev);
2039 	return err;
2040 }
2041 
2042 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2043 {
2044 	int ok;
2045 	struct p_barrier_ack p;
2046 
2047 	p.barrier  = barrier_nr;
2048 	p.set_size = cpu_to_be32(set_size);
2049 
2050 	if (mdev->state.conn < C_CONNECTED)
2051 		return FALSE;
2052 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2053 			(struct p_header *)&p, sizeof(p));
2054 	return ok;
2055 }
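
/* Note that p.barrier is deliberately not byte swapped above: the barrier
 * number is (presumably) echoed back exactly as it arrived in the peer's
 * P_BARRIER packet, so it is already in the peer's byte order. */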
2056 
2057 /**
2058  * _drbd_send_ack() - Sends an ack packet
2059  * @mdev:	DRBD device.
2060  * @cmd:	Packet command code.
2061  * @sector:	sector, needs to be in big endian byte order
2062  * @blksize:	size in byte, needs to be in big endian byte order
2063  * @block_id:	Id, big endian byte order
2064  */
2065 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2066 			  u64 sector,
2067 			  u32 blksize,
2068 			  u64 block_id)
2069 {
2070 	int ok;
2071 	struct p_block_ack p;
2072 
2073 	p.sector   = sector;
2074 	p.block_id = block_id;
2075 	p.blksize  = blksize;
2076 	p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2077 
2078 	if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2079 		return FALSE;
2080 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2081 				(struct p_header *)&p, sizeof(p));
2082 	return ok;
2083 }
2084 
2085 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2086 		     struct p_data *dp)
2087 {
2088 	const int header_size = sizeof(struct p_data)
2089 			      - sizeof(struct p_header);
2090 	int data_size  = ((struct p_header *)dp)->length - header_size;
2091 
2092 	return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2093 			      dp->block_id);
2094 }
2095 
2096 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2097 		     struct p_block_req *rp)
2098 {
2099 	return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2100 }
2101 
2102 /**
2103  * drbd_send_ack() - Sends an ack packet
2104  * @mdev:	DRBD device.
2105  * @cmd:	Packet command code.
2106  * @e:		Epoch entry.
2107  */
2108 int drbd_send_ack(struct drbd_conf *mdev,
2109 	enum drbd_packets cmd, struct drbd_epoch_entry *e)
2110 {
2111 	return _drbd_send_ack(mdev, cmd,
2112 			      cpu_to_be64(e->sector),
2113 			      cpu_to_be32(e->size),
2114 			      e->block_id);
2115 }
2116 
2117 /* This function misuses the block_id field to signal if the blocks
2118  * are in sync or not. */
2119 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2120 		     sector_t sector, int blksize, u64 block_id)
2121 {
2122 	return _drbd_send_ack(mdev, cmd,
2123 			      cpu_to_be64(sector),
2124 			      cpu_to_be32(blksize),
2125 			      cpu_to_be64(block_id));
2126 }
2127 
2128 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2129 		       sector_t sector, int size, u64 block_id)
2130 {
2131 	int ok;
2132 	struct p_block_req p;
2133 
2134 	p.sector   = cpu_to_be64(sector);
2135 	p.block_id = block_id;
2136 	p.blksize  = cpu_to_be32(size);
2137 
2138 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2139 				(struct p_header *)&p, sizeof(p));
2140 	return ok;
2141 }
2142 
2143 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2144 			    sector_t sector, int size,
2145 			    void *digest, int digest_size,
2146 			    enum drbd_packets cmd)
2147 {
2148 	int ok;
2149 	struct p_block_req p;
2150 
2151 	p.sector   = cpu_to_be64(sector);
2152 	p.block_id = BE_DRBD_MAGIC + 0xbeef;
2153 	p.blksize  = cpu_to_be32(size);
2154 
2155 	p.head.magic   = BE_DRBD_MAGIC;
2156 	p.head.command = cpu_to_be16(cmd);
2157 	p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header) + digest_size);
2158 
2159 	mutex_lock(&mdev->data.mutex);
2160 
2161 	ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2162 	ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2163 
2164 	mutex_unlock(&mdev->data.mutex);
2165 
2166 	return ok;
2167 }
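
/* Both drbd_send() calls above happen under data.mutex, so the header
 * (whose length field already accounts for digest_size) and the digest
 * itself go out back to back as one logical packet, with no other sender
 * interleaving its data in between. */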
2168 
2169 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2170 {
2171 	int ok;
2172 	struct p_block_req p;
2173 
2174 	p.sector   = cpu_to_be64(sector);
2175 	p.block_id = BE_DRBD_MAGIC + 0xbabe;
2176 	p.blksize  = cpu_to_be32(size);
2177 
2178 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2179 			   (struct p_header *)&p, sizeof(p));
2180 	return ok;
2181 }
2182 
2183 /* called on sndtimeo
2184  * returns FALSE if we should retry,
2185  * TRUE if we think the connection is dead
2186  */
2187 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2188 {
2189 	int drop_it;
2190 	/* long elapsed = (long)(jiffies - mdev->last_received); */
2191 
2192 	drop_it =   mdev->meta.socket == sock
2193 		|| !mdev->asender.task
2194 		|| get_t_state(&mdev->asender) != Running
2195 		|| mdev->state.conn < C_CONNECTED;
2196 
2197 	if (drop_it)
2198 		return TRUE;
2199 
2200 	drop_it = !--mdev->ko_count;
2201 	if (!drop_it) {
2202 		dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2203 		       current->comm, current->pid, mdev->ko_count);
2204 		request_ping(mdev);
2205 	}
2206 
2207 	return drop_it; /* && (mdev->state == R_PRIMARY) */
2208 }
2209 
2210 /* The idea of sendpage seems to be to put some kind of reference
2211  * to the page into the skb, and to hand it over to the NIC. In
2212  * this process get_page() gets called.
2213  *
2214  * As soon as the page was really sent over the network put_page()
2215  * gets called by some part of the network layer. [ NIC driver? ]
2216  *
2217  * [ get_page() / put_page() increment/decrement the count. If count
2218  *   reaches 0 the page will be freed. ]
2219  *
2220  * This works nicely with pages from FSs.
2221  * But this means that in protocol A we might signal IO completion too early!
2222  *
2223  * In order not to corrupt data during a resync we must make sure
2224  * that we do not reuse our own buffer pages (EEs) too early; therefore
2225  * we have the net_ee list.
2226  *
2227  * XFS seems to have problems, still, it submits pages with page_count == 0!
2228  * As a workaround, we disable sendpage on pages
2229  * with page_count == 0 or PageSlab.
2230  */
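/* Hence the two variants below: _drbd_no_send_page() copies the data
 * through the regular send path (kmap + drbd_send), which is always safe;
 * _drbd_send_page() hands the page itself to the socket's ->sendpage()
 * for zero copy, falling back to the copying variant for slab pages and
 * pages with page_count == 0. */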
2231 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2232 		   int offset, size_t size)
2233 {
2234 	int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, 0);
2235 	kunmap(page);
2236 	if (sent == size)
2237 		mdev->send_cnt += size>>9;
2238 	return sent == size;
2239 }
2240 
2241 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2242 		    int offset, size_t size)
2243 {
2244 	mm_segment_t oldfs = get_fs();
2245 	int sent, ok;
2246 	int len = size;
2247 
2248 	/* e.g. XFS meta- & log-data is in slab pages, which have a
2249 	 * page_count of 0 and/or have PageSlab() set.
2250 	 * we cannot use send_page for those, as that does get_page();
2251 	 * put_page(); and would cause either a VM_BUG directly, or
2252 	 * __page_cache_release a page that would actually still be referenced
2253 	 * by someone, leading to some obscure delayed Oops somewhere else. */
2254 	if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2255 		return _drbd_no_send_page(mdev, page, offset, size);
2256 
2257 	drbd_update_congested(mdev);
2258 	set_fs(KERNEL_DS);
2259 	do {
2260 		sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2261 							offset, len,
2262 							MSG_NOSIGNAL);
2263 		if (sent == -EAGAIN) {
2264 			if (we_should_drop_the_connection(mdev,
2265 							  mdev->data.socket))
2266 				break;
2267 			else
2268 				continue;
2269 		}
2270 		if (sent <= 0) {
2271 			dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2272 			     __func__, (int)size, len, sent);
2273 			break;
2274 		}
2275 		len    -= sent;
2276 		offset += sent;
2277 	} while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2278 	set_fs(oldfs);
2279 	clear_bit(NET_CONGESTED, &mdev->flags);
2280 
2281 	ok = (len == 0);
2282 	if (likely(ok))
2283 		mdev->send_cnt += size>>9;
2284 	return ok;
2285 }
2286 
2287 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2288 {
2289 	struct bio_vec *bvec;
2290 	int i;
2291 	__bio_for_each_segment(bvec, bio, i, 0) {
2292 		if (!_drbd_no_send_page(mdev, bvec->bv_page,
2293 				     bvec->bv_offset, bvec->bv_len))
2294 			return 0;
2295 	}
2296 	return 1;
2297 }
2298 
2299 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2300 {
2301 	struct bio_vec *bvec;
2302 	int i;
2303 	__bio_for_each_segment(bvec, bio, i, 0) {
2304 		if (!_drbd_send_page(mdev, bvec->bv_page,
2305 				     bvec->bv_offset, bvec->bv_len))
2306 			return 0;
2307 	}
2308 
2309 	return 1;
2310 }
2311 
2312 /* Used to send write requests
2313  * R_PRIMARY -> Peer	(P_DATA)
2314  */
2315 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2316 {
2317 	int ok = 1;
2318 	struct p_data p;
2319 	unsigned int dp_flags = 0;
2320 	void *dgb;
2321 	int dgs;
2322 
2323 	if (!drbd_get_data_sock(mdev))
2324 		return 0;
2325 
2326 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2327 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2328 
2329 	p.head.magic   = BE_DRBD_MAGIC;
2330 	p.head.command = cpu_to_be16(P_DATA);
2331 	p.head.length  =
2332 		cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + req->size);
2333 
2334 	p.sector   = cpu_to_be64(req->sector);
2335 	p.block_id = (unsigned long)req;
2336 	p.seq_num  = cpu_to_be32(req->seq_num =
2337 				 atomic_add_return(1, &mdev->packet_seq));
2338 	dp_flags = 0;
2339 
2340 	/* NOTE: no need to check if barriers are supported here, as we would
2341 	 *       not pass the test in make_request_common in that case
2342 	 */
2343 	if (bio_rw_flagged(req->master_bio, BIO_RW_BARRIER)) {
2344 		dev_err(DEV, "ASSERT FAILED would have set DP_HARDBARRIER\n");
2345 		/* dp_flags |= DP_HARDBARRIER; */
2346 	}
2347 	if (bio_rw_flagged(req->master_bio, BIO_RW_SYNCIO))
2348 		dp_flags |= DP_RW_SYNC;
2349 	/* for now handle SYNCIO and UNPLUG
2350 	 * as if they still were one and the same flag */
2351 	if (bio_rw_flagged(req->master_bio, BIO_RW_UNPLUG))
2352 		dp_flags |= DP_RW_SYNC;
2353 	if (mdev->state.conn >= C_SYNC_SOURCE &&
2354 	    mdev->state.conn <= C_PAUSED_SYNC_T)
2355 		dp_flags |= DP_MAY_SET_IN_SYNC;
2356 
2357 	p.dp_flags = cpu_to_be32(dp_flags);
2358 	set_bit(UNPLUG_REMOTE, &mdev->flags);
2359 	ok = (sizeof(p) ==
2360 		drbd_send(mdev, mdev->data.socket, &p, sizeof(p), MSG_MORE));
2361 	if (ok && dgs) {
2362 		dgb = mdev->int_dig_out;
2363 		drbd_csum(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2364 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
2365 	}
2366 	if (ok) {
2367 		if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
2368 			ok = _drbd_send_bio(mdev, req->master_bio);
2369 		else
2370 			ok = _drbd_send_zc_bio(mdev, req->master_bio);
2371 	}
2372 
2373 	drbd_put_data_sock(mdev);
2374 	return ok;
2375 }
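
/* Note the asymmetry above: protocol A uses the copying _drbd_send_bio(),
 * presumably because the master bio may complete before the page actually
 * hits the wire (see the sendpage comment further up), while protocols B
 * and C, which wait for the peer's ack, can use the zero copy
 * _drbd_send_zc_bio(). */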
2376 
2377 /* answer packet, used to send data back for read requests:
2378  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2379  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2380  */
2381 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2382 		    struct drbd_epoch_entry *e)
2383 {
2384 	int ok;
2385 	struct p_data p;
2386 	void *dgb;
2387 	int dgs;
2388 
2389 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2390 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2391 
2392 	p.head.magic   = BE_DRBD_MAGIC;
2393 	p.head.command = cpu_to_be16(cmd);
2394 	p.head.length  =
2395 		cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + e->size);
2396 
2397 	p.sector   = cpu_to_be64(e->sector);
2398 	p.block_id = e->block_id;
2399 	/* p.seq_num  = 0;    No sequence numbers here.. */
2400 
2401 	/* Only called by our kernel thread.
2402 	 * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2403 	 * in response to an admin command or module unload.
2404 	 */
2405 	if (!drbd_get_data_sock(mdev))
2406 		return 0;
2407 
2408 	ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p,
2409 					sizeof(p), MSG_MORE);
2410 	if (ok && dgs) {
2411 		dgb = mdev->int_dig_out;
2412 		drbd_csum(mdev, mdev->integrity_w_tfm, e->private_bio, dgb);
2413 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
2414 	}
2415 	if (ok)
2416 		ok = _drbd_send_zc_bio(mdev, e->private_bio);
2417 
2418 	drbd_put_data_sock(mdev);
2419 	return ok;
2420 }
2421 
2422 /*
2423   drbd_send distinguishes two cases:
2424 
2425   Packets sent via the data socket "sock"
2426   and packets sent via the meta data socket "msock"
2427 
2428 		    sock                      msock
2429   -----------------+-------------------------+------------------------------
2430   timeout           conf.timeout / 2          conf.timeout / 2
2431   timeout action    send a ping via msock     Abort communication
2432 					      and close all sockets
2433 */
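
/* Within this file, bulk traffic (P_DATA, P_BITMAP, P_SIZES, P_UUIDS,
 * P_STATE_CHG_REQ, ...) goes out via USE_DATA_SOCKET, while acks and
 * replies (block acks, P_BARRIER_ACK, P_STATE_CHG_REPLY) take
 * USE_META_SOCKET -- presumably so that acknowledgements never queue up
 * behind large data transfers. */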
2434 
2435 /*
2436  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2437  */
2438 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2439 	      void *buf, size_t size, unsigned msg_flags)
2440 {
2441 	struct kvec iov;
2442 	struct msghdr msg;
2443 	int rv, sent = 0;
2444 
2445 	if (!sock)
2446 		return -1000;
2447 
2448 	/* THINK  if (signal_pending) return ... ? */
2449 
2450 	iov.iov_base = buf;
2451 	iov.iov_len  = size;
2452 
2453 	msg.msg_name       = NULL;
2454 	msg.msg_namelen    = 0;
2455 	msg.msg_control    = NULL;
2456 	msg.msg_controllen = 0;
2457 	msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2458 
2459 	if (sock == mdev->data.socket) {
2460 		mdev->ko_count = mdev->net_conf->ko_count;
2461 		drbd_update_congested(mdev);
2462 	}
2463 	do {
2464 		/* STRANGE
2465 		 * tcp_sendmsg does _not_ use its size parameter at all ?
2466 		 *
2467 		 * -EAGAIN on timeout, -EINTR on signal.
2468 		 */
2469 /* THINK
2470  * do we need to block DRBD_SIG if sock == &meta.socket ??
2471  * otherwise wake_asender() might interrupt some send_*Ack !
2472  */
2473 		rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2474 		if (rv == -EAGAIN) {
2475 			if (we_should_drop_the_connection(mdev, sock))
2476 				break;
2477 			else
2478 				continue;
2479 		}
2480 		D_ASSERT(rv != 0);
2481 		if (rv == -EINTR) {
2482 			flush_signals(current);
2483 			rv = 0;
2484 		}
2485 		if (rv < 0)
2486 			break;
2487 		sent += rv;
2488 		iov.iov_base += rv;
2489 		iov.iov_len  -= rv;
2490 	} while (sent < size);
2491 
2492 	if (sock == mdev->data.socket)
2493 		clear_bit(NET_CONGESTED, &mdev->flags);
2494 
2495 	if (rv <= 0) {
2496 		if (rv != -EAGAIN) {
2497 			dev_err(DEV, "%s_sendmsg returned %d\n",
2498 			    sock == mdev->meta.socket ? "msock" : "sock",
2499 			    rv);
2500 			drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2501 		} else
2502 			drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2503 	}
2504 
2505 	return sent;
2506 }
2507 
2508 static int drbd_open(struct block_device *bdev, fmode_t mode)
2509 {
2510 	struct drbd_conf *mdev = bdev->bd_disk->private_data;
2511 	unsigned long flags;
2512 	int rv = 0;
2513 
2514 	spin_lock_irqsave(&mdev->req_lock, flags);
2515 	/* to have a stable mdev->state.role
2516 	 * and no race with updating open_cnt */
2517 
2518 	if (mdev->state.role != R_PRIMARY) {
2519 		if (mode & FMODE_WRITE)
2520 			rv = -EROFS;
2521 		else if (!allow_oos)
2522 			rv = -EMEDIUMTYPE;
2523 	}
2524 
2525 	if (!rv)
2526 		mdev->open_cnt++;
2527 	spin_unlock_irqrestore(&mdev->req_lock, flags);
2528 
2529 	return rv;
2530 }
2531 
2532 static int drbd_release(struct gendisk *gd, fmode_t mode)
2533 {
2534 	struct drbd_conf *mdev = gd->private_data;
2535 	mdev->open_cnt--;
2536 	return 0;
2537 }
2538 
2539 static void drbd_unplug_fn(struct request_queue *q)
2540 {
2541 	struct drbd_conf *mdev = q->queuedata;
2542 
2543 	/* unplug FIRST */
2544 	spin_lock_irq(q->queue_lock);
2545 	blk_remove_plug(q);
2546 	spin_unlock_irq(q->queue_lock);
2547 
2548 	/* only if connected */
2549 	spin_lock_irq(&mdev->req_lock);
2550 	if (mdev->state.pdsk >= D_INCONSISTENT && mdev->state.conn >= C_CONNECTED) {
2551 		D_ASSERT(mdev->state.role == R_PRIMARY);
2552 		if (test_and_clear_bit(UNPLUG_REMOTE, &mdev->flags)) {
2553 			/* add to the data.work queue,
2554 			 * unless already queued.
2555 			 * XXX this might be a good addition to drbd_queue_work
2556 			 * anyways, to detect "double queuing" ... */
2557 			if (list_empty(&mdev->unplug_work.list))
2558 				drbd_queue_work(&mdev->data.work,
2559 						&mdev->unplug_work);
2560 		}
2561 	}
2562 	spin_unlock_irq(&mdev->req_lock);
2563 
2564 	if (mdev->state.disk >= D_INCONSISTENT)
2565 		drbd_kick_lo(mdev);
2566 }
2567 
2568 static void drbd_set_defaults(struct drbd_conf *mdev)
2569 {
2570 	mdev->sync_conf.after      = DRBD_AFTER_DEF;
2571 	mdev->sync_conf.rate       = DRBD_RATE_DEF;
2572 	mdev->sync_conf.al_extents = DRBD_AL_EXTENTS_DEF;
2573 	mdev->state = (union drbd_state) {
2574 		{ .role = R_SECONDARY,
2575 		  .peer = R_UNKNOWN,
2576 		  .conn = C_STANDALONE,
2577 		  .disk = D_DISKLESS,
2578 		  .pdsk = D_UNKNOWN,
2579 		  .susp = 0
2580 		} };
2581 }
2582 
2583 void drbd_init_set_defaults(struct drbd_conf *mdev)
2584 {
2585 	/* the memset(,0,) did most of this.
2586 	 * note: only assignments, no allocation in here */
2587 
2588 	drbd_set_defaults(mdev);
2589 
2590 	/* for now, we do NOT yet support it,
2591 	 * even though we start some framework
2592 	 * to eventually support barriers */
2593 	set_bit(NO_BARRIER_SUPP, &mdev->flags);
2594 
2595 	atomic_set(&mdev->ap_bio_cnt, 0);
2596 	atomic_set(&mdev->ap_pending_cnt, 0);
2597 	atomic_set(&mdev->rs_pending_cnt, 0);
2598 	atomic_set(&mdev->unacked_cnt, 0);
2599 	atomic_set(&mdev->local_cnt, 0);
2600 	atomic_set(&mdev->net_cnt, 0);
2601 	atomic_set(&mdev->packet_seq, 0);
2602 	atomic_set(&mdev->pp_in_use, 0);
2603 
2604 	mutex_init(&mdev->md_io_mutex);
2605 	mutex_init(&mdev->data.mutex);
2606 	mutex_init(&mdev->meta.mutex);
2607 	sema_init(&mdev->data.work.s, 0);
2608 	sema_init(&mdev->meta.work.s, 0);
2609 	mutex_init(&mdev->state_mutex);
2610 
2611 	spin_lock_init(&mdev->data.work.q_lock);
2612 	spin_lock_init(&mdev->meta.work.q_lock);
2613 
2614 	spin_lock_init(&mdev->al_lock);
2615 	spin_lock_init(&mdev->req_lock);
2616 	spin_lock_init(&mdev->peer_seq_lock);
2617 	spin_lock_init(&mdev->epoch_lock);
2618 
2619 	INIT_LIST_HEAD(&mdev->active_ee);
2620 	INIT_LIST_HEAD(&mdev->sync_ee);
2621 	INIT_LIST_HEAD(&mdev->done_ee);
2622 	INIT_LIST_HEAD(&mdev->read_ee);
2623 	INIT_LIST_HEAD(&mdev->net_ee);
2624 	INIT_LIST_HEAD(&mdev->resync_reads);
2625 	INIT_LIST_HEAD(&mdev->data.work.q);
2626 	INIT_LIST_HEAD(&mdev->meta.work.q);
2627 	INIT_LIST_HEAD(&mdev->resync_work.list);
2628 	INIT_LIST_HEAD(&mdev->unplug_work.list);
2629 	INIT_LIST_HEAD(&mdev->md_sync_work.list);
2630 	INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2631 	mdev->resync_work.cb  = w_resync_inactive;
2632 	mdev->unplug_work.cb  = w_send_write_hint;
2633 	mdev->md_sync_work.cb = w_md_sync;
2634 	mdev->bm_io_work.w.cb = w_bitmap_io;
2635 	init_timer(&mdev->resync_timer);
2636 	init_timer(&mdev->md_sync_timer);
2637 	mdev->resync_timer.function = resync_timer_fn;
2638 	mdev->resync_timer.data = (unsigned long) mdev;
2639 	mdev->md_sync_timer.function = md_sync_timer_fn;
2640 	mdev->md_sync_timer.data = (unsigned long) mdev;
2641 
2642 	init_waitqueue_head(&mdev->misc_wait);
2643 	init_waitqueue_head(&mdev->state_wait);
2644 	init_waitqueue_head(&mdev->ee_wait);
2645 	init_waitqueue_head(&mdev->al_wait);
2646 	init_waitqueue_head(&mdev->seq_wait);
2647 
2648 	drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
2649 	drbd_thread_init(mdev, &mdev->worker, drbd_worker);
2650 	drbd_thread_init(mdev, &mdev->asender, drbd_asender);
2651 
2652 	mdev->agreed_pro_version = PRO_VERSION_MAX;
2653 	mdev->write_ordering = WO_bio_barrier;
2654 	mdev->resync_wenr = LC_FREE;
2655 }
2656 
2657 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2658 {
2659 	if (mdev->receiver.t_state != None)
2660 		dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2661 				mdev->receiver.t_state);
2662 
2663 	/* no need to lock it, I'm the only thread alive */
2664 	if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2665 		dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2666 	mdev->al_writ_cnt  =
2667 	mdev->bm_writ_cnt  =
2668 	mdev->read_cnt     =
2669 	mdev->recv_cnt     =
2670 	mdev->send_cnt     =
2671 	mdev->writ_cnt     =
2672 	mdev->p_size       =
2673 	mdev->rs_start     =
2674 	mdev->rs_total     =
2675 	mdev->rs_failed    =
2676 	mdev->rs_mark_left =
2677 	mdev->rs_mark_time = 0;
2678 	D_ASSERT(mdev->net_conf == NULL);
2679 
2680 	drbd_set_my_capacity(mdev, 0);
2681 	if (mdev->bitmap) {
2682 		/* maybe never allocated. */
2683 		drbd_bm_resize(mdev, 0);
2684 		drbd_bm_cleanup(mdev);
2685 	}
2686 
2687 	drbd_free_resources(mdev);
2688 
2689 	/*
2690 	 * currently we call drbd_init_ee only on module load, so
2691 	 * we may call drbd_release_ee only on module unload!
2692 	 */
2693 	D_ASSERT(list_empty(&mdev->active_ee));
2694 	D_ASSERT(list_empty(&mdev->sync_ee));
2695 	D_ASSERT(list_empty(&mdev->done_ee));
2696 	D_ASSERT(list_empty(&mdev->read_ee));
2697 	D_ASSERT(list_empty(&mdev->net_ee));
2698 	D_ASSERT(list_empty(&mdev->resync_reads));
2699 	D_ASSERT(list_empty(&mdev->data.work.q));
2700 	D_ASSERT(list_empty(&mdev->meta.work.q));
2701 	D_ASSERT(list_empty(&mdev->resync_work.list));
2702 	D_ASSERT(list_empty(&mdev->unplug_work.list));
2703 
2704 }
2705 
2706 
2707 static void drbd_destroy_mempools(void)
2708 {
2709 	struct page *page;
2710 
2711 	while (drbd_pp_pool) {
2712 		page = drbd_pp_pool;
2713 		drbd_pp_pool = (struct page *)page_private(page);
2714 		__free_page(page);
2715 		drbd_pp_vacant--;
2716 	}
2717 
2718 	/* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2719 
2720 	if (drbd_ee_mempool)
2721 		mempool_destroy(drbd_ee_mempool);
2722 	if (drbd_request_mempool)
2723 		mempool_destroy(drbd_request_mempool);
2724 	if (drbd_ee_cache)
2725 		kmem_cache_destroy(drbd_ee_cache);
2726 	if (drbd_request_cache)
2727 		kmem_cache_destroy(drbd_request_cache);
2728 	if (drbd_bm_ext_cache)
2729 		kmem_cache_destroy(drbd_bm_ext_cache);
2730 	if (drbd_al_ext_cache)
2731 		kmem_cache_destroy(drbd_al_ext_cache);
2732 
2733 	drbd_ee_mempool      = NULL;
2734 	drbd_request_mempool = NULL;
2735 	drbd_ee_cache        = NULL;
2736 	drbd_request_cache   = NULL;
2737 	drbd_bm_ext_cache    = NULL;
2738 	drbd_al_ext_cache    = NULL;
2739 
2740 	return;
2741 }
2742 
2743 static int drbd_create_mempools(void)
2744 {
2745 	struct page *page;
2746 	const int number = (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count;
2747 	int i;
2748 
2749 	/* prepare our caches and mempools */
2750 	drbd_request_mempool = NULL;
2751 	drbd_ee_cache        = NULL;
2752 	drbd_request_cache   = NULL;
2753 	drbd_bm_ext_cache    = NULL;
2754 	drbd_al_ext_cache    = NULL;
2755 	drbd_pp_pool         = NULL;
2756 
2757 	/* caches */
2758 	drbd_request_cache = kmem_cache_create(
2759 		"drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2760 	if (drbd_request_cache == NULL)
2761 		goto Enomem;
2762 
2763 	drbd_ee_cache = kmem_cache_create(
2764 		"drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
2765 	if (drbd_ee_cache == NULL)
2766 		goto Enomem;
2767 
2768 	drbd_bm_ext_cache = kmem_cache_create(
2769 		"drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2770 	if (drbd_bm_ext_cache == NULL)
2771 		goto Enomem;
2772 
2773 	drbd_al_ext_cache = kmem_cache_create(
2774 		"drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2775 	if (drbd_al_ext_cache == NULL)
2776 		goto Enomem;
2777 
2778 	/* mempools */
2779 	drbd_request_mempool = mempool_create(number,
2780 		mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2781 	if (drbd_request_mempool == NULL)
2782 		goto Enomem;
2783 
2784 	drbd_ee_mempool = mempool_create(number,
2785 		mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2786 	if (drbd_ee_mempool == NULL)
2787 		goto Enomem;
2788 
2789 	/* drbd's page pool */
2790 	spin_lock_init(&drbd_pp_lock);
2791 
2792 	for (i = 0; i < number; i++) {
2793 		page = alloc_page(GFP_HIGHUSER);
2794 		if (!page)
2795 			goto Enomem;
2796 		set_page_private(page, (unsigned long)drbd_pp_pool);
2797 		drbd_pp_pool = page;
2798 	}
2799 	drbd_pp_vacant = number;
2800 
2801 	return 0;
2802 
2803 Enomem:
2804 	drbd_destroy_mempools(); /* in case we allocated some */
2805 	return -ENOMEM;
2806 }
2807 
2808 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2809 	void *unused)
2810 {
2811 	/* just so we have it.  you never know what interesting things we
2812 	 * might want to do here some day...
2813 	 */
2814 
2815 	return NOTIFY_DONE;
2816 }
2817 
2818 static struct notifier_block drbd_notifier = {
2819 	.notifier_call = drbd_notify_sys,
2820 };
2821 
2822 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2823 {
2824 	int rr;
2825 
2826 	rr = drbd_release_ee(mdev, &mdev->active_ee);
2827 	if (rr)
2828 		dev_err(DEV, "%d EEs in active list found!\n", rr);
2829 
2830 	rr = drbd_release_ee(mdev, &mdev->sync_ee);
2831 	if (rr)
2832 		dev_err(DEV, "%d EEs in sync list found!\n", rr);
2833 
2834 	rr = drbd_release_ee(mdev, &mdev->read_ee);
2835 	if (rr)
2836 		dev_err(DEV, "%d EEs in read list found!\n", rr);
2837 
2838 	rr = drbd_release_ee(mdev, &mdev->done_ee);
2839 	if (rr)
2840 		dev_err(DEV, "%d EEs in done list found!\n", rr);
2841 
2842 	rr = drbd_release_ee(mdev, &mdev->net_ee);
2843 	if (rr)
2844 		dev_err(DEV, "%d EEs in net list found!\n", rr);
2845 }
2846 
2847 /* caution. no locking.
2848  * currently only used from module cleanup code. */
2849 static void drbd_delete_device(unsigned int minor)
2850 {
2851 	struct drbd_conf *mdev = minor_to_mdev(minor);
2852 
2853 	if (!mdev)
2854 		return;
2855 
2856 	/* paranoia asserts */
2857 	if (mdev->open_cnt != 0)
2858 		dev_err(DEV, "open_cnt = %d in %s:%u\n", mdev->open_cnt,
2859 				__FILE__ , __LINE__);
2860 
2861 	ERR_IF (!list_empty(&mdev->data.work.q)) {
2862 		struct list_head *lp;
2863 		list_for_each(lp, &mdev->data.work.q) {
2864 			dev_err(DEV, "lp = %p\n", lp);
2865 		}
2866 	};
2867 	/* end paranoia asserts */
2868 
2869 	del_gendisk(mdev->vdisk);
2870 
2871 	/* cleanup stuff that may have been allocated during
2872 	 * device (re-)configuration or state changes */
2873 
2874 	if (mdev->this_bdev)
2875 		bdput(mdev->this_bdev);
2876 
2877 	drbd_free_resources(mdev);
2878 
2879 	drbd_release_ee_lists(mdev);
2880 
2881 	/* should be free'd on disconnect? */
2882 	kfree(mdev->ee_hash);
2883 	/*
2884 	mdev->ee_hash_s = 0;
2885 	mdev->ee_hash = NULL;
2886 	*/
2887 
2888 	lc_destroy(mdev->act_log);
2889 	lc_destroy(mdev->resync);
2890 
2891 	kfree(mdev->p_uuid);
2892 	/* mdev->p_uuid = NULL; */
2893 
2894 	kfree(mdev->int_dig_out);
2895 	kfree(mdev->int_dig_in);
2896 	kfree(mdev->int_dig_vv);
2897 
2898 	/* cleanup the rest that has been
2899 	 * allocated from drbd_new_device
2900 	 * and actually free the mdev itself */
2901 	drbd_free_mdev(mdev);
2902 }
2903 
2904 static void drbd_cleanup(void)
2905 {
2906 	unsigned int i;
2907 
2908 	unregister_reboot_notifier(&drbd_notifier);
2909 
2910 	drbd_nl_cleanup();
2911 
2912 	if (minor_table) {
2913 		if (drbd_proc)
2914 			remove_proc_entry("drbd", NULL);
2915 		i = minor_count;
2916 		while (i--)
2917 			drbd_delete_device(i);
2918 		drbd_destroy_mempools();
2919 	}
2920 
2921 	kfree(minor_table);
2922 
2923 	unregister_blkdev(DRBD_MAJOR, "drbd");
2924 
2925 	printk(KERN_INFO "drbd: module cleanup done.\n");
2926 }
2927 
2928 /**
2929  * drbd_congested() - Callback for pdflush
2930  * @congested_data:	User data
2931  * @bdi_bits:		Bits pdflush is currently interested in
2932  *
2933  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
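 *
 * As a side effect, a one-character reason code is recorded in
 * mdev->congestion_reason: '-' not congested, 'd' IO frozen by DRBD,
 * 'b' backing device congested, 'n' network congested, 'a' both
 * backing device and network.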
2934  */
2935 static int drbd_congested(void *congested_data, int bdi_bits)
2936 {
2937 	struct drbd_conf *mdev = congested_data;
2938 	struct request_queue *q;
2939 	char reason = '-';
2940 	int r = 0;
2941 
2942 	if (!__inc_ap_bio_cond(mdev)) {
2943 		/* DRBD has frozen IO */
2944 		r = bdi_bits;
2945 		reason = 'd';
2946 		goto out;
2947 	}
2948 
2949 	if (get_ldev(mdev)) {
2950 		q = bdev_get_queue(mdev->ldev->backing_bdev);
2951 		r = bdi_congested(&q->backing_dev_info, bdi_bits);
2952 		put_ldev(mdev);
2953 		if (r)
2954 			reason = 'b';
2955 	}
2956 
2957 	if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
2958 		r |= (1 << BDI_async_congested);
2959 		reason = reason == 'b' ? 'a' : 'n';
2960 	}
2961 
2962 out:
2963 	mdev->congestion_reason = reason;
2964 	return r;
2965 }
2966 
2967 struct drbd_conf *drbd_new_device(unsigned int minor)
2968 {
2969 	struct drbd_conf *mdev;
2970 	struct gendisk *disk;
2971 	struct request_queue *q;
2972 
2973 	/* GFP_KERNEL, we are outside of all write-out paths */
2974 	mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2975 	if (!mdev)
2976 		return NULL;
2977 	if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
2978 		goto out_no_cpumask;
2979 
2980 	mdev->minor = minor;
2981 
2982 	drbd_init_set_defaults(mdev);
2983 
2984 	q = blk_alloc_queue(GFP_KERNEL);
2985 	if (!q)
2986 		goto out_no_q;
2987 	mdev->rq_queue = q;
2988 	q->queuedata   = mdev;
2989 
2990 	disk = alloc_disk(1);
2991 	if (!disk)
2992 		goto out_no_disk;
2993 	mdev->vdisk = disk;
2994 
2995 	set_disk_ro(disk, TRUE);
2996 
2997 	disk->queue = q;
2998 	disk->major = DRBD_MAJOR;
2999 	disk->first_minor = minor;
3000 	disk->fops = &drbd_ops;
3001 	sprintf(disk->disk_name, "drbd%d", minor);
3002 	disk->private_data = mdev;
3003 
3004 	mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3005 	/* we have no partitions. we contain only ourselves. */
3006 	mdev->this_bdev->bd_contains = mdev->this_bdev;
3007 
3008 	q->backing_dev_info.congested_fn = drbd_congested;
3009 	q->backing_dev_info.congested_data = mdev;
3010 
3011 	blk_queue_make_request(q, drbd_make_request_26);
3012 	blk_queue_max_segment_size(q, DRBD_MAX_SEGMENT_SIZE);
3013 	blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3014 	blk_queue_merge_bvec(q, drbd_merge_bvec);
3015 	q->queue_lock = &mdev->req_lock; /* needed since we use */
3016 		/* plugging on a queue that actually has no requests! */
3017 	q->unplug_fn = drbd_unplug_fn;
3018 
3019 	mdev->md_io_page = alloc_page(GFP_KERNEL);
3020 	if (!mdev->md_io_page)
3021 		goto out_no_io_page;
3022 
3023 	if (drbd_bm_init(mdev))
3024 		goto out_no_bitmap;
3025 	/* no need to lock access, we are still initializing this minor device. */
3026 	if (!tl_init(mdev))
3027 		goto out_no_tl;
3028 
3029 	mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3030 	if (!mdev->app_reads_hash)
3031 		goto out_no_app_reads;
3032 
3033 	mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3034 	if (!mdev->current_epoch)
3035 		goto out_no_epoch;
3036 
3037 	INIT_LIST_HEAD(&mdev->current_epoch->list);
3038 	mdev->epochs = 1;
3039 
3040 	return mdev;
3041 
3042 /* out_whatever_else:
3043 	kfree(mdev->current_epoch); */
3044 out_no_epoch:
3045 	kfree(mdev->app_reads_hash);
3046 out_no_app_reads:
3047 	tl_cleanup(mdev);
3048 out_no_tl:
3049 	drbd_bm_cleanup(mdev);
3050 out_no_bitmap:
3051 	__free_page(mdev->md_io_page);
3052 out_no_io_page:
3053 	put_disk(disk);
3054 out_no_disk:
3055 	blk_cleanup_queue(q);
3056 out_no_q:
3057 	free_cpumask_var(mdev->cpu_mask);
3058 out_no_cpumask:
3059 	kfree(mdev);
3060 	return NULL;
3061 }
3062 
3063 /* counterpart of drbd_new_device.
3064  * last part of drbd_delete_device. */
3065 void drbd_free_mdev(struct drbd_conf *mdev)
3066 {
3067 	kfree(mdev->current_epoch);
3068 	kfree(mdev->app_reads_hash);
3069 	tl_cleanup(mdev);
3070 	if (mdev->bitmap) /* should no longer be there. */
3071 		drbd_bm_cleanup(mdev);
3072 	__free_page(mdev->md_io_page);
3073 	put_disk(mdev->vdisk);
3074 	blk_cleanup_queue(mdev->rq_queue);
3075 	free_cpumask_var(mdev->cpu_mask);
3076 	kfree(mdev);
3077 }
3078 
3079 
3080 int __init drbd_init(void)
3081 {
3082 	int err;
3083 
3084 	if (sizeof(struct p_handshake) != 80) {
3085 		printk(KERN_ERR
3086 		       "drbd: never change the size or layout "
3087 		       "of the HandShake packet.\n");
3088 		return -EINVAL;
3089 	}
3090 
3091 	if (1 > minor_count || minor_count > 255) {
3092 		printk(KERN_ERR
3093 			"drbd: invalid minor_count (%d)\n", minor_count);
3094 #ifdef MODULE
3095 		return -EINVAL;
3096 #else
3097 		minor_count = 8;
3098 #endif
3099 	}
3100 
3101 	err = drbd_nl_init();
3102 	if (err)
3103 		return err;
3104 
3105 	err = register_blkdev(DRBD_MAJOR, "drbd");
3106 	if (err) {
3107 		printk(KERN_ERR
3108 		       "drbd: unable to register block device major %d\n",
3109 		       DRBD_MAJOR);
3110 		return err;
3111 	}
3112 
3113 	register_reboot_notifier(&drbd_notifier);
3114 
3115 	/*
3116 	 * allocate all necessary structs
3117 	 */
3118 	err = -ENOMEM;
3119 
3120 	init_waitqueue_head(&drbd_pp_wait);
3121 
3122 	drbd_proc = NULL; /* play safe for drbd_cleanup */
3123 	minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3124 				GFP_KERNEL);
3125 	if (!minor_table)
3126 		goto Enomem;
3127 
3128 	err = drbd_create_mempools();
3129 	if (err)
3130 		goto Enomem;
3131 
3132 	drbd_proc = proc_create("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops);
3133 	if (!drbd_proc)	{
3134 		printk(KERN_ERR "drbd: unable to register proc file\n");
3135 		goto Enomem;
3136 	}
3137 
3138 	rwlock_init(&global_state_lock);
3139 
3140 	printk(KERN_INFO "drbd: initialized. "
3141 	       "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3142 	       API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3143 	printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3144 	printk(KERN_INFO "drbd: registered as block device major %d\n",
3145 		DRBD_MAJOR);
3146 	printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3147 
3148 	return 0; /* Success! */
3149 
3150 Enomem:
3151 	drbd_cleanup();
3152 	if (err == -ENOMEM)
3153 		/* currently always the case */
3154 		printk(KERN_ERR "drbd: ran out of memory\n");
3155 	else
3156 		printk(KERN_ERR "drbd: initialization failure\n");
3157 	return err;
3158 }
3159 
3160 void drbd_free_bc(struct drbd_backing_dev *ldev)
3161 {
3162 	if (ldev == NULL)
3163 		return;
3164 
3165 	bd_release(ldev->backing_bdev);
3166 	bd_release(ldev->md_bdev);
3167 
3168 	fput(ldev->lo_file);
3169 	fput(ldev->md_file);
3170 
3171 	kfree(ldev);
3172 }
3173 
3174 void drbd_free_sock(struct drbd_conf *mdev)
3175 {
3176 	if (mdev->data.socket) {
3177 		mutex_lock(&mdev->data.mutex);
3178 		kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3179 		sock_release(mdev->data.socket);
3180 		mdev->data.socket = NULL;
3181 		mutex_unlock(&mdev->data.mutex);
3182 	}
3183 	if (mdev->meta.socket) {
3184 		mutex_lock(&mdev->meta.mutex);
3185 		kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3186 		sock_release(mdev->meta.socket);
3187 		mdev->meta.socket = NULL;
3188 		mutex_unlock(&mdev->meta.mutex);
3189 	}
3190 }
3191 
3192 
3193 void drbd_free_resources(struct drbd_conf *mdev)
3194 {
3195 	crypto_free_hash(mdev->csums_tfm);
3196 	mdev->csums_tfm = NULL;
3197 	crypto_free_hash(mdev->verify_tfm);
3198 	mdev->verify_tfm = NULL;
3199 	crypto_free_hash(mdev->cram_hmac_tfm);
3200 	mdev->cram_hmac_tfm = NULL;
3201 	crypto_free_hash(mdev->integrity_w_tfm);
3202 	mdev->integrity_w_tfm = NULL;
3203 	crypto_free_hash(mdev->integrity_r_tfm);
3204 	mdev->integrity_r_tfm = NULL;
3205 
3206 	drbd_free_sock(mdev);
3207 
3208 	__no_warn(local,
3209 		  drbd_free_bc(mdev->ldev);
3210 		  mdev->ldev = NULL;);
3211 }
3212 
3213 /* meta data management */
3214 
3215 struct meta_data_on_disk {
3216 	u64 la_size;           /* last agreed size. */
3217 	u64 uuid[UI_SIZE];   /* UUIDs. */
3218 	u64 device_uuid;
3219 	u64 reserved_u64_1;
3220 	u32 flags;             /* MDF */
3221 	u32 magic;
3222 	u32 md_size_sect;
3223 	u32 al_offset;         /* offset to this block */
3224 	u32 al_nr_extents;     /* important for restoring the AL */
3225 	      /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3226 	u32 bm_offset;         /* offset to the bitmap, from here */
3227 	u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3228 	u32 reserved_u32[4];
3229 
3230 } __packed;
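
/* This struct is written from drbd_md_sync() below, which zeroes a full
 * 512 byte buffer first: the struct (well under 512 bytes) occupies the
 * start of a single on-disk sector, and the remainder stays zero. */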
3231 
3232 /**
3233  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3234  * @mdev:	DRBD device.
3235  */
3236 void drbd_md_sync(struct drbd_conf *mdev)
3237 {
3238 	struct meta_data_on_disk *buffer;
3239 	sector_t sector;
3240 	int i;
3241 
3242 	if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3243 		return;
3244 	del_timer(&mdev->md_sync_timer);
3245 
3246 	/* We use D_FAILED here, and not D_ATTACHING, because we try to write
3247 	 * metadata even if we detach due to a disk failure! */
3248 	if (!get_ldev_if_state(mdev, D_FAILED))
3249 		return;
3250 
3251 	mutex_lock(&mdev->md_io_mutex);
3252 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3253 	memset(buffer, 0, 512);
3254 
3255 	buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3256 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3257 		buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3258 	buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3259 	buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3260 
3261 	buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3262 	buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3263 	buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3264 	buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3265 	buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3266 
3267 	buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3268 
3269 	D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3270 	sector = mdev->ldev->md.md_offset;
3271 
3272 	if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3273 		clear_bit(MD_DIRTY, &mdev->flags);
3274 	} else {
3275 		/* this was a try anyways ... */
3276 		dev_err(DEV, "meta data update failed!\n");
3277 
3278 		drbd_chk_io_error(mdev, 1, TRUE);
3279 	}
3280 
3281 	/* Update mdev->ldev->md.la_size_sect,
3282 	 * since we just wrote this value into the on-disk meta data. */
3283 	mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3284 
3285 	mutex_unlock(&mdev->md_io_mutex);
3286 	put_ldev(mdev);
3287 }
3288 
3289 /**
3290  * drbd_md_read() - Reads in the meta data super block
3291  * @mdev:	DRBD device.
3292  * @bdev:	Device from which the meta data should be read in.
3293  *
3294  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_codes in case
3295  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3296  */
3297 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3298 {
3299 	struct meta_data_on_disk *buffer;
3300 	int i, rv = NO_ERROR;
3301 
3302 	if (!get_ldev_if_state(mdev, D_ATTACHING))
3303 		return ERR_IO_MD_DISK;
3304 
3305 	mutex_lock(&mdev->md_io_mutex);
3306 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3307 
3308 	if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3309 		/* NOTE: can't do normal error processing here as this is
3310 		   called BEFORE disk is attached */
3311 		dev_err(DEV, "Error while reading metadata.\n");
3312 		rv = ERR_IO_MD_DISK;
3313 		goto err;
3314 	}
3315 
3316 	if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3317 		dev_err(DEV, "Error while reading metadata, magic not found.\n");
3318 		rv = ERR_MD_INVALID;
3319 		goto err;
3320 	}
3321 	if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3322 		dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3323 		    be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3324 		rv = ERR_MD_INVALID;
3325 		goto err;
3326 	}
3327 	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3328 		dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3329 		    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3330 		rv = ERR_MD_INVALID;
3331 		goto err;
3332 	}
3333 	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3334 		dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3335 		    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3336 		rv = ERR_MD_INVALID;
3337 		goto err;
3338 	}
3339 
3340 	if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3341 		dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3342 		    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3343 		rv = ERR_MD_INVALID;
3344 		goto err;
3345 	}
3346 
3347 	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3348 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3349 		bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3350 	bdev->md.flags = be32_to_cpu(buffer->flags);
3351 	mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3352 	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3353 
3354 	if (mdev->sync_conf.al_extents < 7)
3355 		mdev->sync_conf.al_extents = 127;
3356 
3357  err:
3358 	mutex_unlock(&mdev->md_io_mutex);
3359 	put_ldev(mdev);
3360 
3361 	return rv;
3362 }
3363 
3364 /**
3365  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3366  * @mdev:	DRBD device.
3367  *
3368  * Call this function if you change anything that should be written to
3369  * the meta-data super block. This function sets MD_DIRTY, and starts a
3370  * timer that ensures that drbd_md_sync() gets called within five seconds.
3371  */
3372 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3373 {
3374 	set_bit(MD_DIRTY, &mdev->flags);
3375 	mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3376 }
3377 
3378 
3379 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3380 {
3381 	int i;
3382 
3383 	for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3384 		mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3385 }
3386 
3387 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3388 {
3389 	if (idx == UI_CURRENT) {
3390 		if (mdev->state.role == R_PRIMARY)
3391 			val |= 1;
3392 		else
3393 			val &= ~((u64)1);
3394 
3395 		drbd_set_ed_uuid(mdev, val);
3396 	}
3397 
3398 	mdev->ldev->md.uuid[idx] = val;
3399 	drbd_md_mark_dirty(mdev);
3400 }
3401 
3402 
3403 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3404 {
3405 	if (mdev->ldev->md.uuid[idx]) {
3406 		drbd_uuid_move_history(mdev);
3407 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3408 	}
3409 	_drbd_uuid_set(mdev, idx, val);
3410 }
3411 
3412 /**
3413  * drbd_uuid_new_current() - Creates a new current UUID
3414  * @mdev:	DRBD device.
3415  *
3416  * Creates a new current UUID, and rotates the old current UUID into
3417  * the bitmap slot. Causes an incremental resync upon next connect.
3418  */
3419 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3420 {
3421 	u64 val;
3422 
3423 	dev_info(DEV, "Creating new current UUID\n");
3424 	D_ASSERT(mdev->ldev->md.uuid[UI_BITMAP] == 0);
3425 	mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3426 
3427 	get_random_bytes(&val, sizeof(u64));
3428 	_drbd_uuid_set(mdev, UI_CURRENT, val);
3429 }
3430 
3431 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3432 {
3433 	if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3434 		return;
3435 
3436 	if (val == 0) {
3437 		drbd_uuid_move_history(mdev);
3438 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3439 		mdev->ldev->md.uuid[UI_BITMAP] = 0;
3440 	} else {
3441 		if (mdev->ldev->md.uuid[UI_BITMAP])
3442 			dev_warn(DEV, "bm UUID already set\n");
3443 
3444 		mdev->ldev->md.uuid[UI_BITMAP] = val;
3445 		mdev->ldev->md.uuid[UI_BITMAP] &= ~((u64)1);
3446 
3447 	}
3448 	drbd_md_mark_dirty(mdev);
3449 }
3450 
3451 /**
3452  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3453  * @mdev:	DRBD device.
3454  *
3455  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3456  */
3457 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3458 {
3459 	int rv = -EIO;
3460 
3461 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
3462 		drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3463 		drbd_md_sync(mdev);
3464 		drbd_bm_set_all(mdev);
3465 
3466 		rv = drbd_bm_write(mdev);
3467 
3468 		if (!rv) {
3469 			drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3470 			drbd_md_sync(mdev);
3471 		}
3472 
3473 		put_ldev(mdev);
3474 	}
3475 
3476 	return rv;
3477 }
3478 
3479 /**
3480  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3481  * @mdev:	DRBD device.
3482  *
3483  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3484  */
3485 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3486 {
3487 	int rv = -EIO;
3488 
3489 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
3490 		drbd_bm_clear_all(mdev);
3491 		rv = drbd_bm_write(mdev);
3492 		put_ldev(mdev);
3493 	}
3494 
3495 	return rv;
3496 }
3497 
3498 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3499 {
3500 	struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3501 	int rv;
3502 
3503 	D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3504 
3505 	drbd_bm_lock(mdev, work->why);
3506 	rv = work->io_fn(mdev);
3507 	drbd_bm_unlock(mdev);
3508 
3509 	clear_bit(BITMAP_IO, &mdev->flags);
3510 	wake_up(&mdev->misc_wait);
3511 
3512 	if (work->done)
3513 		work->done(mdev, rv);
3514 
3515 	clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3516 	work->why = NULL;
3517 
3518 	return 1;
3519 }
3520 
3521 /**
3522  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3523  * @mdev:	DRBD device.
3524  * @io_fn:	IO callback to be called when bitmap IO is possible
3525  * @done:	callback to be called after the bitmap IO was performed
3526  * @why:	Descriptive text of the reason for doing the IO
3527  *
3528  * While IO on the bitmap happens we freeze application IO, thus ensuring
3529  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
3530  * called from worker context. It MUST NOT be used while a previous such
3531  * work is still pending!
3532  */
3533 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3534 			  int (*io_fn)(struct drbd_conf *),
3535 			  void (*done)(struct drbd_conf *, int),
3536 			  char *why)
3537 {
3538 	D_ASSERT(current == mdev->worker.task);
3539 
3540 	D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3541 	D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3542 	D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3543 	if (mdev->bm_io_work.why)
3544 		dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3545 			why, mdev->bm_io_work.why);
3546 
3547 	mdev->bm_io_work.io_fn = io_fn;
3548 	mdev->bm_io_work.done = done;
3549 	mdev->bm_io_work.why = why;
3550 
3551 	set_bit(BITMAP_IO, &mdev->flags);
3552 	if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3553 		if (list_empty(&mdev->bm_io_work.w.list)) {
3554 			set_bit(BITMAP_IO_QUEUED, &mdev->flags);
3555 			drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
3556 		} else
3557 			dev_err(DEV, "FIXME avoided double queuing bm_io_work\n");
3558 	}
3559 }
3560 
3561 /**
3562  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3563  * @mdev:	DRBD device.
3564  * @io_fn:	IO callback to be called when bitmap IO is possible
3565  * @why:	Descriptive text of the reason for doing the IO
3566  *
3567  * Freezes application IO while the actual IO operation runs. This
3568  * function MAY NOT be called from worker context.
3569  */
3570 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why)
3571 {
3572 	int rv;
3573 
3574 	D_ASSERT(current != mdev->worker.task);
3575 
3576 	drbd_suspend_io(mdev);
3577 
3578 	drbd_bm_lock(mdev, why);
3579 	rv = io_fn(mdev);
3580 	drbd_bm_unlock(mdev);
3581 
3582 	drbd_resume_io(mdev);
3583 
3584 	return rv;
3585 }
3586 
3587 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3588 {
3589 	if ((mdev->ldev->md.flags & flag) != flag) {
3590 		drbd_md_mark_dirty(mdev);
3591 		mdev->ldev->md.flags |= flag;
3592 	}
3593 }
3594 
3595 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3596 {
3597 	if ((mdev->ldev->md.flags & flag) != 0) {
3598 		drbd_md_mark_dirty(mdev);
3599 		mdev->ldev->md.flags &= ~flag;
3600 	}
3601 }

3602 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3603 {
3604 	return (bdev->md.flags & flag) != 0;
3605 }
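
/*
 * Illustrative sketch, not part of the driver: the flag helpers implement
 * a crash-recovery marker, as drbd_bmio_set_n_write() shows with
 * MDF_FULL_SYNC above. A flag set and synced before a multi-step update
 * survives a crash, so attach-time code can detect interrupted work and
 * redo it. example_redo_interrupted_sync is a hypothetical name.
 */
static int __maybe_unused example_redo_interrupted_sync(struct drbd_conf *mdev)
{
	/* assumes the caller holds a local-disk reference (get_ldev) */
	if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC))
		return drbd_bitmap_io(mdev, &drbd_bmio_set_n_write,
				      "example: redo full sync");
	return 0;
}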
3606 
3607 static void md_sync_timer_fn(unsigned long data)
3608 {
3609 	struct drbd_conf *mdev = (struct drbd_conf *) data;
3610 
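	/* Timer callbacks run in atomic context and drbd_md_sync() may
	 * sleep, so defer the write-out to the worker, at the front of its
	 * queue so it runs next. */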
3611 	drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
3612 }
3613 
3614 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3615 {
3616 	dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3617 	drbd_md_sync(mdev);
3618 
3619 	return 1;
3620 }
3621 
3622 #ifdef CONFIG_DRBD_FAULT_INJECTION
3623 /* Fault insertion support including random number generator shamelessly
3624  * stolen from kernel/rcutorture.c */
3625 struct fault_random_state {
3626 	unsigned long state;
3627 	unsigned long count;
3628 };
3629 
3630 #define FAULT_RANDOM_MULT	39916801	/* prime */
3631 #define FAULT_RANDOM_ADD	479001701	/* prime */
3632 #define FAULT_RANDOM_REFRESH	10000
3633 
3634 /*
3635  * Crude but fast random-number generator.  Uses a linear congruential
3636  * generator, with occasional help from get_random_bytes().
3637  */
3638 static unsigned long
3639 _drbd_fault_random(struct fault_random_state *rsp)
3640 {
3641 	long refresh;
3642 
3643 	if (!rsp->count--) {
3644 		get_random_bytes(&refresh, sizeof(refresh));
3645 		rsp->state += refresh;
3646 		rsp->count = FAULT_RANDOM_REFRESH;
3647 	}
3648 	rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
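	/* Swap the 16-bit halfwords: the low bits of an LCG are its weakest,
	 * so return the stronger high bits in the low positions, where the
	 * caller reduces the value modulo a small number. */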
3649 	return swahw32(rsp->state);
3650 }
3651 
3652 static char *
3653 _drbd_fault_str(unsigned int type)
{
3654 	static char *_faults[] = {
3655 		[DRBD_FAULT_MD_WR] = "Meta-data write",
3656 		[DRBD_FAULT_MD_RD] = "Meta-data read",
3657 		[DRBD_FAULT_RS_WR] = "Resync write",
3658 		[DRBD_FAULT_RS_RD] = "Resync read",
3659 		[DRBD_FAULT_DT_WR] = "Data write",
3660 		[DRBD_FAULT_DT_RD] = "Data read",
3661 		[DRBD_FAULT_DT_RA] = "Data read ahead",
3662 		[DRBD_FAULT_BM_ALLOC] = "BM allocation",
3663 		[DRBD_FAULT_AL_EE] = "EE allocation"
3664 	};
3665 
3666 	return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3667 }
3668 
3669 unsigned int
3670 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3671 {
3672 	static struct fault_random_state rrs = {0, 0};
3673 
3674 	unsigned int ret = (
3675 		(fault_devs == 0 ||
3676 			((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3677 		(((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3678 
3679 	if (ret) {
3680 		fault_count++;
3681 
3682 		if (printk_ratelimit())
3683 			dev_warn(DEV, "***Simulating %s failure\n",
3684 				_drbd_fault_str(type));
3685 	}
3686 
3687 	return ret;
3688 }
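
/*
 * Illustrative sketch, not part of the driver: a typical fault-injection
 * call site, calling the function above directly (call sites typically go
 * through a wrapper in drbd_int.h that also checks fault_rate and
 * enable_faults first). example_submit_or_fail is a hypothetical name.
 */
static void __maybe_unused example_submit_or_fail(struct drbd_conf *mdev,
						  struct bio *bio)
{
	if (_drbd_insert_fault(mdev, DRBD_FAULT_DT_WR))
		bio_endio(bio, -EIO);	/* simulate a failed data write */
	else
		submit_bio(WRITE, bio);
}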
3689 #endif
3690 
3691 const char *drbd_buildtag(void)
3692 {
3693 	/* When DRBD is built from external sources, this carries a
3694 	   reference to the git hash of the source code. */
3695 
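	/* The initializer is a NUL byte followed by "uilt-in": if the module
	 * branch below is not taken, writing 'b' into buildtag[0] turns the
	 * string into "built-in". */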
3696 	static char buildtag[38] = "\0uilt-in";
3697 
3698 	if (buildtag[0] == 0) {
3699 #ifdef CONFIG_MODULES
3700 		if (THIS_MODULE != NULL)
3701 			sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3702 		else
3703 #endif
3704 			buildtag[0] = 'b';
3705 	}
3706 
3707 	return buildtag;
3708 }
3709 
3710 module_init(drbd_init)
3711 module_exit(drbd_cleanup)
3712 
3713 EXPORT_SYMBOL(drbd_conn_str);
3714 EXPORT_SYMBOL(drbd_role_str);
3715 EXPORT_SYMBOL(drbd_disk_str);
3716 EXPORT_SYMBOL(drbd_set_st_err_str);
3717