1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* RxRPC Tx data buffering. 3 * 4 * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved. 5 * Written by David Howells (dhowells@redhat.com) 6 */ 7 8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt 9 10 #include <linux/slab.h> 11 #include "ar-internal.h" 12 13 static atomic_t rxrpc_txbuf_debug_ids; 14 atomic_t rxrpc_nr_txbuf; 15 16 /* 17 * Allocate and partially initialise an I/O request structure. 18 */ 19 struct rxrpc_txbuf *rxrpc_alloc_txbuf(struct rxrpc_call *call, u8 packet_type, 20 gfp_t gfp) 21 { 22 struct rxrpc_txbuf *txb; 23 24 txb = kmalloc(sizeof(*txb), gfp); 25 if (txb) { 26 INIT_LIST_HEAD(&txb->call_link); 27 INIT_LIST_HEAD(&txb->tx_link); 28 refcount_set(&txb->ref, 1); 29 txb->call = call; 30 txb->call_debug_id = call->debug_id; 31 txb->debug_id = atomic_inc_return(&rxrpc_txbuf_debug_ids); 32 txb->space = sizeof(txb->data); 33 txb->len = 0; 34 txb->offset = 0; 35 txb->flags = 0; 36 txb->ack_why = 0; 37 txb->seq = call->tx_top + 1; 38 txb->wire.epoch = htonl(call->conn->proto.epoch); 39 txb->wire.cid = htonl(call->cid); 40 txb->wire.callNumber = htonl(call->call_id); 41 txb->wire.seq = htonl(txb->seq); 42 txb->wire.type = packet_type; 43 txb->wire.flags = call->conn->out_clientflag; 44 txb->wire.userStatus = 0; 45 txb->wire.securityIndex = call->security_ix; 46 txb->wire._rsvd = 0; 47 txb->wire.serviceId = htons(call->service_id); 48 49 trace_rxrpc_txbuf(txb->debug_id, 50 txb->call_debug_id, txb->seq, 1, 51 packet_type == RXRPC_PACKET_TYPE_DATA ? 52 rxrpc_txbuf_alloc_data : 53 rxrpc_txbuf_alloc_ack); 54 atomic_inc(&rxrpc_nr_txbuf); 55 } 56 57 return txb; 58 } 59 60 void rxrpc_get_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what) 61 { 62 int r; 63 64 __refcount_inc(&txb->ref, &r); 65 trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, r + 1, what); 66 } 67 68 void rxrpc_see_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what) 69 { 70 int r = refcount_read(&txb->ref); 71 72 trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, r, what); 73 } 74 75 static void rxrpc_free_txbuf(struct rcu_head *rcu) 76 { 77 struct rxrpc_txbuf *txb = container_of(rcu, struct rxrpc_txbuf, rcu); 78 79 trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 0, 80 rxrpc_txbuf_free); 81 kfree(txb); 82 atomic_dec(&rxrpc_nr_txbuf); 83 } 84 85 void rxrpc_put_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what) 86 { 87 unsigned int debug_id, call_debug_id; 88 rxrpc_seq_t seq; 89 bool dead; 90 int r; 91 92 if (txb) { 93 debug_id = txb->debug_id; 94 call_debug_id = txb->call_debug_id; 95 seq = txb->seq; 96 dead = __refcount_dec_and_test(&txb->ref, &r); 97 trace_rxrpc_txbuf(debug_id, call_debug_id, seq, r - 1, what); 98 if (dead) 99 call_rcu(&txb->rcu, rxrpc_free_txbuf); 100 } 101 } 102 103 /* 104 * Shrink the transmit buffer. 105 */ 106 void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call) 107 { 108 struct rxrpc_txbuf *txb; 109 rxrpc_seq_t hard_ack = smp_load_acquire(&call->acks_hard_ack); 110 111 _enter("%x/%x/%x", call->tx_bottom, call->acks_hard_ack, call->tx_top); 112 113 for (;;) { 114 spin_lock(&call->tx_lock); 115 txb = list_first_entry_or_null(&call->tx_buffer, 116 struct rxrpc_txbuf, call_link); 117 if (!txb) 118 break; 119 hard_ack = smp_load_acquire(&call->acks_hard_ack); 120 if (before(hard_ack, txb->seq)) 121 break; 122 123 ASSERTCMP(txb->seq, ==, call->tx_bottom + 1); 124 call->tx_bottom++; 125 list_del_rcu(&txb->call_link); 126 127 trace_rxrpc_txqueue(call, rxrpc_txqueue_dequeue); 128 129 spin_unlock(&call->tx_lock); 130 131 rxrpc_put_txbuf(txb, rxrpc_txbuf_put_rotated); 132 } 133 134 spin_unlock(&call->tx_lock); 135 } 136