Kotlin Coroutine 을 사용하기 전에

kimji1
18 min readMay 5, 2021

Coroutine 이란

동시 실행 가능한 코드 블럭을 가진다는 점에서 스레드와 유사하지만, 한 스레드에서만 한정되는 것이 아닌 처음 실행된 스레드에서 일시중지 후 다른 스레드에서 다시 시작될 수 있다는 점에서 스레드와 차이가 있다.

Structured Concurrency

명확성, 품질, 개발시간을 개선하기 위한 프로그래밍 패러다임으로 핵심 개념은 명확한 진입 점과 종료점을 갖고 모든 생성 된 스레드가 종료 전에 완료되었는지 확인하는 제어 흐름 구조를 통해 동시 실행 스레드를 캡슐화 하는 것이다. 동시성이 존재하더라도 소스 코드의 구조를 통해 제어 흐름을 쉽게 확인할 수 있다.

코루틴은 코루틴의 생명주기를 제한하는 특정 Coroutine Scope에서 시작하도록 하여 Structured Concurrency의 원칙을 따른다.

Suspending Function

kotlin 에서 function 앞에 `suspend `키워드를 넣어서 만들 수 있다. 일반 함수와의 차이점은 다른 suspending function을 사용하여 코루틴의 실행을 suspending 할 수 있다는 점이다.

Coroutine Builder

runBlocking

내부의 코루틴이 실행 완료될 때 까지 해당 스레드가 blocking 된다는 것을 의미한다

launch

실행된 코루틴을 다룰 수 있는 Job 객체를 반환하고, 이 Job은 완료를 명시적으로 대기하는데에 사용할 수 있다.

async

Deferred 객체를 반환한다. launch와 다른점은 launch는 특정한 반환값을 예상하고 연산을 시작하지 않아 Coroutine을 나타내는 Job을 반환하고, Job.join() 을 호출하여 코루틴이 완료되는 것을 기다릴 수 있다.

  • Deferred : Future나 Promise와 같은 개념으로, 연산 결과를 가진다. 하지만 이 연산 결과를 get 하는 시점까지 연산을 미룬다 — 어느 미래(future)에는 결과가 있다는 것을 보장한다.(promise) — . Job을 확장한 제네릭 타입으로, deferredObject.await() 처럼 await()을 호출하여 값을 얻을 수 있다.

Scope Builder

coroutineScope Builder를 사용하여 고유한 범위를 선언 할 수 있다. 코루틴 스코프를 만들고 시작된 모든 sub coroutine이 완료 될 때까지 완료되지 않는다.

coroutineScope {} 는 모든 하위 코루틴의 실행 완료를 기다린다는 점에서 runBlocking{} 과 비슷하게 보이지만, runBlocking은 suspending 을 위해 현재 스레드를 blocking 하는 반면 coroutineScope는 현재 스레드에서 다른 작업을 하려하면 suspending 된다는 점에서 runBlocking은 일반 함수이고, coroutineScope는 suspend 함수라고 할 수 있다.

Coroutine Scope

새 코루틴을 정의할 때 사용된다. 모든 Coroutine Builder는 Coroutine Scope의 확장이다. Coroutine Scope의 coroutineContext 를 상속받아 모든 element와 cancellation 을 전파한다.

안드로이드에서 다양한 코루틴을 실행하여 asynchronous 한 작업을 할 때, 메모리 릭을 피하기 위해 Activity가 destroy 될 때 모든 코루틴을 취소해야 한다. 안드로이드에서는 캡슐화된 추상 coroutineScope를 제공하여 해당 scope에서 실행하면 라이프 사이클에 따라 취소 되도록 할 수 있다.

Coroutine Context

Coroutine 은 Context를 가지는데 이게 CoroutinContext 타입의 값이다. 여기에는 Coroutine의 Job이나 Dispatcher 등을 포함한다.

Dispatcher

해당 코루틴이 실행을 위해 사용하는 스레드를 결정한다. 특정 스레드로 실행을 제한할 수도 있고, 제한하지 않을 수도 있다. 모든 coroutine builder는 특정 Dispatcher 설정을 위해 CoroutineContext 파라미터를 선택적으로 넘길 수 있다.

Dispatcher는 기본적으로 외부 CoroutineScope에서 상속된다. 특히 runBlocking 코 루틴에 대한 기본 디스패처는 호출자 스레드에 제한되어 있어 상속하면 예측 가능한 FIFO 스케줄링을 사용하여 실행을 해당 스레드로 제한하는 효과가 있다.

