Linux workqueue介绍

Linux中的workqueue机制就是为了简化内核线程的创建。通过调用workqueue的接口就能创建内核线程。并且可以根据当前系统的CPU的个数创建线程的数量,使得线程处理的事务能够并行化。

工作队列(workqueue)是另外一种将工作推后执行的形式。工作队列可以把工作推后,交由一个内核线程去执行,也就是说,这个下半部分可以在进程上下文执行。最重要的就是工作队列允许被重新调度甚至睡眠。

为什么需要工作队列?

在内核代码中,经常会遇到不能或不合适马上调用某个处理过程,此时希望将该工作推给某个内核线程执行,这样做的原因有很多,比如:

  • 中断触发了某个过程的执行条件,而该过程执行时间较长或者会调用导致睡眠的函数,则该过程不应该在中断上下文中立即被调用。
  • 类似于中断,一些紧急性的任务不希望执行比较耗时的非关键过程,则需要把该过程提交到低优先级线程执行。比如一个轮询的通信接收线程,它需要快速完成检测和接收数据,而对数据的解析则应该交给低优先级线程慢慢处理。
  • 有时希望将一些工作集中起来以获取批处理的性能;或者合并缩减一些执行线程,减少资源消耗。

基于以上需求,人们开发除了工作队列这一机制。工作队列不光在操作系统内核中会用到,一些应用程序或协议栈也会实现自己的工作队列。

工作队列的概念

工作队列(workqueue):是将操作(或回调)延期异步执行的一种机制。工作队列可以把工作推后,交由一个内核线程去执行,并且工作队列是执行在线程上下文中,因此工作队列执行过程中可以被重新调度、抢占、睡眠。

工作项(work item):是工作队列中的元素,是一个回调函数和多个回调函数参数的集合,有时也会有额外的属性成员,总之通过一个结构体即可记录和描述一个工作项。

关键数据结构

work_struct

struct work_struct {
	atomic_long_t data;
	struct list_head entry;
	work_func_t func;
#ifdef CONFIG_LOCKDEP
	struct lockdep_map lockdep_map;
#endif
	ANDROID_KABI_RESERVE(1);
	ANDROID_KABI_RESERVE(2);
};

 workqueue_struct

如何传参?

func的参数是一个work_struct指针,指向的数据就是定义func的work_struct

看到这里,会有两个疑问:

第一:如何把用户的数据作为参数传递给func呢?

第二:如何实现延迟工作?

解决第一个问题:工作队列需要把work_struct定义在用户的数据结构中,然后通过container_of来得到用户数据。

对于第二个问题,新的工作队列把timer拿掉的用意是使得work_struct更加单纯。首先回忆一下以前的版本,只有在需要延迟执行工作时才会用到timer,普通情况下timer是没有意义的,所以之前的做法在一定程序上有些浪费资源。所以新版本中,将timer从work_struct中拿掉,然后又定义了一个新的结构delayed_work用于延迟执行。

struct delayed_work {

struct work_struct work;

struct timer_list timer;

};

API介绍

不是所有的驱动程序都必须有自己的工作队列。驱动程序可以使用内核提供的缺省工作队列。由于这个工作队列由很多驱动程序共享,任务可能会需要比较长一段时间才能开始执行。为了解决这一问题,工作函数中的延迟应该保持最小或者不要延时。

创建工作队列

每个工作队列由一个专门的线程(即一个工作队列一个线程),所有来自运行队列的任务在进程的上下文中运行(这样它们可以休眠)。驱动程序可以创建并使用它们自己的工作队列,或者使用内核的一个工作队列。

//创建工作队列
struct workqueue_struct *create_workqueue(const char *name);

 创建工作队列的任务

工作队列任务可以在编译时或者运行时创建。

//编译时创建
DECLARE_WORK(name, void (*function)(void *), void *data);
//运行时创建
INIT_WORK(struct work_struct *work, void (*function)(void *), void *data);

 将任务添加到工作队列中

//添加到指定工作队列
int queue_work(struct workqueue_struct *queue, struct work_struct *work);
        <br>
int queue_delayed_work(struct workqueue_struct *queue, struct work_struct
        <br>
*work, unsigned long delay);


//添加到内核默认工作队列
int schedule_work(struct work_struct *work);
int schedule_delayed_work(struct work_struct *work, unsigned long delay);

delay:保证至少在经过一段给定的最小延迟时间以后,工作队列中的任务才可以真正执行。

队列和任务的清除操作

//取消任务
int cancel_delayed_work(struct work_struct *work);


//清空队列中的所有任务
void flush_workqueue(struct workqueue_struct *queue);


//销毁工作队列
void destroy_workqueue(struct workqueue_struct *queue);

 举例

struct my_struct_t {

char *name;

struct work_struct my_work;

};

void my_func(struct work_struct *work)

{

struct my_struct_t *my_name = container_of(work, struct my_struct_t, my_work);

printk(KERN_INFO “Hello world, my name is %s!\n”, my_name->name);

}

struct workqueue_struct *my_wq = create_workqueue(“my wq”);

struct my_struct_t my_name;

my_name.name = “Jack”;

