Tokio - Rust 异步运行时
Tokio 是 Rust 生态中最流行的异步运行时,为编写高性能异步程序提供基础设施。
什么是 Tokio?
Tokio 是一个事件驱动的非阻塞 I/O 平台,用于编写异步应用程序。
核心组件
- Runtime:任务执行器和调度器
- Reactor:I/O 事件监听器(基于 mio)
- Async I/O:异步网络、文件系统操作
- Synchronization:异步同步原语
- Timers:异步定时器
Runtime 架构
多线程运行时
use tokio::runtime::Runtime;
fn main() {
// 创建多线程运行时
let rt = Runtime::new().unwrap();
rt.block_on(async {
println!("Hello from Tokio!");
});
}
// 使用宏简化
#[tokio::main]
async fn main() {
println!("Hello from Tokio!");
}
单线程运行时
use tokio::runtime::Builder;
fn main() {
let rt = Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
println!("Single-threaded runtime");
});
}
async/await 基础
async 函数
async fn fetch_data() -> Result<String, reqwest::Error> {
let response = reqwest::get("https://api.example.com/data").await?;
let text = response.text().await?;
Ok(text)
}
#[tokio::main]
async fn main() {
match fetch_data().await {
Ok(data) => println!("Data: {}", data),
Err(e) => eprintln!("Error: {}", e),
}
}
async 块
#[tokio::main]
async fn main() {
let future = async {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
println!("1 second passed");
};
future.await;
}
Future Trait
Future 定义
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct MyFuture {
value: Option<i32>,
}
impl Future for MyFuture {
type Output = i32;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(value) = self.value.take() {
Poll::Ready(value)
} else {
// 注册 Waker,等待唤醒
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
并发执行
spawn 任务
use tokio::task;
#[tokio::main]
async fn main() {
let handle1 = task::spawn(async {
println!("Task 1");
42
});
let handle2 = task::spawn(async {
println!("Task 2");
100
});
let result1 = handle1.await.unwrap();
let result2 = handle2.await.unwrap();
println!("Results: {}, {}", result1, result2);
}
join! 宏
use tokio::join;
#[tokio::main]
async fn main() {
let (res1, res2, res3) = join!(
async { 1 + 1 },
async { 2 * 2 },
async { 3 - 1 }
);
println!("{}, {}, {}", res1, res2, res3);
}
select! 宏
use tokio::select;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
select! {
_ = interval.tick() => println!("Tick"),
_ = sleep(Duration::from_secs(5)) => {
println!("Timeout!");
break;
}
}
}
}
异步 I/O
TCP Server
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = [0; 1024];
match socket.read(&mut buf).await {
Ok(n) if n == 0 => return,
Ok(n) => {
socket.write_all(&buf[0..n]).await.unwrap();
}
Err(e) => eprintln!("Error: {}", e),
}
});
}
}
Channels
mpsc Channel
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
tokio::spawn(async move {
for i in 0..10 {
tx.send(i).await.unwrap();
}
});
while let Some(msg) = rx.recv().await {
println!("Received: {}", msg);
}
}
oneshot Channel
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
tx.send("Hello from task!").unwrap();
});
let msg = rx.await.unwrap();
println!("{}", msg);
}
同步原语
Mutex
use tokio::sync::Mutex;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let data = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let data = Arc::clone(&data);
let handle = tokio::spawn(async move {
let mut lock = data.lock().await;
*lock += 1;
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
println!("Result: {}", *data.lock().await);
}
性能对比
异步 vs 同步
// 同步版本
fn sync_download() {
let urls = vec!["url1", "url2", "url3"];
for url in urls {
// 阻塞等待
let _ = reqwest::blocking::get(url);
}
}
// 异步版本(更高效)
async fn async_download() {
let urls = vec!["url1", "url2", "url3"];
let futures: Vec<_> = urls.iter()
.map(|url| reqwest::get(*url))
.collect();
// 并发执行
let results = futures_util::future::join_all(futures).await;
}
优势:
- 异步版本可以并发处理多个请求
- 不阻塞线程,充分利用资源
- 适合 I/O 密集型任务
小结
- ✅ Tokio 是 Rust 最流行的异步运行时
- ✅ 基于 Future + Reactor 模式
- ✅ 提供多线程任务调度(工作窃取)
- ✅ 丰富的异步 I/O 和同步原语
- ✅ 适合高并发 I/O 密集型应用
- ✅ 零成本抽象,性能接近手写状态机
探索更多:Runtime 架构 | 任务调度 | Reactor 模式