xref: /linux/fs/dlm/dir.c (revision 643d1f7fe3aa12c8bdea6fa5b4ba874ff6dd601d)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "lowcomms.h"
18 #include "rcom.h"
19 #include "config.h"
20 #include "memory.h"
21 #include "recover.h"
22 #include "util.h"
23 #include "lock.h"
24 #include "dir.h"
25 
26 
27 static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
28 {
29 	spin_lock(&ls->ls_recover_list_lock);
30 	list_add(&de->list, &ls->ls_recover_list);
31 	spin_unlock(&ls->ls_recover_list_lock);
32 }
33 
34 static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
35 {
36 	int found = 0;
37 	struct dlm_direntry *de;
38 
39 	spin_lock(&ls->ls_recover_list_lock);
40 	list_for_each_entry(de, &ls->ls_recover_list, list) {
41 		if (de->length == len) {
42 			list_del(&de->list);
43 			de->master_nodeid = 0;
44 			memset(de->name, 0, len);
45 			found = 1;
46 			break;
47 		}
48 	}
49 	spin_unlock(&ls->ls_recover_list_lock);
50 
51 	if (!found)
52 		de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_KERNEL);
53 	return de;
54 }
55 
56 void dlm_clear_free_entries(struct dlm_ls *ls)
57 {
58 	struct dlm_direntry *de;
59 
60 	spin_lock(&ls->ls_recover_list_lock);
61 	while (!list_empty(&ls->ls_recover_list)) {
62 		de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
63 				list);
64 		list_del(&de->list);
65 		kfree(de);
66 	}
67 	spin_unlock(&ls->ls_recover_list_lock);
68 }
69 
/*
 * We use the upper 16 bits of the hash value to select the directory node.
 * Low bits are used for distribution of rsb's among hash buckets on each node.
 *
 * To give the exact range wanted, we apply a modulus to the hash value.  When
 * ls_node_array is available the modulus is ls_total_weight and the result
 * indexes that array directly.  Otherwise the modulus is num_nodes (giving
 * 0 to num_nodes-1) and the value is used as an offset into the sorted list
 * of nodeid's to give the particular nodeid.
 */
