Многонишково програмиране
23 ноември 2021
Административни неща
- на 25 ноември (четвъртък) няма да има лекция
Fearless concurrency
Fearless concurrency
Fearless concurrency
Rust предотвратява data races
Fearless concurrency
Rust предотвратява data races
- две нишки достъпват една и съща памет
- поне единия достъп е за писане
- достъпите не са синхронизирани
Fearless concurrency
Rust предотвратява data races
- две нишки достъпват една и съща памет
- поне единия достъп е за писане
- достъпите не са синхронизирани
Rust закодира в типовата система понятието за thread safety
Fearless concurrency
Rust предотвратява data races
- две нишки достъпват една и съща памет
- поне единия достъп е за писане
- достъпите не са синхронизирани
Rust закодира в типовата система понятието за thread safety
- кои обекти или операции могат да се използват безопасно в паралелен код
- компилационна грешка при нарушаване
Fearless concurrency
Rust предотвратява data races
- две нишки достъпват една и съща памет
- поне единия достъп е за писане
- достъпите не са синхронизирани
Rust закодира в типовата система понятието за thread safety
- кои обекти или операции могат да се използват безопасно в паралелен код
- компилационна грешка при нарушаване
Rust не може да предотврати логически бъгове - race conditions, deadlocks и др.
Fearless concurrency
Rust предотвратява data races
- две нишки достъпват една и съща памет
- поне единия достъп е за писане
- достъпите не са синхронизирани
Rust закодира в типовата система понятието за thread safety
- кои обекти или операции могат да се използват безопасно в паралелен код
- компилационна грешка при нарушаване
Rust не може да предотврати логически бъгове - race conditions, deadlocks и др.
- но добри абстракции помагат с това
Нишки
thread::spawn
пуска нова нишка на операционната система- подадената функция се изпълнява в новата нишка
- когато функцията завърши, нишката се спира
use std::thread;
fn main() {
thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
}
use std::thread; fn main() { thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); }
Нишки
1:1 multithreading
Нишки
1:1 multithreading
- една нишка в езика отговаря на една нишка на ОС
- "обикновенния" начин за многонишково програмиране
thread::spawn
работи така- в тази лекция ще говорим само за 1:1
Нишки
1:1 multithreading
- една нишка в езика отговаря на една нишка на ОС
- "обикновенния" начин за многонишково програмиране
thread::spawn
работи така- в тази лекция ще говорим само за 1:1
съществува и друг модел - M:N multithreading
Нишки
1:1 multithreading
- една нишка в езика отговаря на една нишка на ОС
- "обикновенния" начин за многонишково програмиране
thread::spawn
работи така- в тази лекция ще говорим само за 1:1
съществува и друг модел - M:N multithreading
- зелени нишки
- M нишки в езика се изпълняват върху N нишки на ОС
- обикновенно M >> N
- изисква специaлна подръжка от езика - runtime
- в Rust се имплементира във външни библиотеки
- ще говорим за това в лекцията за futures и async/await
Нишки
- програмата приключва, когато главната нишка завърши
- останалите нишки се убиват
use std::thread;
fn main() {
thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
}
hi from main thread
use std::thread; fn main() { thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); }
Нишки
Сигнатурата на std::thread::spawn
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
Нишки
spawn
връщаJoinHandle
- можем да използваме
join
за да изчакаме пуснатите нишки - когато
JoinHandle
се drop-не нишката се detach-ва
use std::thread;
fn main() {
let handle = thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread; fn main() { let handle = thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); let _ = handle.join(); }
Нишки
- типа
T
вJoinHandle<T>
е резултата от подадената функция - обикновенно ще е
()
, но може да се върне и друго
use std::thread;
fn main() {
let handle = thread::spawn(|| {
// very hard computation ...
42
});
let answ = handle.join();
println!("{:?}", answ);
}
Ok(42)
use std::thread; fn main() { let handle = thread::spawn(|| { // very hard computation ... 42 }); let answ = handle.join(); println!("{:?}", answ); }
Panic в нишка
panic!
в главната нишка спира програмата
Panic в нишка
panic!
в главната нишка спира програматаpanic!
в друга нишка спира нишката
Panic в нишка
panic!
в главната нишка спира програматаpanic!
в друга нишка спира нишкатаJoinHandle::join
връща резултат
Panic в нишка
panic!
в главната нишка спира програматаpanic!
в друга нишка спира нишкатаJoinHandle::join
връща резултатOk(T)
ако функцията е завършила успешно
Panic в нишка
panic!
в главната нишка спира програматаpanic!
в друга нишка спира нишкатаJoinHandle::join
връща резултатOk(T)
ако функцията е завършила успешноErr(Box<Any>)
ако е имало паника
Join в деструктор
JoinHandle::join
приемаself
по стойност- следователно не можем да правим това?
struct Wrapper { handle: JoinHandle<()>, }
impl Drop for Wrapper {
fn drop(&mut self) {
self.handle.join().unwrap();
}
}
fn main() {
let handle = thread::spawn(|| println!("Изчакай ме"));
let wrapper = Wrapper { handle };
}
error[E0507]: cannot move out of `self.handle` which is behind a mutable reference --> src/bin/main_0bb11696c80ccacf695fc367afd2d70dc403e61e.rs:7:9 | 7 | self.handle.join().unwrap(); | ^^^^^^^^^^^ move occurs because `self.handle` has type `JoinHandle<()>`, which does not implement the `Copy` trait For more information about this error, try `rustc --explain E0507`. error: could not compile `rust` due to previous error
use std::thread::{self, JoinHandle}; struct Wrapper { handle: JoinHandle<()>, } impl Drop for Wrapper { fn drop(&mut self) { self.handle.join().unwrap(); } } fn main() { let handle = thread::spawn(|| println!("Изчакай ме")); let wrapper = Wrapper { handle }; }
Join в деструктор
- няма проблем
- просто добавяме
Option
struct Wrapper { handle: Option<JoinHandle<()>>, }
impl Drop for Wrapper {
fn drop(&mut self) {
if let Some(handle) = self.handle.take() {
handle.join().unwrap();
}
}
}
fn main() {
let handle = thread::spawn(|| println!("Изчакай ме"));
let wrapper = Wrapper { handle: Some(handle) };
}
Изчакай ме
use std::thread::{self, JoinHandle}; struct Wrapper { handle: Option>, } impl Drop for Wrapper { fn drop(&mut self) { if let Some(handle) = self.handle.take() { handle.join().unwrap(); } } } fn main() { let handle = thread::spawn(|| println!("Изчакай ме")); let wrapper = Wrapper { handle: Some(handle) }; }
Споделяне на стойности
Споделяне на стойности
use std::thread;
fn main() {
let nums = vec![0, 1, 2, 3];
let handle = thread::spawn(|| {
for i in &nums {
println!("number {}", i);
}
});
let _ = handle.join();
}
Споделяне на стойности
use std::thread;
fn main() {
let nums = vec![0, 1, 2, 3];
let handle = thread::spawn(|| {
for i in &nums {
println!("number {}", i);
}
});
let _ = handle.join();
}
error[E0373]: closure may outlive the current function, but it borrows `nums`, which is owned by the current function --> src/bin/main_44c598e1dd6669cdea1e15fcdbf808719a029646.rs:6:32 | 6 | let handle = thread::spawn(|| { | ^^ may outlive borrowed value `nums` 7 | for i in &nums { | ---- `nums` is borrowed here | note: function requires argument type to outlive `'static` --> src/bin/main_44c598e1dd6669cdea1e15fcdbf808719a029646.rs:6:18 | 6 | let handle = thread::spawn(|| { | __________________^ 7 | | for i in &nums { 8 | | println!("number {}", i); 9 | | } 10 | | }); | |______^ help: to force the closure to take ownership of `nums` (and any other referenced variables), use the `move` keyword | 6 | let handle = thread::spawn(move || { | ++++ For more information about this error, try `rustc --explain E0373`. error: could not compile `rust` due to previous error
use std::thread; fn main() { let nums = vec![0, 1, 2, 3]; let handle = thread::spawn(|| { for i in &nums { println!("number {}", i); } }); let _ = handle.join(); }
Споделяне на стойности
- новосъздадената нишка може да надживее функцията в която е извикана
Споделяне на стойности
- новосъздадената нишка може да надживее функцията в която е извикана
- затова rust не позволява да подадем референции към локални променливи
Споделяне на стойности
- новосъздадената нишка може да надживее функцията в която е извикана
- затова rust не позволява да подадем референции към локални променливи
- това се налага от ограничението на
spawn
, която приемаF: 'static
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static
Споделяне на стойности
Можем да преместим стойността в новата нишка с move
closure
use std::thread;
fn main() {
let nums = vec![0, 1, 2, 3];
let handle = thread::spawn(move || {
for i in &nums {
println!("number {}", i);
}
});
let _ = handle.join();
}
number 0 number 1 number 2 number 3
use std::thread; fn main() { let nums = vec![0, 1, 2, 3]; let handle = thread::spawn(move || { for i in &nums { println!("number {}", i); } }); let _ = handle.join(); }
Споделяне между няколко нишки
Как бихме споделили стойност между няколко нишки?
use std::thread;
fn main() {
let nums = vec![0, 1, 2, 3];
let handles = (0..2).map(|_| {
thread::spawn(move || {
for i in &nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
Споделяне между няколко нишки
- прехвърляне на собствеността няма как да работи
Споделяне между няколко нишки
- прехвърляне на собствеността няма как да работи
- не можем да ползваме референция, защото нишката може да надживее
main
Споделяне между няколко нишки
- прехвърляне на собствеността няма как да работи
- не можем да ползваме референция, защото нишката може да надживее
main
- можем да пробваме с
Rc
Споделяне между няколко нишки
- прехвърляне на собствеността няма как да работи
- не можем да ползваме референция, защото нишката може да надживее
main
- можем да пробваме с
Rc
use std::rc::Rc;
use std::thread;
fn main() {
let nums_vec = vec![0, 1, 2, 3];
let nums_rc = Rc::new(nums_vec);
let handles = (0..2)
.map(|_| {
let nums_rc = Rc::clone(&nums_rc);
thread::spawn(move || {
for i in &*nums_rc {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
error[E0277]: `Rc<Vec<i32>>` cannot be sent between threads safely --> src/bin/main_6b26de4d2125d19c147e3afab659adbe061f420d.rs:12:13 | 12 | thread::spawn(move || { | _____________^^^^^^^^^^^^^_- | | | | | `Rc<Vec<i32>>` cannot be sent between threads safely 13 | | for i in &*nums_rc { 14 | | println!("number {}", i); 15 | | } 16 | | }) | |_____________- within this `[closure@src/bin/main_6b26de4d2125d19c147e3afab659adbe061f420d.rs:12:27: 16:14]` | = help: within `[closure@src/bin/main_6b26de4d2125d19c147e3afab659adbe061f420d.rs:12:27: 16:14]`, the trait `Send` is not implemented for `Rc<Vec<i32>>` = note: required because it appears within the type `[closure@src/bin/main_6b26de4d2125d19c147e3afab659adbe061f420d.rs:12:27: 16:14]` note: required by a bound in `spawn` --> /home/andrew/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:625:8 | 625 | F: Send + 'static, | ^^^^ required by this bound in `spawn` For more information about this error, try `rustc --explain E0277`. error: could not compile `rust` due to previous error
use std::rc::Rc; use std::thread; fn main() { let nums_vec = vec![0, 1, 2, 3]; let nums_rc = Rc::new(nums_vec); let handles = (0..2) .map(|_| { let nums_rc = Rc::clone(&nums_rc); thread::spawn(move || { for i in &*nums_rc { println!("number {}", i); } }) }) .collect::>(); for h in handles { let _ = h.join(); } }
Send и Sync
Rc
не може да се използва от няколко нишки едновременно
Send и Sync
Rc
не може да се използва от няколко нишки едновременноRc
сочещи към един обект използват споделен брояч на референциите
Send и Sync
Rc
не може да се използва от няколко нишки едновременноRc
сочещи към един обект използват споделен брояч на референциите- този брояч не е синхронизиран
Send и Sync
Rc
не може да се използва от няколко нишки едновременноRc
сочещи към един обект използват споделен брояч на референциите- този брояч не е синхронизиран
- при
clone
илиdrop
този брояч се модифицира
Send и Sync
Rc
не може да се използва от няколко нишки едновременноRc
сочещи към един обект използват споделен брояч на референциите- този брояч не е синхронизиран
- при
clone
илиdrop
този брояч се модифицира - ако
clone
илиdrop
се извикат едновременно от две нишки - това би било data race
Send и Sync
Rc
не може да се използва от няколко нишки едновременноRc
сочещи към един обект използват споделен брояч на референциите- този брояч не е синхронизиран
- при
clone
илиdrop
този брояч се модифицира - ако
clone
илиdrop
се извикат едновременно от две нишки - това би било data race - Rust предотвратява това, като не позволява да пращаме
Rc
до други нишки
Send и Sync
- не можем да пращаме
Rc
, защотоRc
не имплементираSend
- следователно closuer-а
F
не имплементираSend
- а
spawn
изискваF: Send
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static
Send и Sync
Send и Sync
- Send - позволява прехвърляне на собственост между нишки
Send и Sync
- Send - позволява прехвърляне на собственост между нишки
- Sync - позволява споделяне между нишки през референция (
&T
)
Send и Sync
- Send - позволява прехвърляне на собственост между нишки
- Sync - позволява споделяне между нишки през референция (
&T
) - marker traits
Send и Sync
- Send - позволява прехвърляне на собственост между нишки
- Sync - позволява споделяне между нишки през референция (
&T
) - marker traits
- имплементирани са за повечето типове
Send и Sync
- Send - позволява прехвърляне на собственост между нишки
- Sync - позволява споделяне между нишки през референция (
&T
) - marker traits
- имплементирани са за повечето типове
- auto traits
Send и Sync
- Send - позволява прехвърляне на собственост между нишки
- Sync - позволява споделяне между нишки през референция (
&T
) - marker traits
- имплементирани са за повечето типове
- auto traits
- unsafe traits
Send и Sync
Send
- позволява прехвърляне на собственост между нишки
Send и Sync
Send
- позволява прехвърляне на собственост между нишки
- пример за типове, които не са
Send
:
Send и Sync
Send
- позволява прехвърляне на собственост между нишки
- пример за типове, които не са
Send
:Rc
Send и Sync
Send
- позволява прехвърляне на собственост между нишки
- пример за типове, които не са
Send
:Rc
*const T
и*mut T
Send и Sync
Send
- позволява прехвърляне на собственост между нишки
- пример за типове, които не са
Send
:Rc
*const T
и*mut T
- thread local типове, напр.
rand::rngs::ThreadRng
Send и Sync
Send
- позволява прехвърляне на собственост между нишки
- пример за типове, които не са
Send
:Rc
*const T
и*mut T
- thread local типове, напр.
rand::rngs::ThreadRng
- и други
Send и Sync
Sync
- позволява споделяне на референция
&T
между нишки
Send и Sync
Sync
- позволява споделяне на референция
&T
между нишки T: Sync
⟺&T: Send
Send и Sync
Sync
- позволява споделяне на референция
&T
между нишки T: Sync
⟺&T: Send
- пример за типове, които не са
Sync
:
Send и Sync
Sync
- позволява споделяне на референция
&T
между нишки T: Sync
⟺&T: Send
- пример за типове, които не са
Sync
:Rc
Send и Sync
Sync
- позволява споделяне на референция
&T
между нишки T: Sync
⟺&T: Send
- пример за типове, които не са
Sync
:Rc
Cell
,RefCell
Send и Sync
Sync
- позволява споделяне на референция
&T
между нишки T: Sync
⟺&T: Send
- пример за типове, които не са
Sync
:Rc
Cell
,RefCell
*const T
и*mut T
Send и Sync
Sync
- позволява споделяне на референция
&T
между нишки T: Sync
⟺&T: Send
- пример за типове, които не са
Sync
:Rc
Cell
,RefCell
*const T
и*mut T
- и други
Send и Sync
Това значи ли че Vec<T>
е Sync
?
Send и Sync
Това значи ли че Vec<T>
е Sync
?
- да, ако
T: Sync
Send и Sync
Това значи ли че Vec<T>
е Sync
?
- да, ако
T: Sync
- ако нашата нишка има
&Vec<_>
значи никой не може да модифицира вектора
Send и Sync
Това значи ли че Vec<T>
е Sync
?
- да, ако
T: Sync
- ако нашата нишка има
&Vec<_>
значи никой не може да модифицира вектора - ако нашата нишка има
&mut Vec<_>
значи никой друг няма референция до вектора
Send и Sync
Това значи ли че Vec<T>
е Sync
?
- да, ако
T: Sync
- ако нашата нишка има
&Vec<_>
значи никой не може да модифицира вектора - ако нашата нишка има
&mut Vec<_>
значи никой друг няма референция до вектора - типове, които не са
Sync
, обикновено имат internal mutability без синхронизация
Send и Sync
Аuto traits
- имплементират се автоматично ако всичките полета са съответно
Send
иSync
pub struct Token(u32);
pub struct Token(u32); fn main() {}
Send и Sync
Unsafe traits
- unsafe са за ръчна имплементация
struct MyBox(*mut u8);
unsafe impl Send for MyBox {}
unsafe impl Sync for MyBox {}
fn main() {} struct MyBox(*mut u8); unsafe impl Send for MyBox {} unsafe impl Sync for MyBox {}
Send и Sync
Деимплементация
// Само на nightly
#![feature(optin_builtin_traits)]
struct SpecialToken(u8);
impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}
Send и Sync
Деимплементация
// Само на nightly
#![feature(optin_builtin_traits)]
struct SpecialToken(u8);
impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}
- автоматичната имплементация никога няма да е грешна от само себе си
Send и Sync
Деимплементация
// Само на nightly
#![feature(optin_builtin_traits)]
struct SpecialToken(u8);
impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}
- автоматичната имплементация никога няма да е грешна от само себе си
- но може да пишем код, който разчита, че определен тип не може да се прехвърля / споделя
Send и Sync
Деимплементация
Хак за stable
use std::marker::PhantomData;
struct SpecialToken(u8, PhantomData<*const ()>);
fn main() {} use std::marker::PhantomData; struct SpecialToken(u8, PhantomData<*const ()>);
Arc
Да се върнем на кода, който не се компилираше
use std::rc::Rc;
use std::thread;
fn main() {
let nums = Rc::new(vec![0, 1, 2, 3]);
let handles = (0..2)
.map(|_| {
let nums = Rc::clone(&nums);
thread::spawn(move || {
for i in &*nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
error[E0277]: `Rc<Vec<i32>>` cannot be sent between threads safely --> src/bin/main_8f1d186d4cfc4309e094e98b192d752bdba03441.rs:11:13 | 11 | thread::spawn(move || { | _____________^^^^^^^^^^^^^_- | | | | | `Rc<Vec<i32>>` cannot be sent between threads safely 12 | | for i in &*nums { 13 | | println!("number {}", i); 14 | | } 15 | | }) | |_____________- within this `[closure@src/bin/main_8f1d186d4cfc4309e094e98b192d752bdba03441.rs:11:27: 15:14]` | = help: within `[closure@src/bin/main_8f1d186d4cfc4309e094e98b192d752bdba03441.rs:11:27: 15:14]`, the trait `Send` is not implemented for `Rc<Vec<i32>>` = note: required because it appears within the type `[closure@src/bin/main_8f1d186d4cfc4309e094e98b192d752bdba03441.rs:11:27: 15:14]` note: required by a bound in `spawn` --> /home/andrew/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:625:8 | 625 | F: Send + 'static, | ^^^^ required by this bound in `spawn` For more information about this error, try `rustc --explain E0277`. error: could not compile `rust` due to previous error
use std::rc::Rc; use std::thread; fn main() { let nums = Rc::new(vec![0, 1, 2, 3]); let handles = (0..2) .map(|_| { let nums = Rc::clone(&nums); thread::spawn(move || { for i in &*nums { println!("number {}", i); } }) }) .collect::>(); for h in handles { let _ = h.join(); } }
Arc
Решението е да заменим std::rc::Rc
с std::sync::Arc
use std::sync::Arc;
use std::thread;
fn main() {
let nums = Arc::new(vec![0, 1, 2, 3]);
let handles = (0..2)
.map(|_| {
let nums = Arc::clone(&nums);
thread::spawn(move || {
for i in &*nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
number 0 number 1 number 2 number 3 number 0 number 1 number 2 number 3
use std::sync::Arc; use std::thread; fn main() { let nums = Arc::new(vec![0, 1, 2, 3]); let handles = (0..2) .map(|_| { let nums = Arc::clone(&nums); thread::spawn(move || { for i in &*nums { println!("number {}", i); } }) }) .collect::>(); for h in handles { let _ = h.join(); } }
Arc
Arc
- Atomic Reference Counter
Arc
- Atomic Reference Counter
- аналогично на Rc (споделена собственост, позволява само взимане на
&T
към вътрешността)
Arc
- Atomic Reference Counter
- аналогично на Rc (споделена собственост, позволява само взимане на
&T
към вътрешността) - но използва атомарни операции за броене на референциите
Arc
- Atomic Reference Counter
- аналогично на Rc (споделена собственост, позволява само взимане на
&T
към вътрешността) - но използва атомарни операции за броене на референциите
- може да се използва за споделяне на стойности между нишки, ако
T: Send + Sync
Синхронизация
Примитиви за синхронизация
Стандартния пример за грешен многонишков алгоритъм не се компилира
let v = Arc::new((0..100).collect::<Vec<_>>());
let mut sum = 0;
let t1 = {
let v = Arc::clone(&v);
let sum = &mut sum;
thread::spawn(move || for i in &v[0..50] { *sum += i; })
};
let t2 = {
let v = Arc::clone(&v);
let sum = &mut sum;
thread::spawn(move || for i in &v[51..100] { *sum += i; })
};
let _ = t1.join();
let _ = t2.join();
println!("sum: {}", sum);
Примитиви за синхронизация
Защо не се компилира? Какъв може да е типа на sum
?
Примитиви за синхронизация
Защо не се компилира? Какъв може да е типа на sum
?
&mut i32
- не можем да имаме два пъти&mut
, а иspawn
очаква'static
Примитиви за синхронизация
Защо не се компилира? Какъв може да е типа на sum
?
&mut i32
- не можем да имаме два пъти&mut
, а иspawn
очаква'static
Arc<i32>
- нямаме как да модифицираме съдържанието
Примитиви за синхронизация
Защо не се компилира? Какъв може да е типа на sum
?
&mut i32
- не можем да имаме два пъти&mut
, а иspawn
очаква'static
Arc<i32>
- нямаме как да модифицираме съдържаниетоArc<Cell<i32>>
,Arc<RefCell<i32>>
-Cell
иRefCell
не саSync
Примитиви за синхронизация
Можем да го накараме да работи
Примитиви за синхронизация
Можем да го накараме да работи
- мутекс
Примитиви за синхронизация
Можем да го накараме да работи
- мутекс
- атомарни числа
Примитиви за синхронизация
Можем да го накараме да работи
- мутекс
- атомарни числа
- да връщаме резултат от нишката
Примитиви за синхронизация
Можем да го накараме да работи
- мутекс
- атомарни числа
- да връщаме резултат от нишката
- …
Примитиви за синхронизация
Модула std::sync
- std::sync
Arc
Mutex
,RwLock
Condvar
,Barrier
atomic
mpsc
Mutex
use std::sync::Mutex;
fn main() {
// мутекса опакова стойността, която предпазва
let mutex = Mutex::new(10);
{
// заключваме мутекса
// `lock()` връща "умен указател" с deref до `&T` и `&mut T`
let mut lock = mutex.lock().unwrap();
*lock += 32;
// мутекса се отключва когато `lock` се деалокира
}
}
use std::sync::Mutex; fn main() { // мутекса опакова стойността, която предпазва let mutex = Mutex::new(10); { // заключваме мутекса // `lock()` връща "умен указател" с deref до `&T` и `&mut T` let mut lock = mutex.lock().unwrap(); *lock += 32; // мутекса се отключва когато `lock` се деалокира } }
Mutex
- mutual exclusion
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
- съдържа флаг - дали мутекса е заключен или свободен
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
- съдържа флаг - дали мутекса е заключен или свободен
- ако мутекса е отключен и извикаме
lock
- заключваме го
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
- съдържа флаг - дали мутекса е заключен или свободен
- ако мутекса е отключен и извикаме
lock
- заключваме го - ако мутекса е заключен и извикаме
lock
- нишката ни се спира
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
- съдържа флаг - дали мутекса е заключен или свободен
- ако мутекса е отключен и извикаме
lock
- заключваме го - ако мутекса е заключен и извикаме
lock
- нишката ни се спира - операционната система ще я събуди когато мутекса е свободен
Мutex
Обикновенно мутекса се възприема като примитива която определя критична секция
lock(my_mutex);
// начало на критичната секция
do_stuff(shared_data);
// край на критичната секция
unlock(my_mutex);
В Ръст това не би било удобно, защото не дава достатъчна информация на компилатора как ползваме данните.
Затова Mutex
е generic и опакова данните.
Mutex
Mutex<T>
опакова данни от типT
Mutex
Mutex<T>
опакова данни от типT
- ако искаме мутекс без данни може да се използва
Mutex<()>
Mutex
Mutex<T>
опакова данни от типT
- ако искаме мутекс без данни може да се използва
Mutex<()>
mutex.lock()
връщаResult<MutexGuard<'a, T>, PoisonError>
Mutex
Mutex<T>
опакова данни от типT
- ако искаме мутекс без данни може да се използва
Mutex<()>
mutex.lock()
връщаResult<MutexGuard<'a, T>, PoisonError>
mutex.lock().unwrap()
връщаMutexGuard<'a, T>
Mutex
Mutex<T>
опакова данни от типT
- ако искаме мутекс без данни може да се използва
Mutex<()>
mutex.lock()
връщаResult<MutexGuard<'a, T>, PoisonError>
mutex.lock().unwrap()
връщаMutexGuard<'a, T>
MutexGuard
имаDeref
до&T
и&mut T
Mutex
Mutex<T>
опакова данни от типT
- ако искаме мутекс без данни може да се използва
Mutex<()>
mutex.lock()
връщаResult<MutexGuard<'a, T>, PoisonError>
mutex.lock().unwrap()
връщаMutexGuard<'a, T>
MutexGuard
имаDeref
до&T
и&mut T
- единствения начин да достъпим данните е през
MutexGuard
Mutex
Panic
mutex.lock()
връщаResult<MutexGuard<'a, T>, PoisonError>
Mutex
Panic
mutex.lock()
връщаResult<MutexGuard<'a, T>, PoisonError>
- ако нишка е заключила мутекс и влезе в
panic!
по това време, може данните да са останали в (логически) невалидно състояние
Mutex
Panic
mutex.lock()
връщаResult<MutexGuard<'a, T>, PoisonError>
- ако нишка е заключила мутекс и влезе в
panic!
по това време, може данните да са останали в (логически) невалидно състояние - мутекса се зачита за отровен
Mutex
Panic
mutex.lock()
връщаResult<MutexGuard<'a, T>, PoisonError>
- ако нишка е заключила мутекс и влезе в
panic!
по това време, може данните да са останали в (логически) невалидно състояние - мутекса се зачита за отровен
- от
PoisonError
може да се извадиMutexGuard
Mutex
Panic
mutex.lock()
връщаResult<MutexGuard<'a, T>, PoisonError>
- ако нишка е заключила мутекс и влезе в
panic!
по това време, може данните да са останали в (логически) невалидно състояние - мутекса се зачита за отровен
- от
PoisonError
може да се извадиMutexGuard
- често срещано е резултата от
lock
просто да сеunwrap
-не
RwLock
- Reader-writer lock
RwLock
- Reader-writer lock
- позволява четене от много места
RwLock
- Reader-writer lock
- позволява четене от много места
- или писане от едно място
RwLock
- Reader-writer lock
- позволява четене от много места
- или писане от едно място
- подобно на
RefCell
, но в многонишков контекст
Mutex или RwLock
- обикновенно
Mutex
е по-доброто решение
Mutex или RwLock
- обикновенно
Mutex
е по-доброто решение Mutex
е по-бърз и по-лек отRwLock
Mutex или RwLock
- обикновенно
Mutex
е по-доброто решение Mutex
е по-бърз и по-лек отRwLock
Mutex
налага дисциплина да държим критичните секции възможно най-кратки
Mutex или RwLock
- обикновенно
Mutex
е по-доброто решение Mutex
е по-бърз и по-лек отRwLock
Mutex
налага дисциплина да държим критичните секции възможно най-краткиMutex<Arc<State>>
или ArcSwapпонякога е добра алтернатива на RwLock<State>
Mutex или RwLock
- обикновенно
Mutex
е по-доброто решение Mutex
е по-бърз и по-лек отRwLock
Mutex
налага дисциплина да държим критичните секции възможно най-краткиMutex<Arc<State>>
или ArcSwapпонякога е добра алтернатива на RwLock<State>
RwLock
може да се използва да опаковаме стари C++ библиотеки
Arc + Mutex
Подобно на Rc<RefCell<T>>
, може често да виждате Arc<Mutex<T>>
или Arc<RwLock<T>>
Други примитиви
- разгледайте документацията на std::sync
Condvar
Once
Barrier
Атомарни числа
AtomicBool
,AtomicUsize
,AtomicIsize
,AtomicPtr
Атомарни числа
AtomicBool
,AtomicUsize
,AtomicIsize
,AtomicPtr
AtomicU8
,AtomicU16
, …
Атомарни числа
AtomicBool
,AtomicUsize
,AtomicIsize
,AtomicPtr
AtomicU8
,AtomicU16
, …- имплементират се чрез специални инструкции на процесора
Атомарни числа
AtomicBool
,AtomicUsize
,AtomicIsize
,AtomicPtr
AtomicU8
,AtomicU16
, …- имплементират се чрез специални инструкции на процесора
- удобни са за създаване на различни броячи и подобни
Атомарни числа
AtomicBool
,AtomicUsize
,AtomicIsize
,AtomicPtr
AtomicU8
,AtomicU16
, …- имплементират се чрез специални инструкции на процесора
- удобни са за създаване на различни броячи и подобни
- стоят в основата на много алгоритми
Атомарни числа
- препоръчително да се използват пред
Mutex<{integer}>
Атомарни числа
- препоръчително да се използват пред
Mutex<{integer}>
- интерфейса наподобява
Cell<{integer}>
, но саSend + Sync
Атомарни числа
- препоръчително да се използват пред
Mutex<{integer}>
- интерфейса наподобява
Cell<{integer}>
, но саSend + Sync
- т.е. модифицират се през
&T
и връщат копие на числото
Атомарни числа
- препоръчително да се използват пред
Mutex<{integer}>
- интерфейса наподобява
Cell<{integer}>
, но саSend + Sync
- т.е. модифицират се през
&T
и връщат копие на числото - аритметични операции:
fetch_add
,fetch_xor
, …
Атомарни числа
- препоръчително да се използват пред
Mutex<{integer}>
- интерфейса наподобява
Cell<{integer}>
, но саSend + Sync
- т.е. модифицират се през
&T
и връщат копие на числото - аритметични операции:
fetch_add
,fetch_xor
, … - oперации по паметта:
load
,store
,compare_and_swap
, …
Канали
Канали
Don't communicate by sharing memory,
share memory by communicating
Канали в стандартната библиотека
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
sender.send(10).unwrap();
});
println!("received {}", receiver.recv().unwrap());
}
received 10
use std::sync::mpsc; use std::thread; fn main() { let (sender, receiver) = mpsc::channel(); thread::spawn(move || { sender.send(10).unwrap(); }); println!("received {}", receiver.recv().unwrap()); }
Типове канали
Неограничен канал
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
std::sync::mpsc::channel()
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
std::sync::mpsc::channel()
(Sender, Receiver)
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
std::sync::mpsc::channel()
(Sender, Receiver)
- изпращане на съобщение никога не блокира
Типове канали
Неограничен канал
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
sender.send(1).unwrap();
sender.send(2).unwrap();
sender.send(3).unwrap();
});
assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
assert_eq!(receiver.recv().unwrap(), 3);
use std::sync::mpsc; use std::thread; fn main() { let (sender, receiver) = mpsc::channel(); thread::spawn(move || { sender.send(1).unwrap(); sender.send(2).unwrap(); sender.send(3).unwrap(); }); assert_eq!(receiver.recv().unwrap(), 1); assert_eq!(receiver.recv().unwrap(), 2); assert_eq!(receiver.recv().unwrap(), 3); }
Типове канали
Oграничен канал
- bounded / "synchronous"
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)
(SyncSender, Receiver)
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)
(SyncSender, Receiver)
- има буфер за
k
съобщения
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)
(SyncSender, Receiver)
- има буфер за
k
съобщения - изпращане на съобщения ще блокира ако буфера е пълен
Типове канали
Ограничен канал
let (sender, receiver) = mpsc::sync_channel(1);
thread::spawn(move || {
// записва съобщението и връща веднага
sender.send(1).unwrap();
// ще блокира докато главната нишка не извика `receiver.recv()`
sender.send(2).unwrap();
});
assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
use std::sync::mpsc; use std::thread; fn main() { let (sender, receiver) = mpsc::sync_channel(1); thread::spawn(move || { // записва съобщението и връща веднага sender.send(1).unwrap(); // ще блокира докато главната нишка не извика `receiver.recv()` sender.send(2).unwrap(); }); assert_eq!(receiver.recv().unwrap(), 1); assert_eq!(receiver.recv().unwrap(), 2); }
Множество изпращачи
Изпращащата част може да се клонира
let (sender, receiver) = mpsc::channel();
let sender2 = sender.clone();
thread::spawn(move || {
sender.send(1).unwrap();
sender.send(2).unwrap();
});
thread::spawn(move || {
sender2.send(3).unwrap();
sender2.send(4).unwrap();
});
println!("{} {} {} {}",
receiver.recv().unwrap(), receiver.recv().unwrap(),
receiver.recv().unwrap(), receiver.recv().unwrap());
1 2 3 4
use std::sync::mpsc; use std::thread; fn main() { let (sender, receiver) = mpsc::channel(); let sender2 = sender.clone(); thread::spawn(move || { sender.send(1).unwrap(); sender.send(2).unwrap(); }); thread::spawn(move || { sender2.send(3).unwrap(); sender2.send(4).unwrap(); }); println!("{} {} {} {}", receiver.recv().unwrap(), receiver.recv().unwrap(), receiver.recv().unwrap(), receiver.recv().unwrap()); }
Sender
Методи
// изпраща `t`
// връща грешка ако получателят е бил унищожен
fn send(&self, t: T) -> Result<(), SendError<T>>
Sender
Методи
let (sender, receiver) = mpsc::channel();
assert_eq!(sender.send(12), Ok(()));
// унищожаваме получателя
// съобщението `12` никога няма да бъде получено
mem::drop(receiver);
// грешка - получателя е унищожен
// можем да си върнем съобщението `23` от грешката
assert_eq!(sender.send(23), Err(SendError(23)));
use std::mem; use std::sync::mpsc::{self, SendError}; fn main() { let (sender, receiver) = mpsc::channel(); assert_eq!(sender.send(12), Ok(())); // унищожаваме получателя // съобщението `12` никога няма да бъде получено mem::drop(receiver); // грешка - получателя е унищожен // можем да си върнем съобщението `23` от грешката assert_eq!(sender.send(23), Err(SendError(23))); }
SyncSender
Методи
// блокира ако буфера е пълен
fn send(&self, t: T) -> Result<(), SendError<T>>
// връща грешка ако буфера е пълен или получателят е бил унищожен
fn try_send(&self, t: T) -> Result<(), TrySendError<T>>
SyncSender
Методи
let (sender, receiver) = mpsc::sync_channel(1);
assert_eq!(sender.try_send(12), Ok(()));
assert_eq!(sender.try_send(23), Err(TrySendError::Full(23)));
mem::drop(receiver);
assert_eq!(sender.try_send(23), Err(TrySendError::Disconnected(23)));
use std::mem; use std::sync::mpsc::{self, TrySendError}; fn main() { let (sender, receiver) = mpsc::sync_channel(1); assert_eq!(sender.try_send(12), Ok(())); assert_eq!(sender.try_send(23), Err(TrySendError::Full(23))); mem::drop(receiver); assert_eq!(sender.try_send(23), Err(TrySendError::Disconnected(23))); }
Множество получатели
Множество получатели
- не може - каналите са multi-producer, single-consumer
Множество получатели
- не може - каналите са multi-producer, single-consumer
Receiver
не може да се клонира
Множество получатели
- не може - каналите са multi-producer, single-consumer
Receiver
не може да се клонираReceiver
eSend
, но не еSync
Receiver
Методи
// блокира докато не получи съобщение
// връща грешка ако всички изпращачи са унищожени
fn recv(&self) -> Result<T, RecvError>
// не блокира
// връща грешка ако всички изпращачи са унищожени или няма съобщение в опашката
fn try_recv(&self) -> Result<T, TryRecvError>
// блокира за определено време
// връща грешка ако всички изпращачи са унищожени или е изтекло времето
fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>
Receiver
Методи
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
for i in (0..50).rev() {
sender.send(i).unwrap();
}
});
while let Ok(msg) = receiver.recv() {
println!("received {}", msg);
}
use std::sync::mpsc; use std::thread; fn main() { let (sender, receiver) = mpsc::channel(); thread::spawn(move || { for i in (0..50).rev() { sender.send(i).unwrap(); } }); while let Ok(msg) = receiver.recv() { println!("received {}", msg); } }
Receiver
Итератори
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
for i in (0..50).rev() {
sender.send(i).unwrap();
}
});
for msg in receiver.iter() {
println!("received {}", msg);
}
use std::sync::mpsc; use std::thread; fn main() { let (sender, receiver) = mpsc::channel(); thread::spawn(move || { for i in (0..50).rev() { sender.send(i).unwrap(); } }); for msg in receiver.iter() { println!("received {}", msg); } }
Външни библиотеки
Crossbeam channel
- https://crates.io/crates/crossbeam-channel
- https://docs.rs/crossbeam-channel/
- multi-producer multi-consumer (mpmc) channel
- с опция за select по няколко канала
Външни библиотеки
Crossbeam
- https://crates.io/crates/crossbeam
- https://docs.rs/crossbeam/
- колекция от алгоритми и структури от данни
- lock-free структури от данни - опашка, стек, deque
- crossbeam_channel
- scoped threads
- и доста utilities
Външни библиотеки
Parking lot
- https://crates.io/crates/parking_lot
- https://docs.rs/parking_lot
- алтернативна имплементация на
Mutex
,RwLock
,Condvar
,Once
- по-малки и по-бързи от
std
- вижте README-то в github
Външни библиотеки
Rayon
- https://crates.io/crates/rayon
- https://docs.rs/rayon/
- библиотека за паралелизъм по данни
- threadpool
- parallel iterators
- split/join