Как выполнять async/await в Rust

В этой статье мы рассмотрим, как использовать async/await синтаксис в Rust. Мы создадим простой пример, а затем постепенно усложним его. В конце мы добавим Tokio runtime и посмотрите, как его использовать.

Инициализация проекта

cargo init

Мы добавимkubeиk8s-openapiящики в наш демонстрационный проект. Использование этих библиотек не находится в центре внимания. В будущем мы напишем статью о том, как использоватьRustдля взаимодействия с Kubernetes.

А пока просто добавим ящики в наш проект, используя следующие команды:

cargo add kube --features default, derive
cargo add k8s-openapi --features v1_24

async/await в Rust

async/await синтаксис — это способ написания асинхронного кода вRust Чтобы продемонстрировать, как это работает, начнем с следующего кода:

fn main() {
    println!("Hello, world!");
}

fn get_my_pods() {
    println!("Get all my pods in default namespace");
}

Давайте сделаем get_my_pods функцию асинхронной. Мы будем использовать async ключевое слово для этого:

fn main() {
    println!("Hello, world!");
}

async fn get_my_pods() {
    println!("Get all my pods in default namespace");
}

ВRust async/wait — это специальный синтаксис, который позволяет нам писать функции, замыкания и блоки, которые можно приостанавливать и вернуть управление вызывающей стороне. Это позволяет другим частям программы работать, пока асинхронная функция ожидает и продолжите с того места, на котором остановились, когда будете готовы продолжить.

Одно из преимуществ использования async/await Синтаксис позволяет нам писать код, который выглядит как синхронный код.

async/await синтаксис аналогичен синтаксису on в других языках, таких как JavaScript или C#, с некоторыми ключевыми отличиями.

Ключевое слово async на самом деле является синтаксическим сахаром для функции, подобной этой:

fn main() {
    println!("Hello, world!");
}

fn get_my_pods() -> impl Future<Output=()> {
    println!("Get all my pods in default namespace");
}

//async fn get_my_pods() {
//    println!("Get all my pods in default namespace");
//}

Асинхронные функции — это особый вид функций, которые возвращают значение, реализующее Future черта. Future тип вывода — это тип, который возвращается функцией после ее завершения. В нашем случае мы не возвращаемся что угодно, поэтому мы используем () тип.

Упрощенная версия Future черта выглядит так:

