博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
深入浅出Rust Future Part-3
阅读量:5884 次
发布时间:2019-06-19

本文共 7698 字,大约阅读时间需要 25 分钟。

译自
本文时间:2018-12-03,译者:
, 简介:motecshine

欢迎向Rust中文社区投稿, ,好文将在以下地方直接展示

  1. Rust中文社区Rust文章栏目
  2. 知乎专栏
  3. sf.gg专栏
  4. 微博

Intro

在这篇文章中我们将会讨论和阐释Reactor是如何工作的.在上篇文章中我们,我们频繁的使用Reactor来执行我们的Future,但是并没有阐述它是如何工作的。现在是时候阐明它了。

Reactor? Loop?

如果用一句话来描述Reactor,那应该是:

Reactor是一个环(
Loop)

举个栗子:

你决定通过Email邀请你喜欢的女孩或者男孩(emmm, 这个栗子听起来很老套), 你怀着忐忑的心将这份邮件发送出去,心里焦急着等待着, 不停的一遍又一遍的检查你的邮箱是否有新的回复. 直到收到回复。
Rust's Reactor就是这样, 你给他一个future, 他会不断的检查,直到这个future完成(或者返回错误). Reactor通过调用程序员实现的Poll函数,来检查Future是否已完成。你所要做的就是实现future poll 并且返回Poll<T, E>结构。但是 Reactor也不会无休止的对你的future function轮询。

A future from scratch

为了让我们能更容易理解Reactor知识,我们还是从零开始实现一个Future. 换句话说就是,我们将动手实现Future Trait.

#[derive(Debug)]struct WaitForIt {    message: String,    until: DateTime
, polls: u64,}

我们的结构体字段也很简单:

  • message: 自定义字符串消息体
  • polls: 轮循次数
  • util: 等待时间

我们还会实现 WaitFotIt结构体的new方法.这个方法作用是初始化WaitForIt

impl WaitForIt {    pub fn new(message: String, delay: Duration) -> WaitForIt {        WaitForIt {            polls: 0,            message: message,            until: Utc::now() + delay,        }    }}impl Future for WaitForIt {    type Item = String;    type Error = Box
; fn poll(&mut self) -> Poll
{ let now = Utc::now(); if self.until < now { Ok(Async::Ready( format!("{} after {} polls!", self.message, self.polls), )) } else { self.polls += 1; println!("not ready yet --> {:?}", self); Ok(Async::NotReady) } }}

让我们逐步解释

type Item = String;    type Error = Box
;

上面两行在RUST里被叫做associated types, 意思就是Future在将来完成时返回的值(或者错误).

fn poll(&mut self) -> Poll
{}

定义轮询的方法。Self::Item, Self::Error 是我们定义的associated types站位符。在我们的例子中,该方法如下:

fn poll(&mut self) - > Poll 
>

现在看看我们的逻辑代码:

let now = Utc::now();if self.until < now {// 告诉reactor `Future` 已经完成了!} else {// 告诉 reactor `Future` 还没准备好,过会儿再来。}

Rust里我们该怎样告诉Reactor某个Future已经完成了?很简单使用枚举

Ok(Async::NotReady(.......)) // 还没完成Ok(Async::Ready(......)) // 完成了

让我们来实现上述的方法:

impl Future for WaitForIt {    type Item = String;    type Error = Box
; fn poll(&mut self) -> Poll
{ let now = Utc::now(); if self.until < now { Ok(Async::Ready( format!("{} after {} polls!", self.message, self.polls), )) } else { self.polls += 1; println!("not ready yet --> {:?}", self); Ok(Async::NotReady) } }}

为了让这段代码运行起来我们还需要:

extern crate chrono;extern crate futures;extern crate tokio_core;use futures::done;use futures::prelude::*;use futures::future::{err, ok};use tokio_core::reactor::Core;use std::error::Error;use futures::prelude::*;use futures::*;use chrono::prelude::*;use chrono::*;fn main() {    let mut reactor = Core::new().unwrap();    let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));    println!("wfi_1 == {:?}", wfi_1);    let ret = reactor.run(wfi_1).unwrap();    println!("ret == {:?}", ret);}

运行!! 等待一秒我们将会看到结果:

Running `target/debug/tst_fut_create`wfi_1 == WaitForIt { message: "I\'m done:", until: 2017-11-07T16:07:06.382232234Z, polls: 0 }not ready yet --> WaitForIt { message: "I\'m done:", until: 2017-11-07T16:07:06.382232234Z, polls: 1 }

emmm~, 只运行一次就被卡住了, 但是没有额外的消耗CPU.但是为什么会这样?

如果不明确告诉
Reactor
Reactor是不会再次轮询停放(park)给它的
Future.

(- 译注: Park: 翻译成停放其实也挺好的,就像车场的停车位一样.)

在我们的例子里, Reactor会立即执行我们停放的Future方法, 当我们返回Async::NotReady, 它就会认为当前停放的Future还未完成。如果我们不主动去解除停放,Reactor永远也不会再次调用。

空闲中的Reactor是不会消耗CPU的。这样看起来Reactor效率还是很高的。

在我们的电子邮件示例中,我们可以避免手动检查邮件并等待通知。 所以我们可以在此期间自由玩Doom。(emm~看来作者很喜欢这款游戏).

另一个更有意义的示例可能是从网络接收数据。 我们可以阻止我们的线程等待网络数据包,或者我们等待时可以做其他事情。 您可能想知道为什么这种方法比使用OS线程更好?

