xref: /linux/net/rds/tcp_listen.c (revision 9611c0ce215a66770ccbe5c126bf57ba8c31bcad)
1 /*
2  * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * OpenIB.org BSD license below:
9  *
10  *     Redistribution and use in source and binary forms, with or
11  *     without modification, are permitted provided that the following
12  *     conditions are met:
13  *
14  *      - Redistributions of source code must retain the above
15  *        copyright notice, this list of conditions and the following
16  *        disclaimer.
17  *
18  *      - Redistributions in binary form must reproduce the above
19  *        copyright notice, this list of conditions and the following
20  *        disclaimer in the documentation and/or other materials
21  *        provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  *
32  */
33 #include <linux/kernel.h>
34 #include <linux/gfp.h>
35 #include <linux/in.h>
36 #include <net/tcp.h>
37 #include <trace/events/sock.h>
38 #include <net/net_namespace.h>
39 #include <net/netns/generic.h>
40 
41 #include "rds.h"
42 #include "tcp.h"
43 
44 void rds_tcp_keepalive(struct socket *sock)
45 {
46 	/* values below based on xs_udp_default_timeout */
47 	int keepidle = 5; /* send a probe 'keepidle' secs after last data */
48 	int keepcnt = 5; /* number of unack'ed probes before declaring dead */
49 
50 	sock_set_keepalive(sock->sk);
51 	tcp_sock_set_keepcnt(sock->sk, keepcnt);
52 	tcp_sock_set_keepidle(sock->sk, keepidle);
53 	/* KEEPINTVL is the interval between successive probes. We follow
54 	 * the model in xs_tcp_finish_connecting() and re-use keepidle.
55 	 */
56 	tcp_sock_set_keepintvl(sock->sk, keepidle);
57 }
58 
59 static int
60 rds_tcp_get_peer_sport(struct socket *sock)
61 {
62 	struct sock *sk = sock->sk;
63 
64 	if (!sk)
65 		return -1;
66 
67 	return ntohs(READ_ONCE(inet_sk(sk)->inet_dport));
68 }
69 
70 /* rds_tcp_accept_one_path(): if accepting on cp_index > 0, make sure the
71  * client's ipaddr < server's ipaddr. Otherwise, close the accepted
72  * socket and force a reconnect from smaller -> larger ip addr. The reason
73  * we special case cp_index 0 is to allow the rds probe ping itself to itself
74  * get through efficiently.
75  */
76 static struct rds_tcp_connection *
77 rds_tcp_accept_one_path(struct rds_connection *conn, struct socket *sock)
78 {
79 	int sport, npaths, i_min, i_max, i;
80 
81 	if (conn->c_with_sport_idx)
82 		/* cp->cp_index is encoded in lowest bits of source-port */
83 		sport = rds_tcp_get_peer_sport(sock);
84 	else
85 		sport = -1;
86 
87 	npaths = max_t(int, 1, conn->c_npaths);
88 
89 	if (sport >= 0) {
90 		i_min = sport % npaths;
91 		i_max = i_min;
92 	} else {
93 		i_min = 0;
94 		i_max = npaths - 1;
95 	}
96 
97 	for (i = i_min; i <= i_max; i++) {
98 		struct rds_conn_path *cp = &conn->c_path[i];
99 
100 		if (rds_conn_path_transition(cp, RDS_CONN_DOWN,
101 					     RDS_CONN_CONNECTING))
102 			return cp->cp_transport_data;
103 	}
104 
105 	return NULL;
106 }
107 
108 void rds_tcp_conn_slots_available(struct rds_connection *conn, bool fan_out)
109 {
110 	struct rds_tcp_connection *tc;
111 	struct rds_tcp_net *rtn;
112 	struct socket *sock;
113 	int sport, npaths;
114 
115 	if (rds_destroy_pending(conn))
116 		return;
117 
118 	tc = conn->c_path->cp_transport_data;
119 	rtn = tc->t_rtn;
120 	if (!rtn)
121 		return;
122 
123 	sock = tc->t_sock;
124 
125 	/* During fan-out, check that the connection we already
126 	 * accepted in slot#0 carried the proper source port modulo.
127 	 */
128 	if (fan_out && conn->c_with_sport_idx && sock &&
129 	    rds_addr_cmp(&conn->c_laddr, &conn->c_faddr) > 0) {
130 		/* cp->cp_index is encoded in lowest bits of source-port */
131 		sport = rds_tcp_get_peer_sport(sock);
132 		npaths = max_t(int, 1, conn->c_npaths);
133 		if (sport >= 0 && sport % npaths != 0)
134 			/* peer initiated with a non-#0 lane first */
135 			rds_conn_path_drop(conn->c_path, 0);
136 	}
137 
138 	/* As soon as a connection went down,
139 	 * it is safe to schedule a "rds_tcp_accept_one"
140 	 * attempt even if there are no connections pending:
141 	 * Function "rds_tcp_accept_one" won't block
142 	 * but simply return -EAGAIN in that case.
143 	 *
144 	 * Doing so is necessary to address the case where an
145 	 * incoming connection on "rds_tcp_listen_sock" is ready
146 	 * to be accepted prior to a free slot being available:
147 	 * the -ENOBUFS case in "rds_tcp_accept_one".
148 	 */
149 	rds_tcp_accept_work(rtn);
150 }
151 
/* rds_tcp_accept_one() - accept one incoming TCP connection, or retry a
 * socket stashed by an earlier call, and attach it to an RDS path.
 *
 * @rtn: RDS/TCP state that provides the listen socket, the accept mutex
 *       and the stashed-socket slot (rds_tcp_accepted_sock).
 *
 * Everything after the initial listen-socket check runs with
 * rtn->rds_tcp_accept_lock held. If a path slot is obtained, that path's
 * t_conn_path_lock is additionally held until the common exit.
 *
 * Return:
 *   0             the socket was attached to a path, or it was reset and
 *                 dropped (rst_nsk);
 *   -ENETUNREACH  there is no listen socket;
 *   -EINVAL       rds_tcp_tune() rejected the new socket;
 *   -EOPNOTSUPP   rds_tcp_laddr_check() failed for the peer address;
 *   -ENOBUFS      no path slot was free; the socket is kept in
 *                 rtn->rds_tcp_accepted_sock for a later call;
 *   otherwise     the error from kernel_accept() or rds_conn_create().
 */
