xref: /illumos-gate/usr/src/uts/common/fs/nfs/nfs4_db.c (revision 9b9d39d2a32ff806d2431dbcc50968ef1e6d46b2)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  */
25 
26 /*
27  * Copyright 2018 Nexenta Systems, Inc.
28  */
29 
30 #include <sys/systm.h>
31 #include <sys/cmn_err.h>
32 #include <sys/kmem.h>
33 #include <sys/disp.h>
34 #include <sys/id_space.h>
35 #include <sys/atomic.h>
36 #include <rpc/rpc.h>
37 #include <nfs/nfs4.h>
38 #include <nfs/nfs4_db_impl.h>
39 #include <sys/sdt.h>
40 
/*
 * Upper bound, in seconds, on how long a table's reaper thread sleeps
 * between passes.  The effective interval for a table is the smaller of
 * this value and the table's maximum cache time.
 */
static int rfs4_reap_interval = RFS4_REAP_INTERVAL;

/* Forward declarations of file-local helpers defined later in this file. */
static void rfs4_dbe_reap(rfs4_table_t *, time_t, uint32_t);
static void rfs4_dbe_destroy(rfs4_dbe_t *);
static rfs4_dbe_t *rfs4_dbe_create(rfs4_table_t *, id_t, rfs4_entry_t);
static void rfs4_start_reaper(rfs4_table_t *);
47 
/*
 * Tunables that scale a table's reap interval with its id usage.
 *
 * t_lowat - integer percentage of table entries	/etc/system only
 * t_hiwat - integer percentage of table entries	/etc/system only
 * t_lreap - integer percentage of table reap time	mdb or /etc/system
 * t_hreap - integer percentage of table reap time	mdb or /etc/system
 *
 * The water marks are converted to entry counts when a table is created
 * (which is why changing them later has no effect on existing tables).
 * Once usage reaches the low (high) water mark the table is reaped at
 * t_lreap (t_hreap) percent of its default interval.
 */
uint32_t	t_lowat = 50;	/* reap at t_lreap when id's in use hit 50% */
uint32_t	t_hiwat = 75;	/* reap at t_hreap when id's in use hit 75% */
time_t		t_lreap = 50;	/* default to 50% of table's reap interval */
time_t		t_hreap = 10;	/* default to 10% of table's reap interval */
58 
59 id_t
60 rfs4_dbe_getid(rfs4_dbe_t *entry)
61 {
62 	return (entry->dbe_id);
63 }
64 
65 void
66 rfs4_dbe_hold(rfs4_dbe_t *entry)
67 {
68 	atomic_inc_32(&entry->dbe_refcnt);
69 }
70 
71 /*
72  * rfs4_dbe_rele_nolock only decrements the reference count of the entry.
73  */
74 void
75 rfs4_dbe_rele_nolock(rfs4_dbe_t *entry)
76 {
77 	atomic_dec_32(&entry->dbe_refcnt);
78 }
79 
80 
81 uint32_t
82 rfs4_dbe_refcnt(rfs4_dbe_t *entry)
83 {
84 	return (entry->dbe_refcnt);
85 }
86 
/*
 * Mark an entry as invalid and hide it from searches.
 *
 * Setting dbe_skipsearch makes rfs4_dbsearch() pass over the entry unless
 * the search is of type RFS4_DBS_INVALID; dbe_invalid is what
 * rfs4_dbe_is_invalid() reports.  The caller does not want this entry to
 * be found any longer.  No lock is taken here.
 */
