Lines Matching full:bucket
379 * One additional bucket is used as the "idle bucket" (details below).
381 * Each bucket also has a "backlog" list, used to store pending jobs,
384 * threads become available in each bucket, they begin work on the backlog
388 * Each bucket usually has some worker threads ready to accept new work,
390 * that array of buckets there is one more bucket called the "idle bucket",
392 * bucket when that bucket needs another worker thread. When a dispatch
394 * bucket free list nor in the idle bucket, it will attempt to create a
395 * new thread in the hashed bucket (see taskq_bucket_extend).
397 * Dispatch first tries a bucket chosen by hash, then the idle bucket.
399 * bucket chosen by hash, and makes a dispatch attempt on that bucket.
401 * an entry is placed on a per-bucket backlog queue. The backlog is
402 * serviced as soon as other bucket threads become available.
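A minimal sketch of that fallback order, built from the dispatch helpers
that appear later in this listing (locking, TQ_* flag handling, and
statistics omitted; dispatch_sketch itself is a hypothetical wrapper):

	/* Hypothetical sketch of the dispatch fallback order above. */
	static taskq_ent_t *
	dispatch_sketch(taskq_bucket_t *b, task_func_t func, void *arg,
	    uint_t flags)
	{
		taskq_ent_t *tqe;

		/* 1. Try a ready thread in the hashed bucket. */
		if ((tqe = taskq_bucket_dispatch(b, func, arg)) != NULL)
			return (tqe);

		/* 2. Try to migrate an idle thread into this bucket. */
		if ((tqe = taskq_idlebucket_dispatch(b, func, arg)) != NULL)
			return (tqe);

		/* 3. Queue on the per-bucket backlog for later service. */
		return (taskq_backlog_dispatch(b, func, arg, flags));
	}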
405 * free list for the bucket in which they were dispatched, and if no new
407 * wait, the thread takes itself off that bucket free list and moves to
408 * the "idle bucket", where it waits longer (taskq_thread_timeout), before
412 * and stay there while there's work for that bucket. After a thread
413 * waits in a bucket for a short time (taskq_d_svc_tmo) without having
414 * any task assigned, it migrates to the idle bucket. An exception
416 * the idle bucket.
421 * field and places an entry on the per-bucket "tqbucket_freelist" which is
422 * used as a short-term cache of threads available for that bucket. All
426 * that threads seeing no work for a while can move to the idle bucket.
430 * thread. Dispatch first tries a bucket selected via hash, then the idle
431 * bucket. If both of those fail (and depending on options) an attempt to
432 * add threads to the bucket is made.
435 * entry on the bucket free list and goes to a (short) timed sleep. If it
437 * free list and "migrates" to the "idle bucket" for a longer wait.
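Both timed sleeps have the same shape; a sketch of one such wait, assuming
the tqbucket_cv/tqbucket_lock fields shown elsewhere in this listing (tick
conversion and CPR bookkeeping elided; idle_wait_sketch is illustrative):

	/*
	 * One timed wait.  The short wait uses taskq_d_svc_tmo on the
	 * hashed bucket; the longer wait uses taskq_thread_timeout on
	 * the idle bucket.
	 */
	static void
	idle_wait_sketch(taskq_bucket_t *b, clock_t tmo_ticks)
	{
		mutex_enter(&b->tqbucket_lock);
		/* cv_reltimedwait() returns -1 on timeout, >0 if signalled. */
		(void) cv_reltimedwait(&b->tqbucket_cv, &b->tqbucket_lock,
		    tmo_ticks, TR_CLOCK_TICK);
		mutex_exit(&b->tqbucket_lock);
	}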
440 * A thread may be dispatched work from the idle bucket (e.g. when dispatch
442 * dispatched from the idle bucket, it moves to the bucket that the hash
448 * calls, used to request thread creation for some bucket after a dispatch
449 * call fails to find a ready thread in some bucket. There is only one
453 * There are various statistics kept in the bucket which allow for later
460 * between the bucket and actual thread CPU, so buckets are used only to
462 * to the CPU denoted by a bucket may reduce the number of times the job
465 * The current algorithm creates a thread whenever a bucket has no free
483 * For dynamic task queues, each bucket is marked as TQBUCKET_SUSPEND by
500 * 1a) The idle bucket lock for bucket management.
501 * 1b) The hashed bucket locks for bucket management.
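A sketch of the nesting that 1a/1b ordering implies when an entry moves
between buckets, as taskq_idlebucket_dispatch() does (illustrative only;
lock_order_sketch is not part of the source):

	static void
	lock_order_sketch(taskq_bucket_t *idleb, taskq_bucket_t *b)
	{
		mutex_enter(&idleb->tqbucket_lock);	/* 1a: idle bucket */
		mutex_enter(&b->tqbucket_lock);		/* 1b: hashed bucket */
		/* ... move a TQE from idleb's free list into b ... */
		mutex_exit(&b->tqbucket_lock);
		mutex_exit(&idleb->tqbucket_lock);
	}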
518 * Setting TASKQ_STATISTIC to 0 will disable per-bucket statistics.
548 * TASKQ_STATISTIC - If set, enables bucket statistics (default).
613 * Dynamic taskq threads stay in an empty bucket for only a
614 * relatively short time before moving to the "idle bucket".
625 * Must be a power of two. Dynamic should have more than one bucket.
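The power-of-two requirement lets bucket selection use a mask rather than
a modulo, as the dispatch code below does with h & (bsize - 1); a
hypothetical helper to illustrate:

	static uint_t
	bucket_index_sketch(uint_t h, uint_t bsize)
	{
		ASSERT((bsize & (bsize - 1)) == 0);	/* power of two */
		return (h & (bsize - 1));	/* same result as h % bsize */
	}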
717 kstat_named_t tqd_ihits; /* idle bucket hits */
718 kstat_named_t tqd_imisses; /* idle bucket misses */
760 * Collect per-bucket statistics when TASKQ_STATISTIC is defined.
1190 * Dispatch a task "func(arg)" to a free entry of bucket b.
1192 * Assumes: no bucket locks are held.
1195 * NULL if there are no free entries or if the bucket is suspended.
1242 * Dispatch a task "func(arg)" using a free entry from the "idle" bucket.
1244 * bucket to the bucket passed (b).
1246 * Assumes: no bucket locks are held.
1249 * NULL if there are no free entries or if the bucket is suspended.
1272 * Need the mutex on both the idle bucket (idleb) and the bucket (b) in taskq_idlebucket_dispatch()
1281 * Get an entry from the idle bucket freelist if there is one. in taskq_idlebucket_dispatch()
1298 * Note: move TQE to new bucket here! in taskq_idlebucket_dispatch()
1304 * Track the "alloc" on the bucket moved to, in taskq_idlebucket_dispatch()
1317 /* Count this as a "hit" on the idle bucket. */ in taskq_idlebucket_dispatch()
1330 * Enqueue a taskq job on the per-bucket backlog.
1333 taskq_backlog_dispatch(taskq_bucket_t *bucket, task_func_t func, void *arg, in taskq_backlog_dispatch() argument
1346 mutex_enter(&bucket->tqbucket_lock); in taskq_backlog_dispatch()
1347 taskq_backlog_enqueue(bucket, tqe, flags); in taskq_backlog_dispatch()
1348 mutex_exit(&bucket->tqbucket_lock); in taskq_backlog_dispatch()
1354 taskq_backlog_enqueue(taskq_bucket_t *bucket, taskq_ent_t *tqe, int flags) in taskq_backlog_enqueue() argument
1357 ASSERT(MUTEX_HELD(&bucket->tqbucket_lock)); in taskq_backlog_enqueue()
1359 tqe->tqent_un.tqent_bucket = bucket; in taskq_backlog_enqueue()
1361 TQ_PREPEND(bucket->tqbucket_backlog, tqe); in taskq_backlog_enqueue()
1363 TQ_APPEND(bucket->tqbucket_backlog, tqe); in taskq_backlog_enqueue()
1365 bucket->tqbucket_nbacklog++; in taskq_backlog_enqueue()
1369 taskq_bucket_t *, bucket, in taskq_backlog_enqueue()
1371 TQ_STAT(bucket, tqs_overflow); in taskq_backlog_enqueue()
1373 if (bucket->tqbucket_stat.tqs_maxbacklog < in taskq_backlog_enqueue()
1374 bucket->tqbucket_nbacklog) { in taskq_backlog_enqueue()
1375 bucket->tqbucket_stat.tqs_maxbacklog = in taskq_backlog_enqueue()
1376 bucket->tqbucket_nbacklog; in taskq_backlog_enqueue()
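The TQ_PREPEND/TQ_APPEND pair above is ordinary insertion into a circular
doubly-linked list with a sentinel head; a generic sketch using the
tqent_next/tqent_prev links seen elsewhere in this listing (the real
macros may differ in detail):

	static void
	list_insert_sketch(taskq_ent_t *head, taskq_ent_t *tqe,
	    boolean_t front)
	{
		/* Prepend inserts after the head; append after the tail. */
		taskq_ent_t *after = front ? head : head->tqent_prev;

		tqe->tqent_next = after->tqent_next;
		tqe->tqent_prev = after;
		after->tqent_next->tqent_prev = tqe;
		after->tqent_next = tqe;
	}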
1382 * not found any idle TQE. The bucket lock is dropped in taskq_backlog_enqueue()
1388 if (bucket->tqbucket_nfree != 0) { in taskq_backlog_enqueue()
1390 itqe = bucket->tqbucket_freelist.tqent_prev; in taskq_backlog_enqueue()
1408 taskq_bucket_t *bucket = NULL; /* Which bucket needs extension */ in taskq_dispatch() local
1457 * In a single-CPU case there is only one bucket, so get in taskq_dispatch()
1463 bucket = tq->tq_buckets; in taskq_dispatch()
1467 bucket = &tq->tq_buckets[h & (bsize - 1)]; in taskq_dispatch()
1468 ASSERT(bucket->tqbucket_taskq == tq); /* Sanity check */ in taskq_dispatch()
1471 * Do a quick check before grabbing the lock. If the bucket does in taskq_dispatch()
1475 if (bucket->tqbucket_nfree != 0) { in taskq_dispatch()
1476 tqe = taskq_bucket_dispatch(bucket, func, arg); in taskq_dispatch()
1480 TQ_STAT(bucket, tqs_misses); in taskq_dispatch()
1485 * Try the "idle" bucket, which if successful, will in taskq_dispatch()
1486 * migrate an idle thread into this bucket. in taskq_dispatch()
1488 tqe = taskq_idlebucket_dispatch(bucket, func, arg); in taskq_dispatch()
1499 * For KM_SLEEP dispatches, try to extend the bucket and retry dispatch. in taskq_dispatch()
1502 * fine - we deal with it later. If the bucket was successfully in taskq_dispatch()
1510 while (taskq_bucket_extend(bucket) != NULL) { in taskq_dispatch()
1511 TQ_STAT(bucket, tqs_disptcreates); in taskq_dispatch()
1512 tqe = taskq_bucket_dispatch(bucket, func, arg); in taskq_dispatch()
1522 * Use the per-bucket backlog queue unless TQ_NOQUEUE was requested. in taskq_dispatch()
1524 * task to try to extend (add a thread to) this bucket. in taskq_dispatch()
1527 tqe = taskq_backlog_dispatch(bucket, func, arg, flags); in taskq_dispatch()
1531 * Since there are not enough free entries in the bucket, add a in taskq_dispatch()
1539 if (!taskq_ent_exists(tq, taskq_bucket_overflow, bucket)) { in taskq_dispatch()
1542 TQ_ENQUEUE(tq, tqe1, taskq_bucket_overflow, bucket); in taskq_dispatch()
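Going by the description near line 2936, the taskq_bucket_overflow task
enqueued here tries to extend the bucket and falls back to redirecting a
thread from another bucket; a hypothetical body:

	static void
	taskq_bucket_overflow_sketch(void *arg)
	{
		taskq_bucket_t *b = arg;

		/* Add a thread; if that fails, pull one from a donor. */
		if (taskq_bucket_extend(b) == NULL)
			taskq_bucket_redist(b);
	}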
1952 * When a thread becomes idle in one bucket and goes in search of another
1953 * bucket to service, it's not on any free list. For consistency with the
1967 * When taskq_bucket_redist needs to direct a thread from one bucket
1968 * to another, this function is dispatched into the bucket that will
1969 * donate the thread, with the arg pointing to the bucket that will
1980 * Helper for taskq_d_thread() -- service a bucket
1984 taskq_bucket_t *bucket, taskq_t *tq) in taskq_d_svc_bucket() argument
1986 kmutex_t *lock = &bucket->tqbucket_lock; in taskq_d_svc_bucket()
2000 /* We started on the bucket free list. */ in taskq_d_svc_bucket()
2002 ASSERT(bucket->tqbucket_nfree > 0); in taskq_d_svc_bucket()
2008 if (bucket->tqbucket_nbacklog > 0) { in taskq_d_svc_bucket()
2010 bucket->tqbucket_nfree--; in taskq_d_svc_bucket()
2012 bucket->tqbucket_nalloc++; in taskq_d_svc_bucket()
2027 bucket->tqbucket_flags &= ~TQBUCKET_REDIRECT; in taskq_d_svc_bucket()
2032 * Migration to a new bucket (forced or voluntary). in taskq_d_svc_bucket()
2036 * this function after servicing another bucket. in taskq_d_svc_bucket()
2039 bucket->tqbucket_nalloc++; in taskq_d_svc_bucket()
2053 ASSERT(bucket->tqbucket_nalloc > 0); in taskq_d_svc_bucket()
2060 bucket->tqbucket_nalloc--; in taskq_d_svc_bucket()
2070 taskq_bucket_t *, bucket, taskq_ent_t *, tqe); in taskq_d_svc_bucket()
2073 taskq_bucket_t *, bucket, taskq_ent_t *, tqe); in taskq_d_svc_bucket()
2076 bucket->tqbucket_totaltime += end - start; in taskq_d_svc_bucket()
2085 if (bucket->tqbucket_nbacklog > 0) { in taskq_d_svc_bucket()
2093 ASSERT(bucket->tqbucket_nalloc > 0); in taskq_d_svc_bucket()
2095 ASSERT(!IS_EMPTY(bucket->tqbucket_backlog)); in taskq_d_svc_bucket()
2096 bltqe = bucket->tqbucket_backlog.tqent_next; in taskq_d_svc_bucket()
2098 bucket->tqbucket_nbacklog--; in taskq_d_svc_bucket()
2101 taskq_bucket_t *, bucket, in taskq_d_svc_bucket()
2120 * We've run out of work in this bucket. in taskq_d_svc_bucket()
2124 ASSERT(bucket->tqbucket_nalloc > 0); in taskq_d_svc_bucket()
2125 bucket->tqbucket_nalloc--; in taskq_d_svc_bucket()
2127 TQ_APPEND(bucket->tqbucket_freelist, tqe); in taskq_d_svc_bucket()
2128 bucket->tqbucket_nfree++; in taskq_d_svc_bucket()
2134 cv_signal(&bucket->tqbucket_cv); in taskq_d_svc_bucket()
2148 if ((bucket->tqbucket_flags & TQBUCKET_CLOSE) != 0) in taskq_d_svc_bucket()
2153 * Sleep only briefly here on the bucket. in taskq_d_svc_bucket()
2154 * If no work lands in the bucket, return and in taskq_d_svc_bucket()
2168 * We're done servicing this bucket. in taskq_d_svc_bucket()
2177 ASSERT(bucket->tqbucket_nfree > 0); in taskq_d_svc_bucket()
2189 if (bucket->tqbucket_nbacklog > 0) { in taskq_d_svc_bucket()
2191 bucket->tqbucket_nfree--; in taskq_d_svc_bucket()
2193 bucket->tqbucket_nalloc++; in taskq_d_svc_bucket()
2216 * Will migrate to another bucket. in taskq_d_svc_bucket()
2220 * and enter the mutex for the next bucket we serve. in taskq_d_svc_bucket()
2224 ASSERT(bucket->tqbucket_nfree > 0); in taskq_d_svc_bucket()
2225 bucket->tqbucket_nfree--; in taskq_d_svc_bucket()
2226 cv_signal(&bucket->tqbucket_cv); in taskq_d_svc_bucket()
2240 taskq_bucket_t *bucket = tqe->tqent_un.tqent_bucket; in taskq_d_thread() local
2241 taskq_t *tq = bucket->tqbucket_taskq; in taskq_d_thread()
2254 bucket = tqe->tqent_un.tqent_bucket; in taskq_d_thread()
2258 * we'll be on the idle bucket free list. in taskq_d_thread()
2261 if (bucket == idle_bucket) { in taskq_d_thread()
2274 * Service the bucket pointed to by the TQE. in taskq_d_thread()
2276 * We may or may not be on the bucket free list. in taskq_d_thread()
2279 bucket = tqe->tqent_un.tqent_bucket; in taskq_d_thread()
2280 VERIFY3P(bucket, >=, tq->tq_buckets); in taskq_d_thread()
2281 VERIFY3P(bucket, <, idle_bucket); in taskq_d_thread()
2283 /* Enters/exits bucket->tqbucket_lock */ in taskq_d_thread()
2284 taskq_d_svc_bucket(tqe, bucket, tq); in taskq_d_thread()
2287 * Finished servicing a bucket where we became idle. in taskq_d_thread()
2288 * Not on any free list. Migrate to another bucket. in taskq_d_thread()
2290 * bucket indicated by the arg. in taskq_d_thread()
2295 * Migrate to this bucket. in taskq_d_thread()
2306 * to that bucket. Search starting at the next bucket in taskq_d_thread()
2310 * Unlocked access is OK here. A bucket may be missed in taskq_d_thread()
2313 * visit a bucket needlessly, the visit will be short. in taskq_d_thread()
2321 VERIFY3P(bucket, >=, tq->tq_buckets); in taskq_d_thread()
2322 VERIFY3P(bucket, <, idle_bucket); in taskq_d_thread()
2324 b = bucket; in taskq_d_thread()
2326 /* Next bucket */ in taskq_d_thread()
2332 * Migrate to this bucket. in taskq_d_thread()
2340 } while (b != bucket); in taskq_d_thread()
2346 * Migrate to the idle bucket, put this TQE on in taskq_d_thread()
2347 * the free list for that bucket, then wait. in taskq_d_thread()
2392 * Migrate to the new bucket. in taskq_d_thread()
2421 * Note that tqent_bucket is the idle bucket in taskq_d_thread()
2424 * first bucket. This happens rarely. in taskq_d_thread()
2426 bucket = tq->tq_buckets; in taskq_d_thread()
2430 tqe->tqent_un.tqent_bucket = bucket; in taskq_d_thread()
2437 * taskq_ent_t from the idle bucket (idleb) to a in taskq_d_thread()
2438 * new bucket (newb). In detail, it has: in taskq_d_thread()
2459 * The bucket lock is dropped by CALLB_CPR_EXIT(). in taskq_d_thread()
2467 /* Note: Creates and deaths are on the idle bucket. */ in taskq_d_thread()
2687 * for the bucket used as the "idle bucket". in taskq_create_common()
2690 taskq_bucket_t *bucket = kmem_zalloc(sizeof (taskq_bucket_t) * in taskq_create_common() local
2692 taskq_bucket_t *idle_bucket = &bucket[bsize]; in taskq_create_common()
2695 tq->tq_buckets = bucket; in taskq_create_common()
2697 /* Initialize each bucket */ in taskq_create_common()
2698 for (b_id = 0; b_id < (bsize + 1); b_id++, bucket++) { in taskq_create_common()
2699 mutex_init(&bucket->tqbucket_lock, NULL, MUTEX_DEFAULT, in taskq_create_common()
2701 cv_init(&bucket->tqbucket_cv, NULL, CV_DEFAULT, NULL); in taskq_create_common()
2702 bucket->tqbucket_taskq = tq; in taskq_create_common()
2703 TQ_LIST_INIT(bucket->tqbucket_freelist); in taskq_create_common()
2704 TQ_LIST_INIT(bucket->tqbucket_backlog); in taskq_create_common()
2707 * Always create at least one idle bucket thread. in taskq_create_common()
2828 * Mark each bucket as closing and wakeup all sleeping threads. in taskq_destroy()
2830 * Include the idle bucket here. in taskq_destroy()
2852 * The idle bucket may have many threads. in taskq_destroy()
2869 /* idle bucket; just wake one. */ in taskq_destroy()
2936 * and if that fails, make sure the bucket has at least one thread,
2937 * redirecting a thread from another bucket if necessary.
2950 * Extend a bucket with a new entry on the free list and attach a worker
2956 * Argument: pointer to the bucket.
2968 /* How many threads currently in this bucket? */ in taskq_bucket_extend()
2976 * When there are no threads in this bucket, this call should in taskq_bucket_extend()
3018 * Once the entry is ready, link it to the bucket free list. in taskq_bucket_extend()
3027 * Account for creates in the idle bucket, because in taskq_bucket_extend()
3064 * new thread to another racing dispatch call. If this bucket has a
3066 * from another bucket (the donor bucket) into this one. A thread in
3067 * the donor bucket is redirected by dispatching the special function
3068 * taskq_d_redirect in the donor bucket. As soon as some thread in the
3069 * donor bucket completes, it will find taskq_d_redirect in the backlog
3070 * and move to the recipient bucket (the bucket arg here).
3073 taskq_bucket_redist(taskq_bucket_t *bucket) in taskq_bucket_redist() argument
3075 taskq_t *tq = bucket->tqbucket_taskq; in taskq_bucket_redist()
3077 taskq_bucket_t *db; /* donor bucket candidate */ in taskq_bucket_redist()
3081 VERIFY3P(bucket, >=, tq->tq_buckets); in taskq_bucket_redist()
3082 VERIFY3P(bucket, <, idle_bucket); in taskq_bucket_redist()
3085 * This makes no sense with a single bucket. in taskq_bucket_redist()
3095 mutex_enter(&bucket->tqbucket_lock); in taskq_bucket_redist()
3096 nthreads = bucket->tqbucket_nalloc + bucket->tqbucket_nfree; in taskq_bucket_redist()
3097 if (nthreads > 0 || bucket->tqbucket_nbacklog == 0 || in taskq_bucket_redist()
3098 (bucket->tqbucket_flags & TQBUCKET_REDIRECT) != 0) { in taskq_bucket_redist()
3099 mutex_exit(&bucket->tqbucket_lock); in taskq_bucket_redist()
3103 bucket->tqbucket_flags |= TQBUCKET_REDIRECT; in taskq_bucket_redist()
3104 mutex_exit(&bucket->tqbucket_lock); in taskq_bucket_redist()
3112 tqe->tqent_arg = bucket; /* redirected to */ in taskq_bucket_redist()
3115 * Find a "donor bucket" (db) that can afford to lose a thread. in taskq_bucket_redist()
3116 * Search starting at the next bucket after the passed in one. in taskq_bucket_redist()
3118 * because the recipient bucket has no threads. in taskq_bucket_redist()
3120 db = bucket; in taskq_bucket_redist()
3122 /* Next bucket */ in taskq_bucket_redist()
3125 if (db == bucket) in taskq_bucket_redist()
3138 * No bucket with more than an average number of threads. in taskq_bucket_redist()
3142 taskq_bucket_t *, bucket); in taskq_bucket_redist()
3145 mutex_enter(&bucket->tqbucket_lock); in taskq_bucket_redist()
3146 bucket->tqbucket_flags &= ~TQBUCKET_REDIRECT; in taskq_bucket_redist()
3147 mutex_exit(&bucket->tqbucket_lock); in taskq_bucket_redist()
3153 * was added to some bucket, wake an idle thread. in taskq_bucket_redist()
3158 itqe = bucket->tqbucket_freelist.tqent_prev; in taskq_bucket_redist()
3164 taskq_bucket_t *, bucket, taskq_ent_t *, tqe); in taskq_bucket_redist()
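The donor search above walks the bucket array circularly, starting just
past the recipient; a simplified sketch (the avg threshold is
illustrative; the real code derives it from taskq-wide thread counts):

	static taskq_bucket_t *
	find_donor_sketch(taskq_t *tq, taskq_bucket_t *b,
	    uint_t bsize, uint_t avg)
	{
		taskq_bucket_t *db = b;

		do {
			if (++db == &tq->tq_buckets[bsize])
				db = tq->tq_buckets;	/* wrap around */
			if (db == b)
				return (NULL);	/* no donor found */
		} while (db->tqbucket_nalloc + db->tqbucket_nfree <= avg);

		return (db);
	}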
3237 * For the idle bucket, update ihits, imisses in taskq_d_kstat_update()