다른 코루틴의 CoroutineScope에서 코루틴을 시작하면 coroutineContext을 상속 받고, Job도 상위 코루틴의 하위 Job이 된다. 상위 코루틴이 취소되면 모든 하위 코루틴도 재귀적으로 취소된다. 이 상위-하위 관계는 2가지 방법 중 하나로 명시적으로 재정의 될 수 있다.

  1. 코루틴을 시작할 때 다른 범위가 명시적으로 지정되면 (예 : GlobalScope.launch) 상위 범위에서 Job을 상속하지 않는다.
  2. 다른 Job 객체가 새 코루틴의 컨텍스트로 전달되면 전달된 Job 객체의 하위 Job이 되기 때문에, 상위 Job에 영향을 받지 않는다.

두 경우 모두 코루틴은 시작된 범위에 연결되지 않고 독립적으로 작동한다.

launch {…} 처럼 dispatcher 를 따로 설정하지 않고 호출할 경우 CoroutineScope에서 상속된 context를 갖게 되고, 지금 작업중인 스레드에서 실행된다.

Dispatchers.Unconfined

main 스레드에서 실행되는 것으로 보이는 특수한 dispatcher 인데, 실제 동작방식은 main 스레드에서만 실행되지는 않는다.

Dispatchers.Unconfined로 설정된 코루틴은 해당 코루틴을 호출한 스레드에서 suspension 지점까지 실행된다. suspension 지점 이후 재개되는 때에는 중간에 호출 되었던 suspend 함수에 의해 실행될 스레드가 결정된다. 그렇기 때문에 CPU 시간을 소비하거나 특정 스레드에 제한된 공유 데이터 (예 : UI)를 업데이트하지 않는 코루틴에 적합하다.

Dispatchers.Default

scope에 별다른 Dispatcher를 설정하지 않았을 때 사용되는 기본 dispatcher 이다. 공유 백그라운드 스레드 풀을 사용한다.

newSingleThreadContext

코루틴이 실행할 스레드를 만든다. 전용 스레드는 매우 비싼 리소스이다. 실제 애플리케이션에서는 더 이상 필요하지 않을 때 닫기 함수를 사용하여 해제하거나 최상위 변수에 저장하고 애플리케이션 전체에서 재사용 해야한다.

Flow

Flow

  • 비동기로 계산된 복수의 값을 반환할 때 사용한다.
  • Sequence 처럼 collect와 같은 terminal operator를 호출하기 전에는 아무것도 실행되지 않기 때문에 cold stream 이라고 부른다.
  • flow 블럭에서는 다른 suspend function을 호출할 수 있다는 점이 sequence와의 차이점이다.
  • transform operator를 통해 map, filter와 같은 operator 처럼 원하는 조건을 실행하고 emit을 통해 방출할 수 있다.
  • 호출한 coroutine 의 context를 따르도록 구현되어 비동기 코드에 대한 context를 고민하지 않으면서 caller를 blocking 하지 않고, 빠르게 실행할 수 있다. 연산이 오래걸리는 작업을 실행해야 해서 context를 변경해줘야 한다면withContext(<Dispatcher>) 를 직접 호출하여 context를 변경하면 값을 방출할 수 없고, flowOn(<Dispactcher>) 을 통해 해야한다.
  • launchIn(<CoroutineScope>)terminal operator 를 통해 flow의 실행만 다른 coroutine에서 실행되도록 할 수 있다.
fun simple(): List<Int> = listOf(1, 2, 3)

fun main() {
simple().forEach { value -> println(value) }
}
// sequence
fun simple(): Sequence<Int> = sequence { // sequence builder
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it
yield(i) // yield next value
}
}
fun main() {
simple().forEach { value -> println(value) }
}
// suspend function
suspend fun simple(): List<Int> {
delay(1000) // pretend we are doing something asynchronous here
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
simple().forEach { value -> println(value) }
}
// flow
fun simple(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// Collect the flow
simple().collect { value -> println(value) }
}
  • [참고] Sequence

