Понимание Kotlin Coroutines
Coroutines (Корутины) — это шаблон проектирования для написания асинхронных программ для одновременного выполнения нескольких задач.
В асинхронных программах несколько задач выполняются параллельно в отдельных потоках, не дожидаясь завершения других задач. Потоки являются дорогостоящим ресурсом, и слишком большое количество потоков приводит к снижению производительности из-за высокого потребления памяти и использования ЦП.
Корутины — это альтернативный способ написания асинхронных программ, но он гораздо более легкий по сравнению с потоками. Это вычисления, которые выполняются поверх потоков.
Мы можем приостановить корутину, чтобы другие корутины могли работать в том же потоке. Далее мы можем возобновить работу корутины в том же или другом потоке.
Когда корутина приостанавливается, соответствующие вычисления приостанавливаются, удаляются из потока и сохраняются в памяти, оставляя поток свободным для выполнения других действий.
Таким образом, мы можем запускать множество корутин одновременно, используя лишь небольшой пул потоков, тем самым используя очень ограниченные системные ресурсы.
В этом посте мы поймем, как использовать корутины в Котлине.
Запуск параллельной программы с потоком
Давайте начнем с запуска программы, которая будет выполнять некоторые инструкции, а также вызывать долго выполняющуюся функцию:
//Todo: statement1 //Todo: call longRunningFunction //Todo: statement2 ... ...
Если мы последовательно выполним все операторы в одном потоке, longRunningFunction
поток не сможет выполнить оставшиеся операторы, и программа в целом займет много времени для завершения.
Чтобы сделать его более эффективным, мы выполним его longRunningFunction
в отдельном потоке и позволим программе продолжить выполнение в основном потоке:
import kotlin.concurrent.thread fun main() { println("My program runs...: ${Thread.currentThread().name}") thread { longRunningTask() } println("My program run ends...: ${Thread.currentThread().name}") } fun longRunningTask(){ println("executing longRunningTask on...: ${Thread.currentThread().name}") Thread.sleep(1000) println("longRunningTask ends on thread ...: ${Thread.currentThread().name}") }
Здесь мы моделируем длительное поведение, вызывая Thread.sleep()
внутри функции: longRunningTask()
. Мы вызываем эту функцию внутри thread
функции. Это позволит main
потоку продолжить выполнение, не дожидаясь longRunningTask()
завершения функции.
Функция longRunningTask()
будет выполняться в другом потоке, как мы можем наблюдать из вывода операторов println
, запустив эту программу:
My program runs...: main My program run ends...: main executing longRunningTask on...: Thread-0 longRunningTask ends on thread ...: Thread-0 Process finished with exit code 0
Как мы видим в выводе, программа запускается в потоке: main
. Он выполняет longRunningTask()
поток on Thread-0
, но не ждет его завершения и println()
снова переходит к выполнению следующего оператора в потоке: main
. Однако программа завершается кодом выхода 0
только после longRunningTask
завершения выполнения Thread-0
.
В следующих разделах мы изменим эту программу для запуска с использованием корутин.
Добавление зависимостей для корутин
Язык Kotlin предоставляет нам базовые конструкции для написания корутин, но в библиотеке доступны более полезные конструкции, построенные на основе базовых корутин kotlinx-coroutines-core
. Поэтому нам нужно добавить зависимость в kotlinx-coroutines-core
библиотеку, прежде чем начинать писать корутины:
Мы выбрали инструмент сборки Gradle , поэтому зависимость от kotlinx-coroutines-core
библиотеки будет выглядеть следующим образом:
dependencies { implementation 'org.jetbrains.kotlin:kotlin-stdlib' implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.2' }
Здесь мы добавили зависимость от стандартной библиотеки Kotlin и библиотеки kotlinx-coroutines-core
.
Простая корутина в Котлине
Корутины известны как облегченные потоки, что означает, что мы можем запускать код в корутинах аналогично тому, как мы запускаем код в потоках. Давайте изменим предыдущую программу, чтобы она запускала длительную функцию в корутине вместо отдельного потока, как показано ниже:
fun main() = runBlocking{ println("My program runs...: ${Thread.currentThread().name}") launch { // starting a coroutine longRunningTask() // calling the long running function } println("My program run ends...: ${Thread.currentThread().name}") } suspend fun longRunningTask(){ println("executing longRunningTask on...: ${Thread.currentThread().name}") delay(1000) // simulating the slow behavior by adding a delay println( "longRunningTask ends on thread ...: ${Thread.currentThread().name}") }
Давайте разберемся, что делает этот код: launch{}
функция запускает новую корутину, которая выполняется одновременно с остальной частью кода.
runBlocking{}
также запускает новую корутину, но блокирует текущий поток: main
на время вызова, пока весь код внутри runBlocking{}
тела функции не завершит свое выполнение.
longRunningTask
функция называется приостанавливающей функцией. Он приостанавливает корутину, не блокируя базовый поток, но позволяет другим корутинам запускаться и использовать базовый поток для своего кода.
Мы узнаем больше о запуске новых корутин с помощью таких функций, как launch{}
и runBlocking{}
, в следующем разделе, посвященном сборщикам и областям действия корутин.
Когда мы запустим эту программу, мы получим следующий результат:
My program runs...: main My program run ends...: main executing longRunningTask on...: main longRunningTask ends on a thread ...: main Process finished with exit code 0
Из этого вывода мы видим, что программа выполняется в потоке с именем main
. Он не ждет завершения longRunningTask
, переходит к выполнению следующего оператора и печатает My program run ends...: main
. Корутина выполняется одновременно в одном и том же потоке, как мы можем видеть из вывода двух операторов печати в функции longRunningTask
.
Далее мы разберемся с различными компонентами корутины в следующих разделах.
Знакомство с функциями приостановки
Функция приостановки является основным строительным блоком корутины. Это похоже на любую другую обычную функцию, которая может при необходимости принимать один или несколько входных данных и возвращать выходные данные. Поток, выполняющий обычную функцию, блокирует запуск других функций до завершения выполнения. Это приведет к отрицательному воздействию на производительность, если функция является длительно выполняемой функцией, которая, вероятно, извлекает данные с помощью внешнего API по сети.
Чтобы смягчить это, нам нужно изменить обычную функцию на приостанавливающую функцию и вызывать ее из области корутины. Вызов функции приостановки приостановит/приостановит функцию и позволит потоку выполнять другие действия. Приостановленная/приостановленная функция может возобновиться через некоторое время и работать в том же или другом потоке.
Синтаксис приостанавливающей функции также аналогичен синтаксису обычной функции с добавлением ключевого слова, suspend
как показано ниже:
suspend fun longRunningTask(){ ... ... }
Функции, отмеченные ключевым словом suspend, преобразуются во время компиляции и становятся асинхронными. Давайте рассмотрим пример вызова приостанавливающей функции вместе с некоторыми обычными функциями:
fun main() = runBlocking{ println("${Instant.now()}: My program runs...: ${Thread.currentThread().name}") val productId = findProduct() launch (Dispatchers.Unconfined) { // start a coroutine val price = fetchPrice(productId) // call the suspending function } updateProduct() println("${Instant.now()}: My program run ends...: " + "${Thread.currentThread().name}") } suspend fun fetchPrice(productId: String) : Double{ println("${Instant.now()}: fetchPrice starts on...: ${Thread.currentThread().name} ") delay(2000) // simulate the slow function by adding a delay println("${Instant.now()}: fetchPrice ends on...: ${Thread.currentThread().name} ") return 234.5 } fun findProduct() : String{ println("${Instant.now()}: findProduct on...: ${Thread.currentThread().name}") return "P12333" } fun updateProduct() : String{ println("${Instant.now()}: updateProduct on...: ${Thread.currentThread().name}") return "Product updated" }
Как мы видим в этом примере, функции findProduct()
и updateProduct()
являются обычными функциями. Функция fetchPrice()
представляет собой медленную функцию, которую мы смоделировали, добавив функцию delay()
.
В main()
функции мы сначала вызываем findProduct()
функцию, а затем вызываем fetchPrice()
функцию приостановки с помощью launch{}
функции. После приостановки выполнение корутины возобновляется в потоке. После этого вызываем updateProduct()
функцию.
Функция launch{}
запускает корутину, как объяснялось ранее. Мы передаем диспетчер корутины: Dispatchers.Unconfined
функции launch
, которая управляет потоками, в которых корутина запускается и возобновляется. В последующих разделах мы узнаем больше о диспетчерах корутин.
Давайте запустим эту программу, чтобы увидеть, как корутина приостанавливает работу и позволяет потоку выполнять другие обычные функции:
2022-06-24T04:09:40..: My program runs...: main 2022-06-24T04:09:40..: findProduct on...: main 2022-06-24T04:09:40..: fetchPrice starts on...: main 2022-06-24T04:09:40..: updateProduct on...: main 2022-06-24T04:09:40..: My program run ends...: main 2022-06-24T04:09:42..: fetchPrice ends on.: kotlinx.coroutines.DefaultExecutor Process finished with exit code 0
Как мы видим из вывода, функции findProduct()
и updateProduct()
вызываются в main
потоке. Функция fetchPrice()
запускается в main
потоке и приостанавливается, чтобы обеспечить выполнение функций findProduct()
и updateProduct()
в main
потоке. Функция fetchPrice()
возобновляет работу в другом потоке для выполнения println()
оператора.
Также важно понимать, что приостанавливающие функции могут быть вызваны только другой приостанавливающей функцией или из корутины. Функция delay()
, вызываемая внутри fetchPrice()
функции, также является функцией приостановки, предоставляемой библиотекой kotlinx-coroutines-core
.
Области действия и построители корутин
Как объяснялось в предыдущих разделах, мы можем запускать функции приостановки только в областях корутин, запущенных такими разработчиками корутин, как launch{}
.
Мы используем построитель корутин, чтобы запустить новую корутину и установить соответствующую область, чтобы ограничить время жизни корутины. Область действия корутины предоставляет методы жизненного цикла корутин, которые позволяют нам запускать и останавливать их.
Давайте разберемся с тремя сборщиками корутин в Котлине: runBlocking{}
, launch{}
и async{}
:
Запуск корутин путем блокировки текущего потока с помощьюrunBlocking
Корутины более эффективны, чем потоки, поскольку они приостанавливаются и возобновляются, а не блокируют выполнение. Однако в некоторых конкретных случаях нам необходимо блокировать потоки. Например, в main()
функции нам нужно заблокировать поток, иначе наша программа завершится, не дождавшись завершения корутин.
Построитель runBlocking
корутины запускает корутину, блокируя текущий исполняемый поток до тех пор, пока весь код в корутине не будет завершен.
Сигнатура runBlocking
функций выглядит так:
expect fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T
Функция принимает два параметра:
context
: Предоставляет контекст корутины, представленной интерфейсомCoroutineContext
, который представляет собой индексированный наборElement
экземпляров.block
: код корутины, которая вызывается. Требуется тип функции:suspend CoroutineScope.() -> Unit
Построитель runBlocking{}
корутин предназначен для соединения обычного кода блокировки с библиотеками, написанными в стиле приостановки. Так что наиболее подходящая ситуация для использования runBlocking{}
в основных функциях и в тестах JUnit.
Функция runBlocking{}
, вызываемая из main()
функции, выглядит следующим образом:
fun main() = runBlocking{ ... ... }
Мы использовали runBlocking{}
блокировку выполнения всех main()
функций в наших предыдущих примерах.
Поскольку runBlocking{}
блокирует исполняемый поток, он редко используется внутри кода в телах функций, поскольку потоки являются дорогостоящими ресурсами, и их блокировка неэффективна и нежелательна.
Запуск корутин в Fire and Forget
режиме сlaunch
Функция launch{}
запускает новую корутину, которая не вернет никакого результата вызывающей стороне. Он не блокирует текущий поток. Сигнатура функции launch{}
:
fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job
Функция принимает три параметра и возвращает Job
объект:
context
: Предоставляет контекст корутины, представленной интерфейсомCoroutineContext
, который представляет собой индексированный наборElement
экземпляров.start
: опция запуска корутины. Значение по умолчанию —CoroutineStart.DEFAULT
немедленное планирование выполнения корутины. Мы можем установить опцию start дляCoroutineStart.LAZY
ленивого запуска корутины.block
: код корутины, которая вызывается. Требуется тип функции:suspend CoroutineScope.() -> Unit
Новая корутина, начавшая использовать launch{}
функцию, выглядит следующим образом:
fun main() = runBlocking{ println("My program runs...: ${Thread.currentThread().name}") // calling launch passing all 3 parameters val job:Job = launch (EmptyCoroutineContext, CoroutineStart.DEFAULT){ longRunningTask() } // Another way of calling launch passing only the block parameter // context and start parameters are set to their default values val job1:Job = launch{longRunningTask()} job.join() println("My program run ends...: ${Thread.currentThread().name}") } suspend fun longRunningTask(){ println("executing longRunningTask on...: ${Thread.currentThread().name}") delay(1000) println("longRunningTask ends on thread ...: ${Thread.currentThread().name}") }
Здесь launch{}
функция вызывается внутри runBlocking{}
функции. Функция launch{}
запускает корутину, которая выполнит функцию longRunningTask
и Job
немедленно вернет объект в качестве ссылки.
Мы вызываем join()
метод этого Job
объекта, который приостанавливает корутину, оставляя текущий поток свободным делать все, что ему заблагорассудится (например, выполнение другой корутины).
Мы также можем использовать Job
объект для отмены корутины при отмене результирующего задания.
Вернуть результат приостановки функции в запускающий поток с помощьюasync
Это async
еще один способ запустить корутину. Иногда, когда мы запускаем корутину, нам может потребоваться вернуть значение из этой корутины обратно в поток, который ее запустил.
async
запускает корутину параллельно, аналогично запуску. Но он ожидает завершения одной корутины, прежде чем запускать другую корутину. Подпись async показана ниже:
fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred<T>
Функция async{}
принимает те же три параметра, что и launch{}
функция, но возвращает Deferred<T>
экземпляр вместо Job
. Мы можем получить результат вычислений, выполненных в корутине, из экземпляра, Deferred<T>
вызвав await()
метод.
Мы можем использовать async
, как показано в этом примере:
fun main() = runBlocking{ println("program runs...: ${Thread.currentThread().name}") val taskDeferred = async { generateUniqueID() } val taskResult = taskDeferred.await() println("program run ends...: ${taskResult} ${Thread.currentThread().name}") } suspend fun generateUniqueID(): String{ println("executing generateUniqueID on...: ${Thread.currentThread().name}") delay(1000) println("generateUniqueID ends on thread ...: ${Thread.currentThread().name}") return UUID.randomUUID().toString() }
В этом примере мы генерируем уникальный идентификатор в приостанавливающей функции: generateUniqueID
которая вызывается из корутины, начинающейся с async
. Функция async
возвращает экземпляр Deffered<T>
. Тип T
указан Unit
по умолчанию.
Здесь тип — T
так String
как функция приостановки generateUniqueID
возвращает значение типа String
.
Далее мы вызываем await()
метод отложенного экземпляра: taskDeferred
для извлечения результата.
Запустив программу, мы получим следующий результат:
program runs...: main executing generateUniqueID on...: main generateUniqueID ends on thread ...: main program run ends...: f18ac8c7-25ef-4755-8ab8-73c8219aadd3 main Process finished with exit code 0
Здесь мы можем увидеть результат приостановленной функции, напечатанный на выходе.
Диспетчеры корутин: определение потока для запуска корутины
Диспетчер корутин определяет поток или пул потоков, которые соответствующая корутина использует для своего выполнения. Все корутины выполняются в контексте, представленном интерфейсом CoroutineContext
. Это CoroutineContext
индексированный набор элементов, доступный внутри корутины через свойство: CoroutineContext
. Диспетчер корутин является важным элементом этого индексированного набора.
Диспетчер корутины может ограничить выполнение корутины определенным потоком, отправить ее в пул потоков или разрешить ее неограниченное выполнение.
Как мы видели в предыдущем разделе, все разработчики корутин любят launch{}
и async{}
принимают необязательный CoroutineContext
параметр в своей сигнатуре:
fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred<T>
Используется CoroutineContext
для явного указания диспетчера новой корутины. Kotlin имеет несколько реализаций, CoroutineDispatchers
которые мы можем указать при создании корутин с помощью таких конструкторов корутин, как launch
и async
. Давайте посмотрим на некоторые из часто используемых диспетчеров:
Наследование диспетчера от родительской корутины
Когда launch{}
функция используется без параметров, она наследует CoroutineContext
(и, следовательно, диспетчер) от функции, CoroutineScope
из которой она запускается. Давайте понаблюдаем за этим поведением на примере ниже:
fun main() = runBlocking { launch { println( "launch default: running in thread ${Thread.currentThread().name}") longTask() } } suspend fun longTask(){ println("executing longTask on...: ${Thread.currentThread().name}") delay(1000) println("longTask ends on thread ...: ${Thread.currentThread().name}") }
Здесь launch{}
построитель корутины наследует контекст и, следовательно, диспетчер runBlocking
области корутины, которая работает в main
потоке. Следовательно, корутина, запущенная launch{}
строителем корутины, также использует тот же диспетчер, который запускает корутину в основном потоке.
Когда мы запускаем эту программу, мы можем наблюдать такое поведение в выводе ниже:
completed tasks launch default: running in thread main executing longTask on...: main longTask ends on thread ...: main Process finished with exit code 0
Как мы видим в выводе, корутина, запущенная построителем launch{}
корутин, также выполняется в main
потоке.
Диспетчер по умолчанию для выполнения операций с интенсивным использованием ЦП
Диспетчер по умолчанию используется, когда в области видимости явно не указан другой диспетчер. Он представлен Dispatchers.Default
и использует общий фоновый пул потоков. Пул потоков имеет размер, равный количеству ядер на машине, на которой выполняется наш код с минимальным количеством 2
потоков.
Давайте запустим следующий код, чтобы проверить это поведение:
fun main() = runBlocking { repeat(1000) { launch(Dispatchers.Default) { // will get dispatched to DefaultDispatcher println("Default : running in thread ${Thread.currentThread().name}") longTask() } } }
Вот фрагмент вывода, показывающий потоки, используемые корутиной:
Default : running in thread DefaultDispatcher-worker-1 Default : running in thread DefaultDispatcher-worker-2 Default : running in thread DefaultDispatcher-worker-4 Default : running in thread DefaultDispatcher-worker-3 Default : running in thread DefaultDispatcher-worker-5 Default : running in thread DefaultDispatcher-worker-6 Default : running in thread DefaultDispatcher-worker-7 Default : running in thread DefaultDispatcher-worker-8 Default : running in thread DefaultDispatcher-worker-9 Default : running in thread DefaultDispatcher-worker-10 Default : running in thread DefaultDispatcher-worker-3 Default : running in thread DefaultDispatcher-worker-2 Default : running in thread DefaultDispatcher-worker-2 Default : running in thread DefaultDispatcher-worker-6 Default : running in thread DefaultDispatcher-worker-4
Мы можем видеть 10
потоки из пула потоков, используемые для запуска корутин.
Мы также можем использовать limitedParallelism
ограничение количества активно выполняемых параллельно корутин, как показано в этом примере:
fun main() = runBlocking { repeat(1000) { // will get dispatched to DefaultDispatcher with // limit to running 3 coroutines in parallel val dispatcher = Dispatchers.Default.limitedParallelism(3) launch(dispatcher) { println("Default : running in thread ${Thread.currentThread().name}") longTask() } } }
Здесь мы установили ограничение на параллельное 3
выполнение максимального количества корутин.3
Создание новой темы с помощьюnewSingleThreadContext
newSingleThreadContext
создает новый поток, который будет предназначен исключительно для запуска корутины. Этот диспетчер гарантирует, что корутина всегда выполняется в определенном потоке:
fun main() = runBlocking { launch(newSingleThreadContext("MyThread")) { // will get its own new thread MyThread println("newSingleThreadContext: running in thread ${Thread.currentThread().name}") longTask() } println("completed tasks") }
В этом примере мы выполняем нашу корутину в выделенном потоке с именем, MyThread
которое видно из результатов, полученных при запуске программы:
newSingleThreadContext: running in thread MyThread Process finished with exit code 0
Однако выделенный поток — дорогостоящий ресурс. В реальном приложении поток необходимо либо освободить, когда он больше не нужен, с помощью функции close, либо повторно использовать его во всем приложении, сохранив ссылку на него в переменной верхнего уровня.
Запускайте без ограничений сDispatchers.Unconfined
Диспетчер Dispatchers.Unconfined
корутины запускает корутину в вызывающем потоке, но только до первой точки приостановки. После приостановки он возобновляет выполнение корутины в потоке, который полностью определяется вызванной функцией приостановки.
Давайте изменим наш предыдущий пример, чтобы передать параметр: Dispatchers.Unconfined
в launch{}
функцию:
fun main() = runBlocking { launch(Dispatchers.Unconfined) { // not confined -- will work with main thread println( "Unconfined : running in thread ${Thread.currentThread().name}") longTask() } println("completed tasks") }
Запустив эту программу, мы получим следующий результат:
Unconfined : running in thread main executing longTask on...: main // coroutine starts completed tasks // printed by main thread with the coroutine suspended longTask ends on thread ...: kotlinx.coroutines.DefaultExecutor // coroutine resumes Process finished with exit code 0
Как мы видим из вывода, корутина запускается в потоке main
сразу после ее вызова. Он приостановлен, чтобы позволить main
потоку работать. Корутина возобновляет работу в другом потоке: kotlinx.coroutines.DefaultExecutor
для выполнения println
оператора функции longTask
.
Неограниченный диспетчер подходит для корутин, которые не потребляют процессорное время и не обновляют какие-либо общие данные (например, пользовательский интерфейс), ограниченные определенным потоком. Неограниченный диспетчер не следует использовать в общем коде. Это полезно в ситуациях, когда какую-либо операцию в корутине необходимо выполнить немедленно.
Отмена выполнения корутины
Возможно, нам захочется отменить длительные задания до их завершения. Примером ситуации, когда мы захотим отменить задание, будет: когда мы перешли на другой экран в приложении на основе пользовательского интерфейса (например, Android) и больше не заинтересованы в результате длительной функции.
Другой пример: мы хотим выйти из процесса из-за какого-то исключения и хотим выполнить очистку, отменив все длительные задания, которые все еще выполняются.
В более раннем примере мы уже видели функцию, launch{}
возвращающую значение Job
. Объект Job
предоставляет cancel()
метод отмены запущенной корутины, который мы можем использовать, как показано в этом примере:
fun main() = runBlocking{ println("My program runs...: ${Thread.currentThread().name}") val job:Job = launch { longRunningFunction() } delay(1500) // delay ending the program job.cancel() // cancel the job job.join() // wait for the job to be cancelled // job.cancelAndJoin() // we can also call this in a single step println( "My program run ends...: ${Thread.currentThread().name}") } suspend fun longRunningFunction(){ repeat(1000){ i -> println("executing :$i step on thread: ${Thread.currentThread().name}") delay(600) } }
В этом примере мы выполняем оператор печати через longRunningFunction
каждые 600
миллисекунды. Это имитирует долго выполняющуюся функцию с 1000
шагами и выполняет оператор печати в конце каждого шага. При запуске этой программы мы получаем следующий вывод:
My program runs...: main executing step 0 on thread: main executing step 1 on thread: main executing step 2 on thread: main My program run ends...: main Process finished with exit code 0
Мы можем видеть longRunningFunction
выполнение шага до 2
и затем остановку после вызова cancel
объекта job
. Вместо двух операторов для cancel
и join
мы также можем использовать функцию расширения Job: cancelAndJoin
которая объединяет cancel
и join
вызывает.
Отмена корутин
Как объяснялось в предыдущем разделе, нам необходимо отменить корутины, чтобы не выполнять больше работы, чем необходимо, для экономии памяти и ресурсов обработки. Нам необходимо убедиться, что мы контролируем жизнь корутины и отменяем ее, когда она больше не нужна.
Код корутины должен взаимодействовать, чтобы его можно было отменить. Нам необходимо убедиться, что весь код в корутине поддерживает отмену, периодически проверяя отмену или перед началом любой длительной задачи.
Существует два подхода к тому, чтобы сделать код корутины отменяемым:
Периодический вызов приостанавливающей функцииyield
Мы можем периодически вызывать функцию приостановки, например, yield
для проверки статуса отмены корутины и получения потока (или пула потоков) текущей корутины, чтобы позволить другим корутинам выполняться в том же потоке (или пуле потоков):
fun main() = runBlocking{ try { val job1 = launch { repeat(20){ println( "processing job 1: ${Thread.currentThread().name}") yield() } } val job2 = launch { repeat(20){ println( "processing job 2: ${Thread.currentThread().name}") yield() } } job1.join() job2.join() } catch (e: CancellationException) { // clean up code } }
Здесь мы запускаем две корутины, каждая из которых вызывает yield
функцию, позволяющую другой корутине работать в потоке main
. Фрагмент вывода запуска этой программы показан ниже:
processing job 1: main processing job 2: main processing job 1: main processing job 2: main processing job 1: main
Мы можем видеть выходные данные первой корутины, после которой она вызывает yield
. Это приостанавливает первую корутину и позволяет запустить вторую корутину. Аналогично, вторая корутина также вызывает yield
функцию и позволяет первой корутине возобновить выполнение.
Когда отмена корутины принимается, kotlinx.coroutines.JobCancellationException
выдается исключение. Мы можем перехватить это исключение и запустить здесь весь код очистки.
Явно проверьте статус отмены с помощьюisActive
Мы также можем явно проверить статус отмены работающей корутины, isActive
свойство расширения которой доступно внутри корутины через CoroutineScope
объект:
fun main() = runBlocking{ println("program runs...: ${Thread.currentThread().name}") val job:Job = launch { val files = File ("<File Path>").listFiles() var loop = 0 while (loop < files.size-1 ) { if(isActive) { // check the cancellation status readFile(files.get(++loop)) } } } delay(1500) job.cancelAndJoin() println("program run ends...: ${Thread.currentThread().name}") } suspend fun readFile(file: File) { println("reading file ${file.name}") if (file.isFile) { // process file } delay(100) }
Здесь мы обрабатываем набор файлов из каталога. Мы проверяем статус отмены isActive
перед обработкой каждого файла. Свойство isActive
возвращается true
, когда текущее задание все еще активно (еще не завершено и не отменено).
Заключение
В этой статье мы разобрались в различных способах использования корутин в Котлине. Вот несколько важных моментов, которые следует запомнить:
- Корутина — это шаблон проектирования параллелизма, используемый для написания асинхронных программ.
- Корутины — это вычисления, которые выполняются поверх потоков, которые можно приостанавливать и возобновлять.
- Когда корутина «приостанавливается», соответствующие вычисления приостанавливаются, удаляются из потока и сохраняются в памяти, оставляя поток свободным для выполнения других действий.
- Корутины запускаются разработчиками корутин, которые также устанавливают область действия.
launch{}
,async{}
, иrunBlocking{}
— это разные типы построителей корутин.- Функция
launch
возвращает значениеjob
, используя которое также можно отменить корутину. Функцияasync
возвращаетDeferred<T>
экземпляр. Мы можем получить результат вычислений, выполненных в корутине, из экземпляра,Deferred<T>
вызвавawait()
метод. - Отмена корутины является кооперативной. Код корутины должен взаимодействовать, чтобы его можно было отменить. В противном случае мы не сможем отменить его в середине выполнения, даже после вызова
Job.cancel()
. - Функция
async
параллельно запускает корутину, аналогично функцииlaunch{}
. Однако он ожидает завершения корутины, прежде чем запускать другую корутину. - Диспетчер корутины определяет поток или потоки, которые соответствующая корутина использует для своего выполнения. Диспетчер корутины может ограничить выполнение корутины определенным потоком, отправить ее в пул потоков или позволить ей выполняться без ограничений.
- Корутины более легкие по сравнению с потоками. Поток блокируется, пока корутина приостановлена, оставляя поток для продолжения выполнения, что позволяет использовать один и тот же поток для запуска нескольких корутин.