Lines Matching +full:set +full:- +full:io +full:- +full:isolation
2 * work_thread.c - threads implementation for blocking worker child.
26 #define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1)
29 * The request queue grows a bit faster than the response queue -- the
38 * set the maximum to 256kB. If the minimum goes below the
39 * system-defined minimum stack size, we have to adjust accordingly.
64 # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
95 /* --------------------------------------------------------------------
110 /* --------------------------------------------------------------------
111 * implementation isolation wrapper
121 /* --------------------------------------------------------------------
136 return -1;
141 return -1;
145 rc = wait_for_sem(c->wake_scheduled_sleep, &until);
147 return -1;
148 if (-1 == rc && ETIMEDOUT == errno)
151 return -1;
155 /* --------------------------------------------------------------------
166 if (NULL == c || NULL == c->wake_scheduled_sleep)
168 tickle_sem(c->wake_scheduled_sleep);
172 /* --------------------------------------------------------------------
193 sizeof(blocking_children[0]->workitems[0]);
199 slots_used = c->head_workitem - c->tail_workitem;
200 if (slots_used >= c->workitems_alloc) {
201 new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
202 c->workitems = erealloc(c->workitems, new_alloc * each);
203 for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
204 c->workitems[sidx] = NULL;
205 c->tail_workitem = 0;
206 c->head_workitem = c->workitems_alloc;
207 c->workitems_alloc = new_alloc;
209 INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
213 /* --------------------------------------------------------------------
229 sizeof(blocking_children[0]->responses[0]);
235 slots_used = c->head_response - c->tail_response;
236 if (slots_used >= c->responses_alloc) {
237 new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
238 c->responses = erealloc(c->responses, new_alloc * each);
239 for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
240 c->responses[sidx] = NULL;
241 c->tail_response = 0;
242 c->head_response = c->responses_alloc;
243 c->responses_alloc = new_alloc;
245 INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
250 /* --------------------------------------------------------------------
251 * queue_req_pointer() - append a work item or idle exit request to
263 wait_for_sem(c->accesslock, NULL);
265 qhead = c->head_workitem;
266 c->workitems[qhead % c->workitems_alloc] = hdr;
267 c->head_workitem = 1 + qhead;
268 tickle_sem(c->accesslock);
271 /* queue consumer wake-up notification */
272 tickle_sem(c->workitems_pending);
277 /* --------------------------------------------------------------------
294 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
296 if (hdr->octets <= sizeof(*hdr))
298 payload_octets = hdr->octets - sizeof(*hdr);
300 if (NULL == c->thread_ref)
302 threadcopy = emalloc(hdr->octets);
309 /* --------------------------------------------------------------------
324 wait_for_sem(c->workitems_pending, NULL);
327 wait_for_sem(c->accesslock, NULL);
328 qhead = c->head_workitem;
330 qtail = c->tail_workitem;
333 c->tail_workitem = qtail + 1;
334 qtail %= c->workitems_alloc;
335 req = c->workitems[qtail];
336 c->workitems[qtail] = NULL;
338 tickle_sem(c->accesslock);
352 /* --------------------------------------------------------------------
366 wait_for_sem(c->accesslock, NULL);
368 qhead = c->head_response;
369 c->responses[qhead % c->responses_alloc] = resp;
370 c->head_response = 1 + qhead;
371 tickle_sem(c->accesslock);
374 /* queue consumer wake-up notification */
378 if (1 != write(c->resp_write_pipe, "", 1))
381 (BLOCKING_GETNAMEINFO == resp->rtype)
386 tickle_sem(c->responses_pending);
395 /* --------------------------------------------------------------------
396 * Check if a (Windows-)handle to a semaphore is actually the same we
405 return obj && osh && (obj->shnd == (HANDLE)osh);
408 /* --------------------------------------------------------------------
424 c->thread_ref != NULL &&
425 same_os_sema(c->responses_pending, context))
433 /* --------------------------------------------------------------------
450 rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
451 while (-1 == rc && EINTR == errno);
455 wait_for_sem(c->accesslock, NULL);
456 qhead = c->head_response;
457 qtail = c->tail_response;
459 slot = qtail % c->responses_alloc;
460 removed = c->responses[slot];
461 c->responses[slot] = NULL;
463 c->tail_response = qtail;
464 tickle_sem(c->accesslock);
469 BLOCKING_RESP_MAGIC == removed->magic_sig);
479 /* --------------------------------------------------------------------
488 DEBUG_INSIST(!c->reusable);
494 /* --------------------------------------------------------------------
496 * and Windows, of course -- most notably the Windows thread is a
510 c->thread_ref = NULL;
511 (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
512 c->thr_table[0].thnd =
521 if (NULL == c->thr_table[0].thnd) {
523 exit(-1);
526 if (!SetThreadPriority(c->thr_table[0].thnd,
531 (*pSetThreadDescription)(c->thr_table[0].thnd, L"ntp_worker");
533 resumed = ResumeThread(c->thr_table[0].thnd);
535 c->thread_ref = &c->thr_table[0];
551 c->thread_ref = NULL;
569 c->resp_read_pipe = move_fd(pipe_ends[0]);
570 c->resp_write_pipe = move_fd(pipe_ends[1]);
571 c->ispipe = is_pipe;
572 flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
573 if (-1 == flags) {
577 rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
578 if (-1 == rc) {
583 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
591 "start_blocking_thread: pthread_attr_getstacksize() -> %s",
617 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
628 c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
630 rc = pthread_create(&c->thr_table[0], &thr_attr,
635 msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
639 c->thread_ref = &c->thr_table[0];
643 /* --------------------------------------------------------------------
692 /* --------------------------------------------------------------------
711 semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
712 if (NULL == semptr->shnd)
727 /* ------------------------------------------------------------------ */
736 if (obj->shnd)
737 CloseHandle(obj->shnd);
738 obj->shnd = NULL;
751 /* --------------------------------------------------------------------
773 c->accesslock = create_sema(&c->sem_table[0], 1, 1);
774 c->workitems_pending = create_sema(&c->sem_table[1], 0, 0);
775 c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
777 c->responses_pending = create_sema(&c->sem_table[3], 0, 0);
781 /* --------------------------------------------------------------------
783 * internally resume -- When this function returns, there is either no
793 struct timespec * timeout /* wall-clock */
802 if (!(sem && sem->shnd)) {
804 return -1;
821 rc = WaitForSingleObject(sem->shnd, msec);
826 return -1;
830 return -1;
834 int rc = -1;
841 } while (rc == -1 && errno == EINTR);
849 /* --------------------------------------------------------------------
850 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
851 * calling conventions under Windows and POSIX-defined signature
872 /* --------------------------------------------------------------------
888 return (c->accesslock)
893 /* --------------------------------------------------------------------
901 DEBUG_INSIST(!c->reusable);
907 if (c->thread_ref && c->thread_ref->thnd) {
908 WaitForSingleObject(c->thread_ref->thnd, INFINITE);
909 INSIST(CloseHandle(c->thread_ref->thnd));
910 c->thread_ref->thnd = NULL;
913 c->thread_ref = NULL;
915 /* remove semaphores and (if signalling vi IO) pipes */
917 c->accesslock = delete_sema(c->accesslock);
918 c->workitems_pending = delete_sema(c->workitems_pending);
919 c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
922 DEBUG_INSIST(-1 != c->resp_read_pipe);
923 DEBUG_INSIST(-1 != c->resp_write_pipe);
924 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
925 close(c->resp_write_pipe);
926 close(c->resp_read_pipe);
927 c->resp_write_pipe = -1;
928 c->resp_read_pipe = -1;
930 DEBUG_INSIST(NULL != c->responses_pending);
931 (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
932 c->responses_pending = delete_sema(c->responses_pending);
939 /* re-init buffer index sequencers */
940 c->head_workitem = 0;
941 c->tail_workitem = 0;
942 c->head_response = 0;
943 c->tail_response = 0;
945 c->reusable = TRUE;