1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License, Version 1.0 only 6 * (the "License"). You may not use this file except in compliance 7 * with the License. 8 * 9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 10 * or http://www.opensolaris.org/os/licensing. 11 * See the License for the specific language governing permissions 12 * and limitations under the License. 13 * 14 * When distributing Covered Code, include this CDDL HEADER in each 15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 16 * If applicable, add the following below this CDDL HEADER, with the 17 * fields enclosed by brackets "[]" replaced with your own identifying 18 * information: Portions Copyright [yyyy] [name of copyright owner] 19 * 20 * CDDL HEADER END 21 */ 22 /* 23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27 #pragma ident "%Z%%M% %I% %E% SMI" 28 29 #include <stdio.h> 30 #include <stdlib.h> 31 #include <syslog.h> 32 #include <string.h> 33 #include <thread.h> 34 #include <synch.h> 35 #include <slp-internal.h> 36 37 /* This is used to pass needed params to consumer_thr and slp_call */ 38 struct thr_call_args { 39 slp_handle_impl_t *hp; 40 SLPGenericAppCB *cb; 41 void *cookie; 42 SLPMsgReplyCB *msg_cb; 43 slp_target_list_t *targets; 44 }; 45 46 static SLPError consumer(void *); 47 static void slp_call(void *); 48 static SLPError check_message_fit(slp_handle_impl_t *, slp_target_list_t *); 49 50 SLPError slp_ua_common(SLPHandle hSLP, const char *scopes, 51 SLPGenericAppCB cb, void *cookie, 52 SLPMsgReplyCB msg_cb) { 53 slp_handle_impl_t *hp; 54 slp_target_list_t *targets; 55 struct thr_call_args *args; 56 slp_queue_t *q; 57 SLPError err; 58 thread_t tid; 59 int terr; 60 61 hp = (slp_handle_impl_t *)hSLP; 62 63 /* select targets */ 64 if ((err = slp_new_target_list(hp, scopes, &targets)) != SLP_OK) 65 return (err); 66 if ((err = check_message_fit(hp, targets)) != SLP_OK) { 67 slp_destroy_target_list(targets); 68 return (err); 69 } 70 71 /* populate the args structure */ 72 args = malloc(sizeof (*args)); 73 if (args == NULL) { 74 slp_err(LOG_CRIT, 0, "ua_common", "out of memory"); 75 return (SLP_MEMORY_ALLOC_FAILED); 76 } 77 78 args->hp = hp; 79 args->cb = cb; 80 args->cookie = cookie; 81 args->msg_cb = msg_cb; 82 args->targets = targets; 83 84 /* create the queue that this call will use */ 85 q = slp_new_queue(&err); /* freed in consumer_thr */ 86 if (err != SLP_OK) 87 goto error; 88 hp->q = q; 89 90 /* kick off the producer thread */ 91 if ((terr = thr_create( 92 NULL, 0, (void *(*)(void *)) slp_call, args, 0, &tid)) != 0) { 93 slp_err(LOG_CRIT, 0, "ua_common", "could not start thread: %s", 94 strerror(terr)); 95 err = SLP_INTERNAL_SYSTEM_ERROR; 96 goto error; 97 } 98 hp->producer_tid = tid; 99 100 if (hp->async) { 101 /* kick off the consumer thread */ 102 if ((terr = thr_create( 103 NULL, 0, (void *(*)(void *))consumer, 104 args, 0, NULL)) != 0) { 105 slp_err(LOG_CRIT, 0, "ua_common", 106 "could not start thread: %s", 107 strerror(terr)); 108 err = SLP_INTERNAL_SYSTEM_ERROR; 109 /* cleanup producer thread, if necessary */ 110 hp->cancel = 1; 111 (void) thr_join(tid, NULL, NULL); 112 113 goto error; 114 } 115 return (SLP_OK); 116 } 117 /* else sync */ 118 return (consumer(args)); 119 error: 120 free(args); 121 return (err); 122 } 123 124 static SLPError consumer(void *ap) { 125 slp_handle_impl_t *hp; 126 char *reply; 127 void *collator; 128 int numResults = 0; 129 struct thr_call_args *args = (struct thr_call_args *)ap; 130 131 hp = args->hp; 132 collator = NULL; 133 hp->consumer_tid = thr_self(); 134 /* while cb wants more and there is more to get ... */ 135 for (;;) { 136 SLPBoolean cont; 137 138 reply = slp_dequeue(hp->q); 139 /* reply == NULL if no more available or SLPClosed */ 140 cont = args->msg_cb(hp, reply, args->cb, args->cookie, 141 &collator, &numResults); 142 143 if (reply) { 144 free(reply); 145 } else { 146 break; 147 } 148 149 if (!cont) { 150 /* cb doesn't want any more; invoke last call */ 151 args->msg_cb(hp, NULL, args->cb, args->cookie, 152 &collator, &numResults); 153 break; 154 } 155 } 156 /* cleanup */ 157 /* clean stop producer [thread] */ 158 hp->cancel = 1; 159 (void) thr_join(hp->producer_tid, NULL, NULL); 160 161 /* empty and free queue */ 162 slp_flush_queue(hp->q, free); 163 slp_destroy_queue(hp->q); 164 165 free(args); 166 slp_end_call(hp); 167 return (SLP_OK); 168 } 169 170 /* 171 * This is the producer thread 172 */ 173 static void slp_call(void *ap) { 174 struct thr_call_args *args = (struct thr_call_args *)ap; 175 slp_target_t *t; 176 const char *uc_scopes, *mc_scopes; 177 SLPBoolean use_tcp = SLP_FALSE; 178 size_t len; 179 180 /* Unicast */ 181 if (uc_scopes = slp_get_uc_scopes(args->targets)) { 182 size_t mtu; 183 int i; 184 185 /* calculate msg length */ 186 len = slp_hdrlang_length(args->hp); 187 for (i = 0; i < args->hp->msg.iovlen; i++) { 188 len += args->hp->msg.iov[i].iov_len; 189 } 190 len += strlen(uc_scopes); 191 192 mtu = slp_get_mtu(); 193 if (len > mtu) 194 use_tcp = SLP_TRUE; 195 196 for ( 197 t = slp_next_uc_target(args->targets); 198 t; 199 t = slp_next_uc_target(args->targets)) { 200 if (args->hp->cancel) 201 break; 202 203 if (use_tcp) 204 slp_uc_tcp_send(args->hp, t, uc_scopes, 205 SLP_FALSE, 0); 206 else 207 slp_uc_udp_send(args->hp, t, uc_scopes); 208 } 209 } 210 211 /* Multicast */ 212 if ((!args->hp->cancel) && 213 (mc_scopes = slp_get_mc_scopes(args->targets))) 214 slp_mc_send(args->hp, mc_scopes); 215 216 /* Wait for TCP to complete, if necessary */ 217 if (args->hp->tcp_lock) 218 slp_tcp_wait(args->hp); 219 220 slp_destroy_target_list(args->targets); 221 222 /* free the message */ 223 free(args->hp->msg.iov); 224 free(args->hp->msg.msg); 225 226 /* null terminate message queue */ 227 (void) slp_enqueue(args->hp->q, NULL); 228 229 thr_exit(NULL); /* we're outa here */ 230 } 231 232 /* 233 * If the message to be sent needs to be multicast, check that it 234 * can fit into a datagram. If not, return BUFFER_OVERFLOW, otherwise 235 * return SLP_OK. 236 */ 237 static SLPError check_message_fit(slp_handle_impl_t *hp, 238 slp_target_list_t *targets) { 239 size_t msgSize; 240 int i; 241 const char *mc_scopes; 242 243 if (!(mc_scopes = slp_get_mc_scopes(targets))) 244 return (SLP_OK); /* no mc targets to worry about */ 245 246 msgSize = slp_hdrlang_length(hp); 247 for (i = 0; i < hp->msg.iovlen; i++) { 248 msgSize += hp->msg.iov[i].iov_len; 249 } 250 msgSize += strlen(mc_scopes); 251 252 if (msgSize > slp_get_mtu()) 253 return (SLP_BUFFER_OVERFLOW); 254 return (SLP_OK); 255 } 256