xref: /illumos-gate/usr/src/uts/common/fs/nfs/nfs4_db.c (revision 986b458dd38036ac346e3cedf55812c5fad90cde)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #include <sys/systm.h>
27 #include <sys/cmn_err.h>
28 #include <sys/kmem.h>
29 #include <sys/disp.h>
30 #include <sys/id_space.h>
31 #include <sys/atomic.h>
32 #include <rpc/rpc.h>
33 #include <nfs/nfs4.h>
34 #include <nfs/nfs4_db_impl.h>
35 #include <sys/sdt.h>
36 
37 static int rfs4_reap_interval = RFS4_REAP_INTERVAL;
38 
39 static void rfs4_dbe_reap(rfs4_table_t *, time_t, uint32_t);
40 static void rfs4_dbe_destroy(rfs4_dbe_t *);
41 static rfs4_dbe_t *rfs4_dbe_create(rfs4_table_t *, id_t, rfs4_entry_t);
42 static void rfs4_start_reaper(rfs4_table_t *);
43 
44 /*
45  * t_lowat - integer percentage of table entries	/etc/system only
46  * t_hiwat - integer percentage of table entries	/etc/system only
47  * t_lreap - integer percentage of table reap time	mdb or /etc/system
48  * t_hreap - integer percentage of table reap time	mdb or /etc/system
49  */
50 uint32_t	t_lowat = 50;	/* reap at t_lreap when id's in use hit 50% */
51 uint32_t	t_hiwat = 75;	/* reap at t_hreap when id's in use hit 75% */
52 time_t		t_lreap = 50;	/* default to 50% of table's reap interval */
53 time_t		t_hreap = 10;	/* default to 10% of table's reap interval */
54 
55 id_t
56 rfs4_dbe_getid(rfs4_dbe_t *entry)
57 {
58 	return (entry->dbe_id);
59 }
60 
61 void
62 rfs4_dbe_hold(rfs4_dbe_t *entry)
63 {
64 	atomic_add_32(&entry->dbe_refcnt, 1);
65 }
66 
67 /*
68  * rfs4_dbe_rele_nolock only decrements the reference count of the entry.
69  */
70 void
71 rfs4_dbe_rele_nolock(rfs4_dbe_t *entry)
72 {
73 	atomic_add_32(&entry->dbe_refcnt, -1);
74 }
75 
76 
77 uint32_t
78 rfs4_dbe_refcnt(rfs4_dbe_t *entry)
79 {
80 	return (entry->dbe_refcnt);
81 }
82 
83 /*
84  * Mark an entry such that the dbsearch will skip it.
85  * Caller does not want this entry to be found any longer
86  */
87 void
88 rfs4_dbe_invalidate(rfs4_dbe_t *entry)
89 {
90 	entry->dbe_invalid = TRUE;
91 	entry->dbe_skipsearch = TRUE;
92 }
93 
94 /*
95  * Is this entry invalid?
96  */
97 bool_t
98 rfs4_dbe_is_invalid(rfs4_dbe_t *entry)
99 {
100 	return (entry->dbe_invalid);
101 }
102 
103 time_t
104 rfs4_dbe_get_timerele(rfs4_dbe_t *entry)
105 {
106 	return (entry->dbe_time_rele);
107 }
108 
109 /*
110  * Use these to temporarily hide/unhide a db entry.
111  */
112 void
113 rfs4_dbe_hide(rfs4_dbe_t *entry)
114 {
115 	rfs4_dbe_lock(entry);
116 	entry->dbe_skipsearch = TRUE;
117 	rfs4_dbe_unlock(entry);
118 }
119 
120 void
121 rfs4_dbe_unhide(rfs4_dbe_t *entry)
122 {
123 	rfs4_dbe_lock(entry);
124 	entry->dbe_skipsearch = FALSE;
125 	rfs4_dbe_unlock(entry);
126 }
127 
128 void
129 rfs4_dbe_rele(rfs4_dbe_t *entry)
130 {
131 	mutex_enter(entry->dbe_lock);
132 	ASSERT(entry->dbe_refcnt > 1);
133 	atomic_add_32(&entry->dbe_refcnt, -1);
134 	entry->dbe_time_rele = gethrestime_sec();
135 	mutex_exit(entry->dbe_lock);
136 }
137 
138 void
139 rfs4_dbe_lock(rfs4_dbe_t *entry)
140 {
141 	mutex_enter(entry->dbe_lock);
142 }
143 
144 void
145 rfs4_dbe_unlock(rfs4_dbe_t *entry)
146 {
147 	mutex_exit(entry->dbe_lock);
148 }
149 
150 bool_t
151 rfs4_dbe_islocked(rfs4_dbe_t *entry)
152 {
153 	return (mutex_owned(entry->dbe_lock));
154 }
155 
156 clock_t
157 rfs4_dbe_twait(rfs4_dbe_t *entry, clock_t timeout)
158 {
159 	return (cv_timedwait(entry->dbe_cv, entry->dbe_lock, timeout));
160 }
161 
162 void
163 rfs4_dbe_cv_broadcast(rfs4_dbe_t *entry)
164 {
165 	cv_broadcast(entry->dbe_cv);
166 }
167 
168 /* ARGSUSED */
169 static int
170 rfs4_dbe_kmem_constructor(void *obj, void *private, int kmflag)
171 {
172 	rfs4_dbe_t *entry = obj;
173 
174 	mutex_init(entry->dbe_lock, NULL, MUTEX_DEFAULT, NULL);
175 	cv_init(entry->dbe_cv, NULL, CV_DEFAULT, NULL);
176 
177 	return (0);
178 }
179 
180 static void
181 rfs4_dbe_kmem_destructor(void *obj, void *private)
182 {
183 	rfs4_dbe_t *entry = obj;
184 	/*LINTED*/
185 	rfs4_table_t *table = private;
186 
187 	mutex_destroy(entry->dbe_lock);
188 	cv_destroy(entry->dbe_cv);
189 }
190 
191 rfs4_database_t *
192 rfs4_database_create(uint32_t flags)
193 {
194 	rfs4_database_t *db;
195 
196 	db = kmem_alloc(sizeof (rfs4_database_t), KM_SLEEP);
197 	mutex_init(db->db_lock, NULL, MUTEX_DEFAULT, NULL);
198 	db->db_tables = NULL;
199 	db->db_debug_flags = flags;
200 	db->db_shutdown_count = 0;
201 	cv_init(&db->db_shutdown_wait, NULL, CV_DEFAULT, NULL);
202 	return (db);
203 }
204 
205 
206 /*
207  * The reaper threads that have been created for the tables in this
208  * database must be stopped and the entries in the tables released.
209  * Each table will be marked as "shutdown" and the reaper threads
210  * poked and they will see that a shutdown is in progress and cleanup
211  * and exit.  This function waits for all reaper threads to stop
212  * before returning to the caller.
213  */
214 void
215 rfs4_database_shutdown(rfs4_database_t *db)
216 {
217 	rfs4_table_t *table;
218 
219 	mutex_enter(db->db_lock);
220 	for (table = db->db_tables; table; table = table->dbt_tnext) {
221 		table->dbt_reaper_shutdown = TRUE;
222 		mutex_enter(&table->dbt_reaper_cv_lock);
223 		cv_broadcast(&table->dbt_reaper_wait);
224 		db->db_shutdown_count++;
225 		mutex_exit(&table->dbt_reaper_cv_lock);
226 	}
227 	while (db->db_shutdown_count > 0) {
228 		cv_wait(&db->db_shutdown_wait, db->db_lock);
229 	}
230 	mutex_exit(db->db_lock);
231 }
232 
233 /*
234  * Given a database that has been "shutdown" by the function above all
235  * of the table tables are destroyed and then the database itself
236  * freed.
237  */
238 void
239 rfs4_database_destroy(rfs4_database_t *db)
240 {
241 	rfs4_table_t *next, *tmp;
242 
243 	for (next = db->db_tables; next; ) {
244 		tmp = next;
245 		next = tmp->dbt_tnext;
246 		rfs4_table_destroy(db, tmp);
247 	}
248 
249 	mutex_destroy(db->db_lock);
250 	kmem_free(db, sizeof (rfs4_database_t));
251 }
252 
253 rfs4_table_t *
254 rfs4_table_create(rfs4_database_t *db, char *tabname, time_t max_cache_time,
255     uint32_t idxcnt, bool_t (*create)(rfs4_entry_t, void *),
256     void (*destroy)(rfs4_entry_t),
257     bool_t (*expiry)(rfs4_entry_t),
258     uint32_t size, uint32_t hashsize,
259     uint32_t maxentries, id_t start)
260 {
261 	rfs4_table_t	*table;
262 	int		 len;
263 	char		*cache_name;
264 	char		*id_name;
265 
266 	table = kmem_alloc(sizeof (rfs4_table_t), KM_SLEEP);
267 	table->dbt_db = db;
268 	rw_init(table->dbt_t_lock, NULL, RW_DEFAULT, NULL);
269 	mutex_init(table->dbt_lock, NULL, MUTEX_DEFAULT, NULL);
270 	mutex_init(&table->dbt_reaper_cv_lock, NULL, MUTEX_DEFAULT, NULL);
271 	cv_init(&table->dbt_reaper_wait, NULL, CV_DEFAULT, NULL);
272 
273 	len = strlen(tabname);
274 	table->dbt_name = kmem_alloc(len+1, KM_SLEEP);
275 	cache_name = kmem_alloc(len + 12 /* "_entry_cache" */ + 1, KM_SLEEP);
276 	(void) strcpy(table->dbt_name, tabname);
277 	(void) sprintf(cache_name, "%s_entry_cache", table->dbt_name);
278 	table->dbt_max_cache_time = max_cache_time;
279 	table->dbt_usize = size;
280 	table->dbt_len = hashsize;
281 	table->dbt_count = 0;
282 	table->dbt_idxcnt = 0;
283 	table->dbt_ccnt = 0;
284 	table->dbt_maxcnt = idxcnt;
285 	table->dbt_indices = NULL;
286 	table->dbt_id_space = NULL;
287 	table->dbt_reaper_shutdown = FALSE;
288 
289 	if (start >= 0) {
290 		if (maxentries + (uint32_t)start > (uint32_t)INT32_MAX)
291 			maxentries = INT32_MAX - start;
292 		id_name = kmem_alloc(len + 9 /* "_id_space" */ + 1, KM_SLEEP);
293 		(void) sprintf(id_name, "%s_id_space", table->dbt_name);
294 		table->dbt_id_space = id_space_create(id_name, start,
295 		    maxentries + start);
296 		kmem_free(id_name, len + 10);
297 	}
298 	ASSERT(t_lowat != 0);
299 	table->dbt_id_lwat = (maxentries * t_lowat) / 100;
300 	ASSERT(t_hiwat != 0);
301 	table->dbt_id_hwat = (maxentries * t_hiwat) / 100;
302 	table->dbt_id_reap = MIN(rfs4_reap_interval, max_cache_time);
303 	table->dbt_maxentries = maxentries;
304 	table->dbt_create = create;
305 	table->dbt_destroy = destroy;
306 	table->dbt_expiry = expiry;
307 
308 	table->dbt_mem_cache = kmem_cache_create(cache_name,
309 	    sizeof (rfs4_dbe_t) + idxcnt * sizeof (rfs4_link_t) + size,
310 	    0,
311 	    rfs4_dbe_kmem_constructor,
312 	    rfs4_dbe_kmem_destructor,
313 	    NULL,
314 	    table,
315 	    NULL,
316 	    0);
317 	kmem_free(cache_name, len+13);
318 
319 	table->dbt_debug = db->db_debug_flags;
320 
321 	mutex_enter(db->db_lock);
322 	table->dbt_tnext = db->db_tables;
323 	db->db_tables = table;
324 	mutex_exit(db->db_lock);
325 
326 	rfs4_start_reaper(table);
327 
328 	return (table);
329 }
330 
331 void
332 rfs4_table_destroy(rfs4_database_t *db, rfs4_table_t *table)
333 {
334 	rfs4_table_t *p;
335 	rfs4_index_t *idx;
336 
337 	ASSERT(table->dbt_count == 0);
338 
339 	mutex_enter(db->db_lock);
340 	if (table == db->db_tables)
341 		db->db_tables = table->dbt_tnext;
342 	else {
343 		for (p = db->db_tables; p; p = p->dbt_tnext)
344 			if (p->dbt_tnext == table) {
345 				p->dbt_tnext = table->dbt_tnext;
346 				table->dbt_tnext = NULL;
347 				break;
348 			}
349 		ASSERT(p != NULL);
350 	}
351 	mutex_exit(db->db_lock);
352 
353 	/* Destroy indices */
354 	while (table->dbt_indices) {
355 		idx = table->dbt_indices;
356 		table->dbt_indices = idx->dbi_inext;
357 		rfs4_index_destroy(idx);
358 	}
359 
360 	rw_destroy(table->dbt_t_lock);
361 	mutex_destroy(table->dbt_lock);
362 	mutex_destroy(&table->dbt_reaper_cv_lock);
363 	cv_destroy(&table->dbt_reaper_wait);
364 
365 	kmem_free(table->dbt_name, strlen(table->dbt_name) + 1);
366 	if (table->dbt_id_space)
367 		id_space_destroy(table->dbt_id_space);
368 	kmem_cache_destroy(table->dbt_mem_cache);
369 	kmem_free(table, sizeof (rfs4_table_t));
370 }
371 
372 rfs4_index_t *
373 rfs4_index_create(rfs4_table_t *table, char *keyname,
374     uint32_t (*hash)(void *),
375     bool_t (compare)(rfs4_entry_t, void *),
376     void *(*mkkey)(rfs4_entry_t),
377     bool_t createable)
378 {
379 	rfs4_index_t *idx;
380 
381 	ASSERT(table->dbt_idxcnt < table->dbt_maxcnt);
382 
383 	idx = kmem_alloc(sizeof (rfs4_index_t), KM_SLEEP);
384 
385 	idx->dbi_table = table;
386 	idx->dbi_keyname = kmem_alloc(strlen(keyname) + 1, KM_SLEEP);
387 	(void) strcpy(idx->dbi_keyname, keyname);
388 	idx->dbi_hash = hash;
389 	idx->dbi_compare = compare;
390 	idx->dbi_mkkey = mkkey;
391 	idx->dbi_tblidx = table->dbt_idxcnt;
392 	table->dbt_idxcnt++;
393 	if (createable) {
394 		table->dbt_ccnt++;
395 		if (table->dbt_ccnt > 1)
396 			panic("Table %s currently can have only have one "
397 			    "index that will allow creation of entries",
398 			    table->dbt_name);
399 		idx->dbi_createable = TRUE;
400 	} else {
401 		idx->dbi_createable = FALSE;
402 	}
403 
404 	idx->dbi_inext = table->dbt_indices;
405 	table->dbt_indices = idx;
406 	idx->dbi_buckets = kmem_zalloc(sizeof (rfs4_bucket_t) * table->dbt_len,
407 	    KM_SLEEP);
408 
409 	return (idx);
410 }
411 
412 void
413 rfs4_index_destroy(rfs4_index_t *idx)
414 {
415 	kmem_free(idx->dbi_keyname, strlen(idx->dbi_keyname) + 1);
416 	kmem_free(idx->dbi_buckets,
417 	    sizeof (rfs4_bucket_t) * idx->dbi_table->dbt_len);
418 	kmem_free(idx, sizeof (rfs4_index_t));
419 }
420 
421 static void
422 rfs4_dbe_destroy(rfs4_dbe_t *entry)
423 {
424 	rfs4_index_t *idx;
425 	void *key;
426 	int i;
427 	rfs4_bucket_t *bp;
428 	rfs4_table_t *table = entry->dbe_table;
429 	rfs4_link_t *l;
430 
431 	NFS4_DEBUG(table->dbt_debug & DESTROY_DEBUG,
432 	    (CE_NOTE, "Destroying entry %p from %s",
433 	    (void*)entry, table->dbt_name));
434 
435 	mutex_enter(entry->dbe_lock);
436 	ASSERT(entry->dbe_refcnt == 0);
437 	mutex_exit(entry->dbe_lock);
438 
439 	/* Unlink from all indices */
440 	for (idx = table->dbt_indices; idx; idx = idx->dbi_inext) {
441 		l = &entry->dbe_indices[idx->dbi_tblidx];
442 		/* check and see if we were ever linked in to the index */
443 		if (INVALID_LINK(l)) {
444 			ASSERT(l->next == NULL && l->prev == NULL);
445 			continue;
446 		}
447 		key = idx->dbi_mkkey(entry->dbe_data);
448 		i = HASH(idx, key);
449 		bp = &idx->dbi_buckets[i];
450 		ASSERT(bp->dbk_head != NULL);
451 		DEQUEUE_IDX(bp, &entry->dbe_indices[idx->dbi_tblidx]);
452 	}
453 
454 	/* Destroy user data */
455 	if (table->dbt_destroy)
456 		(*table->dbt_destroy)(entry->dbe_data);
457 
458 	if (table->dbt_id_space)
459 		id_free(table->dbt_id_space, entry->dbe_id);
460 
461 	mutex_enter(table->dbt_lock);
462 	table->dbt_count--;
463 	mutex_exit(table->dbt_lock);
464 
465 	/* Destroy the entry itself */
466 	kmem_cache_free(table->dbt_mem_cache, entry);
467 }
468 
469 
470 static rfs4_dbe_t *
471 rfs4_dbe_create(rfs4_table_t *table, id_t id, rfs4_entry_t data)
472 {
473 	rfs4_dbe_t *entry;
474 	int i;
475 
476 	NFS4_DEBUG(table->dbt_debug & CREATE_DEBUG,
477 	    (CE_NOTE, "Creating entry in table %s", table->dbt_name));
478 
479 	entry = kmem_cache_alloc(table->dbt_mem_cache, KM_SLEEP);
480 
481 	entry->dbe_refcnt = 1;
482 	entry->dbe_invalid = FALSE;
483 	entry->dbe_skipsearch = FALSE;
484 	entry->dbe_time_rele = 0;
485 	entry->dbe_id = 0;
486 
487 	if (table->dbt_id_space)
488 		entry->dbe_id = id;
489 	entry->dbe_table = table;
490 
491 	for (i = 0; i < table->dbt_maxcnt; i++) {
492 		entry->dbe_indices[i].next = entry->dbe_indices[i].prev = NULL;
493 		entry->dbe_indices[i].entry = entry;
494 		/*
495 		 * We mark the entry as not indexed by setting the low
496 		 * order bit, since address are word aligned. This has
497 		 * the advantage of causeing a trap if the address is
498 		 * used. After the entry is linked in to the
499 		 * corresponding index the bit will be cleared.
500 		 */
501 		INVALIDATE_ADDR(entry->dbe_indices[i].entry);
502 	}
503 
504 	entry->dbe_data = (rfs4_entry_t)&entry->dbe_indices[table->dbt_maxcnt];
505 	bzero(entry->dbe_data, table->dbt_usize);
506 	entry->dbe_data->dbe = entry;
507 
508 	if (!(*table->dbt_create)(entry->dbe_data, data)) {
509 		kmem_cache_free(table->dbt_mem_cache, entry);
510 		return (NULL);
511 	}
512 
513 	mutex_enter(table->dbt_lock);
514 	table->dbt_count++;
515 	mutex_exit(table->dbt_lock);
516 
517 	return (entry);
518 }
519 
520 static void
521 rfs4_dbe_tabreap_adjust(rfs4_table_t *table)
522 {
523 	clock_t		tabreap;
524 	clock_t		reap_int;
525 	uint32_t	in_use;
526 
527 	/*
528 	 * Adjust the table's reap interval based on the
529 	 * number of id's currently in use. Each table's
530 	 * default remains the same if id usage subsides.
531 	 */
532 	ASSERT(MUTEX_HELD(&table->dbt_reaper_cv_lock));
533 	tabreap = MIN(rfs4_reap_interval, table->dbt_max_cache_time);
534 
535 	in_use = table->dbt_count + 1;	/* see rfs4_dbe_create */
536 	if (in_use >= table->dbt_id_hwat) {
537 		ASSERT(t_hreap != 0);
538 		reap_int = (tabreap * t_hreap) / 100;
539 	} else if (in_use >= table->dbt_id_lwat) {
540 		ASSERT(t_lreap != 0);
541 		reap_int = (tabreap * t_lreap) / 100;
542 	} else {
543 		reap_int = tabreap;
544 	}
545 	table->dbt_id_reap = reap_int;
546 	DTRACE_PROBE2(table__reap__interval, char *,
547 	    table->dbt_name, time_t, table->dbt_id_reap);
548 }
549 
550 rfs4_entry_t
551 rfs4_dbsearch(rfs4_index_t *idx, void *key, bool_t *create, void *arg,
552     rfs4_dbsearch_type_t dbsearch_type)
553 {
554 	int		 already_done;
555 	uint32_t	 i;
556 	rfs4_table_t	*table = idx->dbi_table;
557 	rfs4_index_t	*ip;
558 	rfs4_bucket_t	*bp;
559 	rfs4_link_t	*l;
560 	rfs4_dbe_t	*entry;
561 	id_t		 id = -1;
562 
563 	i = HASH(idx, key);
564 	bp = &idx->dbi_buckets[i];
565 
566 	NFS4_DEBUG(table->dbt_debug & SEARCH_DEBUG,
567 	    (CE_NOTE, "Searching for key %p in table %s by %s",
568 	    key, table->dbt_name, idx->dbi_keyname));
569 
570 	rw_enter(bp->dbk_lock, RW_READER);
571 retry:
572 	for (l = bp->dbk_head; l; l = l->next) {
573 		if (l->entry->dbe_refcnt > 0 &&
574 		    (l->entry->dbe_skipsearch == FALSE ||
575 		    (l->entry->dbe_skipsearch == TRUE &&
576 		    dbsearch_type == RFS4_DBS_INVALID)) &&
577 		    (*idx->dbi_compare)(l->entry->dbe_data, key)) {
578 			mutex_enter(l->entry->dbe_lock);
579 			if (l->entry->dbe_refcnt == 0) {
580 				mutex_exit(l->entry->dbe_lock);
581 				continue;
582 			}
583 
584 			/* place an additional hold since we are returning */
585 			rfs4_dbe_hold(l->entry);
586 
587 			mutex_exit(l->entry->dbe_lock);
588 			rw_exit(bp->dbk_lock);
589 
590 			*create = FALSE;
591 
592 			NFS4_DEBUG((table->dbt_debug & SEARCH_DEBUG),
593 			    (CE_NOTE, "Found entry %p for %p in table %s",
594 			    (void *)l->entry, key, table->dbt_name));
595 
596 			if (id != -1)
597 				id_free(table->dbt_id_space, id);
598 			return (l->entry->dbe_data);
599 		}
600 	}
601 
602 	if (!*create || table->dbt_create == NULL || !idx->dbi_createable ||
603 	    table->dbt_maxentries == table->dbt_count) {
604 		NFS4_DEBUG(table->dbt_debug & SEARCH_DEBUG,
605 		    (CE_NOTE, "Entry for %p in %s not found",
606 		    key, table->dbt_name));
607 
608 		rw_exit(bp->dbk_lock);
609 		if (id != -1)
610 			id_free(table->dbt_id_space, id);
611 		return (NULL);
612 	}
613 
614 	if (table->dbt_id_space && id == -1) {
615 		rw_exit(bp->dbk_lock);
616 
617 		/* get an id, ok to sleep for it here */
618 		id = id_alloc(table->dbt_id_space);
619 		ASSERT(id != -1);
620 
621 		mutex_enter(&table->dbt_reaper_cv_lock);
622 		rfs4_dbe_tabreap_adjust(table);
623 		mutex_exit(&table->dbt_reaper_cv_lock);
624 
625 		rw_enter(bp->dbk_lock, RW_WRITER);
626 		goto retry;
627 	}
628 
629 	/* get an exclusive lock on the bucket */
630 	if (rw_read_locked(bp->dbk_lock) && !rw_tryupgrade(bp->dbk_lock)) {
631 		NFS4_DEBUG(table->dbt_debug & OTHER_DEBUG,
632 		    (CE_NOTE, "Trying to upgrade lock on "
633 		    "hash chain %d (%p) for  %s by %s",
634 		    i, (void*)bp, table->dbt_name, idx->dbi_keyname));
635 
636 		rw_exit(bp->dbk_lock);
637 		rw_enter(bp->dbk_lock, RW_WRITER);
638 		goto retry;
639 	}
640 
641 	/* create entry */
642 	entry = rfs4_dbe_create(table, id, arg);
643 	if (entry == NULL) {
644 		rw_exit(bp->dbk_lock);
645 		if (id != -1)
646 			id_free(table->dbt_id_space, id);
647 
648 		NFS4_DEBUG(table->dbt_debug & CREATE_DEBUG,
649 		    (CE_NOTE, "Constructor for table %s failed",
650 		    table->dbt_name));
651 		return (NULL);
652 	}
653 
654 	/*
655 	 * Add one ref for entry into table's hash - only one
656 	 * reference added even though there may be multiple indices
657 	 */
658 	rfs4_dbe_hold(entry);
659 	ENQUEUE(bp->dbk_head, &entry->dbe_indices[idx->dbi_tblidx]);
660 	VALIDATE_ADDR(entry->dbe_indices[idx->dbi_tblidx].entry);
661 
662 	already_done = idx->dbi_tblidx;
663 	rw_exit(bp->dbk_lock);
664 
665 	for (ip = table->dbt_indices; ip; ip = ip->dbi_inext) {
666 		if (ip->dbi_tblidx == already_done)
667 			continue;
668 		l = &entry->dbe_indices[ip->dbi_tblidx];
669 		i = HASH(ip, ip->dbi_mkkey(entry->dbe_data));
670 		ASSERT(i < ip->dbi_table->dbt_len);
671 		bp = &ip->dbi_buckets[i];
672 		ENQUEUE_IDX(bp, l);
673 	}
674 
675 	NFS4_DEBUG(
676 	    table->dbt_debug & SEARCH_DEBUG || table->dbt_debug & CREATE_DEBUG,
677 	    (CE_NOTE, "Entry %p created for %s = %p in table %s",
678 	    (void*)entry, idx->dbi_keyname, (void*)key, table->dbt_name));
679 
680 	return (entry->dbe_data);
681 }
682 
683 /*ARGSUSED*/
684 boolean_t
685 rfs4_cpr_callb(void *arg, int code)
686 {
687 	rfs4_table_t *table = rfs4_client_tab;
688 	rfs4_bucket_t *buckets, *bp;
689 	rfs4_link_t *l;
690 	rfs4_client_t *cp;
691 	int i;
692 
693 	/*
694 	 * We get called for Suspend and Resume events.
695 	 * For the suspend case we simply don't care!  Nor do we care if
696 	 * there are no clients.
697 	 */
698 	if (code == CB_CODE_CPR_CHKPT || table == NULL) {
699 		return (B_TRUE);
700 	}
701 
702 	buckets = table->dbt_indices->dbi_buckets;
703 
704 	/*
705 	 * When we get this far we are in the process of
706 	 * resuming the system from a previous suspend.
707 	 *
708 	 * We are going to blast through and update the
709 	 * last_access time for all the clients and in
710 	 * doing so extend them by one lease period.
711 	 */
712 	for (i = 0; i < table->dbt_len; i++) {
713 		bp = &buckets[i];
714 		for (l = bp->dbk_head; l; l = l->next) {
715 			cp = (rfs4_client_t *)l->entry->dbe_data;
716 			cp->rc_last_access = gethrestime_sec();
717 		}
718 	}
719 
720 	return (B_TRUE);
721 }
722 
723 /*
724  * Given a table, lock each of the buckets and walk all entries (in
725  * turn locking those) and calling the provided "callout" function
726  * with the provided parameter.  Obviously used to iterate across all
727  * entries in a particular table via the database locking hierarchy.
728  * Obviously the caller must not hold locks on any of the entries in
729  * the specified table.
730  */
731 void
732 rfs4_dbe_walk(rfs4_table_t *table,
733     void (*callout)(rfs4_entry_t, void *),
734     void *data)
735 {
736 	rfs4_bucket_t *buckets = table->dbt_indices->dbi_buckets, *bp;
737 	rfs4_link_t *l;
738 	rfs4_dbe_t *entry;
739 	int i;
740 
741 	NFS4_DEBUG(table->dbt_debug & WALK_DEBUG,
742 	    (CE_NOTE, "Walking entries in %s", table->dbt_name));
743 
744 	/* Walk the buckets looking for entries to release/destroy */
745 	for (i = 0; i < table->dbt_len; i++) {
746 		bp = &buckets[i];
747 		rw_enter(bp->dbk_lock, RW_READER);
748 		for (l = bp->dbk_head; l; l = l->next) {
749 			entry = l->entry;
750 			mutex_enter(entry->dbe_lock);
751 			(*callout)(entry->dbe_data, data);
752 			mutex_exit(entry->dbe_lock);
753 		}
754 		rw_exit(bp->dbk_lock);
755 	}
756 
757 	NFS4_DEBUG(table->dbt_debug & WALK_DEBUG,
758 	    (CE_NOTE, "Walking entries complete %s", table->dbt_name));
759 }
760 
761 
762 static void
763 rfs4_dbe_reap(rfs4_table_t *table, time_t cache_time, uint32_t desired)
764 {
765 	rfs4_index_t *idx = table->dbt_indices;
766 	rfs4_bucket_t *buckets = idx->dbi_buckets, *bp;
767 	rfs4_link_t *l, *t;
768 	rfs4_dbe_t *entry;
769 	bool_t found;
770 	int i;
771 	int count = 0;
772 
773 	NFS4_DEBUG(table->dbt_debug & REAP_DEBUG,
774 	    (CE_NOTE, "Reaping %d entries older than %ld seconds in table %s",
775 	    desired, cache_time, table->dbt_name));
776 
777 	/* Walk the buckets looking for entries to release/destroy */
778 	for (i = 0; i < table->dbt_len; i++) {
779 		bp = &buckets[i];
780 		do {
781 			found = FALSE;
782 			rw_enter(bp->dbk_lock, RW_READER);
783 			for (l = bp->dbk_head; l; l = l->next) {
784 				entry = l->entry;
785 				/*
786 				 * Examine an entry.  Ref count of 1 means
787 				 * that the only reference is for the hash
788 				 * table reference.
789 				 */
790 				if (entry->dbe_refcnt != 1)
791 					continue;
792 				mutex_enter(entry->dbe_lock);
793 				if ((entry->dbe_refcnt == 1) &&
794 				    (table->dbt_reaper_shutdown ||
795 				    table->dbt_expiry == NULL ||
796 				    (*table->dbt_expiry)(entry->dbe_data))) {
797 					entry->dbe_refcnt--;
798 					count++;
799 					found = TRUE;
800 				}
801 				mutex_exit(entry->dbe_lock);
802 			}
803 			if (found) {
804 				if (!rw_tryupgrade(bp->dbk_lock)) {
805 					rw_exit(bp->dbk_lock);
806 					rw_enter(bp->dbk_lock, RW_WRITER);
807 				}
808 
809 				l = bp->dbk_head;
810 				while (l) {
811 					t = l;
812 					entry = t->entry;
813 					l = l->next;
814 					if (entry->dbe_refcnt == 0) {
815 						DEQUEUE(bp->dbk_head, t);
816 						t->next = NULL;
817 						t->prev = NULL;
818 						INVALIDATE_ADDR(t->entry);
819 						rfs4_dbe_destroy(entry);
820 					}
821 				}
822 			}
823 			rw_exit(bp->dbk_lock);
824 			/*
825 			 * delay slightly if there is more work to do
826 			 * with the expectation that other reaper
827 			 * threads are freeing data structures as well
828 			 * and in turn will reduce ref counts on
829 			 * entries in this table allowing them to be
830 			 * released.  This is only done in the
831 			 * instance that the tables are being shut down.
832 			 */
833 			if (table->dbt_reaper_shutdown && bp->dbk_head != NULL)
834 				delay(hz/100);
835 		/*
836 		 * If this is a table shutdown, keep going until
837 		 * everything is gone
838 		 */
839 		} while (table->dbt_reaper_shutdown && bp->dbk_head != NULL);
840 
841 		if (!table->dbt_reaper_shutdown && desired && count >= desired)
842 			break;
843 	}
844 
845 	NFS4_DEBUG(table->dbt_debug & REAP_DEBUG,
846 	    (CE_NOTE, "Reaped %d entries older than %ld seconds in table %s",
847 	    count, cache_time, table->dbt_name));
848 }
849 
850 static void
851 reaper_thread(caddr_t *arg)
852 {
853 	rfs4_table_t	*table = (rfs4_table_t *)arg;
854 	clock_t		 rc;
855 
856 	NFS4_DEBUG(table->dbt_debug,
857 	    (CE_NOTE, "rfs4_reaper_thread starting for %s", table->dbt_name));
858 
859 	CALLB_CPR_INIT(&table->dbt_reaper_cpr_info, &table->dbt_reaper_cv_lock,
860 	    callb_generic_cpr, "nfsv4Reaper");
861 
862 	mutex_enter(&table->dbt_reaper_cv_lock);
863 	do {
864 		CALLB_CPR_SAFE_BEGIN(&table->dbt_reaper_cpr_info);
865 		rc = cv_reltimedwait_sig(&table->dbt_reaper_wait,
866 		    &table->dbt_reaper_cv_lock,
867 		    SEC_TO_TICK(table->dbt_id_reap), TR_CLOCK_TICK);
868 		CALLB_CPR_SAFE_END(&table->dbt_reaper_cpr_info,
869 		    &table->dbt_reaper_cv_lock);
870 		rfs4_dbe_reap(table, table->dbt_max_cache_time, 0);
871 	} while (rc != 0 && table->dbt_reaper_shutdown == FALSE);
872 
873 	CALLB_CPR_EXIT(&table->dbt_reaper_cpr_info);
874 
875 	NFS4_DEBUG(table->dbt_debug,
876 	    (CE_NOTE, "rfs4_reaper_thread exiting for %s", table->dbt_name));
877 
878 	/* Notify the database shutdown processing that the table is shutdown */
879 	mutex_enter(table->dbt_db->db_lock);
880 	table->dbt_db->db_shutdown_count--;
881 	cv_signal(&table->dbt_db->db_shutdown_wait);
882 	mutex_exit(table->dbt_db->db_lock);
883 }
884 
885 static void
886 rfs4_start_reaper(rfs4_table_t *table)
887 {
888 	if (table->dbt_max_cache_time == 0)
889 		return;
890 
891 	(void) thread_create(NULL, 0, reaper_thread, table, 0, &p0, TS_RUN,
892 	    minclsyspri);
893 }
894 
895 #ifdef DEBUG
896 void
897 rfs4_dbe_debug(rfs4_dbe_t *entry)
898 {
899 	cmn_err(CE_NOTE, "Entry %p from table %s",
900 	    (void *)entry, entry->dbe_table->dbt_name);
901 	cmn_err(CE_CONT, "\trefcnt = %d id = %d",
902 	    entry->dbe_refcnt, entry->dbe_id);
903 }
904 #endif
905