INIT_WORK(&(my_name.my_work), my_func);

queue_work(my_wq, &my_work);

工作原理

workqueue是内核里面很重要的一个机制,特别是内核驱动,一般的小型任务(work)都不会自己起一个线程来处理,而是扔到workqueue中处理。workqueue的主要工作就是用进程上下文来处理内核中大量的小任务。

所以workqueue的主要涉及思想:一个是并行,多个work不要相互阻塞。另一个是节省资源,多个work尽量共享资源(进程、调度、内存),不要造成系统过多的资源浪费。

为了实现设计思想,workqueue的设计

实现也更新了很多版本。最新的workqueue实现叫做CMWQ(concurrency Managed Workqueue),也就是用更加只能的算法来实现“并行和节省”。新版本的workqueue创建函数改成alloc_workqueue(),旧版本的函数create_workqueue()逐渐会被废弃。

CMWQ的几个基本概念

关于workqueue中几个概念都是work相关的数据结构,非常容易混淆,大概可以这样理解。

1)work:工作

2)workqueue:工作集合。workqueue和work是一对多的关系

3)worker: 工人。在代码中worker对应一个work_thread()内核线程

4)worker_pool: 工人的集合。worker_pool和worker是一对多的关系

5)PWQ(pool_workqueue):中间人/中介,负责建立workqueue和worker_pool之间的关系,workqueue和pwq是一对多的关系,pwq和worker_pool是一对一的关系。

worker_pool

每个执行work的线程叫做worker,一组worker的结合叫做worker_pool。CMWQ的精髓就在worker_pool里面的worker的动态增减的管理上 manage_workers()。

CMWQ对worker_pool分成两类:

normal worker_pool,给通用的workqueue使用;

unbound worker_pool,给WQ_UNBOUND类型的workqueue使用;

normal worker_pool

默认work是在normal worker_pool中处理的。系统的规划是每个CPU创建两个normal worker_pool:一个Nomal的优先级(nice=0),一个高优先级(nice=HIGHPRI_NICE_LEVEL),对应创建出来的worker进程的nice不一样。

每个worker对应一个worker_thread()内核线程,一个worker_pool包含一个或者多个worker,worker_pool中worker的数量是根据worker_pool中work的负载来动态增减的。

我们可以通过ps aux | grep kworker命令来查看所有worker对应的内核线程,normal worker_pool对应内核线程(worker_thread())的命名规则是这样的:

snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
		 pool->attrs->nice < 0  ? "H" : "");

	worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
					      "kworker/%s", id_buf);

so 类似名字是 normal worker_pool:

shell@PRO5:/ $ ps | grep "kworker"
root      14    2     0      0     worker_thr 0000000000 S kworker/1:0H	// cpu1 高优先级 worker_pool 的第 0 个 worker 进程
root      17    2     0      0     worker_thr 0000000000 S kworker/2:0	// cpu2 低优先级 worker_pool 的第 0 个 worker 进程
root      18    2     0      0     worker_thr 0000000000 S kworker/2:0H	// cpu2 高优先级 worker_pool 的第 0 个 worker 进程
root      23699 2     0      0     worker_thr 0000000000 S kworker/0:1	// cpu0 低优先级 worker_pool 的第 1 个 worker 进程

unbound worker_pool

大部分的work都是通过normal worker_pool来执行的(例如通过schedule_work()、schedule_work_on()压入到系统workqueue中的work),最后都是通过normal worker_pool中的worker来执行的。这些worker是和某个CPU绑定的,work一旦被worker开始执行,都是一直运行到某个CPU上的,不会切换CPU。

unbound worker_pool相对应的意思,就是worker可以在多个CPU上调度。但是它其实也是绑定的,只不过它绑定的单位不是CPU,而是node,所谓的node是对NUMA(Non uniform Memory Access Architecture)系统来说的,NUMA可能存在多个Node,每个node可能包含一个或者多个CPU。

unbound worker_pool对应内核线程(worker_thread())的命名规则是这样的:

snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);

	worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
					      "kworker/%s", id_buf);

so 类似名字是 unbound worker_pool:

shell@PRO5:/ $ ps | grep "kworker"
root      23906 2     0      0     worker_thr 0000000000 S kworker/u20:2/* unbound pool 20 的第 2 个 worker 进程*/
root      24564 2     0      0     worker_thr 0000000000 S kworker/u20:0/* unbound pool 20 的第 0 个 worker 进程*/
root      24622 2     0      0     worker_thr 0000000000 S kworker/u21:1/* unbound pool 21 的第 1 个 worker 进程*/

worker

每个worker对应一个worker_thread()内核线程,一个worker_pool对应一个或者多个worker。多个worker从同一个链表中worker_pool->worklist获取work进行处理。

这其中有几个重点:

  • worker怎么处理work;
  • worker_pool怎么动态管理worker的数量;

worker处理work

处理 work 的过程主要在 worker_thread() -> process_one_work() 中处理,我们具体看看代码的实现过程。

kernel/workqueue.c: worker_thread() -> process_one_work()