trait Future {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

fn main() {
    println!("Hello, world!");
}

fn get_my_pods() -> impl Future<Output=()> {
    println!("Get all my pods in default namespace");
}

Future — это простой конечный автомат, который можно опросить, чтобы проверить, готов он или нет. poll метод возвращает перечисление с двумя возможными значениями: Ready или Pending. Он также принимает функцию обратного вызова, называемую wake.

Если позвонить poll метод возвращает ожидание, тогда Future будет продолжать прогрессировать в фоновом режиме, пока не будет готов пройти повторный опрос. wake функция обратного вызова используется для уведомления исполнителя о том, что Future готов к использованию. опросил еще раз.

В Javascript это похоже на Promises, ожидайте, что в Rust Futures ленивы и не начинают исполняться, пока они не будут доведено до завершения путем опроса.

Futures можно довести до завершения, либо ожидая их, либо передавая их исполнителю.

Давайте посмотрим, как мы можем ждать будущего в Rust . Мы добавим еще одну функцию под названием get_all_pods_in_namespace который будет напечатайте имена всех модулей в заданном пространстве имен:

async fn get_all_pods_in_namespace(namespace: &str) -> ObjectList<Pod> {
    println!("Get all my pods in a namespace {}", namespace);
    let client = Client::try_default().await;
    let api = Api::<Pod>::namespaced(client.unwrap(), namespace);
    let lp = ListParams::default();
    api.list(&lp).await.unwrap()
}

В нашей функции get_my_pods мы позвоним get_all_pods_in_namespace функцию для двух пространств имен и распечатайте количество модулей в каждом пространстве имен. Мы будем использовать await ключевое слово, чтобы дождаться завершения будущего.

async fn get_my_pods() {
    println!("Get all my pods");
    let pods1 = get_all_pods_in_namespace("default").await;
    println!("Got {} pods", pods1.items.len());
    let pods2 = get_all_pods_in_namespace("kube-system").await;
    println!("Got {} pods", pods2.items.len());
}

Именно это позволяет нам писать асинхронный код, похожий на синхронный. await ключевое слово также будет приостановлено выполнение текущей функции возвращает управление во время выполнения.

Чтобы понять, как это работает, представьте себе процесс вызова get_my_pods как конечного автомата, представляющего собой перечисление с тремя состояниями:

enum FutureStateMachine {
    State1,
    State2,
    State3,
}

Когда get_my_pods вызывается первым, весь код функции выполняется до первого await точка. будущее вернет Pending состояние, потому что оно ожидает, пока API Kubernetes вернет список модулей. Как только возвращается список модулей, get_my_pods уведомит своего исполнителя о том, что он готов к повторному опросу. Исполнитель будет затем возобновляем выполнение функции и продолжаем до следующего await точка. Этот процесс будет продолжаться до тех пор, пока второй await точка достигнута. Код в состоянии 2 будет выполняться синхронно всё до второй await точка.

И снова функция вернет Pending состояние, потому что оно ожидает API Kubernetes вернуться. Как только модули списка будут возвращены, get_my_pods уведомит своего исполнителя о том, что он готов к повторному опросу. В третьем состоянии остальная часть кода будет выполнена синхронно, и функция вернет значение. Ready заявить как мы находимся в конце функции.

Теперь мы знаем, как async/await синтаксис работает, теперь попробуем вызвать get_my_pods функция от main функция.

Если мы попытаемся вызвать get_my_pods функция от main функции, мы получим ошибку компилятора:

fn main() {
    println!("Hello, world!");
    get_my_pods();
}

Вы получите предупреждение о том, что Features ничего не сделают, если вы .await или опросите их. Итак, попробуем дождаться функция:

fn main() {
    println!("Hello, world!");
    get_my_pods().await;
}

Теперь мы получаем ошибку компилятора, в которой говорится, что await разрешено только в async функции. Итак, давайте сделаем main функция async:

async fn main() {
    println!("Hello, world!");
    get_my_pods().await;
}

Теперь мы получаем ошибку компилятора, в которой говорится, что main не разрешено быть async.

Итак, как нам вызвать async функцию от main функции? Futures можно довести до завершения двумя способами:

  1. Ожидая их.
  2. Или вручную опрашивать их, пока они не будут готовы.

Для Futures внутри других Futures мы можем использовать await ключевое слово. Но если мы хотим вызвать async функция сверху На большинстве уровней нашей программы нам нужно вручную опрашивать будущее, пока оно не будет готово. Этот код называется runtime или а executor.

А runtime отвечает за опрос Futures верхнего уровня до тех пор, пока они не будут готовы. Он также отвечает за запуск несколько Futures параллельно. Стандартная библиотека не предоставляет среду выполнения, но существует множество созданных сообществом доступные среды выполнения. Мы будем использовать самый популярный из них под названием tokio в этом уроке.

Что такое Tokio?

Tokio это асинхронная среда выполнения для Rust. Он предоставляет строительные блоки, необходимые для написания сетевых приложений. Это дает гибкость для работы с широким спектром систем, от больших серверов с десятками ядра для небольших встраиваемых устройств.

На высоком уровне, Tokio содержит несколько основных компонентов:

