xref: /linux/drivers/block/drbd/drbd_main.c (revision ce7240e445303de3ca66e6d08f17a2ec278a5bf6)
1 /*
2    drbd.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12 
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17 
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22 
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 
27  */
28 
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48 
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52 
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56 
57 #include "drbd_vli.h"
58 
59 struct after_state_chg_work {
60 	struct drbd_work w;
61 	union drbd_state os;
62 	union drbd_state ns;
63 	enum chg_state_flags flags;
64 	struct completion *done;
65 };
66 
67 static DEFINE_MUTEX(drbd_main_mutex);
68 int drbdd_init(struct drbd_thread *);
69 int drbd_worker(struct drbd_thread *);
70 int drbd_asender(struct drbd_thread *);
71 
72 int drbd_init(void);
73 static int drbd_open(struct block_device *bdev, fmode_t mode);
74 static int drbd_release(struct gendisk *gd, fmode_t mode);
75 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
76 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
77 			   union drbd_state ns, enum chg_state_flags flags);
78 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
79 static void md_sync_timer_fn(unsigned long data);
80 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
81 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused);
82 
83 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
84 	      "Lars Ellenberg <lars@linbit.com>");
85 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
86 MODULE_VERSION(REL_VERSION);
87 MODULE_LICENSE("GPL");
88 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices ("
89 		 __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
90 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
91 
92 #include <linux/moduleparam.h>
93 /* allow_open_on_secondary */
94 MODULE_PARM_DESC(allow_oos, "DONT USE!");
95 /* thanks to these macros, if compiled into the kernel (not as a module),
96  * this becomes the boot parameter drbd.minor_count */
97 module_param(minor_count, uint, 0444);
98 module_param(disable_sendpage, bool, 0644);
99 module_param(allow_oos, bool, 0);
100 module_param(cn_idx, uint, 0444);
101 module_param(proc_details, int, 0644);
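
/*
 * Illustrative usage (an assumption for clarity, not part of the driver):
 * as the comment above notes, these parameters become "drbd."-prefixed boot
 * parameters when DRBD is built into the kernel, e.g.
 *
 *	drbd.minor_count=8
 *
 * on the kernel command line; built as a module, the equivalent is
 *
 *	modprobe drbd minor_count=8
 */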
102 
103 #ifdef CONFIG_DRBD_FAULT_INJECTION
104 int enable_faults;
105 int fault_rate;
106 static int fault_count;
107 int fault_devs;
108 /* bitmap of enabled faults */
109 module_param(enable_faults, int, 0664);
110 /* fault rate % value - applies to all enabled faults */
111 module_param(fault_rate, int, 0664);
112 /* count of faults inserted */
113 module_param(fault_count, int, 0664);
114 /* bitmap of devices to insert faults on */
115 module_param(fault_devs, int, 0644);
116 #endif
117 
118 /* module parameter, defined */
119 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
120 bool disable_sendpage;
121 bool allow_oos;
122 unsigned int cn_idx = CN_IDX_DRBD;
123 int proc_details;       /* Detail level in proc drbd */
124 
125 /* Module parameter for setting the user mode helper program
126  * to run. Default is /sbin/drbdadm */
127 char usermode_helper[80] = "/sbin/drbdadm";
128 
129 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
130 
131 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
132  * as member "struct gendisk *vdisk;"
133  */
134 struct drbd_conf **minor_table;
135 
136 struct kmem_cache *drbd_request_cache;
137 struct kmem_cache *drbd_ee_cache;	/* epoch entries */
138 struct kmem_cache *drbd_bm_ext_cache;	/* bitmap extents */
139 struct kmem_cache *drbd_al_ext_cache;	/* activity log extents */
140 mempool_t *drbd_request_mempool;
141 mempool_t *drbd_ee_mempool;
142 mempool_t *drbd_md_io_page_pool;
143 struct bio_set *drbd_md_io_bio_set;
144 
145 /* I do not use a standard mempool, because:
146    1) I want to hand out the pre-allocated objects first.
147    2) I want to be able to interrupt sleeping allocation with a signal.
148    Note: This is a singly linked list; the next pointer is the private
149 	 member of struct page.
150  */
151 struct page *drbd_pp_pool;
152 spinlock_t   drbd_pp_lock;
153 int          drbd_pp_vacant;
154 wait_queue_head_t drbd_pp_wait;
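
/*
 * Illustrative sketch (not part of the driver) of the singly linked page
 * list described above: pages are chained through their ->private field,
 * pushes and pops happen under drbd_pp_lock, and waiters on drbd_pp_wait
 * are woken when pages are returned.  The real pool handling lives
 * elsewhere in the driver (see drbd_receiver.c).
 */
#if 0
static void example_pp_push(struct page *page)
{
	spin_lock(&drbd_pp_lock);
	/* chain the page in front of the current pool head */
	set_page_private(page, (unsigned long)drbd_pp_pool);
	drbd_pp_pool = page;
	drbd_pp_vacant++;
	spin_unlock(&drbd_pp_lock);
	wake_up(&drbd_pp_wait);
}

static struct page *example_pp_pop(void)
{
	struct page *page;

	spin_lock(&drbd_pp_lock);
	page = drbd_pp_pool;
	if (page) {
		/* the next pointer is stored in page->private */
		drbd_pp_pool = (struct page *)page_private(page);
		set_page_private(page, 0);
		drbd_pp_vacant--;
	}
	spin_unlock(&drbd_pp_lock);
	return page;
}
#endif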
155 
156 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
157 
158 static const struct block_device_operations drbd_ops = {
159 	.owner =   THIS_MODULE,
160 	.open =    drbd_open,
161 	.release = drbd_release,
162 };
163 
164 static void bio_destructor_drbd(struct bio *bio)
165 {
166 	bio_free(bio, drbd_md_io_bio_set);
167 }
168 
169 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
170 {
171 	struct bio *bio;
172 
173 	if (!drbd_md_io_bio_set)
174 		return bio_alloc(gfp_mask, 1);
175 
176 	bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
177 	if (!bio)
178 		return NULL;
179 	bio->bi_destructor = bio_destructor_drbd;
180 	return bio;
181 }
182 
183 #ifdef __CHECKER__
184 /* When checking with sparse, and this is an inline function, sparse will
185    give tons of false positives. When this is a real function, sparse works.
186  */
187 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
188 {
189 	int io_allowed;
190 
191 	atomic_inc(&mdev->local_cnt);
192 	io_allowed = (mdev->state.disk >= mins);
193 	if (!io_allowed) {
194 		if (atomic_dec_and_test(&mdev->local_cnt))
195 			wake_up(&mdev->misc_wait);
196 	}
197 	return io_allowed;
198 }
199 
200 #endif
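
/*
 * Illustrative usage (not part of the driver): get_ldev(), a drbd_int.h
 * wrapper around _get_ldev_if_state(), together with put_ldev() is the
 * canonical way to pin the local backing device while dereferencing
 * mdev->ldev, as used throughout this file:
 *
 *	if (get_ldev(mdev)) {
 *		... safe to dereference mdev->ldev here ...
 *		put_ldev(mdev);
 *	}
 */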
201 
202 /**
203  * DOC: The transfer log
204  *
205  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
206  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
207  * of the list. There is always at least one &struct drbd_tl_epoch object.
208  *
209  * Each &struct drbd_tl_epoch has a circular, doubly linked list of requests
210  * attached.
211  */
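
/*
 * Illustrative sketch (not part of the driver): walking the transfer log
 * described above -- epochs via the singly linked ->next pointers, and the
 * requests attached to each epoch via its ->requests list.  Such a walk is
 * only done with mdev->req_lock held (compare _tl_restart() below).
 */
#if 0
static void example_tl_walk(struct drbd_conf *mdev)
{
	struct drbd_tl_epoch *b;
	struct drbd_request *req;

	for (b = mdev->oldest_tle; b; b = b->next)
		list_for_each_entry(req, &b->requests, tl_requests)
			dev_info(DEV, "barrier #%u: request %p\n",
				 b->br_number, req);
}
#endif
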
212 static int tl_init(struct drbd_conf *mdev)
213 {
214 	struct drbd_tl_epoch *b;
215 
216 	/* during device minor initialization, we may well use GFP_KERNEL */
217 	b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
218 	if (!b)
219 		return 0;
220 	INIT_LIST_HEAD(&b->requests);
221 	INIT_LIST_HEAD(&b->w.list);
222 	b->next = NULL;
223 	b->br_number = 4711;
224 	b->n_writes = 0;
225 	b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
226 
227 	mdev->oldest_tle = b;
228 	mdev->newest_tle = b;
229 	INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
230 	INIT_LIST_HEAD(&mdev->barrier_acked_requests);
231 
232 	mdev->tl_hash = NULL;
233 	mdev->tl_hash_s = 0;
234 
235 	return 1;
236 }
237 
238 static void tl_cleanup(struct drbd_conf *mdev)
239 {
240 	D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
241 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
242 	kfree(mdev->oldest_tle);
243 	mdev->oldest_tle = NULL;
244 	kfree(mdev->unused_spare_tle);
245 	mdev->unused_spare_tle = NULL;
246 	kfree(mdev->tl_hash);
247 	mdev->tl_hash = NULL;
248 	mdev->tl_hash_s = 0;
249 }
250 
251 /**
252  * _tl_add_barrier() - Adds a barrier to the transfer log
253  * @mdev:	DRBD device.
254  * @new:	Barrier to be added before the current head of the TL.
255  *
256  * The caller must hold the req_lock.
257  */
258 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
259 {
260 	struct drbd_tl_epoch *newest_before;
261 
262 	INIT_LIST_HEAD(&new->requests);
263 	INIT_LIST_HEAD(&new->w.list);
264 	new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
265 	new->next = NULL;
266 	new->n_writes = 0;
267 
268 	newest_before = mdev->newest_tle;
269 	new->br_number = newest_before->br_number+1;
270 	if (mdev->newest_tle != new) {
271 		mdev->newest_tle->next = new;
272 		mdev->newest_tle = new;
273 	}
274 }
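
/*
 * Illustrative usage sketch (not part of the driver): as the kernel-doc
 * above states, _tl_add_barrier() expects the caller to hold req_lock, so
 * a typical call site looks like this (the epoch object is allocated by
 * the caller, e.g. taken from a pre-allocated spare):
 */
#if 0
static void example_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *b)
{
	spin_lock_irq(&mdev->req_lock);
	_tl_add_barrier(mdev, b);
	spin_unlock_irq(&mdev->req_lock);
}
#endif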
275 
276 /**
277  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
278  * @mdev:	DRBD device.
279  * @barrier_nr:	Expected identifier of the DRBD write barrier packet.
280  * @set_size:	Expected number of requests before that barrier.
281  *
282  * In case the passed barrier_nr or set_size does not match the oldest
283  * &struct drbd_tl_epoch object, this function will cause a termination
284  * of the connection.
285  */
286 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
287 		       unsigned int set_size)
288 {
289 	struct drbd_tl_epoch *b, *nob; /* next old barrier */
290 	struct list_head *le, *tle;
291 	struct drbd_request *r;
292 
293 	spin_lock_irq(&mdev->req_lock);
294 
295 	b = mdev->oldest_tle;
296 
297 	/* first some paranoia code */
298 	if (b == NULL) {
299 		dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
300 			barrier_nr);
301 		goto bail;
302 	}
303 	if (b->br_number != barrier_nr) {
304 		dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
305 			barrier_nr, b->br_number);
306 		goto bail;
307 	}
308 	if (b->n_writes != set_size) {
309 		dev_err(DEV, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
310 			barrier_nr, set_size, b->n_writes);
311 		goto bail;
312 	}
313 
314 	/* Clean up list of requests processed during current epoch */
315 	list_for_each_safe(le, tle, &b->requests) {
316 		r = list_entry(le, struct drbd_request, tl_requests);
317 		_req_mod(r, barrier_acked);
318 	}
319 	/* There could be requests on the list waiting for completion
320 	   of the write to the local disk. To avoid corruption of the
321 	   slab's data structures we have to remove the list's head.
322 
323 	   Also there could have been a barrier ack out of sequence, overtaking
324 	   the write acks - which would be a bug and violating write ordering.
325 	   To not deadlock in case we lose connection while such requests are
326 	   still pending, we need some way to find them for the
327 	   _req_mod(connection_lost_while_pending).
328 
329 	   These have been list_move'd to the out_of_sequence_requests list in
330 	   _req_mod(, barrier_acked) above.
331 	   */
332 	list_splice_init(&b->requests, &mdev->barrier_acked_requests);
333 
334 	nob = b->next;
335 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
336 		_tl_add_barrier(mdev, b);
337 		if (nob)
338 			mdev->oldest_tle = nob;
339 		/* if nob == NULL, b was the only barrier and becomes the new
340 		   barrier. Therefore mdev->oldest_tle already points to b */
341 	} else {
342 		D_ASSERT(nob != NULL);
343 		mdev->oldest_tle = nob;
344 		kfree(b);
345 	}
346 
347 	spin_unlock_irq(&mdev->req_lock);
348 	dec_ap_pending(mdev);
349 
350 	return;
351 
352 bail:
353 	spin_unlock_irq(&mdev->req_lock);
354 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
355 }
356 
357 
358 /**
359  * _tl_restart() - Walks the transfer log, and applies an action to all requests
360  * @mdev:	DRBD device.
361  * @what:       The action/event to perform with all request objects
362  *
363  * @what might be one of connection_lost_while_pending, resend, fail_frozen_disk_io,
364  * restart_frozen_disk_io.
365  */
366 static void _tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
367 {
368 	struct drbd_tl_epoch *b, *tmp, **pn;
369 	struct list_head *le, *tle, carry_reads;
370 	struct drbd_request *req;
371 	int rv, n_writes, n_reads;
372 
373 	b = mdev->oldest_tle;
374 	pn = &mdev->oldest_tle;
375 	while (b) {
376 		n_writes = 0;
377 		n_reads = 0;
378 		INIT_LIST_HEAD(&carry_reads);
379 		list_for_each_safe(le, tle, &b->requests) {
380 			req = list_entry(le, struct drbd_request, tl_requests);
381 			rv = _req_mod(req, what);
382 
383 			n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
384 			n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
385 		}
386 		tmp = b->next;
387 
388 		if (n_writes) {
389 			if (what == resend) {
390 				b->n_writes = n_writes;
391 				if (b->w.cb == NULL) {
392 					b->w.cb = w_send_barrier;
393 					inc_ap_pending(mdev);
394 					set_bit(CREATE_BARRIER, &mdev->flags);
395 				}
396 
397 				drbd_queue_work(&mdev->data.work, &b->w);
398 			}
399 			pn = &b->next;
400 		} else {
401 			if (n_reads)
402 				list_add(&carry_reads, &b->requests);
403 			/* there could still be requests on that ring list,
404 			 * in case local io is still pending */
405 			list_del(&b->requests);
406 
407 			/* dec_ap_pending corresponding to queue_barrier.
408 			 * the newest barrier may not have been queued yet,
409 			 * in which case w.cb is still NULL. */
410 			if (b->w.cb != NULL)
411 				dec_ap_pending(mdev);
412 
413 			if (b == mdev->newest_tle) {
414 				/* recycle, but reinit! */
415 				D_ASSERT(tmp == NULL);
416 				INIT_LIST_HEAD(&b->requests);
417 				list_splice(&carry_reads, &b->requests);
418 				INIT_LIST_HEAD(&b->w.list);
419 				b->w.cb = NULL;
420 				b->br_number = net_random();
421 				b->n_writes = 0;
422 
423 				*pn = b;
424 				break;
425 			}
426 			*pn = tmp;
427 			kfree(b);
428 		}
429 		b = tmp;
430 		list_splice(&carry_reads, &b->requests);
431 	}
432 
433 	/* Actions operating on the disk state also want to work on
434 	   requests that got barrier acked. */
435 	switch (what) {
436 	case fail_frozen_disk_io:
437 	case restart_frozen_disk_io:
438 		list_for_each_safe(le, tle, &mdev->barrier_acked_requests) {
439 			req = list_entry(le, struct drbd_request, tl_requests);
440 			_req_mod(req, what);
441 		}
442 
443 	case connection_lost_while_pending:
444 	case resend:
445 		break;
446 	default:
447 		dev_err(DEV, "what = %d in _tl_restart()\n", what);
448 	}
449 }
450 
451 
452 /**
453  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
454  * @mdev:	DRBD device.
455  *
456  * This is called after the connection to the peer was lost. The storage covered
457  * by the requests on the transfer log gets marked as out of sync. Called from the
458  * receiver thread and the worker thread.
459  */
460 void tl_clear(struct drbd_conf *mdev)
461 {
462 	struct list_head *le, *tle;
463 	struct drbd_request *r;
464 
465 	spin_lock_irq(&mdev->req_lock);
466 
467 	_tl_restart(mdev, connection_lost_while_pending);
468 
469 	/* we expect this list to be empty. */
470 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
471 
472 	/* but just in case, clean it up anyway! */
473 	list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
474 		r = list_entry(le, struct drbd_request, tl_requests);
475 		/* It would be nice to complete outside of spinlock.
476 		 * But this is easier for now. */
477 		_req_mod(r, connection_lost_while_pending);
478 	}
479 
480 	/* ensure bit indicating barrier is required is clear */
481 	clear_bit(CREATE_BARRIER, &mdev->flags);
482 
483 	memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
484 
485 	spin_unlock_irq(&mdev->req_lock);
486 }
487 
488 void tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
489 {
490 	spin_lock_irq(&mdev->req_lock);
491 	_tl_restart(mdev, what);
492 	spin_unlock_irq(&mdev->req_lock);
493 }
494 
495 /**
496  * tl_abort_disk_io() - Abort disk I/O for all requests for a certain mdev in the TL
497  * @mdev:	DRBD device.
498  */
499 void tl_abort_disk_io(struct drbd_conf *mdev)
500 {
501 	struct drbd_tl_epoch *b;
502 	struct list_head *le, *tle;
503 	struct drbd_request *req;
504 
505 	spin_lock_irq(&mdev->req_lock);
506 	b = mdev->oldest_tle;
507 	while (b) {
508 		list_for_each_safe(le, tle, &b->requests) {
509 			req = list_entry(le, struct drbd_request, tl_requests);
510 			if (!(req->rq_state & RQ_LOCAL_PENDING))
511 				continue;
512 			_req_mod(req, abort_disk_io);
513 		}
514 		b = b->next;
515 	}
516 
517 	list_for_each_safe(le, tle, &mdev->barrier_acked_requests) {
518 		req = list_entry(le, struct drbd_request, tl_requests);
519 		if (!(req->rq_state & RQ_LOCAL_PENDING))
520 			continue;
521 		_req_mod(req, abort_disk_io);
522 	}
523 
524 	spin_unlock_irq(&mdev->req_lock);
525 }
526 
527 /**
528  * cl_wide_st_chg() - true if the state change is a cluster wide one
529  * @mdev:	DRBD device.
530  * @os:		old (current) state.
531  * @ns:		new (wanted) state.
532  */
533 static int cl_wide_st_chg(struct drbd_conf *mdev,
534 			  union drbd_state os, union drbd_state ns)
535 {
536 	return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
537 		 ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
538 		  (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
539 		  (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
540 		  (os.disk != D_FAILED && ns.disk == D_FAILED))) ||
541 		(os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
542 		(os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
543 }
544 
545 enum drbd_state_rv
546 drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
547 		  union drbd_state mask, union drbd_state val)
548 {
549 	unsigned long flags;
550 	union drbd_state os, ns;
551 	enum drbd_state_rv rv;
552 
553 	spin_lock_irqsave(&mdev->req_lock, flags);
554 	os = mdev->state;
555 	ns.i = (os.i & ~mask.i) | val.i;
556 	rv = _drbd_set_state(mdev, ns, f, NULL);
557 	ns = mdev->state;
558 	spin_unlock_irqrestore(&mdev->req_lock, flags);
559 
560 	return rv;
561 }
562 
563 /**
564  * drbd_force_state() - Impose a change which happens outside our control on our state
565  * @mdev:	DRBD device.
566  * @mask:	mask of state bits to change.
567  * @val:	value of new state bits.
568  */
569 void drbd_force_state(struct drbd_conf *mdev,
570 	union drbd_state mask, union drbd_state val)
571 {
572 	drbd_change_state(mdev, CS_HARD, mask, val);
573 }
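
/*
 * Illustrative usage (not part of the driver): mask/val pairs are normally
 * built with the NS()/_NS() helpers from drbd_int.h, which set exactly the
 * named state field in both the mask and the value, e.g.:
 *
 *	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
 *	drbd_request_state(mdev, NS(disk, D_OUTDATED));
 *
 * Compare the tl_release() error path above, which forces the connection
 * to C_PROTOCOL_ERROR in exactly this way.
 */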
574 
575 static enum drbd_state_rv is_valid_state(struct drbd_conf *, union drbd_state);
576 static enum drbd_state_rv is_valid_state_transition(struct drbd_conf *,
577 						    union drbd_state,
578 						    union drbd_state);
579 enum sanitize_state_warnings {
580 	NO_WARNING,
581 	ABORTED_ONLINE_VERIFY,
582 	ABORTED_RESYNC,
583 	CONNECTION_LOST_NEGOTIATING,
584 	IMPLICITLY_UPGRADED_DISK,
585 	IMPLICITLY_UPGRADED_PDSK,
586 };
587 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
588 				       union drbd_state ns, enum sanitize_state_warnings *warn);
589 int drbd_send_state_req(struct drbd_conf *,
590 			union drbd_state, union drbd_state);
591 
592 static enum drbd_state_rv
593 _req_st_cond(struct drbd_conf *mdev, union drbd_state mask,
594 	     union drbd_state val)
595 {
596 	union drbd_state os, ns;
597 	unsigned long flags;
598 	enum drbd_state_rv rv;
599 
600 	if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
601 		return SS_CW_SUCCESS;
602 
603 	if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
604 		return SS_CW_FAILED_BY_PEER;
605 
606 	rv = 0;
607 	spin_lock_irqsave(&mdev->req_lock, flags);
608 	os = mdev->state;
609 	ns.i = (os.i & ~mask.i) | val.i;
610 	ns = sanitize_state(mdev, os, ns, NULL);
611 
612 	if (!cl_wide_st_chg(mdev, os, ns))
613 		rv = SS_CW_NO_NEED;
614 	if (!rv) {
615 		rv = is_valid_state(mdev, ns);
616 		if (rv == SS_SUCCESS) {
617 			rv = is_valid_state_transition(mdev, ns, os);
618 			if (rv == SS_SUCCESS)
619 				rv = SS_UNKNOWN_ERROR; /* cont waiting, otherwise fail. */
620 		}
621 	}
622 	spin_unlock_irqrestore(&mdev->req_lock, flags);
623 
624 	return rv;
625 }
626 
627 /**
628  * drbd_req_state() - Perform a state change, possibly cluster wide
629  * @mdev:	DRBD device.
630  * @mask:	mask of state bits to change.
631  * @val:	value of new state bits.
632  * @f:		flags
633  *
634  * Should not be called directly, use drbd_request_state() or
635  * _drbd_request_state().
636  */
637 static enum drbd_state_rv
638 drbd_req_state(struct drbd_conf *mdev, union drbd_state mask,
639 	       union drbd_state val, enum chg_state_flags f)
640 {
641 	struct completion done;
642 	unsigned long flags;
643 	union drbd_state os, ns;
644 	enum drbd_state_rv rv;
645 
646 	init_completion(&done);
647 
648 	if (f & CS_SERIALIZE)
649 		mutex_lock(&mdev->state_mutex);
650 
651 	spin_lock_irqsave(&mdev->req_lock, flags);
652 	os = mdev->state;
653 	ns.i = (os.i & ~mask.i) | val.i;
654 	ns = sanitize_state(mdev, os, ns, NULL);
655 
656 	if (cl_wide_st_chg(mdev, os, ns)) {
657 		rv = is_valid_state(mdev, ns);
658 		if (rv == SS_SUCCESS)
659 			rv = is_valid_state_transition(mdev, ns, os);
660 		spin_unlock_irqrestore(&mdev->req_lock, flags);
661 
662 		if (rv < SS_SUCCESS) {
663 			if (f & CS_VERBOSE)
664 				print_st_err(mdev, os, ns, rv);
665 			goto abort;
666 		}
667 
668 		drbd_state_lock(mdev);
669 		if (!drbd_send_state_req(mdev, mask, val)) {
670 			drbd_state_unlock(mdev);
671 			rv = SS_CW_FAILED_BY_PEER;
672 			if (f & CS_VERBOSE)
673 				print_st_err(mdev, os, ns, rv);
674 			goto abort;
675 		}
676 
677 		wait_event(mdev->state_wait,
678 			(rv = _req_st_cond(mdev, mask, val)));
679 
680 		if (rv < SS_SUCCESS) {
681 			drbd_state_unlock(mdev);
682 			if (f & CS_VERBOSE)
683 				print_st_err(mdev, os, ns, rv);
684 			goto abort;
685 		}
686 		spin_lock_irqsave(&mdev->req_lock, flags);
687 		os = mdev->state;
688 		ns.i = (os.i & ~mask.i) | val.i;
689 		rv = _drbd_set_state(mdev, ns, f, &done);
690 		drbd_state_unlock(mdev);
691 	} else {
692 		rv = _drbd_set_state(mdev, ns, f, &done);
693 	}
694 
695 	spin_unlock_irqrestore(&mdev->req_lock, flags);
696 
697 	if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
698 		D_ASSERT(current != mdev->worker.task);
699 		wait_for_completion(&done);
700 	}
701 
702 abort:
703 	if (f & CS_SERIALIZE)
704 		mutex_unlock(&mdev->state_mutex);
705 
706 	return rv;
707 }
708 
709 /**
710  * _drbd_request_state() - Request a state change (with flags)
711  * @mdev:	DRBD device.
712  * @mask:	mask of state bits to change.
713  * @val:	value of new state bits.
714  * @f:		flags
715  *
716  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
717  * flag, or when logging of failed state change requests is not desired.
718  */
719 enum drbd_state_rv
720 _drbd_request_state(struct drbd_conf *mdev, union drbd_state mask,
721 		    union drbd_state val, enum chg_state_flags f)
722 {
723 	enum drbd_state_rv rv;
724 
725 	wait_event(mdev->state_wait,
726 		   (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
727 
728 	return rv;
729 }
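
/*
 * Illustrative usage (not part of the driver): a caller that must not
 * proceed before the state change and its after_state_ch() work have
 * completed would pass CS_WAIT_COMPLETE explicitly, e.g.:
 *
 *	_drbd_request_state(mdev, NS(disk, D_FAILED),
 *			    CS_VERBOSE | CS_WAIT_COMPLETE);
 */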
730 
731 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
732 {
733 	dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
734 	    name,
735 	    drbd_conn_str(ns.conn),
736 	    drbd_role_str(ns.role),
737 	    drbd_role_str(ns.peer),
738 	    drbd_disk_str(ns.disk),
739 	    drbd_disk_str(ns.pdsk),
740 	    is_susp(ns) ? 's' : 'r',
741 	    ns.aftr_isp ? 'a' : '-',
742 	    ns.peer_isp ? 'p' : '-',
743 	    ns.user_isp ? 'u' : '-'
744 	    );
745 }
746 
747 void print_st_err(struct drbd_conf *mdev, union drbd_state os,
748 	          union drbd_state ns, enum drbd_state_rv err)
749 {
750 	if (err == SS_IN_TRANSIENT_STATE)
751 		return;
752 	dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
753 	print_st(mdev, " state", os);
754 	print_st(mdev, "wanted", ns);
755 }
756 
757 
758 /**
759  * is_valid_state() - Returns an SS_ error code if ns is not valid
760  * @mdev:	DRBD device.
761  * @ns:		State to consider.
762  */
763 static enum drbd_state_rv
764 is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
765 {
766 	/* See drbd_state_sw_errors in drbd_strings.c */
767 
768 	enum drbd_fencing_p fp;
769 	enum drbd_state_rv rv = SS_SUCCESS;
770 
771 	fp = FP_DONT_CARE;
772 	if (get_ldev(mdev)) {
773 		fp = mdev->ldev->dc.fencing;
774 		put_ldev(mdev);
775 	}
776 
777 	if (get_net_conf(mdev)) {
778 		if (!mdev->net_conf->two_primaries &&
779 		    ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
780 			rv = SS_TWO_PRIMARIES;
781 		put_net_conf(mdev);
782 	}
783 
784 	if (rv <= 0)
785 		/* already found a reason to abort */;
786 	else if (ns.role == R_SECONDARY && mdev->open_cnt)
787 		rv = SS_DEVICE_IN_USE;
788 
789 	else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
790 		rv = SS_NO_UP_TO_DATE_DISK;
791 
792 	else if (fp >= FP_RESOURCE &&
793 		 ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
794 		rv = SS_PRIMARY_NOP;
795 
796 	else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
797 		rv = SS_NO_UP_TO_DATE_DISK;
798 
799 	else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
800 		rv = SS_NO_LOCAL_DISK;
801 
802 	else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
803 		rv = SS_NO_REMOTE_DISK;
804 
805 	else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
806 		rv = SS_NO_UP_TO_DATE_DISK;
807 
808 	else if ((ns.conn == C_CONNECTED ||
809 		  ns.conn == C_WF_BITMAP_S ||
810 		  ns.conn == C_SYNC_SOURCE ||
811 		  ns.conn == C_PAUSED_SYNC_S) &&
812 		  ns.disk == D_OUTDATED)
813 		rv = SS_CONNECTED_OUTDATES;
814 
815 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
816 		 (mdev->sync_conf.verify_alg[0] == 0))
817 		rv = SS_NO_VERIFY_ALG;
818 
819 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
820 		  mdev->agreed_pro_version < 88)
821 		rv = SS_NOT_SUPPORTED;
822 
823 	else if (ns.conn >= C_CONNECTED && ns.pdsk == D_UNKNOWN)
824 		rv = SS_CONNECTED_OUTDATES;
825 
826 	return rv;
827 }
828 
829 /**
830  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
831  * @mdev:	DRBD device.
832  * @ns:		new state.
833  * @os:		old state.
834  */
835 static enum drbd_state_rv
836 is_valid_state_transition(struct drbd_conf *mdev, union drbd_state ns,
837 			  union drbd_state os)
838 {
839 	enum drbd_state_rv rv = SS_SUCCESS;
840 
841 	if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
842 	    os.conn > C_CONNECTED)
843 		rv = SS_RESYNC_RUNNING;
844 
845 	if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
846 		rv = SS_ALREADY_STANDALONE;
847 
848 	if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
849 		rv = SS_IS_DISKLESS;
850 
851 	if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
852 		rv = SS_NO_NET_CONFIG;
853 
854 	if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
855 		rv = SS_LOWER_THAN_OUTDATED;
856 
857 	if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
858 		rv = SS_IN_TRANSIENT_STATE;
859 
860 	if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
861 		rv = SS_IN_TRANSIENT_STATE;
862 
863 	/* While establishing a connection, only allow cstate to change.
864 	   Delay/refuse role changes, detach, attach, etc. */
865 	if (test_bit(STATE_SENT, &mdev->flags) &&
866 	    !(os.conn == C_WF_REPORT_PARAMS ||
867 	      (ns.conn == C_WF_REPORT_PARAMS && os.conn == C_WF_CONNECTION)))
868 		rv = SS_IN_TRANSIENT_STATE;
869 
870 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
871 		rv = SS_NEED_CONNECTION;
872 
873 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
874 	    ns.conn != os.conn && os.conn > C_CONNECTED)
875 		rv = SS_RESYNC_RUNNING;
876 
877 	if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
878 	    os.conn < C_CONNECTED)
879 		rv = SS_NEED_CONNECTION;
880 
881 	if ((ns.conn == C_SYNC_TARGET || ns.conn == C_SYNC_SOURCE)
882 	    && os.conn < C_WF_REPORT_PARAMS)
883 		rv = SS_NEED_CONNECTION; /* No NetworkFailure -> SyncTarget etc... */
884 
885 	return rv;
886 }
887 
888 static void print_sanitize_warnings(struct drbd_conf *mdev, enum sanitize_state_warnings warn)
889 {
890 	static const char *msg_table[] = {
891 		[NO_WARNING] = "",
892 		[ABORTED_ONLINE_VERIFY] = "Online-verify aborted.",
893 		[ABORTED_RESYNC] = "Resync aborted.",
894 		[CONNECTION_LOST_NEGOTIATING] = "Connection lost while negotiating, no data!",
895 		[IMPLICITLY_UPGRADED_DISK] = "Implicitly upgraded disk",
896 		[IMPLICITLY_UPGRADED_PDSK] = "Implicitly upgraded pdsk",
897 	};
898 
899 	if (warn != NO_WARNING)
900 		dev_warn(DEV, "%s\n", msg_table[warn]);
901 }
902 
903 /**
904  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
905  * @mdev:	DRBD device.
906  * @os:		old state.
907  * @ns:		new state.
908  * @warn:	if not NULL, receives one of enum sanitize_state_warnings
909  *
910  * When we lose the connection, we have to set the state of the peer's disk (pdsk)
911  * to D_UNKNOWN. This rule and many more along those lines are in this function.
912  */
913 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
914 				       union drbd_state ns, enum sanitize_state_warnings *warn)
915 {
916 	enum drbd_fencing_p fp;
917 	enum drbd_disk_state disk_min, disk_max, pdsk_min, pdsk_max;
918 
919 	if (warn)
920 		*warn = NO_WARNING;
921 
922 	fp = FP_DONT_CARE;
923 	if (get_ldev(mdev)) {
924 		fp = mdev->ldev->dc.fencing;
925 		put_ldev(mdev);
926 	}
927 
928 	/* A network error must not affect a device whose network part is not configured */
929 	if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
930 	    os.conn <= C_DISCONNECTING)
931 		ns.conn = os.conn;
932 
933 	/* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow.
934 	 * If you try to go into some Sync* state, that shall fail (elsewhere). */
935 	if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
936 	    ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING && ns.conn <= C_CONNECTED)
937 		ns.conn = os.conn;
938 
939 	/* we cannot fail (again) if we already detached */
940 	if (ns.disk == D_FAILED && os.disk == D_DISKLESS)
941 		ns.disk = D_DISKLESS;
942 
943 	/* After C_DISCONNECTING only C_STANDALONE may follow */
944 	if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
945 		ns.conn = os.conn;
946 
947 	if (ns.conn < C_CONNECTED) {
948 		ns.peer_isp = 0;
949 		ns.peer = R_UNKNOWN;
950 		if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
951 			ns.pdsk = D_UNKNOWN;
952 	}
953 
954 	/* Clear the aftr_isp when becoming unconfigured */
955 	if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
956 		ns.aftr_isp = 0;
957 
958 	/* Abort resync if a disk fails/detaches */
959 	if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
960 	    (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
961 		if (warn)
962 			*warn =	os.conn == C_VERIFY_S || os.conn == C_VERIFY_T ?
963 				ABORTED_ONLINE_VERIFY : ABORTED_RESYNC;
964 		ns.conn = C_CONNECTED;
965 	}
966 
967 	/* Connection breaks down before we finished "Negotiating" */
968 	if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
969 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
970 		if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
971 			ns.disk = mdev->new_state_tmp.disk;
972 			ns.pdsk = mdev->new_state_tmp.pdsk;
973 		} else {
974 			if (warn)
975 				*warn = CONNECTION_LOST_NEGOTIATING;
976 			ns.disk = D_DISKLESS;
977 			ns.pdsk = D_UNKNOWN;
978 		}
979 		put_ldev(mdev);
980 	}
981 
982 	/* D_CONSISTENT and D_OUTDATED vanish when we get connected */
983 	if (ns.conn >= C_CONNECTED && ns.conn < C_AHEAD) {
984 		if (ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED)
985 			ns.disk = D_UP_TO_DATE;
986 		if (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)
987 			ns.pdsk = D_UP_TO_DATE;
988 	}
989 
990 	/* Implications of the connection state on the disk states */
991 	disk_min = D_DISKLESS;
992 	disk_max = D_UP_TO_DATE;
993 	pdsk_min = D_INCONSISTENT;
994 	pdsk_max = D_UNKNOWN;
995 	switch ((enum drbd_conns)ns.conn) {
996 	case C_WF_BITMAP_T:
997 	case C_PAUSED_SYNC_T:
998 	case C_STARTING_SYNC_T:
999 	case C_WF_SYNC_UUID:
1000 	case C_BEHIND:
1001 		disk_min = D_INCONSISTENT;
1002 		disk_max = D_OUTDATED;
1003 		pdsk_min = D_UP_TO_DATE;
1004 		pdsk_max = D_UP_TO_DATE;
1005 		break;
1006 	case C_VERIFY_S:
1007 	case C_VERIFY_T:
1008 		disk_min = D_UP_TO_DATE;
1009 		disk_max = D_UP_TO_DATE;
1010 		pdsk_min = D_UP_TO_DATE;
1011 		pdsk_max = D_UP_TO_DATE;
1012 		break;
1013 	case C_CONNECTED:
1014 		disk_min = D_DISKLESS;
1015 		disk_max = D_UP_TO_DATE;
1016 		pdsk_min = D_DISKLESS;
1017 		pdsk_max = D_UP_TO_DATE;
1018 		break;
1019 	case C_WF_BITMAP_S:
1020 	case C_PAUSED_SYNC_S:
1021 	case C_STARTING_SYNC_S:
1022 	case C_AHEAD:
1023 		disk_min = D_UP_TO_DATE;
1024 		disk_max = D_UP_TO_DATE;
1025 		pdsk_min = D_INCONSISTENT;
1026 		pdsk_max = D_CONSISTENT; /* D_OUTDATED would be nice. But explicit outdate necessary*/
1027 		break;
1028 	case C_SYNC_TARGET:
1029 		disk_min = D_INCONSISTENT;
1030 		disk_max = D_INCONSISTENT;
1031 		pdsk_min = D_UP_TO_DATE;
1032 		pdsk_max = D_UP_TO_DATE;
1033 		break;
1034 	case C_SYNC_SOURCE:
1035 		disk_min = D_UP_TO_DATE;
1036 		disk_max = D_UP_TO_DATE;
1037 		pdsk_min = D_INCONSISTENT;
1038 		pdsk_max = D_INCONSISTENT;
1039 		break;
1040 	case C_STANDALONE:
1041 	case C_DISCONNECTING:
1042 	case C_UNCONNECTED:
1043 	case C_TIMEOUT:
1044 	case C_BROKEN_PIPE:
1045 	case C_NETWORK_FAILURE:
1046 	case C_PROTOCOL_ERROR:
1047 	case C_TEAR_DOWN:
1048 	case C_WF_CONNECTION:
1049 	case C_WF_REPORT_PARAMS:
1050 	case C_MASK:
1051 		break;
1052 	}
1053 	if (ns.disk > disk_max)
1054 		ns.disk = disk_max;
1055 
1056 	if (ns.disk < disk_min) {
1057 		if (warn)
1058 			*warn = IMPLICITLY_UPGRADED_DISK;
1059 		ns.disk = disk_min;
1060 	}
1061 	if (ns.pdsk > pdsk_max)
1062 		ns.pdsk = pdsk_max;
1063 
1064 	if (ns.pdsk < pdsk_min) {
1065 		if (warn)
1066 			*warn = IMPLICITLY_UPGRADED_PDSK;
1067 		ns.pdsk = pdsk_min;
1068 	}
1069 
1070 	if (fp == FP_STONITH &&
1071 	    (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
1072 	    !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
1073 		ns.susp_fen = 1; /* Suspend IO while fence-peer handler runs (peer lost) */
1074 
1075 	if (mdev->sync_conf.on_no_data == OND_SUSPEND_IO &&
1076 	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE) &&
1077 	    !(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE))
1078 		ns.susp_nod = 1; /* Suspend IO while no data available (no accessible data available) */
1079 
1080 	if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
1081 		if (ns.conn == C_SYNC_SOURCE)
1082 			ns.conn = C_PAUSED_SYNC_S;
1083 		if (ns.conn == C_SYNC_TARGET)
1084 			ns.conn = C_PAUSED_SYNC_T;
1085 	} else {
1086 		if (ns.conn == C_PAUSED_SYNC_S)
1087 			ns.conn = C_SYNC_SOURCE;
1088 		if (ns.conn == C_PAUSED_SYNC_T)
1089 			ns.conn = C_SYNC_TARGET;
1090 	}
1091 
1092 	return ns;
1093 }
1094 
1095 /* helper for __drbd_set_state */
1096 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
1097 {
1098 	if (mdev->agreed_pro_version < 90)
1099 		mdev->ov_start_sector = 0;
1100 	mdev->rs_total = drbd_bm_bits(mdev);
1101 	mdev->ov_position = 0;
1102 	if (cs == C_VERIFY_T) {
1103 		/* starting online verify from an arbitrary position
1104 		 * does not fit well into the existing protocol.
1105 		 * on C_VERIFY_T, we initialize ov_left and friends
1106 		 * implicitly in receive_DataRequest once the
1107 		 * first P_OV_REQUEST is received */
1108 		mdev->ov_start_sector = ~(sector_t)0;
1109 	} else {
1110 		unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
1111 		if (bit >= mdev->rs_total) {
1112 			mdev->ov_start_sector =
1113 				BM_BIT_TO_SECT(mdev->rs_total - 1);
1114 			mdev->rs_total = 1;
1115 		} else
1116 			mdev->rs_total -= bit;
1117 		mdev->ov_position = mdev->ov_start_sector;
1118 	}
1119 	mdev->ov_left = mdev->rs_total;
1120 }
1121 
1122 static void drbd_resume_al(struct drbd_conf *mdev)
1123 {
1124 	if (test_and_clear_bit(AL_SUSPENDED, &mdev->flags))
1125 		dev_info(DEV, "Resumed AL updates\n");
1126 }
1127 
1128 /**
1129  * __drbd_set_state() - Set a new DRBD state
1130  * @mdev:	DRBD device.
1131  * @ns:		new state.
1132  * @flags:	Flags
1133  * @done:	Optional completion that will be completed after after_state_ch() has finished
1134  *
1135  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
1136  */
1137 enum drbd_state_rv
1138 __drbd_set_state(struct drbd_conf *mdev, union drbd_state ns,
1139 	         enum chg_state_flags flags, struct completion *done)
1140 {
1141 	union drbd_state os;
1142 	enum drbd_state_rv rv = SS_SUCCESS;
1143 	enum sanitize_state_warnings ssw;
1144 	struct after_state_chg_work *ascw;
1145 
1146 	os = mdev->state;
1147 
1148 	ns = sanitize_state(mdev, os, ns, &ssw);
1149 
1150 	if (ns.i == os.i)
1151 		return SS_NOTHING_TO_DO;
1152 
1153 	if (!(flags & CS_HARD)) {
1154 		/*  pre-state-change checks ; only look at ns  */
1155 		/* See drbd_state_sw_errors in drbd_strings.c */
1156 
1157 		rv = is_valid_state(mdev, ns);
1158 		if (rv < SS_SUCCESS) {
1159 			/* If the old state was illegal as well, then let
1160 			   this happen...*/
1161 
1162 			if (is_valid_state(mdev, os) == rv)
1163 				rv = is_valid_state_transition(mdev, ns, os);
1164 		} else
1165 			rv = is_valid_state_transition(mdev, ns, os);
1166 	}
1167 
1168 	if (rv < SS_SUCCESS) {
1169 		if (flags & CS_VERBOSE)
1170 			print_st_err(mdev, os, ns, rv);
1171 		return rv;
1172 	}
1173 
1174 	print_sanitize_warnings(mdev, ssw);
1175 
1176 	{
1177 	char *pbp, pb[300];
1178 	pbp = pb;
1179 	*pbp = 0;
1180 	if (ns.role != os.role)
1181 		pbp += sprintf(pbp, "role( %s -> %s ) ",
1182 			       drbd_role_str(os.role),
1183 			       drbd_role_str(ns.role));
1184 	if (ns.peer != os.peer)
1185 		pbp += sprintf(pbp, "peer( %s -> %s ) ",
1186 			       drbd_role_str(os.peer),
1187 			       drbd_role_str(ns.peer));
1188 	if (ns.conn != os.conn)
1189 		pbp += sprintf(pbp, "conn( %s -> %s ) ",
1190 			       drbd_conn_str(os.conn),
1191 			       drbd_conn_str(ns.conn));
1192 	if (ns.disk != os.disk)
1193 		pbp += sprintf(pbp, "disk( %s -> %s ) ",
1194 			       drbd_disk_str(os.disk),
1195 			       drbd_disk_str(ns.disk));
1196 	if (ns.pdsk != os.pdsk)
1197 		pbp += sprintf(pbp, "pdsk( %s -> %s ) ",
1198 			       drbd_disk_str(os.pdsk),
1199 			       drbd_disk_str(ns.pdsk));
1200 	if (is_susp(ns) != is_susp(os))
1201 		pbp += sprintf(pbp, "susp( %d -> %d ) ",
1202 			       is_susp(os),
1203 			       is_susp(ns));
1204 	if (ns.aftr_isp != os.aftr_isp)
1205 		pbp += sprintf(pbp, "aftr_isp( %d -> %d ) ",
1206 			       os.aftr_isp,
1207 			       ns.aftr_isp);
1208 	if (ns.peer_isp != os.peer_isp)
1209 		pbp += sprintf(pbp, "peer_isp( %d -> %d ) ",
1210 			       os.peer_isp,
1211 			       ns.peer_isp);
1212 	if (ns.user_isp != os.user_isp)
1213 		pbp += sprintf(pbp, "user_isp( %d -> %d ) ",
1214 			       os.user_isp,
1215 			       ns.user_isp);
1216 	dev_info(DEV, "%s\n", pb);
1217 	}
1218 
1219 	/* solve the race between becoming unconfigured,
1220 	 * worker doing the cleanup, and
1221 	 * admin reconfiguring us:
1222 	 * on (re)configure, first set CONFIG_PENDING,
1223 	 * then wait for a potentially exiting worker,
1224 	 * start the worker, and schedule one no_op.
1225 	 * then proceed with configuration.
1226 	 */
1227 	if (ns.disk == D_DISKLESS &&
1228 	    ns.conn == C_STANDALONE &&
1229 	    ns.role == R_SECONDARY &&
1230 	    !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
1231 		set_bit(DEVICE_DYING, &mdev->flags);
1232 
1233 	/* if we are going -> D_FAILED or D_DISKLESS, grab one extra reference
1234 	 * on the ldev here, to be sure the transition to D_DISKLESS and the
1235 	 * eventual drbd_ldev_destroy() won't happen before our corresponding
1236 	 * after_state_ch work has run, where we put_ldev again. */
1237 	if ((os.disk != D_FAILED && ns.disk == D_FAILED) ||
1238 	    (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))
1239 		atomic_inc(&mdev->local_cnt);
1240 
1241 	mdev->state = ns;
1242 
1243 	if (os.disk == D_ATTACHING && ns.disk >= D_NEGOTIATING)
1244 		drbd_print_uuids(mdev, "attached to UUIDs");
1245 
1246 	wake_up(&mdev->misc_wait);
1247 	wake_up(&mdev->state_wait);
1248 
1249 	/* aborted verify run. log the last position */
1250 	if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1251 	    ns.conn < C_CONNECTED) {
1252 		mdev->ov_start_sector =
1253 			BM_BIT_TO_SECT(drbd_bm_bits(mdev) - mdev->ov_left);
1254 		dev_info(DEV, "Online Verify reached sector %llu\n",
1255 			(unsigned long long)mdev->ov_start_sector);
1256 	}
1257 
1258 	if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1259 	    (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1260 		dev_info(DEV, "Syncer continues.\n");
1261 		mdev->rs_paused += (long)jiffies
1262 				  -(long)mdev->rs_mark_time[mdev->rs_last_mark];
1263 		if (ns.conn == C_SYNC_TARGET)
1264 			mod_timer(&mdev->resync_timer, jiffies);
1265 	}
1266 
1267 	if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1268 	    (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1269 		dev_info(DEV, "Resync suspended\n");
1270 		mdev->rs_mark_time[mdev->rs_last_mark] = jiffies;
1271 	}
1272 
1273 	if (os.conn == C_CONNECTED &&
1274 	    (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1275 		unsigned long now = jiffies;
1276 		int i;
1277 
1278 		set_ov_position(mdev, ns.conn);
1279 		mdev->rs_start = now;
1280 		mdev->rs_last_events = 0;
1281 		mdev->rs_last_sect_ev = 0;
1282 		mdev->ov_last_oos_size = 0;
1283 		mdev->ov_last_oos_start = 0;
1284 
1285 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1286 			mdev->rs_mark_left[i] = mdev->ov_left;
1287 			mdev->rs_mark_time[i] = now;
1288 		}
1289 
1290 		drbd_rs_controller_reset(mdev);
1291 
1292 		if (ns.conn == C_VERIFY_S) {
1293 			dev_info(DEV, "Starting Online Verify from sector %llu\n",
1294 					(unsigned long long)mdev->ov_position);
1295 			mod_timer(&mdev->resync_timer, jiffies);
1296 		}
1297 	}
1298 
1299 	if (get_ldev(mdev)) {
1300 		u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1301 						 MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1302 						 MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1303 
1304 		if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1305 			mdf |= MDF_CRASHED_PRIMARY;
1306 		if (mdev->state.role == R_PRIMARY ||
1307 		    (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1308 			mdf |= MDF_PRIMARY_IND;
1309 		if (mdev->state.conn > C_WF_REPORT_PARAMS)
1310 			mdf |= MDF_CONNECTED_IND;
1311 		if (mdev->state.disk > D_INCONSISTENT)
1312 			mdf |= MDF_CONSISTENT;
1313 		if (mdev->state.disk > D_OUTDATED)
1314 			mdf |= MDF_WAS_UP_TO_DATE;
1315 		if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1316 			mdf |= MDF_PEER_OUT_DATED;
1317 		if (mdf != mdev->ldev->md.flags) {
1318 			mdev->ldev->md.flags = mdf;
1319 			drbd_md_mark_dirty(mdev);
1320 		}
1321 		if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1322 			drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1323 		put_ldev(mdev);
1324 	}
1325 
1326 	/* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider resyncing */
1327 	if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1328 	    os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1329 		set_bit(CONSIDER_RESYNC, &mdev->flags);
1330 
1331 	/* Receiver should clean up itself */
1332 	if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1333 		drbd_thread_stop_nowait(&mdev->receiver);
1334 
1335 	/* Now the receiver finished cleaning up itself, it should die */
1336 	if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1337 		drbd_thread_stop_nowait(&mdev->receiver);
1338 
1339 	/* Upon network failure, we need to restart the receiver. */
1340 	if (os.conn > C_WF_CONNECTION &&
1341 	    ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1342 		drbd_thread_restart_nowait(&mdev->receiver);
1343 
1344 	/* Resume AL writing if we get a connection */
1345 	if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1346 		drbd_resume_al(mdev);
1347 
1348 	/* remember last connect and attach times so request_timer_fn() won't
1349 	 * kill newly established sessions while we are still trying to thaw
1350 	 * previously frozen IO */
1351 	if (os.conn != C_WF_REPORT_PARAMS && ns.conn == C_WF_REPORT_PARAMS)
1352 		mdev->last_reconnect_jif = jiffies;
1353 	if ((os.disk == D_ATTACHING || os.disk == D_NEGOTIATING) &&
1354 	    ns.disk > D_NEGOTIATING)
1355 		mdev->last_reattach_jif = jiffies;
1356 
1357 	ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1358 	if (ascw) {
1359 		ascw->os = os;
1360 		ascw->ns = ns;
1361 		ascw->flags = flags;
1362 		ascw->w.cb = w_after_state_ch;
1363 		ascw->done = done;
1364 		drbd_queue_work(&mdev->data.work, &ascw->w);
1365 	} else {
1366 		dev_warn(DEV, "Could not kmalloc an ascw\n");
1367 	}
1368 
1369 	return rv;
1370 }
1371 
1372 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1373 {
1374 	struct after_state_chg_work *ascw =
1375 		container_of(w, struct after_state_chg_work, w);
1376 	after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1377 	if (ascw->flags & CS_WAIT_COMPLETE) {
1378 		D_ASSERT(ascw->done != NULL);
1379 		complete(ascw->done);
1380 	}
1381 	kfree(ascw);
1382 
1383 	return 1;
1384 }
1385 
1386 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1387 {
1388 	if (rv) {
1389 		dev_err(DEV, "Writing the bitmap failed, not starting resync.\n");
1390 		_drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1391 		return;
1392 	}
1393 
1394 	switch (mdev->state.conn) {
1395 	case C_STARTING_SYNC_T:
1396 		_drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1397 		break;
1398 	case C_STARTING_SYNC_S:
1399 		drbd_start_resync(mdev, C_SYNC_SOURCE);
1400 		break;
1401 	}
1402 }
1403 
1404 int drbd_bitmap_io_from_worker(struct drbd_conf *mdev,
1405 		int (*io_fn)(struct drbd_conf *),
1406 		char *why, enum bm_flag flags)
1407 {
1408 	int rv;
1409 
1410 	D_ASSERT(current == mdev->worker.task);
1411 
1412 	/* open coded non-blocking drbd_suspend_io(mdev); */
1413 	set_bit(SUSPEND_IO, &mdev->flags);
1414 
1415 	drbd_bm_lock(mdev, why, flags);
1416 	rv = io_fn(mdev);
1417 	drbd_bm_unlock(mdev);
1418 
1419 	drbd_resume_io(mdev);
1420 
1421 	return rv;
1422 }
1423 
1424 /**
1425  * after_state_ch() - Perform after state change actions that may sleep
1426  * @mdev:	DRBD device.
1427  * @os:		old state.
1428  * @ns:		new state.
1429  * @flags:	Flags
1430  */
1431 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1432 			   union drbd_state ns, enum chg_state_flags flags)
1433 {
1434 	enum drbd_fencing_p fp;
1435 	enum drbd_req_event what = nothing;
1436 	union drbd_state nsm = (union drbd_state){ .i = -1 };
1437 
1438 	if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1439 		clear_bit(CRASHED_PRIMARY, &mdev->flags);
1440 		if (mdev->p_uuid)
1441 			mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1442 	}
1443 
1444 	fp = FP_DONT_CARE;
1445 	if (get_ldev(mdev)) {
1446 		fp = mdev->ldev->dc.fencing;
1447 		put_ldev(mdev);
1448 	}
1449 
1450 	/* Inform userspace about the change... */
1451 	drbd_bcast_state(mdev, ns);
1452 
1453 	if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1454 	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1455 		drbd_khelper(mdev, "pri-on-incon-degr");
1456 
1457 	/* Here we have the actions that are performed after a
1458 	   state change. This function might sleep */
1459 
1460 	if (os.disk <= D_NEGOTIATING && ns.disk > D_NEGOTIATING)
1461 		mod_timer(&mdev->request_timer, jiffies + HZ);
1462 
1463 	nsm.i = -1;
1464 	if (ns.susp_nod) {
1465 		if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1466 			what = resend;
1467 
1468 		if ((os.disk == D_ATTACHING || os.disk == D_NEGOTIATING) &&
1469 		    ns.disk > D_NEGOTIATING)
1470 			what = restart_frozen_disk_io;
1471 
1472 		if (what != nothing)
1473 			nsm.susp_nod = 0;
1474 	}
1475 
1476 	if (ns.susp_fen) {
1477 		/* case1: The outdate peer handler is successful: */
1478 		if (os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) {
1479 			tl_clear(mdev);
1480 			if (test_bit(NEW_CUR_UUID, &mdev->flags)) {
1481 				drbd_uuid_new_current(mdev);
1482 				clear_bit(NEW_CUR_UUID, &mdev->flags);
1483 			}
1484 			spin_lock_irq(&mdev->req_lock);
1485 			_drbd_set_state(_NS(mdev, susp_fen, 0), CS_VERBOSE, NULL);
1486 			spin_unlock_irq(&mdev->req_lock);
1487 		}
1488 		/* case2: The connection was established again: */
1489 		if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
1490 			clear_bit(NEW_CUR_UUID, &mdev->flags);
1491 			what = resend;
1492 			nsm.susp_fen = 0;
1493 		}
1494 	}
1495 
1496 	if (what != nothing) {
1497 		spin_lock_irq(&mdev->req_lock);
1498 		_tl_restart(mdev, what);
1499 		nsm.i &= mdev->state.i;
1500 		_drbd_set_state(mdev, nsm, CS_VERBOSE, NULL);
1501 		spin_unlock_irq(&mdev->req_lock);
1502 	}
1503 
1504 	/* Became sync source.  With protocol >= 96, we still need to send out
1505 	 * the sync uuid now. Need to do that before any drbd_send_state, or
1506 	 * the other side may go "paused sync" before receiving the sync uuids,
1507 	 * which is unexpected. */
1508 	if ((os.conn != C_SYNC_SOURCE && os.conn != C_PAUSED_SYNC_S) &&
1509 	    (ns.conn == C_SYNC_SOURCE || ns.conn == C_PAUSED_SYNC_S) &&
1510 	    mdev->agreed_pro_version >= 96 && get_ldev(mdev)) {
1511 		drbd_gen_and_send_sync_uuid(mdev);
1512 		put_ldev(mdev);
1513 	}
1514 
1515 	/* Do not change the order of the if above and the two below... */
1516 	if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1517 		drbd_send_uuids(mdev);
1518 		drbd_send_state(mdev, ns);
1519 	}
1520 	/* No point in queuing send_bitmap if we don't have a connection
1521 	 * anymore, so check also the _current_ state, not only the new state
1522 	 * at the time this work was queued. */
1523 	if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S &&
1524 	    mdev->state.conn == C_WF_BITMAP_S)
1525 		drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL,
1526 				"send_bitmap (WFBitMapS)",
1527 				BM_LOCKED_TEST_ALLOWED);
1528 
1529 	/* Lost contact to peer's copy of the data */
1530 	if ((os.pdsk >= D_INCONSISTENT &&
1531 	     os.pdsk != D_UNKNOWN &&
1532 	     os.pdsk != D_OUTDATED)
1533 	&&  (ns.pdsk < D_INCONSISTENT ||
1534 	     ns.pdsk == D_UNKNOWN ||
1535 	     ns.pdsk == D_OUTDATED)) {
1536 		if (get_ldev(mdev)) {
1537 			if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1538 			    mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1539 				if (is_susp(mdev->state)) {
1540 					set_bit(NEW_CUR_UUID, &mdev->flags);
1541 				} else {
1542 					drbd_uuid_new_current(mdev);
1543 					drbd_send_uuids(mdev);
1544 				}
1545 			}
1546 			put_ldev(mdev);
1547 		}
1548 	}
1549 
1550 	if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1551 		if (os.peer == R_SECONDARY && ns.peer == R_PRIMARY &&
1552 		    mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1553 			drbd_uuid_new_current(mdev);
1554 			drbd_send_uuids(mdev);
1555 		}
1556 		/* D_DISKLESS Peer becomes secondary */
1557 		if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1558 			/* We may still be Primary ourselves.
1559 			 * No harm done if the bitmap still changes,
1560 			 * redirtied pages will follow later. */
1561 			drbd_bitmap_io_from_worker(mdev, &drbd_bm_write,
1562 				"demote diskless peer", BM_LOCKED_SET_ALLOWED);
1563 		put_ldev(mdev);
1564 	}
1565 
1566 	/* Write out all changed bits on demote.
1567 	 * Though, no need to do that just yet
1568 	 * if there is a resync going on still */
1569 	if (os.role == R_PRIMARY && ns.role == R_SECONDARY &&
1570 		mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) {
1571 		/* No changes to the bitmap expected this time, so assert that,
1572 		 * even though no harm was done if it did change. */
1573 		drbd_bitmap_io_from_worker(mdev, &drbd_bm_write,
1574 				"demote", BM_LOCKED_TEST_ALLOWED);
1575 		put_ldev(mdev);
1576 	}
1577 
1578 	/* Last part of the attaching process ... */
1579 	if (ns.conn >= C_CONNECTED &&
1580 	    os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1581 		drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1582 		drbd_send_uuids(mdev);
1583 		drbd_send_state(mdev, ns);
1584 	}
1585 
1586 	/* We want to pause/continue resync, tell peer. */
1587 	if (ns.conn >= C_CONNECTED &&
1588 	     ((os.aftr_isp != ns.aftr_isp) ||
1589 	      (os.user_isp != ns.user_isp)))
1590 		drbd_send_state(mdev, ns);
1591 
1592 	/* In case one of the isp bits got set, suspend other devices. */
1593 	if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1594 	    (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1595 		suspend_other_sg(mdev);
1596 
1597 	/* Make sure the peer gets informed about possible state
1598 	   changes (ISP bits) that happened while we were in WFReportParams. */
1599 	if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1600 		drbd_send_state(mdev, ns);
1601 
1602 	if (os.conn != C_AHEAD && ns.conn == C_AHEAD)
1603 		drbd_send_state(mdev, ns);
1604 
1605 	/* We are in the process of starting a full sync... */
1606 	if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1607 	    (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1608 		/* no other bitmap changes expected during this phase */
1609 		drbd_queue_bitmap_io(mdev,
1610 			&drbd_bmio_set_n_write, &abw_start_sync,
1611 			"set_n_write from StartingSync", BM_LOCKED_TEST_ALLOWED);
1612 
1613 	/* We are invalidating ourselves... */
1614 	if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1615 	    os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1616 		/* other bitmap operation expected during this phase */
1617 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL,
1618 			"set_n_write from invalidate", BM_LOCKED_MASK);
1619 
1620 	/* first half of local IO error, failure to attach,
1621 	 * or administrative detach */
1622 	if (os.disk != D_FAILED && ns.disk == D_FAILED) {
1623 		enum drbd_io_error_p eh = EP_PASS_ON;
1624 		int was_io_error = 0;
1625 		/* corresponding get_ldev was in __drbd_set_state, to serialize
1626 		 * our cleanup here with the transition to D_DISKLESS.
1627 	 * But it is still not safe to dereference ldev here, since
1628 	 * we might come from a failed Attach before ldev was set. */
1629 		if (mdev->ldev) {
1630 			eh = mdev->ldev->dc.on_io_error;
1631 			was_io_error = test_and_clear_bit(WAS_IO_ERROR, &mdev->flags);
1632 
1633 			/* Immediately allow completion of all application IO that waits
1634 			   for completion from the local disk. */
1635 			tl_abort_disk_io(mdev);
1636 
1637 			/* current state still has to be D_FAILED,
1638 			 * there is only one way out: to D_DISKLESS,
1639 			 * and that may only happen after our put_ldev below. */
1640 			if (mdev->state.disk != D_FAILED)
1641 				dev_err(DEV,
1642 					"ASSERT FAILED: disk is %s during detach\n",
1643 					drbd_disk_str(mdev->state.disk));
1644 
1645 			if (ns.conn >= C_CONNECTED)
1646 				drbd_send_state(mdev, ns);
1647 
1648 			drbd_rs_cancel_all(mdev);
1649 
1650 			/* In case we want to get something to stable storage still,
1651 			 * this may be the last chance.
1652 			 * Following put_ldev may transition to D_DISKLESS. */
1653 			drbd_md_sync(mdev);
1654 		}
1655 		put_ldev(mdev);
1656 
1657 		if (was_io_error && eh == EP_CALL_HELPER)
1658 			drbd_khelper(mdev, "local-io-error");
1659 	}
1660 
1661 	/* second half of local IO error, failure to attach,
1662 	 * or administrative detach,
1663 	 * after local_cnt references have reached zero again */
1664 	if (os.disk != D_DISKLESS && ns.disk == D_DISKLESS) {
1665 		/* We must still be diskless,
1666 		 * re-attach has to be serialized with this! */
1667 		if (mdev->state.disk != D_DISKLESS)
1668 			dev_err(DEV,
1669 				"ASSERT FAILED: disk is %s while going diskless\n",
1670 				drbd_disk_str(mdev->state.disk));
1671 
1672 		mdev->rs_total = 0;
1673 		mdev->rs_failed = 0;
1674 		atomic_set(&mdev->rs_pending_cnt, 0);
1675 
1676 		if (ns.conn >= C_CONNECTED)
1677 			drbd_send_state(mdev, ns);
1678 
1679 		/* corresponding get_ldev in __drbd_set_state
1680 		 * this may finally trigger drbd_ldev_destroy. */
1681 		put_ldev(mdev);
1682 	}
1683 
1684 	/* Notify peer that I had a local IO error, and did not detach. */
1685 	if (os.disk == D_UP_TO_DATE && ns.disk == D_INCONSISTENT && ns.conn >= C_CONNECTED)
1686 		drbd_send_state(mdev, ns);
1687 
1688 	/* Disks got bigger while they were detached */
1689 	if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1690 	    test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1691 		if (ns.conn == C_CONNECTED)
1692 			resync_after_online_grow(mdev);
1693 	}
1694 
1695 	/* A resync finished or aborted, wake paused devices... */
1696 	if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1697 	    (os.peer_isp && !ns.peer_isp) ||
1698 	    (os.user_isp && !ns.user_isp))
1699 		resume_next_sg(mdev);
1700 
1701 	/* sync target done with resync.  Explicitly notify peer, even though
1702 	 * it should (at least for non-empty resyncs) already know itself. */
1703 	if (os.disk < D_UP_TO_DATE && os.conn >= C_SYNC_SOURCE && ns.conn == C_CONNECTED)
1704 		drbd_send_state(mdev, ns);
1705 
1706 	/* Wake up role changes, that were delayed because of connection establishing */
1707 	if (os.conn == C_WF_REPORT_PARAMS && ns.conn != C_WF_REPORT_PARAMS) {
1708 		clear_bit(STATE_SENT, &mdev->flags);
1709 		wake_up(&mdev->state_wait);
1710 	}
1711 
1712 	/* This triggers bitmap writeout of potentially still unwritten pages
1713 	 * if the resync finished cleanly, or aborted because of peer disk
1714 	 * failure, or because of connection loss.
1715 	 * For resync aborted because of local disk failure, we cannot do
1716 	 * any bitmap writeout anymore.
1717 	 * No harm done if some bits change during this phase.
1718 	 */
1719 	if (os.conn > C_CONNECTED && ns.conn <= C_CONNECTED && get_ldev(mdev)) {
1720 		drbd_queue_bitmap_io(mdev, &drbd_bm_write_copy_pages, NULL,
1721 			"write from resync_finished", BM_LOCKED_CHANGE_ALLOWED);
1722 		put_ldev(mdev);
1723 	}
1724 
1725 	/* free tl_hash if we got thawed and are C_STANDALONE */
1726 	if (ns.conn == C_STANDALONE && !is_susp(ns) && mdev->tl_hash)
1727 		drbd_free_tl_hash(mdev);
1728 
1729 	/* Upon network connection, we need to start the receiver */
1730 	if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1731 		drbd_thread_start(&mdev->receiver);
1732 
1733 	/* Terminate worker thread if we are unconfigured - it will be
1734 	   restarted as needed... */
1735 	if (ns.disk == D_DISKLESS &&
1736 	    ns.conn == C_STANDALONE &&
1737 	    ns.role == R_SECONDARY) {
1738 		if (os.aftr_isp != ns.aftr_isp)
1739 			resume_next_sg(mdev);
1740 		/* set in __drbd_set_state, unless CONFIG_PENDING was set */
1741 		if (test_bit(DEVICE_DYING, &mdev->flags))
1742 			drbd_thread_stop_nowait(&mdev->worker);
1743 	}
1744 
1745 	drbd_md_sync(mdev);
1746 }
1747 
1748 
1749 static int drbd_thread_setup(void *arg)
1750 {
1751 	struct drbd_thread *thi = (struct drbd_thread *) arg;
1752 	struct drbd_conf *mdev = thi->mdev;
1753 	unsigned long flags;
1754 	int retval;
1755 
1756 restart:
1757 	retval = thi->function(thi);
1758 
1759 	spin_lock_irqsave(&thi->t_lock, flags);
1760 
1761 	/* if the receiver has been "Exiting", the last thing it did
1762 	 * was set the conn state to "StandAlone".
1763 	 * If a re-connect request comes in now, conn state goes C_UNCONNECTED,
1764 	 * and the receiver thread will be "started".
1765 	 * drbd_thread_start needs to set "Restarting" in that case.
1766 	 * t_state check and assignment need to be within the same spinlock,
1767 	 * so either thread_start sees Exiting, and can remap to Restarting,
1768 	 * or thread_start sees None, and can proceed as normal.
1769 	 */
1770 
1771 	if (thi->t_state == Restarting) {
1772 		dev_info(DEV, "Restarting %s\n", current->comm);
1773 		thi->t_state = Running;
1774 		spin_unlock_irqrestore(&thi->t_lock, flags);
1775 		goto restart;
1776 	}
1777 
1778 	thi->task = NULL;
1779 	thi->t_state = None;
1780 	smp_mb();
1781 	complete(&thi->stop);
1782 	spin_unlock_irqrestore(&thi->t_lock, flags);
1783 
1784 	dev_info(DEV, "Terminating %s\n", current->comm);
1785 
1786 	/* Release mod reference taken when thread was started */
1787 	module_put(THIS_MODULE);
1788 	return retval;
1789 }
1790 
1791 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1792 		      int (*func) (struct drbd_thread *))
1793 {
1794 	spin_lock_init(&thi->t_lock);
1795 	thi->task    = NULL;
1796 	thi->t_state = None;
1797 	thi->function = func;
1798 	thi->mdev = mdev;
1799 }
1800 
1801 int drbd_thread_start(struct drbd_thread *thi)
1802 {
1803 	struct drbd_conf *mdev = thi->mdev;
1804 	struct task_struct *nt;
1805 	unsigned long flags;
1806 
1807 	const char *me =
1808 		thi == &mdev->receiver ? "receiver" :
1809 		thi == &mdev->asender  ? "asender"  :
1810 		thi == &mdev->worker   ? "worker"   : "NONSENSE";
1811 
1812 	/* is used from the state engine doing drbd_thread_stop_nowait,
1813 	 * while holding the req lock irqsave */
1814 	spin_lock_irqsave(&thi->t_lock, flags);
1815 
1816 	switch (thi->t_state) {
1817 	case None:
1818 		dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1819 				me, current->comm, current->pid);
1820 
1821 		/* Get ref on module for thread - this is released when thread exits */
1822 		if (!try_module_get(THIS_MODULE)) {
1823 			dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1824 			spin_unlock_irqrestore(&thi->t_lock, flags);
1825 			return false;
1826 		}
1827 
1828 		init_completion(&thi->stop);
1829 		D_ASSERT(thi->task == NULL);
1830 		thi->reset_cpu_mask = 1;
1831 		thi->t_state = Running;
1832 		spin_unlock_irqrestore(&thi->t_lock, flags);
1833 		flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
1834 
1835 		nt = kthread_create(drbd_thread_setup, (void *) thi,
1836 				    "drbd%d_%s", mdev_to_minor(mdev), me);
1837 
1838 		if (IS_ERR(nt)) {
1839 			dev_err(DEV, "Couldn't start thread\n");
1840 
1841 			module_put(THIS_MODULE);
1842 			return false;
1843 		}
1844 		spin_lock_irqsave(&thi->t_lock, flags);
1845 		thi->task = nt;
1846 		thi->t_state = Running;
1847 		spin_unlock_irqrestore(&thi->t_lock, flags);
1848 		wake_up_process(nt);
1849 		break;
1850 	case Exiting:
1851 		thi->t_state = Restarting;
1852 		dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1853 				me, current->comm, current->pid);
1854 		/* fall through */
1855 	case Running:
1856 	case Restarting:
1857 	default:
1858 		spin_unlock_irqrestore(&thi->t_lock, flags);
1859 		break;
1860 	}
1861 
1862 	return true;
1863 }
1864 
1865 
1866 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1867 {
1868 	unsigned long flags;
1869 
1870 	enum drbd_thread_state ns = restart ? Restarting : Exiting;
1871 
1872 	/* may be called from state engine, holding the req lock irqsave */
1873 	spin_lock_irqsave(&thi->t_lock, flags);
1874 
1875 	if (thi->t_state == None) {
1876 		spin_unlock_irqrestore(&thi->t_lock, flags);
1877 		if (restart)
1878 			drbd_thread_start(thi);
1879 		return;
1880 	}
1881 
1882 	if (thi->t_state != ns) {
1883 		if (thi->task == NULL) {
1884 			spin_unlock_irqrestore(&thi->t_lock, flags);
1885 			return;
1886 		}
1887 
1888 		thi->t_state = ns;
1889 		smp_mb();
1890 		init_completion(&thi->stop);
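		/* kick the thread out of a possibly blocking call (network or
		 * disk), so it can notice the new t_state; it must not signal
		 * itself, hence the thi->task != current check below */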
1891 		if (thi->task != current)
1892 			force_sig(DRBD_SIGKILL, thi->task);
1893 
1894 	}
1895 
1896 	spin_unlock_irqrestore(&thi->t_lock, flags);
1897 
1898 	if (wait)
1899 		wait_for_completion(&thi->stop);
1900 }
1901 
1902 #ifdef CONFIG_SMP
1903 /**
1904  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1905  * @mdev:	DRBD device.
1906  *
1907  * Forces all threads of a device onto the same CPU. This is beneficial for
1908  * DRBD's performance. May be overridden by the user's configuration.
1909  */
1910 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1911 {
1912 	int ord, cpu;
1913 
1914 	/* user override. */
1915 	if (cpumask_weight(mdev->cpu_mask))
1916 		return;
1917 
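	/* Round-robin by minor number: e.g. with 4 online CPUs, minor 5
	 * gives ord == 1, so the second online CPU is chosen. */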
1918 	ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1919 	for_each_online_cpu(cpu) {
1920 		if (ord-- == 0) {
1921 			cpumask_set_cpu(cpu, mdev->cpu_mask);
1922 			return;
1923 		}
1924 	}
1925 	/* should not be reached */
1926 	cpumask_setall(mdev->cpu_mask);
1927 }
1928 
1929 /**
1930  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1931  * @mdev:	DRBD device.
1932  *
1933  * Call this in the "main loop" of _all_ threads; no mutex is needed, since
1934  * current won't die prematurely.
1935  */
1936 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1937 {
1938 	struct task_struct *p = current;
1939 	struct drbd_thread *thi =
1940 		p == mdev->asender.task  ? &mdev->asender  :
1941 		p == mdev->receiver.task ? &mdev->receiver :
1942 		p == mdev->worker.task   ? &mdev->worker   :
1943 		NULL;
1944 	ERR_IF(thi == NULL)
1945 		return;
1946 	if (!thi->reset_cpu_mask)
1947 		return;
1948 	thi->reset_cpu_mask = 0;
1949 	set_cpus_allowed_ptr(p, mdev->cpu_mask);
1950 }
1951 #endif
1952 
1953 /* the appropriate socket mutex must be held already */
1954 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1955 			  enum drbd_packets cmd, struct p_header80 *h,
1956 			  size_t size, unsigned msg_flags)
1957 {
1958 	int sent, ok;
1959 
1960 	ERR_IF(!h) return false;
1961 	ERR_IF(!size) return false;
1962 
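	/* fill in the on-wire header: magic, command code (be16) and the
	 * payload length (be16), i.e. size minus the header itself */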
1963 	h->magic   = BE_DRBD_MAGIC;
1964 	h->command = cpu_to_be16(cmd);
1965 	h->length  = cpu_to_be16(size-sizeof(struct p_header80));
1966 
1967 	sent = drbd_send(mdev, sock, h, size, msg_flags);
1968 
1969 	ok = (sent == size);
1970 	if (!ok && !signal_pending(current))
1971 		dev_warn(DEV, "short sent %s size=%d sent=%d\n",
1972 		    cmdname(cmd), (int)size, sent);
1973 	return ok;
1974 }
1975 
1976 /* don't pass the socket. we may only look at it
1977  * when we hold the appropriate socket mutex.
1978  */
1979 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1980 		  enum drbd_packets cmd, struct p_header80 *h, size_t size)
1981 {
1982 	int ok = 0;
1983 	struct socket *sock;
1984 
1985 	if (use_data_socket) {
1986 		mutex_lock(&mdev->data.mutex);
1987 		sock = mdev->data.socket;
1988 	} else {
1989 		mutex_lock(&mdev->meta.mutex);
1990 		sock = mdev->meta.socket;
1991 	}
1992 
1993 	/* drbd_disconnect() could have called drbd_free_sock()
1994 	 * while we were waiting in down()... */
1995 	if (likely(sock != NULL))
1996 		ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1997 
1998 	if (use_data_socket)
1999 		mutex_unlock(&mdev->data.mutex);
2000 	else
2001 		mutex_unlock(&mdev->meta.mutex);
2002 	return ok;
2003 }
2004 
2005 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
2006 		   size_t size)
2007 {
2008 	struct p_header80 h;
2009 	int ok;
2010 
2011 	h.magic   = BE_DRBD_MAGIC;
2012 	h.command = cpu_to_be16(cmd);
2013 	h.length  = cpu_to_be16(size);
2014 
2015 	if (!drbd_get_data_sock(mdev))
2016 		return 0;
2017 
2018 	ok = (sizeof(h) ==
2019 		drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
2020 	ok = ok && (size ==
2021 		drbd_send(mdev, mdev->data.socket, data, size, 0));
2022 
2023 	drbd_put_data_sock(mdev);
2024 
2025 	return ok;
2026 }
2027 
2028 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
2029 {
2030 	struct p_rs_param_95 *p;
2031 	struct socket *sock;
2032 	int size, rv;
2033 	const int apv = mdev->agreed_pro_version;
2034 
2035 	size = apv <= 87 ? sizeof(struct p_rs_param)
2036 		: apv == 88 ? sizeof(struct p_rs_param)
2037 			+ strlen(mdev->sync_conf.verify_alg) + 1
2038 		: apv <= 94 ? sizeof(struct p_rs_param_89)
2039 		: /* apv >= 95 */ sizeof(struct p_rs_param_95);
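	/* i.e.: apv <= 87 base struct only, 88 appends the verify_alg name,
	 * 89..94 use p_rs_param_89, 95+ use p_rs_param_95, which also carries
	 * the c_* settings filled in below */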
2040 
2041 	/* used from admin command context and receiver/worker context.
2042 	 * to avoid kmalloc, grab the socket right here,
2043 	 * then use the pre-allocated sbuf there */
2044 	mutex_lock(&mdev->data.mutex);
2045 	sock = mdev->data.socket;
2046 
2047 	if (likely(sock != NULL)) {
2048 		enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
2049 
2050 		p = &mdev->data.sbuf.rs_param_95;
2051 
2052 		/* initialize verify_alg and csums_alg */
2053 		memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2054 
2055 		p->rate = cpu_to_be32(sc->rate);
2056 		p->c_plan_ahead = cpu_to_be32(sc->c_plan_ahead);
2057 		p->c_delay_target = cpu_to_be32(sc->c_delay_target);
2058 		p->c_fill_target = cpu_to_be32(sc->c_fill_target);
2059 		p->c_max_rate = cpu_to_be32(sc->c_max_rate);
2060 
2061 		if (apv >= 88)
2062 			strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
2063 		if (apv >= 89)
2064 			strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
2065 
2066 		rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
2067 	} else
2068 		rv = 0; /* not ok */
2069 
2070 	mutex_unlock(&mdev->data.mutex);
2071 
2072 	return rv;
2073 }
2074 
2075 int drbd_send_protocol(struct drbd_conf *mdev)
2076 {
2077 	struct p_protocol *p;
2078 	int size, cf, rv;
2079 
2080 	size = sizeof(struct p_protocol);
2081 
2082 	if (mdev->agreed_pro_version >= 87)
2083 		size += strlen(mdev->net_conf->integrity_alg) + 1;
2084 
2085 	/* we must not recurse into our own queue,
2086 	 * as that is blocked during handshake */
2087 	p = kmalloc(size, GFP_NOIO);
2088 	if (p == NULL)
2089 		return 0;
2090 
2091 	p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
2092 	p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
2093 	p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
2094 	p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
2095 	p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
2096 
2097 	cf = 0;
2098 	if (mdev->net_conf->want_lose)
2099 		cf |= CF_WANT_LOSE;
2100 	if (mdev->net_conf->dry_run) {
2101 		if (mdev->agreed_pro_version >= 92)
2102 			cf |= CF_DRY_RUN;
2103 		else {
2104 			dev_err(DEV, "--dry-run is not supported by peer");
2105 			kfree(p);
2106 			return -1;
2107 		}
2108 	}
2109 	p->conn_flags    = cpu_to_be32(cf);
2110 
2111 	if (mdev->agreed_pro_version >= 87)
2112 		strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
2113 
2114 	rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
2115 			   (struct p_header80 *)p, size);
2116 	kfree(p);
2117 	return rv;
2118 }
2119 
2120 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
2121 {
2122 	struct p_uuids p;
2123 	int i;
2124 
2125 	if (!get_ldev_if_state(mdev, D_NEGOTIATING))
2126 		return 1;
2127 
2128 	for (i = UI_CURRENT; i < UI_SIZE; i++)
2129 		p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
2130 
2131 	mdev->comm_bm_set = drbd_bm_total_weight(mdev);
2132 	p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
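	/* flag bits sent to the peer: 1 == want_lose (discard-my-data),
	 * 2 == we were a crashed primary, 4 == new_state_tmp.disk is
	 * D_INCONSISTENT */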
2133 	uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
2134 	uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
2135 	uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
2136 	p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
2137 
2138 	put_ldev(mdev);
2139 
2140 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
2141 			     (struct p_header80 *)&p, sizeof(p));
2142 }
2143 
2144 int drbd_send_uuids(struct drbd_conf *mdev)
2145 {
2146 	return _drbd_send_uuids(mdev, 0);
2147 }
2148 
2149 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
2150 {
2151 	return _drbd_send_uuids(mdev, 8);
2152 }
2153 
2154 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
2155 {
2156 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
2157 		u64 *uuid = mdev->ldev->md.uuid;
2158 		dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
2159 		     text,
2160 		     (unsigned long long)uuid[UI_CURRENT],
2161 		     (unsigned long long)uuid[UI_BITMAP],
2162 		     (unsigned long long)uuid[UI_HISTORY_START],
2163 		     (unsigned long long)uuid[UI_HISTORY_END]);
2164 		put_ldev(mdev);
2165 	} else {
2166 		dev_info(DEV, "%s effective data uuid: %016llX\n",
2167 				text,
2168 				(unsigned long long)mdev->ed_uuid);
2169 	}
2170 }
2171 
2172 int drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
2173 {
2174 	struct p_rs_uuid p;
2175 	u64 uuid;
2176 
2177 	D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
2178 
2179 	uuid = mdev->ldev->md.uuid[UI_BITMAP];
2180 	if (uuid && uuid != UUID_JUST_CREATED)
2181 		uuid = uuid + UUID_NEW_BM_OFFSET;
2182 	else
2183 		get_random_bytes(&uuid, sizeof(u64));
2184 	drbd_uuid_set(mdev, UI_BITMAP, uuid);
2185 	drbd_print_uuids(mdev, "updated sync UUID");
2186 	drbd_md_sync(mdev);
2187 	p.uuid = cpu_to_be64(uuid);
2188 
2189 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
2190 			     (struct p_header80 *)&p, sizeof(p));
2191 }
2192 
2193 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
2194 {
2195 	struct p_sizes p;
2196 	sector_t d_size, u_size;
2197 	int q_order_type, max_bio_size;
2198 	int ok;
2199 
2200 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
2201 		D_ASSERT(mdev->ldev->backing_bdev);
2202 		d_size = drbd_get_max_capacity(mdev->ldev);
2203 		u_size = mdev->ldev->dc.disk_size;
2204 		q_order_type = drbd_queue_order_type(mdev);
2205 		max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
2206 		max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
2207 		put_ldev(mdev);
2208 	} else {
2209 		d_size = 0;
2210 		u_size = 0;
2211 		q_order_type = QUEUE_ORDERED_NONE;
2212 		max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
2213 	}
2214 
2215 	/* Never allow old drbd (up to 8.3.7) to see more than 32KiB */
2216 	if (mdev->agreed_pro_version <= 94)
2217 		max_bio_size = min_t(int, max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
2218 
2219 	p.d_size = cpu_to_be64(d_size);
2220 	p.u_size = cpu_to_be64(u_size);
2221 	p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
2222 	p.max_bio_size = cpu_to_be32(max_bio_size);
2223 	p.queue_order_type = cpu_to_be16(q_order_type);
2224 	p.dds_flags = cpu_to_be16(flags);
2225 
2226 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
2227 			   (struct p_header80 *)&p, sizeof(p));
2228 	return ok;
2229 }
2230 
2231 /**
2232  * drbd_send_current_state() - Sends the drbd state to the peer
2233  * @mdev:	DRBD device.
2234  */
2235 int drbd_send_current_state(struct drbd_conf *mdev)
2236 {
2237 	struct socket *sock;
2238 	struct p_state p;
2239 	int ok = 0;
2240 
2241 	/* Grab state lock so we wont send state if we're in the middle
2242 	 * of a cluster wide state change on another thread */
2243 	drbd_state_lock(mdev);
2244 
2245 	mutex_lock(&mdev->data.mutex);
2246 
2247 	p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
2248 	sock = mdev->data.socket;
2249 
2250 	if (likely(sock != NULL)) {
2251 		ok = _drbd_send_cmd(mdev, sock, P_STATE,
2252 				    (struct p_header80 *)&p, sizeof(p), 0);
2253 	}
2254 
2255 	mutex_unlock(&mdev->data.mutex);
2256 
2257 	drbd_state_unlock(mdev);
2258 	return ok;
2259 }
2260 
2261 /**
2262  * drbd_send_state() - After a state change, sends the new state to the peer
2263  * @mdev:	DRBD device.
2264  * @state:	the state to send, not necessarily the current state.
2265  *
2266  * Each state change queues an "after_state_ch" work, which will eventually
2267  * send the resulting new state to the peer. If more state changes happen
2268  * between queuing and processing of the after_state_ch work, we still
2269  * want to send each intermediary state in the order it occurred.
2270  */
2271 int drbd_send_state(struct drbd_conf *mdev, union drbd_state state)
2272 {
2273 	struct socket *sock;
2274 	struct p_state p;
2275 	int ok = 0;
2276 
2277 	mutex_lock(&mdev->data.mutex);
2278 
2279 	p.state = cpu_to_be32(state.i);
2280 	sock = mdev->data.socket;
2281 
2282 	if (likely(sock != NULL)) {
2283 		ok = _drbd_send_cmd(mdev, sock, P_STATE,
2284 				    (struct p_header80 *)&p, sizeof(p), 0);
2285 	}
2286 
2287 	mutex_unlock(&mdev->data.mutex);
2288 
2289 	return ok;
2290 }
2291 
2292 int drbd_send_state_req(struct drbd_conf *mdev,
2293 	union drbd_state mask, union drbd_state val)
2294 {
2295 	struct p_req_state p;
2296 
2297 	p.mask    = cpu_to_be32(mask.i);
2298 	p.val     = cpu_to_be32(val.i);
2299 
2300 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
2301 			     (struct p_header80 *)&p, sizeof(p));
2302 }
2303 
2304 int drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
2305 {
2306 	struct p_req_state_reply p;
2307 
2308 	p.retcode    = cpu_to_be32(retcode);
2309 
2310 	return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
2311 			     (struct p_header80 *)&p, sizeof(p));
2312 }
2313 
2314 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
2315 	struct p_compressed_bm *p,
2316 	struct bm_xfer_ctx *c)
2317 {
2318 	struct bitstream bs;
2319 	unsigned long plain_bits;
2320 	unsigned long tmp;
2321 	unsigned long rl;
2322 	unsigned len;
2323 	unsigned toggle;
2324 	int bits;
2325 
2326 	/* may we use this feature? */
2327 	if ((mdev->sync_conf.use_rle == 0) ||
2328 		(mdev->agreed_pro_version < 90))
2329 			return 0;
2330 
2331 	if (c->bit_offset >= c->bm_bits)
2332 		return 0; /* nothing to do. */
2333 
2334 	/* use at most thus many bytes */
2335 	bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
2336 	memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
2337 	/* plain bits covered in this code string */
2338 	plain_bits = 0;
2339 
2340 	/* p->encoding & 0x80 stores whether the first bit of the region is set.
2341 	 * The bit offset is implicit.
2342 	 * Start with toggle == 2 to be able to tell apart the first iteration. */
2343 	toggle = 2;
2344 
2345 	/* see how many plain bits we can stuff into one packet
2346 	 * using RLE and VLI. */
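	/* Illustrative example: a region starting 0001 1000 1... encodes as
	 * start flag 0 (first bit clear) followed by the VLI encoded run
	 * lengths 3 (clear), 2 (set), 3 (clear), ... */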
2347 	do {
2348 		tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
2349 				    : _drbd_bm_find_next(mdev, c->bit_offset);
2350 		if (tmp == -1UL)
2351 			tmp = c->bm_bits;
2352 		rl = tmp - c->bit_offset;
2353 
2354 		if (toggle == 2) { /* first iteration */
2355 			if (rl == 0) {
2356 				/* the first checked bit was set,
2357 				 * store start value, */
2358 				DCBP_set_start(p, 1);
2359 				/* but skip encoding of zero run length */
2360 				toggle = !toggle;
2361 				continue;
2362 			}
2363 			DCBP_set_start(p, 0);
2364 		}
2365 
2366 		/* paranoia: catch zero runlength.
2367 		 * can only happen if bitmap is modified while we scan it. */
2368 		if (rl == 0) {
2369 			dev_err(DEV, "unexpected zero runlength while encoding bitmap "
2370 			    "t:%u bo:%lu\n", toggle, c->bit_offset);
2371 			return -1;
2372 		}
2373 
2374 		bits = vli_encode_bits(&bs, rl);
2375 		if (bits == -ENOBUFS) /* buffer full */
2376 			break;
2377 		if (bits <= 0) {
2378 			dev_err(DEV, "error while encoding bitmap: %d\n", bits);
2379 			return 0;
2380 		}
2381 
2382 		toggle = !toggle;
2383 		plain_bits += rl;
2384 		c->bit_offset = tmp;
2385 	} while (c->bit_offset < c->bm_bits);
2386 
2387 	len = bs.cur.b - p->code + !!bs.cur.bit;
2388 
2389 	if (plain_bits < (len << 3)) {
2390 		/* incompressible with this method.
2391 		 * we need to rewind both word and bit position. */
2392 		c->bit_offset -= plain_bits;
2393 		bm_xfer_ctx_bit_to_word_offset(c);
2394 		c->bit_offset = c->word_offset * BITS_PER_LONG;
2395 		return 0;
2396 	}
2397 
2398 	/* RLE + VLI was able to compress it just fine.
2399 	 * update c->word_offset. */
2400 	bm_xfer_ctx_bit_to_word_offset(c);
2401 
2402 	/* store pad_bits */
2403 	DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
2404 
2405 	return len;
2406 }
2407 
2408 /**
2409  * send_bitmap_rle_or_plain
2410  *
2411  * Return 0 when done, 1 when another iteration is needed, and a negative error
2412  * code upon failure.
2413  */
2414 static int
2415 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
2416 			 struct p_header80 *h, struct bm_xfer_ctx *c)
2417 {
2418 	struct p_compressed_bm *p = (void *)h;
2419 	unsigned long num_words;
2420 	int len;
2421 	int ok;
2422 
2423 	len = fill_bitmap_rle_bits(mdev, p, c);
2424 
2425 	if (len < 0)
2426 		return -EIO;
2427 
2428 	if (len) {
2429 		DCBP_set_code(p, RLE_VLI_Bits);
2430 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
2431 			sizeof(*p) + len, 0);
2432 
2433 		c->packets[0]++;
2434 		c->bytes[0] += sizeof(*p) + len;
2435 
2436 		if (c->bit_offset >= c->bm_bits)
2437 			len = 0; /* DONE */
2438 	} else {
2439 		/* was not compressible.
2440 		 * send a buffer full of plain text bits instead. */
2441 		num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
2442 		len = num_words * sizeof(long);
2443 		if (len)
2444 			drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long *)h->payload);
2445 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
2446 				   h, sizeof(struct p_header80) + len, 0);
2447 		c->word_offset += num_words;
2448 		c->bit_offset = c->word_offset * BITS_PER_LONG;
2449 
2450 		c->packets[1]++;
2451 		c->bytes[1] += sizeof(struct p_header80) + len;
2452 
2453 		if (c->bit_offset > c->bm_bits)
2454 			c->bit_offset = c->bm_bits;
2455 	}
2456 	if (ok) {
2457 		if (len == 0) {
2458 			INFO_bm_xfer_stats(mdev, "send", c);
2459 			return 0;
2460 		} else
2461 			return 1;
2462 	}
2463 	return -EIO;
2464 }
2465 
2466 /* See the comment at receive_bitmap() */
2467 int _drbd_send_bitmap(struct drbd_conf *mdev)
2468 {
2469 	struct bm_xfer_ctx c;
2470 	struct p_header80 *p;
2471 	int err;
2472 
2473 	ERR_IF(!mdev->bitmap) return false;
2474 
2475 	/* maybe we should use some per thread scratch page,
2476 	 * and allocate that during initial device creation? */
2477 	p = (struct p_header80 *) __get_free_page(GFP_NOIO);
2478 	if (!p) {
2479 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2480 		return false;
2481 	}
2482 
2483 	if (get_ldev(mdev)) {
2484 		if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2485 			dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2486 			drbd_bm_set_all(mdev);
2487 			if (drbd_bm_write(mdev)) {
2488 				/* write_bm did fail! Leave full sync flag set in Meta P_DATA
2489 				 * but otherwise process as per normal - need to tell other
2490 				 * side that a full resync is required! */
2491 				dev_err(DEV, "Failed to write bitmap to disk!\n");
2492 			} else {
2493 				drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2494 				drbd_md_sync(mdev);
2495 			}
2496 		}
2497 		put_ldev(mdev);
2498 	}
2499 
2500 	c = (struct bm_xfer_ctx) {
2501 		.bm_bits = drbd_bm_bits(mdev),
2502 		.bm_words = drbd_bm_words(mdev),
2503 	};
2504 
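	/* stream the bitmap: send_bitmap_rle_or_plain() returns 1 while more
	 * packets are needed, 0 once done, and a negative value on error */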
2505 	do {
2506 		err = send_bitmap_rle_or_plain(mdev, p, &c);
2507 	} while (err > 0);
2508 
2509 	free_page((unsigned long) p);
2510 	return err == 0;
2511 }
2512 
2513 int drbd_send_bitmap(struct drbd_conf *mdev)
2514 {
2515 	int err;
2516 
2517 	if (!drbd_get_data_sock(mdev))
2518 		return -1;
2519 	err = !_drbd_send_bitmap(mdev);
2520 	drbd_put_data_sock(mdev);
2521 	return err;
2522 }
2523 
2524 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2525 {
2526 	int ok;
2527 	struct p_barrier_ack p;
2528 
2529 	p.barrier  = barrier_nr;
2530 	p.set_size = cpu_to_be32(set_size);
2531 
2532 	if (mdev->state.conn < C_CONNECTED)
2533 		return false;
2534 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2535 			(struct p_header80 *)&p, sizeof(p));
2536 	return ok;
2537 }
2538 
2539 /**
2540  * _drbd_send_ack() - Sends an ack packet
2541  * @mdev:	DRBD device.
2542  * @cmd:	Packet command code.
2543  * @sector:	sector, needs to be in big endian byte order
2544  * @blksize:	size in bytes, needs to be in big endian byte order
2545  * @block_id:	Id, big endian byte order
2546  */
2547 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2548 			  u64 sector,
2549 			  u32 blksize,
2550 			  u64 block_id)
2551 {
2552 	int ok;
2553 	struct p_block_ack p;
2554 
2555 	p.sector   = sector;
2556 	p.block_id = block_id;
2557 	p.blksize  = blksize;
2558 	p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2559 
2560 	if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2561 		return false;
2562 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2563 				(struct p_header80 *)&p, sizeof(p));
2564 	return ok;
2565 }
2566 
2567 /* dp->sector and dp->block_id already/still in network byte order,
2568  * data_size is payload size according to dp->head,
2569  * and may need to be corrected for digest size. */
2570 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2571 		     struct p_data *dp, int data_size)
2572 {
2573 	data_size -= (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
2574 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
2575 	return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2576 			      dp->block_id);
2577 }
2578 
2579 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2580 		     struct p_block_req *rp)
2581 {
2582 	return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2583 }
2584 
2585 /**
2586  * drbd_send_ack() - Sends an ack packet
2587  * @mdev:	DRBD device.
2588  * @cmd:	Packet command code.
2589  * @e:		Epoch entry.
2590  */
2591 int drbd_send_ack(struct drbd_conf *mdev,
2592 	enum drbd_packets cmd, struct drbd_epoch_entry *e)
2593 {
2594 	return _drbd_send_ack(mdev, cmd,
2595 			      cpu_to_be64(e->sector),
2596 			      cpu_to_be32(e->size),
2597 			      e->block_id);
2598 }
2599 
2600 /* This function misuses the block_id field to signal if the blocks
2601  * are in sync or not. */
2602 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2603 		     sector_t sector, int blksize, u64 block_id)
2604 {
2605 	return _drbd_send_ack(mdev, cmd,
2606 			      cpu_to_be64(sector),
2607 			      cpu_to_be32(blksize),
2608 			      cpu_to_be64(block_id));
2609 }
2610 
2611 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2612 		       sector_t sector, int size, u64 block_id)
2613 {
2614 	int ok;
2615 	struct p_block_req p;
2616 
2617 	p.sector   = cpu_to_be64(sector);
2618 	p.block_id = block_id;
2619 	p.blksize  = cpu_to_be32(size);
2620 
2621 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2622 				(struct p_header80 *)&p, sizeof(p));
2623 	return ok;
2624 }
2625 
2626 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2627 			    sector_t sector, int size,
2628 			    void *digest, int digest_size,
2629 			    enum drbd_packets cmd)
2630 {
2631 	int ok;
2632 	struct p_block_req p;
2633 
2634 	p.sector   = cpu_to_be64(sector);
2635 	p.block_id = BE_DRBD_MAGIC + 0xbeef;
2636 	p.blksize  = cpu_to_be32(size);
2637 
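	/* build the header by hand: the announced length also covers the
	 * digest, which is sent with a second drbd_send() below */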
2638 	p.head.magic   = BE_DRBD_MAGIC;
2639 	p.head.command = cpu_to_be16(cmd);
2640 	p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + digest_size);
2641 
2642 	mutex_lock(&mdev->data.mutex);
2643 
2644 	ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2645 	ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2646 
2647 	mutex_unlock(&mdev->data.mutex);
2648 
2649 	return ok;
2650 }
2651 
2652 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2653 {
2654 	int ok;
2655 	struct p_block_req p;
2656 
2657 	p.sector   = cpu_to_be64(sector);
2658 	p.block_id = BE_DRBD_MAGIC + 0xbabe;
2659 	p.blksize  = cpu_to_be32(size);
2660 
2661 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2662 			   (struct p_header80 *)&p, sizeof(p));
2663 	return ok;
2664 }
2665 
2666 /* called on sndtimeo
2667  * returns false if we should retry,
2668  * true if we think the connection is dead
2669  */
2670 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2671 {
2672 	int drop_it;
2673 	/* long elapsed = (long)(jiffies - mdev->last_received); */
2674 
2675 	drop_it =   mdev->meta.socket == sock
2676 		|| !mdev->asender.task
2677 		|| get_t_state(&mdev->asender) != Running
2678 		|| mdev->state.conn < C_CONNECTED;
2679 
2680 	if (drop_it)
2681 		return true;
2682 
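	/* not obviously dead: count down ko_count (reset to its configured
	 * value in drbd_send()) and ask the peer for a ping; only once it
	 * hits zero do we give up */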
2683 	drop_it = !--mdev->ko_count;
2684 	if (!drop_it) {
2685 		dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2686 		       current->comm, current->pid, mdev->ko_count);
2687 		request_ping(mdev);
2688 	}
2689 
2690 	return drop_it; /* && (mdev->state == R_PRIMARY) */;
2691 }
2692 
2693 /* The idea of sendpage seems to be to put some kind of reference
2694  * to the page into the skb, and to hand it over to the NIC. In
2695  * this process get_page() gets called.
2696  *
2697  * As soon as the page was really sent over the network put_page()
2698  * gets called by some part of the network layer. [ NIC driver? ]
2699  *
2700  * [ get_page() / put_page() increment/decrement the count. If count
2701  *   reaches 0 the page will be freed. ]
2702  *
2703  * This works nicely with pages from FSs.
2704  * But this means that in protocol A we might signal IO completion too early!
2705  *
2706  * In order not to corrupt data during a resync we must make sure
2707  * that we do not reuse our own buffer pages (EEs) too early, therefore
2708  * we have the net_ee list.
2709  *
2710  * XFS seems to have problems, still, it submits pages with page_count == 0!
2711  * As a workaround, we disable sendpage on pages
2712  * with page_count == 0 or PageSlab.
2713  */
2714 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2715 		   int offset, size_t size, unsigned msg_flags)
2716 {
2717 	int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, msg_flags);
2718 	kunmap(page);
2719 	if (sent == size)
2720 		mdev->send_cnt += size>>9;
2721 	return sent == size;
2722 }
2723 
2724 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2725 		    int offset, size_t size, unsigned msg_flags)
2726 {
2727 	mm_segment_t oldfs = get_fs();
2728 	int sent, ok;
2729 	int len = size;
2730 
2731 	/* e.g. XFS meta- & log-data is in slab pages, which have a
2732 	 * page_count of 0 and/or have PageSlab() set.
2733 	 * we cannot use send_page for those, as that does get_page();
2734 	 * put_page(); and would cause either a VM_BUG directly, or
2735 	 * __page_cache_release a page that would actually still be referenced
2736 	 * by someone, leading to some obscure delayed Oops somewhere else. */
2737 	if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2738 		return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
2739 
2740 	msg_flags |= MSG_NOSIGNAL;
2741 	drbd_update_congested(mdev);
2742 	set_fs(KERNEL_DS);
2743 	do {
2744 		sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2745 							offset, len,
2746 							msg_flags);
2747 		if (sent == -EAGAIN) {
2748 			if (we_should_drop_the_connection(mdev,
2749 							  mdev->data.socket))
2750 				break;
2751 			else
2752 				continue;
2753 		}
2754 		if (sent <= 0) {
2755 			dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2756 			     __func__, (int)size, len, sent);
2757 			break;
2758 		}
2759 		len    -= sent;
2760 		offset += sent;
2761 	} while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2762 	set_fs(oldfs);
2763 	clear_bit(NET_CONGESTED, &mdev->flags);
2764 
2765 	ok = (len == 0);
2766 	if (likely(ok))
2767 		mdev->send_cnt += size>>9;
2768 	return ok;
2769 }
2770 
2771 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2772 {
2773 	struct bio_vec *bvec;
2774 	int i;
2775 	/* hint all but last page with MSG_MORE */
2776 	bio_for_each_segment(bvec, bio, i) {
2777 		if (!_drbd_no_send_page(mdev, bvec->bv_page,
2778 				     bvec->bv_offset, bvec->bv_len,
2779 				     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2780 			return 0;
2781 	}
2782 	return 1;
2783 }
2784 
2785 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2786 {
2787 	struct bio_vec *bvec;
2788 	int i;
2789 	/* hint all but last page with MSG_MORE */
2790 	bio_for_each_segment(bvec, bio, i) {
2791 		if (!_drbd_send_page(mdev, bvec->bv_page,
2792 				     bvec->bv_offset, bvec->bv_len,
2793 				     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2794 			return 0;
2795 	}
2796 	return 1;
2797 }
2798 
2799 static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2800 {
2801 	struct page *page = e->pages;
2802 	unsigned len = e->size;
2803 	/* hint all but last page with MSG_MORE */
2804 	page_chain_for_each(page) {
2805 		unsigned l = min_t(unsigned, len, PAGE_SIZE);
2806 		if (!_drbd_send_page(mdev, page, 0, l,
2807 				page_chain_next(page) ? MSG_MORE : 0))
2808 			return 0;
2809 		len -= l;
2810 	}
2811 	return 1;
2812 }
2813 
2814 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
2815 {
2816 	if (mdev->agreed_pro_version >= 95)
2817 		return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
2818 			(bi_rw & REQ_FUA ? DP_FUA : 0) |
2819 			(bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
2820 			(bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
2821 	else
2822 		return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
2823 }
2824 
2825 /* Used to send write requests
2826  * R_PRIMARY -> Peer	(P_DATA)
2827  */
2828 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2829 {
2830 	int ok = 1;
2831 	struct p_data p;
2832 	unsigned int dp_flags = 0;
2833 	void *dgb;
2834 	int dgs;
2835 
2836 	if (!drbd_get_data_sock(mdev))
2837 		return 0;
2838 
2839 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2840 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2841 
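	/* pick the header variant: h80 only has a 16 bit length field,
	 * larger payloads need h95 with its 32 bit length */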
2842 	if (req->size <= DRBD_MAX_SIZE_H80_PACKET) {
2843 		p.head.h80.magic   = BE_DRBD_MAGIC;
2844 		p.head.h80.command = cpu_to_be16(P_DATA);
2845 		p.head.h80.length  =
2846 			cpu_to_be16(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2847 	} else {
2848 		p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2849 		p.head.h95.command = cpu_to_be16(P_DATA);
2850 		p.head.h95.length  =
2851 			cpu_to_be32(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2852 	}
2853 
2854 	p.sector   = cpu_to_be64(req->sector);
2855 	p.block_id = (unsigned long)req;
2856 	p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2857 
2858 	dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
2859 
2860 	if (mdev->state.conn >= C_SYNC_SOURCE &&
2861 	    mdev->state.conn <= C_PAUSED_SYNC_T)
2862 		dp_flags |= DP_MAY_SET_IN_SYNC;
2863 
2864 	p.dp_flags = cpu_to_be32(dp_flags);
2865 	set_bit(UNPLUG_REMOTE, &mdev->flags);
2866 	ok = (sizeof(p) ==
2867 		drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
2868 	if (ok && dgs) {
2869 		dgb = mdev->int_dig_out;
2870 		drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2871 		ok = dgs == drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2872 	}
2873 	if (ok) {
2874 		/* For protocol A, we have to memcpy the payload into
2875 		 * socket buffers, as we may complete right away
2876 		 * as soon as we handed it over to tcp, at which point the data
2877 		 * pages may become invalid.
2878 		 *
2879 		 * For data-integrity enabled, we copy it as well, so we can be
2880 		 * sure that even if the bio pages may still be modified, it
2881 		 * won't change the data on the wire, thus if the digest checks
2882 		 * out ok after sending on this side, but does not fit on the
2883 		 * receiving side, we sure have detected corruption elsewhere.
2884 		 */
2885 		if (mdev->net_conf->wire_protocol == DRBD_PROT_A || dgs)
2886 			ok = _drbd_send_bio(mdev, req->master_bio);
2887 		else
2888 			ok = _drbd_send_zc_bio(mdev, req->master_bio);
2889 
2890 		/* double check digest, sometimes buffers have been modified in flight. */
2891 		if (dgs > 0 && dgs <= 64) {
2892 			/* 64 byte, 512 bit, is the largest digest size
2893 			 * currently supported in kernel crypto. */
2894 			unsigned char digest[64];
2895 			drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, digest);
2896 			if (memcmp(mdev->int_dig_out, digest, dgs)) {
2897 				dev_warn(DEV,
2898 					"Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
2899 					(unsigned long long)req->sector, req->size);
2900 			}
2901 		} /* else if (dgs > 64) {
2902 		     ... Be noisy about digest too large ...
2903 		} */
2904 	}
2905 
2906 	drbd_put_data_sock(mdev);
2907 
2908 	return ok;
2909 }
2910 
2911 /* answer packet, used to send data back for read requests:
2912  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2913  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2914  */
2915 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2916 		    struct drbd_epoch_entry *e)
2917 {
2918 	int ok;
2919 	struct p_data p;
2920 	void *dgb;
2921 	int dgs;
2922 
2923 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2924 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2925 
2926 	if (e->size <= DRBD_MAX_SIZE_H80_PACKET) {
2927 		p.head.h80.magic   = BE_DRBD_MAGIC;
2928 		p.head.h80.command = cpu_to_be16(cmd);
2929 		p.head.h80.length  =
2930 			cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2931 	} else {
2932 		p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2933 		p.head.h95.command = cpu_to_be16(cmd);
2934 		p.head.h95.length  =
2935 			cpu_to_be32(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2936 	}
2937 
2938 	p.sector   = cpu_to_be64(e->sector);
2939 	p.block_id = e->block_id;
2940 	/* p.seq_num  = 0;    No sequence numbers here.. */
2941 
2942 	/* Only called by our kernel thread.
2943 	 * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2944 	 * in response to admin command or module unload.
2945 	 */
2946 	if (!drbd_get_data_sock(mdev))
2947 		return 0;
2948 
2949 	ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
2950 	if (ok && dgs) {
2951 		dgb = mdev->int_dig_out;
2952 		drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2953 		ok = dgs == drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2954 	}
2955 	if (ok)
2956 		ok = _drbd_send_zc_ee(mdev, e);
2957 
2958 	drbd_put_data_sock(mdev);
2959 
2960 	return ok;
2961 }
2962 
2963 int drbd_send_oos(struct drbd_conf *mdev, struct drbd_request *req)
2964 {
2965 	struct p_block_desc p;
2966 
2967 	p.sector  = cpu_to_be64(req->sector);
2968 	p.blksize = cpu_to_be32(req->size);
2969 
2970 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OUT_OF_SYNC, &p.head, sizeof(p));
2971 }
2972 
2973 /*
2974   drbd_send distinguishes two cases:
2975 
2976   Packets sent via the data socket "sock"
2977   and packets sent via the meta data socket "msock"
2978 
2979 		    sock                      msock
2980   -----------------+-------------------------+------------------------------
2981   timeout           conf.timeout / 2          conf.timeout / 2
2982   timeout action    send a ping via msock     Abort communication
2983 					      and close all sockets
2984 */
2985 
2986 /*
2987  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2988  */
2989 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2990 	      void *buf, size_t size, unsigned msg_flags)
2991 {
2992 	struct kvec iov;
2993 	struct msghdr msg;
2994 	int rv, sent = 0;
2995 
2996 	if (!sock)
2997 		return -1000;
2998 
2999 	/* THINK  if (signal_pending) return ... ? */
3000 
3001 	iov.iov_base = buf;
3002 	iov.iov_len  = size;
3003 
3004 	msg.msg_name       = NULL;
3005 	msg.msg_namelen    = 0;
3006 	msg.msg_control    = NULL;
3007 	msg.msg_controllen = 0;
3008 	msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
3009 
3010 	if (sock == mdev->data.socket) {
3011 		mdev->ko_count = mdev->net_conf->ko_count;
3012 		drbd_update_congested(mdev);
3013 	}
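	/* keep sending until the whole buffer is out;
	 * partial sends just advance the iovec */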
3014 	do {
3015 		/* STRANGE
3016 		 * tcp_sendmsg does _not_ use its size parameter at all ?
3017 		 *
3018 		 * -EAGAIN on timeout, -EINTR on signal.
3019 		 */
3020 /* THINK
3021  * do we need to block DRBD_SIG if sock == &meta.socket ??
3022  * otherwise wake_asender() might interrupt some send_*Ack !
3023  */
3024 		rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
3025 		if (rv == -EAGAIN) {
3026 			if (we_should_drop_the_connection(mdev, sock))
3027 				break;
3028 			else
3029 				continue;
3030 		}
3031 		D_ASSERT(rv != 0);
3032 		if (rv == -EINTR) {
3033 			flush_signals(current);
3034 			rv = 0;
3035 		}
3036 		if (rv < 0)
3037 			break;
3038 		sent += rv;
3039 		iov.iov_base += rv;
3040 		iov.iov_len  -= rv;
3041 	} while (sent < size);
3042 
3043 	if (sock == mdev->data.socket)
3044 		clear_bit(NET_CONGESTED, &mdev->flags);
3045 
3046 	if (rv <= 0) {
3047 		if (rv != -EAGAIN) {
3048 			dev_err(DEV, "%s_sendmsg returned %d\n",
3049 			    sock == mdev->meta.socket ? "msock" : "sock",
3050 			    rv);
3051 			drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
3052 		} else
3053 			drbd_force_state(mdev, NS(conn, C_TIMEOUT));
3054 	}
3055 
3056 	return sent;
3057 }
3058 
3059 static int drbd_open(struct block_device *bdev, fmode_t mode)
3060 {
3061 	struct drbd_conf *mdev = bdev->bd_disk->private_data;
3062 	unsigned long flags;
3063 	int rv = 0;
3064 
3065 	mutex_lock(&drbd_main_mutex);
3066 	spin_lock_irqsave(&mdev->req_lock, flags);
3067 	/* to have a stable mdev->state.role
3068 	 * and no race with updating open_cnt */
3069 
3070 	if (mdev->state.role != R_PRIMARY) {
3071 		if (mode & FMODE_WRITE)
3072 			rv = -EROFS;
3073 		else if (!allow_oos)
3074 			rv = -EMEDIUMTYPE;
3075 	}
3076 
3077 	if (!rv)
3078 		mdev->open_cnt++;
3079 	spin_unlock_irqrestore(&mdev->req_lock, flags);
3080 	mutex_unlock(&drbd_main_mutex);
3081 
3082 	return rv;
3083 }
3084 
3085 static int drbd_release(struct gendisk *gd, fmode_t mode)
3086 {
3087 	struct drbd_conf *mdev = gd->private_data;
3088 	mutex_lock(&drbd_main_mutex);
3089 	mdev->open_cnt--;
3090 	mutex_unlock(&drbd_main_mutex);
3091 	return 0;
3092 }
3093 
3094 static void drbd_set_defaults(struct drbd_conf *mdev)
3095 {
3096 	/* This way we get a compile error when sync_conf grows,
3097 	   and we forget to initialize it here */
3098 	mdev->sync_conf = (struct syncer_conf) {
3099 		/* .rate = */		DRBD_RATE_DEF,
3100 		/* .after = */		DRBD_AFTER_DEF,
3101 		/* .al_extents = */	DRBD_AL_EXTENTS_DEF,
3102 		/* .verify_alg = */	{}, 0,
3103 		/* .cpu_mask = */	{}, 0,
3104 		/* .csums_alg = */	{}, 0,
3105 		/* .use_rle = */	0,
3106 		/* .on_no_data = */	DRBD_ON_NO_DATA_DEF,
3107 		/* .c_plan_ahead = */	DRBD_C_PLAN_AHEAD_DEF,
3108 		/* .c_delay_target = */	DRBD_C_DELAY_TARGET_DEF,
3109 		/* .c_fill_target = */	DRBD_C_FILL_TARGET_DEF,
3110 		/* .c_max_rate = */	DRBD_C_MAX_RATE_DEF,
3111 		/* .c_min_rate = */	DRBD_C_MIN_RATE_DEF
3112 	};
3113 
3114 	/* Have to use that way, because the layout differs between
3115 	   big endian and little endian */
3116 	mdev->state = (union drbd_state) {
3117 		{ .role = R_SECONDARY,
3118 		  .peer = R_UNKNOWN,
3119 		  .conn = C_STANDALONE,
3120 		  .disk = D_DISKLESS,
3121 		  .pdsk = D_UNKNOWN,
3122 		  .susp = 0,
3123 		  .susp_nod = 0,
3124 		  .susp_fen = 0
3125 		} };
3126 }
3127 
3128 void drbd_init_set_defaults(struct drbd_conf *mdev)
3129 {
3130 	/* the memset(,0,) did most of this.
3131 	 * note: only assignments, no allocation in here */
3132 
3133 	drbd_set_defaults(mdev);
3134 
3135 	atomic_set(&mdev->ap_bio_cnt, 0);
3136 	atomic_set(&mdev->ap_pending_cnt, 0);
3137 	atomic_set(&mdev->rs_pending_cnt, 0);
3138 	atomic_set(&mdev->unacked_cnt, 0);
3139 	atomic_set(&mdev->local_cnt, 0);
3140 	atomic_set(&mdev->net_cnt, 0);
3141 	atomic_set(&mdev->packet_seq, 0);
3142 	atomic_set(&mdev->pp_in_use, 0);
3143 	atomic_set(&mdev->pp_in_use_by_net, 0);
3144 	atomic_set(&mdev->rs_sect_in, 0);
3145 	atomic_set(&mdev->rs_sect_ev, 0);
3146 	atomic_set(&mdev->ap_in_flight, 0);
3147 	atomic_set(&mdev->md_io_in_use, 0);
3148 
3149 	mutex_init(&mdev->data.mutex);
3150 	mutex_init(&mdev->meta.mutex);
3151 	sema_init(&mdev->data.work.s, 0);
3152 	sema_init(&mdev->meta.work.s, 0);
3153 	mutex_init(&mdev->state_mutex);
3154 
3155 	spin_lock_init(&mdev->data.work.q_lock);
3156 	spin_lock_init(&mdev->meta.work.q_lock);
3157 
3158 	spin_lock_init(&mdev->al_lock);
3159 	spin_lock_init(&mdev->req_lock);
3160 	spin_lock_init(&mdev->peer_seq_lock);
3161 	spin_lock_init(&mdev->epoch_lock);
3162 
3163 	INIT_LIST_HEAD(&mdev->active_ee);
3164 	INIT_LIST_HEAD(&mdev->sync_ee);
3165 	INIT_LIST_HEAD(&mdev->done_ee);
3166 	INIT_LIST_HEAD(&mdev->read_ee);
3167 	INIT_LIST_HEAD(&mdev->net_ee);
3168 	INIT_LIST_HEAD(&mdev->resync_reads);
3169 	INIT_LIST_HEAD(&mdev->data.work.q);
3170 	INIT_LIST_HEAD(&mdev->meta.work.q);
3171 	INIT_LIST_HEAD(&mdev->resync_work.list);
3172 	INIT_LIST_HEAD(&mdev->unplug_work.list);
3173 	INIT_LIST_HEAD(&mdev->go_diskless.list);
3174 	INIT_LIST_HEAD(&mdev->md_sync_work.list);
3175 	INIT_LIST_HEAD(&mdev->start_resync_work.list);
3176 	INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
3177 
3178 	mdev->resync_work.cb  = w_resync_timer;
3179 	mdev->unplug_work.cb  = w_send_write_hint;
3180 	mdev->go_diskless.cb  = w_go_diskless;
3181 	mdev->md_sync_work.cb = w_md_sync;
3182 	mdev->bm_io_work.w.cb = w_bitmap_io;
3183 	mdev->start_resync_work.cb = w_start_resync;
3184 	init_timer(&mdev->resync_timer);
3185 	init_timer(&mdev->md_sync_timer);
3186 	init_timer(&mdev->start_resync_timer);
3187 	init_timer(&mdev->request_timer);
3188 	mdev->resync_timer.function = resync_timer_fn;
3189 	mdev->resync_timer.data = (unsigned long) mdev;
3190 	mdev->md_sync_timer.function = md_sync_timer_fn;
3191 	mdev->md_sync_timer.data = (unsigned long) mdev;
3192 	mdev->start_resync_timer.function = start_resync_timer_fn;
3193 	mdev->start_resync_timer.data = (unsigned long) mdev;
3194 	mdev->request_timer.function = request_timer_fn;
3195 	mdev->request_timer.data = (unsigned long) mdev;
3196 
3197 	init_waitqueue_head(&mdev->misc_wait);
3198 	init_waitqueue_head(&mdev->state_wait);
3199 	init_waitqueue_head(&mdev->net_cnt_wait);
3200 	init_waitqueue_head(&mdev->ee_wait);
3201 	init_waitqueue_head(&mdev->al_wait);
3202 	init_waitqueue_head(&mdev->seq_wait);
3203 
3204 	drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
3205 	drbd_thread_init(mdev, &mdev->worker, drbd_worker);
3206 	drbd_thread_init(mdev, &mdev->asender, drbd_asender);
3207 
3208 	mdev->agreed_pro_version = PRO_VERSION_MAX;
3209 	mdev->write_ordering = WO_bdev_flush;
3210 	mdev->resync_wenr = LC_FREE;
3211 	mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
3212 	mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
3213 }
3214 
3215 void drbd_mdev_cleanup(struct drbd_conf *mdev)
3216 {
3217 	int i;
3218 	if (mdev->receiver.t_state != None)
3219 		dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
3220 				mdev->receiver.t_state);
3221 
3222 	/* no need to lock it, I'm the only thread alive */
3223 	if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
3224 		dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
3225 	mdev->al_writ_cnt  =
3226 	mdev->bm_writ_cnt  =
3227 	mdev->read_cnt     =
3228 	mdev->recv_cnt     =
3229 	mdev->send_cnt     =
3230 	mdev->writ_cnt     =
3231 	mdev->p_size       =
3232 	mdev->rs_start     =
3233 	mdev->rs_total     =
3234 	mdev->rs_failed    = 0;
3235 	mdev->rs_last_events = 0;
3236 	mdev->rs_last_sect_ev = 0;
3237 	for (i = 0; i < DRBD_SYNC_MARKS; i++) {
3238 		mdev->rs_mark_left[i] = 0;
3239 		mdev->rs_mark_time[i] = 0;
3240 	}
3241 	D_ASSERT(mdev->net_conf == NULL);
3242 
3243 	drbd_set_my_capacity(mdev, 0);
3244 	if (mdev->bitmap) {
3245 		/* maybe never allocated. */
3246 		drbd_bm_resize(mdev, 0, 1);
3247 		drbd_bm_cleanup(mdev);
3248 	}
3249 
3250 	drbd_free_resources(mdev);
3251 	clear_bit(AL_SUSPENDED, &mdev->flags);
3252 
3253 	/*
3254 	 * currently we call drbd_init_ee only on module load, so
3255 	 * we may call drbd_release_ee only on module unload!
3256 	 */
3257 	D_ASSERT(list_empty(&mdev->active_ee));
3258 	D_ASSERT(list_empty(&mdev->sync_ee));
3259 	D_ASSERT(list_empty(&mdev->done_ee));
3260 	D_ASSERT(list_empty(&mdev->read_ee));
3261 	D_ASSERT(list_empty(&mdev->net_ee));
3262 	D_ASSERT(list_empty(&mdev->resync_reads));
3263 	D_ASSERT(list_empty(&mdev->data.work.q));
3264 	D_ASSERT(list_empty(&mdev->meta.work.q));
3265 	D_ASSERT(list_empty(&mdev->resync_work.list));
3266 	D_ASSERT(list_empty(&mdev->unplug_work.list));
3267 	D_ASSERT(list_empty(&mdev->go_diskless.list));
3268 
3269 	drbd_set_defaults(mdev);
3270 }
3271 
3272 
3273 static void drbd_destroy_mempools(void)
3274 {
3275 	struct page *page;
3276 
3277 	while (drbd_pp_pool) {
3278 		page = drbd_pp_pool;
3279 		drbd_pp_pool = (struct page *)page_private(page);
3280 		__free_page(page);
3281 		drbd_pp_vacant--;
3282 	}
3283 
3284 	/* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
3285 
3286 	if (drbd_md_io_bio_set)
3287 		bioset_free(drbd_md_io_bio_set);
3288 	if (drbd_md_io_page_pool)
3289 		mempool_destroy(drbd_md_io_page_pool);
3290 	if (drbd_ee_mempool)
3291 		mempool_destroy(drbd_ee_mempool);
3292 	if (drbd_request_mempool)
3293 		mempool_destroy(drbd_request_mempool);
3294 	if (drbd_ee_cache)
3295 		kmem_cache_destroy(drbd_ee_cache);
3296 	if (drbd_request_cache)
3297 		kmem_cache_destroy(drbd_request_cache);
3298 	if (drbd_bm_ext_cache)
3299 		kmem_cache_destroy(drbd_bm_ext_cache);
3300 	if (drbd_al_ext_cache)
3301 		kmem_cache_destroy(drbd_al_ext_cache);
3302 
3303 	drbd_md_io_bio_set   = NULL;
3304 	drbd_md_io_page_pool = NULL;
3305 	drbd_ee_mempool      = NULL;
3306 	drbd_request_mempool = NULL;
3307 	drbd_ee_cache        = NULL;
3308 	drbd_request_cache   = NULL;
3309 	drbd_bm_ext_cache    = NULL;
3310 	drbd_al_ext_cache    = NULL;
3311 
3312 	return;
3313 }
3314 
3315 static int drbd_create_mempools(void)
3316 {
3317 	struct page *page;
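	/* roughly enough pages for one maximum sized BIO per configured minor */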
3318 	const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
3319 	int i;
3320 
3321 	/* prepare our caches and mempools */
3322 	drbd_request_mempool = NULL;
3323 	drbd_ee_cache        = NULL;
3324 	drbd_request_cache   = NULL;
3325 	drbd_bm_ext_cache    = NULL;
3326 	drbd_al_ext_cache    = NULL;
3327 	drbd_pp_pool         = NULL;
3328 	drbd_md_io_page_pool = NULL;
3329 	drbd_md_io_bio_set   = NULL;
3330 
3331 	/* caches */
3332 	drbd_request_cache = kmem_cache_create(
3333 		"drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
3334 	if (drbd_request_cache == NULL)
3335 		goto Enomem;
3336 
3337 	drbd_ee_cache = kmem_cache_create(
3338 		"drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
3339 	if (drbd_ee_cache == NULL)
3340 		goto Enomem;
3341 
3342 	drbd_bm_ext_cache = kmem_cache_create(
3343 		"drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
3344 	if (drbd_bm_ext_cache == NULL)
3345 		goto Enomem;
3346 
3347 	drbd_al_ext_cache = kmem_cache_create(
3348 		"drbd_al", sizeof(struct lc_element), 0, 0, NULL);
3349 	if (drbd_al_ext_cache == NULL)
3350 		goto Enomem;
3351 
3352 	/* mempools */
3353 #ifdef COMPAT_HAVE_BIOSET_CREATE
3354 	drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
3355 	if (drbd_md_io_bio_set == NULL)
3356 		goto Enomem;
3357 #endif
3358 
3359 	drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
3360 	if (drbd_md_io_page_pool == NULL)
3361 		goto Enomem;
3362 
3363 	drbd_request_mempool = mempool_create(number,
3364 		mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
3365 	if (drbd_request_mempool == NULL)
3366 		goto Enomem;
3367 
3368 	drbd_ee_mempool = mempool_create(number,
3369 		mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
3370 	if (drbd_ee_mempool == NULL)
3371 		goto Enomem;
3372 
3373 	/* drbd's page pool */
3374 	spin_lock_init(&drbd_pp_lock);
3375 
3376 	for (i = 0; i < number; i++) {
3377 		page = alloc_page(GFP_HIGHUSER);
3378 		if (!page)
3379 			goto Enomem;
3380 		set_page_private(page, (unsigned long)drbd_pp_pool);
3381 		drbd_pp_pool = page;
3382 	}
3383 	drbd_pp_vacant = number;
3384 
3385 	return 0;
3386 
3387 Enomem:
3388 	drbd_destroy_mempools(); /* in case we allocated some */
3389 	return -ENOMEM;
3390 }
3391 
3392 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
3393 	void *unused)
3394 {
3395 	/* just so we have it.  you never know what interesting things we
3396 	 * might want to do here some day...
3397 	 */
3398 
3399 	return NOTIFY_DONE;
3400 }
3401 
3402 static struct notifier_block drbd_notifier = {
3403 	.notifier_call = drbd_notify_sys,
3404 };
3405 
3406 static void drbd_release_ee_lists(struct drbd_conf *mdev)
3407 {
3408 	int rr;
3409 
3410 	rr = drbd_release_ee(mdev, &mdev->active_ee);
3411 	if (rr)
3412 		dev_err(DEV, "%d EEs in active list found!\n", rr);
3413 
3414 	rr = drbd_release_ee(mdev, &mdev->sync_ee);
3415 	if (rr)
3416 		dev_err(DEV, "%d EEs in sync list found!\n", rr);
3417 
3418 	rr = drbd_release_ee(mdev, &mdev->read_ee);
3419 	if (rr)
3420 		dev_err(DEV, "%d EEs in read list found!\n", rr);
3421 
3422 	rr = drbd_release_ee(mdev, &mdev->done_ee);
3423 	if (rr)
3424 		dev_err(DEV, "%d EEs in done list found!\n", rr);
3425 
3426 	rr = drbd_release_ee(mdev, &mdev->net_ee);
3427 	if (rr)
3428 		dev_err(DEV, "%d EEs in net list found!\n", rr);
3429 }
3430 
3431 /* caution. no locking.
3432  * currently only used from module cleanup code. */
3433 static void drbd_delete_device(unsigned int minor)
3434 {
3435 	struct drbd_conf *mdev = minor_to_mdev(minor);
3436 
3437 	if (!mdev)
3438 		return;
3439 
3440 	del_timer_sync(&mdev->request_timer);
3441 
3442 	/* paranoia asserts */
3443 	if (mdev->open_cnt != 0)
3444 		dev_err(DEV, "open_cnt = %d in %s:%u\n", mdev->open_cnt,
3445 				__FILE__, __LINE__);
3446 
3447 	ERR_IF (!list_empty(&mdev->data.work.q)) {
3448 		struct list_head *lp;
3449 		list_for_each(lp, &mdev->data.work.q) {
3450 			dev_err(DEV, "lp = %p\n", lp);
3451 		}
3452 	}
3453 	/* end paranoia asserts */
3454 
3455 	del_gendisk(mdev->vdisk);
3456 
3457 	/* cleanup stuff that may have been allocated during
3458 	 * device (re-)configuration or state changes */
3459 
3460 	if (mdev->this_bdev)
3461 		bdput(mdev->this_bdev);
3462 
3463 	drbd_free_resources(mdev);
3464 
3465 	drbd_release_ee_lists(mdev);
3466 
3467 	/* should be freed on disconnect? */
3468 	kfree(mdev->ee_hash);
3469 	/*
3470 	mdev->ee_hash_s = 0;
3471 	mdev->ee_hash = NULL;
3472 	*/
3473 
3474 	lc_destroy(mdev->act_log);
3475 	lc_destroy(mdev->resync);
3476 
3477 	kfree(mdev->p_uuid);
3478 	/* mdev->p_uuid = NULL; */
3479 
3480 	kfree(mdev->int_dig_out);
3481 	kfree(mdev->int_dig_in);
3482 	kfree(mdev->int_dig_vv);
3483 
3484 	/* cleanup the rest that has been
3485 	 * allocated from drbd_new_device
3486 	 * and actually free the mdev itself */
3487 	drbd_free_mdev(mdev);
3488 }
3489 
3490 static void drbd_cleanup(void)
3491 {
3492 	unsigned int i;
3493 
3494 	unregister_reboot_notifier(&drbd_notifier);
3495 
3496 	/* first remove proc,
3497 	 * drbdsetup uses its presence to detect
3498 	 * whether DRBD is loaded.
3499 	 * If we get stuck in proc removal
3500 	 * after netlink has already been deregistered,
3501 	 * some drbdsetup commands may wait forever
3502 	 * for an answer.
3503 	 */
3504 	if (drbd_proc)
3505 		remove_proc_entry("drbd", NULL);
3506 
3507 	drbd_nl_cleanup();
3508 
3509 	if (minor_table) {
3510 		i = minor_count;
3511 		while (i--)
3512 			drbd_delete_device(i);
3513 		drbd_destroy_mempools();
3514 	}
3515 
3516 	kfree(minor_table);
3517 
3518 	unregister_blkdev(DRBD_MAJOR, "drbd");
3519 
3520 	printk(KERN_INFO "drbd: module cleanup done.\n");
3521 }
3522 
3523 /**
3524  * drbd_congested() - Callback for pdflush
3525  * @congested_data:	User data
3526  * @bdi_bits:		Bits pdflush is currently interested in
3527  *
3528  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
3529  */
3530 static int drbd_congested(void *congested_data, int bdi_bits)
3531 {
3532 	struct drbd_conf *mdev = congested_data;
3533 	struct request_queue *q;
3534 	char reason = '-';
3535 	int r = 0;
3536 
3537 	if (!may_inc_ap_bio(mdev)) {
3538 		/* DRBD has frozen IO */
3539 		r = bdi_bits;
3540 		reason = 'd';
3541 		goto out;
3542 	}
3543 
3544 	if (get_ldev(mdev)) {
3545 		q = bdev_get_queue(mdev->ldev->backing_bdev);
3546 		r = bdi_congested(&q->backing_dev_info, bdi_bits);
3547 		put_ldev(mdev);
3548 		if (r)
3549 			reason = 'b';
3550 	}
3551 
3552 	if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
3553 		r |= (1 << BDI_async_congested);
3554 		reason = reason == 'b' ? 'a' : 'n';
3555 	}
3556 
3557 out:
3558 	mdev->congestion_reason = reason;
3559 	return r;
3560 }
3561 
3562 struct drbd_conf *drbd_new_device(unsigned int minor)
3563 {
3564 	struct drbd_conf *mdev;
3565 	struct gendisk *disk;
3566 	struct request_queue *q;
3567 
3568 	/* GFP_KERNEL, we are outside of all write-out paths */
3569 	mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3570 	if (!mdev)
3571 		return NULL;
3572 	if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3573 		goto out_no_cpumask;
3574 
3575 	mdev->minor = minor;
3576 
3577 	drbd_init_set_defaults(mdev);
3578 
3579 	q = blk_alloc_queue(GFP_KERNEL);
3580 	if (!q)
3581 		goto out_no_q;
3582 	mdev->rq_queue = q;
3583 	q->queuedata   = mdev;
3584 
3585 	disk = alloc_disk(1);
3586 	if (!disk)
3587 		goto out_no_disk;
3588 	mdev->vdisk = disk;
3589 
3590 	set_disk_ro(disk, true);
3591 
3592 	disk->queue = q;
3593 	disk->major = DRBD_MAJOR;
3594 	disk->first_minor = minor;
3595 	disk->fops = &drbd_ops;
3596 	sprintf(disk->disk_name, "drbd%d", minor);
3597 	disk->private_data = mdev;
3598 
3599 	mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3600 	/* we have no partitions. we contain only ourselves. */
3601 	mdev->this_bdev->bd_contains = mdev->this_bdev;
3602 
3603 	q->backing_dev_info.congested_fn = drbd_congested;
3604 	q->backing_dev_info.congested_data = mdev;
3605 
3606 	blk_queue_make_request(q, drbd_make_request);
3607 	/* Setting the max_hw_sectors to an odd value of 8 KiB here;
3608 	   this triggers a max_bio_size message upon first attach or connect */
3609 	blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
3610 	blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3611 	blk_queue_merge_bvec(q, drbd_merge_bvec);
3612 	q->queue_lock = &mdev->req_lock;
3613 
3614 	mdev->md_io_page = alloc_page(GFP_KERNEL);
3615 	if (!mdev->md_io_page)
3616 		goto out_no_io_page;
3617 
3618 	if (drbd_bm_init(mdev))
3619 		goto out_no_bitmap;
3620 	/* no need to lock access, we are still initializing this minor device. */
3621 	if (!tl_init(mdev))
3622 		goto out_no_tl;
3623 
3624 	mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3625 	if (!mdev->app_reads_hash)
3626 		goto out_no_app_reads;
3627 
3628 	mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3629 	if (!mdev->current_epoch)
3630 		goto out_no_epoch;
3631 
3632 	INIT_LIST_HEAD(&mdev->current_epoch->list);
3633 	mdev->epochs = 1;
3634 
3635 	return mdev;
3636 
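/* error unwinding: undo the allocations above in reverse order;
   drbd_free_mdev() below releases the same resources for a fully
   constructed device */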
3637 /* out_whatever_else:
3638 	kfree(mdev->current_epoch); */
3639 out_no_epoch:
3640 	kfree(mdev->app_reads_hash);
3641 out_no_app_reads:
3642 	tl_cleanup(mdev);
3643 out_no_tl:
3644 	drbd_bm_cleanup(mdev);
3645 out_no_bitmap:
3646 	__free_page(mdev->md_io_page);
3647 out_no_io_page:
3648 	put_disk(disk);
3649 out_no_disk:
3650 	blk_cleanup_queue(q);
3651 out_no_q:
3652 	free_cpumask_var(mdev->cpu_mask);
3653 out_no_cpumask:
3654 	kfree(mdev);
3655 	return NULL;
3656 }
3657 
3658 /* counterpart of drbd_new_device.
3659  * last part of drbd_delete_device. */
3660 void drbd_free_mdev(struct drbd_conf *mdev)
3661 {
3662 	kfree(mdev->current_epoch);
3663 	kfree(mdev->app_reads_hash);
3664 	tl_cleanup(mdev);
3665 	if (mdev->bitmap) /* should no longer be there. */
3666 		drbd_bm_cleanup(mdev);
3667 	__free_page(mdev->md_io_page);
3668 	put_disk(mdev->vdisk);
3669 	blk_cleanup_queue(mdev->rq_queue);
3670 	free_cpumask_var(mdev->cpu_mask);
3671 	drbd_free_tl_hash(mdev);
3672 	kfree(mdev);
3673 }
3674 
3675 
3676 int __init drbd_init(void)
3677 {
3678 	int err;
3679 
3680 	if (sizeof(struct p_handshake) != 80) {
3681 		printk(KERN_ERR
3682 		       "drbd: never change the size or layout "
3683 		       "of the HandShake packet.\n");
3684 		return -EINVAL;
3685 	}
3686 
3687 	if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
3688 		printk(KERN_ERR
3689 			"drbd: invalid minor_count (%d)\n", minor_count);
3690 #ifdef MODULE
3691 		return -EINVAL;
3692 #else
3693 		minor_count = 8;
3694 #endif
3695 	}
3696 
3697 	err = drbd_nl_init();
3698 	if (err)
3699 		return err;
3700 
3701 	err = register_blkdev(DRBD_MAJOR, "drbd");
3702 	if (err) {
3703 		printk(KERN_ERR
3704 		       "drbd: unable to register block device major %d\n",
3705 		       DRBD_MAJOR);
3706 		return err;
3707 	}
3708 
3709 	register_reboot_notifier(&drbd_notifier);
3710 
3711 	/*
3712 	 * allocate all necessary structs
3713 	 */
3714 	err = -ENOMEM;
3715 
3716 	init_waitqueue_head(&drbd_pp_wait);
3717 
3718 	drbd_proc = NULL; /* play safe for drbd_cleanup */
3719 	minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3720 				GFP_KERNEL);
3721 	if (!minor_table)
3722 		goto Enomem;
3723 
3724 	err = drbd_create_mempools();
3725 	if (err)
3726 		goto Enomem;
3727 
3728 	drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3729 	if (!drbd_proc)	{
3730 		printk(KERN_ERR "drbd: unable to register proc file\n");
3731 		goto Enomem;
3732 	}
3733 
3734 	rwlock_init(&global_state_lock);
3735 
3736 	printk(KERN_INFO "drbd: initialized. "
3737 	       "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3738 	       API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3739 	printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3740 	printk(KERN_INFO "drbd: registered as block device major %d\n",
3741 		DRBD_MAJOR);
3742 	printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3743 
3744 	return 0; /* Success! */
3745 
3746 Enomem:
3747 	drbd_cleanup();
3748 	if (err == -ENOMEM)
3749 		/* currently always the case */
3750 		printk(KERN_ERR "drbd: ran out of memory\n");
3751 	else
3752 		printk(KERN_ERR "drbd: initialization failure\n");
3753 	return err;
3754 }
3755 
3756 void drbd_free_bc(struct drbd_backing_dev *ldev)
3757 {
3758 	if (ldev == NULL)
3759 		return;
3760 
3761 	blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
3762 	blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
3763 
3764 	kfree(ldev);
3765 }
3766 
3767 void drbd_free_sock(struct drbd_conf *mdev)
3768 {
3769 	if (mdev->data.socket) {
3770 		mutex_lock(&mdev->data.mutex);
3771 		kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3772 		sock_release(mdev->data.socket);
3773 		mdev->data.socket = NULL;
3774 		mutex_unlock(&mdev->data.mutex);
3775 	}
3776 	if (mdev->meta.socket) {
3777 		mutex_lock(&mdev->meta.mutex);
3778 		kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3779 		sock_release(mdev->meta.socket);
3780 		mdev->meta.socket = NULL;
3781 		mutex_unlock(&mdev->meta.mutex);
3782 	}
3783 }
3784 
3785 
3786 void drbd_free_resources(struct drbd_conf *mdev)
3787 {
3788 	crypto_free_hash(mdev->csums_tfm);
3789 	mdev->csums_tfm = NULL;
3790 	crypto_free_hash(mdev->verify_tfm);
3791 	mdev->verify_tfm = NULL;
3792 	crypto_free_hash(mdev->cram_hmac_tfm);
3793 	mdev->cram_hmac_tfm = NULL;
3794 	crypto_free_hash(mdev->integrity_w_tfm);
3795 	mdev->integrity_w_tfm = NULL;
3796 	crypto_free_hash(mdev->integrity_r_tfm);
3797 	mdev->integrity_r_tfm = NULL;
3798 
3799 	drbd_free_sock(mdev);
3800 
3801 	__no_warn(local,
3802 		  drbd_free_bc(mdev->ldev);
3803 		  mdev->ldev = NULL;);
3804 }
3805 
3806 /* meta data management */
3807 
3808 struct meta_data_on_disk {
3809 	u64 la_size;           /* last agreed size. */
3810 	u64 uuid[UI_SIZE];   /* UUIDs. */
3811 	u64 device_uuid;
3812 	u64 reserved_u64_1;
3813 	u32 flags;             /* MDF */
3814 	u32 magic;
3815 	u32 md_size_sect;
3816 	u32 al_offset;         /* offset to this block */
3817 	u32 al_nr_extents;     /* important for restoring the AL */
3818 	      /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3819 	u32 bm_offset;         /* offset to the bitmap, from here */
3820 	u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3821 	u32 la_peer_max_bio_size;   /* last peer max_bio_size */
3822 	u32 reserved_u32[3];
3823 
3824 } __packed;
3825 
3826 /**
3827  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3828  * @mdev:	DRBD device.
3829  */
3830 void drbd_md_sync(struct drbd_conf *mdev)
3831 {
3832 	struct meta_data_on_disk *buffer;
3833 	sector_t sector;
3834 	int i;
3835 
3836 	del_timer(&mdev->md_sync_timer);
3837 	/* timer may be rearmed by drbd_md_mark_dirty() now. */
3838 	if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3839 		return;
3840 
3841 	/* We use D_FAILED here, and not D_ATTACHING, because we try to write
3842 	 * metadata even if we detach due to a disk failure! */
3843 	if (!get_ldev_if_state(mdev, D_FAILED))
3844 		return;
3845 
3846 	buffer = drbd_md_get_buffer(mdev);
3847 	if (!buffer)
3848 		goto out;
3849 
3850 	memset(buffer, 0, 512);
3851 
3852 	buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3853 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3854 		buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3855 	buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3856 	buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3857 
3858 	buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3859 	buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3860 	buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3861 	buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3862 	buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3863 
3864 	buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3865 	buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
3866 
3867 	D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3868 	sector = mdev->ldev->md.md_offset;
3869 
3870 	if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3871 		/* this was a try anyway ... */
3872 		dev_err(DEV, "meta data update failed!\n");
3873 		drbd_chk_io_error(mdev, 1, true);
3874 	}
3875 
3876 	/* Update mdev->ldev->md.la_size_sect,
3877 	 * since we updated it on metadata. */
3878 	mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3879 
3880 	drbd_md_put_buffer(mdev);
3881 out:
3882 	put_ldev(mdev);
3883 }
3884 
3885 /**
3886  * drbd_md_read() - Reads in the meta data super block
3887  * @mdev:	DRBD device.
3888  * @bdev:	Device from which the meta data should be read in.
3889  *
3890  * Return NO_ERROR on success, and an enum drbd_ret_code in case
3891  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3892  */
3893 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3894 {
3895 	struct meta_data_on_disk *buffer;
3896 	int i, rv = NO_ERROR;
3897 
3898 	if (!get_ldev_if_state(mdev, D_ATTACHING))
3899 		return ERR_IO_MD_DISK;
3900 
3901 	buffer = drbd_md_get_buffer(mdev);
3902 	if (!buffer)
3903 		goto out;
3904 
3905 	if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3906 		/* NOTE: can't do normal error processing here as this is
3907 		   called BEFORE disk is attached */
3908 		dev_err(DEV, "Error while reading metadata.\n");
3909 		rv = ERR_IO_MD_DISK;
3910 		goto err;
3911 	}
3912 
3913 	if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3914 		dev_err(DEV, "Error while reading metadata, magic not found.\n");
3915 		rv = ERR_MD_INVALID;
3916 		goto err;
3917 	}
3918 	if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3919 		dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3920 		    be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3921 		rv = ERR_MD_INVALID;
3922 		goto err;
3923 	}
3924 	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3925 		dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3926 		    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3927 		rv = ERR_MD_INVALID;
3928 		goto err;
3929 	}
3930 	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3931 		dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3932 		    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3933 		rv = ERR_MD_INVALID;
3934 		goto err;
3935 	}
3936 
3937 	if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3938 		dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3939 		    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3940 		rv = ERR_MD_INVALID;
3941 		goto err;
3942 	}
3943 
3944 	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3945 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3946 		bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3947 	bdev->md.flags = be32_to_cpu(buffer->flags);
3948 	mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3949 	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3950 
3951 	spin_lock_irq(&mdev->req_lock);
3952 	if (mdev->state.conn < C_CONNECTED) {
3953 		int peer;
3954 		peer = be32_to_cpu(buffer->la_peer_max_bio_size);
3955 		peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
3956 		mdev->peer_max_bio_size = peer;
3957 	}
3958 	spin_unlock_irq(&mdev->req_lock);
3959 
3960 	if (mdev->sync_conf.al_extents < 7)
3961 		mdev->sync_conf.al_extents = 127;
3962 
3963  err:
3964 	drbd_md_put_buffer(mdev);
3965  out:
3966 	put_ldev(mdev);
3967 
3968 	return rv;
3969 }
3970 
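/* A typical call pattern while attaching a disk might look like this
 * (sketch only; "retcode" and "nbc" are placeholder names):
 *
 *	retcode = drbd_md_read(mdev, nbc);
 *	if (retcode != NO_ERROR)
 *		goto fail;
 */
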
3971 /**
3972  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3973  * @mdev:	DRBD device.
3974  *
3975  * Call this function if you change anything that should be written to
3976  * the meta-data super block. This function sets MD_DIRTY, and starts a
3977  * timer that ensures drbd_md_sync() gets called within five seconds.
3978  */
3979 #ifdef DEBUG
3980 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
3981 {
3982 	if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
3983 		mod_timer(&mdev->md_sync_timer, jiffies + HZ);
3984 		mdev->last_md_mark_dirty.line = line;
3985 		mdev->last_md_mark_dirty.func = func;
3986 	}
3987 }
3988 #else
3989 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3990 {
3991 	if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
3992 		mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3993 }
3994 #endif
3995 
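/* A minimal usage sketch, mirroring _drbd_uuid_set() and
 * drbd_uuid_new_current() below:
 *
 *	mdev->ldev->md.uuid[UI_CURRENT] = val;
 *	drbd_md_mark_dirty(mdev);	the super block is now flagged dirty
 *	drbd_md_sync(mdev);		optional: flush right away instead of
 *					waiting for the timer and the worker
 */
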
3996 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3997 {
3998 	int i;
3999 
4000 	for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
4001 		mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
4002 }
4003 
4004 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
4005 {
4006 	if (idx == UI_CURRENT) {
4007 		if (mdev->state.role == R_PRIMARY)
4008 			val |= 1;
4009 		else
4010 			val &= ~((u64)1);
4011 
4012 		drbd_set_ed_uuid(mdev, val);
4013 	}
4014 
4015 	mdev->ldev->md.uuid[idx] = val;
4016 	drbd_md_mark_dirty(mdev);
4017 }
4018 
4019 
4020 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
4021 {
4022 	if (mdev->ldev->md.uuid[idx]) {
4023 		drbd_uuid_move_history(mdev);
4024 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
4025 	}
4026 	_drbd_uuid_set(mdev, idx, val);
4027 }
4028 
4029 /**
4030  * drbd_uuid_new_current() - Creates a new current UUID
4031  * @mdev:	DRBD device.
4032  *
4033  * Creates a new current UUID, and rotates the old current UUID into
4034  * the bitmap slot. Causes an incremental resync upon next connect.
4035  */
4036 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
4037 {
4038 	u64 val;
4039 	unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
4040 
4041 	if (bm_uuid)
4042 		dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
4043 
4044 	mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
4045 
4046 	get_random_bytes(&val, sizeof(u64));
4047 	_drbd_uuid_set(mdev, UI_CURRENT, val);
4048 	drbd_print_uuids(mdev, "new current UUID");
4049 	/* get it to stable storage _now_ */
4050 	drbd_md_sync(mdev);
4051 }
4052 
4053 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
4054 {
4055 	if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
4056 		return;
4057 
4058 	if (val == 0) {
4059 		drbd_uuid_move_history(mdev);
4060 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
4061 		mdev->ldev->md.uuid[UI_BITMAP] = 0;
4062 	} else {
4063 		unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
4064 		if (bm_uuid)
4065 			dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
4066 
4067 		mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
4068 	}
4069 	drbd_md_mark_dirty(mdev);
4070 }
4071 
4072 /**
4073  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
4074  * @mdev:	DRBD device.
4075  *
4076  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
4077  */
4078 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
4079 {
4080 	int rv = -EIO;
4081 
4082 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
4083 		drbd_md_set_flag(mdev, MDF_FULL_SYNC);
4084 		drbd_md_sync(mdev);
4085 		drbd_bm_set_all(mdev);
4086 
4087 		rv = drbd_bm_write(mdev);
4088 
4089 		if (!rv) {
4090 			drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
4091 			drbd_md_sync(mdev);
4092 		}
4093 
4094 		put_ldev(mdev);
4095 	}
4096 
4097 	return rv;
4098 }
4099 
4100 /**
4101  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
4102  * @mdev:	DRBD device.
4103  *
4104  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
4105  */
4106 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
4107 {
4108 	int rv = -EIO;
4109 
4110 	drbd_resume_al(mdev);
4111 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
4112 		drbd_bm_clear_all(mdev);
4113 		rv = drbd_bm_write(mdev);
4114 		put_ldev(mdev);
4115 	}
4116 
4117 	return rv;
4118 }
4119 
4120 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
4121 {
4122 	struct bm_io_work *work = container_of(w, struct bm_io_work, w);
4123 	int rv = -EIO;
4124 
4125 	D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
4126 
4127 	if (get_ldev(mdev)) {
4128 		drbd_bm_lock(mdev, work->why, work->flags);
4129 		rv = work->io_fn(mdev);
4130 		drbd_bm_unlock(mdev);
4131 		put_ldev(mdev);
4132 	}
4133 
4134 	clear_bit(BITMAP_IO, &mdev->flags);
4135 	smp_mb__after_clear_bit();
4136 	wake_up(&mdev->misc_wait);
4137 
4138 	if (work->done)
4139 		work->done(mdev, rv);
4140 
4141 	clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
4142 	work->why = NULL;
4143 	work->flags = 0;
4144 
4145 	return 1;
4146 }
4147 
4148 void drbd_ldev_destroy(struct drbd_conf *mdev)
4149 {
4150 	lc_destroy(mdev->resync);
4151 	mdev->resync = NULL;
4152 	lc_destroy(mdev->act_log);
4153 	mdev->act_log = NULL;
4154 	__no_warn(local,
4155 		drbd_free_bc(mdev->ldev);
4156 		mdev->ldev = NULL;);
4157 
4158 	if (mdev->md_io_tmpp) {
4159 		__free_page(mdev->md_io_tmpp);
4160 		mdev->md_io_tmpp = NULL;
4161 	}
4162 	clear_bit(GO_DISKLESS, &mdev->flags);
4163 }
4164 
4165 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused)
4166 {
4167 	D_ASSERT(mdev->state.disk == D_FAILED);
4168 	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
4169 	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
4170 	 * the protected members anymore, though, so once put_ldev reaches zero
4171 	 * again, it will be safe to free them. */
4172 	drbd_force_state(mdev, NS(disk, D_DISKLESS));
4173 	return 1;
4174 }
4175 
4176 void drbd_go_diskless(struct drbd_conf *mdev)
4177 {
4178 	D_ASSERT(mdev->state.disk == D_FAILED);
4179 	if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
4180 		drbd_queue_work(&mdev->data.work, &mdev->go_diskless);
4181 }
4182 
4183 /**
4184  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
4185  * @mdev:	DRBD device.
4186  * @io_fn:	IO callback to be called when bitmap IO is possible
4187  * @done:	callback to be called after the bitmap IO was performed
4188  * @why:	Descriptive text of the reason for doing the IO
4189  *
4190  * While IO on the bitmap happens we freeze application IO, thus ensuring
4191  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
4192  * called from worker context. It MUST NOT be used while a previous such
4193  * work is still pending!
4194  */
4195 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
4196 			  int (*io_fn)(struct drbd_conf *),
4197 			  void (*done)(struct drbd_conf *, int),
4198 			  char *why, enum bm_flag flags)
4199 {
4200 	D_ASSERT(current == mdev->worker.task);
4201 
4202 	D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
4203 	D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
4204 	D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
4205 	if (mdev->bm_io_work.why)
4206 		dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
4207 			why, mdev->bm_io_work.why);
4208 
4209 	mdev->bm_io_work.io_fn = io_fn;
4210 	mdev->bm_io_work.done = done;
4211 	mdev->bm_io_work.why = why;
4212 	mdev->bm_io_work.flags = flags;
4213 
4214 	spin_lock_irq(&mdev->req_lock);
4215 	set_bit(BITMAP_IO, &mdev->flags);
4216 	if (atomic_read(&mdev->ap_bio_cnt) == 0) {
4217 		if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
4218 			drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
4219 	}
4220 	spin_unlock_irq(&mdev->req_lock);
4221 }
4222 
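/* A minimal usage sketch from worker context (illustrative only; real
 * callers pick io_fn, done and flags to match their state change):
 *
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL,
 *			     "example: set all bits", BM_LOCKED_SET_ALLOWED);
 */
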
4223 /**
4224  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
4225  * @mdev:	DRBD device.
4226  * @io_fn:	IO callback to be called when bitmap IO is possible
4227  * @why:	Descriptive text of the reason for doing the IO
4228  *
4229  * Freezes application IO while the actual IO operation runs. This
4230  * function MAY NOT be called from worker context.
4231  */
4232 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
4233 		char *why, enum bm_flag flags)
4234 {
4235 	int rv;
4236 
4237 	D_ASSERT(current != mdev->worker.task);
4238 
4239 	if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
4240 		drbd_suspend_io(mdev);
4241 
4242 	drbd_bm_lock(mdev, why, flags);
4243 	rv = io_fn(mdev);
4244 	drbd_bm_unlock(mdev);
4245 
4246 	if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
4247 		drbd_resume_io(mdev);
4248 
4249 	return rv;
4250 }
4251 
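/* The synchronous counterpart, callable from any context except the
 * worker (sketch only; the locking flag is chosen for illustration):
 *
 *	if (drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
 *			   "example: clear all bits", BM_LOCKED_SET_ALLOWED))
 *		dev_err(DEV, "bitmap IO failed\n");
 */
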
4252 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
4253 {
4254 	if ((mdev->ldev->md.flags & flag) != flag) {
4255 		drbd_md_mark_dirty(mdev);
4256 		mdev->ldev->md.flags |= flag;
4257 	}
4258 }
4259 
4260 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
4261 {
4262 	if ((mdev->ldev->md.flags & flag) != 0) {
4263 		drbd_md_mark_dirty(mdev);
4264 		mdev->ldev->md.flags &= ~flag;
4265 	}
4266 }
4267 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
4268 {
4269 	return (bdev->md.flags & flag) != 0;
4270 }
4271 
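/* Typical use of the MDF flag helpers, mirroring drbd_bmio_set_n_write()
 * above (sketch only):
 *
 *	drbd_md_set_flag(mdev, MDF_FULL_SYNC);
 *	drbd_md_sync(mdev);
 *	...
 *	if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
 *		drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
 *		drbd_md_sync(mdev);
 *	}
 */
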
4272 static void md_sync_timer_fn(unsigned long data)
4273 {
4274 	struct drbd_conf *mdev = (struct drbd_conf *) data;
4275 
4276 	drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
4277 }
4278 
4279 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
4280 {
4281 	dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
4282 #ifdef DEBUG
4283 	dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
4284 		mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
4285 #endif
4286 	drbd_md_sync(mdev);
4287 	return 1;
4288 }
4289 
4290 #ifdef CONFIG_DRBD_FAULT_INJECTION
4291 /* Fault insertion support including random number generator shamelessly
4292  * stolen from kernel/rcutorture.c */
4293 struct fault_random_state {
4294 	unsigned long state;
4295 	unsigned long count;
4296 };
4297 
4298 #define FAULT_RANDOM_MULT 39916801  /* prime */
4299 #define FAULT_RANDOM_ADD	479001701 /* prime */
4300 #define FAULT_RANDOM_REFRESH 10000
4301 
4302 /*
4303  * Crude but fast random-number generator.  Uses a linear congruential
4304  * generator, with occasional help from get_random_bytes().
4305  */
4306 static unsigned long
4307 _drbd_fault_random(struct fault_random_state *rsp)
4308 {
4309 	long refresh;
4310 
4311 	if (!rsp->count--) {
4312 		get_random_bytes(&refresh, sizeof(refresh));
4313 		rsp->state += refresh;
4314 		rsp->count = FAULT_RANDOM_REFRESH;
4315 	}
4316 	rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
4317 	return swahw32(rsp->state);
4318 }
4319 
4320 static char *
4321 _drbd_fault_str(unsigned int type) {
4322 	static char *_faults[] = {
4323 		[DRBD_FAULT_MD_WR] = "Meta-data write",
4324 		[DRBD_FAULT_MD_RD] = "Meta-data read",
4325 		[DRBD_FAULT_RS_WR] = "Resync write",
4326 		[DRBD_FAULT_RS_RD] = "Resync read",
4327 		[DRBD_FAULT_DT_WR] = "Data write",
4328 		[DRBD_FAULT_DT_RD] = "Data read",
4329 		[DRBD_FAULT_DT_RA] = "Data read ahead",
4330 		[DRBD_FAULT_BM_ALLOC] = "BM allocation",
4331 		[DRBD_FAULT_AL_EE] = "EE allocation",
4332 		[DRBD_FAULT_RECEIVE] = "receive data corruption",
4333 	};
4334 
4335 	return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
4336 }
4337 
4338 unsigned int
4339 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
4340 {
4341 	static struct fault_random_state rrs = {0, 0};
4342 
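	/* fault_devs == 0 means all minors are eligible, otherwise it is a
	 * bitmask of minor numbers; fault_rate is a percentage, e.g. with
	 * fault_rate == 5 roughly 5% of the eligible calls trip a fault. */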
4343 	unsigned int ret = (
4344 		(fault_devs == 0 ||
4345 			((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
4346 		(((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
4347 
4348 	if (ret) {
4349 		fault_count++;
4350 
4351 		if (__ratelimit(&drbd_ratelimit_state))
4352 			dev_warn(DEV, "***Simulating %s failure\n",
4353 				_drbd_fault_str(type));
4354 	}
4355 
4356 	return ret;
4357 }
4358 #endif
4359 
4360 const char *drbd_buildtag(void)
4361 {
4362 	/* DRBD built from external sources has a reference to the
4363 	   git hash of the source code here. */
4364 
4365 	static char buildtag[38] = "\0uilt-in";
4366 
4367 	if (buildtag[0] == 0) {
4368 #ifdef MODULE
4369 		sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
4370 #else
4371 		buildtag[0] = 'b';
4372 #endif
4373 	}
4374 
4375 	return buildtag;
4376 }
4377 
4378 module_init(drbd_init)
4379 module_exit(drbd_cleanup)
4380 
4381 EXPORT_SYMBOL(drbd_conn_str);
4382 EXPORT_SYMBOL(drbd_role_str);
4383 EXPORT_SYMBOL(drbd_disk_str);
4384 EXPORT_SYMBOL(drbd_set_st_err_str);
4385