Как выполнять async/await в Rust
В этой статье мы рассмотрим, как использовать async/await синтаксис в Rust. Мы создадим простой пример, а затем постепенно усложним его. В конце мы добавим Tokio runtime и посмотрите, как его использовать.
Инициализация проекта
cargo init
Мы добавимkube
иk8s-openapi
ящики в наш демонстрационный проект. Использование этих библиотек не находится в центре внимания. В будущем мы напишем статью о том, как использоватьRust
для взаимодействия с Kubernetes.
А пока просто добавим ящики в наш проект, используя следующие команды:
cargo add kube --features default, derive
cargo add k8s-openapi --features v1_24
async/await в Rust
async/await
синтаксис — это способ написания асинхронного кода вRust
Чтобы продемонстрировать, как это работает, начнем с следующего кода:
fn main() {
println!("Hello, world!");
}
fn get_my_pods() {
println!("Get all my pods in default namespace");
}
Давайте сделаем get_my_pods
функцию асинхронной. Мы будем использовать async
ключевое слово для этого:
fn main() {
println!("Hello, world!");
}
async fn get_my_pods() {
println!("Get all my pods in default namespace");
}
ВRust async/wait
— это специальный синтаксис, который позволяет нам писать функции, замыкания и блоки, которые можно приостанавливать и вернуть управление вызывающей стороне. Это позволяет другим частям программы работать, пока асинхронная функция ожидает и продолжите с того места, на котором остановились, когда будете готовы продолжить.
Одно из преимуществ использования async/await
Синтаксис позволяет нам писать код, который выглядит как синхронный код.
async/await
синтаксис аналогичен синтаксису on в других языках, таких как JavaScript или C#, с некоторыми ключевыми отличиями.
Ключевое слово async
на самом деле является синтаксическим сахаром для функции, подобной этой:
fn main() {
println!("Hello, world!");
}
fn get_my_pods() -> impl Future<Output=()> {
println!("Get all my pods in default namespace");
}
//async fn get_my_pods() {
// println!("Get all my pods in default namespace");
//}
Асинхронные функции — это особый вид функций, которые возвращают значение, реализующее Future
черта. Future
тип вывода — это тип, который возвращается функцией после ее завершения. В нашем случае мы не возвращаемся что угодно, поэтому мы используем ()
тип.
Упрощенная версия Future
черта выглядит так:
trait Future {
type Output;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T),
Pending,
}
fn main() {
println!("Hello, world!");
}
fn get_my_pods() -> impl Future<Output=()> {
println!("Get all my pods in default namespace");
}
Future
— это простой конечный автомат, который можно опросить, чтобы проверить, готов он или нет. poll
метод возвращает перечисление с двумя возможными значениями: Ready
или Pending
. Он также принимает функцию обратного вызова, называемую wake
.
Если позвонить poll
метод возвращает ожидание, тогда Future будет продолжать прогрессировать в фоновом режиме, пока не будет готов пройти повторный опрос. wake
функция обратного вызова используется для уведомления исполнителя о том, что Future готов к использованию. опросил еще раз.
В Javascript это похоже на Promises, ожидайте, что в Rust
Futures ленивы и не начинают исполняться, пока они не будут доведено до завершения путем опроса.
Futures можно довести до завершения, либо ожидая их, либо передавая их исполнителю.
Давайте посмотрим, как мы можем ждать будущего в Rust
. Мы добавим еще одну функцию под названием get_all_pods_in_namespace
который будет напечатайте имена всех модулей в заданном пространстве имен:
async fn get_all_pods_in_namespace(namespace: &str) -> ObjectList<Pod> {
println!("Get all my pods in a namespace {}", namespace);
let client = Client::try_default().await;
let api = Api::<Pod>::namespaced(client.unwrap(), namespace);
let lp = ListParams::default();
api.list(&lp).await.unwrap()
}
В нашей функции get_my_pods
мы позвоним get_all_pods_in_namespace
функцию для двух пространств имен и распечатайте количество модулей в каждом пространстве имен. Мы будем использовать await
ключевое слово, чтобы дождаться завершения будущего.
async fn get_my_pods() {
println!("Get all my pods");
let pods1 = get_all_pods_in_namespace("default").await;
println!("Got {} pods", pods1.items.len());
let pods2 = get_all_pods_in_namespace("kube-system").await;
println!("Got {} pods", pods2.items.len());
}
Именно это позволяет нам писать асинхронный код, похожий на синхронный. await
ключевое слово также будет приостановлено выполнение текущей функции возвращает управление во время выполнения.
Чтобы понять, как это работает, представьте себе процесс вызова get_my_pods как конечного автомата, представляющего собой перечисление с тремя состояниями:
enum FutureStateMachine {
State1,
State2,
State3,
}
Когда get_my_pods
вызывается первым, весь код функции выполняется до первого await
точка. будущее вернет Pending
состояние, потому что оно ожидает, пока API Kubernetes вернет список модулей. Как только возвращается список модулей, get_my_pods
уведомит своего исполнителя о том, что он готов к повторному опросу. Исполнитель будет затем возобновляем выполнение функции и продолжаем до следующего await
точка. Этот процесс будет продолжаться до тех пор, пока второй await
точка достигнута. Код в состоянии 2 будет выполняться синхронно всё до второй await
точка.
И снова функция вернет Pending
состояние, потому что оно ожидает API Kubernetes вернуться. Как только модули списка будут возвращены, get_my_pods
уведомит своего исполнителя о том, что он готов к повторному опросу. В третьем состоянии остальная часть кода будет выполнена синхронно, и функция вернет значение. Ready
заявить как мы находимся в конце функции.
Теперь мы знаем, как async/await
синтаксис работает, теперь попробуем вызвать get_my_pods
функция от main
функция.
Если мы попытаемся вызвать get_my_pods
функция от main
функции, мы получим ошибку компилятора:
fn main() {
println!("Hello, world!");
get_my_pods();
}
Вы получите предупреждение о том, что Features ничего не сделают, если вы .await
или опросите их. Итак, попробуем дождаться функция:
fn main() {
println!("Hello, world!");
get_my_pods().await;
}
Теперь мы получаем ошибку компилятора, в которой говорится, что await
разрешено только в async
функции. Итак, давайте сделаем main
функция async
:
async fn main() {
println!("Hello, world!");
get_my_pods().await;
}
Теперь мы получаем ошибку компилятора, в которой говорится, что main
не разрешено быть async
.
Итак, как нам вызвать async
функцию от main
функции? Futures можно довести до завершения двумя способами:
- Ожидая их.
- Или вручную опрашивать их, пока они не будут готовы.
Для Futures внутри других Futures мы можем использовать await
ключевое слово. Но если мы хотим вызвать async
функция сверху На большинстве уровней нашей программы нам нужно вручную опрашивать будущее, пока оно не будет готово. Этот код называется runtime
или а executor
.
А runtime
отвечает за опрос Futures верхнего уровня до тех пор, пока они не будут готовы. Он также отвечает за запуск несколько Futures параллельно. Стандартная библиотека не предоставляет среду выполнения, но существует множество созданных сообществом доступные среды выполнения. Мы будем использовать самый популярный из них под названием tokio
в этом уроке.
Что такое Tokio?
Tokio
это асинхронная среда выполнения для Rust
. Он предоставляет строительные блоки, необходимые для написания сетевых приложений. Это дает гибкость для работы с широким спектром систем, от больших серверов с десятками ядра для небольших встраиваемых устройств.
На высоком уровне, Tokio
содержит несколько основных компонентов:
- Многопоточная среда выполнения для выполнения асинхронного кода.
- Асинхронная версия стандартной библиотеки.
- Большая экосистема библиотек.
Преимущество использования Tokio
заключается в том, что он быстрый, надежный, простой в использовании и очень гибкий.
Чтобы добавить Tokio
для нашего проекта нам нужно запустить следующую команду:
cargo add tokio --features full
Это добавит tokio
crate в наш проект и включите все функции.
Теперь мы можем использовать атрибут tokio::main
макрос для запуска нашего main
функция:
#[tokio::main]
async fn main() {
println!("Hello, world!");
get_my_pods().await;
}
Это указывает на то, что main
функция является асинхронной функцией и будет выполняться Tokio
время выполнения.
Запустим нашу программу:
cargo run
Вы должны увидеть следующий вывод:
Hello, world!
Get all my pods
Get all my pods in a namespace default
Got 0 pods
Get all my pods in a namespace kube-system
Got 9 pods
Вы можете видеть, что наши операторы печати печатаются в том порядке, в котором они вызываются. Как упоминалось ранее: фьючерсы ленивы. и ничего не делайте, пока вы не дождетесь их или не опросите их.
Мы можем сохранить результат get_my_pods
функцию в переменной и вызвать await
об этом позже:
#[tokio::main]
async fn main() {
println!("Hello, world!");
let p = get_my_pods();
println!("Where are my pods?");
p.await;
}
Давайте снова запустим нашу программу:
cargo run
Вы должны увидеть следующий вывод:
Hello, world!
Where are my pods?
Get all my pods
Get all my pods in a namespace default
Got 0 pods
Get all my pods in a namespace kube-system
Got 9 pods
На этот раз оператор println Where are my pods?
печатается до get_my_pods
функция вызывается.
Преимущество ленивости фьючерсов в том, что они представляют собой абстракцию с нулевой стоимостью. Это означает, что вы не понесете затрат во время выполнения. если только вы на самом деле не ждете будущего. Еще одним преимуществом является то, что вы можете отменить будущее в любое время. Чтобы отменить будущее все, что вам нужно сделать, это прекратить его опрос.
В настоящее время мы не используем преимущества асинхронной природы get_my_pods
функция, потому что все работает серийно. Чтобы выполнить наш запуск одновременно, мы можем использовать задачу tokio. Задача — это облегченная неблокирующая единица исполнение.
Давайте изменим код, чтобы использовать задачу. Сначала мы создаем пустой вектор для хранения дескрипторов наших задач, а затем создаем цикл с двумя итерациями. На каждой итерации мы будем создавать задачу и сохранять ее дескриптор в векторе.
Мы проходим async
блокировать до spawn
функция. Мы можем использовать move
ключевое слово с async
заблокировать так, чтобы тот async
блок может захватывать переменные из внешней области. В этом случае мы фиксируем i
переменная и передать это get_my_pods
функция.
В конце основного кода мы перебираем вектор дескрипторов задач и вызываем await
на каждом из них.
#[tokio::main]
async fn main() {
println!("Hello, world!");
let mut handles = vec![];
for i in 0..2 {
let handle = tokio::spawn(async move {
get_my_pods(i).await;
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
}
async fn get_my_pods(i: i32) {
println!("[{i}] Get all my pods");
let pods1 = get_all_pods_in_namespace("default").await;
println!("[{i}] Got {} pods", pods1.items.len());
let pods2 = get_all_pods_in_namespace("kube-system").await;
println!("[{i}] Got {} pods", pods2.items.len());
}
Итак, давайте снова запустим нашу программу:
cargo run
Вы должны увидеть следующий вывод:
Hello, world!
[0] Get all my pods
Get all my pods in a namespace default
[1] Get all my pods
Get all my pods in a namespace default
[0] Got 0 pods
Get all my pods in a namespace kube-system
[1] Got 0 pods
Get all my pods in a namespace kube-system
[0] Got 9 pods
[1] Got 9 pods
Вы можете видеть, что две задачи выполняются одновременно. По умолчанию, Tokio
использует пул потоков для выполнения задач. Этот позволяет задачам выполнять задачи параллельно.
Существует также возможность заставить Токио запускать задачи в одном потоке. Это можно сделать с помощью тот current_thread
вкус tokio::main
макрос. Это приведет к одновременному выполнению потоков с использованием времени вместо этого нарезая нити.
Подведение итогов
В этом уроке мы узнали, как использовать async
и await
ключевые слова для написания асинхронного кода. Мы также узнали, как использовать tokio
как среду выполнения для одновременного выполнения нашего асинхронного кода.
Когда вы имеете дело с асинхронным кодом, вы должны знать, что мы сообщаем среде выполнения, когда блок асинхронного кода готов к работе, чтобы можно было выполнить другие задачи. Это дает нам контроль над рудой, однако это также возлагает на нас больше ответственности. Нам нужно убедиться, что мы пишем эффективный код. Например, мы не хотим помещать операции с интенсивным использованием ЦП внутри асинхронного блока, поскольку это заблокирует поток и предотвратит выполнение других задач.