Rust中channel的使用

https://blog.csdn.net/weixin_44277699/article/details/124422055


关于Rust中的channel


Rust的channel是一种用于在不同线程间传递信息的通信机制,它实现了线程间的消息传递。

Channel允许在Rust中创建一个消息传递渠道,它返回一个元组结构体,其中包含发送和接收端。发送端用于向通道发送数据,而接收端则用于从通道接收数据。

每个channel由两部分组成:发送端(Sender)和接收端(Receiver)。

发送端用于向channel发送消息,而接收端则用于接收这些消息。这种机制允许线程之间的安全通信,避免了共享内存的复杂性和潜在的数据竞争问题。 (通过通信来共享内存,而非通过共享内存来通信)

Rust的channel为线程间通信提供了一种安全、简单的方式,是构建并发应用的基础工具之一。


channel是Rust标准库的一部分,自Rust 1.0版本以来就包含了这个功能。随着Rust语言和标准库的发展,channel的实现和API可能会有所改进,但其基本概念和用法保持一致。


使用方式


基本步骤如下:

  1. 创建: 使用std::sync::mpsc::channel()函数创建一个新的channel,这个函数返回一个包含发送端(Sender)和接收端(Receiver)的元组。

  2. 发送: 使用发送端的send方法发送消息。send方法接受一个消息值,如果接收端已经被丢弃,会返回一个错误。

  3. 接收: 使用接收端的recv方法接收消息。recv会阻塞当前线程直到一个消息可用,或者channel被关闭。

示例

以下是一个使用channel在两个线程间发送和接收消息的简单例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use std::sync::mpsc;
use std::thread;

fn main() {
// 创建一个channel
let (tx, rx) = mpsc::channel();

// 创建一个新线程,并向其中发送一个消息
thread::spawn(move || {
let msg = "Hello from the thread";
tx.send(msg).unwrap();
println!("Sent message: {}", msg);
});

// 在主线程中接收消息
let received = rx.recv().unwrap();
println!("Received message: {}", received);
}

上面例子展示了channel的基本方法:先创建一个channel,然后在一个新线程中发送一个字符串消息,并在主线程中接收这个消息。

注意: 发送端tx通过move关键字移动到新线程中,这是因为Rust的所有权规则要求确保使用数据的线程拥有该数据的所有权。


关于MPSC


其中mpsc是Multi producer, Single consumer FIFO queue的缩写,即多生产者单消费者先入先出队列

Rust标准库提供的channel是MPSC(多生产者,单消费者)模型,这意味着可以有多个发送端(Sender)向同一个接收端(Receiver)发送消息。这种模式非常适用于工作队列模型,其中多个生产者线程生成任务,而单个消费者线程处理这些任务。


除了MPSC之外,Rust channel 还支持其他几种模型:

  • SPSC(Single Producer Single Consumer):单生产者单消费者。只允许一个goroutine向通道发送,只允许一个goroutine从通道接收。

  • SPMC(Single Producer Multiple Consumer):单生产者多消费者。只允许一个goroutine向通道发送,允许多个goroutine从通道接收。

  • MPSC(Multi Producer Single Consumer):多生产者单消费者, rust中标准的mpsc模型。允许多个goroutine向通道发送,只允许一个goroutine从通道接收。

  • MPMC(Multi Producer Multi Consumer):多生产者多消费者。允许多个goroutine向通道发送和接收。Rust标准库没有提供这种通道,需要依靠第三方库如 crossbeam 来实现。

故而,Rust的通道主要支持SPSC、SPMC、MPSC三种模型

MPSC是标准库里最常用的一个模型

(如果需要多消费者,可以使用MPMC第三方库如crossbeam,或自己实现)


不需要阻塞吗?


主线程是否会立马结束退出程序?

在上面的示例中,如果主线程执行得太快,有可能在接收到 子线程发送消息之前就结束了,没打印出接收到的内容程序就退出了.

但事实上,并没有发生这种现象. 即便在新进程段添加休眠3s的代码,thread::sleep(std::time::Duration::from_secs(3));, 程序也不会提早退出.

关于Rust中程序的休眠,可参考Rust中程序休眠的几种方式


这是因为,recv方法是阻塞的,即 它会阻塞当前线程, 直到从通道中接收到消息。

因此,在上面例子中,主线程在调用rx.recv().unwrap()时会阻塞 等待消息的到来。一旦子线程通过tx.send(msg).unwrap();发送了消息,主线程会接收到这个消息并继续执行,之后程序才会正常退出。



探索更多阻塞方式


可以使用join方法,来确保主线程等待一个或多个子线程完成执行。这在处理多个线程时特别有用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use std::sync::mpsc;
use std::thread;