코틀린 표준 라이브러리에서 제공하는 collection 으로, Iterable 과 동일한 기능을 제공하지만 multi-step collection 처리가 다르게 구현되어 있다. Iterable에서 multiple step을 처리할 때는 각 처리과정을 전부 완료하고 그 중간 산출결과를 반환한다. sequence는 최대한 lazy 하게 실행되어 연결된 모든 처리 과정의 결과를 요청할 때에만 실행된다. 모든 step이 item 하나 하나씩 처리된다. 반면 Iterable은 모든 item에 대해서 각 step이 완료된 후 다음 step을 진행한다. sequence는 중간산출물을 만들지 않음으로써 전체 처리 과정의 성능을 향상시킨다. 다만 lazy 한 특성으로 더 작은 컬렉션을 처리하거나 더 간단한 계산을 수행 할 때 상당한 overhead가 발생할 수 있으므로 상황에 맞게 sequence / Iterable 중에 선택해서 사용해야 한다.

Buffering

  • buffer() operator를 통해 emitting과 collecting 코드를 동시에 실행할 수 있도록 할 수 있다.
  • conflate() operator를 통해 stream의 중간 item이 너무 느리게 처리되고 마지막(제일 최신) item의 처리가 완료된 경우 완료되지 않은 중간 산출물은 무시할 수 있다.
  • xxxLatest() operator를 통해 stream의 가장 최신 item에 대한 결과물을 받을 수 있다.

Flatten

  • flatMapConcat — 각 flow에 대해 순차적으로 처리한다.
  • flatMapMerge — 각 flow 동시에 실행, concurrency 파라미터를 통해 동시에 처리할 수 있는 최대 수를 정할 수 있다.
  • flatMapLatest — 기존 flow를 cancel하고 새로운 flow를 emit 한다.

Channel

Deferred 는 코루틴 간에 단일 값을 전달할 수 있는 방법이고, Channel은 stream을 전달할 수 있는 방법이다.

개념적으로 BlockingQueue와 유사하다. 다른점은 blocking put 은 suspending send 로, blocking take 는 suspending receive 라는 점이다.

https://play.kotlinlang.org/hands-on/Introduction%20to%20Coroutines%20and%20Channels/08_Channels
  • BlockingQueue : 생산자 스레드는 처리하는 소비자 스레드를 위해 메시지를 전달하는데 블로킹 큐는 두 스레드 사이의 조정자 역할을 한다. 스레드 조건을 사용하여, 큐가 가득 차 있을 때는 생산자 스레드가 새로운 메시지를 큐에 넣을 수 없게 하고 소비자 스레드에는 가져갈 메시지가 있음을 알려준다. 스레드의 차단 뿐만 아니라, 리스트가 가득 차지 않았다던가 비어 있지 않다던가 같은 중요한 상태 변화의 시그널링도 한다.
// Blocking Queue 의 구현

@Synchronized
@Throws(InterruptedException::class)
open fun put(item: Any?) {
while (this.queue.size() === this.limit) {
wait()
}
if (this.queue.size() === 0) {
notifyAll()
}
this.queue.add(item)
}


@Synchronized
@Throws(InterruptedException::class)
open fun take(): Any? {
while (this.queue.size() === 0) {
wait()
}
if (this.queue.size() === this.limit) {
notifyAll()
}
return this.queue.remove(0)
}

Pipelines

pipeline은 하나의 코루틴은 무한히 생산할 수 있고 다른 코루틴들은 stream을 소비할 때 처리를 통해 다른 결과를 도출하는 패턴이다.

fun main() = runBlocking {
var cur = numbersFrom(2) // [1] cur - 2,3,4,5,6,7,8,9,10,...
repeat(10) {
val prime = cur.receive() // [2] prime = 2
println(prime)
cur = filter(cur, prime) // [3] cur -
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // infinite stream of integers from start
}
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
}

[1] 에서 cur — 2,3,4,5,6,7,8,9,10,…

repeat(1) 인 경우
print(2)
[2] 에서 prime = 2,
[3] filter 후 cur — 3,5,7,9,11,…

repeat(2) 인 경우
print(3)
[2] 에서 prime = 3,
[3] filter 후 cur — 5,7,11,…

처럼 실행된고, coroutineContext.cancelChildren() 을 통해 하위 coroutine을 종료할 수 있다.

iterator coroutine builder를 사용하여 통일한 pipeline을 만들 수 있다. produceiterator 로, sendyield 로, receivenext 로, ReceiveChannelIterator 로 변경하고, CoroutineScope와 runBlocking을 지울 수 있다. 하지만 channel을 사용한 pipeline은 Dispatchers.Default context를 사용했을 때 처럼 multiple CPU cores를 사용하여 실행할 수 있다.

