Kotlin

코드스피츠 6강) 코루틴

dev_roach 2022. 7. 7. 23:07
728x90

Week6

선점형 멀티태스킹

대부분의 OS 는 선점형 멀티태스킹 방식을 취하고 있음. 예를 들면 A 프로세스나 스레드를 스케쥴링 하다가도, B 프로스세스나 스케쥴러가 좀 더 높은 우선순위로 실행되야 한다면,
OS 가 A 프로세스를 중단시키고, B 프로세스를 실행시킬 수 있음

비선점형 멀티태스킹

OS 가 강제로 현재 실행중인 프로그램을 멈출 수 없음. 로드된 프로그램이 종료되어야 다른 프로그램이 실행됨. 보통 경량스레드들이 이에 속함. 보통 하나의 로직이 죽으면 전부 다 죽음.

비선점형 멀티태스킹의 단점

단점은 진짜 동시성이 아니다. 하나의 작업을 여러개의 쓰레드나 프로세서로 분산시킬 수 없음.
-> 각 작업을 길게 쪼개면 각각 조각을 스레드로 분산시킬 수는 있음.

image

위의 사진 처럼 하나의 Job 을 잘게 쪼개서 마치 동시에 실행되는 것처럼 보여줌. Coroutine 도 label 을 붙이면서 label 마다 suspend 될 수 있는데, 그게 요거랑 같은건가보다. 이 생각을 했다. (추측임. 아닐수도).

비선점형 멀티태스킹의 장점

장점또한 마찬가지로 진짜 동시성이 아니다. 동기화 문제가 일어나지 않게 할 수 있음 (일어나게도 할 수 있음). JavaScript 는 SingleThread 가 아니라, Process 를 처리할때 비선점형으로 처리하는 Main Process 가 하나밖에 없기에 Single Thread 라고 하는 것임.

구현체

OS 레벨 - 파이버
SW 레벨 - 그린스레드, 코루틴, 제네레이터 등

코틀린에서는?

sequence, coroutine 을 통해 소프트웨어 스택으로 구현되어 있음.

JAva 에서는?

  • 그린스레드
  • Project loom : OS Level 의 파이버와 연동

CPS (Continuation Passing Style)

Continuation 이란 어떤 Thread 가 Coroutine 에 들어갔다가 나와도, 그 Coroutine 안의 정보들은 Persistence 하게 보존하는 Context

코루틴의 일반적인 구현(함수형 CPS가 원형)

  • 루틴: 진입하면 반드시 반환까지 한 번에 실행됨.
  • 코루틴: 진입한 뒤 중간에 반환하고 다시 그 위치 부터 재실행 가능 (suspend func)

보통 중간 반한되는 포인트마다 묶어서 서브루틴으로 만들어 서브루틴의 배열화 하면 재진입 시 다음 서브루틴을 실행하는 방식으로 처리. 보통 나누는 기준은
yield, await 등 으로 서브루틴을 나눔. 코틀린은 suspend 함수를 호출하는 기준으로 나뉜다.

Linked Task

위의 말을 들어봤을때 하나의 Job 을 세분화 하고, 그걸 마치 동시성으로 실행되는 것처럼 보일 수 있다는 뜻은 결국 Sequential 하게 일어난다는 것인데, 이를 표현하려면 어떻게 해야할까?
아마 SubTask 들이 연결된 LinkedTask 로 표현하면 좋을 것이다.

class Task( ​​​​val run: (Controller) -> Unit ) { ​​​​var isCompleted = false ​​​​var result: Result<Any?>? = null ​​​​var next: Task? = null } class Controller internal constructor( ​​​​private val task: Task ) { ​​​​val data get() = task.result ​​​​fun cancel(throwable: Throwable) { ​​​​​​​​task.next?.result = Result.failure(throwable) ​​​​​​​​task.isCompleted = true ​​​​} ​​​​fun resume(data: Any? = null) { ​​​​​​​​task.next?.result = Result.success(data) ​​​​​​​​task.isCompleted = true ​​​​} }

위의 코드를 보면 resume 이나 cancel 모두 task 는 종료되고, 다음 Task 로 이동되게 된다. 따라서 resume / cancel 모두 next.result 에 현재의 데이터를 넘겨주게 된다.

image

EventLooper