static int worker_thread(void *__worker)
{
	struct worker *worker = __worker;
	struct worker_pool *pool = worker->pool;

	/* tell the scheduler that this is a workqueue worker */
	worker->task->flags |= PF_WQ_WORKER;
woke_up:
	spin_lock_irq(&pool->lock);

	// (1) 是否 die
	/* am I supposed to die? */
	if (unlikely(worker->flags & WORKER_DIE)) {
		spin_unlock_irq(&pool->lock);
		WARN_ON_ONCE(!list_empty(&worker->entry));
		worker->task->flags &= ~PF_WQ_WORKER;

		set_task_comm(worker->task, "kworker/dying");
		ida_simple_remove(&pool->worker_ida, worker->id);
		worker_detach_from_pool(worker, pool);
		kfree(worker);
		return 0;
	}

	// (2) 脱离 idle 状态
	// 被唤醒之前 worker 都是 idle 状态
	worker_leave_idle(worker);
recheck:

	// (3) 如果需要本 worker 继续执行则继续,否则进入 idle 状态
	// need more worker 的条件: (pool->worklist != 0) && (pool->nr_running == 0)
	// worklist 上有 work 需要执行,并且现在没有处于 running 的 work
	/* no more worker necessary? */
	if (!need_more_worker(pool))
		goto sleep;

	// (4) 如果 (pool->nr_idle == 0),则启动创建更多的 worker
	// 说明 idle 队列中已经没有备用 worker 了,先创建 一些 worker 备用
	/* do we need to manage? */
	if (unlikely(!may_start_working(pool)) && manage_workers(worker))
		goto recheck;

	/*
	 * ->scheduled list can only be filled while a worker is
	 * preparing to process a work or actually processing it.
	 * Make sure nobody diddled with it while I was sleeping.
	 */
	WARN_ON_ONCE(!list_empty(&worker->scheduled));

	/*
	 * Finish PREP stage.  We're guaranteed to have at least one idle
	 * worker or that someone else has already assumed the manager
	 * role.  This is where @worker starts participating in concurrency
	 * management if applicable and concurrency management is restored
	 * after being rebound.  See rebind_workers() for details.
	 */
	worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);

	do {
		// (5) 如果 pool->worklist 不为空,从其中取出一个 work 进行处理
		struct work_struct *work =
			list_first_entry(&pool->worklist,
					 struct work_struct, entry);

		if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
			/* optimization path, not strictly necessary */
			// (6) 执行正常的 work
			process_one_work(worker, work);
			if (unlikely(!list_empty(&worker->scheduled)))
				process_scheduled_works(worker);
		} else {
			// (7) 执行系统特意 scheduled 给某个 worker 的 work
			// 普通的 work 是放在池子的公共 list 中的 pool->worklist
			// 只有一些特殊的 work 被特意派送给某个 worker 的 worker->scheduled
			// 包括:1、执行 flush_work 时插入的 barrier work;
			// 2、collision 时从其他 worker 推送到本 worker 的 work
			move_linked_works(work, &worker->scheduled, NULL);
			process_scheduled_works(worker);
		}
	// (8) worker keep_working 的条件:
	// pool->worklist 不为空 && (pool->nr_running <= 1)
	} while (keep_working(pool));

	worker_set_flags(worker, WORKER_PREP);supposed