void
rfs4_dbe_invalidate(rfs4_dbe_t *entry)
{
	entry->dbe_invalid = TRUE;
	entry->dbe_skipsearch = TRUE;
}
97 
98 /*
99  * Is this entry invalid?
100  */
101 bool_t
102 rfs4_dbe_is_invalid(rfs4_dbe_t *entry)
103 {
104 	return (entry->dbe_invalid);
105 }
106 
107 time_t
108 rfs4_dbe_get_timerele(rfs4_dbe_t *entry)
109 {
110 	return (entry->dbe_time_rele);
111 }
112 
113 /*
114  * Use these to temporarily hide/unhide a db entry.
115  */
116 void
117 rfs4_dbe_hide(rfs4_dbe_t *entry)
118 {
119 	rfs4_dbe_lock(entry);
120 	entry->dbe_skipsearch = TRUE;
121 	rfs4_dbe_unlock(entry);
122 }
123 
124 void
125 rfs4_dbe_unhide(rfs4_dbe_t *entry)
126 {
127 	rfs4_dbe_lock(entry);
128 	entry->dbe_skipsearch = FALSE;
129 	rfs4_dbe_unlock(entry);
130 }
131 
132 void
133 rfs4_dbe_rele(rfs4_dbe_t *entry)
134 {
135 	mutex_enter(entry->dbe_lock);
136 	ASSERT(entry->dbe_refcnt > 1);
137 	atomic_dec_32(&entry->dbe_refcnt);
138 	entry->dbe_time_rele = gethrestime_sec();
139 	mutex_exit(entry->dbe_lock);
140 }
141 
142 void
143 rfs4_dbe_lock(rfs4_dbe_t *entry)
144 {
145 	mutex_enter(entry->dbe_lock);
146 }
147 
148 void
149 rfs4_dbe_unlock(rfs4_dbe_t *entry)
150 {
151 	mutex_exit(entry->dbe_lock);
152 }
153 
154 bool_t
155 rfs4_dbe_islocked(rfs4_dbe_t *entry)
156 {
157 	return (mutex_owned(entry->dbe_lock));
158 }
159 
160 clock_t
161 rfs4_dbe_twait(rfs4_dbe_t *entry, clock_t timeout)
162 {
163 	return (cv_timedwait(entry->dbe_cv, entry->dbe_lock, timeout));
164 }
165 
166 void
167 rfs4_dbe_cv_broadcast(rfs4_dbe_t *entry)
168 {
169 	cv_broadcast(entry->dbe_cv);
170 }
171 
172 static int
173 rfs4_dbe_kmem_constructor(void *obj, void *private __unused,
174     int kmflag __unused)
175 {
176 	rfs4_dbe_t *entry = obj;
177 
178 	mutex_init(entry->dbe_lock, NULL, MUTEX_DEFAULT, NULL);
179 	cv_init(entry->dbe_cv, NULL, CV_DEFAULT, NULL);
180 
181 	return (0);
182 }
183 
184 static void
185 rfs4_dbe_kmem_destructor(void *obj, void *private __unused)
186 {
187 	rfs4_dbe_t *entry = obj;
188 
189 	mutex_destroy(entry->dbe_lock);
190 	cv_destroy(entry->dbe_cv);
191 }
192 
193 rfs4_database_t *
194 rfs4_database_create(uint32_t flags)
195 {
196 	rfs4_database_t *db;
197 
198 	db = kmem_alloc(sizeof (rfs4_database_t), KM_SLEEP);
199 	mutex_init(db->db_lock, NULL, MUTEX_DEFAULT, NULL);
200 	db->db_tables = NULL;
201 	db->db_debug_flags = flags;
202 	db->db_shutdown_count = 0;
203 	cv_init(&db->db_shutdown_wait, NULL, CV_DEFAULT, NULL);
204 	return (db);
205 }
206 
207 
208 /*
209  * The reaper threads that have been created for the tables in this
210  * database must be stopped and the entries in the tables released.
211  * Each table will be marked as "shutdown" and the reaper threads
212  * poked and they will see that a shutdown is in progress and cleanup
213  * and exit.  This function waits for all reaper threads to stop
214  * before returning to the caller.
215  */
216 void
217 rfs4_database_shutdown(rfs4_database_t *db)
218 {
219 	rfs4_table_t *table;
220 
221 	mutex_enter(db->db_lock);
222 	for (table = db->db_tables; table; table = table->dbt_tnext) {
223 		mutex_enter(&table->dbt_reaper_cv_lock);
224 		table->dbt_reaper_shutdown = TRUE;
225 		cv_broadcast(&table->dbt_reaper_wait);
226 		db->db_shutdown_count++;
227 		mutex_exit(&table->dbt_reaper_cv_lock);
228 	}
229 	while (db->db_shutdown_count > 0) {
230 		cv_wait(&db->db_shutdown_wait, db->db_lock);
231 	}
232 	mutex_exit(db->db_lock);
233 }
234 
235 /*
236  * Given a database that has been "shutdown" by the function above all
237  * of the table tables are destroyed and then the database itself
238  * freed.
239  */
240 void
241 rfs4_database_destroy(rfs4_database_t *db)
242 {
243 	rfs4_table_t *next, *tmp;
244 
245 	for (next = db->db_tables; next; ) {
246 		tmp = next;
247 		next = tmp->dbt_tnext;
248 		rfs4_table_destroy(db, tmp);
249 	}
250 
251 	mutex_destroy(db->db_lock);
252 	kmem_free(db, sizeof (rfs4_database_t));
253 }
254 
255 /*
256  * Used to get the correct kmem_cache database for the state table being
257  * created.
258  * Helper function for rfs4_table_create
259  */
260 static kmem_cache_t *
261 get_db_mem_cache(char *name)
262 {
263 	int i;
264 
265 	for (i = 0; i < RFS4_DB_MEM_CACHE_NUM; i++) {
266 		if (strcmp(name, rfs4_db_mem_cache_table[i].r_db_name) == 0)
267 			return (rfs4_db_mem_cache_table[i].r_db_mem_cache);
268 	}
269 	/*
270 	 * There is no associated kmem cache for this NFS4 server state
271 	 * table name
272 	 */
273 	return (NULL);
274 }
275 
276 /*
277  * Used to initialize the global NFSv4 server state database.
278  * Helper funtion for rfs4_state_g_init and called when module is loaded.
279  */
280 kmem_cache_t *
281 /* CSTYLED */
282 nfs4_init_mem_cache(char *cache_name, uint32_t idxcnt, uint32_t size, uint32_t idx)
283 {
284 	kmem_cache_t *mem_cache = kmem_cache_create(cache_name,
285 	    sizeof (rfs4_dbe_t) + idxcnt * sizeof (rfs4_link_t) + size,
286 	    0,
287 	    rfs4_dbe_kmem_constructor,
288 	    rfs4_dbe_kmem_destructor,
289 	    NULL,
290 	    NULL,
291 	    NULL,
292 	    0);
293 	(void) strlcpy(rfs4_db_mem_cache_table[idx].r_db_name, cache_name,
294 	    strlen(cache_name) + 1);
295 	rfs4_db_mem_cache_table[idx].r_db_mem_cache = mem_cache;
296 	return (mem_cache);
297 }
298 
299 rfs4_table_t *
300 rfs4_table_create(rfs4_database_t *db, char *tabname, time_t max_cache_time,
301     uint32_t idxcnt, bool_t (*create)(rfs4_entry_t, void *),
302     void (*destroy)(rfs4_entry_t),
303     bool_t (*expiry)(rfs4_entry_t),
304     uint32_t size, uint32_t hashsize,
305     uint32_t maxentries, id_t start)
306 {
307 	rfs4_table_t	*table;
308 	int		 len;
309 	char		*cache_name;
310 	char		*id_name;
311 
312 	table = kmem_alloc(sizeof (rfs4_table_t), KM_SLEEP);
313 	table->dbt_db = db;
314 	rw_init(table->dbt_t_lock, NULL, RW_DEFAULT, NULL);
315 	mutex_init(table->dbt_lock, NULL, MUTEX_DEFAULT, NULL);
316 	mutex_init(&table->dbt_reaper_cv_lock, NULL, MUTEX_DEFAULT, NULL);
317 	cv_init(&table->dbt_reaper_wait, NULL, CV_DEFAULT, NULL);
318 
319 	len = strlen(tabname);
320 	table->dbt_name = kmem_alloc(len+1, KM_SLEEP);
321 	cache_name = kmem_alloc(len + 12 /* "_entry_cache" */ + 1, KM_SLEEP);
322 	(void) strcpy(table->dbt_name, tabname);
323 	(void) sprintf(cache_name, "%s_entry_cache", table->dbt_name);
324 	table->dbt_max_cache_time = max_cache_time;
325 	table->dbt_usize = size;
326 	table->dbt_len = hashsize;
327 	table->dbt_count = 0;
328 	table->dbt_idxcnt = 0;
329 	table->dbt_ccnt = 0;
330 	table->dbt_maxcnt = idxcnt;
331 	table->dbt_indices = NULL;
332 	table->dbt_id_space = NULL;
333 	table->dbt_reaper_shutdown = FALSE;
334 
335 	if (start >= 0) {
336 		if (maxentries + (uint32_t)start > (uint32_t)INT32_MAX)
337 			maxentries = INT32_MAX - start;
338 		id_name = kmem_alloc(len + 9 /* "_id_space" */ + 1, KM_SLEEP);
339 		(void) sprintf(id_name, "%s_id_space", table->dbt_name);
340 		table->dbt_id_space = id_space_create(id_name, start,
341 		    maxentries + start);
342 		kmem_free(id_name, len + 10);
343 	}
344 	ASSERT(t_lowat != 0);
345 	table->dbt_id_lwat = (maxentries * t_lowat) / 100;
346 	ASSERT(t_hiwat != 0);
347 	table->dbt_id_hwat = (maxentries * t_hiwat) / 100;
348 	table->dbt_id_reap = MIN(rfs4_reap_interval, max_cache_time);
349 	table->dbt_maxentries = maxentries;
350 	table->dbt_create = create;
351 	table->dbt_destroy = destroy;
352 	table->dbt_expiry = expiry;
353 
354 	/*
355 	 * get the correct kmem_cache for this table type based on the name.
356 	 */
357 	table->dbt_mem_cache = get_db_mem_cache(cache_name);
358 
359 	kmem_free(cache_name, len+13);
360 
361 	table->dbt_debug = db->db_debug_flags;
362 
363 	mutex_enter(db->db_lock);
364 	table->dbt_tnext = db->db_tables;
365 	db->db_tables = table;
366 	mutex_exit(db->db_lock);
367 
368 	rfs4_start_reaper(table);
369 
370 	return (table);
371 }
372 
373 void
374 rfs4_table_destroy(rfs4_database_t *db, rfs4_table_t *table)
375 {
376 	rfs4_table_t *p;
377 	rfs4_index_t *idx;
378 
379 	ASSERT(table->dbt_count == 0);
380 
381 	mutex_enter(db->db_lock);
382 	if (table == db->db_tables)
383 		db->db_tables = table->dbt_tnext;
384 	else {
385 		for (p = db->db_tables; p; p = p->dbt_tnext)
386 			if (p->dbt_tnext == table) {
387 				p->dbt_tnext = table->dbt_tnext;
388 				table->dbt_tnext = NULL;
389 				break;
390 			}
391 		ASSERT(p != NULL);
392 	}
393 	mutex_exit(db->db_lock);
394 
395 	/* Destroy indices */
396 	while (table->dbt_indices) {
397 		idx = table->dbt_indices;
398 		table->dbt_indices = idx->dbi_inext;
399 		rfs4_index_destroy(idx);
400 	}
401 
402 	rw_destroy(table->dbt_t_lock);
403 	mutex_destroy(table->dbt_lock);
404 	mutex_destroy(&table->dbt_reaper_cv_lock);
405 	cv_destroy(&table->dbt_reaper_wait);
406 
407 	kmem_free(table->dbt_name, strlen(table->dbt_name) + 1);
408 	if (table->dbt_id_space)
409 		id_space_destroy(table->dbt_id_space);
410 	table->dbt_mem_cache = NULL;
411 	kmem_free(table, sizeof (rfs4_table_t));
412 }
413 
414 rfs4_index_t *
415 rfs4_index_create(rfs4_table_t *table, char *keyname,
416     uint32_t (*hash)(void *),
417     bool_t (compare)(rfs4_entry_t, void *),
418     void *(*mkkey)(rfs4_entry_t),
419     bool_t createable)
420 {
421 	rfs4_index_t *idx;
422 
423 	ASSERT(table->dbt_idxcnt < table->dbt_maxcnt);
424 
425 	idx = kmem_alloc(sizeof (rfs4_index_t), KM_SLEEP);
426 
427 	idx->dbi_table = table;
428 	idx->dbi_keyname = kmem_alloc(strlen(keyname) + 1, KM_SLEEP);
429 	(void) strcpy(idx->dbi_keyname, keyname);
430 	idx->dbi_hash = hash;
431 	idx->dbi_compare = compare;
432 	idx->dbi_mkkey = mkkey;
433 	idx->dbi_tblidx = table->dbt_idxcnt;
434 	table->dbt_idxcnt++;
435 	if (createable) {
436 		table->dbt_ccnt++;
437 		if (table->dbt_ccnt > 1)
438 			panic("Table %s currently can have only have one "
439 			    "index that will allow creation of entries",
440 			    table->dbt_name);
441 		idx->dbi_createable = TRUE;
442 	} else {
443 		idx->dbi_createable = FALSE;
444 	}
445 
446 	idx->dbi_inext = table->dbt_indices;
447 	table->dbt_indices = idx;
448 	idx->dbi_buckets = kmem_zalloc(sizeof (rfs4_bucket_t) * table->dbt_len,
449 	    KM_SLEEP);
450 
451 	return (idx);
452 }
453 
454 void
455 rfs4_index_destroy(rfs4_index_t *idx)
456 {
457 	kmem_free(idx->dbi_keyname, strlen(idx->dbi_keyname) + 1);
458 	kmem_free(idx->dbi_buckets,
459 	    sizeof (rfs4_bucket_t) * idx->dbi_table->dbt_len);
460 	kmem_free(idx, sizeof (rfs4_index_t));
461 }
462 
463 static void
464 rfs4_dbe_destroy(rfs4_dbe_t *entry)
465 {
466 	rfs4_index_t *idx;
467 	void *key;
468 	int i;
469 	rfs4_bucket_t *bp;
470 	rfs4_table_t *table = entry->dbe_table;
471 	rfs4_link_t *l;
472 
473 	NFS4_DEBUG(table->dbt_debug & DESTROY_DEBUG,
474 	    (CE_NOTE, "Destroying entry %p from %s",
475 	    (void*)entry, table->dbt_name));
476 
477 	mutex_enter(entry->dbe_lock);
478 	ASSERT(entry->dbe_refcnt == 0);
479 	mutex_exit(entry->dbe_lock);
480 
481 	/* Unlink from all indices */
482 	for (idx = table->dbt_indices; idx; idx = idx->dbi_inext) {
483 		l = &entry->dbe_indices[idx->dbi_tblidx];
484 		/* check and see if we were ever linked in to the index */
485 		if (INVALID_LINK(l)) {
486 			ASSERT(l->next == NULL && l->prev == NULL);
487 			continue;
488 		}
489 		key = idx->dbi_mkkey(entry->dbe_data);
490 		i = HASH(idx, key);
491 		bp = &idx->dbi_buckets[i];
492 		ASSERT(bp->dbk_head != NULL);
493 		DEQUEUE_IDX(bp, &entry->dbe_indices[idx->dbi_tblidx]);
494 	}
495 
496 	/* Destroy user data */
497 	if (table->dbt_destroy)
498 		(*table->dbt_destroy)(entry->dbe_data);
499 
500 	if (table->dbt_id_space)
501 		id_free(table->dbt_id_space, entry->dbe_id);
502 
503 	mutex_enter(table->dbt_lock);
504 	table->dbt_count--;
505 	mutex_exit(table->dbt_lock);
506 
507 	/* Destroy the entry itself */
508 	kmem_cache_free(table->dbt_mem_cache, entry);
509 }
510 
511 
512 static rfs4_dbe_t *
513 rfs4_dbe_create(rfs4_table_t *table, id_t id, rfs4_entry_t data)
514 {
515 	rfs4_dbe_t *entry;
516 	int i;
517 
518 	NFS4_DEBUG(table->dbt_debug & CREATE_DEBUG,
519 	    (CE_NOTE, "Creating entry in table %s", table->dbt_name));
520 
521 	entry = kmem_cache_alloc(table->dbt_mem_cache, KM_SLEEP);
522 
523 	entry->dbe_refcnt = 1;
524 	entry->dbe_invalid = FALSE;
525 	entry->dbe_skipsearch = FALSE;
526 	entry->dbe_time_rele = 0;
527 	entry->dbe_id = 0;
528 
529 	if (table->dbt_id_space)
530 		entry->dbe_id = id;
531 	entry->dbe_table = table;
532 
533 	for (i = 0; i < table->dbt_maxcnt; i++) {
534 		entry->dbe_indices[i].next = entry->dbe_indices[i].prev = NULL;
535 		entry->dbe_indices[i].entry = entry;
536 		/*
537 		 * We mark the entry as not indexed by setting the low
538 		 * order bit, since address are word aligned. This has
539 		 * the advantage of causeing a trap if the address is
540 		 * used. After the entry is linked in to the
541 		 * corresponding index the bit will be cleared.
542 		 */
543 		INVALIDATE_ADDR(entry->dbe_indices[i].entry);
544 	}
545 
546 	entry->dbe_data = (rfs4_entry_t)&entry->dbe_indices[table->dbt_maxcnt];
547 	bzero(entry->dbe_data, table->dbt_usize);
548 	entry->dbe_data->dbe = entry;
549 
550 	if (!(*table->dbt_create)(entry->dbe_data, data)) {
551 		kmem_cache_free(table->dbt_mem_cache, entry);
552 		return (NULL);
553 	}
554 
555 	mutex_enter(table->dbt_lock);
556 	table->dbt_count++;
557 	mutex_exit(table->dbt_lock);
558 
559 	return (entry);
560 }
561 
562 static void
563 rfs4_dbe_tabreap_adjust(rfs4_table_t *table)
564 {
565 	clock_t		tabreap;
566 	clock_t		reap_int;
567 	uint32_t	in_use;
568 
569 	/*
570 	 * Adjust the table's reap interval based on the
571 	 * number of id's currently in use. Each table's
572 	 * default remains the same if id usage subsides.
573 	 */
574 	ASSERT(MUTEX_HELD(&table->dbt_reaper_cv_lock));
575 	tabreap = MIN(rfs4_reap_interval, table->dbt_max_cache_time);
576 
577 	in_use = table->dbt_count + 1;	/* see rfs4_dbe_create */
578 	if (in_use >= table->dbt_id_hwat) {
579 		ASSERT(t_hreap != 0);
580 		reap_int = (tabreap * t_hreap) / 100;
581 	} else if (in_use >= table->dbt_id_lwat) {
582 		ASSERT(t_lreap != 0);
583 		reap_int = (tabreap * t_lreap) / 100;
584 	} else {
585 		reap_int = tabreap;
586 	}
587 	table->dbt_id_reap = reap_int;
588 	DTRACE_PROBE2(table__reap__interval, char *,
589 	    table->dbt_name, time_t, table->dbt_id_reap);
590 }
591 
/*
 * Look up an entry by key through the given index, optionally creating
 * it when it is not found.
 *
 * idx           - index to search through
 * key           - key to look for; hashed and compared with the index's
 *                 callbacks
 * create        - in/out.  On entry, TRUE asks for the entry to be
 *                 created if the search fails.  Set to FALSE when an
 *                 existing entry is returned; left untouched otherwise.
 * arg           - opaque argument passed to the table's create callback
 * dbsearch_type - RFS4_DBS_INVALID makes entries flagged dbe_skipsearch
 *                 eligible as well; otherwise such entries are skipped
 *
 * Returns the user data of the found or newly created entry, or NULL if
 * nothing was found and nothing could be (or was asked to be) created.
 * A found entry is returned with one additional hold.  A created entry
 * is returned with the reference it was created with plus the one taken
 * here on behalf of the hash table.
 */