class EventLooper( ​​​​private val dispatcher: Dispatcher ): Runnable { ​​​​private val tasks: Queue<Task> = LinkedList() ​​​​private val currTask: Task? = null ​​​​fun linkedTask(vararg blocks: (Controller) -> Unit) { ​​​​​​​​if (blocks.isEmpty()) return ​​​​​​​​synchronized(tasks) { // multi-Thread 에서 실행될 수 있으므로. ​​​​​​​​​​​​var prev = Task(blocks[0]) ​​​​​​​​​​​​tasks.add(prev) // Add Task to TaskQueue ​​​​​​​​​​​​for (i in 1..blocks.lastIndex) { // Linked Task ​​​​​​​​​​​​​​​​val task = Task(blocks[i]) ​​​​​​​​​​​​​​​​prev.next = task ​​​​​​​​​​​​​​​​prev = task ​​​​​​​​​​​​} ​​​​​​​​} ​​​​} }

일단 Thread 를 어떻게 운영할것인지는 Dispatcher 에게 Runnable Interface 를 구현하여 일임하고, EventLooper 는 TaskQueue 를 받아서 Queue 에서 마지막 작업을 빼내서 currentTask 로 사용하는 구조.

어려워 보이지만, 잘 들여다보면 여러개의 Task Block 을 Linking 하여 결국 Queue 에는 하나의 Task 로 들어간다. SubRoutine + SubRoutine = Routine 요런식이라고 생각하면 될듯하다.

​​​​override fun run() { ​​​​​​​​while (!Thread.currentThread().isInterrupted) { ​​​​​​​​​​​​Thread.sleep(16) // Thread Block 을 막기 위해서 ​​​​​​​​​​​​synchronized(this) { // Task 는 Synchronized 대상 ​​​​​​​​​​​​​​​​if (currTask != null) { // 실행중이라는 뜻. ​​​​​​​​​​​​​​​​​​​​currTask?.let {curr -> ​​​​​​​​​​​​​​​​​​​​​​​​if (curr.isCompleted) { // 현재 Task 가 끝났는지 확인 ​​​​​​​​​​​​​​​​​​​​​​​​​​​​curr.next?.let { tasks.add(it) } // 현재 Task 가 끝났다면 다음 Task 를 Queue 에 집어 넣음. ​​​​​​​​​​​​​​​​​​​​​​​​​​​​currTask = null ​​​​​​​​​​​​​​​​​​​​​​​​} ​​​​​​​​​​​​​​​​​​​​} ​​​​​​​​​​​​​​​​} else { ​​​​​​​​​​​​​​​​​​​​tasks.poll()?.let { ​​​​​​​​​​​​​​​​​​​​​​​​currTask = it ​​​​​​​​​​​​​​​​​​​​​​​​it.run(Controller(it)) ​​​​​​​​​​​​​​​​​​​​} ​​​​​​​​​​​​​​​​} ​​​​​​​​​​​​} ​​​​​​​​} ​​​​}

Dispatcher

interface Dispatcher { ​​​​fun start(looper: EventLooper) ​​​​fun join() } class FixedDispatcher( ​​​​private val threads: Int ) : Dispatcher { ​​​​private val executor = Executors.newFixedThreadPool(threads) ​​​​override fun start(looper: EventLooper) { ​​​​​​​​for (i in 1..threads) { ​​​​​​​​​​​​executor.execute(looper) ​​​​​​​​} ​​​​} ​​​​override fun join() { ​​​​​​​​while (!executor.isShutdown) {} ​​​​} }

FixedDispatcher 는 Thread 가 정해진 Pool 을 만들고, 내부에서 Looper 를 스레드 갯수만큼 돌면서 실행시킴. Looper 의 작업을 여러 스레드가 처리함. 여기서 약간 코루틴의 내부 구조가 잘 이해가 갔는데, 사실 Coroutine 의 suspend function 을 여러 스레드가 Heap 에 있는 Context 를 공유하며 실행시킨 다는 것은 알았는데, 약간 이렇게 코드를 보니 더 새로웠다.

이걸 보면서 느낀게 내가 Dispatcher 를 고르면 그 Dispatcher 가 내 Job 을 실행시킨 다는게 결국 그 Dispatcher 의 Thread 들이 EventLooper 를 돌면서 내 Job 을 실행시키는 거구나. 이런 생각이 들었다.

실제 실행

fun main() { ​​​​val looper = EventLooper(FixedDispatcher(10)) ​​​​for (i in 0..5) { ​​​​​​​​looper.linkedTask({ ​​​​​​​​​​​​println("$i-0 ${Thread.currentThread().id}") ​​​​​​​​​​​​Thread.sleep(i * 100L) ​​​​​​​​​​​​it.resume() ​​​​​​​​}, { ​​​​​​​​​​​​println("$i-0 ${Thread.currentThread().id}") ​​​​​​​​​​​​Thread.sleep(i * 100L) ​​​​​​​​​​​​it.resume() ​​​​​​​​}, { ​​​​​​​​​​​​println("$i-0 ${Thread.currentThread().id}") ​​​​​​​​​​​​Thread.sleep(i * 100L) ​​​​​​​​​​​​it.resume() ​​​​​​​​}) ​​​​} ​​​​looper.launch() ​​​​looper.join() } /** ​* Executed Result 0-0 24 1-0 18 2-0 23 3-0 21 4-0 16 5-0 17 0-0 22 1-0 18 2-0 24 3-0 18 4-0 22 5-0 23 0-0 15 1-0 17 2-0 18 3-0 20 4-0 19 5-0 22 ​*/

위의 Thread Id 를 보면 각각 다른 Thread 들이 실행하거나 또는 같은 Thread 가 실행했음을 알 수 있음. 즉, 하나의 큰 Task 를 여러 Thread 들이 실행가능함. 위에 Thread 가 달라서 Multi-Thread 에서 동시에 실행되는 것처럼 보이지만, 결국 EventQueue 에 쌓인 Task 순서대로 실행됨을 알 수가 있다.

Serial Task

Task 하

/** * Ready 는 딱 생성됬을때 상태 * Mark 는 사용자 요구 사항 마킹 (Confirm 되지 않으면 의미 없음) * Confirm 사용자의 요구 사항 마킹 수용 */ enum class Stat { ​​​​READY, MARK, CONFIRM } class Task internal constructor( ​​​​val run: (Controller) -> Unit ) { ​​​​internal var isStarted = Stat.READY ​​​​internal var isCompleted = Stat.READY ​​​​internal var result: Result<Any?>? = null ​​​​internal var next: Task? = null }

이제는 시작했는지 안했는지 구분값도 설정해줌.

class Controller internal constructor( ​​​​private val task: Task ) { ​​​​val data get() = task.result ​​​​fun cancel(throwable: Throwable) { ​​​​​​​​task.next?.result = Result.failure(throwable) ​​​​​​​​task.isCompleted = Stat.MARK ​​​​} ​​​​fun resume(data: Any? = null) { ​​​​​​​​task.next?.result = Result.success(data) ​​​​​​​​task.isCompleted = Stat.MARK ​​​​} }

MARK 를 하는 이유는 내가 이걸 할꺼야~ 라고 마킹한다고 생각하면 된다. 결국 Thread 가 이를 용인하고 실행시켜준 뒤 Confirm 으로 바꿔주는 구조이다.

class SerialTask( ​​​​private val dispatcher: Dispatcher, ​​​​vararg blocks: (Controller) -> Unit ) : Runnable { ​​​​private val task: Task ​​​​/** * 생성자에서 Block 으로 Task 를 구성해버림. linkedTask 가 오직 SerialTask 가 실행되는 시점에만 사용됨. */ ​​​​init { ​​​​​​​​if (blocks.isEmpty()) throw Throwable("no blocks") ​​​​​​​​var prev = Task(blocks[0]) ​​​​​​​​task = prev ​​​​​​​​prev.isStarted = Stat.MARK ​​​​​​​​for (i in 1..blocks.lastIndex) { ​​​​​​​​​​​​val task = Task(blocks[i]) ​​​​​​​​​​​​prev.next = task ​​​​​​​​​​​​prev = task ​​​​​​​​} ​​​​} ​​​​override fun run() { ​​​​​​​​while (!Thread.currentThread().isInterrupted) { ​​​​​​​​​​​​Thread.sleep(5) ​​​​​​​​​​​​if (task.isCompleted == Stat.MARK) { ​​​​​​​​​​​​​​​​task.next?.let { ​​​​​​​​​​​​​​​​​​​​it.isStarted = Stat.MARK ​​​​​​​​​​​​​​​​​​​​task = it ​​​​​​​​​​​​​​​​} ​​​​​​​​​​​​} ​​​​​​​​​​​​if (task.isStarted == Stat.MARK) { ​​​​​​​​​​​​​​​​task.run(Controller(task)) ​​​​​​​​​​​​​​​​task.isStarted = Stat.CONFIRM ​​​​​​​​​​​​} ​​​​​​​​} ​​​​} ​​​​fun launch() { ​​​​​​​​dispatcher.start(this) ​​​​} }

이 Class 의 목적은 Serial 된 Task 를 만들기 위함이다. TaskQueue 가 필요없다.
Maeng 교수님 강의를 따라가면 코드를 바꿔야 하는데, 나는 기존코드도 살리고 확장성도 좀 더 더해주고 싶어서 일단 코드를 아래처럼 바꿨다.

interface EventLooper : Runnable { ​​​​fun launch() } class SerialTask( ​​​​private val dispatcher: Dispatcher, ​​​​vararg blocks: (Controller) -> Unit ) : EventLooper { ​​​​private var task: Task ​​​​// 생략.. ​​​​override fun launch() { ​​​​​​​​dispatcher.start(this) ​​​​} }
fun main() { ​​​​val dispatcher = FixedDispatcher(10) ​​​​for (i in 0..5) { ​​​​​​​​val looper = SerialTask(dispatcher, { ​​​​​​​​​​​​println("$i-0 ${Thread.currentThread().id}") ​​​​​​​​​​​​Thread.sleep(i * 100L) ​​​​​​​​​​​​it.resume() ​​​​​​​​}, { ​​​​​​​​​​​​println("$i-0 ${Thread.currentThread().id}") ​​​​​​​​​​​​Thread.sleep(i * 100L) ​​​​​​​​​​​​it.resume() ​​​​​​​​}, { ​​​​​​​​​​​​println("$i-0 ${Thread.currentThread().id}") ​​​​​​​​​​​​Thread.sleep(i * 100L) ​​​​​​​​​​​​it.resume() ​​​​​​​​}) ​​​​​​​​looper.launch() ​​​​} } /* 4-0 19 0-0 15 2-0 17 5-0 20 1-0 16 3-0 18 0-1 15 0-2 15 1-1 16 2-1 17 1-2 16 3-1 18 4-1 19 2-2 17 5-1 20 3-2 18 4-2 19 5-2 20 */

이렇게 Serial Task 로 실행하면 장점은 무엇일까? Result 를 보면 알 수 있듯이, for 문을 돌면서 dispatcher 의 Thread 로 한번의 여러 가지 Serial Task 를 처리하게 되는 것이다. 이게 잘보면 하나의 SerialTask 는 동일한 Thread 로 실행된다. 이렇게 처리하게 되면 아까전 처럼 A -> B -> C 이런식으로 Sequencial 하게 처리되는게 아니라, 모든 Thread 들 사이에서 경쟁되며 사용될 수 있다. 좀 더 Thread 에서 효율적이다. synchronize 또한 없음을 확인할 수 있는데, 그 이유는 위에서 설명한대로 하나의 SerialTask 는 하나의 Thread 로 처리되기 때문이다.

이게 비선점형 멀티태스킹 방식의 장점인데 위 코드처럼 synchronize 를 할 필요가 없다. 다만 위의 코드는 SerialJob 간의 순서는 보장되지 않는다.

Continuation

class ContinuationTask( ​​​​private val dispatcher: Dispatcher, ​​​​isLazy: Boolean, ​​​​block: (Controller) -> Unit ) : EventLooper { ​​​​private val task: Task = Task(block) ​​​​init { ​​​​​​​​if (!isLazy) launch() ​​​​} ​​​​override fun run() { ​​​​​​​​while (!Thread.currentThread().isInterrupted) { ​​​​​​​​​​​​Thread.sleep(5) ​​​​​​​​​​​​if (task.isCompleted == Stat.MARK) break ​​​​​​​​​​​​if (task.isStarted == Stat.READY) { ​​​​​​​​​​​​​​​​task.isStarted == Stat.CONFIRM ​​​​​​​​​​​​​​​​task.run(task.continuation) ​​​​​​​​​​​​} ​​​​​​​​​​​​task.continuation.failed?.let {throw it} ​​​​​​​​} ​​​​} ​​​​override fun launch() { ​​​​​​​​dispatcher.start(this) ​​​​} } fun main() { ​​​​val dispatcher = FixedDispatcher(10) ​​​​for (i in 0..5) { ​​​​​​​​ContinuationTask(dispatcher, false) { ​​​​​​​​​​​​when (it.step) { ​​​​​​​​​​​​​​​​0 -> { ​​​​​​​​​​​​​​​​​​​​println("$i-0 ${Thread.currentThread().id}") ​​​​​​​​​​​​​​​​​​​​it.resume(1) ​​​​​​​​​​​​​​​​} ​​​​​​​​​​​​​​​​1 -> { ​​​​​​​​​​​​​​​​​​​​println("$i-0 ${Thread.currentThread().id}") ​​​​​​​​​​​​​​​​​​​​it.resume(2) ​​​​​​​​​​​​​​​​} ​​​​​​​​​​​​​​​​2 -> { ​​​​​​​​​​​​​​​​​​​​println("$i-0 ${Thread.currentThread().id}") ​​​​​​​​​​​​​​​​​​​​it.complete() ​​​​​​​​​​​​​​​​} ​​​​​​​​​​​​} ​​​​​​​​} ​​​​} ​​​​dispatcher.join() }

위의 코드를 보면 step(Coroutine 에서는 label) 별로 나누어져있는데, 이는 초반에 설명했듯이, 컴파일러가 yield() 나 suspend 등을 만나면 label 별로 나눠버리는 것이다. 이래서 suspend 가능한것이다. 원하는 step 으로 jump 뛸 수 있기에. 결국 원하는 subroutine 으로 jump 할 수 있다는 것이다. (코루틴도 Java Code 를 확인해보면 위와 같다.) 바로 이러한 스타일이 CPS 이다.

class ContinuationTask internal constructor( ​​​​val run: (Continuation) -> Unit ) { ​​​​internal var isStarted = Stat.READY ​​​​internal var isCompleted = Stat.READY ​​​​internal var continuation = Continuation(this) ​​​​internal var env: MutableMap<String, Any?> = mutableMapOf() } class Continuation internal constructor( ​​​​private val task: ContinuationTask, ) { ​​​​var step = 0 ​​​​​​​​private set ​​​​operator fun get(key: String): Any? = task.env[key] ​​​​operator fun set(key: String, value: Any?) { task.env[key] = value } ​​​​internal var failed: Throwable? = null ​​​​fun cancel(throwable: Throwable) { ​​​​​​​​failed = Throwable("step: $step, env: ${task.env}", throwable) ​​​​​​​​task.isCompleted = Stat.MARK ​​​​} ​​​​fun complete() { ​​​​​​​​task.isCompleted = Stat.MARK ​​​​} ​​​​fun resume(step: Int) { ​​​​​​​​this.step = step ​​​​​​​​task.isStarted = Stat.READY ​​​​} }

위의 코드를 보면 결국 나갔다 들어와도 상태가 저장되어 있을 수 있는 이유는 env 에 local variable 등을 저장해둘 수 있기 때문이다. 다만 이 영역은 Heap 에 저장된다. 보통의 Thread 는 자신의 Stack 변수에 보통 local variable 을 담아두고 Context Switching 이 일어날때 이를 전달해줘야 한다. 하지만, 지금과 같은 경우는 Heap 에 저장되므로 Context Switching 비용이 적다. (없다고 해야하나?)

강의를 들으면서 한 생각

isCanceled 와 예외처리를 만들면 이렇게 하면 되지 않을까?

그냥 위의 코드를 보면서 느낀건데 코루틴 코드상에서 isCanceled 상태를 만들고, UnCaughtException 이 발생하면 해당 Task 를 isCanceled = true 로 만들고, async 의 경우에는 await() 시점에 isCanceled 면 예외를 rethrow 하고, launch 의 경우에는 isCanceled 로 만듬과 동시에 rethrow 하고 있지 않을까? 라는 생각을 했다.

후기

평소에 CS 공부를 꾸준하게 해두기 잘했단 생각이 많이 들었다. 코루틴의 내부 구조가 어떻게 돌아갔는지 코드로 조금 정리해서 좀 더 깊게 이해하고, 대충 이렇게 돌겠네.. 라고 이해할 수 있었다. 읽고 있는 코루틴 책도 한번 더읽고, 다음 코드스피츠 코루틴 강좌는 꼭 신청해봐야겠다는 생각을 했다.

Github

https://github.com/tmdgusya/codespitz-kt/tree/master/src/main/kotlin/week6

 

GitHub - tmdgusya/codespitz-kt: codespitz 코틀린 영상을 보고 배운점을 정리합니다.

codespitz 코틀린 영상을 보고 배운점을 정리합니다. Contribute to tmdgusya/codespitz-kt development by creating an account on GitHub.

github.com

 

코드스피츠

https://www.youtube.com/watch?v=bOR2OBmJrMw&t=147s 

 

728x90

'Kotlin' 카테고리의 다른 글

Kotlin Coroutine) Channel  (0) 2022.07.15
Coroutine withContext 를 이용한 await 처리  (3) 2022.07.11
코루틴 빌더 예외처리  (0) 2022.07.07
CoroutineScope 과 Runblocking 의 차이  (0) 2022.07.05
Kotlin Coroutine Exception Handling  (0) 2022.07.05