xref: /linux/drivers/block/drbd/drbd_main.c (revision a33f32244d8550da8b4a26e277ce07d5c6d158b5)
1 /*
2    drbd.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12 
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17 
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22 
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 
27  */
28 
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/smp_lock.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48 
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52 
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56 
57 #include "drbd_vli.h"
58 
59 struct after_state_chg_work {
60 	struct drbd_work w;
61 	union drbd_state os;
62 	union drbd_state ns;
63 	enum chg_state_flags flags;
64 	struct completion *done;
65 };
66 
67 int drbdd_init(struct drbd_thread *);
68 int drbd_worker(struct drbd_thread *);
69 int drbd_asender(struct drbd_thread *);
70 
71 int drbd_init(void);
72 static int drbd_open(struct block_device *bdev, fmode_t mode);
73 static int drbd_release(struct gendisk *gd, fmode_t mode);
74 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
75 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
76 			   union drbd_state ns, enum chg_state_flags flags);
77 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
78 static void md_sync_timer_fn(unsigned long data);
79 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
80 
81 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
82 	      "Lars Ellenberg <lars@linbit.com>");
83 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
84 MODULE_VERSION(REL_VERSION);
85 MODULE_LICENSE("GPL");
86 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
87 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
88 
89 #include <linux/moduleparam.h>
90 /* allow_open_on_secondary */
91 MODULE_PARM_DESC(allow_oos, "DONT USE!");
92 /* thanks to these macros, if compiled into the kernel (not as a module),
93  * this becomes the boot parameter drbd.minor_count; see the example below */
94 module_param(minor_count, uint, 0444);
95 module_param(disable_sendpage, bool, 0644);
96 module_param(allow_oos, bool, 0);
97 module_param(cn_idx, uint, 0444);
98 module_param(proc_details, int, 0644);
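/* Example usage (illustrative values only):
 *	modprobe drbd minor_count=8
 * or, when built into the kernel, boot with drbd.minor_count=8 */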
99 
100 #ifdef CONFIG_DRBD_FAULT_INJECTION
101 int enable_faults;
102 int fault_rate;
103 static int fault_count;
104 int fault_devs;
105 /* bitmap of enabled faults */
106 module_param(enable_faults, int, 0664);
107 /* fault rate % value - applies to all enabled faults */
108 module_param(fault_rate, int, 0664);
109 /* count of faults inserted */
110 module_param(fault_count, int, 0664);
111 /* bitmap of devices to insert faults on */
112 module_param(fault_devs, int, 0644);
113 #endif
114 
115 /* module parameters, defined here */
116 unsigned int minor_count = 32;
117 int disable_sendpage;
118 int allow_oos;
119 unsigned int cn_idx = CN_IDX_DRBD;
120 int proc_details;       /* Detail level in /proc/drbd */
121 
122 /* Module parameter for setting the user mode helper program
123  * to run. Default is /sbin/drbdadm */
124 char usermode_helper[80] = "/sbin/drbdadm";
125 
126 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
127 
128 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
129  * as member "struct gendisk *vdisk;"
130  */
131 struct drbd_conf **minor_table;
132 
133 struct kmem_cache *drbd_request_cache;
134 struct kmem_cache *drbd_ee_cache;	/* epoch entries */
135 struct kmem_cache *drbd_bm_ext_cache;	/* bitmap extents */
136 struct kmem_cache *drbd_al_ext_cache;	/* activity log extents */
137 mempool_t *drbd_request_mempool;
138 mempool_t *drbd_ee_mempool;
139 
140 /* I do not use a standard mempool, because:
141    1) I want to hand out the pre-allocated objects first.
142    2) I want to be able to interrupt sleeping allocation with a signal.
143    Note: This is a singly linked list, the next pointer is the private
144 	 member of struct page.
145  */
146 struct page *drbd_pp_pool;
147 spinlock_t   drbd_pp_lock;
148 int          drbd_pp_vacant;
149 wait_queue_head_t drbd_pp_wait;
150 
151 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
152 
153 static const struct block_device_operations drbd_ops = {
154 	.owner =   THIS_MODULE,
155 	.open =    drbd_open,
156 	.release = drbd_release,
157 };
158 
159 #define ARRY_SIZE(A) (sizeof(A)/sizeof(A[0]))
160 
161 #ifdef __CHECKER__
162 /* When checking with sparse, and this is an inline function, sparse will
163    give tons of false positives. When this is a real function, sparse works.
164  */
165 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
166 {
167 	int io_allowed;
168 
169 	atomic_inc(&mdev->local_cnt);
170 	io_allowed = (mdev->state.disk >= mins);
171 	if (!io_allowed) {
172 		if (atomic_dec_and_test(&mdev->local_cnt))
173 			wake_up(&mdev->misc_wait);
174 	}
175 	return io_allowed;
176 }
177 
178 #endif
179 
180 /**
181  * DOC: The transfer log
182  *
183  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
184  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
185  * of the list. There is always at least one &struct drbd_tl_epoch object.
186  *
187  * Each &struct drbd_tl_epoch has a circular doubly linked list of requests
188  * attached.
189  */
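/*
 * A rough sketch of that layout (barrier numbers are illustrative only;
 * tl_init() starts counting at 4711):
 *
 *   mdev->oldest_tle                              mdev->newest_tle
 *         |                                              |
 *         v                                              v
 *   [epoch 4711] --next--> [epoch 4712] --next--> [epoch 4713] --next--> NULL
 *    |requests|             |requests|             |requests|
 *    each a circular list of struct drbd_request, linked via ->tl_requests
 */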
190 static int tl_init(struct drbd_conf *mdev)
191 {
192 	struct drbd_tl_epoch *b;
193 
194 	/* during device minor initialization, we may well use GFP_KERNEL */
195 	b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
196 	if (!b)
197 		return 0;
198 	INIT_LIST_HEAD(&b->requests);
199 	INIT_LIST_HEAD(&b->w.list);
200 	b->next = NULL;
201 	b->br_number = 4711;
202 	b->n_req = 0;
203 	b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
204 
205 	mdev->oldest_tle = b;
206 	mdev->newest_tle = b;
207 	INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
208 
209 	mdev->tl_hash = NULL;
210 	mdev->tl_hash_s = 0;
211 
212 	return 1;
213 }
214 
215 static void tl_cleanup(struct drbd_conf *mdev)
216 {
217 	D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
218 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
219 	kfree(mdev->oldest_tle);
220 	mdev->oldest_tle = NULL;
221 	kfree(mdev->unused_spare_tle);
222 	mdev->unused_spare_tle = NULL;
223 	kfree(mdev->tl_hash);
224 	mdev->tl_hash = NULL;
225 	mdev->tl_hash_s = 0;
226 }
227 
228 /**
229  * _tl_add_barrier() - Adds a barrier to the transfer log
230  * @mdev:	DRBD device.
231  * @new:	Barrier to be added; it becomes the new head (newest_tle) of the TL.
232  *
233  * The caller must hold the req_lock.
234  */
235 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
236 {
237 	struct drbd_tl_epoch *newest_before;
238 
239 	INIT_LIST_HEAD(&new->requests);
240 	INIT_LIST_HEAD(&new->w.list);
241 	new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
242 	new->next = NULL;
243 	new->n_req = 0;
244 
245 	newest_before = mdev->newest_tle;
246 	/* never send a barrier number == 0, because that is special-cased
247 	 * when using TCQ for our write ordering code */
248 	new->br_number = (newest_before->br_number+1) ?: 1;
249 	if (mdev->newest_tle != new) {
250 		mdev->newest_tle->next = new;
251 		mdev->newest_tle = new;
252 	}
253 }
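/* Within this file, tl_release() below reuses this helper to recycle the
 * just-acknowledged epoch object when the CREATE_BARRIER flag is set. */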
254 
255 /**
256  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
257  * @mdev:	DRBD device.
258  * @barrier_nr:	Expected identifier of the DRBD write barrier packet.
259  * @set_size:	Expected number of requests before that barrier.
260  *
261  * In case the passed barrier_nr or set_size does not match the oldest
262  * &struct drbd_tl_epoch object, this function will cause a termination
263  * of the connection.
264  */
265 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
266 		       unsigned int set_size)
267 {
268 	struct drbd_tl_epoch *b, *nob; /* next old barrier */
269 	struct list_head *le, *tle;
270 	struct drbd_request *r;
271 
272 	spin_lock_irq(&mdev->req_lock);
273 
274 	b = mdev->oldest_tle;
275 
276 	/* first some paranoia code */
277 	if (b == NULL) {
278 		dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
279 			barrier_nr);
280 		goto bail;
281 	}
282 	if (b->br_number != barrier_nr) {
283 		dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
284 			barrier_nr, b->br_number);
285 		goto bail;
286 	}
287 	if (b->n_req != set_size) {
288 		dev_err(DEV, "BAD! BarrierAck #%u received with n_req=%u, expected n_req=%u!\n",
289 			barrier_nr, set_size, b->n_req);
290 		goto bail;
291 	}
292 
293 	/* Clean up list of requests processed during current epoch */
294 	list_for_each_safe(le, tle, &b->requests) {
295 		r = list_entry(le, struct drbd_request, tl_requests);
296 		_req_mod(r, barrier_acked);
297 	}
298 	/* There could be requests on the list waiting for completion
299 	   of the write to the local disk. To avoid corruption of the
300 	   slab's data structures we have to remove the list's head.
301 
302 	   Also there could have been a barrier ack out of sequence, overtaking
303 	   the write acks - which would be a bug and violate write ordering.
304 	   To not deadlock in case we lose connection while such requests are
305 	   still pending, we need some way to find them for
306 	   _req_mod(connection_lost_while_pending).
307 
308 	   These have been list_move'd to the out_of_sequence_requests list in
309 	   _req_mod(, barrier_acked) above.
310 	   */
311 	list_del_init(&b->requests);
312 
313 	nob = b->next;
314 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
315 		_tl_add_barrier(mdev, b);
316 		if (nob)
317 			mdev->oldest_tle = nob;
318 		/* if nob == NULL, b was the only barrier and becomes the new
319 		   barrier. Therefore mdev->oldest_tle already points to b */
320 	} else {
321 		D_ASSERT(nob != NULL);
322 		mdev->oldest_tle = nob;
323 		kfree(b);
324 	}
325 
326 	spin_unlock_irq(&mdev->req_lock);
327 	dec_ap_pending(mdev);
328 
329 	return;
330 
331 bail:
332 	spin_unlock_irq(&mdev->req_lock);
333 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
334 }
335 
336 
337 /**
338  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
339  * @mdev:	DRBD device.
340  *
341  * This is called after the connection to the peer was lost. The storage covered
342  * by the requests on the transfer log gets marked as out of sync. Called from the
343  * receiver thread and the worker thread.
344  */
345 void tl_clear(struct drbd_conf *mdev)
346 {
347 	struct drbd_tl_epoch *b, *tmp;
348 	struct list_head *le, *tle;
349 	struct drbd_request *r;
350 	int new_initial_bnr = net_random();
351 
352 	spin_lock_irq(&mdev->req_lock);
353 
354 	b = mdev->oldest_tle;
355 	while (b) {
356 		list_for_each_safe(le, tle, &b->requests) {
357 			r = list_entry(le, struct drbd_request, tl_requests);
358 			/* It would be nice to complete outside of spinlock.
359 			 * But this is easier for now. */
360 			_req_mod(r, connection_lost_while_pending);
361 		}
362 		tmp = b->next;
363 
364 		/* there could still be requests on that ring list,
365 		 * in case local io is still pending */
366 		list_del(&b->requests);
367 
368 		/* dec_ap_pending corresponding to queue_barrier.
369 		 * the newest barrier may not have been queued yet,
370 		 * in which case w.cb is still NULL. */
371 		if (b->w.cb != NULL)
372 			dec_ap_pending(mdev);
373 
374 		if (b == mdev->newest_tle) {
375 			/* recycle, but reinit! */
376 			D_ASSERT(tmp == NULL);
377 			INIT_LIST_HEAD(&b->requests);
378 			INIT_LIST_HEAD(&b->w.list);
379 			b->w.cb = NULL;
380 			b->br_number = new_initial_bnr;
381 			b->n_req = 0;
382 
383 			mdev->oldest_tle = b;
384 			break;
385 		}
386 		kfree(b);
387 		b = tmp;
388 	}
389 
390 	/* we expect this list to be empty. */
391 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
392 
393 	/* but just in case, clean it up anyways! */
394 	list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
395 		r = list_entry(le, struct drbd_request, tl_requests);
396 		/* It would be nice to complete outside of spinlock.
397 		 * But this is easier for now. */
398 		_req_mod(r, connection_lost_while_pending);
399 	}
400 
401 	/* ensure bit indicating barrier is required is clear */
402 	clear_bit(CREATE_BARRIER, &mdev->flags);
403 
404 	spin_unlock_irq(&mdev->req_lock);
405 }
406 
407 /**
408  * cl_wide_st_chg() - TRUE if the state change is a cluster-wide one
409  * @mdev:	DRBD device.
410  * @os:		old (current) state.
411  * @ns:		new (wanted) state.
412  */
413 static int cl_wide_st_chg(struct drbd_conf *mdev,
414 			  union drbd_state os, union drbd_state ns)
415 {
416 	return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
417 		 ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
418 		  (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
419 		  (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
420 		  (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
421 		(os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
422 		(os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
423 }
424 
425 int drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
426 		      union drbd_state mask, union drbd_state val)
427 {
428 	unsigned long flags;
429 	union drbd_state os, ns;
430 	int rv;
431 
432 	spin_lock_irqsave(&mdev->req_lock, flags);
433 	os = mdev->state;
434 	ns.i = (os.i & ~mask.i) | val.i;
435 	rv = _drbd_set_state(mdev, ns, f, NULL);
436 	ns = mdev->state;
437 	spin_unlock_irqrestore(&mdev->req_lock, flags);
438 
439 	return rv;
440 }
441 
442 /**
443  * drbd_force_state() - Impose a change which happens outside our control on our state
444  * @mdev:	DRBD device.
445  * @mask:	mask of state bits to change.
446  * @val:	value of new state bits.
447  */
448 void drbd_force_state(struct drbd_conf *mdev,
449 	union drbd_state mask, union drbd_state val)
450 {
451 	drbd_change_state(mdev, CS_HARD, mask, val);
452 }
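/* Typical use, as in the tl_release() error path above:
 *	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
 * where NS() expands to the mask/val pair for a single state field. */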
453 
454 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns);
455 static int is_valid_state_transition(struct drbd_conf *,
456 				     union drbd_state, union drbd_state);
457 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
458 				       union drbd_state ns, int *warn_sync_abort);
459 int drbd_send_state_req(struct drbd_conf *,
460 			union drbd_state, union drbd_state);
461 
462 static enum drbd_state_ret_codes _req_st_cond(struct drbd_conf *mdev,
463 				    union drbd_state mask, union drbd_state val)
464 {
465 	union drbd_state os, ns;
466 	unsigned long flags;
467 	int rv;
468 
469 	if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
470 		return SS_CW_SUCCESS;
471 
472 	if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
473 		return SS_CW_FAILED_BY_PEER;
474 
475 	rv = 0;
476 	spin_lock_irqsave(&mdev->req_lock, flags);
477 	os = mdev->state;
478 	ns.i = (os.i & ~mask.i) | val.i;
479 	ns = sanitize_state(mdev, os, ns, NULL);
480 
481 	if (!cl_wide_st_chg(mdev, os, ns))
482 		rv = SS_CW_NO_NEED;
483 	if (!rv) {
484 		rv = is_valid_state(mdev, ns);
485 		if (rv == SS_SUCCESS) {
486 			rv = is_valid_state_transition(mdev, ns, os);
487 			if (rv == SS_SUCCESS)
488 				rv = 0; /* cont waiting, otherwise fail. */
489 		}
490 	}
491 	spin_unlock_irqrestore(&mdev->req_lock, flags);
492 
493 	return rv;
494 }
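/* Note: a return value of 0 keeps the wait_event() in drbd_req_state() below
 * sleeping; any nonzero SS_* code terminates the wait. */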
495 
496 /**
497  * drbd_req_state() - Perform a possibly cluster-wide state change
498  * @mdev:	DRBD device.
499  * @mask:	mask of state bits to change.
500  * @val:	value of new state bits.
501  * @f:		flags
502  *
503  * Should not be called directly, use drbd_request_state() or
504  * _drbd_request_state().
505  */
506 static int drbd_req_state(struct drbd_conf *mdev,
507 			  union drbd_state mask, union drbd_state val,
508 			  enum chg_state_flags f)
509 {
510 	struct completion done;
511 	unsigned long flags;
512 	union drbd_state os, ns;
513 	int rv;
514 
515 	init_completion(&done);
516 
517 	if (f & CS_SERIALIZE)
518 		mutex_lock(&mdev->state_mutex);
519 
520 	spin_lock_irqsave(&mdev->req_lock, flags);
521 	os = mdev->state;
522 	ns.i = (os.i & ~mask.i) | val.i;
523 	ns = sanitize_state(mdev, os, ns, NULL);
524 
525 	if (cl_wide_st_chg(mdev, os, ns)) {
526 		rv = is_valid_state(mdev, ns);
527 		if (rv == SS_SUCCESS)
528 			rv = is_valid_state_transition(mdev, ns, os);
529 		spin_unlock_irqrestore(&mdev->req_lock, flags);
530 
531 		if (rv < SS_SUCCESS) {
532 			if (f & CS_VERBOSE)
533 				print_st_err(mdev, os, ns, rv);
534 			goto abort;
535 		}
536 
537 		drbd_state_lock(mdev);
538 		if (!drbd_send_state_req(mdev, mask, val)) {
539 			drbd_state_unlock(mdev);
540 			rv = SS_CW_FAILED_BY_PEER;
541 			if (f & CS_VERBOSE)
542 				print_st_err(mdev, os, ns, rv);
543 			goto abort;
544 		}
545 
546 		wait_event(mdev->state_wait,
547 			(rv = _req_st_cond(mdev, mask, val)));
548 
549 		if (rv < SS_SUCCESS) {
550 			drbd_state_unlock(mdev);
551 			if (f & CS_VERBOSE)
552 				print_st_err(mdev, os, ns, rv);
553 			goto abort;
554 		}
555 		spin_lock_irqsave(&mdev->req_lock, flags);
556 		os = mdev->state;
557 		ns.i = (os.i & ~mask.i) | val.i;
558 		rv = _drbd_set_state(mdev, ns, f, &done);
559 		drbd_state_unlock(mdev);
560 	} else {
561 		rv = _drbd_set_state(mdev, ns, f, &done);
562 	}
563 
564 	spin_unlock_irqrestore(&mdev->req_lock, flags);
565 
566 	if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
567 		D_ASSERT(current != mdev->worker.task);
568 		wait_for_completion(&done);
569 	}
570 
571 abort:
572 	if (f & CS_SERIALIZE)
573 		mutex_unlock(&mdev->state_mutex);
574 
575 	return rv;
576 }
577 
578 /**
579  * _drbd_request_state() - Request a state change (with flags)
580  * @mdev:	DRBD device.
581  * @mask:	mask of state bits to change.
582  * @val:	value of new state bits.
583  * @f:		flags
584  *
585  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
586  * flag, or when logging of failed state change requests is not desired.
587  */
588 int _drbd_request_state(struct drbd_conf *mdev,	union drbd_state mask,
589 			union drbd_state val,	enum chg_state_flags f)
590 {
591 	int rv;
592 
593 	wait_event(mdev->state_wait,
594 		   (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
595 
596 	return rv;
597 }
598 
599 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
600 {
601 	dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
602 	    name,
603 	    drbd_conn_str(ns.conn),
604 	    drbd_role_str(ns.role),
605 	    drbd_role_str(ns.peer),
606 	    drbd_disk_str(ns.disk),
607 	    drbd_disk_str(ns.pdsk),
608 	    ns.susp ? 's' : 'r',
609 	    ns.aftr_isp ? 'a' : '-',
610 	    ns.peer_isp ? 'p' : '-',
611 	    ns.user_isp ? 'u' : '-'
612 	    );
613 }
614 
615 void print_st_err(struct drbd_conf *mdev,
616 	union drbd_state os, union drbd_state ns, int err)
617 {
618 	if (err == SS_IN_TRANSIENT_STATE)
619 		return;
620 	dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
621 	print_st(mdev, " state", os);
622 	print_st(mdev, "wanted", ns);
623 }
624 
625 
626 #define drbd_peer_str drbd_role_str
627 #define drbd_pdsk_str drbd_disk_str
628 
629 #define drbd_susp_str(A)     ((A) ? "1" : "0")
630 #define drbd_aftr_isp_str(A) ((A) ? "1" : "0")
631 #define drbd_peer_isp_str(A) ((A) ? "1" : "0")
632 #define drbd_user_isp_str(A) ((A) ? "1" : "0")
633 
634 #define PSC(A) \
635 	({ if (ns.A != os.A) { \
636 		pbp += sprintf(pbp, #A "( %s -> %s ) ", \
637 			      drbd_##A##_str(os.A), \
638 			      drbd_##A##_str(ns.A)); \
639 	} })
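/* PSC() appends one "field( old -> new ) " chunk per changed field, so a
 * promotion would typically be logged roughly as
 *	role( Secondary -> Primary ) pdsk( Outdated -> UpToDate )
 * (illustrative output; the exact strings come from drbd_strings.c). */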
640 
641 /**
642  * is_valid_state() - Returns an SS_ error code if ns is not valid
643  * @mdev:	DRBD device.
644  * @ns:		State to consider.
645  */
646 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
647 {
648 	/* See drbd_state_sw_errors in drbd_strings.c */
649 
650 	enum drbd_fencing_p fp;
651 	int rv = SS_SUCCESS;
652 
653 	fp = FP_DONT_CARE;
654 	if (get_ldev(mdev)) {
655 		fp = mdev->ldev->dc.fencing;
656 		put_ldev(mdev);
657 	}
658 
659 	if (get_net_conf(mdev)) {
660 		if (!mdev->net_conf->two_primaries &&
661 		    ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
662 			rv = SS_TWO_PRIMARIES;
663 		put_net_conf(mdev);
664 	}
665 
666 	if (rv <= 0)
667 		/* already found a reason to abort */;
668 	else if (ns.role == R_SECONDARY && mdev->open_cnt)
669 		rv = SS_DEVICE_IN_USE;
670 
671 	else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
672 		rv = SS_NO_UP_TO_DATE_DISK;
673 
674 	else if (fp >= FP_RESOURCE &&
675 		 ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
676 		rv = SS_PRIMARY_NOP;
677 
678 	else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
679 		rv = SS_NO_UP_TO_DATE_DISK;
680 
681 	else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
682 		rv = SS_NO_LOCAL_DISK;
683 
684 	else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
685 		rv = SS_NO_REMOTE_DISK;
686 
687 	else if ((ns.conn == C_CONNECTED ||
688 		  ns.conn == C_WF_BITMAP_S ||
689 		  ns.conn == C_SYNC_SOURCE ||
690 		  ns.conn == C_PAUSED_SYNC_S) &&
691 		  ns.disk == D_OUTDATED)
692 		rv = SS_CONNECTED_OUTDATES;
693 
694 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
695 		 (mdev->sync_conf.verify_alg[0] == 0))
696 		rv = SS_NO_VERIFY_ALG;
697 
698 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
699 		  mdev->agreed_pro_version < 88)
700 		rv = SS_NOT_SUPPORTED;
701 
702 	return rv;
703 }
704 
705 /**
706  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
707  * @mdev:	DRBD device.
708  * @ns:		new state.
709  * @os:		old state.
710  */
711 static int is_valid_state_transition(struct drbd_conf *mdev,
712 				     union drbd_state ns, union drbd_state os)
713 {
714 	int rv = SS_SUCCESS;
715 
716 	if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
717 	    os.conn > C_CONNECTED)
718 		rv = SS_RESYNC_RUNNING;
719 
720 	if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
721 		rv = SS_ALREADY_STANDALONE;
722 
723 	if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
724 		rv = SS_IS_DISKLESS;
725 
726 	if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
727 		rv = SS_NO_NET_CONFIG;
728 
729 	if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
730 		rv = SS_LOWER_THAN_OUTDATED;
731 
732 	if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
733 		rv = SS_IN_TRANSIENT_STATE;
734 
735 	if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
736 		rv = SS_IN_TRANSIENT_STATE;
737 
738 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
739 		rv = SS_NEED_CONNECTION;
740 
741 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
742 	    ns.conn != os.conn && os.conn > C_CONNECTED)
743 		rv = SS_RESYNC_RUNNING;
744 
745 	if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
746 	    os.conn < C_CONNECTED)
747 		rv = SS_NEED_CONNECTION;
748 
749 	return rv;
750 }
751 
752 /**
753  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
754  * @mdev:	DRBD device.
755  * @os:		old state.
756  * @ns:		new state.
757  * @warn_sync_abort:	set to 1 if an ongoing resync gets implicitly aborted.
758  *
759  * When we lose connection, we have to set the state of the peer's disk (pdsk)
760  * to D_UNKNOWN. This rule and many more along those lines are in this function.
761  */
762 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
763 				       union drbd_state ns, int *warn_sync_abort)
764 {
765 	enum drbd_fencing_p fp;
766 
767 	fp = FP_DONT_CARE;
768 	if (get_ldev(mdev)) {
769 		fp = mdev->ldev->dc.fencing;
770 		put_ldev(mdev);
771 	}
772 
773 	/* Do not let a network error state configure a device's network part */
774 	if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
775 	    os.conn <= C_DISCONNECTING)
776 		ns.conn = os.conn;
777 
778 	/* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow */
779 	if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
780 	    ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING)
781 		ns.conn = os.conn;
782 
783 	/* After C_DISCONNECTING only C_STANDALONE may follow */
784 	if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
785 		ns.conn = os.conn;
786 
787 	if (ns.conn < C_CONNECTED) {
788 		ns.peer_isp = 0;
789 		ns.peer = R_UNKNOWN;
790 		if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
791 			ns.pdsk = D_UNKNOWN;
792 	}
793 
794 	/* Clear the aftr_isp when becoming unconfigured */
795 	if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
796 		ns.aftr_isp = 0;
797 
798 	if (ns.conn <= C_DISCONNECTING && ns.disk == D_DISKLESS)
799 		ns.pdsk = D_UNKNOWN;
800 
801 	/* Abort resync if a disk fails/detaches */
802 	if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
803 	    (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
804 		if (warn_sync_abort)
805 			*warn_sync_abort = 1;
806 		ns.conn = C_CONNECTED;
807 	}
808 
809 	if (ns.conn >= C_CONNECTED &&
810 	    ((ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED) ||
811 	     (ns.disk == D_NEGOTIATING && ns.conn == C_WF_BITMAP_T))) {
812 		switch (ns.conn) {
813 		case C_WF_BITMAP_T:
814 		case C_PAUSED_SYNC_T:
815 			ns.disk = D_OUTDATED;
816 			break;
817 		case C_CONNECTED:
818 		case C_WF_BITMAP_S:
819 		case C_SYNC_SOURCE:
820 		case C_PAUSED_SYNC_S:
821 			ns.disk = D_UP_TO_DATE;
822 			break;
823 		case C_SYNC_TARGET:
824 			ns.disk = D_INCONSISTENT;
825 			dev_warn(DEV, "Implicitly set disk state Inconsistent!\n");
826 			break;
827 		}
828 		if (os.disk == D_OUTDATED && ns.disk == D_UP_TO_DATE)
829 			dev_warn(DEV, "Implicitly set disk from Outdated to UpToDate\n");
830 	}
831 
832 	if (ns.conn >= C_CONNECTED &&
833 	    (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)) {
834 		switch (ns.conn) {
835 		case C_CONNECTED:
836 		case C_WF_BITMAP_T:
837 		case C_PAUSED_SYNC_T:
838 		case C_SYNC_TARGET:
839 			ns.pdsk = D_UP_TO_DATE;
840 			break;
841 		case C_WF_BITMAP_S:
842 		case C_PAUSED_SYNC_S:
843 			ns.pdsk = D_OUTDATED;
844 			break;
845 		case C_SYNC_SOURCE:
846 			ns.pdsk = D_INCONSISTENT;
847 			dev_warn(DEV, "Implicitly set pdsk Inconsistent!\n");
848 			break;
849 		}
850 		if (os.pdsk == D_OUTDATED && ns.pdsk == D_UP_TO_DATE)
851 			dev_warn(DEV, "Implicitly set pdsk from Outdated to UpToDate\n");
852 	}
853 
854 	/* Connection broke down before we finished "Negotiating" */
855 	if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
856 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
857 		if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
858 			ns.disk = mdev->new_state_tmp.disk;
859 			ns.pdsk = mdev->new_state_tmp.pdsk;
860 		} else {
861 			dev_alert(DEV, "Connection lost while negotiating, no data!\n");
862 			ns.disk = D_DISKLESS;
863 			ns.pdsk = D_UNKNOWN;
864 		}
865 		put_ldev(mdev);
866 	}
867 
868 	if (fp == FP_STONITH &&
869 	    (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
870 	    !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
871 		ns.susp = 1;
872 
873 	if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
874 		if (ns.conn == C_SYNC_SOURCE)
875 			ns.conn = C_PAUSED_SYNC_S;
876 		if (ns.conn == C_SYNC_TARGET)
877 			ns.conn = C_PAUSED_SYNC_T;
878 	} else {
879 		if (ns.conn == C_PAUSED_SYNC_S)
880 			ns.conn = C_SYNC_SOURCE;
881 		if (ns.conn == C_PAUSED_SYNC_T)
882 			ns.conn = C_SYNC_TARGET;
883 	}
884 
885 	return ns;
886 }
887 
888 /* helper for __drbd_set_state */
889 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
890 {
891 	if (cs == C_VERIFY_T) {
892 		/* starting online verify from an arbitrary position
893 		 * does not fit well into the existing protocol.
894 		 * on C_VERIFY_T, we initialize ov_left and friends
895 		 * implicitly in receive_DataRequest once the
896 		 * first P_OV_REQUEST is received */
897 		mdev->ov_start_sector = ~(sector_t)0;
898 	} else {
899 		unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
900 		if (bit >= mdev->rs_total)
901 			mdev->ov_start_sector =
902 				BM_BIT_TO_SECT(mdev->rs_total - 1);
903 		mdev->ov_position = mdev->ov_start_sector;
904 	}
905 }
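/* BM_SECT_TO_BIT()/BM_BIT_TO_SECT() translate between 512-byte sectors and
 * bitmap bits; in this version of the code one bit covers a 4 KiB block
 * (8 sectors), which is why ov_start_sector is clamped to the last bit
 * above. */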
906 
907 /**
908  * __drbd_set_state() - Set a new DRBD state
909  * @mdev:	DRBD device.
910  * @ns:		new state.
911  * @flags:	Flags
912  * @done:	Optional completion that will be completed after after_state_ch() has finished
913  *
914  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
915  */
916 int __drbd_set_state(struct drbd_conf *mdev,
917 		    union drbd_state ns, enum chg_state_flags flags,
918 		    struct completion *done)
919 {
920 	union drbd_state os;
921 	int rv = SS_SUCCESS;
922 	int warn_sync_abort = 0;
923 	struct after_state_chg_work *ascw;
924 
925 	os = mdev->state;
926 
927 	ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
928 
929 	if (ns.i == os.i)
930 		return SS_NOTHING_TO_DO;
931 
932 	if (!(flags & CS_HARD)) {
933 		/* pre-state-change checks; only look at ns */
934 		/* See drbd_state_sw_errors in drbd_strings.c */
935 
936 		rv = is_valid_state(mdev, ns);
937 		if (rv < SS_SUCCESS) {
938 			/* If the old state was illegal as well, then let
939 			   this happen...*/
940 
941 			if (is_valid_state(mdev, os) == rv) {
942 				dev_err(DEV, "Considering state change from bad state. "
943 				    "Error would be: '%s'\n",
944 				    drbd_set_st_err_str(rv));
945 				print_st(mdev, "old", os);
946 				print_st(mdev, "new", ns);
947 				rv = is_valid_state_transition(mdev, ns, os);
948 			}
949 		} else
950 			rv = is_valid_state_transition(mdev, ns, os);
951 	}
952 
953 	if (rv < SS_SUCCESS) {
954 		if (flags & CS_VERBOSE)
955 			print_st_err(mdev, os, ns, rv);
956 		return rv;
957 	}
958 
959 	if (warn_sync_abort)
960 		dev_warn(DEV, "Resync aborted.\n");
961 
962 	{
963 		char *pbp, pb[300];
964 		pbp = pb;
965 		*pbp = 0;
966 		PSC(role);
967 		PSC(peer);
968 		PSC(conn);
969 		PSC(disk);
970 		PSC(pdsk);
971 		PSC(susp);
972 		PSC(aftr_isp);
973 		PSC(peer_isp);
974 		PSC(user_isp);
975 		dev_info(DEV, "%s\n", pb);
976 	}
977 
978 	/* solve the race between becoming unconfigured,
979 	 * worker doing the cleanup, and
980 	 * admin reconfiguring us:
981 	 * on (re)configure, first set CONFIG_PENDING,
982 	 * then wait for a potentially exiting worker,
983 	 * start the worker, and schedule one no_op.
984 	 * then proceed with configuration.
985 	 */
986 	if (ns.disk == D_DISKLESS &&
987 	    ns.conn == C_STANDALONE &&
988 	    ns.role == R_SECONDARY &&
989 	    !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
990 		set_bit(DEVICE_DYING, &mdev->flags);
991 
992 	mdev->state.i = ns.i;
993 	wake_up(&mdev->misc_wait);
994 	wake_up(&mdev->state_wait);
995 
996 	/*   post-state-change actions   */
997 	if (os.conn >= C_SYNC_SOURCE   && ns.conn <= C_CONNECTED) {
998 		set_bit(STOP_SYNC_TIMER, &mdev->flags);
999 		mod_timer(&mdev->resync_timer, jiffies);
1000 	}
1001 
1002 	/* aborted verify run. log the last position */
1003 	if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1004 	    ns.conn < C_CONNECTED) {
1005 		mdev->ov_start_sector =
1006 			BM_BIT_TO_SECT(mdev->rs_total - mdev->ov_left);
1007 		dev_info(DEV, "Online Verify reached sector %llu\n",
1008 			(unsigned long long)mdev->ov_start_sector);
1009 	}
1010 
1011 	if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1012 	    (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1013 		dev_info(DEV, "Syncer continues.\n");
1014 		mdev->rs_paused += (long)jiffies-(long)mdev->rs_mark_time;
1015 		if (ns.conn == C_SYNC_TARGET) {
1016 			if (!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))
1017 				mod_timer(&mdev->resync_timer, jiffies);
1018 			/* This if (!test_bit) is only needed for the case
1019 			   that a device that has ceased to use its timer,
1020 			   i.e. is already in drbd_resync_finished(), gets
1021 			   paused and resumed. */
1022 		}
1023 	}
1024 
1025 	if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1026 	    (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1027 		dev_info(DEV, "Resync suspended\n");
1028 		mdev->rs_mark_time = jiffies;
1029 		if (ns.conn == C_PAUSED_SYNC_T)
1030 			set_bit(STOP_SYNC_TIMER, &mdev->flags);
1031 	}
1032 
1033 	if (os.conn == C_CONNECTED &&
1034 	    (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1035 		mdev->ov_position = 0;
1036 		mdev->rs_total =
1037 		mdev->rs_mark_left = drbd_bm_bits(mdev);
1038 		if (mdev->agreed_pro_version >= 90)
1039 			set_ov_position(mdev, ns.conn);
1040 		else
1041 			mdev->ov_start_sector = 0;
1042 		mdev->ov_left = mdev->rs_total
1043 			      - BM_SECT_TO_BIT(mdev->ov_position);
1044 		mdev->rs_start     =
1045 		mdev->rs_mark_time = jiffies;
1046 		mdev->ov_last_oos_size = 0;
1047 		mdev->ov_last_oos_start = 0;
1048 
1049 		if (ns.conn == C_VERIFY_S) {
1050 			dev_info(DEV, "Starting Online Verify from sector %llu\n",
1051 					(unsigned long long)mdev->ov_position);
1052 			mod_timer(&mdev->resync_timer, jiffies);
1053 		}
1054 	}
1055 
1056 	if (get_ldev(mdev)) {
1057 		u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1058 						 MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1059 						 MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1060 
1061 		if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1062 			mdf |= MDF_CRASHED_PRIMARY;
1063 		if (mdev->state.role == R_PRIMARY ||
1064 		    (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1065 			mdf |= MDF_PRIMARY_IND;
1066 		if (mdev->state.conn > C_WF_REPORT_PARAMS)
1067 			mdf |= MDF_CONNECTED_IND;
1068 		if (mdev->state.disk > D_INCONSISTENT)
1069 			mdf |= MDF_CONSISTENT;
1070 		if (mdev->state.disk > D_OUTDATED)
1071 			mdf |= MDF_WAS_UP_TO_DATE;
1072 		if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1073 			mdf |= MDF_PEER_OUT_DATED;
1074 		if (mdf != mdev->ldev->md.flags) {
1075 			mdev->ldev->md.flags = mdf;
1076 			drbd_md_mark_dirty(mdev);
1077 		}
1078 		if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1079 			drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1080 		put_ldev(mdev);
1081 	}
1082 
1083 	/* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider resyncing */
1084 	if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1085 	    os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1086 		set_bit(CONSIDER_RESYNC, &mdev->flags);
1087 
1088 	/* Receiver should clean up itself */
1089 	if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1090 		drbd_thread_stop_nowait(&mdev->receiver);
1091 
1092 	/* Now the receiver finished cleaning up itself, it should die */
1093 	if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1094 		drbd_thread_stop_nowait(&mdev->receiver);
1095 
1096 	/* Upon network failure, we need to restart the receiver. */
1097 	if (os.conn > C_TEAR_DOWN &&
1098 	    ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1099 		drbd_thread_restart_nowait(&mdev->receiver);
1100 
1101 	ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1102 	if (ascw) {
1103 		ascw->os = os;
1104 		ascw->ns = ns;
1105 		ascw->flags = flags;
1106 		ascw->w.cb = w_after_state_ch;
1107 		ascw->done = done;
1108 		drbd_queue_work(&mdev->data.work, &ascw->w);
1109 	} else {
1110 		dev_warn(DEV, "Could not kmalloc an ascw\n");
1111 	}
1112 
1113 	return rv;
1114 }
1115 
1116 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1117 {
1118 	struct after_state_chg_work *ascw =
1119 		container_of(w, struct after_state_chg_work, w);
1120 	after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1121 	if (ascw->flags & CS_WAIT_COMPLETE) {
1122 		D_ASSERT(ascw->done != NULL);
1123 		complete(ascw->done);
1124 	}
1125 	kfree(ascw);
1126 
1127 	return 1;
1128 }
1129 
1130 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1131 {
1132 	if (rv) {
1133 		dev_err(DEV, "Writing the bitmap failed, not starting resync.\n");
1134 		_drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1135 		return;
1136 	}
1137 
1138 	switch (mdev->state.conn) {
1139 	case C_STARTING_SYNC_T:
1140 		_drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1141 		break;
1142 	case C_STARTING_SYNC_S:
1143 		drbd_start_resync(mdev, C_SYNC_SOURCE);
1144 		break;
1145 	}
1146 }
1147 
1148 /**
1149  * after_state_ch() - Perform after-state-change actions that may sleep
1150  * @mdev:	DRBD device.
1151  * @os:		old state.
1152  * @ns:		new state.
1153  * @flags:	Flags
1154  */
1155 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1156 			   union drbd_state ns, enum chg_state_flags flags)
1157 {
1158 	enum drbd_fencing_p fp;
1159 
1160 	if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1161 		clear_bit(CRASHED_PRIMARY, &mdev->flags);
1162 		if (mdev->p_uuid)
1163 			mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1164 	}
1165 
1166 	fp = FP_DONT_CARE;
1167 	if (get_ldev(mdev)) {
1168 		fp = mdev->ldev->dc.fencing;
1169 		put_ldev(mdev);
1170 	}
1171 
1172 	/* Inform userspace about the change... */
1173 	drbd_bcast_state(mdev, ns);
1174 
1175 	if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1176 	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1177 		drbd_khelper(mdev, "pri-on-incon-degr");
1178 
1179 	/* Here we have the actions that are performed after a
1180 	   state change. This function might sleep */
1181 
1182 	if (fp == FP_STONITH && ns.susp) {
1183 		/* case1: The outdate peer handler is successful:
1184 		 * case2: The connection was established again: */
1185 		if ((os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) ||
1186 		    (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)) {
1187 			tl_clear(mdev);
1188 			spin_lock_irq(&mdev->req_lock);
1189 			_drbd_set_state(_NS(mdev, susp, 0), CS_VERBOSE, NULL);
1190 			spin_unlock_irq(&mdev->req_lock);
1191 		}
1192 	}
1193 	/* Do not change the order of the if above and the two below... */
1194 	if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1195 		drbd_send_uuids(mdev);
1196 		drbd_send_state(mdev);
1197 	}
1198 	if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S)
1199 		drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL, "send_bitmap (WFBitMapS)");
1200 
1201 	/* Lost contact to peer's copy of the data */
1202 	if ((os.pdsk >= D_INCONSISTENT &&
1203 	     os.pdsk != D_UNKNOWN &&
1204 	     os.pdsk != D_OUTDATED)
1205 	&&  (ns.pdsk < D_INCONSISTENT ||
1206 	     ns.pdsk == D_UNKNOWN ||
1207 	     ns.pdsk == D_OUTDATED)) {
1208 		kfree(mdev->p_uuid);
1209 		mdev->p_uuid = NULL;
1210 		if (get_ldev(mdev)) {
1211 			if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1212 			    mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1213 				drbd_uuid_new_current(mdev);
1214 				drbd_send_uuids(mdev);
1215 			}
1216 			put_ldev(mdev);
1217 		}
1218 	}
1219 
1220 	if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1221 		if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0)
1222 			drbd_uuid_new_current(mdev);
1223 
1224 		/* D_DISKLESS Peer becomes secondary */
1225 		if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1226 			drbd_al_to_on_disk_bm(mdev);
1227 		put_ldev(mdev);
1228 	}
1229 
1230 	/* Last part of the attaching process ... */
1231 	if (ns.conn >= C_CONNECTED &&
1232 	    os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1233 		kfree(mdev->p_uuid); /* We expect to receive up-to-date UUIDs soon. */
1234 		mdev->p_uuid = NULL; /* ...to not use the old ones in the mean time */
1235 		drbd_send_sizes(mdev, 0);  /* to start sync... */
1236 		drbd_send_uuids(mdev);
1237 		drbd_send_state(mdev);
1238 	}
1239 
1240 	/* We want to pause/continue resync, tell peer. */
1241 	if (ns.conn >= C_CONNECTED &&
1242 	     ((os.aftr_isp != ns.aftr_isp) ||
1243 	      (os.user_isp != ns.user_isp)))
1244 		drbd_send_state(mdev);
1245 
1246 	/* In case one of the isp bits got set, suspend other devices. */
1247 	if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1248 	    (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1249 		suspend_other_sg(mdev);
1250 
1251 	/* Make sure the peer gets informed about possible state
1252 	   changes (ISP bits) while we were in WFReportParams. */
1253 	if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1254 		drbd_send_state(mdev);
1255 
1256 	/* We are in the process of starting a full sync... */
1257 	if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1258 	    (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1259 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &abw_start_sync, "set_n_write from StartingSync");
1260 
1261 	/* We are invalidating ourselves... */
1262 	if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1263 	    os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1264 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
1265 
1266 	if (os.disk > D_FAILED && ns.disk == D_FAILED) {
1267 		enum drbd_io_error_p eh;
1268 
1269 		eh = EP_PASS_ON;
1270 		if (get_ldev_if_state(mdev, D_FAILED)) {
1271 			eh = mdev->ldev->dc.on_io_error;
1272 			put_ldev(mdev);
1273 		}
1274 
1275 		drbd_rs_cancel_all(mdev);
1276 		/* since get_ldev() only works as long as disk>=D_INCONSISTENT,
1277 		   and it is D_DISKLESS here, local_cnt can only go down, it
1278 		   cannot increase... It will reach zero */
1279 		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1280 		mdev->rs_total = 0;
1281 		mdev->rs_failed = 0;
1282 		atomic_set(&mdev->rs_pending_cnt, 0);
1283 
1284 		spin_lock_irq(&mdev->req_lock);
1285 		_drbd_set_state(_NS(mdev, disk, D_DISKLESS), CS_HARD, NULL);
1286 		spin_unlock_irq(&mdev->req_lock);
1287 
1288 		if (eh == EP_CALL_HELPER)
1289 			drbd_khelper(mdev, "local-io-error");
1290 	}
1291 
1292 	if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {
1293 
1294 		if (os.disk == D_FAILED) /* && ns.disk == D_DISKLESS*/ {
1295 			if (drbd_send_state(mdev))
1296 				dev_warn(DEV, "Notified peer that my disk is broken.\n");
1297 			else
1298 				dev_err(DEV, "Sending state in drbd_io_error() failed\n");
1299 		}
1300 
1301 		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1302 		lc_destroy(mdev->resync);
1303 		mdev->resync = NULL;
1304 		lc_destroy(mdev->act_log);
1305 		mdev->act_log = NULL;
1306 		__no_warn(local,
1307 			drbd_free_bc(mdev->ldev);
1308 			mdev->ldev = NULL;);
1309 
1310 		if (mdev->md_io_tmpp)
1311 			__free_page(mdev->md_io_tmpp);
1312 	}
1313 
1314 	/* Disks got bigger while they were detached */
1315 	if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1316 	    test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1317 		if (ns.conn == C_CONNECTED)
1318 			resync_after_online_grow(mdev);
1319 	}
1320 
1321 	/* A resync finished or aborted, wake paused devices... */
1322 	if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1323 	    (os.peer_isp && !ns.peer_isp) ||
1324 	    (os.user_isp && !ns.user_isp))
1325 		resume_next_sg(mdev);
1326 
1327 	/* Upon network connection, we need to start the receiver */
1328 	if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1329 		drbd_thread_start(&mdev->receiver);
1330 
1331 	/* Terminate worker thread if we are unconfigured - it will be
1332 	   restarted as needed... */
1333 	if (ns.disk == D_DISKLESS &&
1334 	    ns.conn == C_STANDALONE &&
1335 	    ns.role == R_SECONDARY) {
1336 		if (os.aftr_isp != ns.aftr_isp)
1337 			resume_next_sg(mdev);
1338 		/* set in __drbd_set_state, unless CONFIG_PENDING was set */
1339 		if (test_bit(DEVICE_DYING, &mdev->flags))
1340 			drbd_thread_stop_nowait(&mdev->worker);
1341 	}
1342 
1343 	drbd_md_sync(mdev);
1344 }
1345 
1346 
1347 static int drbd_thread_setup(void *arg)
1348 {
1349 	struct drbd_thread *thi = (struct drbd_thread *) arg;
1350 	struct drbd_conf *mdev = thi->mdev;
1351 	unsigned long flags;
1352 	int retval;
1353 
1354 restart:
1355 	retval = thi->function(thi);
1356 
1357 	spin_lock_irqsave(&thi->t_lock, flags);
1358 
1359 	/* if the receiver has been "Exiting", the last thing it did
1360 	 * was set the conn state to "StandAlone",
1361 	 * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
1362 	 * and receiver thread will be "started".
1363 	 * drbd_thread_start needs to set "Restarting" in that case.
1364 	 * t_state check and assignment needs to be within the same spinlock,
1365 	 * so either thread_start sees Exiting, and can remap to Restarting,
1366 	 * or thread_start sees None, and can proceed as normal.
1367 	 */
1368 
1369 	if (thi->t_state == Restarting) {
1370 		dev_info(DEV, "Restarting %s\n", current->comm);
1371 		thi->t_state = Running;
1372 		spin_unlock_irqrestore(&thi->t_lock, flags);
1373 		goto restart;
1374 	}
1375 
1376 	thi->task = NULL;
1377 	thi->t_state = None;
1378 	smp_mb();
1379 	complete(&thi->stop);
1380 	spin_unlock_irqrestore(&thi->t_lock, flags);
1381 
1382 	dev_info(DEV, "Terminating %s\n", current->comm);
1383 
1384 	/* Release mod reference taken when thread was started */
1385 	module_put(THIS_MODULE);
1386 	return retval;
1387 }
1388 
1389 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1390 		      int (*func) (struct drbd_thread *))
1391 {
1392 	spin_lock_init(&thi->t_lock);
1393 	thi->task    = NULL;
1394 	thi->t_state = None;
1395 	thi->function = func;
1396 	thi->mdev = mdev;
1397 }
1398 
1399 int drbd_thread_start(struct drbd_thread *thi)
1400 {
1401 	struct drbd_conf *mdev = thi->mdev;
1402 	struct task_struct *nt;
1403 	unsigned long flags;
1404 
1405 	const char *me =
1406 		thi == &mdev->receiver ? "receiver" :
1407 		thi == &mdev->asender  ? "asender"  :
1408 		thi == &mdev->worker   ? "worker"   : "NONSENSE";
1409 
1410 	/* is used from state engine doing drbd_thread_stop_nowait,
1411 	 * while holding the req lock irqsave */
1412 	spin_lock_irqsave(&thi->t_lock, flags);
1413 
1414 	switch (thi->t_state) {
1415 	case None:
1416 		dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1417 				me, current->comm, current->pid);
1418 
1419 		/* Get ref on module for thread - this is released when thread exits */
1420 		if (!try_module_get(THIS_MODULE)) {
1421 			dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1422 			spin_unlock_irqrestore(&thi->t_lock, flags);
1423 			return FALSE;
1424 		}
1425 
1426 		init_completion(&thi->stop);
1427 		D_ASSERT(thi->task == NULL);
1428 		thi->reset_cpu_mask = 1;
1429 		thi->t_state = Running;
1430 		spin_unlock_irqrestore(&thi->t_lock, flags);
1431 		flush_signals(current); /* otherwise we may get -ERESTARTNOINTR */
1432 
1433 		nt = kthread_create(drbd_thread_setup, (void *) thi,
1434 				    "drbd%d_%s", mdev_to_minor(mdev), me);
1435 
1436 		if (IS_ERR(nt)) {
1437 			dev_err(DEV, "Couldn't start thread\n");
1438 
1439 			module_put(THIS_MODULE);
1440 			return FALSE;
1441 		}
1442 		spin_lock_irqsave(&thi->t_lock, flags);
1443 		thi->task = nt;
1444 		thi->t_state = Running;
1445 		spin_unlock_irqrestore(&thi->t_lock, flags);
1446 		wake_up_process(nt);
1447 		break;
1448 	case Exiting:
1449 		thi->t_state = Restarting;
1450 		dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1451 				me, current->comm, current->pid);
1452 		/* fall through */
1453 	case Running:
1454 	case Restarting:
1455 	default:
1456 		spin_unlock_irqrestore(&thi->t_lock, flags);
1457 		break;
1458 	}
1459 
1460 	return TRUE;
1461 }
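/* Example from this file: after_state_ch() starts the receiver with
 *	drbd_thread_start(&mdev->receiver);
 * once the connection state goes from StandAlone to Unconnected. */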
1462 
1463 
1464 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1465 {
1466 	unsigned long flags;
1467 
1468 	enum drbd_thread_state ns = restart ? Restarting : Exiting;
1469 
1470 	/* may be called from state engine, holding the req lock irqsave */
1471 	spin_lock_irqsave(&thi->t_lock, flags);
1472 
1473 	if (thi->t_state == None) {
1474 		spin_unlock_irqrestore(&thi->t_lock, flags);
1475 		if (restart)
1476 			drbd_thread_start(thi);
1477 		return;
1478 	}
1479 
1480 	if (thi->t_state != ns) {
1481 		if (thi->task == NULL) {
1482 			spin_unlock_irqrestore(&thi->t_lock, flags);
1483 			return;
1484 		}
1485 
1486 		thi->t_state = ns;
1487 		smp_mb();
1488 		init_completion(&thi->stop);
1489 		if (thi->task != current)
1490 			force_sig(DRBD_SIGKILL, thi->task);
1491 
1492 	}
1493 
1494 	spin_unlock_irqrestore(&thi->t_lock, flags);
1495 
1496 	if (wait)
1497 		wait_for_completion(&thi->stop);
1498 }
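/* The drbd_thread_stop(), drbd_thread_stop_nowait() and
 * drbd_thread_restart_nowait() calls used elsewhere in this file are thin
 * wrappers (presumably in drbd_int.h) that pick the (restart, wait)
 * combination for this function. */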
1499 
1500 #ifdef CONFIG_SMP
1501 /**
1502  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1503  * @mdev:	DRBD device.
1504  *
1505  * Forces all threads of a device onto the same CPU. This is beneficial for
1506  * DRBD's performance. May be overridden by the user's configuration.
1507  */
1508 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1509 {
1510 	int ord, cpu;
1511 
1512 	/* user override. */
1513 	if (cpumask_weight(mdev->cpu_mask))
1514 		return;
1515 
1516 	ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1517 	for_each_online_cpu(cpu) {
1518 		if (ord-- == 0) {
1519 			cpumask_set_cpu(cpu, mdev->cpu_mask);
1520 			return;
1521 		}
1522 	}
1523 	/* should not be reached */
1524 	cpumask_setall(mdev->cpu_mask);
1525 }
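/* e.g. with 4 CPUs online, the device with minor 5 gets pinned to the CPU
 * with ordinal 1 in cpu_online_mask (5 % 4 == 1) - illustrative numbers. */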
1526 
1527 /**
1528  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1529  * @mdev:	DRBD device.
1530  *
1531  * Call this in the "main loop" of _all_ threads; no need for any mutex, current won't die
1532  * prematurely.
1533  */
1534 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1535 {
1536 	struct task_struct *p = current;
1537 	struct drbd_thread *thi =
1538 		p == mdev->asender.task  ? &mdev->asender  :
1539 		p == mdev->receiver.task ? &mdev->receiver :
1540 		p == mdev->worker.task   ? &mdev->worker   :
1541 		NULL;
1542 	ERR_IF(thi == NULL)
1543 		return;
1544 	if (!thi->reset_cpu_mask)
1545 		return;
1546 	thi->reset_cpu_mask = 0;
1547 	set_cpus_allowed_ptr(p, mdev->cpu_mask);
1548 }
1549 #endif
1550 
1551 /* the appropriate socket mutex must be held already */
1552 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1553 			  enum drbd_packets cmd, struct p_header *h,
1554 			  size_t size, unsigned msg_flags)
1555 {
1556 	int sent, ok;
1557 
1558 	ERR_IF(!h) return FALSE;
1559 	ERR_IF(!size) return FALSE;
1560 
1561 	h->magic   = BE_DRBD_MAGIC;
1562 	h->command = cpu_to_be16(cmd);
1563 	h->length  = cpu_to_be16(size-sizeof(struct p_header));
1564 
1565 	sent = drbd_send(mdev, sock, h, size, msg_flags);
1566 
1567 	ok = (sent == size);
1568 	if (!ok)
1569 		dev_err(DEV, "short sent %s size=%d sent=%d\n",
1570 		    cmdname(cmd), (int)size, sent);
1571 	return ok;
1572 }
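/* Typical caller, as in drbd_send_state() further below:
 *	ok = _drbd_send_cmd(mdev, sock, P_STATE,
 *			    (struct p_header *)&p, sizeof(p), 0);
 * with mdev->data.mutex held around the call. */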
1573 
1574 /* don't pass the socket. we may only look at it
1575  * when we hold the appropriate socket mutex.
1576  */
1577 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1578 		  enum drbd_packets cmd, struct p_header *h, size_t size)
1579 {
1580 	int ok = 0;
1581 	struct socket *sock;
1582 
1583 	if (use_data_socket) {
1584 		mutex_lock(&mdev->data.mutex);
1585 		sock = mdev->data.socket;
1586 	} else {
1587 		mutex_lock(&mdev->meta.mutex);
1588 		sock = mdev->meta.socket;
1589 	}
1590 
1591 	/* drbd_disconnect() could have called drbd_free_sock()
1592 	 * while we were waiting in down()... */
1593 	if (likely(sock != NULL))
1594 		ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1595 
1596 	if (use_data_socket)
1597 		mutex_unlock(&mdev->data.mutex);
1598 	else
1599 		mutex_unlock(&mdev->meta.mutex);
1600 	return ok;
1601 }
1602 
1603 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1604 		   size_t size)
1605 {
1606 	struct p_header h;
1607 	int ok;
1608 
1609 	h.magic   = BE_DRBD_MAGIC;
1610 	h.command = cpu_to_be16(cmd);
1611 	h.length  = cpu_to_be16(size);
1612 
1613 	if (!drbd_get_data_sock(mdev))
1614 		return 0;
1615 
1616 	ok = (sizeof(h) ==
1617 		drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1618 	ok = ok && (size ==
1619 		drbd_send(mdev, mdev->data.socket, data, size, 0));
1620 
1621 	drbd_put_data_sock(mdev);
1622 
1623 	return ok;
1624 }
1625 
1626 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1627 {
1628 	struct p_rs_param_89 *p;
1629 	struct socket *sock;
1630 	int size, rv;
1631 	const int apv = mdev->agreed_pro_version;
1632 
1633 	size = apv <= 87 ? sizeof(struct p_rs_param)
1634 		: apv == 88 ? sizeof(struct p_rs_param)
1635 			+ strlen(mdev->sync_conf.verify_alg) + 1
1636 		: /* 89 */    sizeof(struct p_rs_param_89);
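	/* e.g. with apv == 88 and a (hypothetical) verify_alg of "crc32c",
	 * size would be sizeof(struct p_rs_param) + 6 + 1 bytes */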
1637 
1638 	/* used from admin command context and receiver/worker context.
1639 	 * to avoid kmalloc, grab the socket right here,
1640 	 * then use the pre-allocated sbuf there */
1641 	mutex_lock(&mdev->data.mutex);
1642 	sock = mdev->data.socket;
1643 
1644 	if (likely(sock != NULL)) {
1645 		enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1646 
1647 		p = &mdev->data.sbuf.rs_param_89;
1648 
1649 		/* initialize verify_alg and csums_alg */
1650 		memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1651 
1652 		p->rate = cpu_to_be32(sc->rate);
1653 
1654 		if (apv >= 88)
1655 			strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1656 		if (apv >= 89)
1657 			strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1658 
1659 		rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1660 	} else
1661 		rv = 0; /* not ok */
1662 
1663 	mutex_unlock(&mdev->data.mutex);
1664 
1665 	return rv;
1666 }
1667 
1668 int drbd_send_protocol(struct drbd_conf *mdev)
1669 {
1670 	struct p_protocol *p;
1671 	int size, cf, rv;
1672 
1673 	size = sizeof(struct p_protocol);
1674 
1675 	if (mdev->agreed_pro_version >= 87)
1676 		size += strlen(mdev->net_conf->integrity_alg) + 1;
1677 
1678 	/* we must not recurse into our own queue,
1679 	 * as that is blocked during handshake */
1680 	p = kmalloc(size, GFP_NOIO);
1681 	if (p == NULL)
1682 		return 0;
1683 
1684 	p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
1685 	p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
1686 	p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
1687 	p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
1688 	p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
1689 
1690 	cf = 0;
1691 	if (mdev->net_conf->want_lose)
1692 		cf |= CF_WANT_LOSE;
1693 	if (mdev->net_conf->dry_run) {
1694 		if (mdev->agreed_pro_version >= 92)
1695 			cf |= CF_DRY_RUN;
1696 		else {
1697 			dev_err(DEV, "--dry-run is not supported by peer");
1698 			return 0;
1699 		}
1700 	}
1701 	p->conn_flags    = cpu_to_be32(cf);
1702 
1703 	if (mdev->agreed_pro_version >= 87)
1704 		strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
1705 
1706 	rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1707 			   (struct p_header *)p, size);
1708 	kfree(p);
1709 	return rv;
1710 }
1711 
1712 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1713 {
1714 	struct p_uuids p;
1715 	int i;
1716 
1717 	if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1718 		return 1;
1719 
1720 	for (i = UI_CURRENT; i < UI_SIZE; i++)
1721 		p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1722 
1723 	mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1724 	p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1725 	uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
1726 	uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1727 	uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1728 	p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1729 
1730 	put_ldev(mdev);
1731 
1732 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
1733 			     (struct p_header *)&p, sizeof(p));
1734 }
1735 
1736 int drbd_send_uuids(struct drbd_conf *mdev)
1737 {
1738 	return _drbd_send_uuids(mdev, 0);
1739 }
1740 
1741 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1742 {
1743 	return _drbd_send_uuids(mdev, 8);
1744 }
1745 
1746 
1747 int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val)
1748 {
1749 	struct p_rs_uuid p;
1750 
1751 	p.uuid = cpu_to_be64(val);
1752 
1753 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
1754 			     (struct p_header *)&p, sizeof(p));
1755 }
1756 
1757 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply)
1758 {
1759 	struct p_sizes p;
1760 	sector_t d_size, u_size;
1761 	int q_order_type;
1762 	int ok;
1763 
1764 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1765 		D_ASSERT(mdev->ldev->backing_bdev);
1766 		d_size = drbd_get_max_capacity(mdev->ldev);
1767 		u_size = mdev->ldev->dc.disk_size;
1768 		q_order_type = drbd_queue_order_type(mdev);
1770 		put_ldev(mdev);
1771 	} else {
1772 		d_size = 0;
1773 		u_size = 0;
1774 		q_order_type = QUEUE_ORDERED_NONE;
1775 	}
1776 
1777 	p.d_size = cpu_to_be64(d_size);
1778 	p.u_size = cpu_to_be64(u_size);
1779 	p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1780 	p.max_segment_size = cpu_to_be32(queue_max_segment_size(mdev->rq_queue));
1781 	p.queue_order_type = cpu_to_be32(q_order_type);
1782 
1783 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
1784 			   (struct p_header *)&p, sizeof(p));
1785 	return ok;
1786 }
1787 
1788 /**
1789  * drbd_send_state() - Sends the drbd state to the peer
1790  * @mdev:	DRBD device.
1791  */
1792 int drbd_send_state(struct drbd_conf *mdev)
1793 {
1794 	struct socket *sock;
1795 	struct p_state p;
1796 	int ok = 0;
1797 
1798 	/* Grab state lock so we won't send state if we're in the middle
1799 	 * of a cluster-wide state change on another thread */
1800 	drbd_state_lock(mdev);
1801 
1802 	mutex_lock(&mdev->data.mutex);
1803 
1804 	p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1805 	sock = mdev->data.socket;
1806 
1807 	if (likely(sock != NULL)) {
1808 		ok = _drbd_send_cmd(mdev, sock, P_STATE,
1809 				    (struct p_header *)&p, sizeof(p), 0);
1810 	}
1811 
1812 	mutex_unlock(&mdev->data.mutex);
1813 
1814 	drbd_state_unlock(mdev);
1815 	return ok;
1816 }
1817 
1818 int drbd_send_state_req(struct drbd_conf *mdev,
1819 	union drbd_state mask, union drbd_state val)
1820 {
1821 	struct p_req_state p;
1822 
1823 	p.mask    = cpu_to_be32(mask.i);
1824 	p.val     = cpu_to_be32(val.i);
1825 
1826 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
1827 			     (struct p_header *)&p, sizeof(p));
1828 }
1829 
1830 int drbd_send_sr_reply(struct drbd_conf *mdev, int retcode)
1831 {
1832 	struct p_req_state_reply p;
1833 
1834 	p.retcode    = cpu_to_be32(retcode);
1835 
1836 	return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
1837 			     (struct p_header *)&p, sizeof(p));
1838 }
1839 
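/* Compress a chunk of the bitmap for transmission: the bitmap is encoded as
 * alternating run lengths of unset/set bits (whether the first run is of set
 * bits is stored via DCBP_set_start()), and each run length is emitted as a
 * variable length integer into p->code.  Illustrative example only: the
 * stretch "0000111101" would be sent as start=0 with the runs 4, 4, 1, 1.
 * Returns the number of code bytes used on success, 0 if this chunk did not
 * compress (the caller then falls back to a plain P_BITMAP packet), and -1
 * on error. */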
1840 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1841 	struct p_compressed_bm *p,
1842 	struct bm_xfer_ctx *c)
1843 {
1844 	struct bitstream bs;
1845 	unsigned long plain_bits;
1846 	unsigned long tmp;
1847 	unsigned long rl;
1848 	unsigned len;
1849 	unsigned toggle;
1850 	int bits;
1851 
1852 	/* may we use this feature? */
1853 	if ((mdev->sync_conf.use_rle == 0) ||
1854 		(mdev->agreed_pro_version < 90))
1855 			return 0;
1856 
1857 	if (c->bit_offset >= c->bm_bits)
1858 		return 0; /* nothing to do. */
1859 
1860 	/* use at most this many bytes */
1861 	bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1862 	memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1863 	/* plain bits covered in this code string */
1864 	plain_bits = 0;
1865 
1866 	/* p->encoding & 0x80 stores whether the first run length is set.
1867 	 * bit offset is implicit.
1868 	 * start with toggle == 2 to be able to tell the first iteration */
1869 	toggle = 2;
1870 
1871 	/* see how many plain bits we can stuff into one packet
1872 	 * using RLE and VLI. */
1873 	do {
1874 		tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1875 				    : _drbd_bm_find_next(mdev, c->bit_offset);
1876 		if (tmp == -1UL)
1877 			tmp = c->bm_bits;
1878 		rl = tmp - c->bit_offset;
1879 
1880 		if (toggle == 2) { /* first iteration */
1881 			if (rl == 0) {
1882 				/* the first checked bit was set,
1883 				 * store start value, */
1884 				DCBP_set_start(p, 1);
1885 				/* but skip encoding of zero run length */
1886 				toggle = !toggle;
1887 				continue;
1888 			}
1889 			DCBP_set_start(p, 0);
1890 		}
1891 
1892 		/* paranoia: catch zero runlength.
1893 		 * can only happen if bitmap is modified while we scan it. */
1894 		if (rl == 0) {
1895 			dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1896 			    "t:%u bo:%lu\n", toggle, c->bit_offset);
1897 			return -1;
1898 		}
1899 
1900 		bits = vli_encode_bits(&bs, rl);
1901 		if (bits == -ENOBUFS) /* buffer full */
1902 			break;
1903 		if (bits <= 0) {
1904 			dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1905 			return 0;
1906 		}
1907 
1908 		toggle = !toggle;
1909 		plain_bits += rl;
1910 		c->bit_offset = tmp;
1911 	} while (c->bit_offset < c->bm_bits);
1912 
1913 	len = bs.cur.b - p->code + !!bs.cur.bit;
1914 
1915 	if (plain_bits < (len << 3)) {
1916 		/* incompressible with this method.
1917 		 * we need to rewind both word and bit position. */
1918 		c->bit_offset -= plain_bits;
1919 		bm_xfer_ctx_bit_to_word_offset(c);
1920 		c->bit_offset = c->word_offset * BITS_PER_LONG;
1921 		return 0;
1922 	}
1923 
1924 	/* RLE + VLI was able to compress it just fine.
1925 	 * update c->word_offset. */
1926 	bm_xfer_ctx_bit_to_word_offset(c);
1927 
1928 	/* store pad_bits */
1929 	DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1930 
1931 	return len;
1932 }
1933 
1934 enum { OK, FAILED, DONE }
1935 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
1936 	struct p_header *h, struct bm_xfer_ctx *c)
1937 {
1938 	struct p_compressed_bm *p = (void*)h;
1939 	unsigned long num_words;
1940 	int len;
1941 	int ok;
1942 
1943 	len = fill_bitmap_rle_bits(mdev, p, c);
1944 
1945 	if (len < 0)
1946 		return FAILED;
1947 
1948 	if (len) {
1949 		DCBP_set_code(p, RLE_VLI_Bits);
1950 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
1951 			sizeof(*p) + len, 0);
1952 
1953 		c->packets[0]++;
1954 		c->bytes[0] += sizeof(*p) + len;
1955 
1956 		if (c->bit_offset >= c->bm_bits)
1957 			len = 0; /* DONE */
1958 	} else {
1959 		/* was not compressible.
1960 		 * send a buffer full of plain text bits instead. */
1961 		num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
1962 		len = num_words * sizeof(long);
1963 		if (len)
1964 			drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
1965 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
1966 				   h, sizeof(struct p_header) + len, 0);
1967 		c->word_offset += num_words;
1968 		c->bit_offset = c->word_offset * BITS_PER_LONG;
1969 
1970 		c->packets[1]++;
1971 		c->bytes[1] += sizeof(struct p_header) + len;
1972 
1973 		if (c->bit_offset > c->bm_bits)
1974 			c->bit_offset = c->bm_bits;
1975 	}
1976 	ok = ok ? ((len == 0) ? DONE : OK) : FAILED;
1977 
1978 	if (ok == DONE)
1979 		INFO_bm_xfer_stats(mdev, "send", c);
1980 	return ok;
1981 }
1982 
1983 /* See the comment at receive_bitmap() */
1984 int _drbd_send_bitmap(struct drbd_conf *mdev)
1985 {
1986 	struct bm_xfer_ctx c;
1987 	struct p_header *p;
1988 	int ret;
1989 
1990 	ERR_IF(!mdev->bitmap) return FALSE;
1991 
1992 	/* maybe we should use some per thread scratch page,
1993 	 * and allocate that during initial device creation? */
1994 	p = (struct p_header *) __get_free_page(GFP_NOIO);
1995 	if (!p) {
1996 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
1997 		return FALSE;
1998 	}
1999 
2000 	if (get_ldev(mdev)) {
2001 		if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2002 			dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2003 			drbd_bm_set_all(mdev);
2004 			if (drbd_bm_write(mdev)) {
2005 				/* write_bm did fail! Leave full sync flag set in Meta P_DATA
2006 				 * but otherwise process as per normal - need to tell other
2007 				 * side that a full resync is required! */
2008 				dev_err(DEV, "Failed to write bitmap to disk!\n");
2009 			} else {
2010 				drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2011 				drbd_md_sync(mdev);
2012 			}
2013 		}
2014 		put_ldev(mdev);
2015 	}
2016 
2017 	c = (struct bm_xfer_ctx) {
2018 		.bm_bits = drbd_bm_bits(mdev),
2019 		.bm_words = drbd_bm_words(mdev),
2020 	};
2021 
2022 	do {
2023 		ret = send_bitmap_rle_or_plain(mdev, p, &c);
2024 	} while (ret == OK);
2025 
2026 	free_page((unsigned long) p);
2027 	return (ret == DONE);
2028 }
2029 
2030 int drbd_send_bitmap(struct drbd_conf *mdev)
2031 {
2032 	int err;
2033 
2034 	if (!drbd_get_data_sock(mdev))
2035 		return -1;
2036 	err = !_drbd_send_bitmap(mdev);
2037 	drbd_put_data_sock(mdev);
2038 	return err;
2039 }
2040 
2041 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2042 {
2043 	int ok;
2044 	struct p_barrier_ack p;
2045 
2046 	p.barrier  = barrier_nr;
2047 	p.set_size = cpu_to_be32(set_size);
2048 
2049 	if (mdev->state.conn < C_CONNECTED)
2050 		return FALSE;
2051 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2052 			(struct p_header *)&p, sizeof(p));
2053 	return ok;
2054 }
2055 
2056 /**
2057  * _drbd_send_ack() - Sends an ack packet
2058  * @mdev:	DRBD device.
2059  * @cmd:	Packet command code.
2060  * @sector:	sector, needs to be in big endian byte order
2061  * @blksize:	size in bytes, needs to be in big endian byte order
2062  * @block_id:	Id, big endian byte order
2063  */
2064 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2065 			  u64 sector,
2066 			  u32 blksize,
2067 			  u64 block_id)
2068 {
2069 	int ok;
2070 	struct p_block_ack p;
2071 
2072 	p.sector   = sector;
2073 	p.block_id = block_id;
2074 	p.blksize  = blksize;
2075 	p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2076 
2077 	if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2078 		return FALSE;
2079 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2080 				(struct p_header *)&p, sizeof(p));
2081 	return ok;
2082 }
2083 
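/* Note on byte order: _drbd_send_ack() above expects sector, blksize and
 * block_id already in big endian.  drbd_send_ack_dp() and drbd_send_ack_rp()
 * can therefore pass most fields straight from the received packet, while
 * drbd_send_ack() and drbd_send_ack_ex() convert with cpu_to_be64() and
 * cpu_to_be32() first. */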
2084 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2085 		     struct p_data *dp)
2086 {
2087 	const int header_size = sizeof(struct p_data)
2088 			      - sizeof(struct p_header);
2089 	int data_size  = ((struct p_header *)dp)->length - header_size;
2090 
2091 	return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2092 			      dp->block_id);
2093 }
2094 
2095 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2096 		     struct p_block_req *rp)
2097 {
2098 	return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2099 }
2100 
2101 /**
2102  * drbd_send_ack() - Sends an ack packet
2103  * @mdev:	DRBD device.
2104  * @cmd:	Packet command code.
2105  * @e:		Epoch entry.
2106  */
2107 int drbd_send_ack(struct drbd_conf *mdev,
2108 	enum drbd_packets cmd, struct drbd_epoch_entry *e)
2109 {
2110 	return _drbd_send_ack(mdev, cmd,
2111 			      cpu_to_be64(e->sector),
2112 			      cpu_to_be32(e->size),
2113 			      e->block_id);
2114 }
2115 
2116 /* This function misuses the block_id field to signal if the blocks
2117  * are in sync or not. */
2118 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2119 		     sector_t sector, int blksize, u64 block_id)
2120 {
2121 	return _drbd_send_ack(mdev, cmd,
2122 			      cpu_to_be64(sector),
2123 			      cpu_to_be32(blksize),
2124 			      cpu_to_be64(block_id));
2125 }
2126 
2127 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2128 		       sector_t sector, int size, u64 block_id)
2129 {
2130 	int ok;
2131 	struct p_block_req p;
2132 
2133 	p.sector   = cpu_to_be64(sector);
2134 	p.block_id = block_id;
2135 	p.blksize  = cpu_to_be32(size);
2136 
2137 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2138 				(struct p_header *)&p, sizeof(p));
2139 	return ok;
2140 }
2141 
2142 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2143 			    sector_t sector, int size,
2144 			    void *digest, int digest_size,
2145 			    enum drbd_packets cmd)
2146 {
2147 	int ok;
2148 	struct p_block_req p;
2149 
2150 	p.sector   = cpu_to_be64(sector);
2151 	p.block_id = BE_DRBD_MAGIC + 0xbeef;
2152 	p.blksize  = cpu_to_be32(size);
2153 
2154 	p.head.magic   = BE_DRBD_MAGIC;
2155 	p.head.command = cpu_to_be16(cmd);
2156 	p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header) + digest_size);
2157 
2158 	mutex_lock(&mdev->data.mutex);
2159 
2160 	ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2161 	ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2162 
2163 	mutex_unlock(&mdev->data.mutex);
2164 
2165 	return ok;
2166 }
2167 
2168 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2169 {
2170 	int ok;
2171 	struct p_block_req p;
2172 
2173 	p.sector   = cpu_to_be64(sector);
2174 	p.block_id = BE_DRBD_MAGIC + 0xbabe;
2175 	p.blksize  = cpu_to_be32(size);
2176 
2177 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2178 			   (struct p_header *)&p, sizeof(p));
2179 	return ok;
2180 }
2181 
2182 /* called on sndtimeo
2183  * returns FALSE if we should retry,
2184  * TRUE if we think connection is dead
2185  */
2186 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2187 {
2188 	int drop_it;
2189 	/* long elapsed = (long)(jiffies - mdev->last_received); */
2190 
2191 	drop_it =   mdev->meta.socket == sock
2192 		|| !mdev->asender.task
2193 		|| get_t_state(&mdev->asender) != Running
2194 		|| mdev->state.conn < C_CONNECTED;
2195 
2196 	if (drop_it)
2197 		return TRUE;
2198 
2199 	drop_it = !--mdev->ko_count;
2200 	if (!drop_it) {
2201 		dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2202 		       current->comm, current->pid, mdev->ko_count);
2203 		request_ping(mdev);
2204 	}
2205 
2206 	return drop_it; /* && (mdev->state == R_PRIMARY) */
2207 }
2208 
2209 /* The idea of sendpage seems to be to put some kind of reference
2210  * to the page into the skb, and to hand it over to the NIC. In
2211  * this process get_page() gets called.
2212  *
2213  * As soon as the page was really sent over the network put_page()
2214  * gets called by some part of the network layer. [ NIC driver? ]
2215  *
2216  * [ get_page() / put_page() increment/decrement the count. If count
2217  *   reaches 0 the page will be freed. ]
2218  *
2219  * This works nicely with pages from FSs.
2220  * But this means that in protocol A we might signal IO completion too early!
2221  *
2222  * In order not to corrupt data during a resync we must make sure
2223  * that we do not reuse our own buffer pages (EEs) too early, therefore
2224  * we have the net_ee list.
2225  *
2226  * XFS seems to have problems, still, it submits pages with page_count == 0!
2227  * As a workaround, we disable sendpage on pages
2228  * with page_count == 0 or PageSlab.
2229  */
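/* The protocol A consequence is visible in drbd_send_dblock() below: for
 * protocol A the bio payload goes through the copying _drbd_send_bio() path
 * (kmap + drbd_send), presumably so that the pages can be handed back to the
 * upper layers right away; all other cases use the zero-copy
 * _drbd_send_zc_bio() path. */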
2230 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2231 		   int offset, size_t size)
2232 {
2233 	int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, 0);
2234 	kunmap(page);
2235 	if (sent == size)
2236 		mdev->send_cnt += size>>9;
2237 	return sent == size;
2238 }
2239 
2240 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2241 		    int offset, size_t size)
2242 {
2243 	mm_segment_t oldfs = get_fs();
2244 	int sent, ok;
2245 	int len = size;
2246 
2247 	/* e.g. XFS meta- & log-data is in slab pages, which have a
2248 	 * page_count of 0 and/or have PageSlab() set.
2249 	 * we cannot use send_page for those, as that does get_page();
2250 	 * put_page(); and would cause either a VM_BUG directly, or
2251 	 * __page_cache_release a page that would actually still be referenced
2252 	 * by someone, leading to some obscure delayed Oops somewhere else. */
2253 	if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2254 		return _drbd_no_send_page(mdev, page, offset, size);
2255 
2256 	drbd_update_congested(mdev);
2257 	set_fs(KERNEL_DS);
2258 	do {
2259 		sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2260 							offset, len,
2261 							MSG_NOSIGNAL);
2262 		if (sent == -EAGAIN) {
2263 			if (we_should_drop_the_connection(mdev,
2264 							  mdev->data.socket))
2265 				break;
2266 			else
2267 				continue;
2268 		}
2269 		if (sent <= 0) {
2270 			dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2271 			     __func__, (int)size, len, sent);
2272 			break;
2273 		}
2274 		len    -= sent;
2275 		offset += sent;
2276 	} while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2277 	set_fs(oldfs);
2278 	clear_bit(NET_CONGESTED, &mdev->flags);
2279 
2280 	ok = (len == 0);
2281 	if (likely(ok))
2282 		mdev->send_cnt += size>>9;
2283 	return ok;
2284 }
2285 
2286 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2287 {
2288 	struct bio_vec *bvec;
2289 	int i;
2290 	__bio_for_each_segment(bvec, bio, i, 0) {
2291 		if (!_drbd_no_send_page(mdev, bvec->bv_page,
2292 				     bvec->bv_offset, bvec->bv_len))
2293 			return 0;
2294 	}
2295 	return 1;
2296 }
2297 
2298 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2299 {
2300 	struct bio_vec *bvec;
2301 	int i;
2302 	__bio_for_each_segment(bvec, bio, i, 0) {
2303 		if (!_drbd_send_page(mdev, bvec->bv_page,
2304 				     bvec->bv_offset, bvec->bv_len))
2305 			return 0;
2306 	}
2307 
2308 	return 1;
2309 }
2310 
2311 /* Used to send write requests
2312  * R_PRIMARY -> Peer	(P_DATA)
2313  */
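/* On the wire a P_DATA packet is laid out as: struct p_data (head.length
 * covers everything after the generic p_header), optionally followed by dgs
 * bytes of integrity digest (if an integrity_w_tfm is configured and
 * apv >= 87), followed by the bio payload of req->size bytes. */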
2314 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2315 {
2316 	int ok = 1;
2317 	struct p_data p;
2318 	unsigned int dp_flags = 0;
2319 	void *dgb;
2320 	int dgs;
2321 
2322 	if (!drbd_get_data_sock(mdev))
2323 		return 0;
2324 
2325 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2326 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2327 
2328 	p.head.magic   = BE_DRBD_MAGIC;
2329 	p.head.command = cpu_to_be16(P_DATA);
2330 	p.head.length  =
2331 		cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + req->size);
2332 
2333 	p.sector   = cpu_to_be64(req->sector);
2334 	p.block_id = (unsigned long)req;
2335 	p.seq_num  = cpu_to_be32(req->seq_num =
2336 				 atomic_add_return(1, &mdev->packet_seq));
2337 	dp_flags = 0;
2338 
2339 	/* NOTE: no need to check if barriers supported here as we would
2340 	 *       not pass the test in make_request_common in that case
2341 	 */
2342 	if (bio_rw_flagged(req->master_bio, BIO_RW_BARRIER)) {
2343 		dev_err(DEV, "ASSERT FAILED would have set DP_HARDBARRIER\n");
2344 		/* dp_flags |= DP_HARDBARRIER; */
2345 	}
2346 	if (bio_rw_flagged(req->master_bio, BIO_RW_SYNCIO))
2347 		dp_flags |= DP_RW_SYNC;
2348 	/* for now handle SYNCIO and UNPLUG
2349 	 * as if they still were one and the same flag */
2350 	if (bio_rw_flagged(req->master_bio, BIO_RW_UNPLUG))
2351 		dp_flags |= DP_RW_SYNC;
2352 	if (mdev->state.conn >= C_SYNC_SOURCE &&
2353 	    mdev->state.conn <= C_PAUSED_SYNC_T)
2354 		dp_flags |= DP_MAY_SET_IN_SYNC;
2355 
2356 	p.dp_flags = cpu_to_be32(dp_flags);
2357 	set_bit(UNPLUG_REMOTE, &mdev->flags);
2358 	ok = (sizeof(p) ==
2359 		drbd_send(mdev, mdev->data.socket, &p, sizeof(p), MSG_MORE));
2360 	if (ok && dgs) {
2361 		dgb = mdev->int_dig_out;
2362 		drbd_csum(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2363 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
2364 	}
2365 	if (ok) {
2366 		if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
2367 			ok = _drbd_send_bio(mdev, req->master_bio);
2368 		else
2369 			ok = _drbd_send_zc_bio(mdev, req->master_bio);
2370 	}
2371 
2372 	drbd_put_data_sock(mdev);
2373 	return ok;
2374 }
2375 
2376 /* answer packet, used to send data back for read requests:
2377  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2378  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2379  */
2380 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2381 		    struct drbd_epoch_entry *e)
2382 {
2383 	int ok;
2384 	struct p_data p;
2385 	void *dgb;
2386 	int dgs;
2387 
2388 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2389 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2390 
2391 	p.head.magic   = BE_DRBD_MAGIC;
2392 	p.head.command = cpu_to_be16(cmd);
2393 	p.head.length  =
2394 		cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + e->size);
2395 
2396 	p.sector   = cpu_to_be64(e->sector);
2397 	p.block_id = e->block_id;
2398 	/* p.seq_num  = 0;    No sequence numbers here.. */
2399 
2400 	/* Only called by our kernel thread.
2401 	 * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2402 	 * in response to admin command or module unload.
2403 	 */
2404 	if (!drbd_get_data_sock(mdev))
2405 		return 0;
2406 
2407 	ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p,
2408 					sizeof(p), MSG_MORE);
2409 	if (ok && dgs) {
2410 		dgb = mdev->int_dig_out;
2411 		drbd_csum(mdev, mdev->integrity_w_tfm, e->private_bio, dgb);
2412 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
2413 	}
2414 	if (ok)
2415 		ok = _drbd_send_zc_bio(mdev, e->private_bio);
2416 
2417 	drbd_put_data_sock(mdev);
2418 	return ok;
2419 }
2420 
2421 /*
2422   drbd_send distinguishes two cases:
2423 
2424   Packets sent via the data socket "sock"
2425   and packets sent via the meta data socket "msock"
2426 
2427 		    sock                      msock
2428   -----------------+-------------------------+------------------------------
2429   timeout           conf.timeout / 2          conf.timeout / 2
2430   timeout action    send a ping via msock     Abort communication
2431 					      and close all sockets
2432 */
2433 
2434 /*
2435  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2436  */
2437 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2438 	      void *buf, size_t size, unsigned msg_flags)
2439 {
2440 	struct kvec iov;
2441 	struct msghdr msg;
2442 	int rv, sent = 0;
2443 
2444 	if (!sock)
2445 		return -1000;
2446 
2447 	/* THINK  if (signal_pending) return ... ? */
2448 
2449 	iov.iov_base = buf;
2450 	iov.iov_len  = size;
2451 
2452 	msg.msg_name       = NULL;
2453 	msg.msg_namelen    = 0;
2454 	msg.msg_control    = NULL;
2455 	msg.msg_controllen = 0;
2456 	msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2457 
2458 	if (sock == mdev->data.socket) {
2459 		mdev->ko_count = mdev->net_conf->ko_count;
2460 		drbd_update_congested(mdev);
2461 	}
2462 	do {
2463 		/* STRANGE
2464 		 * tcp_sendmsg does _not_ use its size parameter at all ?
2465 		 *
2466 		 * -EAGAIN on timeout, -EINTR on signal.
2467 		 */
2468 /* THINK
2469  * do we need to block DRBD_SIG if sock == &meta.socket ??
2470  * otherwise wake_asender() might interrupt some send_*Ack !
2471  */
2472 		rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2473 		if (rv == -EAGAIN) {
2474 			if (we_should_drop_the_connection(mdev, sock))
2475 				break;
2476 			else
2477 				continue;
2478 		}
2479 		D_ASSERT(rv != 0);
2480 		if (rv == -EINTR) {
2481 			flush_signals(current);
2482 			rv = 0;
2483 		}
2484 		if (rv < 0)
2485 			break;
2486 		sent += rv;
2487 		iov.iov_base += rv;
2488 		iov.iov_len  -= rv;
2489 	} while (sent < size);
2490 
2491 	if (sock == mdev->data.socket)
2492 		clear_bit(NET_CONGESTED, &mdev->flags);
2493 
2494 	if (rv <= 0) {
2495 		if (rv != -EAGAIN) {
2496 			dev_err(DEV, "%s_sendmsg returned %d\n",
2497 			    sock == mdev->meta.socket ? "msock" : "sock",
2498 			    rv);
2499 			drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2500 		} else
2501 			drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2502 	}
2503 
2504 	return sent;
2505 }
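
/* Typical usage, as in the command send helpers above: serialize on the
 * socket's mutex (or take drbd_get_data_sock()), push a header followed by
 * the payload, and compare the return value against the requested size.
 * Illustrative sketch only:
 *
 *	if (!drbd_get_data_sock(mdev))
 *		return 0;
 *	ok = (sizeof(h) == drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
 *	ok = ok && (size == drbd_send(mdev, mdev->data.socket, data, size, 0));
 *	drbd_put_data_sock(mdev);
 */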
2506 
2507 static int drbd_open(struct block_device *bdev, fmode_t mode)
2508 {
2509 	struct drbd_conf *mdev = bdev->bd_disk->private_data;
2510 	unsigned long flags;
2511 	int rv = 0;
2512 
2513 	spin_lock_irqsave(&mdev->req_lock, flags);
2514 	/* to have a stable mdev->state.role
2515 	 * and no race with updating open_cnt */
2516 
2517 	if (mdev->state.role != R_PRIMARY) {
2518 		if (mode & FMODE_WRITE)
2519 			rv = -EROFS;
2520 		else if (!allow_oos)
2521 			rv = -EMEDIUMTYPE;
2522 	}
2523 
2524 	if (!rv)
2525 		mdev->open_cnt++;
2526 	spin_unlock_irqrestore(&mdev->req_lock, flags);
2527 
2528 	return rv;
2529 }
2530 
2531 static int drbd_release(struct gendisk *gd, fmode_t mode)
2532 {
2533 	struct drbd_conf *mdev = gd->private_data;
2534 	mdev->open_cnt--;
2535 	return 0;
2536 }
2537 
2538 static void drbd_unplug_fn(struct request_queue *q)
2539 {
2540 	struct drbd_conf *mdev = q->queuedata;
2541 
2542 	/* unplug FIRST */
2543 	spin_lock_irq(q->queue_lock);
2544 	blk_remove_plug(q);
2545 	spin_unlock_irq(q->queue_lock);
2546 
2547 	/* only if connected */
2548 	spin_lock_irq(&mdev->req_lock);
2549 	if (mdev->state.pdsk >= D_INCONSISTENT && mdev->state.conn >= C_CONNECTED) {
2550 		D_ASSERT(mdev->state.role == R_PRIMARY);
2551 		if (test_and_clear_bit(UNPLUG_REMOTE, &mdev->flags)) {
2552 			/* add to the data.work queue,
2553 			 * unless already queued.
2554 			 * XXX this might be a good addition to drbd_queue_work
2555 			 * anyways, to detect "double queuing" ... */
2556 			if (list_empty(&mdev->unplug_work.list))
2557 				drbd_queue_work(&mdev->data.work,
2558 						&mdev->unplug_work);
2559 		}
2560 	}
2561 	spin_unlock_irq(&mdev->req_lock);
2562 
2563 	if (mdev->state.disk >= D_INCONSISTENT)
2564 		drbd_kick_lo(mdev);
2565 }
2566 
2567 static void drbd_set_defaults(struct drbd_conf *mdev)
2568 {
2569 	mdev->sync_conf.after      = DRBD_AFTER_DEF;
2570 	mdev->sync_conf.rate       = DRBD_RATE_DEF;
2571 	mdev->sync_conf.al_extents = DRBD_AL_EXTENTS_DEF;
2572 	mdev->state = (union drbd_state) {
2573 		{ .role = R_SECONDARY,
2574 		  .peer = R_UNKNOWN,
2575 		  .conn = C_STANDALONE,
2576 		  .disk = D_DISKLESS,
2577 		  .pdsk = D_UNKNOWN,
2578 		  .susp = 0
2579 		} };
2580 }
2581 
2582 void drbd_init_set_defaults(struct drbd_conf *mdev)
2583 {
2584 	/* the memset(,0,) did most of this.
2585 	 * note: only assignments, no allocation in here */
2586 
2587 	drbd_set_defaults(mdev);
2588 
2589 	/* for now, we do NOT yet support it,
2590 	 * even though we start some framework
2591 	 * to eventually support barriers */
2592 	set_bit(NO_BARRIER_SUPP, &mdev->flags);
2593 
2594 	atomic_set(&mdev->ap_bio_cnt, 0);
2595 	atomic_set(&mdev->ap_pending_cnt, 0);
2596 	atomic_set(&mdev->rs_pending_cnt, 0);
2597 	atomic_set(&mdev->unacked_cnt, 0);
2598 	atomic_set(&mdev->local_cnt, 0);
2599 	atomic_set(&mdev->net_cnt, 0);
2600 	atomic_set(&mdev->packet_seq, 0);
2601 	atomic_set(&mdev->pp_in_use, 0);
2602 
2603 	mutex_init(&mdev->md_io_mutex);
2604 	mutex_init(&mdev->data.mutex);
2605 	mutex_init(&mdev->meta.mutex);
2606 	sema_init(&mdev->data.work.s, 0);
2607 	sema_init(&mdev->meta.work.s, 0);
2608 	mutex_init(&mdev->state_mutex);
2609 
2610 	spin_lock_init(&mdev->data.work.q_lock);
2611 	spin_lock_init(&mdev->meta.work.q_lock);
2612 
2613 	spin_lock_init(&mdev->al_lock);
2614 	spin_lock_init(&mdev->req_lock);
2615 	spin_lock_init(&mdev->peer_seq_lock);
2616 	spin_lock_init(&mdev->epoch_lock);
2617 
2618 	INIT_LIST_HEAD(&mdev->active_ee);
2619 	INIT_LIST_HEAD(&mdev->sync_ee);
2620 	INIT_LIST_HEAD(&mdev->done_ee);
2621 	INIT_LIST_HEAD(&mdev->read_ee);
2622 	INIT_LIST_HEAD(&mdev->net_ee);
2623 	INIT_LIST_HEAD(&mdev->resync_reads);
2624 	INIT_LIST_HEAD(&mdev->data.work.q);
2625 	INIT_LIST_HEAD(&mdev->meta.work.q);
2626 	INIT_LIST_HEAD(&mdev->resync_work.list);
2627 	INIT_LIST_HEAD(&mdev->unplug_work.list);
2628 	INIT_LIST_HEAD(&mdev->md_sync_work.list);
2629 	INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2630 	mdev->resync_work.cb  = w_resync_inactive;
2631 	mdev->unplug_work.cb  = w_send_write_hint;
2632 	mdev->md_sync_work.cb = w_md_sync;
2633 	mdev->bm_io_work.w.cb = w_bitmap_io;
2634 	init_timer(&mdev->resync_timer);
2635 	init_timer(&mdev->md_sync_timer);
2636 	mdev->resync_timer.function = resync_timer_fn;
2637 	mdev->resync_timer.data = (unsigned long) mdev;
2638 	mdev->md_sync_timer.function = md_sync_timer_fn;
2639 	mdev->md_sync_timer.data = (unsigned long) mdev;
2640 
2641 	init_waitqueue_head(&mdev->misc_wait);
2642 	init_waitqueue_head(&mdev->state_wait);
2643 	init_waitqueue_head(&mdev->ee_wait);
2644 	init_waitqueue_head(&mdev->al_wait);
2645 	init_waitqueue_head(&mdev->seq_wait);
2646 
2647 	drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
2648 	drbd_thread_init(mdev, &mdev->worker, drbd_worker);
2649 	drbd_thread_init(mdev, &mdev->asender, drbd_asender);
2650 
2651 	mdev->agreed_pro_version = PRO_VERSION_MAX;
2652 	mdev->write_ordering = WO_bio_barrier;
2653 	mdev->resync_wenr = LC_FREE;
2654 }
2655 
2656 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2657 {
2658 	if (mdev->receiver.t_state != None)
2659 		dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2660 				mdev->receiver.t_state);
2661 
2662 	/* no need to lock it, I'm the only thread alive */
2663 	if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2664 		dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2665 	mdev->al_writ_cnt  =
2666 	mdev->bm_writ_cnt  =
2667 	mdev->read_cnt     =
2668 	mdev->recv_cnt     =
2669 	mdev->send_cnt     =
2670 	mdev->writ_cnt     =
2671 	mdev->p_size       =
2672 	mdev->rs_start     =
2673 	mdev->rs_total     =
2674 	mdev->rs_failed    =
2675 	mdev->rs_mark_left =
2676 	mdev->rs_mark_time = 0;
2677 	D_ASSERT(mdev->net_conf == NULL);
2678 
2679 	drbd_set_my_capacity(mdev, 0);
2680 	if (mdev->bitmap) {
2681 		/* maybe never allocated. */
2682 		drbd_bm_resize(mdev, 0);
2683 		drbd_bm_cleanup(mdev);
2684 	}
2685 
2686 	drbd_free_resources(mdev);
2687 
2688 	/*
2689 	 * currently we drbd_init_ee only on module load, so
2690 	 * we may do drbd_release_ee only on module unload!
2691 	 */
2692 	D_ASSERT(list_empty(&mdev->active_ee));
2693 	D_ASSERT(list_empty(&mdev->sync_ee));
2694 	D_ASSERT(list_empty(&mdev->done_ee));
2695 	D_ASSERT(list_empty(&mdev->read_ee));
2696 	D_ASSERT(list_empty(&mdev->net_ee));
2697 	D_ASSERT(list_empty(&mdev->resync_reads));
2698 	D_ASSERT(list_empty(&mdev->data.work.q));
2699 	D_ASSERT(list_empty(&mdev->meta.work.q));
2700 	D_ASSERT(list_empty(&mdev->resync_work.list));
2701 	D_ASSERT(list_empty(&mdev->unplug_work.list));
2702 
2703 }
2704 
2705 
2706 static void drbd_destroy_mempools(void)
2707 {
2708 	struct page *page;
2709 
2710 	while (drbd_pp_pool) {
2711 		page = drbd_pp_pool;
2712 		drbd_pp_pool = (struct page *)page_private(page);
2713 		__free_page(page);
2714 		drbd_pp_vacant--;
2715 	}
2716 
2717 	/* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2718 
2719 	if (drbd_ee_mempool)
2720 		mempool_destroy(drbd_ee_mempool);
2721 	if (drbd_request_mempool)
2722 		mempool_destroy(drbd_request_mempool);
2723 	if (drbd_ee_cache)
2724 		kmem_cache_destroy(drbd_ee_cache);
2725 	if (drbd_request_cache)
2726 		kmem_cache_destroy(drbd_request_cache);
2727 	if (drbd_bm_ext_cache)
2728 		kmem_cache_destroy(drbd_bm_ext_cache);
2729 	if (drbd_al_ext_cache)
2730 		kmem_cache_destroy(drbd_al_ext_cache);
2731 
2732 	drbd_ee_mempool      = NULL;
2733 	drbd_request_mempool = NULL;
2734 	drbd_ee_cache        = NULL;
2735 	drbd_request_cache   = NULL;
2736 	drbd_bm_ext_cache    = NULL;
2737 	drbd_al_ext_cache    = NULL;
2738 
2739 	return;
2740 }
2741 
2742 static int drbd_create_mempools(void)
2743 {
2744 	struct page *page;
2745 	const int number = (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count;
2746 	int i;
2747 
2748 	/* prepare our caches and mempools */
2749 	drbd_request_mempool = NULL;
2750 	drbd_ee_cache        = NULL;
2751 	drbd_request_cache   = NULL;
2752 	drbd_bm_ext_cache    = NULL;
2753 	drbd_al_ext_cache    = NULL;
2754 	drbd_pp_pool         = NULL;
2755 
2756 	/* caches */
2757 	drbd_request_cache = kmem_cache_create(
2758 		"drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2759 	if (drbd_request_cache == NULL)
2760 		goto Enomem;
2761 
2762 	drbd_ee_cache = kmem_cache_create(
2763 		"drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
2764 	if (drbd_ee_cache == NULL)
2765 		goto Enomem;
2766 
2767 	drbd_bm_ext_cache = kmem_cache_create(
2768 		"drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2769 	if (drbd_bm_ext_cache == NULL)
2770 		goto Enomem;
2771 
2772 	drbd_al_ext_cache = kmem_cache_create(
2773 		"drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2774 	if (drbd_al_ext_cache == NULL)
2775 		goto Enomem;
2776 
2777 	/* mempools */
2778 	drbd_request_mempool = mempool_create(number,
2779 		mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2780 	if (drbd_request_mempool == NULL)
2781 		goto Enomem;
2782 
2783 	drbd_ee_mempool = mempool_create(number,
2784 		mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2785 	if (drbd_ee_mempool == NULL)
2786 		goto Enomem;
2787 
2788 	/* drbd's page pool */
2789 	spin_lock_init(&drbd_pp_lock);
2790 
2791 	for (i = 0; i < number; i++) {
2792 		page = alloc_page(GFP_HIGHUSER);
2793 		if (!page)
2794 			goto Enomem;
2795 		set_page_private(page, (unsigned long)drbd_pp_pool);
2796 		drbd_pp_pool = page;
2797 	}
2798 	drbd_pp_vacant = number;
2799 
2800 	return 0;
2801 
2802 Enomem:
2803 	drbd_destroy_mempools(); /* in case we allocated some */
2804 	return -ENOMEM;
2805 }
2806 
2807 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2808 	void *unused)
2809 {
2810 	/* just so we have it.  you never know what interesting things we
2811 	 * might want to do here some day...
2812 	 */
2813 
2814 	return NOTIFY_DONE;
2815 }
2816 
2817 static struct notifier_block drbd_notifier = {
2818 	.notifier_call = drbd_notify_sys,
2819 };
2820 
2821 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2822 {
2823 	int rr;
2824 
2825 	rr = drbd_release_ee(mdev, &mdev->active_ee);
2826 	if (rr)
2827 		dev_err(DEV, "%d EEs in active list found!\n", rr);
2828 
2829 	rr = drbd_release_ee(mdev, &mdev->sync_ee);
2830 	if (rr)
2831 		dev_err(DEV, "%d EEs in sync list found!\n", rr);
2832 
2833 	rr = drbd_release_ee(mdev, &mdev->read_ee);
2834 	if (rr)
2835 		dev_err(DEV, "%d EEs in read list found!\n", rr);
2836 
2837 	rr = drbd_release_ee(mdev, &mdev->done_ee);
2838 	if (rr)
2839 		dev_err(DEV, "%d EEs in done list found!\n", rr);
2840 
2841 	rr = drbd_release_ee(mdev, &mdev->net_ee);
2842 	if (rr)
2843 		dev_err(DEV, "%d EEs in net list found!\n", rr);
2844 }
2845 
2846 /* caution. no locking.
2847  * currently only used from module cleanup code. */
2848 static void drbd_delete_device(unsigned int minor)
2849 {
2850 	struct drbd_conf *mdev = minor_to_mdev(minor);
2851 
2852 	if (!mdev)
2853 		return;
2854 
2855 	/* paranoia asserts */
2856 	if (mdev->open_cnt != 0)
2857 		dev_err(DEV, "open_cnt = %d in %s:%u\n", mdev->open_cnt,
2858 				__FILE__, __LINE__);
2859 
2860 	ERR_IF (!list_empty(&mdev->data.work.q)) {
2861 		struct list_head *lp;
2862 		list_for_each(lp, &mdev->data.work.q) {
2863 			dev_err(DEV, "lp = %p\n", lp);
2864 		}
2865 	};
2866 	/* end paranoia asserts */
2867 
2868 	del_gendisk(mdev->vdisk);
2869 
2870 	/* cleanup stuff that may have been allocated during
2871 	 * device (re-)configuration or state changes */
2872 
2873 	if (mdev->this_bdev)
2874 		bdput(mdev->this_bdev);
2875 
2876 	drbd_free_resources(mdev);
2877 
2878 	drbd_release_ee_lists(mdev);
2879 
2880 	/* should be free'd on disconnect? */
2881 	kfree(mdev->ee_hash);
2882 	/*
2883 	mdev->ee_hash_s = 0;
2884 	mdev->ee_hash = NULL;
2885 	*/
2886 
2887 	lc_destroy(mdev->act_log);
2888 	lc_destroy(mdev->resync);
2889 
2890 	kfree(mdev->p_uuid);
2891 	/* mdev->p_uuid = NULL; */
2892 
2893 	kfree(mdev->int_dig_out);
2894 	kfree(mdev->int_dig_in);
2895 	kfree(mdev->int_dig_vv);
2896 
2897 	/* cleanup the rest that has been
2898 	 * allocated from drbd_new_device
2899 	 * and actually free the mdev itself */
2900 	drbd_free_mdev(mdev);
2901 }
2902 
2903 static void drbd_cleanup(void)
2904 {
2905 	unsigned int i;
2906 
2907 	unregister_reboot_notifier(&drbd_notifier);
2908 
2909 	drbd_nl_cleanup();
2910 
2911 	if (minor_table) {
2912 		if (drbd_proc)
2913 			remove_proc_entry("drbd", NULL);
2914 		i = minor_count;
2915 		while (i--)
2916 			drbd_delete_device(i);
2917 		drbd_destroy_mempools();
2918 	}
2919 
2920 	kfree(minor_table);
2921 
2922 	unregister_blkdev(DRBD_MAJOR, "drbd");
2923 
2924 	printk(KERN_INFO "drbd: module cleanup done.\n");
2925 }
2926 
2927 /**
2928  * drbd_congested() - Callback for pdflush
2929  * @congested_data:	User data
2930  * @bdi_bits:		Bits pdflush is currently interested in
2931  *
2932  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2933  */
2934 static int drbd_congested(void *congested_data, int bdi_bits)
2935 {
2936 	struct drbd_conf *mdev = congested_data;
2937 	struct request_queue *q;
2938 	char reason = '-';
2939 	int r = 0;
2940 
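	/* congestion_reason legend: 'd' = DRBD froze IO, 'b' = backing device
	 * congested, 'n' = network send path congested, 'a' = both backing
	 * device and network, '-' = not congested */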
2941 	if (!__inc_ap_bio_cond(mdev)) {
2942 		/* DRBD has frozen IO */
2943 		r = bdi_bits;
2944 		reason = 'd';
2945 		goto out;
2946 	}
2947 
2948 	if (get_ldev(mdev)) {
2949 		q = bdev_get_queue(mdev->ldev->backing_bdev);
2950 		r = bdi_congested(&q->backing_dev_info, bdi_bits);
2951 		put_ldev(mdev);
2952 		if (r)
2953 			reason = 'b';
2954 	}
2955 
2956 	if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
2957 		r |= (1 << BDI_async_congested);
2958 		reason = reason == 'b' ? 'a' : 'n';
2959 	}
2960 
2961 out:
2962 	mdev->congestion_reason = reason;
2963 	return r;
2964 }
2965 
2966 struct drbd_conf *drbd_new_device(unsigned int minor)
2967 {
2968 	struct drbd_conf *mdev;
2969 	struct gendisk *disk;
2970 	struct request_queue *q;
2971 
2972 	/* GFP_KERNEL, we are outside of all write-out paths */
2973 	mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2974 	if (!mdev)
2975 		return NULL;
2976 	if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
2977 		goto out_no_cpumask;
2978 
2979 	mdev->minor = minor;
2980 
2981 	drbd_init_set_defaults(mdev);
2982 
2983 	q = blk_alloc_queue(GFP_KERNEL);
2984 	if (!q)
2985 		goto out_no_q;
2986 	mdev->rq_queue = q;
2987 	q->queuedata   = mdev;
2988 
2989 	disk = alloc_disk(1);
2990 	if (!disk)
2991 		goto out_no_disk;
2992 	mdev->vdisk = disk;
2993 
2994 	set_disk_ro(disk, TRUE);
2995 
2996 	disk->queue = q;
2997 	disk->major = DRBD_MAJOR;
2998 	disk->first_minor = minor;
2999 	disk->fops = &drbd_ops;
3000 	sprintf(disk->disk_name, "drbd%d", minor);
3001 	disk->private_data = mdev;
3002 
3003 	mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3004 	/* we have no partitions. we contain only ourselves. */
3005 	mdev->this_bdev->bd_contains = mdev->this_bdev;
3006 
3007 	q->backing_dev_info.congested_fn = drbd_congested;
3008 	q->backing_dev_info.congested_data = mdev;
3009 
3010 	blk_queue_make_request(q, drbd_make_request_26);
3011 	blk_queue_max_segment_size(q, DRBD_MAX_SEGMENT_SIZE);
3012 	blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3013 	blk_queue_merge_bvec(q, drbd_merge_bvec);
3014 	q->queue_lock = &mdev->req_lock; /* needed since we use */
3015 		/* plugging on a queue, that actually has no requests! */
3016 	q->unplug_fn = drbd_unplug_fn;
3017 
3018 	mdev->md_io_page = alloc_page(GFP_KERNEL);
3019 	if (!mdev->md_io_page)
3020 		goto out_no_io_page;
3021 
3022 	if (drbd_bm_init(mdev))
3023 		goto out_no_bitmap;
3024 	/* no need to lock access, we are still initializing this minor device. */
3025 	if (!tl_init(mdev))
3026 		goto out_no_tl;
3027 
3028 	mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3029 	if (!mdev->app_reads_hash)
3030 		goto out_no_app_reads;
3031 
3032 	mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3033 	if (!mdev->current_epoch)
3034 		goto out_no_epoch;
3035 
3036 	INIT_LIST_HEAD(&mdev->current_epoch->list);
3037 	mdev->epochs = 1;
3038 
3039 	return mdev;
3040 
3041 /* out_whatever_else:
3042 	kfree(mdev->current_epoch); */
3043 out_no_epoch:
3044 	kfree(mdev->app_reads_hash);
3045 out_no_app_reads:
3046 	tl_cleanup(mdev);
3047 out_no_tl:
3048 	drbd_bm_cleanup(mdev);
3049 out_no_bitmap:
3050 	__free_page(mdev->md_io_page);
3051 out_no_io_page:
3052 	put_disk(disk);
3053 out_no_disk:
3054 	blk_cleanup_queue(q);
3055 out_no_q:
3056 	free_cpumask_var(mdev->cpu_mask);
3057 out_no_cpumask:
3058 	kfree(mdev);
3059 	return NULL;
3060 }
3061 
3062 /* counterpart of drbd_new_device.
3063  * last part of drbd_delete_device. */
3064 void drbd_free_mdev(struct drbd_conf *mdev)
3065 {
3066 	kfree(mdev->current_epoch);
3067 	kfree(mdev->app_reads_hash);
3068 	tl_cleanup(mdev);
3069 	if (mdev->bitmap) /* should no longer be there. */
3070 		drbd_bm_cleanup(mdev);
3071 	__free_page(mdev->md_io_page);
3072 	put_disk(mdev->vdisk);
3073 	blk_cleanup_queue(mdev->rq_queue);
3074 	free_cpumask_var(mdev->cpu_mask);
3075 	kfree(mdev);
3076 }
3077 
3078 
3079 int __init drbd_init(void)
3080 {
3081 	int err;
3082 
3083 	if (sizeof(struct p_handshake) != 80) {
3084 		printk(KERN_ERR
3085 		       "drbd: never change the size or layout "
3086 		       "of the HandShake packet.\n");
3087 		return -EINVAL;
3088 	}
3089 
3090 	if (1 > minor_count || minor_count > 255) {
3091 		printk(KERN_ERR
3092 			"drbd: invalid minor_count (%d)\n", minor_count);
3093 #ifdef MODULE
3094 		return -EINVAL;
3095 #else
3096 		minor_count = 8;
3097 #endif
3098 	}
3099 
3100 	err = drbd_nl_init();
3101 	if (err)
3102 		return err;
3103 
3104 	err = register_blkdev(DRBD_MAJOR, "drbd");
3105 	if (err) {
3106 		printk(KERN_ERR
3107 		       "drbd: unable to register block device major %d\n",
3108 		       DRBD_MAJOR);
3109 		return err;
3110 	}
3111 
3112 	register_reboot_notifier(&drbd_notifier);
3113 
3114 	/*
3115 	 * allocate all necessary structs
3116 	 */
3117 	err = -ENOMEM;
3118 
3119 	init_waitqueue_head(&drbd_pp_wait);
3120 
3121 	drbd_proc = NULL; /* play safe for drbd_cleanup */
3122 	minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3123 				GFP_KERNEL);
3124 	if (!minor_table)
3125 		goto Enomem;
3126 
3127 	err = drbd_create_mempools();
3128 	if (err)
3129 		goto Enomem;
3130 
3131 	drbd_proc = proc_create("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops);
3132 	if (!drbd_proc)	{
3133 		printk(KERN_ERR "drbd: unable to register proc file\n");
3134 		goto Enomem;
3135 	}
3136 
3137 	rwlock_init(&global_state_lock);
3138 
3139 	printk(KERN_INFO "drbd: initialized. "
3140 	       "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3141 	       API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3142 	printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3143 	printk(KERN_INFO "drbd: registered as block device major %d\n",
3144 		DRBD_MAJOR);
3145 	printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3146 
3147 	return 0; /* Success! */
3148 
3149 Enomem:
3150 	drbd_cleanup();
3151 	if (err == -ENOMEM)
3152 		/* currently always the case */
3153 		printk(KERN_ERR "drbd: ran out of memory\n");
3154 	else
3155 		printk(KERN_ERR "drbd: initialization failure\n");
3156 	return err;
3157 }
3158 
3159 void drbd_free_bc(struct drbd_backing_dev *ldev)
3160 {
3161 	if (ldev == NULL)
3162 		return;
3163 
3164 	bd_release(ldev->backing_bdev);
3165 	bd_release(ldev->md_bdev);
3166 
3167 	fput(ldev->lo_file);
3168 	fput(ldev->md_file);
3169 
3170 	kfree(ldev);
3171 }
3172 
3173 void drbd_free_sock(struct drbd_conf *mdev)
3174 {
3175 	if (mdev->data.socket) {
3176 		mutex_lock(&mdev->data.mutex);
3177 		kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3178 		sock_release(mdev->data.socket);
3179 		mdev->data.socket = NULL;
3180 		mutex_unlock(&mdev->data.mutex);
3181 	}
3182 	if (mdev->meta.socket) {
3183 		mutex_lock(&mdev->meta.mutex);
3184 		kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3185 		sock_release(mdev->meta.socket);
3186 		mdev->meta.socket = NULL;
3187 		mutex_unlock(&mdev->meta.mutex);
3188 	}
3189 }
3190 
3191 
3192 void drbd_free_resources(struct drbd_conf *mdev)
3193 {
3194 	crypto_free_hash(mdev->csums_tfm);
3195 	mdev->csums_tfm = NULL;
3196 	crypto_free_hash(mdev->verify_tfm);
3197 	mdev->verify_tfm = NULL;
3198 	crypto_free_hash(mdev->cram_hmac_tfm);
3199 	mdev->cram_hmac_tfm = NULL;
3200 	crypto_free_hash(mdev->integrity_w_tfm);
3201 	mdev->integrity_w_tfm = NULL;
3202 	crypto_free_hash(mdev->integrity_r_tfm);
3203 	mdev->integrity_r_tfm = NULL;
3204 
3205 	drbd_free_sock(mdev);
3206 
3207 	__no_warn(local,
3208 		  drbd_free_bc(mdev->ldev);
3209 		  mdev->ldev = NULL;);
3210 }
3211 
3212 /* meta data management */
3213 
3214 struct meta_data_on_disk {
3215 	u64 la_size;           /* last agreed size. */
3216 	u64 uuid[UI_SIZE];   /* UUIDs. */
3217 	u64 device_uuid;
3218 	u64 reserved_u64_1;
3219 	u32 flags;             /* MDF */
3220 	u32 magic;
3221 	u32 md_size_sect;
3222 	u32 al_offset;         /* offset to this block */
3223 	u32 al_nr_extents;     /* important for restoring the AL */
3224 	      /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3225 	u32 bm_offset;         /* offset to the bitmap, from here */
3226 	u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3227 	u32 reserved_u32[4];
3228 
3229 } __packed;
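
/* Everything in struct meta_data_on_disk is stored in big endian;
 * drbd_md_sync() below converts with cpu_to_be*() into a zeroed 512 byte
 * buffer, and drbd_md_read() converts back with be*_to_cpu(). */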
3230 
3231 /**
3232  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3233  * @mdev:	DRBD device.
3234  */
3235 void drbd_md_sync(struct drbd_conf *mdev)
3236 {
3237 	struct meta_data_on_disk *buffer;
3238 	sector_t sector;
3239 	int i;
3240 
3241 	if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3242 		return;
3243 	del_timer(&mdev->md_sync_timer);
3244 
3245 	/* We use here D_FAILED and not D_ATTACHING because we try to write
3246 	 * metadata even if we detach due to a disk failure! */
3247 	if (!get_ldev_if_state(mdev, D_FAILED))
3248 		return;
3249 
3250 	mutex_lock(&mdev->md_io_mutex);
3251 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3252 	memset(buffer, 0, 512);
3253 
3254 	buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3255 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3256 		buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3257 	buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3258 	buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3259 
3260 	buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3261 	buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3262 	buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3263 	buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3264 	buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3265 
3266 	buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3267 
3268 	D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3269 	sector = mdev->ldev->md.md_offset;
3270 
3271 	if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3272 		clear_bit(MD_DIRTY, &mdev->flags);
3273 	} else {
3274 		/* this was a try anyways ... */
3275 		dev_err(DEV, "meta data update failed!\n");
3276 
3277 		drbd_chk_io_error(mdev, 1, TRUE);
3278 	}
3279 
3280 	/* Update mdev->ldev->md.la_size_sect,
3281 	 * since we updated it on metadata. */
3282 	mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3283 
3284 	mutex_unlock(&mdev->md_io_mutex);
3285 	put_ldev(mdev);
3286 }
3287 
3288 /**
3289  * drbd_md_read() - Reads in the meta data super block
3290  * @mdev:	DRBD device.
3291  * @bdev:	Device from which the meta data should be read in.
3292  *
3293  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_codes in case
3294  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3295  */
3296 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3297 {
3298 	struct meta_data_on_disk *buffer;
3299 	int i, rv = NO_ERROR;
3300 
3301 	if (!get_ldev_if_state(mdev, D_ATTACHING))
3302 		return ERR_IO_MD_DISK;
3303 
3304 	mutex_lock(&mdev->md_io_mutex);
3305 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3306 
3307 	if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3308 		/* NOTE: can't do normal error processing here as this is
3309 		   called BEFORE disk is attached */
3310 		dev_err(DEV, "Error while reading metadata.\n");
3311 		rv = ERR_IO_MD_DISK;
3312 		goto err;
3313 	}
3314 
3315 	if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3316 		dev_err(DEV, "Error while reading metadata, magic not found.\n");
3317 		rv = ERR_MD_INVALID;
3318 		goto err;
3319 	}
3320 	if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3321 		dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3322 		    be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3323 		rv = ERR_MD_INVALID;
3324 		goto err;
3325 	}
3326 	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3327 		dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3328 		    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3329 		rv = ERR_MD_INVALID;
3330 		goto err;
3331 	}
3332 	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3333 		dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3334 		    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3335 		rv = ERR_MD_INVALID;
3336 		goto err;
3337 	}
3338 
3339 	if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3340 		dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3341 		    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3342 		rv = ERR_MD_INVALID;
3343 		goto err;
3344 	}
3345 
3346 	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3347 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3348 		bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3349 	bdev->md.flags = be32_to_cpu(buffer->flags);
3350 	mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3351 	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3352 
3353 	if (mdev->sync_conf.al_extents < 7)
3354 		mdev->sync_conf.al_extents = 127;
3355 
3356  err:
3357 	mutex_unlock(&mdev->md_io_mutex);
3358 	put_ldev(mdev);
3359 
3360 	return rv;
3361 }
3362 
3363 /**
3364  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3365  * @mdev:	DRBD device.
3366  *
3367  * Call this function if you change anything that should be written to
3368  * the meta-data super block. This function sets MD_DIRTY, and starts a
3369  * timer that ensures that within five seconds you have to call drbd_md_sync().
3370  */
3371 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3372 {
3373 	set_bit(MD_DIRTY, &mdev->flags);
3374 	mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3375 }
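
/* How the dirty flag gets flushed: drbd_md_mark_dirty() arms md_sync_timer;
 * when it fires, md_sync_timer_fn() queues w_md_sync() on the data work
 * queue, and the worker then calls drbd_md_sync(), which clears MD_DIRTY and
 * deletes the timer again. */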
3376 
3377 
3378 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3379 {
3380 	int i;
3381 
3382 	for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3383 		mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3384 }
3385 
3386 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3387 {
3388 	if (idx == UI_CURRENT) {
3389 		if (mdev->state.role == R_PRIMARY)
3390 			val |= 1;
3391 		else
3392 			val &= ~((u64)1);
3393 
3394 		drbd_set_ed_uuid(mdev, val);
3395 	}
3396 
3397 	mdev->ldev->md.uuid[idx] = val;
3398 	drbd_md_mark_dirty(mdev);
3399 }
3400 
3401 
3402 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3403 {
3404 	if (mdev->ldev->md.uuid[idx]) {
3405 		drbd_uuid_move_history(mdev);
3406 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3407 	}
3408 	_drbd_uuid_set(mdev, idx, val);
3409 }
3410 
3411 /**
3412  * drbd_uuid_new_current() - Creates a new current UUID
3413  * @mdev:	DRBD device.
3414  *
3415  * Creates a new current UUID, and rotates the old current UUID into
3416  * the bitmap slot. Causes an incremental resync upon next connect.
3417  */
3418 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3419 {
3420 	u64 val;
3421 
3422 	dev_info(DEV, "Creating new current UUID\n");
3423 	D_ASSERT(mdev->ldev->md.uuid[UI_BITMAP] == 0);
3424 	mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3425 
3426 	get_random_bytes(&val, sizeof(u64));
3427 	_drbd_uuid_set(mdev, UI_CURRENT, val);
3428 }
3429 
3430 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3431 {
3432 	if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3433 		return;
3434 
3435 	if (val == 0) {
3436 		drbd_uuid_move_history(mdev);
3437 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3438 		mdev->ldev->md.uuid[UI_BITMAP] = 0;
3439 	} else {
3440 		if (mdev->ldev->md.uuid[UI_BITMAP])
3441 			dev_warn(DEV, "bm UUID already set\n");
3442 
3443 		mdev->ldev->md.uuid[UI_BITMAP] = val;
3444 		mdev->ldev->md.uuid[UI_BITMAP] &= ~((u64)1);
3445 
3446 	}
3447 	drbd_md_mark_dirty(mdev);
3448 }
3449 
3450 /**
3451  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3452  * @mdev:	DRBD device.
3453  *
3454  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3455  */
3456 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3457 {
3458 	int rv = -EIO;
3459 
3460 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
3461 		drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3462 		drbd_md_sync(mdev);
3463 		drbd_bm_set_all(mdev);
3464 
3465 		rv = drbd_bm_write(mdev);
3466 
3467 		if (!rv) {
3468 			drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3469 			drbd_md_sync(mdev);
3470 		}
3471 
3472 		put_ldev(mdev);
3473 	}
3474 
3475 	return rv;
3476 }
3477 
3478 /**
3479  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3480  * @mdev:	DRBD device.
3481  *
3482  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3483  */
3484 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3485 {
3486 	int rv = -EIO;
3487 
3488 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
3489 		drbd_bm_clear_all(mdev);
3490 		rv = drbd_bm_write(mdev);
3491 		put_ldev(mdev);
3492 	}
3493 
3494 	return rv;
3495 }
3496 
3497 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3498 {
3499 	struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3500 	int rv;
3501 
3502 	D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3503 
3504 	drbd_bm_lock(mdev, work->why);
3505 	rv = work->io_fn(mdev);
3506 	drbd_bm_unlock(mdev);
3507 
3508 	clear_bit(BITMAP_IO, &mdev->flags);
3509 	wake_up(&mdev->misc_wait);
3510 
3511 	if (work->done)
3512 		work->done(mdev, rv);
3513 
3514 	clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3515 	work->why = NULL;
3516 
3517 	return 1;
3518 }
3519 
3520 /**
3521  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3522  * @mdev:	DRBD device.
3523  * @io_fn:	IO callback to be called when bitmap IO is possible
3524  * @done:	callback to be called after the bitmap IO was performed
3525  * @why:	Descriptive text of the reason for doing the IO
3526  *
3527  * While IO on the bitmap happens we freeze application IO, thus ensuring
3528  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
3529  * called from worker context. It MUST NOT be used while a previous such
3530  * work is still pending!
3531  */
3532 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3533 			  int (*io_fn)(struct drbd_conf *),
3534 			  void (*done)(struct drbd_conf *, int),
3535 			  char *why)
3536 {
3537 	D_ASSERT(current == mdev->worker.task);
3538 
3539 	D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3540 	D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3541 	D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3542 	if (mdev->bm_io_work.why)
3543 		dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3544 			why, mdev->bm_io_work.why);
3545 
3546 	mdev->bm_io_work.io_fn = io_fn;
3547 	mdev->bm_io_work.done = done;
3548 	mdev->bm_io_work.why = why;
3549 
3550 	set_bit(BITMAP_IO, &mdev->flags);
3551 	if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3552 		if (list_empty(&mdev->bm_io_work.w.list)) {
3553 			set_bit(BITMAP_IO_QUEUED, &mdev->flags);
3554 			drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
3555 		} else
3556 			dev_err(DEV, "FIXME avoided double queuing bm_io_work\n");
3557 	}
3558 }
3559 
3560 /**
3561  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3562  * @mdev:	DRBD device.
3563  * @io_fn:	IO callback to be called when bitmap IO is possible
3564  * @why:	Descriptive text of the reason for doing the IO
3565  *
3566  * Freezes application IO while the actual IO operation runs. This
3567  * function MAY NOT be called from worker context.
3568  */
3569 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why)
3570 {
3571 	int rv;
3572 
3573 	D_ASSERT(current != mdev->worker.task);
3574 
3575 	drbd_suspend_io(mdev);
3576 
3577 	drbd_bm_lock(mdev, why);
3578 	rv = io_fn(mdev);
3579 	drbd_bm_unlock(mdev);
3580 
3581 	drbd_resume_io(mdev);
3582 
3583 	return rv;
3584 }
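
/*
 * Synchronous counterpart to the sketch above (again only an illustrative
 * fragment; the why-string is made up): from non-worker context the same kind
 * of io_fn is simply passed to drbd_bitmap_io(), which suspends and resumes
 * application IO around it.
 */
#if 0
	if (drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
			   "example: clear_n_write"))
		dev_err(DEV, "example: clearing the bitmap failed\n");
#endif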
3585 
3586 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3587 {
3588 	if ((mdev->ldev->md.flags & flag) != flag) {
3589 		drbd_md_mark_dirty(mdev);
3590 		mdev->ldev->md.flags |= flag;
3591 	}
3592 }
3593 
3594 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3595 {
3596 	if ((mdev->ldev->md.flags & flag) != 0) {
3597 		drbd_md_mark_dirty(mdev);
3598 		mdev->ldev->md.flags &= ~flag;
3599 	}
3600 }
3601 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3602 {
3603 	return (bdev->md.flags & flag) != 0;
3604 }
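
/*
 * Typical pattern for the meta-data flag helpers (sketch; MDF_CRASHED_PRIMARY
 * is just one of the MDF_* flags as an example): setting or clearing only
 * updates the in-core copy and marks the meta-data dirty, an explicit
 * drbd_md_sync() is needed to make the change persistent.
 */
#if 0
	if (get_ldev(mdev)) {	/* the helpers need a reference on the backing dev */
		drbd_md_set_flag(mdev, MDF_CRASHED_PRIMARY);
		drbd_md_sync(mdev);

		if (drbd_md_test_flag(mdev->ldev, MDF_CRASHED_PRIMARY))
			dev_info(DEV, "example: flag is now set\n");
		put_ldev(mdev);
	}
#endif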
3605 
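/*
 * The timer fires in softirq context, where the meta-data write itself must
 * not be issued; the work is therefore pushed to the front of the worker's
 * queue, and w_md_sync() below performs the actual drbd_md_sync() in process
 * context.
 */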
3606 static void md_sync_timer_fn(unsigned long data)
3607 {
3608 	struct drbd_conf *mdev = (struct drbd_conf *) data;
3609 
3610 	drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
3611 }
3612 
3613 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3614 {
3615 	dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3616 	drbd_md_sync(mdev);
3617 
3618 	return 1;
3619 }
3620 
3621 #ifdef CONFIG_DRBD_FAULT_INJECTION
3622 /* Fault insertion support including random number generator shamelessly
3623  * stolen from kernel/rcutorture.c */
3624 struct fault_random_state {
3625 	unsigned long state;
3626 	unsigned long count;
3627 };
3628 
3629 #define FAULT_RANDOM_MULT 39916801  /* prime */
3630 #define FAULT_RANDOM_ADD	479001701 /* prime */
3631 #define FAULT_RANDOM_REFRESH 10000
3632 
3633 /*
3634  * Crude but fast random-number generator.  Uses a linear congruential
3635  * generator, with occasional help from get_random_bytes().
3636  */
3637 static unsigned long
3638 _drbd_fault_random(struct fault_random_state *rsp)
3639 {
3640 	long refresh;
3641 
3642 	if (!rsp->count--) {
3643 		get_random_bytes(&refresh, sizeof(refresh));
3644 		rsp->state += refresh;
3645 		rsp->count = FAULT_RANDOM_REFRESH;
3646 	}
3647 	rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3648 	return swahw32(rsp->state);
3649 }
3650 
3651 static char *
3652 _drbd_fault_str(unsigned int type) {
3653 	static char *_faults[] = {
3654 		[DRBD_FAULT_MD_WR] = "Meta-data write",
3655 		[DRBD_FAULT_MD_RD] = "Meta-data read",
3656 		[DRBD_FAULT_RS_WR] = "Resync write",
3657 		[DRBD_FAULT_RS_RD] = "Resync read",
3658 		[DRBD_FAULT_DT_WR] = "Data write",
3659 		[DRBD_FAULT_DT_RD] = "Data read",
3660 		[DRBD_FAULT_DT_RA] = "Data read ahead",
3661 		[DRBD_FAULT_BM_ALLOC] = "BM allocation",
3662 		[DRBD_FAULT_AL_EE] = "EE allocation"
3663 	};
3664 
3665 	return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3666 }
3667 
3668 unsigned int
3669 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3670 {
3671 	static struct fault_random_state rrs = {0, 0};
3672 
3673 	unsigned int ret = (
3674 		(fault_devs == 0 ||
3675 			((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3676 		(((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3677 
3678 	if (ret) {
3679 		fault_count++;
3680 
3681 		if (printk_ratelimit())
3682 			dev_warn(DEV, "***Simulating %s failure\n",
3683 				_drbd_fault_str(type));
3684 	}
3685 
3686 	return ret;
3687 }
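
/*
 * Call-site sketch (illustrative; call sites go through a wrapper defined in
 * drbd_int.h that filters by module parameters before falling through to
 * _drbd_insert_fault() -- the wrapper name used here is assumed): when a
 * fault of the requested type fires, the bio is completed with -EIO instead
 * of being submitted.
 */
#if 0
	if (drbd_insert_fault(mdev, DRBD_FAULT_MD_WR))	/* assumed wrapper name */
		bio_endio(bio, -EIO);
	else
		submit_bio(WRITE, bio);
#endif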
3688 #endif
3689 
3690 const char *drbd_buildtag(void)
3691 {
3692 	/* DRBD built from external sources carries a reference here to the
3693 	   git hash of the source code. */
3694 
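	/* "\0uilt-in": the string starts with a NUL byte, so it reads as
	 * empty until either the srcversion is formatted into it or the
	 * first byte is patched to 'b' below, turning it into the literal
	 * "built-in" for the non-module case. */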
3695 	static char buildtag[38] = "\0uilt-in";
3696 
3697 	if (buildtag[0] == 0) {
3698 #ifdef CONFIG_MODULES
3699 		if (THIS_MODULE != NULL)
3700 			sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3701 		else
3702 #endif
3703 			buildtag[0] = 'b';
3704 	}
3705 
3706 	return buildtag;
3707 }
3708 
3709 module_init(drbd_init)
3710 module_exit(drbd_cleanup)
3711 
3712 EXPORT_SYMBOL(drbd_conn_str);
3713 EXPORT_SYMBOL(drbd_role_str);
3714 EXPORT_SYMBOL(drbd_disk_str);
3715 EXPORT_SYMBOL(drbd_set_st_err_str);
3716