1 /*- 2 * SPDX-License-Identifier: BSD-2-Clause 3 * 4 * Copyright (c) 2021 Ng Peng Nam Sean 5 * Copyright (c) 2022 Alexander V. Chernikov <melifaro@FreeBSD.org> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26 * SUCH DAMAGE. 
 */

#include <sys/param.h>
#include <sys/ck.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/syslog.h>

#include <netlink/netlink.h>
#include <netlink/netlink_ctl.h>
#include <netlink/netlink_linux.h>
#include <netlink/netlink_var.h>

#define	DEBUG_MOD_NAME	nl_io
#define	DEBUG_MAX_LEVEL	LOG_DEBUG3
#include <netlink/netlink_debug.h>
_DECLARE_DEBUG(LOG_INFO);

/*
 * The logic below provides a p2p interface for receiving and
 * sending netlink data between the kernel and userland.
 */

/* Source address stamped on kernel-originated messages (pid 0). */
static const struct sockaddr_nl _nl_empty_src = {
	.nl_len = sizeof(struct sockaddr_nl),
	.nl_family = PF_NETLINK,
	.nl_pid = 0 /* comes from the kernel */
};
static const struct sockaddr *nl_empty_src = (const struct sockaddr *)&_nl_empty_src;

static struct mbuf *nl_process_mbuf(struct mbuf *m, struct nlpcb *nlp);


/*
 * Appends the packet chain @mq (packets linked via m_nextpkt) to the
 * tail of queue @q, accounting each packet's byte length into q->length.
 */
static void
queue_push(struct nl_io_queue *q, struct mbuf *mq)
{
	while (mq != NULL) {
		struct mbuf *m = mq;
		mq = mq->m_nextpkt;
		m->m_nextpkt = NULL;

		q->length += m_length(m, NULL);
		STAILQ_INSERT_TAIL(&q->head, m, m_stailqpkt);
	}
}

/*
 * Re-inserts a single packet @m at the head of @q, e.g. the unprocessed
 * remainder of a packet that has to be retried later.
 */
static void
queue_push_head(struct nl_io_queue *q, struct mbuf *m)
{
	MPASS(m->m_nextpkt == NULL);

	q->length += m_length(m, NULL);
	STAILQ_INSERT_HEAD(&q->head, m, m_stailqpkt);
}

/* Removes and returns the packet at the head of @q, or NULL if empty. */
static struct mbuf *
queue_pop(struct nl_io_queue *q)
{
	if (!STAILQ_EMPTY(&q->head)) {
		struct mbuf *m = STAILQ_FIRST(&q->head);
		STAILQ_REMOVE_HEAD(&q->head, m_stailqpkt);
		m->m_nextpkt = NULL;
		q->length -= m_length(m, NULL);

		return (m);
	}
	return (NULL);
}

/* Returns the packet at the head of @q without removing it. */
static struct mbuf *
queue_head(const struct nl_io_queue *q)
{
	return (STAILQ_FIRST(&q->head));
}

/*
 * Returns true if @q holds no data.  Checks the byte counter rather
 * than STAILQ_EMPTY(); the two are kept in sync by the push/pop
 * helpers above.
 */
static inline bool
queue_empty(const struct nl_io_queue *q)
{
	return (q->length == 0);
}

/* Frees all packets queued on @q and resets its byte counter. */
static void
queue_free(struct nl_io_queue *q)
{
	while (!STAILQ_EMPTY(&q->head)) {
		struct mbuf *m = STAILQ_FIRST(&q->head);
		STAILQ_REMOVE_HEAD(&q->head, m_stailqpkt);
		m->m_nextpkt = NULL;
		m_freem(m);
	}
	q->length = 0;
}

/*
 * Attaches a NETLINK_MSG_INFO control mbuf carrying the originating
 * process id and netlink port id to the end of chain @m.  The nlpcb is
 * recovered from the current thread; no-op if none is registered.
 */