sleep:
	// (9) worker 进入 idle 状态
	/*
	 * pool->lock is held and there's no work to process and no need to
	 * manage, sleep.  Workers are woken up only while holding
	 * pool->lock or from local cpu, so setting the current state
	 * before releasing pool->lock is enough to prevent losing any
	 * event.
	 */
	worker_enter_idle(worker);
	__set_current_state(TASK_INTERRUPTIBLE);
	spin_unlock_irq(&pool->lock);
	schedule();
	goto woke_up;
}
| →
static void process_one_work(struct worker *worker, struct work_struct *work)
__releases(&pool->lock)
__acquires(&pool->lock)
{
	struct pool_workqueue *pwq = get_work_pwq(work);
	struct worker_pool *pool = worker->pool;
	bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;
	int work_color;
	struct worker *collision;
#ifdef CONFIG_LOCKDEP
	/*
	 * It is permissible to free the struct work_struct from
	 * inside the function that is called from it, this we need to
	 * take into account for lockdep too.  To avoid bogus "held
	 * lock freed" warnings as well as problems when looking into
	 * work->lockdep_map, make a copy and use that here.
	 */
	struct lockdep_map lockdep_map;

	lockdep_copy_map(&lockdep_map, &work->lockdep_map);
#endif
	/* ensure we're on the correct CPU */
	WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
		     raw_smp_processor_id() != pool->cpu);

	// (8.1) 如果 work 已经在 worker_pool 的其他 worker 上执行,
	// 将 work 放入对应 worker 的 scheduled 队列中延后执行
	/*
	 * A single work shouldn't be executed concurrently by
	 * multiple workers on a single cpu.  Check whether anyone is
	 * already processing the work.  If so, defer the work to the
	 * currently executing one.
	 */
	collision = find_worker_executing_work(pool, work);
	if (unlikely(collision)) {
		move_linked_works(work, &collision->scheduled, NULL);
		return;
	}

	// (8.2) 将 worker 加入 busy 队列 pool->busy_hash
	/* claim and dequeue */
	debug_work_deactivate(work);
	hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work);
	worker->current_work = work;
	worker->current_func = work->func;
	worker->current_pwq = pwq;
	work_color = get_work_color(work);

	list_del_init(&work->entry);

	// (8.3) 如果 work 所在的 wq 是 cpu 密集型的 WQ_CPU_INTENSIVE
	// 则当前 work 的执行脱离 worker_pool 的动态调度,成为一个独立的线程
	/*
	 * CPU intensive works don't participate in concurrency management.
	 * They're the scheduler's responsibility.  This takes @worker out
	 * of concurrency management and the next code block will chain
	 * execution of the pending work items.
	 */
	if (unlikely(cpu_intensive))
		worker_set_flags(worker, WORKER_CPU_INTENSIVE);

	// (8.4) 在 UNBOUND 或者 CPU_INTENSIVE work 中判断是否需要唤醒 idle worker
	// 普通 work 不会执行这个操作
	/*
	 * Wake up another worker if necessary.  The condition is always
	 * false for normal per-cpu workers since nr_running would always
	 * be >= 1 at this point.  This is used to chain execution of the
	 * pending work items for WORKER_NOT_RUNNING workers such as the
	 * UNBOUND and CPU_INTENSIVE ones.
	 */
	if (need_more_worker(pool))
		wake_up_worker(pool);

	/*
	 * Record the last pool and clear PENDING which should be the last
	 * update to @work.  Also, do this inside @pool->lock so that
	 * PENDING and queued state changes happen together while IRQ is
	 * disabled.
	 */
	set_work_pool_and_clear_pending(work, pool->id);

	spin_unlock_irq(&pool->lock);

	lock_map_acquire_read(&pwq->wq->lockdep_map);
	lock_map_acquire(&lockdep_map);
	trace_workqueue_execute_start(work);
	// (8.5) 执行 work 函数
	worker->current_func(work);
	/*
	 * While we must be careful to not use "work" after this, the trace
	 * point will only record its address.
	 */
	trace_workqueue_execute_end(work);
	lock_map_release(&lockdep_map);
	lock_map_release(&pwq->wq->lockdep_map);

	if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
		pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"
		       "     last function: %pf\n",
		       current->comm, preempt_count(), task_pid_nr(current),
		       worker->current_func);
		debug_show_held_locks(current);
		dump_stack();
	}

	/*
	 * The following prevents a kworker from hogging CPU on !PREEMPT
	 * kernels, where a requeueing work item waiting for something to
	 * happen could deadlock with stop_machine as such work item could
	 * indefinitely requeue itself while all other CPUs are trapped in
	 * stop_machine. At the same time, report a quiescent RCU state so
	 * the same condition doesn't freeze RCU.
	 */
	cond_resched_rcu_qs();

	spin_lock_irq(&pool->lock);

	/* clear cpu intensive status */
	if (unlikely(cpu_intensive))
		worker_clr_flags(worker, WORKER_CPU_INTENSIVE);

	/* we're done with it, release */
	hash_del(&worker->hentry);
	worker->current_work = NULL;
	worker->current_func = NULL;
	worker->current_pwq = NULL;
	worker->desc_valid = false;
	pwq_dec_nr_in_flight(pwq, work_color);
}

worker_pool 动态管理 worker

worker_pool 怎么来动态增减 worker,这部分的算法是 CMWQ 的核心。其思想如下:

  • worker_pool中的worker有3中状态:idle、running、suspend;
  • 如果worker_pool中有work需要处理,保持至少一个running worker来处理;
  • running worker在处理work的过程中进入了阻塞suspend状态,为了保持其他work的执行,需要唤醒新的idle worker来处理work;
  • 如果有work需要执行且running worker大于1个,会让多余的running worker进入idle状态。
  • 如果没有work需要执行,会让所有work进入idle状态;
  • 如果创建的worker过多,destroy_worker在300s(IDLE_WORKER_TIMEOUT)时间内没有再次运行的idle_worker。

workqueue

workqueue就是存放一组work的集合,基本可以分为两类:一类是系统创建的workqueue,一类是用户自己创建的workqueue。不论是系统还是用户的workqueue,如果没有指定WQ_UNBOUND,默认都是和normal worker_pool绑定。

系统wrokqueue

系统在初始化时创建了一批默认的workqueue:system_wq、system_highpri_wq、system_unbound_wq、system_freezable_wq、system_power_efficient_wq、system_freezable_power_efficient_wq。

像system_wq,就是schedule_work()默认使用的。

kernel/workqueue.c:init_workqueues()

static int __init init_workqueues(void)
{
	system_wq = alloc_workqueue("events", 0, 0);
	system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
	system_long_wq = alloc_workqueue("events_long", 0, 0);
	system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
					    WQ_UNBOUND_MAX_ACTIVE);
	system_freezable_wq = alloc_workqueue("events_freezable",
					      WQ_FREEZABLE, 0);
	system_power_efficient_wq = alloc_workqueue("events_power_efficient",
					      WQ_POWER_EFFICIENT, 0);

	system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient",
					      WQ_FREEZABLE | WQ_POWER_EFFICIENT,
					      0);
}

