当前位置: 首页 > news >正文

led 网站建设劳务公司找项目平台

led 网站建设,劳务公司找项目平台,嘉祥网站建设多少钱,怎么看网站的建设时间async-std是rust异步生态中的基础运行时库之一#xff0c;核心理念是合理的性能 用户友好的api体验。经过几个月密集的开发#xff0c;前些天已经发布1.0稳定版本。因此是时候来一次深入的底层源码分析。async-std的核心是一个带工作窃取的多线程Executor#xff0c;而其本…async-std是rust异步生态中的基础运行时库之一核心理念是合理的性能 用户友好的api体验。经过几个月密集的开发前些天已经发布1.0稳定版本。因此是时候来一次深入的底层源码分析。async-std的核心是一个带工作窃取的多线程Executor而其本身的实现又依赖于async-task这个关键库因此本文主要对async-task的源码进行分析。当Future提交给Executor执行时Executor需要在堆上为这个Future分配空间同时需要给它分配一些状态信息比如Future是否可以执行(poll)是否在等待被唤醒是否已经执行完成等等。我们一般把提交给Executor执行的Future和其连带的状态称为 task。async-task这个库就是对task进行抽象封装以便于Executor的实现其有几个创新的特性整个task只需要一次内存分配完全隐藏了RawWaker以避免实现Executor时处理unsafe代码的麻烦提供了 JoinHandle这样spawn函数对Future没有 Output()的限制极大方便用户使用使用方式async-task只对外暴露了一个函数接口以及对应了两个返回值类型pub fn spawnF, R, S, T(future: F, schedule: S, tag: T) - (TaskT, JoinHandleR, T) whereF: FutureOutput R Send static,R: Send static,S: Fn(TaskT) Send Sync static,T: Send Sync static, 其中参数future表示要执行的Futureschedule是一个闭包当task变为可执行状态时会调用这个函数以调度该task重新执行tag是附带在该task上的额外上下文信息比如task的名字id等。 返回值Task就是构造好的task对象JoinHandle实现了Future用于接收最终执行的结果。值得注意的是spawn这个函数并不会做类似在后台进行计算的操作而仅仅是分配内存创建一个task出来因此其实叫create_task反而更为恰当且好理解。Task提供了如下几个方法 // 对该task进行调度pub fn schedule(self);// poll一次内部的Future如果Future完成了则会通知JoinHandle取结果。否则task进// 入等待直到被被下一次唤醒进行重新调度执行。pub fn run(self);// 取消task的执行pub fn cancel(self);// 返回创建时传入的tag信息pub fn tag(self) - T;JoinHandle实现了Future trait同时也提供了如下几个方法 // 取消task的执行pub fn cancel(self);// 返回创建时传入的tag信息pub fn tag(self) - T;同时Task和JoinHandle都实现了SendSync所以他们可以出现在不同的线程并通过tag方法可以同时持有 T因此spawn函数对T有Sync的约束。借助于async_task的抽象下面的几十行代码就实现了一个共享全局任务队列的多线程Executoruse std::future::Future; use std::thread;use crossbeam::channel::{unbounded, Sender}; use futures::executor; use once_cell::sync::Lazy;static QUEUE: LazySenderasync_task::Task() Lazy::new(|| {let (sender, receiver) unbounded::async_task::Task()();for _ in 0..4 {let recv receiver.clone();thread::spawn(|| {for task in recv {task.run(); }});}sender });fn spawnF, R(future: F) - async_task::JoinHandleR, () whereF: FutureOutput R Send static,R: Send static, {let schedule |task| QUEUE.send(task).unwrap();let (task, handle) async_task::spawn(future, schedule, ());task.schedule();handle }fn main() {let handles: Vec_ (0..10).map(|i| {spawn(async move {println!(Hello from task {}, i);})}).collect();// Wait for the tasks to finish.for handle in handles {executor::block_on(handle);} }Task的结构图通常rust里的并发数据结构会包含底层的实现一般叫Inner或者RawXXX包含大量裸指针等unsafe操作然后再其基础上进行类型安全包装提供上层语义。比如channel上层暴露出 Sender和 Receiver,其行为不一样但内部表示是完全一样的。async-task也类似JoinHandle, Task以及调用Future::poll时传递的Waker类型内部都共享同一个RawTask结构。由于JoinHandle本身是一个Future整个并发结构还有第四个角色-在JoinHandle上调用poll的task传递的Waker为避免引起混淆就称它为Awaiter吧。整个的结构图大致如下整个task在堆上一次分配内存布局按HeaderTag, Schedule,Future/Output排列。由于Future和Output不同时存在因此他们共用同一块内存。JoinHandle只有一个不访问Future可以访问Output一旦销毁就不再生成Task主要访问Future销毁后可以继续生成不过同一时间最多只有一个这样可以避免潜在的多个Task对Future进行并发访问的bugWaker可以存在多份主要访问schedule数据由于spawn函数的参数要求schedule必须是SendSync因此多个waker并发调用是安全的。Header本身包含三个部分state是一个原子变量包含引用计数task的执行状态awaiter锁等信息awaiter保存的是JoinHandle所在的task执行时传递的Waker用于当Output生成后通知JoinHandle来取vtable是一个指向静态变量的虚表指针。task中的状态所有的并发操作都是通过Header中的state这个原子变量来进行同步协调的。主要有以下几种flagconstSCHEDULED:usize10; task已经调度准备下一次执行这个flag可以和RUNGING同时存在。constRUNNING:usize11; 这个task正在执行中这个flag可以和SCHEDULED同时存在。constCOMPLETED:usize12; 这个task的future已经执行完成。constCLOSED:usize13; 表示这个task要么被cancel掉了要么output被JoinHandle取走了是一个终结状态。constHANDLE:usize14; 表示JoinHandle存在。constAWAITER:usize15; 表示JoinHandle正在等待Output用于快速判断Header里的awaiter不为None避免获取锁的操作。constLOCKED:usize16; 读写Header里的awaiter时需要设置这个字段标识是否处于locked状态。constREFERENCE:usize17; 从第7bit开始到最高位当作引用计数用代表Task和Waker的总数主要JoinHandle在HANDLE的flag里跟踪。JoinHandle的实现分析JoinHandle::cancel为避免并发问题JoinHandle不接触Future数据而由于取消task的执行需要析构Future数据因此cancel操作通过重新schedule一次把操作传递给Task执行。implR, T JoinHandleR, T {pub fn cancel(self) {let ptr self.raw_task.as_ptr();let header ptr as *const Header;unsafe {let mut state (*header).state.load(Ordering::Acquire);loop {// 如果task已经结束或者closed什么也不做。if state (COMPLETED | CLOSED) ! 0 {break;}let new if state (SCHEDULED | RUNNING) 0 {// 如果不处于scheduled或running状态那么下面就需要调用schedule// 函数通知Task因此要加上SCHEDULED 和增加引用计数(state | SCHEDULED | CLOSED) REFERENCE} else {// 否则要么task已经schedue过了过段时间会重新执行要么当前正在// 运行因此只需要设置closed状态task执行完后会收到close状态并// 进行处理。state | CLOSED};match (*header).state.compare_exchange_weak(state,new,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) {// 重新schedule以便executor将Future销毁if state (SCHEDULED | RUNNING) 0 {((*header).vtable.schedule)(ptr);}// 如果有awaiter的话通知相应的的task。if state AWAITER ! 0 {(*header).notify();}break;}Err(s) state s,// 失败重试}}}} }JoinHandle::drop由于整个task的所有权是由JoinHandle,Task和Waker共享的因此都需要手动实现drop。Output只会由JoinHandle访问因此如果有的话也要一同销毁。implR, T Drop for JoinHandleR, T {fn drop(mut self) {let ptr self.raw_task.as_ptr();let header ptr as *const Header;let mut output None;unsafe {// 由于很多时候JoinHandle不用会在刚创建的时候直接drop掉因此针对这种情// 况作一个特殊化处理。这样一个原子操作就完成了。if let Err(mut state) (*header).state.compare_exchange_weak(SCHEDULED | HANDLE | REFERENCE,SCHEDULED | REFERENCE,Ordering::AcqRel,Ordering::Acquire,) {loop {// 如果task完成了但是还没有close掉说明output还没有被取走需// 要在这里取出来进行析构。if state COMPLETED ! 0 state CLOSED 0 {// 标记为closed这样就可以安全地读取output的数据。match (*header).state.compare_exchange_weak(state,state | CLOSED,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) { output Some((((*header).vtable.get_output)(ptr) as *mut R).read());// 更新状态重新循环state | CLOSED;}Err(s) state s,}} else {// 进到这里说明task要么没完成要么已经closed了。let new if state (!(REFERENCE - 1) | CLOSED) 0 {// Task和Waker都已经没了并且没closed根据进else的条// 件可知task没完成Future还在重新schedule一次让// executor把Future析构掉。SCHEDULED | CLOSED | REFERENCE} else {// 移除HANDLE flagstate !HANDLE};match (*header).state.compare_exchange_weak(state,new,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) {// 如果这是最后一个引用if state !(REFERENCE - 1) 0 { if state CLOSED 0 {//并且没closed根据进else的条件可知task没// 完成,重新schedule一次析构Future((*header).vtable.schedule)(ptr);} else {// task已经完成了output也已经在上面读出// 来了同时也是最后一个引用需要把task自// 身析构掉。((*header).vtable.destroy)(ptr);}}// 还有其他引用在资源的释放由他们负责。break;}Err(s) state s,}}}}}// 析构读取出来的outputdrop(output);} }JoinHandle::poll检查Output是否已经可以拿没有的话注册cx.waker()等通知。implR, T Future for JoinHandleR, T {type Output OptionR;fn poll(self: Pinmut Self, cx: mut Context_) - PollSelf::Output {let ptr self.raw_task.as_ptr();let header ptr as *const Header;unsafe {let mut state (*header).state.load(Ordering::Acquire);loop {// task已经closed了没output可拿。if state CLOSED ! 0 {// 大部分可情况下header里的awaiter就是cx.waker也有例外因// 此一并进行通知。(*header).notify_unless(cx.waker());return Poll::Ready(None);}// 如果task还没完成if state COMPLETED 0 {// 那么注册当前的cx.waker到Header::awaiter里这样完成了可以收// 到通知。abort_on_panic(|| {(*header).swap_awaiter(Some(cx.waker().clone()));});// 要是在上面注册前正好task完成了那么就收不到通知了因此注册后// 需要重新读取下状态看看。state (*header).state.load(Ordering::Acquire);// task已经closed了没output可拿返回None。if state CLOSED ! 0 {// 这里我分析下来是不需要再通知了提了个pr等作者回应。(*header).notify_unless(cx.waker());return Poll::Ready(None);}// task还没完成上面已经注册了waker可以直接返回Pending。if state COMPLETED 0 {return Poll::Pending;}}// 到这里说明task已经完成了。把它设置为closed状态就可以拿output了。match (*header).state.compare_exchange(state,state | CLOSED,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) {// 设置closed成功通知其他的awaiter。由于上面是原子的swap操// 作且一旦设置为closedawaiter就不会再变更了因此可以// 用AWAITER这个flag进行快速判断。if state AWAITER ! 0 {(*header).notify_unless(cx.waker());}// 读取出Output并返回。let output ((*header).vtable.get_output)(ptr) as *mut R;return Poll::Ready(Some(output.read()));}Err(s) state s,}}}} }Task的实现分析Task::schedule这个函数先通过Task内部保存的指针指向Header并从Header的vtable字段中拿到schedule函数指针这个函数最终调用的是用户调用spawn时传入的schedule闭包。因此本身很直接。Task::run这个函数先通过Task内部保存的指针指向Header并从Header的vtable字段中拿到run函数指针其指向RawTask::run实现如下首先根据指针参数强转为RawTask并根据Header的vtable拿到RawWakerVTable构造好Waker和Context,为调用Future::poll做准备。unsafe fn run(ptr: *const ()) {let raw Self::from_ptr(ptr);let waker ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr,(*raw.header).vtable.raw_waker,)));let cx mut Context::from_waker(waker);//... }然后获取当前的state循环直到更新state的RUNING成功为止。 let mut state (*raw.header).state.load(Ordering::Acquire);loop {// 如果task已经closed那么Future可以直接析构掉并返回。if state CLOSED ! 0 {if state AWAITER ! 0 {(*raw.header).notify();}Self::drop_future(ptr);// 扣掉当前task的引用计数因为run函数的参数是self。Self::decrement(ptr);return;}// 移除SCHEDULED状态并标记RUNINGmatch (*raw.header).state.compare_exchange_weak(state,(state !SCHEDULED) | RUNNING,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) {// 更新state到新的状态后面的代码还要复用state。state (state !SCHEDULED) | RUNNING;break;}Err(s) state s,}}标记为RUNING状态后就可以开始正式调用Future::poll了不过在调用前设置Guard以便poll函数panic时可以调用Guard的drop函数保证状态一致。 let guard Guard(raw);let poll F as Future::poll(Pin::new_unchecked(mut *raw.future), cx);mem::forget(guard); // 没panic移除掉guard.drop的调用。match poll {Poll::Ready(out) {/// ... }Poll::Pending {// ... }}如果Future完成了那么先把Future析构掉腾出内存把output写进去。并循环尝试将RUNING状态去掉。match poll {Poll::Ready(out) {Self::drop_future(ptr);raw.output.write(out);let mut output None;loop {// JoinHandle已经没了那么output没人取我们需要析构掉output并设置为// closed状态。let new if state HANDLE 0 {(state !RUNNING !SCHEDULED) | COMPLETED | CLOSED} else {(state !RUNNING !SCHEDULED) | COMPLETED};match (*raw.header).state.compare_exchange_weak(state,new,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) {// 如果handle没了或者跑的时候closed了那么需要把output再读取// 出来析构掉。if state HANDLE 0 || state CLOSED ! 0 {output Some(raw.output.read());}// 通知JoinHandle来取数据。if state AWAITER ! 0 {(*raw.header).notify();}Self::decrement(ptr);break;}Err(s) state s,}}drop(output);}Poll::Pending {// ...} 如果没完成的话循环尝试移除RUNING同时在poll的时候其他线程不能调用shedule函数而是设置SCHEDULED所以需要检查这个flag如果设置了则需要代劳。match poll {Poll::Ready(out) {/// handle ready case ... }Poll::Pending {loop {// poll的时候closed了这里为啥要移除SCHEDULED状态暂时不清楚需要问问// 作者。let new if state CLOSED ! 0 {state !RUNNING !SCHEDULED} else {state !RUNNING};match (*raw.header).state.compare_exchange_weak(state,new,Ordering::AcqRel,Ordering::Acquire,) {Ok(state) {if state CLOSED ! 0 {// 设置closed状态的那个线程是不能碰Future的否则和当前线程// 产生内存并发访问冲突。因此代劳析构操作。Self::drop_future(ptr);Self::decrement(ptr);} else if state SCHEDULED ! 0 {// poll的时候其他线程想schedule这个task但是不能调用因此// 当前线程代劳。 chedule函数接收self类似move语义,因此这里// 不需要decrement。Self::schedule(ptr);} else {Self::decrement(ptr);}break;}Err(s) state s,}}} }在poll时如果发生panic则Guard负责收拾残局。fn drop(mut self) {let raw self.0;let ptr raw.header as *const ();unsafe {let mut state (*raw.header).state.load(Ordering::Acquire);loop {// poll的时候被其他线程closed了if state CLOSED ! 0 {// 看代码state一旦处于CLOSED后schedule不会再运行。这里为啥要移除// SCHEDULED状态暂时不清楚需要问问作者。(*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);// 析构FutureRawTask::F, R, S, T::drop_future(ptr);RawTask::F, R, S, T::decrement(ptr);break;}match (*raw.header).state.compare_exchange_weak(state,(state !RUNNING !SCHEDULED) | CLOSED,Ordering::AcqRel,Ordering::Acquire,) {Ok(state) {// 析构FutureRawTask::F, R, S, T::drop_future(ptr);// 通知awaitertask已经close了.if state AWAITER ! 0 {(*raw.header).notify();}RawTask::F, R, S, T::decrement(ptr);break;}Err(s) state s,}}} }Waker相关函数的实现wake函数wake函数主要功能是设置SCHEDULE状态并尝试调用schedule函数有两个重要的细节需要注意task正在执行时不能调用schedule函数当task已经被schedule过了时也需要额外做一次原子操作施加Release语义。unsafe fn wake(ptr: *const ()) {let raw Self::from_ptr(ptr);let mut state (*raw.header).state.load(Ordering::Acquire);loop { if state (COMPLETED | CLOSED) ! 0 {// 如果task完成或者close了直接drop掉自己wake的参数是self语义Self::decrement(ptr);break;}if state SCHEDULED ! 0 {// 这段代码极为关键如果task已经schedule过了则重新把读出来的state// 设置回去虽然看起来好像是无用的其实是为了施加Release同步语义// 把当前线程的内存视图同步到其他线程去。即便是rust标准库之前也因为// 没处理好类似这个情况出过bug。match (*raw.header).state.compare_exchange_weak(state,state,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) {Self::decrement(ptr);break;}Err(s) state s,}} else {// task没schedule过则设置状态。match (*raw.header).state.compare_exchange_weak(state,state | SCHEDULED,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) {// 如果task当前没有运行那么可以调用schedule函数。if state (SCHEDULED | RUNNING) 0 {// Schedule the task.let task Task {raw_task: NonNull::new_unchecked(ptr as *mut ()),_marker: PhantomData,};(*raw.schedule)(task);} else {// task正在运行不需要调用schedule等运行结束后对应的// 线程会代劳。Self::decrement(ptr);}break;}Err(s) state s,}}} }wake_by_ref这个函数的功能和wake类似唯一的区别就是wake的参数是self有move语义wakebyref是self。实现差异不大就不做具体分析了。clone_wakerwaker的clone实现也比较简单直接将Header里的state的引用计数加一即可。unsafe fn clone_waker(ptr: *const ()) - RawWaker {let raw Self::from_ptr(ptr);let raw_waker (*raw.header).vtable.raw_waker;let state (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);if state isize::max_value() as usize {std::process::abort();}RawWaker::new(ptr, raw_waker) }总结整个task的设计非常精细api也非常直观难怪一发布就直接上1.0版本。
http://www.zqtcl.cn/news/918948/

