1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License, Version 1.0 only
6 * (the "License"). You may not use this file except in compliance
7 * with the License.
8 *
9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 * or http://www.opensolaris.org/os/licensing.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
13 *
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
19 *
20 * CDDL HEADER END
21 */
22 /*
23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 */
26
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <syslog.h>
30 #include <string.h>
31 #include <thread.h>
32 #include <synch.h>
33 #include <slp-internal.h>
34
/*
 * Per-call parameter bundle handed to both consumer() and slp_call().
 * It is allocated and filled in by slp_ua_common(); consumer() frees it
 * when the call completes, and slp_ua_common() frees it if setup fails.
 */
struct thr_call_args {
	slp_handle_impl_t *hp;		/* handle this call is running on */
	SLPGenericAppCB *cb;		/* app callback, passed on to msg_cb */
	void *cookie;			/* app cookie, passed on to msg_cb */
	SLPMsgReplyCB *msg_cb;		/* called by consumer() per reply */
	slp_target_list_t *targets;	/* used, then destroyed, by slp_call() */
};
43
/* Forward declarations of the file-local routines defined below. */
static void *consumer(void *);	/* dequeues replies and runs the callback */
static void *slp_call(void *);	/* producer thread: sends the request */
/* Checks that a message bound for multicast does not exceed the MTU. */
static SLPError check_message_fit(slp_handle_impl_t *, slp_target_list_t *);
47
slp_ua_common(SLPHandle hSLP,const char * scopes,SLPGenericAppCB cb,void * cookie,SLPMsgReplyCB msg_cb)48 SLPError slp_ua_common(SLPHandle hSLP, const char *scopes,
49 SLPGenericAppCB cb, void *cookie, SLPMsgReplyCB msg_cb)
50 {
51 slp_handle_impl_t *hp;
52 slp_target_list_t *targets;
53 struct thr_call_args *args;
54 slp_queue_t *q;
55 SLPError err;
56 thread_t tid;
57 int terr;
58
59 hp = (slp_handle_impl_t *)hSLP;
60
61 /* select targets */
62 if ((err = slp_new_target_list(hp, scopes, &targets)) != SLP_OK)
63 return (err);
64 if ((err = check_message_fit(hp, targets)) != SLP_OK) {
65 slp_destroy_target_list(targets);
66 return (err);
67 }
68
69 /* populate the args structure */
70 args = malloc(sizeof (*args));
71 if (args == NULL) {
72 slp_err(LOG_CRIT, 0, "ua_common", "out of memory");
73 return (SLP_MEMORY_ALLOC_FAILED);
74 }
75
76 args->hp = hp;
77 args->cb = cb;
78 args->cookie = cookie;
79 args->msg_cb = msg_cb;
80 args->targets = targets;
81
82 /* create the queue that this call will use */
83 q = slp_new_queue(&err); /* freed in consumer_thr */
84 if (err != SLP_OK)
85 goto error;
86 hp->q = q;
87
88 /* kick off the producer thread */
89 if ((terr = thr_create(NULL, 0, slp_call, args, 0, &tid)) != 0) {
90 slp_err(LOG_CRIT, 0, "ua_common", "could not start thread: %s",
91 strerror(terr));
92 err = SLP_INTERNAL_SYSTEM_ERROR;
93 goto error;
94 }
95 hp->producer_tid = tid;
96
97 if (hp->async) {
98 /* kick off the consumer thread */
99 if ((terr = thr_create(NULL, 0, consumer,
100 args, 0, NULL)) != 0) {
101 slp_err(LOG_CRIT, 0, "ua_common",
102 "could not start thread: %s",
103 strerror(terr));
104 err = SLP_INTERNAL_SYSTEM_ERROR;
105 /* cleanup producer thread, if necessary */
106 hp->cancel = 1;
107 (void) thr_join(tid, NULL, NULL);
108
109 goto error;
110 }
111 return (SLP_OK);
112 }
113 /* else sync */
114 return ((SLPError)consumer(args));
115 error:
116 free(args);
117 return (err);
118 }
119
120 static void *
consumer(void * ap)121 consumer(void *ap)
122 {
123 slp_handle_impl_t *hp;
124 char *reply;
125 void *collator;
126 int numResults = 0;
127 struct thr_call_args *args = (struct thr_call_args *)ap;
128
129 hp = args->hp;
130 collator = NULL;
131 hp->consumer_tid = thr_self();
132 /* while cb wants more and there is more to get ... */
133 for (;;) {
134 SLPBoolean cont;
135
136 reply = slp_dequeue(hp->q);
137 /* reply == NULL if no more available or SLPClosed */
138 cont = args->msg_cb(hp, reply, args->cb, args->cookie,
139 &collator, &numResults);
140
141 if (reply) {
142 free(reply);
143 } else {
144 break;
145 }
146
147 if (!cont) {
148 /* cb doesn't want any more; invoke last call */
149 args->msg_cb(hp, NULL, args->cb, args->cookie,
150 &collator, &numResults);
151 break;
152 }
153 }
154 /* cleanup */
155 /* clean stop producer [thread] */
156 hp->cancel = 1;
157 (void) thr_join(hp->producer_tid, NULL, NULL);
158
159 /* empty and free queue */
160 slp_flush_queue(hp->q, free);
161 slp_destroy_queue(hp->q);
162
163 free(args);
164 slp_end_call(hp);
165 return ((void *)SLP_OK);
166 }
167
168 /*
169 * This is the producer thread
170 */
171 static void *
slp_call(void * ap)172 slp_call(void *ap)
173 {
174 struct thr_call_args *args = (struct thr_call_args *)ap;
175 slp_target_t *t;
176 const char *uc_scopes, *mc_scopes;
177 SLPBoolean use_tcp = SLP_FALSE;
178 size_t len;
179
180 /* Unicast */
181 if (uc_scopes = slp_get_uc_scopes(args->targets)) {
182 size_t mtu;
183 int i;
184
185 /* calculate msg length */
186 len = slp_hdrlang_length(args->hp);
187 for (i = 0; i < args->hp->msg.iovlen; i++) {
188 len += args->hp->msg.iov[i].iov_len;
189 }
190 len += strlen(uc_scopes);
191
192 mtu = slp_get_mtu();
193 if (len > mtu)
194 use_tcp = SLP_TRUE;
195
196 for (t = slp_next_uc_target(args->targets); t != NULL;
197 t = slp_next_uc_target(args->targets)) {
198 if (args->hp->cancel)
199 break;
200
201 if (use_tcp)
202 slp_uc_tcp_send(args->hp, t, uc_scopes,
203 SLP_FALSE, 0);
204 else
205 slp_uc_udp_send(args->hp, t, uc_scopes);
206 }
207 }
208
209 /* Multicast */
210 if ((!args->hp->cancel) &&
211 (mc_scopes = slp_get_mc_scopes(args->targets)))
212 slp_mc_send(args->hp, mc_scopes);
213
214 /* Wait for TCP to complete, if necessary */
215 if (args->hp->tcp_lock)
216 slp_tcp_wait(args->hp);
217
218 slp_destroy_target_list(args->targets);
219
220 /* free the message */
221 free(args->hp->msg.iov);
222 free(args->hp->msg.msg);
223
224 /* null terminate message queue */
225 (void) slp_enqueue(args->hp->q, NULL);
226
227 thr_exit(NULL); /* we're outa here */
228 }
229
230 /*
231 * If the message to be sent needs to be multicast, check that it
232 * can fit into a datagram. If not, return BUFFER_OVERFLOW, otherwise
233 * return SLP_OK.
234 */
check_message_fit(slp_handle_impl_t * hp,slp_target_list_t * targets)235 static SLPError check_message_fit(slp_handle_impl_t *hp,
236 slp_target_list_t *targets) {
237 size_t msgSize;
238 int i;
239 const char *mc_scopes;
240
241 if (!(mc_scopes = slp_get_mc_scopes(targets)))
242 return (SLP_OK); /* no mc targets to worry about */
243
244 msgSize = slp_hdrlang_length(hp);
245 for (i = 0; i < hp->msg.iovlen; i++) {
246 msgSize += hp->msg.iov[i].iov_len;
247 }
248 msgSize += strlen(mc_scopes);
249
250 if (msgSize > slp_get_mtu())
251 return (SLP_BUFFER_OVERFLOW);
252 return (SLP_OK);
253 }
254