workqueue 创建

详细过程见上几节的代码分析:alloc_workqueue() -> __alloc_workqueue_key() -> alloc_and_link_pwqs()。

queue_work()

将work压入到workqueue当中。

kernel/workqueue.c: queue_work() -> queue_work_on() -> __queue_work()

static void __queue_work(int cpu, struct workqueue_struct *wq,
			 struct work_struct *work)
{
	struct pool_workqueue *pwq;
	struct worker_pool *last_pool;
	struct list_head *worklist;
	unsigned int work_flags;
	unsigned int req_cpu = cpu;

	/*
	 * While a work item is PENDING && off queue, a task trying to
	 * steal the PENDING will busy-loop waiting for it to either get
	 * queued or lose PENDING.  Grabbing PENDING and queueing should
	 * happen with IRQ disabled.
	 */
	WARN_ON_ONCE(!irqs_disabled());

	debug_work_activate(work);

	/* if draining, only works from the same workqueue are allowed */
	if (unlikely(wq->flags & __WQ_DRAINING) &&
	    WARN_ON_ONCE(!is_chained_work(wq)))
		return;
retry:
	// (1) 如果没有指定 cpu,则使用当前 cpu
	if (req_cpu == WORK_CPU_UNBOUND)
		cpu = raw_smp_processor_id();

	/* pwq which will be used unless @work is executing elsewhere */
	if (!(wq->flags & WQ_UNBOUND))
		// (2) 对于 normal wq,使用当前 cpu 对应的 normal worker_pool
		pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
	else
		// (3) 对于 unbound wq,使用当前 cpu 对应 node 的 worker_pool
		pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));

	// (4) 如果 work 在其他 worker 上正在被执行,把 work 压到对应的 worker 上去
	// 避免 work 出现重入的问题
	/*
	 * If @work was previously on a different pool, it might still be
	 * running there, in which case the work needs to be queued on that
	 * pool to guarantee non-reentrancy.
	 */
	last_pool = get_work_pool(work);
	if (last_pool && last_pool != pwq->pool) {
		struct worker *worker;

		spin_lock(&last_pool->lock);

		worker = find_worker_executing_work(last_pool, work);

		if (worker && worker->current_pwq->wq == wq) {
			pwq = worker->current_pwq;
		} else {
			/* meh... not running there, queue here */
			spin_unlock(&last_pool->lock);
			spin_lock(&pwq->pool->lock);
		}
	} else {
		spin_lock(&pwq->pool->lock);
	}

	/*
	 * pwq is determined and locked.  For unbound pools, we could have
	 * raced with pwq release and it could already be dead.  If its
	 * refcnt is zero, repeat pwq selection.  Note that pwqs never die
	 * without another pwq replacing it in the numa_pwq_tbl or while
	 * work items are executing on it, so the retrying is guaranteed to
	 * make forward-progress.
	 */
	if (unlikely(!pwq->refcnt)) {
		if (wq->flags & WQ_UNBOUND) {
			spin_unlock(&pwq->pool->lock);
			cpu_relax();
			goto retry;
		}
		/* oops */
		WARN_ONCE(true, "workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt",
			  wq->name, cpu);
	}

	/* pwq determined, queue */
	trace_workqueue_queue_work(req_cpu, pwq, work);

	if (WARN_ON(!list_empty(&work->entry))) {
		spin_unlock(&pwq->pool->lock);
		return;
	}

	pwq->nr_in_flight[pwq->work_color]++;
	work_flags = work_color_to_flags(pwq->work_color);

	// (5) 如果还没有达到 max_active,将 work 挂载到 pool->worklist
	if (likely(pwq->nr_active < pwq->max_active)) {
		trace_workqueue_activate_work(work);
		pwq->nr_active++;
		worklist = &pwq->pool->worklist;
	// 否则,将 work 挂载到临时队列 pwq->delayed_works
	} else {
		work_flags |= WORK_STRUCT_DELAYED;
		worklist = &pwq->delayed_works;
	}

	// (6) 将 work 压入 worklist 当中
	insert_work(pwq, work, worklist, work_flags);

	spin_unlock(&pwq->pool->lock);
}

flush_work()

flush某个work,确保work执行完成。

怎么判断异步的work已经执行完成?这里面使用了一个技巧:在目标work后面插入一个新的work wq_barrier,如果wq_barrier执行完成,那么目标work肯定已经执行完成。

kernel/workqueue.c: queue_work() -> queue_work_on() -> __queue_work()

/**
 * flush_work - wait for a work to finish executing the last queueing instance
 * @work: the work to flush
 *
 * Wait until @work has finished execution.  @work is guaranteed to be idle
 * on return if it hasn't been requeued since flush started.
 *
 * Return:
 * %true if flush_work() waited for the work to finish execution,
 * %false if it was already idle.
 */