78 
79 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
80 {
81 	struct list_head *tmp;
82 	struct dlm_member *memb = NULL;
83 	uint32_t node, n = 0;
84 	int nodeid;
85 
86 	if (ls->ls_num_nodes == 1) {
87 		nodeid = dlm_our_nodeid();
88 		goto out;
89 	}
90 
91 	if (ls->ls_node_array) {
92 		node = (hash >> 16) % ls->ls_total_weight;
93 		nodeid = ls->ls_node_array[node];
94 		goto out;
95 	}
96 
97 	/* make_member_array() failed to kmalloc ls_node_array... */
98 
99 	node = (hash >> 16) % ls->ls_num_nodes;
100 
101 	list_for_each(tmp, &ls->ls_nodes) {
102 		if (n++ != node)
103 			continue;
104 		memb = list_entry(tmp, struct dlm_member, list);
105 		break;
106 	}
107 
108 	DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
109 				 ls->ls_num_nodes, n, node););
110 	nodeid = memb->nodeid;
111  out:
112 	return nodeid;
113 }
114 
115 int dlm_dir_nodeid(struct dlm_rsb *r)
116 {
117 	return dlm_hash2nodeid(r->res_ls, r->res_hash);
118 }
119 
120 static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
121 {
122 	uint32_t val;
123 
124 	val = jhash(name, len, 0);
125 	val &= (ls->ls_dirtbl_size - 1);
126 
127 	return val;
128 }
129 
130 static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
131 {
132 	uint32_t bucket;
133 
134 	bucket = dir_hash(ls, de->name, de->length);
135 	list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
136 }
137 
138 static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
139 					  int namelen, uint32_t bucket)
140 {
141 	struct dlm_direntry *de;
142 
143 	list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
144 		if (de->length == namelen && !memcmp(name, de->name, namelen))
145 			goto out;
146 	}
147 	de = NULL;
148  out:
149 	return de;
150 }
151 
152 void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
153 {
154 	struct dlm_direntry *de;
155 	uint32_t bucket;
156 
157 	bucket = dir_hash(ls, name, namelen);
158 
159 	write_lock(&ls->ls_dirtbl[bucket].lock);
160 
161 	de = search_bucket(ls, name, namelen, bucket);
162 
163 	if (!de) {
164 		log_error(ls, "remove fr %u none", nodeid);
165 		goto out;
166 	}
167 
168 	if (de->master_nodeid != nodeid) {
169 		log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
170 		goto out;
171 	}
172 
173 	list_del(&de->list);
174 	kfree(de);
175  out:
176 	write_unlock(&ls->ls_dirtbl[bucket].lock);
177 }
178 
179 void dlm_dir_clear(struct dlm_ls *ls)
180 {
181 	struct list_head *head;
182 	struct dlm_direntry *de;
183 	int i;
184 
185 	DLM_ASSERT(list_empty(&ls->ls_recover_list), );
186 
187 	for (i = 0; i < ls->ls_dirtbl_size; i++) {
188 		write_lock(&ls->ls_dirtbl[i].lock);
189 		head = &ls->ls_dirtbl[i].list;
190 		while (!list_empty(head)) {
191 			de = list_entry(head->next, struct dlm_direntry, list);
192 			list_del(&de->list);
193 			put_free_de(ls, de);
194 		}
195 		write_unlock(&ls->ls_dirtbl[i].lock);
196 	}
197 }
198 
199 int dlm_recover_directory(struct dlm_ls *ls)
200 {
201 	struct dlm_member *memb;
202 	struct dlm_direntry *de;
203 	char *b, *last_name = NULL;
204 	int error = -ENOMEM, last_len, count = 0;
205 	uint16_t namelen;
206 
207 	log_debug(ls, "dlm_recover_directory");
208 
209 	if (dlm_no_directory(ls))
210 		goto out_status;
211 
212 	dlm_dir_clear(ls);
213 
214 	last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_KERNEL);
215 	if (!last_name)
216 		goto out;
217 
218 	list_for_each_entry(memb, &ls->ls_nodes, list) {
219 		memset(last_name, 0, DLM_RESNAME_MAXLEN);
220 		last_len = 0;
221 
222 		for (;;) {
223 			error = dlm_recovery_stopped(ls);
224 			if (error)
225 				goto out_free;
226 
227 			error = dlm_rcom_names(ls, memb->nodeid,
228 					       last_name, last_len);
229 			if (error)
230 				goto out_free;
231 
232 			schedule();
233 
234 			/*
235 			 * pick namelen/name pairs out of received buffer
236 			 */
237 
238 			b = ls->ls_recover_buf + sizeof(struct dlm_rcom);
239 
240 			for (;;) {
241 				memcpy(&namelen, b, sizeof(uint16_t));
242 				namelen = be16_to_cpu(namelen);
243 				b += sizeof(uint16_t);
244 
245 				/* namelen of 0xFFFFF marks end of names for
246 				   this node; namelen of 0 marks end of the
247 				   buffer */
248 
249 				if (namelen == 0xFFFF)
250 					goto done;
251 				if (!namelen)
252 					break;
253 
254 				error = -ENOMEM;
255 				de = get_free_de(ls, namelen);
256 				if (!de)
257 					goto out_free;
258 
259 				de->master_nodeid = memb->nodeid;
260 				de->length = namelen;
261 				last_len = namelen;
262 				memcpy(de->name, b, namelen);
263 				memcpy(last_name, b, namelen);
264 				b += namelen;
265 
266 				add_entry_to_hash(ls, de);
267 				count++;
268 			}
269 		}
270          done:
271 		;
272 	}
273 
274  out_status:
275 	error = 0;
276 	dlm_set_recover_status(ls, DLM_RS_DIR);
277 	log_debug(ls, "dlm_recover_directory %d entries", count);
278  out_free:
279 	kfree(last_name);
280  out:
281 	dlm_clear_free_entries(ls);
282 	return error;
283 }
284 
285 static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
286 		     int namelen, int *r_nodeid)
287 {
288 	struct dlm_direntry *de, *tmp;
289 	uint32_t bucket;
290 
291 	bucket = dir_hash(ls, name, namelen);
292 
293 	write_lock(&ls->ls_dirtbl[bucket].lock);
294 	de = search_bucket(ls, name, namelen, bucket);
295 	if (de) {
296 		*r_nodeid = de->master_nodeid;
297 		write_unlock(&ls->ls_dirtbl[bucket].lock);
298 		if (*r_nodeid == nodeid)
299 			return -EEXIST;
300 		return 0;
301 	}
302 
303 	write_unlock(&ls->ls_dirtbl[bucket].lock);
304 
305 	de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_KERNEL);
306 	if (!de)
307 		return -ENOMEM;
308 
309 	de->master_nodeid = nodeid;
310 	de->length = namelen;
311 	memcpy(de->name, name, namelen);
312 
313 	write_lock(&ls->ls_dirtbl[bucket].lock);
314 	tmp = search_bucket(ls, name, namelen, bucket);
315 	if (tmp) {
316 		kfree(de);
317 		de = tmp;
318 	} else {
319 		list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
320 	}
321 	*r_nodeid = de->master_nodeid;
322 	write_unlock(&ls->ls_dirtbl[bucket].lock);
323 	return 0;
324 }
325 
/*
 * Public entry point for a directory lookup; forwards all arguments to
 * get_entry() and returns its result unchanged.
 */
