xref: /freebsd/sys/rpc/replay.c (revision 7a0c41d5d7d4e9770ef6f5d56f893efc8f18ab7c)
1 /*-
2  * Copyright (c) 2008 Isilon Inc http://www.isilon.com/
3  * Authors: Doug Rabson <dfr@rabson.org>
4  * Developed with Red Inc: Alfred Perlstein <alfred@freebsd.org>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25  * SUCH DAMAGE.
26  */
27 
28 #include <sys/cdefs.h>
29 __FBSDID("$FreeBSD$");
30 
31 #include <sys/param.h>
32 #include <sys/hash.h>
33 #include <sys/kernel.h>
34 #include <sys/lock.h>
35 #include <sys/malloc.h>
36 #include <sys/mbuf.h>
37 #include <sys/mutex.h>
38 #include <sys/queue.h>
39 
40 #include <rpc/rpc.h>
41 #include <rpc/replay.h>
42 
43 struct replay_cache_entry {
44 	int		rce_hash;
45 	struct rpc_msg	rce_msg;
46 	struct sockaddr_storage rce_addr;
47 	struct rpc_msg	rce_repmsg;
48 	struct mbuf	*rce_repbody;
49 
50 	TAILQ_ENTRY(replay_cache_entry) rce_link;
51 	TAILQ_ENTRY(replay_cache_entry) rce_alllink;
52 };
53 TAILQ_HEAD(replay_cache_list, replay_cache_entry);
54 
/*
 * Forward declarations of the file-local helpers.  Each of them asserts
 * that the cache mutex is owned by the caller.
 */
static struct replay_cache_entry *replay_alloc(struct replay_cache *rc,
    struct rpc_msg *msg, struct sockaddr *addr, int h);
static void replay_free(struct replay_cache *rc,
    struct replay_cache_entry *rce);
