xref: /linux/drivers/block/drbd/drbd_main.c (revision b3b77c8caef1750ebeea1054e39e358550ea9f55)
1 /*
2    drbd.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12 
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17 
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22 
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 
27  */
28 
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/smp_lock.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48 
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52 
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56 
57 #include "drbd_vli.h"
58 
59 struct after_state_chg_work {
60 	struct drbd_work w;
61 	union drbd_state os;
62 	union drbd_state ns;
63 	enum chg_state_flags flags;
64 	struct completion *done;
65 };
66 
67 int drbdd_init(struct drbd_thread *);
68 int drbd_worker(struct drbd_thread *);
69 int drbd_asender(struct drbd_thread *);
70 
71 int drbd_init(void);
72 static int drbd_open(struct block_device *bdev, fmode_t mode);
73 static int drbd_release(struct gendisk *gd, fmode_t mode);
74 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
75 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
76 			   union drbd_state ns, enum chg_state_flags flags);
77 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
78 static void md_sync_timer_fn(unsigned long data);
79 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
80 
81 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
82 	      "Lars Ellenberg <lars@linbit.com>");
83 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
84 MODULE_VERSION(REL_VERSION);
85 MODULE_LICENSE("GPL");
86 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
87 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
88 
89 #include <linux/moduleparam.h>
90 /* allow_open_on_secondary */
91 MODULE_PARM_DESC(allow_oos, "DONT USE!");
92 /* thanks to these macros, if compiled into the kernel (not as a module),
93  * this becomes the boot parameter drbd.minor_count */
94 module_param(minor_count, uint, 0444);
95 module_param(disable_sendpage, bool, 0644);
96 module_param(allow_oos, bool, 0);
97 module_param(cn_idx, uint, 0444);
98 module_param(proc_details, int, 0644);
99 
100 #ifdef CONFIG_DRBD_FAULT_INJECTION
101 int enable_faults;
102 int fault_rate;
103 static int fault_count;
104 int fault_devs;
105 /* bitmap of enabled faults */
106 module_param(enable_faults, int, 0664);
107 /* fault rate % value - applies to all enabled faults */
108 module_param(fault_rate, int, 0664);
109 /* count of faults inserted */
110 module_param(fault_count, int, 0664);
111 /* bitmap of devices to insert faults on */
112 module_param(fault_devs, int, 0644);
113 #endif
114 
115 /* module parameter, defined */
116 unsigned int minor_count = 32;
117 int disable_sendpage;
118 int allow_oos;
119 unsigned int cn_idx = CN_IDX_DRBD;
120 int proc_details;       /* Detail level in /proc/drbd */
121 
122 /* Module parameter for setting the user mode helper program
123  * to run. Default is /sbin/drbdadm */
124 char usermode_helper[80] = "/sbin/drbdadm";
125 
126 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
127 
128 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
129  * as member "struct gendisk *vdisk;"
130  */
131 struct drbd_conf **minor_table;
132 
133 struct kmem_cache *drbd_request_cache;
134 struct kmem_cache *drbd_ee_cache;	/* epoch entries */
135 struct kmem_cache *drbd_bm_ext_cache;	/* bitmap extents */
136 struct kmem_cache *drbd_al_ext_cache;	/* activity log extents */
137 mempool_t *drbd_request_mempool;
138 mempool_t *drbd_ee_mempool;
139 
140 /* I do not use a standard mempool, because:
141    1) I want to hand out the pre-allocated objects first.
142    2) I want to be able to interrupt sleeping allocation with a signal.
143    Note: This is a singly linked list, the next pointer is the private
144 	 member of struct page.
145  */
146 struct page *drbd_pp_pool;
147 spinlock_t   drbd_pp_lock;
148 int          drbd_pp_vacant;
149 wait_queue_head_t drbd_pp_wait;
150 
151 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
152 
153 static const struct block_device_operations drbd_ops = {
154 	.owner =   THIS_MODULE,
155 	.open =    drbd_open,
156 	.release = drbd_release,
157 };
158 
159 #define ARRY_SIZE(A) (sizeof(A)/sizeof(A[0]))
160 
161 #ifdef __CHECKER__
162 /* When checking with sparse, and this is an inline function, sparse will
163    give tons of false positives. When this is a real functions sparse works.
164  */
165 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
166 {
167 	int io_allowed;
168 
169 	atomic_inc(&mdev->local_cnt);
170 	io_allowed = (mdev->state.disk >= mins);
171 	if (!io_allowed) {
172 		if (atomic_dec_and_test(&mdev->local_cnt))
173 			wake_up(&mdev->misc_wait);
174 	}
175 	return io_allowed;
176 }
177 
178 #endif
179 
180 /**
181  * DOC: The transfer log
182  *
183  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
184  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
185  * of the list. There is always at least one &struct drbd_tl_epoch object.
186  *
187  * Each &struct drbd_tl_epoch has a circular doubly linked list of requests
188  * attached.
189  */
190 static int tl_init(struct drbd_conf *mdev)
191 {
192 	struct drbd_tl_epoch *b;
193 
194 	/* during device minor initialization, we may well use GFP_KERNEL */
195 	b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
196 	if (!b)
197 		return 0;
198 	INIT_LIST_HEAD(&b->requests);
199 	INIT_LIST_HEAD(&b->w.list);
200 	b->next = NULL;
201 	b->br_number = 4711;
202 	b->n_req = 0;
203 	b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
204 
205 	mdev->oldest_tle = b;
206 	mdev->newest_tle = b;
207 	INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
208 
209 	mdev->tl_hash = NULL;
210 	mdev->tl_hash_s = 0;
211 
212 	return 1;
213 }
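
/*
 * Editorial sketch of the resulting layout (not part of the original source):
 * after tl_init() there is exactly one epoch; _tl_add_barrier() appends
 * further epochs via ->next, so the list always runs
 *
 *	mdev->oldest_tle --next--> ... --next--> mdev->newest_tle --> NULL
 *
 * and each epoch carries its own circular list of requests in ->requests.
 */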
214 
215 static void tl_cleanup(struct drbd_conf *mdev)
216 {
217 	D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
218 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
219 	kfree(mdev->oldest_tle);
220 	mdev->oldest_tle = NULL;
221 	kfree(mdev->unused_spare_tle);
222 	mdev->unused_spare_tle = NULL;
223 	kfree(mdev->tl_hash);
224 	mdev->tl_hash = NULL;
225 	mdev->tl_hash_s = 0;
226 }
227 
228 /**
229  * _tl_add_barrier() - Adds a barrier to the transfer log
230  * @mdev:	DRBD device.
231  * @new:	Barrier to be added before the current head of the TL.
232  *
233  * The caller must hold the req_lock.
234  */
235 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
236 {
237 	struct drbd_tl_epoch *newest_before;
238 
239 	INIT_LIST_HEAD(&new->requests);
240 	INIT_LIST_HEAD(&new->w.list);
241 	new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
242 	new->next = NULL;
243 	new->n_req = 0;
244 
245 	newest_before = mdev->newest_tle;
246 	/* never send a barrier number == 0, because that is special-cased
247 	 * when using TCQ for our write ordering code */
248 	new->br_number = (newest_before->br_number+1) ?: 1;
249 	if (mdev->newest_tle != new) {
250 		mdev->newest_tle->next = new;
251 		mdev->newest_tle = new;
252 	}
253 }
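
/*
 * Illustrative caller (editorial, not from the driver): the epoch is
 * allocated by the caller and handed in under req_lock; the GFP flags and
 * error handling here are assumptions for the sketch only.
 *
 *	struct drbd_tl_epoch *b = kmalloc(sizeof(*b), GFP_ATOMIC);
 *
 *	if (b) {
 *		spin_lock_irq(&mdev->req_lock);
 *		_tl_add_barrier(mdev, b);
 *		spin_unlock_irq(&mdev->req_lock);
 *	}
 */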
254 
255 /**
256  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
257  * @mdev:	DRBD device.
258  * @barrier_nr:	Expected identifier of the DRBD write barrier packet.
259  * @set_size:	Expected number of requests before that barrier.
260  *
261  * In case the passed barrier_nr or set_size does not match the oldest
262  * &struct drbd_tl_epoch object, this function will cause a termination
263  * of the connection.
264  */
265 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
266 		       unsigned int set_size)
267 {
268 	struct drbd_tl_epoch *b, *nob; /* next old barrier */
269 	struct list_head *le, *tle;
270 	struct drbd_request *r;
271 
272 	spin_lock_irq(&mdev->req_lock);
273 
274 	b = mdev->oldest_tle;
275 
276 	/* first some paranoia code */
277 	if (b == NULL) {
278 		dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
279 			barrier_nr);
280 		goto bail;
281 	}
282 	if (b->br_number != barrier_nr) {
283 		dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
284 			barrier_nr, b->br_number);
285 		goto bail;
286 	}
287 	if (b->n_req != set_size) {
288 		dev_err(DEV, "BAD! BarrierAck #%u received with n_req=%u, expected n_req=%u!\n",
289 			barrier_nr, set_size, b->n_req);
290 		goto bail;
291 	}
292 
293 	/* Clean up list of requests processed during current epoch */
294 	list_for_each_safe(le, tle, &b->requests) {
295 		r = list_entry(le, struct drbd_request, tl_requests);
296 		_req_mod(r, barrier_acked);
297 	}
298 	/* There could be requests on the list waiting for completion
299 	   of the write to the local disk. To avoid corruption of the
300 	   slab's data structures we have to remove the list's head.
301 
302 	   Also there could have been a barrier ack out of sequence, overtaking
303 	   the write acks - which would be a bug and violating write ordering.
304 	   To not deadlock in case we lose connection while such requests are
305 	   still pending, we need some way to find them for the
306 	   _req_mod(connection_lost_while_pending).
307 
308 	   These have been list_move'd to the out_of_sequence_requests list in
309 	   _req_mod(, barrier_acked) above.
310 	   */
311 	list_del_init(&b->requests);
312 
313 	nob = b->next;
314 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
315 		_tl_add_barrier(mdev, b);
316 		if (nob)
317 			mdev->oldest_tle = nob;
318 		/* if nob == NULL b was the only barrier, and becomes the new
319 		   barrier. Therefore mdev->oldest_tle already points to b */
320 	} else {
321 		D_ASSERT(nob != NULL);
322 		mdev->oldest_tle = nob;
323 		kfree(b);
324 	}
325 
326 	spin_unlock_irq(&mdev->req_lock);
327 	dec_ap_pending(mdev);
328 
329 	return;
330 
331 bail:
332 	spin_unlock_irq(&mdev->req_lock);
333 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
334 }
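
/*
 * Worked example (editorial): right after tl_init() the oldest epoch has
 * br_number == 4711, so a matching BarrierAck from the peer is processed as
 *
 *	tl_release(mdev, 4711, n);	   (n == requests counted in that epoch)
 *
 * Any mismatch in barrier number or request count takes the "bail:" path
 * above and forces the connection into C_PROTOCOL_ERROR.
 */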
335 
336 
337 /**
338  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
339  * @mdev:	DRBD device.
340  *
341  * This is called after the connection to the peer was lost. The storage covered
342  * by the requests on the transfer log gets marked as out of sync. Called from the
343  * receiver thread and the worker thread.
344  */
345 void tl_clear(struct drbd_conf *mdev)
346 {
347 	struct drbd_tl_epoch *b, *tmp;
348 	struct list_head *le, *tle;
349 	struct drbd_request *r;
350 	int new_initial_bnr = net_random();
351 
352 	spin_lock_irq(&mdev->req_lock);
353 
354 	b = mdev->oldest_tle;
355 	while (b) {
356 		list_for_each_safe(le, tle, &b->requests) {
357 			r = list_entry(le, struct drbd_request, tl_requests);
358 			/* It would be nice to complete outside of spinlock.
359 			 * But this is easier for now. */
360 			_req_mod(r, connection_lost_while_pending);
361 		}
362 		tmp = b->next;
363 
364 		/* there could still be requests on that ring list,
365 		 * in case local io is still pending */
366 		list_del(&b->requests);
367 
368 		/* dec_ap_pending corresponding to queue_barrier.
369 		 * the newest barrier may not have been queued yet,
370 		 * in which case w.cb is still NULL. */
371 		if (b->w.cb != NULL)
372 			dec_ap_pending(mdev);
373 
374 		if (b == mdev->newest_tle) {
375 			/* recycle, but reinit! */
376 			D_ASSERT(tmp == NULL);
377 			INIT_LIST_HEAD(&b->requests);
378 			INIT_LIST_HEAD(&b->w.list);
379 			b->w.cb = NULL;
380 			b->br_number = new_initial_bnr;
381 			b->n_req = 0;
382 
383 			mdev->oldest_tle = b;
384 			break;
385 		}
386 		kfree(b);
387 		b = tmp;
388 	}
389 
390 	/* we expect this list to be empty. */
391 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
392 
393 	/* but just in case, clean it up anyways! */
394 	list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
395 		r = list_entry(le, struct drbd_request, tl_requests);
396 		/* It would be nice to complete outside of spinlock.
397 		 * But this is easier for now. */
398 		_req_mod(r, connection_lost_while_pending);
399 	}
400 
401 	/* ensure bit indicating barrier is required is clear */
402 	clear_bit(CREATE_BARRIER, &mdev->flags);
403 
404 	spin_unlock_irq(&mdev->req_lock);
405 }
406 
407 /**
408  * cl_wide_st_chg() - TRUE if the state change is a cluster wide one
409  * @mdev:	DRBD device.
410  * @os:		old (current) state.
411  * @ns:		new (wanted) state.
412  */
413 static int cl_wide_st_chg(struct drbd_conf *mdev,
414 			  union drbd_state os, union drbd_state ns)
415 {
416 	return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
417 		 ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
418 		  (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
419 		  (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
420 		  (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
421 		(os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
422 		(os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
423 }
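
/*
 * Example (editorial): promoting a connected node is a cluster-wide change:
 * with os.conn == C_CONNECTED, os.role == R_SECONDARY and
 * ns.role == R_PRIMARY this returns true, and drbd_req_state() will then
 * ask the peer via drbd_send_state_req() before committing the change.
 */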
424 
425 int drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
426 		      union drbd_state mask, union drbd_state val)
427 {
428 	unsigned long flags;
429 	union drbd_state os, ns;
430 	int rv;
431 
432 	spin_lock_irqsave(&mdev->req_lock, flags);
433 	os = mdev->state;
434 	ns.i = (os.i & ~mask.i) | val.i;
435 	rv = _drbd_set_state(mdev, ns, f, NULL);
436 	ns = mdev->state;
437 	spin_unlock_irqrestore(&mdev->req_lock, flags);
438 
439 	return rv;
440 }
441 
442 /**
443  * drbd_force_state() - Impose a change which happens outside our control on our state
444  * @mdev:	DRBD device.
445  * @mask:	mask of state bits to change.
446  * @val:	value of new state bits.
447  */
448 void drbd_force_state(struct drbd_conf *mdev,
449 	union drbd_state mask, union drbd_state val)
450 {
451 	drbd_change_state(mdev, CS_HARD, mask, val);
452 }
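
/*
 * Usage sketch (editorial): mask/val pairs are usually built with the NS()
 * helpers from drbd_int.h, e.g. the forced disconnect in tl_release() above:
 *
 *	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
 *
 * drbd_change_state() then computes ns.i = (os.i & ~mask.i) | val.i under
 * req_lock and applies it with _drbd_set_state().
 */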
453 
454 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns);
455 static int is_valid_state_transition(struct drbd_conf *,
456 				     union drbd_state, union drbd_state);
457 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
458 				       union drbd_state ns, int *warn_sync_abort);
459 int drbd_send_state_req(struct drbd_conf *,
460 			union drbd_state, union drbd_state);
461 
462 static enum drbd_state_ret_codes _req_st_cond(struct drbd_conf *mdev,
463 				    union drbd_state mask, union drbd_state val)
464 {
465 	union drbd_state os, ns;
466 	unsigned long flags;
467 	int rv;
468 
469 	if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
470 		return SS_CW_SUCCESS;
471 
472 	if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
473 		return SS_CW_FAILED_BY_PEER;
474 
475 	rv = 0;
476 	spin_lock_irqsave(&mdev->req_lock, flags);
477 	os = mdev->state;
478 	ns.i = (os.i & ~mask.i) | val.i;
479 	ns = sanitize_state(mdev, os, ns, NULL);
480 
481 	if (!cl_wide_st_chg(mdev, os, ns))
482 		rv = SS_CW_NO_NEED;
483 	if (!rv) {
484 		rv = is_valid_state(mdev, ns);
485 		if (rv == SS_SUCCESS) {
486 			rv = is_valid_state_transition(mdev, ns, os);
487 			if (rv == SS_SUCCESS)
488 				rv = 0; /* cont waiting, otherwise fail. */
489 		}
490 	}
491 	spin_unlock_irqrestore(&mdev->req_lock, flags);
492 
493 	return rv;
494 }
495 
496 /**
497  * drbd_req_state() - Perform a possibly cluster-wide state change
498  * @mdev:	DRBD device.
499  * @mask:	mask of state bits to change.
500  * @val:	value of new state bits.
501  * @f:		flags
502  *
503  * Should not be called directly, use drbd_request_state() or
504  * _drbd_request_state().
505  */
506 static int drbd_req_state(struct drbd_conf *mdev,
507 			  union drbd_state mask, union drbd_state val,
508 			  enum chg_state_flags f)
509 {
510 	struct completion done;
511 	unsigned long flags;
512 	union drbd_state os, ns;
513 	int rv;
514 
515 	init_completion(&done);
516 
517 	if (f & CS_SERIALIZE)
518 		mutex_lock(&mdev->state_mutex);
519 
520 	spin_lock_irqsave(&mdev->req_lock, flags);
521 	os = mdev->state;
522 	ns.i = (os.i & ~mask.i) | val.i;
523 	ns = sanitize_state(mdev, os, ns, NULL);
524 
525 	if (cl_wide_st_chg(mdev, os, ns)) {
526 		rv = is_valid_state(mdev, ns);
527 		if (rv == SS_SUCCESS)
528 			rv = is_valid_state_transition(mdev, ns, os);
529 		spin_unlock_irqrestore(&mdev->req_lock, flags);
530 
531 		if (rv < SS_SUCCESS) {
532 			if (f & CS_VERBOSE)
533 				print_st_err(mdev, os, ns, rv);
534 			goto abort;
535 		}
536 
537 		drbd_state_lock(mdev);
538 		if (!drbd_send_state_req(mdev, mask, val)) {
539 			drbd_state_unlock(mdev);
540 			rv = SS_CW_FAILED_BY_PEER;
541 			if (f & CS_VERBOSE)
542 				print_st_err(mdev, os, ns, rv);
543 			goto abort;
544 		}
545 
546 		wait_event(mdev->state_wait,
547 			(rv = _req_st_cond(mdev, mask, val)));
548 
549 		if (rv < SS_SUCCESS) {
550 			drbd_state_unlock(mdev);
551 			if (f & CS_VERBOSE)
552 				print_st_err(mdev, os, ns, rv);
553 			goto abort;
554 		}
555 		spin_lock_irqsave(&mdev->req_lock, flags);
556 		os = mdev->state;
557 		ns.i = (os.i & ~mask.i) | val.i;
558 		rv = _drbd_set_state(mdev, ns, f, &done);
559 		drbd_state_unlock(mdev);
560 	} else {
561 		rv = _drbd_set_state(mdev, ns, f, &done);
562 	}
563 
564 	spin_unlock_irqrestore(&mdev->req_lock, flags);
565 
566 	if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
567 		D_ASSERT(current != mdev->worker.task);
568 		wait_for_completion(&done);
569 	}
570 
571 abort:
572 	if (f & CS_SERIALIZE)
573 		mutex_unlock(&mdev->state_mutex);
574 
575 	return rv;
576 }
577 
578 /**
579  * _drbd_request_state() - Request a state change (with flags)
580  * @mdev:	DRBD device.
581  * @mask:	mask of state bits to change.
582  * @val:	value of new state bits.
583  * @f:		flags
584  *
585  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
586  * flag, or when logging of failed state change requests is not desired.
587  */
588 int _drbd_request_state(struct drbd_conf *mdev,	union drbd_state mask,
589 			union drbd_state val,	enum chg_state_flags f)
590 {
591 	int rv;
592 
593 	wait_event(mdev->state_wait,
594 		   (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
595 
596 	return rv;
597 }
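
/*
 * Typical call (editorial example, mirroring abw_start_sync() below):
 *
 *	rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
 *
 * The wait_event() above keeps retrying as long as drbd_req_state() reports
 * SS_IN_TRANSIENT_STATE.
 */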
598 
599 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
600 {
601 	dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
602 	    name,
603 	    drbd_conn_str(ns.conn),
604 	    drbd_role_str(ns.role),
605 	    drbd_role_str(ns.peer),
606 	    drbd_disk_str(ns.disk),
607 	    drbd_disk_str(ns.pdsk),
608 	    ns.susp ? 's' : 'r',
609 	    ns.aftr_isp ? 'a' : '-',
610 	    ns.peer_isp ? 'p' : '-',
611 	    ns.user_isp ? 'u' : '-'
612 	    );
613 }
614 
615 void print_st_err(struct drbd_conf *mdev,
616 	union drbd_state os, union drbd_state ns, int err)
617 {
618 	if (err == SS_IN_TRANSIENT_STATE)
619 		return;
620 	dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
621 	print_st(mdev, " state", os);
622 	print_st(mdev, "wanted", ns);
623 }
624 
625 
626 #define drbd_peer_str drbd_role_str
627 #define drbd_pdsk_str drbd_disk_str
628 
629 #define drbd_susp_str(A)     ((A) ? "1" : "0")
630 #define drbd_aftr_isp_str(A) ((A) ? "1" : "0")
631 #define drbd_peer_isp_str(A) ((A) ? "1" : "0")
632 #define drbd_user_isp_str(A) ((A) ? "1" : "0")
633 
634 #define PSC(A) \
635 	({ if (ns.A != os.A) { \
636 		pbp += sprintf(pbp, #A "( %s -> %s ) ", \
637 			      drbd_##A##_str(os.A), \
638 			      drbd_##A##_str(ns.A)); \
639 	} })
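
/*
 * Editorial example: in __drbd_set_state() below, PSC(role) appends a chunk
 * like "role( Secondary -> Primary ) " to the log line, using
 * drbd_role_str() for both the old and the new value.
 */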
640 
641 /**
642  * is_valid_state() - Returns an SS_ error code if ns is not valid
643  * @mdev:	DRBD device.
644  * @ns:		State to consider.
645  */
646 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
647 {
648 	/* See drbd_state_sw_errors in drbd_strings.c */
649 
650 	enum drbd_fencing_p fp;
651 	int rv = SS_SUCCESS;
652 
653 	fp = FP_DONT_CARE;
654 	if (get_ldev(mdev)) {
655 		fp = mdev->ldev->dc.fencing;
656 		put_ldev(mdev);
657 	}
658 
659 	if (get_net_conf(mdev)) {
660 		if (!mdev->net_conf->two_primaries &&
661 		    ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
662 			rv = SS_TWO_PRIMARIES;
663 		put_net_conf(mdev);
664 	}
665 
666 	if (rv <= 0)
667 		/* already found a reason to abort */;
668 	else if (ns.role == R_SECONDARY && mdev->open_cnt)
669 		rv = SS_DEVICE_IN_USE;
670 
671 	else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
672 		rv = SS_NO_UP_TO_DATE_DISK;
673 
674 	else if (fp >= FP_RESOURCE &&
675 		 ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
676 		rv = SS_PRIMARY_NOP;
677 
678 	else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
679 		rv = SS_NO_UP_TO_DATE_DISK;
680 
681 	else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
682 		rv = SS_NO_LOCAL_DISK;
683 
684 	else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
685 		rv = SS_NO_REMOTE_DISK;
686 
687 	else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
688 		rv = SS_NO_UP_TO_DATE_DISK;
689 
690 	else if ((ns.conn == C_CONNECTED ||
691 		  ns.conn == C_WF_BITMAP_S ||
692 		  ns.conn == C_SYNC_SOURCE ||
693 		  ns.conn == C_PAUSED_SYNC_S) &&
694 		  ns.disk == D_OUTDATED)
695 		rv = SS_CONNECTED_OUTDATES;
696 
697 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
698 		 (mdev->sync_conf.verify_alg[0] == 0))
699 		rv = SS_NO_VERIFY_ALG;
700 
701 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
702 		  mdev->agreed_pro_version < 88)
703 		rv = SS_NOT_SUPPORTED;
704 
705 	return rv;
706 }
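
/*
 * Example (editorial): with two_primaries disabled in net_conf, a state in
 * which both ns.role and ns.peer are R_PRIMARY is rejected here with
 * SS_TWO_PRIMARIES before any transition-specific checks run.
 */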
707 
708 /**
709  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
710  * @mdev:	DRBD device.
711  * @ns:		new state.
712  * @os:		old state.
713  */
714 static int is_valid_state_transition(struct drbd_conf *mdev,
715 				     union drbd_state ns, union drbd_state os)
716 {
717 	int rv = SS_SUCCESS;
718 
719 	if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
720 	    os.conn > C_CONNECTED)
721 		rv = SS_RESYNC_RUNNING;
722 
723 	if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
724 		rv = SS_ALREADY_STANDALONE;
725 
726 	if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
727 		rv = SS_IS_DISKLESS;
728 
729 	if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
730 		rv = SS_NO_NET_CONFIG;
731 
732 	if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
733 		rv = SS_LOWER_THAN_OUTDATED;
734 
735 	if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
736 		rv = SS_IN_TRANSIENT_STATE;
737 
738 	if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
739 		rv = SS_IN_TRANSIENT_STATE;
740 
741 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
742 		rv = SS_NEED_CONNECTION;
743 
744 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
745 	    ns.conn != os.conn && os.conn > C_CONNECTED)
746 		rv = SS_RESYNC_RUNNING;
747 
748 	if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
749 	    os.conn < C_CONNECTED)
750 		rv = SS_NEED_CONNECTION;
751 
752 	return rv;
753 }
754 
755 /**
756  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
757  * @mdev:	DRBD device.
758  * @os:		old state.
759  * @ns:		new state.
760  * @warn_sync_abort:
761  *
762  * When we lose the connection, we have to set the state of the peer's disk (pdsk)
763  * to D_UNKNOWN. This rule and many more along those lines are in this function.
764  */
765 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
766 				       union drbd_state ns, int *warn_sync_abort)
767 {
768 	enum drbd_fencing_p fp;
769 
770 	fp = FP_DONT_CARE;
771 	if (get_ldev(mdev)) {
772 		fp = mdev->ldev->dc.fencing;
773 		put_ldev(mdev);
774 	}
775 
776 	/* Ignore network errors if the device's network part is not configured */
777 	if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
778 	    os.conn <= C_DISCONNECTING)
779 		ns.conn = os.conn;
780 
781 	/* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow */
782 	if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
783 	    ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING)
784 		ns.conn = os.conn;
785 
786 	/* After C_DISCONNECTING only C_STANDALONE may follow */
787 	if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
788 		ns.conn = os.conn;
789 
790 	if (ns.conn < C_CONNECTED) {
791 		ns.peer_isp = 0;
792 		ns.peer = R_UNKNOWN;
793 		if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
794 			ns.pdsk = D_UNKNOWN;
795 	}
796 
797 	/* Clear the aftr_isp when becoming unconfigured */
798 	if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
799 		ns.aftr_isp = 0;
800 
801 	if (ns.conn <= C_DISCONNECTING && ns.disk == D_DISKLESS)
802 		ns.pdsk = D_UNKNOWN;
803 
804 	/* Abort resync if a disk fails/detaches */
805 	if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
806 	    (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
807 		if (warn_sync_abort)
808 			*warn_sync_abort = 1;
809 		ns.conn = C_CONNECTED;
810 	}
811 
812 	if (ns.conn >= C_CONNECTED &&
813 	    ((ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED) ||
814 	     (ns.disk == D_NEGOTIATING && ns.conn == C_WF_BITMAP_T))) {
815 		switch (ns.conn) {
816 		case C_WF_BITMAP_T:
817 		case C_PAUSED_SYNC_T:
818 			ns.disk = D_OUTDATED;
819 			break;
820 		case C_CONNECTED:
821 		case C_WF_BITMAP_S:
822 		case C_SYNC_SOURCE:
823 		case C_PAUSED_SYNC_S:
824 			ns.disk = D_UP_TO_DATE;
825 			break;
826 		case C_SYNC_TARGET:
827 			ns.disk = D_INCONSISTENT;
828 			dev_warn(DEV, "Implicitly set disk state Inconsistent!\n");
829 			break;
830 		}
831 		if (os.disk == D_OUTDATED && ns.disk == D_UP_TO_DATE)
832 			dev_warn(DEV, "Implicitly set disk from Outdated to UpToDate\n");
833 	}
834 
835 	if (ns.conn >= C_CONNECTED &&
836 	    (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)) {
837 		switch (ns.conn) {
838 		case C_CONNECTED:
839 		case C_WF_BITMAP_T:
840 		case C_PAUSED_SYNC_T:
841 		case C_SYNC_TARGET:
842 			ns.pdsk = D_UP_TO_DATE;
843 			break;
844 		case C_WF_BITMAP_S:
845 		case C_PAUSED_SYNC_S:
846 			/* remap any consistent state to D_OUTDATED,
847 			 * but disallow "upgrade" of not even consistent states.
848 			 */
849 			ns.pdsk =
850 				(D_DISKLESS < os.pdsk && os.pdsk < D_OUTDATED)
851 				? os.pdsk : D_OUTDATED;
852 			break;
853 		case C_SYNC_SOURCE:
854 			ns.pdsk = D_INCONSISTENT;
855 			dev_warn(DEV, "Implicitly set pdsk Inconsistent!\n");
856 			break;
857 		}
858 		if (os.pdsk == D_OUTDATED && ns.pdsk == D_UP_TO_DATE)
859 			dev_warn(DEV, "Implicitly set pdsk from Outdated to UpToDate\n");
860 	}
861 
862 	/* Connection breaks down before we finished "Negotiating" */
863 	if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
864 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
865 		if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
866 			ns.disk = mdev->new_state_tmp.disk;
867 			ns.pdsk = mdev->new_state_tmp.pdsk;
868 		} else {
869 			dev_alert(DEV, "Connection lost while negotiating, no data!\n");
870 			ns.disk = D_DISKLESS;
871 			ns.pdsk = D_UNKNOWN;
872 		}
873 		put_ldev(mdev);
874 	}
875 
876 	if (fp == FP_STONITH &&
877 	    (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
878 	    !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
879 		ns.susp = 1;
880 
881 	if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
882 		if (ns.conn == C_SYNC_SOURCE)
883 			ns.conn = C_PAUSED_SYNC_S;
884 		if (ns.conn == C_SYNC_TARGET)
885 			ns.conn = C_PAUSED_SYNC_T;
886 	} else {
887 		if (ns.conn == C_PAUSED_SYNC_S)
888 			ns.conn = C_SYNC_SOURCE;
889 		if (ns.conn == C_PAUSED_SYNC_T)
890 			ns.conn = C_SYNC_TARGET;
891 	}
892 
893 	return ns;
894 }
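
/*
 * Example (editorial): when a connected device merely requests
 * ns.conn = C_TIMEOUT, sanitize_state() also forces ns.peer to R_UNKNOWN
 * and ns.pdsk to D_UNKNOWN, because ns.conn < C_CONNECTED; the caller never
 * has to spell out these implicit consequences itself.
 */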
895 
896 /* helper for __drbd_set_state */
897 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
898 {
899 	if (cs == C_VERIFY_T) {
900 		/* starting online verify from an arbitrary position
901 		 * does not fit well into the existing protocol.
902 		 * on C_VERIFY_T, we initialize ov_left and friends
903 		 * implicitly in receive_DataRequest once the
904 		 * first P_OV_REQUEST is received */
905 		mdev->ov_start_sector = ~(sector_t)0;
906 	} else {
907 		unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
908 		if (bit >= mdev->rs_total)
909 			mdev->ov_start_sector =
910 				BM_BIT_TO_SECT(mdev->rs_total - 1);
911 		mdev->ov_position = mdev->ov_start_sector;
912 	}
913 }
914 
915 /**
916  * __drbd_set_state() - Set a new DRBD state
917  * @mdev:	DRBD device.
918  * @ns:		new state.
919  * @flags:	Flags
920  * @done:	Optional completion, that will get completed after the after_state_ch() finished
921  *
922  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
923  */
924 int __drbd_set_state(struct drbd_conf *mdev,
925 		    union drbd_state ns, enum chg_state_flags flags,
926 		    struct completion *done)
927 {
928 	union drbd_state os;
929 	int rv = SS_SUCCESS;
930 	int warn_sync_abort = 0;
931 	struct after_state_chg_work *ascw;
932 
933 	os = mdev->state;
934 
935 	ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
936 
937 	if (ns.i == os.i)
938 		return SS_NOTHING_TO_DO;
939 
940 	if (!(flags & CS_HARD)) {
941 		/*  pre-state-change checks ; only look at ns  */
942 		/* See drbd_state_sw_errors in drbd_strings.c */
943 
944 		rv = is_valid_state(mdev, ns);
945 		if (rv < SS_SUCCESS) {
946 			/* If the old state was illegal as well, then let
947 			   this happen...*/
948 
949 			if (is_valid_state(mdev, os) == rv) {
950 				dev_err(DEV, "Considering state change from bad state. "
951 				    "Error would be: '%s'\n",
952 				    drbd_set_st_err_str(rv));
953 				print_st(mdev, "old", os);
954 				print_st(mdev, "new", ns);
955 				rv = is_valid_state_transition(mdev, ns, os);
956 			}
957 		} else
958 			rv = is_valid_state_transition(mdev, ns, os);
959 	}
960 
961 	if (rv < SS_SUCCESS) {
962 		if (flags & CS_VERBOSE)
963 			print_st_err(mdev, os, ns, rv);
964 		return rv;
965 	}
966 
967 	if (warn_sync_abort)
968 		dev_warn(DEV, "Resync aborted.\n");
969 
970 	{
971 		char *pbp, pb[300];
972 		pbp = pb;
973 		*pbp = 0;
974 		PSC(role);
975 		PSC(peer);
976 		PSC(conn);
977 		PSC(disk);
978 		PSC(pdsk);
979 		PSC(susp);
980 		PSC(aftr_isp);
981 		PSC(peer_isp);
982 		PSC(user_isp);
983 		dev_info(DEV, "%s\n", pb);
984 	}
985 
986 	/* solve the race between becoming unconfigured,
987 	 * worker doing the cleanup, and
988 	 * admin reconfiguring us:
989 	 * on (re)configure, first set CONFIG_PENDING,
990 	 * then wait for a potentially exiting worker,
991 	 * start the worker, and schedule one no_op.
992 	 * then proceed with configuration.
993 	 */
994 	if (ns.disk == D_DISKLESS &&
995 	    ns.conn == C_STANDALONE &&
996 	    ns.role == R_SECONDARY &&
997 	    !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
998 		set_bit(DEVICE_DYING, &mdev->flags);
999 
1000 	mdev->state.i = ns.i;
1001 	wake_up(&mdev->misc_wait);
1002 	wake_up(&mdev->state_wait);
1003 
1004 	/*   post-state-change actions   */
1005 	if (os.conn >= C_SYNC_SOURCE   && ns.conn <= C_CONNECTED) {
1006 		set_bit(STOP_SYNC_TIMER, &mdev->flags);
1007 		mod_timer(&mdev->resync_timer, jiffies);
1008 	}
1009 
1010 	/* aborted verify run. log the last position */
1011 	if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1012 	    ns.conn < C_CONNECTED) {
1013 		mdev->ov_start_sector =
1014 			BM_BIT_TO_SECT(mdev->rs_total - mdev->ov_left);
1015 		dev_info(DEV, "Online Verify reached sector %llu\n",
1016 			(unsigned long long)mdev->ov_start_sector);
1017 	}
1018 
1019 	if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1020 	    (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1021 		dev_info(DEV, "Syncer continues.\n");
1022 		mdev->rs_paused += (long)jiffies-(long)mdev->rs_mark_time;
1023 		if (ns.conn == C_SYNC_TARGET) {
1024 			if (!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))
1025 				mod_timer(&mdev->resync_timer, jiffies);
1026 			/* This if (!test_bit) is only needed for the case
1027 			   that a device that has ceased to use its timer,
1028 			   i.e. it is already in drbd_resync_finished(), gets
1029 			   paused and resumed. */
1030 		}
1031 	}
1032 
1033 	if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1034 	    (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1035 		dev_info(DEV, "Resync suspended\n");
1036 		mdev->rs_mark_time = jiffies;
1037 		if (ns.conn == C_PAUSED_SYNC_T)
1038 			set_bit(STOP_SYNC_TIMER, &mdev->flags);
1039 	}
1040 
1041 	if (os.conn == C_CONNECTED &&
1042 	    (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1043 		mdev->ov_position = 0;
1044 		mdev->rs_total =
1045 		mdev->rs_mark_left = drbd_bm_bits(mdev);
1046 		if (mdev->agreed_pro_version >= 90)
1047 			set_ov_position(mdev, ns.conn);
1048 		else
1049 			mdev->ov_start_sector = 0;
1050 		mdev->ov_left = mdev->rs_total
1051 			      - BM_SECT_TO_BIT(mdev->ov_position);
1052 		mdev->rs_start     =
1053 		mdev->rs_mark_time = jiffies;
1054 		mdev->ov_last_oos_size = 0;
1055 		mdev->ov_last_oos_start = 0;
1056 
1057 		if (ns.conn == C_VERIFY_S) {
1058 			dev_info(DEV, "Starting Online Verify from sector %llu\n",
1059 					(unsigned long long)mdev->ov_position);
1060 			mod_timer(&mdev->resync_timer, jiffies);
1061 		}
1062 	}
1063 
1064 	if (get_ldev(mdev)) {
1065 		u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1066 						 MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1067 						 MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1068 
1069 		if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1070 			mdf |= MDF_CRASHED_PRIMARY;
1071 		if (mdev->state.role == R_PRIMARY ||
1072 		    (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1073 			mdf |= MDF_PRIMARY_IND;
1074 		if (mdev->state.conn > C_WF_REPORT_PARAMS)
1075 			mdf |= MDF_CONNECTED_IND;
1076 		if (mdev->state.disk > D_INCONSISTENT)
1077 			mdf |= MDF_CONSISTENT;
1078 		if (mdev->state.disk > D_OUTDATED)
1079 			mdf |= MDF_WAS_UP_TO_DATE;
1080 		if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1081 			mdf |= MDF_PEER_OUT_DATED;
1082 		if (mdf != mdev->ldev->md.flags) {
1083 			mdev->ldev->md.flags = mdf;
1084 			drbd_md_mark_dirty(mdev);
1085 		}
1086 		if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1087 			drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1088 		put_ldev(mdev);
1089 	}
1090 
1091 	/* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider to resync */
1092 	if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1093 	    os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1094 		set_bit(CONSIDER_RESYNC, &mdev->flags);
1095 
1096 	/* Receiver should clean up itself */
1097 	if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1098 		drbd_thread_stop_nowait(&mdev->receiver);
1099 
1100 	/* Now the receiver finished cleaning up itself, it should die */
1101 	if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1102 		drbd_thread_stop_nowait(&mdev->receiver);
1103 
1104 	/* Upon network failure, we need to restart the receiver. */
1105 	if (os.conn > C_TEAR_DOWN &&
1106 	    ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1107 		drbd_thread_restart_nowait(&mdev->receiver);
1108 
1109 	ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1110 	if (ascw) {
1111 		ascw->os = os;
1112 		ascw->ns = ns;
1113 		ascw->flags = flags;
1114 		ascw->w.cb = w_after_state_ch;
1115 		ascw->done = done;
1116 		drbd_queue_work(&mdev->data.work, &ascw->w);
1117 	} else {
1118 		dev_warn(DEV, "Could not kmalloc an ascw\n");
1119 	}
1120 
1121 	return rv;
1122 }
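
/*
 * Editorial summary of the commit path above: unless CS_HARD is set, the new
 * state is validated, the difference is logged via the PSC() chunks,
 * mdev->state.i is updated under req_lock, the cheap follow-up actions run
 * inline, and everything that may sleep is deferred to after_state_ch() by
 * queueing the ascw work.  With CS_WAIT_COMPLETE, drbd_req_state() blocks on
 * ascw->done until that work has finished.
 */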
1123 
1124 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1125 {
1126 	struct after_state_chg_work *ascw =
1127 		container_of(w, struct after_state_chg_work, w);
1128 	after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1129 	if (ascw->flags & CS_WAIT_COMPLETE) {
1130 		D_ASSERT(ascw->done != NULL);
1131 		complete(ascw->done);
1132 	}
1133 	kfree(ascw);
1134 
1135 	return 1;
1136 }
1137 
1138 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1139 {
1140 	if (rv) {
1141 		dev_err(DEV, "Writing the bitmap failed, not starting resync.\n");
1142 		_drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1143 		return;
1144 	}
1145 
1146 	switch (mdev->state.conn) {
1147 	case C_STARTING_SYNC_T:
1148 		_drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1149 		break;
1150 	case C_STARTING_SYNC_S:
1151 		drbd_start_resync(mdev, C_SYNC_SOURCE);
1152 		break;
1153 	}
1154 }
1155 
1156 /**
1157  * after_state_ch() - Perform after state change actions that may sleep
1158  * @mdev:	DRBD device.
1159  * @os:		old state.
1160  * @ns:		new state.
1161  * @flags:	Flags
1162  */
1163 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1164 			   union drbd_state ns, enum chg_state_flags flags)
1165 {
1166 	enum drbd_fencing_p fp;
1167 
1168 	if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1169 		clear_bit(CRASHED_PRIMARY, &mdev->flags);
1170 		if (mdev->p_uuid)
1171 			mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1172 	}
1173 
1174 	fp = FP_DONT_CARE;
1175 	if (get_ldev(mdev)) {
1176 		fp = mdev->ldev->dc.fencing;
1177 		put_ldev(mdev);
1178 	}
1179 
1180 	/* Inform userspace about the change... */
1181 	drbd_bcast_state(mdev, ns);
1182 
1183 	if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1184 	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1185 		drbd_khelper(mdev, "pri-on-incon-degr");
1186 
1187 	/* Here we have the actions that are performed after a
1188 	   state change. This function might sleep */
1189 
1190 	if (fp == FP_STONITH && ns.susp) {
1191 		/* case1: The outdate peer handler is successful:
1192 		 * case2: The connection was established again: */
1193 		if ((os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) ||
1194 		    (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)) {
1195 			tl_clear(mdev);
1196 			spin_lock_irq(&mdev->req_lock);
1197 			_drbd_set_state(_NS(mdev, susp, 0), CS_VERBOSE, NULL);
1198 			spin_unlock_irq(&mdev->req_lock);
1199 		}
1200 	}
1201 	/* Do not change the order of the if above and the two below... */
1202 	if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1203 		drbd_send_uuids(mdev);
1204 		drbd_send_state(mdev);
1205 	}
1206 	if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S)
1207 		drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL, "send_bitmap (WFBitMapS)");
1208 
1209 	/* Lost contact to peer's copy of the data */
1210 	if ((os.pdsk >= D_INCONSISTENT &&
1211 	     os.pdsk != D_UNKNOWN &&
1212 	     os.pdsk != D_OUTDATED)
1213 	&&  (ns.pdsk < D_INCONSISTENT ||
1214 	     ns.pdsk == D_UNKNOWN ||
1215 	     ns.pdsk == D_OUTDATED)) {
1216 		if (get_ldev(mdev)) {
1217 			if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1218 			    mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE &&
1219 			    !atomic_read(&mdev->new_c_uuid))
1220 				atomic_set(&mdev->new_c_uuid, 2);
1221 			put_ldev(mdev);
1222 		}
1223 	}
1224 
1225 	if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1226 		/* Diskless peer becomes primary or got connected to a diskless, primary peer. */
1227 		if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0 &&
1228 		    !atomic_read(&mdev->new_c_uuid))
1229 			atomic_set(&mdev->new_c_uuid, 2);
1230 
1231 		/* D_DISKLESS Peer becomes secondary */
1232 		if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1233 			drbd_al_to_on_disk_bm(mdev);
1234 		put_ldev(mdev);
1235 	}
1236 
1237 	/* Last part of the attaching process ... */
1238 	if (ns.conn >= C_CONNECTED &&
1239 	    os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1240 		kfree(mdev->p_uuid); /* We expect to receive up-to-date UUIDs soon. */
1241 		mdev->p_uuid = NULL; /* ...to not use the old ones in the mean time */
1242 		drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1243 		drbd_send_uuids(mdev);
1244 		drbd_send_state(mdev);
1245 	}
1246 
1247 	/* We want to pause/continue resync, tell peer. */
1248 	if (ns.conn >= C_CONNECTED &&
1249 	     ((os.aftr_isp != ns.aftr_isp) ||
1250 	      (os.user_isp != ns.user_isp)))
1251 		drbd_send_state(mdev);
1252 
1253 	/* In case one of the isp bits got set, suspend other devices. */
1254 	if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1255 	    (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1256 		suspend_other_sg(mdev);
1257 
1258 	/* Make sure the peer gets informed about possible state
1259 	   changes (ISP bits) while we were in WFReportParams. */
1260 	if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1261 		drbd_send_state(mdev);
1262 
1263 	/* We are in the process of starting a full sync... */
1264 	if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1265 	    (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1266 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &abw_start_sync, "set_n_write from StartingSync");
1267 
1268 	/* We are invalidating ourselves... */
1269 	if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1270 	    os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1271 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
1272 
1273 	if (os.disk > D_FAILED && ns.disk == D_FAILED) {
1274 		enum drbd_io_error_p eh;
1275 
1276 		eh = EP_PASS_ON;
1277 		if (get_ldev_if_state(mdev, D_FAILED)) {
1278 			eh = mdev->ldev->dc.on_io_error;
1279 			put_ldev(mdev);
1280 		}
1281 
1282 		drbd_rs_cancel_all(mdev);
1283 		/* since get_ldev() only works as long as disk>=D_INCONSISTENT,
1284 		   and it is D_DISKLESS here, local_cnt can only go down, it can
1285 		   not increase... It will reach zero */
1286 		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1287 		mdev->rs_total = 0;
1288 		mdev->rs_failed = 0;
1289 		atomic_set(&mdev->rs_pending_cnt, 0);
1290 
1291 		spin_lock_irq(&mdev->req_lock);
1292 		_drbd_set_state(_NS(mdev, disk, D_DISKLESS), CS_HARD, NULL);
1293 		spin_unlock_irq(&mdev->req_lock);
1294 
1295 		if (eh == EP_CALL_HELPER)
1296 			drbd_khelper(mdev, "local-io-error");
1297 	}
1298 
1299 	if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {
1300 
1301 		if (os.disk == D_FAILED) /* && ns.disk == D_DISKLESS*/ {
1302 			if (drbd_send_state(mdev))
1303 				dev_warn(DEV, "Notified peer that my disk is broken.\n");
1304 			else
1305 				dev_err(DEV, "Sending state in drbd_io_error() failed\n");
1306 		}
1307 
1308 		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1309 		lc_destroy(mdev->resync);
1310 		mdev->resync = NULL;
1311 		lc_destroy(mdev->act_log);
1312 		mdev->act_log = NULL;
1313 		__no_warn(local,
1314 			drbd_free_bc(mdev->ldev);
1315 			mdev->ldev = NULL;);
1316 
1317 		if (mdev->md_io_tmpp)
1318 			__free_page(mdev->md_io_tmpp);
1319 	}
1320 
1321 	/* Disks got bigger while they were detached */
1322 	if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1323 	    test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1324 		if (ns.conn == C_CONNECTED)
1325 			resync_after_online_grow(mdev);
1326 	}
1327 
1328 	/* A resync finished or aborted, wake paused devices... */
1329 	if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1330 	    (os.peer_isp && !ns.peer_isp) ||
1331 	    (os.user_isp && !ns.user_isp))
1332 		resume_next_sg(mdev);
1333 
1334 	/* Upon network connection, we need to start the receiver */
1335 	if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1336 		drbd_thread_start(&mdev->receiver);
1337 
1338 	/* Terminate worker thread if we are unconfigured - it will be
1339 	   restarted as needed... */
1340 	if (ns.disk == D_DISKLESS &&
1341 	    ns.conn == C_STANDALONE &&
1342 	    ns.role == R_SECONDARY) {
1343 		if (os.aftr_isp != ns.aftr_isp)
1344 			resume_next_sg(mdev);
1345 		/* set in __drbd_set_state, unless CONFIG_PENDING was set */
1346 		if (test_bit(DEVICE_DYING, &mdev->flags))
1347 			drbd_thread_stop_nowait(&mdev->worker);
1348 	}
1349 
1350 	drbd_md_sync(mdev);
1351 }
1352 
1353 static int w_new_current_uuid(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1354 {
1355 	if (get_ldev(mdev)) {
1356 		if (mdev->ldev->md.uuid[UI_BITMAP] == 0) {
1357 			drbd_uuid_new_current(mdev);
1358 			if (get_net_conf(mdev)) {
1359 				drbd_send_uuids(mdev);
1360 				put_net_conf(mdev);
1361 			}
1362 			drbd_md_sync(mdev);
1363 		}
1364 		put_ldev(mdev);
1365 	}
1366 	atomic_dec(&mdev->new_c_uuid);
1367 	wake_up(&mdev->misc_wait);
1368 
1369 	return 1;
1370 }
1371 
1372 static int drbd_thread_setup(void *arg)
1373 {
1374 	struct drbd_thread *thi = (struct drbd_thread *) arg;
1375 	struct drbd_conf *mdev = thi->mdev;
1376 	unsigned long flags;
1377 	int retval;
1378 
1379 restart:
1380 	retval = thi->function(thi);
1381 
1382 	spin_lock_irqsave(&thi->t_lock, flags);
1383 
1384 	/* if the receiver has been "Exiting", the last thing it did
1385 	 * was set the conn state to "StandAlone",
1386 	 * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
1387 	 * and receiver thread will be "started".
1388 	 * drbd_thread_start needs to set "Restarting" in that case.
1389 	 * t_state check and assignment needs to be within the same spinlock,
1390 	 * so either thread_start sees Exiting, and can remap to Restarting,
1391 	 * or thread_start sees None, and can proceed as normal.
1392 	 */
1393 
1394 	if (thi->t_state == Restarting) {
1395 		dev_info(DEV, "Restarting %s\n", current->comm);
1396 		thi->t_state = Running;
1397 		spin_unlock_irqrestore(&thi->t_lock, flags);
1398 		goto restart;
1399 	}
1400 
1401 	thi->task = NULL;
1402 	thi->t_state = None;
1403 	smp_mb();
1404 	complete(&thi->stop);
1405 	spin_unlock_irqrestore(&thi->t_lock, flags);
1406 
1407 	dev_info(DEV, "Terminating %s\n", current->comm);
1408 
1409 	/* Release mod reference taken when thread was started */
1410 	module_put(THIS_MODULE);
1411 	return retval;
1412 }
1413 
1414 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1415 		      int (*func) (struct drbd_thread *))
1416 {
1417 	spin_lock_init(&thi->t_lock);
1418 	thi->task    = NULL;
1419 	thi->t_state = None;
1420 	thi->function = func;
1421 	thi->mdev = mdev;
1422 }
1423 
1424 int drbd_thread_start(struct drbd_thread *thi)
1425 {
1426 	struct drbd_conf *mdev = thi->mdev;
1427 	struct task_struct *nt;
1428 	unsigned long flags;
1429 
1430 	const char *me =
1431 		thi == &mdev->receiver ? "receiver" :
1432 		thi == &mdev->asender  ? "asender"  :
1433 		thi == &mdev->worker   ? "worker"   : "NONSENSE";
1434 
1435 	/* is used from state engine doing drbd_thread_stop_nowait,
1436 	 * while holding the req lock irqsave */
1437 	spin_lock_irqsave(&thi->t_lock, flags);
1438 
1439 	switch (thi->t_state) {
1440 	case None:
1441 		dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1442 				me, current->comm, current->pid);
1443 
1444 		/* Get ref on module for thread - this is released when thread exits */
1445 		if (!try_module_get(THIS_MODULE)) {
1446 			dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1447 			spin_unlock_irqrestore(&thi->t_lock, flags);
1448 			return FALSE;
1449 		}
1450 
1451 		init_completion(&thi->stop);
1452 		D_ASSERT(thi->task == NULL);
1453 		thi->reset_cpu_mask = 1;
1454 		thi->t_state = Running;
1455 		spin_unlock_irqrestore(&thi->t_lock, flags);
1456 		flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
1457 
1458 		nt = kthread_create(drbd_thread_setup, (void *) thi,
1459 				    "drbd%d_%s", mdev_to_minor(mdev), me);
1460 
1461 		if (IS_ERR(nt)) {
1462 			dev_err(DEV, "Couldn't start thread\n");
1463 
1464 			module_put(THIS_MODULE);
1465 			return FALSE;
1466 		}
1467 		spin_lock_irqsave(&thi->t_lock, flags);
1468 		thi->task = nt;
1469 		thi->t_state = Running;
1470 		spin_unlock_irqrestore(&thi->t_lock, flags);
1471 		wake_up_process(nt);
1472 		break;
1473 	case Exiting:
1474 		thi->t_state = Restarting;
1475 		dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1476 				me, current->comm, current->pid);
1477 		/* fall through */
1478 	case Running:
1479 	case Restarting:
1480 	default:
1481 		spin_unlock_irqrestore(&thi->t_lock, flags);
1482 		break;
1483 	}
1484 
1485 	return TRUE;
1486 }
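
/*
 * Lifecycle sketch (editorial): a thread slot is prepared once with
 * drbd_thread_init() and then started/stopped as the state engine needs it,
 * roughly:
 *
 *	drbd_thread_init(mdev, &mdev->worker, drbd_worker);
 *	drbd_thread_start(&mdev->worker);
 *	...
 *	drbd_thread_stop(&mdev->worker);
 *
 * The stop/restart wrappers around _drbd_thread_stop() are assumed to live
 * in drbd_int.h.
 */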
1487 
1488 
1489 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1490 {
1491 	unsigned long flags;
1492 
1493 	enum drbd_thread_state ns = restart ? Restarting : Exiting;
1494 
1495 	/* may be called from state engine, holding the req lock irqsave */
1496 	spin_lock_irqsave(&thi->t_lock, flags);
1497 
1498 	if (thi->t_state == None) {
1499 		spin_unlock_irqrestore(&thi->t_lock, flags);
1500 		if (restart)
1501 			drbd_thread_start(thi);
1502 		return;
1503 	}
1504 
1505 	if (thi->t_state != ns) {
1506 		if (thi->task == NULL) {
1507 			spin_unlock_irqrestore(&thi->t_lock, flags);
1508 			return;
1509 		}
1510 
1511 		thi->t_state = ns;
1512 		smp_mb();
1513 		init_completion(&thi->stop);
1514 		if (thi->task != current)
1515 			force_sig(DRBD_SIGKILL, thi->task);
1516 
1517 	}
1518 
1519 	spin_unlock_irqrestore(&thi->t_lock, flags);
1520 
1521 	if (wait)
1522 		wait_for_completion(&thi->stop);
1523 }
1524 
1525 #ifdef CONFIG_SMP
1526 /**
1527  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1528  * @mdev:	DRBD device.
1529  *
1530  * Forces all threads of a device onto the same CPU. This is beneficial for
1531  * DRBD's performance. May be overridden by the user's configuration.
1532  */
1533 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1534 {
1535 	int ord, cpu;
1536 
1537 	/* user override. */
1538 	if (cpumask_weight(mdev->cpu_mask))
1539 		return;
1540 
1541 	ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1542 	for_each_online_cpu(cpu) {
1543 		if (ord-- == 0) {
1544 			cpumask_set_cpu(cpu, mdev->cpu_mask);
1545 			return;
1546 		}
1547 	}
1548 	/* should not be reached */
1549 	cpumask_setall(mdev->cpu_mask);
1550 }
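
/*
 * Worked example (editorial): on a box with 4 online CPUs, minor 0 maps to
 * ord == 0 and is pinned to the first online CPU, minor 5 to ord == 1 and
 * the second one, and so on; an explicit user-supplied cpu_mask always wins.
 */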
1551 
1552 /**
1553  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1554  * @mdev:	DRBD device.
1555  *
1556  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
1557  * prematurely.
1558  */
1559 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1560 {
1561 	struct task_struct *p = current;
1562 	struct drbd_thread *thi =
1563 		p == mdev->asender.task  ? &mdev->asender  :
1564 		p == mdev->receiver.task ? &mdev->receiver :
1565 		p == mdev->worker.task   ? &mdev->worker   :
1566 		NULL;
1567 	ERR_IF(thi == NULL)
1568 		return;
1569 	if (!thi->reset_cpu_mask)
1570 		return;
1571 	thi->reset_cpu_mask = 0;
1572 	set_cpus_allowed_ptr(p, mdev->cpu_mask);
1573 }
1574 #endif
1575 
1576 /* the appropriate socket mutex must be held already */
1577 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1578 			  enum drbd_packets cmd, struct p_header *h,
1579 			  size_t size, unsigned msg_flags)
1580 {
1581 	int sent, ok;
1582 
1583 	ERR_IF(!h) return FALSE;
1584 	ERR_IF(!size) return FALSE;
1585 
1586 	h->magic   = BE_DRBD_MAGIC;
1587 	h->command = cpu_to_be16(cmd);
1588 	h->length  = cpu_to_be16(size-sizeof(struct p_header));
1589 
1590 	sent = drbd_send(mdev, sock, h, size, msg_flags);
1591 
1592 	ok = (sent == size);
1593 	if (!ok)
1594 		dev_err(DEV, "short sent %s size=%d sent=%d\n",
1595 		    cmdname(cmd), (int)size, sent);
1596 	return ok;
1597 }
1598 
1599 /* don't pass the socket. we may only look at it
1600  * when we hold the appropriate socket mutex.
1601  */
1602 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1603 		  enum drbd_packets cmd, struct p_header *h, size_t size)
1604 {
1605 	int ok = 0;
1606 	struct socket *sock;
1607 
1608 	if (use_data_socket) {
1609 		mutex_lock(&mdev->data.mutex);
1610 		sock = mdev->data.socket;
1611 	} else {
1612 		mutex_lock(&mdev->meta.mutex);
1613 		sock = mdev->meta.socket;
1614 	}
1615 
1616 	/* drbd_disconnect() could have called drbd_free_sock()
1617 	 * while we were waiting in down()... */
1618 	if (likely(sock != NULL))
1619 		ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1620 
1621 	if (use_data_socket)
1622 		mutex_unlock(&mdev->data.mutex);
1623 	else
1624 		mutex_unlock(&mdev->meta.mutex);
1625 	return ok;
1626 }
1627 
1628 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1629 		   size_t size)
1630 {
1631 	struct p_header h;
1632 	int ok;
1633 
1634 	h.magic   = BE_DRBD_MAGIC;
1635 	h.command = cpu_to_be16(cmd);
1636 	h.length  = cpu_to_be16(size);
1637 
1638 	if (!drbd_get_data_sock(mdev))
1639 		return 0;
1640 
1641 	ok = (sizeof(h) ==
1642 		drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1643 	ok = ok && (size ==
1644 		drbd_send(mdev, mdev->data.socket, data, size, 0));
1645 
1646 	drbd_put_data_sock(mdev);
1647 
1648 	return ok;
1649 }
1650 
1651 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1652 {
1653 	struct p_rs_param_89 *p;
1654 	struct socket *sock;
1655 	int size, rv;
1656 	const int apv = mdev->agreed_pro_version;
1657 
1658 	size = apv <= 87 ? sizeof(struct p_rs_param)
1659 		: apv == 88 ? sizeof(struct p_rs_param)
1660 			+ strlen(mdev->sync_conf.verify_alg) + 1
1661 		: /* 89 */    sizeof(struct p_rs_param_89);
1662 
1663 	/* used from admin command context and receiver/worker context.
1664 	 * to avoid kmalloc, grab the socket right here,
1665 	 * then use the pre-allocated sbuf there */
1666 	mutex_lock(&mdev->data.mutex);
1667 	sock = mdev->data.socket;
1668 
1669 	if (likely(sock != NULL)) {
1670 		enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1671 
1672 		p = &mdev->data.sbuf.rs_param_89;
1673 
1674 		/* initialize verify_alg and csums_alg */
1675 		memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1676 
1677 		p->rate = cpu_to_be32(sc->rate);
1678 
1679 		if (apv >= 88)
1680 			strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1681 		if (apv >= 89)
1682 			strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1683 
1684 		rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1685 	} else
1686 		rv = 0; /* not ok */
1687 
1688 	mutex_unlock(&mdev->data.mutex);
1689 
1690 	return rv;
1691 }
1692 
1693 int drbd_send_protocol(struct drbd_conf *mdev)
1694 {
1695 	struct p_protocol *p;
1696 	int size, cf, rv;
1697 
1698 	size = sizeof(struct p_protocol);
1699 
1700 	if (mdev->agreed_pro_version >= 87)
1701 		size += strlen(mdev->net_conf->integrity_alg) + 1;
1702 
1703 	/* we must not recurse into our own queue,
1704 	 * as that is blocked during handshake */
1705 	p = kmalloc(size, GFP_NOIO);
1706 	if (p == NULL)
1707 		return 0;
1708 
1709 	p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
1710 	p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
1711 	p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
1712 	p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
1713 	p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
1714 
1715 	cf = 0;
1716 	if (mdev->net_conf->want_lose)
1717 		cf |= CF_WANT_LOSE;
1718 	if (mdev->net_conf->dry_run) {
1719 		if (mdev->agreed_pro_version >= 92)
1720 			cf |= CF_DRY_RUN;
1721 		else {
1722 			dev_err(DEV, "--dry-run is not supported by peer");
1723 			kfree(p);
1724 			return 0;
1725 		}
1726 	}
1727 	p->conn_flags    = cpu_to_be32(cf);
1728 
1729 	if (mdev->agreed_pro_version >= 87)
1730 		strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
1731 
1732 	rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1733 			   (struct p_header *)p, size);
1734 	kfree(p);
1735 	return rv;
1736 }
1737 
1738 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1739 {
1740 	struct p_uuids p;
1741 	int i;
1742 
1743 	if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1744 		return 1;
1745 
1746 	for (i = UI_CURRENT; i < UI_SIZE; i++)
1747 		p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1748 
1749 	mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1750 	p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1751 	uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
1752 	uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1753 	uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1754 	p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1755 
1756 	put_ldev(mdev);
1757 
1758 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
1759 			     (struct p_header *)&p, sizeof(p));
1760 }
1761 
1762 int drbd_send_uuids(struct drbd_conf *mdev)
1763 {
1764 	return _drbd_send_uuids(mdev, 0);
1765 }
1766 
1767 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1768 {
1769 	return _drbd_send_uuids(mdev, 8);
1770 }
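/* uuid_flags bits in UI_FLAGS, as assembled in _drbd_send_uuids() and here:
 *   1  net_conf->want_lose ("discard my data") is set
 *   2  the CRASHED_PRIMARY flag is set
 *   4  new_state_tmp.disk is D_INCONSISTENT
 *   8  skip the initial sync (drbd_send_uuids_skip_initial_sync)
 */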
1771 
1772 
1773 int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val)
1774 {
1775 	struct p_rs_uuid p;
1776 
1777 	p.uuid = cpu_to_be64(val);
1778 
1779 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
1780 			     (struct p_header *)&p, sizeof(p));
1781 }
1782 
1783 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1784 {
1785 	struct p_sizes p;
1786 	sector_t d_size, u_size;
1787 	int q_order_type;
1788 	int ok;
1789 
1790 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1791 		D_ASSERT(mdev->ldev->backing_bdev);
1792 		d_size = drbd_get_max_capacity(mdev->ldev);
1793 		u_size = mdev->ldev->dc.disk_size;
1794 		q_order_type = drbd_queue_order_type(mdev);
1795 		put_ldev(mdev);
1796 	} else {
1797 		d_size = 0;
1798 		u_size = 0;
1799 		q_order_type = QUEUE_ORDERED_NONE;
1800 	}
1801 
1802 	p.d_size = cpu_to_be64(d_size);
1803 	p.u_size = cpu_to_be64(u_size);
1804 	p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1805 	p.max_segment_size = cpu_to_be32(queue_max_segment_size(mdev->rq_queue));
1806 	p.queue_order_type = cpu_to_be16(q_order_type);
1807 	p.dds_flags = cpu_to_be16(flags);
1808 
1809 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
1810 			   (struct p_header *)&p, sizeof(p));
1811 	return ok;
1812 }
1813 
1814 /**
1815  * drbd_send_state() - Sends the drbd state to the peer
1816  * @mdev:	DRBD device.
1817  */
1818 int drbd_send_state(struct drbd_conf *mdev)
1819 {
1820 	struct socket *sock;
1821 	struct p_state p;
1822 	int ok = 0;
1823 
1824 	/* Grab the state lock so we won't send state while we're in the middle
1825 	 * of a cluster-wide state change on another thread. */
1826 	drbd_state_lock(mdev);
1827 
1828 	mutex_lock(&mdev->data.mutex);
1829 
1830 	p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1831 	sock = mdev->data.socket;
1832 
1833 	if (likely(sock != NULL)) {
1834 		ok = _drbd_send_cmd(mdev, sock, P_STATE,
1835 				    (struct p_header *)&p, sizeof(p), 0);
1836 	}
1837 
1838 	mutex_unlock(&mdev->data.mutex);
1839 
1840 	drbd_state_unlock(mdev);
1841 	return ok;
1842 }
1843 
1844 int drbd_send_state_req(struct drbd_conf *mdev,
1845 	union drbd_state mask, union drbd_state val)
1846 {
1847 	struct p_req_state p;
1848 
1849 	p.mask    = cpu_to_be32(mask.i);
1850 	p.val     = cpu_to_be32(val.i);
1851 
1852 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
1853 			     (struct p_header *)&p, sizeof(p));
1854 }
1855 
1856 int drbd_send_sr_reply(struct drbd_conf *mdev, int retcode)
1857 {
1858 	struct p_req_state_reply p;
1859 
1860 	p.retcode    = cpu_to_be32(retcode);
1861 
1862 	return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
1863 			     (struct p_header *)&p, sizeof(p));
1864 }
1865 
1866 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1867 	struct p_compressed_bm *p,
1868 	struct bm_xfer_ctx *c)
1869 {
1870 	struct bitstream bs;
1871 	unsigned long plain_bits;
1872 	unsigned long tmp;
1873 	unsigned long rl;
1874 	unsigned len;
1875 	unsigned toggle;
1876 	int bits;
1877 
1878 	/* may we use this feature? */
1879 	if ((mdev->sync_conf.use_rle == 0) ||
1880 		(mdev->agreed_pro_version < 90))
1881 			return 0;
1882 
1883 	if (c->bit_offset >= c->bm_bits)
1884 		return 0; /* nothing to do. */
1885 
1886 	/* use at most this many bytes */
1887 	bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1888 	memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1889 	/* plain bits covered in this code string */
1890 	plain_bits = 0;
1891 
1892 	/* p->encoding & 0x80 stores whether the first run describes set bits.
1893 	 * The bit offset is implicit.
1894 	 * Start with toggle == 2 so we can recognize the first iteration. */
1895 	toggle = 2;
1896 
1897 	/* see how many plain bits we can stuff into one packet
1898 	 * using RLE and VLI. */
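	/* Worked example (sketch): for bitmap bits 0 0 0 1 1 1 1 0 ... at
	 * bit_offset 0, the first run is 3 clear bits (so the start flag is
	 * stored as 0), the next run is 4 set bits, then clear bits again,
	 * and so on; each run length is VLI-encoded into bs until the
	 * packet buffer is full. */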
1899 	do {
1900 		tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1901 				    : _drbd_bm_find_next(mdev, c->bit_offset);
1902 		if (tmp == -1UL)
1903 			tmp = c->bm_bits;
1904 		rl = tmp - c->bit_offset;
1905 
1906 		if (toggle == 2) { /* first iteration */
1907 			if (rl == 0) {
1908 				/* the first checked bit was set,
1909 				 * store start value, */
1910 				DCBP_set_start(p, 1);
1911 				/* but skip encoding of zero run length */
1912 				toggle = !toggle;
1913 				continue;
1914 			}
1915 			DCBP_set_start(p, 0);
1916 		}
1917 
1918 		/* paranoia: catch zero runlength.
1919 		 * can only happen if bitmap is modified while we scan it. */
1920 		if (rl == 0) {
1921 			dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1922 			    "t:%u bo:%lu\n", toggle, c->bit_offset);
1923 			return -1;
1924 		}
1925 
1926 		bits = vli_encode_bits(&bs, rl);
1927 		if (bits == -ENOBUFS) /* buffer full */
1928 			break;
1929 		if (bits <= 0) {
1930 			dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1931 			return 0;
1932 		}
1933 
1934 		toggle = !toggle;
1935 		plain_bits += rl;
1936 		c->bit_offset = tmp;
1937 	} while (c->bit_offset < c->bm_bits);
1938 
1939 	len = bs.cur.b - p->code + !!bs.cur.bit;
1940 
1941 	if (plain_bits < (len << 3)) {
1942 		/* incompressible with this method.
1943 		 * we need to rewind both word and bit position. */
1944 		c->bit_offset -= plain_bits;
1945 		bm_xfer_ctx_bit_to_word_offset(c);
1946 		c->bit_offset = c->word_offset * BITS_PER_LONG;
1947 		return 0;
1948 	}
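	/* In other words the code string only wins if it covers at least as
	 * many plain bits as it occupies itself (plain_bits >= len * 8):
	 * e.g. 100 plain bits in 11 code bytes (88 bits) are kept, while the
	 * same 100 bits needing 13 code bytes (104 bits) make us fall back
	 * to the plain bitmap packet (numbers purely illustrative). */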
1949 
1950 	/* RLE + VLI was able to compress it just fine.
1951 	 * update c->word_offset. */
1952 	bm_xfer_ctx_bit_to_word_offset(c);
1953 
1954 	/* store pad_bits */
1955 	DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1956 
1957 	return len;
1958 }
1959 
1960 enum { OK, FAILED, DONE }
1961 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
1962 	struct p_header *h, struct bm_xfer_ctx *c)
1963 {
1964 	struct p_compressed_bm *p = (void*)h;
1965 	unsigned long num_words;
1966 	int len;
1967 	int ok;
1968 
1969 	len = fill_bitmap_rle_bits(mdev, p, c);
1970 
1971 	if (len < 0)
1972 		return FAILED;
1973 
1974 	if (len) {
1975 		DCBP_set_code(p, RLE_VLI_Bits);
1976 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
1977 			sizeof(*p) + len, 0);
1978 
1979 		c->packets[0]++;
1980 		c->bytes[0] += sizeof(*p) + len;
1981 
1982 		if (c->bit_offset >= c->bm_bits)
1983 			len = 0; /* DONE */
1984 	} else {
1985 		/* not compressible with RLE+VLI;
1986 		 * send a buffer full of plain bitmap bits instead. */
1987 		num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
1988 		len = num_words * sizeof(long);
1989 		if (len)
1990 			drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
1991 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
1992 				   h, sizeof(struct p_header) + len, 0);
1993 		c->word_offset += num_words;
1994 		c->bit_offset = c->word_offset * BITS_PER_LONG;
1995 
1996 		c->packets[1]++;
1997 		c->bytes[1] += sizeof(struct p_header) + len;
1998 
1999 		if (c->bit_offset > c->bm_bits)
2000 			c->bit_offset = c->bm_bits;
2001 	}
2002 	ok = ok ? ((len == 0) ? DONE : OK) : FAILED;
2003 
2004 	if (ok == DONE)
2005 		INFO_bm_xfer_stats(mdev, "send", c);
2006 	return ok;
2007 }
2008 
2009 /* See the comment at receive_bitmap() */
2010 int _drbd_send_bitmap(struct drbd_conf *mdev)
2011 {
2012 	struct bm_xfer_ctx c;
2013 	struct p_header *p;
2014 	int ret;
2015 
2016 	ERR_IF(!mdev->bitmap) return FALSE;
2017 
2018 	/* maybe we should use some per thread scratch page,
2019 	 * and allocate that during initial device creation? */
2020 	p = (struct p_header *) __get_free_page(GFP_NOIO);
2021 	if (!p) {
2022 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2023 		return FALSE;
2024 	}
2025 
2026 	if (get_ldev(mdev)) {
2027 		if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2028 			dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2029 			drbd_bm_set_all(mdev);
2030 			if (drbd_bm_write(mdev)) {
2031 				/* writing the bitmap failed! Leave the full sync flag set in the meta data,
2032 				 * but otherwise proceed as normal - we need to tell the other
2033 				 * side that a full resync is required! */
2034 				dev_err(DEV, "Failed to write bitmap to disk!\n");
2035 			} else {
2036 				drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2037 				drbd_md_sync(mdev);
2038 			}
2039 		}
2040 		put_ldev(mdev);
2041 	}
2042 
2043 	c = (struct bm_xfer_ctx) {
2044 		.bm_bits = drbd_bm_bits(mdev),
2045 		.bm_words = drbd_bm_words(mdev),
2046 	};
2047 
2048 	do {
2049 		ret = send_bitmap_rle_or_plain(mdev, p, &c);
2050 	} while (ret == OK);
2051 
2052 	free_page((unsigned long) p);
2053 	return (ret == DONE);
2054 }
2055 
2056 int drbd_send_bitmap(struct drbd_conf *mdev)
2057 {
2058 	int err;
2059 
2060 	if (!drbd_get_data_sock(mdev))
2061 		return -1;
2062 	err = !_drbd_send_bitmap(mdev);
2063 	drbd_put_data_sock(mdev);
2064 	return err;
2065 }
2066 
2067 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2068 {
2069 	int ok;
2070 	struct p_barrier_ack p;
2071 
2072 	p.barrier  = barrier_nr;
2073 	p.set_size = cpu_to_be32(set_size);
2074 
2075 	if (mdev->state.conn < C_CONNECTED)
2076 		return FALSE;
2077 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2078 			(struct p_header *)&p, sizeof(p));
2079 	return ok;
2080 }
2081 
2082 /**
2083  * _drbd_send_ack() - Sends an ack packet
2084  * @mdev:	DRBD device.
2085  * @cmd:	Packet command code.
2086  * @sector:	sector, needs to be in big endian byte order
2087  * @blksize:	size in bytes, needs to be in big endian byte order
2088  * @block_id:	Id, big endian byte order
2089  */
2090 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2091 			  u64 sector,
2092 			  u32 blksize,
2093 			  u64 block_id)
2094 {
2095 	int ok;
2096 	struct p_block_ack p;
2097 
2098 	p.sector   = sector;
2099 	p.block_id = block_id;
2100 	p.blksize  = blksize;
2101 	p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2102 
2103 	if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2104 		return FALSE;
2105 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2106 				(struct p_header *)&p, sizeof(p));
2107 	return ok;
2108 }
2109 
2110 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2111 		     struct p_data *dp)
2112 {
2113 	const int header_size = sizeof(struct p_data)
2114 			      - sizeof(struct p_header);
2115 	int data_size  = ((struct p_header *)dp)->length - header_size;
2116 
2117 	return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2118 			      dp->block_id);
2119 }
2120 
2121 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2122 		     struct p_block_req *rp)
2123 {
2124 	return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2125 }
2126 
2127 /**
2128  * drbd_send_ack() - Sends an ack packet
2129  * @mdev:	DRBD device.
2130  * @cmd:	Packet command code.
2131  * @e:		Epoch entry.
2132  */
2133 int drbd_send_ack(struct drbd_conf *mdev,
2134 	enum drbd_packets cmd, struct drbd_epoch_entry *e)
2135 {
2136 	return _drbd_send_ack(mdev, cmd,
2137 			      cpu_to_be64(e->sector),
2138 			      cpu_to_be32(e->size),
2139 			      e->block_id);
2140 }
2141 
2142 /* This function misuses the block_id field to signal if the blocks
2143  * are in sync or not. */
2144 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2145 		     sector_t sector, int blksize, u64 block_id)
2146 {
2147 	return _drbd_send_ack(mdev, cmd,
2148 			      cpu_to_be64(sector),
2149 			      cpu_to_be32(blksize),
2150 			      cpu_to_be64(block_id));
2151 }
2152 
2153 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2154 		       sector_t sector, int size, u64 block_id)
2155 {
2156 	int ok;
2157 	struct p_block_req p;
2158 
2159 	p.sector   = cpu_to_be64(sector);
2160 	p.block_id = block_id;
2161 	p.blksize  = cpu_to_be32(size);
2162 
2163 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2164 				(struct p_header *)&p, sizeof(p));
2165 	return ok;
2166 }
2167 
2168 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2169 			    sector_t sector, int size,
2170 			    void *digest, int digest_size,
2171 			    enum drbd_packets cmd)
2172 {
2173 	int ok;
2174 	struct p_block_req p;
2175 
2176 	p.sector   = cpu_to_be64(sector);
2177 	p.block_id = BE_DRBD_MAGIC + 0xbeef;
2178 	p.blksize  = cpu_to_be32(size);
2179 
2180 	p.head.magic   = BE_DRBD_MAGIC;
2181 	p.head.command = cpu_to_be16(cmd);
2182 	p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header) + digest_size);
2183 
2184 	mutex_lock(&mdev->data.mutex);
2185 
2186 	ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2187 	ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2188 
2189 	mutex_unlock(&mdev->data.mutex);
2190 
2191 	return ok;
2192 }
2193 
2194 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2195 {
2196 	int ok;
2197 	struct p_block_req p;
2198 
2199 	p.sector   = cpu_to_be64(sector);
2200 	p.block_id = BE_DRBD_MAGIC + 0xbabe;
2201 	p.blksize  = cpu_to_be32(size);
2202 
2203 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2204 			   (struct p_header *)&p, sizeof(p));
2205 	return ok;
2206 }
2207 
2208 static int drbd_send_delay_probe(struct drbd_conf *mdev, struct drbd_socket *ds)
2209 {
2210 	struct p_delay_probe dp;
2211 	int offset, ok = 0;
2212 	struct timeval now;
2213 
2214 	mutex_lock(&ds->mutex);
2215 	if (likely(ds->socket)) {
2216 		do_gettimeofday(&now);
2217 		offset = now.tv_usec - mdev->dps_time.tv_usec +
2218 			 (now.tv_sec - mdev->dps_time.tv_sec) * 1000000;
2219 		dp.seq_num  = cpu_to_be32(mdev->delay_seq);
2220 		dp.offset   = cpu_to_be32(offset);
2221 
2222 		ok = _drbd_send_cmd(mdev, ds->socket, P_DELAY_PROBE,
2223 				    (struct p_header *)&dp, sizeof(dp), 0);
2224 	}
2225 	mutex_unlock(&ds->mutex);
2226 
2227 	return ok;
2228 }
2229 
2230 static int drbd_send_delay_probes(struct drbd_conf *mdev)
2231 {
2232 	int ok;
2233 
2234 	mdev->delay_seq++;
2235 	do_gettimeofday(&mdev->dps_time);
2236 	ok = drbd_send_delay_probe(mdev, &mdev->meta);
2237 	ok = ok && drbd_send_delay_probe(mdev, &mdev->data);
2238 
2239 	mdev->dp_volume_last = mdev->send_cnt;
2240 	mod_timer(&mdev->delay_probe_timer, jiffies + mdev->sync_conf.dp_interval * HZ / 10);
2241 
2242 	return ok;
2243 }
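/* Timer arithmetic above: dp_interval is apparently in units of 0.1 seconds
 * (hence the "* HZ / 10"), so a hypothetical dp_interval of 50 re-arms
 * delay_probe_timer roughly 5 seconds from now. */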
2244 
2245 /* called on sndtimeo
2246  * returns FALSE if we should retry,
2247  * TRUE if we think the connection is dead
2248  */
2249 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2250 {
2251 	int drop_it;
2252 	/* long elapsed = (long)(jiffies - mdev->last_received); */
2253 
2254 	drop_it =   mdev->meta.socket == sock
2255 		|| !mdev->asender.task
2256 		|| get_t_state(&mdev->asender) != Running
2257 		|| mdev->state.conn < C_CONNECTED;
2258 
2259 	if (drop_it)
2260 		return TRUE;
2261 
2262 	drop_it = !--mdev->ko_count;
2263 	if (!drop_it) {
2264 		dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2265 		       current->comm, current->pid, mdev->ko_count);
2266 		request_ping(mdev);
2267 	}
2268 
2269 	return drop_it; /* && (mdev->state == R_PRIMARY) */;
2270 }
2271 
2272 /* The idea of sendpage seems to be to put some kind of reference
2273  * to the page into the skb, and to hand it over to the NIC. In
2274  * this process get_page() gets called.
2275  *
2276  * As soon as the page was really sent over the network put_page()
2277  * gets called by some part of the network layer. [ NIC driver? ]
2278  *
2279  * [ get_page() / put_page() increment/decrement the count. If count
2280  *   reaches 0 the page will be freed. ]
2281  *
2282  * This works nicely with pages from FSs.
2283  * But this means that in protocol A we might signal IO completion too early!
2284  *
2285  * In order not to corrupt data during a resync we must make sure
2286  * that we do not reuse our own buffer pages (EEs) too early, therefore
2287  * we have the net_ee list.
2288  *
2289  * XFS seems to have problems, still, it submits pages with page_count == 0!
2290  * As a workaround, we disable sendpage on pages
2291  * with page_count == 0 or PageSlab.
2292  */
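/* In short: _drbd_no_send_page() below copies the data via kmap() and
 * drbd_send(), while _drbd_send_page() hands a page reference to
 * ->sendpage(); the latter falls back to the copying variant when
 * disable_sendpage is set, for slab pages, and for pages with a
 * page_count below 1. */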
2293 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2294 		   int offset, size_t size)
2295 {
2296 	int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, 0);
2297 	kunmap(page);
2298 	if (sent == size)
2299 		mdev->send_cnt += size>>9;
2300 	return sent == size;
2301 }
2302 
2303 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2304 		    int offset, size_t size)
2305 {
2306 	mm_segment_t oldfs = get_fs();
2307 	int sent, ok;
2308 	int len = size;
2309 
2310 	/* e.g. XFS meta- & log-data is in slab pages, which have a
2311 	 * page_count of 0 and/or have PageSlab() set.
2312 	 * We cannot use sendpage for those, as that does get_page();
2313 	 * put_page(); and would cause either a VM_BUG directly, or
2314 	 * __page_cache_release() of a page that is actually still referenced
2315 	 * by someone, leading to some obscure delayed Oops somewhere else. */
2316 	if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2317 		return _drbd_no_send_page(mdev, page, offset, size);
2318 
2319 	drbd_update_congested(mdev);
2320 	set_fs(KERNEL_DS);
2321 	do {
2322 		sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2323 							offset, len,
2324 							MSG_NOSIGNAL);
2325 		if (sent == -EAGAIN) {
2326 			if (we_should_drop_the_connection(mdev,
2327 							  mdev->data.socket))
2328 				break;
2329 			else
2330 				continue;
2331 		}
2332 		if (sent <= 0) {
2333 			dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2334 			     __func__, (int)size, len, sent);
2335 			break;
2336 		}
2337 		len    -= sent;
2338 		offset += sent;
2339 	} while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2340 	set_fs(oldfs);
2341 	clear_bit(NET_CONGESTED, &mdev->flags);
2342 
2343 	ok = (len == 0);
2344 	if (likely(ok))
2345 		mdev->send_cnt += size>>9;
2346 	return ok;
2347 }
2348 
2349 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2350 {
2351 	struct bio_vec *bvec;
2352 	int i;
2353 	__bio_for_each_segment(bvec, bio, i, 0) {
2354 		if (!_drbd_no_send_page(mdev, bvec->bv_page,
2355 				     bvec->bv_offset, bvec->bv_len))
2356 			return 0;
2357 	}
2358 	return 1;
2359 }
2360 
2361 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2362 {
2363 	struct bio_vec *bvec;
2364 	int i;
2365 	__bio_for_each_segment(bvec, bio, i, 0) {
2366 		if (!_drbd_send_page(mdev, bvec->bv_page,
2367 				     bvec->bv_offset, bvec->bv_len))
2368 			return 0;
2369 	}
2370 
2371 	return 1;
2372 }
2373 
2374 static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2375 {
2376 	struct page *page = e->pages;
2377 	unsigned len = e->size;
2378 	page_chain_for_each(page) {
2379 		unsigned l = min_t(unsigned, len, PAGE_SIZE);
2380 		if (!_drbd_send_page(mdev, page, 0, l))
2381 			return 0;
2382 		len -= l;
2383 	}
2384 	return 1;
2385 }
2386 
2387 static void consider_delay_probes(struct drbd_conf *mdev)
2388 {
2389 	if (mdev->state.conn != C_SYNC_SOURCE || mdev->agreed_pro_version < 93)
2390 		return;
2391 
2392 	if (mdev->dp_volume_last + mdev->sync_conf.dp_volume * 2 < mdev->send_cnt)
2393 		drbd_send_delay_probes(mdev);
2394 }
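/* send_cnt is kept in 512-byte sectors (see the "size>>9" updates in the
 * send path), so the check above fires once more than 2 * dp_volume sectors
 * went out since the last probe; a hypothetical dp_volume of 2048 thus
 * means a new probe pair roughly every 2 MiB of sent data. */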
2395 
2396 static int w_delay_probes(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
2397 {
2398 	if (!cancel && mdev->state.conn == C_SYNC_SOURCE)
2399 		drbd_send_delay_probes(mdev);
2400 
2401 	return 1;
2402 }
2403 
2404 static void delay_probe_timer_fn(unsigned long data)
2405 {
2406 	struct drbd_conf *mdev = (struct drbd_conf *) data;
2407 
2408 	if (list_empty(&mdev->delay_probe_work.list))
2409 		drbd_queue_work(&mdev->data.work, &mdev->delay_probe_work);
2410 }
2411 
2412 /* Used to send write requests
2413  * R_PRIMARY -> Peer	(P_DATA)
2414  */
2415 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2416 {
2417 	int ok = 1;
2418 	struct p_data p;
2419 	unsigned int dp_flags = 0;
2420 	void *dgb;
2421 	int dgs;
2422 
2423 	if (!drbd_get_data_sock(mdev))
2424 		return 0;
2425 
2426 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2427 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2428 
2429 	p.head.magic   = BE_DRBD_MAGIC;
2430 	p.head.command = cpu_to_be16(P_DATA);
2431 	p.head.length  =
2432 		cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + req->size);
2433 
2434 	p.sector   = cpu_to_be64(req->sector);
2435 	p.block_id = (unsigned long)req;
2436 	p.seq_num  = cpu_to_be32(req->seq_num =
2437 				 atomic_add_return(1, &mdev->packet_seq));
2438 	dp_flags = 0;
2439 
2440 	/* NOTE: no need to check if barriers are supported here, as we would
2441 	 *       not pass the test in make_request_common in that case
2442 	 */
2443 	if (bio_rw_flagged(req->master_bio, BIO_RW_BARRIER)) {
2444 		dev_err(DEV, "ASSERT FAILED would have set DP_HARDBARRIER\n");
2445 		/* dp_flags |= DP_HARDBARRIER; */
2446 	}
2447 	if (bio_rw_flagged(req->master_bio, BIO_RW_SYNCIO))
2448 		dp_flags |= DP_RW_SYNC;
2449 	/* for now handle SYNCIO and UNPLUG
2450 	 * as if they still were one and the same flag */
2451 	if (bio_rw_flagged(req->master_bio, BIO_RW_UNPLUG))
2452 		dp_flags |= DP_RW_SYNC;
2453 	if (mdev->state.conn >= C_SYNC_SOURCE &&
2454 	    mdev->state.conn <= C_PAUSED_SYNC_T)
2455 		dp_flags |= DP_MAY_SET_IN_SYNC;
2456 
2457 	p.dp_flags = cpu_to_be32(dp_flags);
2458 	set_bit(UNPLUG_REMOTE, &mdev->flags);
2459 	ok = (sizeof(p) ==
2460 		drbd_send(mdev, mdev->data.socket, &p, sizeof(p), MSG_MORE));
2461 	if (ok && dgs) {
2462 		dgb = mdev->int_dig_out;
2463 		drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2464 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
2465 	}
2466 	if (ok) {
2467 		if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
2468 			ok = _drbd_send_bio(mdev, req->master_bio);
2469 		else
2470 			ok = _drbd_send_zc_bio(mdev, req->master_bio);
2471 	}
2472 
2473 	drbd_put_data_sock(mdev);
2474 
2475 	if (ok)
2476 		consider_delay_probes(mdev);
2477 
2478 	return ok;
2479 }
2480 
2481 /* answer packet, used to send data back for read requests:
2482  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2483  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2484  */
2485 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2486 		    struct drbd_epoch_entry *e)
2487 {
2488 	int ok;
2489 	struct p_data p;
2490 	void *dgb;
2491 	int dgs;
2492 
2493 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2494 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2495 
2496 	p.head.magic   = BE_DRBD_MAGIC;
2497 	p.head.command = cpu_to_be16(cmd);
2498 	p.head.length  =
2499 		cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + e->size);
2500 
2501 	p.sector   = cpu_to_be64(e->sector);
2502 	p.block_id = e->block_id;
2503 	/* p.seq_num  = 0;    No sequence numbers here.. */
2504 
2505 	/* Only called by our kernel thread.
2506 	 * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2507 	 * in response to an admin command or module unload.
2508 	 */
2509 	if (!drbd_get_data_sock(mdev))
2510 		return 0;
2511 
2512 	ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p,
2513 					sizeof(p), MSG_MORE);
2514 	if (ok && dgs) {
2515 		dgb = mdev->int_dig_out;
2516 		drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2517 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
2518 	}
2519 	if (ok)
2520 		ok = _drbd_send_zc_ee(mdev, e);
2521 
2522 	drbd_put_data_sock(mdev);
2523 
2524 	if (ok)
2525 		consider_delay_probes(mdev);
2526 
2527 	return ok;
2528 }
2529 
2530 /*
2531   drbd_send distinguishes two cases:
2532 
2533   Packets sent via the data socket "sock"
2534   and packets sent via the meta data socket "msock"
2535 
2536 		    sock                      msock
2537   -----------------+-------------------------+------------------------------
2538   timeout           conf.timeout / 2          conf.timeout / 2
2539   timeout action    send a ping via msock     Abort communication
2540 					      and close all sockets
2541 */
2542 
2543 /*
2544  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2545  */
2546 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2547 	      void *buf, size_t size, unsigned msg_flags)
2548 {
2549 	struct kvec iov;
2550 	struct msghdr msg;
2551 	int rv, sent = 0;
2552 
2553 	if (!sock)
2554 		return -1000;
2555 
2556 	/* THINK  if (signal_pending) return ... ? */
2557 
2558 	iov.iov_base = buf;
2559 	iov.iov_len  = size;
2560 
2561 	msg.msg_name       = NULL;
2562 	msg.msg_namelen    = 0;
2563 	msg.msg_control    = NULL;
2564 	msg.msg_controllen = 0;
2565 	msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2566 
2567 	if (sock == mdev->data.socket) {
2568 		mdev->ko_count = mdev->net_conf->ko_count;
2569 		drbd_update_congested(mdev);
2570 	}
2571 	do {
2572 		/* STRANGE
2573 		 * tcp_sendmsg does _not_ use its size parameter at all ?
2574 		 *
2575 		 * -EAGAIN on timeout, -EINTR on signal.
2576 		 */
2577 /* THINK
2578  * do we need to block DRBD_SIG if sock == &meta.socket ??
2579  * otherwise wake_asender() might interrupt some send_*Ack !
2580  */
2581 		rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2582 		if (rv == -EAGAIN) {
2583 			if (we_should_drop_the_connection(mdev, sock))
2584 				break;
2585 			else
2586 				continue;
2587 		}
2588 		D_ASSERT(rv != 0);
2589 		if (rv == -EINTR) {
2590 			flush_signals(current);
2591 			rv = 0;
2592 		}
2593 		if (rv < 0)
2594 			break;
2595 		sent += rv;
2596 		iov.iov_base += rv;
2597 		iov.iov_len  -= rv;
2598 	} while (sent < size);
2599 
2600 	if (sock == mdev->data.socket)
2601 		clear_bit(NET_CONGESTED, &mdev->flags);
2602 
2603 	if (rv <= 0) {
2604 		if (rv != -EAGAIN) {
2605 			dev_err(DEV, "%s_sendmsg returned %d\n",
2606 			    sock == mdev->meta.socket ? "msock" : "sock",
2607 			    rv);
2608 			drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2609 		} else
2610 			drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2611 	}
2612 
2613 	return sent;
2614 }
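/* Return convention of drbd_send(): -1000 if there is no socket, otherwise
 * the number of bytes handed to the kernel so far -- equal to 'size' on
 * success, less if the loop was aborted by a send error or an expired
 * ko_count, in which case the connection state is also forced to
 * C_BROKEN_PIPE or C_TIMEOUT above. */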
2615 
2616 static int drbd_open(struct block_device *bdev, fmode_t mode)
2617 {
2618 	struct drbd_conf *mdev = bdev->bd_disk->private_data;
2619 	unsigned long flags;
2620 	int rv = 0;
2621 
2622 	spin_lock_irqsave(&mdev->req_lock, flags);
2623 	/* to have a stable mdev->state.role
2624 	 * and no race with updating open_cnt */
2625 
2626 	if (mdev->state.role != R_PRIMARY) {
2627 		if (mode & FMODE_WRITE)
2628 			rv = -EROFS;
2629 		else if (!allow_oos)
2630 			rv = -EMEDIUMTYPE;
2631 	}
2632 
2633 	if (!rv)
2634 		mdev->open_cnt++;
2635 	spin_unlock_irqrestore(&mdev->req_lock, flags);
2636 
2637 	return rv;
2638 }
2639 
2640 static int drbd_release(struct gendisk *gd, fmode_t mode)
2641 {
2642 	struct drbd_conf *mdev = gd->private_data;
2643 	mdev->open_cnt--;
2644 	return 0;
2645 }
2646 
2647 static void drbd_unplug_fn(struct request_queue *q)
2648 {
2649 	struct drbd_conf *mdev = q->queuedata;
2650 
2651 	/* unplug FIRST */
2652 	spin_lock_irq(q->queue_lock);
2653 	blk_remove_plug(q);
2654 	spin_unlock_irq(q->queue_lock);
2655 
2656 	/* only if connected */
2657 	spin_lock_irq(&mdev->req_lock);
2658 	if (mdev->state.pdsk >= D_INCONSISTENT && mdev->state.conn >= C_CONNECTED) {
2659 		D_ASSERT(mdev->state.role == R_PRIMARY);
2660 		if (test_and_clear_bit(UNPLUG_REMOTE, &mdev->flags)) {
2661 			/* add to the data.work queue,
2662 			 * unless already queued.
2663 			 * XXX this might be a good addition to drbd_queue_work
2664 			 * anyway, to detect "double queuing" ... */
2665 			if (list_empty(&mdev->unplug_work.list))
2666 				drbd_queue_work(&mdev->data.work,
2667 						&mdev->unplug_work);
2668 		}
2669 	}
2670 	spin_unlock_irq(&mdev->req_lock);
2671 
2672 	if (mdev->state.disk >= D_INCONSISTENT)
2673 		drbd_kick_lo(mdev);
2674 }
2675 
2676 static void drbd_set_defaults(struct drbd_conf *mdev)
2677 {
2678 	mdev->sync_conf.after      = DRBD_AFTER_DEF;
2679 	mdev->sync_conf.rate       = DRBD_RATE_DEF;
2680 	mdev->sync_conf.al_extents = DRBD_AL_EXTENTS_DEF;
2681 	mdev->state = (union drbd_state) {
2682 		{ .role = R_SECONDARY,
2683 		  .peer = R_UNKNOWN,
2684 		  .conn = C_STANDALONE,
2685 		  .disk = D_DISKLESS,
2686 		  .pdsk = D_UNKNOWN,
2687 		  .susp = 0
2688 		} };
2689 }
2690 
2691 void drbd_init_set_defaults(struct drbd_conf *mdev)
2692 {
2693 	/* the memset(,0,) did most of this.
2694 	 * note: only assignments, no allocation in here */
2695 
2696 	drbd_set_defaults(mdev);
2697 
2698 	/* for now, we do NOT yet support it,
2699 	 * even though we start some framework
2700 	 * to eventually support barriers */
2701 	set_bit(NO_BARRIER_SUPP, &mdev->flags);
2702 
2703 	atomic_set(&mdev->ap_bio_cnt, 0);
2704 	atomic_set(&mdev->ap_pending_cnt, 0);
2705 	atomic_set(&mdev->rs_pending_cnt, 0);
2706 	atomic_set(&mdev->unacked_cnt, 0);
2707 	atomic_set(&mdev->local_cnt, 0);
2708 	atomic_set(&mdev->net_cnt, 0);
2709 	atomic_set(&mdev->packet_seq, 0);
2710 	atomic_set(&mdev->pp_in_use, 0);
2711 	atomic_set(&mdev->new_c_uuid, 0);
2712 
2713 	mutex_init(&mdev->md_io_mutex);
2714 	mutex_init(&mdev->data.mutex);
2715 	mutex_init(&mdev->meta.mutex);
2716 	sema_init(&mdev->data.work.s, 0);
2717 	sema_init(&mdev->meta.work.s, 0);
2718 	mutex_init(&mdev->state_mutex);
2719 
2720 	spin_lock_init(&mdev->data.work.q_lock);
2721 	spin_lock_init(&mdev->meta.work.q_lock);
2722 
2723 	spin_lock_init(&mdev->al_lock);
2724 	spin_lock_init(&mdev->req_lock);
2725 	spin_lock_init(&mdev->peer_seq_lock);
2726 	spin_lock_init(&mdev->epoch_lock);
2727 
2728 	INIT_LIST_HEAD(&mdev->active_ee);
2729 	INIT_LIST_HEAD(&mdev->sync_ee);
2730 	INIT_LIST_HEAD(&mdev->done_ee);
2731 	INIT_LIST_HEAD(&mdev->read_ee);
2732 	INIT_LIST_HEAD(&mdev->net_ee);
2733 	INIT_LIST_HEAD(&mdev->resync_reads);
2734 	INIT_LIST_HEAD(&mdev->data.work.q);
2735 	INIT_LIST_HEAD(&mdev->meta.work.q);
2736 	INIT_LIST_HEAD(&mdev->resync_work.list);
2737 	INIT_LIST_HEAD(&mdev->unplug_work.list);
2738 	INIT_LIST_HEAD(&mdev->md_sync_work.list);
2739 	INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2740 	INIT_LIST_HEAD(&mdev->delay_probes);
2741 	INIT_LIST_HEAD(&mdev->delay_probe_work.list);
2742 	INIT_LIST_HEAD(&mdev->uuid_work.list);
2743 
2744 	mdev->resync_work.cb  = w_resync_inactive;
2745 	mdev->unplug_work.cb  = w_send_write_hint;
2746 	mdev->md_sync_work.cb = w_md_sync;
2747 	mdev->bm_io_work.w.cb = w_bitmap_io;
2748 	mdev->delay_probe_work.cb = w_delay_probes;
2749 	mdev->uuid_work.cb = w_new_current_uuid;
2750 	init_timer(&mdev->resync_timer);
2751 	init_timer(&mdev->md_sync_timer);
2752 	init_timer(&mdev->delay_probe_timer);
2753 	mdev->resync_timer.function = resync_timer_fn;
2754 	mdev->resync_timer.data = (unsigned long) mdev;
2755 	mdev->md_sync_timer.function = md_sync_timer_fn;
2756 	mdev->md_sync_timer.data = (unsigned long) mdev;
2757 	mdev->delay_probe_timer.function = delay_probe_timer_fn;
2758 	mdev->delay_probe_timer.data = (unsigned long) mdev;
2759 
2760 
2761 	init_waitqueue_head(&mdev->misc_wait);
2762 	init_waitqueue_head(&mdev->state_wait);
2763 	init_waitqueue_head(&mdev->ee_wait);
2764 	init_waitqueue_head(&mdev->al_wait);
2765 	init_waitqueue_head(&mdev->seq_wait);
2766 
2767 	drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
2768 	drbd_thread_init(mdev, &mdev->worker, drbd_worker);
2769 	drbd_thread_init(mdev, &mdev->asender, drbd_asender);
2770 
2771 	mdev->agreed_pro_version = PRO_VERSION_MAX;
2772 	mdev->write_ordering = WO_bio_barrier;
2773 	mdev->resync_wenr = LC_FREE;
2774 }
2775 
2776 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2777 {
2778 	if (mdev->receiver.t_state != None)
2779 		dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2780 				mdev->receiver.t_state);
2781 
2782 	/* no need to lock it, I'm the only thread alive */
2783 	if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2784 		dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2785 	mdev->al_writ_cnt  =
2786 	mdev->bm_writ_cnt  =
2787 	mdev->read_cnt     =
2788 	mdev->recv_cnt     =
2789 	mdev->send_cnt     =
2790 	mdev->writ_cnt     =
2791 	mdev->p_size       =
2792 	mdev->rs_start     =
2793 	mdev->rs_total     =
2794 	mdev->rs_failed    =
2795 	mdev->rs_mark_left =
2796 	mdev->rs_mark_time = 0;
2797 	D_ASSERT(mdev->net_conf == NULL);
2798 
2799 	drbd_set_my_capacity(mdev, 0);
2800 	if (mdev->bitmap) {
2801 		/* maybe never allocated. */
2802 		drbd_bm_resize(mdev, 0, 1);
2803 		drbd_bm_cleanup(mdev);
2804 	}
2805 
2806 	drbd_free_resources(mdev);
2807 
2808 	/*
2809 	 * currently we call drbd_init_ee only on module load, so
2810 	 * we may call drbd_release_ee only on module unload!
2811 	 */
2812 	D_ASSERT(list_empty(&mdev->active_ee));
2813 	D_ASSERT(list_empty(&mdev->sync_ee));
2814 	D_ASSERT(list_empty(&mdev->done_ee));
2815 	D_ASSERT(list_empty(&mdev->read_ee));
2816 	D_ASSERT(list_empty(&mdev->net_ee));
2817 	D_ASSERT(list_empty(&mdev->resync_reads));
2818 	D_ASSERT(list_empty(&mdev->data.work.q));
2819 	D_ASSERT(list_empty(&mdev->meta.work.q));
2820 	D_ASSERT(list_empty(&mdev->resync_work.list));
2821 	D_ASSERT(list_empty(&mdev->unplug_work.list));
2822 
2823 }
2824 
2825 
2826 static void drbd_destroy_mempools(void)
2827 {
2828 	struct page *page;
2829 
2830 	while (drbd_pp_pool) {
2831 		page = drbd_pp_pool;
2832 		drbd_pp_pool = (struct page *)page_private(page);
2833 		__free_page(page);
2834 		drbd_pp_vacant--;
2835 	}
2836 
2837 	/* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2838 
2839 	if (drbd_ee_mempool)
2840 		mempool_destroy(drbd_ee_mempool);
2841 	if (drbd_request_mempool)
2842 		mempool_destroy(drbd_request_mempool);
2843 	if (drbd_ee_cache)
2844 		kmem_cache_destroy(drbd_ee_cache);
2845 	if (drbd_request_cache)
2846 		kmem_cache_destroy(drbd_request_cache);
2847 	if (drbd_bm_ext_cache)
2848 		kmem_cache_destroy(drbd_bm_ext_cache);
2849 	if (drbd_al_ext_cache)
2850 		kmem_cache_destroy(drbd_al_ext_cache);
2851 
2852 	drbd_ee_mempool      = NULL;
2853 	drbd_request_mempool = NULL;
2854 	drbd_ee_cache        = NULL;
2855 	drbd_request_cache   = NULL;
2856 	drbd_bm_ext_cache    = NULL;
2857 	drbd_al_ext_cache    = NULL;
2858 
2859 	return;
2860 }
2861 
2862 static int drbd_create_mempools(void)
2863 {
2864 	struct page *page;
2865 	const int number = (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count;
2866 	int i;
2867 
2868 	/* prepare our caches and mempools */
2869 	drbd_request_mempool = NULL;
2870 	drbd_ee_cache        = NULL;
2871 	drbd_request_cache   = NULL;
2872 	drbd_bm_ext_cache    = NULL;
2873 	drbd_al_ext_cache    = NULL;
2874 	drbd_pp_pool         = NULL;
2875 
2876 	/* caches */
2877 	drbd_request_cache = kmem_cache_create(
2878 		"drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2879 	if (drbd_request_cache == NULL)
2880 		goto Enomem;
2881 
2882 	drbd_ee_cache = kmem_cache_create(
2883 		"drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
2884 	if (drbd_ee_cache == NULL)
2885 		goto Enomem;
2886 
2887 	drbd_bm_ext_cache = kmem_cache_create(
2888 		"drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2889 	if (drbd_bm_ext_cache == NULL)
2890 		goto Enomem;
2891 
2892 	drbd_al_ext_cache = kmem_cache_create(
2893 		"drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2894 	if (drbd_al_ext_cache == NULL)
2895 		goto Enomem;
2896 
2897 	/* mempools */
2898 	drbd_request_mempool = mempool_create(number,
2899 		mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2900 	if (drbd_request_mempool == NULL)
2901 		goto Enomem;
2902 
2903 	drbd_ee_mempool = mempool_create(number,
2904 		mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2905 	if (drbd_ee_mempool == NULL)
2906 		goto Enomem;
2907 
2908 	/* drbd's page pool */
2909 	spin_lock_init(&drbd_pp_lock);
2910 
2911 	for (i = 0; i < number; i++) {
2912 		page = alloc_page(GFP_HIGHUSER);
2913 		if (!page)
2914 			goto Enomem;
2915 		set_page_private(page, (unsigned long)drbd_pp_pool);
2916 		drbd_pp_pool = page;
2917 	}
2918 	drbd_pp_vacant = number;
2919 
2920 	return 0;
2921 
2922 Enomem:
2923 	drbd_destroy_mempools(); /* in case we allocated some */
2924 	return -ENOMEM;
2925 }
2926 
2927 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2928 	void *unused)
2929 {
2930 	/* just so we have it.  you never know what interesting things we
2931 	 * might want to do here some day...
2932 	 */
2933 
2934 	return NOTIFY_DONE;
2935 }
2936 
2937 static struct notifier_block drbd_notifier = {
2938 	.notifier_call = drbd_notify_sys,
2939 };
2940 
2941 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2942 {
2943 	int rr;
2944 
2945 	rr = drbd_release_ee(mdev, &mdev->active_ee);
2946 	if (rr)
2947 		dev_err(DEV, "%d EEs in active list found!\n", rr);
2948 
2949 	rr = drbd_release_ee(mdev, &mdev->sync_ee);
2950 	if (rr)
2951 		dev_err(DEV, "%d EEs in sync list found!\n", rr);
2952 
2953 	rr = drbd_release_ee(mdev, &mdev->read_ee);
2954 	if (rr)
2955 		dev_err(DEV, "%d EEs in read list found!\n", rr);
2956 
2957 	rr = drbd_release_ee(mdev, &mdev->done_ee);
2958 	if (rr)
2959 		dev_err(DEV, "%d EEs in done list found!\n", rr);
2960 
2961 	rr = drbd_release_ee(mdev, &mdev->net_ee);
2962 	if (rr)
2963 		dev_err(DEV, "%d EEs in net list found!\n", rr);
2964 }
2965 
2966 /* caution. no locking.
2967  * currently only used from module cleanup code. */
2968 static void drbd_delete_device(unsigned int minor)
2969 {
2970 	struct drbd_conf *mdev = minor_to_mdev(minor);
2971 
2972 	if (!mdev)
2973 		return;
2974 
2975 	/* paranoia asserts */
2976 	if (mdev->open_cnt != 0)
2977 		dev_err(DEV, "open_cnt = %d in %s:%u", mdev->open_cnt,
2978 				__FILE__ , __LINE__);
2979 
2980 	ERR_IF (!list_empty(&mdev->data.work.q)) {
2981 		struct list_head *lp;
2982 		list_for_each(lp, &mdev->data.work.q) {
2983 			dev_err(DEV, "lp = %p\n", lp);
2984 		}
2985 	};
2986 	/* end paranoia asserts */
2987 
2988 	del_gendisk(mdev->vdisk);
2989 
2990 	/* cleanup stuff that may have been allocated during
2991 	 * device (re-)configuration or state changes */
2992 
2993 	if (mdev->this_bdev)
2994 		bdput(mdev->this_bdev);
2995 
2996 	drbd_free_resources(mdev);
2997 
2998 	drbd_release_ee_lists(mdev);
2999 
3000 	/* should be free'd on disconnect? */
3001 	kfree(mdev->ee_hash);
3002 	/*
3003 	mdev->ee_hash_s = 0;
3004 	mdev->ee_hash = NULL;
3005 	*/
3006 
3007 	lc_destroy(mdev->act_log);
3008 	lc_destroy(mdev->resync);
3009 
3010 	kfree(mdev->p_uuid);
3011 	/* mdev->p_uuid = NULL; */
3012 
3013 	kfree(mdev->int_dig_out);
3014 	kfree(mdev->int_dig_in);
3015 	kfree(mdev->int_dig_vv);
3016 
3017 	/* cleanup the rest that has been
3018 	 * allocated from drbd_new_device
3019 	 * and actually free the mdev itself */
3020 	drbd_free_mdev(mdev);
3021 }
3022 
3023 static void drbd_cleanup(void)
3024 {
3025 	unsigned int i;
3026 
3027 	unregister_reboot_notifier(&drbd_notifier);
3028 
3029 	drbd_nl_cleanup();
3030 
3031 	if (minor_table) {
3032 		if (drbd_proc)
3033 			remove_proc_entry("drbd", NULL);
3034 		i = minor_count;
3035 		while (i--)
3036 			drbd_delete_device(i);
3037 		drbd_destroy_mempools();
3038 	}
3039 
3040 	kfree(minor_table);
3041 
3042 	unregister_blkdev(DRBD_MAJOR, "drbd");
3043 
3044 	printk(KERN_INFO "drbd: module cleanup done.\n");
3045 }
3046 
3047 /**
3048  * drbd_congested() - Callback for pdflush
3049  * @congested_data:	User data
3050  * @bdi_bits:		Bits pdflush is currently interested in
3051  *
3052  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
3053  */
3054 static int drbd_congested(void *congested_data, int bdi_bits)
3055 {
3056 	struct drbd_conf *mdev = congested_data;
3057 	struct request_queue *q;
3058 	char reason = '-';
3059 	int r = 0;
3060 
3061 	if (!__inc_ap_bio_cond(mdev)) {
3062 		/* DRBD has frozen IO */
3063 		r = bdi_bits;
3064 		reason = 'd';
3065 		goto out;
3066 	}
3067 
3068 	if (get_ldev(mdev)) {
3069 		q = bdev_get_queue(mdev->ldev->backing_bdev);
3070 		r = bdi_congested(&q->backing_dev_info, bdi_bits);
3071 		put_ldev(mdev);
3072 		if (r)
3073 			reason = 'b';
3074 	}
3075 
3076 	if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
3077 		r |= (1 << BDI_async_congested);
3078 		reason = reason == 'b' ? 'a' : 'n';
3079 	}
3080 
3081 out:
3082 	mdev->congestion_reason = reason;
3083 	return r;
3084 }
3085 
3086 struct drbd_conf *drbd_new_device(unsigned int minor)
3087 {
3088 	struct drbd_conf *mdev;
3089 	struct gendisk *disk;
3090 	struct request_queue *q;
3091 
3092 	/* GFP_KERNEL, we are outside of all write-out paths */
3093 	mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3094 	if (!mdev)
3095 		return NULL;
3096 	if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3097 		goto out_no_cpumask;
3098 
3099 	mdev->minor = minor;
3100 
3101 	drbd_init_set_defaults(mdev);
3102 
3103 	q = blk_alloc_queue(GFP_KERNEL);
3104 	if (!q)
3105 		goto out_no_q;
3106 	mdev->rq_queue = q;
3107 	q->queuedata   = mdev;
3108 
3109 	disk = alloc_disk(1);
3110 	if (!disk)
3111 		goto out_no_disk;
3112 	mdev->vdisk = disk;
3113 
3114 	set_disk_ro(disk, TRUE);
3115 
3116 	disk->queue = q;
3117 	disk->major = DRBD_MAJOR;
3118 	disk->first_minor = minor;
3119 	disk->fops = &drbd_ops;
3120 	sprintf(disk->disk_name, "drbd%d", minor);
3121 	disk->private_data = mdev;
3122 
3123 	mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3124 	/* we have no partitions. we contain only ourselves. */
3125 	mdev->this_bdev->bd_contains = mdev->this_bdev;
3126 
3127 	q->backing_dev_info.congested_fn = drbd_congested;
3128 	q->backing_dev_info.congested_data = mdev;
3129 
3130 	blk_queue_make_request(q, drbd_make_request_26);
3131 	blk_queue_max_segment_size(q, DRBD_MAX_SEGMENT_SIZE);
3132 	blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3133 	blk_queue_merge_bvec(q, drbd_merge_bvec);
3134 	q->queue_lock = &mdev->req_lock; /* needed since we use */
3135 		/* plugging on a queue that actually has no requests! */
3136 	q->unplug_fn = drbd_unplug_fn;
3137 
3138 	mdev->md_io_page = alloc_page(GFP_KERNEL);
3139 	if (!mdev->md_io_page)
3140 		goto out_no_io_page;
3141 
3142 	if (drbd_bm_init(mdev))
3143 		goto out_no_bitmap;
3144 	/* no need to lock access, we are still initializing this minor device. */
3145 	if (!tl_init(mdev))
3146 		goto out_no_tl;
3147 
3148 	mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3149 	if (!mdev->app_reads_hash)
3150 		goto out_no_app_reads;
3151 
3152 	mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3153 	if (!mdev->current_epoch)
3154 		goto out_no_epoch;
3155 
3156 	INIT_LIST_HEAD(&mdev->current_epoch->list);
3157 	mdev->epochs = 1;
3158 
3159 	return mdev;
3160 
3161 /* out_whatever_else:
3162 	kfree(mdev->current_epoch); */
3163 out_no_epoch:
3164 	kfree(mdev->app_reads_hash);
3165 out_no_app_reads:
3166 	tl_cleanup(mdev);
3167 out_no_tl:
3168 	drbd_bm_cleanup(mdev);
3169 out_no_bitmap:
3170 	__free_page(mdev->md_io_page);
3171 out_no_io_page:
3172 	put_disk(disk);
3173 out_no_disk:
3174 	blk_cleanup_queue(q);
3175 out_no_q:
3176 	free_cpumask_var(mdev->cpu_mask);
3177 out_no_cpumask:
3178 	kfree(mdev);
3179 	return NULL;
3180 }
3181 
3182 /* counterpart of drbd_new_device.
3183  * last part of drbd_delete_device. */
3184 void drbd_free_mdev(struct drbd_conf *mdev)
3185 {
3186 	kfree(mdev->current_epoch);
3187 	kfree(mdev->app_reads_hash);
3188 	tl_cleanup(mdev);
3189 	if (mdev->bitmap) /* should no longer be there. */
3190 		drbd_bm_cleanup(mdev);
3191 	__free_page(mdev->md_io_page);
3192 	put_disk(mdev->vdisk);
3193 	blk_cleanup_queue(mdev->rq_queue);
3194 	free_cpumask_var(mdev->cpu_mask);
3195 	kfree(mdev);
3196 }
3197 
3198 
3199 int __init drbd_init(void)
3200 {
3201 	int err;
3202 
3203 	if (sizeof(struct p_handshake) != 80) {
3204 		printk(KERN_ERR
3205 		       "drbd: never change the size or layout "
3206 		       "of the HandShake packet.\n");
3207 		return -EINVAL;
3208 	}
3209 
3210 	if (1 > minor_count || minor_count > 255) {
3211 		printk(KERN_ERR
3212 			"drbd: invalid minor_count (%d)\n", minor_count);
3213 #ifdef MODULE
3214 		return -EINVAL;
3215 #else
3216 		minor_count = 8;
3217 #endif
3218 	}
3219 
3220 	err = drbd_nl_init();
3221 	if (err)
3222 		return err;
3223 
3224 	err = register_blkdev(DRBD_MAJOR, "drbd");
3225 	if (err) {
3226 		printk(KERN_ERR
3227 		       "drbd: unable to register block device major %d\n",
3228 		       DRBD_MAJOR);
3229 		return err;
3230 	}
3231 
3232 	register_reboot_notifier(&drbd_notifier);
3233 
3234 	/*
3235 	 * allocate all necessary structs
3236 	 */
3237 	err = -ENOMEM;
3238 
3239 	init_waitqueue_head(&drbd_pp_wait);
3240 
3241 	drbd_proc = NULL; /* play safe for drbd_cleanup */
3242 	minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3243 				GFP_KERNEL);
3244 	if (!minor_table)
3245 		goto Enomem;
3246 
3247 	err = drbd_create_mempools();
3248 	if (err)
3249 		goto Enomem;
3250 
3251 	drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3252 	if (!drbd_proc)	{
3253 		printk(KERN_ERR "drbd: unable to register proc file\n");
3254 		goto Enomem;
3255 	}
3256 
3257 	rwlock_init(&global_state_lock);
3258 
3259 	printk(KERN_INFO "drbd: initialized. "
3260 	       "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3261 	       API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3262 	printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3263 	printk(KERN_INFO "drbd: registered as block device major %d\n",
3264 		DRBD_MAJOR);
3265 	printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3266 
3267 	return 0; /* Success! */
3268 
3269 Enomem:
3270 	drbd_cleanup();
3271 	if (err == -ENOMEM)
3272 		/* currently always the case */
3273 		printk(KERN_ERR "drbd: ran out of memory\n");
3274 	else
3275 		printk(KERN_ERR "drbd: initialization failure\n");
3276 	return err;
3277 }
3278 
3279 void drbd_free_bc(struct drbd_backing_dev *ldev)
3280 {
3281 	if (ldev == NULL)
3282 		return;
3283 
3284 	bd_release(ldev->backing_bdev);
3285 	bd_release(ldev->md_bdev);
3286 
3287 	fput(ldev->lo_file);
3288 	fput(ldev->md_file);
3289 
3290 	kfree(ldev);
3291 }
3292 
3293 void drbd_free_sock(struct drbd_conf *mdev)
3294 {
3295 	if (mdev->data.socket) {
3296 		mutex_lock(&mdev->data.mutex);
3297 		kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3298 		sock_release(mdev->data.socket);
3299 		mdev->data.socket = NULL;
3300 		mutex_unlock(&mdev->data.mutex);
3301 	}
3302 	if (mdev->meta.socket) {
3303 		mutex_lock(&mdev->meta.mutex);
3304 		kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3305 		sock_release(mdev->meta.socket);
3306 		mdev->meta.socket = NULL;
3307 		mutex_unlock(&mdev->meta.mutex);
3308 	}
3309 }
3310 
3311 
3312 void drbd_free_resources(struct drbd_conf *mdev)
3313 {
3314 	crypto_free_hash(mdev->csums_tfm);
3315 	mdev->csums_tfm = NULL;
3316 	crypto_free_hash(mdev->verify_tfm);
3317 	mdev->verify_tfm = NULL;
3318 	crypto_free_hash(mdev->cram_hmac_tfm);
3319 	mdev->cram_hmac_tfm = NULL;
3320 	crypto_free_hash(mdev->integrity_w_tfm);
3321 	mdev->integrity_w_tfm = NULL;
3322 	crypto_free_hash(mdev->integrity_r_tfm);
3323 	mdev->integrity_r_tfm = NULL;
3324 
3325 	drbd_free_sock(mdev);
3326 
3327 	__no_warn(local,
3328 		  drbd_free_bc(mdev->ldev);
3329 		  mdev->ldev = NULL;);
3330 }
3331 
3332 /* meta data management */
3333 
3334 struct meta_data_on_disk {
3335 	u64 la_size;           /* last agreed size. */
3336 	u64 uuid[UI_SIZE];   /* UUIDs. */
3337 	u64 device_uuid;
3338 	u64 reserved_u64_1;
3339 	u32 flags;             /* MDF */
3340 	u32 magic;
3341 	u32 md_size_sect;
3342 	u32 al_offset;         /* offset to this block */
3343 	u32 al_nr_extents;     /* important for restoring the AL */
3344 	      /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3345 	u32 bm_offset;         /* offset to the bitmap, from here */
3346 	u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3347 	u32 reserved_u32[4];
3348 
3349 } __packed;
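/* drbd_md_sync() below clears and writes a full 512-byte buffer, so this
 * structure must stay well within one sector; a compile-time check such as
 *	BUILD_BUG_ON(sizeof(struct meta_data_on_disk) > 512);
 * would document that assumption (shown here as a sketch only). */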
3350 
3351 /**
3352  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3353  * @mdev:	DRBD device.
3354  */
3355 void drbd_md_sync(struct drbd_conf *mdev)
3356 {
3357 	struct meta_data_on_disk *buffer;
3358 	sector_t sector;
3359 	int i;
3360 
3361 	if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3362 		return;
3363 	del_timer(&mdev->md_sync_timer);
3364 
3365 	/* We use here D_FAILED and not D_ATTACHING because we try to write
3366 	 * metadata even if we detach due to a disk failure! */
3367 	if (!get_ldev_if_state(mdev, D_FAILED))
3368 		return;
3369 
3370 	mutex_lock(&mdev->md_io_mutex);
3371 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3372 	memset(buffer, 0, 512);
3373 
3374 	buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3375 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3376 		buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3377 	buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3378 	buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3379 
3380 	buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3381 	buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3382 	buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3383 	buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3384 	buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3385 
3386 	buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3387 
3388 	D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3389 	sector = mdev->ldev->md.md_offset;
3390 
3391 	if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3392 		clear_bit(MD_DIRTY, &mdev->flags);
3393 	} else {
3394 		/* this was a try anyway ... */
3395 		dev_err(DEV, "meta data update failed!\n");
3396 
3397 		drbd_chk_io_error(mdev, 1, TRUE);
3398 	}
3399 
3400 	/* Update mdev->ldev->md.la_size_sect,
3401 	 * since we updated it on metadata. */
3402 	mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3403 
3404 	mutex_unlock(&mdev->md_io_mutex);
3405 	put_ldev(mdev);
3406 }
3407 
3408 /**
3409  * drbd_md_read() - Reads in the meta data super block
3410  * @mdev:	DRBD device.
3411  * @bdev:	Device from which the meta data should be read in.
3412  *
3413  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_codes in case
3414  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3415  */
3416 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3417 {
3418 	struct meta_data_on_disk *buffer;
3419 	int i, rv = NO_ERROR;
3420 
3421 	if (!get_ldev_if_state(mdev, D_ATTACHING))
3422 		return ERR_IO_MD_DISK;
3423 
3424 	mutex_lock(&mdev->md_io_mutex);
3425 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3426 
3427 	if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3428 		/* NOTE: can't do normal error processing here, as this is
3429 		   called BEFORE the disk is attached */
3430 		dev_err(DEV, "Error while reading metadata.\n");
3431 		rv = ERR_IO_MD_DISK;
3432 		goto err;
3433 	}
3434 
3435 	if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3436 		dev_err(DEV, "Error while reading metadata, magic not found.\n");
3437 		rv = ERR_MD_INVALID;
3438 		goto err;
3439 	}
3440 	if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3441 		dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3442 		    be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3443 		rv = ERR_MD_INVALID;
3444 		goto err;
3445 	}
3446 	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3447 		dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3448 		    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3449 		rv = ERR_MD_INVALID;
3450 		goto err;
3451 	}
3452 	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3453 		dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3454 		    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3455 		rv = ERR_MD_INVALID;
3456 		goto err;
3457 	}
3458 
3459 	if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3460 		dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3461 		    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3462 		rv = ERR_MD_INVALID;
3463 		goto err;
3464 	}
3465 
3466 	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3467 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3468 		bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3469 	bdev->md.flags = be32_to_cpu(buffer->flags);
3470 	mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3471 	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3472 
3473 	if (mdev->sync_conf.al_extents < 7)
3474 		mdev->sync_conf.al_extents = 127;
3475 
3476  err:
3477 	mutex_unlock(&mdev->md_io_mutex);
3478 	put_ldev(mdev);
3479 
3480 	return rv;
3481 }
3482 
3483 /**
3484  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3485  * @mdev:	DRBD device.
3486  *
3487  * Call this function if you change anything that should be written to
3488  * the meta-data super block. This function sets MD_DIRTY, and starts a
3489  * timer that ensures that within five seconds you have to call drbd_md_sync().
3490  */
3491 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3492 {
3493 	set_bit(MD_DIRTY, &mdev->flags);
3494 	mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3495 }
3496 
3497 
3498 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3499 {
3500 	int i;
3501 
3502 	for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3503 		mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3504 }
3505 
3506 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3507 {
3508 	if (idx == UI_CURRENT) {
3509 		if (mdev->state.role == R_PRIMARY)
3510 			val |= 1;
3511 		else
3512 			val &= ~((u64)1);
3513 
3514 		drbd_set_ed_uuid(mdev, val);
3515 	}
3516 
3517 	mdev->ldev->md.uuid[idx] = val;
3518 	drbd_md_mark_dirty(mdev);
3519 }
3520 
3521 
3522 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3523 {
3524 	if (mdev->ldev->md.uuid[idx]) {
3525 		drbd_uuid_move_history(mdev);
3526 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3527 	}
3528 	_drbd_uuid_set(mdev, idx, val);
3529 }
3530 
3531 /**
3532  * drbd_uuid_new_current() - Creates a new current UUID
3533  * @mdev:	DRBD device.
3534  *
3535  * Creates a new current UUID, and rotates the old current UUID into
3536  * the bitmap slot. Causes an incremental resync upon next connect.
3537  */
3538 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3539 {
3540 	u64 val;
3541 
3542 	dev_info(DEV, "Creating new current UUID\n");
3543 	D_ASSERT(mdev->ldev->md.uuid[UI_BITMAP] == 0);
3544 	mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3545 
3546 	get_random_bytes(&val, sizeof(u64));
3547 	_drbd_uuid_set(mdev, UI_CURRENT, val);
3548 }
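/* Sketch of the rotation above: UI_BITMAP must be 0, the old UI_CURRENT is
 * copied into UI_BITMAP, and a fresh random value becomes the new
 * UI_CURRENT; _drbd_uuid_set() then forces its lowest bit to 1 on a
 * primary and clears it on a secondary. */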
3549 
3550 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3551 {
3552 	if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3553 		return;
3554 
3555 	if (val == 0) {
3556 		drbd_uuid_move_history(mdev);
3557 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3558 		mdev->ldev->md.uuid[UI_BITMAP] = 0;
3559 	} else {
3560 		if (mdev->ldev->md.uuid[UI_BITMAP])
3561 			dev_warn(DEV, "bm UUID already set\n");
3562 
3563 		mdev->ldev->md.uuid[UI_BITMAP] = val;
3564 		mdev->ldev->md.uuid[UI_BITMAP] &= ~((u64)1);
3565 
3566 	}
3567 	drbd_md_mark_dirty(mdev);
3568 }
3569 
3570 /**
3571  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3572  * @mdev:	DRBD device.
3573  *
3574  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3575  */
3576 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3577 {
3578 	int rv = -EIO;
3579 
3580 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
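		/* Persist MDF_FULL_SYNC before touching the bitmap, so that a
		 * crash in the middle of the bitmap write still forces a full
		 * sync after the next attach. */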
3581 		drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3582 		drbd_md_sync(mdev);
3583 		drbd_bm_set_all(mdev);
3584 
3585 		rv = drbd_bm_write(mdev);
3586 
3587 		if (!rv) {
3588 			drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3589 			drbd_md_sync(mdev);
3590 		}
3591 
3592 		put_ldev(mdev);
3593 	}
3594 
3595 	return rv;
3596 }
3597 
3598 /**
3599  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3600  * @mdev:	DRBD device.
3601  *
3602  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3603  */
3604 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3605 {
3606 	int rv = -EIO;
3607 
3608 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
3609 		drbd_bm_clear_all(mdev);
3610 		rv = drbd_bm_write(mdev);
3611 		put_ldev(mdev);
3612 	}
3613 
3614 	return rv;
3615 }
3616 
3617 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3618 {
3619 	struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3620 	int rv;
3621 
3622 	D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3623 
3624 	drbd_bm_lock(mdev, work->why);
3625 	rv = work->io_fn(mdev);
3626 	drbd_bm_unlock(mdev);
3627 
3628 	clear_bit(BITMAP_IO, &mdev->flags);
3629 	wake_up(&mdev->misc_wait);
3630 
3631 	if (work->done)
3632 		work->done(mdev, rv);
3633 
3634 	clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3635 	work->why = NULL;
3636 
3637 	return 1;
3638 }
3639 
3640 /**
3641  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3642  * @mdev:	DRBD device.
3643  * @io_fn:	IO callback to be called when bitmap IO is possible
3644  * @done:	callback to be called after the bitmap IO was performed
3645  * @why:	Descriptive text of the reason for doing the IO
3646  *
3647  * While IO on the bitmap happens we freeze application IO, thus ensuring
3648  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
3649  * called from worker context. It MUST NOT be used while a previous such
3650  * work is still pending!
3651  */
3652 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3653 			  int (*io_fn)(struct drbd_conf *),
3654 			  void (*done)(struct drbd_conf *, int),
3655 			  char *why)
3656 {
3657 	D_ASSERT(current == mdev->worker.task);
3658 
3659 	D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3660 	D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3661 	D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3662 	if (mdev->bm_io_work.why)
3663 		dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3664 			why, mdev->bm_io_work.why);
3665 
3666 	mdev->bm_io_work.io_fn = io_fn;
3667 	mdev->bm_io_work.done = done;
3668 	mdev->bm_io_work.why = why;
3669 
3670 	set_bit(BITMAP_IO, &mdev->flags);
3671 	if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3672 		if (list_empty(&mdev->bm_io_work.w.list)) {
3673 			set_bit(BITMAP_IO_QUEUED, &mdev->flags);
3674 			drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
3675 		} else
3676 			dev_err(DEV, "FIXME avoided double queuing bm_io_work\n");
3677 	}
3678 }
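/*
 * Editorial sketch, not part of the original file: queueing a whole-bitmap
 * write from worker context, using drbd_bmio_set_n_write() from above as
 * the io_fn.  The done callback and both function names are hypothetical;
 * passing NULL for @done is also handled by w_bitmap_io().
 */
#if 0
static void example_set_n_write_done(struct drbd_conf *mdev, int rv)
{
	if (rv)
		dev_err(DEV, "example: bitmap write failed: %d\n", rv);
}

static void example_queue_full_bitmap_write(struct drbd_conf *mdev)
{
	/* only valid on the worker thread, see the D_ASSERT above */
	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write,
			     &example_set_n_write_done,
			     "example full bitmap write");
}
#endif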
3679 
3680 /**
3681  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3682  * @mdev:	DRBD device.
3683  * @io_fn:	IO callback to be called when bitmap IO is possible
3684  * @why:	Descriptive text of the reason for doing the IO
3685  *
3686  * Freezes application IO while the actual IO operation runs. This
3687  * function MAY NOT be called from worker context.
3688  */
3689 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why)
3690 {
3691 	int rv;
3692 
3693 	D_ASSERT(current != mdev->worker.task);
3694 
3695 	drbd_suspend_io(mdev);
3696 
3697 	drbd_bm_lock(mdev, why);
3698 	rv = io_fn(mdev);
3699 	drbd_bm_unlock(mdev);
3700 
3701 	drbd_resume_io(mdev);
3702 
3703 	return rv;
3704 }
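/*
 * Editorial sketch, not part of the original file: the synchronous
 * counterpart.  drbd_bitmap_io() suspends application IO itself and may be
 * called from any context except the worker.  Function name and reason
 * string are hypothetical.
 */
#if 0
static int example_clear_bitmap_now(struct drbd_conf *mdev)
{
	return drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
			      "example clear bitmap");
}
#endif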
3705 
3706 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3707 {
3708 	if ((mdev->ldev->md.flags & flag) != flag) {
3709 		drbd_md_mark_dirty(mdev);
3710 		mdev->ldev->md.flags |= flag;
3711 	}
3712 }
3713 
3714 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3715 {
3716 	if ((mdev->ldev->md.flags & flag) != 0) {
3717 		drbd_md_mark_dirty(mdev);
3718 		mdev->ldev->md.flags &= ~flag;
3719 	}
3720 }

3721 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3722 {
3723 	return (bdev->md.flags & flag) != 0;
3724 }
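/*
 * Editorial sketch, not part of the original file: the three helpers above
 * combine into the usual test/set/clear pattern for persistent meta-data
 * flags.  The caller is hypothetical; note that set/clear only mark the
 * meta data dirty, so an explicit drbd_md_sync() (or the five second
 * timer) is still needed to reach stable storage.
 */
#if 0
static void example_toggle_full_sync(struct drbd_conf *mdev, int on)
{
	if (!get_ldev(mdev))
		return;

	if (on)
		drbd_md_set_flag(mdev, MDF_FULL_SYNC);
	else if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC))
		drbd_md_clear_flag(mdev, MDF_FULL_SYNC);

	drbd_md_sync(mdev);
	put_ldev(mdev);
}
#endif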
3725 
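/* Deferred meta-data flush: drbd_md_mark_dirty() arms md_sync_timer; the
 * timer callback runs in softirq context and must not block, so it only
 * pushes md_sync_work to the front of the worker queue, and the worker
 * then performs the actual drbd_md_sync(). */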
3726 static void md_sync_timer_fn(unsigned long data)
3727 {
3728 	struct drbd_conf *mdev = (struct drbd_conf *) data;
3729 
3730 	drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
3731 }
3732 
3733 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3734 {
3735 	dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3736 	drbd_md_sync(mdev);
3737 
3738 	return 1;
3739 }
3740 
3741 #ifdef CONFIG_DRBD_FAULT_INJECTION
3742 /* Fault insertion support including random number generator shamelessly
3743  * stolen from kernel/rcutorture.c */
3744 struct fault_random_state {
3745 	unsigned long state;
3746 	unsigned long count;
3747 };
3748 
3749 #define FAULT_RANDOM_MULT 39916801  /* prime */
3750 #define FAULT_RANDOM_ADD	479001701 /* prime */
3751 #define FAULT_RANDOM_REFRESH 10000
3752 
3753 /*
3754  * Crude but fast random-number generator.  Uses a linear congruential
3755  * generator, with occasional help from get_random_bytes().
3756  */
3757 static unsigned long
3758 _drbd_fault_random(struct fault_random_state *rsp)
3759 {
3760 	long refresh;
3761 
3762 	if (!rsp->count--) {
3763 		get_random_bytes(&refresh, sizeof(refresh));
3764 		rsp->state += refresh;
3765 		rsp->count = FAULT_RANDOM_REFRESH;
3766 	}
3767 	rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3768 	return swahw32(rsp->state);
3769 }
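/* The update above is the classic LCG step,
 *	state[n+1] = state[n] * 39916801 + 479001701  (mod 2^BITS_PER_LONG),
 * reseeded via get_random_bytes() every FAULT_RANDOM_REFRESH calls.  The
 * final swahw32() swaps the 16-bit halves of the low 32 bits, presumably
 * because the low-order bits of an LCG are the least random. */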
3770 
3771 static char *
3772 _drbd_fault_str(unsigned int type) {
3773 	static char *_faults[] = {
3774 		[DRBD_FAULT_MD_WR] = "Meta-data write",
3775 		[DRBD_FAULT_MD_RD] = "Meta-data read",
3776 		[DRBD_FAULT_RS_WR] = "Resync write",
3777 		[DRBD_FAULT_RS_RD] = "Resync read",
3778 		[DRBD_FAULT_DT_WR] = "Data write",
3779 		[DRBD_FAULT_DT_RD] = "Data read",
3780 		[DRBD_FAULT_DT_RA] = "Data read ahead",
3781 		[DRBD_FAULT_BM_ALLOC] = "BM allocation",
3782 		[DRBD_FAULT_AL_EE] = "EE allocation",
3783 		[DRBD_FAULT_RECEIVE] = "receive data corruption",
3784 	};
3785 
3786 	return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3787 }
3788 
3789 unsigned int
3790 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3791 {
3792 	static struct fault_random_state rrs = {0, 0};
3793 
3794 	unsigned int ret = (
3795 		(fault_devs == 0 ||
3796 			((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3797 		(((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3798 
3799 	if (ret) {
3800 		fault_count++;
3801 
3802 		if (printk_ratelimit())
3803 			dev_warn(DEV, "***Simulating %s failure\n",
3804 				_drbd_fault_str(type));
3805 	}
3806 
3807 	return ret;
3808 }
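/*
 * Editorial sketch, not part of the original file: how an IO path might
 * consult the fault injector.  The function below is hypothetical and
 * calls _drbd_insert_fault() directly for illustration; callers in the
 * driver normally go through a small wrapper that also gates on fault_rate
 * and the enabled fault types.
 */
#if 0
static int example_do_md_write(struct drbd_conf *mdev)
{
	if (_drbd_insert_fault(mdev, DRBD_FAULT_MD_WR))
		return -EIO;	/* pretend the meta-data write failed */

	/* ... issue the real write here ... */
	return 0;
}
#endif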
3809 #endif
3810 
3811 const char *drbd_buildtag(void)
3812 {
3813 	/* When DRBD is built from external sources, this holds a reference
3814 	   to the git hash of the source code. */
3815 
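	/* Starts with a NUL byte so the branch below runs only on the first
	 * call; when DRBD is built into the kernel (THIS_MODULE == NULL),
	 * overwriting that byte with 'b' turns the string into "built-in". */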
3816 	static char buildtag[38] = "\0uilt-in";
3817 
3818 	if (buildtag[0] == 0) {
3819 #ifdef CONFIG_MODULES
3820 		if (THIS_MODULE != NULL)
3821 			sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3822 		else
3823 #endif
3824 			buildtag[0] = 'b';
3825 	}
3826 
3827 	return buildtag;
3828 }
3829 
3830 module_init(drbd_init)
3831 module_exit(drbd_cleanup)
3832 
3833 EXPORT_SYMBOL(drbd_conn_str);
3834 EXPORT_SYMBOL(drbd_role_str);
3835 EXPORT_SYMBOL(drbd_disk_str);
3836 EXPORT_SYMBOL(drbd_set_st_err_str);
3837