Асинхронно програмиране
async/.await
10 декември 2024
Паралелизъм и concurrency
Паралелизъм и concurrency
- паралелизъм - изпълняваме няколко задачи едновременно върху няколко нишки
Паралелизъм и concurrency
- паралелизъм - изпълняваме няколко задачи едновременно върху няколко нишки
- concurrency - прогресираме изпълнението на няколко задачи едновременно, като позволяваме на всяка задача да се изпълнява за малък период от време и ги редуваме
Паралелизъм и concurrency
- паралелизъм - изпълняваме няколко задачи едновременно върху няколко нишки
- concurrency - прогресираме изпълнението на няколко задачи едновременно, като позволяваме на всяка задача да се изпълнява за малък период от време и ги редуваме
Двете понятия са ортогонални:
- concurrency без паралелизъм - върху една нишка
- concurrency с паралелизъм - върху множество нишки
Приложение
- паралелизъм - задачи с тежки сметки
Приложение
- паралелизъм - задачи с тежки сметки
- concurrency - множество леки задачи с много I/O операции
Приложение
- паралелизъм - задачи с тежки сметки
- concurrency - множество леки задачи с много I/O операции
- networking, уеб сървъри, …
Защо?
Пример: TCP echo сървър
use std::io::{Read, Write};
use std::error::Error;
fn main() -> Result<(), Box<dyn Error>> {
let addr = "127.0.0.1:8080".to_string();
let listener = std::net::TcpListener::bind(&addr)?;
println!("Listening on: {}", addr);
loop {
let (mut socket, _) = listener.accept()?;
std::thread::spawn(move || {
let mut buf = vec![0; 1024];
loop {
let n = socket
.read(&mut buf)
.expect("failed to read data from socket");
if n == 0 {
return;
}
socket
.write_all(&buf[0..n])
.expect("failed to write data to socket");
}
});
}
}
use std::io::{Read, Write};
use std::error::Error;
fn main() -> Result<(), Box> {
let addr = "127.0.0.1:8080".to_string();
let listener = std::net::TcpListener::bind(&addr)?;
println!("Listening on: {}", addr);
loop {
let (mut socket, _) = listener.accept()?;
std::thread::spawn(move || {
let mut buf = vec![0; 1024];
loop {
let n = socket
.read(&mut buf)
.expect("failed to read data from socket");
if n == 0 {
return;
}
socket
.write_all(&buf[0..n])
.expect("failed to write data to socket");
}
});
}
}
Защо?
- кодът е коректен, но не е оптимален
- няма да скалира добре при повече от 10-100 паралелни рекуести
Защо?
- кодът е коректен, но не е оптимален
- няма да скалира добре при повече от 10-100 паралелни рекуести
- кодът пуска нова нишка за всяка заявка
Защо?
- кодът е коректен, но не е оптимален
- няма да скалира добре при повече от 10-100 паралелни рекуести
- кодът пуска нова нишка за всяка заявка
- пускането на нишки на ОС отнема някакво време (освен ако не се използва thread pool)
Защо?
- кодът е коректен, но не е оптимален
- няма да скалира добре при повече от 10-100 паралелни рекуести
- кодът пуска нова нишка за всяка заявка
- пускането на нишки на ОС отнема някакво време (освен ако не се използва thread pool)
- всяка пусната нишка заема ресурси (памет)
Защо?
- кодът е коректен, но не е оптимален
- няма да скалира добре при повече от 10-100 паралелни рекуести
- кодът пуска нова нишка за всяка заявка
- пускането на нишки на ОС отнема някакво време (освен ако не се използва thread pool)
- всяка пусната нишка заема ресурси (памет)
- при достатъчно прости заявки, overhead-а от менажиране и превключване между нишки може да се окаже голям процент
Защо?
- кодът е коректен, но не е оптимален
- няма да скалира добре при повече от 10-100 паралелни рекуести
- кодът пуска нова нишка за всяка заявка
- пускането на нишки на ОС отнема някакво време (освен ако не се използва thread pool)
- всяка пусната нишка заема ресурси (памет)
- при достатъчно прости заявки, overhead-а от менажиране и превключване между нишки може да се окаже голям процент
- много от операциите по вход/изход са леки откъм процесорно време, но изискват дълго изчакване
Защо?
- кодът е коректен, но не е оптимален
- няма да скалира добре при повече от 10-100 паралелни рекуести
- кодът пуска нова нишка за всяка заявка
- пускането на нишки на ОС отнема някакво време (освен ако не се използва thread pool)
- всяка пусната нишка заема ресурси (памет)
- при достатъчно прости заявки, overhead-а от менажиране и превключване между нишки може да се окаже голям процент
- много от операциите по вход/изход са леки откъм процесорно време, но изискват дълго изчакване
- четене / писане в сокет
Защо?
- кодът е коректен, но не е оптимален
- няма да скалира добре при повече от 10-100 паралелни рекуести
- кодът пуска нова нишка за всяка заявка
- пускането на нишки на ОС отнема някакво време (освен ако не се използва thread pool)
- всяка пусната нишка заема ресурси (памет)
- при достатъчно прости заявки, overhead-а от менажиране и превключване между нишки може да се окаже голям процент
- много от операциите по вход/изход са леки откъм процесорно време, но изискват дълго изчакване
- четене / писане в сокет
- заявка към база данни
Защо?
- кодът е коректен, но не е оптимален
- няма да скалира добре при повече от 10-100 паралелни рекуести
- кодът пуска нова нишка за всяка заявка
- пускането на нишки на ОС отнема някакво време (освен ако не се използва thread pool)
- всяка пусната нишка заема ресурси (памет)
- при достатъчно прости заявки, overhead-а от менажиране и превключване между нишки може да се окаже голям процент
- много от операциите по вход/изход са леки откъм процесорно време, но изискват дълго изчакване
- четене / писане в сокет
- заявка към база данни
- микросървиси?
Защо?
- кодът е коректен, но не е оптимален
- няма да скалира добре при повече от 10-100 паралелни рекуести
- кодът пуска нова нишка за всяка заявка
- пускането на нишки на ОС отнема някакво време (освен ако не се използва thread pool)
- всяка пусната нишка заема ресурси (памет)
- при достатъчно прости заявки, overhead-а от менажиране и превключване между нишки може да се окаже голям процент
- много от операциите по вход/изход са леки откъм процесорно време, но изискват дълго изчакване
- четене / писане в сокет
- заявка към база данни
- микросървиси?
- бихме искали да си уплътним времето - да работим по други заявки докато чакаме
Как?
Как?
- функционалност на ядрото наречена evented io
Как?
- функционалност на ядрото наречена evented io
- epoll (Linux)
- kqueue (Unix/MacOs)
Как?
- функционалност на ядрото наречена evented io
- epoll (Linux)
- kqueue (Unix/MacOs)
- имаме голям брой файлоподобни неща - файлове, сокети, pipe-ове, …
Как?
- функционалност на ядрото наречена evented io
- epoll (Linux)
- kqueue (Unix/MacOs)
- имаме голям брой файлоподобни неща - файлове, сокети, pipe-ове, …
- казваме каква операция (
read,write, …) бихме искали да изпълним върху всеки "файл"
Как?
- функционалност на ядрото наречена evented io
- epoll (Linux)
- kqueue (Unix/MacOs)
- имаме голям брой файлоподобни неща - файлове, сокети, pipe-ове, …
- казваме каква операция (
read,write, …) бихме искали да изпълним върху всеки "файл" - ядрото ни казва кои операции са готови да се изпълнят веднага без чакане
Как?
- функционалност на ядрото наречена evented io
- epoll (Linux)
- kqueue (Unix/MacOs)
- имаме голям брой файлоподобни неща - файлове, сокети, pipe-ове, …
- казваме каква операция (
read,write, …) бихме искали да изпълним върху всеки "файл" - ядрото ни казва кои операции са готови да се изпълнят веднага без чакане
- IOCP - IO completion ports (Windows)
- io_uring (Linux)
Как?
- функционалност на ядрото наречена evented io
- epoll (Linux)
- kqueue (Unix/MacOs)
- имаме голям брой файлоподобни неща - файлове, сокети, pipe-ове, …
- казваме каква операция (
read,write, …) бихме искали да изпълним върху всеки "файл" - ядрото ни казва кои операции са готови да се изпълнят веднага без чакане
- IOCP - IO completion ports (Windows)
- io_uring (Linux)
- имаме голям брой файлоподобни неща - файлове, сокети, pipe-ове, …
Как?
- функционалност на ядрото наречена evented io
- epoll (Linux)
- kqueue (Unix/MacOs)
- имаме голям брой файлоподобни неща - файлове, сокети, pipe-ове, …
- казваме каква операция (
read,write, …) бихме искали да изпълним върху всеки "файл" - ядрото ни казва кои операции са готови да се изпълнят веднага без чакане
- IOCP - IO completion ports (Windows)
- io_uring (Linux)
- имаме голям брой файлоподобни неща - файлове, сокети, pipe-ове, …
- казваме на ядрото да започне да изпълнява операция (
read,write, …) върху всеки "файл"
Как?
- функционалност на ядрото наречена evented io
- epoll (Linux)
- kqueue (Unix/MacOs)
- имаме голям брой файлоподобни неща - файлове, сокети, pipe-ове, …
- казваме каква операция (
read,write, …) бихме искали да изпълним върху всеки "файл" - ядрото ни казва кои операции са готови да се изпълнят веднага без чакане
- IOCP - IO completion ports (Windows)
- io_uring (Linux)
- имаме голям брой файлоподобни неща - файлове, сокети, pipe-ове, …
- казваме на ядрото да започне да изпълнява операция (
read,write, …) върху всеки "файл" - ядрото ни казва кои операции са приключили
Как?
- evented io обикновено не се използва директно
Как?
- evented io обикновено не се използва директно
- а е опаковано в библиотека с някаква абстракция
Имплементации
Event driven (с callback функции)
- изпълняваме работата която можем да свършим без да се наложи да чакаме
Имплементации
Event driven (с callback функции)
- изпълняваме работата която можем да свършим без да се наложи да чакаме
- задаваме на библиотеката каква входно/изходна операция трябва да се извърши
Имплементации
Event driven (с callback функции)
- изпълняваме работата която можем да свършим без да се наложи да чакаме
- задаваме на библиотеката каква входно/изходна операция трябва да се извърши
- задаваме closure, който да се извика след като операцията е готова
Имплементации
// псевдокод
void process_request(shared_ptr<State> state) {
async_connect(io_executor, state->socket, bind(on_connected, state));
}
void on_connected(shared_ptr<State> state) {
state->request_buffer = prepare_request();
async_write(socket, state->request_buffer, bind(on_send_complete, state));
}
void on_send_complete(shared_ptr<State> state) {
asyn_read(socket, state->response_buffer, bind(on_read_complete, state));
}
void on_read_complete(shared_ptr<State> state) {
auto state->headers = parse_headers(state->response_buffer);
auto state->body = parse_body(state->response_buffer);
post(io_executor, bind(process, state));
}
void process(shared_ptr<State>) {
/* ... */
}
Имплементации
Event driven (с callback функции)
Недостатъци:
- по-трудна за проследяване логика
- няма как да пазим локални променливи между асинхронните операции
- нужна е голяма промяна на кода за рефакториране от синхронен вариант до вариант с callbacks
Имплементации
Coroutines
- кода изглежда като обикновена синхронна функция
Имплементации
Coroutines
- кода изглежда като обикновена синхронна функция
- но изпълнението може да се прекъсне по всяко време
Имплементации
Coroutines
- кода изглежда като обикновена синхронна функция
- но изпълнението може да се прекъсне по всяко време
- и да се продължи по-късно
Асинхронно програмиране в Rust
Пример
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::error::Error;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let addr = "127.0.0.1:8080".to_string();
let listener = tokio::net::TcpListener::bind(&addr).await?;
println!("Listening on: {}", addr);
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = vec![0; 1024];
loop {
let n = socket
.read(&mut buf)
.await
.expect("failed to read data from socket");
if n == 0 {
return;
}
socket
.write_all(&buf[0..n])
.await
.expect("failed to write data to socket");
}
});
}
}
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::error::Error;
#[tokio::main]
async fn main() -> Result<(), Box> {
let addr = "127.0.0.1:8080".to_string();
let listener = tokio::net::TcpListener::bind(&addr).await?;
println!("Listening on: {}", addr);
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = vec![0; 1024];
loop {
let n = socket
.read(&mut buf)
.await
.expect("failed to read data from socket");
if n == 0 {
return;
}
socket
.write_all(&buf[0..n])
.await
.expect("failed to write data to socket");
}
});
}
}
Асинхронно програмиране в Rust
- от rust 1.39 (ноември 2019)
Асинхронно програмиране в Rust
- от rust 1.39 (ноември 2019)
- Ръст предоставя async/await синтаксиса
Асинхронно програмиране в Rust
- от rust 1.39 (ноември 2019)
- Ръст предоставя async/await синтаксиса
- Ръст предоставя фундаменталните типове и trait-ове
Асинхронно програмиране в Rust
- от rust 1.39 (ноември 2019)
- Ръст предоставя async/await синтаксиса
- Ръст предоставя фундаменталните типове и trait-ове
- библиотеката
futuresпредоставя множество полезни функции и макроси
Асинхронно програмиране в Rust
- от rust 1.39 (ноември 2019)
- Ръст предоставя async/await синтаксиса
- Ръст предоставя фундаменталните типове и trait-ове
- библиотеката
futuresпредоставя множество полезни функции и макроси - за изпълнение на асинхронен код е необходим async runtime - предоставя се от външни библиотеки
Асинхронно програмиране в Rust
Библиотечни функции:
- използват се типовете от
stdиfutures - обикновенно работят с всеки async runtime
Изпълними програми:
- трябва да се избере (поне) един async runtime
Async/.await в Rust
async функции
- в превод
async fn(...) -> T⇒fn(...) -> impl Future<Output = T> five()е от типimpl Future<Output = u8>five().awaitе от типu8
// връща анонимен тип, който имплементира trait-а `Future<Output = u8>`
async fn five() -> u8 {
5
}
#![allow(dead_code)] // връща анонимен тип, който имплементира trait-а `Future
Async/.await в Rust
async блокове
use std::future::Future;
fn ten() -> impl Future<Output = u8> {
// връща анонимен тип, който имплементира trait-а `Future<Output = u8>`
async {
let x: u8 = five().await;
x + 5
}
}
#![allow(dead_code)] use std::future::Future; fn ten() -> impl Future
Async/.await в Rust
.await
.awaitе постфиксен оператор.awaitможе да се използва само вasync fnилиasync {}
async fn five() -> u8 {
5
}
async fn ten() -> u8 {
five().await + 5
}
fn main() {
let x = ten().await;
}
error[E0728]: `await` is only allowed inside `async` functions and blocks --> src/bin/main_c247aff303b227bd4f1fa2405d3f7ad357520e11.rs:10:19 | 9 | fn main() { | --------- this is not `async` 10 | let x = ten().await; | ^^^^^ only allowed inside `async` functions and blocks For more information about this error, try `rustc --explain E0728`. error: could not compile `rust` (bin "main_c247aff303b227bd4f1fa2405d3f7ad357520e11") due to 1 previous error
async fn five() -> u8 {
5
}
async fn ten() -> u8 {
five().await + 5
}
fn main() {
let x = ten().await;
}
Async/.await в Rust
.await
.awaitе постфиксен оператор.awaitможе да се използва само вasync fnилиasync {}
async fn five() -> u8 {
5
}
async fn ten() -> u8 {
five().await + 5
}
fn main() {
let x = ::futures::executor::block_on(async {
ten().await
});
}
async fn five() -> u8 {
5
}
async fn ten() -> u8 {
five().await + 5
}
fn main() {
let x = ::futures::executor::block_on(async {
ten().await
});
}
Trait Future
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T),
Pending,
}
use std::task::Context;
use std::pin::Pin;
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll;
}
pub enum Poll {
Ready(T),
Pending,
}
fn main() {}
- тип, който имплементира
Future, съдържа всичката информация нужна за изпълнението на асинхронна операция - игнорирайте
PinиContextзасега
Trait Future
- future-а е само структура
- по само себе си не прави нищо (той е мързелив)
- за да започне работа трябва някой да му извика
poll
async fn foo() {
println!("foo");
}
let foo_future = foo();
#![allow(unused_variables)]
fn main() {
async fn foo() {
println!("foo");
}
let foo_future = foo();
}
Изпълнение на future
Future може да се изпълни
- като му се извика
.awaitвasyncблок или функция - като се подаде на executor
Изпълнение на future
Async функцията връща анонимна структура, която имплементира трейта Future.
Тази структура е enum, който съдържа всички възможни междинни състояния.
fn main() {
let fut1 = async_func1();
::futures::executor::block_on(fut1);
}
async fn async_func1() -> i32 {
let fut2 = another_async_func();
let x = fut2.await;
let y = regular_func();
let fut3 = make_call_to_db();
let z = fut3.await;
x + y + z
}
fn main() {
let fut1 = async_func1();
::futures::executor::block_on(fut1);
}
async fn async_func1() -> i32 {
let fut2 = another_async_func();
let x = fut2.await;
let y = regular_func();
let fut3 = make_call_to_db();
let z = fut3.await;
x + y + z
}
async fn another_async_func() -> i32 { 0 }
async fn make_call_to_db() -> i32 { 0 }
fn regular_func() -> i32 { 0 }
enum Fut1 {
Init,
AtAwait1 { fut2: Fut2 },
AtAwait2 { x: i32, y: i32, fut3: Fut3 },
Done,
}
Изпълнение на future
Futures извършват прогрес, когато някой executor им извика poll
enum Fut1 {
Init,
AtAwait1 { fut2: Fut2 },
AtAwait2 { x: i32, y: i32, fut3: Fut3 },
Done,
}
impl Future for Fut1 {
type Output = i32;
// Една възможна имплементация
// Все още игнорираме `Pin` и `Context`
fn poll(self: &mut Self) -> Poll<Self::Output> {
loop {
match std::mem::replace(self, Fut1::Done) {
Fut1::Init => {
let fut2 = another_async_func();
*self = Fut1::AtAwait1 { fut2 };
}
Fut1::AtAwait1 { mut fut2 } => {
match fut2.poll() {
Poll::Ready(res) => {
let x = res;
let y = regular_function();
let fut3 = make_call_to_db();
*self = Fut1::AtAwait2 { x, y, fut3 };
},
Poll::Pending => {
*self = Fut1::AtAwait1 { fut2 };
return Poll::Pending;
}
}
},
Fut1::AtAwait2 { x, y, mut fut3 } => {
match fut3.poll() {
Poll::Ready(res) => {
let z = res;
return Poll::Ready(x + y + z);
},
Poll::Pending => {
*self = Fut1::AtAwait2 { x, y, fut3 };
return Poll::Pending;
}
}
}
Fut1::Done => {
panic!("`poll` called on a finished future");
}
}
}
}
}
trait Future {
type Output;
fn poll(self: &mut Self) -> Poll;
}
enum Poll { Ready(T), Pending }
enum Fut1 {
Init,
AtAwait1 { fut2: Fut2 },
AtAwait2 { x: i32, y: i32, fut3: Fut3 },
Done,
}
struct Fut2;
impl Future for Fut2 { type Output = i32; fn poll(&mut self) -> Poll { todo!(); } }
struct Fut3;
impl Future for Fut3 { type Output = i32; fn poll(&mut self) -> Poll { todo!(); } }
fn regular_function() -> i32 { 0 }
fn another_async_func() -> Fut2 { todo!() }
fn make_call_to_db() -> Fut3 { todo!() }
impl Future for Fut1 {
type Output = i32;
// Една възможна имплементация
// Все още игнорираме `Pin` и `Context`
fn poll(self: &mut Self) -> Poll {
loop {
match std::mem::replace(self, Fut1::Done) {
Fut1::Init => {
let fut2 = another_async_func();
*self = Fut1::AtAwait1 { fut2 };
}
Fut1::AtAwait1 { mut fut2 } => {
match fut2.poll() {
Poll::Ready(res) => {
let x = res;
let y = regular_function();
let fut3 = make_call_to_db();
*self = Fut1::AtAwait2 { x, y, fut3 };
},
Poll::Pending => {
*self = Fut1::AtAwait1 { fut2 };
return Poll::Pending;
}
}
},
Fut1::AtAwait2 { x, y, mut fut3 } => {
match fut3.poll() {
Poll::Ready(res) => {
let z = res;
return Poll::Ready(x + y + z);
},
Poll::Pending => {
*self = Fut1::AtAwait2 { x, y, fut3 };
return Poll::Pending;
}
}
}
Fut1::Done => {
panic!("`poll` called on a finished future");
}
}
}
}
}
fn main() {}
Executors
- за да използвате async/await трябва да си изберете executor
Executors
- за да използвате async/await трябва да си изберете executor
- https://tokio.rs/ - най-често използвания framework е
tokio
Executors
- за да използвате async/await трябва да си изберете executor
- https://tokio.rs/ - най-често използвания framework е
tokio - https://docs.rs/tokio
Tokio
Tokio имплементира цялата машинария за изпълнение на future-и:
- event loop около функциите на операционната система за evented io (epoll, kqueue, iocp, …)
- scheduler за планиране на изпълнението на задачите
- таймери
Tokio
Нормално главната функция на програмата не може да е async.
Tokio предоставя макрос, който скрива това
#[tokio::main]
async fn main() {
}
#[tokio::main]
async fn main() {
}
Tokio
Не е нужно да се използва макроса, същото може да се постигне и на ръка.
Този начин също така позволява настройване на библиотеката
use tokio::runtime::Runtime;
fn main() {
let runtime = Runtime::new().unwrap();
match runtime.block_on(run()) {
Ok(()) => (),
Err(e) => {
eprintln!("An error occurred: {}", e);
std::process::exit(1);
}
}
}
async fn run() -> Result<(), Box<dyn std::error::Error>> {
/* ... */
}
use tokio::runtime::Runtime;
fn main() {
let runtime = Runtime::new().unwrap();
match runtime.block_on(run()) {
Ok(()) => (),
Err(e) => {
eprintln!("An error occurred: {}", e);
std::process::exit(1);
}
}
}
async fn run() -> Result<(), Box> {
/* ... */
Ok(())
}
Tokio
По подразбиране tokio стартира многонишков runtime.
Това в много случаи не е необходимо - tokio ни дава concurrency, много често не ни е нужен и паралелизъм. В такива случаи можем да настроим tokio да използва еднонишков runtime.
use tokio::runtime::Builder;
fn main() {
let runtime = Builder::new_current_thread()
.build()
.unwrap();
match runtime.block_on(run()) {
Ok(()) => (),
Err(e) => {
eprintln!("An error occurred: {}", e);
std::process::exit(1);
}
}
}
async fn run() -> Result<(), Box<dyn std::error::Error>> {
/* ... */
}
use tokio::runtime::Builder;
fn main() {
let runtime = Builder::new_current_thread()
.build()
.unwrap();
match runtime.block_on(run()) {
Ok(()) => (),
Err(e) => {
eprintln!("An error occurred: {}", e);
std::process::exit(1);
}
}
}
async fn run() -> Result<(), Box> {
/* ... */
Ok(())
}
Executors
- други executor-и
Executors
- други executor-и
- https://docs.rs/async-std - огледално копие на стандартната библиотека, но всички блокиращи функции са заменени с асинхронни
Executors
- други executor-и
- https://docs.rs/async-std - огледално копие на стандартната библиотека, но всички блокиращи функции са заменени с асинхронни
- https://docs.rs/smol - малък, прост executor
Executors
- други executor-и
- https://docs.rs/async-std - огледално копие на стандартната библиотека, но всички блокиращи функции са заменени с асинхронни
- https://docs.rs/smol - малък, прост executor
- https://docs.rs/pollster - не е асинхронен executor
- предоставя единствено функция
block_on - когато пишем синхронен код, но се налага да извикаме някоя функция, маркирана като
async
- предоставя единствено функция
Executors
- други executor-и
- https://docs.rs/async-std - огледално копие на стандартната библиотека, но всички блокиращи функции са заменени с асинхронни
- https://docs.rs/smol - малък, прост executor
- https://docs.rs/pollster - не е асинхронен executor
- предоставя единствено функция
block_on - когато пишем синхронен код, но се налага да извикаме някоя функция, маркирана като
async
- предоставя единствено функция
- https://github.com/embassy-rs/embassy - за embedded устройства
Executors
Съвместимост
- Не всички async библиотеки са съвместими една с друга!
Executors
Съвместимост
- Не всички async библиотеки са съвместими една с друга!
- функционалности, които могат да работят под всеки executor
- комбинатори, синхронизационни примитиви
- асинхронни задачи (task-ове), които не използват директно I/O
- код, който използва само
Futuretrait-а
Executors
Съвместимост
- Не всички async библиотеки са съвместими една с друга!
- функционалности, които могат да работят под всеки executor
- комбинатори, синхронизационни примитиви
- асинхронни задачи (task-ове), които не използват директно I/O
- код, който използва само
Futuretrait-а
- функционалности, които могат да работят само на определен executor
- асинхронен вход/изход
- таймери
- и всички абстракции, надграждащи над това
- код, който има тясно взаимодействие с event loop-а на executor-а
Executors
Съвместимост
- Не всички async библиотеки са съвместими една с друга!
- функционалности, които могат да работят под всеки executor
- комбинатори, синхронизационни примитиви
- асинхронни задачи (task-ове), които не използват директно I/O
- код, който използва само
Futuretrait-а
- функционалности, които могат да работят само на определен executor
- асинхронен вход/изход
- таймери
- и всички абстракции, надграждащи над това
- код, който има тясно взаимодействие с event loop-а на executor-а
- https://rust-lang.github.io/async-book/08_ecosystem/00_chapter.html
Futures екосистемата
- преди да бъдат добавени към
stdfutures съществуваха в rust екосистемата като библиотеката futures
Futures екосистемата
- преди да бъдат добавени към
stdfutures съществуваха в rust екосистемата като библиотеката futures - в стандартната библиотека са стабилизирани част от
futures 0.3
Futures екосистемата
- преди да бъдат добавени към
stdfutures съществуваха в rust екосистемата като библиотеката futures - в стандартната библиотека са стабилизирани част от
futures 0.3 - но има още неща, които не са добавени
- композиране на futures
StreamиSinktraitsAsyncReadиAsyncWritetraitsselect!,join!block_on- и други полезни неща
Подводни камъни
Блокиращи операции
- когато ползваме async е важно да нямаме блокиращи операции
Подводни камъни
Блокиращи операции
- когато ползваме async е важно да нямаме блокиращи операции
- блокиращите операции блокират текущата нишка
Подводни камъни
Блокиращи операции
- когато ползваме async е важно да нямаме блокиращи операции
- блокиращите операции блокират текущата нишка
- async executor-ите ползват ограничено количество нишки за изпълнение на задачи
Подводни камъни
Блокиращи операции
- когато ползваме async е важно да нямаме блокиращи операции
- блокиращите операции блокират текущата нишка
- async executor-ите ползват ограничено количество нишки за изпълнение на задачи
- в резултат можем лесно да си забавим или тотално забием програмата
Подводни камъни
Примитиви за синхронизация
- примитивите в
std::syncблокират текущата нишка
Подводни камъни
Примитиви за синхронизация
- примитивите в
std::syncблокират текущата нишка - вместо това трябва да се използват async версии, които блокират само текущата задача
Подводни камъни
Примитиви за синхронизация
- примитивите в
std::syncблокират текущата нишка - вместо това трябва да се използват async версии, които блокират само текущата задача
tokio::sync
Подводни камъни
Тежки операции
- други блокиращи операции, като
- тежки изчисления
- блокиращ код, който не може да се промени
Подводни камъни
Тежки операции
- други блокиращи операции, като
- тежки изчисления
- блокиращ код, който не може да се промени
- трябва да се изпълняват на отделна ОС нишка
Подводни камъни
Тежки операции
- други блокиращи операции, като
- тежки изчисления
- блокиращ код, който не може да се промени
- трябва да се изпълняват на отделна ОС нишка
- може да се пуска нова нишка за всяка операция
Подводни камъни
Тежки операции
- други блокиращи операции, като
- тежки изчисления
- блокиращ код, който не може да се промени
- трябва да се изпълняват на отделна ОС нишка
- може да се пуска нова нишка за всяка операция
- или да се използва thread pool
Подводни камъни
Тежки операции
- други блокиращи операции, като
- тежки изчисления
- блокиращ код, който не може да се промени
- трябва да се изпълняват на отделна ОС нишка
- може да се пуска нова нишка за всяка операция
- или да се използва thread pool
- tokio има
tokio::task::spawn_blocking
Подводни камъни
Задържане на променливи между await състояния
async fn some_function() -> i32 {
let big_vec = vec![ /* ... */ ];
// Променливата `big_vec` се използва за последно тук
let val1 = some_async_op(&big_vec).await;
let val2 = other_async_op().await;
val1 + val2
// Деструктора на `big_vec` се извиква тук, в края на scope-а.
// Това означава, че променливата трябва да се пази жива през цялото време
// докато future-а е жив - ненужно заемане на памет
}
async fn some_function() -> i32 {
let big_vec = vec![ /* ... */ ];
// Променливата `big_vec` се използва за последно тук
let val1 = some_async_op(&big_vec).await;
let val2 = other_async_op().await;
val1 + val2
// Деструктора на `big_vec` се извиква тук, в края на scope-а.
// Това означава, че променливата трябва да се пази жива през цялото време
// докато future-а е жив - ненужно заемане на памет
}
async fn some_async_op(_: &[u8]) -> i32 { 0 }
async fn other_async_op() -> i32 { 0 }
fn main() {}
Подводни камъни
Задържане на променливи между await състояния
async fn some_function() -> i32 {
let big_vec = vec![ /* ... */ ];
// Променливата `big_vec` се използва за последно тука
let val1 = some_async_op(&big_vec).await;
// Ръчно деструктираме променливата.
// Така тя няма да се пази в състоянието на future-а при
// следващите await точки.
drop(big_vec);
let val2 = other_async_op().await;
val1 + val2
}
async fn some_function() -> i32 {
let big_vec = vec![ /* ... */ ];
// Променливата `big_vec` се използва за последно тука
let val1 = some_async_op(&big_vec).await;
// Ръчно деструктираме променливата.
// Така тя няма да се пази в състоянието на future-а при
// следващите await точки.
drop(big_vec);
let val2 = other_async_op().await;
val1 + val2
}
async fn some_async_op(_: &[u8]) -> i32 { 0 }
async fn other_async_op() -> i32 { 0 }
fn main() {}
Допълнителни материали
- https://rust-lang.github.io/async-book/ - официалната книга за асинхронен Ръст
- https://book.async.rs/ - книгата за async std, имат добър туториал за имплементация на чат сървър
- https://www.youtube.com/watch?v=L7X0vpAU-sU - презентация за async-std