Lines Matching full:queue

107 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,  in _timeout_task_init()  argument
112 callout_init_mtx(&timeout_task->c, &queue->tq_mutex, in _timeout_task_init()
114 timeout_task->q = queue; in _timeout_task_init()
127 task_get_busy(struct taskqueue *queue, struct task *task) in task_get_busy() argument
131 TQ_ASSERT_LOCKED(queue); in task_get_busy()
132 LIST_FOREACH(tb, &queue->tq_active, tb_link) { in task_get_busy()
144 struct taskqueue *queue; in _taskqueue_create() local
151 queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); in _taskqueue_create()
152 if (queue == NULL) { in _taskqueue_create()
159 STAILQ_INIT(&queue->tq_queue); in _taskqueue_create()
160 LIST_INIT(&queue->tq_active); in _taskqueue_create()
161 queue->tq_enqueue = enqueue; in _taskqueue_create()
162 queue->tq_context = context; in _taskqueue_create()
163 queue->tq_name = tq_name; in _taskqueue_create()
164 queue->tq_spin = (mtxflags & MTX_SPIN) != 0; in _taskqueue_create()
165 queue->tq_flags |= TQ_FLAGS_ACTIVE; in _taskqueue_create()
170 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; in _taskqueue_create()
171 mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags); in _taskqueue_create()
173 return (queue); in _taskqueue_create()
186 taskqueue_set_callback(struct taskqueue *queue, in taskqueue_set_callback() argument
195 KASSERT((queue->tq_callbacks[cb_type] == NULL), in taskqueue_set_callback()
198 queue->tq_callbacks[cb_type] = callback; in taskqueue_set_callback()
199 queue->tq_cb_contexts[cb_type] = context; in taskqueue_set_callback()
216 taskqueue_free(struct taskqueue *queue) in taskqueue_free() argument
219 TQ_LOCK(queue); in taskqueue_free()
220 queue->tq_flags &= ~TQ_FLAGS_ACTIVE; in taskqueue_free()
221 taskqueue_terminate(queue->tq_threads, queue); in taskqueue_free()
222 KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?")); in taskqueue_free()
223 KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); in taskqueue_free()
224 mtx_destroy(&queue->tq_mutex); in taskqueue_free()
225 free(queue->tq_threads, M_TASKQUEUE); in taskqueue_free()
226 free(queue->tq_name, M_TASKQUEUE); in taskqueue_free()
227 free(queue, M_TASKQUEUE); in taskqueue_free()
231 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task, int flags) in taskqueue_enqueue_locked() argument
242 tb = task_get_busy(queue, task); in taskqueue_enqueue_locked()
244 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
254 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
259 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
269 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); in taskqueue_enqueue_locked()
271 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); in taskqueue_enqueue_locked()
273 prev = queue->tq_hint; in taskqueue_enqueue_locked()
278 ins = STAILQ_FIRST(&queue->tq_queue); in taskqueue_enqueue_locked()
285 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); in taskqueue_enqueue_locked()
286 queue->tq_hint = task; in taskqueue_enqueue_locked()
288 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); in taskqueue_enqueue_locked()
292 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0) in taskqueue_enqueue_locked()
293 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
294 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) in taskqueue_enqueue_locked()
295 queue->tq_enqueue(queue->tq_context); in taskqueue_enqueue_locked()
296 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0) in taskqueue_enqueue_locked()
297 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
304 taskqueue_enqueue_flags(struct taskqueue *queue, struct task *task, int flags) in taskqueue_enqueue_flags() argument
308 TQ_LOCK(queue); in taskqueue_enqueue_flags()
309 res = taskqueue_enqueue_locked(queue, task, flags); in taskqueue_enqueue_flags()
316 taskqueue_enqueue(struct taskqueue *queue, struct task *task) in taskqueue_enqueue() argument
318 return (taskqueue_enqueue_flags(queue, task, 0)); in taskqueue_enqueue()
324 struct taskqueue *queue; in taskqueue_timeout_func() local
328 queue = timeout_task->q; in taskqueue_timeout_func()
331 queue->tq_callouts--; in taskqueue_timeout_func()
337 taskqueue_enqueue_timeout_sbt(struct taskqueue *queue, in taskqueue_enqueue_timeout_sbt() argument
342 TQ_LOCK(queue); in taskqueue_enqueue_timeout_sbt()
343 KASSERT(timeout_task->q == NULL || timeout_task->q == queue, in taskqueue_enqueue_timeout_sbt()
344 ("Migrated queue")); in taskqueue_enqueue_timeout_sbt()
345 timeout_task->q = queue; in taskqueue_enqueue_timeout_sbt()
349 TQ_UNLOCK(queue); in taskqueue_enqueue_timeout_sbt()
352 taskqueue_enqueue_locked(queue, &timeout_task->t, 0); in taskqueue_enqueue_timeout_sbt()
358 queue->tq_callouts++; in taskqueue_enqueue_timeout_sbt()
364 if (queue->tq_spin) in taskqueue_enqueue_timeout_sbt()
366 if (queue->tq_spin && queue->tq_tcount == 1 && in taskqueue_enqueue_timeout_sbt()
367 queue->tq_threads[0] == curthread) { in taskqueue_enqueue_timeout_sbt()
375 TQ_UNLOCK(queue); in taskqueue_enqueue_timeout_sbt()
381 taskqueue_enqueue_timeout(struct taskqueue *queue, in taskqueue_enqueue_timeout() argument
385 return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt, in taskqueue_enqueue_timeout()
400 taskqueue_drain_tq_queue(struct taskqueue *queue) in taskqueue_drain_tq_queue() argument
404 if (STAILQ_EMPTY(&queue->tq_queue)) in taskqueue_drain_tq_queue()
413 * queue lock. in taskqueue_drain_tq_queue()
416 STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link); in taskqueue_drain_tq_queue()
417 queue->tq_hint = &t_barrier; in taskqueue_drain_tq_queue()
425 TQ_SLEEP(queue, &t_barrier, "tq_qdrain"); in taskqueue_drain_tq_queue()
435 taskqueue_drain_tq_active(struct taskqueue *queue) in taskqueue_drain_tq_active() argument
440 if (LIST_EMPTY(&queue->tq_active)) in taskqueue_drain_tq_active()
444 queue->tq_callouts++; in taskqueue_drain_tq_active()
447 seq = queue->tq_seq; in taskqueue_drain_tq_active()
449 LIST_FOREACH(tb, &queue->tq_active, tb_link) { in taskqueue_drain_tq_active()
451 TQ_SLEEP(queue, tb->tb_running, "tq_adrain"); in taskqueue_drain_tq_active()
457 queue->tq_callouts--; in taskqueue_drain_tq_active()
458 if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) in taskqueue_drain_tq_active()
459 wakeup_one(queue->tq_threads); in taskqueue_drain_tq_active()
464 taskqueue_block(struct taskqueue *queue) in taskqueue_block() argument
467 TQ_LOCK(queue); in taskqueue_block()
468 queue->tq_flags |= TQ_FLAGS_BLOCKED; in taskqueue_block()
469 TQ_UNLOCK(queue); in taskqueue_block()
473 taskqueue_unblock(struct taskqueue *queue) in taskqueue_unblock() argument
476 TQ_LOCK(queue); in taskqueue_unblock()
477 queue->tq_flags &= ~TQ_FLAGS_BLOCKED; in taskqueue_unblock()
478 if (!STAILQ_EMPTY(&queue->tq_queue)) in taskqueue_unblock()
479 queue->tq_enqueue(queue->tq_context); in taskqueue_unblock()
480 TQ_UNLOCK(queue); in taskqueue_unblock()
484 taskqueue_run_locked(struct taskqueue *queue) in taskqueue_run_locked() argument
492 KASSERT(queue != NULL, ("tq is NULL")); in taskqueue_run_locked()
493 TQ_ASSERT_LOCKED(queue); in taskqueue_run_locked()
495 LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link); in taskqueue_run_locked()
498 while ((task = STAILQ_FIRST(&queue->tq_queue)) != NULL) { in taskqueue_run_locked()
499 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); in taskqueue_run_locked()
500 if (queue->tq_hint == task) in taskqueue_run_locked()
501 queue->tq_hint = NULL; in taskqueue_run_locked()
505 tb.tb_seq = ++queue->tq_seq; in taskqueue_run_locked()
507 TQ_UNLOCK(queue); in taskqueue_run_locked()
519 TQ_LOCK(queue); in taskqueue_run_locked()
528 taskqueue_run(struct taskqueue *queue) in taskqueue_run() argument
531 TQ_LOCK(queue); in taskqueue_run()
532 taskqueue_run_locked(queue); in taskqueue_run()
533 TQ_UNLOCK(queue); in taskqueue_run()
542 taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task) in taskqueue_poll_is_busy() argument
546 TQ_LOCK(queue); in taskqueue_poll_is_busy()
547 retval = task->ta_pending > 0 || task_get_busy(queue, task) != NULL; in taskqueue_poll_is_busy()
548 TQ_UNLOCK(queue); in taskqueue_poll_is_busy()
554 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task, in taskqueue_cancel_locked() argument
561 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link); in taskqueue_cancel_locked()
562 if (queue->tq_hint == task) in taskqueue_cancel_locked()
563 queue->tq_hint = NULL; in taskqueue_cancel_locked()
568 tb = task_get_busy(queue, task); in taskqueue_cancel_locked()
578 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp) in taskqueue_cancel() argument
582 TQ_LOCK(queue); in taskqueue_cancel()
583 error = taskqueue_cancel_locked(queue, task, pendp); in taskqueue_cancel()
584 TQ_UNLOCK(queue); in taskqueue_cancel()
590 taskqueue_cancel_timeout(struct taskqueue *queue, in taskqueue_cancel_timeout() argument
596 TQ_LOCK(queue); in taskqueue_cancel_timeout()
598 error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1); in taskqueue_cancel_timeout()
601 queue->tq_callouts--; in taskqueue_cancel_timeout()
603 TQ_UNLOCK(queue); in taskqueue_cancel_timeout()
611 taskqueue_drain(struct taskqueue *queue, struct task *task) in taskqueue_drain() argument
614 if (!queue->tq_spin) in taskqueue_drain()
617 TQ_LOCK(queue); in taskqueue_drain()
618 while (task->ta_pending != 0 || task_get_busy(queue, task) != NULL) in taskqueue_drain()
619 TQ_SLEEP(queue, task, "tq_drain"); in taskqueue_drain()
620 TQ_UNLOCK(queue); in taskqueue_drain()
624 taskqueue_drain_all(struct taskqueue *queue) in taskqueue_drain_all() argument
627 if (!queue->tq_spin) in taskqueue_drain_all()
630 TQ_LOCK(queue); in taskqueue_drain_all()
631 (void)taskqueue_drain_tq_queue(queue); in taskqueue_drain_all()
632 (void)taskqueue_drain_tq_active(queue); in taskqueue_drain_all()
633 TQ_UNLOCK(queue); in taskqueue_drain_all()
637 taskqueue_drain_timeout(struct taskqueue *queue, in taskqueue_drain_timeout() argument
644 TQ_LOCK(queue); in taskqueue_drain_timeout()
648 TQ_UNLOCK(queue); in taskqueue_drain_timeout()
651 taskqueue_drain(queue, &timeout_task->t); in taskqueue_drain_timeout()
656 TQ_LOCK(queue); in taskqueue_drain_timeout()
658 TQ_UNLOCK(queue); in taskqueue_drain_timeout()
662 taskqueue_quiesce(struct taskqueue *queue) in taskqueue_quiesce() argument
666 TQ_LOCK(queue); in taskqueue_quiesce()
668 ret = taskqueue_drain_tq_queue(queue); in taskqueue_quiesce()
670 ret = taskqueue_drain_tq_active(queue); in taskqueue_quiesce()
672 TQ_UNLOCK(queue); in taskqueue_quiesce()
867 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
903 taskqueue_member(struct taskqueue *queue, struct thread *td) in taskqueue_member() argument
908 if (queue->tq_threads[i] == NULL) in taskqueue_member()
910 if (queue->tq_threads[i] == td) { in taskqueue_member()
914 if (++j >= queue->tq_tcount) in taskqueue_member()