Obsah
Kotlin Coroutines
Korutiny umožňujú efektívne spravovať asynchrónné procesy
Coroutine (korutina) je proces, v ktorom príkazy sú vykonávané podľa stanoveného postupu, i keď výsledky týchto operácií sú v samostatných vláknach. Teda umožňujú pozastavenie svojho vykonávania a čakanie na výsledky, prípadne ukončenie korutiny. Táto vlastnosť zjednodušuje písanie kódu, pretože nie je treba definvať Executor či Observer, ktoré v závislosti na zmenách pokračujú vo vykonávaní kódu. Efektivita spracovania pomocou korutin je pri náročnejších procesoch značne vyššia a teda aj rýchlosť je optimalizovaná. Korutiny šetria pamäť pri ich mnohonásobnom spustení, čo sa prejaví na vyššej stabilite aplikácie. Klasické vlákna (Threads) nie sú tak optimalizované.
Následné využitie je teda vhodné hlavne v nádstavbách Kotlinu, napríklad v Kotlin Jetpack Compose, knižniciach pre získavaniu dát z SQLite - Room, vstupno-výstupné operácie, získavanie dát cez API a iné.
V podstate sa jedná o asynchrónné programovanie, kedy kód v coroutine bloku sa vykoná mimo behu hlavného programu.
Zvládnutie práce s korutinami predpokladá znalosti: Kotlin OOP, Kotlin kolekcie a informatívne Java Threads.
Popis
- Slúžia na súčastné načítanie a spracovanie dát
- Sú to ľahké vlákna, teda šetria pamäť a výkon
- Tri spôsoby tvorby coroutines:
- launch {} - jednoduchá tvorba
- async {} - vracajú hodnotu
- runBlocking {} - zablokuje pôvodný threade a po prevedení kódu ho odblokuje
Životný cyklus korutiny
┏━━━━━━━━━━━━┓
┃ NEW ┃ (optional initial)
┗━━━━━━━━━━━━┛
⬇ start
┏━━━━━━━━━━━━┓ ┏━━━━━━━━━━━━┓
┃ CANCELLING ┃◄━┳━━━━━━━━━━━━┃ ACTIVE ┃ (default initial)
┗━━━━━━━━━━━━┛ ┃cancel/fail ┗━━━━━━━━━━━━┛
┃ ┃ ⬇ complete
┃ ┃ ┏━━━━━━━━━━━━┓
┃ ┗━━━━━━━━━━━━┃ COMPLETING ┃
┃ ┗━━━━━━━━━━━━┛
(final)▼ (final)⬇ finish
┏━━━━━━━━━━━━┓ ┏━━━━━━━━━━━━┓
┃ CANCELLED ┃ ┃ COMPLETED ┃
┗━━━━━━━━━━━━┛ ┗━━━━━━━━━━━━┛
linky
- kt.academy - celá kniha : Prečo používať Kotlin Coroutines?
Pridanie závislostí
Postup je aj na githube
build.gradle → dependencies :
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.2"
rôzne vhodné importy:
import kotlin.coroutines.CoroutineContext import kotlin.system.measureTimeMillis // kotlinx import kotlinx.coroutines.async import kotlinx.coroutines.cancel import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.isActive import kotlinx.coroutines.Job import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.time.delay
@OptIn
Deaktivácia upozornení kompilátora na vybrané riziká
@ExperimentalCoroutineApi
Anotácia upozorňuje na deklaráciu funkcie, ktorá môže byť vývojom jazyka pozmenená, čoho následkom kód nemusí byť funkčný, alebo sa stane depracted
@DelicateCoroutinesApi
Anotácia upozorňuje na obozretnosť pri práci s alokovanou pamäťou vo funkcii, teda treba dbať na jej uvoľnenie (napríklad metóda close().
Implementácia
coroutine sa využijú v bloku launch {} vnútri funkcie typu Unit = runBlocking {}. V príklade sú ľahké vlákna ( launch {} ) spustené paralelne a aj ich návratové hodnoty budú prijaté paralélne - nezoradené,
fun main(): Unit = runBlocking { launch { repeat(3) { println("Hello $it") } } // next launch launch { repeat(3) { // ... } } }
runBlocking {}
runBlocking zablokuje thread z ktorého je volaný a vytvorí novú coroutine, v ktorej je možné použiť časovo náročnejšie funkcie. V príklade budú posledné 2 hlášky zobrazené po odblokovaní hlavného vlákna, teda: 1, pauza, 2, 3 :
fun testRunBlocking() { println("1") runBlocking { delay(5000) println("2") } println("3") }
launch {} v runBlocking {}
launch umožní vykonanie procesov v jeho bloku, ale neblokuje procesy za ním. runBlocking pozastaví hlavné vlákno, pokiaľ nie sú procesy v ňom ukončené. Teda zobrazí sa: 1, 3, pauza, 2, pauza, 4, 5 :
fun testRunBlocking() { println("1") runBlocking { launch { delay(5000) println("2") } println("3") delay(10000) println("4") } println("5") }
Pre bližšie objasnenie launch ovplyvní len svoj blok, napríklad zobrazenie pre následujúci príklad by bolo: 3, pauza, 1, pauza, 2 :
runBlocking { launch { delay(5000) println("1") } launch { delay(10000) println("2") } launch { println("3") } }
join() pre launch a Job
Volanie launch má v sebe rozhranie Job, čo je stav vykonávania coroutine. Poradie: 1, pauza, 2, 3 sa docieli zreťazením pomocou job() :
runBlocking { val firstJob = launch { delay(5000) println("1") } val secondJob = launch { delay(10000) println("2") } firstJob.join() secondJob.join() launch { println("3") } }
async {} v runBlocking {}
Blok async na rozdiel od bloku launch vracia hodnotu. To zabezpečuje jeho rozhranie Deffered, ktoré dedí z Job. Metóda .await() vracia hodnotu až po vykonaní coroutine :
fun testRunBlocking() { runBlocking { val firstDeffered = async { delay(3000) return@async 2 // return nie je nutné písať } val secondDeffered = async { delay(6000) 3 } println("Sum: ${firstDeffered.await() + secondDeffered.await()}") } }
Funkcie v runBlocking {}
- delay() - pozastavenie vykonávania a uvoľnenie pre iný proces. Podobný výsledok má
Thread.sleep(), ale to blokuje vlákno úplne - join() - reťazenie launch {} v runBlocking {}
- cancel() - ukončenie pre job // pozri aj CoroutineScope
Dispatchers
Dispatchers automaticky prideľujú vlákna jednotlivým coroutines. Taktiež pomocou dispatchers určujeme na akom type vlákna bude coroutine spustená a tým i rýchlosť jej vykonania. Podobnú funkcionalitu v Java zabezpečujú Executor.
Štyri druhy dispatcher knižnice kotlinx.coroutines:
- Dispatcher.Default - pre náročné procesy pre 2 až všetky jadrá CPU
- Dispatcher.IO - pre prúdy IO (vstupno-výstupné operácie). Nie je obmädzený počtom jadier a defaultne využije do 64 vlákien
- Dispatcher.Main - pre prácu s UI a krátkodobé procesy (JavaFX, Android). Beží na hlavnom vlákne
- Dispatcher.Unconfined - nevyužívame, nieje zviazaný s vláknom, je len pre špecifické prípady (apk môže byť nestabilná)
CoroutineContext môže meniť Dispatchers a ovplyvňovať chovanie coroutine. Predvolene je aktívny Dispatchers.Default
Zmena Dispatchers:
fun main(): Unit = runBlocking { launch(Dispatchers.Default) { println("threadName: ${Thread.currentThread().name}") } } // output: threadName: DefaultDispatcher-worker-1
Dispatcher.Default zdieľa stejné vlákna s Dispatcher.IO, rozdiel je v ich počte.
newSingleThreadContext()
Metóda na vytvorenie vlastného jednovláknového dispatcher. Vracia dispatcher typu ClosableCoroutineDispatcher, ktorý na záver sa ukončí metódou close()
definicia:
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class) // before fun declaration val singleThreadDispatcher = newSingleThreadContext("NameOfMyThread") val scope = CoroutineScope(singleThreadDispatcher)
Executors
Ak je treba sa vyhnúť experimentálnemu newSingleThreadContext(), potom s Executors to je možné. V Java sa využíva pre vlákna aj rozhranie Executor, ktoré je podobné Dispatcher. Je možná konverzia ExecutorService na CoroutineDispatcher metódou asCoroutineDispatcher() s návratom typu ExecutorCoroutineDispatcher, ktorý na záver sa ukončí metódou close()
definicia pre viacvláknový proces:
val executor = Executors.newFixedThreadPool(nThreads) val dispatcher = executor.asCoroutineDispatcher() val scope = CoroutineScope(dispatcher)
definicia pre jednovláknový proces:
val executor = Executors.newSingleThreadExecutor() val dispatcher = executor.asCoroutineDispatcher() val scope = CoroutineScope(dispatcher) ... scope.launch { while(isActive) { Thread.sleep(1000) println("cycle at time in launch A: ${LocalTime.now()}") yield() } }
Cyklický jednovláknový proces - môže zablokovať dispatcher ak sa používaThread.sleep, preto je vhodné v závere cyklu použiť metóduyield(), čím sa spustia aj následujúce korutíny vscope. Pri použitídelay()nie je trebayield(), pretože nedochádza k blokácii vlákna.
CoroutineScope
CoroutineScope zabezpečuje štrukturovanú paralélnosť. Obalí coroutines do rámca, na ktorý umožňuje aplikovať metódy:
- cancel() - ukončí všetky coroutines a zabráni spusteniu novej po
cancel()
Každá coroutine v scope môže byť riadená iným Dispatchers.
Implementácia:
fun main(): Unit = runBlocking { val scope = CoroutineScope(Dispatchers.IO) // <======= println("begin process") repeat(100) { scope.launch { delay(6000) // or any suspend fun } } delay(3000) println("break process") scope.cancel() // <======= println("end process") }
nepraktický spôsob:
fun main(): Unit = runBlocking { val jobList = mutableListOf<Job>() // <======= println("begin process") repeat(100) { jobList.add( launch { delay(6000) // or any suspend fun } ) } delay(3000) println("break process") jobList.forEach { it.cancel() // <======= } println("end process") }
isActive
Ak je Coroutine aktívna, status isActice je true a ak je neaktívna, napríklad pozastavená metódou delay() tak vracia false. Metóda Thread.sleep() ponechá Coroutine aktívnu.
Pozastavenie Coroutine pomocou cancel() ak je aplikované delay() generuje výnimku CancellationException a ukončí dannú coroutine a jej potomkov
Ukončenie v coroutineScope {} je možné i vlastnou funkciou s obsahom:
if(!isActive) throw CancellationException()
Sequence
sequence<> je súčasťou štandardnej knižnice. Vracia postupne získané dáta, avšak blokuje hlavné vlákno. Odoslanie dát realizuje metóda yield():
fun main(args: Array<String>) { readData().forEach { // ToDo } } fun readData() = sequence<Int> { while(true) { // get data yield(data) } }
Flow
Vracia postupne získané dáta, avšak blokuje hlavné vlákno. Odoslanie dát realizuje metóda emit():
fun main(): Unit = runBlocking { readData().collect { // ToDo } } fun readData() = flow<ObjectType> { while(true) { // get data emit(data) } }
- neblokuje vlákno
main - nie je treba špecifikovať návratový typ
Cold Flow a Sequence
Sequence a Flow sa označujú cold, teda vykonajú sa pri použití forEach a collect.
asFlow()
metóda asFlow() zjednodušuje zápisy v runBlocking {}:
transform()
metóda transform() prevádza hodnoty z asFlow()
take()
metóda take() limituje interval Flow - povolí n hodnôt od začiatku
fun main() = runBlocking { (1..10).asFlow().take(4).transform { emit(anyOpertionWithitIt(it)) emit(anyOpertionWithitIt2(it)) }.collect { println(it) } } fun anyOpertionWithitIt(i: Int): Int { return i * i } fun anyOpertionWithitIt2(i: Int): Double { return kotlin.math.sqrt(i.toDouble()) }
- Dokumentácia pre Flow
- Iný popis pre Cold Flow and Hot Flow in Android
Channel - predávnie dát
Sprostredkováva komunikáciu medzi coroutines. K tomu Channel poskytuje metódy send() a receive(), pre odoslanie a príjem cez kanál. Pri definícii channel sa uvádza typ dát prenosu.
close()
zabezpečí, že už nebudú príjaté ďalšie objekty na spracovanie, avšk ukončí aj ostané coroutines a spôsobí výnimku kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed, ktorú je nutné odchytiť, aby kontinuita programu nebola narušená
fun main(): Unit = runBlocking { val channel = Channel<Int>() launch { repeat(3) { channel.send(it) delay(1000) } channel.close() } launch { try { while (isActive) { println(channel.receive()) } } catch (_: ClosedReceiveChannelException) { println("ClosedReceiveChannelException captured") } } launch { while(isActive) { delay(1000) println("this coroutine isActive") } } }
cyklus for pre získané objektov
nie je nutné použiť receive() a zachytávať výnimku. Postačí aj z channel získať objekty v cykle for:
fun main(): Unit = runBlocking { val channel = Channel<Int>() launch { repeat(3) { channel.send(it) delay(1000) } channel.close() } launch { for(number in channel) { println(number) } } }
Základné typy Channel
- Rendezvous channel - metóda
send()sa vykoná, až keď ju zavoláreceive(). Toto je default nastavenie. Teda tu obe metódy blokujú vlákno do svojho vykonania. - Unlimited channel -
send()sa vykonáva bez zavolaniareceive(). Naplní tak Channel neobmädzene. Následnereceive()môže preberať hodnoty z naplnenéhochannel - CONFLATED channel - zlučuje volania
send()tak, žereceive()vráti poslednýsend
definicie:
val channelR = Channel<Int>(Channel.RENDEZVOUS) val channelU = Channel<Int>(Channel.UNLIMITED) val channelC = Channel<Int>(Channel.CONFLATED)
kapacita Channel
veľkosť zásobníka pre Channel je možné limitovať pri jeho definícii:
val channel = Channel<Int>(sizeChannel)
následne send() bude vykonávané do limitu sizeChannel a po limite bude receive() čakať na nové vykonanie send()
Modifikácia kapacity Channel
meted na modifikáciu send():
- BufferOverflow.SUSPEND - predvolené chovanie, teda pozastavenie
send()po naplnení zásobníka až do uvoľnenia poreceive() - BufferOverflow.DROP_OLDEST - aj ak nie sú z kanála odobrané hodnoty, tak najstaršia bude odobraná a nová bude pridaná po každom
send() - BufferOverflow.DROP_LATEST -
send()sa vykoná, ale hodnota nebude odoslaná dochannel(v zásobníku budú len prvé hodnoty do jeho limitu)
definicia:
val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST)
Uzatváranie pomocou onUndeliveredElement
Zabrániť únikom pamäte je možné tak, že uzavrieme všetky zdroje - inštancie.
Môže nastať situácia, že niektoré zdroje nebudú uzatvorené, napríklad pri použití capacity a DROP_OLDEST, preto na tieto nedoručené zdroje pri vytváraní inštancie channel použijeme uzatvorenie:
val channel = Channel<Resource>( capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST, onUndeliveredElement = { resource -> println("Undelivered") resource.close() } )
StateFlow a SharedFlow
Sú určené na spracovanie dátových tokov a pomáhajú riadiť stavy a udalosti reaktívnym spôsobom zohľadňujúcim životný cyklus.
- StateFlow - uchováva stav a poskytuje informácie o ňom pre viacerých sledovateľov
- SharedFlow - určený na zdieľanie udalostí alebo správ medzi viacerými spotrebiteľmi
StateFlow
vlastnosti StateFlow:
- Vždy má aktuálnu hodnotu
- Pozorovatelia získajú najnovšiu hodnotu, keď začnú zbierať údaje
- Je horúci, čo znamená, že je vždy aktívny, aj keď nie sú žiadni pozorovatelia
- Je navrhnutý ako jeden hodnotový tok (ako držiteľ živých údajov)
Používa sa na reprezentáciu stavu používateľského rozhrania, ktorý sa má zobraziť v aplikácii, napríklad ako sú: aktuálne údaje obrazovky, stav prihlásenia alebo zoznam položiek.
Praktické použitie môže pozostávať napríklad z načítania údajov cez API, zobrazenie načítaných údajov a zobrazenie stavu o načítaní.
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun main(): Unit = coroutineScope { val waitTime = 2000L val state = MutableStateFlow("A") println("init value = ${state.value}") // A launch { state.collect { println("Value changed to $it") } } delay(waitTime / 4) launch { state.collect { println("and now it is $it") } } delay(waitTime) state.value = "B" delay(waitTime) state.value = "C" } // launch collect is infinity
SharedFlow
vlastnosti SharedFlow:
- Neobsahuje stav, ale generuje udalosti
- Rovnaké udalosti môžu dostať viacerí zberatelia
- Je horúci, čo znamená, že nie je závislý od zberateľov, aby vysielali udalosti
Používa sa na jednorazové udalosti, ako je zobrazenie upozornenia, prechod na inú obrazovku alebo zobrazenie chybových hlásení
Praktické použitie môže pozostávať napríklad zo zobrazenia udalosti o zmene v zoznamoch a podobne
suspend fun main(): Unit = coroutineScope { val waitTime = 2000L val mutableSharedFlow = MutableSharedFlow<String>(replay = 0) launch { mutableSharedFlow.collect { // <~ println(" ~ 1 received $it") } } launch { delay(waitTime / 4) mutableSharedFlow.collect { // <~ println(" ~ 2 received $it") } } delay(waitTime) mutableSharedFlow.emit("Message 1") // ~> delay(waitTime) mutableSharedFlow.emit("Message 2") // ~> }
- replay obmädzuje počet posledných hodnôt, ale vyrovnávaciu pamäť je možné vynulovať pomocou resetReplayCache
- v prípade že replay nie je nastavené, potom emit() odovzdá následujúcu hodnotu až všetci odberatelia odoberú pôvodnú hodnotu
- ak replay je nastavené, potom emit() pod limitom zásobníka sa vykponá ihneď a hodnoty v limite sa odovzdajú po odoobraní všetkými odberateľmi
@OptIn(ExperimentalCoroutinesApi::class) suspend fun main(): Unit = coroutineScope { val mutableSharedFlow = MutableSharedFlow<String>(replay = 2) launch { mutableSharedFlow.collect { println(" ~ 1 received $it") } } mutableSharedFlow.emit("Message1") // ~> mutableSharedFlow.emit("Message2") // ~> mutableSharedFlow.emit("Message3") // ~> println(mutableSharedFlow.replayCache) // [Message2, Message3] delay(100) mutableSharedFlow.resetReplayCache() println(mutableSharedFlow.replayCache) // [] }
- kotlinlang.org : SharedFlow StateFlow
- developer.android.com : StateFlow and SharedFlow
- medium.com : StateFlow and SharedFlow in Android - vysvetlenie a kompletný príklad
- kt.academy : SharedFlow a StateFLow
Job a Deffered
Umožňujú pristupovať k metódam a funkciám knižnice kotlinx.coroutines. Blok launch {} je inštancia typu Job. Blok async {} je inštancia typu Deffered má návratovú hodnotu. Rozhranie Deffered dedí z rozhrania Job.
Job
Job obsahuje lokálne premenné, miesto pozastavenia, dedenie, kontext s parametrami korutiny a iné. Každá korutina má svoju vlastný job. Job je zrušiteľný a má svoj životný cyklus.
Metóda join() aplikovaná na definovanú coroutine v bloku inej coroutine pozastaví vykonávanie až do ukončenia definovanej.
fun test() { runBlocking { val previousCoroutine = launch { println("previousCoroutine : Begin") delay(1000) println("previousCoroutine : End") } launch { println("nextCoroutine : waiting for previousCoroutine") previousCoroutine.join() println("nextCoroutine : previousCoroutine is ended") } } }
CompletableJob
Rozhranie CompletableJob dedí z Job. Poskytuje metódy complete() a completeExceptionally().
complete()
Ak je aplikovaná metóda invokeOnCompletion() na blok completableJob, tak blok s korutinou bude vykonaný až po volaní completableJob.complete(), teda obdobne ako pri join()
completeExceptionally(exception: Throwable): Boolean
Dokončí úlohu s výnimkou a podradené úlohy budú zrušené.
kotlinlang.org : CompletableJob
Stavy Job
- isActive - true ak nebol dokončený čí zrušený
- isCompleted - true ak bol dokončený čí zrušený
- isCancelled - true ak bol zrušený
Hierarchia coroutines
Každá inštancia Job môže mať potomkov, čím sa vytvorí hierarchia. Rodičovská korutina sa môže ukončiť napríklad metódou complette(), avšak až sa ukončia všetci potomkovia. Hierarchia môže byť stromová.
príklad definicie:
val parentJob = Job() val child1Job = Job(parentJob) val child2Job = Job(parentJob)
ukončenie potomka výnimkou pomocou metódy completeExceptionally() ukončí rovnako i rodiča:
fun test() { val parentJob = Job() val childJob = Job(parentJob) parentJob.invokeOnCompletion { println("Parent: ${it.toString()}") } childJob.invokeOnCompletion { println("Child: ${it.toString()}") } childJob.completeExceptionally(RuntimeException()) }
ukončenie rodiča ho ukončí výnimkou RuntimeException a potomkov výnimkou JobCancellationException
SupervisorJob
Výnimky sa propagujú od potomkov k rodičom. Rozhranie SupervisorJob použijeme, ak nie je žiadúce aby výnimka potomka ovplyvnila rodiča, ani ostatných potomkov. V potomkoch je nutné výnimky ošetriť, inak sa zlyhanie bude šíriť (propagovať) smerom nahor a tak zapríčini zlyhanie celého procesu. SupervisorJob dohliada iba na svojich potomkov v CoroutineScope, avšak supervisorScope {} úplne izoluje zlyhania pre svoj rozsah, teda nie je potrebné ošetrenie výnimiek v potomkoch.
CoroutineScopeajlaunchsú inštancieJob
definicia:
val scope = CoroutineScope(SupervisorJob()) // alebo: val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
CoroutineExceptionHandler
Element CoroutineExceptionHandler môže byť súčasťou CoroutineContext. Zachytáva všetky výnimky, takže ak je treba zachytiť jednotlivé výnimky, je vhodné použiť try - catch priamo v korutine:
@OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val exceptionHandler = CoroutineExceptionHandler { _, exception -> println("Akákoľvek globálna výnimka: ${exception.message}") } val job = GlobalScope.launch(exceptionHandler) { delay(100) throw RuntimeException("Vyskytla sa výnimka!") } job.join() }
Linky
Zvláštne poďakovanie patrí itnetwork.cz za kurzy pre začínajúcich i pokročilých programátorov.