fn main() {
let (tx, rx) = mpsc::channel();

// 创建一个新线程,并保存其句柄
let handle = thread::spawn(move || {
let msg = "Hello from the thread";
tx.send(msg).unwrap();
println!("Sent message: {}", msg);
});

// 在主线程中接收消息
let received = rx.recv().unwrap();
println!("Received message: {}", received);

// 使用join等待子线程完成
handle.join().unwrap();
}

thread::spawn返回一个JoinHandle,通过调用这个句柄的join方法来确保主线程在子线程完成其执行之后才继续执行


但是因为recv方法本身就是阻塞的,已经确保了主线程会等待至少一个消息的到来,这时再使用join看起来没有太大必要。

但当有多个线程执行独立任务,且这些任务不一定涉及到主线程立即需要的通道通信时,join的作用就变得非常明显了, 如下示例展示了如何创建多个线程,并使用join确保它们都完成了工作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
use std::thread;
use std::time::Duration;

fn main() {
// 创建一个向量来存储子线程的句柄
let mut handles = vec![];

for i in 0..10 {
// 创建10个子线程
let handle = thread::spawn(move || {
println!("Thread {} is starting", i);
println!("--------------");
// 模拟工作负载,耗时1s
thread::sleep(Duration::from_secs(1));
println!("Thread {} has finished", i);
println!("~~~~~~~~~~~~~~");
});
handles.push(handle);
}


// 等待所有子线程完成
for handle in handles {
handle.join().unwrap();
}

println!("All threads have finished");
}

输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Thread 0 is starting
--------------
Thread 1 is starting
--------------
Thread 3 is starting
--------------
Thread 2 is starting
--------------
Thread 4 is starting
--------------
Thread 5 is starting
--------------
Thread 6 is starting
--------------
Thread 7 is starting
--------------
Thread 9 is starting
--------------
Thread 8 is starting
-------------- (到此都是立刻打印出来; 下面的输出等1s后一股脑打印出来)
Thread 0 has finished
~~~~~~~~~~~~~~
Thread 1 has finished
Thread 2 has finished
Thread 5 has finished
~~~~~~~~~~~~~~
~~~~~~~~~~~~~~
Thread 4 has finished
~~~~~~~~~~~~~~
Thread 6 has finished
~~~~~~~~~~~~~~
Thread 3 has finished
~~~~~~~~~~~~~~
~~~~~~~~~~~~~~
Thread 7 has finished
~~~~~~~~~~~~~~
Thread 8 has finished
~~~~~~~~~~~~~~
Thread 9 has finished
~~~~~~~~~~~~~~
All threads have finished

在这个例子中创建了10个子线程,每个子线程都模拟执行一些操作,然后在主线程中使用一个循环来join这些线程。

通过这种方式,即使这些子线程并没有向主线程发送任何消息,仍然能够确保它们都完成了各自的工作,然后程序才会退出。这就是join在处理多个线程时的优势所在。

使用join确保主线程等待所有子线程完成其任务,这在处理并行计算、执行多个独立任务时特别重要,因为这些任务可能不会立即或根本不会向主线程报告其完成状态。在这种情况下,如果没有使用join,主线程可能会在子线程完成它们的工作之前结束,导致程序提前退出,而且可能留下未完成的后台工作。



Rust channel的更多高阶用法


Rust中的channel不仅仅支持简单的消息传递,还可以用于实现更复杂的并发模式和高级用法。这些用法可以增加程序的灵活性和性能,特别是在处理大量数据、多线程任务或需要高度并行的场景中。


选择性接收(Select)


在处理多个channel时,可能希望能够选择性地接收多个来源的消息。

Rust的标准库目前并没有直接支持select机制,但是crossbeam-channel库提供了这样的功能,使得可以从多个channel中选择性地接收消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use crossbeam_channel::{select, unbounded};
use std::thread;

