Tokio - Rust 异步运行时

Tokio 是 Rust 生态中最流行的异步运行时,为编写高性能异步程序提供基础设施。

什么是 Tokio?

Tokio 是一个事件驱动的非阻塞 I/O 平台,用于编写异步应用程序。

核心组件

  • Runtime:任务执行器和调度器
  • Reactor:I/O 事件监听器(基于 mio)
  • Async I/O:异步网络、文件系统操作
  • Synchronization:异步同步原语
  • Timers:异步定时器

Runtime 架构

Tokio Runtime 架构可视化

📦 全局任务队列(Global Queue)
File I/O
Network
线程 0运行中
HTTP Request
3s
线程 1运行中
DB Query
2s
线程 2空闲
本地队列为空
线程 3空闲
本地队列为空
工作窃取(Work Stealing)
当线程 2 和 3 空闲时,它们会从线程 0 和 1 的本地队列中窃取任务来执行,实现负载均衡。
🎯 多线程运行时
Tokio 默认创建与 CPU 核心数相同的工作线程,每个线程有独立的本地队列。
⚖️ 负载均衡
通过工作窃取算法,确保所有线程都有任务执行,避免某些线程空闲。

多线程运行时

use tokio::runtime::Runtime;

fn main() {
    // 创建多线程运行时
    let rt = Runtime::new().unwrap();

    rt.block_on(async {
        println!("Hello from Tokio!");
    });
}

// 使用宏简化
#[tokio::main]
async fn main() {
    println!("Hello from Tokio!");
}

单线程运行时

use tokio::runtime::Builder;

fn main() {
    let rt = Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    rt.block_on(async {
        println!("Single-threaded runtime");
    });
}

async/await 基础

async 函数

async fn fetch_data() -> Result<String, reqwest::Error> {
    let response = reqwest::get("https://api.example.com/data").await?;
    let text = response.text().await?;
    Ok(text)
}

#[tokio::main]
async fn main() {
    match fetch_data().await {
        Ok(data) => println!("Data: {}", data),
        Err(e) => eprintln!("Error: {}", e),
    }
}

async 块

#[tokio::main]
async fn main() {
    let future = async {
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        println!("1 second passed");
    };

    future.await;
}

Future Trait

Future 状态机

pending
polling
ready
completed
PENDING
任务创建,等待首次 poll
📝 代码示例
async fn fetch_data() {
  let future = reqwest::get(url);

  // Poll 1: Pending
  // Poll 2: Pending
  // Poll 3: Ready!

  let data = future.await;
  data
}
🔍 Poll 机制
Poll::Pending:资源未就绪,注册 Waker
Poll::Ready(T):完成,返回结果
Waker 会在资源就绪时唤醒任务
⚙️ 状态转换规则
1. Pending → Polling: 执行器开始轮询 Future
2. Polling → Pending: 返回 Poll::Pending,等待唤醒
3. Polling → Ready: 资源就绪,准备完成
4. Ready → Completed: 返回 Poll::Ready(T)

Future 定义

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct MyFuture {
    value: Option<i32>,
}

impl Future for MyFuture {
    type Output = i32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if let Some(value) = self.value.take() {
            Poll::Ready(value)
        } else {
            // 注册 Waker,等待唤醒
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

并发执行

spawn 任务

use tokio::task;

#[tokio::main]
async fn main() {
    let handle1 = task::spawn(async {
        println!("Task 1");
        42
    });

    let handle2 = task::spawn(async {
        println!("Task 2");
        100
    });

    let result1 = handle1.await.unwrap();
    let result2 = handle2.await.unwrap();

    println!("Results: {}, {}", result1, result2);
}

join! 宏

use tokio::join;

#[tokio::main]
async fn main() {
    let (res1, res2, res3) = join!(
        async { 1 + 1 },
        async { 2 * 2 },
        async { 3 - 1 }
    );

    println!("{}, {}, {}", res1, res2, res3);
}

select! 宏

use tokio::select;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let mut interval = tokio::time::interval(Duration::from_secs(1));

    loop {
        select! {
            _ = interval.tick() => println!("Tick"),
            _ = sleep(Duration::from_secs(5)) => {
                println!("Timeout!");
                break;
            }
        }
    }
}

异步 I/O

TCP Server

use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = [0; 1024];

            match socket.read(&mut buf).await {
                Ok(n) if n == 0 => return,
                Ok(n) => {
                    socket.write_all(&buf[0..n]).await.unwrap();
                }
                Err(e) => eprintln!("Error: {}", e),
            }
        });
    }
}

Channels

mpsc Channel

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);

    tokio::spawn(async move {
        for i in 0..10 {
            tx.send(i).await.unwrap();
        }
    });

    while let Some(msg) = rx.recv().await {
        println!("Received: {}", msg);
    }
}

oneshot Channel

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        tx.send("Hello from task!").unwrap();
    });

    let msg = rx.await.unwrap();
    println!("{}", msg);
}

同步原语

Mutex

use tokio::sync::Mutex;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let data = Arc::new(Mutex::new(0));

    let mut handles = vec![];

    for _ in 0..10 {
        let data = Arc::clone(&data);
        let handle = tokio::spawn(async move {
            let mut lock = data.lock().await;
            *lock += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.await.unwrap();
    }

    println!("Result: {}", *data.lock().await);
}

性能对比

异步 vs 同步

// 同步版本
fn sync_download() {
    let urls = vec!["url1", "url2", "url3"];
    for url in urls {
        // 阻塞等待
        let _ = reqwest::blocking::get(url);
    }
}

// 异步版本(更高效)
async fn async_download() {
    let urls = vec!["url1", "url2", "url3"];
    let futures: Vec<_> = urls.iter()
        .map(|url| reqwest::get(*url))
        .collect();

    // 并发执行
    let results = futures_util::future::join_all(futures).await;
}

优势

  • 异步版本可以并发处理多个请求
  • 不阻塞线程,充分利用资源
  • 适合 I/O 密集型任务

小结

  • ✅ Tokio 是 Rust 最流行的异步运行时
  • ✅ 基于 Future + Reactor 模式
  • ✅ 提供多线程任务调度(工作窃取)
  • ✅ 丰富的异步 I/O 和同步原语
  • ✅ 适合高并发 I/O 密集型应用
  • ✅ 零成本抽象,性能接近手写状态机

探索更多:Runtime 架构 | 任务调度 | Reactor 模式