Понимание Kotlin Coroutines

Coroutines (Корутины) — это шаблон проектирования для написания асинхронных программ для одновременного выполнения нескольких задач.

В асинхронных программах несколько задач выполняются параллельно в отдельных потоках, не дожидаясь завершения других задач. Потоки являются дорогостоящим ресурсом, и слишком большое количество потоков приводит к снижению производительности из-за высокого потребления памяти и использования ЦП.

Корутины — это альтернативный способ написания асинхронных программ, но он гораздо более легкий по сравнению с потоками. Это вычисления, которые выполняются поверх потоков.

Мы можем приостановить корутину, чтобы другие корутины могли работать в том же потоке. Далее мы можем возобновить работу корутины в том же или другом потоке.

Когда корутина приостанавливается, соответствующие вычисления приостанавливаются, удаляются из потока и сохраняются в памяти, оставляя поток свободным для выполнения других действий.

Таким образом, мы можем запускать множество корутин одновременно, используя лишь небольшой пул потоков, тем самым используя очень ограниченные системные ресурсы.

В этом посте мы поймем, как использовать корутины в Котлине.

Запуск параллельной программы с потоком

Давайте начнем с запуска программы, которая будет выполнять некоторые инструкции, а также вызывать долго выполняющуюся функцию:

//Todo: statement1
//Todo: call longRunningFunction
//Todo: statement2
...
...

Если мы последовательно выполним все операторы в одном потоке, longRunningFunctionпоток не сможет выполнить оставшиеся операторы, и программа в целом займет много времени для завершения.

Чтобы сделать его более эффективным, мы выполним его longRunningFunctionв отдельном потоке и позволим программе продолжить выполнение в основном потоке:

import kotlin.concurrent.thread