static void replay_prune(struct replay_cache *rc);
61 
62 #define REPLAY_HASH_SIZE	256
63 #define REPLAY_MAX		1024
64 
65 struct replay_cache {
66 	struct replay_cache_list	rc_cache[REPLAY_HASH_SIZE];
67 	struct replay_cache_list	rc_all;
68 	struct mtx			rc_lock;
69 	int				rc_count;
70 	size_t				rc_size;
71 	size_t				rc_maxsize;
72 };
73 
74 struct replay_cache *
75 replay_newcache(size_t maxsize)
76 {
77 	struct replay_cache *rc;
78 	int i;
79 
80 	rc = malloc(sizeof(*rc), M_RPC, M_WAITOK|M_ZERO);
81 	for (i = 0; i < REPLAY_HASH_SIZE; i++)
82 		TAILQ_INIT(&rc->rc_cache[i]);
83 	TAILQ_INIT(&rc->rc_all);
84 	mtx_init(&rc->rc_lock, "rc_lock", NULL, MTX_DEF);
85 	rc->rc_maxsize = maxsize;
86 
87 	return (rc);
88 }
89 
90 void
91 replay_setsize(struct replay_cache *rc, size_t newmaxsize)
92 {
93 
94 	mtx_lock(&rc->rc_lock);
95 	rc->rc_maxsize = newmaxsize;
96 	replay_prune(rc);
97 	mtx_unlock(&rc->rc_lock);
98 }
99 
100 void
101 replay_freecache(struct replay_cache *rc)
102 {
103 
104 	mtx_lock(&rc->rc_lock);
105 	while (TAILQ_FIRST(&rc->rc_all))
106 		replay_free(rc, TAILQ_FIRST(&rc->rc_all));
107 	mtx_destroy(&rc->rc_lock);
108 	free(rc, M_RPC);
109 }
110 
111 static struct replay_cache_entry *
112 replay_alloc(struct replay_cache *rc,
113     struct rpc_msg *msg, struct sockaddr *addr, int h)
114 {
115 	struct replay_cache_entry *rce;
116 
117 	mtx_assert(&rc->rc_lock, MA_OWNED);
118 
119 	rc->rc_count++;
120 	rce = malloc(sizeof(*rce), M_RPC, M_NOWAIT|M_ZERO);
121 	if (!rce)
122 		return (NULL);
123 	rce->rce_hash = h;
124 	rce->rce_msg = *msg;
125 	bcopy(addr, &rce->rce_addr, addr->sa_len);
126 
127 	TAILQ_INSERT_HEAD(&rc->rc_cache[h], rce, rce_link);
128 	TAILQ_INSERT_HEAD(&rc->rc_all, rce, rce_alllink);
129 
130 	return (rce);
131 }
132 
133 static void
134 replay_free(struct replay_cache *rc, struct replay_cache_entry *rce)
135 {
136 
137 	mtx_assert(&rc->rc_lock, MA_OWNED);
138 
139 	rc->rc_count--;
140 	TAILQ_REMOVE(&rc->rc_cache[rce->rce_hash], rce, rce_link);
141 	TAILQ_REMOVE(&rc->rc_all, rce, rce_alllink);
142 	if (rce->rce_repbody) {
143 		rc->rc_size -= m_length(rce->rce_repbody, NULL);
144 		m_freem(rce->rce_repbody);
145 	}
146 	free(rce, M_RPC);
147 }
148 
149 static void
150 replay_prune(struct replay_cache *rc)
151 {
152 	struct replay_cache_entry *rce;
153 
154 	mtx_assert(&rc->rc_lock, MA_OWNED);
155 
156 	if (rc->rc_count < REPLAY_MAX && rc->rc_size <= rc->rc_maxsize)
157 		return;
158 
159 	do {
160 		/*
161 		 * Try to free an entry. Don't free in-progress entries.
162 		 */
163 		TAILQ_FOREACH_REVERSE(rce, &rc->rc_all, replay_cache_list,
164 		    rce_alllink) {
165 			if (rce->rce_repmsg.rm_xid)
166 				break;
167 		}
168 		if (rce)
169 			replay_free(rc, rce);
170 	} while (rce && (rc->rc_count >= REPLAY_MAX
171 	    || rc->rc_size > rc->rc_maxsize));
172 }
173 
174 enum replay_state
175 replay_find(struct replay_cache *rc, struct rpc_msg *msg,
176     struct sockaddr *addr, struct rpc_msg *repmsg, struct mbuf **mp)
177 {
178 	int h = HASHSTEP(HASHINIT, msg->rm_xid) % REPLAY_HASH_SIZE;
179 	struct replay_cache_entry *rce;
180 
181 	mtx_lock(&rc->rc_lock);
182 	TAILQ_FOREACH(rce, &rc->rc_cache[h], rce_link) {
183 		if (rce->rce_msg.rm_xid == msg->rm_xid
184 		    && rce->rce_msg.rm_call.cb_prog == msg->rm_call.cb_prog
185 		    && rce->rce_msg.rm_call.cb_vers == msg->rm_call.cb_vers
186 		    && rce->rce_msg.rm_call.cb_proc == msg->rm_call.cb_proc
187 		    && rce->rce_addr.ss_len == addr->sa_len
188 		    && bcmp(&rce->rce_addr, addr, addr->sa_len) == 0) {
189 			if (rce->rce_repmsg.rm_xid) {
190 				/*
191 				 * We have a reply for this
192 				 * message. Copy it and return. Keep
193 				 * replay_all LRU sorted
194 				 */
195 				TAILQ_REMOVE(&rc->rc_all, rce, rce_alllink);
196 				TAILQ_INSERT_HEAD(&rc->rc_all, rce,
197 				    rce_alllink);
198 				*repmsg = rce->rce_repmsg;
199 				if (rce->rce_repbody) {
200 					*mp = m_copym(rce->rce_repbody,
201 					    0, M_COPYALL, M_NOWAIT);
202 					mtx_unlock(&rc->rc_lock);
203 					if (!*mp)
204 						return (RS_ERROR);
205 				} else {
206 					mtx_unlock(&rc->rc_lock);
207 				}
208 				return (RS_DONE);
209 			} else {
210 				mtx_unlock(&rc->rc_lock);
211 				return (RS_INPROGRESS);
212 			}
213 		}
214 	}
215 
216 	replay_prune(rc);
217 
218 	rce = replay_alloc(rc, msg, addr, h);
219 
220 	mtx_unlock(&rc->rc_lock);
221 
222 	if (!rce)
223 		return (RS_ERROR);
224 	else
225 		return (RS_NEW);
226 }
227 
228 void
229 replay_setreply(struct replay_cache *rc,
230     struct rpc_msg *repmsg, struct sockaddr *addr, struct mbuf *m)
231 {
232 	int h = HASHSTEP(HASHINIT, repmsg->rm_xid) % REPLAY_HASH_SIZE;
233 	struct replay_cache_entry *rce;
234 
235 	/*
236 	 * Copy the reply before the lock so we can sleep.
237 	 */
238 	if (m)
239 		m = m_copym(m, 0, M_COPYALL, M_WAITOK);
240 
241 	mtx_lock(&rc->rc_lock);
242 	TAILQ_FOREACH(rce, &rc->rc_cache[h], rce_link) {
243 		if (rce->rce_msg.rm_xid == repmsg->rm_xid
244 		    && rce->rce_addr.ss_len == addr->sa_len
245 		    && bcmp(&rce->rce_addr, addr, addr->sa_len) == 0) {
246 			break;
247 		}
248 	}
249 	if (rce) {
250 		rce->rce_repmsg = *repmsg;
251 		rce->rce_repbody = m;
252 		if (m)
253 			rc->rc_size += m_length(m, NULL);
254 	}
255 	mtx_unlock(&rc->rc_lock);
256 }
257