xref: /linux/drivers/block/drbd/drbd_main.c (revision c116cc94969447f44fd7205a027084ceebe90d34)
1 /*
2    drbd.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12 
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17 
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22 
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 
27  */
28 
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48 
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52 
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56 
57 #include "drbd_vli.h"
58 
59 struct after_state_chg_work {
60 	struct drbd_work w;
61 	union drbd_state os;
62 	union drbd_state ns;
63 	enum chg_state_flags flags;
64 	struct completion *done;
65 };
66 
67 static DEFINE_MUTEX(drbd_main_mutex);
68 int drbdd_init(struct drbd_thread *);
69 int drbd_worker(struct drbd_thread *);
70 int drbd_asender(struct drbd_thread *);
71 
72 int drbd_init(void);
73 static int drbd_open(struct block_device *bdev, fmode_t mode);
74 static int drbd_release(struct gendisk *gd, fmode_t mode);
75 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
76 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
77 			   union drbd_state ns, enum chg_state_flags flags);
78 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
79 static void md_sync_timer_fn(unsigned long data);
80 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
81 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused);
82 
83 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
84 	      "Lars Ellenberg <lars@linbit.com>");
85 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
86 MODULE_VERSION(REL_VERSION);
87 MODULE_LICENSE("GPL");
88 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices ("
89 		 __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
90 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
91 
92 #include <linux/moduleparam.h>
93 /* allow_open_on_secondary */
94 MODULE_PARM_DESC(allow_oos, "DONT USE!");
95 /* thanks to these macros, if compiled into the kernel (not as a module),
96  * this becomes the boot parameter drbd.minor_count */
97 module_param(minor_count, uint, 0444);
98 module_param(disable_sendpage, bool, 0644);
99 module_param(allow_oos, bool, 0);
100 module_param(cn_idx, uint, 0444);
101 module_param(proc_details, int, 0644);
102 
103 #ifdef CONFIG_DRBD_FAULT_INJECTION
104 int enable_faults;
105 int fault_rate;
106 static int fault_count;
107 int fault_devs;
108 /* bitmap of enabled faults */
109 module_param(enable_faults, int, 0664);
110 /* fault rate % value - applies to all enabled faults */
111 module_param(fault_rate, int, 0664);
112 /* count of faults inserted */
113 module_param(fault_count, int, 0664);
114 /* bitmap of devices to insert faults on */
115 module_param(fault_devs, int, 0644);
116 #endif
117 
118 /* module parameter, defined */
119 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
120 bool disable_sendpage;
121 bool allow_oos;
122 unsigned int cn_idx = CN_IDX_DRBD;
123 int proc_details;       /* Detail level in /proc/drbd */
124 
125 /* Module parameter for setting the user mode helper program
126  * to run. Default is /sbin/drbdadm */
127 char usermode_helper[80] = "/sbin/drbdadm";
128 
129 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
130 
131 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
132  * as member "struct gendisk *vdisk;"
133  */
134 struct drbd_conf **minor_table;
135 
136 struct kmem_cache *drbd_request_cache;
137 struct kmem_cache *drbd_ee_cache;	/* epoch entries */
138 struct kmem_cache *drbd_bm_ext_cache;	/* bitmap extents */
139 struct kmem_cache *drbd_al_ext_cache;	/* activity log extents */
140 mempool_t *drbd_request_mempool;
141 mempool_t *drbd_ee_mempool;
142 mempool_t *drbd_md_io_page_pool;
143 struct bio_set *drbd_md_io_bio_set;
144 
145 /* I do not use a standard mempool, because:
146    1) I want to hand out the pre-allocated objects first.
147    2) I want to be able to interrupt sleeping allocation with a signal.
148    Note: This is a singly linked list; the next pointer is the private
149 	 member of struct page.
150  */
151 struct page *drbd_pp_pool;
152 spinlock_t   drbd_pp_lock;
153 int          drbd_pp_vacant;
154 wait_queue_head_t drbd_pp_wait;
155 
156 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
157 
158 static const struct block_device_operations drbd_ops = {
159 	.owner =   THIS_MODULE,
160 	.open =    drbd_open,
161 	.release = drbd_release,
162 };
163 
164 static void bio_destructor_drbd(struct bio *bio)
165 {
166 	bio_free(bio, drbd_md_io_bio_set);
167 }
168 
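/**
 * bio_alloc_drbd() - Allocate a &struct bio with a single bio_vec
 * @gfp_mask:	allocation flags.
 *
 * Allocates from the dedicated drbd_md_io_bio_set when it exists and sets
 * the matching bio destructor; falls back to a plain bio_alloc() while the
 * bio set is not (yet) set up.
 */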
169 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
170 {
171 	struct bio *bio;
172 
173 	if (!drbd_md_io_bio_set)
174 		return bio_alloc(gfp_mask, 1);
175 
176 	bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
177 	if (!bio)
178 		return NULL;
179 	bio->bi_destructor = bio_destructor_drbd;
180 	return bio;
181 }
182 
183 #ifdef __CHECKER__
184 /* When checking with sparse, if this is an inline function, sparse will
185    give tons of false positives. When this is a real function, sparse works.
186  */
187 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
188 {
189 	int io_allowed;
190 
191 	atomic_inc(&mdev->local_cnt);
192 	io_allowed = (mdev->state.disk >= mins);
193 	if (!io_allowed) {
194 		if (atomic_dec_and_test(&mdev->local_cnt))
195 			wake_up(&mdev->misc_wait);
196 	}
197 	return io_allowed;
198 }
199 
200 #endif
201 
202 /**
203  * DOC: The transfer log
204  *
205  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
206  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
207  * of the list. There is always at least one &struct drbd_tl_epoch object.
208  *
209  * Each &struct drbd_tl_epoch has a circular doubly linked list of requests
210  * attached.
211  */
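/**
 * tl_init() - Allocate and set up the initial transfer log epoch
 * @mdev:	DRBD device.
 *
 * Allocates the first &struct drbd_tl_epoch and makes it both the oldest and
 * the newest entry of the transfer log. Returns 1 on success, 0 if the
 * allocation failed.
 */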
212 static int tl_init(struct drbd_conf *mdev)
213 {
214 	struct drbd_tl_epoch *b;
215 
216 	/* during device minor initialization, we may well use GFP_KERNEL */
217 	b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
218 	if (!b)
219 		return 0;
220 	INIT_LIST_HEAD(&b->requests);
221 	INIT_LIST_HEAD(&b->w.list);
222 	b->next = NULL;
223 	b->br_number = 4711;
224 	b->n_writes = 0;
225 	b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
226 
227 	mdev->oldest_tle = b;
228 	mdev->newest_tle = b;
229 	INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
230 	INIT_LIST_HEAD(&mdev->barrier_acked_requests);
231 
232 	mdev->tl_hash = NULL;
233 	mdev->tl_hash_s = 0;
234 
235 	return 1;
236 }
237 
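/**
 * tl_cleanup() - Free the transfer log structures set up by tl_init()
 * @mdev:	DRBD device.
 *
 * Expects the transfer log to be reduced to the single, empty initial epoch
 * again; also frees a possibly left over spare epoch and the tl_hash table.
 */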
238 static void tl_cleanup(struct drbd_conf *mdev)
239 {
240 	D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
241 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
242 	kfree(mdev->oldest_tle);
243 	mdev->oldest_tle = NULL;
244 	kfree(mdev->unused_spare_tle);
245 	mdev->unused_spare_tle = NULL;
246 	kfree(mdev->tl_hash);
247 	mdev->tl_hash = NULL;
248 	mdev->tl_hash_s = 0;
249 }
250 
251 /**
252  * _tl_add_barrier() - Adds a barrier to the transfer log
253  * @mdev:	DRBD device.
254  * @new:	Barrier to be added before the current head of the TL.
255  *
256  * The caller must hold the req_lock.
257  */
258 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
259 {
260 	struct drbd_tl_epoch *newest_before;
261 
262 	INIT_LIST_HEAD(&new->requests);
263 	INIT_LIST_HEAD(&new->w.list);
264 	new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
265 	new->next = NULL;
266 	new->n_writes = 0;
267 
268 	newest_before = mdev->newest_tle;
269 	new->br_number = newest_before->br_number+1;
270 	if (mdev->newest_tle != new) {
271 		mdev->newest_tle->next = new;
272 		mdev->newest_tle = new;
273 	}
274 }
275 
276 /**
277  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
278  * @mdev:	DRBD device.
279  * @barrier_nr:	Expected identifier of the DRBD write barrier packet.
280  * @set_size:	Expected number of requests before that barrier.
281  *
282  * In case the passed barrier_nr or set_size does not match the oldest
283  * &struct drbd_tl_epoch objects this function will cause a termination
284  * of the connection.
285  */
286 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
287 		       unsigned int set_size)
288 {
289 	struct drbd_tl_epoch *b, *nob; /* next old barrier */
290 	struct list_head *le, *tle;
291 	struct drbd_request *r;
292 
293 	spin_lock_irq(&mdev->req_lock);
294 
295 	b = mdev->oldest_tle;
296 
297 	/* first some paranoia code */
298 	if (b == NULL) {
299 		dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
300 			barrier_nr);
301 		goto bail;
302 	}
303 	if (b->br_number != barrier_nr) {
304 		dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
305 			barrier_nr, b->br_number);
306 		goto bail;
307 	}
308 	if (b->n_writes != set_size) {
309 		dev_err(DEV, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
310 			barrier_nr, set_size, b->n_writes);
311 		goto bail;
312 	}
313 
314 	/* Clean up list of requests processed during current epoch */
315 	list_for_each_safe(le, tle, &b->requests) {
316 		r = list_entry(le, struct drbd_request, tl_requests);
317 		_req_mod(r, barrier_acked);
318 	}
319 	/* There could be requests on the list waiting for completion
320 	   of the write to the local disk. To avoid corruption of the
321 	   slab's data structures we have to remove the list's head.
322 
323 	   Also there could have been a barrier ack out of sequence, overtaking
324 	   the write acks - which would be a bug and violate write ordering.
325 	   To not deadlock in case we lose connection while such requests are
326 	   still pending, we need some way to find them for the
327 	   _req_mod(connection_lost_while_pending).
328 
329 	   These have been list_move'd to the out_of_sequence_requests list in
330 	   _req_mod(, barrier_acked) above.
331 	   */
332 	list_splice_init(&b->requests, &mdev->barrier_acked_requests);
333 
334 	nob = b->next;
335 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
336 		_tl_add_barrier(mdev, b);
337 		if (nob)
338 			mdev->oldest_tle = nob;
339 		/* if nob == NULL b was the only barrier, and becomes the new
340 		   barrier. Therefore mdev->oldest_tle already points to b */
341 	} else {
342 		D_ASSERT(nob != NULL);
343 		mdev->oldest_tle = nob;
344 		kfree(b);
345 	}
346 
347 	spin_unlock_irq(&mdev->req_lock);
348 	dec_ap_pending(mdev);
349 
350 	return;
351 
352 bail:
353 	spin_unlock_irq(&mdev->req_lock);
354 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
355 }
356 
357 
358 /**
359  * _tl_restart() - Walks the transfer log, and applies an action to all requests
360  * @mdev:	DRBD device.
361  * @what:       The action/event to perform with all request objects
362  *
363  * @what might be one of connection_lost_while_pending, resend, fail_frozen_disk_io,
364  * restart_frozen_disk_io.
365  */
366 static void _tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
367 {
368 	struct drbd_tl_epoch *b, *tmp, **pn;
369 	struct list_head *le, *tle, carry_reads;
370 	struct drbd_request *req;
371 	int rv, n_writes, n_reads;
372 
373 	b = mdev->oldest_tle;
374 	pn = &mdev->oldest_tle;
375 	while (b) {
376 		n_writes = 0;
377 		n_reads = 0;
378 		INIT_LIST_HEAD(&carry_reads);
379 		list_for_each_safe(le, tle, &b->requests) {
380 			req = list_entry(le, struct drbd_request, tl_requests);
381 			rv = _req_mod(req, what);
382 
383 			n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
384 			n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
385 		}
386 		tmp = b->next;
387 
388 		if (n_writes) {
389 			if (what == resend) {
390 				b->n_writes = n_writes;
391 				if (b->w.cb == NULL) {
392 					b->w.cb = w_send_barrier;
393 					inc_ap_pending(mdev);
394 					set_bit(CREATE_BARRIER, &mdev->flags);
395 				}
396 
397 				drbd_queue_work(&mdev->data.work, &b->w);
398 			}
399 			pn = &b->next;
400 		} else {
401 			if (n_reads)
402 				list_add(&carry_reads, &b->requests);
403 			/* there could still be requests on that ring list,
404 			 * in case local io is still pending */
405 			list_del(&b->requests);
406 
407 			/* dec_ap_pending corresponding to queue_barrier.
408 			 * the newest barrier may not have been queued yet,
409 			 * in which case w.cb is still NULL. */
410 			if (b->w.cb != NULL)
411 				dec_ap_pending(mdev);
412 
413 			if (b == mdev->newest_tle) {
414 				/* recycle, but reinit! */
415 				D_ASSERT(tmp == NULL);
416 				INIT_LIST_HEAD(&b->requests);
417 				list_splice(&carry_reads, &b->requests);
418 				INIT_LIST_HEAD(&b->w.list);
419 				b->w.cb = NULL;
420 				b->br_number = net_random();
421 				b->n_writes = 0;
422 
423 				*pn = b;
424 				break;
425 			}
426 			*pn = tmp;
427 			kfree(b);
428 		}
429 		b = tmp;
430 		list_splice(&carry_reads, &b->requests);
431 	}
432 
433 	/* Actions operating on the disk state also want to work on
434 	   requests that got barrier acked. */
435 	switch (what) {
436 	case fail_frozen_disk_io:
437 	case restart_frozen_disk_io:
438 		list_for_each_safe(le, tle, &mdev->barrier_acked_requests) {
439 			req = list_entry(le, struct drbd_request, tl_requests);
440 			_req_mod(req, what);
441 		}
442 
443 	case connection_lost_while_pending:
444 	case resend:
445 		break;
446 	default:
447 		dev_err(DEV, "what = %d in _tl_restart()\n", what);
448 	}
449 }
450 
451 
452 /**
453  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
454  * @mdev:	DRBD device.
455  *
456  * This is called after the connection to the peer was lost. The storage covered
457  * by the requests on the transfer log gets marked as out of sync. Called from the
458  * receiver thread and the worker thread.
459  */
460 void tl_clear(struct drbd_conf *mdev)
461 {
462 	struct list_head *le, *tle;
463 	struct drbd_request *r;
464 
465 	spin_lock_irq(&mdev->req_lock);
466 
467 	_tl_restart(mdev, connection_lost_while_pending);
468 
469 	/* we expect this list to be empty. */
470 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
471 
472 	/* but just in case, clean it up anyways! */
473 	list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
474 		r = list_entry(le, struct drbd_request, tl_requests);
475 		/* It would be nice to complete outside of spinlock.
476 		 * But this is easier for now. */
477 		_req_mod(r, connection_lost_while_pending);
478 	}
479 
480 	/* ensure bit indicating barrier is required is clear */
481 	clear_bit(CREATE_BARRIER, &mdev->flags);
482 
483 	memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
484 
485 	spin_unlock_irq(&mdev->req_lock);
486 }
487 
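/**
 * tl_restart() - Walk the transfer log and apply an action to all requests
 * @mdev:	DRBD device.
 * @what:	The action/event to perform with all request objects
 *
 * Wrapper around _tl_restart() that takes and releases the req_lock itself.
 */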
488 void tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
489 {
490 	spin_lock_irq(&mdev->req_lock);
491 	_tl_restart(mdev, what);
492 	spin_unlock_irq(&mdev->req_lock);
493 }
494 
495 /**
496  * tl_abort_disk_io() - Abort disk I/O for all requests for a certain mdev in the TL
497  * @mdev:	DRBD device.
498  */
499 void tl_abort_disk_io(struct drbd_conf *mdev)
500 {
501 	struct drbd_tl_epoch *b;
502 	struct list_head *le, *tle;
503 	struct drbd_request *req;
504 
505 	spin_lock_irq(&mdev->req_lock);
506 	b = mdev->oldest_tle;
507 	while (b) {
508 		list_for_each_safe(le, tle, &b->requests) {
509 			req = list_entry(le, struct drbd_request, tl_requests);
510 			if (!(req->rq_state & RQ_LOCAL_PENDING))
511 				continue;
512 			_req_mod(req, abort_disk_io);
513 		}
514 		b = b->next;
515 	}
516 
517 	list_for_each_safe(le, tle, &mdev->barrier_acked_requests) {
518 		req = list_entry(le, struct drbd_request, tl_requests);
519 		if (!(req->rq_state & RQ_LOCAL_PENDING))
520 			continue;
521 		_req_mod(req, abort_disk_io);
522 	}
523 
524 	spin_unlock_irq(&mdev->req_lock);
525 }
526 
527 /**
528  * cl_wide_st_chg() - true if the state change is a cluster wide one
529  * @mdev:	DRBD device.
530  * @os:		old (current) state.
531  * @ns:		new (wanted) state.
532  */
533 static int cl_wide_st_chg(struct drbd_conf *mdev,
534 			  union drbd_state os, union drbd_state ns)
535 {
536 	return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
537 		 ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
538 		  (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
539 		  (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
540 		  (os.disk != D_FAILED && ns.disk == D_FAILED))) ||
541 		(os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
542 		(os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
543 }
544 
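/**
 * drbd_change_state() - Change the state, taking the req_lock itself
 * @mdev:	DRBD device.
 * @f:		state change flags, see &enum chg_state_flags.
 * @mask:	mask of state bits to change.
 * @val:	value of new state bits.
 *
 * Thin wrapper around _drbd_set_state() that grabs mdev->req_lock for the
 * duration of the state change.
 */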
545 enum drbd_state_rv
546 drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
547 		  union drbd_state mask, union drbd_state val)
548 {
549 	unsigned long flags;
550 	union drbd_state os, ns;
551 	enum drbd_state_rv rv;
552 
553 	spin_lock_irqsave(&mdev->req_lock, flags);
554 	os = mdev->state;
555 	ns.i = (os.i & ~mask.i) | val.i;
556 	rv = _drbd_set_state(mdev, ns, f, NULL);
557 	ns = mdev->state;
558 	spin_unlock_irqrestore(&mdev->req_lock, flags);
559 
560 	return rv;
561 }
562 
563 /**
564  * drbd_force_state() - Impose a change which happens outside our control on our state
565  * @mdev:	DRBD device.
566  * @mask:	mask of state bits to change.
567  * @val:	value of new state bits.
568  */
569 void drbd_force_state(struct drbd_conf *mdev,
570 	union drbd_state mask, union drbd_state val)
571 {
572 	drbd_change_state(mdev, CS_HARD, mask, val);
573 }
574 
575 static enum drbd_state_rv is_valid_state(struct drbd_conf *, union drbd_state);
576 static enum drbd_state_rv is_valid_state_transition(struct drbd_conf *,
577 						    union drbd_state,
578 						    union drbd_state);
579 enum sanitize_state_warnings {
580 	NO_WARNING,
581 	ABORTED_ONLINE_VERIFY,
582 	ABORTED_RESYNC,
583 	CONNECTION_LOST_NEGOTIATING,
584 	IMPLICITLY_UPGRADED_DISK,
585 	IMPLICITLY_UPGRADED_PDSK,
586 };
587 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
588 				       union drbd_state ns, enum sanitize_state_warnings *warn);
589 int drbd_send_state_req(struct drbd_conf *,
590 			union drbd_state, union drbd_state);
591 
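/* Condition evaluated by the wait_event() in drbd_req_state() while a
 * cluster wide state change is pending: returning SS_UNKNOWN_ERROR (zero)
 * keeps the caller waiting for the peer's answer, any other value ends
 * the wait. */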
592 static enum drbd_state_rv
593 _req_st_cond(struct drbd_conf *mdev, union drbd_state mask,
594 	     union drbd_state val)
595 {
596 	union drbd_state os, ns;
597 	unsigned long flags;
598 	enum drbd_state_rv rv;
599 
600 	if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
601 		return SS_CW_SUCCESS;
602 
603 	if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
604 		return SS_CW_FAILED_BY_PEER;
605 
606 	rv = 0;
607 	spin_lock_irqsave(&mdev->req_lock, flags);
608 	os = mdev->state;
609 	ns.i = (os.i & ~mask.i) | val.i;
610 	ns = sanitize_state(mdev, os, ns, NULL);
611 
612 	if (!cl_wide_st_chg(mdev, os, ns))
613 		rv = SS_CW_NO_NEED;
614 	if (!rv) {
615 		rv = is_valid_state(mdev, ns);
616 		if (rv == SS_SUCCESS) {
617 			rv = is_valid_state_transition(mdev, ns, os);
618 			if (rv == SS_SUCCESS)
619 				rv = SS_UNKNOWN_ERROR; /* cont waiting, otherwise fail. */
620 		}
621 	}
622 	spin_unlock_irqrestore(&mdev->req_lock, flags);
623 
624 	return rv;
625 }
626 
627 /**
628  * drbd_req_state() - Perform a possibly cluster-wide state change
629  * @mdev:	DRBD device.
630  * @mask:	mask of state bits to change.
631  * @val:	value of new state bits.
632  * @f:		flags
633  *
634  * Should not be called directly, use drbd_request_state() or
635  * _drbd_request_state().
636  */
637 static enum drbd_state_rv
638 drbd_req_state(struct drbd_conf *mdev, union drbd_state mask,
639 	       union drbd_state val, enum chg_state_flags f)
640 {
641 	struct completion done;
642 	unsigned long flags;
643 	union drbd_state os, ns;
644 	enum drbd_state_rv rv;
645 
646 	init_completion(&done);
647 
648 	if (f & CS_SERIALIZE)
649 		mutex_lock(&mdev->state_mutex);
650 
651 	spin_lock_irqsave(&mdev->req_lock, flags);
652 	os = mdev->state;
653 	ns.i = (os.i & ~mask.i) | val.i;
654 	ns = sanitize_state(mdev, os, ns, NULL);
655 
656 	if (cl_wide_st_chg(mdev, os, ns)) {
657 		rv = is_valid_state(mdev, ns);
658 		if (rv == SS_SUCCESS)
659 			rv = is_valid_state_transition(mdev, ns, os);
660 		spin_unlock_irqrestore(&mdev->req_lock, flags);
661 
662 		if (rv < SS_SUCCESS) {
663 			if (f & CS_VERBOSE)
664 				print_st_err(mdev, os, ns, rv);
665 			goto abort;
666 		}
667 
668 		drbd_state_lock(mdev);
669 		if (!drbd_send_state_req(mdev, mask, val)) {
670 			drbd_state_unlock(mdev);
671 			rv = SS_CW_FAILED_BY_PEER;
672 			if (f & CS_VERBOSE)
673 				print_st_err(mdev, os, ns, rv);
674 			goto abort;
675 		}
676 
677 		wait_event(mdev->state_wait,
678 			(rv = _req_st_cond(mdev, mask, val)));
679 
680 		if (rv < SS_SUCCESS) {
681 			drbd_state_unlock(mdev);
682 			if (f & CS_VERBOSE)
683 				print_st_err(mdev, os, ns, rv);
684 			goto abort;
685 		}
686 		spin_lock_irqsave(&mdev->req_lock, flags);
687 		os = mdev->state;
688 		ns.i = (os.i & ~mask.i) | val.i;
689 		rv = _drbd_set_state(mdev, ns, f, &done);
690 		drbd_state_unlock(mdev);
691 	} else {
692 		rv = _drbd_set_state(mdev, ns, f, &done);
693 	}
694 
695 	spin_unlock_irqrestore(&mdev->req_lock, flags);
696 
697 	if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
698 		D_ASSERT(current != mdev->worker.task);
699 		wait_for_completion(&done);
700 	}
701 
702 abort:
703 	if (f & CS_SERIALIZE)
704 		mutex_unlock(&mdev->state_mutex);
705 
706 	return rv;
707 }
708 
709 /**
710  * _drbd_request_state() - Request a state change (with flags)
711  * @mdev:	DRBD device.
712  * @mask:	mask of state bits to change.
713  * @val:	value of new state bits.
714  * @f:		flags
715  *
716  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
717  * flag, or when logging of failed state change requests is not desired.
718  */
719 enum drbd_state_rv
720 _drbd_request_state(struct drbd_conf *mdev, union drbd_state mask,
721 		    union drbd_state val, enum chg_state_flags f)
722 {
723 	enum drbd_state_rv rv;
724 
725 	wait_event(mdev->state_wait,
726 		   (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
727 
728 	return rv;
729 }
730 
731 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
732 {
733 	dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
734 	    name,
735 	    drbd_conn_str(ns.conn),
736 	    drbd_role_str(ns.role),
737 	    drbd_role_str(ns.peer),
738 	    drbd_disk_str(ns.disk),
739 	    drbd_disk_str(ns.pdsk),
740 	    is_susp(ns) ? 's' : 'r',
741 	    ns.aftr_isp ? 'a' : '-',
742 	    ns.peer_isp ? 'p' : '-',
743 	    ns.user_isp ? 'u' : '-'
744 	    );
745 }
746 
747 void print_st_err(struct drbd_conf *mdev, union drbd_state os,
748 	          union drbd_state ns, enum drbd_state_rv err)
749 {
750 	if (err == SS_IN_TRANSIENT_STATE)
751 		return;
752 	dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
753 	print_st(mdev, " state", os);
754 	print_st(mdev, "wanted", ns);
755 }
756 
757 
758 /**
759  * is_valid_state() - Returns an SS_ error code if ns is not valid
760  * @mdev:	DRBD device.
761  * @ns:		State to consider.
762  */
763 static enum drbd_state_rv
764 is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
765 {
766 	/* See drbd_state_sw_errors in drbd_strings.c */
767 
768 	enum drbd_fencing_p fp;
769 	enum drbd_state_rv rv = SS_SUCCESS;
770 
771 	fp = FP_DONT_CARE;
772 	if (get_ldev(mdev)) {
773 		fp = mdev->ldev->dc.fencing;
774 		put_ldev(mdev);
775 	}
776 
777 	if (get_net_conf(mdev)) {
778 		if (!mdev->net_conf->two_primaries &&
779 		    ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
780 			rv = SS_TWO_PRIMARIES;
781 		put_net_conf(mdev);
782 	}
783 
784 	if (rv <= 0)
785 		/* already found a reason to abort */;
786 	else if (ns.role == R_SECONDARY && mdev->open_cnt)
787 		rv = SS_DEVICE_IN_USE;
788 
789 	else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
790 		rv = SS_NO_UP_TO_DATE_DISK;
791 
792 	else if (fp >= FP_RESOURCE &&
793 		 ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
794 		rv = SS_PRIMARY_NOP;
795 
796 	else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
797 		rv = SS_NO_UP_TO_DATE_DISK;
798 
799 	else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
800 		rv = SS_NO_LOCAL_DISK;
801 
802 	else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
803 		rv = SS_NO_REMOTE_DISK;
804 
805 	else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
806 		rv = SS_NO_UP_TO_DATE_DISK;
807 
808 	else if ((ns.conn == C_CONNECTED ||
809 		  ns.conn == C_WF_BITMAP_S ||
810 		  ns.conn == C_SYNC_SOURCE ||
811 		  ns.conn == C_PAUSED_SYNC_S) &&
812 		  ns.disk == D_OUTDATED)
813 		rv = SS_CONNECTED_OUTDATES;
814 
815 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
816 		 (mdev->sync_conf.verify_alg[0] == 0))
817 		rv = SS_NO_VERIFY_ALG;
818 
819 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
820 		  mdev->agreed_pro_version < 88)
821 		rv = SS_NOT_SUPPORTED;
822 
823 	else if (ns.conn >= C_CONNECTED && ns.pdsk == D_UNKNOWN)
824 		rv = SS_CONNECTED_OUTDATES;
825 
826 	return rv;
827 }
828 
829 /**
830  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
831  * @mdev:	DRBD device.
832  * @ns:		new state.
833  * @os:		old state.
834  */
835 static enum drbd_state_rv
836 is_valid_state_transition(struct drbd_conf *mdev, union drbd_state ns,
837 			  union drbd_state os)
838 {
839 	enum drbd_state_rv rv = SS_SUCCESS;
840 
841 	if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
842 	    os.conn > C_CONNECTED)
843 		rv = SS_RESYNC_RUNNING;
844 
845 	if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
846 		rv = SS_ALREADY_STANDALONE;
847 
848 	if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
849 		rv = SS_IS_DISKLESS;
850 
851 	if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
852 		rv = SS_NO_NET_CONFIG;
853 
854 	if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
855 		rv = SS_LOWER_THAN_OUTDATED;
856 
857 	if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
858 		rv = SS_IN_TRANSIENT_STATE;
859 
860 	if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
861 		rv = SS_IN_TRANSIENT_STATE;
862 
863 	/* While establishing a connection, only allow cstate to change.
864 	   Delay/refuse role changes, detach/attach etc... */
865 	if (test_bit(STATE_SENT, &mdev->flags) &&
866 	    !(os.conn == C_WF_REPORT_PARAMS ||
867 	      (ns.conn == C_WF_REPORT_PARAMS && os.conn == C_WF_CONNECTION)))
868 		rv = SS_IN_TRANSIENT_STATE;
869 
870 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
871 		rv = SS_NEED_CONNECTION;
872 
873 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
874 	    ns.conn != os.conn && os.conn > C_CONNECTED)
875 		rv = SS_RESYNC_RUNNING;
876 
877 	if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
878 	    os.conn < C_CONNECTED)
879 		rv = SS_NEED_CONNECTION;
880 
881 	if ((ns.conn == C_SYNC_TARGET || ns.conn == C_SYNC_SOURCE)
882 	    && os.conn < C_WF_REPORT_PARAMS)
883 		rv = SS_NEED_CONNECTION; /* No NetworkFailure -> SyncTarget etc... */
884 
885 	return rv;
886 }
887 
888 static void print_sanitize_warnings(struct drbd_conf *mdev, enum sanitize_state_warnings warn)
889 {
890 	static const char *msg_table[] = {
891 		[NO_WARNING] = "",
892 		[ABORTED_ONLINE_VERIFY] = "Online-verify aborted.",
893 		[ABORTED_RESYNC] = "Resync aborted.",
894 		[CONNECTION_LOST_NEGOTIATING] = "Connection lost while negotiating, no data!",
895 		[IMPLICITLY_UPGRADED_DISK] = "Implicitly upgraded disk",
896 		[IMPLICITLY_UPGRADED_PDSK] = "Implicitly upgraded pdsk",
897 	};
898 
899 	if (warn != NO_WARNING)
900 		dev_warn(DEV, "%s\n", msg_table[warn]);
901 }
902 
903 /**
904  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
905  * @mdev:	DRBD device.
906  * @os:		old state.
907  * @ns:		new state.
908  * @warn:	if not NULL, receives a warning from &enum sanitize_state_warnings
909  *
910  * When we lose the connection, we have to set the state of the peer's disk (pdsk)
911  * to D_UNKNOWN. This rule and many more along those lines are in this function.
912  */
913 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
914 				       union drbd_state ns, enum sanitize_state_warnings *warn)
915 {
916 	enum drbd_fencing_p fp;
917 	enum drbd_disk_state disk_min, disk_max, pdsk_min, pdsk_max;
918 
919 	if (warn)
920 		*warn = NO_WARNING;
921 
922 	fp = FP_DONT_CARE;
923 	if (get_ldev(mdev)) {
924 		fp = mdev->ldev->dc.fencing;
925 		put_ldev(mdev);
926 	}
927 
928 	/* Do not allow a network error state on a device whose network part is not configured */
929 	if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
930 	    os.conn <= C_DISCONNECTING)
931 		ns.conn = os.conn;
932 
933 	/* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow.
934 	 * If you try to go into some Sync* state, that shall fail (elsewhere). */
935 	if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
936 	    ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING && ns.conn <= C_CONNECTED)
937 		ns.conn = os.conn;
938 
939 	/* we cannot fail (again) if we already detached */
940 	if (ns.disk == D_FAILED && os.disk == D_DISKLESS)
941 		ns.disk = D_DISKLESS;
942 
943 	/* After C_DISCONNECTING only C_STANDALONE may follow */
944 	if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
945 		ns.conn = os.conn;
946 
947 	if (ns.conn < C_CONNECTED) {
948 		ns.peer_isp = 0;
949 		ns.peer = R_UNKNOWN;
950 		if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
951 			ns.pdsk = D_UNKNOWN;
952 	}
953 
954 	/* Clear the aftr_isp when becoming unconfigured */
955 	if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
956 		ns.aftr_isp = 0;
957 
958 	/* Abort resync if a disk fails/detaches */
959 	if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
960 	    (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
961 		if (warn)
962 			*warn =	os.conn == C_VERIFY_S || os.conn == C_VERIFY_T ?
963 				ABORTED_ONLINE_VERIFY : ABORTED_RESYNC;
964 		ns.conn = C_CONNECTED;
965 	}
966 
967 	/* Connection breaks down before we finished "Negotiating" */
968 	if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
969 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
970 		if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
971 			ns.disk = mdev->new_state_tmp.disk;
972 			ns.pdsk = mdev->new_state_tmp.pdsk;
973 		} else {
974 			if (warn)
975 				*warn = CONNECTION_LOST_NEGOTIATING;
976 			ns.disk = D_DISKLESS;
977 			ns.pdsk = D_UNKNOWN;
978 		}
979 		put_ldev(mdev);
980 	}
981 
982 	/* D_CONSISTENT and D_OUTDATED vanish when we get connected */
983 	if (ns.conn >= C_CONNECTED && ns.conn < C_AHEAD) {
984 		if (ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED)
985 			ns.disk = D_UP_TO_DATE;
986 		if (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)
987 			ns.pdsk = D_UP_TO_DATE;
988 	}
989 
990 	/* Implications of the connection state on the disk states */
991 	disk_min = D_DISKLESS;
992 	disk_max = D_UP_TO_DATE;
993 	pdsk_min = D_INCONSISTENT;
994 	pdsk_max = D_UNKNOWN;
995 	switch ((enum drbd_conns)ns.conn) {
996 	case C_WF_BITMAP_T:
997 	case C_PAUSED_SYNC_T:
998 	case C_STARTING_SYNC_T:
999 	case C_WF_SYNC_UUID:
1000 	case C_BEHIND:
1001 		disk_min = D_INCONSISTENT;
1002 		disk_max = D_OUTDATED;
1003 		pdsk_min = D_UP_TO_DATE;
1004 		pdsk_max = D_UP_TO_DATE;
1005 		break;
1006 	case C_VERIFY_S:
1007 	case C_VERIFY_T:
1008 		disk_min = D_UP_TO_DATE;
1009 		disk_max = D_UP_TO_DATE;
1010 		pdsk_min = D_UP_TO_DATE;
1011 		pdsk_max = D_UP_TO_DATE;
1012 		break;
1013 	case C_CONNECTED:
1014 		disk_min = D_DISKLESS;
1015 		disk_max = D_UP_TO_DATE;
1016 		pdsk_min = D_DISKLESS;
1017 		pdsk_max = D_UP_TO_DATE;
1018 		break;
1019 	case C_WF_BITMAP_S:
1020 	case C_PAUSED_SYNC_S:
1021 	case C_STARTING_SYNC_S:
1022 	case C_AHEAD:
1023 		disk_min = D_UP_TO_DATE;
1024 		disk_max = D_UP_TO_DATE;
1025 		pdsk_min = D_INCONSISTENT;
1026 		pdsk_max = D_CONSISTENT; /* D_OUTDATED would be nice. But explicit outdate necessary*/
1027 		break;
1028 	case C_SYNC_TARGET:
1029 		disk_min = D_INCONSISTENT;
1030 		disk_max = D_INCONSISTENT;
1031 		pdsk_min = D_UP_TO_DATE;
1032 		pdsk_max = D_UP_TO_DATE;
1033 		break;
1034 	case C_SYNC_SOURCE:
1035 		disk_min = D_UP_TO_DATE;
1036 		disk_max = D_UP_TO_DATE;
1037 		pdsk_min = D_INCONSISTENT;
1038 		pdsk_max = D_INCONSISTENT;
1039 		break;
1040 	case C_STANDALONE:
1041 	case C_DISCONNECTING:
1042 	case C_UNCONNECTED:
1043 	case C_TIMEOUT:
1044 	case C_BROKEN_PIPE:
1045 	case C_NETWORK_FAILURE:
1046 	case C_PROTOCOL_ERROR:
1047 	case C_TEAR_DOWN:
1048 	case C_WF_CONNECTION:
1049 	case C_WF_REPORT_PARAMS:
1050 	case C_MASK:
1051 		break;
1052 	}
1053 	if (ns.disk > disk_max)
1054 		ns.disk = disk_max;
1055 
1056 	if (ns.disk < disk_min) {
1057 		if (warn)
1058 			*warn = IMPLICITLY_UPGRADED_DISK;
1059 		ns.disk = disk_min;
1060 	}
1061 	if (ns.pdsk > pdsk_max)
1062 		ns.pdsk = pdsk_max;
1063 
1064 	if (ns.pdsk < pdsk_min) {
1065 		if (warn)
1066 			*warn = IMPLICITLY_UPGRADED_PDSK;
1067 		ns.pdsk = pdsk_min;
1068 	}
1069 
1070 	if (fp == FP_STONITH &&
1071 	    (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
1072 	    !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
1073 		ns.susp_fen = 1; /* Suspend IO while fence-peer handler runs (peer lost) */
1074 
1075 	if (mdev->sync_conf.on_no_data == OND_SUSPEND_IO &&
1076 	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE) &&
1077 	    !(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE))
1078 		ns.susp_nod = 1; /* Suspend IO while no data available (no accessible data available) */
1079 
1080 	if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
1081 		if (ns.conn == C_SYNC_SOURCE)
1082 			ns.conn = C_PAUSED_SYNC_S;
1083 		if (ns.conn == C_SYNC_TARGET)
1084 			ns.conn = C_PAUSED_SYNC_T;
1085 	} else {
1086 		if (ns.conn == C_PAUSED_SYNC_S)
1087 			ns.conn = C_SYNC_SOURCE;
1088 		if (ns.conn == C_PAUSED_SYNC_T)
1089 			ns.conn = C_SYNC_TARGET;
1090 	}
1091 
1092 	return ns;
1093 }
1094 
1095 /* helper for __drbd_set_state */
1096 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
1097 {
1098 	if (mdev->agreed_pro_version < 90)
1099 		mdev->ov_start_sector = 0;
1100 	mdev->rs_total = drbd_bm_bits(mdev);
1101 	mdev->ov_position = 0;
1102 	if (cs == C_VERIFY_T) {
1103 		/* starting online verify from an arbitrary position
1104 		 * does not fit well into the existing protocol.
1105 		 * on C_VERIFY_T, we initialize ov_left and friends
1106 		 * implicitly in receive_DataRequest once the
1107 		 * first P_OV_REQUEST is received */
1108 		mdev->ov_start_sector = ~(sector_t)0;
1109 	} else {
1110 		unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
1111 		if (bit >= mdev->rs_total) {
1112 			mdev->ov_start_sector =
1113 				BM_BIT_TO_SECT(mdev->rs_total - 1);
1114 			mdev->rs_total = 1;
1115 		} else
1116 			mdev->rs_total -= bit;
1117 		mdev->ov_position = mdev->ov_start_sector;
1118 	}
1119 	mdev->ov_left = mdev->rs_total;
1120 }
1121 
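/* Clear AL_SUSPENDED and log that activity log updates are resumed;
 * a no-op if updates were not suspended. */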
1122 static void drbd_resume_al(struct drbd_conf *mdev)
1123 {
1124 	if (test_and_clear_bit(AL_SUSPENDED, &mdev->flags))
1125 		dev_info(DEV, "Resumed AL updates\n");
1126 }
1127 
1128 /**
1129  * __drbd_set_state() - Set a new DRBD state
1130  * @mdev:	DRBD device.
1131  * @ns:		new state.
1132  * @flags:	Flags
1133  * @done:	Optional completion, that will get completed after the after_state_ch() finished
1134  *
1135  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
1136  */
1137 enum drbd_state_rv
1138 __drbd_set_state(struct drbd_conf *mdev, union drbd_state ns,
1139 	         enum chg_state_flags flags, struct completion *done)
1140 {
1141 	union drbd_state os;
1142 	enum drbd_state_rv rv = SS_SUCCESS;
1143 	enum sanitize_state_warnings ssw;
1144 	struct after_state_chg_work *ascw;
1145 
1146 	os = mdev->state;
1147 
1148 	ns = sanitize_state(mdev, os, ns, &ssw);
1149 
1150 	if (ns.i == os.i)
1151 		return SS_NOTHING_TO_DO;
1152 
1153 	if (!(flags & CS_HARD)) {
1154 		/*  pre-state-change checks ; only look at ns  */
1155 		/* See drbd_state_sw_errors in drbd_strings.c */
1156 
1157 		rv = is_valid_state(mdev, ns);
1158 		if (rv < SS_SUCCESS) {
1159 			/* If the old state was illegal as well, then let
1160 			   this happen...*/
1161 
1162 			if (is_valid_state(mdev, os) == rv)
1163 				rv = is_valid_state_transition(mdev, ns, os);
1164 		} else
1165 			rv = is_valid_state_transition(mdev, ns, os);
1166 	}
1167 
1168 	if (rv < SS_SUCCESS) {
1169 		if (flags & CS_VERBOSE)
1170 			print_st_err(mdev, os, ns, rv);
1171 		return rv;
1172 	}
1173 
1174 	print_sanitize_warnings(mdev, ssw);
1175 
1176 	{
1177 	char *pbp, pb[300];
1178 	pbp = pb;
1179 	*pbp = 0;
1180 	if (ns.role != os.role)
1181 		pbp += sprintf(pbp, "role( %s -> %s ) ",
1182 			       drbd_role_str(os.role),
1183 			       drbd_role_str(ns.role));
1184 	if (ns.peer != os.peer)
1185 		pbp += sprintf(pbp, "peer( %s -> %s ) ",
1186 			       drbd_role_str(os.peer),
1187 			       drbd_role_str(ns.peer));
1188 	if (ns.conn != os.conn)
1189 		pbp += sprintf(pbp, "conn( %s -> %s ) ",
1190 			       drbd_conn_str(os.conn),
1191 			       drbd_conn_str(ns.conn));
1192 	if (ns.disk != os.disk)
1193 		pbp += sprintf(pbp, "disk( %s -> %s ) ",
1194 			       drbd_disk_str(os.disk),
1195 			       drbd_disk_str(ns.disk));
1196 	if (ns.pdsk != os.pdsk)
1197 		pbp += sprintf(pbp, "pdsk( %s -> %s ) ",
1198 			       drbd_disk_str(os.pdsk),
1199 			       drbd_disk_str(ns.pdsk));
1200 	if (is_susp(ns) != is_susp(os))
1201 		pbp += sprintf(pbp, "susp( %d -> %d ) ",
1202 			       is_susp(os),
1203 			       is_susp(ns));
1204 	if (ns.aftr_isp != os.aftr_isp)
1205 		pbp += sprintf(pbp, "aftr_isp( %d -> %d ) ",
1206 			       os.aftr_isp,
1207 			       ns.aftr_isp);
1208 	if (ns.peer_isp != os.peer_isp)
1209 		pbp += sprintf(pbp, "peer_isp( %d -> %d ) ",
1210 			       os.peer_isp,
1211 			       ns.peer_isp);
1212 	if (ns.user_isp != os.user_isp)
1213 		pbp += sprintf(pbp, "user_isp( %d -> %d ) ",
1214 			       os.user_isp,
1215 			       ns.user_isp);
1216 	dev_info(DEV, "%s\n", pb);
1217 	}
1218 
1219 	/* solve the race between becoming unconfigured,
1220 	 * worker doing the cleanup, and
1221 	 * admin reconfiguring us:
1222 	 * on (re)configure, first set CONFIG_PENDING,
1223 	 * then wait for a potentially exiting worker,
1224 	 * start the worker, and schedule one no_op.
1225 	 * then proceed with configuration.
1226 	 */
1227 	if (ns.disk == D_DISKLESS &&
1228 	    ns.conn == C_STANDALONE &&
1229 	    ns.role == R_SECONDARY &&
1230 	    !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
1231 		set_bit(DEVICE_DYING, &mdev->flags);
1232 
1233 	/* if we are going -> D_FAILED or D_DISKLESS, grab one extra reference
1234 	 * on the ldev here, to be sure the transition -> D_DISKLESS resp.
1235 	 * drbd_ldev_destroy() won't happen before our corresponding
1236 	 * after_state_ch works run, where we put_ldev again. */
1237 	if ((os.disk != D_FAILED && ns.disk == D_FAILED) ||
1238 	    (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))
1239 		atomic_inc(&mdev->local_cnt);
1240 
1241 	mdev->state = ns;
1242 
1243 	if (os.disk == D_ATTACHING && ns.disk >= D_NEGOTIATING)
1244 		drbd_print_uuids(mdev, "attached to UUIDs");
1245 
1246 	wake_up(&mdev->misc_wait);
1247 	wake_up(&mdev->state_wait);
1248 
1249 	/* aborted verify run. log the last position */
1250 	if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1251 	    ns.conn < C_CONNECTED) {
1252 		mdev->ov_start_sector =
1253 			BM_BIT_TO_SECT(drbd_bm_bits(mdev) - mdev->ov_left);
1254 		dev_info(DEV, "Online Verify reached sector %llu\n",
1255 			(unsigned long long)mdev->ov_start_sector);
1256 	}
1257 
1258 	if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1259 	    (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1260 		dev_info(DEV, "Syncer continues.\n");
1261 		mdev->rs_paused += (long)jiffies
1262 				  -(long)mdev->rs_mark_time[mdev->rs_last_mark];
1263 		if (ns.conn == C_SYNC_TARGET)
1264 			mod_timer(&mdev->resync_timer, jiffies);
1265 	}
1266 
1267 	if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1268 	    (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1269 		dev_info(DEV, "Resync suspended\n");
1270 		mdev->rs_mark_time[mdev->rs_last_mark] = jiffies;
1271 	}
1272 
1273 	if (os.conn == C_CONNECTED &&
1274 	    (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1275 		unsigned long now = jiffies;
1276 		int i;
1277 
1278 		set_ov_position(mdev, ns.conn);
1279 		mdev->rs_start = now;
1280 		mdev->rs_last_events = 0;
1281 		mdev->rs_last_sect_ev = 0;
1282 		mdev->ov_last_oos_size = 0;
1283 		mdev->ov_last_oos_start = 0;
1284 
1285 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1286 			mdev->rs_mark_left[i] = mdev->ov_left;
1287 			mdev->rs_mark_time[i] = now;
1288 		}
1289 
1290 		drbd_rs_controller_reset(mdev);
1291 
1292 		if (ns.conn == C_VERIFY_S) {
1293 			dev_info(DEV, "Starting Online Verify from sector %llu\n",
1294 					(unsigned long long)mdev->ov_position);
1295 			mod_timer(&mdev->resync_timer, jiffies);
1296 		}
1297 	}
1298 
1299 	if (get_ldev(mdev)) {
1300 		u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1301 						 MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1302 						 MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1303 
1304 		if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1305 			mdf |= MDF_CRASHED_PRIMARY;
1306 		if (mdev->state.role == R_PRIMARY ||
1307 		    (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1308 			mdf |= MDF_PRIMARY_IND;
1309 		if (mdev->state.conn > C_WF_REPORT_PARAMS)
1310 			mdf |= MDF_CONNECTED_IND;
1311 		if (mdev->state.disk > D_INCONSISTENT)
1312 			mdf |= MDF_CONSISTENT;
1313 		if (mdev->state.disk > D_OUTDATED)
1314 			mdf |= MDF_WAS_UP_TO_DATE;
1315 		if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1316 			mdf |= MDF_PEER_OUT_DATED;
1317 		if (mdf != mdev->ldev->md.flags) {
1318 			mdev->ldev->md.flags = mdf;
1319 			drbd_md_mark_dirty(mdev);
1320 		}
1321 		if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1322 			drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1323 		put_ldev(mdev);
1324 	}
1325 
1326 	/* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider to resync */
1327 	if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1328 	    os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1329 		set_bit(CONSIDER_RESYNC, &mdev->flags);
1330 
1331 	/* Receiver should clean up itself */
1332 	if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1333 		drbd_thread_stop_nowait(&mdev->receiver);
1334 
1335 	/* Now the receiver finished cleaning up itself, it should die */
1336 	if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1337 		drbd_thread_stop_nowait(&mdev->receiver);
1338 
1339 	/* Upon network failure, we need to restart the receiver. */
1340 	if (os.conn > C_WF_CONNECTION &&
1341 	    ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1342 		drbd_thread_restart_nowait(&mdev->receiver);
1343 
1344 	/* Resume AL writing if we get a connection */
1345 	if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1346 		drbd_resume_al(mdev);
1347 
1348 	/* remember last connect and attach times so request_timer_fn() won't
1349 	 * kill newly established sessions while we are still trying to thaw
1350 	 * previously frozen IO */
1351 	if (os.conn != C_WF_REPORT_PARAMS && ns.conn == C_WF_REPORT_PARAMS)
1352 		mdev->last_reconnect_jif = jiffies;
1353 	if ((os.disk == D_ATTACHING || os.disk == D_NEGOTIATING) &&
1354 	    ns.disk > D_NEGOTIATING)
1355 		mdev->last_reattach_jif = jiffies;
1356 
1357 	ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1358 	if (ascw) {
1359 		ascw->os = os;
1360 		ascw->ns = ns;
1361 		ascw->flags = flags;
1362 		ascw->w.cb = w_after_state_ch;
1363 		ascw->done = done;
1364 		drbd_queue_work(&mdev->data.work, &ascw->w);
1365 	} else {
1366 		dev_warn(DEV, "Could not kmalloc an ascw\n");
1367 	}
1368 
1369 	return rv;
1370 }
1371 
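/* Worker callback queued by __drbd_set_state(): runs after_state_ch() for
 * the recorded state transition and completes ascw->done for
 * CS_WAIT_COMPLETE requesters. */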
1372 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1373 {
1374 	struct after_state_chg_work *ascw =
1375 		container_of(w, struct after_state_chg_work, w);
1376 	after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1377 	if (ascw->flags & CS_WAIT_COMPLETE) {
1378 		D_ASSERT(ascw->done != NULL);
1379 		complete(ascw->done);
1380 	}
1381 	kfree(ascw);
1382 
1383 	return 1;
1384 }
1385 
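/* "After bitmap write" callback, queued together with drbd_bmio_set_n_write
 * from after_state_ch() when a full sync starts: on success continue to
 * C_WF_SYNC_UUID (sync target) resp. start the resync (sync source); on
 * failure fall back to C_CONNECTED. */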
1386 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1387 {
1388 	if (rv) {
1389 		dev_err(DEV, "Writing the bitmap failed, not starting resync.\n");
1390 		_drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1391 		return;
1392 	}
1393 
1394 	switch (mdev->state.conn) {
1395 	case C_STARTING_SYNC_T:
1396 		_drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1397 		break;
1398 	case C_STARTING_SYNC_S:
1399 		drbd_start_resync(mdev, C_SYNC_SOURCE);
1400 		break;
1401 	}
1402 }
1403 
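/**
 * drbd_bitmap_io_from_worker() - Run a bitmap I/O function from the worker thread
 * @mdev:	DRBD device.
 * @io_fn:	bitmap I/O function, called with the bitmap locked.
 * @why:	reason string, passed on to drbd_bm_lock().
 * @flags:	bitmap lock flags, see &enum bm_flag.
 *
 * Must only be called from the worker thread. Suspends application I/O
 * without waiting for it to drain, runs @io_fn under the bitmap lock and
 * resumes I/O again. Returns the return value of @io_fn.
 */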
1404 int drbd_bitmap_io_from_worker(struct drbd_conf *mdev,
1405 		int (*io_fn)(struct drbd_conf *),
1406 		char *why, enum bm_flag flags)
1407 {
1408 	int rv;
1409 
1410 	D_ASSERT(current == mdev->worker.task);
1411 
1412 	/* open coded non-blocking drbd_suspend_io(mdev); */
1413 	set_bit(SUSPEND_IO, &mdev->flags);
1414 
1415 	drbd_bm_lock(mdev, why, flags);
1416 	rv = io_fn(mdev);
1417 	drbd_bm_unlock(mdev);
1418 
1419 	drbd_resume_io(mdev);
1420 
1421 	return rv;
1422 }
1423 
1424 /**
1425  * after_state_ch() - Perform after state change actions that may sleep
1426  * @mdev:	DRBD device.
1427  * @os:		old state.
1428  * @ns:		new state.
1429  * @flags:	Flags
1430  */
1431 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1432 			   union drbd_state ns, enum chg_state_flags flags)
1433 {
1434 	enum drbd_fencing_p fp;
1435 	enum drbd_req_event what = nothing;
1436 	union drbd_state nsm = (union drbd_state){ .i = -1 };
1437 
1438 	if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1439 		clear_bit(CRASHED_PRIMARY, &mdev->flags);
1440 		if (mdev->p_uuid)
1441 			mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1442 	}
1443 
1444 	fp = FP_DONT_CARE;
1445 	if (get_ldev(mdev)) {
1446 		fp = mdev->ldev->dc.fencing;
1447 		put_ldev(mdev);
1448 	}
1449 
1450 	/* Inform userspace about the change... */
1451 	drbd_bcast_state(mdev, ns);
1452 
1453 	if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1454 	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1455 		drbd_khelper(mdev, "pri-on-incon-degr");
1456 
1457 	/* Here we have the actions that are performed after a
1458 	   state change. This function might sleep */
1459 
1460 	if (os.disk <= D_NEGOTIATING && ns.disk > D_NEGOTIATING)
1461 		mod_timer(&mdev->request_timer, jiffies + HZ);
1462 
1463 	nsm.i = -1;
1464 	if (ns.susp_nod) {
1465 		if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1466 			what = resend;
1467 
1468 		if ((os.disk == D_ATTACHING || os.disk == D_NEGOTIATING) &&
1469 		    ns.disk > D_NEGOTIATING)
1470 			what = restart_frozen_disk_io;
1471 
1472 		if (what != nothing)
1473 			nsm.susp_nod = 0;
1474 	}
1475 
1476 	if (ns.susp_fen) {
1477 		/* case1: The outdate peer handler is successful: */
1478 		if (os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) {
1479 			tl_clear(mdev);
1480 			if (test_bit(NEW_CUR_UUID, &mdev->flags)) {
1481 				drbd_uuid_new_current(mdev);
1482 				clear_bit(NEW_CUR_UUID, &mdev->flags);
1483 			}
1484 			spin_lock_irq(&mdev->req_lock);
1485 			_drbd_set_state(_NS(mdev, susp_fen, 0), CS_VERBOSE, NULL);
1486 			spin_unlock_irq(&mdev->req_lock);
1487 		}
1488 		/* case2: The connection was established again: */
1489 		if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
1490 			clear_bit(NEW_CUR_UUID, &mdev->flags);
1491 			what = resend;
1492 			nsm.susp_fen = 0;
1493 		}
1494 	}
1495 
1496 	if (what != nothing) {
1497 		spin_lock_irq(&mdev->req_lock);
1498 		_tl_restart(mdev, what);
1499 		nsm.i &= mdev->state.i;
1500 		_drbd_set_state(mdev, nsm, CS_VERBOSE, NULL);
1501 		spin_unlock_irq(&mdev->req_lock);
1502 	}
1503 
1504 	/* Became sync source.  With protocol >= 96, we still need to send out
1505 	 * the sync uuid now. Need to do that before any drbd_send_state, or
1506 	 * the other side may go "paused sync" before receiving the sync uuids,
1507 	 * which is unexpected. */
1508 	if ((os.conn != C_SYNC_SOURCE && os.conn != C_PAUSED_SYNC_S) &&
1509 	    (ns.conn == C_SYNC_SOURCE || ns.conn == C_PAUSED_SYNC_S) &&
1510 	    mdev->agreed_pro_version >= 96 && get_ldev(mdev)) {
1511 		drbd_gen_and_send_sync_uuid(mdev);
1512 		put_ldev(mdev);
1513 	}
1514 
1515 	/* Do not change the order of the if above and the two below... */
1516 	if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1517 		/* we probably will start a resync soon.
1518 		 * make sure those things are properly reset. */
1519 		mdev->rs_total = 0;
1520 		mdev->rs_failed = 0;
1521 		atomic_set(&mdev->rs_pending_cnt, 0);
1522 		drbd_rs_cancel_all(mdev);
1523 
1524 		drbd_send_uuids(mdev);
1525 		drbd_send_state(mdev, ns);
1526 	}
1527 	/* No point in queuing send_bitmap if we don't have a connection
1528 	 * anymore, so check also the _current_ state, not only the new state
1529 	 * at the time this work was queued. */
1530 	if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S &&
1531 	    mdev->state.conn == C_WF_BITMAP_S)
1532 		drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL,
1533 				"send_bitmap (WFBitMapS)",
1534 				BM_LOCKED_TEST_ALLOWED);
1535 
1536 	/* Lost contact to peer's copy of the data */
1537 	if ((os.pdsk >= D_INCONSISTENT &&
1538 	     os.pdsk != D_UNKNOWN &&
1539 	     os.pdsk != D_OUTDATED)
1540 	&&  (ns.pdsk < D_INCONSISTENT ||
1541 	     ns.pdsk == D_UNKNOWN ||
1542 	     ns.pdsk == D_OUTDATED)) {
1543 		if (get_ldev(mdev)) {
1544 			if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1545 			    mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1546 				if (is_susp(mdev->state)) {
1547 					set_bit(NEW_CUR_UUID, &mdev->flags);
1548 				} else {
1549 					drbd_uuid_new_current(mdev);
1550 					drbd_send_uuids(mdev);
1551 				}
1552 			}
1553 			put_ldev(mdev);
1554 		}
1555 	}
1556 
1557 	if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1558 		if (os.peer == R_SECONDARY && ns.peer == R_PRIMARY &&
1559 		    mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1560 			drbd_uuid_new_current(mdev);
1561 			drbd_send_uuids(mdev);
1562 		}
1563 		/* D_DISKLESS Peer becomes secondary */
1564 		if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1565 			/* We may still be Primary ourselves.
1566 			 * No harm done if the bitmap still changes,
1567 			 * redirtied pages will follow later. */
1568 			drbd_bitmap_io_from_worker(mdev, &drbd_bm_write,
1569 				"demote diskless peer", BM_LOCKED_SET_ALLOWED);
1570 		put_ldev(mdev);
1571 	}
1572 
1573 	/* Write out all changed bits on demote.
1574 	 * Though, no need to do that just yet
1575 	 * if there is still a resync going on */
1576 	if (os.role == R_PRIMARY && ns.role == R_SECONDARY &&
1577 		mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) {
1578 		/* No changes to the bitmap expected this time, so assert that,
1579 		 * even though no harm was done if it did change. */
1580 		drbd_bitmap_io_from_worker(mdev, &drbd_bm_write,
1581 				"demote", BM_LOCKED_TEST_ALLOWED);
1582 		put_ldev(mdev);
1583 	}
1584 
1585 	/* Last part of the attaching process ... */
1586 	if (ns.conn >= C_CONNECTED &&
1587 	    os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1588 		drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1589 		drbd_send_uuids(mdev);
1590 		drbd_send_state(mdev, ns);
1591 	}
1592 
1593 	/* We want to pause/continue resync, tell peer. */
1594 	if (ns.conn >= C_CONNECTED &&
1595 	     ((os.aftr_isp != ns.aftr_isp) ||
1596 	      (os.user_isp != ns.user_isp)))
1597 		drbd_send_state(mdev, ns);
1598 
1599 	/* In case one of the isp bits got set, suspend other devices. */
1600 	if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1601 	    (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1602 		suspend_other_sg(mdev);
1603 
1604 	/* Make sure the peer gets informed about any state
1605 	   changes (ISP bits) that happened while we were in WFReportParams. */
1606 	if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1607 		drbd_send_state(mdev, ns);
1608 
1609 	if (os.conn != C_AHEAD && ns.conn == C_AHEAD)
1610 		drbd_send_state(mdev, ns);
1611 
1612 	/* We are in the process of starting a full sync... */
1613 	if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1614 	    (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1615 		/* no other bitmap changes expected during this phase */
1616 		drbd_queue_bitmap_io(mdev,
1617 			&drbd_bmio_set_n_write, &abw_start_sync,
1618 			"set_n_write from StartingSync", BM_LOCKED_TEST_ALLOWED);
1619 
1620 	/* We are invalidating ourselves... */
1621 	if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1622 	    os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1623 		/* other bitmap operation expected during this phase */
1624 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL,
1625 			"set_n_write from invalidate", BM_LOCKED_MASK);
1626 
1627 	/* first half of local IO error, failure to attach,
1628 	 * or administrative detach */
1629 	if (os.disk != D_FAILED && ns.disk == D_FAILED) {
1630 		enum drbd_io_error_p eh = EP_PASS_ON;
1631 		int was_io_error = 0;
1632 		/* corresponding get_ldev was in __drbd_set_state, to serialize
1633 		 * our cleanup here with the transition to D_DISKLESS.
1634 		 * But it is still not safe to dereference ldev here, since
1635 		 * we might come from a failed attach before ldev was set. */
1636 		if (mdev->ldev) {
1637 			eh = mdev->ldev->dc.on_io_error;
1638 			was_io_error = test_and_clear_bit(WAS_IO_ERROR, &mdev->flags);
1639 
1640 			if (was_io_error && eh == EP_CALL_HELPER)
1641 				drbd_khelper(mdev, "local-io-error");
1642 
1643 			/* Immediately allow completion of all application IO,
1644 			 * that waits for completion from the local disk,
1645 			 * if this was a force-detach due to disk_timeout
1646 			 * or administrator request (drbdsetup detach --force).
1647 			 * Do NOT abort otherwise.
1648 			 * Aborting local requests may cause serious problems,
1649 			 * if requests are completed to upper layers already,
1650 			 * and then later the already submitted local bio completes.
1651 			 * This can cause DMA into former bio pages that meanwhile
1652 			 * have been re-used for other things.
1653 			 * So aborting local requests may cause crashes,
1654 			 * or even worse, silent data corruption.
1655 			 */
1656 			if (test_and_clear_bit(FORCE_DETACH, &mdev->flags))
1657 				tl_abort_disk_io(mdev);
1658 
1659 			/* current state still has to be D_FAILED,
1660 			 * there is only one way out: to D_DISKLESS,
1661 			 * and that may only happen after our put_ldev below. */
1662 			if (mdev->state.disk != D_FAILED)
1663 				dev_err(DEV,
1664 					"ASSERT FAILED: disk is %s during detach\n",
1665 					drbd_disk_str(mdev->state.disk));
1666 
1667 			if (ns.conn >= C_CONNECTED)
1668 				drbd_send_state(mdev, ns);
1669 
1670 			drbd_rs_cancel_all(mdev);
1671 
1672 			/* In case we want to get something to stable storage still,
1673 			 * this may be the last chance.
1674 			 * Following put_ldev may transition to D_DISKLESS. */
1675 			drbd_md_sync(mdev);
1676 		}
1677 		put_ldev(mdev);
1678 	}
1679 
1680 	/* second half of local IO error, failure to attach,
1681 	 * or administrative detach,
1682 	 * after local_cnt references have reached zero again */
1683 	if (os.disk != D_DISKLESS && ns.disk == D_DISKLESS) {
1684 		/* We must still be diskless,
1685 		 * re-attach has to be serialized with this! */
1686 		if (mdev->state.disk != D_DISKLESS)
1687 			dev_err(DEV,
1688 				"ASSERT FAILED: disk is %s while going diskless\n",
1689 				drbd_disk_str(mdev->state.disk));
1690 
1691 		if (ns.conn >= C_CONNECTED)
1692 			drbd_send_state(mdev, ns);
1693 
1694 		/* corresponding get_ldev in __drbd_set_state
1695 		 * this may finally trigger drbd_ldev_destroy. */
1696 		put_ldev(mdev);
1697 	}
1698 
1699 	/* Notify peer that I had a local IO error, and did not detach. */
1700 	if (os.disk == D_UP_TO_DATE && ns.disk == D_INCONSISTENT && ns.conn >= C_CONNECTED)
1701 		drbd_send_state(mdev, ns);
1702 
1703 	/* Disks got bigger while they were detached */
1704 	if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1705 	    test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1706 		if (ns.conn == C_CONNECTED)
1707 			resync_after_online_grow(mdev);
1708 	}
1709 
1710 	/* A resync finished or aborted, wake paused devices... */
1711 	if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1712 	    (os.peer_isp && !ns.peer_isp) ||
1713 	    (os.user_isp && !ns.user_isp))
1714 		resume_next_sg(mdev);
1715 
1716 	/* sync target done with resync.  Explicitly notify peer, even though
1717 	 * it should (at least for non-empty resyncs) already know itself. */
1718 	if (os.disk < D_UP_TO_DATE && os.conn >= C_SYNC_SOURCE && ns.conn == C_CONNECTED)
1719 		drbd_send_state(mdev, ns);
1720 
1721 	/* Wake up role changes that were delayed while the connection was being established */
1722 	if (os.conn == C_WF_REPORT_PARAMS && ns.conn != C_WF_REPORT_PARAMS) {
1723 		clear_bit(STATE_SENT, &mdev->flags);
1724 		wake_up(&mdev->state_wait);
1725 	}
1726 
1727 	/* This triggers bitmap writeout of potentially still unwritten pages
1728 	 * if the resync finished cleanly, or aborted because of peer disk
1729 	 * failure, or because of connection loss.
1730 	 * For resync aborted because of local disk failure, we cannot do
1731 	 * any bitmap writeout anymore.
1732 	 * No harm done if some bits change during this phase.
1733 	 */
1734 	if (os.conn > C_CONNECTED && ns.conn <= C_CONNECTED && get_ldev(mdev)) {
1735 		drbd_queue_bitmap_io(mdev, &drbd_bm_write_copy_pages, NULL,
1736 			"write from resync_finished", BM_LOCKED_CHANGE_ALLOWED);
1737 		put_ldev(mdev);
1738 	}
1739 
1740 	/* free tl_hash if we got thawed and are C_STANDALONE */
1741 	if (ns.conn == C_STANDALONE && !is_susp(ns) && mdev->tl_hash)
1742 		drbd_free_tl_hash(mdev);
1743 
1744 	/* Upon network connection, we need to start the receiver */
1745 	if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1746 		drbd_thread_start(&mdev->receiver);
1747 
1748 	/* Terminate worker thread if we are unconfigured - it will be
1749 	   restarted as needed... */
1750 	if (ns.disk == D_DISKLESS &&
1751 	    ns.conn == C_STANDALONE &&
1752 	    ns.role == R_SECONDARY) {
1753 		if (os.aftr_isp != ns.aftr_isp)
1754 			resume_next_sg(mdev);
1755 		/* set in __drbd_set_state, unless CONFIG_PENDING was set */
1756 		if (test_bit(DEVICE_DYING, &mdev->flags))
1757 			drbd_thread_stop_nowait(&mdev->worker);
1758 	}
1759 
1760 	drbd_md_sync(mdev);
1761 }
1762 
1763 
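/* Common entry point of all DRBD kernel threads (receiver, worker, asender):
 * run thi->function() in a loop so that a "Restarting" request re-enters the
 * thread function, then drop the module reference that drbd_thread_start()
 * took once the thread really terminates. */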
1764 static int drbd_thread_setup(void *arg)
1765 {
1766 	struct drbd_thread *thi = (struct drbd_thread *) arg;
1767 	struct drbd_conf *mdev = thi->mdev;
1768 	unsigned long flags;
1769 	int retval;
1770 
1771 restart:
1772 	retval = thi->function(thi);
1773 
1774 	spin_lock_irqsave(&thi->t_lock, flags);
1775 
1776 	/* if the receiver has been "Exiting", the last thing it did
1777 	 * was set the conn state to "StandAlone",
1778 	 * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
1779 	 * and receiver thread will be "started".
1780 	 * drbd_thread_start needs to set "Restarting" in that case.
1781 	 * The t_state check and assignment need to happen within the same spinlock,
1782 	 * so either thread_start sees Exiting and can remap to Restarting,
1783 	 * or thread_start sees None and can proceed as normal.
1784 	 */
1785 
1786 	if (thi->t_state == Restarting) {
1787 		dev_info(DEV, "Restarting %s\n", current->comm);
1788 		thi->t_state = Running;
1789 		spin_unlock_irqrestore(&thi->t_lock, flags);
1790 		goto restart;
1791 	}
1792 
1793 	thi->task = NULL;
1794 	thi->t_state = None;
1795 	smp_mb();
1796 	complete(&thi->stop);
1797 	spin_unlock_irqrestore(&thi->t_lock, flags);
1798 
1799 	dev_info(DEV, "Terminating %s\n", current->comm);
1800 
1801 	/* Release mod reference taken when thread was started */
1802 	module_put(THIS_MODULE);
1803 	return retval;
1804 }
1805 
1806 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1807 		      int (*func) (struct drbd_thread *))
1808 {
1809 	spin_lock_init(&thi->t_lock);
1810 	thi->task    = NULL;
1811 	thi->t_state = None;
1812 	thi->function = func;
1813 	thi->mdev = mdev;
1814 }
1815 
1816 int drbd_thread_start(struct drbd_thread *thi)
1817 {
1818 	struct drbd_conf *mdev = thi->mdev;
1819 	struct task_struct *nt;
1820 	unsigned long flags;
1821 
1822 	const char *me =
1823 		thi == &mdev->receiver ? "receiver" :
1824 		thi == &mdev->asender  ? "asender"  :
1825 		thi == &mdev->worker   ? "worker"   : "NONSENSE";
1826 
1827 	/* is used from state engine doing drbd_thread_stop_nowait,
1828 	 * while holding the req lock irqsave */
1829 	spin_lock_irqsave(&thi->t_lock, flags);
1830 
1831 	switch (thi->t_state) {
1832 	case None:
1833 		dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1834 				me, current->comm, current->pid);
1835 
1836 		/* Get ref on module for thread - this is released when thread exits */
1837 		if (!try_module_get(THIS_MODULE)) {
1838 			dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1839 			spin_unlock_irqrestore(&thi->t_lock, flags);
1840 			return false;
1841 		}
1842 
1843 		init_completion(&thi->stop);
1844 		D_ASSERT(thi->task == NULL);
1845 		thi->reset_cpu_mask = 1;
1846 		thi->t_state = Running;
1847 		spin_unlock_irqrestore(&thi->t_lock, flags);
1848 		flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
1849 
1850 		nt = kthread_create(drbd_thread_setup, (void *) thi,
1851 				    "drbd%d_%s", mdev_to_minor(mdev), me);
1852 
1853 		if (IS_ERR(nt)) {
1854 			dev_err(DEV, "Couldn't start thread\n");
1855 
1856 			module_put(THIS_MODULE);
1857 			return false;
1858 		}
1859 		spin_lock_irqsave(&thi->t_lock, flags);
1860 		thi->task = nt;
1861 		thi->t_state = Running;
1862 		spin_unlock_irqrestore(&thi->t_lock, flags);
1863 		wake_up_process(nt);
1864 		break;
1865 	case Exiting:
1866 		thi->t_state = Restarting;
1867 		dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1868 				me, current->comm, current->pid);
1869 		/* fall through */
1870 	case Running:
1871 	case Restarting:
1872 	default:
1873 		spin_unlock_irqrestore(&thi->t_lock, flags);
1874 		break;
1875 	}
1876 
1877 	return true;
1878 }
1879 
1880 
1881 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1882 {
1883 	unsigned long flags;
1884 
1885 	enum drbd_thread_state ns = restart ? Restarting : Exiting;
1886 
1887 	/* may be called from state engine, holding the req lock irqsave */
1888 	spin_lock_irqsave(&thi->t_lock, flags);
1889 
1890 	if (thi->t_state == None) {
1891 		spin_unlock_irqrestore(&thi->t_lock, flags);
1892 		if (restart)
1893 			drbd_thread_start(thi);
1894 		return;
1895 	}
1896 
1897 	if (thi->t_state != ns) {
1898 		if (thi->task == NULL) {
1899 			spin_unlock_irqrestore(&thi->t_lock, flags);
1900 			return;
1901 		}
1902 
1903 		thi->t_state = ns;
1904 		smp_mb();
1905 		init_completion(&thi->stop);
1906 		if (thi->task != current)
1907 			force_sig(DRBD_SIGKILL, thi->task);
1908 
1909 	}
1910 
1911 	spin_unlock_irqrestore(&thi->t_lock, flags);
1912 
1913 	if (wait)
1914 		wait_for_completion(&thi->stop);
1915 }
1916 
1917 #ifdef CONFIG_SMP
1918 /**
1919  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1920  * @mdev:	DRBD device.
1921  *
1922  * Forces all threads of a device onto the same CPU. This is beneficial for
1923  * DRBD's performance. May be overridden by the user's configuration.
1924  */
1925 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1926 {
1927 	int ord, cpu;
1928 
1929 	/* user override. */
1930 	if (cpumask_weight(mdev->cpu_mask))
1931 		return;
1932 
1933 	ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1934 	for_each_online_cpu(cpu) {
1935 		if (ord-- == 0) {
1936 			cpumask_set_cpu(cpu, mdev->cpu_mask);
1937 			return;
1938 		}
1939 	}
1940 	/* should not be reached */
1941 	cpumask_setall(mdev->cpu_mask);
1942 }
1943 
1944 /**
1945  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1946  * @mdev:	DRBD device.
1947  *
1948  * Call this in the "main loop" of _all_ threads; no mutex is needed, current won't die
1949  * prematurely.
1950  */
1951 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1952 {
1953 	struct task_struct *p = current;
1954 	struct drbd_thread *thi =
1955 		p == mdev->asender.task  ? &mdev->asender  :
1956 		p == mdev->receiver.task ? &mdev->receiver :
1957 		p == mdev->worker.task   ? &mdev->worker   :
1958 		NULL;
1959 	ERR_IF(thi == NULL)
1960 		return;
1961 	if (!thi->reset_cpu_mask)
1962 		return;
1963 	thi->reset_cpu_mask = 0;
1964 	set_cpus_allowed_ptr(p, mdev->cpu_mask);
1965 }
1966 #endif
1967 
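/* Rough sketch of the 8-byte packet header (struct p_header80) built below,
 * all fields in big endian byte order; length counts only the payload that
 * follows the header:
 *
 *	u32 magic;	- BE_DRBD_MAGIC
 *	u16 command;	- enum drbd_packets
 *	u16 length;	- size - sizeof(struct p_header80)
 */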
1968 /* the appropriate socket mutex must be held already */
1969 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1970 			  enum drbd_packets cmd, struct p_header80 *h,
1971 			  size_t size, unsigned msg_flags)
1972 {
1973 	int sent, ok;
1974 
1975 	ERR_IF(!h) return false;
1976 	ERR_IF(!size) return false;
1977 
1978 	h->magic   = BE_DRBD_MAGIC;
1979 	h->command = cpu_to_be16(cmd);
1980 	h->length  = cpu_to_be16(size-sizeof(struct p_header80));
1981 
1982 	sent = drbd_send(mdev, sock, h, size, msg_flags);
1983 
1984 	ok = (sent == size);
1985 	if (!ok && !signal_pending(current))
1986 		dev_warn(DEV, "short sent %s size=%d sent=%d\n",
1987 		    cmdname(cmd), (int)size, sent);
1988 	return ok;
1989 }
1990 
1991 /* don't pass the socket. we may only look at it
1992  * when we hold the appropriate socket mutex.
1993  */
1994 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1995 		  enum drbd_packets cmd, struct p_header80 *h, size_t size)
1996 {
1997 	int ok = 0;
1998 	struct socket *sock;
1999 
2000 	if (use_data_socket) {
2001 		mutex_lock(&mdev->data.mutex);
2002 		sock = mdev->data.socket;
2003 	} else {
2004 		mutex_lock(&mdev->meta.mutex);
2005 		sock = mdev->meta.socket;
2006 	}
2007 
2008 	/* drbd_disconnect() could have called drbd_free_sock()
2009 	 * while we were waiting for the mutex... */
2010 	if (likely(sock != NULL))
2011 		ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
2012 
2013 	if (use_data_socket)
2014 		mutex_unlock(&mdev->data.mutex);
2015 	else
2016 		mutex_unlock(&mdev->meta.mutex);
2017 	return ok;
2018 }
2019 
2020 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
2021 		   size_t size)
2022 {
2023 	struct p_header80 h;
2024 	int ok;
2025 
2026 	h.magic   = BE_DRBD_MAGIC;
2027 	h.command = cpu_to_be16(cmd);
2028 	h.length  = cpu_to_be16(size);
2029 
2030 	if (!drbd_get_data_sock(mdev))
2031 		return 0;
2032 
2033 	ok = (sizeof(h) ==
2034 		drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
2035 	ok = ok && (size ==
2036 		drbd_send(mdev, mdev->data.socket, data, size, 0));
2037 
2038 	drbd_put_data_sock(mdev);
2039 
2040 	return ok;
2041 }
2042 
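/* The sync parameter packet grew over the protocol versions: up to apv 87 a
 * bare p_rs_param is sent, apv 88 appends the verify_alg string, apv 89-94
 * use p_rs_param_89 (verify_alg + csums_alg), and apv 95+ additionally carry
 * the dynamic resync controller settings in p_rs_param_95.  The size
 * computation below mirrors that history. */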
2043 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
2044 {
2045 	struct p_rs_param_95 *p;
2046 	struct socket *sock;
2047 	int size, rv;
2048 	const int apv = mdev->agreed_pro_version;
2049 
2050 	size = apv <= 87 ? sizeof(struct p_rs_param)
2051 		: apv == 88 ? sizeof(struct p_rs_param)
2052 			+ strlen(mdev->sync_conf.verify_alg) + 1
2053 		: apv <= 94 ? sizeof(struct p_rs_param_89)
2054 		: /* apv >= 95 */ sizeof(struct p_rs_param_95);
2055 
2056 	/* used from admin command context and receiver/worker context.
2057 	 * to avoid kmalloc, grab the socket right here,
2058 	 * then use the pre-allocated sbuf there */
2059 	mutex_lock(&mdev->data.mutex);
2060 	sock = mdev->data.socket;
2061 
2062 	if (likely(sock != NULL)) {
2063 		enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
2064 
2065 		p = &mdev->data.sbuf.rs_param_95;
2066 
2067 		/* initialize verify_alg and csums_alg */
2068 		memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2069 
2070 		p->rate = cpu_to_be32(sc->rate);
2071 		p->c_plan_ahead = cpu_to_be32(sc->c_plan_ahead);
2072 		p->c_delay_target = cpu_to_be32(sc->c_delay_target);
2073 		p->c_fill_target = cpu_to_be32(sc->c_fill_target);
2074 		p->c_max_rate = cpu_to_be32(sc->c_max_rate);
2075 
2076 		if (apv >= 88)
2077 			strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
2078 		if (apv >= 89)
2079 			strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
2080 
2081 		rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
2082 	} else
2083 		rv = 0; /* not ok */
2084 
2085 	mutex_unlock(&mdev->data.mutex);
2086 
2087 	return rv;
2088 }
2089 
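/* Tell the peer which wire protocol and after-split-brain policies we are
 * configured with.  The conn_flags word carries CF_WANT_LOSE and CF_DRY_RUN;
 * CF_DRY_RUN is only understood by peers speaking protocol version 92 or
 * newer, for older peers a configured dry_run makes us fail the handshake. */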
2090 int drbd_send_protocol(struct drbd_conf *mdev)
2091 {
2092 	struct p_protocol *p;
2093 	int size, cf, rv;
2094 
2095 	size = sizeof(struct p_protocol);
2096 
2097 	if (mdev->agreed_pro_version >= 87)
2098 		size += strlen(mdev->net_conf->integrity_alg) + 1;
2099 
2100 	/* we must not recurse into our own queue,
2101 	 * as that is blocked during handshake */
2102 	p = kmalloc(size, GFP_NOIO);
2103 	if (p == NULL)
2104 		return 0;
2105 
2106 	p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
2107 	p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
2108 	p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
2109 	p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
2110 	p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
2111 
2112 	cf = 0;
2113 	if (mdev->net_conf->want_lose)
2114 		cf |= CF_WANT_LOSE;
2115 	if (mdev->net_conf->dry_run) {
2116 		if (mdev->agreed_pro_version >= 92)
2117 			cf |= CF_DRY_RUN;
2118 		else {
2119 			dev_err(DEV, "--dry-run is not supported by peer");
2120 			kfree(p);
2121 			return -1;
2122 		}
2123 	}
2124 	p->conn_flags    = cpu_to_be32(cf);
2125 
2126 	if (mdev->agreed_pro_version >= 87)
2127 		strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
2128 
2129 	rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
2130 			   (struct p_header80 *)p, size);
2131 	kfree(p);
2132 	return rv;
2133 }
2134 
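/* Send our on-disk UUID set plus two extra slots: the number of bits
 * currently set in the bitmap, and a flag word.  The flag bits used below:
 *	1 - we are willing to lose our data (want_lose)
 *	2 - we were a crashed primary
 *	4 - the disk state we are about to assume is D_INCONSISTENT
 *	8 - skip the initial sync (see drbd_send_uuids_skip_initial_sync) */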
2135 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
2136 {
2137 	struct p_uuids p;
2138 	int i;
2139 
2140 	if (!get_ldev_if_state(mdev, D_NEGOTIATING))
2141 		return 1;
2142 
2143 	for (i = UI_CURRENT; i < UI_SIZE; i++)
2144 		p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
2145 
2146 	mdev->comm_bm_set = drbd_bm_total_weight(mdev);
2147 	p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
2148 	uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
2149 	uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
2150 	uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
2151 	p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
2152 
2153 	put_ldev(mdev);
2154 
2155 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
2156 			     (struct p_header80 *)&p, sizeof(p));
2157 }
2158 
2159 int drbd_send_uuids(struct drbd_conf *mdev)
2160 {
2161 	return _drbd_send_uuids(mdev, 0);
2162 }
2163 
2164 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
2165 {
2166 	return _drbd_send_uuids(mdev, 8);
2167 }
2168 
2169 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
2170 {
2171 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
2172 		u64 *uuid = mdev->ldev->md.uuid;
2173 		dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
2174 		     text,
2175 		     (unsigned long long)uuid[UI_CURRENT],
2176 		     (unsigned long long)uuid[UI_BITMAP],
2177 		     (unsigned long long)uuid[UI_HISTORY_START],
2178 		     (unsigned long long)uuid[UI_HISTORY_END]);
2179 		put_ldev(mdev);
2180 	} else {
2181 		dev_info(DEV, "%s effective data uuid: %016llX\n",
2182 				text,
2183 				(unsigned long long)mdev->ed_uuid);
2184 	}
2185 }
2186 
2187 int drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
2188 {
2189 	struct p_rs_uuid p;
2190 	u64 uuid;
2191 
2192 	D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
2193 
2194 	uuid = mdev->ldev->md.uuid[UI_BITMAP];
2195 	if (uuid && uuid != UUID_JUST_CREATED)
2196 		uuid = uuid + UUID_NEW_BM_OFFSET;
2197 	else
2198 		get_random_bytes(&uuid, sizeof(u64));
2199 	drbd_uuid_set(mdev, UI_BITMAP, uuid);
2200 	drbd_print_uuids(mdev, "updated sync UUID");
2201 	drbd_md_sync(mdev);
2202 	p.uuid = cpu_to_be64(uuid);
2203 
2204 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
2205 			     (struct p_header80 *)&p, sizeof(p));
2206 }
2207 
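/* Report our disk geometry to the peer: d_size is the usable capacity of the
 * backing device, u_size the user-configured size limit, c_size the capacity
 * we currently expose (sent as 0 when we merely want to trigger a reply from
 * the peer), plus the largest BIO size we can accept. */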
2208 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
2209 {
2210 	struct p_sizes p;
2211 	sector_t d_size, u_size;
2212 	int q_order_type;
2213 	unsigned int max_bio_size;
2214 	int ok;
2215 
2216 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
2217 		D_ASSERT(mdev->ldev->backing_bdev);
2218 		d_size = drbd_get_max_capacity(mdev->ldev);
2219 		u_size = mdev->ldev->dc.disk_size;
2220 		q_order_type = drbd_queue_order_type(mdev);
2221 		max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
2222 		max_bio_size = min(max_bio_size, DRBD_MAX_BIO_SIZE);
2223 		put_ldev(mdev);
2224 	} else {
2225 		d_size = 0;
2226 		u_size = 0;
2227 		q_order_type = QUEUE_ORDERED_NONE;
2228 		max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
2229 	}
2230 
2231 	/* Never allow old drbd (up to 8.3.7) to see more than 32KiB */
2232 	if (mdev->agreed_pro_version <= 94)
2233 		max_bio_size = min(max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
2234 
2235 	p.d_size = cpu_to_be64(d_size);
2236 	p.u_size = cpu_to_be64(u_size);
2237 	p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
2238 	p.max_bio_size = cpu_to_be32(max_bio_size);
2239 	p.queue_order_type = cpu_to_be16(q_order_type);
2240 	p.dds_flags = cpu_to_be16(flags);
2241 
2242 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
2243 			   (struct p_header80 *)&p, sizeof(p));
2244 	return ok;
2245 }
2246 
2247 /**
2248  * drbd_send_current_state() - Sends the drbd state to the peer
2249  * @mdev:	DRBD device.
2250  */
2251 int drbd_send_current_state(struct drbd_conf *mdev)
2252 {
2253 	struct socket *sock;
2254 	struct p_state p;
2255 	int ok = 0;
2256 
2257 	/* Grab the state lock so we won't send state if we're in the middle
2258 	 * of a cluster-wide state change on another thread */
2259 	drbd_state_lock(mdev);
2260 
2261 	mutex_lock(&mdev->data.mutex);
2262 
2263 	p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
2264 	sock = mdev->data.socket;
2265 
2266 	if (likely(sock != NULL)) {
2267 		ok = _drbd_send_cmd(mdev, sock, P_STATE,
2268 				    (struct p_header80 *)&p, sizeof(p), 0);
2269 	}
2270 
2271 	mutex_unlock(&mdev->data.mutex);
2272 
2273 	drbd_state_unlock(mdev);
2274 	return ok;
2275 }
2276 
2277 /**
2278  * drbd_send_state() - After a state change, sends the new state to the peer
2279  * @mdev:	DRBD device.
2280  * @state:	the state to send, not necessarily the current state.
2281  *
2282  * Each state change queues an "after_state_ch" work, which will eventually
2283  * send the resulting new state to the peer. If more state changes happen
2284  * between queuing and processing of the after_state_ch work, we still
2285  * want to send each intermediary state in the order it occurred.
2286  * want to send each intermediate state in the order it occurred.
2287 int drbd_send_state(struct drbd_conf *mdev, union drbd_state state)
2288 {
2289 	struct socket *sock;
2290 	struct p_state p;
2291 	int ok = 0;
2292 
2293 	mutex_lock(&mdev->data.mutex);
2294 
2295 	p.state = cpu_to_be32(state.i);
2296 	sock = mdev->data.socket;
2297 
2298 	if (likely(sock != NULL)) {
2299 		ok = _drbd_send_cmd(mdev, sock, P_STATE,
2300 				    (struct p_header80 *)&p, sizeof(p), 0);
2301 	}
2302 
2303 	mutex_unlock(&mdev->data.mutex);
2304 
2305 	return ok;
2306 }
2307 
2308 int drbd_send_state_req(struct drbd_conf *mdev,
2309 	union drbd_state mask, union drbd_state val)
2310 {
2311 	struct p_req_state p;
2312 
2313 	p.mask    = cpu_to_be32(mask.i);
2314 	p.val     = cpu_to_be32(val.i);
2315 
2316 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
2317 			     (struct p_header80 *)&p, sizeof(p));
2318 }
2319 
2320 int drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
2321 {
2322 	struct p_req_state_reply p;
2323 
2324 	p.retcode    = cpu_to_be32(retcode);
2325 
2326 	return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
2327 			     (struct p_header80 *)&p, sizeof(p));
2328 }
2329 
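/* Try to compress one chunk of the bitmap for transmission: the bitmap is
 * described as alternating runs of clear and set bits, and each run length
 * is variable-length-integer encoded into p->code.  Returns the number of
 * code bytes produced, 0 if the chunk should be sent as plain bits instead
 * (RLE disabled, peer too old, or not compressible), or -1 if the bitmap
 * changed under us while encoding. */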
2330 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
2331 	struct p_compressed_bm *p,
2332 	struct bm_xfer_ctx *c)
2333 {
2334 	struct bitstream bs;
2335 	unsigned long plain_bits;
2336 	unsigned long tmp;
2337 	unsigned long rl;
2338 	unsigned len;
2339 	unsigned toggle;
2340 	int bits;
2341 
2342 	/* may we use this feature? */
2343 	if ((mdev->sync_conf.use_rle == 0) ||
2344 		(mdev->agreed_pro_version < 90))
2345 			return 0;
2346 
2347 	if (c->bit_offset >= c->bm_bits)
2348 		return 0; /* nothing to do. */
2349 
2350 	/* use at most this many bytes */
2351 	bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
2352 	memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
2353 	/* plain bits covered in this code string */
2354 	plain_bits = 0;
2355 
2356 	/* p->encoding & 0x80 stores whether the first run length is set.
2357 	 * bit offset is implicit.
2358 	 * start with toggle == 2 to be able to tell the first iteration */
2359 	toggle = 2;
2360 
2361 	/* see how many plain bits we can stuff into one packet
2362 	 * using RLE and VLI. */
2363 	do {
2364 		tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
2365 				    : _drbd_bm_find_next(mdev, c->bit_offset);
2366 		if (tmp == -1UL)
2367 			tmp = c->bm_bits;
2368 		rl = tmp - c->bit_offset;
2369 
2370 		if (toggle == 2) { /* first iteration */
2371 			if (rl == 0) {
2372 				/* the first checked bit was set,
2373 				 * store start value, */
2374 				DCBP_set_start(p, 1);
2375 				/* but skip encoding of zero run length */
2376 				toggle = !toggle;
2377 				continue;
2378 			}
2379 			DCBP_set_start(p, 0);
2380 		}
2381 
2382 		/* paranoia: catch zero runlength.
2383 		 * can only happen if bitmap is modified while we scan it. */
2384 		if (rl == 0) {
2385 			dev_err(DEV, "unexpected zero runlength while encoding bitmap "
2386 			    "t:%u bo:%lu\n", toggle, c->bit_offset);
2387 			return -1;
2388 		}
2389 
2390 		bits = vli_encode_bits(&bs, rl);
2391 		if (bits == -ENOBUFS) /* buffer full */
2392 			break;
2393 		if (bits <= 0) {
2394 			dev_err(DEV, "error while encoding bitmap: %d\n", bits);
2395 			return 0;
2396 		}
2397 
2398 		toggle = !toggle;
2399 		plain_bits += rl;
2400 		c->bit_offset = tmp;
2401 	} while (c->bit_offset < c->bm_bits);
2402 
2403 	len = bs.cur.b - p->code + !!bs.cur.bit;
2404 
2405 	if (plain_bits < (len << 3)) {
2406 		/* incompressible with this method.
2407 		 * we need to rewind both word and bit position. */
2408 		c->bit_offset -= plain_bits;
2409 		bm_xfer_ctx_bit_to_word_offset(c);
2410 		c->bit_offset = c->word_offset * BITS_PER_LONG;
2411 		return 0;
2412 	}
2413 
2414 	/* RLE + VLI was able to compress it just fine.
2415 	 * update c->word_offset. */
2416 	bm_xfer_ctx_bit_to_word_offset(c);
2417 
2418 	/* store pad_bits */
2419 	DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
2420 
2421 	return len;
2422 }
2423 
2424 /**
2425  * send_bitmap_rle_or_plain
2426  *
2427  * Return 0 when done, 1 when another iteration is needed, and a negative error
2428  * code upon failure.
2429  */
2430 static int
2431 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
2432 			 struct p_header80 *h, struct bm_xfer_ctx *c)
2433 {
2434 	struct p_compressed_bm *p = (void*)h;
2435 	unsigned long num_words;
2436 	int len;
2437 	int ok;
2438 
2439 	len = fill_bitmap_rle_bits(mdev, p, c);
2440 
2441 	if (len < 0)
2442 		return -EIO;
2443 
2444 	if (len) {
2445 		DCBP_set_code(p, RLE_VLI_Bits);
2446 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
2447 			sizeof(*p) + len, 0);
2448 
2449 		c->packets[0]++;
2450 		c->bytes[0] += sizeof(*p) + len;
2451 
2452 		if (c->bit_offset >= c->bm_bits)
2453 			len = 0; /* DONE */
2454 	} else {
2455 		/* was not compressible.
2456 		 * send a buffer full of plain text bits instead. */
2457 		num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
2458 		len = num_words * sizeof(long);
2459 		if (len)
2460 			drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
2461 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
2462 				   h, sizeof(struct p_header80) + len, 0);
2463 		c->word_offset += num_words;
2464 		c->bit_offset = c->word_offset * BITS_PER_LONG;
2465 
2466 		c->packets[1]++;
2467 		c->bytes[1] += sizeof(struct p_header80) + len;
2468 
2469 		if (c->bit_offset > c->bm_bits)
2470 			c->bit_offset = c->bm_bits;
2471 	}
2472 	if (ok) {
2473 		if (len == 0) {
2474 			INFO_bm_xfer_stats(mdev, "send", c);
2475 			return 0;
2476 		} else
2477 			return 1;
2478 	}
2479 	return -EIO;
2480 }
2481 
2482 /* See the comment at receive_bitmap() */
2483 int _drbd_send_bitmap(struct drbd_conf *mdev)
2484 {
2485 	struct bm_xfer_ctx c;
2486 	struct p_header80 *p;
2487 	int err;
2488 
2489 	ERR_IF(!mdev->bitmap) return false;
2490 
2491 	/* maybe we should use some per thread scratch page,
2492 	 * and allocate that during initial device creation? */
2493 	p = (struct p_header80 *) __get_free_page(GFP_NOIO);
2494 	if (!p) {
2495 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2496 		return false;
2497 	}
2498 
2499 	if (get_ldev(mdev)) {
2500 		if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2501 			dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2502 			drbd_bm_set_all(mdev);
2503 			if (drbd_bm_write(mdev)) {
2504 				/* write_bm failed! Leave the full sync flag set in the meta data,
2505 				 * but otherwise proceed as normal - we need to tell the other
2506 				 * side that a full resync is required! */
2507 				dev_err(DEV, "Failed to write bitmap to disk!\n");
2508 			} else {
2509 				drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2510 				drbd_md_sync(mdev);
2511 			}
2512 		}
2513 		put_ldev(mdev);
2514 	}
2515 
2516 	c = (struct bm_xfer_ctx) {
2517 		.bm_bits = drbd_bm_bits(mdev),
2518 		.bm_words = drbd_bm_words(mdev),
2519 	};
2520 
2521 	do {
2522 		err = send_bitmap_rle_or_plain(mdev, p, &c);
2523 	} while (err > 0);
2524 
2525 	free_page((unsigned long) p);
2526 	return err == 0;
2527 }
2528 
2529 int drbd_send_bitmap(struct drbd_conf *mdev)
2530 {
2531 	int err;
2532 
2533 	if (!drbd_get_data_sock(mdev))
2534 		return -1;
2535 	err = !_drbd_send_bitmap(mdev);
2536 	drbd_put_data_sock(mdev);
2537 	return err;
2538 }
2539 
2540 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2541 {
2542 	int ok;
2543 	struct p_barrier_ack p;
2544 
2545 	p.barrier  = barrier_nr;
2546 	p.set_size = cpu_to_be32(set_size);
2547 
2548 	if (mdev->state.conn < C_CONNECTED)
2549 		return false;
2550 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2551 			(struct p_header80 *)&p, sizeof(p));
2552 	return ok;
2553 }
2554 
2555 /**
2556  * _drbd_send_ack() - Sends an ack packet
2557  * @mdev:	DRBD device.
2558  * @cmd:	Packet command code.
2559  * @sector:	sector, needs to be in big endian byte order
2560  * @blksize:	size in byte, needs to be in big endian byte order
2561  * @block_id:	Id, big endian byte order
2562  */
2563 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2564 			  u64 sector,
2565 			  u32 blksize,
2566 			  u64 block_id)
2567 {
2568 	int ok;
2569 	struct p_block_ack p;
2570 
2571 	p.sector   = sector;
2572 	p.block_id = block_id;
2573 	p.blksize  = blksize;
2574 	p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2575 
2576 	if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2577 		return false;
2578 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2579 				(struct p_header80 *)&p, sizeof(p));
2580 	return ok;
2581 }
2582 
2583 /* dp->sector and dp->block_id already/still in network byte order,
2584  * data_size is payload size according to dp->head,
2585  * and may need to be corrected for digest size. */
2586 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2587 		     struct p_data *dp, int data_size)
2588 {
2589 	data_size -= (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
2590 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
2591 	return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2592 			      dp->block_id);
2593 }
2594 
2595 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2596 		     struct p_block_req *rp)
2597 {
2598 	return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2599 }
2600 
2601 /**
2602  * drbd_send_ack() - Sends an ack packet
2603  * @mdev:	DRBD device.
2604  * @cmd:	Packet command code.
2605  * @e:		Epoch entry.
2606  */
2607 int drbd_send_ack(struct drbd_conf *mdev,
2608 	enum drbd_packets cmd, struct drbd_epoch_entry *e)
2609 {
2610 	return _drbd_send_ack(mdev, cmd,
2611 			      cpu_to_be64(e->sector),
2612 			      cpu_to_be32(e->size),
2613 			      e->block_id);
2614 }
2615 
2616 /* This function misuses the block_id field to signal if the blocks
2617  * are in sync or not. */
2618 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2619 		     sector_t sector, int blksize, u64 block_id)
2620 {
2621 	return _drbd_send_ack(mdev, cmd,
2622 			      cpu_to_be64(sector),
2623 			      cpu_to_be32(blksize),
2624 			      cpu_to_be64(block_id));
2625 }
2626 
2627 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2628 		       sector_t sector, int size, u64 block_id)
2629 {
2630 	int ok;
2631 	struct p_block_req p;
2632 
2633 	p.sector   = cpu_to_be64(sector);
2634 	p.block_id = block_id;
2635 	p.blksize  = cpu_to_be32(size);
2636 
2637 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2638 				(struct p_header80 *)&p, sizeof(p));
2639 	return ok;
2640 }
2641 
2642 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2643 			    sector_t sector, int size,
2644 			    void *digest, int digest_size,
2645 			    enum drbd_packets cmd)
2646 {
2647 	int ok;
2648 	struct p_block_req p;
2649 
2650 	p.sector   = cpu_to_be64(sector);
2651 	p.block_id = BE_DRBD_MAGIC + 0xbeef;
2652 	p.blksize  = cpu_to_be32(size);
2653 
2654 	p.head.magic   = BE_DRBD_MAGIC;
2655 	p.head.command = cpu_to_be16(cmd);
2656 	p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + digest_size);
2657 
2658 	mutex_lock(&mdev->data.mutex);
2659 
2660 	ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2661 	ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2662 
2663 	mutex_unlock(&mdev->data.mutex);
2664 
2665 	return ok;
2666 }
2667 
2668 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2669 {
2670 	int ok;
2671 	struct p_block_req p;
2672 
2673 	p.sector   = cpu_to_be64(sector);
2674 	p.block_id = BE_DRBD_MAGIC + 0xbabe;
2675 	p.blksize  = cpu_to_be32(size);
2676 
2677 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2678 			   (struct p_header80 *)&p, sizeof(p));
2679 	return ok;
2680 }
2681 
2682 /* called on sndtimeo
2683  * returns false if we should retry,
2684  * true if we think connection is dead
2685  */
2686 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2687 {
2688 	int drop_it;
2689 	/* long elapsed = (long)(jiffies - mdev->last_received); */
2690 
2691 	drop_it =   mdev->meta.socket == sock
2692 		|| !mdev->asender.task
2693 		|| get_t_state(&mdev->asender) != Running
2694 		|| mdev->state.conn < C_CONNECTED;
2695 
2696 	if (drop_it)
2697 		return true;
2698 
2699 	drop_it = !--mdev->ko_count;
2700 	if (!drop_it) {
2701 		dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2702 		       current->comm, current->pid, mdev->ko_count);
2703 		request_ping(mdev);
2704 	}
2705 
2706 	return drop_it; /* && (mdev->state == R_PRIMARY) */;
2707 }
2708 
2709 /* The idea of sendpage seems to be to put some kind of reference
2710  * to the page into the skb, and to hand it over to the NIC. In
2711  * this process get_page() gets called.
2712  *
2713  * As soon as the page was really sent over the network put_page()
2714  * gets called by some part of the network layer. [ NIC driver? ]
2715  *
2716  * [ get_page() / put_page() increment/decrement the count. If count
2717  *   reaches 0 the page will be freed. ]
2718  *
2719  * This works nicely with pages from FSs.
2720  * But this means that in protocol A we might signal IO completion too early!
2721  *
2722  * In order not to corrupt data during a resync we must make sure
2723  * that we do not reuse our own buffer pages (EEs) too early, therefore
2724  * we have the net_ee list.
2725  *
2726  * XFS seems to have problems, still, it submits pages with page_count == 0!
2727  * As a workaround, we disable sendpage on pages
2728  * with page_count == 0 or PageSlab.
2729  */
2730 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2731 		   int offset, size_t size, unsigned msg_flags)
2732 {
2733 	int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, msg_flags);
2734 	kunmap(page);
2735 	if (sent == size)
2736 		mdev->send_cnt += size>>9;
2737 	return sent == size;
2738 }
2739 
2740 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2741 		    int offset, size_t size, unsigned msg_flags)
2742 {
2743 	mm_segment_t oldfs = get_fs();
2744 	int sent, ok;
2745 	int len = size;
2746 
2747 	/* e.g. XFS meta- & log-data is in slab pages, which have a
2748 	 * page_count of 0 and/or have PageSlab() set.
2749 	 * we cannot use send_page for those, as that does get_page();
2750 	 * put_page(); and would cause either a VM_BUG directly, or
2751 	 * __page_cache_release a page that would actually still be referenced
2752 	 * by someone, leading to some obscure delayed Oops somewhere else. */
2753 	if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2754 		return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
2755 
2756 	msg_flags |= MSG_NOSIGNAL;
2757 	drbd_update_congested(mdev);
2758 	set_fs(KERNEL_DS);
2759 	do {
2760 		sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2761 							offset, len,
2762 							msg_flags);
2763 		if (sent == -EAGAIN) {
2764 			if (we_should_drop_the_connection(mdev,
2765 							  mdev->data.socket))
2766 				break;
2767 			else
2768 				continue;
2769 		}
2770 		if (sent <= 0) {
2771 			dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2772 			     __func__, (int)size, len, sent);
2773 			break;
2774 		}
2775 		len    -= sent;
2776 		offset += sent;
2777 	} while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2778 	set_fs(oldfs);
2779 	clear_bit(NET_CONGESTED, &mdev->flags);
2780 
2781 	ok = (len == 0);
2782 	if (likely(ok))
2783 		mdev->send_cnt += size>>9;
2784 	return ok;
2785 }
2786 
2787 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2788 {
2789 	struct bio_vec *bvec;
2790 	int i;
2791 	/* hint all but last page with MSG_MORE */
2792 	bio_for_each_segment(bvec, bio, i) {
2793 		if (!_drbd_no_send_page(mdev, bvec->bv_page,
2794 				     bvec->bv_offset, bvec->bv_len,
2795 				     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2796 			return 0;
2797 	}
2798 	return 1;
2799 }
2800 
2801 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2802 {
2803 	struct bio_vec *bvec;
2804 	int i;
2805 	/* hint all but last page with MSG_MORE */
2806 	bio_for_each_segment(bvec, bio, i) {
2807 		if (!_drbd_send_page(mdev, bvec->bv_page,
2808 				     bvec->bv_offset, bvec->bv_len,
2809 				     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2810 			return 0;
2811 	}
2812 	return 1;
2813 }
2814 
2815 static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2816 {
2817 	struct page *page = e->pages;
2818 	unsigned len = e->size;
2819 	/* hint all but last page with MSG_MORE */
2820 	page_chain_for_each(page) {
2821 		unsigned l = min_t(unsigned, len, PAGE_SIZE);
2822 		if (!_drbd_send_page(mdev, page, 0, l,
2823 				page_chain_next(page) ? MSG_MORE : 0))
2824 			return 0;
2825 		len -= l;
2826 	}
2827 	return 1;
2828 }
2829 
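/* Map the flags of the original bio to the DP_* flags understood by the
 * peer.  Peers older than protocol version 95 only know DP_RW_SYNC, so the
 * FUA/FLUSH/DISCARD hints are dropped for them. */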
2830 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
2831 {
2832 	if (mdev->agreed_pro_version >= 95)
2833 		return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
2834 			(bi_rw & REQ_FUA ? DP_FUA : 0) |
2835 			(bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
2836 			(bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
2837 	else
2838 		return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
2839 }
2840 
2841 /* Used to send write requests
2842  * R_PRIMARY -> Peer	(P_DATA)
2843  */
2844 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2845 {
2846 	int ok = 1;
2847 	struct p_data p;
2848 	unsigned int dp_flags = 0;
2849 	void *dgb;
2850 	int dgs;
2851 
2852 	if (!drbd_get_data_sock(mdev))
2853 		return 0;
2854 
2855 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2856 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2857 
2858 	if (req->size <= DRBD_MAX_SIZE_H80_PACKET) {
2859 		p.head.h80.magic   = BE_DRBD_MAGIC;
2860 		p.head.h80.command = cpu_to_be16(P_DATA);
2861 		p.head.h80.length  =
2862 			cpu_to_be16(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2863 	} else {
2864 		p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2865 		p.head.h95.command = cpu_to_be16(P_DATA);
2866 		p.head.h95.length  =
2867 			cpu_to_be32(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2868 	}
2869 
2870 	p.sector   = cpu_to_be64(req->sector);
2871 	p.block_id = (unsigned long)req;
2872 	p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2873 
2874 	dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
2875 
2876 	if (mdev->state.conn >= C_SYNC_SOURCE &&
2877 	    mdev->state.conn <= C_PAUSED_SYNC_T)
2878 		dp_flags |= DP_MAY_SET_IN_SYNC;
2879 
2880 	p.dp_flags = cpu_to_be32(dp_flags);
2881 	set_bit(UNPLUG_REMOTE, &mdev->flags);
2882 	ok = (sizeof(p) ==
2883 		drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
2884 	if (ok && dgs) {
2885 		dgb = mdev->int_dig_out;
2886 		drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2887 		ok = dgs == drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2888 	}
2889 	if (ok) {
2890 		/* For protocol A, we have to memcpy the payload into
2891 		 * socket buffers, as we may complete right away
2892 		 * as soon as we handed it over to tcp, at which point the data
2893 		 * pages may become invalid.
2894 		 *
2895 		 * For data-integrity enabled, we copy it as well, so we can be
2896 		 * sure that even if the bio pages may still be modified, it
2897 		 * won't change the data on the wire, thus if the digest checks
2898 		 * out ok after sending on this side, but does not fit on the
2899 		 * receiving side, we sure have detected corruption elsewhere.
2900 		 */
2901 		if (mdev->net_conf->wire_protocol == DRBD_PROT_A || dgs)
2902 			ok = _drbd_send_bio(mdev, req->master_bio);
2903 		else
2904 			ok = _drbd_send_zc_bio(mdev, req->master_bio);
2905 
2906 		/* double check digest, sometimes buffers have been modified in flight. */
2907 		if (dgs > 0 && dgs <= 64) {
2908 			/* 64 byte, 512 bit, is the largest digest size
2909 			 * currently supported in kernel crypto. */
2910 			unsigned char digest[64];
2911 			drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, digest);
2912 			if (memcmp(mdev->int_dig_out, digest, dgs)) {
2913 				dev_warn(DEV,
2914 					"Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
2915 					(unsigned long long)req->sector, req->size);
2916 			}
2917 		} /* else if (dgs > 64) {
2918 		     ... Be noisy about digest too large ...
2919 		} */
2920 	}
2921 
2922 	drbd_put_data_sock(mdev);
2923 
2924 	return ok;
2925 }
2926 
2927 /* answer packet, used to send data back for read requests:
2928  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2929  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2930  */
2931 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2932 		    struct drbd_epoch_entry *e)
2933 {
2934 	int ok;
2935 	struct p_data p;
2936 	void *dgb;
2937 	int dgs;
2938 
2939 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2940 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2941 
2942 	if (e->size <= DRBD_MAX_SIZE_H80_PACKET) {
2943 		p.head.h80.magic   = BE_DRBD_MAGIC;
2944 		p.head.h80.command = cpu_to_be16(cmd);
2945 		p.head.h80.length  =
2946 			cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2947 	} else {
2948 		p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2949 		p.head.h95.command = cpu_to_be16(cmd);
2950 		p.head.h95.length  =
2951 			cpu_to_be32(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2952 	}
2953 
2954 	p.sector   = cpu_to_be64(e->sector);
2955 	p.block_id = e->block_id;
2956 	/* p.seq_num  = 0;    No sequence numbers here.. */
2957 
2958 	/* Only called by our kernel thread.
2959 	 * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2960 	 * in response to admin command or module unload.
2961 	 */
2962 	if (!drbd_get_data_sock(mdev))
2963 		return 0;
2964 
2965 	ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
2966 	if (ok && dgs) {
2967 		dgb = mdev->int_dig_out;
2968 		drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2969 		ok = dgs == drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2970 	}
2971 	if (ok)
2972 		ok = _drbd_send_zc_ee(mdev, e);
2973 
2974 	drbd_put_data_sock(mdev);
2975 
2976 	return ok;
2977 }
2978 
2979 int drbd_send_oos(struct drbd_conf *mdev, struct drbd_request *req)
2980 {
2981 	struct p_block_desc p;
2982 
2983 	p.sector  = cpu_to_be64(req->sector);
2984 	p.blksize = cpu_to_be32(req->size);
2985 
2986 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OUT_OF_SYNC, &p.head, sizeof(p));
2987 }
2988 
2989 /*
2990   drbd_send distinguishes two cases:
2991 
2992   Packets sent via the data socket "sock"
2993   and packets sent via the meta data socket "msock"
2994 
2995 		    sock                      msock
2996   -----------------+-------------------------+------------------------------
2997   timeout           conf.timeout / 2          conf.timeout / 2
2998   timeout action    send a ping via msock     Abort communication
2999 					      and close all sockets
3000 */
3001 
3002 /*
3003  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
3004  */
3005 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
3006 	      void *buf, size_t size, unsigned msg_flags)
3007 {
3008 	struct kvec iov;
3009 	struct msghdr msg;
3010 	int rv, sent = 0;
3011 
3012 	if (!sock)
3013 		return -1000;
3014 
3015 	/* THINK  if (signal_pending) return ... ? */
3016 
3017 	iov.iov_base = buf;
3018 	iov.iov_len  = size;
3019 
3020 	msg.msg_name       = NULL;
3021 	msg.msg_namelen    = 0;
3022 	msg.msg_control    = NULL;
3023 	msg.msg_controllen = 0;
3024 	msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
3025 
3026 	if (sock == mdev->data.socket) {
3027 		mdev->ko_count = mdev->net_conf->ko_count;
3028 		drbd_update_congested(mdev);
3029 	}
3030 	do {
3031 		/* STRANGE
3032 		 * tcp_sendmsg does _not_ use its size parameter at all ?
3033 		 *
3034 		 * -EAGAIN on timeout, -EINTR on signal.
3035 		 */
3036 /* THINK
3037  * do we need to block DRBD_SIG if sock == &meta.socket ??
3038  * otherwise wake_asender() might interrupt some send_*Ack !
3039  */
3040 		rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
3041 		if (rv == -EAGAIN) {
3042 			if (we_should_drop_the_connection(mdev, sock))
3043 				break;
3044 			else
3045 				continue;
3046 		}
3047 		D_ASSERT(rv != 0);
3048 		if (rv == -EINTR) {
3049 			flush_signals(current);
3050 			rv = 0;
3051 		}
3052 		if (rv < 0)
3053 			break;
3054 		sent += rv;
3055 		iov.iov_base += rv;
3056 		iov.iov_len  -= rv;
3057 	} while (sent < size);
3058 
3059 	if (sock == mdev->data.socket)
3060 		clear_bit(NET_CONGESTED, &mdev->flags);
3061 
3062 	if (rv <= 0) {
3063 		if (rv != -EAGAIN) {
3064 			dev_err(DEV, "%s_sendmsg returned %d\n",
3065 			    sock == mdev->meta.socket ? "msock" : "sock",
3066 			    rv);
3067 			drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
3068 		} else
3069 			drbd_force_state(mdev, NS(conn, C_TIMEOUT));
3070 	}
3071 
3072 	return sent;
3073 }
3074 
3075 static int drbd_open(struct block_device *bdev, fmode_t mode)
3076 {
3077 	struct drbd_conf *mdev = bdev->bd_disk->private_data;
3078 	unsigned long flags;
3079 	int rv = 0;
3080 
3081 	mutex_lock(&drbd_main_mutex);
3082 	spin_lock_irqsave(&mdev->req_lock, flags);
3083 	/* to have a stable mdev->state.role
3084 	 * and no race with updating open_cnt */
3085 
3086 	if (mdev->state.role != R_PRIMARY) {
3087 		if (mode & FMODE_WRITE)
3088 			rv = -EROFS;
3089 		else if (!allow_oos)
3090 			rv = -EMEDIUMTYPE;
3091 	}
3092 
3093 	if (!rv)
3094 		mdev->open_cnt++;
3095 	spin_unlock_irqrestore(&mdev->req_lock, flags);
3096 	mutex_unlock(&drbd_main_mutex);
3097 
3098 	return rv;
3099 }
3100 
3101 static int drbd_release(struct gendisk *gd, fmode_t mode)
3102 {
3103 	struct drbd_conf *mdev = gd->private_data;
3104 	mutex_lock(&drbd_main_mutex);
3105 	mdev->open_cnt--;
3106 	mutex_unlock(&drbd_main_mutex);
3107 	return 0;
3108 }
3109 
3110 static void drbd_set_defaults(struct drbd_conf *mdev)
3111 {
3112 	/* This way we get a compile error when sync_conf grows,
3113 	   and we forget to initialize it here */
3114 	mdev->sync_conf = (struct syncer_conf) {
3115 		/* .rate = */		DRBD_RATE_DEF,
3116 		/* .after = */		DRBD_AFTER_DEF,
3117 		/* .al_extents = */	DRBD_AL_EXTENTS_DEF,
3118 		/* .verify_alg = */	{}, 0,
3119 		/* .cpu_mask = */	{}, 0,
3120 		/* .csums_alg = */	{}, 0,
3121 		/* .use_rle = */	0,
3122 		/* .on_no_data = */	DRBD_ON_NO_DATA_DEF,
3123 		/* .c_plan_ahead = */	DRBD_C_PLAN_AHEAD_DEF,
3124 		/* .c_delay_target = */	DRBD_C_DELAY_TARGET_DEF,
3125 		/* .c_fill_target = */	DRBD_C_FILL_TARGET_DEF,
3126 		/* .c_max_rate = */	DRBD_C_MAX_RATE_DEF,
3127 		/* .c_min_rate = */	DRBD_C_MIN_RATE_DEF
3128 	};
3129 
3130 	/* Have to initialize it this way, because the bitfield layout differs
3131 	   between big endian and little endian */
3132 	mdev->state = (union drbd_state) {
3133 		{ .role = R_SECONDARY,
3134 		  .peer = R_UNKNOWN,
3135 		  .conn = C_STANDALONE,
3136 		  .disk = D_DISKLESS,
3137 		  .pdsk = D_UNKNOWN,
3138 		  .susp = 0,
3139 		  .susp_nod = 0,
3140 		  .susp_fen = 0
3141 		} };
3142 }
3143 
3144 void drbd_init_set_defaults(struct drbd_conf *mdev)
3145 {
3146 	/* the memset(,0,) did most of this.
3147 	 * note: only assignments, no allocation in here */
3148 
3149 	drbd_set_defaults(mdev);
3150 
3151 	atomic_set(&mdev->ap_bio_cnt, 0);
3152 	atomic_set(&mdev->ap_pending_cnt, 0);
3153 	atomic_set(&mdev->rs_pending_cnt, 0);
3154 	atomic_set(&mdev->unacked_cnt, 0);
3155 	atomic_set(&mdev->local_cnt, 0);
3156 	atomic_set(&mdev->net_cnt, 0);
3157 	atomic_set(&mdev->packet_seq, 0);
3158 	atomic_set(&mdev->pp_in_use, 0);
3159 	atomic_set(&mdev->pp_in_use_by_net, 0);
3160 	atomic_set(&mdev->rs_sect_in, 0);
3161 	atomic_set(&mdev->rs_sect_ev, 0);
3162 	atomic_set(&mdev->ap_in_flight, 0);
3163 	atomic_set(&mdev->md_io_in_use, 0);
3164 
3165 	mutex_init(&mdev->data.mutex);
3166 	mutex_init(&mdev->meta.mutex);
3167 	sema_init(&mdev->data.work.s, 0);
3168 	sema_init(&mdev->meta.work.s, 0);
3169 	mutex_init(&mdev->state_mutex);
3170 
3171 	spin_lock_init(&mdev->data.work.q_lock);
3172 	spin_lock_init(&mdev->meta.work.q_lock);
3173 
3174 	spin_lock_init(&mdev->al_lock);
3175 	spin_lock_init(&mdev->req_lock);
3176 	spin_lock_init(&mdev->peer_seq_lock);
3177 	spin_lock_init(&mdev->epoch_lock);
3178 
3179 	INIT_LIST_HEAD(&mdev->active_ee);
3180 	INIT_LIST_HEAD(&mdev->sync_ee);
3181 	INIT_LIST_HEAD(&mdev->done_ee);
3182 	INIT_LIST_HEAD(&mdev->read_ee);
3183 	INIT_LIST_HEAD(&mdev->net_ee);
3184 	INIT_LIST_HEAD(&mdev->resync_reads);
3185 	INIT_LIST_HEAD(&mdev->data.work.q);
3186 	INIT_LIST_HEAD(&mdev->meta.work.q);
3187 	INIT_LIST_HEAD(&mdev->resync_work.list);
3188 	INIT_LIST_HEAD(&mdev->unplug_work.list);
3189 	INIT_LIST_HEAD(&mdev->go_diskless.list);
3190 	INIT_LIST_HEAD(&mdev->md_sync_work.list);
3191 	INIT_LIST_HEAD(&mdev->start_resync_work.list);
3192 	INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
3193 
3194 	mdev->resync_work.cb  = w_resync_timer;
3195 	mdev->unplug_work.cb  = w_send_write_hint;
3196 	mdev->go_diskless.cb  = w_go_diskless;
3197 	mdev->md_sync_work.cb = w_md_sync;
3198 	mdev->bm_io_work.w.cb = w_bitmap_io;
3199 	mdev->start_resync_work.cb = w_start_resync;
3200 	init_timer(&mdev->resync_timer);
3201 	init_timer(&mdev->md_sync_timer);
3202 	init_timer(&mdev->start_resync_timer);
3203 	init_timer(&mdev->request_timer);
3204 	mdev->resync_timer.function = resync_timer_fn;
3205 	mdev->resync_timer.data = (unsigned long) mdev;
3206 	mdev->md_sync_timer.function = md_sync_timer_fn;
3207 	mdev->md_sync_timer.data = (unsigned long) mdev;
3208 	mdev->start_resync_timer.function = start_resync_timer_fn;
3209 	mdev->start_resync_timer.data = (unsigned long) mdev;
3210 	mdev->request_timer.function = request_timer_fn;
3211 	mdev->request_timer.data = (unsigned long) mdev;
3212 
3213 	init_waitqueue_head(&mdev->misc_wait);
3214 	init_waitqueue_head(&mdev->state_wait);
3215 	init_waitqueue_head(&mdev->net_cnt_wait);
3216 	init_waitqueue_head(&mdev->ee_wait);
3217 	init_waitqueue_head(&mdev->al_wait);
3218 	init_waitqueue_head(&mdev->seq_wait);
3219 
3220 	drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
3221 	drbd_thread_init(mdev, &mdev->worker, drbd_worker);
3222 	drbd_thread_init(mdev, &mdev->asender, drbd_asender);
3223 
3224 	mdev->agreed_pro_version = PRO_VERSION_MAX;
3225 	mdev->write_ordering = WO_bdev_flush;
3226 	mdev->resync_wenr = LC_FREE;
3227 	mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
3228 	mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
3229 }
3230 
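/* Reset a device object after it became unconfigured: clear transfer
 * statistics and resync bookkeeping, shrink the bitmap, release remaining
 * resources and reinstall the compile-time defaults via drbd_set_defaults(). */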
3231 void drbd_mdev_cleanup(struct drbd_conf *mdev)
3232 {
3233 	int i;
3234 	if (mdev->receiver.t_state != None)
3235 		dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
3236 				mdev->receiver.t_state);
3237 
3238 	/* no need to lock it, I'm the only thread alive */
3239 	if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
3240 		dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
3241 	mdev->al_writ_cnt  =
3242 	mdev->bm_writ_cnt  =
3243 	mdev->read_cnt     =
3244 	mdev->recv_cnt     =
3245 	mdev->send_cnt     =
3246 	mdev->writ_cnt     =
3247 	mdev->p_size       =
3248 	mdev->rs_start     =
3249 	mdev->rs_total     =
3250 	mdev->rs_failed    = 0;
3251 	mdev->rs_last_events = 0;
3252 	mdev->rs_last_sect_ev = 0;
3253 	for (i = 0; i < DRBD_SYNC_MARKS; i++) {
3254 		mdev->rs_mark_left[i] = 0;
3255 		mdev->rs_mark_time[i] = 0;
3256 	}
3257 	D_ASSERT(mdev->net_conf == NULL);
3258 
3259 	drbd_set_my_capacity(mdev, 0);
3260 	if (mdev->bitmap) {
3261 		/* maybe never allocated. */
3262 		drbd_bm_resize(mdev, 0, 1);
3263 		drbd_bm_cleanup(mdev);
3264 	}
3265 
3266 	drbd_free_resources(mdev);
3267 	clear_bit(AL_SUSPENDED, &mdev->flags);
3268 
3269 	/*
3270 	 * currently we drbd_init_ee only on module load, so
3271 	 * we may do drbd_release_ee only on module unload!
3272 	 */
3273 	D_ASSERT(list_empty(&mdev->active_ee));
3274 	D_ASSERT(list_empty(&mdev->sync_ee));
3275 	D_ASSERT(list_empty(&mdev->done_ee));
3276 	D_ASSERT(list_empty(&mdev->read_ee));
3277 	D_ASSERT(list_empty(&mdev->net_ee));
3278 	D_ASSERT(list_empty(&mdev->resync_reads));
3279 	D_ASSERT(list_empty(&mdev->data.work.q));
3280 	D_ASSERT(list_empty(&mdev->meta.work.q));
3281 	D_ASSERT(list_empty(&mdev->resync_work.list));
3282 	D_ASSERT(list_empty(&mdev->unplug_work.list));
3283 	D_ASSERT(list_empty(&mdev->go_diskless.list));
3284 
3285 	drbd_set_defaults(mdev);
3286 }
3287 
3288 
3289 static void drbd_destroy_mempools(void)
3290 {
3291 	struct page *page;
3292 
3293 	while (drbd_pp_pool) {
3294 		page = drbd_pp_pool;
3295 		drbd_pp_pool = (struct page *)page_private(page);
3296 		__free_page(page);
3297 		drbd_pp_vacant--;
3298 	}
3299 
3300 	/* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
3301 
3302 	if (drbd_md_io_bio_set)
3303 		bioset_free(drbd_md_io_bio_set);
3304 	if (drbd_md_io_page_pool)
3305 		mempool_destroy(drbd_md_io_page_pool);
3306 	if (drbd_ee_mempool)
3307 		mempool_destroy(drbd_ee_mempool);
3308 	if (drbd_request_mempool)
3309 		mempool_destroy(drbd_request_mempool);
3310 	if (drbd_ee_cache)
3311 		kmem_cache_destroy(drbd_ee_cache);
3312 	if (drbd_request_cache)
3313 		kmem_cache_destroy(drbd_request_cache);
3314 	if (drbd_bm_ext_cache)
3315 		kmem_cache_destroy(drbd_bm_ext_cache);
3316 	if (drbd_al_ext_cache)
3317 		kmem_cache_destroy(drbd_al_ext_cache);
3318 
3319 	drbd_md_io_bio_set   = NULL;
3320 	drbd_md_io_page_pool = NULL;
3321 	drbd_ee_mempool      = NULL;
3322 	drbd_request_mempool = NULL;
3323 	drbd_ee_cache        = NULL;
3324 	drbd_request_cache   = NULL;
3325 	drbd_bm_ext_cache    = NULL;
3326 	drbd_al_ext_cache    = NULL;
3327 
3328 	return;
3329 }
3330 
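/* Allocate the global slab caches, mempools and the private page pool.  The
 * page pool is sized to hold one request of DRBD_MAX_BIO_SIZE per configured
 * minor, i.e. (DRBD_MAX_BIO_SIZE / PAGE_SIZE) * minor_count pages. */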
3331 static int drbd_create_mempools(void)
3332 {
3333 	struct page *page;
3334 	const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
3335 	int i;
3336 
3337 	/* prepare our caches and mempools */
3338 	drbd_request_mempool = NULL;
3339 	drbd_ee_cache        = NULL;
3340 	drbd_request_cache   = NULL;
3341 	drbd_bm_ext_cache    = NULL;
3342 	drbd_al_ext_cache    = NULL;
3343 	drbd_pp_pool         = NULL;
3344 	drbd_md_io_page_pool = NULL;
3345 	drbd_md_io_bio_set   = NULL;
3346 
3347 	/* caches */
3348 	drbd_request_cache = kmem_cache_create(
3349 		"drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
3350 	if (drbd_request_cache == NULL)
3351 		goto Enomem;
3352 
3353 	drbd_ee_cache = kmem_cache_create(
3354 		"drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
3355 	if (drbd_ee_cache == NULL)
3356 		goto Enomem;
3357 
3358 	drbd_bm_ext_cache = kmem_cache_create(
3359 		"drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
3360 	if (drbd_bm_ext_cache == NULL)
3361 		goto Enomem;
3362 
3363 	drbd_al_ext_cache = kmem_cache_create(
3364 		"drbd_al", sizeof(struct lc_element), 0, 0, NULL);
3365 	if (drbd_al_ext_cache == NULL)
3366 		goto Enomem;
3367 
3368 	/* mempools */
3369 #ifdef COMPAT_HAVE_BIOSET_CREATE
3370 	drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
3371 	if (drbd_md_io_bio_set == NULL)
3372 		goto Enomem;
3373 #endif
3374 
3375 	drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
3376 	if (drbd_md_io_page_pool == NULL)
3377 		goto Enomem;
3378 
3379 	drbd_request_mempool = mempool_create(number,
3380 		mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
3381 	if (drbd_request_mempool == NULL)
3382 		goto Enomem;
3383 
3384 	drbd_ee_mempool = mempool_create(number,
3385 		mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
3386 	if (drbd_ee_mempool == NULL)
3387 		goto Enomem;
3388 
3389 	/* drbd's page pool */
3390 	spin_lock_init(&drbd_pp_lock);
3391 
3392 	for (i = 0; i < number; i++) {
3393 		page = alloc_page(GFP_HIGHUSER);
3394 		if (!page)
3395 			goto Enomem;
3396 		set_page_private(page, (unsigned long)drbd_pp_pool);
3397 		drbd_pp_pool = page;
3398 	}
3399 	drbd_pp_vacant = number;
3400 
3401 	return 0;
3402 
3403 Enomem:
3404 	drbd_destroy_mempools(); /* in case we allocated some */
3405 	return -ENOMEM;
3406 }
3407 
3408 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
3409 	void *unused)
3410 {
3411 	/* just so we have it.  you never know what interesting things we
3412 	 * might want to do here some day...
3413 	 */
3414 
3415 	return NOTIFY_DONE;
3416 }
3417 
3418 static struct notifier_block drbd_notifier = {
3419 	.notifier_call = drbd_notify_sys,
3420 };
3421 
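/* Drain all per-device epoch entry lists; any entries still found here are
 * leftovers and are reported via dev_err(). */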
3422 static void drbd_release_ee_lists(struct drbd_conf *mdev)
3423 {
3424 	int rr;
3425 
3426 	rr = drbd_release_ee(mdev, &mdev->active_ee);
3427 	if (rr)
3428 		dev_err(DEV, "%d EEs in active list found!\n", rr);
3429 
3430 	rr = drbd_release_ee(mdev, &mdev->sync_ee);
3431 	if (rr)
3432 		dev_err(DEV, "%d EEs in sync list found!\n", rr);
3433 
3434 	rr = drbd_release_ee(mdev, &mdev->read_ee);
3435 	if (rr)
3436 		dev_err(DEV, "%d EEs in read list found!\n", rr);
3437 
3438 	rr = drbd_release_ee(mdev, &mdev->done_ee);
3439 	if (rr)
3440 		dev_err(DEV, "%d EEs in done list found!\n", rr);
3441 
3442 	rr = drbd_release_ee(mdev, &mdev->net_ee);
3443 	if (rr)
3444 		dev_err(DEV, "%d EEs in net list found!\n", rr);
3445 }
3446 
3447 /* caution. no locking.
3448  * currently only used from module cleanup code. */
3449 static void drbd_delete_device(unsigned int minor)
3450 {
3451 	struct drbd_conf *mdev = minor_to_mdev(minor);
3452 
3453 	if (!mdev)
3454 		return;
3455 
3456 	del_timer_sync(&mdev->request_timer);
3457 
3458 	/* paranoia asserts */
3459 	if (mdev->open_cnt != 0)
3460 		dev_err(DEV, "open_cnt = %d in %s:%u", mdev->open_cnt,
3461 				__FILE__ , __LINE__);
3462 
3463 	ERR_IF (!list_empty(&mdev->data.work.q)) {
3464 		struct list_head *lp;
3465 		list_for_each(lp, &mdev->data.work.q) {
3466 			dev_err(DEV, "lp = %p\n", lp);
3467 		}
3468 	};
3469 	/* end paranoia asserts */
3470 
3471 	del_gendisk(mdev->vdisk);
3472 
3473 	/* cleanup stuff that may have been allocated during
3474 	 * device (re-)configuration or state changes */
3475 
3476 	if (mdev->this_bdev)
3477 		bdput(mdev->this_bdev);
3478 
3479 	drbd_free_resources(mdev);
3480 
3481 	drbd_release_ee_lists(mdev);
3482 
3483 	/* should be freed on disconnect? */
3484 	kfree(mdev->ee_hash);
3485 	/*
3486 	mdev->ee_hash_s = 0;
3487 	mdev->ee_hash = NULL;
3488 	*/
3489 
3490 	lc_destroy(mdev->act_log);
3491 	lc_destroy(mdev->resync);
3492 
3493 	kfree(mdev->p_uuid);
3494 	/* mdev->p_uuid = NULL; */
3495 
3496 	kfree(mdev->int_dig_out);
3497 	kfree(mdev->int_dig_in);
3498 	kfree(mdev->int_dig_vv);
3499 
3500 	/* cleanup the rest that has been
3501 	 * allocated from drbd_new_device
3502 	 * and actually free the mdev itself */
3503 	drbd_free_mdev(mdev);
3504 }
3505 
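/* Module exit path: unregister the reboot notifier, remove /proc/drbd and the
 * netlink interface, delete every minor, free the mempools and finally give
 * the block major back. */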
3506 static void drbd_cleanup(void)
3507 {
3508 	unsigned int i;
3509 
3510 	unregister_reboot_notifier(&drbd_notifier);
3511 
3512 	/* first remove proc,
3513 	 * drbdsetup uses its presence to detect
3514 	 * whether DRBD is loaded.
3515 	 * If we get stuck in proc removal,
3516 	 * but have netlink already deregistered,
3517 	 * some drbdsetup commands may wait forever
3518 	 * for an answer.
3519 	 */
3520 	if (drbd_proc)
3521 		remove_proc_entry("drbd", NULL);
3522 
3523 	drbd_nl_cleanup();
3524 
3525 	if (minor_table) {
3526 		i = minor_count;
3527 		while (i--)
3528 			drbd_delete_device(i);
3529 		drbd_destroy_mempools();
3530 	}
3531 
3532 	kfree(minor_table);
3533 
3534 	unregister_blkdev(DRBD_MAJOR, "drbd");
3535 
3536 	printk(KERN_INFO "drbd: module cleanup done.\n");
3537 }
3538 
3539 /**
3540  * drbd_congested() - Callback for pdflush
3541  * @congested_data:	User data
3542  * @bdi_bits:		Bits pdflush is currently interested in
3543  *
3544  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
3545  */
3546 static int drbd_congested(void *congested_data, int bdi_bits)
3547 {
3548 	struct drbd_conf *mdev = congested_data;
3549 	struct request_queue *q;
3550 	char reason = '-';
3551 	int r = 0;
3552 
3553 	if (!may_inc_ap_bio(mdev)) {
3554 		/* DRBD has frozen IO */
3555 		r = bdi_bits;
3556 		reason = 'd';
3557 		goto out;
3558 	}
3559 
3560 	if (test_bit(CALLBACK_PENDING, &mdev->flags)) {
3561 		r |= (1 << BDI_async_congested);
3562 		/* Without good local data, we would need to read from remote,
3563 		 * and that would need the worker thread as well, which is
3564 		 * currently blocked waiting for that usermode helper to
3565 		 * finish.
3566 		 */
3567 		if (!get_ldev_if_state(mdev, D_UP_TO_DATE))
3568 			r |= (1 << BDI_sync_congested);
3569 		else
3570 			put_ldev(mdev);
3571 		r &= bdi_bits;
3572 		reason = 'c';
3573 		goto out;
3574 	}
3575 
3576 	if (get_ldev(mdev)) {
3577 		q = bdev_get_queue(mdev->ldev->backing_bdev);
3578 		r = bdi_congested(&q->backing_dev_info, bdi_bits);
3579 		put_ldev(mdev);
3580 		if (r)
3581 			reason = 'b';
3582 	}
3583 
3584 	if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
3585 		r |= (1 << BDI_async_congested);
3586 		reason = reason == 'b' ? 'a' : 'n';
3587 	}
3588 
3589 out:
3590 	mdev->congestion_reason = reason;
3591 	return r;
3592 }
3593 
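/* Allocate and wire up everything one minor needs: the drbd_conf itself, its
 * request queue and gendisk, the meta-data IO page, bitmap, transfer log,
 * application read hash and the initial epoch.  Returns NULL on allocation
 * failure; partially allocated state is rolled back. */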
3594 struct drbd_conf *drbd_new_device(unsigned int minor)
3595 {
3596 	struct drbd_conf *mdev;
3597 	struct gendisk *disk;
3598 	struct request_queue *q;
3599 
3600 	/* GFP_KERNEL, we are outside of all write-out paths */
3601 	mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3602 	if (!mdev)
3603 		return NULL;
3604 	if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3605 		goto out_no_cpumask;
3606 
3607 	mdev->minor = minor;
3608 
3609 	drbd_init_set_defaults(mdev);
3610 
3611 	q = blk_alloc_queue(GFP_KERNEL);
3612 	if (!q)
3613 		goto out_no_q;
3614 	mdev->rq_queue = q;
3615 	q->queuedata   = mdev;
3616 
3617 	disk = alloc_disk(1);
3618 	if (!disk)
3619 		goto out_no_disk;
3620 	mdev->vdisk = disk;
3621 
3622 	set_disk_ro(disk, true);
3623 
3624 	disk->queue = q;
3625 	disk->major = DRBD_MAJOR;
3626 	disk->first_minor = minor;
3627 	disk->fops = &drbd_ops;
3628 	sprintf(disk->disk_name, "drbd%d", minor);
3629 	disk->private_data = mdev;
3630 
3631 	mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3632 	/* we have no partitions. we contain only ourselves. */
3633 	mdev->this_bdev->bd_contains = mdev->this_bdev;
3634 
3635 	q->backing_dev_info.congested_fn = drbd_congested;
3636 	q->backing_dev_info.congested_data = mdev;
3637 
3638 	blk_queue_make_request(q, drbd_make_request);
3639 	blk_queue_flush(q, REQ_FLUSH | REQ_FUA);
3640 	/* Setting the max_hw_sectors to an odd value of 8 KiB here;
3641 	   this triggers a max_bio_size message upon first attach or connect. */
3642 	blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
3643 	blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3644 	blk_queue_merge_bvec(q, drbd_merge_bvec);
3645 	q->queue_lock = &mdev->req_lock;
3646 
3647 	mdev->md_io_page = alloc_page(GFP_KERNEL);
3648 	if (!mdev->md_io_page)
3649 		goto out_no_io_page;
3650 
3651 	if (drbd_bm_init(mdev))
3652 		goto out_no_bitmap;
3653 	/* no need to lock access, we are still initializing this minor device. */
3654 	if (!tl_init(mdev))
3655 		goto out_no_tl;
3656 
3657 	mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3658 	if (!mdev->app_reads_hash)
3659 		goto out_no_app_reads;
3660 
3661 	mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3662 	if (!mdev->current_epoch)
3663 		goto out_no_epoch;
3664 
3665 	INIT_LIST_HEAD(&mdev->current_epoch->list);
3666 	mdev->epochs = 1;
3667 
3668 	return mdev;
3669 
3670 /* out_whatever_else:
3671 	kfree(mdev->current_epoch); */
3672 out_no_epoch:
3673 	kfree(mdev->app_reads_hash);
3674 out_no_app_reads:
3675 	tl_cleanup(mdev);
3676 out_no_tl:
3677 	drbd_bm_cleanup(mdev);
3678 out_no_bitmap:
3679 	__free_page(mdev->md_io_page);
3680 out_no_io_page:
3681 	put_disk(disk);
3682 out_no_disk:
3683 	blk_cleanup_queue(q);
3684 out_no_q:
3685 	free_cpumask_var(mdev->cpu_mask);
3686 out_no_cpumask:
3687 	kfree(mdev);
3688 	return NULL;
3689 }
3690 
3691 /* counterpart of drbd_new_device.
3692  * last part of drbd_delete_device. */
3693 void drbd_free_mdev(struct drbd_conf *mdev)
3694 {
3695 	kfree(mdev->current_epoch);
3696 	kfree(mdev->app_reads_hash);
3697 	tl_cleanup(mdev);
3698 	if (mdev->bitmap) /* should no longer be there. */
3699 		drbd_bm_cleanup(mdev);
3700 	__free_page(mdev->md_io_page);
3701 	put_disk(mdev->vdisk);
3702 	blk_cleanup_queue(mdev->rq_queue);
3703 	free_cpumask_var(mdev->cpu_mask);
3704 	drbd_free_tl_hash(mdev);
3705 	kfree(mdev);
3706 }
3707 
3708 
3709 int __init drbd_init(void)
3710 {
3711 	int err;
3712 
3713 	if (sizeof(struct p_handshake) != 80) {
3714 		printk(KERN_ERR
3715 		       "drbd: never change the size or layout "
3716 		       "of the HandShake packet.\n");
3717 		return -EINVAL;
3718 	}
3719 
3720 	if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
3721 		printk(KERN_ERR
3722 			"drbd: invalid minor_count (%d)\n", minor_count);
3723 #ifdef MODULE
3724 		return -EINVAL;
3725 #else
3726 		minor_count = 8;
3727 #endif
3728 	}
3729 
3730 	err = drbd_nl_init();
3731 	if (err)
3732 		return err;
3733 
3734 	err = register_blkdev(DRBD_MAJOR, "drbd");
3735 	if (err) {
3736 		printk(KERN_ERR
3737 		       "drbd: unable to register block device major %d\n",
3738 		       DRBD_MAJOR);
3739 		return err;
3740 	}
3741 
3742 	register_reboot_notifier(&drbd_notifier);
3743 
3744 	/*
3745 	 * allocate all necessary structs
3746 	 */
3747 	err = -ENOMEM;
3748 
3749 	init_waitqueue_head(&drbd_pp_wait);
3750 
3751 	drbd_proc = NULL; /* play safe for drbd_cleanup */
3752 	minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3753 				GFP_KERNEL);
3754 	if (!minor_table)
3755 		goto Enomem;
3756 
3757 	err = drbd_create_mempools();
3758 	if (err)
3759 		goto Enomem;
3760 
3761 	drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3762 	if (!drbd_proc)	{
3763 		printk(KERN_ERR "drbd: unable to register proc file\n");
3764 		goto Enomem;
3765 	}
3766 
3767 	rwlock_init(&global_state_lock);
3768 
3769 	printk(KERN_INFO "drbd: initialized. "
3770 	       "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3771 	       API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3772 	printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3773 	printk(KERN_INFO "drbd: registered as block device major %d\n",
3774 		DRBD_MAJOR);
3775 	printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3776 
3777 	return 0; /* Success! */
3778 
3779 Enomem:
3780 	drbd_cleanup();
3781 	if (err == -ENOMEM)
3782 		/* currently always the case */
3783 		printk(KERN_ERR "drbd: ran out of memory\n");
3784 	else
3785 		printk(KERN_ERR "drbd: initialization failure\n");
3786 	return err;
3787 }
3788 
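/* Release the backing and meta-data block devices of a detached disk and free
 * the descriptor itself; tolerates a NULL argument. */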
3789 void drbd_free_bc(struct drbd_backing_dev *ldev)
3790 {
3791 	if (ldev == NULL)
3792 		return;
3793 
3794 	blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
3795 	blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
3796 
3797 	kfree(ldev);
3798 }
3799 
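/* Shut down and release the data and meta-data sockets, if present, each
 * under its own mutex. */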
3800 void drbd_free_sock(struct drbd_conf *mdev)
3801 {
3802 	if (mdev->data.socket) {
3803 		mutex_lock(&mdev->data.mutex);
3804 		kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3805 		sock_release(mdev->data.socket);
3806 		mdev->data.socket = NULL;
3807 		mutex_unlock(&mdev->data.mutex);
3808 	}
3809 	if (mdev->meta.socket) {
3810 		mutex_lock(&mdev->meta.mutex);
3811 		kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3812 		sock_release(mdev->meta.socket);
3813 		mdev->meta.socket = NULL;
3814 		mutex_unlock(&mdev->meta.mutex);
3815 	}
3816 }
3817 
3818 
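/* Drop the crypto transforms, close both sockets and release the local
 * backing device. */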
3819 void drbd_free_resources(struct drbd_conf *mdev)
3820 {
3821 	crypto_free_hash(mdev->csums_tfm);
3822 	mdev->csums_tfm = NULL;
3823 	crypto_free_hash(mdev->verify_tfm);
3824 	mdev->verify_tfm = NULL;
3825 	crypto_free_hash(mdev->cram_hmac_tfm);
3826 	mdev->cram_hmac_tfm = NULL;
3827 	crypto_free_hash(mdev->integrity_w_tfm);
3828 	mdev->integrity_w_tfm = NULL;
3829 	crypto_free_hash(mdev->integrity_r_tfm);
3830 	mdev->integrity_r_tfm = NULL;
3831 
3832 	drbd_free_sock(mdev);
3833 
3834 	__no_warn(local,
3835 		  drbd_free_bc(mdev->ldev);
3836 		  mdev->ldev = NULL;);
3837 }
3838 
3839 /* meta data management */
3840 
3841 struct meta_data_on_disk {
3842 	u64 la_size;           /* last agreed size. */
3843 	u64 uuid[UI_SIZE];   /* UUIDs. */
3844 	u64 device_uuid;
3845 	u64 reserved_u64_1;
3846 	u32 flags;             /* MDF */
3847 	u32 magic;
3848 	u32 md_size_sect;
3849 	u32 al_offset;         /* offset to this block */
3850 	u32 al_nr_extents;     /* important for restoring the AL */
3851 	      /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3852 	u32 bm_offset;         /* offset to the bitmap, from here */
3853 	u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3854 	u32 la_peer_max_bio_size;   /* last peer max_bio_size */
3855 	u32 reserved_u32[3];
3856 
3857 } __packed;
3858 
3859 /**
3860  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3861  * @mdev:	DRBD device.
3862  */
3863 void drbd_md_sync(struct drbd_conf *mdev)
3864 {
3865 	struct meta_data_on_disk *buffer;
3866 	sector_t sector;
3867 	int i;
3868 
3869 	del_timer(&mdev->md_sync_timer);
3870 	/* timer may be rearmed by drbd_md_mark_dirty() now. */
3871 	if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3872 		return;
3873 
3874 	/* We use D_FAILED here and not D_ATTACHING because we try to write
3875 	 * metadata even if we detach due to a disk failure! */
3876 	if (!get_ldev_if_state(mdev, D_FAILED))
3877 		return;
3878 
3879 	buffer = drbd_md_get_buffer(mdev);
3880 	if (!buffer)
3881 		goto out;
3882 
3883 	memset(buffer, 0, 512);
3884 
3885 	buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3886 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3887 		buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3888 	buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3889 	buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3890 
3891 	buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3892 	buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3893 	buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3894 	buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3895 	buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3896 
3897 	buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3898 	buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
3899 
3900 	D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3901 	sector = mdev->ldev->md.md_offset;
3902 
3903 	if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3904 		/* this was a try anyway ... */
3905 		dev_err(DEV, "meta data update failed!\n");
3906 		drbd_chk_io_error(mdev, 1, DRBD_META_IO_ERROR);
3907 	}
3908 
3909 	/* Update mdev->ldev->md.la_size_sect,
3910 	 * since we just wrote it to the meta data. */
3911 	mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3912 
3913 	drbd_md_put_buffer(mdev);
3914 out:
3915 	put_ldev(mdev);
3916 }
3917 
3918 /**
3919  * drbd_md_read() - Reads in the meta data super block
3920  * @mdev:	DRBD device.
3921  * @bdev:	Device from which the meta data should be read in.
3922  *
3923  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
3924  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3925  */
3926 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3927 {
3928 	struct meta_data_on_disk *buffer;
3929 	int i, rv = NO_ERROR;
3930 
3931 	if (!get_ldev_if_state(mdev, D_ATTACHING))
3932 		return ERR_IO_MD_DISK;
3933 
3934 	buffer = drbd_md_get_buffer(mdev);
3935 	if (!buffer)
3936 		goto out;
3937 
3938 	if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3939 		/* NOTE: can't do normal error processing here as this is
3940 		   called BEFORE disk is attached */
3941 		dev_err(DEV, "Error while reading metadata.\n");
3942 		rv = ERR_IO_MD_DISK;
3943 		goto err;
3944 	}
3945 
3946 	if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3947 		dev_err(DEV, "Error while reading metadata, magic not found.\n");
3948 		rv = ERR_MD_INVALID;
3949 		goto err;
3950 	}
3951 	if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3952 		dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3953 		    be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3954 		rv = ERR_MD_INVALID;
3955 		goto err;
3956 	}
3957 	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3958 		dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3959 		    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3960 		rv = ERR_MD_INVALID;
3961 		goto err;
3962 	}
3963 	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3964 		dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3965 		    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3966 		rv = ERR_MD_INVALID;
3967 		goto err;
3968 	}
3969 
3970 	if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3971 		dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3972 		    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3973 		rv = ERR_MD_INVALID;
3974 		goto err;
3975 	}
3976 
3977 	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3978 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3979 		bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3980 	bdev->md.flags = be32_to_cpu(buffer->flags);
3981 	mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3982 	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3983 
3984 	spin_lock_irq(&mdev->req_lock);
3985 	if (mdev->state.conn < C_CONNECTED) {
3986 		unsigned int peer;
3987 		peer = be32_to_cpu(buffer->la_peer_max_bio_size);
3988 		peer = max(peer, DRBD_MAX_BIO_SIZE_SAFE);
3989 		mdev->peer_max_bio_size = peer;
3990 	}
3991 	spin_unlock_irq(&mdev->req_lock);
3992 
3993 	if (mdev->sync_conf.al_extents < 7)
3994 		mdev->sync_conf.al_extents = 127;
3995 
3996  err:
3997 	drbd_md_put_buffer(mdev);
3998  out:
3999 	put_ldev(mdev);
4000 
4001 	return rv;
4002 }
4003 
4004 /**
4005  * drbd_md_mark_dirty() - Mark meta data super block as dirty
4006  * @mdev:	DRBD device.
4007  *
4008  * Call this function if you change anything that should be written to
4009  * the meta-data super block. This function sets MD_DIRTY, and starts a
4010 	 * timer that ensures drbd_md_sync() gets called within five seconds.
4011  */
4012 #ifdef DEBUG
4013 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
4014 {
4015 	if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
4016 		mod_timer(&mdev->md_sync_timer, jiffies + HZ);
4017 		mdev->last_md_mark_dirty.line = line;
4018 		mdev->last_md_mark_dirty.func = func;
4019 	}
4020 }
4021 #else
4022 void drbd_md_mark_dirty(struct drbd_conf *mdev)
4023 {
4024 	if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
4025 		mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
4026 }
4027 #endif
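/* A typical caller of drbd_md_mark_dirty() first updates a member of
 * mdev->ldev->md; see for example _drbd_uuid_set() below. */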
4028 
4029 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
4030 {
4031 	int i;
4032 
4033 	for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
4034 		mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
4035 }
4036 
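/* Set a single UUID slot without rotating the history.  For UI_CURRENT the
 * lowest bit mirrors the role (set while Primary) and the exposed data UUID
 * is updated as well; the super block is marked dirty afterwards. */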
4037 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
4038 {
4039 	if (idx == UI_CURRENT) {
4040 		if (mdev->state.role == R_PRIMARY)
4041 			val |= 1;
4042 		else
4043 			val &= ~((u64)1);
4044 
4045 		drbd_set_ed_uuid(mdev, val);
4046 	}
4047 
4048 	mdev->ldev->md.uuid[idx] = val;
4049 	drbd_md_mark_dirty(mdev);
4050 }
4051 
4052 
4053 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
4054 {
4055 	if (mdev->ldev->md.uuid[idx]) {
4056 		drbd_uuid_move_history(mdev);
4057 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
4058 	}
4059 	_drbd_uuid_set(mdev, idx, val);
4060 }
4061 
4062 /**
4063  * drbd_uuid_new_current() - Creates a new current UUID
4064  * @mdev:	DRBD device.
4065  *
4066  * Creates a new current UUID, and rotates the old current UUID into
4067  * the bitmap slot. Causes an incremental resync upon next connect.
4068  */
4069 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
4070 {
4071 	u64 val;
4072 	unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
4073 
4074 	if (bm_uuid)
4075 		dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
4076 
4077 	mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
4078 
4079 	get_random_bytes(&val, sizeof(u64));
4080 	_drbd_uuid_set(mdev, UI_CURRENT, val);
4081 	drbd_print_uuids(mdev, "new current UUID");
4082 	/* get it to stable storage _now_ */
4083 	drbd_md_sync(mdev);
4084 }
4085 
4086 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
4087 {
4088 	if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
4089 		return;
4090 
4091 	if (val == 0) {
4092 		drbd_uuid_move_history(mdev);
4093 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
4094 		mdev->ldev->md.uuid[UI_BITMAP] = 0;
4095 	} else {
4096 		unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
4097 		if (bm_uuid)
4098 			dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
4099 
4100 		mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
4101 	}
4102 	drbd_md_mark_dirty(mdev);
4103 }
4104 
4105 /**
4106  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
4107  * @mdev:	DRBD device.
4108  *
4109  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
4110  */
4111 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
4112 {
4113 	int rv = -EIO;
4114 
4115 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
4116 		drbd_md_set_flag(mdev, MDF_FULL_SYNC);
4117 		drbd_md_sync(mdev);
4118 		drbd_bm_set_all(mdev);
4119 
4120 		rv = drbd_bm_write(mdev);
4121 
4122 		if (!rv) {
4123 			drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
4124 			drbd_md_sync(mdev);
4125 		}
4126 
4127 		put_ldev(mdev);
4128 	}
4129 
4130 	return rv;
4131 }
4132 
4133 /**
4134  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
4135  * @mdev:	DRBD device.
4136  *
4137  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
4138  */
4139 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
4140 {
4141 	int rv = -EIO;
4142 
4143 	drbd_resume_al(mdev);
4144 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
4145 		drbd_bm_clear_all(mdev);
4146 		rv = drbd_bm_write(mdev);
4147 		put_ldev(mdev);
4148 	}
4149 
4150 	return rv;
4151 }
4152 
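/* Worker callback for drbd_queue_bitmap_io(): run the queued io_fn under the
 * bitmap lock, wake up waiters once BITMAP_IO is cleared, and finally invoke
 * the completion callback, if any. */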
4153 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
4154 {
4155 	struct bm_io_work *work = container_of(w, struct bm_io_work, w);
4156 	int rv = -EIO;
4157 
4158 	D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
4159 
4160 	if (get_ldev(mdev)) {
4161 		drbd_bm_lock(mdev, work->why, work->flags);
4162 		rv = work->io_fn(mdev);
4163 		drbd_bm_unlock(mdev);
4164 		put_ldev(mdev);
4165 	}
4166 
4167 	clear_bit(BITMAP_IO, &mdev->flags);
4168 	smp_mb__after_clear_bit();
4169 	wake_up(&mdev->misc_wait);
4170 
4171 	if (work->done)
4172 		work->done(mdev, rv);
4173 
4174 	clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
4175 	work->why = NULL;
4176 	work->flags = 0;
4177 
4178 	return 1;
4179 }
4180 
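/* Tear down everything that hangs off the local disk: the resync and activity
 * log LRU caches, the backing device descriptor and the temporary meta-data
 * IO page; clearing GO_DISKLESS allows drbd_go_diskless() to queue work
 * again. */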
4181 void drbd_ldev_destroy(struct drbd_conf *mdev)
4182 {
4183 	lc_destroy(mdev->resync);
4184 	mdev->resync = NULL;
4185 	lc_destroy(mdev->act_log);
4186 	mdev->act_log = NULL;
4187 	__no_warn(local,
4188 		drbd_free_bc(mdev->ldev);
4189 		mdev->ldev = NULL;);
4190 
4191 	if (mdev->md_io_tmpp) {
4192 		__free_page(mdev->md_io_tmpp);
4193 		mdev->md_io_tmpp = NULL;
4194 	}
4195 	clear_bit(GO_DISKLESS, &mdev->flags);
4196 }
4197 
4198 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused)
4199 {
4200 	D_ASSERT(mdev->state.disk == D_FAILED);
4201 	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
4202 	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
4203 	 * the protected members anymore, though, so once put_ldev reaches zero
4204 	 * again, it will be safe to free them. */
4205 	drbd_force_state(mdev, NS(disk, D_DISKLESS));
4206 	return 1;
4207 }
4208 
4209 void drbd_go_diskless(struct drbd_conf *mdev)
4210 {
4211 	D_ASSERT(mdev->state.disk == D_FAILED);
4212 	if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
4213 		drbd_queue_work(&mdev->data.work, &mdev->go_diskless);
4214 }
4215 
4216 /**
4217  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
4218  * @mdev:	DRBD device.
4219  * @io_fn:	IO callback to be called when bitmap IO is possible
4220  * @done:	callback to be called after the bitmap IO was performed
4221  * @why:	Descriptive text of the reason for doing the IO
4222  *
4223  * While IO on the bitmap happens we freeze application IO, thus ensuring
4224  * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
4225  * called from worker context. It MUST NOT be used while a previous such
4226  * work is still pending!
4227  */
4228 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
4229 			  int (*io_fn)(struct drbd_conf *),
4230 			  void (*done)(struct drbd_conf *, int),
4231 			  char *why, enum bm_flag flags)
4232 {
4233 	D_ASSERT(current == mdev->worker.task);
4234 
4235 	D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
4236 	D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
4237 	D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
4238 	if (mdev->bm_io_work.why)
4239 		dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
4240 			why, mdev->bm_io_work.why);
4241 
4242 	mdev->bm_io_work.io_fn = io_fn;
4243 	mdev->bm_io_work.done = done;
4244 	mdev->bm_io_work.why = why;
4245 	mdev->bm_io_work.flags = flags;
4246 
4247 	spin_lock_irq(&mdev->req_lock);
4248 	set_bit(BITMAP_IO, &mdev->flags);
4249 	if (atomic_read(&mdev->ap_bio_cnt) == 0) {
4250 		if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
4251 			drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
4252 	}
4253 	spin_unlock_irq(&mdev->req_lock);
4254 }
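/* Illustrative only (the arguments are made up for this example, not taken
 * from an actual caller): from worker context, queue a "set all bits and
 * write out" operation with no completion callback:
 *
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL,
 *			     "set_n_write from example", BM_LOCKED_MASK);
 */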
4255 
4256 /**
4257  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
4258  * @mdev:	DRBD device.
4259  * @io_fn:	IO callback to be called when bitmap IO is possible
4260  * @why:	Descriptive text of the reason for doing the IO
4261  *
4262  * Freezes application IO while the actual IO operation runs. This
4263  * function MAY NOT be called from worker context.
4264  */
4265 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
4266 		char *why, enum bm_flag flags)
4267 {
4268 	int rv;
4269 
4270 	D_ASSERT(current != mdev->worker.task);
4271 
4272 	if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
4273 		drbd_suspend_io(mdev);
4274 
4275 	drbd_bm_lock(mdev, why, flags);
4276 	rv = io_fn(mdev);
4277 	drbd_bm_unlock(mdev);
4278 
4279 	if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
4280 		drbd_resume_io(mdev);
4281 
4282 	return rv;
4283 }
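/* Illustrative only (the arguments are made up for this example, not taken
 * from an actual caller): synchronously set all bits and write out the
 * bitmap, from some context other than the worker:
 *
 *	rv = drbd_bitmap_io(mdev, &drbd_bmio_set_n_write,
 *			    "set_n_write from example", BM_LOCKED_MASK);
 */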
4284 
4285 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
4286 {
4287 	if ((mdev->ldev->md.flags & flag) != flag) {
4288 		drbd_md_mark_dirty(mdev);
4289 		mdev->ldev->md.flags |= flag;
4290 	}
4291 }
4292 
4293 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
4294 {
4295 	if ((mdev->ldev->md.flags & flag) != 0) {
4296 		drbd_md_mark_dirty(mdev);
4297 		mdev->ldev->md.flags &= ~flag;
4298 	}
4299 }
4300 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
4301 {
4302 	return (bdev->md.flags & flag) != 0;
4303 }
4304 
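/* The md-sync timer writes nothing itself; it queues mdev->md_sync_work at
 * the front of the worker queue, and w_md_sync() below then complains about
 * the missed deadline and writes the super block. */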
4305 static void md_sync_timer_fn(unsigned long data)
4306 {
4307 	struct drbd_conf *mdev = (struct drbd_conf *) data;
4308 
4309 	drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
4310 }
4311 
4312 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
4313 {
4314 	dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
4315 #ifdef DEBUG
4316 	dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
4317 		mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
4318 #endif
4319 	drbd_md_sync(mdev);
4320 	return 1;
4321 }
4322 
4323 #ifdef CONFIG_DRBD_FAULT_INJECTION
4324 /* Fault insertion support including random number generator shamelessly
4325  * stolen from kernel/rcutorture.c */
4326 struct fault_random_state {
4327 	unsigned long state;
4328 	unsigned long count;
4329 };
4330 
4331 #define FAULT_RANDOM_MULT 39916801  /* prime */
4332 #define FAULT_RANDOM_ADD	479001701 /* prime */
4333 #define FAULT_RANDOM_REFRESH 10000
4334 
4335 /*
4336  * Crude but fast random-number generator.  Uses a linear congruential
4337  * generator, with occasional help from get_random_bytes().
4338  */
4339 static unsigned long
4340 _drbd_fault_random(struct fault_random_state *rsp)
4341 {
4342 	long refresh;
4343 
4344 	if (!rsp->count--) {
4345 		get_random_bytes(&refresh, sizeof(refresh));
4346 		rsp->state += refresh;
4347 		rsp->count = FAULT_RANDOM_REFRESH;
4348 	}
4349 	rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
4350 	return swahw32(rsp->state);
4351 }
4352 
4353 static char *
4354 _drbd_fault_str(unsigned int type) {
4355 	static char *_faults[] = {
4356 		[DRBD_FAULT_MD_WR] = "Meta-data write",
4357 		[DRBD_FAULT_MD_RD] = "Meta-data read",
4358 		[DRBD_FAULT_RS_WR] = "Resync write",
4359 		[DRBD_FAULT_RS_RD] = "Resync read",
4360 		[DRBD_FAULT_DT_WR] = "Data write",
4361 		[DRBD_FAULT_DT_RD] = "Data read",
4362 		[DRBD_FAULT_DT_RA] = "Data read ahead",
4363 		[DRBD_FAULT_BM_ALLOC] = "BM allocation",
4364 		[DRBD_FAULT_AL_EE] = "EE allocation",
4365 		[DRBD_FAULT_RECEIVE] = "receive data corruption",
4366 	};
4367 
4368 	return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
4369 }
4370 
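/* Decide whether to simulate a failure of the given type: only if this minor
 * is selected by fault_devs (0 selects all devices) and a pseudo-random roll
 * falls within fault_rate percent.  Hits are counted and warned about,
 * ratelimited. */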
4371 unsigned int
4372 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
4373 {
4374 	static struct fault_random_state rrs = {0, 0};
4375 
4376 	unsigned int ret = (
4377 		(fault_devs == 0 ||
4378 			((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
4379 		(((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
4380 
4381 	if (ret) {
4382 		fault_count++;
4383 
4384 		if (__ratelimit(&drbd_ratelimit_state))
4385 			dev_warn(DEV, "***Simulating %s failure\n",
4386 				_drbd_fault_str(type));
4387 	}
4388 
4389 	return ret;
4390 }
4391 #endif
4392 
4393 const char *drbd_buildtag(void)
4394 {
4395 	/* DRBD built from external sources carries a reference to the
4396 	   git hash of the source code here. */
4397 
4398 	static char buildtag[38] = "\0uilt-in";
4399 
4400 	if (buildtag[0] == 0) {
4401 #ifdef MODULE
4402 		sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
4403 #else
4404 		buildtag[0] = 'b';
4405 #endif
4406 	}
4407 
4408 	return buildtag;
4409 }
4410 
4411 module_init(drbd_init)
4412 module_exit(drbd_cleanup)
4413 
4414 EXPORT_SYMBOL(drbd_conn_str);
4415 EXPORT_SYMBOL(drbd_role_str);
4416 EXPORT_SYMBOL(drbd_disk_str);
4417 EXPORT_SYMBOL(drbd_set_st_err_str);
4418