xref: /titanic_44/usr/src/cmd/fm/modules/SUNW,SPARC-Enterprise/event-transport/ex_dscp.c (revision 677fd05c3b05c78948501f6ffdced37dab9368fe)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 /*
28  * FMA Event Transport Module Transport Layer API implementation.
29  *
30  * Library for establishing connections and transporting FMA events between
31  * ETMs (event-transport modules) in separate fault domains.
32  *
33  * The transport for this library is internet socket based and uses the DSCP
34  * client services library (libdscp).
35  */
36 
37 #include "ex_dscp.h"
38 
39 /*
40  * On the SP, there is one DSCP interface for every domain.
41  * Each domain has one and only one DSCP interface to the SP.
42  *
43  * The DSCP interface is created when the domain powers-on.  On the SP,
44  * a sysevent will be generated when the DSCP interface is up.  On the domain,
45  * the DSCP interface should be up when ETM loads.
46  */
47 
48 exs_conn_t Acc;				/* Connection for accepting/listening */
49 pthread_t Acc_tid;			/* Thread ID for accepting conns */
50 int Acc_quit;				/* Signal to quit the acceptor thread */
51 int Acc_destroy;			/* Destroy accept/listen thread? */
52 exs_hdl_t *Exh_head = NULL;		/* Head of ex_hdl_t list */
53 pthread_mutex_t	List_lock = PTHREAD_MUTEX_INITIALIZER;
54 					/* Protects linked list of ex_hdl_t */
55 static void *Dlp = NULL;		/* Handle for dlopen/dlclose/dlsym */
56 static int (*Send_filter)(fmd_hdl_t *hdl, nvlist_t *event, const char *dest);
57 static int (*Post_filter)(fmd_hdl_t *hdl, nvlist_t *event, const char *src);
58 
59 /*
60  * * * * * * * * * * * * * *
61  * Module specific routines
62  * * * * * * * * * * * * * *
63  */
64 
65 /*
66  * Allocate and initialize a transport instance handle.
67  * Return hdl pointer for success, NULL for failure.
68  */
69 static exs_hdl_t *
exs_hdl_alloc(fmd_hdl_t * hdl,char * endpoint_id,int (* cb_func)(fmd_hdl_t * hdl,etm_xport_conn_t conn,etm_cb_flag_t flag,void * arg),void * cb_func_arg,int dom)70 exs_hdl_alloc(fmd_hdl_t *hdl, char *endpoint_id,
71     int (*cb_func)(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag,
72     void *arg), void *cb_func_arg, int dom)
73 {
74 	exs_hdl_t *hp;
75 
76 	hp = fmd_hdl_zalloc(hdl, sizeof (exs_hdl_t), FMD_SLEEP);
77 
78 	hp->h_endpt_id = fmd_hdl_strdup(hdl, endpoint_id, FMD_SLEEP);
79 	hp->h_dom = dom;
80 	hp->h_client.c_sd = EXS_SD_FREE;
81 	hp->h_server.c_sd = EXS_SD_FREE;
82 	hp->h_tid = EXS_TID_FREE;
83 	hp->h_destroy = 0;
84 	hp->h_hdl = hdl;
85 	hp->h_cb_func = cb_func;
86 	hp->h_cb_func_arg = cb_func_arg;
87 	hp->h_quit = 0;
88 
89 	return (hp);
90 }
91 
92 /*
93  * dlopen() the platform filter library and dlsym() the filter funcs.
94  */
95 static void
exs_filter_init(fmd_hdl_t * hdl)96 exs_filter_init(fmd_hdl_t *hdl)
97 {
98 	char *propstr = fmd_prop_get_string(hdl, "filter_path");
99 
100 	if (propstr == NULL) {
101 		fmd_hdl_debug(hdl, "No filter plugin specified");
102 		Send_filter = NULL;
103 		Post_filter = NULL;
104 		return;
105 	} else {
106 		if ((Dlp = dlopen(propstr, RTLD_LOCAL | RTLD_NOW)) == NULL) {
107 			fmd_hdl_debug(hdl, "Failed to dlopen filter plugin");
108 			Send_filter = NULL;
109 			Post_filter = NULL;
110 			fmd_prop_free_string(hdl, propstr);
111 			return;
112 		}
113 
114 		if ((Send_filter = (int (*)())dlsym(Dlp, "send_filter"))
115 		    == NULL) {
116 			fmd_hdl_debug(hdl, "failed to dlsym send_filter()");
117 			Send_filter = NULL;
118 		}
119 
120 		if ((Post_filter = (int (*)())dlsym(Dlp, "post_filter"))
121 		    == NULL) {
122 			fmd_hdl_debug(hdl, "failed to dlsym post_filter()");
123 			Post_filter = NULL;
124 		}
125 	}
126 
127 	fmd_prop_free_string(hdl, propstr);
128 }
129 
130 /*
131  * If open, dlclose() the platform filter library.
132  */
133 /*ARGSUSED*/
134 static void
exs_filter_fini(fmd_hdl_t * hdl)135 exs_filter_fini(fmd_hdl_t *hdl)
136 {
137 	if (Dlp != NULL)
138 		(void) dlclose(Dlp);
139 }
140 
141 /*
142  * Translate endpoint_id string to int.
143  * Return the domain ID via "dom_id".
144  * Return 0 for success, nonzero for failure
145  */
146 static int
exs_get_id(fmd_hdl_t * hdl,char * endpoint_id,int * dom_id)147 exs_get_id(fmd_hdl_t *hdl, char *endpoint_id, int *dom_id)
148 {
149 	char *ptr;
150 
151 	if (strstr(endpoint_id, EXS_SP_PREFIX) != NULL) {
152 		/* Remote endpoint is the SP */
153 		*dom_id = DSCP_IDENT_SP;
154 		return (0);
155 	} else {
156 		if ((ptr = strstr(endpoint_id, EXS_DOMAIN_PREFIX)) == NULL) {
157 			fmd_hdl_error(hdl, "Property parsing error : %s not "
158 			    "found in %s. Check event-transport.conf\n",
159 			    EXS_DOMAIN_PREFIX, endpoint_id);
160 			return (1);
161 		}
162 
163 		ptr += EXS_DOMAIN_PREFIX_LEN;
164 
165 		if ((sscanf(ptr, "%d", dom_id)) != 1) {
166 			fmd_hdl_error(hdl, "Property parsing error : no "
167 			    "integer found in %s. Check event-transport.conf\n",
168 			    endpoint_id);
169 			return (2);
170 		}
171 	}
172 
173 	return (0);
174 }
175 
176 /*
177  * Prepare the client connection.
178  * Return 0 for success, nonzero for failure.
179  */
180 static int
exs_prep_client(exs_hdl_t * hp)181 exs_prep_client(exs_hdl_t *hp)
182 {
183 	int rv, optval = 1;
184 	struct linger ling;
185 
186 	/* Find the DSCP address for the remote endpoint */
187 	if ((rv = dscpAddr(hp->h_dom, DSCP_ADDR_REMOTE,
188 	    (struct sockaddr *)&hp->h_client.c_saddr,
189 	    &hp->h_client.c_len)) != DSCP_OK) {
190 		fmd_hdl_debug(hp->h_hdl, "dscpAddr on the client socket "
191 		    "failed for %s : rv = %d\n", hp->h_endpt_id, rv);
192 		return (1);
193 	}
194 
195 	if ((hp->h_client.c_sd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
196 		fmd_hdl_error(hp->h_hdl, "Failed to create the client socket "
197 		    "for %s",  hp->h_endpt_id);
198 		return (2);
199 	}
200 
201 	if (setsockopt(hp->h_client.c_sd, SOL_SOCKET, SO_REUSEADDR,
202 	    &optval, sizeof (optval))) {
203 		fmd_hdl_error(hp->h_hdl, "Failed to set REUSEADDR on the "
204 		    "client socket for %s", hp->h_endpt_id);
205 		EXS_CLOSE_CLR(hp->h_client);
206 		return (3);
207 	}
208 
209 	/*
210 	 * Set SO_LINGER so TCP aborts the connection when closed.
211 	 * If the domain's client socket goes into the TIME_WAIT state,
212 	 * ETM will be unable to connect to the SP until this clears.
213 	 * This connection is over DSCP, which is a simple point-to-point
214 	 * connection and therefore has no routers or multiple forwarding.
215 	 * The risk of receiving old packets from a previously terminated
216 	 * connection is very small.
217 	 */
218 	ling.l_onoff = 1;
219 	ling.l_linger = 0;
220 	if (setsockopt(hp->h_client.c_sd, SOL_SOCKET, SO_LINGER, &ling,
221 	    sizeof (ling))) {
222 		fmd_hdl_error(hp->h_hdl, "Failed to set SO_LINGER on the "
223 		    "client socket for %s", hp->h_endpt_id);
224 		EXS_CLOSE_CLR(hp->h_client);
225 		return (4);
226 	}
227 
228 	/* Bind the socket to the local IP address of the DSCP link */
229 	if ((rv = dscpBind(hp->h_dom, hp->h_client.c_sd,
230 	    EXS_CLIENT_PORT)) != DSCP_OK) {
231 		if (rv == DSCP_ERROR_DOWN) {
232 			fmd_hdl_debug(hp->h_hdl, "xport - dscp link for %s "
233 			    "is down", hp->h_endpt_id);
234 		} else {
235 			fmd_hdl_debug(hp->h_hdl, "dscpBind on the client "
236 			    "socket failed : rv = %d\n", rv);
237 		}
238 		EXS_CLOSE_CLR(hp->h_client);
239 		return (5);
240 	}
241 
242 	hp->h_client.c_saddr.sin_port = htons(EXS_SERVER_PORT);
243 
244 	/* Set IPsec security policy for this socket */
245 	if ((rv = dscpSecure(hp->h_dom, hp->h_client.c_sd)) != DSCP_OK) {
246 		fmd_hdl_error(hp->h_hdl, "dscpSecure on the client socket "
247 		    "failed for %s : rv = %d\n", hp->h_endpt_id, rv);
248 		EXS_CLOSE_CLR(hp->h_client);
249 		return (6);
250 	}
251 
252 	return (0);
253 }
254 
255 /*
256  * Server function/thread.  There is one thread per endpoint.
257  * Accepts incoming connections and notifies ETM of incoming data.
258  */
259 void
exs_server(void * arg)260 exs_server(void *arg)
261 {
262 	exs_hdl_t *hp = (exs_hdl_t *)arg;
263 	struct pollfd pfd;
264 
265 	while (!hp->h_quit) {
266 		pfd.events = POLLIN;
267 		pfd.revents = 0;
268 		pfd.fd = hp->h_server.c_sd;
269 
270 		if (poll(&pfd, 1, -1) <= 0)
271 			continue; /* loop around and check h_quit */
272 
273 		if (pfd.revents & (POLLHUP | POLLERR)) {
274 			fmd_hdl_debug(hp->h_hdl, "xport - poll hangup/err for "
275 			    "%s server socket", hp->h_endpt_id);
276 			EXS_CLOSE_CLR(hp->h_server);
277 			hp->h_destroy++;
278 			break;	/* thread exits */
279 		}
280 
281 		if (pfd.revents & POLLIN) {
282 			/* Notify ETM that incoming data is available */
283 			if (hp->h_cb_func(hp->h_hdl, &hp->h_server,
284 			    ETM_CBFLAG_RECV, hp->h_cb_func_arg)) {
285 				/*
286 				 * For any non-zero return, close the
287 				 * connection and exit the thread.
288 				 */
289 				EXS_CLOSE_CLR(hp->h_server);
290 				hp->h_destroy++;
291 				break;	/* thread exits */
292 			}
293 		}
294 	}
295 
296 	fmd_hdl_debug(hp->h_hdl, "xport - exiting server thread for %s",
297 	    hp->h_endpt_id);
298 }
299 
300 /*
301  * Accept a new incoming connection.
302  */
303 static void
exs_accept(fmd_hdl_t * hdl)304 exs_accept(fmd_hdl_t *hdl)
305 {
306 	int new_sd, dom, flags, rv;
307 	struct sockaddr_in new_saddr;
308 	socklen_t new_len = sizeof (struct sockaddr);
309 	exs_hdl_t *hp;
310 
311 	if ((new_sd = accept(Acc.c_sd, (struct sockaddr *)&new_saddr,
312 	    &new_len)) != -1) {
313 		/* Translate saddr to domain id */
314 		if ((rv = dscpIdent((struct sockaddr *)&new_saddr, (int)new_len,
315 		    &dom)) != DSCP_OK) {
316 			fmd_hdl_error(hdl, "dscpIdent failed : rv = %d\n", rv);
317 			(void) close(new_sd);
318 			return;
319 		}
320 
321 		/* Find the exs_hdl_t for the domain trying to connect */
322 		(void) pthread_mutex_lock(&List_lock);
323 		for (hp = Exh_head; hp; hp = hp->h_next) {
324 			if (hp->h_dom == dom)
325 				break;
326 		}
327 		(void) pthread_mutex_unlock(&List_lock);
328 
329 		if (hp == NULL) {
330 			fmd_hdl_error(hdl, "Not configured to accept a "
331 			    "connection from domain %d. Check "
332 			    "event-transport.conf\n", dom);
333 			(void) close(new_sd);
334 			return;
335 		}
336 
337 		/* Authenticate this connection request */
338 		if ((rv = dscpAuth(dom, (struct sockaddr *)&new_saddr,
339 		    (int)new_len)) != DSCP_OK) {
340 			fmd_hdl_error(hdl, "dscpAuth failed for %s : rv = %d ",
341 			    " Possible spoofing attack\n", hp->h_endpt_id, rv);
342 			(void) close(new_sd);
343 			return;
344 		}
345 
346 		if (hp->h_tid != EXS_TID_FREE) {
347 			hp->h_quit = 1;
348 			fmd_thr_signal(hp->h_hdl, hp->h_tid);
349 			fmd_thr_destroy(hp->h_hdl, hp->h_tid);
350 			hp->h_destroy = 0;
351 			hp->h_quit = 0;
352 		}
353 
354 		if (hp->h_server.c_sd != EXS_SD_FREE)
355 			EXS_CLOSE_CLR(hp->h_server);
356 
357 		/* Set the socket to be non-blocking */
358 		flags = fcntl(new_sd, F_GETFL, 0);
359 		(void) fcntl(new_sd, F_SETFL, flags | O_NONBLOCK);
360 
361 		hp->h_server.c_sd = new_sd;
362 
363 		hp->h_tid = fmd_thr_create(hdl, exs_server, hp);
364 
365 	} else {
366 		fmd_hdl_error(hdl, "Failed to accept() a new connection");
367 	}
368 }
369 
370 /*
371  * Listen for and accept incoming connections.
372  * There is only one such thread.
373  */
374 void
exs_listen(void * arg)375 exs_listen(void *arg)
376 {
377 	fmd_hdl_t *hdl = (fmd_hdl_t *)arg;
378 	struct pollfd pfd;
379 
380 	while (!Acc_quit) {
381 		pfd.events = POLLIN;
382 		pfd.revents = 0;
383 		pfd.fd = Acc.c_sd;
384 
385 		if (poll(&pfd, 1, -1) <= 0)
386 			continue; /* loop around and check Acc_quit */
387 
388 		if (pfd.revents & (POLLHUP | POLLERR)) {
389 			fmd_hdl_debug(hdl, "xport - poll hangup/err on "
390 			    "accept socket");
391 			EXS_CLOSE_CLR(Acc);
392 			Acc_destroy++;
393 			break;	/* thread exits */
394 		}
395 
396 		if (pfd.revents & POLLIN)
397 			exs_accept(hdl);
398 	}
399 
400 	fmd_hdl_debug(hdl, "xport - exiting accept-listen thread");
401 }
402 
403 /*
404  * Prepare to accept a connection.
405  * Return 0 for success, nonzero for failure.
406  */
407 void
exs_prep_accept(fmd_hdl_t * hdl,int dom)408 exs_prep_accept(fmd_hdl_t *hdl, int dom)
409 {
410 	int flags, optval = 1;
411 	int rv;
412 
413 	if (Acc.c_sd != EXS_SD_FREE)
414 		return;	/* nothing to do */
415 
416 	if (Acc_destroy) {
417 		fmd_thr_destroy(hdl, Acc_tid);
418 		Acc_tid = EXS_TID_FREE;
419 	}
420 
421 	/* Check to see if the DSCP interface is configured */
422 	if ((rv = dscpAddr(dom, DSCP_ADDR_LOCAL,
423 	    (struct sockaddr *)&Acc.c_saddr, &Acc.c_len)) != DSCP_OK) {
424 		fmd_hdl_debug(hdl, "xport - dscpAddr on the accept socket "
425 		    "failed for domain %d : rv = %d", dom, rv);
426 		return;
427 	}
428 
429 	if ((Acc.c_sd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
430 		fmd_hdl_error(hdl, "Failed to create the accept socket");
431 		return;
432 	}
433 
434 	if (setsockopt(Acc.c_sd, SOL_SOCKET, SO_REUSEADDR, &optval,
435 	    sizeof (optval))) {
436 		fmd_hdl_error(hdl, "Failed to set REUSEADDR for the accept "
437 		    "socket");
438 		EXS_CLOSE_CLR(Acc);
439 		return;
440 	}
441 
442 	/* Bind the socket to the local IP address of the DSCP link */
443 	if ((rv = dscpBind(dom, Acc.c_sd, EXS_SERVER_PORT)) != DSCP_OK) {
444 		if (rv == DSCP_ERROR_DOWN) {
445 			fmd_hdl_debug(hdl, "xport - dscp link for domain %d "
446 			    "is down", dom);
447 		} else {
448 			fmd_hdl_debug(hdl, "dscpBind on the accept socket "
449 			    "failed : rv = %d\n", rv);
450 		}
451 		EXS_CLOSE_CLR(Acc);
452 		return;
453 	}
454 
455 	/* Activate IPsec security policy for this socket */
456 	if ((rv = dscpSecure(dom, Acc.c_sd)) != DSCP_OK) {
457 		fmd_hdl_error(hdl, "dscpSecure on the accept socket failed : "
458 		    "rv = %d\n", dom, rv);
459 		EXS_CLOSE_CLR(Acc);
460 		return;
461 	}
462 
463 	if ((listen(Acc.c_sd, EXS_NUM_SOCKS)) == -1) {
464 		fmd_hdl_debug(hdl, "Failed to listen() for connections");
465 		EXS_CLOSE_CLR(Acc);
466 		return;
467 	}
468 
469 	flags = fcntl(Acc.c_sd, F_GETFL, 0);
470 	(void) fcntl(Acc.c_sd, F_SETFL, flags | O_NONBLOCK);
471 
472 	Acc_tid = fmd_thr_create(hdl, exs_listen, hdl);
473 }
474 
475 /*
476  * * * * * * * * * * * * * * * * * * * * * * * * * * *
477  * ETM-to-Transport API Connection Management routines
478  * * * * * * * * * * * * * * * * * * * * * * * * * * *
479  */
480 
481 /*
482  * Initialize and setup any transport infrastructure before any connections
483  * are opened.
484  * Return etm_xport_hdl_t for success, NULL for failure.
485  */
486 etm_xport_hdl_t
etm_xport_init(fmd_hdl_t * hdl,char * endpoint_id,int (* cb_func)(fmd_hdl_t * hdl,etm_xport_conn_t conn,etm_cb_flag_t flag,void * arg),void * cb_func_arg)487 etm_xport_init(fmd_hdl_t *hdl, char *endpoint_id,
488     int (*cb_func)(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag,
489     void *arg), void *cb_func_arg)
490 {
491 	exs_hdl_t *hp, *curr;
492 	int dom;
493 
494 	if (exs_get_id(hdl, endpoint_id, &dom))
495 		return (NULL);
496 
497 	(void) pthread_mutex_lock(&List_lock);
498 
499 	/* Check for a duplicate endpoint_id on the list */
500 	for (curr = Exh_head; curr; curr = curr->h_next) {
501 		if (dom == curr->h_dom) {
502 			fmd_hdl_debug(hdl, "xport - init failed, "
503 			    "duplicate domain id : %d\n", dom);
504 			(void) pthread_mutex_unlock(&List_lock);
505 			return (NULL);
506 		}
507 	}
508 
509 	if (Exh_head == NULL) {
510 		/* Do one-time initializations */
511 		exs_filter_init(hdl);
512 
513 		/* Initialize the accept/listen vars */
514 		Acc.c_sd = EXS_SD_FREE;
515 		Acc_tid = EXS_TID_FREE;
516 		Acc_destroy = 0;
517 		Acc_quit = 0;
518 	}
519 
520 	hp = exs_hdl_alloc(hdl, endpoint_id, cb_func, cb_func_arg, dom);
521 
522 	/* Add this transport instance handle to the list */
523 	hp->h_next = Exh_head;
524 	Exh_head = hp;
525 
526 	(void) pthread_mutex_unlock(&List_lock);
527 
528 	exs_prep_accept(hdl, dom);
529 
530 	return ((etm_xport_hdl_t)hp);
531 }
532 
533 /*
534  * Teardown any transport infrastructure after all connections are closed.
535  * Return 0 for success, or nonzero for failure.
536  */
537 int
etm_xport_fini(fmd_hdl_t * hdl,etm_xport_hdl_t tlhdl)538 etm_xport_fini(fmd_hdl_t *hdl, etm_xport_hdl_t tlhdl)
539 {
540 	exs_hdl_t *hp = (exs_hdl_t *)tlhdl;
541 	exs_hdl_t *xp, **ppx;
542 
543 	(void) pthread_mutex_lock(&List_lock);
544 
545 	ppx = &Exh_head;
546 
547 	for (xp = *ppx; xp; xp = xp->h_next) {
548 		if (xp != hp)
549 			ppx = &xp->h_next;
550 		else
551 			break;
552 	}
553 
554 	if (xp != hp) {
555 		(void) pthread_mutex_unlock(&List_lock);
556 		fmd_hdl_abort(hdl, "xport - fini failed, tlhdl %p not on list",
557 		    (void *)hp);
558 	}
559 
560 	*ppx = hp->h_next;
561 	hp->h_next = NULL;
562 
563 	if (hp->h_tid != EXS_TID_FREE) {
564 		hp->h_quit = 1;
565 		fmd_thr_signal(hdl, hp->h_tid);
566 		fmd_thr_destroy(hdl, hp->h_tid);
567 	}
568 
569 	if (hp->h_server.c_sd != EXS_SD_FREE)
570 		(void) close(hp->h_server.c_sd);
571 
572 	if (hp->h_client.c_sd != EXS_SD_FREE)
573 		(void) close(hp->h_client.c_sd);
574 
575 	fmd_hdl_strfree(hdl, hp->h_endpt_id);
576 	fmd_hdl_free(hdl, hp, sizeof (exs_hdl_t));
577 
578 	if (Exh_head == NULL) {
579 		/* Undo one-time initializations */
580 		exs_filter_fini(hdl);
581 
582 		/* Destroy the accept/listen thread */
583 		if (Acc_tid != EXS_TID_FREE) {
584 			Acc_quit = 1;
585 			fmd_thr_signal(hdl, Acc_tid);
586 			fmd_thr_destroy(hdl, Acc_tid);
587 		}
588 
589 		if (Acc.c_sd != EXS_SD_FREE)
590 			EXS_CLOSE_CLR(Acc);
591 	}
592 
593 	(void) pthread_mutex_unlock(&List_lock);
594 
595 	return (0);
596 }
597 
598 /*
599  * Open a connection with the given endpoint,
600  * Return etm_xport_conn_t for success, NULL and set errno for failure.
601  */
602 etm_xport_conn_t
etm_xport_open(fmd_hdl_t * hdl,etm_xport_hdl_t tlhdl)603 etm_xport_open(fmd_hdl_t *hdl, etm_xport_hdl_t tlhdl)
604 {
605 	int flags;
606 	exs_hdl_t *hp = (exs_hdl_t *)tlhdl;
607 
608 	if (hp->h_destroy) {
609 		fmd_thr_destroy(hp->h_hdl, hp->h_tid);
610 		hp->h_tid = EXS_TID_FREE;
611 		hp->h_destroy = 0;
612 	}
613 
614 	if (hp->h_client.c_sd == EXS_SD_FREE) {
615 		if (exs_prep_client(hp) != 0)
616 			return (NULL);
617 	}
618 
619 	/* Set the socket to be non-blocking */
620 	flags = fcntl(hp->h_client.c_sd, F_GETFL, 0);
621 	(void) fcntl(hp->h_client.c_sd, F_SETFL, flags | O_NONBLOCK);
622 
623 	if ((connect(hp->h_client.c_sd,
624 	    (struct sockaddr *)&hp->h_client.c_saddr,
625 	    hp->h_client.c_len)) == -1) {
626 		if (errno != EINPROGRESS) {
627 			fmd_hdl_debug(hdl, "xport - failed to connect to %s",
628 			    hp->h_endpt_id);
629 			EXS_CLOSE_CLR(hp->h_client);
630 			return (NULL);
631 		}
632 	}
633 
634 	fmd_hdl_debug(hdl, "xport - connected client socket for %s",
635 	    hp->h_endpt_id);
636 
637 	return (&hp->h_client);
638 }
639 
640 /*
641  * Close a connection from either endpoint.
642  * Return zero for success, nonzero for failure.
643  */
644 /*ARGSUSED*/
645 int
etm_xport_close(fmd_hdl_t * hdl,etm_xport_conn_t conn)646 etm_xport_close(fmd_hdl_t *hdl, etm_xport_conn_t conn)
647 {
648 	exs_conn_t *cp = (exs_conn_t *)conn;
649 
650 	if (cp->c_sd == EXS_SD_FREE)
651 		return (0);	/* Connection already closed */
652 
653 	(void) close(cp->c_sd);
654 	cp->c_sd = EXS_SD_FREE;
655 
656 	return (0);
657 }
658 
659 /*
660  * * * * * * * * * * * * * * * * * *
661  * ETM-to-Transport API I/O routines
662  * * * * * * * * * * * * * * * * * *
663  */
664 
665 /*
666  * Try to read byte_cnt bytes from the connection into the given buffer.
667  * Return how many bytes actually read for success, negative value for failure.
668  */
669 ssize_t
etm_xport_read(fmd_hdl_t * hdl,etm_xport_conn_t conn,hrtime_t timeout,void * buf,size_t byte_cnt)670 etm_xport_read(fmd_hdl_t *hdl, etm_xport_conn_t conn, hrtime_t timeout,
671     void *buf, size_t byte_cnt)
672 {
673 	ssize_t len, nbytes = 0;
674 	hrtime_t endtime, sleeptime;
675 	struct timespec tms;
676 	char *ptr = (char *)buf;
677 	exs_conn_t *cp = (exs_conn_t *)conn;
678 
679 	if (cp->c_sd == EXS_SD_FREE) {
680 		fmd_hdl_debug(hdl, "xport - read socket %d is closed\n",
681 		    cp->c_sd);
682 		return (-EBADF);
683 	}
684 
685 	endtime = gethrtime() + timeout;
686 	sleeptime = timeout / EXS_IO_SLEEP_DIV;
687 
688 	tms.tv_sec = 0;
689 	tms.tv_nsec = sleeptime;
690 
691 	while (nbytes < byte_cnt) {
692 		if (gethrtime() < endtime) {
693 			if ((len = recv(cp->c_sd, ptr, byte_cnt - nbytes,
694 			    0)) < 0) {
695 				if (errno != EINTR && errno != EWOULDBLOCK) {
696 					fmd_hdl_debug(hdl, "xport - recv "
697 					    "failed for socket %d", cp->c_sd);
698 				}
699 
700 				(void) nanosleep(&tms, 0);
701 				continue;
702 			} else if (len == 0) {
703 				fmd_hdl_debug(hdl, "xport - remote endpt "
704 				    "closed for socket %d", cp->c_sd);
705 				return (0);
706 			}
707 
708 			ptr += len;
709 			nbytes += len;
710 		} else {
711 			fmd_hdl_debug(hdl, "xport - read timed out for socket "
712 			    "%d", cp->c_sd);
713 			break;
714 		}
715 	}
716 
717 	if (nbytes)
718 		return (nbytes);
719 	else
720 		return (-1);
721 }
722 
723 /*
724  * Try to write byte_cnt bytes to the connection from the given buffer.
725  * Return how many bytes actually written for success, negative value
726  * for failure.
727  */
728 ssize_t
etm_xport_write(fmd_hdl_t * hdl,etm_xport_conn_t conn,hrtime_t timeout,void * buf,size_t byte_cnt)729 etm_xport_write(fmd_hdl_t *hdl, etm_xport_conn_t conn, hrtime_t timeout,
730     void *buf, size_t byte_cnt)
731 {
732 	ssize_t len, nbytes = 0;
733 	hrtime_t endtime, sleeptime;
734 	struct timespec tms;
735 	char *ptr = (char *)buf;
736 	exs_conn_t *cp = (exs_conn_t *)conn;
737 
738 	if (cp->c_sd == EXS_SD_FREE) {
739 		fmd_hdl_debug(hdl, "xport - write socket %d is closed\n",
740 		    cp->c_sd);
741 		return (-EBADF);
742 	}
743 
744 	endtime = gethrtime() + timeout;
745 	sleeptime = timeout / EXS_IO_SLEEP_DIV;
746 
747 	tms.tv_sec = 0;
748 	tms.tv_nsec = sleeptime;
749 
750 	while (nbytes < byte_cnt) {
751 		if (gethrtime() < endtime) {
752 			if ((len = send(cp->c_sd, ptr, byte_cnt - nbytes,
753 			    0)) < 0) {
754 				if (errno != EINTR && errno != EWOULDBLOCK) {
755 					fmd_hdl_debug(hdl, "xport - send "
756 					    "failed for socket %d", cp->c_sd);
757 				}
758 
759 				(void) nanosleep(&tms, 0);
760 				continue;
761 			}
762 
763 			ptr += len;
764 			nbytes += len;
765 		} else {
766 			fmd_hdl_debug(hdl, "xport - write timed out for socket "
767 			    "%d", cp->c_sd);
768 			break;
769 		}
770 	}
771 
772 	if (nbytes)
773 		return (nbytes);
774 	else
775 		return (-1);
776 }
777 
778 /*
779  * * * * * * * * * * * * * * * * * * * *
780  * ETM-to-Transport API Filter routines
781  * * * * * * * * * * * * * * * * * * * *
782  */
783 
784 /*
785  * Call the platform's send_filter function.
786  * Otherwise return ETM_XPORT_FILTER_OK.
787  */
788 int
etm_xport_send_filter(fmd_hdl_t * hdl,nvlist_t * event,const char * dest)789 etm_xport_send_filter(fmd_hdl_t *hdl, nvlist_t *event, const char *dest)
790 {
791 	if (Send_filter != NULL)
792 		return (Send_filter(hdl, event, dest));
793 	else
794 		return (ETM_XPORT_FILTER_OK);
795 }
796 
797 /*
798  * Call the platform's post_filter function.
799  * Otherwise return ETM_XPORT_FILTER_OK.
800  */
801 int
etm_xport_post_filter(fmd_hdl_t * hdl,nvlist_t * event,const char * src)802 etm_xport_post_filter(fmd_hdl_t *hdl, nvlist_t *event, const char *src)
803 {
804 	if (Post_filter != NULL)
805 		return (Post_filter(hdl, event, src));
806 	else
807 		return (ETM_XPORT_FILTER_OK);
808 }
809