xref: /titanic_44/usr/src/uts/common/fs/nfs/nfs4_db.c (revision 1db2880b3a411e3c56e50c7dc42d3b137fcc4e48)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
23  */
24 
25 #include <sys/systm.h>
26 #include <sys/cmn_err.h>
27 #include <sys/kmem.h>
28 #include <sys/disp.h>
29 #include <sys/id_space.h>
30 #include <sys/atomic.h>
31 #include <rpc/rpc.h>
32 #include <nfs/nfs4.h>
33 #include <nfs/nfs4_db_impl.h>
34 #include <sys/sdt.h>
35 
36 static int rfs4_reap_interval = RFS4_REAP_INTERVAL;
37 
38 static void rfs4_dbe_reap(rfs4_table_t *, time_t, uint32_t);
39 static void rfs4_dbe_destroy(rfs4_dbe_t *);
40 static rfs4_dbe_t *rfs4_dbe_create(rfs4_table_t *, id_t, rfs4_entry_t);
41 static void rfs4_start_reaper(rfs4_table_t *);
42 
43 /*
44  * t_lowat - integer percentage of table entries	/etc/system only
45  * t_hiwat - integer percentage of table entries	/etc/system only
46  * t_lreap - integer percentage of table reap time	mdb or /etc/system
47  * t_hreap - integer percentage of table reap time	mdb or /etc/system
48  */
49 uint32_t	t_lowat = 50;	/* reap at t_lreap when id's in use hit 50% */
50 uint32_t	t_hiwat = 75;	/* reap at t_hreap when id's in use hit 75% */
51 time_t		t_lreap = 50;	/* default to 50% of table's reap interval */
52 time_t		t_hreap = 10;	/* default to 10% of table's reap interval */
53 
54 id_t
55 rfs4_dbe_getid(rfs4_dbe_t *entry)
56 {
57 	return (entry->dbe_id);
58 }
59 
60 void
61 rfs4_dbe_hold(rfs4_dbe_t *entry)
62 {
63 	atomic_inc_32(&entry->dbe_refcnt);
64 }
65 
66 /*
67  * rfs4_dbe_rele_nolock only decrements the reference count of the entry.
68  */
69 void
70 rfs4_dbe_rele_nolock(rfs4_dbe_t *entry)
71 {
72 	atomic_dec_32(&entry->dbe_refcnt);
73 }
74 
75 
76 uint32_t
77 rfs4_dbe_refcnt(rfs4_dbe_t *entry)
78 {
79 	return (entry->dbe_refcnt);
80 }
81 
82 /*
83  * Mark an entry such that the dbsearch will skip it.
84  * Caller does not want this entry to be found any longer
85  */
86 void
87 rfs4_dbe_invalidate(rfs4_dbe_t *entry)
88 {
89 	entry->dbe_invalid = TRUE;
90 	entry->dbe_skipsearch = TRUE;
91 }
92 
93 /*
94  * Is this entry invalid?
95  */
96 bool_t
97 rfs4_dbe_is_invalid(rfs4_dbe_t *entry)
98 {
99 	return (entry->dbe_invalid);
100 }
101 
102 time_t
103 rfs4_dbe_get_timerele(rfs4_dbe_t *entry)
104 {
105 	return (entry->dbe_time_rele);
106 }
107 
108 /*
109  * Use these to temporarily hide/unhide a db entry.
110  */
111 void
112 rfs4_dbe_hide(rfs4_dbe_t *entry)
113 {
114 	rfs4_dbe_lock(entry);
115 	entry->dbe_skipsearch = TRUE;
116 	rfs4_dbe_unlock(entry);
117 }
118 
119 void
120 rfs4_dbe_unhide(rfs4_dbe_t *entry)
121 {
122 	rfs4_dbe_lock(entry);
123 	entry->dbe_skipsearch = FALSE;
124 	rfs4_dbe_unlock(entry);
125 }
126 
127 void
128 rfs4_dbe_rele(rfs4_dbe_t *entry)
129 {
130 	mutex_enter(entry->dbe_lock);
131 	ASSERT(entry->dbe_refcnt > 1);
132 	atomic_dec_32(&entry->dbe_refcnt);
133 	entry->dbe_time_rele = gethrestime_sec();
134 	mutex_exit(entry->dbe_lock);
135 }
136 
137 void
138 rfs4_dbe_lock(rfs4_dbe_t *entry)
139 {
140 	mutex_enter(entry->dbe_lock);
141 }
142 
143 void
144 rfs4_dbe_unlock(rfs4_dbe_t *entry)
145 {
146 	mutex_exit(entry->dbe_lock);
147 }
148 
149 bool_t
150 rfs4_dbe_islocked(rfs4_dbe_t *entry)
151 {
152 	return (mutex_owned(entry->dbe_lock));
153 }
154 
155 clock_t
156 rfs4_dbe_twait(rfs4_dbe_t *entry, clock_t timeout)
157 {
158 	return (cv_timedwait(entry->dbe_cv, entry->dbe_lock, timeout));
159 }
160 
161 void
162 rfs4_dbe_cv_broadcast(rfs4_dbe_t *entry)
163 {
164 	cv_broadcast(entry->dbe_cv);
165 }
166 
167 /* ARGSUSED */
168 static int
169 rfs4_dbe_kmem_constructor(void *obj, void *private, int kmflag)
170 {
171 	rfs4_dbe_t *entry = obj;
172 
173 	mutex_init(entry->dbe_lock, NULL, MUTEX_DEFAULT, NULL);
174 	cv_init(entry->dbe_cv, NULL, CV_DEFAULT, NULL);
175 
176 	return (0);
177 }
178 
179 static void
180 rfs4_dbe_kmem_destructor(void *obj, void *private)
181 {
182 	rfs4_dbe_t *entry = obj;
183 	/*LINTED*/
184 	rfs4_table_t *table = private;
185 
186 	mutex_destroy(entry->dbe_lock);
187 	cv_destroy(entry->dbe_cv);
188 }
189 
190 rfs4_database_t *
191 rfs4_database_create(uint32_t flags)
192 {
193 	rfs4_database_t *db;
194 
195 	db = kmem_alloc(sizeof (rfs4_database_t), KM_SLEEP);
196 	mutex_init(db->db_lock, NULL, MUTEX_DEFAULT, NULL);
197 	db->db_tables = NULL;
198 	db->db_debug_flags = flags;
199 	db->db_shutdown_count = 0;
200 	cv_init(&db->db_shutdown_wait, NULL, CV_DEFAULT, NULL);
201 	return (db);
202 }
203 
204 
205 /*
206  * The reaper threads that have been created for the tables in this
207  * database must be stopped and the entries in the tables released.
208  * Each table will be marked as "shutdown" and the reaper threads
209  * poked and they will see that a shutdown is in progress and cleanup
210  * and exit.  This function waits for all reaper threads to stop
211  * before returning to the caller.
212  */
213 void
214 rfs4_database_shutdown(rfs4_database_t *db)
215 {
216 	rfs4_table_t *table;
217 
218 	mutex_enter(db->db_lock);
219 	for (table = db->db_tables; table; table = table->dbt_tnext) {
220 		mutex_enter(&table->dbt_reaper_cv_lock);
221 		table->dbt_reaper_shutdown = TRUE;
222 		cv_broadcast(&table->dbt_reaper_wait);
223 		db->db_shutdown_count++;
224 		mutex_exit(&table->dbt_reaper_cv_lock);
225 	}
226 	while (db->db_shutdown_count > 0) {
227 		cv_wait(&db->db_shutdown_wait, db->db_lock);
228 	}
229 	mutex_exit(db->db_lock);
230 }
231 
232 /*
233  * Given a database that has been "shutdown" by the function above all
234  * of the table tables are destroyed and then the database itself
235  * freed.
236  */
237 void
238 rfs4_database_destroy(rfs4_database_t *db)
239 {
240 	rfs4_table_t *next, *tmp;
241 
242 	for (next = db->db_tables; next; ) {
243 		tmp = next;
244 		next = tmp->dbt_tnext;
245 		rfs4_table_destroy(db, tmp);
246 	}
247 
248 	mutex_destroy(db->db_lock);
249 	kmem_free(db, sizeof (rfs4_database_t));
250 }
251 
252 rfs4_table_t *
253 rfs4_table_create(rfs4_database_t *db, char *tabname, time_t max_cache_time,
254     uint32_t idxcnt, bool_t (*create)(rfs4_entry_t, void *),
255     void (*destroy)(rfs4_entry_t),
256     bool_t (*expiry)(rfs4_entry_t),
257     uint32_t size, uint32_t hashsize,
258     uint32_t maxentries, id_t start)
259 {
260 	rfs4_table_t	*table;
261 	int		 len;
262 	char		*cache_name;
263 	char		*id_name;
264 
265 	table = kmem_alloc(sizeof (rfs4_table_t), KM_SLEEP);
266 	table->dbt_db = db;
267 	rw_init(table->dbt_t_lock, NULL, RW_DEFAULT, NULL);
268 	mutex_init(table->dbt_lock, NULL, MUTEX_DEFAULT, NULL);
269 	mutex_init(&table->dbt_reaper_cv_lock, NULL, MUTEX_DEFAULT, NULL);
270 	cv_init(&table->dbt_reaper_wait, NULL, CV_DEFAULT, NULL);
271 
272 	len = strlen(tabname);
273 	table->dbt_name = kmem_alloc(len+1, KM_SLEEP);
274 	cache_name = kmem_alloc(len + 12 /* "_entry_cache" */ + 1, KM_SLEEP);
275 	(void) strcpy(table->dbt_name, tabname);
276 	(void) sprintf(cache_name, "%s_entry_cache", table->dbt_name);
277 	table->dbt_max_cache_time = max_cache_time;
278 	table->dbt_usize = size;
279 	table->dbt_len = hashsize;
280 	table->dbt_count = 0;
281 	table->dbt_idxcnt = 0;
282 	table->dbt_ccnt = 0;
283 	table->dbt_maxcnt = idxcnt;
284 	table->dbt_indices = NULL;
285 	table->dbt_id_space = NULL;
286 	table->dbt_reaper_shutdown = FALSE;
287 
288 	if (start >= 0) {
289 		if (maxentries + (uint32_t)start > (uint32_t)INT32_MAX)
290 			maxentries = INT32_MAX - start;
291 		id_name = kmem_alloc(len + 9 /* "_id_space" */ + 1, KM_SLEEP);
292 		(void) sprintf(id_name, "%s_id_space", table->dbt_name);
293 		table->dbt_id_space = id_space_create(id_name, start,
294 		    maxentries + start);
295 		kmem_free(id_name, len + 10);
296 	}
297 	ASSERT(t_lowat != 0);
298 	table->dbt_id_lwat = (maxentries * t_lowat) / 100;
299 	ASSERT(t_hiwat != 0);
300 	table->dbt_id_hwat = (maxentries * t_hiwat) / 100;
301 	table->dbt_id_reap = MIN(rfs4_reap_interval, max_cache_time);
302 	table->dbt_maxentries = maxentries;
303 	table->dbt_create = create;
304 	table->dbt_destroy = destroy;
305 	table->dbt_expiry = expiry;
306 
307 	table->dbt_mem_cache = kmem_cache_create(cache_name,
308 	    sizeof (rfs4_dbe_t) + idxcnt * sizeof (rfs4_link_t) + size,
309 	    0,
310 	    rfs4_dbe_kmem_constructor,
311 	    rfs4_dbe_kmem_destructor,
312 	    NULL,
313 	    table,
314 	    NULL,
315 	    0);
316 	kmem_free(cache_name, len+13);
317 
318 	table->dbt_debug = db->db_debug_flags;
319 
320 	mutex_enter(db->db_lock);
321 	table->dbt_tnext = db->db_tables;
322 	db->db_tables = table;
323 	mutex_exit(db->db_lock);
324 
325 	rfs4_start_reaper(table);
326 
327 	return (table);
328 }
329 
330 void
331 rfs4_table_destroy(rfs4_database_t *db, rfs4_table_t *table)
332 {
333 	rfs4_table_t *p;
334 	rfs4_index_t *idx;
335 
336 	ASSERT(table->dbt_count == 0);
337 
338 	mutex_enter(db->db_lock);
339 	if (table == db->db_tables)
340 		db->db_tables = table->dbt_tnext;
341 	else {
342 		for (p = db->db_tables; p; p = p->dbt_tnext)
343 			if (p->dbt_tnext == table) {
344 				p->dbt_tnext = table->dbt_tnext;
345 				table->dbt_tnext = NULL;
346 				break;
347 			}
348 		ASSERT(p != NULL);
349 	}
350 	mutex_exit(db->db_lock);
351 
352 	/* Destroy indices */
353 	while (table->dbt_indices) {
354 		idx = table->dbt_indices;
355 		table->dbt_indices = idx->dbi_inext;
356 		rfs4_index_destroy(idx);
357 	}
358 
359 	rw_destroy(table->dbt_t_lock);
360 	mutex_destroy(table->dbt_lock);
361 	mutex_destroy(&table->dbt_reaper_cv_lock);
362 	cv_destroy(&table->dbt_reaper_wait);
363 
364 	kmem_free(table->dbt_name, strlen(table->dbt_name) + 1);
365 	if (table->dbt_id_space)
366 		id_space_destroy(table->dbt_id_space);
367 	kmem_cache_destroy(table->dbt_mem_cache);
368 	kmem_free(table, sizeof (rfs4_table_t));
369 }
370 
371 rfs4_index_t *
372 rfs4_index_create(rfs4_table_t *table, char *keyname,
373     uint32_t (*hash)(void *),
374     bool_t (compare)(rfs4_entry_t, void *),
375     void *(*mkkey)(rfs4_entry_t),
376     bool_t createable)
377 {
378 	rfs4_index_t *idx;
379 
380 	ASSERT(table->dbt_idxcnt < table->dbt_maxcnt);
381 
382 	idx = kmem_alloc(sizeof (rfs4_index_t), KM_SLEEP);
383 
384 	idx->dbi_table = table;
385 	idx->dbi_keyname = kmem_alloc(strlen(keyname) + 1, KM_SLEEP);
386 	(void) strcpy(idx->dbi_keyname, keyname);
387 	idx->dbi_hash = hash;
388 	idx->dbi_compare = compare;
389 	idx->dbi_mkkey = mkkey;
390 	idx->dbi_tblidx = table->dbt_idxcnt;
391 	table->dbt_idxcnt++;
392 	if (createable) {
393 		table->dbt_ccnt++;
394 		if (table->dbt_ccnt > 1)
395 			panic("Table %s currently can have only have one "
396 			    "index that will allow creation of entries",
397 			    table->dbt_name);
398 		idx->dbi_createable = TRUE;
399 	} else {
400 		idx->dbi_createable = FALSE;
401 	}
402 
403 	idx->dbi_inext = table->dbt_indices;
404 	table->dbt_indices = idx;
405 	idx->dbi_buckets = kmem_zalloc(sizeof (rfs4_bucket_t) * table->dbt_len,
406 	    KM_SLEEP);
407 
408 	return (idx);
409 }
410 
411 void
412 rfs4_index_destroy(rfs4_index_t *idx)
413 {
414 	kmem_free(idx->dbi_keyname, strlen(idx->dbi_keyname) + 1);
415 	kmem_free(idx->dbi_buckets,
416 	    sizeof (rfs4_bucket_t) * idx->dbi_table->dbt_len);
417 	kmem_free(idx, sizeof (rfs4_index_t));
418 }
419 
420 static void
421 rfs4_dbe_destroy(rfs4_dbe_t *entry)
422 {
423 	rfs4_index_t *idx;
424 	void *key;
425 	int i;
426 	rfs4_bucket_t *bp;
427 	rfs4_table_t *table = entry->dbe_table;
428 	rfs4_link_t *l;
429 
430 	NFS4_DEBUG(table->dbt_debug & DESTROY_DEBUG,
431 	    (CE_NOTE, "Destroying entry %p from %s",
432 	    (void*)entry, table->dbt_name));
433 
434 	mutex_enter(entry->dbe_lock);
435 	ASSERT(entry->dbe_refcnt == 0);
436 	mutex_exit(entry->dbe_lock);
437 
438 	/* Unlink from all indices */
439 	for (idx = table->dbt_indices; idx; idx = idx->dbi_inext) {
440 		l = &entry->dbe_indices[idx->dbi_tblidx];
441 		/* check and see if we were ever linked in to the index */
442 		if (INVALID_LINK(l)) {
443 			ASSERT(l->next == NULL && l->prev == NULL);
444 			continue;
445 		}
446 		key = idx->dbi_mkkey(entry->dbe_data);
447 		i = HASH(idx, key);
448 		bp = &idx->dbi_buckets[i];
449 		ASSERT(bp->dbk_head != NULL);
450 		DEQUEUE_IDX(bp, &entry->dbe_indices[idx->dbi_tblidx]);
451 	}
452 
453 	/* Destroy user data */
454 	if (table->dbt_destroy)
455 		(*table->dbt_destroy)(entry->dbe_data);
456 
457 	if (table->dbt_id_space)
458 		id_free(table->dbt_id_space, entry->dbe_id);
459 
460 	mutex_enter(table->dbt_lock);
461 	table->dbt_count--;
462 	mutex_exit(table->dbt_lock);
463 
464 	/* Destroy the entry itself */
465 	kmem_cache_free(table->dbt_mem_cache, entry);
466 }
467 
468 
469 static rfs4_dbe_t *
470 rfs4_dbe_create(rfs4_table_t *table, id_t id, rfs4_entry_t data)
471 {
472 	rfs4_dbe_t *entry;
473 	int i;
474 
475 	NFS4_DEBUG(table->dbt_debug & CREATE_DEBUG,
476 	    (CE_NOTE, "Creating entry in table %s", table->dbt_name));
477 
478 	entry = kmem_cache_alloc(table->dbt_mem_cache, KM_SLEEP);
479 
480 	entry->dbe_refcnt = 1;
481 	entry->dbe_invalid = FALSE;
482 	entry->dbe_skipsearch = FALSE;
483 	entry->dbe_time_rele = 0;
484 	entry->dbe_id = 0;
485 
486 	if (table->dbt_id_space)
487 		entry->dbe_id = id;
488 	entry->dbe_table = table;
489 
490 	for (i = 0; i < table->dbt_maxcnt; i++) {
491 		entry->dbe_indices[i].next = entry->dbe_indices[i].prev = NULL;
492 		entry->dbe_indices[i].entry = entry;
493 		/*
494 		 * We mark the entry as not indexed by setting the low
495 		 * order bit, since address are word aligned. This has
496 		 * the advantage of causeing a trap if the address is
497 		 * used. After the entry is linked in to the
498 		 * corresponding index the bit will be cleared.
499 		 */
500 		INVALIDATE_ADDR(entry->dbe_indices[i].entry);
501 	}
502 
503 	entry->dbe_data = (rfs4_entry_t)&entry->dbe_indices[table->dbt_maxcnt];
504 	bzero(entry->dbe_data, table->dbt_usize);
505 	entry->dbe_data->dbe = entry;
506 
507 	if (!(*table->dbt_create)(entry->dbe_data, data)) {
508 		kmem_cache_free(table->dbt_mem_cache, entry);
509 		return (NULL);
510 	}
511 
512 	mutex_enter(table->dbt_lock);
513 	table->dbt_count++;
514 	mutex_exit(table->dbt_lock);
515 
516 	return (entry);
517 }
518 
519 static void
520 rfs4_dbe_tabreap_adjust(rfs4_table_t *table)
521 {
522 	clock_t		tabreap;
523 	clock_t		reap_int;
524 	uint32_t	in_use;
525 
526 	/*
527 	 * Adjust the table's reap interval based on the
528 	 * number of id's currently in use. Each table's
529 	 * default remains the same if id usage subsides.
530 	 */
531 	ASSERT(MUTEX_HELD(&table->dbt_reaper_cv_lock));
532 	tabreap = MIN(rfs4_reap_interval, table->dbt_max_cache_time);
533 
534 	in_use = table->dbt_count + 1;	/* see rfs4_dbe_create */
535 	if (in_use >= table->dbt_id_hwat) {
536 		ASSERT(t_hreap != 0);
537 		reap_int = (tabreap * t_hreap) / 100;
538 	} else if (in_use >= table->dbt_id_lwat) {
539 		ASSERT(t_lreap != 0);
540 		reap_int = (tabreap * t_lreap) / 100;
541 	} else {
542 		reap_int = tabreap;
543 	}
544 	table->dbt_id_reap = reap_int;
545 	DTRACE_PROBE2(table__reap__interval, char *,
546 	    table->dbt_name, time_t, table->dbt_id_reap);
547 }
548 
/*
 * Search index `idx' for an entry matching `key', optionally creating
 * one when no match exists.
 *
 *	idx		- index to search; the table is idx->dbi_table
 *	key		- handed to the index's hash and compare callbacks
 *	create		- in/out: on entry, non-zero asks for an entry to
 *			  be created if none matches; set to FALSE when an
 *			  existing entry is returned, otherwise unchanged
 *	arg		- passed to the table's create callback
 *	dbsearch_type	- RFS4_DBS_INVALID also matches entries that have
 *			  dbe_skipsearch set
 *
 * Returns the matching (or newly created) entry's user data with an
 * additional hold placed on the entry, or NULL if nothing matched and
 * no entry was created (creation not requested or not permitted on
 * this index, table full, or the create callback failed).
 */