fun main() {
    println("My program runs...: 
        ${Thread.currentThread().name}")

    thread {
        longRunningTask()
    }

    println("My program run ends...: 
        ${Thread.currentThread().name}")
}

fun longRunningTask(){
    println("executing longRunningTask on...: 
        ${Thread.currentThread().name}")
    Thread.sleep(1000)
    println("longRunningTask ends on thread ...: 
        ${Thread.currentThread().name}")
}

Здесь мы моделируем длительное поведение, вызывая Thread.sleep() внутри функции: longRunningTask(). Мы вызываем эту функцию внутри thread функции. Это позволит main потоку продолжить выполнение, не дожидаясь longRunningTask() завершения функции.

Функция longRunningTask() будет выполняться в другом потоке, как мы можем наблюдать из вывода операторов println, запустив эту программу:

My program runs...: main
My program run ends...: main
executing longRunningTask on...: Thread-0
longRunningTask ends on thread ...: Thread-0

Process finished with exit code 0

Как мы видим в выводе, программа запускается в потоке: main. Он выполняет longRunningTask() поток on Thread-0, но не ждет его завершения и println() снова переходит к выполнению следующего оператора в потоке: main. Однако программа завершается кодом выхода 0только после longRunningTaskзавершения выполнения Thread-0.

В следующих разделах мы изменим эту программу для запуска с использованием корутин.

Добавление зависимостей для корутин

Язык Kotlin предоставляет нам базовые конструкции для написания корутин, но в библиотеке доступны более полезные конструкции, построенные на основе базовых корутин kotlinx-coroutines-core. Поэтому нам нужно добавить зависимость в kotlinx-coroutines-core библиотеку, прежде чем начинать писать корутины:

Мы выбрали инструмент сборки Gradle , поэтому зависимость от kotlinx-coroutines-core библиотеки будет выглядеть следующим образом:

dependencies {
    implementation 'org.jetbrains.kotlin:kotlin-stdlib'
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.2'
}

Здесь мы добавили зависимость от стандартной библиотеки Kotlin и библиотеки kotlinx-coroutines-core.

Простая корутина в Котлине

Корутины известны как облегченные потоки, что означает, что мы можем запускать код в корутинах аналогично тому, как мы запускаем код в потоках. Давайте изменим предыдущую программу, чтобы она запускала длительную функцию в корутине вместо отдельного потока, как показано ниже:

fun main() = runBlocking{
    println("My program runs...: ${Thread.currentThread().name}")

    launch { // starting a coroutine
        longRunningTask()  // calling the long running function
    }

    println("My program run ends...: ${Thread.currentThread().name}")
}

suspend fun longRunningTask(){
    println("executing longRunningTask on...: ${Thread.currentThread().name}")
    delay(1000)  // simulating the slow behavior by adding a delay
    println(
     "longRunningTask ends on thread ...: ${Thread.currentThread().name}")
}

Давайте разберемся, что делает этот код: launch{} функция запускает новую корутину, которая выполняется одновременно с остальной частью кода.

runBlocking{} также запускает новую корутину, но блокирует текущий поток: main на время вызова, пока весь код внутри runBlocking{} тела функции не завершит свое выполнение.

longRunningTask функция называется приостанавливающей функцией. Он приостанавливает корутину, не блокируя базовый поток, но позволяет другим корутинам запускаться и использовать базовый поток для своего кода.

Мы узнаем больше о запуске новых корутин с помощью таких функций, как launch{} и runBlocking{}, в следующем разделе, посвященном сборщикам и областям действия корутин.

Когда мы запустим эту программу, мы получим следующий результат:

My program runs...: main
My program run ends...: main
executing longRunningTask on...: main
longRunningTask ends on a thread ...: main

Process finished with exit code 0

Из этого вывода мы видим, что программа выполняется в потоке с именем main. Он не ждет завершения longRunningTask, переходит к выполнению следующего оператора и печатает My program run ends...: main. Корутина выполняется одновременно в одном и том же потоке, как мы можем видеть из вывода двух операторов печати в функции longRunningTask.

Далее мы разберемся с различными компонентами корутины в следующих разделах.

Знакомство с функциями приостановки

Функция приостановки является основным строительным блоком корутины. Это похоже на любую другую обычную функцию, которая может при необходимости принимать один или несколько входных данных и возвращать выходные данные. Поток, выполняющий обычную функцию, блокирует запуск других функций до завершения выполнения. Это приведет к отрицательному воздействию на производительность, если функция является длительно выполняемой функцией, которая, вероятно, извлекает данные с помощью внешнего API по сети.

Чтобы смягчить это, нам нужно изменить обычную функцию на приостанавливающую функцию и вызывать ее из области корутины. Вызов функции приостановки приостановит/приостановит функцию и позволит потоку выполнять другие действия. Приостановленная/приостановленная функция может возобновиться через некоторое время и работать в том же или другом потоке.

Синтаксис приостанавливающей функции также аналогичен синтаксису обычной функции с добавлением ключевого слова, suspend как показано ниже:

suspend fun longRunningTask(){
    ...
    ...
}

Функции, отмеченные ключевым словом suspend, преобразуются во время компиляции и становятся асинхронными. Давайте рассмотрим пример вызова приостанавливающей функции вместе с некоторыми обычными функциями:

fun main() = runBlocking{
    println("${Instant.now()}: 
        My program runs...: ${Thread.currentThread().name}")

    val productId = findProduct()

    launch (Dispatchers.Unconfined) { // start a coroutine
        val price = fetchPrice(productId) // call the suspending function
    }
    updateProduct()

    println("${Instant.now()}: My program run ends...: " +
            "${Thread.currentThread().name}")
}

suspend fun fetchPrice(productId: String) : Double{
    println("${Instant.now()}: fetchPrice starts on...: 
        ${Thread.currentThread().name} ")
    delay(2000) // simulate the slow function by adding a delay
    println("${Instant.now()}: fetchPrice ends on...: 
        ${Thread.currentThread().name} ")
    return 234.5
}

fun findProduct() : String{
    println("${Instant.now()}: 
        findProduct on...: ${Thread.currentThread().name}")
    return "P12333"
}

fun updateProduct() : String{
    println("${Instant.now()}: 
        updateProduct on...: ${Thread.currentThread().name}")
    return "Product updated"
}

Как мы видим в этом примере, функции findProduct() и updateProduct() являются обычными функциями. Функция fetchPrice() представляет собой медленную функцию, которую мы смоделировали, добавив функцию delay().

В main() функции мы сначала вызываем findProduct() функцию, а затем вызываем fetchPrice()функцию приостановки с помощью launch{} функции. После приостановки выполнение корутины возобновляется в потоке. После этого вызываем updateProduct() функцию.

Функция launch{} запускает корутину, как объяснялось ранее. Мы передаем диспетчер корутины: Dispatchers.Unconfined функции launch, которая управляет потоками, в которых корутина запускается и возобновляется. В последующих разделах мы узнаем больше о диспетчерах корутин.

Давайте запустим эту программу, чтобы увидеть, как корутина приостанавливает работу и позволяет потоку выполнять другие обычные функции:

2022-06-24T04:09:40..: My program runs...: main
2022-06-24T04:09:40..: findProduct on...: main
2022-06-24T04:09:40..: fetchPrice starts on...: main
2022-06-24T04:09:40..: updateProduct on...: main
2022-06-24T04:09:40..: My program run ends...: main
2022-06-24T04:09:42..: fetchPrice ends on.: kotlinx.coroutines.DefaultExecutor

Process finished with exit code 0

Как мы видим из вывода, функции findProduct() и updateProduct() вызываются в main потоке. Функция fetchPrice() запускается в main потоке и приостанавливается, чтобы обеспечить выполнение функций findProduct() и updateProduct() в mainпотоке. Функция fetchPrice() возобновляет работу в другом потоке для выполнения println() оператора.

Также важно понимать, что приостанавливающие функции могут быть вызваны только другой приостанавливающей функцией или из корутины. Функция delay(), вызываемая внутри fetchPrice() функции, также является функцией приостановки, предоставляемой библиотекой kotlinx-coroutines-core.

Области действия и построители корутин

Как объяснялось в предыдущих разделах, мы можем запускать функции приостановки только в областях корутин, запущенных такими разработчиками корутин, как launch{}.

Мы используем построитель корутин, чтобы запустить новую корутину и установить соответствующую область, чтобы ограничить время жизни корутины. Область действия корутины предоставляет методы жизненного цикла корутин, которые позволяют нам запускать и останавливать их.

Давайте разберемся с тремя сборщиками корутин в Котлине: runBlocking{}, launch{}и async{}:

Запуск корутин путем блокировки текущего потока с помощьюrunBlocking

Корутины более эффективны, чем потоки, поскольку они приостанавливаются и возобновляются, а не блокируют выполнение. Однако в некоторых конкретных случаях нам необходимо блокировать потоки. Например, в main()функции нам нужно заблокировать поток, иначе наша программа завершится, не дождавшись завершения корутин.

Построитель runBlocking корутины запускает корутину, блокируя текущий исполняемый поток до тех пор, пока весь код в корутине не будет завершен.

Сигнатура runBlocking функций выглядит так:

expect fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, 
                           block: suspend CoroutineScope.() -> T): T

Функция принимает два параметра:

  1. context: Предоставляет контекст корутины, представленной интерфейсом CoroutineContext, который представляет собой индексированный набор Element экземпляров.
  2. block: код корутины, которая вызывается. Требуется тип функции:suspend CoroutineScope.() -> Unit

Построитель runBlocking{} корутин предназначен для соединения обычного кода блокировки с библиотеками, написанными в стиле приостановки. Так что наиболее подходящая ситуация для использования runBlocking{} в основных функциях и в тестах JUnit.

Функция runBlocking{}, вызываемая из main()функции, выглядит следующим образом:

fun main() = runBlocking{
    ...
    ...
}

Мы использовали runBlocking{}блокировку выполнения всех main()функций в наших предыдущих примерах.

Поскольку runBlocking{} блокирует исполняемый поток, он редко используется внутри кода в телах функций, поскольку потоки являются дорогостоящими ресурсами, и их блокировка неэффективна и нежелательна.

Запуск корутин в Fire and Forget режиме сlaunch

Функция launch{} запускает новую корутину, которая не вернет никакого результата вызывающей стороне. Он не блокирует текущий поток. Сигнатура функции launch{}:

fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext, 
    start: CoroutineStart = CoroutineStart.DEFAULT, 
    block: suspend CoroutineScope.() -> Unit
): Job

Функция принимает три параметра и возвращает Jobобъект:

  1. context: Предоставляет контекст корутины, представленной интерфейсом CoroutineContext, который представляет собой индексированный набор Element экземпляров.
  2. start: опция запуска корутины. Значение по умолчанию — CoroutineStart.DEFAULTнемедленное планирование выполнения корутины. Мы можем установить опцию start для CoroutineStart.LAZY ленивого запуска корутины.
  3. block: код корутины, которая вызывается. Требуется тип функции:suspend CoroutineScope.() -> Unit

Новая корутина, начавшая использовать launch{} функцию, выглядит следующим образом:

fun main() = runBlocking{
    println("My program runs...: ${Thread.currentThread().name}")

    // calling launch passing all 3 parameters
    val job:Job = launch (EmptyCoroutineContext, CoroutineStart.DEFAULT){
        longRunningTask()
    }

    // Another way of calling launch passing only the block parameter
    // context and start parameters are set to their default values
    val job1:Job = launch{longRunningTask()} 
    
    job.join()

    println("My program run ends...: ${Thread.currentThread().name}")
}

suspend fun longRunningTask(){
    println("executing longRunningTask on...: ${Thread.currentThread().name}")
    delay(1000)
    println("longRunningTask ends on thread ...: 
        ${Thread.currentThread().name}")
}

Здесь launch{}функция вызывается внутри runBlocking{} функции. Функция launch{} запускает корутину, которая выполнит функцию longRunningTaskи Jobнемедленно вернет объект в качестве ссылки.

Мы вызываем join() метод этого Job объекта, который приостанавливает корутину, оставляя текущий поток свободным делать все, что ему заблагорассудится (например, выполнение другой корутины).

Мы также можем использовать Job объект для отмены корутины при отмене результирующего задания.

Вернуть результат приостановки функции в запускающий поток с помощьюasync

Это async еще один способ запустить корутину. Иногда, когда мы запускаем корутину, нам может потребоваться вернуть значение из этой корутины обратно в поток, который ее запустил.

async запускает корутину параллельно, аналогично запуску. Но он ожидает завершения одной корутины, прежде чем запускать другую корутину. Подпись async показана ниже:

fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext, 
    start: CoroutineStart = CoroutineStart.DEFAULT, 
    block: suspend CoroutineScope.() -> T
): Deferred<T>

Функция async{} принимает те же три параметра, что и launch{} функция, но возвращает Deferred<T>экземпляр вместо Job. Мы можем получить результат вычислений, выполненных в корутине, из экземпляра, Deferred<T> вызвав await() метод.

Мы можем использовать async, как показано в этом примере:

fun main() = runBlocking{
    println("program runs...: ${Thread.currentThread().name}")

    val taskDeferred = async {
        generateUniqueID()
    }

    val taskResult = taskDeferred.await()

    println("program run ends...:  
        ${taskResult}  ${Thread.currentThread().name}")
}

suspend fun generateUniqueID(): String{
    println("executing generateUniqueID on...: 
        ${Thread.currentThread().name}")
    delay(1000)
    println("generateUniqueID ends on thread ...: 
        ${Thread.currentThread().name}")

    return UUID.randomUUID().toString()
}

В этом примере мы генерируем уникальный идентификатор в приостанавливающей функции: generateUniqueIDкоторая вызывается из корутины, начинающейся с async. Функция async возвращает экземпляр Deffered<T>. Тип T указан Unit по умолчанию.

Здесь тип — Tтак String как функция приостановки generateUniqueID возвращает значение типа String.

Далее мы вызываем await() метод отложенного экземпляра: taskDeferred для извлечения результата.

Запустив программу, мы получим следующий результат:

program runs...: main
executing generateUniqueID on...: main
generateUniqueID ends on thread ...: main
program run ends...:  f18ac8c7-25ef-4755-8ab8-73c8219aadd3  main

Process finished with exit code 0

Здесь мы можем увидеть результат приостановленной функции, напечатанный на выходе.

Диспетчеры корутин: определение потока для запуска корутины

Диспетчер корутин определяет поток или пул потоков, которые соответствующая корутина использует для своего выполнения. Все корутины выполняются в контексте, представленном интерфейсом CoroutineContext. Это CoroutineContext индексированный набор элементов, доступный внутри корутины через свойство: CoroutineContext. Диспетчер корутин является важным элементом этого индексированного набора.

Диспетчер корутины может ограничить выполнение корутины определенным потоком, отправить ее в пул потоков или разрешить ее неограниченное выполнение.

Как мы видели в предыдущем разделе, все разработчики корутин любят launch{} и async{} принимают необязательный CoroutineContext параметр в своей сигнатуре:

fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext, 
    start: CoroutineStart = CoroutineStart.DEFAULT, 
    block: suspend CoroutineScope.() -> T
): Deferred<T>

Используется CoroutineContext для явного указания диспетчера новой корутины. Kotlin имеет несколько реализаций, CoroutineDispatchersкоторые мы можем указать при создании корутин с помощью таких конструкторов корутин, как launch и async. Давайте посмотрим на некоторые из часто используемых диспетчеров:

Наследование диспетчера от родительской корутины

Когда launch{}функция используется без параметров, она наследует CoroutineContext(и, следовательно, диспетчер) от функции, CoroutineScope из которой она запускается. Давайте понаблюдаем за этим поведением на примере ниже:

fun main() = runBlocking {
    launch {
        println(
           "launch default: running in thread ${Thread.currentThread().name}")
        longTask()
    }
}

suspend fun longTask(){
    println("executing longTask on...: ${Thread.currentThread().name}")
    delay(1000)
    println("longTask ends on thread ...: ${Thread.currentThread().name}")
}

Здесь launch{} построитель корутины наследует контекст и, следовательно, диспетчер runBlocking области корутины, которая работает в main потоке. Следовательно, корутина, запущенная launch{} строителем корутины, также использует тот же диспетчер, который запускает корутину в основном потоке.

Когда мы запускаем эту программу, мы можем наблюдать такое поведение в выводе ниже:

completed tasks
launch default: running in  thread main
executing longTask on...: main
longTask ends on thread ...: main

Process finished with exit code 0

Как мы видим в выводе, корутина, запущенная построителем launch{}корутин, также выполняется в mainпотоке.

Диспетчер по умолчанию для выполнения операций с интенсивным использованием ЦП

Диспетчер по умолчанию используется, когда в области видимости явно не указан другой диспетчер. Он представлен Dispatchers.Default и использует общий фоновый пул потоков. Пул потоков имеет размер, равный количеству ядер на машине, на которой выполняется наш код с минимальным количеством 2потоков.

Давайте запустим следующий код, чтобы проверить это поведение:

fun main() = runBlocking {
    repeat(1000) {
      launch(Dispatchers.Default) { // will get dispatched to DefaultDispatcher
        println("Default  : running in thread ${Thread.currentThread().name}")
            longTask()
        }
    }
}

Вот фрагмент вывода, показывающий потоки, используемые корутиной:

Default  : running in thread DefaultDispatcher-worker-1
Default  : running in thread DefaultDispatcher-worker-2
Default  : running in thread DefaultDispatcher-worker-4
Default  : running in thread DefaultDispatcher-worker-3
Default  : running in thread DefaultDispatcher-worker-5
Default  : running in thread DefaultDispatcher-worker-6
Default  : running in thread DefaultDispatcher-worker-7
Default  : running in thread DefaultDispatcher-worker-8
Default  : running in thread DefaultDispatcher-worker-9
Default  : running in thread DefaultDispatcher-worker-10
Default  : running in thread DefaultDispatcher-worker-3
Default  : running in thread DefaultDispatcher-worker-2
Default  : running in thread DefaultDispatcher-worker-2
Default  : running in thread DefaultDispatcher-worker-6
Default  : running in thread DefaultDispatcher-worker-4

Мы можем видеть 10потоки из пула потоков, используемые для запуска корутин.

Мы также можем использовать limitedParallelism ограничение количества активно выполняемых параллельно корутин, как показано в этом примере:

fun main() = runBlocking {
    repeat(1000) {
        // will get dispatched to DefaultDispatcher with 
        // limit to running 3 coroutines in parallel
        val dispatcher = Dispatchers.Default.limitedParallelism(3)
        launch(dispatcher) {
            println("Default : running in thread ${Thread.currentThread().name}")
            longTask()
        }
    }
}

Здесь мы установили ограничение на параллельное 3выполнение максимального количества корутин.3

Создание новой темы с помощьюnewSingleThreadContext

newSingleThreadContext создает новый поток, который будет предназначен исключительно для запуска корутины. Этот диспетчер гарантирует, что корутина всегда выполняется в определенном потоке:

fun main() = runBlocking {
    launch(newSingleThreadContext("MyThread")) { // will get its own new thread MyThread
        println("newSingleThreadContext: running in  thread ${Thread.currentThread().name}")
        longTask()
    }
    println("completed tasks")
}

В этом примере мы выполняем нашу корутину в выделенном потоке с именем, MyThreadкоторое видно из результатов, полученных при запуске программы:

newSingleThreadContext: running in  thread MyThread

Process finished with exit code 0

Однако выделенный поток — дорогостоящий ресурс. В реальном приложении поток необходимо либо освободить, когда он больше не нужен, с помощью функции close, либо повторно использовать его во всем приложении, сохранив ссылку на него в переменной верхнего уровня.

Запускайте без ограничений сDispatchers.Unconfined

Диспетчер Dispatchers.Unconfinedкорутины запускает корутину в вызывающем потоке, но только до первой точки приостановки. После приостановки он возобновляет выполнение корутины в потоке, который полностью определяется вызванной функцией приостановки.

Давайте изменим наш предыдущий пример, чтобы передать параметр: Dispatchers.Unconfinedв launch{}функцию:

fun main() = runBlocking {
    launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
      println(
        "Unconfined : running in thread ${Thread.currentThread().name}")
      longTask()
    }
    println("completed tasks")
}

Запустив эту программу, мы получим следующий результат:

Unconfined : running in thread main
executing longTask on...: main   // coroutine starts
completed tasks  // printed by main thread with the coroutine suspended
longTask ends on thread ...: kotlinx.coroutines.DefaultExecutor  // coroutine resumes

Process finished with exit code 0

Как мы видим из вывода, корутина запускается в потоке main сразу после ее вызова. Он приостановлен, чтобы позволить main потоку работать. Корутина возобновляет работу в другом потоке: kotlinx.coroutines.DefaultExecutor для выполнения println оператора функции longTask.

Неограниченный диспетчер подходит для корутин, которые не потребляют процессорное время и не обновляют какие-либо общие данные (например, пользовательский интерфейс), ограниченные определенным потоком. Неограниченный диспетчер не следует использовать в общем коде. Это полезно в ситуациях, когда какую-либо операцию в корутине необходимо выполнить немедленно.

Отмена выполнения корутины

Возможно, нам захочется отменить длительные задания до их завершения. Примером ситуации, когда мы захотим отменить задание, будет: когда мы перешли на другой экран в приложении на основе пользовательского интерфейса (например, Android) и больше не заинтересованы в результате длительной функции.

Другой пример: мы хотим выйти из процесса из-за какого-то исключения и хотим выполнить очистку, отменив все длительные задания, которые все еще выполняются.

В более раннем примере мы уже видели функцию, launch{} возвращающую значение Job. Объект Job предоставляет cancel() метод отмены запущенной корутины, который мы можем использовать, как показано в этом примере:

fun main() = runBlocking{
    println("My program runs...: ${Thread.currentThread().name}")

    val job:Job = launch {

        longRunningFunction()
    }
    delay(1500) // delay ending the program
    job.cancel() // cancel the job
    job.join()  // wait for the job to be cancelled

    // job.cancelAndJoin() // we can also call this in a single step

    println(
        "My program run ends...: ${Thread.currentThread().name}")
}

suspend fun longRunningFunction(){
    repeat(1000){ i ->
        println("executing :$i step on thread: ${Thread.currentThread().name}")
        delay(600)
    }
}

В этом примере мы выполняем оператор печати через longRunningFunctionкаждые 600миллисекунды. Это имитирует долго выполняющуюся функцию с 1000шагами и выполняет оператор печати в конце каждого шага. При запуске этой программы мы получаем следующий вывод:

My program runs...: main
executing step 0 on thread: main
executing step 1 on thread: main
executing step 2 on thread: main
My program run ends...: main

Process finished with exit code 0

Мы можем видеть longRunningFunction выполнение шага до 2 и затем остановку после вызова cancelобъекта job. Вместо двух операторов для cancel и join мы также можем использовать функцию расширения Job: cancelAndJoin которая объединяет cancel и join вызывает.

Отмена корутин

Как объяснялось в предыдущем разделе, нам необходимо отменить корутины, чтобы не выполнять больше работы, чем необходимо, для экономии памяти и ресурсов обработки. Нам необходимо убедиться, что мы контролируем жизнь корутины и отменяем ее, когда она больше не нужна.

Код корутины должен взаимодействовать, чтобы его можно было отменить. Нам необходимо убедиться, что весь код в корутине поддерживает отмену, периодически проверяя отмену или перед началом любой длительной задачи.

Существует два подхода к тому, чтобы сделать код корутины отменяемым:

Периодический вызов приостанавливающей функцииyield

Мы можем периодически вызывать функцию приостановки, например, yieldдля проверки статуса отмены корутины и получения потока (или пула потоков) текущей корутины, чтобы позволить другим корутинам выполняться в том же потоке (или пуле потоков):

fun main() = runBlocking{
    try {
        val job1 = launch {
            repeat(20){
                println(
                 "processing job 1: ${Thread.currentThread().name}")
                yield()
            }
        }

        val job2 = launch {
            repeat(20){
                println(
                 "processing job 2: ${Thread.currentThread().name}")
                yield()
            }
        }

        job1.join()
        job2.join()

    } catch (e: CancellationException) {
        // clean up code

    }
}

Здесь мы запускаем две корутины, каждая из которых вызывает yield функцию, позволяющую другой корутине работать в потоке main. Фрагмент вывода запуска этой программы показан ниже:

processing job 1: main
processing job 2: main
processing job 1: main
processing job 2: main
processing job 1: main

Мы можем видеть выходные данные первой корутины, после которой она вызывает yield. Это приостанавливает первую корутину и позволяет запустить вторую корутину. Аналогично, вторая корутина также вызывает yield функцию и позволяет первой корутине возобновить выполнение.

Когда отмена корутины принимается, kotlinx.coroutines.JobCancellationException выдается исключение. Мы можем перехватить это исключение и запустить здесь весь код очистки.

Явно проверьте статус отмены с помощьюisActive

Мы также можем явно проверить статус отмены работающей корутины, isActive свойство расширения которой доступно внутри корутины через CoroutineScope объект:

fun main() = runBlocking{
    println("program runs...: ${Thread.currentThread().name}")

    val job:Job = launch {
        val files = File ("<File Path>").listFiles()
        var loop = 0

        while (loop < files.size-1 ) {
            if(isActive) { // check the cancellation status
                readFile(files.get(++loop))
            }
        }
    }
    delay(1500)
    job.cancelAndJoin()

    println("program run ends...: ${Thread.currentThread().name}")
}

suspend fun readFile(file: File) {
    println("reading file ${file.name}")
    if (file.isFile) {
        // process file
    }
    delay(100)
}

Здесь мы обрабатываем набор файлов из каталога. Мы проверяем статус отмены isActive перед обработкой каждого файла. Свойство isActive возвращается true, когда текущее задание все еще активно (еще не завершено и не отменено).

Заключение

В этой статье мы разобрались в различных способах использования корутин в Котлине. Вот несколько важных моментов, которые следует запомнить:

  1. Корутина — это шаблон проектирования параллелизма, используемый для написания асинхронных программ.
  2. Корутины — это вычисления, которые выполняются поверх потоков, которые можно приостанавливать и возобновлять.
  3. Когда корутина «приостанавливается», соответствующие вычисления приостанавливаются, удаляются из потока и сохраняются в памяти, оставляя поток свободным для выполнения других действий.
  4. Корутины запускаются разработчиками корутин, которые также устанавливают область действия.
  5. launch{}, async{}, и runBlocking{}— это разные типы построителей корутин.
  6. Функция launchвозвращает значение job, используя которое также можно отменить корутину. Функция asyncвозвращает Deferred<T>экземпляр. Мы можем получить результат вычислений, выполненных в корутине, из экземпляра, Deferred<T>вызвав await()метод.
  7. Отмена корутины является кооперативной. Код корутины должен взаимодействовать, чтобы его можно было отменить. В противном случае мы не сможем отменить его в середине выполнения, даже после вызова Job.cancel().
  8. Функция asyncпараллельно запускает корутину, аналогично функции launch{}. Однако он ожидает завершения корутины, прежде чем запускать другую корутину.
  9. Диспетчер корутины определяет поток или потоки, которые соответствующая корутина использует для своего выполнения. Диспетчер корутины может ограничить выполнение корутины определенным потоком, отправить ее в пул потоков или позволить ей выполняться без ограничений.
  10. Корутины более легкие по сравнению с потоками. Поток блокируется, пока корутина приостановлена, оставляя поток для продолжения выполнения, что позволяет использовать один и тот же поток для запуска нескольких корутин.