bool flush_work(struct work_struct *work)
{
	struct wq_barrier barr;

	lock_map_acquire(&work->lockdep_map);
	lock_map_release(&work->lockdep_map);

	if (start_flush_work(work, &barr)) {
		// 等待 barr work 执行完成的信号
		wait_for_completion(&barr.done);
		destroy_work_on_stack(&barr.work);
		return true;
	} else {
		return false;
	}
}
| →
static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr)
{
	struct worker *worker = NULL;
	struct worker_pool *pool;
	struct pool_workqueue *pwq;

	might_sleep();

	// (1) 如果 work 所在 worker_pool 为 NULL,说明 work 已经执行完
	local_irq_disable();
	pool = get_work_pool(work);
	if (!pool) {
		local_irq_enable();
		return false;
	}

	spin_lock(&pool->lock);
	/* see the comment in try_to_grab_pending() with the same code */
	pwq = get_work_pwq(work);
	if (pwq) {
		// (2) 如果 work 所在 pwq 指向的 worker_pool 不等于上一步得到的 worker_pool,说明 work 已经执行完
		if (unlikely(pwq->pool != pool))
			goto already_gone;
	} else {
		// (3) 如果 work 所在 pwq 为 NULL,并且也没有在当前执行的 work 中,说明 work 已经执行完
		worker = find_worker_executing_work(pool, work);
		if (!worker)
			goto already_gone;
		pwq = worker->current_pwq;
	}

	// (4) 如果 work 没有执行完,向 work 的后面插入 barr work
	insert_wq_barrier(pwq, barr, work, worker);
	spin_unlock_irq(&pool->lock);

	/*
	 * If @max_active is 1 or rescuer is in use, flushing another work
	 * item on the same workqueue may lead to deadlock.  Make sure the
	 * flusher is not running on the same workqueue by verifying write
	 * access.
	 */
	if (pwq->wq->saved_max_active == 1 || pwq->wq->rescuer)
		lock_map_acquire(&pwq->wq->lockdep_map);
	else
		lock_map_acquire_read(&pwq->wq->lockdep_map);
	lock_map_release(&pwq->wq->lockdep_map);

	return true;
already_gone:
	spin_unlock_irq(&pool->lock);
	return false;
}
|| →
static void insert_wq_barrier(struct pool_workqueue *pwq,
			      struct wq_barrier *barr,
			      struct work_struct *target, struct worker *worker)
{
	struct list_head *head;
	unsigned int linked = 0;

	/*
	 * debugobject calls are safe here even with pool->lock locked
	 * as we know for sure that this will not trigger any of the
	 * checks and call back into the fixup functions where we
	 * might deadlock.
	 */
	// (4.1) barr work 的执行函数 wq_barrier_func()
	INIT_WORK_ONSTACK(&barr->work, wq_barrier_func);
	__set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
	init_completion(&barr->done);

	/*
	 * If @target is currently being executed, schedule the
	 * barrier to the worker; otherwise, put it after @target.
	 */
	// (4.2) 如果 work 当前在 worker 中执行,则 barr work 插入 scheduled 队列
	if (worker)
		head = worker->scheduled.next;
	// 否则,则 barr work 插入正常的 worklist 队列中,插入位置在目标 work 后面
	// 并且置上 WORK_STRUCT_LINKED 标志
	else {
		unsigned long *bits = work_data_bits(target);

		head = target->entry.next;
		/* there can already be other linked works, inherit and set */
		linked = *bits & WORK_STRUCT_LINKED;
		__set_bit(WORK_STRUCT_LINKED_BIT, bits);
	}

	debug_work_activate(&barr->work);
	insert_work(pwq, &barr->work, head,
		    work_color_to_flags(WORK_NO_COLOR) | linked);
}
||| →
static void wq_barrier_func(struct work_struct *work)
{
	struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
	// (4.1.1) barr work 执行完成,发出 complete 信号。
	complete(&barr->done);
}

Workqueue 对外接口函数

CMWQ 实现的 workqueue 机制,被包装成相应的对外接口函数。

schedule_work()

把work压入系统默认wq system_wq,WORK_CPU_UNBOUND指定worker为当前CPU绑定的normal work_pool创建的worker。

kernel/workqueue.c: schedule_work() -> queue_work_on() -> __queue_work()

static inline bool schedule_work(struct work_struct *work)
{
	return queue_work(system_wq, work);
}
| →
static inline bool queue_work(struct workqueue_struct *wq,
			      struct work_struct *work)
{
	return queue_work_on(WORK_CPU_UNBOUND, wq, work);
}

schedule_work_on() 

在schedule_work()基础上,可以指定work运行的CPU。

kernel/workqueue.c: schedule_work_on() -> queue_work_on() -> __queue_work()

static inline bool schedule_work_on(int cpu, struct work_struct *work)
{
	return queue_work_on(cpu, system_wq, work);
}

schedule_delayed_work()

启动一个timer,在timer定时到了以后调用delayed_work_timer_fn()把work压入系统默认wq system_wq。

kernel/workqueue.c: schedule_work_on() -> queue_work_on() -> __queue_work()

