xref: /linux/drivers/block/drbd/drbd_main.c (revision 092e0e7e520a1fca03e13c9f2d157432a8657ff2)
1 /*
2    drbd.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12 
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17 
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22 
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 
27  */
28 
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48 
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52 
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56 
57 #include "drbd_vli.h"
58 
59 struct after_state_chg_work {
60 	struct drbd_work w;
61 	union drbd_state os;
62 	union drbd_state ns;
63 	enum chg_state_flags flags;
64 	struct completion *done;
65 };
66 
67 static DEFINE_MUTEX(drbd_main_mutex);
68 int drbdd_init(struct drbd_thread *);
69 int drbd_worker(struct drbd_thread *);
70 int drbd_asender(struct drbd_thread *);
71 
72 int drbd_init(void);
73 static int drbd_open(struct block_device *bdev, fmode_t mode);
74 static int drbd_release(struct gendisk *gd, fmode_t mode);
75 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
76 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
77 			   union drbd_state ns, enum chg_state_flags flags);
78 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
79 static void md_sync_timer_fn(unsigned long data);
80 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
81 
82 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
83 	      "Lars Ellenberg <lars@linbit.com>");
84 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
85 MODULE_VERSION(REL_VERSION);
86 MODULE_LICENSE("GPL");
87 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
88 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
89 
90 #include <linux/moduleparam.h>
91 /* allow_open_on_secondary */
92 MODULE_PARM_DESC(allow_oos, "DONT USE!");
93 /* thanks to these macros, if compiled into the kernel (not as a module),
94  * this becomes the boot parameter drbd.minor_count */
95 module_param(minor_count, uint, 0444);
96 module_param(disable_sendpage, bool, 0644);
97 module_param(allow_oos, bool, 0);
98 module_param(cn_idx, uint, 0444);
99 module_param(proc_details, int, 0644);
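/* Illustrative usage (not part of the original source): when DRBD is built
 * into the kernel, the parameters above are set on the kernel command line
 * with a "drbd." prefix, e.g.
 *
 *	drbd.minor_count=16 drbd.disable_sendpage=1
 *
 * When built as a module, the same names are passed to modprobe, e.g.
 * "modprobe drbd minor_count=16".
 */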
100 
101 #ifdef CONFIG_DRBD_FAULT_INJECTION
102 int enable_faults;
103 int fault_rate;
104 static int fault_count;
105 int fault_devs;
106 /* bitmap of enabled faults */
107 module_param(enable_faults, int, 0664);
108 /* fault rate % value - applies to all enabled faults */
109 module_param(fault_rate, int, 0664);
110 /* count of faults inserted */
111 module_param(fault_count, int, 0664);
112 /* bitmap of devices to insert faults on */
113 module_param(fault_devs, int, 0644);
114 #endif
115 
116 /* module parameter, defined */
117 unsigned int minor_count = 32;
118 int disable_sendpage;
119 int allow_oos;
120 unsigned int cn_idx = CN_IDX_DRBD;
121 int proc_details;       /* Detail level in proc drbd */
122 
123 /* Module parameter for setting the user mode helper program
124  * to run. Default is /sbin/drbdadm */
125 char usermode_helper[80] = "/sbin/drbdadm";
126 
127 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
128 
129 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
130  * as member "struct gendisk *vdisk;"
131  */
132 struct drbd_conf **minor_table;
133 
134 struct kmem_cache *drbd_request_cache;
135 struct kmem_cache *drbd_ee_cache;	/* epoch entries */
136 struct kmem_cache *drbd_bm_ext_cache;	/* bitmap extents */
137 struct kmem_cache *drbd_al_ext_cache;	/* activity log extents */
138 mempool_t *drbd_request_mempool;
139 mempool_t *drbd_ee_mempool;
140 
141 /* I do not use a standard mempool, because:
142    1) I want to hand out the pre-allocated objects first.
143    2) I want to be able to interrupt sleeping allocation with a signal.
144    Note: This is a singly linked list; the next pointer is the private
145 	 member of struct page.
146  */
147 struct page *drbd_pp_pool;
148 spinlock_t   drbd_pp_lock;
149 int          drbd_pp_vacant;
150 wait_queue_head_t drbd_pp_wait;
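/* Sketch of the page-chain convention described above (illustrative only;
 * the actual push/pop helpers live elsewhere in the DRBD sources, and
 * locking via drbd_pp_lock is omitted here):
 *
 *	// push a page onto the pool, chaining via page->private
 *	set_page_private(page, (unsigned long)drbd_pp_pool);
 *	drbd_pp_pool = page;
 *	drbd_pp_vacant++;
 *
 *	// pop the first page off the pool again
 *	page = drbd_pp_pool;
 *	drbd_pp_pool = (struct page *)page_private(page);
 *	drbd_pp_vacant--;
 */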
151 
152 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
153 
154 static const struct block_device_operations drbd_ops = {
155 	.owner =   THIS_MODULE,
156 	.open =    drbd_open,
157 	.release = drbd_release,
158 };
159 
160 #define ARRY_SIZE(A) (sizeof(A)/sizeof(A[0]))
161 
162 #ifdef __CHECKER__
163 /* When checking with sparse, and this is an inline function, sparse will
164    give tons of false positives. When this is a real function, sparse works.
165  */
166 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
167 {
168 	int io_allowed;
169 
170 	atomic_inc(&mdev->local_cnt);
171 	io_allowed = (mdev->state.disk >= mins);
172 	if (!io_allowed) {
173 		if (atomic_dec_and_test(&mdev->local_cnt))
174 			wake_up(&mdev->misc_wait);
175 	}
176 	return io_allowed;
177 }
178 
179 #endif
180 
181 /**
182  * DOC: The transfer log
183  *
184  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
185  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
186  * of the list. There is always at least one &struct drbd_tl_epoch object.
187  *
188  * Each &struct drbd_tl_epoch has a circular double linked list of requests
189  * attached.
190  */
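/* Rough picture of the layout described above; ->next points from older to
 * newer epochs, as used by _tl_add_barrier() and tl_release() below:
 *
 *	mdev->oldest_tle                          mdev->newest_tle
 *	      |                                          |
 *	      v                                          v
 *	[tl_epoch] --next--> [tl_epoch] --next--> [tl_epoch] --> NULL
 *	      |                   |                      |
 *	 {requests}          {requests}             {requests}
 *	(each epoch carries a circular, doubly linked list of struct drbd_request)
 */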
191 static int tl_init(struct drbd_conf *mdev)
192 {
193 	struct drbd_tl_epoch *b;
194 
195 	/* during device minor initialization, we may well use GFP_KERNEL */
196 	b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
197 	if (!b)
198 		return 0;
199 	INIT_LIST_HEAD(&b->requests);
200 	INIT_LIST_HEAD(&b->w.list);
201 	b->next = NULL;
202 	b->br_number = 4711;
203 	b->n_req = 0;
204 	b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
205 
206 	mdev->oldest_tle = b;
207 	mdev->newest_tle = b;
208 	INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
209 
210 	mdev->tl_hash = NULL;
211 	mdev->tl_hash_s = 0;
212 
213 	return 1;
214 }
215 
216 static void tl_cleanup(struct drbd_conf *mdev)
217 {
218 	D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
219 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
220 	kfree(mdev->oldest_tle);
221 	mdev->oldest_tle = NULL;
222 	kfree(mdev->unused_spare_tle);
223 	mdev->unused_spare_tle = NULL;
224 	kfree(mdev->tl_hash);
225 	mdev->tl_hash = NULL;
226 	mdev->tl_hash_s = 0;
227 }
228 
229 /**
230  * _tl_add_barrier() - Adds a barrier to the transfer log
231  * @mdev:	DRBD device.
232  * @new:	Barrier to be added before the current head of the TL.
233  *
234  * The caller must hold the req_lock.
235  */
236 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
237 {
238 	struct drbd_tl_epoch *newest_before;
239 
240 	INIT_LIST_HEAD(&new->requests);
241 	INIT_LIST_HEAD(&new->w.list);
242 	new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
243 	new->next = NULL;
244 	new->n_req = 0;
245 
246 	newest_before = mdev->newest_tle;
247 	/* never send a barrier number == 0, because that is special-cased
248 	 * when using TCQ for our write ordering code */
249 	new->br_number = (newest_before->br_number+1) ?: 1;
250 	if (mdev->newest_tle != new) {
251 		mdev->newest_tle->next = new;
252 		mdev->newest_tle = new;
253 	}
254 }
255 
256 /**
257  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
258  * @mdev:	DRBD device.
259  * @barrier_nr:	Expected identifier of the DRBD write barrier packet.
260  * @set_size:	Expected number of requests before that barrier.
261  *
262  * In case the passed barrier_nr or set_size does not match the oldest
263  * &struct drbd_tl_epoch object, this function will cause a termination
264  * of the connection.
265  */
266 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
267 		       unsigned int set_size)
268 {
269 	struct drbd_tl_epoch *b, *nob; /* next old barrier */
270 	struct list_head *le, *tle;
271 	struct drbd_request *r;
272 
273 	spin_lock_irq(&mdev->req_lock);
274 
275 	b = mdev->oldest_tle;
276 
277 	/* first some paranoia code */
278 	if (b == NULL) {
279 		dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
280 			barrier_nr);
281 		goto bail;
282 	}
283 	if (b->br_number != barrier_nr) {
284 		dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
285 			barrier_nr, b->br_number);
286 		goto bail;
287 	}
288 	if (b->n_req != set_size) {
289 		dev_err(DEV, "BAD! BarrierAck #%u received with n_req=%u, expected n_req=%u!\n",
290 			barrier_nr, set_size, b->n_req);
291 		goto bail;
292 	}
293 
294 	/* Clean up list of requests processed during current epoch */
295 	list_for_each_safe(le, tle, &b->requests) {
296 		r = list_entry(le, struct drbd_request, tl_requests);
297 		_req_mod(r, barrier_acked);
298 	}
299 	/* There could be requests on the list waiting for completion
300 	   of the write to the local disk. To avoid corruption of
301 	   slab's data structures we have to remove the list's head.
302 
303 	   Also there could have been a barrier ack out of sequence, overtaking
304 	   the write acks - which would be a bug and violating write ordering.
305 	   To not deadlock in case we lose connection while such requests are
306 	   still pending, we need some way to find them for the
307 	   _req_mod(connection_lost_while_pending).
308 
309 	   These have been list_move'd to the out_of_sequence_requests list in
310 	   _req_mod(, barrier_acked) above.
311 	   */
312 	list_del_init(&b->requests);
313 
314 	nob = b->next;
315 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
316 		_tl_add_barrier(mdev, b);
317 		if (nob)
318 			mdev->oldest_tle = nob;
319 		/* if nob == NULL, b was the only barrier and becomes the new
320 		   barrier. Therefore mdev->oldest_tle already points to b */
321 	} else {
322 		D_ASSERT(nob != NULL);
323 		mdev->oldest_tle = nob;
324 		kfree(b);
325 	}
326 
327 	spin_unlock_irq(&mdev->req_lock);
328 	dec_ap_pending(mdev);
329 
330 	return;
331 
332 bail:
333 	spin_unlock_irq(&mdev->req_lock);
334 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
335 }
336 
337 
338 /**
339  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
340  * @mdev:	DRBD device.
341  *
342  * This is called after the connection to the peer was lost. The storage covered
343  * by the requests on the transfer log gets marked as out of sync. Called from the
344  * receiver thread and the worker thread.
345  */
346 void tl_clear(struct drbd_conf *mdev)
347 {
348 	struct drbd_tl_epoch *b, *tmp;
349 	struct list_head *le, *tle;
350 	struct drbd_request *r;
351 	int new_initial_bnr = net_random();
352 
353 	spin_lock_irq(&mdev->req_lock);
354 
355 	b = mdev->oldest_tle;
356 	while (b) {
357 		list_for_each_safe(le, tle, &b->requests) {
358 			r = list_entry(le, struct drbd_request, tl_requests);
359 			/* It would be nice to complete outside of spinlock.
360 			 * But this is easier for now. */
361 			_req_mod(r, connection_lost_while_pending);
362 		}
363 		tmp = b->next;
364 
365 		/* there could still be requests on that ring list,
366 		 * in case local io is still pending */
367 		list_del(&b->requests);
368 
369 		/* dec_ap_pending corresponding to queue_barrier.
370 		 * the newest barrier may not have been queued yet,
371 		 * in which case w.cb is still NULL. */
372 		if (b->w.cb != NULL)
373 			dec_ap_pending(mdev);
374 
375 		if (b == mdev->newest_tle) {
376 			/* recycle, but reinit! */
377 			D_ASSERT(tmp == NULL);
378 			INIT_LIST_HEAD(&b->requests);
379 			INIT_LIST_HEAD(&b->w.list);
380 			b->w.cb = NULL;
381 			b->br_number = new_initial_bnr;
382 			b->n_req = 0;
383 
384 			mdev->oldest_tle = b;
385 			break;
386 		}
387 		kfree(b);
388 		b = tmp;
389 	}
390 
391 	/* we expect this list to be empty. */
392 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
393 
394 	/* but just in case, clean it up anyways! */
395 	list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
396 		r = list_entry(le, struct drbd_request, tl_requests);
397 		/* It would be nice to complete outside of spinlock.
398 		 * But this is easier for now. */
399 		_req_mod(r, connection_lost_while_pending);
400 	}
401 
402 	/* ensure bit indicating barrier is required is clear */
403 	clear_bit(CREATE_BARRIER, &mdev->flags);
404 
405 	spin_unlock_irq(&mdev->req_lock);
406 }
407 
408 /**
409  * cl_wide_st_chg() - TRUE if the state change is a cluster wide one
410  * @mdev:	DRBD device.
411  * @os:		old (current) state.
412  * @ns:		new (wanted) state.
413  */
414 static int cl_wide_st_chg(struct drbd_conf *mdev,
415 			  union drbd_state os, union drbd_state ns)
416 {
417 	return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
418 		 ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
419 		  (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
420 		  (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
421 		  (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
422 		(os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
423 		(os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
424 }
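/* In words (derived from the expression above): a state change is cluster
 * wide if, while both old and new state are at least Connected, we become
 * Primary, enter StartingSyncS/StartingSyncT, or our disk becomes Diskless;
 * or if we go to Disconnecting from a connected state; or if we start an
 * online verify (VerifyS) from Connected. */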
425 
426 int drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
427 		      union drbd_state mask, union drbd_state val)
428 {
429 	unsigned long flags;
430 	union drbd_state os, ns;
431 	int rv;
432 
433 	spin_lock_irqsave(&mdev->req_lock, flags);
434 	os = mdev->state;
435 	ns.i = (os.i & ~mask.i) | val.i;
436 	rv = _drbd_set_state(mdev, ns, f, NULL);
437 	ns = mdev->state;
438 	spin_unlock_irqrestore(&mdev->req_lock, flags);
439 
440 	return rv;
441 }
442 
443 /**
444  * drbd_force_state() - Impose a change which happens outside our control on our state
445  * @mdev:	DRBD device.
446  * @mask:	mask of state bits to change.
447  * @val:	value of new state bits.
448  */
449 void drbd_force_state(struct drbd_conf *mdev,
450 	union drbd_state mask, union drbd_state val)
451 {
452 	drbd_change_state(mdev, CS_HARD, mask, val);
453 }
454 
455 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns);
456 static int is_valid_state_transition(struct drbd_conf *,
457 				     union drbd_state, union drbd_state);
458 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
459 				       union drbd_state ns, int *warn_sync_abort);
460 int drbd_send_state_req(struct drbd_conf *,
461 			union drbd_state, union drbd_state);
462 
463 static enum drbd_state_ret_codes _req_st_cond(struct drbd_conf *mdev,
464 				    union drbd_state mask, union drbd_state val)
465 {
466 	union drbd_state os, ns;
467 	unsigned long flags;
468 	int rv;
469 
470 	if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
471 		return SS_CW_SUCCESS;
472 
473 	if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
474 		return SS_CW_FAILED_BY_PEER;
475 
476 	rv = 0;
477 	spin_lock_irqsave(&mdev->req_lock, flags);
478 	os = mdev->state;
479 	ns.i = (os.i & ~mask.i) | val.i;
480 	ns = sanitize_state(mdev, os, ns, NULL);
481 
482 	if (!cl_wide_st_chg(mdev, os, ns))
483 		rv = SS_CW_NO_NEED;
484 	if (!rv) {
485 		rv = is_valid_state(mdev, ns);
486 		if (rv == SS_SUCCESS) {
487 			rv = is_valid_state_transition(mdev, ns, os);
488 			if (rv == SS_SUCCESS)
489 				rv = 0; /* cont waiting, otherwise fail. */
490 		}
491 	}
492 	spin_unlock_irqrestore(&mdev->req_lock, flags);
493 
494 	return rv;
495 }
496 
497 /**
498  * drbd_req_state() - Perform a possibly cluster-wide state change
499  * @mdev:	DRBD device.
500  * @mask:	mask of state bits to change.
501  * @val:	value of new state bits.
502  * @f:		flags
503  *
504  * Should not be called directly, use drbd_request_state() or
505  * _drbd_request_state().
506  */
507 static int drbd_req_state(struct drbd_conf *mdev,
508 			  union drbd_state mask, union drbd_state val,
509 			  enum chg_state_flags f)
510 {
511 	struct completion done;
512 	unsigned long flags;
513 	union drbd_state os, ns;
514 	int rv;
515 
516 	init_completion(&done);
517 
518 	if (f & CS_SERIALIZE)
519 		mutex_lock(&mdev->state_mutex);
520 
521 	spin_lock_irqsave(&mdev->req_lock, flags);
522 	os = mdev->state;
523 	ns.i = (os.i & ~mask.i) | val.i;
524 	ns = sanitize_state(mdev, os, ns, NULL);
525 
526 	if (cl_wide_st_chg(mdev, os, ns)) {
527 		rv = is_valid_state(mdev, ns);
528 		if (rv == SS_SUCCESS)
529 			rv = is_valid_state_transition(mdev, ns, os);
530 		spin_unlock_irqrestore(&mdev->req_lock, flags);
531 
532 		if (rv < SS_SUCCESS) {
533 			if (f & CS_VERBOSE)
534 				print_st_err(mdev, os, ns, rv);
535 			goto abort;
536 		}
537 
538 		drbd_state_lock(mdev);
539 		if (!drbd_send_state_req(mdev, mask, val)) {
540 			drbd_state_unlock(mdev);
541 			rv = SS_CW_FAILED_BY_PEER;
542 			if (f & CS_VERBOSE)
543 				print_st_err(mdev, os, ns, rv);
544 			goto abort;
545 		}
546 
547 		wait_event(mdev->state_wait,
548 			(rv = _req_st_cond(mdev, mask, val)));
549 
550 		if (rv < SS_SUCCESS) {
551 			drbd_state_unlock(mdev);
552 			if (f & CS_VERBOSE)
553 				print_st_err(mdev, os, ns, rv);
554 			goto abort;
555 		}
556 		spin_lock_irqsave(&mdev->req_lock, flags);
557 		os = mdev->state;
558 		ns.i = (os.i & ~mask.i) | val.i;
559 		rv = _drbd_set_state(mdev, ns, f, &done);
560 		drbd_state_unlock(mdev);
561 	} else {
562 		rv = _drbd_set_state(mdev, ns, f, &done);
563 	}
564 
565 	spin_unlock_irqrestore(&mdev->req_lock, flags);
566 
567 	if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
568 		D_ASSERT(current != mdev->worker.task);
569 		wait_for_completion(&done);
570 	}
571 
572 abort:
573 	if (f & CS_SERIALIZE)
574 		mutex_unlock(&mdev->state_mutex);
575 
576 	return rv;
577 }
578 
579 /**
580  * _drbd_request_state() - Request a state change (with flags)
581  * @mdev:	DRBD device.
582  * @mask:	mask of state bits to change.
583  * @val:	value of new state bits.
584  * @f:		flags
585  *
586  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
587  * flag, or when logging of failed state change requests is not desired.
588  */
589 int _drbd_request_state(struct drbd_conf *mdev,	union drbd_state mask,
590 			union drbd_state val,	enum chg_state_flags f)
591 {
592 	int rv;
593 
594 	wait_event(mdev->state_wait,
595 		   (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
596 
597 	return rv;
598 }
599 
600 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
601 {
602 	dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
603 	    name,
604 	    drbd_conn_str(ns.conn),
605 	    drbd_role_str(ns.role),
606 	    drbd_role_str(ns.peer),
607 	    drbd_disk_str(ns.disk),
608 	    drbd_disk_str(ns.pdsk),
609 	    ns.susp ? 's' : 'r',
610 	    ns.aftr_isp ? 'a' : '-',
611 	    ns.peer_isp ? 'p' : '-',
612 	    ns.user_isp ? 'u' : '-'
613 	    );
614 }
615 
616 void print_st_err(struct drbd_conf *mdev,
617 	union drbd_state os, union drbd_state ns, int err)
618 {
619 	if (err == SS_IN_TRANSIENT_STATE)
620 		return;
621 	dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
622 	print_st(mdev, " state", os);
623 	print_st(mdev, "wanted", ns);
624 }
625 
626 
627 #define drbd_peer_str drbd_role_str
628 #define drbd_pdsk_str drbd_disk_str
629 
630 #define drbd_susp_str(A)     ((A) ? "1" : "0")
631 #define drbd_aftr_isp_str(A) ((A) ? "1" : "0")
632 #define drbd_peer_isp_str(A) ((A) ? "1" : "0")
633 #define drbd_user_isp_str(A) ((A) ? "1" : "0")
634 
635 #define PSC(A) \
636 	({ if (ns.A != os.A) { \
637 		pbp += sprintf(pbp, #A "( %s -> %s ) ", \
638 			      drbd_##A##_str(os.A), \
639 			      drbd_##A##_str(ns.A)); \
640 	} })
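/* PSC() appends one "field( old -> new )" chunk per changed state component
 * to the log line built in __drbd_set_state(), giving output roughly like
 * (string names are illustrative):
 *
 *	conn( Connected -> SyncSource ) pdsk( UpToDate -> Inconsistent )
 */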
641 
642 /**
643  * is_valid_state() - Returns an SS_ error code if ns is not valid
644  * @mdev:	DRBD device.
645  * @ns:		State to consider.
646  */
647 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
648 {
649 	/* See drbd_state_sw_errors in drbd_strings.c */
650 
651 	enum drbd_fencing_p fp;
652 	int rv = SS_SUCCESS;
653 
654 	fp = FP_DONT_CARE;
655 	if (get_ldev(mdev)) {
656 		fp = mdev->ldev->dc.fencing;
657 		put_ldev(mdev);
658 	}
659 
660 	if (get_net_conf(mdev)) {
661 		if (!mdev->net_conf->two_primaries &&
662 		    ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
663 			rv = SS_TWO_PRIMARIES;
664 		put_net_conf(mdev);
665 	}
666 
667 	if (rv <= 0)
668 		/* already found a reason to abort */;
669 	else if (ns.role == R_SECONDARY && mdev->open_cnt)
670 		rv = SS_DEVICE_IN_USE;
671 
672 	else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
673 		rv = SS_NO_UP_TO_DATE_DISK;
674 
675 	else if (fp >= FP_RESOURCE &&
676 		 ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
677 		rv = SS_PRIMARY_NOP;
678 
679 	else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
680 		rv = SS_NO_UP_TO_DATE_DISK;
681 
682 	else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
683 		rv = SS_NO_LOCAL_DISK;
684 
685 	else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
686 		rv = SS_NO_REMOTE_DISK;
687 
688 	else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
689 		rv = SS_NO_UP_TO_DATE_DISK;
690 
691 	else if ((ns.conn == C_CONNECTED ||
692 		  ns.conn == C_WF_BITMAP_S ||
693 		  ns.conn == C_SYNC_SOURCE ||
694 		  ns.conn == C_PAUSED_SYNC_S) &&
695 		  ns.disk == D_OUTDATED)
696 		rv = SS_CONNECTED_OUTDATES;
697 
698 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
699 		 (mdev->sync_conf.verify_alg[0] == 0))
700 		rv = SS_NO_VERIFY_ALG;
701 
702 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
703 		  mdev->agreed_pro_version < 88)
704 		rv = SS_NOT_SUPPORTED;
705 
706 	return rv;
707 }
708 
709 /**
710  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
711  * @mdev:	DRBD device.
712  * @ns:		new state.
713  * @os:		old state.
714  */
715 static int is_valid_state_transition(struct drbd_conf *mdev,
716 				     union drbd_state ns, union drbd_state os)
717 {
718 	int rv = SS_SUCCESS;
719 
720 	if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
721 	    os.conn > C_CONNECTED)
722 		rv = SS_RESYNC_RUNNING;
723 
724 	if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
725 		rv = SS_ALREADY_STANDALONE;
726 
727 	if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
728 		rv = SS_IS_DISKLESS;
729 
730 	if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
731 		rv = SS_NO_NET_CONFIG;
732 
733 	if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
734 		rv = SS_LOWER_THAN_OUTDATED;
735 
736 	if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
737 		rv = SS_IN_TRANSIENT_STATE;
738 
739 	if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
740 		rv = SS_IN_TRANSIENT_STATE;
741 
742 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
743 		rv = SS_NEED_CONNECTION;
744 
745 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
746 	    ns.conn != os.conn && os.conn > C_CONNECTED)
747 		rv = SS_RESYNC_RUNNING;
748 
749 	if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
750 	    os.conn < C_CONNECTED)
751 		rv = SS_NEED_CONNECTION;
752 
753 	return rv;
754 }
755 
756 /**
757  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
758  * @mdev:	DRBD device.
759  * @os:		old state.
760  * @ns:		new state.
761  * @warn_sync_abort:	if non-NULL, set to 1 when an ongoing resync is implicitly aborted
762  *
763  * When we lose the connection, we have to set the state of the peer's disk (pdsk)
764  * to D_UNKNOWN. This rule and many more along those lines are in this function.
765  */
766 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
767 				       union drbd_state ns, int *warn_sync_abort)
768 {
769 	enum drbd_fencing_p fp;
770 
771 	fp = FP_DONT_CARE;
772 	if (get_ldev(mdev)) {
773 		fp = mdev->ldev->dc.fencing;
774 		put_ldev(mdev);
775 	}
776 
777 	/* Do not apply network-error states to a device whose network part is not configured */
778 	if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
779 	    os.conn <= C_DISCONNECTING)
780 		ns.conn = os.conn;
781 
782 	/* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow */
783 	if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
784 	    ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING)
785 		ns.conn = os.conn;
786 
787 	/* After C_DISCONNECTING only C_STANDALONE may follow */
788 	if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
789 		ns.conn = os.conn;
790 
791 	if (ns.conn < C_CONNECTED) {
792 		ns.peer_isp = 0;
793 		ns.peer = R_UNKNOWN;
794 		if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
795 			ns.pdsk = D_UNKNOWN;
796 	}
797 
798 	/* Clear the aftr_isp when becoming unconfigured */
799 	if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
800 		ns.aftr_isp = 0;
801 
802 	if (ns.conn <= C_DISCONNECTING && ns.disk == D_DISKLESS)
803 		ns.pdsk = D_UNKNOWN;
804 
805 	/* Abort resync if a disk fails/detaches */
806 	if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
807 	    (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
808 		if (warn_sync_abort)
809 			*warn_sync_abort = 1;
810 		ns.conn = C_CONNECTED;
811 	}
812 
813 	if (ns.conn >= C_CONNECTED &&
814 	    ((ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED) ||
815 	     (ns.disk == D_NEGOTIATING && ns.conn == C_WF_BITMAP_T))) {
816 		switch (ns.conn) {
817 		case C_WF_BITMAP_T:
818 		case C_PAUSED_SYNC_T:
819 			ns.disk = D_OUTDATED;
820 			break;
821 		case C_CONNECTED:
822 		case C_WF_BITMAP_S:
823 		case C_SYNC_SOURCE:
824 		case C_PAUSED_SYNC_S:
825 			ns.disk = D_UP_TO_DATE;
826 			break;
827 		case C_SYNC_TARGET:
828 			ns.disk = D_INCONSISTENT;
829 			dev_warn(DEV, "Implicitly set disk state Inconsistent!\n");
830 			break;
831 		}
832 		if (os.disk == D_OUTDATED && ns.disk == D_UP_TO_DATE)
833 			dev_warn(DEV, "Implicitly set disk from Outdated to UpToDate\n");
834 	}
835 
836 	if (ns.conn >= C_CONNECTED &&
837 	    (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)) {
838 		switch (ns.conn) {
839 		case C_CONNECTED:
840 		case C_WF_BITMAP_T:
841 		case C_PAUSED_SYNC_T:
842 		case C_SYNC_TARGET:
843 			ns.pdsk = D_UP_TO_DATE;
844 			break;
845 		case C_WF_BITMAP_S:
846 		case C_PAUSED_SYNC_S:
847 			/* remap any consistent state to D_OUTDATED,
848 			 * but disallow "upgrade" of not even consistent states.
849 			 */
850 			ns.pdsk =
851 				(D_DISKLESS < os.pdsk && os.pdsk < D_OUTDATED)
852 				? os.pdsk : D_OUTDATED;
853 			break;
854 		case C_SYNC_SOURCE:
855 			ns.pdsk = D_INCONSISTENT;
856 			dev_warn(DEV, "Implicitly set pdsk Inconsistent!\n");
857 			break;
858 		}
859 		if (os.pdsk == D_OUTDATED && ns.pdsk == D_UP_TO_DATE)
860 			dev_warn(DEV, "Implicitly set pdsk from Outdated to UpToDate\n");
861 	}
862 
863 	/* Connection breaks down before we finished "Negotiating" */
864 	if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
865 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
866 		if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
867 			ns.disk = mdev->new_state_tmp.disk;
868 			ns.pdsk = mdev->new_state_tmp.pdsk;
869 		} else {
870 			dev_alert(DEV, "Connection lost while negotiating, no data!\n");
871 			ns.disk = D_DISKLESS;
872 			ns.pdsk = D_UNKNOWN;
873 		}
874 		put_ldev(mdev);
875 	}
876 
877 	if (fp == FP_STONITH &&
878 	    (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
879 	    !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
880 		ns.susp = 1;
881 
882 	if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
883 		if (ns.conn == C_SYNC_SOURCE)
884 			ns.conn = C_PAUSED_SYNC_S;
885 		if (ns.conn == C_SYNC_TARGET)
886 			ns.conn = C_PAUSED_SYNC_T;
887 	} else {
888 		if (ns.conn == C_PAUSED_SYNC_S)
889 			ns.conn = C_SYNC_SOURCE;
890 		if (ns.conn == C_PAUSED_SYNC_T)
891 			ns.conn = C_SYNC_TARGET;
892 	}
893 
894 	return ns;
895 }
896 
897 /* helper for __drbd_set_state */
898 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
899 {
900 	if (cs == C_VERIFY_T) {
901 		/* starting online verify from an arbitrary position
902 		 * does not fit well into the existing protocol.
903 		 * on C_VERIFY_T, we initialize ov_left and friends
904 		 * implicitly in receive_DataRequest once the
905 		 * first P_OV_REQUEST is received */
906 		mdev->ov_start_sector = ~(sector_t)0;
907 	} else {
908 		unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
909 		if (bit >= mdev->rs_total)
910 			mdev->ov_start_sector =
911 				BM_BIT_TO_SECT(mdev->rs_total - 1);
912 		mdev->ov_position = mdev->ov_start_sector;
913 	}
914 }
915 
916 /**
917  * __drbd_set_state() - Set a new DRBD state
918  * @mdev:	DRBD device.
919  * @ns:		new state.
920  * @flags:	Flags
921  * @done:	Optional completion, that will get completed after the after_state_ch() finished
922  *
923  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
924  */
925 int __drbd_set_state(struct drbd_conf *mdev,
926 		    union drbd_state ns, enum chg_state_flags flags,
927 		    struct completion *done)
928 {
929 	union drbd_state os;
930 	int rv = SS_SUCCESS;
931 	int warn_sync_abort = 0;
932 	struct after_state_chg_work *ascw;
933 
934 	os = mdev->state;
935 
936 	ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
937 
938 	if (ns.i == os.i)
939 		return SS_NOTHING_TO_DO;
940 
941 	if (!(flags & CS_HARD)) {
942 		/*  pre-state-change checks ; only look at ns  */
943 		/* See drbd_state_sw_errors in drbd_strings.c */
944 
945 		rv = is_valid_state(mdev, ns);
946 		if (rv < SS_SUCCESS) {
947 			/* If the old state was illegal as well, then let
948 			   this happen...*/
949 
950 			if (is_valid_state(mdev, os) == rv) {
951 				dev_err(DEV, "Considering state change from bad state. "
952 				    "Error would be: '%s'\n",
953 				    drbd_set_st_err_str(rv));
954 				print_st(mdev, "old", os);
955 				print_st(mdev, "new", ns);
956 				rv = is_valid_state_transition(mdev, ns, os);
957 			}
958 		} else
959 			rv = is_valid_state_transition(mdev, ns, os);
960 	}
961 
962 	if (rv < SS_SUCCESS) {
963 		if (flags & CS_VERBOSE)
964 			print_st_err(mdev, os, ns, rv);
965 		return rv;
966 	}
967 
968 	if (warn_sync_abort)
969 		dev_warn(DEV, "Resync aborted.\n");
970 
971 	{
972 		char *pbp, pb[300];
973 		pbp = pb;
974 		*pbp = 0;
975 		PSC(role);
976 		PSC(peer);
977 		PSC(conn);
978 		PSC(disk);
979 		PSC(pdsk);
980 		PSC(susp);
981 		PSC(aftr_isp);
982 		PSC(peer_isp);
983 		PSC(user_isp);
984 		dev_info(DEV, "%s\n", pb);
985 	}
986 
987 	/* solve the race between becoming unconfigured,
988 	 * worker doing the cleanup, and
989 	 * admin reconfiguring us:
990 	 * on (re)configure, first set CONFIG_PENDING,
991 	 * then wait for a potentially exiting worker,
992 	 * start the worker, and schedule one no_op.
993 	 * then proceed with configuration.
994 	 */
995 	if (ns.disk == D_DISKLESS &&
996 	    ns.conn == C_STANDALONE &&
997 	    ns.role == R_SECONDARY &&
998 	    !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
999 		set_bit(DEVICE_DYING, &mdev->flags);
1000 
1001 	mdev->state.i = ns.i;
1002 	wake_up(&mdev->misc_wait);
1003 	wake_up(&mdev->state_wait);
1004 
1005 	/*   post-state-change actions   */
1006 	if (os.conn >= C_SYNC_SOURCE   && ns.conn <= C_CONNECTED) {
1007 		set_bit(STOP_SYNC_TIMER, &mdev->flags);
1008 		mod_timer(&mdev->resync_timer, jiffies);
1009 	}
1010 
1011 	/* aborted verify run. log the last position */
1012 	if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1013 	    ns.conn < C_CONNECTED) {
1014 		mdev->ov_start_sector =
1015 			BM_BIT_TO_SECT(mdev->rs_total - mdev->ov_left);
1016 		dev_info(DEV, "Online Verify reached sector %llu\n",
1017 			(unsigned long long)mdev->ov_start_sector);
1018 	}
1019 
1020 	if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1021 	    (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1022 		dev_info(DEV, "Syncer continues.\n");
1023 		mdev->rs_paused += (long)jiffies-(long)mdev->rs_mark_time;
1024 		if (ns.conn == C_SYNC_TARGET) {
1025 			if (!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))
1026 				mod_timer(&mdev->resync_timer, jiffies);
1027 			/* This if (!test_bit) is only needed for the case
1028 			   that a device that has ceased to use its timer,
1029 			   i.e. it is already in drbd_resync_finished() gets
1030 			   paused and resumed. */
1031 		}
1032 	}
1033 
1034 	if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1035 	    (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1036 		dev_info(DEV, "Resync suspended\n");
1037 		mdev->rs_mark_time = jiffies;
1038 		if (ns.conn == C_PAUSED_SYNC_T)
1039 			set_bit(STOP_SYNC_TIMER, &mdev->flags);
1040 	}
1041 
1042 	if (os.conn == C_CONNECTED &&
1043 	    (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1044 		mdev->ov_position = 0;
1045 		mdev->rs_total =
1046 		mdev->rs_mark_left = drbd_bm_bits(mdev);
1047 		if (mdev->agreed_pro_version >= 90)
1048 			set_ov_position(mdev, ns.conn);
1049 		else
1050 			mdev->ov_start_sector = 0;
1051 		mdev->ov_left = mdev->rs_total
1052 			      - BM_SECT_TO_BIT(mdev->ov_position);
1053 		mdev->rs_start     =
1054 		mdev->rs_mark_time = jiffies;
1055 		mdev->ov_last_oos_size = 0;
1056 		mdev->ov_last_oos_start = 0;
1057 
1058 		if (ns.conn == C_VERIFY_S) {
1059 			dev_info(DEV, "Starting Online Verify from sector %llu\n",
1060 					(unsigned long long)mdev->ov_position);
1061 			mod_timer(&mdev->resync_timer, jiffies);
1062 		}
1063 	}
1064 
1065 	if (get_ldev(mdev)) {
1066 		u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1067 						 MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1068 						 MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1069 
1070 		if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1071 			mdf |= MDF_CRASHED_PRIMARY;
1072 		if (mdev->state.role == R_PRIMARY ||
1073 		    (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1074 			mdf |= MDF_PRIMARY_IND;
1075 		if (mdev->state.conn > C_WF_REPORT_PARAMS)
1076 			mdf |= MDF_CONNECTED_IND;
1077 		if (mdev->state.disk > D_INCONSISTENT)
1078 			mdf |= MDF_CONSISTENT;
1079 		if (mdev->state.disk > D_OUTDATED)
1080 			mdf |= MDF_WAS_UP_TO_DATE;
1081 		if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1082 			mdf |= MDF_PEER_OUT_DATED;
1083 		if (mdf != mdev->ldev->md.flags) {
1084 			mdev->ldev->md.flags = mdf;
1085 			drbd_md_mark_dirty(mdev);
1086 		}
1087 		if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1088 			drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1089 		put_ldev(mdev);
1090 	}
1091 
1092 	/* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider resyncing */
1093 	if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1094 	    os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1095 		set_bit(CONSIDER_RESYNC, &mdev->flags);
1096 
1097 	/* Receiver should clean up itself */
1098 	if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1099 		drbd_thread_stop_nowait(&mdev->receiver);
1100 
1101 	/* Now the receiver finished cleaning up itself, it should die */
1102 	if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1103 		drbd_thread_stop_nowait(&mdev->receiver);
1104 
1105 	/* Upon network failure, we need to restart the receiver. */
1106 	if (os.conn > C_TEAR_DOWN &&
1107 	    ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1108 		drbd_thread_restart_nowait(&mdev->receiver);
1109 
1110 	ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1111 	if (ascw) {
1112 		ascw->os = os;
1113 		ascw->ns = ns;
1114 		ascw->flags = flags;
1115 		ascw->w.cb = w_after_state_ch;
1116 		ascw->done = done;
1117 		drbd_queue_work(&mdev->data.work, &ascw->w);
1118 	} else {
1119 		dev_warn(DEV, "Could not kmalloc an ascw\n");
1120 	}
1121 
1122 	return rv;
1123 }
1124 
1125 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1126 {
1127 	struct after_state_chg_work *ascw =
1128 		container_of(w, struct after_state_chg_work, w);
1129 	after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1130 	if (ascw->flags & CS_WAIT_COMPLETE) {
1131 		D_ASSERT(ascw->done != NULL);
1132 		complete(ascw->done);
1133 	}
1134 	kfree(ascw);
1135 
1136 	return 1;
1137 }
1138 
1139 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1140 {
1141 	if (rv) {
1142 		dev_err(DEV, "Writing the bitmap failed not starting resync.\n");
1143 		_drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1144 		return;
1145 	}
1146 
1147 	switch (mdev->state.conn) {
1148 	case C_STARTING_SYNC_T:
1149 		_drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1150 		break;
1151 	case C_STARTING_SYNC_S:
1152 		drbd_start_resync(mdev, C_SYNC_SOURCE);
1153 		break;
1154 	}
1155 }
1156 
1157 /**
1158  * after_state_ch() - Perform after state change actions that may sleep
1159  * @mdev:	DRBD device.
1160  * @os:		old state.
1161  * @ns:		new state.
1162  * @flags:	Flags
1163  */
1164 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1165 			   union drbd_state ns, enum chg_state_flags flags)
1166 {
1167 	enum drbd_fencing_p fp;
1168 
1169 	if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1170 		clear_bit(CRASHED_PRIMARY, &mdev->flags);
1171 		if (mdev->p_uuid)
1172 			mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1173 	}
1174 
1175 	fp = FP_DONT_CARE;
1176 	if (get_ldev(mdev)) {
1177 		fp = mdev->ldev->dc.fencing;
1178 		put_ldev(mdev);
1179 	}
1180 
1181 	/* Inform userspace about the change... */
1182 	drbd_bcast_state(mdev, ns);
1183 
1184 	if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1185 	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1186 		drbd_khelper(mdev, "pri-on-incon-degr");
1187 
1188 	/* Here we have the actions that are performed after a
1189 	   state change. This function might sleep */
1190 
1191 	if (fp == FP_STONITH && ns.susp) {
1192 		/* case1: The outdate peer handler is successful:
1193 		 * case2: The connection was established again: */
1194 		if ((os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) ||
1195 		    (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)) {
1196 			tl_clear(mdev);
1197 			spin_lock_irq(&mdev->req_lock);
1198 			_drbd_set_state(_NS(mdev, susp, 0), CS_VERBOSE, NULL);
1199 			spin_unlock_irq(&mdev->req_lock);
1200 		}
1201 	}
1202 	/* Do not change the order of the if above and the two below... */
1203 	if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1204 		drbd_send_uuids(mdev);
1205 		drbd_send_state(mdev);
1206 	}
1207 	if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S)
1208 		drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL, "send_bitmap (WFBitMapS)");
1209 
1210 	/* Lost contact to peer's copy of the data */
1211 	if ((os.pdsk >= D_INCONSISTENT &&
1212 	     os.pdsk != D_UNKNOWN &&
1213 	     os.pdsk != D_OUTDATED)
1214 	&&  (ns.pdsk < D_INCONSISTENT ||
1215 	     ns.pdsk == D_UNKNOWN ||
1216 	     ns.pdsk == D_OUTDATED)) {
1217 		if (get_ldev(mdev)) {
1218 			if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1219 			    mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1220 				drbd_uuid_new_current(mdev);
1221 				drbd_send_uuids(mdev);
1222 			}
1223 			put_ldev(mdev);
1224 		}
1225 	}
1226 
1227 	if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1228 		if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0)
1229 			drbd_uuid_new_current(mdev);
1230 
1231 		/* D_DISKLESS Peer becomes secondary */
1232 		if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1233 			drbd_al_to_on_disk_bm(mdev);
1234 		put_ldev(mdev);
1235 	}
1236 
1237 	/* Last part of the attaching process ... */
1238 	if (ns.conn >= C_CONNECTED &&
1239 	    os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1240 		drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1241 		drbd_send_uuids(mdev);
1242 		drbd_send_state(mdev);
1243 	}
1244 
1245 	/* We want to pause/continue resync, tell peer. */
1246 	if (ns.conn >= C_CONNECTED &&
1247 	     ((os.aftr_isp != ns.aftr_isp) ||
1248 	      (os.user_isp != ns.user_isp)))
1249 		drbd_send_state(mdev);
1250 
1251 	/* In case one of the isp bits got set, suspend other devices. */
1252 	if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1253 	    (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1254 		suspend_other_sg(mdev);
1255 
1256 	/* Make sure the peer gets informed about any state
1257 	   changes (ISP bits) that happened while we were in WFReportParams. */
1258 	if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1259 		drbd_send_state(mdev);
1260 
1261 	/* We are in the process of starting a full sync... */
1262 	if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1263 	    (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1264 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &abw_start_sync, "set_n_write from StartingSync");
1265 
1266 	/* We are invalidating ourselves... */
1267 	if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1268 	    os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1269 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
1270 
1271 	if (os.disk > D_FAILED && ns.disk == D_FAILED) {
1272 		enum drbd_io_error_p eh;
1273 
1274 		eh = EP_PASS_ON;
1275 		if (get_ldev_if_state(mdev, D_FAILED)) {
1276 			eh = mdev->ldev->dc.on_io_error;
1277 			put_ldev(mdev);
1278 		}
1279 
1280 		drbd_rs_cancel_all(mdev);
1281 		/* since get_ldev() only works as long as disk>=D_INCONSISTENT,
1282 		   and it is D_DISKLESS here, local_cnt can only go down, it can
1283 		   not increase... It will reach zero */
1284 		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1285 		mdev->rs_total = 0;
1286 		mdev->rs_failed = 0;
1287 		atomic_set(&mdev->rs_pending_cnt, 0);
1288 
1289 		spin_lock_irq(&mdev->req_lock);
1290 		_drbd_set_state(_NS(mdev, disk, D_DISKLESS), CS_HARD, NULL);
1291 		spin_unlock_irq(&mdev->req_lock);
1292 
1293 		if (eh == EP_CALL_HELPER)
1294 			drbd_khelper(mdev, "local-io-error");
1295 	}
1296 
1297 	if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {
1298 
1299 		if (os.disk == D_FAILED) /* && ns.disk == D_DISKLESS*/ {
1300 			if (drbd_send_state(mdev))
1301 				dev_warn(DEV, "Notified peer that my disk is broken.\n");
1302 			else
1303 				dev_err(DEV, "Sending state in drbd_io_error() failed\n");
1304 		}
1305 
1306 		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1307 		lc_destroy(mdev->resync);
1308 		mdev->resync = NULL;
1309 		lc_destroy(mdev->act_log);
1310 		mdev->act_log = NULL;
1311 		__no_warn(local,
1312 			drbd_free_bc(mdev->ldev);
1313 			mdev->ldev = NULL;);
1314 
1315 		if (mdev->md_io_tmpp)
1316 			__free_page(mdev->md_io_tmpp);
1317 	}
1318 
1319 	/* Disks got bigger while they were detached */
1320 	if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1321 	    test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1322 		if (ns.conn == C_CONNECTED)
1323 			resync_after_online_grow(mdev);
1324 	}
1325 
1326 	/* A resync finished or aborted, wake paused devices... */
1327 	if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1328 	    (os.peer_isp && !ns.peer_isp) ||
1329 	    (os.user_isp && !ns.user_isp))
1330 		resume_next_sg(mdev);
1331 
1332 	/* Upon network connection, we need to start the receiver */
1333 	if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1334 		drbd_thread_start(&mdev->receiver);
1335 
1336 	/* Terminate worker thread if we are unconfigured - it will be
1337 	   restarted as needed... */
1338 	if (ns.disk == D_DISKLESS &&
1339 	    ns.conn == C_STANDALONE &&
1340 	    ns.role == R_SECONDARY) {
1341 		if (os.aftr_isp != ns.aftr_isp)
1342 			resume_next_sg(mdev);
1343 		/* set in __drbd_set_state, unless CONFIG_PENDING was set */
1344 		if (test_bit(DEVICE_DYING, &mdev->flags))
1345 			drbd_thread_stop_nowait(&mdev->worker);
1346 	}
1347 
1348 	drbd_md_sync(mdev);
1349 }
1350 
1351 
1352 static int drbd_thread_setup(void *arg)
1353 {
1354 	struct drbd_thread *thi = (struct drbd_thread *) arg;
1355 	struct drbd_conf *mdev = thi->mdev;
1356 	unsigned long flags;
1357 	int retval;
1358 
1359 restart:
1360 	retval = thi->function(thi);
1361 
1362 	spin_lock_irqsave(&thi->t_lock, flags);
1363 
1364 	/* if the receiver has been "Exiting", the last thing it did
1365 	 * was set the conn state to "StandAlone",
1366 	 * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
1367 	 * and receiver thread will be "started".
1368 	 * drbd_thread_start needs to set "Restarting" in that case.
1369 	 * t_state check and assignment needs to be within the same spinlock,
1370 	 * so either thread_start sees Exiting, and can remap to Restarting,
1371 	 * or thread_start sees None, and can proceed as normal.
1372 	 */
1373 
1374 	if (thi->t_state == Restarting) {
1375 		dev_info(DEV, "Restarting %s\n", current->comm);
1376 		thi->t_state = Running;
1377 		spin_unlock_irqrestore(&thi->t_lock, flags);
1378 		goto restart;
1379 	}
1380 
1381 	thi->task = NULL;
1382 	thi->t_state = None;
1383 	smp_mb();
1384 	complete(&thi->stop);
1385 	spin_unlock_irqrestore(&thi->t_lock, flags);
1386 
1387 	dev_info(DEV, "Terminating %s\n", current->comm);
1388 
1389 	/* Release mod reference taken when thread was started */
1390 	module_put(THIS_MODULE);
1391 	return retval;
1392 }
1393 
1394 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1395 		      int (*func) (struct drbd_thread *))
1396 {
1397 	spin_lock_init(&thi->t_lock);
1398 	thi->task    = NULL;
1399 	thi->t_state = None;
1400 	thi->function = func;
1401 	thi->mdev = mdev;
1402 }
1403 
1404 int drbd_thread_start(struct drbd_thread *thi)
1405 {
1406 	struct drbd_conf *mdev = thi->mdev;
1407 	struct task_struct *nt;
1408 	unsigned long flags;
1409 
1410 	const char *me =
1411 		thi == &mdev->receiver ? "receiver" :
1412 		thi == &mdev->asender  ? "asender"  :
1413 		thi == &mdev->worker   ? "worker"   : "NONSENSE";
1414 
1415 	/* is used from state engine doing drbd_thread_stop_nowait,
1416 	 * while holding the req lock irqsave */
1417 	spin_lock_irqsave(&thi->t_lock, flags);
1418 
1419 	switch (thi->t_state) {
1420 	case None:
1421 		dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1422 				me, current->comm, current->pid);
1423 
1424 		/* Get ref on module for thread - this is released when thread exits */
1425 		if (!try_module_get(THIS_MODULE)) {
1426 			dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1427 			spin_unlock_irqrestore(&thi->t_lock, flags);
1428 			return FALSE;
1429 		}
1430 
1431 		init_completion(&thi->stop);
1432 		D_ASSERT(thi->task == NULL);
1433 		thi->reset_cpu_mask = 1;
1434 		thi->t_state = Running;
1435 		spin_unlock_irqrestore(&thi->t_lock, flags);
1436 		flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
1437 
1438 		nt = kthread_create(drbd_thread_setup, (void *) thi,
1439 				    "drbd%d_%s", mdev_to_minor(mdev), me);
1440 
1441 		if (IS_ERR(nt)) {
1442 			dev_err(DEV, "Couldn't start thread\n");
1443 
1444 			module_put(THIS_MODULE);
1445 			return FALSE;
1446 		}
1447 		spin_lock_irqsave(&thi->t_lock, flags);
1448 		thi->task = nt;
1449 		thi->t_state = Running;
1450 		spin_unlock_irqrestore(&thi->t_lock, flags);
1451 		wake_up_process(nt);
1452 		break;
1453 	case Exiting:
1454 		thi->t_state = Restarting;
1455 		dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1456 				me, current->comm, current->pid);
1457 		/* fall through */
1458 	case Running:
1459 	case Restarting:
1460 	default:
1461 		spin_unlock_irqrestore(&thi->t_lock, flags);
1462 		break;
1463 	}
1464 
1465 	return TRUE;
1466 }
1467 
1468 
1469 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1470 {
1471 	unsigned long flags;
1472 
1473 	enum drbd_thread_state ns = restart ? Restarting : Exiting;
1474 
1475 	/* may be called from state engine, holding the req lock irqsave */
1476 	spin_lock_irqsave(&thi->t_lock, flags);
1477 
1478 	if (thi->t_state == None) {
1479 		spin_unlock_irqrestore(&thi->t_lock, flags);
1480 		if (restart)
1481 			drbd_thread_start(thi);
1482 		return;
1483 	}
1484 
1485 	if (thi->t_state != ns) {
1486 		if (thi->task == NULL) {
1487 			spin_unlock_irqrestore(&thi->t_lock, flags);
1488 			return;
1489 		}
1490 
1491 		thi->t_state = ns;
1492 		smp_mb();
1493 		init_completion(&thi->stop);
1494 		if (thi->task != current)
1495 			force_sig(DRBD_SIGKILL, thi->task);
1496 
1497 	}
1498 
1499 	spin_unlock_irqrestore(&thi->t_lock, flags);
1500 
1501 	if (wait)
1502 		wait_for_completion(&thi->stop);
1503 }
1504 
1505 #ifdef CONFIG_SMP
1506 /**
1507  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1508  * @mdev:	DRBD device.
1509  *
1510  * Forces all threads of a device onto the same CPU. This is beneficial for
1511  * DRBD's performance. May be overridden by the user's configuration.
1512  */
1513 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1514 {
1515 	int ord, cpu;
1516 
1517 	/* user override. */
1518 	if (cpumask_weight(mdev->cpu_mask))
1519 		return;
1520 
1521 	ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1522 	for_each_online_cpu(cpu) {
1523 		if (ord-- == 0) {
1524 			cpumask_set_cpu(cpu, mdev->cpu_mask);
1525 			return;
1526 		}
1527 	}
1528 	/* should not be reached */
1529 	cpumask_setall(mdev->cpu_mask);
1530 }
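/* Example (illustrative): with four online CPUs, minors 0..3 are pinned to
 * CPUs 0..3 respectively, minor 4 wraps around to CPU 0, and so on -- unless
 * the user configured an explicit cpu_mask, which is honored above. */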
1531 
1532 /**
1533  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1534  * @mdev:	DRBD device.
1535  *
1536  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
1537  * prematurely.
1538  */
1539 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1540 {
1541 	struct task_struct *p = current;
1542 	struct drbd_thread *thi =
1543 		p == mdev->asender.task  ? &mdev->asender  :
1544 		p == mdev->receiver.task ? &mdev->receiver :
1545 		p == mdev->worker.task   ? &mdev->worker   :
1546 		NULL;
1547 	ERR_IF(thi == NULL)
1548 		return;
1549 	if (!thi->reset_cpu_mask)
1550 		return;
1551 	thi->reset_cpu_mask = 0;
1552 	set_cpus_allowed_ptr(p, mdev->cpu_mask);
1553 }
1554 #endif
1555 
1556 /* the appropriate socket mutex must be held already */
1557 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1558 			  enum drbd_packets cmd, struct p_header *h,
1559 			  size_t size, unsigned msg_flags)
1560 {
1561 	int sent, ok;
1562 
1563 	ERR_IF(!h) return FALSE;
1564 	ERR_IF(!size) return FALSE;
1565 
1566 	h->magic   = BE_DRBD_MAGIC;
1567 	h->command = cpu_to_be16(cmd);
1568 	h->length  = cpu_to_be16(size-sizeof(struct p_header));
1569 
1570 	sent = drbd_send(mdev, sock, h, size, msg_flags);
1571 
1572 	ok = (sent == size);
1573 	if (!ok)
1574 		dev_err(DEV, "short sent %s size=%d sent=%d\n",
1575 		    cmdname(cmd), (int)size, sent);
1576 	return ok;
1577 }
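/* On the wire this yields, in big endian (field widths assumed from the
 * conversions above and the usual struct p_header layout):
 *
 *	u32 magic	BE_DRBD_MAGIC
 *	u16 command	enum drbd_packets value
 *	u16 length	payload size, i.e. size - sizeof(struct p_header)
 *
 * followed by the packet-specific payload. */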
1578 
1579 /* don't pass the socket. we may only look at it
1580  * when we hold the appropriate socket mutex.
1581  */
1582 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1583 		  enum drbd_packets cmd, struct p_header *h, size_t size)
1584 {
1585 	int ok = 0;
1586 	struct socket *sock;
1587 
1588 	if (use_data_socket) {
1589 		mutex_lock(&mdev->data.mutex);
1590 		sock = mdev->data.socket;
1591 	} else {
1592 		mutex_lock(&mdev->meta.mutex);
1593 		sock = mdev->meta.socket;
1594 	}
1595 
1596 	/* drbd_disconnect() could have called drbd_free_sock()
1597 	 * while we were waiting in down()... */
1598 	if (likely(sock != NULL))
1599 		ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1600 
1601 	if (use_data_socket)
1602 		mutex_unlock(&mdev->data.mutex);
1603 	else
1604 		mutex_unlock(&mdev->meta.mutex);
1605 	return ok;
1606 }
1607 
1608 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1609 		   size_t size)
1610 {
1611 	struct p_header h;
1612 	int ok;
1613 
1614 	h.magic   = BE_DRBD_MAGIC;
1615 	h.command = cpu_to_be16(cmd);
1616 	h.length  = cpu_to_be16(size);
1617 
1618 	if (!drbd_get_data_sock(mdev))
1619 		return 0;
1620 
1621 	ok = (sizeof(h) ==
1622 		drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1623 	ok = ok && (size ==
1624 		drbd_send(mdev, mdev->data.socket, data, size, 0));
1625 
1626 	drbd_put_data_sock(mdev);
1627 
1628 	return ok;
1629 }
1630 
1631 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1632 {
1633 	struct p_rs_param_89 *p;
1634 	struct socket *sock;
1635 	int size, rv;
1636 	const int apv = mdev->agreed_pro_version;
1637 
1638 	size = apv <= 87 ? sizeof(struct p_rs_param)
1639 		: apv == 88 ? sizeof(struct p_rs_param)
1640 			+ strlen(mdev->sync_conf.verify_alg) + 1
1641 		: /* 89 */    sizeof(struct p_rs_param_89);
1642 
1643 	/* used from admin command context and receiver/worker context.
1644 	 * to avoid kmalloc, grab the socket right here,
1645 	 * then use the pre-allocated sbuf there */
1646 	mutex_lock(&mdev->data.mutex);
1647 	sock = mdev->data.socket;
1648 
1649 	if (likely(sock != NULL)) {
1650 		enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1651 
1652 		p = &mdev->data.sbuf.rs_param_89;
1653 
1654 		/* initialize verify_alg and csums_alg */
1655 		memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1656 
1657 		p->rate = cpu_to_be32(sc->rate);
1658 
1659 		if (apv >= 88)
1660 			strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1661 		if (apv >= 89)
1662 			strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1663 
1664 		rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1665 	} else
1666 		rv = 0; /* not ok */
1667 
1668 	mutex_unlock(&mdev->data.mutex);
1669 
1670 	return rv;
1671 }
1672 
1673 int drbd_send_protocol(struct drbd_conf *mdev)
1674 {
1675 	struct p_protocol *p;
1676 	int size, cf, rv;
1677 
1678 	size = sizeof(struct p_protocol);
1679 
1680 	if (mdev->agreed_pro_version >= 87)
1681 		size += strlen(mdev->net_conf->integrity_alg) + 1;
1682 
1683 	/* we must not recurse into our own queue,
1684 	 * as that is blocked during handshake */
1685 	p = kmalloc(size, GFP_NOIO);
1686 	if (p == NULL)
1687 		return 0;
1688 
1689 	p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
1690 	p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
1691 	p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
1692 	p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
1693 	p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
1694 
1695 	cf = 0;
1696 	if (mdev->net_conf->want_lose)
1697 		cf |= CF_WANT_LOSE;
1698 	if (mdev->net_conf->dry_run) {
1699 		if (mdev->agreed_pro_version >= 92)
1700 			cf |= CF_DRY_RUN;
1701 		else {
1702 			dev_err(DEV, "--dry-run is not supported by peer");
1703 			kfree(p);
1704 			return 0;
1705 		}
1706 	}
1707 	p->conn_flags    = cpu_to_be32(cf);
1708 
1709 	if (mdev->agreed_pro_version >= 87)
1710 		strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
1711 
1712 	rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1713 			   (struct p_header *)p, size);
1714 	kfree(p);
1715 	return rv;
1716 }
1717 
1718 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1719 {
1720 	struct p_uuids p;
1721 	int i;
1722 
1723 	if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1724 		return 1;
1725 
1726 	for (i = UI_CURRENT; i < UI_SIZE; i++)
1727 		p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1728 
1729 	mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1730 	p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1731 	uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
1732 	uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1733 	uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1734 	p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1735 
1736 	put_ldev(mdev);
1737 
1738 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
1739 			     (struct p_header *)&p, sizeof(p));
1740 }
1741 
1742 int drbd_send_uuids(struct drbd_conf *mdev)
1743 {
1744 	return _drbd_send_uuids(mdev, 0);
1745 }
1746 
1747 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1748 {
1749 	return _drbd_send_uuids(mdev, 8);
1750 }
1751 
1752 
1753 int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val)
1754 {
1755 	struct p_rs_uuid p;
1756 
1757 	p.uuid = cpu_to_be64(val);
1758 
1759 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
1760 			     (struct p_header *)&p, sizeof(p));
1761 }
1762 
1763 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1764 {
1765 	struct p_sizes p;
1766 	sector_t d_size, u_size;
1767 	int q_order_type;
1768 	int ok;
1769 
1770 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1771 		D_ASSERT(mdev->ldev->backing_bdev);
1772 		d_size = drbd_get_max_capacity(mdev->ldev);
1773 		u_size = mdev->ldev->dc.disk_size;
1774 		q_order_type = drbd_queue_order_type(mdev);
1775 		put_ldev(mdev);
1776 	} else {
1777 		d_size = 0;
1778 		u_size = 0;
1779 		q_order_type = QUEUE_ORDERED_NONE;
1780 	}
1781 
1782 	p.d_size = cpu_to_be64(d_size);
1783 	p.u_size = cpu_to_be64(u_size);
1784 	p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1785 	p.max_segment_size = cpu_to_be32(queue_max_segment_size(mdev->rq_queue));
1786 	p.queue_order_type = cpu_to_be16(q_order_type);
1787 	p.dds_flags = cpu_to_be16(flags);
1788 
1789 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
1790 			   (struct p_header *)&p, sizeof(p));
1791 	return ok;
1792 }
1793 
1794 /**
1795  * drbd_send_state() - Sends the drbd state to the peer
1796  * @mdev:	DRBD device.
1797  */
1798 int drbd_send_state(struct drbd_conf *mdev)
1799 {
1800 	struct socket *sock;
1801 	struct p_state p;
1802 	int ok = 0;
1803 
1804 	/* Grab state lock so we won't send state if we're in the middle
1805 	 * of a cluster wide state change on another thread */
1806 	drbd_state_lock(mdev);
1807 
1808 	mutex_lock(&mdev->data.mutex);
1809 
1810 	p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1811 	sock = mdev->data.socket;
1812 
1813 	if (likely(sock != NULL)) {
1814 		ok = _drbd_send_cmd(mdev, sock, P_STATE,
1815 				    (struct p_header *)&p, sizeof(p), 0);
1816 	}
1817 
1818 	mutex_unlock(&mdev->data.mutex);
1819 
1820 	drbd_state_unlock(mdev);
1821 	return ok;
1822 }
1823 
1824 int drbd_send_state_req(struct drbd_conf *mdev,
1825 	union drbd_state mask, union drbd_state val)
1826 {
1827 	struct p_req_state p;
1828 
1829 	p.mask    = cpu_to_be32(mask.i);
1830 	p.val     = cpu_to_be32(val.i);
1831 
1832 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
1833 			     (struct p_header *)&p, sizeof(p));
1834 }
1835 
1836 int drbd_send_sr_reply(struct drbd_conf *mdev, int retcode)
1837 {
1838 	struct p_req_state_reply p;
1839 
1840 	p.retcode    = cpu_to_be32(retcode);
1841 
1842 	return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
1843 			     (struct p_header *)&p, sizeof(p));
1844 }
1845 
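/* Run-length encode a chunk of the bitmap into p->code, using the VLI
 * scheme from drbd_vli.h.  Alternating runs of clear/set bits are found
 * starting at c->bit_offset and their lengths are vli-encoded until the
 * BM_PACKET_VLI_BYTES_MAX buffer is full.
 * Returns the number of code bytes produced, 0 if RLE should not be used
 * (peer too old, use_rle disabled, nothing to do, encoding error, or the
 * chunk turned out to be incompressible), or -1 if the bitmap changed
 * under us (unexpected zero run length). */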
1846 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1847 	struct p_compressed_bm *p,
1848 	struct bm_xfer_ctx *c)
1849 {
1850 	struct bitstream bs;
1851 	unsigned long plain_bits;
1852 	unsigned long tmp;
1853 	unsigned long rl;
1854 	unsigned len;
1855 	unsigned toggle;
1856 	int bits;
1857 
1858 	/* may we use this feature? */
1859 	if ((mdev->sync_conf.use_rle == 0) ||
1860 		(mdev->agreed_pro_version < 90))
1861 			return 0;
1862 
1863 	if (c->bit_offset >= c->bm_bits)
1864 		return 0; /* nothing to do. */
1865 
1866 	/* use at most this many bytes */
1867 	bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1868 	memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1869 	/* plain bits covered in this code string */
1870 	plain_bits = 0;
1871 
1872 	/* p->encoding & 0x80 stores whether the first run length is set.
1873 	 * bit offset is implicit.
1874 	 * start with toggle == 2 to be able to tell the first iteration */
1875 	toggle = 2;
1876 
1877 	/* see how many plain bits we can stuff into one packet
1878 	 * using RLE and VLI. */
1879 	do {
1880 		tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1881 				    : _drbd_bm_find_next(mdev, c->bit_offset);
1882 		if (tmp == -1UL)
1883 			tmp = c->bm_bits;
1884 		rl = tmp - c->bit_offset;
1885 
1886 		if (toggle == 2) { /* first iteration */
1887 			if (rl == 0) {
1888 				/* the first checked bit was set,
1889 				 * store start value, */
1890 				DCBP_set_start(p, 1);
1891 				/* but skip encoding of zero run length */
1892 				toggle = !toggle;
1893 				continue;
1894 			}
1895 			DCBP_set_start(p, 0);
1896 		}
1897 
1898 		/* paranoia: catch zero runlength.
1899 		 * can only happen if bitmap is modified while we scan it. */
1900 		if (rl == 0) {
1901 			dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1902 			    "t:%u bo:%lu\n", toggle, c->bit_offset);
1903 			return -1;
1904 		}
1905 
1906 		bits = vli_encode_bits(&bs, rl);
1907 		if (bits == -ENOBUFS) /* buffer full */
1908 			break;
1909 		if (bits <= 0) {
1910 			dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1911 			return 0;
1912 		}
1913 
1914 		toggle = !toggle;
1915 		plain_bits += rl;
1916 		c->bit_offset = tmp;
1917 	} while (c->bit_offset < c->bm_bits);
1918 
1919 	len = bs.cur.b - p->code + !!bs.cur.bit;
1920 
1921 	if (plain_bits < (len << 3)) {
1922 		/* incompressible with this method.
1923 		 * we need to rewind both word and bit position. */
1924 		c->bit_offset -= plain_bits;
1925 		bm_xfer_ctx_bit_to_word_offset(c);
1926 		c->bit_offset = c->word_offset * BITS_PER_LONG;
1927 		return 0;
1928 	}
1929 
1930 	/* RLE + VLI was able to compress it just fine.
1931 	 * update c->word_offset. */
1932 	bm_xfer_ctx_bit_to_word_offset(c);
1933 
1934 	/* store pad_bits */
1935 	DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1936 
1937 	return len;
1938 }
1939 
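/* Send one bitmap packet: compressed (P_COMPRESSED_BITMAP) if the RLE/VLI
 * encoding pays off, otherwise as plain little endian words (P_BITMAP).
 * Advances *c accordingly and returns OK while there is more to send,
 * DONE once the whole bitmap has been transferred, or FAILED on error. */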
1940 enum { OK, FAILED, DONE }
1941 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
1942 	struct p_header *h, struct bm_xfer_ctx *c)
1943 {
1944 	struct p_compressed_bm *p = (void*)h;
1945 	unsigned long num_words;
1946 	int len;
1947 	int ok;
1948 
1949 	len = fill_bitmap_rle_bits(mdev, p, c);
1950 
1951 	if (len < 0)
1952 		return FAILED;
1953 
1954 	if (len) {
1955 		DCBP_set_code(p, RLE_VLI_Bits);
1956 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
1957 			sizeof(*p) + len, 0);
1958 
1959 		c->packets[0]++;
1960 		c->bytes[0] += sizeof(*p) + len;
1961 
1962 		if (c->bit_offset >= c->bm_bits)
1963 			len = 0; /* DONE */
1964 	} else {
1965 		/* was not compressible.
1966 		 * send a buffer full of plain text bits instead. */
1967 		num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
1968 		len = num_words * sizeof(long);
1969 		if (len)
1970 			drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
1971 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
1972 				   h, sizeof(struct p_header) + len, 0);
1973 		c->word_offset += num_words;
1974 		c->bit_offset = c->word_offset * BITS_PER_LONG;
1975 
1976 		c->packets[1]++;
1977 		c->bytes[1] += sizeof(struct p_header) + len;
1978 
1979 		if (c->bit_offset > c->bm_bits)
1980 			c->bit_offset = c->bm_bits;
1981 	}
1982 	ok = ok ? ((len == 0) ? DONE : OK) : FAILED;
1983 
1984 	if (ok == DONE)
1985 		INFO_bm_xfer_stats(mdev, "send", c);
1986 	return ok;
1987 }
1988 
1989 /* See the comment at receive_bitmap() */
1990 int _drbd_send_bitmap(struct drbd_conf *mdev)
1991 {
1992 	struct bm_xfer_ctx c;
1993 	struct p_header *p;
1994 	int ret;
1995 
1996 	ERR_IF(!mdev->bitmap) return FALSE;
1997 
1998 	/* maybe we should use some per thread scratch page,
1999 	 * and allocate that during initial device creation? */
2000 	p = (struct p_header *) __get_free_page(GFP_NOIO);
2001 	if (!p) {
2002 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2003 		return FALSE;
2004 	}
2005 
2006 	if (get_ldev(mdev)) {
2007 		if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2008 			dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2009 			drbd_bm_set_all(mdev);
2010 			if (drbd_bm_write(mdev)) {
2011 				/* write_bm did fail! Leave full sync flag set in Meta P_DATA
2012 				 * but otherwise process as per normal - need to tell other
2013 				 * side that a full resync is required! */
2014 				dev_err(DEV, "Failed to write bitmap to disk!\n");
2015 			} else {
2016 				drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2017 				drbd_md_sync(mdev);
2018 			}
2019 		}
2020 		put_ldev(mdev);
2021 	}
2022 
2023 	c = (struct bm_xfer_ctx) {
2024 		.bm_bits = drbd_bm_bits(mdev),
2025 		.bm_words = drbd_bm_words(mdev),
2026 	};
2027 
2028 	do {
2029 		ret = send_bitmap_rle_or_plain(mdev, p, &c);
2030 	} while (ret == OK);
2031 
2032 	free_page((unsigned long) p);
2033 	return (ret == DONE);
2034 }
2035 
2036 int drbd_send_bitmap(struct drbd_conf *mdev)
2037 {
2038 	int err;
2039 
2040 	if (!drbd_get_data_sock(mdev))
2041 		return -1;
2042 	err = !_drbd_send_bitmap(mdev);
2043 	drbd_put_data_sock(mdev);
2044 	return err;
2045 }
2046 
2047 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2048 {
2049 	int ok;
2050 	struct p_barrier_ack p;
2051 
2052 	p.barrier  = barrier_nr;
2053 	p.set_size = cpu_to_be32(set_size);
2054 
2055 	if (mdev->state.conn < C_CONNECTED)
2056 		return FALSE;
2057 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2058 			(struct p_header *)&p, sizeof(p));
2059 	return ok;
2060 }
2061 
2062 /**
2063  * _drbd_send_ack() - Sends an ack packet
2064  * @mdev:	DRBD device.
2065  * @cmd:	Packet command code.
2066  * @sector:	sector, needs to be in big endian byte order
2067  * @blksize:	size in bytes, needs to be in big endian byte order
2068  * @block_id:	Id, big endian byte order
2069  */
2070 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2071 			  u64 sector,
2072 			  u32 blksize,
2073 			  u64 block_id)
2074 {
2075 	int ok;
2076 	struct p_block_ack p;
2077 
2078 	p.sector   = sector;
2079 	p.block_id = block_id;
2080 	p.blksize  = blksize;
2081 	p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2082 
2083 	if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2084 		return FALSE;
2085 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2086 				(struct p_header *)&p, sizeof(p));
2087 	return ok;
2088 }
2089 
2090 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2091 		     struct p_data *dp)
2092 {
2093 	const int header_size = sizeof(struct p_data)
2094 			      - sizeof(struct p_header);
2095 	int data_size  = ((struct p_header *)dp)->length - header_size;
2096 
2097 	return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2098 			      dp->block_id);
2099 }
2100 
2101 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2102 		     struct p_block_req *rp)
2103 {
2104 	return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2105 }
2106 
2107 /**
2108  * drbd_send_ack() - Sends an ack packet
2109  * @mdev:	DRBD device.
2110  * @cmd:	Packet command code.
2111  * @e:		Epoch entry.
2112  */
2113 int drbd_send_ack(struct drbd_conf *mdev,
2114 	enum drbd_packets cmd, struct drbd_epoch_entry *e)
2115 {
2116 	return _drbd_send_ack(mdev, cmd,
2117 			      cpu_to_be64(e->sector),
2118 			      cpu_to_be32(e->size),
2119 			      e->block_id);
2120 }
2121 
2122 /* This function misuses the block_id field to signal if the blocks
2123  * are in sync or not. */
2124 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2125 		     sector_t sector, int blksize, u64 block_id)
2126 {
2127 	return _drbd_send_ack(mdev, cmd,
2128 			      cpu_to_be64(sector),
2129 			      cpu_to_be32(blksize),
2130 			      cpu_to_be64(block_id));
2131 }
2132 
2133 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2134 		       sector_t sector, int size, u64 block_id)
2135 {
2136 	int ok;
2137 	struct p_block_req p;
2138 
2139 	p.sector   = cpu_to_be64(sector);
2140 	p.block_id = block_id;
2141 	p.blksize  = cpu_to_be32(size);
2142 
2143 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2144 				(struct p_header *)&p, sizeof(p));
2145 	return ok;
2146 }
2147 
2148 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2149 			    sector_t sector, int size,
2150 			    void *digest, int digest_size,
2151 			    enum drbd_packets cmd)
2152 {
2153 	int ok;
2154 	struct p_block_req p;
2155 
2156 	p.sector   = cpu_to_be64(sector);
2157 	p.block_id = BE_DRBD_MAGIC + 0xbeef;
2158 	p.blksize  = cpu_to_be32(size);
2159 
2160 	p.head.magic   = BE_DRBD_MAGIC;
2161 	p.head.command = cpu_to_be16(cmd);
2162 	p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header) + digest_size);
2163 
2164 	mutex_lock(&mdev->data.mutex);
2165 
2166 	ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2167 	ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2168 
2169 	mutex_unlock(&mdev->data.mutex);
2170 
2171 	return ok;
2172 }
2173 
2174 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2175 {
2176 	int ok;
2177 	struct p_block_req p;
2178 
2179 	p.sector   = cpu_to_be64(sector);
2180 	p.block_id = BE_DRBD_MAGIC + 0xbabe;
2181 	p.blksize  = cpu_to_be32(size);
2182 
2183 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2184 			   (struct p_header *)&p, sizeof(p));
2185 	return ok;
2186 }
2187 
2188 /* called on sndtimeo
2189  * returns FALSE if we should retry,
2190  * TRUE if we think connection is dead
2191  * TRUE if we think the connection is dead
2192 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2193 {
2194 	int drop_it;
2195 	/* long elapsed = (long)(jiffies - mdev->last_received); */
2196 
2197 	drop_it =   mdev->meta.socket == sock
2198 		|| !mdev->asender.task
2199 		|| get_t_state(&mdev->asender) != Running
2200 		|| mdev->state.conn < C_CONNECTED;
2201 
2202 	if (drop_it)
2203 		return TRUE;
2204 
2205 	drop_it = !--mdev->ko_count;
2206 	if (!drop_it) {
2207 		dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2208 		       current->comm, current->pid, mdev->ko_count);
2209 		request_ping(mdev);
2210 	}
2211 
2212 	return drop_it; /* && (mdev->state == R_PRIMARY) */;
2213 }
2214 
2215 /* The idea of sendpage seems to be to put some kind of reference
2216  * to the page into the skb, and to hand it over to the NIC. In
2217  * this process get_page() gets called.
2218  *
2219  * As soon as the page was really sent over the network put_page()
2220  * gets called by some part of the network layer. [ NIC driver? ]
2221  *
2222  * [ get_page() / put_page() increment/decrement the count. If count
2223  *   reaches 0 the page will be freed. ]
2224  *
2225  * This works nicely with pages from FSs.
2226  * But this means that in protocol A we might signal IO completion too early!
2227  *
2228  * In order not to corrupt data during a resync we must make sure
2229  * that we do not reuse our own buffer pages (EEs) too early, therefore
2230  * we have the net_ee list.
2231  *
2232  * XFS seems to have problems, still, it submits pages with page_count == 0!
2233  * As a workaround, we disable sendpage on pages
2234  * with page_count == 0 or PageSlab.
2235  */
2236 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2237 		   int offset, size_t size, unsigned msg_flags)
2238 {
2239 	int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, msg_flags);
2240 	kunmap(page);
2241 	if (sent == size)
2242 		mdev->send_cnt += size>>9;
2243 	return sent == size;
2244 }
2245 
2246 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2247 		    int offset, size_t size, unsigned msg_flags)
2248 {
2249 	mm_segment_t oldfs = get_fs();
2250 	int sent, ok;
2251 	int len = size;
2252 
2253 	/* e.g. XFS meta- & log-data is in slab pages, which have a
2254 	 * page_count of 0 and/or have PageSlab() set.
2255 	 * we cannot use send_page for those, as that does get_page();
2256 	 * put_page(); and would cause either a VM_BUG directly, or
2257 	 * __page_cache_release a page that would actually still be referenced
2258 	 * by someone, leading to some obscure delayed Oops somewhere else. */
2259 	if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2260 		return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
2261 
2262 	msg_flags |= MSG_NOSIGNAL;
2263 	drbd_update_congested(mdev);
2264 	set_fs(KERNEL_DS);
2265 	do {
2266 		sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2267 							offset, len,
2268 							msg_flags);
2269 		if (sent == -EAGAIN) {
2270 			if (we_should_drop_the_connection(mdev,
2271 							  mdev->data.socket))
2272 				break;
2273 			else
2274 				continue;
2275 		}
2276 		if (sent <= 0) {
2277 			dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2278 			     __func__, (int)size, len, sent);
2279 			break;
2280 		}
2281 		len    -= sent;
2282 		offset += sent;
2283 	} while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2284 	set_fs(oldfs);
2285 	clear_bit(NET_CONGESTED, &mdev->flags);
2286 
2287 	ok = (len == 0);
2288 	if (likely(ok))
2289 		mdev->send_cnt += size>>9;
2290 	return ok;
2291 }
2292 
2293 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2294 {
2295 	struct bio_vec *bvec;
2296 	int i;
2297 	/* hint all but last page with MSG_MORE */
2298 	__bio_for_each_segment(bvec, bio, i, 0) {
2299 		if (!_drbd_no_send_page(mdev, bvec->bv_page,
2300 				     bvec->bv_offset, bvec->bv_len,
2301 				     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2302 			return 0;
2303 	}
2304 	return 1;
2305 }
2306 
2307 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2308 {
2309 	struct bio_vec *bvec;
2310 	int i;
2311 	/* hint all but last page with MSG_MORE */
2312 	__bio_for_each_segment(bvec, bio, i, 0) {
2313 		if (!_drbd_send_page(mdev, bvec->bv_page,
2314 				     bvec->bv_offset, bvec->bv_len,
2315 				     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2316 			return 0;
2317 	}
2318 	return 1;
2319 }
2320 
2321 static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2322 {
2323 	struct page *page = e->pages;
2324 	unsigned len = e->size;
2325 	/* hint all but last page with MSG_MORE */
2326 	page_chain_for_each(page) {
2327 		unsigned l = min_t(unsigned, len, PAGE_SIZE);
2328 		if (!_drbd_send_page(mdev, page, 0, l,
2329 				page_chain_next(page) ? MSG_MORE : 0))
2330 			return 0;
2331 		len -= l;
2332 	}
2333 	return 1;
2334 }
2335 
2336 /* Used to send write requests
2337  * R_PRIMARY -> Peer	(P_DATA)
2338  */
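/* The header (and, if integrity checking is configured, the digest over the
 * bio) is sent first, the payload follows.  For protocol A the payload is
 * copied via _drbd_send_bio() rather than zero-copied, since protocol A may
 * complete the master bio before the pages have left the TCP stack (see the
 * sendpage comment above); the other protocols use _drbd_send_zc_bio().
 * Returns non-zero on success, 0 on failure. */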
2339 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2340 {
2341 	int ok = 1;
2342 	struct p_data p;
2343 	unsigned int dp_flags = 0;
2344 	void *dgb;
2345 	int dgs;
2346 
2347 	if (!drbd_get_data_sock(mdev))
2348 		return 0;
2349 
2350 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2351 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2352 
2353 	p.head.magic   = BE_DRBD_MAGIC;
2354 	p.head.command = cpu_to_be16(P_DATA);
2355 	p.head.length  =
2356 		cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + req->size);
2357 
2358 	p.sector   = cpu_to_be64(req->sector);
2359 	p.block_id = (unsigned long)req;
2360 	p.seq_num  = cpu_to_be32(req->seq_num =
2361 				 atomic_add_return(1, &mdev->packet_seq));
2362 	dp_flags = 0;
2363 
2364 	/* NOTE: no need to check if barriers supported here as we would
2365 	 *       not pass the test in make_request_common in that case
2366 	 */
2367 	if (req->master_bio->bi_rw & REQ_HARDBARRIER) {
2368 		dev_err(DEV, "ASSERT FAILED would have set DP_HARDBARRIER\n");
2369 		/* dp_flags |= DP_HARDBARRIER; */
2370 	}
2371 	if (req->master_bio->bi_rw & REQ_SYNC)
2372 		dp_flags |= DP_RW_SYNC;
2373 	/* for now handle SYNCIO and UNPLUG
2374 	 * as if they still were one and the same flag */
2375 	if (req->master_bio->bi_rw & REQ_UNPLUG)
2376 		dp_flags |= DP_RW_SYNC;
2377 	if (mdev->state.conn >= C_SYNC_SOURCE &&
2378 	    mdev->state.conn <= C_PAUSED_SYNC_T)
2379 		dp_flags |= DP_MAY_SET_IN_SYNC;
2380 
2381 	p.dp_flags = cpu_to_be32(dp_flags);
2382 	set_bit(UNPLUG_REMOTE, &mdev->flags);
2383 	ok = (sizeof(p) ==
2384 		drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
2385 	if (ok && dgs) {
2386 		dgb = mdev->int_dig_out;
2387 		drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2388 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2389 	}
2390 	if (ok) {
2391 		if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
2392 			ok = _drbd_send_bio(mdev, req->master_bio);
2393 		else
2394 			ok = _drbd_send_zc_bio(mdev, req->master_bio);
2395 	}
2396 
2397 	drbd_put_data_sock(mdev);
2398 
2399 	return ok;
2400 }
2401 
2402 /* answer packet, used to send data back for read requests:
2403  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2404  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2405  */
2406 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2407 		    struct drbd_epoch_entry *e)
2408 {
2409 	int ok;
2410 	struct p_data p;
2411 	void *dgb;
2412 	int dgs;
2413 
2414 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2415 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2416 
2417 	p.head.magic   = BE_DRBD_MAGIC;
2418 	p.head.command = cpu_to_be16(cmd);
2419 	p.head.length  =
2420 		cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + e->size);
2421 
2422 	p.sector   = cpu_to_be64(e->sector);
2423 	p.block_id = e->block_id;
2424 	/* p.seq_num  = 0;    No sequence numbers here.. */
2425 
2426 	/* Only called by our kernel thread.
2427 	 * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2428 	 * in response to admin command or module unload.
2429 	 */
2430 	if (!drbd_get_data_sock(mdev))
2431 		return 0;
2432 
2433 	ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p,
2434 					sizeof(p), dgs ? MSG_MORE : 0);
2435 	if (ok && dgs) {
2436 		dgb = mdev->int_dig_out;
2437 		drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2438 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2439 	}
2440 	if (ok)
2441 		ok = _drbd_send_zc_ee(mdev, e);
2442 
2443 	drbd_put_data_sock(mdev);
2444 
2445 	return ok;
2446 }
2447 
2448 /*
2449   drbd_send distinguishes two cases:
2450 
2451   Packets sent via the data socket "sock"
2452   and packets sent via the meta data socket "msock"
2453 
2454 		    sock                      msock
2455   -----------------+-------------------------+------------------------------
2456   timeout           conf.timeout / 2          conf.timeout / 2
2457   timeout action    send a ping via msock     Abort communication
2458 					      and close all sockets
2459 */
2460 
2461 /*
2462  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2463  */
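/* Typical call pattern (sketch, as in drbd_send_cmd2() above): the caller
 * serializes on the respective mutex, e.g. via drbd_get_data_sock(), fills
 * in a struct p_header in big endian, and then does something like
 *
 *	ok = (sizeof(h) == drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
 *
 * The return value is the number of bytes actually sent; on a fatal error
 * the connection state has already been forced to C_BROKEN_PIPE (or
 * C_TIMEOUT for a send timeout). */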
2464 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2465 	      void *buf, size_t size, unsigned msg_flags)
2466 {
2467 	struct kvec iov;
2468 	struct msghdr msg;
2469 	int rv, sent = 0;
2470 
2471 	if (!sock)
2472 		return -1000;
2473 
2474 	/* THINK  if (signal_pending) return ... ? */
2475 
2476 	iov.iov_base = buf;
2477 	iov.iov_len  = size;
2478 
2479 	msg.msg_name       = NULL;
2480 	msg.msg_namelen    = 0;
2481 	msg.msg_control    = NULL;
2482 	msg.msg_controllen = 0;
2483 	msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2484 
2485 	if (sock == mdev->data.socket) {
2486 		mdev->ko_count = mdev->net_conf->ko_count;
2487 		drbd_update_congested(mdev);
2488 	}
2489 	do {
2490 		/* STRANGE
2491 		 * tcp_sendmsg does _not_ use its size parameter at all ?
2492 		 *
2493 		 * -EAGAIN on timeout, -EINTR on signal.
2494 		 */
2495 /* THINK
2496  * do we need to block DRBD_SIG if sock == &meta.socket ??
2497  * otherwise wake_asender() might interrupt some send_*Ack !
2498  */
2499 		rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2500 		if (rv == -EAGAIN) {
2501 			if (we_should_drop_the_connection(mdev, sock))
2502 				break;
2503 			else
2504 				continue;
2505 		}
2506 		D_ASSERT(rv != 0);
2507 		if (rv == -EINTR) {
2508 			flush_signals(current);
2509 			rv = 0;
2510 		}
2511 		if (rv < 0)
2512 			break;
2513 		sent += rv;
2514 		iov.iov_base += rv;
2515 		iov.iov_len  -= rv;
2516 	} while (sent < size);
2517 
2518 	if (sock == mdev->data.socket)
2519 		clear_bit(NET_CONGESTED, &mdev->flags);
2520 
2521 	if (rv <= 0) {
2522 		if (rv != -EAGAIN) {
2523 			dev_err(DEV, "%s_sendmsg returned %d\n",
2524 			    sock == mdev->meta.socket ? "msock" : "sock",
2525 			    rv);
2526 			drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2527 		} else
2528 			drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2529 	}
2530 
2531 	return sent;
2532 }
2533 
2534 static int drbd_open(struct block_device *bdev, fmode_t mode)
2535 {
2536 	struct drbd_conf *mdev = bdev->bd_disk->private_data;
2537 	unsigned long flags;
2538 	int rv = 0;
2539 
2540 	mutex_lock(&drbd_main_mutex);
2541 	spin_lock_irqsave(&mdev->req_lock, flags);
2542 	/* to have a stable mdev->state.role
2543 	 * and no race with updating open_cnt */
2544 
2545 	if (mdev->state.role != R_PRIMARY) {
2546 		if (mode & FMODE_WRITE)
2547 			rv = -EROFS;
2548 		else if (!allow_oos)
2549 			rv = -EMEDIUMTYPE;
2550 	}
2551 
2552 	if (!rv)
2553 		mdev->open_cnt++;
2554 	spin_unlock_irqrestore(&mdev->req_lock, flags);
2555 	mutex_unlock(&drbd_main_mutex);
2556 
2557 	return rv;
2558 }
2559 
2560 static int drbd_release(struct gendisk *gd, fmode_t mode)
2561 {
2562 	struct drbd_conf *mdev = gd->private_data;
2563 	mutex_lock(&drbd_main_mutex);
2564 	mdev->open_cnt--;
2565 	mutex_unlock(&drbd_main_mutex);
2566 	return 0;
2567 }
2568 
2569 static void drbd_unplug_fn(struct request_queue *q)
2570 {
2571 	struct drbd_conf *mdev = q->queuedata;
2572 
2573 	/* unplug FIRST */
2574 	spin_lock_irq(q->queue_lock);
2575 	blk_remove_plug(q);
2576 	spin_unlock_irq(q->queue_lock);
2577 
2578 	/* only if connected */
2579 	spin_lock_irq(&mdev->req_lock);
2580 	if (mdev->state.pdsk >= D_INCONSISTENT && mdev->state.conn >= C_CONNECTED) {
2581 		D_ASSERT(mdev->state.role == R_PRIMARY);
2582 		if (test_and_clear_bit(UNPLUG_REMOTE, &mdev->flags)) {
2583 			/* add to the data.work queue,
2584 			 * unless already queued.
2585 			 * XXX this might be a good addition to drbd_queue_work
2586 			 * anyways, to detect "double queuing" ... */
2587 			if (list_empty(&mdev->unplug_work.list))
2588 				drbd_queue_work(&mdev->data.work,
2589 						&mdev->unplug_work);
2590 		}
2591 	}
2592 	spin_unlock_irq(&mdev->req_lock);
2593 
2594 	if (mdev->state.disk >= D_INCONSISTENT)
2595 		drbd_kick_lo(mdev);
2596 }
2597 
2598 static void drbd_set_defaults(struct drbd_conf *mdev)
2599 {
2600 	/* This way we get a compile error when sync_conf grows,
2601 	   and we forget to initialize it here */
2602 	mdev->sync_conf = (struct syncer_conf) {
2603 		/* .rate = */		DRBD_RATE_DEF,
2604 		/* .after = */		DRBD_AFTER_DEF,
2605 		/* .al_extents = */	DRBD_AL_EXTENTS_DEF,
2606 		/* .verify_alg = */	{}, 0,
2607 		/* .cpu_mask = */	{}, 0,
2608 		/* .csums_alg = */	{}, 0,
2609 		/* .use_rle = */	0
2610 	};
2611 
2612 	/* Have to do it this way, because the layout differs between
2613 	   big endian and little endian */
2614 	mdev->state = (union drbd_state) {
2615 		{ .role = R_SECONDARY,
2616 		  .peer = R_UNKNOWN,
2617 		  .conn = C_STANDALONE,
2618 		  .disk = D_DISKLESS,
2619 		  .pdsk = D_UNKNOWN,
2620 		  .susp = 0
2621 		} };
2622 }
2623 
2624 void drbd_init_set_defaults(struct drbd_conf *mdev)
2625 {
2626 	/* the memset(,0,) did most of this.
2627 	 * note: only assignments, no allocation in here */
2628 
2629 	drbd_set_defaults(mdev);
2630 
2631 	/* for now, we do NOT yet support it,
2632 	 * even though we start some framework
2633 	 * to eventually support barriers */
2634 	set_bit(NO_BARRIER_SUPP, &mdev->flags);
2635 
2636 	atomic_set(&mdev->ap_bio_cnt, 0);
2637 	atomic_set(&mdev->ap_pending_cnt, 0);
2638 	atomic_set(&mdev->rs_pending_cnt, 0);
2639 	atomic_set(&mdev->unacked_cnt, 0);
2640 	atomic_set(&mdev->local_cnt, 0);
2641 	atomic_set(&mdev->net_cnt, 0);
2642 	atomic_set(&mdev->packet_seq, 0);
2643 	atomic_set(&mdev->pp_in_use, 0);
2644 
2645 	mutex_init(&mdev->md_io_mutex);
2646 	mutex_init(&mdev->data.mutex);
2647 	mutex_init(&mdev->meta.mutex);
2648 	sema_init(&mdev->data.work.s, 0);
2649 	sema_init(&mdev->meta.work.s, 0);
2650 	mutex_init(&mdev->state_mutex);
2651 
2652 	spin_lock_init(&mdev->data.work.q_lock);
2653 	spin_lock_init(&mdev->meta.work.q_lock);
2654 
2655 	spin_lock_init(&mdev->al_lock);
2656 	spin_lock_init(&mdev->req_lock);
2657 	spin_lock_init(&mdev->peer_seq_lock);
2658 	spin_lock_init(&mdev->epoch_lock);
2659 
2660 	INIT_LIST_HEAD(&mdev->active_ee);
2661 	INIT_LIST_HEAD(&mdev->sync_ee);
2662 	INIT_LIST_HEAD(&mdev->done_ee);
2663 	INIT_LIST_HEAD(&mdev->read_ee);
2664 	INIT_LIST_HEAD(&mdev->net_ee);
2665 	INIT_LIST_HEAD(&mdev->resync_reads);
2666 	INIT_LIST_HEAD(&mdev->data.work.q);
2667 	INIT_LIST_HEAD(&mdev->meta.work.q);
2668 	INIT_LIST_HEAD(&mdev->resync_work.list);
2669 	INIT_LIST_HEAD(&mdev->unplug_work.list);
2670 	INIT_LIST_HEAD(&mdev->md_sync_work.list);
2671 	INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2672 
2673 	mdev->resync_work.cb  = w_resync_inactive;
2674 	mdev->unplug_work.cb  = w_send_write_hint;
2675 	mdev->md_sync_work.cb = w_md_sync;
2676 	mdev->bm_io_work.w.cb = w_bitmap_io;
2677 	init_timer(&mdev->resync_timer);
2678 	init_timer(&mdev->md_sync_timer);
2679 	mdev->resync_timer.function = resync_timer_fn;
2680 	mdev->resync_timer.data = (unsigned long) mdev;
2681 	mdev->md_sync_timer.function = md_sync_timer_fn;
2682 	mdev->md_sync_timer.data = (unsigned long) mdev;
2683 
2684 	init_waitqueue_head(&mdev->misc_wait);
2685 	init_waitqueue_head(&mdev->state_wait);
2686 	init_waitqueue_head(&mdev->ee_wait);
2687 	init_waitqueue_head(&mdev->al_wait);
2688 	init_waitqueue_head(&mdev->seq_wait);
2689 
2690 	drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
2691 	drbd_thread_init(mdev, &mdev->worker, drbd_worker);
2692 	drbd_thread_init(mdev, &mdev->asender, drbd_asender);
2693 
2694 	mdev->agreed_pro_version = PRO_VERSION_MAX;
2695 	mdev->write_ordering = WO_bio_barrier;
2696 	mdev->resync_wenr = LC_FREE;
2697 }
2698 
2699 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2700 {
2701 	if (mdev->receiver.t_state != None)
2702 		dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2703 				mdev->receiver.t_state);
2704 
2705 	/* no need to lock it, I'm the only thread alive */
2706 	if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2707 		dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2708 	mdev->al_writ_cnt  =
2709 	mdev->bm_writ_cnt  =
2710 	mdev->read_cnt     =
2711 	mdev->recv_cnt     =
2712 	mdev->send_cnt     =
2713 	mdev->writ_cnt     =
2714 	mdev->p_size       =
2715 	mdev->rs_start     =
2716 	mdev->rs_total     =
2717 	mdev->rs_failed    =
2718 	mdev->rs_mark_left =
2719 	mdev->rs_mark_time = 0;
2720 	D_ASSERT(mdev->net_conf == NULL);
2721 
2722 	drbd_set_my_capacity(mdev, 0);
2723 	if (mdev->bitmap) {
2724 		/* maybe never allocated. */
2725 		drbd_bm_resize(mdev, 0, 1);
2726 		drbd_bm_cleanup(mdev);
2727 	}
2728 
2729 	drbd_free_resources(mdev);
2730 
2731 	/*
2732 	 * currently we call drbd_init_ee only on module load, so
2733 	 * we may call drbd_release_ee only on module unload!
2734 	 */
2735 	D_ASSERT(list_empty(&mdev->active_ee));
2736 	D_ASSERT(list_empty(&mdev->sync_ee));
2737 	D_ASSERT(list_empty(&mdev->done_ee));
2738 	D_ASSERT(list_empty(&mdev->read_ee));
2739 	D_ASSERT(list_empty(&mdev->net_ee));
2740 	D_ASSERT(list_empty(&mdev->resync_reads));
2741 	D_ASSERT(list_empty(&mdev->data.work.q));
2742 	D_ASSERT(list_empty(&mdev->meta.work.q));
2743 	D_ASSERT(list_empty(&mdev->resync_work.list));
2744 	D_ASSERT(list_empty(&mdev->unplug_work.list));
2745 
2746 }
2747 
2748 
2749 static void drbd_destroy_mempools(void)
2750 {
2751 	struct page *page;
2752 
2753 	while (drbd_pp_pool) {
2754 		page = drbd_pp_pool;
2755 		drbd_pp_pool = (struct page *)page_private(page);
2756 		__free_page(page);
2757 		drbd_pp_vacant--;
2758 	}
2759 
2760 	/* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2761 
2762 	if (drbd_ee_mempool)
2763 		mempool_destroy(drbd_ee_mempool);
2764 	if (drbd_request_mempool)
2765 		mempool_destroy(drbd_request_mempool);
2766 	if (drbd_ee_cache)
2767 		kmem_cache_destroy(drbd_ee_cache);
2768 	if (drbd_request_cache)
2769 		kmem_cache_destroy(drbd_request_cache);
2770 	if (drbd_bm_ext_cache)
2771 		kmem_cache_destroy(drbd_bm_ext_cache);
2772 	if (drbd_al_ext_cache)
2773 		kmem_cache_destroy(drbd_al_ext_cache);
2774 
2775 	drbd_ee_mempool      = NULL;
2776 	drbd_request_mempool = NULL;
2777 	drbd_ee_cache        = NULL;
2778 	drbd_request_cache   = NULL;
2779 	drbd_bm_ext_cache    = NULL;
2780 	drbd_al_ext_cache    = NULL;
2781 
2782 	return;
2783 }
2784 
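/* Allocate the slab caches, the mempools and the drbd_pp_pool page list.
 * "number" is sized to allow roughly one maximally sized segment
 * (DRBD_MAX_SEGMENT_SIZE / PAGE_SIZE pages) in flight per configured minor.
 * Returns 0 on success or -ENOMEM after undoing any partial allocation. */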
2785 static int drbd_create_mempools(void)
2786 {
2787 	struct page *page;
2788 	const int number = (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count;
2789 	int i;
2790 
2791 	/* prepare our caches and mempools */
2792 	drbd_request_mempool = NULL;
2793 	drbd_ee_cache        = NULL;
2794 	drbd_request_cache   = NULL;
2795 	drbd_bm_ext_cache    = NULL;
2796 	drbd_al_ext_cache    = NULL;
2797 	drbd_pp_pool         = NULL;
2798 
2799 	/* caches */
2800 	drbd_request_cache = kmem_cache_create(
2801 		"drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2802 	if (drbd_request_cache == NULL)
2803 		goto Enomem;
2804 
2805 	drbd_ee_cache = kmem_cache_create(
2806 		"drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
2807 	if (drbd_ee_cache == NULL)
2808 		goto Enomem;
2809 
2810 	drbd_bm_ext_cache = kmem_cache_create(
2811 		"drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2812 	if (drbd_bm_ext_cache == NULL)
2813 		goto Enomem;
2814 
2815 	drbd_al_ext_cache = kmem_cache_create(
2816 		"drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2817 	if (drbd_al_ext_cache == NULL)
2818 		goto Enomem;
2819 
2820 	/* mempools */
2821 	drbd_request_mempool = mempool_create(number,
2822 		mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2823 	if (drbd_request_mempool == NULL)
2824 		goto Enomem;
2825 
2826 	drbd_ee_mempool = mempool_create(number,
2827 		mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2828 	if (drbd_ee_mempool == NULL)
2829 		goto Enomem;
2830 
2831 	/* drbd's page pool */
2832 	spin_lock_init(&drbd_pp_lock);
2833 
2834 	for (i = 0; i < number; i++) {
2835 		page = alloc_page(GFP_HIGHUSER);
2836 		if (!page)
2837 			goto Enomem;
2838 		set_page_private(page, (unsigned long)drbd_pp_pool);
2839 		drbd_pp_pool = page;
2840 	}
2841 	drbd_pp_vacant = number;
2842 
2843 	return 0;
2844 
2845 Enomem:
2846 	drbd_destroy_mempools(); /* in case we allocated some */
2847 	return -ENOMEM;
2848 }
2849 
2850 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2851 	void *unused)
2852 {
2853 	/* just so we have it.  you never know what interesting things we
2854 	 * might want to do here some day...
2855 	 */
2856 
2857 	return NOTIFY_DONE;
2858 }
2859 
2860 static struct notifier_block drbd_notifier = {
2861 	.notifier_call = drbd_notify_sys,
2862 };
2863 
2864 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2865 {
2866 	int rr;
2867 
2868 	rr = drbd_release_ee(mdev, &mdev->active_ee);
2869 	if (rr)
2870 		dev_err(DEV, "%d EEs in active list found!\n", rr);
2871 
2872 	rr = drbd_release_ee(mdev, &mdev->sync_ee);
2873 	if (rr)
2874 		dev_err(DEV, "%d EEs in sync list found!\n", rr);
2875 
2876 	rr = drbd_release_ee(mdev, &mdev->read_ee);
2877 	if (rr)
2878 		dev_err(DEV, "%d EEs in read list found!\n", rr);
2879 
2880 	rr = drbd_release_ee(mdev, &mdev->done_ee);
2881 	if (rr)
2882 		dev_err(DEV, "%d EEs in done list found!\n", rr);
2883 
2884 	rr = drbd_release_ee(mdev, &mdev->net_ee);
2885 	if (rr)
2886 		dev_err(DEV, "%d EEs in net list found!\n", rr);
2887 }
2888 
2889 /* caution. no locking.
2890  * currently only used from module cleanup code. */
2891 static void drbd_delete_device(unsigned int minor)
2892 {
2893 	struct drbd_conf *mdev = minor_to_mdev(minor);
2894 
2895 	if (!mdev)
2896 		return;
2897 
2898 	/* paranoia asserts */
2899 	if (mdev->open_cnt != 0)
2900 		dev_err(DEV, "open_cnt = %d in %s:%u", mdev->open_cnt,
2901 				__FILE__ , __LINE__);
2902 
2903 	ERR_IF (!list_empty(&mdev->data.work.q)) {
2904 		struct list_head *lp;
2905 		list_for_each(lp, &mdev->data.work.q) {
2906 			dev_err(DEV, "lp = %p\n", lp);
2907 		}
2908 	};
2909 	/* end paranoia asserts */
2910 
2911 	del_gendisk(mdev->vdisk);
2912 
2913 	/* cleanup stuff that may have been allocated during
2914 	 * device (re-)configuration or state changes */
2915 
2916 	if (mdev->this_bdev)
2917 		bdput(mdev->this_bdev);
2918 
2919 	drbd_free_resources(mdev);
2920 
2921 	drbd_release_ee_lists(mdev);
2922 
2923 	/* should be free'd on disconnect? */
2924 	kfree(mdev->ee_hash);
2925 	/*
2926 	mdev->ee_hash_s = 0;
2927 	mdev->ee_hash = NULL;
2928 	*/
2929 
2930 	lc_destroy(mdev->act_log);
2931 	lc_destroy(mdev->resync);
2932 
2933 	kfree(mdev->p_uuid);
2934 	/* mdev->p_uuid = NULL; */
2935 
2936 	kfree(mdev->int_dig_out);
2937 	kfree(mdev->int_dig_in);
2938 	kfree(mdev->int_dig_vv);
2939 
2940 	/* cleanup the rest that has been
2941 	 * allocated from drbd_new_device
2942 	 * and actually free the mdev itself */
2943 	drbd_free_mdev(mdev);
2944 }
2945 
2946 static void drbd_cleanup(void)
2947 {
2948 	unsigned int i;
2949 
2950 	unregister_reboot_notifier(&drbd_notifier);
2951 
2952 	drbd_nl_cleanup();
2953 
2954 	if (minor_table) {
2955 		if (drbd_proc)
2956 			remove_proc_entry("drbd", NULL);
2957 		i = minor_count;
2958 		while (i--)
2959 			drbd_delete_device(i);
2960 		drbd_destroy_mempools();
2961 	}
2962 
2963 	kfree(minor_table);
2964 
2965 	unregister_blkdev(DRBD_MAJOR, "drbd");
2966 
2967 	printk(KERN_INFO "drbd: module cleanup done.\n");
2968 }
2969 
2970 /**
2971  * drbd_congested() - Callback for pdflush
2972  * @congested_data:	User data
2973  * @bdi_bits:		Bits pdflush is currently interested in
2974  *
2975  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2976  */
2977 static int drbd_congested(void *congested_data, int bdi_bits)
2978 {
2979 	struct drbd_conf *mdev = congested_data;
2980 	struct request_queue *q;
2981 	char reason = '-';
2982 	int r = 0;
2983 
2984 	if (!__inc_ap_bio_cond(mdev)) {
2985 		/* DRBD has frozen IO */
2986 		r = bdi_bits;
2987 		reason = 'd';
2988 		goto out;
2989 	}
2990 
2991 	if (get_ldev(mdev)) {
2992 		q = bdev_get_queue(mdev->ldev->backing_bdev);
2993 		r = bdi_congested(&q->backing_dev_info, bdi_bits);
2994 		put_ldev(mdev);
2995 		if (r)
2996 			reason = 'b';
2997 	}
2998 
2999 	if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
3000 		r |= (1 << BDI_async_congested);
3001 		reason = reason == 'b' ? 'a' : 'n';
3002 	}
3003 
3004 out:
3005 	mdev->congestion_reason = reason;
3006 	return r;
3007 }
3008 
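/* Allocate and initialize the per-device state for one minor: the drbd_conf
 * itself, its request queue and gendisk, the meta data IO page, the bitmap,
 * the transfer log, the application read hash and the initial epoch.
 * Returns NULL if any allocation fails; drbd_free_mdev() is the counterpart
 * that releases all of this again. */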
3009 struct drbd_conf *drbd_new_device(unsigned int minor)
3010 {
3011 	struct drbd_conf *mdev;
3012 	struct gendisk *disk;
3013 	struct request_queue *q;
3014 
3015 	/* GFP_KERNEL, we are outside of all write-out paths */
3016 	mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3017 	if (!mdev)
3018 		return NULL;
3019 	if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3020 		goto out_no_cpumask;
3021 
3022 	mdev->minor = minor;
3023 
3024 	drbd_init_set_defaults(mdev);
3025 
3026 	q = blk_alloc_queue(GFP_KERNEL);
3027 	if (!q)
3028 		goto out_no_q;
3029 	mdev->rq_queue = q;
3030 	q->queuedata   = mdev;
3031 
3032 	disk = alloc_disk(1);
3033 	if (!disk)
3034 		goto out_no_disk;
3035 	mdev->vdisk = disk;
3036 
3037 	set_disk_ro(disk, TRUE);
3038 
3039 	disk->queue = q;
3040 	disk->major = DRBD_MAJOR;
3041 	disk->first_minor = minor;
3042 	disk->fops = &drbd_ops;
3043 	sprintf(disk->disk_name, "drbd%d", minor);
3044 	disk->private_data = mdev;
3045 
3046 	mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3047 	/* we have no partitions. we contain only ourselves. */
3048 	mdev->this_bdev->bd_contains = mdev->this_bdev;
3049 
3050 	q->backing_dev_info.congested_fn = drbd_congested;
3051 	q->backing_dev_info.congested_data = mdev;
3052 
3053 	blk_queue_make_request(q, drbd_make_request_26);
3054 	blk_queue_max_segment_size(q, DRBD_MAX_SEGMENT_SIZE);
3055 	blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3056 	blk_queue_merge_bvec(q, drbd_merge_bvec);
3057 	q->queue_lock = &mdev->req_lock; /* needed since we use */
3058 		/* plugging on a queue, that actually has no requests! */
3059 	q->unplug_fn = drbd_unplug_fn;
3060 
3061 	mdev->md_io_page = alloc_page(GFP_KERNEL);
3062 	if (!mdev->md_io_page)
3063 		goto out_no_io_page;
3064 
3065 	if (drbd_bm_init(mdev))
3066 		goto out_no_bitmap;
3067 	/* no need to lock access, we are still initializing this minor device. */
3068 	if (!tl_init(mdev))
3069 		goto out_no_tl;
3070 
3071 	mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3072 	if (!mdev->app_reads_hash)
3073 		goto out_no_app_reads;
3074 
3075 	mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3076 	if (!mdev->current_epoch)
3077 		goto out_no_epoch;
3078 
3079 	INIT_LIST_HEAD(&mdev->current_epoch->list);
3080 	mdev->epochs = 1;
3081 
3082 	return mdev;
3083 
3084 /* out_whatever_else:
3085 	kfree(mdev->current_epoch); */
3086 out_no_epoch:
3087 	kfree(mdev->app_reads_hash);
3088 out_no_app_reads:
3089 	tl_cleanup(mdev);
3090 out_no_tl:
3091 	drbd_bm_cleanup(mdev);
3092 out_no_bitmap:
3093 	__free_page(mdev->md_io_page);
3094 out_no_io_page:
3095 	put_disk(disk);
3096 out_no_disk:
3097 	blk_cleanup_queue(q);
3098 out_no_q:
3099 	free_cpumask_var(mdev->cpu_mask);
3100 out_no_cpumask:
3101 	kfree(mdev);
3102 	return NULL;
3103 }
3104 
3105 /* counterpart of drbd_new_device.
3106  * last part of drbd_delete_device. */
3107 void drbd_free_mdev(struct drbd_conf *mdev)
3108 {
3109 	kfree(mdev->current_epoch);
3110 	kfree(mdev->app_reads_hash);
3111 	tl_cleanup(mdev);
3112 	if (mdev->bitmap) /* should no longer be there. */
3113 		drbd_bm_cleanup(mdev);
3114 	__free_page(mdev->md_io_page);
3115 	put_disk(mdev->vdisk);
3116 	blk_cleanup_queue(mdev->rq_queue);
3117 	free_cpumask_var(mdev->cpu_mask);
3118 	kfree(mdev);
3119 }
3120 
3121 
3122 int __init drbd_init(void)
3123 {
3124 	int err;
3125 
3126 	if (sizeof(struct p_handshake) != 80) {
3127 		printk(KERN_ERR
3128 		       "drbd: never change the size or layout "
3129 		       "of the HandShake packet.\n");
3130 		return -EINVAL;
3131 	}
3132 
3133 	if (1 > minor_count || minor_count > 255) {
3134 		printk(KERN_ERR
3135 			"drbd: invalid minor_count (%d)\n", minor_count);
3136 #ifdef MODULE
3137 		return -EINVAL;
3138 #else
3139 		minor_count = 8;
3140 #endif
3141 	}
3142 
3143 	err = drbd_nl_init();
3144 	if (err)
3145 		return err;
3146 
3147 	err = register_blkdev(DRBD_MAJOR, "drbd");
3148 	if (err) {
3149 		printk(KERN_ERR
3150 		       "drbd: unable to register block device major %d\n",
3151 		       DRBD_MAJOR);
3152 		return err;
3153 	}
3154 
3155 	register_reboot_notifier(&drbd_notifier);
3156 
3157 	/*
3158 	 * allocate all necessary structs
3159 	 */
3160 	err = -ENOMEM;
3161 
3162 	init_waitqueue_head(&drbd_pp_wait);
3163 
3164 	drbd_proc = NULL; /* play safe for drbd_cleanup */
3165 	minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3166 				GFP_KERNEL);
3167 	if (!minor_table)
3168 		goto Enomem;
3169 
3170 	err = drbd_create_mempools();
3171 	if (err)
3172 		goto Enomem;
3173 
3174 	drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3175 	if (!drbd_proc)	{
3176 		printk(KERN_ERR "drbd: unable to register proc file\n");
3177 		goto Enomem;
3178 	}
3179 
3180 	rwlock_init(&global_state_lock);
3181 
3182 	printk(KERN_INFO "drbd: initialized. "
3183 	       "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3184 	       API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3185 	printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3186 	printk(KERN_INFO "drbd: registered as block device major %d\n",
3187 		DRBD_MAJOR);
3188 	printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3189 
3190 	return 0; /* Success! */
3191 
3192 Enomem:
3193 	drbd_cleanup();
3194 	if (err == -ENOMEM)
3195 		/* currently always the case */
3196 		printk(KERN_ERR "drbd: ran out of memory\n");
3197 	else
3198 		printk(KERN_ERR "drbd: initialization failure\n");
3199 	return err;
3200 }
3201 
3202 void drbd_free_bc(struct drbd_backing_dev *ldev)
3203 {
3204 	if (ldev == NULL)
3205 		return;
3206 
3207 	bd_release(ldev->backing_bdev);
3208 	bd_release(ldev->md_bdev);
3209 
3210 	fput(ldev->lo_file);
3211 	fput(ldev->md_file);
3212 
3213 	kfree(ldev);
3214 }
3215 
3216 void drbd_free_sock(struct drbd_conf *mdev)
3217 {
3218 	if (mdev->data.socket) {
3219 		mutex_lock(&mdev->data.mutex);
3220 		kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3221 		sock_release(mdev->data.socket);
3222 		mdev->data.socket = NULL;
3223 		mutex_unlock(&mdev->data.mutex);
3224 	}
3225 	if (mdev->meta.socket) {
3226 		mutex_lock(&mdev->meta.mutex);
3227 		kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3228 		sock_release(mdev->meta.socket);
3229 		mdev->meta.socket = NULL;
3230 		mutex_unlock(&mdev->meta.mutex);
3231 	}
3232 }
3233 
3234 
3235 void drbd_free_resources(struct drbd_conf *mdev)
3236 {
3237 	crypto_free_hash(mdev->csums_tfm);
3238 	mdev->csums_tfm = NULL;
3239 	crypto_free_hash(mdev->verify_tfm);
3240 	mdev->verify_tfm = NULL;
3241 	crypto_free_hash(mdev->cram_hmac_tfm);
3242 	mdev->cram_hmac_tfm = NULL;
3243 	crypto_free_hash(mdev->integrity_w_tfm);
3244 	mdev->integrity_w_tfm = NULL;
3245 	crypto_free_hash(mdev->integrity_r_tfm);
3246 	mdev->integrity_r_tfm = NULL;
3247 
3248 	drbd_free_sock(mdev);
3249 
3250 	__no_warn(local,
3251 		  drbd_free_bc(mdev->ldev);
3252 		  mdev->ldev = NULL;);
3253 }
3254 
3255 /* meta data management */
3256 
3257 struct meta_data_on_disk {
3258 	u64 la_size;           /* last agreed size. */
3259 	u64 uuid[UI_SIZE];   /* UUIDs. */
3260 	u64 device_uuid;
3261 	u64 reserved_u64_1;
3262 	u32 flags;             /* MDF */
3263 	u32 magic;
3264 	u32 md_size_sect;
3265 	u32 al_offset;         /* offset to this block */
3266 	u32 al_nr_extents;     /* important for restoring the AL */
3267 	      /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3268 	u32 bm_offset;         /* offset to the bitmap, from here */
3269 	u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3270 	u32 reserved_u32[4];
3271 
3272 } __packed;
3273 
3274 /**
3275  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3276  * @mdev:	DRBD device.
3277  */
3278 void drbd_md_sync(struct drbd_conf *mdev)
3279 {
3280 	struct meta_data_on_disk *buffer;
3281 	sector_t sector;
3282 	int i;
3283 
3284 	if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3285 		return;
3286 	del_timer(&mdev->md_sync_timer);
3287 
3288 	/* We use here D_FAILED and not D_ATTACHING because we try to write
3289 	 * metadata even if we detach due to a disk failure! */
3290 	if (!get_ldev_if_state(mdev, D_FAILED))
3291 		return;
3292 
3293 	mutex_lock(&mdev->md_io_mutex);
3294 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3295 	memset(buffer, 0, 512);
3296 
3297 	buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3298 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3299 		buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3300 	buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3301 	buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3302 
3303 	buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3304 	buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3305 	buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3306 	buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3307 	buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3308 
3309 	buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3310 
3311 	D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3312 	sector = mdev->ldev->md.md_offset;
3313 
3314 	if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3315 		clear_bit(MD_DIRTY, &mdev->flags);
3316 	} else {
3317 		/* this was a try anyways ... */
3318 		dev_err(DEV, "meta data update failed!\n");
3319 
3320 		drbd_chk_io_error(mdev, 1, TRUE);
3321 	}
3322 
3323 	/* Update mdev->ldev->md.la_size_sect,
3324 	 * since we updated it on metadata. */
3325 	mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3326 
3327 	mutex_unlock(&mdev->md_io_mutex);
3328 	put_ldev(mdev);
3329 }
3330 
3331 /**
3332  * drbd_md_read() - Reads in the meta data super block
3333  * @mdev:	DRBD device.
3334  * @bdev:	Device from which the meta data should be read in.
3335  *
3336  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_codes in case
3337  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3338  */
3339 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3340 {
3341 	struct meta_data_on_disk *buffer;
3342 	int i, rv = NO_ERROR;
3343 
3344 	if (!get_ldev_if_state(mdev, D_ATTACHING))
3345 		return ERR_IO_MD_DISK;
3346 
3347 	mutex_lock(&mdev->md_io_mutex);
3348 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3349 
3350 	if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3351 		/* NOTE: can't do normal error processing here as this is
3352 		   called BEFORE disk is attached */
3353 		dev_err(DEV, "Error while reading metadata.\n");
3354 		rv = ERR_IO_MD_DISK;
3355 		goto err;
3356 	}
3357 
3358 	if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3359 		dev_err(DEV, "Error while reading metadata, magic not found.\n");
3360 		rv = ERR_MD_INVALID;
3361 		goto err;
3362 	}
3363 	if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3364 		dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3365 		    be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3366 		rv = ERR_MD_INVALID;
3367 		goto err;
3368 	}
3369 	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3370 		dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3371 		    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3372 		rv = ERR_MD_INVALID;
3373 		goto err;
3374 	}
3375 	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3376 		dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3377 		    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3378 		rv = ERR_MD_INVALID;
3379 		goto err;
3380 	}
3381 
3382 	if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3383 		dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3384 		    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3385 		rv = ERR_MD_INVALID;
3386 		goto err;
3387 	}
3388 
3389 	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3390 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3391 		bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3392 	bdev->md.flags = be32_to_cpu(buffer->flags);
3393 	mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3394 	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3395 
3396 	if (mdev->sync_conf.al_extents < 7)
3397 		mdev->sync_conf.al_extents = 127;
3398 
3399  err:
3400 	mutex_unlock(&mdev->md_io_mutex);
3401 	put_ldev(mdev);
3402 
3403 	return rv;
3404 }
3405 
3406 /**
3407  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3408  * @mdev:	DRBD device.
3409  *
3410  * Call this function if you change anything that should be written to
3411  * the meta-data super block. This function sets MD_DIRTY, and starts a
3412  * timer that ensures that within five seconds you have to call drbd_md_sync().
3413  */
3414 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3415 {
3416 	set_bit(MD_DIRTY, &mdev->flags);
3417 	mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3418 }
3419 
3420 
3421 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3422 {
3423 	int i;
3424 
3425 	for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3426 		mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3427 }
3428 
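/* Set a specific UUID slot.  For UI_CURRENT, bit 0 encodes our role (set
 * while Primary, cleared while Secondary) and the exposed data UUID is
 * updated as well.  Marks the meta data dirty so drbd_md_sync() will write
 * it out. */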
3429 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3430 {
3431 	if (idx == UI_CURRENT) {
3432 		if (mdev->state.role == R_PRIMARY)
3433 			val |= 1;
3434 		else
3435 			val &= ~((u64)1);
3436 
3437 		drbd_set_ed_uuid(mdev, val);
3438 	}
3439 
3440 	mdev->ldev->md.uuid[idx] = val;
3441 	drbd_md_mark_dirty(mdev);
3442 }
3443 
3444 
3445 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3446 {
3447 	if (mdev->ldev->md.uuid[idx]) {
3448 		drbd_uuid_move_history(mdev);
3449 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3450 	}
3451 	_drbd_uuid_set(mdev, idx, val);
3452 }
3453 
3454 /**
3455  * drbd_uuid_new_current() - Creates a new current UUID
3456  * @mdev:	DRBD device.
3457  *
3458  * Creates a new current UUID, and rotates the old current UUID into
3459  * the bitmap slot. Causes an incremental resync upon next connect.
3460  */
3461 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3462 {
3463 	u64 val;
3464 
3465 	dev_info(DEV, "Creating new current UUID\n");
3466 	D_ASSERT(mdev->ldev->md.uuid[UI_BITMAP] == 0);
3467 	mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3468 
3469 	get_random_bytes(&val, sizeof(u64));
3470 	_drbd_uuid_set(mdev, UI_CURRENT, val);
3471 }
3472 
3473 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3474 {
3475 	if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3476 		return;
3477 
3478 	if (val == 0) {
3479 		drbd_uuid_move_history(mdev);
3480 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3481 		mdev->ldev->md.uuid[UI_BITMAP] = 0;
3482 	} else {
3483 		if (mdev->ldev->md.uuid[UI_BITMAP])
3484 			dev_warn(DEV, "bm UUID already set\n");
3485 
3486 		mdev->ldev->md.uuid[UI_BITMAP] = val;
3487 		mdev->ldev->md.uuid[UI_BITMAP] &= ~((u64)1);
3488 
3489 	}
3490 	drbd_md_mark_dirty(mdev);
3491 }
3492 
3493 /**
3494  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3495  * @mdev:	DRBD device.
3496  *
3497  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3498  */
3499 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3500 {
3501 	int rv = -EIO;
3502 
3503 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
3504 		drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3505 		drbd_md_sync(mdev);
3506 		drbd_bm_set_all(mdev);
3507 
3508 		rv = drbd_bm_write(mdev);
3509 
3510 		if (!rv) {
3511 			drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3512 			drbd_md_sync(mdev);
3513 		}
3514 
3515 		put_ldev(mdev);
3516 	}
3517 
3518 	return rv;
3519 }
3520 
3521 /**
3522  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3523  * @mdev:	DRBD device.
3524  *
3525  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3526  */
3527 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3528 {
3529 	int rv = -EIO;
3530 
3531 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
3532 		drbd_bm_clear_all(mdev);
3533 		rv = drbd_bm_write(mdev);
3534 		put_ldev(mdev);
3535 	}
3536 
3537 	return rv;
3538 }
3539 
3540 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3541 {
3542 	struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3543 	int rv;
3544 
3545 	D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3546 
3547 	drbd_bm_lock(mdev, work->why);
3548 	rv = work->io_fn(mdev);
3549 	drbd_bm_unlock(mdev);
3550 
3551 	clear_bit(BITMAP_IO, &mdev->flags);
3552 	wake_up(&mdev->misc_wait);
3553 
3554 	if (work->done)
3555 		work->done(mdev, rv);
3556 
3557 	clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3558 	work->why = NULL;
3559 
3560 	return 1;
3561 }
3562 
3563 /**
3564  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3565  * @mdev:	DRBD device.
3566  * @io_fn:	IO callback to be called when bitmap IO is possible
3567  * @done:	callback to be called after the bitmap IO was performed
3568  * @why:	Descriptive text of the reason for doing the IO
3569  *
3570  * While IO on the bitmap happens, we freeze application IO, thus ensuring
3571  * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
3572  * called from worker context. It MUST NOT be used while a previous such
3573  * work is still pending!
3574  */
3575 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3576 			  int (*io_fn)(struct drbd_conf *),
3577 			  void (*done)(struct drbd_conf *, int),
3578 			  char *why)
3579 {
3580 	D_ASSERT(current == mdev->worker.task);
3581 
3582 	D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3583 	D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3584 	D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3585 	if (mdev->bm_io_work.why)
3586 		dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3587 			why, mdev->bm_io_work.why);
3588 
3589 	mdev->bm_io_work.io_fn = io_fn;
3590 	mdev->bm_io_work.done = done;
3591 	mdev->bm_io_work.why = why;
3592 
3593 	set_bit(BITMAP_IO, &mdev->flags);
3594 	if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3595 		if (list_empty(&mdev->bm_io_work.w.list)) {
3596 			set_bit(BITMAP_IO_QUEUED, &mdev->flags);
3597 			drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
3598 		} else {
3599 			dev_err(DEV, "FIXME avoided double queuing bm_io_work\n");
		}
3600 	}
3601 }
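/*
 * Illustrative sketch of a caller, hypothetical and not taken from the
 * DRBD tree: queue a "set all bits and write out" pass from worker
 * context, as the constraints above require.  The completion callback
 * name is invented.
 */
static void example_set_n_write_done(struct drbd_conf *mdev, int rv)
{
	if (rv)
		dev_err(DEV, "bitmap write-out failed: %d\n", rv);
}

static void example_queue_full_resync(struct drbd_conf *mdev)
{
	/* must run on mdev->worker.task, see the D_ASSERT above */
	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write,
			     &example_set_n_write_done,
			     "example: set_n_write");
}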
3602 
3603 /**
3604  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3605  * @mdev:	DRBD device.
3606  * @io_fn:	IO callback to be called when bitmap IO is possible
3607  * @why:	Descriptive text of the reason for doing the IO
3608  *
3609  * Freezes application IO while the actual IO operation runs. This
3610  * function MAY NOT be called from worker context.
3611  */
3612 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why)
3613 {
3614 	int rv;
3615 
3616 	D_ASSERT(current != mdev->worker.task);
3617 
3618 	drbd_suspend_io(mdev);
3619 
3620 	drbd_bm_lock(mdev, why);
3621 	rv = io_fn(mdev);
3622 	drbd_bm_unlock(mdev);
3623 
3624 	drbd_resume_io(mdev);
3625 
3626 	return rv;
3627 }
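/*
 * Illustrative sketch of a caller, hypothetical: the synchronous variant
 * for non-worker contexts, here paired with drbd_bmio_clear_n_write()
 * from above.
 */
static int example_clear_bitmap_sync(struct drbd_conf *mdev)
{
	/* must NOT run on the worker thread; drbd_bitmap_io() asserts that */
	return drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
			      "example: clear_n_write");
}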
3628 
3629 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3630 {
3631 	if ((mdev->ldev->md.flags & flag) != flag) {
3632 		drbd_md_mark_dirty(mdev);
3633 		mdev->ldev->md.flags |= flag;
3634 	}
3635 }
3636 
3637 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3638 {
3639 	if ((mdev->ldev->md.flags & flag) != 0) {
3640 		drbd_md_mark_dirty(mdev);
3641 		mdev->ldev->md.flags &= ~flag;
3642 	}
3643 }

3644 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3645 {
3646 	return (bdev->md.flags & flag) != 0;
3647 }
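/*
 * Illustrative sketch, hypothetical helper: testing a metadata flag while
 * holding a local-disk reference, following the same
 * get_ldev_if_state()/put_ldev() pattern as drbd_bmio_set_n_write()
 * above.
 */
static int example_full_sync_flagged(struct drbd_conf *mdev)
{
	int set = 0;

	if (get_ldev_if_state(mdev, D_ATTACHING)) {
		set = drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC);
		put_ldev(mdev);
	}
	return set;
}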
3648 
3649 static void md_sync_timer_fn(unsigned long data)
3650 {
3651 	struct drbd_conf *mdev = (struct drbd_conf *) data;
3652 
3653 	drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
3654 }
3655 
3656 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3657 {
3658 	dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3659 	drbd_md_sync(mdev);
3660 
3661 	return 1;
3662 }
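/*
 * Illustrative sketch, not the actual init code: arming such a timer with
 * the pre-timer_setup() API of this kernel generation.  It assumes the
 * timer field is mdev->md_sync_timer, as the callback name suggests; the
 * 5*HZ delay is made up.  The callback above runs in timer (softirq)
 * context and therefore only queues work; the worker thread later does
 * the actual drbd_md_sync().
 */
static void example_arm_md_sync_timer(struct drbd_conf *mdev)
{
	init_timer(&mdev->md_sync_timer);
	mdev->md_sync_timer.function = md_sync_timer_fn;
	mdev->md_sync_timer.data = (unsigned long)mdev;
	mod_timer(&mdev->md_sync_timer, jiffies + 5 * HZ);
}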
3663 
3664 #ifdef CONFIG_DRBD_FAULT_INJECTION
3665 /* Fault insertion support including random number generator shamelessly
3666  * stolen from kernel/rcutorture.c */
3667 struct fault_random_state {
3668 	unsigned long state;
3669 	unsigned long count;
3670 };
3671 
3672 #define FAULT_RANDOM_MULT 39916801  /* prime */
3673 #define FAULT_RANDOM_ADD	479001701 /* prime */
3674 #define FAULT_RANDOM_REFRESH 10000
3675 
3676 /*
3677  * Crude but fast random-number generator.  Uses a linear congruential
3678  * generator, with occasional help from get_random_bytes().
3679  */
3680 static unsigned long
3681 _drbd_fault_random(struct fault_random_state *rsp)
3682 {
3683 	long refresh;
3684 
3685 	if (!rsp->count--) {
3686 		get_random_bytes(&refresh, sizeof(refresh));
3687 		rsp->state += refresh;
3688 		rsp->count = FAULT_RANDOM_REFRESH;
3689 	}
3690 	rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3691 	return swahw32(rsp->state);
3692 }
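/*
 * Illustrative sketch, standalone user-space toy (not kernel code): the
 * same linear congruential step as above, minus the periodic
 * get_random_bytes() reseed and the swahw32() word swap, showing how a
 * draw is later reduced "% 100" by _drbd_insert_fault() below.
 */
#include <stdio.h>

#define TOY_MULT 39916801UL	/* same primes as FAULT_RANDOM_MULT/ADD */
#define TOY_ADD  479001701UL

int main(void)
{
	unsigned long state = 1;
	int i;

	for (i = 0; i < 5; i++) {
		state = state * TOY_MULT + TOY_ADD;
		printf("draw %d: %lu, %% 100 -> %lu\n", i, state, state % 100);
	}
	return 0;
}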
3693 
3694 static char *
3695 _drbd_fault_str(unsigned int type)
{
3696 	static char *_faults[] = {
3697 		[DRBD_FAULT_MD_WR] = "Meta-data write",
3698 		[DRBD_FAULT_MD_RD] = "Meta-data read",
3699 		[DRBD_FAULT_RS_WR] = "Resync write",
3700 		[DRBD_FAULT_RS_RD] = "Resync read",
3701 		[DRBD_FAULT_DT_WR] = "Data write",
3702 		[DRBD_FAULT_DT_RD] = "Data read",
3703 		[DRBD_FAULT_DT_RA] = "Data read ahead",
3704 		[DRBD_FAULT_BM_ALLOC] = "BM allocation",
3705 		[DRBD_FAULT_AL_EE] = "EE allocation",
3706 		[DRBD_FAULT_RECEIVE] = "receive data corruption",
3707 	};
3708 
3709 	return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3710 }
3711 
3712 unsigned int
3713 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3714 {
3715 	static struct fault_random_state rrs = {0, 0};
3716 
3717 	unsigned int ret = (
3718 		(fault_devs == 0 ||
3719 			((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3720 		(((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3721 
3722 	if (ret) {
3723 		fault_count++;
3724 
3725 		if (__ratelimit(&drbd_ratelimit_state))
3726 			dev_warn(DEV, "***Simulating %s failure\n",
3727 				_drbd_fault_str(type));
3728 	}
3729 
3730 	return ret;
3731 }
3732 #endif
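/*
 * Illustrative sketch, standalone user-space toy: the device-selection
 * test used by _drbd_insert_fault() above.  fault_devs is a bitmask of
 * minor numbers (0 meaning "all devices"); fault_rate then gives the
 * per-request fault probability in percent.
 */
#include <stdio.h>

static int faults_enabled(unsigned int fault_devs, unsigned int minor)
{
	return fault_devs == 0 || ((1u << minor) & fault_devs) != 0;
}

int main(void)
{
	unsigned int fault_devs = (1u << 0) | (1u << 2);	/* minors 0 and 2 */
	unsigned int minor;

	for (minor = 0; minor < 4; minor++)
		printf("minor %u: %s\n", minor,
		       faults_enabled(fault_devs, minor) ? "eligible" : "skipped");
	return 0;
}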
3733 
3734 const char *drbd_buildtag(void)
3735 {
3736 	/* When DRBD is built from external sources, this holds a reference
3737 	   to the git hash of the source code. */
3738 
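	/* buildtag[0] == '\0' doubles as the "not yet initialized" marker;
	 * for a built-in (non-modular) build it is patched to 'b' below,
	 * completing the string "built-in". */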
3739 	static char buildtag[38] = "\0uilt-in";
3740 
3741 	if (buildtag[0] == 0) {
3742 #ifdef CONFIG_MODULES
3743 		if (THIS_MODULE != NULL)
3744 			sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3745 		else
3746 #endif
3747 			buildtag[0] = 'b';
3748 	}
3749 
3750 	return buildtag;
3751 }
3752 
3753 module_init(drbd_init)
3754 module_exit(drbd_cleanup)
3755 
3756 EXPORT_SYMBOL(drbd_conn_str);
3757 EXPORT_SYMBOL(drbd_role_str);
3758 EXPORT_SYMBOL(drbd_disk_str);
3759 EXPORT_SYMBOL(drbd_set_st_err_str);
3760