Node.js C++ 层的任务管理
admin
2024-03-13 23:39:23
0

我们都知道 Node.js 是基于事件循环来运行的,本质上是一个生产者 / 消费者模型,所以就少不了任务的管理机制,不过本文不是介绍事件循环中的任务管理,而是 C++ 层的任务管理。本文主要介绍 SetImmediate、SetImmediateThreadsafe、RequestInterrupt、AddCleanupHook 这四个 API 产生的任务。时间关系,随便写写,权当笔记。

任务管理机制的初始化

首先来看一下 Node.js 启动的过程中,和任务管理相关的逻辑。

uv_check_start(immediate_check_handle(), CheckImmediate)
uv_async_init(event_loop(),&task_queues_async_,[](uv_async_t* async) {Environment* env = ContainerOf(&Environment::task_queues_async_, async);env->RunAndClearNativeImmediates();})

CheckImmediate 是在 check 阶段执行的函数,task_queues_async_ 则用于线程间通信,即当子线程往主线程提交任务时,通过 task_queues_async_ 通知主线程,然后主线程执行 uv_async_init 注册的回调。上面的代码就是消费者的逻辑。后面再详细分析里面的处理流程。

提交任务

接下来逐个看一下生产者的逻辑。

template 
void Environment::SetImmediate(Fn&& cb, CallbackFlags::Flags flags) {auto callback = native_immediates_.CreateCallback(std::move(cb), flags);native_immediates_.Push(std::move(callback));// ...
}

SetImmediate 用于同线程的代码提交任务。

template 
void Environment::SetImmediateThreadsafe(Fn&& cb, CallbackFlags::Flags flags) {auto callback = native_immediates_threadsafe_.CreateCallback(std::move(cb), flags);{Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);native_immediates_threadsafe_.Push(std::move(callback));if (task_queues_async_initialized_)uv_async_send(&task_queues_async_);}
}

SetImmediateThreadsafe 用于子线程给主线程提交任务,所以需要加锁。

template 
void Environment::RequestInterrupt(Fn&& cb) {auto callback = native_immediates_interrupts_.CreateCallback(std::move(cb), CallbackFlags::kRefed);{Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);native_immediates_interrupts_.Push(std::move(callback));if (task_queues_async_initialized_)uv_async_send(&task_queues_async_);}RequestInterruptFromV8();
}

RequestInterrupt 用于子线程给主线程提交代码,他和 SetImmediateThreadsafe 有一个很重要的区别是调用了 RequestInterruptFromV8。

void Environment::RequestInterruptFromV8() {isolate()->RequestInterrupt([](Isolate* isolate, void* data) {std::unique_ptr env_ptr { static_cast(data) };Environment* env = *env_ptr;env->RunAndClearInterrupts();}, interrupt_data);
}

RequestInterrupt 可以使得提交的代码在 JS 代码死循环时依然会被执行。接着看 AddCleanupHook。

void Environment::AddCleanupHook(CleanupQueue::Callback fn, void* arg) {cleanup_queue_.Add(fn, arg);
}

AddCleanupHook 用于注册线程退出前的回调。生产者的逻辑都比较简单,就是往任务队列里插入一个任务,如果是涉及到线程间的任务,则通知主线程。

消费者

接下来看一下消费者的逻辑,根据前面的分析可以知道,消费者有几个:CheckImmediate,task_queues_async_ 的处理函数、RequestInterrupt 注册的函数、退出前回调处理函数。先看 CheckImmediate。

void Environment::CheckImmediate(uv_check_t* handle) {Environment* env = Environment::from_immediate_check_handle(handle);env->RunAndClearNativeImmediates();
}void Environment::RunAndClearNativeImmediates(bool only_refed) {RunAndClearInterrupts();auto drain_list = [&](NativeImmediateQueue* queue) {while (auto head = queue->Shift()) {head->Call(this);}return false;};while (drain_list(&native_immediates_)) {}NativeImmediateQueue threadsafe_immediates;if (native_immediates_threadsafe_.size() > 0) {Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);threadsafe_immediates.ConcatMove(std::move(native_immediates_threadsafe_));}while (drain_list(&threadsafe_immediates)) {}
}void Environment::RunAndClearInterrupts() {while (native_immediates_interrupts_.size() > 0) {NativeImmediateQueue queue;{Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);queue.ConcatMove(std::move(native_immediates_interrupts_));}while (auto head = queue.Shift())head->Call(this);}
}