void
nl_add_msg_info(struct mbuf *m)
{
	struct nlpcb *nlp = nl_get_thread_nlp(curthread);
	NL_LOG(LOG_DEBUG2, "Trying to recover nlp from thread %p: %p",
	    curthread, nlp);

	if (nlp == NULL)
		return;

	/* Prepare what we want to encode - PID, socket PID & msg seq */
	struct {
		struct nlattr nla;
		uint32_t val;
	} data[] = {
		{
			.nla.nla_len = sizeof(struct nlattr) + sizeof(uint32_t),
			.nla.nla_type = NLMSGINFO_ATTR_PROCESS_ID,
			.val = nlp->nl_process_id,
		},
		{
			.nla.nla_len = sizeof(struct nlattr) + sizeof(uint32_t),
			.nla.nla_type = NLMSGINFO_ATTR_PORT_ID,
			.val = nlp->nl_port,
		},
	};

	/*
	 * Link the control data as the last mbuf of the chain.  With
	 * M_NOWAIT, sbcreatecontrol() may fail; m_next then simply stays
	 * NULL and no info is attached.
	 */
	while (m->m_next != NULL)
		m = m->m_next;
	m->m_next = sbcreatecontrol(data, sizeof(data),
	    NETLINK_MSG_INFO, SOL_NETLINK, M_NOWAIT);

	NL_LOG(LOG_DEBUG2, "Storing %u bytes of data, ctl: %p",
	    (unsigned)sizeof(data), m->m_next);
}

/*
 * Detaches and returns the first MT_CONTROL mbuf found in chain @m
 * (anything linked after it stays attached to the returned mbuf),
 * or NULL if the chain carries no control data.
 */
static __noinline struct mbuf *
extract_msg_info(struct mbuf *m)
{
	while (m->m_next != NULL) {
		if (m->m_next->m_type == MT_CONTROL) {
			struct mbuf *ctl = m->m_next;
			m->m_next = NULL;
			return (ctl);
		}
		m = m->m_next;
	}
	return (NULL);
}

/*
 * Schedules the per-socket taskqueue unless a run is already pending.
 * Callers hold the NLP lock, which protects nl_task_pending.
 */
static void
nl_schedule_taskqueue(struct nlpcb *nlp)
{
	if (!nlp->nl_task_pending) {
		nlp->nl_task_pending = true;
		taskqueue_enqueue(nlp->nl_taskqueue, &nlp->nl_task);
		NL_LOG(LOG_DEBUG3, "taskqueue scheduled");
	} else {
		NL_LOG(LOG_DEBUG3, "taskqueue schedule skipped");
	}
}

/*
 * Accepts userland data @m on socket @so: appends it to the socket
 * send buffer and kicks the taskqueue for asynchronous processing.
 * Consumes @m in all cases.  Returns 0 on success or EINVAL if the
 * socket is not active.
 */
int
nl_receive_async(struct mbuf *m, struct socket *so)
{
	struct nlpcb *nlp = sotonlpcb(so);
	int error = 0;

	m->m_nextpkt = NULL;

	NLP_LOCK(nlp);

	if ((__predict_true(nlp->nl_active))) {
		sbappend(&so->so_snd, m, 0);
		NL_LOG(LOG_DEBUG3, "enqueue %u bytes", m_length(m, NULL));
		nl_schedule_taskqueue(nlp);
	} else {
		NL_LOG(LOG_DEBUG, "ignoring %u bytes on non-active socket",
		    m_length(m, NULL));
		/*
		 * NOTE(review): m_free() releases only the first mbuf of a
		 * chain; if @m can be multi-mbuf here, m_freem() may be
		 * intended - verify against callers.
		 */
		m_free(m);
		error = EINVAL;
	}

	NLP_UNLOCK(nlp);

	return (error);
}

/*
 * Tries to drain the internal TX (overflow) queue into the socket
 * receive buffer, waking up readers if anything moved.  Returns true
 * if the TX queue is empty afterwards.  Requires the NLP lock.
 */