int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
		   int *r_nodeid)
{
	int rv = get_entry(ls, nodeid, name, namelen, r_nodeid);

	return rv;
}
331 
332 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
333 {
334 	struct dlm_rsb *r;
335 
336 	down_read(&ls->ls_root_sem);
337 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
338 		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
339 			up_read(&ls->ls_root_sem);
340 			return r;
341 		}
342 	}
343 	up_read(&ls->ls_root_sem);
344 	return NULL;
345 }
346 
347 /* Find the rsb where we left off (or start again), then send rsb names
348    for rsb's we're master of and whose directory node matches the requesting
349    node.  inbuf is the rsb name last sent, inlen is the name's length */
350 
351 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
352  			   char *outbuf, int outlen, int nodeid)
353 {
354 	struct list_head *list;
355 	struct dlm_rsb *r;
356 	int offset = 0, dir_nodeid;
357 	uint16_t be_namelen;
358 
359 	down_read(&ls->ls_root_sem);
360 
361 	if (inlen > 1) {
362 		r = find_rsb_root(ls, inbuf, inlen);
363 		if (!r) {
364 			inbuf[inlen - 1] = '\0';
365 			log_error(ls, "copy_master_names from %d start %d %s",
366 				  nodeid, inlen, inbuf);
367 			goto out;
368 		}
369 		list = r->res_root_list.next;
370 	} else {
371 		list = ls->ls_root_list.next;
372 	}
373 
374 	for (offset = 0; list != &ls->ls_root_list; list = list->next) {
375 		r = list_entry(list, struct dlm_rsb, res_root_list);
376 		if (r->res_nodeid)
377 			continue;
378 
379 		dir_nodeid = dlm_dir_nodeid(r);
380 		if (dir_nodeid != nodeid)
381 			continue;
382 
383 		/*
384 		 * The block ends when we can't fit the following in the
385 		 * remaining buffer space:
386 		 * namelen (uint16_t) +
387 		 * name (r->res_length) +
388 		 * end-of-block record 0x0000 (uint16_t)
389 		 */
390 
391 		if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
392 			/* Write end-of-block record */
393 			be_namelen = 0;
394 			memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
395 			offset += sizeof(uint16_t);
396 			goto out;
397 		}
398 
399 		be_namelen = cpu_to_be16(r->res_length);
400 		memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
401 		offset += sizeof(uint16_t);
402 		memcpy(outbuf + offset, r->res_name, r->res_length);
403 		offset += r->res_length;
404 	}
405 
406 	/*
407 	 * If we've reached the end of the list (and there's room) write a
408 	 * terminating record.
409 	 */
410 
411 	if ((list == &ls->ls_root_list) &&
412 	    (offset + sizeof(uint16_t) <= outlen)) {
413 		be_namelen = 0xFFFF;
414 		memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
415 		offset += sizeof(uint16_t);
416 	}
417 
418  out:
419 	up_read(&ls->ls_root_sem);
420 }
421 
422