xref: /freebsd/sys/rpc/replay.c (revision 9162f64b58d01ec01481d60b6cdc06ffd8e8c7fc)
1 /*-
2  * Copyright (c) 2008 Isilon Inc http://www.isilon.com/
3  * Authors: Doug Rabson <dfr@rabson.org>
4  * Developed with Red Inc: Alfred Perlstein <alfred@freebsd.org>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25  * SUCH DAMAGE.
26  */
27 
28 #include <sys/cdefs.h>
29 __FBSDID("$FreeBSD$");
30 
31 #include <sys/param.h>
32 #include <sys/hash.h>
33 #include <sys/kernel.h>
34 #include <sys/lock.h>
35 #include <sys/mbuf.h>
36 #include <sys/mutex.h>
37 #include <sys/queue.h>
38 
39 #include <rpc/rpc.h>
40 #include <rpc/replay.h>
41 
42 struct replay_cache_entry {
43 	int		rce_hash;
44 	struct rpc_msg	rce_msg;
45 	struct sockaddr_storage rce_addr;
46 	struct rpc_msg	rce_repmsg;
47 	struct mbuf	*rce_repbody;
48 
49 	TAILQ_ENTRY(replay_cache_entry) rce_link;
50 	TAILQ_ENTRY(replay_cache_entry) rce_alllink;
51 };
52 TAILQ_HEAD(replay_cache_list, replay_cache_entry);
53 
/* File-local helpers, defined further down. */
static struct replay_cache_entry *replay_alloc(struct replay_cache *rc,
    struct rpc_msg *msg, struct sockaddr *addr, int h);
static void replay_free(struct replay_cache *rc,
    struct replay_cache_entry *rce);
