Home
Name Modified Size Downloads / Week Status
Totals: 10 Items   66.1 kB 10
Linux_Port 2010-09-04
runEnv 2010-09-04 111 Bytes 11 weekly downloads
taskq_utils.c 2010-09-04 20.6 kB 22 weekly downloads
taskq.h 2010-09-04 7.0 kB 11 weekly downloads
taskq.c 2010-09-04 8.2 kB 11 weekly downloads
README.txt 2010-09-04 21.0 kB 11 weekly downloads
mcp.c 2010-09-04 4.1 kB 11 weekly downloads
makefile 2010-09-04 2.0 kB 11 weekly downloads
libtaskq_defs.h 2010-09-04 1.7 kB 11 weekly downloads
libtaskq.h 2010-09-04 1.4 kB 11 weekly downloads
The libtaskq library intends to provide a userland implementation of the Task Queue facility from the OpenSolaris kernel. Task Queues are used extensively in the OpenSolaris kernel and provides a flexible framework for thread management with a simple interface. It is somewhat similar to Dispatch Queues from Apple's Grand Central Dispatch, but is more flexible. For now the full description of the Task Queue framework is reproduced from the OpenSolaris kernel documentation below. Note that the current userland implementation does not cover every feature described below and some of the semantics in the description below may be relevant only in a kernel context. This description will be updated in due course. ============================================================================ /* * Kernel task queues: general-purpose asynchronous task scheduling. * * A common problem in kernel programming is the need to schedule tasks * to be performed later, by another thread. There are several reasons * you may want or need to do this: * * (1) The task isn't time-critical, but your current code path is. * * (2) The task may require grabbing locks that you already hold. * * (3) The task may need to block (e.g. to wait for memory), but you * cannot block in your current context. * * (4) Your code path can't complete because of some condition, but you can't * sleep or fail, so you queue the task for later execution when condition * disappears. * * (5) You just want a simple way to launch multiple tasks in parallel. * * Task queues provide such a facility. In its simplest form (used when * performance is not a critical consideration) a task queue consists of a * single list of tasks, together with one or more threads to service the * list. There are some cases when this simple queue is not sufficient: * * (1) The task queues are very hot and there is a need to avoid data and lock * contention over global resources. * * (2) Some tasks may depend on other tasks to complete, so they can't be put in * the same list managed by the same thread. * * (3) Some tasks may block for a long time, and this should not block other * tasks in the queue. * * To provide useful service in such cases we define a "dynamic task queue" * which has an individual thread for each of the tasks. These threads are * dynamically created as they are needed and destroyed when they are not in * use. The API for managing task pools is the same as for managing task queues * with the exception of a taskq creation flag TASKQ_DYNAMIC which tells that * dynamic task pool behavior is desired. * * Dynamic task queues may also place tasks in the normal queue (called "backing * queue") when task pool runs out of resources. Users of task queues may * disallow such queued scheduling by specifying TQ_NOQUEUE in the dispatch * flags. * * The backing task queue is also used for scheduling internal tasks needed for * dynamic task queue maintenance. * * INTERFACES ================================================================== * * taskq_t *taskq_create(name, nthreads, pri, minalloc, maxall, flags); * * Create a taskq with specified properties. * Possible 'flags': * * TASKQ_DYNAMIC: Create task pool for task management. If this flag is * specified, 'nthreads' specifies the maximum number of threads in * the task queue. Task execution order for dynamic task queues is * not predictable. * * If this flag is not specified (default case) a * single-list task queue is created with 'nthreads' threads * servicing it. Entries in this queue are managed by * taskq_ent_alloc() and taskq_ent_free() which try to keep the * task population between 'minalloc' and 'maxalloc', but the * latter limit is only advisory for TQ_SLEEP dispatches and the * former limit is only advisory for TQ_NOALLOC dispatches. If * TASKQ_PREPOPULATE is set in 'flags', the taskq will be * prepopulated with 'minalloc' task structures. * * Since non-DYNAMIC taskqs are queues, tasks are guaranteed to be * executed in the order they are scheduled if nthreads == 1. * If nthreads > 1, task execution order is not predictable. * * TASKQ_PREPOPULATE: Prepopulate task queue with threads. * Also prepopulate the task queue with 'minalloc' task structures. * * TASKQ_THREADS_CPU_PCT: This flag specifies that 'nthreads' should be * interpreted as a percentage of the # of online CPUs on the * system. The taskq subsystem will automatically adjust the * number of threads in the taskq in response to CPU online * and offline events, to keep the ratio. nthreads must be in * the range [0,100]. * * The calculation used is: * * MAX((ncpus_online * percentage)/100, 1) * * This flag is not supported for DYNAMIC task queues. * This flag is not compatible with TASKQ_CPR_SAFE. * * TASKQ_CPR_SAFE: This flag specifies that users of the task queue will * use their own protocol for handling CPR issues. This flag is not * supported for DYNAMIC task queues. This flag is not compatible * with TASKQ_THREADS_CPU_PCT. * * The 'pri' field specifies the default priority for the threads that * service all scheduled tasks. * * taskq_t *taskq_create_instance(name, instance, nthreads, pri, minalloc, * maxall, flags); * * Like taskq_create(), but takes an instance number (or -1 to indicate * no instance). * * taskq_t *taskq_create_proc(name, nthreads, pri, minalloc, maxall, proc, * flags); * * Like taskq_create(), but creates the taskq threads in the specified * system process. If proc != &p0, this must be called from a thread * in that process. * * taskq_t *taskq_create_sysdc(name, nthreads, minalloc, maxall, proc, * dc, flags); * * Like taskq_create_proc(), but the taskq threads will use the * System Duty Cycle (SDC) scheduling class with a duty cycle of dc. * * void taskq_destroy(tap): * * Waits for any scheduled tasks to complete, then destroys the taskq. * Caller should guarantee that no new tasks are scheduled in the closing * taskq. * * taskqid_t taskq_dispatch(tq, func, arg, flags): * * Dispatches the task "func(arg)" to taskq. The 'flags' indicates whether * the caller is willing to block for memory. The function returns an * opaque value which is zero iff dispatch fails. If flags is TQ_NOSLEEP * or TQ_NOALLOC and the task can't be dispatched, taskq_dispatch() fails * and returns (taskqid_t)0. * * ASSUMES: func != NULL. * * Possible flags: * TQ_NOSLEEP: Do not wait for resources; may fail. * * TQ_NOALLOC: Do not allocate memory; may fail. May only be used with * non-dynamic task queues. * * TQ_NOQUEUE: Do not enqueue a task if it can't dispatch it due to * lack of available resources and fail. If this flag is not * set, and the task pool is exhausted, the task may be scheduled * in the backing queue. This flag may ONLY be used with dynamic * task queues. * * NOTE: This flag should always be used when a task queue is used * for tasks that may depend on each other for completion. * Enqueueing dependent tasks may create deadlocks. * * TQ_SLEEP: May block waiting for resources. May still fail for * dynamic task queues if TQ_NOQUEUE is also specified, otherwise * always succeed. * * TQ_FRONT: Puts the new task at the front of the queue. Be careful. * * NOTE: Dynamic task queues are much more likely to fail in * taskq_dispatch() (especially if TQ_NOQUEUE was specified), so it * is important to have backup strategies handling such failures. * * void taskq_wait(tq): * * Waits for all previously scheduled tasks to complete. * * NOTE: It does not stop any new task dispatches. * Do NOT call taskq_wait() from a task: it will cause deadlock. * * void taskq_suspend(tq) * * Suspend all task execution. Tasks already scheduled for a dynamic task * queue will still be executed, but all new scheduled tasks will be * suspended until taskq_resume() is called. * * int taskq_suspended(tq) * * Returns 1 if taskq is suspended and 0 otherwise. It is intended to * ASSERT that the task queue is suspended. * * void taskq_resume(tq) * * Resume task queue execution. * * int taskq_member(tq, thread) * * Returns 1 if 'thread' belongs to taskq 'tq' and 0 otherwise. The * intended use is to ASSERT that a given function is called in taskq * context only. * * system_taskq * * Global system-wide dynamic task queue for common uses. It may be used by * any subsystem that needs to schedule tasks and does not need to manage * its own task queues. It is initialized quite early during system boot. * * IMPLEMENTATION ============================================================== * * This is schematic representation of the task queue structures. * * taskq: * +-------------+ * | tq_lock | +---< taskq_ent_free() * +-------------+ | * |... | | tqent: tqent: * +-------------+ | +------------+ +------------+ * | tq_freelist |-->| tqent_next |--> ... ->| tqent_next | * +-------------+ +------------+ +------------+ * |... | | ... | | ... | * +-------------+ +------------+ +------------+ * | tq_task | | * | | +-------------->taskq_ent_alloc() * +--------------------------------------------------------------------------+ * | | | tqent tqent | * | +---------------------+ +--> +------------+ +--> +------------+ | * | | ... | | | func, arg | | | func, arg | | * +>+---------------------+ <---|-+ +------------+ <---|-+ +------------+ | * | tq_taskq.tqent_next | ----+ | | tqent_next | --->+ | | tqent_next |--+ * +---------------------+ | +------------+ ^ | +------------+ * +-| tq_task.tqent_prev | +--| tqent_prev | | +--| tqent_prev | ^ * | +---------------------+ +------------+ | +------------+ | * | |... | | ... | | | ... | | * | +---------------------+ +------------+ | +------------+ | * | ^ | | * | | | | * +--------------------------------------+--------------+ TQ_APPEND() -+ * | | | * |... | taskq_thread()-----+ * +-------------+ * | tq_buckets |--+-------> [ NULL ] (for regular task queues) * +-------------+ | * | DYNAMIC TASK QUEUES: * | * +-> taskq_bucket[nCPU] taskq_bucket_dispatch() * +-------------------+ ^ * +--->| tqbucket_lock | | * | +-------------------+ +--------+ +--------+ * | | tqbucket_freelist |-->| tqent |-->...| tqent | ^ * | +-------------------+<--+--------+<--...+--------+ | * | | ... | | thread | | thread | | * | +-------------------+ +--------+ +--------+ | * | +-------------------+ | * taskq_dispatch()--+--->| tqbucket_lock | TQ_APPEND()------+ * TQ_HASH() | +-------------------+ +--------+ +--------+ * | | tqbucket_freelist |-->| tqent |-->...| tqent | * | +-------------------+<--+--------+<--...+--------+ * | | ... | | thread | | thread | * | +-------------------+ +--------+ +--------+ * +---> ... * * * Task queues use tq_task field to link new entry in the queue. The queue is a * circular doubly-linked list. Entries are put in the end of the list with * TQ_APPEND() and processed from the front of the list by taskq_thread() in * FIFO order. Task queue entries are cached in the free list managed by * taskq_ent_alloc() and taskq_ent_free() functions. * * All threads used by task queues mark t_taskq field of the thread to * point to the task queue. * * Taskq Thread Management ----------------------------------------------------- * * Taskq's non-dynamic threads are managed with several variables and flags: * * * tq_nthreads - The number of threads in taskq_thread() for the * taskq. * * * tq_active - The number of threads not waiting on a CV in * taskq_thread(); includes newly created threads * not yet counted in tq_nthreads. * * * tq_nthreads_target * - The number of threads desired for the taskq. * * * tq_flags & TASKQ_CHANGING * - Indicates that tq_nthreads != tq_nthreads_target. * * * tq_flags & TASKQ_THREAD_CREATED * - Indicates that a thread is being created in the taskq. * * During creation, tq_nthreads and tq_active are set to 0, and * tq_nthreads_target is set to the number of threads desired. The * TASKQ_CHANGING flag is set, and taskq_thread_create() is called to * create the first thread. taskq_thread_create() increments tq_active, * sets TASKQ_THREAD_CREATED, and creates the new thread. * * Each thread starts in taskq_thread(), clears the TASKQ_THREAD_CREATED * flag, and increments tq_nthreads. It stores the new value of * tq_nthreads as its "thread_id", and stores its thread pointer in the * tq_threadlist at the (thread_id - 1). We keep the thread_id space * densely packed by requiring that only the largest thread_id can exit during * normal adjustment. The exception is during the destruction of the * taskq; once tq_nthreads_target is set to zero, no new threads will be created * for the taskq queue, so every thread can exit without any ordering being * necessary. * * Threads will only process work if their thread id is <= tq_nthreads_target. * * When TASKQ_CHANGING is set, threads will check the current thread target * whenever they wake up, and do whatever they can to apply its effects. * * TASKQ_THREAD_CPU_PCT -------------------------------------------------------- * * When a taskq is created with TASKQ_THREAD_CPU_PCT, we store their requested * percentage in tq_threads_ncpus_pct, start them off with the correct thread * target, and add them to the taskq_cpupct_list for later adjustment. * * We register taskq_cpu_setup() to be called whenever a CPU changes state. It * walks the list of TASKQ_THREAD_CPU_PCT taskqs, adjusts their nthread_target * if need be, and wakes up all of the threads to process the change. * * Dynamic Task Queues Implementation ------------------------------------------ * * For a dynamic task queues there is a 1-to-1 mapping between a thread and * taskq_ent_structure. Each entry is serviced by its own thread and each thread * is controlled by a single entry. * * Entries are distributed over a set of buckets. To avoid using modulo * arithmetics the number of buckets is 2^n and is determined as the nearest * power of two roundown of the number of CPUs in the system. Tunable * variable 'taskq_maxbuckets' limits the maximum number of buckets. Each entry * is attached to a bucket for its lifetime and can't migrate to other buckets. * * Entries that have scheduled tasks are not placed in any list. The dispatch * function sets their "func" and "arg" fields and signals the corresponding * thread to execute the task. Once the thread executes the task it clears the * "func" field and places an entry on the bucket cache of free entries pointed * by "tqbucket_freelist" field. ALL entries on the free list should have "func" * field equal to NULL. The free list is a circular doubly-linked list identical * in structure to the tq_task list above, but entries are taken from it in LIFO * order - the last freed entry is the first to be allocated. The * taskq_bucket_dispatch() function gets the most recently used entry from the * free list, sets its "func" and "arg" fields and signals a worker thread. * * After executing each task a per-entry thread taskq_d_thread() places its * entry on the bucket free list and goes to a timed sleep. If it wakes up * without getting new task it removes the entry from the free list and destroys * itself. The thread sleep time is controlled by a tunable variable * `taskq_thread_timeout'. * * There are various statistics kept in the bucket which allows for later * analysis of taskq usage patterns. Also, a global copy of taskq creation and * death statistics is kept in the global taskq data structure. Since thread * creation and death happen rarely, updating such global data does not present * a performance problem. * * NOTE: Threads are not bound to any CPU and there is absolutely no association * between the bucket and actual thread CPU, so buckets are used only to * split resources and reduce resource contention. Having threads attached * to the CPU denoted by a bucket may reduce number of times the job * switches between CPUs. * * Current algorithm creates a thread whenever a bucket has no free * entries. It would be nice to know how many threads are in the running * state and don't create threads if all CPUs are busy with existing * tasks, but it is unclear how such strategy can be implemented. * * Currently buckets are created statically as an array attached to task * queue. On some system with nCPUs < max_ncpus it may waste system * memory. One solution may be allocation of buckets when they are first * touched, but it is not clear how useful it is. * * SUSPEND/RESUME implementation ----------------------------------------------- * * Before executing a task taskq_thread() (executing non-dynamic task * queues) obtains taskq's thread lock as a reader. The taskq_suspend() * function gets the same lock as a writer blocking all non-dynamic task * execution. The taskq_resume() function releases the lock allowing * taskq_thread to continue execution. * * For dynamic task queues, each bucket is marked as TQBUCKET_SUSPEND by * taskq_suspend() function. After that taskq_bucket_dispatch() always * fails, so that taskq_dispatch() will either enqueue tasks for a * suspended backing queue or fail if TQ_NOQUEUE is specified in dispatch * flags. * * NOTE: taskq_suspend() does not immediately block any tasks already * scheduled for dynamic task queues. It only suspends new tasks * scheduled after taskq_suspend() was called. * * taskq_member() function works by comparing a thread t_taskq pointer with * the passed thread pointer. * * LOCKS and LOCK Hierarchy ---------------------------------------------------- * * There are three locks used in task queues: * * 1) The taskq_t's tq_lock, protecting global task queue state. * * 2) Each per-CPU bucket has a lock for bucket management. * * 3) The global taskq_cpupct_lock, which protects the list of * TASKQ_THREADS_CPU_PCT taskqs. * * If both (1) and (2) are needed, tq_lock should be taken *after* the bucket * lock. * * If both (1) and (3) are needed, tq_lock should be taken *after* * taskq_cpupct_lock. * * DEBUG FACILITIES ------------------------------------------------------------ * * For DEBUG kernels it is possible to induce random failures to * taskq_dispatch() function when it is given TQ_NOSLEEP argument. The value of * taskq_dmtbf and taskq_smtbf tunables control the mean time between induced * failures for dynamic and static task queues respectively. * * Setting TASKQ_STATISTIC to 0 will disable per-bucket statistics. * * TUNABLES -------------------------------------------------------------------- * * system_taskq_size - Size of the global system_taskq. * This value is multiplied by nCPUs to determine * actual size. * Default value: 64 * * taskq_minimum_nthreads_max * - Minimum size of the thread list for a taskq. * Useful for testing different thread pool * sizes by overwriting tq_nthreads_target. * * taskq_thread_timeout - Maximum idle time for taskq_d_thread() * Default value: 5 minutes * * taskq_maxbuckets - Maximum number of buckets in any task queue * Default value: 128 * * taskq_search_depth - Maximum # of buckets searched for a free entry * Default value: 4 * * taskq_dmtbf - Mean time between induced dispatch failures * for dynamic task queues. * Default value: UINT_MAX (no induced failures) * * taskq_smtbf - Mean time between induced dispatch failures * for static task queues. * Default value: UINT_MAX (no induced failures) * * CONDITIONAL compilation ----------------------------------------------------- * * TASKQ_STATISTIC - If set will enable bucket statistic (default). * */
Source: README.txt, updated 2010-09-04