Многонишково програмиране
21 ноември 2024
Fearless concurrency
Fearless concurrency
Rust предотвратява data races
Fearless concurrency
Rust предотвратява data races
- две нишки достъпват една и съща памет
- поне единия достъп е за писане
- достъпите не са синхронизирани
Fearless concurrency
Rust предотвратява data races
- две нишки достъпват една и съща памет
- поне единия достъп е за писане
- достъпите не са синхронизирани
Rust закодира в типовата система понятието за thread safety
Fearless concurrency
Rust предотвратява data races
- две нишки достъпват една и съща памет
- поне единия достъп е за писане
- достъпите не са синхронизирани
Rust закодира в типовата система понятието за thread safety
- кои обекти или операции могат да се използват безопасно в паралелен код
- компилационна грешка при нарушаване
Fearless concurrency
Rust предотвратява data races
- две нишки достъпват една и съща памет
- поне единия достъп е за писане
- достъпите не са синхронизирани
Rust закодира в типовата система понятието за thread safety
- кои обекти или операции могат да се използват безопасно в паралелен код
- компилационна грешка при нарушаване
Rust не може да предотврати логически бъгове - race conditions, deadlocks и др.
Fearless concurrency
Rust предотвратява data races
- две нишки достъпват една и съща памет
- поне единия достъп е за писане
- достъпите не са синхронизирани
Rust закодира в типовата система понятието за thread safety
- кои обекти или операции могат да се използват безопасно в паралелен код
- компилационна грешка при нарушаване
Rust не може да предотврати логически бъгове - race conditions, deadlocks и др.
- но добри абстракции помагат с това
Нишки
thread::spawnпуска нова нишка на операционната система- подадената функция се изпълнява в новата нишка
- когато функцията завърши, нишката се спира
use std::thread;
fn main() {
thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
}
use std::thread;
fn main() {
thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
}
Нишки
- програмата приключва, когато главната нишка завърши
- останалите нишки се убиват
use std::thread;
fn main() {
thread::spawn(|| {
// това няма да се принтира, защото програмата
// ще завърши преди втората нишка да е започнала
println!("hi from spawned thread");
});
println!("hi from main thread");
}
hi from main thread
use std::thread;
fn main() {
thread::spawn(|| {
// това няма да се принтира, защото програмата
// ще завърши преди втората нишка да е започнала
println!("hi from spawned thread");
});
println!("hi from main thread");
}
Нишки
- ако искаме да изчакаме, трябва да си запазим
JoinHandle join()блокира, докато нишката не приключи
use std::thread;
fn main() {
let handle = thread::spawn(|| {
// това ще се принтира, защото главната нишка
// ни изчаква
println!("hi from spawned thread")
});
println!("hi from main thread");
let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread;
fn main() {
let handle = thread::spawn(|| {
// това ще се принтира, защото главната нишка
// ни изчаква
println!("hi from spawned thread")
});
println!("hi from main thread");
let _ = handle.join();
}
Нишки
Сигнатурата на std::thread::spawn
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
Нишки
spawnвръщаJoinHandle- можем да използваме
joinза да изчакаме пуснатите нишки - когато
JoinHandleсе drop-не нишката се detach-ва
use std::thread;
fn main() {
let handle = thread::spawn(|| {
// това ще се принтира, защото изчакваме
println!("hi from spawned thread")
});
println!("hi from main thread");
let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread;
fn main() {
let handle = thread::spawn(|| {
// това ще се принтира, защото изчакваме
println!("hi from spawned thread")
});
println!("hi from main thread");
let _ = handle.join();
}
Нишки
- типа
TвJoinHandle<T>е резултата от подадената функция - обикновенно ще е
(), но може да се върне и друго
use std::thread;
fn main() {
let handle = thread::spawn(|| {
// very hard computation ...
42
});
let answ = handle.join();
println!("The answer is {:?}", answ);
}
The answer is Ok(42)
use std::thread;
fn main() {
let handle = thread::spawn(|| {
// very hard computation ...
42
});
let answ = handle.join();
println!("The answer is {:?}", answ);
}
Нишки
joinвръщаErr, ако във функцията е имало паника
use std::thread;
fn main() {
let handle = thread::spawn(|| {
panic!("too hard computation ...");
});
let answ = handle.join();
println!("The answer is {:?}", answ);
}
thread '<unnamed>' panicked at src/bin/main_480a341d0afaddfcfc405e2dd9b455c8eb27e962.rs:5:9: too hard computation ... note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace The answer is Err(Any { .. })
use std::thread;
fn main() {
let handle = thread::spawn(|| {
panic!("too hard computation ...");
});
let answ = handle.join();
println!("The answer is {:?}", answ);
}
Panic в нишка
panic!в нишка unwind-ва стека и убива нишката
Panic в нишка
panic!в нишка unwind-ва стека и убива нишката- ако това е главната нишка,
panic!убива програмата (и всички други нишки)
Panic в нишка
panic!в нишка unwind-ва стека и убива нишката- ако това е главната нишка,
panic!убива програмата (и всички други нишки) - ако не е главната нишка - връща се грешка от
join()
Panic в нишка
panic!в нишка unwind-ва стека и убива нишката- ако това е главната нишка,
panic!убива програмата (и всички други нишки) - ако не е главната нишка - връща се грешка от
join() - паниката може да се продължи с
join().unwrap()
Споделяне на стойности
Споделяне на стойности
Нека искаме да достъпим една и съща стойност от няколко нишки.
Тривиалният подход …
use std::thread;
fn main() {
let nums = vec![0, 1, 2, 3];
let handle = thread::spawn(|| {
for i in &nums {
println!("number {}", i);
}
});
let _ = handle.join();
}
Споделяне на стойности
Нека искаме да достъпим една и съща стойност от няколко нишки.
Тривиалният подход - води до компилационна грешка.
use std::thread;
fn main() {
let nums = vec![0, 1, 2, 3];
let handle = thread::spawn(|| {
for i in &nums {
println!("number {}", i);
}
});
let _ = handle.join();
}
error[E0373]: closure may outlive the current function, but it borrows `nums`, which is owned by the current function --> src/bin/main_44c598e1dd6669cdea1e15fcdbf808719a029646.rs:6:32 | 6 | let handle = thread::spawn(|| { | ^^ may outlive borrowed value `nums` 7 | for i in &nums { | ---- `nums` is borrowed here | note: function requires argument type to outlive `'static` --> src/bin/main_44c598e1dd6669cdea1e15fcdbf808719a029646.rs:6:18 | 6 | let handle = thread::spawn(|| { | __________________^ 7 | | for i in &nums { 8 | | println!("number {}", i); 9 | | } 10 | | }); | |______^ help: to force the closure to take ownership of `nums` (and any other referenced variables), use the `move` keyword | 6 | let handle = thread::spawn(move || { | ++++ For more information about this error, try `rustc --explain E0373`. error: could not compile `rust` (bin "main_44c598e1dd6669cdea1e15fcdbf808719a029646") due to 1 previous error
use std::thread;
fn main() {
let nums = vec![0, 1, 2, 3];
let handle = thread::spawn(|| {
for i in &nums {
println!("number {}", i);
}
});
let _ = handle.join();
}
Споделяне на стойности
- новосъздадената нишка може да надживее функцията в която е извикана
fn thread1() {
println!("thread 1 started");
thread::spawn(|| {
println!("thread 2 started");
thread::sleep(std::time::Duration::from_millis(1));
println!("thread 2 will exit");
});
println!("thread 1 will exit");
}
fn main() {
let _ = thread::spawn(thread1).join();
println!("thread 1 exited");
thread::sleep(std::time::Duration::from_millis(100));
}
thread 1 started thread 1 will exit thread 2 started thread 1 exited thread 2 will exit
use std::thread;
fn thread1() {
println!("thread 1 started");
thread::spawn(|| {
println!("thread 2 started");
thread::sleep(std::time::Duration::from_millis(1));
println!("thread 2 will exit");
});
println!("thread 1 will exit");
}
fn main() {
let _ = thread::spawn(thread1).join();
println!("thread 1 exited");
thread::sleep(std::time::Duration::from_millis(100));
}
Споделяне на стойности
- новосъздадената нишка може да надживее функцията в която е извикана
- затова Rust не позволява да подадем референции към локални променливи.
- това се налага от ограничението на
spawn, която приемаF: 'static
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static
Споделяне на стойности
Ако използваме стойността само от новата нишка, можем да я преместим с move closure
use std::thread;
fn main() {
let nums = vec![0, 1, 2, 3];
let handle = thread::spawn(move || {
for i in &nums {
println!("number {}", i);
}
});
let _ = handle.join();
}
number 0 number 1 number 2 number 3
use std::thread;
fn main() {
let nums = vec![0, 1, 2, 3];
let handle = thread::spawn(move || {
for i in &nums {
println!("number {}", i);
}
});
let _ = handle.join();
}
Споделяне на стойности
Но това не би работило ако имаме повече от една нишка
use std::thread;
fn main() {
let nums = vec![0, 1, 2, 3];
let mut handles = vec![];
for _ in 0..2 {
handles.push(thread::spawn(move || {
for i in &nums {
println!("number {}", i);
}
}));
}
for h in handles {
let _ = h.join();
}
}
error[E0382]: use of moved value: `nums` --> src/bin/main_fb665c10ea221cdd41e075bfd1468b11553fe02f.rs:8:36 | 4 | let nums = vec![0, 1, 2, 3]; | ---- move occurs because `nums` has type `Vec<i32>`, which does not implement the `Copy` trait ... 7 | for _ in 0..2 { | ------------- inside of this loop 8 | handles.push(thread::spawn(move || { | ^^^^^^^ value moved into closure here, in previous iteration of loop 9 | for i in &nums { | ---- use occurs due to use in closure For more information about this error, try `rustc --explain E0382`. error: could not compile `rust` (bin "main_fb665c10ea221cdd41e075bfd1468b11553fe02f") due to 1 previous error
use std::thread;
fn main() {
let nums = vec![0, 1, 2, 3];
let mut handles = vec![];
for _ in 0..2 {
handles.push(thread::spawn(move || {
for i in &nums {
println!("number {}", i);
}
}));
}
for h in handles {
let _ = h.join();
}
}
Scoped threads
Един вариант е да използваме scoped threads API-то
use std::thread;
fn main() {
let nums = vec![0, 1, 2, 3];
thread::scope(|s| {
for _ in 0..2 {
s.spawn(|| {
for i in &nums {
println!("number {}", i);
}
});
}
});
}
number 0 number 1 number 2 number 3 number 0 number 1 number 2 number 3
use std::thread;
fn main() {
let nums = vec![0, 1, 2, 3];
thread::scope(|s| {
for _ in 0..2 {
s.spawn(|| {
for i in &nums {
println!("number {}", i);
}
});
}
});
}
Scoped threads
thread::scope(|s /*: thread::Scope<'_, '_> */| {
// тази функция се изпълнява в същата нишка
for _ in 0..2 {
// Scope::spawn създава нова нишка
// Новата нишка може да държи референции към локални променливи
s.spawn(|| {
for i in &nums {
println!("number {}", i);
}
});
}
// на края на функцията всички нишки създадени чрез Scope::spawn
// се join-ват.
});
use std::thread;
fn main() {
let nums = vec![1, 2, 3];
thread::scope(|s /*: thread::Scope<'_, '_> */| {
// тази функция се изпълнява в същата нишка
for _ in 0..2 {
// Scope::spawn създава нова нишка
// Новата нишка може да държи референции към локални променливи
s.spawn(|| {
for i in &nums {
println!("number {}", i);
}
});
}
// на края на функцията всички нишки създадени чрез Scope::spawn
// се join-ват.
});
}
Scoped threads
В сигнатурата на Scope::spawn ограничението е F: 'scope, а не F: 'static
impl<'scope, 'env> Scope<'scope, 'env> {
pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
where
F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope,
{ /* ... */ }
}
Споделяне на стойности
Друг вариант е да използваме нещо, което:
- притежава стойността - за да покрием ограничението
F: 'static - позволява споделяне на стойността
fn main() {
// TODO: какво да добавим тук?
let nums = vec![0, 1, 2, 3];
let mut handles = vec![];
for _ in 0..2 {
handles.push(thread::spawn(|| {
for i in &nums {
println!("number {}", i);
}
}));
}
for h in handles {
let _ = h.join();
}
}
error[E0373]: closure may outlive the current function, but it borrows `nums`, which is owned by the current function --> src/bin/main_9cb04f9a01531d4cd9e0ce9932649a0da8c1a35c.rs:9:36 | 9 | handles.push(thread::spawn(|| { | ^^ may outlive borrowed value `nums` 10 | for i in &nums { | ---- `nums` is borrowed here | note: function requires argument type to outlive `'static` --> src/bin/main_9cb04f9a01531d4cd9e0ce9932649a0da8c1a35c.rs:9:22 | 9 | handles.push(thread::spawn(|| { | ______________________^ 10 | | for i in &nums { 11 | | println!("number {}", i); 12 | | } 13 | | })); | |__________^ help: to force the closure to take ownership of `nums` (and any other referenced variables), use the `move` keyword | 9 | handles.push(thread::spawn(move || { | ++++ For more information about this error, try `rustc --explain E0373`. error: could not compile `rust` (bin "main_9cb04f9a01531d4cd9e0ce9932649a0da8c1a35c") due to 1 previous error
use std::thread;
fn main() {
// TODO: какво да добавим тук?
let nums = vec![0, 1, 2, 3];
let mut handles = vec![];
for _ in 0..2 {
handles.push(thread::spawn(|| {
for i in &nums {
println!("number {}", i);
}
}));
}
for h in handles {
let _ = h.join();
}
}
Споделяне на стойности - Rc
Rc позволява "споделена собственост" (shared ownership).
Това дали ще проработи?
fn main() {
let nums_vec = vec![0, 1, 2, 3];
let nums_rc = Rc::new(nums_vec);
let mut handles = vec![];
for _ in 0..2 {
let nums_rc = Rc::clone(&nums_rc);
handles.push(thread::spawn(move || {
for i in &*nums_rc {
println!("number {}", i);
}
}));
}
for h in handles {
let _ = h.join();
}
}
Споделяне на стойности - Rc
Rc позволява "споделена собственост" (shared ownership).
Това дали ще проработи? - не
fn main() {
let nums_vec = vec![0, 1, 2, 3];
let nums_rc = Rc::new(nums_vec);
let mut handles = vec![];
for _ in 0..2 {
let nums_rc = Rc::clone(&nums_rc);
handles.push(thread::spawn(move || {
for i in &*nums_rc {
println!("number {}", i);
}
}));
}
for h in handles {
let _ = h.join();
}
}
error[E0277]: `Rc<Vec<i32>>` cannot be sent between threads safely --> src/bin/main_1c4b38ddc008640338e13791daaf3eccafcab0dc.rs:11:36 | 11 | handles.push(thread::spawn(move || { | ------------- ^------ | | | | ______________________|_____________within this `{closure@src/bin/main_1c4b38ddc008640338e13791daaf3eccafcab0dc.rs:11:36: 11:43}` | | | | | required by a bound introduced by this call 12 | | for i in &*nums_rc { 13 | | println!("number {}", i); 14 | | } 15 | | })); | |_________^ `Rc<Vec<i32>>` cannot be sent between threads safely | = help: within `{closure@src/bin/main_1c4b38ddc008640338e13791daaf3eccafcab0dc.rs:11:36: 11:43}`, the trait `Send` is not implemented for `Rc<Vec<i32>>`, which is required by `{closure@src/bin/main_1c4b38ddc008640338e13791daaf3eccafcab0dc.rs:11:36: 11:43}: Send` note: required because it's used within this closure --> src/bin/main_1c4b38ddc008640338e13791daaf3eccafcab0dc.rs:11:36 | 11 | handles.push(thread::spawn(move || { | ^^^^^^^ note: required by a bound in `spawn` --> /rustc/f6e511eec7342f59a25f7c0534f1dbea00d01b14/library/std/src/thread/mod.rs:672:1 For more information about this error, try `rustc --explain E0277`. error: could not compile `rust` (bin "main_1c4b38ddc008640338e13791daaf3eccafcab0dc") due to 1 previous error
use std::thread;
use std::rc::Rc;
fn main() {
let nums_vec = vec![0, 1, 2, 3];
let nums_rc = Rc::new(nums_vec);
let mut handles = vec![];
for _ in 0..2 {
let nums_rc = Rc::clone(&nums_rc);
handles.push(thread::spawn(move || {
for i in &*nums_rc {
println!("number {}", i);
}
}));
}
for h in handles {
let _ = h.join();
}
}
Споделяне на стойности - Arc
Трябва да използваме Arc.
.
fn main() {
let nums_vec = vec![0, 1, 2, 3];
let nums_arc = Arc::new(nums_vec);
let mut handles = vec![];
for _ in 0..2 {
let nums_arc = Arc::clone(&nums_arc);
handles.push(thread::spawn(move || {
for i in &*nums_arc {
println!("number {}", i);
}
}));
}
for h in handles {
let _ = h.join();
}
}
number 0 number 1 number 2 number 3 number 0 number 1 number 2 number 3
use std::thread;
use std::sync::Arc;
fn main() {
let nums_vec = vec![0, 1, 2, 3];
let nums_arc = Arc::new(nums_vec);
let mut handles = vec![];
for _ in 0..2 {
let nums_arc = Arc::clone(&nums_arc);
handles.push(thread::spawn(move || {
for i in &*nums_arc {
println!("number {}", i);
}
}));
}
for h in handles {
let _ = h.join();
}
}
Споделяне на стойности - Arc
std::sync::Arc
Споделяне на стойности - Arc
std::sync::Arc
- "Atomically Reference Counted" value
Споделяне на стойности - Arc
std::sync::Arc
- "Atomically Reference Counted" value
- аналогично на
Rc(споделена собственост, позволява само взимане на&Tкъм вътрешността)
Споделяне на стойности - Arc
std::sync::Arc
- "Atomically Reference Counted" value
- аналогично на
Rc(споделена собственост, позволява само взимане на&Tкъм вътрешността) - но използва атомарни операции за броене на референциите
Споделяне на стойности - Arc
std::sync::Arc
- "Atomically Reference Counted" value
- аналогично на
Rc(споделена собственост, позволява само взимане на&Tкъм вътрешността) - но използва атомарни операции за броене на референциите
- поради това може да се използва от няколко нишки едновременно
Send и Sync
- грешката която получихме беше че
Rc<Vec<i32>>не имплементираSend - следователно closure-а
Fне имплементираSend - а
spawnизискваF: Send
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static
Send и Sync
Трейтовете std::marker::Send и std::marker::Sync показват дали един тип е thread safe.
Т.е. дали обекти от този тип могат да се използват безопасно в многонишков контекст.
pub unsafe auto trait Send { }
pub unsafe auto trait Sync { }
Send и Sync
Send
- позволява прехвърляне на стойност между нишки
Send и Sync
Send
- позволява прехвърляне на стойност между нишки
- пример за типове, които не са
Send:
Send и Sync
Send
- позволява прехвърляне на стойност между нишки
- пример за типове, които не са
Send:- thread local типове, напр.
rand::rngs::ThreadRng
- thread local типове, напр.
Send и Sync
Send
- позволява прехвърляне на стойност между нишки
- пример за типове, които не са
Send:- thread local типове, напр.
rand::rngs::ThreadRng Rc
- thread local типове, напр.
Send и Sync
Send
- позволява прехвърляне на стойност между нишки
- пример за типове, които не са
Send:- thread local типове, напр.
rand::rngs::ThreadRng Rc- голи указатели -
*const Tи*mut T
- thread local типове, напр.
Send и Sync
Sync
- позволява споделен достъп до стойност от няколко нишки
Send и Sync
Sync
- позволява споделен достъп до стойност от няколко нишки
- т.е. позволява прехвърляне на референция
&Tмежду нишки
Send и Sync
Sync
- позволява споделен достъп до стойност от няколко нишки
- т.е. позволява прехвърляне на референция
&Tмежду нишки T: Sync⟺&T: Send
Send и Sync
Sync
- позволява споделен достъп до стойност от няколко нишки
- т.е. позволява прехвърляне на референция
&Tмежду нишки T: Sync⟺&T: Send- пример за типове, които не са
Sync:
Send и Sync
Sync
- позволява споделен достъп до стойност от няколко нишки
- т.е. позволява прехвърляне на референция
&Tмежду нишки T: Sync⟺&T: Send- пример за типове, които не са
Sync:- internal mutability без синхронизация -
Rc,Cell,RefCell
- internal mutability без синхронизация -
Send и Sync
Sync
- позволява споделен достъп до стойност от няколко нишки
- т.е. позволява прехвърляне на референция
&Tмежду нишки T: Sync⟺&T: Send- пример за типове, които не са
Sync:- internal mutability без синхронизация -
Rc,Cell,RefCell - голи указатели -
*const Tи*mut T
- internal mutability без синхронизация -
Send и Sync
Въпрос
Дали обикновен тип като Vec<T> имплементира Sync?
- да, ако
T: Sync
Send и Sync
Въпрос
Дали обикновен тип като Vec<T> имплементира Sync?
- да, ако
T: Sync - ако нашата нишка има
&Vec<_>-- никой не може да модифицира вектора
Send и Sync
Въпрос
Дали обикновен тип като Vec<T> имплементира Sync?
- да, ако
T: Sync - ако нашата нишка има
&Vec<_>-- никой не може да модифицира вектора - ако нашата нишка има
&mut Vec<_>-- никой друг няма референция до вектора
Send и Sync
Аuto traits
- имплементират се автоматично ако всичките полета са съответно
SendиSync
pub struct Token(u32);
pub struct Token(u32);
fn main() {}

Send и Sync
Unsafe traits
- unsafe са за ръчна имплементация
struct MyBox(*mut u8);
unsafe impl Send for MyBox {}
unsafe impl Sync for MyBox {}
fn main() {}
struct MyBox(*mut u8);
unsafe impl Send for MyBox {}
unsafe impl Sync for MyBox {}
Send и Sync
Деимплементация
// Само на nightly
#![feature(optin_builtin_traits)]
struct SpecialToken(u8);
impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}
Send и Sync
Деимплементация
// Само на nightly
#![feature(optin_builtin_traits)]
struct SpecialToken(u8);
impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}
- автоматичната имплементация никога няма да е грешна от само себе си
Send и Sync
Деимплементация
// Само на nightly
#![feature(optin_builtin_traits)]
struct SpecialToken(u8);
impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}
- автоматичната имплементация никога няма да е грешна от само себе си
- но може да пишем код, който разчита, че определен тип не може да се прехвърля / споделя
Send и Sync
Деимплементация
Хак за stable
use std::marker::PhantomData;
struct SpecialToken(u8, PhantomData<*const ()>);
fn main() {}
use std::marker::PhantomData;
struct SpecialToken(u8, PhantomData<*const ()>);
Примитиви за синхронизация

Примитиви за синхронизация
Стандартния пример за грешен многонишков алгоритъм не се компилира
let v = Arc::new((0..100).collect::<Vec<_>>());
let mut sum = 0;
let t1 = {
let v = Arc::clone(&v);
let sum = &mut sum;
thread::spawn(move || for i in &v[0..50] { *sum += i; })
};
let t2 = {
let v = Arc::clone(&v);
let sum = &mut sum;
thread::spawn(move || for i in &v[51..100] { *sum += i; })
};
let _ = t1.join();
let _ = t2.join();
println!("sum: {}", sum);
error[E0597]: `sum` does not live long enough --> src/bin/main_92ae06cbac3bb0b057de58f0494c471228f42c4d.rs:9:15 | 5 | let mut sum = 0; | ------- binding `sum` declared here ... 9 | let sum = &mut sum; | ^^^^^^^^ borrowed value does not live long enough 10 | thread::spawn(move || for i in &v[0..50] { *sum += i; }) | -------------------------------------------------------- argument requires that `sum` is borrowed for `'static` ... 22 | } | - `sum` dropped here while still borrowed error[E0499]: cannot borrow `sum` as mutable more than once at a time --> src/bin/main_92ae06cbac3bb0b057de58f0494c471228f42c4d.rs:15:15 | 9 | let sum = &mut sum; | -------- first mutable borrow occurs here 10 | thread::spawn(move || for i in &v[0..50] { *sum += i; }) | -------------------------------------------------------- argument requires that `sum` is borrowed for `'static` ... 15 | let sum = &mut sum; | ^^^^^^^^ second mutable borrow occurs here error[E0502]: cannot borrow `sum` as immutable because it is also borrowed as mutable --> src/bin/main_92ae06cbac3bb0b057de58f0494c471228f42c4d.rs:21:21 | 9 | let sum = &mut sum; | -------- mutable borrow occurs here 10 | thread::spawn(move || for i in &v[0..50] { *sum += i; }) | -------------------------------------------------------- argument requires that `sum` is borrowed for `'static` ... 21 | println!("sum: {}", sum); | ^^^ immutable borrow occurs here | = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info) Some errors have detailed explanations: E0499, E0502, E0597. For more information about an error, try `rustc --explain E0499`. error: could not compile `rust` (bin "main_92ae06cbac3bb0b057de58f0494c471228f42c4d") due to 3 previous errors
use std::sync::Arc;
use std::thread;
fn main() {
let v = Arc::new((0..100).collect::>());
let mut sum = 0;
let t1 = {
let v = Arc::clone(&v);
let sum = &mut sum;
thread::spawn(move || for i in &v[0..50] { *sum += i; })
};
let t2 = {
let v = Arc::clone(&v);
let sum = &mut sum;
thread::spawn(move || for i in &v[51..100] { *sum += i; })
};
let _ = t1.join();
let _ = t2.join();
println!("sum: {}", sum);
}
Примитиви за синхронизация
Защо не се компилира? Какъв може да е типа на sum?
Примитиви за синхронизация
Защо не се компилира? Какъв може да е типа на sum?
&mut i32- не можем да имаме два пъти&mut, а иspawnочаква'static
Примитиви за синхронизация
Защо не се компилира? Какъв може да е типа на sum?
&mut i32- не можем да имаме два пъти&mut, а иspawnочаква'staticArc<i32>- нямаме как да модифицираме съдържанието
Примитиви за синхронизация
Защо не се компилира? Какъв може да е типа на sum?
&mut i32- не можем да имаме два пъти&mut, а иspawnочаква'staticArc<i32>- нямаме как да модифицираме съдържаниетоArc<Cell<i32>>,Arc<RefCell<i32>>-CellиRefCellне саSync
Примитиви за синхронизация
Можем да го накараме да работи
Примитиви за синхронизация
Можем да го накараме да работи
- мутекс
Примитиви за синхронизация
Можем да го накараме да работи
- мутекс
- атомарни числа
Примитиви за синхронизация
Можем да го накараме да работи
- мутекс
- атомарни числа
- да връщаме резултат от нишката
Примитиви за синхронизация
Можем да го накараме да работи
- мутекс
- атомарни числа
- да връщаме резултат от нишката
- …
Примитиви за синхронизация
Модула std::sync
- std::sync
ArcMutex,RwLockCondvar,Barrieratomicmpsc
Mutex
use std::sync::Mutex;
fn main() {
// мутекса опакова стойността, която предпазва
let mutex = Mutex::new(10);
{
// заключваме мутекса
// `lock()` връща "умен указател" с deref до `&T` и `&mut T`
let mut lock = mutex.lock().unwrap();
*lock += 32;
// мутекса се отключва когато `lock` се деалокира
}
}
use std::sync::Mutex;
fn main() {
// мутекса опакова стойността, която предпазва
let mutex = Mutex::new(10);
{
// заключваме мутекса
// `lock()` връща "умен указател" с deref до `&T` и `&mut T`
let mut lock = mutex.lock().unwrap();
*lock += 32;
// мутекса се отключва когато `lock` се деалокира
}
}
Mutex
- mutual exclusion
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
- съдържа флаг - дали мутекса е заключен или свободен
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
- съдържа флаг - дали мутекса е заключен или свободен
- ако мутекса е отключен и извикаме
lock- заключваме го
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
- съдържа флаг - дали мутекса е заключен или свободен
- ако мутекса е отключен и извикаме
lock- заключваме го - ако мутекса е заключен и извикаме
lock- нишката ни се спира
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
- съдържа флаг - дали мутекса е заключен или свободен
- ако мутекса е отключен и извикаме
lock- заключваме го - ако мутекса е заключен и извикаме
lock- нишката ни се спира - операционната система ще я събуди когато мутекса е свободен
Мutex
Обикновенно мутекса се възприема като примитива която определя критична секция
lock(my_mutex);
// начало на критичната секция
do_stuff(shared_data);
// край на критичната секция
unlock(my_mutex);
В Ръст това не би било удобно, защото не дава достатъчна информация на компилатора как ползваме данните.
Затова Mutex е generic и опакова данните.
Mutex
Mutex<T>опакова данни от типT
Mutex
Mutex<T>опакова данни от типT- ако искаме мутекс без данни може да се използва
Mutex<()>
Mutex
Mutex<T>опакова данни от типT- ако искаме мутекс без данни може да се използва
Mutex<()> mutex.lock()връщаResult<MutexGuard<'a, T>, PoisonError>
Mutex
Mutex<T>опакова данни от типT- ако искаме мутекс без данни може да се използва
Mutex<()> mutex.lock()връщаResult<MutexGuard<'a, T>, PoisonError>mutex.lock().unwrap()връщаMutexGuard<'a, T>
Mutex
Mutex<T>опакова данни от типT- ако искаме мутекс без данни може да се използва
Mutex<()> mutex.lock()връщаResult<MutexGuard<'a, T>, PoisonError>mutex.lock().unwrap()връщаMutexGuard<'a, T>MutexGuardимаDerefдо&Tи&mut T
Mutex
Mutex<T>опакова данни от типT- ако искаме мутекс без данни може да се използва
Mutex<()> mutex.lock()връщаResult<MutexGuard<'a, T>, PoisonError>mutex.lock().unwrap()връщаMutexGuard<'a, T>MutexGuardимаDerefдо&Tи&mut T- единствения начин да достъпим данните е през
MutexGuard
Mutex
Panic
mutex.lock()връщаResult<MutexGuard<'a, T>, PoisonError>
Mutex
Panic
mutex.lock()връщаResult<MutexGuard<'a, T>, PoisonError>- ако нишка е заключила мутекс и влезе в
panic!по това време, може данните да са останали в (логически) невалидно състояние
Mutex
Panic
mutex.lock()връщаResult<MutexGuard<'a, T>, PoisonError>- ако нишка е заключила мутекс и влезе в
panic!по това време, може данните да са останали в (логически) невалидно състояние - мутекса се зачита за отровен
Mutex
Panic
mutex.lock()връщаResult<MutexGuard<'a, T>, PoisonError>- ако нишка е заключила мутекс и влезе в
panic!по това време, може данните да са останали в (логически) невалидно състояние - мутекса се зачита за отровен
- от
PoisonErrorможе да се извадиMutexGuard
Mutex
Panic
mutex.lock()връщаResult<MutexGuard<'a, T>, PoisonError>- ако нишка е заключила мутекс и влезе в
panic!по това време, може данните да са останали в (логически) невалидно състояние - мутекса се зачита за отровен
- от
PoisonErrorможе да се извадиMutexGuard - често срещано е резултата от
lockпросто да сеunwrap-не
RwLock
- Reader-writer lock
RwLock
- Reader-writer lock
- позволява четене от много места
RwLock
- Reader-writer lock
- позволява четене от много места
- или писане от едно място
RwLock
- Reader-writer lock
- позволява четене от много места
- или писане от едно място
- подобно на
RefCell, но в многонишков контекст
Mutex или RwLock
Mutexе по-бърз и по-лек отRwLock
Mutex или RwLock
Mutexе по-бърз и по-лек отRwLockMutexналага дисциплина да държим критичните секции възможно най-кратки
Mutex или RwLock
Mutexе по-бърз и по-лек отRwLockMutexналага дисциплина да държим критичните секции възможно най-кратки- понякога
RwLockсе налага - напр. за опаковане на стари C++ библиотеки
Arc + Mutex
Подобно на Rc<RefCell<T>>, може често да виждате Arc<Mutex<T>> или Arc<RwLock<T>>
Arc + Mutex
Пример
let v = Arc::new((0..100).collect::<Vec<_>>());
let total_sum = Arc::new(Mutex::new(0));
let t1 = {
let v = Arc::clone(&v);
let total_sum = Arc::clone(&total_sum);
thread::spawn(move || {
let local_sum = v[..50].iter().sum::<i32>();
*total_sum.lock().unwrap() += local_sum;
})
};
let t2 = {
let v = Arc::clone(&v);
let total_sum = Arc::clone(&total_sum);
thread::spawn(move || {
let local_sum = v[50..].iter().sum::<i32>();
*total_sum.lock().unwrap() += local_sum;
})
};
let _ = t1.join();
let _ = t2.join();
println!("sum: {}", *total_sum.lock().unwrap());
sum: 4950
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let v = Arc::new((0..100).collect::>());
let total_sum = Arc::new(Mutex::new(0));
let t1 = {
let v = Arc::clone(&v);
let total_sum = Arc::clone(&total_sum);
thread::spawn(move || {
let local_sum = v[..50].iter().sum::();
*total_sum.lock().unwrap() += local_sum;
})
};
let t2 = {
let v = Arc::clone(&v);
let total_sum = Arc::clone(&total_sum);
thread::spawn(move || {
let local_sum = v[50..].iter().sum::();
*total_sum.lock().unwrap() += local_sum;
})
};
let _ = t1.join();
let _ = t2.join();
println!("sum: {}", *total_sum.lock().unwrap());
}
Arc + Mutex
Пример
let v = (0..100).collect::<Vec<_>>();
let mut total_sum = Mutex::new(0);
thread::scope(|s| {
s.spawn(|| {
let local_sum = v[..50].iter().sum::<i32>();
*total_sum.lock().unwrap() += local_sum;
});
s.spawn(|| {
let local_sum = v[50..].iter().sum::<i32>();
*total_sum.lock().unwrap() += local_sum;
});
});
println!("sum: {}", *total_sum.get_mut().unwrap());
sum: 4950
use std::thread;
use std::sync::Mutex;
fn main() {
let v = (0..100).collect::>();
let mut total_sum = Mutex::new(0);
thread::scope(|s| {
s.spawn(|| {
let local_sum = v[..50].iter().sum::();
*total_sum.lock().unwrap() += local_sum;
});
s.spawn(|| {
let local_sum = v[50..].iter().sum::();
*total_sum.lock().unwrap() += local_sum;
});
});
println!("sum: {}", *total_sum.get_mut().unwrap());
}
Други примитиви
- разгледайте документацията на std::sync
CondvarOnceBarrier
Атомарни числа
- аритметичните операции се свеждат до няколко отделни инструкции
- едновременни операции могат да видят стари стойности
- затова не могат да се използват от множество нишки без синхронизация
let mut num = 10;
// thread 1 // thread 2
num += 5; num += 5;
// =============================================
let reg = load(&num);
let reg = reg + 5; let reg = load(&num);
let reg = reg + 5
store(&mut num, reg);
store(&mut num, reg); /* num = 15 */
/* num = 15 */
Атомарни числа
- има специални процесорни инструкции, които правят аритметична операция за една инструкция
- атомарни / неразделими
// псевдокод
let num = 10;
// thread 1 // thread 2
fetch_add(&num, 5); fetch_add(&num, 5);
load(&num); load(&num);
/* num = 20 */ /* num = 20 */
Атомарни числа
- атомарните числа изпозват атомарни инструкции
Атомарни числа
- атомарните числа изпозват атомарни инструкции
AtomicUsize,AtomicIsize,AtomicU8,AtomicU16, …
Атомарни числа
- атомарните числа изпозват атомарни инструкции
AtomicUsize,AtomicIsize,AtomicU8,AtomicU16, …AtomicBool
Атомарни числа
- атомарните числа изпозват атомарни инструкции
AtomicUsize,AtomicIsize,AtomicU8,AtomicU16, …AtomicBoolAtomicPtr
Атомарни числа
- атомарните числа изпозват атомарни инструкции
AtomicUsize,AtomicIsize,AtomicU8,AtomicU16, …AtomicBoolAtomicPtr- аритметични и побитови операции:
fetch_add,fetch_xor, …
Атомарни числа
- атомарните числа изпозват атомарни инструкции
AtomicUsize,AtomicIsize,AtomicU8,AtomicU16, …AtomicBoolAtomicPtr- аритметични и побитови операции:
fetch_add,fetch_xor, … - oперации по паметта:
load,store,compare_and_swap, …
Атомарни числа
Атомарните числа могат да се модифицират през споделена референция
use std::sync::atomic::{AtomicI32, Ordering};
let num = AtomicI32::new(10); // няма `mut`
num.fetch_add(5, Ordering::SeqCst);
num.fetch_add(5, Ordering::SeqCst);
println!("{}", num.load(Ordering::SeqCst));
20
fn main() {
use std::sync::atomic::{AtomicI32, Ordering};
let num = AtomicI32::new(10); // няма `mut`
num.fetch_add(5, Ordering::SeqCst);
num.fetch_add(5, Ordering::SeqCst);
println!("{}", num.load(Ordering::SeqCst));
}
Атомарни числа
- удобни са за създаване на различни флагове и броячи
Атомарни числа
- удобни са за създаване на различни флагове и броячи
- стоят в основата на много алгоритми и стуктури от данни
Атомарни числа
- удобни са за създаване на различни флагове и броячи
- стоят в основата на много алгоритми и стуктури от данни
- препоръчително да се използват пред
Mutex<{integer}>
Атомарни числа
Пример
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
let v = Arc::new((0..100).collect::<Vec<_>>());
let total_sum = Arc::new(AtomicI32::new(0));
let t1 = {
let v = Arc::clone(&v);
let total_sum = Arc::clone(&total_sum);
thread::spawn(move || {
let local_sum = v[..50].iter().sum::<i32>();
total_sum.fetch_add(local_sum, Ordering::SeqCst);
})
};
let t2 = {
let v = Arc::clone(&v);
let total_sum = Arc::clone(&total_sum);
thread::spawn(move || {
let local_sum = v[50..].iter().sum::<i32>();
total_sum.fetch_add(local_sum, Ordering::SeqCst);
})
};
let _ = t1.join();
let _ = t2.join();
println!("sum: {}", total_sum.load(Ordering::SeqCst));
sum: 4950
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
use std::thread;
fn main() {
let v = Arc::new((0..100).collect::>());
let total_sum = Arc::new(AtomicI32::new(0));
let t1 = {
let v = Arc::clone(&v);
let total_sum = Arc::clone(&total_sum);
thread::spawn(move || {
let local_sum = v[..50].iter().sum::();
total_sum.fetch_add(local_sum, Ordering::SeqCst);
})
};
let t2 = {
let v = Arc::clone(&v);
let total_sum = Arc::clone(&total_sum);
thread::spawn(move || {
let local_sum = v[50..].iter().sum::();
total_sum.fetch_add(local_sum, Ordering::SeqCst);
})
};
let _ = t1.join();
let _ = t2.join();
println!("sum: {}", total_sum.load(Ordering::SeqCst));
}
Атомарни числа
Пример
//
use std::sync::atomic::{AtomicI32, Ordering};
let v = (0..100).collect::<Vec<_>>();
let total_sum = AtomicI32::new(0);
thread::scope(|scope| {
scope.spawn(|| {
let local_sum = v[..50].iter().sum::<i32>();
total_sum.fetch_add(local_sum, Ordering::SeqCst);
});
scope.spawn(|| {
let local_sum = v[50..].iter().sum::<i32>();
total_sum.fetch_add(local_sum, Ordering::SeqCst);
});
});
println!("sum: {}", total_sum.load(Ordering::SeqCst));
sum: 4950
use std::sync::atomic::{AtomicI32, Ordering};
use std::thread;
fn main() {
let v = (0..100).collect::>();
let total_sum = AtomicI32::new(0);
thread::scope(|scope| {
scope.spawn(|| {
let local_sum = v[..50].iter().sum::();
total_sum.fetch_add(local_sum, Ordering::SeqCst);
});
scope.spawn(|| {
let local_sum = v[50..].iter().sum::();
total_sum.fetch_add(local_sum, Ordering::SeqCst);
});
});
println!("sum: {}", total_sum.load(Ordering::SeqCst));
}
Атомарни числа
Пример
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
let should_stop = Arc::new(AtomicBool::new(false));
let t1 = thread::spawn({
let should_stop = Arc::clone(&should_stop);
move || {
while !should_stop.load(Ordering::SeqCst) {
println!("running");
thread::sleep(Duration::from_millis(100));
}
}
});
thread::sleep(Duration::from_millis(300));
should_stop.store(true, Ordering::SeqCst);
let _ = t1.join();
running running running
fn main() {
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::thread;
let should_stop = Arc::new(AtomicBool::new(false));
let t1 = thread::spawn({
let should_stop = Arc::clone(&should_stop);
move || {
while !should_stop.load(Ordering::SeqCst) {
println!("running");
thread::sleep(Duration::from_millis(100));
}
}
});
thread::sleep(Duration::from_millis(300));
should_stop.store(true, Ordering::SeqCst);
let _ = t1.join();
}
Канали

Канали
Don't communicate by sharing memory,
share memory by communicating
Канали в стандартната библиотека
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
sender.send(10).unwrap();
});
println!("received {}", receiver.recv().unwrap());
}
received 10
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
sender.send(10).unwrap();
});
println!("received {}", receiver.recv().unwrap());
}
Типове канали
Неограничен канал
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
std::sync::mpsc::channel()
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
std::sync::mpsc::channel()(Sender, Receiver)
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
std::sync::mpsc::channel()(Sender, Receiver)- буфера се оразмерява, когато се напълни
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
std::sync::mpsc::channel()(Sender, Receiver)- буфера се оразмерява, когато се напълни
- изпращане на съобщение никога не блокира
Типове канали
Неограничен канал
| тип | метод | резултат | грешки |
|---|---|---|---|
| Sender | send(T) | Result<(), SendError<T>> |
disconnected |
| Receiver | recv() | Result<T, RecvError> |
disconnected |
| Receiver | try_recv() | Result<T, TryRecvError> |
Empty, Disconnected |
| Receiver | recv_timeout(Duration) | Result<T, RecvTimeoutError> |
Timeout, Disconnected |
Типове канали
Неограничен канал
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
sender.send(1).unwrap();
sender.send(2).unwrap();
sender.send(3).unwrap();
});
assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
assert_eq!(receiver.recv().unwrap(), 3);
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
sender.send(1).unwrap();
sender.send(2).unwrap();
sender.send(3).unwrap();
});
assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
assert_eq!(receiver.recv().unwrap(), 3);
}
Типове канали
Oграничен канал
- bounded / "synchronous"
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)(SyncSender, Receiver)
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)(SyncSender, Receiver)- има буфер за
kсъобщения
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)(SyncSender, Receiver)- има буфер за
kсъобщения - изпращане на съобщения ще блокира, ако буфера е пълен
Типове канали
Ограничен канал
| тип | метод | резултат | грешки |
|---|---|---|---|
| SyncSender | send(T) | Result<(), SendError<T>> |
disconnected |
| SyncSender | try_send(T) | Result<(), TrySendError<T>> |
Full, Disconnected |
| Receiver | recv() | Result<T, RecvError> |
disconnected |
| Receiver | try_recv() | Result<T, TryRecvError> |
Empty, Disconnected |
| Receiver | recv_timeout(Duration) | Result<T, RecvTimeoutError> |
Timeout, Disconnected |
Типове канали
Ограничен канал
let (sender, receiver) = mpsc::sync_channel(1);
thread::spawn(move || {
// записва съобщението и връща веднага
sender.send(1).unwrap();
// ще блокира докато главната нишка не извика `receiver.recv()`
sender.send(2).unwrap();
});
assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::sync_channel(1);
thread::spawn(move || {
// записва съобщението и връща веднага
sender.send(1).unwrap();
// ще блокира докато главната нишка не извика `receiver.recv()`
sender.send(2).unwrap();
});
assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
}
Sender
Методи
let (sender, receiver) = mpsc::channel();
assert_eq!(sender.send(12), Ok(()));
// унищожаваме получателя
// съобщението `12` никога няма да бъде получено
mem::drop(receiver);
// грешка - получателя е унищожен
// можем да си върнем съобщението `23` от грешката
assert_eq!(sender.send(23), Err(SendError(23)));
use std::mem;
use std::sync::mpsc::{self, SendError};
fn main() {
let (sender, receiver) = mpsc::channel();
assert_eq!(sender.send(12), Ok(()));
// унищожаваме получателя
// съобщението `12` никога няма да бъде получено
mem::drop(receiver);
// грешка - получателя е унищожен
// можем да си върнем съобщението `23` от грешката
assert_eq!(sender.send(23), Err(SendError(23)));
}
SyncSender
Методи
let (sender, receiver) = mpsc::sync_channel(1);
assert_eq!(sender.try_send(12), Ok(()));
assert_eq!(sender.try_send(23), Err(TrySendError::Full(23)));
mem::drop(receiver);
assert_eq!(sender.try_send(23), Err(TrySendError::Disconnected(23)));
use std::mem;
use std::sync::mpsc::{self, TrySendError};
fn main() {
let (sender, receiver) = mpsc::sync_channel(1);
assert_eq!(sender.try_send(12), Ok(()));
assert_eq!(sender.try_send(23), Err(TrySendError::Full(23)));
mem::drop(receiver);
assert_eq!(sender.try_send(23), Err(TrySendError::Disconnected(23)));
}
Множество изпращачи
Изпращащата част може да се клонира
let (sender, receiver) = mpsc::channel();
let sender2 = sender.clone();
thread::spawn(move || {
sender.send(1).unwrap();
sender.send(2).unwrap();
});
thread::spawn(move || {
sender2.send(3).unwrap();
sender2.send(4).unwrap();
});
println!("{} {} {} {}",
receiver.recv().unwrap(), receiver.recv().unwrap(),
receiver.recv().unwrap(), receiver.recv().unwrap());
1 2 3 4
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();
let sender2 = sender.clone();
thread::spawn(move || {
sender.send(1).unwrap();
sender.send(2).unwrap();
});
thread::spawn(move || {
sender2.send(3).unwrap();
sender2.send(4).unwrap();
});
println!("{} {} {} {}",
receiver.recv().unwrap(), receiver.recv().unwrap(),
receiver.recv().unwrap(), receiver.recv().unwrap());
}
Множество получатели
- не може - каналите са multi-producer, single-consumer
Множество получатели
- не може - каналите са multi-producer, single-consumer
Receiverне може да се клонира
Множество получатели
- не може - каналите са multi-producer, single-consumer
Receiverне може да се клонираReceivereSend-- можем да го изпратим на друга нишка
Множество получатели
- не може - каналите са multi-producer, single-consumer
Receiverне може да се клонираReceivereSend-- можем да го изпратим на друга нишкаReceiverне еSync-- не можем да подадем&ReceiverилиArc<Receiver>
Receiver
Методи
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
for i in (0..50).rev() {
sender.send(i).unwrap();
}
});
while let Ok(msg) = receiver.recv() {
println!("received {}", msg);
}
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
for i in (0..50).rev() {
sender.send(i).unwrap();
}
});
while let Ok(msg) = receiver.recv() {
println!("received {}", msg);
}
}
Receiver
Итератори
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
for i in (0..50).rev() {
sender.send(i).unwrap();
}
});
// обхожда всички съобщения в канала
// ако няма налично съобщение блокира
// излиза от цикъла когато всички изпращачи са унищожени
for msg in receiver.iter() {
println!("received {}", msg);
}
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
for i in (0..50).rev() {
sender.send(i).unwrap();
}
});
// обхожда всички съобщения в канала
// ако няма налично съобщение блокира
// излиза от цикъла когато всички изпращачи са унищожени
for msg in receiver.iter() {
println!("received {}", msg);
}
}
Receiver
Итератори
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
for i in (0..50).rev() {
sender.send(i).unwrap();
}
});
// обхожда всички вече изпратени съобения в канала,
// след което излиза от цикъла
for msg in receiver.try_iter() {
println!("received {}", msg);
}
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
for i in (0..50).rev() {
sender.send(i).unwrap();
}
});
// обхожда всички вече изпратени съобения в канала,
// след което излиза от цикъла
for msg in receiver.try_iter() {
println!("received {}", msg);
}
}
Crossbeam channel
- https://docs.rs/crossbeam-channel/
- multi-producer multi-consumer (mpmc)
- по-чисто API
- каналите в
stdбяха сменени да използват същата имплементация катоcrossbeam-channel, но с различен интерфейс
Crossbeam channel
Разлики в API-то
| std::sync::mpsc | crossbeam_channel | |
|---|---|---|
| вид | MPSC | MPMC |
| неограничен | channel() |
unbounded() |
| неограничен - типове | (Sender, Receiver) |
(Sender, Receiver) |
| ограничен | sync_channel(k) |
bounded(k) |
| ограничен - типове | (SyncSender, Receiver) |
(Sender, Receiver) |
Външни библиотеки
Crossbeam
- https://docs.rs/crossbeam/
- колекция от алгоритми и структури от данни
- lock-free структури от данни - опашка, стек, deque
- и доста utilities
Външни библиотеки
Parking lot
- https://docs.rs/parking_lot
- https://github.com/Amanieu/parking_lot
- алтернативна имплементация на
Mutex,RwLock,Condvar,Once - по-малки и по-бързи от
std, различни tradeoffs - вижте README-то в github за детайли