티스토리 뷰

Kotlin

Kotlin Flow의 개념

ylog 2024. 1. 13. 22:26

이 글은 Kotlin Flow for Android: Getting Started 의 일부를 번역한 글입니다.

Flow

 

Kotlin Flow는 JetBrains가 만든 stream processing API다. Kotlin Flow는 Kotlin Coroutines 를 기반으로 제작되었다.

값들의 스트림을 다루는데 Flow를 사용함으로써 복잡한 멀티쓰레드 안에 있는 데이터를 변형시킬 수 있다.

이 과정에서 다루는 것

  • Data collections and streams.
  • Synchronous and asynchronous API calls.
  • Hot and cold data streams.
  • Exception handling during flow processing.

Returning Multiple Values

Suspended functions은 싱글 value를 비동기적으로 반환한다. 이를 이용하면 쓰레드에 대해 고민할 필요가 없다. 코루틴 API가 다 해주기 때문에.

하지만 Flow는 multiple values를 (시간이 지남에 따라) 비동기적으로 반환할 수 있다.

List

suspend fun getValues(): List<Int> {
  delay(1000)
  return listOf(1, 2, 3)
}

fun processValues() {
  runBlocking {
    val values = getValues()
    for (value in values) {
      println(value)
    }
  }
}

fun main() {
  processValues()
}

List는 위 정도로는 사용해도 괜찮지만 위 세 값을 계산하는데 많은 시간이 든다면 적절치 않다. data processing하는데 더 많은 딜레이가 더해진다면 이는 매우 비효율적이다. 따라서 당신은 각 리스트 아이템이 available하게 되면 바로 가공을 시작하고 싶다. Sequence가 이를 해준다.

Sequence

List랑 비슷하지만, 다른 점은 lazily evaluated 된다는 점. 한번에 값을 만드는게 아니라 iterate 할 때마다 값을 만든다.

suspend fun getValues(): Sequence<Int> = sequence {
  Thread.sleep(250)
  yield(1)
  Thread.sleep(250)
  yield(2)
  Thread.sleep(250)
  yield(3)
}

이로써 모든 아이템을 기다리는 대신 getValues 메서드가 만드는 아이템을 processValues가 소비한다.

Sequence는 Iterator를 사용하고, 다음 아이템을 기다리는 동안 block한다. 이는 간단한 리스트를 반환할 때는 좋은데, 만약 당신 애플리케이션이 streaming API와 커뮤니케이션 해야 할 때는 어떨까.

Channel

Streaming API는 REST API와 거의 정반대이다. REST API와 통신할 때는, 당신이 request를 만들고 API는 response를 돌려준다. Streaming API는 이와 다르게 동작한다. 이것은 client와 연결하여, 지속적으로 새로운 정보를 listening한다. (예를 들어, 트위터는 streaming API를 제공하는데, 이를 이용해 실시간으로 트윗을 stream하기 위해 사용할 수 있다.)

Sequence를 synchronous stream을 위해 사용할 순 있지만, asynchronous stream을 위해선 다른 솔루션이 필요하다. 이를 위해 Kotlin Coroutines에서 나온 Channel을 사용해야 한다. 개념적으로, channel은 pipe로 생각할 수 있다. 아이템들을 하나의 파이프를 통해 보내고, 다른 파이프를 통해 리스폰스를 받는다. 하지만 하나의 channel은 값들의 hot stream이다. hot streams는 값들을 즉시 만들기 시작한다.

Hot Versus Cold Streams

Hot stream인 channel은 다른쪽에서 listening하지 않아도 값을 생산한다. stream을 리스닝하지 않으면, 값을 잃어버린다.

getValues가 값을 채널을 통해 방출하며 processValues가 1, 2, 3 값을 받고, 아이템 리스닝을 중단한다. 아무도 듣고 있지 않아도 채널은 여전히 아이템을 생산한다. (4의 경우!)

실제로 open network connection을 갖기 위해 channel을 이용할 수 있지만 이는 메모리 릭으로 이어질 수 있다. 또는 channel을 subscribe하는 것을 깜빡 잊어서 값들을 잃어버릴 수 있다.

Hot stream은 값들을 소비하는 곳이 없어도 값들을 push한다. 하지만 Cold Stream은 오직 소비하는 곳이 있을 때만 값을 푸시한다. Kotlin Flow는 cold stream을 구현한 것이다.

Kotlin Flow Basics

Flow는 값들을 비동기적으로 생산하는 stream이다. 게다가 Flow는 내부적으로 coroutines를 이용한다. 그 덕에 structured concurrency의 장점를 즐길 수 있다.

structured concurrency으로 인해, coroutines는 한정된 시간 동안만 살아있다. 이 시간은 CoroutineScope 와 연결되어 있다. 그 안에서 당신의 coroutine이 시작된다.

당신이 scope를 cancel하면, running 중인 coroutines를 방출한다. 이 룰은 Kotlin Flow에도 적용된다. scope를 취소하면 Flow를 방출한다. 메모리를 수동으로 비워주지 않아도 된다.

몇가지 비슷한 점들이 있다. 이들은 observer pattern을 실행하는 방법을 제공한다.

  • LiveData is a simple observable data holder. It’s best used to store UI state, such as lists of items. It’s easy to learn and work with. But it doesn’t provide much more than that .
  • RxJava is a very powerful tool for reactive streams. It has many features and a plethora of transformation operators. But it has a steep learning curve!
  • Flow falls somewhere in between LiveData and RxJava. It’s very powerful but also very easy to use! The Flow API even looks a lot like RxJava!

Flow Builders

Flow를 만들기 위해선 flow builder를 이용해야 한다. flow { … } 가 가장 베이직한 빌더이다.