rfs4_entry_t
rfs4_dbsearch(rfs4_index_t *idx, void *key, bool_t *create, void *arg,
    rfs4_dbsearch_type_t dbsearch_type)
{
	int		 already_done;
	uint32_t	 i;
	rfs4_table_t	*table = idx->dbi_table;
	rfs4_index_t	*ip;
	rfs4_bucket_t	*bp;
	rfs4_link_t	*l;
	rfs4_dbe_t	*entry;
	id_t		 id = -1;	/* -1: no id allocated yet */

	i = HASH(idx, key);
	bp = &idx->dbi_buckets[i];

	NFS4_DEBUG(table->dbt_debug & SEARCH_DEBUG,
	    (CE_NOTE, "Searching for key %p in table %s by %s",
	    key, table->dbt_name, idx->dbi_keyname));

	/*
	 * First pass runs with the bucket read-locked; the retry paths
	 * below jump back here holding it as writer.
	 */
	rw_enter(bp->dbk_lock, RW_READER);
retry:
	for (l = bp->dbk_head; l; l = l->next) {
		if (l->entry->dbe_refcnt > 0 &&
		    (l->entry->dbe_skipsearch == FALSE ||
		    (l->entry->dbe_skipsearch == TRUE &&
		    dbsearch_type == RFS4_DBS_INVALID)) &&
		    (*idx->dbi_compare)(l->entry->dbe_data, key)) {
			/*
			 * The refcnt test above was made without the
			 * entry lock; recheck under the lock and skip
			 * the entry if it has dropped to zero.
			 */
			mutex_enter(l->entry->dbe_lock);
			if (l->entry->dbe_refcnt == 0) {
				mutex_exit(l->entry->dbe_lock);
				continue;
			}

			/* place an additional hold since we are returning */
			rfs4_dbe_hold(l->entry);

			mutex_exit(l->entry->dbe_lock);
			rw_exit(bp->dbk_lock);

			*create = FALSE;

			NFS4_DEBUG((table->dbt_debug & SEARCH_DEBUG),
			    (CE_NOTE, "Found entry %p for %p in table %s",
			    (void *)l->entry, key, table->dbt_name));

			/*
			 * An id may have been allocated on an earlier
			 * pass before the entry showed up; return it.
			 */
			if (id != -1)
				id_free(table->dbt_id_space, id);
			return (l->entry->dbe_data);
		}
	}

	/*
	 * No match.  Give up unless the caller asked for creation, the
	 * table has a create callback, this index allows creation and
	 * the table is not already at its maximum entry count.
	 */
	if (!*create || table->dbt_create == NULL || !idx->dbi_createable ||
	    table->dbt_maxentries == table->dbt_count) {
		NFS4_DEBUG(table->dbt_debug & SEARCH_DEBUG,
		    (CE_NOTE, "Entry for %p in %s not found",
		    key, table->dbt_name));

		rw_exit(bp->dbk_lock);
		if (id != -1)
			id_free(table->dbt_id_space, id);
		return (NULL);
	}

	/*
	 * The table hands out ids and we do not have one yet.  Drop the
	 * bucket lock before allocating, then search again as writer
	 * since the chain may have changed while the lock was dropped.
	 */
	if (table->dbt_id_space && id == -1) {
		rw_exit(bp->dbk_lock);

		/* get an id, ok to sleep for it here */
		id = id_alloc(table->dbt_id_space);
		ASSERT(id != -1);

		mutex_enter(&table->dbt_reaper_cv_lock);
		rfs4_dbe_tabreap_adjust(table);
		mutex_exit(&table->dbt_reaper_cv_lock);

		rw_enter(bp->dbk_lock, RW_WRITER);
		goto retry;
	}

	/* get an exclusive lock on the bucket */
	if (rw_read_locked(bp->dbk_lock) && !rw_tryupgrade(bp->dbk_lock)) {
		NFS4_DEBUG(table->dbt_debug & OTHER_DEBUG,
		    (CE_NOTE, "Trying to upgrade lock on "
		    "hash chain %d (%p) for  %s by %s",
		    i, (void*)bp, table->dbt_name, idx->dbi_keyname));

		/* upgrade failed: reacquire as writer and search again */
		rw_exit(bp->dbk_lock);
		rw_enter(bp->dbk_lock, RW_WRITER);
		goto retry;
	}

	/* create entry */
	entry = rfs4_dbe_create(table, id, arg);
	if (entry == NULL) {
		rw_exit(bp->dbk_lock);
		if (id != -1)
			id_free(table->dbt_id_space, id);

		NFS4_DEBUG(table->dbt_debug & CREATE_DEBUG,
		    (CE_NOTE, "Constructor for table %s failed",
		    table->dbt_name));
		return (NULL);
	}

	/*
	 * Add one ref for entry into table's hash - only one
	 * reference added even though there may be multiple indices
	 */
	rfs4_dbe_hold(entry);
	ENQUEUE(bp->dbk_head, &entry->dbe_indices[idx->dbi_tblidx]);
	VALIDATE_ADDR(entry->dbe_indices[idx->dbi_tblidx].entry);

	already_done = idx->dbi_tblidx;
	rw_exit(bp->dbk_lock);

	/* Link the new entry into each of the table's other indices */
	for (ip = table->dbt_indices; ip; ip = ip->dbi_inext) {
		if (ip->dbi_tblidx == already_done)
			continue;
		l = &entry->dbe_indices[ip->dbi_tblidx];
		i = HASH(ip, ip->dbi_mkkey(entry->dbe_data));
		ASSERT(i < ip->dbi_table->dbt_len);
		bp = &ip->dbi_buckets[i];
		ENQUEUE_IDX(bp, l);
	}

	NFS4_DEBUG(
	    table->dbt_debug & SEARCH_DEBUG || table->dbt_debug & CREATE_DEBUG,
	    (CE_NOTE, "Entry %p created for %s = %p in table %s",
	    (void*)entry, idx->dbi_keyname, (void*)key, table->dbt_name));

	return (entry->dbe_data);
}
681 
682 /*ARGSUSED*/
683 boolean_t
684 rfs4_cpr_callb(void *arg, int code)
685 {
686 	rfs4_table_t *table = rfs4_client_tab;
687 	rfs4_bucket_t *buckets, *bp;
688 	rfs4_link_t *l;
689 	rfs4_client_t *cp;
690 	int i;
691 
692 	/*
693 	 * We get called for Suspend and Resume events.
694 	 * For the suspend case we simply don't care!  Nor do we care if
695 	 * there are no clients.
696 	 */
697 	if (code == CB_CODE_CPR_CHKPT || table == NULL) {
698 		return (B_TRUE);
699 	}
700 
701 	buckets = table->dbt_indices->dbi_buckets;
702 
703 	/*
704 	 * When we get this far we are in the process of
705 	 * resuming the system from a previous suspend.
706 	 *
707 	 * We are going to blast through and update the
708 	 * last_access time for all the clients and in
709 	 * doing so extend them by one lease period.
710 	 */
711 	for (i = 0; i < table->dbt_len; i++) {
712 		bp = &buckets[i];
713 		for (l = bp->dbk_head; l; l = l->next) {
714 			cp = (rfs4_client_t *)l->entry->dbe_data;
715 			cp->rc_last_access = gethrestime_sec();
716 		}
717 	}
718 
719 	return (B_TRUE);
720 }
721 
722 /*
723  * Given a table, lock each of the buckets and walk all entries (in
724  * turn locking those) and calling the provided "callout" function
725  * with the provided parameter.  Obviously used to iterate across all
726  * entries in a particular table via the database locking hierarchy.
727  * Obviously the caller must not hold locks on any of the entries in
728  * the specified table.
729  */
730 void
731 rfs4_dbe_walk(rfs4_table_t *table,
732     void (*callout)(rfs4_entry_t, void *),
733     void *data)
734 {
735 	rfs4_bucket_t *buckets = table->dbt_indices->dbi_buckets, *bp;
736 	rfs4_link_t *l;
737 	rfs4_dbe_t *entry;
738 	int i;
739 
740 	NFS4_DEBUG(table->dbt_debug & WALK_DEBUG,
741 	    (CE_NOTE, "Walking entries in %s", table->dbt_name));
742 
743 	/* Walk the buckets looking for entries to release/destroy */
744 	for (i = 0; i < table->dbt_len; i++) {
745 		bp = &buckets[i];
746 		rw_enter(bp->dbk_lock, RW_READER);
747 		for (l = bp->dbk_head; l; l = l->next) {
748 			entry = l->entry;
749 			mutex_enter(entry->dbe_lock);
750 			(*callout)(entry->dbe_data, data);
751 			mutex_exit(entry->dbe_lock);
752 		}
753 		rw_exit(bp->dbk_lock);
754 	}
755 
756 	NFS4_DEBUG(table->dbt_debug & WALK_DEBUG,
757 	    (CE_NOTE, "Walking entries complete %s", table->dbt_name));
758 }
759 
760 
/*
 * Reap entries from `table' that are referenced only by the hash
 * table itself.
 *
 *	cache_time	- used only in the debug messages here
 *	desired		- when non-zero and the table is not shutting
 *			  down, stop once this many entries have been
 *			  reaped (checked after each bucket); 0 means no
 *			  limit
 *
 * Each bucket is handled in two phases: under the read lock, eligible
 * entries have their reference count dropped to zero; if any were
 * found, the bucket is write locked and every zero-reference entry is
 * unlinked and destroyed.  During a table shutdown each bucket is
 * reprocessed until it is empty.
 */
