1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* RxRPC Tx data buffering. 3 * 4 * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved. 5 * Written by David Howells (dhowells@redhat.com) 6 */ 7 8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt 9 10 #include <linux/slab.h> 11 #include "ar-internal.h" 12 13 static atomic_t rxrpc_txbuf_debug_ids; 14 atomic_t rxrpc_nr_txbuf; 15 16 /* 17 * Allocate and partially initialise a data transmission buffer. 18 */ 19 struct rxrpc_txbuf *rxrpc_alloc_data_txbuf(struct rxrpc_call *call, size_t data_size, 20 size_t data_align, gfp_t gfp) 21 { 22 struct rxrpc_wire_header *whdr; 23 struct rxrpc_txbuf *txb; 24 size_t total, hoff = 0; 25 void *buf; 26 27 txb = kmalloc(sizeof(*txb), gfp); 28 if (!txb) 29 return NULL; 30 31 if (data_align) 32 hoff = round_up(sizeof(*whdr), data_align) - sizeof(*whdr); 33 total = hoff + sizeof(*whdr) + data_size; 34 35 mutex_lock(&call->conn->tx_data_alloc_lock); 36 buf = __page_frag_alloc_align(&call->conn->tx_data_alloc, total, gfp, 37 ~(data_align - 1) & ~(L1_CACHE_BYTES - 1)); 38 mutex_unlock(&call->conn->tx_data_alloc_lock); 39 if (!buf) { 40 kfree(txb); 41 return NULL; 42 } 43 44 whdr = buf + hoff; 45 46 INIT_LIST_HEAD(&txb->call_link); 47 INIT_LIST_HEAD(&txb->tx_link); 48 refcount_set(&txb->ref, 1); 49 txb->last_sent = KTIME_MIN; 50 txb->call_debug_id = call->debug_id; 51 txb->debug_id = atomic_inc_return(&rxrpc_txbuf_debug_ids); 52 txb->space = data_size; 53 txb->len = 0; 54 txb->offset = sizeof(*whdr); 55 txb->flags = call->conn->out_clientflag; 56 txb->ack_why = 0; 57 txb->seq = call->tx_prepared + 1; 58 txb->serial = 0; 59 txb->cksum = 0; 60 txb->nr_kvec = 1; 61 txb->kvec[0].iov_base = whdr; 62 txb->kvec[0].iov_len = sizeof(*whdr); 63 64 whdr->epoch = htonl(call->conn->proto.epoch); 65 whdr->cid = htonl(call->cid); 66 whdr->callNumber = htonl(call->call_id); 67 whdr->seq = htonl(txb->seq); 68 whdr->type = RXRPC_PACKET_TYPE_DATA; 69 whdr->flags = 0; 70 whdr->userStatus = 0; 71 whdr->securityIndex = call->security_ix; 72 whdr->_rsvd = 0; 73 whdr->serviceId = htons(call->dest_srx.srx_service); 74 75 trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 1, 76 rxrpc_txbuf_alloc_data); 77 78 atomic_inc(&rxrpc_nr_txbuf); 79 return txb; 80 } 81 82 /* 83 * Allocate and partially initialise an ACK packet. 84 */ 85 struct rxrpc_txbuf *rxrpc_alloc_ack_txbuf(struct rxrpc_call *call, size_t sack_size) 86 { 87 struct rxrpc_wire_header *whdr; 88 struct rxrpc_acktrailer *trailer; 89 struct rxrpc_ackpacket *ack; 90 struct rxrpc_txbuf *txb; 91 gfp_t gfp = rcu_read_lock_held() ? GFP_ATOMIC | __GFP_NOWARN : GFP_NOFS; 92 void *buf, *buf2 = NULL; 93 u8 *filler; 94 95 txb = kmalloc(sizeof(*txb), gfp); 96 if (!txb) 97 return NULL; 98 99 buf = page_frag_alloc(&call->local->tx_alloc, 100 sizeof(*whdr) + sizeof(*ack) + 1 + 3 + sizeof(*trailer), gfp); 101 if (!buf) { 102 kfree(txb); 103 return NULL; 104 } 105 106 if (sack_size) { 107 buf2 = page_frag_alloc(&call->local->tx_alloc, sack_size, gfp); 108 if (!buf2) { 109 page_frag_free(buf); 110 kfree(txb); 111 return NULL; 112 } 113 } 114 115 whdr = buf; 116 ack = buf + sizeof(*whdr); 117 filler = buf + sizeof(*whdr) + sizeof(*ack) + 1; 118 trailer = buf + sizeof(*whdr) + sizeof(*ack) + 1 + 3; 119 120 INIT_LIST_HEAD(&txb->call_link); 121 INIT_LIST_HEAD(&txb->tx_link); 122 refcount_set(&txb->ref, 1); 123 txb->call_debug_id = call->debug_id; 124 txb->debug_id = atomic_inc_return(&rxrpc_txbuf_debug_ids); 125 txb->space = 0; 126 txb->len = sizeof(*whdr) + sizeof(*ack) + 3 + sizeof(*trailer); 127 txb->offset = 0; 128 txb->flags = call->conn->out_clientflag; 129 txb->ack_rwind = 0; 130 txb->seq = 0; 131 txb->serial = 0; 132 txb->cksum = 0; 133 txb->nr_kvec = 3; 134 txb->kvec[0].iov_base = whdr; 135 txb->kvec[0].iov_len = sizeof(*whdr) + sizeof(*ack); 136 txb->kvec[1].iov_base = buf2; 137 txb->kvec[1].iov_len = sack_size; 138 txb->kvec[2].iov_base = filler; 139 txb->kvec[2].iov_len = 3 + sizeof(*trailer); 140 141 whdr->epoch = htonl(call->conn->proto.epoch); 142 whdr->cid = htonl(call->cid); 143 whdr->callNumber = htonl(call->call_id); 144 whdr->seq = 0; 145 whdr->type = RXRPC_PACKET_TYPE_ACK; 146 whdr->flags = 0; 147 whdr->userStatus = 0; 148 whdr->securityIndex = call->security_ix; 149 whdr->_rsvd = 0; 150 whdr->serviceId = htons(call->dest_srx.srx_service); 151 152 get_page(virt_to_head_page(trailer)); 153 154 trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 1, 155 rxrpc_txbuf_alloc_ack); 156 atomic_inc(&rxrpc_nr_txbuf); 157 return txb; 158 } 159 160 void rxrpc_get_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what) 161 { 162 int r; 163 164 __refcount_inc(&txb->ref, &r); 165 trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, r + 1, what); 166 } 167 168 void rxrpc_see_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what) 169 { 170 int r = refcount_read(&txb->ref); 171 172 trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, r, what); 173 } 174 175 static void rxrpc_free_txbuf(struct rxrpc_txbuf *txb) 176 { 177 int i; 178 179 trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 0, 180 rxrpc_txbuf_free); 181 for (i = 0; i < txb->nr_kvec; i++) 182 if (txb->kvec[i].iov_base) 183 page_frag_free(txb->kvec[i].iov_base); 184 kfree(txb); 185 atomic_dec(&rxrpc_nr_txbuf); 186 } 187 188 void rxrpc_put_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what) 189 { 190 unsigned int debug_id, call_debug_id; 191 rxrpc_seq_t seq; 192 bool dead; 193 int r; 194 195 if (txb) { 196 debug_id = txb->debug_id; 197 call_debug_id = txb->call_debug_id; 198 seq = txb->seq; 199 dead = __refcount_dec_and_test(&txb->ref, &r); 200 trace_rxrpc_txbuf(debug_id, call_debug_id, seq, r - 1, what); 201 if (dead) 202 rxrpc_free_txbuf(txb); 203 } 204 } 205 206 /* 207 * Shrink the transmit buffer. 208 */ 209 void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call) 210 { 211 struct rxrpc_txbuf *txb; 212 rxrpc_seq_t hard_ack = smp_load_acquire(&call->acks_hard_ack); 213 bool wake = false; 214 215 _enter("%x/%x/%x", call->tx_bottom, call->acks_hard_ack, call->tx_top); 216 217 while ((txb = list_first_entry_or_null(&call->tx_buffer, 218 struct rxrpc_txbuf, call_link))) { 219 hard_ack = smp_load_acquire(&call->acks_hard_ack); 220 if (before(hard_ack, txb->seq)) 221 break; 222 223 if (txb->seq != call->tx_bottom + 1) 224 rxrpc_see_txbuf(txb, rxrpc_txbuf_see_out_of_step); 225 ASSERTCMP(txb->seq, ==, call->tx_bottom + 1); 226 smp_store_release(&call->tx_bottom, call->tx_bottom + 1); 227 list_del_rcu(&txb->call_link); 228 229 trace_rxrpc_txqueue(call, rxrpc_txqueue_dequeue); 230 231 rxrpc_put_txbuf(txb, rxrpc_txbuf_put_rotated); 232 if (after(call->acks_hard_ack, call->tx_bottom + 128)) 233 wake = true; 234 } 235 236 if (wake) 237 wake_up(&call->waitq); 238 } 239