xref: /linux/fs/dlm/dir.c (revision 96de0e252cedffad61b3cb5e05662c591898e69a)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "lowcomms.h"
18 #include "rcom.h"
19 #include "config.h"
20 #include "memory.h"
21 #include "recover.h"
22 #include "util.h"
23 #include "lock.h"
24 #include "dir.h"
25 
26 
27 static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
28 {
29 	spin_lock(&ls->ls_recover_list_lock);
30 	list_add(&de->list, &ls->ls_recover_list);
31 	spin_unlock(&ls->ls_recover_list_lock);
32 }
33 
34 static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
35 {
36 	int found = 0;
37 	struct dlm_direntry *de;
38 
39 	spin_lock(&ls->ls_recover_list_lock);
40 	list_for_each_entry(de, &ls->ls_recover_list, list) {
41 		if (de->length == len) {
42 			list_del(&de->list);
43 			de->master_nodeid = 0;
44 			memset(de->name, 0, len);
45 			found = 1;
46 			break;
47 		}
48 	}
49 	spin_unlock(&ls->ls_recover_list_lock);
50 
51 	if (!found)
52 		de = allocate_direntry(ls, len);
53 	return de;
54 }
55 
56 void dlm_clear_free_entries(struct dlm_ls *ls)
57 {
58 	struct dlm_direntry *de;
59 
60 	spin_lock(&ls->ls_recover_list_lock);
61 	while (!list_empty(&ls->ls_recover_list)) {
62 		de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
63 				list);
64 		list_del(&de->list);
65 		free_direntry(de);
66 	}
67 	spin_unlock(&ls->ls_recover_list_lock);
68 }
69 
70 /*
71  * We use the upper 16 bits of the hash value to select the directory node.
72  * Low bits are used for distribution of rsb's among hash buckets on each node.
73  *
74  * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
75  * num_nodes to the hash value.  This value in the desired range is used as an
76  * offset into the sorted list of nodeid's to give the particular nodeid.
77  */
78 
79 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
80 {
81 	struct list_head *tmp;
82 	struct dlm_member *memb = NULL;
83 	uint32_t node, n = 0;
84 	int nodeid;
85 
86 	if (ls->ls_num_nodes == 1) {
87 		nodeid = dlm_our_nodeid();
88 		goto out;
89 	}
90 
91 	if (ls->ls_node_array) {
92 		node = (hash >> 16) % ls->ls_total_weight;
93 		nodeid = ls->ls_node_array[node];
94 		goto out;
95 	}
96 
97 	/* make_member_array() failed to kmalloc ls_node_array... */
98 
99 	node = (hash >> 16) % ls->ls_num_nodes;
100 
101 	list_for_each(tmp, &ls->ls_nodes) {
102 		if (n++ != node)
103 			continue;
104 		memb = list_entry(tmp, struct dlm_member, list);
105 		break;
106 	}
107 
108 	DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
109 				 ls->ls_num_nodes, n, node););
110 	nodeid = memb->nodeid;
111  out:
112 	return nodeid;
113 }
114 
115 int dlm_dir_nodeid(struct dlm_rsb *r)
116 {
117 	return dlm_hash2nodeid(r->res_ls, r->res_hash);
118 }
119 
120 static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
121 {
122 	uint32_t val;
123 
124 	val = jhash(name, len, 0);
125 	val &= (ls->ls_dirtbl_size - 1);
126 
127 	return val;
128 }
129 
130 static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
131 {
132 	uint32_t bucket;
133 
134 	bucket = dir_hash(ls, de->name, de->length);
135 	list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
136 }
137 
138 static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
139 					  int namelen, uint32_t bucket)
140 {
141 	struct dlm_direntry *de;
142 
143 	list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
144 		if (de->length == namelen && !memcmp(name, de->name, namelen))
145 			goto out;
146 	}
147 	de = NULL;
148  out:
149 	return de;
150 }
151 
152 void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
153 {
154 	struct dlm_direntry *de;
155 	uint32_t bucket;
156 
157 	bucket = dir_hash(ls, name, namelen);
158 
159 	write_lock(&ls->ls_dirtbl[bucket].lock);
160 
161 	de = search_bucket(ls, name, namelen, bucket);
162 
163 	if (!de) {
164 		log_error(ls, "remove fr %u none", nodeid);
165 		goto out;
166 	}
167 
168 	if (de->master_nodeid != nodeid) {
169 		log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
170 		goto out;
171 	}
172 
173 	list_del(&de->list);
174 	free_direntry(de);
175  out:
176 	write_unlock(&ls->ls_dirtbl[bucket].lock);
177 }
178 
179 void dlm_dir_clear(struct dlm_ls *ls)
180 {
181 	struct list_head *head;
182 	struct dlm_direntry *de;
183 	int i;
184 
185 	DLM_ASSERT(list_empty(&ls->ls_recover_list), );
186 
187 	for (i = 0; i < ls->ls_dirtbl_size; i++) {
188 		write_lock(&ls->ls_dirtbl[i].lock);
189 		head = &ls->ls_dirtbl[i].list;
190 		while (!list_empty(head)) {
191 			de = list_entry(head->next, struct dlm_direntry, list);
192 			list_del(&de->list);
193 			put_free_de(ls, de);
194 		}
195 		write_unlock(&ls->ls_dirtbl[i].lock);
196 	}
197 }
198 
199 int dlm_recover_directory(struct dlm_ls *ls)
200 {
201 	struct dlm_member *memb;
202 	struct dlm_direntry *de;
203 	char *b, *last_name = NULL;
204 	int error = -ENOMEM, last_len, count = 0;
205 	uint16_t namelen;
206 
207 	log_debug(ls, "dlm_recover_directory");
208 
209 	if (dlm_no_directory(ls))
210 		goto out_status;
211 
212 	dlm_dir_clear(ls);
213 
214 	last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_KERNEL);
215 	if (!last_name)
216 		goto out;
217 
218 	list_for_each_entry(memb, &ls->ls_nodes, list) {
219 		memset(last_name, 0, DLM_RESNAME_MAXLEN);
220 		last_len = 0;
221 
222 		for (;;) {
223 			error = dlm_recovery_stopped(ls);
224 			if (error)
225 				goto out_free;
226 
227 			error = dlm_rcom_names(ls, memb->nodeid,
228 					       last_name, last_len);
229 			if (error)
230 				goto out_free;
231 
232 			schedule();
233 
234 			/*
235 			 * pick namelen/name pairs out of received buffer
236 			 */
237 
238 			b = ls->ls_recover_buf + sizeof(struct dlm_rcom);
239 
240 			for (;;) {
241 				memcpy(&namelen, b, sizeof(uint16_t));
242 				namelen = be16_to_cpu(namelen);
243 				b += sizeof(uint16_t);
244 
245 				/* namelen of 0xFFFFF marks end of names for
246 				   this node; namelen of 0 marks end of the
247 				   buffer */
248 
249 				if (namelen == 0xFFFF)
250 					goto done;
251 				if (!namelen)
252 					break;
253 
254 				error = -ENOMEM;
255 				de = get_free_de(ls, namelen);
256 				if (!de)
257 					goto out_free;
258 
259 				de->master_nodeid = memb->nodeid;
260 				de->length = namelen;
261 				last_len = namelen;
262 				memcpy(de->name, b, namelen);
263 				memcpy(last_name, b, namelen);
264 				b += namelen;
265 
266 				add_entry_to_hash(ls, de);
267 				count++;
268 			}
269 		}
270          done:
271 		;
272 	}
273 
274  out_status:
275 	error = 0;
276 	dlm_set_recover_status(ls, DLM_RS_DIR);
277 	log_debug(ls, "dlm_recover_directory %d entries", count);
278  out_free:
279 	kfree(last_name);
280  out:
281 	dlm_clear_free_entries(ls);
282 	return error;
283 }
284 
285 static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
286 		     int namelen, int *r_nodeid)
287 {
288 	struct dlm_direntry *de, *tmp;
289 	uint32_t bucket;
290 
291 	bucket = dir_hash(ls, name, namelen);
292 
293 	write_lock(&ls->ls_dirtbl[bucket].lock);
294 	de = search_bucket(ls, name, namelen, bucket);
295 	if (de) {
296 		*r_nodeid = de->master_nodeid;
297 		write_unlock(&ls->ls_dirtbl[bucket].lock);
298 		if (*r_nodeid == nodeid)
299 			return -EEXIST;
300 		return 0;
301 	}
302 
303 	write_unlock(&ls->ls_dirtbl[bucket].lock);
304 
305 	de = allocate_direntry(ls, namelen);
306 	if (!de)
307 		return -ENOMEM;
308 
309 	de->master_nodeid = nodeid;
310 	de->length = namelen;
311 	memcpy(de->name, name, namelen);
312 
313 	write_lock(&ls->ls_dirtbl[bucket].lock);
314 	tmp = search_bucket(ls, name, namelen, bucket);
315 	if (tmp) {
316 		free_direntry(de);
317 		de = tmp;
318 	} else {
319 		list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
320 	}
321 	*r_nodeid = de->master_nodeid;
322 	write_unlock(&ls->ls_dirtbl[bucket].lock);
323 	return 0;
324 }
325 
326 int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
327 		   int *r_nodeid)
328 {
329 	return get_entry(ls, nodeid, name, namelen, r_nodeid);
330 }
331 
332 /* Copy the names of master rsb's into the buffer provided.
333    Only select names whose dir node is the given nodeid. */
334 
335 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
336  			   char *outbuf, int outlen, int nodeid)
337 {
338 	struct list_head *list;
339 	struct dlm_rsb *start_r = NULL, *r = NULL;
340 	int offset = 0, start_namelen, error, dir_nodeid;
341 	char *start_name;
342 	uint16_t be_namelen;
343 
344 	/*
345 	 * Find the rsb where we left off (or start again)
346 	 */
347 
348 	start_namelen = inlen;
349 	start_name = inbuf;
350 
351 	if (start_namelen > 1) {
352 		/*
353 		 * We could also use a find_rsb_root() function here that
354 		 * searched the ls_root_list.
355 		 */
356 		error = dlm_find_rsb(ls, start_name, start_namelen, R_MASTER,
357 				     &start_r);
358 		DLM_ASSERT(!error && start_r,
359 			   printk("error %d\n", error););
360 		DLM_ASSERT(!list_empty(&start_r->res_root_list),
361 			   dlm_print_rsb(start_r););
362 		dlm_put_rsb(start_r);
363 	}
364 
365 	/*
366 	 * Send rsb names for rsb's we're master of and whose directory node
367 	 * matches the requesting node.
368 	 */
369 
370 	down_read(&ls->ls_root_sem);
371 	if (start_r)
372 		list = start_r->res_root_list.next;
373 	else
374 		list = ls->ls_root_list.next;
375 
376 	for (offset = 0; list != &ls->ls_root_list; list = list->next) {
377 		r = list_entry(list, struct dlm_rsb, res_root_list);
378 		if (r->res_nodeid)
379 			continue;
380 
381 		dir_nodeid = dlm_dir_nodeid(r);
382 		if (dir_nodeid != nodeid)
383 			continue;
384 
385 		/*
386 		 * The block ends when we can't fit the following in the
387 		 * remaining buffer space:
388 		 * namelen (uint16_t) +
389 		 * name (r->res_length) +
390 		 * end-of-block record 0x0000 (uint16_t)
391 		 */
392 
393 		if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
394 			/* Write end-of-block record */
395 			be_namelen = 0;
396 			memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
397 			offset += sizeof(uint16_t);
398 			goto out;
399 		}
400 
401 		be_namelen = cpu_to_be16(r->res_length);
402 		memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
403 		offset += sizeof(uint16_t);
404 		memcpy(outbuf + offset, r->res_name, r->res_length);
405 		offset += r->res_length;
406 	}
407 
408 	/*
409 	 * If we've reached the end of the list (and there's room) write a
410 	 * terminating record.
411 	 */
412 
413 	if ((list == &ls->ls_root_list) &&
414 	    (offset + sizeof(uint16_t) <= outlen)) {
415 		be_namelen = 0xFFFF;
416 		memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
417 		offset += sizeof(uint16_t);
418 	}
419 
420  out:
421 	up_read(&ls->ls_root_sem);
422 }
423 
424