Rlog

Coroutine DeepDive - 1 본문

Kotlin

Coroutine DeepDive - 1

dev_roach 2022. 8. 18. 07:38
728x90

Coroutine Deep Dive - 1

Purpose

이 게시글은 일단 코루틴을 사용하는데 좀 더 이해도를 높이기 위해 작성된 글이다. 대부분의 내용은 코루틴 라이브러리를 보고 작성되었으니, 어느정도 신뢰하여도 좋다.

What is Coroutine?

코루틴은 일시중단이 가능한 연산의 인스턴스 이다. 여기서 인스턴스라는 말이 중요한데, 일단 이해하지 말고, 왜 인스턴스라고 하지? 라는 고민을 가지고 이 글을 읽어줬으면 한다. 밑에서 다 설명할 것이다. 일시 중단 된 지점으로 부터 후에 다시 재 실행될 수 있으며, 이때 특정 Thread 에 Bound 되어 있는 것이 아닌, 일시중단을한 Thread 와 다른 Thread 가 와서 작업을 재개할 수도 있다.

Coroutine use cases

Coroutine 개발자들이 설명하는 Coroutine 을 사용하기 좋은 상황은 아래와 같다.

  • Channel-based concurrency
  • Actor-based concurrency (Actor Model 을 뜻함.)
  • 유저 상호작용이 필요한 백그라운드 프로세스 (모달을 뛰우는 작업과 같은 상황)
  • 각 Actore 를 State Machine 이 아닌 Sequence 로 구현할때 (요건 잘 모르겠다..)
  • Web application flows

suspend function

suspend function 이란 함수앞에 suspend modifier 가 붙어있는 함수를 뜻한다. 현재 Thread 를 Blocking 하지 않고도 suspend 가 가능함을 의미한다. suspend function 은 일반적인 코드에서 실행시키는 것이 불가능하고, 오직 suspending function 이나 Suspending lambdase 를 통해 실행해야 한다.

suspend fun <T> CompletableFuture<T>.await(): T =
    suspendCoroutine<T> { cont: Continuation<T> ->
        whenComplete { result, exception ->
            if (exception == null) // the future has been completed normally
                cont.resume(result)
            else // the future has completed with an exception
                cont.resumeWithException(exception)
        }
    }

Suspension Point

코루틴이 실행되는 동안 코루틴의 실행이 중단된 지점을 나타내는 Point, 문법적으로 suspension point 는 suspending function 의 호출을 의미하지만, 실질적인 suspension 은 기본 라이브러리가 suspend 를 호출할때 발생합니다. 쉽게 말하면, suspend function 내의 다른 suspend function 이 호출되는 지점이 보통 Suspension point 이다.
- suspending function 은 yield()await() 같은 함수와 같이 suspend modifier 가 붙여져있는 함수를 뜻한다.

- Intellij 기준으로 저 화살표를 보면 쉽다.

Continuation

suspend 된 코루틴의 중단된 지점에서의 상태를 의미한다. 개념적으로는 suspension point 이후로 실행해야할 나머지 Execution 을 나타내준다.

interface Continuation<in T> {
   val context: CoroutineContext
   fun resumeWith(result: Result<T>)
}

전에도 얘기한 적이 있는 것 같은데, Continuation 은 Callback + 기타 실행 정보 라고 생각하는것이 편하다. 라고 했는데 이제 어떻게 Callback 을 구성하는지 알아보도록 하자.

/**  
 * File Read 를 하는 함수  
 */  
suspend fun a() {  
    println("File Reading....")  
}  

fun callback() {  
    println("File read Done")  
}

a 라는 함수는 File 을 읽는 기능을 하는 suspend function 이다. a 라는 함수의 실행이 끝마치면 우리는 Callback 으로 "File read Done" 이라는 메세지를 받아야 한다. 위와 같은 경우를 Continuation 으로 어떻게 표현할 수 있을까? 아래와 같은 코드로 표현할 수 있다.

