xref: /linux/fs/dlm/dir.c (revision 8bc7c5e525584903ea83332e18a2118ed3b1985e)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 #include "dlm_internal.h"
13 #include "lockspace.h"
14 #include "member.h"
15 #include "lowcomms.h"
16 #include "rcom.h"
17 #include "config.h"
18 #include "memory.h"
19 #include "recover.h"
20 #include "util.h"
21 #include "lock.h"
22 #include "dir.h"
23 
24 /*
25  * We use the upper 16 bits of the hash value to select the directory node.
26  * Low bits are used for distribution of rsb's among hash buckets on each node.
27  *
28  * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
29  * num_nodes to the hash value.  This value in the desired range is used as an
30  * offset into the sorted list of nodeid's to give the particular nodeid.
31  */
32 
33 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
34 {
35 	uint32_t node;
36 
37 	if (ls->ls_num_nodes == 1)
38 		return dlm_our_nodeid();
39 	else {
40 		node = (hash >> 16) % ls->ls_total_weight;
41 		return ls->ls_node_array[node];
42 	}
43 }
44 
45 int dlm_dir_nodeid(struct dlm_rsb *r)
46 {
47 	return r->res_dir_nodeid;
48 }
49 
50 void dlm_recover_dir_nodeid(struct dlm_ls *ls, const struct list_head *root_list)
51 {
52 	struct dlm_rsb *r;
53 
54 	list_for_each_entry(r, root_list, res_root_list) {
55 		r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
56 	}
57 }
58 
59 int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq)
60 {
61 	struct dlm_member *memb;
62 	char *b, *last_name = NULL;
63 	int error = -ENOMEM, last_len, nodeid, result;
64 	uint16_t namelen;
65 	unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
66 
67 	log_rinfo(ls, "dlm_recover_directory");
68 
69 	if (dlm_no_directory(ls))
70 		goto out_status;
71 
72 	last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
73 	if (!last_name)
74 		goto out;
75 
76 	list_for_each_entry(memb, &ls->ls_nodes, list) {
77 		if (memb->nodeid == dlm_our_nodeid())
78 			continue;
79 
80 		memset(last_name, 0, DLM_RESNAME_MAXLEN);
81 		last_len = 0;
82 
83 		for (;;) {
84 			int left;
85 			if (dlm_recovery_stopped(ls)) {
86 				error = -EINTR;
87 				goto out_free;
88 			}
89 
90 			error = dlm_rcom_names(ls, memb->nodeid,
91 					       last_name, last_len, seq);
92 			if (error)
93 				goto out_free;
94 
95 			cond_resched();
96 
97 			/*
98 			 * pick namelen/name pairs out of received buffer
99 			 */
100 
101 			b = ls->ls_recover_buf->rc_buf;
102 			left = le16_to_cpu(ls->ls_recover_buf->rc_header.h_length);
103 			left -= sizeof(struct dlm_rcom);
104 
105 			for (;;) {
106 				__be16 v;
107 
108 				error = -EINVAL;
109 				if (left < sizeof(__be16))
110 					goto out_free;
111 
112 				memcpy(&v, b, sizeof(__be16));
113 				namelen = be16_to_cpu(v);
114 				b += sizeof(__be16);
115 				left -= sizeof(__be16);
116 
117 				/* namelen of 0xFFFFF marks end of names for
118 				   this node; namelen of 0 marks end of the
119 				   buffer */
120 
121 				if (namelen == 0xFFFF)
122 					goto done;
123 				if (!namelen)
124 					break;
125 
126 				if (namelen > left)
127 					goto out_free;
128 
129 				if (namelen > DLM_RESNAME_MAXLEN)
130 					goto out_free;
131 
132 				error = dlm_master_lookup(ls, memb->nodeid,
133 							  b, namelen,
134 							  DLM_LU_RECOVER_DIR,
135 							  &nodeid, &result);
136 				if (error) {
137 					log_error(ls, "recover_dir lookup %d",
138 						  error);
139 					goto out_free;
140 				}
141 
142 				/* The name was found in rsbtbl, but the
143 				 * master nodeid is different from
144 				 * memb->nodeid which says it is the master.
145 				 * This should not happen. */
146 
147 				if (result == DLM_LU_MATCH &&
148 				    nodeid != memb->nodeid) {
149 					count_bad++;
150 					log_error(ls, "recover_dir lookup %d "
151 						  "nodeid %d memb %d bad %u",
152 						  result, nodeid, memb->nodeid,
153 						  count_bad);
154 					print_hex_dump_bytes("dlm_recover_dir ",
155 							     DUMP_PREFIX_NONE,
156 							     b, namelen);
157 				}
158 
159 				/* The name was found in rsbtbl, and the
160 				 * master nodeid matches memb->nodeid. */
161 
162 				if (result == DLM_LU_MATCH &&
163 				    nodeid == memb->nodeid) {
164 					count_match++;
165 				}
166 
167 				/* The name was not found in rsbtbl and was
168 				 * added with memb->nodeid as the master. */
169 
170 				if (result == DLM_LU_ADD) {
171 					count_add++;
172 				}
173 
174 				last_len = namelen;
175 				memcpy(last_name, b, namelen);
176 				b += namelen;
177 				left -= namelen;
178 				count++;
179 			}
180 		}
181 	 done:
182 		;
183 	}
184 
185  out_status:
186 	error = 0;
187 	dlm_set_recover_status(ls, DLM_RS_DIR);
188 
189 	log_rinfo(ls, "dlm_recover_directory %u in %u new",
190 		  count, count_add);
191  out_free:
192 	kfree(last_name);
193  out:
194 	return error;
195 }
196 
197 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
198 				     int len)
199 {
200 	struct dlm_rsb *r;
201 	int rv;
202 
203 	read_lock_bh(&ls->ls_rsbtbl_lock);
204 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
205 	read_unlock_bh(&ls->ls_rsbtbl_lock);
206 	if (!rv)
207 		return r;
208 
209 	list_for_each_entry(r, &ls->ls_masters_list, res_masters_list) {
210 		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
211 			log_debug(ls, "find_rsb_root revert to root_list %s",
212 				  r->res_name);
213 			return r;
214 		}
215 	}
216 	return NULL;
217 }
218 
219 struct dlm_dir_dump {
220 	/* init values to match if whole
221 	 * dump fits to one seq. Sanity check only.
222 	 */
223 	uint64_t seq_init;
224 	uint64_t nodeid_init;
225 	/* compare local pointer with last lookup,
226 	 * just a sanity check.
227 	 */
228 	struct list_head *last;
229 
230 	unsigned int sent_res; /* for log info */
231 	unsigned int sent_msg; /* for log info */
232 
233 	struct list_head list;
234 };
235 
236 static void drop_dir_ctx(struct dlm_ls *ls, int nodeid)
237 {
238 	struct dlm_dir_dump *dd, *safe;
239 
240 	write_lock_bh(&ls->ls_dir_dump_lock);
241 	list_for_each_entry_safe(dd, safe, &ls->ls_dir_dump_list, list) {
242 		if (dd->nodeid_init == nodeid) {
243 			log_error(ls, "drop dump seq %llu",
244 				 (unsigned long long)dd->seq_init);
245 			list_del(&dd->list);
246 			kfree(dd);
247 		}
248 	}
249 	write_unlock_bh(&ls->ls_dir_dump_lock);
250 }
251 
252 static struct dlm_dir_dump *lookup_dir_dump(struct dlm_ls *ls, int nodeid)
253 {
254 	struct dlm_dir_dump *iter, *dd = NULL;
255 
256 	read_lock_bh(&ls->ls_dir_dump_lock);
257 	list_for_each_entry(iter, &ls->ls_dir_dump_list, list) {
258 		if (iter->nodeid_init == nodeid) {
259 			dd = iter;
260 			break;
261 		}
262 	}
263 	read_unlock_bh(&ls->ls_dir_dump_lock);
264 
265 	return dd;
266 }
267 
268 static struct dlm_dir_dump *init_dir_dump(struct dlm_ls *ls, int nodeid)
269 {
270 	struct dlm_dir_dump *dd;
271 
272 	dd = lookup_dir_dump(ls, nodeid);
273 	if (dd) {
274 		log_error(ls, "found ongoing dir dump for node %d, will drop it",
275 			  nodeid);
276 		drop_dir_ctx(ls, nodeid);
277 	}
278 
279 	dd = kzalloc(sizeof(*dd), GFP_ATOMIC);
280 	if (!dd)
281 		return NULL;
282 
283 	dd->seq_init = ls->ls_recover_seq;
284 	dd->nodeid_init = nodeid;
285 
286 	write_lock_bh(&ls->ls_dir_dump_lock);
287 	list_add(&dd->list, &ls->ls_dir_dump_list);
288 	write_unlock_bh(&ls->ls_dir_dump_lock);
289 
290 	return dd;
291 }
292 
293 /* Find the rsb where we left off (or start again), then send rsb names
294    for rsb's we're master of and whose directory node matches the requesting
295    node.  inbuf is the rsb name last sent, inlen is the name's length */
296 
297 void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
298  			   char *outbuf, int outlen, int nodeid)
299 {
300 	struct list_head *list;
301 	struct dlm_rsb *r;
302 	int offset = 0, dir_nodeid;
303 	struct dlm_dir_dump *dd;
304 	__be16 be_namelen;
305 
306 	read_lock_bh(&ls->ls_masters_lock);
307 
308 	if (inlen > 1) {
309 		dd = lookup_dir_dump(ls, nodeid);
310 		if (!dd) {
311 			log_error(ls, "failed to lookup dir dump context nodeid: %d",
312 				  nodeid);
313 			goto out;
314 		}
315 
316 		/* next chunk in dump */
317 		r = find_rsb_root(ls, inbuf, inlen);
318 		if (!r) {
319 			log_error(ls, "copy_master_names from %d start %d %.*s",
320 				  nodeid, inlen, inlen, inbuf);
321 			goto out;
322 		}
323 		list = r->res_masters_list.next;
324 
325 		/* sanity checks */
326 		if (dd->last != &r->res_masters_list ||
327 		    dd->seq_init != ls->ls_recover_seq) {
328 			log_error(ls, "failed dir dump sanity check seq_init: %llu seq: %llu",
329 				  (unsigned long long)dd->seq_init,
330 				  (unsigned long long)ls->ls_recover_seq);
331 			goto out;
332 		}
333 	} else {
334 		dd = init_dir_dump(ls, nodeid);
335 		if (!dd) {
336 			log_error(ls, "failed to allocate dir dump context");
337 			goto out;
338 		}
339 
340 		/* start dump */
341 		list = ls->ls_masters_list.next;
342 		dd->last = list;
343 	}
344 
345 	for (offset = 0; list != &ls->ls_masters_list; list = list->next) {
346 		r = list_entry(list, struct dlm_rsb, res_masters_list);
347 		dir_nodeid = dlm_dir_nodeid(r);
348 		if (dir_nodeid != nodeid)
349 			continue;
350 
351 		/*
352 		 * The block ends when we can't fit the following in the
353 		 * remaining buffer space:
354 		 * namelen (uint16_t) +
355 		 * name (r->res_length) +
356 		 * end-of-block record 0x0000 (uint16_t)
357 		 */
358 
359 		if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
360 			/* Write end-of-block record */
361 			be_namelen = cpu_to_be16(0);
362 			memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
363 			offset += sizeof(__be16);
364 			dd->sent_msg++;
365 			goto out;
366 		}
367 
368 		be_namelen = cpu_to_be16(r->res_length);
369 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
370 		offset += sizeof(__be16);
371 		memcpy(outbuf + offset, r->res_name, r->res_length);
372 		offset += r->res_length;
373 		dd->sent_res++;
374 		dd->last = list;
375 	}
376 
377 	/*
378 	 * If we've reached the end of the list (and there's room) write a
379 	 * terminating record.
380 	 */
381 
382 	if ((list == &ls->ls_masters_list) &&
383 	    (offset + sizeof(uint16_t) <= outlen)) {
384 		/* end dump */
385 		be_namelen = cpu_to_be16(0xFFFF);
386 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
387 		offset += sizeof(__be16);
388 		dd->sent_msg++;
389 		log_rinfo(ls, "dlm_recover_directory nodeid %d sent %u res out %u messages",
390 			  nodeid, dd->sent_res, dd->sent_msg);
391 
392 		write_lock_bh(&ls->ls_dir_dump_lock);
393 		list_del_init(&dd->list);
394 		write_unlock_bh(&ls->ls_dir_dump_lock);
395 		kfree(dd);
396 	}
397  out:
398 	read_unlock_bh(&ls->ls_masters_lock);
399 }
400 
401