// SPDX-License-Identifier: GPL-2.0-or-later
/* AF_RXRPC sendmsg() implementation.
 *
 * Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/net.h>
#include <linux/gfp.h>
#include <linux/skbuff.h>
#include <linux/export.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

/*
 * Propose an abort to be made in the I/O thread.
 */
bool rxrpc_propose_abort(struct rxrpc_call *call, s32 abort_code, int error,
			 enum rxrpc_abort_reason why)
{
	_enter("{%d},%d,%d,%u", call->debug_id, abort_code, error, why);

	if (!call->send_abort && !rxrpc_call_is_complete(call)) {
		call->send_abort_why = why;
		call->send_abort_err = error;
		call->send_abort_seq = 0;
		trace_rxrpc_abort_call(call, abort_code);
		/* Request abort locklessly vs rxrpc_input_call_event(). */
		smp_store_release(&call->send_abort, abort_code);
		rxrpc_poke_call(call, rxrpc_call_poke_abort);
		return true;
	}

	return false;
}
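
/* Illustrative sketch (not part of this file): the consuming side of the
 * smp_store_release() above.  The I/O thread - rxrpc_input_call_event() - is
 * assumed to pick the code up with a paired smp_load_acquire() so that the
 * send_abort_why/err/seq stores are visible before the code itself, and
 * rxrpc_abort_call() is assumed to take (call, seq, code, error, why) as it
 * does elsewhere in this tree.  The helper name is hypothetical.
 */
static inline void rxrpc_example_consume_abort(struct rxrpc_call *call)
{
	s32 abort_code = smp_load_acquire(&call->send_abort);

	if (abort_code)
		rxrpc_abort_call(call, call->send_abort_seq, abort_code,
				 call->send_abort_err, call->send_abort_why);
}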

/*
 * Wait for a call to become connected.  Interruption here doesn't cause the
 * call to be aborted.
 */
static int rxrpc_wait_to_be_connected(struct rxrpc_call *call, long *timeo)
{
	DECLARE_WAITQUEUE(myself, current);
	int ret = 0;

	_enter("%d", call->debug_id);

	if (rxrpc_call_state(call) != RXRPC_CALL_CLIENT_AWAIT_CONN)
		goto no_wait;

	add_wait_queue_exclusive(&call->waitq, &myself);

	for (;;) {
		switch (call->interruptibility) {
		case RXRPC_INTERRUPTIBLE:
		case RXRPC_PREINTERRUPTIBLE:
			set_current_state(TASK_INTERRUPTIBLE);
			break;
		case RXRPC_UNINTERRUPTIBLE:
		default:
			set_current_state(TASK_UNINTERRUPTIBLE);
			break;
		}

		if (rxrpc_call_state(call) != RXRPC_CALL_CLIENT_AWAIT_CONN)
			break;
		if ((call->interruptibility == RXRPC_INTERRUPTIBLE ||
		     call->interruptibility == RXRPC_PREINTERRUPTIBLE) &&
		    signal_pending(current)) {
			ret = sock_intr_errno(*timeo);
			break;
		}
		*timeo = schedule_timeout(*timeo);
	}

	remove_wait_queue(&call->waitq, &myself);
	__set_current_state(TASK_RUNNING);

no_wait:
	if (ret == 0 && rxrpc_call_is_complete(call))
		ret = call->error;

	_leave(" = %d", ret);
	return ret;
}

/*
 * Return true if there's sufficient Tx queue space.
 */
static bool rxrpc_check_tx_space(struct rxrpc_call *call, rxrpc_seq_t *_tx_win)
{
	rxrpc_seq_t tx_bottom = READ_ONCE(call->tx_bottom);

	if (_tx_win)
		*_tx_win = tx_bottom;
	return call->send_top - tx_bottom < 256;
}
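
/* Worked example of why the unsigned subtraction above is wrap-safe:
 * rxrpc_seq_t is a 32-bit unsigned type, so the difference is the number of
 * segments in flight even when send_top has wrapped past zero.  The values
 * below are illustrative:
 *
 *	send_top = 0x00000005, tx_bottom = 0xfffffffe
 *	0x00000005 - 0xfffffffe = 7, and 7 < 256, so there is space
 */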

/*
 * Wait for space to appear in the Tx queue or a signal to occur.
 */
static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
					 struct rxrpc_call *call,
					 long *timeo)
{
	for (;;) {
		set_current_state(TASK_INTERRUPTIBLE);
		if (rxrpc_check_tx_space(call, NULL))
			return 0;

		if (rxrpc_call_is_complete(call))
			return call->error;

		if (signal_pending(current))
			return sock_intr_errno(*timeo);

		trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
		*timeo = schedule_timeout(*timeo);
	}
}

/*
 * Wait for space to appear in the Tx queue uninterruptibly, but abort with
 * -EINTR if a signal occurred and no progress was made within 2*RTT.
 */
static int rxrpc_wait_for_tx_window_waitall(struct rxrpc_sock *rx,
					    struct rxrpc_call *call)
{
	rxrpc_seq_t tx_start, tx_win;
	signed long rtt, timeout;

	rtt = READ_ONCE(call->srtt_us) >> 3;
	rtt = usecs_to_jiffies(rtt) * 2;
	if (rtt < 2)
		rtt = 2;

	timeout = rtt;
	tx_start = READ_ONCE(call->tx_bottom);

	for (;;) {
		set_current_state(TASK_UNINTERRUPTIBLE);

		if (rxrpc_check_tx_space(call, &tx_win))
			return 0;

		if (rxrpc_call_is_complete(call))
			return call->error;

		if (timeout == 0 &&
		    tx_win == tx_start && signal_pending(current))
			return -EINTR;

		if (tx_win != tx_start) {
			timeout = rtt;
			tx_start = tx_win;
		}

		trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
		timeout = schedule_timeout(timeout);
	}
}
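
/* Worked example for the 2*RTT timeout above, assuming (as the ">> 3"
 * suggests) that srtt_us holds the smoothed RTT scaled by 8, as TCP does:
 *
 *	srtt_us = 4000  =>  rtt = 4000 >> 3 = 500us
 *	usecs_to_jiffies(500) * 2 = 2 jiffies at HZ=1000
 *
 * and the "if (rtt < 2)" clamp guarantees at least two jiffies even for a
 * zero RTT estimate.
 */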

/*
 * Wait for space to appear in the Tx queue uninterruptibly.
 */
static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
					    struct rxrpc_call *call,
					    long *timeo)
{
	for (;;) {
		set_current_state(TASK_UNINTERRUPTIBLE);
		if (rxrpc_check_tx_space(call, NULL))
			return 0;

		if (rxrpc_call_is_complete(call))
			return call->error;

		trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
		*timeo = schedule_timeout(*timeo);
	}
}

/*
 * Wait for space to appear in the transmit/ACK window.
 * - The caller holds the socket locked.
 */
static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
				    struct rxrpc_call *call,
				    long *timeo,
				    bool waitall)
{
	DECLARE_WAITQUEUE(myself, current);
	int ret;

	_enter(",{%u,%u,%u}",
	       call->tx_bottom, call->tx_top, call->tx_winsize);

	add_wait_queue(&call->waitq, &myself);

	switch (call->interruptibility) {
	case RXRPC_INTERRUPTIBLE:
		if (waitall)
			ret = rxrpc_wait_for_tx_window_waitall(rx, call);
		else
			ret = rxrpc_wait_for_tx_window_intr(rx, call, timeo);
		break;
	case RXRPC_PREINTERRUPTIBLE:
	case RXRPC_UNINTERRUPTIBLE:
	default:
		ret = rxrpc_wait_for_tx_window_nonintr(rx, call, timeo);
		break;
	}

	remove_wait_queue(&call->waitq, &myself);
	set_current_state(TASK_RUNNING);
	_leave(" = %d", ret);
	return ret;
}

/*
 * Notify the owner of the call that the transmit phase is ended and the last
 * packet has been queued.
 */
static void rxrpc_notify_end_tx(struct rxrpc_sock *rx, struct rxrpc_call *call,
				rxrpc_notify_end_tx_t notify_end_tx)
{
	if (notify_end_tx)
		notify_end_tx(&rx->sk, call, call->user_call_ID);
}

/*
 * Queue a DATA packet for transmission and, if the Tx queue was previously
 * empty, poke the I/O thread to begin sending.
 */
static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
			       struct rxrpc_txbuf *txb,
			       rxrpc_notify_end_tx_t notify_end_tx)
{
	struct rxrpc_txqueue *sq = call->send_queue;
	rxrpc_seq_t seq = txb->seq;
	bool poke, last = txb->flags & RXRPC_LAST_PACKET;
	int ix = seq & RXRPC_TXQ_MASK;

	rxrpc_inc_stat(call->rxnet, stat_tx_data);

	ASSERTCMP(txb->seq, ==, call->send_top + 1);

	if (last)
		trace_rxrpc_txqueue(call, rxrpc_txqueue_queue_last);
	else
		trace_rxrpc_txqueue(call, rxrpc_txqueue_queue);

	if (WARN_ON_ONCE(sq->bufs[ix]))
		trace_rxrpc_tq(call, sq, seq, rxrpc_tq_queue_dup);
	else
		trace_rxrpc_tq(call, sq, seq, rxrpc_tq_queue);

	/* Add the packet to the call's output buffer */
	poke = (READ_ONCE(call->tx_bottom) == call->send_top);
	sq->bufs[ix] = txb;
	/* Order send_top after the queue->next pointer and txb content. */
	smp_store_release(&call->send_top, seq);
	if (last) {
		set_bit(RXRPC_CALL_TX_NO_MORE, &call->flags);
		rxrpc_notify_end_tx(rx, call, notify_end_tx);
		call->send_queue = NULL;
	}

	if (poke)
		rxrpc_poke_call(call, rxrpc_call_poke_start);
}
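
/* Illustrative sketch of the consuming side of the release above: the I/O
 * thread is assumed to sample send_top with a paired smp_load_acquire() so
 * that sq->bufs[ix] and the txb contents are guaranteed visible before the
 * new sequence number is.  The helper below is hypothetical.
 */
static inline bool rxrpc_example_tx_pending(struct rxrpc_call *call)
{
	rxrpc_seq_t send_top = smp_load_acquire(&call->send_top);

	/* Unsigned wrap-safe: non-zero iff segments are queued but unsent. */
	return send_top - call->tx_top != 0;
}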

/*
 * Allocate a new txqueue unit and add it to the transmission queue.
 */
static int rxrpc_alloc_txqueue(struct sock *sk, struct rxrpc_call *call)
{
	struct rxrpc_txqueue *tq;

	tq = kzalloc(sizeof(*tq), sk->sk_allocation);
	if (!tq)
		return -ENOMEM;

	tq->xmit_ts_base = KTIME_MIN;
	for (int i = 0; i < RXRPC_NR_TXQUEUE; i++)
		tq->segment_xmit_ts[i] = UINT_MAX;

	if (call->send_queue) {
		tq->qbase = call->send_top + 1;
		call->send_queue->next = tq;
		call->send_queue = tq;
	} else if (WARN_ON(call->tx_queue)) {
		kfree(tq);
		return -ENOMEM;
	} else {
		/* We start at seq 1, so pretend seq 0 is hard-acked. */
		tq->nr_reported_acks = 1;
		tq->segment_acked = 1UL;
		tq->qbase = 0;
		call->tx_qbase = 0;
		call->send_queue = tq;
		call->tx_qtail = tq;
		call->tx_queue = tq;
	}

	trace_rxrpc_tq(call, tq, call->send_top, rxrpc_tq_alloc);
	return 0;
}
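
/* Illustrative sketch of how a queued segment can be found again later.
 * Each rxrpc_txqueue covers RXRPC_NR_TXQUEUE consecutive sequence numbers
 * starting at tq->qbase, so a lookup walks the chain and applies the same
 * RXRPC_TXQ_MASK index used when queuing.  The helper is hypothetical.
 */
static struct rxrpc_txbuf *rxrpc_example_find_txbuf(struct rxrpc_call *call,
						    rxrpc_seq_t seq)
{
	struct rxrpc_txqueue *tq;

	for (tq = call->tx_queue; tq; tq = tq->next)
		if (seq - tq->qbase < RXRPC_NR_TXQUEUE) /* wrap-safe */
			return tq->bufs[seq & RXRPC_TXQ_MASK];
	return NULL;
}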

/*
 * Send data through a socket.
 * - Must be called in process context.
 * - The caller holds the call user access mutex, but not the socket lock.
 */
static int rxrpc_send_data(struct rxrpc_sock *rx,
			   struct rxrpc_call *call,
			   struct msghdr *msg, size_t len,
			   rxrpc_notify_end_tx_t notify_end_tx,
			   bool *_dropped_lock)
{
	struct rxrpc_txbuf *txb;
	struct sock *sk = &rx->sk;
	enum rxrpc_call_state state;
	long timeo;
	bool more = msg->msg_flags & MSG_MORE;
	int ret, copied = 0;

	if (test_bit(RXRPC_CALL_TX_NO_MORE, &call->flags)) {
		trace_rxrpc_abort(call->debug_id, rxrpc_sendmsg_late_send,
				  call->cid, call->call_id, call->rx_consumed,
				  0, -EPROTO);
		return -EPROTO;
	}

	timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);

	ret = rxrpc_wait_to_be_connected(call, &timeo);
	if (ret < 0)
		return ret;

	if (call->conn->state == RXRPC_CONN_CLIENT_UNSECURED) {
		ret = rxrpc_init_client_conn_security(call->conn);
		if (ret < 0)
			return ret;
	}

	/* this should be in poll */
	sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);

reload:
	txb = call->tx_pending;
	call->tx_pending = NULL;
	if (txb)
		rxrpc_see_txbuf(txb, rxrpc_txbuf_see_send_more);

	ret = -EPIPE;
	if (sk->sk_shutdown & SEND_SHUTDOWN)
		goto maybe_error;
	state = rxrpc_call_state(call);
	ret = -ESHUTDOWN;
	if (state >= RXRPC_CALL_COMPLETE)
		goto maybe_error;
	ret = -EPROTO;
	if (state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
	    state != RXRPC_CALL_SERVER_ACK_REQUEST &&
	    state != RXRPC_CALL_SERVER_SEND_REPLY) {
		/* Request phase complete for this client call */
		trace_rxrpc_abort(call->debug_id, rxrpc_sendmsg_late_send,
				  call->cid, call->call_id, call->rx_consumed,
				  0, -EPROTO);
		goto maybe_error;
	}

	ret = -EMSGSIZE;
	if (call->tx_total_len != -1) {
		if (len - copied > call->tx_total_len)
			goto maybe_error;
		if (!more && len - copied != call->tx_total_len)
			goto maybe_error;
	}

	do {
		if (!txb) {
			size_t remain;

			_debug("alloc");

			if (!rxrpc_check_tx_space(call, NULL))
				goto wait_for_space;

			/* See if we need to begin/extend the Tx queue. */
			if (!call->send_queue ||
			    !((call->send_top + 1) & RXRPC_TXQ_MASK)) {
				ret = rxrpc_alloc_txqueue(sk, call);
				if (ret < 0)
					goto maybe_error;
			}

			/* Work out the maximum size of a packet.  Assume that
			 * the security header is going to be in the padded
			 * region (enc blocksize), but the trailer is not.
			 */
			remain = more ? INT_MAX : msg_data_left(msg);
			txb = call->conn->security->alloc_txbuf(call, remain,
								sk->sk_allocation);
			if (!txb) {
				ret = -ENOMEM;
				goto maybe_error;
			}
		}

		_debug("append");

		/* Append the next segment of data to the current buffer. */
		if (msg_data_left(msg) > 0) {
			size_t copy = umin(txb->space, msg_data_left(msg));

			_debug("add %zu", copy);
			if (!copy_from_iter_full(txb->data + txb->offset,
						 copy, &msg->msg_iter))
				goto efault;
			_debug("added");
			txb->space -= copy;
			txb->len += copy;
			txb->offset += copy;
			copied += copy;
			if (call->tx_total_len != -1)
				call->tx_total_len -= copy;
		}

		/* Check for the far side aborting the call or a network error
		 * occurring.
		 */
		if (rxrpc_call_is_complete(call))
			goto call_terminated;

		/* Add the packet to the send queue if it's now full. */
		if (!txb->space ||
		    (msg_data_left(msg) == 0 && !more)) {
			if (msg_data_left(msg) == 0 && !more)
				txb->flags |= RXRPC_LAST_PACKET;

			ret = call->security->secure_packet(call, txb);
			if (ret < 0)
				goto out;
			rxrpc_queue_packet(rx, call, txb, notify_end_tx);
			txb = NULL;
		}
	} while (msg_data_left(msg) > 0);

success:
	ret = copied;
	if (rxrpc_call_is_complete(call) &&
	    call->error < 0)
		ret = call->error;
out:
	call->tx_pending = txb;
	_leave(" = %d", ret);
	return ret;

call_terminated:
	rxrpc_put_txbuf(txb, rxrpc_txbuf_put_send_aborted);
	_leave(" = %d", call->error);
	return call->error;

maybe_error:
	if (copied)
		goto success;
	goto out;

efault:
	ret = -EFAULT;
	goto out;

wait_for_space:
	ret = -EAGAIN;
	if (msg->msg_flags & MSG_DONTWAIT)
		goto maybe_error;
	mutex_unlock(&call->user_mutex);
	*_dropped_lock = true;
	ret = rxrpc_wait_for_tx_window(rx, call, &timeo,
				       msg->msg_flags & MSG_WAITALL);
	if (ret < 0)
		goto maybe_error;
	if (call->interruptibility == RXRPC_INTERRUPTIBLE) {
		if (mutex_lock_interruptible(&call->user_mutex) < 0) {
			ret = sock_intr_errno(timeo);
			goto maybe_error;
		}
	} else {
		mutex_lock(&call->user_mutex);
	}
	*_dropped_lock = false;
	goto reload;
}

/*
 * Extract control messages from the sendmsg() control buffer.
 */
static int rxrpc_sendmsg_cmsg(struct msghdr *msg, struct rxrpc_send_params *p)
{
	struct cmsghdr *cmsg;
	bool got_user_ID = false;
	int len;

	if (msg->msg_controllen == 0)
		return -EINVAL;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		len = cmsg->cmsg_len - sizeof(struct cmsghdr);
		_debug("CMSG %d, %d, %d",
		       cmsg->cmsg_level, cmsg->cmsg_type, len);

		if (cmsg->cmsg_level != SOL_RXRPC)
			continue;

		switch (cmsg->cmsg_type) {
		case RXRPC_USER_CALL_ID:
			if (msg->msg_flags & MSG_CMSG_COMPAT) {
				if (len != sizeof(u32))
					return -EINVAL;
				p->call.user_call_ID = *(u32 *)CMSG_DATA(cmsg);
			} else {
				if (len != sizeof(unsigned long))
					return -EINVAL;
				p->call.user_call_ID = *(unsigned long *)
					CMSG_DATA(cmsg);
			}
			got_user_ID = true;
			break;

		case RXRPC_ABORT:
			if (p->command != RXRPC_CMD_SEND_DATA)
				return -EINVAL;
			p->command = RXRPC_CMD_SEND_ABORT;
			if (len != sizeof(p->abort_code))
				return -EINVAL;
			p->abort_code = *(unsigned int *)CMSG_DATA(cmsg);
			if (p->abort_code == 0)
				return -EINVAL;
			break;

		case RXRPC_CHARGE_ACCEPT:
			if (p->command != RXRPC_CMD_SEND_DATA)
				return -EINVAL;
			p->command = RXRPC_CMD_CHARGE_ACCEPT;
			if (len != 0)
				return -EINVAL;
			break;

		case RXRPC_EXCLUSIVE_CALL:
			p->exclusive = true;
			if (len != 0)
				return -EINVAL;
			break;

		case RXRPC_UPGRADE_SERVICE:
			p->upgrade = true;
			if (len != 0)
				return -EINVAL;
			break;

		case RXRPC_TX_LENGTH:
			if (p->call.tx_total_len != -1 || len != sizeof(__s64))
				return -EINVAL;
			p->call.tx_total_len = *(__s64 *)CMSG_DATA(cmsg);
			if (p->call.tx_total_len < 0)
				return -EINVAL;
			break;

		case RXRPC_SET_CALL_TIMEOUT:
			if (len & 3 || len < 4 || len > 12)
				return -EINVAL;
			memcpy(&p->call.timeouts, CMSG_DATA(cmsg), len);
			p->call.nr_timeouts = len / 4;
			if (p->call.timeouts.hard > INT_MAX / HZ)
				return -ERANGE;
			if (p->call.nr_timeouts >= 2 &&
			    p->call.timeouts.idle > 60 * 60 * 1000)
				return -ERANGE;
			if (p->call.nr_timeouts >= 3 &&
			    p->call.timeouts.normal > 60 * 60 * 1000)
				return -ERANGE;
			break;

		default:
			return -EINVAL;
		}
	}

	if (!got_user_ID)
		return -EINVAL;
	if (p->call.tx_total_len != -1 && p->command != RXRPC_CMD_SEND_DATA)
		return -EINVAL;
	_leave(" = 0");
	return 0;
}
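
/* Userspace sketch of the control-message format parsed above (in the style
 * of the examples in Documentation/networking/rxrpc.rst; not part of this
 * file).  A client tags a new call with RXRPC_USER_CALL_ID and sends its
 * request data in the same sendmsg() call:
 *
 *	unsigned long id = 1;
 *	char ctrl[CMSG_SPACE(sizeof(id))];
 *	struct msghdr msg = {
 *		.msg_name	= &srx,		(target sockaddr_rxrpc)
 *		.msg_namelen	= sizeof(srx),
 *		.msg_iov	= iov,
 *		.msg_iovlen	= 1,
 *		.msg_control	= ctrl,
 *		.msg_controllen	= sizeof(ctrl),
 *	};
 *	struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
 *
 *	cmsg->cmsg_level = SOL_RXRPC;
 *	cmsg->cmsg_type	 = RXRPC_USER_CALL_ID;
 *	cmsg->cmsg_len	 = CMSG_LEN(sizeof(id));
 *	memcpy(CMSG_DATA(cmsg), &id, sizeof(id));
 *
 *	sendmsg(fd, &msg, 0);
 */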

/*
 * Create a new client call for sendmsg().
 * - Called with the socket lock held, which it must release.
 * - If it returns a call, the call's lock will need releasing by the caller.
 */
static struct rxrpc_call *
rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg,
				  struct rxrpc_send_params *p)
	__releases(&rx->sk.sk_lock.slock)
	__acquires(&call->user_mutex)
{
	struct rxrpc_conn_parameters cp;
	struct rxrpc_peer *peer;
	struct rxrpc_call *call;
	struct key *key;

	DECLARE_SOCKADDR(struct sockaddr_rxrpc *, srx, msg->msg_name);

	_enter("");

	if (!msg->msg_name) {
		release_sock(&rx->sk);
		return ERR_PTR(-EDESTADDRREQ);
	}

	peer = rxrpc_lookup_peer(rx->local, srx, GFP_KERNEL);
	if (!peer) {
		release_sock(&rx->sk);
		return ERR_PTR(-ENOMEM);
	}

	key = rx->key;
	if (key && !key->payload.data[0])
		key = NULL;

	memset(&cp, 0, sizeof(cp));
	cp.local = rx->local;
	cp.peer = peer;
	cp.key = key;	/* NULL if the socket's key has no payload */
	cp.security_level = rx->min_sec_level;
	cp.exclusive = rx->exclusive | p->exclusive;
	cp.upgrade = p->upgrade;
	cp.service_id = srx->srx_service;
	call = rxrpc_new_client_call(rx, &cp, &p->call, GFP_KERNEL,
				     atomic_inc_return(&rxrpc_debug_id));
	/* The socket is now unlocked */

	rxrpc_put_peer(peer, rxrpc_peer_put_application);
	_leave(" = %p\n", call);
	return call;
}

/*
 * Send a message forming part of a client call through an RxRPC socket.
 * - The caller holds the socket locked.
 * - The socket may be either a client socket or a server socket.
 */
int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
	__releases(&rx->sk.sk_lock.slock)
{
	struct rxrpc_call *call;
	bool dropped_lock = false;
	int ret;

	struct rxrpc_send_params p = {
		.call.tx_total_len	= -1,
		.call.user_call_ID	= 0,
		.call.nr_timeouts	= 0,
		.call.interruptibility	= RXRPC_INTERRUPTIBLE,
		.abort_code		= 0,
		.command		= RXRPC_CMD_SEND_DATA,
		.exclusive		= false,
		.upgrade		= false,
	};

	_enter("");

	ret = rxrpc_sendmsg_cmsg(msg, &p);
	if (ret < 0)
		goto error_release_sock;

	if (p.command == RXRPC_CMD_CHARGE_ACCEPT) {
		ret = -EINVAL;
		if (rx->sk.sk_state != RXRPC_SERVER_LISTENING)
			goto error_release_sock;
		ret = rxrpc_user_charge_accept(rx, p.call.user_call_ID);
		goto error_release_sock;
	}

	call = rxrpc_find_call_by_user_ID(rx, p.call.user_call_ID);
	if (!call) {
		ret = -EBADSLT;
		if (p.command != RXRPC_CMD_SEND_DATA)
			goto error_release_sock;
		call = rxrpc_new_client_call_for_sendmsg(rx, msg, &p);
		/* The socket is now unlocked... */
		if (IS_ERR(call))
			return PTR_ERR(call);
		/* ... and we have the call lock. */
		p.call.nr_timeouts = 0;
		ret = 0;
		if (rxrpc_call_is_complete(call))
			goto out_put_unlock;
	} else {
		switch (rxrpc_call_state(call)) {
		case RXRPC_CALL_CLIENT_AWAIT_CONN:
		case RXRPC_CALL_SERVER_RECV_REQUEST:
			if (p.command == RXRPC_CMD_SEND_ABORT)
				break;
			fallthrough;
		case RXRPC_CALL_UNINITIALISED:
		case RXRPC_CALL_SERVER_PREALLOC:
			rxrpc_put_call(call, rxrpc_call_put_sendmsg);
			ret = -EBUSY;
			goto error_release_sock;
		default:
			break;
		}

		ret = mutex_lock_interruptible(&call->user_mutex);
		release_sock(&rx->sk);
		if (ret < 0) {
			ret = -ERESTARTSYS;
			goto error_put;
		}

		if (p.call.tx_total_len != -1) {
			ret = -EINVAL;
			if (call->tx_total_len != -1 ||
			    call->tx_pending ||
			    call->tx_top != 0)
				goto out_put_unlock;
			call->tx_total_len = p.call.tx_total_len;
		}
	}

	switch (p.call.nr_timeouts) {
	case 3:
		WRITE_ONCE(call->next_rx_timo, p.call.timeouts.normal);
		fallthrough;
	case 2:
		WRITE_ONCE(call->next_req_timo, p.call.timeouts.idle);
		fallthrough;
	case 1:
		if (p.call.timeouts.hard > 0) {
			ktime_t delay = ms_to_ktime(p.call.timeouts.hard * MSEC_PER_SEC);

			WRITE_ONCE(call->expect_term_by,
				   ktime_add(ktime_get_real(), delay));
			trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_hard);
			rxrpc_poke_call(call, rxrpc_call_poke_set_timeout);
		}
		break;
	}
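
	/* Worked example of the cascade above, using the units enforced by
	 * rxrpc_sendmsg_cmsg() (hard in seconds, idle/normal in ms):
	 * nr_timeouts == 3 with { hard = 30, idle = 5000, normal = 10000 }
	 * sets a 10000ms expected-receive timeout, a 5000ms idle-request
	 * timeout and an absolute termination deadline 30s from now.
	 */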

	if (rxrpc_call_is_complete(call)) {
		/* it's too late for this call */
		ret = -ESHUTDOWN;
	} else if (p.command == RXRPC_CMD_SEND_ABORT) {
		rxrpc_propose_abort(call, p.abort_code, -ECONNABORTED,
				    rxrpc_abort_call_sendmsg);
		ret = 0;
	} else if (p.command != RXRPC_CMD_SEND_DATA) {
		ret = -EINVAL;
	} else {
		ret = rxrpc_send_data(rx, call, msg, len, NULL, &dropped_lock);
	}

out_put_unlock:
	if (!dropped_lock)
		mutex_unlock(&call->user_mutex);
error_put:
	rxrpc_put_call(call, rxrpc_call_put_sendmsg);
	_leave(" = %d", ret);
	return ret;

error_release_sock:
	release_sock(&rx->sk);
	return ret;
}

/**
 * rxrpc_kernel_send_data - Allow a kernel service to send data on a call
 * @sock: The socket the call is on
 * @call: The call to send data through
 * @msg: The data to send
 * @len: The amount of data to send
 * @notify_end_tx: Notification that the last packet is queued.
 *
 * Allow a kernel service to send data on a call.  The call must be in a state
 * appropriate to sending data.  No control data should be supplied in @msg,
 * nor should an address be supplied.  MSG_MORE should be flagged if there's
 * more data to come, otherwise this data will end the transmission phase.
 */
int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
			   struct msghdr *msg, size_t len,
			   rxrpc_notify_end_tx_t notify_end_tx)
{
	bool dropped_lock = false;
	int ret;

	_enter("{%d},", call->debug_id);

	ASSERTCMP(msg->msg_name, ==, NULL);
	ASSERTCMP(msg->msg_control, ==, NULL);

	mutex_lock(&call->user_mutex);

	ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len,
			      notify_end_tx, &dropped_lock);
	if (ret == -ESHUTDOWN)
		ret = call->error;

	if (!dropped_lock)
		mutex_unlock(&call->user_mutex);
	_leave(" = %d", ret);
	return ret;
}
EXPORT_SYMBOL(rxrpc_kernel_send_data);
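
/* Illustrative sketch of a kernel-service caller (loosely modelled on how
 * fs/afs drives this API; the function and its parameters are hypothetical).
 * MSG_MORE is set on all but the final fragment so that only the last
 * sendmsg ends the transmission phase:
 */
static int example_send_fragment(struct socket *sock, struct rxrpc_call *call,
				 void *buf, size_t len, bool last)
{
	struct msghdr msg = { .msg_flags = last ? 0 : MSG_MORE };
	struct kvec iov = { .iov_base = buf, .iov_len = len };

	iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, &iov, 1, len);
	return rxrpc_kernel_send_data(sock, call, &msg, len, NULL);
}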

/**
 * rxrpc_kernel_abort_call - Allow a kernel service to abort a call
 * @sock: The socket the call is on
 * @call: The call to be aborted
 * @abort_code: The abort code to stick into the ABORT packet
 * @error: Local error value
 * @why: Indication as to why the call is being aborted
 *
 * Allow a kernel service to abort a call if it's still in an abortable state.
 *
 * Return: true if the call was aborted, false if it was already complete.
 */
bool rxrpc_kernel_abort_call(struct socket *sock, struct rxrpc_call *call,
			     u32 abort_code, int error, enum rxrpc_abort_reason why)
{
	bool aborted;

	_enter("{%d},%d,%d,%u", call->debug_id, abort_code, error, why);

	mutex_lock(&call->user_mutex);
	aborted = rxrpc_propose_abort(call, abort_code, error, why);
	mutex_unlock(&call->user_mutex);
	return aborted;
}
EXPORT_SYMBOL(rxrpc_kernel_abort_call);
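
/* e.g. (illustrative only - the error and reason shown mirror this file's
 * own sendmsg abort path; RX_USER_ABORT is one wire abort code a service
 * might choose, and a real caller would supply its own values):
 *
 *	rxrpc_kernel_abort_call(sock, call, RX_USER_ABORT, -ECONNABORTED,
 *				rxrpc_abort_call_sendmsg);
 */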

/**
 * rxrpc_kernel_set_tx_length - Set the total Tx length on a call
 * @sock: The socket the call is on
 * @call: The call to be informed
 * @tx_total_len: The amount of data to be transmitted for this call
 *
 * Allow a kernel service to set the total transmit length on a call.  This
 * allows buffer-to-packet encrypt-and-copy to be performed.
 *
 * This function is primarily for use for setting the reply length since the
 * request length can be set when beginning the call.
 */
void rxrpc_kernel_set_tx_length(struct socket *sock, struct rxrpc_call *call,
				s64 tx_total_len)
{
	WARN_ON(call->tx_total_len != -1);
	call->tx_total_len = tx_total_len;
}
EXPORT_SYMBOL(rxrpc_kernel_set_tx_length);
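
/* e.g. a server-side service that knows its reply size up front might call
 * (illustrative; reply_size is hypothetical):
 *
 *	rxrpc_kernel_set_tx_length(sock, call, reply_size);
 *
 * before the first rxrpc_kernel_send_data() on the reply, so that each
 * packet can be filled and secured in a single pass.
 */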