xref: /linux/drivers/block/drbd/drbd_main.c (revision cb299ba8b5ef2239429484072fea394cd7581bd7)
1 /*
2    drbd.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12 
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17 
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22 
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 
27  */
28 
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48 
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52 
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56 
57 #include "drbd_vli.h"
58 
59 struct after_state_chg_work {
60 	struct drbd_work w;
61 	union drbd_state os;
62 	union drbd_state ns;
63 	enum chg_state_flags flags;
64 	struct completion *done;
65 };
66 
67 static DEFINE_MUTEX(drbd_main_mutex);
68 int drbdd_init(struct drbd_thread *);
69 int drbd_worker(struct drbd_thread *);
70 int drbd_asender(struct drbd_thread *);
71 
72 int drbd_init(void);
73 static int drbd_open(struct block_device *bdev, fmode_t mode);
74 static int drbd_release(struct gendisk *gd, fmode_t mode);
75 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
76 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
77 			   union drbd_state ns, enum chg_state_flags flags);
78 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
79 static void md_sync_timer_fn(unsigned long data);
80 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
81 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused);
82 
83 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
84 	      "Lars Ellenberg <lars@linbit.com>");
85 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
86 MODULE_VERSION(REL_VERSION);
87 MODULE_LICENSE("GPL");
88 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
89 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
90 
91 #include <linux/moduleparam.h>
92 /* allow_open_on_secondary */
93 MODULE_PARM_DESC(allow_oos, "DONT USE!");
94 /* thanks to these macros, if compiled into the kernel (not as a module),
95  * this becomes the boot parameter drbd.minor_count */
96 module_param(minor_count, uint, 0444);
97 module_param(disable_sendpage, bool, 0644);
98 module_param(allow_oos, bool, 0);
99 module_param(cn_idx, uint, 0444);
100 module_param(proc_details, int, 0644);
101 
102 #ifdef CONFIG_DRBD_FAULT_INJECTION
103 int enable_faults;
104 int fault_rate;
105 static int fault_count;
106 int fault_devs;
107 /* bitmap of enabled faults */
108 module_param(enable_faults, int, 0664);
109 /* fault rate % value - applies to all enabled faults */
110 module_param(fault_rate, int, 0664);
111 /* count of faults inserted */
112 module_param(fault_count, int, 0664);
113 /* bitmap of devices to insert faults on */
114 module_param(fault_devs, int, 0644);
115 #endif
116 
117 /* the actual module parameter variables; registered via module_param() above */
118 unsigned int minor_count = 32;
119 int disable_sendpage;
120 int allow_oos;
121 unsigned int cn_idx = CN_IDX_DRBD;
122 int proc_details;       /* Detail level in /proc/drbd */
123 
124 /* Module parameter for setting the user mode helper program
125  * to run. Default is /sbin/drbdadm */
126 char usermode_helper[80] = "/sbin/drbdadm";
127 
128 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
129 
130 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
131  * as member "struct gendisk *vdisk;"
132  */
133 struct drbd_conf **minor_table;
134 
135 struct kmem_cache *drbd_request_cache;
136 struct kmem_cache *drbd_ee_cache;	/* epoch entries */
137 struct kmem_cache *drbd_bm_ext_cache;	/* bitmap extents */
138 struct kmem_cache *drbd_al_ext_cache;	/* activity log extents */
139 mempool_t *drbd_request_mempool;
140 mempool_t *drbd_ee_mempool;
141 
142 /* I do not use a standard mempool, because:
143    1) I want to hand out the pre-allocated objects first.
144    2) I want to be able to interrupt sleeping allocation with a signal.
145    Note: This is a singly linked list; the next pointer is stored in the
146 	 private member of struct page.
147  */
148 struct page *drbd_pp_pool;
149 spinlock_t   drbd_pp_lock;
150 int          drbd_pp_vacant;
151 wait_queue_head_t drbd_pp_wait;
152 
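/* allow at most 5 rate-limited messages per 5*HZ jiffies */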
153 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
154 
155 static const struct block_device_operations drbd_ops = {
156 	.owner =   THIS_MODULE,
157 	.open =    drbd_open,
158 	.release = drbd_release,
159 };
160 
161 #define ARRY_SIZE(A) (sizeof(A)/sizeof(A[0]))
162 
163 #ifdef __CHECKER__
164 /* When checking with sparse, and this is an inline function, sparse will
165    give tons of false positives. When this is a real function, sparse works.
166  */
167 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
168 {
169 	int io_allowed;
170 
171 	atomic_inc(&mdev->local_cnt);
172 	io_allowed = (mdev->state.disk >= mins);
173 	if (!io_allowed) {
174 		if (atomic_dec_and_test(&mdev->local_cnt))
175 			wake_up(&mdev->misc_wait);
176 	}
177 	return io_allowed;
178 }
179 
180 #endif
181 
182 /**
183  * DOC: The transfer log
184  *
185  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
186  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
187  * of the list. There is always at least one &struct drbd_tl_epoch object.
188  *
189  * Each &struct drbd_tl_epoch has a circular, doubly linked list of requests
190  * attached.
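 *
 * Rough sketch of that layout (illustration only):
 *
 *   oldest_tle -> [epoch] -> [epoch] -> [epoch] <- newest_tle
 *                    |          |          |
 *                requests   requests   requests   (circular lists)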
191  */
192 static int tl_init(struct drbd_conf *mdev)
193 {
194 	struct drbd_tl_epoch *b;
195 
196 	/* during device minor initialization, we may well use GFP_KERNEL */
197 	b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
198 	if (!b)
199 		return 0;
200 	INIT_LIST_HEAD(&b->requests);
201 	INIT_LIST_HEAD(&b->w.list);
202 	b->next = NULL;
203 	b->br_number = 4711;
204 	b->n_writes = 0;
205 	b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
206 
207 	mdev->oldest_tle = b;
208 	mdev->newest_tle = b;
209 	INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
210 
211 	mdev->tl_hash = NULL;
212 	mdev->tl_hash_s = 0;
213 
214 	return 1;
215 }
216 
217 static void tl_cleanup(struct drbd_conf *mdev)
218 {
219 	D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
220 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
221 	kfree(mdev->oldest_tle);
222 	mdev->oldest_tle = NULL;
223 	kfree(mdev->unused_spare_tle);
224 	mdev->unused_spare_tle = NULL;
225 	kfree(mdev->tl_hash);
226 	mdev->tl_hash = NULL;
227 	mdev->tl_hash_s = 0;
228 }
229 
230 /**
231  * _tl_add_barrier() - Adds a barrier to the transfer log
232  * @mdev:	DRBD device.
233  * @new:	Barrier to be added before the current head of the TL.
234  *
235  * The caller must hold the req_lock.
236  */
237 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
238 {
239 	struct drbd_tl_epoch *newest_before;
240 
241 	INIT_LIST_HEAD(&new->requests);
242 	INIT_LIST_HEAD(&new->w.list);
243 	new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
244 	new->next = NULL;
245 	new->n_writes = 0;
246 
247 	newest_before = mdev->newest_tle;
248 	/* never send a barrier number == 0, because that is special-cased
249 	 * when using TCQ for our write ordering code */
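	/* gcc's "a ?: b" evaluates to a when a is non-zero, and to b otherwise;
	 * this maps a wrapped-around barrier number of 0 to 1 */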
250 	new->br_number = (newest_before->br_number+1) ?: 1;
251 	if (mdev->newest_tle != new) {
252 		mdev->newest_tle->next = new;
253 		mdev->newest_tle = new;
254 	}
255 }
256 
257 /**
258  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
259  * @mdev:	DRBD device.
260  * @barrier_nr:	Expected identifier of the DRBD write barrier packet.
261  * @set_size:	Expected number of requests before that barrier.
262  *
263  * In case the passed barrier_nr or set_size does not match the oldest
264  * &struct drbd_tl_epoch objects this function will cause a termination
265  * of the connection.
266  */
267 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
268 		       unsigned int set_size)
269 {
270 	struct drbd_tl_epoch *b, *nob; /* next old barrier */
271 	struct list_head *le, *tle;
272 	struct drbd_request *r;
273 
274 	spin_lock_irq(&mdev->req_lock);
275 
276 	b = mdev->oldest_tle;
277 
278 	/* first some paranoia code */
279 	if (b == NULL) {
280 		dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
281 			barrier_nr);
282 		goto bail;
283 	}
284 	if (b->br_number != barrier_nr) {
285 		dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
286 			barrier_nr, b->br_number);
287 		goto bail;
288 	}
289 	if (b->n_writes != set_size) {
290 		dev_err(DEV, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
291 			barrier_nr, set_size, b->n_writes);
292 		goto bail;
293 	}
294 
295 	/* Clean up list of requests processed during current epoch */
296 	list_for_each_safe(le, tle, &b->requests) {
297 		r = list_entry(le, struct drbd_request, tl_requests);
298 		_req_mod(r, barrier_acked);
299 	}
300 	/* There could be requests on the list waiting for completion
301 	   of the write to the local disk. To avoid corruptions of
302 	   slab's data structures we have to remove the list's head.
303 
304 	   Also there could have been a barrier ack out of sequence, overtaking
305 	   the write acks - which would be a bug and violate write ordering.
306 	   To not deadlock in case we lose connection while such requests are
307 	   still pending, we need some way to find them for the
308 	   _req_mod(connection_lost_while_pending).
309 
310 	   These have been list_move'd to the out_of_sequence_requests list in
311 	   _req_mod(, barrier_acked) above.
312 	   */
313 	list_del_init(&b->requests);
314 
315 	nob = b->next;
316 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
317 		_tl_add_barrier(mdev, b);
318 		if (nob)
319 			mdev->oldest_tle = nob;
320 		/* if nob == NULL, b was the only barrier and becomes the new
321 		   barrier; mdev->oldest_tle already points to b */
322 	} else {
323 		D_ASSERT(nob != NULL);
324 		mdev->oldest_tle = nob;
325 		kfree(b);
326 	}
327 
328 	spin_unlock_irq(&mdev->req_lock);
329 	dec_ap_pending(mdev);
330 
331 	return;
332 
333 bail:
334 	spin_unlock_irq(&mdev->req_lock);
335 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
336 }
337 
338 /**
339  * _tl_restart() - Walks the transfer log, and applies an action to all requests
340  * @mdev:	DRBD device.
341  * @what:       The action/event to perform with all request objects
342  *
343  * @what might be one of connection_lost_while_pending, resend, fail_frozen_disk_io,
344  * restart_frozen_disk_io.
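 *
 * Epochs that still contain writes are kept (and, in the resend case, queued
 * again as barriers); epochs without writes are freed, except for the newest
 * one, which is recycled and re-initialized in place.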
345  */
346 static void _tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
347 {
348 	struct drbd_tl_epoch *b, *tmp, **pn;
349 	struct list_head *le, *tle, carry_reads;
350 	struct drbd_request *req;
351 	int rv, n_writes, n_reads;
352 
353 	b = mdev->oldest_tle;
354 	pn = &mdev->oldest_tle;
355 	while (b) {
356 		n_writes = 0;
357 		n_reads = 0;
358 		INIT_LIST_HEAD(&carry_reads);
359 		list_for_each_safe(le, tle, &b->requests) {
360 			req = list_entry(le, struct drbd_request, tl_requests);
361 			rv = _req_mod(req, what);
362 
363 			n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
364 			n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
365 		}
366 		tmp = b->next;
367 
368 		if (n_writes) {
369 			if (what == resend) {
370 				b->n_writes = n_writes;
371 				if (b->w.cb == NULL) {
372 					b->w.cb = w_send_barrier;
373 					inc_ap_pending(mdev);
374 					set_bit(CREATE_BARRIER, &mdev->flags);
375 				}
376 
377 				drbd_queue_work(&mdev->data.work, &b->w);
378 			}
379 			pn = &b->next;
380 		} else {
381 			if (n_reads)
382 				list_add(&carry_reads, &b->requests);
383 			/* there could still be requests on that ring list,
384 			 * in case local io is still pending */
385 			list_del(&b->requests);
386 
387 			/* dec_ap_pending corresponding to queue_barrier.
388 			 * the newest barrier may not have been queued yet,
389 			 * in which case w.cb is still NULL. */
390 			if (b->w.cb != NULL)
391 				dec_ap_pending(mdev);
392 
393 			if (b == mdev->newest_tle) {
394 				/* recycle, but reinit! */
395 				D_ASSERT(tmp == NULL);
396 				INIT_LIST_HEAD(&b->requests);
397 				list_splice(&carry_reads, &b->requests);
398 				INIT_LIST_HEAD(&b->w.list);
399 				b->w.cb = NULL;
400 				b->br_number = net_random();
401 				b->n_writes = 0;
402 
403 				*pn = b;
404 				break;
405 			}
406 			*pn = tmp;
407 			kfree(b);
408 		}
409 		b = tmp;
410 		list_splice(&carry_reads, &b->requests);
411 	}
412 }
413 
414 
415 /**
416  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
417  * @mdev:	DRBD device.
418  *
419  * This is called after the connection to the peer was lost. The storage covered
420  * by the requests on the transfer log gets marked as out of sync. Called from the
421  * receiver thread and the worker thread.
422  */
423 void tl_clear(struct drbd_conf *mdev)
424 {
425 	struct list_head *le, *tle;
426 	struct drbd_request *r;
427 
428 	spin_lock_irq(&mdev->req_lock);
429 
430 	_tl_restart(mdev, connection_lost_while_pending);
431 
432 	/* we expect this list to be empty. */
433 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
434 
435 	/* but just in case, clean it up anyways! */
436 	list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
437 		r = list_entry(le, struct drbd_request, tl_requests);
438 		/* It would be nice to complete outside of spinlock.
439 		 * But this is easier for now. */
440 		_req_mod(r, connection_lost_while_pending);
441 	}
442 
443 	/* ensure bit indicating barrier is required is clear */
444 	clear_bit(CREATE_BARRIER, &mdev->flags);
445 
446 	memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
447 
448 	spin_unlock_irq(&mdev->req_lock);
449 }
450 
451 void tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
452 {
453 	spin_lock_irq(&mdev->req_lock);
454 	_tl_restart(mdev, what);
455 	spin_unlock_irq(&mdev->req_lock);
456 }
457 
458 /**
459  * cl_wide_st_chg() - TRUE if the state change is a cluster wide one
460  * @mdev:	DRBD device.
461  * @os:		old (current) state.
462  * @ns:		new (wanted) state.
463  */
464 static int cl_wide_st_chg(struct drbd_conf *mdev,
465 			  union drbd_state os, union drbd_state ns)
466 {
467 	return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
468 		 ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
469 		  (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
470 		  (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
471 		  (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
472 		(os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
473 		(os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
474 }
475 
476 int drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
477 		      union drbd_state mask, union drbd_state val)
478 {
479 	unsigned long flags;
480 	union drbd_state os, ns;
481 	int rv;
482 
483 	spin_lock_irqsave(&mdev->req_lock, flags);
484 	os = mdev->state;
485 	ns.i = (os.i & ~mask.i) | val.i;
486 	rv = _drbd_set_state(mdev, ns, f, NULL);
487 	ns = mdev->state;
488 	spin_unlock_irqrestore(&mdev->req_lock, flags);
489 
490 	return rv;
491 }
492 
493 /**
494  * drbd_force_state() - Impose a change which happens outside our control on our state
495  * @mdev:	DRBD device.
496  * @mask:	mask of state bits to change.
497  * @val:	value of new state bits.
498  */
499 void drbd_force_state(struct drbd_conf *mdev,
500 	union drbd_state mask, union drbd_state val)
501 {
502 	drbd_change_state(mdev, CS_HARD, mask, val);
503 }
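
/*
 * Typical usage (as in tl_release() above):
 *	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
 * where the NS() helper builds the mask/val pair for a single state field.
 */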
504 
505 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns);
506 static int is_valid_state_transition(struct drbd_conf *,
507 				     union drbd_state, union drbd_state);
508 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
509 				       union drbd_state ns, const char **warn_sync_abort);
510 int drbd_send_state_req(struct drbd_conf *,
511 			union drbd_state, union drbd_state);
512 
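/* wait_event() condition used by drbd_req_state() below: returns 0 to keep
 * waiting, and SS_CW_NO_NEED, SS_CW_SUCCESS, SS_CW_FAILED_BY_PEER or a state
 * validation error once a decision has been reached. */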
513 static enum drbd_state_ret_codes _req_st_cond(struct drbd_conf *mdev,
514 				    union drbd_state mask, union drbd_state val)
515 {
516 	union drbd_state os, ns;
517 	unsigned long flags;
518 	int rv;
519 
520 	if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
521 		return SS_CW_SUCCESS;
522 
523 	if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
524 		return SS_CW_FAILED_BY_PEER;
525 
526 	rv = 0;
527 	spin_lock_irqsave(&mdev->req_lock, flags);
528 	os = mdev->state;
529 	ns.i = (os.i & ~mask.i) | val.i;
530 	ns = sanitize_state(mdev, os, ns, NULL);
531 
532 	if (!cl_wide_st_chg(mdev, os, ns))
533 		rv = SS_CW_NO_NEED;
534 	if (!rv) {
535 		rv = is_valid_state(mdev, ns);
536 		if (rv == SS_SUCCESS) {
537 			rv = is_valid_state_transition(mdev, ns, os);
538 			if (rv == SS_SUCCESS)
539 				rv = 0; /* cont waiting, otherwise fail. */
540 		}
541 	}
542 	spin_unlock_irqrestore(&mdev->req_lock, flags);
543 
544 	return rv;
545 }
546 
547 /**
548  * drbd_req_state() - Perform a possibly cluster-wide state change
549  * @mdev:	DRBD device.
550  * @mask:	mask of state bits to change.
551  * @val:	value of new state bits.
552  * @f:		flags
553  *
554  * Should not be called directly, use drbd_request_state() or
555  * _drbd_request_state().
556  */
557 static int drbd_req_state(struct drbd_conf *mdev,
558 			  union drbd_state mask, union drbd_state val,
559 			  enum chg_state_flags f)
560 {
561 	struct completion done;
562 	unsigned long flags;
563 	union drbd_state os, ns;
564 	int rv;
565 
566 	init_completion(&done);
567 
568 	if (f & CS_SERIALIZE)
569 		mutex_lock(&mdev->state_mutex);
570 
571 	spin_lock_irqsave(&mdev->req_lock, flags);
572 	os = mdev->state;
573 	ns.i = (os.i & ~mask.i) | val.i;
574 	ns = sanitize_state(mdev, os, ns, NULL);
575 
576 	if (cl_wide_st_chg(mdev, os, ns)) {
577 		rv = is_valid_state(mdev, ns);
578 		if (rv == SS_SUCCESS)
579 			rv = is_valid_state_transition(mdev, ns, os);
580 		spin_unlock_irqrestore(&mdev->req_lock, flags);
581 
582 		if (rv < SS_SUCCESS) {
583 			if (f & CS_VERBOSE)
584 				print_st_err(mdev, os, ns, rv);
585 			goto abort;
586 		}
587 
588 		drbd_state_lock(mdev);
589 		if (!drbd_send_state_req(mdev, mask, val)) {
590 			drbd_state_unlock(mdev);
591 			rv = SS_CW_FAILED_BY_PEER;
592 			if (f & CS_VERBOSE)
593 				print_st_err(mdev, os, ns, rv);
594 			goto abort;
595 		}
596 
597 		wait_event(mdev->state_wait,
598 			(rv = _req_st_cond(mdev, mask, val)));
599 
600 		if (rv < SS_SUCCESS) {
601 			drbd_state_unlock(mdev);
602 			if (f & CS_VERBOSE)
603 				print_st_err(mdev, os, ns, rv);
604 			goto abort;
605 		}
606 		spin_lock_irqsave(&mdev->req_lock, flags);
607 		os = mdev->state;
608 		ns.i = (os.i & ~mask.i) | val.i;
609 		rv = _drbd_set_state(mdev, ns, f, &done);
610 		drbd_state_unlock(mdev);
611 	} else {
612 		rv = _drbd_set_state(mdev, ns, f, &done);
613 	}
614 
615 	spin_unlock_irqrestore(&mdev->req_lock, flags);
616 
617 	if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
618 		D_ASSERT(current != mdev->worker.task);
619 		wait_for_completion(&done);
620 	}
621 
622 abort:
623 	if (f & CS_SERIALIZE)
624 		mutex_unlock(&mdev->state_mutex);
625 
626 	return rv;
627 }
628 
629 /**
630  * _drbd_request_state() - Request a state change (with flags)
631  * @mdev:	DRBD device.
632  * @mask:	mask of state bits to change.
633  * @val:	value of new state bits.
634  * @f:		flags
635  *
636  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
637  * flag, or when logging of failed state change requests is not desired.
638  */
639 int _drbd_request_state(struct drbd_conf *mdev,	union drbd_state mask,
640 			union drbd_state val,	enum chg_state_flags f)
641 {
642 	int rv;
643 
644 	wait_event(mdev->state_wait,
645 		   (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
646 
647 	return rv;
648 }
649 
650 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
651 {
652 	dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
653 	    name,
654 	    drbd_conn_str(ns.conn),
655 	    drbd_role_str(ns.role),
656 	    drbd_role_str(ns.peer),
657 	    drbd_disk_str(ns.disk),
658 	    drbd_disk_str(ns.pdsk),
659 	    is_susp(ns) ? 's' : 'r',
660 	    ns.aftr_isp ? 'a' : '-',
661 	    ns.peer_isp ? 'p' : '-',
662 	    ns.user_isp ? 'u' : '-'
663 	    );
664 }
665 
666 void print_st_err(struct drbd_conf *mdev,
667 	union drbd_state os, union drbd_state ns, int err)
668 {
669 	if (err == SS_IN_TRANSIENT_STATE)
670 		return;
671 	dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
672 	print_st(mdev, " state", os);
673 	print_st(mdev, "wanted", ns);
674 }
675 
676 
677 #define drbd_peer_str drbd_role_str
678 #define drbd_pdsk_str drbd_disk_str
679 
680 #define drbd_susp_str(A)     ((A) ? "1" : "0")
681 #define drbd_aftr_isp_str(A) ((A) ? "1" : "0")
682 #define drbd_peer_isp_str(A) ((A) ? "1" : "0")
683 #define drbd_user_isp_str(A) ((A) ? "1" : "0")
684 
685 #define PSC(A) \
686 	({ if (ns.A != os.A) { \
687 		pbp += sprintf(pbp, #A "( %s -> %s ) ", \
688 			      drbd_##A##_str(os.A), \
689 			      drbd_##A##_str(ns.A)); \
690 	} })
691 
692 /**
693  * is_valid_state() - Returns an SS_ error code if ns is not valid
694  * @mdev:	DRBD device.
695  * @ns:		State to consider.
696  */
697 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
698 {
699 	/* See drbd_state_sw_errors in drbd_strings.c */
700 
701 	enum drbd_fencing_p fp;
702 	int rv = SS_SUCCESS;
703 
704 	fp = FP_DONT_CARE;
705 	if (get_ldev(mdev)) {
706 		fp = mdev->ldev->dc.fencing;
707 		put_ldev(mdev);
708 	}
709 
710 	if (get_net_conf(mdev)) {
711 		if (!mdev->net_conf->two_primaries &&
712 		    ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
713 			rv = SS_TWO_PRIMARIES;
714 		put_net_conf(mdev);
715 	}
716 
717 	if (rv <= 0)
718 		/* already found a reason to abort */;
719 	else if (ns.role == R_SECONDARY && mdev->open_cnt)
720 		rv = SS_DEVICE_IN_USE;
721 
722 	else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
723 		rv = SS_NO_UP_TO_DATE_DISK;
724 
725 	else if (fp >= FP_RESOURCE &&
726 		 ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
727 		rv = SS_PRIMARY_NOP;
728 
729 	else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
730 		rv = SS_NO_UP_TO_DATE_DISK;
731 
732 	else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
733 		rv = SS_NO_LOCAL_DISK;
734 
735 	else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
736 		rv = SS_NO_REMOTE_DISK;
737 
738 	else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
739 		rv = SS_NO_UP_TO_DATE_DISK;
740 
741 	else if ((ns.conn == C_CONNECTED ||
742 		  ns.conn == C_WF_BITMAP_S ||
743 		  ns.conn == C_SYNC_SOURCE ||
744 		  ns.conn == C_PAUSED_SYNC_S) &&
745 		  ns.disk == D_OUTDATED)
746 		rv = SS_CONNECTED_OUTDATES;
747 
748 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
749 		 (mdev->sync_conf.verify_alg[0] == 0))
750 		rv = SS_NO_VERIFY_ALG;
751 
752 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
753 		  mdev->agreed_pro_version < 88)
754 		rv = SS_NOT_SUPPORTED;
755 
756 	return rv;
757 }
758 
759 /**
760  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
761  * @mdev:	DRBD device.
762  * @ns:		new state.
763  * @os:		old state.
764  */
765 static int is_valid_state_transition(struct drbd_conf *mdev,
766 				     union drbd_state ns, union drbd_state os)
767 {
768 	int rv = SS_SUCCESS;
769 
770 	if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
771 	    os.conn > C_CONNECTED)
772 		rv = SS_RESYNC_RUNNING;
773 
774 	if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
775 		rv = SS_ALREADY_STANDALONE;
776 
777 	if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
778 		rv = SS_IS_DISKLESS;
779 
780 	if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
781 		rv = SS_NO_NET_CONFIG;
782 
783 	if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
784 		rv = SS_LOWER_THAN_OUTDATED;
785 
786 	if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
787 		rv = SS_IN_TRANSIENT_STATE;
788 
789 	if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
790 		rv = SS_IN_TRANSIENT_STATE;
791 
792 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
793 		rv = SS_NEED_CONNECTION;
794 
795 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
796 	    ns.conn != os.conn && os.conn > C_CONNECTED)
797 		rv = SS_RESYNC_RUNNING;
798 
799 	if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
800 	    os.conn < C_CONNECTED)
801 		rv = SS_NEED_CONNECTION;
802 
803 	return rv;
804 }
805 
806 /**
807  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
808  * @mdev:	DRBD device.
809  * @os:		old state.
810  * @ns:		new state.
811  * @warn_sync_abort:
812  *
813  * When we lose the connection, we have to set the state of the peer's disk (pdsk)
814  * to D_UNKNOWN. This rule and many more along those lines are in this function.
815  */
816 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
817 				       union drbd_state ns, const char **warn_sync_abort)
818 {
819 	enum drbd_fencing_p fp;
820 
821 	fp = FP_DONT_CARE;
822 	if (get_ldev(mdev)) {
823 		fp = mdev->ldev->dc.fencing;
824 		put_ldev(mdev);
825 	}
826 
827 	/* A network error must not (re)configure a device whose network part is unconfigured or already disconnecting */
828 	if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
829 	    os.conn <= C_DISCONNECTING)
830 		ns.conn = os.conn;
831 
832 	/* After a network error (including C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow.
833 	 * If you try to go into some Sync* state, that shall fail (elsewhere). */
834 	if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
835 	    ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING && ns.conn <= C_TEAR_DOWN)
836 		ns.conn = os.conn;
837 
838 	/* After C_DISCONNECTING only C_STANDALONE may follow */
839 	if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
840 		ns.conn = os.conn;
841 
842 	if (ns.conn < C_CONNECTED) {
843 		ns.peer_isp = 0;
844 		ns.peer = R_UNKNOWN;
845 		if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
846 			ns.pdsk = D_UNKNOWN;
847 	}
848 
849 	/* Clear the aftr_isp when becoming unconfigured */
850 	if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
851 		ns.aftr_isp = 0;
852 
853 	/* Abort resync if a disk fails/detaches */
854 	if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
855 	    (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
856 		if (warn_sync_abort)
857 			*warn_sync_abort =
858 				os.conn == C_VERIFY_S || os.conn == C_VERIFY_T ?
859 				"Online-verify" : "Resync";
860 		ns.conn = C_CONNECTED;
861 	}
862 
863 	if (ns.conn >= C_CONNECTED &&
864 	    ((ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED) ||
865 	     (ns.disk == D_NEGOTIATING && ns.conn == C_WF_BITMAP_T))) {
866 		switch (ns.conn) {
867 		case C_WF_BITMAP_T:
868 		case C_PAUSED_SYNC_T:
869 			ns.disk = D_OUTDATED;
870 			break;
871 		case C_CONNECTED:
872 		case C_WF_BITMAP_S:
873 		case C_SYNC_SOURCE:
874 		case C_PAUSED_SYNC_S:
875 			ns.disk = D_UP_TO_DATE;
876 			break;
877 		case C_SYNC_TARGET:
878 			ns.disk = D_INCONSISTENT;
879 			dev_warn(DEV, "Implicitly set disk state Inconsistent!\n");
880 			break;
881 		}
882 		if (os.disk == D_OUTDATED && ns.disk == D_UP_TO_DATE)
883 			dev_warn(DEV, "Implicitly set disk from Outdated to UpToDate\n");
884 	}
885 
886 	if (ns.conn >= C_CONNECTED &&
887 	    (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)) {
888 		switch (ns.conn) {
889 		case C_CONNECTED:
890 		case C_WF_BITMAP_T:
891 		case C_PAUSED_SYNC_T:
892 		case C_SYNC_TARGET:
893 			ns.pdsk = D_UP_TO_DATE;
894 			break;
895 		case C_WF_BITMAP_S:
896 		case C_PAUSED_SYNC_S:
897 			/* remap any consistent state to D_OUTDATED,
898 			 * but disallow "upgrade" of not even consistent states.
899 			 */
900 			ns.pdsk =
901 				(D_DISKLESS < os.pdsk && os.pdsk < D_OUTDATED)
902 				? os.pdsk : D_OUTDATED;
903 			break;
904 		case C_SYNC_SOURCE:
905 			ns.pdsk = D_INCONSISTENT;
906 			dev_warn(DEV, "Implicitly set pdsk Inconsistent!\n");
907 			break;
908 		}
909 		if (os.pdsk == D_OUTDATED && ns.pdsk == D_UP_TO_DATE)
910 			dev_warn(DEV, "Implicitly set pdsk from Outdated to UpToDate\n");
911 	}
912 
913 	/* Connection breaks down before we finished "Negotiating" */
914 	if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
915 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
916 		if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
917 			ns.disk = mdev->new_state_tmp.disk;
918 			ns.pdsk = mdev->new_state_tmp.pdsk;
919 		} else {
920 			dev_alert(DEV, "Connection lost while negotiating, no data!\n");
921 			ns.disk = D_DISKLESS;
922 			ns.pdsk = D_UNKNOWN;
923 		}
924 		put_ldev(mdev);
925 	}
926 
927 	if (fp == FP_STONITH &&
928 	    (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
929 	    !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
930 		ns.susp_fen = 1; /* Suspend IO while fence-peer handler runs (peer lost) */
931 
932 	if (mdev->sync_conf.on_no_data == OND_SUSPEND_IO &&
933 	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE) &&
934 	    !(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE))
935 		ns.susp_nod = 1; /* Suspend IO while no up-to-date data is accessible (neither local nor on the peer) */
936 
937 	if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
938 		if (ns.conn == C_SYNC_SOURCE)
939 			ns.conn = C_PAUSED_SYNC_S;
940 		if (ns.conn == C_SYNC_TARGET)
941 			ns.conn = C_PAUSED_SYNC_T;
942 	} else {
943 		if (ns.conn == C_PAUSED_SYNC_S)
944 			ns.conn = C_SYNC_SOURCE;
945 		if (ns.conn == C_PAUSED_SYNC_T)
946 			ns.conn = C_SYNC_TARGET;
947 	}
948 
949 	return ns;
950 }
951 
952 /* helper for __drbd_set_state */
953 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
954 {
955 	if (cs == C_VERIFY_T) {
956 		/* starting online verify from an arbitrary position
957 		 * does not fit well into the existing protocol.
958 		 * on C_VERIFY_T, we initialize ov_left and friends
959 		 * implicitly in receive_DataRequest once the
960 		 * first P_OV_REQUEST is received */
961 		mdev->ov_start_sector = ~(sector_t)0;
962 	} else {
963 		unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
964 		if (bit >= mdev->rs_total)
965 			mdev->ov_start_sector =
966 				BM_BIT_TO_SECT(mdev->rs_total - 1);
967 		mdev->ov_position = mdev->ov_start_sector;
968 	}
969 }
970 
971 static void drbd_resume_al(struct drbd_conf *mdev)
972 {
973 	if (test_and_clear_bit(AL_SUSPENDED, &mdev->flags))
974 		dev_info(DEV, "Resumed AL updates\n");
975 }
976 
977 /**
978  * __drbd_set_state() - Set a new DRBD state
979  * @mdev:	DRBD device.
980  * @ns:		new state.
981  * @flags:	Flags
982  * @done:	Optional completion; it is completed after after_state_ch() has finished
983  *
984  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
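 * Roughly, the usual call chain is drbd_request_state() -> drbd_req_state()
 * -> _drbd_set_state() -> __drbd_set_state(), with req_lock taken in
 * drbd_req_state() and global_state_lock in the _drbd_set_state() wrapper.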
985  */
986 int __drbd_set_state(struct drbd_conf *mdev,
987 		    union drbd_state ns, enum chg_state_flags flags,
988 		    struct completion *done)
989 {
990 	union drbd_state os;
991 	int rv = SS_SUCCESS;
992 	const char *warn_sync_abort = NULL;
993 	struct after_state_chg_work *ascw;
994 
995 	os = mdev->state;
996 
997 	ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
998 
999 	if (ns.i == os.i)
1000 		return SS_NOTHING_TO_DO;
1001 
1002 	if (!(flags & CS_HARD)) {
1003 		/* pre-state-change checks; only look at ns */
1004 		/* See drbd_state_sw_errors in drbd_strings.c */
1005 
1006 		rv = is_valid_state(mdev, ns);
1007 		if (rv < SS_SUCCESS) {
1008 			/* If the old state was illegal as well, then let
1009 			   this happen...*/
1010 
1011 			if (is_valid_state(mdev, os) == rv)
1012 				rv = is_valid_state_transition(mdev, ns, os);
1013 		} else
1014 			rv = is_valid_state_transition(mdev, ns, os);
1015 	}
1016 
1017 	if (rv < SS_SUCCESS) {
1018 		if (flags & CS_VERBOSE)
1019 			print_st_err(mdev, os, ns, rv);
1020 		return rv;
1021 	}
1022 
1023 	if (warn_sync_abort)
1024 		dev_warn(DEV, "%s aborted.\n", warn_sync_abort);
1025 
1026 	{
1027 		char *pbp, pb[300];
1028 		pbp = pb;
1029 		*pbp = 0;
1030 		PSC(role);
1031 		PSC(peer);
1032 		PSC(conn);
1033 		PSC(disk);
1034 		PSC(pdsk);
1035 		if (is_susp(ns) != is_susp(os))
1036 			pbp += sprintf(pbp, "susp( %s -> %s ) ",
1037 				       drbd_susp_str(is_susp(os)),
1038 				       drbd_susp_str(is_susp(ns)));
1039 		PSC(aftr_isp);
1040 		PSC(peer_isp);
1041 		PSC(user_isp);
1042 		dev_info(DEV, "%s\n", pb);
1043 	}
1044 
1045 	/* solve the race between becoming unconfigured,
1046 	 * worker doing the cleanup, and
1047 	 * admin reconfiguring us:
1048 	 * on (re)configure, first set CONFIG_PENDING,
1049 	 * then wait for a potentially exiting worker,
1050 	 * start the worker, and schedule one no_op.
1051 	 * then proceed with configuration.
1052 	 */
1053 	if (ns.disk == D_DISKLESS &&
1054 	    ns.conn == C_STANDALONE &&
1055 	    ns.role == R_SECONDARY &&
1056 	    !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
1057 		set_bit(DEVICE_DYING, &mdev->flags);
1058 
1059 	mdev->state.i = ns.i;
1060 	wake_up(&mdev->misc_wait);
1061 	wake_up(&mdev->state_wait);
1062 
1063 	/* aborted verify run. log the last position */
1064 	if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1065 	    ns.conn < C_CONNECTED) {
1066 		mdev->ov_start_sector =
1067 			BM_BIT_TO_SECT(mdev->rs_total - mdev->ov_left);
1068 		dev_info(DEV, "Online Verify reached sector %llu\n",
1069 			(unsigned long long)mdev->ov_start_sector);
1070 	}
1071 
1072 	if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1073 	    (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1074 		dev_info(DEV, "Syncer continues.\n");
1075 		mdev->rs_paused += (long)jiffies
1076 				  -(long)mdev->rs_mark_time[mdev->rs_last_mark];
1077 		if (ns.conn == C_SYNC_TARGET)
1078 			mod_timer(&mdev->resync_timer, jiffies);
1079 	}
1080 
1081 	if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1082 	    (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1083 		dev_info(DEV, "Resync suspended\n");
1084 		mdev->rs_mark_time[mdev->rs_last_mark] = jiffies;
1085 	}
1086 
1087 	if (os.conn == C_CONNECTED &&
1088 	    (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1089 		unsigned long now = jiffies;
1090 		int i;
1091 
1092 		mdev->ov_position = 0;
1093 		mdev->rs_total = drbd_bm_bits(mdev);
1094 		if (mdev->agreed_pro_version >= 90)
1095 			set_ov_position(mdev, ns.conn);
1096 		else
1097 			mdev->ov_start_sector = 0;
1098 		mdev->ov_left = mdev->rs_total
1099 			      - BM_SECT_TO_BIT(mdev->ov_position);
1100 		mdev->rs_start = now;
1101 		mdev->rs_last_events = 0;
1102 		mdev->rs_last_sect_ev = 0;
1103 		mdev->ov_last_oos_size = 0;
1104 		mdev->ov_last_oos_start = 0;
1105 
1106 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1107 			mdev->rs_mark_left[i] = mdev->rs_total;
1108 			mdev->rs_mark_time[i] = now;
1109 		}
1110 
1111 		if (ns.conn == C_VERIFY_S) {
1112 			dev_info(DEV, "Starting Online Verify from sector %llu\n",
1113 					(unsigned long long)mdev->ov_position);
1114 			mod_timer(&mdev->resync_timer, jiffies);
1115 		}
1116 	}
1117 
1118 	if (get_ldev(mdev)) {
1119 		u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1120 						 MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1121 						 MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1122 
1123 		if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1124 			mdf |= MDF_CRASHED_PRIMARY;
1125 		if (mdev->state.role == R_PRIMARY ||
1126 		    (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1127 			mdf |= MDF_PRIMARY_IND;
1128 		if (mdev->state.conn > C_WF_REPORT_PARAMS)
1129 			mdf |= MDF_CONNECTED_IND;
1130 		if (mdev->state.disk > D_INCONSISTENT)
1131 			mdf |= MDF_CONSISTENT;
1132 		if (mdev->state.disk > D_OUTDATED)
1133 			mdf |= MDF_WAS_UP_TO_DATE;
1134 		if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1135 			mdf |= MDF_PEER_OUT_DATED;
1136 		if (mdf != mdev->ldev->md.flags) {
1137 			mdev->ldev->md.flags = mdf;
1138 			drbd_md_mark_dirty(mdev);
1139 		}
1140 		if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1141 			drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1142 		put_ldev(mdev);
1143 	}
1144 
1145 	/* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider resyncing */
1146 	if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1147 	    os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1148 		set_bit(CONSIDER_RESYNC, &mdev->flags);
1149 
1150 	/* Receiver should clean up itself */
1151 	if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1152 		drbd_thread_stop_nowait(&mdev->receiver);
1153 
1154 	/* Now the receiver finished cleaning up itself, it should die */
1155 	if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1156 		drbd_thread_stop_nowait(&mdev->receiver);
1157 
1158 	/* Upon network failure, we need to restart the receiver. */
1159 	if (os.conn > C_TEAR_DOWN &&
1160 	    ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1161 		drbd_thread_restart_nowait(&mdev->receiver);
1162 
1163 	/* Resume AL writing if we get a connection */
1164 	if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1165 		drbd_resume_al(mdev);
1166 
1167 	ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1168 	if (ascw) {
1169 		ascw->os = os;
1170 		ascw->ns = ns;
1171 		ascw->flags = flags;
1172 		ascw->w.cb = w_after_state_ch;
1173 		ascw->done = done;
1174 		drbd_queue_work(&mdev->data.work, &ascw->w);
1175 	} else {
1176 		dev_warn(DEV, "Could not kmalloc an ascw\n");
1177 	}
1178 
1179 	return rv;
1180 }
1181 
1182 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1183 {
1184 	struct after_state_chg_work *ascw =
1185 		container_of(w, struct after_state_chg_work, w);
1186 	after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1187 	if (ascw->flags & CS_WAIT_COMPLETE) {
1188 		D_ASSERT(ascw->done != NULL);
1189 		complete(ascw->done);
1190 	}
1191 	kfree(ascw);
1192 
1193 	return 1;
1194 }
1195 
1196 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1197 {
1198 	if (rv) {
1199 		dev_err(DEV, "Writing the bitmap failed, not starting resync.\n");
1200 		_drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1201 		return;
1202 	}
1203 
1204 	switch (mdev->state.conn) {
1205 	case C_STARTING_SYNC_T:
1206 		_drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1207 		break;
1208 	case C_STARTING_SYNC_S:
1209 		drbd_start_resync(mdev, C_SYNC_SOURCE);
1210 		break;
1211 	}
1212 }
1213 
1214 /**
1215  * after_state_ch() - Perform after state change actions that may sleep
1216  * @mdev:	DRBD device.
1217  * @os:		old state.
1218  * @ns:		new state.
1219  * @flags:	Flags
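 *
 * Roughly: informs user space (drbd_bcast_state) and the peer about the
 * change, runs helper scripts (pri-on-incon-degr, local-io-error), queues
 * bitmap I/O, starts or pauses/resumes resync, and starts/stops threads and
 * frees per-disk data structures as the transition from @os to @ns implies.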
1220  */
1221 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1222 			   union drbd_state ns, enum chg_state_flags flags)
1223 {
1224 	enum drbd_fencing_p fp;
1225 	enum drbd_req_event what = nothing;
1226 	union drbd_state nsm = (union drbd_state){ .i = -1 };
1227 
1228 	if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1229 		clear_bit(CRASHED_PRIMARY, &mdev->flags);
1230 		if (mdev->p_uuid)
1231 			mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1232 	}
1233 
1234 	fp = FP_DONT_CARE;
1235 	if (get_ldev(mdev)) {
1236 		fp = mdev->ldev->dc.fencing;
1237 		put_ldev(mdev);
1238 	}
1239 
1240 	/* Inform userspace about the change... */
1241 	drbd_bcast_state(mdev, ns);
1242 
1243 	if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1244 	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1245 		drbd_khelper(mdev, "pri-on-incon-degr");
1246 
1247 	/* Here we have the actions that are performed after a
1248 	   state change. This function might sleep */
1249 
1250 	nsm.i = -1;
1251 	if (ns.susp_nod) {
1252 		if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
1253 			if (ns.conn == C_CONNECTED)
1254 				what = resend, nsm.susp_nod = 0;
1255 			else /* ns.conn > C_CONNECTED */
1256 				dev_err(DEV, "Unexpected Resync going on!\n");
1257 		}
1258 
1259 		if (os.disk == D_ATTACHING && ns.disk > D_ATTACHING)
1260 			what = restart_frozen_disk_io, nsm.susp_nod = 0;
1261 
1262 	}
1263 
1264 	if (ns.susp_fen) {
1265 		/* case1: The outdate peer handler is successful: */
1266 		if (os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) {
1267 			tl_clear(mdev);
1268 			if (test_bit(NEW_CUR_UUID, &mdev->flags)) {
1269 				drbd_uuid_new_current(mdev);
1270 				clear_bit(NEW_CUR_UUID, &mdev->flags);
1271 				drbd_md_sync(mdev);
1272 			}
1273 			spin_lock_irq(&mdev->req_lock);
1274 			_drbd_set_state(_NS(mdev, susp_fen, 0), CS_VERBOSE, NULL);
1275 			spin_unlock_irq(&mdev->req_lock);
1276 		}
1277 		/* case2: The connection was established again: */
1278 		if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
1279 			clear_bit(NEW_CUR_UUID, &mdev->flags);
1280 			what = resend;
1281 			nsm.susp_fen = 0;
1282 		}
1283 	}
1284 
1285 	if (what != nothing) {
1286 		spin_lock_irq(&mdev->req_lock);
1287 		_tl_restart(mdev, what);
1288 		nsm.i &= mdev->state.i;
1289 		_drbd_set_state(mdev, nsm, CS_VERBOSE, NULL);
1290 		spin_unlock_irq(&mdev->req_lock);
1291 	}
1292 
1293 	/* Do not change the order of the if above and the two below... */
1294 	if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1295 		drbd_send_uuids(mdev);
1296 		drbd_send_state(mdev);
1297 	}
1298 	if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S)
1299 		drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL, "send_bitmap (WFBitMapS)");
1300 
1301 	/* Lost contact to peer's copy of the data */
1302 	if ((os.pdsk >= D_INCONSISTENT &&
1303 	     os.pdsk != D_UNKNOWN &&
1304 	     os.pdsk != D_OUTDATED)
1305 	&&  (ns.pdsk < D_INCONSISTENT ||
1306 	     ns.pdsk == D_UNKNOWN ||
1307 	     ns.pdsk == D_OUTDATED)) {
1308 		if (get_ldev(mdev)) {
1309 			if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1310 			    mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1311 				if (is_susp(mdev->state)) {
1312 					set_bit(NEW_CUR_UUID, &mdev->flags);
1313 				} else {
1314 					drbd_uuid_new_current(mdev);
1315 					drbd_send_uuids(mdev);
1316 				}
1317 			}
1318 			put_ldev(mdev);
1319 		}
1320 	}
1321 
1322 	if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1323 		if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0) {
1324 			drbd_uuid_new_current(mdev);
1325 			drbd_send_uuids(mdev);
1326 		}
1327 
1328 		/* D_DISKLESS Peer becomes secondary */
1329 		if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1330 			drbd_al_to_on_disk_bm(mdev);
1331 		put_ldev(mdev);
1332 	}
1333 
1334 	/* Last part of the attaching process ... */
1335 	if (ns.conn >= C_CONNECTED &&
1336 	    os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1337 		drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1338 		drbd_send_uuids(mdev);
1339 		drbd_send_state(mdev);
1340 	}
1341 
1342 	/* We want to pause/continue resync, tell peer. */
1343 	if (ns.conn >= C_CONNECTED &&
1344 	     ((os.aftr_isp != ns.aftr_isp) ||
1345 	      (os.user_isp != ns.user_isp)))
1346 		drbd_send_state(mdev);
1347 
1348 	/* In case one of the isp bits got set, suspend other devices. */
1349 	if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1350 	    (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1351 		suspend_other_sg(mdev);
1352 
1353 	/* Make sure the peer gets informed about eventual state
1354 	   changes (ISP bits) while we were in WFReportParams. */
1355 	if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1356 		drbd_send_state(mdev);
1357 
1358 	/* We are in the progress to start a full sync... */
1359 	if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1360 	    (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1361 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &abw_start_sync, "set_n_write from StartingSync");
1362 
1363 	/* We are invalidating ourselves... */
1364 	if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1365 	    os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1366 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
1367 
1368 	/* first half of local IO error */
1369 	if (os.disk > D_FAILED && ns.disk == D_FAILED) {
1370 		enum drbd_io_error_p eh = EP_PASS_ON;
1371 
1372 		if (drbd_send_state(mdev))
1373 			dev_warn(DEV, "Notified peer that my disk is broken.\n");
1374 		else
1375 			dev_err(DEV, "Sending state for drbd_io_error() failed\n");
1376 
1377 		drbd_rs_cancel_all(mdev);
1378 
1379 		if (get_ldev_if_state(mdev, D_FAILED)) {
1380 			eh = mdev->ldev->dc.on_io_error;
1381 			put_ldev(mdev);
1382 		}
1383 		if (eh == EP_CALL_HELPER)
1384 			drbd_khelper(mdev, "local-io-error");
1385 	}
1386 
1387 
1388 	/* second half of local IO error handling,
1389 	 * after local_cnt references have reached zero: */
1390 	if (os.disk == D_FAILED && ns.disk == D_DISKLESS) {
1391 		mdev->rs_total = 0;
1392 		mdev->rs_failed = 0;
1393 		atomic_set(&mdev->rs_pending_cnt, 0);
1394 	}
1395 
1396 	if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {
1397 		/* We must still be diskless,
1398 		 * re-attach has to be serialized with this! */
1399 		if (mdev->state.disk != D_DISKLESS)
1400 			dev_err(DEV,
1401 				"ASSERT FAILED: disk is %s while going diskless\n",
1402 				drbd_disk_str(mdev->state.disk));
1403 
1404 		/* we cannot assert local_cnt == 0 here, as get_ldev_if_state
1405 		 * will inc/dec it frequently. Since we became D_DISKLESS, no
1406 		 * one has touched the protected members anymore, though, so we
1407 		 * are safe to free them here. */
1408 		if (drbd_send_state(mdev))
1409 			dev_warn(DEV, "Notified peer that I detached my disk.\n");
1410 		else
1411 			dev_err(DEV, "Sending state for detach failed\n");
1412 
1413 		lc_destroy(mdev->resync);
1414 		mdev->resync = NULL;
1415 		lc_destroy(mdev->act_log);
1416 		mdev->act_log = NULL;
1417 		__no_warn(local,
1418 			drbd_free_bc(mdev->ldev);
1419 			mdev->ldev = NULL;);
1420 
1421 		if (mdev->md_io_tmpp) {
1422 			__free_page(mdev->md_io_tmpp);
1423 			mdev->md_io_tmpp = NULL;
1424 		}
1425 	}
1426 
1427 	/* Disks got bigger while they were detached */
1428 	if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1429 	    test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1430 		if (ns.conn == C_CONNECTED)
1431 			resync_after_online_grow(mdev);
1432 	}
1433 
1434 	/* A resync finished or aborted, wake paused devices... */
1435 	if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1436 	    (os.peer_isp && !ns.peer_isp) ||
1437 	    (os.user_isp && !ns.user_isp))
1438 		resume_next_sg(mdev);
1439 
1440 	/* sync target done with resync.  Explicitly notify peer, even though
1441 	 * it should (at least for non-empty resyncs) already know itself. */
1442 	if (os.disk < D_UP_TO_DATE && os.conn >= C_SYNC_SOURCE && ns.conn == C_CONNECTED)
1443 		drbd_send_state(mdev);
1444 
1445 	/* free tl_hash if we got thawed and are C_STANDALONE */
1446 	if (ns.conn == C_STANDALONE && !is_susp(ns) && mdev->tl_hash)
1447 		drbd_free_tl_hash(mdev);
1448 
1449 	/* Upon network connection, we need to start the receiver */
1450 	if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1451 		drbd_thread_start(&mdev->receiver);
1452 
1453 	/* Terminate worker thread if we are unconfigured - it will be
1454 	   restarted as needed... */
1455 	if (ns.disk == D_DISKLESS &&
1456 	    ns.conn == C_STANDALONE &&
1457 	    ns.role == R_SECONDARY) {
1458 		if (os.aftr_isp != ns.aftr_isp)
1459 			resume_next_sg(mdev);
1460 		/* set in __drbd_set_state, unless CONFIG_PENDING was set */
1461 		if (test_bit(DEVICE_DYING, &mdev->flags))
1462 			drbd_thread_stop_nowait(&mdev->worker);
1463 	}
1464 
1465 	drbd_md_sync(mdev);
1466 }
1467 
1468 
1469 static int drbd_thread_setup(void *arg)
1470 {
1471 	struct drbd_thread *thi = (struct drbd_thread *) arg;
1472 	struct drbd_conf *mdev = thi->mdev;
1473 	unsigned long flags;
1474 	int retval;
1475 
1476 restart:
1477 	retval = thi->function(thi);
1478 
1479 	spin_lock_irqsave(&thi->t_lock, flags);
1480 
1481 	/* if the receiver has been "Exiting", the last thing it did
1482 	 * was set the conn state to "StandAlone",
1483 	 * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
1484 	 * and receiver thread will be "started".
1485 	 * drbd_thread_start needs to set "Restarting" in that case.
1486 	 * t_state check and assignment needs to be within the same spinlock,
1487 	 * so either thread_start sees Exiting, and can remap to Restarting,
1488 	 * or thread_start sees None, and can proceed as normal.
1489 	 */
1490 
1491 	if (thi->t_state == Restarting) {
1492 		dev_info(DEV, "Restarting %s\n", current->comm);
1493 		thi->t_state = Running;
1494 		spin_unlock_irqrestore(&thi->t_lock, flags);
1495 		goto restart;
1496 	}
1497 
1498 	thi->task = NULL;
1499 	thi->t_state = None;
1500 	smp_mb();
1501 	complete(&thi->stop);
1502 	spin_unlock_irqrestore(&thi->t_lock, flags);
1503 
1504 	dev_info(DEV, "Terminating %s\n", current->comm);
1505 
1506 	/* Release mod reference taken when thread was started */
1507 	module_put(THIS_MODULE);
1508 	return retval;
1509 }
1510 
1511 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1512 		      int (*func) (struct drbd_thread *))
1513 {
1514 	spin_lock_init(&thi->t_lock);
1515 	thi->task    = NULL;
1516 	thi->t_state = None;
1517 	thi->function = func;
1518 	thi->mdev = mdev;
1519 }
1520 
1521 int drbd_thread_start(struct drbd_thread *thi)
1522 {
1523 	struct drbd_conf *mdev = thi->mdev;
1524 	struct task_struct *nt;
1525 	unsigned long flags;
1526 
1527 	const char *me =
1528 		thi == &mdev->receiver ? "receiver" :
1529 		thi == &mdev->asender  ? "asender"  :
1530 		thi == &mdev->worker   ? "worker"   : "NONSENSE";
1531 
1532 	/* is used from state engine doing drbd_thread_stop_nowait,
1533 	 * while holding the req lock irqsave */
1534 	spin_lock_irqsave(&thi->t_lock, flags);
1535 
1536 	switch (thi->t_state) {
1537 	case None:
1538 		dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1539 				me, current->comm, current->pid);
1540 
1541 		/* Get ref on module for thread - this is released when thread exits */
1542 		if (!try_module_get(THIS_MODULE)) {
1543 			dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1544 			spin_unlock_irqrestore(&thi->t_lock, flags);
1545 			return FALSE;
1546 		}
1547 
1548 		init_completion(&thi->stop);
1549 		D_ASSERT(thi->task == NULL);
1550 		thi->reset_cpu_mask = 1;
1551 		thi->t_state = Running;
1552 		spin_unlock_irqrestore(&thi->t_lock, flags);
1553 		flush_signals(current); /* otherwise we may get -ERESTARTNOINTR */
1554 
1555 		nt = kthread_create(drbd_thread_setup, (void *) thi,
1556 				    "drbd%d_%s", mdev_to_minor(mdev), me);
1557 
1558 		if (IS_ERR(nt)) {
1559 			dev_err(DEV, "Couldn't start thread\n");
1560 
1561 			module_put(THIS_MODULE);
1562 			return FALSE;
1563 		}
1564 		spin_lock_irqsave(&thi->t_lock, flags);
1565 		thi->task = nt;
1566 		thi->t_state = Running;
1567 		spin_unlock_irqrestore(&thi->t_lock, flags);
1568 		wake_up_process(nt);
1569 		break;
1570 	case Exiting:
1571 		thi->t_state = Restarting;
1572 		dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1573 				me, current->comm, current->pid);
1574 		/* fall through */
1575 	case Running:
1576 	case Restarting:
1577 	default:
1578 		spin_unlock_irqrestore(&thi->t_lock, flags);
1579 		break;
1580 	}
1581 
1582 	return TRUE;
1583 }
1584 
1585 
1586 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1587 {
1588 	unsigned long flags;
1589 
1590 	enum drbd_thread_state ns = restart ? Restarting : Exiting;
1591 
1592 	/* may be called from state engine, holding the req lock irqsave */
1593 	spin_lock_irqsave(&thi->t_lock, flags);
1594 
1595 	if (thi->t_state == None) {
1596 		spin_unlock_irqrestore(&thi->t_lock, flags);
1597 		if (restart)
1598 			drbd_thread_start(thi);
1599 		return;
1600 	}
1601 
1602 	if (thi->t_state != ns) {
1603 		if (thi->task == NULL) {
1604 			spin_unlock_irqrestore(&thi->t_lock, flags);
1605 			return;
1606 		}
1607 
1608 		thi->t_state = ns;
1609 		smp_mb();
1610 		init_completion(&thi->stop);
1611 		if (thi->task != current)
1612 			force_sig(DRBD_SIGKILL, thi->task);
1613 
1614 	}
1615 
1616 	spin_unlock_irqrestore(&thi->t_lock, flags);
1617 
1618 	if (wait)
1619 		wait_for_completion(&thi->stop);
1620 }
1621 
1622 #ifdef CONFIG_SMP
1623 /**
1624  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1625  * @mdev:	DRBD device.
1626  *
1627  * Forces all threads of a device onto the same CPU. This is beneficial for
1628  * DRBD's performance. May be overridden by the user's configuration.
1629  */
1630 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1631 {
1632 	int ord, cpu;
1633 
1634 	/* user override. */
1635 	if (cpumask_weight(mdev->cpu_mask))
1636 		return;
1637 
1638 	ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1639 	for_each_online_cpu(cpu) {
1640 		if (ord-- == 0) {
1641 			cpumask_set_cpu(cpu, mdev->cpu_mask);
1642 			return;
1643 		}
1644 	}
1645 	/* should not be reached */
1646 	cpumask_setall(mdev->cpu_mask);
1647 }
1648 
1649 /**
1650  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1651  * @mdev:	DRBD device.
1652  *
1653  * Call this in the "main loop" of _all_ threads; no mutex is needed, current won't
1654  * die prematurely.
1655  */
1656 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1657 {
1658 	struct task_struct *p = current;
1659 	struct drbd_thread *thi =
1660 		p == mdev->asender.task  ? &mdev->asender  :
1661 		p == mdev->receiver.task ? &mdev->receiver :
1662 		p == mdev->worker.task   ? &mdev->worker   :
1663 		NULL;
1664 	ERR_IF(thi == NULL)
1665 		return;
1666 	if (!thi->reset_cpu_mask)
1667 		return;
1668 	thi->reset_cpu_mask = 0;
1669 	set_cpus_allowed_ptr(p, mdev->cpu_mask);
1670 }
1671 #endif
1672 
1673 /* the appropriate socket mutex must be held already */
1674 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1675 			  enum drbd_packets cmd, struct p_header80 *h,
1676 			  size_t size, unsigned msg_flags)
1677 {
1678 	int sent, ok;
1679 
1680 	ERR_IF(!h) return FALSE;
1681 	ERR_IF(!size) return FALSE;
1682 
1683 	h->magic   = BE_DRBD_MAGIC;
1684 	h->command = cpu_to_be16(cmd);
1685 	h->length  = cpu_to_be16(size-sizeof(struct p_header80));
1686 
1687 	sent = drbd_send(mdev, sock, h, size, msg_flags);
1688 
1689 	ok = (sent == size);
1690 	if (!ok)
1691 		dev_err(DEV, "short sent %s size=%d sent=%d\n",
1692 		    cmdname(cmd), (int)size, sent);
1693 	return ok;
1694 }
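
/*
 * On-the-wire layout of struct p_header80 (see drbd_int.h), as filled in
 * above and in drbd_send_cmd2() below:
 *	magic   - 32 bit, BE_DRBD_MAGIC
 *	command - 16 bit big endian, enum drbd_packets
 *	length  - 16 bit big endian, payload size in bytes (excluding the header)
 */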
1695 
1696 /* Don't pass the socket; we may only look at it
1697  * while we hold the appropriate socket mutex.
1698  */
1699 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1700 		  enum drbd_packets cmd, struct p_header80 *h, size_t size)
1701 {
1702 	int ok = 0;
1703 	struct socket *sock;
1704 
1705 	if (use_data_socket) {
1706 		mutex_lock(&mdev->data.mutex);
1707 		sock = mdev->data.socket;
1708 	} else {
1709 		mutex_lock(&mdev->meta.mutex);
1710 		sock = mdev->meta.socket;
1711 	}
1712 
1713 	/* drbd_disconnect() could have called drbd_free_sock()
1714 	 * while we were waiting to acquire the mutex... */
1715 	if (likely(sock != NULL))
1716 		ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1717 
1718 	if (use_data_socket)
1719 		mutex_unlock(&mdev->data.mutex);
1720 	else
1721 		mutex_unlock(&mdev->meta.mutex);
1722 	return ok;
1723 }
1724 
1725 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1726 		   size_t size)
1727 {
1728 	struct p_header80 h;
1729 	int ok;
1730 
1731 	h.magic   = BE_DRBD_MAGIC;
1732 	h.command = cpu_to_be16(cmd);
1733 	h.length  = cpu_to_be16(size);
1734 
1735 	if (!drbd_get_data_sock(mdev))
1736 		return 0;
1737 
1738 	ok = (sizeof(h) ==
1739 		drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1740 	ok = ok && (size ==
1741 		drbd_send(mdev, mdev->data.socket, data, size, 0));
1742 
1743 	drbd_put_data_sock(mdev);
1744 
1745 	return ok;
1746 }
1747 
1748 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1749 {
1750 	struct p_rs_param_95 *p;
1751 	struct socket *sock;
1752 	int size, rv;
1753 	const int apv = mdev->agreed_pro_version;
1754 
1755 	size = apv <= 87 ? sizeof(struct p_rs_param)
1756 		: apv == 88 ? sizeof(struct p_rs_param)
1757 			+ strlen(mdev->sync_conf.verify_alg) + 1
1758 		: apv <= 94 ? sizeof(struct p_rs_param_89)
1759 		: /* apv >= 95 */ sizeof(struct p_rs_param_95);
1760 
1761 	/* Used from admin command context and from receiver/worker context.
1762 	 * To avoid a kmalloc, grab the socket right here,
1763 	 * then use its pre-allocated send buffer (sbuf). */
1764 	mutex_lock(&mdev->data.mutex);
1765 	sock = mdev->data.socket;
1766 
1767 	if (likely(sock != NULL)) {
1768 		enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1769 
1770 		p = &mdev->data.sbuf.rs_param_95;
1771 
1772 		/* initialize verify_alg and csums_alg */
1773 		memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1774 
1775 		p->rate = cpu_to_be32(sc->rate);
1776 		p->c_plan_ahead = cpu_to_be32(sc->c_plan_ahead);
1777 		p->c_delay_target = cpu_to_be32(sc->c_delay_target);
1778 		p->c_fill_target = cpu_to_be32(sc->c_fill_target);
1779 		p->c_max_rate = cpu_to_be32(sc->c_max_rate);
1780 
1781 		if (apv >= 88)
1782 			strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1783 		if (apv >= 89)
1784 			strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1785 
1786 		rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1787 	} else
1788 		rv = 0; /* not ok */
1789 
1790 	mutex_unlock(&mdev->data.mutex);
1791 
1792 	return rv;
1793 }
1794 
1795 int drbd_send_protocol(struct drbd_conf *mdev)
1796 {
1797 	struct p_protocol *p;
1798 	int size, cf, rv;
1799 
1800 	size = sizeof(struct p_protocol);
1801 
1802 	if (mdev->agreed_pro_version >= 87)
1803 		size += strlen(mdev->net_conf->integrity_alg) + 1;
1804 
1805 	/* we must not recurse into our own queue,
1806 	 * as that is blocked during handshake */
1807 	p = kmalloc(size, GFP_NOIO);
1808 	if (p == NULL)
1809 		return 0;
1810 
1811 	p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
1812 	p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
1813 	p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
1814 	p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
1815 	p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
1816 
1817 	cf = 0;
1818 	if (mdev->net_conf->want_lose)
1819 		cf |= CF_WANT_LOSE;
1820 	if (mdev->net_conf->dry_run) {
1821 		if (mdev->agreed_pro_version >= 92)
1822 			cf |= CF_DRY_RUN;
1823 		else {
1824 			dev_err(DEV, "--dry-run is not supported by peer");
1825 			kfree(p);
1826 			return 0;
1827 		}
1828 	}
1829 	p->conn_flags    = cpu_to_be32(cf);
1830 
1831 	if (mdev->agreed_pro_version >= 87)
1832 		strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
1833 
1834 	rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1835 			   (struct p_header80 *)p, size);
1836 	kfree(p);
1837 	return rv;
1838 }
1839 
1840 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1841 {
1842 	struct p_uuids p;
1843 	int i;
1844 
1845 	if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1846 		return 1;
1847 
1848 	for (i = UI_CURRENT; i < UI_SIZE; i++)
1849 		p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1850 
1851 	mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1852 	p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
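	/* uuid_flags: 1 = want_lose, 2 = crashed primary, 4 = disk was
	 * inconsistent while negotiating; callers may OR in more,
	 * e.g. 8 from drbd_send_uuids_skip_initial_sync() */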
1853 	uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
1854 	uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1855 	uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1856 	p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1857 
1858 	put_ldev(mdev);
1859 
1860 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
1861 			     (struct p_header80 *)&p, sizeof(p));
1862 }
1863 
1864 int drbd_send_uuids(struct drbd_conf *mdev)
1865 {
1866 	return _drbd_send_uuids(mdev, 0);
1867 }
1868 
1869 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1870 {
1871 	return _drbd_send_uuids(mdev, 8);
1872 }
1873 
1874 
1875 int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val)
1876 {
1877 	struct p_rs_uuid p;
1878 
1879 	p.uuid = cpu_to_be64(val);
1880 
1881 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
1882 			     (struct p_header80 *)&p, sizeof(p));
1883 }
1884 
1885 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1886 {
1887 	struct p_sizes p;
1888 	sector_t d_size, u_size;
1889 	int q_order_type;
1890 	int ok;
1891 
1892 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1893 		D_ASSERT(mdev->ldev->backing_bdev);
1894 		d_size = drbd_get_max_capacity(mdev->ldev);
1895 		u_size = mdev->ldev->dc.disk_size;
1896 		q_order_type = drbd_queue_order_type(mdev);
1897 		put_ldev(mdev);
1898 	} else {
1899 		d_size = 0;
1900 		u_size = 0;
1901 		q_order_type = QUEUE_ORDERED_NONE;
1902 	}
1903 
1904 	p.d_size = cpu_to_be64(d_size);
1905 	p.u_size = cpu_to_be64(u_size);
1906 	p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1907 	p.max_segment_size = cpu_to_be32(queue_max_segment_size(mdev->rq_queue));
1908 	p.queue_order_type = cpu_to_be16(q_order_type);
1909 	p.dds_flags = cpu_to_be16(flags);
1910 
1911 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
1912 			   (struct p_header80 *)&p, sizeof(p));
1913 	return ok;
1914 }
1915 
1916 /**
1917  * drbd_send_state() - Sends the drbd state to the peer
1918  * @mdev:	DRBD device.
1919  */
1920 int drbd_send_state(struct drbd_conf *mdev)
1921 {
1922 	struct socket *sock;
1923 	struct p_state p;
1924 	int ok = 0;
1925 
1926 	/* Grab the state lock so we won't send state while we're in the middle
1927 	 * of a cluster wide state change on another thread */
1928 	drbd_state_lock(mdev);
1929 
1930 	mutex_lock(&mdev->data.mutex);
1931 
1932 	p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1933 	sock = mdev->data.socket;
1934 
1935 	if (likely(sock != NULL)) {
1936 		ok = _drbd_send_cmd(mdev, sock, P_STATE,
1937 				    (struct p_header80 *)&p, sizeof(p), 0);
1938 	}
1939 
1940 	mutex_unlock(&mdev->data.mutex);
1941 
1942 	drbd_state_unlock(mdev);
1943 	return ok;
1944 }
1945 
1946 int drbd_send_state_req(struct drbd_conf *mdev,
1947 	union drbd_state mask, union drbd_state val)
1948 {
1949 	struct p_req_state p;
1950 
1951 	p.mask    = cpu_to_be32(mask.i);
1952 	p.val     = cpu_to_be32(val.i);
1953 
1954 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
1955 			     (struct p_header80 *)&p, sizeof(p));
1956 }
1957 
1958 int drbd_send_sr_reply(struct drbd_conf *mdev, int retcode)
1959 {
1960 	struct p_req_state_reply p;
1961 
1962 	p.retcode    = cpu_to_be32(retcode);
1963 
1964 	return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
1965 			     (struct p_header80 *)&p, sizeof(p));
1966 }
1967 
1968 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1969 	struct p_compressed_bm *p,
1970 	struct bm_xfer_ctx *c)
1971 {
1972 	struct bitstream bs;
1973 	unsigned long plain_bits;
1974 	unsigned long tmp;
1975 	unsigned long rl;
1976 	unsigned len;
1977 	unsigned toggle;
1978 	int bits;
1979 
1980 	/* may we use this feature? */
1981 	if ((mdev->sync_conf.use_rle == 0) ||
1982 		(mdev->agreed_pro_version < 90))
1983 			return 0;
1984 
1985 	if (c->bit_offset >= c->bm_bits)
1986 		return 0; /* nothing to do. */
1987 
1988 	/* use at most this many bytes */
1989 	bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1990 	memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1991 	/* plain bits covered in this code string */
1992 	plain_bits = 0;
1993 
1994 	/* p->encoding & 0x80 stores whether the first run describes set bits.
1995 	 * The bit offset is implicit.
1996 	 * Start with toggle == 2 so we can recognize the first iteration. */
1997 	toggle = 2;
1998 
1999 	/* see how many plain bits we can stuff into one packet
2000 	 * using RLE and VLI. */
2001 	do {
2002 		tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
2003 				    : _drbd_bm_find_next(mdev, c->bit_offset);
2004 		if (tmp == -1UL)
2005 			tmp = c->bm_bits;
2006 		rl = tmp - c->bit_offset;
2007 
2008 		if (toggle == 2) { /* first iteration */
2009 			if (rl == 0) {
2010 				/* the first checked bit was set,
2011 				 * store start value, */
2012 				DCBP_set_start(p, 1);
2013 				/* but skip encoding of zero run length */
2014 				toggle = !toggle;
2015 				continue;
2016 			}
2017 			DCBP_set_start(p, 0);
2018 		}
2019 
2020 		/* paranoia: catch zero runlength.
2021 		 * can only happen if bitmap is modified while we scan it. */
2022 		if (rl == 0) {
2023 			dev_err(DEV, "unexpected zero runlength while encoding bitmap "
2024 			    "t:%u bo:%lu\n", toggle, c->bit_offset);
2025 			return -1;
2026 		}
2027 
2028 		bits = vli_encode_bits(&bs, rl);
2029 		if (bits == -ENOBUFS) /* buffer full */
2030 			break;
2031 		if (bits <= 0) {
2032 			dev_err(DEV, "error while encoding bitmap: %d\n", bits);
2033 			return 0;
2034 		}
2035 
2036 		toggle = !toggle;
2037 		plain_bits += rl;
2038 		c->bit_offset = tmp;
2039 	} while (c->bit_offset < c->bm_bits);
2040 
2041 	len = bs.cur.b - p->code + !!bs.cur.bit;
2042 
2043 	if (plain_bits < (len << 3)) {
2044 		/* incompressible with this method.
2045 		 * we need to rewind both word and bit position. */
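		/* Worked example: a code string of len = 8 bytes pays off only if
		 * it covers at least len << 3 = 64 plain bitmap bits. */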
2046 		c->bit_offset -= plain_bits;
2047 		bm_xfer_ctx_bit_to_word_offset(c);
2048 		c->bit_offset = c->word_offset * BITS_PER_LONG;
2049 		return 0;
2050 	}
2051 
2052 	/* RLE + VLI was able to compress it just fine.
2053 	 * update c->word_offset. */
2054 	bm_xfer_ctx_bit_to_word_offset(c);
2055 
2056 	/* store pad_bits */
2057 	DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
2058 
2059 	return len;
2060 }
2061 
2062 enum { OK, FAILED, DONE }
2063 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
2064 	struct p_header80 *h, struct bm_xfer_ctx *c)
2065 {
2066 	struct p_compressed_bm *p = (void*)h;
2067 	unsigned long num_words;
2068 	int len;
2069 	int ok;
2070 
2071 	len = fill_bitmap_rle_bits(mdev, p, c);
2072 
2073 	if (len < 0)
2074 		return FAILED;
2075 
2076 	if (len) {
2077 		DCBP_set_code(p, RLE_VLI_Bits);
2078 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
2079 			sizeof(*p) + len, 0);
2080 
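		/* transfer statistics: index 0 counts compressed bitmap packets */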
2081 		c->packets[0]++;
2082 		c->bytes[0] += sizeof(*p) + len;
2083 
2084 		if (c->bit_offset >= c->bm_bits)
2085 			len = 0; /* DONE */
2086 	} else {
2087 		/* was not compressible.
2088 		 * send a buffer full of plain text bits instead. */
2089 		num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
2090 		len = num_words * sizeof(long);
2091 		if (len)
2092 			drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
2093 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
2094 				   h, sizeof(struct p_header80) + len, 0);
2095 		c->word_offset += num_words;
2096 		c->bit_offset = c->word_offset * BITS_PER_LONG;
2097 
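		/* transfer statistics: index 1 counts plain bitmap packets */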
2098 		c->packets[1]++;
2099 		c->bytes[1] += sizeof(struct p_header80) + len;
2100 
2101 		if (c->bit_offset > c->bm_bits)
2102 			c->bit_offset = c->bm_bits;
2103 	}
2104 	ok = ok ? ((len == 0) ? DONE : OK) : FAILED;
2105 
2106 	if (ok == DONE)
2107 		INFO_bm_xfer_stats(mdev, "send", c);
2108 	return ok;
2109 }
2110 
2111 /* See the comment at receive_bitmap() */
2112 int _drbd_send_bitmap(struct drbd_conf *mdev)
2113 {
2114 	struct bm_xfer_ctx c;
2115 	struct p_header80 *p;
2116 	int ret;
2117 
2118 	ERR_IF(!mdev->bitmap) return FALSE;
2119 
2120 	/* maybe we should use some per thread scratch page,
2121 	 * and allocate that during initial device creation? */
2122 	p = (struct p_header80 *) __get_free_page(GFP_NOIO);
2123 	if (!p) {
2124 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2125 		return FALSE;
2126 	}
2127 
2128 	if (get_ldev(mdev)) {
2129 		if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2130 			dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2131 			drbd_bm_set_all(mdev);
2132 			if (drbd_bm_write(mdev)) {
2133 				/* writing the bitmap failed! Leave the full sync flag set in the
2134 				 * meta data, but otherwise proceed as normal - we need to tell the
2135 				 * other side that a full resync is required! */
2136 				dev_err(DEV, "Failed to write bitmap to disk!\n");
2137 			} else {
2138 				drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2139 				drbd_md_sync(mdev);
2140 			}
2141 		}
2142 		put_ldev(mdev);
2143 	}
2144 
2145 	c = (struct bm_xfer_ctx) {
2146 		.bm_bits = drbd_bm_bits(mdev),
2147 		.bm_words = drbd_bm_words(mdev),
2148 	};
2149 
2150 	do {
2151 		ret = send_bitmap_rle_or_plain(mdev, p, &c);
2152 	} while (ret == OK);
2153 
2154 	free_page((unsigned long) p);
2155 	return (ret == DONE);
2156 }
2157 
2158 int drbd_send_bitmap(struct drbd_conf *mdev)
2159 {
2160 	int err;
2161 
2162 	if (!drbd_get_data_sock(mdev))
2163 		return -1;
2164 	err = !_drbd_send_bitmap(mdev);
2165 	drbd_put_data_sock(mdev);
2166 	return err;
2167 }
2168 
2169 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2170 {
2171 	int ok;
2172 	struct p_barrier_ack p;
2173 
2174 	p.barrier  = barrier_nr;
2175 	p.set_size = cpu_to_be32(set_size);
2176 
2177 	if (mdev->state.conn < C_CONNECTED)
2178 		return FALSE;
2179 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2180 			(struct p_header80 *)&p, sizeof(p));
2181 	return ok;
2182 }
2183 
2184 /**
2185  * _drbd_send_ack() - Sends an ack packet
2186  * @mdev:	DRBD device.
2187  * @cmd:	Packet command code.
2188  * @sector:	sector, needs to be in big endian byte order
2189  * @blksize:	size in bytes, needs to be in big endian byte order
2190  * @block_id:	Id, big endian byte order
2191  */
2192 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2193 			  u64 sector,
2194 			  u32 blksize,
2195 			  u64 block_id)
2196 {
2197 	int ok;
2198 	struct p_block_ack p;
2199 
2200 	p.sector   = sector;
2201 	p.block_id = block_id;
2202 	p.blksize  = blksize;
2203 	p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2204 
2205 	if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2206 		return FALSE;
2207 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2208 				(struct p_header80 *)&p, sizeof(p));
2209 	return ok;
2210 }
2211 
2212 /* dp->sector and dp->block_id are already/still in network byte order;
2213  * data_size is the payload size according to dp->head
2214  * and may need to be corrected for the digest size. */
2215 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2216 		     struct p_data *dp, int data_size)
2217 {
2218 	data_size -= (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
2219 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
2220 	return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2221 			      dp->block_id);
2222 }
2223 
2224 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2225 		     struct p_block_req *rp)
2226 {
2227 	return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2228 }
2229 
2230 /**
2231  * drbd_send_ack() - Sends an ack packet
2232  * @mdev:	DRBD device.
2233  * @cmd:	Packet command code.
2234  * @e:		Epoch entry.
2235  */
2236 int drbd_send_ack(struct drbd_conf *mdev,
2237 	enum drbd_packets cmd, struct drbd_epoch_entry *e)
2238 {
2239 	return _drbd_send_ack(mdev, cmd,
2240 			      cpu_to_be64(e->sector),
2241 			      cpu_to_be32(e->size),
2242 			      e->block_id);
2243 }
2244 
2245 /* This function misuses the block_id field to signal if the blocks
2246  * are in sync or not. */
2247 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2248 		     sector_t sector, int blksize, u64 block_id)
2249 {
2250 	return _drbd_send_ack(mdev, cmd,
2251 			      cpu_to_be64(sector),
2252 			      cpu_to_be32(blksize),
2253 			      cpu_to_be64(block_id));
2254 }
2255 
2256 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2257 		       sector_t sector, int size, u64 block_id)
2258 {
2259 	int ok;
2260 	struct p_block_req p;
2261 
2262 	p.sector   = cpu_to_be64(sector);
2263 	p.block_id = block_id;
2264 	p.blksize  = cpu_to_be32(size);
2265 
2266 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2267 				(struct p_header80 *)&p, sizeof(p));
2268 	return ok;
2269 }
2270 
2271 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2272 			    sector_t sector, int size,
2273 			    void *digest, int digest_size,
2274 			    enum drbd_packets cmd)
2275 {
2276 	int ok;
2277 	struct p_block_req p;
2278 
2279 	p.sector   = cpu_to_be64(sector);
2280 	p.block_id = BE_DRBD_MAGIC + 0xbeef;
2281 	p.blksize  = cpu_to_be32(size);
2282 
2283 	p.head.magic   = BE_DRBD_MAGIC;
2284 	p.head.command = cpu_to_be16(cmd);
2285 	p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + digest_size);
2286 
2287 	mutex_lock(&mdev->data.mutex);
2288 
2289 	ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2290 	ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2291 
2292 	mutex_unlock(&mdev->data.mutex);
2293 
2294 	return ok;
2295 }
2296 
2297 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2298 {
2299 	int ok;
2300 	struct p_block_req p;
2301 
2302 	p.sector   = cpu_to_be64(sector);
2303 	p.block_id = BE_DRBD_MAGIC + 0xbabe;
2304 	p.blksize  = cpu_to_be32(size);
2305 
2306 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2307 			   (struct p_header80 *)&p, sizeof(p));
2308 	return ok;
2309 }
2310 
2311 /* called on sndtimeo
2312  * returns FALSE if we should retry,
2313  * TRUE if we think the connection is dead
2314  */
2315 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2316 {
2317 	int drop_it;
2318 	/* long elapsed = (long)(jiffies - mdev->last_received); */
2319 
2320 	drop_it =   mdev->meta.socket == sock
2321 		|| !mdev->asender.task
2322 		|| get_t_state(&mdev->asender) != Running
2323 		|| mdev->state.conn < C_CONNECTED;
2324 
2325 	if (drop_it)
2326 		return TRUE;
2327 
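	/* ko_count is re-armed at the start of each drbd_send() on the data
	 * socket; every send timeout decrements it, and once it reaches zero
	 * we give up on the connection */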
2328 	drop_it = !--mdev->ko_count;
2329 	if (!drop_it) {
2330 		dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2331 		       current->comm, current->pid, mdev->ko_count);
2332 		request_ping(mdev);
2333 	}
2334 
2335 	return drop_it; /* && (mdev->state == R_PRIMARY) */;
2336 }
2337 
2338 /* The idea of sendpage seems to be to put some kind of reference
2339  * to the page into the skb, and to hand it over to the NIC. In
2340  * this process get_page() gets called.
2341  *
2342  * As soon as the page was really sent over the network put_page()
2343  * gets called by some part of the network layer. [ NIC driver? ]
2344  *
2345  * [ get_page() / put_page() increment/decrement the count. If count
2346  *   reaches 0 the page will be freed. ]
2347  *
2348  * This works nicely with pages from FSs.
2349  * But this means that in protocol A we might signal IO completion too early!
2350  *
2351  * In order not to corrupt data during a resync we must make sure
2352  * that we do not reuse our own buffer pages (EEs) too early; therefore
2353  * we have the net_ee list.
2354  *
2355  * XFS still seems to have problems with this: it submits pages with page_count == 0!
2356  * As a workaround, we disable sendpage on pages
2357  * with page_count == 0 or PageSlab.
2358  */
2359 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2360 		   int offset, size_t size, unsigned msg_flags)
2361 {
2362 	int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, msg_flags);
2363 	kunmap(page);
2364 	if (sent == size)
2365 		mdev->send_cnt += size>>9;
2366 	return sent == size;
2367 }
2368 
2369 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2370 		    int offset, size_t size, unsigned msg_flags)
2371 {
2372 	mm_segment_t oldfs = get_fs();
2373 	int sent, ok;
2374 	int len = size;
2375 
2376 	/* e.g. XFS meta- & log-data is in slab pages, which have a
2377 	 * page_count of 0 and/or have PageSlab() set.
2378 	 * We cannot use sendpage for those, as it does get_page()/put_page()
2379 	 * and would cause either a VM_BUG directly, or __page_cache_release()
2380 	 * of a page that is actually still referenced by someone, leading to
2381 	 * some obscure delayed Oops somewhere else. */
2382 	if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2383 		return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
2384 
2385 	msg_flags |= MSG_NOSIGNAL;
2386 	drbd_update_congested(mdev);
2387 	set_fs(KERNEL_DS);
2388 	do {
2389 		sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2390 							offset, len,
2391 							msg_flags);
2392 		if (sent == -EAGAIN) {
2393 			if (we_should_drop_the_connection(mdev,
2394 							  mdev->data.socket))
2395 				break;
2396 			else
2397 				continue;
2398 		}
2399 		if (sent <= 0) {
2400 			dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2401 			     __func__, (int)size, len, sent);
2402 			break;
2403 		}
2404 		len    -= sent;
2405 		offset += sent;
2406 	} while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2407 	set_fs(oldfs);
2408 	clear_bit(NET_CONGESTED, &mdev->flags);
2409 
2410 	ok = (len == 0);
2411 	if (likely(ok))
2412 		mdev->send_cnt += size>>9;
2413 	return ok;
2414 }
2415 
2416 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2417 {
2418 	struct bio_vec *bvec;
2419 	int i;
2420 	/* hint all but last page with MSG_MORE */
2421 	__bio_for_each_segment(bvec, bio, i, 0) {
2422 		if (!_drbd_no_send_page(mdev, bvec->bv_page,
2423 				     bvec->bv_offset, bvec->bv_len,
2424 				     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2425 			return 0;
2426 	}
2427 	return 1;
2428 }
2429 
2430 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2431 {
2432 	struct bio_vec *bvec;
2433 	int i;
2434 	/* hint all but last page with MSG_MORE */
2435 	__bio_for_each_segment(bvec, bio, i, 0) {
2436 		if (!_drbd_send_page(mdev, bvec->bv_page,
2437 				     bvec->bv_offset, bvec->bv_len,
2438 				     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2439 			return 0;
2440 	}
2441 	return 1;
2442 }
2443 
2444 static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2445 {
2446 	struct page *page = e->pages;
2447 	unsigned len = e->size;
2448 	/* hint all but last page with MSG_MORE */
2449 	page_chain_for_each(page) {
2450 		unsigned l = min_t(unsigned, len, PAGE_SIZE);
2451 		if (!_drbd_send_page(mdev, page, 0, l,
2452 				page_chain_next(page) ? MSG_MORE : 0))
2453 			return 0;
2454 		len -= l;
2455 	}
2456 	return 1;
2457 }
2458 
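/* Map the relevant bio flags (REQ_*) to on-the-wire DP_* flags,
 * honouring the agreed protocol version. */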
2459 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
2460 {
2461 	if (mdev->agreed_pro_version >= 95)
2462 		return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
2463 			(bi_rw & REQ_UNPLUG ? DP_UNPLUG : 0) |
2464 			(bi_rw & REQ_FUA ? DP_FUA : 0) |
2465 			(bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
2466 			(bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
2467 	else
2468 		return bi_rw & (REQ_SYNC | REQ_UNPLUG) ? DP_RW_SYNC : 0;
2469 }
2470 
2471 /* Used to send write requests
2472  * R_PRIMARY -> Peer	(P_DATA)
2473  */
2474 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2475 {
2476 	int ok = 1;
2477 	struct p_data p;
2478 	unsigned int dp_flags = 0;
2479 	void *dgb;
2480 	int dgs;
2481 
2482 	if (!drbd_get_data_sock(mdev))
2483 		return 0;
2484 
2485 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2486 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2487 
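	/* payloads up to DRBD_MAX_SIZE_H80_PACKET fit the 16 bit length field
	 * of the h80 header; larger ones need the h95 header and its 32 bit
	 * length field */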
2488 	if (req->size <= DRBD_MAX_SIZE_H80_PACKET) {
2489 		p.head.h80.magic   = BE_DRBD_MAGIC;
2490 		p.head.h80.command = cpu_to_be16(P_DATA);
2491 		p.head.h80.length  =
2492 			cpu_to_be16(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2493 	} else {
2494 		p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2495 		p.head.h95.command = cpu_to_be16(P_DATA);
2496 		p.head.h95.length  =
2497 			cpu_to_be32(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2498 	}
2499 
2500 	p.sector   = cpu_to_be64(req->sector);
2501 	p.block_id = (unsigned long)req;
2502 	p.seq_num  = cpu_to_be32(req->seq_num =
2503 				 atomic_add_return(1, &mdev->packet_seq));
2504 
2505 	dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
2506 
2507 	if (mdev->state.conn >= C_SYNC_SOURCE &&
2508 	    mdev->state.conn <= C_PAUSED_SYNC_T)
2509 		dp_flags |= DP_MAY_SET_IN_SYNC;
2510 
2511 	p.dp_flags = cpu_to_be32(dp_flags);
2512 	set_bit(UNPLUG_REMOTE, &mdev->flags);
2513 	ok = (sizeof(p) ==
2514 		drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
2515 	if (ok && dgs) {
2516 		dgb = mdev->int_dig_out;
2517 		drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2518 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2519 	}
2520 	if (ok) {
2521 		if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
2522 			ok = _drbd_send_bio(mdev, req->master_bio);
2523 		else
2524 			ok = _drbd_send_zc_bio(mdev, req->master_bio);
2525 	}
2526 
2527 	drbd_put_data_sock(mdev);
2528 
2529 	return ok;
2530 }
2531 
2532 /* answer packet, used to send data back for read requests:
2533  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2534  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2535  */
2536 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2537 		    struct drbd_epoch_entry *e)
2538 {
2539 	int ok;
2540 	struct p_data p;
2541 	void *dgb;
2542 	int dgs;
2543 
2544 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2545 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2546 
2547 	if (e->size <= DRBD_MAX_SIZE_H80_PACKET) {
2548 		p.head.h80.magic   = BE_DRBD_MAGIC;
2549 		p.head.h80.command = cpu_to_be16(cmd);
2550 		p.head.h80.length  =
2551 			cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2552 	} else {
2553 		p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2554 		p.head.h95.command = cpu_to_be16(cmd);
2555 		p.head.h95.length  =
2556 			cpu_to_be32(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2557 	}
2558 
2559 	p.sector   = cpu_to_be64(e->sector);
2560 	p.block_id = e->block_id;
2561 	/* p.seq_num  = 0;    No sequence numbers here.. */
2562 
2563 	/* Only called by our kernel thread.
2564 	 * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2565 	 * in response to an admin command or module unload.
2566 	 */
2567 	if (!drbd_get_data_sock(mdev))
2568 		return 0;
2569 
2570 	ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
2571 	if (ok && dgs) {
2572 		dgb = mdev->int_dig_out;
2573 		drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2574 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2575 	}
2576 	if (ok)
2577 		ok = _drbd_send_zc_ee(mdev, e);
2578 
2579 	drbd_put_data_sock(mdev);
2580 
2581 	return ok;
2582 }
2583 
2584 /*
2585   drbd_send distinguishes two cases:
2586 
2587   Packets sent via the data socket "sock"
2588   and packets sent via the meta data socket "msock"
2589 
2590 		    sock                      msock
2591   -----------------+-------------------------+------------------------------
2592   timeout           conf.timeout / 2          conf.timeout / 2
2593   timeout action    send a ping via msock     Abort communication
2594 					      and close all sockets
2595 */
2596 
2597 /*
2598  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2599  */
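/* Returns the number of bytes actually sent (equal to 'size' only on complete
 * success), or -1000 if there is no socket. */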
2600 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2601 	      void *buf, size_t size, unsigned msg_flags)
2602 {
2603 	struct kvec iov;
2604 	struct msghdr msg;
2605 	int rv, sent = 0;
2606 
2607 	if (!sock)
2608 		return -1000;
2609 
2610 	/* THINK  if (signal_pending) return ... ? */
2611 
2612 	iov.iov_base = buf;
2613 	iov.iov_len  = size;
2614 
2615 	msg.msg_name       = NULL;
2616 	msg.msg_namelen    = 0;
2617 	msg.msg_control    = NULL;
2618 	msg.msg_controllen = 0;
2619 	msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2620 
2621 	if (sock == mdev->data.socket) {
2622 		mdev->ko_count = mdev->net_conf->ko_count;
2623 		drbd_update_congested(mdev);
2624 	}
2625 	do {
2626 		/* STRANGE
2627 		 * tcp_sendmsg does _not_ use its size parameter at all ?
2628 		 *
2629 		 * -EAGAIN on timeout, -EINTR on signal.
2630 		 */
2631 /* THINK
2632  * do we need to block DRBD_SIG if sock == &meta.socket ??
2633  * otherwise wake_asender() might interrupt some send_*Ack !
2634  */
2635 		rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2636 		if (rv == -EAGAIN) {
2637 			if (we_should_drop_the_connection(mdev, sock))
2638 				break;
2639 			else
2640 				continue;
2641 		}
2642 		D_ASSERT(rv != 0);
2643 		if (rv == -EINTR) {
2644 			flush_signals(current);
2645 			rv = 0;
2646 		}
2647 		if (rv < 0)
2648 			break;
2649 		sent += rv;
2650 		iov.iov_base += rv;
2651 		iov.iov_len  -= rv;
2652 	} while (sent < size);
2653 
2654 	if (sock == mdev->data.socket)
2655 		clear_bit(NET_CONGESTED, &mdev->flags);
2656 
2657 	if (rv <= 0) {
2658 		if (rv != -EAGAIN) {
2659 			dev_err(DEV, "%s_sendmsg returned %d\n",
2660 			    sock == mdev->meta.socket ? "msock" : "sock",
2661 			    rv);
2662 			drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2663 		} else
2664 			drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2665 	}
2666 
2667 	return sent;
2668 }
2669 
2670 static int drbd_open(struct block_device *bdev, fmode_t mode)
2671 {
2672 	struct drbd_conf *mdev = bdev->bd_disk->private_data;
2673 	unsigned long flags;
2674 	int rv = 0;
2675 
2676 	mutex_lock(&drbd_main_mutex);
2677 	spin_lock_irqsave(&mdev->req_lock, flags);
2678 	/* to have a stable mdev->state.role
2679 	 * and no race with updating open_cnt */
2680 
2681 	if (mdev->state.role != R_PRIMARY) {
2682 		if (mode & FMODE_WRITE)
2683 			rv = -EROFS;
2684 		else if (!allow_oos)
2685 			rv = -EMEDIUMTYPE;
2686 	}
2687 
2688 	if (!rv)
2689 		mdev->open_cnt++;
2690 	spin_unlock_irqrestore(&mdev->req_lock, flags);
2691 	mutex_unlock(&drbd_main_mutex);
2692 
2693 	return rv;
2694 }
2695 
2696 static int drbd_release(struct gendisk *gd, fmode_t mode)
2697 {
2698 	struct drbd_conf *mdev = gd->private_data;
2699 	mutex_lock(&drbd_main_mutex);
2700 	mdev->open_cnt--;
2701 	mutex_unlock(&drbd_main_mutex);
2702 	return 0;
2703 }
2704 
2705 static void drbd_unplug_fn(struct request_queue *q)
2706 {
2707 	struct drbd_conf *mdev = q->queuedata;
2708 
2709 	/* unplug FIRST */
2710 	spin_lock_irq(q->queue_lock);
2711 	blk_remove_plug(q);
2712 	spin_unlock_irq(q->queue_lock);
2713 
2714 	/* only if connected */
2715 	spin_lock_irq(&mdev->req_lock);
2716 	if (mdev->state.pdsk >= D_INCONSISTENT && mdev->state.conn >= C_CONNECTED) {
2717 		D_ASSERT(mdev->state.role == R_PRIMARY);
2718 		if (test_and_clear_bit(UNPLUG_REMOTE, &mdev->flags)) {
2719 			/* add to the data.work queue,
2720 			 * unless already queued.
2721 			 * XXX this might be a good addition to drbd_queue_work
2722 			 * anyways, to detect "double queuing" ... */
2723 			if (list_empty(&mdev->unplug_work.list))
2724 				drbd_queue_work(&mdev->data.work,
2725 						&mdev->unplug_work);
2726 		}
2727 	}
2728 	spin_unlock_irq(&mdev->req_lock);
2729 
2730 	if (mdev->state.disk >= D_INCONSISTENT)
2731 		drbd_kick_lo(mdev);
2732 }
2733 
2734 static void drbd_set_defaults(struct drbd_conf *mdev)
2735 {
2736 	/* This way we get a compile error when sync_conf grows,
2737 	   and we forget to initialize it here */
2738 	mdev->sync_conf = (struct syncer_conf) {
2739 		/* .rate = */		DRBD_RATE_DEF,
2740 		/* .after = */		DRBD_AFTER_DEF,
2741 		/* .al_extents = */	DRBD_AL_EXTENTS_DEF,
2742 		/* .verify_alg = */	{}, 0,
2743 		/* .cpu_mask = */	{}, 0,
2744 		/* .csums_alg = */	{}, 0,
2745 		/* .use_rle = */	0,
2746 		/* .on_no_data = */	DRBD_ON_NO_DATA_DEF,
2747 		/* .c_plan_ahead = */	DRBD_C_PLAN_AHEAD_DEF,
2748 		/* .c_delay_target = */	DRBD_C_DELAY_TARGET_DEF,
2749 		/* .c_fill_target = */	DRBD_C_FILL_TARGET_DEF,
2750 		/* .c_max_rate = */	DRBD_C_MAX_RATE_DEF,
2751 		/* .c_min_rate = */	DRBD_C_MIN_RATE_DEF
2752 	};
2753 
2754 	/* We have to do it this way, because the layout differs between
2755 	   big endian and little endian */
2756 	mdev->state = (union drbd_state) {
2757 		{ .role = R_SECONDARY,
2758 		  .peer = R_UNKNOWN,
2759 		  .conn = C_STANDALONE,
2760 		  .disk = D_DISKLESS,
2761 		  .pdsk = D_UNKNOWN,
2762 		  .susp = 0,
2763 		  .susp_nod = 0,
2764 		  .susp_fen = 0
2765 		} };
2766 }
2767 
2768 void drbd_init_set_defaults(struct drbd_conf *mdev)
2769 {
2770 	/* the memset(,0,) did most of this.
2771 	 * note: only assignments, no allocation in here */
2772 
2773 	drbd_set_defaults(mdev);
2774 
2775 	/* for now, we do NOT yet support it,
2776 	 * even though we start some framework
2777 	 * to eventually support barriers */
2778 	set_bit(NO_BARRIER_SUPP, &mdev->flags);
2779 
2780 	atomic_set(&mdev->ap_bio_cnt, 0);
2781 	atomic_set(&mdev->ap_pending_cnt, 0);
2782 	atomic_set(&mdev->rs_pending_cnt, 0);
2783 	atomic_set(&mdev->unacked_cnt, 0);
2784 	atomic_set(&mdev->local_cnt, 0);
2785 	atomic_set(&mdev->net_cnt, 0);
2786 	atomic_set(&mdev->packet_seq, 0);
2787 	atomic_set(&mdev->pp_in_use, 0);
2788 	atomic_set(&mdev->pp_in_use_by_net, 0);
2789 	atomic_set(&mdev->rs_sect_in, 0);
2790 	atomic_set(&mdev->rs_sect_ev, 0);
2791 
2792 	mutex_init(&mdev->md_io_mutex);
2793 	mutex_init(&mdev->data.mutex);
2794 	mutex_init(&mdev->meta.mutex);
2795 	sema_init(&mdev->data.work.s, 0);
2796 	sema_init(&mdev->meta.work.s, 0);
2797 	mutex_init(&mdev->state_mutex);
2798 
2799 	spin_lock_init(&mdev->data.work.q_lock);
2800 	spin_lock_init(&mdev->meta.work.q_lock);
2801 
2802 	spin_lock_init(&mdev->al_lock);
2803 	spin_lock_init(&mdev->req_lock);
2804 	spin_lock_init(&mdev->peer_seq_lock);
2805 	spin_lock_init(&mdev->epoch_lock);
2806 
2807 	INIT_LIST_HEAD(&mdev->active_ee);
2808 	INIT_LIST_HEAD(&mdev->sync_ee);
2809 	INIT_LIST_HEAD(&mdev->done_ee);
2810 	INIT_LIST_HEAD(&mdev->read_ee);
2811 	INIT_LIST_HEAD(&mdev->net_ee);
2812 	INIT_LIST_HEAD(&mdev->resync_reads);
2813 	INIT_LIST_HEAD(&mdev->data.work.q);
2814 	INIT_LIST_HEAD(&mdev->meta.work.q);
2815 	INIT_LIST_HEAD(&mdev->resync_work.list);
2816 	INIT_LIST_HEAD(&mdev->unplug_work.list);
2817 	INIT_LIST_HEAD(&mdev->go_diskless.list);
2818 	INIT_LIST_HEAD(&mdev->md_sync_work.list);
2819 	INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2820 
2821 	mdev->resync_work.cb  = w_resync_inactive;
2822 	mdev->unplug_work.cb  = w_send_write_hint;
2823 	mdev->go_diskless.cb  = w_go_diskless;
2824 	mdev->md_sync_work.cb = w_md_sync;
2825 	mdev->bm_io_work.w.cb = w_bitmap_io;
2826 	init_timer(&mdev->resync_timer);
2827 	init_timer(&mdev->md_sync_timer);
2828 	mdev->resync_timer.function = resync_timer_fn;
2829 	mdev->resync_timer.data = (unsigned long) mdev;
2830 	mdev->md_sync_timer.function = md_sync_timer_fn;
2831 	mdev->md_sync_timer.data = (unsigned long) mdev;
2832 
2833 	init_waitqueue_head(&mdev->misc_wait);
2834 	init_waitqueue_head(&mdev->state_wait);
2835 	init_waitqueue_head(&mdev->net_cnt_wait);
2836 	init_waitqueue_head(&mdev->ee_wait);
2837 	init_waitqueue_head(&mdev->al_wait);
2838 	init_waitqueue_head(&mdev->seq_wait);
2839 
2840 	drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
2841 	drbd_thread_init(mdev, &mdev->worker, drbd_worker);
2842 	drbd_thread_init(mdev, &mdev->asender, drbd_asender);
2843 
2844 	mdev->agreed_pro_version = PRO_VERSION_MAX;
2845 	mdev->write_ordering = WO_bio_barrier;
2846 	mdev->resync_wenr = LC_FREE;
2847 }
2848 
2849 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2850 {
2851 	int i;
2852 	if (mdev->receiver.t_state != None)
2853 		dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2854 				mdev->receiver.t_state);
2855 
2856 	/* no need to lock it, I'm the only thread alive */
2857 	if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2858 		dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2859 	mdev->al_writ_cnt  =
2860 	mdev->bm_writ_cnt  =
2861 	mdev->read_cnt     =
2862 	mdev->recv_cnt     =
2863 	mdev->send_cnt     =
2864 	mdev->writ_cnt     =
2865 	mdev->p_size       =
2866 	mdev->rs_start     =
2867 	mdev->rs_total     =
2868 	mdev->rs_failed    = 0;
2869 	mdev->rs_last_events = 0;
2870 	mdev->rs_last_sect_ev = 0;
2871 	for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2872 		mdev->rs_mark_left[i] = 0;
2873 		mdev->rs_mark_time[i] = 0;
2874 	}
2875 	D_ASSERT(mdev->net_conf == NULL);
2876 
2877 	drbd_set_my_capacity(mdev, 0);
2878 	if (mdev->bitmap) {
2879 		/* maybe never allocated. */
2880 		drbd_bm_resize(mdev, 0, 1);
2881 		drbd_bm_cleanup(mdev);
2882 	}
2883 
2884 	drbd_free_resources(mdev);
2885 	clear_bit(AL_SUSPENDED, &mdev->flags);
2886 
2887 	/*
2888 	 * currently we call drbd_init_ee() only on module load, so
2889 	 * we may call drbd_release_ee() only on module unload!
2890 	 */
2891 	D_ASSERT(list_empty(&mdev->active_ee));
2892 	D_ASSERT(list_empty(&mdev->sync_ee));
2893 	D_ASSERT(list_empty(&mdev->done_ee));
2894 	D_ASSERT(list_empty(&mdev->read_ee));
2895 	D_ASSERT(list_empty(&mdev->net_ee));
2896 	D_ASSERT(list_empty(&mdev->resync_reads));
2897 	D_ASSERT(list_empty(&mdev->data.work.q));
2898 	D_ASSERT(list_empty(&mdev->meta.work.q));
2899 	D_ASSERT(list_empty(&mdev->resync_work.list));
2900 	D_ASSERT(list_empty(&mdev->unplug_work.list));
2901 	D_ASSERT(list_empty(&mdev->go_diskless.list));
2902 
2903 }
2904 
2905 
2906 static void drbd_destroy_mempools(void)
2907 {
2908 	struct page *page;
2909 
2910 	while (drbd_pp_pool) {
2911 		page = drbd_pp_pool;
2912 		drbd_pp_pool = (struct page *)page_private(page);
2913 		__free_page(page);
2914 		drbd_pp_vacant--;
2915 	}
2916 
2917 	/* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2918 
2919 	if (drbd_ee_mempool)
2920 		mempool_destroy(drbd_ee_mempool);
2921 	if (drbd_request_mempool)
2922 		mempool_destroy(drbd_request_mempool);
2923 	if (drbd_ee_cache)
2924 		kmem_cache_destroy(drbd_ee_cache);
2925 	if (drbd_request_cache)
2926 		kmem_cache_destroy(drbd_request_cache);
2927 	if (drbd_bm_ext_cache)
2928 		kmem_cache_destroy(drbd_bm_ext_cache);
2929 	if (drbd_al_ext_cache)
2930 		kmem_cache_destroy(drbd_al_ext_cache);
2931 
2932 	drbd_ee_mempool      = NULL;
2933 	drbd_request_mempool = NULL;
2934 	drbd_ee_cache        = NULL;
2935 	drbd_request_cache   = NULL;
2936 	drbd_bm_ext_cache    = NULL;
2937 	drbd_al_ext_cache    = NULL;
2938 
2939 	return;
2940 }
2941 
2942 static int drbd_create_mempools(void)
2943 {
2944 	struct page *page;
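	/* enough pages for one maximum-sized segment per configured minor */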
2945 	const int number = (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count;
2946 	int i;
2947 
2948 	/* prepare our caches and mempools */
2949 	drbd_request_mempool = NULL;
2950 	drbd_ee_cache        = NULL;
2951 	drbd_request_cache   = NULL;
2952 	drbd_bm_ext_cache    = NULL;
2953 	drbd_al_ext_cache    = NULL;
2954 	drbd_pp_pool         = NULL;
2955 
2956 	/* caches */
2957 	drbd_request_cache = kmem_cache_create(
2958 		"drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2959 	if (drbd_request_cache == NULL)
2960 		goto Enomem;
2961 
2962 	drbd_ee_cache = kmem_cache_create(
2963 		"drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
2964 	if (drbd_ee_cache == NULL)
2965 		goto Enomem;
2966 
2967 	drbd_bm_ext_cache = kmem_cache_create(
2968 		"drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2969 	if (drbd_bm_ext_cache == NULL)
2970 		goto Enomem;
2971 
2972 	drbd_al_ext_cache = kmem_cache_create(
2973 		"drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2974 	if (drbd_al_ext_cache == NULL)
2975 		goto Enomem;
2976 
2977 	/* mempools */
2978 	drbd_request_mempool = mempool_create(number,
2979 		mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2980 	if (drbd_request_mempool == NULL)
2981 		goto Enomem;
2982 
2983 	drbd_ee_mempool = mempool_create(number,
2984 		mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2985 	if (drbd_ee_mempool == NULL)
2986 		goto Enomem;
2987 
2988 	/* drbd's page pool */
2989 	spin_lock_init(&drbd_pp_lock);
2990 
2991 	for (i = 0; i < number; i++) {
2992 		page = alloc_page(GFP_HIGHUSER);
2993 		if (!page)
2994 			goto Enomem;
2995 		set_page_private(page, (unsigned long)drbd_pp_pool);
2996 		drbd_pp_pool = page;
2997 	}
2998 	drbd_pp_vacant = number;
2999 
3000 	return 0;
3001 
3002 Enomem:
3003 	drbd_destroy_mempools(); /* in case we allocated some */
3004 	return -ENOMEM;
3005 }
3006 
3007 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
3008 	void *unused)
3009 {
3010 	/* just so we have it.  you never know what interesting things we
3011 	 * might want to do here some day...
3012 	 */
3013 
3014 	return NOTIFY_DONE;
3015 }
3016 
3017 static struct notifier_block drbd_notifier = {
3018 	.notifier_call = drbd_notify_sys,
3019 };
3020 
3021 static void drbd_release_ee_lists(struct drbd_conf *mdev)
3022 {
3023 	int rr;
3024 
3025 	rr = drbd_release_ee(mdev, &mdev->active_ee);
3026 	if (rr)
3027 		dev_err(DEV, "%d EEs in active list found!\n", rr);
3028 
3029 	rr = drbd_release_ee(mdev, &mdev->sync_ee);
3030 	if (rr)
3031 		dev_err(DEV, "%d EEs in sync list found!\n", rr);
3032 
3033 	rr = drbd_release_ee(mdev, &mdev->read_ee);
3034 	if (rr)
3035 		dev_err(DEV, "%d EEs in read list found!\n", rr);
3036 
3037 	rr = drbd_release_ee(mdev, &mdev->done_ee);
3038 	if (rr)
3039 		dev_err(DEV, "%d EEs in done list found!\n", rr);
3040 
3041 	rr = drbd_release_ee(mdev, &mdev->net_ee);
3042 	if (rr)
3043 		dev_err(DEV, "%d EEs in net list found!\n", rr);
3044 }
3045 
3046 /* caution. no locking.
3047  * currently only used from module cleanup code. */
3048 static void drbd_delete_device(unsigned int minor)
3049 {
3050 	struct drbd_conf *mdev = minor_to_mdev(minor);
3051 
3052 	if (!mdev)
3053 		return;
3054 
3055 	/* paranoia asserts */
3056 	if (mdev->open_cnt != 0)
3057 		dev_err(DEV, "open_cnt = %d in %s:%u", mdev->open_cnt,
3058 				__FILE__ , __LINE__);
3059 
3060 	ERR_IF (!list_empty(&mdev->data.work.q)) {
3061 		struct list_head *lp;
3062 		list_for_each(lp, &mdev->data.work.q) {
3063 			dev_err(DEV, "lp = %p\n", lp);
3064 		}
3065 	};
3066 	/* end paranoia asserts */
3067 
3068 	del_gendisk(mdev->vdisk);
3069 
3070 	/* cleanup stuff that may have been allocated during
3071 	 * device (re-)configuration or state changes */
3072 
3073 	if (mdev->this_bdev)
3074 		bdput(mdev->this_bdev);
3075 
3076 	drbd_free_resources(mdev);
3077 
3078 	drbd_release_ee_lists(mdev);
3079 
3080 	/* should be free'd on disconnect? */
3081 	kfree(mdev->ee_hash);
3082 	/*
3083 	mdev->ee_hash_s = 0;
3084 	mdev->ee_hash = NULL;
3085 	*/
3086 
3087 	lc_destroy(mdev->act_log);
3088 	lc_destroy(mdev->resync);
3089 
3090 	kfree(mdev->p_uuid);
3091 	/* mdev->p_uuid = NULL; */
3092 
3093 	kfree(mdev->int_dig_out);
3094 	kfree(mdev->int_dig_in);
3095 	kfree(mdev->int_dig_vv);
3096 
3097 	/* cleanup the rest that has been
3098 	 * allocated from drbd_new_device
3099 	 * and actually free the mdev itself */
3100 	drbd_free_mdev(mdev);
3101 }
3102 
3103 static void drbd_cleanup(void)
3104 {
3105 	unsigned int i;
3106 
3107 	unregister_reboot_notifier(&drbd_notifier);
3108 
3109 	drbd_nl_cleanup();
3110 
3111 	if (minor_table) {
3112 		if (drbd_proc)
3113 			remove_proc_entry("drbd", NULL);
3114 		i = minor_count;
3115 		while (i--)
3116 			drbd_delete_device(i);
3117 		drbd_destroy_mempools();
3118 	}
3119 
3120 	kfree(minor_table);
3121 
3122 	unregister_blkdev(DRBD_MAJOR, "drbd");
3123 
3124 	printk(KERN_INFO "drbd: module cleanup done.\n");
3125 }
3126 
3127 /**
3128  * drbd_congested() - Callback for pdflush
3129  * @congested_data:	User data
3130  * @bdi_bits:		Bits pdflush is currently interested in
3131  *
3132  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
3133  */
3134 static int drbd_congested(void *congested_data, int bdi_bits)
3135 {
3136 	struct drbd_conf *mdev = congested_data;
3137 	struct request_queue *q;
3138 	char reason = '-';
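	/* reason codes: '-' not congested, 'd' drbd has frozen IO,
	 * 'b' backing device congested, 'n' network congested,
	 * 'a' both backing device and network congested */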
3139 	int r = 0;
3140 
3141 	if (!__inc_ap_bio_cond(mdev)) {
3142 		/* DRBD has frozen IO */
3143 		r = bdi_bits;
3144 		reason = 'd';
3145 		goto out;
3146 	}
3147 
3148 	if (get_ldev(mdev)) {
3149 		q = bdev_get_queue(mdev->ldev->backing_bdev);
3150 		r = bdi_congested(&q->backing_dev_info, bdi_bits);
3151 		put_ldev(mdev);
3152 		if (r)
3153 			reason = 'b';
3154 	}
3155 
3156 	if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
3157 		r |= (1 << BDI_async_congested);
3158 		reason = reason == 'b' ? 'a' : 'n';
3159 	}
3160 
3161 out:
3162 	mdev->congestion_reason = reason;
3163 	return r;
3164 }
3165 
3166 struct drbd_conf *drbd_new_device(unsigned int minor)
3167 {
3168 	struct drbd_conf *mdev;
3169 	struct gendisk *disk;
3170 	struct request_queue *q;
3171 
3172 	/* GFP_KERNEL, we are outside of all write-out paths */
3173 	mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3174 	if (!mdev)
3175 		return NULL;
3176 	if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3177 		goto out_no_cpumask;
3178 
3179 	mdev->minor = minor;
3180 
3181 	drbd_init_set_defaults(mdev);
3182 
3183 	q = blk_alloc_queue(GFP_KERNEL);
3184 	if (!q)
3185 		goto out_no_q;
3186 	mdev->rq_queue = q;
3187 	q->queuedata   = mdev;
3188 
3189 	disk = alloc_disk(1);
3190 	if (!disk)
3191 		goto out_no_disk;
3192 	mdev->vdisk = disk;
3193 
3194 	set_disk_ro(disk, TRUE);
3195 
3196 	disk->queue = q;
3197 	disk->major = DRBD_MAJOR;
3198 	disk->first_minor = minor;
3199 	disk->fops = &drbd_ops;
3200 	sprintf(disk->disk_name, "drbd%d", minor);
3201 	disk->private_data = mdev;
3202 
3203 	mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3204 	/* we have no partitions. we contain only ourselves. */
3205 	mdev->this_bdev->bd_contains = mdev->this_bdev;
3206 
3207 	q->backing_dev_info.congested_fn = drbd_congested;
3208 	q->backing_dev_info.congested_data = mdev;
3209 
3210 	blk_queue_make_request(q, drbd_make_request_26);
3211 	blk_queue_max_segment_size(q, DRBD_MAX_SEGMENT_SIZE);
3212 	blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3213 	blk_queue_merge_bvec(q, drbd_merge_bvec);
3214 	q->queue_lock = &mdev->req_lock; /* needed since we use */
3215 		/* plugging on a queue that actually has no requests! */
3216 	q->unplug_fn = drbd_unplug_fn;
3217 
3218 	mdev->md_io_page = alloc_page(GFP_KERNEL);
3219 	if (!mdev->md_io_page)
3220 		goto out_no_io_page;
3221 
3222 	if (drbd_bm_init(mdev))
3223 		goto out_no_bitmap;
3224 	/* no need to lock access, we are still initializing this minor device. */
3225 	if (!tl_init(mdev))
3226 		goto out_no_tl;
3227 
3228 	mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3229 	if (!mdev->app_reads_hash)
3230 		goto out_no_app_reads;
3231 
3232 	mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3233 	if (!mdev->current_epoch)
3234 		goto out_no_epoch;
3235 
3236 	INIT_LIST_HEAD(&mdev->current_epoch->list);
3237 	mdev->epochs = 1;
3238 
3239 	return mdev;
3240 
3241 /* out_whatever_else:
3242 	kfree(mdev->current_epoch); */
3243 out_no_epoch:
3244 	kfree(mdev->app_reads_hash);
3245 out_no_app_reads:
3246 	tl_cleanup(mdev);
3247 out_no_tl:
3248 	drbd_bm_cleanup(mdev);
3249 out_no_bitmap:
3250 	__free_page(mdev->md_io_page);
3251 out_no_io_page:
3252 	put_disk(disk);
3253 out_no_disk:
3254 	blk_cleanup_queue(q);
3255 out_no_q:
3256 	free_cpumask_var(mdev->cpu_mask);
3257 out_no_cpumask:
3258 	kfree(mdev);
3259 	return NULL;
3260 }
3261 
3262 /* counterpart of drbd_new_device.
3263  * last part of drbd_delete_device. */
3264 void drbd_free_mdev(struct drbd_conf *mdev)
3265 {
3266 	kfree(mdev->current_epoch);
3267 	kfree(mdev->app_reads_hash);
3268 	tl_cleanup(mdev);
3269 	if (mdev->bitmap) /* should no longer be there. */
3270 		drbd_bm_cleanup(mdev);
3271 	__free_page(mdev->md_io_page);
3272 	put_disk(mdev->vdisk);
3273 	blk_cleanup_queue(mdev->rq_queue);
3274 	free_cpumask_var(mdev->cpu_mask);
3275 	kfree(mdev);
3276 }
3277 
3278 
3279 int __init drbd_init(void)
3280 {
3281 	int err;
3282 
3283 	if (sizeof(struct p_handshake) != 80) {
3284 		printk(KERN_ERR
3285 		       "drbd: never change the size or layout "
3286 		       "of the HandShake packet.\n");
3287 		return -EINVAL;
3288 	}
3289 
3290 	if (1 > minor_count || minor_count > 255) {
3291 		printk(KERN_ERR
3292 			"drbd: invalid minor_count (%d)\n", minor_count);
3293 #ifdef MODULE
3294 		return -EINVAL;
3295 #else
3296 		minor_count = 8;
3297 #endif
3298 	}
3299 
3300 	err = drbd_nl_init();
3301 	if (err)
3302 		return err;
3303 
3304 	err = register_blkdev(DRBD_MAJOR, "drbd");
3305 	if (err) {
3306 		printk(KERN_ERR
3307 		       "drbd: unable to register block device major %d\n",
3308 		       DRBD_MAJOR);
3309 		return err;
3310 	}
3311 
3312 	register_reboot_notifier(&drbd_notifier);
3313 
3314 	/*
3315 	 * allocate all necessary structs
3316 	 */
3317 	err = -ENOMEM;
3318 
3319 	init_waitqueue_head(&drbd_pp_wait);
3320 
3321 	drbd_proc = NULL; /* play safe for drbd_cleanup */
3322 	minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3323 				GFP_KERNEL);
3324 	if (!minor_table)
3325 		goto Enomem;
3326 
3327 	err = drbd_create_mempools();
3328 	if (err)
3329 		goto Enomem;
3330 
3331 	drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3332 	if (!drbd_proc)	{
3333 		printk(KERN_ERR "drbd: unable to register proc file\n");
3334 		goto Enomem;
3335 	}
3336 
3337 	rwlock_init(&global_state_lock);
3338 
3339 	printk(KERN_INFO "drbd: initialized. "
3340 	       "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3341 	       API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3342 	printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3343 	printk(KERN_INFO "drbd: registered as block device major %d\n",
3344 		DRBD_MAJOR);
3345 	printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3346 
3347 	return 0; /* Success! */
3348 
3349 Enomem:
3350 	drbd_cleanup();
3351 	if (err == -ENOMEM)
3352 		/* currently always the case */
3353 		printk(KERN_ERR "drbd: ran out of memory\n");
3354 	else
3355 		printk(KERN_ERR "drbd: initialization failure\n");
3356 	return err;
3357 }
3358 
3359 void drbd_free_bc(struct drbd_backing_dev *ldev)
3360 {
3361 	if (ldev == NULL)
3362 		return;
3363 
3364 	bd_release(ldev->backing_bdev);
3365 	bd_release(ldev->md_bdev);
3366 
3367 	fput(ldev->lo_file);
3368 	fput(ldev->md_file);
3369 
3370 	kfree(ldev);
3371 }
3372 
3373 void drbd_free_sock(struct drbd_conf *mdev)
3374 {
3375 	if (mdev->data.socket) {
3376 		mutex_lock(&mdev->data.mutex);
3377 		kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3378 		sock_release(mdev->data.socket);
3379 		mdev->data.socket = NULL;
3380 		mutex_unlock(&mdev->data.mutex);
3381 	}
3382 	if (mdev->meta.socket) {
3383 		mutex_lock(&mdev->meta.mutex);
3384 		kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3385 		sock_release(mdev->meta.socket);
3386 		mdev->meta.socket = NULL;
3387 		mutex_unlock(&mdev->meta.mutex);
3388 	}
3389 }
3390 
3391 
3392 void drbd_free_resources(struct drbd_conf *mdev)
3393 {
3394 	crypto_free_hash(mdev->csums_tfm);
3395 	mdev->csums_tfm = NULL;
3396 	crypto_free_hash(mdev->verify_tfm);
3397 	mdev->verify_tfm = NULL;
3398 	crypto_free_hash(mdev->cram_hmac_tfm);
3399 	mdev->cram_hmac_tfm = NULL;
3400 	crypto_free_hash(mdev->integrity_w_tfm);
3401 	mdev->integrity_w_tfm = NULL;
3402 	crypto_free_hash(mdev->integrity_r_tfm);
3403 	mdev->integrity_r_tfm = NULL;
3404 
3405 	drbd_free_sock(mdev);
3406 
3407 	__no_warn(local,
3408 		  drbd_free_bc(mdev->ldev);
3409 		  mdev->ldev = NULL;);
3410 }
3411 
3412 /* meta data management */
3413 
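/* on-disk layout of the meta data super block;
 * all multi-byte fields are stored in big endian */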
3414 struct meta_data_on_disk {
3415 	u64 la_size;           /* last agreed size. */
3416 	u64 uuid[UI_SIZE];   /* UUIDs. */
3417 	u64 device_uuid;
3418 	u64 reserved_u64_1;
3419 	u32 flags;             /* MDF */
3420 	u32 magic;
3421 	u32 md_size_sect;
3422 	u32 al_offset;         /* offset to this block */
3423 	u32 al_nr_extents;     /* important for restoring the AL */
3424 	      /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3425 	u32 bm_offset;         /* offset to the bitmap, from here */
3426 	u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3427 	u32 reserved_u32[4];
3428 
3429 } __packed;
3430 
3431 /**
3432  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3433  * @mdev:	DRBD device.
3434  */
3435 void drbd_md_sync(struct drbd_conf *mdev)
3436 {
3437 	struct meta_data_on_disk *buffer;
3438 	sector_t sector;
3439 	int i;
3440 
3441 	del_timer(&mdev->md_sync_timer);
3442 	/* timer may be rearmed by drbd_md_mark_dirty() now. */
3443 	if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3444 		return;
3445 
3446 	/* We use D_FAILED here and not D_ATTACHING because we try to write
3447 	 * metadata even if we detach due to a disk failure! */
3448 	if (!get_ldev_if_state(mdev, D_FAILED))
3449 		return;
3450 
3451 	mutex_lock(&mdev->md_io_mutex);
3452 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
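	/* the on-disk super block is a single 512 byte sector */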
3453 	memset(buffer, 0, 512);
3454 
3455 	buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3456 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3457 		buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3458 	buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3459 	buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3460 
3461 	buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3462 	buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3463 	buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3464 	buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3465 	buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3466 
3467 	buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3468 
3469 	D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3470 	sector = mdev->ldev->md.md_offset;
3471 
3472 	if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3473 		/* this was a try anyways ... */
3474 		dev_err(DEV, "meta data update failed!\n");
3475 		drbd_chk_io_error(mdev, 1, TRUE);
3476 	}
3477 
3478 	/* Update mdev->ldev->md.la_size_sect,
3479 	 * since we updated it on metadata. */
3480 	mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3481 
3482 	mutex_unlock(&mdev->md_io_mutex);
3483 	put_ldev(mdev);
3484 }
3485 
3486 /**
3487  * drbd_md_read() - Reads in the meta data super block
3488  * @mdev:	DRBD device.
3489  * @bdev:	Device from which the meta data should be read in.
3490  *
3491  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_codes in case
3492  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3493  */
3494 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3495 {
3496 	struct meta_data_on_disk *buffer;
3497 	int i, rv = NO_ERROR;
3498 
3499 	if (!get_ldev_if_state(mdev, D_ATTACHING))
3500 		return ERR_IO_MD_DISK;
3501 
3502 	mutex_lock(&mdev->md_io_mutex);
3503 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3504 
3505 	if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3506 		/* NOTE: can't do normal error processing here, as this is
3507 		   called BEFORE the disk is attached */
3508 		dev_err(DEV, "Error while reading metadata.\n");
3509 		rv = ERR_IO_MD_DISK;
3510 		goto err;
3511 	}
3512 
3513 	if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3514 		dev_err(DEV, "Error while reading metadata, magic not found.\n");
3515 		rv = ERR_MD_INVALID;
3516 		goto err;
3517 	}
3518 	if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3519 		dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3520 		    be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3521 		rv = ERR_MD_INVALID;
3522 		goto err;
3523 	}
3524 	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3525 		dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3526 		    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3527 		rv = ERR_MD_INVALID;
3528 		goto err;
3529 	}
3530 	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3531 		dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3532 		    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3533 		rv = ERR_MD_INVALID;
3534 		goto err;
3535 	}
3536 
3537 	if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3538 		dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3539 		    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3540 		rv = ERR_MD_INVALID;
3541 		goto err;
3542 	}
3543 
3544 	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3545 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3546 		bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3547 	bdev->md.flags = be32_to_cpu(buffer->flags);
3548 	mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3549 	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3550 
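	/* Enforce a sane lower bound on the number of activity log extents;
	 * 7 is assumed to be the minimum and 127 the DRBD default. */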
3551 	if (mdev->sync_conf.al_extents < 7)
3552 		mdev->sync_conf.al_extents = 127;
3553 
3554  err:
3555 	mutex_unlock(&mdev->md_io_mutex);
3556 	put_ldev(mdev);
3557 
3558 	return rv;
3559 }
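
/*
 * Illustrative sketch (not part of the original file): an attach path is
 * expected to propagate the returned code, roughly:
 *
 *	retcode = drbd_md_read(mdev, nbc);
 *	if (retcode != NO_ERROR)
 *		goto fail;
 *
 * Here nbc stands for the struct drbd_backing_dev being attached and "fail"
 * for the caller's cleanup label; both names are made up for this sketch.
 */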
3560 
3561 static void debug_drbd_uuid(struct drbd_conf *mdev, enum drbd_uuid_index index)
3562 {
3563 	static char *uuid_str[UI_EXTENDED_SIZE] = {
3564 		[UI_CURRENT] = "CURRENT",
3565 		[UI_BITMAP] = "BITMAP",
3566 		[UI_HISTORY_START] = "HISTORY_START",
3567 		[UI_HISTORY_END] = "HISTORY_END",
3568 		[UI_SIZE] = "SIZE",
3569 		[UI_FLAGS] = "FLAGS",
3570 	};
3571 
3572 	if (index >= UI_EXTENDED_SIZE) {
3573 		dev_warn(DEV, " uuid_index >= UI_EXTENDED_SIZE\n");
3574 		return;
3575 	}
3576 
3577 	dynamic_dev_dbg(DEV, " uuid[%s] now %016llX\n",
3578 		 uuid_str[index],
3579 		 (unsigned long long)mdev->ldev->md.uuid[index]);
3580 }
3581 
3582 
3583 /**
3584  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3585  * @mdev:	DRBD device.
3586  *
3587  * Call this function if you change anything that should be written to
3588  * the meta-data super block. This function sets MD_DIRTY, and starts a
3589  * timer that ensures that drbd_md_sync() is called within five seconds.
3590  */
3591 #ifdef DEBUG
3592 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
3593 {
3594 	if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
3595 		mod_timer(&mdev->md_sync_timer, jiffies + HZ);
3596 		mdev->last_md_mark_dirty.line = line;
3597 		mdev->last_md_mark_dirty.func = func;
3598 	}
3599 }
3600 #else
3601 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3602 {
3603 	if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
3604 		mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3605 }
3606 #endif
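
/*
 * Illustrative sketch (not part of the original file): the usual pattern is
 * to update the in-core copy first and mark it dirty afterwards, as
 * _drbd_uuid_set() and drbd_md_set_flag() in this file do:
 *
 *	mdev->ldev->md.uuid[idx] = val;
 *	drbd_md_mark_dirty(mdev);
 *
 * The actual write is deferred to drbd_md_sync(), at the latest when the
 * md_sync_timer fires and w_md_sync() runs on the worker.
 */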
3607 
3608 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3609 {
3610 	int i;
3611 
3612 	for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++) {
3613 		mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3614 		debug_drbd_uuid(mdev, i+1);
3615 	}
3616 }
3617 
3618 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3619 {
3620 	if (idx == UI_CURRENT) {
3621 		if (mdev->state.role == R_PRIMARY)
3622 			val |= 1;
3623 		else
3624 			val &= ~((u64)1);
3625 
3626 		drbd_set_ed_uuid(mdev, val);
3627 	}
3628 
3629 	mdev->ldev->md.uuid[idx] = val;
3630 	debug_drbd_uuid(mdev, idx);
3631 	drbd_md_mark_dirty(mdev);
3632 }
3633 
3634 
3635 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3636 {
3637 	if (mdev->ldev->md.uuid[idx]) {
3638 		drbd_uuid_move_history(mdev);
3639 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3640 		debug_drbd_uuid(mdev, UI_HISTORY_START);
3641 	}
3642 	_drbd_uuid_set(mdev, idx, val);
3643 }
3644 
3645 /**
3646  * drbd_uuid_new_current() - Creates a new current UUID
3647  * @mdev:	DRBD device.
3648  *
3649  * Creates a new current UUID, and rotates the old current UUID into
3650  * the bitmap slot. Causes an incremental resync upon next connect.
3651  */
3652 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3653 {
3654 	u64 val;
3655 
3656 	dev_info(DEV, "Creating new current UUID\n");
3657 	D_ASSERT(mdev->ldev->md.uuid[UI_BITMAP] == 0);
3658 	mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3659 	debug_drbd_uuid(mdev, UI_BITMAP);
3660 
3661 	get_random_bytes(&val, sizeof(u64));
3662 	_drbd_uuid_set(mdev, UI_CURRENT, val);
3663 }
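
/*
 * Illustrative sketch (not part of the original file): callers are expected
 * to hold a local-disk reference to satisfy __must_hold(local), e.g.
 *
 *	if (get_ldev(mdev)) {
 *		drbd_uuid_new_current(mdev);
 *		put_ldev(mdev);
 *	}
 */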
3664 
3665 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3666 {
3667 	if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3668 		return;
3669 
3670 	if (val == 0) {
3671 		drbd_uuid_move_history(mdev);
3672 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3673 		mdev->ldev->md.uuid[UI_BITMAP] = 0;
3674 		debug_drbd_uuid(mdev, UI_HISTORY_START);
3675 		debug_drbd_uuid(mdev, UI_BITMAP);
3676 	} else {
3677 		if (mdev->ldev->md.uuid[UI_BITMAP])
3678 			dev_warn(DEV, "bm UUID already set\n");
3679 
3680 		mdev->ldev->md.uuid[UI_BITMAP] = val;
3681 		mdev->ldev->md.uuid[UI_BITMAP] &= ~((u64)1);
3682 
3683 		debug_drbd_uuid(mdev, UI_BITMAP);
3684 	}
3685 	drbd_md_mark_dirty(mdev);
3686 }
3687 
3688 /**
3689  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3690  * @mdev:	DRBD device.
3691  *
3692  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3693  */
3694 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3695 {
3696 	int rv = -EIO;
3697 
3698 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
3699 		drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3700 		drbd_md_sync(mdev);
3701 		drbd_bm_set_all(mdev);
3702 
3703 		rv = drbd_bm_write(mdev);
3704 
3705 		if (!rv) {
3706 			drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3707 			drbd_md_sync(mdev);
3708 		}
3709 
3710 		put_ldev(mdev);
3711 	}
3712 
3713 	return rv;
3714 }
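
/*
 * Illustrative sketch (not part of the original file): this function is
 * meant to be handed to the bitmap IO helpers below as io_fn, for example
 * from worker context:
 *
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL,
 *			     "set_n_write sketch");
 *
 * The NULL done callback and the "why" string are illustrative only.
 */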
3715 
3716 /**
3717  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3718  * @mdev:	DRBD device.
3719  *
3720  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3721  */
3722 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3723 {
3724 	int rv = -EIO;
3725 
3726 	drbd_resume_al(mdev);
3727 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
3728 		drbd_bm_clear_all(mdev);
3729 		rv = drbd_bm_write(mdev);
3730 		put_ldev(mdev);
3731 	}
3732 
3733 	return rv;
3734 }
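
/*
 * Illustrative sketch (not part of the original file): used symmetrically to
 * drbd_bmio_set_n_write() above, e.g. synchronously from non-worker context:
 *
 *	drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write, "clear_n_write sketch");
 */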
3735 
3736 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3737 {
3738 	struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3739 	int rv;
3740 
3741 	D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3742 
3743 	drbd_bm_lock(mdev, work->why);
3744 	rv = work->io_fn(mdev);
3745 	drbd_bm_unlock(mdev);
3746 
3747 	clear_bit(BITMAP_IO, &mdev->flags);
3748 	wake_up(&mdev->misc_wait);
3749 
3750 	if (work->done)
3751 		work->done(mdev, rv);
3752 
3753 	clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3754 	work->why = NULL;
3755 
3756 	return 1;
3757 }
3758 
3759 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3760 {
3761 	D_ASSERT(mdev->state.disk == D_FAILED);
3762 	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3763 	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3764 	 * the protected members anymore, though, so in the after_state_ch work
3765 	 * it will be safe to free them. */
3766 	drbd_force_state(mdev, NS(disk, D_DISKLESS));
3767 	/* We need to wait for the return of references checked out while we
3768 	 * were still D_FAILED, though (drbd_md_sync, bitmap io). */
3769 	wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
3770 
3771 	clear_bit(GO_DISKLESS, &mdev->flags);
3772 	return 1;
3773 }
3774 
3775 void drbd_go_diskless(struct drbd_conf *mdev)
3776 {
3777 	D_ASSERT(mdev->state.disk == D_FAILED);
3778 	if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3779 		drbd_queue_work(&mdev->data.work, &mdev->go_diskless);
3780 		/* don't drbd_queue_work_front,
3781 		 * we need to serialize with the after_state_ch work
3782 		 * of the -> D_FAILED transition. */
3783 }
3784 
3785 /**
3786  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3787  * @mdev:	DRBD device.
3788  * @io_fn:	IO callback to be called when bitmap IO is possible
3789  * @done:	callback to be called after the bitmap IO was performed
3790  * @why:	Descriptive text of the reason for doing the IO
3791  *
3792  * While IO on the bitmap is in progress, application IO is frozen; this
3793  * ensures that drbd_set_out_of_sync() cannot be called.  This function
3794  * MAY ONLY be called from worker context.  It MUST NOT be used while a
3795  * previous such work is still pending!
3796  */
3797 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3798 			  int (*io_fn)(struct drbd_conf *),
3799 			  void (*done)(struct drbd_conf *, int),
3800 			  char *why)
3801 {
3802 	D_ASSERT(current == mdev->worker.task);
3803 
3804 	D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3805 	D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3806 	D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3807 	if (mdev->bm_io_work.why)
3808 		dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3809 			why, mdev->bm_io_work.why);
3810 
3811 	mdev->bm_io_work.io_fn = io_fn;
3812 	mdev->bm_io_work.done = done;
3813 	mdev->bm_io_work.why = why;
3814 
3815 	set_bit(BITMAP_IO, &mdev->flags);
3816 	if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3817 		if (list_empty(&mdev->bm_io_work.w.list)) {
3818 			set_bit(BITMAP_IO_QUEUED, &mdev->flags);
3819 			drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
3820 		} else
3821 			dev_err(DEV, "FIXME avoided double queuing bm_io_work\n");
3822 	}
3823 }
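
/*
 * Illustrative sketch (not part of the original file): queueing a full
 * "set all bits and write" pass from the worker, with a made-up completion
 * callback of the required void (*)(struct drbd_conf *, int) type:
 *
 *	static void my_bitmap_io_done(struct drbd_conf *mdev, int rv)
 *	{
 *		if (rv)
 *			dev_err(DEV, "bitmap IO failed: %d\n", rv);
 *	}
 *
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write,
 *			     &my_bitmap_io_done, "set_n_write sketch");
 */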
3824 
3825 /**
3826  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3827  * @mdev:	DRBD device.
3828  * @io_fn:	IO callback to be called when bitmap IO is possible
3829  * @why:	Descriptive text of the reason for doing the IO
3830  *
3831  * Freezes application IO while the actual IO operation runs.  This
3832  * function MAY NOT be called from worker context.
3833  */
3834 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why)
3835 {
3836 	int rv;
3837 
3838 	D_ASSERT(current != mdev->worker.task);
3839 
3840 	drbd_suspend_io(mdev);
3841 
3842 	drbd_bm_lock(mdev, why);
3843 	rv = io_fn(mdev);
3844 	drbd_bm_unlock(mdev);
3845 
3846 	drbd_resume_io(mdev);
3847 
3848 	return rv;
3849 }
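
/*
 * Illustrative sketch (not part of the original file): synchronous use from
 * a context that is not the worker, e.g. a configuration request handler:
 *
 *	if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write sketch"))
 *		dev_err(DEV, "bitmap IO failed\n");
 *
 * The return value is whatever io_fn returned, 0 meaning success here.
 */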
3850 
3851 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3852 {
3853 	if ((mdev->ldev->md.flags & flag) != flag) {
3854 		drbd_md_mark_dirty(mdev);
3855 		mdev->ldev->md.flags |= flag;
3856 	}
3857 }
3858 
3859 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3860 {
3861 	if ((mdev->ldev->md.flags & flag) != 0) {
3862 		drbd_md_mark_dirty(mdev);
3863 		mdev->ldev->md.flags &= ~flag;
3864 	}
3865 }
3866 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3867 {
3868 	return (bdev->md.flags & flag) != 0;
3869 }
3870 
3871 static void md_sync_timer_fn(unsigned long data)
3872 {
3873 	struct drbd_conf *mdev = (struct drbd_conf *) data;
3874 
3875 	drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
3876 }
3877 
3878 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3879 {
3880 	dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3881 #ifdef DEBUG
3882 	dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3883 		mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3884 #endif
3885 	drbd_md_sync(mdev);
3886 	return 1;
3887 }
3888 
3889 #ifdef CONFIG_DRBD_FAULT_INJECTION
3890 /* Fault insertion support including random number generator shamelessly
3891  * stolen from kernel/rcutorture.c */
3892 struct fault_random_state {
3893 	unsigned long state;
3894 	unsigned long count;
3895 };
3896 
3897 #define FAULT_RANDOM_MULT 39916801  /* prime */
3898 #define FAULT_RANDOM_ADD	479001701 /* prime */
3899 #define FAULT_RANDOM_REFRESH 10000
3900 
3901 /*
3902  * Crude but fast random-number generator.  Uses a linear congruential
3903  * generator, with occasional help from get_random_bytes().
3904  */
3905 static unsigned long
3906 _drbd_fault_random(struct fault_random_state *rsp)
3907 {
3908 	long refresh;
3909 
3910 	if (!rsp->count--) {
3911 		get_random_bytes(&refresh, sizeof(refresh));
3912 		rsp->state += refresh;
3913 		rsp->count = FAULT_RANDOM_REFRESH;
3914 	}
3915 	rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3916 	return swahw32(rsp->state);
3917 }
3918 
3919 static char *
3920 _drbd_fault_str(unsigned int type) {
3921 	static char *_faults[] = {
3922 		[DRBD_FAULT_MD_WR] = "Meta-data write",
3923 		[DRBD_FAULT_MD_RD] = "Meta-data read",
3924 		[DRBD_FAULT_RS_WR] = "Resync write",
3925 		[DRBD_FAULT_RS_RD] = "Resync read",
3926 		[DRBD_FAULT_DT_WR] = "Data write",
3927 		[DRBD_FAULT_DT_RD] = "Data read",
3928 		[DRBD_FAULT_DT_RA] = "Data read ahead",
3929 		[DRBD_FAULT_BM_ALLOC] = "BM allocation",
3930 		[DRBD_FAULT_AL_EE] = "EE allocation",
3931 		[DRBD_FAULT_RECEIVE] = "receive data corruption",
3932 	};
3933 
3934 	return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3935 }
3936 
3937 unsigned int
3938 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3939 {
3940 	static struct fault_random_state rrs = {0, 0};
3941 
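	/* Insert a fault iff this device is selected by the fault_devs bitmask
	 * (an empty mask selects all devices) and a pseudo-random roll from
	 * 1..100 does not exceed the configured fault_rate percentage. */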
3942 	unsigned int ret = (
3943 		(fault_devs == 0 ||
3944 			((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3945 		(((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3946 
3947 	if (ret) {
3948 		fault_count++;
3949 
3950 		if (__ratelimit(&drbd_ratelimit_state))
3951 			dev_warn(DEV, "***Simulating %s failure\n",
3952 				_drbd_fault_str(type));
3953 	}
3954 
3955 	return ret;
3956 }
3957 #endif
3958 
3959 const char *drbd_buildtag(void)
3960 {
3961 	/* When DRBD is built from external sources, this holds a reference
3962 	   to the git hash of the source code. */
3963 
3964 	static char buildtag[38] = "\0uilt-in";
3965 
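	/* The initializer is "\0uilt-in": if no module srcversion gets filled
	 * in below, restoring the leading 'b' turns the string into
	 * "built-in". */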
3966 	if (buildtag[0] == 0) {
3967 #ifdef CONFIG_MODULES
3968 		if (THIS_MODULE != NULL)
3969 			sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3970 		else
3971 #endif
3972 			buildtag[0] = 'b';
3973 	}
3974 
3975 	return buildtag;
3976 }
3977 
3978 module_init(drbd_init)
3979 module_exit(drbd_cleanup)
3980 
3981 EXPORT_SYMBOL(drbd_conn_str);
3982 EXPORT_SYMBOL(drbd_role_str);
3983 EXPORT_SYMBOL(drbd_disk_str);
3984 EXPORT_SYMBOL(drbd_set_st_err_str);
3985