fn main() {
let (tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded();

thread::spawn(move || {
tx1.send(1).unwrap();
});

thread::spawn(move || {
tx2.send(2).unwrap();
});

select! {
recv(rx1) -> msg => println!("Received {} from rx1", msg.unwrap()),
recv(rx2) -> msg => println!("Received {} from rx2", msg.unwrap()),
}
}

cargo add crossbeam_channel 添加依赖库,

而后多次 cargo run, 可以发现,会在Received 1 from rx1
和Received 2 from rx2中随机打印其中一个


如上代码演示了如何在Rust中使用crossbeam-channel库实现选择性接收(select)机制。该机制允许程序从多个不同的channel中接收消息,而不是被限制在单一的channel上等待。这是通过select!宏来实现的,它可以监听多个channel,并在任一channel接收到消息时立即响应。

具体来说,代码的功能如下:

  1. 引入库:首先,引入了crossbeam_channelselectunbounded,以及std::threadcrossbeam_channel是一个提供了高性能channel实现的外部库,包括了select机制。unbounded用于创建一个无界(unbounded)的channel,即没有容量限制的channel。

  2. 创建无界channel:通过调用unbounded()函数,创建了两个无界channel,分别是tx1/rx1tx2/rx2。这里,tx1tx2是发送端(Sender),而rx1rx2是接收端(Receiver)。

  3. 发送消息:接下来,创建了两个线程,每个线程向各自的channel发送一个整数消息,第一个线程通过tx1发送1,第二个线程通过tx2发送2。这两个线程是并行执行的,因此发送操作是异步的。

  4. 选择性接收消息select!宏用于同时监听rx1rx2这两个接收端。当任一channel接收到消息时,select!宏会立即匹配到相应的分支并执行。这里有两个recv调用,分别对应两个接收端。一旦任一接收端接收到消息,对应的代码块就会执行,并打印出接收到的消息及其来源。msg.unwrap()用于获取Result类型中的消息值,前提是没有发生错误。

代码中的select!宏使得程序不必在单一的channel上阻塞等待,而是可以灵活地处理来自多个源的消息。这种模式在需要处理多个异步事件源时非常有用,例如在网络服务器或并发系统中处理来自不同客户端或任务的输入。

有点类似Go的select语句


迭代器接收


Receiver实现了Iterator,这意味着可以使用迭代器的方式接收所有可用的消息,直到channel被关闭。这种方式简化了接收端的代码,特别是当需要处理所有消息而不必关心接收的具体时机时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
use std::sync::mpsc;
use std::thread;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
for i in 1..=5 {
tx.send(i).unwrap();
}
});

// 通过迭代器接收消息
for received in rx {
println!("Received: {}", received);
}
}

输出:

1
2
3
4
5
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5


https://www.bilibili.com/video/BV1K2421F7dV

让我们讨论Rust语言中的channel通道及其使用。Channel通道允许在Rust中创建一个消息传递渠道,它返回一个元组结构体,其中包含发送和接收端。发送端用于向通道发送数据,而接收端则用于从通道接收数据。例如,如果你定义了一个结构体A,其中包含一个名为name的字符串字段,你可以将A的实例发送到通道中。这样,通道就专门用于接收A结构体的实例。

当你通过解构方式接收数据时,Rust允许你直接获取通道中的数据,而不是整个消息包装。这意味着如果你发送了一个结构体A的实例到通道,接收端将得到相同类型的实例。基本的使用方式就是创建发送者和接收者,通过它们来进行数据的发送和接收。

一个重要的概念是发送者和接收者的引用计数。当任一端的引用计数降至零时,通道会被自动释放。这与Rust的Arc智能指针类似,允许多个发送者或接收者共享对通道的访问。这意味着你可以通过克隆发送者来创建多个发送者,每个发送者都可以向通道发送数据。

通道的一个关键应用是在异步运行时中,如Rust的mini-tokio,用于执行异步任务。你可以创建一个通道来存储任务,这些任务封装了将要执行的Future。这样,不同的线程可以作为执行器,发送和接收任务,通过通道协调任务的执行。

例如,如果你创建了一个通道,并为每个线程克隆了一个发送者,这些线程可以向通道发送封装了异步代码的任务。接收端,如执行器,可以从通道中接收这些任务并执行它们。这种方式允许跨线程的异步任务调度和执行,是Rust异步编程中的一种常见模式。

在实践中,将发送者和接收者分布到不同的函数或文件中可以提高代码的模块化和可读性。这样,一部分代码负责发送数据,而另一部分代码则负责接收和处理这些数据,从而实现更清晰、更易于管理的代码结构。

总之,Rust的channel通道是一种强大的并发通信机制,它支持高效的数据传递和异步任务执行,是Rust编程中不可或缺的一部分。


现在,让我们谈谈Rust中的channels(通道)。channels是Rust中实现线程间通信的一种方式,允许一个线程向另一个线程发送数据。当你创建一个通道时,它会返回一个包含发送者(sender)和接收者(receiver)的元组。发送者用于向通道发送数据,而接收者用于从通道接收数据。

以一个简单的例子为例,假设我们定义了一个结构体A,包含一个名为name的字符串字段。我们可以创建一个专门用于接收A类型数据的通道。在发送数据时,我们通过发送者将A实例发送到通道中。接收数据时,我们通过接收者从通道中取出数据。如果通道中有数据,接收者将能够成功接收到A类型的数据。

一个重要的概念是,当通道的发送者或接收者数量减至零时,通道会被自动释放。这意味着,如果所有的发送者都已经被丢弃,那么通道会被关闭,反之亦然。这个机制对于管理资源和避免悬挂线程非常重要。

通道在异步编程中尤为重要,因为它们允许多个线程或任务之间安全地传递信息。例如,在一个异步运行时中,通道可以用于将任务(futures)从生产者(可能是多个线程)传递给消费者(执行器),这些任务随后将被调度执行。这种机制使得在复杂的异步应用程序中管理和调度任务变得更加容易。

简而言之,Rust的通道提供了一种强大的方式来进行线程间或任务间的通信,它是构建并发和异步Rust应用程序的关键组件。