Večernja gužva: Izgradnja otporne kuhinje


Naš restoran je sada model moderne efikasnosti. Možemo da obrađujemo složene porudžbine i upravljamo stanjem uživo (StateFlow) i događajima (SharedFlow). Ali složimo se, do sada smo radili pod idealnim uslovima.

Šta se dešava kada nastane gužva u subotu uveče?

  • Kuhinja (producer) počinje da izbacuje jela mnogo brže nego što konobari (consumers) mogu da ih isporuče.
  • Specijalizovani zadatak poput sečenja mesa (IO-intenzivan) obavlja se tik do delikatne stanice za serviranje (UI), izazivajući haos :(.
  • Jedno jelo zagori, i ceo švedski sto se zbog toga zatvara.

Da bi preživela gužvu, naša kuhinja mora biti više od efikasne. Mora biti otporna.

Backpressure: Kada je kuvar prebrz


Backpressure je pojava kada proizvođač (producer) emituje stavke brže nego što potrošač (consumer) može da ih obradi. Podrazumevano, Flow je sekvencijalan. Kuvar čeka da konobar isporuči jedno jelo pre nego što počne sa sledećim. Ovo je sigurno, ali ne uvek i najefikasnije.

Pogledajmo primer brzog kuvara i sporog konobara:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
fun brzoPripremiJela(): Flow<Int> = flow {
    repeat(5) { brojJela ->
        println("Kuvar: Pripremam jelo $brojJela")
        // Kuvar je brz
        delay(100)
        emit(brojJela)
    }
}

val pocetnoVreme = System.currentTimeMillis()
brzoPripremiJela().collect { jelo ->
    println("Konobar: Služim jelo $jelo...")
    // Konobar je spor
    delay(500)
}
println("Ukupno vreme: ${System.currentTimeMillis() - pocetnoVreme}ms")

Izlaz:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
Kuvar: Pripremam jelo 0
Konobar: Služim jelo 0...
Kuvar: Pripremam jelo 1
Konobar: Služim jelo 1...
Kuvar: Pripremam jelo 2
Konobar: Služim jelo 2...
Kuvar: Pripremam jelo 3
Konobar: Služim jelo 3...
Kuvar: Pripremam jelo 4
Konobar: Služim jelo 4...
Ukupno vreme: 3000ms
// (5 jela * (100ms kuvanje + 500ms služenje))

Kuvar je neprestano blokiran, čekajući sporog konobara. Ipak možemo to bolje.

Strategija 1: buffer() - Sto za podgrevanje

Operator buffer() pokreće korutinu proizvođača konkurentno sa potrošačem, sa baferom između. Kuvar može da stavlja jela na sto za podgrevanje bez čekanja na konobara.

Buffer
1
2
3
4
5
6
brzoPripremiJela()
    .buffer()
    .collect { jelo -> 
        println("Konobar: Služim jelo $jelo...")
        delay(500)
    }

Izlaz:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
Kuvar: Pripremam jelo 0
Kuvar: Pripremam jelo 1
Kuvar: Pripremam jelo 2
Kuvar: Pripremam jelo 3
Kuvar: Pripremam jelo 4
Konobar: Služim jelo 0...
Konobar: Služim jelo 1...
Konobar: Služim jelo 2...
Konobar: Služim jelo 3...
Konobar: Služim jelo 4...
Ukupno vreme: 2600ms 
// Kuvar završava za ~500ms, konobaru treba ~2500ms. Mnogo brže!

Kuvar završava kuvanje skoro trenutno, a ukupno vreme sada zavisi samo od sporog konobara.

Strategija 2: conflate() - Tabla “Današnja preporuka”

Šta ako nam je stalo samo do najnovije vrednosti? conflate() je strategija gde spori potrošač preskače međuvrednosti. Ako kuvar stavi tri nova jela dok je konobar zauzet, konobar će ignorisati prva dva i isporučiti samo poslednje.

Conflate
1
2
3
4
5
6
7
brzoPripremiJela()
    // Isporuči samo poslednje jelo
    .conflate()
    .collect { jelo ->
        println("Konobar: Služim jelo $jelo...")
        delay(500)
    }

Izlaz:

1
2
3
4
5
6
7
8
9
Kuvar: Pripremam jelo 0
Kuvar: Pripremam jelo 1
Kuvar: Pripremam jelo 2
Kuvar: Pripremam jelo 3
Kuvar: Pripremam jelo 4
Konobar: Služim jelo 0... // Dok se ovo završi, kuvar je već kod jela 4
Konobar: Služim jelo 4... // Preskače 1, 2 i 3
Ukupno vreme: 1100ms
// Super brzo, ali sa gubitkom podataka

Strategija 3: collectLatest() - Polje za pretragu

Ovaj kolektor obrađuje samo poslednju vrednost, ali ide korak dalje. Ako nova vrednost stigne dok se prethodna obrađuje, on otkazuje stari blok za obradu i počinje iznova sa novom vrednošću. Ovo je savršen obrazac za obradu brzih UI događaja, poput upita u polju za pretragu.

CollectLatest
1
2
3
4
5
6
7
brzoPripremiJela()
    // Ovo je kolektor, ne operator!
    .collectLatest { jelo ->
        println("Konobar: Uzimam jelo $jelo...")
        delay(500)
        println("Konobar: ZAVRŠIO sa posluživanjem jela $jelo.")
    }

Izlaz:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
Kuvar: Pripremam jelo 0
Konobar: Uzimam jelo 0...
Kuvar: Pripremam jelo 1
Konobar: Uzimam jelo 1... // Otkazuje posluživanje jela 0
Kuvar: Pripremam jelo 2
Konobar: Uzimam jelo 2... // Otkazuje posluživanje jela 1
Kuvar: Pripremam jelo 3
Konobar: Uzimam jelo 3... // Otkazuje posluživanje jela 2
Kuvar: Pripremam jelo 4
Konobar: Uzimam jelo 4... // Otkazuje posluživanje jela 3
Konobar: ZAVRŠIO sa posluživanjem jela 4.
// Samo se poslednji završava!
Ukupno vreme: 1000ms

flowOn(): Održavanje reda u kuhinji


Podrazumevano, proizvođač i kolektor se izvršavaju u istoj korutini i na istom tredu. Ovo može biti problem ako kuvar obavlja težak posao (poput sečenja mesa na IO tredu) koji ne bi trebalo da se dešava na delikatnoj stanici za serviranje (Main UI tred).

Operator flowOn() menja kontekst izvršavanja za upstream kod (proizvođača i sve operatore pre njega).

FlowOn
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
flow {
    println("Kuvar: Pripremam sastojke. [Tred: ${Thread.currentThread().name}]")
    emit("Jelo")
}
.map { jelo -> 
    println("Pomoćni kuvar: Dekorišem jelo. [Tred: ${Thread.currentThread().name}]")
    "$jelo sa dekoracijom"
}
.flowOn(Dispatchers.IO)

// Sve IZNAD se izvršava na IO Dispatcher-u

.collect { jelo ->
    println("Konobar: Služim '$jelo'. [Tred: ${Thread.currentThread().name}]")
}

Izlaz:

1
2
3
Kuvar: Pripremam sastojke. [Tred: DefaultDispatcher-worker-1]
Pomoćni kuvar: Dekorišem jelo. [Tred: DefaultDispatcher-worker-1]
Konobar: Služim 'Jelo sa dekoracijom'. [Tred: main]

flowOn deluje kao granica. Težak kuhinjski posao ostaje van glavnog treda.

catch(): Obrada zagorelog jela


Šta se dešava ako nešto krene po zlu u strimu? Podrazumevano, izuzetak će prekinuti flow i srušiti kolektor.

1
2
3
4
5
6
7
8
flow {
    emit("Salata")
    emit("Hleb")
    throw RuntimeException("Zagoreo biftek!")
    emit("Dezert")
}
// Ovo bi se srušilo!
.collect { jelo -> println("Uživam u: $jelo") }

Operator catch pruža deklarativan način za obradu upstream izuzetaka.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
flow {
    emit("Salata")
    emit("Hleb")
    throw RuntimeException("Zagoreo biftek!")
}
.catch { e -> 
    println("Inspektor: Uhvaćen problem! ${e.message}")
    // Moguće je emitovati i zamensku vrednost
    emit("Kolačići kao kompenzacija")
}
.collect { jelo -> 
    println("Gost: Uživam u: $jelo") 
}

Izlaz:

1
2
3
4
Gost: Uživam u: Salata
Gost: Uživam u: Hleb
Inspektor: Uhvaćen problem! Zagoreo biftek!
Gost: Uživam u: Kolačići kao kompenzacija

Strim se nije srušio. Greška je elegantno obrađena.

Napomena: catch može da obradi samo izuzetke iz upstream operatora. Ne može da uhvati izuzetak u samom collect bloku!

Zaključak

Kuhinja je sada zvanično otporna i spremna za večernju gužvu.

  • Strategije za Backpressure: Upravlja brzim proizvođačima i sporim potrošačima pomoću buffer-a (konkurentnost), conflate-a (poslednja vrednost) i collectLatest-a (poništivi rad).
  • flowOn: Dodeljuje određene delove strima ispravnom kontekstu, održavajući kod organizovanim i UI responzivnim.
  • catch: Koristi se za deklarativnu obradu grešaka unutar strima, sprečavajući rušenja i omogućavajući oporavak.

Šta nas čeka u petom delu?

Dizajnirali smo neverovatan restoran, obučili osoblje i izgradili otporne sisteme. Ali kako da dokažemo da sve funkcioniše bez otvaranja za goste? Kako možemo biti sigurni da je kuvarov tajming ispravan i da je logika konobara dobra?

U poslednjem delu, zaronićemo u ključnu temu Testiranja korutina i Flow-ova. Istražićemo biblioteku kotlinx-coroutines-test, naučiti kako da kontrolišemo virtuelno vreme i pišemo stabilne, pouzdane testove za naš asinhroni svet.