static bool
tx_check_locked(struct nlpcb *nlp)
{
	if (queue_empty(&nlp->tx_queue))
		return (true);

	/*
	 * Check if something can be moved from the internal TX queue
	 * to the socket queue.
	 */

	bool appended = false;
	struct sockbuf *sb = &nlp->nl_socket->so_rcv;
	SOCKBUF_LOCK(sb);

	while (true) {
		struct mbuf *m = queue_head(&nlp->tx_queue);
		if (m != NULL) {
			struct mbuf *ctl = NULL;
			if (__predict_false(m->m_next != NULL))
				ctl = extract_msg_info(m);
			if (sbappendaddr_locked(sb, nl_empty_src, m, ctl) != 0) {
				/* appended successfully */
				queue_pop(&nlp->tx_queue);
				appended = true;
			} else
				break;
		} else
			break;
	}

	SOCKBUF_UNLOCK(sb);

	if (appended)
		sorwakeup(nlp->nl_socket);

	return (queue_empty(&nlp->tx_queue));
}

/*
 * Single taskqueue pass: drains pending TX overflow first, then pulls
 * data from the socket send buffer into the internal RX queue and
 * processes it packet by packet.  Returns true if another pass should
 * be scheduled.
 */
static bool
nl_process_received_one(struct nlpcb *nlp)
{
	bool reschedule = false;

	NLP_LOCK(nlp);
	nlp->nl_task_pending = false;

	if (!tx_check_locked(nlp)) {
		/* TX overflow queue still not empty, ignore RX */
		NLP_UNLOCK(nlp);
		return (false);
	}

	if (queue_empty(&nlp->rx_queue)) {
		/*
		 * Grab all data we have from the socket TX queue
		 * and store it in the internal queue, so it can be worked on
		 * w/o holding socket lock.
		 */
		struct sockbuf *sb = &nlp->nl_socket->so_snd;

		SOCKBUF_LOCK(sb);
		unsigned int avail = sbavail(sb);
		if (avail > 0) {
			NL_LOG(LOG_DEBUG3, "grabbed %u bytes", avail);
			queue_push(&nlp->rx_queue, sbcut_locked(sb, avail));
		}
		SOCKBUF_UNLOCK(sb);
	} else {
		/* Schedule another pass to read from the socket queue */
		reschedule = true;
	}

	int prev_hiwat = nlp->tx_queue.hiwat;
	NLP_UNLOCK(nlp);

	while (!queue_empty(&nlp->rx_queue)) {
		struct mbuf *m = queue_pop(&nlp->rx_queue);

		m = nl_process_mbuf(m, nlp);
		if (m != NULL) {
			/* TX blocked mid-packet: park the remainder */
			queue_push_head(&nlp->rx_queue, m);
			reschedule = false;
			break;
		}
	}
	/* NOTE(review): hiwat is read after NLP_UNLOCK(); benign, log-only */
	if (nlp->tx_queue.hiwat > prev_hiwat) {
		NLP_LOG(LOG_DEBUG, nlp, "TX override peaked to %d", nlp->tx_queue.hiwat);

	}

	return (reschedule);
}

/*
 * Taskqueue handler body: binds the nlp to the taskqueue thread on the
 * first run, then loops nl_process_received_one() until it requests no
 * further passes.
 */
static void
nl_process_received(struct nlpcb *nlp)
{
	NL_LOG(LOG_DEBUG3, "taskqueue called");

	if (__predict_false(nlp->nl_need_thread_setup)) {
		nl_set_thread_nlp(curthread, nlp);
		NLP_LOCK(nlp);
		nlp->nl_need_thread_setup = false;
		NLP_UNLOCK(nlp);
	}

	while (nl_process_received_one(nlp))
		;
}

/* Initializes the per-socket internal RX/TX queues. */
void
nl_init_io(struct nlpcb *nlp)
{
	STAILQ_INIT(&nlp->rx_queue.head);
	STAILQ_INIT(&nlp->tx_queue.head);
}

/* Releases all data pending on the per-socket internal RX/TX queues. */
void
nl_free_io(struct nlpcb *nlp)
{
	queue_free(&nlp->rx_queue);
	queue_free(&nlp->tx_queue);
}