rfs4_entry_t
rfs4_dbsearch(rfs4_index_t *idx, void *key, bool_t *create, void *arg,
    rfs4_dbsearch_type_t dbsearch_type)
{
	int		 already_done;
	uint32_t	 i;
	rfs4_table_t	*table = idx->dbi_table;
	rfs4_index_t	*ip;
	rfs4_bucket_t	*bp;
	rfs4_link_t	*l;
	rfs4_dbe_t	*entry;
	id_t		 id = -1;	/* -1: no id has been allocated yet */

	i = HASH(idx, key);
	bp = &idx->dbi_buckets[i];

	NFS4_DEBUG(table->dbt_debug & SEARCH_DEBUG,
	    (CE_NOTE, "Searching for key %p in table %s by %s",
	    key, table->dbt_name, idx->dbi_keyname));

	rw_enter(bp->dbk_lock, RW_READER);
retry:
	/*
	 * Scan the hash chain.  We get back here (with the bucket lock
	 * held) whenever the lock had to be dropped, because another
	 * thread may have inserted the entry in the meantime.
	 */
	for (l = bp->dbk_head; l; l = l->next) {
		if (l->entry->dbe_refcnt > 0 &&
		    (l->entry->dbe_skipsearch == FALSE ||
		    (l->entry->dbe_skipsearch == TRUE &&
		    dbsearch_type == RFS4_DBS_INVALID)) &&
		    (*idx->dbi_compare)(l->entry->dbe_data, key)) {
			/*
			 * The check above was made without the entry
			 * lock; recheck the reference count under it.
			 */
			mutex_enter(l->entry->dbe_lock);
			if (l->entry->dbe_refcnt == 0) {
				mutex_exit(l->entry->dbe_lock);
				continue;
			}

			/* place an additional hold since we are returning */
			rfs4_dbe_hold(l->entry);

			mutex_exit(l->entry->dbe_lock);
			rw_exit(bp->dbk_lock);

			*create = FALSE;

			NFS4_DEBUG((table->dbt_debug & SEARCH_DEBUG),
			    (CE_NOTE, "Found entry %p for %p in table %s",
			    (void *)l->entry, key, table->dbt_name));

			/* An id allocated on an earlier pass is not needed */
			if (id != -1)
				id_free(table->dbt_id_space, id);
			return (l->entry->dbe_data);
		}
	}

	/*
	 * Not found.  Give up unless the caller asked for creation, the
	 * table and this index support it, and the table is not full.
	 */
	if (!*create || table->dbt_create == NULL || !idx->dbi_createable ||
	    table->dbt_maxentries == table->dbt_count) {
		NFS4_DEBUG(table->dbt_debug & SEARCH_DEBUG,
		    (CE_NOTE, "Entry for %p in %s not found",
		    key, table->dbt_name));

		rw_exit(bp->dbk_lock);
		if (id != -1)
			id_free(table->dbt_id_space, id);
		return (NULL);
	}

	/*
	 * The table hands out ids and we do not have one yet.  The bucket
	 * lock is dropped around the allocation, then retaken as writer
	 * and the chain is searched again.
	 */
	if (table->dbt_id_space && id == -1) {
		rw_exit(bp->dbk_lock);

		/* get an id, ok to sleep for it here */
		id = id_alloc(table->dbt_id_space);
		ASSERT(id != -1);

		/* id usage changed; rescale the table's reap interval */
		mutex_enter(&table->dbt_reaper_cv_lock);
		rfs4_dbe_tabreap_adjust(table);
		mutex_exit(&table->dbt_reaper_cv_lock);

		rw_enter(bp->dbk_lock, RW_WRITER);
		goto retry;
	}

	/* get an exclusive lock on the bucket */
	if (rw_read_locked(bp->dbk_lock) && !rw_tryupgrade(bp->dbk_lock)) {
		NFS4_DEBUG(table->dbt_debug & OTHER_DEBUG,
		    (CE_NOTE, "Trying to upgrade lock on "
		    "hash chain %d (%p) for  %s by %s",
		    i, (void*)bp, table->dbt_name, idx->dbi_keyname));

		/*
		 * The upgrade failed, so the lock has to be released and
		 * reacquired as writer; search again afterwards.
		 */
		rw_exit(bp->dbk_lock);
		rw_enter(bp->dbk_lock, RW_WRITER);
		goto retry;
	}

	/* create entry */
	entry = rfs4_dbe_create(table, id, arg);
	if (entry == NULL) {
		rw_exit(bp->dbk_lock);
		if (id != -1)
			id_free(table->dbt_id_space, id);

		NFS4_DEBUG(table->dbt_debug & CREATE_DEBUG,
		    (CE_NOTE, "Constructor for table %s failed",
		    table->dbt_name));
		return (NULL);
	}

	/*
	 * Add one ref for entry into table's hash - only one
	 * reference added even though there may be multiple indices
	 */
	rfs4_dbe_hold(entry);
	ENQUEUE(bp->dbk_head, &entry->dbe_indices[idx->dbi_tblidx]);
	/* Clear the "not linked" marking now that the link is in place */
	VALIDATE_ADDR(entry->dbe_indices[idx->dbi_tblidx].entry);

	already_done = idx->dbi_tblidx;
	rw_exit(bp->dbk_lock);

	/* Link the new entry into each of the table's other indices */
	for (ip = table->dbt_indices; ip; ip = ip->dbi_inext) {
		if (ip->dbi_tblidx == already_done)
			continue;
		l = &entry->dbe_indices[ip->dbi_tblidx];
		i = HASH(ip, ip->dbi_mkkey(entry->dbe_data));
		ASSERT(i < ip->dbi_table->dbt_len);
		bp = &ip->dbi_buckets[i];
		ENQUEUE_IDX(bp, l);
	}

	NFS4_DEBUG(
	    table->dbt_debug & SEARCH_DEBUG || table->dbt_debug & CREATE_DEBUG,
	    (CE_NOTE, "Entry %p created for %s = %p in table %s",
	    (void*)entry, idx->dbi_keyname, (void*)key, table->dbt_name));

	return (entry->dbe_data);
}
724 
725 /*ARGSUSED*/
726 boolean_t
727 rfs4_cpr_callb(void *arg, int code)
728 {
729 	rfs4_bucket_t *buckets, *bp;
730 	rfs4_link_t *l;
731 	rfs4_client_t *cp;
732 	int i;
733 
734 	nfs4_srv_t *nsrv4 = nfs4_get_srv();
735 	rfs4_table_t *table = nsrv4->rfs4_client_tab;
736 
737 	/*
738 	 * We get called for Suspend and Resume events.
739 	 * For the suspend case we simply don't care!  Nor do we care if
740 	 * there are no clients.
741 	 */
742 	if (code == CB_CODE_CPR_CHKPT || table == NULL) {
743 		return (B_TRUE);
744 	}
745 
746 	buckets = table->dbt_indices->dbi_buckets;
747 
748 	/*
749 	 * When we get this far we are in the process of
750 	 * resuming the system from a previous suspend.
751 	 *
752 	 * We are going to blast through and update the
753 	 * last_access time for all the clients and in
754 	 * doing so extend them by one lease period.
755 	 */
756 	for (i = 0; i < table->dbt_len; i++) {
757 		bp = &buckets[i];
758 		for (l = bp->dbk_head; l; l = l->next) {
759 			cp = (rfs4_client_t *)l->entry->dbe_data;
760 			cp->rc_last_access = gethrestime_sec();
761 		}
762 	}
763 
764 	return (B_TRUE);
765 }
766 
767 /*
768  * Given a table, lock each of the buckets and walk all entries (in
769  * turn locking those) and calling the provided "callout" function
770  * with the provided parameter.  Obviously used to iterate across all
771  * entries in a particular table via the database locking hierarchy.
772  * Obviously the caller must not hold locks on any of the entries in
773  * the specified table.
774  */
775 void
776 rfs4_dbe_walk(rfs4_table_t *table,
777     void (*callout)(rfs4_entry_t, void *),
778     void *data)
779 {
780 	rfs4_bucket_t *buckets = table->dbt_indices->dbi_buckets, *bp;
781 	rfs4_link_t *l;
782 	rfs4_dbe_t *entry;
783 	int i;
784 
785 	NFS4_DEBUG(table->dbt_debug & WALK_DEBUG,
786 	    (CE_NOTE, "Walking entries in %s", table->dbt_name));
787 
788 	/* Walk the buckets looking for entries to release/destroy */
789 	for (i = 0; i < table->dbt_len; i++) {
790 		bp = &buckets[i];
791 		rw_enter(bp->dbk_lock, RW_READER);
792 		for (l = bp->dbk_head; l; l = l->next) {
793 			entry = l->entry;
794 			mutex_enter(entry->dbe_lock);
795 			(*callout)(entry->dbe_data, data);
796 			mutex_exit(entry->dbe_lock);
797 		}
798 		rw_exit(bp->dbk_lock);
799 	}
800 
801 	NFS4_DEBUG(table->dbt_debug & WALK_DEBUG,
802 	    (CE_NOTE, "Walking entries complete %s", table->dbt_name));
803 }
804 
805 
/*
 * Reap entries from a table.
 *
 * table      - table to reap
 * cache_time - only used in the debug messages
 * desired    - if non-zero, stop after a bucket once at least this many
 *              entries have been reaped (ignored during shutdown)
 *
 * Each bucket of the table's first listed index is processed in two
 * passes.  The first pass, under a read lock, drops the hash table's
 * reference on every entry that has no other reference and that either
 * has expired, has no expiry callback, or belongs to a table being shut
 * down.  If that marked anything, the second pass takes the write lock,
 * unlinks every entry whose reference count is now zero and destroys it.
 *
 * During shutdown a bucket is reprocessed, with a short delay between
 * attempts, until it is empty.
 */
