1 /* 2 * Copyright (c) 2004-2006 Voltaire, Inc. All rights reserved. 3 * Copyright (c) 2002-2005 Mellanox Technologies LTD. All rights reserved. 4 * Copyright (c) 1996-2003 Intel Corporation. All rights reserved. 5 * 6 * This software is available to you under a choice of one of two 7 * licenses. You may choose to be licensed under the terms of the GNU 8 * General Public License (GPL) Version 2, available from the file 9 * COPYING in the main directory of this source tree, or the 10 * OpenIB.org BSD license below: 11 * 12 * Redistribution and use in source and binary forms, with or 13 * without modification, are permitted provided that the following 14 * conditions are met: 15 * 16 * - Redistributions of source code must retain the above 17 * copyright notice, this list of conditions and the following 18 * disclaimer. 19 * 20 * - Redistributions in binary form must reproduce the above 21 * copyright notice, this list of conditions and the following 22 * disclaimer in the documentation and/or other materials 23 * provided with the distribution. 24 * 25 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 26 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 27 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 28 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 29 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 30 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 31 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 32 * SOFTWARE. 33 * 34 */ 35 36 /* 37 * Abstract: 38 * Implementation of Dispatcher abstraction. 39 * 40 */ 41 42 #if HAVE_CONFIG_H 43 # include <config.h> 44 #endif /* HAVE_CONFIG_H */ 45 46 #include <stdlib.h> 47 #include <complib/cl_dispatcher.h> 48 #include <complib/cl_thread.h> 49 #include <complib/cl_timer.h> 50 51 /* give some guidance when we build our cl_pool of messages */ 52 #define CL_DISP_INITIAL_MSG_COUNT 256 53 #define CL_DISP_MSG_GROW_SIZE 64 54 55 /* give some guidance when we build our cl_pool of registration elements */ 56 #define CL_DISP_INITIAL_REG_COUNT 16 57 #define CL_DISP_REG_GROW_SIZE 16 58 59 /******************************************************************** 60 __cl_disp_worker 61 62 Description: 63 This function takes messages off the FIFO and calls Processmsg() 64 This function executes as passive level. 65 66 Inputs: 67 p_disp - Pointer to Dispatcher object 68 69 Outputs: 70 None 71 72 Returns: 73 None 74 ********************************************************************/ 75 void __cl_disp_worker(IN void *context) 76 { 77 cl_disp_msg_t *p_msg; 78 cl_dispatcher_t *p_disp = (cl_dispatcher_t *) context; 79 80 cl_spinlock_acquire(&p_disp->lock); 81 82 /* Process the FIFO until we drain it dry. */ 83 while (cl_qlist_count(&p_disp->msg_fifo)) { 84 /* Pop the message at the head from the FIFO. */ 85 p_msg = 86 (cl_disp_msg_t *) cl_qlist_remove_head(&p_disp->msg_fifo); 87 88 /* we track the tim ethe last message spent in the queue */ 89 p_disp->last_msg_queue_time_us = 90 cl_get_time_stamp() - p_msg->in_time; 91 92 /* 93 * Release the spinlock while the message is processed. 94 * The user's callback may reenter the dispatcher 95 * and cause the lock to be reaquired. 96 */ 97 cl_spinlock_release(&p_disp->lock); 98 p_msg->p_dest_reg->pfn_rcv_callback((void *)p_msg->p_dest_reg-> 99 context, 100 (void *)p_msg->p_data); 101 102 cl_atomic_dec(&p_msg->p_dest_reg->ref_cnt); 103 104 /* The client has seen the data. Notify the sender as appropriate. */ 105 if (p_msg->pfn_xmt_callback) { 106 p_msg->pfn_xmt_callback((void *)p_msg->context, 107 (void *)p_msg->p_data); 108 cl_atomic_dec(&p_msg->p_src_reg->ref_cnt); 109 } 110 111 /* Grab the lock for the next iteration through the list. */ 112 cl_spinlock_acquire(&p_disp->lock); 113 114 /* Return this message to the pool. */ 115 cl_qpool_put(&p_disp->msg_pool, (cl_pool_item_t *) p_msg); 116 } 117 118 cl_spinlock_release(&p_disp->lock); 119 } 120 121 void cl_disp_construct(IN cl_dispatcher_t * const p_disp) 122 { 123 CL_ASSERT(p_disp); 124 125 cl_qlist_init(&p_disp->reg_list); 126 cl_ptr_vector_construct(&p_disp->reg_vec); 127 cl_qlist_init(&p_disp->msg_fifo); 128 cl_spinlock_construct(&p_disp->lock); 129 cl_qpool_construct(&p_disp->msg_pool); 130 } 131 132 void cl_disp_shutdown(IN cl_dispatcher_t * const p_disp) 133 { 134 CL_ASSERT(p_disp); 135 136 /* Stop the thread pool. */ 137 cl_thread_pool_destroy(&p_disp->worker_threads); 138 139 /* Process all outstanding callbacks. */ 140 __cl_disp_worker(p_disp); 141 142 /* Free all registration info. */ 143 while (!cl_is_qlist_empty(&p_disp->reg_list)) 144 free(cl_qlist_remove_head(&p_disp->reg_list)); 145 } 146 147 void cl_disp_destroy(IN cl_dispatcher_t * const p_disp) 148 { 149 CL_ASSERT(p_disp); 150 151 cl_spinlock_destroy(&p_disp->lock); 152 /* Destroy the message pool */ 153 cl_qpool_destroy(&p_disp->msg_pool); 154 /* Destroy the pointer vector of registrants. */ 155 cl_ptr_vector_destroy(&p_disp->reg_vec); 156 } 157 158 cl_status_t cl_disp_init(IN cl_dispatcher_t * const p_disp, 159 IN const uint32_t thread_count, 160 IN const char *const name) 161 { 162 cl_status_t status; 163 164 CL_ASSERT(p_disp); 165 166 cl_disp_construct(p_disp); 167 168 status = cl_spinlock_init(&p_disp->lock); 169 if (status != CL_SUCCESS) { 170 cl_disp_destroy(p_disp); 171 return (status); 172 } 173 174 /* Specify no upper limit to the number of messages in the pool */ 175 status = cl_qpool_init(&p_disp->msg_pool, CL_DISP_INITIAL_MSG_COUNT, 176 0, CL_DISP_MSG_GROW_SIZE, sizeof(cl_disp_msg_t), 177 NULL, NULL, NULL); 178 if (status != CL_SUCCESS) { 179 cl_disp_destroy(p_disp); 180 return (status); 181 } 182 183 status = cl_ptr_vector_init(&p_disp->reg_vec, CL_DISP_INITIAL_REG_COUNT, 184 CL_DISP_REG_GROW_SIZE); 185 if (status != CL_SUCCESS) { 186 cl_disp_destroy(p_disp); 187 return (status); 188 } 189 190 status = cl_thread_pool_init(&p_disp->worker_threads, thread_count, 191 __cl_disp_worker, p_disp, name); 192 if (status != CL_SUCCESS) 193 cl_disp_destroy(p_disp); 194 195 return (status); 196 } 197 198 cl_disp_reg_handle_t cl_disp_register(IN cl_dispatcher_t * const p_disp, 199 IN const cl_disp_msgid_t msg_id, 200 IN cl_pfn_msgrcv_cb_t pfn_callback 201 OPTIONAL, 202 IN const void *const context OPTIONAL) 203 { 204 cl_disp_reg_info_t *p_reg; 205 cl_status_t status; 206 207 CL_ASSERT(p_disp); 208 209 /* Check that the requested registrant ID is available. */ 210 cl_spinlock_acquire(&p_disp->lock); 211 if ((msg_id != CL_DISP_MSGID_NONE) && 212 (msg_id < cl_ptr_vector_get_size(&p_disp->reg_vec)) && 213 (cl_ptr_vector_get(&p_disp->reg_vec, msg_id))) { 214 cl_spinlock_release(&p_disp->lock); 215 return (NULL); 216 } 217 218 /* Get a registration info from the pool. */ 219 p_reg = (cl_disp_reg_info_t *) malloc(sizeof(cl_disp_reg_info_t)); 220 if (!p_reg) { 221 cl_spinlock_release(&p_disp->lock); 222 return (NULL); 223 } else { 224 memset(p_reg, 0, sizeof(cl_disp_reg_info_t)); 225 } 226 227 p_reg->p_disp = p_disp; 228 p_reg->ref_cnt = 0; 229 p_reg->pfn_rcv_callback = pfn_callback; 230 p_reg->context = context; 231 p_reg->msg_id = msg_id; 232 233 /* Insert the registration in the list. */ 234 cl_qlist_insert_tail(&p_disp->reg_list, (cl_list_item_t *) p_reg); 235 236 /* Set the array entry to the registrant. */ 237 /* The ptr_vector grow automatically as necessary. */ 238 if (msg_id != CL_DISP_MSGID_NONE) { 239 status = cl_ptr_vector_set(&p_disp->reg_vec, msg_id, p_reg); 240 if (status != CL_SUCCESS) { 241 free(p_reg); 242 cl_spinlock_release(&p_disp->lock); 243 return (NULL); 244 } 245 } 246 247 cl_spinlock_release(&p_disp->lock); 248 249 return (p_reg); 250 } 251 252 void cl_disp_unregister(IN const cl_disp_reg_handle_t handle) 253 { 254 cl_disp_reg_info_t *p_reg; 255 cl_dispatcher_t *p_disp; 256 257 if (handle == CL_DISP_INVALID_HANDLE) 258 return; 259 260 p_reg = (cl_disp_reg_info_t *) handle; 261 p_disp = p_reg->p_disp; 262 CL_ASSERT(p_disp); 263 264 cl_spinlock_acquire(&p_disp->lock); 265 /* 266 * Clear the registrant vector entry. This will cause any further 267 * post calls to fail. 268 */ 269 if (p_reg->msg_id != CL_DISP_MSGID_NONE) { 270 CL_ASSERT(p_reg->msg_id < 271 cl_ptr_vector_get_size(&p_disp->reg_vec)); 272 cl_ptr_vector_set(&p_disp->reg_vec, p_reg->msg_id, NULL); 273 } 274 cl_spinlock_release(&p_disp->lock); 275 276 while (p_reg->ref_cnt > 0) 277 cl_thread_suspend(1); 278 279 cl_spinlock_acquire(&p_disp->lock); 280 /* Remove the registrant from the list. */ 281 cl_qlist_remove_item(&p_disp->reg_list, (cl_list_item_t *) p_reg); 282 /* Return the registration info to the pool */ 283 free(p_reg); 284 285 cl_spinlock_release(&p_disp->lock); 286 } 287 288 cl_status_t cl_disp_post(IN const cl_disp_reg_handle_t handle, 289 IN const cl_disp_msgid_t msg_id, 290 IN const void *const p_data, 291 IN cl_pfn_msgdone_cb_t pfn_callback OPTIONAL, 292 IN const void *const context OPTIONAL) 293 { 294 cl_disp_reg_info_t *p_src_reg = (cl_disp_reg_info_t *) handle; 295 cl_disp_reg_info_t *p_dest_reg; 296 cl_dispatcher_t *p_disp; 297 cl_disp_msg_t *p_msg; 298 299 p_disp = handle->p_disp; 300 CL_ASSERT(p_disp); 301 CL_ASSERT(msg_id != CL_DISP_MSGID_NONE); 302 303 cl_spinlock_acquire(&p_disp->lock); 304 /* Check that the recipient exists. */ 305 if (cl_ptr_vector_get_size(&p_disp->reg_vec) <= msg_id) { 306 cl_spinlock_release(&p_disp->lock); 307 return (CL_NOT_FOUND); 308 } 309 310 p_dest_reg = cl_ptr_vector_get(&p_disp->reg_vec, msg_id); 311 if (!p_dest_reg) { 312 cl_spinlock_release(&p_disp->lock); 313 return (CL_NOT_FOUND); 314 } 315 316 /* Get a free message from the pool. */ 317 p_msg = (cl_disp_msg_t *) cl_qpool_get(&p_disp->msg_pool); 318 if (!p_msg) { 319 cl_spinlock_release(&p_disp->lock); 320 return (CL_INSUFFICIENT_MEMORY); 321 } 322 323 /* Initialize the message */ 324 p_msg->p_src_reg = p_src_reg; 325 p_msg->p_dest_reg = p_dest_reg; 326 p_msg->p_data = p_data; 327 p_msg->pfn_xmt_callback = pfn_callback; 328 p_msg->context = context; 329 p_msg->in_time = cl_get_time_stamp(); 330 331 /* 332 * Increment the sender's reference count if they request a completion 333 * notification. 334 */ 335 if (pfn_callback) 336 cl_atomic_inc(&p_src_reg->ref_cnt); 337 338 /* Increment the recipient's reference count. */ 339 cl_atomic_inc(&p_dest_reg->ref_cnt); 340 341 /* Queue the message in the FIFO. */ 342 cl_qlist_insert_tail(&p_disp->msg_fifo, (cl_list_item_t *) p_msg); 343 cl_spinlock_release(&p_disp->lock); 344 345 /* Signal the thread pool that there is work to be done. */ 346 cl_thread_pool_signal(&p_disp->worker_threads); 347 return (CL_SUCCESS); 348 } 349 350 void cl_disp_get_queue_status(IN const cl_disp_reg_handle_t handle, 351 OUT uint32_t * p_num_queued_msgs, 352 OUT uint64_t * p_last_msg_queue_time_ms) 353 { 354 cl_dispatcher_t *p_disp = ((cl_disp_reg_info_t *) handle)->p_disp; 355 356 cl_spinlock_acquire(&p_disp->lock); 357 358 if (p_last_msg_queue_time_ms) 359 *p_last_msg_queue_time_ms = 360 p_disp->last_msg_queue_time_us / 1000; 361 362 if (p_num_queued_msgs) 363 *p_num_queued_msgs = cl_qlist_count(&p_disp->msg_fifo); 364 365 cl_spinlock_release(&p_disp->lock); 366 } 367