/*
 * Called after some data have been read from the socket.
 */
void
nl_on_transmit(struct nlpcb *nlp)
{
	NLP_LOCK(nlp);

	struct socket *so = nlp->nl_socket;
	/* Report (via log for now) any overflow drops accumulated since the last read. */
	if (__predict_false(nlp->nl_dropped_bytes > 0 && so != NULL)) {
		unsigned long dropped_bytes = nlp->nl_dropped_bytes;
		unsigned long dropped_messages = nlp->nl_dropped_messages;
		nlp->nl_dropped_bytes = 0;
		nlp->nl_dropped_messages = 0;

		struct sockbuf *sb = &so->so_rcv;
		NLP_LOG(LOG_DEBUG, nlp,
		    "socket RX overflowed, %lu messages (%lu bytes) dropped. "
		    "bytes: [%u/%u] mbufs: [%u/%u]", dropped_messages, dropped_bytes,
		    sb->sb_ccc, sb->sb_hiwat, sb->sb_mbcnt, sb->sb_mbmax);
		/* TODO: send netlink message */
	}

	nl_schedule_taskqueue(nlp);
	NLP_UNLOCK(nlp);
}

/* Taskqueue callback; runs message processing inside the socket's vnet. */
void
nl_taskqueue_handler(void *_arg, int pending)
{
	struct nlpcb *nlp = (struct nlpcb *)_arg;

	CURVNET_SET(nlp->nl_socket->so_vnet);
	nl_process_received(nlp);
	CURVNET_RESTORE();
}

/*
 * Stores @m on the internal TX (overflow) queue, marks the socket
 * TX-blocked and updates the queue high watermark.
 * Callers hold the NLP lock.
 */
static __noinline void
queue_push_tx(struct nlpcb *nlp, struct mbuf *m)
{
	queue_push(&nlp->tx_queue, m);
	nlp->nl_tx_blocked = true;

	if (nlp->tx_queue.length > nlp->tx_queue.hiwat)
		nlp->tx_queue.hiwat = nlp->tx_queue.length;
}

/*
 * Tries to send @m to the socket @nlp.
 *
 * @m: mbuf(s) to send. Consumed in any case.
 * @nlp: socket to send to
 * @num_messages: number of messages in @m
 * @io_flags: combination of NL_IOF_* flags
 *
 * Returns true on success.
 * If no queue overruns happened, wakes up socket owner.
 */
bool
nl_send_one(struct mbuf *m, struct nlpcb *nlp, int num_messages, int io_flags)
{
	bool untranslated = io_flags & NL_IOF_UNTRANSLATED;
	bool ignore_limits = io_flags & NL_IOF_IGNORE_LIMIT;
	bool result = true;

	IF_DEBUG_LEVEL(LOG_DEBUG2) {
		struct nlmsghdr *hdr = mtod(m, struct nlmsghdr *);
		NLP_LOG(LOG_DEBUG2, nlp,
		    "TX mbuf len %u msgs %u msg type %d first hdrlen %u io_flags %X",
		    m_length(m, NULL), num_messages, hdr->nlmsg_type, hdr->nlmsg_len,
		    io_flags);
	}

	/* Translate to the Linux wire format first if the peer expects it. */
	if (__predict_false(nlp->nl_linux && linux_netlink_p != NULL && untranslated)) {
		m = linux_netlink_p->mbufs_to_linux(nlp->nl_proto, m, nlp);
		if (m == NULL)
			return (false);
	}

	NLP_LOCK(nlp);

	if (__predict_false(nlp->nl_socket == NULL)) {
		NLP_UNLOCK(nlp);
		m_freem(m);
		return (false);
	}

	/*
	 * Preserve ordering: while the overflow queue is non-empty, new
	 * data must go behind it (or be dropped).
	 */
	if (!queue_empty(&nlp->tx_queue)) {
		if (ignore_limits) {
			queue_push_tx(nlp, m);
		} else {
			/*
			 * NOTE(review): m_free() releases only the first
			 * mbuf; m_freem() may be intended for a chain -
			 * verify.
			 */
			m_free(m);
			result = false;
		}
		NLP_UNLOCK(nlp);
		return (result);
	}

	struct socket *so = nlp->nl_socket;
	struct mbuf *ctl = NULL;
	if (__predict_false(m->m_next != NULL))
		ctl = extract_msg_info(m);
	if (sbappendaddr(&so->so_rcv, nl_empty_src, m, ctl) != 0) {
		sorwakeup(so);
		NLP_LOG(LOG_DEBUG3, nlp, "appended data & woken up");
	} else {
		if (ignore_limits) {
			queue_push_tx(nlp, m);
		} else {
			/*
			 * Store dropped data so it can be reported
			 * on the next read
			 */
			nlp->nl_dropped_bytes += m_length(m, NULL);
			nlp->nl_dropped_messages += num_messages;
			NLP_LOG(LOG_DEBUG2, nlp, "RX oveflow: %lu m (+%d), %lu b (+%d)",
			    (unsigned long)nlp->nl_dropped_messages, num_messages,
			    (unsigned long)nlp->nl_dropped_bytes, m_length(m, NULL));
			soroverflow(so);
			m_freem(m);
			result = false;
		}
	}
	NLP_UNLOCK(nlp);

	return (result);
}

