1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License, Version 1.0 only 6 * (the "License"). You may not use this file except in compliance 7 * with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or http://www.opensolaris.org/os/licensing. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 /* 23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #include <stdio.h> 28 #include <stdlib.h> 29 #include <syslog.h> 30 #include <string.h> 31 #include <thread.h> 32 #include <synch.h> 33 #include <slp-internal.h> 34 35 /* This is used to pass needed params to consumer_thr and slp_call */ 36 struct thr_call_args { 37 slp_handle_impl_t *hp; 38 SLPGenericAppCB *cb; 39 void *cookie; 40 SLPMsgReplyCB *msg_cb; 41 slp_target_list_t *targets; 42 }; 43 44 static void *consumer(void *); 45 static void *slp_call(void *); 46 static SLPError check_message_fit(slp_handle_impl_t *, slp_target_list_t *); 47 48 SLPError slp_ua_common(SLPHandle hSLP, const char *scopes, 49 SLPGenericAppCB cb, void *cookie, SLPMsgReplyCB msg_cb) 50 { 51 slp_handle_impl_t *hp; 52 slp_target_list_t *targets; 53 struct thr_call_args *args; 54 slp_queue_t *q; 55 SLPError err; 56 thread_t tid; 57 int terr; 58 59 hp = (slp_handle_impl_t *)hSLP; 60 61 /* select targets */ 62 if ((err = slp_new_target_list(hp, scopes, &targets)) != SLP_OK) 63 return (err); 64 if ((err = check_message_fit(hp, targets)) != SLP_OK) { 65 slp_destroy_target_list(targets); 66 return (err); 67 } 68 69 /* populate the args structure */ 70 args = malloc(sizeof (*args)); 71 if (args == NULL) { 72 slp_err(LOG_CRIT, 0, "ua_common", "out of memory"); 73 return (SLP_MEMORY_ALLOC_FAILED); 74 } 75 76 args->hp = hp; 77 args->cb = cb; 78 args->cookie = cookie; 79 args->msg_cb = msg_cb; 80 args->targets = targets; 81 82 /* create the queue that this call will use */ 83 q = slp_new_queue(&err); /* freed in consumer_thr */ 84 if (err != SLP_OK) 85 goto error; 86 hp->q = q; 87 88 /* kick off the producer thread */ 89 if ((terr = thr_create(NULL, 0, slp_call, args, 0, &tid)) != 0) { 90 slp_err(LOG_CRIT, 0, "ua_common", "could not start thread: %s", 91 strerror(terr)); 92 err = SLP_INTERNAL_SYSTEM_ERROR; 93 goto error; 94 } 95 hp->producer_tid = tid; 96 97 if (hp->async) { 98 /* kick off the consumer thread */ 99 if ((terr = thr_create(NULL, 0, consumer, 100 args, 0, NULL)) != 0) { 101 slp_err(LOG_CRIT, 0, "ua_common", 102 "could not start thread: %s", 103 strerror(terr)); 104 err = SLP_INTERNAL_SYSTEM_ERROR; 105 /* cleanup producer thread, if necessary */ 106 hp->cancel = 1; 107 (void) thr_join(tid, NULL, NULL); 108 109 goto error; 110 } 111 return (SLP_OK); 112 } 113 /* else sync */ 114 return ((SLPError)consumer(args)); 115 error: 116 free(args); 117 return (err); 118 } 119 120 static void * 121 consumer(void *ap) 122 { 123 slp_handle_impl_t *hp; 124 char *reply; 125 void *collator; 126 int numResults = 0; 127 struct thr_call_args *args = (struct thr_call_args *)ap; 128 129 hp = args->hp; 130 collator = NULL; 131 hp->consumer_tid = thr_self(); 132 /* while cb wants more and there is more to get ... */ 133 for (;;) { 134 SLPBoolean cont; 135 136 reply = slp_dequeue(hp->q); 137 /* reply == NULL if no more available or SLPClosed */ 138 cont = args->msg_cb(hp, reply, args->cb, args->cookie, 139 &collator, &numResults); 140 141 if (reply) { 142 free(reply); 143 } else { 144 break; 145 } 146 147 if (!cont) { 148 /* cb doesn't want any more; invoke last call */ 149 args->msg_cb(hp, NULL, args->cb, args->cookie, 150 &collator, &numResults); 151 break; 152 } 153 } 154 /* cleanup */ 155 /* clean stop producer [thread] */ 156 hp->cancel = 1; 157 (void) thr_join(hp->producer_tid, NULL, NULL); 158 159 /* empty and free queue */ 160 slp_flush_queue(hp->q, free); 161 slp_destroy_queue(hp->q); 162 163 free(args); 164 slp_end_call(hp); 165 return ((void *)SLP_OK); 166 } 167 168 /* 169 * This is the producer thread 170 */ 171 static void * 172 slp_call(void *ap) 173 { 174 struct thr_call_args *args = (struct thr_call_args *)ap; 175 slp_target_t *t; 176 const char *uc_scopes, *mc_scopes; 177 SLPBoolean use_tcp = SLP_FALSE; 178 size_t len; 179 180 /* Unicast */ 181 if (uc_scopes = slp_get_uc_scopes(args->targets)) { 182 size_t mtu; 183 int i; 184 185 /* calculate msg length */ 186 len = slp_hdrlang_length(args->hp); 187 for (i = 0; i < args->hp->msg.iovlen; i++) { 188 len += args->hp->msg.iov[i].iov_len; 189 } 190 len += strlen(uc_scopes); 191 192 mtu = slp_get_mtu(); 193 if (len > mtu) 194 use_tcp = SLP_TRUE; 195 196 for (t = slp_next_uc_target(args->targets); t != NULL; 197 t = slp_next_uc_target(args->targets)) { 198 if (args->hp->cancel) 199 break; 200 201 if (use_tcp) 202 slp_uc_tcp_send(args->hp, t, uc_scopes, 203 SLP_FALSE, 0); 204 else 205 slp_uc_udp_send(args->hp, t, uc_scopes); 206 } 207 } 208 209 /* Multicast */ 210 if ((!args->hp->cancel) && 211 (mc_scopes = slp_get_mc_scopes(args->targets))) 212 slp_mc_send(args->hp, mc_scopes); 213 214 /* Wait for TCP to complete, if necessary */ 215 if (args->hp->tcp_lock) 216 slp_tcp_wait(args->hp); 217 218 slp_destroy_target_list(args->targets); 219 220 /* free the message */ 221 free(args->hp->msg.iov); 222 free(args->hp->msg.msg); 223 224 /* null terminate message queue */ 225 (void) slp_enqueue(args->hp->q, NULL); 226 227 thr_exit(NULL); /* we're outa here */ 228 } 229 230 /* 231 * If the message to be sent needs to be multicast, check that it 232 * can fit into a datagram. If not, return BUFFER_OVERFLOW, otherwise 233 * return SLP_OK. 234 */ 235 static SLPError check_message_fit(slp_handle_impl_t *hp, 236 slp_target_list_t *targets) { 237 size_t msgSize; 238 int i; 239 const char *mc_scopes; 240 241 if (!(mc_scopes = slp_get_mc_scopes(targets))) 242 return (SLP_OK); /* no mc targets to worry about */ 243 244 msgSize = slp_hdrlang_length(hp); 245 for (i = 0; i < hp->msg.iovlen; i++) { 246 msgSize += hp->msg.iov[i].iov_len; 247 } 248 msgSize += strlen(mc_scopes); 249 250 if (msgSize > slp_get_mtu()) 251 return (SLP_BUFFER_OVERFLOW); 252 return (SLP_OK); 253 } 254