xref: /freebsd/sys/rpc/replay.c (revision 123af6ec70016f5556da5972d4d63c7d175c06d3)
1 /*-
2  * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
3  *
4  * Copyright (c) 2008 Isilon Inc http://www.isilon.com/
5  * Authors: Doug Rabson <dfr@rabson.org>
6  * Developed with Red Inc: Alfred Perlstein <alfred@freebsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
21  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27  * SUCH DAMAGE.
28  */
29 
30 #include <sys/cdefs.h>
31 __FBSDID("$FreeBSD$");
32 
33 #include <sys/param.h>
34 #include <sys/hash.h>
35 #include <sys/kernel.h>
36 #include <sys/lock.h>
37 #include <sys/malloc.h>
38 #include <sys/mbuf.h>
39 #include <sys/mutex.h>
40 #include <sys/queue.h>
41 
42 #include <rpc/rpc.h>
43 #include <rpc/replay.h>
44 
45 struct replay_cache_entry {
46 	int		rce_hash;
47 	struct rpc_msg	rce_msg;
48 	struct sockaddr_storage rce_addr;
49 	struct rpc_msg	rce_repmsg;
50 	struct mbuf	*rce_repbody;
51 
52 	TAILQ_ENTRY(replay_cache_entry) rce_link;
53 	TAILQ_ENTRY(replay_cache_entry) rce_alllink;
54 };
55 TAILQ_HEAD(replay_cache_list, replay_cache_entry);
56 
/*
 * File-local helpers.  Each of them asserts that the cache's rc_lock is
 * owned by the caller.
 */
static struct replay_cache_entry *replay_alloc(struct replay_cache *rc,
    struct rpc_msg *msg, struct sockaddr *addr, int h);
static void replay_free(struct replay_cache *rc,
    struct replay_cache_entry *rce);