Unparking

我们该如何纠正我们例子?我们需要以某种方式取消我们的Future。 理想情况下,我们应该有一些外部事件来取消我们的Future(例如按键或网络数据包),但是对于我们的示例,我们将使用这个简单的行手动取消停放

futures::task::current().notify();

像这样:

impl Future for WaitForIt {    type Item = String;    type Error = Box
; fn poll(&mut self) -> Poll
{ let now = Utc::now(); if self.until < now { Ok(Async::Ready( format!("{} after {} polls!", self.message, self.polls), )) } else { self.polls += 1; println!("not ready yet --> {:?}", self); futures::task::current().notify(); Ok(Async::NotReady) } }}

现在代码完成了。 请注意,在我的情况下,该函数已被调用超过50k次, CPU占用也很高!

这是严重的浪费,也清楚地说明你为什么需要在某个合理的时间点去Unpark Future.( That's a waste of resources and clearly demonstrates why you should unpark your future only when something happened. )

另请注意循环如何仅消耗单个线程。 这是设计和效率的来源之一。 当然,如果需要,您可以使用更多线程。

Joining

Reactor可以同时运行多个Future,这也是他为什么如此有 那么我们该如何充分利用单线程: 当一个Future被停放的时候, 另一个可以继续工作。

对于这个例子,我们将重用我们的WaitForIt结构。 我们只是同时调用两次。 我们开始创建两个Future的实例:

let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));println!("wfi_1 == {:?}", wfi_1);let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(1));println!("wfi_2 == {:?}", wfi_2);

现在我们来调用futures::future::join_all, 他需要一个vec![]迭代器, 并且返回枚举过的Future

let v = vec![wfi_1, wfi_2];let sel = join_all(v);

我们重新实现的代码像这样:

fn main() {    let mut reactor = Core::new().unwrap();    let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));    println!("wfi_1 == {:?}", wfi_1);    let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(1));    println!("wfi_2 == {:?}", wfi_2);    let v = vec![wfi_1, wfi_2];    let sel = join_all(v);    let ret = reactor.run(sel).unwrap();    println!("ret == {:?}", ret);}

这里的关键点是两个请求是交错的:第一个Future被调用,然后是第二个,然后是第一个,依此类推,直到两个完成。 如上图所示,第一个Future在第二个之前完成。 第二个在完成之前被调用两次。

Select

Future的特性还有很多功能。 这里值得探讨的另一件事是select函数。 select函数运行两个(或者在select_all的情况下更多)Future,并返回第一个完成。 这对于实现超时很有用。 我们的例子可以简单:

fn main() {    let mut reactor = Core::new().unwrap();    let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));    println!("wfi_1 == {:?}", wfi_1);    let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(2));    println!("wfi_2 == {:?}", wfi_2);    let v = vec![wfi_1, wfi_2];    let sel = select_all(v);    let ret = reactor.run(sel).unwrap();    println!("ret == {:?}", ret);}

Closing remarks

下篇将会创建一个更RealFuture.

可运行的代码

extern crate chrono;extern crate futures;extern crate tokio_core;use futures::done;use futures::prelude::*;use futures::future::{err, ok};use tokio_core::reactor::Core;use std::error::Error;use futures::prelude::*;use futures::*;use chrono::prelude::*;use chrono::*;use futures::future::join_all;#[derive(Debug)]struct WaitForIt {    message: String,    until: DateTime
, polls: u64,}impl WaitForIt { pub fn new(message: String, delay: Duration) -> WaitForIt { WaitForIt { polls: 0, message: message, until: Utc::now() + delay, } }}iml Future for WaitForIt { type Item = String; type Error = Box
; fn poll(&mut self) -> Poll
{ let now = Utc::now(); if self.until < now { Ok(Async::Ready( format!("{} after {} polls!", self.message, self.polls), )) } else { self.polls += 1; println!("not ready yet --> {:?}", self); futures::task::current().notify(); Ok(Async::NotReady) } }}fn main() { let mut reactor = Core::new().unwrap(); let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1)); println!("wfi_1 == {:?}", wfi_1); let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(1)); println!("wfi_2 == {:?}", wfi_2); let v = vec![wfi_1, wfi_2]; let sel = join_all(v); let ret = reactor.run(sel).unwrap(); println!("ret == {:?}", ret);}

转载地址:http://ielix.baihongyu.com/

你可能感兴趣的文章
hdu2159
查看>>
Windows7+VS2012下OpenGL 4的环境配置
查看>>
Maven for Eclipse 第一章 ——Maven的介绍
查看>>
Linux Kernel中断子系统来龙去脉浅析【转】
查看>>
Linux NFS服务器的安装与配置
查看>>
Ada boost学习
查看>>
Unity中SendMessage和Delegate效率比较
查看>>
Linux下EPoll通信模型简析
查看>>
react-native 制作购物车ShopCart
查看>>
Linux服务器 java生成的图片验证码乱码问题
查看>>
【转】QT中QDataStream中浮点数输出问题
查看>>
AD RMS之Windows 内部数据库迁移到 SQL 服务器
查看>>
记录我第一次在Android开发图像处理算法的经历
查看>>
mongodb3.2配置文件yaml格式 详解
查看>>
git设置默认编辑为vim
查看>>
android api (82) —— InputConnection [输入法]
查看>>
数据库事务的四大特性
查看>>
webshell 提升 for linux
查看>>
Java游戏开发中怎样才能获得更快的FPS?
查看>>
文件搜索工具
查看>>