xref: /linux/fs/dlm/dir.c (revision ca64d84e93762f4e587e040a44ad9f6089afc777)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 #include "dlm_internal.h"
13 #include "lockspace.h"
14 #include "member.h"
15 #include "lowcomms.h"
16 #include "rcom.h"
17 #include "config.h"
18 #include "memory.h"
19 #include "recover.h"
20 #include "util.h"
21 #include "lock.h"
22 #include "dir.h"
23 
24 /*
25  * We use the upper 16 bits of the hash value to select the directory node.
26  * Low bits are used for distribution of rsb's among hash buckets on each node.
27  *
28  * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
29  * num_nodes to the hash value.  This value in the desired range is used as an
30  * offset into the sorted list of nodeid's to give the particular nodeid.
31  */
32 
33 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
34 {
35 	uint32_t node;
36 
37 	if (ls->ls_num_nodes == 1)
38 		return dlm_our_nodeid();
39 	else {
40 		node = (hash >> 16) % ls->ls_total_weight;
41 		return ls->ls_node_array[node];
42 	}
43 }
44 
45 int dlm_dir_nodeid(struct dlm_rsb *r)
46 {
47 	return r->res_dir_nodeid;
48 }
49 
50 void dlm_recover_dir_nodeid(struct dlm_ls *ls)
51 {
52 	struct dlm_rsb *r;
53 
54 	down_read(&ls->ls_root_sem);
55 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
56 		r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
57 	}
58 	up_read(&ls->ls_root_sem);
59 }
60 
61 int dlm_recover_directory(struct dlm_ls *ls)
62 {
63 	struct dlm_member *memb;
64 	char *b, *last_name = NULL;
65 	int error = -ENOMEM, last_len, nodeid, result;
66 	uint16_t namelen;
67 	unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
68 
69 	log_rinfo(ls, "dlm_recover_directory");
70 
71 	if (dlm_no_directory(ls))
72 		goto out_status;
73 
74 	last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
75 	if (!last_name)
76 		goto out;
77 
78 	list_for_each_entry(memb, &ls->ls_nodes, list) {
79 		if (memb->nodeid == dlm_our_nodeid())
80 			continue;
81 
82 		memset(last_name, 0, DLM_RESNAME_MAXLEN);
83 		last_len = 0;
84 
85 		for (;;) {
86 			int left;
87 			error = dlm_recovery_stopped(ls);
88 			if (error)
89 				goto out_free;
90 
91 			error = dlm_rcom_names(ls, memb->nodeid,
92 					       last_name, last_len);
93 			if (error)
94 				goto out_free;
95 
96 			cond_resched();
97 
98 			/*
99 			 * pick namelen/name pairs out of received buffer
100 			 */
101 
102 			b = ls->ls_recover_buf->rc_buf;
103 			left = ls->ls_recover_buf->rc_header.h_length;
104 			left -= sizeof(struct dlm_rcom);
105 
106 			for (;;) {
107 				__be16 v;
108 
109 				error = -EINVAL;
110 				if (left < sizeof(__be16))
111 					goto out_free;
112 
113 				memcpy(&v, b, sizeof(__be16));
114 				namelen = be16_to_cpu(v);
115 				b += sizeof(__be16);
116 				left -= sizeof(__be16);
117 
118 				/* namelen of 0xFFFFF marks end of names for
119 				   this node; namelen of 0 marks end of the
120 				   buffer */
121 
122 				if (namelen == 0xFFFF)
123 					goto done;
124 				if (!namelen)
125 					break;
126 
127 				if (namelen > left)
128 					goto out_free;
129 
130 				if (namelen > DLM_RESNAME_MAXLEN)
131 					goto out_free;
132 
133 				error = dlm_master_lookup(ls, memb->nodeid,
134 							  b, namelen,
135 							  DLM_LU_RECOVER_DIR,
136 							  &nodeid, &result);
137 				if (error) {
138 					log_error(ls, "recover_dir lookup %d",
139 						  error);
140 					goto out_free;
141 				}
142 
143 				/* The name was found in rsbtbl, but the
144 				 * master nodeid is different from
145 				 * memb->nodeid which says it is the master.
146 				 * This should not happen. */
147 
148 				if (result == DLM_LU_MATCH &&
149 				    nodeid != memb->nodeid) {
150 					count_bad++;
151 					log_error(ls, "recover_dir lookup %d "
152 						  "nodeid %d memb %d bad %u",
153 						  result, nodeid, memb->nodeid,
154 						  count_bad);
155 					print_hex_dump_bytes("dlm_recover_dir ",
156 							     DUMP_PREFIX_NONE,
157 							     b, namelen);
158 				}
159 
160 				/* The name was found in rsbtbl, and the
161 				 * master nodeid matches memb->nodeid. */
162 
163 				if (result == DLM_LU_MATCH &&
164 				    nodeid == memb->nodeid) {
165 					count_match++;
166 				}
167 
168 				/* The name was not found in rsbtbl and was
169 				 * added with memb->nodeid as the master. */
170 
171 				if (result == DLM_LU_ADD) {
172 					count_add++;
173 				}
174 
175 				last_len = namelen;
176 				memcpy(last_name, b, namelen);
177 				b += namelen;
178 				left -= namelen;
179 				count++;
180 			}
181 		}
182 	 done:
183 		;
184 	}
185 
186  out_status:
187 	error = 0;
188 	dlm_set_recover_status(ls, DLM_RS_DIR);
189 
190 	log_rinfo(ls, "dlm_recover_directory %u in %u new",
191 		  count, count_add);
192  out_free:
193 	kfree(last_name);
194  out:
195 	return error;
196 }
197 
198 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
199 {
200 	struct dlm_rsb *r;
201 	uint32_t hash, bucket;
202 	int rv;
203 
204 	hash = jhash(name, len, 0);
205 	bucket = hash & (ls->ls_rsbtbl_size - 1);
206 
207 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
208 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
209 	if (rv)
210 		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
211 					 name, len, &r);
212 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
213 
214 	if (!rv)
215 		return r;
216 
217 	down_read(&ls->ls_root_sem);
218 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
219 		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
220 			up_read(&ls->ls_root_sem);
221 			log_debug(ls, "find_rsb_root revert to root_list %s",
222 				  r->res_name);
223 			return r;
224 		}
225 	}
226 	up_read(&ls->ls_root_sem);
227 	return NULL;
228 }
229 
230 /* Find the rsb where we left off (or start again), then send rsb names
231    for rsb's we're master of and whose directory node matches the requesting
232    node.  inbuf is the rsb name last sent, inlen is the name's length */
233 
234 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
235  			   char *outbuf, int outlen, int nodeid)
236 {
237 	struct list_head *list;
238 	struct dlm_rsb *r;
239 	int offset = 0, dir_nodeid;
240 	__be16 be_namelen;
241 
242 	down_read(&ls->ls_root_sem);
243 
244 	if (inlen > 1) {
245 		r = find_rsb_root(ls, inbuf, inlen);
246 		if (!r) {
247 			inbuf[inlen - 1] = '\0';
248 			log_error(ls, "copy_master_names from %d start %d %s",
249 				  nodeid, inlen, inbuf);
250 			goto out;
251 		}
252 		list = r->res_root_list.next;
253 	} else {
254 		list = ls->ls_root_list.next;
255 	}
256 
257 	for (offset = 0; list != &ls->ls_root_list; list = list->next) {
258 		r = list_entry(list, struct dlm_rsb, res_root_list);
259 		if (r->res_nodeid)
260 			continue;
261 
262 		dir_nodeid = dlm_dir_nodeid(r);
263 		if (dir_nodeid != nodeid)
264 			continue;
265 
266 		/*
267 		 * The block ends when we can't fit the following in the
268 		 * remaining buffer space:
269 		 * namelen (uint16_t) +
270 		 * name (r->res_length) +
271 		 * end-of-block record 0x0000 (uint16_t)
272 		 */
273 
274 		if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
275 			/* Write end-of-block record */
276 			be_namelen = cpu_to_be16(0);
277 			memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
278 			offset += sizeof(__be16);
279 			ls->ls_recover_dir_sent_msg++;
280 			goto out;
281 		}
282 
283 		be_namelen = cpu_to_be16(r->res_length);
284 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
285 		offset += sizeof(__be16);
286 		memcpy(outbuf + offset, r->res_name, r->res_length);
287 		offset += r->res_length;
288 		ls->ls_recover_dir_sent_res++;
289 	}
290 
291 	/*
292 	 * If we've reached the end of the list (and there's room) write a
293 	 * terminating record.
294 	 */
295 
296 	if ((list == &ls->ls_root_list) &&
297 	    (offset + sizeof(uint16_t) <= outlen)) {
298 		be_namelen = cpu_to_be16(0xFFFF);
299 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
300 		offset += sizeof(__be16);
301 		ls->ls_recover_dir_sent_msg++;
302 	}
303  out:
304 	up_read(&ls->ls_root_sem);
305 }
306 
307