static void
rfs4_dbe_reap(rfs4_table_t *table, time_t cache_time, uint32_t desired)
{
	rfs4_index_t *idx = table->dbt_indices;
	rfs4_bucket_t *buckets = idx->dbi_buckets, *bp;
	rfs4_link_t *l, *t;
	rfs4_dbe_t *entry;
	bool_t found;
	int i;
	int count = 0;	/* entries marked for destruction so far */

	NFS4_DEBUG(table->dbt_debug & REAP_DEBUG,
	    (CE_NOTE, "Reaping %d entries older than %ld seconds in table %s",
	    desired, cache_time, table->dbt_name));

	/* Walk the buckets looking for entries to release/destroy */
	for (i = 0; i < table->dbt_len; i++) {
		bp = &buckets[i];
		do {
			found = FALSE;
			rw_enter(bp->dbk_lock, RW_READER);
			for (l = bp->dbk_head; l; l = l->next) {
				entry = l->entry;
				/*
				 * Examine an entry.  Ref count of 1 means
				 * that the only reference is for the hash
				 * table reference.
				 */
				if (entry->dbe_refcnt != 1)
					continue;
				/*
				 * Recheck the count under the entry lock.
				 * The entry is reaped if the table is
				 * shutting down, has no expiry callback,
				 * or the callback says it has expired.
				 */
				mutex_enter(entry->dbe_lock);
				if ((entry->dbe_refcnt == 1) &&
				    (table->dbt_reaper_shutdown ||
				    table->dbt_expiry == NULL ||
				    (*table->dbt_expiry)(entry->dbe_data))) {
					entry->dbe_refcnt--;
					count++;
					found = TRUE;
				}
				mutex_exit(entry->dbe_lock);
			}
			if (found) {
				/* need the bucket exclusively to unlink */
				if (!rw_tryupgrade(bp->dbk_lock)) {
					rw_exit(bp->dbk_lock);
					rw_enter(bp->dbk_lock, RW_WRITER);
				}

				l = bp->dbk_head;
				while (l) {
					t = l;
					entry = t->entry;
					/* advance before t is unlinked */
					l = l->next;
					if (entry->dbe_refcnt == 0) {
						DEQUEUE(bp->dbk_head, t);
						t->next = NULL;
						t->prev = NULL;
						INVALIDATE_ADDR(t->entry);
						rfs4_dbe_destroy(entry);
					}
				}
			}
			rw_exit(bp->dbk_lock);
			/*
			 * delay slightly if there is more work to do
			 * with the expectation that other reaper
			 * threads are freeing data structures as well
			 * and in turn will reduce ref counts on
			 * entries in this table allowing them to be
			 * released.  This is only done in the
			 * instance that the tables are being shut down.
			 */
			if (table->dbt_reaper_shutdown && bp->dbk_head != NULL)
				delay(hz/100);
		/*
		 * If this is a table shutdown, keep going until
		 * everything is gone
		 */
		} while (table->dbt_reaper_shutdown && bp->dbk_head != NULL);

		/* outside of shutdown, honor the caller's reap target */
		if (!table->dbt_reaper_shutdown && desired && count >= desired)
			break;
	}

	NFS4_DEBUG(table->dbt_debug & REAP_DEBUG,
	    (CE_NOTE, "Reaped %d entries older than %ld seconds in table %s",
	    count, cache_time, table->dbt_name));
}
848 
849 static void
850 reaper_thread(caddr_t *arg)
851 {
852 	rfs4_table_t	*table = (rfs4_table_t *)arg;
853 	clock_t		 rc;
854 
855 	NFS4_DEBUG(table->dbt_debug,
856 	    (CE_NOTE, "rfs4_reaper_thread starting for %s", table->dbt_name));
857 
858 	CALLB_CPR_INIT(&table->dbt_reaper_cpr_info, &table->dbt_reaper_cv_lock,
859 	    callb_generic_cpr, "nfsv4Reaper");
860 
861 	mutex_enter(&table->dbt_reaper_cv_lock);
862 	do {
863 		CALLB_CPR_SAFE_BEGIN(&table->dbt_reaper_cpr_info);
864 		rc = cv_reltimedwait_sig(&table->dbt_reaper_wait,
865 		    &table->dbt_reaper_cv_lock,
866 		    SEC_TO_TICK(table->dbt_id_reap), TR_CLOCK_TICK);
867 		CALLB_CPR_SAFE_END(&table->dbt_reaper_cpr_info,
868 		    &table->dbt_reaper_cv_lock);
869 		rfs4_dbe_reap(table, table->dbt_max_cache_time, 0);
870 	} while (rc != 0 && table->dbt_reaper_shutdown == FALSE);
871 
872 	CALLB_CPR_EXIT(&table->dbt_reaper_cpr_info);
873 
874 	NFS4_DEBUG(table->dbt_debug,
875 	    (CE_NOTE, "rfs4_reaper_thread exiting for %s", table->dbt_name));
876 
877 	/* Notify the database shutdown processing that the table is shutdown */
878 	mutex_enter(table->dbt_db->db_lock);
879 	table->dbt_db->db_shutdown_count--;
880 	cv_signal(&table->dbt_db->db_shutdown_wait);
881 	mutex_exit(table->dbt_db->db_lock);
882 }
883 
884 static void
885 rfs4_start_reaper(rfs4_table_t *table)
886 {
887 	if (table->dbt_max_cache_time == 0)
888 		return;
889 
890 	(void) thread_create(NULL, 0, reaper_thread, table, 0, &p0, TS_RUN,
891 	    minclsyspri);
892 }
893 
894 #ifdef DEBUG
895 void
896 rfs4_dbe_debug(rfs4_dbe_t *entry)
897 {
898 	cmn_err(CE_NOTE, "Entry %p from table %s",
899 	    (void *)entry, entry->dbe_table->dbt_name);
900 	cmn_err(CE_CONT, "\trefcnt = %d id = %d",
901 	    entry->dbe_refcnt, entry->dbe_id);
902 }
903 #endif
904