1 /* 2 * Copyright 2022-2023 The OpenSSL Project Authors. All Rights Reserved. 3 * 4 * Licensed under the Apache License 2.0 (the "License"). You may not use 5 * this file except in compliance with the License. You can obtain a copy 6 * in the file LICENSE in the source distribution or at 7 * https://www.openssl.org/source/license.html 8 */ 9 10 #include "internal/quic_stream.h" 11 #include "internal/uint_set.h" 12 #include "internal/common.h" 13 #include "internal/ring_buf.h" 14 15 /* 16 * ================================================================== 17 * QUIC Send Stream 18 */ 19 struct quic_sstream_st { 20 struct ring_buf ring_buf; 21 22 /* 23 * Any logical byte in the stream is in one of these states: 24 * 25 * - NEW: The byte has not yet been transmitted, or has been lost and is 26 * in need of retransmission. 27 * 28 * - IN_FLIGHT: The byte has been transmitted but is awaiting 29 * acknowledgement. We continue to store the data in case we return 30 * to the NEW state. 31 * 32 * - ACKED: The byte has been acknowledged and we can cease storing it. 33 * We do not necessarily cull it immediately, so there may be a delay 34 * between reaching the ACKED state and the buffer space actually being 35 * recycled. 36 * 37 * A logical byte in the stream is 38 * 39 * - in the NEW state if it is in new_set; 40 * - is in the ACKED state if it is in acked_set 41 * (and may or may not have been culled); 42 * - is in the IN_FLIGHT state otherwise. 43 * 44 * Invariant: No logical byte is ever in both new_set and acked_set. 45 */ 46 UINT_SET new_set, acked_set; 47 48 /* 49 * The current size of the stream is ring_buf.head_offset. If 50 * have_final_size is true, this is also the final size of the stream. 51 */ 52 unsigned int have_final_size : 1; 53 unsigned int sent_final_size : 1; 54 unsigned int acked_final_size : 1; 55 unsigned int cleanse : 1; 56 }; 57 58 static void qss_cull(QUIC_SSTREAM *qss); 59 60 QUIC_SSTREAM *ossl_quic_sstream_new(size_t init_buf_size) 61 { 62 QUIC_SSTREAM *qss; 63 64 qss = OPENSSL_zalloc(sizeof(QUIC_SSTREAM)); 65 if (qss == NULL) 66 return NULL; 67 68 ring_buf_init(&qss->ring_buf); 69 if (!ring_buf_resize(&qss->ring_buf, init_buf_size, 0)) { 70 ring_buf_destroy(&qss->ring_buf, 0); 71 OPENSSL_free(qss); 72 return NULL; 73 } 74 75 ossl_uint_set_init(&qss->new_set); 76 ossl_uint_set_init(&qss->acked_set); 77 return qss; 78 } 79 80 void ossl_quic_sstream_free(QUIC_SSTREAM *qss) 81 { 82 if (qss == NULL) 83 return; 84 85 ossl_uint_set_destroy(&qss->new_set); 86 ossl_uint_set_destroy(&qss->acked_set); 87 ring_buf_destroy(&qss->ring_buf, qss->cleanse); 88 OPENSSL_free(qss); 89 } 90 91 int ossl_quic_sstream_get_stream_frame(QUIC_SSTREAM *qss, 92 size_t skip, 93 OSSL_QUIC_FRAME_STREAM *hdr, 94 OSSL_QTX_IOVEC *iov, 95 size_t *num_iov) 96 { 97 size_t num_iov_ = 0, src_len = 0, total_len = 0, i; 98 uint64_t max_len; 99 const unsigned char *src = NULL; 100 UINT_SET_ITEM *range = ossl_list_uint_set_head(&qss->new_set); 101 102 if (*num_iov < 2) 103 return 0; 104 105 for (i = 0; i < skip && range != NULL; ++i) 106 range = ossl_list_uint_set_next(range); 107 108 if (range == NULL) { 109 if (i < skip) 110 /* Don't return FIN for infinitely increasing skip */ 111 return 0; 112 113 /* No new bytes to send, but we might have a FIN */ 114 if (!qss->have_final_size || qss->sent_final_size) 115 return 0; 116 117 hdr->offset = qss->ring_buf.head_offset; 118 hdr->len = 0; 119 hdr->is_fin = 1; 120 *num_iov = 0; 121 return 1; 122 } 123 124 /* 125 * We can only send a contiguous range of logical bytes in a single 126 * stream frame, so limit ourselves to the range of the first set entry. 127 * 128 * Set entries never have 'adjacent' entries so we don't have to worry 129 * about them here. 130 */ 131 max_len = range->range.end - range->range.start + 1; 132 133 for (i = 0;; ++i) { 134 if (total_len >= max_len) 135 break; 136 137 if (!ring_buf_get_buf_at(&qss->ring_buf, 138 range->range.start + total_len, 139 &src, &src_len)) 140 return 0; 141 142 if (src_len == 0) 143 break; 144 145 assert(i < 2); 146 147 if (total_len + src_len > max_len) 148 src_len = (size_t)(max_len - total_len); 149 150 iov[num_iov_].buf = src; 151 iov[num_iov_].buf_len = src_len; 152 153 total_len += src_len; 154 ++num_iov_; 155 } 156 157 hdr->offset = range->range.start; 158 hdr->len = total_len; 159 hdr->is_fin = qss->have_final_size 160 && hdr->offset + hdr->len == qss->ring_buf.head_offset; 161 162 *num_iov = num_iov_; 163 return 1; 164 } 165 166 int ossl_quic_sstream_has_pending(QUIC_SSTREAM *qss) 167 { 168 OSSL_QUIC_FRAME_STREAM shdr; 169 OSSL_QTX_IOVEC iov[2]; 170 size_t num_iov = OSSL_NELEM(iov); 171 172 return ossl_quic_sstream_get_stream_frame(qss, 0, &shdr, iov, &num_iov); 173 } 174 175 uint64_t ossl_quic_sstream_get_cur_size(QUIC_SSTREAM *qss) 176 { 177 return qss->ring_buf.head_offset; 178 } 179 180 int ossl_quic_sstream_mark_transmitted(QUIC_SSTREAM *qss, 181 uint64_t start, 182 uint64_t end) 183 { 184 UINT_RANGE r; 185 186 r.start = start; 187 r.end = end; 188 189 if (!ossl_uint_set_remove(&qss->new_set, &r)) 190 return 0; 191 192 return 1; 193 } 194 195 int ossl_quic_sstream_mark_transmitted_fin(QUIC_SSTREAM *qss, 196 uint64_t final_size) 197 { 198 /* 199 * We do not really need final_size since we already know the size of the 200 * stream, but this serves as a sanity check. 201 */ 202 if (!qss->have_final_size || final_size != qss->ring_buf.head_offset) 203 return 0; 204 205 qss->sent_final_size = 1; 206 return 1; 207 } 208 209 int ossl_quic_sstream_mark_lost(QUIC_SSTREAM *qss, 210 uint64_t start, 211 uint64_t end) 212 { 213 UINT_RANGE r; 214 r.start = start; 215 r.end = end; 216 217 /* 218 * We lost a range of stream data bytes, so reinsert them into the new set, 219 * so that they are returned once more by ossl_quic_sstream_get_stream_frame. 220 */ 221 if (!ossl_uint_set_insert(&qss->new_set, &r)) 222 return 0; 223 224 return 1; 225 } 226 227 int ossl_quic_sstream_mark_lost_fin(QUIC_SSTREAM *qss) 228 { 229 if (qss->acked_final_size) 230 /* Does not make sense to lose a FIN after it has been ACKed */ 231 return 0; 232 233 /* FIN was lost, so we need to transmit it again. */ 234 qss->sent_final_size = 0; 235 return 1; 236 } 237 238 int ossl_quic_sstream_mark_acked(QUIC_SSTREAM *qss, 239 uint64_t start, 240 uint64_t end) 241 { 242 UINT_RANGE r; 243 r.start = start; 244 r.end = end; 245 246 if (!ossl_uint_set_insert(&qss->acked_set, &r)) 247 return 0; 248 249 qss_cull(qss); 250 return 1; 251 } 252 253 int ossl_quic_sstream_mark_acked_fin(QUIC_SSTREAM *qss) 254 { 255 if (!qss->have_final_size) 256 /* Cannot ack final size before we have a final size */ 257 return 0; 258 259 qss->acked_final_size = 1; 260 return 1; 261 } 262 263 void ossl_quic_sstream_fin(QUIC_SSTREAM *qss) 264 { 265 if (qss->have_final_size) 266 return; 267 268 qss->have_final_size = 1; 269 } 270 271 int ossl_quic_sstream_get_final_size(QUIC_SSTREAM *qss, uint64_t *final_size) 272 { 273 if (!qss->have_final_size) 274 return 0; 275 276 if (final_size != NULL) 277 *final_size = qss->ring_buf.head_offset; 278 279 return 1; 280 } 281 282 int ossl_quic_sstream_append(QUIC_SSTREAM *qss, 283 const unsigned char *buf, 284 size_t buf_len, 285 size_t *consumed) 286 { 287 size_t l, consumed_ = 0; 288 UINT_RANGE r; 289 struct ring_buf old_ring_buf = qss->ring_buf; 290 291 if (qss->have_final_size) { 292 *consumed = 0; 293 return 0; 294 } 295 296 /* 297 * Note: It is assumed that ossl_quic_sstream_append will be called during a 298 * call to e.g. SSL_write and this function is therefore designed to support 299 * such semantics. In particular, the buffer pointed to by buf is only 300 * assumed to be valid for the duration of this call, therefore we must copy 301 * the data here. We will later copy-and-encrypt the data during packet 302 * encryption, so this is a two-copy design. Supporting a one-copy design in 303 * the future will require applications to use a different kind of API. 304 * Supporting such changes in future will require corresponding enhancements 305 * to this code. 306 */ 307 while (buf_len > 0) { 308 l = ring_buf_push(&qss->ring_buf, buf, buf_len); 309 if (l == 0) 310 break; 311 312 buf += l; 313 buf_len -= l; 314 consumed_ += l; 315 } 316 317 if (consumed_ > 0) { 318 r.start = old_ring_buf.head_offset; 319 r.end = r.start + consumed_ - 1; 320 assert(r.end + 1 == qss->ring_buf.head_offset); 321 if (!ossl_uint_set_insert(&qss->new_set, &r)) { 322 qss->ring_buf = old_ring_buf; 323 *consumed = 0; 324 return 0; 325 } 326 } 327 328 *consumed = consumed_; 329 return 1; 330 } 331 332 static void qss_cull(QUIC_SSTREAM *qss) 333 { 334 UINT_SET_ITEM *h = ossl_list_uint_set_head(&qss->acked_set); 335 336 /* 337 * Potentially cull data from our ring buffer. This can happen once data has 338 * been ACKed and we know we are never going to have to transmit it again. 339 * 340 * Since we use a ring buffer design for simplicity, we cannot cull byte n + 341 * k (for k > 0) from the ring buffer until byte n has also been culled. 342 * This means if parts of the stream get acknowledged out of order we might 343 * keep around some data we technically don't need to for a while. The 344 * impact of this is likely to be small and limited to quite a short 345 * duration, and doesn't justify the use of a more complex design. 346 */ 347 348 /* 349 * We only need to check the first range entry in the integer set because we 350 * can only cull contiguous areas at the start of the ring buffer anyway. 351 */ 352 if (h != NULL) 353 ring_buf_cpop_range(&qss->ring_buf, h->range.start, h->range.end, 354 qss->cleanse); 355 } 356 357 int ossl_quic_sstream_set_buffer_size(QUIC_SSTREAM *qss, size_t num_bytes) 358 { 359 return ring_buf_resize(&qss->ring_buf, num_bytes, qss->cleanse); 360 } 361 362 size_t ossl_quic_sstream_get_buffer_size(QUIC_SSTREAM *qss) 363 { 364 return qss->ring_buf.alloc; 365 } 366 367 size_t ossl_quic_sstream_get_buffer_used(QUIC_SSTREAM *qss) 368 { 369 return ring_buf_used(&qss->ring_buf); 370 } 371 372 size_t ossl_quic_sstream_get_buffer_avail(QUIC_SSTREAM *qss) 373 { 374 return ring_buf_avail(&qss->ring_buf); 375 } 376 377 int ossl_quic_sstream_is_totally_acked(QUIC_SSTREAM *qss) 378 { 379 UINT_RANGE r; 380 uint64_t cur_size; 381 382 if (qss->have_final_size && !qss->acked_final_size) 383 return 0; 384 385 if (ossl_quic_sstream_get_cur_size(qss) == 0) 386 return 1; 387 388 if (ossl_list_uint_set_num(&qss->acked_set) != 1) 389 return 0; 390 391 r = ossl_list_uint_set_head(&qss->acked_set)->range; 392 cur_size = qss->ring_buf.head_offset; 393 394 /* 395 * The invariants of UINT_SET guarantee a single list element if we have a 396 * single contiguous range, which is what we should have if everything has 397 * been acked. 398 */ 399 assert(r.end + 1 <= cur_size); 400 return r.start == 0 && r.end + 1 == cur_size; 401 } 402 403 void ossl_quic_sstream_adjust_iov(size_t len, 404 OSSL_QTX_IOVEC *iov, 405 size_t num_iov) 406 { 407 size_t running = 0, i, iovlen; 408 409 for (i = 0, running = 0; i < num_iov; ++i) { 410 iovlen = iov[i].buf_len; 411 412 if (running >= len) 413 iov[i].buf_len = 0; 414 else if (running + iovlen > len) 415 iov[i].buf_len = len - running; 416 417 running += iovlen; 418 } 419 } 420 421 void ossl_quic_sstream_set_cleanse(QUIC_SSTREAM *qss, int cleanse) 422 { 423 qss->cleanse = cleanse; 424 } 425