  • Многопоточная среда выполнения для выполнения асинхронного кода.
  • Асинхронная версия стандартной библиотеки.
  • Большая экосистема библиотек.

Преимущество использования Tokio заключается в том, что он быстрый, надежный, простой в использовании и очень гибкий.

Чтобы добавить Tokio для нашего проекта нам нужно запустить следующую команду:

cargo add tokio --features full

Это добавит tokio crate в наш проект и включите все функции.

Теперь мы можем использовать атрибут tokio::main макрос для запуска нашего main функция:

#[tokio::main]
async fn main() {
    println!("Hello, world!");
    get_my_pods().await;
}

Это указывает на то, что main функция является асинхронной функцией и будет выполняться Tokio время выполнения.

Запустим нашу программу:

cargo run

Вы должны увидеть следующий вывод:

Hello, world!
Get all my pods
Get all my pods in a namespace default
Got 0 pods
Get all my pods in a namespace kube-system
Got 9 pods

Вы можете видеть, что наши операторы печати печатаются в том порядке, в котором они вызываются. Как упоминалось ранее: фьючерсы ленивы. и ничего не делайте, пока вы не дождетесь их или не опросите их.

Мы можем сохранить результат get_my_pods функцию в переменной и вызвать await об этом позже:

#[tokio::main]
async fn main() {
    println!("Hello, world!");
    let p = get_my_pods();
    println!("Where are my pods?");
    p.await;
}

Давайте снова запустим нашу программу:

cargo run

Вы должны увидеть следующий вывод:

Hello, world!
Where are my pods?
Get all my pods
Get all my pods in a namespace default
Got 0 pods
Get all my pods in a namespace kube-system
Got 9 pods

На этот раз оператор println Where are my pods? печатается до get_my_pods функция вызывается.

Преимущество ленивости фьючерсов в том, что они представляют собой абстракцию с нулевой стоимостью. Это означает, что вы не понесете затрат во время выполнения. если только вы на самом деле не ждете будущего. Еще одним преимуществом является то, что вы можете отменить будущее в любое время. Чтобы отменить будущее все, что вам нужно сделать, это прекратить его опрос.

В настоящее время мы не используем преимущества асинхронной природы get_my_pods функция, потому что все работает серийно. Чтобы выполнить наш запуск одновременно, мы можем использовать задачу tokio. Задача — это облегченная неблокирующая единица исполнение.

Давайте изменим код, чтобы использовать задачу. Сначала мы создаем пустой вектор для хранения дескрипторов наших задач, а затем создаем цикл с двумя итерациями. На каждой итерации мы будем создавать задачу и сохранять ее дескриптор в векторе.

Мы проходим async блокировать до spawn функция. Мы можем использовать move ключевое слово с async заблокировать так, чтобы тот async блок может захватывать переменные из внешней области. В этом случае мы фиксируем i переменная и передать это get_my_pods функция.

В конце основного кода мы перебираем вектор дескрипторов задач и вызываем await на каждом из них.

#[tokio::main]
async fn main() {
    println!("Hello, world!");
    let mut handles = vec![];
    for i in 0..2 {
        let handle = tokio::spawn(async move {
            get_my_pods(i).await;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.await.unwrap();
    }
}

async fn get_my_pods(i: i32) {
    println!("[{i}] Get all my pods");
    let pods1 = get_all_pods_in_namespace("default").await;
    println!("[{i}] Got {} pods", pods1.items.len());
    let pods2 = get_all_pods_in_namespace("kube-system").await;
    println!("[{i}] Got {} pods", pods2.items.len());
}

Итак, давайте снова запустим нашу программу:

cargo run

Вы должны увидеть следующий вывод:

Hello, world!
[0] Get all my pods
Get all my pods in a namespace default
[1] Get all my pods
Get all my pods in a namespace default
[0] Got 0 pods
Get all my pods in a namespace kube-system
[1] Got 0 pods
Get all my pods in a namespace kube-system
[0] Got 9 pods
[1] Got 9 pods

Вы можете видеть, что две задачи выполняются одновременно. По умолчанию, Tokio использует пул потоков для выполнения задач. Этот позволяет задачам выполнять задачи параллельно.

Существует также возможность заставить Токио запускать задачи в одном потоке. Это можно сделать с помощью тот current_thread вкус tokio::main макрос. Это приведет к одновременному выполнению потоков с использованием времени вместо этого нарезая нити.

Подведение итогов

В этом уроке мы узнали, как использовать async и await ключевые слова для написания асинхронного кода. Мы также узнали, как использовать tokio как среду выполнения для одновременного выполнения нашего асинхронного кода.

Когда вы имеете дело с асинхронным кодом, вы должны знать, что мы сообщаем среде выполнения, когда блок асинхронного кода готов к работе, чтобы можно было выполнить другие задачи. Это дает нам контроль над рудой, однако это также возлагает на нас больше ответственности. Нам нужно убедиться, что мы пишем эффективный код. Например, мы не хотим помещать операции с интенсивным использованием ЦП внутри асинхронного блока, поскольку это заблокирует поток и предотвратит выполнение других задач.