val namesFlow = flow {
  val names = listOf("Jody", "Steve", "Lance", "Joe")
  for (name in names) {
    delay(100)
    emit(name)
  }
}
  • kotlinx.coroutines package를 임포트 해야 한다.
  • 아래와 같은 방법도 있다.
val namesFlow = flowOf("Jody", "Steve", "Lance", "Joe")
val namesFlow = listOf("Jody", "Steve", "Lance", "Joe").asFlow()

Flow Operators

Intermediate Operators

fun main() = runBlocking {
  namesFlow
      .map { name -> name.length }
      .filter { length -> length < 5 }
    
  println()
}
  • map 은 각 값을 다른 값으로 변형시킨다.
  • filter 는 조건에 맞는 값들을 선택한다.

이렇게 돌려보면 아무 일도 발생하지 않는데, intermediate operator는 cold이기 때문이다. operation은 오직 당신이 terminal operator를 final stream에서 불러와야 실행된다.

Terminal Operators

Flow가 cold이기 때문에, terminal operator가 불려질때까지 Flow는 값을 방출하지 않는다. Termainal operators는 flow의 collection을 시작하는 suspending function이다. terminal operator를 호출하면 intermediate operators를 함께 호출하는 것이다.

fun main() = runBlocking {
  namesFlow
      .map { name -> name.length }
      .filter { length -> length < 5 }
      .collect { println(it) }

  println()
}
  • collect() 가 suspending function이기 때문에, coroutine이나 다른 suspending function 으로부터만 호출될 수 있다. 이 때문에 runBlocking()으로 코드를 감싼 것.
  • collect() 는 가장 베이직한 terminal operator. Flow로부터 값들을 collect하고 그 아이템으로 액션을 실행한다. 여기에선 아이템을 출력했다.
출력값:
4
3

Flow on Android(참고)

날씨앱 만드는 과정인데, 필요한 부분만 정리한다.

이것이 앱 아키텍쳐에 대한 구글의 추천 가이드이다.

repository는 Retrofit을 이용하여 네트워크로부터 데이터를 불러오고, Room Database 안에 데이터를 저장한다. Room, Retrofit은 Kotlin Coroutines를 지원한다. 우리는 Kotlin Flow를 db로부터 ViewModel까지 데이터를 전달하는데 사용할 것이다.

HomeActivity.kt

homeViewModel.fetchLocationDetails(851128)

Room and Flow

기존 프로젝트는 유저만이 data fetching을 트리거할 수 있지만, 이를 3시간마다 디비가 업데이트 되도록 작성할 것이다.

ForecastDao.kt

@Query("SELECT * FROM forecasts_table")
fun getForecasts(): Flow<List<DbForecast>>
  • 테이블이 변화하면 Flow는 신선한 데이터를 방출한다.

WeatherRepository.kt

fun getForecasts(): Flow<List<Forecast>>

WeatherRepositoryImpl.kt

override fun getForecasts() =
    forecastDao
      .getForecasts()
      .map { dbMapper.mapDbForecastsToDomain(it) }
  • map을 사용하여 db model → Forecast 도메인 모델로 변환

HomeViewModel.kt

//1
val forecasts: LiveData<List<ForecastViewState>> = weatherRepository
    //2
    .getForecasts()
    //3
    .map {
      homeViewStateMapper.mapForecastsToViewState(it)
    }
    //4
    .asLiveData()
  1. Activity는 forecasts의 변화를 옵저빙할 것이다. forecasts 는 Flow<List<ForecastViewState>> 타입이였지만, View와 ViewModel 간 커뮤니케이션을 구현하기엔 LiveData가 선호된다. 왜냐하면 LiveData는 내부적으로 라이프사이클 핸들링을 해주기 때문!

Context Preservation and Backpressure

Flow의 collection은 항상 parent coroutine의 context에서만 발생한다. 이를 context preservation이라 한다. 하지만 items를 emit할 때 context를 바꿀 수 있다. emission의 context를 바꾸기 위해서 flowOn() 메서드를 이용할 수 있다.

Flow가 만약 collector가 consume하는 속도보다 빠르게 아이템을 생산할 수도 있는데, 이를 backpressure라고 한다. Kotlin Flow는 코루틴 기반이기에 backpressure를 지원한다. consumer가 suspended state에 있거나 특정 작업을 하느라 바쁘면 producer는 이를 인지하여 아이템을 생산하지 않는다.

Observing Values

HomeActivity.kt

homeViewModel.forecasts.observe(this, Observer {
  forecastAdapter.setData(it)
})

Cancellation

위의 프로젝트에서, Flow collection은 LiveData가 active해질 때 시작된다. 만약 Flow가 끝나기 전에 LiveData가 inactive해 지면 flow collection은 cancel된다. 이 타임아웃전에 다시 액티브되지 않는 한 cancellation은 타임 딜레이 후에 발생한다. 디폴트 값은 5000 밀리초.

LiveData가 취소된 후 다시 액티브해지면 Flow collection도 다시 시작된다.

LiveData active → Flow collection start.

Exceptions

val forecasts: LiveData<List<ForecastViewState>> = weatherRepository
    .getForecasts()
    .map {
      homeViewStateMapper.mapForecastsToViewState(it)
      throw Exception()
    }
    .catch {
      // Log Error
    }
    .asLiveData()
  • catch()는 오직 upstream exceptions만 catch할 수 있다. 즉 catch 위의 operator로부터의 exception만 잡을 수 있다는 것. 따라서 throw를 catch 하기 위해선 catch를 throw 밑에 작성해야 한다.

 

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/09   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30
글 보관함