static void replay_prune(struct replay_cache *rc);
60 
/* Number of hash buckets in rc_cache[]; also the modulus for the xid hash. */
#define REPLAY_HASH_SIZE	256
/* Entry count at which replay_prune() starts evicting completed entries. */
#define REPLAY_MAX		1024
63 
64 struct replay_cache {
65 	struct replay_cache_list	rc_cache[REPLAY_HASH_SIZE];
66 	struct replay_cache_list	rc_all;
67 	struct mtx			rc_lock;
68 	int				rc_count;
69 	size_t				rc_size;
70 	size_t				rc_maxsize;
71 };
72 
73 struct replay_cache *
74 replay_newcache(size_t maxsize)
75 {
76 	struct replay_cache *rc;
77 	int i;
78 
79 	rc = malloc(sizeof(*rc), M_RPC, M_WAITOK|M_ZERO);
80 	for (i = 0; i < REPLAY_HASH_SIZE; i++)
81 		TAILQ_INIT(&rc->rc_cache[i]);
82 	TAILQ_INIT(&rc->rc_all);
83 	mtx_init(&rc->rc_lock, "rc_lock", NULL, MTX_DEF);
84 	rc->rc_maxsize = maxsize;
85 
86 	return (rc);
87 }
88 
89 void
90 replay_setsize(struct replay_cache *rc, size_t newmaxsize)
91 {
92 
93 	rc->rc_maxsize = newmaxsize;
94 	replay_prune(rc);
95 }
96 
97 void
98 replay_freecache(struct replay_cache *rc)
99 {
100 
101 	mtx_lock(&rc->rc_lock);
102 	while (TAILQ_FIRST(&rc->rc_all))
103 		replay_free(rc, TAILQ_FIRST(&rc->rc_all));
104 	mtx_destroy(&rc->rc_lock);
105 	free(rc, M_RPC);
106 }
107 
108 static struct replay_cache_entry *
109 replay_alloc(struct replay_cache *rc,
110     struct rpc_msg *msg, struct sockaddr *addr, int h)
111 {
112 	struct replay_cache_entry *rce;
113 
114 	rc->rc_count++;
115 	rce = malloc(sizeof(*rce), M_RPC, M_NOWAIT|M_ZERO);
116 	rce->rce_hash = h;
117 	rce->rce_msg = *msg;
118 	bcopy(addr, &rce->rce_addr, addr->sa_len);
119 
120 	TAILQ_INSERT_HEAD(&rc->rc_cache[h], rce, rce_link);
121 	TAILQ_INSERT_HEAD(&rc->rc_all, rce, rce_alllink);
122 
123 	return (rce);
124 }
125 
126 static void
127 replay_free(struct replay_cache *rc, struct replay_cache_entry *rce)
128 {
129 
130 	rc->rc_count--;
131 	TAILQ_REMOVE(&rc->rc_cache[rce->rce_hash], rce, rce_link);
132 	TAILQ_REMOVE(&rc->rc_all, rce, rce_alllink);
133 	if (rce->rce_repbody) {
134 		rc->rc_size -= m_length(rce->rce_repbody, NULL);
135 		m_freem(rce->rce_repbody);
136 	}
137 	free(rce, M_RPC);
138 }
139 
140 static void
141 replay_prune(struct replay_cache *rc)
142 {
143 	struct replay_cache_entry *rce;
144 	bool_t freed_one;
145 
146 	if (rc->rc_count >= REPLAY_MAX || rc->rc_size > rc->rc_maxsize) {
147 		freed_one = FALSE;
148 		do {
149 			/*
150 			 * Try to free an entry. Don't free in-progress entries
151 			 */
152 			TAILQ_FOREACH_REVERSE(rce, &rc->rc_all,
153 			    replay_cache_list, rce_alllink) {
154 				if (rce->rce_repmsg.rm_xid) {
155 					replay_free(rc, rce);
156 					freed_one = TRUE;
157 					break;
158 				}
159 			}
160 		} while (freed_one
161 		    && (rc->rc_count >= REPLAY_MAX
162 			|| rc->rc_size > rc->rc_maxsize));
163 	}
164 }
165 
166 enum replay_state
167 replay_find(struct replay_cache *rc, struct rpc_msg *msg,
168     struct sockaddr *addr, struct rpc_msg *repmsg, struct mbuf **mp)
169 {
170 	int h = HASHSTEP(HASHINIT, msg->rm_xid) % REPLAY_HASH_SIZE;
171 	struct replay_cache_entry *rce;
172 
173 	mtx_lock(&rc->rc_lock);
174 	TAILQ_FOREACH(rce, &rc->rc_cache[h], rce_link) {
175 		if (rce->rce_msg.rm_xid == msg->rm_xid
176 		    && rce->rce_msg.rm_call.cb_prog == msg->rm_call.cb_prog
177 		    && rce->rce_msg.rm_call.cb_vers == msg->rm_call.cb_vers
178 		    && rce->rce_msg.rm_call.cb_proc == msg->rm_call.cb_proc
179 		    && rce->rce_addr.ss_len == addr->sa_len
180 		    && bcmp(&rce->rce_addr, addr, addr->sa_len) == 0) {
181 			if (rce->rce_repmsg.rm_xid) {
182 				/*
183 				 * We have a reply for this
184 				 * message. Copy it and return. Keep
185 				 * replay_all LRU sorted
186 				 */
187 				TAILQ_REMOVE(&rc->rc_all, rce, rce_alllink);
188 				TAILQ_INSERT_HEAD(&rc->rc_all, rce,
189 				    rce_alllink);
190 				*repmsg = rce->rce_repmsg;
191 				if (rce->rce_repbody) {
192 					*mp = m_copym(rce->rce_repbody,
193 					    0, M_COPYALL, M_NOWAIT);
194 					mtx_unlock(&rc->rc_lock);
195 					if (!*mp)
196 						return (RS_ERROR);
197 				} else {
198 					mtx_unlock(&rc->rc_lock);
199 				}
200 				return (RS_DONE);
201 			} else {
202 				mtx_unlock(&rc->rc_lock);
203 				return (RS_INPROGRESS);
204 			}
205 		}
206 	}
207 
208 	replay_prune(rc);
209 
210 	rce = replay_alloc(rc, msg, addr, h);
211 
212 	mtx_unlock(&rc->rc_lock);
213 
214 	if (!rce)
215 		return (RS_ERROR);
216 	else
217 		return (RS_NEW);
218 }
219 
220 void
221 replay_setreply(struct replay_cache *rc,
222     struct rpc_msg *repmsg, struct sockaddr *addr, struct mbuf *m)
223 {
224 	int h = HASHSTEP(HASHINIT, repmsg->rm_xid) % REPLAY_HASH_SIZE;
225 	struct replay_cache_entry *rce;
226 
227 	/*
228 	 * Copy the reply before the lock so we can sleep.
229 	 */
230 	if (m)
231 		m = m_copym(m, 0, M_COPYALL, M_WAITOK);
232 
233 	mtx_lock(&rc->rc_lock);
234 	TAILQ_FOREACH(rce, &rc->rc_cache[h], rce_link) {
235 		if (rce->rce_msg.rm_xid == repmsg->rm_xid
236 		    && rce->rce_addr.ss_len == addr->sa_len
237 		    && bcmp(&rce->rce_addr, addr, addr->sa_len) == 0) {
238 			break;
239 		}
240 	}
241 	if (rce) {
242 		rce->rce_repmsg = *repmsg;
243 		rce->rce_repbody = m;
244 		if (m)
245 			rc->rc_size += m_length(m, NULL);
246 	}
247 	mtx_unlock(&rc->rc_lock);
248 }
249