/*
 * Validates and dispatches a single netlink message @hdr to the
 * per-protocol handler, generating an ack/error reply when requested
 * (NLM_F_ACK) or on handler failure.
 *
 * @remaining_length: bytes left in the packet starting at @hdr.
 *
 * Returns EINVAL on a malformed header; 0 otherwise (handler errors are
 * reported to the peer via nlmsg_ack(), not to the caller).
 */
static int
nl_receive_message(struct nlmsghdr *hdr, int remaining_length,
    struct nlpcb *nlp, struct nl_pstate *npt)
{
	nl_handler_f handler = nl_handlers[nlp->nl_proto].cb;
	int error = 0;

	NLP_LOG(LOG_DEBUG2, nlp, "msg len: %u type: %d: flags: 0x%X seq: %u pid: %u",
	    hdr->nlmsg_len, hdr->nlmsg_type, hdr->nlmsg_flags, hdr->nlmsg_seq,
	    hdr->nlmsg_pid);

	/*
	 * NOTE(review): nlmsg_len is uint32_t, so remaining_length is
	 * promoted to unsigned here; callers always pass a positive value,
	 * but verify no negative value can reach this comparison.
	 */
	if (__predict_false(hdr->nlmsg_len > remaining_length)) {
		NLP_LOG(LOG_DEBUG, nlp, "message is not entirely present: want %d got %d",
		    hdr->nlmsg_len, remaining_length);
		return (EINVAL);
	} else if (__predict_false(hdr->nlmsg_len < sizeof(*hdr))) {
		NL_LOG(LOG_DEBUG, "message too short: %d", hdr->nlmsg_len);
		return (EINVAL);
	}
	/* Stamp each message with sender pid */
	hdr->nlmsg_pid = nlp->nl_port;

	npt->hdr = hdr;

	if (hdr->nlmsg_flags & NLM_F_REQUEST && hdr->nlmsg_type >= NLMSG_MIN_TYPE) {
		NL_LOG(LOG_DEBUG2, "handling message with msg type: %d",
		    hdr->nlmsg_type);

		if (nlp->nl_linux && linux_netlink_p != NULL) {
			struct nlmsghdr *hdr_orig = hdr;
			hdr = linux_netlink_p->msg_from_linux(nlp->nl_proto, hdr, npt);
			if (hdr == NULL) {
				/* Failed to translate to kernel format. Report an error back */
				hdr = hdr_orig;
				npt->hdr = hdr;
				if (hdr->nlmsg_flags & NLM_F_ACK)
					nlmsg_ack(nlp, EOPNOTSUPP, hdr, npt);
				return (0);
			}
		}
		error = handler(hdr, npt);
		NL_LOG(LOG_DEBUG2, "retcode: %d", error);
	}
	/* Ack when explicitly requested, or to report any handler error. */
	if ((hdr->nlmsg_flags & NLM_F_ACK) || (error != 0 && error != EINTR)) {
		if (!npt->nw->suppress_ack) {
			NL_LOG(LOG_DEBUG3, "ack");
			nlmsg_ack(nlp, error, hdr, npt);
		}
	}