class ContinuationExample(override val context: CoroutineContext = EmptyCoroutineContext) : Continuation<Unit> {  
    override fun resumeWith(result: Result<Unit>) {  
        callback()  
    }  

}  

suspend fun main() {  
    ::a.startCoroutine(ContinuationExample())  
}

resumeWith 를 통해서 위와 같이 구현이 가능한데, resumeWith 가 completion callback 의 역할을 하기 때문에 위와 같은 형태로 구현이 가능한 것이다. 그렇다면 CoroutineContext 가 어떤것인지 알아보자.

Coroutine Context

Coroutine 을 조금 써봤다면 CoroutineContext 를 만들어 넣어본적이 있을 것이다. 위에서 보여준 EmptyCoroutineContext 도 CoroutineContext 이다. Coroutine Context 는 threading policy, logging, coroutine identity 등의 정보를 저장한다. 쉽게 생각해서 Thread-local 인데 Coroutine 에 Bound 되어 있는 Element Set 이라고 생각하면 된다. CoroutineContext 는 아래와 같은 Interface 를 가지고 있다.

interface CoroutineContext {
    operator fun <E : Element> get(key: Key<E>): E?
    fun <R> fold(initial: R, operation: (R, Element) -> R): R
    operator fun plus(context: CoroutineContext): CoroutineContext
    fun minusKey(key: Key<*>): CoroutineContext

    interface Element : CoroutineContext {
        val key: Key<*>
    }

    interface Key<E : Element>
}

그래서 우리가 아마 많이 보게되는 CoroutineContext 는 대충 아래와 같은 코드 일 것이다.

launch(context = Dispatchers.IO) {   
    // do some api call  
}

그럼 우리가 만약에 Custom 하게 뭔가 Map 을 넘기거고 싶다거나, 특정 객체를 넘기고 싶은 경우에는 어떻게 사용할 수 있을까? 바로 아래와 같은 방법처럼 AbstractCoroutineContextElement 를 통해서 가능하다. 우리가 어떤 Key 를 사용할 것인지 커스텀하게 만들어서 사용할 수 있다.

class User(  
    val name: String,  
    val age: Int  
): AbstractCoroutineContextElement(User) {  
    companion object Key: CoroutineContext.Key<User>  
}  

suspend fun test() = coroutineScope {  
    launch(User(name = "roach", age = 10)) {  
        val userAge = coroutineContext[User]?.age  
        val userName = coroutineContext[User]?.name  
    }  
}

또한 위의 interface 를 보면 알듯이 기본적으로 plus 와 같은 기능이 존재하여, Threading Policy 가 있는 Context 와 합쳐서 사용할 수 있다.

class User(  
    val name: String,  
    val age: Int  
): AbstractCoroutineContextElement(User) {  
    companion object Key: CoroutineContext.Key<User>  
}  

suspend fun test() = coroutineScope {  
    launch(User(name = "roach", age = 10) + Dispatchers.IO) {  
        val userAge = coroutineContext[User]?.age  
        val userName = coroutineContext[User]?.name  
    }  
}

만약 자신이 Coroutine 에 외부 Scope 의 변수를 Closure 처럼 넘겨서 사용하고 있다면, 위와 같은 방법으로 조금 더 좋게 개선하거나, Indetion 을 증가시키지 않을 수 있는 방법일지를 고민해볼 수 있을 것이다. 사실 Coroutine Context 에 관한 더 깊은 내용은 따로 설명해야 할 정도로 분량이 길기 때문에, 이제 위에서 말한 Coroutine 이 왜 suspendable function 의 Instance 인지 확인해보자.

Coroutine

우리가 아까 작성한 코드를 다시 들고와서 이에 관한 내용을 설명해보려고 한다.

suspend fun a() {  
    println("File Reading....")  
}  

suspend fun b() {  
    println("Read Another file")  
}  

suspend fun c() {  
    println("Read Another file")  
}  

fun callback() {  
    println("File read Done")  
}  