int rds_tcp_accept_one(struct rds_tcp_net *rtn)
{
	struct socket *listen_sock = rtn->rds_tcp_listen_sock;
	struct socket *new_sock = NULL;
	struct rds_connection *conn;
	int ret;
	struct inet_sock *inet;
	struct rds_tcp_connection *rs_tcp = NULL;
	int conn_state;
	struct rds_conn_path *cp;
	struct sock *sk;
	struct in6_addr *my_addr, *peer_addr;
#if !IS_ENABLED(CONFIG_IPV6)
	struct in6_addr saddr, daddr;
#endif
	int dev_if = 0;

	if (!listen_sock) /* module unload or netns delete in progress */
		return -ENETUNREACH;

	mutex_lock(&rtn->rds_tcp_accept_lock);
	/* Pick up a socket stashed by a previous -ENOBUFS return, if any,
	 * and clear the stash so it has a single owner.
	 */
	new_sock = rtn->rds_tcp_accepted_sock;
	rtn->rds_tcp_accepted_sock = NULL;

	if (!new_sock) {
		/* Nothing stashed: take a fresh connection off the listener. */
		ret = kernel_accept(listen_sock, &new_sock, O_NONBLOCK);
		if (ret)
			goto out;

		rds_tcp_keepalive(new_sock);
		if (!rds_tcp_tune(new_sock)) {
			ret = -EINVAL;
			goto out;
		}
	}

	inet = inet_sk(new_sock->sk);

	/* Addresses are handled as in6_addr throughout; without IPv6
	 * support the IPv4 addresses are converted to v4-mapped form.
	 */
#if IS_ENABLED(CONFIG_IPV6)
	my_addr = &new_sock->sk->sk_v6_rcv_saddr;
	peer_addr = &new_sock->sk->sk_v6_daddr;
#else
	ipv6_addr_set_v4mapped(inet->inet_saddr, &saddr);
	ipv6_addr_set_v4mapped(inet->inet_daddr, &daddr);
	my_addr = &saddr;
	peer_addr = &daddr;
#endif
	rdsdebug("accepted family %d tcp %pI6c:%u -> %pI6c:%u\n",
		 listen_sock->sk->sk_family,
		 my_addr, ntohs(inet->inet_sport),
		 peer_addr, ntohs(inet->inet_dport));

#if IS_ENABLED(CONFIG_IPV6)
	/* sk_bound_dev_if is not set if the peer address is not link local
	 * address.  In this case, it happens that mcast_oif is set.  So
	 * just use it.
	 */
	if ((ipv6_addr_type(my_addr) & IPV6_ADDR_LINKLOCAL) &&
	    !(ipv6_addr_type(peer_addr) & IPV6_ADDR_LINKLOCAL)) {
		struct ipv6_pinfo *inet6;

		inet6 = inet6_sk(new_sock->sk);
		dev_if = READ_ONCE(inet6->mcast_oif);
	} else {
		dev_if = new_sock->sk->sk_bound_dev_if;
	}
#endif

	if (!rds_tcp_laddr_check(sock_net(listen_sock->sk), peer_addr, dev_if)) {
		/* local address connection is only allowed via loopback */
		ret = -EOPNOTSUPP;
		goto out;
	}

	conn = rds_conn_create(sock_net(listen_sock->sk),
			       my_addr, peer_addr,
			       &rds_tcp_transport, 0, GFP_KERNEL, dev_if);

	if (IS_ERR(conn)) {
		ret = PTR_ERR(conn);
		goto out;
	}
	/* An incoming SYN request came in, and TCP just accepted it.
	 *
	 * If the client reboots, this conn will need to be cleaned up.
	 * rds_tcp_state_change() will do that cleanup
	 */
	if (rds_addr_cmp(&conn->c_faddr, &conn->c_laddr) < 0) {
		/* Try to obtain a free connection slot.
		 * If unsuccessful, we need to preserve "new_sock"
		 * that we just accepted, since its "sk_receive_queue"
		 * may contain messages already that have been acknowledged
		 * to and discarded by the sender.
		 * We must not throw those away!
		 */
		rs_tcp = rds_tcp_accept_one_path(conn, new_sock);
		if (!rs_tcp) {
			/* It's okay to stash "new_sock", since
			 * "rds_tcp_conn_slots_available" triggers
			 * "rds_tcp_accept_one" again as soon as one of the
			 * connection slots becomes available again
			 */
			rtn->rds_tcp_accepted_sock = new_sock;
			/* Clearing new_sock keeps "out" from releasing it. */
			new_sock = NULL;
			ret = -ENOBUFS;
			goto out;
		}
	} else {
		/* This connection request came from a peer with
		 * a larger address.
		 * Function "rds_tcp_state_change" makes sure
		 * that the connection doesn't transition
		 * to state "RDS_CONN_UP", and therefore
		 * we should not have received any messages
		 * on this socket yet.
		 * This is the only case where it's okay to
		 * not dequeue messages from "sk_receive_queue".
		 */
		if (conn->c_npaths <= 1)
			rds_conn_path_connect_if_down(&conn->c_path[0]);
		/* No path lock is held on this branch; keep rs_tcp NULL so
		 * "out" does not try to unlock one.
		 */
		rs_tcp = NULL;
		goto rst_nsk;
	}

	mutex_lock(&rs_tcp->t_conn_path_lock);
	cp = rs_tcp->t_cpath;
	conn_state = rds_conn_path_state(cp);
	WARN_ON(conn_state == RDS_CONN_UP);
	/* Only a path that is connecting (or in error) may take the new
	 * socket; anything else is dropped and the socket is reset.
	 */
	if (conn_state != RDS_CONN_CONNECTING && conn_state != RDS_CONN_ERROR) {
		rds_conn_path_drop(cp, 0);
		goto rst_nsk;
	}
	/* Save a local pointer to sk and hold a reference before setting
	 * callbacks. Once callbacks are set, a concurrent
	 * rds_tcp_conn_path_shutdown() may call sock_release(), which
	 * sets new_sock->sk to NULL and drops a reference on sk.
	 * The local pointer lets us safely access sk_state below even
	 * if new_sock->sk has been nulled, and sock_hold() keeps sk
	 * itself valid until we are done.
	 */
	sk = new_sock->sk;
	sock_hold(sk);

	if (rs_tcp->t_sock) {
		/* Duelling SYN has been handled in rds_tcp_accept_one() */
		rds_tcp_reset_callbacks(new_sock, cp);
		/* rds_connect_path_complete() marks RDS_CONN_UP */
		rds_connect_path_complete(cp, RDS_CONN_RESETTING);
	} else {
		rds_tcp_set_callbacks(new_sock, cp);
		rds_connect_path_complete(cp, RDS_CONN_CONNECTING);
	}

	/* Since "rds_tcp_set_callbacks" happens this late
	 * the connection may already have been closed without
	 * "rds_tcp_state_change" doing its due diligence.
	 *
	 * If that's the case, we simply drop the path,
	 * knowing that "rds_tcp_conn_path_shutdown" will
	 * dequeue pending messages.
	 */
	if (READ_ONCE(sk->sk_state) == TCP_CLOSE_WAIT ||
	    READ_ONCE(sk->sk_state) == TCP_LAST_ACK ||
	    READ_ONCE(sk->sk_state) == TCP_CLOSE)
		rds_conn_path_drop(cp, 0);
	else
		queue_delayed_work(cp->cp_wq, &cp->cp_recv_w, 0);

	/* Pairs with the sock_hold() taken before installing callbacks. */
	sock_put(sk);

	/* Callbacks are installed on the socket, so it must not be
	 * released at "out".
	 */
	new_sock = NULL;
	ret = 0;
	/* NOTE(review): c_npaths == 0 presumably means the path count has
	 * not been negotiated with the peer yet - confirm.
	 */
	if (conn->c_npaths == 0)
		rds_send_ping(cp->cp_conn, cp->cp_index);
	goto out;
rst_nsk:
	/* reset the newly returned accept sock and bail.
	 * It is safe to set linger on new_sock because the RDS connection
	 * has not been brought up on new_sock, so no RDS-level data could
	 * be pending on it. By setting linger, we achieve the side-effect
	 * of avoiding TIME_WAIT state on new_sock.
	 */
	sock_no_linger(new_sock->sk);
	kernel_sock_shutdown(new_sock, SHUT_RDWR);
	ret = 0;
out:
	/* rs_tcp is non-NULL here only if t_conn_path_lock was taken. */
	if (rs_tcp)
		mutex_unlock(&rs_tcp->t_conn_path_lock);
	/* new_sock is still set only when this call owns a socket that was
	 * neither attached to a path nor stashed.
	 */
	if (new_sock)
		sock_release(new_sock);

	mutex_unlock(&rtn->rds_tcp_accept_lock);

	return ret;
}
347 
348 void rds_tcp_listen_data_ready(struct sock *sk)
349 {
350 	void (*ready)(struct sock *sk);
351 
352 	trace_sk_data_ready(sk);
353 	rdsdebug("listen data ready sk %p\n", sk);
354 
355 	read_lock_bh(&sk->sk_callback_lock);
356 	ready = sk->sk_user_data;
357 	if (!ready) { /* check for teardown race */
358 		ready = sk->sk_data_ready;
359 		goto out;
360 	}
361 
362 	/*
363 	 * ->sk_data_ready is also called for a newly established child socket
364 	 * before it has been accepted and the accepter has set up their
365 	 * data_ready.. we only want to queue listen work for our listening
366 	 * socket
367 	 *
368 	 * (*ready)() may be null if we are racing with netns delete, and
369 	 * the listen socket is being torn down.
370 	 */
371 	if (sk->sk_state == TCP_LISTEN)
372 		rds_tcp_accept_work(net_generic(sock_net(sk), rds_tcp_netid));
373 	else
374 		ready = rds_tcp_listen_sock_def_readable(sock_net(sk));
375 
376 out:
377 	read_unlock_bh(&sk->sk_callback_lock);
378 	if (ready)
379 		ready(sk);
380 }
381 
382 struct socket *rds_tcp_listen_init(struct net *net, bool isv6)
383 {
384 	struct socket *sock = NULL;
385 	struct sockaddr_storage ss;
386 	struct sockaddr_in6 *sin6;
387 	struct sockaddr_in *sin;
388 	int addr_len;
389 	int ret;
390 
391 	ret = sock_create_kern(net, isv6 ? PF_INET6 : PF_INET, SOCK_STREAM,
392 			       IPPROTO_TCP, &sock);
393 	if (ret < 0) {
394 		rdsdebug("could not create %s listener socket: %d\n",
395 			 isv6 ? "IPv6" : "IPv4", ret);
396 		goto out;
397 	}
398 
399 	sock->sk->sk_reuse = SK_CAN_REUSE;
400 	tcp_sock_set_nodelay(sock->sk);
401 
402 	write_lock_bh(&sock->sk->sk_callback_lock);
403 	sock->sk->sk_user_data = sock->sk->sk_data_ready;
404 	sock->sk->sk_data_ready = rds_tcp_listen_data_ready;
405 	write_unlock_bh(&sock->sk->sk_callback_lock);
406 
407 	if (isv6) {
408 		sin6 = (struct sockaddr_in6 *)&ss;
409 		sin6->sin6_family = PF_INET6;
410 		sin6->sin6_addr = in6addr_any;
411 		sin6->sin6_port = htons(RDS_TCP_PORT);
412 		sin6->sin6_scope_id = 0;
413 		sin6->sin6_flowinfo = 0;
414 		addr_len = sizeof(*sin6);
415 	} else {
416 		sin = (struct sockaddr_in *)&ss;
417 		sin->sin_family = PF_INET;
418 		sin->sin_addr.s_addr = htonl(INADDR_ANY);
419 		sin->sin_port = htons(RDS_TCP_PORT);
420 		addr_len = sizeof(*sin);
421 	}
422 
423 	ret = kernel_bind(sock, (struct sockaddr_unsized *)&ss, addr_len);
424 	if (ret < 0) {
425 		rdsdebug("could not bind %s listener socket: %d\n",
426 			 isv6 ? "IPv6" : "IPv4", ret);
427 		goto out;
428 	}
429 
430 	ret = sock->ops->listen(sock, 64);
431 	if (ret < 0)
432 		goto out;
433 
434 	return sock;
435 out:
436 	if (sock)
437 		sock_release(sock);
438 	return NULL;
439 }
440 
441 void rds_tcp_listen_stop(struct socket *sock, struct work_struct *acceptor)
442 {
443 	struct sock *sk;
444 
445 	if (!sock)
446 		return;
447 
448 	sk = sock->sk;
449 
450 	/* serialize with and prevent further callbacks */
451 	lock_sock(sk);
452 	write_lock_bh(&sk->sk_callback_lock);
453 	if (sk->sk_user_data) {
454 		sk->sk_data_ready = sk->sk_user_data;
455 		sk->sk_user_data = NULL;
456 	}
457 	write_unlock_bh(&sk->sk_callback_lock);
458 	release_sock(sk);
459 
460 	/* wait for accepts to stop and close the socket */
461 	flush_workqueue(rds_wq);
462 	flush_work(acceptor);
463 	sock_release(sock);
464 }
465