xref: /titanic_50/usr/src/lib/auditd_plugins/remote/transport.c (revision d6114e2d100d9ec3b45f9968d45ac2e3a0827af0)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  *
25  * transport layer for audit_remote (handles connection establishment, gss
26  * context initialization, message encryption and verification)
27  *
28  */
29 
30 #include <assert.h>
31 #include <audit_plugin.h>
32 #include <errno.h>
33 #include <fcntl.h>
34 #include <gssapi/gssapi.h>
35 #include <libintl.h>
36 #include <mtmalloc.h>
37 #include <netdb.h>
38 #include <netinet/in.h>
39 #include <netinet/tcp.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <string.h>
43 #include <strings.h>
44 #include <syslog.h>
45 #include <sys/types.h>
46 #include <sys/socket.h>
47 #include <unistd.h>
48 #include <poll.h>
49 #include <pthread.h>
50 
51 #include "audit_remote.h"
52 
53 
54 static int 		sockfd = -1;
55 static struct hostent	*current_host;
56 static gss_OID		*current_mech_oid;
57 static in_port_t 	current_port;
58 static boolean_t	flush_transq;
59 
60 static char		*ver_str = "01";	/* supported protocol version */
61 static char		*ver_str_concat;	/* concat serv/client version */
62 
63 static gss_ctx_id_t	gss_ctx;
64 static boolean_t	gss_ctx_initialized;
65 
66 pthread_t		recv_tid;		/* receiving thread */
67 static pthread_once_t	recv_once_control = PTHREAD_ONCE_INIT;
68 
69 extern int		timeout;		/* connection timeout */
70 
71 extern pthread_mutex_t	plugin_mutex;
72 transq_hdr_t		transq_hdr;
73 
74 /*
75  * The three locks synchronize the simultaneous actions on top of transmission
76  * queue, socket, gss_context.
77  */
78 pthread_mutex_t		transq_lock = PTHREAD_MUTEX_INITIALIZER;
79 pthread_mutex_t		sock_lock = PTHREAD_MUTEX_INITIALIZER;
80 pthread_mutex_t		gss_ctx_lock = PTHREAD_MUTEX_INITIALIZER;
81 
82 /* reset routine synchronization - required by the sending thread */
83 pthread_mutex_t		reset_lock = PTHREAD_MUTEX_INITIALIZER;
84 static boolean_t	reset_in_progress;	/* reset routine in progress */
85 
86 #define	NP_CLOSE	-1		/* notification pipe - close message */
87 #define	NP_EXIT		-2		/* notification pipe - exit message */
88 boolean_t		notify_pipe_ready;
89 int			notify_pipe[2]; /* notif. pipe - receiving thread */
90 
91 pthread_cond_t		reset_cv = PTHREAD_COND_INITIALIZER;
92 static close_rsn_t	recv_closure_rsn;
93 
94 #define	MAX_TOK_LEN	(128 * 1000)	/* max token length we accept (B) */
95 
96 /* transmission queue helpers */
97 static void		transq_dequeue(transq_node_t *);
98 static boolean_t	transq_enqueue(transq_node_t **, gss_buffer_t,
99     uint64_t);
100 static int		transq_retransmit(void);
101 
102 static boolean_t	init_poll(int);
103 static void 		do_reset(int *, struct pollfd *, boolean_t);
104 static void 		do_cleanup(int *, struct pollfd *, boolean_t);
105 
106 static void		init_recv_record(void);
107 static void		recv_record();
108 static int		connect_timeout(int, struct sockaddr *, int);
109 static int		send_timeout(int, const char *, size_t);
110 static int		recv_timeout(int, char *, size_t);
111 static int		send_token(int *, gss_buffer_t);
112 static int		recv_token(int, gss_buffer_t);
113 
114 
115 /*
116  * report_err() - wrapper, mainly due to enhance the code readability - report
117  * error to syslog via call to __audit_syslog().
118  */
119 static void
120 report_err(char *msg)
121 {
122 	__audit_syslog("audit_remote.so", LOG_CONS | LOG_NDELAY, LOG_DAEMON,
123 	    LOG_ERR, msg);
124 
125 }
126 
127 
128 /*
129  * report_gss_err() - GSS API error reporting
130  */
131 static void
132 report_gss_err(char *msg, OM_uint32 maj_stat, OM_uint32 min_stat)
133 {
134 	gss_buffer_desc	msg_buf;
135 	OM_uint32	_min, msg_ctx;
136 	char		*err_msg;
137 
138 	/* major stat */
139 	msg_ctx = 0;
140 	do {
141 		(void) gss_display_status(&_min, maj_stat, GSS_C_GSS_CODE,
142 		    *current_mech_oid, &msg_ctx, &msg_buf);
143 		(void) asprintf(&err_msg,
144 		    gettext("GSS API error - %s(%u): %.*s\n"), msg, maj_stat,
145 		    msg_buf.length, (char *)msg_buf.value);
146 		if (err_msg != NULL) {
147 			report_err(err_msg);
148 			free(err_msg);
149 		}
150 		(void) gss_release_buffer(&_min, &msg_buf);
151 	} while (msg_ctx);
152 
153 	/* minor stat */
154 	msg_ctx = 0;
155 	do {
156 		(void) gss_display_status(&_min, min_stat, GSS_C_MECH_CODE,
157 		    *current_mech_oid, &msg_ctx, &msg_buf);
158 		(void) asprintf(&err_msg,
159 		    gettext("GSS mech error - %s(%u): %.*s\n"), msg, min_stat,
160 		    msg_buf.length, (char *)msg_buf.value);
161 		if (err_msg != NULL) {
162 			report_err(err_msg);
163 			free(err_msg);
164 		}
165 		(void) gss_release_buffer(&_min, &msg_buf);
166 	} while (msg_ctx);
167 }
168 
169 /*
170  * prot_ver_negotiate() - negotiate/acknowledge the protocol version. Currently,
171  * there is only one version supported by the plugin - "01".
172  * Note: connection must be initiated prior version negotiation
173  */
174 static int
175 prot_ver_negotiate()
176 {
177 	gss_buffer_desc	out_buf, in_buf;
178 	size_t		ver_str_concat_sz;
179 
180 	/*
181 	 * Set the version proposal string - once we support more than
182 	 * version "01" this part should be extended to solve the concatenation
183 	 * of supported version identifiers.
184 	 */
185 	out_buf.value = (void *)ver_str;
186 	out_buf.length = strlen((char *)out_buf.value);
187 	DPRINT((dfile, "Protocol version proposal (size=%d): %.*s\n",
188 	    out_buf.length, out_buf.length, (char *)out_buf.value));
189 
190 	if (send_token(&sockfd, &out_buf) < 0) {
191 		DPRINT((dfile, "Sending protocol version token failed\n"));
192 		return (-1);
193 	}
194 
195 	if (recv_token(sockfd, &in_buf) < 0) {
196 		DPRINT((dfile, "Receiving protocol version token failed\n"));
197 		return (-1);
198 	}
199 
200 	/*
201 	 * Verify the sent/received string - memcmp() is sufficient here
202 	 * because we support only one version and it is represented by
203 	 * the "01" string. The received version has to be "01" string as well.
204 	 */
205 	if (out_buf.length != in_buf.length ||
206 	    memcmp(out_buf.value, in_buf.value, out_buf.length) != 0) {
207 		DPRINT((dfile, "Verification of the protocol version strings "
208 		    "failed [%d:%s][%d:%s]\n", out_buf.length,
209 		    (char *)out_buf.value, in_buf.length,
210 		    (char *)in_buf.value));
211 		free(in_buf.value);
212 		return (-1);
213 	}
214 
215 	/*
216 	 * Prepare the concatenated client/server version strings later used
217 	 * as an application_data field in the gss_channel_bindings_struct
218 	 * structure.
219 	 */
220 	ver_str_concat_sz = out_buf.length + in_buf.length + 1;
221 	ver_str_concat = (char *)calloc(1, ver_str_concat_sz);
222 	if (ver_str_concat == NULL) {
223 		report_err(gettext("Memory allocation failed"));
224 		DPRINT((dfile, "Memory allocation failed: %s\n",
225 		    strerror(errno)));
226 		free(in_buf.value);
227 		return (-1);
228 	}
229 	(void) memcpy(ver_str_concat, out_buf.value, out_buf.length);
230 	(void) memcpy(ver_str_concat + out_buf.length, in_buf.value,
231 	    in_buf.length);
232 	DPRINT((dfile, "Concatenated version strings: %s\n", ver_str_concat));
233 
234 	DPRINT((dfile, "Protocol version agreed.\n"));
235 	free(in_buf.value);
236 	return (0);
237 }
238 
239 /*
240  * sock_prepare() - creates and connects socket. Function returns
241  * B_FALSE/B_TRUE on failure/success and sets the err_rsn accordingly to the
242  * reason of failure.
243  */
244 static boolean_t
245 sock_prepare(int *sockfdptr, struct hostent *host, close_rsn_t *err_rsn)
246 {
247 	struct sockaddr_storage	addr;
248 	struct sockaddr_in	*sin;
249 	struct sockaddr_in6	*sin6;
250 	size_t			addr_len;
251 	int			sock;
252 
253 	DPRINT((dfile, "Creating socket for %s\n", host->h_name));
254 	bzero(&addr, sizeof (addr));
255 	addr.ss_family = host->h_addrtype;
256 	switch (host->h_addrtype) {
257 	case AF_INET:
258 		sin = (struct sockaddr_in *)&addr;
259 		addr_len = sizeof (struct sockaddr_in);
260 		bcopy(host->h_addr_list[0],
261 		    &(sin->sin_addr), sizeof (struct in_addr));
262 		sin->sin_port = current_port;
263 		break;
264 	case AF_INET6:
265 		sin6 = (struct sockaddr_in6 *)&addr;
266 		addr_len = sizeof (struct sockaddr_in6);
267 		bcopy(host->h_addr_list[0],
268 		    &(sin6->sin6_addr), sizeof (struct in6_addr));
269 		sin6->sin6_port = current_port;
270 		break;
271 	default:
272 		/* unknown address family */
273 		*err_rsn = RSN_UNKNOWN_AF;
274 		return (B_FALSE);
275 	}
276 	if ((sock = socket(addr.ss_family, SOCK_STREAM, 0)) == -1) {
277 		*err_rsn = RSN_SOCKET_CREATE;
278 		return (B_FALSE);
279 	}
280 	DPRINT((dfile, "Socket created, fd=%d, connecting..\n", sock));
281 
282 	if (connect_timeout(sock, (struct sockaddr *)&addr, addr_len)) {
283 		(void) close(sock);
284 		*err_rsn = RSN_CONNECTION_CREATE;
285 		return (B_FALSE);
286 	}
287 	*sockfdptr = sock;
288 	DPRINT((dfile, "Connected to %s via fd=%d\n", host->h_name,
289 	    *sockfdptr));
290 
291 	return (B_TRUE);
292 }
293 
294 /*
295  * establish_context() - establish the client/server GSS context.
296  *
297  * Note: connection must be established and version negotiated (in plain text)
298  * prior to establishing context.
299  */
300 static int
301 establish_context()
302 {
303 	gss_buffer_desc				send_tok, recv_tok, *token_ptr;
304 	OM_uint32				maj_stat, min_stat;
305 	OM_uint32				init_sec_min_stat, ret_flags;
306 	gss_name_t				gss_name;
307 	char					*gss_svc_name = "audit";
308 	char					*svc_name;
309 	struct gss_channel_bindings_struct	input_chan_bindings;
310 
311 	/* GSS service name = gss_svc_name + "@" + remote hostname (fqdn) */
312 	(void) asprintf(&svc_name, "%s@%s", gss_svc_name, current_host->h_name);
313 	if (svc_name == NULL) {
314 		report_err(gettext("Cannot allocate service name\n"));
315 		DPRINT((dfile, "Memory allocation failed: %s\n",
316 		    strerror(errno)));
317 		return (-1);
318 	}
319 	DPRINT((dfile, "Service name: %s\n", svc_name));
320 
321 	send_tok.value = svc_name;
322 	send_tok.length = strlen(svc_name);
323 	maj_stat = gss_import_name(&min_stat, &send_tok,
324 	    (gss_OID)GSS_C_NT_HOSTBASED_SERVICE, &gss_name);
325 	if (maj_stat != GSS_S_COMPLETE) {
326 		report_gss_err(gettext("initializing context"), maj_stat,
327 		    min_stat);
328 		free(svc_name);
329 		return (-1);
330 	}
331 	token_ptr = GSS_C_NO_BUFFER;
332 	gss_ctx = GSS_C_NO_CONTEXT;
333 
334 	/* initialize channel binding */
335 	bzero(&input_chan_bindings, sizeof (input_chan_bindings));
336 	input_chan_bindings.initiator_addrtype = GSS_C_AF_NULLADDR;
337 	input_chan_bindings.acceptor_addrtype = GSS_C_AF_NULLADDR;
338 	input_chan_bindings.application_data.length = strlen(ver_str_concat);
339 	input_chan_bindings.application_data.value = ver_str_concat;
340 
341 	(void) pthread_mutex_lock(&gss_ctx_lock);
342 	do {
343 		maj_stat = gss_init_sec_context(&init_sec_min_stat,
344 		    GSS_C_NO_CREDENTIAL, &gss_ctx, gss_name, *current_mech_oid,
345 		    GSS_C_MUTUAL_FLAG | GSS_C_REPLAY_FLAG | GSS_C_SEQUENCE_FLAG
346 		    | GSS_C_CONF_FLAG, 0, &input_chan_bindings, token_ptr,
347 		    NULL, &send_tok, &ret_flags, NULL);
348 
349 		if (token_ptr != GSS_C_NO_BUFFER) {
350 			(void) gss_release_buffer(&min_stat, &recv_tok);
351 		}
352 
353 		if (send_tok.length != 0) {
354 			DPRINT((dfile,
355 			    "Sending init_sec_context token (size=%d)\n",
356 			    send_tok.length));
357 			if (send_token(&sockfd, &send_tok) < 0) {
358 				free(svc_name);
359 				(void) gss_release_name(&min_stat, &gss_name);
360 				(void) pthread_mutex_unlock(&gss_ctx_lock);
361 				return (-1);
362 			}
363 		}
364 		if (send_tok.value != NULL) {
365 			free(send_tok.value);	/* freeing svc_name */
366 			send_tok.value = NULL;
367 			send_tok.length = 0;
368 		}
369 
370 		if (maj_stat != GSS_S_COMPLETE &&
371 		    maj_stat != GSS_S_CONTINUE_NEEDED) {
372 			report_gss_err(gettext("initializing context"),
373 			    maj_stat, init_sec_min_stat);
374 			if (gss_ctx == GSS_C_NO_CONTEXT) {
375 				(void) gss_delete_sec_context(&min_stat,
376 				    &gss_ctx, GSS_C_NO_BUFFER);
377 			}
378 			(void) gss_release_name(&min_stat, &gss_name);
379 			(void) pthread_mutex_unlock(&gss_ctx_lock);
380 			return (-1);
381 		}
382 
383 		if (maj_stat == GSS_S_CONTINUE_NEEDED) {
384 			DPRINT((dfile, "continue needed... "));
385 			if (recv_token(sockfd, &recv_tok) < 0) {
386 				(void) gss_release_name(&min_stat, &gss_name);
387 				(void) pthread_mutex_unlock(&gss_ctx_lock);
388 				return (-1);
389 			}
390 			token_ptr = &recv_tok;
391 		}
392 	} while (maj_stat == GSS_S_CONTINUE_NEEDED);
393 	(void) gss_release_name(&min_stat, &gss_name);
394 
395 	DPRINT((dfile, "context established\n"));
396 	(void) pthread_mutex_unlock(&gss_ctx_lock);
397 	return (0);
398 }
399 
400 /*
401  * delete_context() - release GSS context.
402  */
403 static void
404 delete_context()
405 {
406 	OM_uint32	min_stat;
407 
408 	(void) gss_delete_sec_context(&min_stat, &gss_ctx, GSS_C_NO_BUFFER);
409 	DPRINT((dfile, "context deleted\n"));
410 }
411 
412 /*
413  * send_token() - send GSS token over the wire.
414  */
415 static int
416 send_token(int *fdptr, gss_buffer_t tok)
417 {
418 	uint32_t	len;
419 	uint32_t	lensz;
420 	char		*out_buf;
421 	int		fd;
422 
423 	(void) pthread_mutex_lock(&sock_lock);
424 	if (*fdptr == -1) {
425 		(void) pthread_mutex_unlock(&sock_lock);
426 		DPRINT((dfile, "Socket detected as closed.\n"));
427 		return (-1);
428 	}
429 	fd = *fdptr;
430 
431 	len = htonl(tok->length);
432 	lensz = sizeof (len);
433 
434 	out_buf = (char *)malloc((size_t)(lensz + tok->length));
435 	if (out_buf == NULL) {
436 		(void) pthread_mutex_unlock(&sock_lock);
437 		report_err(gettext("Memory allocation failed"));
438 		DPRINT((dfile, "Memory allocation failed: %s\n",
439 		    strerror(errno)));
440 		return (-1);
441 	}
442 	(void) memcpy((void *)out_buf, (void *)&len, lensz);
443 	(void) memcpy((void *)(out_buf + lensz), (void *)tok->value,
444 	    tok->length);
445 
446 	if (send_timeout(fd, out_buf, (lensz + tok->length))) {
447 		(void) pthread_mutex_unlock(&sock_lock);
448 		free(out_buf);
449 		return (-1);
450 	}
451 
452 	(void) pthread_mutex_unlock(&sock_lock);
453 	free(out_buf);
454 	return (0);
455 }
456 
457 
458 /*
459  * recv_token() - receive GSS token over the wire.
460  */
461 static int
462 recv_token(int fd, gss_buffer_t tok)
463 {
464 	uint32_t	len;
465 
466 	if (recv_timeout(fd, (char *)&len, sizeof (len))) {
467 		return (-1);
468 	}
469 	len = ntohl(len);
470 
471 	/* simple DOS prevention mechanism */
472 	if (len > MAX_TOK_LEN) {
473 		report_err(gettext("Indicated invalid token length"));
474 		DPRINT((dfile, "Indicated token length > %dB\n", MAX_TOK_LEN));
475 		return (-1);
476 	}
477 
478 	tok->value = (char *)malloc(len);
479 	if (tok->value == NULL) {
480 		report_err(gettext("Memory allocation failed"));
481 		DPRINT((dfile, "Memory allocation failed: %s\n",
482 		    strerror(errno)));
483 		tok->length = 0;
484 		return (-1);
485 	}
486 
487 	if (recv_timeout(fd, tok->value, len)) {
488 		free(tok->value);
489 		tok->value = NULL;
490 		tok->length = 0;
491 		return (-1);
492 	}
493 
494 	tok->length = len;
495 	return (0);
496 }
497 
498 
499 /*
500  * I/O functions
501  */
502 
503 /*
504  * connect_timeout() - sets nonblocking I/O on a socket and timeout-connects
505  */
506 static int
507 connect_timeout(int sockfd, struct sockaddr *name, int namelen)
508 {
509 	int			flags;
510 	struct pollfd		fds;
511 	int			rc;
512 	struct sockaddr_storage	addr;
513 	socklen_t		addr_len = sizeof (addr);
514 
515 
516 	flags = fcntl(sockfd, F_GETFL, 0);
517 	if (fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) == -1) {
518 		return (-1);
519 	}
520 	if (connect(sockfd, name, namelen)) {
521 		if (!(errno == EINTR || errno == EINPROGRESS ||
522 		    errno == EWOULDBLOCK)) {
523 			return (-1);
524 		}
525 	}
526 	fds.fd = sockfd;
527 	fds.events = POLLOUT;
528 	for (;;) {
529 		fds.revents = 0;
530 		rc = poll(&fds, 1, timeout * 1000);
531 		if (rc == 0) {	/* timeout */
532 			return (-1);
533 		} else if (rc < 0) {
534 			if (errno == EINTR || errno == EAGAIN) {
535 				continue;
536 			} else {
537 				return (-1);
538 			}
539 		}
540 		if (fds.revents) {
541 			if (getpeername(sockfd, (struct sockaddr *)&addr,
542 			    &addr_len))
543 				return (-1);
544 		} else {
545 			return (-1);
546 		}
547 		return (0);
548 	}
549 }
550 
551 /*
552  * send_timeout() - send data (in chunks if needed, each chunk in timeout secs).
553  */
554 static int
555 send_timeout(int fd, const char *buf, size_t len)
556 {
557 	int		bytes;
558 	struct pollfd	fds;
559 	int		rc;
560 
561 	fds.fd = fd;
562 	fds.events = POLLOUT;
563 
564 	while (len) {
565 		fds.revents = 0;
566 		rc = poll(&fds, 1, timeout * 1000);
567 		if (rc == 0) {	/* timeout */
568 			return (-1);
569 		} else if (rc < 0) {
570 			if (errno == EINTR || errno == EAGAIN) {
571 				continue;
572 			} else {
573 				return (-1);
574 			}
575 		}
576 		if (!fds.revents) {
577 			return (-1);
578 		}
579 
580 		bytes = write(fd, buf, len);
581 		if (bytes < 0) {
582 			if (errno == EINTR) {
583 				continue;
584 			} else {
585 				return (-1);
586 			}
587 		} else if (bytes == 0) {	/* eof */
588 			return (-1);
589 		}
590 
591 		len -= bytes;
592 		buf += bytes;
593 	}
594 
595 	return (0);
596 }
597 
598 /*
599  * recv_timeout() - receive data (in chunks if needed, each chunk in timeout
600  * secs). In case the function is called from receiving thread, the function
601  * cycles the poll() call in timeout seconds (waits for input from server).
602  */
603 static int
604 recv_timeout(int fd, char *buf, size_t len)
605 {
606 	int		bytes;
607 	struct pollfd	fds;
608 	int		rc;
609 
610 	fds.fd = fd;
611 	fds.events = POLLIN;
612 
613 	while (len) {
614 		fds.revents = 0;
615 		rc = poll(&fds, 1, timeout * 1000);
616 		if (rc == 0) {			/* timeout */
617 			return (-1);
618 		} else if (rc < 0) {
619 			if (errno == EINTR || errno == EAGAIN) {
620 				continue;
621 			} else {
622 				return (-1);
623 			}
624 		}
625 
626 		if (!fds.revents) {
627 			return (-1);
628 		}
629 
630 		bytes = read(fd, buf, len);
631 		if (bytes < 0) {
632 			if (errno == EINTR) {
633 				continue;
634 			} else {
635 				return (-1);
636 			}
637 		} else if (bytes == 0) {	/* eof */
638 			return (-1);
639 		}
640 
641 		len -= bytes;
642 		buf += bytes;
643 	}
644 
645 	return (0);
646 }
647 
648 /*
649  * read_fd() - reads data of length len from the given file descriptor fd to the
650  * buffer buf, in chunks if needed. Function returns B_FALSE on failure,
651  * otherwise B_TRUE. Function preserves errno, if it was set by the read(2).
652  */
653 static boolean_t
654 read_fd(int fd, char *buf, size_t len)
655 {
656 	int		bytes;
657 #ifdef DEBUG
658 	size_t		len_o = len;
659 #endif
660 
661 	while (len) {
662 		bytes = read(fd, buf, len);
663 		if (bytes < 0) {		/* err */
664 			if (errno == EINTR || errno == EAGAIN) {
665 				continue;
666 			} else {
667 				return (B_FALSE);
668 			}
669 		} else if (bytes == 0) {	/* eof */
670 			return (B_FALSE);
671 		}
672 
673 		len -= bytes;
674 		buf += bytes;
675 	}
676 
677 	DPRINT((dfile, "read_fd: Read %d bytes.\n", len_o - len));
678 	return (B_TRUE);
679 }
680 
681 /*
682  * write_fd() - writes buf of length len to the opened file descriptor fd, in
683  * chunks if needed. The data from the pipe are processed in the receiving
684  * thread. Function returns B_FALSE on failure, otherwise B_TRUE. Function
685  * preserves errno, if it was set by the write(2).
686  */
687 static boolean_t
688 write_fd(int fd, char *buf, size_t len)
689 {
690 	int		bytes;
691 #ifdef DEBUG
692 	size_t		len_o = len;
693 #endif
694 
695 	while (len) {
696 		bytes = write(fd, buf, len);
697 		if (bytes == -1) {		/* err */
698 			if (errno == EINTR || errno == EAGAIN) {
699 				continue;
700 			} else {
701 				return (B_FALSE);
702 			}
703 		}
704 
705 		len -= bytes;
706 		buf += bytes;
707 	}
708 
709 	DPRINT((dfile, "write_fd: Wrote %d bytes.\n", len_o - len));
710 	return (B_TRUE);
711 }
712 
713 /*
714  * Plug-in entry point
715  */
716 
717 /*
718  * send_record() - send an audit record to a host opening a connection,
719  * negotiate version and establish context if necessary.
720  */
721 send_record_rc_t
722 send_record(struct hostlist_s *hostlptr, const char *input, size_t in_len,
723     uint64_t sequence, close_rsn_t *err_rsn)
724 {
725 	gss_buffer_desc		in_buf, out_buf;
726 	OM_uint32		maj_stat, min_stat;
727 	int			conf_state;
728 	int			rc;
729 	transq_node_t		*node_ptr;
730 	uint64_t		seq_n;	/* sequence in the network byte order */
731 	boolean_t		init_sock_poll = B_FALSE;
732 
733 	/*
734 	 * We need to grab the reset_lock here, to prevent eventual
735 	 * unsynchronized cleanup calls within the reset routine (reset caused
736 	 * by the receiving thread) and the initialization calls in the
737 	 * send_record() code path.
738 	 */
739 	(void) pthread_mutex_lock(&reset_lock);
740 
741 	/*
742 	 * Check whether the socket was closed by the recv thread prior to call
743 	 * send_record() and behave accordingly to the reason of the closure.
744 	 */
745 	if (recv_closure_rsn != RSN_UNDEFINED) {
746 		*err_rsn = recv_closure_rsn;
747 		if (recv_closure_rsn == RSN_GSS_CTX_EXP) {
748 			rc = SEND_RECORD_RETRY;
749 		} else {
750 			rc = SEND_RECORD_NEXT;
751 		}
752 		recv_closure_rsn = RSN_UNDEFINED;
753 		(void) pthread_mutex_unlock(&reset_lock);
754 		return (rc);
755 	}
756 
757 	/*
758 	 * Send request to other then previously used host.
759 	 */
760 	if (current_host != hostlptr->host) {
761 		DPRINT((dfile, "Set new host: %s\n", hostlptr->host->h_name));
762 		if (sockfd != -1) {
763 			(void) pthread_mutex_unlock(&reset_lock);
764 			reset_transport(DO_CLOSE, DO_SYNC);
765 			return (SEND_RECORD_RETRY);
766 		}
767 		current_host = (struct hostent *)hostlptr->host;
768 		current_mech_oid = &hostlptr->mech;
769 		current_port = hostlptr->port;
770 	}
771 
772 	/* initiate the receiving thread */
773 	(void) pthread_once(&recv_once_control, init_recv_record);
774 
775 	/* create and connect() socket, negotiate the protocol version */
776 	if (sockfd == -1) {
777 		/* socket operations */
778 		DPRINT((dfile, "Socket creation and connect\n"));
779 		if (!sock_prepare(&sockfd, current_host, err_rsn)) {
780 			/* we believe the err_rsn set by sock_prepare() */
781 			(void) pthread_mutex_unlock(&reset_lock);
782 			return (SEND_RECORD_NEXT);
783 		}
784 
785 		/* protocol version negotiation */
786 		DPRINT((dfile, "Protocol version negotiation\n"));
787 		if (prot_ver_negotiate() != 0) {
788 			DPRINT((dfile,
789 			    "Protocol version negotiation failed\n"));
790 			(void) pthread_mutex_unlock(&reset_lock);
791 			reset_transport(DO_CLOSE, DO_SYNC);
792 			*err_rsn = RSN_PROTOCOL_NEGOTIATE;
793 			return (SEND_RECORD_NEXT);
794 		}
795 
796 		/* let the socket be initiated for poll() */
797 		init_sock_poll = B_TRUE;
798 	}
799 
800 	if (!gss_ctx_initialized) {
801 		DPRINT((dfile, "Establishing context..\n"));
802 		if (establish_context() != 0) {
803 			(void) pthread_mutex_unlock(&reset_lock);
804 			reset_transport(DO_CLOSE, DO_SYNC);
805 			*err_rsn = RSN_GSS_CTX_ESTABLISH;
806 			return (SEND_RECORD_NEXT);
807 		}
808 		gss_ctx_initialized = B_TRUE;
809 	}
810 
811 	/* let the recv thread poll() on the sockfd */
812 	if (init_sock_poll) {
813 		init_sock_poll = B_FALSE;
814 		if (!init_poll(sockfd)) {
815 			*err_rsn = RSN_INIT_POLL;
816 			(void) pthread_mutex_unlock(&reset_lock);
817 			return (SEND_RECORD_RETRY);
818 		}
819 	}
820 
821 	(void) pthread_mutex_unlock(&reset_lock);
822 
823 	/* if not empty, retransmit contents of the transmission queue */
824 	if (flush_transq) {
825 		DPRINT((dfile, "Retransmitting remaining (%ld) tokens from "
826 		    "the transmission queue\n", transq_hdr.count));
827 		if ((rc = transq_retransmit()) == 2) { /* gss context exp */
828 			reset_transport(DO_CLOSE, DO_SYNC);
829 			*err_rsn = RSN_GSS_CTX_EXP;
830 			return (SEND_RECORD_RETRY);
831 		} else if (rc == 1) {
832 			reset_transport(DO_CLOSE, DO_SYNC);
833 			*err_rsn = RSN_OTHER_ERR;
834 			return (SEND_RECORD_NEXT);
835 		}
836 		flush_transq = B_FALSE;
837 	}
838 
839 	/*
840 	 * Concatenate sequence number and the new record. Note, that the
841 	 * pointer to the chunk of memory allocated for the concatenated values
842 	 * is later passed to the transq_enqueu() function which stores the
843 	 * pointer in the transmission queue; subsequently called
844 	 * transq_dequeue() frees the allocated memory once the MIC is verified
845 	 * by the recv_record() function.
846 	 *
847 	 * If we return earlier than the transq_enqueue() is called, it's
848 	 * necessary to free the in_buf.value explicitly prior to return.
849 	 *
850 	 */
851 	in_buf.length = in_len + sizeof (sequence);
852 	in_buf.value = malloc(in_buf.length);
853 	if (in_buf.value == NULL) {
854 			report_err(gettext("Memory allocation failed"));
855 			DPRINT((dfile, "Memory allocation failed: %s\n",
856 			    strerror(errno)));
857 			reset_transport(DO_CLOSE, DO_SYNC);
858 			*err_rsn = RSN_MEMORY_ALLOCATE;
859 			return (SEND_RECORD_FAIL);
860 	}
861 	seq_n = htonll(sequence);
862 	(void) memcpy(in_buf.value, &seq_n, sizeof (seq_n));
863 	(void) memcpy((char *)in_buf.value + sizeof (seq_n), input, in_len);
864 
865 	/* wrap sequence number and the new record to the per-message token */
866 	(void) pthread_mutex_lock(&gss_ctx_lock);
867 	if (gss_ctx != NULL) {
868 		maj_stat = gss_wrap(&min_stat, gss_ctx, 1, GSS_C_QOP_DEFAULT,
869 		    &in_buf, &conf_state, &out_buf);
870 		(void) pthread_mutex_unlock(&gss_ctx_lock);
871 		switch (maj_stat) {
872 		case GSS_S_COMPLETE:
873 			break;
874 		case GSS_S_CONTEXT_EXPIRED:
875 			reset_transport(DO_CLOSE, DO_SYNC);
876 			free(in_buf.value);
877 			*err_rsn = RSN_GSS_CTX_EXP;
878 			return (SEND_RECORD_RETRY);
879 		default:
880 			report_gss_err(gettext("gss_wrap message"), maj_stat,
881 			    min_stat);
882 			reset_transport(DO_CLOSE, DO_SYNC);
883 			free(in_buf.value);
884 			*err_rsn = RSN_OTHER_ERR;
885 			return (SEND_RECORD_NEXT);
886 		}
887 	} else {	/* GSS context deleted by the recv thread */
888 		(void) pthread_mutex_unlock(&gss_ctx_lock);
889 		reset_transport(DO_CLOSE, DO_SYNC);
890 		free(in_buf.value);
891 		*err_rsn = RSN_OTHER_ERR;
892 		return (SEND_RECORD_NEXT);
893 	}
894 
895 
896 	/* enqueue the to-be-sent token into transmission queue */
897 	(void) pthread_mutex_lock(&transq_lock);
898 	if (!transq_enqueue(&node_ptr, &in_buf, sequence)) {
899 		(void) pthread_mutex_unlock(&transq_lock);
900 		reset_transport(DO_CLOSE, DO_SYNC);
901 		free(in_buf.value);
902 		(void) gss_release_buffer(&min_stat, &out_buf);
903 		*err_rsn = RSN_OTHER_ERR;
904 		return (SEND_RECORD_RETRY);
905 	}
906 	DPRINT((dfile, "Token enqueued for later verification\n"));
907 	(void) pthread_mutex_unlock(&transq_lock);
908 
909 	/* send token */
910 	if (send_token(&sockfd, &out_buf) < 0) {
911 		DPRINT((dfile, "Token sending failed\n"));
912 		reset_transport(DO_CLOSE, DO_SYNC);
913 		(void) gss_release_buffer(&min_stat, &out_buf);
914 
915 		(void) pthread_mutex_lock(&transq_lock);
916 		transq_dequeue(node_ptr);
917 		(void) pthread_mutex_unlock(&transq_lock);
918 
919 		*err_rsn = RSN_OTHER_ERR;
920 		return (SEND_RECORD_NEXT);
921 	}
922 	DPRINT((dfile, "Token sent (transq size = %ld)\n", transq_hdr.count));
923 
924 	(void) gss_release_buffer(&min_stat, &out_buf);
925 
926 	return (SEND_RECORD_SUCCESS);
927 }
928 
929 /*
930  * init_recv_record() - initialize the receiver thread
931  */
932 static void
933 init_recv_record()
934 {
935 	DPRINT((dfile, "Initiating the recv thread\n"));
936 	(void) pthread_create(&recv_tid, NULL, (void *(*)(void *))recv_record,
937 	    (void *)NULL);
938 
939 }
940 
941 
942 /*
943  * recv_record() - the receiver thread routine
944  */
945 static void
946 recv_record()
947 {
948 	OM_uint32		maj_stat, min_stat;
949 	gss_qop_t		qop_state;
950 	gss_buffer_desc		in_buf = GSS_C_EMPTY_BUFFER;
951 	gss_buffer_desc		in_buf_mic = GSS_C_EMPTY_BUFFER;
952 	transq_node_t		*cur_node;
953 	uint64_t		r_seq_num;	/* received sequence number */
954 	boolean_t		token_verified;
955 	boolean_t		break_flag;
956 	struct pollfd		fds[2];
957 	int			fds_cnt;
958 	struct pollfd		*pipe_fd = &fds[0];
959 	struct pollfd		*recv_fd = &fds[1];
960 	uint32_t		len;
961 	int			rc;
962 	pipe_msg_t		np_data;
963 
964 	DPRINT((dfile, "Receiver thread initiated\n"));
965 
966 	/*
967 	 * Fill in the information in the vector of file descriptors passed
968 	 * later on to the poll() function. In the initial state, there is only
969 	 * one struct pollfd in the vector which contains file descriptor of the
970 	 * notification pipe - notify_pipe[1]. There might be up to two file
971 	 * descriptors (struct pollfd) in the vector - notify_pipe[1] which
972 	 * resides in the vector during the entire life of the receiving thread,
973 	 * and the own file descriptor from which we read data sent by the
974 	 * remote server application.
975 	 */
976 	pipe_fd->fd = notify_pipe[1];
977 	pipe_fd->events = POLLIN;
978 	recv_fd->fd = -1;
979 	recv_fd->events = POLLIN;
980 	fds_cnt = 1;
981 
982 	/*
983 	 * In the endless loop, try to grab some data from the socket or
984 	 * notify_pipe[1].
985 	 */
986 	for (;;) {
987 
988 		pipe_fd->revents = 0;
989 		recv_fd->revents = 0;
990 		recv_closure_rsn = RSN_UNDEFINED;
991 
992 		/* block on poll, thus rc != 0 */
993 		rc = poll(fds, fds_cnt, -1);
994 		if (rc == -1) {
995 			if (errno == EAGAIN || errno == EINTR) {
996 				/* silently continue on EAGAIN || EINTR */
997 				continue;
998 			} else {
999 				/* log the debug message in any other case */
1000 				DPRINT((dfile, "poll() failed: %s\n",
1001 				    strerror(errno)));
1002 				report_err(gettext("poll() failed.\n"));
1003 				continue;
1004 			}
1005 		}
1006 
1007 		/*
1008 		 * Receive a message from the notification pipe. Information
1009 		 * from the notification pipe takes precedence over the received
1010 		 * data from the remote server application.
1011 		 *
1012 		 * Notification pipe message format - message accepted
1013 		 * from the notify pipe comprises of two parts (int ||
1014 		 * boolean_t), where if the first part (sizeof (int)) equals
1015 		 * NP_CLOSE, then the second part (sizeof (boolean_t)) signals
1016 		 * the necessity of broadcasting (DO_SYNC/DO_NOT_SYNC) the end
1017 		 * of the reset routine.
1018 		 */
1019 		if (pipe_fd->revents & POLLIN) {
1020 			DPRINT((dfile, "An event on notify pipe detected\n"));
1021 			if (!read_fd(pipe_fd->fd, (char *)&np_data,
1022 			    sizeof (np_data))) {
1023 				DPRINT((dfile, "Reading notify pipe failed: "
1024 				    "%s\n", strerror(errno)));
1025 				report_err(gettext("Reading notify pipe "
1026 				    "failed"));
1027 			} else {
1028 				switch (np_data.sock_num) {
1029 				case NP_EXIT:	/* exit receiving thread */
1030 					do_cleanup(&fds_cnt, recv_fd,
1031 					    np_data.sync);
1032 					pthread_exit((void *)NULL);
1033 					break;
1034 				case NP_CLOSE:	/* close and remove recv_fd */
1035 					do_reset(&fds_cnt, recv_fd,
1036 					    np_data.sync);
1037 					continue;
1038 				default:	/* add rc_pipe to the fds */
1039 					recv_fd->fd = np_data.sock_num;
1040 					fds_cnt = 2;
1041 					continue;
1042 				}
1043 			}
1044 		}
1045 		/* Receive a token from the remote server application */
1046 		if (recv_fd->revents & POLLIN) {
1047 			DPRINT((dfile, "An event on fd detected\n"));
1048 			if (!read_fd(recv_fd->fd, (char *)&len, sizeof (len))) {
1049 				DPRINT((dfile, "Token length recv failed\n"));
1050 				recv_closure_rsn = RSN_TOK_RECV_FAILED;
1051 				reset_transport(DO_CLOSE, DO_NOT_SYNC);
1052 				continue;
1053 			}
1054 			len = ntohl(len);
1055 
1056 			/* simple DOS prevention mechanism */
1057 			if (len > MAX_TOK_LEN) {
1058 				report_err(gettext("Indicated invalid token "
1059 				    "length"));
1060 				DPRINT((dfile, "Indicated token length > %dB\n",
1061 				    MAX_TOK_LEN));
1062 				recv_closure_rsn = RSN_TOK_TOO_BIG;
1063 				reset_transport(DO_CLOSE, DO_NOT_SYNC);
1064 				continue;
1065 			}
1066 
1067 			in_buf.value = (char *)malloc(len);
1068 			if (in_buf.value == NULL) {
1069 				report_err(gettext("Memory allocation failed"));
1070 				DPRINT((dfile, "Memory allocation failed: %s\n",
1071 				    strerror(errno)));
1072 				recv_closure_rsn = RSN_MEMORY_ALLOCATE;
1073 				reset_transport(DO_CLOSE, DO_NOT_SYNC);
1074 				continue;
1075 			}
1076 			if (!read_fd(recv_fd->fd, (char *)in_buf.value, len)) {
1077 				DPRINT((dfile, "Token value recv failed\n"));
1078 				free(in_buf.value);
1079 				recv_closure_rsn = RSN_TOK_RECV_FAILED;
1080 				reset_transport(DO_CLOSE, DO_NOT_SYNC);
1081 				continue;
1082 			}
1083 
1084 			in_buf.length = len;
1085 		}
1086 
1087 		/*
1088 		 * Extract the sequence number and the MIC from
1089 		 * the per-message token
1090 		 */
1091 		(void) memcpy(&r_seq_num, in_buf.value, sizeof (r_seq_num));
1092 		r_seq_num = ntohll(r_seq_num);
1093 		in_buf_mic.length = in_buf.length - sizeof (r_seq_num);
1094 		in_buf_mic.value = (char *)in_buf.value + sizeof (r_seq_num);
1095 
1096 		/*
1097 		 * seq_num/r_seq_num - the sequence number does not need to
1098 		 * be unique in the transmission queue. Any token in the
1099 		 * transmission queue with the same seq_num as the acknowledge
1100 		 * token received from the server is tested. This is due to the
1101 		 * fact that the plugin cannot influence (in the current
1102 		 * implementation) sequence numbers generated by the kernel (we
1103 		 * are reusing record sequence numbers as a transmission queue
1104 		 * sequence numbers). The probability of having two or more
1105 		 * tokens in the transmission queue is low and at the same time
1106 		 * the performance gain due to using sequence numbers is quite
1107 		 * high.
1108 		 *
1109 		 * In case a harder condition with regard to duplicate sequence
1110 		 * numbers in the transmission queue will be desired over time,
1111 		 * the break_flag behavior used below should be
1112 		 * removed/changed_accordingly.
1113 		 */
1114 		break_flag = B_FALSE;
1115 		token_verified = B_FALSE;
1116 		(void) pthread_mutex_lock(&transq_lock);
1117 		cur_node = transq_hdr.head;
1118 		while (cur_node != NULL && !break_flag) {
1119 			if (cur_node->seq_num != r_seq_num) {
1120 				cur_node = cur_node->next;
1121 				continue;
1122 			}
1123 
1124 			(void) pthread_mutex_lock(&gss_ctx_lock);
1125 			maj_stat = gss_verify_mic(&min_stat, gss_ctx,
1126 			    &(cur_node->seq_token), &in_buf_mic,
1127 			    &qop_state);
1128 			(void) pthread_mutex_unlock(&gss_ctx_lock);
1129 
1130 			if (!GSS_ERROR(maj_stat)) { /* the success case */
1131 				switch (maj_stat) {
1132 				/*
1133 				 * All the GSS_S_OLD_TOKEN, GSS_S_UNSEQ_TOKEN,
1134 				 * GSS_S_GAP_TOKEN are perceived as correct
1135 				 * behavior of the server side. The plugin
1136 				 * implementation is resistant to any of the
1137 				 * above mention cases of returned status codes.
1138 				 */
1139 				/*FALLTHRU*/
1140 				case GSS_S_OLD_TOKEN:
1141 				case GSS_S_UNSEQ_TOKEN:
1142 				case GSS_S_GAP_TOKEN:
1143 				case GSS_S_COMPLETE:
1144 					/*
1145 					 * remove the verified record/node from
1146 					 * the transmission queue
1147 					 */
1148 					transq_dequeue(cur_node);
1149 					DPRINT((dfile, "Recv thread verified "
1150 					    "the token (transq len = %ld)\n",
1151 					    transq_hdr.count));
1152 
1153 					token_verified = B_TRUE;
1154 					break_flag = B_TRUE;
1155 					break;
1156 
1157 				/*
1158 				 * Both the default case as well as
1159 				 * GSS_S_DUPLICATE_TOKEN case should never
1160 				 * occur. It's been left here for the sake of
1161 				 * completeness.
1162 				 * If any of the two cases occur, it is
1163 				 * subsequently cought because we don't set
1164 				 * the token_verified flag.
1165 				 */
1166 				/*FALLTHRU*/
1167 				case GSS_S_DUPLICATE_TOKEN:
1168 				default:
1169 					break_flag = B_TRUE;
1170 					break;
1171 				} /* switch (maj_stat) */
1172 
1173 			} else { 	/* the failure case */
1174 				report_gss_err(
1175 				    gettext("signature verification of the "
1176 				    "received token failed"),
1177 				    maj_stat, min_stat);
1178 
1179 				switch (maj_stat) {
1180 				case GSS_S_CONTEXT_EXPIRED:
1181 					/* retransmission necessary */
1182 					recv_closure_rsn = RSN_GSS_CTX_EXP;
1183 					break_flag = B_TRUE;
1184 					DPRINT((dfile, "Recv thread detected "
1185 					    "the GSS context expiration\n"));
1186 					break;
1187 				case GSS_S_BAD_SIG:
1188 					DPRINT((dfile, "Bad signature "
1189 					    "detected (seq_num = %lld)\n",
1190 					    cur_node->seq_num));
1191 					cur_node = cur_node->next;
1192 					break;
1193 				default:
1194 					report_gss_err(
1195 					    gettext("signature verification"),
1196 					    maj_stat, min_stat);
1197 					break_flag = B_TRUE;
1198 					break;
1199 				}
1200 			}
1201 
1202 		} /* while */
1203 		(void) pthread_mutex_unlock(&transq_lock);
1204 
1205 		if (in_buf.value != NULL) {
1206 			free(in_buf.value);
1207 			in_buf.value = NULL;
1208 			in_buf.length = 0;
1209 		}
1210 
1211 		if (!token_verified) {
1212 			/*
1213 			 * Received, but unverifiable token is perceived as
1214 			 * the protocol flow corruption with the penalty of
1215 			 * reinitializing the client/server connection.
1216 			 */
1217 			DPRINT((dfile, "received unverifiable token\n"));
1218 			report_err(gettext("received unverifiable token\n"));
1219 			if (recv_closure_rsn == RSN_UNDEFINED) {
1220 				recv_closure_rsn = RSN_TOK_UNVERIFIABLE;
1221 			}
1222 			reset_transport(DO_CLOSE, DO_NOT_SYNC);
1223 		}
1224 
1225 	} /* for (;;) */
1226 
1227 
1228 }
1229 
1230 
1231 /*
1232  * init_poll() - initiates the polling in the receiving thread via sending the
1233  * appropriate message over the notify pipe. Message format = (int ||
1234  * booleant_t), where the first part (sizeof (int)) contains the
1235  * newly_opened/to_be_polled socket file descriptor. The contents of the second
1236  * part (sizeof (boolean_t)) of the message works only as a padding here and no
1237  * action (no recv/send thread synchronisation) is made in the receiving thread
1238  * based on its value.
1239  */
1240 static boolean_t
1241 init_poll(int fd)
1242 {
1243 	pipe_msg_t	np_data;
1244 	int		pipe_in = notify_pipe[0];
1245 
1246 	np_data.sock_num = fd;
1247 	np_data.sync = B_FALSE;	/* padding only */
1248 
1249 	if (!write_fd(pipe_in, (char *)&np_data, sizeof (np_data))) {
1250 		DPRINT((dfile, "Cannot write to the notify pipe\n"));
1251 		report_err(gettext("writing to the notify pipe failed"));
1252 		return (B_FALSE);
1253 	}
1254 
1255 	return (B_TRUE);
1256 }
1257 
1258 
1259 /*
1260  * reset_transport() - locked by the reset_lock initiates the reset of socket,
1261  * GSS security context and (possibly) flags the transq for retransmission; for
1262  * more detailed information see do_reset(). The reset_transport() also allows
1263  * the synchronization - waiting for the reset to be finished.
1264  *
1265  * do_close: DO_SYNC, DO_NOT_SYNC
1266  * sync_on_return: DO_EXIT (DO_NOT_CLOSE), DO_CLOSE (DO_NOT_EXIT)
1267  *
1268  */
1269 void
1270 reset_transport(boolean_t do_close, boolean_t sync_on_return)
1271 {
1272 	int		pipe_in = notify_pipe[0];
1273 	pipe_msg_t	np_data;
1274 
1275 	/*
1276 	 * Check if the reset routine is in progress or whether it was already
1277 	 * executed by some other thread.
1278 	 */
1279 	(void) pthread_mutex_lock(&reset_lock);
1280 	if (reset_in_progress) {
1281 		(void) pthread_mutex_unlock(&reset_lock);
1282 		return;
1283 	}
1284 	reset_in_progress = B_TRUE;
1285 
1286 	np_data.sock_num = (do_close ? NP_CLOSE : NP_EXIT);
1287 	np_data.sync = sync_on_return;
1288 	(void) write_fd(pipe_in, (char *)&np_data, sizeof (np_data));
1289 
1290 	if (sync_on_return) {
1291 		while (reset_in_progress) {
1292 			(void) pthread_cond_wait(&reset_cv, &reset_lock);
1293 			DPRINT((dfile, "Wait for sync\n"));
1294 		}
1295 		DPRINT((dfile, "Synced\n"));
1296 	}
1297 	(void) pthread_mutex_unlock(&reset_lock);
1298 
1299 }
1300 
1301 
1302 /*
1303  * do_reset() - the own reseting routine called from the recv thread. If the
1304  * synchronization was requested, signal the finish via conditional variable.
1305  */
1306 static void
1307 do_reset(int *fds_cnt, struct pollfd *recv_fd, boolean_t do_signal)
1308 {
1309 
1310 	(void) pthread_mutex_lock(&reset_lock);
1311 
1312 	/* socket */
1313 	(void) pthread_mutex_lock(&sock_lock);
1314 	if (sockfd == -1) {
1315 		DPRINT((dfile, "socket already closed\n"));
1316 		(void) pthread_mutex_unlock(&sock_lock);
1317 		goto out;
1318 	} else {
1319 		(void) close(sockfd);
1320 		sockfd = -1;
1321 		recv_fd->fd = -1;
1322 		(void) pthread_mutex_unlock(&sock_lock);
1323 	}
1324 	*fds_cnt = 1;
1325 
1326 	/* context */
1327 	if (gss_ctx_initialized) {
1328 		delete_context();
1329 	}
1330 	gss_ctx_initialized = B_FALSE;
1331 	gss_ctx = NULL;
1332 
1333 	/* mark transq to be flushed */
1334 	(void) pthread_mutex_lock(&transq_lock);
1335 	if (transq_hdr.count > 0) {
1336 		flush_transq = B_TRUE;
1337 	}
1338 	(void) pthread_mutex_unlock(&transq_lock);
1339 
1340 out:
1341 	reset_in_progress = B_FALSE;
1342 	if (do_signal) {
1343 		(void) pthread_cond_broadcast(&reset_cv);
1344 	}
1345 
1346 	(void) pthread_mutex_unlock(&reset_lock);
1347 }
1348 
1349 /*
1350  * do_cleanup() - removes all the preallocated space by the plugin; prepares the
1351  * plugin/application to be gracefully finished. Even thought the function
1352  * allows execution without signalling the successful finish, it's recommended
1353  * to use it (we usually want to wait for cleanup before exiting).
1354  */
1355 static void
1356 do_cleanup(int *fds_cnt, struct pollfd *recv_fd, boolean_t do_signal)
1357 {
1358 
1359 	(void) pthread_mutex_lock(&reset_lock);
1360 
1361 	/*
1362 	 * socket
1363 	 * note: keeping locking for safety, thought it shouldn't be necessary
1364 	 * in current implementation - we get here only in case the sending code
1365 	 * path calls auditd_plugin_close() (thus no socket manipulation) and
1366 	 * the recv thread is doing the own socket closure.
1367 	 */
1368 	(void) pthread_mutex_lock(&sock_lock);
1369 	if (sockfd != -1) {
1370 		DPRINT((dfile, "Closing socket: %d\n", sockfd));
1371 		(void) close(sockfd);
1372 		sockfd = -1;
1373 		recv_fd->fd = -1;
1374 	}
1375 	*fds_cnt = 1;
1376 	(void) pthread_mutex_unlock(&sock_lock);
1377 
1378 	/* context */
1379 	if (gss_ctx_initialized) {
1380 		DPRINT((dfile, "Deleting context: "));
1381 		delete_context();
1382 	}
1383 	gss_ctx_initialized = B_FALSE;
1384 	gss_ctx = NULL;
1385 
1386 	/* transmission queue */
1387 	(void) pthread_mutex_lock(&transq_lock);
1388 	if (transq_hdr.count > 0) {
1389 		DPRINT((dfile, "Deallocating the transmission queue "
1390 		    "(len = %ld)\n", transq_hdr.count));
1391 		while (transq_hdr.count > 0) {
1392 			transq_dequeue(transq_hdr.head);
1393 		}
1394 	}
1395 	(void) pthread_mutex_unlock(&transq_lock);
1396 
1397 	/* notification pipe */
1398 	if (notify_pipe_ready) {
1399 		(void) close(notify_pipe[0]);
1400 		(void) close(notify_pipe[1]);
1401 		notify_pipe_ready = B_FALSE;
1402 	}
1403 
1404 	reset_in_progress = B_FALSE;
1405 	if (do_signal) {
1406 		(void) pthread_cond_broadcast(&reset_cv);
1407 	}
1408 	(void) pthread_mutex_unlock(&reset_lock);
1409 }
1410 
1411 
1412 /*
1413  * transq_dequeue() - dequeues given node pointed by the node_ptr from the
1414  * transmission queue. Transmission queue should be locked prior to use of this
1415  * function.
1416  */
1417 static void
1418 transq_dequeue(transq_node_t *node_ptr)
1419 {
1420 
1421 	if (node_ptr == NULL) {
1422 		DPRINT((dfile, "transq_dequeue(): called with NULL pointer\n"));
1423 		return;
1424 	}
1425 
1426 	free(node_ptr->seq_token.value);
1427 
1428 	if (node_ptr->prev != NULL) {
1429 		node_ptr->prev->next = node_ptr->next;
1430 	}
1431 	if (node_ptr->next != NULL) {
1432 		node_ptr->next->prev = node_ptr->prev;
1433 	}
1434 
1435 
1436 	/* update the transq_hdr */
1437 	if (node_ptr->next == NULL) {
1438 		transq_hdr.end = node_ptr->prev;
1439 	}
1440 	if (node_ptr->prev == NULL) {
1441 		transq_hdr.head = node_ptr->next;
1442 	}
1443 
1444 	transq_hdr.count--;
1445 
1446 	free(node_ptr);
1447 }
1448 
1449 
1450 /*
1451  * transq_enqueue() - creates new node in (at the end of) the transmission
1452  * queue. in_ptoken_ptr is a pointer to the plain token in a form of
1453  * gss_buffer_desc. Function returns 0 on success and updates the *node_ptr to
1454  * point to a newly added transmission queue node. In case of any failure
1455  * function returns 1 and sets the *node_ptr to NULL.
1456  * Transmission queue should be locked prior to use of this function.
1457  */
1458 static boolean_t
1459 transq_enqueue(transq_node_t **node_ptr, gss_buffer_t in_seqtoken_ptr,
1460     uint64_t sequence)
1461 {
1462 
1463 	*node_ptr = calloc(1, sizeof (transq_node_t));
1464 	if (*node_ptr == NULL) {
1465 		report_err(gettext("Memory allocation failed"));
1466 		DPRINT((dfile, "Memory allocation failed: %s\n",
1467 		    strerror(errno)));
1468 		goto errout;
1469 	}
1470 
1471 	/* value of the seq_token.value = (sequence number || plain token) */
1472 	(*node_ptr)->seq_num = sequence;
1473 	(*node_ptr)->seq_token.length = in_seqtoken_ptr->length;
1474 	(*node_ptr)->seq_token.value = in_seqtoken_ptr->value;
1475 
1476 	/* update the transq_hdr */
1477 	if (transq_hdr.head == NULL) {
1478 		transq_hdr.head = *node_ptr;
1479 	}
1480 	if (transq_hdr.end != NULL) {
1481 		(transq_hdr.end)->next = *node_ptr;
1482 		(*node_ptr)->prev = transq_hdr.end;
1483 	}
1484 	transq_hdr.end = *node_ptr;
1485 
1486 	transq_hdr.count++;
1487 
1488 	return (B_TRUE);
1489 
1490 errout:
1491 	if (*node_ptr != NULL) {
1492 		if ((*node_ptr)->seq_token.value != NULL) {
1493 			free((*node_ptr)->seq_token.value);
1494 		}
1495 		free(*node_ptr);
1496 		*node_ptr = NULL;
1497 	}
1498 	return (B_FALSE);
1499 }
1500 
1501 
1502 /*
1503  * transq_retransmit() - traverse the transmission queue and try to, 1 by 1,
1504  * re-wrap the tokens with the recent context information and retransmit the
1505  * tokens from the transmission queue.
1506  * Function returns 2 on GSS context expiration, 1 on any other error, 0 on
1507  * successfully resent transmission queue.
1508  */
1509 static int
1510 transq_retransmit()
1511 {
1512 
1513 	OM_uint32	maj_stat, min_stat;
1514 	transq_node_t	*cur_node = transq_hdr.head;
1515 	gss_buffer_desc	out_buf;
1516 	int		conf_state;
1517 
1518 	DPRINT((dfile, "Retransmission of the remainder in the transqueue\n"));
1519 
1520 	while (cur_node != NULL) {
1521 
1522 		(void) pthread_mutex_lock(&transq_lock);
1523 		(void) pthread_mutex_lock(&gss_ctx_lock);
1524 		maj_stat = gss_wrap(&min_stat, gss_ctx, 1, GSS_C_QOP_DEFAULT,
1525 		    &(cur_node->seq_token), &conf_state, &out_buf);
1526 		(void) pthread_mutex_unlock(&gss_ctx_lock);
1527 
1528 		switch (maj_stat) {
1529 		case GSS_S_COMPLETE:
1530 			break;
1531 		case GSS_S_CONTEXT_EXPIRED:
1532 			DPRINT((dfile, "Context expired.\n"));
1533 			report_gss_err(gettext("gss_wrap message"), maj_stat,
1534 			    min_stat);
1535 			(void) pthread_mutex_unlock(&transq_lock);
1536 			return (2);
1537 		default:
1538 			report_gss_err(gettext("gss_wrap message"), maj_stat,
1539 			    min_stat);
1540 			(void) pthread_mutex_unlock(&transq_lock);
1541 			return (1);
1542 		}
1543 
1544 		DPRINT((dfile, "Sending transmission queue token (seq=%lld, "
1545 		    "size=%d, transq len=%ld)\n", cur_node->seq_num,
1546 		    out_buf.length, transq_hdr.count));
1547 		if (send_token(&sockfd, &out_buf) < 0) {
1548 			(void) gss_release_buffer(&min_stat, &out_buf);
1549 			(void) pthread_mutex_unlock(&transq_lock);
1550 			return (1);
1551 		}
1552 		(void) gss_release_buffer(&min_stat, &out_buf);
1553 
1554 		cur_node = cur_node->next;
1555 		(void) pthread_mutex_unlock(&transq_lock);
1556 
1557 	} /* while */
1558 
1559 	return (0);
1560 }
1561