CheckImmediate 函数中处理了SetImmediate、SetImmediateThreadsafe 和 RequestInterrupt 产生的任务。但是如果主线程阻塞在 Poll IO 阶段时,只有子线程提交任务时会唤醒主线程,具体是通过 task_queues_async_ 结构体,看一下处理函数。

env->RunAndClearNativeImmediates();

可以看到这时候也是处理了SetImmediate、SetImmediateThreadsafe 和 RequestInterrupt 产生的任务。最后来看一下处理退出前回调的函数,具体时机是 FreeEnvironment 函数中的 env->RunCleanup()。

void Environment::RunCleanup() {RunAndClearNativeImmediates(true);while (!cleanup_queue_.empty() || principal_realm_->HasCleanupHooks() ||native_immediates_.size() > 0 ||native_immediates_threadsafe_.size() > 0 ||native_immediates_interrupts_.size() > 0) {// 见 CleanupQueue::Draincleanup_queue_.Drain();RunAndClearNativeImmediates(true);}
}// cleanup_queue_.Drain();
void CleanupQueue::Drain() {std::vector callbacks(cleanup_hooks_.begin(),cleanup_hooks_.end());std::sort(callbacks.begin(),callbacks.end(),[](const CleanupHookCallback& a, const CleanupHookCallback& b) {return a.insertion_order_counter_ > b.insertion_order_counter_;});for (const CleanupHookCallback& cb : callbacks) {cb.fn_(cb.arg_);cleanup_hooks_.erase(cb);}
}

RunCleanup 中同时处理了 SetImmediate、SetImmediateThreadsafe、 RequestInterrupt 产生的任务和注册的退出前回调。

相关内容

热门资讯

【MySQL】锁 锁 文章目录锁全局锁表级锁表锁元数据锁(MDL)意向锁AUTO-INC锁...
【内网安全】 隧道搭建穿透上线... 文章目录内网穿透-Ngrok-入门-上线1、服务端配置:2、客户端连接服务端ÿ...
GCN的几种模型复现笔记 引言 本篇笔记紧接上文,主要是上一篇看写了快2w字,再去接入代码感觉有点...
数据分页展示逻辑 import java.util.Arrays;import java.util.List;impo...
Redis为什么选择单线程?R... 目录专栏导读一、Redis版本迭代二、Redis4.0之前为什么一直采用单线程?三、R...
【已解决】ERROR: Cou... 正确指令: pip install pyyaml
关于测试,我发现了哪些新大陆 关于测试 平常也只是听说过一些关于测试的术语,但并没有使用过测试工具。偶然看到编程老师...
Lock 接口解读 前置知识点Synchronized synchronized 是 Java 中的关键字,...
Win7 专业版安装中文包、汉... 参考资料:http://www.metsky.com/archives/350.htm...
3 ROS1通讯编程提高(1) 3 ROS1通讯编程提高3.1 使用VS Code编译ROS13.1.1 VS Code的安装和配置...
大模型未来趋势 大模型是人工智能领域的重要发展趋势之一,未来有着广阔的应用前景和发展空间。以下是大模型未来的趋势和展...
python实战应用讲解-【n... 目录 如何在Python中计算残余的平方和 方法1:使用其Base公式 方法2:使用statsmod...
学习u-boot 需要了解的m... 一、常用函数 1. origin 函数 origin 函数的返回值就是变量来源。使用格式如下...
常用python爬虫库介绍与简... 通用 urllib -网络库(stdlib)。 requests -网络库。 grab – 网络库&...
药品批准文号查询|药融云-中国... 药品批文是国家食品药品监督管理局(NMPA)对药品的审评和批准的证明文件...
【2023-03-22】SRS... 【2023-03-22】SRS推流搭配FFmpeg实现目标检测 说明: 外侧测试使用SRS播放器测...
有限元三角形单元的等效节点力 文章目录前言一、重新复习一下有限元三角形单元的理论1、三角形单元的形函数(Nÿ...
初级算法-哈希表 主要记录算法和数据结构学习笔记,新的一年更上一层楼! 初级算法-哈希表...
进程间通信【Linux】 1. 进程间通信 1.1 什么是进程间通信 在 Linux 系统中,进程间通信...
【Docker】P3 Dock... Docker数据卷、宿主机与挂载数据卷的概念及作用挂载宿主机配置数据卷挂载操作示例一个容器挂载多个目...