1 /*- 2 * SPDX-License-Identifier: BSD-2-Clause 3 * 4 * Copyright (c) 2021 Ng Peng Nam Sean 5 * Copyright (c) 2022 Alexander V. Chernikov <melifaro@FreeBSD.org> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26 * SUCH DAMAGE. 27 */ 28 29 #include <sys/cdefs.h> 30 #include <sys/param.h> 31 #include <sys/ck.h> 32 #include <sys/lock.h> 33 #include <sys/malloc.h> 34 #include <sys/mbuf.h> 35 #include <sys/mutex.h> 36 #include <sys/socket.h> 37 #include <sys/socketvar.h> 38 #include <sys/syslog.h> 39 40 #include <netlink/netlink.h> 41 #include <netlink/netlink_ctl.h> 42 #include <netlink/netlink_linux.h> 43 #include <netlink/netlink_var.h> 44 45 #define DEBUG_MOD_NAME nl_io 46 #define DEBUG_MAX_LEVEL LOG_DEBUG3 47 #include <netlink/netlink_debug.h> 48 _DECLARE_DEBUG(LOG_INFO); 49 50 /* 51 * The logic below provide a p2p interface for receiving and 52 * sending netlink data between the kernel and userland. 53 */ 54 55 static const struct sockaddr_nl _nl_empty_src = { 56 .nl_len = sizeof(struct sockaddr_nl), 57 .nl_family = PF_NETLINK, 58 .nl_pid = 0 /* comes from the kernel */ 59 }; 60 static const struct sockaddr *nl_empty_src = (const struct sockaddr *)&_nl_empty_src; 61 62 static struct mbuf *nl_process_mbuf(struct mbuf *m, struct nlpcb *nlp); 63 64 65 static void 66 queue_push(struct nl_io_queue *q, struct mbuf *mq) 67 { 68 while (mq != NULL) { 69 struct mbuf *m = mq; 70 mq = mq->m_nextpkt; 71 m->m_nextpkt = NULL; 72 73 q->length += m_length(m, NULL); 74 STAILQ_INSERT_TAIL(&q->head, m, m_stailqpkt); 75 } 76 } 77 78 static void 79 queue_push_head(struct nl_io_queue *q, struct mbuf *m) 80 { 81 MPASS(m->m_nextpkt == NULL); 82 83 q->length += m_length(m, NULL); 84 STAILQ_INSERT_HEAD(&q->head, m, m_stailqpkt); 85 } 86 87 static struct mbuf * 88 queue_pop(struct nl_io_queue *q) 89 { 90 if (!STAILQ_EMPTY(&q->head)) { 91 struct mbuf *m = STAILQ_FIRST(&q->head); 92 STAILQ_REMOVE_HEAD(&q->head, m_stailqpkt); 93 m->m_nextpkt = NULL; 94 q->length -= m_length(m, NULL); 95 96 return (m); 97 } 98 return (NULL); 99 } 100 101 static struct mbuf * 102 queue_head(const struct nl_io_queue *q) 103 { 104 return (STAILQ_FIRST(&q->head)); 105 } 106 107 static inline bool 108 queue_empty(const struct nl_io_queue *q) 109 { 110 return (q->length == 0); 111 } 112 113 static void 114 queue_free(struct nl_io_queue *q) 115 { 116 while (!STAILQ_EMPTY(&q->head)) { 117 struct mbuf *m = STAILQ_FIRST(&q->head); 118 STAILQ_REMOVE_HEAD(&q->head, m_stailqpkt); 119 m->m_nextpkt = NULL; 120 m_freem(m); 121 } 122 q->length = 0; 123 } 124 125 void 126 nl_add_msg_info(struct mbuf *m) 127 { 128 struct nlpcb *nlp = nl_get_thread_nlp(curthread); 129 NL_LOG(LOG_DEBUG2, "Trying to recover nlp from thread %p: %p", 130 curthread, nlp); 131 132 if (nlp == NULL) 133 return; 134 135 /* Prepare what we want to encode - PID, socket PID & msg seq */ 136 struct { 137 struct nlattr nla; 138 uint32_t val; 139 } data[] = { 140 { 141 .nla.nla_len = sizeof(struct nlattr) + sizeof(uint32_t), 142 .nla.nla_type = NLMSGINFO_ATTR_PROCESS_ID, 143 .val = nlp->nl_process_id, 144 }, 145 { 146 .nla.nla_len = sizeof(struct nlattr) + sizeof(uint32_t), 147 .nla.nla_type = NLMSGINFO_ATTR_PORT_ID, 148 .val = nlp->nl_port, 149 }, 150 }; 151 152 153 while (m->m_next != NULL) 154 m = m->m_next; 155 m->m_next = sbcreatecontrol(data, sizeof(data), 156 NETLINK_MSG_INFO, SOL_NETLINK, M_NOWAIT); 157 158 NL_LOG(LOG_DEBUG2, "Storing %u bytes of data, ctl: %p", 159 (unsigned)sizeof(data), m->m_next); 160 } 161 162 static __noinline struct mbuf * 163 extract_msg_info(struct mbuf *m) 164 { 165 while (m->m_next != NULL) { 166 if (m->m_next->m_type == MT_CONTROL) { 167 struct mbuf *ctl = m->m_next; 168 m->m_next = NULL; 169 return (ctl); 170 } 171 m = m->m_next; 172 } 173 return (NULL); 174 } 175 176 static void 177 nl_schedule_taskqueue(struct nlpcb *nlp) 178 { 179 if (!nlp->nl_task_pending) { 180 nlp->nl_task_pending = true; 181 taskqueue_enqueue(nlp->nl_taskqueue, &nlp->nl_task); 182 NL_LOG(LOG_DEBUG3, "taskqueue scheduled"); 183 } else { 184 NL_LOG(LOG_DEBUG3, "taskqueue schedule skipped"); 185 } 186 } 187 188 int 189 nl_receive_async(struct mbuf *m, struct socket *so) 190 { 191 struct nlpcb *nlp = sotonlpcb(so); 192 int error = 0; 193 194 m->m_nextpkt = NULL; 195 196 NLP_LOCK(nlp); 197 198 if ((__predict_true(nlp->nl_active))) { 199 sbappend(&so->so_snd, m, 0); 200 NL_LOG(LOG_DEBUG3, "enqueue %u bytes", m_length(m, NULL)); 201 nl_schedule_taskqueue(nlp); 202 } else { 203 NL_LOG(LOG_DEBUG, "ignoring %u bytes on non-active socket", 204 m_length(m, NULL)); 205 m_free(m); 206 error = EINVAL; 207 } 208 209 NLP_UNLOCK(nlp); 210 211 return (error); 212 } 213 214 static bool 215 tx_check_locked(struct nlpcb *nlp) 216 { 217 if (queue_empty(&nlp->tx_queue)) 218 return (true); 219 220 /* 221 * Check if something can be moved from the internal TX queue 222 * to the socket queue. 223 */ 224 225 bool appended = false; 226 struct sockbuf *sb = &nlp->nl_socket->so_rcv; 227 SOCKBUF_LOCK(sb); 228 229 while (true) { 230 struct mbuf *m = queue_head(&nlp->tx_queue); 231 if (m != NULL) { 232 struct mbuf *ctl = NULL; 233 if (__predict_false(m->m_next != NULL)) 234 ctl = extract_msg_info(m); 235 if (sbappendaddr_locked(sb, nl_empty_src, m, ctl) != 0) { 236 /* appended successfully */ 237 queue_pop(&nlp->tx_queue); 238 appended = true; 239 } else 240 break; 241 } else 242 break; 243 } 244 245 SOCKBUF_UNLOCK(sb); 246 247 if (appended) 248 sorwakeup(nlp->nl_socket); 249 250 return (queue_empty(&nlp->tx_queue)); 251 } 252 253 static bool 254 nl_process_received_one(struct nlpcb *nlp) 255 { 256 bool reschedule = false; 257 258 NLP_LOCK(nlp); 259 nlp->nl_task_pending = false; 260 261 if (!tx_check_locked(nlp)) { 262 /* TX overflow queue still not empty, ignore RX */ 263 NLP_UNLOCK(nlp); 264 return (false); 265 } 266 267 if (queue_empty(&nlp->rx_queue)) { 268 /* 269 * Grab all data we have from the socket TX queue 270 * and store it the internal queue, so it can be worked on 271 * w/o holding socket lock. 272 */ 273 struct sockbuf *sb = &nlp->nl_socket->so_snd; 274 275 SOCKBUF_LOCK(sb); 276 unsigned int avail = sbavail(sb); 277 if (avail > 0) { 278 NL_LOG(LOG_DEBUG3, "grabbed %u bytes", avail); 279 queue_push(&nlp->rx_queue, sbcut_locked(sb, avail)); 280 } 281 SOCKBUF_UNLOCK(sb); 282 } else { 283 /* Schedule another pass to read from the socket queue */ 284 reschedule = true; 285 } 286 287 int prev_hiwat = nlp->tx_queue.hiwat; 288 NLP_UNLOCK(nlp); 289 290 while (!queue_empty(&nlp->rx_queue)) { 291 struct mbuf *m = queue_pop(&nlp->rx_queue); 292 293 m = nl_process_mbuf(m, nlp); 294 if (m != NULL) { 295 queue_push_head(&nlp->rx_queue, m); 296 reschedule = false; 297 break; 298 } 299 } 300 if (nlp->tx_queue.hiwat > prev_hiwat) { 301 NLP_LOG(LOG_DEBUG, nlp, "TX override peaked to %d", nlp->tx_queue.hiwat); 302 303 } 304 305 return (reschedule); 306 } 307 308 static void 309 nl_process_received(struct nlpcb *nlp) 310 { 311 NL_LOG(LOG_DEBUG3, "taskqueue called"); 312 313 if (__predict_false(nlp->nl_need_thread_setup)) { 314 nl_set_thread_nlp(curthread, nlp); 315 NLP_LOCK(nlp); 316 nlp->nl_need_thread_setup = false; 317 NLP_UNLOCK(nlp); 318 } 319 320 while (nl_process_received_one(nlp)) 321 ; 322 } 323 324 void 325 nl_init_io(struct nlpcb *nlp) 326 { 327 STAILQ_INIT(&nlp->rx_queue.head); 328 STAILQ_INIT(&nlp->tx_queue.head); 329 } 330 331 void 332 nl_free_io(struct nlpcb *nlp) 333 { 334 queue_free(&nlp->rx_queue); 335 queue_free(&nlp->tx_queue); 336 } 337 338 /* 339 * Called after some data have been read from the socket. 340 */ 341 void 342 nl_on_transmit(struct nlpcb *nlp) 343 { 344 NLP_LOCK(nlp); 345 346 struct socket *so = nlp->nl_socket; 347 if (__predict_false(nlp->nl_dropped_bytes > 0 && so != NULL)) { 348 unsigned long dropped_bytes = nlp->nl_dropped_bytes; 349 unsigned long dropped_messages = nlp->nl_dropped_messages; 350 nlp->nl_dropped_bytes = 0; 351 nlp->nl_dropped_messages = 0; 352 353 struct sockbuf *sb = &so->so_rcv; 354 NLP_LOG(LOG_DEBUG, nlp, 355 "socket RX overflowed, %lu messages (%lu bytes) dropped. " 356 "bytes: [%u/%u] mbufs: [%u/%u]", dropped_messages, dropped_bytes, 357 sb->sb_ccc, sb->sb_hiwat, sb->sb_mbcnt, sb->sb_mbmax); 358 /* TODO: send netlink message */ 359 } 360 361 nl_schedule_taskqueue(nlp); 362 NLP_UNLOCK(nlp); 363 } 364 365 void 366 nl_taskqueue_handler(void *_arg, int pending) 367 { 368 struct nlpcb *nlp = (struct nlpcb *)_arg; 369 370 CURVNET_SET(nlp->nl_socket->so_vnet); 371 nl_process_received(nlp); 372 CURVNET_RESTORE(); 373 } 374 375 static __noinline void 376 queue_push_tx(struct nlpcb *nlp, struct mbuf *m) 377 { 378 queue_push(&nlp->tx_queue, m); 379 nlp->nl_tx_blocked = true; 380 381 if (nlp->tx_queue.length > nlp->tx_queue.hiwat) 382 nlp->tx_queue.hiwat = nlp->tx_queue.length; 383 } 384 385 /* 386 * Tries to send @m to the socket @nlp. 387 * 388 * @m: mbuf(s) to send to. Consumed in any case. 389 * @nlp: socket to send to 390 * @cnt: number of messages in @m 391 * @io_flags: combination of NL_IOF_* flags 392 * 393 * Returns true on success. 394 * If no queue overrunes happened, wakes up socket owner. 395 */ 396 bool 397 nl_send_one(struct mbuf *m, struct nlpcb *nlp, int num_messages, int io_flags) 398 { 399 bool untranslated = io_flags & NL_IOF_UNTRANSLATED; 400 bool ignore_limits = io_flags & NL_IOF_IGNORE_LIMIT; 401 bool result = true; 402 403 IF_DEBUG_LEVEL(LOG_DEBUG2) { 404 struct nlmsghdr *hdr = mtod(m, struct nlmsghdr *); 405 NLP_LOG(LOG_DEBUG2, nlp, 406 "TX mbuf len %u msgs %u msg type %d first hdrlen %u io_flags %X", 407 m_length(m, NULL), num_messages, hdr->nlmsg_type, hdr->nlmsg_len, 408 io_flags); 409 } 410 411 if (__predict_false(nlp->nl_linux && linux_netlink_p != NULL && untranslated)) { 412 m = linux_netlink_p->mbufs_to_linux(nlp->nl_proto, m, nlp); 413 if (m == NULL) 414 return (false); 415 } 416 417 NLP_LOCK(nlp); 418 419 if (__predict_false(nlp->nl_socket == NULL)) { 420 NLP_UNLOCK(nlp); 421 m_freem(m); 422 return (false); 423 } 424 425 if (!queue_empty(&nlp->tx_queue)) { 426 if (ignore_limits) { 427 queue_push_tx(nlp, m); 428 } else { 429 m_free(m); 430 result = false; 431 } 432 NLP_UNLOCK(nlp); 433 return (result); 434 } 435 436 struct socket *so = nlp->nl_socket; 437 struct mbuf *ctl = NULL; 438 if (__predict_false(m->m_next != NULL)) 439 ctl = extract_msg_info(m); 440 if (sbappendaddr(&so->so_rcv, nl_empty_src, m, ctl) != 0) { 441 sorwakeup(so); 442 NLP_LOG(LOG_DEBUG3, nlp, "appended data & woken up"); 443 } else { 444 if (ignore_limits) { 445 queue_push_tx(nlp, m); 446 } else { 447 /* 448 * Store dropped data so it can be reported 449 * on the next read 450 */ 451 nlp->nl_dropped_bytes += m_length(m, NULL); 452 nlp->nl_dropped_messages += num_messages; 453 NLP_LOG(LOG_DEBUG2, nlp, "RX oveflow: %lu m (+%d), %lu b (+%d)", 454 (unsigned long)nlp->nl_dropped_messages, num_messages, 455 (unsigned long)nlp->nl_dropped_bytes, m_length(m, NULL)); 456 soroverflow(so); 457 m_freem(m); 458 result = false; 459 } 460 } 461 NLP_UNLOCK(nlp); 462 463 return (result); 464 } 465 466 static int 467 nl_receive_message(struct nlmsghdr *hdr, int remaining_length, 468 struct nlpcb *nlp, struct nl_pstate *npt) 469 { 470 nl_handler_f handler = nl_handlers[nlp->nl_proto].cb; 471 int error = 0; 472 473 NLP_LOG(LOG_DEBUG2, nlp, "msg len: %u type: %d: flags: 0x%X seq: %u pid: %u", 474 hdr->nlmsg_len, hdr->nlmsg_type, hdr->nlmsg_flags, hdr->nlmsg_seq, 475 hdr->nlmsg_pid); 476 477 if (__predict_false(hdr->nlmsg_len > remaining_length)) { 478 NLP_LOG(LOG_DEBUG, nlp, "message is not entirely present: want %d got %d", 479 hdr->nlmsg_len, remaining_length); 480 return (EINVAL); 481 } else if (__predict_false(hdr->nlmsg_len < sizeof(*hdr))) { 482 NL_LOG(LOG_DEBUG, "message too short: %d", hdr->nlmsg_len); 483 return (EINVAL); 484 } 485 /* Stamp each message with sender pid */ 486 hdr->nlmsg_pid = nlp->nl_port; 487 488 npt->hdr = hdr; 489 490 if (hdr->nlmsg_flags & NLM_F_REQUEST && hdr->nlmsg_type >= NLMSG_MIN_TYPE) { 491 NL_LOG(LOG_DEBUG2, "handling message with msg type: %d", 492 hdr->nlmsg_type); 493 494 if (nlp->nl_linux && linux_netlink_p != NULL) { 495 struct nlmsghdr *hdr_orig = hdr; 496 hdr = linux_netlink_p->msg_from_linux(nlp->nl_proto, hdr, npt); 497 if (hdr == NULL) { 498 /* Failed to translate to kernel format. Report an error back */ 499 hdr = hdr_orig; 500 npt->hdr = hdr; 501 if (hdr->nlmsg_flags & NLM_F_ACK) 502 nlmsg_ack(nlp, EOPNOTSUPP, hdr, npt); 503 return (0); 504 } 505 } 506 error = handler(hdr, npt); 507 NL_LOG(LOG_DEBUG2, "retcode: %d", error); 508 } 509 if ((hdr->nlmsg_flags & NLM_F_ACK) || (error != 0 && error != EINTR)) { 510 if (!npt->nw->suppress_ack) { 511 NL_LOG(LOG_DEBUG3, "ack"); 512 nlmsg_ack(nlp, error, hdr, npt); 513 } 514 } 515 516 return (0); 517 } 518 519 static void 520 npt_clear(struct nl_pstate *npt) 521 { 522 lb_clear(&npt->lb); 523 npt->error = 0; 524 npt->err_msg = NULL; 525 npt->err_off = 0; 526 npt->hdr = NULL; 527 npt->nw->suppress_ack = false; 528 } 529 530 /* 531 * Processes an incoming packet, which can contain multiple netlink messages 532 */ 533 static struct mbuf * 534 nl_process_mbuf(struct mbuf *m, struct nlpcb *nlp) 535 { 536 int offset, buffer_length; 537 struct nlmsghdr *hdr; 538 char *buffer; 539 int error; 540 541 NL_LOG(LOG_DEBUG3, "RX netlink mbuf %p on %p", m, nlp->nl_socket); 542 543 struct nl_writer nw = {}; 544 if (!nlmsg_get_unicast_writer(&nw, NLMSG_SMALL, nlp)) { 545 m_freem(m); 546 NL_LOG(LOG_DEBUG, "error allocating socket writer"); 547 return (NULL); 548 } 549 550 nlmsg_ignore_limit(&nw); 551 /* TODO: alloc this buf once for nlp */ 552 int data_length = m_length(m, NULL); 553 buffer_length = roundup2(data_length, 8) + SCRATCH_BUFFER_SIZE; 554 if (nlp->nl_linux) 555 buffer_length += roundup2(data_length, 8); 556 buffer = malloc(buffer_length, M_NETLINK, M_NOWAIT | M_ZERO); 557 if (buffer == NULL) { 558 m_freem(m); 559 nlmsg_flush(&nw); 560 NL_LOG(LOG_DEBUG, "Unable to allocate %d bytes of memory", 561 buffer_length); 562 return (NULL); 563 } 564 m_copydata(m, 0, data_length, buffer); 565 566 struct nl_pstate npt = { 567 .nlp = nlp, 568 .lb.base = &buffer[roundup2(data_length, 8)], 569 .lb.size = buffer_length - roundup2(data_length, 8), 570 .nw = &nw, 571 .strict = nlp->nl_flags & NLF_STRICT, 572 }; 573 574 for (offset = 0; offset + sizeof(struct nlmsghdr) <= data_length;) { 575 hdr = (struct nlmsghdr *)&buffer[offset]; 576 /* Save length prior to calling handler */ 577 int msglen = NLMSG_ALIGN(hdr->nlmsg_len); 578 NL_LOG(LOG_DEBUG3, "parsing offset %d/%d", offset, data_length); 579 npt_clear(&npt); 580 error = nl_receive_message(hdr, data_length - offset, nlp, &npt); 581 offset += msglen; 582 if (__predict_false(error != 0 || nlp->nl_tx_blocked)) 583 break; 584 } 585 NL_LOG(LOG_DEBUG3, "packet parsing done"); 586 free(buffer, M_NETLINK); 587 nlmsg_flush(&nw); 588 589 if (nlp->nl_tx_blocked) { 590 NLP_LOCK(nlp); 591 nlp->nl_tx_blocked = false; 592 NLP_UNLOCK(nlp); 593 m_adj(m, offset); 594 return (m); 595 } else { 596 m_freem(m); 597 return (NULL); 598 } 599 } 600