class ContinuationExample(override val context: CoroutineContext = EmptyCoroutineContext) : Continuation<Unit> {  
    override fun resumeWith(result: Result<Unit>) {  
        callback()  
    }  

}  

suspend fun main() {  
    ::a.startCoroutine(ContinuationExample())  
    b()  
    c()  
}

main function 을 보면 몇개의 suspend function 을 더 추가했다. 위와 같은 코드는 자바 코드로 어떻게 바뀔까?

image

위와 같이 큰 switch 문으로 변한다. 자세히 보면 switch(label) 을 이용하여 코드를 나눈 것을 확인할 수 있는데, 위에서 설명한 suspension point 를 기반으로 labeling 을 하게 된다. 보면 main function 은 위와 같이 변경되었고, 위의 인스턴스를 실제 Main 문에서 실행하고 있음을 알 수 있다. 

// $FF: synthetic method  
public static void main(String[] var0) {  
   RunSuspendKt.runSuspend(new ContinuationExampleKt$$$main(var0));  
}

우리의 Style 로 표현

위에서 보여준 코드는 Java 로 Decompile 된 코드라 아마도 이해하기 힘들 것이다. 공식문서에 의거해서 우리가 조금 더 이해할 수 있는 코드로 변경해보자. 현재 main 문의 상태는 아래와 같다.

suspend fun main() {  
    ::a.startCoroutine(ContinuationExample())  
    b()  
    c()  

}

만약 위의 코드가 바뀌게 되면 어떻게 될까? 우리가 배운 지식들로 조금 더 알아보기 쉽게 바꿔보자.

fun main() {  
    val mainCoroutine = MainContinuationImpl()  
    mainCoroutine.resumeWith(Result.success(Unit))  
}  

class MainContinuationImpl(  
    var label: Int = 0,  
    override val context: CoroutineContext = EmptyCoroutineContext  
): Continuation<Unit> {  


    override fun resumeWith(result: Result<Unit>) {  
        when(label) {  
            0 -> {  
                label = 1  
                a()  
                callback() // a.resumeWith()  
                this.resumeWith(Result.success(Unit)) // goto loop
            }  
            1 -> {  
                label = 2  
                b()  
                this.resumeWith(Result.success(Unit)) // goto loop  
            }  
            2 -> {  
                c()  
                return  
            }  
        }  
    }  
}

우리의 Main 문은 위와 같이 변했을 것이다. 이제 조금 이해가 가는가? Main 문은 하나의 StateMachine 으로 변했다. 그래서 우리가 Coroutine 을 suspendable function 의 Instance 라고 하는 이유이다. 여기서 코루틴이 시작하게 되면 첫번째로 우리는 resumeWith 를 처음으로 실행해서 label 0 인 곳으로 goto 하게 된다. label 0 에서의 작업을 마치면 우리는 다음 state 인 label 1 로 가기 위해 label 을 1로 만들어주게 된다.

위의 지식을 좀 더 딥다이브 하기 위해서는 선행적으로 CPS Style에 대해서 조금 알면 좋다. 모르면 아래 내글을 봐도 좋고 아니면 다른 글을 찾아서 봐도 좋다.

https://devroach.tistory.com/149

참고

https://github.com/Kotlin/KEEP/blob/master/proposals/coroutines.md#coroutine-context

Github

https://github.com/tmdgusya/analyze-coroutine

 

GitHub - tmdgusya/analyze-coroutine

Contribute to tmdgusya/analyze-coroutine development by creating an account on GitHub.

github.com

 

'Kotlin' 카테고리의 다른 글

Coroutine Series-2) Continuation  (0) 2022.09.28
Coroutine Series-1 ) 코루틴을 왜 사용해야 하는가?  (0) 2022.09.25
CPS Style  (1) 2022.07.31
Hot And Cold Data Stream  (0) 2022.07.16
Kotlin) Channel 을 이용한 Actor Model 구현  (0) 2022.07.15