static inline bool schedule_delayed_work(struct delayed_work *dwork,
					 unsigned long delay)
{
	return queue_delayed_work(system_wq, dwork, delay);
}
| →
static inline bool queue_delayed_work(struct workqueue_struct *wq,
				      struct delayed_work *dwork,
				      unsigned long delay)
{
	return queue_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay);
}
|| →
bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
			   struct delayed_work *dwork, unsigned long delay)
{
	struct work_struct *work = &dwork->work;
	bool ret = false;
	unsigned long flags;

	/* read the comment in __queue_work() */
	local_irq_save(flags);

	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
		__queue_delayed_work(cpu, wq, dwork, delay);
		ret = true;
	}

	local_irq_restore(flags);
	return ret;
}
||| →
static void __queue_delayed_work(int cpu, struct workqueue_struct *wq,
				struct delayed_work *dwork, unsigned long delay)
{
	struct timer_list *timer = &dwork->timer;
	struct work_struct *work = &dwork->work;

	WARN_ON_ONCE(timer->function != delayed_work_timer_fn ||
		     timer->data != (unsigned long)dwork);
	WARN_ON_ONCE(timer_pending(timer));
	WARN_ON_ONCE(!list_empty(&work->entry));

	/*
	 * If @delay is 0, queue @dwork->work immediately.  This is for
	 * both optimization and correctness.  The earliest @timer can
	 * expire is on the closest next tick and delayed_work users depend
	 * on that there's no such delay when @delay is 0.
	 */
	if (!delay) {
		__queue_work(cpu, wq, &dwork->work);
		return;
	}

	timer_stats_timer_set_start_info(&dwork->timer);

	dwork->wq = wq;
	dwork->cpu = cpu;
	timer->expires = jiffies + delay;