static void
rfs4_dbe_reap(rfs4_table_t *table, time_t cache_time, uint32_t desired)
{
	rfs4_index_t *idx = table->dbt_indices;
	rfs4_bucket_t *buckets = idx->dbi_buckets, *bp;
	rfs4_link_t *l, *t;
	rfs4_dbe_t *entry;
	bool_t found;
	int i;
	int count = 0;	/* entries reaped so far in this call */

	NFS4_DEBUG(table->dbt_debug & REAP_DEBUG,
	    (CE_NOTE, "Reaping %d entries older than %ld seconds in table %s",
	    desired, cache_time, table->dbt_name));

	/* Walk the buckets looking for entries to release/destroy */
	for (i = 0; i < table->dbt_len; i++) {
		bp = &buckets[i];
		do {
			found = FALSE;
			rw_enter(bp->dbk_lock, RW_READER);
			for (l = bp->dbk_head; l; l = l->next) {
				entry = l->entry;
				/*
				 * Examine an entry.  Ref count of 1 means
				 * that the only reference is for the hash
				 * table reference.
				 */
				if (entry->dbe_refcnt != 1)
					continue;
				/* Recheck under the entry lock */
				mutex_enter(entry->dbe_lock);
				if ((entry->dbe_refcnt == 1) &&
				    (table->dbt_reaper_shutdown ||
				    table->dbt_expiry == NULL ||
				    (*table->dbt_expiry)(entry->dbe_data))) {
					/* Drop the hash table's reference */
					entry->dbe_refcnt--;
					count++;
					found = TRUE;
				}
				mutex_exit(entry->dbe_lock);
			}
			if (found) {
				/*
				 * Need the bucket exclusively to unlink;
				 * fall back to release and reacquire if
				 * the upgrade is not possible.
				 */
				if (!rw_tryupgrade(bp->dbk_lock)) {
					rw_exit(bp->dbk_lock);
					rw_enter(bp->dbk_lock, RW_WRITER);
				}

				l = bp->dbk_head;
				while (l) {
					t = l;
					entry = t->entry;
					/* advance before t is unlinked */
					l = l->next;
					if (entry->dbe_refcnt == 0) {
						DEQUEUE(bp->dbk_head, t);
						t->next = NULL;
						t->prev = NULL;
						/* mark the link as unlinked */
						INVALIDATE_ADDR(t->entry);
						rfs4_dbe_destroy(entry);
					}
				}
			}
			rw_exit(bp->dbk_lock);
			/*
			 * delay slightly if there is more work to do
			 * with the expectation that other reaper
			 * threads are freeing data structures as well
			 * and in turn will reduce ref counts on
			 * entries in this table allowing them to be
			 * released.  This is only done in the
			 * instance that the tables are being shut down.
			 */
			if (table->dbt_reaper_shutdown && bp->dbk_head != NULL)
				delay(hz/100);
		/*
		 * If this is a table shutdown, keep going until
		 * everything is gone
		 */
		} while (table->dbt_reaper_shutdown && bp->dbk_head != NULL);

		/* Outside of shutdown, stop once enough has been reaped */
		if (!table->dbt_reaper_shutdown && desired && count >= desired)
			break;
	}

	NFS4_DEBUG(table->dbt_debug & REAP_DEBUG,
	    (CE_NOTE, "Reaped %d entries older than %ld seconds in table %s",
	    count, cache_time, table->dbt_name));
}
893 
894 static void
895 reaper_thread(caddr_t *arg)
896 {
897 	rfs4_table_t	*table = (rfs4_table_t *)arg;
898 	clock_t		 rc;
899 
900 	NFS4_DEBUG(table->dbt_debug,
901 	    (CE_NOTE, "rfs4_reaper_thread starting for %s", table->dbt_name));
902 
903 	CALLB_CPR_INIT(&table->dbt_reaper_cpr_info, &table->dbt_reaper_cv_lock,
904 	    callb_generic_cpr, "nfsv4Reaper");
905 
906 	mutex_enter(&table->dbt_reaper_cv_lock);
907 	do {
908 		CALLB_CPR_SAFE_BEGIN(&table->dbt_reaper_cpr_info);
909 		rc = cv_reltimedwait_sig(&table->dbt_reaper_wait,
910 		    &table->dbt_reaper_cv_lock,
911 		    SEC_TO_TICK(table->dbt_id_reap), TR_CLOCK_TICK);
912 		CALLB_CPR_SAFE_END(&table->dbt_reaper_cpr_info,
913 		    &table->dbt_reaper_cv_lock);
914 		rfs4_dbe_reap(table, table->dbt_max_cache_time, 0);
915 	} while (rc != 0 && table->dbt_reaper_shutdown == FALSE);
916 
917 	CALLB_CPR_EXIT(&table->dbt_reaper_cpr_info);
918 
919 	NFS4_DEBUG(table->dbt_debug,
920 	    (CE_NOTE, "rfs4_reaper_thread exiting for %s", table->dbt_name));
921 
922 	/* Notify the database shutdown processing that the table is shutdown */
923 	mutex_enter(table->dbt_db->db_lock);
924 	table->dbt_db->db_shutdown_count--;
925 	cv_signal(&table->dbt_db->db_shutdown_wait);
926 	mutex_exit(table->dbt_db->db_lock);
927 	zthread_exit();
928 }
929 
930 static void
931 rfs4_start_reaper(rfs4_table_t *table)
932 {
933 	if (table->dbt_max_cache_time == 0)
934 		return;
935 
936 	(void) zthread_create(NULL, 0, reaper_thread, table, 0,
937 	    minclsyspri);
938 }
939 
940 #ifdef DEBUG
941 void
942 rfs4_dbe_debug(rfs4_dbe_t *entry)
943 {
944 	cmn_err(CE_NOTE, "Entry %p from table %s",
945 	    (void *)entry, entry->dbe_table->dbt_name);
946 	cmn_err(CE_CONT, "\trefcnt = %d id = %d",
947 	    entry->dbe_refcnt, entry->dbe_id);
948 }
949 #endif
950