static void replay_prune(struct replay_cache *rc);
63 
64 #define REPLAY_HASH_SIZE	256
65 #define REPLAY_MAX		1024
66 
67 struct replay_cache {
68 	struct replay_cache_list	rc_cache[REPLAY_HASH_SIZE];
69 	struct replay_cache_list	rc_all;
70 	struct mtx			rc_lock;
71 	int				rc_count;
72 	size_t				rc_size;
73 	size_t				rc_maxsize;
74 };
75 
76 struct replay_cache *
77 replay_newcache(size_t maxsize)
78 {
79 	struct replay_cache *rc;
80 	int i;
81 
82 	rc = malloc(sizeof(*rc), M_RPC, M_WAITOK|M_ZERO);
83 	for (i = 0; i < REPLAY_HASH_SIZE; i++)
84 		TAILQ_INIT(&rc->rc_cache[i]);
85 	TAILQ_INIT(&rc->rc_all);
86 	mtx_init(&rc->rc_lock, "rc_lock", NULL, MTX_DEF);
87 	rc->rc_maxsize = maxsize;
88 
89 	return (rc);
90 }
91 
92 void
93 replay_setsize(struct replay_cache *rc, size_t newmaxsize)
94 {
95 
96 	mtx_lock(&rc->rc_lock);
97 	rc->rc_maxsize = newmaxsize;
98 	replay_prune(rc);
99 	mtx_unlock(&rc->rc_lock);
100 }
101 
102 void
103 replay_freecache(struct replay_cache *rc)
104 {
105 
106 	mtx_lock(&rc->rc_lock);
107 	while (TAILQ_FIRST(&rc->rc_all))
108 		replay_free(rc, TAILQ_FIRST(&rc->rc_all));
109 	mtx_destroy(&rc->rc_lock);
110 	free(rc, M_RPC);
111 }
112 
113 static struct replay_cache_entry *
114 replay_alloc(struct replay_cache *rc,
115     struct rpc_msg *msg, struct sockaddr *addr, int h)
116 {
117 	struct replay_cache_entry *rce;
118 
119 	mtx_assert(&rc->rc_lock, MA_OWNED);
120 
121 	rc->rc_count++;
122 	rce = malloc(sizeof(*rce), M_RPC, M_NOWAIT|M_ZERO);
123 	if (!rce)
124 		return (NULL);
125 	rce->rce_hash = h;
126 	rce->rce_msg = *msg;
127 	bcopy(addr, &rce->rce_addr, addr->sa_len);
128 
129 	TAILQ_INSERT_HEAD(&rc->rc_cache[h], rce, rce_link);
130 	TAILQ_INSERT_HEAD(&rc->rc_all, rce, rce_alllink);
131 
132 	return (rce);
133 }
134 
135 static void
136 replay_free(struct replay_cache *rc, struct replay_cache_entry *rce)
137 {
138 
139 	mtx_assert(&rc->rc_lock, MA_OWNED);
140 
141 	rc->rc_count--;
142 	TAILQ_REMOVE(&rc->rc_cache[rce->rce_hash], rce, rce_link);
143 	TAILQ_REMOVE(&rc->rc_all, rce, rce_alllink);
144 	if (rce->rce_repbody) {
145 		rc->rc_size -= m_length(rce->rce_repbody, NULL);
146 		m_freem(rce->rce_repbody);
147 	}
148 	free(rce, M_RPC);
149 }
150 
151 static void
152 replay_prune(struct replay_cache *rc)
153 {
154 	struct replay_cache_entry *rce;
155 
156 	mtx_assert(&rc->rc_lock, MA_OWNED);
157 
158 	if (rc->rc_count < REPLAY_MAX && rc->rc_size <= rc->rc_maxsize)
159 		return;
160 
161 	do {
162 		/*
163 		 * Try to free an entry. Don't free in-progress entries.
164 		 */
165 		TAILQ_FOREACH_REVERSE(rce, &rc->rc_all, replay_cache_list,
166 		    rce_alllink) {
167 			if (rce->rce_repmsg.rm_xid)
168 				break;
169 		}
170 		if (rce)
171 			replay_free(rc, rce);
172 	} while (rce && (rc->rc_count >= REPLAY_MAX
173 	    || rc->rc_size > rc->rc_maxsize));
174 }
175 
176 enum replay_state
177 replay_find(struct replay_cache *rc, struct rpc_msg *msg,
178     struct sockaddr *addr, struct rpc_msg *repmsg, struct mbuf **mp)
179 {
180 	int h = HASHSTEP(HASHINIT, msg->rm_xid) % REPLAY_HASH_SIZE;
181 	struct replay_cache_entry *rce;
182 
183 	mtx_lock(&rc->rc_lock);
184 	TAILQ_FOREACH(rce, &rc->rc_cache[h], rce_link) {
185 		if (rce->rce_msg.rm_xid == msg->rm_xid
186 		    && rce->rce_msg.rm_call.cb_prog == msg->rm_call.cb_prog
187 		    && rce->rce_msg.rm_call.cb_vers == msg->rm_call.cb_vers
188 		    && rce->rce_msg.rm_call.cb_proc == msg->rm_call.cb_proc
189 		    && rce->rce_addr.ss_len == addr->sa_len
190 		    && bcmp(&rce->rce_addr, addr, addr->sa_len) == 0) {
191 			if (rce->rce_repmsg.rm_xid) {
192 				/*
193 				 * We have a reply for this
194 				 * message. Copy it and return. Keep
195 				 * replay_all LRU sorted
196 				 */
197 				TAILQ_REMOVE(&rc->rc_all, rce, rce_alllink);
198 				TAILQ_INSERT_HEAD(&rc->rc_all, rce,
199 				    rce_alllink);
200 				*repmsg = rce->rce_repmsg;
201 				if (rce->rce_repbody) {
202 					*mp = m_copym(rce->rce_repbody,
203 					    0, M_COPYALL, M_NOWAIT);
204 					mtx_unlock(&rc->rc_lock);
205 					if (!*mp)
206 						return (RS_ERROR);
207 				} else {
208 					mtx_unlock(&rc->rc_lock);
209 				}
210 				return (RS_DONE);
211 			} else {
212 				mtx_unlock(&rc->rc_lock);
213 				return (RS_INPROGRESS);
214 			}
215 		}
216 	}
217 
218 	replay_prune(rc);
219 
220 	rce = replay_alloc(rc, msg, addr, h);
221 
222 	mtx_unlock(&rc->rc_lock);
223 
224 	if (!rce)
225 		return (RS_ERROR);
226 	else
227 		return (RS_NEW);
228 }
229 
230 void
231 replay_setreply(struct replay_cache *rc,
232     struct rpc_msg *repmsg, struct sockaddr *addr, struct mbuf *m)
233 {
234 	int h = HASHSTEP(HASHINIT, repmsg->rm_xid) % REPLAY_HASH_SIZE;
235 	struct replay_cache_entry *rce;
236 
237 	/*
238 	 * Copy the reply before the lock so we can sleep.
239 	 */
240 	if (m)
241 		m = m_copym(m, 0, M_COPYALL, M_WAITOK);
242 
243 	mtx_lock(&rc->rc_lock);
244 	TAILQ_FOREACH(rce, &rc->rc_cache[h], rce_link) {
245 		if (rce->rce_msg.rm_xid == repmsg->rm_xid
246 		    && rce->rce_addr.ss_len == addr->sa_len
247 		    && bcmp(&rce->rce_addr, addr, addr->sa_len) == 0) {
248 			break;
249 		}
250 	}
251 	if (rce) {
252 		rce->rce_repmsg = *repmsg;
253 		rce->rce_repbody = m;
254 		if (m)
255 			rc->rc_size += m_length(m, NULL);
256 	}
257 	mtx_unlock(&rc->rc_lock);
258 }
259