相关文章:

  • 沈阳市浑南区城乡建设局网站淄博哪里有网站建设平台
  • 做不锈钢管网站口碑好的定制网站建设提供商
  • 做网站推广销售wordpress 随机页面
  • 陈坤做直播在哪个网站如何在建设银行网站预约纪念币
  • 如何做网站么新网站一天做多少外链
  • 用家用路由器ip做网站营销策略方案
  • 学历教育网站建设网页前端是什么
  • 相同网站名网站县区分站点建设
  • 医疗器械网站建设方案南京网站制作系统
  • 小网站托管费用企查宝企业查询
  • 专门做特卖的网站是什么外国炫酷网站网址
  • 学习网站的建设wordpress批量拿shell
  • 中企动力做的网站推软件
  • 北京财优化沧州seo公司
  • 收到网站代码后怎么做啥是东莞网站优化推广
  • 重庆商城网站开发网站建设中英版
  • 免费企业网站开发给酒吧做网站
  • 想用自己电脑做服务器做个网站吗网站制作工作室哪家比较好
  • 这样建立网站vs2008做网站
  • 做网站创业故事好看大方的企业网站源码.net
  • 做家常菜哪个网站最好香蜜湖附近网站建设
  • 网站index.php被修改seo网络推广经理招聘
  • 南京做网站联系南京乐识网站建设培训福州
  • 比较冷门的视频网站做搬运网站建设 分析
  • 网站开发实习计划模板有做数学题的网站吗
  • 汕头 网站网页设计图片轮播切换
  • 免费ui网站美橙网站设计
  • 网站建设 海口哪里有网站设计公司
  • 广西建设监理协会官方网站网站建设的需求文档
  • 网站后台怎么做飘窗wordpress add_theme_page