	if (unlikely(cpu != WORK_CPU_UNBOUND))
		add_timer_on(timer, cpu);
	else
		add_timer(timer);
}
|||| →
void delayed_work_timer_fn(unsigned long __data)
{
	struct delayed_work *dwork = (struct delayed_work *)__data;

	/* should have been called from irqsafe timer with irq already off */
	__queue_work(dwork->cpu, dwork->wq, &dwork->work);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/782910.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Arduino】XIAOFEIYU(TM)实验ESP32使用霍尔传感器(图文)

霍尔传感器是一种可以测量磁力变化的传感器&#xff0c;今天XIAOFEIYU就来测试一下ESP32使用霍尔传感器。 霍尔传感器&#xff1a;正负极加一个数据接口。 将传感器与ESP32进行电路连接&#xff1a; 编写程序&#xff1a; #define SIGNAL_PIN 33int value 0; // 存储传感…

51单片机-第一节-LED和独立按键

一、点亮LED&#xff1a; 首先包含头文件 <REGX52.H> 随后令P2为0xFE。(此时二进制对应1111 1110&#xff0c;为0 的LED亮&#xff0c;故八个灯中的最后一个亮起)。 注&#xff1a;P2为控制LED的8位寄存器。 void main() {P2 0xFE;//1111 1110while(1){} } 二、L…

《算法笔记》总结No.3——排序

基础算法之一&#xff0c;相当重要。在普通的机试中如果没有数据类型和时空限制&#xff0c;基本上选择自己最熟悉的就好。本篇只总结选择排序和插入排序&#xff0c;侧重应用&#xff0c;408中要求的种类更加繁多&#xff0c;此处先不扩展难度~总结最常用的两种排序。 一.选择…

腾讯课堂即将停止服务?来试试这款开源的知识付费系统

项目介绍 本系统基于ThinkPhp5.0layuiVue开发,功能包含在线直播、付费视频、付费音频、付费阅读、会员系统、分销系统、拼团活动、直播带货、直播打赏、商城系统等。能够快速积累客户、会员数据分析、智能转化客户、有效提高销售、吸引流量、网络营销、品牌推广的一款应用&…

javaIO流(2)

一.字符流 字符流对数据的操作是以一个个字符为单位的,字符流只能读文本文件,并将读到的字节按照编码表转为对应的字符,Reader和Writer是字符流的两个最大的抽象类,InputStreamReader和OutputStreamWriter分别继承了Reader和Writer,它俩的功能就是将读取到的字节转换为字符,所…

【大模型LLM面试合集】大语言模型基础_NLP面试题

NLP面试题 1.BERT 1.1 基础知识 BERT&#xff08;Bidirectional Encoder Representations from Transformers&#xff09;是谷歌提出&#xff0c;作为一个Word2Vec的替代者&#xff0c;其在NLP领域的11个方向大幅刷新了精度&#xff0c;可以说是近年来自残差网络最优突破性的…

流程图编辑框架LogicFlow-vue-ts和js

LogicFlow官网https://site.logic-flow.cn/LogicFlow 是一款流程图编辑框架&#xff0c;提供了一系列流程图交互、编辑所必需的功能和灵活的节点自定义、插件等拓展机制。LogicFlow支持前端研发自定义开发各种逻辑编排场景&#xff0c;如流程图、ER图、BPMN流程等。在工作审批配…

数据结构/作业/2024/7/7

搭建个场景: 将学生的信息&#xff0c;以顺序表的方式存储&#xff08;堆区)&#xff0c;并且实现封装函数︰1】顺序表的创建&#xff0c; 2】判满、 3】判空、 4】往顺序表里增加学生、5】遍历、 6】任意位置插入学生、7】任意位置删除学生、8】修改、 9】查找(按学生的学号查…

不同层数PCB如何选择合适板厚?

在回答这个问题前&#xff0c;我们首先需要了解什么是PCB厚度。 PCB厚度是指电路板完成后的厚度。 覆铜板的厚度&#xff1a;0.5、0.7、0.8、1.0、1.2、1.5、1.6、2.0、2.4、3.2和6.4毫米。 纸基覆铜板的标称厚度为 0.7 至 1.5 毫米。让我们开始了解更多细节。 标准 PCB 铜厚度…

使用GZip对npm run build打包的vendor.js文件进行压缩

vue-cli项目 安装npm i compression-webpack-plugin -D npm i compression-webpack-plugin -D使用&#xff1a;在vue.config.js文件中 const CompressionPlugin require(compression-webpack-plugin) module.exports {configureWebpack: {plugins: [new CompressionPlugin…

北斗在高铁轨道位移监测中的应用

随着高速铁路的飞速发展&#xff0c;轨道的监测与维护变得至关重要。传统的监测方法已难以满足现代高铁的需求。 近年来&#xff0c;北斗卫星导航系统凭借其高精度、全天候、全球覆盖的优势&#xff0c;在高铁轨道位移监测中发挥了重要作用。 高铁轨道监测系统通过集成北斗卫星…

谷歌正在试行人脸识别办公室安全系统

内容提要&#xff1a; &#x1f9ff;据美国消费者新闻与商业频道 CNBC 获悉&#xff0c;谷歌正在为其企业园区安全测试面部追踪技术。 &#x1f9ff;测试最初在华盛顿州柯克兰的一间办公室进行。 &#x1f9ff;一份内部文件称&#xff0c;谷歌的安全和弹性服务 (GSRS) 团队将…

C/C++ 代码注释规范及 doxygen 工具

参考 谷歌项目风格指南——注释 C doxygen 风格注释示例 ubuntu20 中 doxygen 文档生成 doxygen 官方文档 在 /Doxygen/Special Command/ 章节介绍 doxygen 的关键字 注释说明 注释的目的是提高代码的可读性与可维护性。 C 风格注释 // 单行注释/* 多行注释 */ C 风格注…

Git注释规范

主打一个有用 代码的提交规范参考如下&#xff1a; init:初始化项目feat:新功能&#xff08;feature&#xff09;fix:修补bugdocs:文档&#xff08;documentation&#xff09;style:格式&#xff08;不影响代码运行的变动&#xff09;refactor:重构&#xff08;即不是新增功能…

动手实操微软开源的GraphRAG

微软在今年4月份的时候提出了GraphRAG的概念&#xff0c;然后在上周开源了GraphRAG,Github链接见https://github.com/microsoft/graphrag,截止当前&#xff0c;已有6900Star。 安装教程 官方推荐使用Python3.10-3.12版本&#xff0c;我使用Python3.10版本安装时&#xff0c;在…

【C++】类和对象(中)--下篇

个人主页~ 类和对象上 类和对象中-上篇 类和对象 五、赋值运算符重载1、运算符重载2、赋值运算符重载3、前置和后置重载 六、const成员七、日期类的实现Date.hDate.cpptest.cpptest1测试结果test2测试结果test3测试结果test4测试结果test5测试结果test6测试结果test7测试结果 八…

如何在Excel中对一个或多个条件求和?

在Excel中&#xff0c;基于一个或多个条件的求和值是我们大多数人的常见任务&#xff0c;SUMIF函数可以帮助我们根据一个条件快速求和&#xff0c;而SUMIFS函数可以帮助我们对多个条件求和。 本文&#xff0c;我将描述如何在Excel中对一个或多个条件求和&#xff1f; 在Excel中…

R语言学习笔记2-RRStudio环境配置(Windows版)

R语言学习笔记2-R&RStudio环境配置&#xff08;Windows版&#xff09; 安装 R安装RStudio修改默认工作目录修改镜像验证镜像源修改文件编码 环境测试 安装 R R下载地址&#xff1a; https://mirrors.tuna.tsinghua.edu.cn/CRAN/bin/windows/base/ 点击下载链接并运行安装程…

《网络安全和信息化》是什么级别的期刊?是正规期刊吗?能评职称吗?

​问题解答 问&#xff1a;《网络安全和信息化》是不是核心期刊&#xff1f; 答&#xff1a;不是&#xff0c;是知网收录的正规学术期刊。 问&#xff1a;《网络安全和信息化》级别&#xff1f; 答&#xff1a;国家级。主管单位&#xff1a;工业和信息化部 主办单位&#…

PostgreSQL主从同步

目录 一、主从复制原理 二、配置主数据库 2.1 创建同步账号 2.2 配置同步账号访问控制 2.3 设置同步参数 3.4 重启主数据库 三、配置从数据库 3.1 停止从库 3.2 清空从库数据文件 3.3 拉取主库数据文件 3.4 配置从库同步参数 3.5 启动从库 四、测试主从 4.1在主库…