Fan in

처리 할 수있는 입력의 수로 하나의 Channel에 복수의 coroutine에서 값을 보내는 경우, 아래 예제처럼 할 수 있다.

fun main() = runBlocking {
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // receive first six
println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}

Fan out

팬 아웃은 논리 회로에서 하나의 논리 게이트의 출력이 얼마나 많은 논리 게이트의 입력으로 사용되는지에 대해 서술할 때에 쓰인다. 팬 아웃이 크다는 말은 하나의 출력이 많은 논리게이트의 입력으로 사용된다는 뜻이다. -위키백과

복수의 coroutine에서 하나의 Channel로 부터 값을 얻어오는 경우, 아래와 같이 할 수 있다.

fun main() = runBlocking<Unit> {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
}
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}

for loop 로 작성된 위의 예제에서는 한 coroutine이 실패해도 다른 coroutine은 channel 의 처리를 계속할 수 있는데, consumeEach 로 작성된 경우 channel에 대한 정상/비정상 완료인 경우 채널은 소비(취소)된다.

ticker channel

마지막 소비 이후 주어진 delay(ms) 만큼 대기 후 Unit을 생성하는 rendezvous channel

SharedFlow

hot stream

event를 브로드캐스팅 할 때 유용하다. 모든 collector 에 브로드캐스트 방식으로 값을 방출하는 hot flow 이다. hot 이라고 부르는 이유는 active instance 가 collector의 존재와 상관 없이 존재하기 때문이다. (Flow는 cold)

SharedFlow는 완료되지 않는다.

sharedFlow의 collector를 subscriber 라고 한다. sharedFlow의 subscriber는 취소될 수 있으므로 값을 emission 전에 취소가 되었는지 확인해야 한다. 대부분의 terminal operator 는 완료되지 않지만 take, takeWhile과 같은 operator로 완료할 수 있다.

SharedFlow vs BroadcastChannel

개념적으로 BroadcastChannel 과 비슷하고, 미래에는 완전히 대체할 수 있도록 설계되었다. 차이점으로는 SharedFlow가 모든 Channel API 를 구현하지 않아도 되서 간단하고, replay와 buffer overflow 전략을 지원하고, read-only 와 변경가능한 인터페이스를 구분해서 사용할 수 있어 더 명확하다. 마지막으로 BroadcastChannel과 달리 닫을 수 없고 실패를 표현할 수 없어 모든 에러와 완료 신호가 필요하다면 명시적으로 구체화되어야 한다.

Concurrency

별도의 동기화 처리 없이 cocurrent coroutines 를 처리할 수 있고, thread-safe 하다.

  • thread-safe : 멀티 스레드 프로그래밍에서 일반적으로 어떤 함수나 변수, 혹은 객체가 여러 스레드로부터 동시에 접근이 이루어져도 프로그램의 실행에 문제가 없음

StateFlow

가장 최근 상태값을 공유해야할 때 쓰이는 SharedFlow의 특수한 구현으로, SharedFlow 의 특성을 그대로 가진다. 변경 가능 단일 데이터의 update를 읽기 전용 값으로 emit 하는 SharedFlow 이다.

distinctUntilChanged 연산자와 유사한 방식으로 Any.equals 비교를 사용하여 병합된다. 이전과 동일한 새 값은 collector 에 방출하지 않도록 한다.

항상 초기값이 있고, 새 subscriber 에 최신 값을 방출하고, 최신 값을 제외한 다른 값을 버퍼링 하지 않는다.

StateFlow vs ConflatedBroadcastChannel

개념적으로 ConflatedBroadcastChannel과 유사하며, 미래에는 완전히 대체할 수 있도록 설계되었다.

  • emit 되는 모든 데이터에 객체를 할당하는 ConflatedBroadcastCannel과 달리 모든 Channel API 를 구현하지 않아도 되서 간단하고, 빠르고, garbage-free 하다
  • 값이 없을 수도 있는 ConflatedBroadcastCannel과 달리 항상 데이터가 있어서 value property를 안전하게 읽을 수 있다.
  • conflation 동작이 ConflatedBroadcastCannel에서는 reference에 기반하고 있는 것과 달리 distinctUntilChanged 와 같이 equality 에 기반하고 있다.

함께 보면 좋은 글

Substituting Android’s LiveData: StateFlow or SharedFlow?

--

--