	return (0);
}

/* Resets per-message parsing state before handling the next message. */
static void
npt_clear(struct nl_pstate *npt)
{
	lb_clear(&npt->lb);
	npt->error = 0;
	npt->err_msg = NULL;
	npt->err_off = 0;
	npt->hdr = NULL;
	npt->nw->suppress_ack = false;
}

/*
 * Processes an incoming packet, which can contain multiple netlink messages.
 *
 * The packet is linearized into a temporary buffer whose head holds a
 * copy of the message data (8-byte rounded) and whose tail serves as
 * scratch space (npt.lb) for the parsers; Linux-compat sockets get an
 * extra data-sized region for translated messages.
 *
 * Consumes @m unless the socket becomes TX-blocked mid-packet, in which
 * case the unprocessed remainder of @m is returned for requeueing.
 */
static struct mbuf *
nl_process_mbuf(struct mbuf *m, struct nlpcb *nlp)
{
	int offset, buffer_length;
	struct nlmsghdr *hdr;
	char *buffer;
	int error;

	NL_LOG(LOG_DEBUG3, "RX netlink mbuf %p on %p", m, nlp->nl_socket);

	struct nl_writer nw = {};
	if (!nlmsg_get_unicast_writer(&nw, NLMSG_SMALL, nlp)) {
		m_freem(m);
		NL_LOG(LOG_DEBUG, "error allocating socket writer");
		return (NULL);
	}

	/* Replies (acks/errors) must not be dropped by socket limits. */
	nlmsg_ignore_limit(&nw);
	/* TODO: alloc this buf once for nlp */
	int data_length = m_length(m, NULL);
	buffer_length = roundup2(data_length, 8) + SCRATCH_BUFFER_SIZE;
	if (nlp->nl_linux)
		buffer_length += roundup2(data_length, 8);
	buffer = malloc(buffer_length, M_NETLINK, M_NOWAIT | M_ZERO);
	if (buffer == NULL) {
		m_freem(m);
		nlmsg_flush(&nw);
		NL_LOG(LOG_DEBUG, "Unable to allocate %d bytes of memory",
		    buffer_length);
		return (NULL);
	}
	m_copydata(m, 0, data_length, buffer);

	struct nl_pstate npt = {
		.nlp = nlp,
		.lb.base = &buffer[roundup2(data_length, 8)],
		.lb.size = buffer_length - roundup2(data_length, 8),
		.nw = &nw,
		.strict = nlp->nl_flags & NLF_STRICT,
	};

	for (offset = 0; offset + sizeof(struct nlmsghdr) <= data_length;) {
		hdr = (struct nlmsghdr *)&buffer[offset];
		/* Save length prior to calling handler */
		int msglen = NLMSG_ALIGN(hdr->nlmsg_len);
		NL_LOG(LOG_DEBUG3, "parsing offset %d/%d", offset, data_length);
		npt_clear(&npt);
		error = nl_receive_message(hdr, data_length - offset, nlp, &npt);
		offset += msglen;
		/* Stop on malformed input or when the TX path overflowed. */
		if (__predict_false(error != 0 || nlp->nl_tx_blocked))
			break;
	}
	NL_LOG(LOG_DEBUG3, "packet parsing done");
	free(buffer, M_NETLINK);
	nlmsg_flush(&nw);

	if (nlp->nl_tx_blocked) {
		NLP_LOCK(nlp);
		nlp->nl_tx_blocked = false;
		NLP_UNLOCK(nlp);
		/* Trim the consumed part; caller requeues the remainder. */
		m_adj(m, offset);
		return (m);
	} else {
		m_freem(m);
		return (NULL);
	}
}